#!/usr/bin/env python3
"""Feed synthetic media through the real Lesavka client capture/uplink path."""

from __future__ import annotations

import argparse
import importlib.util
import json
import os
import pathlib
import shlex
import subprocess
import threading
import time
from typing import Any

REPO_ROOT = pathlib.Path(__file__).resolve().parents[2]
RCT_PROBE = REPO_ROOT / "scripts/manual/run_synthetic_rct_uvc_probe.py"


def load_rct_probe_module() -> Any:
    """Import the sibling RCT probe script from ``RCT_PROBE`` as a module.

    The probe is loaded by file path and executed; the resulting module object
    is returned.

    Raises:
        RuntimeError: if no import spec/loader can be created for ``RCT_PROBE``.
    """
    spec = importlib.util.spec_from_file_location("lesavka_synthetic_rct_probe", RCT_PROBE)
    if spec is None or spec.loader is None:
        raise RuntimeError(f"could not load {RCT_PROBE}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


def parse_mode(mode: str) -> tuple[int, int, int]:
    """Parse a ``WIDTHxHEIGHT@FPS`` string into ``(width, height, fps)``.

    Parsing is case-insensitive (``640X480@60`` is accepted).

    Raises:
        ValueError: if the string is malformed or any component is not a
            positive integer.
    """
    try:
        size, fps_text = mode.lower().split("@", 1)
        width_text, height_text = size.split("x", 1)
        width, height, fps = int(width_text), int(height_text), int(fps_text)
    except ValueError as exc:
        raise ValueError(
            f"invalid mode {mode!r}; expected WIDTHxHEIGHT@FPS such as 1280x720@30"
        ) from exc
    # A zero-sized frame would make the fixed-size frame reader in
    # analyze_decoded_frames() loop forever, so reject it at the source.
    if min(width, height, fps) <= 0:
        raise ValueError(f"invalid mode {mode!r}; width, height and fps must be positive")
    return width, height, fps


def timestamp() -> str:
    """Return the current UTC time formatted as ``YYYYMMDD-HHMMSS``."""
    return time.strftime("%Y%m%d-%H%M%S", time.gmtime())


def default_artifact_dir(mode: str) -> pathlib.Path:
    """Return the default (relative) artifact directory for *mode*.

    The ``@`` in the mode is replaced by ``-`` and a UTC timestamp is appended.
    """
    safe = mode.replace("@", "-")
    return pathlib.Path("artifacts/client-uplink-virtual-camera") / f"{safe}-{timestamp()}"


def require_tool(name: str) -> None:
    """Exit with a message unless ``command -v name`` succeeds in a login shell.

    Raises:
        SystemExit: if the tool cannot be found.
    """
    lookup = ["sh", "-lc", f"command -v {shlex.quote(name)} >/dev/null"]
    if subprocess.run(lookup, check=False).returncode != 0:
        raise SystemExit(f"{name} is required for the client uplink virtual camera probe")


def start_process(
    command: list[str],
    log_path: pathlib.Path,
    env: dict[str, str] | None = None,
) -> subprocess.Popen[bytes]:
    """Start *command* from ``REPO_ROOT`` with stdout and stderr sent to *log_path*.

    The parent directory of *log_path* is created if needed and the log file is
    truncated.
    """
    log_path.parent.mkdir(parents=True, exist_ok=True)
    # The child receives its own copy of the descriptor when it is spawned, so
    # the parent's handle can be closed as soon as Popen() returns.
    with log_path.open("wb") as log:
        return subprocess.Popen(
            command,
            stdout=log,
            stderr=subprocess.STDOUT,
            env=env,
            cwd=REPO_ROOT,
        )


def terminate_process(proc: subprocess.Popen[bytes] | None, timeout_s: float = 5.0) -> int | None:
    """Stop *proc* and return its exit code.

    Returns ``None`` when *proc* is ``None``. A process that has already exited
    just reports its return code. Otherwise ``terminate()`` is sent, and
    ``kill()`` follows if the process has not exited within *timeout_s* seconds.
    """
    if proc is None:
        return None
    if proc.poll() is not None:
        return proc.returncode
    proc.terminate()
    try:
        return proc.wait(timeout=timeout_s)
    except subprocess.TimeoutExpired:
        proc.kill()
        return proc.wait(timeout=timeout_s)


def build_binaries(args: argparse.Namespace) -> None:
    """Build the client and dummy-server binaries with cargo unless ``--no-build``.

    Raises:
        subprocess.CalledProcessError: if the cargo build fails.
    """
    if args.no_build:
        return
    subprocess.run(
        [
            "cargo",
            "build",
            "-p",
            "lesavka_client",
            "--bin",
            "lesavka-client",
            "--bin",
            "lesavka-uplink-dummy-server",
        ],
        cwd=REPO_ROOT,
        check=True,
    )


def start_synthetic_frames_to_v4l2(
    args: argparse.Namespace, probe: Any, width: int, height: int, fps: int
) -> tuple[subprocess.Popen[bytes], threading.Thread]:
    """Pipe synthetic gray frames through ffmpeg into the v4l2 device.

    ffmpeg reads raw gray frames on stdin, converts them to ``yuyv422`` and
    writes them to ``args.device``. A daemon thread generates frames with
    ``probe.synthetic_gray`` and paces them at *fps* for
    ``args.duration + args.warmup`` seconds.

    Returns:
        The ffmpeg process and the feeder thread.

    Raises:
        SystemExit: if ``args.device`` is not set.
    """
    if not args.device:
        raise SystemExit("--device /dev/videoN is required for --source virtual")
    command = [
        "ffmpeg",
        "-hide_banner",
        "-loglevel",
        "warning",
        "-f",
        "rawvideo",
        "-pix_fmt",
        "gray",
        "-video_size",
        f"{width}x{height}",
        "-framerate",
        str(fps),
        "-i",
        "-",
        "-vf",
        "format=yuyv422",
        "-f",
        "v4l2",
        args.device,
    ]
    stderr_path = args.artifact_dir / "virtual-camera-ffmpeg.stderr"
    # ffmpeg keeps its own copy of the descriptor; close ours right away.
    with stderr_path.open("wb") as stderr_log:
        proc = subprocess.Popen(
            command,
            stdin=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
            stderr=stderr_log,
        )
    stdin = proc.stdin
    if stdin is None:
        # Cannot happen with stdin=PIPE; checked explicitly so it survives -O.
        raise RuntimeError("ffmpeg feeder was started without a stdin pipe")

    def writer() -> None:
        frame_count = int((args.duration + args.warmup) * fps)
        started = time.monotonic()
        try:
            for sequence in range(frame_count):
                if proc.poll() is not None:
                    return
                stdin.write(probe.synthetic_gray(width, height, sequence))
                stdin.flush()
                # Pace against the absolute schedule so per-frame jitter does
                # not accumulate into drift.
                target = started + (sequence + 1) / max(1, fps)
                delay = target - time.monotonic()
                if delay > 0:
                    time.sleep(delay)
        except BrokenPipeError:
            return
        finally:
            try:
                stdin.close()
            except (OSError, ValueError):
                # Best effort: ffmpeg may already have gone away, in which case
                # flushing the pipe on close fails.
                pass

    thread = threading.Thread(target=writer, name="lesavka-virtual-camera-feeder", daemon=True)
    thread.start()
    return proc, thread


def decode_packets(args: argparse.Namespace, width: int, height: int) -> pathlib.Path | None:
    """Concatenate captured video packets and decode them to raw gray frames.

    Packets are read from ``<artifact_dir>/dummy-server/video-packets/*.bin`` in
    sorted filename order, joined into one elementary stream and decoded with
    ffmpeg. The ffmpeg command line is recorded in ``decode-command.txt``.

    Returns:
        The path of the raw gray file, or ``None`` when there are no packets or
        the decoded output holds less than one full frame.
    """
    packet_dir = args.artifact_dir / "dummy-server" / "video-packets"
    packets = sorted(packet_dir.glob("*.bin"))
    if not packets:
        return None
    stream_name = "client-uplink.mjpeg" if args.codec == "mjpeg" else "client-uplink.hevc"
    stream_path = args.artifact_dir / stream_name
    with stream_path.open("wb") as stream:
        for packet in packets:
            stream.write(packet.read_bytes())
    raw_path = args.artifact_dir / "client-uplink.gray"
    demuxer = "mjpeg" if args.codec == "mjpeg" else "hevc"
    command = [
        "ffmpeg",
        "-hide_banner",
        "-loglevel",
        "warning",
        "-f",
        demuxer,
        "-i",
        str(stream_path),
        "-an",
        "-pix_fmt",
        "gray",
        "-f",
        "rawvideo",
        str(raw_path),
    ]
    (args.artifact_dir / "decode-command.txt").write_text(
        " ".join(shlex.quote(part) for part in command) + "\n"
    )
    with (args.artifact_dir / "decode-ffmpeg.stderr").open("wb") as stderr_log:
        subprocess.run(command, cwd=REPO_ROOT, check=False, stderr=stderr_log)
    if not raw_path.exists() or raw_path.stat().st_size < width * height:
        return None
    return raw_path


def analyze_decoded_frames(
    args: argparse.Namespace,
    probe: Any,
    raw_path: pathlib.Path,
    width: int,
    height: int,
    fps: int,
) -> dict[str, Any]:
    """Run ``probe.analyze_frame`` over every decoded frame and summarize.

    Frames are read as consecutive ``width * height`` byte chunks from
    *raw_path*; a trailing partial frame is ignored. Each per-frame result is
    written as one JSON line to ``client-uplink-frame-metrics.jsonl`` in the
    artifact directory.

    Returns:
        A summary dict with frame counts, per-reason counters and up to 30
        suspicious frames ranked by ``lower_mae`` then ``total_mae``.

    Raises:
        ValueError: if ``width * height`` is not positive.
    """
    frame_size = width * height
    if frame_size <= 0:
        # read(0) always returns b"", which would match a zero frame size and
        # never terminate the loop below.
        raise ValueError(f"frame size must be positive, got {width}x{height}")
    previous_seq: int | None = None
    frame_index = 0
    decoded = 0
    suspicious = 0
    visual = 0
    reason_counts: dict[str, int] = {}
    visual_reason_counts: dict[str, int] = {}
    worst: list[dict[str, Any]] = []
    # The probe's analyzer only receives the tuning knobs it needs, not the
    # whole CLI namespace.
    analysis_args = argparse.Namespace(
        x_step=args.x_step,
        y_step=args.y_step,
        sequence_window=args.sequence_window,
        bands=args.bands,
        mix_mae_threshold=args.mix_mae_threshold,
        mix_improvement=args.mix_improvement,
        mix_min_bands=args.mix_min_bands,
        lower_mae_threshold=args.lower_mae_threshold,
        lower_skew_ratio=args.lower_skew_ratio,
        mae_threshold=args.mae_threshold,
        slab_var=args.slab_var,
        shift_threshold=args.shift_threshold,
        shift_improvement=args.shift_improvement,
    )
    metrics_path = args.artifact_dir / "client-uplink-frame-metrics.jsonl"
    with raw_path.open("rb") as raw, metrics_path.open("w") as metrics:
        while True:
            frame = raw.read(frame_size)
            if len(frame) != frame_size:
                break
            frame_index += 1
            result = probe.analyze_frame(frame, width, height, analysis_args, previous_seq)
            comparison_seq = result.get("comparison_sequence")
            if comparison_seq is not None:
                previous_seq = int(comparison_seq)
            if result.get("decoded_sequence") is not None:
                decoded += 1
            if result["suspicious"]:
                suspicious += 1
                for reason in result["reasons"]:
                    reason_counts[reason] = reason_counts.get(reason, 0) + 1
                worst.append({"frame": frame_index, **result})
                # Keep only the 30 worst frames so memory stays bounded.
                worst = sorted(
                    worst,
                    key=lambda item: (item["lower_mae"], item["total_mae"]),
                    reverse=True,
                )[:30]
            if result["visual_suspicious"]:
                visual += 1
                for reason in result["visual_reasons"]:
                    visual_reason_counts[reason] = visual_reason_counts.get(reason, 0) + 1
            metrics.write(json.dumps({"frame": frame_index, **result}, sort_keys=True) + "\n")
    return {
        "schema": "lesavka.client-uplink-virtual-camera.analysis.v1",
        "mode": args.mode,
        "codec": args.codec,
        "frames": frame_index,
        "fps_expected": fps,
        "decoded_markers": decoded,
        "suspicious": suspicious,
        "visual_suspicious": visual,
        "reasons": reason_counts,
        "visual_reasons": visual_reason_counts,
        "worst": worst,
    }


def run(args: argparse.Namespace) -> int:
    """Run one probe: dummy server, optional feeder, client, decode, analysis.

    The dummy server and client are started as child processes, the client is
    left running for ``args.duration`` seconds, and all children are then
    stopped. Captured packets are decoded and analyzed; the summary is written
    to ``summary.json`` in the artifact directory and printed to stdout.

    Returns:
        ``0`` once the summary has been written.

    Raises:
        SystemExit: if ffmpeg is missing or the dummy server exits within the
            first second.
    """
    require_tool("ffmpeg")
    build_binaries(args)
    probe = load_rct_probe_module()
    width, height, fps = parse_mode(args.mode)
    # Child processes run with cwd=REPO_ROOT, so a relative artifact path would
    # mean a different directory to them than to this script. Pin it to an
    # absolute path (relative to the caller's cwd) before anyone uses it.
    args.artifact_dir = pathlib.Path(args.artifact_dir).resolve()
    args.artifact_dir.mkdir(parents=True, exist_ok=True)

    server_binary = pathlib.Path(args.server_binary)
    client_binary = pathlib.Path(args.client_binary)
    if not server_binary.is_absolute():
        server_binary = REPO_ROOT / server_binary
    if not client_binary.is_absolute():
        client_binary = REPO_ROOT / client_binary

    dummy_dir = args.artifact_dir / "dummy-server"
    listen = f"127.0.0.1:{args.port}"
    bundled_flag = "true" if args.bundled else "false"
    server_command = [
        str(server_binary),
        "--listen",
        listen,
        "--artifact-dir",
        str(dummy_dir),
        "--width",
        str(width),
        "--height",
        str(height),
        "--fps",
        str(fps),
        "--codec",
        args.codec,
        "--bundled",
        bundled_flag,
        "--microphone",
        bundled_flag,
    ]
    (args.artifact_dir / "dummy-server-command.txt").write_text(
        " ".join(shlex.quote(part) for part in server_command) + "\n"
    )
    server_proc = start_process(server_command, args.artifact_dir / "dummy-server.log")
    # Give the server a moment to start (or fail) before the client connects.
    time.sleep(1.0)
    if server_proc.poll() is not None:
        raise SystemExit(
            f"dummy server exited early rc={server_proc.returncode}; "
            f"see {args.artifact_dir / 'dummy-server.log'}"
        )

    feeder_proc: subprocess.Popen[bytes] | None = None
    feeder_thread: threading.Thread | None = None
    client_proc: subprocess.Popen[bytes] | None = None
    try:
        if args.source == "virtual":
            feeder_proc, feeder_thread = start_synthetic_frames_to_v4l2(
                args, probe, width, height, fps
            )
            time.sleep(min(1.0, max(0.0, args.warmup)))
            camera_source = args.device
        else:
            camera_source = "test"
        env = os.environ.copy()
        env.update(
            {
                "LESAVKA_HEADLESS": "1",
                "LESAVKA_SERVER_ADDR": f"http://127.0.0.1:{args.port}",
                "LESAVKA_CAM_SOURCE": camera_source,
                "LESAVKA_CAM_WIDTH": str(width),
                "LESAVKA_CAM_HEIGHT": str(height),
                "LESAVKA_CAM_FPS": str(fps),
                "LESAVKA_CAM_CODEC": args.codec,
                "LESAVKA_CAM_FORMAT": "raw",
                "LESAVKA_REQUIRE_EXPLICIT_MEDIA_SOURCES": "1",
                "LESAVKA_DEV_MODE": "1",
            }
        )
        if not args.bundled:
            env["LESAVKA_MIC_DISABLE"] = "1"
            env["LESAVKA_LEGACY_SPLIT_UPLINK"] = "1"
        client_command = [str(client_binary), "--server", f"http://127.0.0.1:{args.port}"]
        (args.artifact_dir / "client-command.txt").write_text(
            " ".join(shlex.quote(part) for part in client_command) + "\n"
        )
        client_proc = start_process(client_command, args.artifact_dir / "client.log", env=env)
        time.sleep(args.duration)
    finally:
        # Stop the client first, then the feeder, then the server.
        client_rc = terminate_process(client_proc)
        feeder_rc = terminate_process(feeder_proc)
        if feeder_thread is not None:
            feeder_thread.join(timeout=2.0)
        server_rc = terminate_process(server_proc)

    raw_path = decode_packets(args, width, height)
    summary: dict[str, Any] = {
        "schema": "lesavka.client-uplink-virtual-camera.run.v1",
        "artifact_dir": str(args.artifact_dir),
        "source": args.source,
        "device": args.device,
        "mode": args.mode,
        "codec": args.codec,
        "bundled": args.bundled,
        "client_rc": client_rc,
        "feeder_rc": feeder_rc,
        "server_rc": server_rc,
        "decoded_raw": str(raw_path) if raw_path else None,
    }
    if raw_path is not None:
        summary["analysis"] = analyze_decoded_frames(args, probe, raw_path, width, height, fps)
    rendered = json.dumps(summary, indent=2, sort_keys=True)
    (args.artifact_dir / "summary.json").write_text(rendered + "\n")
    print(rendered)
    return 0


def main() -> int:
    """Parse command-line arguments and run the probe.

    Returns:
        The exit status from :func:`run`.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--source", choices=["virtual", "client-test-pattern"], default="virtual")
    parser.add_argument("--device", help="v4l2loopback device such as /dev/video10 for --source virtual")
    parser.add_argument("--mode", default="1280x720@30")
    parser.add_argument("--duration", type=float, default=20.0)
    parser.add_argument("--warmup", type=float, default=2.0)
    parser.add_argument("--codec", choices=["mjpeg", "hevc"], default="mjpeg")
    parser.add_argument("--bundled", action="store_true", help="exercise StreamWebcamMedia instead of legacy StreamCamera")
    parser.add_argument("--port", type=int, default=50051)
    parser.add_argument("--artifact-dir", type=pathlib.Path)
    parser.add_argument("--client-binary", default="target/debug/lesavka-client")
    parser.add_argument("--server-binary", default="target/debug/lesavka-uplink-dummy-server")
    parser.add_argument("--no-build", action="store_true")
    parser.add_argument("--x-step", type=int, default=64)
    parser.add_argument("--y-step", type=int, default=16)
    parser.add_argument("--bands", type=int, default=24)
    parser.add_argument("--mae-threshold", type=float, default=10.0)
    parser.add_argument("--lower-mae-threshold", type=float, default=16.0)
    parser.add_argument("--lower-skew-ratio", type=float, default=1.5)
    parser.add_argument("--slab-var", type=float, default=20.0)
    parser.add_argument("--shift-threshold", type=float, default=12.0)
    parser.add_argument("--shift-improvement", type=float, default=1.15)
    parser.add_argument("--sequence-window", type=int, default=3)
    parser.add_argument("--mix-mae-threshold", type=float, default=1.5)
    parser.add_argument("--mix-improvement", type=float, default=1.8)
    parser.add_argument("--mix-min-bands", type=int, default=2)
    args = parser.parse_args()
    # Reject a bad --mode before spending time on the cargo build.
    try:
        parse_mode(args.mode)
    except ValueError as exc:
        parser.error(str(exc))
    if args.artifact_dir is None:
        args.artifact_dir = default_artifact_dir(args.mode)
    if args.source == "virtual" and not args.device:
        raise SystemExit("--device is required for --source virtual; create one with v4l2loopback first")
    return run(args)


if __name__ == "__main__":
    raise SystemExit(main())