// server/src/audio.rs #![forbid(unsafe_code)] use anyhow::{anyhow, Context}; use chrono::Local; use futures_util::Stream; use gstreamer as gst; use gstreamer_app as gst_app; use gst::prelude::*; use gst::ElementFactory; use gst::MessageView::*; use tokio_stream::wrappers::ReceiverStream; use tonic::Status; use tracing::{debug, error, warn}; use std::time::{Instant, Duration, SystemTime, UNIX_EPOCH}; use std::sync::{Arc, Mutex}; use lesavka_common::lesavka::AudioPacket; /// “Speaker” stream coming **from** the remote host (UAC2‑gadget playback /// endpoint) **towards** the client. pub struct AudioStream { _pipeline: gst::Pipeline, inner: ReceiverStream>, } impl Stream for AudioStream { type Item = Result; fn poll_next( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { Stream::poll_next(std::pin::Pin::new(&mut self.inner), cx) } } impl Drop for AudioStream { fn drop(&mut self) { let _ = self._pipeline.set_state(gst::State::Null); } } /*───────────────────────────────────────────────────────────────────────────*/ /* ear() - capture from ALSA (“speaker”) and push AAC AUs via gRPC */ /*───────────────────────────────────────────────────────────────────────────*/ pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result { // NB: one *logical* speaker → id==0. A 2nd logical stream could be // added later (for multi‑channel) without changing the client. gst::init().context("gst init")?; /*──────────── pipeline description ──────────── * * ALSA (UAC2 gadget) AAC+ADTS AppSink * ┌───────────┐ raw 48 kHz ┌─────────┐ AU/ADTS ┌──────────┐ * │ alsasrc │────────────► voaacenc │────────► appsink │ * └───────────┘ └─────────┘ └──────────┘ */ let desc = build_pipeline_desc(alsa_dev)?; let pipeline: gst::Pipeline = gst::parse::launch(&desc)? .downcast() .expect("pipeline"); let sink: gst_app::AppSink = pipeline .by_name("asink") .expect("asink") .downcast() .expect("appsink"); let tap = Arc::new(Mutex::new(ClipTap::new("🎧 - ear", Duration::from_secs(60)))); // sink.connect("underrun", false, |_| { // tracing::warn!("⚠️ USB playback underrun – host muted or not reading"); // None // }); let (tx, rx) = tokio::sync::mpsc::channel(8192); let bus = pipeline.bus().expect("bus"); std::thread::spawn(move || { for msg in bus.iter_timed(gst::ClockTime::NONE) { match msg.view() { Error(e) => error!("💥 audio pipeline: {} ({})", e.error(), e.debug().unwrap_or_default()), Warning(w) => warn!("⚠️ audio pipeline: {} ({})", w.error(), w.debug().unwrap_or_default()), StateChanged(s) if s.current() == gst::State::Playing => debug!("🎶 audio pipeline PLAYING"), _ => {} } } }); /*──────────── callbacks ────────────*/ sink.set_callbacks( gst_app::AppSinkCallbacks::builder() .new_sample({ let tap = tap.clone(); move |s| { let sample = s.pull_sample().map_err(|_| gst::FlowError::Eos)?; let buffer = sample.buffer().ok_or(gst::FlowError::Error)?; let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?; // -------- clip‑tap (minute dumps) ------------ tap.lock().unwrap().feed(map.as_slice()); static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if n < 10 || n % 300 == 0 { debug!("🎧 ear #{n}: {} bytes", map.len()); } let pts_us = buffer .pts() .unwrap_or(gst::ClockTime::ZERO) .nseconds() / 1_000; // push non‑blocking; drop oldest on overflow if tx.try_send(Ok(AudioPacket { id, pts: pts_us, data: map.as_slice().to_vec(), })).is_err() { static DROPS: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); let d = DROPS.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if d % 300 == 0 { warn!("🎧💔 dropped {d} audio AUs (client too slow)"); } } Ok(gst::FlowSuccess::Ok) } }).build(), ); pipeline.set_state(gst::State::Playing) .context("starting audio pipeline")?; Ok(AudioStream { _pipeline: pipeline, inner: ReceiverStream::new(rx), }) } /*────────────────────────── build_pipeline_desc ───────────────────────────*/ fn build_pipeline_desc(dev: &str) -> anyhow::Result { let reg = gst::Registry::get(); // first available encoder let enc = ["fdkaacenc", "voaacenc", "avenc_aac"] .into_iter() .find(|&e| { reg.find_plugin(e).is_some() || reg .find_feature(e, ElementFactory::static_type()) .is_some() }) .ok_or_else(|| anyhow!("no AAC encoder plugin available"))?; Ok(format!( concat!( "alsasrc device=\"{dev}\" do-timestamp=true ! ", "audio/x-raw,format=S16LE,channels=2,rate=48000 ! ", "audioconvert ! audioresample ! {enc} bitrate=192000 ! ", "aacparse ! ", "capsfilter caps=audio/mpeg,stream-format=adts,channels=2,rate=48000 ! ", "tee name=t ", "t. ! queue ! appsink name=asink emit-signals=true ", "t. ! queue ! appsink name=debugtap emit-signals=true max-buffers=500 drop=true" ), dev = dev, enc = enc )) } // ────────────────────── minute‑clip helper ─────────────────────────────── pub struct ClipTap { buf: Vec, tag: &'static str, next_dump: Instant, period: Duration, } impl ClipTap { pub fn new(tag: &'static str, period: Duration) -> Self { Self { buf: Vec::with_capacity(260_000), tag, next_dump: Instant::now() + period, period, } } pub fn feed(&mut self, bytes: &[u8]) { self.buf.extend_from_slice(bytes); if self.buf.len() > 256_000 { self.buf.drain(..self.buf.len() - 256_000); } if Instant::now() >= self.next_dump { self.flush(); self.next_dump += self.period; } } pub fn flush(&mut self) { if self.buf.is_empty() { return; } let ts = chrono::Local::now().format("%Y%m%d-%H%M%S"); let path = format!("/tmp/{}-{}.aac", self.tag, ts); if std::fs::write(&path, &self.buf).is_ok() { tracing::debug!("📼 wrote {} clip → {}", self.tag, path); } self.buf.clear(); } } impl Drop for ClipTap { fn drop(&mut self) { self.flush() } } // ────────────────────── microphone sink ──────────────────────────────── pub struct Voice { appsrc: gst_app::AppSrc, _pipe: gst::Pipeline, // keep pipeline alive tap: ClipTap, } impl Voice { pub async fn new(alsa_dev: &str) -> anyhow::Result { use gst::prelude::*; gst::init().context("gst init")?; // pipeline let pipeline = gst::Pipeline::new(); // elements let appsrc = gst::ElementFactory::make("appsrc") .build() .context("make appsrc")? .downcast::() .unwrap(); // dedicated AppSrc helpers exist and avoid the needless `?` appsrc.set_format(gst::Format::Time); appsrc.set_is_live(true); let decodebin = gst::ElementFactory::make("decodebin") .build() .context("make decodebin")?; let alsa_sink = gst::ElementFactory::make("alsasink") .build() .context("make alsasink")?; alsa_sink.set_property("device", &alsa_dev); pipeline.add_many(&[appsrc.upcast_ref(), &decodebin, &alsa_sink])?; appsrc.link(&decodebin)?; /*------------ decodebin autolink ----------------*/ let sink_clone = alsa_sink.clone(); // keep original for later decodebin.connect_pad_added(move |_db, pad| { let sink_pad = sink_clone.static_pad("sink").unwrap(); if !sink_pad.is_linked() { let _ = pad.link(&sink_pad); } }); // underrun ≠ error – just show a warning // let _id = alsa_sink.connect("underrun", false, |_| { // tracing::warn!("⚠️ USB playback underrun – host muted/not reading"); // None // }); pipeline.set_state(gst::State::Playing)?; Ok(Self { appsrc, _pipe: pipeline, tap: ClipTap::new("voice", Duration::from_secs(60)), }) } pub fn push(&mut self, pkt: &AudioPacket) { use gst::prelude::*; self.tap.feed(&pkt.data); let mut buf = gst::Buffer::from_slice(pkt.data.clone()); buf.get_mut() .unwrap() .set_pts(Some(gst::ClockTime::from_useconds(pkt.pts))); if let Err(e) = self.appsrc.push_buffer(buf) { tracing::warn!("🎤 AppSrc push failed: {e:?}"); } } pub fn finish(&mut self) { self.tap.flush(); let _ = self.appsrc.end_of_stream(); } }