// client/src/output/audio.rs use anyhow::{Context, Result}; use gstreamer as gst; use gstreamer_app as gst_app; use gst::prelude::*; use tracing::{error, info, warn, debug}; use lesavka_common::lesavka::AudioPacket; pub struct AudioOut { pipeline: gst::Pipeline, src: gst_app::AppSrc, } impl AudioOut { pub fn new() -> anyhow::Result { gst::init().context("initialising GStreamer")?; // ── 1. Decide which sink element to instantiate ──────────────────── let sink = pick_sink_element()?; // Operator can request a tee to /tmp via LESAVKA_TAP_AUDIO=1 let tee_dump = std::env::var("LESAVKA_TAP_AUDIO") .ok() .as_deref() .map(|v| v == "1") .unwrap_or(false); // ── 2. Assemble pipeline description string ──────────────────────── let mut pipe = format!( "appsrc name=src is-live=true format=time do-timestamp=true \ block=false ! \ queue leaky=downstream ! \ aacparse ! avdec_aac ! audioresample ! audioconvert ! {}", sink, ); if tee_dump { pipe = format!( "appsrc name=src is-live=true format=time do-timestamp=true block=false ! \ tee name=t ! \ queue leaky=downstream ! aacparse ! avdec_aac ! audioresample ! audioconvert ! {} \ t. ! queue ! filesink location=/tmp/lesavka-audio.aac", sink, ); warn!("💾 tee to /tmp/lesavka-audio.aac enabled (LESAVKA_TAP_AUDIO=1)"); } // ── 3. Create the pipeline & fetch the AppSrc ─────────────────────── let pipeline: gst::Pipeline = gst::parse::launch(&pipe)? .downcast::() .expect("not a pipeline"); let src: gst_app::AppSrc = pipeline .by_name("src") .expect("no src element") .downcast::() .expect("src not an AppSrc"); src.set_caps(Some(&gst::Caps::builder("audio/mpeg") .field("mpegversion", &4i32) // AAC .field("stream-format", &"adts") // ADTS frames .field("rate", &48_000i32) // 48 kHz .field("channels", &2i32) // stereo .build() )); src.set_format(gst::Format::Time); // ── 4. Log *all* warnings/errors from the bus ────────────────────── let bus = pipeline.bus().unwrap(); std::thread::spawn(move || { use gst::MessageView::*; for msg in bus.iter_timed(gst::ClockTime::NONE) { match msg.view() { Error(e) => error!("💥 gst error from {:?}: {} ({})", msg.src().map(|s| s.path_string()), e.error(), e.debug().unwrap_or_default()), Warning(w) => warn!("⚠️ gst warning from {:?}: {} ({})", msg.src().map(|s| s.path_string()), w.error(), w.debug().unwrap_or_default()), Element(e) => debug!("🔎 gst element message: {}", e .structure() .map(|s| s.to_string()) .unwrap_or_default()), StateChanged(s) if s.current() == gst::State::Playing => info!("🔊 audio pipeline PLAYING (sink='{}')", sink), _ => {} } } }); pipeline.set_state(gst::State::Playing).context("starting audio pipeline")?; Ok(Self { pipeline, src }) } pub fn push(&self, pkt: AudioPacket) { let mut buf = gst::Buffer::from_slice(pkt.data); buf.get_mut() .unwrap() .set_pts(Some(gst::ClockTime::from_useconds(pkt.pts))); if let Err(e) = self.src.push_buffer(buf) { warn!("📉 AppSrc push failed: {e:?}"); } } } impl Drop for AudioOut { fn drop(&mut self) { // put the whole pipeline back to NULL so GStreamer can dispose cleanly let _ = self.pipeline.set_state(gst::State::Null); } } /*──────────────── helper: sink selection ─────────────────────────────*/ fn pick_sink_element() -> Result { // 1. Operator override if let Ok(s) = std::env::var("LESAVKA_AUDIO_SINK") { info!("🎛️ sink overridden via LESAVKA_AUDIO_SINK={}", s); return Ok(s); } // 2. Query PipeWire for default & running sinks // (works even if PulseAudio is present because PipeWire mimics it) let sinks = list_pw_sinks(); // Vec<(name,state)> for (n, st) in &sinks { if *st == "RUNNING" { info!("🔈 using default RUNNING sink '{}'", n); return Ok(format!("pulsesink device={}", n)); } } // 3. First RUNNING sink if let Some((n, _)) = sinks.iter().find(|(_, st)| *st == "RUNNING") { warn!("🪄 picking first RUNNING sink '{}'", n); return Ok(format!("pulsesink device={}", n)); } // 4. Anything if let Some((n, _)) = sinks.first() { warn!("🪄 picking first sink '{}'", n); return Ok(format!("pulsesink device={}", n)); } // Fallback – let autoaudiosink try its luck warn!("😬 no PipeWire sinks readable – falling back to autoaudiosink"); Ok("autoaudiosink".to_string()) } /// Minimal PipeWire sink enumerator (no extra crate required). fn list_pw_sinks() -> Vec<(String, String)> { let mut out = Vec::new(); // if let Ok(lines) = std::process::Command::new("pw-cli") // .args(["ls", "Node"]) // .output() // .map(|o| String::from_utf8_lossy(&o.stdout).to_string()) // { // for l in lines.lines() { // // Example: " 36 │ node.alive = true │ alsa_output.pci-0000_2f_00.4.iec958-stereo │ state: SUSPENDED ..." // if let Some(pos) = l.find("│") { // let parts: Vec<_> = l[pos..].split('│').map(|s| s.trim()).collect(); // if parts.len() >= 3 && parts[2].starts_with("alsa_output.") { // let name = parts[2].to_string(); // // try to parse state, else UNKNOWN // let state = parts.get(3) // .and_then(|s| s.split_whitespace().nth(1)) // .unwrap_or("UNKNOWN") // .to_string(); // out.push((name, state)); // } // } // } // } if out.is_empty() { // ── PulseAudio / pactl fallback ──────────────────────────────── if let Ok(info) = std::process::Command::new("pactl") .args(["info"]) .output() .map(|o| String::from_utf8_lossy(&o.stdout).to_string()) { if let Some(line) = info.lines().find(|l| l.starts_with("Default Sink:")) { let def = line["Default Sink:".len()..].trim(); return vec![(def.to_string(), "UNKNOWN".to_string())]; } } } out }