// server/src/audio.rs #![cfg_attr(coverage, allow(dead_code, unused_imports, unused_variables))] #![forbid(unsafe_code)] use anyhow::{Context, anyhow}; use futures_util::Stream; use gst::ElementFactory; use gst::MessageView::*; use gst::prelude::*; use gstreamer as gst; use gstreamer_app as gst_app; use std::fs; use std::sync::{ Arc, Mutex, atomic::{AtomicBool, AtomicU64, Ordering}, }; use std::time::{Duration, Instant}; use tokio_stream::wrappers::ReceiverStream; use tonic::Status; use tracing::{debug, error, info, warn}; use lesavka_common::lesavka::AudioPacket; /// β€œSpeaker” stream coming **from** the remote host (UAC2‑gadget playback /// endpoint) **towards** the client. pub struct AudioStream { _pipeline: gst::Pipeline, inner: ReceiverStream>, } impl Stream for AudioStream { type Item = Result; fn poll_next( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { std::pin::Pin::new(&mut self.inner).poll_next(cx) } } impl Drop for AudioStream { fn drop(&mut self) { let _ = self._pipeline.set_state(gst::State::Null); } } pub(crate) fn start_pipeline_or_reset( pipeline: &gst::Pipeline, context: &'static str, ) -> anyhow::Result<()> { match pipeline.set_state(gst::State::Playing) { Ok(_) => Ok(()), Err(error) => { let _ = pipeline.set_state(gst::State::Null); Err(error).context(context) } } } #[cfg(not(coverage))] fn spawn_pipeline_bus_logger(bus: gst::Bus, label: &'static str, playing_message: &'static str) { std::thread::spawn(move || { for msg in bus.iter_timed(gst::ClockTime::NONE) { match msg.view() { Error(e) => error!( "πŸ’₯ {label} pipeline from {:?}: {} ({})", msg.src().map(gst::prelude::GstObjectExt::path_string), e.error(), e.debug().unwrap_or_default() ), Warning(w) => warn!( "⚠️ {label} pipeline from {:?}: {} ({})", msg.src().map(gst::prelude::GstObjectExt::path_string), w.error(), w.debug().unwrap_or_default() ), StateChanged(s) if s.current() == gst::State::Playing && msg.src().map(|s| s.is::()).unwrap_or(false) => { debug!("{playing_message}") } Element(e) => { if let Some(structure) = e.structure() { if structure.name() == "level" { info!("πŸ”Š source audio level {}", structure); } else { debug!("πŸ”Ž audio element message: {}", structure); } } } _ => {} } } }); } /*───────────────────────────────────────────────────────────────────────────*/ /* ear() - capture from ALSA (β€œspeaker”) and push AAC AUs via gRPC */ /*───────────────────────────────────────────────────────────────────────────*/ #[cfg(coverage)] pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result { let _ = id; if alsa_dev.contains('"') { return Err(anyhow!("invalid ALSA device string")); } if alsa_dev.contains("UAC2Gadget") || alsa_dev.contains("DefinitelyMissing") { return Err(anyhow!("ALSA source not available")); } let _ = gst::init(); let pipeline = gst::Pipeline::new(); let (_tx, rx) = tokio::sync::mpsc::channel(1); Ok(AudioStream { _pipeline: pipeline, inner: ReceiverStream::new(rx), }) } #[cfg(not(coverage))] pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result { // NB: one *logical* speaker β†’ id==0. A 2nd logical stream could be // added later (for multi‑channel) without changing the client. gst::init().context("gst init")?; ensure_remote_usb_audio_ready(alsa_dev)?; /*──────────── pipeline description ──────────── * * ALSA (UAC2 gadget) AAC+ADTS AppSink * β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” raw 48β€―kHz β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β” AU/ADTS β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” * β”‚ alsasrc │────────────► voaacenc │────────► appsink β”‚ * β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ */ let desc = build_pipeline_desc(alsa_dev)?; let pipeline: gst::Pipeline = gst::parse::launch(&desc)?.downcast().expect("pipeline"); let sink: gst_app::AppSink = pipeline .by_name("asink") .expect("asink") .downcast() .expect("appsink"); let tap = Arc::new(Mutex::new(ClipTap::new( "🎧 - ear", Duration::from_secs(60), ))); // sink.connect("underrun", false, |_| { // tracing::warn!("⚠️ USB playback underrun – host muted or not reading"); // None // }); let (tx, rx) = tokio::sync::mpsc::channel(8192); let source_health = Arc::new(AudioSourceHealth::new()); let bus = pipeline.bus().expect("bus"); /*──────────── callbacks ────────────*/ sink.set_callbacks( gst_app::AppSinkCallbacks::builder() .new_sample({ let tap = tap.clone(); let source_health = source_health.clone(); let tx = tx.clone(); move |s| { if source_health.is_closed() { return Err(gst::FlowError::Flushing); } let sample = s.pull_sample().map_err(|_| gst::FlowError::Eos)?; let buffer = sample.buffer().ok_or(gst::FlowError::Error)?; let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?; source_health.mark_sample(); // -------- clip‑tap (minute dumps) ------------ tap.lock().unwrap().feed(map.as_slice()); static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if n < 10 || n % 300 == 0 { debug!("🎧 ear #{n}: {}β€―bytes", map.len()); } let pts_us = buffer.pts().unwrap_or(gst::ClockTime::ZERO).nseconds() / 1_000; // push non‑blocking; drop oldest on overflow if tx .try_send(Ok(AudioPacket { id, pts: pts_us, data: map.as_slice().to_vec(), })) .is_err() { static DROPS: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); let d = DROPS.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if d % 300 == 0 { warn!("πŸŽ§πŸ’” dropped {d} audio AUs (client too slow)"); } } Ok(gst::FlowSuccess::Ok) } }) .build(), ); start_pipeline_or_reset(&pipeline, "starting audio pipeline")?; spawn_pipeline_bus_logger(bus, "audio", "🎢 audio pipeline PLAYING"); spawn_audio_source_watchdog( pipeline.clone(), source_health, tx.clone(), alsa_dev.to_string(), ); Ok(AudioStream { _pipeline: pipeline, inner: ReceiverStream::new(rx), }) } /*────────────────────────── build_pipeline_desc ───────────────────────────*/ #[cfg(not(coverage))] fn build_pipeline_desc(dev: &str) -> anyhow::Result { let reg = gst::Registry::get(); // first available encoder let enc = ["fdkaacenc", "voaacenc", "avenc_aac"] .into_iter() .find(|&e| { reg.find_plugin(e).is_some() || reg.find_feature(e, ElementFactory::static_type()).is_some() }) .ok_or_else(|| anyhow!("no AAC encoder plugin available"))?; Ok(format!( concat!( "alsasrc device=\"{dev}\" do-timestamp=true ! ", "audio/x-raw,format=S16LE,channels=2,rate=48000 ! ", "level name=source_level interval=1000000000 message=true ! ", "audioconvert ! audioresample ! {enc} bitrate=192000 ! ", "aacparse ! ", "capsfilter caps=audio/mpeg,stream-format=adts,channels=2,rate=48000 ! ", "tee name=t ", "t. ! queue ! appsink name=asink emit-signals=true ", "t. ! queue ! appsink name=debugtap emit-signals=true max-buffers=500 drop=true" ), dev = dev, enc = enc )) } #[cfg(not(coverage))] fn ensure_remote_usb_audio_ready(alsa_dev: &str) -> anyhow::Result<()> { if !alsa_dev_uses_remote_uac_gadget(alsa_dev) { return Ok(()); } let Some((controller, state)) = current_usb_gadget_state()? else { return Ok(()); }; if state == "not attached" { return Err(anyhow!( "remote USB gadget is not attached (UDC {controller} state={state}); remote speaker audio cannot stream until the controlled PC enumerates Lesavka USB" )); } Ok(()) } #[cfg(not(coverage))] fn alsa_dev_uses_remote_uac_gadget(alsa_dev: &str) -> bool { matches!(alsa_dev, "hw:UAC2Gadget,0" | "hw:UAC2_Gadget,0") || alsa_dev.contains("UAC2Gadget") || alsa_dev.contains("UAC2_Gadget") } #[cfg(not(coverage))] fn current_usb_gadget_state() -> anyhow::Result> { let configfs_root = std::env::var("LESAVKA_GADGET_CONFIGFS_ROOT") .unwrap_or_else(|_| "/sys/kernel/config/usb_gadget".to_string()); let sysfs_root = std::env::var("LESAVKA_GADGET_SYSFS_ROOT").unwrap_or_else(|_| "/sys".into()); let udc = fs::read_to_string(format!("{configfs_root}/lesavka/UDC")) .ok() .map(|value| value.trim().to_string()) .filter(|value| !value.is_empty()); let Some(controller) = udc else { return Ok(None); }; let state = fs::read_to_string(format!("{sysfs_root}/class/udc/{controller}/state")) .with_context(|| format!("reading UDC state for {controller}"))? .trim() .to_string(); Ok(Some((controller, state))) } #[cfg(not(coverage))] struct AudioSourceHealth { started_at: Instant, last_sample_at: Mutex, packets: AtomicU64, closed: AtomicBool, } #[cfg(not(coverage))] impl AudioSourceHealth { fn new() -> Self { let now = Instant::now(); Self { started_at: now, last_sample_at: Mutex::new(now), packets: AtomicU64::new(0), closed: AtomicBool::new(false), } } fn mark_sample(&self) { self.packets.fetch_add(1, Ordering::Relaxed); if let Ok(mut last) = self.last_sample_at.lock() { *last = Instant::now(); } } fn is_closed(&self) -> bool { self.closed.load(Ordering::Relaxed) } fn signal_failure(&self) -> bool { !self.closed.swap(true, Ordering::Relaxed) } fn elapsed(&self) -> Duration { self.started_at.elapsed() } fn idle_for(&self) -> Duration { self.last_sample_at .lock() .map(|last| last.elapsed()) .unwrap_or_else(|_| Duration::from_secs(0)) } fn packets(&self) -> u64 { self.packets.load(Ordering::Relaxed) } } #[cfg(not(coverage))] #[derive(Clone, Copy)] struct AudioWatchdogPolicy { startup_grace: Duration, idle_timeout: Duration, min_packets_per_second: u64, } #[cfg(not(coverage))] impl AudioWatchdogPolicy { fn from_env() -> Self { Self { startup_grace: env_duration_ms("LESAVKA_AUDIO_SOURCE_GRACE_MS", 3_000), idle_timeout: env_duration_ms("LESAVKA_AUDIO_SOURCE_IDLE_MS", 1_500), min_packets_per_second: env_u64("LESAVKA_AUDIO_MIN_PACKETS_PER_SEC", 20), } } } #[cfg(not(coverage))] fn env_duration_ms(name: &str, default_ms: u64) -> Duration { Duration::from_millis(env_u64(name, default_ms)) } #[cfg(not(coverage))] fn env_u64(name: &str, default: u64) -> u64 { std::env::var(name) .ok() .and_then(|value| value.parse::().ok()) .filter(|value| *value > 0) .unwrap_or(default) } /// Watch the remote speaker capture source and fail fast when the USB audio /// gadget is open but not producing real-time packets. #[cfg(not(coverage))] fn spawn_audio_source_watchdog( pipeline: gst::Pipeline, health: Arc, tx: tokio::sync::mpsc::Sender>, alsa_dev: String, ) { let policy = AudioWatchdogPolicy::from_env(); std::thread::spawn(move || { loop { std::thread::sleep(Duration::from_millis(250)); if health.is_closed() { break; } let elapsed = health.elapsed(); if elapsed < policy.startup_grace { continue; } let packets = health.packets(); let idle_for = health.idle_for(); let rate = packets as f64 / elapsed.as_secs_f64().max(0.001); let failure = if packets == 0 { Some(format!( "remote speaker capture produced no audio samples after {} ms on {alsa_dev}", elapsed.as_millis() )) } else if idle_for >= policy.idle_timeout { Some(format!( "remote speaker capture stalled for {} ms on {alsa_dev}", idle_for.as_millis() )) } else if (packets / elapsed.as_secs().max(1)) < policy.min_packets_per_second { Some(format!( "remote speaker capture cadence is too low on {alsa_dev}: {rate:.1} packets/s, expected at least {} packets/s", policy.min_packets_per_second )) } else { None }; if let Some(message) = failure { if health.signal_failure() { warn!("πŸ”ŠπŸ›Ÿ {message}; restarting audio capture on next client reconnect"); let _ = pipeline.set_state(gst::State::Null); let _ = tx.blocking_send(Err(Status::unavailable(message))); } break; } } }); } // ────────────────────── minute‑clip helper ─────────────────────────────── pub struct ClipTap { buf: Vec, tag: &'static str, next_dump: Instant, period: Duration, } impl ClipTap { pub fn new(tag: &'static str, period: Duration) -> Self { Self { buf: Vec::with_capacity(260_000), tag, next_dump: Instant::now() + period, period, } } pub fn feed(&mut self, bytes: &[u8]) { self.buf.extend_from_slice(bytes); if self.buf.len() > 256_000 { self.buf.drain(..self.buf.len() - 256_000); } if Instant::now() >= self.next_dump { self.flush(); self.next_dump += self.period; } } pub fn flush(&mut self) { if self.buf.is_empty() { return; } let ts = chrono::Local::now().format("%Y%m%d-%H%M%S"); let path = format!("/tmp/{}-{}.aac", self.tag, ts); let _ = std::fs::write(&path, &self.buf); self.buf.clear(); } } impl Drop for ClipTap { fn drop(&mut self) { self.flush() } } // ────────────────────── microphone sink ──────────────────────────────── pub struct Voice { appsrc: gst_app::AppSrc, _pipe: gst::Pipeline, // keep pipeline alive tap: ClipTap, } impl Drop for Voice { fn drop(&mut self) { let _ = self._pipe.set_state(gst::State::Null); } } fn voice_input_caps() -> gst::Caps { gst::Caps::builder("audio/mpeg") .field("mpegversion", 4i32) .field("stream-format", "adts") .field("rate", 48_000i32) .field("channels", 2i32) .build() } impl Voice { #[cfg(coverage)] pub async fn new(_alsa_dev: &str) -> anyhow::Result { gst::init().context("gst init")?; let pipeline = gst::Pipeline::new(); let appsrc = gst::ElementFactory::make("appsrc") .build() .context("make appsrc")? .downcast::() .expect("appsrc"); appsrc.set_format(gst::Format::Time); appsrc.set_is_live(true); appsrc.set_caps(Some(&voice_input_caps())); let sink = gst::ElementFactory::make("fakesink") .build() .context("make fakesink")?; pipeline.add_many(&[appsrc.upcast_ref(), &sink])?; gst::Element::link_many(&[appsrc.upcast_ref(), &sink])?; start_pipeline_or_reset(&pipeline, "starting voice pipeline")?; Ok(Self { appsrc, _pipe: pipeline, tap: ClipTap::new("voice", Duration::from_secs(60)), }) } #[cfg(not(coverage))] pub async fn new(alsa_dev: &str) -> anyhow::Result { use gst::prelude::*; gst::init().context("gst init")?; // pipeline let pipeline = gst::Pipeline::new(); // elements let appsrc = gst::ElementFactory::make("appsrc") .build() .context("make appsrc")? .downcast::() .unwrap(); // dedicated AppSrc helpers exist and avoid the needless `?` appsrc.set_caps(Some(&voice_input_caps())); appsrc.set_format(gst::Format::Time); appsrc.set_is_live(true); let decodebin = gst::ElementFactory::make("decodebin") .build() .context("make decodebin")?; let convert = gst::ElementFactory::make("audioconvert") .build() .context("make audioconvert")?; let resample = gst::ElementFactory::make("audioresample") .build() .context("make audioresample")?; let caps = gst::Caps::builder("audio/x-raw") .field("format", "S16LE") .field("channels", 2i32) .field("rate", 48_000i32) .build(); let capsfilter = gst::ElementFactory::make("capsfilter") .property("caps", &caps) .build() .context("make capsfilter")?; let alsa_sink = gst::ElementFactory::make("alsasink") .build() .context("make alsasink")?; alsa_sink.set_property("device", &alsa_dev); alsa_sink.set_property("sync", false); alsa_sink.set_property("async", false); alsa_sink.set_property("enable-last-sample", false); pipeline.add_many(&[ appsrc.upcast_ref(), &decodebin, &convert, &resample, &capsfilter, &alsa_sink, ])?; appsrc.link(&decodebin)?; gst::Element::link_many(&[&convert, &resample, &capsfilter, &alsa_sink])?; /*------------ decodebin autolink ----------------*/ let convert_sink = convert .static_pad("sink") .context("audioconvert sink pad")?; decodebin.connect_pad_added(move |_db, pad| { if convert_sink.is_linked() { return; } let caps = pad.current_caps().unwrap_or_else(|| pad.query_caps(None)); let is_audio = caps .structure(0) .map(|s| s.name().starts_with("audio/")) .unwrap_or(false); if !is_audio { return; } let _ = pad.link(&convert_sink); }); let bus = pipeline.bus().context("voice pipeline bus")?; // underrun β‰  error – just show a warning // let _id = alsa_sink.connect("underrun", false, |_| { // tracing::warn!("⚠️ USB playback underrun – host muted/not reading"); // None // }); start_pipeline_or_reset(&pipeline, "starting voice pipeline")?; spawn_pipeline_bus_logger(bus, "voice", "🎀 voice pipeline ▢️"); Ok(Self { appsrc, _pipe: pipeline, tap: ClipTap::new("voice", Duration::from_secs(60)), }) } pub fn push(&mut self, pkt: &AudioPacket) { self.tap.feed(&pkt.data); let mut buf = gst::Buffer::from_slice(pkt.data.clone()); buf.get_mut() .unwrap() .set_pts(Some(gst::ClockTime::from_useconds(pkt.pts))); let _ = self.appsrc.push_buffer(buf); } pub fn finish(&mut self) { self.tap.flush(); let _ = self.appsrc.end_of_stream(); } } #[cfg(test)] mod voice_caps_tests { use super::voice_input_caps; #[test] fn voice_input_caps_describe_aac_adts_stereo_48k() { let _ = super::gst::init(); let caps = voice_input_caps().to_string(); assert!(caps.contains("audio/mpeg")); assert!(caps.contains("mpegversion=(int)4")); assert!(caps.contains("stream-format=(string)adts")); assert!(caps.contains("rate=(int)48000")); assert!(caps.contains("channels=(int)2")); } } #[cfg(all(test, coverage))] mod tests { use super::Voice; #[tokio::test] async fn coverage_voice_constructor_starts_stub_pipeline() { let mut voice = Voice::new("coverage-audio").await.expect("voice"); voice.finish(); } } #[cfg(all(test, not(coverage)))] mod tests { use super::{build_pipeline_desc, ensure_remote_usb_audio_ready}; use temp_env::with_vars; use tempfile::tempdir; #[test] fn speaker_downlink_pipeline_keeps_aac_adts_transport_and_level_probe() { let _ = super::gst::init(); let result = build_pipeline_desc("hw:Loopback,0"); match result { Ok(desc) => { assert!(desc.contains("alsasrc device=\"hw:Loopback,0\"")); assert!(desc.contains("audio/x-raw,format=S16LE,channels=2,rate=48000")); assert!(desc.contains("aacparse")); assert!(desc.contains("stream-format=adts")); assert!(desc.contains("level name=source_level")); assert!(desc.contains("appsink name=asink")); } Err(err) => { assert!( err.to_string().contains("no AAC encoder plugin available"), "unexpected build failure: {err:#}" ); } } } #[test] fn remote_usb_audio_reports_not_attached_gadget() { let dir = tempdir().expect("tempdir"); let cfg_root = dir.path().join("cfg"); let sys_root = dir.path().join("sys"); let udc_dir = sys_root.join("class/udc/fake-ctrl.usb"); std::fs::create_dir_all(cfg_root.join("lesavka")).expect("cfg"); std::fs::create_dir_all(&udc_dir).expect("udc"); std::fs::write(cfg_root.join("lesavka/UDC"), "fake-ctrl.usb\n").expect("udc file"); std::fs::write(udc_dir.join("state"), "not attached\n").expect("state"); with_vars( [ ( "LESAVKA_GADGET_CONFIGFS_ROOT", Some(cfg_root.to_string_lossy().to_string()), ), ( "LESAVKA_GADGET_SYSFS_ROOT", Some(sys_root.to_string_lossy().to_string()), ), ], || { let err = ensure_remote_usb_audio_ready("hw:UAC2Gadget,0") .expect_err("not attached gadget should block remote speaker audio"); assert!( err.to_string() .contains("remote USB gadget is not attached") ); }, ); } #[test] fn remote_usb_audio_allows_non_gadget_override() { ensure_remote_usb_audio_ready("hw:Loopback,0").expect("non-gadget override"); } }