// client/src/input/microphone.rs
#![forbid(unsafe_code)]

use anyhow::{Context, Result};
use gst::prelude::*;
use gstreamer as gst;
use gstreamer_app as gst_app;
use lesavka_common::lesavka::AudioPacket;
use shell_escape::unix::escape;
use std::sync::atomic::{AtomicU64, Ordering};
use tracing::{debug, error, info, trace, warn};

pub struct MicrophoneCapture {
    #[allow(dead_code)] // kept alive to hold PLAYING state
    pipeline: gst::Pipeline,
    sink: gst_app::AppSink,
}

impl MicrophoneCapture {
    pub fn new() -> Result<Self> {
        gst::init().ok(); // idempotent

        /* pulsesrc (default mic) → AAC/ADTS → appsink ------------------- */
        // Optional override: LESAVKA_MIC_SOURCE=<substring of source name>
        // If not provided or not found, fall back to the first non-monitor source.
        let device_arg = match std::env::var("LESAVKA_MIC_SOURCE") {
            Ok(s) if !s.is_empty() => match Self::pulse_source_by_substr(&s) {
                Some(full) => format!("device={}", escape(full.into())),
                None => {
                    warn!("🎤 requested mic '{s}' not found; using default");
                    Self::default_source_arg()
                }
            },
            _ => Self::default_source_arg(),
        };
        debug!("🎤 device: {device_arg}");

        // Pick the first AAC encoder that is actually installed; fall back to Opus.
        let aac = ["avenc_aac", "fdkaacenc", "faac", "opusenc"]
            .into_iter()
            .find(|e| gst::ElementFactory::find(e).is_some())
            .unwrap_or("opusenc");

        let parser = if aac.contains("opus") {
            // opusenc already outputs raw Opus frames – just state the caps
            "capsfilter caps=audio/x-opus,rate=48000,channels=2"
        } else {
            // AAC → ADTS frames
            "aacparse ! capsfilter caps=audio/mpeg,stream-format=adts,rate=48000,channels=2"
        };

        let desc = format!(
            "pulsesrc {device_arg} do-timestamp=true ! \
             audio/x-raw,format=S16LE,channels=2,rate=48000 ! \
             audioconvert ! audioresample ! {aac} bitrate=128000 ! \
             {parser} ! \
             queue max-size-buffers=100 leaky=downstream ! \
             appsink name=asink emit-signals=true max-buffers=50 drop=true"
        );

        let pipeline: gst::Pipeline = gst::parse::launch(&desc)?.downcast().expect("pipeline");
        let sink: gst_app::AppSink = pipeline.by_name("asink").unwrap().downcast().unwrap();

        /* ─── bus for diagnostics ─────────────────────────────────────── */
        {
            let bus = pipeline.bus().unwrap();
            std::thread::spawn(move || {
                use gst::MessageView::*;
                for msg in bus.iter_timed(gst::ClockTime::NONE) {
                    match msg.view() {
                        StateChanged(s)
                            if s.current() == gst::State::Playing
                                && msg
                                    .src()
                                    .map(|s| s.is::<gst::Pipeline>())
                                    .unwrap_or(false) =>
                        {
                            info!("🎤 mic pipeline ▶️ (source=pulsesrc)")
                        }
                        Error(e) => error!(
                            "🎤💥 mic: {} ({})",
                            e.error(),
                            e.debug().unwrap_or_default()
                        ),
                        Warning(w) => warn!(
                            "🎤⚠️ mic: {} ({})",
                            w.error(),
                            w.debug().unwrap_or_default()
                        ),
                        _ => {}
                    }
                }
            });
        }

        pipeline
            .set_state(gst::State::Playing)
            .context("start mic pipeline")?;

        Ok(Self { pipeline, sink })
    }

    /// Blocking pull; call from an async wrapper.
    pub fn pull(&self) -> Option<AudioPacket> {
        match self.sink.pull_sample() {
            Ok(sample) => {
                let buf = sample.buffer().unwrap();
                let map = buf.map_readable().unwrap();
                // PTS in microseconds (GStreamer reports nanoseconds).
                let pts = buf.pts().unwrap_or(gst::ClockTime::ZERO).nseconds() / 1_000;

                static CNT: AtomicU64 = AtomicU64::new(0);
                let n = CNT.fetch_add(1, Ordering::Relaxed);
                if n < 10 || n % 300 == 0 {
                    trace!("🎤⇧ cli pkt#{n} {} bytes", map.len());
                }

                Some(AudioPacket {
                    id: 0,
                    pts,
                    data: map.as_slice().to_vec(),
                })
            }
            Err(_) => None,
        }
    }

    /// Resolve a Pulse source whose name contains `fragment`, if any.
    fn pulse_source_by_substr(fragment: &str) -> Option<String> {
        use std::process::Command;
        let out = Command::new("pactl")
            .args(["list", "short", "sources"])
            .output()
            .ok()?;
        let list = String::from_utf8_lossy(&out.stdout);
        list.lines().find_map(|ln| {
            let mut cols = ln.split_whitespace();
            let _id = cols.next()?;
            let name = cols.next()?; // second column: source name
            if name.contains(fragment) {
                Some(name.to_owned())
            } else {
                None
            }
        })
    }

    /// Pick the first non-monitor Pulse source if available; otherwise empty.
    fn default_source_arg() -> String {
        use std::process::Command;
        let out = Command::new("pactl")
            .args(["list", "short", "sources"])
            .output();
        if let Ok(out) = out {
            let list = String::from_utf8_lossy(&out.stdout);
            if let Some(name) = list
                .lines()
                .filter_map(|ln| ln.split_whitespace().nth(1))
                .find(|name| !name.ends_with(".monitor"))
            {
                return format!("device={}", escape(name.into()));
            }
        }
        String::new()
    }
}
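
// ---------------------------------------------------------------------------
// Usage sketch (illustrative only, not part of this module). It assumes the
// client drives capture from a tokio runtime and forwards packets over an
// mpsc channel; `forward_mic`, the channel, and the tokio dependency are
// assumptions made for the example, not something this file defines.
//
// use std::sync::Arc;
// use tokio::sync::mpsc;
//
// async fn forward_mic(tx: mpsc::Sender<AudioPacket>) -> Result<()> {
//     let mic = Arc::new(MicrophoneCapture::new()?);
//     loop {
//         let mic = Arc::clone(&mic);
//         // `pull()` blocks on the appsink, so hop onto the blocking pool.
//         match tokio::task::spawn_blocking(move || mic.pull()).await? {
//             Some(pkt) => {
//                 if tx.send(pkt).await.is_err() {
//                     break; // receiver dropped; stop capturing
//                 }
//             }
//             None => break, // pull_sample() failed: EOS or pipeline error
//         }
//     }
//     Ok(())
// }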