pub struct ClipTap { buf: Vec, tag: &'static str, next_dump: Instant, period: Duration, } impl ClipTap { pub fn new(tag: &'static str, period: Duration) -> Self { Self { buf: Vec::with_capacity(260_000), tag, next_dump: Instant::now() + period, period, } } pub fn feed(&mut self, bytes: &[u8]) { self.buf.extend_from_slice(bytes); if self.buf.len() > 256_000 { self.buf.drain(..self.buf.len() - 256_000); } if Instant::now() >= self.next_dump { self.flush(); self.next_dump += self.period; } } pub fn flush(&mut self) { if self.buf.is_empty() { return; } let ts = chrono::Local::now().format("%Y%m%d-%H%M%S"); let path = format!("/tmp/{}-{}.aac", self.tag, ts); let _ = std::fs::write(&path, &self.buf); self.buf.clear(); } } impl Drop for ClipTap { fn drop(&mut self) { self.flush() } } // ────────────────────── microphone sink ──────────────────────────────── pub struct Voice { appsrc: gst_app::AppSrc, _pipe: gst::Pipeline, // keep pipeline alive clock_aligned: bool, tap: ClipTap, } impl Drop for Voice { fn drop(&mut self) { let _ = self._pipe.set_state(gst::State::Null); } } fn voice_input_caps() -> gst::Caps { gst::Caps::builder("audio/mpeg") .field("mpegversion", 4i32) .field("stream-format", "adts") .field("rate", 48_000i32) .field("channels", 2i32) .build() } fn voice_sink_buffer_time_us() -> i64 { positive_voice_sink_timing_env("LESAVKA_UAC_BUFFER_TIME_US", 20_000) } fn voice_sink_latency_time_us() -> i64 { positive_voice_sink_timing_env("LESAVKA_UAC_LATENCY_TIME_US", 5_000) } fn voice_sink_compensation_us() -> i64 { std::env::var("LESAVKA_UAC_COMPENSATION_US") .ok() .and_then(|value| value.trim().parse::().ok()) .filter(|value| *value >= 0) .unwrap_or_else(default_voice_sink_compensation_us) } fn default_voice_sink_compensation_us() -> i64 { let cfg = crate::camera::current_camera_config(); if cfg.output == crate::camera::CameraOutput::Hdmi { // HDMI now prefers MJPEG on hosts without hardware H.264 decode, which // removed the old video-side lag that justified the large audio pad. non_negative_voice_sink_timing_env("LESAVKA_UAC_HDMI_COMPENSATION_US", 0) } else { 0 } } fn positive_voice_sink_timing_env(name: &str, default: i64) -> i64 { std::env::var(name) .ok() .and_then(|value| value.trim().parse::().ok()) .filter(|value| *value > 0) .unwrap_or(default) } fn non_negative_voice_sink_timing_env(name: &str, default: i64) -> i64 { std::env::var(name) .ok() .and_then(|value| value.trim().parse::().ok()) .filter(|value| *value >= 0) .unwrap_or(default) } impl Voice { #[cfg(coverage)] pub async fn new(_alsa_dev: &str) -> anyhow::Result { gst::init().context("gst init")?; let pipeline = gst::Pipeline::new(); let appsrc = gst::ElementFactory::make("appsrc") .build() .context("make appsrc")? .downcast::() .expect("appsrc"); appsrc.set_format(gst::Format::Time); appsrc.set_is_live(true); appsrc.set_caps(Some(&voice_input_caps())); let sink = gst::ElementFactory::make("fakesink") .build() .context("make fakesink")?; pipeline.add_many(&[appsrc.upcast_ref(), &sink])?; gst::Element::link_many(&[appsrc.upcast_ref(), &sink])?; start_pipeline_or_reset(&pipeline, "starting voice pipeline")?; Ok(Self { appsrc, _pipe: pipeline, clock_aligned: false, tap: ClipTap::new("voice", Duration::from_secs(60)), }) } #[cfg(not(coverage))] pub async fn new(alsa_dev: &str) -> anyhow::Result { use gst::prelude::*; gst::init().context("gst init")?; // pipeline let pipeline = gst::Pipeline::new(); // elements let appsrc = gst::ElementFactory::make("appsrc") .build() .context("make appsrc")? .downcast::() .unwrap(); // dedicated AppSrc helpers exist and avoid the needless `?` appsrc.set_caps(Some(&voice_input_caps())); appsrc.set_format(gst::Format::Time); appsrc.set_is_live(true); let decodebin = gst::ElementFactory::make("decodebin") .build() .context("make decodebin")?; let convert = gst::ElementFactory::make("audioconvert") .build() .context("make audioconvert")?; let resample = gst::ElementFactory::make("audioresample") .build() .context("make audioresample")?; let caps = gst::Caps::builder("audio/x-raw") .field("format", "S16LE") .field("channels", 2i32) .field("rate", 48_000i32) .build(); let capsfilter = gst::ElementFactory::make("capsfilter") .property("caps", &caps) .build() .context("make capsfilter")?; let level = gst::ElementFactory::make("level") .name("voice_level_pre_queue") .property("interval", 1_000_000_000u64) .property("message", true) .build() .context("make voice pre-queue level probe")?; let alsa_sink = gst::ElementFactory::make("alsasink") .build() .context("make alsasink")?; let delay_queue = gst::ElementFactory::make("queue") .build() .context("make voice delay queue")?; let post_level = gst::ElementFactory::make("level") .name("voice_level_post_queue") .property("interval", 1_000_000_000u64) .property("message", true) .build() .context("make voice post-queue level probe")?; let buffer_time_us = voice_sink_buffer_time_us(); let latency_time_us = voice_sink_latency_time_us(); let compensation_us = voice_sink_compensation_us(); alsa_sink.set_property("device", alsa_dev); alsa_sink.set_property("sync", true); alsa_sink.set_property("async", true); alsa_sink.set_property("enable-last-sample", false); alsa_sink.set_property("provide-clock", false); alsa_sink.set_property("buffer-time", buffer_time_us); alsa_sink.set_property("latency-time", latency_time_us); let compensation_ns = (compensation_us.max(0) as u64).saturating_mul(1_000); delay_queue.set_property("max-size-buffers", 0u32); delay_queue.set_property("max-size-bytes", 0u32); delay_queue.set_property("max-size-time", compensation_ns); delay_queue.set_property("min-threshold-time", compensation_ns); tracing::info!( %alsa_dev, buffer_time_us, latency_time_us, compensation_us, "🎤 UAC sink low-latency timing armed" ); crate::media_timing::prepare_pipeline_clock_sync(&pipeline); pipeline.add_many([ appsrc.upcast_ref(), &decodebin, &convert, &resample, &capsfilter, &level, &delay_queue, &post_level, &alsa_sink, ])?; appsrc.link(&decodebin)?; gst::Element::link_many([ &convert, &resample, &capsfilter, &level, &delay_queue, &post_level, &alsa_sink, ])?; /*------------ decodebin autolink ----------------*/ let convert_sink = convert .static_pad("sink") .context("audioconvert sink pad")?; decodebin.connect_pad_added(move |_db, pad| { if convert_sink.is_linked() { return; } let caps = pad.current_caps().unwrap_or_else(|| pad.query_caps(None)); let is_audio = caps .structure(0) .map(|s| s.name().starts_with("audio/")) .unwrap_or(false); if !is_audio { return; } let _ = pad.link(&convert_sink); }); let bus = pipeline.bus().context("voice pipeline bus")?; // underrun ≠ error – just show a warning // let _id = alsa_sink.connect("underrun", false, |_| { // tracing::warn!("⚠️ USB playback underrun – host muted/not reading"); // None // }); start_pipeline_or_reset(&pipeline, "starting voice pipeline")?; spawn_pipeline_bus_logger(bus, "voice", "🎤 voice pipeline ▶️"); Ok(Self { appsrc, _pipe: pipeline, clock_aligned: false, tap: ClipTap::new("voice", Duration::from_secs(60)), }) } pub fn push(&mut self, pkt: &AudioPacket) { self.tap.feed(&pkt.data); if !self.clock_aligned { crate::media_timing::align_pipeline_to_session_clock(&self._pipe, pkt.pts); self.clock_aligned = true; } let mut buf = gst::Buffer::from_slice(pkt.data.clone()); if let Some(meta) = buf.get_mut() { let ts = gst::ClockTime::from_useconds(pkt.pts); meta.set_pts(Some(ts)); meta.set_dts(Some(ts)); } let _ = self.appsrc.push_buffer(buf); } pub fn finish(&mut self) { self.tap.flush(); let _ = self.appsrc.end_of_stream(); } } #[cfg(test)] mod voice_sink_timing_tests { use crate::camera::update_camera_config; use super::{voice_sink_buffer_time_us, voice_sink_latency_time_us}; use super::{default_voice_sink_compensation_us, voice_sink_compensation_us}; #[test] fn voice_sink_timing_defaults_stay_live_call_friendly() { temp_env::with_var_unset("LESAVKA_UAC_BUFFER_TIME_US", || { temp_env::with_var_unset("LESAVKA_UAC_LATENCY_TIME_US", || { temp_env::with_var_unset("LESAVKA_UAC_COMPENSATION_US", || { temp_env::with_var_unset("LESAVKA_UAC_HDMI_COMPENSATION_US", || { temp_env::with_var("LESAVKA_CAM_OUTPUT", Some("uvc"), || { update_camera_config(); assert_eq!(voice_sink_buffer_time_us(), 20_000); assert_eq!(voice_sink_latency_time_us(), 5_000); assert_eq!(voice_sink_compensation_us(), 0); }); }); }); }); }); } #[test] fn voice_sink_timing_env_accepts_positive_overrides_only() { temp_env::with_var("LESAVKA_CAM_OUTPUT", Some("uvc"), || { update_camera_config(); temp_env::with_var("LESAVKA_UAC_BUFFER_TIME_US", Some("42000"), || { temp_env::with_var("LESAVKA_UAC_LATENCY_TIME_US", Some("7000"), || { assert_eq!(voice_sink_buffer_time_us(), 42_000); assert_eq!(voice_sink_latency_time_us(), 7_000); assert_eq!(voice_sink_compensation_us(), 0); }); }); }); temp_env::with_var("LESAVKA_CAM_OUTPUT", Some("uvc"), || { update_camera_config(); temp_env::with_var("LESAVKA_UAC_BUFFER_TIME_US", Some("0"), || { temp_env::with_var("LESAVKA_UAC_LATENCY_TIME_US", Some("-5"), || { temp_env::with_var("LESAVKA_UAC_COMPENSATION_US", Some("166667"), || { assert_eq!(voice_sink_buffer_time_us(), 20_000); assert_eq!(voice_sink_latency_time_us(), 5_000); assert_eq!(voice_sink_compensation_us(), 166_667); }); }); }); }); temp_env::with_var("LESAVKA_CAM_OUTPUT", Some("uvc"), || { update_camera_config(); temp_env::with_var("LESAVKA_UAC_COMPENSATION_US", Some("-5"), || { temp_env::with_var("LESAVKA_UAC_BUFFER_TIME_US", Some("0"), || { temp_env::with_var("LESAVKA_UAC_LATENCY_TIME_US", Some("-5"), || { assert_eq!(voice_sink_buffer_time_us(), 20_000); assert_eq!(voice_sink_latency_time_us(), 5_000); assert_eq!(voice_sink_compensation_us(), 0); }); }); }); }); } #[test] fn hdmi_sink_compensation_defaults_to_hdmi_specific_delay() { temp_env::with_var_unset("LESAVKA_UAC_COMPENSATION_US", || { temp_env::with_var_unset("LESAVKA_UAC_HDMI_COMPENSATION_US", || { temp_env::with_var("LESAVKA_CAM_OUTPUT", Some("hdmi"), || { update_camera_config(); assert_eq!(default_voice_sink_compensation_us(), 0); assert_eq!(voice_sink_compensation_us(), 0); }); }); }); } #[test] fn explicit_compensation_override_wins_over_hdmi_default() { temp_env::with_var("LESAVKA_CAM_OUTPUT", Some("hdmi"), || { update_camera_config(); temp_env::with_var("LESAVKA_UAC_HDMI_COMPENSATION_US", Some("120000"), || { temp_env::with_var("LESAVKA_UAC_COMPENSATION_US", Some("90000"), || { assert_eq!(default_voice_sink_compensation_us(), 120_000); assert_eq!(voice_sink_compensation_us(), 90_000); }); }); }); } }