lesavka/scripts/manual/local_av_stimulus.py

260 lines
10 KiB
Python
Executable File

#!/usr/bin/env python3
"""Serve a local A/V stimulus page for the mirrored upstream sync probe."""
from __future__ import annotations
import argparse
import http.server
import json
import socketserver
import threading
import time
from pathlib import Path
DEFAULT_EVENT_WIDTH_CODES = "1,2,1,3,2,4,1,1,3,1,4,2,1,2,3,4,1,3,2,2,4,1,2,4,3,1,1,4,2,3,1,2"
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Serve Lesavka local A/V sync stimulus")
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("--port", type=int, default=18444)
parser.add_argument("--status", default="/tmp/lesavka-local-av-stimulus-status.json")
parser.add_argument("--duration-seconds", type=int, default=20)
parser.add_argument("--warmup-seconds", type=int, default=4)
parser.add_argument("--pulse-period-ms", type=int, default=1000)
parser.add_argument("--pulse-width-ms", type=int, default=120)
parser.add_argument("--marker-tick-period", type=int, default=5)
parser.add_argument("--event-width-codes", default=DEFAULT_EVENT_WIDTH_CODES)
args = parser.parse_args()
args.event_width_codes = parse_event_width_codes(args.event_width_codes)
return args
def parse_event_width_codes(raw: str) -> list[int]:
codes = [int(part.strip()) for part in raw.split(",") if part.strip()]
if not codes:
raise SystemExit("--event-width-codes must contain at least one integer")
if any(code < 1 for code in codes):
raise SystemExit("--event-width-codes values must be positive")
return codes
class StimulusState:
def __init__(self, status_path: Path, args: argparse.Namespace) -> None:
self.status_path = status_path
self.args = args
self.lock = threading.Lock()
self.start_token = 0
self.status = {
"booted_at": time.time(),
"ready": False,
"started": False,
"completed": False,
"last_error": None,
"page_message": "booting",
"last_update": time.time(),
}
self.write_status()
def write_status(self) -> None:
self.status_path.parent.mkdir(parents=True, exist_ok=True)
tmp = self.status_path.with_suffix(".tmp")
tmp.write_text(json.dumps(self.status, indent=2, sort_keys=True), encoding="utf-8")
tmp.replace(self.status_path)
def update(self, payload: dict) -> None:
with self.lock:
self.status.update(payload)
self.status["last_update"] = time.time()
self.write_status()
def snapshot(self) -> dict:
with self.lock:
snap = dict(self.status)
snap.update({
"start_token": self.start_token,
"duration_seconds": self.args.duration_seconds,
"warmup_seconds": self.args.warmup_seconds,
"pulse_period_ms": self.args.pulse_period_ms,
"pulse_width_ms": self.args.pulse_width_ms,
"marker_tick_period": self.args.marker_tick_period,
"event_width_codes": self.args.event_width_codes,
})
return snap
def request_start(self) -> dict:
with self.lock:
self.start_token += 1
self.status.update({
"started": False,
"completed": False,
"last_error": None,
"page_message": "start requested",
"start_requested_at": time.time(),
})
self.write_status()
return self.snapshot()
def page_html() -> str:
return """<!doctype html>
<html>
<head>
<meta charset=\"utf-8\">
<title>Lesavka Local A/V Stimulus</title>
<style>
html, body { margin: 0; width: 100%; height: 100%; overflow: hidden; background: #02040a; color: #eaf2ff; font: 20px/1.4 system-ui, sans-serif; }
#stage { position: fixed; inset: 0; display: grid; place-items: center; background: #02040a; transition: none; }
#stage.active { background: #f8fbff; color: #02040a; }
#card { max-width: 900px; padding: 28px; border-radius: 24px; background: rgba(16, 24, 40, 0.78); border: 1px solid rgba(255,255,255,0.18); text-align: center; }
#stage.active #card { background: rgba(255,255,255,0.84); border-color: rgba(0,0,0,0.18); }
#big { font-size: clamp(48px, 9vw, 140px); font-weight: 900; letter-spacing: 0.06em; }
#status { white-space: pre-wrap; font: 15px/1.45 ui-monospace, monospace; opacity: 0.86; }
</style>
</head>
<body>
<div id=\"stage\"><div id=\"card\"><div id=\"big\">LESAVKA</div><div id=\"status\">booting...</div></div></div>
<script>
const stage = document.getElementById('stage');
const statusEl = document.getElementById('status');
let startToken = 0;
let running = false;
let audioCtx = null;
let oscillator = null;
let gain = null;
let startedAt = 0;
function setStatus(message) { statusEl.textContent = message; }
async function postJson(path, payload) {
await fetch(path, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(payload) });
}
function ensureAudio() {
if (audioCtx) return;
audioCtx = new AudioContext();
oscillator = audioCtx.createOscillator();
oscillator.type = 'sine';
oscillator.frequency.value = 880;
gain = audioCtx.createGain();
gain.gain.value = 0;
oscillator.connect(gain).connect(audioCtx.destination);
oscillator.start();
}
function activeAt(elapsedMs, command) {
const warmupMs = command.warmup_seconds * 1000;
if (elapsedMs < warmupMs || elapsedMs > command.duration_seconds * 1000) return false;
const sinceWarmup = elapsedMs - warmupMs;
const pulseIndex = Math.floor(sinceWarmup / command.pulse_period_ms);
const offset = sinceWarmup % command.pulse_period_ms;
const codes = command.event_width_codes && command.event_width_codes.length ? command.event_width_codes : [1];
const widthCode = codes[pulseIndex % codes.length];
const width = Math.min(command.pulse_period_ms - 1, command.pulse_width_ms * widthCode);
return offset < width;
}
async function runStimulus(command) {
if (running) return;
running = true;
ensureAudio();
await audioCtx.resume();
startedAt = performance.now();
await postJson('/status', { ready: true, started: true, completed: false, page_message: 'stimulus running' });
const tick = async () => {
const elapsed = performance.now() - startedAt;
const active = activeAt(elapsed, command);
const warmupMs = command.warmup_seconds * 1000;
const pulseIndex = Math.max(0, Math.floor((elapsed - warmupMs) / command.pulse_period_ms));
const codes = command.event_width_codes && command.event_width_codes.length ? command.event_width_codes : [1];
const widthCode = codes[pulseIndex % codes.length];
stage.classList.toggle('active', active);
gain.gain.setTargetAtTime(active ? 0.28 : 0.0, audioCtx.currentTime, 0.005);
setStatus(`running\nelapsed=${(elapsed / 1000).toFixed(2)}s\nactive=${active}\nevent=${pulseIndex}\nwidth_code=${widthCode}\nPoint the real webcam at this window and keep the real microphone hearing the tone.`);
if (elapsed <= command.duration_seconds * 1000 + 500) {
requestAnimationFrame(tick);
} else {
stage.classList.remove('active');
gain.gain.setTargetAtTime(0, audioCtx.currentTime, 0.005);
running = false;
await postJson('/status', { completed: true, page_message: 'stimulus completed' });
setStatus('completed');
}
};
requestAnimationFrame(tick);
}
async function pollCommand() {
try {
const command = await fetch('/command').then(r => r.json());
if (command.start_token !== startToken) {
startToken = command.start_token;
await runStimulus(command);
}
} catch (err) {
await postJson('/status', { last_error: String(err), page_message: 'command poll failed' });
}
}
setInterval(pollCommand, 200);
setInterval(() => postJson('/status', { ready: true, page_message: running ? 'running heartbeat' : 'ready heartbeat' }), 1000);
postJson('/status', { ready: true, page_message: 'page ready' });
setStatus('ready\nPoint the real webcam at this window.\nKeep speakers audible to the selected microphone.');
</script>
</body>
</html>"""
class StimulusHandler(http.server.BaseHTTPRequestHandler):
state: StimulusState
def _send(self, code: int, body: bytes, content_type: str = "application/json") -> None:
self.send_response(code)
self.send_header("Content-Type", content_type)
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def do_GET(self) -> None:
if self.path in ("/", "/index.html"):
self.state.update({"page_message": "html served"})
self._send(200, page_html().encode("utf-8"), "text/html; charset=utf-8")
return
if self.path == "/command":
self._send(200, json.dumps(self.state.snapshot()).encode("utf-8"))
return
if self.path == "/status":
self._send(200, json.dumps(self.state.snapshot()).encode("utf-8"))
return
self._send(404, b"not found", "text/plain; charset=utf-8")
def do_POST(self) -> None:
length = int(self.headers.get("Content-Length", "0"))
body = self.rfile.read(length)
if self.path == "/status":
payload = json.loads(body.decode("utf-8"))
self.state.update(payload)
self._send(200, json.dumps(self.state.snapshot()).encode("utf-8"))
return
if self.path == "/start":
self._send(200, json.dumps(self.state.request_start()).encode("utf-8"))
return
self._send(404, b"not found", "text/plain; charset=utf-8")
def log_message(self, fmt: str, *args) -> None:
pass
class ReusableTcpServer(socketserver.TCPServer):
allow_reuse_address = True
def main() -> None:
args = parse_args()
state = StimulusState(Path(args.status), args)
class Handler(StimulusHandler):
pass
Handler.state = state
with ReusableTcpServer((args.host, args.port), Handler) as httpd:
httpd.serve_forever()
if __name__ == "__main__":
main()