From 56c475f742cf077ea29f9b315f9bff385a72444d Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Mon, 27 Apr 2026 01:08:06 -0300 Subject: [PATCH] fix(sync): keep microphone paired through camera warmup --- Cargo.lock | 6 ++-- client/Cargo.toml | 2 +- common/Cargo.toml | 2 +- server/Cargo.toml | 2 +- server/src/main/relay_service.rs | 27 ------------------ server/src/main/relay_service_coverage.rs | 18 ------------ server/src/upstream_media_runtime.rs | 13 +++++++++ server/src/upstream_media_runtime/tests.rs | 33 ++++++++++++++++++++++ 8 files changed, 52 insertions(+), 51 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d3aad4e..73d234e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1642,7 +1642,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.14.10" +version = "0.14.11" dependencies = [ "anyhow", "async-stream", @@ -1676,7 +1676,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.14.10" +version = "0.14.11" dependencies = [ "anyhow", "base64", @@ -1688,7 +1688,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.14.10" +version = "0.14.11" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index 87183e8..9fd0921 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.14.10" +version = "0.14.11" edition = "2024" [dependencies] diff --git a/common/Cargo.toml b/common/Cargo.toml index 33d91a2..3d56f96 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.14.10" +version = "0.14.11" edition = "2024" build = "build.rs" diff --git a/server/Cargo.toml b/server/Cargo.toml index 0058e90..b0fd3d3 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.14.10" +version = "0.14.11" edition = "2024" autobins = false diff --git a/server/src/main/relay_service.rs b/server/src/main/relay_service.rs index a95ba51..7c568d6 100644 --- a/server/src/main/relay_service.rs +++ b/server/src/main/relay_service.rs @@ -8,15 +8,6 @@ fn upstream_stale_drop_budget() -> Duration { Duration::from_millis(drop_ms) } -#[cfg(not(coverage))] -fn upstream_camera_startup_grace() -> Duration { - let grace_ms = std::env::var("LESAVKA_UPSTREAM_CAMERA_STARTUP_GRACE_MS") - .ok() - .and_then(|value| value.trim().parse::().ok()) - .unwrap_or(250); - Duration::from_millis(grace_ms) -} - #[cfg(not(coverage))] fn retain_freshest_video_packet( pending: &mut std::collections::VecDeque, @@ -289,10 +280,6 @@ impl Relay for Handler { let mut inbound_closed = false; let stale_drop_budget = upstream_stale_drop_budget(); let mut startup_video_settled = false; - let startup_grace_us = upstream_camera_startup_grace() - .as_micros() - .min(u64::MAX as u128) as u64; - let mut cold_startup_grace_pending = startup_grace_us > 0; loop { if !camera_rt.is_active(session_id) || !upstream_media_rt.is_camera_active(upstream_lease.generation) @@ -339,20 +326,6 @@ impl Relay for Handler { } lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan, }; - if cold_startup_grace_pending && plan.local_pts_us < startup_grace_us { - let coalesced = retain_freshest_video_packet(&mut pending); - tracing::debug!( - rpc_id, - session_id, - remote_pts_us = pkt.pts, - local_pts_us = plan.local_pts_us, - startup_grace_us, - dropped_pending = coalesced, - "🎥 dropping startup video until the shared-session warm-up grace is spent" - ); - continue; - } - cold_startup_grace_pending = false; if !upstream_media_rt .wait_for_audio_master(plan.local_pts_us, plan.due_at) .await diff --git a/server/src/main/relay_service_coverage.rs b/server/src/main/relay_service_coverage.rs index 6ae039f..c774bc9 100644 --- a/server/src/main/relay_service_coverage.rs +++ b/server/src/main/relay_service_coverage.rs @@ -7,15 +7,6 @@ fn upstream_stale_drop_budget() -> Duration { Duration::from_millis(drop_ms) } -#[cfg(coverage)] -fn upstream_camera_startup_grace() -> Duration { - let grace_ms = std::env::var("LESAVKA_UPSTREAM_CAMERA_STARTUP_GRACE_MS") - .ok() - .and_then(|value| value.trim().parse::().ok()) - .unwrap_or(250); - Duration::from_millis(grace_ms) -} - #[cfg(coverage)] fn retain_freshest_video_packet( pending: &mut std::collections::VecDeque, @@ -184,10 +175,6 @@ impl Relay for Handler { let mut pending = std::collections::VecDeque::new(); let mut inbound_closed = false; let stale_drop_budget = upstream_stale_drop_budget(); - let startup_grace_us = upstream_camera_startup_grace() - .as_micros() - .min(u64::MAX as u128) as u64; - let mut cold_startup_grace_pending = startup_grace_us > 0; loop { if !camera_rt.is_active(session_id) || !upstream_media_rt.is_camera_active(upstream_lease.generation) @@ -225,11 +212,6 @@ impl Relay for Handler { } lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan, }; - if cold_startup_grace_pending && plan.local_pts_us < startup_grace_us { - let _ = retain_freshest_video_packet(&mut pending); - continue; - } - cold_startup_grace_pending = false; if !upstream_media_rt .wait_for_audio_master(plan.local_pts_us, plan.due_at) .await diff --git a/server/src/upstream_media_runtime.rs b/server/src/upstream_media_runtime.rs index e7dbc0d..b811ba2 100644 --- a/server/src/upstream_media_runtime.rs +++ b/server/src/upstream_media_runtime.rs @@ -397,6 +397,19 @@ impl UpstreamMediaRuntime { ); } return UpstreamPlanDecision::AwaitingPair; + } else if state.first_camera_remote_pts_us.is_some() && !state.camera_startup_ready { + if upstream_timing_trace_enabled() + && (packet_count <= 10 || packet_count.is_multiple_of(300)) + { + info!( + session_id, + ?kind, + packet_count, + remote_pts_us, + "upstream media packet buffered while camera startup warm-up is still in progress" + ); + } + return UpstreamPlanDecision::AwaitingPair; } else { let single_stream_base_remote_pts_us = match kind { UpstreamMediaKind::Camera => { diff --git a/server/src/upstream_media_runtime/tests.rs b/server/src/upstream_media_runtime/tests.rs index fb604dc..4fb0966 100644 --- a/server/src/upstream_media_runtime/tests.rs +++ b/server/src/upstream_media_runtime/tests.rs @@ -91,6 +91,39 @@ fn overlap_waits_for_camera_startup_grace_before_establishing_the_shared_base() }); } +#[test] +fn pairing_window_does_not_expire_into_one_sided_playout_while_camera_warms_up() { + temp_env::with_var("LESAVKA_UPSTREAM_CAMERA_STARTUP_GRACE_MS", Some("250"), || { + temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("20"), || { + let runtime = UpstreamMediaRuntime::new(); + let _camera = runtime.activate_camera(); + let _microphone = runtime.activate_microphone(); + + assert!(matches!( + runtime.plan_video_pts(1_000_000, 16_666), + super::UpstreamPlanDecision::AwaitingPair + )); + assert!(matches!( + runtime.plan_audio_pts(1_000_000), + super::UpstreamPlanDecision::AwaitingPair + )); + + std::thread::sleep(Duration::from_millis(30)); + + assert!(matches!( + runtime.plan_audio_pts(1_010_000), + super::UpstreamPlanDecision::AwaitingPair + )); + + let video_ready = play(runtime.plan_video_pts(1_250_000, 16_666)); + let audio_ready = play(runtime.plan_audio_pts(1_260_000)); + + assert_eq!(video_ready.local_pts_us, 0); + assert_eq!(audio_ready.local_pts_us, 10_000); + }); + }); +} + #[test] fn overlap_pairing_drops_leading_packets_before_the_shared_base() { let runtime = UpstreamMediaRuntime::new();