fix: drain camera stream while waiting for audio

This commit is contained in:
Brad Stein 2026-05-02 20:23:40 -03:00
parent 5634e7197d
commit f0f204b777
8 changed files with 172 additions and 28 deletions

View File

@ -581,3 +581,18 @@ pairing collapses.
- [x] Update manual probe contract coverage for the safer defaults and refusal reason. - [x] Update manual probe contract coverage for the safer defaults and refusal reason.
- [ ] Re-run the probe-calibrate-confirm flow; analyzer failures should diagnose but not mutate calibration unless raw fallback is explicitly enabled and has enough coded support. - [ ] Re-run the probe-calibrate-confirm flow; analyzer failures should diagnose but not mutate calibration unless raw fallback is explicitly enabled and has enough coded support.
- [ ] If client send/capture p95 stays low and server receive p95 stays high, localize the transport/server-receive timing layer next. - [ ] If client send/capture p95 stays low and server receive p95 stays high, localize the transport/server-receive timing layer next.
## 0.17.31 Server Receive Timing Drain Checklist
Context: the 0.17.30 mirrored run kept calibration stable and proved the client enqueue-side timing
fix held: client capture/send p95 stayed in single-digit milliseconds after startup. The remaining
blind blocker moved to server receive timing, where video packets were only timestamped when the
camera playout loop woke up after waiting for the audio master.
- [x] Treat the 0.17.30 run as confirmation that raw-failure calibration no longer whipsaws offsets.
- [x] Keep polling and timing inbound camera packets while video waits for the audio master.
- [x] Keep polling and timing inbound camera packets while video waits for its due time.
- [x] Coalesce pending video to the freshest packet during those waits so the server does not build a stale video backlog.
- [x] Add regression coverage that video timing is recorded at enqueue/drain time before scheduler waits.
- [ ] Re-run the probe-calibrate-confirm flow; `planner_server_receive_abs_skew_p95_ms` should fall if this was the receive-side scheduling leak.
- [ ] If receive p95 remains high after this, inspect actual gRPC/HTTP2 stream delivery and OS/network scheduling rather than static calibration.

6
Cargo.lock generated
View File

@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]] [[package]]
name = "lesavka_client" name = "lesavka_client"
version = "0.17.30" version = "0.17.31"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-stream", "async-stream",
@ -1686,7 +1686,7 @@ dependencies = [
[[package]] [[package]]
name = "lesavka_common" name = "lesavka_common"
version = "0.17.30" version = "0.17.31"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",
@ -1698,7 +1698,7 @@ dependencies = [
[[package]] [[package]]
name = "lesavka_server" name = "lesavka_server"
version = "0.17.30" version = "0.17.31"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package] [package]
name = "lesavka_client" name = "lesavka_client"
version = "0.17.30" version = "0.17.31"
edition = "2024" edition = "2024"
[dependencies] [dependencies]

View File

@ -1,6 +1,6 @@
[package] [package]
name = "lesavka_common" name = "lesavka_common"
version = "0.17.30" version = "0.17.31"
edition = "2024" edition = "2024"
build = "build.rs" build = "build.rs"

View File

@ -10,7 +10,7 @@ bench = false
[package] [package]
name = "lesavka_server" name = "lesavka_server"
version = "0.17.30" version = "0.17.31"
edition = "2024" edition = "2024"
autobins = false autobins = false

View File

@ -360,7 +360,7 @@ impl Relay for Handler {
let mut inbound_closed = false; let mut inbound_closed = false;
let stale_drop_budget = upstream_stale_drop_budget(); let stale_drop_budget = upstream_stale_drop_budget();
let mut startup_video_settled = false; let mut startup_video_settled = false;
loop { 'camera_loop: loop {
if !camera_rt.is_active(camera_session_id) if !camera_rt.is_active(camera_session_id)
|| !upstream_media_rt.is_camera_active(upstream_lease.generation) || !upstream_media_rt.is_camera_active(upstream_lease.generation)
{ {
@ -381,21 +381,15 @@ impl Relay for Handler {
if let Some(next_packet) = next_packet { if let Some(next_packet) = next_packet {
match next_packet.transpose() { match next_packet.transpose() {
Ok(Some(pkt)) => { Ok(Some(pkt)) => {
upstream_media_rt.record_client_timing( enqueue_pending_upstream_video_packet(
UpstreamMediaKind::Camera, &upstream_media_rt,
video_client_timing(&pkt), &mut pending,
pkt,
rpc_id,
upstream_lease.session_id,
camera_session_id,
"poll",
); );
pending.push_back(pkt);
let coalesced = retain_freshest_video_packet(&mut pending);
if coalesced > 0 {
tracing::debug!(
rpc_id,
session_id = upstream_lease.session_id,
camera_session_id,
dropped = coalesced,
"🎥 coalesced stale upstream video backlog down to the freshest frame"
);
}
} }
Ok(None) => inbound_closed = true, Ok(None) => inbound_closed = true,
Err(err) => { Err(err) => {
@ -406,7 +400,7 @@ impl Relay for Handler {
camera_session_id, camera_session_id,
"🎥 stream_camera inbound error before clean EOF: {err}" "🎥 stream_camera inbound error before clean EOF: {err}"
); );
break; break 'camera_loop;
} }
} }
} }
@ -460,10 +454,41 @@ impl Relay for Handler {
} }
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan, lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan,
}; };
if !upstream_media_rt let mut audio_master_wait = std::pin::pin!(
.wait_for_audio_master(plan.local_pts_us, plan.due_at) upstream_media_rt.wait_for_audio_master(plan.local_pts_us, plan.due_at)
.await );
{ let audio_master_ready = loop {
tokio::select! {
ready = &mut audio_master_wait => break ready,
next_packet = s.next(), if !inbound_closed => {
match next_packet.transpose() {
Ok(Some(next_pkt)) => {
enqueue_pending_upstream_video_packet(
&upstream_media_rt,
&mut pending,
next_pkt,
rpc_id,
upstream_lease.session_id,
camera_session_id,
"audio-master-wait",
);
}
Ok(None) => inbound_closed = true,
Err(err) => {
cleanup.mark_aborted();
warn!(
rpc_id,
session_id = upstream_lease.session_id,
camera_session_id,
"🎥 stream_camera inbound error while waiting for audio master: {err}"
);
break 'camera_loop;
}
}
}
}
};
if !audio_master_ready {
upstream_media_rt.record_video_freeze( upstream_media_rt.record_video_freeze(
"video froze because audio master did not reach the frame timestamp", "video froze because audio master did not reach the frame timestamp",
); );
@ -501,7 +526,39 @@ impl Relay for Handler {
} }
continue; continue;
} }
tokio::time::sleep_until(plan.due_at).await; let sleep_until_due = tokio::time::sleep_until(plan.due_at);
tokio::pin!(sleep_until_due);
loop {
tokio::select! {
_ = &mut sleep_until_due => break,
next_packet = s.next(), if !inbound_closed => {
match next_packet.transpose() {
Ok(Some(next_pkt)) => {
enqueue_pending_upstream_video_packet(
&upstream_media_rt,
&mut pending,
next_pkt,
rpc_id,
upstream_lease.session_id,
camera_session_id,
"due-wait",
);
}
Ok(None) => inbound_closed = true,
Err(err) => {
cleanup.mark_aborted();
warn!(
rpc_id,
session_id = upstream_lease.session_id,
camera_session_id,
"🎥 stream_camera inbound error while waiting for frame due time: {err}"
);
break 'camera_loop;
}
}
}
}
}
let actual_late_by = tokio::time::Instant::now() let actual_late_by = tokio::time::Instant::now()
.checked_duration_since(plan.due_at) .checked_duration_since(plan.due_at)
.unwrap_or_default(); .unwrap_or_default();

View File

@ -3,7 +3,9 @@
mod tests { mod tests {
use super::{UpstreamStreamCleanup, retain_freshest_audio_packet, retain_freshest_video_packet}; use super::{UpstreamStreamCleanup, retain_freshest_audio_packet, retain_freshest_video_packet};
use lesavka_common::lesavka::{AudioPacket, VideoPacket}; use lesavka_common::lesavka::{AudioPacket, VideoPacket};
use lesavka_server::upstream_media_runtime::UpstreamMediaRuntime; use lesavka_server::upstream_media_runtime::{
UpstreamMediaKind, UpstreamMediaRuntime, UpstreamClientTiming,
};
use std::sync::Arc; use std::sync::Arc;
#[test] #[test]
@ -47,6 +49,51 @@ mod tests {
assert_eq!(pending.back().map(|pkt| pkt.pts), Some(900)); assert_eq!(pending.back().map(|pkt| pkt.pts), Some(900));
} }
/// Checks that enqueueing a camera packet records its client timing immediately and leaves
/// only the newest frame pending.
#[test]
fn enqueue_pending_upstream_video_packet_records_timing_before_scheduler_waits() {
    let media_runtime = UpstreamMediaRuntime::new();

    // One older frame is already waiting when the fresh frame arrives.
    let mut backlog = std::collections::VecDeque::new();
    backlog.push_back(VideoPacket {
        pts: 10,
        client_capture_pts_us: 10,
        client_send_pts_us: 10,
        ..Default::default()
    });

    // Record a microphone sample first; presumably the receive-skew metric compares camera
    // timing against it (confirm against the runtime implementation).
    let microphone_timing = UpstreamClientTiming {
        capture_pts_us: 20,
        send_pts_us: 20,
        queue_depth: 0,
        queue_age_ms: 0,
    };
    media_runtime.record_client_timing(UpstreamMediaKind::Microphone, microphone_timing);

    let fresh_frame = VideoPacket {
        pts: 20,
        client_capture_pts_us: 20,
        client_send_pts_us: 20,
        ..Default::default()
    };
    super::enqueue_pending_upstream_video_packet(
        &media_runtime,
        &mut backlog,
        fresh_frame,
        1,
        2,
        3,
        "test",
    );

    // The older frame must have been coalesced away, leaving only the fresh one.
    assert_eq!(backlog.len(), 1);
    let front_pts = backlog.front().map(|frame| frame.pts);
    assert_eq!(front_pts, Some(20));

    let stats = media_runtime.snapshot();
    assert_eq!(stats.client_timing_window_samples, 1);
    // A missing p95 is treated as infinite so the assertion fails rather than passing silently.
    let receive_skew_p95_ms = stats
        .server_receive_abs_skew_p95_ms
        .unwrap_or(f64::INFINITY);
    assert!(
        receive_skew_p95_ms < 50.0,
        "video timing should be recorded as the packet is enqueued, not after playout waits"
    );
}
#[test] #[test]
fn upstream_cleanup_guard_closes_its_microphone_generation() { fn upstream_cleanup_guard_closes_its_microphone_generation() {
let runtime = Arc::new(UpstreamMediaRuntime::new()); let runtime = Arc::new(UpstreamMediaRuntime::new());

View File

@ -22,6 +22,31 @@ fn retain_freshest_video_packet(
dropped dropped
} }
/// Records client timing for an inbound camera packet, queues it, and coalesces the backlog.
///
/// The timing sample is taken from `pkt` via `video_client_timing` and recorded on `runtime`
/// as [`UpstreamMediaKind::Camera`] before the packet is pushed onto the back of `pending`.
/// `retain_freshest_video_packet` then trims `pending`; when it reports a non-zero drop count,
/// a debug event is emitted carrying the identifiers and `context`.
///
/// `rpc_id`, `session_id`, `camera_session_id` and `context` are only used as fields on that
/// debug event; `context` labels the call site that enqueued the packet.
#[cfg(not(coverage))]
fn enqueue_pending_upstream_video_packet(
    runtime: &UpstreamMediaRuntime,
    pending: &mut std::collections::VecDeque<VideoPacket>,
    pkt: VideoPacket,
    rpc_id: u64,
    session_id: u64,
    camera_session_id: u64,
    context: &'static str,
) {
    // Sample timing before the packet is moved into the queue.
    let timing = video_client_timing(&pkt);
    runtime.record_client_timing(UpstreamMediaKind::Camera, timing);

    pending.push_back(pkt);

    let dropped = retain_freshest_video_packet(pending);
    if dropped > 0 {
        tracing::debug!(
            rpc_id,
            session_id,
            camera_session_id,
            dropped,
            context,
            "🎥 coalesced stale upstream video backlog down to the freshest frame"
        );
    }
}
#[cfg(not(coverage))] #[cfg(not(coverage))]
const AUDIO_PENDING_LIVE_WINDOW_PACKETS: usize = 8; const AUDIO_PENDING_LIVE_WINDOW_PACKETS: usize = 8;