diff --git a/server/src/audio.rs b/server/src/audio.rs index 741e117..4354ac6 100644 --- a/server/src/audio.rs +++ b/server/src/audio.rs @@ -7,13 +7,15 @@ use futures_util::Stream; use gstreamer as gst; use gstreamer_app as gst_app; use gst::prelude::*; -use gst::{ElementFactory, MessageView}; +use gst::ElementFactory; use gst::MessageView::*; -use lesavka_common::lesavka::AudioPacket; use tokio_stream::wrappers::ReceiverStream; use tonic::Status; use tracing::{debug, error, warn}; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::time::{Instant, Duration, SystemTime, UNIX_EPOCH}; +use std::sync::{Arc, Mutex}; + +use lesavka_common::lesavka::AudioPacket; /// “Speaker” stream coming **from** the remote host (UAC2‑gadget playback /// endpoint) **towards** the client. @@ -54,7 +56,7 @@ pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result { * │ alsasrc │────────────► voaacenc │────────► appsink │ * └───────────┘ └─────────┘ └──────────┘ */ - let desc = build_pipeline_desc(alsa_dev)?; + let desc = build_pipeline_desc(alsa_dev)?; let pipeline: gst::Pipeline = gst::parse::launch(&desc)? .downcast() @@ -65,12 +67,12 @@ pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result { .expect("asink") .downcast() .expect("appsink"); - if let Some(tap) = pipeline - .by_name("debugtap") - .and_then(|e| e.downcast::().ok()) - { - clip_tap(tap); - } + + let tap = Arc::new(Mutex::new(ClipTap::new("🎧 - ear", Duration::from_secs(60)))); + sink.connect("underrun", false, |_| { + tracing::warn!("⚠️ USB playback underrun – host muted or not reading"); + None + }); let (tx, rx) = tokio::sync::mpsc::channel(8192); @@ -92,11 +94,16 @@ pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result { /*──────────── callbacks ────────────*/ sink.set_callbacks( gst_app::AppSinkCallbacks::builder() - .new_sample(move |s| { + .new_sample({ + let tap = tap.clone(); + move |s| { let sample = s.pull_sample().map_err(|_| gst::FlowError::Eos)?; let buffer = sample.buffer().ok_or(gst::FlowError::Error)?; let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?; + // -------- clip‑tap (minute dumps) ------------ + tap.lock().unwrap().feed(map.as_slice()); + static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); @@ -123,8 +130,8 @@ pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result { } } Ok(gst::FlowSuccess::Ok) - }) - .build(), + } + }).build(), ); pipeline.set_state(gst::State::Playing) @@ -136,30 +143,6 @@ pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result { }) } -pub async fn voice( - alsa_dev: &str, -) -> anyhow::Result<(gst::Pipeline, gst_app::AppSrc)> { - gst::init()?; - - let desc = format!( - "appsrc name=src is-live=true format=time do-timestamp=true ! \ - aacparse ! avdec_aac ! audioconvert ! audioresample ! \ - alsasink device=\"{alsa_dev}\"" - ); - let pipeline: gst::Pipeline = gst::parse::launch(&desc)? - .downcast() - .unwrap(); - - let src: gst_app::AppSrc = pipeline - .by_name("src") - .unwrap() - .downcast() - .unwrap(); - - pipeline.set_state(gst::State::Playing)?; - Ok((pipeline, src)) -} - /*────────────────────────── build_pipeline_desc ───────────────────────────*/ fn build_pipeline_desc(dev: &str) -> anyhow::Result { let reg = gst::Registry::get(); @@ -191,58 +174,130 @@ fn build_pipeline_desc(dev: &str) -> anyhow::Result { )) } -/*────────────────────────────── clip_tap() ────────────────────────────────*/ -/// Called once per pipeline; spawns a thread that writes a 1 s AAC file -/// at the start of every wall‑clock minute **while log‑level == TRACE**. -fn clip_tap(tap: gst_app::AppSink) { - use gst::prelude::*; +// ────────────────────── minute‑clip helper ─────────────────────────────── +pub struct ClipTap { + buf: Vec, + tag: &'static str, + next_dump: Instant, + period: Duration, +} - std::thread::spawn(move || { - use std::fs::File; - use std::io::Write; - - let mut collecting = Vec::with_capacity(200_000); // ~1 s - let mut next_min_boundary = next_minute(); - - loop { - match tap.pull_sample() { - Ok(s) => { - let buf = s.buffer().unwrap(); - let map = buf.map_readable().unwrap(); - collecting.extend_from_slice(map.as_slice()); - - // once per minute boundary & trace‑level - if tracing::enabled!(tracing::Level::TRACE) && - SystemTime::now() >= next_min_boundary { - if !collecting.is_empty() { - let ts = chrono::Local::now() - .format("%Y%m%d-%H%M%S") - .to_string(); - let path = format!("/tmp/ear-{ts}.aac"); - if let Ok(mut f) = File::create(&path) { - let _ = f.write_all(&collecting); - tracing::debug!("📼 wrote 1 s clip → {}", path); - } - } - collecting.clear(); - next_min_boundary = next_minute(); - } - - if collecting.len() > 192_000 { // keep at most ~1 s - collecting.truncate(192_000); - } - } - Err(_) => break, // EOS - } +impl ClipTap { + pub fn new(tag: &'static str, period: Duration) -> Self { + Self { + buf: Vec::with_capacity(260_000), + tag, + next_dump: Instant::now() + period, + period, } - }); - - fn next_minute() -> SystemTime { - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap(); - let secs = now.as_secs(); - let next = (secs / 60 + 1) * 60; - UNIX_EPOCH + Duration::from_secs(next) } -} \ No newline at end of file + pub fn feed(&mut self, bytes: &[u8]) { + self.buf.extend_from_slice(bytes); + if self.buf.len() > 256_000 { + self.buf.drain(..self.buf.len() - 256_000); + } + if Instant::now() >= self.next_dump { + self.flush(); + self.next_dump += self.period; + } + } + pub fn flush(&mut self) { + if self.buf.is_empty() { + return; + } + let ts = chrono::Local::now().format("%Y%m%d-%H%M%S"); + let path = format!("/tmp/{}-{}.aac", self.tag, ts); + if std::fs::write(&path, &self.buf).is_ok() { + tracing::debug!("📼 wrote {} clip → {}", self.tag, path); + } + self.buf.clear(); + } +} +impl Drop for ClipTap { + fn drop(&mut self) { + self.flush() + } +} + +// ────────────────────── microphone sink ──────────────────────────────── +pub struct Voice { + appsrc: gst_app::AppSrc, + _pipe: gst::Pipeline, // keep pipeline alive + tap: ClipTap, +} + +impl Voice { + pub async fn new(alsa_dev: &str) -> anyhow::Result { + use gst::prelude::*; + + gst::init().context("gst init")?; + + // pipeline + let pipeline = gst::Pipeline::new(); + + // elements + let appsrc = gst::ElementFactory::make("appsrc") + .build() + .context("make appsrc")? + .downcast::() + .unwrap(); + + // dedicated AppSrc helpers exist and avoid the needless `?` + appsrc.set_format(gst::Format::Time); + appsrc.set_is_live(true); + + let decodebin = gst::ElementFactory::make("decodebin") + .build() + .context("make decodebin")?; + let alsa_sink = gst::ElementFactory::make("alsasink") + .build() + .context("make alsasink")?; + + alsa_sink.set_property("device", &alsa_dev); + + pipeline.add_many(&[appsrc.upcast_ref(), &decodebin, &alsa_sink])?; + appsrc.link(&decodebin)?; + + /*------------ decodebin autolink ----------------*/ + let sink_clone = alsa_sink.clone(); // keep original for later + decodebin.connect_pad_added(move |_db, pad| { + let sink_pad = sink_clone.static_pad("sink").unwrap(); + if !sink_pad.is_linked() { + let _ = pad.link(&sink_pad); + } + }); + + // underrun ≠ error – just show a warning + let _id = alsa_sink.connect("underrun", false, |_| { + tracing::warn!("⚠️ USB playback underrun – host muted/not reading"); + None + }); + + pipeline.set_state(gst::State::Playing)?; + + Ok(Self { + appsrc, + _pipe: pipeline, + tap: ClipTap::new("voice", Duration::from_secs(60)), + }) + } + + pub fn push(&mut self, pkt: &AudioPacket) { + use gst::prelude::*; + + self.tap.feed(&pkt.data); + + let mut buf = gst::Buffer::from_slice(pkt.data.clone()); + buf.get_mut() + .unwrap() + .set_pts(Some(gst::ClockTime::from_useconds(pkt.pts))); + + if let Err(e) = self.appsrc.push_buffer(buf) { + tracing::warn!("🎤 AppSrc push failed: {e:?}"); + } + } + pub fn finish(&mut self) { + self.tap.flush(); + let _ = self.appsrc.end_of_stream(); + } +} diff --git a/server/src/main.rs b/server/src/main.rs index 5e6567f..1104715 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -180,60 +180,32 @@ impl Relay for Handler { &self, req: Request>, ) -> Result, Status> { - - // Build playback pipeline (AppSrc → alsasink) - let (_pipeline, src) = audio::voice("hw:UAC2Gadget,0") - .await - .map_err(|e| Status::internal(format!("{e:#}")))?; - - // channel just to satisfy the “stream Empty” return type - let (tx, rx) = tokio::sync::mpsc::channel(1); - - // -------- 1 clip‑tap variables ---------------------------- - use std::time::{SystemTime, UNIX_EPOCH, Duration}; - let mut capturing = Vec::with_capacity(200_000); // ~1 s @128 kbit - let mut next_min_boundary = next_minute(); - // -------- 2 forward packets + collect for clip‑tap -------- + // 1 ─ build once, early + let mut sink = audio::Voice::new("hw:UAC2Gadget,0").await + .map_err(|e| Status::internal(format!("{e:#}")))?; + + // 2 ─ dummy outbound stream (same trick as before) + let (tx, rx) = tokio::sync::mpsc::channel(1); + + // 3 ─ drive the sink in a background task tokio::spawn(async move { let mut inbound = req.into_inner(); + static CNT: std::sync::atomic::AtomicU64 = + std::sync::atomic::AtomicU64::new(0); + while let Some(pkt) = inbound.next().await.transpose()? { - /* ---- clip‑tap: accumulate raw AAC ----- */ - capturing.extend_from_slice(&pkt.data); - if capturing.len() > 192_000 { // keep at most ~1 s - capturing.truncate(192_000); - } - if tracing::enabled!(tracing::Level::TRACE) - && SystemTime::now() >= next_min_boundary { - if !capturing.is_empty() { - let ts = chrono::Local::now() - .format("%Y%m%d-%H%M%S").to_string(); - let path = format!("/tmp/mic-{ts}.aac"); - std::fs::write(&path, &capturing).ok(); - tracing::debug!("📼 wrote mic clip → {}", path); - } - capturing.clear(); - next_min_boundary = next_minute(); - } - static CNT: std::sync::atomic::AtomicU64 = - std::sync::atomic::AtomicU64::new(0); let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if n < 10 || n % 300 == 0 { - trace!("🎤⬇ srv pkt#{n} {} bytes", pkt.data.len()); - } - - let mut buf = gst::Buffer::from_slice(pkt.data); - buf.get_mut().unwrap() - .set_pts(Some(gst::ClockTime::from_useconds(pkt.pts))); - if let Err(e) = src.push_buffer(buf) { - warn!("🎤 AppSrc push failed: {e:?}"); + tracing::trace!("🎤⬇ srv pkt#{n} {} bytes", pkt.data.len()); } + sink.push(&pkt); } - // optional: send a single Empty to show EOS + sink.finish(); // flush on EOS let _ = tx.send(Ok(Empty {})).await; - Result::<(), Status>::Ok(()) + Ok::<(), Status>(()) }); - + Ok(Response::new(ReceiverStream::new(rx))) }