diff --git a/client/src/input/microphone.rs b/client/src/input/microphone.rs index 879e53d..538aa51 100644 --- a/client/src/input/microphone.rs +++ b/client/src/input/microphone.rs @@ -23,13 +23,16 @@ use tracing::{error, info, trace}; const MIC_GAIN_ENV: &str = "LESAVKA_MIC_GAIN"; const MIC_GAIN_CONTROL_ENV: &str = "LESAVKA_MIC_GAIN_CONTROL"; const MIC_LEVEL_TAP_ENV: &str = "LESAVKA_UPLINK_MIC_LEVEL"; +const MIC_SAMPLE_RATE: u64 = 48_000; +const MIC_CHANNELS: usize = 2; +const MIC_SAMPLE_BYTES: usize = std::mem::size_of::(); pub struct MicrophoneCapture { #[allow(dead_code)] // kept alive to hold PLAYING state pipeline: gst::Pipeline, sink: gst_app::AppSink, level_tap_running: Option>, - pts_rebaser: crate::live_capture_clock::SourcePtsRebaser, + pts_rebaser: crate::live_capture_clock::DurationPacedSourcePtsRebaser, } impl MicrophoneCapture { @@ -48,15 +51,9 @@ impl MicrophoneCapture { _ => Self::default_source_desc(), }; debug!("🎤 source: {source_desc}"); - let aac = ["avenc_aac", "fdkaacenc", "faac", "opusenc"] - .into_iter() - .find(|e| gst::ElementFactory::find(e).is_some()) - .unwrap_or("opusenc"); - let parser = parser_for_encoder(aac); let gain = mic_gain_from_env(); let level_tap_path = mic_level_tap_path(); - let desc = - microphone_pipeline_desc(&source_desc, aac, parser, gain, level_tap_path.is_some()); + let desc = microphone_pipeline_desc(&source_desc, gain, level_tap_path.is_some()); let pipeline: gst::Pipeline = gst::parse::launch(&desc)?.downcast().expect("pipeline"); let sink: gst_app::AppSink = pipeline.by_name("asink").unwrap().downcast().unwrap(); @@ -114,7 +111,7 @@ impl MicrophoneCapture { pipeline, sink, level_tap_running, - pts_rebaser: crate::live_capture_clock::SourcePtsRebaser::default(), + pts_rebaser: crate::live_capture_clock::DurationPacedSourcePtsRebaser::default(), }) } @@ -125,10 +122,11 @@ impl MicrophoneCapture { let buf = sample.buffer().unwrap(); let map = buf.map_readable().unwrap(); let source_pts_us = buf.pts().map(|ts| ts.nseconds() / 1_000); - let timing = self.pts_rebaser.rebase_with_lag_cap( + let packet_duration_us = buffer_duration_us(buf, map.len()); + let timing = self.pts_rebaser.rebase_with_packet_duration( source_pts_us, - 1, - Some(crate::live_capture_clock::upstream_source_lag_cap()), + packet_duration_us, + crate::live_capture_clock::upstream_source_lag_cap(), ); let pts = timing.packet_pts_us; #[cfg(not(coverage))] @@ -276,55 +274,46 @@ fn mic_level_tap_path() -> Option { .map(PathBuf::from) } -fn parser_for_encoder(aac: &str) -> &'static str { - if aac.contains("opus") { - "capsfilter caps=audio/x-opus,rate=48000,channels=2" - } else { - "aacparse ! capsfilter caps=audio/mpeg,stream-format=adts,rate=48000,channels=2" - } -} - -fn microphone_pipeline_desc( - source_desc: &str, - encoder: &str, - parser: &str, - gain: f64, - level_tap_enabled: bool, -) -> String { +fn microphone_pipeline_desc(source_desc: &str, gain: f64, level_tap_enabled: bool) -> String { let gain = format_mic_gain_for_gst(gain); if level_tap_enabled { format!( "{source_desc} ! \ audioconvert ! audioresample ! \ - audio/x-raw,format=S16LE,channels=2,rate=48000 ! \ + audio/x-raw,format=S16LE,channels={MIC_CHANNELS},rate={MIC_SAMPLE_RATE} ! \ volume name=mic_input_gain volume={gain} ! \ tee name=t \ - t. ! queue max-size-buffers=100 leaky=downstream ! \ - audioconvert ! audioresample ! \ - audio/x-raw,channels=2,rate=48000 ! \ - {encoder} bitrate=128000 ! \ - {parser} ! \ + t. ! queue max-size-buffers=64 leaky=downstream ! \ + audio/x-raw,format=S16LE,channels={MIC_CHANNELS},rate={MIC_SAMPLE_RATE} ! \ appsink name=asink emit-signals=true max-buffers=50 drop=true \ t. ! queue max-size-buffers=8 leaky=downstream ! \ - audio/x-raw,format=S16LE,channels=2,rate=48000 ! \ + audio/x-raw,format=S16LE,channels={MIC_CHANNELS},rate={MIC_SAMPLE_RATE} ! \ appsink name=level_sink emit-signals=false sync=false max-buffers=8 drop=true" ) } else { format!( "{source_desc} ! \ audioconvert ! audioresample ! \ - audio/x-raw,channels=2,rate=48000 ! \ + audio/x-raw,format=S16LE,channels={MIC_CHANNELS},rate={MIC_SAMPLE_RATE} ! \ volume name=mic_input_gain volume={gain} ! \ - audioconvert ! audioresample ! \ - audio/x-raw,channels=2,rate=48000 ! \ - {encoder} bitrate=128000 ! \ - {parser} ! \ - queue max-size-buffers=100 leaky=downstream ! \ + queue max-size-buffers=64 leaky=downstream ! \ appsink name=asink emit-signals=true max-buffers=50 drop=true" ) } } +fn buffer_duration_us(buf: &gst::BufferRef, bytes: usize) -> u64 { + buf.duration() + .map(|ts| ts.nseconds() / 1_000) + .unwrap_or_else(|| { + let bytes_per_frame = MIC_CHANNELS * MIC_SAMPLE_BYTES; + let frames = bytes / bytes_per_frame.max(1); + ((frames as u128 * 1_000_000u128) / MIC_SAMPLE_RATE as u128).min(u64::MAX as u128) + as u64 + }) + .max(1) +} + /// Detect launcher catalog names that should be opened through Pulse directly. fn looks_like_pulse_source_name(source: &str) -> bool { let source = source.trim(); diff --git a/client/src/live_capture_clock.rs b/client/src/live_capture_clock.rs index c8d995d..6e2efb0 100644 --- a/client/src/live_capture_clock.rs +++ b/client/src/live_capture_clock.rs @@ -104,6 +104,27 @@ pub struct RebasedSourcePts { pub lag_clamped: bool, } +#[derive(Debug, Default)] +struct DurationPacedSourcePtsState { + next_packet_pts_us: Option, +} + +/// Rebase encoded packet timing by anchoring once, then pacing by duration. +/// +/// Inputs: optional source PTS from one encoded packet stream plus the packet's +/// declared duration and a freshness lag cap. +/// Outputs: packet timestamps on the shared client capture clock that advance +/// by actual media duration instead of trusting potentially stretched parser +/// PTS on every packet. +/// Why: encoded audio parsers can emit packet PTS values that do not track +/// real packet duration faithfully, which can make the server pace audio far +/// too slowly or quickly even when the underlying capture stream is healthy. +#[derive(Debug, Default)] +pub struct DurationPacedSourcePtsRebaser { + anchor_rebaser: SourcePtsRebaser, + state: Mutex, +} + impl SourcePtsRebaser { /// Translate one source-buffer timestamp onto the shared capture clock. /// @@ -149,12 +170,9 @@ impl SourcePtsRebaser { used_source_pts = true; } - if used_source_pts - && let Some(max_lag) = max_lag - { - let lag_floor_us = capture_now_us.saturating_sub( - max_lag.as_micros().min(u64::MAX as u128) as u64, - ); + if used_source_pts && let Some(max_lag) = max_lag { + let lag_floor_us = + capture_now_us.saturating_sub(max_lag.as_micros().min(u64::MAX as u128) as u64); if packet_pts_us < lag_floor_us { packet_pts_us = lag_floor_us; lag_clamped = true; @@ -180,11 +198,49 @@ impl SourcePtsRebaser { } } +impl DurationPacedSourcePtsRebaser { + /// Rebase one encoded packet onto the shared capture clock. + /// + /// Inputs: optional packet PTS, the packet media duration in microseconds, + /// and a freshness lag cap behind the live capture clock. + /// Outputs: a rebased packet timestamp plus the values used to derive it. + /// Why: once the first encoded packet is anchored, the safest pacing signal + /// for compressed audio is its actual packet duration, with a live lag + /// clamp to keep delayed batches from resurrecting stale timing. + #[must_use] + pub fn rebase_with_packet_duration( + &self, + source_pts_us: Option, + packet_duration_us: u64, + max_lag: Duration, + ) -> RebasedSourcePts { + let step_us = packet_duration_us.max(1); + let mut rebased = + self.anchor_rebaser + .rebase_with_lag_cap(source_pts_us, step_us, Some(max_lag)); + let lag_floor_us = rebased + .capture_now_us + .saturating_sub(max_lag.as_micros().min(u64::MAX as u128) as u64); + let mut state = self + .state + .lock() + .expect("duration paced source pts rebaser mutex poisoned"); + let mut packet_pts_us = state.next_packet_pts_us.unwrap_or(rebased.packet_pts_us); + if packet_pts_us < lag_floor_us { + packet_pts_us = lag_floor_us; + rebased.lag_clamped = true; + } + state.next_packet_pts_us = Some(packet_pts_us.saturating_add(step_us)); + rebased.packet_pts_us = packet_pts_us; + rebased + } +} + #[cfg(test)] mod tests { use super::{ - SourcePtsRebaser, capture_pts_us, packet_age, upstream_source_lag_cap, - upstream_timing_trace_enabled, + DurationPacedSourcePtsRebaser, SourcePtsRebaser, capture_pts_us, packet_age, + upstream_source_lag_cap, upstream_timing_trace_enabled, }; use std::time::Duration; @@ -249,11 +305,8 @@ mod tests { let rebased = SourcePtsRebaser::default(); let _first = rebased.rebase_with_lag_cap(Some(1_000_000), 1, None); std::thread::sleep(Duration::from_millis(8)); - let second = rebased.rebase_with_lag_cap( - Some(1_000_001), - 1, - Some(Duration::from_millis(2)), - ); + let second = + rebased.rebase_with_lag_cap(Some(1_000_001), 1, Some(Duration::from_millis(2))); assert!(second.used_source_pts); assert!(second.lag_clamped); @@ -286,4 +339,32 @@ mod tests { assert_eq!(upstream_source_lag_cap(), Duration::from_millis(90)); }); } + + #[test] + fn duration_paced_rebaser_advances_by_packet_duration_when_source_pts_stretch() { + let rebased = DurationPacedSourcePtsRebaser::default(); + let first = + rebased.rebase_with_packet_duration(Some(0), 21_333, Duration::from_millis(250)); + let second = + rebased.rebase_with_packet_duration(Some(52_666), 21_333, Duration::from_millis(250)); + + assert_eq!( + second.packet_pts_us.saturating_sub(first.packet_pts_us), + 21_333 + ); + } + + #[test] + fn duration_paced_rebaser_clamps_when_duration_pacing_falls_stale() { + let rebased = DurationPacedSourcePtsRebaser::default(); + let _first = rebased.rebase_with_packet_duration(Some(0), 10_000, Duration::from_millis(2)); + std::thread::sleep(Duration::from_millis(8)); + let second = + rebased.rebase_with_packet_duration(Some(10_000), 10_000, Duration::from_millis(2)); + + assert!( + second.packet_pts_us.saturating_add(2_500) >= second.capture_now_us, + "duration-paced packet pts should never trail live capture by more than the lag cap" + ); + } } diff --git a/client/src/sync_probe/capture.rs b/client/src/sync_probe/capture.rs index a93097c..98c2428 100644 --- a/client/src/sync_probe/capture.rs +++ b/client/src/sync_probe/capture.rs @@ -160,6 +160,7 @@ fn render_audio_chunk( pcm } +#[cfg(test)] fn probe_pts_exceeds_duration(pts_usecs: u64, duration: std::time::Duration) -> bool { pts_usecs > duration.as_micros() as u64 } diff --git a/client/src/sync_probe/capture/runtime.rs b/client/src/sync_probe/capture/runtime.rs index 49d637c..e5ce1fe 100644 --- a/client/src/sync_probe/capture/runtime.rs +++ b/client/src/sync_probe/capture/runtime.rs @@ -1,5 +1,24 @@ use super::*; +pub(super) fn rebase_probe_packet_pts( + pts_rebaser: &crate::live_capture_clock::SourcePtsRebaser, + source_pts_us: u64, + lag_cap: Duration, +) -> u64 { + pts_rebaser + .rebase_with_lag_cap(Some(source_pts_us), 1, Some(lag_cap)) + .packet_pts_us +} + +fn rebase_probe_audio_packet_pts( + pts_rebaser: &crate::live_capture_clock::DurationPacedSourcePtsRebaser, + source_pts_us: u64, + packet_duration_us: u64, + lag_cap: Duration, +) -> crate::live_capture_clock::RebasedSourcePts { + pts_rebaser.rebase_with_packet_duration(Some(source_pts_us), packet_duration_us, lag_cap) +} + pub struct SyncProbeCapture { pipeline: gst::Pipeline, running: Arc, @@ -24,16 +43,6 @@ impl SyncProbeCapture { .context("missing sync probe video appsink")? .downcast::() .expect("video appsink"); - let audio_src = pipeline - .by_name("sync_probe_audio_src") - .context("missing sync probe audio appsrc")? - .downcast::() - .expect("audio appsrc"); - let audio_sink = pipeline - .by_name("sync_probe_audio_sink") - .context("missing sync probe audio appsink")? - .downcast::() - .expect("audio appsink"); pipeline .set_state(gst::State::Playing) @@ -55,8 +64,6 @@ impl SyncProbeCapture { video_queue.clone(), ); let audio_thread = spawn_audio_thread( - audio_src, - audio_sink, schedule, duration, probe_start, @@ -124,23 +131,8 @@ fn build_pipeline(camera: CameraConfig, _schedule: &PulseSchedule) -> Result() .map_err(|_| anyhow::anyhow!("sync probe description did not build a pipeline")) } @@ -161,20 +153,6 @@ fn pick_h264_encoder(fps: u32) -> Result { bail!("no usable H.264 encoder found for sync probe") } -fn pick_aac_encoder() -> Result<&'static str> { - [ - "avenc_aac bitrate=128000", - "fdkaacenc bitrate=128000", - "faac bitrate=128000", - ] - .into_iter() - .find(|entry| { - let name = entry.split_ascii_whitespace().next().unwrap_or_default(); - gst::ElementFactory::find(name).is_some() - }) - .ok_or_else(|| anyhow::anyhow!("no usable AAC encoder found for sync probe")) -} - fn spawn_video_thread( src: gst_app::AppSrc, sink: gst_app::AppSink, @@ -186,6 +164,8 @@ fn spawn_video_thread( queue: FreshPacketQueue, ) -> JoinHandle<()> { thread::spawn(move || { + let pts_rebaser = crate::live_capture_clock::SourcePtsRebaser::default(); + let lag_cap = crate::live_capture_clock::upstream_source_lag_cap(); let dark_frame = build_dark_probe_frame(camera.width as usize, camera.height as usize); let regular_pulse_frame = build_regular_probe_frame(camera.width as usize, camera.height as usize); @@ -231,9 +211,10 @@ fn spawn_video_thread( && let Some(buffer) = sample.buffer() && let Ok(map) = buffer.map_readable() { + let source_pts_us = buffer.pts().unwrap_or(gst::ClockTime::ZERO).nseconds() / 1_000; let packet = VideoPacket { id: 2, - pts: buffer.pts().unwrap_or(gst::ClockTime::ZERO).nseconds() / 1_000, + pts: rebase_probe_packet_pts(&pts_rebaser, source_pts_us, lag_cap), data: map.as_slice().to_vec(), ..Default::default() }; @@ -249,8 +230,6 @@ fn spawn_video_thread( } fn spawn_audio_thread( - src: gst_app::AppSrc, - sink: gst_app::AppSink, schedule: PulseSchedule, duration: Duration, probe_start: Instant, @@ -258,7 +237,7 @@ fn spawn_audio_thread( queue: FreshPacketQueue, ) -> JoinHandle<()> { thread::spawn(move || { - let pts_rebaser = crate::live_capture_clock::SourcePtsRebaser::default(); + let pts_rebaser = crate::live_capture_clock::DurationPacedSourcePtsRebaser::default(); let lag_cap = crate::live_capture_clock::upstream_source_lag_cap(); let chunk_duration = Duration::from_millis(AUDIO_CHUNK_MS); let samples_per_chunk = @@ -279,71 +258,21 @@ fn spawn_audio_thread( } let chunk = render_audio_chunk(&schedule, pts, samples_per_chunk); - let mut buffer = gst::Buffer::from_slice(chunk); - if let Some(meta) = buffer.get_mut() { - let pts_time = gst::ClockTime::from_nseconds(pts.as_nanos() as u64); - meta.set_pts(Some(pts_time)); - meta.set_dts(Some(pts_time)); - meta.set_duration(Some(gst::ClockTime::from_nseconds( - chunk_duration.as_nanos() as u64, - ))); - } - if src.push_buffer(buffer).is_err() { - break; - } - - drain_audio_samples( - &sink, - &queue, + let timing = rebase_probe_audio_packet_pts( &pts_rebaser, + pts.as_micros().min(u64::MAX as u128) as u64, + chunk_duration.as_micros().min(u64::MAX as u128) as u64, lag_cap, - duration, - gst::ClockTime::from_mseconds(25), ); + let packet = AudioPacket { + id: 0, + pts: timing.packet_pts_us, + data: chunk, + }; + let _ = queue.push(packet, Duration::ZERO); chunk_index = chunk_index.saturating_add(1); } - let _ = src.end_of_stream(); - drain_audio_samples( - &sink, - &queue, - &pts_rebaser, - lag_cap, - duration, - gst::ClockTime::from_mseconds(500), - ); queue.close(); }) } - -fn drain_audio_samples( - sink: &gst_app::AppSink, - queue: &FreshPacketQueue, - pts_rebaser: &crate::live_capture_clock::SourcePtsRebaser, - lag_cap: Duration, - duration: Duration, - timeout: gst::ClockTime, -) { - while let Some(sample) = sink.try_pull_sample(timeout) { - let Some(buffer) = sample.buffer() else { - continue; - }; - let pts_usecs = buffer.pts().unwrap_or(gst::ClockTime::ZERO).nseconds() / 1_000; - if probe_pts_exceeds_duration(pts_usecs, duration) { - break; - } - let Ok(map) = buffer.map_readable() else { - continue; - }; - let timing = pts_rebaser.rebase_with_lag_cap(Some(pts_usecs), 1, Some(lag_cap)); - let packet = AudioPacket { - id: 0, - pts: timing.packet_pts_us, - data: map.as_slice().to_vec(), - }; - let _ = queue.push(packet, Duration::ZERO); - if timeout == gst::ClockTime::ZERO { - continue; - } - } -} diff --git a/client/src/sync_probe/capture/tests.rs b/client/src/sync_probe/capture/tests.rs index 803bdca..eca4e62 100644 --- a/client/src/sync_probe/capture/tests.rs +++ b/client/src/sync_probe/capture/tests.rs @@ -5,10 +5,8 @@ use crate::input::camera::{CameraCodec, CameraConfig}; use crate::sync_probe::analyze::detect_audio_onsets; use crate::sync_probe::schedule::PulseSchedule; use lesavka_common::lesavka::{AudioPacket, VideoPacket}; -use std::fs; -use std::process::Command; use std::time::Duration; -use tempfile::tempdir; +use std::time::Instant; fn stub_camera() -> CameraConfig { CameraConfig { @@ -19,42 +17,13 @@ fn stub_camera() -> CameraConfig { } } -fn decode_adts_aac_to_mono_samples(aac_bytes: &[u8]) -> Vec { - let dir = tempdir().expect("tempdir"); - let input = dir.path().join("runtime-probe.aac"); - fs::write(&input, aac_bytes).expect("write runtime AAC"); - - let output = Command::new("ffmpeg") - .arg("-hide_banner") - .arg("-loglevel") - .arg("error") - .arg("-i") - .arg(&input) - .arg("-ac") - .arg("1") - .arg("-ar") - .arg(super::AUDIO_SAMPLE_RATE.to_string()) - .arg("-f") - .arg("s16le") - .arg("-acodec") - .arg("pcm_s16le") - .arg("-") - .output() - .expect("decode runtime AAC with ffmpeg"); - assert!( - output.status.success(), - "ffmpeg decode failed: {}", - String::from_utf8_lossy(&output.stderr) - ); - assert!( - output.stdout.len() >= 2, - "decoded runtime AAC did not yield enough PCM bytes" - ); - - output - .stdout +fn decode_interleaved_pcm_to_mono_samples(pcm_bytes: &[u8]) -> Vec { + pcm_bytes .chunks_exact(2) .map(|chunk| i16::from_le_bytes([chunk[0], chunk[1]])) + .collect::>() + .chunks_exact(super::AUDIO_CHANNELS) + .map(|frame| frame[0]) .collect() } @@ -172,9 +141,24 @@ fn probe_video_frames_render_distinct_idle_regular_and_marker_patterns() { assert_ne!(regular, marker); } +#[test] +fn probe_video_pts_are_lag_capped_like_audio() { + let rebaser = crate::live_capture_clock::SourcePtsRebaser::default(); + let _first = + super::runtime::rebase_probe_packet_pts(&rebaser, 1_000_000, Duration::from_millis(2)); + std::thread::sleep(Duration::from_millis(8)); + let second = + super::runtime::rebase_probe_packet_pts(&rebaser, 1_000_001, Duration::from_millis(2)); + + assert!( + second >= 6_000, + "delayed probe video packets should be pulled forward instead of looking ancient" + ); +} + #[cfg(not(coverage))] #[tokio::test] -async fn runtime_audio_probe_emits_nontrivial_aac_packets() { +async fn runtime_audio_probe_emits_nontrivial_pcm_packets() { let capture = SyncProbeCapture::new( stub_camera(), PulseSchedule::new( @@ -203,16 +187,16 @@ async fn runtime_audio_probe_emits_nontrivial_aac_packets() { } assert!( - packet_count >= 16, - "expected the runtime probe to emit many AAC packets, got {packet_count}" + packet_count >= 120, + "expected the runtime probe to emit many PCM packets, got {packet_count}" ); assert!( - total_bytes >= 8_000, - "expected the runtime probe to emit a meaningful AAC payload, got {total_bytes} bytes" + total_bytes >= 200_000, + "expected the runtime probe to emit a meaningful PCM payload, got {total_bytes} bytes" ); assert!( - largest_packet >= 64, - "expected at least one non-trivial AAC packet, largest was {largest_packet} bytes" + largest_packet >= 1_000, + "expected at least one non-trivial PCM packet, largest was {largest_packet} bytes" ); } @@ -229,22 +213,22 @@ async fn runtime_audio_probe_decodes_detectable_click_onsets() { .expect("runtime capture"); let audio_queue = capture.audio_queue(); - let mut aac = Vec::new(); + let mut pcm = Vec::new(); loop { let next = audio_queue.pop_fresh().await; let Some(packet) = next.packet else { break; }; - aac.extend_from_slice(&packet.data); + pcm.extend_from_slice(&packet.data); } assert!( - aac.len() >= 8_000, - "expected the runtime probe AAC stream to carry a meaningful payload, got {} bytes", - aac.len() + pcm.len() >= 200_000, + "expected the runtime probe PCM stream to carry a meaningful payload, got {} bytes", + pcm.len() ); - let decoded = decode_adts_aac_to_mono_samples(&aac); + let decoded = decode_interleaved_pcm_to_mono_samples(&pcm); let onsets = detect_audio_onsets(&decoded, super::AUDIO_SAMPLE_RATE as u32, 5).expect("audio onsets"); assert!( @@ -274,16 +258,16 @@ async fn runtime_audio_probe_decodes_detectable_click_onsets_for_manual_harness_ .expect("runtime capture"); let audio_queue = capture.audio_queue(); - let mut aac = Vec::new(); + let mut pcm = Vec::new(); loop { let next = audio_queue.pop_fresh().await; let Some(packet) = next.packet else { break; }; - aac.extend_from_slice(&packet.data); + pcm.extend_from_slice(&packet.data); } - let decoded = decode_adts_aac_to_mono_samples(&aac); + let decoded = decode_interleaved_pcm_to_mono_samples(&pcm); let onsets = detect_audio_onsets(&decoded, super::AUDIO_SAMPLE_RATE as u32, 5).expect("audio onsets"); assert!( @@ -299,3 +283,94 @@ async fn runtime_audio_probe_decodes_detectable_click_onsets_for_manual_harness_ ); } } + +#[cfg(not(coverage))] +#[tokio::test] +async fn runtime_probe_audio_and_video_pts_advance_near_real_time() { + let capture_duration = Duration::from_secs(3); + let capture = SyncProbeCapture::new( + stub_camera(), + PulseSchedule::new( + Duration::from_secs(1), + Duration::from_millis(500), + Duration::from_millis(120), + 4, + ), + capture_duration, + ) + .expect("runtime capture"); + + let video_queue = capture.video_queue(); + let audio_queue = capture.audio_queue(); + let started = Instant::now(); + + let video_task = tokio::spawn(async move { + let mut first_pts = None; + let mut last_pts = None; + let mut count = 0usize; + loop { + let next = video_queue.pop_fresh().await; + let Some(packet) = next.packet else { + break; + }; + first_pts.get_or_insert(packet.pts); + last_pts = Some(packet.pts); + count = count.saturating_add(1); + } + (first_pts, last_pts, count) + }); + + let audio_task = tokio::spawn(async move { + let mut first_pts = None; + let mut last_pts = None; + let mut count = 0usize; + loop { + let next = audio_queue.pop_fresh().await; + let Some(packet) = next.packet else { + break; + }; + first_pts.get_or_insert(packet.pts); + last_pts = Some(packet.pts); + count = count.saturating_add(1); + } + (first_pts, last_pts, count) + }); + + let (video_first, video_last, video_count) = video_task.await.expect("video drain"); + let (audio_first, audio_last, audio_count) = audio_task.await.expect("audio drain"); + let wall_elapsed = started.elapsed(); + + let video_span = video_last.expect("video last pts") - video_first.expect("video first pts"); + let audio_span = audio_last.expect("audio last pts") - audio_first.expect("audio first pts"); + eprintln!( + "runtime probe spans: video_count={video_count} video_span_us={video_span} audio_count={audio_count} audio_span_us={audio_span} wall_elapsed={wall_elapsed:?}" + ); + + assert!( + video_count >= 60, + "expected many runtime probe video packets, got {video_count}" + ); + assert!( + audio_count >= 60, + "expected many runtime probe audio packets, got {audio_count}" + ); + assert!( + wall_elapsed <= Duration::from_secs(5), + "runtime probe should not take excessively long locally, took {wall_elapsed:?}" + ); + assert!( + video_span >= 2_400_000, + "video pts should span most of the 3s capture, got {} us", + video_span + ); + assert!( + audio_span >= 2_400_000, + "audio pts should span most of the 3s capture, got {} us", + audio_span + ); + assert!( + audio_span <= 3_400_000, + "audio pts should stay near the 3s capture duration, got {} us", + audio_span + ); +} diff --git a/server/src/audio/voice_input.rs b/server/src/audio/voice_input.rs index 4dbe5ee..1ce2138 100644 --- a/server/src/audio/voice_input.rs +++ b/server/src/audio/voice_input.rs @@ -29,7 +29,7 @@ impl ClipTap { return; } let ts = chrono::Local::now().format("%Y%m%d-%H%M%S"); - let path = format!("/tmp/{}-{}.aac", self.tag, ts); + let path = format!("/tmp/{}-{}.s16le", self.tag, ts); let _ = std::fs::write(&path, &self.buf); self.buf.clear(); } @@ -40,6 +40,10 @@ impl Drop for ClipTap { } } +const VOICE_SAMPLE_RATE: u64 = 48_000; +const VOICE_CHANNELS: usize = 2; +const VOICE_SAMPLE_BYTES: usize = std::mem::size_of::(); + // ────────────────────── microphone sink ──────────────────────────────── pub struct Voice { appsrc: gst_app::AppSrc, @@ -55,11 +59,11 @@ impl Drop for Voice { } fn voice_input_caps() -> gst::Caps { - gst::Caps::builder("audio/mpeg") - .field("mpegversion", 4i32) - .field("stream-format", "adts") - .field("rate", 48_000i32) - .field("channels", 2i32) + gst::Caps::builder("audio/x-raw") + .field("format", "S16LE") + .field("layout", "interleaved") + .field("rate", VOICE_SAMPLE_RATE as i32) + .field("channels", VOICE_CHANNELS as i32) .build() } @@ -174,9 +178,6 @@ impl Voice { appsrc.set_format(gst::Format::Time); appsrc.set_is_live(true); - let decodebin = gst::ElementFactory::make("decodebin") - .build() - .context("make decodebin")?; let convert = gst::ElementFactory::make("audioconvert") .build() .context("make audioconvert")?; @@ -185,8 +186,8 @@ impl Voice { .context("make audioresample")?; let caps = gst::Caps::builder("audio/x-raw") .field("format", "S16LE") - .field("channels", 2i32) - .field("rate", 48_000i32) + .field("channels", VOICE_CHANNELS as i32) + .field("rate", VOICE_SAMPLE_RATE as i32) .build(); let capsfilter = gst::ElementFactory::make("capsfilter") .property("caps", &caps) @@ -249,7 +250,6 @@ impl Voice { if delay_queue_enabled { pipeline.add_many([ appsrc.upcast_ref(), - &decodebin, &convert, &resample, &capsfilter, @@ -261,7 +261,6 @@ impl Voice { } else { pipeline.add_many([ appsrc.upcast_ref(), - &decodebin, &convert, &resample, &capsfilter, @@ -270,10 +269,9 @@ impl Voice { &alsa_sink, ])?; } - appsrc.link(&decodebin)?; + appsrc.link(&convert)?; if delay_queue_enabled { gst::Element::link_many([ - &convert, &resample, &capsfilter, &level, @@ -283,7 +281,6 @@ impl Voice { ])?; } else { gst::Element::link_many([ - &convert, &resample, &capsfilter, &level, @@ -292,25 +289,6 @@ impl Voice { ])?; } - /*------------ decodebin autolink ----------------*/ - let convert_sink = convert - .static_pad("sink") - .context("audioconvert sink pad")?; - decodebin.connect_pad_added(move |_db, pad| { - if convert_sink.is_linked() { - return; - } - let caps = pad.current_caps().unwrap_or_else(|| pad.query_caps(None)); - let is_audio = caps - .structure(0) - .map(|s| s.name().starts_with("audio/")) - .unwrap_or(false); - if !is_audio { - return; - } - let _ = pad.link(&convert_sink); - }); - let bus = pipeline.bus().context("voice pipeline bus")?; // underrun ≠ error – just show a warning @@ -342,6 +320,9 @@ impl Voice { let ts = gst::ClockTime::from_useconds(pkt.pts); meta.set_pts(Some(ts)); meta.set_dts(Some(ts)); + meta.set_duration(Some(gst::ClockTime::from_useconds( + voice_packet_duration_us(pkt.data.len()), + ))); } let _ = self.appsrc.push_buffer(buf); @@ -352,6 +333,14 @@ impl Voice { } } +fn voice_packet_duration_us(bytes: usize) -> u64 { + let bytes_per_frame = VOICE_CHANNELS * VOICE_SAMPLE_BYTES; + let frames = bytes / bytes_per_frame.max(1); + (((frames as u128 * 1_000_000u128) / VOICE_SAMPLE_RATE as u128).min(u64::MAX as u128) + as u64) + .max(1) +} + #[cfg(test)] mod voice_sink_timing_tests { use crate::camera::update_camera_config;