lesavka/scripts/manual/run_upstream_av_sync.sh

3278 lines
122 KiB
Bash
Executable File

#!/usr/bin/env bash
# scripts/manual/run_upstream_av_sync.sh
# Manual: upstream A/V sync hardware probe; not part of CI.
#
# Manual: capture the real Tethys UVC/UAC endpoints while the Lesavka server
# generates paired probe media locally and feeds its own UVC/UAC output sinks.
# This intentionally measures only the server-to-host output-device skew.
set -euo pipefail
# Resolve this script's directory and the repository root regardless of the
# caller's working directory.
SCRIPT_DIR="$(cd -- "$(dirname "${BASH_SOURCE[0]}")" >/dev/null 2>&1 && pwd)"
REPO_ROOT="$(cd -- "${SCRIPT_DIR}/../.." >/dev/null 2>&1 && pwd)"
# Probe topology: TETHYS_HOST captures the gadget's UVC/UAC output while
# LESAVKA_SERVER_HOST runs the server under test. Every knob below may be
# overridden from the environment; the value here is only the default.
TETHYS_HOST=${TETHYS_HOST:-tethys}
LESAVKA_SERVER_HOST=${LESAVKA_SERVER_HOST:-theia}
LESAVKA_SERVER_CONNECT_HOST=${LESAVKA_SERVER_CONNECT_HOST:-38.28.125.112}
LESAVKA_SERVER_ADDR=${LESAVKA_SERVER_ADDR:-auto}
LESAVKA_SERVER_SCHEME=${LESAVKA_SERVER_SCHEME:-https}
LESAVKA_TLS_DOMAIN=${LESAVKA_TLS_DOMAIN:-lesavka-server}
# Probe timing. The overall timeout defaults to the sum of the phases.
PROBE_DURATION_SECONDS=${PROBE_DURATION_SECONDS:-20}
PROBE_WARMUP_SECONDS=${PROBE_WARMUP_SECONDS:-4}
PROBE_START_GRACE_SECONDS=${PROBE_START_GRACE_SECONDS:-20}
PROBE_TIMEOUT_SECONDS=${PROBE_TIMEOUT_SECONDS:-$((PROBE_DURATION_SECONDS + PROBE_WARMUP_SECONDS + PROBE_START_GRACE_SECONDS))}
PROBE_PULSE_PERIOD_MS=${PROBE_PULSE_PERIOD_MS:-1000}
PROBE_PULSE_WIDTH_MS=${PROBE_PULSE_WIDTH_MS:-120}
PROBE_EVENT_WIDTH_CODES=${PROBE_EVENT_WIDTH_CODES:-1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16}
# Intentional per-channel delays (microseconds) injected by the probe itself.
LESAVKA_OUTPUT_DELAY_PROBE_AUDIO_DELAY_US=${LESAVKA_OUTPUT_DELAY_PROBE_AUDIO_DELAY_US:-0}
LESAVKA_OUTPUT_DELAY_PROBE_VIDEO_DELAY_US=${LESAVKA_OUTPUT_DELAY_PROBE_VIDEO_DELAY_US:-0}
# Do not open the UVC host capture far ahead of the probe. The gadget side only
# has frames once the sync probe is feeding the server, and some hosts time out
# VIDIOC_STREAMON if the camera is starved during pre-roll.
LEAD_IN_SECONDS=${LEAD_IN_SECONDS:-0}
TAIL_SECONDS=${TAIL_SECONDS:-2}
CAPTURE_SECONDS=${CAPTURE_SECONDS:-$((PROBE_DURATION_SECONDS + PROBE_WARMUP_SECONDS + PROBE_START_GRACE_SECONDS + LEAD_IN_SECONDS + TAIL_SECONDS))}
LOCAL_OUTPUT_DIR=${LOCAL_OUTPUT_DIR:-/tmp}
# Capture-host device/format selection; "auto" values are resolved later.
REMOTE_VIDEO_DEVICE=${REMOTE_VIDEO_DEVICE:-auto}
VIDEO_SIZE=${VIDEO_SIZE:-auto}
VIDEO_FPS=${VIDEO_FPS:-auto}
VIDEO_FORMAT=${VIDEO_FORMAT:-mjpeg}
REMOTE_CAPTURE_STACK=${REMOTE_CAPTURE_STACK:-pulse}
REMOTE_PULSE_CAPTURE_TOOL=${REMOTE_PULSE_CAPTURE_TOOL:-gst}
REMOTE_PULSE_VIDEO_MODE=${REMOTE_PULSE_VIDEO_MODE:-copy}
REMOTE_PULSE_AUDIO_ANCHOR_SILENCE=${REMOTE_PULSE_AUDIO_ANCHOR_SILENCE:-1}
REMOTE_AUDIO_SOURCE=${REMOTE_AUDIO_SOURCE:-auto}
REMOTE_AUDIO_QUIESCE_USER_AUDIO=${REMOTE_AUDIO_QUIESCE_USER_AUDIO:-auto}
REMOTE_CAPTURE_ALLOW_ALSA_FALLBACK=${REMOTE_CAPTURE_ALLOW_ALSA_FALLBACK:-0}
REMOTE_CAPTURE_PREROLL_DISCARD_SECONDS=${REMOTE_CAPTURE_PREROLL_DISCARD_SECONDS:-0}
REMOTE_CAPTURE_READY_SETTLE_SECONDS=${REMOTE_CAPTURE_READY_SETTLE_SECONDS:-1}
# Analysis tuning knobs.
ANALYSIS_NORMALIZE=${ANALYSIS_NORMALIZE:-0}
ANALYSIS_SCALE_WIDTH=${ANALYSIS_SCALE_WIDTH:-1280}
ANALYSIS_TIMELINE_WINDOW=${ANALYSIS_TIMELINE_WINDOW:-0}
ANALYSIS_TIMELINE_WINDOW_PADDING_SECONDS=${ANALYSIS_TIMELINE_WINDOW_PADDING_SECONDS:-1.0}
# NOTE: SSH_OPTS is deliberately a single string; call sites expand it
# unquoted so each option word splits.
SSH_OPTS=${SSH_OPTS:-"-o BatchMode=yes -o ConnectTimeout=30"}
PROBE_PREBUILD=${PROBE_PREBUILD:-1}
ANALYZE_BIN=${ANALYZE_BIN:-"${REPO_ROOT}/target/debug/lesavka-sync-analyze"}
REMOTE_ANALYZE=${REMOTE_ANALYZE:-1}
REMOTE_ANALYZE_BIN=${REMOTE_ANALYZE_BIN:-/tmp/lesavka-sync-analyze}
REMOTE_ANALYZE_COPY=${REMOTE_ANALYZE_COPY:-1}
FETCH_CAPTURE=${FETCH_CAPTURE:-1}
# Preflight expectations for the server's env files; empty means "don't check".
REMOTE_SERVER_PREFLIGHT=${REMOTE_SERVER_PREFLIGHT:-1}
REMOTE_EXPECT_CAM_OUTPUT=${REMOTE_EXPECT_CAM_OUTPUT:-uvc}
REMOTE_EXPECT_UVC_CODEC=${REMOTE_EXPECT_UVC_CODEC:-mjpeg}
REMOTE_EXPECT_UVC_WIDTH=${REMOTE_EXPECT_UVC_WIDTH:-}
REMOTE_EXPECT_UVC_HEIGHT=${REMOTE_EXPECT_UVC_HEIGHT:-}
REMOTE_EXPECT_UVC_FPS=${REMOTE_EXPECT_UVC_FPS:-}
# Output-delay calibration gates and limits (see write_output_delay_calibration).
LESAVKA_OUTPUT_DELAY_CALIBRATION=${LESAVKA_OUTPUT_DELAY_CALIBRATION:-1}
LESAVKA_OUTPUT_DELAY_APPLY=${LESAVKA_OUTPUT_DELAY_APPLY:-0}
LESAVKA_OUTPUT_DELAY_APPLY_MODE=${LESAVKA_OUTPUT_DELAY_APPLY_MODE:-absolute}
LESAVKA_OUTPUT_DELAY_CONFIRM=${LESAVKA_OUTPUT_DELAY_CONFIRM:-1}
LESAVKA_OUTPUT_DELAY_SAVE=${LESAVKA_OUTPUT_DELAY_SAVE:-0}
LESAVKA_OUTPUT_REQUIRE_SYNC_PASS=${LESAVKA_OUTPUT_REQUIRE_SYNC_PASS:-0}
LESAVKA_OUTPUT_DELAY_TARGET=${LESAVKA_OUTPUT_DELAY_TARGET:-video}
LESAVKA_OUTPUT_DELAY_MIN_PAIRS=${LESAVKA_OUTPUT_DELAY_MIN_PAIRS:-13}
LESAVKA_OUTPUT_DELAY_MAX_ABS_SKEW_MS=${LESAVKA_OUTPUT_DELAY_MAX_ABS_SKEW_MS:-5000}
LESAVKA_OUTPUT_DELAY_MAX_DRIFT_MS=${LESAVKA_OUTPUT_DELAY_MAX_DRIFT_MS:-80}
LESAVKA_OUTPUT_DELAY_GAIN=${LESAVKA_OUTPUT_DELAY_GAIN:-1.0}
LESAVKA_OUTPUT_DELAY_MAX_STEP_US=${LESAVKA_OUTPUT_DELAY_MAX_STEP_US:-1500000}
LESAVKA_OUTPUT_FRESHNESS_MAX_AGE_MS=${LESAVKA_OUTPUT_FRESHNESS_MAX_AGE_MS:-1000}
LESAVKA_OUTPUT_FRESHNESS_MAX_DRIFT_MS=${LESAVKA_OUTPUT_FRESHNESS_MAX_DRIFT_MS:-100}
LESAVKA_OUTPUT_FRESHNESS_MAX_CLOCK_UNCERTAINTY_MS=${LESAVKA_OUTPUT_FRESHNESS_MAX_CLOCK_UNCERTAINTY_MS:-250}
LESAVKA_CLOCK_ALIGNMENT_SAMPLES=${LESAVKA_CLOCK_ALIGNMENT_SAMPLES:-5}
CAPTURE_READY_MARKER="__LESAVKA_CAPTURE_READY__"
# One timestamped report directory per run; all artifacts land under it.
STAMP="$(date +%Y%m%d-%H%M%S)"
REMOTE_CAPTURE=${REMOTE_CAPTURE:-"/tmp/lesavka-output-delay-probe-${STAMP}.mkv"}
LOCAL_REPORT_DIR="${LOCAL_OUTPUT_DIR%/}/lesavka-output-delay-probe-${STAMP}"
LOCAL_CAPTURE="${LOCAL_REPORT_DIR}/capture.mkv"
LOCAL_ANALYSIS_JSON="${LOCAL_REPORT_DIR}/report.json"
LOCAL_REPORT_TXT="${LOCAL_REPORT_DIR}/report.txt"
LOCAL_EVENTS_CSV="${LOCAL_REPORT_DIR}/events.csv"
LOCAL_SERVER_PROBE_REPLY="${LOCAL_REPORT_DIR}/server-output-probe-reply.txt"
LOCAL_SERVER_TIMELINE_JSON="${LOCAL_REPORT_DIR}/server-output-timeline.json"
LOCAL_OUTPUT_DELAY_CORRELATION_JSON="${LOCAL_REPORT_DIR}/output-delay-correlation.json"
LOCAL_OUTPUT_DELAY_CORRELATION_CSV="${LOCAL_REPORT_DIR}/output-delay-correlation.csv"
LOCAL_OUTPUT_DELAY_CORRELATION_TXT="${LOCAL_REPORT_DIR}/output-delay-correlation.txt"
LOCAL_CLOCK_ALIGNMENT_JSON="${LOCAL_REPORT_DIR}/clock-alignment.json"
LOCAL_OUTPUT_DELAY_JSON="${LOCAL_REPORT_DIR}/output-delay-calibration.json"
LOCAL_OUTPUT_DELAY_ENV="${LOCAL_REPORT_DIR}/output-delay-calibration.env"
LOCAL_CAPTURE_LOG="${LOCAL_REPORT_DIR}/capture.log"
mkdir -p "${LOCAL_REPORT_DIR}"
# Mutable run state for the optional SSH tunnel to the server; empty until
# start_server_tunnel() / resolve_server_addr() populate them.
RESOLVED_LESAVKA_SERVER_ADDR=""
SERVER_TUNNEL_PID=""
SERVER_TUNNEL_REMOTE_PORT=""
SERVER_TUNNEL_LOCAL_PORT=""
# Kill and reap the background ssh tunnel (if one was started). Safe to call
# when no tunnel exists; errors are swallowed because this runs from the EXIT
# trap where failure must not mask the script's real exit status.
cleanup_server_tunnel() {
  if [[ -z "${SERVER_TUNNEL_PID}" ]]; then
    return 0
  fi
  if kill -0 "${SERVER_TUNNEL_PID}" >/dev/null 2>&1; then
    kill "${SERVER_TUNNEL_PID}" >/dev/null 2>&1 || true
    wait "${SERVER_TUNNEL_PID}" >/dev/null 2>&1 || true
  fi
}
trap cleanup_server_tunnel EXIT
# Print a currently-free TCP port on 127.0.0.1 by letting the kernel pick an
# ephemeral port (bind to port 0), then releasing it immediately.
pick_local_server_tunnel_port() {
  python3 - <<'PY'
import socket
probe = socket.socket()
try:
    probe.bind(("127.0.0.1", 0))
    print(probe.getsockname()[1])
finally:
    probe.close()
PY
}
# One midpoint clock-offset sample against a remote host: timestamp locally,
# read the remote clock over ssh, timestamp locally again, and attribute the
# remote reading to the midpoint of the round trip.
# Prints: "<offset_ns> <uncertainty_ns> <rtt_ns>" (uncertainty is half-RTT).
sample_host_clock_offset_ns() {
  local target_host=$1
  local t0_ns t_remote_ns t1_ns
  t0_ns="$(date +%s%N)"
  # SSH_OPTS intentionally unquoted: each option word must split.
  t_remote_ns="$(ssh ${SSH_OPTS} "${target_host}" 'date +%s%N')"
  t1_ns="$(date +%s%N)"
  python3 - <<'PY' "${t0_ns}" "${t_remote_ns}" "${t1_ns}"
import sys
t0, remote, t1 = (int(arg) for arg in sys.argv[1:])
rtt = t1 - t0
midpoint = (t0 + t1) // 2
print(f"{remote - midpoint} {rtt // 2} {rtt}")
PY
}
# Take several clock-offset samples against $1 and print the tightest one as
# "offset_ns uncertainty_ns rtt_ns sample_count method". Prefers a persistent
# ssh session (one connection, many round trips) so per-sample RTT excludes
# connection setup; falls back to independent ssh+date round trips.
sample_best_host_clock_offset_ns() {
  local host=$1
  local samples=$2
  if python3 - <<'PY' "${host}" "${samples}" "${SSH_OPTS}"; then
import shlex
import subprocess
import sys
import time
host = sys.argv[1]
try:
    samples = max(1, int(sys.argv[2]))
except Exception:
    samples = 5
ssh_opts = shlex.split(sys.argv[3])
# Remote side: echo its clock (ns) once per line received on stdin.
remote_code = (
    "import sys,time\n"
    "for _line in sys.stdin:\n"
    " print(time.time_ns(), flush=True)\n"
)
cmd = [
    "ssh",
    *ssh_opts,
    host,
    "python3 -u -c " + shlex.quote(remote_code),
]
try:
    proc = subprocess.Popen(
        cmd,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        text=True,
        bufsize=1,
    )
except Exception:
    raise SystemExit(1)
rows = []
try:
    for _ in range(samples):
        if proc.stdin is None or proc.stdout is None:
            break
        before_ns = time.time_ns()
        try:
            proc.stdin.write("x\n")
            proc.stdin.flush()
            line = proc.stdout.readline()
        except (BrokenPipeError, OSError):
            break
        after_ns = time.time_ns()
        if not line:
            break
        try:
            remote_ns = int(line.strip())
        except Exception:
            continue
        # Midpoint attribution: assume the remote read happened halfway
        # through the round trip; half-RTT bounds the error.
        local_mid_ns = (before_ns + after_ns) // 2
        rtt_ns = after_ns - before_ns
        # Tuple ordered so min(rows) picks the lowest-uncertainty sample.
        rows.append((rtt_ns // 2, remote_ns - local_mid_ns, rtt_ns))
finally:
    try:
        if proc.stdin is not None:
            proc.stdin.close()
    except Exception:
        pass
    try:
        proc.wait(timeout=2)
    except Exception:
        try:
            proc.kill()
        except Exception:
            pass
if not rows:
    raise SystemExit(1)
uncertainty_ns, offset_ns, rtt_ns = min(rows)
print(f"{offset_ns} {uncertainty_ns} {rtt_ns} {len(rows)} persistent-ssh-python")
PY
    return 0
  fi
  # Fallback: N independent ssh+date round trips; keep the tightest sample.
  local tmp
  tmp="$(mktemp)"
  local i=0
  while (( i < samples )); do
    sample_host_clock_offset_ns "${host}" >>"${tmp}" 2>/dev/null || true
    ((i += 1))
  done
  python3 - <<'PY' "${tmp}"
import pathlib
import sys
rows = []
for line in pathlib.Path(sys.argv[1]).read_text().splitlines():
    try:
        offset_ns, uncertainty_ns, rtt_ns = (int(value) for value in line.split())
    except Exception:
        continue
    rows.append((uncertainty_ns, offset_ns, rtt_ns))
if not rows:
    raise SystemExit(1)
uncertainty_ns, offset_ns, rtt_ns = min(rows)
print(f"{offset_ns} {uncertainty_ns} {rtt_ns} {len(rows)} ssh-date")
PY
  # NOTE: "local rc=$?" still sees python3's status because $? is expanded
  # before the `local` builtin runs.
  local rc=$?
  rm -f "${tmp}"
  return "${rc}"
}
# Measure the capture host's clock offset as seen FROM the server host: this
# client ssh-es to the server, which ssh-es to the capture host and runs the
# persistent midpoint protocol there — removing the client machine from the
# measured path. Prints "offset_ns uncertainty_ns rtt_ns count method" on
# success; non-zero exit when the nested path is unavailable.
# Assumes the server host can ssh to the capture host with the same SSH_OPTS
# (TODO confirm: key distribution is not visible from this script).
sample_server_to_capture_clock_offset_ns() {
  local server_host=$1
  local capture_host=$2
  local samples=$3
  python3 - <<'PY' "${server_host}" "${capture_host}" "${samples}" "${SSH_OPTS}"
import shlex
import subprocess
import sys
server_host = sys.argv[1]
capture_host = sys.argv[2]
try:
    samples = max(1, int(sys.argv[3]))
except Exception:
    samples = 5
ssh_opts_text = sys.argv[4]
ssh_opts = shlex.split(ssh_opts_text)
# This program is executed ON the server host (shipped via shlex.quote).
# It opens a persistent ssh session to the capture host and timestamps each
# request/response round trip with the server's own clock.
remote_code = r'''
import shlex
import subprocess
import sys
import time
capture_host = sys.argv[1]
samples = max(1, int(sys.argv[2]))
ssh_opts = shlex.split(sys.argv[3])
capture_code = (
    "import sys,time\n"
    "for _line in sys.stdin:\n"
    " print(time.time_ns(), flush=True)\n"
)
cmd = [
    "ssh",
    *ssh_opts,
    capture_host,
    "python3 -u -c " + shlex.quote(capture_code),
]
try:
    proc = subprocess.Popen(
        cmd,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
        text=True,
        bufsize=1,
    )
except Exception:
    raise SystemExit(1)
rows = []
try:
    for _ in range(samples):
        if proc.stdin is None or proc.stdout is None:
            break
        before_ns = time.time_ns()
        try:
            proc.stdin.write("x\n")
            proc.stdin.flush()
            line = proc.stdout.readline()
        except (BrokenPipeError, OSError):
            break
        after_ns = time.time_ns()
        if not line:
            break
        try:
            remote_ns = int(line.strip())
        except Exception:
            continue
        local_mid_ns = (before_ns + after_ns) // 2
        rtt_ns = after_ns - before_ns
        rows.append((rtt_ns // 2, remote_ns - local_mid_ns, rtt_ns))
finally:
    try:
        if proc.stdin is not None:
            proc.stdin.close()
    except Exception:
        pass
    try:
        proc.wait(timeout=2)
    except Exception:
        try:
            proc.kill()
        except Exception:
            pass
if not rows:
    raise SystemExit(1)
uncertainty_ns, offset_ns, rtt_ns = min(rows)
print(f"{offset_ns} {uncertainty_ns} {rtt_ns} {len(rows)} server-to-capture-persistent-ssh-python")
'''
remote_cmd = (
    "python3 -u -c "
    + shlex.quote(remote_code)
    + " "
    + shlex.quote(capture_host)
    + " "
    + shlex.quote(str(samples))
    + " "
    + shlex.quote(ssh_opts_text)
)
result = subprocess.run(
    ["ssh", *ssh_opts, server_host, remote_cmd],
    check=False,
    stdout=subprocess.PIPE,
    stderr=subprocess.DEVNULL,
    text=True,
)
if result.returncode != 0:
    raise SystemExit(result.returncode)
# Forward the first non-empty line (the sample) and succeed; fail when the
# remote produced nothing usable.
for line in result.stdout.splitlines():
    if line.strip():
        print(line.strip())
        raise SystemExit(0)
raise SystemExit(1)
PY
}
# Write ${LOCAL_CLOCK_ALIGNMENT_JSON} describing the server->capture (Theia ->
# Tethys) clock offset used by the freshness checks. Prefers one direct
# server-to-capture measurement; falls back to two client-mediated samples
# whose offsets are differenced (cancelling the client's clock). Always
# returns 0 — an unavailable alignment is recorded in the artifact instead.
write_clock_alignment() {
  echo "==> sampling Theia/Tethys clock alignment for freshness"
  local direct_sample
  if direct_sample="$(sample_server_to_capture_clock_offset_ns "${LESAVKA_SERVER_HOST}" "${TETHYS_HOST}" "${LESAVKA_CLOCK_ALIGNMENT_SAMPLES}")"; then
    python3 - <<'PY' \
      "${LESAVKA_SERVER_HOST}" \
      "${TETHYS_HOST}" \
      "${direct_sample}" \
      "${LOCAL_CLOCK_ALIGNMENT_JSON}"
import json
import pathlib
import sys
server_host, capture_host, sample, output_path = sys.argv[1:]
# Sample format: "offset_ns uncertainty_ns rtt_ns count [method]".
parts = sample.split()
offset_ns, uncertainty_ns, rtt_ns, sample_count = (int(value) for value in parts[:4])
method = parts[4] if len(parts) > 4 else "server-to-capture-ssh"
artifact = {
    "schema": "lesavka.clock-alignment.v1",
    "available": True,
    "method": "server host to capture host persistent ssh midpoint",
    "server_host": server_host,
    "capture_host": capture_host,
    # Direct measurement never references the local clock, hence None.
    "server_clock_offset_from_local_ns": None,
    "capture_clock_offset_from_local_ns": None,
    "theia_to_tethys_offset_ns": offset_ns,
    "uncertainty_ns": uncertainty_ns,
    "uncertainty_ms": uncertainty_ns / 1_000_000.0,
    "server_sample_rtt_ns": 0,
    "capture_sample_rtt_ns": rtt_ns,
    "server_samples": sample_count,
    "capture_samples": sample_count,
    "server_sample_method": "server-host-local-clock",
    "capture_sample_method": method,
}
pathlib.Path(output_path).write_text(json.dumps(artifact, indent=2, sort_keys=True) + "\n")
print(f" ↪ theia_to_tethys_offset_ms={offset_ns / 1_000_000.0:+.3f}")
print(f" ↪ clock_alignment_uncertainty_ms={uncertainty_ns / 1_000_000.0:.3f}")
PY
    return 0
  fi
  echo " ↪ server-to-capture clock alignment unavailable; falling back to client-mediated SSH samples"
  local theia_sample tethys_sample
  if ! theia_sample="$(sample_best_host_clock_offset_ns "${LESAVKA_SERVER_HOST}" "${LESAVKA_CLOCK_ALIGNMENT_SAMPLES}")"; then
    echo " ↪ clock alignment unavailable: failed to sample ${LESAVKA_SERVER_HOST}"
    printf '{"schema":"lesavka.clock-alignment.v1","available":false,"reason":"failed to sample server host"}\n' >"${LOCAL_CLOCK_ALIGNMENT_JSON}"
    return 0
  fi
  if ! tethys_sample="$(sample_best_host_clock_offset_ns "${TETHYS_HOST}" "${LESAVKA_CLOCK_ALIGNMENT_SAMPLES}")"; then
    echo " ↪ clock alignment unavailable: failed to sample ${TETHYS_HOST}"
    printf '{"schema":"lesavka.clock-alignment.v1","available":false,"reason":"failed to sample capture host"}\n' >"${LOCAL_CLOCK_ALIGNMENT_JSON}"
    return 0
  fi
  python3 - <<'PY' \
    "${LESAVKA_SERVER_HOST}" \
    "${TETHYS_HOST}" \
    "${theia_sample}" \
    "${tethys_sample}" \
    "${LOCAL_CLOCK_ALIGNMENT_JSON}"
import json
import pathlib
import sys
server_host, capture_host, server_sample, capture_sample, output_path = sys.argv[1:]
server_parts = server_sample.split()
capture_parts = capture_sample.split()
server_offset_ns, server_uncertainty_ns, server_rtt_ns, server_samples = (int(value) for value in server_parts[:4])
capture_offset_ns, capture_uncertainty_ns, capture_rtt_ns, capture_samples = (int(value) for value in capture_parts[:4])
server_method = server_parts[4] if len(server_parts) > 4 else "ssh-date"
capture_method = capture_parts[4] if len(capture_parts) > 4 else "ssh-date"
# Differencing the two client-relative offsets cancels the client clock;
# the uncertainties add because the samples are independent.
theia_to_tethys_offset_ns = capture_offset_ns - server_offset_ns
uncertainty_ns = server_uncertainty_ns + capture_uncertainty_ns
artifact = {
    "schema": "lesavka.clock-alignment.v1",
    "available": True,
    "method": "persistent ssh midpoint" if server_method == capture_method == "persistent-ssh-python" else "ssh midpoint",
    "server_host": server_host,
    "capture_host": capture_host,
    "server_clock_offset_from_local_ns": server_offset_ns,
    "capture_clock_offset_from_local_ns": capture_offset_ns,
    "theia_to_tethys_offset_ns": theia_to_tethys_offset_ns,
    "uncertainty_ns": uncertainty_ns,
    "uncertainty_ms": uncertainty_ns / 1_000_000.0,
    "server_sample_rtt_ns": server_rtt_ns,
    "capture_sample_rtt_ns": capture_rtt_ns,
    "server_samples": server_samples,
    "capture_samples": capture_samples,
    "server_sample_method": server_method,
    "capture_sample_method": capture_method,
}
pathlib.Path(output_path).write_text(json.dumps(artifact, indent=2, sort_keys=True) + "\n")
print(
    f" ↪ theia_to_tethys_offset_ms={theia_to_tethys_offset_ns / 1_000_000.0:+.3f}"
)
print(f" ↪ clock_alignment_uncertainty_ms={uncertainty_ns / 1_000_000.0:.3f}")
PY
}
# Block until the forwarded loopback port accepts TCP connections. Gives the
# tunnel up to ~5 seconds (50 polls, 0.1s apart). Exits 88 if the ssh tunnel
# process dies first, 89 if the port never becomes reachable.
wait_for_server_tunnel() {
  local local_port=$1
  local attempt
  for (( attempt = 0; attempt < 50; attempt++ )); do
    if nc -z 127.0.0.1 "${local_port}" >/dev/null 2>&1; then
      return 0
    fi
    # A dead ssh process means the forward can never come up; fail fast.
    if ! kill -0 "${SERVER_TUNNEL_PID}" >/dev/null 2>&1; then
      wait "${SERVER_TUNNEL_PID}" >/dev/null 2>&1 || true
      echo "SSH tunnel to ${LESAVKA_SERVER_HOST} exited before becoming ready" >&2
      exit 88
    fi
    sleep 0.1
  done
  echo "SSH tunnel to ${LESAVKA_SERVER_HOST} did not become ready on localhost:${local_port}" >&2
  exit 89
}
# Forward a freshly-picked local loopback port to 127.0.0.1:<remote_port> on
# the server host. Records the tunnel pid and ports in the SERVER_TUNNEL_*
# globals and blocks until the forward accepts connections (or exits 88/89
# via wait_for_server_tunnel). The EXIT trap tears the tunnel down.
start_server_tunnel() {
  local remote_port=$1
  local local_port
  local_port="$(pick_local_server_tunnel_port)"
  echo "==> opening SSH tunnel to ${LESAVKA_SERVER_HOST}:127.0.0.1:${remote_port} on localhost:${local_port}"
  # SSH_OPTS intentionally unquoted so each option word splits;
  # ExitOnForwardFailure makes ssh die (detected above) if the bind fails.
  ssh ${SSH_OPTS} -o ExitOnForwardFailure=yes \
    -N \
    -L "127.0.0.1:${local_port}:127.0.0.1:${remote_port}" \
    "${LESAVKA_SERVER_HOST}" &
  SERVER_TUNNEL_PID=$!
  SERVER_TUNNEL_REMOTE_PORT="${remote_port}"
  SERVER_TUNNEL_LOCAL_PORT="${local_port}"
  wait_for_server_tunnel "${local_port}"
}
# Decide how to reach the Lesavka server and store the result in
# RESOLVED_LESAVKA_SERVER_ADDR. An explicit LESAVKA_SERVER_ADDR wins;
# otherwise read the server's bind port from /etc/lesavka/server.env over ssh
# and tunnel to it, defaulting to port 50051 when discovery fails.
resolve_server_addr() {
  if [[ "${LESAVKA_SERVER_ADDR}" != "auto" ]]; then
    RESOLVED_LESAVKA_SERVER_ADDR="${LESAVKA_SERVER_ADDR}"
    return 0
  fi
  local bind_addr tunnel_port
  bind_addr="$(
    ssh ${SSH_OPTS} "${LESAVKA_SERVER_HOST}" \
      "grep -E '^LESAVKA_SERVER_BIND_ADDR=' /etc/lesavka/server.env 2>/dev/null | tail -n1 | cut -d= -f2-" \
      2>/dev/null || true
  )"
  # Everything after the last ':' is the port; anything non-numeric (or an
  # empty discovery result) falls back to the default gRPC port.
  tunnel_port="${bind_addr##*:}"
  case "${tunnel_port}" in
    '' | *[!0-9]*) tunnel_port="50051" ;;
  esac
  start_server_tunnel "${tunnel_port}"
  RESOLVED_LESAVKA_SERVER_ADDR="${LESAVKA_SERVER_SCHEME}://127.0.0.1:${SERVER_TUNNEL_LOCAL_PORT}"
}
# Verify the server host's env files match the expected camera output, codec
# and mode before running the probe. Skipped when REMOTE_SERVER_PREFLIGHT=0.
# The remote script exits with a distinct code (64-72) per mismatch so the
# failure cause is recoverable from the exit status alone.
preflight_server_path() {
  [[ "${REMOTE_SERVER_PREFLIGHT}" != "0" ]] || return 0
  echo "==> verifying Lesavka server path on ${LESAVKA_SERVER_HOST}"
  ssh ${SSH_OPTS} "${LESAVKA_SERVER_HOST}" bash -s -- \
    "${REMOTE_EXPECT_CAM_OUTPUT}" \
    "${REMOTE_EXPECT_UVC_CODEC}" \
    "${REMOTE_EXPECT_UVC_WIDTH}" \
    "${REMOTE_EXPECT_UVC_HEIGHT}" \
    "${REMOTE_EXPECT_UVC_FPS}" <<'REMOTE_PREFLIGHT'
set -euo pipefail
# Positional expectations forwarded from the client; empty means "don't check".
expect_cam_output=${1:-}
expect_uvc_codec=${2:-}
expect_uvc_width=${3:-}
expect_uvc_height=${4:-}
expect_uvc_fps=${5:-}
# Print the value of KEY= from FILE (last assignment wins; empty if absent).
read_env_value() {
  local key=$1
  local file=$2
  local value=""
  value=$(grep -E "^${key}=" "$file" 2>/dev/null | tail -n1 | cut -d= -f2- || true)
  printf '%s\n' "$value"
}
# server.env is authoritative; uvc.env is the runtime copy. "effective"
# prefers the server.env value and falls back to the runtime one.
cam_output=$(read_env_value "LESAVKA_CAM_OUTPUT" /etc/lesavka/server.env)
server_uvc_codec=$(read_env_value "LESAVKA_UVC_CODEC" /etc/lesavka/server.env)
runtime_uvc_codec=$(read_env_value "LESAVKA_UVC_CODEC" /etc/lesavka/uvc.env)
server_uvc_width=$(read_env_value "LESAVKA_UVC_WIDTH" /etc/lesavka/server.env)
server_uvc_height=$(read_env_value "LESAVKA_UVC_HEIGHT" /etc/lesavka/server.env)
server_uvc_fps=$(read_env_value "LESAVKA_UVC_FPS" /etc/lesavka/server.env)
runtime_uvc_width=$(read_env_value "LESAVKA_UVC_WIDTH" /etc/lesavka/uvc.env)
runtime_uvc_height=$(read_env_value "LESAVKA_UVC_HEIGHT" /etc/lesavka/uvc.env)
runtime_uvc_fps=$(read_env_value "LESAVKA_UVC_FPS" /etc/lesavka/uvc.env)
effective_uvc_codec=${server_uvc_codec:-$runtime_uvc_codec}
effective_uvc_width=${server_uvc_width:-$runtime_uvc_width}
effective_uvc_height=${server_uvc_height:-$runtime_uvc_height}
effective_uvc_fps=${server_uvc_fps:-$runtime_uvc_fps}
printf ' ↪ server.env CAM_OUTPUT=%s\n' "${cam_output:-<unset>}"
printf ' ↪ server.env UVC_CODEC=%s\n' "${server_uvc_codec:-<unset>}"
printf ' ↪ uvc.env UVC_CODEC=%s\n' "${runtime_uvc_codec:-<unset>}"
printf ' ↪ server.env UVC_MODE=%sx%s@%s\n' "${server_uvc_width:-<unset>}" "${server_uvc_height:-<unset>}" "${server_uvc_fps:-<unset>}"
printf ' ↪ uvc.env UVC_MODE=%sx%s@%s\n' "${runtime_uvc_width:-<unset>}" "${runtime_uvc_height:-<unset>}" "${runtime_uvc_fps:-<unset>}"
printf ' ↪ effective UVC_MODE=%sx%s@%s\n' "${effective_uvc_width:-<unset>}" "${effective_uvc_height:-<unset>}" "${effective_uvc_fps:-<unset>}"
# Each mismatch has its own exit code so callers can tell them apart.
if [[ -n "${expect_cam_output}" && "${cam_output}" != "${expect_cam_output}" ]]; then
  printf 'expected CAM_OUTPUT=%s but found %s\n' "${expect_cam_output}" "${cam_output:-<unset>}" >&2
  exit 64
fi
if [[ -n "${expect_uvc_codec}" && "${effective_uvc_codec}" != "${expect_uvc_codec}" ]]; then
  printf 'expected effective UVC_CODEC=%s but found %s\n' "${expect_uvc_codec}" "${effective_uvc_codec:-<unset>}" >&2
  exit 65
fi
if [[ -n "${expect_uvc_codec}" && "${runtime_uvc_codec}" != "${expect_uvc_codec}" ]]; then
  printf 'expected uvc.env UVC_CODEC=%s but found %s\n' "${expect_uvc_codec}" "${runtime_uvc_codec:-<unset>}" >&2
  exit 66
fi
if [[ -n "${expect_uvc_width}" && "${effective_uvc_width}" != "${expect_uvc_width}" ]]; then
  printf 'expected effective UVC_WIDTH=%s but found %s\n' "${expect_uvc_width}" "${effective_uvc_width:-<unset>}" >&2
  exit 67
fi
if [[ -n "${expect_uvc_height}" && "${effective_uvc_height}" != "${expect_uvc_height}" ]]; then
  printf 'expected effective UVC_HEIGHT=%s but found %s\n' "${expect_uvc_height}" "${effective_uvc_height:-<unset>}" >&2
  exit 68
fi
if [[ -n "${expect_uvc_fps}" && "${effective_uvc_fps}" != "${expect_uvc_fps}" ]]; then
  printf 'expected effective UVC_FPS=%s but found %s\n' "${expect_uvc_fps}" "${effective_uvc_fps:-<unset>}" >&2
  exit 69
fi
if [[ -n "${expect_uvc_width}" && "${runtime_uvc_width}" != "${expect_uvc_width}" ]]; then
  printf 'expected uvc.env UVC_WIDTH=%s but found %s\n' "${expect_uvc_width}" "${runtime_uvc_width:-<unset>}" >&2
  exit 70
fi
if [[ -n "${expect_uvc_height}" && "${runtime_uvc_height}" != "${expect_uvc_height}" ]]; then
  printf 'expected uvc.env UVC_HEIGHT=%s but found %s\n' "${expect_uvc_height}" "${runtime_uvc_height:-<unset>}" >&2
  exit 71
fi
if [[ -n "${expect_uvc_fps}" && "${runtime_uvc_fps}" != "${expect_uvc_fps}" ]]; then
  printf 'expected uvc.env UVC_FPS=%s but found %s\n' "${expect_uvc_fps}" "${runtime_uvc_fps:-<unset>}" >&2
  exit 72
fi
# Finally require all three services to be active.
systemctl is-active lesavka-server lesavka-uvc lesavka-core >/dev/null
REMOTE_PREFLIGHT
}
# Query version/revision attribution through the resolved server address and
# print it; refuse (return 1) unless every required field is reported and the
# ambiguous combined field is absent. Builds lesavka-relayctl on demand.
# Reads: REPO_ROOT, LESAVKA_TLS_DOMAIN, RESOLVED_LESAVKA_SERVER_ADDR.
print_lesavka_versions() {
  echo "==> Lesavka versions under test"
  if [[ ! -x "${REPO_ROOT}/target/debug/lesavka-relayctl" ]]; then
    (cd "${REPO_ROOT}" && cargo build -p lesavka_client --bin lesavka-relayctl >/dev/null)
  fi
  local version_output
  if ! version_output="$(
    LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \
      "${REPO_ROOT}/target/debug/lesavka-relayctl" \
      --server "${RESOLVED_LESAVKA_SERVER_ADDR}" \
      version 2>&1
  )"; then
    echo "failed to query Lesavka versions through ${RESOLVED_LESAVKA_SERVER_ADDR}" >&2
    echo "${version_output}" >&2
    return 1
  fi
  if ! grep -q "^client_version=" <<<"${version_output}"; then
    echo "Lesavka version query did not report client_version=; refusing to run an unattributed probe" >&2
    echo "${version_output}" >&2
    return 1
  fi
  # A combined version+revision field would make attribution ambiguous.
  if grep -q "^client_full_version=" <<<"${version_output}"; then
    echo "Lesavka version query reported a combined version+revision; refusing ambiguous probe attribution" >&2
    echo "${version_output}" >&2
    return 1
  fi
  # The remaining required fields share one message template; loop instead of
  # repeating the grep/echo stanza three more times.
  local field
  for field in client_revision server_version server_revision; do
    if ! grep -q "^${field}=" <<<"${version_output}"; then
      echo "Lesavka version query did not report ${field}=; refusing to run an unattributed probe" >&2
      echo "${version_output}" >&2
      return 1
    fi
  done
  # Echo the non-empty report lines. Use if/printf rather than `[[ -n ]] &&`
  # so a falsy final iteration can never leave the loop — and therefore the
  # function — with a non-zero status under set -e. `line` is local so the
  # loop does not leak a global.
  local line
  while IFS= read -r line; do
    if [[ -n "${line}" ]]; then
      printf '%s\n' "${line}"
    fi
  done <<<"${version_output}"
}
# Derive an output-delay calibration proposal from the analysis report and
# (optionally present) correlation artifact. Writes a JSON artifact plus a
# shell-quoted .env summary. The proposal is only marked "ready" when every
# quality gate passes; otherwise the refusal reasons are recorded verbatim.
write_output_delay_calibration() {
  [[ "${LESAVKA_OUTPUT_DELAY_CALIBRATION}" != "0" ]] || return 0
  [[ -f "${LOCAL_ANALYSIS_JSON}" ]] || return 0
  echo "==> deriving UVC/UAC output-delay calibration"
  python3 - <<'PY' \
    "${LOCAL_ANALYSIS_JSON}" \
    "${LOCAL_OUTPUT_DELAY_JSON}" \
    "${LOCAL_OUTPUT_DELAY_ENV}" \
    "${LOCAL_OUTPUT_DELAY_CORRELATION_JSON}" \
    "${LESAVKA_OUTPUT_DELAY_TARGET}" \
    "${LESAVKA_OUTPUT_DELAY_MIN_PAIRS}" \
    "${LESAVKA_OUTPUT_DELAY_MAX_ABS_SKEW_MS}" \
    "${LESAVKA_OUTPUT_DELAY_MAX_DRIFT_MS}" \
    "${LESAVKA_OUTPUT_DELAY_GAIN}" \
    "${LESAVKA_OUTPUT_DELAY_MAX_STEP_US}" \
    "${LESAVKA_OUTPUT_DELAY_APPLY}" \
    "${LESAVKA_OUTPUT_DELAY_APPLY_MODE}" \
    "${LESAVKA_OUTPUT_DELAY_SAVE}" \
    "${LESAVKA_OUTPUT_DELAY_PROBE_AUDIO_DELAY_US}" \
    "${LESAVKA_OUTPUT_DELAY_PROBE_VIDEO_DELAY_US}"
import json
import math
import pathlib
import shlex
import sys
(
    report_path,
    output_json_path,
    output_env_path,
    correlation_path,
    target,
    min_pairs_raw,
    max_abs_skew_raw,
    max_drift_raw,
    gain_raw,
    max_step_raw,
    apply_raw,
    apply_mode_raw,
    save_raw,
    active_audio_raw,
    active_video_raw,
) = sys.argv[1:]
# Lenient scalar parsers: every raw value arrives as an environment string.
def as_int(value, default):
    try:
        return int(str(value).strip())
    except Exception:
        return default
def as_float(value, default):
    try:
        result = float(str(value).strip())
    except Exception:
        return default
    return result if math.isfinite(result) else default
def as_bool(value):
    return str(value).strip().lower() not in {"", "0", "false", "no", "off"}
def env_line(key, value):
    # shlex.quote keeps the .env file safe to `source` from shell.
    return f"{key}={shlex.quote(str(value))}\n"
report = json.loads(pathlib.Path(report_path).read_text())
verdict = report.get("verdict") or {}
# The correlation artifact is optional; treat a missing/bad file as empty.
try:
    correlation = json.loads(pathlib.Path(correlation_path).read_text())
except Exception:
    correlation = {}
freshness = correlation.get("freshness") or {}
capture_timebase = freshness.get("capture_timebase") or {}
target = target.strip().lower()
apply_mode = apply_mode_raw.strip().lower()
# Clamp every tunable into a sane range before use.
min_pairs = max(1, as_int(min_pairs_raw, 13))
max_abs_skew_ms = max(1.0, as_float(max_abs_skew_raw, 5000.0))
max_drift_ms = max(0.0, as_float(max_drift_raw, 80.0))
gain = min(max(as_float(gain_raw, 1.0), 0.01), 1.0)
max_step_us = max(1, as_int(max_step_raw, 1_500_000))
active_audio_offset_us = as_int(active_audio_raw, 0)
active_video_offset_us = as_int(active_video_raw, 0)
paired = as_int(report.get("paired_event_count"), 0)
median_skew_ms = as_float(report.get("median_skew_ms"), 0.0)
p95_abs_skew_ms = as_float(
    verdict.get("p95_abs_skew_ms"),
    as_float(report.get("max_abs_skew_ms"), 0.0),
)
max_abs_observed_ms = as_float(report.get("max_abs_skew_ms"), p95_abs_skew_ms)
drift_ms = as_float(report.get("drift_ms"), 0.0)
# Proposed correction: median skew (ms -> us), scaled by gain, then bounded
# to at most one max_step_us step per run.
raw_device_delta_us = int(round(median_skew_ms * 1000.0))
scaled_delta_us = int(round(raw_device_delta_us * gain))
bounded_delta_us = max(-max_step_us, min(max_step_us, scaled_delta_us))
audio_delta_us = 0
video_delta_us = 0
refusal_reasons = []
# Positive skew means audio-after-video, so the correction's sign flips
# depending on which channel is being delayed.
if target == "video":
    video_delta_us = bounded_delta_us
elif target == "audio":
    audio_delta_us = -bounded_delta_us
else:
    refusal_reasons.append(f"unsupported target {target!r}; use video or audio")
if apply_mode not in {"absolute", "relative"}:
    refusal_reasons.append(
        f"unsupported apply mode {apply_mode!r}; use absolute or relative"
    )
# Quality gates: enough pairs, bounded skew, bounded drift, valid timebase.
if paired < min_pairs:
    refusal_reasons.append(f"paired_event_count {paired} < {min_pairs}")
if max_abs_observed_ms > max_abs_skew_ms:
    refusal_reasons.append(
        f"max_abs_skew_ms {max_abs_observed_ms:.1f} > {max_abs_skew_ms:.1f}"
    )
if abs(drift_ms) > max_drift_ms:
    refusal_reasons.append(f"abs(drift_ms) {abs(drift_ms):.1f} > {max_drift_ms:.1f}")
if capture_timebase and capture_timebase.get("valid") is not True:
    refusal_reasons.append(
        "capture timebase invalid for calibration: "
        f"{capture_timebase.get('status', 'unknown')} - {capture_timebase.get('reason', '')}"
    )
elif freshness.get("status") == "invalid":
    refusal_reasons.append(
        f"freshness invalid for calibration: {freshness.get('reason', '')}"
    )
audio_target_offset_us = active_audio_offset_us + audio_delta_us
video_target_offset_us = active_video_offset_us + video_delta_us
ready = not refusal_reasons
decision = "ready" if ready else "refused"
note = (
    "direct UVC/UAC output-delay calibration: "
    f"median device skew {median_skew_ms:+.1f}ms, target={target}, "
    f"audio {active_audio_offset_us:+d}->{audio_target_offset_us:+d}us "
    f"(delta {audio_delta_us:+d}us), "
    f"video {active_video_offset_us:+d}->{video_target_offset_us:+d}us "
    f"(delta {video_delta_us:+d}us)"
)
if not ready:
    note = f"direct UVC/UAC output-delay calibration refused: {'; '.join(refusal_reasons)}"
artifact = {
    "schema": "lesavka.output-delay-calibration.v1",
    "source": "direct-uvc-uac-output-probe",
    "scope": "server-output-static-baseline",
    "applies_to": "server UVC/UAC gadget output path",
    "measurement_host_role": "lab-attached USB host",
    "probe_media_origin": "server-generated",
    "probe_media_path": "server generated signatures -> UVC/UAC sinks -> lab host capture",
    "injection_scope": "server-final-output-handoff",
    "server_pipeline_reference": "generated media PTS before intentional sync delay",
    "sink_handoff_path": "video CameraRelay::feed; audio Voice::push",
    "client_uplink_included": False,
    "report_json": report_path,
    "correlation_json": correlation_path if pathlib.Path(correlation_path).exists() else "",
    "audio_after_video_positive": True,
    "target": target,
    "ready": ready,
    "decision": decision,
    "apply_enabled": as_bool(apply_raw),
    "apply_mode": apply_mode,
    "save_enabled": as_bool(save_raw),
    "paired_event_count": paired,
    "min_pairs": min_pairs,
    "measured_device_skew_ms": median_skew_ms,
    "p95_abs_skew_ms": p95_abs_skew_ms,
    "max_abs_skew_ms": max_abs_observed_ms,
    "max_abs_skew_limit_ms": max_abs_skew_ms,
    "drift_ms": drift_ms,
    "max_drift_ms": max_drift_ms,
    "gain": gain,
    "max_step_us": max_step_us,
    "active_audio_offset_us": active_audio_offset_us,
    "active_video_offset_us": active_video_offset_us,
    "raw_device_delta_us": raw_device_delta_us,
    "bounded_device_delta_us": bounded_delta_us,
    "audio_offset_adjust_us": audio_delta_us,
    "video_offset_adjust_us": video_delta_us,
    "audio_target_offset_us": audio_target_offset_us,
    "video_target_offset_us": video_target_offset_us,
    "refusal_reasons": refusal_reasons,
    "note": note,
}
pathlib.Path(output_json_path).write_text(json.dumps(artifact, indent=2, sort_keys=True) + "\n")
env_values = {
    "output_delay_ready": str(ready).lower(),
    "output_delay_decision": decision,
    "output_delay_target": target,
    "output_delay_audio_delta_us": audio_delta_us,
    "output_delay_video_delta_us": video_delta_us,
    "output_delay_active_audio_offset_us": active_audio_offset_us,
    "output_delay_active_video_offset_us": active_video_offset_us,
    "output_delay_audio_target_offset_us": audio_target_offset_us,
    "output_delay_video_target_offset_us": video_target_offset_us,
    "output_delay_measured_skew_ms": f"{median_skew_ms:.3f}",
    "output_delay_paired_event_count": paired,
    "output_delay_drift_ms": f"{drift_ms:.3f}",
    "output_delay_apply_mode": apply_mode,
    "output_delay_note": note,
}
with pathlib.Path(output_env_path).open("w") as handle:
    for key, value in env_values.items():
        handle.write(env_line(key, value))
PY
}
# Pull the server's embedded timeline out of the probe reply: the LAST
# "server_timeline_json=<json>" line wins, and the payload is re-serialized
# pretty-printed into ${LOCAL_SERVER_TIMELINE_JSON}. No-op (status 0) when
# the reply file is absent or carries no timeline line.
extract_server_timeline() {
  [[ -f "${LOCAL_SERVER_PROBE_REPLY}" ]] || return 0
  python3 - <<'PY' "${LOCAL_SERVER_PROBE_REPLY}" "${LOCAL_SERVER_TIMELINE_JSON}"
import json
import pathlib
import sys
reply_file = pathlib.Path(sys.argv[1])
output_file = pathlib.Path(sys.argv[2])
marker = "server_timeline_json="
payload = ""
for text_line in reply_file.read_text(errors="replace").splitlines():
    if text_line.startswith(marker):
        # No break: a later timeline line supersedes an earlier one.
        payload = text_line[len(marker):].strip()
if not payload:
    raise SystemExit(0)
parsed = json.loads(payload)
output_file.write_text(json.dumps(parsed, indent=2, sort_keys=True) + "\n")
PY
}
# When ANALYSIS_TIMELINE_WINDOW is enabled and all three artifacts exist,
# print an "--analysis-window-s START:END" argument bounding the analysis to
# the server timeline events mapped into capture-host time. Prints nothing
# (and still succeeds, thanks to `|| true`) when any input is missing or the
# derived window is degenerate.
compute_analysis_window_arg() {
  [[ "${ANALYSIS_TIMELINE_WINDOW}" != "0" ]] || return 0
  [[ -f "${LOCAL_SERVER_TIMELINE_JSON}" ]] || return 0
  [[ -f "${LOCAL_CAPTURE_LOG}" ]] || return 0
  [[ -f "${LOCAL_CLOCK_ALIGNMENT_JSON}" ]] || return 0
  python3 - <<'PY' \
    "${LOCAL_SERVER_TIMELINE_JSON}" \
    "${LOCAL_CAPTURE_LOG}" \
    "${LOCAL_CLOCK_ALIGNMENT_JSON}" \
    "${ANALYSIS_TIMELINE_WINDOW_PADDING_SECONDS}" || true
import json
import math
import pathlib
import sys
timeline_path, capture_log_path, clock_path, padding_raw = sys.argv[1:]
def as_int(value):
    try:
        return int(str(value).strip())
    except Exception:
        return None
def as_float(value, default):
    try:
        parsed = float(str(value).strip())
    except Exception:
        return default
    return parsed if math.isfinite(parsed) else default
def capture_start_ns(path):
    # The capture log records the start as "capture_start_unix_ns=<ns>".
    try:
        lines = pathlib.Path(path).read_text(errors="replace").splitlines()
    except Exception:
        return None
    for line in lines:
        if line.startswith("capture_start_unix_ns="):
            return as_int(line.split("=", 1)[1])
    return None
try:
    timeline = json.loads(pathlib.Path(timeline_path).read_text())
    clock = json.loads(pathlib.Path(clock_path).read_text())
except Exception:
    raise SystemExit(0)
capture_start = capture_start_ns(capture_log_path)
offset = as_int(clock.get("theia_to_tethys_offset_ns")) if clock.get("available") else None
if capture_start is None or offset is None:
    raise SystemExit(0)
times = []
for event in timeline.get("events", []):
    for key in ("audio_push_unix_ns", "video_feed_unix_ns"):
        event_ns = as_int(event.get(key))
        if event_ns is not None:
            # Map server-clock ns into seconds since capture start on the
            # capture host via the measured inter-host offset.
            times.append((event_ns + offset - capture_start) / 1_000_000_000.0)
if not times:
    raise SystemExit(0)
padding = max(0.0, as_float(padding_raw, 1.0))
start = max(0.0, min(times) - padding)
end = max(times) + padding
if end <= start:
    raise SystemExit(0)
print(f"--analysis-window-s {start:.3f}:{end:.3f}")
PY
}
write_output_delay_correlation() {
[[ -f "${LOCAL_ANALYSIS_JSON}" ]] || return 0
[[ -f "${LOCAL_SERVER_TIMELINE_JSON}" ]] || return 0
echo "==> correlating Theia feed timing with Tethys observations"
python3 - <<'PY' \
"${LOCAL_ANALYSIS_JSON}" \
"${LOCAL_SERVER_TIMELINE_JSON}" \
"${LOCAL_OUTPUT_DELAY_CORRELATION_JSON}" \
"${LOCAL_OUTPUT_DELAY_CORRELATION_CSV}" \
"${LOCAL_OUTPUT_DELAY_CORRELATION_TXT}" \
"${LOCAL_CAPTURE}" \
"${LOCAL_CAPTURE_LOG}" \
"${LOCAL_CLOCK_ALIGNMENT_JSON}" \
"${LESAVKA_OUTPUT_FRESHNESS_MAX_AGE_MS}" \
"${LESAVKA_OUTPUT_FRESHNESS_MAX_DRIFT_MS}" \
"${LESAVKA_OUTPUT_FRESHNESS_MAX_CLOCK_UNCERTAINTY_MS}"
import csv
import json
import math
import pathlib
import statistics
import struct
import subprocess
import sys
(
report_path,
timeline_path,
output_json_path,
output_csv_path,
output_txt_path,
capture_path,
capture_log_path,
clock_alignment_path,
max_freshness_age_raw,
max_freshness_drift_raw,
max_clock_uncertainty_raw,
) = sys.argv[1:]
report = json.loads(pathlib.Path(report_path).read_text())
timeline = json.loads(pathlib.Path(timeline_path).read_text())
server_event_list = timeline.get("events", [])
server_events = {int(event["event_id"]): event for event in server_event_list}
def finite(value):
    """Coerce ``value`` to float, returning None for non-numeric or non-finite input."""
    try:
        number = float(value)
    except Exception:
        return None
    if math.isfinite(number):
        return number
    return None
def as_float(value, default):
    """Parse ``value`` as a finite float; fall back to ``default`` on failure or NaN/inf."""
    try:
        number = float(str(value).strip())
        if math.isfinite(number):
            return number
    except Exception:
        pass
    return default
def as_int_or_none(value):
    """Parse ``value`` as an int; return None when it does not parse."""
    try:
        parsed = int(str(value).strip())
    except Exception:
        parsed = None
    return parsed
server_events_by_code = {}
for event in server_event_list:
code = as_int_or_none(event.get("code"))
if code is None:
continue
server_events_by_code.setdefault(code, []).append(event)
unique_server_events_by_code = {
code: events[0] for code, events in server_events_by_code.items() if len(events) == 1
}
def load_json_or_empty(path):
    """Read and parse a JSON file; any failure (missing, unreadable, bad JSON) yields {}."""
    try:
        text = pathlib.Path(path).read_text()
        return json.loads(text)
    except Exception:
        return {}
def run_json(command, description):
    """Run ``command`` and parse its stdout as JSON.

    On a spawn/exit failure, or unparsable output, returns a dict carrying
    ``available: False`` and a human-readable ``error`` string instead.
    """
    try:
        stdout = subprocess.check_output(command, stderr=subprocess.PIPE)
    except Exception as error:
        return {"available": False, "error": f"{description} failed: {error}"}
    try:
        return json.loads(stdout)
    except Exception as error:
        return {"available": False, "error": f"{description} JSON parse failed: {error}"}
def run_bytes(command, description):
    """Run ``command`` and return its raw stdout bytes; any failure becomes RuntimeError."""
    try:
        output = subprocess.check_output(command, stderr=subprocess.PIPE)
    except Exception as error:
        raise RuntimeError(f"{description} failed: {error}") from error
    return output
def parse_capture_start_unix_ns(path):
    """Return the capture-start wall clock (unix ns) recorded in the capture log.

    Scans for the first ``capture_start_unix_ns=`` line; returns None when the
    file is unreadable or the marker is absent/unparsable.
    """
    marker = "capture_start_unix_ns="
    try:
        content = pathlib.Path(path).read_text(errors="replace")
    except Exception:
        return None
    for entry in content.splitlines():
        if entry.startswith(marker):
            return as_int_or_none(entry.split("=", 1)[1])
    return None
def percentile(values, pct):
    """Linear-interpolated percentile over the finite entries of ``values``.

    Non-finite and None entries are dropped first; an empty result yields None.
    """
    finite_sorted = sorted(v for v in values if v is not None and math.isfinite(v))
    if not finite_sorted:
        return None
    if len(finite_sorted) == 1:
        return finite_sorted[0]
    rank = (len(finite_sorted) - 1) * (pct / 100.0)
    low = math.floor(rank)
    high = math.ceil(rank)
    if low == high:
        return finite_sorted[int(rank)]
    weight = rank - low
    return finite_sorted[low] * (1.0 - weight) + finite_sorted[high] * weight
def numeric_stats(values, suffix=""):
    """Summarize the finite entries of ``values`` as count/median/p95/max.

    Keys for the aggregates carry ``suffix`` (e.g. "_interval_ms") so callers
    can embed several summaries side by side.
    """
    usable = [v for v in values if v is not None and math.isfinite(v)]
    if not usable:
        return {
            "available": False,
            "count": 0,
            f"median{suffix}": None,
            f"p95{suffix}": None,
            f"max{suffix}": None,
        }
    return {
        "available": True,
        "count": len(usable),
        f"median{suffix}": percentile(usable, 50.0),
        f"p95{suffix}": percentile(usable, 95.0),
        f"max{suffix}": max(usable),
    }
def stats(rows, key):
    """Distribution summary of ``row[key]`` across ``rows`` with finite values.

    first/last follow row order; the remaining fields are order-independent
    aggregates. An empty series reports ``available: False`` with None fields.
    """
    series = [row.get(key) for row in rows if row.get(key) is not None]
    series = [v for v in series if math.isfinite(v)]
    if not series:
        return {
            "available": False,
            "count": 0,
            "first_ms": None,
            "last_ms": None,
            "mean_ms": None,
            "min_ms": None,
            "median_ms": None,
            "p95_ms": None,
            "max_ms": None,
        }
    return {
        "available": True,
        "count": len(series),
        "first_ms": series[0],
        "last_ms": series[-1],
        "mean_ms": sum(series) / len(series),
        "min_ms": min(series),
        "median_ms": percentile(series, 50.0),
        "p95_ms": percentile(series, 95.0),
        "max_ms": max(series),
    }
def shift_stats(base, delta_ms):
    """Return a copy of a stats dict with every *_ms field shifted by ``delta_ms``.

    The shift is recorded under ``intentional_delay_ms``. Unavailable stats
    are returned as a plain copy, unshifted and untagged.
    """
    shifted = dict(base)
    if not shifted.get("available"):
        return shifted
    for field in ("first_ms", "last_ms", "mean_ms", "min_ms", "median_ms", "p95_ms", "max_ms"):
        current = shifted.get(field)
        if current is not None:
            shifted[field] = current + delta_ms
        else:
            shifted[field] = None
    shifted["intentional_delay_ms"] = delta_ms
    return shifted
def fit_linear(rows, key):
    """Least-squares fit of ``row[key]`` (ms) against ``row["event_time_s"]``.

    Returns slope/intercept/r2 plus the total drift over the observed span.
    Fewer than two points yields a degenerate result with zeros and
    ``available: False`` (and no first/last fit keys, matching callers' .get use).
    """
    points = [(row["event_time_s"], row[key]) for row in rows if row.get(key) is not None]
    if len(points) < 2:
        return {
            "available": False,
            "intercept_ms": 0.0,
            "slope_ms_per_s": 0.0,
            "r2": 0.0,
            "drift_ms": 0.0,
        }
    xs = [x for x, _ in points]
    ys = [y for _, y in points]
    x_bar = sum(xs) / len(xs)
    y_bar = sum(ys) / len(ys)
    variance = sum((x - x_bar) ** 2 for x in xs)
    if variance == 0:
        slope = 0.0
    else:
        slope = sum((x - x_bar) * (y - y_bar) for x, y in points) / variance
    intercept = y_bar - slope * x_bar
    fitted = [intercept + slope * x for x in xs]
    total_ss = sum((y - y_bar) ** 2 for y in ys)
    resid_ss = sum((y - f) ** 2 for y, f in zip(ys, fitted))
    if total_ss == 0.0:
        r2 = 1.0
    else:
        r2 = max(0.0, min(1.0, 1.0 - (resid_ss / total_ss)))
    first_fit = intercept + slope * xs[0]
    last_fit = intercept + slope * xs[-1]
    return {
        "available": True,
        "intercept_ms": intercept,
        "slope_ms_per_s": slope,
        "r2": r2,
        "drift_ms": last_fit - first_fit,
        "first_fit_ms": first_fit,
        "last_fit_ms": last_fit,
    }
def correlation(rows, left_key, right_key):
    """Pearson correlation between two row fields; 0.0 when undefined.

    Rows missing either field are skipped; fewer than two usable pairs or a
    zero-variance side yields 0.0 rather than an error.
    """
    pairs = []
    for row in rows:
        left = row.get(left_key)
        right = row.get(right_key)
        if left is not None and right is not None:
            pairs.append((left, right))
    if len(pairs) < 2:
        return 0.0
    n = len(pairs)
    mean_left = sum(p[0] for p in pairs) / n
    mean_right = sum(p[1] for p in pairs) / n
    var_left = sum((p[0] - mean_left) ** 2 for p in pairs)
    var_right = sum((p[1] - mean_right) ** 2 for p in pairs)
    if var_left <= 0.0 or var_right <= 0.0:
        return 0.0
    covariance = sum((p[0] - mean_left) * (p[1] - mean_right) for p in pairs)
    return covariance / math.sqrt(var_left * var_right)
def ffprobe_frame_timestamps(path):
    """Best-effort timestamps (seconds) of the first video stream via ffprobe.

    Frames whose timestamp is missing or "N/A" are skipped; an ffprobe
    failure surfaces as an empty list (run_json returns no "frames" key).
    """
    command = [
        "ffprobe",
        "-hide_banner",
        "-loglevel",
        "error",
        "-select_streams",
        "v:0",
        "-show_frames",
        "-show_entries",
        "frame=best_effort_timestamp_time",
        "-of",
        "json",
        path,
    ]
    payload = run_json(command, "ffprobe video frames")
    timestamps = []
    for frame in payload.get("frames", []):
        stamp = frame.get("best_effort_timestamp_time")
        if stamp in (None, "N/A"):
            continue
        timestamps.append(float(stamp))
    return timestamps
def ffprobe_packet_times(path, stream):
    """Return (pts_time, duration_time) pairs for ``stream`` via ffprobe.

    Packets without a parsable pts are dropped entirely; an unparsable or
    missing duration is recorded as None so callers can still use the pts.
    """
    command = [
        "ffprobe",
        "-hide_banner",
        "-loglevel",
        "error",
        "-select_streams",
        stream,
        "-show_packets",
        "-show_entries",
        "packet=pts_time,duration_time",
        "-of",
        "json",
        path,
    ]
    payload = run_json(command, f"ffprobe {stream} packets")
    results = []
    for packet in payload.get("packets", []):
        try:
            pts = float(packet["pts_time"])
        except Exception:
            continue
        try:
            duration = float(packet.get("duration_time"))
        except Exception:
            duration = None
        results.append((pts, duration))
    return results
def continuity_frame_ids(path):
    """Decode the 16-bit continuity counter burned into the bottom strip of
    each captured video frame.

    ffmpeg crops the bottom 32 rows, scales them to 320x32 gray, and the
    strip is read as a 20-block row: block 0 is a white reference, block 1 a
    black reference, blocks 2-17 carry the counter MSB-first, block 18 a
    parity bit and block 19 its inverse. Frames whose reference blocks are
    washed out / low contrast, or whose parity does not validate, decode to
    None. Returns [] when extraction fails or the raw payload has an
    unexpected size.
    """
    width = 320
    height = 32
    blocks = 20
    try:
        raw = run_bytes(
            [
                "ffmpeg",
                "-hide_banner",
                "-loglevel",
                "error",
                "-i",
                path,
                "-map",
                "0:v:0",
                "-vf",
                f"crop=iw:32:0:ih-32,scale={width}:{height}:flags=area,format=gray",
                "-f",
                "rawvideo",
                "-pix_fmt",
                "gray",
                "-",
            ],
            "ffmpeg video continuity extraction",
        )
    except RuntimeError:
        return []
    frame_pixels = width * height
    if not raw or len(raw) % frame_pixels != 0:
        return []
    block_width = width // blocks
    ids = []
    for offset in range(0, len(raw), frame_pixels):
        frame = raw[offset : offset + frame_pixels]
        averages = []
        for block in range(blocks):
            x_start = block * block_width
            x_end = width if block + 1 == blocks else (block + 1) * block_width
            # Sum whole row segments by slicing instead of per-pixel indexing;
            # values are identical but this avoids a Python-level inner loop
            # over every pixel of every frame.
            total = sum(
                sum(frame[row + x_start : row + x_end])
                for row in range(0, frame_pixels, width)
            )
            count = (x_end - x_start) * height
            averages.append(total / max(1, count))
        white = averages[0]
        black = averages[1]
        # Reject frames whose reference blocks are washed out or low-contrast.
        if white < 150 or black > 105 or white - black < 70:
            ids.append(None)
            continue
        threshold = (white + black) / 2.0
        value = 0
        for block in range(2, 18):
            value = (value << 1) | int(averages[block] > threshold)
        parity = bool(averages[18] > threshold)
        inverse = bool(averages[19] > threshold)
        # bin().count keeps this working on Python < 3.10 hosts
        # (int.bit_count is 3.10+); the popcount parity is unchanged.
        if parity == inverse or parity != bool(bin(value).count("1") & 1):
            ids.append(None)
            continue
        ids.append(value)
    return ids
def sequence_smoothness(ids):
    """Summarize continuity of decoded 16-bit frame counters.

    Neighbor deltas are taken modulo 2**16: a delta of 0 is a duplicate, a
    forward gap accrues (delta - 1) missing frames, and a delta of 32768 or
    more counts as a sequence regression (wrap/backwards jump).
    """
    decoded = [frame_id for frame_id in ids if frame_id is not None]
    duplicates = 0
    missing = 0
    regressions = 0
    jumps = []
    for left, right in zip(decoded, decoded[1:]):
        delta = (right - left) & 0xFFFF
        jumps.append(delta)
        if delta == 0:
            duplicates += 1
        elif delta >= 32768:
            regressions += 1
        elif delta > 1:
            missing += delta - 1
    return {
        "decoded_frames": len(decoded),
        "undecodable_frames": len(ids) - len(decoded),
        "unique_frames": len(set(decoded)),
        "duplicate_frames": duplicates,
        "estimated_missing_frames": missing,
        "sequence_regressions": regressions,
        "largest_forward_jump": max(jumps) if jumps else 0,
    }
def interval_smoothness(timestamps, expected_ms, window_start_s=None, window_end_s=None):
    """Cadence summary for a monotonic timestamp series (seconds).

    When both window bounds are given, only timestamps inside the window are
    considered. ``expected_ms`` is the nominal inter-sample spacing; when
    missing or non-positive it is estimated as the median observed interval.
    """
    if window_start_s is not None and window_end_s is not None:
        timestamps = [ts for ts in timestamps if window_start_s <= ts <= window_end_s]
    # Only strictly-increasing neighbor pairs form intervals; equal or
    # backwards timestamps contribute nothing.
    intervals = [
        (right - left) * 1000.0
        for left, right in zip(timestamps, timestamps[1:])
        if right > left
    ]
    if (expected_ms is None or expected_ms <= 0.0) and intervals:
        expected_ms = statistics.median(intervals)
    jitter = [abs(value - expected_ms) for value in intervals] if expected_ms else []
    # An interval above 1.5x the expected spacing is a hiccup; below 0.5x is
    # a suspiciously short interval. With no expected spacing, neither fires.
    hiccup_threshold = expected_ms * 1.5 if expected_ms else float("inf")
    short_threshold = expected_ms * 0.5 if expected_ms else -1.0
    return {
        "timestamps": len(timestamps),
        "expected_interval_ms": expected_ms,
        "interval_stats": numeric_stats(intervals, "_interval_ms"),
        "jitter_stats": numeric_stats(jitter, "_jitter_ms"),
        "hiccup_count": sum(1 for value in intervals if value > hiccup_threshold),
        "short_interval_count": sum(1 for value in intervals if value < short_threshold),
        # Each oversized interval implies round(interval/expected) - 1 dropped samples.
        "estimated_missing_by_timestamp": sum(
            max(0, round(value / expected_ms) - 1)
            for value in intervals
            if expected_ms and value > hiccup_threshold
        ),
    }
def audio_rms_smoothness(path, window_start_s, window_end_s):
    """Windowed RMS continuity check of the captured audio track.

    Decodes the first audio stream to 48 kHz mono s16le, slices the requested
    window, and reports per-10ms RMS statistics plus the count of windows
    below a fixed low-energy threshold (candidate gaps/underruns).
    Returns ``available: False`` with an error string when decoding fails or
    the window is empty.
    """
    sample_rate = 48_000
    window_ms = 10
    samples_per_window = sample_rate * window_ms // 1000
    try:
        raw = run_bytes(
            [
                "ffmpeg",
                "-hide_banner",
                "-loglevel",
                "error",
                "-i",
                path,
                "-map",
                "0:a:0",
                "-ac",
                "1",
                "-ar",
                str(sample_rate),
                "-f",
                "s16le",
                "-acodec",
                "pcm_s16le",
                "-",
            ],
            "ffmpeg audio continuity extraction",
        )
    except RuntimeError as error:
        return {"available": False, "error": str(error)}
    # Two bytes per s16 sample; clamp the requested window to the decoded span.
    start_sample = max(0, int(window_start_s * sample_rate))
    end_sample = min(len(raw) // 2, int(window_end_s * sample_rate))
    if end_sample <= start_sample:
        return {"available": False, "error": "empty audio smoothness window"}
    sample_count = end_sample - start_sample
    samples = struct.unpack(
        f"<{sample_count}h",
        raw[start_sample * 2 : end_sample * 2],
    )
    rms_values = []
    # Non-overlapping 10 ms windows; a trailing partial window is dropped.
    for index in range(0, len(samples) - samples_per_window + 1, samples_per_window):
        chunk = samples[index : index + samples_per_window]
        rms_values.append(math.sqrt(sum(value * value for value in chunk) / len(chunk)))
    # NOTE(review): empirical floor — the probe tone presumably sits well
    # above 90 RMS while dropouts fall below; confirm against probe levels.
    low_threshold = 90.0
    return {
        "available": bool(rms_values),
        "window_ms": window_ms,
        "rms_stats": numeric_stats(rms_values, "_rms"),
        "low_rms_window_count": sum(1 for value in rms_values if value < low_threshold),
        "low_rms_threshold": low_threshold,
    }
def analyze_smoothness(path, report, timeline):
    """Build the capture smoothness summary for the recorded probe window.

    Combines ffprobe cadence checks, the burned-in frame-counter continuity,
    and audio RMS continuity, restricted (when paired events exist) to the
    probe span inferred from the analyzer report and server timeline.
    """
    camera_fps = as_float(timeline.get("camera_fps"), 0.0)
    duration_s = as_float(timeline.get("duration_us"), 0.0) / 1_000_000.0
    warmup_s = as_float(timeline.get("warmup_us"), 0.0) / 1_000_000.0
    # Paired audio/video observations pin the analysis window: start at the
    # earliest observation minus warmup, then run for the probe duration.
    paired_times = [
        value
        for event in report.get("paired_events", [])
        for value in [finite(event.get("video_time_s")), finite(event.get("audio_time_s"))]
        if value is not None
    ]
    if paired_times:
        window_start_s = max(0.0, min(paired_times) - warmup_s)
        window_end_s = window_start_s + duration_s
    else:
        window_start_s = None
        window_end_s = None
    video_timestamps = ffprobe_frame_timestamps(path)
    audio_packets = ffprobe_packet_times(path, "a:0")
    audio_packet_timestamps = [pts for pts, _duration in audio_packets]
    # First/last/span per track; a positive audio_video_span_gap_s means the
    # captured audio stream is shorter than the video stream.
    media_timeline = {
        "video_first_s": min(video_timestamps) if video_timestamps else None,
        "video_last_s": max(video_timestamps) if video_timestamps else None,
        "audio_first_s": min(audio_packet_timestamps) if audio_packet_timestamps else None,
        "audio_last_s": max(audio_packet_timestamps) if audio_packet_timestamps else None,
    }
    media_timeline["video_span_s"] = (
        media_timeline["video_last_s"] - media_timeline["video_first_s"]
        if media_timeline["video_first_s"] is not None and media_timeline["video_last_s"] is not None
        else None
    )
    media_timeline["audio_span_s"] = (
        media_timeline["audio_last_s"] - media_timeline["audio_first_s"]
        if media_timeline["audio_first_s"] is not None and media_timeline["audio_last_s"] is not None
        else None
    )
    media_timeline["audio_video_span_gap_s"] = (
        media_timeline["video_span_s"] - media_timeline["audio_span_s"]
        if media_timeline["video_span_s"] is not None and media_timeline["audio_span_s"] is not None
        else None
    )
    video_ids = continuity_frame_ids(path)
    # Trim continuity IDs to the analysis window only when they align 1:1
    # with the ffprobe timestamps; otherwise keep the full sequence.
    if window_start_s is not None and window_end_s is not None and len(video_ids) == len(video_timestamps):
        video_ids = [
            frame_id
            for frame_id, timestamp in zip(video_ids, video_timestamps)
            if window_start_s <= timestamp <= window_end_s
        ]
    expected_video_ms = 1000.0 / camera_fps if camera_fps > 0.0 else None
    # Expected audio packet spacing = median of the reported packet durations.
    audio_durations = [
        duration * 1000.0
        for _pts, duration in audio_packets
        if duration is not None and math.isfinite(duration) and duration > 0.0
    ]
    expected_audio_ms = statistics.median(audio_durations) if audio_durations else None
    audio_window_end = window_end_s if window_end_s is not None else max(audio_packet_timestamps or [0.0])
    return {
        "schema": "lesavka.output-smoothness-summary.v1",
        "scope": "captured RC target media cadence and continuity over the server-generated probe window",
        "window_start_s": window_start_s,
        "window_end_s": window_end_s,
        "media_timeline": media_timeline,
        "video": {
            **interval_smoothness(video_timestamps, expected_video_ms, window_start_s, window_end_s),
            **sequence_smoothness(video_ids),
        },
        "audio": {
            "packet_cadence": interval_smoothness(
                audio_packet_timestamps,
                expected_audio_ms,
                window_start_s,
                window_end_s,
            ),
            "rms_continuity": audio_rms_smoothness(path, window_start_s or 0.0, audio_window_end),
        },
    }
capture_start_unix_ns = parse_capture_start_unix_ns(capture_log_path)
clock_alignment = load_json_or_empty(clock_alignment_path)
clock_alignment_available = bool(clock_alignment.get("available"))
theia_to_tethys_offset_ns = (
as_int_or_none(clock_alignment.get("theia_to_tethys_offset_ns"))
if clock_alignment_available
else None
)
clock_uncertainty_ms = as_float(clock_alignment.get("uncertainty_ms"), 0.0)
audio_delay_ms = as_float(timeline.get("audio_delay_us"), 0.0) / 1000.0
video_delay_ms = as_float(timeline.get("video_delay_us"), 0.0) / 1000.0
def corrected_server_capture_time_s(server, key):
    """Translate a server-side unix-ns field onto the capture timeline (seconds).

    Requires the cross-host clock offset and the capture start time; returns
    None when either is unknown or the field is missing/unparsable.
    """
    server_unix_ns = as_int_or_none(server.get(key))
    if server_unix_ns is None:
        return None
    if theia_to_tethys_offset_ns is None or capture_start_unix_ns is None:
        return None
    adjusted_ns = server_unix_ns + theia_to_tethys_offset_ns - capture_start_unix_ns
    return adjusted_ns / 1_000_000_000.0
def corrected_capture_time_s_from_unix_ns(server_unix_ns):
    """Map a raw server unix-ns value onto the capture timeline in seconds.

    Returns None when the value or either clock reference is unavailable.
    """
    if server_unix_ns is None:
        return None
    if theia_to_tethys_offset_ns is None or capture_start_unix_ns is None:
        return None
    adjusted_ns = server_unix_ns + theia_to_tethys_offset_ns - capture_start_unix_ns
    return adjusted_ns / 1_000_000_000.0
def shifted_unix_ns(unix_ns, delta_ms):
    """Offset a unix-ns timestamp by ``delta_ms`` (rounded to whole ns); None passes through."""
    if unix_ns is None:
        return None
    delta_ns = int(round(delta_ms * 1_000_000.0))
    return unix_ns + delta_ns
def nearest_server_event_for_observation(observed, used_event_ids):
    """Fallback match: pick the unused server event whose corrected feed times
    lie closest to the observed audio/video times.

    The score is the mean absolute difference (seconds) over whichever of the
    audio/video tracks both sides provide. Returns (event, method, score_ms)
    or (None, None, None) when nothing can be scored.
    """
    observed_video_s = finite(observed.get("video_time_s"))
    observed_audio_s = finite(observed.get("audio_time_s"))
    if observed_video_s is None and observed_audio_s is None:
        return None, None, None
    best = None
    for server in server_event_list:
        event_id = as_int_or_none(server.get("event_id"))
        # Each server event may be claimed by at most one observation.
        if event_id in used_event_ids:
            continue
        expected_video_s = corrected_server_capture_time_s(server, "video_feed_unix_ns")
        expected_audio_s = corrected_server_capture_time_s(server, "audio_push_unix_ns")
        # Accumulate |observed - expected| over the tracks present on both sides.
        score = 0.0
        parts = 0
        if observed_video_s is not None and expected_video_s is not None:
            score += abs(observed_video_s - expected_video_s)
            parts += 1
        if observed_audio_s is not None and expected_audio_s is not None:
            score += abs(observed_audio_s - expected_audio_s)
            parts += 1
        if parts == 0:
            continue
        normalized_score = score / parts
        if best is None or normalized_score < best[0]:
            best = (normalized_score, event_id, server)
    if best is None:
        return None, None, None
    # Score is computed in seconds; report it to callers in milliseconds.
    return best[2], "nearest_clocked_time", best[0] * 1000.0
joined = []
used_server_event_ids = set()
for observed in report.get("paired_events", []):
observed_event_id = as_int_or_none(observed.get("event_id"))
observed_server_event_id = as_int_or_none(observed.get("server_event_id"))
observed_event_code = as_int_or_none(observed.get("event_code"))
server = None
match_method = None
match_score_ms = None
if observed_server_event_id is not None:
server = server_events.get(observed_server_event_id)
match_method = "analyzer_server_event_id" if server else None
if server is None and observed_event_code is not None:
server = unique_server_events_by_code.get(observed_event_code)
match_method = "unique_event_code" if server else None
if server is None:
server, match_method, match_score_ms = nearest_server_event_for_observation(
observed,
used_server_event_ids,
)
if server is None and observed_event_code is None and observed_event_id is not None:
server = server_events.get(observed_event_id)
match_method = "legacy_pair_event_id" if server else None
if not server:
continue
event_id = int(server.get("event_id", -1))
used_server_event_ids.add(event_id)
observed_skew_ms = finite(observed.get("skew_ms"))
server_feed_delta_ms = finite(server.get("server_feed_delta_ms"))
tethys_video_time_s = finite(observed.get("video_time_s"))
tethys_audio_time_s = finite(observed.get("audio_time_s"))
server_video_feed_s = finite(server.get("video_feed_monotonic_us"))
server_audio_push_s = finite(server.get("audio_push_monotonic_us"))
if server_video_feed_s is not None:
server_video_feed_s /= 1_000_000.0
if server_audio_push_s is not None:
server_audio_push_s /= 1_000_000.0
server_video_feed_unix_ns = as_int_or_none(server.get("video_feed_unix_ns"))
server_audio_push_unix_ns = as_int_or_none(server.get("audio_push_unix_ns"))
server_video_pipeline_unix_ns = shifted_unix_ns(
server_video_feed_unix_ns,
-video_delay_ms,
)
server_audio_pipeline_unix_ns = shifted_unix_ns(
server_audio_push_unix_ns,
-audio_delay_ms,
)
tethys_video_unix_ns = (
capture_start_unix_ns + int(round(tethys_video_time_s * 1_000_000_000.0))
if capture_start_unix_ns is not None and tethys_video_time_s is not None
else None
)
tethys_audio_unix_ns = (
capture_start_unix_ns + int(round(tethys_audio_time_s * 1_000_000_000.0))
if capture_start_unix_ns is not None and tethys_audio_time_s is not None
else None
)
corrected_server_video_feed_unix_ns = (
server_video_feed_unix_ns + theia_to_tethys_offset_ns
if server_video_feed_unix_ns is not None and theia_to_tethys_offset_ns is not None
else None
)
corrected_server_audio_push_unix_ns = (
server_audio_push_unix_ns + theia_to_tethys_offset_ns
if server_audio_push_unix_ns is not None and theia_to_tethys_offset_ns is not None
else None
)
corrected_server_video_pipeline_unix_ns = (
server_video_pipeline_unix_ns + theia_to_tethys_offset_ns
if server_video_pipeline_unix_ns is not None and theia_to_tethys_offset_ns is not None
else None
)
corrected_server_audio_pipeline_unix_ns = (
server_audio_pipeline_unix_ns + theia_to_tethys_offset_ns
if server_audio_pipeline_unix_ns is not None and theia_to_tethys_offset_ns is not None
else None
)
video_sink_handoff_overhead_ms = (
(tethys_video_unix_ns - corrected_server_video_feed_unix_ns) / 1_000_000.0
if tethys_video_unix_ns is not None and corrected_server_video_feed_unix_ns is not None
else None
)
audio_sink_handoff_overhead_ms = (
(tethys_audio_unix_ns - corrected_server_audio_push_unix_ns) / 1_000_000.0
if tethys_audio_unix_ns is not None and corrected_server_audio_push_unix_ns is not None
else None
)
video_freshness_ms = (
(tethys_video_unix_ns - corrected_server_video_pipeline_unix_ns) / 1_000_000.0
if tethys_video_unix_ns is not None and corrected_server_video_pipeline_unix_ns is not None
else None
)
audio_freshness_ms = (
(tethys_audio_unix_ns - corrected_server_audio_pipeline_unix_ns) / 1_000_000.0
if tethys_audio_unix_ns is not None and corrected_server_audio_pipeline_unix_ns is not None
else None
)
video_event_age_ms = video_freshness_ms
audio_event_age_ms = audio_freshness_ms
residual_path_skew_ms = (
observed_skew_ms - server_feed_delta_ms
if observed_skew_ms is not None and server_feed_delta_ms is not None
else None
)
planned_start_us = int(server.get("planned_start_us", 0))
joined.append({
"event_id": event_id,
"observed_event_id": observed_event_id,
"observed_server_event_id": observed_server_event_id,
"observed_event_code": observed_event_code,
"match_method": match_method,
"match_score_ms": match_score_ms,
"code": int(server.get("code", 0)),
"event_time_s": planned_start_us / 1_000_000.0,
"planned_start_us": planned_start_us,
"planned_end_us": int(server.get("planned_end_us", 0)),
"tethys_video_time_s": tethys_video_time_s,
"tethys_audio_time_s": tethys_audio_time_s,
"observed_skew_ms": observed_skew_ms,
"server_video_feed_monotonic_us": server.get("video_feed_monotonic_us"),
"server_audio_push_monotonic_us": server.get("audio_push_monotonic_us"),
"server_video_feed_unix_ns": server_video_feed_unix_ns,
"server_audio_push_unix_ns": server_audio_push_unix_ns,
"server_video_pipeline_unix_ns": server_video_pipeline_unix_ns,
"server_audio_pipeline_unix_ns": server_audio_pipeline_unix_ns,
"tethys_video_unix_ns": tethys_video_unix_ns,
"tethys_audio_unix_ns": tethys_audio_unix_ns,
"server_video_feed_s": server_video_feed_s,
"server_audio_push_s": server_audio_push_s,
"video_freshness_ms": video_freshness_ms,
"audio_freshness_ms": audio_freshness_ms,
"video_sink_handoff_overhead_ms": video_sink_handoff_overhead_ms,
"audio_sink_handoff_overhead_ms": audio_sink_handoff_overhead_ms,
"video_event_age_ms": video_event_age_ms,
"audio_event_age_ms": audio_event_age_ms,
"server_feed_delta_ms": server_feed_delta_ms,
"residual_path_skew_ms": residual_path_skew_ms,
"confidence": finite(observed.get("confidence")),
})
first_event_time_s = joined[0]["event_time_s"] if joined else 0.0
for row in joined:
row["relative_event_time_s"] = row["event_time_s"] - first_event_time_s
row["event_time_s"] = row["relative_event_time_s"]
observed_model = fit_linear(joined, "observed_skew_ms")
server_model = fit_linear(joined, "server_feed_delta_ms")
residual_model = fit_linear(joined, "residual_path_skew_ms")
video_freshness_model = fit_linear(joined, "video_freshness_ms")
audio_freshness_model = fit_linear(joined, "audio_freshness_ms")
video_sink_handoff_model = fit_linear(joined, "video_sink_handoff_overhead_ms")
audio_sink_handoff_model = fit_linear(joined, "audio_sink_handoff_overhead_ms")
video_event_age_model = fit_linear(joined, "video_event_age_ms")
audio_event_age_model = fit_linear(joined, "audio_event_age_ms")
video_freshness_stats = stats(joined, "video_freshness_ms")
audio_freshness_stats = stats(joined, "audio_freshness_ms")
video_sink_handoff_stats = stats(joined, "video_sink_handoff_overhead_ms")
audio_sink_handoff_stats = stats(joined, "audio_sink_handoff_overhead_ms")
video_event_age_stats = dict(video_freshness_stats)
audio_event_age_stats = dict(audio_freshness_stats)
video_event_age_stats["intentional_delay_ms"] = video_delay_ms
audio_event_age_stats["intentional_delay_ms"] = audio_delay_ms
server_observed_correlation = correlation(joined, "server_feed_delta_ms", "observed_skew_ms")
sync_verdict = report.get("verdict") or {}
sync_passed = sync_verdict.get("passed") is True
observed_drift = observed_model.get("drift_ms", 0.0)
server_drift = server_model.get("drift_ms", 0.0)
residual_drift = residual_model.get("drift_ms", 0.0)
same_direction = observed_drift == 0.0 or (observed_drift > 0) == (server_drift > 0)
server_share = 0.0 if abs(observed_drift) < 1e-6 else abs(server_drift) / abs(observed_drift)
if same_direction and server_share >= 0.5 and abs(server_drift) >= 20.0:
dominant_layer = "server_feed_timing"
else:
dominant_layer = "post_server_output_or_tethys_capture"
correction_mode = (
"linear_function_candidate"
if abs(residual_drift) >= 20.0
else "scalar_candidate"
)
max_freshness_age_ms = max(1.0, as_float(max_freshness_age_raw, 1000.0))
max_freshness_drift_ms = max(0.0, as_float(max_freshness_drift_raw, 100.0))
max_clock_uncertainty_ms = max(0.0, as_float(max_clock_uncertainty_raw, 100.0))
freshness_p95_values = [
value
for value in [
video_freshness_stats.get("p95_ms"),
audio_freshness_stats.get("p95_ms"),
]
if value is not None
]
event_age_p95_values = [
value
for value in [
video_event_age_stats.get("p95_ms"),
audio_event_age_stats.get("p95_ms"),
]
if value is not None
]
freshness_drift_values = [
abs(value)
for value in [
video_event_age_model.get("drift_ms"),
audio_event_age_model.get("drift_ms"),
]
if value is not None and math.isfinite(value)
]
freshness_worst_p95_ms = max(freshness_p95_values) if freshness_p95_values else None
freshness_worst_event_p95_ms = max(event_age_p95_values) if event_age_p95_values else None
freshness_worst_drift_ms = max(freshness_drift_values) if freshness_drift_values else None
freshness_worst_event_with_uncertainty_ms = (
freshness_worst_event_p95_ms + clock_uncertainty_ms
if freshness_worst_event_p95_ms is not None
else None
)
event_age_min_values = [
value
for value in [
video_event_age_stats.get("min_ms"),
audio_event_age_stats.get("min_ms"),
]
if value is not None and math.isfinite(value)
]
freshness_min_event_age_ms = min(event_age_min_values) if event_age_min_values else None
smoothness = analyze_smoothness(capture_path, report, timeline)
media_timeline = smoothness.get("media_timeline") or {}
def negative_rows(rows, key, limit_ms):
    """Return the rows whose `key` value is finite and below -limit_ms."""
    selected = []
    for row in rows:
        value = row.get(key)
        if value is None:
            continue
        if not math.isfinite(value):
            continue
        if value < -limit_ms:
            selected.append(row)
    return selected
video_negative_rows = negative_rows(joined, "video_freshness_ms", clock_uncertainty_ms)
audio_negative_rows = negative_rows(joined, "audio_freshness_ms", clock_uncertainty_ms)
capture_timebase_status = "valid"
capture_timebase_reason = "Tethys observation timestamps are monotonic against server pipeline references"
capture_timebase_reasons = []
if not joined:
capture_timebase_status = "unknown"
capture_timebase_reason = "no joined coded events are available"
elif not clock_alignment_available:
capture_timebase_status = "unknown"
capture_timebase_reason = "server/capture clock alignment is unavailable"
elif video_negative_rows or audio_negative_rows:
capture_timebase_status = "invalid"
if video_negative_rows:
capture_timebase_reasons.append(
f"video observed before server pipeline injection {len(video_negative_rows)} time(s)"
)
if audio_negative_rows:
capture_timebase_reasons.append(
f"audio observed before server pipeline injection {len(audio_negative_rows)} time(s)"
)
span_gap_s = finite(media_timeline.get("audio_video_span_gap_s"))
if span_gap_s is not None and abs(span_gap_s) > 1.0:
relation = "shorter" if span_gap_s > 0 else "longer"
capture_timebase_status = "invalid"
capture_timebase_reasons.append(
f"captured audio stream is {abs(span_gap_s):.3f}s {relation} than video; "
"the recording cannot place audio events on the video capture timeline"
)
if capture_timebase_reasons:
capture_timebase_reason = "; ".join(capture_timebase_reasons)
server_start_capture_time_s = corrected_capture_time_s_from_unix_ns(
as_int_or_none(timeline.get("server_start_unix_ns"))
)
first_timing_row = joined[0] if joined else {}
first_event_timing = {
"event_id": first_timing_row.get("event_id"),
"code": first_timing_row.get("code"),
"server_audio_pipeline_capture_s": corrected_capture_time_s_from_unix_ns(
first_timing_row.get("server_audio_pipeline_unix_ns")
),
"server_video_pipeline_capture_s": corrected_capture_time_s_from_unix_ns(
first_timing_row.get("server_video_pipeline_unix_ns")
),
"server_audio_sink_push_capture_s": corrected_capture_time_s_from_unix_ns(
first_timing_row.get("server_audio_push_unix_ns")
),
"server_video_sink_feed_capture_s": corrected_capture_time_s_from_unix_ns(
first_timing_row.get("server_video_feed_unix_ns")
),
"tethys_audio_observed_s": first_timing_row.get("tethys_audio_time_s"),
"tethys_video_observed_s": first_timing_row.get("tethys_video_time_s"),
"audio_pipeline_freshness_ms": first_timing_row.get("audio_freshness_ms"),
"video_pipeline_freshness_ms": first_timing_row.get("video_freshness_ms"),
}
timing_map = {
"schema": "lesavka.output-timing-map.v1",
"scope": "capture-relative map of server pipeline reference, sink handoff, and Tethys observation times",
"injection_scope": timeline.get("injection_scope", "server-final-output-handoff"),
"server_pipeline_reference": timeline.get(
"server_pipeline_reference",
"generated media PTS before intentional sync delay",
),
"sink_handoff_path": timeline.get(
"sink_handoff_path",
"video CameraRelay::feed; audio Voice::push",
),
"client_uplink_included": bool(timeline.get("client_uplink_included", False)),
"freshness_clock_starts_at": "server pipeline reference before intentional sync delay",
"capture_start_unix_ns": capture_start_unix_ns,
"server_start_unix_ns": as_int_or_none(timeline.get("server_start_unix_ns")),
"server_start_capture_time_s": server_start_capture_time_s,
"capture_to_server_start_ms": (
server_start_capture_time_s * 1000.0 if server_start_capture_time_s is not None else None
),
"probe_warmup_ms": as_float(timeline.get("warmup_us"), 0.0) / 1000.0,
"probe_duration_ms": as_float(timeline.get("duration_us"), 0.0) / 1000.0,
"intentional_audio_delay_ms": audio_delay_ms,
"intentional_video_delay_ms": video_delay_ms,
"first_event": first_event_timing,
}
if not event_age_p95_values:
freshness_status = "unknown"
freshness_reason = "clock-aligned server feed and Tethys capture timestamps were not available"
elif not clock_alignment_available or clock_uncertainty_ms > max_clock_uncertainty_ms:
freshness_status = "unknown"
freshness_reason = (
f"clock uncertainty {clock_uncertainty_ms:.1f} ms exceeds "
f"{max_clock_uncertainty_ms:.1f} ms freshness measurement limit"
)
elif capture_timebase_status == "invalid":
freshness_status = "invalid"
freshness_reason = f"capture timebase invalid: {capture_timebase_reason}"
elif freshness_min_event_age_ms is not None and freshness_min_event_age_ms < -clock_uncertainty_ms:
freshness_status = "invalid"
freshness_reason = (
f"one media stream has impossible negative pipeline freshness beyond clock uncertainty: "
f"minimum freshness {freshness_min_event_age_ms:.1f} ms, uncertainty "
f"{clock_uncertainty_ms:.1f} ms"
)
elif freshness_worst_event_p95_ms < -clock_uncertainty_ms:
freshness_status = "invalid"
freshness_reason = (
f"freshness was negative beyond clock uncertainty: "
f"worst pipeline p95 {freshness_worst_event_p95_ms:.1f} ms, uncertainty "
f"{clock_uncertainty_ms:.1f} ms"
)
elif not sync_passed:
freshness_status = "unknown"
freshness_reason = (
"sync did not pass, so freshness from paired signatures is not trustworthy: "
f"{sync_verdict.get('status', 'unknown')} - {sync_verdict.get('reason', '')}"
)
elif freshness_worst_event_with_uncertainty_ms <= max_freshness_age_ms and (
freshness_worst_drift_ms is None or freshness_worst_drift_ms <= max_freshness_drift_ms
):
freshness_status = "pass"
freshness_reason = (
f"worst pipeline p95 {freshness_worst_event_p95_ms:.1f} ms + clock uncertainty "
f"{clock_uncertainty_ms:.1f} ms <= {max_freshness_age_ms:.1f} ms and worst freshness drift "
f"{(freshness_worst_drift_ms or 0.0):.1f} ms <= {max_freshness_drift_ms:.1f} ms"
)
else:
freshness_status = "fail"
freshness_reason = (
f"worst pipeline p95 "
f"{freshness_worst_event_p95_ms if freshness_worst_event_p95_ms is not None else 0.0:.1f} ms "
f"+ clock uncertainty {clock_uncertainty_ms:.1f} ms "
f"(limit {max_freshness_age_ms:.1f} ms), worst freshness drift "
f"{freshness_worst_drift_ms if freshness_worst_drift_ms is not None else 0.0:.1f} ms "
f"(limit {max_freshness_drift_ms:.1f} ms)"
)
artifact = {
"schema": "lesavka.output-delay-correlation.v1",
"report_json": report_path,
"server_timeline_json": timeline_path,
"joined_event_count": len(joined),
"audio_after_video_positive": True,
"timing_map": timing_map,
"observed_skew_model": observed_model,
"server_feed_delta_model": server_model,
"residual_path_skew_model": residual_model,
"freshness": {
"schema": "lesavka.output-freshness-summary.v1",
"status": freshness_status,
"reason": freshness_reason,
"scope": "clock-corrected server pipeline reference to Tethys observed playback from the same paired signatures",
"capture_start_unix_ns": capture_start_unix_ns,
"capture_timebase": {
"status": capture_timebase_status,
"reason": capture_timebase_reason,
"valid": capture_timebase_status == "valid",
"video_observed_before_injection_count": len(video_negative_rows),
"audio_observed_before_injection_count": len(audio_negative_rows),
},
"clock_alignment": clock_alignment,
"clock_uncertainty_ms": clock_uncertainty_ms,
"max_age_limit_ms": max_freshness_age_ms,
"max_drift_limit_ms": max_freshness_drift_ms,
"max_clock_uncertainty_ms": max_clock_uncertainty_ms,
"intentional_audio_delay_ms": audio_delay_ms,
"intentional_video_delay_ms": video_delay_ms,
"worst_p95_sink_handoff_overhead_ms": max(
[
value
for value in [
video_sink_handoff_stats.get("p95_ms"),
audio_sink_handoff_stats.get("p95_ms"),
]
if value is not None
],
default=None,
),
"worst_p95_pipeline_freshness_ms": freshness_worst_p95_ms,
"worst_event_age_p95_ms": freshness_worst_event_p95_ms,
"worst_event_age_with_uncertainty_ms": freshness_worst_event_with_uncertainty_ms,
"minimum_event_age_ms": freshness_min_event_age_ms,
"minimum_pipeline_freshness_ms": freshness_min_event_age_ms,
"sync_passed": sync_passed,
"worst_p95_freshness_ms": freshness_worst_event_p95_ms,
"worst_freshness_drift_ms": freshness_worst_drift_ms,
"video_freshness_stats": video_freshness_stats,
"audio_freshness_stats": audio_freshness_stats,
"video_sink_handoff_overhead_stats": video_sink_handoff_stats,
"audio_sink_handoff_overhead_stats": audio_sink_handoff_stats,
"video_event_age_stats": video_event_age_stats,
"audio_event_age_stats": audio_event_age_stats,
"video_freshness_model": video_freshness_model,
"audio_freshness_model": audio_freshness_model,
"video_sink_handoff_overhead_model": video_sink_handoff_model,
"audio_sink_handoff_overhead_model": audio_sink_handoff_model,
"video_event_age_model": video_event_age_model,
"audio_event_age_model": audio_event_age_model,
},
"smoothness": smoothness,
"server_observed_correlation": server_observed_correlation,
"server_drift_share_of_observed": server_share,
"dominant_layer": dominant_layer,
"correction_mode": correction_mode,
"video_delay_function_candidate": {
"units": "microseconds",
"end_to_end": {
"intercept_us": round(observed_model.get("intercept_ms", 0.0) * 1000.0),
"slope_us_per_s": round(observed_model.get("slope_ms_per_s", 0.0) * 1000.0),
"formula": "video_delay_us(t) = intercept_us + slope_us_per_s * seconds_since_first_event",
},
"output_path_only": {
"intercept_us": round(residual_model.get("intercept_ms", 0.0) * 1000.0),
"slope_us_per_s": round(residual_model.get("slope_ms_per_s", 0.0) * 1000.0),
"formula": "video_delay_us(t) = intercept_us + slope_us_per_s * seconds_since_first_event",
},
},
"events": joined,
}
pathlib.Path(output_json_path).write_text(json.dumps(artifact, indent=2, sort_keys=True) + "\n")
with pathlib.Path(output_csv_path).open("w", newline="", encoding="utf-8") as handle:
fieldnames = [
"event_id",
"observed_event_id",
"observed_server_event_id",
"observed_event_code",
"match_method",
"match_score_ms",
"code",
"event_time_s",
"tethys_video_time_s",
"tethys_audio_time_s",
"observed_skew_ms",
"server_video_feed_s",
"server_audio_push_s",
"server_video_feed_unix_ns",
"server_audio_push_unix_ns",
"server_video_pipeline_unix_ns",
"server_audio_pipeline_unix_ns",
"tethys_video_unix_ns",
"tethys_audio_unix_ns",
"video_freshness_ms",
"audio_freshness_ms",
"video_sink_handoff_overhead_ms",
"audio_sink_handoff_overhead_ms",
"video_event_age_ms",
"audio_event_age_ms",
"server_feed_delta_ms",
"residual_path_skew_ms",
"server_video_feed_monotonic_us",
"server_audio_push_monotonic_us",
"confidence",
]
writer = csv.DictWriter(handle, fieldnames=fieldnames)
writer.writeheader()
for row in joined:
writer.writerow({key: row.get(key) for key in fieldnames})
def fmt_number(value, unit="", precision=1):
    """Format `value` as fixed-point with `unit` appended; 'n/a' when missing,
    unparseable, or non-finite."""
    if value is None:
        return "n/a"
    try:
        number = float(value)
    except Exception:
        return "n/a"
    if math.isfinite(number):
        return f"{number:.{precision}f}{unit}"
    return "n/a"
lines = [
f"Output-delay correlation for {report_path}",
f"- joined events: {len(joined)}",
f"- dominant layer: {dominant_layer}",
f"- correction mode: {correction_mode}",
f"- observed skew model: {observed_model.get('intercept_ms', 0.0):+.3f} ms + {observed_model.get('slope_ms_per_s', 0.0):+.3f} ms/s * t",
f"- server feed model: {server_model.get('intercept_ms', 0.0):+.3f} ms + {server_model.get('slope_ms_per_s', 0.0):+.3f} ms/s * t",
f"- residual path model: {residual_model.get('intercept_ms', 0.0):+.3f} ms + {residual_model.get('slope_ms_per_s', 0.0):+.3f} ms/s * t",
f"- server/observed correlation: {server_observed_correlation:+.3f}",
f"- server drift share of observed: {server_share:.3f}",
"",
"Timing map",
"- scope: capture-relative server pipeline reference, sink handoff, and Tethys observation",
f"- injection point: {timing_map.get('injection_scope')} ({timing_map.get('sink_handoff_path')}); client uplink included={timing_map.get('client_uplink_included')}",
f"- freshness clock starts at: {timing_map.get('freshness_clock_starts_at')}",
f"- capture to server probe start: {fmt_number(timing_map.get('capture_to_server_start_ms'), ' ms')} (setup/pre-roll, not media latency)",
f"- probe warmup before first coded event: {fmt_number(timing_map.get('probe_warmup_ms'), ' ms')}",
f"- first event server pipeline refs: audio {fmt_number(first_event_timing.get('server_audio_pipeline_capture_s'), 's', 3)}, video {fmt_number(first_event_timing.get('server_video_pipeline_capture_s'), 's', 3)}",
f"- first event sink handoff: audio {fmt_number(first_event_timing.get('server_audio_sink_push_capture_s'), 's', 3)}, video {fmt_number(first_event_timing.get('server_video_sink_feed_capture_s'), 's', 3)}",
f"- first event Tethys observation: audio {fmt_number(first_event_timing.get('tethys_audio_observed_s'), 's', 3)}, video {fmt_number(first_event_timing.get('tethys_video_observed_s'), 's', 3)}",
f"- first event pipeline freshness: audio {fmt_number(first_event_timing.get('audio_pipeline_freshness_ms'), ' ms')}, video {fmt_number(first_event_timing.get('video_pipeline_freshness_ms'), ' ms')}",
"",
f"Output freshness for {report_path}",
"- scope: clock-corrected server pipeline reference to Tethys observed playback from the same paired signatures",
f"- freshness status: {freshness_status} ({freshness_reason})",
f"- capture timebase: {capture_timebase_status} ({capture_timebase_reason})",
f"- clock uncertainty: +/-{clock_uncertainty_ms:.1f} ms",
f"- intentional sync delays: audio {audio_delay_ms:+.1f} ms, video {video_delay_ms:+.1f} ms",
f"- sink handoff overhead: video median {fmt_number(video_sink_handoff_stats.get('median_ms'), ' ms')} / p95 {fmt_number(video_sink_handoff_stats.get('p95_ms'), ' ms')} / max {fmt_number(video_sink_handoff_stats.get('max_ms'), ' ms')}; audio median {fmt_number(audio_sink_handoff_stats.get('median_ms'), ' ms')} / p95 {fmt_number(audio_sink_handoff_stats.get('p95_ms'), ' ms')} / max {fmt_number(audio_sink_handoff_stats.get('max_ms'), ' ms')}",
f"- pipeline freshness: video median {fmt_number(video_freshness_stats.get('median_ms'), ' ms')} / p95 {fmt_number(video_freshness_stats.get('p95_ms'), ' ms')} / max {fmt_number(video_freshness_stats.get('max_ms'), ' ms')}; audio median {fmt_number(audio_freshness_stats.get('median_ms'), ' ms')} / p95 {fmt_number(audio_freshness_stats.get('p95_ms'), ' ms')} / max {fmt_number(audio_freshness_stats.get('max_ms'), ' ms')}",
f"- freshness budget: worst pipeline p95 {fmt_number(freshness_worst_event_p95_ms, ' ms')} + clock uncertainty {clock_uncertainty_ms:.1f} ms = {fmt_number(freshness_worst_event_with_uncertainty_ms, ' ms')} vs limit {max_freshness_age_ms:.1f} ms",
f"- video freshness drift: {video_event_age_model.get('drift_ms', 0.0):+.1f} ms over paired events ({video_event_age_model.get('slope_ms_per_s', 0.0):+.3f} ms/s)",
f"- audio freshness drift: {audio_event_age_model.get('drift_ms', 0.0):+.1f} ms over paired events ({audio_event_age_model.get('slope_ms_per_s', 0.0):+.3f} ms/s)",
"",
"Output smoothness",
f"- window: {smoothness.get('window_start_s') or 0.0:.3f}s to {smoothness.get('window_end_s') or 0.0:.3f}s",
f"- capture media timeline: video span {fmt_number(media_timeline.get('video_span_s'), 's', 3)}, audio span {fmt_number(media_timeline.get('audio_span_s'), 's', 3)}, audio/video span gap {fmt_number(media_timeline.get('audio_video_span_gap_s'), 's', 3)}",
f"- video cadence: frames {smoothness.get('video', {}).get('timestamps', 0)}, expected interval {(smoothness.get('video', {}).get('expected_interval_ms') or 0.0):.1f} ms, p95 jitter {(smoothness.get('video', {}).get('jitter_stats', {}).get('p95_jitter_ms') or 0.0):.1f} ms, max interval {(smoothness.get('video', {}).get('interval_stats', {}).get('max_interval_ms') or 0.0):.1f} ms, hiccups {smoothness.get('video', {}).get('hiccup_count', 0)}",
f"- video continuity: decoded {smoothness.get('video', {}).get('decoded_frames', 0)}, duplicates {smoothness.get('video', {}).get('duplicate_frames', 0)}, estimated missing {smoothness.get('video', {}).get('estimated_missing_frames', 0)}, undecodable {smoothness.get('video', {}).get('undecodable_frames', 0)}",
f"- audio packet cadence: packets {smoothness.get('audio', {}).get('packet_cadence', {}).get('timestamps', 0)}, p95 jitter {(smoothness.get('audio', {}).get('packet_cadence', {}).get('jitter_stats', {}).get('p95_jitter_ms') or 0.0):.1f} ms, max interval {(smoothness.get('audio', {}).get('packet_cadence', {}).get('interval_stats', {}).get('max_interval_ms') or 0.0):.1f} ms, hiccups {smoothness.get('audio', {}).get('packet_cadence', {}).get('hiccup_count', 0)}",
f"- audio pilot continuity: low-RMS windows {smoothness.get('audio', {}).get('rms_continuity', {}).get('low_rms_window_count', 0)}, median RMS {(smoothness.get('audio', {}).get('rms_continuity', {}).get('rms_stats', {}).get('median_rms') or 0.0):.1f}",
]
summary = "\n".join(lines) + "\n"
pathlib.Path(output_txt_path).write_text(summary)
print(summary, end="")
PY
}
# Apply the measured UVC/UAC output-delay calibration to the running server.
# Sources the decision variables written by the analysis step into
# ${LOCAL_OUTPUT_DELAY_ENV} (output_delay_*), logs the full decision, and —
# when the measurement is ready and applying is enabled — pushes audio/video
# delay deltas to the server via lesavka-relayctl.
# No-op when calibration is disabled or the decision env file is absent.
# NOTE(review): the sourced output_delay_* variables stay set in this shell
# and are read later by maybe_run_output_delay_confirmation — confirm call
# order if refactoring.
maybe_apply_output_delay_calibration() {
  [[ "${LESAVKA_OUTPUT_DELAY_CALIBRATION}" != "0" ]] || return 0
  [[ -f "${LOCAL_OUTPUT_DELAY_ENV}" ]] || return 0
  # shellcheck disable=SC1090
  source "${LOCAL_OUTPUT_DELAY_ENV}"
  echo "==> UVC/UAC output-delay calibration decision"
  echo " ↪ output_delay_calibration_json=${LOCAL_OUTPUT_DELAY_JSON}"
  echo " ↪ output_delay_ready=${output_delay_ready:-false}"
  echo " ↪ output_delay_decision=${output_delay_decision:-unknown}"
  echo " ↪ output_delay_target=${output_delay_target:-unknown}"
  echo " ↪ output_delay_paired_event_count=${output_delay_paired_event_count:-0}"
  echo " ↪ output_delay_measured_skew_ms=${output_delay_measured_skew_ms:-0.0}"
  echo " ↪ output_delay_drift_ms=${output_delay_drift_ms:-0.0}"
  echo " ↪ output_delay_audio_delta_us=${output_delay_audio_delta_us:-0}"
  echo " ↪ output_delay_video_delta_us=${output_delay_video_delta_us:-0}"
  echo " ↪ output_delay_active_audio_offset_us=${output_delay_active_audio_offset_us:-0}"
  echo " ↪ output_delay_active_video_offset_us=${output_delay_active_video_offset_us:-0}"
  echo " ↪ output_delay_audio_target_offset_us=${output_delay_audio_target_offset_us:-0}"
  echo " ↪ output_delay_video_target_offset_us=${output_delay_video_target_offset_us:-0}"
  echo " ↪ output_delay_apply_mode=${output_delay_apply_mode:-${LESAVKA_OUTPUT_DELAY_APPLY_MODE}}"
  echo " ↪ output_delay_note=${output_delay_note:-}"
  # Refuse to apply an unready measurement; still return 0 so the run proceeds.
  if [[ "${output_delay_ready:-false}" != "true" ]]; then
    echo " ↪ output delay calibration apply refused: ${output_delay_note:-not ready}"
    return 0
  fi
  if [[ "${LESAVKA_OUTPUT_DELAY_APPLY}" == "0" ]]; then
    echo " ↪ output delay calibration apply disabled"
    return 0
  fi
  local apply_mode="${output_delay_apply_mode:-${LESAVKA_OUTPUT_DELAY_APPLY_MODE}}"
  local apply_audio_delta="${output_delay_audio_delta_us:-0}"
  local apply_video_delta="${output_delay_video_delta_us:-0}"
  if [[ "${apply_mode}" == "absolute" ]]; then
    # Absolute mode: query the server's currently active offsets and convert
    # the absolute targets into relative deltas for the `calibrate` command.
    local calibration_output current_audio_offset_us current_video_offset_us
    if ! calibration_output="$(
      LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \
      "${REPO_ROOT}/target/debug/lesavka-relayctl" \
        --server "${RESOLVED_LESAVKA_SERVER_ADDR}" \
        calibration 2>&1
    )"; then
      echo " ↪ output delay calibration apply refused: current calibration query failed"
      echo "${calibration_output}" >&2
      return 0
    fi
    current_audio_offset_us="$(awk -F= '/^calibration_active_audio_offset_us=/{print $2; exit}' <<<"${calibration_output}")"
    current_video_offset_us="$(awk -F= '/^calibration_active_video_offset_us=/{print $2; exit}' <<<"${calibration_output}")"
    # Both offsets must parse as (possibly negative) integers before arithmetic.
    if [[ ! "${current_audio_offset_us}" =~ ^-?[0-9]+$ || ! "${current_video_offset_us}" =~ ^-?[0-9]+$ ]]; then
      echo " ↪ output delay calibration apply refused: could not parse active calibration offsets"
      echo "${calibration_output}" >&2
      return 0
    fi
    apply_audio_delta=$(( ${output_delay_audio_target_offset_us:-0} - current_audio_offset_us ))
    apply_video_delta=$(( ${output_delay_video_target_offset_us:-0} - current_video_offset_us ))
    echo " ↪ current_active_audio_offset_us=${current_audio_offset_us}"
    echo " ↪ current_active_video_offset_us=${current_video_offset_us}"
    echo " ↪ absolute_target_audio_offset_us=${output_delay_audio_target_offset_us:-0}"
    echo " ↪ absolute_target_video_offset_us=${output_delay_video_target_offset_us:-0}"
  elif [[ "${apply_mode}" != "relative" ]]; then
    echo " ↪ output delay calibration apply refused: unsupported apply mode ${apply_mode}"
    return 0
  fi
  echo " ↪ calibration_apply_audio_delta_us=${apply_audio_delta}"
  echo " ↪ calibration_apply_video_delta_us=${apply_video_delta}"
  echo "==> applying measured UVC/UAC output-delay calibration"
  LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \
  "${REPO_ROOT}/target/debug/lesavka-relayctl" \
    --server "${RESOLVED_LESAVKA_SERVER_ADDR}" \
    calibrate \
    "${apply_audio_delta}" \
    "${apply_video_delta}" \
    "${output_delay_note:-direct UVC/UAC output-delay calibration}"
  # Optionally persist the just-applied calibration as the server default.
  if [[ "${LESAVKA_OUTPUT_DELAY_SAVE}" != "0" ]]; then
    echo "==> saving measured UVC/UAC output-delay calibration as default"
    LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \
    "${REPO_ROOT}/target/debug/lesavka-relayctl" \
      --server "${RESOLVED_LESAVKA_SERVER_ADDR}" \
      calibration-save-default
  fi
}
# Re-run this script once with the calibrated offsets pinned as the probe
# delays, requiring a sync pass, to confirm the applied calibration holds.
# Relies on output_delay_* variables sourced earlier in this shell by
# maybe_apply_output_delay_calibration — NOTE(review): call order matters;
# confirm the apply step runs before this one.
maybe_run_output_delay_confirmation() {
  [[ "${LESAVKA_OUTPUT_DELAY_CONFIRM}" != "0" ]] || return 0
  [[ "${LESAVKA_OUTPUT_DELAY_APPLY}" != "0" ]] || return 0
  # Recursion guard: the child run sets LESAVKA_OUTPUT_DELAY_CONFIRMING=1.
  [[ "${LESAVKA_OUTPUT_DELAY_CONFIRMING:-0}" != "1" ]] || return 0
  [[ "${output_delay_ready:-false}" == "true" ]] || return 0
  local confirm_audio_delay="${output_delay_audio_target_offset_us:-0}"
  local confirm_video_delay="${output_delay_video_target_offset_us:-0}"
  local confirm_output_dir="${LOCAL_REPORT_DIR}/confirmation"
  mkdir -p "${confirm_output_dir}"
  echo "==> confirming fixed UVC/UAC output-delay calibration"
  echo " ↪ confirmation_audio_delay_us=${confirm_audio_delay}"
  echo " ↪ confirmation_video_delay_us=${confirm_video_delay}"
  echo " ↪ confirmation_requires_sync_pass=1"
  # Child run: no further apply/save/confirm, no prebuild, fixed probe delays,
  # sync pass mandatory, artifacts under the confirmation subdirectory.
  LESAVKA_OUTPUT_DELAY_CONFIRMING=1 \
  LESAVKA_OUTPUT_DELAY_CONFIRM=0 \
  LESAVKA_OUTPUT_DELAY_APPLY=0 \
  LESAVKA_OUTPUT_DELAY_SAVE=0 \
  LESAVKA_OUTPUT_REQUIRE_SYNC_PASS=1 \
  LESAVKA_OUTPUT_DELAY_PROBE_AUDIO_DELAY_US="${confirm_audio_delay}" \
  LESAVKA_OUTPUT_DELAY_PROBE_VIDEO_DELAY_US="${confirm_video_delay}" \
  PROBE_PREBUILD=0 \
  LOCAL_OUTPUT_DIR="${confirm_output_dir}" \
  "${SCRIPT_DIR}/run_upstream_av_sync.sh"
}
# Hard-gate the run on the sync analysis verdict: exit 94 unless the analysis
# JSON exists and reports verdict.passed == true. Disabled when
# LESAVKA_OUTPUT_REQUIRE_SYNC_PASS is "0".
enforce_sync_verdict() {
  if [[ "${LESAVKA_OUTPUT_REQUIRE_SYNC_PASS}" == "0" ]]; then
    return 0
  fi
  if [[ ! -f "${LOCAL_ANALYSIS_JSON}" ]]; then
    echo "required sync pass unavailable: missing ${LOCAL_ANALYSIS_JSON}" >&2
    exit 94
  fi
  python3 - <<'PY' "${LOCAL_ANALYSIS_JSON}"
import json
import pathlib
import sys
report = json.loads(pathlib.Path(sys.argv[1]).read_text())
verdict = report.get("verdict") or {}
if verdict.get("passed") is True:
    raise SystemExit(0)
print(
    "required sync pass failed: "
    f"{verdict.get('status', 'unknown')} - {verdict.get('reason', '')}",
    file=sys.stderr,
)
raise SystemExit(94)
PY
}
# Build the analyzer and relay-control binaries up front so compile time does
# not eat into the capture window opened below.
if [[ "${PROBE_PREBUILD}" != "0" ]]; then
  echo "==> prebuilding relay control/analyzer before opening the capture window"
  (
    cd "${REPO_ROOT}"
    cargo build -p lesavka_client --bin lesavka-sync-analyze --bin lesavka-relayctl
  )
fi
# The analyzer must exist even when prebuild is skipped (e.g. confirmation runs).
if [[ ! -x "${ANALYZE_BIN}" ]]; then
  echo "sync analyzer binary not found at ${ANALYZE_BIN}" >&2
  exit 1
fi
# Resolve (and possibly tunnel to) the Lesavka server control endpoint.
resolve_server_addr
echo "==> resolved Lesavka server addr: ${RESOLVED_LESAVKA_SERVER_ADDR}"
if [[ -n "${SERVER_TUNNEL_PID}" ]]; then
  echo " ↪ tunneled to ${LESAVKA_SERVER_HOST}:127.0.0.1:${SERVER_TUNNEL_REMOTE_PORT}"
fi
print_lesavka_versions
preflight_server_path
# Record host/server clock offsets before capture so analysis can align timestamps.
write_clock_alignment
ssh ${SSH_OPTS} "${TETHYS_HOST}" bash -s -- \
"${REMOTE_CAPTURE}" \
"${CAPTURE_SECONDS}" \
"${REMOTE_VIDEO_DEVICE}" \
"${VIDEO_SIZE}" \
"${VIDEO_FPS}" \
"${VIDEO_FORMAT}" \
"${REMOTE_CAPTURE_STACK}" \
"${REMOTE_PULSE_CAPTURE_TOOL}" \
"${REMOTE_PULSE_VIDEO_MODE}" \
"${REMOTE_PULSE_AUDIO_ANCHOR_SILENCE}" \
"${REMOTE_AUDIO_SOURCE}" \
"${REMOTE_AUDIO_QUIESCE_USER_AUDIO}" \
"${REMOTE_CAPTURE_ALLOW_ALSA_FALLBACK}" \
"${REMOTE_CAPTURE_PREROLL_DISCARD_SECONDS}" \
"${REMOTE_CAPTURE_READY_SETTLE_SECONDS}" \
> >(tee "${LOCAL_CAPTURE_LOG}") \
2> >(tee -a "${LOCAL_CAPTURE_LOG}" >&2) <<'REMOTE_CAPTURE_SCRIPT' &
set -euo pipefail
# Remote half of the probe: executed on Tethys via `bash -s --`. Every tunable
# arrives as a positional argument, in exactly the order the controller's ssh
# invocation forwards them.
remote_capture=$1
capture_seconds=$2
remote_video_device=$3
video_size=$4
video_fps=$5
video_format=$6
remote_capture_stack=$7
remote_pulse_capture_tool=$8
remote_pulse_video_mode=$9
remote_pulse_audio_anchor_silence=${10}
remote_audio_source=${11}
remote_audio_quiesce_user_audio=${12}
remote_capture_allow_alsa_fallback=${13}
remote_capture_preroll_discard_seconds=${14}
remote_capture_ready_settle_seconds=${15}
# Start from a clean slate so a stale file cannot be mistaken for fresh capture output.
rm -f "${remote_capture}"
# Bring the user-session audio stack back up after a quiesced capture.
# Sockets are started first so socket activation can rebind; the explicit
# service starts afterwards are a belt-and-braces fallback. All failures are
# tolerated (best effort).
restore_user_audio() {
  systemctl --user start pipewire.socket pipewire-pulse.socket wireplumber.service >/dev/null 2>&1 || true
  # Give socket activation a moment before nudging the services directly.
  sleep 1
  systemctl --user start pipewire.service pipewire-pulse.service >/dev/null 2>&1 || true
}
# Stop the user-session PipeWire/Pulse stack (services and sockets) so the
# gadget audio device is free for exclusive, timing-sensitive capture.
# Best effort: failures are ignored.
quiesce_user_audio() {
  systemctl --user stop pipewire-pulse.service pipewire.service wireplumber.service \
    pipewire-pulse.socket pipewire.socket >/dev/null 2>&1 || true
  # Allow the daemons a moment to release the audio device before capture opens it.
  sleep 1
}
# Locate the Lesavka UVC gadget's V4L2 node. An explicit (non-"auto") request
# is passed through untouched; otherwise discovery tries the persistent
# /dev/v4l/by-id symlink first, then parses `v4l2-ctl --list-devices`.
# Exits 64 rather than fall back to an unrelated camera.
resolve_video_device() {
  local requested_device=$1
  if [[ "${requested_device}" != "auto" ]]; then
    printf '%s\n' "${requested_device}"
    return 0
  fi
  # First choice: the stable udev-created by-id symlink.
  local by_id_link
  by_id_link=$(find /dev/v4l/by-id -maxdepth 1 -type l -name '*Lesavka*video-index0' 2>/dev/null | head -n1 || true)
  if [[ -n "${by_id_link}" ]]; then
    printf '%s\n' "${by_id_link}"
    return 0
  fi
  # Second choice: first /dev/video* node listed under the Lesavka UVC entry.
  if command -v v4l2-ctl >/dev/null 2>&1; then
    local listed_node
    listed_node="$(
      v4l2-ctl --list-devices 2>/dev/null \
        | awk '
          BEGIN { want=0 }
          /Lesavka Composite: UVC Camera/ { want=1; next }
          /^[^ \t]/ { want=0 }
          want && /^[ \t]+\/dev\/video[0-9]+/ {
            gsub(/^[ \t]+/, "", $0)
            print
            exit
          }
        '
    )"
    if [[ -n "${listed_node}" ]]; then
      printf '%s\n' "${listed_node}"
      return 0
    fi
  fi
  printf 'Lesavka UVC video device not found on Tethys; refusing to fall back to an unrelated webcam/capture card.\n' >&2
  exit 64
}
# Print the Pulse/PipeWire source name for the Lesavka composite gadget.
# Prefers the canonical alsa_input.* source; otherwise falls back to the
# first source whose line mentions Lesavka_Composite. Returns 1 when pactl
# is missing or nothing matches.
resolve_pulse_source() {
  if ! command -v pactl >/dev/null 2>&1; then
    return 1
  fi
  # `pactl list short sources` prints "<index> <name> ..."; $2 is the name.
  # awk's `exit` on the preferred match still runs the END block, which turns
  # found/fallback state into the function's exit status.
  pactl list short sources 2>/dev/null \
    | awk '
      /alsa_input\..*Lesavka_Composite/ { print $2; found=1; exit }
      /Lesavka_Composite/ && !fallback { fallback=$2 }
      END {
        if (found) exit 0
        if (fallback != "") { print fallback; exit 0 }
        exit 1
      }
    '
}
# Print an ALSA "hw:CARD,DEV" spec for the Lesavka/UAC2 gadget parsed from
# `arecord -l`. Diagnostic fallback only — raw ALSA capture bypasses the
# Pulse/PipeWire clock domain used for timing analysis. Returns 1 when
# arecord is missing or no matching card is listed.
resolve_alsa_audio_device() {
  if ! command -v arecord >/dev/null 2>&1; then
    return 1
  fi
  # Match the first gadget-looking card line and emit hw:<card>,<device>;
  # `exit 0` still triggers END, which reports failure when nothing matched.
  arecord -l 2>/dev/null | awk '
    /^card [0-9]+:/ && ($0 ~ /Lesavka|UAC2_Gadget|UAC2Gadget|Composite/) {
      card=$2
      sub(":", "", card)
      for (i = 1; i <= NF; i++) {
        if ($i == "device") {
          dev=$(i + 1)
          sub(":", "", dev)
          printf "hw:%s,%s\n", card, dev
          found=1
          exit 0
        }
      }
    }
    END {
      if (!found) exit 1
    }
  '
}
# Emit the GStreamer caps string matching the negotiated UVC profile
# (reads globals video_format, resolved_video_size, resolved_video_fps).
# Exits 64 on an unsupported pixel format.
gst_video_source_caps() {
  local frame_width="${resolved_video_size%x*}"
  local frame_height="${resolved_video_size#*x}"
  case "${video_format}" in
    ""|mjpeg|MJPG)
      printf 'image/jpeg,width=%s,height=%s,framerate=%s/1' \
        "${frame_width}" "${frame_height}" "${resolved_video_fps}"
      ;;
    yuyv422|YUYV|yuyv)
      printf 'video/x-raw,format=YUY2,width=%s,height=%s,framerate=%s/1' \
        "${frame_width}" "${frame_height}" "${resolved_video_fps}"
      ;;
    *)
      printf 'unsupported gst video_format=%s\n' "${video_format}" >&2
      exit 64
      ;;
  esac
}
# Emit the decoder element(s) to splice in after the video source: MJPEG
# needs jpegdec, raw YUY2 needs nothing. Exits 64 on an unsupported format.
gst_video_decode_chain() {
  local decode_chain
  case "${video_format}" in
    ""|mjpeg|MJPG)
      decode_chain='jpegdec ! '
      ;;
    yuyv422|YUYV|yuyv)
      decode_chain=''
      ;;
    *)
      printf 'unsupported gst video_format=%s\n' "${video_format}" >&2
      exit 64
      ;;
  esac
  printf '%s' "${decode_chain}"
}
# Emit the audiomixer element for the capture pipeline, enabling
# ignore-inactive-pads when this GStreamer build supports it.
#
# The inspect output is materialized into a variable before matching: this
# remote script runs under `set -o pipefail`, and `gst-inspect-1.0 | grep -q`
# can report failure even on a match — grep -q exits at the first hit, the
# resulting SIGPIPE kills gst-inspect-1.0, and its status (141) becomes the
# pipeline's, silently dropping the property.
gst_audio_mixer_element() {
  local inspect_output
  inspect_output="$(gst-inspect-1.0 audiomixer 2>/dev/null || true)"
  if grep -q 'ignore-inactive-pads' <<<"${inspect_output}"; then
    printf 'audiomixer name=amix ignore-inactive-pads=true'
  else
    printf 'audiomixer name=amix'
  fi
}
# Report the video profile currently negotiated on ${resolved_video_device}
# as "size=WxH" and/or "fps=N" lines parsed from `v4l2-ctl --all`.
# Returns 1 when v4l2-ctl is unavailable; prints only what it could parse.
current_video_profile() {
  if ! command -v v4l2-ctl >/dev/null 2>&1; then
    return 1
  fi
  # "Width/Height : W/H" yields the size; "Frames per second: N.NNN ..."
  # yields the fps with the fractional part stripped.
  v4l2-ctl -d "${resolved_video_device}" --all 2>/dev/null \
    | awk '
      /Width\/Height[[:space:]]*:/ {
        split($0, a, ":")
        gsub(/^[ \t]+/, "", a[2])
        split(a[2], wh, "/")
        width=wh[1]
        height=wh[2]
        next
      }
      /Frames per second:/ {
        fps=$4
        sub(/\..*/, "", fps)
      }
      END {
        if (width != "" && height != "") {
          print "size=" width "x" height
        }
        if (fps != "") {
          print "fps=" fps
        }
      }
    '
}
# Pick the capture frame size. Priority: explicit request > currently
# negotiated device profile > preferred discrete modes > first advertised
# discrete mode > 640x480.
resolve_video_size() {
  local asked=$1
  if [[ "${asked}" != "auto" ]]; then
    printf '%s\n' "${asked}"
    return 0
  fi
  # Prefer whatever the device is already negotiated to.
  local profile active_size
  profile="$(current_video_profile || true)"
  active_size="$(awk -F= '/^size=/{print $2; exit}' <<<"${profile}")"
  if [[ -n "${active_size}" ]]; then
    printf '%s\n' "${active_size}"
    return 0
  fi
  if ! command -v v4l2-ctl >/dev/null 2>&1; then
    printf '640x480\n'
    return 0
  fi
  local formats candidate
  formats="$(v4l2-ctl -d "${resolved_video_device}" --list-formats-ext 2>/dev/null || true)"
  # Walk the preferred modes in order and take the first one advertised.
  for candidate in 1920x1080 1360x768 1280x720; do
    if grep -q "Size: Discrete ${candidate}" <<<"${formats}"; then
      printf '%s\n' "${candidate}"
      return 0
    fi
  done
  # Otherwise take the first discrete size the device advertises at all.
  local advertised_size
  advertised_size="$(grep -m1 -o 'Size: Discrete [0-9]\+x[0-9]\+' <<<"${formats}" | awk '{print $3}' || true)"
  if [[ -n "${advertised_size}" ]]; then
    printf '%s\n' "${advertised_size}"
    return 0
  fi
  printf '640x480\n'
}
# Pick the capture frame rate. Priority: explicit request > currently
# negotiated device profile > first advertised rate > 20 fps.
resolve_video_fps() {
  local asked=$1
  if [[ "${asked}" != "auto" ]]; then
    printf '%s\n' "${asked}"
    return 0
  fi
  # Prefer whatever the device is already negotiated to.
  local profile active_fps
  profile="$(current_video_profile || true)"
  active_fps="$(awk -F= '/^fps=/{print $2; exit}' <<<"${profile}")"
  if [[ -n "${active_fps}" ]]; then
    printf '%s\n' "${active_fps}"
    return 0
  fi
  if ! command -v v4l2-ctl >/dev/null 2>&1; then
    printf '20\n'
    return 0
  fi
  # Otherwise take the first advertised rate, truncated to an integer.
  local formats advertised_fps
  formats="$(v4l2-ctl -d "${resolved_video_device}" --list-formats-ext 2>/dev/null || true)"
  advertised_fps="$(grep -m1 -o '[0-9]\+\.[0-9]\+ fps' <<<"${formats}" | awk '{sub(/\..*/, "", $1); print $1}' || true)"
  if [[ -n "${advertised_fps}" ]]; then
    printf '%s\n' "${advertised_fps}"
    return 0
  fi
  printf '20\n'
}
# Print the PipeWire object.serial of the Lesavka composite Audio/Source node
# (used as the pw-record capture target). Requires pw-dump and python3;
# returns 1 when either is missing, the dump is unparseable, or no matching
# node exists.
resolve_pw_audio_target() {
  if ! command -v pw-dump >/dev/null 2>&1 || ! command -v python3 >/dev/null 2>&1; then
    return 1
  fi
  pw-dump | python3 -c '
import json
import sys
try:
    objs = json.load(sys.stdin)
except Exception:
    raise SystemExit(1)
for obj in objs:
    if obj.get("type") != "PipeWire:Interface:Node":
        continue
    props = (obj.get("info") or {}).get("props") or {}
    if props.get("media.class") != "Audio/Source":
        continue
    serial = props.get("object.serial")
    name = props.get("node.name", "")
    desc = props.get("node.description", "")
    if serial is None:
        continue
    if "Lesavka_Composite" in name or "Lesavka Composite" in desc:
        print(serial)
        raise SystemExit(0)
raise SystemExit(1)
'
}
# Audio capture-path selection. capture_mode ends up as one of:
#   pulse  - ffmpeg/gst reads the Lesavka Pulse source
#   pwpipe - pw-record pipes raw PCM into ffmpeg (PipeWire-native mux)
#   alsa   - raw ALSA device (diagnostic only; not timing-trustworthy)
# Unsupported configuration values exit 64 (usage error).
capture_mode=""
alsa_audio_dev="hw:3,0"
pulse_source=""
pw_audio_target=""
case "${remote_capture_stack}" in
  auto)
    if [[ "${remote_audio_source}" == "auto" ]]; then
      # Preference order: Pulse source, then PipeWire-native pipe, then
      # (only when explicitly allowed) the raw ALSA diagnostic fallback.
      if pulse_source="$(resolve_pulse_source)"; then
        capture_mode="pulse"
      elif command -v pw-record >/dev/null 2>&1 \
        && command -v pw-v4l2 >/dev/null 2>&1 \
        && pw_audio_target="$(resolve_pw_audio_target)"; then
        capture_mode="pwpipe"
      elif [[ "${remote_capture_allow_alsa_fallback}" == "1" ]] && alsa_audio_dev="$(resolve_alsa_audio_device)"; then
        capture_mode="alsa"
        printf 'PipeWire Lesavka source not found; using explicit diagnostic ALSA fallback device %s\n' "${alsa_audio_dev}" >&2
      else
        printf 'Lesavka Pulse/PipeWire audio source not found; refusing raw ALSA fallback for timing-sensitive capture.\n' >&2
        printf 'Set REMOTE_CAPTURE_STACK=alsa or REMOTE_CAPTURE_ALLOW_ALSA_FALLBACK=1 only for diagnostic signal-presence checks.\n' >&2
        exit 64
      fi
    elif [[ "${remote_audio_source}" == pulse:* ]]; then
      # Explicit "pulse:<source>" override.
      capture_mode="pulse"
      pulse_source="${remote_audio_source#pulse:}"
    elif [[ "${remote_audio_source}" == alsa:* ]]; then
      # Explicit "alsa:<device>" override.
      capture_mode="alsa"
      alsa_audio_dev="${remote_audio_source#alsa:}"
    else
      printf 'unsupported REMOTE_AUDIO_SOURCE=%s\n' "${remote_audio_source}" >&2
      exit 64
    fi
    ;;
  pwpipe)
    # Forced PipeWire-native path; both helper binaries are mandatory.
    if ! command -v pw-record >/dev/null 2>&1 || ! command -v pw-v4l2 >/dev/null 2>&1; then
      printf 'REMOTE_CAPTURE_STACK=pwpipe requires pw-record and pw-v4l2\n' >&2
      exit 64
    fi
    pw_audio_target="$(resolve_pw_audio_target)" || {
      printf 'PipeWire Lesavka capture target not found for REMOTE_CAPTURE_STACK=pwpipe\n' >&2
      exit 64
    }
    capture_mode="pwpipe"
    ;;
  pulse)
    # Forced Pulse path; source from override, auto-discovery, or verbatim.
    if [[ "${remote_audio_source}" == pulse:* ]]; then
      pulse_source="${remote_audio_source#pulse:}"
    elif [[ "${remote_audio_source}" == "auto" ]]; then
      pulse_source="$(resolve_pulse_source)" || {
        printf 'PipeWire Lesavka source not found for REMOTE_CAPTURE_STACK=pulse\n' >&2
        exit 64
      }
    else
      pulse_source="${remote_audio_source}"
    fi
    capture_mode="pulse"
    ;;
  alsa)
    # Forced ALSA path; any non-auto source value is used as the device,
    # otherwise the hw:3,0 default above stands.
    if [[ "${remote_audio_source}" == alsa:* ]]; then
      alsa_audio_dev="${remote_audio_source#alsa:}"
    elif [[ "${remote_audio_source}" != "auto" ]]; then
      alsa_audio_dev="${remote_audio_source}"
    fi
    capture_mode="alsa"
    ;;
  *)
    printf 'unsupported REMOTE_CAPTURE_STACK=%s\n' "${remote_capture_stack}" >&2
    exit 64
    ;;
esac
# Resolve the concrete video device/mode and announce them for the log.
resolved_video_device="$(resolve_video_device "${remote_video_device}")"
resolved_video_size="$(resolve_video_size "${video_size}")"
resolved_video_fps="$(resolve_video_fps "${video_fps}")"
printf 'using video device: %s\n' "${resolved_video_device}" >&2
printf 'using video mode: %s @ %s fps (%s)\n' "${resolved_video_size}" "${resolved_video_fps}" "${video_format:-driver-default}" >&2
# Common ffmpeg V4L2 input arguments; -input_format only when pinned.
video_args=(-f video4linux2 -framerate "${resolved_video_fps}" -video_size "${resolved_video_size}")
if [[ -n "${video_format}" ]]; then
  video_args+=(-input_format "${video_format}")
fi
# Pre-render the GStreamer fragments used by the gst capture pipelines.
gst_source_caps="$(gst_video_source_caps)"
gst_decode_chain="$(gst_video_decode_chain)"
gst_audio_mixer="$(gst_audio_mixer_element)"
run_ffmpeg_capture() {
  # Run an ffmpeg capture command under a hard timeout, bracketed by the
  # start announcement and the readiness marker. A clean exit, a timeout
  # (124), or an interrupt (130) all count as success; any other status
  # is propagated to the caller.
  local status=0
  announce_capture_start
  timeout --kill-after=5 --signal=INT "$((capture_seconds + 5))" "$@" </dev/null &
  local pid=$!
  signal_capture_ready
  wait "${pid}" || status=$?
  if [[ "${status}" -eq 0 || "${status}" -eq 124 || "${status}" -eq 130 ]]; then
    return 0
  fi
  return "${status}"
}
announce_capture_start() {
  # Log the wall-clock capture start (nanoseconds since the epoch) on
  # stderr so the offline analyzer can align host and server timelines.
  local now
  now="$(date +%s%N)"
  printf 'capture_start_unix_ns=%s\n' "${now}" >&2
}
signal_capture_ready() {
  # After an optional numeric settle delay, print the readiness marker
  # that the local orchestrator greps for in the capture log.
  local settle="${remote_capture_ready_settle_seconds}"
  if [[ "${settle}" =~ ^[0-9]+([.][0-9]+)?$ ]]; then
    sleep "${settle}"
  fi
  printf '%s\n' "__LESAVKA_CAPTURE_READY__"
}
run_tolerant_capture() {
  # Like run_ffmpeg_capture, but never propagates the child's exit code:
  # used for pipelines whose teardown statuses are noisy. Always returns 0.
  announce_capture_start
  "$@" &
  local bg_pid=$!
  signal_capture_ready
  wait "${bg_pid}" || true
}
# Decide whether to silence the host user's audio before capture. Raw ALSA
# opens the hardware exclusively, so "auto" quiesces only in alsa mode.
quiesce_for_alsa=0
case "${remote_audio_quiesce_user_audio}" in
  1|true|yes)
    quiesce_for_alsa=1
    ;;
  auto)
    if [[ "${capture_mode}" == "alsa" ]]; then
      quiesce_for_alsa=1
    fi
    ;;
  0|false|no)
    quiesce_for_alsa=0
    ;;
  *)
    printf 'unsupported REMOTE_AUDIO_QUIESCE_USER_AUDIO=%s\n' "${remote_audio_quiesce_user_audio}" >&2
    exit 64
    ;;
esac
if [[ "${capture_mode}" == "alsa" && "${quiesce_for_alsa}" == "1" ]]; then
  printf 'quiescing Tethys user audio before raw ALSA capture\n' >&2
  quiesce_user_audio
  # Guarantee audio is restored however this remote script exits.
  trap restore_user_audio EXIT
fi
discard_preroll_capture() {
  # Record and immediately throw away the first N seconds of A/V after
  # device enumeration so driver warm-up artifacts never land in the
  # analyzed capture. No-op unless $1 is a positive integer. Best-effort:
  # every failure path degrades to doing nothing.
  local seconds=$1
  [[ "${seconds}" =~ ^[0-9]+$ ]] || return 0
  [[ "${seconds}" -gt 0 ]] || return 0
  # Use mktemp for an unpredictable scratch path (the old $RANDOM-based
  # name in /tmp was guessable); bail out quietly if mktemp fails.
  local discard
  discard="$(mktemp /tmp/lesavka-output-delay-preroll-discard-XXXXXXXX.mkv 2>/dev/null)" || return 0
  printf 'discarding %ss of post-enumeration capture before probe\n' "${seconds}" >&2
  case "${capture_mode}" in
    pulse)
      if command -v ffmpeg >/dev/null 2>&1; then
        timeout --kill-after=3 --signal=INT "$((seconds + 5))" ffmpeg -nostdin -hide_banner -loglevel error -y \
          -thread_queue_size 1024 \
          "${video_args[@]}" \
          -i "${resolved_video_device}" \
          -thread_queue_size 1024 \
          -f pulse \
          -i "${pulse_source}" \
          -t "${seconds}" \
          -c:v copy \
          -c:a pcm_s16le \
          "${discard}" </dev/null >/dev/null 2>&1 || true
      fi
      ;;
    alsa)
      if command -v ffmpeg >/dev/null 2>&1; then
        timeout --kill-after=3 --signal=INT "$((seconds + 5))" ffmpeg -nostdin -hide_banner -loglevel error -y \
          -thread_queue_size 1024 \
          "${video_args[@]}" \
          -i "${resolved_video_device}" \
          -thread_queue_size 1024 \
          -f alsa -ac 2 -ar 48000 \
          -i "${alsa_audio_dev}" \
          -t "${seconds}" \
          -c:v copy \
          -c:a pcm_s16le \
          "${discard}" </dev/null >/dev/null 2>&1 || true
      fi
      ;;
    *)
      # No cheap throwaway recorder for this mode; just wait it out.
      sleep "${seconds}"
      ;;
  esac
  rm -f "${discard}"
}
# Burn off any configured post-enumeration preroll so device warm-up
# frames never reach the timing-sensitive capture below.
discard_preroll_capture "${remote_capture_preroll_discard_seconds}"
if [[ "${capture_mode}" == "pwpipe" ]]; then
  # PipeWire-native path: pw-record streams raw s16/48k stereo PCM into
  # ffmpeg's stdin while pw-v4l2 wraps ffmpeg's open of the V4L2 device.
  printf 'using PipeWire-native mux capture target serial: %s\n' "${pw_audio_target}" >&2
  announce_capture_start
  (
    timeout "${capture_seconds}" pw-record \
      --target "${pw_audio_target}" \
      --rate 48000 \
      --channels 2 \
      --format s16 \
      --raw - \
      | pw-v4l2 ffmpeg -hide_banner -loglevel error -y \
        -thread_queue_size 1024 \
        "${video_args[@]}" \
        -i "${resolved_video_device}" \
        -thread_queue_size 1024 \
        -f s16le -ar 48000 -ac 2 \
        -i pipe:0 \
        -t "${capture_seconds}" \
        -c:v copy \
        -c:a pcm_s16le \
        "${remote_capture}"
  ) &
  capture_pid=$!
  signal_capture_ready
  wait "${capture_pid}"
elif [[ "${capture_mode}" == "pulse" ]]; then
  printf 'using Pulse source: %s\n' "${pulse_source}" >&2
  case "${remote_pulse_capture_tool}" in
    ffmpeg)
      case "${remote_pulse_video_mode}" in
        copy)
          # Store the camera stream as delivered (VFR) next to Pulse audio.
          run_ffmpeg_capture ffmpeg -hide_banner -loglevel error -y \
            -thread_queue_size 1024 \
            "${video_args[@]}" \
            -i "${resolved_video_device}" \
            -thread_queue_size 1024 \
            -f pulse \
            -i "${pulse_source}" \
            -map 0:v:0 \
            -map 1:a:0 \
            -t "${capture_seconds}" \
            -c:v copy \
            -c:a pcm_s16le \
            "${remote_capture}"
          ;;
        cfr)
          # Re-encode to constant frame rate with all-intra near-lossless
          # x264 so the analyzer sees uniformly spaced frames.
          run_ffmpeg_capture ffmpeg -hide_banner -loglevel error -y \
            -thread_queue_size 1024 \
            "${video_args[@]}" \
            -i "${resolved_video_device}" \
            -thread_queue_size 1024 \
            -f pulse \
            -i "${pulse_source}" \
            -map 0:v:0 \
            -map 1:a:0 \
            -t "${capture_seconds}" \
            -vf "fps=${resolved_video_fps}" \
            -c:v libx264 -preset ultrafast -crf 12 -g 1 -pix_fmt yuv420p \
            -c:a pcm_s16le \
            "${remote_capture}"
          ;;
        *)
          printf 'unsupported REMOTE_PULSE_VIDEO_MODE=%s\n' "${remote_pulse_video_mode}" >&2
          exit 64
          ;;
      esac
      ;;
    gst)
      if [[ "${remote_pulse_audio_anchor_silence}" == "1" ]]; then
        printf 'anchoring Pulse capture audio timeline with generated silence\n' >&2
      fi
      # NOTE(review): the anchored pipelines link "queue ! amix."; this
      # assumes the ${gst_audio_mixer} fragment names its element "amix"
      # — confirm in gst_audio_mixer_element (defined earlier).
      case "${remote_pulse_video_mode}" in
        copy)
          # Stream-copy only works when the camera emits MJPEG.
          if [[ "${video_format}" != "mjpeg" && "${video_format}" != "MJPG" && -n "${video_format}" ]]; then
            printf 'gst copy mode only supports mjpeg input, got %s\n' "${video_format}" >&2
            exit 64
          fi
          if [[ "${remote_pulse_audio_anchor_silence}" == "1" ]]; then
            run_tolerant_capture timeout --kill-after=5 --signal=INT "$((capture_seconds + 3))" \
              gst-launch-1.0 -q -e \
              matroskamux name=mux ! filesink location="${remote_capture}" \
              v4l2src device="${resolved_video_device}" do-timestamp=true ! \
              ${gst_source_caps} ! \
              queue ! mux. \
              ${gst_audio_mixer} ! \
              audio/x-raw,rate=48000,channels=2 ! \
              queue ! mux. \
              audiotestsrc wave=silence is-live=true do-timestamp=true ! \
              audio/x-raw,rate=48000,channels=2 ! \
              queue ! amix. \
              pulsesrc device="${pulse_source}" do-timestamp=true ! \
              audio/x-raw,rate=48000,channels=2 ! \
              audioconvert ! audioresample ! audio/x-raw,rate=48000,channels=2 ! \
              queue ! amix.
          else
            run_tolerant_capture timeout --kill-after=5 --signal=INT "$((capture_seconds + 3))" \
              gst-launch-1.0 -q -e \
              matroskamux name=mux ! filesink location="${remote_capture}" \
              v4l2src device="${resolved_video_device}" do-timestamp=true ! \
              ${gst_source_caps} ! \
              queue ! mux. \
              pulsesrc device="${pulse_source}" do-timestamp=true ! \
              audio/x-raw,rate=48000,channels=2 ! \
              audioconvert ! audioresample ! audio/x-raw,rate=48000,channels=2 ! \
              queue ! mux.
          fi
          ;;
        cfr)
          if [[ "${remote_pulse_audio_anchor_silence}" == "1" ]]; then
            run_tolerant_capture timeout --kill-after=5 --signal=INT "$((capture_seconds + 3))" \
              gst-launch-1.0 -q -e \
              matroskamux name=mux ! filesink location="${remote_capture}" \
              v4l2src device="${resolved_video_device}" do-timestamp=true ! \
              ${gst_source_caps} ! \
              ${gst_decode_chain} \
              videoconvert ! videorate ! video/x-raw,framerate="${resolved_video_fps}"/1 ! \
              x264enc tune=zerolatency speed-preset=ultrafast key-int-max=1 bitrate=5000 ! \
              h264parse ! \
              queue ! mux. \
              ${gst_audio_mixer} ! \
              audio/x-raw,rate=48000,channels=2 ! \
              queue ! mux. \
              audiotestsrc wave=silence is-live=true do-timestamp=true ! \
              audio/x-raw,rate=48000,channels=2 ! \
              queue ! amix. \
              pulsesrc device="${pulse_source}" do-timestamp=true ! \
              audio/x-raw,rate=48000,channels=2 ! \
              audioconvert ! audioresample ! audio/x-raw,rate=48000,channels=2 ! \
              queue ! amix.
          else
            run_tolerant_capture timeout --kill-after=5 --signal=INT "$((capture_seconds + 3))" \
              gst-launch-1.0 -q -e \
              matroskamux name=mux ! filesink location="${remote_capture}" \
              v4l2src device="${resolved_video_device}" do-timestamp=true ! \
              ${gst_source_caps} ! \
              ${gst_decode_chain} \
              videoconvert ! videorate ! video/x-raw,framerate="${resolved_video_fps}"/1 ! \
              x264enc tune=zerolatency speed-preset=ultrafast key-int-max=1 bitrate=5000 ! \
              h264parse ! \
              queue ! mux. \
              pulsesrc device="${pulse_source}" do-timestamp=true ! \
              audio/x-raw,rate=48000,channels=2 ! \
              audioconvert ! audioresample ! audio/x-raw,rate=48000,channels=2 ! \
              queue ! mux.
          fi
          ;;
        *)
          printf 'unsupported REMOTE_PULSE_VIDEO_MODE=%s\n' "${remote_pulse_video_mode}" >&2
          exit 64
          ;;
      esac
      ;;
    *)
      printf 'unsupported REMOTE_PULSE_CAPTURE_TOOL=%s\n' "${remote_pulse_capture_tool}" >&2
      exit 64
      ;;
  esac
else
  # Raw ALSA fallback. Note the copy/cfr switch still reads
  # remote_pulse_video_mode even though the audio path is ALSA.
  case "${remote_pulse_video_mode}" in
    copy)
      run_ffmpeg_capture ffmpeg -hide_banner -loglevel error -y \
        -thread_queue_size 1024 \
        "${video_args[@]}" \
        -i "${resolved_video_device}" \
        -thread_queue_size 1024 \
        -f alsa -ac 2 -ar 48000 \
        -i "${alsa_audio_dev}" \
        -t "${capture_seconds}" \
        -c:v copy \
        -c:a pcm_s16le \
        "${remote_capture}"
      ;;
    cfr)
      run_ffmpeg_capture ffmpeg -hide_banner -loglevel error -y \
        -thread_queue_size 1024 \
        "${video_args[@]}" \
        -i "${resolved_video_device}" \
        -thread_queue_size 1024 \
        -f alsa -ac 2 -ar 48000 \
        -i "${alsa_audio_dev}" \
        -t "${capture_seconds}" \
        -vf "fps=${resolved_video_fps}" \
        -c:v libx264 -preset ultrafast -crf 12 -g 1 -pix_fmt yuv420p \
        -c:a pcm_s16le \
        "${remote_capture}"
      ;;
    *)
      printf 'unsupported REMOTE_PULSE_VIDEO_MODE=%s\n' "${remote_pulse_video_mode}" >&2
      exit 64
      ;;
  esac
fi
REMOTE_CAPTURE_SCRIPT
# PID of the backgrounded remote-capture job launched just above
# (presumably the ssh session running REMOTE_CAPTURE_SCRIPT — it is the
# most recent `&` job); waited on after the probe completes.
capture_pid=$!
wait_for_capture_ready() {
  # Poll the capture log (100 tries x 0.1s = ~10s) until the remote
  # capture prints the readiness marker. Exits the whole script when the
  # capture process dies first or the marker never appears.
  local tries=100
  local i=0
  while (( i < tries )); do
    if [[ -f "${LOCAL_CAPTURE_LOG}" ]] && grep -q "${CAPTURE_READY_MARKER}" "${LOCAL_CAPTURE_LOG}"; then
      return 0
    fi
    if ! kill -0 "${capture_pid}" >/dev/null 2>&1; then
      capture_status=0
      wait "${capture_pid}" || capture_status=$?
      # Exiting before the ready marker is a failure even when the capture
      # process itself returned 0 — never exit 0 here (the previous code
      # could report failure yet exit successfully).
      if [[ "${capture_status}" -eq 0 ]]; then
        capture_status=1
      fi
      echo "Tethys capture failed before the sync probe could start; see ${LOCAL_CAPTURE_LOG} for details." >&2
      exit "${capture_status}"
    fi
    sleep 0.1
    ((i += 1))
  done
  echo "Timed out waiting for Tethys capture to become ready; see ${LOCAL_CAPTURE_LOG} for details." >&2
  exit 90
}
# Block until the remote capture has signaled readiness, then optionally
# hold the probe back for the configured lead-in.
wait_for_capture_ready
sleep "${LEAD_IN_SECONDS}"
echo "==> running server-generated UVC/UAC output-delay probe against ${RESOLVED_LESAVKA_SERVER_ADDR}"
probe_status=0
probe_timed_out=0
# Run the probe without -e so its failure can be classified (timeout vs
# hard error) before deciding how to unwind the still-running capture.
set +e
(
  cd "${REPO_ROOT}"
  LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \
    timeout --signal=INT "${PROBE_TIMEOUT_SECONDS}" \
    "${REPO_ROOT}/target/debug/lesavka-relayctl" \
    --server "${RESOLVED_LESAVKA_SERVER_ADDR}" \
    output-delay-probe \
    "${PROBE_DURATION_SECONDS}" \
    "${PROBE_WARMUP_SECONDS}" \
    "${PROBE_PULSE_PERIOD_MS}" \
    "${PROBE_PULSE_WIDTH_MS}" \
    "${PROBE_EVENT_WIDTH_CODES}" \
    "${LESAVKA_OUTPUT_DELAY_PROBE_AUDIO_DELAY_US}" \
    "${LESAVKA_OUTPUT_DELAY_PROBE_VIDEO_DELAY_US}"
) 2>&1 | tee "${LOCAL_SERVER_PROBE_REPLY}"
# PIPESTATUS[0] is the probe subshell's status; plain $? would be tee's.
probe_status=${PIPESTATUS[0]}
set -e
# timeout(1) exits 124 when the limit expired.
if [[ "${probe_status}" -eq 124 ]]; then
  probe_timed_out=1
fi
extract_server_timeline
capture_status=0
wait "${capture_pid}" || capture_status=$?
# Scan the capture log for known host-side V4L2 failure signatures so
# later reporting can flag suspect runs.
capture_v4l2_fault=0
if [[ -f "${LOCAL_CAPTURE_LOG}" ]] \
  && grep -q 'VIDIOC_QBUF): Bad file descriptor' "${LOCAL_CAPTURE_LOG}"; then
  capture_v4l2_fault=1
fi
capture_streamon_timeout=0
if [[ -f "${LOCAL_CAPTURE_LOG}" ]] \
  && grep -q 'VIDIOC_STREAMON.*Connection timed out' "${LOCAL_CAPTURE_LOG}"; then
  capture_streamon_timeout=1
fi
# Post-process on the capture host when a capture file exists: optionally
# normalize to CFR, optionally analyze remotely, optionally fetch back.
if ssh ${SSH_OPTS} "${TETHYS_HOST}" "test -f '${REMOTE_CAPTURE}'"; then
  remote_fetch_capture="${REMOTE_CAPTURE}"
  analysis_status=0
  if [[ "${ANALYSIS_NORMALIZE}" != "0" ]]; then
    # Re-encode remotely to constant frame rate (and optionally downscale)
    # so the analyzer sees uniform frame spacing.
    remote_fetch_capture="${REMOTE_CAPTURE%.mkv}-analysis.mkv"
    echo "==> normalizing remote capture to CFR for analysis"
    normalize_status=0
    ssh ${SSH_OPTS} "${TETHYS_HOST}" bash -s -- \
      "${REMOTE_CAPTURE}" \
      "${remote_fetch_capture}" \
      "${VIDEO_FPS}" \
      "${ANALYSIS_SCALE_WIDTH}" <<'REMOTE_NORMALIZE_SCRIPT' || normalize_status=$?
set -euo pipefail
src=$1
dst=$2
fps=$3
scale_width=$4
ffmpeg -hide_banner -loglevel error -y \
  -i "${src}" \
  -vf "fps=${fps},scale='min(${scale_width},iw)':-2" \
  -c:v libx264 -preset ultrafast -crf 12 -g 1 -pix_fmt yuv420p \
  -c:a pcm_s16le \
  "${dst}"
REMOTE_NORMALIZE_SCRIPT
    if [[ "${normalize_status}" -ne 0 ]]; then
      # Normalization is best-effort: fall back to the raw capture.
      echo "remote CFR normalization failed; falling back to raw capture" >&2
      remote_fetch_capture="${REMOTE_CAPTURE}"
    fi
  fi
  if [[ "${REMOTE_ANALYZE}" != "0" ]]; then
    if [[ "${REMOTE_ANALYZE_COPY}" != "0" ]]; then
      echo "==> copying sync analyzer to ${TETHYS_HOST}:${REMOTE_ANALYZE_BIN}"
      scp ${SSH_OPTS} "${ANALYZE_BIN}" "${TETHYS_HOST}:${REMOTE_ANALYZE_BIN}"
    fi
    analysis_window_arg="$(compute_analysis_window_arg)"
    if [[ -n "${analysis_window_arg}" ]]; then
      echo " ↪ analyzer timeline window: ${analysis_window_arg#--analysis-window-s }"
    fi
    echo "==> analyzing capture on ${TETHYS_HOST}"
    # Capture the analyzer's exit code without tripping -e; the JSON report
    # streams into the local file either way.
    set +e
    ssh ${SSH_OPTS} "${TETHYS_HOST}" \
      "chmod +x '${REMOTE_ANALYZE_BIN}' && '${REMOTE_ANALYZE_BIN}' '${remote_fetch_capture}' --json --event-width-codes '${PROBE_EVENT_WIDTH_CODES}' ${analysis_window_arg}" \
      > "${LOCAL_ANALYSIS_JSON}"
    analysis_status=$?
    set -e
  fi
  if [[ "${FETCH_CAPTURE}" != "0" ]]; then
    echo "==> fetching capture back to ${LOCAL_CAPTURE}"
    scp ${SSH_OPTS} "${TETHYS_HOST}:${remote_fetch_capture}" "${LOCAL_CAPTURE}"
  fi
  # Fail only after the capture has been fetched, so artifacts survive.
  if [[ "${analysis_status}" -ne 0 ]]; then
    echo "remote analysis failed with status ${analysis_status}; capture preserved at ${LOCAL_CAPTURE}" >&2
    exit "${analysis_status}"
  fi
fi
# Probe failure is always fatal, regardless of how the capture fared.
if [[ "${probe_status}" -ne 0 ]]; then
  if [[ "${probe_timed_out}" -eq 1 ]]; then
    echo "server output-delay probe timed out after ${PROBE_TIMEOUT_SECONDS}s; this usually means one UVC/UAC output sink did not close cleanly." >&2
  fi
  echo "server output-delay probe failed with status ${probe_status}" >&2
  [[ -f "${LOCAL_CAPTURE}" ]] && echo "partial capture preserved at ${LOCAL_CAPTURE}" >&2
  exit "${probe_status}"
fi
# Capture failures are tolerated in two specific cases — 141 (128+SIGPIPE)
# and 124 (timeout(1) expiry) — but only when analysis artifacts exist.
if [[ "${capture_status}" -ne 0 ]]; then
  if [[ "${capture_streamon_timeout}" -eq 1 ]]; then
    echo "Tethys capture timed out during VIDIOC_STREAMON; the UVC host opened before MJPEG frames reached the gadget." >&2
    echo "Keep LEAD_IN_SECONDS=0 and restart lesavka-uvc/lesavka-server before retrying if the gadget is wedged from an earlier failed run." >&2
  fi
  if [[ "${capture_status}" -eq 141 && ( -f "${LOCAL_CAPTURE}" || -f "${LOCAL_ANALYSIS_JSON}" ) ]]; then
    echo "Tethys capture ended with PipeWire SIGPIPE after ffmpeg closed; accepting preserved analysis artifacts" >&2
  elif [[ "${capture_status}" -eq 124 && ( -f "${LOCAL_CAPTURE}" || -f "${LOCAL_ANALYSIS_JSON}" ) ]]; then
    echo "Tethys capture timed out after preserving analysis artifacts; accepting the run for analysis" >&2
  else
    echo "Tethys capture failed with status ${capture_status}" >&2
    [[ -f "${LOCAL_CAPTURE}" ]] && echo "partial capture preserved at ${LOCAL_CAPTURE}" >&2
    exit "${capture_status}"
  fi
fi
# Summarize the analysis: either render the remote JSON report locally, or
# (when remote analysis is disabled) run the analyzer on the fetched file.
if [[ "${REMOTE_ANALYZE}" != "0" ]]; then
  if [[ ! -f "${LOCAL_ANALYSIS_JSON}" ]]; then
    echo "remote analysis did not produce ${LOCAL_ANALYSIS_JSON}" >&2
    exit 92
  fi
  echo "==> remote analysis summary"
  # Invoke python3 explicitly (as the rest of this file does): the heredoc
  # uses f-strings, and a bare `python` alias is absent on many distros.
  python3 - <<'PY' "${LOCAL_ANALYSIS_JSON}" "${LOCAL_REPORT_TXT}" "${LOCAL_EVENTS_CSV}"
import csv
import json
import pathlib
import sys
# argv: 1 = analysis JSON in, 2 = summary txt out, 3 = paired-events CSV out
report = json.loads(pathlib.Path(sys.argv[1]).read_text())
verdict = report.get('verdict', {})
cal = report.get('calibration', {})
lines = [
    f"A/V sync report for {sys.argv[1]}",
    f"- verdict: {verdict.get('status', 'unknown')} ({'pass' if verdict.get('passed') else 'fail'})",
    f"- verdict reason: {verdict.get('reason', '')}",
    f"- p95 abs skew: {float(verdict.get('p95_abs_skew_ms', 0.0)):.1f} ms",
    f"- video onsets: {report['video_event_count']}",
    f"- audio onsets: {report['audio_event_count']}",
    f"- paired pulses: {report['paired_event_count']}",
    f"- activity start delta: {report.get('activity_start_delta_ms', 0.0):+.1f} ms (audio after video is positive)",
    f"- first skew: {report['first_skew_ms']:+.1f} ms (audio after video is positive)",
    f"- last skew: {report['last_skew_ms']:+.1f} ms",
    f"- mean skew: {report['mean_skew_ms']:+.1f} ms",
    f"- median skew: {report['median_skew_ms']:+.1f} ms",
    f"- max abs skew: {report['max_abs_skew_ms']:.1f} ms",
    f"- drift: {report['drift_ms']:+.1f} ms",
    f"- calibration ready: {cal.get('ready')}",
    f"- recommended audio offset adjust: {int(cal.get('recommended_audio_offset_adjust_us', 0)):+d} us",
    f"- alternative video offset adjust: {int(cal.get('recommended_video_offset_adjust_us', 0)):+d} us",
    f"- calibration note: {cal.get('note', '')}",
]
summary = "\n".join(lines) + "\n"
pathlib.Path(sys.argv[2]).write_text(summary)
# One CSV row per paired audio/video pulse from the report.
with pathlib.Path(sys.argv[3]).open("w", newline="") as handle:
    writer = csv.DictWriter(
        handle,
        fieldnames=[
            "event_id",
            "server_event_id",
            "event_code",
            "video_time_s",
            "audio_time_s",
            "skew_ms",
            "confidence",
        ],
    )
    writer.writeheader()
    for event in report.get("paired_events", []):
        writer.writerow({
            "event_id": event.get("event_id"),
            "server_event_id": event.get("server_event_id"),
            "event_code": event.get("event_code"),
            "video_time_s": event.get("video_time_s"),
            "audio_time_s": event.get("audio_time_s"),
            "skew_ms": event.get("skew_ms"),
            "confidence": event.get("confidence"),
        })
print(summary, end="")
PY
else
  # Local analysis requires the capture to have been fetched.
  if [[ ! -f "${LOCAL_CAPTURE}" ]]; then
    echo "capture was not fetched and REMOTE_ANALYZE=0 left nothing local to analyze" >&2
    exit 93
  fi
  echo "==> analyzing capture"
  analysis_window_arg="$(compute_analysis_window_arg)"
  if [[ -n "${analysis_window_arg}" ]]; then
    echo " ↪ analyzer timeline window: ${analysis_window_arg#--analysis-window-s }"
  fi
  (
    cd "${REPO_ROOT}"
    # shellcheck disable=SC2086
    "${ANALYZE_BIN}" "${LOCAL_CAPTURE}" --report-dir "${LOCAL_REPORT_DIR}" --event-width-codes "${PROBE_EVENT_WIDTH_CODES}" ${analysis_window_arg}
  )
fi
# Derive/apply calibration outputs, enforce the verdict, then list every
# artifact that this run produced.
write_output_delay_correlation
write_output_delay_calibration
maybe_apply_output_delay_calibration
maybe_run_output_delay_confirmation
enforce_sync_verdict
if [[ "${capture_v4l2_fault}" -eq 1 ]]; then
  echo "warning: Tethys video capture reported VIDIOC_QBUF / Bad file descriptor; treat unstable skew or analyzer failures as host-capture suspect" >&2
fi
echo "==> done"
echo "artifact_dir: ${LOCAL_REPORT_DIR}"
# Print "<label>: <path>" only for artifacts that actually exist on disk.
report_artifact() {
  local label=$1 path=$2
  if [[ -f "${path}" ]]; then
    echo "${label}: ${path}"
  fi
}
report_artifact capture "${LOCAL_CAPTURE}"
report_artifact report_json "${LOCAL_ANALYSIS_JSON}"
report_artifact report_txt "${LOCAL_REPORT_TXT}"
report_artifact events_csv "${LOCAL_EVENTS_CSV}"
report_artifact server_timeline_json "${LOCAL_SERVER_TIMELINE_JSON}"
report_artifact clock_alignment_json "${LOCAL_CLOCK_ALIGNMENT_JSON}"
report_artifact output_delay_correlation_json "${LOCAL_OUTPUT_DELAY_CORRELATION_JSON}"
report_artifact output_delay_correlation_csv "${LOCAL_OUTPUT_DELAY_CORRELATION_CSV}"
report_artifact output_delay_correlation_txt "${LOCAL_OUTPUT_DELAY_CORRELATION_TXT}"
report_artifact output_delay_calibration_json "${LOCAL_OUTPUT_DELAY_JSON}"
report_artifact output_delay_calibration_env "${LOCAL_OUTPUT_DELAY_ENV}"
if [[ -f "${LOCAL_CAPTURE_LOG}" ]]; then
echo "capture_log: ${LOCAL_CAPTURE_LOG}"
fi