fix: normalize blind sync timing signals

This commit is contained in:
Brad Stein 2026-05-02 18:14:04 -03:00
parent 826bada865
commit d2f312b14d
9 changed files with 109 additions and 23 deletions

View File

@ -539,3 +539,15 @@ remains the truth judge and root-cause localizer, not the production dependency.
- [x] Emit `root-cause-summary.json` from mirrored probe runs to classify failing layers instead of eyeballing raw metrics.
- [x] Add unit tests for apply/refuse/target behavior in the blind healer.
- [ ] Next run should identify the failing layer if confirmation still fails: client capture/uplink, network/server receive, server planner, server sink handoff, or external USB/browser/probe boundary.
## 0.17.28 Blind Timing Normalization Checklist
Context: the first preferred confirmation pass showed the probe-calibrate-confirm
loop can work, but also revealed two blind-healing blockers: sink handoff samples
stayed empty, and client timing skew included a false cross-pipeline PTS offset.
- [x] Pair server sink handoff samples by planned due time, not raw local PTS, so offset-compensated streams still produce handoff evidence.
- [x] Normalize client sidecar capture/send windows onto the shared capture clock using queue delivery age instead of raw per-pipeline packet PTS.
- [x] Add tests proving sink handoff survives large offset-compensated local PTS gaps.
- [x] Add tests proving audio/video timing metadata no longer copies packet PTS domains into blind sidecar fields.
- [ ] Next mirrored run should show non-zero `planner_sink_handoff_window_samples` and much smaller client send/capture p95 skew before trusting blind healing.

6
Cargo.lock generated
View File

@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]]
name = "lesavka_client"
version = "0.17.27"
version = "0.17.28"
dependencies = [
"anyhow",
"async-stream",
@ -1686,7 +1686,7 @@ dependencies = [
[[package]]
name = "lesavka_common"
version = "0.17.27"
version = "0.17.28"
dependencies = [
"anyhow",
"base64",
@ -1698,7 +1698,7 @@ dependencies = [
[[package]]
name = "lesavka_server"
version = "0.17.27"
version = "0.17.28"
dependencies = [
"anyhow",
"base64",

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package]
name = "lesavka_client"
version = "0.17.27"
version = "0.17.28"
edition = "2024"
[dependencies]

View File

@ -500,6 +500,13 @@ fn duration_ms_u32(duration: Duration) -> u32 {
duration.as_millis().min(u128::from(u32::MAX)) as u32
}
#[cfg(not(coverage))]
fn shared_capture_window_from_delivery_age(delivery_age: Duration) -> (u64, u64) {
let send_pts_us = crate::live_capture_clock::capture_pts_us();
let age_us = delivery_age.as_micros().min(u128::from(u64::MAX)) as u64;
(send_pts_us.saturating_sub(age_us), send_pts_us)
}
#[cfg(not(coverage))]
fn attach_audio_timing_metadata(
packet: &mut AudioPacket,
@ -508,8 +515,9 @@ fn attach_audio_timing_metadata(
) {
static AUDIO_SEQUENCE: AtomicU64 = AtomicU64::new(0);
packet.seq = AUDIO_SEQUENCE.fetch_add(1, Ordering::Relaxed).saturating_add(1);
packet.client_capture_pts_us = packet.pts;
packet.client_send_pts_us = crate::live_capture_clock::capture_pts_us();
let (capture_pts_us, send_pts_us) = shared_capture_window_from_delivery_age(delivery_age);
packet.client_capture_pts_us = capture_pts_us;
packet.client_send_pts_us = send_pts_us;
packet.client_queue_depth = queue_depth_u32(queue_depth);
packet.client_queue_age_ms = duration_ms_u32(delivery_age);
}
@ -522,8 +530,9 @@ fn attach_video_timing_metadata(
) {
static VIDEO_SEQUENCE: AtomicU64 = AtomicU64::new(0);
packet.seq = VIDEO_SEQUENCE.fetch_add(1, Ordering::Relaxed).saturating_add(1);
packet.client_capture_pts_us = packet.pts;
packet.client_send_pts_us = crate::live_capture_clock::capture_pts_us();
let (capture_pts_us, send_pts_us) = shared_capture_window_from_delivery_age(delivery_age);
packet.client_capture_pts_us = capture_pts_us;
packet.client_send_pts_us = send_pts_us;
packet.client_queue_depth = queue_depth_u32(queue_depth);
packet.client_queue_age_ms = duration_ms_u32(delivery_age);
}
@ -616,3 +625,54 @@ fn log_uplink_drop(
limiter.record(reason, count, queue_depth, age_ms);
}
}
#[cfg(test)]
mod uplink_timing_tests {
use super::*;
#[test]
fn audio_timing_metadata_uses_shared_clock_window_instead_of_packet_pts_domain() {
std::thread::sleep(Duration::from_millis(5));
let mut packet = AudioPacket {
pts: 9_999_999,
..AudioPacket::default()
};
attach_audio_timing_metadata(&mut packet, 3, Duration::from_millis(2));
assert!(packet.seq > 0);
assert_eq!(packet.client_queue_depth, 3);
assert_eq!(packet.client_queue_age_ms, 2);
assert!(
packet.client_send_pts_us >= packet.client_capture_pts_us,
"send must be on or after the shared-clock capture estimate"
);
assert!(
packet.client_send_pts_us - packet.client_capture_pts_us <= 2_000,
"delivery age, not packet PTS domain, should define the timing window"
);
}
#[test]
fn video_timing_metadata_uses_shared_clock_window_instead_of_packet_pts_domain() {
std::thread::sleep(Duration::from_millis(5));
let mut packet = VideoPacket {
pts: 9_999_999,
..VideoPacket::default()
};
attach_video_timing_metadata(&mut packet, 4, Duration::from_millis(3));
assert!(packet.seq > 0);
assert_eq!(packet.client_queue_depth, 4);
assert_eq!(packet.client_queue_age_ms, 3);
assert!(
packet.client_send_pts_us >= packet.client_capture_pts_us,
"send must be on or after the shared-clock capture estimate"
);
assert!(
packet.client_send_pts_us - packet.client_capture_pts_us <= 3_000,
"delivery age, not packet PTS domain, should define the timing window"
);
}
}

View File

@ -1,6 +1,6 @@
[package]
name = "lesavka_common"
version = "0.17.27"
version = "0.17.28"
edition = "2024"
build = "build.rs"

View File

@ -10,7 +10,7 @@ bench = false
[package]
name = "lesavka_server"
version = "0.17.27"
version = "0.17.28"
edition = "2024"
autobins = false

View File

@ -115,12 +115,7 @@ impl UpstreamMediaRuntime {
.state
.lock()
.expect("upstream media state mutex poisoned");
record_presentation_sample(
&mut state,
UpstreamMediaKind::Microphone,
local_pts_us,
due_at,
);
record_presentation_sample(&mut state, UpstreamMediaKind::Microphone, due_at);
state.last_audio_presented_pts_us = Some(local_pts_us);
if state.phase != UpstreamSyncPhase::Failed {
state.phase = UpstreamSyncPhase::Live;
@ -154,7 +149,7 @@ impl UpstreamMediaRuntime {
.state
.lock()
.expect("upstream media state mutex poisoned");
record_presentation_sample(&mut state, UpstreamMediaKind::Camera, local_pts_us, due_at);
record_presentation_sample(&mut state, UpstreamMediaKind::Camera, due_at);
state.last_video_presented_pts_us = Some(local_pts_us);
if state.phase != UpstreamSyncPhase::Failed {
state.phase = UpstreamSyncPhase::Live;
@ -762,11 +757,9 @@ fn record_client_timing_windows(state: &mut UpstreamClockState) {
fn record_presentation_sample(
state: &mut UpstreamClockState,
kind: UpstreamMediaKind,
local_pts_us: u64,
due_at: Instant,
) {
let sample = state::UpstreamPresentationSample {
local_pts_us,
due_at,
handed_at: Instant::now(),
};
@ -793,9 +786,8 @@ fn latest_sink_handoff_skew_ms(state: &UpstreamClockState) -> Option<f64> {
) else {
return None;
};
let local_pts_delta_ms =
(camera.local_pts_us as i128 - microphone.local_pts_us as i128).abs() as f64 / 1000.0;
if local_pts_delta_ms > 250.0 {
let due_at_delta_ms = instant_delta_us(camera.due_at, microphone.due_at).abs() as f64 / 1000.0;
if due_at_delta_ms > 250.0 {
return None;
}
Some(instant_delta_us(camera.handed_at, microphone.handed_at) as f64 / 1000.0)

View File

@ -13,7 +13,6 @@ pub(super) struct UpstreamTimingSample {
#[derive(Clone, Copy, Debug)]
pub(super) struct UpstreamPresentationSample {
pub local_pts_us: u64,
pub due_at: Instant,
pub handed_at: Instant,
}

View File

@ -513,6 +513,29 @@ fn planner_snapshot_tracks_sink_handoff_timing_windows() {
);
}
#[test]
#[serial(upstream_media_runtime)]
fn sink_handoff_window_pairs_by_due_time_not_offset_local_pts() {
let runtime = runtime_without_offsets();
let _camera = runtime.activate_camera();
let _microphone = runtime.activate_microphone();
let due_at = tokio::time::Instant::now()
.checked_sub(Duration::from_millis(5))
.unwrap_or_else(tokio::time::Instant::now);
runtime.mark_audio_presented(1_200_000, due_at);
std::thread::sleep(Duration::from_millis(1));
runtime.mark_video_presented(100_000, due_at);
let snapshot = runtime.snapshot();
assert_eq!(snapshot.sink_handoff_window_samples, 1);
assert!(
snapshot.sink_handoff_skew_ms.is_some(),
"offset-compensated streams should still produce handoff evidence when their due times match"
);
}
#[test]
#[serial(upstream_media_runtime)]
fn planner_snapshot_tracks_client_timing_sidecar_metrics() {