// Shared live-capture clock shim for include-based client contracts.
//
// Scope: provide the subset of `client::live_capture_clock` needed by
// include tests that compile client modules inside `lesavka_testing`.
// Targets: client include-contract harnesses under `testing/tests/`.
// Why: include tests should exercise production modules without depending on
// the whole client crate module tree.

use std::sync::{Mutex, OnceLock};
use std::time::{Duration, Instant};

/// Lag cap used when `LESAVKA_UPSTREAM_SOURCE_LAG_CAP_MS` is unset or invalid.
const DEFAULT_SOURCE_LAG_CAP_MS: u64 = 250;
/// Lead cap used when `LESAVKA_UPSTREAM_SOURCE_LEAD_CAP_MS` is unset or invalid.
const DEFAULT_SOURCE_LEAD_CAP_MS: u64 = 80;

/// Returns the process-wide origin of the capture clock.
///
/// The origin is initialized to `Instant::now()` on first use and never
/// changes afterwards.
fn capture_clock_origin() -> &'static Instant {
    static ORIGIN: OnceLock<Instant> = OnceLock::new();
    ORIGIN.get_or_init(Instant::now)
}

/// Converts a duration to whole microseconds, saturating at `u64::MAX`
/// instead of silently truncating the 128-bit value.
fn saturating_micros(duration: Duration) -> u64 {
    duration.as_micros().min(u128::from(u64::MAX)) as u64
}

/// Reads a strictly positive millisecond count from the environment variable
/// `key`; falls back to `default_ms` when the variable is unset, not a valid
/// `u64` after trimming, or zero.
fn positive_millis_from_env(key: &str, default_ms: u64) -> Duration {
    let millis = std::env::var(key)
        .ok()
        .and_then(|raw| raw.trim().parse::<u64>().ok())
        .filter(|value| *value > 0)
        .unwrap_or(default_ms);
    Duration::from_millis(millis)
}

/// Clamps `packet_pts_us` into `[capture_now_us - max_lag, capture_now_us + lead cap]`.
///
/// Returns `(clamped_pts_us, lag_clamped, lead_clamped)`. The lead cap is read
/// from [`upstream_source_lead_cap`] on every call.
fn clamp_to_live_window(
    packet_pts_us: u64,
    capture_now_us: u64,
    max_lag: Duration,
) -> (u64, bool, bool) {
    let mut clamped_pts_us = packet_pts_us;

    let lag_floor_us = capture_now_us.saturating_sub(saturating_micros(max_lag));
    let lag_clamped = clamped_pts_us < lag_floor_us;
    if lag_clamped {
        clamped_pts_us = lag_floor_us;
    }

    let lead_ceiling_us =
        capture_now_us.saturating_add(saturating_micros(upstream_source_lead_cap()));
    let lead_clamped = clamped_pts_us > lead_ceiling_us;
    if lead_clamped {
        clamped_pts_us = lead_ceiling_us;
    }

    (clamped_pts_us, lag_clamped, lead_clamped)
}

/// Microseconds elapsed since the capture clock origin.
pub fn capture_pts_us() -> u64 {
    saturating_micros(capture_clock_origin().elapsed())
}

/// Age of a packet stamped with `pts_us` on the capture clock.
///
/// Timestamps that lie in the future yield a zero duration.
pub fn packet_age(pts_us: u64) -> Duration {
    Duration::from_micros(capture_pts_us().saturating_sub(pts_us))
}

/// Reports whether `LESAVKA_UPSTREAM_TIMING_TRACE` enables timing traces.
///
/// Returns `false` when the variable cannot be read. Otherwise any value is
/// treated as enabled unless, after trimming, it equals `0`, `false`, `no`
/// or `off` (ASCII case-insensitive).
pub fn upstream_timing_trace_enabled() -> bool {
    const DISABLED_VALUES: [&str; 4] = ["0", "false", "no", "off"];
    match std::env::var("LESAVKA_UPSTREAM_TIMING_TRACE") {
        Ok(value) => {
            let trimmed = value.trim();
            !DISABLED_VALUES
                .iter()
                .any(|disabled| trimmed.eq_ignore_ascii_case(disabled))
        }
        Err(_) => false,
    }
}

/// Maximum lag behind the capture clock, taken from
/// `LESAVKA_UPSTREAM_SOURCE_LAG_CAP_MS` (positive milliseconds) or the
/// 250 ms default.
pub fn upstream_source_lag_cap() -> Duration {
    positive_millis_from_env(
        "LESAVKA_UPSTREAM_SOURCE_LAG_CAP_MS",
        DEFAULT_SOURCE_LAG_CAP_MS,
    )
}

/// Maximum lead ahead of the capture clock, taken from
/// `LESAVKA_UPSTREAM_SOURCE_LEAD_CAP_MS` (positive milliseconds) or the
/// 80 ms default.
pub fn upstream_source_lead_cap() -> Duration {
    positive_millis_from_env(
        "LESAVKA_UPSTREAM_SOURCE_LEAD_CAP_MS",
        DEFAULT_SOURCE_LEAD_CAP_MS,
    )
}

/// Outcome of rebasing one source timestamp onto the capture clock.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct RebasedSourcePts {
    /// Timestamp assigned to the packet, in capture-clock microseconds.
    pub packet_pts_us: u64,
    /// Capture-clock reading taken when the rebase ran.
    pub capture_now_us: u64,
    /// Source timestamp supplied by the caller, if any.
    pub source_pts_us: Option<u64>,
    /// First source timestamp seen by the rebaser, if any.
    pub source_base_us: Option<u64>,
    /// Capture-clock reading paired with `source_base_us`, if any.
    pub capture_base_us: Option<u64>,
    /// `true` when a source timestamp was supplied and used for the rebase.
    pub used_source_pts: bool,
    /// `true` when the timestamp was raised to the lag floor.
    pub lag_clamped: bool,
    /// `true` when the timestamp was lowered to the lead ceiling.
    pub lead_clamped: bool,
}

#[derive(Debug, Default)]
struct SourcePtsRebaserState {
    source_base_us: Option<u64>,
    capture_base_us: Option<u64>,
    last_packet_pts_us: Option<u64>,
}

/// Maps source timestamps onto the capture clock, anchored at the first
/// source timestamp it sees, and keeps the output strictly increasing.
#[derive(Debug, Default)]
pub struct SourcePtsRebaser {
    state: Mutex<SourcePtsRebaserState>,
}

#[derive(Debug, Default)]
struct DurationPacedSourcePtsState {
    next_packet_pts_us: Option<u64>,
    last_packet_pts_us: Option<u64>,
}

/// Rebaser that, after the first packet, advances timestamps by the packet
/// duration rather than by the source timestamp delta.
#[derive(Debug, Default)]
pub struct DurationPacedSourcePtsRebaser {
    anchor_rebaser: SourcePtsRebaser,
    state: Mutex<DurationPacedSourcePtsState>,
}

impl SourcePtsRebaser {
    /// Rebases `source_pts_us` without any lag or lead clamping; falls back
    /// to the current capture time when no source timestamp is given.
    pub fn rebase_or_now(&self, source_pts_us: Option<u64>, min_step_us: u64) -> RebasedSourcePts {
        self.rebase_with_lag_cap(source_pts_us, min_step_us, None)
    }

    /// Rebases `source_pts_us` onto the capture clock.
    ///
    /// * With a source timestamp, the result is the capture base plus the
    ///   (saturating) offset from the source base; both bases are latched on
    ///   the first call that carries a source timestamp.
    /// * With `max_lag` set and a source timestamp present, the result is
    ///   clamped to the live window (lag floor, then lead ceiling).
    /// * A result not greater than the previous one is replaced by the
    ///   previous timestamp plus `max(min_step_us, 1)`.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn rebase_with_lag_cap(
        &self,
        source_pts_us: Option<u64>,
        min_step_us: u64,
        max_lag: Option<Duration>,
    ) -> RebasedSourcePts {
        let capture_now_us = capture_pts_us();
        let mut state = self
            .state
            .lock()
            .expect("source pts rebaser mutex poisoned");

        let used_source_pts = source_pts_us.is_some();
        let mut packet_pts_us = capture_now_us;
        let mut lag_clamped = false;
        let mut lead_clamped = false;

        if let Some(source_pts_us) = source_pts_us {
            let source_base_us = *state.source_base_us.get_or_insert(source_pts_us);
            let capture_base_us = *state.capture_base_us.get_or_insert(capture_now_us);
            packet_pts_us =
                capture_base_us.saturating_add(source_pts_us.saturating_sub(source_base_us));

            // Clamping only applies to timestamps derived from the source clock.
            if let Some(max_lag) = max_lag {
                let (clamped_pts_us, lag, lead) =
                    clamp_to_live_window(packet_pts_us, capture_now_us, max_lag);
                packet_pts_us = clamped_pts_us;
                lag_clamped = lag;
                lead_clamped = lead;
            }
        }

        // Enforce strictly increasing output, stepping by at least 1 us.
        let not_ahead_of = state
            .last_packet_pts_us
            .filter(|&last_packet_pts_us| packet_pts_us <= last_packet_pts_us);
        if let Some(last_packet_pts_us) = not_ahead_of {
            packet_pts_us = last_packet_pts_us.saturating_add(min_step_us.max(1));
        }
        state.last_packet_pts_us = Some(packet_pts_us);

        RebasedSourcePts {
            packet_pts_us,
            capture_now_us,
            source_pts_us,
            source_base_us: state.source_base_us,
            capture_base_us: state.capture_base_us,
            used_source_pts,
            lag_clamped,
            lead_clamped,
        }
    }
}

impl DurationPacedSourcePtsRebaser {
    /// Rebases a packet and paces it by `packet_duration_us`.
    ///
    /// The anchor rebaser is always advanced. The first packet takes the
    /// anchor's timestamp; later packets take the previously scheduled
    /// "next" timestamp (previous output plus `max(packet_duration_us, 1)`).
    /// The candidate is then clamped to the live window and forced to be
    /// strictly greater than the previous output.
    ///
    /// # Panics
    ///
    /// Panics if either internal mutex is poisoned.
    pub fn rebase_with_packet_duration(
        &self,
        source_pts_us: Option<u64>,
        packet_duration_us: u64,
        max_lag: Duration,
    ) -> RebasedSourcePts {
        let step_us = packet_duration_us.max(1);
        let mut rebased =
            self.anchor_rebaser
                .rebase_with_lag_cap(source_pts_us, step_us, Some(max_lag));

        let mut state = self
            .state
            .lock()
            .expect("duration paced source pts rebaser mutex poisoned");

        let scheduled_pts_us = state.next_packet_pts_us.unwrap_or(rebased.packet_pts_us);
        let (mut packet_pts_us, lag_clamped, lead_clamped) =
            clamp_to_live_window(scheduled_pts_us, rebased.capture_now_us, max_lag);
        // Flags already raised by the anchor rebaser are kept, never cleared.
        rebased.lag_clamped |= lag_clamped;
        rebased.lead_clamped |= lead_clamped;

        let not_ahead_of = state
            .last_packet_pts_us
            .filter(|&last_packet_pts_us| packet_pts_us <= last_packet_pts_us);
        if let Some(last_packet_pts_us) = not_ahead_of {
            packet_pts_us = last_packet_pts_us.saturating_add(1);
        }

        state.last_packet_pts_us = Some(packet_pts_us);
        state.next_packet_pts_us = Some(packet_pts_us.saturating_add(step_us));
        rebased.packet_pts_us = packet_pts_us;
        rebased
    }
}

#[cfg(test)]
mod tests {
    #[test]
    fn shim_rebases_packet_duration_monotonically() {
        let rebaser = super::DurationPacedSourcePtsRebaser::default();
        let first = rebaser.rebase_with_packet_duration(
            Some(1_000),
            10_000,
            std::time::Duration::from_millis(250),
        );
        let second = rebaser.rebase_with_packet_duration(
            Some(2_000),
            10_000,
            std::time::Duration::from_millis(250),
        );
        assert!(second.packet_pts_us > first.packet_pts_us);
    }
}