diff --git a/client/src/input/camera.rs b/client/src/input/camera.rs index 2410329..8c75911 100644 --- a/client/src/input/camera.rs +++ b/client/src/input/camera.rs @@ -50,6 +50,7 @@ pub struct CameraCapture { pipeline: gst::Pipeline, sink: gst_app::AppSink, preview_tap_running: Option>, + pts_rebaser: crate::live_capture_clock::SourcePtsRebaser, } include!("camera/capture_pipeline.rs"); diff --git a/client/src/input/camera/capture_pipeline.rs b/client/src/input/camera/capture_pipeline.rs index 41cabb0..54332cf 100644 --- a/client/src/input/camera/capture_pipeline.rs +++ b/client/src/input/camera/capture_pipeline.rs @@ -1,21 +1,3 @@ -#[cfg(any(coverage, test))] -fn shared_capture_pts_us() -> u64 { - use std::sync::OnceLock; - use std::time::Instant; - - static CAPTURE_ORIGIN: OnceLock = OnceLock::new(); - CAPTURE_ORIGIN - .get_or_init(Instant::now) - .elapsed() - .as_micros() - .min(u64::MAX as u128) as u64 -} - -#[cfg(not(any(coverage, test)))] -fn shared_capture_pts_us() -> u64 { - crate::live_capture_clock::capture_pts_us() -} - impl CameraCapture { pub fn new(device_fragment: Option<&str>, cfg: Option) -> anyhow::Result { gst::init().ok(); @@ -245,6 +227,7 @@ impl CameraCapture { pipeline, sink, preview_tap_running, + pts_rebaser: crate::live_capture_clock::SourcePtsRebaser::default(), }) } @@ -252,15 +235,35 @@ impl CameraCapture { let sample = self.sink.pull_sample().ok()?; let buf = sample.buffer()?; let map = buf.map_readable().ok()?; - let pts = shared_capture_pts_us(); - static FIRST_CAMERA_PACKET: AtomicBool = AtomicBool::new(false); - if !FIRST_CAMERA_PACKET.swap(true, Ordering::Relaxed) { + let source_pts_us = buf.pts().map(|ts| ts.nseconds() / 1_000); + let timing = self.pts_rebaser.rebase_or_now(source_pts_us, 1); + let pts = timing.packet_pts_us; + static CAMERA_PACKET_COUNT: std::sync::atomic::AtomicU64 = + std::sync::atomic::AtomicU64::new(0); + let packet_index = CAMERA_PACKET_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + if packet_index == 0 { tracing::info!( bytes = map.as_slice().len(), pts_us = pts, "📸 upstream webcam frames flowing" ); } + if crate::live_capture_clock::upstream_timing_trace_enabled() + && (packet_index < 10 || packet_index.is_multiple_of(300)) + { + tracing::info!( + packet_index, + source_pts_us = timing.source_pts_us.unwrap_or_default(), + source_base_us = timing.source_base_us.unwrap_or_default(), + capture_base_us = timing.capture_base_us.unwrap_or_default(), + capture_now_us = timing.capture_now_us, + packet_pts_us = timing.packet_pts_us, + pull_path_delay_us = timing.capture_now_us as i128 - timing.packet_pts_us as i128, + used_source_pts = timing.used_source_pts, + bytes = map.as_slice().len(), + "📸 upstream webcam timing sample" + ); + } Some(VideoPacket { id: 2, pts, diff --git a/client/src/input/camera/device_selection.rs b/client/src/input/camera/device_selection.rs index 7b9f21a..c407239 100644 --- a/client/src/input/camera/device_selection.rs +++ b/client/src/input/camera/device_selection.rs @@ -94,6 +94,7 @@ impl CameraCapture { pipeline, sink, preview_tap_running: None, + pts_rebaser: crate::live_capture_clock::SourcePtsRebaser::default(), } } diff --git a/client/src/input/microphone.rs b/client/src/input/microphone.rs index a35aedb..c40542f 100644 --- a/client/src/input/microphone.rs +++ b/client/src/input/microphone.rs @@ -20,24 +20,6 @@ use tracing::{debug, warn}; #[cfg(not(coverage))] use tracing::{error, info, trace}; -#[cfg(any(coverage, test))] -fn shared_capture_pts_us() -> u64 { - use std::sync::OnceLock; - use std::time::Instant; - - static CAPTURE_ORIGIN: OnceLock = OnceLock::new(); - CAPTURE_ORIGIN - .get_or_init(Instant::now) - .elapsed() - .as_micros() - .min(u64::MAX as u128) as u64 -} - -#[cfg(not(any(coverage, test)))] -fn shared_capture_pts_us() -> u64 { - crate::live_capture_clock::capture_pts_us() -} - const MIC_GAIN_ENV: &str = "LESAVKA_MIC_GAIN"; const MIC_GAIN_CONTROL_ENV: &str = "LESAVKA_MIC_GAIN_CONTROL"; const MIC_LEVEL_TAP_ENV: &str = "LESAVKA_UPLINK_MIC_LEVEL"; @@ -47,6 +29,7 @@ pub struct MicrophoneCapture { pipeline: gst::Pipeline, sink: gst_app::AppSink, level_tap_running: Option>, + pts_rebaser: crate::live_capture_clock::SourcePtsRebaser, } impl MicrophoneCapture { @@ -131,6 +114,7 @@ impl MicrophoneCapture { pipeline, sink, level_tap_running, + pts_rebaser: crate::live_capture_clock::SourcePtsRebaser::default(), }) } @@ -140,11 +124,30 @@ impl MicrophoneCapture { Ok(sample) => { let buf = sample.buffer().unwrap(); let map = buf.map_readable().unwrap(); - let pts = shared_capture_pts_us(); + let source_pts_us = buf.pts().map(|ts| ts.nseconds() / 1_000); + let timing = self.pts_rebaser.rebase_or_now(source_pts_us, 1); + let pts = timing.packet_pts_us; #[cfg(not(coverage))] { static CNT: AtomicU64 = AtomicU64::new(0); let n = CNT.fetch_add(1, Ordering::Relaxed); + if crate::live_capture_clock::upstream_timing_trace_enabled() + && (n < 10 || n.is_multiple_of(300)) + { + info!( + packet_index = n, + source_pts_us = timing.source_pts_us.unwrap_or_default(), + source_base_us = timing.source_base_us.unwrap_or_default(), + capture_base_us = timing.capture_base_us.unwrap_or_default(), + capture_now_us = timing.capture_now_us, + packet_pts_us = timing.packet_pts_us, + pull_path_delay_us = + timing.capture_now_us as i128 - timing.packet_pts_us as i128, + used_source_pts = timing.used_source_pts, + bytes = map.len(), + "🎤 upstream microphone timing sample" + ); + } if n < 10 || n.is_multiple_of(300) { trace!("🎤⇧ cli pkt#{n} {} bytes", map.len()); } diff --git a/client/src/live_capture_clock.rs b/client/src/live_capture_clock.rs index ef955cc..9a32d5d 100644 --- a/client/src/live_capture_clock.rs +++ b/client/src/live_capture_clock.rs @@ -1,6 +1,6 @@ #![forbid(unsafe_code)] -use std::sync::OnceLock; +use std::sync::{Mutex, OnceLock}; use std::time::{Duration, Instant}; static CAPTURE_ORIGIN: OnceLock = OnceLock::new(); @@ -32,9 +32,104 @@ pub fn packet_age(pts_us: u64) -> Duration { Duration::from_micros(capture_pts_us().saturating_sub(pts_us)) } +/// Decide whether extra upstream timing instrumentation should be emitted. +/// +/// Inputs: none. +/// Outputs: `true` when detailed capture/rebase timing logs are enabled. +/// Why: A/V sync work needs bursts of deep timing visibility without leaving +/// noisy logs on during normal live operation. +#[must_use] +pub fn upstream_timing_trace_enabled() -> bool { + std::env::var("LESAVKA_UPSTREAM_TIMING_TRACE") + .ok() + .map(|value| { + let trimmed = value.trim(); + !(trimmed.eq_ignore_ascii_case("0") + || trimmed.eq_ignore_ascii_case("false") + || trimmed.eq_ignore_ascii_case("no") + || trimmed.eq_ignore_ascii_case("off")) + }) + .unwrap_or(false) +} + +#[derive(Debug, Default)] +struct SourcePtsRebaserState { + source_base_us: Option, + capture_base_us: Option, + last_packet_pts_us: Option, +} + +/// Rebase source-buffer timestamps onto the shared client capture clock. +/// +/// Inputs: optional source PTS values from one live capture pipeline. +/// Outputs: packet timestamps that share the same client clock origin across +/// camera and microphone while still advancing based on source timing rather +/// than late appsink pull time. +/// Why: camera and microphone encode paths can add different queue/encode +/// delays before appsink pull, so stamping at pull time bakes skew into the +/// packets before the server ever sees them. +#[derive(Debug, Default)] +pub struct SourcePtsRebaser { + state: Mutex, +} + +/// Snapshot of one client-side timestamp rebasing decision. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct RebasedSourcePts { + pub packet_pts_us: u64, + pub capture_now_us: u64, + pub source_pts_us: Option, + pub source_base_us: Option, + pub capture_base_us: Option, + pub used_source_pts: bool, +} + +impl SourcePtsRebaser { + /// Translate one source-buffer timestamp onto the shared capture clock. + /// + /// Inputs: the buffer PTS if available plus the minimum monotonic step. + /// Outputs: a rebased packet timestamp and the values used to derive it. + /// Why: source PTS should drive packet timing when available, but packets + /// must still remain monotonic even if buffers repeat or arrive oddly. + #[must_use] + pub fn rebase_or_now(&self, source_pts_us: Option, min_step_us: u64) -> RebasedSourcePts { + let capture_now_us = capture_pts_us(); + let mut state = self + .state + .lock() + .expect("source pts rebaser mutex poisoned"); + let mut packet_pts_us = capture_now_us; + let mut used_source_pts = false; + + if let Some(source_pts_us) = source_pts_us { + let source_base_us = *state.source_base_us.get_or_insert(source_pts_us); + let capture_base_us = *state.capture_base_us.get_or_insert(capture_now_us); + packet_pts_us = + capture_base_us.saturating_add(source_pts_us.saturating_sub(source_base_us)); + used_source_pts = true; + } + + if let Some(last_packet_pts_us) = state.last_packet_pts_us + && packet_pts_us <= last_packet_pts_us + { + packet_pts_us = last_packet_pts_us.saturating_add(min_step_us.max(1)); + } + + state.last_packet_pts_us = Some(packet_pts_us); + RebasedSourcePts { + packet_pts_us, + capture_now_us, + source_pts_us, + source_base_us: state.source_base_us, + capture_base_us: state.capture_base_us, + used_source_pts, + } + } +} + #[cfg(test)] mod tests { - use super::{capture_pts_us, packet_age}; + use super::{SourcePtsRebaser, capture_pts_us, packet_age, upstream_timing_trace_enabled}; use std::time::Duration; #[test] @@ -53,4 +148,56 @@ mod tests { assert!(age >= Duration::from_millis(1)); assert!(age < Duration::from_secs(1)); } + + #[test] + fn source_pts_rebaser_preserves_source_delta_on_shared_capture_clock() { + let rebased = SourcePtsRebaser::default(); + let first = rebased.rebase_or_now(Some(1_000_000), 1); + let second = rebased.rebase_or_now(Some(1_033_333), 1); + + assert!(first.used_source_pts); + assert_eq!( + second.packet_pts_us.saturating_sub(first.packet_pts_us), + 33_333 + ); + assert_eq!(first.source_base_us, Some(1_000_000)); + assert_eq!(second.source_base_us, Some(1_000_000)); + assert_eq!(first.capture_base_us, second.capture_base_us); + } + + #[test] + fn source_pts_rebaser_stays_monotonic_when_source_pts_repeat() { + let rebased = SourcePtsRebaser::default(); + let first = rebased.rebase_or_now(Some(50_000), 1); + let second = rebased.rebase_or_now(Some(50_000), 1); + + assert_eq!(second.packet_pts_us, first.packet_pts_us + 1); + } + + #[test] + fn source_pts_rebaser_falls_back_to_capture_clock_without_source_pts() { + let rebased = SourcePtsRebaser::default(); + let first = rebased.rebase_or_now(None, 1); + std::thread::sleep(Duration::from_millis(2)); + let second = rebased.rebase_or_now(None, 1); + + assert!(!first.used_source_pts); + assert!(!second.used_source_pts); + assert!(second.packet_pts_us > first.packet_pts_us); + } + + #[test] + fn upstream_timing_trace_flag_defaults_off_and_accepts_true_values() { + temp_env::with_var_unset("LESAVKA_UPSTREAM_TIMING_TRACE", || { + assert!(!upstream_timing_trace_enabled()); + }); + + temp_env::with_var("LESAVKA_UPSTREAM_TIMING_TRACE", Some("1"), || { + assert!(upstream_timing_trace_enabled()); + }); + + temp_env::with_var("LESAVKA_UPSTREAM_TIMING_TRACE", Some("false"), || { + assert!(!upstream_timing_trace_enabled()); + }); + } } diff --git a/server/src/upstream_media_runtime.rs b/server/src/upstream_media_runtime.rs index 91aed48..03bbdbe 100644 --- a/server/src/upstream_media_runtime.rs +++ b/server/src/upstream_media_runtime.rs @@ -32,9 +32,24 @@ struct UpstreamClockState { microphone_base_remote_pts_us: Option, last_video_local_pts_us: Option, last_audio_local_pts_us: Option, + camera_packet_count: u64, + microphone_packet_count: u64, startup_anchor_logged: bool, } +fn upstream_timing_trace_enabled() -> bool { + std::env::var("LESAVKA_UPSTREAM_TIMING_TRACE") + .ok() + .map(|value| { + let trimmed = value.trim(); + !(trimmed.eq_ignore_ascii_case("0") + || trimmed.eq_ignore_ascii_case("false") + || trimmed.eq_ignore_ascii_case("no") + || trimmed.eq_ignore_ascii_case("off")) + }) + .unwrap_or(false) +} + /// Coordinate upstream stream ownership and keep audio/video on one timeline. /// /// Inputs: stream-open/close events plus remote packet timestamps. @@ -118,6 +133,8 @@ impl UpstreamMediaRuntime { state.microphone_base_remote_pts_us = None; state.last_video_local_pts_us = None; state.last_audio_local_pts_us = None; + state.camera_packet_count = 0; + state.microphone_packet_count = 0; state.startup_anchor_logged = false; } match kind { @@ -185,6 +202,8 @@ impl UpstreamMediaRuntime { state.microphone_base_remote_pts_us = None; state.last_video_local_pts_us = None; state.last_audio_local_pts_us = None; + state.camera_packet_count = 0; + state.microphone_packet_count = 0; state.startup_anchor_logged = false; } } @@ -211,6 +230,16 @@ impl UpstreamMediaRuntime { .lock() .expect("upstream media state mutex poisoned"); let session_id = state.session_id; + let packet_count = match kind { + UpstreamMediaKind::Camera => { + state.camera_packet_count = state.camera_packet_count.saturating_add(1); + state.camera_packet_count + } + UpstreamMediaKind::Microphone => { + state.microphone_packet_count = state.microphone_packet_count.saturating_add(1); + state.microphone_packet_count + } + }; let base_slot = match kind { UpstreamMediaKind::Camera => &mut state.camera_base_remote_pts_us, UpstreamMediaKind::Microphone => &mut state.microphone_base_remote_pts_us, @@ -245,6 +274,21 @@ impl UpstreamMediaRuntime { local_pts_us = last_pts_us.saturating_add(min_step_us.max(1)); } *last_slot = Some(local_pts_us); + if upstream_timing_trace_enabled() + && (packet_count <= 10 || packet_count.is_multiple_of(300)) + { + let remote_elapsed_us = remote_pts_us.saturating_sub(first_remote_for_kind); + info!( + session_id, + ?kind, + packet_count, + remote_pts_us, + first_remote_for_kind, + remote_elapsed_us, + local_pts_us, + "upstream media rebase sample" + ); + } local_pts_us } }