#!/usr/bin/env python3 """Serve a local A/V stimulus page for the mirrored upstream sync probe.""" from __future__ import annotations import argparse import http.server import json import socketserver import threading import time import urllib.parse from pathlib import Path DEFAULT_EVENT_WIDTH_CODES = "1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16" def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Serve Lesavka local A/V sync stimulus") parser.add_argument("--host", default="127.0.0.1") parser.add_argument("--port", type=int, default=18444) parser.add_argument("--status", default="/tmp/lesavka-local-av-stimulus-status.json") parser.add_argument("--duration-seconds", type=int, default=20) parser.add_argument("--warmup-seconds", type=int, default=4) parser.add_argument("--pulse-period-ms", type=int, default=1000) parser.add_argument("--pulse-width-ms", type=int, default=120) parser.add_argument("--marker-tick-period", type=int, default=5) parser.add_argument("--audio-gain", type=float, default=0.55) parser.add_argument("--event-width-codes", default=DEFAULT_EVENT_WIDTH_CODES) args = parser.parse_args() args.event_width_codes = parse_event_width_codes(args.event_width_codes) args.audio_gain = max(0.0, min(1.0, args.audio_gain)) return args def parse_event_width_codes(raw: str) -> list[int]: codes = [int(part.strip()) for part in raw.split(",") if part.strip()] if not codes: raise SystemExit("--event-width-codes must contain at least one integer") if any(code < 1 for code in codes): raise SystemExit("--event-width-codes values must be positive") return codes class StimulusState: def __init__(self, status_path: Path, args: argparse.Namespace) -> None: self.status_path = status_path self.args = args self.lock = threading.RLock() self.start_token = 0 self.preview_token = 0 self.preview_seconds = 0 self.status = { "booted_at": time.time(), "ready": False, "started": False, "completed": False, "last_error": None, "page_message": "booting", "stimulus_mode": "idle", "audio_state": "not-created", "observed_start_token": None, "completed_start_token": None, "observed_preview_token": None, "completed_preview_token": None, "pulse_active": False, "pulse_index": 0, "pulse_width_code": 0, "last_update": time.time(), } self.write_status() def write_status(self) -> None: self.status_path.parent.mkdir(parents=True, exist_ok=True) tmp = self.status_path.with_suffix(".tmp") tmp.write_text(json.dumps(self.status, indent=2, sort_keys=True), encoding="utf-8") tmp.replace(self.status_path) def update(self, payload: dict) -> None: with self.lock: self.status.update(payload) self.status["last_update"] = time.time() self.write_status() def snapshot(self) -> dict: with self.lock: snap = dict(self.status) snap.update({ "start_token": self.start_token, "preview_token": self.preview_token, "preview_seconds": self.preview_seconds, "duration_seconds": self.args.duration_seconds, "warmup_seconds": self.args.warmup_seconds, "pulse_period_ms": self.args.pulse_period_ms, "pulse_width_ms": self.args.pulse_width_ms, "marker_tick_period": self.args.marker_tick_period, "audio_gain": self.args.audio_gain, "event_width_codes": self.args.event_width_codes, }) return snap def request_start(self) -> dict: with self.lock: self.start_token += 1 self.status.update({ "started": False, "completed": False, "last_error": None, "page_message": "start requested", "stimulus_mode": "start", "start_requested_at": time.time(), }) self.write_status() return self.snapshot() def request_preview(self, seconds: int) -> dict: with self.lock: self.preview_token += 1 self.preview_seconds = max(1, min(30, seconds)) self.status.update({ "started": False, "completed": False, "last_error": None, "page_message": "preview requested", "stimulus_mode": "preview", "preview_requested_at": time.time(), }) self.write_status() return self.snapshot() def page_html() -> str: return """ Lesavka Local A/V Stimulus
LESAVKA
booting...
""" class StimulusHandler(http.server.BaseHTTPRequestHandler): state: StimulusState def _send(self, code: int, body: bytes, content_type: str = "application/json") -> None: self.send_response(code) self.send_header("Content-Type", content_type) self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def do_GET(self) -> None: parsed = urllib.parse.urlparse(self.path) if parsed.path in ("/", "/index.html"): self.state.update({"page_message": "html served"}) self._send(200, page_html().encode("utf-8"), "text/html; charset=utf-8") return if parsed.path == "/command": self._send(200, json.dumps(self.state.snapshot()).encode("utf-8")) return if parsed.path == "/status": self._send(200, json.dumps(self.state.snapshot()).encode("utf-8")) return self._send(404, b"not found", "text/plain; charset=utf-8") def do_POST(self) -> None: parsed = urllib.parse.urlparse(self.path) length = int(self.headers.get("Content-Length", "0")) body = self.rfile.read(length) if parsed.path == "/status": payload = json.loads(body.decode("utf-8")) self.state.update(payload) self._send(200, json.dumps(self.state.snapshot()).encode("utf-8")) return if parsed.path == "/start": self._send(200, json.dumps(self.state.request_start()).encode("utf-8")) return if parsed.path == "/preview": query = urllib.parse.parse_qs(parsed.query) seconds = 4 if query.get("seconds"): try: seconds = int(query["seconds"][0]) except ValueError: seconds = 4 self._send(200, json.dumps(self.state.request_preview(seconds)).encode("utf-8")) return self._send(404, b"not found", "text/plain; charset=utf-8") def log_message(self, fmt: str, *args) -> None: pass class ReusableTcpServer(socketserver.ThreadingMixIn, socketserver.TCPServer): allow_reuse_address = True daemon_threads = True def main() -> None: args = parse_args() state = StimulusState(Path(args.status), args) class Handler(StimulusHandler): pass Handler.state = state with ReusableTcpServer((args.host, args.port), Handler) as httpd: httpd.serve_forever() if __name__ == "__main__": main()