// client/src/output/audio.rs use gstreamer as gst; use gstreamer_app as gst_app; use lesavka_common::lesavka::AudioPacket; use gst::prelude::*; use tracing::{error, info, warn, debug}; pub struct AudioOut { src: gst_app::AppSrc, } impl AudioOut { pub fn new() -> anyhow::Result { gst::init()?; // Auto‑audiosink picks PipeWire / Pulse etc. const PIPE: &str = "appsrc name=src is-live=true format=time do-timestamp=true block=false ! \ queue leaky=downstream ! aacparse ! avdec_aac ! audioresample ! autoaudiosink"; // `parse_launch()` returns `gst::Element`; down‑cast manually and // map the error into anyhow ourselves (no `?` on the downcast). let pipeline: gst::Pipeline = gst::parse::launch(PIPE)? .downcast::() .expect("not a pipeline"); let src: gst_app::AppSrc = pipeline .by_name("src") .expect("no src element") .downcast::() .expect("src not an AppSrc"); src.set_format(gst::Format::Time); pipeline.set_state(gst::State::Playing)?; Ok(Self { src }) } pub fn push(&self, pkt: AudioPacket) { static CNT : std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if n % 300 == 0 || n < 10 { debug!(bytes = pkt.data.len(), pts = pkt.pts, "⬇️ received audio AU"); } let mut buf = gst::Buffer::from_slice(pkt.data); buf.get_mut() .unwrap() .set_pts(Some(gst::ClockTime::from_useconds(pkt.pts))); let _ = self.src.push_buffer(buf); } }