From 7116306662a846a714183358800e99405e731183 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Sun, 26 Apr 2026 01:02:27 -0300 Subject: [PATCH] fix(sync): reanchor upstream playout only once --- server/src/upstream_media_runtime.rs | 5 +++- server/src/upstream_media_runtime/state.rs | 1 + server/src/upstream_media_runtime/tests.rs | 29 ++++++++++++++++++++++ 3 files changed, 34 insertions(+), 1 deletion(-) diff --git a/server/src/upstream_media_runtime.rs b/server/src/upstream_media_runtime.rs index 32f4e67..f4d1204 100644 --- a/server/src/upstream_media_runtime.rs +++ b/server/src/upstream_media_runtime.rs @@ -151,6 +151,7 @@ impl UpstreamMediaRuntime { state.startup_anchor_logged = false; state.playout_epoch = None; state.pairing_anchor_deadline = None; + state.catastrophic_reanchor_done = false; } match kind { UpstreamMediaKind::Camera => state.active_camera_generation = Some(generation), @@ -223,6 +224,7 @@ impl UpstreamMediaRuntime { state.startup_anchor_logged = false; state.playout_epoch = None; state.pairing_anchor_deadline = None; + state.catastrophic_reanchor_done = false; } self.pairing_state_notify.notify_waiters(); self.audio_progress_notify.notify_waiters(); @@ -421,7 +423,7 @@ impl UpstreamMediaRuntime { apply_playout_offset(epoch + Duration::from_micros(local_pts_us), sink_offset_us); let mut late_by = now.checked_duration_since(due_at).unwrap_or_default(); let reanchor_threshold = upstream_reanchor_late_threshold(playout_delay); - if late_by > reanchor_threshold { + if !state.catastrophic_reanchor_done && late_by > reanchor_threshold { let old_late_by = late_by; let desired_due_at = now + playout_delay; let unoffset_due_at = apply_playout_offset(desired_due_at, -sink_offset_us); @@ -430,6 +432,7 @@ impl UpstreamMediaRuntime { .unwrap_or(unoffset_due_at); state.playout_epoch = Some(recovered_epoch); state.pairing_anchor_deadline = Some(desired_due_at); + state.catastrophic_reanchor_done = true; due_at = apply_playout_offset( recovered_epoch + Duration::from_micros(local_pts_us), sink_offset_us, diff --git a/server/src/upstream_media_runtime/state.rs b/server/src/upstream_media_runtime/state.rs index 1618cd5..4e95001 100644 --- a/server/src/upstream_media_runtime/state.rs +++ b/server/src/upstream_media_runtime/state.rs @@ -15,4 +15,5 @@ pub(super) struct UpstreamClockState { pub startup_anchor_logged: bool, pub playout_epoch: Option, pub pairing_anchor_deadline: Option, + pub catastrophic_reanchor_done: bool, } diff --git a/server/src/upstream_media_runtime/tests.rs b/server/src/upstream_media_runtime/tests.rs index 0e8cfa0..5b1deea 100644 --- a/server/src/upstream_media_runtime/tests.rs +++ b/server/src/upstream_media_runtime/tests.rs @@ -339,6 +339,35 @@ fn catastrophic_lateness_reanchors_the_shared_playout_epoch() { }); } +#[test] +fn catastrophic_lateness_reanchors_only_once_per_session() { + temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("20"), || { + temp_env::with_var("LESAVKA_UPSTREAM_REANCHOR_LATE_MS", Some("5"), || { + let runtime = UpstreamMediaRuntime::new(); + let _camera = runtime.activate_camera(); + let _microphone = runtime.activate_microphone(); + + assert!(matches!( + runtime.plan_video_pts(1_000_000, 16_666), + super::UpstreamPlanDecision::AwaitingPair + )); + let _audio_first = play(runtime.plan_audio_pts(1_000_000)); + let _video_first = play(runtime.plan_video_pts(1_000_000, 16_666)); + + std::thread::sleep(Duration::from_millis(30)); + let first_recovered = play(runtime.plan_audio_pts(1_000_000)); + assert!(first_recovered.due_at > tokio::time::Instant::now()); + + std::thread::sleep(Duration::from_millis(30)); + let second_late = play(runtime.plan_audio_pts(1_000_001)); + assert!( + second_late.late_by > Duration::from_millis(5), + "session should not keep extending itself with repeated reanchors" + ); + }); + }); +} + #[tokio::test(flavor = "current_thread")] async fn wait_for_audio_master_releases_video_once_audio_catches_up() { let runtime = Arc::new(UpstreamMediaRuntime::new());