#!/usr/bin/env python3 """Serve a local A/V stimulus page for the mirrored upstream sync probe.""" from __future__ import annotations import argparse import http.server import json import socketserver import threading import time from pathlib import Path DEFAULT_EVENT_WIDTH_CODES = "1,2,1,3,2,4,1,1,3,1,4,2,1,2,3,4,1,3,2,2,4,1,2,4,3,1,1,4,2,3,1,2" def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Serve Lesavka local A/V sync stimulus") parser.add_argument("--host", default="127.0.0.1") parser.add_argument("--port", type=int, default=18444) parser.add_argument("--status", default="/tmp/lesavka-local-av-stimulus-status.json") parser.add_argument("--duration-seconds", type=int, default=20) parser.add_argument("--warmup-seconds", type=int, default=4) parser.add_argument("--pulse-period-ms", type=int, default=1000) parser.add_argument("--pulse-width-ms", type=int, default=120) parser.add_argument("--marker-tick-period", type=int, default=5) parser.add_argument("--event-width-codes", default=DEFAULT_EVENT_WIDTH_CODES) args = parser.parse_args() args.event_width_codes = parse_event_width_codes(args.event_width_codes) return args def parse_event_width_codes(raw: str) -> list[int]: codes = [int(part.strip()) for part in raw.split(",") if part.strip()] if not codes: raise SystemExit("--event-width-codes must contain at least one integer") if any(code < 1 for code in codes): raise SystemExit("--event-width-codes values must be positive") return codes class StimulusState: def __init__(self, status_path: Path, args: argparse.Namespace) -> None: self.status_path = status_path self.args = args self.lock = threading.RLock() self.start_token = 0 self.status = { "booted_at": time.time(), "ready": False, "started": False, "completed": False, "last_error": None, "page_message": "booting", "last_update": time.time(), } self.write_status() def write_status(self) -> None: self.status_path.parent.mkdir(parents=True, exist_ok=True) tmp = self.status_path.with_suffix(".tmp") tmp.write_text(json.dumps(self.status, indent=2, sort_keys=True), encoding="utf-8") tmp.replace(self.status_path) def update(self, payload: dict) -> None: with self.lock: self.status.update(payload) self.status["last_update"] = time.time() self.write_status() def snapshot(self) -> dict: with self.lock: snap = dict(self.status) snap.update({ "start_token": self.start_token, "duration_seconds": self.args.duration_seconds, "warmup_seconds": self.args.warmup_seconds, "pulse_period_ms": self.args.pulse_period_ms, "pulse_width_ms": self.args.pulse_width_ms, "marker_tick_period": self.args.marker_tick_period, "event_width_codes": self.args.event_width_codes, }) return snap def request_start(self) -> dict: with self.lock: self.start_token += 1 self.status.update({ "started": False, "completed": False, "last_error": None, "page_message": "start requested", "start_requested_at": time.time(), }) self.write_status() return self.snapshot() def page_html() -> str: return """