#!/usr/bin/env python3
"""Serve a local browser consumer probe page.

A small threaded HTTP server that hands out a probe page, exposes the probe
state as JSON, accepts status updates from the page, and stores an uploaded
capture on disk. The state is mirrored to a JSON status file after every
change.
"""
import argparse
import http.server
import json
import socketserver
import threading
import time
from pathlib import Path


def parse_args() -> argparse.Namespace:
    """Parse the command-line options (bind address, file paths, duration)."""
    parser = argparse.ArgumentParser(description="Serve a local browser consumer probe page")
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("--port", type=int, default=18443)
    parser.add_argument("--output", default="/tmp/lesavka-browser-av-sync.webm")
    parser.add_argument("--status", default="/tmp/lesavka-browser-av-sync-status.json")
    parser.add_argument("--duration-seconds", type=int, default=15)
    return parser.parse_args()


class ProbeState:
    """Shared probe state, guarded by a re-entrant lock and mirrored to disk.

    Every mutating method updates ``self.status`` and rewrites the JSON status
    file while holding ``self.lock``.
    """

    def __init__(self, output_path: Path, status_path: Path, duration_seconds: int) -> None:
        """Initialise the state and write the first status file.

        Args:
            output_path: Where an uploaded capture is written.
            status_path: Where the JSON status document is written.
            duration_seconds: Duration reported to clients in every snapshot.
        """
        self.output_path = output_path
        self.status_path = status_path
        self.duration_seconds = duration_seconds
        # Re-entrant so a caller already holding the lock can call update().
        self.lock = threading.RLock()
        self.start_token = 0
        self.status = {
            "booted_at": time.time(),
            "ready": False,
            "recording": False,
            "uploaded": False,
            "last_error": None,
            "selected_video": None,
            "selected_audio": None,
            "devices": [],
            "page_message": "booting",
            "last_update": time.time(),
        }
        self.write_status()

    def write_status(self) -> None:
        """Write ``self.status`` to the status file via a temp file + rename.

        Does not take the lock itself; the mutating methods call it while
        holding ``self.lock``.
        """
        self.status_path.parent.mkdir(parents=True, exist_ok=True)
        tmp = self.status_path.with_suffix(".tmp")
        tmp.write_text(json.dumps(self.status, indent=2, sort_keys=True), encoding="utf-8")
        # Replace in one step so readers do not observe a half-written file.
        tmp.replace(self.status_path)

    def update(self, payload: dict) -> None:
        """Merge ``payload`` into the status, stamp ``last_update``, persist."""
        with self.lock:
            self.status.update(payload)
            self.status["last_update"] = time.time()
            self.write_status()

    def snapshot(self) -> dict:
        """Return a shallow copy of the status plus token and duration."""
        with self.lock:
            snap = dict(self.status)
            snap["start_token"] = self.start_token
            snap["duration_seconds"] = self.duration_seconds
            return snap

    def request_start(self) -> dict:
        """Bump the start token, reset the capture flags, return a snapshot."""
        with self.lock:
            self.start_token += 1
            self.status.update({
                "recording": False,
                "uploaded": False,
                "last_error": None,
                "page_message": "start requested",
                "start_requested_at": time.time(),
            })
            self.write_status()
            return self.snapshot()

    def store_upload(self, blob: bytes) -> dict:
        """Write ``blob`` to the output path, mark it uploaded, return a snapshot."""
        with self.lock:
            self.output_path.parent.mkdir(parents=True, exist_ok=True)
            self.output_path.write_bytes(blob)
            self.status.update({
                "uploaded": True,
                "recording": False,
                "page_message": f"capture uploaded to {self.output_path}",
                "upload_size": len(blob),
                "uploaded_at": time.time(),
            })
            self.write_status()
            return self.snapshot()


def page_html(duration_seconds: int) -> str:
    """Return the probe page served at ``/`` and ``/index.html``."""
    # NOTE(review): duration_ms is not interpolated anywhere in the template
    # text below - confirm whether page markup that used it is missing here.
    duration_ms = duration_seconds * 1000
    return f""" Lesavka Browser Sync Probe

Lesavka Browser Sync Probe

Status

booting…
"""


class ProbeHandler(http.server.BaseHTTPRequestHandler):
    """HTTP handler exposing the probe page, state, and upload endpoints.

    ``state`` is a class attribute that must be assigned before the handler
    is used (``main`` sets it on a subclass).
    """

    state: ProbeState

    def _send(self, code: int, body: bytes, content_type: str = "application/json") -> None:
        """Send a complete response with the given status, body, and type."""
        self.send_response(code)
        self.send_header("Content-Type", content_type)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _send_json(self, code: int, document: dict) -> None:
        """Serialise ``document`` as JSON and send it with status ``code``."""
        self._send(code, json.dumps(document).encode("utf-8"))

    def _reject(self, reason: str) -> None:
        """Answer a malformed request with 400 and a JSON error document."""
        self._send_json(400, {"error": reason})

    def do_GET(self) -> None:
        """Serve the probe page, or the state snapshot for /command and /status."""
        if self.path in ("/", "/index.html"):
            # Requests run on separate threads; hold the lock across the
            # read-increment-write so concurrent page loads are all counted.
            with self.state.lock:
                served = int(self.state.status.get("html_served_count", 0)) + 1
                self.state.update({
                    "page_message": "html served",
                    "html_served_count": served,
                })
            self._send(
                200,
                page_html(self.state.duration_seconds).encode("utf-8"),
                "text/html; charset=utf-8",
            )
            return
        if self.path in ("/command", "/status"):
            self._send_json(200, self.state.snapshot())
            return
        self._send(404, b"not found", "text/plain; charset=utf-8")

    def do_POST(self) -> None:
        """Handle /status (JSON object merge), /start, and /upload (raw body).

        Malformed requests are answered with 400 instead of raising inside
        the request thread, which would drop the connection without a reply.
        """
        try:
            length = int(self.headers.get("Content-Length", "0"))
        except ValueError:
            length = -1
        if length < 0:
            # A negative size would make rfile.read() wait for end-of-stream.
            self._reject("invalid Content-Length")
            return
        body = self.rfile.read(length)

        if self.path == "/status":
            try:
                payload = json.loads(body.decode("utf-8"))
            except (UnicodeDecodeError, json.JSONDecodeError):
                self._reject("body is not valid UTF-8 JSON")
                return
            if not isinstance(payload, dict):
                # The payload is merged with dict.update(), so it must be an object.
                self._reject("status payload must be a JSON object")
                return
            self.state.update(payload)
            self._send_json(200, self.state.snapshot())
            return
        if self.path == "/start":
            self._send_json(200, self.state.request_start())
            return
        if self.path == "/upload":
            self._send_json(200, self.state.store_upload(body))
            return
        self._send(404, b"not found", "text/plain; charset=utf-8")

    def log_message(self, fmt: str, *args) -> None:
        """Suppress the per-request log line."""
        pass


class ReusableTcpServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """Thread-per-request TCP server with address reuse and daemon threads."""

    allow_reuse_address = True
    daemon_threads = True


def main() -> None:
    """Build the probe state from the CLI options and serve until stopped."""
    args = parse_args()
    state = ProbeState(Path(args.output), Path(args.status), args.duration_seconds)

    class Handler(ProbeHandler):
        pass

    # Bind the state on a subclass so ProbeHandler itself stays unconfigured.
    Handler.state = state
    with ReusableTcpServer((args.host, args.port), Handler) as httpd:
        httpd.serve_forever()


if __name__ == "__main__":
    main()