From c02241cd9a9e9c570e49f80bd2c719b2bd85cc94 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Sun, 3 May 2026 14:45:16 -0300 Subject: [PATCH] feat: correlate output delay drift --- AGENTS.md | 2 + Cargo.lock | 8 +- client/Cargo.toml | 2 +- client/src/bin/lesavka-relayctl.rs | 3 + common/Cargo.toml | 2 +- common/proto/lesavka.proto | 3 + scripts/manual/run_upstream_av_sync.sh | 254 +++++++++++++++++- server/Cargo.toml | 4 +- server/src/main/relay_service.rs | 10 +- server/src/main/relay_service_coverage.rs | 10 +- server/src/output_delay_probe.rs | 160 ++++++++++- .../client_manual_sync_script_contract.rs | 13 + 12 files changed, 453 insertions(+), 18 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 410ad12..80f20ea 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -142,6 +142,8 @@ Context: Google Meet testing on 2026-04-30 showed audio roughly 8 seconds behind - [x] `report.json` - [x] `report.txt` - [x] per-event rows with event id, video time, audio time, skew, and confidence + - [x] server-output timeline and correlation artifacts that separate Theia + feed timing from Tethys-observed UVC/UAC skew - [x] pass/fail verdict using preferred/acceptable/catastrophic thresholds - [x] Add a deterministic server-output sync beacon source: - [x] video flash pattern with event identity or cadence diff --git a/Cargo.lock b/Cargo.lock index e659661..53b857a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.19.2" +version = "0.19.3" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.19.2" +version = "0.19.3" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.19.2" +version = "0.19.3" dependencies = [ "anyhow", "base64", @@ -1712,6 +1712,8 @@ dependencies = [ "libc", "prost-build", "prost-types", + "serde", + "serde_json", "serial_test", "temp-env", "tempfile", diff --git a/client/Cargo.toml b/client/Cargo.toml index c13c2e2..6195c96 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.19.2" +version = "0.19.3" edition = "2024" [dependencies] diff --git a/client/src/bin/lesavka-relayctl.rs b/client/src/bin/lesavka-relayctl.rs index 591687e..f0ed0ac 100644 --- a/client/src/bin/lesavka-relayctl.rs +++ b/client/src/bin/lesavka-relayctl.rs @@ -612,6 +612,9 @@ async fn main() -> Result<()> { { println!("ok={}", reply.ok); println!("detail={}", reply.detail); + if !reply.server_timeline_json.trim().is_empty() { + println!("server_timeline_json={}", reply.server_timeline_json); + } } return Ok(()); } diff --git a/common/Cargo.toml b/common/Cargo.toml index ca6dd6f..c15c801 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.19.2" +version = "0.19.3" edition = "2024" build = "build.rs" diff --git a/common/proto/lesavka.proto b/common/proto/lesavka.proto index f512f33..d06e33c 100644 --- a/common/proto/lesavka.proto +++ b/common/proto/lesavka.proto @@ -76,6 +76,9 @@ message OutputDelayProbeRequest { message OutputDelayProbeReply { bool ok = 1; string detail = 2; + // Compact JSON timeline of when the server fed each coded signature into + // UVC/UAC. The workstation script joins this with Tethys-observed events. + string server_timeline_json = 3; } message ResetUsbReply { bool ok = 1; } // true = success diff --git a/scripts/manual/run_upstream_av_sync.sh b/scripts/manual/run_upstream_av_sync.sh index 084d7b2..dce06e0 100755 --- a/scripts/manual/run_upstream_av_sync.sh +++ b/scripts/manual/run_upstream_av_sync.sh @@ -69,6 +69,11 @@ LOCAL_CAPTURE="${LOCAL_REPORT_DIR}/capture.mkv" LOCAL_ANALYSIS_JSON="${LOCAL_REPORT_DIR}/report.json" LOCAL_REPORT_TXT="${LOCAL_REPORT_DIR}/report.txt" LOCAL_EVENTS_CSV="${LOCAL_REPORT_DIR}/events.csv" +LOCAL_SERVER_PROBE_REPLY="${LOCAL_REPORT_DIR}/server-output-probe-reply.txt" +LOCAL_SERVER_TIMELINE_JSON="${LOCAL_REPORT_DIR}/server-output-timeline.json" +LOCAL_OUTPUT_DELAY_CORRELATION_JSON="${LOCAL_REPORT_DIR}/output-delay-correlation.json" +LOCAL_OUTPUT_DELAY_CORRELATION_CSV="${LOCAL_REPORT_DIR}/output-delay-correlation.csv" +LOCAL_OUTPUT_DELAY_CORRELATION_TXT="${LOCAL_REPORT_DIR}/output-delay-correlation.txt" LOCAL_OUTPUT_DELAY_JSON="${LOCAL_REPORT_DIR}/output-delay-calibration.json" LOCAL_OUTPUT_DELAY_ENV="${LOCAL_REPORT_DIR}/output-delay-calibration.env" LOCAL_CAPTURE_LOG="${LOCAL_REPORT_DIR}/capture.log" @@ -258,6 +263,7 @@ write_output_delay_calibration() { "${LOCAL_ANALYSIS_JSON}" \ "${LOCAL_OUTPUT_DELAY_JSON}" \ "${LOCAL_OUTPUT_DELAY_ENV}" \ + "${LOCAL_OUTPUT_DELAY_CORRELATION_JSON}" \ "${LESAVKA_OUTPUT_DELAY_TARGET}" \ "${LESAVKA_OUTPUT_DELAY_MIN_PAIRS}" \ "${LESAVKA_OUTPUT_DELAY_MAX_ABS_SKEW_MS}" \ @@ -276,6 +282,7 @@ import sys report_path, output_json_path, output_env_path, + correlation_path, target, min_pairs_raw, max_abs_skew_raw, @@ -372,6 +379,7 @@ artifact = { "probe_media_origin": "server-generated", "probe_media_path": "server generated signatures -> UVC/UAC sinks -> lab host capture", "report_json": report_path, + "correlation_json": correlation_path if pathlib.Path(correlation_path).exists() else "", "audio_after_video_positive": True, "target": target, "ready": ready, @@ -414,6 +422,233 @@ with pathlib.Path(output_env_path).open("w") as handle: PY } +extract_server_timeline() { + [[ -f "${LOCAL_SERVER_PROBE_REPLY}" ]] || return 0 + + python3 - <<'PY' "${LOCAL_SERVER_PROBE_REPLY}" "${LOCAL_SERVER_TIMELINE_JSON}" +import json +import pathlib +import sys + +reply_path = pathlib.Path(sys.argv[1]) +timeline_path = pathlib.Path(sys.argv[2]) +prefix = "server_timeline_json=" +raw = "" +for line in reply_path.read_text(errors="replace").splitlines(): + if line.startswith(prefix): + raw = line[len(prefix):].strip() +if not raw: + raise SystemExit(0) +timeline = json.loads(raw) +timeline_path.write_text(json.dumps(timeline, indent=2, sort_keys=True) + "\n") +PY +} + +write_output_delay_correlation() { + [[ -f "${LOCAL_ANALYSIS_JSON}" ]] || return 0 + [[ -f "${LOCAL_SERVER_TIMELINE_JSON}" ]] || return 0 + + echo "==> correlating Theia feed timing with Tethys observations" + python3 - <<'PY' \ + "${LOCAL_ANALYSIS_JSON}" \ + "${LOCAL_SERVER_TIMELINE_JSON}" \ + "${LOCAL_OUTPUT_DELAY_CORRELATION_JSON}" \ + "${LOCAL_OUTPUT_DELAY_CORRELATION_CSV}" \ + "${LOCAL_OUTPUT_DELAY_CORRELATION_TXT}" +import csv +import json +import math +import pathlib +import sys + +report_path, timeline_path, output_json_path, output_csv_path, output_txt_path = sys.argv[1:] +report = json.loads(pathlib.Path(report_path).read_text()) +timeline = json.loads(pathlib.Path(timeline_path).read_text()) +server_events = {int(event["event_id"]): event for event in timeline.get("events", [])} + + +def finite(value): + try: + result = float(value) + except Exception: + return None + return result if math.isfinite(result) else None + + +def fit_linear(rows, key): + points = [(row["event_time_s"], row[key]) for row in rows if row.get(key) is not None] + if len(points) < 2: + return { + "available": False, + "intercept_ms": 0.0, + "slope_ms_per_s": 0.0, + "r2": 0.0, + "drift_ms": 0.0, + } + xs = [point[0] for point in points] + ys = [point[1] for point in points] + mean_x = sum(xs) / len(xs) + mean_y = sum(ys) / len(ys) + denom = sum((x - mean_x) ** 2 for x in xs) + slope = 0.0 if denom == 0 else sum((x - mean_x) * (y - mean_y) for x, y in points) / denom + intercept = mean_y - slope * mean_x + predicted = [intercept + slope * x for x in xs] + ss_tot = sum((y - mean_y) ** 2 for y in ys) + ss_res = sum((y - y_hat) ** 2 for y, y_hat in zip(ys, predicted)) + r2 = 1.0 if ss_tot == 0.0 else max(0.0, min(1.0, 1.0 - (ss_res / ss_tot))) + drift = (intercept + slope * xs[-1]) - (intercept + slope * xs[0]) + return { + "available": True, + "intercept_ms": intercept, + "slope_ms_per_s": slope, + "r2": r2, + "drift_ms": drift, + "first_fit_ms": intercept + slope * xs[0], + "last_fit_ms": intercept + slope * xs[-1], + } + + +def correlation(rows, left_key, right_key): + pairs = [ + (row[left_key], row[right_key]) + for row in rows + if row.get(left_key) is not None and row.get(right_key) is not None + ] + if len(pairs) < 2: + return 0.0 + xs = [pair[0] for pair in pairs] + ys = [pair[1] for pair in pairs] + mean_x = sum(xs) / len(xs) + mean_y = sum(ys) / len(ys) + denom_x = sum((x - mean_x) ** 2 for x in xs) + denom_y = sum((y - mean_y) ** 2 for y in ys) + if denom_x <= 0.0 or denom_y <= 0.0: + return 0.0 + return sum((x - mean_x) * (y - mean_y) for x, y in pairs) / math.sqrt(denom_x * denom_y) + + +joined = [] +for observed in report.get("paired_events", []): + event_id = int(observed.get("event_id", -1)) + server = server_events.get(event_id) + if not server: + continue + observed_skew_ms = finite(observed.get("skew_ms")) + server_feed_delta_ms = finite(server.get("server_feed_delta_ms")) + residual_path_skew_ms = ( + observed_skew_ms - server_feed_delta_ms + if observed_skew_ms is not None and server_feed_delta_ms is not None + else None + ) + planned_start_us = int(server.get("planned_start_us", 0)) + joined.append({ + "event_id": event_id, + "code": int(server.get("code", 0)), + "event_time_s": planned_start_us / 1_000_000.0, + "planned_start_us": planned_start_us, + "planned_end_us": int(server.get("planned_end_us", 0)), + "tethys_video_time_s": finite(observed.get("video_time_s")), + "tethys_audio_time_s": finite(observed.get("audio_time_s")), + "observed_skew_ms": observed_skew_ms, + "server_video_feed_monotonic_us": server.get("video_feed_monotonic_us"), + "server_audio_push_monotonic_us": server.get("audio_push_monotonic_us"), + "server_feed_delta_ms": server_feed_delta_ms, + "residual_path_skew_ms": residual_path_skew_ms, + "confidence": finite(observed.get("confidence")), + }) + +first_event_time_s = joined[0]["event_time_s"] if joined else 0.0 +for row in joined: + row["relative_event_time_s"] = row["event_time_s"] - first_event_time_s + row["event_time_s"] = row["relative_event_time_s"] + +observed_model = fit_linear(joined, "observed_skew_ms") +server_model = fit_linear(joined, "server_feed_delta_ms") +residual_model = fit_linear(joined, "residual_path_skew_ms") +server_observed_correlation = correlation(joined, "server_feed_delta_ms", "observed_skew_ms") + +observed_drift = observed_model.get("drift_ms", 0.0) +server_drift = server_model.get("drift_ms", 0.0) +residual_drift = residual_model.get("drift_ms", 0.0) +same_direction = observed_drift == 0.0 or (observed_drift > 0) == (server_drift > 0) +server_share = 0.0 if abs(observed_drift) < 1e-6 else abs(server_drift) / abs(observed_drift) +if same_direction and server_share >= 0.5 and abs(server_drift) >= 20.0: + dominant_layer = "server_feed_timing" +else: + dominant_layer = "post_server_output_or_tethys_capture" + +correction_mode = ( + "linear_function_candidate" + if abs(residual_drift) >= 20.0 + else "scalar_candidate" +) + +artifact = { + "schema": "lesavka.output-delay-correlation.v1", + "report_json": report_path, + "server_timeline_json": timeline_path, + "joined_event_count": len(joined), + "audio_after_video_positive": True, + "observed_skew_model": observed_model, + "server_feed_delta_model": server_model, + "residual_path_skew_model": residual_model, + "server_observed_correlation": server_observed_correlation, + "server_drift_share_of_observed": server_share, + "dominant_layer": dominant_layer, + "correction_mode": correction_mode, + "video_delay_function_candidate": { + "units": "microseconds", + "end_to_end": { + "intercept_us": round(observed_model.get("intercept_ms", 0.0) * 1000.0), + "slope_us_per_s": round(observed_model.get("slope_ms_per_s", 0.0) * 1000.0), + "formula": "video_delay_us(t) = intercept_us + slope_us_per_s * seconds_since_first_event", + }, + "output_path_only": { + "intercept_us": round(residual_model.get("intercept_ms", 0.0) * 1000.0), + "slope_us_per_s": round(residual_model.get("slope_ms_per_s", 0.0) * 1000.0), + "formula": "video_delay_us(t) = intercept_us + slope_us_per_s * seconds_since_first_event", + }, + }, + "events": joined, +} + +pathlib.Path(output_json_path).write_text(json.dumps(artifact, indent=2, sort_keys=True) + "\n") +with pathlib.Path(output_csv_path).open("w", newline="", encoding="utf-8") as handle: + fieldnames = [ + "event_id", + "code", + "event_time_s", + "tethys_video_time_s", + "tethys_audio_time_s", + "observed_skew_ms", + "server_feed_delta_ms", + "residual_path_skew_ms", + "server_video_feed_monotonic_us", + "server_audio_push_monotonic_us", + "confidence", + ] + writer = csv.DictWriter(handle, fieldnames=fieldnames) + writer.writeheader() + for row in joined: + writer.writerow({key: row.get(key) for key in fieldnames}) + +lines = [ + f"Output-delay correlation for {report_path}", + f"- joined events: {len(joined)}", + f"- dominant layer: {dominant_layer}", + f"- correction mode: {correction_mode}", + f"- observed skew model: {observed_model.get('intercept_ms', 0.0):+.3f} ms + {observed_model.get('slope_ms_per_s', 0.0):+.3f} ms/s * t", + f"- server feed model: {server_model.get('intercept_ms', 0.0):+.3f} ms + {server_model.get('slope_ms_per_s', 0.0):+.3f} ms/s * t", + f"- residual path model: {residual_model.get('intercept_ms', 0.0):+.3f} ms + {residual_model.get('slope_ms_per_s', 0.0):+.3f} ms/s * t", + f"- server/observed correlation: {server_observed_correlation:+.3f}", + f"- server drift share of observed: {server_share:.3f}", +] +summary = "\n".join(lines) + "\n" +pathlib.Path(output_txt_path).write_text(summary) +print(summary, end="") +PY +} + maybe_apply_output_delay_calibration() { [[ "${LESAVKA_OUTPUT_DELAY_CALIBRATION}" != "0" ]] || return 0 [[ -f "${LOCAL_OUTPUT_DELAY_ENV}" ]] || return 0 @@ -1032,6 +1267,7 @@ sleep "${LEAD_IN_SECONDS}" echo "==> running server-generated UVC/UAC output-delay probe against ${RESOLVED_LESAVKA_SERVER_ADDR}" probe_status=0 probe_timed_out=0 +set +e ( cd "${REPO_ROOT}" LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \ @@ -1044,10 +1280,13 @@ probe_timed_out=0 "${PROBE_PULSE_PERIOD_MS}" \ "${PROBE_PULSE_WIDTH_MS}" \ "${PROBE_EVENT_WIDTH_CODES}" -) || probe_status=$? +) 2>&1 | tee "${LOCAL_SERVER_PROBE_REPLY}" +probe_status=${PIPESTATUS[0]} +set -e if [[ "${probe_status}" -eq 124 ]]; then probe_timed_out=1 fi +extract_server_timeline capture_status=0 wait "${capture_pid}" || capture_status=$? @@ -1197,6 +1436,7 @@ else ) fi +write_output_delay_correlation write_output_delay_calibration maybe_apply_output_delay_calibration @@ -1218,6 +1458,18 @@ fi if [[ -f "${LOCAL_EVENTS_CSV}" ]]; then echo "events_csv: ${LOCAL_EVENTS_CSV}" fi +if [[ -f "${LOCAL_SERVER_TIMELINE_JSON}" ]]; then + echo "server_timeline_json: ${LOCAL_SERVER_TIMELINE_JSON}" +fi +if [[ -f "${LOCAL_OUTPUT_DELAY_CORRELATION_JSON}" ]]; then + echo "output_delay_correlation_json: ${LOCAL_OUTPUT_DELAY_CORRELATION_JSON}" +fi +if [[ -f "${LOCAL_OUTPUT_DELAY_CORRELATION_CSV}" ]]; then + echo "output_delay_correlation_csv: ${LOCAL_OUTPUT_DELAY_CORRELATION_CSV}" +fi +if [[ -f "${LOCAL_OUTPUT_DELAY_CORRELATION_TXT}" ]]; then + echo "output_delay_correlation_txt: ${LOCAL_OUTPUT_DELAY_CORRELATION_TXT}" +fi if [[ -f "${LOCAL_OUTPUT_DELAY_JSON}" ]]; then echo "output_delay_calibration_json: ${LOCAL_OUTPUT_DELAY_JSON}" fi diff --git a/server/Cargo.toml b/server/Cargo.toml index 0d14322..1e16efe 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.19.2" +version = "0.19.3" edition = "2024" autobins = false @@ -34,6 +34,8 @@ prost-types = "0.13" chrono = { version = "0.4", default-features = false, features = ["std", "clock", "serde"] } chacha20poly1305 = "0.10" base64 = "0.22" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" [build-dependencies] prost-build = "0.13" diff --git a/server/src/main/relay_service.rs b/server/src/main/relay_service.rs index 1f92dd3..32dc3ca 100644 --- a/server/src/main/relay_service.rs +++ b/server/src/main/relay_service.rs @@ -650,9 +650,13 @@ impl Relay for Handler { detail, "🧪 server output-delay probe closed" ); - tx.send(Ok(OutputDelayProbeReply { ok: true, detail })) - .await - .ok(); + tx.send(Ok(OutputDelayProbeReply { + ok: true, + detail, + server_timeline_json: summary.timeline_json, + })) + .await + .ok(); } Err(err) => { warn!( diff --git a/server/src/main/relay_service_coverage.rs b/server/src/main/relay_service_coverage.rs index b857955..d33543b 100644 --- a/server/src/main/relay_service_coverage.rs +++ b/server/src/main/relay_service_coverage.rs @@ -332,9 +332,13 @@ impl Relay for Handler { "server-generated UVC/UAC output-delay probe complete: video_frames={} audio_packets={} events={}", summary.video_frames, summary.audio_packets, summary.event_count ); - tx.send(Ok(OutputDelayProbeReply { ok: true, detail })) - .await - .ok(); + tx.send(Ok(OutputDelayProbeReply { + ok: true, + detail, + server_timeline_json: summary.timeline_json, + })) + .await + .ok(); Ok(Response::new(ReceiverStream::new(rx))) } diff --git a/server/src/output_delay_probe.rs b/server/src/output_delay_probe.rs index 250d288..2bfff13 100644 --- a/server/src/output_delay_probe.rs +++ b/server/src/output_delay_probe.rs @@ -3,6 +3,7 @@ use gstreamer as gst; use gstreamer::prelude::*; use gstreamer_app as gst_app; use lesavka_common::lesavka::{AudioPacket, OutputDelayProbeRequest, VideoPacket}; +use serde::Serialize; use std::f64::consts::TAU; use std::sync::Arc; use std::time::Duration; @@ -68,6 +69,117 @@ pub struct OutputDelayProbeSummary { pub video_frames: u64, pub audio_packets: u64, pub event_count: u64, + pub timeline_json: String, +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +struct ProbeEventSlot { + event_id: usize, + code: u32, + planned_start_us: u64, + planned_end_us: u64, +} + +#[derive(Clone, Debug, Serialize)] +struct OutputDelayProbeTimeline { + schema: &'static str, + origin: &'static str, + media_path: &'static str, + camera_width: u32, + camera_height: u32, + camera_fps: u32, + audio_sample_rate: u32, + audio_channels: usize, + audio_chunk_ms: u64, + pulse_period_ms: u64, + pulse_width_ms: u64, + warmup_us: u64, + duration_us: u64, + events: Vec, +} + +#[derive(Clone, Debug, Serialize)] +struct OutputDelayProbeEventTimeline { + event_id: usize, + code: u32, + planned_start_us: u64, + planned_end_us: u64, + video_seq: Option, + audio_seq: Option, + video_feed_monotonic_us: Option, + audio_push_monotonic_us: Option, + server_feed_delta_ms: Option, +} + +impl OutputDelayProbeTimeline { + fn new(config: &ProbeConfig, camera: &CameraConfig) -> Self { + let event_count = config.event_count(); + let events = (0..event_count) + .map(|event_id| { + let slot = config.event_slot_by_id(event_id as usize); + OutputDelayProbeEventTimeline { + event_id: event_id as usize, + code: slot.code, + planned_start_us: slot.planned_start_us, + planned_end_us: slot.planned_end_us, + video_seq: None, + audio_seq: None, + video_feed_monotonic_us: None, + audio_push_monotonic_us: None, + server_feed_delta_ms: None, + } + }) + .collect(); + Self { + schema: "lesavka.output-delay-server-timeline.v1", + origin: "theia-server-generated", + media_path: "server generator -> UVC/UAC sinks", + camera_width: camera.width, + camera_height: camera.height, + camera_fps: camera.fps, + audio_sample_rate: AUDIO_SAMPLE_RATE, + audio_channels: AUDIO_CHANNELS, + audio_chunk_ms: AUDIO_CHUNK_MS, + pulse_period_ms: config.pulse_period.as_millis() as u64, + pulse_width_ms: config.pulse_width.as_millis() as u64, + warmup_us: duration_us(config.warmup), + duration_us: duration_us(config.duration), + events, + } + } + + fn mark_audio(&mut self, slot: ProbeEventSlot, seq: u64, monotonic_us: u64) { + let Some(event) = self.events.get_mut(slot.event_id) else { + return; + }; + if event.audio_push_monotonic_us.is_none() { + event.audio_seq = Some(seq); + event.audio_push_monotonic_us = Some(monotonic_us); + event.update_delta(); + } + } + + fn mark_video(&mut self, slot: ProbeEventSlot, seq: u64, monotonic_us: u64) { + let Some(event) = self.events.get_mut(slot.event_id) else { + return; + }; + if event.video_feed_monotonic_us.is_none() { + event.video_seq = Some(seq); + event.video_feed_monotonic_us = Some(monotonic_us); + event.update_delta(); + } + } +} + +impl OutputDelayProbeEventTimeline { + fn update_delta(&mut self) { + let (Some(audio_us), Some(video_us)) = + (self.audio_push_monotonic_us, self.video_feed_monotonic_us) + else { + return; + }; + self.server_feed_delta_ms = Some((audio_us as f64 - video_us as f64) / 1000.0); + } } impl ProbeConfig { @@ -107,6 +219,10 @@ impl ProbeConfig { } fn event_code_at(&self, pts: Duration) -> Option { + self.event_slot_at(pts).map(|slot| slot.code) + } + + fn event_slot_at(&self, pts: Duration) -> Option { if pts < self.warmup { return None; } @@ -116,7 +232,22 @@ impl ProbeConfig { let pulse_offset_ns = since_warmup.as_nanos() % period_ns; let code = self.event_width_codes[pulse_index % self.event_width_codes.len()]; let active_ns = self.pulse_width.as_nanos().saturating_mul(u128::from(code)); - (pulse_offset_ns < active_ns).then_some(code) + (pulse_offset_ns < active_ns).then(|| self.event_slot_by_id(pulse_index)) + } + + fn event_slot_by_id(&self, event_id: usize) -> ProbeEventSlot { + let code = self.event_width_codes[event_id % self.event_width_codes.len()]; + let planned_start = self + .warmup + .saturating_add(duration_mul(self.pulse_period, event_id as u64)); + let planned_end = + planned_start.saturating_add(duration_mul(self.pulse_width, u64::from(code))); + ProbeEventSlot { + event_id, + code, + planned_start_us: duration_us(planned_start), + planned_end_us: duration_us(planned_end), + } } fn event_count(&self) -> u64 { @@ -190,6 +321,7 @@ pub async fn run_server_output_delay_probe( let samples_per_chunk = ((u64::from(AUDIO_SAMPLE_RATE) * AUDIO_CHUNK_MS) / 1_000) as usize; let frames = EncodedProbeFrames::new(camera)?; let start = tokio::time::Instant::now(); + let mut timeline = OutputDelayProbeTimeline::new(&config, camera); let mut frame_index = 0u64; let mut audio_index = 0u64; let mut video_frames = 0u64; @@ -206,29 +338,36 @@ pub async fn run_server_output_delay_probe( if next_audio_pts <= next_frame_pts && next_audio_pts <= config.duration { let pts_us = duration_us(next_audio_pts); + let event_slot = config.event_slot_at(next_audio_pts); let data = render_audio_chunk(&config, next_audio_pts, samples_per_chunk); + let seq = audio_index.saturating_add(1); sink.push(&AudioPacket { id: 0, pts: pts_us, data, - seq: audio_index.saturating_add(1), + seq, client_capture_pts_us: pts_us, client_send_pts_us: pts_us, client_queue_depth: 0, client_queue_age_ms: 0, }); + if let Some(slot) = event_slot { + timeline.mark_audio(slot, seq, monotonic_us_since(start)); + } audio_packets = audio_packets.saturating_add(1); audio_index = audio_index.saturating_add(1); } if next_frame_pts <= next_audio_pts && next_frame_pts <= config.duration { let pts_us = duration_us(next_frame_pts); - let code = config.event_code_at(next_frame_pts); + let event_slot = config.event_slot_at(next_frame_pts); + let code = event_slot.map(|slot| slot.code); + let seq = frame_index.saturating_add(1); relay.feed(VideoPacket { id: 0, pts: pts_us, data: frames.packet_for_code(code)?.to_vec(), - seq: frame_index.saturating_add(1), + seq, effective_fps: camera.fps, client_capture_pts_us: pts_us, client_send_pts_us: pts_us, @@ -236,6 +375,9 @@ pub async fn run_server_output_delay_probe( client_queue_age_ms: 0, ..Default::default() }); + if let Some(slot) = event_slot { + timeline.mark_video(slot, seq, monotonic_us_since(start)); + } video_frames = video_frames.saturating_add(1); frame_index = frame_index.saturating_add(1); } @@ -246,6 +388,8 @@ pub async fn run_server_output_delay_probe( video_frames, audio_packets, event_count: config.event_count(), + timeline_json: serde_json::to_string(&timeline) + .context("serializing output-delay server timeline")?, }) } @@ -253,7 +397,7 @@ pub async fn run_server_output_delay_probe( pub async fn run_server_output_delay_probe( _relay: Arc, _sink: &mut Voice, - _camera: &CameraConfig, + camera: &CameraConfig, request: &OutputDelayProbeRequest, ) -> Result { let config = ProbeConfig::from_request(request)?; @@ -261,6 +405,8 @@ pub async fn run_server_output_delay_probe( video_frames: 1, audio_packets: 1, event_count: config.event_count(), + timeline_json: serde_json::to_string(&OutputDelayProbeTimeline::new(&config, camera)) + .unwrap_or_else(|_| "{}".to_string()), }) } @@ -464,6 +610,10 @@ fn duration_us(duration: Duration) -> u64 { duration.as_micros().min(u128::from(u64::MAX)) as u64 } +fn monotonic_us_since(start: tokio::time::Instant) -> u64 { + duration_us(tokio::time::Instant::now().saturating_duration_since(start)) +} + fn duration_mul(duration: Duration, count: u64) -> Duration { let nanos = duration .as_nanos() diff --git a/testing/tests/client_manual_sync_script_contract.rs b/testing/tests/client_manual_sync_script_contract.rs index 7be3526..1e668b2 100644 --- a/testing/tests/client_manual_sync_script_contract.rs +++ b/testing/tests/client_manual_sync_script_contract.rs @@ -32,6 +32,11 @@ fn upstream_sync_script_tunnels_auto_server_addr_through_ssh() { "LOCAL_REPORT_DIR=\"${LOCAL_OUTPUT_DIR%/}/lesavka-output-delay-probe-${STAMP}\"", "LOCAL_ANALYSIS_JSON=\"${LOCAL_REPORT_DIR}/report.json\"", "LOCAL_EVENTS_CSV=\"${LOCAL_REPORT_DIR}/events.csv\"", + "LOCAL_SERVER_PROBE_REPLY=\"${LOCAL_REPORT_DIR}/server-output-probe-reply.txt\"", + "LOCAL_SERVER_TIMELINE_JSON=\"${LOCAL_REPORT_DIR}/server-output-timeline.json\"", + "LOCAL_OUTPUT_DELAY_CORRELATION_JSON=\"${LOCAL_REPORT_DIR}/output-delay-correlation.json\"", + "LOCAL_OUTPUT_DELAY_CORRELATION_CSV=\"${LOCAL_REPORT_DIR}/output-delay-correlation.csv\"", + "LOCAL_OUTPUT_DELAY_CORRELATION_TXT=\"${LOCAL_REPORT_DIR}/output-delay-correlation.txt\"", "LOCAL_OUTPUT_DELAY_JSON=\"${LOCAL_REPORT_DIR}/output-delay-calibration.json\"", "LOCAL_OUTPUT_DELAY_ENV=\"${LOCAL_REPORT_DIR}/output-delay-calibration.env\"", "LESAVKA_OUTPUT_DELAY_CALIBRATION=${LESAVKA_OUTPUT_DELAY_CALIBRATION:-1}", @@ -43,8 +48,14 @@ fn upstream_sync_script_tunnels_auto_server_addr_through_ssh() { "LESAVKA_OUTPUT_DELAY_MAX_DRIFT_MS=${LESAVKA_OUTPUT_DELAY_MAX_DRIFT_MS:-80}", "LESAVKA_OUTPUT_DELAY_MAX_STEP_US=${LESAVKA_OUTPUT_DELAY_MAX_STEP_US:-1500000}", "write_output_delay_calibration", + "extract_server_timeline", + "write_output_delay_correlation", "maybe_apply_output_delay_calibration", "schema\": \"lesavka.output-delay-calibration.v1\"", + "schema\": \"lesavka.output-delay-correlation.v1\"", + "server_timeline_json=", + "dominant_layer", + "video_delay_function_candidate", "source\": \"direct-uvc-uac-output-probe\"", "scope\": \"server-output-static-baseline\"", "applies_to\": \"server UVC/UAC gadget output path\"", @@ -75,6 +86,8 @@ fn upstream_sync_script_tunnels_auto_server_addr_through_ssh() { "Lesavka audio source not found in PipeWire or ALSA; capture host does not currently expose the gadget microphone.", "artifact_dir: ${LOCAL_REPORT_DIR}", "events_csv: ${LOCAL_EVENTS_CSV}", + "server_timeline_json: ${LOCAL_SERVER_TIMELINE_JSON}", + "output_delay_correlation_json: ${LOCAL_OUTPUT_DELAY_CORRELATION_JSON}", "==> Lesavka versions under test", "lesavka-relayctl", "--bin lesavka-relayctl",