diff --git a/AGENTS.md b/AGENTS.md index ddea69e..856eecb 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -510,4 +510,18 @@ only if packet-attached metadata cannot explain the next failure. - [x] Include those timing metrics in segmented mirrored-probe summaries. - [x] Add planner tests covering client capture skew, client send skew, server receive skew, and queue ages. - [ ] Use the next mirrored run to compare browser p95/drift against client capture/send skew and server receive skew. -- [ ] If client/server timing is stable while browser p95 still fails, instrument UVC/UAC sink emission timing next. +- [x] Instrument UVC/UAC/HDMI sink handoff timing before waiting for another run. + +## 0.17.26 Blind Timing Window And Sink Handoff Checklist + +Context: the next probe should not be required to discover that the server is +blind between "packet arrived" and "packet handed to UAC/UVC/HDMI". Close +measurement gaps before tuning any new healing controller. + +- [x] Retain rolling client capture/send skew windows inside the server. +- [x] Retain rolling server receive skew and client queue age windows. +- [x] Record audio/video sink handoff instants and schedule lateness at the server boundary. +- [x] Expose sink handoff skew, sink lateness, and rolling p95 timing metrics through `GetUpstreamSync`. +- [x] Include rolling blind metrics in mirrored-probe CSV/JSONL summaries and blind targets. +- [x] Add planner tests for rolling timing windows and sink handoff timing. +- [ ] Use the next mirrored run only for correlation/tuning: decide whether the controller should adjust playout delay, offset, or drop/freeze policy from these blind metrics. 
diff --git a/Cargo.lock b/Cargo.lock index 925182f..ca47ee3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.17.25" +version = "0.17.26" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.17.25" +version = "0.17.26" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.17.25" +version = "0.17.26" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index 14f2415..b2e818f 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.17.25" +version = "0.17.26" edition = "2024" [dependencies] diff --git a/client/src/bin/lesavka-relayctl.rs b/client/src/bin/lesavka-relayctl.rs index 2a8d255..22b83f7 100644 --- a/client/src/bin/lesavka-relayctl.rs +++ b/client/src/bin/lesavka-relayctl.rs @@ -321,6 +321,83 @@ fn print_upstream_sync(state: lesavka_common::lesavka::UpstreamSyncState) { .map(|value| format!("{value:.1}")) .unwrap_or_else(|| "pending".to_string()) ); + println!( + "planner_client_capture_abs_skew_p95_ms={}", + state + .client_capture_abs_skew_p95_ms + .map(|value| format!("{value:.1}")) + .unwrap_or_else(|| "pending".to_string()) + ); + println!( + "planner_client_send_abs_skew_p95_ms={}", + state + .client_send_abs_skew_p95_ms + .map(|value| format!("{value:.1}")) + .unwrap_or_else(|| "pending".to_string()) + ); + println!( + "planner_server_receive_abs_skew_p95_ms={}", + state + .server_receive_abs_skew_p95_ms + .map(|value| format!("{value:.1}")) + .unwrap_or_else(|| "pending".to_string()) + ); + println!( + "planner_camera_client_queue_age_p95_ms={}", + state + .camera_client_queue_age_p95_ms + .map(|value| format!("{value:.1}")) + .unwrap_or_else(|| 
"pending".to_string()) + ); + println!( + "planner_microphone_client_queue_age_p95_ms={}", + state + .microphone_client_queue_age_p95_ms + .map(|value| format!("{value:.1}")) + .unwrap_or_else(|| "pending".to_string()) + ); + println!( + "planner_sink_handoff_skew_ms={}", + state + .sink_handoff_skew_ms + .map(|value| format!("{value:+.1}")) + .unwrap_or_else(|| "pending".to_string()) + ); + println!( + "planner_sink_handoff_abs_skew_p95_ms={}", + state + .sink_handoff_abs_skew_p95_ms + .map(|value| format!("{value:.1}")) + .unwrap_or_else(|| "pending".to_string()) + ); + println!( + "planner_camera_sink_late_ms={}", + state + .camera_sink_late_ms + .map(|value| format!("{value:+.1}")) + .unwrap_or_else(|| "pending".to_string()) + ); + println!( + "planner_microphone_sink_late_ms={}", + state + .microphone_sink_late_ms + .map(|value| format!("{value:+.1}")) + .unwrap_or_else(|| "pending".to_string()) + ); + println!( + "planner_camera_sink_late_p95_ms={}", + state + .camera_sink_late_p95_ms + .map(|value| format!("{value:.1}")) + .unwrap_or_else(|| "pending".to_string()) + ); + println!( + "planner_microphone_sink_late_p95_ms={}", + state + .microphone_sink_late_p95_ms + .map(|value| format!("{value:.1}")) + .unwrap_or_else(|| "pending".to_string()) + ); println!("planner_detail={}", state.last_reason); } diff --git a/common/Cargo.toml b/common/Cargo.toml index e6f2bb3..ad7d76d 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.17.25" +version = "0.17.26" edition = "2024" build = "build.rs" diff --git a/common/proto/lesavka.proto b/common/proto/lesavka.proto index 1c9a55b..348e727 100644 --- a/common/proto/lesavka.proto +++ b/common/proto/lesavka.proto @@ -125,6 +125,17 @@ message UpstreamSyncState { optional float microphone_client_queue_age_ms = 20; optional float camera_server_receive_age_ms = 21; optional float microphone_server_receive_age_ms = 22; + optional float 
client_capture_abs_skew_p95_ms = 23; + optional float client_send_abs_skew_p95_ms = 24; + optional float server_receive_abs_skew_p95_ms = 25; + optional float camera_client_queue_age_p95_ms = 26; + optional float microphone_client_queue_age_p95_ms = 27; + optional float sink_handoff_skew_ms = 28; + optional float sink_handoff_abs_skew_p95_ms = 29; + optional float camera_sink_late_ms = 30; + optional float microphone_sink_late_ms = 31; + optional float camera_sink_late_p95_ms = 32; + optional float microphone_sink_late_p95_ms = 33; } message HandshakeSet { diff --git a/scripts/manual/run_upstream_mirrored_av_sync.sh b/scripts/manual/run_upstream_mirrored_av_sync.sh index c26da5d..c7fd7da 100755 --- a/scripts/manual/run_upstream_mirrored_av_sync.sh +++ b/scripts/manual/run_upstream_mirrored_av_sync.sh @@ -861,6 +861,17 @@ for segment in range(1, segment_count + 1): "planner_microphone_client_queue_age_ms_after": as_float(planner_after.get("planner_microphone_client_queue_age_ms")), "planner_camera_server_receive_age_ms_after": as_float(planner_after.get("planner_camera_server_receive_age_ms")), "planner_microphone_server_receive_age_ms_after": as_float(planner_after.get("planner_microphone_server_receive_age_ms")), + "planner_client_capture_abs_skew_p95_ms_after": as_float(planner_after.get("planner_client_capture_abs_skew_p95_ms")), + "planner_client_send_abs_skew_p95_ms_after": as_float(planner_after.get("planner_client_send_abs_skew_p95_ms")), + "planner_server_receive_abs_skew_p95_ms_after": as_float(planner_after.get("planner_server_receive_abs_skew_p95_ms")), + "planner_camera_client_queue_age_p95_ms_after": as_float(planner_after.get("planner_camera_client_queue_age_p95_ms")), + "planner_microphone_client_queue_age_p95_ms_after": as_float(planner_after.get("planner_microphone_client_queue_age_p95_ms")), + "planner_sink_handoff_skew_ms_after": as_float(planner_after.get("planner_sink_handoff_skew_ms")), + "planner_sink_handoff_abs_skew_p95_ms_after": 
as_float(planner_after.get("planner_sink_handoff_abs_skew_p95_ms")), + "planner_camera_sink_late_ms_after": as_float(planner_after.get("planner_camera_sink_late_ms")), + "planner_microphone_sink_late_ms_after": as_float(planner_after.get("planner_microphone_sink_late_ms")), + "planner_camera_sink_late_p95_ms_after": as_float(planner_after.get("planner_camera_sink_late_p95_ms")), + "planner_microphone_sink_late_p95_ms_after": as_float(planner_after.get("planner_microphone_sink_late_p95_ms")), "active_audio_offset_us_before": as_float(calibration_before.get("calibration_active_audio_offset_us")), "active_audio_offset_us_after": as_float(calibration_after.get("calibration_active_audio_offset_us")), "active_video_offset_us_before": as_float(calibration_before.get("calibration_active_video_offset_us")), @@ -939,6 +950,17 @@ if target_source_rows: "planner_server_receive_skew_ms_after": range_for(target_source_rows, "planner_server_receive_skew_ms_after"), "planner_camera_client_queue_age_ms_after": range_for(target_source_rows, "planner_camera_client_queue_age_ms_after"), "planner_microphone_client_queue_age_ms_after": range_for(target_source_rows, "planner_microphone_client_queue_age_ms_after"), + "planner_client_capture_abs_skew_p95_ms_after": range_for(target_source_rows, "planner_client_capture_abs_skew_p95_ms_after"), + "planner_client_send_abs_skew_p95_ms_after": range_for(target_source_rows, "planner_client_send_abs_skew_p95_ms_after"), + "planner_server_receive_abs_skew_p95_ms_after": range_for(target_source_rows, "planner_server_receive_abs_skew_p95_ms_after"), + "planner_camera_client_queue_age_p95_ms_after": range_for(target_source_rows, "planner_camera_client_queue_age_p95_ms_after"), + "planner_microphone_client_queue_age_p95_ms_after": range_for(target_source_rows, "planner_microphone_client_queue_age_p95_ms_after"), + "planner_sink_handoff_skew_ms_after": range_for(target_source_rows, "planner_sink_handoff_skew_ms_after"), + 
"planner_sink_handoff_abs_skew_p95_ms_after": range_for(target_source_rows, "planner_sink_handoff_abs_skew_p95_ms_after"), + "planner_camera_sink_late_ms_after": range_for(target_source_rows, "planner_camera_sink_late_ms_after"), + "planner_microphone_sink_late_ms_after": range_for(target_source_rows, "planner_microphone_sink_late_ms_after"), + "planner_camera_sink_late_p95_ms_after": range_for(target_source_rows, "planner_camera_sink_late_p95_ms_after"), + "planner_microphone_sink_late_p95_ms_after": range_for(target_source_rows, "planner_microphone_sink_late_p95_ms_after"), "active_audio_offset_us_after": range_for(target_source_rows, "active_audio_offset_us_after"), "active_video_offset_us_after": range_for(target_source_rows, "active_video_offset_us_after"), "probe_p95_abs_skew_ms": range_for(target_source_rows, "probe_p95_abs_skew_ms"), diff --git a/server/Cargo.toml b/server/Cargo.toml index 0496bc7..34e4fc6 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.17.25" +version = "0.17.26" edition = "2024" autobins = false diff --git a/server/src/main/relay_service.rs b/server/src/main/relay_service.rs index 4f135f6..b499e91 100644 --- a/server/src/main/relay_service.rs +++ b/server/src/main/relay_service.rs @@ -305,7 +305,7 @@ impl Relay for Handler { tracing::info!(rpc_id, "🎤⬇ srv pkt#{n} {} bytes", pkt.data.len()); } sink.push(&pkt); - upstream_media_rt.mark_audio_presented(pkt.pts); + upstream_media_rt.mark_audio_presented(pkt.pts, plan.due_at); } sink.finish(); // flush on EOS let _ = tx.send(Ok(Empty {})).await; @@ -519,7 +519,7 @@ impl Relay for Handler { startup_video_settled = true; let presented_pts = pkt.pts; relay.feed(pkt); // ← all logging inside video.rs - upstream_media_rt.mark_video_presented(presented_pts); + upstream_media_rt.mark_video_presented(presented_pts, plan.due_at); } tx.send(Ok(Empty {})).await.ok(); Ok::<(), Status>(()) diff --git 
a/server/src/main/relay_service_coverage.rs b/server/src/main/relay_service_coverage.rs index 91e8af5..38215c4 100644 --- a/server/src/main/relay_service_coverage.rs +++ b/server/src/main/relay_service_coverage.rs @@ -175,7 +175,7 @@ impl Relay for Handler { } pkt.pts = plan.local_pts_us; sink.push(&pkt); - upstream_media_rt.mark_audio_presented(pkt.pts); + upstream_media_rt.mark_audio_presented(pkt.pts, plan.due_at); } sink.finish(); upstream_media_rt.close_microphone(lease.generation); @@ -264,7 +264,7 @@ impl Relay for Handler { pkt.pts = plan.local_pts_us; let presented_pts = pkt.pts; relay.feed(pkt); - upstream_media_rt.mark_video_presented(presented_pts); + upstream_media_rt.mark_video_presented(presented_pts, plan.due_at); } upstream_media_rt.close_camera(upstream_lease.generation); tx.send(Ok(Empty {})).await.ok(); diff --git a/server/src/main/rpc_helpers.rs b/server/src/main/rpc_helpers.rs index 20cdb9d..8c6af1e 100644 --- a/server/src/main/rpc_helpers.rs +++ b/server/src/main/rpc_helpers.rs @@ -198,6 +198,35 @@ impl Handler { microphone_server_receive_age_ms: snapshot .microphone_server_receive_age_ms .map(|value| value as f32), + client_capture_abs_skew_p95_ms: snapshot + .client_capture_abs_skew_p95_ms + .map(|value| value as f32), + client_send_abs_skew_p95_ms: snapshot + .client_send_abs_skew_p95_ms + .map(|value| value as f32), + server_receive_abs_skew_p95_ms: snapshot + .server_receive_abs_skew_p95_ms + .map(|value| value as f32), + camera_client_queue_age_p95_ms: snapshot + .camera_client_queue_age_p95_ms + .map(|value| value as f32), + microphone_client_queue_age_p95_ms: snapshot + .microphone_client_queue_age_p95_ms + .map(|value| value as f32), + sink_handoff_skew_ms: snapshot.sink_handoff_skew_ms.map(|value| value as f32), + sink_handoff_abs_skew_p95_ms: snapshot + .sink_handoff_abs_skew_p95_ms + .map(|value| value as f32), + camera_sink_late_ms: snapshot.camera_sink_late_ms.map(|value| value as f32), + microphone_sink_late_ms: snapshot + 
.microphone_sink_late_ms + .map(|value| value as f32), + camera_sink_late_p95_ms: snapshot + .camera_sink_late_p95_ms + .map(|value| value as f32), + microphone_sink_late_p95_ms: snapshot + .microphone_sink_late_p95_ms + .map(|value| value as f32), })) } } diff --git a/server/src/upstream_media_runtime.rs b/server/src/upstream_media_runtime.rs index 5ba9834..e5e6cfc 100644 --- a/server/src/upstream_media_runtime.rs +++ b/server/src/upstream_media_runtime.rs @@ -110,11 +110,17 @@ impl UpstreamMediaRuntime { } /// Mark one audio chunk as actually handed to the UAC sink. - pub fn mark_audio_presented(&self, local_pts_us: u64) { + pub fn mark_audio_presented(&self, local_pts_us: u64, due_at: Instant) { let mut state = self .state .lock() .expect("upstream media state mutex poisoned"); + record_presentation_sample( + &mut state, + UpstreamMediaKind::Microphone, + local_pts_us, + due_at, + ); state.last_audio_presented_pts_us = Some(local_pts_us); if state.phase != UpstreamSyncPhase::Failed { state.phase = UpstreamSyncPhase::Live; @@ -139,14 +145,16 @@ impl UpstreamMediaRuntime { UpstreamMediaKind::Camera => state.latest_camera_timing = Some(sample), UpstreamMediaKind::Microphone => state.latest_microphone_timing = Some(sample), } + record_client_timing_windows(&mut state); } /// Mark one video frame as actually handed to the UVC/HDMI sink. 
- pub fn mark_video_presented(&self, local_pts_us: u64) { + pub fn mark_video_presented(&self, local_pts_us: u64, due_at: Instant) { let mut state = self .state .lock() .expect("upstream media state mutex poisoned"); + record_presentation_sample(&mut state, UpstreamMediaKind::Camera, local_pts_us, due_at); state.last_video_presented_pts_us = Some(local_pts_us); if state.phase != UpstreamSyncPhase::Failed { state.phase = UpstreamSyncPhase::Live; @@ -235,6 +243,19 @@ impl UpstreamMediaRuntime { .as_secs_f64() * 1000.0 }), + client_capture_abs_skew_p95_ms: state.client_capture_skew_window_ms.p95_abs(), + client_send_abs_skew_p95_ms: state.client_send_skew_window_ms.p95_abs(), + server_receive_abs_skew_p95_ms: state.server_receive_skew_window_ms.p95_abs(), + camera_client_queue_age_p95_ms: state.camera_client_queue_age_window_ms.p95(), + microphone_client_queue_age_p95_ms: state.microphone_client_queue_age_window_ms.p95(), + sink_handoff_skew_ms: latest_sink_handoff_skew_ms(&state), + sink_handoff_abs_skew_p95_ms: state.sink_handoff_skew_window_ms.p95_abs(), + camera_sink_late_ms: state.latest_camera_presentation.map(presentation_late_ms), + microphone_sink_late_ms: state + .latest_microphone_presentation + .map(presentation_late_ms), + camera_sink_late_p95_ms: state.camera_sink_late_window_ms.p95(), + microphone_sink_late_p95_ms: state.microphone_sink_late_window_ms.p95(), } } } @@ -713,6 +734,75 @@ fn instant_delta_us(left: Instant, right: Instant) -> i128 { } } +fn record_client_timing_windows(state: &mut UpstreamClockState) { + let (Some(camera), Some(microphone)) = + (state.latest_camera_timing, state.latest_microphone_timing) + else { + return; + }; + state + .client_capture_skew_window_ms + .push((camera.capture_pts_us as i128 - microphone.capture_pts_us as i128) as f64 / 1000.0); + state + .client_send_skew_window_ms + .push((camera.send_pts_us as i128 - microphone.send_pts_us as i128) as f64 / 1000.0); + state + .server_receive_skew_window_ms + 
.push(instant_delta_us(camera.received_at, microphone.received_at) as f64 / 1000.0); + state + .camera_client_queue_age_window_ms + .push(f64::from(camera.queue_age_ms)); + state + .microphone_client_queue_age_window_ms + .push(f64::from(microphone.queue_age_ms)); +} + +fn record_presentation_sample( + state: &mut UpstreamClockState, + kind: UpstreamMediaKind, + local_pts_us: u64, + due_at: Instant, +) { + let sample = state::UpstreamPresentationSample { + local_pts_us, + due_at, + handed_at: Instant::now(), + }; + let late_ms = presentation_late_ms(sample).max(0.0); + match kind { + UpstreamMediaKind::Camera => { + state.latest_camera_presentation = Some(sample); + state.camera_sink_late_window_ms.push(late_ms); + } + UpstreamMediaKind::Microphone => { + state.latest_microphone_presentation = Some(sample); + state.microphone_sink_late_window_ms.push(late_ms); + } + } + if let Some(skew_ms) = latest_sink_handoff_skew_ms(state) { + state.sink_handoff_skew_window_ms.push(skew_ms); + } +} + +fn latest_sink_handoff_skew_ms(state: &UpstreamClockState) -> Option<f64> { + let (Some(camera), Some(microphone)) = ( + state.latest_camera_presentation, + state.latest_microphone_presentation, + ) else { + return None; + }; + let local_pts_delta_ms = + (camera.local_pts_us as i128 - microphone.local_pts_us as i128).abs() as f64 / 1000.0; + if local_pts_delta_ms > 250.0 { + return None; + } + Some(instant_delta_us(camera.handed_at, microphone.handed_at) as f64 / 1000.0) +} + +fn presentation_late_ms(sample: state::UpstreamPresentationSample) -> f64 { + instant_delta_us(sample.handed_at, sample.due_at) as f64 / 1000.0 +} + fn refresh_unpaired_pairing_anchor( state: &mut UpstreamClockState, kind: UpstreamMediaKind, diff --git a/server/src/upstream_media_runtime/lease_lifecycle.rs b/server/src/upstream_media_runtime/lease_lifecycle.rs index 0769280..ef5d25b 100644 --- a/server/src/upstream_media_runtime/lease_lifecycle.rs +++ b/server/src/upstream_media_runtime/lease_lifecycle.rs @@ 
-168,6 +168,16 @@ fn reset_timing_anchors(state: &mut UpstreamClockState) { state.video_freezes = 0; state.latest_camera_timing = None; state.latest_microphone_timing = None; + state.latest_camera_presentation = None; + state.latest_microphone_presentation = None; + state.client_capture_skew_window_ms = Default::default(); + state.client_send_skew_window_ms = Default::default(); + state.server_receive_skew_window_ms = Default::default(); + state.camera_client_queue_age_window_ms = Default::default(); + state.microphone_client_queue_age_window_ms = Default::default(); + state.sink_handoff_skew_window_ms = Default::default(); + state.camera_sink_late_window_ms = Default::default(); + state.microphone_sink_late_window_ms = Default::default(); state.phase = UpstreamSyncPhase::Acquiring; state.last_reason = "timing anchors reset".to_string(); } diff --git a/server/src/upstream_media_runtime/state.rs b/server/src/upstream_media_runtime/state.rs index 7f324e5..67dae52 100644 --- a/server/src/upstream_media_runtime/state.rs +++ b/server/src/upstream_media_runtime/state.rs @@ -1,5 +1,8 @@ +use std::collections::VecDeque; use tokio::time::Instant; +const TIMING_WINDOW_CAPACITY: usize = 240; + #[derive(Clone, Copy, Debug)] pub(super) struct UpstreamTimingSample { pub capture_pts_us: u64, @@ -8,6 +11,45 @@ pub(super) struct UpstreamTimingSample { pub received_at: Instant, } +#[derive(Clone, Copy, Debug)] +pub(super) struct UpstreamPresentationSample { + pub local_pts_us: u64, + pub due_at: Instant, + pub handed_at: Instant, +} + +#[derive(Debug, Default)] +pub(super) struct UpstreamScalarWindow { + values: VecDeque<f64>, +} + +impl UpstreamScalarWindow { + pub fn push(&mut self, value: f64) { + if self.values.len() >= TIMING_WINDOW_CAPACITY { + self.values.pop_front(); + } + self.values.push_back(value); + } + + pub fn p95_abs(&self) -> Option<f64> { + percentile(self.values.iter().map(|value| value.abs()), 0.95) + } + + pub fn p95(&self) -> Option<f64> { + 
percentile(self.values.iter().copied(), 0.95) + } +} + +fn percentile(values: impl Iterator<Item = f64>, quantile: f64) -> Option<f64> { + let mut sorted = values.filter(|value| value.is_finite()).collect::<Vec<_>>(); + if sorted.is_empty() { + return None; + } + sorted.sort_by(|left, right| left.total_cmp(right)); + let index = ((sorted.len() - 1) as f64 * quantile.clamp(0.0, 1.0)).ceil() as usize; + sorted.get(index).copied() +} + #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub enum UpstreamSyncPhase { Acquiring, @@ -60,6 +102,16 @@ pub(super) struct UpstreamClockState { pub last_reason: String, pub latest_camera_timing: Option<UpstreamTimingSample>, pub latest_microphone_timing: Option<UpstreamTimingSample>, + pub latest_camera_presentation: Option<UpstreamPresentationSample>, + pub latest_microphone_presentation: Option<UpstreamPresentationSample>, + pub client_capture_skew_window_ms: UpstreamScalarWindow, + pub client_send_skew_window_ms: UpstreamScalarWindow, + pub server_receive_skew_window_ms: UpstreamScalarWindow, + pub camera_client_queue_age_window_ms: UpstreamScalarWindow, + pub microphone_client_queue_age_window_ms: UpstreamScalarWindow, + pub sink_handoff_skew_window_ms: UpstreamScalarWindow, + pub camera_sink_late_window_ms: UpstreamScalarWindow, + pub microphone_sink_late_window_ms: UpstreamScalarWindow, } impl Default for UpstreamSyncPhase { diff --git a/server/src/upstream_media_runtime/tests/async_wait.rs b/server/src/upstream_media_runtime/tests/async_wait.rs index 79dd43e..35a0b10 100644 --- a/server/src/upstream_media_runtime/tests/async_wait.rs +++ b/server/src/upstream_media_runtime/tests/async_wait.rs @@ -15,7 +15,7 @@ async fn wait_for_audio_master_releases_video_once_audio_catches_up() { super::UpstreamPlanDecision::AwaitingPair )); let audio_first = play(runtime.plan_audio_pts(1_000_000)); - runtime.mark_audio_presented(audio_first.local_pts_us); + runtime.mark_audio_presented(audio_first.local_pts_us, audio_first.due_at); let video_first = play(runtime.plan_video_pts(1_000_000, 16_666)); let waiter = tokio::spawn({ @@ -29,7 +29,7 @@ async fn 
wait_for_audio_master_releases_video_once_audio_catches_up() { tokio::time::sleep(Duration::from_millis(5)).await; let audio_next = play(runtime.plan_audio_pts(1_010_000)); - runtime.mark_audio_presented(audio_next.local_pts_us); + runtime.mark_audio_presented(audio_next.local_pts_us, audio_next.due_at); assert!(waiter.await.expect("audio master waiter should finish")); } @@ -47,7 +47,7 @@ async fn wait_for_audio_master_allows_configured_positive_audio_delay() { super::UpstreamPlanDecision::AwaitingPair )); let audio_first = play(runtime.plan_audio_pts(1_000_000)); - runtime.mark_audio_presented(audio_first.local_pts_us); + runtime.mark_audio_presented(audio_first.local_pts_us, audio_first.due_at); let _video_first = play(runtime.plan_video_pts(1_000_000, 16_666)); let delayed_video = play(runtime.plan_video_pts(1_700_000, 16_666)); @@ -75,7 +75,7 @@ async fn wait_for_audio_master_times_out_when_audio_never_catches_up() { super::UpstreamPlanDecision::AwaitingPair )); let audio_first = play(runtime.plan_audio_pts(1_000_000)); - runtime.mark_audio_presented(audio_first.local_pts_us); + runtime.mark_audio_presented(audio_first.local_pts_us, audio_first.due_at); let video_first = play(runtime.plan_video_pts(1_000_000, 16_666)); let due_at = tokio::time::Instant::now() + Duration::from_millis(20); @@ -98,7 +98,7 @@ async fn wait_for_audio_master_keeps_video_waiting_through_sync_grace() { super::UpstreamPlanDecision::AwaitingPair )); let audio_first = play(runtime.plan_audio_pts(1_000_000)); - runtime.mark_audio_presented(audio_first.local_pts_us); + runtime.mark_audio_presented(audio_first.local_pts_us, audio_first.due_at); let video_first = play(runtime.plan_video_pts(1_000_000, 16_666)); let waiter = tokio::spawn({ @@ -117,7 +117,7 @@ async fn wait_for_audio_master_keeps_video_waiting_through_sync_grace() { tokio::time::sleep(Duration::from_millis(5)).await; let audio_next = play(runtime.plan_audio_pts(1_120_000)); - 
runtime.mark_audio_presented(audio_next.local_pts_us); + runtime.mark_audio_presented(audio_next.local_pts_us, audio_next.due_at); assert!( waiter diff --git a/server/src/upstream_media_runtime/tests/planning.rs b/server/src/upstream_media_runtime/tests/planning.rs index 6655398..4b41997 100644 --- a/server/src/upstream_media_runtime/tests/planning.rs +++ b/server/src/upstream_media_runtime/tests/planning.rs @@ -345,7 +345,7 @@ fn video_too_far_behind_audio_master_is_dropped_and_counted_as_freeze() { )); let audio = play(runtime.plan_audio_pts(1_000_000)); let _video = play(runtime.plan_video_pts(1_000_000, 16_666)); - runtime.mark_audio_presented(audio.local_pts_us); + runtime.mark_audio_presented(audio.local_pts_us, audio.due_at); let audio_master = play(runtime.plan_audio_pts(1_200_000)); assert!( @@ -355,7 +355,7 @@ fn video_too_far_behind_audio_master_is_dropped_and_counted_as_freeze() { ), "future planned audio alone must not freeze video before UAC presentation" ); - runtime.mark_audio_presented(audio_master.local_pts_us); + runtime.mark_audio_presented(audio_master.local_pts_us, audio_master.due_at); assert!(matches!( runtime.plan_video_pts(1_116_666, 16_666), @@ -465,8 +465,8 @@ fn planner_snapshot_tracks_presented_playheads_and_skew() { )); let audio = play(runtime.plan_audio_pts(1_000_000)); let video = play(runtime.plan_video_pts(1_000_000, 16_666)); - runtime.mark_audio_presented(audio.local_pts_us); - runtime.mark_video_presented(video.local_pts_us); + runtime.mark_audio_presented(audio.local_pts_us, audio.due_at); + runtime.mark_video_presented(video.local_pts_us, video.due_at); let snapshot = runtime.snapshot(); assert_eq!(snapshot.phase, "live"); @@ -475,6 +475,44 @@ fn planner_snapshot_tracks_presented_playheads_and_skew() { assert_eq!(snapshot.planner_skew_ms, Some(0.0)); } +#[test] +#[serial(upstream_media_runtime)] +fn planner_snapshot_tracks_sink_handoff_timing_windows() { + let runtime = runtime_without_offsets(); + let _camera = 
runtime.activate_camera(); + let _microphone = runtime.activate_microphone(); + let due_at = tokio::time::Instant::now() + .checked_sub(Duration::from_millis(5)) + .unwrap_or_else(tokio::time::Instant::now); + + runtime.mark_audio_presented(123_000, due_at); + std::thread::sleep(Duration::from_millis(1)); + runtime.mark_video_presented(123_000, due_at); + + let snapshot = runtime.snapshot(); + + assert!( + snapshot.sink_handoff_skew_ms.is_some_and(|skew| skew > 0.0), + "video was handed to its sink after audio" + ); + assert!( + snapshot + .sink_handoff_abs_skew_p95_ms + .is_some_and(|skew| skew > 0.0), + "the rolling handoff window should include the audio/video handoff gap" + ); + assert!( + snapshot.camera_sink_late_ms.is_some_and(|late| late > 0.0), + "handoff after due_at should be reported as positive lateness" + ); + assert!( + snapshot + .microphone_sink_late_p95_ms + .is_some_and(|late| late > 0.0), + "audio sink lateness should be retained in the rolling window" + ); +} + #[test] #[serial(upstream_media_runtime)] fn planner_snapshot_tracks_client_timing_sidecar_metrics() { @@ -504,14 +542,24 @@ fn planner_snapshot_tracks_client_timing_sidecar_metrics() { assert_eq!(snapshot.client_capture_skew_ms, Some(60.0)); assert_eq!(snapshot.client_send_skew_ms, Some(50.0)); + assert_eq!(snapshot.client_capture_abs_skew_p95_ms, Some(60.0)); + assert_eq!(snapshot.client_send_abs_skew_p95_ms, Some(50.0)); assert_eq!(snapshot.camera_client_queue_age_ms, Some(20.0)); assert_eq!(snapshot.microphone_client_queue_age_ms, Some(30.0)); + assert_eq!(snapshot.camera_client_queue_age_p95_ms, Some(20.0)); + assert_eq!(snapshot.microphone_client_queue_age_p95_ms, Some(30.0)); assert!( snapshot .server_receive_skew_ms .is_some_and(|skew| skew < 0.0), "camera was received before microphone, so camera-minus-mic receive skew should be negative" ); + assert!( + snapshot + .server_receive_abs_skew_p95_ms + .is_some_and(|skew| skew > 0.0), + "server receive jitter should be retained as 
an absolute p95" + ); assert!( snapshot .camera_server_receive_age_ms diff --git a/server/src/upstream_media_runtime/types.rs b/server/src/upstream_media_runtime/types.rs index e5a8b97..356faa3 100644 --- a/server/src/upstream_media_runtime/types.rs +++ b/server/src/upstream_media_runtime/types.rs @@ -89,4 +89,15 @@ pub struct UpstreamPlannerSnapshot { pub microphone_client_queue_age_ms: Option<f64>, pub camera_server_receive_age_ms: Option<f64>, pub microphone_server_receive_age_ms: Option<f64>, + pub client_capture_abs_skew_p95_ms: Option<f64>, + pub client_send_abs_skew_p95_ms: Option<f64>, + pub server_receive_abs_skew_p95_ms: Option<f64>, + pub camera_client_queue_age_p95_ms: Option<f64>, + pub microphone_client_queue_age_p95_ms: Option<f64>, + pub sink_handoff_skew_ms: Option<f64>, + pub sink_handoff_abs_skew_p95_ms: Option<f64>, + pub camera_sink_late_ms: Option<f64>, + pub microphone_sink_late_ms: Option<f64>, + pub camera_sink_late_p95_ms: Option<f64>, + pub microphone_sink_late_p95_ms: Option<f64>, }