#!/usr/bin/env python3 import argparse import http.server import json import socketserver import threading import time import urllib.parse from pathlib import Path def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Serve a local browser consumer probe page") parser.add_argument("--host", default="127.0.0.1") parser.add_argument("--port", type=int, default=18443) parser.add_argument("--output", default="/tmp/lesavka-browser-av-sync.webm") parser.add_argument("--status", default="/tmp/lesavka-browser-av-sync-status.json") parser.add_argument("--duration-seconds", type=int, default=15) return parser.parse_args() class ProbeState: def __init__(self, output_path: Path, status_path: Path, duration_seconds: int) -> None: self.output_path = output_path self.status_path = status_path self.duration_seconds = duration_seconds self.lock = threading.RLock() self.start_token = 0 self.status = { "booted_at": time.time(), "ready": False, "recording": False, "uploaded": False, "last_error": None, "selected_video": None, "selected_audio": None, "devices": [], "page_message": "booting", "last_update": time.time(), "active_start_token": 0, "uploaded_start_token": None, "upload_count": 0, } self.write_status() def write_status(self) -> None: self.status_path.parent.mkdir(parents=True, exist_ok=True) tmp = self.status_path.with_suffix(".tmp") tmp.write_text(json.dumps(self.status, indent=2, sort_keys=True), encoding="utf-8") tmp.replace(self.status_path) def update(self, payload: dict) -> None: with self.lock: self.status.update(payload) self.status["last_update"] = time.time() self.write_status() def snapshot(self) -> dict: with self.lock: snap = dict(self.status) snap["start_token"] = self.start_token snap["duration_seconds"] = self.duration_seconds return snap def request_start(self) -> dict: with self.lock: self.start_token += 1 self.status.update({ "active_start_token": self.start_token, "uploaded_start_token": None, "recording": False, "uploaded": False, "last_error": None, "page_message": "start requested", "start_requested_at": time.time(), }) self.write_status() return self.snapshot() def store_upload(self, blob: bytes, start_token: int | None) -> dict: with self.lock: self.output_path.parent.mkdir(parents=True, exist_ok=True) self.output_path.write_bytes(blob) upload_count = int(self.status.get("upload_count") or 0) + 1 self.status.update({ "uploaded": True, "uploaded_start_token": start_token, "upload_count": upload_count, "recording": False, "page_message": f"capture uploaded to {self.output_path}", "upload_size": len(blob), "uploaded_at": time.time(), }) self.write_status() return self.snapshot() def page_html(duration_seconds: int) -> str: duration_ms = duration_seconds * 1000 return f""" Lesavka Browser Sync Probe

Lesavka Browser Sync Probe

Status

booting…
""" class ProbeHandler(http.server.BaseHTTPRequestHandler): state: ProbeState def _send(self, code: int, body: bytes, content_type: str = "application/json") -> None: self.send_response(code) self.send_header("Content-Type", content_type) self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def do_GET(self) -> None: parsed = urllib.parse.urlparse(self.path) if parsed.path in ("/", "/index.html"): snap = self.state.snapshot() self.state.update({ "page_message": "html served", "html_served_count": int(snap.get("html_served_count", 0)) + 1, }) self._send(200, page_html(self.state.duration_seconds).encode("utf-8"), "text/html; charset=utf-8") return if parsed.path == "/command": self._send(200, json.dumps(self.state.snapshot()).encode("utf-8")) return if parsed.path == "/status": self._send(200, json.dumps(self.state.snapshot()).encode("utf-8")) return self._send(404, b"not found", "text/plain; charset=utf-8") def do_POST(self) -> None: parsed = urllib.parse.urlparse(self.path) length = int(self.headers.get("Content-Length", "0")) body = self.rfile.read(length) if parsed.path == "/status": payload = json.loads(body.decode("utf-8")) self.state.update(payload) self._send(200, json.dumps(self.state.snapshot()).encode("utf-8")) return if parsed.path == "/start": self._send(200, json.dumps(self.state.request_start()).encode("utf-8")) return if parsed.path == "/upload": query = urllib.parse.parse_qs(parsed.query) start_token = None if query.get("start_token"): try: start_token = int(query["start_token"][0]) except ValueError: start_token = None self._send(200, json.dumps(self.state.store_upload(body, start_token)).encode("utf-8")) return self._send(404, b"not found", "text/plain; charset=utf-8") def log_message(self, fmt: str, *args) -> None: pass class ReusableTcpServer(socketserver.ThreadingMixIn, socketserver.TCPServer): allow_reuse_address = True daemon_threads = True def main() -> None: args = parse_args() state = ProbeState(Path(args.output), Path(args.status), args.duration_seconds) class Handler(ProbeHandler): pass Handler.state = state with ReusableTcpServer((args.host, args.port), Handler) as httpd: httpd.serve_forever() if __name__ == "__main__": main()