#!/usr/bin/env python3 """Serve a local A/V stimulus page for the mirrored upstream sync probe.""" from __future__ import annotations import argparse import http.server import json import socketserver import threading import time import urllib.parse from pathlib import Path DEFAULT_EVENT_WIDTH_CODES = "1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16" def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Serve Lesavka local A/V sync stimulus") parser.add_argument("--host", default="127.0.0.1") parser.add_argument("--port", type=int, default=18444) parser.add_argument("--status", default="/tmp/lesavka-local-av-stimulus-status.json") parser.add_argument("--duration-seconds", type=int, default=20) parser.add_argument("--warmup-seconds", type=int, default=4) parser.add_argument("--pulse-period-ms", type=int, default=1000) parser.add_argument("--pulse-width-ms", type=int, default=120) parser.add_argument("--marker-tick-period", type=int, default=5) parser.add_argument("--audio-gain", type=float, default=0.55) parser.add_argument("--event-width-codes", default=DEFAULT_EVENT_WIDTH_CODES) args = parser.parse_args() args.event_width_codes = parse_event_width_codes(args.event_width_codes) args.audio_gain = max(0.0, min(1.0, args.audio_gain)) return args def parse_event_width_codes(raw: str) -> list[int]: codes = [int(part.strip()) for part in raw.split(",") if part.strip()] if not codes: raise SystemExit("--event-width-codes must contain at least one integer") if any(code < 1 for code in codes): raise SystemExit("--event-width-codes values must be positive") return codes class StimulusState: def __init__(self, status_path: Path, args: argparse.Namespace) -> None: self.status_path = status_path self.args = args self.lock = threading.RLock() self.start_token = 0 self.preview_token = 0 self.preview_seconds = 0 self.status = { "booted_at": time.time(), "ready": False, "started": False, "completed": False, "last_error": None, "page_message": "booting", "stimulus_mode": "idle", "audio_state": "not-created", "observed_start_token": None, "completed_start_token": None, "observed_preview_token": None, "completed_preview_token": None, "pulse_active": False, "pulse_index": 0, "pulse_width_code": 0, "last_update": time.time(), } self.write_status() def write_status(self) -> None: self.status_path.parent.mkdir(parents=True, exist_ok=True) tmp = self.status_path.with_suffix(".tmp") tmp.write_text(json.dumps(self.status, indent=2, sort_keys=True), encoding="utf-8") tmp.replace(self.status_path) def update(self, payload: dict) -> None: with self.lock: self.status.update(payload) self.status["last_update"] = time.time() self.write_status() def snapshot(self) -> dict: with self.lock: snap = dict(self.status) snap.update({ "start_token": self.start_token, "preview_token": self.preview_token, "preview_seconds": self.preview_seconds, "duration_seconds": self.args.duration_seconds, "warmup_seconds": self.args.warmup_seconds, "pulse_period_ms": self.args.pulse_period_ms, "pulse_width_ms": self.args.pulse_width_ms, "marker_tick_period": self.args.marker_tick_period, "audio_gain": self.args.audio_gain, "event_width_codes": self.args.event_width_codes, }) return snap def request_start(self) -> dict: with self.lock: self.start_token += 1 self.status.update({ "started": False, "completed": False, "last_error": None, "page_message": "start requested", "stimulus_mode": "start", "start_requested_at": time.time(), }) self.write_status() return self.snapshot() def request_preview(self, seconds: int) -> dict: with self.lock: self.preview_token += 1 self.preview_seconds = max(1, min(30, seconds)) self.status.update({ "started": False, "completed": False, "last_error": None, "page_message": "preview requested", "stimulus_mode": "preview", "preview_requested_at": time.time(), }) self.write_status() return self.snapshot() def page_html() -> str: return """