#!/usr/bin/env python3
import argparse
import http.server
import json
import socketserver
import threading
import time
from pathlib import Path


def parse_args() -> argparse.Namespace:
    """Parse the probe server's command-line options.

    Returns:
        Namespace with ``host``, ``port``, ``output``, ``status`` and
        ``duration_seconds`` attributes.
    """
    parser = argparse.ArgumentParser(description="Serve a local browser consumer probe page")
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("--port", type=int, default=18443)
    parser.add_argument("--output", default="/tmp/lesavka-browser-av-sync.webm")
    parser.add_argument("--status", default="/tmp/lesavka-browser-av-sync-status.json")
    parser.add_argument("--duration-seconds", type=int, default=15)
    return parser.parse_args()


class ProbeState:
    """Probe status shared between callers and mirrored to a JSON file.

    The mutating methods (``update``, ``request_start``, ``store_upload``)
    change the in-memory status dict and rewrite the status file while
    holding ``self.lock``.
    """

    def __init__(self, output_path: Path, status_path: Path, duration_seconds: int) -> None:
        """Set up the initial "booting" status and write it to disk.

        Args:
            output_path: Where an uploaded capture is stored.
            status_path: Where the JSON status document is written.
            duration_seconds: Value reported in every snapshot.
        """
        self.output_path = output_path
        self.status_path = status_path
        self.duration_seconds = duration_seconds
        # Re-entrant on purpose: request_start() and store_upload() call
        # snapshot() while still holding the lock.
        self.lock = threading.RLock()
        self.start_token = 0
        self.status = {
            "booted_at": time.time(),
            "ready": False,
            "recording": False,
            "uploaded": False,
            "last_error": None,
            "selected_video": None,
            "selected_audio": None,
            "devices": [],
            "page_message": "booting",
            "last_update": time.time(),
        }
        self.write_status()

    def write_status(self) -> None:
        """Serialize the status dict to ``status_path``.

        The JSON is written to a sibling ``.tmp`` file first and then renamed
        over the target, so the status file is swapped in one step.
        """
        self.status_path.parent.mkdir(parents=True, exist_ok=True)
        tmp = self.status_path.with_suffix(".tmp")
        tmp.write_text(json.dumps(self.status, indent=2, sort_keys=True), encoding="utf-8")
        tmp.replace(self.status_path)

    def update(self, payload: dict) -> None:
        """Merge ``payload`` into the status, stamp ``last_update``, persist."""
        with self.lock:
            self.status.update(payload)
            self.status["last_update"] = time.time()
            self.write_status()

    def snapshot(self) -> dict:
        """Return a shallow copy of the status plus token and duration.

        Returns:
            A new dict with every status key, ``start_token`` and
            ``duration_seconds``.
        """
        with self.lock:
            snap = dict(self.status)
            snap["start_token"] = self.start_token
            snap["duration_seconds"] = self.duration_seconds
            return snap

    def request_start(self) -> dict:
        """Bump the start token, reset the per-run flags, and persist.

        Returns:
            The snapshot taken right after the change.
        """
        with self.lock:
            self.start_token += 1
            self.status.update({
                "recording": False,
                "uploaded": False,
                "last_error": None,
                "page_message": "start requested",
                "start_requested_at": time.time(),
            })
            self.write_status()
            return self.snapshot()

    def store_upload(self, blob: bytes) -> dict:
        """Store an uploaded capture and mark the status as uploaded.

        The capture is staged next to ``output_path`` and renamed into place,
        matching how ``write_status`` replaces the status file. A consumer
        watching the output path therefore never reads a half-written file,
        and a failed write leaves any previous capture untouched.

        Args:
            blob: Raw capture bytes to store.

        Returns:
            The snapshot taken right after the status was updated.

        Raises:
            OSError: If the capture cannot be written or moved into place.
        """
        with self.lock:
            self.output_path.parent.mkdir(parents=True, exist_ok=True)
            # Append to the full name (instead of swapping the suffix) so the
            # staging file cannot coincide with write_status()'s ".tmp" file
            # when both paths share a stem.
            staging = self.output_path.with_name(self.output_path.name + ".tmp")
            try:
                staging.write_bytes(blob)
                staging.replace(self.output_path)
            except OSError:
                # Do not leave a partial staging file behind.
                staging.unlink(missing_ok=True)
                raise
            self.status.update({
                "uploaded": True,
                "recording": False,
                "page_message": f"capture uploaded to {self.output_path}",
                "upload_size": len(blob),
                "uploaded_at": time.time(),
            })
            self.write_status()
            return self.snapshot()


# NOTE(review): page_html(duration_seconds) is defined at this point in the
# original; its HTML template runs past the reviewed excerpt, so it was not
# rewritten here and must be kept from the original.
booting…