From f0f204b7777fdd00fc68a7798d7da73bc724c844 Mon Sep 17 00:00:00 2001
From: Brad Stein
Date: Sat, 2 May 2026 20:23:40 -0300
Subject: [PATCH] fix: drain camera stream while waiting for audio

---
 AGENTS.md                                 | 15 ++++
 Cargo.lock                                |  6 +-
 client/Cargo.toml                         |  2 +-
 common/Cargo.toml                         |  2 +-
 server/Cargo.toml                         |  2 +-
 server/src/main/relay_service.rs          | 99 ++++++++++++++++++-----
 server/src/main/relay_service_tests.rs    | 49 ++++++++++-
 server/src/main/relay_stream_lifecycle.rs | 25 ++++++
 8 files changed, 172 insertions(+), 28 deletions(-)

diff --git a/AGENTS.md b/AGENTS.md
index f95c4fd..7c42ab6 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -581,3 +581,18 @@ pairing collapses.
 - [x] Update manual probe contract coverage for the safer defaults and refusal reason.
 - [ ] Re-run the probe-calibrate-confirm flow; analyzer failures should diagnose but not mutate calibration unless raw fallback is explicitly enabled and has enough coded support.
 - [ ] If client send/capture p95 stays low and server receive p95 stays high, localize the transport/server-receive timing layer next.
+
+## 0.17.31 Server Receive Timing Drain Checklist
+
+Context: the 0.17.30 mirrored run kept calibration stable and proved the client enqueue-side timing
+fix held: client capture/send p95 stayed in single-digit milliseconds after startup. The remaining
+blind spot moved to server receive timing, where video packets were only timestamped when the
+camera playout loop woke up after waiting for the audio master.
+
+- [x] Treat the 0.17.30 run as confirmation that raw-failure calibration no longer whipsaws offsets.
+- [x] Keep polling and timing inbound camera packets while video waits for the audio master.
+- [x] Keep polling and timing inbound camera packets while video waits for its due time.
+- [x] Coalesce pending video to the freshest packet during those waits so the server does not build a stale video backlog (the drain-while-waiting pattern is sketched below this hunk).
+- [x] Add regression coverage that video timing is recorded at enqueue/drain time, before the scheduler waits.
+- [ ] Re-run the probe-calibrate-confirm flow; `planner_server_receive_abs_skew_p95_ms` should fall if this was the receive-side scheduling leak.
+- [ ] If receive p95 remains high after this, inspect actual gRPC/HTTP2 stream delivery and OS/network scheduling rather than static calibration.
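The checklist items above all reduce to one pattern: while the playout loop is parked on a timer (or on the audio master), keep polling the inbound stream, stamp receive timing at enqueue, and coalesce the backlog down to the freshest frame. Below is a minimal, self-contained sketch of that pattern. The `Frame` struct, the tokio mpsc channel, and `drain_until` are illustrative stand-ins, not the actual relay_service.rs API, which operates on the gRPC inbound stream and `VideoPacket`s:

```rust
use std::collections::VecDeque;
use tokio::sync::mpsc;
use tokio::time::{sleep_until, Duration, Instant};

#[derive(Debug)]
struct Frame {
    pts: u64,
}

/// Drains `rx` into `pending` until `deadline` fires, stamping receive time
/// at enqueue and keeping only the freshest frame, so no stale backlog
/// builds up while the scheduler waits.
async fn drain_until(
    rx: &mut mpsc::Receiver<Frame>,
    pending: &mut VecDeque<(Instant, Frame)>,
    deadline: Instant,
) {
    let sleep = sleep_until(deadline);
    tokio::pin!(sleep);
    let mut inbound_closed = false;
    loop {
        tokio::select! {
            _ = &mut sleep => break,
            frame = rx.recv(), if !inbound_closed => match frame {
                Some(frame) => {
                    // Timestamp on arrival, not when the wait ends.
                    pending.push_back((Instant::now(), frame));
                    // Coalesce: a stale frame is replaced by the newest one.
                    while pending.len() > 1 {
                        pending.pop_front();
                    }
                }
                None => inbound_closed = true,
            },
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(8);
    tokio::spawn(async move {
        for pts in 0..5u64 {
            let _ = tx.send(Frame { pts }).await;
        }
    });
    let mut pending = VecDeque::new();
    drain_until(&mut rx, &mut pending, Instant::now() + Duration::from_millis(50)).await;
    // Only the freshest frame remains, stamped with its own arrival time.
    println!("pending = {pending:?}");
}
```

In the real handler the deadline is either the frame's `plan.due_at` or the audio-master wait, and the enqueue path also records `video_client_timing` into the media runtime; the channel here merely stands in for the gRPC stream.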
diff --git a/Cargo.lock b/Cargo.lock
index f1d5670..cdc6e55 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
 
 [[package]]
 name = "lesavka_client"
-version = "0.17.30"
+version = "0.17.31"
 dependencies = [
  "anyhow",
  "async-stream",
@@ -1686,7 +1686,7 @@ dependencies = [
 
 [[package]]
 name = "lesavka_common"
-version = "0.17.30"
+version = "0.17.31"
 dependencies = [
  "anyhow",
  "base64",
@@ -1698,7 +1698,7 @@ dependencies = [
 
 [[package]]
 name = "lesavka_server"
-version = "0.17.30"
+version = "0.17.31"
 dependencies = [
  "anyhow",
  "base64",
diff --git a/client/Cargo.toml b/client/Cargo.toml
index f9cf781..a108dbf 100644
--- a/client/Cargo.toml
+++ b/client/Cargo.toml
@@ -4,7 +4,7 @@ path = "src/main.rs"
 
 [package]
 name = "lesavka_client"
-version = "0.17.30"
+version = "0.17.31"
 edition = "2024"
 
 [dependencies]
diff --git a/common/Cargo.toml b/common/Cargo.toml
index c5d6c1e..f844420 100644
--- a/common/Cargo.toml
+++ b/common/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "lesavka_common"
-version = "0.17.30"
+version = "0.17.31"
 edition = "2024"
 
 build = "build.rs"
diff --git a/server/Cargo.toml b/server/Cargo.toml
index eb4e87a..2b9b70f 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -10,7 +10,7 @@ bench = false
 
 [package]
 name = "lesavka_server"
-version = "0.17.30"
+version = "0.17.31"
 edition = "2024"
 
 autobins = false
diff --git a/server/src/main/relay_service.rs b/server/src/main/relay_service.rs
index b499e91..937df4f 100644
--- a/server/src/main/relay_service.rs
+++ b/server/src/main/relay_service.rs
@@ -360,7 +360,7 @@ impl Relay for Handler {
         let mut inbound_closed = false;
         let stale_drop_budget = upstream_stale_drop_budget();
         let mut startup_video_settled = false;
-        loop {
+        'camera_loop: loop {
             if !camera_rt.is_active(camera_session_id)
                 || !upstream_media_rt.is_camera_active(upstream_lease.generation)
             {
@@ -381,21 +381,15 @@
             if let Some(next_packet) = next_packet {
                 match next_packet.transpose() {
                     Ok(Some(pkt)) => {
-                        upstream_media_rt.record_client_timing(
-                            UpstreamMediaKind::Camera,
-                            video_client_timing(&pkt),
+                        enqueue_pending_upstream_video_packet(
+                            &upstream_media_rt,
+                            &mut pending,
+                            pkt,
+                            rpc_id,
+                            upstream_lease.session_id,
+                            camera_session_id,
+                            "poll",
                         );
-                        pending.push_back(pkt);
-                        let coalesced = retain_freshest_video_packet(&mut pending);
-                        if coalesced > 0 {
-                            tracing::debug!(
-                                rpc_id,
-                                session_id = upstream_lease.session_id,
-                                camera_session_id,
-                                dropped = coalesced,
-                                "🎥 coalesced stale upstream video backlog down to the freshest frame"
-                            );
-                        }
                     }
                     Ok(None) => inbound_closed = true,
                     Err(err) => {
@@ -406,7 +400,7 @@
                             camera_session_id,
                             "🎥 stream_camera inbound error before clean EOF: {err}"
                         );
-                        break;
+                        break 'camera_loop;
                     }
                 }
             }
@@ -460,10 +454,41 @@
                 }
                 lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan,
             };
-            if !upstream_media_rt
-                .wait_for_audio_master(plan.local_pts_us, plan.due_at)
-                .await
-            {
+            let mut audio_master_wait = std::pin::pin!(
+                upstream_media_rt.wait_for_audio_master(plan.local_pts_us, plan.due_at)
+            );
+            let audio_master_ready = loop {
+                tokio::select! {
+                    ready = &mut audio_master_wait => break ready,
+                    next_packet = s.next(), if !inbound_closed => {
+                        match next_packet.transpose() {
+                            Ok(Some(next_pkt)) => {
+                                enqueue_pending_upstream_video_packet(
+                                    &upstream_media_rt,
+                                    &mut pending,
+                                    next_pkt,
+                                    rpc_id,
+                                    upstream_lease.session_id,
+                                    camera_session_id,
+                                    "audio-master-wait",
+                                );
+                            }
+                            Ok(None) => inbound_closed = true,
+                            Err(err) => {
+                                cleanup.mark_aborted();
+                                warn!(
+                                    rpc_id,
+                                    session_id = upstream_lease.session_id,
+                                    camera_session_id,
+                                    "🎥 stream_camera inbound error while waiting for audio master: {err}"
+                                );
+                                break 'camera_loop;
+                            }
+                        }
+                    }
+                }
+            };
+            if !audio_master_ready {
                 upstream_media_rt.record_video_freeze(
                     "video froze because audio master did not reach the frame timestamp",
                 );
@@ -501,7 +526,39 @@
                 }
                 continue;
             }
-            tokio::time::sleep_until(plan.due_at).await;
+            let sleep_until_due = tokio::time::sleep_until(plan.due_at);
+            tokio::pin!(sleep_until_due);
+            loop {
+                tokio::select! {
+                    _ = &mut sleep_until_due => break,
+                    next_packet = s.next(), if !inbound_closed => {
+                        match next_packet.transpose() {
+                            Ok(Some(next_pkt)) => {
+                                enqueue_pending_upstream_video_packet(
+                                    &upstream_media_rt,
+                                    &mut pending,
+                                    next_pkt,
+                                    rpc_id,
+                                    upstream_lease.session_id,
+                                    camera_session_id,
+                                    "due-wait",
+                                );
+                            }
+                            Ok(None) => inbound_closed = true,
+                            Err(err) => {
+                                cleanup.mark_aborted();
+                                warn!(
+                                    rpc_id,
+                                    session_id = upstream_lease.session_id,
+                                    camera_session_id,
+                                    "🎥 stream_camera inbound error while waiting for frame due time: {err}"
+                                );
+                                break 'camera_loop;
+                            }
+                        }
+                    }
+                }
+            }
             let actual_late_by = tokio::time::Instant::now()
                 .checked_duration_since(plan.due_at)
                 .unwrap_or_default();
diff --git a/server/src/main/relay_service_tests.rs b/server/src/main/relay_service_tests.rs
index 28716f4..2235403 100644
--- a/server/src/main/relay_service_tests.rs
+++ b/server/src/main/relay_service_tests.rs
@@ -3,7 +3,9 @@ mod tests {
     use super::{
         UpstreamStreamCleanup, retain_freshest_audio_packet, retain_freshest_video_packet,
     };
     use lesavka_common::lesavka::{AudioPacket, VideoPacket};
-    use lesavka_server::upstream_media_runtime::UpstreamMediaRuntime;
+    use lesavka_server::upstream_media_runtime::{
+        UpstreamClientTiming, UpstreamMediaKind, UpstreamMediaRuntime,
+    };
     use std::sync::Arc;
 
@@ -47,6 +49,51 @@
         assert_eq!(pending.back().map(|pkt| pkt.pts), Some(900));
     }
 
+    #[test]
+    fn enqueue_pending_upstream_video_packet_records_timing_before_scheduler_waits() {
+        let runtime = UpstreamMediaRuntime::new();
+        let mut pending = std::collections::VecDeque::from(vec![VideoPacket {
+            pts: 10,
+            client_capture_pts_us: 10,
+            client_send_pts_us: 10,
+            ..Default::default()
+        }]);
+
+        runtime.record_client_timing(
+            UpstreamMediaKind::Microphone,
+            UpstreamClientTiming {
+                capture_pts_us: 20,
+                send_pts_us: 20,
+                queue_depth: 0,
+                queue_age_ms: 0,
+            },
+        );
+
+        super::enqueue_pending_upstream_video_packet(
+            &runtime,
+            &mut pending,
+            VideoPacket {
+                pts: 20,
+                client_capture_pts_us: 20,
+                client_send_pts_us: 20,
+                ..Default::default()
+            },
+            1,
+            2,
+            3,
+            "test",
+        );
+
+        assert_eq!(pending.len(), 1);
+        assert_eq!(pending.front().map(|pkt| pkt.pts), Some(20));
+        let snapshot = runtime.snapshot();
+        assert_eq!(snapshot.client_timing_window_samples, 1);
+        assert!(
+            snapshot.server_receive_abs_skew_p95_ms.unwrap_or(f64::INFINITY) < 50.0,
+            "video timing should be recorded as the packet is enqueued, not after playout waits"
+        );
+    }
+
     #[test]
     fn upstream_cleanup_guard_closes_its_microphone_generation() {
         let runtime = Arc::new(UpstreamMediaRuntime::new());
diff --git a/server/src/main/relay_stream_lifecycle.rs b/server/src/main/relay_stream_lifecycle.rs
index 6d7e2ba..b497924 100644
--- a/server/src/main/relay_stream_lifecycle.rs
+++ b/server/src/main/relay_stream_lifecycle.rs
@@ -22,6 +22,31 @@ fn retain_freshest_video_packet(
     dropped
 }
 
+#[cfg(not(coverage))]
+fn enqueue_pending_upstream_video_packet(
+    runtime: &UpstreamMediaRuntime,
+    pending: &mut std::collections::VecDeque<VideoPacket>,
+    pkt: VideoPacket,
+    rpc_id: u64,
+    session_id: u64,
+    camera_session_id: u64,
+    context: &'static str,
+) {
+    runtime.record_client_timing(UpstreamMediaKind::Camera, video_client_timing(&pkt));
+    pending.push_back(pkt);
+    let coalesced = retain_freshest_video_packet(pending);
+    if coalesced > 0 {
+        tracing::debug!(
+            rpc_id,
+            session_id,
+            camera_session_id,
+            dropped = coalesced,
+            context,
+            "🎥 coalesced stale upstream video backlog down to the freshest frame"
+        );
+    }
+}
+
 #[cfg(not(coverage))]
 const AUDIO_PENDING_LIVE_WINDOW_PACKETS: usize = 8;
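
For reference, the coalescing contract the new helper leans on can be shown in isolation. This is a simplified re-implementation for illustration only; the real `retain_freshest_video_packet` lives in relay_stream_lifecycle.rs and operates on the generated `lesavka_common` `VideoPacket` type:

```rust
use std::collections::VecDeque;

// Illustrative stand-in carrying only the field the sketch needs.
struct VideoPacket {
    pts: u64,
}

/// Keeps only the newest pending packet and reports how many were dropped,
/// mirroring the contract the patch's tests pin down: after coalescing,
/// `pending` holds exactly the freshest frame.
fn retain_freshest(pending: &mut VecDeque<VideoPacket>) -> usize {
    let dropped = pending.len().saturating_sub(1);
    while pending.len() > 1 {
        pending.pop_front();
    }
    dropped
}

fn main() {
    let mut pending: VecDeque<VideoPacket> = [100u64, 200, 300]
        .into_iter()
        .map(|pts| VideoPacket { pts })
        .collect();
    let dropped = retain_freshest(&mut pending);
    assert_eq!(dropped, 2);
    assert_eq!(pending.front().map(|p| p.pts), Some(300));
    println!("dropped {dropped}; kept pts {}", pending[0].pts);
}
```

Note the asymmetry the patch preserves: audio keeps a short live window (`AUDIO_PENDING_LIVE_WINDOW_PACKETS = 8`) while video coalesces to a single frame, presumably because a replaced video frame is invisible once the next frame plays, whereas a dropped audio packet is an audible gap.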