diff --git a/AGENTS.md b/AGENTS.md index bbc337b..bb743e4 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -539,3 +539,15 @@ remains the truth judge and root-cause localizer, not the production dependency. - [x] Emit `root-cause-summary.json` from mirrored probe runs to classify failing layers instead of eyeballing raw metrics. - [x] Add unit tests for apply/refuse/target behavior in the blind healer. - [ ] Next run should identify the failing layer if confirmation still fails: client capture/uplink, network/server receive, server planner, server sink handoff, or external USB/browser/probe boundary. + +## 0.17.28 Blind Timing Normalization Checklist + +Context: the first preferred confirmation pass showed the probe-calibrate-confirm +loop can work, but also revealed two blind-healing blockers: sink handoff samples +stayed empty, and client timing skew included a false cross-pipeline PTS offset. + +- [x] Pair server sink handoff samples by planned due time, not raw local PTS, so offset-compensated streams still produce handoff evidence. +- [x] Normalize client sidecar capture/send windows onto the shared capture clock using queue delivery age instead of raw per-pipeline packet PTS. +- [x] Add tests proving sink handoff survives large offset-compensated local PTS gaps. +- [x] Add tests proving audio/video timing metadata no longer copies packet PTS domains into blind sidecar fields. +- [ ] Next mirrored run should show non-zero `planner_sink_handoff_window_samples` and much smaller client send/capture p95 skew before trusting blind healing. diff --git a/Cargo.lock b/Cargo.lock index fccac32..1c655d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.17.27" +version = "0.17.28" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.17.27" +version = "0.17.28" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.17.27" +version = "0.17.28" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index 870ba63..88343fd 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.17.27" +version = "0.17.28" edition = "2024" [dependencies] diff --git a/client/src/app/uplink_media.rs b/client/src/app/uplink_media.rs index 2cf0163..7a283f4 100644 --- a/client/src/app/uplink_media.rs +++ b/client/src/app/uplink_media.rs @@ -500,6 +500,13 @@ fn duration_ms_u32(duration: Duration) -> u32 { duration.as_millis().min(u128::from(u32::MAX)) as u32 } +#[cfg(not(coverage))] +fn shared_capture_window_from_delivery_age(delivery_age: Duration) -> (u64, u64) { + let send_pts_us = crate::live_capture_clock::capture_pts_us(); + let age_us = delivery_age.as_micros().min(u128::from(u64::MAX)) as u64; + (send_pts_us.saturating_sub(age_us), send_pts_us) +} + #[cfg(not(coverage))] fn attach_audio_timing_metadata( packet: &mut AudioPacket, @@ -508,8 +515,9 @@ fn attach_audio_timing_metadata( ) { static AUDIO_SEQUENCE: AtomicU64 = AtomicU64::new(0); packet.seq = AUDIO_SEQUENCE.fetch_add(1, Ordering::Relaxed).saturating_add(1); - packet.client_capture_pts_us = packet.pts; - packet.client_send_pts_us = crate::live_capture_clock::capture_pts_us(); + let (capture_pts_us, send_pts_us) = shared_capture_window_from_delivery_age(delivery_age); + packet.client_capture_pts_us = capture_pts_us; + packet.client_send_pts_us = send_pts_us; packet.client_queue_depth = queue_depth_u32(queue_depth); packet.client_queue_age_ms = duration_ms_u32(delivery_age); } @@ -522,8 +530,9 @@ fn attach_video_timing_metadata( ) { static VIDEO_SEQUENCE: AtomicU64 = AtomicU64::new(0); packet.seq = VIDEO_SEQUENCE.fetch_add(1, Ordering::Relaxed).saturating_add(1); - packet.client_capture_pts_us = packet.pts; - packet.client_send_pts_us = crate::live_capture_clock::capture_pts_us(); + let (capture_pts_us, send_pts_us) = shared_capture_window_from_delivery_age(delivery_age); + packet.client_capture_pts_us = capture_pts_us; + packet.client_send_pts_us = send_pts_us; packet.client_queue_depth = queue_depth_u32(queue_depth); packet.client_queue_age_ms = duration_ms_u32(delivery_age); } @@ -616,3 +625,54 @@ fn log_uplink_drop( limiter.record(reason, count, queue_depth, age_ms); } } + +#[cfg(test)] +mod uplink_timing_tests { + use super::*; + + #[test] + fn audio_timing_metadata_uses_shared_clock_window_instead_of_packet_pts_domain() { + std::thread::sleep(Duration::from_millis(5)); + let mut packet = AudioPacket { + pts: 9_999_999, + ..AudioPacket::default() + }; + + attach_audio_timing_metadata(&mut packet, 3, Duration::from_millis(2)); + + assert!(packet.seq > 0); + assert_eq!(packet.client_queue_depth, 3); + assert_eq!(packet.client_queue_age_ms, 2); + assert!( + packet.client_send_pts_us >= packet.client_capture_pts_us, + "send must be on or after the shared-clock capture estimate" + ); + assert!( + packet.client_send_pts_us - packet.client_capture_pts_us <= 2_000, + "delivery age, not packet PTS domain, should define the timing window" + ); + } + + #[test] + fn video_timing_metadata_uses_shared_clock_window_instead_of_packet_pts_domain() { + std::thread::sleep(Duration::from_millis(5)); + let mut packet = VideoPacket { + pts: 9_999_999, + ..VideoPacket::default() + }; + + attach_video_timing_metadata(&mut packet, 4, Duration::from_millis(3)); + + assert!(packet.seq > 0); + assert_eq!(packet.client_queue_depth, 4); + assert_eq!(packet.client_queue_age_ms, 3); + assert!( + packet.client_send_pts_us >= packet.client_capture_pts_us, + "send must be on or after the shared-clock capture estimate" + ); + assert!( + packet.client_send_pts_us - packet.client_capture_pts_us <= 3_000, + "delivery age, not packet PTS domain, should define the timing window" + ); + } +} diff --git a/common/Cargo.toml b/common/Cargo.toml index dda659e..7a906fe 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.17.27" +version = "0.17.28" edition = "2024" build = "build.rs" diff --git a/server/Cargo.toml b/server/Cargo.toml index 46ff7c5..2806381 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.17.27" +version = "0.17.28" edition = "2024" autobins = false diff --git a/server/src/upstream_media_runtime.rs b/server/src/upstream_media_runtime.rs index 428364c..c17ba20 100644 --- a/server/src/upstream_media_runtime.rs +++ b/server/src/upstream_media_runtime.rs @@ -115,12 +115,7 @@ impl UpstreamMediaRuntime { .state .lock() .expect("upstream media state mutex poisoned"); - record_presentation_sample( - &mut state, - UpstreamMediaKind::Microphone, - local_pts_us, - due_at, - ); + record_presentation_sample(&mut state, UpstreamMediaKind::Microphone, due_at); state.last_audio_presented_pts_us = Some(local_pts_us); if state.phase != UpstreamSyncPhase::Failed { state.phase = UpstreamSyncPhase::Live; @@ -154,7 +149,7 @@ impl UpstreamMediaRuntime { .state .lock() .expect("upstream media state mutex poisoned"); - record_presentation_sample(&mut state, UpstreamMediaKind::Camera, local_pts_us, due_at); + record_presentation_sample(&mut state, UpstreamMediaKind::Camera, due_at); state.last_video_presented_pts_us = Some(local_pts_us); if state.phase != UpstreamSyncPhase::Failed { state.phase = UpstreamSyncPhase::Live; @@ -762,11 +757,9 @@ fn record_client_timing_windows(state: &mut UpstreamClockState) { fn record_presentation_sample( state: &mut UpstreamClockState, kind: UpstreamMediaKind, - local_pts_us: u64, due_at: Instant, ) { let sample = state::UpstreamPresentationSample { - local_pts_us, due_at, handed_at: Instant::now(), }; @@ -793,9 +786,8 @@ fn latest_sink_handoff_skew_ms(state: &UpstreamClockState) -> Option { ) else { return None; }; - let local_pts_delta_ms = - (camera.local_pts_us as i128 - microphone.local_pts_us as i128).abs() as f64 / 1000.0; - if local_pts_delta_ms > 250.0 { + let due_at_delta_ms = instant_delta_us(camera.due_at, microphone.due_at).abs() as f64 / 1000.0; + if due_at_delta_ms > 250.0 { return None; } Some(instant_delta_us(camera.handed_at, microphone.handed_at) as f64 / 1000.0) diff --git a/server/src/upstream_media_runtime/state.rs b/server/src/upstream_media_runtime/state.rs index 99fbce6..d70a43d 100644 --- a/server/src/upstream_media_runtime/state.rs +++ b/server/src/upstream_media_runtime/state.rs @@ -13,7 +13,6 @@ pub(super) struct UpstreamTimingSample { #[derive(Clone, Copy, Debug)] pub(super) struct UpstreamPresentationSample { - pub local_pts_us: u64, pub due_at: Instant, pub handed_at: Instant, } diff --git a/server/src/upstream_media_runtime/tests/planning.rs b/server/src/upstream_media_runtime/tests/planning.rs index 4b41997..2725687 100644 --- a/server/src/upstream_media_runtime/tests/planning.rs +++ b/server/src/upstream_media_runtime/tests/planning.rs @@ -513,6 +513,29 @@ fn planner_snapshot_tracks_sink_handoff_timing_windows() { ); } +#[test] +#[serial(upstream_media_runtime)] +fn sink_handoff_window_pairs_by_due_time_not_offset_local_pts() { + let runtime = runtime_without_offsets(); + let _camera = runtime.activate_camera(); + let _microphone = runtime.activate_microphone(); + let due_at = tokio::time::Instant::now() + .checked_sub(Duration::from_millis(5)) + .unwrap_or_else(tokio::time::Instant::now); + + runtime.mark_audio_presented(1_200_000, due_at); + std::thread::sleep(Duration::from_millis(1)); + runtime.mark_video_presented(100_000, due_at); + + let snapshot = runtime.snapshot(); + + assert_eq!(snapshot.sink_handoff_window_samples, 1); + assert!( + snapshot.sink_handoff_skew_ms.is_some(), + "offset-compensated streams should still produce handoff evidence when their due times match" + ); +} + #[test] #[serial(upstream_media_runtime)] fn planner_snapshot_tracks_client_timing_sidecar_metrics() {