feat: expose upstream blind timing windows

This commit is contained in:
Brad Stein 2026-05-02 17:36:41 -03:00
parent c741e8df17
commit e1fb31235f
17 changed files with 387 additions and 23 deletions

View File

@ -510,4 +510,18 @@ only if packet-attached metadata cannot explain the next failure.
- [x] Include those timing metrics in segmented mirrored-probe summaries. - [x] Include those timing metrics in segmented mirrored-probe summaries.
- [x] Add planner tests covering client capture skew, client send skew, server receive skew, and queue ages. - [x] Add planner tests covering client capture skew, client send skew, server receive skew, and queue ages.
- [ ] Use the next mirrored run to compare browser p95/drift against client capture/send skew and server receive skew. - [ ] Use the next mirrored run to compare browser p95/drift against client capture/send skew and server receive skew.
- [ ] If client/server timing is stable while browser p95 still fails, instrument UVC/UAC sink emission timing next. - [x] Instrument UVC/UAC/HDMI sink handoff timing before waiting for another run.
## 0.17.26 Blind Timing Window And Sink Handoff Checklist
Context: the next probe should not be required to discover that the server is
blind between "packet arrived" and "packet handed to UAC/UVC/HDMI". Close
measurement gaps before tuning any new healing controller.
- [x] Retain rolling client capture/send skew windows inside the server.
- [x] Retain rolling server receive skew and client queue age windows.
- [x] Record audio/video sink handoff instants and schedule lateness at the server boundary.
- [x] Expose sink handoff skew, sink lateness, and rolling p95 timing metrics through `GetUpstreamSync`.
- [x] Include rolling blind metrics in mirrored-probe CSV/JSONL summaries and blind targets.
- [x] Add planner tests for rolling timing windows and sink handoff timing.
- [ ] Use the next mirrored run only for correlation/tuning: decide whether the controller should adjust playout delay, offset, or drop/freeze policy from these blind metrics.

6
Cargo.lock generated
View File

@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]] [[package]]
name = "lesavka_client" name = "lesavka_client"
version = "0.17.25" version = "0.17.26"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-stream", "async-stream",
@ -1686,7 +1686,7 @@ dependencies = [
[[package]] [[package]]
name = "lesavka_common" name = "lesavka_common"
version = "0.17.25" version = "0.17.26"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",
@ -1698,7 +1698,7 @@ dependencies = [
[[package]] [[package]]
name = "lesavka_server" name = "lesavka_server"
version = "0.17.25" version = "0.17.26"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package] [package]
name = "lesavka_client" name = "lesavka_client"
version = "0.17.25" version = "0.17.26"
edition = "2024" edition = "2024"
[dependencies] [dependencies]

View File

@ -321,6 +321,83 @@ fn print_upstream_sync(state: lesavka_common::lesavka::UpstreamSyncState) {
.map(|value| format!("{value:.1}")) .map(|value| format!("{value:.1}"))
.unwrap_or_else(|| "pending".to_string()) .unwrap_or_else(|| "pending".to_string())
); );
println!(
"planner_client_capture_abs_skew_p95_ms={}",
state
.client_capture_abs_skew_p95_ms
.map(|value| format!("{value:.1}"))
.unwrap_or_else(|| "pending".to_string())
);
println!(
"planner_client_send_abs_skew_p95_ms={}",
state
.client_send_abs_skew_p95_ms
.map(|value| format!("{value:.1}"))
.unwrap_or_else(|| "pending".to_string())
);
println!(
"planner_server_receive_abs_skew_p95_ms={}",
state
.server_receive_abs_skew_p95_ms
.map(|value| format!("{value:.1}"))
.unwrap_or_else(|| "pending".to_string())
);
println!(
"planner_camera_client_queue_age_p95_ms={}",
state
.camera_client_queue_age_p95_ms
.map(|value| format!("{value:.1}"))
.unwrap_or_else(|| "pending".to_string())
);
println!(
"planner_microphone_client_queue_age_p95_ms={}",
state
.microphone_client_queue_age_p95_ms
.map(|value| format!("{value:.1}"))
.unwrap_or_else(|| "pending".to_string())
);
println!(
"planner_sink_handoff_skew_ms={}",
state
.sink_handoff_skew_ms
.map(|value| format!("{value:+.1}"))
.unwrap_or_else(|| "pending".to_string())
);
println!(
"planner_sink_handoff_abs_skew_p95_ms={}",
state
.sink_handoff_abs_skew_p95_ms
.map(|value| format!("{value:.1}"))
.unwrap_or_else(|| "pending".to_string())
);
println!(
"planner_camera_sink_late_ms={}",
state
.camera_sink_late_ms
.map(|value| format!("{value:+.1}"))
.unwrap_or_else(|| "pending".to_string())
);
println!(
"planner_microphone_sink_late_ms={}",
state
.microphone_sink_late_ms
.map(|value| format!("{value:+.1}"))
.unwrap_or_else(|| "pending".to_string())
);
println!(
"planner_camera_sink_late_p95_ms={}",
state
.camera_sink_late_p95_ms
.map(|value| format!("{value:.1}"))
.unwrap_or_else(|| "pending".to_string())
);
println!(
"planner_microphone_sink_late_p95_ms={}",
state
.microphone_sink_late_p95_ms
.map(|value| format!("{value:.1}"))
.unwrap_or_else(|| "pending".to_string())
);
println!("planner_detail={}", state.last_reason); println!("planner_detail={}", state.last_reason);
} }

View File

@ -1,6 +1,6 @@
[package] [package]
name = "lesavka_common" name = "lesavka_common"
version = "0.17.25" version = "0.17.26"
edition = "2024" edition = "2024"
build = "build.rs" build = "build.rs"

View File

@ -125,6 +125,17 @@ message UpstreamSyncState {
optional float microphone_client_queue_age_ms = 20; optional float microphone_client_queue_age_ms = 20;
optional float camera_server_receive_age_ms = 21; optional float camera_server_receive_age_ms = 21;
optional float microphone_server_receive_age_ms = 22; optional float microphone_server_receive_age_ms = 22;
optional float client_capture_abs_skew_p95_ms = 23;
optional float client_send_abs_skew_p95_ms = 24;
optional float server_receive_abs_skew_p95_ms = 25;
optional float camera_client_queue_age_p95_ms = 26;
optional float microphone_client_queue_age_p95_ms = 27;
optional float sink_handoff_skew_ms = 28;
optional float sink_handoff_abs_skew_p95_ms = 29;
optional float camera_sink_late_ms = 30;
optional float microphone_sink_late_ms = 31;
optional float camera_sink_late_p95_ms = 32;
optional float microphone_sink_late_p95_ms = 33;
} }
message HandshakeSet { message HandshakeSet {

View File

@ -861,6 +861,17 @@ for segment in range(1, segment_count + 1):
"planner_microphone_client_queue_age_ms_after": as_float(planner_after.get("planner_microphone_client_queue_age_ms")), "planner_microphone_client_queue_age_ms_after": as_float(planner_after.get("planner_microphone_client_queue_age_ms")),
"planner_camera_server_receive_age_ms_after": as_float(planner_after.get("planner_camera_server_receive_age_ms")), "planner_camera_server_receive_age_ms_after": as_float(planner_after.get("planner_camera_server_receive_age_ms")),
"planner_microphone_server_receive_age_ms_after": as_float(planner_after.get("planner_microphone_server_receive_age_ms")), "planner_microphone_server_receive_age_ms_after": as_float(planner_after.get("planner_microphone_server_receive_age_ms")),
"planner_client_capture_abs_skew_p95_ms_after": as_float(planner_after.get("planner_client_capture_abs_skew_p95_ms")),
"planner_client_send_abs_skew_p95_ms_after": as_float(planner_after.get("planner_client_send_abs_skew_p95_ms")),
"planner_server_receive_abs_skew_p95_ms_after": as_float(planner_after.get("planner_server_receive_abs_skew_p95_ms")),
"planner_camera_client_queue_age_p95_ms_after": as_float(planner_after.get("planner_camera_client_queue_age_p95_ms")),
"planner_microphone_client_queue_age_p95_ms_after": as_float(planner_after.get("planner_microphone_client_queue_age_p95_ms")),
"planner_sink_handoff_skew_ms_after": as_float(planner_after.get("planner_sink_handoff_skew_ms")),
"planner_sink_handoff_abs_skew_p95_ms_after": as_float(planner_after.get("planner_sink_handoff_abs_skew_p95_ms")),
"planner_camera_sink_late_ms_after": as_float(planner_after.get("planner_camera_sink_late_ms")),
"planner_microphone_sink_late_ms_after": as_float(planner_after.get("planner_microphone_sink_late_ms")),
"planner_camera_sink_late_p95_ms_after": as_float(planner_after.get("planner_camera_sink_late_p95_ms")),
"planner_microphone_sink_late_p95_ms_after": as_float(planner_after.get("planner_microphone_sink_late_p95_ms")),
"active_audio_offset_us_before": as_float(calibration_before.get("calibration_active_audio_offset_us")), "active_audio_offset_us_before": as_float(calibration_before.get("calibration_active_audio_offset_us")),
"active_audio_offset_us_after": as_float(calibration_after.get("calibration_active_audio_offset_us")), "active_audio_offset_us_after": as_float(calibration_after.get("calibration_active_audio_offset_us")),
"active_video_offset_us_before": as_float(calibration_before.get("calibration_active_video_offset_us")), "active_video_offset_us_before": as_float(calibration_before.get("calibration_active_video_offset_us")),
@ -939,6 +950,17 @@ if target_source_rows:
"planner_server_receive_skew_ms_after": range_for(target_source_rows, "planner_server_receive_skew_ms_after"), "planner_server_receive_skew_ms_after": range_for(target_source_rows, "planner_server_receive_skew_ms_after"),
"planner_camera_client_queue_age_ms_after": range_for(target_source_rows, "planner_camera_client_queue_age_ms_after"), "planner_camera_client_queue_age_ms_after": range_for(target_source_rows, "planner_camera_client_queue_age_ms_after"),
"planner_microphone_client_queue_age_ms_after": range_for(target_source_rows, "planner_microphone_client_queue_age_ms_after"), "planner_microphone_client_queue_age_ms_after": range_for(target_source_rows, "planner_microphone_client_queue_age_ms_after"),
"planner_client_capture_abs_skew_p95_ms_after": range_for(target_source_rows, "planner_client_capture_abs_skew_p95_ms_after"),
"planner_client_send_abs_skew_p95_ms_after": range_for(target_source_rows, "planner_client_send_abs_skew_p95_ms_after"),
"planner_server_receive_abs_skew_p95_ms_after": range_for(target_source_rows, "planner_server_receive_abs_skew_p95_ms_after"),
"planner_camera_client_queue_age_p95_ms_after": range_for(target_source_rows, "planner_camera_client_queue_age_p95_ms_after"),
"planner_microphone_client_queue_age_p95_ms_after": range_for(target_source_rows, "planner_microphone_client_queue_age_p95_ms_after"),
"planner_sink_handoff_skew_ms_after": range_for(target_source_rows, "planner_sink_handoff_skew_ms_after"),
"planner_sink_handoff_abs_skew_p95_ms_after": range_for(target_source_rows, "planner_sink_handoff_abs_skew_p95_ms_after"),
"planner_camera_sink_late_ms_after": range_for(target_source_rows, "planner_camera_sink_late_ms_after"),
"planner_microphone_sink_late_ms_after": range_for(target_source_rows, "planner_microphone_sink_late_ms_after"),
"planner_camera_sink_late_p95_ms_after": range_for(target_source_rows, "planner_camera_sink_late_p95_ms_after"),
"planner_microphone_sink_late_p95_ms_after": range_for(target_source_rows, "planner_microphone_sink_late_p95_ms_after"),
"active_audio_offset_us_after": range_for(target_source_rows, "active_audio_offset_us_after"), "active_audio_offset_us_after": range_for(target_source_rows, "active_audio_offset_us_after"),
"active_video_offset_us_after": range_for(target_source_rows, "active_video_offset_us_after"), "active_video_offset_us_after": range_for(target_source_rows, "active_video_offset_us_after"),
"probe_p95_abs_skew_ms": range_for(target_source_rows, "probe_p95_abs_skew_ms"), "probe_p95_abs_skew_ms": range_for(target_source_rows, "probe_p95_abs_skew_ms"),

View File

@ -10,7 +10,7 @@ bench = false
[package] [package]
name = "lesavka_server" name = "lesavka_server"
version = "0.17.25" version = "0.17.26"
edition = "2024" edition = "2024"
autobins = false autobins = false

View File

@ -305,7 +305,7 @@ impl Relay for Handler {
tracing::info!(rpc_id, "🎤⬇ srv pkt#{n} {} bytes", pkt.data.len()); tracing::info!(rpc_id, "🎤⬇ srv pkt#{n} {} bytes", pkt.data.len());
} }
sink.push(&pkt); sink.push(&pkt);
upstream_media_rt.mark_audio_presented(pkt.pts); upstream_media_rt.mark_audio_presented(pkt.pts, plan.due_at);
} }
sink.finish(); // flush on EOS sink.finish(); // flush on EOS
let _ = tx.send(Ok(Empty {})).await; let _ = tx.send(Ok(Empty {})).await;
@ -519,7 +519,7 @@ impl Relay for Handler {
startup_video_settled = true; startup_video_settled = true;
let presented_pts = pkt.pts; let presented_pts = pkt.pts;
relay.feed(pkt); // ← all logging inside video.rs relay.feed(pkt); // ← all logging inside video.rs
upstream_media_rt.mark_video_presented(presented_pts); upstream_media_rt.mark_video_presented(presented_pts, plan.due_at);
} }
tx.send(Ok(Empty {})).await.ok(); tx.send(Ok(Empty {})).await.ok();
Ok::<(), Status>(()) Ok::<(), Status>(())

View File

@ -175,7 +175,7 @@ impl Relay for Handler {
} }
pkt.pts = plan.local_pts_us; pkt.pts = plan.local_pts_us;
sink.push(&pkt); sink.push(&pkt);
upstream_media_rt.mark_audio_presented(pkt.pts); upstream_media_rt.mark_audio_presented(pkt.pts, plan.due_at);
} }
sink.finish(); sink.finish();
upstream_media_rt.close_microphone(lease.generation); upstream_media_rt.close_microphone(lease.generation);
@ -264,7 +264,7 @@ impl Relay for Handler {
pkt.pts = plan.local_pts_us; pkt.pts = plan.local_pts_us;
let presented_pts = pkt.pts; let presented_pts = pkt.pts;
relay.feed(pkt); relay.feed(pkt);
upstream_media_rt.mark_video_presented(presented_pts); upstream_media_rt.mark_video_presented(presented_pts, plan.due_at);
} }
upstream_media_rt.close_camera(upstream_lease.generation); upstream_media_rt.close_camera(upstream_lease.generation);
tx.send(Ok(Empty {})).await.ok(); tx.send(Ok(Empty {})).await.ok();

View File

@ -198,6 +198,35 @@ impl Handler {
microphone_server_receive_age_ms: snapshot microphone_server_receive_age_ms: snapshot
.microphone_server_receive_age_ms .microphone_server_receive_age_ms
.map(|value| value as f32), .map(|value| value as f32),
client_capture_abs_skew_p95_ms: snapshot
.client_capture_abs_skew_p95_ms
.map(|value| value as f32),
client_send_abs_skew_p95_ms: snapshot
.client_send_abs_skew_p95_ms
.map(|value| value as f32),
server_receive_abs_skew_p95_ms: snapshot
.server_receive_abs_skew_p95_ms
.map(|value| value as f32),
camera_client_queue_age_p95_ms: snapshot
.camera_client_queue_age_p95_ms
.map(|value| value as f32),
microphone_client_queue_age_p95_ms: snapshot
.microphone_client_queue_age_p95_ms
.map(|value| value as f32),
sink_handoff_skew_ms: snapshot.sink_handoff_skew_ms.map(|value| value as f32),
sink_handoff_abs_skew_p95_ms: snapshot
.sink_handoff_abs_skew_p95_ms
.map(|value| value as f32),
camera_sink_late_ms: snapshot.camera_sink_late_ms.map(|value| value as f32),
microphone_sink_late_ms: snapshot
.microphone_sink_late_ms
.map(|value| value as f32),
camera_sink_late_p95_ms: snapshot
.camera_sink_late_p95_ms
.map(|value| value as f32),
microphone_sink_late_p95_ms: snapshot
.microphone_sink_late_p95_ms
.map(|value| value as f32),
})) }))
} }
} }

View File

@ -110,11 +110,17 @@ impl UpstreamMediaRuntime {
} }
/// Mark one audio chunk as actually handed to the UAC sink. /// Mark one audio chunk as actually handed to the UAC sink.
pub fn mark_audio_presented(&self, local_pts_us: u64) { pub fn mark_audio_presented(&self, local_pts_us: u64, due_at: Instant) {
let mut state = self let mut state = self
.state .state
.lock() .lock()
.expect("upstream media state mutex poisoned"); .expect("upstream media state mutex poisoned");
record_presentation_sample(
&mut state,
UpstreamMediaKind::Microphone,
local_pts_us,
due_at,
);
state.last_audio_presented_pts_us = Some(local_pts_us); state.last_audio_presented_pts_us = Some(local_pts_us);
if state.phase != UpstreamSyncPhase::Failed { if state.phase != UpstreamSyncPhase::Failed {
state.phase = UpstreamSyncPhase::Live; state.phase = UpstreamSyncPhase::Live;
@ -139,14 +145,16 @@ impl UpstreamMediaRuntime {
UpstreamMediaKind::Camera => state.latest_camera_timing = Some(sample), UpstreamMediaKind::Camera => state.latest_camera_timing = Some(sample),
UpstreamMediaKind::Microphone => state.latest_microphone_timing = Some(sample), UpstreamMediaKind::Microphone => state.latest_microphone_timing = Some(sample),
} }
record_client_timing_windows(&mut state);
} }
/// Mark one video frame as actually handed to the UVC/HDMI sink. /// Mark one video frame as actually handed to the UVC/HDMI sink.
pub fn mark_video_presented(&self, local_pts_us: u64) { pub fn mark_video_presented(&self, local_pts_us: u64, due_at: Instant) {
let mut state = self let mut state = self
.state .state
.lock() .lock()
.expect("upstream media state mutex poisoned"); .expect("upstream media state mutex poisoned");
record_presentation_sample(&mut state, UpstreamMediaKind::Camera, local_pts_us, due_at);
state.last_video_presented_pts_us = Some(local_pts_us); state.last_video_presented_pts_us = Some(local_pts_us);
if state.phase != UpstreamSyncPhase::Failed { if state.phase != UpstreamSyncPhase::Failed {
state.phase = UpstreamSyncPhase::Live; state.phase = UpstreamSyncPhase::Live;
@ -235,6 +243,19 @@ impl UpstreamMediaRuntime {
.as_secs_f64() .as_secs_f64()
* 1000.0 * 1000.0
}), }),
client_capture_abs_skew_p95_ms: state.client_capture_skew_window_ms.p95_abs(),
client_send_abs_skew_p95_ms: state.client_send_skew_window_ms.p95_abs(),
server_receive_abs_skew_p95_ms: state.server_receive_skew_window_ms.p95_abs(),
camera_client_queue_age_p95_ms: state.camera_client_queue_age_window_ms.p95(),
microphone_client_queue_age_p95_ms: state.microphone_client_queue_age_window_ms.p95(),
sink_handoff_skew_ms: latest_sink_handoff_skew_ms(&state),
sink_handoff_abs_skew_p95_ms: state.sink_handoff_skew_window_ms.p95_abs(),
camera_sink_late_ms: state.latest_camera_presentation.map(presentation_late_ms),
microphone_sink_late_ms: state
.latest_microphone_presentation
.map(presentation_late_ms),
camera_sink_late_p95_ms: state.camera_sink_late_window_ms.p95(),
microphone_sink_late_p95_ms: state.microphone_sink_late_window_ms.p95(),
} }
} }
} }
@ -713,6 +734,75 @@ fn instant_delta_us(left: Instant, right: Instant) -> i128 {
} }
} }
/// Fold the latest paired camera/microphone client timing samples into the
/// rolling skew and queue-age windows. Does nothing until both media kinds
/// have reported a sample, so every pushed value describes a real pair.
fn record_client_timing_windows(state: &mut UpstreamClockState) {
    let (camera, microphone) = match (state.latest_camera_timing, state.latest_microphone_timing) {
        (Some(camera), Some(microphone)) => (camera, microphone),
        _ => return,
    };
    // Camera-minus-microphone PTS delta, converted from µs to ms.
    let pts_delta_ms =
        |camera_us: u64, microphone_us: u64| (camera_us as i128 - microphone_us as i128) as f64 / 1000.0;
    state
        .client_capture_skew_window_ms
        .push(pts_delta_ms(camera.capture_pts_us, microphone.capture_pts_us));
    state
        .client_send_skew_window_ms
        .push(pts_delta_ms(camera.send_pts_us, microphone.send_pts_us));
    let receive_skew_ms =
        instant_delta_us(camera.received_at, microphone.received_at) as f64 / 1000.0;
    state.server_receive_skew_window_ms.push(receive_skew_ms);
    state
        .camera_client_queue_age_window_ms
        .push(f64::from(camera.queue_age_ms));
    state
        .microphone_client_queue_age_window_ms
        .push(f64::from(microphone.queue_age_ms));
}
/// Record one sink handoff for the given media kind: remember it as the
/// latest presentation sample, push its lateness into that kind's rolling
/// window, and refresh the paired audio/video handoff-skew window.
fn record_presentation_sample(
    state: &mut UpstreamClockState,
    kind: UpstreamMediaKind,
    local_pts_us: u64,
    due_at: Instant,
) {
    let sample = state::UpstreamPresentationSample {
        local_pts_us,
        due_at,
        handed_at: Instant::now(),
    };
    // Early handoffs count as zero lateness; the window only tracks tardiness.
    let late_ms = presentation_late_ms(sample).max(0.0);
    let (latest_slot, late_window) = match kind {
        UpstreamMediaKind::Camera => (
            &mut state.latest_camera_presentation,
            &mut state.camera_sink_late_window_ms,
        ),
        UpstreamMediaKind::Microphone => (
            &mut state.latest_microphone_presentation,
            &mut state.microphone_sink_late_window_ms,
        ),
    };
    *latest_slot = Some(sample);
    late_window.push(late_ms);
    // Only a pairable camera/microphone sample set yields a skew reading.
    if let Some(skew_ms) = latest_sink_handoff_skew_ms(state) {
        state.sink_handoff_skew_window_ms.push(skew_ms);
    }
}
/// Camera-minus-microphone sink handoff gap in milliseconds.
///
/// Returns `None` until both media kinds carry a presentation sample, or when
/// their local PTS values are more than 250 ms apart — such samples describe
/// clearly different media instants and must not be paired.
fn latest_sink_handoff_skew_ms(state: &UpstreamClockState) -> Option<f64> {
    let camera = state.latest_camera_presentation?;
    let microphone = state.latest_microphone_presentation?;
    let pts_gap_ms = (camera.local_pts_us as i128 - microphone.local_pts_us as i128)
        .unsigned_abs() as f64
        / 1000.0;
    if pts_gap_ms > 250.0 {
        None
    } else {
        Some(instant_delta_us(camera.handed_at, microphone.handed_at) as f64 / 1000.0)
    }
}
/// Signed sink lateness in milliseconds: positive when the packet was handed
/// to its sink after the planner's `due_at`, negative when it was early.
fn presentation_late_ms(sample: state::UpstreamPresentationSample) -> f64 {
    let late_us = instant_delta_us(sample.handed_at, sample.due_at);
    late_us as f64 / 1000.0
}
fn refresh_unpaired_pairing_anchor( fn refresh_unpaired_pairing_anchor(
state: &mut UpstreamClockState, state: &mut UpstreamClockState,
kind: UpstreamMediaKind, kind: UpstreamMediaKind,

View File

@ -168,6 +168,16 @@ fn reset_timing_anchors(state: &mut UpstreamClockState) {
state.video_freezes = 0; state.video_freezes = 0;
state.latest_camera_timing = None; state.latest_camera_timing = None;
state.latest_microphone_timing = None; state.latest_microphone_timing = None;
state.latest_camera_presentation = None;
state.latest_microphone_presentation = None;
state.client_capture_skew_window_ms = Default::default();
state.client_send_skew_window_ms = Default::default();
state.server_receive_skew_window_ms = Default::default();
state.camera_client_queue_age_window_ms = Default::default();
state.microphone_client_queue_age_window_ms = Default::default();
state.sink_handoff_skew_window_ms = Default::default();
state.camera_sink_late_window_ms = Default::default();
state.microphone_sink_late_window_ms = Default::default();
state.phase = UpstreamSyncPhase::Acquiring; state.phase = UpstreamSyncPhase::Acquiring;
state.last_reason = "timing anchors reset".to_string(); state.last_reason = "timing anchors reset".to_string();
} }

View File

@ -1,5 +1,8 @@
use std::collections::VecDeque;
use tokio::time::Instant; use tokio::time::Instant;
const TIMING_WINDOW_CAPACITY: usize = 240;
#[derive(Clone, Copy, Debug)] #[derive(Clone, Copy, Debug)]
pub(super) struct UpstreamTimingSample { pub(super) struct UpstreamTimingSample {
pub capture_pts_us: u64, pub capture_pts_us: u64,
@ -8,6 +11,45 @@ pub(super) struct UpstreamTimingSample {
pub received_at: Instant, pub received_at: Instant,
} }
/// One "packet handed to the UAC/UVC/HDMI sink" event, captured at the server
/// boundary so the planner can measure sink handoff skew and lateness.
#[derive(Clone, Copy, Debug)]
pub(super) struct UpstreamPresentationSample {
    /// Planner-local PTS of the handed-off packet, in microseconds.
    pub local_pts_us: u64,
    /// When the planner scheduled this packet to reach its sink.
    pub due_at: Instant,
    /// When the packet was actually handed to the sink.
    pub handed_at: Instant,
}
/// Fixed-capacity rolling window of scalar samples backing the p95 timing
/// metrics; once full, each new sample evicts the oldest one.
#[derive(Debug, Default)]
pub(super) struct UpstreamScalarWindow {
    values: VecDeque<f64>,
}

impl UpstreamScalarWindow {
    /// Append a sample, dropping the oldest entry when at capacity.
    pub fn push(&mut self, value: f64) {
        while self.values.len() >= TIMING_WINDOW_CAPACITY {
            self.values.pop_front();
        }
        self.values.push_back(value);
    }

    /// p95 of the absolute values in the window (`None` with no finite samples).
    pub fn p95_abs(&self) -> Option<f64> {
        let magnitudes = self.values.iter().map(|value| value.abs());
        percentile(magnitudes, 0.95)
    }

    /// p95 of the raw values in the window (`None` with no finite samples).
    pub fn p95(&self) -> Option<f64> {
        percentile(self.values.iter().copied(), 0.95)
    }
}
/// Nearest-rank percentile over the finite values of `values`.
///
/// Non-finite samples (NaN, ±inf) are discarded first; returns `None` when no
/// finite value remains. `quantile` is clamped into `[0, 1]`, and the rank is
/// `ceil((len - 1) * quantile)`, so `0.0` yields the minimum and `1.0` the
/// maximum.
fn percentile(values: impl Iterator<Item = f64>, quantile: f64) -> Option<f64> {
    let mut finite: Vec<f64> = values.filter(|value| value.is_finite()).collect();
    if finite.is_empty() {
        return None;
    }
    // total_cmp is a total order on f64, so unstable sorting is observably
    // identical here.
    finite.sort_unstable_by(|left, right| left.total_cmp(right));
    let clamped = quantile.clamp(0.0, 1.0);
    let rank = ((finite.len() - 1) as f64 * clamped).ceil() as usize;
    finite.get(rank).copied()
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)] #[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum UpstreamSyncPhase { pub enum UpstreamSyncPhase {
Acquiring, Acquiring,
@ -60,6 +102,16 @@ pub(super) struct UpstreamClockState {
pub last_reason: String, pub last_reason: String,
pub latest_camera_timing: Option<UpstreamTimingSample>, pub latest_camera_timing: Option<UpstreamTimingSample>,
pub latest_microphone_timing: Option<UpstreamTimingSample>, pub latest_microphone_timing: Option<UpstreamTimingSample>,
pub latest_camera_presentation: Option<UpstreamPresentationSample>,
pub latest_microphone_presentation: Option<UpstreamPresentationSample>,
pub client_capture_skew_window_ms: UpstreamScalarWindow,
pub client_send_skew_window_ms: UpstreamScalarWindow,
pub server_receive_skew_window_ms: UpstreamScalarWindow,
pub camera_client_queue_age_window_ms: UpstreamScalarWindow,
pub microphone_client_queue_age_window_ms: UpstreamScalarWindow,
pub sink_handoff_skew_window_ms: UpstreamScalarWindow,
pub camera_sink_late_window_ms: UpstreamScalarWindow,
pub microphone_sink_late_window_ms: UpstreamScalarWindow,
} }
impl Default for UpstreamSyncPhase { impl Default for UpstreamSyncPhase {

View File

@ -15,7 +15,7 @@ async fn wait_for_audio_master_releases_video_once_audio_catches_up() {
super::UpstreamPlanDecision::AwaitingPair super::UpstreamPlanDecision::AwaitingPair
)); ));
let audio_first = play(runtime.plan_audio_pts(1_000_000)); let audio_first = play(runtime.plan_audio_pts(1_000_000));
runtime.mark_audio_presented(audio_first.local_pts_us); runtime.mark_audio_presented(audio_first.local_pts_us, audio_first.due_at);
let video_first = play(runtime.plan_video_pts(1_000_000, 16_666)); let video_first = play(runtime.plan_video_pts(1_000_000, 16_666));
let waiter = tokio::spawn({ let waiter = tokio::spawn({
@ -29,7 +29,7 @@ async fn wait_for_audio_master_releases_video_once_audio_catches_up() {
tokio::time::sleep(Duration::from_millis(5)).await; tokio::time::sleep(Duration::from_millis(5)).await;
let audio_next = play(runtime.plan_audio_pts(1_010_000)); let audio_next = play(runtime.plan_audio_pts(1_010_000));
runtime.mark_audio_presented(audio_next.local_pts_us); runtime.mark_audio_presented(audio_next.local_pts_us, audio_next.due_at);
assert!(waiter.await.expect("audio master waiter should finish")); assert!(waiter.await.expect("audio master waiter should finish"));
} }
@ -47,7 +47,7 @@ async fn wait_for_audio_master_allows_configured_positive_audio_delay() {
super::UpstreamPlanDecision::AwaitingPair super::UpstreamPlanDecision::AwaitingPair
)); ));
let audio_first = play(runtime.plan_audio_pts(1_000_000)); let audio_first = play(runtime.plan_audio_pts(1_000_000));
runtime.mark_audio_presented(audio_first.local_pts_us); runtime.mark_audio_presented(audio_first.local_pts_us, audio_first.due_at);
let _video_first = play(runtime.plan_video_pts(1_000_000, 16_666)); let _video_first = play(runtime.plan_video_pts(1_000_000, 16_666));
let delayed_video = play(runtime.plan_video_pts(1_700_000, 16_666)); let delayed_video = play(runtime.plan_video_pts(1_700_000, 16_666));
@ -75,7 +75,7 @@ async fn wait_for_audio_master_times_out_when_audio_never_catches_up() {
super::UpstreamPlanDecision::AwaitingPair super::UpstreamPlanDecision::AwaitingPair
)); ));
let audio_first = play(runtime.plan_audio_pts(1_000_000)); let audio_first = play(runtime.plan_audio_pts(1_000_000));
runtime.mark_audio_presented(audio_first.local_pts_us); runtime.mark_audio_presented(audio_first.local_pts_us, audio_first.due_at);
let video_first = play(runtime.plan_video_pts(1_000_000, 16_666)); let video_first = play(runtime.plan_video_pts(1_000_000, 16_666));
let due_at = tokio::time::Instant::now() + Duration::from_millis(20); let due_at = tokio::time::Instant::now() + Duration::from_millis(20);
@ -98,7 +98,7 @@ async fn wait_for_audio_master_keeps_video_waiting_through_sync_grace() {
super::UpstreamPlanDecision::AwaitingPair super::UpstreamPlanDecision::AwaitingPair
)); ));
let audio_first = play(runtime.plan_audio_pts(1_000_000)); let audio_first = play(runtime.plan_audio_pts(1_000_000));
runtime.mark_audio_presented(audio_first.local_pts_us); runtime.mark_audio_presented(audio_first.local_pts_us, audio_first.due_at);
let video_first = play(runtime.plan_video_pts(1_000_000, 16_666)); let video_first = play(runtime.plan_video_pts(1_000_000, 16_666));
let waiter = tokio::spawn({ let waiter = tokio::spawn({
@ -117,7 +117,7 @@ async fn wait_for_audio_master_keeps_video_waiting_through_sync_grace() {
tokio::time::sleep(Duration::from_millis(5)).await; tokio::time::sleep(Duration::from_millis(5)).await;
let audio_next = play(runtime.plan_audio_pts(1_120_000)); let audio_next = play(runtime.plan_audio_pts(1_120_000));
runtime.mark_audio_presented(audio_next.local_pts_us); runtime.mark_audio_presented(audio_next.local_pts_us, audio_next.due_at);
assert!( assert!(
waiter waiter

View File

@ -345,7 +345,7 @@ fn video_too_far_behind_audio_master_is_dropped_and_counted_as_freeze() {
)); ));
let audio = play(runtime.plan_audio_pts(1_000_000)); let audio = play(runtime.plan_audio_pts(1_000_000));
let _video = play(runtime.plan_video_pts(1_000_000, 16_666)); let _video = play(runtime.plan_video_pts(1_000_000, 16_666));
runtime.mark_audio_presented(audio.local_pts_us); runtime.mark_audio_presented(audio.local_pts_us, audio.due_at);
let audio_master = play(runtime.plan_audio_pts(1_200_000)); let audio_master = play(runtime.plan_audio_pts(1_200_000));
assert!( assert!(
@ -355,7 +355,7 @@ fn video_too_far_behind_audio_master_is_dropped_and_counted_as_freeze() {
), ),
"future planned audio alone must not freeze video before UAC presentation" "future planned audio alone must not freeze video before UAC presentation"
); );
runtime.mark_audio_presented(audio_master.local_pts_us); runtime.mark_audio_presented(audio_master.local_pts_us, audio_master.due_at);
assert!(matches!( assert!(matches!(
runtime.plan_video_pts(1_116_666, 16_666), runtime.plan_video_pts(1_116_666, 16_666),
@ -465,8 +465,8 @@ fn planner_snapshot_tracks_presented_playheads_and_skew() {
)); ));
let audio = play(runtime.plan_audio_pts(1_000_000)); let audio = play(runtime.plan_audio_pts(1_000_000));
let video = play(runtime.plan_video_pts(1_000_000, 16_666)); let video = play(runtime.plan_video_pts(1_000_000, 16_666));
runtime.mark_audio_presented(audio.local_pts_us); runtime.mark_audio_presented(audio.local_pts_us, audio.due_at);
runtime.mark_video_presented(video.local_pts_us); runtime.mark_video_presented(video.local_pts_us, video.due_at);
let snapshot = runtime.snapshot(); let snapshot = runtime.snapshot();
assert_eq!(snapshot.phase, "live"); assert_eq!(snapshot.phase, "live");
@ -475,6 +475,44 @@ fn planner_snapshot_tracks_presented_playheads_and_skew() {
assert_eq!(snapshot.planner_skew_ms, Some(0.0)); assert_eq!(snapshot.planner_skew_ms, Some(0.0));
} }
/// Audio handed off first, then video after a real wall-clock gap, both with a
/// `due_at` slightly in the past: the snapshot must report positive handoff
/// skew, a populated rolling skew window, and positive sink lateness.
#[test]
#[serial(upstream_media_runtime)]
fn planner_snapshot_tracks_sink_handoff_timing_windows() {
    let runtime = runtime_without_offsets();
    let _camera = runtime.activate_camera();
    let _microphone = runtime.activate_microphone();
    // A due_at 5 ms in the past makes both handoffs late by construction;
    // fall back to "now" if the subtraction is not representable.
    let due_at = tokio::time::Instant::now()
        .checked_sub(Duration::from_millis(5))
        .unwrap_or_else(tokio::time::Instant::now);
    runtime.mark_audio_presented(123_000, due_at);
    // Real sleep so the video handoff instant is measurably after the audio one.
    std::thread::sleep(Duration::from_millis(1));
    // Same PTS keeps the two samples pairable for the skew computation.
    runtime.mark_video_presented(123_000, due_at);
    let snapshot = runtime.snapshot();
    assert!(
        snapshot.sink_handoff_skew_ms.is_some_and(|skew| skew > 0.0),
        "video was handed to its sink after audio"
    );
    assert!(
        snapshot
            .sink_handoff_abs_skew_p95_ms
            .is_some_and(|skew| skew > 0.0),
        "the rolling handoff window should include the audio/video handoff gap"
    );
    assert!(
        snapshot.camera_sink_late_ms.is_some_and(|late| late > 0.0),
        "handoff after due_at should be reported as positive lateness"
    );
    assert!(
        snapshot
            .microphone_sink_late_p95_ms
            .is_some_and(|late| late > 0.0),
        "audio sink lateness should be retained in the rolling window"
    );
}
#[test] #[test]
#[serial(upstream_media_runtime)] #[serial(upstream_media_runtime)]
fn planner_snapshot_tracks_client_timing_sidecar_metrics() { fn planner_snapshot_tracks_client_timing_sidecar_metrics() {
@ -504,14 +542,24 @@ fn planner_snapshot_tracks_client_timing_sidecar_metrics() {
assert_eq!(snapshot.client_capture_skew_ms, Some(60.0)); assert_eq!(snapshot.client_capture_skew_ms, Some(60.0));
assert_eq!(snapshot.client_send_skew_ms, Some(50.0)); assert_eq!(snapshot.client_send_skew_ms, Some(50.0));
assert_eq!(snapshot.client_capture_abs_skew_p95_ms, Some(60.0));
assert_eq!(snapshot.client_send_abs_skew_p95_ms, Some(50.0));
assert_eq!(snapshot.camera_client_queue_age_ms, Some(20.0)); assert_eq!(snapshot.camera_client_queue_age_ms, Some(20.0));
assert_eq!(snapshot.microphone_client_queue_age_ms, Some(30.0)); assert_eq!(snapshot.microphone_client_queue_age_ms, Some(30.0));
assert_eq!(snapshot.camera_client_queue_age_p95_ms, Some(20.0));
assert_eq!(snapshot.microphone_client_queue_age_p95_ms, Some(30.0));
assert!( assert!(
snapshot snapshot
.server_receive_skew_ms .server_receive_skew_ms
.is_some_and(|skew| skew < 0.0), .is_some_and(|skew| skew < 0.0),
"camera was received before microphone, so camera-minus-mic receive skew should be negative" "camera was received before microphone, so camera-minus-mic receive skew should be negative"
); );
assert!(
snapshot
.server_receive_abs_skew_p95_ms
.is_some_and(|skew| skew > 0.0),
"server receive jitter should be retained as an absolute p95"
);
assert!( assert!(
snapshot snapshot
.camera_server_receive_age_ms .camera_server_receive_age_ms

View File

@ -89,4 +89,15 @@ pub struct UpstreamPlannerSnapshot {
pub microphone_client_queue_age_ms: Option<f64>, pub microphone_client_queue_age_ms: Option<f64>,
pub camera_server_receive_age_ms: Option<f64>, pub camera_server_receive_age_ms: Option<f64>,
pub microphone_server_receive_age_ms: Option<f64>, pub microphone_server_receive_age_ms: Option<f64>,
pub client_capture_abs_skew_p95_ms: Option<f64>,
pub client_send_abs_skew_p95_ms: Option<f64>,
pub server_receive_abs_skew_p95_ms: Option<f64>,
pub camera_client_queue_age_p95_ms: Option<f64>,
pub microphone_client_queue_age_p95_ms: Option<f64>,
pub sink_handoff_skew_ms: Option<f64>,
pub sink_handoff_abs_skew_p95_ms: Option<f64>,
pub camera_sink_late_ms: Option<f64>,
pub microphone_sink_late_ms: Option<f64>,
pub camera_sink_late_p95_ms: Option<f64>,
pub microphone_sink_late_p95_ms: Option<f64>,
} }