fix(sync): cap delayed audio source timestamps

This commit is contained in:
Brad Stein 2026-04-25 23:04:31 -03:00
parent 1153d6843d
commit 28cbdc8808
3 changed files with 115 additions and 5 deletions

View File

@ -125,7 +125,11 @@ impl MicrophoneCapture {
let buf = sample.buffer().unwrap();
let map = buf.map_readable().unwrap();
let source_pts_us = buf.pts().map(|ts| ts.nseconds() / 1_000);
let timing = self.pts_rebaser.rebase_or_now(source_pts_us, 1);
let timing = self.pts_rebaser.rebase_with_lag_cap(
source_pts_us,
1,
Some(crate::live_capture_clock::upstream_source_lag_cap()),
);
let pts = timing.packet_pts_us;
#[cfg(not(coverage))]
{
@ -144,6 +148,7 @@ impl MicrophoneCapture {
pull_path_delay_us =
timing.capture_now_us as i128 - timing.packet_pts_us as i128,
used_source_pts = timing.used_source_pts,
lag_clamped = timing.lag_clamped,
bytes = map.len(),
"🎤 upstream microphone timing sample"
);

View File

@ -4,6 +4,7 @@ use std::sync::{Mutex, OnceLock};
use std::time::{Duration, Instant};
static CAPTURE_ORIGIN: OnceLock<Instant> = OnceLock::new();
const DEFAULT_SOURCE_LAG_CAP_MS: u64 = 250;
fn origin() -> Instant {
*CAPTURE_ORIGIN.get_or_init(Instant::now)
@ -52,6 +53,24 @@ pub fn upstream_timing_trace_enabled() -> bool {
.unwrap_or(false)
}
/// Cap how far source-derived packet timestamps may trail the live capture clock.
///
/// Inputs: none.
/// Outputs: the maximum tolerated lag between a rebased source PTS and the
/// current capture clock.
/// Why: encoded appsink buffers can emerge well after the raw capture moment,
/// and trusting those delayed buffer PTS values without a guard can make the
/// server believe fresh audio/video packets are already hopelessly stale.
#[must_use]
pub fn upstream_source_lag_cap() -> Duration {
std::env::var("LESAVKA_UPSTREAM_SOURCE_LAG_CAP_MS")
.ok()
.and_then(|raw| raw.trim().parse::<u64>().ok())
.filter(|value| *value > 0)
.map(Duration::from_millis)
.unwrap_or_else(|| Duration::from_millis(DEFAULT_SOURCE_LAG_CAP_MS))
}
#[derive(Debug, Default)]
struct SourcePtsRebaserState {
source_base_us: Option<u64>,
@ -82,6 +101,7 @@ pub struct RebasedSourcePts {
pub source_base_us: Option<u64>,
pub capture_base_us: Option<u64>,
pub used_source_pts: bool,
pub lag_clamped: bool,
}
impl SourcePtsRebaser {
@ -93,6 +113,25 @@ impl SourcePtsRebaser {
/// must still remain monotonic even if buffers repeat or arrive oddly.
#[must_use]
pub fn rebase_or_now(&self, source_pts_us: Option<u64>, min_step_us: u64) -> RebasedSourcePts {
self.rebase_with_lag_cap(source_pts_us, min_step_us, None)
}
/// Translate one source-buffer timestamp onto the shared capture clock
/// while bounding how stale that source-derived timestamp may become.
///
/// Inputs: optional source PTS, minimum monotonic step, and an optional
/// maximum lag behind the current capture clock.
/// Outputs: a rebased packet timestamp plus details about any lag clamp.
/// Why: encoder/parser queues can batch source buffers, so a pure
/// source-PTS timeline may fall far behind real packet availability and
/// poison server-side freshness calculations.
#[must_use]
pub fn rebase_with_lag_cap(
&self,
source_pts_us: Option<u64>,
min_step_us: u64,
max_lag: Option<Duration>,
) -> RebasedSourcePts {
let capture_now_us = capture_pts_us();
let mut state = self
.state
@ -100,6 +139,7 @@ impl SourcePtsRebaser {
.expect("source pts rebaser mutex poisoned");
let mut packet_pts_us = capture_now_us;
let mut used_source_pts = false;
let mut lag_clamped = false;
if let Some(source_pts_us) = source_pts_us {
let source_base_us = *state.source_base_us.get_or_insert(source_pts_us);
@ -109,6 +149,18 @@ impl SourcePtsRebaser {
used_source_pts = true;
}
if used_source_pts
&& let Some(max_lag) = max_lag
{
let lag_floor_us = capture_now_us.saturating_sub(
max_lag.as_micros().min(u64::MAX as u128) as u64,
);
if packet_pts_us < lag_floor_us {
packet_pts_us = lag_floor_us;
lag_clamped = true;
}
}
if let Some(last_packet_pts_us) = state.last_packet_pts_us
&& packet_pts_us <= last_packet_pts_us
{
@ -123,13 +175,17 @@ impl SourcePtsRebaser {
source_base_us: state.source_base_us,
capture_base_us: state.capture_base_us,
used_source_pts,
lag_clamped,
}
}
}
#[cfg(test)]
mod tests {
use super::{SourcePtsRebaser, capture_pts_us, packet_age, upstream_timing_trace_enabled};
use super::{
SourcePtsRebaser, capture_pts_us, packet_age, upstream_source_lag_cap,
upstream_timing_trace_enabled,
};
use std::time::Duration;
#[test]
@ -184,6 +240,25 @@ mod tests {
assert!(!first.used_source_pts);
assert!(!second.used_source_pts);
assert!(second.packet_pts_us > first.packet_pts_us);
assert!(!first.lag_clamped);
assert!(!second.lag_clamped);
}
#[test]
fn source_pts_rebaser_clamps_source_lag_when_it_falls_too_far_behind_now() {
let rebased = SourcePtsRebaser::default();
let _first = rebased.rebase_with_lag_cap(Some(1_000_000), 1, None);
std::thread::sleep(Duration::from_millis(8));
let second = rebased.rebase_with_lag_cap(
Some(1_000_001),
1,
Some(Duration::from_millis(2)),
);
assert!(second.used_source_pts);
assert!(second.lag_clamped);
assert!(second.capture_now_us >= second.packet_pts_us);
assert!(second.capture_now_us - second.packet_pts_us <= 2_500);
}
#[test]
@ -200,4 +275,15 @@ mod tests {
assert!(!upstream_timing_trace_enabled());
});
}
#[test]
fn upstream_source_lag_cap_defaults_and_accepts_override() {
temp_env::with_var_unset("LESAVKA_UPSTREAM_SOURCE_LAG_CAP_MS", || {
assert_eq!(upstream_source_lag_cap(), Duration::from_millis(250));
});
temp_env::with_var("LESAVKA_UPSTREAM_SOURCE_LAG_CAP_MS", Some("90"), || {
assert_eq!(upstream_source_lag_cap(), Duration::from_millis(90));
});
}
}

View File

@ -258,6 +258,8 @@ fn spawn_audio_thread(
queue: FreshPacketQueue<AudioPacket>,
) -> JoinHandle<()> {
thread::spawn(move || {
let pts_rebaser = crate::live_capture_clock::SourcePtsRebaser::default();
let lag_cap = crate::live_capture_clock::upstream_source_lag_cap();
let chunk_duration = Duration::from_millis(AUDIO_CHUNK_MS);
let samples_per_chunk =
(AUDIO_SAMPLE_RATE as usize * AUDIO_CHUNK_MS as usize / 1_000).max(1);
@ -290,12 +292,26 @@ fn spawn_audio_thread(
break;
}
drain_audio_samples(&sink, &queue, duration, gst::ClockTime::from_mseconds(25));
drain_audio_samples(
&sink,
&queue,
&pts_rebaser,
lag_cap,
duration,
gst::ClockTime::from_mseconds(25),
);
chunk_index = chunk_index.saturating_add(1);
}
let _ = src.end_of_stream();
drain_audio_samples(&sink, &queue, duration, gst::ClockTime::from_mseconds(500));
drain_audio_samples(
&sink,
&queue,
&pts_rebaser,
lag_cap,
duration,
gst::ClockTime::from_mseconds(500),
);
queue.close();
})
}
@ -303,6 +319,8 @@ fn spawn_audio_thread(
fn drain_audio_samples(
sink: &gst_app::AppSink,
queue: &FreshPacketQueue<AudioPacket>,
pts_rebaser: &crate::live_capture_clock::SourcePtsRebaser,
lag_cap: Duration,
duration: Duration,
timeout: gst::ClockTime,
) {
@ -317,9 +335,10 @@ fn drain_audio_samples(
let Ok(map) = buffer.map_readable() else {
continue;
};
let timing = pts_rebaser.rebase_with_lag_cap(Some(pts_usecs), 1, Some(lag_cap));
let packet = AudioPacket {
id: 0,
pts: pts_usecs,
pts: timing.packet_pts_us,
data: map.as_slice().to_vec(),
};
let _ = queue.push(packet, Duration::ZERO);