fix(sync): stamp upstream media from source timing

This commit is contained in:
Brad Stein 2026-04-25 15:49:30 -03:00
parent e155a9e140
commit d7ed1e0a4d
6 changed files with 241 additions and 42 deletions

View File

@ -50,6 +50,7 @@ pub struct CameraCapture {
pipeline: gst::Pipeline,
sink: gst_app::AppSink,
preview_tap_running: Option<Arc<AtomicBool>>,
pts_rebaser: crate::live_capture_clock::SourcePtsRebaser,
}
include!("camera/capture_pipeline.rs");

View File

@ -1,21 +1,3 @@
/// Microseconds elapsed since the first capture-clock sample in this process.
///
/// Coverage/test builds keep a process-local origin so unit tests exercise
/// this module without the shared live-capture clock.
#[cfg(any(coverage, test))]
fn shared_capture_pts_us() -> u64 {
    use std::sync::OnceLock;
    use std::time::Instant;

    static CAPTURE_ORIGIN: OnceLock<Instant> = OnceLock::new();
    let elapsed_us = CAPTURE_ORIGIN.get_or_init(Instant::now).elapsed().as_micros();
    // Saturate instead of truncating on the (theoretical) u64 overflow.
    u64::try_from(elapsed_us).unwrap_or(u64::MAX)
}
/// Microseconds on the shared client capture clock (production builds).
///
/// Delegates to the process-wide clock so camera and microphone packets
/// share one timestamp origin.
#[cfg(not(any(coverage, test)))]
fn shared_capture_pts_us() -> u64 {
crate::live_capture_clock::capture_pts_us()
}
impl CameraCapture {
pub fn new(device_fragment: Option<&str>, cfg: Option<CameraConfig>) -> anyhow::Result<Self> {
gst::init().ok();
@ -245,6 +227,7 @@ impl CameraCapture {
pipeline,
sink,
preview_tap_running,
pts_rebaser: crate::live_capture_clock::SourcePtsRebaser::default(),
})
}
@ -252,15 +235,35 @@ impl CameraCapture {
let sample = self.sink.pull_sample().ok()?;
let buf = sample.buffer()?;
let map = buf.map_readable().ok()?;
let pts = shared_capture_pts_us();
static FIRST_CAMERA_PACKET: AtomicBool = AtomicBool::new(false);
if !FIRST_CAMERA_PACKET.swap(true, Ordering::Relaxed) {
let source_pts_us = buf.pts().map(|ts| ts.nseconds() / 1_000);
let timing = self.pts_rebaser.rebase_or_now(source_pts_us, 1);
let pts = timing.packet_pts_us;
static CAMERA_PACKET_COUNT: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
let packet_index = CAMERA_PACKET_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if packet_index == 0 {
tracing::info!(
bytes = map.as_slice().len(),
pts_us = pts,
"📸 upstream webcam frames flowing"
);
}
if crate::live_capture_clock::upstream_timing_trace_enabled()
&& (packet_index < 10 || packet_index.is_multiple_of(300))
{
tracing::info!(
packet_index,
source_pts_us = timing.source_pts_us.unwrap_or_default(),
source_base_us = timing.source_base_us.unwrap_or_default(),
capture_base_us = timing.capture_base_us.unwrap_or_default(),
capture_now_us = timing.capture_now_us,
packet_pts_us = timing.packet_pts_us,
pull_path_delay_us = timing.capture_now_us as i128 - timing.packet_pts_us as i128,
used_source_pts = timing.used_source_pts,
bytes = map.as_slice().len(),
"📸 upstream webcam timing sample"
);
}
Some(VideoPacket {
id: 2,
pts,

View File

@ -94,6 +94,7 @@ impl CameraCapture {
pipeline,
sink,
preview_tap_running: None,
pts_rebaser: crate::live_capture_clock::SourcePtsRebaser::default(),
}
}

View File

@ -20,24 +20,6 @@ use tracing::{debug, warn};
#[cfg(not(coverage))]
use tracing::{error, info, trace};
/// Elapsed microseconds since this process first sampled the capture clock.
///
/// Test/coverage builds substitute a local origin so the microphone module
/// stays testable without the shared live-capture clock.
#[cfg(any(coverage, test))]
fn shared_capture_pts_us() -> u64 {
    use std::sync::OnceLock;
    use std::time::Instant;

    static CAPTURE_ORIGIN: OnceLock<Instant> = OnceLock::new();
    let origin = CAPTURE_ORIGIN.get_or_init(Instant::now);
    let micros = origin.elapsed().as_micros();
    // Clamp rather than truncate if the value ever exceeded u64 range.
    if micros > u128::from(u64::MAX) {
        u64::MAX
    } else {
        micros as u64
    }
}
/// Microseconds on the shared client capture clock (production builds).
///
/// Uses the same process-wide clock as the camera path so both media kinds
/// stamp packets from one origin.
#[cfg(not(any(coverage, test)))]
fn shared_capture_pts_us() -> u64 {
crate::live_capture_clock::capture_pts_us()
}
const MIC_GAIN_ENV: &str = "LESAVKA_MIC_GAIN";
const MIC_GAIN_CONTROL_ENV: &str = "LESAVKA_MIC_GAIN_CONTROL";
const MIC_LEVEL_TAP_ENV: &str = "LESAVKA_UPLINK_MIC_LEVEL";
@ -47,6 +29,7 @@ pub struct MicrophoneCapture {
pipeline: gst::Pipeline,
sink: gst_app::AppSink,
level_tap_running: Option<Arc<AtomicBool>>,
pts_rebaser: crate::live_capture_clock::SourcePtsRebaser,
}
impl MicrophoneCapture {
@ -131,6 +114,7 @@ impl MicrophoneCapture {
pipeline,
sink,
level_tap_running,
pts_rebaser: crate::live_capture_clock::SourcePtsRebaser::default(),
})
}
@ -140,11 +124,30 @@ impl MicrophoneCapture {
Ok(sample) => {
let buf = sample.buffer().unwrap();
let map = buf.map_readable().unwrap();
let pts = shared_capture_pts_us();
let source_pts_us = buf.pts().map(|ts| ts.nseconds() / 1_000);
let timing = self.pts_rebaser.rebase_or_now(source_pts_us, 1);
let pts = timing.packet_pts_us;
#[cfg(not(coverage))]
{
static CNT: AtomicU64 = AtomicU64::new(0);
let n = CNT.fetch_add(1, Ordering::Relaxed);
if crate::live_capture_clock::upstream_timing_trace_enabled()
&& (n < 10 || n.is_multiple_of(300))
{
info!(
packet_index = n,
source_pts_us = timing.source_pts_us.unwrap_or_default(),
source_base_us = timing.source_base_us.unwrap_or_default(),
capture_base_us = timing.capture_base_us.unwrap_or_default(),
capture_now_us = timing.capture_now_us,
packet_pts_us = timing.packet_pts_us,
pull_path_delay_us =
timing.capture_now_us as i128 - timing.packet_pts_us as i128,
used_source_pts = timing.used_source_pts,
bytes = map.len(),
"🎤 upstream microphone timing sample"
);
}
if n < 10 || n.is_multiple_of(300) {
trace!("🎤⇧ cli pkt#{n} {} bytes", map.len());
}

View File

@ -1,6 +1,6 @@
#![forbid(unsafe_code)]
use std::sync::OnceLock;
use std::sync::{Mutex, OnceLock};
use std::time::{Duration, Instant};
static CAPTURE_ORIGIN: OnceLock<Instant> = OnceLock::new();
@ -32,9 +32,104 @@ pub fn packet_age(pts_us: u64) -> Duration {
Duration::from_micros(capture_pts_us().saturating_sub(pts_us))
}
/// Report whether deep upstream timing instrumentation is switched on.
///
/// Inputs: none (reads `LESAVKA_UPSTREAM_TIMING_TRACE` from the environment).
/// Outputs: `true` when detailed capture/rebase timing logs should be emitted.
/// Why: A/V sync debugging needs bursts of verbose timing logs that must stay
/// silent during normal live operation; unset or an explicit
/// "0"/"false"/"no"/"off" (trimmed, case-insensitive) disables them.
#[must_use]
pub fn upstream_timing_trace_enabled() -> bool {
    match std::env::var("LESAVKA_UPSTREAM_TIMING_TRACE") {
        Ok(raw) => {
            let flag = raw.trim();
            let disabled = ["0", "false", "no", "off"]
                .iter()
                .any(|off| flag.eq_ignore_ascii_case(off));
            !disabled
        }
        // Unset (or non-UTF-8) means tracing stays off.
        Err(_) => false,
    }
}
// Mutable bookkeeping behind `SourcePtsRebaser`'s mutex.
#[derive(Debug, Default)]
struct SourcePtsRebaserState {
// First source PTS observed; later source PTS are measured relative to it.
source_base_us: Option<u64>,
// Capture-clock reading recorded when the first source PTS arrived.
capture_base_us: Option<u64>,
// Last emitted packet timestamp, used to keep output strictly increasing.
last_packet_pts_us: Option<u64>,
}
/// Rebase source-buffer timestamps onto the shared client capture clock.
///
/// Inputs: optional source PTS values from one live capture pipeline.
/// Outputs: packet timestamps that share the same client clock origin across
/// camera and microphone while still advancing based on source timing rather
/// than late appsink pull time.
/// Why: camera and microphone encode paths can add different queue/encode
/// delays before appsink pull, so stamping at pull time bakes skew into the
/// packets before the server ever sees them.
#[derive(Debug, Default)]
pub struct SourcePtsRebaser {
// Interior mutability so capture paths can rebase through a shared `&self`.
state: Mutex<SourcePtsRebaserState>,
}
/// Snapshot of one client-side timestamp rebasing decision.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct RebasedSourcePts {
/// Final timestamp stamped onto the outgoing packet (microseconds).
pub packet_pts_us: u64,
/// Capture-clock reading taken when the rebase ran.
pub capture_now_us: u64,
/// Source-buffer PTS supplied by the caller, if any.
pub source_pts_us: Option<u64>,
/// First source PTS this rebaser has seen, once established.
pub source_base_us: Option<u64>,
/// Capture-clock anchor recorded alongside the first source PTS.
pub capture_base_us: Option<u64>,
/// True when the packet timestamp was derived from source timing.
pub used_source_pts: bool,
}
impl SourcePtsRebaser {
    /// Map one source-buffer timestamp onto the shared capture clock.
    ///
    /// Inputs: the buffer PTS when the pipeline supplied one, plus the minimum
    /// step enforced between consecutive packets.
    /// Outputs: the rebased packet timestamp together with every anchor value
    /// used to derive it, for instrumentation.
    /// Why: source timing should drive packet PTS when present, but output
    /// must stay strictly monotonic even if buffers repeat or lack a PTS.
    #[must_use]
    pub fn rebase_or_now(&self, source_pts_us: Option<u64>, min_step_us: u64) -> RebasedSourcePts {
        let capture_now_us = capture_pts_us();
        let mut state = self
            .state
            .lock()
            .expect("source pts rebaser mutex poisoned");

        // Prefer source timing when a PTS is present; otherwise stamp "now".
        let (candidate_pts_us, used_source_pts) = match source_pts_us {
            Some(src_us) => {
                // Anchor both clocks on the first source-stamped buffer.
                let src_base_us = *state.source_base_us.get_or_insert(src_us);
                let cap_base_us = *state.capture_base_us.get_or_insert(capture_now_us);
                let rebased = cap_base_us.saturating_add(src_us.saturating_sub(src_base_us));
                (rebased, true)
            }
            None => (capture_now_us, false),
        };

        // Nudge forward on ties/regressions so timestamps always advance.
        let packet_pts_us = match state.last_packet_pts_us {
            Some(prev) if candidate_pts_us <= prev => prev.saturating_add(min_step_us.max(1)),
            _ => candidate_pts_us,
        };
        state.last_packet_pts_us = Some(packet_pts_us);

        RebasedSourcePts {
            packet_pts_us,
            capture_now_us,
            source_pts_us,
            source_base_us: state.source_base_us,
            capture_base_us: state.capture_base_us,
            used_source_pts,
        }
    }
}
#[cfg(test)]
mod tests {
use super::{capture_pts_us, packet_age};
use super::{SourcePtsRebaser, capture_pts_us, packet_age, upstream_timing_trace_enabled};
use std::time::Duration;
#[test]
@ -53,4 +148,56 @@ mod tests {
assert!(age >= Duration::from_millis(1));
assert!(age < Duration::from_secs(1));
}
#[test]
fn source_pts_rebaser_preserves_source_delta_on_shared_capture_clock() {
    // Two source timestamps 33_333us apart must stay exactly that far apart
    // after rebasing onto the shared capture clock.
    let rebaser = SourcePtsRebaser::default();
    let earlier = rebaser.rebase_or_now(Some(1_000_000), 1);
    let later = rebaser.rebase_or_now(Some(1_033_333), 1);
    assert!(earlier.used_source_pts);
    let delta = later.packet_pts_us.saturating_sub(earlier.packet_pts_us);
    assert_eq!(delta, 33_333);
    // Both samples share the same source and capture anchors.
    assert_eq!(earlier.source_base_us, Some(1_000_000));
    assert_eq!(later.source_base_us, Some(1_000_000));
    assert_eq!(earlier.capture_base_us, later.capture_base_us);
}
#[test]
fn source_pts_rebaser_stays_monotonic_when_source_pts_repeat() {
    // A duplicated source PTS would collide; the rebaser must instead nudge
    // the second packet forward by exactly the minimum step.
    let rebaser = SourcePtsRebaser::default();
    let first = rebaser.rebase_or_now(Some(50_000), 1);
    let repeat = rebaser.rebase_or_now(Some(50_000), 1);
    assert_eq!(repeat.packet_pts_us, first.packet_pts_us + 1);
}
#[test]
fn source_pts_rebaser_falls_back_to_capture_clock_without_source_pts() {
    // Without source timing the rebaser stamps from the capture clock, so a
    // short sleep must appear as strictly increasing packet timestamps.
    let rebaser = SourcePtsRebaser::default();
    let before = rebaser.rebase_or_now(None, 1);
    std::thread::sleep(Duration::from_millis(2));
    let after = rebaser.rebase_or_now(None, 1);
    assert!(!before.used_source_pts);
    assert!(!after.used_source_pts);
    assert!(after.packet_pts_us > before.packet_pts_us);
}
#[test]
fn upstream_timing_trace_flag_defaults_off_and_accepts_true_values() {
// Unset environment variable: tracing stays disabled by default.
temp_env::with_var_unset("LESAVKA_UPSTREAM_TIMING_TRACE", || {
assert!(!upstream_timing_trace_enabled());
});
// Any truthy value such as "1" turns tracing on.
temp_env::with_var("LESAVKA_UPSTREAM_TIMING_TRACE", Some("1"), || {
assert!(upstream_timing_trace_enabled());
});
// An explicit falsy word keeps tracing off even though the var is set.
temp_env::with_var("LESAVKA_UPSTREAM_TIMING_TRACE", Some("false"), || {
assert!(!upstream_timing_trace_enabled());
});
}
}

View File

@ -32,9 +32,24 @@ struct UpstreamClockState {
microphone_base_remote_pts_us: Option<u64>,
last_video_local_pts_us: Option<u64>,
last_audio_local_pts_us: Option<u64>,
camera_packet_count: u64,
microphone_packet_count: u64,
startup_anchor_logged: bool,
}
/// Read the `LESAVKA_UPSTREAM_TIMING_TRACE` flag for this process.
///
/// Unset means disabled; any value other than a trimmed, case-insensitive
/// "0"/"false"/"no"/"off" enables the extra upstream timing logs.
/// NOTE(review): this duplicates the client-side helper of the same name —
/// presumably the two sides live in different crates; confirm before merging.
fn upstream_timing_trace_enabled() -> bool {
    let Ok(raw) = std::env::var("LESAVKA_UPSTREAM_TIMING_TRACE") else {
        return false;
    };
    let flag = raw.trim();
    const DISABLED: [&str; 4] = ["0", "false", "no", "off"];
    !DISABLED.iter().any(|off| flag.eq_ignore_ascii_case(off))
}
/// Coordinate upstream stream ownership and keep audio/video on one timeline.
///
/// Inputs: stream-open/close events plus remote packet timestamps.
@ -118,6 +133,8 @@ impl UpstreamMediaRuntime {
state.microphone_base_remote_pts_us = None;
state.last_video_local_pts_us = None;
state.last_audio_local_pts_us = None;
state.camera_packet_count = 0;
state.microphone_packet_count = 0;
state.startup_anchor_logged = false;
}
match kind {
@ -185,6 +202,8 @@ impl UpstreamMediaRuntime {
state.microphone_base_remote_pts_us = None;
state.last_video_local_pts_us = None;
state.last_audio_local_pts_us = None;
state.camera_packet_count = 0;
state.microphone_packet_count = 0;
state.startup_anchor_logged = false;
}
}
@ -211,6 +230,16 @@ impl UpstreamMediaRuntime {
.lock()
.expect("upstream media state mutex poisoned");
let session_id = state.session_id;
let packet_count = match kind {
UpstreamMediaKind::Camera => {
state.camera_packet_count = state.camera_packet_count.saturating_add(1);
state.camera_packet_count
}
UpstreamMediaKind::Microphone => {
state.microphone_packet_count = state.microphone_packet_count.saturating_add(1);
state.microphone_packet_count
}
};
let base_slot = match kind {
UpstreamMediaKind::Camera => &mut state.camera_base_remote_pts_us,
UpstreamMediaKind::Microphone => &mut state.microphone_base_remote_pts_us,
@ -245,6 +274,21 @@ impl UpstreamMediaRuntime {
local_pts_us = last_pts_us.saturating_add(min_step_us.max(1));
}
*last_slot = Some(local_pts_us);
if upstream_timing_trace_enabled()
&& (packet_count <= 10 || packet_count.is_multiple_of(300))
{
let remote_elapsed_us = remote_pts_us.saturating_sub(first_remote_for_kind);
info!(
session_id,
?kind,
packet_count,
remote_pts_us,
first_remote_for_kind,
remote_elapsed_us,
local_pts_us,
"upstream media rebase sample"
);
}
local_pts_us
}
}