From 7224ae399d2739f834e5b795a8c4c28bde94c832 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Mon, 27 Apr 2026 00:41:00 -0300 Subject: [PATCH] fix(sync): gate overlap startup on camera warmup --- Cargo.lock | 6 ++-- client/Cargo.toml | 2 +- common/Cargo.toml | 2 +- server/Cargo.toml | 2 +- server/src/upstream_media_runtime.rs | 35 +++++++++++++++++++--- server/src/upstream_media_runtime/state.rs | 1 + server/src/upstream_media_runtime/tests.rs | 28 +++++++++++++++++ 7 files changed, 66 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bbdc759..d3aad4e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1642,7 +1642,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.14.9" +version = "0.14.10" dependencies = [ "anyhow", "async-stream", @@ -1676,7 +1676,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.14.9" +version = "0.14.10" dependencies = [ "anyhow", "base64", @@ -1688,7 +1688,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.14.9" +version = "0.14.10" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index b289a6e..87183e8 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.14.9" +version = "0.14.10" edition = "2024" [dependencies] diff --git a/common/Cargo.toml b/common/Cargo.toml index a64f4ff..33d91a2 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.14.9" +version = "0.14.10" edition = "2024" build = "build.rs" diff --git a/server/Cargo.toml b/server/Cargo.toml index 3040ed0..0058e90 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.14.9" +version = "0.14.10" edition = "2024" autobins = false diff --git a/server/src/upstream_media_runtime.rs b/server/src/upstream_media_runtime.rs index 5893ef0..e7dbc0d 100644 --- a/server/src/upstream_media_runtime.rs +++ b/server/src/upstream_media_runtime.rs @@ -16,6 +16,14 @@ use config::{ }; use state::UpstreamClockState; +fn upstream_camera_startup_grace_us() -> u64 { + std::env::var("LESAVKA_UPSTREAM_CAMERA_STARTUP_GRACE_MS") + .ok() + .and_then(|value| value.trim().parse::().ok()) + .unwrap_or(if cfg!(test) { 0 } else { 250 }) + .saturating_mul(1_000) +} + /// Logical upstream media kinds that share one live-call session timeline. #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub enum UpstreamMediaKind { @@ -143,6 +151,7 @@ impl UpstreamMediaRuntime { state.session_id = self.next_session_id.fetch_add(1, Ordering::SeqCst) + 1; state.first_camera_remote_pts_us = None; state.first_microphone_remote_pts_us = None; + state.camera_startup_ready = false; state.session_base_remote_pts_us = None; state.last_video_local_pts_us = None; state.last_audio_local_pts_us = None; @@ -216,6 +225,7 @@ impl UpstreamMediaRuntime { { state.first_camera_remote_pts_us = None; state.first_microphone_remote_pts_us = None; + state.camera_startup_ready = false; state.session_base_remote_pts_us = None; state.last_video_local_pts_us = None; state.last_audio_local_pts_us = None; @@ -317,11 +327,27 @@ impl UpstreamMediaRuntime { state.microphone_packet_count } }; - let first_slot = match kind { - UpstreamMediaKind::Camera => &mut state.first_camera_remote_pts_us, - UpstreamMediaKind::Microphone => &mut state.first_microphone_remote_pts_us, + let mut first_remote_for_kind = match kind { + UpstreamMediaKind::Camera => { + let first_slot = &mut state.first_camera_remote_pts_us; + *first_slot.get_or_insert(remote_pts_us) + } + UpstreamMediaKind::Microphone => { + let first_slot = &mut state.first_microphone_remote_pts_us; + *first_slot.get_or_insert(remote_pts_us) + } }; - let first_remote_for_kind = *first_slot.get_or_insert(remote_pts_us); + if kind == UpstreamMediaKind::Camera { + let startup_grace_us = upstream_camera_startup_grace_us(); + if !state.camera_startup_ready + && (startup_grace_us == 0 + || remote_pts_us.saturating_sub(first_remote_for_kind) >= startup_grace_us) + { + state.camera_startup_ready = true; + state.first_camera_remote_pts_us = Some(remote_pts_us); + first_remote_for_kind = remote_pts_us; + } + } let now = Instant::now(); let pairing_deadline = *state .pairing_anchor_deadline @@ -331,6 +357,7 @@ impl UpstreamMediaRuntime { if state.session_base_remote_pts_us.is_none() { if state.first_camera_remote_pts_us.is_some() && state.first_microphone_remote_pts_us.is_some() + && state.camera_startup_ready { let first_camera_remote_pts_us = state.first_camera_remote_pts_us.unwrap_or_default(); diff --git a/server/src/upstream_media_runtime/state.rs b/server/src/upstream_media_runtime/state.rs index 4e95001..ac57526 100644 --- a/server/src/upstream_media_runtime/state.rs +++ b/server/src/upstream_media_runtime/state.rs @@ -7,6 +7,7 @@ pub(super) struct UpstreamClockState { pub active_microphone_generation: Option, pub first_camera_remote_pts_us: Option, pub first_microphone_remote_pts_us: Option, + pub camera_startup_ready: bool, pub session_base_remote_pts_us: Option, pub last_video_local_pts_us: Option, pub last_audio_local_pts_us: Option, diff --git a/server/src/upstream_media_runtime/tests.rs b/server/src/upstream_media_runtime/tests.rs index 480ed8f..fb604dc 100644 --- a/server/src/upstream_media_runtime/tests.rs +++ b/server/src/upstream_media_runtime/tests.rs @@ -63,6 +63,34 @@ fn first_packets_wait_for_the_counterpart_before_pairing() { assert_eq!(audio_first.due_at, video_first.due_at); } +#[test] +fn overlap_waits_for_camera_startup_grace_before_establishing_the_shared_base() { + temp_env::with_var("LESAVKA_UPSTREAM_CAMERA_STARTUP_GRACE_MS", Some("250"), || { + let runtime = UpstreamMediaRuntime::new(); + let _camera = runtime.activate_camera(); + let _microphone = runtime.activate_microphone(); + + assert!(matches!( + runtime.plan_video_pts(1_000_000, 16_666), + super::UpstreamPlanDecision::AwaitingPair + )); + assert!(matches!( + runtime.plan_audio_pts(1_000_000), + super::UpstreamPlanDecision::AwaitingPair + )); + assert!(matches!( + runtime.plan_video_pts(1_200_000, 16_666), + super::UpstreamPlanDecision::AwaitingPair + )); + + let video_ready = play(runtime.plan_video_pts(1_250_000, 16_666)); + let audio_ready = play(runtime.plan_audio_pts(1_260_000)); + + assert_eq!(video_ready.local_pts_us, 0); + assert_eq!(audio_ready.local_pts_us, 10_000); + }); +} + #[test] fn overlap_pairing_drops_leading_packets_before_the_shared_base() { let runtime = UpstreamMediaRuntime::new();