From 28cbdc88081d1945e739d4913f335cf758e4137e Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Sat, 25 Apr 2026 23:04:31 -0300 Subject: [PATCH] fix(sync): cap delayed audio source timestamps --- client/src/input/microphone.rs | 7 +- client/src/live_capture_clock.rs | 88 +++++++++++++++++++++++- client/src/sync_probe/capture/runtime.rs | 25 ++++++- 3 files changed, 115 insertions(+), 5 deletions(-) diff --git a/client/src/input/microphone.rs b/client/src/input/microphone.rs index c40542f..879e53d 100644 --- a/client/src/input/microphone.rs +++ b/client/src/input/microphone.rs @@ -125,7 +125,11 @@ impl MicrophoneCapture { let buf = sample.buffer().unwrap(); let map = buf.map_readable().unwrap(); let source_pts_us = buf.pts().map(|ts| ts.nseconds() / 1_000); - let timing = self.pts_rebaser.rebase_or_now(source_pts_us, 1); + let timing = self.pts_rebaser.rebase_with_lag_cap( + source_pts_us, + 1, + Some(crate::live_capture_clock::upstream_source_lag_cap()), + ); let pts = timing.packet_pts_us; #[cfg(not(coverage))] { @@ -144,6 +148,7 @@ impl MicrophoneCapture { pull_path_delay_us = timing.capture_now_us as i128 - timing.packet_pts_us as i128, used_source_pts = timing.used_source_pts, + lag_clamped = timing.lag_clamped, bytes = map.len(), "🎤 upstream microphone timing sample" ); diff --git a/client/src/live_capture_clock.rs b/client/src/live_capture_clock.rs index 9a32d5d..c8d995d 100644 --- a/client/src/live_capture_clock.rs +++ b/client/src/live_capture_clock.rs @@ -4,6 +4,7 @@ use std::sync::{Mutex, OnceLock}; use std::time::{Duration, Instant}; static CAPTURE_ORIGIN: OnceLock = OnceLock::new(); +const DEFAULT_SOURCE_LAG_CAP_MS: u64 = 250; fn origin() -> Instant { *CAPTURE_ORIGIN.get_or_init(Instant::now) @@ -52,6 +53,24 @@ pub fn upstream_timing_trace_enabled() -> bool { .unwrap_or(false) } +/// Cap how far source-derived packet timestamps may trail the live capture clock. +/// +/// Inputs: none. +/// Outputs: the maximum tolerated lag between a rebased source PTS and the +/// current capture clock. +/// Why: encoded appsink buffers can emerge well after the raw capture moment, +/// and trusting those delayed buffer PTS values without a guard can make the +/// server believe fresh audio/video packets are already hopelessly stale. +#[must_use] +pub fn upstream_source_lag_cap() -> Duration { + std::env::var("LESAVKA_UPSTREAM_SOURCE_LAG_CAP_MS") + .ok() + .and_then(|raw| raw.trim().parse::().ok()) + .filter(|value| *value > 0) + .map(Duration::from_millis) + .unwrap_or_else(|| Duration::from_millis(DEFAULT_SOURCE_LAG_CAP_MS)) +} + #[derive(Debug, Default)] struct SourcePtsRebaserState { source_base_us: Option, @@ -82,6 +101,7 @@ pub struct RebasedSourcePts { pub source_base_us: Option, pub capture_base_us: Option, pub used_source_pts: bool, + pub lag_clamped: bool, } impl SourcePtsRebaser { @@ -93,6 +113,25 @@ impl SourcePtsRebaser { /// must still remain monotonic even if buffers repeat or arrive oddly. #[must_use] pub fn rebase_or_now(&self, source_pts_us: Option, min_step_us: u64) -> RebasedSourcePts { + self.rebase_with_lag_cap(source_pts_us, min_step_us, None) + } + + /// Translate one source-buffer timestamp onto the shared capture clock + /// while bounding how stale that source-derived timestamp may become. + /// + /// Inputs: optional source PTS, minimum monotonic step, and an optional + /// maximum lag behind the current capture clock. + /// Outputs: a rebased packet timestamp plus details about any lag clamp. + /// Why: encoder/parser queues can batch source buffers, so a pure + /// source-PTS timeline may fall far behind real packet availability and + /// poison server-side freshness calculations. + #[must_use] + pub fn rebase_with_lag_cap( + &self, + source_pts_us: Option, + min_step_us: u64, + max_lag: Option, + ) -> RebasedSourcePts { let capture_now_us = capture_pts_us(); let mut state = self .state @@ -100,6 +139,7 @@ impl SourcePtsRebaser { .expect("source pts rebaser mutex poisoned"); let mut packet_pts_us = capture_now_us; let mut used_source_pts = false; + let mut lag_clamped = false; if let Some(source_pts_us) = source_pts_us { let source_base_us = *state.source_base_us.get_or_insert(source_pts_us); @@ -109,6 +149,18 @@ impl SourcePtsRebaser { used_source_pts = true; } + if used_source_pts + && let Some(max_lag) = max_lag + { + let lag_floor_us = capture_now_us.saturating_sub( + max_lag.as_micros().min(u64::MAX as u128) as u64, + ); + if packet_pts_us < lag_floor_us { + packet_pts_us = lag_floor_us; + lag_clamped = true; + } + } + if let Some(last_packet_pts_us) = state.last_packet_pts_us && packet_pts_us <= last_packet_pts_us { @@ -123,13 +175,17 @@ impl SourcePtsRebaser { source_base_us: state.source_base_us, capture_base_us: state.capture_base_us, used_source_pts, + lag_clamped, } } } #[cfg(test)] mod tests { - use super::{SourcePtsRebaser, capture_pts_us, packet_age, upstream_timing_trace_enabled}; + use super::{ + SourcePtsRebaser, capture_pts_us, packet_age, upstream_source_lag_cap, + upstream_timing_trace_enabled, + }; use std::time::Duration; #[test] @@ -184,6 +240,25 @@ mod tests { assert!(!first.used_source_pts); assert!(!second.used_source_pts); assert!(second.packet_pts_us > first.packet_pts_us); + assert!(!first.lag_clamped); + assert!(!second.lag_clamped); + } + + #[test] + fn source_pts_rebaser_clamps_source_lag_when_it_falls_too_far_behind_now() { + let rebased = SourcePtsRebaser::default(); + let _first = rebased.rebase_with_lag_cap(Some(1_000_000), 1, None); + std::thread::sleep(Duration::from_millis(8)); + let second = rebased.rebase_with_lag_cap( + Some(1_000_001), + 1, + Some(Duration::from_millis(2)), + ); + + assert!(second.used_source_pts); + assert!(second.lag_clamped); + assert!(second.capture_now_us >= second.packet_pts_us); + assert!(second.capture_now_us - second.packet_pts_us <= 2_500); } #[test] @@ -200,4 +275,15 @@ mod tests { assert!(!upstream_timing_trace_enabled()); }); } + + #[test] + fn upstream_source_lag_cap_defaults_and_accepts_override() { + temp_env::with_var_unset("LESAVKA_UPSTREAM_SOURCE_LAG_CAP_MS", || { + assert_eq!(upstream_source_lag_cap(), Duration::from_millis(250)); + }); + + temp_env::with_var("LESAVKA_UPSTREAM_SOURCE_LAG_CAP_MS", Some("90"), || { + assert_eq!(upstream_source_lag_cap(), Duration::from_millis(90)); + }); + } } diff --git a/client/src/sync_probe/capture/runtime.rs b/client/src/sync_probe/capture/runtime.rs index 75eb681..49d637c 100644 --- a/client/src/sync_probe/capture/runtime.rs +++ b/client/src/sync_probe/capture/runtime.rs @@ -258,6 +258,8 @@ fn spawn_audio_thread( queue: FreshPacketQueue, ) -> JoinHandle<()> { thread::spawn(move || { + let pts_rebaser = crate::live_capture_clock::SourcePtsRebaser::default(); + let lag_cap = crate::live_capture_clock::upstream_source_lag_cap(); let chunk_duration = Duration::from_millis(AUDIO_CHUNK_MS); let samples_per_chunk = (AUDIO_SAMPLE_RATE as usize * AUDIO_CHUNK_MS as usize / 1_000).max(1); @@ -290,12 +292,26 @@ fn spawn_audio_thread( break; } - drain_audio_samples(&sink, &queue, duration, gst::ClockTime::from_mseconds(25)); + drain_audio_samples( + &sink, + &queue, + &pts_rebaser, + lag_cap, + duration, + gst::ClockTime::from_mseconds(25), + ); chunk_index = chunk_index.saturating_add(1); } let _ = src.end_of_stream(); - drain_audio_samples(&sink, &queue, duration, gst::ClockTime::from_mseconds(500)); + drain_audio_samples( + &sink, + &queue, + &pts_rebaser, + lag_cap, + duration, + gst::ClockTime::from_mseconds(500), + ); queue.close(); }) } @@ -303,6 +319,8 @@ fn spawn_audio_thread( fn drain_audio_samples( sink: &gst_app::AppSink, queue: &FreshPacketQueue, + pts_rebaser: &crate::live_capture_clock::SourcePtsRebaser, + lag_cap: Duration, duration: Duration, timeout: gst::ClockTime, ) { @@ -317,9 +335,10 @@ fn drain_audio_samples( let Ok(map) = buffer.map_readable() else { continue; }; + let timing = pts_rebaser.rebase_with_lag_cap(Some(pts_usecs), 1, Some(lag_cap)); let packet = AudioPacket { id: 0, - pts: pts_usecs, + pts: timing.packet_pts_us, data: map.as_slice().to_vec(), }; let _ = queue.push(packet, Duration::ZERO);