feat: measure upstream output device delay

This commit is contained in:
Brad Stein 2026-05-03 13:11:54 -03:00
parent 3011dabc92
commit 08cf7d8c84
9 changed files with 526 additions and 67 deletions

View File

@ -36,6 +36,12 @@ path.
handoff delays.
- [x] Optional common playout delay is only smoothness slack; it cannot clip or
replace sync-critical UVC/UAC offsets.
- [x] Direct UVC/UAC hardware probes produce a first-class
`output-delay-calibration.json` artifact for the server-to-host device
delay that v2 consumes through the same active output-offset calibration.
- [x] Treat the lab-attached host as measuring equipment only; future remote
hosts are not expected to expose SSH, browser probes, or local capture
access for lip-sync calibration.
### Wire Protocol
- [x] Add `UpstreamMediaBundle` containing one optional video frame plus zero or
@ -67,7 +73,7 @@ path.
pipeline when selected camera, camera quality, or microphone changes.
- [x] Make launcher diagnostics expose the active upstream mode as first-class
text rather than inferring from separate camera/mic telemetry.
- [ ] Migrate sync-probe runner to the bundled path explicitly and remove any
- [x] Migrate sync-probe runner to the bundled path explicitly and remove any
normal probe dependence on split `StreamCamera` + `StreamMicrophone`.
### Server Migration
@ -104,6 +110,10 @@ path.
- [x] Focused handshake and launcher tests.
- [x] Focused UVC profile test for stale configured profile vs live attached descriptor.
- [ ] Focused server upstream-media tests including bundled stream acceptance.
- [x] Direct UVC/UAC probe can derive, gate, apply, and optionally save measured
output-delay calibration without using the fragile webcam-at-screen path.
- [x] Saved output-delay calibration is a static server-side baseline for the
UVC/UAC gadget path, not a dependency on probing every future attached host.
- [ ] Install on both ends and verify diagnostics show bundled webcam media.
- [ ] Manual Google Meet test: camera starts, video is not black/unsupported,
audio is intelligible, and lip sync is inside the acceptable band.

6
Cargo.lock generated
View File

@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]]
name = "lesavka_client"
version = "0.19.0"
version = "0.19.1"
dependencies = [
"anyhow",
"async-stream",
@ -1686,7 +1686,7 @@ dependencies = [
[[package]]
name = "lesavka_common"
version = "0.19.0"
version = "0.19.1"
dependencies = [
"anyhow",
"base64",
@ -1698,7 +1698,7 @@ dependencies = [
[[package]]
name = "lesavka_server"
version = "0.19.0"
version = "0.19.1"
dependencies = [
"anyhow",
"base64",

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package]
name = "lesavka_client"
version = "0.19.0"
version = "0.19.1"
edition = "2024"
[dependencies]

View File

@ -15,10 +15,23 @@ use std::io::Write;
use std::path::PathBuf;
#[cfg(not(coverage))]
use lesavka_common::lesavka::relay_client::RelayClient;
use lesavka_common::lesavka::{
AudioPacket, UpstreamMediaBundle, VideoPacket, relay_client::RelayClient,
};
#[cfg(not(coverage))]
use tonic::{Request, transport::Channel};
#[cfg(not(coverage))]
const PROBE_BUNDLE_AUDIO_GRACE: std::time::Duration = std::time::Duration::from_millis(30);
#[cfg(not(coverage))]
const PROBE_BUNDLE_AUDIO_WINDOW_BEFORE_US: u64 = 120_000;
#[cfg(not(coverage))]
const PROBE_BUNDLE_AUDIO_WINDOW_AFTER_US: u64 = 40_000;
#[cfg(not(coverage))]
const PROBE_BUNDLE_MAX_AUDIO_PACKETS: usize = 16;
#[cfg(not(coverage))]
const PROBE_BUNDLE_SESSION_ID: u64 = 1;
pub async fn run_sync_probe_from_args<I, S>(args: I) -> Result<()>
where
I: IntoIterator<Item = S>,
@ -39,6 +52,9 @@ async fn run_sync_probe(config: ProbeConfig) -> Result<()> {
if !caps.camera || !caps.microphone {
bail!("server does not advertise both camera and microphone support");
}
if !caps.bundled_webcam_media {
bail!("server does not advertise bundled webcam media; refusing to measure split upstream");
}
let camera = app_support::camera_config_from_caps(&caps)
.context("server handshake did not include a complete camera profile")?;
@ -59,93 +75,238 @@ async fn run_sync_probe(config: ProbeConfig) -> Result<()> {
"🧪 A/V sync probe starting"
);
let video_channel = connect(config.server.as_str()).await?;
let audio_channel = connect(config.server.as_str()).await?;
let bundled_channel = connect(config.server.as_str()).await?;
let capture = SyncProbeCapture::new(camera, schedule, config.duration)?;
let video_queue = capture.video_queue();
let audio_queue = capture.audio_queue();
let video_task = tokio::spawn(async move {
let mut client = RelayClient::new(video_channel);
let outbound = async_stream::stream! {
loop {
let next = video_queue.pop_fresh().await;
if next.dropped_stale > 0 {
tracing::warn!(
dropped_stale = next.dropped_stale,
queue_depth = next.queue_depth,
"🧪 sync probe video queue dropped stale packets"
);
}
if let Some(packet) = next.packet {
yield packet;
continue;
}
break;
}
};
let mut response = client
.stream_camera(Request::new(outbound))
.await
.context("starting sync probe camera stream")?;
while response.get_mut().message().await.transpose().is_some() {}
Ok::<(), anyhow::Error>(())
});
let audio_task = tokio::spawn(async move {
let mut client = RelayClient::new(audio_channel);
let bundled_task = tokio::spawn(async move {
let mut client = RelayClient::new(bundled_channel);
let mut audio_dump = open_debug_dump("LESAVKA_SYNC_PROBE_AUDIO_DUMP")
.context("opening sync probe audio dump")?;
let mut sent_packets = 0u64;
let outbound = async_stream::stream! {
let mut pending_audio = Vec::<AudioPacket>::new();
let mut audio_done = false;
let mut video_done = false;
let mut bundle_seq = 0_u64;
let mut audio_seq = 0_u64;
let mut video_seq = 0_u64;
loop {
let next = audio_queue.pop_fresh().await;
if next.dropped_stale > 0 {
tracing::warn!(
dropped_stale = next.dropped_stale,
queue_depth = next.queue_depth,
"🧪 sync probe audio queue dropped stale packets"
);
if video_done && audio_done {
break;
}
if let Some(packet) = next.packet {
sent_packets = sent_packets.saturating_add(1);
if sent_packets <= 5 || sent_packets.is_multiple_of(500) {
tracing::info!(
packet = sent_packets,
pts = packet.pts,
bytes = packet.data.len(),
"🧪 sync probe microphone packet"
);
tokio::select! {
next = video_queue.pop_fresh(), if !video_done => {
if next.dropped_stale > 0 {
tracing::warn!(
dropped_stale = next.dropped_stale,
queue_depth = next.queue_depth,
"🧪 sync probe video queue dropped stale packets"
);
}
if let Some(mut video) = next.packet {
stamp_probe_video_packet(&mut video, &mut video_seq, next.queue_depth, camera.fps);
retain_probe_audio_for_video(&mut pending_audio, packet_video_capture_pts_us(&video));
collect_probe_audio_grace(
&audio_queue,
&mut pending_audio,
&mut audio_done,
&mut audio_seq,
audio_dump.as_mut(),
).await;
retain_probe_audio_for_video(&mut pending_audio, packet_video_capture_pts_us(&video));
if pending_audio.is_empty() {
tracing::warn!(
video_pts = packet_video_capture_pts_us(&video),
"🧪 sync probe skipped video-only bundle while measuring bundled output delay"
);
continue;
}
bundle_seq = bundle_seq.saturating_add(1);
let audio = std::mem::take(&mut pending_audio);
let (capture_start_us, capture_end_us) =
probe_bundle_capture_bounds(Some(&video), &audio);
yield UpstreamMediaBundle {
session_id: PROBE_BUNDLE_SESSION_ID,
seq: bundle_seq,
capture_start_us,
capture_end_us,
video: Some(video),
audio,
audio_sample_rate: 48_000,
audio_channels: 2,
video_width: camera.width,
video_height: camera.height,
video_fps: camera.fps,
};
} else if next.closed {
video_done = true;
}
}
if let Some(file) = audio_dump.as_mut() {
let _ = file.write_all(&packet.data);
next = audio_queue.pop_fresh(), if !audio_done => {
if next.dropped_stale > 0 {
tracing::warn!(
dropped_stale = next.dropped_stale,
queue_depth = next.queue_depth,
"🧪 sync probe audio queue dropped stale packets"
);
}
if let Some(mut packet) = next.packet {
stamp_probe_audio_packet(&mut packet, &mut audio_seq, next.queue_depth);
write_probe_audio_dump(audio_dump.as_mut(), &packet);
pending_audio.push(packet);
retain_newest_probe_audio(&mut pending_audio);
} else if next.closed {
audio_done = true;
}
}
yield packet;
continue;
}
break;
}
if let Some(file) = audio_dump.as_mut() {
let _ = file.flush();
}
};
let mut response = client
.stream_microphone(Request::new(outbound))
.stream_webcam_media(Request::new(outbound))
.await
.context("starting sync probe microphone stream")?;
.context("starting bundled sync probe webcam stream")?;
while response.get_mut().message().await.transpose().is_some() {}
Ok::<(), anyhow::Error>(())
});
let (video_result, audio_result) =
tokio::try_join!(video_task, audio_task).context("joining sync probe streams")?;
video_result.context("sync probe camera task failed")?;
audio_result.context("sync probe microphone task failed")?;
bundled_task
.await
.context("joining bundled sync probe stream")?
.context("bundled sync probe task failed")?;
tracing::info!("🧪 A/V sync probe finished");
Ok(())
}
/// Briefly waits for one more microphone packet when a video frame arrived
/// with no audio buffered, so the probe avoids emitting video-only bundles.
///
/// Polls the audio queue for at most `PROBE_BUNDLE_AUDIO_GRACE`; a timeout
/// leaves all state untouched, and a closed queue flips `audio_done` so the
/// caller stops polling audio.
#[cfg(not(coverage))]
async fn collect_probe_audio_grace(
    audio_queue: &crate::uplink_fresh_queue::FreshPacketQueue<AudioPacket>,
    pending_audio: &mut Vec<AudioPacket>,
    audio_done: &mut bool,
    audio_seq: &mut u64,
    mut audio_dump: Option<&mut File>,
) {
    // Nothing to do once audio has ended or we already hold audio to bundle.
    if *audio_done || !pending_audio.is_empty() {
        return;
    }
    let popped = tokio::time::timeout(PROBE_BUNDLE_AUDIO_GRACE, audio_queue.pop_fresh()).await;
    let Ok(next) = popped else {
        // Grace window elapsed without audio; the caller ships what it has.
        return;
    };
    match next.packet {
        Some(mut packet) => {
            stamp_probe_audio_packet(&mut packet, audio_seq, next.queue_depth);
            write_probe_audio_dump(audio_dump.as_deref_mut(), &packet);
            pending_audio.push(packet);
            retain_newest_probe_audio(pending_audio);
        }
        None if next.closed => *audio_done = true,
        None => {}
    }
}
/// Stamps probe bookkeeping onto an outgoing microphone packet: a
/// monotonically increasing sequence number, capture/send timestamps, and
/// client-side queue telemetry.
#[cfg(not(coverage))]
fn stamp_probe_audio_packet(packet: &mut AudioPacket, seq: &mut u64, queue_depth: usize) {
    *seq = seq.saturating_add(1);
    packet.seq = *seq;
    let captured = packet.pts;
    // The send time must never read earlier than the capture time.
    let sent = captured.max(crate::live_capture_clock::capture_pts_us());
    packet.client_capture_pts_us = captured;
    packet.client_send_pts_us = sent;
    packet.client_queue_depth = u32::try_from(queue_depth).unwrap_or(u32::MAX);
    packet.client_queue_age_ms = packet_age_ms(captured, sent);
}
/// Stamps probe bookkeeping onto an outgoing video frame: sequence number,
/// the effective frame rate, capture/send timestamps, and client-side queue
/// telemetry.
#[cfg(not(coverage))]
fn stamp_probe_video_packet(packet: &mut VideoPacket, seq: &mut u64, queue_depth: usize, fps: u32) {
    *seq = seq.saturating_add(1);
    packet.seq = *seq;
    packet.effective_fps = fps;
    let captured = packet.pts;
    // The send time must never read earlier than the capture time.
    let sent = captured.max(crate::live_capture_clock::capture_pts_us());
    packet.client_capture_pts_us = captured;
    packet.client_send_pts_us = sent;
    packet.client_queue_depth = u32::try_from(queue_depth).unwrap_or(u32::MAX);
    packet.client_queue_age_ms = packet_age_ms(captured, sent);
}
/// Converts a capture→send timestamp delta (µs) into whole milliseconds,
/// saturating at zero on clock inversion and at `u32::MAX` on overflow.
#[cfg(not(coverage))]
fn packet_age_ms(capture_pts_us: u64, send_pts_us: u64) -> u32 {
    let age_ms = send_pts_us.saturating_sub(capture_pts_us) / 1_000;
    u32::try_from(age_ms).unwrap_or(u32::MAX)
}
/// Appends the packet's raw audio bytes to the optional debug dump file.
/// Write errors are deliberately swallowed: the dump is best-effort tooling
/// output and must never abort the probe.
#[cfg(not(coverage))]
fn write_probe_audio_dump(file: Option<&mut File>, packet: &AudioPacket) {
    let Some(file) = file else { return };
    let _ = file.write_all(&packet.data);
}
/// Caps the pending-audio buffer at `PROBE_BUNDLE_MAX_AUDIO_PACKETS`,
/// discarding the oldest packets from the front of the queue.
#[cfg(not(coverage))]
fn retain_newest_probe_audio(pending_audio: &mut Vec<AudioPacket>) {
    let overflow = pending_audio
        .len()
        .saturating_sub(PROBE_BUNDLE_MAX_AUDIO_PACKETS);
    if overflow > 0 {
        pending_audio.drain(..overflow);
    }
}
/// Keeps only audio whose capture timestamp falls inside the accepted window
/// around `video_pts_us` (`BEFORE` µs back through `AFTER` µs forward), then
/// re-applies the packet-count cap.
#[cfg(not(coverage))]
fn retain_probe_audio_for_video(pending_audio: &mut Vec<AudioPacket>, video_pts_us: u64) {
    let window_start = video_pts_us.saturating_sub(PROBE_BUNDLE_AUDIO_WINDOW_BEFORE_US);
    let window_end = video_pts_us.saturating_add(PROBE_BUNDLE_AUDIO_WINDOW_AFTER_US);
    pending_audio.retain(|packet| {
        (window_start..=window_end).contains(&packet_audio_capture_pts_us(packet))
    });
    retain_newest_probe_audio(pending_audio);
}
#[cfg(not(coverage))]
fn probe_bundle_capture_bounds(video: Option<&VideoPacket>, audio: &[AudioPacket]) -> (u64, u64) {
let mut start = u64::MAX;
let mut end = 0_u64;
if let Some(video) = video {
let pts = packet_video_capture_pts_us(video);
start = start.min(pts);
end = end.max(pts);
}
for packet in audio {
let pts = packet_audio_capture_pts_us(packet);
start = start.min(pts);
end = end.max(pts);
}
if start == u64::MAX {
let now = crate::live_capture_clock::capture_pts_us();
return (now, now);
}
(start, end.max(start))
}
/// Capture timestamp of an audio packet: prefers the client-stamped value and
/// falls back to the raw PTS when the packet was never stamped (zero).
#[cfg(not(coverage))]
fn packet_audio_capture_pts_us(packet: &AudioPacket) -> u64 {
    match packet.client_capture_pts_us {
        0 => packet.pts,
        stamped => stamped,
    }
}
/// Capture timestamp of a video packet: prefers the client-stamped value and
/// falls back to the raw PTS when the packet was never stamped (zero).
#[cfg(not(coverage))]
fn packet_video_capture_pts_us(packet: &VideoPacket) -> u64 {
    match packet.client_capture_pts_us {
        0 => packet.pts,
        stamped => stamped,
    }
}
#[cfg(not(coverage))]
async fn connect(server_addr: &str) -> Result<Channel> {
crate::relay_transport::endpoint(server_addr)?

View File

@ -1,6 +1,6 @@
[package]
name = "lesavka_common"
version = "0.19.0"
version = "0.19.1"
edition = "2024"
build = "build.rs"

View File

@ -175,6 +175,15 @@ from `LESAVKA_CLIENT_PKI_SSH_SOURCE` over SSH. Runtime clients require the insta
| `LESAVKA_MIC_SOURCE` | client media capture/playback override |
| `LESAVKA_MIC_TEST_SOURCE_DESC` | client media capture/playback override |
| `LESAVKA_MOUSE_DEVICE` | input routing/clipboard override |
| `LESAVKA_OUTPUT_DELAY_APPLY` | manual direct UVC/UAC probe override; apply the measured server output-delay correction through the calibration API when the probe gates pass |
| `LESAVKA_OUTPUT_DELAY_CALIBRATION` | manual direct UVC/UAC probe override; emit `output-delay-calibration.json` from a lab-attached USB host capture so the server can save static UVC/UAC output-path defaults, defaults to enabled |
| `LESAVKA_OUTPUT_DELAY_GAIN` | manual direct UVC/UAC probe override; scales measured output-delay correction before applying, defaults to `1.0` |
| `LESAVKA_OUTPUT_DELAY_MAX_ABS_SKEW_MS` | manual direct UVC/UAC probe safety limit; refuses to apply/save implausibly large measured device skew, defaults to `5000` |
| `LESAVKA_OUTPUT_DELAY_MAX_DRIFT_MS` | manual direct UVC/UAC probe stability limit; refuses to apply/save unstable output-delay measurements, defaults to `80` |
| `LESAVKA_OUTPUT_DELAY_MAX_STEP_US` | manual direct UVC/UAC probe safety limit; clamps one measured correction step, defaults to `1500000` |
| `LESAVKA_OUTPUT_DELAY_MIN_PAIRS` | manual direct UVC/UAC probe evidence floor before applying measured output-delay calibration, defaults to `8` |
| `LESAVKA_OUTPUT_DELAY_SAVE` | manual direct UVC/UAC probe override; after applying a ready measured correction, persist it as the server default calibration |
| `LESAVKA_OUTPUT_DELAY_TARGET` | manual direct UVC/UAC probe override; choose whether measured skew is corrected by shifting `video` or `audio`, defaults to `video` |
| `LESAVKA_PASTE_DELAY_MS` | input routing/clipboard override |
| `LESAVKA_PASTE_KEY` | input routing/clipboard override |
| `LESAVKA_PASTE_KEY_FILE` | input routing/clipboard override |

View File

@ -49,6 +49,15 @@ FETCH_CAPTURE=${FETCH_CAPTURE:-0}
REMOTE_SERVER_PREFLIGHT=${REMOTE_SERVER_PREFLIGHT:-1}
REMOTE_EXPECT_CAM_OUTPUT=${REMOTE_EXPECT_CAM_OUTPUT:-uvc}
REMOTE_EXPECT_UVC_CODEC=${REMOTE_EXPECT_UVC_CODEC:-mjpeg}
LESAVKA_OUTPUT_DELAY_CALIBRATION=${LESAVKA_OUTPUT_DELAY_CALIBRATION:-1}
LESAVKA_OUTPUT_DELAY_APPLY=${LESAVKA_OUTPUT_DELAY_APPLY:-0}
LESAVKA_OUTPUT_DELAY_SAVE=${LESAVKA_OUTPUT_DELAY_SAVE:-0}
LESAVKA_OUTPUT_DELAY_TARGET=${LESAVKA_OUTPUT_DELAY_TARGET:-video}
LESAVKA_OUTPUT_DELAY_MIN_PAIRS=${LESAVKA_OUTPUT_DELAY_MIN_PAIRS:-8}
LESAVKA_OUTPUT_DELAY_MAX_ABS_SKEW_MS=${LESAVKA_OUTPUT_DELAY_MAX_ABS_SKEW_MS:-5000}
LESAVKA_OUTPUT_DELAY_MAX_DRIFT_MS=${LESAVKA_OUTPUT_DELAY_MAX_DRIFT_MS:-80}
LESAVKA_OUTPUT_DELAY_GAIN=${LESAVKA_OUTPUT_DELAY_GAIN:-1.0}
LESAVKA_OUTPUT_DELAY_MAX_STEP_US=${LESAVKA_OUTPUT_DELAY_MAX_STEP_US:-1500000}
CAPTURE_READY_MARKER="__LESAVKA_CAPTURE_READY__"
STAMP="$(date +%Y%m%d-%H%M%S)"
@ -58,6 +67,8 @@ LOCAL_CAPTURE="${LOCAL_REPORT_DIR}/capture.mkv"
LOCAL_ANALYSIS_JSON="${LOCAL_REPORT_DIR}/report.json"
LOCAL_REPORT_TXT="${LOCAL_REPORT_DIR}/report.txt"
LOCAL_EVENTS_CSV="${LOCAL_REPORT_DIR}/events.csv"
LOCAL_OUTPUT_DELAY_JSON="${LOCAL_REPORT_DIR}/output-delay-calibration.json"
LOCAL_OUTPUT_DELAY_ENV="${LOCAL_REPORT_DIR}/output-delay-calibration.env"
LOCAL_CAPTURE_LOG="${LOCAL_REPORT_DIR}/capture.log"
mkdir -p "${LOCAL_REPORT_DIR}"
RESOLVED_LESAVKA_SERVER_ADDR=""
@ -236,6 +247,216 @@ print_lesavka_versions() {
done <<<"${version_output}"
}
# Derive a UVC/UAC output-delay calibration artifact from the sync-probe
# analysis report. Emits output-delay-calibration.json (machine-readable
# artifact) and output-delay-calibration.env (shell-sourceable decision) into
# the local report directory. No-op when calibration is disabled or when no
# analysis report exists yet.
write_output_delay_calibration() {
    [[ "${LESAVKA_OUTPUT_DELAY_CALIBRATION}" != "0" ]] || return 0
    [[ -f "${LOCAL_ANALYSIS_JSON}" ]] || return 0
    echo "==> deriving UVC/UAC output-delay calibration"
    # Quoted 'PY' heredoc: nothing inside the Python source is shell-expanded;
    # every knob travels through argv in the order unpacked below.
    python3 - <<'PY' \
        "${LOCAL_ANALYSIS_JSON}" \
        "${LOCAL_OUTPUT_DELAY_JSON}" \
        "${LOCAL_OUTPUT_DELAY_ENV}" \
        "${LESAVKA_OUTPUT_DELAY_TARGET}" \
        "${LESAVKA_OUTPUT_DELAY_MIN_PAIRS}" \
        "${LESAVKA_OUTPUT_DELAY_MAX_ABS_SKEW_MS}" \
        "${LESAVKA_OUTPUT_DELAY_MAX_DRIFT_MS}" \
        "${LESAVKA_OUTPUT_DELAY_GAIN}" \
        "${LESAVKA_OUTPUT_DELAY_MAX_STEP_US}" \
        "${LESAVKA_OUTPUT_DELAY_APPLY}" \
        "${LESAVKA_OUTPUT_DELAY_SAVE}"
import json
import math
import pathlib
import shlex
import sys

# Positional arguments mirror the shell-side LESAVKA_OUTPUT_DELAY_* knobs.
(
    report_path,
    output_json_path,
    output_env_path,
    target,
    min_pairs_raw,
    max_abs_skew_raw,
    max_drift_raw,
    gain_raw,
    max_step_raw,
    apply_raw,
    save_raw,
) = sys.argv[1:]


def as_int(value, default):
    # Lenient integer parse; any malformed input falls back to the default.
    try:
        return int(str(value).strip())
    except Exception:
        return default


def as_float(value, default):
    # Lenient float parse; NaN/inf are rejected so limits stay meaningful.
    try:
        result = float(str(value).strip())
    except Exception:
        return default
    return result if math.isfinite(result) else default


def as_bool(value):
    # Shell-style truthiness: empty/0/false/no/off disable, anything else enables.
    return str(value).strip().lower() not in {"", "0", "false", "no", "off"}


def env_line(key, value):
    # One shell-sourceable KEY=VALUE line with safe quoting.
    return f"{key}={shlex.quote(str(value))}\n"


report = json.loads(pathlib.Path(report_path).read_text())
verdict = report.get("verdict") or {}
target = target.strip().lower()
# Clamp operator-provided limits into sane ranges before gating.
min_pairs = max(1, as_int(min_pairs_raw, 8))
max_abs_skew_ms = max(1.0, as_float(max_abs_skew_raw, 5000.0))
max_drift_ms = max(0.0, as_float(max_drift_raw, 80.0))
gain = min(max(as_float(gain_raw, 1.0), 0.01), 1.0)
max_step_us = max(1, as_int(max_step_raw, 1_500_000))
paired = as_int(report.get("paired_event_count"), 0)
median_skew_ms = as_float(report.get("median_skew_ms"), 0.0)
# Prefer the verdict's p95 figure; fall back to the report-level maximum.
p95_abs_skew_ms = as_float(
    verdict.get("p95_abs_skew_ms"),
    as_float(report.get("max_abs_skew_ms"), 0.0),
)
max_abs_observed_ms = as_float(report.get("max_abs_skew_ms"), p95_abs_skew_ms)
drift_ms = as_float(report.get("drift_ms"), 0.0)
# Median skew (ms) -> correction step (us), scaled by gain, then bounded to
# one maximum step so a single probe run cannot apply a huge jump.
raw_device_delta_us = int(round(median_skew_ms * 1000.0))
scaled_delta_us = int(round(raw_device_delta_us * gain))
bounded_delta_us = max(-max_step_us, min(max_step_us, scaled_delta_us))
audio_delta_us = 0
video_delta_us = 0
refusal_reasons = []
if target == "video":
    video_delta_us = bounded_delta_us
elif target == "audio":
    # Shifting audio compensates in the opposite direction of shifting video.
    audio_delta_us = -bounded_delta_us
else:
    refusal_reasons.append(f"unsupported target {target!r}; use video or audio")
# Gate checks: refuse thin evidence, implausibly large skew, or unstable drift.
if paired < min_pairs:
    refusal_reasons.append(f"paired_event_count {paired} < {min_pairs}")
if max_abs_observed_ms > max_abs_skew_ms:
    refusal_reasons.append(
        f"max_abs_skew_ms {max_abs_observed_ms:.1f} > {max_abs_skew_ms:.1f}"
    )
if abs(drift_ms) > max_drift_ms:
    refusal_reasons.append(f"abs(drift_ms) {abs(drift_ms):.1f} > {max_drift_ms:.1f}")
ready = not refusal_reasons
decision = "ready" if ready else "refused"
note = (
    "direct UVC/UAC output-delay calibration: "
    f"median device skew {median_skew_ms:+.1f}ms, target={target}, "
    f"audio {audio_delta_us:+d}us/video {video_delta_us:+d}us"
)
if not ready:
    note = f"direct UVC/UAC output-delay calibration refused: {'; '.join(refusal_reasons)}"
artifact = {
    "schema": "lesavka.output-delay-calibration.v1",
    "source": "direct-uvc-uac-output-probe",
    "scope": "server-output-static-baseline",
    "applies_to": "server UVC/UAC gadget output path",
    "measurement_host_role": "lab-attached USB host",
    "report_json": report_path,
    "audio_after_video_positive": True,
    "target": target,
    "ready": ready,
    "decision": decision,
    "apply_enabled": as_bool(apply_raw),
    "save_enabled": as_bool(save_raw),
    "paired_event_count": paired,
    "min_pairs": min_pairs,
    "measured_device_skew_ms": median_skew_ms,
    "p95_abs_skew_ms": p95_abs_skew_ms,
    "max_abs_skew_ms": max_abs_observed_ms,
    "max_abs_skew_limit_ms": max_abs_skew_ms,
    "drift_ms": drift_ms,
    "max_drift_ms": max_drift_ms,
    "gain": gain,
    "max_step_us": max_step_us,
    "raw_device_delta_us": raw_device_delta_us,
    "bounded_device_delta_us": bounded_delta_us,
    "audio_offset_adjust_us": audio_delta_us,
    "video_offset_adjust_us": video_delta_us,
    "refusal_reasons": refusal_reasons,
    "note": note,
}
pathlib.Path(output_json_path).write_text(json.dumps(artifact, indent=2, sort_keys=True) + "\n")
# Companion env file lets the shell side read the decision without a JSON parser.
env_values = {
    "output_delay_ready": str(ready).lower(),
    "output_delay_decision": decision,
    "output_delay_target": target,
    "output_delay_audio_delta_us": audio_delta_us,
    "output_delay_video_delta_us": video_delta_us,
    "output_delay_measured_skew_ms": f"{median_skew_ms:.3f}",
    "output_delay_paired_event_count": paired,
    "output_delay_drift_ms": f"{drift_ms:.3f}",
    "output_delay_note": note,
}
with pathlib.Path(output_env_path).open("w") as handle:
    for key, value in env_values.items():
        handle.write(env_line(key, value))
PY
}
# Print the derived output-delay calibration decision and, when the gates
# passed and apply is enabled, push the measured correction to the server via
# lesavka-relayctl; optionally persist it as the server default afterwards.
maybe_apply_output_delay_calibration() {
    [[ "${LESAVKA_OUTPUT_DELAY_CALIBRATION}" != "0" ]] || return 0
    [[ -f "${LOCAL_OUTPUT_DELAY_ENV}" ]] || return 0
    # shellcheck disable=SC1090
    source "${LOCAL_OUTPUT_DELAY_ENV}"
    echo "==> UVC/UAC output-delay calibration decision"
    echo " ↪ output_delay_calibration_json=${LOCAL_OUTPUT_DELAY_JSON}"
    echo " ↪ output_delay_ready=${output_delay_ready:-false}"
    echo " ↪ output_delay_decision=${output_delay_decision:-unknown}"
    echo " ↪ output_delay_target=${output_delay_target:-unknown}"
    echo " ↪ output_delay_paired_event_count=${output_delay_paired_event_count:-0}"
    echo " ↪ output_delay_measured_skew_ms=${output_delay_measured_skew_ms:-0.0}"
    echo " ↪ output_delay_drift_ms=${output_delay_drift_ms:-0.0}"
    echo " ↪ output_delay_audio_delta_us=${output_delay_audio_delta_us:-0}"
    echo " ↪ output_delay_video_delta_us=${output_delay_video_delta_us:-0}"
    echo " ↪ output_delay_note=${output_delay_note:-}"
    # A refused measurement is informational only; never treat it as an error.
    if [[ "${output_delay_ready:-false}" != "true" ]]; then
        echo " ↪ output delay calibration apply refused: ${output_delay_note:-not ready}"
        return 0
    fi
    if [[ "${LESAVKA_OUTPUT_DELAY_APPLY}" == "0" ]]; then
        echo " ↪ output delay calibration apply disabled"
        return 0
    fi
    echo "==> applying measured UVC/UAC output-delay calibration"
    LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \
        "${REPO_ROOT}/target/debug/lesavka-relayctl" \
        --server "${RESOLVED_LESAVKA_SERVER_ADDR}" \
        calibrate \
        "${output_delay_audio_delta_us:-0}" \
        "${output_delay_video_delta_us:-0}" \
        "${output_delay_note:-direct UVC/UAC output-delay calibration}"
    # Saving promotes the just-applied correction to the server's default.
    if [[ "${LESAVKA_OUTPUT_DELAY_SAVE}" != "0" ]]; then
        echo "==> saving measured UVC/UAC output-delay calibration as default"
        LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \
            "${REPO_ROOT}/target/debug/lesavka-relayctl" \
            --server "${RESOLVED_LESAVKA_SERVER_ADDR}" \
            calibration-save-default
    fi
}
if [[ "${LOCAL_AUDIO_SANITY}" != "0" ]]; then
echo "==> verifying local speaker-to-mic sanity before upstream sync run"
"${SCRIPT_DIR}/run_local_audio_sanity.sh"
@ -976,6 +1197,9 @@ else
)
fi
write_output_delay_calibration
maybe_apply_output_delay_calibration
if [[ "${capture_v4l2_fault}" -eq 1 ]]; then
echo "warning: Tethys video capture reported VIDIOC_QBUF / Bad file descriptor; treat unstable skew or analyzer failures as host-capture suspect" >&2
fi
@ -994,6 +1218,12 @@ fi
if [[ -f "${LOCAL_EVENTS_CSV}" ]]; then
echo "events_csv: ${LOCAL_EVENTS_CSV}"
fi
if [[ -f "${LOCAL_OUTPUT_DELAY_JSON}" ]]; then
echo "output_delay_calibration_json: ${LOCAL_OUTPUT_DELAY_JSON}"
fi
if [[ -f "${LOCAL_OUTPUT_DELAY_ENV}" ]]; then
echo "output_delay_calibration_env: ${LOCAL_OUTPUT_DELAY_ENV}"
fi
if [[ -f "${LOCAL_CAPTURE_LOG}" ]]; then
echo "capture_log: ${LOCAL_CAPTURE_LOG}"
fi

View File

@ -10,7 +10,7 @@ bench = false
[package]
name = "lesavka_server"
version = "0.19.0"
version = "0.19.1"
edition = "2024"
autobins = false

View File

@ -11,6 +11,7 @@ const BROWSER_SYNC_SCRIPT: &str =
const MIRRORED_SYNC_SCRIPT: &str =
include_str!("../../scripts/manual/run_upstream_mirrored_av_sync.sh");
const LOCAL_STIMULUS: &str = include_str!("../../scripts/manual/local_av_stimulus.py");
const SYNC_PROBE_RUNNER: &str = include_str!("../../client/src/sync_probe/runner.rs");
#[test]
fn upstream_sync_script_tunnels_auto_server_addr_through_ssh() {
@ -31,6 +32,27 @@ fn upstream_sync_script_tunnels_auto_server_addr_through_ssh() {
"LOCAL_REPORT_DIR=\"${LOCAL_OUTPUT_DIR%/}/lesavka-sync-probe-${STAMP}\"",
"LOCAL_ANALYSIS_JSON=\"${LOCAL_REPORT_DIR}/report.json\"",
"LOCAL_EVENTS_CSV=\"${LOCAL_REPORT_DIR}/events.csv\"",
"LOCAL_OUTPUT_DELAY_JSON=\"${LOCAL_REPORT_DIR}/output-delay-calibration.json\"",
"LOCAL_OUTPUT_DELAY_ENV=\"${LOCAL_REPORT_DIR}/output-delay-calibration.env\"",
"LESAVKA_OUTPUT_DELAY_CALIBRATION=${LESAVKA_OUTPUT_DELAY_CALIBRATION:-1}",
"LESAVKA_OUTPUT_DELAY_APPLY=${LESAVKA_OUTPUT_DELAY_APPLY:-0}",
"LESAVKA_OUTPUT_DELAY_SAVE=${LESAVKA_OUTPUT_DELAY_SAVE:-0}",
"LESAVKA_OUTPUT_DELAY_TARGET=${LESAVKA_OUTPUT_DELAY_TARGET:-video}",
"LESAVKA_OUTPUT_DELAY_MIN_PAIRS=${LESAVKA_OUTPUT_DELAY_MIN_PAIRS:-8}",
"LESAVKA_OUTPUT_DELAY_MAX_ABS_SKEW_MS=${LESAVKA_OUTPUT_DELAY_MAX_ABS_SKEW_MS:-5000}",
"LESAVKA_OUTPUT_DELAY_MAX_DRIFT_MS=${LESAVKA_OUTPUT_DELAY_MAX_DRIFT_MS:-80}",
"LESAVKA_OUTPUT_DELAY_MAX_STEP_US=${LESAVKA_OUTPUT_DELAY_MAX_STEP_US:-1500000}",
"write_output_delay_calibration",
"maybe_apply_output_delay_calibration",
"schema\": \"lesavka.output-delay-calibration.v1\"",
"source\": \"direct-uvc-uac-output-probe\"",
"scope\": \"server-output-static-baseline\"",
"applies_to\": \"server UVC/UAC gadget output path\"",
"measurement_host_role\": \"lab-attached USB host\"",
"audio_after_video_positive",
"output_delay_calibration_json",
"direct UVC/UAC output-delay calibration",
"calibration-save-default",
"LEAD_IN_SECONDS=${LEAD_IN_SECONDS:-0}",
"PROBE_TIMEOUT_SECONDS=${PROBE_TIMEOUT_SECONDS:-$((PROBE_DURATION_SECONDS + PROBE_WARMUP_SECONDS + 20))}",
"timeout --signal=INT \"${PROBE_TIMEOUT_SECONDS}\" \"${PROBE_BIN}\"",
@ -104,6 +126,33 @@ fn browser_sync_script_can_delegate_to_a_real_path_driver() {
);
}
/// The bundled-path migration is enforced textually against the probe runner
/// source: it must reference the bundled webcam-media RPC surface and must
/// not call the legacy split camera/microphone streaming RPCs.
#[test]
fn sync_probe_runner_uses_bundled_webcam_media_path() {
    let required_markers = [
        "bundled_webcam_media",
        "refusing to measure split upstream",
        "UpstreamMediaBundle",
        "stream_webcam_media",
        "PROBE_BUNDLE_SESSION_ID",
        "PROBE_BUNDLE_AUDIO_GRACE",
        "server does not advertise bundled webcam media",
    ];
    for expected in required_markers {
        assert!(
            SYNC_PROBE_RUNNER.contains(expected),
            "sync probe runner should contain {expected}"
        );
    }
    let legacy_rpcs = [
        ".stream_camera(Request::new(outbound))",
        ".stream_microphone(Request::new(outbound))",
    ];
    for forbidden in legacy_rpcs {
        assert!(
            !SYNC_PROBE_RUNNER.contains(forbidden),
            "sync probe runner must not use old split upstream RPC {forbidden}"
        );
    }
}
#[test]
fn mirrored_sync_script_uses_real_client_capture_path() {
for expected in [