2026-04-29 01:25:06 -03:00
|
|
|
// Shared live-capture clock shim for include-based client contracts.
|
|
|
|
|
//
|
|
|
|
|
// Scope: provide the subset of `client::live_capture_clock` needed by
|
|
|
|
|
// include tests that compile client modules inside `lesavka_testing`.
|
|
|
|
|
// Targets: client include-contract harnesses under `testing/tests/`.
|
|
|
|
|
// Why: include tests should exercise production modules without depending on
|
|
|
|
|
// the whole client crate module tree.
|
|
|
|
|
|
2026-04-25 16:48:20 -03:00
|
|
|
use std::sync::{Mutex, OnceLock};
|
|
|
|
|
use std::time::{Duration, Instant};
|
|
|
|
|
|
2026-04-29 01:25:06 -03:00
|
|
|
const DEFAULT_SOURCE_LAG_CAP_MS: u64 = 250;
|
|
|
|
|
|
2026-04-25 16:48:20 -03:00
|
|
|
fn capture_clock_origin() -> &'static Instant {
|
|
|
|
|
static ORIGIN: OnceLock<Instant> = OnceLock::new();
|
|
|
|
|
ORIGIN.get_or_init(Instant::now)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn capture_pts_us() -> u64 {
|
|
|
|
|
capture_clock_origin().elapsed().as_micros() as u64
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-29 01:25:06 -03:00
|
|
|
pub fn packet_age(pts_us: u64) -> Duration {
|
|
|
|
|
Duration::from_micros(capture_pts_us().saturating_sub(pts_us))
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-25 16:48:20 -03:00
|
|
|
pub fn upstream_timing_trace_enabled() -> bool {
|
|
|
|
|
std::env::var("LESAVKA_UPSTREAM_TIMING_TRACE")
|
|
|
|
|
.ok()
|
|
|
|
|
.map(|value| {
|
|
|
|
|
let trimmed = value.trim();
|
|
|
|
|
!(trimmed.eq_ignore_ascii_case("0")
|
|
|
|
|
|| trimmed.eq_ignore_ascii_case("false")
|
|
|
|
|
|| trimmed.eq_ignore_ascii_case("no")
|
|
|
|
|
|| trimmed.eq_ignore_ascii_case("off"))
|
|
|
|
|
})
|
|
|
|
|
.unwrap_or(false)
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-29 01:25:06 -03:00
|
|
|
pub fn upstream_source_lag_cap() -> Duration {
|
|
|
|
|
std::env::var("LESAVKA_UPSTREAM_SOURCE_LAG_CAP_MS")
|
|
|
|
|
.ok()
|
|
|
|
|
.and_then(|raw| raw.trim().parse::<u64>().ok())
|
|
|
|
|
.filter(|value| *value > 0)
|
|
|
|
|
.map(Duration::from_millis)
|
|
|
|
|
.unwrap_or_else(|| Duration::from_millis(DEFAULT_SOURCE_LAG_CAP_MS))
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-25 16:48:20 -03:00
|
|
|
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
|
|
|
|
|
pub struct RebasedSourcePts {
|
|
|
|
|
pub packet_pts_us: u64,
|
|
|
|
|
pub capture_now_us: u64,
|
|
|
|
|
pub source_pts_us: Option<u64>,
|
|
|
|
|
pub source_base_us: Option<u64>,
|
|
|
|
|
pub capture_base_us: Option<u64>,
|
|
|
|
|
pub used_source_pts: bool,
|
2026-04-29 01:25:06 -03:00
|
|
|
pub lag_clamped: bool,
|
2026-04-25 16:48:20 -03:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Debug, Default)]
|
|
|
|
|
struct SourcePtsRebaserState {
|
|
|
|
|
source_base_us: Option<u64>,
|
|
|
|
|
capture_base_us: Option<u64>,
|
|
|
|
|
last_packet_pts_us: Option<u64>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Debug, Default)]
|
|
|
|
|
pub struct SourcePtsRebaser {
|
|
|
|
|
state: Mutex<SourcePtsRebaserState>,
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-29 01:25:06 -03:00
|
|
|
#[derive(Debug, Default)]
|
|
|
|
|
struct DurationPacedSourcePtsState {
|
|
|
|
|
next_packet_pts_us: Option<u64>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[derive(Debug, Default)]
|
|
|
|
|
pub struct DurationPacedSourcePtsRebaser {
|
|
|
|
|
anchor_rebaser: SourcePtsRebaser,
|
|
|
|
|
state: Mutex<DurationPacedSourcePtsState>,
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-25 16:48:20 -03:00
|
|
|
impl SourcePtsRebaser {
|
|
|
|
|
pub fn rebase_or_now(&self, source_pts_us: Option<u64>, min_step_us: u64) -> RebasedSourcePts {
|
2026-04-29 01:25:06 -03:00
|
|
|
self.rebase_with_lag_cap(source_pts_us, min_step_us, None)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn rebase_with_lag_cap(
|
|
|
|
|
&self,
|
|
|
|
|
source_pts_us: Option<u64>,
|
|
|
|
|
min_step_us: u64,
|
|
|
|
|
max_lag: Option<Duration>,
|
|
|
|
|
) -> RebasedSourcePts {
|
2026-04-25 16:48:20 -03:00
|
|
|
let capture_now_us = capture_pts_us();
|
|
|
|
|
let mut state = self
|
|
|
|
|
.state
|
|
|
|
|
.lock()
|
|
|
|
|
.expect("source pts rebaser mutex poisoned");
|
|
|
|
|
let mut packet_pts_us = capture_now_us;
|
|
|
|
|
let mut used_source_pts = false;
|
2026-04-29 01:25:06 -03:00
|
|
|
let mut lag_clamped = false;
|
2026-04-25 16:48:20 -03:00
|
|
|
|
|
|
|
|
if let Some(source_pts_us) = source_pts_us {
|
|
|
|
|
let source_base_us = *state.source_base_us.get_or_insert(source_pts_us);
|
|
|
|
|
let capture_base_us = *state.capture_base_us.get_or_insert(capture_now_us);
|
|
|
|
|
packet_pts_us =
|
|
|
|
|
capture_base_us.saturating_add(source_pts_us.saturating_sub(source_base_us));
|
|
|
|
|
used_source_pts = true;
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-29 01:25:06 -03:00
|
|
|
if used_source_pts && let Some(max_lag) = max_lag {
|
|
|
|
|
let lag_floor_us =
|
|
|
|
|
capture_now_us.saturating_sub(max_lag.as_micros().min(u64::MAX as u128) as u64);
|
|
|
|
|
if packet_pts_us < lag_floor_us {
|
|
|
|
|
packet_pts_us = lag_floor_us;
|
|
|
|
|
lag_clamped = true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-25 16:48:20 -03:00
|
|
|
if let Some(last_packet_pts_us) = state.last_packet_pts_us
|
|
|
|
|
&& packet_pts_us <= last_packet_pts_us
|
|
|
|
|
{
|
|
|
|
|
packet_pts_us = last_packet_pts_us.saturating_add(min_step_us.max(1));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
state.last_packet_pts_us = Some(packet_pts_us);
|
|
|
|
|
RebasedSourcePts {
|
|
|
|
|
packet_pts_us,
|
|
|
|
|
capture_now_us,
|
|
|
|
|
source_pts_us,
|
|
|
|
|
source_base_us: state.source_base_us,
|
|
|
|
|
capture_base_us: state.capture_base_us,
|
|
|
|
|
used_source_pts,
|
2026-04-29 01:25:06 -03:00
|
|
|
lag_clamped,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl DurationPacedSourcePtsRebaser {
|
|
|
|
|
pub fn rebase_with_packet_duration(
|
|
|
|
|
&self,
|
|
|
|
|
source_pts_us: Option<u64>,
|
|
|
|
|
packet_duration_us: u64,
|
|
|
|
|
max_lag: Duration,
|
|
|
|
|
) -> RebasedSourcePts {
|
|
|
|
|
let step_us = packet_duration_us.max(1);
|
|
|
|
|
let mut rebased =
|
|
|
|
|
self.anchor_rebaser
|
|
|
|
|
.rebase_with_lag_cap(source_pts_us, step_us, Some(max_lag));
|
|
|
|
|
let lag_floor_us = rebased
|
|
|
|
|
.capture_now_us
|
|
|
|
|
.saturating_sub(max_lag.as_micros().min(u64::MAX as u128) as u64);
|
|
|
|
|
let mut state = self
|
|
|
|
|
.state
|
|
|
|
|
.lock()
|
|
|
|
|
.expect("duration paced source pts rebaser mutex poisoned");
|
|
|
|
|
let mut packet_pts_us = state.next_packet_pts_us.unwrap_or(rebased.packet_pts_us);
|
|
|
|
|
if packet_pts_us < lag_floor_us {
|
|
|
|
|
packet_pts_us = lag_floor_us;
|
|
|
|
|
rebased.lag_clamped = true;
|
2026-04-25 16:48:20 -03:00
|
|
|
}
|
2026-04-29 01:25:06 -03:00
|
|
|
state.next_packet_pts_us = Some(packet_pts_us.saturating_add(step_us));
|
|
|
|
|
rebased.packet_pts_us = packet_pts_us;
|
|
|
|
|
rebased
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[cfg(test)]
|
|
|
|
|
mod tests {
|
|
|
|
|
#[test]
|
|
|
|
|
fn shim_rebases_packet_duration_monotonically() {
|
|
|
|
|
let rebaser = super::DurationPacedSourcePtsRebaser::default();
|
|
|
|
|
let first = rebaser.rebase_with_packet_duration(
|
|
|
|
|
Some(1_000),
|
|
|
|
|
10_000,
|
|
|
|
|
std::time::Duration::from_millis(250),
|
|
|
|
|
);
|
|
|
|
|
let second = rebaser.rebase_with_packet_duration(
|
|
|
|
|
Some(2_000),
|
|
|
|
|
10_000,
|
|
|
|
|
std::time::Duration::from_millis(250),
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
assert!(second.packet_pts_us > first.packet_pts_us);
|
2026-04-25 16:48:20 -03:00
|
|
|
}
|
|
|
|
|
}
|