#!/usr/bin/env bash
# scripts/manual/run_upstream_av_sync.sh
# Manual: upstream A/V sync hardware probe; not part of CI.
#
# Manual: capture the real Tethys UVC/UAC endpoints while the Lesavka server
# generates paired probe media locally and feeds its own UVC/UAC output sinks.
# This intentionally measures only the server-to-host output-device skew.
#
# Every setting below is an environment override: a value that is already set
# and non-empty wins; otherwise the default on the right-hand side is used.

set -euo pipefail

SCRIPT_DIR="$(cd -- "$(dirname "${BASH_SOURCE[0]}")" >/dev/null 2>&1 && pwd)"
REPO_ROOT="$(cd -- "${SCRIPT_DIR}/../.." >/dev/null 2>&1 && pwd)"

# --- Hosts and server endpoint ------------------------------------------------
TETHYS_HOST=${TETHYS_HOST:-tethys}
LESAVKA_SERVER_HOST=${LESAVKA_SERVER_HOST:-theia}
LESAVKA_SERVER_CONNECT_HOST=${LESAVKA_SERVER_CONNECT_HOST:-38.28.125.112}
# "auto" makes the script read the bind port from the server and tunnel to it.
LESAVKA_SERVER_ADDR=${LESAVKA_SERVER_ADDR:-auto}
LESAVKA_SERVER_SCHEME=${LESAVKA_SERVER_SCHEME:-https}
LESAVKA_TLS_DOMAIN=${LESAVKA_TLS_DOMAIN:-lesavka-server}

# --- Probe timing and pulse pattern -------------------------------------------
PROBE_DURATION_SECONDS=${PROBE_DURATION_SECONDS:-20}
PROBE_WARMUP_SECONDS=${PROBE_WARMUP_SECONDS:-4}
# Default timeout: probe duration + warmup + 20 seconds of slack.
PROBE_TIMEOUT_SECONDS=${PROBE_TIMEOUT_SECONDS:-$((PROBE_DURATION_SECONDS + PROBE_WARMUP_SECONDS + 20))}
PROBE_PULSE_PERIOD_MS=${PROBE_PULSE_PERIOD_MS:-1000}
PROBE_PULSE_WIDTH_MS=${PROBE_PULSE_WIDTH_MS:-120}
PROBE_EVENT_WIDTH_CODES=${PROBE_EVENT_WIDTH_CODES:-1,2,1,3,2,4,1,1,3,1,4,2,1,2,3,4,1,3,2,2,4,1,2,4,3,1,1,4,2,3,1,2}
LESAVKA_OUTPUT_DELAY_PROBE_AUDIO_DELAY_US=${LESAVKA_OUTPUT_DELAY_PROBE_AUDIO_DELAY_US:-0}
LESAVKA_OUTPUT_DELAY_PROBE_VIDEO_DELAY_US=${LESAVKA_OUTPUT_DELAY_PROBE_VIDEO_DELAY_US:-0}

# --- Capture window -----------------------------------------------------------
# Do not open the UVC host capture far ahead of the probe. The gadget side only
# has frames once the sync probe is feeding the server, and some hosts time out
# VIDIOC_STREAMON if the camera is starved during pre-roll.
LEAD_IN_SECONDS=${LEAD_IN_SECONDS:-0}
TAIL_SECONDS=${TAIL_SECONDS:-2}
# Default capture length: probe duration + warmup + lead-in + tail.
CAPTURE_SECONDS=${CAPTURE_SECONDS:-$((PROBE_DURATION_SECONDS + PROBE_WARMUP_SECONDS + LEAD_IN_SECONDS + TAIL_SECONDS))}
LOCAL_OUTPUT_DIR=${LOCAL_OUTPUT_DIR:-/tmp}

# --- Capture-host video/audio selection ---------------------------------------
REMOTE_VIDEO_DEVICE=${REMOTE_VIDEO_DEVICE:-auto}
VIDEO_SIZE=${VIDEO_SIZE:-auto}
VIDEO_FPS=${VIDEO_FPS:-auto}
VIDEO_FORMAT=${VIDEO_FORMAT:-mjpeg}
REMOTE_CAPTURE_STACK=${REMOTE_CAPTURE_STACK:-pulse}
REMOTE_PULSE_CAPTURE_TOOL=${REMOTE_PULSE_CAPTURE_TOOL:-gst}
REMOTE_PULSE_VIDEO_MODE=${REMOTE_PULSE_VIDEO_MODE:-copy}
REMOTE_AUDIO_SOURCE=${REMOTE_AUDIO_SOURCE:-auto}
REMOTE_AUDIO_QUIESCE_USER_AUDIO=${REMOTE_AUDIO_QUIESCE_USER_AUDIO:-auto}

# --- Analysis, transport and preflight ----------------------------------------
ANALYSIS_NORMALIZE=${ANALYSIS_NORMALIZE:-0}
ANALYSIS_SCALE_WIDTH=${ANALYSIS_SCALE_WIDTH:-1280}
# Whitespace-separated ssh options shared by every ssh invocation.
SSH_OPTS=${SSH_OPTS:-"-o BatchMode=yes -o ConnectTimeout=30"}
PROBE_PREBUILD=${PROBE_PREBUILD:-1}
ANALYZE_BIN=${ANALYZE_BIN:-"${REPO_ROOT}/target/debug/lesavka-sync-analyze"}
REMOTE_ANALYZE=${REMOTE_ANALYZE:-1}
REMOTE_ANALYZE_BIN=${REMOTE_ANALYZE_BIN:-/tmp/lesavka-sync-analyze}
REMOTE_ANALYZE_COPY=${REMOTE_ANALYZE_COPY:-1}
FETCH_CAPTURE=${FETCH_CAPTURE:-1}
REMOTE_SERVER_PREFLIGHT=${REMOTE_SERVER_PREFLIGHT:-1}
REMOTE_EXPECT_CAM_OUTPUT=${REMOTE_EXPECT_CAM_OUTPUT:-uvc}
REMOTE_EXPECT_UVC_CODEC=${REMOTE_EXPECT_UVC_CODEC:-mjpeg}

# --- Output-delay calibration policy ------------------------------------------
LESAVKA_OUTPUT_DELAY_CALIBRATION=${LESAVKA_OUTPUT_DELAY_CALIBRATION:-1}
LESAVKA_OUTPUT_DELAY_APPLY=${LESAVKA_OUTPUT_DELAY_APPLY:-0}
LESAVKA_OUTPUT_DELAY_APPLY_MODE=${LESAVKA_OUTPUT_DELAY_APPLY_MODE:-absolute}
LESAVKA_OUTPUT_DELAY_SAVE=${LESAVKA_OUTPUT_DELAY_SAVE:-0}
LESAVKA_OUTPUT_DELAY_TARGET=${LESAVKA_OUTPUT_DELAY_TARGET:-video}
LESAVKA_OUTPUT_DELAY_MIN_PAIRS=${LESAVKA_OUTPUT_DELAY_MIN_PAIRS:-8}
LESAVKA_OUTPUT_DELAY_MAX_ABS_SKEW_MS=${LESAVKA_OUTPUT_DELAY_MAX_ABS_SKEW_MS:-5000}
LESAVKA_OUTPUT_DELAY_MAX_DRIFT_MS=${LESAVKA_OUTPUT_DELAY_MAX_DRIFT_MS:-80}
LESAVKA_OUTPUT_DELAY_GAIN=${LESAVKA_OUTPUT_DELAY_GAIN:-1.0}
LESAVKA_OUTPUT_DELAY_MAX_STEP_US=${LESAVKA_OUTPUT_DELAY_MAX_STEP_US:-1500000}
CAPTURE_READY_MARKER="__LESAVKA_CAPTURE_READY__"
STAMP="$(date +%Y%m%d-%H%M%S)"
REMOTE_CAPTURE=${REMOTE_CAPTURE:-"/tmp/lesavka-output-delay-probe-${STAMP}.mkv"}

# All local artifacts of one run live in a single timestamped directory.
LOCAL_REPORT_DIR="${LOCAL_OUTPUT_DIR%/}/lesavka-output-delay-probe-${STAMP}"
LOCAL_CAPTURE="${LOCAL_REPORT_DIR}/capture.mkv"
LOCAL_ANALYSIS_JSON="${LOCAL_REPORT_DIR}/report.json"
LOCAL_REPORT_TXT="${LOCAL_REPORT_DIR}/report.txt"
LOCAL_EVENTS_CSV="${LOCAL_REPORT_DIR}/events.csv"
LOCAL_SERVER_PROBE_REPLY="${LOCAL_REPORT_DIR}/server-output-probe-reply.txt"
LOCAL_SERVER_TIMELINE_JSON="${LOCAL_REPORT_DIR}/server-output-timeline.json"
LOCAL_OUTPUT_DELAY_CORRELATION_JSON="${LOCAL_REPORT_DIR}/output-delay-correlation.json"
LOCAL_OUTPUT_DELAY_CORRELATION_CSV="${LOCAL_REPORT_DIR}/output-delay-correlation.csv"
LOCAL_OUTPUT_DELAY_CORRELATION_TXT="${LOCAL_REPORT_DIR}/output-delay-correlation.txt"
LOCAL_OUTPUT_DELAY_JSON="${LOCAL_REPORT_DIR}/output-delay-calibration.json"
LOCAL_OUTPUT_DELAY_ENV="${LOCAL_REPORT_DIR}/output-delay-calibration.env"
LOCAL_CAPTURE_LOG="${LOCAL_REPORT_DIR}/capture.log"
mkdir -p "${LOCAL_REPORT_DIR}"

# State filled in by resolve_server_addr / start_server_tunnel.
RESOLVED_LESAVKA_SERVER_ADDR=""
SERVER_TUNNEL_PID=""
SERVER_TUNNEL_REMOTE_PORT=""
SERVER_TUNNEL_LOCAL_PORT=""

# SSH_OPTS is a whitespace-separated option string. Split it exactly once into
# an array so every ssh call can expand it quoted: same word splitting as the
# former unquoted ${SSH_OPTS}, but without pathname expansion. `read -d ''`
# reports EOF with a non-zero status after filling the array, hence `|| true`.
SSH_OPT_ARGS=()
IFS=$' \t\n' read -r -d '' -a SSH_OPT_ARGS <<<"${SSH_OPTS}" || true

#######################################
# Render arguments as one shell-quoted string for use inside an ssh command.
# ssh joins its command words with spaces and the remote shell parses the
# result again, so unquoted values containing whitespace or metacharacters
# would be re-split (or executed) remotely. Values are therefore delivered
# literally; remote-side expansions such as ~ or $HOME are not interpreted.
# Arguments: values to quote
# Outputs:   the quoted, space-joined values on stdout (nothing for no args)
#######################################
quote_remote_args() {
  (( $# > 0 )) || return 0
  local joined
  printf -v joined '%q ' "$@"
  printf '%s' "${joined% }"
}

#######################################
# Interpret an on/off setting the same way the calibration artifact does
# (as_bool in write_output_delay_calibration): empty, 0, false, no and off
# (any letter case, surrounding whitespace ignored) mean disabled.
# Arguments: $1 - raw setting value
# Returns:   0 when enabled, 1 when disabled
#######################################
output_delay_flag_enabled() {
  local value=${1:-}
  # Trim leading, then trailing, whitespace.
  value="${value#"${value%%[![:space:]]*}"}"
  value="${value%"${value##*[![:space:]]}"}"
  value="$(printf '%s' "${value}" | tr '[:upper:]' '[:lower:]')"
  case "${value}" in
    "" | 0 | false | no | off) return 1 ;;
    *) return 0 ;;
  esac
}

#######################################
# EXIT trap: stop the background SSH tunnel if one was started and is alive.
# Globals: SERVER_TUNNEL_PID (read)
#######################################
cleanup_server_tunnel() {
  [[ -n "${SERVER_TUNNEL_PID}" ]] || return 0
  if kill -0 "${SERVER_TUNNEL_PID}" >/dev/null 2>&1; then
    kill "${SERVER_TUNNEL_PID}" >/dev/null 2>&1 || true
    wait "${SERVER_TUNNEL_PID}" >/dev/null 2>&1 || true
  fi
}
trap cleanup_server_tunnel EXIT

#######################################
# Print a currently free loopback TCP port. The port is released again before
# ssh binds it; start_server_tunnel passes ExitOnForwardFailure=yes so a lost
# race makes ssh exit instead of running without the forward.
# Outputs: port number on stdout
#######################################
pick_local_server_tunnel_port() {
  python3 - <<'PY'
import socket

with socket.socket() as sock:
    sock.bind(("127.0.0.1", 0))
    print(sock.getsockname()[1])
PY
}

#######################################
# Test whether something accepts TCP connections on 127.0.0.1:<port>.
# Uses nc when installed, otherwise bash's /dev/tcp redirection, so a missing
# nc is not misreported as a tunnel that never came up.
# Arguments: $1 - local port
# Returns:   0 when the port accepts a connection
#######################################
local_port_is_open() {
  local port=$1
  if command -v nc >/dev/null 2>&1; then
    nc -z 127.0.0.1 "${port}" >/dev/null 2>&1
  else
    # The subshell closes the probe connection as soon as it exits.
    (exec 3<>"/dev/tcp/127.0.0.1/${port}") >/dev/null 2>&1
  fi
}

#######################################
# Poll (50 tries, 0.1 s apart) until the tunnel's local port accepts
# connections.
# Globals:   SERVER_TUNNEL_PID, LESAVKA_SERVER_HOST (read)
# Arguments: $1 - local port of the forward
# Exits:     88 if ssh died before the port opened, 89 on timeout
#######################################
wait_for_server_tunnel() {
  local local_port=$1
  local tries=50
  local attempt
  for ((attempt = 0; attempt < tries; attempt++)); do
    if local_port_is_open "${local_port}"; then
      return 0
    fi
    if ! kill -0 "${SERVER_TUNNEL_PID}" >/dev/null 2>&1; then
      wait "${SERVER_TUNNEL_PID}" >/dev/null 2>&1 || true
      echo "SSH tunnel to ${LESAVKA_SERVER_HOST} exited before becoming ready" >&2
      exit 88
    fi
    sleep 0.1
  done
  echo "SSH tunnel to ${LESAVKA_SERVER_HOST} did not become ready on localhost:${local_port}" >&2
  exit 89
}

#######################################
# Open a background SSH port forward from a free local port to
# 127.0.0.1:<remote_port> on the server host and wait until it is usable.
# ssh is started directly (not through a wrapper function) so that $! is the
# ssh process itself and cleanup_server_tunnel can signal it.
# Globals:   SERVER_TUNNEL_PID, SERVER_TUNNEL_REMOTE_PORT,
#            SERVER_TUNNEL_LOCAL_PORT (written)
# Arguments: $1 - remote port on the server host
#######################################
start_server_tunnel() {
  local remote_port=$1
  local local_port
  local_port="$(pick_local_server_tunnel_port)"
  echo "==> opening SSH tunnel to ${LESAVKA_SERVER_HOST}:127.0.0.1:${remote_port} on localhost:${local_port}"
  ssh ${SSH_OPT_ARGS[@]+"${SSH_OPT_ARGS[@]}"} -o ExitOnForwardFailure=yes \
    -N \
    -L "127.0.0.1:${local_port}:127.0.0.1:${remote_port}" \
    "${LESAVKA_SERVER_HOST}" &
  SERVER_TUNNEL_PID=$!
  SERVER_TUNNEL_REMOTE_PORT="${remote_port}"
  SERVER_TUNNEL_LOCAL_PORT="${local_port}"
  wait_for_server_tunnel "${local_port}"
}

#######################################
# Decide which server address the relay tools should use.
# An explicit LESAVKA_SERVER_ADDR is used verbatim. With "auto" the bind port
# is read from /etc/lesavka/server.env on the server host (last
# LESAVKA_SERVER_BIND_ADDR entry wins), a tunnel to that port is opened, and
# the address points at the tunnel's local end. Port 50051 is assumed when no
# numeric port can be read.
# Globals: RESOLVED_LESAVKA_SERVER_ADDR (written)
#######################################
resolve_server_addr() {
  if [[ "${LESAVKA_SERVER_ADDR}" != "auto" ]]; then
    RESOLVED_LESAVKA_SERVER_ADDR="${LESAVKA_SERVER_ADDR}"
    return 0
  fi

  local bind_addr port
  # Best effort: a failed lookup leaves bind_addr empty and selects the default.
  bind_addr="$(
    ssh ${SSH_OPT_ARGS[@]+"${SSH_OPT_ARGS[@]}"} "${LESAVKA_SERVER_HOST}" \
      "grep -E '^LESAVKA_SERVER_BIND_ADDR=' /etc/lesavka/server.env 2>/dev/null | tail -n1 | cut -d= -f2-" \
      2>/dev/null || true
  )"

  # Accept "host:port", "[v6]:port" or a bare port, tolerating a trailing
  # quote or whitespace so a quoted env value does not silently select 50051.
  local port_pattern="(^|:)([0-9]+)[\"'[:space:]]*\$"
  if [[ "${bind_addr}" =~ ${port_pattern} ]]; then
    port="${BASH_REMATCH[2]}"
  else
    port="50051"
    echo "==> no numeric LESAVKA_SERVER_BIND_ADDR port found on ${LESAVKA_SERVER_HOST}; assuming ${port}" >&2
  fi

  start_server_tunnel "${port}"
  RESOLVED_LESAVKA_SERVER_ADDR="${LESAVKA_SERVER_SCHEME}://127.0.0.1:${SERVER_TUNNEL_LOCAL_PORT}"
}

#######################################
# Verify on the server host that the configured camera output and UVC codec
# match expectations and that the lesavka services are active.
# Skipped when REMOTE_SERVER_PREFLIGHT is "0".
# The remote script exits 64/65/66 for a CAM_OUTPUT / server.env codec /
# uvc.env codec mismatch respectively.
#######################################
preflight_server_path() {
  [[ "${REMOTE_SERVER_PREFLIGHT}" != "0" ]] || return 0
  echo "==> verifying Lesavka server path on ${LESAVKA_SERVER_HOST}"
  ssh ${SSH_OPT_ARGS[@]+"${SSH_OPT_ARGS[@]}"} "${LESAVKA_SERVER_HOST}" \
    "bash -s -- $(quote_remote_args "${REMOTE_EXPECT_CAM_OUTPUT}" "${REMOTE_EXPECT_UVC_CODEC}")" <<'REMOTE_PREFLIGHT'
set -euo pipefail
expect_cam_output=$1
expect_uvc_codec=$2

# Print the value of the last KEY= line in FILE (empty when absent).
read_env_value() {
  local key=$1
  local file=$2
  local value=""
  value=$(grep -E "^${key}=" "$file" 2>/dev/null | tail -n1 | cut -d= -f2- || true)
  printf '%s\n' "$value"
}

cam_output=$(read_env_value "LESAVKA_CAM_OUTPUT" /etc/lesavka/server.env)
server_uvc_codec=$(read_env_value "LESAVKA_UVC_CODEC" /etc/lesavka/server.env)
runtime_uvc_codec=$(read_env_value "LESAVKA_UVC_CODEC" /etc/lesavka/uvc.env)
printf ' ↪ server.env CAM_OUTPUT=%s\n' "${cam_output:-}"
printf ' ↪ server.env UVC_CODEC=%s\n' "${server_uvc_codec:-}"
printf ' ↪ uvc.env UVC_CODEC=%s\n' "${runtime_uvc_codec:-}"

# An empty expectation disables the corresponding check.
if [[ -n "${expect_cam_output}" && "${cam_output}" != "${expect_cam_output}" ]]; then
  printf 'expected CAM_OUTPUT=%s but found %s\n' "${expect_cam_output}" "${cam_output:-}" >&2
  exit 64
fi
if [[ -n "${expect_uvc_codec}" && "${server_uvc_codec}" != "${expect_uvc_codec}" ]]; then
  printf 'expected server.env UVC_CODEC=%s but found %s\n' "${expect_uvc_codec}" "${server_uvc_codec:-}" >&2
  exit 65
fi
if [[ -n "${expect_uvc_codec}" && "${runtime_uvc_codec}" != "${expect_uvc_codec}" ]]; then
  printf 'expected uvc.env UVC_CODEC=%s but found %s\n' "${expect_uvc_codec}" "${runtime_uvc_codec:-}" >&2
  exit 66
fi

systemctl is-active lesavka-server lesavka-uvc lesavka-core >/dev/null
REMOTE_PREFLIGHT
}

#######################################
# Query client and server versions through lesavka-relayctl and print them.
# Builds lesavka-relayctl first when the debug binary is missing.
# Refuses (returns 1) when the query fails, when any of client_version,
# client_revision, server_version or server_revision is missing, or when a
# combined client_full_version field is reported.
# Globals: RESOLVED_LESAVKA_SERVER_ADDR, LESAVKA_TLS_DOMAIN, REPO_ROOT (read)
#######################################
print_lesavka_versions() {
  echo "==> Lesavka versions under test"
  local relayctl="${REPO_ROOT}/target/debug/lesavka-relayctl"
  if [[ ! -x "${relayctl}" ]]; then
    (cd "${REPO_ROOT}" && cargo build -p lesavka_client --bin lesavka-relayctl >/dev/null)
  fi

  local version_output
  if ! version_output="$(
    LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \
      "${relayctl}" \
      --server "${RESOLVED_LESAVKA_SERVER_ADDR}" \
      version 2>&1
  )"; then
    echo "failed to query Lesavka versions through ${RESOLVED_LESAVKA_SERVER_ADDR}" >&2
    echo "${version_output}" >&2
    return 1
  fi

  # Checked in this order so the first problem reported stays the same:
  # client_full_version must be absent, every other field must be present.
  local field
  for field in client_version client_full_version client_revision server_version server_revision; do
    if [[ "${field}" == "client_full_version" ]]; then
      if grep -q "^${field}=" <<<"${version_output}"; then
        echo "Lesavka version query reported a combined version+revision; refusing ambiguous probe attribution" >&2
        echo "${version_output}" >&2
        return 1
      fi
      continue
    fi
    if ! grep -q "^${field}=" <<<"${version_output}"; then
      echo "Lesavka version query did not report ${field}=; refusing to run an unattributed probe" >&2
      echo "${version_output}" >&2
      return 1
    fi
  done

  local line
  while IFS= read -r line; do
    if [[ -n "${line}" ]]; then
      echo " ↪ ${line}"
    fi
  done <<<"${version_output}"
}

#######################################
# Derive an output-delay calibration proposal from the analyzer report.
# Reads LOCAL_ANALYSIS_JSON and writes LOCAL_OUTPUT_DELAY_JSON (full artifact)
# plus LOCAL_OUTPUT_DELAY_ENV (shell-quoted key=value lines later sourced by
# maybe_apply_output_delay_calibration). Nothing is applied here.
# No-op when LESAVKA_OUTPUT_DELAY_CALIBRATION is "0" or the report is missing.
#######################################
write_output_delay_calibration() {
  [[ "${LESAVKA_OUTPUT_DELAY_CALIBRATION}" != "0" ]] || return 0
  [[ -f "${LOCAL_ANALYSIS_JSON}" ]] || return 0
  echo "==> deriving UVC/UAC output-delay calibration"
  python3 - \
    "${LOCAL_ANALYSIS_JSON}" \
    "${LOCAL_OUTPUT_DELAY_JSON}" \
    "${LOCAL_OUTPUT_DELAY_ENV}" \
    "${LOCAL_OUTPUT_DELAY_CORRELATION_JSON}" \
    "${LESAVKA_OUTPUT_DELAY_TARGET}" \
    "${LESAVKA_OUTPUT_DELAY_MIN_PAIRS}" \
    "${LESAVKA_OUTPUT_DELAY_MAX_ABS_SKEW_MS}" \
    "${LESAVKA_OUTPUT_DELAY_MAX_DRIFT_MS}" \
    "${LESAVKA_OUTPUT_DELAY_GAIN}" \
    "${LESAVKA_OUTPUT_DELAY_MAX_STEP_US}" \
    "${LESAVKA_OUTPUT_DELAY_APPLY}" \
    "${LESAVKA_OUTPUT_DELAY_APPLY_MODE}" \
    "${LESAVKA_OUTPUT_DELAY_SAVE}" <<'PY'
import json
import math
import pathlib
import shlex
import sys

(
    report_path,
    output_json_path,
    output_env_path,
    correlation_path,
    target,
    min_pairs_raw,
    max_abs_skew_raw,
    max_drift_raw,
    gain_raw,
    max_step_raw,
    apply_raw,
    apply_mode_raw,
    save_raw,
) = sys.argv[1:]


def as_int(value, default):
    try:
        return int(str(value).strip())
    except Exception:
        return default


def as_float(value, default):
    try:
        result = float(str(value).strip())
    except Exception:
        return default
    return result if math.isfinite(result) else default


def as_bool(value):
    return str(value).strip().lower() not in {"", "0", "false", "no", "off"}


def env_line(key, value):
    return f"{key}={shlex.quote(str(value))}\n"


report = json.loads(pathlib.Path(report_path).read_text())
verdict = report.get("verdict") or {}
target = target.strip().lower()
apply_mode = apply_mode_raw.strip().lower()

# Policy knobs, clamped to sane ranges; unparsable values use the defaults.
min_pairs = max(1, as_int(min_pairs_raw, 8))
max_abs_skew_ms = max(1.0, as_float(max_abs_skew_raw, 5000.0))
max_drift_ms = max(0.0, as_float(max_drift_raw, 80.0))
gain = min(max(as_float(gain_raw, 1.0), 0.01), 1.0)
max_step_us = max(1, as_int(max_step_raw, 1_500_000))

# Measurements taken from the analyzer report.
paired = as_int(report.get("paired_event_count"), 0)
median_skew_ms = as_float(report.get("median_skew_ms"), 0.0)
p95_abs_skew_ms = as_float(
    verdict.get("p95_abs_skew_ms"),
    as_float(report.get("max_abs_skew_ms"), 0.0),
)
max_abs_observed_ms = as_float(report.get("max_abs_skew_ms"), p95_abs_skew_ms)
drift_ms = as_float(report.get("drift_ms"), 0.0)

# Median skew -> microseconds, scaled by gain, clamped to +/- max_step_us.
raw_device_delta_us = int(round(median_skew_ms * 1000.0))
scaled_delta_us = int(round(raw_device_delta_us * gain))
bounded_delta_us = max(-max_step_us, min(max_step_us, scaled_delta_us))

audio_delta_us = 0
video_delta_us = 0
refusal_reasons = []
# The correction goes to exactly one stream; the audio correction carries the
# opposite sign of the video correction.
if target == "video":
    video_delta_us = bounded_delta_us
elif target == "audio":
    audio_delta_us = -bounded_delta_us
else:
    refusal_reasons.append(f"unsupported target {target!r}; use video or audio")
if apply_mode not in {"absolute", "relative"}:
    refusal_reasons.append(
        f"unsupported apply mode {apply_mode!r}; use absolute or relative"
    )
if paired < min_pairs:
    refusal_reasons.append(f"paired_event_count {paired} < {min_pairs}")
if max_abs_observed_ms > max_abs_skew_ms:
    refusal_reasons.append(
        f"max_abs_skew_ms {max_abs_observed_ms:.1f} > {max_abs_skew_ms:.1f}"
    )
if abs(drift_ms) > max_drift_ms:
    refusal_reasons.append(f"abs(drift_ms) {abs(drift_ms):.1f} > {max_drift_ms:.1f}")

ready = not refusal_reasons
decision = "ready" if ready else "refused"
note = (
    "direct UVC/UAC output-delay calibration: "
    f"median device skew {median_skew_ms:+.1f}ms, target={target}, "
    f"audio {audio_delta_us:+d}us/video {video_delta_us:+d}us"
)
if not ready:
    note = f"direct UVC/UAC output-delay calibration refused: {'; '.join(refusal_reasons)}"

artifact = {
    "schema": "lesavka.output-delay-calibration.v1",
    "source": "direct-uvc-uac-output-probe",
    "scope": "server-output-static-baseline",
    "applies_to": "server UVC/UAC gadget output path",
    "measurement_host_role": "lab-attached USB host",
    "probe_media_origin": "server-generated",
    "probe_media_path": "server generated signatures -> UVC/UAC sinks -> lab host capture",
    "report_json": report_path,
    "correlation_json": correlation_path if pathlib.Path(correlation_path).exists() else "",
    "audio_after_video_positive": True,
    "target": target,
    "ready": ready,
    "decision": decision,
    "apply_enabled": as_bool(apply_raw),
    "apply_mode": apply_mode,
    "save_enabled": as_bool(save_raw),
    "paired_event_count": paired,
    "min_pairs": min_pairs,
    "measured_device_skew_ms": median_skew_ms,
    "p95_abs_skew_ms": p95_abs_skew_ms,
    "max_abs_skew_ms": max_abs_observed_ms,
    "max_abs_skew_limit_ms": max_abs_skew_ms,
    "drift_ms": drift_ms,
    "max_drift_ms": max_drift_ms,
    "gain": gain,
    "max_step_us": max_step_us,
    "raw_device_delta_us": raw_device_delta_us,
    "bounded_device_delta_us": bounded_delta_us,
    "audio_offset_adjust_us": audio_delta_us,
    "video_offset_adjust_us": video_delta_us,
    "audio_target_offset_us": audio_delta_us,
    "video_target_offset_us": video_delta_us,
    "refusal_reasons": refusal_reasons,
    "note": note,
}
pathlib.Path(output_json_path).write_text(json.dumps(artifact, indent=2, sort_keys=True) + "\n")

env_values = {
    "output_delay_ready": str(ready).lower(),
    "output_delay_decision": decision,
    "output_delay_target": target,
    "output_delay_audio_delta_us": audio_delta_us,
    "output_delay_video_delta_us": video_delta_us,
    "output_delay_audio_target_offset_us": audio_delta_us,
    "output_delay_video_target_offset_us": video_delta_us,
    "output_delay_measured_skew_ms": f"{median_skew_ms:.3f}",
    "output_delay_paired_event_count": paired,
    "output_delay_drift_ms": f"{drift_ms:.3f}",
    "output_delay_apply_mode": apply_mode,
    "output_delay_note": note,
}
with pathlib.Path(output_env_path).open("w") as handle:
    for key, value in env_values.items():
        handle.write(env_line(key, value))
PY
}

#######################################
# Extract the server timeline from the saved probe reply.
# Takes the last "server_timeline_json=" line of LOCAL_SERVER_PROBE_REPLY and
# pretty-prints its JSON payload to LOCAL_SERVER_TIMELINE_JSON. Writes nothing
# when the reply file or such a line is missing.
#######################################
extract_server_timeline() {
  [[ -f "${LOCAL_SERVER_PROBE_REPLY}" ]] || return 0
  python3 - "${LOCAL_SERVER_PROBE_REPLY}" "${LOCAL_SERVER_TIMELINE_JSON}" <<'PY'
import json
import pathlib
import sys

reply_path = pathlib.Path(sys.argv[1])
timeline_path = pathlib.Path(sys.argv[2])
prefix = "server_timeline_json="
raw = ""
for line in reply_path.read_text(errors="replace").splitlines():
    if line.startswith(prefix):
        raw = line[len(prefix):].strip()
if not raw:
    raise SystemExit(0)
timeline = json.loads(raw)
timeline_path.write_text(json.dumps(timeline, indent=2, sort_keys=True) + "\n")
PY
}

#######################################
# Join the analyzer's paired events with the server timeline by event_id,
# fit linear models (skew over time) for the observed skew, the server feed
# delta and their difference, and write JSON, CSV and text summaries.
# No-op unless both LOCAL_ANALYSIS_JSON and LOCAL_SERVER_TIMELINE_JSON exist.
#######################################
write_output_delay_correlation() {
  [[ -f "${LOCAL_ANALYSIS_JSON}" ]] || return 0
  [[ -f "${LOCAL_SERVER_TIMELINE_JSON}" ]] || return 0
  echo "==> correlating Theia feed timing with Tethys observations"
  python3 - \
    "${LOCAL_ANALYSIS_JSON}" \
    "${LOCAL_SERVER_TIMELINE_JSON}" \
    "${LOCAL_OUTPUT_DELAY_CORRELATION_JSON}" \
    "${LOCAL_OUTPUT_DELAY_CORRELATION_CSV}" \
    "${LOCAL_OUTPUT_DELAY_CORRELATION_TXT}" <<'PY'
import csv
import json
import math
import pathlib
import sys

report_path, timeline_path, output_json_path, output_csv_path, output_txt_path = sys.argv[1:]
report = json.loads(pathlib.Path(report_path).read_text())
timeline = json.loads(pathlib.Path(timeline_path).read_text())
server_events = {int(event["event_id"]): event for event in timeline.get("events", [])}


def finite(value):
    try:
        result = float(value)
    except Exception:
        return None
    return result if math.isfinite(result) else None


def fit_linear(rows, key):
    # Least-squares line of rows[key] against event_time_s; rows lacking the
    # key are ignored. "drift_ms" is the fitted change from first to last point.
    points = [(row["event_time_s"], row[key]) for row in rows if row.get(key) is not None]
    if len(points) < 2:
        return {
            "available": False,
            "intercept_ms": 0.0,
            "slope_ms_per_s": 0.0,
            "r2": 0.0,
            "drift_ms": 0.0,
        }
    xs = [point[0] for point in points]
    ys = [point[1] for point in points]
    mean_x = sum(xs) / len(xs)
    mean_y = sum(ys) / len(ys)
    denom = sum((x - mean_x) ** 2 for x in xs)
    slope = 0.0 if denom == 0 else sum((x - mean_x) * (y - mean_y) for x, y in points) / denom
    intercept = mean_y - slope * mean_x
    predicted = [intercept + slope * x for x in xs]
    ss_tot = sum((y - mean_y) ** 2 for y in ys)
    ss_res = sum((y - y_hat) ** 2 for y, y_hat in zip(ys, predicted))
    r2 = 1.0 if ss_tot == 0.0 else max(0.0, min(1.0, 1.0 - (ss_res / ss_tot)))
    drift = (intercept + slope * xs[-1]) - (intercept + slope * xs[0])
    return {
        "available": True,
        "intercept_ms": intercept,
        "slope_ms_per_s": slope,
        "r2": r2,
        "drift_ms": drift,
        "first_fit_ms": intercept + slope * xs[0],
        "last_fit_ms": intercept + slope * xs[-1],
    }


def correlation(rows, left_key, right_key):
    # Pearson correlation over rows where both keys are present; 0.0 when it
    # is undefined (fewer than two pairs or zero variance).
    pairs = [
        (row[left_key], row[right_key])
        for row in rows
        if row.get(left_key) is not None and row.get(right_key) is not None
    ]
    if len(pairs) < 2:
        return 0.0
    xs = [pair[0] for pair in pairs]
    ys = [pair[1] for pair in pairs]
    mean_x = sum(xs) / len(xs)
    mean_y = sum(ys) / len(ys)
    denom_x = sum((x - mean_x) ** 2 for x in xs)
    denom_y = sum((y - mean_y) ** 2 for y in ys)
    if denom_x <= 0.0 or denom_y <= 0.0:
        return 0.0
    return sum((x - mean_x) * (y - mean_y) for x, y in pairs) / math.sqrt(denom_x * denom_y)


joined = []
for observed in report.get("paired_events", []):
    event_id = int(observed.get("event_id", -1))
    server = server_events.get(event_id)
    if not server:
        continue
    observed_skew_ms = finite(observed.get("skew_ms"))
    server_feed_delta_ms = finite(server.get("server_feed_delta_ms"))
    residual_path_skew_ms = (
        observed_skew_ms - server_feed_delta_ms
        if observed_skew_ms is not None and server_feed_delta_ms is not None
        else None
    )
    planned_start_us = int(server.get("planned_start_us", 0))
    joined.append({
        "event_id": event_id,
        "code": int(server.get("code", 0)),
        "event_time_s": planned_start_us / 1_000_000.0,
        "planned_start_us": planned_start_us,
        "planned_end_us": int(server.get("planned_end_us", 0)),
        "tethys_video_time_s": finite(observed.get("video_time_s")),
        "tethys_audio_time_s": finite(observed.get("audio_time_s")),
        "observed_skew_ms": observed_skew_ms,
        "server_video_feed_monotonic_us": server.get("video_feed_monotonic_us"),
        "server_audio_push_monotonic_us": server.get("audio_push_monotonic_us"),
        "server_feed_delta_ms": server_feed_delta_ms,
        "residual_path_skew_ms": residual_path_skew_ms,
        "confidence": finite(observed.get("confidence")),
    })

# Re-base event times so the first joined event is t = 0.
first_event_time_s = joined[0]["event_time_s"] if joined else 0.0
for row in joined:
    row["relative_event_time_s"] = row["event_time_s"] - first_event_time_s
    row["event_time_s"] = row["relative_event_time_s"]

observed_model = fit_linear(joined, "observed_skew_ms")
server_model = fit_linear(joined, "server_feed_delta_ms")
residual_model = fit_linear(joined, "residual_path_skew_ms")
server_observed_correlation = correlation(joined, "server_feed_delta_ms", "observed_skew_ms")
observed_drift = observed_model.get("drift_ms", 0.0)
server_drift = server_model.get("drift_ms", 0.0)
residual_drift = residual_model.get("drift_ms", 0.0)
same_direction = observed_drift == 0.0 or (observed_drift > 0) == (server_drift > 0)
server_share = 0.0 if abs(observed_drift) < 1e-6 else abs(server_drift) / abs(observed_drift)
if same_direction and server_share >= 0.5 and abs(server_drift) >= 20.0:
    dominant_layer = "server_feed_timing"
else:
    dominant_layer = "post_server_output_or_tethys_capture"
correction_mode = (
    "linear_function_candidate"
    if abs(residual_drift) >= 20.0
    else "scalar_candidate"
)

artifact = {
    "schema": "lesavka.output-delay-correlation.v1",
    "report_json": report_path,
    "server_timeline_json": timeline_path,
    "joined_event_count": len(joined),
    "audio_after_video_positive": True,
    "observed_skew_model": observed_model,
    "server_feed_delta_model": server_model,
    "residual_path_skew_model": residual_model,
    "server_observed_correlation": server_observed_correlation,
    "server_drift_share_of_observed": server_share,
    "dominant_layer": dominant_layer,
    "correction_mode": correction_mode,
    "video_delay_function_candidate": {
        "units": "microseconds",
        "end_to_end": {
            "intercept_us": round(observed_model.get("intercept_ms", 0.0) * 1000.0),
            "slope_us_per_s": round(observed_model.get("slope_ms_per_s", 0.0) * 1000.0),
            "formula": "video_delay_us(t) = intercept_us + slope_us_per_s * seconds_since_first_event",
        },
        "output_path_only": {
            "intercept_us": round(residual_model.get("intercept_ms", 0.0) * 1000.0),
            "slope_us_per_s": round(residual_model.get("slope_ms_per_s", 0.0) * 1000.0),
            "formula": "video_delay_us(t) = intercept_us + slope_us_per_s * seconds_since_first_event",
        },
    },
    "events": joined,
}
pathlib.Path(output_json_path).write_text(json.dumps(artifact, indent=2, sort_keys=True) + "\n")

with pathlib.Path(output_csv_path).open("w", newline="", encoding="utf-8") as handle:
    fieldnames = [
        "event_id",
        "code",
        "event_time_s",
        "tethys_video_time_s",
        "tethys_audio_time_s",
        "observed_skew_ms",
        "server_feed_delta_ms",
        "residual_path_skew_ms",
        "server_video_feed_monotonic_us",
        "server_audio_push_monotonic_us",
        "confidence",
    ]
    writer = csv.DictWriter(handle, fieldnames=fieldnames)
    writer.writeheader()
    for row in joined:
        writer.writerow({key: row.get(key) for key in fieldnames})

lines = [
    f"Output-delay correlation for {report_path}",
    f"- joined events: {len(joined)}",
    f"- dominant layer: {dominant_layer}",
    f"- correction mode: {correction_mode}",
    f"- observed skew model: {observed_model.get('intercept_ms', 0.0):+.3f} ms + {observed_model.get('slope_ms_per_s', 0.0):+.3f} ms/s * t",
    f"- server feed model: {server_model.get('intercept_ms', 0.0):+.3f} ms + {server_model.get('slope_ms_per_s', 0.0):+.3f} ms/s * t",
    f"- residual path model: {residual_model.get('intercept_ms', 0.0):+.3f} ms + {residual_model.get('slope_ms_per_s', 0.0):+.3f} ms/s * t",
    f"- server/observed correlation: {server_observed_correlation:+.3f}",
    f"- server drift share of observed: {server_share:.3f}",
]
summary = "\n".join(lines) + "\n"
pathlib.Path(output_txt_path).write_text(summary)
print(summary, end="")
PY
}

#######################################
# Report the calibration decision and, when permitted, push it to the server.
# Sources LOCAL_OUTPUT_DELAY_ENV (this defines output_delay_* shell variables
# in the current shell), prints the decision, and stops unless the proposal is
# ready and LESAVKA_OUTPUT_DELAY_APPLY is enabled. In "absolute" mode the
# delta sent is target offset minus the server's currently active offset; in
# "relative" mode the measured deltas are sent as-is. Optionally saves the
# result as the server default when LESAVKA_OUTPUT_DELAY_SAVE is enabled.
# Both switches are interpreted by output_delay_flag_enabled, so they agree
# with the apply_enabled/save_enabled values recorded in the artifact.
# Returns: 0 on every refusal path; relayctl failures while applying or
#          saving propagate.
#######################################
maybe_apply_output_delay_calibration() {
  [[ "${LESAVKA_OUTPUT_DELAY_CALIBRATION}" != "0" ]] || return 0
  [[ -f "${LOCAL_OUTPUT_DELAY_ENV}" ]] || return 0
  # shellcheck disable=SC1090
  source "${LOCAL_OUTPUT_DELAY_ENV}"

  echo "==> UVC/UAC output-delay calibration decision"
  echo " ↪ output_delay_calibration_json=${LOCAL_OUTPUT_DELAY_JSON}"
  echo " ↪ output_delay_ready=${output_delay_ready:-false}"
  echo " ↪ output_delay_decision=${output_delay_decision:-unknown}"
  echo " ↪ output_delay_target=${output_delay_target:-unknown}"
  echo " ↪ output_delay_paired_event_count=${output_delay_paired_event_count:-0}"
  echo " ↪ output_delay_measured_skew_ms=${output_delay_measured_skew_ms:-0.0}"
  echo " ↪ output_delay_drift_ms=${output_delay_drift_ms:-0.0}"
  echo " ↪ output_delay_audio_delta_us=${output_delay_audio_delta_us:-0}"
  echo " ↪ output_delay_video_delta_us=${output_delay_video_delta_us:-0}"
  echo " ↪ output_delay_audio_target_offset_us=${output_delay_audio_target_offset_us:-0}"
  echo " ↪ output_delay_video_target_offset_us=${output_delay_video_target_offset_us:-0}"
  echo " ↪ output_delay_apply_mode=${output_delay_apply_mode:-${LESAVKA_OUTPUT_DELAY_APPLY_MODE}}"
  echo " ↪ output_delay_note=${output_delay_note:-}"

  if [[ "${output_delay_ready:-false}" != "true" ]]; then
    echo " ↪ output delay calibration apply refused: ${output_delay_note:-not ready}"
    return 0
  fi
  if ! output_delay_flag_enabled "${LESAVKA_OUTPUT_DELAY_APPLY}"; then
    echo " ↪ output delay calibration apply disabled"
    return 0
  fi

  local relayctl="${REPO_ROOT}/target/debug/lesavka-relayctl"
  local apply_mode="${output_delay_apply_mode:-${LESAVKA_OUTPUT_DELAY_APPLY_MODE}}"
  local apply_audio_delta="${output_delay_audio_delta_us:-0}"
  local apply_video_delta="${output_delay_video_delta_us:-0}"

  if [[ "${apply_mode}" == "absolute" ]]; then
    local calibration_output current_audio_offset_us current_video_offset_us
    if ! calibration_output="$(
      LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \
        "${relayctl}" \
        --server "${RESOLVED_LESAVKA_SERVER_ADDR}" \
        calibration 2>&1
    )"; then
      echo " ↪ output delay calibration apply refused: current calibration query failed"
      echo "${calibration_output}" >&2
      return 0
    fi
    current_audio_offset_us="$(awk -F= '/^calibration_active_audio_offset_us=/{print $2; exit}' <<<"${calibration_output}")"
    current_video_offset_us="$(awk -F= '/^calibration_active_video_offset_us=/{print $2; exit}' <<<"${calibration_output}")"
    if [[ ! "${current_audio_offset_us}" =~ ^-?[0-9]+$ || ! "${current_video_offset_us}" =~ ^-?[0-9]+$ ]]; then
      echo " ↪ output delay calibration apply refused: could not parse active calibration offsets"
      echo "${calibration_output}" >&2
      return 0
    fi
    # Absolute mode: send only the difference to the currently active offsets.
    apply_audio_delta=$(( ${output_delay_audio_target_offset_us:-0} - current_audio_offset_us ))
    apply_video_delta=$(( ${output_delay_video_target_offset_us:-0} - current_video_offset_us ))
    echo " ↪ current_active_audio_offset_us=${current_audio_offset_us}"
    echo " ↪ current_active_video_offset_us=${current_video_offset_us}"
    echo " ↪ absolute_target_audio_offset_us=${output_delay_audio_target_offset_us:-0}"
    echo " ↪ absolute_target_video_offset_us=${output_delay_video_target_offset_us:-0}"
  elif [[ "${apply_mode}" != "relative" ]]; then
    echo " ↪ output delay calibration apply refused: unsupported apply mode ${apply_mode}"
    return 0
  fi

  echo " ↪ calibration_apply_audio_delta_us=${apply_audio_delta}"
  echo " ↪ calibration_apply_video_delta_us=${apply_video_delta}"
  echo "==> applying measured UVC/UAC output-delay calibration"
  LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \
    "${relayctl}" \
    --server "${RESOLVED_LESAVKA_SERVER_ADDR}" \
    calibrate \
    "${apply_audio_delta}" \
    "${apply_video_delta}" \
    "${output_delay_note:-direct UVC/UAC output-delay calibration}"

  if output_delay_flag_enabled "${LESAVKA_OUTPUT_DELAY_SAVE}"; then
    echo "==> saving measured UVC/UAC output-delay calibration as default"
    LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \
      "${relayctl}" \
      --server "${RESOLVED_LESAVKA_SERVER_ADDR}" \
      calibration-save-default
  fi
}

# --- Main flow ----------------------------------------------------------------
# Build first so compilation time is not spent inside the capture window.
if [[ "${PROBE_PREBUILD}" != "0" ]]; then
  echo "==> prebuilding relay control/analyzer before opening the capture window"
  (
    cd "${REPO_ROOT}"
    cargo build -p lesavka_client --bin lesavka-sync-analyze --bin lesavka-relayctl
  )
fi
if [[ ! -x "${ANALYZE_BIN}" ]]; then
  echo "sync analyzer binary not found at ${ANALYZE_BIN}" >&2
  exit 1
fi

resolve_server_addr
echo "==> resolved Lesavka server addr: ${RESOLVED_LESAVKA_SERVER_ADDR}"
if [[ -n "${SERVER_TUNNEL_PID}" ]]; then
  echo " ↪ tunneled to ${LESAVKA_SERVER_HOST}:127.0.0.1:${SERVER_TUNNEL_REMOTE_PORT}"
fi
print_lesavka_versions
preflight_server_path

echo "==> starting Tethys capture on ${TETHYS_HOST}"
# Positional parameters of the remote capture script, quoted for the remote
# shell (see quote_remote_args).
CAPTURE_REMOTE_ARGS="$(
  quote_remote_args \
    "${REMOTE_CAPTURE}" \
    "${CAPTURE_SECONDS}" \
    "${REMOTE_VIDEO_DEVICE}" \
    "${VIDEO_SIZE}" \
    "${VIDEO_FPS}" \
    "${VIDEO_FORMAT}" \
    "${REMOTE_CAPTURE_STACK}" \
    "${REMOTE_PULSE_CAPTURE_TOOL}" \
    "${REMOTE_PULSE_VIDEO_MODE}" \
    "${REMOTE_AUDIO_SOURCE}" \
    "${REMOTE_AUDIO_QUIESCE_USER_AUDIO}"
)"
# The capture runs in the background; stdout and stderr are both mirrored into
# LOCAL_CAPTURE_LOG while still reaching the terminal.
ssh ${SSH_OPT_ARGS[@]+"${SSH_OPT_ARGS[@]}"} "${TETHYS_HOST}" \
  "bash -s -- ${CAPTURE_REMOTE_ARGS}" \
  > >(tee "${LOCAL_CAPTURE_LOG}") \
  2> >(tee -a "${LOCAL_CAPTURE_LOG}" >&2) <<'REMOTE_CAPTURE_SCRIPT' &
set -euo pipefail
remote_capture=$1
capture_seconds=$2
remote_video_device=$3
video_size=$4
video_fps=$5
video_format=$6
remote_capture_stack=$7
remote_pulse_capture_tool=$8
remote_pulse_video_mode=$9
remote_audio_source=${10}
remote_audio_quiesce_user_audio=${11}

rm -f "${remote_capture}"

# Start the user PipeWire sockets and WirePlumber, then the services
# (best effort: failures are ignored).
restore_user_audio() {
  systemctl --user start pipewire.socket pipewire-pulse.socket wireplumber.service >/dev/null 2>&1 || true
  sleep 1
  systemctl --user start pipewire.service pipewire-pulse.service >/dev/null 2>&1 || true
}

# Stop the user PipeWire/WirePlumber services and sockets (best effort).
quiesce_user_audio() {
  systemctl --user stop pipewire-pulse.service pipewire.service wireplumber.service \
    pipewire-pulse.socket pipewire.socket >/dev/null 2>&1 || true
  sleep 1
}

# Print the capture device: the explicit request, else the Lesavka by-id link,
# else the first /dev/video node listed under the Lesavka UVC camera by
# v4l2-ctl. Exits 64 rather than falling back to an unrelated device.
resolve_video_device() {
  local requested=$1
  if [[ "${requested}" != "auto" ]]; then
    printf '%s\n' "${requested}"
    return 0
  fi

  local by_id
  by_id=$(find /dev/v4l/by-id -maxdepth 1 -type l -name '*Lesavka*video-index0' 2>/dev/null | head -n1 || true)
  if [[ -n "${by_id}" ]]; then
    printf '%s\n' "${by_id}"
    return 0
  fi

  if command -v v4l2-ctl >/dev/null 2>&1; then
    local resolved
    resolved="$(
      v4l2-ctl --list-devices 2>/dev/null \
        | awk '
          BEGIN { want=0 }
          /Lesavka Composite: UVC Camera/ { want=1; next }
          /^[^ \t]/ { want=0 }
          want && /^[ \t]+\/dev\/video[0-9]+/ {
            gsub(/^[ \t]+/, "", $0)
            print
            exit
          }
        '
    )"
    if [[ -n "${resolved}" ]]; then
      printf '%s\n' "${resolved}"
      return 0
    fi
  fi

  printf 'Lesavka UVC video device not found on Tethys; refusing to fall back to an unrelated webcam/capture card.\n' >&2
  exit 64
}

# Print the Pulse source name for the Lesavka gadget, preferring an
# alsa_input.* source over any other source mentioning Lesavka_Composite.
# Returns 1 when pactl is missing or no source matches.
resolve_pulse_source() {
  if ! command -v pactl >/dev/null 2>&1; then
    return 1
  fi
  pactl list short sources 2>/dev/null \
    | awk '
      /alsa_input\..*Lesavka_Composite/ { print $2; found=1; exit }
      /Lesavka_Composite/ && !fallback { fallback=$2 }
      END {
        if (found) exit 0
        if (fallback != "") { print fallback; exit 0 }
        exit 1
      }
    '
}

# Print "hw:<card>,<device>" for the first matching capture card reported by
# arecord -l; returns 1 when arecord is missing or nothing matches.
resolve_alsa_audio_device() {
  if ! command -v arecord >/dev/null 2>&1; then
    return 1
  fi
  arecord -l 2>/dev/null | awk '
    /^card [0-9]+:/ && ($0 ~ /Lesavka|UAC2_Gadget|UAC2Gadget|Composite/) {
      card=$2
      sub(":", "", card)
      for (i = 1; i <= NF; i++) {
        if ($i == "device") {
          dev=$(i + 1)
          sub(":", "", dev)
          printf "hw:%s,%s\n", card, dev
          found=1
          exit 0
        }
      }
    }
    END {
      if (!found) exit 1
    }
  '
}

# Print the GStreamer source caps for the selected video format, using the
# resolved_video_size / resolved_video_fps variables set elsewhere in this
# script. Exits 64 for an unsupported format.
gst_video_source_caps() {
  case "${video_format}" in
    "" | mjpeg | MJPG)
      printf 'image/jpeg,width=%s,height=%s,framerate=%s/1' \
        "${resolved_video_size%x*}" \
        "${resolved_video_size#*x}" \
        "${resolved_video_fps}"
      ;;
    yuyv422 | YUYV | yuyv)
      printf 'video/x-raw,format=YUY2,width=%s,height=%s,framerate=%s/1' \
        "${resolved_video_size%x*}" \
        "${resolved_video_size#*x}" \
        "${resolved_video_fps}"
      ;;
    *)
      printf 'unsupported gst video_format=%s\n' "${video_format}" >&2
      exit 64
      ;;
  esac
}

# Print the GStreamer decode element chain needed for the selected format
# (empty for raw YUYV). Exits 64 for an unsupported format.
gst_video_decode_chain() {
  case "${video_format}" in
    "" | mjpeg | MJPG)
      printf 'jpegdec ! '
      ;;
    yuyv422 | YUYV | yuyv)
      printf ''
      ;;
    *)
      printf 'unsupported gst video_format=%s\n' "${video_format}" >&2
      exit 64
      ;;
  esac
}

# Print "size=WxH" and/or "fps=N" for the device's current mode as reported by
# v4l2-ctl --all; returns 1 when v4l2-ctl is not installed.
current_video_profile() {
  if ! command -v v4l2-ctl >/dev/null 2>&1; then
    return 1
  fi
  v4l2-ctl -d "${resolved_video_device}" --all 2>/dev/null \
    | awk '
      /Width\/Height[[:space:]]*:/ {
        split($0, a, ":")
        gsub(/^[ \t]+/, "", a[2])
        split(a[2], wh, "/")
        width=wh[1]
        height=wh[2]
        next
      }
      /Frames per second:/ {
        fps=$4
        sub(/\..*/, "", fps)
      }
      END {
        if (width != "" && height != "") {
          print "size=" width "x" height
        }
        if (fps != "") {
          print "fps=" fps
        }
      }
    '
}

# Print the capture size: the explicit request, else the device's current
# size, else the first preferred size it lists, else its first listed size,
# else 640x480.
resolve_video_size() {
  local requested=$1
  if [[ "${requested}" != "auto" ]]; then
    printf '%s\n' "${requested}"
    return 0
  fi

  local current_profile
  current_profile="$(current_video_profile || true)"
  local current_size
  current_size="$(awk -F= '/^size=/{print $2; exit}' <<<"${current_profile}")"
  if [[ -n "${current_size}" ]]; then
    printf '%s\n' "${current_size}"
    return 0
  fi

  if ! command -v v4l2-ctl >/dev/null 2>&1; then
    printf '640x480\n'
    return 0
  fi

  local listing
  listing="$(v4l2-ctl -d "${resolved_video_device}" --list-formats-ext 2>/dev/null || true)"
  local preferred
  for preferred in 1920x1080 1360x768 1280x720; do
    if grep -q "Size: Discrete ${preferred}" <<<"${listing}"; then
      printf '%s\n' "${preferred}"
      return 0
    fi
  done

  local first_size
  first_size="$(grep -m1 -o 'Size: Discrete [0-9]\+x[0-9]\+' <<<"${listing}" | awk '{print $3}' || true)"
  if [[ -n "${first_size}" ]]; then
    printf '%s\n' "${first_size}"
    return 0
  fi
  printf '640x480\n'
}

# Print the capture frame rate: the explicit request, else the device's
# current rate, else the first rate it lists (integer part), else 20.
resolve_video_fps() {
  local requested=$1
  if [[ "${requested}" != "auto" ]]; then
    printf '%s\n' "${requested}"
    return 0
  fi

  local current_profile
  current_profile="$(current_video_profile || true)"
  local current_fps
  current_fps="$(awk -F= '/^fps=/{print $2; exit}' <<<"${current_profile}")"
  if [[ -n "${current_fps}" ]]; then
    printf '%s\n' "${current_fps}"
    return 0
  fi

  if ! command -v v4l2-ctl >/dev/null 2>&1; then
    printf '20\n'
    return 0
  fi

  local listing
  listing="$(v4l2-ctl -d "${resolved_video_device}" --list-formats-ext 2>/dev/null || true)"
  local first_fps
  first_fps="$(grep -m1 -o '[0-9]\+\.[0-9]\+ fps' <<<"${listing}" | awk '{sub(/\..*/, "", $1); print $1}' || true)"
  if [[ -n "${first_fps}" ]]; then
    printf '%s\n' "${first_fps}"
    return 0
  fi
  printf '20\n'
}

# Print the object.serial of the first PipeWire Audio/Source node whose name
# or description identifies the Lesavka composite gadget; returns 1 when
# pw-dump or python3 is missing or no node matches.
resolve_pw_audio_target() {
  if ! command -v pw-dump >/dev/null 2>&1 || ! command -v python3 >/dev/null 2>&1; then
    return 1
  fi
  pw-dump | python3 -c '
import json
import sys

try:
    objs = json.load(sys.stdin)
except Exception:
    raise SystemExit(1)

for obj in objs:
    if obj.get("type") != "PipeWire:Interface:Node":
        continue
    props = (obj.get("info") or {}).get("props") or {}
    if props.get("media.class") != "Audio/Source":
        continue
    serial = props.get("object.serial")
    name = props.get("node.name", "")
    desc = props.get("node.description", "")
    if serial is None:
        continue
    if "Lesavka_Composite" in name or "Lesavka Composite" in desc:
        print(serial)
        raise SystemExit(0)

raise SystemExit(1)
'
}

capture_mode="alsa"
alsa_audio_dev="hw:3,0"
pulse_source=""
pw_audio_target=""

case "${remote_capture_stack}" in
  auto)
    if [[ "${remote_audio_source}" == "auto" ]]; then
      if pulse_source="$(resolve_pulse_source)"; then
        capture_mode="pulse"
      elif alsa_audio_dev="$(resolve_alsa_audio_device)"; then
        capture_mode="alsa"
        printf 'PipeWire Lesavka source not found; falling back to ALSA device %s\n' "${alsa_audio_dev}" >&2
      elif command -v pw-record >/dev/null 2>&1 \
        && command -v pw-v4l2 >/dev/null 2>&1 \
        && pw_audio_target="$(resolve_pw_audio_target)"; then
        capture_mode="pwpipe"
      else
        printf 'Lesavka audio source not found in PipeWire or ALSA; capture host does not currently expose the gadget microphone.\n' >&2
        exit 64
      fi
    elif [[ "${remote_audio_source}" == pulse:* ]]; then
      capture_mode="pulse"
      pulse_source="${remote_audio_source#pulse:}"
    elif [[ "${remote_audio_source}" == alsa:* ]]; then
      alsa_audio_dev="${remote_audio_source#alsa:}"
    else
      printf 'unsupported REMOTE_AUDIO_SOURCE=%s\n' "${remote_audio_source}" >&2
      exit 64
    fi
    ;;
  pwpipe)
    if ! command -v pw-record >/dev/null 2>&1 || !
command -v pw-v4l2 >/dev/null 2>&1; then printf 'REMOTE_CAPTURE_STACK=pwpipe requires pw-record and pw-v4l2\n' >&2 exit 64 fi pw_audio_target="$(resolve_pw_audio_target)" || { printf 'PipeWire Lesavka capture target not found for REMOTE_CAPTURE_STACK=pwpipe\n' >&2 exit 64 } capture_mode="pwpipe" ;; pulse) if [[ "${remote_audio_source}" == pulse:* ]]; then pulse_source="${remote_audio_source#pulse:}" elif [[ "${remote_audio_source}" == "auto" ]]; then pulse_source="$(resolve_pulse_source)" || { printf 'PipeWire Lesavka source not found for REMOTE_CAPTURE_STACK=pulse\n' >&2 exit 64 } else pulse_source="${remote_audio_source}" fi capture_mode="pulse" ;; alsa) if [[ "${remote_audio_source}" == alsa:* ]]; then alsa_audio_dev="${remote_audio_source#alsa:}" elif [[ "${remote_audio_source}" != "auto" ]]; then alsa_audio_dev="${remote_audio_source}" fi capture_mode="alsa" ;; *) printf 'unsupported REMOTE_CAPTURE_STACK=%s\n' "${remote_capture_stack}" >&2 exit 64 ;; esac resolved_video_device="$(resolve_video_device "${remote_video_device}")" resolved_video_size="$(resolve_video_size "${video_size}")" resolved_video_fps="$(resolve_video_fps "${video_fps}")" printf 'using video device: %s\n' "${resolved_video_device}" >&2 printf 'using video mode: %s @ %s fps (%s)\n' "${resolved_video_size}" "${resolved_video_fps}" "${video_format:-driver-default}" >&2 printf '%s\n' "__LESAVKA_CAPTURE_READY__" video_args=(-f video4linux2 -framerate "${resolved_video_fps}" -video_size "${resolved_video_size}") if [[ -n "${video_format}" ]]; then video_args+=(-input_format "${video_format}") fi gst_source_caps="$(gst_video_source_caps)" gst_decode_chain="$(gst_video_decode_chain)" run_ffmpeg_capture() { local rc=0 timeout --signal=INT "$((capture_seconds + 5))" "$@" || rc=$? 
case "${rc}" in 0|124|130) return 0 ;; *) return "${rc}" ;; esac } quiesce_for_alsa=0 case "${remote_audio_quiesce_user_audio}" in 1|true|yes) quiesce_for_alsa=1 ;; auto) if [[ "${capture_mode}" == "alsa" ]]; then quiesce_for_alsa=1 fi ;; 0|false|no) quiesce_for_alsa=0 ;; *) printf 'unsupported REMOTE_AUDIO_QUIESCE_USER_AUDIO=%s\n' "${remote_audio_quiesce_user_audio}" >&2 exit 64 ;; esac if [[ "${capture_mode}" == "alsa" && "${quiesce_for_alsa}" == "1" ]]; then printf 'quiescing Tethys user audio before raw ALSA capture\n' >&2 quiesce_user_audio trap restore_user_audio EXIT fi if [[ "${capture_mode}" == "pwpipe" ]]; then printf 'using PipeWire-native mux capture target serial: %s\n' "${pw_audio_target}" >&2 timeout "${capture_seconds}" pw-record \ --target "${pw_audio_target}" \ --rate 48000 \ --channels 2 \ --format s16 \ --raw - \ | pw-v4l2 ffmpeg -hide_banner -loglevel error -y \ -thread_queue_size 1024 \ "${video_args[@]}" \ -i "${resolved_video_device}" \ -thread_queue_size 1024 \ -f s16le -ar 48000 -ac 2 \ -i pipe:0 \ -t "${capture_seconds}" \ -c:v copy \ -c:a pcm_s16le \ "${remote_capture}" elif [[ "${capture_mode}" == "pulse" ]]; then printf 'using Pulse source: %s\n' "${pulse_source}" >&2 case "${remote_pulse_capture_tool}" in ffmpeg) case "${remote_pulse_video_mode}" in copy) run_ffmpeg_capture ffmpeg -hide_banner -loglevel error -y \ -thread_queue_size 1024 \ "${video_args[@]}" \ -i "${resolved_video_device}" \ -thread_queue_size 1024 \ -f pulse \ -i "${pulse_source}" \ -t "${capture_seconds}" \ -c:v copy \ -c:a pcm_s16le \ "${remote_capture}" ;; cfr) run_ffmpeg_capture ffmpeg -hide_banner -loglevel error -y \ -thread_queue_size 1024 \ "${video_args[@]}" \ -i "${resolved_video_device}" \ -thread_queue_size 1024 \ -f pulse \ -i "${pulse_source}" \ -t "${capture_seconds}" \ -vf "fps=${resolved_video_fps}" \ -c:v libx264 -preset ultrafast -crf 12 -g 1 -pix_fmt yuv420p \ -c:a pcm_s16le \ "${remote_capture}" ;; *) printf 'unsupported 
REMOTE_PULSE_VIDEO_MODE=%s\n' "${remote_pulse_video_mode}" >&2 exit 64 ;; esac ;; gst) case "${remote_pulse_video_mode}" in copy) if [[ "${video_format}" != "mjpeg" && "${video_format}" != "MJPG" && -n "${video_format}" ]]; then printf 'gst copy mode only supports mjpeg input, got %s\n' "${video_format}" >&2 exit 64 fi timeout --signal=INT "$((capture_seconds + 3))" \ gst-launch-1.0 -q -e \ matroskamux name=mux ! filesink location="${remote_capture}" \ v4l2src device="${resolved_video_device}" do-timestamp=true ! \ ${gst_source_caps} ! \ queue ! mux. \ pulsesrc device="${pulse_source}" do-timestamp=true ! \ audio/x-raw,rate=48000,channels=2 ! \ audioconvert ! audioresample ! queue ! mux. || true ;; cfr) timeout --signal=INT "$((capture_seconds + 3))" \ gst-launch-1.0 -q -e \ matroskamux name=mux ! filesink location="${remote_capture}" \ v4l2src device="${resolved_video_device}" do-timestamp=true ! \ ${gst_source_caps} ! \ ${gst_decode_chain} \ videoconvert ! videorate ! video/x-raw,framerate="${resolved_video_fps}"/1 ! \ x264enc tune=zerolatency speed-preset=ultrafast key-int-max=1 bitrate=5000 ! \ h264parse ! \ queue ! mux. \ pulsesrc device="${pulse_source}" do-timestamp=true ! \ audio/x-raw,rate=48000,channels=2 ! \ audioconvert ! audioresample ! queue ! mux. || true ;; *) printf 'unsupported REMOTE_PULSE_VIDEO_MODE=%s\n' "${remote_pulse_video_mode}" >&2 exit 64 ;; esac ;; *) printf 'unsupported REMOTE_PULSE_CAPTURE_TOOL=%s\n' "${remote_pulse_capture_tool}" >&2 exit 64 ;; esac else run_ffmpeg_capture ffmpeg -hide_banner -loglevel error -y \ -thread_queue_size 1024 \ "${video_args[@]}" \ -i "${resolved_video_device}" \ -thread_queue_size 1024 \ -f alsa -ac 2 -ar 48000 \ -i "${alsa_audio_dev}" \ -t "${capture_seconds}" \ -c:v ffv1 -level 3 -g 1 \ -c:a pcm_s16le \ "${remote_capture}" fi REMOTE_CAPTURE_SCRIPT capture_pid=$! 
# Block until the capture log contains the readiness marker.
# Globals:  LOCAL_CAPTURE_LOG, CAPTURE_READY_MARKER, capture_pid (read)
#           capture_status (written when the capture job has already ended)
# Returns:  0 as soon as the marker is found.
# Exits:    with the capture job's status if that job ended before the marker
#           appeared, or with 90 after 100 polls spaced 0.1s apart.
wait_for_capture_ready() {
  local -r max_polls=100
  local poll
  for (( poll = 0; poll < max_polls; poll++ )); do
    if [[ -f "${LOCAL_CAPTURE_LOG}" ]] && grep -q "${CAPTURE_READY_MARKER}" "${LOCAL_CAPTURE_LOG}"; then
      return 0
    fi
    # Capture job still alive: wait a little and poll again.
    if kill -0 "${capture_pid}" >/dev/null 2>&1; then
      sleep 0.1
      continue
    fi
    # Capture job is gone without announcing readiness; reap it for its status.
    capture_status=0
    wait "${capture_pid}" || capture_status=$?
    printf '%s\n' "Tethys capture failed before the sync probe could start; see ${LOCAL_CAPTURE_LOG} for details." >&2
    exit "${capture_status}"
  done
  printf '%s\n' "Timed out waiting for Tethys capture to become ready; see ${LOCAL_CAPTURE_LOG} for details." >&2
  exit 90
}

wait_for_capture_ready
sleep "${LEAD_IN_SECONDS}"

echo "==> running server-generated UVC/UAC output-delay probe against ${RESOLVED_LESAVKA_SERVER_ADDR}"
probe_status=0
probe_timed_out=0
# errexit is suspended so a failing probe can be inspected instead of aborting.
set +e
(
  cd "${REPO_ROOT}"
  # Built inside the subshell so the array never leaks into the parent shell.
  probe_argv=(
    --server "${RESOLVED_LESAVKA_SERVER_ADDR}"
    output-delay-probe
    "${PROBE_DURATION_SECONDS}"
    "${PROBE_WARMUP_SECONDS}"
    "${PROBE_PULSE_PERIOD_MS}"
    "${PROBE_PULSE_WIDTH_MS}"
    "${PROBE_EVENT_WIDTH_CODES}"
    "${LESAVKA_OUTPUT_DELAY_PROBE_AUDIO_DELAY_US}"
    "${LESAVKA_OUTPUT_DELAY_PROBE_VIDEO_DELAY_US}"
  )
  LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \
    timeout --signal=INT "${PROBE_TIMEOUT_SECONDS}" \
    "${REPO_ROOT}/target/debug/lesavka-relayctl" "${probe_argv[@]}"
) 2>&1 | tee "${LOCAL_SERVER_PROBE_REPLY}"
# Status of the probe subshell, not of tee.
probe_status=${PIPESTATUS[0]}
set -e
# timeout reports 124 when it had to interrupt the probe.
if (( probe_status == 124 )); then
  probe_timed_out=1
fi
extract_server_timeline

# Reap the background capture job and remember how it ended.
capture_status=0
wait "${capture_pid}" || capture_status=$?
# ---------------------------------------------------------------------------
# Post-capture processing: classify capture-log faults, optionally normalize,
# analyze and fetch the remote capture, turn probe/capture statuses into the
# script's exit status, then summarize and list the produced artifacts.
# ---------------------------------------------------------------------------

# Known V4L2 failure signatures in the local capture log.
capture_v4l2_fault=0
if [[ -f "${LOCAL_CAPTURE_LOG}" ]] \
  && grep -q 'VIDIOC_QBUF): Bad file descriptor' "${LOCAL_CAPTURE_LOG}"; then
  capture_v4l2_fault=1
fi
capture_streamon_timeout=0
if [[ -f "${LOCAL_CAPTURE_LOG}" ]] \
  && grep -q 'VIDIOC_STREAMON.*Connection timed out' "${LOCAL_CAPTURE_LOG}"; then
  capture_streamon_timeout=1
fi

# SSH_OPTS is expanded unquoted on purpose: it holds several separate options.
if ssh ${SSH_OPTS} "${TETHYS_HOST}" "test -f '${REMOTE_CAPTURE}'"; then
  remote_fetch_capture="${REMOTE_CAPTURE}"
  if [[ "${ANALYSIS_NORMALIZE}" != "0" ]]; then
    remote_fetch_capture="${REMOTE_CAPTURE%.mkv}-analysis.mkv"
    echo "==> normalizing remote capture to CFR for analysis"

    # VIDEO_FPS defaults to "auto", which the ffmpeg fps filter rejects, so
    # normalization could never succeed with default settings. Recover the
    # rate the capture host resolved from its
    # "using video mode: <size> @ <fps> fps" log line when it is available.
    # NOTE(review): assumes the remote script's stderr is captured in
    # LOCAL_CAPTURE_LOG; the VIDIOC greps above suggest so. If no rate is
    # found the value is passed through unchanged, as before.
    normalize_fps="${VIDEO_FPS}"
    if [[ "${normalize_fps}" == "auto" && -f "${LOCAL_CAPTURE_LOG}" ]]; then
      logged_fps="$(
        sed -n 's/^.*using video mode: [^ ]* @ \([0-9][0-9.]*\) fps.*$/\1/p' "${LOCAL_CAPTURE_LOG}" \
          | head -n1 || true
      )"
      if [[ -n "${logged_fps}" ]]; then
        normalize_fps="${logged_fps}"
      fi
    fi

    normalize_status=0
    ssh ${SSH_OPTS} "${TETHYS_HOST}" bash -s -- \
      "${REMOTE_CAPTURE}" \
      "${remote_fetch_capture}" \
      "${normalize_fps}" \
      "${ANALYSIS_SCALE_WIDTH}" <<'REMOTE_NORMALIZE_SCRIPT' || normalize_status=$?
set -euo pipefail
src=$1
dst=$2
fps=$3
scale_width=$4
ffmpeg -hide_banner -loglevel error -y \
  -i "${src}" \
  -vf "fps=${fps},scale='min(${scale_width},iw)':-2" \
  -c:v libx264 -preset ultrafast -crf 12 -g 1 -pix_fmt yuv420p \
  -c:a pcm_s16le \
  "${dst}"
REMOTE_NORMALIZE_SCRIPT
    if [[ "${normalize_status}" -ne 0 ]]; then
      echo "remote CFR normalization failed; falling back to raw capture" >&2
      remote_fetch_capture="${REMOTE_CAPTURE}"
    fi
  fi
  if [[ "${REMOTE_ANALYZE}" != "0" ]]; then
    if [[ "${REMOTE_ANALYZE_COPY}" != "0" ]]; then
      echo "==> copying sync analyzer to ${TETHYS_HOST}:${REMOTE_ANALYZE_BIN}"
      scp ${SSH_OPTS} "${ANALYZE_BIN}" "${TETHYS_HOST}:${REMOTE_ANALYZE_BIN}"
    fi
    echo "==> analyzing capture on ${TETHYS_HOST}"
    ssh ${SSH_OPTS} "${TETHYS_HOST}" \
      "chmod +x '${REMOTE_ANALYZE_BIN}' && '${REMOTE_ANALYZE_BIN}' '${remote_fetch_capture}' --json --event-width-codes '${PROBE_EVENT_WIDTH_CODES}'" \
      > "${LOCAL_ANALYSIS_JSON}"
  fi
  if [[ "${FETCH_CAPTURE}" != "0" ]]; then
    echo "==> fetching capture back to ${LOCAL_CAPTURE}"
    scp ${SSH_OPTS} "${TETHYS_HOST}:${remote_fetch_capture}" "${LOCAL_CAPTURE}"
  fi
fi

# A failed probe is fatal; report it only after artifacts were collected above.
if [[ "${probe_status}" -ne 0 ]]; then
  if [[ "${probe_timed_out}" -eq 1 ]]; then
    echo "server output-delay probe timed out after ${PROBE_TIMEOUT_SECONDS}s; this usually means one UVC/UAC output sink did not close cleanly." >&2
  fi
  echo "server output-delay probe failed with status ${probe_status}" >&2
  if [[ -f "${LOCAL_CAPTURE}" ]]; then
    echo "partial capture preserved at ${LOCAL_CAPTURE}" >&2
  fi
  exit "${probe_status}"
fi

# A non-zero capture status is tolerated only for 141 or 124, and only when a
# capture file or analysis JSON exists locally.
if [[ "${capture_status}" -ne 0 ]]; then
  if [[ "${capture_streamon_timeout}" -eq 1 ]]; then
    echo "Tethys capture timed out during VIDIOC_STREAMON; the UVC host opened before MJPEG frames reached the gadget." >&2
    echo "Keep LEAD_IN_SECONDS=0 and restart lesavka-uvc/lesavka-server before retrying if the gadget is wedged from an earlier failed run." >&2
  fi
  if [[ "${capture_status}" -eq 141 && ( -f "${LOCAL_CAPTURE}" || -f "${LOCAL_ANALYSIS_JSON}" ) ]]; then
    echo "Tethys capture ended with PipeWire SIGPIPE after ffmpeg closed; accepting preserved analysis artifacts" >&2
  elif [[ "${capture_status}" -eq 124 && ( -f "${LOCAL_CAPTURE}" || -f "${LOCAL_ANALYSIS_JSON}" ) ]]; then
    echo "Tethys capture timed out after preserving analysis artifacts; accepting the run for analysis" >&2
  else
    echo "Tethys capture failed with status ${capture_status}" >&2
    if [[ -f "${LOCAL_CAPTURE}" ]]; then
      echo "partial capture preserved at ${LOCAL_CAPTURE}" >&2
    fi
    exit "${capture_status}"
  fi
fi

if [[ "${REMOTE_ANALYZE}" != "0" ]]; then
  if [[ ! -f "${LOCAL_ANALYSIS_JSON}" ]]; then
    echo "remote analysis did not produce ${LOCAL_ANALYSIS_JSON}" >&2
    exit 92
  fi
  echo "==> remote analysis summary"
  # The summary program uses f-strings, so it needs Python 3. Prefer the
  # python3 command and keep plain python only as a fallback.
  summary_python=python3
  if ! command -v python3 >/dev/null 2>&1; then
    summary_python=python
  fi
  # Writes the text report (argv 2) and the per-event CSV (argv 3) from the
  # analysis JSON (argv 1), and echoes the text report to stdout.
  "${summary_python}" - <<'PY' "${LOCAL_ANALYSIS_JSON}" "${LOCAL_REPORT_TXT}" "${LOCAL_EVENTS_CSV}"
import csv
import json
import pathlib
import sys

report = json.loads(pathlib.Path(sys.argv[1]).read_text())
verdict = report.get('verdict', {})
cal = report.get('calibration', {})
lines = [
    f"A/V sync report for {sys.argv[1]}",
    f"- verdict: {verdict.get('status', 'unknown')} ({'pass' if verdict.get('passed') else 'fail'})",
    f"- verdict reason: {verdict.get('reason', '')}",
    f"- p95 abs skew: {float(verdict.get('p95_abs_skew_ms', 0.0)):.1f} ms",
    f"- video onsets: {report['video_event_count']}",
    f"- audio onsets: {report['audio_event_count']}",
    f"- paired pulses: {report['paired_event_count']}",
    f"- activity start delta: {report.get('activity_start_delta_ms', 0.0):+.1f} ms (audio after video is positive)",
    f"- first skew: {report['first_skew_ms']:+.1f} ms (audio after video is positive)",
    f"- last skew: {report['last_skew_ms']:+.1f} ms",
    f"- mean skew: {report['mean_skew_ms']:+.1f} ms",
    f"- median skew: {report['median_skew_ms']:+.1f} ms",
    f"- max abs skew: {report['max_abs_skew_ms']:.1f} ms",
    f"- drift: {report['drift_ms']:+.1f} ms",
    f"- calibration ready: {cal.get('ready')}",
    f"- recommended audio offset adjust: {int(cal.get('recommended_audio_offset_adjust_us', 0)):+d} us",
    f"- alternative video offset adjust: {int(cal.get('recommended_video_offset_adjust_us', 0)):+d} us",
    f"- calibration note: {cal.get('note', '')}",
]
summary = "\n".join(lines) + "\n"
pathlib.Path(sys.argv[2]).write_text(summary)
with pathlib.Path(sys.argv[3]).open("w", newline="") as handle:
    writer = csv.DictWriter(
        handle,
        fieldnames=["event_id", "video_time_s", "audio_time_s", "skew_ms", "confidence"],
    )
    writer.writeheader()
    for event in report.get("paired_events", []):
        writer.writerow({
            "event_id": event.get("event_id"),
            "video_time_s": event.get("video_time_s"),
            "audio_time_s": event.get("audio_time_s"),
            "skew_ms": event.get("skew_ms"),
            "confidence": event.get("confidence"),
        })
print(summary, end="")
PY
else
  if [[ ! -f "${LOCAL_CAPTURE}" ]]; then
    echo "capture was not fetched and REMOTE_ANALYZE=0 left nothing local to analyze" >&2
    exit 93
  fi
  echo "==> analyzing capture"
  (
    cd "${REPO_ROOT}"
    "${ANALYZE_BIN}" "${LOCAL_CAPTURE}" --report-dir "${LOCAL_REPORT_DIR}" --event-width-codes "${PROBE_EVENT_WIDTH_CODES}"
  )
fi

write_output_delay_correlation
write_output_delay_calibration
maybe_apply_output_delay_calibration

if [[ "${capture_v4l2_fault}" -eq 1 ]]; then
  echo "warning: Tethys video capture reported VIDIOC_QBUF / Bad file descriptor; treat unstable skew or analyzer failures as host-capture suspect" >&2
fi

echo "==> done"
echo "artifact_dir: ${LOCAL_REPORT_DIR}"

# Label/path pairs, listed in output order; only files that exist are printed.
artifact_rows=(
  "capture" "${LOCAL_CAPTURE}"
  "report_json" "${LOCAL_ANALYSIS_JSON}"
  "report_txt" "${LOCAL_REPORT_TXT}"
  "events_csv" "${LOCAL_EVENTS_CSV}"
  "server_timeline_json" "${LOCAL_SERVER_TIMELINE_JSON}"
  "output_delay_correlation_json" "${LOCAL_OUTPUT_DELAY_CORRELATION_JSON}"
  "output_delay_correlation_csv" "${LOCAL_OUTPUT_DELAY_CORRELATION_CSV}"
  "output_delay_correlation_txt" "${LOCAL_OUTPUT_DELAY_CORRELATION_TXT}"
  "output_delay_calibration_json" "${LOCAL_OUTPUT_DELAY_JSON}"
  "output_delay_calibration_env" "${LOCAL_OUTPUT_DELAY_ENV}"
  "capture_log" "${LOCAL_CAPTURE_LOG}"
)
for (( row = 0; row < ${#artifact_rows[@]}; row += 2 )); do
  if [[ -f "${artifact_rows[row + 1]}" ]]; then
    echo "${artifact_rows[row]}: ${artifact_rows[row + 1]}"
  fi
done