lesavka/scripts/manual/run_upstream_mirrored_av_sync.sh

811 lines
34 KiB
Bash
Raw Normal View History

#!/usr/bin/env bash
# scripts/manual/run_upstream_mirrored_av_sync.sh
# Manual: full mirrored upstream A/V sync probe.
#
# This probe intentionally uses the normal lesavka-client capture path as the
# sender. A local browser stimulus is captured by the real webcam and real mic,
# Lesavka relays those live captures to Theia, and a Tethys browser records the
# Lesavka UVC/UAC devices via getUserMedia before analysis.
# Strict mode: abort on command failure, unset variables, and failed pipeline stages.
set -euo pipefail

# Directory holding this script, and the repository root two levels above it.
SCRIPT_DIR="$(cd -- "$(dirname "${BASH_SOURCE[0]}")" >/dev/null 2>&1 && pwd)"
REPO_ROOT="$(cd -- "${SCRIPT_DIR}/../.." >/dev/null 2>&1 && pwd)"

# --- Server connection; every value below may be overridden from the environment.
# "auto" makes start_server_tunnel_if_needed open an SSH tunnel to LESAVKA_SERVER_HOST.
LESAVKA_SERVER_ADDR=${LESAVKA_SERVER_ADDR:-auto}
LESAVKA_SERVER_HOST=${LESAVKA_SERVER_HOST:-theia}
LESAVKA_SERVER_SCHEME=${LESAVKA_SERVER_SCHEME:-https}
LESAVKA_SERVER_PORT=${LESAVKA_SERVER_PORT:-50051}
LESAVKA_TLS_DOMAIN=${LESAVKA_TLS_DOMAIN:-lesavka-server}

# --- Stimulus parameters forwarded to local_av_stimulus.py.
PROBE_DURATION_SECONDS=${PROBE_DURATION_SECONDS:-20}
PROBE_WARMUP_SECONDS=${PROBE_WARMUP_SECONDS:-4}
PROBE_PULSE_PERIOD_MS=${PROBE_PULSE_PERIOD_MS:-1000}
PROBE_PULSE_WIDTH_MS=${PROBE_PULSE_WIDTH_MS:-120}
PROBE_MARKER_TICK_PERIOD=${PROBE_MARKER_TICK_PERIOD:-5}
PROBE_EVENT_WIDTH_CODES=${PROBE_EVENT_WIDTH_CODES:-1,2,1,3,2,4,1,1,3,1,4,2,1,2,3,4,1,3,2,2,4,1,2,4,3,1,1,4,2,3,1,2}

# --- Calibration behaviour.
# Record whether the caller set the segment count explicitly ("x") or not ("").
# This must happen before the default below is applied, because adaptive mode
# later raises the default only when the caller did not choose a value.
LESAVKA_SYNC_CALIBRATION_SEGMENTS_SET=${LESAVKA_SYNC_CALIBRATION_SEGMENTS+x}
LESAVKA_SYNC_ADAPTIVE_CALIBRATION=${LESAVKA_SYNC_ADAPTIVE_CALIBRATION:-0}
LESAVKA_SYNC_APPLY_CALIBRATION=${LESAVKA_SYNC_APPLY_CALIBRATION:-0}
LESAVKA_SYNC_SAVE_CALIBRATION=${LESAVKA_SYNC_SAVE_CALIBRATION:-0}
LESAVKA_SYNC_CALIBRATION_TARGET=${LESAVKA_SYNC_CALIBRATION_TARGET:-video}
LESAVKA_SYNC_CALIBRATION_SEGMENTS=${LESAVKA_SYNC_CALIBRATION_SEGMENTS:-1}
# The next three switches follow the adaptive-calibration switch unless set explicitly.
LESAVKA_SYNC_CONTINUOUS_BROWSER=${LESAVKA_SYNC_CONTINUOUS_BROWSER:-${LESAVKA_SYNC_ADAPTIVE_CALIBRATION}}
LESAVKA_SYNC_CONTINUE_ON_ANALYSIS_FAILURE=${LESAVKA_SYNC_CONTINUE_ON_ANALYSIS_FAILURE:-${LESAVKA_SYNC_ADAPTIVE_CALIBRATION}}
LESAVKA_SYNC_SEGMENT_SETTLE_SECONDS=${LESAVKA_SYNC_SEGMENT_SETTLE_SECONDS:-3}
LESAVKA_SYNC_PROVISIONAL_CALIBRATION=${LESAVKA_SYNC_PROVISIONAL_CALIBRATION:-${LESAVKA_SYNC_ADAPTIVE_CALIBRATION}}
LESAVKA_SYNC_PROVISIONAL_MIN_PAIRS=${LESAVKA_SYNC_PROVISIONAL_MIN_PAIRS:-3}
LESAVKA_SYNC_PROVISIONAL_MAX_P95_MS=${LESAVKA_SYNC_PROVISIONAL_MAX_P95_MS:-350}
LESAVKA_SYNC_PROVISIONAL_MAX_DRIFT_MS=${LESAVKA_SYNC_PROVISIONAL_MAX_DRIFT_MS:-250}
LESAVKA_SYNC_PROVISIONAL_GAIN=${LESAVKA_SYNC_PROVISIONAL_GAIN:-0.5}
LESAVKA_SYNC_PROVISIONAL_MAX_STEP_US=${LESAVKA_SYNC_PROVISIONAL_MAX_STEP_US:-150000}

# --- Local stimulus, artifacts, and tooling.
STIMULUS_PORT=${STIMULUS_PORT:-18444}
STIMULUS_SETTLE_SECONDS=${STIMULUS_SETTLE_SECONDS:-10}
LOCAL_OUTPUT_DIR=${LOCAL_OUTPUT_DIR:-"${REPO_ROOT}/tmp"}
# SSH_OPTS is a whitespace-separated option string; it is word-split when ssh is invoked.
SSH_OPTS=${SSH_OPTS:-"-o BatchMode=yes -o ConnectTimeout=5"}
LOCAL_BROWSER=${LOCAL_BROWSER:-firefox}
# Create the output root, then a per-run artifact directory named by start time.
mkdir -p "${LOCAL_OUTPUT_DIR}"
STAMP="$(date +%Y%m%d-%H%M%S)"
ARTIFACT_DIR="${LOCAL_OUTPUT_DIR%/}/lesavka-mirrored-av-sync-${STAMP}"
mkdir -p "${ARTIFACT_DIR}"

# Files and directories written underneath the artifact directory.
STIMULUS_STATUS="${ARTIFACT_DIR}/stimulus-status.json"
STIMULUS_PROFILE="${ARTIFACT_DIR}/stimulus-firefox-profile"
CLIENT_LOG="${ARTIFACT_DIR}/lesavka-client.log"
MEDIA_CONTROL="${ARTIFACT_DIR}/media.control"

# Filled in later by the start_* functions; empty means "not started".
# cleanup() only signals PIDs that are non-empty.
RESOLVED_LESAVKA_SERVER_ADDR=""
SERVER_TUNNEL_PID=""
STIMULUS_PID=""
STIMULUS_BROWSER_PID=""
CLIENT_PID=""
# Adaptive calibration defaults to four segments, but only when the caller did
# not pick a segment count explicitly.
if [[ -z "${LESAVKA_SYNC_CALIBRATION_SEGMENTS_SET}" && "${LESAVKA_SYNC_ADAPTIVE_CALIBRATION}" == "1" ]]; then
  LESAVKA_SYNC_CALIBRATION_SEGMENTS=4
fi

# Accept only positive decimal integers without a leading zero.
case "${LESAVKA_SYNC_CALIBRATION_SEGMENTS}" in
  '' | 0* | *[!0-9]*)
    echo "LESAVKA_SYNC_CALIBRATION_SEGMENTS must be a positive integer" >&2
    exit 2
    ;;
esac
# EXIT handler: signal every helper process this script started.
# Globals: CLIENT_PID, STIMULUS_BROWSER_PID, STIMULUS_PID, SERVER_TUNNEL_PID (read)
cleanup() {
  # A failed kill (process already gone) must not stop the remaining kills.
  set +e
  local child_pid
  for child_pid in \
    "${CLIENT_PID}" \
    "${STIMULUS_BROWSER_PID}" \
    "${STIMULUS_PID}" \
    "${SERVER_TUNNEL_PID}"; do
    [[ -n "${child_pid}" ]] && kill "${child_pid}" >/dev/null 2>&1
  done
}
trap cleanup EXIT
# Print a TCP port number that was free on 127.0.0.1 at the time of the call.
# Binding to port 0 lets the OS choose; the socket is closed again before
# returning, so the caller should start using the port promptly.
# Outputs: the port number on stdout.
pick_local_port() {
  python3 - <<'PY'
import socket

with socket.socket() as s:
    s.bind(('127.0.0.1', 0))
    print(s.getsockname()[1])
PY
}
# Poll a URL with curl until it answers successfully or the timeout passes.
# Arguments: $1 - URL to fetch
#            $2 - timeout in whole seconds
# Returns:   0 once the URL responds; 1 (with a message on stderr) on timeout
wait_for_url() {
  local url=$1
  local timeout_seconds=$2
  local give_up_at=$(( $(date +%s) + timeout_seconds ))

  while ! curl -fsS "${url}" >/dev/null 2>&1; do
    if (( $(date +%s) >= give_up_at )); then
      echo "Timed out waiting for ${url}" >&2
      return 1
    fi
    sleep 0.2
  done
}
# Wait until a TCP connection to host:port succeeds.
# Arguments: $1 - host
#            $2 - port
#            $3 - timeout in seconds (fractions allowed)
# Returns:   0 as soon as a connection is accepted; 1 on timeout, with the
#            last connection error reported on stderr
wait_for_tcp() {
  local host=$1
  local port=$2
  local timeout_seconds=$3
  python3 - "$host" "$port" "$timeout_seconds" <<'PY'
import socket
import sys
import time

host = sys.argv[1]
port = int(sys.argv[2])
deadline = time.monotonic() + float(sys.argv[3])
last_error = None
while time.monotonic() < deadline:
    try:
        with socket.create_connection((host, port), timeout=0.5):
            sys.exit(0)
    except OSError as exc:
        last_error = exc
        time.sleep(0.2)
print(f"Timed out waiting for TCP {host}:{port}: {last_error}", file=sys.stderr)
sys.exit(1)
PY
}
# Wait for the local stimulus page to report that it is ready.
# Phase 1 waits for the /status endpoint to answer at all; phase 2 re-polls it
# until the JSON has a truthy "ready" and a "page_message" other than "booting".
# Both phases share one deadline.
# Globals:   STIMULUS_PORT, ARTIFACT_DIR (read)
# Arguments: $1 - timeout in whole seconds
# Returns:   0 when ready; 1 on timeout, with diagnostics on stderr
wait_for_stimulus_page_ready() {
  local timeout_seconds=$1
  local status_url="http://127.0.0.1:${STIMULUS_PORT}/status"
  local give_up_at=$(( $(date +%s) + timeout_seconds ))
  local payload=""

  # Phase 1: the status endpoint must respond.
  while ! payload="$(curl -fsS "${status_url}" 2>/dev/null)"; do
    if (( $(date +%s) >= give_up_at )); then
      echo "Timed out waiting for local stimulus status endpoint" >&2
      return 1
    fi
    sleep 0.2
  done

  # Phase 2: the page itself must report ready. An empty or malformed payload
  # makes the Python check fail, which is treated as "not ready yet".
  until STATUS_JSON="${payload}" python3 - <<'PY'
import json
import os
import sys
status = json.loads(os.environ["STATUS_JSON"])
sys.exit(0 if status.get("ready") and status.get("page_message") != "booting" else 1)
PY
  do
    if (( $(date +%s) >= give_up_at )); then
      echo "local stimulus page did not become ready before timeout" >&2
      echo "last stimulus status: ${payload}" >&2
      echo "stimulus server log: ${ARTIFACT_DIR}/stimulus-server.log" >&2
      echo "stimulus browser log: ${ARTIFACT_DIR}/stimulus-browser.log" >&2
      return 1
    fi
    sleep 0.5
    payload="$(curl -fsS "${status_url}" 2>/dev/null || true)"
  done
  return 0
}
# Resolve the Lesavka server address for this run.
# An explicit LESAVKA_SERVER_ADDR is used as given. With "auto", an SSH local
# port forward to LESAVKA_SERVER_HOST is opened in the background and the
# resolved address points at the forwarded local port.
# Globals: LESAVKA_SERVER_ADDR, LESAVKA_SERVER_HOST, LESAVKA_SERVER_PORT,
#          LESAVKA_SERVER_SCHEME, SSH_OPTS (read);
#          RESOLVED_LESAVKA_SERVER_ADDR, SERVER_TUNNEL_PID (written)
start_server_tunnel_if_needed() {
  # Explicit address: no tunnel required.
  if [[ "${LESAVKA_SERVER_ADDR}" != "auto" ]]; then
    RESOLVED_LESAVKA_SERVER_ADDR="${LESAVKA_SERVER_ADDR}"
    return
  fi

  local port
  port="$(pick_local_port)"
  local remote_target="${LESAVKA_SERVER_HOST}:127.0.0.1:${LESAVKA_SERVER_PORT}"
  local forward_spec="127.0.0.1:${port}:127.0.0.1:${LESAVKA_SERVER_PORT}"

  echo "==> opening SSH tunnel to ${remote_target} on localhost:${port}"
  # SSH_OPTS is intentionally unquoted so it splits into separate ssh options.
  # shellcheck disable=SC2086
  ssh ${SSH_OPTS} -N \
    -o ExitOnForwardFailure=yes \
    -L "${forward_spec}" \
    "${LESAVKA_SERVER_HOST}" >/tmp/lesavka-mirrored-sync-tunnel.log 2>&1 &
  SERVER_TUNNEL_PID=$!

  # Do not hand out the address until the forwarded port accepts connections.
  wait_for_tcp "127.0.0.1" "${port}" 5
  RESOLVED_LESAVKA_SERVER_ADDR="${LESAVKA_SERVER_SCHEME}://127.0.0.1:${port}"
  echo "==> resolved Lesavka server addr: ${RESOLVED_LESAVKA_SERVER_ADDR}"
  echo " ↪ tunneled to ${remote_target}"
}
# Print the client and server versions under test and refuse to continue when
# the probe could not be attributed to exact builds.
# Runs `lesavka-relayctl version` against the resolved server address, building
# the tool first if its debug binary is missing.
# Globals: REPO_ROOT, LESAVKA_TLS_DOMAIN, RESOLVED_LESAVKA_SERVER_ADDR (read)
# Outputs: a heading plus every non-empty line of the version report on stdout;
#          diagnostics and the raw report on stderr when a check fails
# Returns: 0 when client_version, client_revision, server_version and
#          server_revision are all reported and client_full_version is absent;
#          1 otherwise
print_lesavka_versions() {
  echo "==> Lesavka versions under test"
  local relayctl="${REPO_ROOT}/target/debug/lesavka-relayctl"
  if [[ ! -x "${relayctl}" ]]; then
    (cd "${REPO_ROOT}" && cargo build -p lesavka_client --bin lesavka-relayctl >/dev/null)
  fi

  local version_output
  if ! version_output="$(
    LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \
      "${relayctl}" \
      --server "${RESOLVED_LESAVKA_SERVER_ADDR}" \
      version 2>&1
  )"; then
    echo "failed to query Lesavka versions through ${RESOLVED_LESAVKA_SERVER_ADDR}" >&2
    echo "${version_output}" >&2
    return 1
  fi

  if ! grep -q "^client_version=" <<<"${version_output}"; then
    echo "Lesavka version query did not report client_version=; refusing to run an unattributed probe" >&2
    echo "${version_output}" >&2
    return 1
  fi

  # A combined version+revision field is rejected: version and revision must
  # be reported as separate keys.
  if grep -q "^client_full_version=" <<<"${version_output}"; then
    echo "Lesavka version query reported a combined version+revision; refusing ambiguous probe attribution" >&2
    echo "${version_output}" >&2
    return 1
  fi

  local required_key
  for required_key in client_revision server_version server_revision; do
    if ! grep -q "^${required_key}=" <<<"${version_output}"; then
      echo "Lesavka version query did not report ${required_key}=; refusing to run an unattributed probe" >&2
      echo "${version_output}" >&2
      return 1
    fi
  done

  # Echo the report without blank lines. An explicit `if` keeps the function's
  # exit status at 0 regardless of what the last line contains.
  local line
  while IFS= read -r line; do
    if [[ -n "${line}" ]]; then
      echo "${line}"
    fi
  done <<<"${version_output}"
}
# Best-effort dump of the upstream sync planner state.
# Runs `lesavka-relayctl upstream-sync`, building the tool first if its debug
# binary is missing. A failed query is reported but never fails the caller.
# Globals:   REPO_ROOT, LESAVKA_TLS_DOMAIN, RESOLVED_LESAVKA_SERVER_ADDR (read)
# Arguments: $1 - label shown in the heading
#            $2 - optional file that receives the raw query output
# Outputs:   heading plus the non-empty output lines on stdout
# Returns:   always 0
print_upstream_sync_state() {
  local label="$1"
  local output_path="${2:-}"
  echo "==> upstream sync planner state (${label})"
  local relayctl="${REPO_ROOT}/target/debug/lesavka-relayctl"
  if [[ ! -x "${relayctl}" ]]; then
    (cd "${REPO_ROOT}" && cargo build -p lesavka_client --bin lesavka-relayctl >/dev/null)
  fi

  local sync_output
  local query_ok=1
  if ! sync_output="$(
    LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \
      "${relayctl}" \
      --server "${RESOLVED_LESAVKA_SERVER_ADDR}" \
      upstream-sync 2>&1
  )"; then
    query_ok=0
    echo " ↪ planner query failed: ${sync_output}"
  fi

  # The raw output is kept for later analysis whether or not the query succeeded.
  if [[ -n "${output_path}" ]]; then
    printf '%s\n' "${sync_output}" >"${output_path}"
  fi
  if (( ! query_ok )); then
    return 0
  fi

  # Explicit `if` plus `return 0`: a trailing `[[ -n ... ]] && echo` would make
  # the function return 1 on empty output and abort the script under `set -e`.
  local line
  while IFS= read -r line; do
    if [[ -n "${line}" ]]; then
      echo "${line}"
    fi
  done <<<"${sync_output}"
  return 0
}
# Best-effort dump of the upstream calibration state.
# Runs `lesavka-relayctl calibration`, building the tool first if its debug
# binary is missing. A failed query is reported but never fails the caller.
# Globals:   REPO_ROOT, LESAVKA_TLS_DOMAIN, RESOLVED_LESAVKA_SERVER_ADDR (read)
# Arguments: $1 - label shown in the heading
#            $2 - optional file that receives the raw query output
# Outputs:   heading plus the non-empty output lines on stdout
# Returns:   always 0
print_upstream_calibration_state() {
  local label="$1"
  local output_path="${2:-}"
  echo "==> upstream calibration state (${label})"
  local relayctl="${REPO_ROOT}/target/debug/lesavka-relayctl"
  if [[ ! -x "${relayctl}" ]]; then
    (cd "${REPO_ROOT}" && cargo build -p lesavka_client --bin lesavka-relayctl >/dev/null)
  fi

  local calibration_output
  local query_ok=1
  if ! calibration_output="$(
    LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \
      "${relayctl}" \
      --server "${RESOLVED_LESAVKA_SERVER_ADDR}" \
      calibration 2>&1
  )"; then
    query_ok=0
    echo " ↪ calibration query failed: ${calibration_output}"
  fi

  # The raw output is kept for later analysis whether or not the query succeeded.
  if [[ -n "${output_path}" ]]; then
    printf '%s\n' "${calibration_output}" >"${output_path}"
  fi
  if (( ! query_ok )); then
    return 0
  fi

  # Explicit `if` plus `return 0`: a trailing `[[ -n ... ]] && echo` would make
  # the function return 1 on empty output and abort the script under `set -e`.
  local line
  while IFS= read -r line; do
    if [[ -n "${line}" ]]; then
      echo "${line}"
    fi
  done <<<"${calibration_output}"
  return 0
}
# Print the most recently modified report.json located exactly one directory
# below the given root (root/<run>/report.json). Prints nothing if none exists.
# Globals:   ARTIFACT_DIR (read, default root)
# Arguments: $1 - optional search root
latest_report_json() {
  local search_root="${1:-${ARTIFACT_DIR}}"
  # Prefix each hit with its mtime, sort numerically, keep the newest, and
  # strip the mtime column again.
  find "${search_root}" \
    -mindepth 2 -maxdepth 2 \
    -type f -name report.json \
    -printf '%T@ %p\n' 2>/dev/null \
    | sort -n \
    | tail -n 1 \
    | cut -d' ' -f2-
}
# Decide whether the newest analyzer report justifies a calibration change and,
# when enabled, apply it through lesavka-relayctl.
#
# The embedded Python reads the report and produces a "decision" as shell
# assignments. The mode is "ready" (the analyzer marked the report
# calibration-ready), "provisional" (a bounded correction derived from the
# median skew), or "refused". The decision is written to
# <report_root>/calibration-decision.env and echoed.
#
# Globals:   ARTIFACT_DIR, STAMP, REPO_ROOT, LESAVKA_TLS_DOMAIN,
#            RESOLVED_LESAVKA_SERVER_ADDR, LESAVKA_SYNC_CALIBRATION_TARGET,
#            LESAVKA_SYNC_APPLY_CALIBRATION, LESAVKA_SYNC_SAVE_CALIBRATION,
#            LESAVKA_SYNC_PROVISIONAL_* (read). The decision fields are
#            eval'd into shell variables of the same names.
# Arguments: $1 - directory searched for the report (default: ARTIFACT_DIR)
#            $2 - label used in log lines and in the calibration note
# Returns:   0 when the decision was skipped, refused, or applied; the status
#            of the relayctl pipeline if applying fails
maybe_apply_probe_calibration() {
  local report_root="${1:-${ARTIFACT_DIR}}"
  local label="${2:-mirrored run}"
  local report_json
  report_json="$(latest_report_json "${report_root}")"
  echo "==> probe calibration decision (${label})"
  if [[ -z "${report_json}" || ! -f "${report_json}" ]]; then
    echo " ↪ report_json=missing"
    echo " ↪ calibration apply skipped: analyzer report was not produced"
    return 0
  fi

  # The provisional settings are plain shell variables, so they are passed to
  # Python explicitly. Without this, os.environ only sees values the caller
  # exported, and the script's own defaults would be ignored.
  local summary
  if ! summary="$(
    LESAVKA_SYNC_PROVISIONAL_CALIBRATION="${LESAVKA_SYNC_PROVISIONAL_CALIBRATION:-${LESAVKA_SYNC_ADAPTIVE_CALIBRATION:-0}}" \
      LESAVKA_SYNC_PROVISIONAL_MIN_PAIRS="${LESAVKA_SYNC_PROVISIONAL_MIN_PAIRS:-3}" \
      LESAVKA_SYNC_PROVISIONAL_MAX_P95_MS="${LESAVKA_SYNC_PROVISIONAL_MAX_P95_MS:-350}" \
      LESAVKA_SYNC_PROVISIONAL_MAX_DRIFT_MS="${LESAVKA_SYNC_PROVISIONAL_MAX_DRIFT_MS:-250}" \
      LESAVKA_SYNC_PROVISIONAL_GAIN="${LESAVKA_SYNC_PROVISIONAL_GAIN:-0.5}" \
      LESAVKA_SYNC_PROVISIONAL_MAX_STEP_US="${LESAVKA_SYNC_PROVISIONAL_MAX_STEP_US:-150000}" \
      python3 - "${report_json}" "${LESAVKA_SYNC_CALIBRATION_TARGET}" <<'PY'
import json
import math
import os
import shlex
import sys

report_path = sys.argv[1]
target = sys.argv[2].strip().lower()
with open(report_path, "r", encoding="utf-8") as handle:
    report = json.load(handle)
cal = report.get("calibration", {})
verdict = report.get("verdict", {})
# Any unknown target falls back to video.
if target not in {"audio", "video"}:
    target = "video"


def env_bool(name, default=False):
    raw = os.environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}


def env_int(name, default):
    try:
        return int(os.environ.get(name, str(default)))
    except ValueError:
        return default


def env_float(name, default):
    try:
        return float(os.environ.get(name, str(default)))
    except ValueError:
        return default


def as_float(value, default=0.0):
    try:
        parsed = float(value)
    except (TypeError, ValueError):
        return default
    return parsed if math.isfinite(parsed) else default


def clamp(value, limit):
    limit = abs(int(limit))
    return max(-limit, min(limit, int(round(value))))


ready = bool(cal.get("ready"))
paired_pulses = int(report.get("paired_event_count", 0) or 0)
median_skew_ms = as_float(report.get("median_skew_ms", 0.0))
p95_abs_skew_ms = as_float(verdict.get("p95_abs_skew_ms", 0.0))
drift_ms = as_float(report.get("drift_ms", 0.0))
provisional_enabled = env_bool("LESAVKA_SYNC_PROVISIONAL_CALIBRATION", False)
provisional_min_pairs = env_int("LESAVKA_SYNC_PROVISIONAL_MIN_PAIRS", 3)
provisional_max_p95_ms = env_float("LESAVKA_SYNC_PROVISIONAL_MAX_P95_MS", 350.0)
provisional_max_drift_ms = env_float("LESAVKA_SYNC_PROVISIONAL_MAX_DRIFT_MS", 250.0)
provisional_gain = env_float("LESAVKA_SYNC_PROVISIONAL_GAIN", 0.5)
provisional_max_step_us = env_int("LESAVKA_SYNC_PROVISIONAL_MAX_STEP_US", 150000)
ready_audio_recommendation = int(cal.get("recommended_audio_offset_adjust_us") or 0)
ready_video_recommendation = int(cal.get("recommended_video_offset_adjust_us") or 0)
provisional_audio_recommendation = int(round(-median_skew_ms * 1000.0))
provisional_video_recommendation = int(round(median_skew_ms * 1000.0))
audio_recommendation = ready_audio_recommendation
video_recommendation = ready_video_recommendation
# Only the selected target receives a correction.
audio_delta = audio_recommendation if target == "audio" else 0
video_delta = video_recommendation if target == "video" else 0
decision_mode = "ready" if ready else "refused"
decision_note = "analyzer marked this report calibration-ready" if ready else "analyzer did not mark this report calibration-ready"
if not ready and provisional_enabled:
    refusal_reasons = []
    if paired_pulses < provisional_min_pairs:
        refusal_reasons.append(f"paired_pulses {paired_pulses} < {provisional_min_pairs}")
    if p95_abs_skew_ms > provisional_max_p95_ms:
        refusal_reasons.append(f"p95_abs_skew_ms {p95_abs_skew_ms:.1f} > {provisional_max_p95_ms:.1f}")
    if abs(drift_ms) > provisional_max_drift_ms:
        refusal_reasons.append(f"abs(drift_ms) {abs(drift_ms):.1f} > {provisional_max_drift_ms:.1f}")
    if refusal_reasons:
        decision_note = "provisional calibration refused: " + "; ".join(refusal_reasons)
    else:
        audio_recommendation = provisional_audio_recommendation
        video_recommendation = provisional_video_recommendation
        # Scale by the gain and bound the step size.
        if target == "audio":
            audio_delta = clamp(audio_recommendation * provisional_gain, provisional_max_step_us)
            video_delta = 0
        else:
            audio_delta = 0
            video_delta = clamp(video_recommendation * provisional_gain, provisional_max_step_us)
        if audio_delta == 0 and video_delta == 0:
            decision_note = "provisional calibration skipped: rounded correction was zero"
        else:
            decision_mode = "provisional"
            decision_note = (
                "bounded provisional correction from median skew; "
                "not safe to save until a confirming ready report"
            )
fields = {
    "report_json": report_path,
    "calibration_ready": str(ready).lower(),
    "calibration_target": target,
    "calibration_decision_mode": decision_mode,
    "calibration_decision_note": decision_note,
    "calibration_audio_recommendation_us": audio_recommendation,
    "calibration_video_recommendation_us": video_recommendation,
    "calibration_ready_audio_recommendation_us": ready_audio_recommendation,
    "calibration_ready_video_recommendation_us": ready_video_recommendation,
    "calibration_provisional_audio_recommendation_us": provisional_audio_recommendation,
    "calibration_provisional_video_recommendation_us": provisional_video_recommendation,
    "calibration_apply_audio_delta_us": audio_delta,
    "calibration_apply_video_delta_us": video_delta,
    "calibration_note": cal.get("note", ""),
    "provisional_calibration_enabled": str(provisional_enabled).lower(),
    "provisional_min_pairs": provisional_min_pairs,
    "provisional_max_p95_ms": f"{provisional_max_p95_ms:.1f}",
    "provisional_max_drift_ms": f"{provisional_max_drift_ms:.1f}",
    "provisional_gain": f"{provisional_gain:.3f}",
    "provisional_max_step_us": provisional_max_step_us,
    "verdict_status": verdict.get("status", ""),
    "paired_pulses": paired_pulses,
    "median_skew_ms": f"{median_skew_ms:+.1f}",
    "p95_abs_skew_ms": f"{p95_abs_skew_ms:.1f}",
    "drift_ms": f"{drift_ms:+.1f}",
}
# shlex.quote makes every value safe for the shell `eval` that consumes this.
for key, value in fields.items():
    print(f"{key}={shlex.quote(str(value))}")
PY
  )"; then
    echo " ↪ failed to parse ${report_json}; calibration apply skipped" >&2
    return 0
  fi

  # Every value in the summary was shell-quoted by the Python above.
  eval "${summary}"
  printf '%s\n' "${summary}" >"${report_root}/calibration-decision.env"
  echo " ↪ report_json=${report_json}"
  echo " ↪ verdict_status=${verdict_status}"
  echo " ↪ paired_pulses=${paired_pulses}"
  echo " ↪ median_skew_ms=${median_skew_ms}"
  echo " ↪ p95_abs_skew_ms=${p95_abs_skew_ms}"
  echo " ↪ drift_ms=${drift_ms}"
  echo " ↪ calibration_ready=${calibration_ready}"
  echo " ↪ calibration_target=${calibration_target}"
  echo " ↪ calibration_decision_mode=${calibration_decision_mode}"
  echo " ↪ recommended_audio_offset_adjust_us=${calibration_audio_recommendation_us}"
  echo " ↪ recommended_video_offset_adjust_us=${calibration_video_recommendation_us}"
  echo " ↪ ready_audio_offset_adjust_us=${calibration_ready_audio_recommendation_us}"
  echo " ↪ ready_video_offset_adjust_us=${calibration_ready_video_recommendation_us}"
  echo " ↪ provisional_audio_offset_adjust_us=${calibration_provisional_audio_recommendation_us}"
  echo " ↪ provisional_video_offset_adjust_us=${calibration_provisional_video_recommendation_us}"
  echo " ↪ provisional_calibration_enabled=${provisional_calibration_enabled}"
  echo " ↪ provisional_min_pairs=${provisional_min_pairs}"
  echo " ↪ provisional_max_p95_ms=${provisional_max_p95_ms}"
  echo " ↪ provisional_max_drift_ms=${provisional_max_drift_ms}"
  echo " ↪ provisional_gain=${provisional_gain}"
  echo " ↪ provisional_max_step_us=${provisional_max_step_us}"
  echo " ↪ calibration_note=${calibration_note}"
  echo " ↪ calibration_decision_note=${calibration_decision_note}"

  if [[ "${LESAVKA_SYNC_APPLY_CALIBRATION}" != "1" ]]; then
    echo " ↪ calibration apply disabled; set LESAVKA_SYNC_APPLY_CALIBRATION=1 to apply ready or provisional recommendations"
    return 0
  fi
  if [[ "${calibration_decision_mode}" == "refused" ]]; then
    echo " ↪ calibration apply refused: ${calibration_decision_note}"
    return 0
  fi
  if [[ "${calibration_apply_audio_delta_us}" == "0" && "${calibration_apply_video_delta_us}" == "0" ]]; then
    echo " ↪ calibration apply skipped: recommended delta is already zero"
    return 0
  fi

  local note="mirrored probe ${STAMP} ${label}: mode=${calibration_decision_mode}, target=${calibration_target}, median=${median_skew_ms}ms, p95=${p95_abs_skew_ms}ms, pairs=${paired_pulses}"
  echo " ↪ applying calibration: audio_delta_us=${calibration_apply_audio_delta_us}, video_delta_us=${calibration_apply_video_delta_us}"
  LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \
    "${REPO_ROOT}/target/debug/lesavka-relayctl" \
    --server "${RESOLVED_LESAVKA_SERVER_ADDR}" \
    calibrate "${calibration_apply_audio_delta_us}" "${calibration_apply_video_delta_us}" "${note}" \
    | sed 's/^/ ↪ /'

  # Only a ready decision may be saved as the site default.
  if [[ "${LESAVKA_SYNC_SAVE_CALIBRATION}" == "1" && "${calibration_decision_mode}" == "ready" ]]; then
    echo " ↪ saving active calibration as site default"
    LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \
      "${REPO_ROOT}/target/debug/lesavka-relayctl" \
      --server "${RESOLVED_LESAVKA_SERVER_ADDR}" \
      calibration-save-default \
      | sed 's/^/ ↪ /'
  elif [[ "${LESAVKA_SYNC_SAVE_CALIBRATION}" == "1" ]]; then
    echo " ↪ provisional calibration not saved; require a confirming ready rerun before saving defaults"
  else
    echo " ↪ active calibration not saved; set LESAVKA_SYNC_SAVE_CALIBRATION=1 after a confirming rerun"
  fi
}
# Start the local A/V stimulus HTTP server and open it in a dedicated browser
# profile, then pause so the operator can aim the webcam and microphone.
# Globals: REPO_ROOT, ARTIFACT_DIR, STIMULUS_PORT, STIMULUS_STATUS,
#          STIMULUS_PROFILE, STIMULUS_SETTLE_SECONDS, LOCAL_BROWSER,
#          PROBE_* (read); STIMULUS_PID, STIMULUS_BROWSER_PID (written)
start_local_stimulus() {
  local stimulus_url="http://127.0.0.1:${STIMULUS_PORT}/"
  local -a stimulus_args=(
    --port "${STIMULUS_PORT}"
    --status "${STIMULUS_STATUS}"
    --duration-seconds "${PROBE_DURATION_SECONDS}"
    --warmup-seconds "${PROBE_WARMUP_SECONDS}"
    --pulse-period-ms "${PROBE_PULSE_PERIOD_MS}"
    --pulse-width-ms "${PROBE_PULSE_WIDTH_MS}"
    --marker-tick-period "${PROBE_MARKER_TICK_PERIOD}"
    --event-width-codes "${PROBE_EVENT_WIDTH_CODES}"
  )

  echo "==> starting local A/V stimulus server"
  python3 "${REPO_ROOT}/scripts/manual/local_av_stimulus.py" "${stimulus_args[@]}" \
    >"${ARTIFACT_DIR}/stimulus-server.log" 2>&1 &
  STIMULUS_PID=$!
  wait_for_url "${stimulus_url}status" 10

  # Throwaway browser profile: fixed preferences first, then a homepage entry
  # pointing at the stimulus server.
  mkdir -p "${STIMULUS_PROFILE}"
  {
    cat <<'PREFS'
user_pref("media.autoplay.default", 0);
user_pref("media.autoplay.blocking_policy", 0);
user_pref("toolkit.telemetry.reportingpolicy.firstRun", false);
user_pref("browser.shell.checkDefaultBrowser", false);
user_pref("browser.tabs.warnOnClose", false);
user_pref("browser.startup.page", 1);
user_pref("browser.aboutwelcome.enabled", false);
PREFS
    printf 'user_pref("browser.startup.homepage", "http://127.0.0.1:%s/");\n' "${STIMULUS_PORT}"
  } >"${STIMULUS_PROFILE}/user.js"

  echo "==> opening local stimulus browser"
  "${LOCAL_BROWSER}" --new-instance --no-remote --profile "${STIMULUS_PROFILE}" \
    "${stimulus_url}" \
    >"${ARTIFACT_DIR}/stimulus-browser.log" 2>&1 &
  STIMULUS_BROWSER_PID=$!
  wait_for_stimulus_page_ready 15

  echo "==> position check"
  echo " Point the real webcam at the stimulus window and keep the selected microphone hearing the tone."
  echo " Waiting ${STIMULUS_SETTLE_SECONDS}s before starting the mirrored capture."
  sleep "${STIMULUS_SETTLE_SECONDS}"
}
# Launch the real lesavka-client in headless mode as the upstream sender and
# check that it is still running a few seconds later.
# Globals: REPO_ROOT, MEDIA_CONTROL, CLIENT_LOG, LESAVKA_TLS_DOMAIN,
#          RESOLVED_LESAVKA_SERVER_ADDR (read); CLIENT_PID (written)
# Exits the script with status 1 if the client has already died.
start_real_lesavka_client() {
  # Media control file: camera and microphone on, audio off, plus a nanosecond stamp.
  local media_state
  media_state="camera=1 microphone=1 audio=0 $(date +%s%N)"
  printf '%s\n' "${media_state}" >"${MEDIA_CONTROL}"

  echo "==> starting real headless lesavka-client sender"
  (
    cd "${REPO_ROOT}"
    LESAVKA_HEADLESS=1 \
      LESAVKA_SERVER_ADDR="${RESOLVED_LESAVKA_SERVER_ADDR}" \
      LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \
      LESAVKA_MEDIA_CONTROL="${MEDIA_CONTROL}" \
      LESAVKA_REQUIRE_EXPLICIT_MEDIA_SOURCES="${LESAVKA_REQUIRE_EXPLICIT_MEDIA_SOURCES:-1}" \
      LESAVKA_UPSTREAM_TIMING_TRACE="${LESAVKA_UPSTREAM_TIMING_TRACE:-1}" \
      RUST_LOG="${RUST_LOG:-warn,lesavka_client::app=info,lesavka_client::input::camera=info,lesavka_client::input::microphone=info}" \
      "${REPO_ROOT}/target/debug/lesavka-client" --no-launcher --server "${RESOLVED_LESAVKA_SERVER_ADDR}"
  ) >"${CLIENT_LOG}" 2>&1 &
  CLIENT_PID=$!

  # Give the client a moment to start, then confirm it is still alive.
  sleep 4
  if kill -0 "${CLIENT_PID}" >/dev/null 2>&1; then
    return
  fi
  echo "lesavka-client exited before mirrored probe could start; see ${CLIENT_LOG}" >&2
  exit 1
}
# Run one capture segment through run_upstream_browser_av_sync.sh, with the
# local stimulus as the driver.
# The driver command POSTs to the stimulus /start endpoint and then sleeps for
# the probe duration plus 2 seconds; the browser records for the probe
# duration plus 3 seconds.
# Globals:   PROBE_DURATION_SECONDS, STIMULUS_PORT, PROBE_EVENT_WIDTH_CODES,
#            LESAVKA_SYNC_CONTINUOUS_BROWSER,
#            LESAVKA_SYNC_CONTINUE_ON_ANALYSIS_FAILURE,
#            RESOLVED_LESAVKA_SERVER_ADDR, REPO_ROOT (read)
# Arguments: $1 - human-readable segment label
#            $2 - output directory for this segment (created if missing)
#            $3 - 1-based segment index (default 1)
# Returns:   the exit status of run_upstream_browser_av_sync.sh
run_browser_capture_with_real_driver() {
  local segment_label="$1"
  local segment_output_dir="$2"
  local segment_index="${3:-1}"
  local record_seconds=$((PROBE_DURATION_SECONDS + 3))
  local wait_seconds=$((PROBE_DURATION_SECONDS + 2))
  local driver_command="curl -fsS -X POST http://127.0.0.1:${STIMULUS_PORT}/start >/dev/null; sleep ${wait_seconds}"
  local reuse_browser_session=0
  local analysis_required=1

  # In continuous-browser mode every segment after the first asks for session reuse.
  if [[ "${LESAVKA_SYNC_CONTINUOUS_BROWSER}" == "1" && "${segment_index}" != "1" ]]; then
    reuse_browser_session=1
  fi
  if [[ "${LESAVKA_SYNC_CONTINUE_ON_ANALYSIS_FAILURE}" == "1" ]]; then
    analysis_required=0
  fi

  mkdir -p "${segment_output_dir}"
  echo "==> starting Tethys browser consumer and mirrored driver (${segment_label})"
  echo " ↪ browser_consumer_reuse_session=${reuse_browser_session}"
  echo " ↪ browser_analysis_required=${analysis_required}"
  # All settings travel as one environment prefix on a single command; the
  # continuation lines must stay contiguous.
  BROWSER_RECORD_SECONDS="${record_seconds}" \
    PROBE_DURATION_SECONDS="${PROBE_DURATION_SECONDS}" \
    BROWSER_SYNC_DRIVER_COMMAND="${driver_command}" \
    BROWSER_CONSUMER_REUSE_SESSION="${reuse_browser_session}" \
    BROWSER_ANALYSIS_REQUIRED="${analysis_required}" \
    SYNC_ANALYZE_EVENT_WIDTH_CODES="${PROBE_EVENT_WIDTH_CODES}" \
    LOCAL_OUTPUT_DIR="${segment_output_dir}" \
    LESAVKA_SERVER_ADDR="${RESOLVED_LESAVKA_SERVER_ADDR}" \
    "${REPO_ROOT}/scripts/manual/run_upstream_browser_av_sync.sh"
}
# Run the configured number of calibration segments in order.
# Each segment records planner and calibration state before and after the
# capture. A successful capture is followed by the calibration decision; a
# failed capture records the "after failed" state and stops the loop.
# Globals: ARTIFACT_DIR, LESAVKA_SYNC_CALIBRATION_SEGMENTS,
#          LESAVKA_SYNC_SEGMENT_SETTLE_SECONDS (read)
# Returns: 0 when every segment's capture succeeded; otherwise the exit status
#          of the failing capture
run_mirrored_segments() {
  local run_status=0
  local segment_total="${LESAVKA_SYNC_CALIBRATION_SEGMENTS}"
  local segment segment_label segment_dir

  for ((segment = 1; segment <= segment_total; segment++)); do
    segment_label="segment ${segment}/${segment_total}"
    segment_dir="${ARTIFACT_DIR}/segment-${segment}"
    mkdir -p "${segment_dir}"
    echo "==> mirrored calibration ${segment_label}"
    print_upstream_calibration_state "before ${segment_label}" "${segment_dir}/calibration-before.env"
    print_upstream_sync_state "before ${segment_label}" "${segment_dir}/planner-before.env"

    if run_browser_capture_with_real_driver "${segment_label}" "${segment_dir}" "${segment}"; then
      maybe_apply_probe_calibration "${segment_dir}" "${segment_label}"
      print_upstream_sync_state "after ${segment_label}" "${segment_dir}/planner-after.env"
      print_upstream_calibration_state "after ${segment_label}" "${segment_dir}/calibration-after.env"
    else
      # In the else branch, $? still holds the status of the failed capture.
      run_status=$?
      print_upstream_sync_state "after failed ${segment_label}" "${segment_dir}/planner-after-failed.env"
      print_upstream_calibration_state "after failed ${segment_label}" "${segment_dir}/calibration-after-failed.env"
      break
    fi

    # Pause between segments, but not after the last one.
    if (( segment < segment_total )); then
      echo "==> settling ${LESAVKA_SYNC_SEGMENT_SETTLE_SECONDS}s before next calibration segment"
      sleep "${LESAVKA_SYNC_SEGMENT_SETTLE_SECONDS}"
    fi
  done
  return "${run_status}"
}
# Collect per-segment probe results into machine-readable summaries.
# For each segment directory the embedded Python reads the newest report.json
# or analysis-failure.json (one level down) and the planner/calibration/decision
# .env files, then writes under ARTIFACT_DIR:
#   segment-metrics.csv    one row per segment
#   segment-metrics.jsonl  the same rows as JSON lines
#   blind-targets.json     value ranges over segments whose probe passed, or a
#                          "ready": false record when no segment passed
# Globals: ARTIFACT_DIR, LESAVKA_SYNC_CALIBRATION_SEGMENTS (read)
# Outputs: the paths of the written files on stdout
summarize_adaptive_probe_metrics() {
  echo "==> summarizing segmented probe metrics"
  python3 - "${ARTIFACT_DIR}" "${LESAVKA_SYNC_CALIBRATION_SEGMENTS}" <<'PY'
import csv
import json
import os
import sys
from pathlib import Path

root = Path(sys.argv[1])
segment_count = int(sys.argv[2])


def read_env(path):
    """Parse KEY=VALUE lines; a missing file yields an empty dict."""
    values = {}
    if not path.exists():
        return values
    for raw in path.read_text(encoding="utf-8").splitlines():
        if not raw or "=" not in raw:
            continue
        key, value = raw.split("=", 1)
        values[key] = value
    return values


def latest_report(segment_dir):
    reports = list(segment_dir.glob("*/report.json"))
    if not reports:
        return None
    return max(reports, key=lambda path: path.stat().st_mtime)


def latest_analysis_failure(segment_dir):
    failures = list(segment_dir.glob("*/analysis-failure.json"))
    if not failures:
        return None
    return max(failures, key=lambda path: path.stat().st_mtime)


def as_float(value):
    """Return value as a float, or None when it is missing or not numeric."""
    if value is None or value in {"", "pending"}:
        return None
    try:
        return float(value)
    except ValueError:
        return None


def range_for(rows, key):
    """Min/max/mean of a numeric column, or None when no row has a number."""
    values = [row[key] for row in rows if isinstance(row.get(key), (int, float))]
    if not values:
        return None
    return {
        "min": round(min(values), 3),
        "max": round(max(values), 3),
        "mean": round(sum(values) / len(values), 3),
    }


rows = []
for segment in range(1, segment_count + 1):
    segment_dir = root / f"segment-{segment}"
    report_path = latest_report(segment_dir)
    report = {}
    verdict = {}
    calibration = {}
    if report_path is not None:
        report = json.loads(report_path.read_text(encoding="utf-8"))
        verdict = report.get("verdict", {})
        calibration = report.get("calibration", {})
    failure_path = latest_analysis_failure(segment_dir)
    failure = {}
    if failure_path is not None:
        failure = json.loads(failure_path.read_text(encoding="utf-8"))
    planner_before = read_env(segment_dir / "planner-before.env")
    planner_after = read_env(segment_dir / "planner-after.env")
    calibration_before = read_env(segment_dir / "calibration-before.env")
    calibration_after = read_env(segment_dir / "calibration-after.env")
    decision = read_env(segment_dir / "calibration-decision.env")
    row = {
        "segment": segment,
        "report_json": str(report_path) if report_path else "",
        "analysis_failure_json": str(failure_path) if failure_path else "",
        "analysis_failure_reason": failure.get("reason", ""),
        "probe_status": verdict.get("status", failure.get("status", "missing")),
        "probe_passed": bool(verdict.get("passed", False)),
        "probe_p95_abs_skew_ms": as_float(str(verdict.get("p95_abs_skew_ms", ""))),
        "probe_max_abs_skew_ms": as_float(str(verdict.get("max_abs_skew_ms", ""))),
        "probe_median_skew_ms": as_float(str(report.get("median_skew_ms", ""))),
        "probe_mean_skew_ms": as_float(str(report.get("mean_skew_ms", ""))),
        "probe_drift_ms": as_float(str(report.get("drift_ms", ""))),
        "probe_paired_pulses": report.get("paired_event_count", failure.get("paired_pulses", 0)),
        "probe_activity_start_delta_ms": as_float(str(report.get("activity_start_delta_ms", failure.get("raw_activity_delta_ms", "")))),
        "analysis_raw_first_video_activity_s": as_float(str(failure.get("raw_first_video_activity_s", ""))),
        "analysis_raw_first_audio_activity_s": as_float(str(failure.get("raw_first_audio_activity_s", ""))),
        "calibration_ready": bool(calibration.get("ready", False)),
        "calibration_note": calibration.get("note", ""),
        "decision_mode": decision.get("calibration_decision_mode", ""),
        "decision_note": decision.get("calibration_decision_note", ""),
        "decision_video_delta_us": as_float(decision.get("calibration_apply_video_delta_us")),
        "decision_audio_delta_us": as_float(decision.get("calibration_apply_audio_delta_us")),
        "decision_provisional_video_recommendation_us": as_float(decision.get("calibration_provisional_video_recommendation_us")),
        "decision_provisional_audio_recommendation_us": as_float(decision.get("calibration_provisional_audio_recommendation_us")),
        "planner_phase_before": planner_before.get("planner_phase", ""),
        "planner_phase_after": planner_after.get("planner_phase", ""),
        "planner_live_lag_ms_before": as_float(planner_before.get("planner_live_lag_ms")),
        "planner_live_lag_ms_after": as_float(planner_after.get("planner_live_lag_ms")),
        "planner_skew_ms_before": as_float(planner_before.get("planner_skew_ms")),
        "planner_skew_ms_after": as_float(planner_after.get("planner_skew_ms")),
        "planner_video_freezes_before": as_float(planner_before.get("planner_video_freezes")),
        "planner_video_freezes_after": as_float(planner_after.get("planner_video_freezes")),
        "planner_freshness_reanchors_before": as_float(planner_before.get("planner_freshness_reanchors")),
        "planner_freshness_reanchors_after": as_float(planner_after.get("planner_freshness_reanchors")),
        "active_audio_offset_us_before": as_float(calibration_before.get("calibration_active_audio_offset_us")),
        "active_audio_offset_us_after": as_float(calibration_after.get("calibration_active_audio_offset_us")),
        "active_video_offset_us_before": as_float(calibration_before.get("calibration_active_video_offset_us")),
        "active_video_offset_us_after": as_float(calibration_after.get("calibration_active_video_offset_us")),
    }
    rows.append(row)

csv_path = root / "segment-metrics.csv"
jsonl_path = root / "segment-metrics.jsonl"
fieldnames = list(rows[0].keys()) if rows else ["segment"]
with csv_path.open("w", newline="", encoding="utf-8") as handle:
    writer = csv.DictWriter(handle, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(rows)
with jsonl_path.open("w", encoding="utf-8") as handle:
    for row in rows:
        handle.write(json.dumps(row, sort_keys=True) + "\n")

# Blind targets are derived only from segments whose probe verdict passed.
good_rows = [row for row in rows if row.get("probe_passed")]
target_path = root / "blind-targets.json"
if good_rows:
    target = {
        "ready": True,
        "source": "probe-passing segmented mirrored run",
        "good_segments": [row["segment"] for row in good_rows],
        "planner_live_lag_ms_after": range_for(good_rows, "planner_live_lag_ms_after"),
        "planner_skew_ms_after": range_for(good_rows, "planner_skew_ms_after"),
        "active_audio_offset_us_after": range_for(good_rows, "active_audio_offset_us_after"),
        "active_video_offset_us_after": range_for(good_rows, "active_video_offset_us_after"),
        "probe_p95_abs_skew_ms": range_for(good_rows, "probe_p95_abs_skew_ms"),
        "probe_median_skew_ms": range_for(good_rows, "probe_median_skew_ms"),
    }
else:
    # No passing segment: report the one with the lowest p95 skew for
    # reference only and mark the targets as not ready.
    sortable = [
        row for row in rows
        if isinstance(row.get("probe_p95_abs_skew_ms"), (int, float))
    ]
    best = min(sortable, key=lambda row: row["probe_p95_abs_skew_ms"], default=None)
    target = {
        "ready": False,
        "reason": "no segment produced a passing probe verdict; refusing to invent blind targets",
        "segments_seen": len(rows),
        "best_segment": best["segment"] if best else None,
        "best_probe_status": best["probe_status"] if best else "missing",
        "best_probe_p95_abs_skew_ms": best["probe_p95_abs_skew_ms"] if best else None,
    }
target_path.write_text(json.dumps(target, indent=2, sort_keys=True) + "\n", encoding="utf-8")
print(f" ↪ segment_metrics_csv={csv_path}")
print(f" ↪ segment_metrics_jsonl={jsonl_path}")
print(f" ↪ blind_targets_json={target_path}")
print(f" ↪ blind_targets_ready={str(bool(target.get('ready'))).lower()}")
PY
}
# --- Main flow ---------------------------------------------------------------
# Build the binaries once up front so later steps do not pay for compilation.
echo "==> prebuilding real client and analyzer"
(
  cd "${REPO_ROOT}"
  cargo build -p lesavka_client --bin lesavka-client --bin lesavka-sync-analyze --bin lesavka-relayctl >/dev/null
)

# Resolve the server address, then refuse to run against unattributed builds.
start_server_tunnel_if_needed
print_lesavka_versions

# Record the state before the run, then start the stimulus and the real sender.
print_upstream_calibration_state "before mirrored run" "${ARTIFACT_DIR}/calibration-before.env"
print_upstream_sync_state "before mirrored run" "${ARTIFACT_DIR}/planner-before.env"
start_local_stimulus
start_real_lesavka_client

# Capture the segment status without letting `set -e` abort, so the final
# state and the metric summary are still written after a failed segment.
run_status=0
run_mirrored_segments || run_status=$?
print_upstream_sync_state "after mirrored run" "${ARTIFACT_DIR}/planner-after.env"
print_upstream_calibration_state "after mirrored run" "${ARTIFACT_DIR}/calibration-after.env"
summarize_adaptive_probe_metrics

if ((run_status != 0)); then
  echo "==> mirrored probe failed"
  echo "artifact_dir: ${ARTIFACT_DIR}"
  echo "client_log: ${CLIENT_LOG}"
  echo "stimulus_status: ${STIMULUS_STATUS}"
  exit "${run_status}"
fi
echo "==> mirrored probe complete"
echo "artifact_dir: ${ARTIFACT_DIR}"
echo "client_log: ${CLIENT_LOG}"
echo "stimulus_status: ${STIMULUS_STATUS}"