// server/src/audio.rs #![forbid(unsafe_code)] use anyhow::Context; use futures_util::Stream; use gstreamer as gst; use gstreamer_app as gst_app; use gst::prelude::*; use gst::ElementFactory; use lesavka_common::lesavka::AudioPacket; use tokio_stream::wrappers::ReceiverStream; use tonic::Status; use tracing::{debug, error, warn}; /// “Speaker” stream coming **from** the remote host (UAC2‑gadget playback /// endpoint) **towards** the client. pub struct AudioStream { _pipeline: gst::Pipeline, inner: ReceiverStream>, } impl Stream for AudioStream { type Item = Result; fn poll_next( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { Stream::poll_next(std::pin::Pin::new(&mut self.inner), cx) } } impl Drop for AudioStream { fn drop(&mut self) { let _ = self._pipeline.set_state(gst::State::Null); } } /*───────────────────────────────────────────────────────────────────────────*/ /* eye_ear() – capture from ALSA (“speaker”) and push AAC AUs via gRPC */ /*───────────────────────────────────────────────────────────────────────────*/ pub async fn eye_ear(alsa_dev: &str, id: u32) -> anyhow::Result { // NB: one *logical* speaker → id==0. A 2nd logical stream could be // added later (for multi‑channel) without changing the client. gst::init().context("gst init")?; /*──────────── pipeline description ──────────── * * ALSA (UAC2 gadget) AAC+ADTS AppSink * ┌───────────┐ raw 48 kHz ┌─────────┐ AU/ADTS ┌──────────┐ * │ alsasrc │────────────► voaacenc │────────► appsink │ * └───────────┘ └─────────┘ └──────────┘ */ let desc = build_pipeline_desc(alsa_dev)?; let pipeline: gst::Pipeline = gst::parse::launch(&desc)? .downcast() .expect("pipeline"); let sink: gst_app::AppSink = pipeline .by_name("asink") .expect("asink") .downcast() .expect("appsink"); let (tx, rx) = tokio::sync::mpsc::channel(8192); let bus = pipeline.bus().expect("no bus"); std::thread::spawn(move || { for msg in bus.iter_timed(gst::ClockTime::NONE) { use gst::MessageView::*; match msg.view() { Error(e) => error!("💥 audio pipeline: {} ({})", e.error(), e.debug().unwrap_or_default()), Warning(w) => warn!("⚠️ audio pipeline: {} ({})", w.error(), w.debug().unwrap_or_default()), StateChanged(s) if s.current() == gst::State::Playing => debug!("🎶 audio pipeline PLAYING"), _ => {} } } }); /*──────────── callbacks ────────────*/ sink.set_callbacks( gst_app::AppSinkCallbacks::builder() .new_sample(move |s| { let sample = s.pull_sample().map_err(|_| gst::FlowError::Eos)?; let buffer = sample.buffer().ok_or(gst::FlowError::Error)?; let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?; static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if n < 10 || n % 300 == 0 { debug!("🔊 eye‑ear #{n}: {} bytes", map.len()); } let pts_us = buffer .pts() .unwrap_or(gst::ClockTime::ZERO) .nseconds() / 1_000; // push non‑blocking; drop oldest on overflow if tx.try_send(Ok(AudioPacket { id, pts: pts_us, data: map.as_slice().to_vec(), })).is_err() { static DROPS: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); let d = DROPS.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if d % 300 == 0 { warn!("🔊💔 dropped {d} audio AUs (client too slow)"); } } Ok(gst::FlowSuccess::Ok) }) .build(), ); pipeline.set_state(gst::State::Playing) .context("starting audio pipeline")?; Ok(AudioStream { _pipeline: pipeline, inner: ReceiverStream::new(rx), }) } fn build_pipeline_desc(dev: &str) -> anyhow::Result { use gst::ElementFactory; // <- simpler probe let enc = ["voaacenc", "avenc_aac", "fdkaacenc"] .into_iter() .find(|&e| ElementFactory::find(e).is_some()) .ok_or_else(|| anyhow::anyhow!("no AAC encoder plugin available"))?; Ok(format!( // ➊ provide-clock=false lets the USB gadget be master // ➋ audioconvert+audioresample make sure caps match encoder "alsasrc device=\"{dev}\" provide-clock=false do-timestamp=true ! \ audioconvert ! audioresample ! audio/x-raw,channels=2,rate=48000,format=F32LE ! \ {enc} bitrate=192000 ! aacparse ! queue ! \ appsink name=asink emit-signals=true max-buffers=64 drop=true" )) }