// client/src/output/audio.rs use gstreamer as gst; use gstreamer_app as gst_app; use lesavka_common::lesavka::AudioPacket; use gst::prelude::*; use tracing::{error, info, warn, debug}; pub struct AudioOut { src: gst_app::AppSrc, } impl AudioOut { pub fn new() -> anyhow::Result { gst::init()?; // Auto-audiosink picks PipeWire / Pulse etc. const PIPE: &str = "appsrc name=src is-live=true format=time do-timestamp=true block=false ! \ queue leaky=downstream ! aacparse ! avdec_aac ! audioresample ! autoaudiosink"; // `parse_launch()` returns `gst::Element`; down-cast manually and // map the error into anyhow ourselves (no `?` on the downcast). let pipeline: gst::Pipeline = gst::parse::launch(PIPE)? .downcast::() .expect("not a pipeline"); let src: gst_app::AppSrc = pipeline .by_name("src") .expect("no src element") .downcast::() .expect("src not an AppSrc"); src.set_caps(Some(&gst::Caps::builder("audio/mpeg") // mpeg‑4 AAC .field("mpegversion", &4i32) .field("stream-format", &"adts") .build())); src.set_format(gst::Format::Time); { let bus = pipeline.bus().expect("bus"); std::thread::spawn(move || { use gst::MessageView::*; for msg in bus.iter_timed(gst::ClockTime::NONE) { if let Error(e) = msg.view() { error!("💥 client‑audio: {} ({})", e.error(), e.debug().unwrap_or_default()); } } }); } pipeline.set_state(gst::State::Playing)?; Ok(Self { src }) } pub fn push(&self, pkt: AudioPacket) { static CNT : std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if n % 300 == 0 || n < 10 { debug!(bytes = pkt.data.len(), pts = pkt.pts, "⬇️ received audio AU"); } let mut buf = gst::Buffer::from_slice(pkt.data); buf.get_mut() .unwrap() .set_pts(Some(gst::ClockTime::from_useconds(pkt.pts))); let _ = self.src.push_buffer(buf); } }