#![forbid(unsafe_code)] use std::sync::{Mutex, OnceLock}; use std::time::{Duration, Instant}; static CAPTURE_ORIGIN: OnceLock = OnceLock::new(); static SHARED_SOURCE_CAPTURE_BASE_US: OnceLock>> = OnceLock::new(); const DEFAULT_SOURCE_LAG_CAP_MS: u64 = 250; fn origin() -> Instant { *CAPTURE_ORIGIN.get_or_init(Instant::now) } fn shared_source_capture_base_slot() -> &'static Mutex> { SHARED_SOURCE_CAPTURE_BASE_US.get_or_init(|| Mutex::new(None)) } /// Return the shared live-capture timestamp for upstream camera/mic packets. /// /// Inputs: none. /// Outputs: microseconds elapsed since the relay child first stamped live media. /// Why: camera and microphone capture pipelines run independently, so they need /// one explicit common origin before the server can keep them on the same live /// call timeline. #[must_use] pub fn capture_pts_us() -> u64 { origin().elapsed().as_micros().min(u64::MAX as u128) as u64 } /// Measure how old one shared capture timestamp is right now. /// /// Inputs: a packet timestamp previously produced by `capture_pts_us`. /// Outputs: the elapsed age as a `Duration`. /// Why: upstream freshness telemetry should use the same shared live clock as /// packet timestamps so queue-age calculations stay honest. #[must_use] pub fn packet_age(pts_us: u64) -> Duration { Duration::from_micros(capture_pts_us().saturating_sub(pts_us)) } /// Decide whether extra upstream timing instrumentation should be emitted. /// /// Inputs: none. /// Outputs: `true` when detailed capture/rebase timing logs are enabled. /// Why: A/V sync work needs bursts of deep timing visibility without leaving /// noisy logs on during normal live operation. 
#[must_use]
pub fn upstream_timing_trace_enabled() -> bool {
    // Values that explicitly switch the trace off, matched case-insensitively.
    const DISABLED_VALUES: [&str; 4] = ["0", "false", "no", "off"];

    // An unset or non-UTF-8 variable leaves the trace off.
    std::env::var("LESAVKA_UPSTREAM_TIMING_TRACE").is_ok_and(|value| {
        let value = value.trim();
        // A blank value (`VAR=`) is a common way to clear a flag, so it must
        // not switch the noisy trace on.
        !value.is_empty()
            && !DISABLED_VALUES
                .iter()
                .any(|disabled| value.eq_ignore_ascii_case(disabled))
    })
}

/// Cap how far source-derived packet timestamps may trail the live capture clock.
///
/// Inputs: none.
/// Outputs: the maximum tolerated lag between a rebased source PTS and the
/// current capture clock. `LESAVKA_UPSTREAM_SOURCE_LAG_CAP_MS` overrides the
/// default when it holds a positive integer number of milliseconds; unset,
/// unparsable, or zero values fall back to `DEFAULT_SOURCE_LAG_CAP_MS`.
/// Why: encoded appsink buffers can emerge well after the raw capture moment,
/// and trusting those delayed buffer PTS values without a guard can make the
/// server believe fresh audio/video packets are already hopelessly stale.
#[must_use]
pub fn upstream_source_lag_cap() -> Duration {
    let override_ms = std::env::var("LESAVKA_UPSTREAM_SOURCE_LAG_CAP_MS")
        .ok()
        .and_then(|raw| raw.trim().parse::<u64>().ok())
        .filter(|value| *value > 0);
    Duration::from_millis(override_ms.unwrap_or(DEFAULT_SOURCE_LAG_CAP_MS))
}

/// Mutable bookkeeping behind one `SourcePtsRebaser`.
#[derive(Debug, Default)]
struct SourcePtsRebaserState {
    /// First source PTS seen by this rebaser; later source PTS values are
    /// measured relative to it.
    source_base_us: Option<u64>,
    /// Shared capture base most recently read on behalf of this stream.
    capture_base_us: Option<u64>,
    /// Last packet timestamp handed out, used to keep the output increasing.
    last_packet_pts_us: Option<u64>,
}

/// Rebase source-buffer timestamps onto the shared client capture clock.
///
/// Inputs: optional source PTS values from one live capture pipeline.
/// Outputs: packet timestamps that share the same client clock origin across
/// camera and microphone while still advancing based on source timing rather
/// than late appsink pull time.
/// Why: camera and microphone encode paths can add different queue/encode
/// delays before appsink pull, so stamping at pull time bakes skew into the
/// packets before the server ever sees them.
#[derive(Debug, Default)]
pub struct SourcePtsRebaser {
    state: Mutex<SourcePtsRebaserState>,
}

/// Snapshot of one client-side timestamp rebasing decision.
#[derive(Clone, Copy, Debug, Eq, PartialEq)] pub struct RebasedSourcePts { pub packet_pts_us: u64, pub capture_now_us: u64, pub source_pts_us: Option, pub source_base_us: Option, pub capture_base_us: Option, pub used_source_pts: bool, pub lag_clamped: bool, } #[derive(Debug, Default)] struct DurationPacedSourcePtsState { next_packet_pts_us: Option, } /// Rebase encoded packet timing by anchoring once, then pacing by duration. /// /// Inputs: optional source PTS from one encoded packet stream plus the packet's /// declared duration and a freshness lag cap. /// Outputs: packet timestamps on the shared client capture clock that advance /// by actual media duration instead of trusting potentially stretched parser /// PTS on every packet. /// Why: encoded audio parsers can emit packet PTS values that do not track /// real packet duration faithfully, which can make the server pace audio far /// too slowly or quickly even when the underlying capture stream is healthy. #[derive(Debug, Default)] pub struct DurationPacedSourcePtsRebaser { anchor_rebaser: SourcePtsRebaser, state: Mutex, } impl SourcePtsRebaser { /// Translate one source-buffer timestamp onto the shared capture clock. /// /// Inputs: the buffer PTS if available plus the minimum monotonic step. /// Outputs: a rebased packet timestamp and the values used to derive it. /// Why: source PTS should drive packet timing when available, but packets /// must still remain monotonic even if buffers repeat or arrive oddly. #[must_use] pub fn rebase_or_now(&self, source_pts_us: Option, min_step_us: u64) -> RebasedSourcePts { self.rebase_with_lag_cap(source_pts_us, min_step_us, None) } /// Translate one source-buffer timestamp onto the shared capture clock /// while bounding how stale that source-derived timestamp may become. /// /// Inputs: optional source PTS, minimum monotonic step, and an optional /// maximum lag behind the current capture clock. /// Outputs: a rebased packet timestamp plus details about any lag clamp. 
/// Why: encoder/parser queues can batch source buffers, so a pure /// source-PTS timeline may fall far behind real packet availability and /// poison server-side freshness calculations. #[must_use] pub fn rebase_with_lag_cap( &self, source_pts_us: Option, min_step_us: u64, max_lag: Option, ) -> RebasedSourcePts { let capture_now_us = capture_pts_us(); let mut state = self .state .lock() .expect("source pts rebaser mutex poisoned"); let mut packet_pts_us = capture_now_us; let mut used_source_pts = false; let mut lag_clamped = false; if let Some(source_pts_us) = source_pts_us { let source_base_us = *state.source_base_us.get_or_insert(source_pts_us); let capture_base_us = { let mut shared_capture_base_us = shared_source_capture_base_slot() .lock() .expect("shared source capture base mutex poisoned"); *shared_capture_base_us.get_or_insert(capture_now_us) }; state.capture_base_us = Some(capture_base_us); packet_pts_us = capture_base_us.saturating_add(source_pts_us.saturating_sub(source_base_us)); used_source_pts = true; } if used_source_pts && let Some(max_lag) = max_lag { let lag_floor_us = capture_now_us.saturating_sub(max_lag.as_micros().min(u64::MAX as u128) as u64); if packet_pts_us < lag_floor_us { packet_pts_us = lag_floor_us; lag_clamped = true; } } if let Some(last_packet_pts_us) = state.last_packet_pts_us && packet_pts_us <= last_packet_pts_us { packet_pts_us = last_packet_pts_us.saturating_add(min_step_us.max(1)); } state.last_packet_pts_us = Some(packet_pts_us); RebasedSourcePts { packet_pts_us, capture_now_us, source_pts_us, source_base_us: state.source_base_us, capture_base_us: state.capture_base_us, used_source_pts, lag_clamped, } } } impl DurationPacedSourcePtsRebaser { /// Rebase one encoded packet onto the shared capture clock. /// /// Inputs: optional packet PTS, the packet media duration in microseconds, /// and a freshness lag cap behind the live capture clock. /// Outputs: a rebased packet timestamp plus the values used to derive it. 
/// Why: once the first encoded packet is anchored, the safest pacing signal /// for compressed audio is its actual packet duration, with a live lag /// clamp to keep delayed batches from resurrecting stale timing. #[must_use] pub fn rebase_with_packet_duration( &self, source_pts_us: Option, packet_duration_us: u64, max_lag: Duration, ) -> RebasedSourcePts { let step_us = packet_duration_us.max(1); let mut rebased = self.anchor_rebaser .rebase_with_lag_cap(source_pts_us, step_us, Some(max_lag)); let lag_floor_us = rebased .capture_now_us .saturating_sub(max_lag.as_micros().min(u64::MAX as u128) as u64); let mut state = self .state .lock() .expect("duration paced source pts rebaser mutex poisoned"); let mut packet_pts_us = state.next_packet_pts_us.unwrap_or(rebased.packet_pts_us); if packet_pts_us < lag_floor_us { packet_pts_us = lag_floor_us; rebased.lag_clamped = true; } state.next_packet_pts_us = Some(packet_pts_us.saturating_add(step_us)); rebased.packet_pts_us = packet_pts_us; rebased } } #[cfg(test)] mod tests { use super::{ DurationPacedSourcePtsRebaser, SourcePtsRebaser, capture_pts_us, packet_age, upstream_source_lag_cap, upstream_timing_trace_enabled, }; use serial_test::serial; use std::time::Duration; fn reset_shared_source_capture_base_for_tests() { *super::shared_source_capture_base_slot() .lock() .expect("shared source capture base mutex poisoned") = None; } #[test] #[serial] fn capture_pts_us_monotonically_advances() { reset_shared_source_capture_base_for_tests(); let first = capture_pts_us(); std::thread::sleep(Duration::from_millis(2)); let second = capture_pts_us(); assert!(second >= first); } #[test] #[serial] fn packet_age_is_small_for_recent_packets() { reset_shared_source_capture_base_for_tests(); let pts = capture_pts_us(); std::thread::sleep(Duration::from_millis(2)); let age = packet_age(pts); assert!(age >= Duration::from_millis(1)); assert!(age < Duration::from_secs(1)); } #[test] #[serial] fn 
source_pts_rebaser_preserves_source_delta_on_shared_capture_clock() { reset_shared_source_capture_base_for_tests(); let rebased = SourcePtsRebaser::default(); let first = rebased.rebase_or_now(Some(1_000_000), 1); let second = rebased.rebase_or_now(Some(1_033_333), 1); assert!(first.used_source_pts); assert_eq!( second.packet_pts_us.saturating_sub(first.packet_pts_us), 33_333 ); assert_eq!(first.source_base_us, Some(1_000_000)); assert_eq!(second.source_base_us, Some(1_000_000)); assert_eq!(first.capture_base_us, second.capture_base_us); } #[test] #[serial] fn source_pts_rebaser_stays_monotonic_when_source_pts_repeat() { reset_shared_source_capture_base_for_tests(); let rebased = SourcePtsRebaser::default(); let first = rebased.rebase_or_now(Some(50_000), 1); let second = rebased.rebase_or_now(Some(50_000), 1); assert_eq!(second.packet_pts_us, first.packet_pts_us + 1); } #[test] #[serial] fn source_pts_rebaser_falls_back_to_capture_clock_without_source_pts() { reset_shared_source_capture_base_for_tests(); let rebased = SourcePtsRebaser::default(); let first = rebased.rebase_or_now(None, 1); std::thread::sleep(Duration::from_millis(2)); let second = rebased.rebase_or_now(None, 1); assert!(!first.used_source_pts); assert!(!second.used_source_pts); assert!(second.packet_pts_us > first.packet_pts_us); assert!(!first.lag_clamped); assert!(!second.lag_clamped); } #[test] #[serial] fn source_pts_rebaser_clamps_source_lag_when_it_falls_too_far_behind_now() { reset_shared_source_capture_base_for_tests(); let rebased = SourcePtsRebaser::default(); let _first = rebased.rebase_with_lag_cap(Some(1_000_000), 1, None); std::thread::sleep(Duration::from_millis(8)); let second = rebased.rebase_with_lag_cap(Some(1_000_001), 1, Some(Duration::from_millis(2))); assert!(second.used_source_pts); assert!(second.lag_clamped); assert!(second.capture_now_us >= second.packet_pts_us); assert!(second.capture_now_us - second.packet_pts_us <= 2_500); } #[test] #[serial] fn 
source_pts_rebasers_share_one_capture_base_across_streams() { reset_shared_source_capture_base_for_tests(); let microphone = SourcePtsRebaser::default(); let camera = SourcePtsRebaser::default(); let first_microphone = microphone.rebase_or_now(Some(80_000), 1); std::thread::sleep(Duration::from_millis(5)); let first_camera = camera.rebase_or_now(Some(435_000), 1); assert_eq!(first_microphone.capture_base_us, first_camera.capture_base_us); assert_eq!(first_microphone.packet_pts_us, first_camera.packet_pts_us); assert_eq!(first_microphone.source_base_us, Some(80_000)); assert_eq!(first_camera.source_base_us, Some(435_000)); } #[test] #[serial] fn upstream_timing_trace_flag_defaults_off_and_accepts_true_values() { reset_shared_source_capture_base_for_tests(); temp_env::with_var_unset("LESAVKA_UPSTREAM_TIMING_TRACE", || { assert!(!upstream_timing_trace_enabled()); }); temp_env::with_var("LESAVKA_UPSTREAM_TIMING_TRACE", Some("1"), || { assert!(upstream_timing_trace_enabled()); }); temp_env::with_var("LESAVKA_UPSTREAM_TIMING_TRACE", Some("false"), || { assert!(!upstream_timing_trace_enabled()); }); } #[test] #[serial] fn upstream_source_lag_cap_defaults_and_accepts_override() { reset_shared_source_capture_base_for_tests(); temp_env::with_var_unset("LESAVKA_UPSTREAM_SOURCE_LAG_CAP_MS", || { assert_eq!(upstream_source_lag_cap(), Duration::from_millis(250)); }); temp_env::with_var("LESAVKA_UPSTREAM_SOURCE_LAG_CAP_MS", Some("90"), || { assert_eq!(upstream_source_lag_cap(), Duration::from_millis(90)); }); } #[test] #[serial] fn duration_paced_rebaser_advances_by_packet_duration_when_source_pts_stretch() { reset_shared_source_capture_base_for_tests(); let rebased = DurationPacedSourcePtsRebaser::default(); let first = rebased.rebase_with_packet_duration(Some(0), 21_333, Duration::from_millis(250)); let second = rebased.rebase_with_packet_duration(Some(52_666), 21_333, Duration::from_millis(250)); assert_eq!( second.packet_pts_us.saturating_sub(first.packet_pts_us), 21_333 ); } 
#[test] #[serial] fn duration_paced_rebaser_clamps_when_duration_pacing_falls_stale() { reset_shared_source_capture_base_for_tests(); let rebased = DurationPacedSourcePtsRebaser::default(); let _first = rebased.rebase_with_packet_duration(Some(0), 10_000, Duration::from_millis(2)); std::thread::sleep(Duration::from_millis(8)); let second = rebased.rebase_with_packet_duration(Some(10_000), 10_000, Duration::from_millis(2)); assert!( second.packet_pts_us.saturating_add(2_500) >= second.capture_now_us, "duration-paced packet pts should never trail live capture by more than the lag cap" ); } }