From f8e53d596975e8da78164c48a7cbb89209bfe03c Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Mon, 27 Apr 2026 02:13:19 -0300 Subject: [PATCH] fix(sync): keep reanchor inside startup window --- Cargo.lock | 6 ++--- client/Cargo.toml | 2 +- common/Cargo.toml | 2 +- server/Cargo.toml | 2 +- server/src/upstream_media_runtime.rs | 10 +++++++- server/src/upstream_media_runtime/tests.rs | 30 ++++++++++++++++++++++ 6 files changed, 45 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 73d234e..3f4af52 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1642,7 +1642,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.14.11" +version = "0.14.12" dependencies = [ "anyhow", "async-stream", @@ -1676,7 +1676,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.14.11" +version = "0.14.12" dependencies = [ "anyhow", "base64", @@ -1688,7 +1688,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.14.11" +version = "0.14.12" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index 9fd0921..ae70c4a 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.14.11" +version = "0.14.12" edition = "2024" [dependencies] diff --git a/common/Cargo.toml b/common/Cargo.toml index 3d56f96..01fad29 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.14.11" +version = "0.14.12" edition = "2024" build = "build.rs" diff --git a/server/Cargo.toml b/server/Cargo.toml index b0fd3d3..ef7e516 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.14.11" +version = "0.14.12" edition = "2024" autobins = false diff --git a/server/src/upstream_media_runtime.rs b/server/src/upstream_media_runtime.rs index b811ba2..8957ca0 100644 --- a/server/src/upstream_media_runtime.rs +++ b/server/src/upstream_media_runtime.rs @@ -24,6 +24,10 @@ fn upstream_camera_startup_grace_us() -> u64 { .saturating_mul(1_000) } +fn upstream_reanchor_window_us(playout_delay: Duration) -> u64 { + playout_delay.as_micros().min(u64::MAX as u128) as u64 +} + /// Logical upstream media kinds that share one live-call session timeline. #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub enum UpstreamMediaKind { @@ -468,7 +472,11 @@ impl UpstreamMediaRuntime { apply_playout_offset(epoch + Duration::from_micros(local_pts_us), sink_offset_us); let mut late_by = now.checked_duration_since(due_at).unwrap_or_default(); let reanchor_threshold = upstream_reanchor_late_threshold(playout_delay); - if !state.catastrophic_reanchor_done && late_by > reanchor_threshold { + let reanchor_window_us = upstream_reanchor_window_us(playout_delay); + if !state.catastrophic_reanchor_done + && local_pts_us <= reanchor_window_us + && late_by > reanchor_threshold + { let old_late_by = late_by; let desired_due_at = now + playout_delay; let unoffset_due_at = apply_playout_offset(desired_due_at, -sink_offset_us); diff --git a/server/src/upstream_media_runtime/tests.rs b/server/src/upstream_media_runtime/tests.rs index 4fb0966..91bfd13 100644 --- a/server/src/upstream_media_runtime/tests.rs +++ b/server/src/upstream_media_runtime/tests.rs @@ -457,6 +457,36 @@ fn catastrophic_lateness_reanchors_only_once_per_session() { }); } +#[test] +fn catastrophic_lateness_does_not_reanchor_once_the_session_is_well_past_startup() { + temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("20"), || { + temp_env::with_var("LESAVKA_UPSTREAM_REANCHOR_LATE_MS", Some("5"), || { + let runtime = UpstreamMediaRuntime::new(); + let _camera = runtime.activate_camera(); + let _microphone = runtime.activate_microphone(); + + assert!(matches!( + runtime.plan_video_pts(1_000_000, 16_666), + super::UpstreamPlanDecision::AwaitingPair + )); + let _audio_first = play(runtime.plan_audio_pts(1_000_000)); + let _video_first = play(runtime.plan_video_pts(1_000_000, 16_666)); + std::thread::sleep(Duration::from_millis(130)); + + let late_audio = play(runtime.plan_audio_pts(1_100_000)); + assert_eq!(late_audio.local_pts_us, 100_000); + assert!( + late_audio.late_by > Duration::from_millis(5), + "late packet should remain late instead of reanchoring the shared epoch mid-session" + ); + assert!( + late_audio.due_at <= tokio::time::Instant::now(), + "mid-session lateness should no longer push due_at back into the future" + ); + }); + }); +} + #[tokio::test(flavor = "current_thread")] async fn wait_for_audio_master_releases_video_once_audio_catches_up() { let runtime = Arc::new(UpstreamMediaRuntime::new());