// client/src/output/audio.rs use anyhow::{Context, Result}; use gst::MessageView::*; use gst::prelude::*; use gstreamer as gst; use gstreamer_app as gst_app; use std::{ fs as std_fs, path::{Path as StdPath, PathBuf}, sync::Mutex, thread, time::Duration, }; use tracing::{debug, error, info, warn}; use lesavka_common::lesavka::AudioPacket; const AUDIO_GAIN_ENV: &str = "LESAVKA_AUDIO_GAIN"; const AUDIO_GAIN_CONTROL_ENV: &str = "LESAVKA_AUDIO_GAIN_CONTROL"; const DEFAULT_AUDIO_GAIN: f64 = 2.0; const MAX_AUDIO_GAIN: f64 = 8.0; pub struct AudioOut { pipeline: gst::Pipeline, src: gst_app::AppSrc, timeline: Mutex, } #[derive(Default)] struct AudioTimeline { last_remote_pts_us: Option, packets: u64, } impl AudioOut { pub fn new() -> anyhow::Result { Self::new_with_sink_and_env(None, true) } pub fn new_with_sink(sink_override: Option<&str>) -> anyhow::Result { Self::new_with_sink_and_env(sink_override, true) } pub fn new_default_sink() -> anyhow::Result { Self::new_with_sink_and_env(None, false) } fn new_with_sink_and_env( sink_override: Option<&str>, allow_env_sink: bool, ) -> anyhow::Result { gst::init().context("initialising GStreamer")?; let sink = pick_sink_element(sink_override, allow_env_sink)?; let tee_dump = std::env::var("LESAVKA_TAP_AUDIO") .ok() .as_deref() .is_some_and(|v| v == "1"); let gain = audio_gain_from_env(); let pipe = audio_output_pipeline_desc(&sink, gain, tee_dump); if tee_dump { warn!("💾 tee to /tmp/lesavka-audio.aac enabled (LESAVKA_TAP_AUDIO=1)"); } let pipeline: gst::Pipeline = gst::parse::launch(&pipe)? .downcast::() .expect("not a pipeline"); let src: gst_app::AppSrc = pipeline .by_name("src") .expect("no src element") .downcast::() .expect("src not an AppSrc"); let volume = pipeline .by_name("remote_audio_gain") .expect("remote_audio_gain"); src.set_caps(Some( &gst::Caps::builder("audio/mpeg") .field("mpegversion", 4i32) // AAC .field("stream-format", "adts") // ADTS frames .field("rate", 48_000i32) // 48 kHz .field("channels", 2i32) // stereo .build(), )); src.set_format(gst::Format::Time); maybe_spawn_audio_gain_control(volume); #[cfg(not(coverage))] { let bus = pipeline.bus().unwrap(); std::thread::spawn(move || { for msg in bus.iter_timed(gst::ClockTime::NONE) { match msg.view() { Error(e) => error!( "💥 gst error from {:?}: {} ({})", msg.src().map(gst::prelude::GstObjectExt::path_string), e.error(), e.debug().unwrap_or_default() ), Warning(w) => warn!( "⚠️ gst warning from {:?}: {} ({})", msg.src().map(gst::prelude::GstObjectExt::path_string), w.error(), w.debug().unwrap_or_default() ), Element(e) => { if let Some(structure) = e.structure() { if structure.name() == "level" { info!("🔊 decoded audio level {}", structure); } else { debug!("🔎 gst element message: {}", structure); } } } StateChanged(s) if s.current() == gst::State::Playing => { if msg.src().is_some_and(|s| s.is::()) { info!("🔊 audio pipeline ▶️ (sink='{sink}' gain={gain:.2}x)"); } else { debug!( "🔊 element {} now ▶️", msg.src() .map(gst::prelude::GstObjectExt::name) .unwrap_or_default() ); } } _ => {} } } }); } pipeline .set_state(gst::State::Playing) .context("starting audio pipeline")?; Ok(Self { pipeline, src, timeline: Mutex::new(AudioTimeline::default()), }) } pub fn push(&self, pkt: AudioPacket) { let buf = live_audio_buffer(pkt, &self.timeline); #[cfg(not(coverage))] if let Err(e) = self.src.push_buffer(buf) { warn!("📉 AppSrc push failed: {e:?}"); } #[cfg(coverage)] { let _ = self.src.push_buffer(buf); } } } fn audio_output_pipeline_desc(sink: &str, gain: f64, tee_dump: bool) -> String { let gain = format_audio_gain_for_gst(gain); if tee_dump { return format!( "appsrc name=src is-live=true format=time do-timestamp=true block=false ! \ tee name=t ! \ queue max-size-time=500000000 max-size-bytes=0 max-size-buffers=0 ! \ aacparse ! avdec_aac ! audioconvert ! audioresample ! \ audio/x-raw,format=S16LE,channels=2,rate=48000 ! \ volume name=remote_audio_gain volume={gain} ! \ level name=remote_audio_level interval=1000000000 message=true ! \ queue max-size-time=400000000 max-size-bytes=0 max-size-buffers=0 ! {sink} \ t. ! queue ! filesink location=/tmp/lesavka-audio.aac" ); } format!( "appsrc name=src is-live=true format=time do-timestamp=true \ block=false ! \ queue max-size-time=500000000 max-size-bytes=0 max-size-buffers=0 ! \ aacparse ! avdec_aac ! \ audioconvert ! audioresample ! \ audio/x-raw,format=S16LE,channels=2,rate=48000 ! \ volume name=remote_audio_gain volume={gain} ! \ level name=remote_audio_level interval=1000000000 message=true ! \ queue max-size-time=400000000 max-size-bytes=0 max-size-buffers=0 ! {sink}" ) } fn audio_gain_from_env() -> f64 { std::env::var(AUDIO_GAIN_ENV) .ok() .and_then(|raw| parse_audio_gain(&raw)) .unwrap_or(DEFAULT_AUDIO_GAIN) } fn parse_audio_gain(raw: &str) -> Option { let value = raw.split_ascii_whitespace().next()?.parse::().ok()?; value.is_finite().then_some(clamp_audio_gain(value)) } fn clamp_audio_gain(value: f64) -> f64 { value.clamp(0.0, MAX_AUDIO_GAIN) } fn format_audio_gain_for_gst(gain: f64) -> String { format!("{:.3}", clamp_audio_gain(gain)) } #[cfg(not(coverage))] fn maybe_spawn_audio_gain_control(volume: gst::Element) { let Ok(path) = std::env::var(AUDIO_GAIN_CONTROL_ENV) else { return; }; let path = PathBuf::from(path); thread::spawn(move || { let mut last_gain = None; loop { apply_audio_gain_control_sample(&path, &volume, &mut last_gain); thread::sleep(Duration::from_millis(250)); } }); } #[cfg(coverage)] fn maybe_spawn_audio_gain_control(volume: gst::Element) { let Ok(path) = std::env::var(AUDIO_GAIN_CONTROL_ENV) else { return; }; let path = PathBuf::from(path); let mut last_gain = None; apply_audio_gain_control_sample(&path, &volume, &mut last_gain); } fn apply_audio_gain_control_sample( path: &StdPath, volume: &gst::Element, last_gain: &mut Option, ) -> Option { let gain = read_audio_gain_control(path)?; if *last_gain == Some(gain) { return None; } volume.set_property("volume", gain); *last_gain = Some(gain); info!("🔊 remote audio gain set to {gain:.2}x"); Some(gain) } fn read_audio_gain_control(path: &StdPath) -> Option { std_fs::read_to_string(path) .ok() .and_then(|raw| parse_audio_gain(&raw)) } fn live_audio_buffer(pkt: AudioPacket, timeline: &Mutex) -> gst::Buffer { let buf = gst::Buffer::from_slice(pkt.data); if let Ok(mut timeline) = timeline.lock() { let remote_gap_us = timeline .last_remote_pts_us .map(|last| pkt.pts.saturating_sub(last)); timeline.last_remote_pts_us = Some(pkt.pts); timeline.packets = timeline.packets.saturating_add(1); if timeline.packets <= 8 || timeline.packets % 600 == 0 { #[cfg(not(coverage))] debug!( packet = timeline.packets, remote_pts_us = pkt.pts, remote_gap_us, bytes = buf.size(), "🔊 audio packet queued for live appsrc timestamping" ); } } buf } impl Drop for AudioOut { fn drop(&mut self) { let _ = self.pipeline.set_state(gst::State::Null); } } #[cfg(not(coverage))] fn pick_sink_element(sink_override: Option<&str>, allow_env_sink: bool) -> Result { if let Some(s) = sink_override.filter(|value| !value.trim().is_empty()) { let sink = normalize_sink_override(s); info!("💪 sink overridden via live media control={s} -> {sink}"); return Ok(sink); } if allow_env_sink && let Ok(s) = std::env::var("LESAVKA_AUDIO_SINK") { let sink = normalize_sink_override(&s); info!( "💪 sink overridden via LESAVKA_AUDIO_SINK={} -> {}", s, sink ); return Ok(sink); } let sinks = list_pw_sinks(); if let Some((n, st)) = sinks.first() { info!("🔈 using PipeWire sink '{}' ({st})", n); return Ok(pulsesink_device_element(n)); } warn!("🫣 no PipeWire sinks readable - falling back to autoaudiosink"); Ok("autoaudiosink".to_string()) } #[cfg(coverage)] fn pick_sink_element(sink_override: Option<&str>, allow_env_sink: bool) -> Result { if let Some(s) = sink_override.filter(|value| !value.trim().is_empty()) { return Ok(normalize_sink_override(s)); } if allow_env_sink && let Ok(s) = std::env::var("LESAVKA_AUDIO_SINK") { return Ok(normalize_sink_override(&s)); } if let Some((n, _)) = list_pw_sinks().first() { return Ok(pulsesink_device_element(n)); } Ok("autoaudiosink".to_string()) } /// Interpret `LESAVKA_AUDIO_SINK` as either a full sink element or bare device. fn normalize_sink_override(raw: &str) -> String { let trimmed = raw.trim(); if trimmed.contains([' ', '=', '!']) || trimmed.ends_with("sink") { return trimmed.to_string(); } pulsesink_device_element(trimmed) } fn pulsesink_device_element(device: &str) -> String { let escaped = device.replace('\\', "\\\\").replace('"', "\\\""); let (buffer_time, latency_time) = if device.starts_with("bluez_output.") { (750_000, 250_000) } else { (350_000, 100_000) }; format!( "pulsesink device=\"{escaped}\" buffer-time={buffer_time} latency-time={latency_time} sync=true" ) } fn list_pw_sinks() -> Vec<(String, String)> { let default_sink = std::process::Command::new("pactl") .args(["info"]) .output() .ok() .filter(|output| output.status.success()) .and_then(|output| parse_pactl_default_sink(&String::from_utf8_lossy(&output.stdout))); if let Ok(output) = std::process::Command::new("pactl") .args(["list", "short", "sinks"]) .output() && output.status.success() { return parse_pactl_short_sinks( &String::from_utf8_lossy(&output.stdout), default_sink.as_deref(), ); } default_sink .map(|sink| vec![(sink, "DEFAULT".to_string())]) .unwrap_or_default() } fn parse_pactl_default_sink(stdout: &str) -> Option { stdout .lines() .find_map(|line| line.strip_prefix("Default Sink:")) .map(str::trim) .filter(|sink| !sink.is_empty()) .map(str::to_string) } fn parse_pactl_short_sinks(stdout: &str, default_sink: Option<&str>) -> Vec<(String, String)> { let mut sinks = Vec::new(); for line in stdout.lines() { let columns: Vec<_> = line.split_whitespace().collect(); if columns.len() < 2 { continue; } let name = columns[1].to_string(); let state = columns .last() .copied() .unwrap_or("UNKNOWN") .to_ascii_uppercase(); sinks.push((name, state)); } sinks.sort_by_key(|(name, state)| { ( sink_state_rank(state), if Some(name.as_str()) == default_sink { 0 } else { 1 }, name.clone(), ) }); sinks.dedup_by(|left, right| left.0 == right.0); if let Some(default_sink) = default_sink && sinks.iter().all(|(name, _)| name != default_sink) { sinks.insert(0, (default_sink.to_string(), "DEFAULT".to_string())); } sinks } fn sink_state_rank(state: &str) -> u8 { match state { "RUNNING" => 0, "IDLE" => 1, "SUSPENDED" => 2, _ => 3, } }