From c741e8df17e5def0b2128fc09fc540112d9cb7e1 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Sat, 2 May 2026 17:27:59 -0300 Subject: [PATCH] feat: expose upstream timing sidecar metrics --- AGENTS.md | 16 ++++ Cargo.lock | 6 +- client/Cargo.toml | 2 +- client/src/app.rs | 5 +- client/src/app/uplink_media.rs | 47 ++++++++++- client/src/bin/lesavka-relayctl.rs | 49 ++++++++++++ client/src/input/microphone.rs | 1 + client/src/sync_probe/capture/runtime.rs | 1 + client/src/sync_probe/capture/tests.rs | 1 + common/Cargo.toml | 2 +- common/proto/lesavka.proto | 22 ++++- .../manual/run_upstream_mirrored_av_sync.sh | 15 ++++ server/Cargo.toml | 2 +- server/src/audio/ear_capture.rs | 1 + server/src/main.rs | 15 +++- server/src/main/relay_service.rs | 12 +++ server/src/main/relay_stream_lifecycle.rs | 42 ++++++++++ server/src/main/rpc_helpers.rs | 15 ++++ server/src/upstream_media_runtime.rs | 80 ++++++++++++++++++- .../upstream_media_runtime/lease_lifecycle.rs | 2 + server/src/upstream_media_runtime/state.rs | 10 +++ .../upstream_media_runtime/tests/planning.rs | 46 +++++++++++ server/src/upstream_media_runtime/types.rs | 20 +++++ server/src/video/eye_capture.rs | 1 + 24 files changed, 396 insertions(+), 17 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index be4289c..ddea69e 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -495,3 +495,19 @@ guess. - [ ] Use the next run to decide whether bad p95 is caused by low-confidence analyzer pairings, camera/mic capture instability, or server planner/output jitter. - [ ] Add stage-local timing evidence for stimulus schedule, client capture onsets, server output timing, and browser/device capture if the event table still cannot isolate the source. - [ ] Only save calibration defaults after a confirmation segment passes. + +## 0.17.25 Client/Server Timing Sidecar Checklist + +Context: the probe should remain an external truth check, not a runtime dependency. +Production sync needs client/server-only timing evidence that can predict and heal jitter before +browser/probe validation. Attach timing metadata to media packets first; add a separate timing RPC +only if packet-attached metadata cannot explain the next failure. + +- [x] Add client send/capture/queue timing metadata to upstream camera packets. +- [x] Add client send/capture/queue timing metadata to upstream microphone packets. +- [x] Record the latest packet timing samples on the server when media packets arrive. +- [x] Expose blind client/server timing metrics through `GetUpstreamSync` and `lesavka-relayctl upstream-sync`. +- [x] Include those timing metrics in segmented mirrored-probe summaries. +- [x] Add planner tests covering client capture skew, client send skew, server receive skew, and queue ages. +- [ ] Use the next mirrored run to compare browser p95/drift against client capture/send skew and server receive skew. +- [ ] If client/server timing is stable while browser p95 still fails, instrument UVC/UAC sink emission timing next. diff --git a/Cargo.lock b/Cargo.lock index 19a460d..925182f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.17.24" +version = "0.17.25" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.17.24" +version = "0.17.25" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.17.24" +version = "0.17.25" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index c372c13..14f2415 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.17.24" +version = "0.17.25" edition = "2024" [dependencies] diff --git a/client/src/app.rs b/client/src/app.rs index 42fe62d..8dc250b 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -3,7 +3,7 @@ use anyhow::Result; use std::sync::Arc; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; use std::time::{Duration, Instant}; use tokio::sync::{broadcast, mpsc}; use tokio_stream::{ @@ -19,7 +19,8 @@ use winit::{ }; use lesavka_common::lesavka::{ - Empty, KeyboardReport, MonitorRequest, MouseReport, VideoPacket, relay_client::RelayClient, + AudioPacket, Empty, KeyboardReport, MonitorRequest, MouseReport, VideoPacket, + relay_client::RelayClient, }; #[cfg(not(coverage))] diff --git a/client/src/app/uplink_media.rs b/client/src/app/uplink_media.rs index 60d5d2b..2cf0163 100644 --- a/client/src/app/uplink_media.rs +++ b/client/src/app/uplink_media.rs @@ -84,11 +84,16 @@ impl LesavkaClientApp { duration_ms(next.delivery_age), ); } - if let Some(packet) = next.packet { + if let Some(mut packet) = next.packet { telemetry_stream.record_streamed( queue_depth_u32(next.queue_depth), duration_ms(next.delivery_age), ); + attach_audio_timing_metadata( + &mut packet, + next.queue_depth, + next.delivery_age, + ); yield packet; continue; } @@ -265,11 +270,16 @@ impl LesavkaClientApp { duration_ms(next.delivery_age), ); } - if let Some(packet) = next.packet { + if let Some(mut packet) = next.packet { telemetry_stream.record_streamed( queue_depth_u32(next.queue_depth), duration_ms(next.delivery_age), ); + attach_video_timing_metadata( + &mut packet, + next.queue_depth, + next.delivery_age, + ); yield packet; continue; } @@ -485,6 +495,39 @@ fn duration_ms(duration: Duration) -> f32 { duration.as_secs_f32() * 1_000.0 } +#[cfg(not(coverage))] +fn duration_ms_u32(duration: Duration) -> u32 { + duration.as_millis().min(u128::from(u32::MAX)) as u32 +} + +#[cfg(not(coverage))] +fn attach_audio_timing_metadata( + packet: &mut AudioPacket, + queue_depth: usize, + delivery_age: Duration, +) { + static AUDIO_SEQUENCE: AtomicU64 = AtomicU64::new(0); + packet.seq = AUDIO_SEQUENCE.fetch_add(1, Ordering::Relaxed).saturating_add(1); + packet.client_capture_pts_us = packet.pts; + packet.client_send_pts_us = crate::live_capture_clock::capture_pts_us(); + packet.client_queue_depth = queue_depth_u32(queue_depth); + packet.client_queue_age_ms = duration_ms_u32(delivery_age); +} + +#[cfg(not(coverage))] +fn attach_video_timing_metadata( + packet: &mut VideoPacket, + queue_depth: usize, + delivery_age: Duration, +) { + static VIDEO_SEQUENCE: AtomicU64 = AtomicU64::new(0); + packet.seq = VIDEO_SEQUENCE.fetch_add(1, Ordering::Relaxed).saturating_add(1); + packet.client_capture_pts_us = packet.pts; + packet.client_send_pts_us = crate::live_capture_clock::capture_pts_us(); + packet.client_queue_depth = queue_depth_u32(queue_depth); + packet.client_queue_age_ms = duration_ms_u32(delivery_age); +} + #[cfg(not(coverage))] #[derive(Clone, Copy, Debug)] enum UplinkDropReason { diff --git a/client/src/bin/lesavka-relayctl.rs b/client/src/bin/lesavka-relayctl.rs index 4267686..2a8d255 100644 --- a/client/src/bin/lesavka-relayctl.rs +++ b/client/src/bin/lesavka-relayctl.rs @@ -272,6 +272,55 @@ fn print_upstream_sync(state: lesavka_common::lesavka::UpstreamSyncState) { println!("planner_freshness_reanchors={}", state.freshness_reanchors); println!("planner_startup_timeouts={}", state.startup_timeouts); println!("planner_video_freezes={}", state.video_freezes); + println!( + "planner_client_capture_skew_ms={}", + state + .client_capture_skew_ms + .map(|value| format!("{value:+.1}")) + .unwrap_or_else(|| "pending".to_string()) + ); + println!( + "planner_client_send_skew_ms={}", + state + .client_send_skew_ms + .map(|value| format!("{value:+.1}")) + .unwrap_or_else(|| "pending".to_string()) + ); + println!( + "planner_server_receive_skew_ms={}", + state + .server_receive_skew_ms + .map(|value| format!("{value:+.1}")) + .unwrap_or_else(|| "pending".to_string()) + ); + println!( + "planner_camera_client_queue_age_ms={}", + state + .camera_client_queue_age_ms + .map(|value| format!("{value:.1}")) + .unwrap_or_else(|| "pending".to_string()) + ); + println!( + "planner_microphone_client_queue_age_ms={}", + state + .microphone_client_queue_age_ms + .map(|value| format!("{value:.1}")) + .unwrap_or_else(|| "pending".to_string()) + ); + println!( + "planner_camera_server_receive_age_ms={}", + state + .camera_server_receive_age_ms + .map(|value| format!("{value:.1}")) + .unwrap_or_else(|| "pending".to_string()) + ); + println!( + "planner_microphone_server_receive_age_ms={}", + state + .microphone_server_receive_age_ms + .map(|value| format!("{value:.1}")) + .unwrap_or_else(|| "pending".to_string()) + ); println!("planner_detail={}", state.last_reason); } diff --git a/client/src/input/microphone.rs b/client/src/input/microphone.rs index 2ec640e..e58ee3f 100644 --- a/client/src/input/microphone.rs +++ b/client/src/input/microphone.rs @@ -399,6 +399,7 @@ fn split_audio_sample(base_pts_us: u64, data: &[u8], target_bytes: usize) -> Vec id: 0, pts: base_pts_us.saturating_add(pcm_payload_duration_us(offset)), data: data[offset..end].to_vec(), + ..Default::default() }); offset = end; } diff --git a/client/src/sync_probe/capture/runtime.rs b/client/src/sync_probe/capture/runtime.rs index 6a624ed..60fd121 100644 --- a/client/src/sync_probe/capture/runtime.rs +++ b/client/src/sync_probe/capture/runtime.rs @@ -299,6 +299,7 @@ fn spawn_audio_thread( id: 0, pts: timing.packet_pts_us, data: chunk, + ..Default::default() }; let _ = queue.push(packet, Duration::ZERO); chunk_index = chunk_index.saturating_add(1); diff --git a/client/src/sync_probe/capture/tests.rs b/client/src/sync_probe/capture/tests.rs index f899af8..e717559 100644 --- a/client/src/sync_probe/capture/tests.rs +++ b/client/src/sync_probe/capture/tests.rs @@ -64,6 +64,7 @@ async fn coverage_stub_exposes_live_video_and_audio_queues() { id: 0, pts: 2, data: vec![4, 5, 6], + ..Default::default() }, Duration::ZERO, ); diff --git a/common/Cargo.toml b/common/Cargo.toml index 51759f1..e6f2bb3 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.17.24" +version = "0.17.25" edition = "2024" build = "build.rs" diff --git a/common/proto/lesavka.proto b/common/proto/lesavka.proto index 4cd9099..1c9a55b 100644 --- a/common/proto/lesavka.proto +++ b/common/proto/lesavka.proto @@ -26,8 +26,21 @@ message VideoPacket { uint32 server_queue_peak = 10; string server_encoder_label = 11; uint32 server_process_cpu_tenths = 12; + uint64 client_capture_pts_us = 13; + uint64 client_send_pts_us = 14; + uint32 client_queue_depth = 15; + uint32 client_queue_age_ms = 16; +} +message AudioPacket { + uint32 id = 1; + uint64 pts = 2; + bytes data = 3; + uint64 seq = 4; + uint64 client_capture_pts_us = 5; + uint64 client_send_pts_us = 6; + uint32 client_queue_depth = 7; + uint32 client_queue_age_ms = 8; } -message AudioPacket { uint32 id = 1; uint64 pts = 2; bytes data = 3; } message ResetUsbReply { bool ok = 1; } // true = success @@ -105,6 +118,13 @@ message UpstreamSyncState { uint64 startup_timeouts = 13; uint64 video_freezes = 14; string last_reason = 15; + optional float client_capture_skew_ms = 16; + optional float client_send_skew_ms = 17; + optional float server_receive_skew_ms = 18; + optional float camera_client_queue_age_ms = 19; + optional float microphone_client_queue_age_ms = 20; + optional float camera_server_receive_age_ms = 21; + optional float microphone_server_receive_age_ms = 22; } message HandshakeSet { diff --git a/scripts/manual/run_upstream_mirrored_av_sync.sh b/scripts/manual/run_upstream_mirrored_av_sync.sh index e2cfa9b..c26da5d 100755 --- a/scripts/manual/run_upstream_mirrored_av_sync.sh +++ b/scripts/manual/run_upstream_mirrored_av_sync.sh @@ -851,6 +851,16 @@ for segment in range(1, segment_count + 1): "planner_video_freezes_after": as_float(planner_after.get("planner_video_freezes")), "planner_freshness_reanchors_before": as_float(planner_before.get("planner_freshness_reanchors")), "planner_freshness_reanchors_after": as_float(planner_after.get("planner_freshness_reanchors")), + "planner_client_capture_skew_ms_before": as_float(planner_before.get("planner_client_capture_skew_ms")), + "planner_client_capture_skew_ms_after": as_float(planner_after.get("planner_client_capture_skew_ms")), + "planner_client_send_skew_ms_before": as_float(planner_before.get("planner_client_send_skew_ms")), + "planner_client_send_skew_ms_after": as_float(planner_after.get("planner_client_send_skew_ms")), + "planner_server_receive_skew_ms_before": as_float(planner_before.get("planner_server_receive_skew_ms")), + "planner_server_receive_skew_ms_after": as_float(planner_after.get("planner_server_receive_skew_ms")), + "planner_camera_client_queue_age_ms_after": as_float(planner_after.get("planner_camera_client_queue_age_ms")), + "planner_microphone_client_queue_age_ms_after": as_float(planner_after.get("planner_microphone_client_queue_age_ms")), + "planner_camera_server_receive_age_ms_after": as_float(planner_after.get("planner_camera_server_receive_age_ms")), + "planner_microphone_server_receive_age_ms_after": as_float(planner_after.get("planner_microphone_server_receive_age_ms")), "active_audio_offset_us_before": as_float(calibration_before.get("calibration_active_audio_offset_us")), "active_audio_offset_us_after": as_float(calibration_after.get("calibration_active_audio_offset_us")), "active_video_offset_us_before": as_float(calibration_before.get("calibration_active_video_offset_us")), @@ -924,6 +934,11 @@ if target_source_rows: ], "planner_live_lag_ms_after": range_for(target_source_rows, "planner_live_lag_ms_after"), "planner_skew_ms_after": range_for(target_source_rows, "planner_skew_ms_after"), + "planner_client_capture_skew_ms_after": range_for(target_source_rows, "planner_client_capture_skew_ms_after"), + "planner_client_send_skew_ms_after": range_for(target_source_rows, "planner_client_send_skew_ms_after"), + "planner_server_receive_skew_ms_after": range_for(target_source_rows, "planner_server_receive_skew_ms_after"), + "planner_camera_client_queue_age_ms_after": range_for(target_source_rows, "planner_camera_client_queue_age_ms_after"), + "planner_microphone_client_queue_age_ms_after": range_for(target_source_rows, "planner_microphone_client_queue_age_ms_after"), "active_audio_offset_us_after": range_for(target_source_rows, "active_audio_offset_us_after"), "active_video_offset_us_after": range_for(target_source_rows, "active_video_offset_us_after"), "probe_p95_abs_skew_ms": range_for(target_source_rows, "probe_p95_abs_skew_ms"), diff --git a/server/Cargo.toml b/server/Cargo.toml index b95762f..0496bc7 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.17.24" +version = "0.17.25" edition = "2024" autobins = false diff --git a/server/src/audio/ear_capture.rs b/server/src/audio/ear_capture.rs index 7916848..70c08a3 100644 --- a/server/src/audio/ear_capture.rs +++ b/server/src/audio/ear_capture.rs @@ -210,6 +210,7 @@ pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result { id, pts: pts_us, data: map.as_slice().to_vec(), + ..Default::default() })) .is_err() { diff --git a/server/src/main.rs b/server/src/main.rs index bbaf67d..7be9667 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -25,10 +25,17 @@ use lesavka_common::lesavka::{ }; use lesavka_server::{ - calibration::CalibrationStore, camera, camera_runtime::CameraRuntime, - capture_power::CapturePowerManager, gadget::UsbGadget, handshake::HandshakeSvc, paste, - runtime_support, runtime_support::init_tracing, security, - upstream_media_runtime::UpstreamMediaRuntime, uvc_runtime, video, + calibration::CalibrationStore, + camera, + camera_runtime::CameraRuntime, + capture_power::CapturePowerManager, + gadget::UsbGadget, + handshake::HandshakeSvc, + paste, runtime_support, + runtime_support::init_tracing, + security, + upstream_media_runtime::{UpstreamClientTiming, UpstreamMediaKind, UpstreamMediaRuntime}, + uvc_runtime, video, }; /*──────────────── constants ────────────────*/ diff --git a/server/src/main/relay_service.rs b/server/src/main/relay_service.rs index 37b4e7e..4f135f6 100644 --- a/server/src/main/relay_service.rs +++ b/server/src/main/relay_service.rs @@ -165,6 +165,10 @@ impl Relay for Handler { if let Some(next_packet) = next_packet { match next_packet.transpose() { Ok(Some(pkt)) => { + upstream_media_rt.record_client_timing( + UpstreamMediaKind::Microphone, + audio_client_timing(&pkt), + ); pending.push_back(pkt); let coalesced = retain_freshest_audio_packet(&mut pending); if coalesced > 0 { @@ -253,6 +257,10 @@ impl Relay for Handler { next_packet = inbound.next(), if !inbound_closed => { match next_packet.transpose() { Ok(Some(next_pkt)) => { + upstream_media_rt.record_client_timing( + UpstreamMediaKind::Microphone, + audio_client_timing(&next_pkt), + ); pending.push_back(next_pkt); let coalesced = retain_freshest_audio_packet(&mut pending); if coalesced > 0 { @@ -373,6 +381,10 @@ impl Relay for Handler { if let Some(next_packet) = next_packet { match next_packet.transpose() { Ok(Some(pkt)) => { + upstream_media_rt.record_client_timing( + UpstreamMediaKind::Camera, + video_client_timing(&pkt), + ); pending.push_back(pkt); let coalesced = retain_freshest_video_packet(&mut pending); if coalesced > 0 { diff --git a/server/src/main/relay_stream_lifecycle.rs b/server/src/main/relay_stream_lifecycle.rs index 36a5a62..6d7e2ba 100644 --- a/server/src/main/relay_stream_lifecycle.rs +++ b/server/src/main/relay_stream_lifecycle.rs @@ -39,6 +39,48 @@ fn retain_freshest_audio_packet( dropped } +#[cfg(not(coverage))] +/// Extract client-side timing facts from an upstream microphone packet. +fn audio_client_timing(pkt: &AudioPacket) -> UpstreamClientTiming { + let capture_pts_us = if pkt.client_capture_pts_us == 0 { + pkt.pts + } else { + pkt.client_capture_pts_us + }; + let send_pts_us = if pkt.client_send_pts_us == 0 { + capture_pts_us + } else { + pkt.client_send_pts_us + }; + UpstreamClientTiming { + capture_pts_us, + send_pts_us, + queue_depth: pkt.client_queue_depth, + queue_age_ms: pkt.client_queue_age_ms, + } +} + +#[cfg(not(coverage))] +/// Extract client-side timing facts from an upstream camera packet. +fn video_client_timing(pkt: &VideoPacket) -> UpstreamClientTiming { + let capture_pts_us = if pkt.client_capture_pts_us == 0 { + pkt.pts + } else { + pkt.client_capture_pts_us + }; + let send_pts_us = if pkt.client_send_pts_us == 0 { + capture_pts_us + } else { + pkt.client_send_pts_us + }; + UpstreamClientTiming { + capture_pts_us, + send_pts_us, + queue_depth: pkt.client_queue_depth, + queue_age_ms: pkt.client_queue_age_ms, + } +} + #[cfg(not(coverage))] #[derive(Clone, Copy, Debug)] enum UpstreamStreamCleanupKind { diff --git a/server/src/main/rpc_helpers.rs b/server/src/main/rpc_helpers.rs index d2c6cb1..20cdb9d 100644 --- a/server/src/main/rpc_helpers.rs +++ b/server/src/main/rpc_helpers.rs @@ -183,6 +183,21 @@ impl Handler { startup_timeouts: snapshot.startup_timeouts, video_freezes: snapshot.video_freezes, last_reason: snapshot.last_reason, + client_capture_skew_ms: snapshot.client_capture_skew_ms.map(|value| value as f32), + client_send_skew_ms: snapshot.client_send_skew_ms.map(|value| value as f32), + server_receive_skew_ms: snapshot.server_receive_skew_ms.map(|value| value as f32), + camera_client_queue_age_ms: snapshot + .camera_client_queue_age_ms + .map(|value| value as f32), + microphone_client_queue_age_ms: snapshot + .microphone_client_queue_age_ms + .map(|value| value as f32), + camera_server_receive_age_ms: snapshot + .camera_server_receive_age_ms + .map(|value| value as f32), + microphone_server_receive_age_ms: snapshot + .microphone_server_receive_age_ms + .map(|value| value as f32), })) } } diff --git a/server/src/upstream_media_runtime.rs b/server/src/upstream_media_runtime.rs index 836d1e7..5ba9834 100644 --- a/server/src/upstream_media_runtime.rs +++ b/server/src/upstream_media_runtime.rs @@ -19,8 +19,8 @@ use config::{ }; use state::{UpstreamClockState, UpstreamSyncPhase}; pub use types::{ - PlannedUpstreamPacket, UpstreamMediaKind, UpstreamPlanDecision, UpstreamPlannerSnapshot, - UpstreamStreamLease, + PlannedUpstreamPacket, UpstreamClientTiming, UpstreamMediaKind, UpstreamPlanDecision, + UpstreamPlannerSnapshot, UpstreamStreamLease, }; /// Coordinate upstream stream ownership and keep audio/video on one timeline. @@ -123,6 +123,24 @@ impl UpstreamMediaRuntime { self.audio_progress_notify.notify_waiters(); } + /// Record client-side timing facts for one packet as it arrives at the server. + pub fn record_client_timing(&self, kind: UpstreamMediaKind, timing: UpstreamClientTiming) { + let mut state = self + .state + .lock() + .expect("upstream media state mutex poisoned"); + let sample = state::UpstreamTimingSample { + capture_pts_us: timing.capture_pts_us, + send_pts_us: timing.send_pts_us, + queue_age_ms: timing.queue_age_ms, + received_at: Instant::now(), + }; + match kind { + UpstreamMediaKind::Camera => state.latest_camera_timing = Some(sample), + UpstreamMediaKind::Microphone => state.latest_microphone_timing = Some(sample), + } + } + /// Mark one video frame as actually handed to the UVC/HDMI sink. pub fn mark_video_presented(&self, local_pts_us: u64) { let mut state = self @@ -164,6 +182,24 @@ impl UpstreamMediaRuntime { (Some(audio), Some(video)) => Some((audio as i128 - video as i128) as f64 / 1000.0), _ => None, }; + let now = Instant::now(); + let client_capture_skew_ms = skew_ms_from_samples( + state.latest_camera_timing, + state.latest_microphone_timing, + |sample| sample.capture_pts_us, + ); + let client_send_skew_ms = skew_ms_from_samples( + state.latest_camera_timing, + state.latest_microphone_timing, + |sample| sample.send_pts_us, + ); + let server_receive_skew_ms = + match (state.latest_camera_timing, state.latest_microphone_timing) { + (Some(camera), Some(microphone)) => Some( + instant_delta_us(camera.received_at, microphone.received_at) as f64 / 1000.0, + ), + _ => None, + }; UpstreamPlannerSnapshot { session_id: state.session_id, phase: state.phase.as_str(), @@ -180,6 +216,25 @@ impl UpstreamMediaRuntime { startup_timeouts: state.startup_timeouts, video_freezes: state.video_freezes, last_reason: state.last_reason.clone(), + client_capture_skew_ms, + client_send_skew_ms, + server_receive_skew_ms, + camera_client_queue_age_ms: state + .latest_camera_timing + .map(|sample| f64::from(sample.queue_age_ms)), + microphone_client_queue_age_ms: state + .latest_microphone_timing + .map(|sample| f64::from(sample.queue_age_ms)), + camera_server_receive_age_ms: state.latest_camera_timing.map(|sample| { + now.saturating_duration_since(sample.received_at) + .as_secs_f64() + * 1000.0 + }), + microphone_server_receive_age_ms: state.latest_microphone_timing.map(|sample| { + now.saturating_duration_since(sample.received_at) + .as_secs_f64() + * 1000.0 + }), } } } @@ -637,6 +692,27 @@ fn us_to_ms(value: u64) -> f64 { value as f64 / 1000.0 } +fn skew_ms_from_samples( + camera: Option, + microphone: Option, + value: impl Fn(state::UpstreamTimingSample) -> u64, +) -> Option { + match (camera, microphone) { + (Some(camera), Some(microphone)) => { + Some((value(camera) as i128 - value(microphone) as i128) as f64 / 1000.0) + } + _ => None, + } +} + +fn instant_delta_us(left: Instant, right: Instant) -> i128 { + if left >= right { + left.saturating_duration_since(right).as_micros() as i128 + } else { + -(right.saturating_duration_since(left).as_micros() as i128) + } +} + fn refresh_unpaired_pairing_anchor( state: &mut UpstreamClockState, kind: UpstreamMediaKind, diff --git a/server/src/upstream_media_runtime/lease_lifecycle.rs b/server/src/upstream_media_runtime/lease_lifecycle.rs index 1e732f9..0769280 100644 --- a/server/src/upstream_media_runtime/lease_lifecycle.rs +++ b/server/src/upstream_media_runtime/lease_lifecycle.rs @@ -166,6 +166,8 @@ fn reset_timing_anchors(state: &mut UpstreamClockState) { state.skew_video_drops = 0; state.startup_timeouts = 0; state.video_freezes = 0; + state.latest_camera_timing = None; + state.latest_microphone_timing = None; state.phase = UpstreamSyncPhase::Acquiring; state.last_reason = "timing anchors reset".to_string(); } diff --git a/server/src/upstream_media_runtime/state.rs b/server/src/upstream_media_runtime/state.rs index ce6cbc0..7f324e5 100644 --- a/server/src/upstream_media_runtime/state.rs +++ b/server/src/upstream_media_runtime/state.rs @@ -1,5 +1,13 @@ use tokio::time::Instant; +#[derive(Clone, Copy, Debug)] +pub(super) struct UpstreamTimingSample { + pub capture_pts_us: u64, + pub send_pts_us: u64, + pub queue_age_ms: u32, + pub received_at: Instant, +} + #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub enum UpstreamSyncPhase { Acquiring, @@ -50,6 +58,8 @@ pub(super) struct UpstreamClockState { pub startup_timeouts: u64, pub video_freezes: u64, pub last_reason: String, + pub latest_camera_timing: Option, + pub latest_microphone_timing: Option, } impl Default for UpstreamSyncPhase { diff --git a/server/src/upstream_media_runtime/tests/planning.rs b/server/src/upstream_media_runtime/tests/planning.rs index 19c0f47..6655398 100644 --- a/server/src/upstream_media_runtime/tests/planning.rs +++ b/server/src/upstream_media_runtime/tests/planning.rs @@ -475,6 +475,52 @@ fn planner_snapshot_tracks_presented_playheads_and_skew() { assert_eq!(snapshot.planner_skew_ms, Some(0.0)); } +#[test] +#[serial(upstream_media_runtime)] +fn planner_snapshot_tracks_client_timing_sidecar_metrics() { + let runtime = runtime_without_offsets(); + + runtime.record_client_timing( + super::UpstreamMediaKind::Camera, + super::UpstreamClientTiming { + capture_pts_us: 1_060_000, + send_pts_us: 1_080_000, + queue_depth: 2, + queue_age_ms: 20, + }, + ); + std::thread::sleep(Duration::from_millis(2)); + runtime.record_client_timing( + super::UpstreamMediaKind::Microphone, + super::UpstreamClientTiming { + capture_pts_us: 1_000_000, + send_pts_us: 1_030_000, + queue_depth: 3, + queue_age_ms: 30, + }, + ); + + let snapshot = runtime.snapshot(); + + assert_eq!(snapshot.client_capture_skew_ms, Some(60.0)); + assert_eq!(snapshot.client_send_skew_ms, Some(50.0)); + assert_eq!(snapshot.camera_client_queue_age_ms, Some(20.0)); + assert_eq!(snapshot.microphone_client_queue_age_ms, Some(30.0)); + assert!( + snapshot + .server_receive_skew_ms + .is_some_and(|skew| skew < 0.0), + "camera was received before microphone, so camera-minus-mic receive skew should be negative" + ); + assert!( + snapshot + .camera_server_receive_age_ms + .zip(snapshot.microphone_server_receive_age_ms) + .is_some_and(|(camera_age, microphone_age)| camera_age >= microphone_age), + "the earlier camera receive sample should be at least as old as the later mic sample" + ); +} + #[test] #[serial(upstream_media_runtime)] fn default_runtime_covers_video_map_play_path() { diff --git a/server/src/upstream_media_runtime/types.rs b/server/src/upstream_media_runtime/types.rs index 47d6f38..e5a8b97 100644 --- a/server/src/upstream_media_runtime/types.rs +++ b/server/src/upstream_media_runtime/types.rs @@ -10,6 +10,19 @@ pub enum UpstreamMediaKind { Microphone, } +/// Client-supplied timing facts attached to one upstream media packet. +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +pub struct UpstreamClientTiming { + /// Packet capture timestamp on the shared client media clock. + pub capture_pts_us: u64, + /// Client media-clock timestamp when the packet left the uplink queue. + pub send_pts_us: u64, + /// Uplink queue depth observed as the packet was sent. + pub queue_depth: u32, + /// Packet age observed as the packet was sent. + pub queue_age_ms: u32, +} + /// Lease returned when one upstream media stream becomes the active owner. #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub struct UpstreamStreamLease { @@ -69,4 +82,11 @@ pub struct UpstreamPlannerSnapshot { pub startup_timeouts: u64, pub video_freezes: u64, pub last_reason: String, + pub client_capture_skew_ms: Option, + pub client_send_skew_ms: Option, + pub server_receive_skew_ms: Option, + pub camera_client_queue_age_ms: Option, + pub microphone_client_queue_age_ms: Option, + pub camera_server_receive_age_ms: Option, + pub microphone_server_receive_age_ms: Option, } diff --git a/server/src/video/eye_capture.rs b/server/src/video/eye_capture.rs index c98c254..06e395a 100644 --- a/server/src/video/eye_capture.rs +++ b/server/src/video/eye_capture.rs @@ -371,6 +371,7 @@ pub async fn eye_ball_with_request( server_encoder_label: server_encoder_label_for_cb.clone(), server_process_cpu_tenths: server_process_cpu_tenths_for_cb .load(Ordering::Relaxed), + ..Default::default() }; match tx.try_send(Ok(pkt)) { Ok(_) => {