lesavka/scripts/manual/browser_consumer_probe.py

315 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import http.server
import json
import socketserver
import threading
import time
from pathlib import Path
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Serve a local browser consumer probe page")
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("--port", type=int, default=18443)
parser.add_argument("--output", default="/tmp/lesavka-browser-av-sync.webm")
parser.add_argument("--status", default="/tmp/lesavka-browser-av-sync-status.json")
parser.add_argument("--duration-seconds", type=int, default=15)
return parser.parse_args()
class ProbeState:
def __init__(self, output_path: Path, status_path: Path, duration_seconds: int) -> None:
self.output_path = output_path
self.status_path = status_path
self.duration_seconds = duration_seconds
self.lock = threading.Lock()
self.start_token = 0
self.status = {
"booted_at": time.time(),
"ready": False,
"recording": False,
"uploaded": False,
"last_error": None,
"selected_video": None,
"selected_audio": None,
"devices": [],
"page_message": "booting",
"last_update": time.time(),
}
self.write_status()
def write_status(self) -> None:
self.status_path.parent.mkdir(parents=True, exist_ok=True)
tmp = self.status_path.with_suffix(".tmp")
tmp.write_text(json.dumps(self.status, indent=2, sort_keys=True), encoding="utf-8")
tmp.replace(self.status_path)
def update(self, payload: dict) -> None:
with self.lock:
self.status.update(payload)
self.status["last_update"] = time.time()
self.write_status()
def snapshot(self) -> dict:
with self.lock:
snap = dict(self.status)
snap["start_token"] = self.start_token
snap["duration_seconds"] = self.duration_seconds
return snap
def request_start(self) -> dict:
with self.lock:
self.start_token += 1
self.status.update({
"recording": False,
"uploaded": False,
"last_error": None,
"page_message": "start requested",
"start_requested_at": time.time(),
})
self.write_status()
return self.snapshot()
def store_upload(self, blob: bytes) -> dict:
with self.lock:
self.output_path.parent.mkdir(parents=True, exist_ok=True)
self.output_path.write_bytes(blob)
self.status.update({
"uploaded": True,
"recording": False,
"page_message": f"capture uploaded to {self.output_path}",
"upload_size": len(blob),
"uploaded_at": time.time(),
})
self.write_status()
return self.snapshot()
def page_html(duration_seconds: int) -> str:
duration_ms = duration_seconds * 1000
return f"""<!doctype html>
<html>
<head>
<meta charset=\"utf-8\">
<title>Lesavka Browser Sync Probe</title>
<style>
body {{ margin: 0; background: #0f131a; color: #e7edf6; font: 16px/1.4 system-ui, sans-serif; }}
.wrap {{ padding: 16px; display: grid; grid-template-columns: 1.1fr 0.9fr; gap: 16px; min-height: 100vh; box-sizing: border-box; }}
.panel {{ background: #1b212c; border: 1px solid #2a3342; border-radius: 12px; padding: 12px; box-sizing: border-box; }}
h1 {{ margin: 0 0 10px; font-size: 22px; }}
h2 {{ margin: 0 0 8px; font-size: 16px; }}
video {{ width: 100%; aspect-ratio: 16 / 9; background: black; border-radius: 10px; object-fit: contain; }}
pre {{ white-space: pre-wrap; word-break: break-word; margin: 0; font: 13px/1.35 ui-monospace, monospace; }}
.meter {{ height: 18px; background: #111722; border-radius: 999px; overflow: hidden; border: 1px solid #2a3342; }}
.bar {{ height: 100%; width: 0%; background: linear-gradient(90deg, #2fbf71, #ffd36e, #ff6b6b); transition: width 120ms linear; }}
.row {{ margin: 10px 0; }}
</style>
</head>
<body>
<div class=\"wrap\">
<div class=\"panel\">
<h1>Lesavka Browser Sync Probe</h1>
<video id=\"video\" autoplay playsinline muted></video>
<div class=\"row\"><div class=\"meter\"><div id=\"bar\" class=\"bar\"></div></div></div>
</div>
<div class=\"panel\">
<h2>Status</h2>
<pre id=\"status\">booting…</pre>
</div>
</div>
<script>
const videoEl = document.getElementById('video');
const statusEl = document.getElementById('status');
const barEl = document.getElementById('bar');
let stream = null;
let recorder = null;
let chunks = [];
let startToken = 0;
let analyser = null;
let audioCtx = null;
let recording = false;
let heartbeatCounter = 0;
function setStatus(lines) {{ statusEl.textContent = lines.join('\\n'); }}
async function postJson(path, payload) {{
await fetch(path, {{ method: 'POST', headers: {{ 'Content-Type': 'application/json' }}, body: JSON.stringify(payload) }});
}}
async function postBlob(path, blob) {{
await fetch(path, {{ method: 'POST', headers: {{ 'Content-Type': 'application/octet-stream' }}, body: blob }});
}}
function fmtDevice(d) {{ return `${{d.kind}}: ${{d.label || '(unlabeled)'}}`; }}
function meterLoop() {{
if (!analyser) return;
const data = new Uint8Array(analyser.fftSize);
const tick = () => {{
if (!analyser) return;
analyser.getByteTimeDomainData(data);
let peak = 0;
for (let i = 0; i < data.length; i++) peak = Math.max(peak, Math.abs(data[i] - 128));
const pct = Math.min(100, Math.round((peak / 128) * 100));
barEl.style.width = pct + '%';
requestAnimationFrame(tick);
}};
requestAnimationFrame(tick);
}}
function attachMeter(track) {{
if (!track) return;
try {{
audioCtx = new AudioContext();
const source = audioCtx.createMediaStreamSource(new MediaStream([track]));
analyser = audioCtx.createAnalyser();
analyser.fftSize = 2048;
source.connect(analyser);
meterLoop();
}} catch (err) {{
console.warn('meter setup failed', err);
}}
}}
async function initStream() {{
const lines = [];
try {{
await postJson('/status', {{ page_message: 'page loaded' }});
lines.push('requesting permission…');
await postJson('/status', {{ page_message: 'requesting permission' }});
const warm = await navigator.mediaDevices.getUserMedia({{ video: true, audio: true }});
warm.getTracks().forEach(track => track.stop());
await postJson('/status', {{ page_message: 'permission granted' }});
const devices = await navigator.mediaDevices.enumerateDevices();
await postJson('/status', {{ page_message: 'devices enumerated', devices: devices.map(fmtDevice) }});
const videoIn =
devices.find(d => d.kind === 'videoinput' && /(Lesavka Composite|Multifunction Composite Gadget)/i.test(d.label)) ||
devices.find(d => d.kind === 'videoinput' && /UGREEN/i.test(d.label)) ||
devices.find(d => d.kind === 'videoinput');
const audioIn = devices.find(d => d.kind === 'audioinput' && /(Multifunction Composite Gadget|Lesavka Composite)/i.test(d.label)) || devices.find(d => d.kind === 'audioinput');
stream = await navigator.mediaDevices.getUserMedia({{
video: videoIn ? {{ deviceId: {{ exact: videoIn.deviceId }} }} : true,
audio: audioIn ? {{ deviceId: {{ exact: audioIn.deviceId }} }} : true,
}});
await postJson('/status', {{ page_message: 'media stream opened' }});
videoEl.srcObject = stream;
attachMeter(stream.getAudioTracks()[0]);
const payload = {{
ready: true,
selected_video: videoIn ? fmtDevice(videoIn) : null,
selected_audio: audioIn ? fmtDevice(audioIn) : null,
devices: devices.map(fmtDevice),
page_message: 'ready for start',
last_error: null,
}};
await postJson('/status', payload);
lines.push('ready');
lines.push('video: ' + payload.selected_video);
lines.push('audio: ' + payload.selected_audio);
setStatus(lines);
}} catch (err) {{
const message = String(err && (err.stack || err));
await postJson('/status', {{ ready: false, last_error: message, page_message: 'permission or stream setup failed' }});
setStatus(['consumer status: FAIL', message]);
}}
}}
async function maybeStartRecording() {{
if (!stream || recording) return;
try {{
const response = await fetch('/command').then(r => r.json());
if (response.start_token === startToken) return;
startToken = response.start_token;
await postJson('/status', {{ page_message: 'start token observed', observed_start_token: startToken }});
recording = true;
chunks = [];
const preferredMime = 'video/webm;codecs=vp8,opus';
const options = MediaRecorder.isTypeSupported(preferredMime) ? {{ mimeType: preferredMime }} : undefined;
recorder = options ? new MediaRecorder(stream, options) : new MediaRecorder(stream);
await postJson('/status', {{ recording: true, uploaded: false, page_message: 'recording' }});
recorder.ondataavailable = event => {{ if (event.data && event.data.size > 0) chunks.push(event.data); }};
recorder.onstop = async () => {{
const blob = new Blob(chunks, {{ type: recorder.mimeType || 'video/webm' }});
await postBlob('/upload', blob);
recording = false;
}};
recorder.start(250);
setTimeout(() => recorder && recorder.state !== 'inactive' && recorder.stop(), response.duration_seconds * 1000);
}} catch (err) {{
recording = false;
const message = String(err && (err.stack || err));
await postJson('/status', {{ last_error: message, page_message: 'recording setup failed', recording: false }});
}}
}}
setInterval(() => {{
heartbeatCounter += 1;
void postJson('/status', {{ heartbeat_counter: heartbeatCounter, page_message: stream ? 'ready heartbeat' : 'boot heartbeat' }});
}}, 1000);
setInterval(() => {{ void maybeStartRecording(); }}, 250);
void initStream();
</script>
</body>
</html>"""
class ProbeHandler(http.server.BaseHTTPRequestHandler):
state: ProbeState
def _send(self, code: int, body: bytes, content_type: str = "application/json") -> None:
self.send_response(code)
self.send_header("Content-Type", content_type)
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def do_GET(self) -> None:
if self.path in ("/", "/index.html"):
snap = self.state.snapshot()
self.state.update({
"page_message": "html served",
"html_served_count": int(snap.get("html_served_count", 0)) + 1,
})
self._send(200, page_html(self.state.duration_seconds).encode("utf-8"), "text/html; charset=utf-8")
return
if self.path == "/command":
self._send(200, json.dumps(self.state.snapshot()).encode("utf-8"))
return
if self.path == "/status":
self._send(200, json.dumps(self.state.snapshot()).encode("utf-8"))
return
self._send(404, b"not found", "text/plain; charset=utf-8")
def do_POST(self) -> None:
length = int(self.headers.get("Content-Length", "0"))
body = self.rfile.read(length)
if self.path == "/status":
payload = json.loads(body.decode("utf-8"))
self.state.update(payload)
self._send(200, json.dumps(self.state.snapshot()).encode("utf-8"))
return
if self.path == "/start":
self._send(200, json.dumps(self.state.request_start()).encode("utf-8"))
return
if self.path == "/upload":
self._send(200, json.dumps(self.state.store_upload(body)).encode("utf-8"))
return
self._send(404, b"not found", "text/plain; charset=utf-8")
def log_message(self, fmt: str, *args) -> None:
pass
class ReusableTcpServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
allow_reuse_address = True
daemon_threads = True
def main() -> None:
args = parse_args()
state = ProbeState(Path(args.output), Path(args.status), args.duration_seconds)
class Handler(ProbeHandler):
pass
Handler.state = state
with ReusableTcpServer((args.host, args.port), Handler) as httpd:
httpd.serve_forever()
if __name__ == "__main__":
main()