// client/src/input/microphone.rs #![forbid(unsafe_code)] use anyhow::{Context, Result}; use gstreamer as gst; use gstreamer_app as gst_app; use gst::prelude::*; use lesavka_common::lesavka::AudioPacket; use tracing::{debug, error, info, warn}; pub struct MicrophoneCapture { pipeline: gst::Pipeline, sink: gst_app::AppSink, } impl MicrophoneCapture { pub fn new() -> Result { gst::init().ok(); // idempotent /* pulsesrc (default mic) → AAC/ADTS → appsink -------------------*/ let desc = concat!( "pulsesrc do-timestamp=true ! ", "audio/x-raw,format=S16LE,channels=2,rate=48000 ! ", "audioconvert ! audioresample ! avenc_aac bitrate=128000 ! ", "aacparse ! capsfilter caps=audio/mpeg,stream-format=adts,rate=48000,channels=2 ! ", "appsink name=asink emit-signals=true max-buffers=50 drop=true" ); let pipeline: gst::Pipeline = gst::parse::launch(desc)? .downcast() .expect("pipeline"); let sink: gst_app::AppSink = pipeline .by_name("asink") .unwrap() .downcast() .unwrap(); /* ─── bus for diagnostics ───────────────────────────────────────*/ { let bus = pipeline.bus().unwrap(); std::thread::spawn(move || { use gst::MessageView::*; for msg in bus.iter_timed(gst::ClockTime::NONE) { match msg.view() { StateChanged(s) if s.current() == gst::State::Playing && msg.src().map(|s| s.is::()).unwrap_or(false) => info!("🎤 mic pipeline ▶️ (source=pulsesrc)"), Error(e) => error!("🎤💥 mic: {} ({})", e.error(), e.debug().unwrap_or_default()), Warning(w) => warn!("🎤⚠️ mic: {} ({})", w.error(), w.debug().unwrap_or_default()), _ => {} } } }); } pipeline.set_state(gst::State::Playing) .context("start mic pipeline")?; Ok(Self { pipeline, sink }) } /// Blocking pull; call from an async wrapper pub fn pull(&self) -> Option { match self.sink.pull_sample() { Ok(sample) => { let buf = sample.buffer().unwrap(); let map = buf.map_readable().unwrap(); let pts = buf.pts().unwrap_or(gst::ClockTime::ZERO).nseconds() / 1_000; Some(AudioPacket { id: 0, pts, data: map.as_slice().to_vec() }) } Err(_) => None, } } }