#!/usr/bin/env bash # scripts/manual/run_upstream_av_sync.sh # Manual: upstream A/V sync hardware probe; not part of CI. # # Manual: capture the real Tethys UVC/UAC endpoints while the Lesavka server # generates paired probe media locally and feeds its own UVC/UAC output sinks. # This intentionally measures only the server-to-host output-device skew. set -euo pipefail SCRIPT_DIR="$(cd -- "$(dirname "${BASH_SOURCE[0]}")" >/dev/null 2>&1 && pwd)" REPO_ROOT="$(cd -- "${SCRIPT_DIR}/../.." >/dev/null 2>&1 && pwd)" TETHYS_HOST=${TETHYS_HOST:-tethys} LESAVKA_SERVER_HOST=${LESAVKA_SERVER_HOST:-theia} LESAVKA_SERVER_CONNECT_HOST=${LESAVKA_SERVER_CONNECT_HOST:-38.28.125.112} LESAVKA_SERVER_ADDR=${LESAVKA_SERVER_ADDR:-auto} LESAVKA_SERVER_SCHEME=${LESAVKA_SERVER_SCHEME:-https} LESAVKA_TLS_DOMAIN=${LESAVKA_TLS_DOMAIN:-lesavka-server} PROBE_DURATION_SECONDS=${PROBE_DURATION_SECONDS:-20} PROBE_WARMUP_SECONDS=${PROBE_WARMUP_SECONDS:-4} PROBE_START_GRACE_SECONDS=${PROBE_START_GRACE_SECONDS:-20} PROBE_TIMEOUT_SECONDS=${PROBE_TIMEOUT_SECONDS:-$((PROBE_DURATION_SECONDS + PROBE_WARMUP_SECONDS + PROBE_START_GRACE_SECONDS))} PROBE_PULSE_PERIOD_MS=${PROBE_PULSE_PERIOD_MS:-1000} PROBE_PULSE_WIDTH_MS=${PROBE_PULSE_WIDTH_MS:-120} PROBE_EVENT_WIDTH_CODES=${PROBE_EVENT_WIDTH_CODES:-1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16} PROBE_CONDITIONING_SECONDS=${PROBE_CONDITIONING_SECONDS:-0} PROBE_CONDITIONING_WARMUP_SECONDS=${PROBE_CONDITIONING_WARMUP_SECONDS:-1} PROBE_CONDITIONING_GAP_SECONDS=${PROBE_CONDITIONING_GAP_SECONDS:-1} PROBE_CONDITIONING_EVENT_WIDTH_CODES=${PROBE_CONDITIONING_EVENT_WIDTH_CODES:-${PROBE_EVENT_WIDTH_CODES}} PROBE_CONDITIONING_TIMEOUT_SECONDS=${PROBE_CONDITIONING_TIMEOUT_SECONDS:-$((PROBE_CONDITIONING_SECONDS + PROBE_CONDITIONING_WARMUP_SECONDS + PROBE_START_GRACE_SECONDS))} LESAVKA_OUTPUT_DELAY_PROBE_AUDIO_DELAY_US=${LESAVKA_OUTPUT_DELAY_PROBE_AUDIO_DELAY_US:-0} LESAVKA_OUTPUT_DELAY_PROBE_VIDEO_DELAY_US=${LESAVKA_OUTPUT_DELAY_PROBE_VIDEO_DELAY_US:-0} # Do not open the UVC host capture far ahead of the probe. The gadget side only # has frames once the sync probe is feeding the server, and some hosts time out # VIDIOC_STREAMON if the camera is starved during pre-roll. LEAD_IN_SECONDS=${LEAD_IN_SECONDS:-0} TAIL_SECONDS=${TAIL_SECONDS:-2} CAPTURE_START_GRACE_SECONDS=${CAPTURE_START_GRACE_SECONDS:-12} CONDITIONING_CAPTURE_SECONDS=$((PROBE_CONDITIONING_SECONDS > 0 ? PROBE_CONDITIONING_SECONDS + PROBE_CONDITIONING_WARMUP_SECONDS + PROBE_CONDITIONING_GAP_SECONDS : 0)) CAPTURE_SECONDS=${CAPTURE_SECONDS:-$((PROBE_DURATION_SECONDS + PROBE_WARMUP_SECONDS + CONDITIONING_CAPTURE_SECONDS + CAPTURE_START_GRACE_SECONDS + LEAD_IN_SECONDS + TAIL_SECONDS))} LOCAL_OUTPUT_DIR=${LOCAL_OUTPUT_DIR:-/tmp} REMOTE_VIDEO_DEVICE=${REMOTE_VIDEO_DEVICE:-auto} VIDEO_SIZE=${VIDEO_SIZE:-auto} VIDEO_FPS=${VIDEO_FPS:-auto} VIDEO_FORMAT=${VIDEO_FORMAT:-mjpeg} REMOTE_CAPTURE_STACK=${REMOTE_CAPTURE_STACK:-pulse} REMOTE_PULSE_CAPTURE_TOOL=${REMOTE_PULSE_CAPTURE_TOOL:-gst} REMOTE_PULSE_VIDEO_MODE=${REMOTE_PULSE_VIDEO_MODE:-copy} REMOTE_PULSE_AUDIO_ANCHOR_SILENCE=${REMOTE_PULSE_AUDIO_ANCHOR_SILENCE:-1} REMOTE_AUDIO_SOURCE=${REMOTE_AUDIO_SOURCE:-auto} REMOTE_AUDIO_QUIESCE_USER_AUDIO=${REMOTE_AUDIO_QUIESCE_USER_AUDIO:-auto} REMOTE_CAPTURE_ALLOW_ALSA_FALLBACK=${REMOTE_CAPTURE_ALLOW_ALSA_FALLBACK:-0} REMOTE_CAPTURE_PREROLL_DISCARD_SECONDS=${REMOTE_CAPTURE_PREROLL_DISCARD_SECONDS:-0} REMOTE_CAPTURE_READY_SETTLE_SECONDS=${REMOTE_CAPTURE_READY_SETTLE_SECONDS:-1} ANALYSIS_NORMALIZE=${ANALYSIS_NORMALIZE:-0} ANALYSIS_SCALE_WIDTH=${ANALYSIS_SCALE_WIDTH:-1280} ANALYSIS_TIMELINE_WINDOW=${ANALYSIS_TIMELINE_WINDOW:-0} ANALYSIS_TIMELINE_WINDOW_PADDING_SECONDS=${ANALYSIS_TIMELINE_WINDOW_PADDING_SECONDS:-1.0} SSH_OPTS=${SSH_OPTS:-"-o BatchMode=yes -o ConnectTimeout=30"} PROBE_PREBUILD=${PROBE_PREBUILD:-1} ANALYZE_BIN=${ANALYZE_BIN:-"${REPO_ROOT}/target/debug/lesavka-sync-analyze"} REMOTE_ANALYZE=${REMOTE_ANALYZE:-1} REMOTE_ANALYZE_BIN=${REMOTE_ANALYZE_BIN:-/tmp/lesavka-sync-analyze} REMOTE_ANALYZE_COPY=${REMOTE_ANALYZE_COPY:-1} FETCH_CAPTURE=${FETCH_CAPTURE:-1} REMOTE_ARTIFACT_RETRIES=${REMOTE_ARTIFACT_RETRIES:-3} REMOTE_ARTIFACT_RETRY_DELAY_SECONDS=${REMOTE_ARTIFACT_RETRY_DELAY_SECONDS:-5} REMOTE_SERVER_PREFLIGHT=${REMOTE_SERVER_PREFLIGHT:-1} REMOTE_EXPECT_CAM_OUTPUT=${REMOTE_EXPECT_CAM_OUTPUT:-uvc} REMOTE_EXPECT_UVC_CODEC=${REMOTE_EXPECT_UVC_CODEC:-mjpeg} REMOTE_EXPECT_UVC_WIDTH=${REMOTE_EXPECT_UVC_WIDTH:-} REMOTE_EXPECT_UVC_HEIGHT=${REMOTE_EXPECT_UVC_HEIGHT:-} REMOTE_EXPECT_UVC_FPS=${REMOTE_EXPECT_UVC_FPS:-} LESAVKA_OUTPUT_DELAY_CALIBRATION=${LESAVKA_OUTPUT_DELAY_CALIBRATION:-1} LESAVKA_OUTPUT_DELAY_APPLY=${LESAVKA_OUTPUT_DELAY_APPLY:-0} LESAVKA_OUTPUT_DELAY_APPLY_MODE=${LESAVKA_OUTPUT_DELAY_APPLY_MODE:-absolute} LESAVKA_OUTPUT_DELAY_CONFIRM=${LESAVKA_OUTPUT_DELAY_CONFIRM:-1} LESAVKA_OUTPUT_DELAY_SAVE=${LESAVKA_OUTPUT_DELAY_SAVE:-0} LESAVKA_OUTPUT_REQUIRE_SYNC_PASS=${LESAVKA_OUTPUT_REQUIRE_SYNC_PASS:-0} LESAVKA_OUTPUT_DELAY_TARGET=${LESAVKA_OUTPUT_DELAY_TARGET:-video} LESAVKA_OUTPUT_DELAY_MIN_PAIRS=${LESAVKA_OUTPUT_DELAY_MIN_PAIRS:-13} LESAVKA_OUTPUT_DELAY_MAX_ABS_SKEW_MS=${LESAVKA_OUTPUT_DELAY_MAX_ABS_SKEW_MS:-5000} LESAVKA_OUTPUT_DELAY_MAX_DRIFT_MS=${LESAVKA_OUTPUT_DELAY_MAX_DRIFT_MS:-80} LESAVKA_OUTPUT_DELAY_GAIN=${LESAVKA_OUTPUT_DELAY_GAIN:-1.0} LESAVKA_OUTPUT_DELAY_MAX_STEP_US=${LESAVKA_OUTPUT_DELAY_MAX_STEP_US:-1500000} LESAVKA_OUTPUT_FRESHNESS_MAX_AGE_MS=${LESAVKA_OUTPUT_FRESHNESS_MAX_AGE_MS:-1000} LESAVKA_OUTPUT_FRESHNESS_MAX_DRIFT_MS=${LESAVKA_OUTPUT_FRESHNESS_MAX_DRIFT_MS:-100} LESAVKA_OUTPUT_FRESHNESS_MAX_CLOCK_UNCERTAINTY_MS=${LESAVKA_OUTPUT_FRESHNESS_MAX_CLOCK_UNCERTAINTY_MS:-250} LESAVKA_OUTPUT_FRESHNESS_MIN_PAIRS=${LESAVKA_OUTPUT_FRESHNESS_MIN_PAIRS:-${LESAVKA_OUTPUT_DELAY_MIN_PAIRS}} LESAVKA_CLOCK_ALIGNMENT_SAMPLES=${LESAVKA_CLOCK_ALIGNMENT_SAMPLES:-5} CAPTURE_READY_MARKER="__LESAVKA_CAPTURE_READY__" STAMP="$(date +%Y%m%d-%H%M%S)" REMOTE_CAPTURE=${REMOTE_CAPTURE:-"/tmp/lesavka-output-delay-probe-${STAMP}.mkv"} LOCAL_REPORT_DIR="${LOCAL_OUTPUT_DIR%/}/lesavka-output-delay-probe-${STAMP}" LOCAL_CAPTURE="${LOCAL_REPORT_DIR}/capture.mkv" LOCAL_ANALYSIS_JSON="${LOCAL_REPORT_DIR}/report.json" LOCAL_ANALYSIS_ERROR_LOG="${LOCAL_REPORT_DIR}/analysis-error.log" LOCAL_REPORT_TXT="${LOCAL_REPORT_DIR}/report.txt" LOCAL_EVENTS_CSV="${LOCAL_REPORT_DIR}/events.csv" LOCAL_SERVER_PROBE_REPLY="${LOCAL_REPORT_DIR}/server-output-probe-reply.txt" LOCAL_SERVER_TIMELINE_JSON="${LOCAL_REPORT_DIR}/server-output-timeline.json" LOCAL_SIGNAL_CONDITIONING_REPLY="${LOCAL_REPORT_DIR}/signal-conditioning-probe-reply.txt" LOCAL_SIGNAL_CONDITIONING_TIMELINE_JSON="${LOCAL_REPORT_DIR}/signal-conditioning-timeline.json" LOCAL_OUTPUT_DELAY_CORRELATION_JSON="${LOCAL_REPORT_DIR}/output-delay-correlation.json" LOCAL_OUTPUT_DELAY_CORRELATION_CSV="${LOCAL_REPORT_DIR}/output-delay-correlation.csv" LOCAL_OUTPUT_DELAY_CORRELATION_TXT="${LOCAL_REPORT_DIR}/output-delay-correlation.txt" LOCAL_CLOCK_ALIGNMENT_JSON="${LOCAL_REPORT_DIR}/clock-alignment.json" LOCAL_OUTPUT_DELAY_JSON="${LOCAL_REPORT_DIR}/output-delay-calibration.json" LOCAL_OUTPUT_DELAY_ENV="${LOCAL_REPORT_DIR}/output-delay-calibration.env" LOCAL_CAPTURE_LOG="${LOCAL_REPORT_DIR}/capture.log" mkdir -p "${LOCAL_REPORT_DIR}" RESOLVED_LESAVKA_SERVER_ADDR="" SERVER_TUNNEL_PID="" SERVER_TUNNEL_REMOTE_PORT="" SERVER_TUNNEL_LOCAL_PORT="" cleanup_server_tunnel() { if [[ -z "${SERVER_TUNNEL_PID}" ]]; then return 0 fi if kill -0 "${SERVER_TUNNEL_PID}" >/dev/null 2>&1; then kill "${SERVER_TUNNEL_PID}" >/dev/null 2>&1 || true wait "${SERVER_TUNNEL_PID}" >/dev/null 2>&1 || true fi } trap cleanup_server_tunnel EXIT pick_local_server_tunnel_port() { python3 - <<'PY' import socket with socket.socket() as sock: sock.bind(("127.0.0.1", 0)) print(sock.getsockname()[1]) PY } sample_host_clock_offset_ns() { local host=$1 local before_ns remote_ns after_ns before_ns="$(date +%s%N)" remote_ns="$(ssh ${SSH_OPTS} "${host}" 'date +%s%N')" after_ns="$(date +%s%N)" python3 - <<'PY' "${before_ns}" "${remote_ns}" "${after_ns}" import sys before_ns, remote_ns, after_ns = (int(value) for value in sys.argv[1:]) local_mid_ns = (before_ns + after_ns) // 2 print(f"{remote_ns - local_mid_ns} {(after_ns - before_ns) // 2} {after_ns - before_ns}") PY } sample_best_host_clock_offset_ns() { local host=$1 local samples=$2 if python3 - <<'PY' "${host}" "${samples}" "${SSH_OPTS}"; then import shlex import subprocess import sys import time host = sys.argv[1] try: samples = max(1, int(sys.argv[2])) except Exception: samples = 5 ssh_opts = shlex.split(sys.argv[3]) remote_code = ( "import sys,time\n" "for _line in sys.stdin:\n" " print(time.time_ns(), flush=True)\n" ) cmd = [ "ssh", *ssh_opts, host, "python3 -u -c " + shlex.quote(remote_code), ] try: proc = subprocess.Popen( cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True, bufsize=1, ) except Exception: raise SystemExit(1) rows = [] try: for _ in range(samples): if proc.stdin is None or proc.stdout is None: break before_ns = time.time_ns() try: proc.stdin.write("x\n") proc.stdin.flush() line = proc.stdout.readline() except (BrokenPipeError, OSError): break after_ns = time.time_ns() if not line: break try: remote_ns = int(line.strip()) except Exception: continue local_mid_ns = (before_ns + after_ns) // 2 rtt_ns = after_ns - before_ns rows.append((rtt_ns // 2, remote_ns - local_mid_ns, rtt_ns)) finally: try: if proc.stdin is not None: proc.stdin.close() except Exception: pass try: proc.wait(timeout=2) except Exception: try: proc.kill() except Exception: pass if not rows: raise SystemExit(1) uncertainty_ns, offset_ns, rtt_ns = min(rows) print(f"{offset_ns} {uncertainty_ns} {rtt_ns} {len(rows)} persistent-ssh-python") PY return 0 fi local tmp tmp="$(mktemp)" local i=0 while (( i < samples )); do sample_host_clock_offset_ns "${host}" >>"${tmp}" 2>/dev/null || true ((i += 1)) done python3 - <<'PY' "${tmp}" import pathlib import sys rows = [] for line in pathlib.Path(sys.argv[1]).read_text().splitlines(): try: offset_ns, uncertainty_ns, rtt_ns = (int(value) for value in line.split()) except Exception: continue rows.append((uncertainty_ns, offset_ns, rtt_ns)) if not rows: raise SystemExit(1) uncertainty_ns, offset_ns, rtt_ns = min(rows) print(f"{offset_ns} {uncertainty_ns} {rtt_ns} {len(rows)} ssh-date") PY local rc=$? rm -f "${tmp}" return "${rc}" } sample_server_to_capture_clock_offset_ns() { local server_host=$1 local capture_host=$2 local samples=$3 python3 - <<'PY' "${server_host}" "${capture_host}" "${samples}" "${SSH_OPTS}" import shlex import subprocess import sys server_host = sys.argv[1] capture_host = sys.argv[2] try: samples = max(1, int(sys.argv[3])) except Exception: samples = 5 ssh_opts_text = sys.argv[4] ssh_opts = shlex.split(ssh_opts_text) remote_code = r''' import shlex import subprocess import sys import time capture_host = sys.argv[1] samples = max(1, int(sys.argv[2])) ssh_opts = shlex.split(sys.argv[3]) capture_code = ( "import sys,time\n" "for _line in sys.stdin:\n" " print(time.time_ns(), flush=True)\n" ) cmd = [ "ssh", *ssh_opts, capture_host, "python3 -u -c " + shlex.quote(capture_code), ] try: proc = subprocess.Popen( cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True, bufsize=1, ) except Exception: raise SystemExit(1) rows = [] try: for _ in range(samples): if proc.stdin is None or proc.stdout is None: break before_ns = time.time_ns() try: proc.stdin.write("x\n") proc.stdin.flush() line = proc.stdout.readline() except (BrokenPipeError, OSError): break after_ns = time.time_ns() if not line: break try: remote_ns = int(line.strip()) except Exception: continue local_mid_ns = (before_ns + after_ns) // 2 rtt_ns = after_ns - before_ns rows.append((rtt_ns // 2, remote_ns - local_mid_ns, rtt_ns)) finally: try: if proc.stdin is not None: proc.stdin.close() except Exception: pass try: proc.wait(timeout=2) except Exception: try: proc.kill() except Exception: pass if not rows: raise SystemExit(1) uncertainty_ns, offset_ns, rtt_ns = min(rows) print(f"{offset_ns} {uncertainty_ns} {rtt_ns} {len(rows)} server-to-capture-persistent-ssh-python") ''' remote_cmd = ( "python3 -u -c " + shlex.quote(remote_code) + " " + shlex.quote(capture_host) + " " + shlex.quote(str(samples)) + " " + shlex.quote(ssh_opts_text) ) result = subprocess.run( ["ssh", *ssh_opts, server_host, remote_cmd], check=False, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True, ) if result.returncode != 0: raise SystemExit(result.returncode) for line in result.stdout.splitlines(): if line.strip(): print(line.strip()) raise SystemExit(0) raise SystemExit(1) PY } write_clock_alignment() { echo "==> sampling Theia/Tethys clock alignment for freshness" local direct_sample if direct_sample="$(sample_server_to_capture_clock_offset_ns "${LESAVKA_SERVER_HOST}" "${TETHYS_HOST}" "${LESAVKA_CLOCK_ALIGNMENT_SAMPLES}")"; then python3 - <<'PY' \ "${LESAVKA_SERVER_HOST}" \ "${TETHYS_HOST}" \ "${direct_sample}" \ "${LOCAL_CLOCK_ALIGNMENT_JSON}" import json import pathlib import sys server_host, capture_host, sample, output_path = sys.argv[1:] parts = sample.split() offset_ns, uncertainty_ns, rtt_ns, sample_count = (int(value) for value in parts[:4]) method = parts[4] if len(parts) > 4 else "server-to-capture-ssh" artifact = { "schema": "lesavka.clock-alignment.v1", "available": True, "method": "server host to capture host persistent ssh midpoint", "server_host": server_host, "capture_host": capture_host, "server_clock_offset_from_local_ns": None, "capture_clock_offset_from_local_ns": None, "theia_to_tethys_offset_ns": offset_ns, "uncertainty_ns": uncertainty_ns, "uncertainty_ms": uncertainty_ns / 1_000_000.0, "server_sample_rtt_ns": 0, "capture_sample_rtt_ns": rtt_ns, "server_samples": sample_count, "capture_samples": sample_count, "server_sample_method": "server-host-local-clock", "capture_sample_method": method, } pathlib.Path(output_path).write_text(json.dumps(artifact, indent=2, sort_keys=True) + "\n") print(f" ↪ theia_to_tethys_offset_ms={offset_ns / 1_000_000.0:+.3f}") print(f" ↪ clock_alignment_uncertainty_ms={uncertainty_ns / 1_000_000.0:.3f}") PY return 0 fi echo " ↪ server-to-capture clock alignment unavailable; falling back to client-mediated SSH samples" local theia_sample tethys_sample if ! theia_sample="$(sample_best_host_clock_offset_ns "${LESAVKA_SERVER_HOST}" "${LESAVKA_CLOCK_ALIGNMENT_SAMPLES}")"; then echo " ↪ clock alignment unavailable: failed to sample ${LESAVKA_SERVER_HOST}" printf '{"schema":"lesavka.clock-alignment.v1","available":false,"reason":"failed to sample server host"}\n' >"${LOCAL_CLOCK_ALIGNMENT_JSON}" return 0 fi if ! tethys_sample="$(sample_best_host_clock_offset_ns "${TETHYS_HOST}" "${LESAVKA_CLOCK_ALIGNMENT_SAMPLES}")"; then echo " ↪ clock alignment unavailable: failed to sample ${TETHYS_HOST}" printf '{"schema":"lesavka.clock-alignment.v1","available":false,"reason":"failed to sample capture host"}\n' >"${LOCAL_CLOCK_ALIGNMENT_JSON}" return 0 fi python3 - <<'PY' \ "${LESAVKA_SERVER_HOST}" \ "${TETHYS_HOST}" \ "${theia_sample}" \ "${tethys_sample}" \ "${LOCAL_CLOCK_ALIGNMENT_JSON}" import json import pathlib import sys server_host, capture_host, server_sample, capture_sample, output_path = sys.argv[1:] server_parts = server_sample.split() capture_parts = capture_sample.split() server_offset_ns, server_uncertainty_ns, server_rtt_ns, server_samples = (int(value) for value in server_parts[:4]) capture_offset_ns, capture_uncertainty_ns, capture_rtt_ns, capture_samples = (int(value) for value in capture_parts[:4]) server_method = server_parts[4] if len(server_parts) > 4 else "ssh-date" capture_method = capture_parts[4] if len(capture_parts) > 4 else "ssh-date" theia_to_tethys_offset_ns = capture_offset_ns - server_offset_ns uncertainty_ns = server_uncertainty_ns + capture_uncertainty_ns artifact = { "schema": "lesavka.clock-alignment.v1", "available": True, "method": "persistent ssh midpoint" if server_method == capture_method == "persistent-ssh-python" else "ssh midpoint", "server_host": server_host, "capture_host": capture_host, "server_clock_offset_from_local_ns": server_offset_ns, "capture_clock_offset_from_local_ns": capture_offset_ns, "theia_to_tethys_offset_ns": theia_to_tethys_offset_ns, "uncertainty_ns": uncertainty_ns, "uncertainty_ms": uncertainty_ns / 1_000_000.0, "server_sample_rtt_ns": server_rtt_ns, "capture_sample_rtt_ns": capture_rtt_ns, "server_samples": server_samples, "capture_samples": capture_samples, "server_sample_method": server_method, "capture_sample_method": capture_method, } pathlib.Path(output_path).write_text(json.dumps(artifact, indent=2, sort_keys=True) + "\n") print( f" ↪ theia_to_tethys_offset_ms={theia_to_tethys_offset_ns / 1_000_000.0:+.3f}" ) print(f" ↪ clock_alignment_uncertainty_ms={uncertainty_ns / 1_000_000.0:.3f}") PY } wait_for_server_tunnel() { local local_port=$1 local tries=50 local i=0 while (( i < tries )); do if nc -z 127.0.0.1 "${local_port}" >/dev/null 2>&1; then return 0 fi if ! kill -0 "${SERVER_TUNNEL_PID}" >/dev/null 2>&1; then wait "${SERVER_TUNNEL_PID}" >/dev/null 2>&1 || true echo "SSH tunnel to ${LESAVKA_SERVER_HOST} exited before becoming ready" >&2 exit 88 fi sleep 0.1 ((i += 1)) done echo "SSH tunnel to ${LESAVKA_SERVER_HOST} did not become ready on localhost:${local_port}" >&2 exit 89 } start_server_tunnel() { local remote_port=$1 local local_port local_port="$(pick_local_server_tunnel_port)" echo "==> opening SSH tunnel to ${LESAVKA_SERVER_HOST}:127.0.0.1:${remote_port} on localhost:${local_port}" ssh ${SSH_OPTS} -o ExitOnForwardFailure=yes \ -N \ -L "127.0.0.1:${local_port}:127.0.0.1:${remote_port}" \ "${LESAVKA_SERVER_HOST}" & SERVER_TUNNEL_PID=$! SERVER_TUNNEL_REMOTE_PORT="${remote_port}" SERVER_TUNNEL_LOCAL_PORT="${local_port}" wait_for_server_tunnel "${local_port}" } retry_remote_artifact_command() { local description=$1 shift local attempt status for attempt in $(seq 1 "${REMOTE_ARTIFACT_RETRIES}"); do set +e "$@" status=$? set -e if [[ "${status}" -eq 0 ]]; then return 0 fi if [[ "${attempt}" -lt "${REMOTE_ARTIFACT_RETRIES}" ]]; then printf '%s failed with status %s; retrying %s/%s in %ss\n' \ "${description}" "${status}" "$((attempt + 1))" "${REMOTE_ARTIFACT_RETRIES}" \ "${REMOTE_ARTIFACT_RETRY_DELAY_SECONDS}" >&2 sleep "${REMOTE_ARTIFACT_RETRY_DELAY_SECONDS}" fi done printf '%s failed after %s attempt(s)\n' "${description}" "${REMOTE_ARTIFACT_RETRIES}" >&2 return "${status}" } run_remote_sync_analysis_once() { local output_path=$1 local error_path=${2:-/dev/null} ssh ${SSH_OPTS} "${TETHYS_HOST}" \ "chmod +x '${REMOTE_ANALYZE_BIN}' && '${REMOTE_ANALYZE_BIN}' '${remote_fetch_capture}' --json --event-width-codes '${PROBE_EVENT_WIDTH_CODES}' ${analysis_window_arg}" \ > "${output_path}" 2> "${error_path}" } resolve_server_addr() { if [[ "${LESAVKA_SERVER_ADDR}" != "auto" ]]; then RESOLVED_LESAVKA_SERVER_ADDR="${LESAVKA_SERVER_ADDR}" return 0 fi local bind_addr port bind_addr="$( ssh ${SSH_OPTS} "${LESAVKA_SERVER_HOST}" \ "grep -E '^LESAVKA_SERVER_BIND_ADDR=' /etc/lesavka/server.env 2>/dev/null | tail -n1 | cut -d= -f2-" \ 2>/dev/null || true )" port="${bind_addr##*:}" if [[ "${port}" =~ ^[0-9]+$ ]]; then start_server_tunnel "${port}" RESOLVED_LESAVKA_SERVER_ADDR="${LESAVKA_SERVER_SCHEME}://127.0.0.1:${SERVER_TUNNEL_LOCAL_PORT}" return 0 fi start_server_tunnel "50051" RESOLVED_LESAVKA_SERVER_ADDR="${LESAVKA_SERVER_SCHEME}://127.0.0.1:${SERVER_TUNNEL_LOCAL_PORT}" } preflight_server_path() { [[ "${REMOTE_SERVER_PREFLIGHT}" != "0" ]] || return 0 echo "==> verifying Lesavka server path on ${LESAVKA_SERVER_HOST}" ssh ${SSH_OPTS} "${LESAVKA_SERVER_HOST}" bash -s -- \ "${REMOTE_EXPECT_CAM_OUTPUT}" \ "${REMOTE_EXPECT_UVC_CODEC}" \ "${REMOTE_EXPECT_UVC_WIDTH}" \ "${REMOTE_EXPECT_UVC_HEIGHT}" \ "${REMOTE_EXPECT_UVC_FPS}" <<'REMOTE_PREFLIGHT' set -euo pipefail expect_cam_output=${1:-} expect_uvc_codec=${2:-} expect_uvc_width=${3:-} expect_uvc_height=${4:-} expect_uvc_fps=${5:-} read_env_value() { local key=$1 local file=$2 local value="" value=$(grep -E "^${key}=" "$file" 2>/dev/null | tail -n1 | cut -d= -f2- || true) printf '%s\n' "$value" } cam_output=$(read_env_value "LESAVKA_CAM_OUTPUT" /etc/lesavka/server.env) server_uvc_codec=$(read_env_value "LESAVKA_UVC_CODEC" /etc/lesavka/server.env) runtime_uvc_codec=$(read_env_value "LESAVKA_UVC_CODEC" /etc/lesavka/uvc.env) server_uvc_width=$(read_env_value "LESAVKA_UVC_WIDTH" /etc/lesavka/server.env) server_uvc_height=$(read_env_value "LESAVKA_UVC_HEIGHT" /etc/lesavka/server.env) server_uvc_fps=$(read_env_value "LESAVKA_UVC_FPS" /etc/lesavka/server.env) runtime_uvc_width=$(read_env_value "LESAVKA_UVC_WIDTH" /etc/lesavka/uvc.env) runtime_uvc_height=$(read_env_value "LESAVKA_UVC_HEIGHT" /etc/lesavka/uvc.env) runtime_uvc_fps=$(read_env_value "LESAVKA_UVC_FPS" /etc/lesavka/uvc.env) effective_uvc_codec=${server_uvc_codec:-$runtime_uvc_codec} effective_uvc_width=${server_uvc_width:-$runtime_uvc_width} effective_uvc_height=${server_uvc_height:-$runtime_uvc_height} effective_uvc_fps=${server_uvc_fps:-$runtime_uvc_fps} printf ' ↪ server.env CAM_OUTPUT=%s\n' "${cam_output:-}" printf ' ↪ server.env UVC_CODEC=%s\n' "${server_uvc_codec:-}" printf ' ↪ uvc.env UVC_CODEC=%s\n' "${runtime_uvc_codec:-}" printf ' ↪ server.env UVC_MODE=%sx%s@%s\n' "${server_uvc_width:-}" "${server_uvc_height:-}" "${server_uvc_fps:-}" printf ' ↪ uvc.env UVC_MODE=%sx%s@%s\n' "${runtime_uvc_width:-}" "${runtime_uvc_height:-}" "${runtime_uvc_fps:-}" printf ' ↪ effective UVC_MODE=%sx%s@%s\n' "${effective_uvc_width:-}" "${effective_uvc_height:-}" "${effective_uvc_fps:-}" if [[ -n "${expect_cam_output}" && "${cam_output}" != "${expect_cam_output}" ]]; then printf 'expected CAM_OUTPUT=%s but found %s\n' "${expect_cam_output}" "${cam_output:-}" >&2 exit 64 fi if [[ -n "${expect_uvc_codec}" && "${effective_uvc_codec}" != "${expect_uvc_codec}" ]]; then printf 'expected effective UVC_CODEC=%s but found %s\n' "${expect_uvc_codec}" "${effective_uvc_codec:-}" >&2 exit 65 fi if [[ -n "${expect_uvc_codec}" && "${runtime_uvc_codec}" != "${expect_uvc_codec}" ]]; then printf 'expected uvc.env UVC_CODEC=%s but found %s\n' "${expect_uvc_codec}" "${runtime_uvc_codec:-}" >&2 exit 66 fi if [[ -n "${expect_uvc_width}" && "${effective_uvc_width}" != "${expect_uvc_width}" ]]; then printf 'expected effective UVC_WIDTH=%s but found %s\n' "${expect_uvc_width}" "${effective_uvc_width:-}" >&2 exit 67 fi if [[ -n "${expect_uvc_height}" && "${effective_uvc_height}" != "${expect_uvc_height}" ]]; then printf 'expected effective UVC_HEIGHT=%s but found %s\n' "${expect_uvc_height}" "${effective_uvc_height:-}" >&2 exit 68 fi if [[ -n "${expect_uvc_fps}" && "${effective_uvc_fps}" != "${expect_uvc_fps}" ]]; then printf 'expected effective UVC_FPS=%s but found %s\n' "${expect_uvc_fps}" "${effective_uvc_fps:-}" >&2 exit 69 fi if [[ -n "${expect_uvc_width}" && "${runtime_uvc_width}" != "${expect_uvc_width}" ]]; then printf 'expected uvc.env UVC_WIDTH=%s but found %s\n' "${expect_uvc_width}" "${runtime_uvc_width:-}" >&2 exit 70 fi if [[ -n "${expect_uvc_height}" && "${runtime_uvc_height}" != "${expect_uvc_height}" ]]; then printf 'expected uvc.env UVC_HEIGHT=%s but found %s\n' "${expect_uvc_height}" "${runtime_uvc_height:-}" >&2 exit 71 fi if [[ -n "${expect_uvc_fps}" && "${runtime_uvc_fps}" != "${expect_uvc_fps}" ]]; then printf 'expected uvc.env UVC_FPS=%s but found %s\n' "${expect_uvc_fps}" "${runtime_uvc_fps:-}" >&2 exit 72 fi systemctl is-active lesavka-server lesavka-uvc lesavka-core >/dev/null REMOTE_PREFLIGHT } print_lesavka_versions() { echo "==> Lesavka versions under test" if [[ ! -x "${REPO_ROOT}/target/debug/lesavka-relayctl" ]]; then (cd "${REPO_ROOT}" && cargo build -p lesavka_client --bin lesavka-relayctl >/dev/null) fi local version_output if ! version_output="$( LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \ "${REPO_ROOT}/target/debug/lesavka-relayctl" \ --server "${RESOLVED_LESAVKA_SERVER_ADDR}" \ version 2>&1 )"; then echo "failed to query Lesavka versions through ${RESOLVED_LESAVKA_SERVER_ADDR}" >&2 echo "${version_output}" >&2 return 1 fi if ! grep -q "^client_version=" <<<"${version_output}"; then echo "Lesavka version query did not report client_version=; refusing to run an unattributed probe" >&2 echo "${version_output}" >&2 return 1 fi if grep -q "^client_full_version=" <<<"${version_output}"; then echo "Lesavka version query reported a combined version+revision; refusing ambiguous probe attribution" >&2 echo "${version_output}" >&2 return 1 fi if ! grep -q "^client_revision=" <<<"${version_output}"; then echo "Lesavka version query did not report client_revision=; refusing to run an unattributed probe" >&2 echo "${version_output}" >&2 return 1 fi if ! grep -q "^server_version=" <<<"${version_output}"; then echo "Lesavka version query did not report server_version=; refusing to run an unattributed probe" >&2 echo "${version_output}" >&2 return 1 fi if ! grep -q "^server_revision=" <<<"${version_output}"; then echo "Lesavka version query did not report server_revision=; refusing to run an unattributed probe" >&2 echo "${version_output}" >&2 return 1 fi while IFS= read -r line; do [[ -n "${line}" ]] && echo " ↪ ${line}" done <<<"${version_output}" } write_output_delay_calibration() { [[ "${LESAVKA_OUTPUT_DELAY_CALIBRATION}" != "0" ]] || return 0 [[ -f "${LOCAL_ANALYSIS_JSON}" ]] || return 0 echo "==> deriving UVC/UAC output-delay calibration" python3 - <<'PY' \ "${LOCAL_ANALYSIS_JSON}" \ "${LOCAL_OUTPUT_DELAY_JSON}" \ "${LOCAL_OUTPUT_DELAY_ENV}" \ "${LOCAL_OUTPUT_DELAY_CORRELATION_JSON}" \ "${LESAVKA_OUTPUT_DELAY_TARGET}" \ "${LESAVKA_OUTPUT_DELAY_MIN_PAIRS}" \ "${LESAVKA_OUTPUT_DELAY_MAX_ABS_SKEW_MS}" \ "${LESAVKA_OUTPUT_DELAY_MAX_DRIFT_MS}" \ "${LESAVKA_OUTPUT_DELAY_GAIN}" \ "${LESAVKA_OUTPUT_DELAY_MAX_STEP_US}" \ "${LESAVKA_OUTPUT_DELAY_APPLY}" \ "${LESAVKA_OUTPUT_DELAY_APPLY_MODE}" \ "${LESAVKA_OUTPUT_DELAY_SAVE}" \ "${LESAVKA_OUTPUT_DELAY_PROBE_AUDIO_DELAY_US}" \ "${LESAVKA_OUTPUT_DELAY_PROBE_VIDEO_DELAY_US}" import json import math import pathlib import shlex import sys ( report_path, output_json_path, output_env_path, correlation_path, target, min_pairs_raw, max_abs_skew_raw, max_drift_raw, gain_raw, max_step_raw, apply_raw, apply_mode_raw, save_raw, active_audio_raw, active_video_raw, ) = sys.argv[1:] def as_int(value, default): try: return int(str(value).strip()) except Exception: return default def as_float(value, default): try: result = float(str(value).strip()) except Exception: return default return result if math.isfinite(result) else default def as_bool(value): return str(value).strip().lower() not in {"", "0", "false", "no", "off"} def env_line(key, value): return f"{key}={shlex.quote(str(value))}\n" report = json.loads(pathlib.Path(report_path).read_text()) verdict = report.get("verdict") or {} try: correlation = json.loads(pathlib.Path(correlation_path).read_text()) except Exception: correlation = {} freshness = correlation.get("freshness") or {} capture_timebase = freshness.get("capture_timebase") or {} target = target.strip().lower() apply_mode = apply_mode_raw.strip().lower() min_pairs = max(1, as_int(min_pairs_raw, 13)) max_abs_skew_ms = max(1.0, as_float(max_abs_skew_raw, 5000.0)) max_drift_ms = max(0.0, as_float(max_drift_raw, 80.0)) gain = min(max(as_float(gain_raw, 1.0), 0.01), 1.0) max_step_us = max(1, as_int(max_step_raw, 1_500_000)) active_audio_offset_us = as_int(active_audio_raw, 0) active_video_offset_us = as_int(active_video_raw, 0) paired = as_int(report.get("paired_event_count"), 0) median_skew_ms = as_float(report.get("median_skew_ms"), 0.0) p95_abs_skew_ms = as_float( verdict.get("p95_abs_skew_ms"), as_float(report.get("max_abs_skew_ms"), 0.0), ) max_abs_observed_ms = as_float(report.get("max_abs_skew_ms"), p95_abs_skew_ms) drift_ms = as_float(report.get("drift_ms"), 0.0) raw_device_delta_us = int(round(median_skew_ms * 1000.0)) scaled_delta_us = int(round(raw_device_delta_us * gain)) bounded_delta_us = max(-max_step_us, min(max_step_us, scaled_delta_us)) audio_delta_us = 0 video_delta_us = 0 refusal_reasons = [] if target == "video": video_delta_us = bounded_delta_us elif target == "audio": audio_delta_us = -bounded_delta_us else: refusal_reasons.append(f"unsupported target {target!r}; use video or audio") if apply_mode not in {"absolute", "relative"}: refusal_reasons.append( f"unsupported apply mode {apply_mode!r}; use absolute or relative" ) if paired < min_pairs: refusal_reasons.append(f"paired_event_count {paired} < {min_pairs}") if max_abs_observed_ms > max_abs_skew_ms: refusal_reasons.append( f"max_abs_skew_ms {max_abs_observed_ms:.1f} > {max_abs_skew_ms:.1f}" ) if abs(drift_ms) > max_drift_ms: refusal_reasons.append(f"abs(drift_ms) {abs(drift_ms):.1f} > {max_drift_ms:.1f}") if capture_timebase and capture_timebase.get("valid") is not True: refusal_reasons.append( "capture timebase invalid for calibration: " f"{capture_timebase.get('status', 'unknown')} - {capture_timebase.get('reason', '')}" ) elif freshness.get("status") == "invalid": refusal_reasons.append( f"freshness invalid for calibration: {freshness.get('reason', '')}" ) audio_target_offset_us = active_audio_offset_us + audio_delta_us video_target_offset_us = active_video_offset_us + video_delta_us ready = not refusal_reasons decision = "ready" if ready else "refused" note = ( "direct UVC/UAC output-delay calibration: " f"median device skew {median_skew_ms:+.1f}ms, target={target}, " f"audio {active_audio_offset_us:+d}->{audio_target_offset_us:+d}us " f"(delta {audio_delta_us:+d}us), " f"video {active_video_offset_us:+d}->{video_target_offset_us:+d}us " f"(delta {video_delta_us:+d}us)" ) if not ready: note = f"direct UVC/UAC output-delay calibration refused: {'; '.join(refusal_reasons)}" artifact = { "schema": "lesavka.output-delay-calibration.v1", "source": "direct-uvc-uac-output-probe", "scope": "server-output-static-baseline", "applies_to": "server UVC/UAC gadget output path", "measurement_host_role": "lab-attached USB host", "probe_media_origin": "server-generated", "probe_media_path": "server generated signatures -> UVC/UAC sinks -> lab host capture", "injection_scope": "server-final-output-handoff", "server_pipeline_reference": "generated media PTS before intentional sync delay", "sink_handoff_path": "video CameraRelay::feed; audio Voice::push", "client_uplink_included": False, "report_json": report_path, "correlation_json": correlation_path if pathlib.Path(correlation_path).exists() else "", "audio_after_video_positive": True, "target": target, "ready": ready, "decision": decision, "apply_enabled": as_bool(apply_raw), "apply_mode": apply_mode, "save_enabled": as_bool(save_raw), "paired_event_count": paired, "min_pairs": min_pairs, "measured_device_skew_ms": median_skew_ms, "p95_abs_skew_ms": p95_abs_skew_ms, "max_abs_skew_ms": max_abs_observed_ms, "max_abs_skew_limit_ms": max_abs_skew_ms, "drift_ms": drift_ms, "max_drift_ms": max_drift_ms, "gain": gain, "max_step_us": max_step_us, "active_audio_offset_us": active_audio_offset_us, "active_video_offset_us": active_video_offset_us, "raw_device_delta_us": raw_device_delta_us, "bounded_device_delta_us": bounded_delta_us, "audio_offset_adjust_us": audio_delta_us, "video_offset_adjust_us": video_delta_us, "audio_target_offset_us": audio_target_offset_us, "video_target_offset_us": video_target_offset_us, "refusal_reasons": refusal_reasons, "note": note, } pathlib.Path(output_json_path).write_text(json.dumps(artifact, indent=2, sort_keys=True) + "\n") env_values = { "output_delay_ready": str(ready).lower(), "output_delay_decision": decision, "output_delay_target": target, "output_delay_audio_delta_us": audio_delta_us, "output_delay_video_delta_us": video_delta_us, "output_delay_active_audio_offset_us": active_audio_offset_us, "output_delay_active_video_offset_us": active_video_offset_us, "output_delay_audio_target_offset_us": audio_target_offset_us, "output_delay_video_target_offset_us": video_target_offset_us, "output_delay_measured_skew_ms": f"{median_skew_ms:.3f}", "output_delay_paired_event_count": paired, "output_delay_drift_ms": f"{drift_ms:.3f}", "output_delay_apply_mode": apply_mode, "output_delay_note": note, } with pathlib.Path(output_env_path).open("w") as handle: for key, value in env_values.items(): handle.write(env_line(key, value)) PY } extract_timeline_from_reply() { local reply_path=$1 local timeline_path=$2 [[ -f "${reply_path}" ]] || return 0 python3 - <<'PY' "${reply_path}" "${timeline_path}" import json import pathlib import sys reply_path = pathlib.Path(sys.argv[1]) timeline_path = pathlib.Path(sys.argv[2]) prefix = "server_timeline_json=" raw = "" for line in reply_path.read_text(errors="replace").splitlines(): if line.startswith(prefix): raw = line[len(prefix):].strip() if not raw: raise SystemExit(0) timeline = json.loads(raw) timeline_path.write_text(json.dumps(timeline, indent=2, sort_keys=True) + "\n") PY } extract_server_timeline() { extract_timeline_from_reply "${LOCAL_SERVER_PROBE_REPLY}" "${LOCAL_SERVER_TIMELINE_JSON}" } extract_signal_conditioning_timeline() { extract_timeline_from_reply "${LOCAL_SIGNAL_CONDITIONING_REPLY}" "${LOCAL_SIGNAL_CONDITIONING_TIMELINE_JSON}" } compute_analysis_window_arg() { [[ "${ANALYSIS_TIMELINE_WINDOW}" != "0" ]] || return 0 [[ -f "${LOCAL_SERVER_TIMELINE_JSON}" ]] || return 0 [[ -f "${LOCAL_CAPTURE_LOG}" ]] || return 0 [[ -f "${LOCAL_CLOCK_ALIGNMENT_JSON}" ]] || return 0 python3 - <<'PY' \ "${LOCAL_SERVER_TIMELINE_JSON}" \ "${LOCAL_CAPTURE_LOG}" \ "${LOCAL_CLOCK_ALIGNMENT_JSON}" \ "${ANALYSIS_TIMELINE_WINDOW_PADDING_SECONDS}" || true import json import math import pathlib import sys timeline_path, capture_log_path, clock_path, padding_raw = sys.argv[1:] def as_int(value): try: return int(str(value).strip()) except Exception: return None def as_float(value, default): try: parsed = float(str(value).strip()) except Exception: return default return parsed if math.isfinite(parsed) else default def capture_start_ns(path): try: lines = pathlib.Path(path).read_text(errors="replace").splitlines() except Exception: return None for line in lines: if line.startswith("capture_start_unix_ns="): return as_int(line.split("=", 1)[1]) return None try: timeline = json.loads(pathlib.Path(timeline_path).read_text()) clock = json.loads(pathlib.Path(clock_path).read_text()) except Exception: raise SystemExit(0) capture_start = capture_start_ns(capture_log_path) offset = as_int(clock.get("theia_to_tethys_offset_ns")) if clock.get("available") else None if capture_start is None or offset is None: raise SystemExit(0) times = [] for event in timeline.get("events", []): for key in ("audio_push_unix_ns", "video_feed_unix_ns"): event_ns = as_int(event.get(key)) if event_ns is not None: times.append((event_ns + offset - capture_start) / 1_000_000_000.0) if not times: raise SystemExit(0) padding = max(0.0, as_float(padding_raw, 1.0)) start = max(0.0, min(times) - padding) end = max(times) + padding if end <= start: raise SystemExit(0) print(f"--analysis-window-s {start:.3f}:{end:.3f}") PY } write_output_delay_correlation() { [[ -f "${LOCAL_ANALYSIS_JSON}" ]] || return 0 [[ -f "${LOCAL_SERVER_TIMELINE_JSON}" ]] || return 0 echo "==> correlating Theia feed timing with Tethys observations" python3 - <<'PY' \ "${LOCAL_ANALYSIS_JSON}" \ "${LOCAL_SERVER_TIMELINE_JSON}" \ "${LOCAL_OUTPUT_DELAY_CORRELATION_JSON}" \ "${LOCAL_OUTPUT_DELAY_CORRELATION_CSV}" \ "${LOCAL_OUTPUT_DELAY_CORRELATION_TXT}" \ "${LOCAL_CAPTURE}" \ "${LOCAL_CAPTURE_LOG}" \ "${LOCAL_CLOCK_ALIGNMENT_JSON}" \ "${LESAVKA_OUTPUT_FRESHNESS_MAX_AGE_MS}" \ "${LESAVKA_OUTPUT_FRESHNESS_MAX_DRIFT_MS}" \ "${LESAVKA_OUTPUT_FRESHNESS_MAX_CLOCK_UNCERTAINTY_MS}" \ "${LESAVKA_OUTPUT_FRESHNESS_MIN_PAIRS}" \ "${CAPTURE_SECONDS}" \ "${CAPTURE_START_GRACE_SECONDS}" import csv import json import math import pathlib import statistics import struct import subprocess import sys ( report_path, timeline_path, output_json_path, output_csv_path, output_txt_path, capture_path, capture_log_path, clock_alignment_path, max_freshness_age_raw, max_freshness_drift_raw, max_clock_uncertainty_raw, min_freshness_pairs_raw, capture_seconds_raw, capture_start_grace_seconds_raw, ) = sys.argv[1:] report = json.loads(pathlib.Path(report_path).read_text()) timeline = json.loads(pathlib.Path(timeline_path).read_text()) server_event_list = timeline.get("events", []) server_events = {int(event["event_id"]): event for event in server_event_list} def finite(value): try: result = float(value) except Exception: return None return result if math.isfinite(result) else None def as_float(value, default): try: result = float(str(value).strip()) except Exception: return default return result if math.isfinite(result) else default def as_int_or_none(value): try: return int(str(value).strip()) except Exception: return None server_events_by_code = {} for event in server_event_list: code = as_int_or_none(event.get("code")) if code is None: continue server_events_by_code.setdefault(code, []).append(event) unique_server_events_by_code = { code: events[0] for code, events in server_events_by_code.items() if len(events) == 1 } def load_json_or_empty(path): try: return json.loads(pathlib.Path(path).read_text()) except Exception: return {} def run_json(command, description): try: output = subprocess.check_output(command, stderr=subprocess.PIPE) except Exception as error: return {"available": False, "error": f"{description} failed: {error}"} try: return json.loads(output) except Exception as error: return {"available": False, "error": f"{description} JSON parse failed: {error}"} def run_bytes(command, description): try: return subprocess.check_output(command, stderr=subprocess.PIPE) except Exception as error: raise RuntimeError(f"{description} failed: {error}") from error def parse_capture_start_unix_ns(path): try: lines = pathlib.Path(path).read_text(errors="replace").splitlines() except Exception: return None for line in lines: if line.startswith("capture_start_unix_ns="): return as_int_or_none(line.split("=", 1)[1]) return None def percentile(values, pct): values = sorted(value for value in values if value is not None and math.isfinite(value)) if not values: return None if len(values) == 1: return values[0] rank = (len(values) - 1) * (pct / 100.0) lower = math.floor(rank) upper = math.ceil(rank) if lower == upper: return values[int(rank)] fraction = rank - lower return values[lower] * (1.0 - fraction) + values[upper] * fraction def numeric_stats(values, suffix=""): values = [value for value in values if value is not None and math.isfinite(value)] if not values: return { "available": False, "count": 0, f"median{suffix}": None, f"p95{suffix}": None, f"max{suffix}": None, } return { "available": True, "count": len(values), f"median{suffix}": percentile(values, 50.0), f"p95{suffix}": percentile(values, 95.0), f"max{suffix}": max(values), } def stats(rows, key): values = [row.get(key) for row in rows if row.get(key) is not None] values = [value for value in values if math.isfinite(value)] if not values: return { "available": False, "count": 0, "first_ms": None, "last_ms": None, "mean_ms": None, "min_ms": None, "median_ms": None, "p95_ms": None, "max_ms": None, } return { "available": True, "count": len(values), "first_ms": values[0], "last_ms": values[-1], "mean_ms": sum(values) / len(values), "min_ms": min(values), "median_ms": percentile(values, 50.0), "p95_ms": percentile(values, 95.0), "max_ms": max(values), } def shift_stats(base, delta_ms): shifted = dict(base) if not shifted.get("available"): return shifted for key in ["first_ms", "last_ms", "mean_ms", "min_ms", "median_ms", "p95_ms", "max_ms"]: value = shifted.get(key) shifted[key] = value + delta_ms if value is not None else None shifted["intentional_delay_ms"] = delta_ms return shifted def fit_linear(rows, key): points = [(row["event_time_s"], row[key]) for row in rows if row.get(key) is not None] if len(points) < 2: return { "available": False, "intercept_ms": 0.0, "slope_ms_per_s": 0.0, "r2": 0.0, "drift_ms": 0.0, } xs = [point[0] for point in points] ys = [point[1] for point in points] mean_x = sum(xs) / len(xs) mean_y = sum(ys) / len(ys) denom = sum((x - mean_x) ** 2 for x in xs) slope = 0.0 if denom == 0 else sum((x - mean_x) * (y - mean_y) for x, y in points) / denom intercept = mean_y - slope * mean_x predicted = [intercept + slope * x for x in xs] ss_tot = sum((y - mean_y) ** 2 for y in ys) ss_res = sum((y - y_hat) ** 2 for y, y_hat in zip(ys, predicted)) r2 = 1.0 if ss_tot == 0.0 else max(0.0, min(1.0, 1.0 - (ss_res / ss_tot))) drift = (intercept + slope * xs[-1]) - (intercept + slope * xs[0]) return { "available": True, "intercept_ms": intercept, "slope_ms_per_s": slope, "r2": r2, "drift_ms": drift, "first_fit_ms": intercept + slope * xs[0], "last_fit_ms": intercept + slope * xs[-1], } def correlation(rows, left_key, right_key): pairs = [ (row[left_key], row[right_key]) for row in rows if row.get(left_key) is not None and row.get(right_key) is not None ] if len(pairs) < 2: return 0.0 xs = [pair[0] for pair in pairs] ys = [pair[1] for pair in pairs] mean_x = sum(xs) / len(xs) mean_y = sum(ys) / len(ys) denom_x = sum((x - mean_x) ** 2 for x in xs) denom_y = sum((y - mean_y) ** 2 for y in ys) if denom_x <= 0.0 or denom_y <= 0.0: return 0.0 return sum((x - mean_x) * (y - mean_y) for x, y in pairs) / math.sqrt(denom_x * denom_y) def ffprobe_frame_timestamps(path): data = run_json( [ "ffprobe", "-hide_banner", "-loglevel", "error", "-select_streams", "v:0", "-show_frames", "-show_entries", "frame=best_effort_timestamp_time", "-of", "json", path, ], "ffprobe video frames", ) return [ float(frame["best_effort_timestamp_time"]) for frame in data.get("frames", []) if frame.get("best_effort_timestamp_time") not in (None, "N/A") ] def ffprobe_packet_times(path, stream): data = run_json( [ "ffprobe", "-hide_banner", "-loglevel", "error", "-select_streams", stream, "-show_packets", "-show_entries", "packet=pts_time,duration_time", "-of", "json", path, ], f"ffprobe {stream} packets", ) packets = [] for packet in data.get("packets", []): try: pts = float(packet["pts_time"]) except Exception: continue duration = None try: duration = float(packet.get("duration_time")) except Exception: pass packets.append((pts, duration)) return packets def continuity_frame_ids(path): width = 320 height = 32 blocks = 20 try: raw = run_bytes( [ "ffmpeg", "-hide_banner", "-loglevel", "error", "-i", path, "-map", "0:v:0", "-vf", f"crop=iw:32:0:ih-32,scale={width}:{height}:flags=area,format=gray", "-f", "rawvideo", "-pix_fmt", "gray", "-", ], "ffmpeg video continuity extraction", ) except RuntimeError: return [] frame_pixels = width * height if not raw or len(raw) % frame_pixels != 0: return [] block_width = width // blocks ids = [] for offset in range(0, len(raw), frame_pixels): frame = raw[offset : offset + frame_pixels] averages = [] for block in range(blocks): x_start = block * block_width x_end = width if block + 1 == blocks else (block + 1) * block_width total = 0 count = 0 for y in range(height): row = y * width for x in range(x_start, x_end): total += frame[row + x] count += 1 averages.append(total / max(1, count)) white = averages[0] black = averages[1] if white < 150 or black > 105 or white - black < 70: ids.append(None) continue threshold = (white + black) / 2.0 value = 0 for block in range(2, 18): value = (value << 1) | int(averages[block] > threshold) parity = bool(averages[18] > threshold) inverse = bool(averages[19] > threshold) if parity == inverse or parity != bool(value.bit_count() & 1): ids.append(None) continue ids.append(value) return ids def sequence_smoothness(ids): decoded = [value for value in ids if value is not None] duplicates = 0 missing = 0 regressions = 0 jumps = [] previous = None for value in decoded: if previous is None: previous = value continue diff = (value - previous) & 0xFFFF jumps.append(diff) if diff == 0: duplicates += 1 elif 1 < diff < 32768: missing += diff - 1 elif diff >= 32768: regressions += 1 previous = value return { "decoded_frames": len(decoded), "undecodable_frames": len(ids) - len(decoded), "unique_frames": len(set(decoded)), "duplicate_frames": duplicates, "estimated_missing_frames": missing, "sequence_regressions": regressions, "largest_forward_jump": max(jumps) if jumps else 0, } def interval_smoothness(timestamps, expected_ms, window_start_s=None, window_end_s=None): if window_start_s is not None and window_end_s is not None: timestamps = [ts for ts in timestamps if window_start_s <= ts <= window_end_s] intervals = [ (right - left) * 1000.0 for left, right in zip(timestamps, timestamps[1:]) if right > left ] if (expected_ms is None or expected_ms <= 0.0) and intervals: expected_ms = statistics.median(intervals) jitter = [abs(value - expected_ms) for value in intervals] if expected_ms else [] hiccup_threshold = expected_ms * 1.5 if expected_ms else float("inf") short_threshold = expected_ms * 0.5 if expected_ms else -1.0 return { "timestamps": len(timestamps), "expected_interval_ms": expected_ms, "interval_stats": numeric_stats(intervals, "_interval_ms"), "jitter_stats": numeric_stats(jitter, "_jitter_ms"), "hiccup_count": sum(1 for value in intervals if value > hiccup_threshold), "short_interval_count": sum(1 for value in intervals if value < short_threshold), "estimated_missing_by_timestamp": sum( max(0, round(value / expected_ms) - 1) for value in intervals if expected_ms and value > hiccup_threshold ), } def audio_rms_smoothness(path, window_start_s, window_end_s): sample_rate = 48_000 window_ms = 10 samples_per_window = sample_rate * window_ms // 1000 try: raw = run_bytes( [ "ffmpeg", "-hide_banner", "-loglevel", "error", "-i", path, "-map", "0:a:0", "-ac", "1", "-ar", str(sample_rate), "-f", "s16le", "-acodec", "pcm_s16le", "-", ], "ffmpeg audio continuity extraction", ) except RuntimeError as error: return {"available": False, "error": str(error)} start_sample = max(0, int(window_start_s * sample_rate)) end_sample = min(len(raw) // 2, int(window_end_s * sample_rate)) if end_sample <= start_sample: return {"available": False, "error": "empty audio smoothness window"} sample_count = end_sample - start_sample samples = struct.unpack( f"<{sample_count}h", raw[start_sample * 2 : end_sample * 2], ) rms_values = [] for index in range(0, len(samples) - samples_per_window + 1, samples_per_window): chunk = samples[index : index + samples_per_window] rms_values.append(math.sqrt(sum(value * value for value in chunk) / len(chunk))) low_threshold = 90.0 return { "available": bool(rms_values), "window_ms": window_ms, "rms_stats": numeric_stats(rms_values, "_rms"), "low_rms_window_count": sum(1 for value in rms_values if value < low_threshold), "low_rms_threshold": low_threshold, } def analyze_smoothness(path, report, timeline): camera_fps = as_float(timeline.get("camera_fps"), 0.0) duration_s = as_float(timeline.get("duration_us"), 0.0) / 1_000_000.0 warmup_s = as_float(timeline.get("warmup_us"), 0.0) / 1_000_000.0 paired_times = [ value for event in report.get("paired_events", []) for value in [finite(event.get("video_time_s")), finite(event.get("audio_time_s"))] if value is not None ] if paired_times: window_start_s = max(0.0, min(paired_times) - warmup_s) window_end_s = window_start_s + duration_s else: window_start_s = None window_end_s = None video_timestamps = ffprobe_frame_timestamps(path) audio_packets = ffprobe_packet_times(path, "a:0") audio_packet_timestamps = [pts for pts, _duration in audio_packets] media_timeline = { "video_first_s": min(video_timestamps) if video_timestamps else None, "video_last_s": max(video_timestamps) if video_timestamps else None, "audio_first_s": min(audio_packet_timestamps) if audio_packet_timestamps else None, "audio_last_s": max(audio_packet_timestamps) if audio_packet_timestamps else None, } media_timeline["video_span_s"] = ( media_timeline["video_last_s"] - media_timeline["video_first_s"] if media_timeline["video_first_s"] is not None and media_timeline["video_last_s"] is not None else None ) media_timeline["audio_span_s"] = ( media_timeline["audio_last_s"] - media_timeline["audio_first_s"] if media_timeline["audio_first_s"] is not None and media_timeline["audio_last_s"] is not None else None ) media_timeline["audio_video_span_gap_s"] = ( media_timeline["video_span_s"] - media_timeline["audio_span_s"] if media_timeline["video_span_s"] is not None and media_timeline["audio_span_s"] is not None else None ) video_ids = continuity_frame_ids(path) if window_start_s is not None and window_end_s is not None and len(video_ids) == len(video_timestamps): video_ids = [ frame_id for frame_id, timestamp in zip(video_ids, video_timestamps) if window_start_s <= timestamp <= window_end_s ] expected_video_ms = 1000.0 / camera_fps if camera_fps > 0.0 else None audio_durations = [ duration * 1000.0 for _pts, duration in audio_packets if duration is not None and math.isfinite(duration) and duration > 0.0 ] expected_audio_ms = statistics.median(audio_durations) if audio_durations else None audio_window_end = window_end_s if window_end_s is not None else max(audio_packet_timestamps or [0.0]) return { "schema": "lesavka.output-smoothness-summary.v1", "scope": "captured RC target media cadence and continuity over the server-generated probe window", "window_start_s": window_start_s, "window_end_s": window_end_s, "media_timeline": media_timeline, "video": { **interval_smoothness(video_timestamps, expected_video_ms, window_start_s, window_end_s), **sequence_smoothness(video_ids), }, "audio": { "packet_cadence": interval_smoothness( audio_packet_timestamps, expected_audio_ms, window_start_s, window_end_s, ), "rms_continuity": audio_rms_smoothness(path, window_start_s or 0.0, audio_window_end), }, } capture_start_unix_ns = parse_capture_start_unix_ns(capture_log_path) clock_alignment = load_json_or_empty(clock_alignment_path) clock_alignment_available = bool(clock_alignment.get("available")) theia_to_tethys_offset_ns = ( as_int_or_none(clock_alignment.get("theia_to_tethys_offset_ns")) if clock_alignment_available else None ) clock_uncertainty_ms = as_float(clock_alignment.get("uncertainty_ms"), 0.0) audio_delay_ms = as_float(timeline.get("audio_delay_us"), 0.0) / 1000.0 video_delay_ms = as_float(timeline.get("video_delay_us"), 0.0) / 1000.0 def corrected_server_capture_time_s(server, key): server_unix_ns = as_int_or_none(server.get(key)) if ( server_unix_ns is None or theia_to_tethys_offset_ns is None or capture_start_unix_ns is None ): return None return (server_unix_ns + theia_to_tethys_offset_ns - capture_start_unix_ns) / 1_000_000_000.0 def corrected_capture_time_s_from_unix_ns(server_unix_ns): if ( server_unix_ns is None or theia_to_tethys_offset_ns is None or capture_start_unix_ns is None ): return None return (server_unix_ns + theia_to_tethys_offset_ns - capture_start_unix_ns) / 1_000_000_000.0 def shifted_unix_ns(unix_ns, delta_ms): if unix_ns is None: return None return unix_ns + int(round(delta_ms * 1_000_000.0)) def nearest_server_event_for_observation(observed, used_event_ids): observed_video_s = finite(observed.get("video_time_s")) observed_audio_s = finite(observed.get("audio_time_s")) if observed_video_s is None and observed_audio_s is None: return None, None, None best = None for server in server_event_list: event_id = as_int_or_none(server.get("event_id")) if event_id in used_event_ids: continue expected_video_s = corrected_server_capture_time_s(server, "video_feed_unix_ns") expected_audio_s = corrected_server_capture_time_s(server, "audio_push_unix_ns") score = 0.0 parts = 0 if observed_video_s is not None and expected_video_s is not None: score += abs(observed_video_s - expected_video_s) parts += 1 if observed_audio_s is not None and expected_audio_s is not None: score += abs(observed_audio_s - expected_audio_s) parts += 1 if parts == 0: continue normalized_score = score / parts if best is None or normalized_score < best[0]: best = (normalized_score, event_id, server) if best is None: return None, None, None return best[2], "nearest_clocked_time", best[0] * 1000.0 joined = [] used_server_event_ids = set() for observed in report.get("paired_events", []): observed_event_id = as_int_or_none(observed.get("event_id")) observed_server_event_id = as_int_or_none(observed.get("server_event_id")) observed_event_code = as_int_or_none(observed.get("event_code")) server = None match_method = None match_score_ms = None if observed_server_event_id is not None: server = server_events.get(observed_server_event_id) match_method = "analyzer_server_event_id" if server else None if server is None and observed_event_code is not None: server = unique_server_events_by_code.get(observed_event_code) match_method = "unique_event_code" if server else None if server is None: server, match_method, match_score_ms = nearest_server_event_for_observation( observed, used_server_event_ids, ) if server is None and observed_event_code is None and observed_event_id is not None: server = server_events.get(observed_event_id) match_method = "legacy_pair_event_id" if server else None if not server: continue event_id = int(server.get("event_id", -1)) used_server_event_ids.add(event_id) observed_skew_ms = finite(observed.get("skew_ms")) server_feed_delta_ms = finite(server.get("server_feed_delta_ms")) tethys_video_time_s = finite(observed.get("video_time_s")) tethys_audio_time_s = finite(observed.get("audio_time_s")) server_video_feed_s = finite(server.get("video_feed_monotonic_us")) server_audio_push_s = finite(server.get("audio_push_monotonic_us")) if server_video_feed_s is not None: server_video_feed_s /= 1_000_000.0 if server_audio_push_s is not None: server_audio_push_s /= 1_000_000.0 server_video_feed_unix_ns = as_int_or_none(server.get("video_feed_unix_ns")) server_audio_push_unix_ns = as_int_or_none(server.get("audio_push_unix_ns")) server_video_pipeline_unix_ns = shifted_unix_ns( server_video_feed_unix_ns, -video_delay_ms, ) server_audio_pipeline_unix_ns = shifted_unix_ns( server_audio_push_unix_ns, -audio_delay_ms, ) tethys_video_unix_ns = ( capture_start_unix_ns + int(round(tethys_video_time_s * 1_000_000_000.0)) if capture_start_unix_ns is not None and tethys_video_time_s is not None else None ) tethys_audio_unix_ns = ( capture_start_unix_ns + int(round(tethys_audio_time_s * 1_000_000_000.0)) if capture_start_unix_ns is not None and tethys_audio_time_s is not None else None ) corrected_server_video_feed_unix_ns = ( server_video_feed_unix_ns + theia_to_tethys_offset_ns if server_video_feed_unix_ns is not None and theia_to_tethys_offset_ns is not None else None ) corrected_server_audio_push_unix_ns = ( server_audio_push_unix_ns + theia_to_tethys_offset_ns if server_audio_push_unix_ns is not None and theia_to_tethys_offset_ns is not None else None ) corrected_server_video_pipeline_unix_ns = ( server_video_pipeline_unix_ns + theia_to_tethys_offset_ns if server_video_pipeline_unix_ns is not None and theia_to_tethys_offset_ns is not None else None ) corrected_server_audio_pipeline_unix_ns = ( server_audio_pipeline_unix_ns + theia_to_tethys_offset_ns if server_audio_pipeline_unix_ns is not None and theia_to_tethys_offset_ns is not None else None ) video_sink_handoff_overhead_ms = ( (tethys_video_unix_ns - corrected_server_video_feed_unix_ns) / 1_000_000.0 if tethys_video_unix_ns is not None and corrected_server_video_feed_unix_ns is not None else None ) audio_sink_handoff_overhead_ms = ( (tethys_audio_unix_ns - corrected_server_audio_push_unix_ns) / 1_000_000.0 if tethys_audio_unix_ns is not None and corrected_server_audio_push_unix_ns is not None else None ) video_freshness_ms = ( (tethys_video_unix_ns - corrected_server_video_pipeline_unix_ns) / 1_000_000.0 if tethys_video_unix_ns is not None and corrected_server_video_pipeline_unix_ns is not None else None ) audio_freshness_ms = ( (tethys_audio_unix_ns - corrected_server_audio_pipeline_unix_ns) / 1_000_000.0 if tethys_audio_unix_ns is not None and corrected_server_audio_pipeline_unix_ns is not None else None ) video_event_age_ms = video_freshness_ms audio_event_age_ms = audio_freshness_ms residual_path_skew_ms = ( observed_skew_ms - server_feed_delta_ms if observed_skew_ms is not None and server_feed_delta_ms is not None else None ) planned_start_us = int(server.get("planned_start_us", 0)) joined.append({ "event_id": event_id, "observed_event_id": observed_event_id, "observed_server_event_id": observed_server_event_id, "observed_event_code": observed_event_code, "match_method": match_method, "match_score_ms": match_score_ms, "code": int(server.get("code", 0)), "event_time_s": planned_start_us / 1_000_000.0, "planned_start_us": planned_start_us, "planned_end_us": int(server.get("planned_end_us", 0)), "tethys_video_time_s": tethys_video_time_s, "tethys_audio_time_s": tethys_audio_time_s, "observed_skew_ms": observed_skew_ms, "server_video_feed_monotonic_us": server.get("video_feed_monotonic_us"), "server_audio_push_monotonic_us": server.get("audio_push_monotonic_us"), "server_video_feed_unix_ns": server_video_feed_unix_ns, "server_audio_push_unix_ns": server_audio_push_unix_ns, "server_video_pipeline_unix_ns": server_video_pipeline_unix_ns, "server_audio_pipeline_unix_ns": server_audio_pipeline_unix_ns, "tethys_video_unix_ns": tethys_video_unix_ns, "tethys_audio_unix_ns": tethys_audio_unix_ns, "server_video_feed_s": server_video_feed_s, "server_audio_push_s": server_audio_push_s, "video_freshness_ms": video_freshness_ms, "audio_freshness_ms": audio_freshness_ms, "video_sink_handoff_overhead_ms": video_sink_handoff_overhead_ms, "audio_sink_handoff_overhead_ms": audio_sink_handoff_overhead_ms, "video_event_age_ms": video_event_age_ms, "audio_event_age_ms": audio_event_age_ms, "server_feed_delta_ms": server_feed_delta_ms, "residual_path_skew_ms": residual_path_skew_ms, "confidence": finite(observed.get("confidence")), }) first_event_time_s = joined[0]["event_time_s"] if joined else 0.0 for row in joined: row["relative_event_time_s"] = row["event_time_s"] - first_event_time_s row["event_time_s"] = row["relative_event_time_s"] observed_model = fit_linear(joined, "observed_skew_ms") server_model = fit_linear(joined, "server_feed_delta_ms") residual_model = fit_linear(joined, "residual_path_skew_ms") video_freshness_model = fit_linear(joined, "video_freshness_ms") audio_freshness_model = fit_linear(joined, "audio_freshness_ms") video_sink_handoff_model = fit_linear(joined, "video_sink_handoff_overhead_ms") audio_sink_handoff_model = fit_linear(joined, "audio_sink_handoff_overhead_ms") video_event_age_model = fit_linear(joined, "video_event_age_ms") audio_event_age_model = fit_linear(joined, "audio_event_age_ms") video_freshness_stats = stats(joined, "video_freshness_ms") audio_freshness_stats = stats(joined, "audio_freshness_ms") video_sink_handoff_stats = stats(joined, "video_sink_handoff_overhead_ms") audio_sink_handoff_stats = stats(joined, "audio_sink_handoff_overhead_ms") video_event_age_stats = dict(video_freshness_stats) audio_event_age_stats = dict(audio_freshness_stats) video_event_age_stats["intentional_delay_ms"] = video_delay_ms audio_event_age_stats["intentional_delay_ms"] = audio_delay_ms server_observed_correlation = correlation(joined, "server_feed_delta_ms", "observed_skew_ms") sync_verdict = report.get("verdict") or {} sync_passed = sync_verdict.get("passed") is True observed_drift = observed_model.get("drift_ms", 0.0) server_drift = server_model.get("drift_ms", 0.0) residual_drift = residual_model.get("drift_ms", 0.0) same_direction = observed_drift == 0.0 or (observed_drift > 0) == (server_drift > 0) server_share = 0.0 if abs(observed_drift) < 1e-6 else abs(server_drift) / abs(observed_drift) if same_direction and server_share >= 0.5 and abs(server_drift) >= 20.0: dominant_layer = "server_feed_timing" else: dominant_layer = "post_server_output_or_tethys_capture" correction_mode = ( "linear_function_candidate" if abs(residual_drift) >= 20.0 else "scalar_candidate" ) max_freshness_age_ms = max(1.0, as_float(max_freshness_age_raw, 1000.0)) max_freshness_drift_ms = max(0.0, as_float(max_freshness_drift_raw, 100.0)) max_clock_uncertainty_ms = max(0.0, as_float(max_clock_uncertainty_raw, 100.0)) min_freshness_pairs = max(1, as_int_or_none(min_freshness_pairs_raw) or 1) configured_capture_duration_s = as_float(capture_seconds_raw, 0.0) capture_start_grace_s = as_float(capture_start_grace_seconds_raw, 0.0) freshness_p95_values = [ value for value in [ video_freshness_stats.get("p95_ms"), audio_freshness_stats.get("p95_ms"), ] if value is not None ] event_age_p95_values = [ value for value in [ video_event_age_stats.get("p95_ms"), audio_event_age_stats.get("p95_ms"), ] if value is not None ] freshness_drift_values = [ abs(value) for value in [ video_event_age_model.get("drift_ms"), audio_event_age_model.get("drift_ms"), ] if value is not None and math.isfinite(value) ] freshness_worst_p95_ms = max(freshness_p95_values) if freshness_p95_values else None freshness_worst_event_p95_ms = max(event_age_p95_values) if event_age_p95_values else None freshness_worst_drift_ms = max(freshness_drift_values) if freshness_drift_values else None freshness_worst_event_with_uncertainty_ms = ( freshness_worst_event_p95_ms + clock_uncertainty_ms if freshness_worst_event_p95_ms is not None else None ) event_age_min_values = [ value for value in [ video_event_age_stats.get("min_ms"), audio_event_age_stats.get("min_ms"), ] if value is not None and math.isfinite(value) ] freshness_min_event_age_ms = min(event_age_min_values) if event_age_min_values else None smoothness = analyze_smoothness(capture_path, report, timeline) media_timeline = smoothness.get("media_timeline") or {} observed_capture_duration_values = [ value for value in [ finite(media_timeline.get("video_last_s")), finite(media_timeline.get("audio_last_s")), ] if value is not None ] observed_capture_duration_s = ( max(observed_capture_duration_values) if observed_capture_duration_values else None ) def negative_rows(rows, key, limit_ms): return [ row for row in rows if row.get(key) is not None and math.isfinite(row[key]) and row[key] < -limit_ms ] def non_monotonic_rows(rows, key): bad = [] previous = None for row in rows: value = finite(row.get(key)) if value is None: continue if previous is not None and value + 1e-9 < previous: bad.append(row) previous = value return bad video_negative_rows = negative_rows(joined, "video_freshness_ms", clock_uncertainty_ms) audio_negative_rows = negative_rows(joined, "audio_freshness_ms", clock_uncertainty_ms) monotonic_checks = { "server video pipeline": non_monotonic_rows(joined, "server_video_pipeline_unix_ns"), "server audio pipeline": non_monotonic_rows(joined, "server_audio_pipeline_unix_ns"), "Tethys video observation": non_monotonic_rows(joined, "tethys_video_time_s"), "Tethys audio observation": non_monotonic_rows(joined, "tethys_audio_time_s"), } non_monotonic_labels = [ f"{label} ({len(rows)} reversal{'s' if len(rows) != 1 else ''})" for label, rows in monotonic_checks.items() if rows ] capture_timebase_status = "valid" capture_timebase_reason = "Tethys observation timestamps are monotonic against server pipeline references" capture_timebase_reasons = [] capture_timebase_warnings = [] if not joined: capture_timebase_status = "unknown" capture_timebase_reason = "no joined coded events are available" elif not clock_alignment_available: capture_timebase_status = "unknown" capture_timebase_reason = "server/capture clock alignment is unavailable" elif video_negative_rows or audio_negative_rows: capture_timebase_status = "invalid" if video_negative_rows: capture_timebase_reasons.append( f"video observed before server pipeline injection {len(video_negative_rows)} time(s)" ) if audio_negative_rows: capture_timebase_reasons.append( f"audio observed before server pipeline injection {len(audio_negative_rows)} time(s)" ) if non_monotonic_labels: capture_timebase_status = "invalid" capture_timebase_reasons.append( "non-monotonic event timestamps: " + ", ".join(non_monotonic_labels) ) span_gap_s = finite(media_timeline.get("audio_video_span_gap_s")) if span_gap_s is not None and abs(span_gap_s) > 1.0: relation = "shorter" if span_gap_s > 0 else "longer" capture_timebase_warnings.append( f"captured audio stream is {abs(span_gap_s):.3f}s {relation} than video; " "treating whole-file media span as recorder coverage, not event freshness" ) if capture_timebase_reasons: capture_timebase_reason = "; ".join(capture_timebase_reasons) server_start_capture_time_s = corrected_capture_time_s_from_unix_ns( as_int_or_none(timeline.get("server_start_unix_ns")) ) first_timing_row = joined[0] if joined else {} first_event_timing = { "event_id": first_timing_row.get("event_id"), "code": first_timing_row.get("code"), "server_audio_pipeline_capture_s": corrected_capture_time_s_from_unix_ns( first_timing_row.get("server_audio_pipeline_unix_ns") ), "server_video_pipeline_capture_s": corrected_capture_time_s_from_unix_ns( first_timing_row.get("server_video_pipeline_unix_ns") ), "server_audio_sink_push_capture_s": corrected_capture_time_s_from_unix_ns( first_timing_row.get("server_audio_push_unix_ns") ), "server_video_sink_feed_capture_s": corrected_capture_time_s_from_unix_ns( first_timing_row.get("server_video_feed_unix_ns") ), "tethys_audio_observed_s": first_timing_row.get("tethys_audio_time_s"), "tethys_video_observed_s": first_timing_row.get("tethys_video_time_s"), "audio_pipeline_freshness_ms": first_timing_row.get("audio_freshness_ms"), "video_pipeline_freshness_ms": first_timing_row.get("video_freshness_ms"), } def nearest_observation(expected_s, observations_s, tolerance_s): if expected_s is None: return None best = None for observed_s in observations_s: observed = finite(observed_s) if observed is None: continue delta_s = observed - expected_s if abs(delta_s) > tolerance_s: continue if best is None or abs(delta_s) < abs(best["delta_s"]): best = { "time_s": observed, "delta_ms": delta_s * 1000.0, "delta_s": delta_s, } return best def build_event_visibility(): video_onsets = [value for value in (finite(v) for v in report.get("video_onsets_s") or []) if value is not None] audio_onsets = [value for value in (finite(v) for v in report.get("audio_onsets_s") or []) if value is not None] raw_first_video = finite(report.get("raw_first_video_activity_s")) raw_first_audio = finite(report.get("raw_first_audio_activity_s")) pulse_period_s = max(0.001, as_float(timeline.get("pulse_period_ms"), 1000.0) / 1000.0) tolerance_s = max(0.20, min(0.45, pulse_period_s * 0.45)) paired_by_server_event = { int(row["event_id"]): row for row in joined if row.get("event_id") is not None } events = [] summary = { "paired": 0, "unpaired": 0, "video_only": 0, "audio_only": 0, "missing": 0, } for server in server_event_list: event_id = as_int_or_none(server.get("event_id")) if event_id is None: continue code = as_int_or_none(server.get("code")) video_pipeline_s = corrected_capture_time_s_from_unix_ns( shifted_unix_ns(as_int_or_none(server.get("video_feed_unix_ns")), -video_delay_ms) ) audio_pipeline_s = corrected_capture_time_s_from_unix_ns( shifted_unix_ns(as_int_or_none(server.get("audio_push_unix_ns")), -audio_delay_ms) ) video_match = nearest_observation(video_pipeline_s, video_onsets, tolerance_s) audio_match = nearest_observation(audio_pipeline_s, audio_onsets, tolerance_s) raw_video_match = ( video_match is None and raw_first_video is not None and video_pipeline_s is not None and abs(raw_first_video - video_pipeline_s) <= tolerance_s ) raw_audio_match = ( audio_match is None and raw_first_audio is not None and audio_pipeline_s is not None and abs(raw_first_audio - audio_pipeline_s) <= tolerance_s ) paired = paired_by_server_event.get(event_id) video_seen = video_match is not None or raw_video_match audio_seen = audio_match is not None or raw_audio_match if paired is not None: status = "paired" elif video_seen and audio_seen: status = "unpaired" elif video_seen: status = "video_only" elif audio_seen: status = "audio_only" else: status = "missing" summary[status] += 1 events.append({ "event_id": event_id, "code": code, "status": status, "paired": paired is not None, "expected_video_pipeline_s": video_pipeline_s, "expected_audio_pipeline_s": audio_pipeline_s, "video_seen": video_seen, "audio_seen": audio_seen, "video_source": "coded_onset" if video_match else ("raw_activity" if raw_video_match else None), "audio_source": "coded_onset" if audio_match else ("raw_activity" if raw_audio_match else None), "nearest_video_time_s": (video_match or {}).get("time_s"), "nearest_audio_time_s": (audio_match or {}).get("time_s"), "nearest_video_delta_ms": (video_match or {}).get("delta_ms"), "nearest_audio_delta_ms": (audio_match or {}).get("delta_ms"), "paired_video_time_s": paired.get("tethys_video_time_s") if paired else None, "paired_audio_time_s": paired.get("tethys_audio_time_s") if paired else None, "paired_skew_ms": paired.get("observed_skew_ms") if paired else None, "confidence": paired.get("confidence") if paired else None, }) return { "schema": "lesavka.coded-event-visibility.v1", "scope": "server event slots classified against Tethys coded onsets and raw first activity", "match_tolerance_s": tolerance_s, "summary": summary, "events": events, } event_visibility = build_event_visibility() timing_map = { "schema": "lesavka.output-timing-map.v1", "scope": "capture-relative map of server pipeline reference, sink handoff, and Tethys observation times", "injection_scope": timeline.get("injection_scope", "server-final-output-handoff"), "server_pipeline_reference": timeline.get( "server_pipeline_reference", "generated media PTS before intentional sync delay", ), "sink_handoff_path": timeline.get( "sink_handoff_path", "video CameraRelay::feed; audio Voice::push", ), "client_uplink_included": bool(timeline.get("client_uplink_included", False)), "freshness_clock_starts_at": "server pipeline reference before intentional sync delay", "capture_start_unix_ns": capture_start_unix_ns, "server_start_unix_ns": as_int_or_none(timeline.get("server_start_unix_ns")), "server_start_capture_time_s": server_start_capture_time_s, "capture_to_server_start_ms": ( server_start_capture_time_s * 1000.0 if server_start_capture_time_s is not None else None ), "probe_warmup_ms": as_float(timeline.get("warmup_us"), 0.0) / 1000.0, "probe_duration_ms": as_float(timeline.get("duration_us"), 0.0) / 1000.0, "active_probe_media_ms": ( as_float(timeline.get("warmup_us"), 0.0) + as_float(timeline.get("duration_us"), 0.0) ) / 1000.0, "configured_capture_duration_s": configured_capture_duration_s, "observed_capture_duration_s": observed_capture_duration_s, "capture_start_grace_s": capture_start_grace_s, "intentional_audio_delay_ms": audio_delay_ms, "intentional_video_delay_ms": video_delay_ms, "first_event": first_event_timing, } if not event_age_p95_values: freshness_status = "invalid" freshness_reason = "clock-aligned server pipeline references and Tethys observations were not available" elif len(joined) < min_freshness_pairs: freshness_status = "invalid" freshness_reason = ( f"paired coded events {len(joined)} < {min_freshness_pairs}; " "freshness requires enough matched injected/observed events" ) elif not clock_alignment_available or clock_uncertainty_ms > max_clock_uncertainty_ms: freshness_status = "invalid" freshness_reason = ( "server/capture clock alignment unavailable or too uncertain: " f"{clock_uncertainty_ms:.1f} ms exceeds {max_clock_uncertainty_ms:.1f} ms limit" ) elif capture_timebase_status == "invalid": freshness_status = "invalid" freshness_reason = f"capture timebase invalid: {capture_timebase_reason}" elif freshness_min_event_age_ms is not None and freshness_min_event_age_ms < -clock_uncertainty_ms: freshness_status = "invalid" freshness_reason = ( f"one media stream has impossible negative pipeline freshness beyond clock uncertainty: " f"minimum freshness {freshness_min_event_age_ms:.1f} ms, uncertainty " f"{clock_uncertainty_ms:.1f} ms" ) elif freshness_worst_event_p95_ms < -clock_uncertainty_ms: freshness_status = "invalid" freshness_reason = ( f"freshness was negative beyond clock uncertainty: " f"worst pipeline p95 {freshness_worst_event_p95_ms:.1f} ms, uncertainty " f"{clock_uncertainty_ms:.1f} ms" ) elif not sync_passed: freshness_status = "unknown" freshness_reason = ( "sync did not pass, so freshness from paired signatures is not trustworthy: " f"{sync_verdict.get('status', 'unknown')} - {sync_verdict.get('reason', '')}" ) elif freshness_worst_event_with_uncertainty_ms <= max_freshness_age_ms and ( freshness_worst_drift_ms is None or freshness_worst_drift_ms <= max_freshness_drift_ms ): freshness_status = "pass" freshness_reason = ( f"worst pipeline p95 {freshness_worst_event_p95_ms:.1f} ms + clock uncertainty " f"{clock_uncertainty_ms:.1f} ms <= {max_freshness_age_ms:.1f} ms and worst freshness drift " f"{(freshness_worst_drift_ms or 0.0):.1f} ms <= {max_freshness_drift_ms:.1f} ms" ) else: freshness_status = "fail" freshness_reason = ( f"worst pipeline p95 " f"{freshness_worst_event_p95_ms if freshness_worst_event_p95_ms is not None else 0.0:.1f} ms " f"+ clock uncertainty {clock_uncertainty_ms:.1f} ms " f"(limit {max_freshness_age_ms:.1f} ms), worst freshness drift " f"{freshness_worst_drift_ms if freshness_worst_drift_ms is not None else 0.0:.1f} ms " f"(limit {max_freshness_drift_ms:.1f} ms)" ) artifact = { "schema": "lesavka.output-delay-correlation.v1", "report_json": report_path, "server_timeline_json": timeline_path, "joined_event_count": len(joined), "audio_after_video_positive": True, "timing_map": timing_map, "observed_skew_model": observed_model, "server_feed_delta_model": server_model, "residual_path_skew_model": residual_model, "freshness": { "schema": "lesavka.output-freshness-summary.v1", "status": freshness_status, "reason": freshness_reason, "scope": "clock-corrected server pipeline reference to Tethys observed playback from the same paired signatures", "capture_start_unix_ns": capture_start_unix_ns, "capture_timebase": { "status": capture_timebase_status, "reason": capture_timebase_reason, "valid": capture_timebase_status == "valid", "warnings": capture_timebase_warnings, "video_observed_before_injection_count": len(video_negative_rows), "audio_observed_before_injection_count": len(audio_negative_rows), "non_monotonic_event_timestamps": non_monotonic_labels, }, "clock_alignment": clock_alignment, "clock_uncertainty_ms": clock_uncertainty_ms, "max_age_limit_ms": max_freshness_age_ms, "max_drift_limit_ms": max_freshness_drift_ms, "max_clock_uncertainty_ms": max_clock_uncertainty_ms, "min_paired_events": min_freshness_pairs, "paired_event_count": len(joined), "intentional_audio_delay_ms": audio_delay_ms, "intentional_video_delay_ms": video_delay_ms, "worst_p95_sink_handoff_overhead_ms": max( [ value for value in [ video_sink_handoff_stats.get("p95_ms"), audio_sink_handoff_stats.get("p95_ms"), ] if value is not None ], default=None, ), "worst_p95_pipeline_freshness_ms": freshness_worst_p95_ms, "worst_event_age_p95_ms": freshness_worst_event_p95_ms, "worst_event_age_with_uncertainty_ms": freshness_worst_event_with_uncertainty_ms, "minimum_event_age_ms": freshness_min_event_age_ms, "minimum_pipeline_freshness_ms": freshness_min_event_age_ms, "sync_passed": sync_passed, "worst_p95_freshness_ms": freshness_worst_event_p95_ms, "worst_freshness_drift_ms": freshness_worst_drift_ms, "video_freshness_stats": video_freshness_stats, "audio_freshness_stats": audio_freshness_stats, "video_sink_handoff_overhead_stats": video_sink_handoff_stats, "audio_sink_handoff_overhead_stats": audio_sink_handoff_stats, "video_event_age_stats": video_event_age_stats, "audio_event_age_stats": audio_event_age_stats, "video_freshness_model": video_freshness_model, "audio_freshness_model": audio_freshness_model, "video_sink_handoff_overhead_model": video_sink_handoff_model, "audio_sink_handoff_overhead_model": audio_sink_handoff_model, "video_event_age_model": video_event_age_model, "audio_event_age_model": audio_event_age_model, }, "smoothness": smoothness, "event_visibility": event_visibility, "server_observed_correlation": server_observed_correlation, "server_drift_share_of_observed": server_share, "dominant_layer": dominant_layer, "correction_mode": correction_mode, "video_delay_function_candidate": { "units": "microseconds", "end_to_end": { "intercept_us": round(observed_model.get("intercept_ms", 0.0) * 1000.0), "slope_us_per_s": round(observed_model.get("slope_ms_per_s", 0.0) * 1000.0), "formula": "video_delay_us(t) = intercept_us + slope_us_per_s * seconds_since_first_event", }, "output_path_only": { "intercept_us": round(residual_model.get("intercept_ms", 0.0) * 1000.0), "slope_us_per_s": round(residual_model.get("slope_ms_per_s", 0.0) * 1000.0), "formula": "video_delay_us(t) = intercept_us + slope_us_per_s * seconds_since_first_event", }, }, "events": joined, } pathlib.Path(output_json_path).write_text(json.dumps(artifact, indent=2, sort_keys=True) + "\n") with pathlib.Path(output_csv_path).open("w", newline="", encoding="utf-8") as handle: fieldnames = [ "event_id", "observed_event_id", "observed_server_event_id", "observed_event_code", "match_method", "match_score_ms", "code", "event_time_s", "tethys_video_time_s", "tethys_audio_time_s", "observed_skew_ms", "server_video_feed_s", "server_audio_push_s", "server_video_feed_unix_ns", "server_audio_push_unix_ns", "server_video_pipeline_unix_ns", "server_audio_pipeline_unix_ns", "tethys_video_unix_ns", "tethys_audio_unix_ns", "video_freshness_ms", "audio_freshness_ms", "video_sink_handoff_overhead_ms", "audio_sink_handoff_overhead_ms", "video_event_age_ms", "audio_event_age_ms", "server_feed_delta_ms", "residual_path_skew_ms", "server_video_feed_monotonic_us", "server_audio_push_monotonic_us", "confidence", ] writer = csv.DictWriter(handle, fieldnames=fieldnames) writer.writeheader() for row in joined: writer.writerow({key: row.get(key) for key in fieldnames}) def fmt_number(value, unit="", precision=1): if value is None: return "n/a" try: value = float(value) except Exception: return "n/a" if not math.isfinite(value): return "n/a" return f"{value:.{precision}f}{unit}" visibility_summary = event_visibility.get("summary") or {} visibility_lines = [ "Coded event visibility", f"- paired={visibility_summary.get('paired', 0)} unpaired={visibility_summary.get('unpaired', 0)} video_only={visibility_summary.get('video_only', 0)} audio_only={visibility_summary.get('audio_only', 0)} missing={visibility_summary.get('missing', 0)}", ] for event in event_visibility.get("events", []): status = event.get("status", "unknown") if status == "paired": detail = ( f"skew={fmt_number(event.get('paired_skew_ms'), ' ms')} " f"conf={fmt_number(event.get('confidence'), '', 3)}" ) else: video_source = event.get("video_source") or "not_seen" audio_source = event.get("audio_source") or "not_seen" detail = f"video={video_source} audio={audio_source}" visibility_lines.append( f"- code {event.get('code')} event {event.get('event_id')}: {status} ({detail})" ) lines = [ f"Output-delay correlation for {report_path}", f"- joined events: {len(joined)}", f"- dominant layer: {dominant_layer}", f"- correction mode: {correction_mode}", f"- observed skew model: {observed_model.get('intercept_ms', 0.0):+.3f} ms + {observed_model.get('slope_ms_per_s', 0.0):+.3f} ms/s * t", f"- server feed model: {server_model.get('intercept_ms', 0.0):+.3f} ms + {server_model.get('slope_ms_per_s', 0.0):+.3f} ms/s * t", f"- residual path model: {residual_model.get('intercept_ms', 0.0):+.3f} ms + {residual_model.get('slope_ms_per_s', 0.0):+.3f} ms/s * t", f"- server/observed correlation: {server_observed_correlation:+.3f}", f"- server drift share of observed: {server_share:.3f}", "", "Timing map", "- scope: capture-relative server pipeline reference, sink handoff, and Tethys observation", f"- injection point: {timing_map.get('injection_scope')} ({timing_map.get('sink_handoff_path')}); client uplink included={timing_map.get('client_uplink_included')}", f"- freshness clock starts at: {timing_map.get('freshness_clock_starts_at')}", f"- capture to server probe start: {fmt_number(timing_map.get('capture_to_server_start_ms'), ' ms')} (setup/pre-roll, not media latency)", f"- probe warmup before first coded event: {fmt_number(timing_map.get('probe_warmup_ms'), ' ms')}", f"- first event server pipeline refs: audio {fmt_number(first_event_timing.get('server_audio_pipeline_capture_s'), 's', 3)}, video {fmt_number(first_event_timing.get('server_video_pipeline_capture_s'), 's', 3)}", f"- first event sink handoff: audio {fmt_number(first_event_timing.get('server_audio_sink_push_capture_s'), 's', 3)}, video {fmt_number(first_event_timing.get('server_video_sink_feed_capture_s'), 's', 3)}", f"- first event Tethys observation: audio {fmt_number(first_event_timing.get('tethys_audio_observed_s'), 's', 3)}, video {fmt_number(first_event_timing.get('tethys_video_observed_s'), 's', 3)}", f"- first event pipeline freshness: audio {fmt_number(first_event_timing.get('audio_pipeline_freshness_ms'), ' ms')}, video {fmt_number(first_event_timing.get('video_pipeline_freshness_ms'), ' ms')}", "", "Freshness timeline", f"- capture duration: {fmt_number(observed_capture_duration_s if observed_capture_duration_s is not None else configured_capture_duration_s, 's', 3)} (recorder coverage, not media latency; configured {fmt_number(configured_capture_duration_s, 's', 3)})", f"- active probe media: {fmt_number(timing_map.get('active_probe_media_ms'), ' ms')} (warmup + coded events)", f"- setup before probe: {fmt_number(timing_map.get('capture_to_server_start_ms'), ' ms')} (capture readiness/tunnel/probe startup, not media latency)", f"- measured event freshness: video p95 {fmt_number(video_freshness_stats.get('p95_ms'), ' ms')}, audio p95 {fmt_number(audio_freshness_stats.get('p95_ms'), ' ms')}", f"- freshness verdict: {freshness_status} ({freshness_reason})", "", *visibility_lines, "", f"Output freshness for {report_path}", "- scope: clock-corrected server pipeline reference to Tethys observed playback from the same paired signatures", f"- freshness status: {freshness_status} ({freshness_reason})", f"- capture timebase: {capture_timebase_status} ({capture_timebase_reason})", *[f"- capture timebase warning: {warning}" for warning in capture_timebase_warnings], f"- clock uncertainty: +/-{clock_uncertainty_ms:.1f} ms", f"- intentional sync delays: audio {audio_delay_ms:+.1f} ms, video {video_delay_ms:+.1f} ms", f"- sink handoff overhead: video median {fmt_number(video_sink_handoff_stats.get('median_ms'), ' ms')} / p95 {fmt_number(video_sink_handoff_stats.get('p95_ms'), ' ms')} / max {fmt_number(video_sink_handoff_stats.get('max_ms'), ' ms')}; audio median {fmt_number(audio_sink_handoff_stats.get('median_ms'), ' ms')} / p95 {fmt_number(audio_sink_handoff_stats.get('p95_ms'), ' ms')} / max {fmt_number(audio_sink_handoff_stats.get('max_ms'), ' ms')}", f"- pipeline freshness: video median {fmt_number(video_freshness_stats.get('median_ms'), ' ms')} / p95 {fmt_number(video_freshness_stats.get('p95_ms'), ' ms')} / max {fmt_number(video_freshness_stats.get('max_ms'), ' ms')}; audio median {fmt_number(audio_freshness_stats.get('median_ms'), ' ms')} / p95 {fmt_number(audio_freshness_stats.get('p95_ms'), ' ms')} / max {fmt_number(audio_freshness_stats.get('max_ms'), ' ms')}", f"- freshness budget: worst pipeline p95 {fmt_number(freshness_worst_event_p95_ms, ' ms')} + clock uncertainty {clock_uncertainty_ms:.1f} ms = {fmt_number(freshness_worst_event_with_uncertainty_ms, ' ms')} vs limit {max_freshness_age_ms:.1f} ms", f"- video freshness drift: {video_event_age_model.get('drift_ms', 0.0):+.1f} ms over paired events ({video_event_age_model.get('slope_ms_per_s', 0.0):+.3f} ms/s)", f"- audio freshness drift: {audio_event_age_model.get('drift_ms', 0.0):+.1f} ms over paired events ({audio_event_age_model.get('slope_ms_per_s', 0.0):+.3f} ms/s)", "", "Output smoothness", f"- window: {smoothness.get('window_start_s') or 0.0:.3f}s to {smoothness.get('window_end_s') or 0.0:.3f}s", f"- capture media timeline: video span {fmt_number(media_timeline.get('video_span_s'), 's', 3)}, audio span {fmt_number(media_timeline.get('audio_span_s'), 's', 3)}, audio/video span gap {fmt_number(media_timeline.get('audio_video_span_gap_s'), 's', 3)}", f"- video cadence: frames {smoothness.get('video', {}).get('timestamps', 0)}, expected interval {(smoothness.get('video', {}).get('expected_interval_ms') or 0.0):.1f} ms, p95 jitter {(smoothness.get('video', {}).get('jitter_stats', {}).get('p95_jitter_ms') or 0.0):.1f} ms, max interval {(smoothness.get('video', {}).get('interval_stats', {}).get('max_interval_ms') or 0.0):.1f} ms, hiccups {smoothness.get('video', {}).get('hiccup_count', 0)}", f"- video continuity: decoded {smoothness.get('video', {}).get('decoded_frames', 0)}, duplicates {smoothness.get('video', {}).get('duplicate_frames', 0)}, estimated missing {smoothness.get('video', {}).get('estimated_missing_frames', 0)}, undecodable {smoothness.get('video', {}).get('undecodable_frames', 0)}", f"- audio packet cadence: packets {smoothness.get('audio', {}).get('packet_cadence', {}).get('timestamps', 0)}, p95 jitter {(smoothness.get('audio', {}).get('packet_cadence', {}).get('jitter_stats', {}).get('p95_jitter_ms') or 0.0):.1f} ms, max interval {(smoothness.get('audio', {}).get('packet_cadence', {}).get('interval_stats', {}).get('max_interval_ms') or 0.0):.1f} ms, hiccups {smoothness.get('audio', {}).get('packet_cadence', {}).get('hiccup_count', 0)}", f"- audio pilot continuity: low-RMS windows {smoothness.get('audio', {}).get('rms_continuity', {}).get('low_rms_window_count', 0)}, median RMS {(smoothness.get('audio', {}).get('rms_continuity', {}).get('rms_stats', {}).get('median_rms') or 0.0):.1f}", ] summary = "\n".join(lines) + "\n" pathlib.Path(output_txt_path).write_text(summary) print(summary, end="") PY } maybe_apply_output_delay_calibration() { [[ "${LESAVKA_OUTPUT_DELAY_CALIBRATION}" != "0" ]] || return 0 [[ -f "${LOCAL_OUTPUT_DELAY_ENV}" ]] || return 0 # shellcheck disable=SC1090 source "${LOCAL_OUTPUT_DELAY_ENV}" echo "==> UVC/UAC output-delay calibration decision" echo " ↪ output_delay_calibration_json=${LOCAL_OUTPUT_DELAY_JSON}" echo " ↪ output_delay_ready=${output_delay_ready:-false}" echo " ↪ output_delay_decision=${output_delay_decision:-unknown}" echo " ↪ output_delay_target=${output_delay_target:-unknown}" echo " ↪ output_delay_paired_event_count=${output_delay_paired_event_count:-0}" echo " ↪ output_delay_measured_skew_ms=${output_delay_measured_skew_ms:-0.0}" echo " ↪ output_delay_drift_ms=${output_delay_drift_ms:-0.0}" echo " ↪ output_delay_audio_delta_us=${output_delay_audio_delta_us:-0}" echo " ↪ output_delay_video_delta_us=${output_delay_video_delta_us:-0}" echo " ↪ output_delay_active_audio_offset_us=${output_delay_active_audio_offset_us:-0}" echo " ↪ output_delay_active_video_offset_us=${output_delay_active_video_offset_us:-0}" echo " ↪ output_delay_audio_target_offset_us=${output_delay_audio_target_offset_us:-0}" echo " ↪ output_delay_video_target_offset_us=${output_delay_video_target_offset_us:-0}" echo " ↪ output_delay_apply_mode=${output_delay_apply_mode:-${LESAVKA_OUTPUT_DELAY_APPLY_MODE}}" echo " ↪ output_delay_note=${output_delay_note:-}" if [[ "${output_delay_ready:-false}" != "true" ]]; then echo " ↪ output delay calibration apply refused: ${output_delay_note:-not ready}" return 0 fi if [[ "${LESAVKA_OUTPUT_DELAY_APPLY}" == "0" ]]; then echo " ↪ output delay calibration apply disabled" return 0 fi local apply_mode="${output_delay_apply_mode:-${LESAVKA_OUTPUT_DELAY_APPLY_MODE}}" local apply_audio_delta="${output_delay_audio_delta_us:-0}" local apply_video_delta="${output_delay_video_delta_us:-0}" if [[ "${apply_mode}" == "absolute" ]]; then local calibration_output current_audio_offset_us current_video_offset_us if ! calibration_output="$( LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \ "${REPO_ROOT}/target/debug/lesavka-relayctl" \ --server "${RESOLVED_LESAVKA_SERVER_ADDR}" \ calibration 2>&1 )"; then echo " ↪ output delay calibration apply refused: current calibration query failed" echo "${calibration_output}" >&2 return 0 fi current_audio_offset_us="$(awk -F= '/^calibration_active_audio_offset_us=/{print $2; exit}' <<<"${calibration_output}")" current_video_offset_us="$(awk -F= '/^calibration_active_video_offset_us=/{print $2; exit}' <<<"${calibration_output}")" if [[ ! "${current_audio_offset_us}" =~ ^-?[0-9]+$ || ! "${current_video_offset_us}" =~ ^-?[0-9]+$ ]]; then echo " ↪ output delay calibration apply refused: could not parse active calibration offsets" echo "${calibration_output}" >&2 return 0 fi apply_audio_delta=$(( ${output_delay_audio_target_offset_us:-0} - current_audio_offset_us )) apply_video_delta=$(( ${output_delay_video_target_offset_us:-0} - current_video_offset_us )) echo " ↪ current_active_audio_offset_us=${current_audio_offset_us}" echo " ↪ current_active_video_offset_us=${current_video_offset_us}" echo " ↪ absolute_target_audio_offset_us=${output_delay_audio_target_offset_us:-0}" echo " ↪ absolute_target_video_offset_us=${output_delay_video_target_offset_us:-0}" elif [[ "${apply_mode}" != "relative" ]]; then echo " ↪ output delay calibration apply refused: unsupported apply mode ${apply_mode}" return 0 fi echo " ↪ calibration_apply_audio_delta_us=${apply_audio_delta}" echo " ↪ calibration_apply_video_delta_us=${apply_video_delta}" echo "==> applying measured UVC/UAC output-delay calibration" LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \ "${REPO_ROOT}/target/debug/lesavka-relayctl" \ --server "${RESOLVED_LESAVKA_SERVER_ADDR}" \ calibrate \ "${apply_audio_delta}" \ "${apply_video_delta}" \ "${output_delay_note:-direct UVC/UAC output-delay calibration}" if [[ "${LESAVKA_OUTPUT_DELAY_SAVE}" != "0" ]]; then echo "==> saving measured UVC/UAC output-delay calibration as default" LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \ "${REPO_ROOT}/target/debug/lesavka-relayctl" \ --server "${RESOLVED_LESAVKA_SERVER_ADDR}" \ calibration-save-default fi } maybe_run_output_delay_confirmation() { [[ "${LESAVKA_OUTPUT_DELAY_CONFIRM}" != "0" ]] || return 0 [[ "${LESAVKA_OUTPUT_DELAY_APPLY}" != "0" ]] || return 0 [[ "${LESAVKA_OUTPUT_DELAY_CONFIRMING:-0}" != "1" ]] || return 0 [[ "${output_delay_ready:-false}" == "true" ]] || return 0 local confirm_audio_delay="${output_delay_audio_target_offset_us:-0}" local confirm_video_delay="${output_delay_video_target_offset_us:-0}" local confirm_output_dir="${LOCAL_REPORT_DIR}/confirmation" mkdir -p "${confirm_output_dir}" echo "==> confirming fixed UVC/UAC output-delay calibration" echo " ↪ confirmation_audio_delay_us=${confirm_audio_delay}" echo " ↪ confirmation_video_delay_us=${confirm_video_delay}" echo " ↪ confirmation_requires_sync_pass=1" LESAVKA_OUTPUT_DELAY_CONFIRMING=1 \ LESAVKA_OUTPUT_DELAY_CONFIRM=0 \ LESAVKA_OUTPUT_DELAY_APPLY=0 \ LESAVKA_OUTPUT_DELAY_SAVE=0 \ LESAVKA_OUTPUT_REQUIRE_SYNC_PASS=1 \ LESAVKA_OUTPUT_DELAY_PROBE_AUDIO_DELAY_US="${confirm_audio_delay}" \ LESAVKA_OUTPUT_DELAY_PROBE_VIDEO_DELAY_US="${confirm_video_delay}" \ PROBE_PREBUILD=0 \ LOCAL_OUTPUT_DIR="${confirm_output_dir}" \ "${SCRIPT_DIR}/run_upstream_av_sync.sh" } enforce_sync_verdict() { [[ "${LESAVKA_OUTPUT_REQUIRE_SYNC_PASS}" != "0" ]] || return 0 [[ -f "${LOCAL_ANALYSIS_JSON}" ]] || { echo "required sync pass unavailable: missing ${LOCAL_ANALYSIS_JSON}" >&2 exit 94 } python3 - <<'PY' "${LOCAL_ANALYSIS_JSON}" import json import pathlib import sys report = json.loads(pathlib.Path(sys.argv[1]).read_text()) verdict = report.get("verdict") or {} if verdict.get("passed") is True: raise SystemExit(0) print( "required sync pass failed: " f"{verdict.get('status', 'unknown')} - {verdict.get('reason', '')}", file=sys.stderr, ) raise SystemExit(94) PY } if [[ "${PROBE_PREBUILD}" != "0" ]]; then echo "==> prebuilding relay control/analyzer before opening the capture window" ( cd "${REPO_ROOT}" cargo build -p lesavka_client --bin lesavka-sync-analyze --bin lesavka-relayctl ) fi if [[ ! -x "${ANALYZE_BIN}" ]]; then echo "sync analyzer binary not found at ${ANALYZE_BIN}" >&2 exit 1 fi resolve_server_addr echo "==> resolved Lesavka server addr: ${RESOLVED_LESAVKA_SERVER_ADDR}" if [[ -n "${SERVER_TUNNEL_PID}" ]]; then echo " ↪ tunneled to ${LESAVKA_SERVER_HOST}:127.0.0.1:${SERVER_TUNNEL_REMOTE_PORT}" fi print_lesavka_versions preflight_server_path write_clock_alignment echo "==> starting Tethys capture on ${TETHYS_HOST}" ssh ${SSH_OPTS} "${TETHYS_HOST}" bash -s -- \ "${REMOTE_CAPTURE}" \ "${CAPTURE_SECONDS}" \ "${REMOTE_VIDEO_DEVICE}" \ "${VIDEO_SIZE}" \ "${VIDEO_FPS}" \ "${VIDEO_FORMAT}" \ "${REMOTE_CAPTURE_STACK}" \ "${REMOTE_PULSE_CAPTURE_TOOL}" \ "${REMOTE_PULSE_VIDEO_MODE}" \ "${REMOTE_PULSE_AUDIO_ANCHOR_SILENCE}" \ "${REMOTE_AUDIO_SOURCE}" \ "${REMOTE_AUDIO_QUIESCE_USER_AUDIO}" \ "${REMOTE_CAPTURE_ALLOW_ALSA_FALLBACK}" \ "${REMOTE_CAPTURE_PREROLL_DISCARD_SECONDS}" \ "${REMOTE_CAPTURE_READY_SETTLE_SECONDS}" \ > >(tee "${LOCAL_CAPTURE_LOG}") \ 2> >(tee -a "${LOCAL_CAPTURE_LOG}" >&2) <<'REMOTE_CAPTURE_SCRIPT' & set -euo pipefail remote_capture=$1 capture_seconds=$2 remote_video_device=$3 video_size=$4 video_fps=$5 video_format=$6 remote_capture_stack=$7 remote_pulse_capture_tool=$8 remote_pulse_video_mode=$9 remote_pulse_audio_anchor_silence=${10} remote_audio_source=${11} remote_audio_quiesce_user_audio=${12} remote_capture_allow_alsa_fallback=${13} remote_capture_preroll_discard_seconds=${14} remote_capture_ready_settle_seconds=${15} rm -f "${remote_capture}" restore_user_audio() { systemctl --user start pipewire.socket pipewire-pulse.socket wireplumber.service >/dev/null 2>&1 || true sleep 1 systemctl --user start pipewire.service pipewire-pulse.service >/dev/null 2>&1 || true } quiesce_user_audio() { systemctl --user stop pipewire-pulse.service pipewire.service wireplumber.service \ pipewire-pulse.socket pipewire.socket >/dev/null 2>&1 || true sleep 1 } resolve_video_device() { local requested=$1 if [[ "${requested}" != "auto" ]]; then printf '%s\n' "${requested}" return 0 fi local by_id by_id=$(find /dev/v4l/by-id -maxdepth 1 -type l -name '*Lesavka*video-index0' 2>/dev/null | head -n1 || true) if [[ -n "${by_id}" ]]; then printf '%s\n' "${by_id}" return 0 fi if command -v v4l2-ctl >/dev/null 2>&1; then local resolved resolved="$( v4l2-ctl --list-devices 2>/dev/null \ | awk ' BEGIN { want=0 } /Lesavka Composite: UVC Camera/ { want=1; next } /^[^ \t]/ { want=0 } want && /^[ \t]+\/dev\/video[0-9]+/ { gsub(/^[ \t]+/, "", $0) print exit } ' )" if [[ -n "${resolved}" ]]; then printf '%s\n' "${resolved}" return 0 fi fi printf 'Lesavka UVC video device not found on Tethys; refusing to fall back to an unrelated webcam/capture card.\n' >&2 exit 64 } resolve_pulse_source() { if ! command -v pactl >/dev/null 2>&1; then return 1 fi pactl list short sources 2>/dev/null \ | awk ' /alsa_input\..*Lesavka_Composite/ { print $2; found=1; exit } /Lesavka_Composite/ && !fallback { fallback=$2 } END { if (found) exit 0 if (fallback != "") { print fallback; exit 0 } exit 1 } ' } resolve_alsa_audio_device() { if ! command -v arecord >/dev/null 2>&1; then return 1 fi arecord -l 2>/dev/null | awk ' /^card [0-9]+:/ && ($0 ~ /Lesavka|UAC2_Gadget|UAC2Gadget|Composite/) { card=$2 sub(":", "", card) for (i = 1; i <= NF; i++) { if ($i == "device") { dev=$(i + 1) sub(":", "", dev) printf "hw:%s,%s\n", card, dev found=1 exit 0 } } } END { if (!found) exit 1 } ' } gst_video_source_caps() { case "${video_format}" in ""|mjpeg|MJPG) printf 'image/jpeg,width=%s,height=%s,framerate=%s/1' \ "${resolved_video_size%x*}" \ "${resolved_video_size#*x}" \ "${resolved_video_fps}" ;; yuyv422|YUYV|yuyv) printf 'video/x-raw,format=YUY2,width=%s,height=%s,framerate=%s/1' \ "${resolved_video_size%x*}" \ "${resolved_video_size#*x}" \ "${resolved_video_fps}" ;; *) printf 'unsupported gst video_format=%s\n' "${video_format}" >&2 exit 64 ;; esac } gst_video_decode_chain() { case "${video_format}" in ""|mjpeg|MJPG) printf 'jpegdec ! ' ;; yuyv422|YUYV|yuyv) printf '' ;; *) printf 'unsupported gst video_format=%s\n' "${video_format}" >&2 exit 64 ;; esac } gst_audio_mixer_element() { if gst-inspect-1.0 audiomixer 2>/dev/null | grep -q 'ignore-inactive-pads'; then printf 'audiomixer name=amix ignore-inactive-pads=true' else printf 'audiomixer name=amix' fi } current_video_profile() { if ! command -v v4l2-ctl >/dev/null 2>&1; then return 1 fi v4l2-ctl -d "${resolved_video_device}" --all 2>/dev/null \ | awk ' /Width\/Height[[:space:]]*:/ { split($0, a, ":") gsub(/^[ \t]+/, "", a[2]) split(a[2], wh, "/") width=wh[1] height=wh[2] next } /Frames per second:/ { fps=$4 sub(/\..*/, "", fps) } END { if (width != "" && height != "") { print "size=" width "x" height } if (fps != "") { print "fps=" fps } } ' } resolve_video_size() { local requested=$1 if [[ "${requested}" != "auto" ]]; then printf '%s\n' "${requested}" return 0 fi local current_profile current_profile="$(current_video_profile || true)" local current_size current_size="$(awk -F= '/^size=/{print $2; exit}' <<<"${current_profile}")" if [[ -n "${current_size}" ]]; then printf '%s\n' "${current_size}" return 0 fi if ! command -v v4l2-ctl >/dev/null 2>&1; then printf '640x480\n' return 0 fi local listing listing="$(v4l2-ctl -d "${resolved_video_device}" --list-formats-ext 2>/dev/null || true)" local preferred for preferred in 1920x1080 1360x768 1280x720; do if grep -q "Size: Discrete ${preferred}" <<<"${listing}"; then printf '%s\n' "${preferred}" return 0 fi done local first_size first_size="$(grep -m1 -o 'Size: Discrete [0-9]\+x[0-9]\+' <<<"${listing}" | awk '{print $3}' || true)" if [[ -n "${first_size}" ]]; then printf '%s\n' "${first_size}" return 0 fi printf '640x480\n' } resolve_video_fps() { local requested=$1 if [[ "${requested}" != "auto" ]]; then printf '%s\n' "${requested}" return 0 fi local current_profile current_profile="$(current_video_profile || true)" local current_fps current_fps="$(awk -F= '/^fps=/{print $2; exit}' <<<"${current_profile}")" if [[ -n "${current_fps}" ]]; then printf '%s\n' "${current_fps}" return 0 fi if ! command -v v4l2-ctl >/dev/null 2>&1; then printf '20\n' return 0 fi local listing listing="$(v4l2-ctl -d "${resolved_video_device}" --list-formats-ext 2>/dev/null || true)" local first_fps first_fps="$(grep -m1 -o '[0-9]\+\.[0-9]\+ fps' <<<"${listing}" | awk '{sub(/\..*/, "", $1); print $1}' || true)" if [[ -n "${first_fps}" ]]; then printf '%s\n' "${first_fps}" return 0 fi printf '20\n' } resolve_pw_audio_target() { if ! command -v pw-dump >/dev/null 2>&1 || ! command -v python3 >/dev/null 2>&1; then return 1 fi pw-dump | python3 -c ' import json import sys try: objs = json.load(sys.stdin) except Exception: raise SystemExit(1) for obj in objs: if obj.get("type") != "PipeWire:Interface:Node": continue props = (obj.get("info") or {}).get("props") or {} if props.get("media.class") != "Audio/Source": continue serial = props.get("object.serial") name = props.get("node.name", "") desc = props.get("node.description", "") if serial is None: continue if "Lesavka_Composite" in name or "Lesavka Composite" in desc: print(serial) raise SystemExit(0) raise SystemExit(1) ' } capture_mode="" alsa_audio_dev="hw:3,0" pulse_source="" pw_audio_target="" case "${remote_capture_stack}" in auto) if [[ "${remote_audio_source}" == "auto" ]]; then if pulse_source="$(resolve_pulse_source)"; then capture_mode="pulse" elif command -v pw-record >/dev/null 2>&1 \ && command -v pw-v4l2 >/dev/null 2>&1 \ && pw_audio_target="$(resolve_pw_audio_target)"; then capture_mode="pwpipe" elif [[ "${remote_capture_allow_alsa_fallback}" == "1" ]] && alsa_audio_dev="$(resolve_alsa_audio_device)"; then capture_mode="alsa" printf 'PipeWire Lesavka source not found; using explicit diagnostic ALSA fallback device %s\n' "${alsa_audio_dev}" >&2 else printf 'Lesavka Pulse/PipeWire audio source not found; refusing raw ALSA fallback for timing-sensitive capture.\n' >&2 printf 'Set REMOTE_CAPTURE_STACK=alsa or REMOTE_CAPTURE_ALLOW_ALSA_FALLBACK=1 only for diagnostic signal-presence checks.\n' >&2 exit 64 fi elif [[ "${remote_audio_source}" == pulse:* ]]; then capture_mode="pulse" pulse_source="${remote_audio_source#pulse:}" elif [[ "${remote_audio_source}" == alsa:* ]]; then capture_mode="alsa" alsa_audio_dev="${remote_audio_source#alsa:}" else printf 'unsupported REMOTE_AUDIO_SOURCE=%s\n' "${remote_audio_source}" >&2 exit 64 fi ;; pwpipe) if ! command -v pw-record >/dev/null 2>&1 || ! command -v pw-v4l2 >/dev/null 2>&1; then printf 'REMOTE_CAPTURE_STACK=pwpipe requires pw-record and pw-v4l2\n' >&2 exit 64 fi pw_audio_target="$(resolve_pw_audio_target)" || { printf 'PipeWire Lesavka capture target not found for REMOTE_CAPTURE_STACK=pwpipe\n' >&2 exit 64 } capture_mode="pwpipe" ;; pulse) if [[ "${remote_audio_source}" == pulse:* ]]; then pulse_source="${remote_audio_source#pulse:}" elif [[ "${remote_audio_source}" == "auto" ]]; then pulse_source="$(resolve_pulse_source)" || { printf 'PipeWire Lesavka source not found for REMOTE_CAPTURE_STACK=pulse\n' >&2 exit 64 } else pulse_source="${remote_audio_source}" fi capture_mode="pulse" ;; alsa) if [[ "${remote_audio_source}" == alsa:* ]]; then alsa_audio_dev="${remote_audio_source#alsa:}" elif [[ "${remote_audio_source}" != "auto" ]]; then alsa_audio_dev="${remote_audio_source}" fi capture_mode="alsa" ;; *) printf 'unsupported REMOTE_CAPTURE_STACK=%s\n' "${remote_capture_stack}" >&2 exit 64 ;; esac resolved_video_device="$(resolve_video_device "${remote_video_device}")" resolved_video_size="$(resolve_video_size "${video_size}")" resolved_video_fps="$(resolve_video_fps "${video_fps}")" printf 'using video device: %s\n' "${resolved_video_device}" >&2 printf 'using video mode: %s @ %s fps (%s)\n' "${resolved_video_size}" "${resolved_video_fps}" "${video_format:-driver-default}" >&2 video_args=(-f video4linux2 -framerate "${resolved_video_fps}" -video_size "${resolved_video_size}") if [[ -n "${video_format}" ]]; then video_args+=(-input_format "${video_format}") fi gst_source_caps="$(gst_video_source_caps)" gst_decode_chain="$(gst_video_decode_chain)" gst_audio_mixer="$(gst_audio_mixer_element)" run_ffmpeg_capture() { local rc=0 announce_capture_start timeout --kill-after=5 --signal=INT "$((capture_seconds + 5))" "$@" &2 } signal_capture_ready() { if [[ "${remote_capture_ready_settle_seconds}" =~ ^[0-9]+([.][0-9]+)?$ ]]; then sleep "${remote_capture_ready_settle_seconds}" fi printf '%s\n' "__LESAVKA_CAPTURE_READY__" } run_tolerant_capture() { announce_capture_start "$@" & local capture_pid=$! signal_capture_ready wait "${capture_pid}" || true } quiesce_for_alsa=0 case "${remote_audio_quiesce_user_audio}" in 1|true|yes) quiesce_for_alsa=1 ;; auto) if [[ "${capture_mode}" == "alsa" ]]; then quiesce_for_alsa=1 fi ;; 0|false|no) quiesce_for_alsa=0 ;; *) printf 'unsupported REMOTE_AUDIO_QUIESCE_USER_AUDIO=%s\n' "${remote_audio_quiesce_user_audio}" >&2 exit 64 ;; esac if [[ "${capture_mode}" == "alsa" && "${quiesce_for_alsa}" == "1" ]]; then printf 'quiescing Tethys user audio before raw ALSA capture\n' >&2 quiesce_user_audio trap restore_user_audio EXIT fi discard_preroll_capture() { local seconds=$1 [[ "${seconds}" =~ ^[0-9]+$ ]] || return 0 [[ "${seconds}" -gt 0 ]] || return 0 local discard="/tmp/lesavka-output-delay-preroll-discard-$RANDOM.mkv" printf 'discarding %ss of post-enumeration capture before probe\n' "${seconds}" >&2 case "${capture_mode}" in pulse) if command -v ffmpeg >/dev/null 2>&1; then timeout --kill-after=3 --signal=INT "$((seconds + 5))" ffmpeg -nostdin -hide_banner -loglevel error -y \ -thread_queue_size 1024 \ "${video_args[@]}" \ -i "${resolved_video_device}" \ -thread_queue_size 1024 \ -f pulse \ -i "${pulse_source}" \ -t "${seconds}" \ -c:v copy \ -c:a pcm_s16le \ "${discard}" /dev/null 2>&1 || true fi ;; alsa) if command -v ffmpeg >/dev/null 2>&1; then timeout --kill-after=3 --signal=INT "$((seconds + 5))" ffmpeg -nostdin -hide_banner -loglevel error -y \ -thread_queue_size 1024 \ "${video_args[@]}" \ -i "${resolved_video_device}" \ -thread_queue_size 1024 \ -f alsa -ac 2 -ar 48000 \ -i "${alsa_audio_dev}" \ -t "${seconds}" \ -c:v copy \ -c:a pcm_s16le \ "${discard}" /dev/null 2>&1 || true fi ;; *) sleep "${seconds}" ;; esac rm -f "${discard}" } discard_preroll_capture "${remote_capture_preroll_discard_seconds}" if [[ "${capture_mode}" == "pwpipe" ]]; then printf 'using PipeWire-native mux capture target serial: %s\n' "${pw_audio_target}" >&2 announce_capture_start ( timeout "${capture_seconds}" pw-record \ --target "${pw_audio_target}" \ --rate 48000 \ --channels 2 \ --format s16 \ --raw - \ | pw-v4l2 ffmpeg -hide_banner -loglevel error -y \ -thread_queue_size 1024 \ "${video_args[@]}" \ -i "${resolved_video_device}" \ -thread_queue_size 1024 \ -f s16le -ar 48000 -ac 2 \ -i pipe:0 \ -t "${capture_seconds}" \ -c:v copy \ -c:a pcm_s16le \ "${remote_capture}" ) & capture_pid=$! signal_capture_ready wait "${capture_pid}" elif [[ "${capture_mode}" == "pulse" ]]; then printf 'using Pulse source: %s\n' "${pulse_source}" >&2 case "${remote_pulse_capture_tool}" in ffmpeg) case "${remote_pulse_video_mode}" in copy) run_ffmpeg_capture ffmpeg -hide_banner -loglevel error -y \ -thread_queue_size 1024 \ "${video_args[@]}" \ -i "${resolved_video_device}" \ -thread_queue_size 1024 \ -f pulse \ -i "${pulse_source}" \ -map 0:v:0 \ -map 1:a:0 \ -t "${capture_seconds}" \ -c:v copy \ -c:a pcm_s16le \ "${remote_capture}" ;; cfr) run_ffmpeg_capture ffmpeg -hide_banner -loglevel error -y \ -thread_queue_size 1024 \ "${video_args[@]}" \ -i "${resolved_video_device}" \ -thread_queue_size 1024 \ -f pulse \ -i "${pulse_source}" \ -map 0:v:0 \ -map 1:a:0 \ -t "${capture_seconds}" \ -vf "fps=${resolved_video_fps}" \ -c:v libx264 -preset ultrafast -crf 12 -g 1 -pix_fmt yuv420p \ -c:a pcm_s16le \ "${remote_capture}" ;; *) printf 'unsupported REMOTE_PULSE_VIDEO_MODE=%s\n' "${remote_pulse_video_mode}" >&2 exit 64 ;; esac ;; gst) if [[ "${remote_pulse_audio_anchor_silence}" == "1" ]]; then printf 'anchoring Pulse capture audio timeline with generated silence\n' >&2 fi case "${remote_pulse_video_mode}" in copy) if [[ "${video_format}" != "mjpeg" && "${video_format}" != "MJPG" && -n "${video_format}" ]]; then printf 'gst copy mode only supports mjpeg input, got %s\n' "${video_format}" >&2 exit 64 fi if [[ "${remote_pulse_audio_anchor_silence}" == "1" ]]; then run_tolerant_capture timeout --kill-after=5 --signal=INT "$((capture_seconds + 3))" \ gst-launch-1.0 -q -e \ matroskamux name=mux ! filesink location="${remote_capture}" \ v4l2src device="${resolved_video_device}" do-timestamp=true ! \ ${gst_source_caps} ! \ queue ! mux. \ ${gst_audio_mixer} ! \ audio/x-raw,rate=48000,channels=2 ! \ queue ! mux. \ audiotestsrc wave=silence is-live=true do-timestamp=true ! \ audio/x-raw,rate=48000,channels=2 ! \ queue ! amix. \ pulsesrc device="${pulse_source}" do-timestamp=true ! \ audio/x-raw,rate=48000,channels=2 ! \ audioconvert ! audioresample ! audio/x-raw,rate=48000,channels=2 ! \ queue ! amix. else run_tolerant_capture timeout --kill-after=5 --signal=INT "$((capture_seconds + 3))" \ gst-launch-1.0 -q -e \ matroskamux name=mux ! filesink location="${remote_capture}" \ v4l2src device="${resolved_video_device}" do-timestamp=true ! \ ${gst_source_caps} ! \ queue ! mux. \ pulsesrc device="${pulse_source}" do-timestamp=true ! \ audio/x-raw,rate=48000,channels=2 ! \ audioconvert ! audioresample ! audio/x-raw,rate=48000,channels=2 ! \ queue ! mux. fi ;; cfr) if [[ "${remote_pulse_audio_anchor_silence}" == "1" ]]; then run_tolerant_capture timeout --kill-after=5 --signal=INT "$((capture_seconds + 3))" \ gst-launch-1.0 -q -e \ matroskamux name=mux ! filesink location="${remote_capture}" \ v4l2src device="${resolved_video_device}" do-timestamp=true ! \ ${gst_source_caps} ! \ ${gst_decode_chain} \ videoconvert ! videorate ! video/x-raw,framerate="${resolved_video_fps}"/1 ! \ x264enc tune=zerolatency speed-preset=ultrafast key-int-max=1 bitrate=5000 ! \ h264parse ! \ queue ! mux. \ ${gst_audio_mixer} ! \ audio/x-raw,rate=48000,channels=2 ! \ queue ! mux. \ audiotestsrc wave=silence is-live=true do-timestamp=true ! \ audio/x-raw,rate=48000,channels=2 ! \ queue ! amix. \ pulsesrc device="${pulse_source}" do-timestamp=true ! \ audio/x-raw,rate=48000,channels=2 ! \ audioconvert ! audioresample ! audio/x-raw,rate=48000,channels=2 ! \ queue ! amix. else run_tolerant_capture timeout --kill-after=5 --signal=INT "$((capture_seconds + 3))" \ gst-launch-1.0 -q -e \ matroskamux name=mux ! filesink location="${remote_capture}" \ v4l2src device="${resolved_video_device}" do-timestamp=true ! \ ${gst_source_caps} ! \ ${gst_decode_chain} \ videoconvert ! videorate ! video/x-raw,framerate="${resolved_video_fps}"/1 ! \ x264enc tune=zerolatency speed-preset=ultrafast key-int-max=1 bitrate=5000 ! \ h264parse ! \ queue ! mux. \ pulsesrc device="${pulse_source}" do-timestamp=true ! \ audio/x-raw,rate=48000,channels=2 ! \ audioconvert ! audioresample ! audio/x-raw,rate=48000,channels=2 ! \ queue ! mux. fi ;; *) printf 'unsupported REMOTE_PULSE_VIDEO_MODE=%s\n' "${remote_pulse_video_mode}" >&2 exit 64 ;; esac ;; *) printf 'unsupported REMOTE_PULSE_CAPTURE_TOOL=%s\n' "${remote_pulse_capture_tool}" >&2 exit 64 ;; esac else case "${remote_pulse_video_mode}" in copy) run_ffmpeg_capture ffmpeg -hide_banner -loglevel error -y \ -thread_queue_size 1024 \ "${video_args[@]}" \ -i "${resolved_video_device}" \ -thread_queue_size 1024 \ -f alsa -ac 2 -ar 48000 \ -i "${alsa_audio_dev}" \ -t "${capture_seconds}" \ -c:v copy \ -c:a pcm_s16le \ "${remote_capture}" ;; cfr) run_ffmpeg_capture ffmpeg -hide_banner -loglevel error -y \ -thread_queue_size 1024 \ "${video_args[@]}" \ -i "${resolved_video_device}" \ -thread_queue_size 1024 \ -f alsa -ac 2 -ar 48000 \ -i "${alsa_audio_dev}" \ -t "${capture_seconds}" \ -vf "fps=${resolved_video_fps}" \ -c:v libx264 -preset ultrafast -crf 12 -g 1 -pix_fmt yuv420p \ -c:a pcm_s16le \ "${remote_capture}" ;; *) printf 'unsupported REMOTE_PULSE_VIDEO_MODE=%s\n' "${remote_pulse_video_mode}" >&2 exit 64 ;; esac fi REMOTE_CAPTURE_SCRIPT capture_pid=$! wait_for_capture_ready() { local tries=100 local i=0 while (( i < tries )); do if [[ -f "${LOCAL_CAPTURE_LOG}" ]] && grep -q "${CAPTURE_READY_MARKER}" "${LOCAL_CAPTURE_LOG}"; then return 0 fi if ! kill -0 "${capture_pid}" >/dev/null 2>&1; then capture_status=0 wait "${capture_pid}" || capture_status=$? echo "Tethys capture failed before the sync probe could start; see ${LOCAL_CAPTURE_LOG} for details." >&2 exit "${capture_status}" fi sleep 0.1 ((i += 1)) done echo "Timed out waiting for Tethys capture to become ready; see ${LOCAL_CAPTURE_LOG} for details." >&2 exit 90 } wait_for_capture_ready sleep "${LEAD_IN_SECONDS}" run_output_delay_probe() { local label=$1 local duration_seconds=$2 local warmup_seconds=$3 local event_width_codes=$4 local audio_delay_us=$5 local video_delay_us=$6 local timeout_seconds=$7 local reply_path=$8 set +e ( cd "${REPO_ROOT}" LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \ timeout --signal=INT "${timeout_seconds}" \ "${REPO_ROOT}/target/debug/lesavka-relayctl" \ --server "${RESOLVED_LESAVKA_SERVER_ADDR}" \ output-delay-probe \ "${duration_seconds}" \ "${warmup_seconds}" \ "${PROBE_PULSE_PERIOD_MS}" \ "${PROBE_PULSE_WIDTH_MS}" \ "${event_width_codes}" \ "${audio_delay_us}" \ "${video_delay_us}" ) 2>&1 | tee "${reply_path}" local status=${PIPESTATUS[0]} set -e if [[ "${status}" -eq 124 ]]; then printf '%s output-delay probe timed out after %ss\n' "${label}" "${timeout_seconds}" >&2 fi return "${status}" } conditioning_status=0 conditioning_timed_out=0 if [[ "${PROBE_CONDITIONING_SECONDS}" =~ ^[0-9]+$ && "${PROBE_CONDITIONING_SECONDS}" -gt 0 ]]; then echo "==> conditioning Tethys UVC/UAC signal path before measured probe" echo " ↪ conditioning duration=${PROBE_CONDITIONING_SECONDS}s warmup=${PROBE_CONDITIONING_WARMUP_SECONDS}s gap=${PROBE_CONDITIONING_GAP_SECONDS}s" run_output_delay_probe \ "signal-conditioning" \ "${PROBE_CONDITIONING_SECONDS}" \ "${PROBE_CONDITIONING_WARMUP_SECONDS}" \ "${PROBE_CONDITIONING_EVENT_WIDTH_CODES}" \ "${LESAVKA_OUTPUT_DELAY_PROBE_AUDIO_DELAY_US}" \ "${LESAVKA_OUTPUT_DELAY_PROBE_VIDEO_DELAY_US}" \ "${PROBE_CONDITIONING_TIMEOUT_SECONDS}" \ "${LOCAL_SIGNAL_CONDITIONING_REPLY}" || conditioning_status=$? if [[ "${conditioning_status}" -eq 124 ]]; then conditioning_timed_out=1 fi extract_signal_conditioning_timeline if [[ "${conditioning_status}" -ne 0 ]]; then echo "signal-conditioning output-delay probe failed with status ${conditioning_status}" >&2 exit "${conditioning_status}" fi sleep "${PROBE_CONDITIONING_GAP_SECONDS}" fi echo "==> running server-generated UVC/UAC output-delay probe against ${RESOLVED_LESAVKA_SERVER_ADDR}" probe_status=0 probe_timed_out=0 run_output_delay_probe \ "server" \ "${PROBE_DURATION_SECONDS}" \ "${PROBE_WARMUP_SECONDS}" \ "${PROBE_EVENT_WIDTH_CODES}" \ "${LESAVKA_OUTPUT_DELAY_PROBE_AUDIO_DELAY_US}" \ "${LESAVKA_OUTPUT_DELAY_PROBE_VIDEO_DELAY_US}" \ "${PROBE_TIMEOUT_SECONDS}" \ "${LOCAL_SERVER_PROBE_REPLY}" || probe_status=$? if [[ "${probe_status}" -eq 124 ]]; then probe_timed_out=1 fi extract_server_timeline capture_status=0 wait "${capture_pid}" || capture_status=$? capture_v4l2_fault=0 if [[ -f "${LOCAL_CAPTURE_LOG}" ]] \ && grep -q 'VIDIOC_QBUF): Bad file descriptor' "${LOCAL_CAPTURE_LOG}"; then capture_v4l2_fault=1 fi capture_streamon_timeout=0 if [[ -f "${LOCAL_CAPTURE_LOG}" ]] \ && grep -q 'VIDIOC_STREAMON.*Connection timed out' "${LOCAL_CAPTURE_LOG}"; then capture_streamon_timeout=1 fi if retry_remote_artifact_command \ "checking remote capture on ${TETHYS_HOST}" \ ssh ${SSH_OPTS} "${TETHYS_HOST}" "test -f '${REMOTE_CAPTURE}'"; then remote_fetch_capture="${REMOTE_CAPTURE}" analysis_status=0 if [[ "${ANALYSIS_NORMALIZE}" != "0" ]]; then remote_fetch_capture="${REMOTE_CAPTURE%.mkv}-analysis.mkv" echo "==> normalizing remote capture to CFR for analysis" normalize_status=0 ssh ${SSH_OPTS} "${TETHYS_HOST}" bash -s -- \ "${REMOTE_CAPTURE}" \ "${remote_fetch_capture}" \ "${VIDEO_FPS}" \ "${ANALYSIS_SCALE_WIDTH}" <<'REMOTE_NORMALIZE_SCRIPT' || normalize_status=$? set -euo pipefail src=$1 dst=$2 fps=$3 scale_width=$4 ffmpeg -hide_banner -loglevel error -y \ -i "${src}" \ -vf "fps=${fps},scale='min(${scale_width},iw)':-2" \ -c:v libx264 -preset ultrafast -crf 12 -g 1 -pix_fmt yuv420p \ -c:a pcm_s16le \ "${dst}" REMOTE_NORMALIZE_SCRIPT if [[ "${normalize_status}" -ne 0 ]]; then echo "remote CFR normalization failed; falling back to raw capture" >&2 remote_fetch_capture="${REMOTE_CAPTURE}" fi fi if [[ "${REMOTE_ANALYZE}" != "0" ]]; then if [[ "${REMOTE_ANALYZE_COPY}" != "0" ]]; then echo "==> copying sync analyzer to ${TETHYS_HOST}:${REMOTE_ANALYZE_BIN}" retry_remote_artifact_command \ "copying sync analyzer to ${TETHYS_HOST}" \ scp ${SSH_OPTS} "${ANALYZE_BIN}" "${TETHYS_HOST}:${REMOTE_ANALYZE_BIN}" fi analysis_window_arg="$(compute_analysis_window_arg)" if [[ -n "${analysis_window_arg}" ]]; then echo " ↪ analyzer timeline window: ${analysis_window_arg#--analysis-window-s }" fi echo "==> analyzing capture on ${TETHYS_HOST}" analysis_tmp="${LOCAL_ANALYSIS_JSON}.tmp" analysis_error_tmp="${LOCAL_ANALYSIS_ERROR_LOG}.tmp" rm -f "${analysis_tmp}" "${analysis_error_tmp}" "${LOCAL_ANALYSIS_ERROR_LOG}" set +e retry_remote_artifact_command \ "running remote sync analysis on ${TETHYS_HOST}" \ run_remote_sync_analysis_once "${analysis_tmp}" "${analysis_error_tmp}" analysis_status=$? set -e if [[ -s "${analysis_error_tmp}" ]]; then cp "${analysis_error_tmp}" "${LOCAL_ANALYSIS_ERROR_LOG}" fi if [[ "${analysis_status}" -eq 0 ]]; then mv "${analysis_tmp}" "${LOCAL_ANALYSIS_JSON}" rm -f "${analysis_error_tmp}" else rm -f "${analysis_tmp}" # Keep the .tmp copy too. If this script exits on analysis failure, the # readiness summarizer can still parse whichever error artifact survived. fi fi if [[ "${FETCH_CAPTURE}" != "0" ]]; then echo "==> fetching capture back to ${LOCAL_CAPTURE}" fetch_capture_status=0 retry_remote_artifact_command \ "fetching capture from ${TETHYS_HOST}" \ scp ${SSH_OPTS} "${TETHYS_HOST}:${remote_fetch_capture}" "${LOCAL_CAPTURE}" \ || fetch_capture_status=$? if [[ "${fetch_capture_status}" -ne 0 ]]; then echo "warning: failed to fetch capture artifact; continuing with remote analysis JSON when available" >&2 if [[ "${REMOTE_ANALYZE}" == "0" ]]; then exit "${fetch_capture_status}" fi fi fi if [[ "${analysis_status}" -ne 0 ]]; then echo "remote analysis failed with status ${analysis_status}; capture preserved at ${LOCAL_CAPTURE}" >&2 exit "${analysis_status}" fi fi if [[ "${probe_status}" -ne 0 ]]; then if [[ "${probe_timed_out}" -eq 1 ]]; then echo "server output-delay probe timed out after ${PROBE_TIMEOUT_SECONDS}s; this usually means one UVC/UAC output sink did not close cleanly." >&2 fi echo "server output-delay probe failed with status ${probe_status}" >&2 [[ -f "${LOCAL_CAPTURE}" ]] && echo "partial capture preserved at ${LOCAL_CAPTURE}" >&2 exit "${probe_status}" fi if [[ "${capture_status}" -ne 0 ]]; then if [[ "${capture_streamon_timeout}" -eq 1 ]]; then echo "Tethys capture timed out during VIDIOC_STREAMON; the UVC host opened before MJPEG frames reached the gadget." >&2 echo "Keep LEAD_IN_SECONDS=0 and restart lesavka-uvc/lesavka-server before retrying if the gadget is wedged from an earlier failed run." >&2 fi if [[ "${capture_status}" -eq 141 && ( -f "${LOCAL_CAPTURE}" || -f "${LOCAL_ANALYSIS_JSON}" ) ]]; then echo "Tethys capture ended with PipeWire SIGPIPE after ffmpeg closed; accepting preserved analysis artifacts" >&2 elif [[ "${capture_status}" -eq 124 && ( -f "${LOCAL_CAPTURE}" || -f "${LOCAL_ANALYSIS_JSON}" ) ]]; then echo "Tethys capture timed out after preserving analysis artifacts; accepting the run for analysis" >&2 else echo "Tethys capture failed with status ${capture_status}" >&2 [[ -f "${LOCAL_CAPTURE}" ]] && echo "partial capture preserved at ${LOCAL_CAPTURE}" >&2 exit "${capture_status}" fi fi if [[ "${REMOTE_ANALYZE}" != "0" ]]; then if [[ ! -f "${LOCAL_ANALYSIS_JSON}" ]]; then echo "remote analysis did not produce ${LOCAL_ANALYSIS_JSON}" >&2 exit 92 fi echo "==> remote analysis summary" python - <<'PY' "${LOCAL_ANALYSIS_JSON}" "${LOCAL_REPORT_TXT}" "${LOCAL_EVENTS_CSV}" import csv import json import pathlib import sys report = json.loads(pathlib.Path(sys.argv[1]).read_text()) verdict = report.get('verdict', {}) cal = report.get('calibration', {}) lines = [ f"A/V sync report for {sys.argv[1]}", f"- verdict: {verdict.get('status', 'unknown')} ({'pass' if verdict.get('passed') else 'fail'})", f"- verdict reason: {verdict.get('reason', '')}", f"- p95 abs skew: {float(verdict.get('p95_abs_skew_ms', 0.0)):.1f} ms", f"- video onsets: {report['video_event_count']}", f"- audio onsets: {report['audio_event_count']}", f"- paired pulses: {report['paired_event_count']}", f"- activity start delta: {report.get('activity_start_delta_ms', 0.0):+.1f} ms (audio after video is positive)", f"- first skew: {report['first_skew_ms']:+.1f} ms (audio after video is positive)", f"- last skew: {report['last_skew_ms']:+.1f} ms", f"- mean skew: {report['mean_skew_ms']:+.1f} ms", f"- median skew: {report['median_skew_ms']:+.1f} ms", f"- max abs skew: {report['max_abs_skew_ms']:.1f} ms", f"- drift: {report['drift_ms']:+.1f} ms", f"- calibration ready: {cal.get('ready')}", f"- recommended audio offset adjust: {int(cal.get('recommended_audio_offset_adjust_us', 0)):+d} us", f"- alternative video offset adjust: {int(cal.get('recommended_video_offset_adjust_us', 0)):+d} us", f"- calibration note: {cal.get('note', '')}", ] summary = "\n".join(lines) + "\n" pathlib.Path(sys.argv[2]).write_text(summary) with pathlib.Path(sys.argv[3]).open("w", newline="") as handle: writer = csv.DictWriter( handle, fieldnames=[ "event_id", "server_event_id", "event_code", "video_time_s", "audio_time_s", "skew_ms", "confidence", ], ) writer.writeheader() for event in report.get("paired_events", []): writer.writerow({ "event_id": event.get("event_id"), "server_event_id": event.get("server_event_id"), "event_code": event.get("event_code"), "video_time_s": event.get("video_time_s"), "audio_time_s": event.get("audio_time_s"), "skew_ms": event.get("skew_ms"), "confidence": event.get("confidence"), }) print(summary, end="") PY else if [[ ! -f "${LOCAL_CAPTURE}" ]]; then echo "capture was not fetched and REMOTE_ANALYZE=0 left nothing local to analyze" >&2 exit 93 fi echo "==> analyzing capture" analysis_window_arg="$(compute_analysis_window_arg)" if [[ -n "${analysis_window_arg}" ]]; then echo " ↪ analyzer timeline window: ${analysis_window_arg#--analysis-window-s }" fi ( cd "${REPO_ROOT}" # shellcheck disable=SC2086 "${ANALYZE_BIN}" "${LOCAL_CAPTURE}" --report-dir "${LOCAL_REPORT_DIR}" --event-width-codes "${PROBE_EVENT_WIDTH_CODES}" ${analysis_window_arg} ) fi write_output_delay_correlation write_output_delay_calibration maybe_apply_output_delay_calibration maybe_run_output_delay_confirmation enforce_sync_verdict if [[ "${capture_v4l2_fault}" -eq 1 ]]; then echo "warning: Tethys video capture reported VIDIOC_QBUF / Bad file descriptor; treat unstable skew or analyzer failures as host-capture suspect" >&2 fi echo "==> done" echo "artifact_dir: ${LOCAL_REPORT_DIR}" if [[ -f "${LOCAL_CAPTURE}" ]]; then echo "capture: ${LOCAL_CAPTURE}" fi if [[ -f "${LOCAL_ANALYSIS_JSON}" ]]; then echo "report_json: ${LOCAL_ANALYSIS_JSON}" fi if [[ -f "${LOCAL_ANALYSIS_ERROR_LOG}" ]]; then echo "analysis_error_log: ${LOCAL_ANALYSIS_ERROR_LOG}" fi if [[ -f "${LOCAL_REPORT_TXT}" ]]; then echo "report_txt: ${LOCAL_REPORT_TXT}" fi if [[ -f "${LOCAL_EVENTS_CSV}" ]]; then echo "events_csv: ${LOCAL_EVENTS_CSV}" fi if [[ -f "${LOCAL_SERVER_TIMELINE_JSON}" ]]; then echo "server_timeline_json: ${LOCAL_SERVER_TIMELINE_JSON}" fi if [[ -f "${LOCAL_SIGNAL_CONDITIONING_REPLY}" ]]; then echo "signal_conditioning_reply: ${LOCAL_SIGNAL_CONDITIONING_REPLY}" fi if [[ -f "${LOCAL_SIGNAL_CONDITIONING_TIMELINE_JSON}" ]]; then echo "signal_conditioning_timeline_json: ${LOCAL_SIGNAL_CONDITIONING_TIMELINE_JSON}" fi if [[ -f "${LOCAL_CLOCK_ALIGNMENT_JSON}" ]]; then echo "clock_alignment_json: ${LOCAL_CLOCK_ALIGNMENT_JSON}" fi if [[ -f "${LOCAL_OUTPUT_DELAY_CORRELATION_JSON}" ]]; then echo "output_delay_correlation_json: ${LOCAL_OUTPUT_DELAY_CORRELATION_JSON}" fi if [[ -f "${LOCAL_OUTPUT_DELAY_CORRELATION_CSV}" ]]; then echo "output_delay_correlation_csv: ${LOCAL_OUTPUT_DELAY_CORRELATION_CSV}" fi if [[ -f "${LOCAL_OUTPUT_DELAY_CORRELATION_TXT}" ]]; then echo "output_delay_correlation_txt: ${LOCAL_OUTPUT_DELAY_CORRELATION_TXT}" fi if [[ -f "${LOCAL_OUTPUT_DELAY_JSON}" ]]; then echo "output_delay_calibration_json: ${LOCAL_OUTPUT_DELAY_JSON}" fi if [[ -f "${LOCAL_OUTPUT_DELAY_ENV}" ]]; then echo "output_delay_calibration_env: ${LOCAL_OUTPUT_DELAY_ENV}" fi if [[ -f "${LOCAL_CAPTURE_LOG}" ]]; then echo "capture_log: ${LOCAL_CAPTURE_LOG}" fi