// Server-side Opus decode for upstream microphone packets. // // Why: UAC output remains a raw PCM gadget sink; compressed transport packets // must be decoded immediately at ingress so existing playout timing and // calibration continue to operate on the same PCM contract. use anyhow::{Context, Result, bail}; use gst::prelude::*; use gstreamer as gst; use gstreamer_app as gst_app; use lesavka_common::{ audio_transport, lesavka::AudioPacket, }; use std::collections::VecDeque; use std::time::Duration; const OPUS_DECODE_PULL_TIMEOUT: Duration = Duration::from_millis(30); const MAX_PENDING_OPUS_METADATA: usize = 16; pub(super) struct OpusPacketDecoder { _pipeline: gst::Pipeline, appsrc: gst_app::AppSrc, appsink: gst_app::AppSink, pending_packets: VecDeque, } impl OpusPacketDecoder { pub(super) fn new() -> Result { gst::init().context("gst init")?; if gst::ElementFactory::find("opusdec").is_none() { bail!("GStreamer opusdec plugin is not available"); } let desc = "\ appsrc name=src is-live=true block=false format=time \ caps=audio/x-opus,channel-mapping-family=0 ! \ opusdec plc=false use-inband-fec=false min-latency=0 tolerance=0 ! \ audioconvert ! audioresample ! \ audio/x-raw,format=S16LE,layout=interleaved,channels=2,rate=48000 ! \ appsink name=sink emit-signals=false sync=false max-buffers=8 drop=true"; let pipeline: gst::Pipeline = gst::parse::launch(desc)?.downcast().expect("pipeline"); let appsrc = pipeline .by_name("src") .context("missing opus decoder appsrc")? .downcast::() .expect("opus decoder appsrc"); let appsink = pipeline .by_name("sink") .context("missing opus decoder appsink")? .downcast::() .expect("opus decoder appsink"); pipeline .set_state(gst::State::Playing) .context("start opus decoder pipeline")?; Ok(Self { _pipeline: pipeline, appsrc, appsink, pending_packets: VecDeque::new(), }) } pub(super) fn decode_packet(&mut self, packet: &AudioPacket) -> Result> { push_pending_packet(&mut self.pending_packets, packet_metadata(packet)); let mut buffer = gst::Buffer::from_slice(packet.data.clone()); if let Some(meta) = buffer.get_mut() { let pts = gst::ClockTime::from_useconds(packet.pts); meta.set_pts(Some(pts)); meta.set_dts(Some(pts)); meta.set_duration(Some(gst::ClockTime::from_useconds(u64::from( packet.frame_duration_us.max(1), )))); } self.appsrc .push_buffer(buffer) .context("push opus into decoder")?; let Some(sample) = self.appsink .try_pull_sample(gst::ClockTime::from_nseconds( OPUS_DECODE_PULL_TIMEOUT .as_nanos() .min(u128::from(u64::MAX)) as u64, )) else { return Ok(None); }; let sample_buffer = sample.buffer().context("opus decoder sample missing buffer")?; let sample_pts_us = sample_buffer.pts().map(|pts| pts.nseconds() / 1_000); let sample_duration_us = sample_buffer .duration() .map(|duration| duration.nseconds() / 1_000); let decoded = sample_buffer .map_readable() .context("map decoded pcm")? .to_vec(); if decoded.is_empty() { return Ok(None); } let mut output = self .take_pending_packet(sample_pts_us) .unwrap_or_else(|| packet_metadata(packet)); output.data = decoded; if let Some(pts_us) = sample_pts_us { output.pts = pts_us; } if let Some(duration_us) = sample_duration_us { output.frame_duration_us = duration_us.min(u64::from(u32::MAX)) as u32; } audio_transport::mark_packet_pcm_s16le(&mut output); Ok(Some(output)) } fn take_pending_packet(&mut self, sample_pts_us: Option) -> Option { if let Some(pts_us) = sample_pts_us && let Some(index) = self .pending_packets .iter() .position(|packet| packet.pts == pts_us) { return self.pending_packets.drain(..=index).next_back(); } self.pending_packets.pop_front() } } fn packet_metadata(packet: &AudioPacket) -> AudioPacket { let mut metadata = packet.clone(); metadata.data.clear(); metadata } fn push_pending_packet(pending: &mut VecDeque, packet: AudioPacket) { pending.push_back(packet); while pending.len() > MAX_PENDING_OPUS_METADATA { pending.pop_front(); } } impl Drop for OpusPacketDecoder { fn drop(&mut self) { let _ = self._pipeline.set_state(gst::State::Null); } } #[cfg(test)] mod tests { use super::*; use lesavka_common::lesavka::AudioEncoding; #[test] fn opus_decoder_roundtrips_to_pcm_when_plugins_are_available() { let _ = gst::init(); if gst::ElementFactory::find("opusenc").is_none() || gst::ElementFactory::find("opusdec").is_none() { return; } let Some(opus_payload) = encode_silent_opus_payload() else { return; }; let packet = AudioPacket { pts: 42_000, encoding: AudioEncoding::Opus as i32, sample_rate: 48_000, channels: 2, frame_duration_us: 20_000, data: opus_payload, ..AudioPacket::default() }; let mut decoder = OpusPacketDecoder::new().expect("opus decoder"); let decoded = decoder .decode_packet(&packet) .expect("decode") .expect("decoded pcm"); assert_eq!(decoded.pts, 42_000); assert_eq!(decoded.encoding, AudioEncoding::PcmS16le as i32); assert_eq!(decoded.sample_rate, 48_000); assert_eq!(decoded.channels, 2); assert!( decoded.data.len() >= 1_000, "decoded PCM should be far larger than one compressed Opus frame" ); assert!( decoded.data.chunks_exact(2).any(|sample| { i16::from_le_bytes([sample[0], sample[1]]).unsigned_abs() > 250 }), "decoded Opus payload should preserve non-silent waveform energy" ); } fn encode_silent_opus_payload() -> Option> { let desc = "\ appsrc name=src is-live=true block=false format=time \ caps=audio/x-raw,format=S16LE,layout=interleaved,channels=2,rate=48000 ! \ opusenc audio-type=restricted-lowdelay bitrate=96000 bitrate-type=cbr complexity=7 frame-size=20 perfect-timestamp=true hard-resync=true ! \ appsink name=sink emit-signals=false sync=false max-buffers=8 drop=true"; let pipeline: gst::Pipeline = gst::parse::launch(desc).ok()?.downcast().ok()?; let appsrc = pipeline .by_name("src")? .downcast::() .ok()?; let appsink = pipeline .by_name("sink")? .downcast::() .ok()?; pipeline.set_state(gst::State::Playing).ok()?; for index in 0..4u64 { let mut buffer = gst::Buffer::from_slice(sine_pcm_packet(index, 3_840)); if let Some(meta) = buffer.get_mut() { let pts = gst::ClockTime::from_useconds(index * 20_000); meta.set_pts(Some(pts)); meta.set_dts(Some(pts)); meta.set_duration(Some(gst::ClockTime::from_useconds(20_000))); } appsrc.push_buffer(buffer).ok()?; if let Some(sample) = appsink.try_pull_sample(gst::ClockTime::from_mseconds(50)) { let payload = sample.buffer()?.map_readable().ok()?.to_vec(); let _ = pipeline.set_state(gst::State::Null); if !payload.is_empty() { return Some(payload); } } } let _ = pipeline.set_state(gst::State::Null); None } fn sine_pcm_packet(packet_index: u64, len: usize) -> Vec { let mut out = Vec::with_capacity(len); let frames = len / 4; for frame in 0..frames { let absolute = packet_index as usize * frames + frame; let phase = (absolute as f32 * 440.0 * std::f32::consts::TAU) / 48_000.0; let sample = (phase.sin() * 12_000.0) as i16; out.extend_from_slice(&sample.to_le_bytes()); out.extend_from_slice(&sample.to_le_bytes()); } out } }