lesavka/scripts/manual/run_client_uplink_virtual_camera_probe.py

377 lines
14 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""Feed synthetic media through the real Lesavka client capture/uplink path."""
from __future__ import annotations
import argparse
import importlib.util
import json
import os
import pathlib
import shlex
import subprocess
import threading
import time
from typing import Any
# Third ancestor directory of this file's resolved path; child processes are
# launched with this as their working directory and relative binary paths are
# resolved against it.
REPO_ROOT = pathlib.Path(__file__).resolve().parents[2]
# Path of the synthetic RCT/UVC probe script that load_rct_probe_module()
# imports from disk at runtime.
RCT_PROBE = REPO_ROOT / "scripts/manual/run_synthetic_rct_uvc_probe.py"
def load_rct_probe_module() -> Any:
    """Import the probe script at ``RCT_PROBE`` by file path and return the module.

    Returns:
        The executed module object (typed ``Any`` because its attributes are
        only known at runtime).

    Raises:
        RuntimeError: if no import spec, or no loader, can be obtained for
            ``RCT_PROBE``.
    """
    module_spec = importlib.util.spec_from_file_location("lesavka_synthetic_rct_probe", RCT_PROBE)
    loader = None if module_spec is None else module_spec.loader
    if loader is None:
        raise RuntimeError(f"could not load {RCT_PROBE}")
    probe_module = importlib.util.module_from_spec(module_spec)
    # Run the script's top-level code so its functions become attributes.
    loader.exec_module(probe_module)
    return probe_module
def parse_mode(mode: str) -> tuple[int, int, int]:
    """Parse a ``WIDTHxHEIGHT@FPS`` mode string into ``(width, height, fps)``.

    The string is lower-cased first, so ``1280X720@30`` is accepted.

    Args:
        mode: Mode description such as ``"1280x720@30"``.

    Returns:
        A ``(width, height, fps)`` tuple of positive integers.

    Raises:
        ValueError: if a separator is missing, a component is not an integer,
            or any component is zero or negative.
    """
    size_text, at_sign, fps_text = mode.lower().partition("@")
    width_text, x_sign, height_text = size_text.partition("x")
    try:
        if not (at_sign and x_sign):
            raise ValueError("missing 'x' or '@' separator")
        width, height, fps = int(width_text), int(height_text), int(fps_text)
    except ValueError as err:
        raise ValueError(
            f"invalid mode {mode!r}; expected WIDTHxHEIGHT@FPS such as 1280x720@30"
        ) from err
    # A zero-sized frame or zero frame rate cannot be captured or analysed.
    if min(width, height, fps) <= 0:
        raise ValueError(f"invalid mode {mode!r}; width, height and fps must all be positive")
    return width, height, fps
def timestamp() -> str:
    """Return the current UTC time formatted as ``YYYYMMDD-HHMMSS``."""
    utc_now = time.gmtime()
    return time.strftime("%Y%m%d-%H%M%S", utc_now)
def default_artifact_dir(mode: str) -> pathlib.Path:
    """Build the default, timestamped artifact directory path for *mode*.

    The ``@`` in the mode string is replaced with ``-`` and the current
    ``timestamp()`` is appended, e.g.
    ``artifacts/client-uplink-virtual-camera/1280x720-30-<timestamp>``.
    The path is relative and is not created here.

    Args:
        mode: Mode string such as ``"1280x720@30"``.

    Returns:
        The relative directory path for this run's artifacts.
    """
    # The previous extra ``.replace("x", "x")`` was a no-op and has been dropped.
    safe_mode = mode.replace("@", "-")
    return pathlib.Path("artifacts/client-uplink-virtual-camera") / f"{safe_mode}-{timestamp()}"
def require_tool(name: str) -> None:
    """Abort the script unless executable *name* can be found on ``PATH``.

    The lookup uses this process's own ``PATH``, which is the same search path
    ``subprocess`` uses when the tool is launched later. A login-shell lookup
    could succeed through profile-modified paths that the later launch never
    sees.

    Args:
        name: Executable name to look up, e.g. ``"ffmpeg"``.

    Raises:
        SystemExit: if the executable cannot be found.
    """
    import shutil

    if shutil.which(name) is None:
        raise SystemExit(f"{name} is required for the client uplink virtual camera probe")
def start_process(command: list[str], log_path: pathlib.Path, env: dict[str, str] | None = None) -> subprocess.Popen[bytes]:
    """Launch *command* from the repository root with its output sent to a log file.

    Args:
        command: Program and arguments to execute.
        log_path: File that receives the child's stdout and stderr. Parent
            directories are created if missing and the file is truncated.
        env: Optional environment for the child; ``None`` inherits the
            current environment.

    Returns:
        The running ``subprocess.Popen`` handle.
    """
    log_path.parent.mkdir(parents=True, exist_ok=True)
    # The child gets its own duplicate of the descriptor, so the parent's copy
    # can be closed as soon as the process is spawned instead of leaking.
    with log_path.open("wb") as log:
        return subprocess.Popen(command, stdout=log, stderr=subprocess.STDOUT, env=env, cwd=REPO_ROOT)
def terminate_process(proc: subprocess.Popen[bytes] | None, timeout_s: float = 5.0) -> int | None:
    """Stop *proc* if it is still running and return its exit status.

    A running process is first asked to terminate; if it has not exited
    within *timeout_s* seconds it is killed and waited on again with the
    same timeout.

    Args:
        proc: Process to stop, or ``None`` when nothing was started.
        timeout_s: Seconds to wait after terminate, and again after kill.

    Returns:
        The process return code, or ``None`` when *proc* is ``None``.
    """
    if proc is None:
        return None
    exit_code = proc.poll()
    if exit_code is None:
        proc.terminate()
        try:
            exit_code = proc.wait(timeout=timeout_s)
        except subprocess.TimeoutExpired:
            # Graceful shutdown took too long; force the process down.
            proc.kill()
            exit_code = proc.wait(timeout=timeout_s)
    return exit_code
def build_binaries(args: argparse.Namespace) -> None:
    """Build the client and dummy-server binaries with cargo unless disabled.

    Args:
        args: Parsed CLI options; only ``no_build`` is read.

    Raises:
        subprocess.CalledProcessError: if ``cargo build`` exits non-zero.
    """
    if not args.no_build:
        cargo_command = ["cargo", "build", "-p", "lesavka_client"]
        for binary_name in ("lesavka-client", "lesavka-uplink-dummy-server"):
            cargo_command += ["--bin", binary_name]
        subprocess.run(cargo_command, cwd=REPO_ROOT, check=True)
def start_synthetic_frames_to_v4l2(
    args: argparse.Namespace, probe: Any, width: int, height: int, fps: int
) -> tuple[subprocess.Popen[bytes], threading.Thread]:
    """Start ffmpeg writing synthetic frames to a v4l2 device, plus its feeder thread.

    ffmpeg reads raw ``gray`` frames from stdin, converts them to ``yuyv422``
    and writes them to ``args.device``. A daemon thread generates frames with
    ``probe.synthetic_gray(width, height, sequence)`` and paces them to *fps*
    for ``args.duration + args.warmup`` seconds.

    Args:
        args: Parsed CLI options; reads ``device``, ``artifact_dir``,
            ``duration`` and ``warmup``. ``artifact_dir`` must already exist.
        probe: Loaded probe module providing ``synthetic_gray``.
        width: Frame width in pixels.
        height: Frame height in pixels.
        fps: Target frame rate.

    Returns:
        ``(ffmpeg_process, feeder_thread)``; the thread is already started.

    Raises:
        SystemExit: if ``args.device`` is not set.
        RuntimeError: if ffmpeg was started without a stdin pipe.
    """
    if not args.device:
        raise SystemExit("--device /dev/videoN is required for --source virtual")
    command = [
        "ffmpeg",
        "-hide_banner",
        "-loglevel",
        "warning",
        "-f",
        "rawvideo",
        "-pix_fmt",
        "gray",
        "-video_size",
        f"{width}x{height}",
        "-framerate",
        str(fps),
        "-i",
        "-",
        "-vf",
        "format=yuyv422",
        "-f",
        "v4l2",
        args.device,
    ]
    stderr_path = args.artifact_dir / "virtual-camera-ffmpeg.stderr"
    # The child keeps its own copy of the descriptor, so close ours right away
    # rather than leaking the handle for the lifetime of the script.
    with stderr_path.open("wb") as stderr_log:
        proc = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, stderr=stderr_log)
    stdin = proc.stdin
    if stdin is None:
        # Explicit check instead of ``assert`` so it survives ``python -O``.
        raise RuntimeError("ffmpeg was started without a stdin pipe")

    def writer() -> None:
        """Push paced synthetic frames into ffmpeg until done or ffmpeg exits."""
        frame_count = int((args.duration + args.warmup) * fps)
        started = time.monotonic()
        try:
            for sequence in range(frame_count):
                if proc.poll() is not None:
                    return
                stdin.write(probe.synthetic_gray(width, height, sequence))
                stdin.flush()
                # Pace against the absolute schedule so per-frame jitter does
                # not accumulate into drift.
                target = started + (sequence + 1) / max(1, fps)
                delay = target - time.monotonic()
                if delay > 0:
                    time.sleep(delay)
        except BrokenPipeError:
            # ffmpeg went away mid-stream; nothing left to feed.
            return
        finally:
            try:
                stdin.close()
            except OSError:
                # Best effort: closing may flush into an already-dead pipe.
                pass

    thread = threading.Thread(target=writer, name="lesavka-virtual-camera-feeder", daemon=True)
    thread.start()
    return proc, thread
def decode_packets(args: argparse.Namespace, width: int, height: int) -> pathlib.Path | None:
    """Concatenate captured video packets and decode them to raw gray frames.

    Packets are read from ``<artifact_dir>/dummy-server/video-packets/*.bin``
    in sorted filename order, joined into one elementary stream and decoded
    with ffmpeg into ``<artifact_dir>/client-uplink.gray``. The ffmpeg command
    line and its stderr are saved next to the output.

    Args:
        args: Parsed CLI options; reads ``artifact_dir`` and ``codec``.
        width: Expected frame width in pixels.
        height: Expected frame height in pixels.

    Returns:
        Path of the decoded raw file, or ``None`` when there are no packets
        or the decode produced less than one full frame.
    """
    packet_dir = args.artifact_dir / "dummy-server" / "video-packets"
    packets = sorted(packet_dir.glob("*.bin"))
    if not packets:
        return None
    is_mjpeg = args.codec == "mjpeg"
    stream_path = args.artifact_dir / ("client-uplink.mjpeg" if is_mjpeg else "client-uplink.hevc")
    with stream_path.open("wb") as stream:
        for packet in packets:
            stream.write(packet.read_bytes())
    raw_path = args.artifact_dir / "client-uplink.gray"
    # Remove output left by an earlier run in a reused artifact directory.
    # Otherwise ffmpeg may prompt or refuse to overwrite, and the stale file
    # would be reported as this run's decode.
    try:
        raw_path.unlink()
    except FileNotFoundError:
        pass
    demuxer = "mjpeg" if is_mjpeg else "hevc"
    command = [
        "ffmpeg",
        "-hide_banner",
        "-loglevel",
        "warning",
        "-f",
        demuxer,
        "-i",
        str(stream_path),
        "-an",
        "-pix_fmt",
        "gray",
        "-f",
        "rawvideo",
        str(raw_path),
    ]
    (args.artifact_dir / "decode-command.txt").write_text(" ".join(shlex.quote(part) for part in command) + "\n")
    # The exit status is deliberately ignored; success is judged from the
    # output file below. The stderr log handle is closed once ffmpeg returns.
    with (args.artifact_dir / "decode-ffmpeg.stderr").open("wb") as stderr_log:
        subprocess.run(command, cwd=REPO_ROOT, check=False, stderr=stderr_log)
    if not raw_path.exists() or raw_path.stat().st_size < width * height:
        return None
    return raw_path
def analyze_decoded_frames(args: argparse.Namespace, probe: Any, raw_path: pathlib.Path, width: int, height: int, fps: int) -> dict[str, Any]:
    """Analyse each decoded gray frame with the probe and summarise the results.

    The raw file is read in ``width * height`` byte frames; a trailing partial
    frame is ignored. Each frame is passed to ``probe.analyze_frame`` together
    with the previous frame's ``comparison_sequence``. Every per-frame result
    is appended to ``<artifact_dir>/client-uplink-frame-metrics.jsonl``.

    Args:
        args: Parsed CLI options; reads ``artifact_dir``, ``mode``, ``codec``
            and the analysis tuning options forwarded to the probe.
        probe: Loaded probe module providing ``analyze_frame``.
        raw_path: File of concatenated raw gray frames.
        width: Frame width in pixels.
        height: Frame height in pixels.
        fps: Expected frame rate; only recorded in the summary.

    Returns:
        Summary dict with frame counts, per-reason tallies and up to 30 of the
        worst suspicious frames ordered by ``lower_mae`` then ``total_mae``.

    Raises:
        ValueError: if ``width * height`` is not positive.
    """
    frame_size = width * height
    if frame_size <= 0:
        # read(0) returns b"" whose length equals a zero frame size, so the
        # frame loop would never terminate.
        raise ValueError(f"frame size must be positive, got {width}x{height}")
    previous_seq: int | None = None
    frame_index = 0
    decoded = 0
    suspicious = 0
    visual = 0
    reason_counts: dict[str, int] = {}
    visual_reason_counts: dict[str, int] = {}
    worst: list[dict[str, Any]] = []
    # Forward only the tuning knobs the probe's analyser reads.
    tuning_fields = (
        "x_step",
        "y_step",
        "sequence_window",
        "bands",
        "mix_mae_threshold",
        "mix_improvement",
        "mix_min_bands",
        "lower_mae_threshold",
        "lower_skew_ratio",
        "mae_threshold",
        "slab_var",
        "shift_threshold",
        "shift_improvement",
    )
    analysis_args = argparse.Namespace(**{name: getattr(args, name) for name in tuning_fields})
    metrics_path = args.artifact_dir / "client-uplink-frame-metrics.jsonl"
    with raw_path.open("rb") as raw, metrics_path.open("w") as metrics:
        for frame in iter(lambda: raw.read(frame_size), b""):
            if len(frame) != frame_size:
                # Trailing partial frame: stop without counting it.
                break
            frame_index += 1
            result = probe.analyze_frame(frame, width, height, analysis_args, previous_seq)
            comparison_seq = result.get("comparison_sequence")
            if comparison_seq is not None:
                previous_seq = int(comparison_seq)
            if result.get("decoded_sequence") is not None:
                decoded += 1
            if result["suspicious"]:
                suspicious += 1
                for reason in result["reasons"]:
                    reason_counts[reason] = reason_counts.get(reason, 0) + 1
                worst.append({"frame": frame_index, **result})
                # Re-rank and truncate on every insert so memory stays bounded.
                worst = sorted(worst, key=lambda item: (item["lower_mae"], item["total_mae"]), reverse=True)[:30]
            if result["visual_suspicious"]:
                visual += 1
                for reason in result["visual_reasons"]:
                    visual_reason_counts[reason] = visual_reason_counts.get(reason, 0) + 1
            metrics.write(json.dumps({"frame": frame_index, **result}, sort_keys=True) + "\n")
    return {
        "schema": "lesavka.client-uplink-virtual-camera.analysis.v1",
        "mode": args.mode,
        "codec": args.codec,
        "frames": frame_index,
        "fps_expected": fps,
        "decoded_markers": decoded,
        "suspicious": suspicious,
        "visual_suspicious": visual,
        "reasons": reason_counts,
        "visual_reasons": visual_reason_counts,
        "worst": worst,
    }
def run(args: argparse.Namespace) -> int:
    """Run one full probe: dummy server, optional feeder, client, decode, analysis.

    Steps, in order:
      1. Check for ffmpeg, build the binaries, load the probe module.
      2. Start the dummy uplink server on ``127.0.0.1:<port>``.
      3. For ``--source virtual``, start feeding synthetic frames to the
         v4l2 device; otherwise the client is told to use source ``test``.
      4. Start the client against the dummy server for ``args.duration``
         seconds.
      5. Stop every child process, decode the captured packets, analyse them
         and write ``summary.json`` (also printed to stdout).

    Args:
        args: Parsed CLI options as produced by ``main``.

    Returns:
        Always ``0``; per-process exit codes are recorded in the summary.

    Raises:
        SystemExit: if ffmpeg is missing, the dummy server exits during
            startup, or the virtual source has no device.
    """
    require_tool("ffmpeg")
    build_binaries(args)
    probe = load_rct_probe_module()
    width, height, fps = parse_mode(args.mode)
    args.artifact_dir.mkdir(parents=True, exist_ok=True)

    # Relative binary paths are resolved against the repository root.
    server_binary = pathlib.Path(args.server_binary)
    client_binary = pathlib.Path(args.client_binary)
    if not server_binary.is_absolute():
        server_binary = REPO_ROOT / server_binary
    if not client_binary.is_absolute():
        client_binary = REPO_ROOT / client_binary

    dummy_dir = args.artifact_dir / "dummy-server"
    listen = f"127.0.0.1:{args.port}"
    server_command = [
        str(server_binary),
        "--listen",
        listen,
        "--artifact-dir",
        str(dummy_dir),
        "--width",
        str(width),
        "--height",
        str(height),
        "--fps",
        str(fps),
        "--codec",
        args.codec,
        "--bundled",
        "true" if args.bundled else "false",
        "--microphone",
        "true" if args.bundled else "false",
    ]
    (args.artifact_dir / "dummy-server-command.txt").write_text(" ".join(shlex.quote(part) for part in server_command) + "\n")

    server_proc: subprocess.Popen[bytes] | None = None
    feeder_proc: subprocess.Popen[bytes] | None = None
    feeder_thread: threading.Thread | None = None
    client_proc: subprocess.Popen[bytes] | None = None
    try:
        # The server is started inside the try so an interrupt or error during
        # its startup wait still runs the cleanup in ``finally``.
        server_proc = start_process(server_command, args.artifact_dir / "dummy-server.log")
        time.sleep(1.0)
        if server_proc.poll() is not None:
            raise SystemExit(f"dummy server exited early rc={server_proc.returncode}; see {args.artifact_dir / 'dummy-server.log'}")
        if args.source == "virtual":
            feeder_proc, feeder_thread = start_synthetic_frames_to_v4l2(args, probe, width, height, fps)
            # Let the feeder run briefly (at most one second) before the
            # client opens the device.
            time.sleep(min(1.0, max(0.0, args.warmup)))
            camera_source = args.device
        else:
            camera_source = "test"
        env = os.environ.copy()
        env.update(
            {
                "LESAVKA_HEADLESS": "1",
                "LESAVKA_SERVER_ADDR": f"http://127.0.0.1:{args.port}",
                "LESAVKA_CAM_SOURCE": camera_source,
                "LESAVKA_CAM_WIDTH": str(width),
                "LESAVKA_CAM_HEIGHT": str(height),
                "LESAVKA_CAM_FPS": str(fps),
                "LESAVKA_CAM_CODEC": args.codec,
                "LESAVKA_CAM_FORMAT": "raw",
                "LESAVKA_REQUIRE_EXPLICIT_MEDIA_SOURCES": "1",
                "LESAVKA_DEV_MODE": "1",
            }
        )
        if not args.bundled:
            env["LESAVKA_MIC_DISABLE"] = "1"
            env["LESAVKA_LEGACY_SPLIT_UPLINK"] = "1"
        client_command = [str(client_binary), "--server", f"http://127.0.0.1:{args.port}"]
        (args.artifact_dir / "client-command.txt").write_text(" ".join(shlex.quote(part) for part in client_command) + "\n")
        client_proc = start_process(client_command, args.artifact_dir / "client.log", env=env)
        time.sleep(args.duration)
    finally:
        # Stop the client first, then the feeder, and the server last.
        client_rc = terminate_process(client_proc)
        feeder_rc = terminate_process(feeder_proc)
        if feeder_thread is not None:
            feeder_thread.join(timeout=2.0)
        server_rc = terminate_process(server_proc)

    raw_path = decode_packets(args, width, height)
    summary: dict[str, Any] = {
        "schema": "lesavka.client-uplink-virtual-camera.run.v1",
        "artifact_dir": str(args.artifact_dir),
        "source": args.source,
        "device": args.device,
        "mode": args.mode,
        "codec": args.codec,
        "bundled": args.bundled,
        "client_rc": client_rc,
        "feeder_rc": feeder_rc,
        "server_rc": server_rc,
        "decoded_raw": str(raw_path) if raw_path else None,
    }
    if raw_path is not None:
        summary["analysis"] = analyze_decoded_frames(args, probe, raw_path, width, height, fps)
    (args.artifact_dir / "summary.json").write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n")
    print(json.dumps(summary, indent=2, sort_keys=True))
    return 0
def main(argv: list[str] | None = None) -> int:
    """Parse command-line options and run the probe.

    Args:
        argv: Argument list to parse instead of ``sys.argv[1:]``. ``None``
            (the default) keeps the normal command-line behaviour.

    Returns:
        The exit status returned by ``run``.

    Raises:
        SystemExit: on invalid arguments, or when ``--source virtual`` is
            selected without ``--device``.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--source", choices=["virtual", "client-test-pattern"], default="virtual")
    parser.add_argument("--device", help="v4l2loopback device such as /dev/video10 for --source virtual")
    parser.add_argument("--mode", default="1280x720@30")
    parser.add_argument("--duration", type=float, default=20.0)
    parser.add_argument("--warmup", type=float, default=2.0)
    parser.add_argument("--codec", choices=["mjpeg", "hevc"], default="mjpeg")
    parser.add_argument("--bundled", action="store_true", help="exercise StreamWebcamMedia instead of legacy StreamCamera")
    parser.add_argument("--port", type=int, default=50051)
    parser.add_argument("--artifact-dir", type=pathlib.Path)
    parser.add_argument("--client-binary", default="target/debug/lesavka-client")
    parser.add_argument("--server-binary", default="target/debug/lesavka-uplink-dummy-server")
    parser.add_argument("--no-build", action="store_true")
    # Frame-analysis tuning options, forwarded to the probe's analyser.
    tuning_options = (
        ("--x-step", int, 64),
        ("--y-step", int, 16),
        ("--bands", int, 24),
        ("--mae-threshold", float, 10.0),
        ("--lower-mae-threshold", float, 16.0),
        ("--lower-skew-ratio", float, 1.5),
        ("--slab-var", float, 20.0),
        ("--shift-threshold", float, 12.0),
        ("--shift-improvement", float, 1.15),
        ("--sequence-window", int, 3),
        ("--mix-mae-threshold", float, 1.5),
        ("--mix-improvement", float, 1.8),
        ("--mix-min-bands", int, 2),
    )
    for flag, value_type, default in tuning_options:
        parser.add_argument(flag, type=value_type, default=default)
    args = parser.parse_args(argv)
    if args.artifact_dir is None:
        args.artifact_dir = default_artifact_dir(args.mode)
    if args.source == "virtual" and not args.device:
        raise SystemExit("--device is required for --source virtual; create one with v4l2loopback first")
    return run(args)
if __name__ == "__main__":
    # Script entry point: use main()'s return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)