From 08cf7d8c84794b9d393be3a226d2efe03382a967 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Sun, 3 May 2026 13:11:54 -0300 Subject: [PATCH] feat: measure upstream output device delay --- AGENTS.md | 12 +- Cargo.lock | 6 +- client/Cargo.toml | 2 +- client/src/sync_probe/runner.rs | 281 ++++++++++++++---- common/Cargo.toml | 2 +- docs/operational-env.md | 9 + scripts/manual/run_upstream_av_sync.sh | 230 ++++++++++++++ server/Cargo.toml | 2 +- .../client_manual_sync_script_contract.rs | 49 +++ 9 files changed, 526 insertions(+), 67 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 6ea7b6d..4e395f5 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -36,6 +36,12 @@ path. handoff delays. - [x] Optional common playout delay is only smoothness slack; it cannot clip or replace sync-critical UVC/UAC offsets. +- [x] Direct UVC/UAC hardware probes produce a first-class + `output-delay-calibration.json` artifact for the server-to-host device + delay that v2 consumes through the same active output-offset calibration. +- [x] Treat the lab-attached host as measuring equipment only; future remote + hosts are not expected to expose SSH, browser probes, or local capture + access for lip-sync calibration. ### Wire Protocol - [x] Add `UpstreamMediaBundle` containing one optional video frame plus zero or @@ -67,7 +73,7 @@ path. pipeline when selected camera, camera quality, or microphone changes. - [x] Make launcher diagnostics expose the active upstream mode as first-class text rather than inferring from separate camera/mic telemetry. -- [ ] Migrate sync-probe runner to the bundled path explicitly and remove any +- [x] Migrate sync-probe runner to the bundled path explicitly and remove any normal probe dependence on split `StreamCamera` + `StreamMicrophone`. ### Server Migration @@ -104,6 +110,10 @@ path. - [x] Focused handshake and launcher tests. - [x] Focused UVC profile test for stale configured profile vs live attached descriptor. - [ ] Focused server upstream-media tests including bundled stream acceptance. +- [x] Direct UVC/UAC probe can derive, gate, apply, and optionally save measured + output-delay calibration without using the fragile webcam-at-screen path. +- [x] Saved output-delay calibration is a static server-side baseline for the + UVC/UAC gadget path, not a dependency on probing every future attached host. - [ ] Install on both ends and verify diagnostics show bundled webcam media. - [ ] Manual Google Meet test: camera starts, video is not black/unsupported, audio is intelligible, and lip sync is inside the acceptable band. diff --git a/Cargo.lock b/Cargo.lock index 035ace6..d9c5a39 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.19.0" +version = "0.19.1" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.19.0" +version = "0.19.1" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.19.0" +version = "0.19.1" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index cd2b353..4395256 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.19.0" +version = "0.19.1" edition = "2024" [dependencies] diff --git a/client/src/sync_probe/runner.rs b/client/src/sync_probe/runner.rs index ddc3073..ceee5b7 100644 --- a/client/src/sync_probe/runner.rs +++ b/client/src/sync_probe/runner.rs @@ -15,10 +15,23 @@ use std::io::Write; use std::path::PathBuf; #[cfg(not(coverage))] -use lesavka_common::lesavka::relay_client::RelayClient; +use lesavka_common::lesavka::{ + AudioPacket, UpstreamMediaBundle, VideoPacket, relay_client::RelayClient, +}; #[cfg(not(coverage))] use tonic::{Request, transport::Channel}; +#[cfg(not(coverage))] +const PROBE_BUNDLE_AUDIO_GRACE: std::time::Duration = std::time::Duration::from_millis(30); +#[cfg(not(coverage))] +const PROBE_BUNDLE_AUDIO_WINDOW_BEFORE_US: u64 = 120_000; +#[cfg(not(coverage))] +const PROBE_BUNDLE_AUDIO_WINDOW_AFTER_US: u64 = 40_000; +#[cfg(not(coverage))] +const PROBE_BUNDLE_MAX_AUDIO_PACKETS: usize = 16; +#[cfg(not(coverage))] +const PROBE_BUNDLE_SESSION_ID: u64 = 1; + pub async fn run_sync_probe_from_args(args: I) -> Result<()> where I: IntoIterator, @@ -39,6 +52,9 @@ async fn run_sync_probe(config: ProbeConfig) -> Result<()> { if !caps.camera || !caps.microphone { bail!("server does not advertise both camera and microphone support"); } + if !caps.bundled_webcam_media { + bail!("server does not advertise bundled webcam media; refusing to measure split upstream"); + } let camera = app_support::camera_config_from_caps(&caps) .context("server handshake did not include a complete camera profile")?; @@ -59,93 +75,238 @@ async fn run_sync_probe(config: ProbeConfig) -> Result<()> { "🧪 A/V sync probe starting" ); - let video_channel = connect(config.server.as_str()).await?; - let audio_channel = connect(config.server.as_str()).await?; + let bundled_channel = connect(config.server.as_str()).await?; let capture = SyncProbeCapture::new(camera, schedule, config.duration)?; let video_queue = capture.video_queue(); let audio_queue = capture.audio_queue(); - let video_task = tokio::spawn(async move { - let mut client = RelayClient::new(video_channel); - let outbound = async_stream::stream! { - loop { - let next = video_queue.pop_fresh().await; - if next.dropped_stale > 0 { - tracing::warn!( - dropped_stale = next.dropped_stale, - queue_depth = next.queue_depth, - "🧪 sync probe video queue dropped stale packets" - ); - } - if let Some(packet) = next.packet { - yield packet; - continue; - } - break; - } - }; - let mut response = client - .stream_camera(Request::new(outbound)) - .await - .context("starting sync probe camera stream")?; - while response.get_mut().message().await.transpose().is_some() {} - Ok::<(), anyhow::Error>(()) - }); - - let audio_task = tokio::spawn(async move { - let mut client = RelayClient::new(audio_channel); + let bundled_task = tokio::spawn(async move { + let mut client = RelayClient::new(bundled_channel); let mut audio_dump = open_debug_dump("LESAVKA_SYNC_PROBE_AUDIO_DUMP") .context("opening sync probe audio dump")?; - let mut sent_packets = 0u64; let outbound = async_stream::stream! { + let mut pending_audio = Vec::::new(); + let mut audio_done = false; + let mut video_done = false; + let mut bundle_seq = 0_u64; + let mut audio_seq = 0_u64; + let mut video_seq = 0_u64; + loop { - let next = audio_queue.pop_fresh().await; - if next.dropped_stale > 0 { - tracing::warn!( - dropped_stale = next.dropped_stale, - queue_depth = next.queue_depth, - "🧪 sync probe audio queue dropped stale packets" - ); + if video_done && audio_done { + break; } - if let Some(packet) = next.packet { - sent_packets = sent_packets.saturating_add(1); - if sent_packets <= 5 || sent_packets.is_multiple_of(500) { - tracing::info!( - packet = sent_packets, - pts = packet.pts, - bytes = packet.data.len(), - "🧪 sync probe microphone packet" - ); + + tokio::select! { + next = video_queue.pop_fresh(), if !video_done => { + if next.dropped_stale > 0 { + tracing::warn!( + dropped_stale = next.dropped_stale, + queue_depth = next.queue_depth, + "🧪 sync probe video queue dropped stale packets" + ); + } + if let Some(mut video) = next.packet { + stamp_probe_video_packet(&mut video, &mut video_seq, next.queue_depth, camera.fps); + retain_probe_audio_for_video(&mut pending_audio, packet_video_capture_pts_us(&video)); + collect_probe_audio_grace( + &audio_queue, + &mut pending_audio, + &mut audio_done, + &mut audio_seq, + audio_dump.as_mut(), + ).await; + retain_probe_audio_for_video(&mut pending_audio, packet_video_capture_pts_us(&video)); + if pending_audio.is_empty() { + tracing::warn!( + video_pts = packet_video_capture_pts_us(&video), + "🧪 sync probe skipped video-only bundle while measuring bundled output delay" + ); + continue; + } + bundle_seq = bundle_seq.saturating_add(1); + let audio = std::mem::take(&mut pending_audio); + let (capture_start_us, capture_end_us) = + probe_bundle_capture_bounds(Some(&video), &audio); + yield UpstreamMediaBundle { + session_id: PROBE_BUNDLE_SESSION_ID, + seq: bundle_seq, + capture_start_us, + capture_end_us, + video: Some(video), + audio, + audio_sample_rate: 48_000, + audio_channels: 2, + video_width: camera.width, + video_height: camera.height, + video_fps: camera.fps, + }; + } else if next.closed { + video_done = true; + } } - if let Some(file) = audio_dump.as_mut() { - let _ = file.write_all(&packet.data); + next = audio_queue.pop_fresh(), if !audio_done => { + if next.dropped_stale > 0 { + tracing::warn!( + dropped_stale = next.dropped_stale, + queue_depth = next.queue_depth, + "🧪 sync probe audio queue dropped stale packets" + ); + } + if let Some(mut packet) = next.packet { + stamp_probe_audio_packet(&mut packet, &mut audio_seq, next.queue_depth); + write_probe_audio_dump(audio_dump.as_mut(), &packet); + pending_audio.push(packet); + retain_newest_probe_audio(&mut pending_audio); + } else if next.closed { + audio_done = true; + } } - yield packet; - continue; } - break; } if let Some(file) = audio_dump.as_mut() { let _ = file.flush(); } }; let mut response = client - .stream_microphone(Request::new(outbound)) + .stream_webcam_media(Request::new(outbound)) .await - .context("starting sync probe microphone stream")?; + .context("starting bundled sync probe webcam stream")?; while response.get_mut().message().await.transpose().is_some() {} Ok::<(), anyhow::Error>(()) }); - let (video_result, audio_result) = - tokio::try_join!(video_task, audio_task).context("joining sync probe streams")?; - video_result.context("sync probe camera task failed")?; - audio_result.context("sync probe microphone task failed")?; + bundled_task + .await + .context("joining bundled sync probe stream")? + .context("bundled sync probe task failed")?; tracing::info!("🧪 A/V sync probe finished"); Ok(()) } +#[cfg(not(coverage))] +async fn collect_probe_audio_grace( + audio_queue: &crate::uplink_fresh_queue::FreshPacketQueue, + pending_audio: &mut Vec, + audio_done: &mut bool, + audio_seq: &mut u64, + mut audio_dump: Option<&mut File>, +) { + if *audio_done || !pending_audio.is_empty() { + return; + } + let Ok(next) = tokio::time::timeout(PROBE_BUNDLE_AUDIO_GRACE, audio_queue.pop_fresh()).await + else { + return; + }; + if let Some(mut packet) = next.packet { + stamp_probe_audio_packet(&mut packet, audio_seq, next.queue_depth); + write_probe_audio_dump(audio_dump.as_mut().map(|file| &mut **file), &packet); + pending_audio.push(packet); + retain_newest_probe_audio(pending_audio); + } else if next.closed { + *audio_done = true; + } +} + +#[cfg(not(coverage))] +fn stamp_probe_audio_packet(packet: &mut AudioPacket, seq: &mut u64, queue_depth: usize) { + *seq = seq.saturating_add(1); + let capture_pts_us = packet.pts; + let send_pts_us = crate::live_capture_clock::capture_pts_us().max(capture_pts_us); + packet.seq = *seq; + packet.client_capture_pts_us = capture_pts_us; + packet.client_send_pts_us = send_pts_us; + packet.client_queue_depth = queue_depth.try_into().unwrap_or(u32::MAX); + packet.client_queue_age_ms = packet_age_ms(capture_pts_us, send_pts_us); +} + +#[cfg(not(coverage))] +fn stamp_probe_video_packet(packet: &mut VideoPacket, seq: &mut u64, queue_depth: usize, fps: u32) { + *seq = seq.saturating_add(1); + let capture_pts_us = packet.pts; + let send_pts_us = crate::live_capture_clock::capture_pts_us().max(capture_pts_us); + packet.seq = *seq; + packet.effective_fps = fps; + packet.client_capture_pts_us = capture_pts_us; + packet.client_send_pts_us = send_pts_us; + packet.client_queue_depth = queue_depth.try_into().unwrap_or(u32::MAX); + packet.client_queue_age_ms = packet_age_ms(capture_pts_us, send_pts_us); +} + +#[cfg(not(coverage))] +fn packet_age_ms(capture_pts_us: u64, send_pts_us: u64) -> u32 { + (send_pts_us.saturating_sub(capture_pts_us) / 1_000) + .try_into() + .unwrap_or(u32::MAX) +} + +#[cfg(not(coverage))] +fn write_probe_audio_dump(file: Option<&mut File>, packet: &AudioPacket) { + if let Some(file) = file { + let _ = file.write_all(&packet.data); + } +} + +#[cfg(not(coverage))] +fn retain_newest_probe_audio(pending_audio: &mut Vec) { + if pending_audio.len() > PROBE_BUNDLE_MAX_AUDIO_PACKETS { + let dropped = pending_audio.len() - PROBE_BUNDLE_MAX_AUDIO_PACKETS; + pending_audio.drain(..dropped); + } +} + +#[cfg(not(coverage))] +fn retain_probe_audio_for_video(pending_audio: &mut Vec, video_pts_us: u64) { + let min_pts = video_pts_us.saturating_sub(PROBE_BUNDLE_AUDIO_WINDOW_BEFORE_US); + let max_pts = video_pts_us.saturating_add(PROBE_BUNDLE_AUDIO_WINDOW_AFTER_US); + pending_audio.retain(|packet| { + let pts = packet_audio_capture_pts_us(packet); + pts >= min_pts && pts <= max_pts + }); + retain_newest_probe_audio(pending_audio); +} + +#[cfg(not(coverage))] +fn probe_bundle_capture_bounds(video: Option<&VideoPacket>, audio: &[AudioPacket]) -> (u64, u64) { + let mut start = u64::MAX; + let mut end = 0_u64; + if let Some(video) = video { + let pts = packet_video_capture_pts_us(video); + start = start.min(pts); + end = end.max(pts); + } + for packet in audio { + let pts = packet_audio_capture_pts_us(packet); + start = start.min(pts); + end = end.max(pts); + } + if start == u64::MAX { + let now = crate::live_capture_clock::capture_pts_us(); + return (now, now); + } + (start, end.max(start)) +} + +#[cfg(not(coverage))] +fn packet_audio_capture_pts_us(packet: &AudioPacket) -> u64 { + if packet.client_capture_pts_us == 0 { + packet.pts + } else { + packet.client_capture_pts_us + } +} + +#[cfg(not(coverage))] +fn packet_video_capture_pts_us(packet: &VideoPacket) -> u64 { + if packet.client_capture_pts_us == 0 { + packet.pts + } else { + packet.client_capture_pts_us + } +} + #[cfg(not(coverage))] async fn connect(server_addr: &str) -> Result { crate::relay_transport::endpoint(server_addr)? diff --git a/common/Cargo.toml b/common/Cargo.toml index 79ba30a..56bb6d8 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.19.0" +version = "0.19.1" edition = "2024" build = "build.rs" diff --git a/docs/operational-env.md b/docs/operational-env.md index a520ba9..d4a83a6 100644 --- a/docs/operational-env.md +++ b/docs/operational-env.md @@ -175,6 +175,15 @@ from `LESAVKA_CLIENT_PKI_SSH_SOURCE` over SSH. Runtime clients require the insta | `LESAVKA_MIC_SOURCE` | client media capture/playback override | | `LESAVKA_MIC_TEST_SOURCE_DESC` | client media capture/playback override | | `LESAVKA_MOUSE_DEVICE` | input routing/clipboard override | +| `LESAVKA_OUTPUT_DELAY_APPLY` | manual direct UVC/UAC probe override; apply the measured server output-delay correction through the calibration API when the probe gates pass | +| `LESAVKA_OUTPUT_DELAY_CALIBRATION` | manual direct UVC/UAC probe override; emit `output-delay-calibration.json` from a lab-attached USB host capture so the server can save static UVC/UAC output-path defaults, defaults to enabled | +| `LESAVKA_OUTPUT_DELAY_GAIN` | manual direct UVC/UAC probe override; scales measured output-delay correction before applying, defaults to `1.0` | +| `LESAVKA_OUTPUT_DELAY_MAX_ABS_SKEW_MS` | manual direct UVC/UAC probe safety limit; refuses to apply/save implausibly large measured device skew, defaults to `5000` | +| `LESAVKA_OUTPUT_DELAY_MAX_DRIFT_MS` | manual direct UVC/UAC probe stability limit; refuses to apply/save unstable output-delay measurements, defaults to `80` | +| `LESAVKA_OUTPUT_DELAY_MAX_STEP_US` | manual direct UVC/UAC probe safety limit; clamps one measured correction step, defaults to `1500000` | +| `LESAVKA_OUTPUT_DELAY_MIN_PAIRS` | manual direct UVC/UAC probe evidence floor before applying measured output-delay calibration, defaults to `8` | +| `LESAVKA_OUTPUT_DELAY_SAVE` | manual direct UVC/UAC probe override; after applying a ready measured correction, persist it as the server default calibration | +| `LESAVKA_OUTPUT_DELAY_TARGET` | manual direct UVC/UAC probe override; choose whether measured skew is corrected by shifting `video` or `audio`, defaults to `video` | | `LESAVKA_PASTE_DELAY_MS` | input routing/clipboard override | | `LESAVKA_PASTE_KEY` | input routing/clipboard override | | `LESAVKA_PASTE_KEY_FILE` | input routing/clipboard override | diff --git a/scripts/manual/run_upstream_av_sync.sh b/scripts/manual/run_upstream_av_sync.sh index ccee330..149be34 100755 --- a/scripts/manual/run_upstream_av_sync.sh +++ b/scripts/manual/run_upstream_av_sync.sh @@ -49,6 +49,15 @@ FETCH_CAPTURE=${FETCH_CAPTURE:-0} REMOTE_SERVER_PREFLIGHT=${REMOTE_SERVER_PREFLIGHT:-1} REMOTE_EXPECT_CAM_OUTPUT=${REMOTE_EXPECT_CAM_OUTPUT:-uvc} REMOTE_EXPECT_UVC_CODEC=${REMOTE_EXPECT_UVC_CODEC:-mjpeg} +LESAVKA_OUTPUT_DELAY_CALIBRATION=${LESAVKA_OUTPUT_DELAY_CALIBRATION:-1} +LESAVKA_OUTPUT_DELAY_APPLY=${LESAVKA_OUTPUT_DELAY_APPLY:-0} +LESAVKA_OUTPUT_DELAY_SAVE=${LESAVKA_OUTPUT_DELAY_SAVE:-0} +LESAVKA_OUTPUT_DELAY_TARGET=${LESAVKA_OUTPUT_DELAY_TARGET:-video} +LESAVKA_OUTPUT_DELAY_MIN_PAIRS=${LESAVKA_OUTPUT_DELAY_MIN_PAIRS:-8} +LESAVKA_OUTPUT_DELAY_MAX_ABS_SKEW_MS=${LESAVKA_OUTPUT_DELAY_MAX_ABS_SKEW_MS:-5000} +LESAVKA_OUTPUT_DELAY_MAX_DRIFT_MS=${LESAVKA_OUTPUT_DELAY_MAX_DRIFT_MS:-80} +LESAVKA_OUTPUT_DELAY_GAIN=${LESAVKA_OUTPUT_DELAY_GAIN:-1.0} +LESAVKA_OUTPUT_DELAY_MAX_STEP_US=${LESAVKA_OUTPUT_DELAY_MAX_STEP_US:-1500000} CAPTURE_READY_MARKER="__LESAVKA_CAPTURE_READY__" STAMP="$(date +%Y%m%d-%H%M%S)" @@ -58,6 +67,8 @@ LOCAL_CAPTURE="${LOCAL_REPORT_DIR}/capture.mkv" LOCAL_ANALYSIS_JSON="${LOCAL_REPORT_DIR}/report.json" LOCAL_REPORT_TXT="${LOCAL_REPORT_DIR}/report.txt" LOCAL_EVENTS_CSV="${LOCAL_REPORT_DIR}/events.csv" +LOCAL_OUTPUT_DELAY_JSON="${LOCAL_REPORT_DIR}/output-delay-calibration.json" +LOCAL_OUTPUT_DELAY_ENV="${LOCAL_REPORT_DIR}/output-delay-calibration.env" LOCAL_CAPTURE_LOG="${LOCAL_REPORT_DIR}/capture.log" mkdir -p "${LOCAL_REPORT_DIR}" RESOLVED_LESAVKA_SERVER_ADDR="" @@ -236,6 +247,216 @@ print_lesavka_versions() { done <<<"${version_output}" } +write_output_delay_calibration() { + [[ "${LESAVKA_OUTPUT_DELAY_CALIBRATION}" != "0" ]] || return 0 + [[ -f "${LOCAL_ANALYSIS_JSON}" ]] || return 0 + + echo "==> deriving UVC/UAC output-delay calibration" + python3 - <<'PY' \ + "${LOCAL_ANALYSIS_JSON}" \ + "${LOCAL_OUTPUT_DELAY_JSON}" \ + "${LOCAL_OUTPUT_DELAY_ENV}" \ + "${LESAVKA_OUTPUT_DELAY_TARGET}" \ + "${LESAVKA_OUTPUT_DELAY_MIN_PAIRS}" \ + "${LESAVKA_OUTPUT_DELAY_MAX_ABS_SKEW_MS}" \ + "${LESAVKA_OUTPUT_DELAY_MAX_DRIFT_MS}" \ + "${LESAVKA_OUTPUT_DELAY_GAIN}" \ + "${LESAVKA_OUTPUT_DELAY_MAX_STEP_US}" \ + "${LESAVKA_OUTPUT_DELAY_APPLY}" \ + "${LESAVKA_OUTPUT_DELAY_SAVE}" +import json +import math +import pathlib +import shlex +import sys + +( + report_path, + output_json_path, + output_env_path, + target, + min_pairs_raw, + max_abs_skew_raw, + max_drift_raw, + gain_raw, + max_step_raw, + apply_raw, + save_raw, +) = sys.argv[1:] + + +def as_int(value, default): + try: + return int(str(value).strip()) + except Exception: + return default + + +def as_float(value, default): + try: + result = float(str(value).strip()) + except Exception: + return default + return result if math.isfinite(result) else default + + +def as_bool(value): + return str(value).strip().lower() not in {"", "0", "false", "no", "off"} + + +def env_line(key, value): + return f"{key}={shlex.quote(str(value))}\n" + + +report = json.loads(pathlib.Path(report_path).read_text()) +verdict = report.get("verdict") or {} + +target = target.strip().lower() +min_pairs = max(1, as_int(min_pairs_raw, 8)) +max_abs_skew_ms = max(1.0, as_float(max_abs_skew_raw, 5000.0)) +max_drift_ms = max(0.0, as_float(max_drift_raw, 80.0)) +gain = min(max(as_float(gain_raw, 1.0), 0.01), 1.0) +max_step_us = max(1, as_int(max_step_raw, 1_500_000)) + +paired = as_int(report.get("paired_event_count"), 0) +median_skew_ms = as_float(report.get("median_skew_ms"), 0.0) +p95_abs_skew_ms = as_float( + verdict.get("p95_abs_skew_ms"), + as_float(report.get("max_abs_skew_ms"), 0.0), +) +max_abs_observed_ms = as_float(report.get("max_abs_skew_ms"), p95_abs_skew_ms) +drift_ms = as_float(report.get("drift_ms"), 0.0) + +raw_device_delta_us = int(round(median_skew_ms * 1000.0)) +scaled_delta_us = int(round(raw_device_delta_us * gain)) +bounded_delta_us = max(-max_step_us, min(max_step_us, scaled_delta_us)) + +audio_delta_us = 0 +video_delta_us = 0 +refusal_reasons = [] + +if target == "video": + video_delta_us = bounded_delta_us +elif target == "audio": + audio_delta_us = -bounded_delta_us +else: + refusal_reasons.append(f"unsupported target {target!r}; use video or audio") + +if paired < min_pairs: + refusal_reasons.append(f"paired_event_count {paired} < {min_pairs}") +if max_abs_observed_ms > max_abs_skew_ms: + refusal_reasons.append( + f"max_abs_skew_ms {max_abs_observed_ms:.1f} > {max_abs_skew_ms:.1f}" + ) +if abs(drift_ms) > max_drift_ms: + refusal_reasons.append(f"abs(drift_ms) {abs(drift_ms):.1f} > {max_drift_ms:.1f}") + +ready = not refusal_reasons +decision = "ready" if ready else "refused" +note = ( + "direct UVC/UAC output-delay calibration: " + f"median device skew {median_skew_ms:+.1f}ms, target={target}, " + f"audio {audio_delta_us:+d}us/video {video_delta_us:+d}us" +) +if not ready: + note = f"direct UVC/UAC output-delay calibration refused: {'; '.join(refusal_reasons)}" + +artifact = { + "schema": "lesavka.output-delay-calibration.v1", + "source": "direct-uvc-uac-output-probe", + "scope": "server-output-static-baseline", + "applies_to": "server UVC/UAC gadget output path", + "measurement_host_role": "lab-attached USB host", + "report_json": report_path, + "audio_after_video_positive": True, + "target": target, + "ready": ready, + "decision": decision, + "apply_enabled": as_bool(apply_raw), + "save_enabled": as_bool(save_raw), + "paired_event_count": paired, + "min_pairs": min_pairs, + "measured_device_skew_ms": median_skew_ms, + "p95_abs_skew_ms": p95_abs_skew_ms, + "max_abs_skew_ms": max_abs_observed_ms, + "max_abs_skew_limit_ms": max_abs_skew_ms, + "drift_ms": drift_ms, + "max_drift_ms": max_drift_ms, + "gain": gain, + "max_step_us": max_step_us, + "raw_device_delta_us": raw_device_delta_us, + "bounded_device_delta_us": bounded_delta_us, + "audio_offset_adjust_us": audio_delta_us, + "video_offset_adjust_us": video_delta_us, + "refusal_reasons": refusal_reasons, + "note": note, +} + +pathlib.Path(output_json_path).write_text(json.dumps(artifact, indent=2, sort_keys=True) + "\n") +env_values = { + "output_delay_ready": str(ready).lower(), + "output_delay_decision": decision, + "output_delay_target": target, + "output_delay_audio_delta_us": audio_delta_us, + "output_delay_video_delta_us": video_delta_us, + "output_delay_measured_skew_ms": f"{median_skew_ms:.3f}", + "output_delay_paired_event_count": paired, + "output_delay_drift_ms": f"{drift_ms:.3f}", + "output_delay_note": note, +} +with pathlib.Path(output_env_path).open("w") as handle: + for key, value in env_values.items(): + handle.write(env_line(key, value)) +PY +} + +maybe_apply_output_delay_calibration() { + [[ "${LESAVKA_OUTPUT_DELAY_CALIBRATION}" != "0" ]] || return 0 + [[ -f "${LOCAL_OUTPUT_DELAY_ENV}" ]] || return 0 + + # shellcheck disable=SC1090 + source "${LOCAL_OUTPUT_DELAY_ENV}" + + echo "==> UVC/UAC output-delay calibration decision" + echo " ↪ output_delay_calibration_json=${LOCAL_OUTPUT_DELAY_JSON}" + echo " ↪ output_delay_ready=${output_delay_ready:-false}" + echo " ↪ output_delay_decision=${output_delay_decision:-unknown}" + echo " ↪ output_delay_target=${output_delay_target:-unknown}" + echo " ↪ output_delay_paired_event_count=${output_delay_paired_event_count:-0}" + echo " ↪ output_delay_measured_skew_ms=${output_delay_measured_skew_ms:-0.0}" + echo " ↪ output_delay_drift_ms=${output_delay_drift_ms:-0.0}" + echo " ↪ output_delay_audio_delta_us=${output_delay_audio_delta_us:-0}" + echo " ↪ output_delay_video_delta_us=${output_delay_video_delta_us:-0}" + echo " ↪ output_delay_note=${output_delay_note:-}" + + if [[ "${output_delay_ready:-false}" != "true" ]]; then + echo " ↪ output delay calibration apply refused: ${output_delay_note:-not ready}" + return 0 + fi + + if [[ "${LESAVKA_OUTPUT_DELAY_APPLY}" == "0" ]]; then + echo " ↪ output delay calibration apply disabled" + return 0 + fi + + echo "==> applying measured UVC/UAC output-delay calibration" + LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \ + "${REPO_ROOT}/target/debug/lesavka-relayctl" \ + --server "${RESOLVED_LESAVKA_SERVER_ADDR}" \ + calibrate \ + "${output_delay_audio_delta_us:-0}" \ + "${output_delay_video_delta_us:-0}" \ + "${output_delay_note:-direct UVC/UAC output-delay calibration}" + + if [[ "${LESAVKA_OUTPUT_DELAY_SAVE}" != "0" ]]; then + echo "==> saving measured UVC/UAC output-delay calibration as default" + LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \ + "${REPO_ROOT}/target/debug/lesavka-relayctl" \ + --server "${RESOLVED_LESAVKA_SERVER_ADDR}" \ + calibration-save-default + fi +} + if [[ "${LOCAL_AUDIO_SANITY}" != "0" ]]; then echo "==> verifying local speaker-to-mic sanity before upstream sync run" "${SCRIPT_DIR}/run_local_audio_sanity.sh" @@ -976,6 +1197,9 @@ else ) fi +write_output_delay_calibration +maybe_apply_output_delay_calibration + if [[ "${capture_v4l2_fault}" -eq 1 ]]; then echo "warning: Tethys video capture reported VIDIOC_QBUF / Bad file descriptor; treat unstable skew or analyzer failures as host-capture suspect" >&2 fi @@ -994,6 +1218,12 @@ fi if [[ -f "${LOCAL_EVENTS_CSV}" ]]; then echo "events_csv: ${LOCAL_EVENTS_CSV}" fi +if [[ -f "${LOCAL_OUTPUT_DELAY_JSON}" ]]; then + echo "output_delay_calibration_json: ${LOCAL_OUTPUT_DELAY_JSON}" +fi +if [[ -f "${LOCAL_OUTPUT_DELAY_ENV}" ]]; then + echo "output_delay_calibration_env: ${LOCAL_OUTPUT_DELAY_ENV}" +fi if [[ -f "${LOCAL_CAPTURE_LOG}" ]]; then echo "capture_log: ${LOCAL_CAPTURE_LOG}" fi diff --git a/server/Cargo.toml b/server/Cargo.toml index 72e7190..5bc6056 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.19.0" +version = "0.19.1" edition = "2024" autobins = false diff --git a/testing/tests/client_manual_sync_script_contract.rs b/testing/tests/client_manual_sync_script_contract.rs index 73d6135..781338f 100644 --- a/testing/tests/client_manual_sync_script_contract.rs +++ b/testing/tests/client_manual_sync_script_contract.rs @@ -11,6 +11,7 @@ const BROWSER_SYNC_SCRIPT: &str = const MIRRORED_SYNC_SCRIPT: &str = include_str!("../../scripts/manual/run_upstream_mirrored_av_sync.sh"); const LOCAL_STIMULUS: &str = include_str!("../../scripts/manual/local_av_stimulus.py"); +const SYNC_PROBE_RUNNER: &str = include_str!("../../client/src/sync_probe/runner.rs"); #[test] fn upstream_sync_script_tunnels_auto_server_addr_through_ssh() { @@ -31,6 +32,27 @@ fn upstream_sync_script_tunnels_auto_server_addr_through_ssh() { "LOCAL_REPORT_DIR=\"${LOCAL_OUTPUT_DIR%/}/lesavka-sync-probe-${STAMP}\"", "LOCAL_ANALYSIS_JSON=\"${LOCAL_REPORT_DIR}/report.json\"", "LOCAL_EVENTS_CSV=\"${LOCAL_REPORT_DIR}/events.csv\"", + "LOCAL_OUTPUT_DELAY_JSON=\"${LOCAL_REPORT_DIR}/output-delay-calibration.json\"", + "LOCAL_OUTPUT_DELAY_ENV=\"${LOCAL_REPORT_DIR}/output-delay-calibration.env\"", + "LESAVKA_OUTPUT_DELAY_CALIBRATION=${LESAVKA_OUTPUT_DELAY_CALIBRATION:-1}", + "LESAVKA_OUTPUT_DELAY_APPLY=${LESAVKA_OUTPUT_DELAY_APPLY:-0}", + "LESAVKA_OUTPUT_DELAY_SAVE=${LESAVKA_OUTPUT_DELAY_SAVE:-0}", + "LESAVKA_OUTPUT_DELAY_TARGET=${LESAVKA_OUTPUT_DELAY_TARGET:-video}", + "LESAVKA_OUTPUT_DELAY_MIN_PAIRS=${LESAVKA_OUTPUT_DELAY_MIN_PAIRS:-8}", + "LESAVKA_OUTPUT_DELAY_MAX_ABS_SKEW_MS=${LESAVKA_OUTPUT_DELAY_MAX_ABS_SKEW_MS:-5000}", + "LESAVKA_OUTPUT_DELAY_MAX_DRIFT_MS=${LESAVKA_OUTPUT_DELAY_MAX_DRIFT_MS:-80}", + "LESAVKA_OUTPUT_DELAY_MAX_STEP_US=${LESAVKA_OUTPUT_DELAY_MAX_STEP_US:-1500000}", + "write_output_delay_calibration", + "maybe_apply_output_delay_calibration", + "schema\": \"lesavka.output-delay-calibration.v1\"", + "source\": \"direct-uvc-uac-output-probe\"", + "scope\": \"server-output-static-baseline\"", + "applies_to\": \"server UVC/UAC gadget output path\"", + "measurement_host_role\": \"lab-attached USB host\"", + "audio_after_video_positive", + "output_delay_calibration_json", + "direct UVC/UAC output-delay calibration", + "calibration-save-default", "LEAD_IN_SECONDS=${LEAD_IN_SECONDS:-0}", "PROBE_TIMEOUT_SECONDS=${PROBE_TIMEOUT_SECONDS:-$((PROBE_DURATION_SECONDS + PROBE_WARMUP_SECONDS + 20))}", "timeout --signal=INT \"${PROBE_TIMEOUT_SECONDS}\" \"${PROBE_BIN}\"", @@ -104,6 +126,33 @@ fn browser_sync_script_can_delegate_to_a_real_path_driver() { ); } +#[test] +fn sync_probe_runner_uses_bundled_webcam_media_path() { + for expected in [ + "bundled_webcam_media", + "refusing to measure split upstream", + "UpstreamMediaBundle", + "stream_webcam_media", + "PROBE_BUNDLE_SESSION_ID", + "PROBE_BUNDLE_AUDIO_GRACE", + "server does not advertise bundled webcam media", + ] { + assert!( + SYNC_PROBE_RUNNER.contains(expected), + "sync probe runner should contain {expected}" + ); + } + for forbidden in [ + ".stream_camera(Request::new(outbound))", + ".stream_microphone(Request::new(outbound))", + ] { + assert!( + !SYNC_PROBE_RUNNER.contains(forbidden), + "sync probe runner must not use old split upstream RPC {forbidden}" + ); + } +} + #[test] fn mirrored_sync_script_uses_real_client_capture_path() { for expected in [