377 lines
14 KiB
Python
377 lines
14 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""Feed synthetic media through the real Lesavka client capture/uplink path."""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import argparse
|
||
|
|
import importlib.util
|
||
|
|
import json
|
||
|
|
import os
|
||
|
|
import pathlib
|
||
|
|
import shlex
|
||
|
|
import subprocess
|
||
|
|
import threading
|
||
|
|
import time
|
||
|
|
from typing import Any
|
||
|
|
|
||
|
|
# Directory two levels above the directory that contains this script; used as
# the working directory for child processes and as the base for relative paths.
REPO_ROOT = pathlib.Path(__file__).resolve().parents[2]
# Path of the sibling probe script that is loaded dynamically at runtime so its
# helpers (frame synthesis and frame analysis) can be reused here.
RCT_PROBE = REPO_ROOT / "scripts/manual/run_synthetic_rct_uvc_probe.py"
|
||
|
|
|
||
|
|
|
||
|
|
def load_rct_probe_module() -> Any:
    """Import the probe script at ``RCT_PROBE`` by file path and return the module.

    Raises:
        RuntimeError: if no import spec, or no loader, can be built for the file.
    """
    module_spec = importlib.util.spec_from_file_location("lesavka_synthetic_rct_probe", RCT_PROBE)
    loader = None if module_spec is None else module_spec.loader
    if loader is None:
        raise RuntimeError(f"could not load {RCT_PROBE}")
    probe_module = importlib.util.module_from_spec(module_spec)
    # Executes the script's top-level code inside the freshly created module.
    loader.exec_module(probe_module)
    return probe_module
|
||
|
|
|
||
|
|
|
||
|
|
def parse_mode(mode: str) -> tuple[int, int, int]:
    """Parse a ``WIDTHxHEIGHT@FPS`` mode string into ``(width, height, fps)``.

    The string is matched case-insensitively, so ``1280X720@30`` is accepted.

    Args:
        mode: Mode description such as ``"1280x720@30"``.

    Returns:
        The width, height and frame rate as positive integers.

    Raises:
        ValueError: if the string is not of the expected shape, a component is
            not an integer, or any component is zero or negative.
    """
    expected = "expected WIDTHxHEIGHT@FPS such as 1280x720@30"
    try:
        size, fps_text = mode.strip().lower().split("@", 1)
        width_text, height_text = size.split("x", 1)
        width, height, fps = int(width_text), int(height_text), int(fps_text)
    except ValueError as err:
        # Both a failed unpack and a failed int() surface as ValueError; re-raise
        # with a message that names the offending input.
        raise ValueError(f"invalid mode {mode!r}; {expected}") from err
    if width <= 0 or height <= 0 or fps <= 0:
        # A zero-sized frame or zero frame rate cannot be captured or analysed.
        raise ValueError(f"invalid mode {mode!r}: values must be positive; {expected}")
    return width, height, fps
|
||
|
|
|
||
|
|
|
||
|
|
def timestamp() -> str:
    """Return the current UTC time formatted as ``YYYYMMDD-HHMMSS``."""
    utc_now = time.gmtime()
    return time.strftime("%Y%m%d-%H%M%S", utc_now)
|
||
|
|
|
||
|
|
|
||
|
|
def default_artifact_dir(mode: str) -> pathlib.Path:
    """Build the default artifact directory path for a run in ``mode``.

    The ``@`` in the mode string is replaced with ``-`` and a UTC timestamp is
    appended, e.g. ``artifacts/client-uplink-virtual-camera/1280x720-30-<stamp>``.
    The returned path is relative (it resolves against the current working
    directory) and is not created here.
    """
    # The previous `.replace("x", "x")` was a no-op and has been dropped.
    safe = mode.replace("@", "-")
    return pathlib.Path("artifacts/client-uplink-virtual-camera") / f"{safe}-{timestamp()}"
|
||
|
|
|
||
|
|
|
||
|
|
def require_tool(name: str) -> None:
    """Exit with an explanatory message unless executable ``name`` is on PATH.

    Args:
        name: Executable name (or path) that later subprocess calls rely on.

    Raises:
        SystemExit: if the executable cannot be found.
    """
    # Function-scope import: the module's import block does not provide shutil.
    import shutil

    # Look the tool up against this process's own PATH, which is what the later
    # subprocess calls use. A login shell (`sh -lc`) may rewrite PATH from
    # profile scripts and also reports shell builtins/functions as present.
    if shutil.which(name) is None:
        raise SystemExit(f"{name} is required for the client uplink virtual camera probe")
|
||
|
|
|
||
|
|
|
||
|
|
def start_process(command: list[str], log_path: pathlib.Path, env: dict[str, str] | None = None) -> subprocess.Popen[bytes]:
    """Start ``command`` with stdout and stderr redirected to ``log_path``.

    Args:
        command: Argument vector to execute.
        log_path: File that receives the child's combined output; its parent
            directory is created if needed and the file is truncated.
        env: Optional environment for the child; ``None`` inherits the current one.

    Returns:
        The running ``subprocess.Popen`` handle. The child runs with
        ``REPO_ROOT`` as its working directory.
    """
    log_path.parent.mkdir(parents=True, exist_ok=True)
    # The child receives its own copy of the descriptor when it is spawned, so
    # the parent's handle can be closed as soon as Popen returns instead of
    # being leaked for the lifetime of this script.
    with log_path.open("wb") as log:
        return subprocess.Popen(command, stdout=log, stderr=subprocess.STDOUT, env=env, cwd=REPO_ROOT)
|
||
|
|
|
||
|
|
|
||
|
|
def terminate_process(proc: subprocess.Popen[bytes] | None, timeout_s: float = 5.0) -> int | None:
    """Stop ``proc`` and return its exit code.

    Args:
        proc: Process to stop; ``None`` means nothing was started.
        timeout_s: Seconds to wait after ``terminate()`` and again after ``kill()``.

    Returns:
        ``None`` when ``proc`` is ``None``; otherwise the process's return code,
        whether it had already exited or was stopped here.
    """
    if proc is None:
        return None

    exit_code = proc.poll()
    if exit_code is not None:
        # Already finished on its own; nothing to signal.
        return exit_code

    proc.terminate()
    try:
        exit_code = proc.wait(timeout=timeout_s)
    except subprocess.TimeoutExpired:
        # Did not exit within the grace period; escalate to kill().
        proc.kill()
        exit_code = proc.wait(timeout=timeout_s)
    return exit_code
|
||
|
|
|
||
|
|
|
||
|
|
def build_binaries(args: argparse.Namespace) -> None:
    """Build the client and dummy-server binaries with cargo unless disabled.

    Does nothing when ``args.no_build`` is truthy. Otherwise runs ``cargo build``
    in ``REPO_ROOT``; a failing build raises ``subprocess.CalledProcessError``.
    """
    if not args.no_build:
        cargo_command = ["cargo", "build", "-p", "lesavka_client"]
        for binary in ("lesavka-client", "lesavka-uplink-dummy-server"):
            cargo_command += ["--bin", binary]
        subprocess.run(cargo_command, cwd=REPO_ROOT, check=True)
|
||
|
|
|
||
|
|
|
||
|
|
def start_synthetic_frames_to_v4l2(
    args: argparse.Namespace, probe: Any, width: int, height: int, fps: int
) -> tuple[subprocess.Popen[bytes], threading.Thread]:
    """Start ffmpeg writing synthetic frames to a V4L2 device, plus its feeder thread.

    ffmpeg reads raw 8-bit gray frames on stdin, converts them to ``yuyv422``
    and writes them to ``args.device``. A daemon thread produces the frames
    with ``probe.synthetic_gray(width, height, sequence)`` and paces them at
    ``fps`` for ``args.duration + args.warmup`` seconds.

    Args:
        args: Parsed options; reads ``device``, ``artifact_dir``, ``duration``
            and ``warmup``. ffmpeg's stderr is written to
            ``virtual-camera-ffmpeg.stderr`` inside ``artifact_dir``.
        probe: Module providing ``synthetic_gray``.
        width: Frame width in pixels.
        height: Frame height in pixels.
        fps: Frames per second to feed.

    Returns:
        The ffmpeg process and the already-started feeder thread.

    Raises:
        SystemExit: if ``args.device`` is not set.
        RuntimeError: if ffmpeg was started without a stdin pipe.
    """
    if not args.device:
        raise SystemExit("--device /dev/videoN is required for --source virtual")
    command = [
        "ffmpeg",
        "-hide_banner",
        "-loglevel",
        "warning",
        "-f",
        "rawvideo",
        "-pix_fmt",
        "gray",
        "-video_size",
        f"{width}x{height}",
        "-framerate",
        str(fps),
        "-i",
        "-",
        "-vf",
        "format=yuyv422",
        "-f",
        "v4l2",
        args.device,
    ]
    # The child gets its own copy of the stderr descriptor, so the parent's
    # handle is closed right after spawn instead of being leaked.
    with (args.artifact_dir / "virtual-camera-ffmpeg.stderr").open("wb") as stderr_log:
        proc = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, stderr=stderr_log)
    feeder_stdin = proc.stdin
    if feeder_stdin is None:
        # Cannot happen with stdin=PIPE; an explicit check survives `python -O`.
        raise RuntimeError("ffmpeg was started without a stdin pipe")

    def writer() -> None:
        frame_count = int((args.duration + args.warmup) * fps)
        started = time.monotonic()
        try:
            for sequence in range(frame_count):
                if proc.poll() is not None:
                    # ffmpeg has exited; nothing is left to consume frames.
                    return
                feeder_stdin.write(probe.synthetic_gray(width, height, sequence))
                feeder_stdin.flush()
                # Pace against absolute deadlines from the start time so that
                # per-frame jitter does not accumulate into drift.
                target = started + (sequence + 1) / max(1, fps)
                delay = target - time.monotonic()
                if delay > 0:
                    time.sleep(delay)
        except BrokenPipeError:
            # ffmpeg closed its end of the pipe (for example it was terminated).
            return
        finally:
            try:
                feeder_stdin.close()
            except OSError:
                # Closing flushes buffered data, which can fail once ffmpeg is
                # gone; the pipe is unusable either way, so this is best-effort.
                pass

    thread = threading.Thread(target=writer, name="lesavka-virtual-camera-feeder", daemon=True)
    thread.start()
    return proc, thread
|
||
|
|
|
||
|
|
|
||
|
|
def decode_packets(args: argparse.Namespace, width: int, height: int) -> pathlib.Path | None:
    """Concatenate captured video packets and decode them to raw gray frames.

    Packets are read from ``<artifact_dir>/dummy-server/video-packets/*.bin`` in
    sorted filename order, joined into ``client-uplink.mjpeg`` or
    ``client-uplink.hevc`` (chosen by ``args.codec``), and decoded by ffmpeg
    into ``client-uplink.gray``. The ffmpeg command line is recorded in
    ``decode-command.txt`` and its stderr in ``decode-ffmpeg.stderr``.

    Args:
        args: Parsed options; reads ``artifact_dir`` and ``codec``.
        width: Expected frame width in pixels.
        height: Expected frame height in pixels.

    Returns:
        Path of the decoded raw file, or ``None`` when there are no packets or
        the decode produced less than one full ``width * height`` frame.
    """
    packet_dir = args.artifact_dir / "dummy-server" / "video-packets"
    # NOTE(review): lexicographic order equals capture order only if the packet
    # file names are zero-padded — confirm against the dummy server.
    packets = sorted(packet_dir.glob("*.bin"))
    if not packets:
        return None
    is_mjpeg = args.codec == "mjpeg"
    stream_path = args.artifact_dir / ("client-uplink.mjpeg" if is_mjpeg else "client-uplink.hevc")
    with stream_path.open("wb") as stream:
        for packet in packets:
            stream.write(packet.read_bytes())
    raw_path = args.artifact_dir / "client-uplink.gray"
    # Remove output left by an earlier run in a reused artifact directory so a
    # failed or refused decode can never be mistaken for fresh frames.
    try:
        raw_path.unlink()
    except FileNotFoundError:
        pass
    demuxer = "mjpeg" if is_mjpeg else "hevc"
    command = [
        "ffmpeg",
        "-hide_banner",
        "-loglevel",
        "warning",
        "-f",
        demuxer,
        "-i",
        str(stream_path),
        "-an",
        "-pix_fmt",
        "gray",
        "-f",
        "rawvideo",
        str(raw_path),
    ]
    (args.artifact_dir / "decode-command.txt").write_text(" ".join(shlex.quote(part) for part in command) + "\n")
    # check=False: a failed decode is reported through the size test below; the
    # stderr log is closed once ffmpeg has finished rather than leaked.
    with (args.artifact_dir / "decode-ffmpeg.stderr").open("wb") as stderr_log:
        subprocess.run(command, cwd=REPO_ROOT, check=False, stderr=stderr_log)
    if not raw_path.exists() or raw_path.stat().st_size < width * height:
        return None
    return raw_path
|
||
|
|
|
||
|
|
|
||
|
|
def analyze_decoded_frames(args: argparse.Namespace, probe: Any, raw_path: pathlib.Path, width: int, height: int, fps: int) -> dict[str, Any]:
    """Run the probe's per-frame analysis over a raw gray file and summarise it.

    The file is read in ``width * height`` byte frames; a trailing partial frame
    ends the scan. Each frame goes through ``probe.analyze_frame`` and the
    result is appended, one JSON object per line, to
    ``client-uplink-frame-metrics.jsonl`` in ``args.artifact_dir``.

    Args:
        args: Parsed options; supplies ``artifact_dir``, ``mode``, ``codec`` and
            the analysis tuning values forwarded to the probe.
        probe: Module providing ``analyze_frame``.
        raw_path: Decoded raw gray frames.
        width: Frame width in pixels.
        height: Frame height in pixels.
        fps: Expected frame rate; only echoed into the summary.

    Returns:
        Summary dict with frame counts, per-reason tallies and up to 30 of the
        worst suspicious frames (highest ``lower_mae``, then ``total_mae``).
    """
    forwarded_options = (
        "x_step",
        "y_step",
        "sequence_window",
        "bands",
        "mix_mae_threshold",
        "mix_improvement",
        "mix_min_bands",
        "lower_mae_threshold",
        "lower_skew_ratio",
        "mae_threshold",
        "slab_var",
        "shift_threshold",
        "shift_improvement",
    )
    # The probe only receives the tuning knobs, not the full option set.
    analysis_args = argparse.Namespace(**{name: getattr(args, name) for name in forwarded_options})

    def tally(counts: dict[str, int], reasons: Any) -> None:
        for reason in reasons:
            counts[reason] = counts.get(reason, 0) + 1

    bytes_per_frame = width * height
    last_seq: int | None = None
    frames_seen = 0
    marker_frames = 0
    suspicious_frames = 0
    visual_frames = 0
    reason_counts: dict[str, int] = {}
    visual_reason_counts: dict[str, int] = {}
    worst: list[dict[str, Any]] = []
    metrics_path = args.artifact_dir / "client-uplink-frame-metrics.jsonl"

    with raw_path.open("rb") as raw, metrics_path.open("w") as metrics:
        while True:
            frame = raw.read(bytes_per_frame)
            if len(frame) != bytes_per_frame:
                break
            frames_seen += 1
            result = probe.analyze_frame(frame, width, height, analysis_args, last_seq)
            record = {"frame": frames_seen, **result}

            # Carry the sequence forward so the next frame is analysed against it.
            comparison_seq = result.get("comparison_sequence")
            if comparison_seq is not None:
                last_seq = int(comparison_seq)
            if result.get("decoded_sequence") is not None:
                marker_frames += 1

            if result["suspicious"]:
                suspicious_frames += 1
                tally(reason_counts, result["reasons"])
                # Keep only the 30 worst offenders seen so far.
                worst.append(record)
                worst.sort(key=lambda item: (item["lower_mae"], item["total_mae"]), reverse=True)
                del worst[30:]

            if result["visual_suspicious"]:
                visual_frames += 1
                tally(visual_reason_counts, result["visual_reasons"])

            metrics.write(json.dumps(record, sort_keys=True) + "\n")

    return {
        "schema": "lesavka.client-uplink-virtual-camera.analysis.v1",
        "mode": args.mode,
        "codec": args.codec,
        "frames": frames_seen,
        "fps_expected": fps,
        "decoded_markers": marker_frames,
        "suspicious": suspicious_frames,
        "visual_suspicious": visual_frames,
        "reasons": reason_counts,
        "visual_reasons": visual_reason_counts,
        "worst": worst,
    }
|
||
|
|
|
||
|
|
|
||
|
|
def run(args: argparse.Namespace) -> int:
    """Run one end-to-end probe: dummy server, optional feeder, client, analysis.

    Steps, in order: check for ffmpeg, build the binaries (unless disabled),
    load the probe module, start the dummy uplink server, optionally start the
    synthetic V4L2 feeder, start the client for ``args.duration`` seconds, stop
    everything, decode the captured packets and analyse the decoded frames.
    The summary is written to ``summary.json`` in ``args.artifact_dir`` and
    printed to stdout.

    Args:
        args: Parsed command-line options (see ``main``).

    Returns:
        Always ``0``; results, including child exit codes, are reported through
        the summary rather than through the return value.

    Raises:
        SystemExit: if the dummy server exits within one second of starting.
    """
    require_tool("ffmpeg")
    build_binaries(args)
    probe = load_rct_probe_module()
    width, height, fps = parse_mode(args.mode)
    args.artifact_dir.mkdir(parents=True, exist_ok=True)
    server_binary = pathlib.Path(args.server_binary)
    client_binary = pathlib.Path(args.client_binary)
    # Relative binary paths are resolved against the repository root.
    if not server_binary.is_absolute():
        server_binary = REPO_ROOT / server_binary
    if not client_binary.is_absolute():
        client_binary = REPO_ROOT / client_binary

    dummy_dir = args.artifact_dir / "dummy-server"
    listen = f"127.0.0.1:{args.port}"
    server_command = [
        str(server_binary),
        "--listen",
        listen,
        "--artifact-dir",
        str(dummy_dir),
        "--width",
        str(width),
        "--height",
        str(height),
        "--fps",
        str(fps),
        "--codec",
        args.codec,
        "--bundled",
        "true" if args.bundled else "false",
        # The microphone flag is given the same value as --bundled.
        "--microphone",
        "true" if args.bundled else "false",
    ]
    # Record the exact command line next to the logs for later reproduction.
    (args.artifact_dir / "dummy-server-command.txt").write_text(" ".join(shlex.quote(part) for part in server_command) + "\n")
    server_proc = start_process(server_command, args.artifact_dir / "dummy-server.log")
    # Fixed one-second grace period; a server that has already exited by then
    # is treated as a startup failure.
    time.sleep(1.0)
    if server_proc.poll() is not None:
        raise SystemExit(f"dummy server exited early rc={server_proc.returncode}; see {args.artifact_dir / 'dummy-server.log'}")

    # Initialised to None so the finally block can run no matter how far the
    # try block got.
    feeder_proc: subprocess.Popen[bytes] | None = None
    feeder_thread: threading.Thread | None = None
    client_proc: subprocess.Popen[bytes] | None = None
    try:
        if args.source == "virtual":
            feeder_proc, feeder_thread = start_synthetic_frames_to_v4l2(args, probe, width, height, fps)
            # Wait at most one second of the warmup before starting the client.
            time.sleep(min(1.0, max(0.0, args.warmup)))
            camera_source = args.device
        else:
            camera_source = "test"
        # Client configuration is passed through environment variables layered
        # on top of the current environment.
        env = os.environ.copy()
        env.update(
            {
                "LESAVKA_HEADLESS": "1",
                "LESAVKA_SERVER_ADDR": f"http://127.0.0.1:{args.port}",
                "LESAVKA_CAM_SOURCE": camera_source,
                "LESAVKA_CAM_WIDTH": str(width),
                "LESAVKA_CAM_HEIGHT": str(height),
                "LESAVKA_CAM_FPS": str(fps),
                "LESAVKA_CAM_CODEC": args.codec,
                "LESAVKA_CAM_FORMAT": "raw",
                "LESAVKA_REQUIRE_EXPLICIT_MEDIA_SOURCES": "1",
                "LESAVKA_DEV_MODE": "1",
            }
        )
        if not args.bundled:
            env["LESAVKA_MIC_DISABLE"] = "1"
            env["LESAVKA_LEGACY_SPLIT_UPLINK"] = "1"
        client_command = [str(client_binary), "--server", f"http://127.0.0.1:{args.port}"]
        (args.artifact_dir / "client-command.txt").write_text(" ".join(shlex.quote(part) for part in client_command) + "\n")
        client_proc = start_process(client_command, args.artifact_dir / "client.log", env=env)
        # Let the client stream for the requested duration.
        time.sleep(args.duration)
    finally:
        # Teardown order: client first, then the feeder, then the server.
        client_rc = terminate_process(client_proc)
        feeder_rc = terminate_process(feeder_proc)
        if feeder_thread is not None:
            feeder_thread.join(timeout=2.0)
        server_rc = terminate_process(server_proc)

    raw_path = decode_packets(args, width, height)
    summary: dict[str, Any] = {
        "schema": "lesavka.client-uplink-virtual-camera.run.v1",
        "artifact_dir": str(args.artifact_dir),
        "source": args.source,
        "device": args.device,
        "mode": args.mode,
        "codec": args.codec,
        "bundled": args.bundled,
        "client_rc": client_rc,
        "feeder_rc": feeder_rc,
        "server_rc": server_rc,
        "decoded_raw": str(raw_path) if raw_path else None,
    }
    # The "analysis" key is present only when decoding produced usable frames.
    if raw_path is not None:
        summary["analysis"] = analyze_decoded_frames(args, probe, raw_path, width, height, fps)
    (args.artifact_dir / "summary.json").write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n")
    print(json.dumps(summary, indent=2, sort_keys=True))
    return 0
|
||
|
|
|
||
|
|
|
||
|
|
def main() -> int:
    """Parse the command line, fill in defaults, and run the probe.

    Returns:
        The value returned by ``run``.

    Raises:
        SystemExit: if ``--source virtual`` is selected without ``--device``
            (argparse itself also exits on invalid options).
    """
    parser = argparse.ArgumentParser(description=__doc__)

    # Capture / transport options.
    parser.add_argument("--source", choices=["virtual", "client-test-pattern"], default="virtual")
    parser.add_argument("--device", help="v4l2loopback device such as /dev/video10 for --source virtual")
    parser.add_argument("--mode", default="1280x720@30")
    parser.add_argument("--duration", type=float, default=20.0)
    parser.add_argument("--warmup", type=float, default=2.0)
    parser.add_argument("--codec", choices=["mjpeg", "hevc"], default="mjpeg")
    parser.add_argument("--bundled", action="store_true", help="exercise StreamWebcamMedia instead of legacy StreamCamera")
    parser.add_argument("--port", type=int, default=50051)
    parser.add_argument("--artifact-dir", type=pathlib.Path)
    parser.add_argument("--client-binary", default="target/debug/lesavka-client")
    parser.add_argument("--server-binary", default="target/debug/lesavka-uplink-dummy-server")
    parser.add_argument("--no-build", action="store_true")

    # Frame-analysis tuning knobs: (flag, value type, default), in help order.
    tuning_options = (
        ("--x-step", int, 64),
        ("--y-step", int, 16),
        ("--bands", int, 24),
        ("--mae-threshold", float, 10.0),
        ("--lower-mae-threshold", float, 16.0),
        ("--lower-skew-ratio", float, 1.5),
        ("--slab-var", float, 20.0),
        ("--shift-threshold", float, 12.0),
        ("--shift-improvement", float, 1.15),
        ("--sequence-window", int, 3),
        ("--mix-mae-threshold", float, 1.5),
        ("--mix-improvement", float, 1.8),
        ("--mix-min-bands", int, 2),
    )
    for flag, value_type, default_value in tuning_options:
        parser.add_argument(flag, type=value_type, default=default_value)

    args = parser.parse_args()
    if args.artifact_dir is None:
        args.artifact_dir = default_artifact_dir(args.mode)
    needs_device = args.source == "virtual"
    if needs_device and not args.device:
        raise SystemExit("--device is required for --source virtual; create one with v4l2loopback first")
    return run(args)
|
||
|
|
|
||
|
|
|
||
|
|
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|