#!/usr/bin/env python3 """Run synthetic Lesavka uplink media and compare what the RCT receives.""" from __future__ import annotations import argparse import collections import json import os import pathlib import shlex import shutil import subprocess import sys import time from typing import Any DEFAULT_DEVICE_LABEL = "Lesavka Composite" DEFAULT_MODES = "1280x720@20,1280x720@30,1920x1080@20,1920x1080@30" DEFAULT_JPEG_QUALITY = 82 HIGH_SPEED_ISOCHRONOUS_MICROFRAMES_PER_SEC = 8000 DEFAULT_ISOCHRONOUS_LIMIT_PCT = 85 DEFAULT_UVC_MAX_PACKET = 1024 DEFAULT_MEDIA_CONTROL_PATH = "/tmp/lesavka-media.control" DEFAULT_SERVER_UVC_AUDIT_CONTROL_PATH = "/tmp/lesavka-uvc-frame-audit.control" MARKER_BITS = 32 MARKER_COLUMNS = 16 CADENCE_REASONS = {"frame_repeat", "frame_gap", "frame_backwards"} NON_VISUAL_REASONS = CADENCE_REASONS | {"sequence_marker_mismatch"} REMOTE_MEDIA_CONTROL_PAUSE = r""" import base64 import json import pathlib import sys import time DEFAULT_MEDIA_CONTROL_PATH = "/tmp/lesavka-media.control" def media_control_with_camera(raw, enabled): tokens = raw.split() if raw else [] rendered = [] saw_camera = False saw_microphone = False saw_audio = False for token in tokens: key, sep, _value = token.partition("=") if sep and key == "camera": rendered.append(f"camera={1 if enabled else 0}") saw_camera = True else: rendered.append(token) saw_microphone = saw_microphone or (sep and key in {"microphone", "mic"}) saw_audio = saw_audio or (sep and key in {"audio", "speaker"}) if not saw_camera: rendered.insert(0, f"camera={1 if enabled else 0}") if not saw_microphone: rendered.append("microphone=1") if not saw_audio: rendered.append("audio=1") return " ".join(rendered) + "\n" def discover_media_control_paths(): candidates = set() proc = pathlib.Path("/proc") if not proc.exists(): return [] for entry in proc.iterdir(): if not entry.name.isdigit(): continue try: environ = (entry / "environ").read_bytes() cmdline = (entry / "cmdline").read_bytes().replace(b"\0", b" ") except (FileNotFoundError, PermissionError, ProcessLookupError, OSError): continue if b"lesavka" not in cmdline and b"LESAVKA_MEDIA_CONTROL=" not in environ: continue for token in environ.split(b"\0"): if token.startswith(b"LESAVKA_MEDIA_CONTROL="): raw_path = token.split(b"=", 1)[1].decode(errors="replace") if raw_path: candidates.add(pathlib.Path(raw_path)) return sorted( candidates, key=lambda path: ( not path.exists(), -path.stat().st_mtime if path.exists() else 0, str(path), ), ) request = json.loads(sys.argv[1]) state_path = pathlib.Path(request["state_path"]) explicit_path = request.get("media_control_path") or "" discovered = [] if explicit_path else discover_media_control_paths() path = ( pathlib.Path(explicit_path) if explicit_path else (discovered[0] if discovered else pathlib.Path(DEFAULT_MEDIA_CONTROL_PATH)) ) original = path.read_bytes() if path.exists() else None original_text = original.decode(errors="replace") if original is not None else None path.write_text(media_control_with_camera(original_text, False)) state_path.write_text( json.dumps( { "path": str(path), "had_original": original is not None, "original_b64": base64.b64encode(original or b"").decode(), }, sort_keys=True, ) + "\n" ) time.sleep(0.5) print( json.dumps( { "path": str(path), "state_path": str(state_path), "discovered": [str(path) for path in discovered], } ) ) """ REMOTE_MEDIA_CONTROL_RESTORE = r""" import base64 import json import pathlib import sys request = json.loads(sys.argv[1]) state_path = pathlib.Path(request["state_path"]) state = json.loads(state_path.read_text()) path = pathlib.Path(state["path"]) if state.get("had_original"): path.write_bytes(base64.b64decode(state.get("original_b64") or "")) else: path.unlink(missing_ok=True) state_path.unlink(missing_ok=True) print(json.dumps({"path": str(path), "state_path": str(state_path)})) """ def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description=( "Manual synthetic end-to-end probe: Theia sends sequence-coded media " "through StreamWebcamMedia while Tethys captures the received UVC/X11 " "frames and compares them to the generated source." ) ) parser.add_argument("--inject-host", default="", help="Theia SSH host, e.g. titan-jh") parser.add_argument("--local-inject", action="store_true", help="run the synthetic injector directly on this host") parser.add_argument("--rct-host", default="", help="RCT SSH host, e.g. tethys") parser.add_argument("--server", default="https://127.0.0.1:50051") parser.add_argument("--inject-binary", default="/usr/local/bin/lesavka-synthetic-uplink") parser.add_argument("--mode", default="1280x720@30", help=f"one mode; baseline set is {DEFAULT_MODES}") parser.add_argument("--width", type=int, default=0, help="override capture width") parser.add_argument("--height", type=int, default=0, help="override capture height") parser.add_argument("--fps", type=int, default=0, help="override capture fps") parser.add_argument("--duration", type=float, default=300.0) parser.add_argument("--source", choices=["device", "x11"], default="device") parser.add_argument("--device", default="auto") parser.add_argument("--device-label", default=DEFAULT_DEVICE_LABEL) parser.add_argument("--display", default=":0") parser.add_argument("--crop", default="", help="x,y,width,height for --source x11") parser.add_argument("--artifact-dir", default="") parser.add_argument("--remote-rct-dir", default="") parser.add_argument("--remote-inject-dir", default="") parser.add_argument( "--pause-local-live-upstream", action="store_true", help="temporarily write camera=0 to the injector host's Lesavka media control file so a live client does not preempt the synthetic injector", ) parser.add_argument( "--media-control-path", default=os.environ.get("LESAVKA_MEDIA_CONTROL", ""), help=( "local live-media control file used with --pause-local-live-upstream; " f"default discovers LESAVKA_MEDIA_CONTROL from running Lesavka processes, then falls back to {DEFAULT_MEDIA_CONTROL_PATH}" ), ) parser.add_argument( "--capture-before-inject", action="store_true", help="start RCT capture before synthetic uplink; default starts uplink first so superseded injectors fail fast", ) parser.add_argument("--inject-warmup-s", type=float, default=1.25) parser.add_argument( "--capture-finish-grace-s", type=float, default=0.0, help="seconds to wait for capture after injector exits; 0 waits indefinitely", ) parser.add_argument("--jpeg-quality", type=int, default=DEFAULT_JPEG_QUALITY) parser.add_argument( "--inject-max-frame-bytes", type=int, default=0, help="max encoded synthetic MJPEG bytes; default uses the safe high-speed isochronous budget for the selected fps", ) parser.add_argument("--x-step", type=int, default=8) parser.add_argument("--y-step", type=int, default=4) parser.add_argument("--bands", type=int, default=24) parser.add_argument("--mae-threshold", type=float, default=18.0) parser.add_argument("--lower-mae-threshold", type=float, default=28.0) parser.add_argument("--lower-skew-ratio", type=float, default=1.8) parser.add_argument("--slab-var", type=float, default=20.0) parser.add_argument("--shift-threshold", type=float, default=16.0) parser.add_argument("--shift-improvement", type=float, default=1.25) parser.add_argument( "--sequence-window", type=int, default=3, help="adjacent synthetic source-frame window to test when classifying mixed/teared frames", ) parser.add_argument( "--mix-mae-threshold", type=float, default=1.5, help="minimum decoded-frame band MAE before an adjacent-frame improvement can count as a mixed-frame tear", ) parser.add_argument( "--mix-improvement", type=float, default=1.8, help="required decoded-frame/best-adjacent MAE ratio for mixed-frame band classification", ) parser.add_argument("--mix-min-bands", type=int, default=2) parser.add_argument("--max-suspicious-artifacts", type=int, default=80) parser.add_argument("--max-reference-artifacts", type=int, default=12) parser.add_argument("--reference-every", type=int, default=900) parser.add_argument("--progress-every", type=int, default=150) parser.add_argument( "--server-uvc-audit", action="store_true", help="enable exact server-side UVC-bound MJPEG audit evidence for this run", ) parser.add_argument( "--server-uvc-audit-host", default="", help="SSH host running the Lesavka server; defaults to --inject-host when set", ) parser.add_argument( "--server-uvc-audit-control-path", default=DEFAULT_SERVER_UVC_AUDIT_CONTROL_PATH, help="runtime control file read by the server to enable UVC-bound frame auditing", ) parser.add_argument( "--server-uvc-audit-dir", default="", help="remote audit directory; default uses a unique /tmp path on the server host", ) parser.add_argument( "--server-uvc-audit-sample-frames", type=int, default=30, help="number of audited MJPEG frames to copy/decode for boundary classification", ) parser.add_argument( "--stream-analyze", action="store_true", help="debug path: analyze ffmpeg stdout directly instead of spooling raw frames first", ) parser.add_argument("--capture-only", action="store_true", help=argparse.SUPPRESS) parser.add_argument("--self-test", action="store_true") return parser.parse_args() def timestamp() -> str: return time.strftime("%Y%m%d-%H%M%S", time.gmtime()) def parse_mode(value: str) -> tuple[int, int, int]: try: size, fps = value.lower().split("@", 1) width, height = size.split("x", 1) return int(width), int(height), int(fps) except ValueError as exc: raise SystemExit(f"--mode must look like WIDTHxHEIGHT@FPS, got {value!r}") from exc def mode_dimensions(args: argparse.Namespace) -> tuple[int, int, int]: width, height, fps = parse_mode(args.mode) if args.width: width = args.width if args.height: height = args.height if args.fps: fps = args.fps if width <= 0 or height <= 0 or fps <= 0: raise SystemExit("width, height, and fps must be positive") return width, height, fps def default_inject_max_frame_bytes(fps: int) -> int: bytes_per_second = ( DEFAULT_UVC_MAX_PACKET * HIGH_SPEED_ISOCHRONOUS_MICROFRAMES_PER_SEC * DEFAULT_ISOCHRONOUS_LIMIT_PCT // 100 ) return max(64 * 1024, bytes_per_second // max(1, fps)) def default_artifact_dir(mode: str) -> pathlib.Path: safe_mode = mode.replace("@", "-").replace("x", "x") return pathlib.Path("artifacts/synthetic-rct") / f"{safe_mode}-{timestamp()}" def media_control_with_camera(raw: str | None, enabled: bool) -> str: tokens = raw.split() if raw else [] rendered: list[str] = [] saw_camera = False saw_microphone = False saw_audio = False for token in tokens: key, sep, _value = token.partition("=") if sep and key == "camera": rendered.append(f"camera={1 if enabled else 0}") saw_camera = True else: rendered.append(token) saw_microphone = saw_microphone or (sep and key in {"microphone", "mic"}) saw_audio = saw_audio or (sep and key in {"audio", "speaker"}) if not saw_camera: rendered.insert(0, f"camera={1 if enabled else 0}") if not saw_microphone: rendered.append("microphone=1") if not saw_audio: rendered.append("audio=1") return " ".join(rendered) + "\n" def discover_media_control_paths() -> list[pathlib.Path]: candidates: set[pathlib.Path] = set() proc = pathlib.Path("/proc") if not proc.exists(): return [] for entry in proc.iterdir(): if not entry.name.isdigit(): continue try: environ = (entry / "environ").read_bytes() cmdline = (entry / "cmdline").read_bytes().replace(b"\0", b" ") except (FileNotFoundError, PermissionError, ProcessLookupError, OSError): continue if b"lesavka" not in cmdline and b"LESAVKA_MEDIA_CONTROL=" not in environ: continue for token in environ.split(b"\0"): if token.startswith(b"LESAVKA_MEDIA_CONTROL="): raw_path = token.split(b"=", 1)[1].decode(errors="replace") if raw_path: candidates.add(pathlib.Path(raw_path)) return sorted( candidates, key=lambda path: ( not path.exists(), -path.stat().st_mtime if path.exists() else 0, str(path), ), ) def resolve_media_control_path(args: argparse.Namespace) -> pathlib.Path: if args.media_control_path: return pathlib.Path(args.media_control_path) discovered = discover_media_control_paths() if discovered: if len(discovered) > 1: print( "multiple live Lesavka media control paths discovered; using " f"{discovered[0]} candidates={[str(path) for path in discovered]}", file=sys.stderr, ) else: print(f"discovered live Lesavka media control path {discovered[0]}", file=sys.stderr) return discovered[0] print( f"no running Lesavka media control path discovered; falling back to {DEFAULT_MEDIA_CONTROL_PATH}", file=sys.stderr, ) return pathlib.Path(DEFAULT_MEDIA_CONTROL_PATH) def pause_local_live_upstream(args: argparse.Namespace) -> tuple[pathlib.Path, bytes | None]: path = resolve_media_control_path(args) original = path.read_bytes() if path.exists() else None raw = original.decode(errors="replace") if original is not None else None path.write_text(media_control_with_camera(raw, False)) print(f"paused local live camera upstream via {path}", file=sys.stderr) time.sleep(0.5) return path, original def restore_local_live_upstream(path: pathlib.Path, original: bytes | None) -> None: if original is None: path.unlink(missing_ok=True) else: path.write_bytes(original) print(f"restored local live media control at {path}", file=sys.stderr) def run_remote_python(host: str, script: str, payload: dict[str, Any]) -> dict[str, Any]: output = subprocess.check_output( ["ssh", host, f"python3 - {shlex.quote(json.dumps(payload, sort_keys=True))}"], input=script, text=True, ) return json.loads(output.strip().splitlines()[-1]) def pause_remote_live_upstream(host: str, args: argparse.Namespace) -> dict[str, Any]: state_path = f"/tmp/lesavka-synthetic-rct-media-control-{os.getpid()}.json" state = run_remote_python( host, REMOTE_MEDIA_CONTROL_PAUSE, { "media_control_path": args.media_control_path, "state_path": state_path, }, ) print( f"paused injector-host live camera upstream on {host} via {state['path']}", file=sys.stderr, ) return state def restore_remote_live_upstream(host: str, state: dict[str, Any]) -> None: restored = run_remote_python( host, REMOTE_MEDIA_CONTROL_RESTORE, {"state_path": state["state_path"]}, ) print( f"restored injector-host live media control on {host} at {restored['path']}", file=sys.stderr, ) def resolve_server_uvc_audit_host(args: argparse.Namespace) -> str: if args.server_uvc_audit_host: return args.server_uvc_audit_host if args.inject_host: return args.inject_host return "" def setup_server_uvc_audit(args: argparse.Namespace, artifact_stamp: str) -> tuple[str, str] | None: if not args.server_uvc_audit: return None host = resolve_server_uvc_audit_host(args) if not host: raise SystemExit("--server-uvc-audit requires --server-uvc-audit-host when --local-inject is used") remote_dir = args.server_uvc_audit_dir or f"/tmp/lesavka-synthetic-rct-uvc-audit-{artifact_stamp}" command = ( f"rm -rf {shlex.quote(remote_dir)} && " f"mkdir -p {shlex.quote(remote_dir)} && " f"printf '%s\\n' {shlex.quote(remote_dir)} > {shlex.quote(args.server_uvc_audit_control_path)}" ) subprocess.run(["ssh", host, command], check=True) print( f"enabled server UVC-bound frame audit on {host}: {remote_dir}", file=sys.stderr, ) return host, remote_dir def cleanup_server_uvc_audit(args: argparse.Namespace, state: tuple[str, str] | None) -> None: if state is None: return host, _remote_dir = state command = f"rm -f {shlex.quote(args.server_uvc_audit_control_path)}" subprocess.run(["ssh", host, command], check=False) print( f"disabled server UVC-bound frame audit on {host}", file=sys.stderr, ) def read_jsonl(path: pathlib.Path) -> list[dict[str, Any]]: records: list[dict[str, Any]] = [] if not path.exists(): return records for line in path.read_text(errors="replace").splitlines(): try: value = json.loads(line) except json.JSONDecodeError: continue if isinstance(value, dict): records.append(value) return records def sample_records(records: list[dict[str, Any]], limit: int) -> list[dict[str, Any]]: if limit <= 0 or len(records) <= limit: return records if limit == 1: return [records[-1]] indexes = { round(idx * (len(records) - 1) / (limit - 1)) for idx in range(limit) } return [records[idx] for idx in sorted(indexes)] def copy_server_uvc_audit( args: argparse.Namespace, state: tuple[str, str] | None, local_dir: pathlib.Path, ) -> pathlib.Path | None: if state is None: return None host, remote_dir = state local_dir.mkdir(parents=True, exist_ok=True) remote_log = f"{remote_dir.rstrip('/')}/spool-audit.jsonl" local_log = local_dir / "spool-audit.jsonl" subprocess.run(["scp", f"{host}:{remote_log}", str(local_log)], check=False) records = read_jsonl(local_log) for record in sample_records(records, args.server_uvc_audit_sample_frames): frame_file = str(record.get("file") or "") if not frame_file or "/" in frame_file: continue subprocess.run( [ "scp", f"{host}:{remote_dir.rstrip('/')}/{frame_file}", str(local_dir / frame_file), ], check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) return local_dir def decode_mjpeg_to_gray(path: pathlib.Path, width: int, height: int) -> bytes | None: if width <= 0 or height <= 0 or not path.exists(): return None proc = subprocess.run( [ "ffmpeg", "-hide_banner", "-loglevel", "error", "-i", str(path), "-an", "-pix_fmt", "gray", "-f", "rawvideo", "-", ], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, check=False, ) expected = width * height if proc.returncode != 0 or len(proc.stdout) < expected: return None return proc.stdout[:expected] def summarize_server_uvc_audit( local_dir: pathlib.Path | None, mode_width: int, mode_height: int, mode_fps: int, capture_data: dict[str, Any] | None, args: argparse.Namespace, ) -> dict[str, Any] | None: if local_dir is None: return None log_path = local_dir / "spool-audit.jsonl" records = read_jsonl(log_path) frame_size_counts: collections.Counter[str] = collections.Counter() uvc_mode_counts: collections.Counter[str] = collections.Counter() complete_count = 0 rejected_count = 0 decoded_sample_count = 0 marker_sample_count = 0 visual_sample_count = 0 sample_reason_counts: collections.Counter[str] = collections.Counter() previous_seq: int | None = None for record in records: width = record.get("frame_width") height = record.get("frame_height") frame_size_counts[f"{width}x{height}"] += 1 uvc_width = record.get("uvc_width") uvc_height = record.get("uvc_height") uvc_fps = record.get("uvc_fps") uvc_mode_counts[f"{uvc_width}x{uvc_height}@{uvc_fps}"] += 1 complete_count += int(bool(record.get("jpeg_complete"))) rejected_count += int(bool(record.get("rejected"))) for record in sample_records(records, args.server_uvc_audit_sample_frames): frame_file = str(record.get("file") or "") width = int(record.get("frame_width") or 0) height = int(record.get("frame_height") or 0) if not frame_file or width <= 0 or height <= 0: continue raw = decode_mjpeg_to_gray(local_dir / frame_file, width, height) if raw is None: continue decoded_sample_count += 1 result = analyze_frame(raw, width, height, args, previous_seq) comparison_seq = result.get("comparison_sequence") if comparison_seq is not None: previous_seq = int(comparison_seq) if result.get("decoded_sequence") is not None: marker_sample_count += 1 if result.get("visual_suspicious"): visual_sample_count += 1 sample_reason_counts.update(result.get("visual_reasons") or []) matching_records = [ record for record in records if int(record.get("frame_width") or 0) == mode_width and int(record.get("frame_height") or 0) == mode_height ] matching_uvc_records = [ record for record in records if int(record.get("uvc_width") or 0) == mode_width and int(record.get("uvc_height") or 0) == mode_height and int(record.get("uvc_fps") or 0) == mode_fps ] capture_visual_frames = int((capture_data or {}).get("visual_suspicious_frames") or 0) capture_frames = int((capture_data or {}).get("frames") or 0) status = "inconclusive" diagnosis: list[str] = [] if not records: status = "server_boundary_missing" diagnosis.append( "server UVC-bound audit recorded no frames; the software output path did not prove it produced fresh webcam frames during the probe" ) elif rejected_count and rejected_count == len(records): status = "server_boundary_rejected" diagnosis.append( "every audited UVC-bound frame was rejected before handoff; corruption or profile trouble is before the browser-facing UVC path" ) elif complete_count < len(records): status = "server_boundary_incomplete_jpeg" diagnosis.append( "the server UVC-bound audit contains incomplete JPEG payloads, so corruption exists before or at the server handoff" ) elif not matching_records: status = "server_boundary_frame_mode_mismatch" diagnosis.append( f"server UVC-bound frames did not match requested {mode_width}x{mode_height}; observed frame sizes {dict(frame_size_counts)}" ) elif not matching_uvc_records: status = "server_boundary_uvc_mode_mismatch" diagnosis.append( f"server UVC-bound records did not advertise requested {mode_width}x{mode_height}@{mode_fps}; observed UVC modes {dict(uvc_mode_counts)}" ) elif visual_sample_count: status = "server_boundary_visual_corruption" diagnosis.append( "decoded server UVC-bound audit samples were already visually suspicious before reaching the host/browser" ) elif capture_visual_frames: status = "downstream_uvc_or_browser_corruption" diagnosis.append( "server UVC-bound samples were clean and mode-matched, but receiver capture showed visual corruption; the software UVC gadget/browser leg is implicated" ) elif capture_frames: status = "no_visual_corruption_observed" diagnosis.append( "server UVC-bound samples and receiver capture had no visual corruption in this run" ) summary = { "schema": "lesavka.server-uvc-boundary-summary.v1", "status": status, "diagnosis": diagnosis, "record_count": len(records), "complete_count": complete_count, "rejected_count": rejected_count, "frame_size_counts": dict(frame_size_counts), "uvc_mode_counts": dict(uvc_mode_counts), "matching_frame_records": len(matching_records), "matching_uvc_mode_records": len(matching_uvc_records), "decoded_sample_count": decoded_sample_count, "marker_sample_count": marker_sample_count, "visual_sample_count": visual_sample_count, "sample_visual_reason_counts": dict(sample_reason_counts), "artifact_dir": str(local_dir), "log_path": str(log_path), } (local_dir / "boundary-summary.json").write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n") return summary def run_remote_orchestrated(args: argparse.Namespace) -> int: if (not args.inject_host and not args.local_inject) or not args.rct_host: raise SystemExit( "--rct-host and either --inject-host or --local-inject are required unless --capture-only or --self-test is used" ) if not shutil.which("ssh") or not shutil.which("scp"): raise SystemExit("ssh and scp are required for the remote synthetic probe") width, height, fps = mode_dimensions(args) run_stamp = timestamp() inject_max_frame_bytes = args.inject_max_frame_bytes or default_inject_max_frame_bytes(fps) artifact_dir = ( pathlib.Path(args.artifact_dir) if args.artifact_dir else pathlib.Path("artifacts/synthetic-rct") / f"{args.mode.replace('@', '-').replace('x', 'x')}-{run_stamp}" ) artifact_dir.mkdir(parents=True, exist_ok=True) remote_rct_dir = args.remote_rct_dir or f"/tmp/lesavka-synthetic-rct-capture-{run_stamp}" remote_inject_dir = args.remote_inject_dir or f"/tmp/lesavka-synthetic-uplink-{run_stamp}" remote_script = f"/tmp/lesavka-synthetic-rct-probe-{os.getpid()}.py" script_text = pathlib.Path(__file__).read_text() subprocess.run( ["ssh", args.rct_host, f"cat > {shlex.quote(remote_script)} && chmod +x {shlex.quote(remote_script)}"], input=script_text, text=True, check=True, ) capture_cmd = [ "python3", remote_script, "--capture-only", "--mode", args.mode, "--width", str(width), "--height", str(height), "--fps", str(fps), "--duration", str(args.duration), "--source", args.source, "--device", args.device, "--device-label", args.device_label, "--display", args.display, "--crop", args.crop, "--artifact-dir", remote_rct_dir, "--x-step", str(args.x_step), "--y-step", str(args.y_step), "--bands", str(args.bands), "--mae-threshold", str(args.mae_threshold), "--lower-mae-threshold", str(args.lower_mae_threshold), "--lower-skew-ratio", str(args.lower_skew_ratio), "--slab-var", str(args.slab_var), "--shift-threshold", str(args.shift_threshold), "--shift-improvement", str(args.shift_improvement), "--sequence-window", str(args.sequence_window), "--mix-mae-threshold", str(args.mix_mae_threshold), "--mix-improvement", str(args.mix_improvement), "--mix-min-bands", str(args.mix_min_bands), "--max-suspicious-artifacts", str(args.max_suspicious_artifacts), "--max-reference-artifacts", str(args.max_reference_artifacts), "--reference-every", str(args.reference_every), "--progress-every", str(args.progress_every), ] if args.stream_analyze: capture_cmd.append("--stream-analyze") inject_cmd = [ args.inject_binary, "--server", args.server, "--mode", args.mode, "--duration", str(args.duration + 2.0), "--artifact-dir", remote_inject_dir, "--jpeg-quality", str(args.jpeg_quality), "--max-frame-bytes", str(inject_max_frame_bytes), "--print-every", str(args.progress_every), ] (artifact_dir / "orchestrator-command.txt").write_text(" ".join(sys.argv) + "\n") (artifact_dir / "mode.json").write_text( json.dumps( { "schema": "lesavka.synthetic-rct-probe.run.v1", "mode": args.mode, "width": width, "height": height, "fps": fps, "source": args.source, "duration_s": args.duration, "jpeg_quality": args.jpeg_quality, "inject_max_frame_bytes": inject_max_frame_bytes, "inject_host": args.inject_host, "local_inject": args.local_inject, "rct_host": args.rct_host, "pause_local_live_upstream": args.pause_local_live_upstream, "media_control_path": args.media_control_path, "server_uvc_audit": args.server_uvc_audit, "server_uvc_audit_host": resolve_server_uvc_audit_host(args), "server_uvc_audit_control_path": args.server_uvc_audit_control_path, "server_uvc_audit_sample_frames": args.server_uvc_audit_sample_frames, }, indent=2, sort_keys=True, ) + "\n" ) def start_capture() -> subprocess.Popen[Any]: print(f"starting RCT capture on {args.rct_host}: {remote_rct_dir}", file=sys.stderr) return subprocess.Popen(["ssh", args.rct_host, " ".join(shlex.quote(part) for part in capture_cmd)]) def start_inject() -> subprocess.Popen[Any]: if args.local_inject: print(f"starting local synthetic uplink: {remote_inject_dir}", file=sys.stderr) return subprocess.Popen(inject_cmd) print(f"starting synthetic uplink on {args.inject_host}: {remote_inject_dir}", file=sys.stderr) return subprocess.Popen(["ssh", args.inject_host, " ".join(shlex.quote(part) for part in inject_cmd)]) def stop_capture(process: subprocess.Popen[Any]) -> int | None: process.terminate() try: return process.wait(timeout=5) except subprocess.TimeoutExpired: process.kill() return process.wait() def wait_capture_or_inject_exit( capture_process: subprocess.Popen[Any], inject_process: subprocess.Popen[Any] ) -> tuple[int | None, int | None]: while True: capture_status = capture_process.poll() if capture_status is not None: return capture_status, inject_process.wait() inject_status = inject_process.poll() if inject_status is not None: if inject_status == 0: if args.capture_finish_grace_s <= 0: return capture_process.wait(), inject_status deadline = time.monotonic() + args.capture_finish_grace_s while time.monotonic() < deadline: capture_status = capture_process.poll() if capture_status is not None: return capture_status, inject_status time.sleep(0.25) diagnosis.append( "synthetic uplink completed but RCT capture did not finish; capture likely lagged, froze, or was blocked by another consumer" ) else: diagnosis.append( "synthetic uplink exited while RCT capture was still active; stopping capture because the run is not isolated or the injector failed" ) print( f"synthetic uplink exited during capture rc={inject_status}; stopping RCT capture", file=sys.stderr, ) return stop_capture(capture_process), inject_status time.sleep(0.25) capture: subprocess.Popen[Any] | None = None diagnosis: list[str] = [] paused_control: tuple[pathlib.Path, bytes | None] | None = None paused_remote_control: tuple[str, dict[str, Any]] | None = None server_audit_state: tuple[str, str] | None = None try: server_audit_state = setup_server_uvc_audit(args, run_stamp) if args.pause_local_live_upstream: if args.local_inject: paused_control = pause_local_live_upstream(args) else: remote_state = pause_remote_live_upstream(args.inject_host, args) paused_remote_control = (args.inject_host, remote_state) if args.capture_before_inject: capture = start_capture() time.sleep(1.0) inject = start_inject() capture_rc, inject_rc = wait_capture_or_inject_exit(capture, inject) else: inject = start_inject() time.sleep(max(0.0, args.inject_warmup_s)) inject_rc = inject.poll() if inject_rc is not None: capture_rc = None diagnosis.append( "synthetic uplink exited before capture warmup completed; disconnect the live client or pause upstream webcam before running the isolated probe" ) print(f"synthetic uplink exited before capture started rc={inject_rc}", file=sys.stderr) else: capture = start_capture() capture_rc, inject_rc = wait_capture_or_inject_exit(capture, inject) finally: cleanup_server_uvc_audit(args, server_audit_state) if paused_remote_control is not None: restore_remote_live_upstream(*paused_remote_control) if paused_control is not None: restore_local_live_upstream(*paused_control) local_capture = artifact_dir / "capture" local_inject = artifact_dir / "inject" local_server_audit = artifact_dir / "server-uvc-audit" if capture is not None: subprocess.run(["scp", "-r", f"{args.rct_host}:{remote_rct_dir}", str(local_capture)], check=False) if args.local_inject: if pathlib.Path(remote_inject_dir).exists(): if local_inject.exists(): shutil.rmtree(local_inject) shutil.copytree(remote_inject_dir, local_inject) else: subprocess.run(["scp", "-r", f"{args.inject_host}:{remote_inject_dir}", str(local_inject)], check=False) copied_server_audit = copy_server_uvc_audit(args, server_audit_state, local_server_audit) capture_summary = local_capture / "summary.json" capture_data: dict[str, Any] | None = None if capture_summary.exists(): try: capture_data = json.loads(capture_summary.read_text()) decoded_pct = float(capture_data.get("decoded_pct") or 0.0) if inject_rc != 0 and decoded_pct < 80.0: diagnosis.append( "captured frames did not consistently contain synthetic markers and the injector failed; the RCT capture likely measured a mixed, previous, or live webcam stream" ) fps_observed = float(capture_data.get("fps_observed") or 0.0) fps_requested = float(capture_data.get("fps_requested") or fps) if fps_observed and fps_observed < fps_requested * 0.5: diagnosis.append( f"RCT capture decoded only {fps_observed:.3f} fps from a {fps_requested:.0f} fps mode; check for a frozen UVC device or another browser/process holding the camera" ) frames = int(capture_data.get("frames") or 0) reason_counts = capture_data.get("reason_counts") or {} visual_reasons = capture_data.get("visual_reason_counts") or {} visual_frames = int(capture_data.get("visual_suspicious_frames") or 0) suspicious_frames = int(capture_data.get("suspicious_frames") or 0) repeats = int(reason_counts.get("frame_repeat") or 0) cadence_only = suspicious_frames > 0 and visual_frames == 0 and not visual_reasons if cadence_only: diagnosis.append( "RCT capture had cadence-only repeat/gap events; no visual tear/mixed-frame corruption was detected in aligned synthetic frames" ) if frames > 0 and repeats >= max(3, int(frames * 0.9)): diagnosis.append( "RCT capture repeated nearly every decoded synthetic marker; the received UVC stream was stale/frozen instead of advancing" ) except Exception: pass inject_summary = local_inject / "summary.json" if inject_summary.exists(): try: inject_data = json.loads(inject_summary.read_text()) oversize_frames = int(inject_data.get("encoded_oversize_frames") or 0) sent_frames = int(inject_data.get("sent_frames") or 0) encoded_frames = int(inject_data.get("encoded_frames") or 0) exit_reason = str(inject_data.get("exit_reason") or "") max_bytes = inject_data.get("encoded_max_bytes") max_frame_bytes = inject_data.get("max_frame_bytes") if oversize_frames: diagnosis.append( f"synthetic injector produced {oversize_frames} over-budget MJPEG frame(s), max={max_bytes} cap={max_frame_bytes}; the server will freeze instead of spooling those frames" ) if inject_rc != 0 and "StreamWebcamMedia closed before accepting synthetic frame" in exit_reason: diagnosis.append( f"synthetic injector was preempted after sending {sent_frames} frame(s); disconnect/pause the live Lesavka client upstream before running this isolated probe" ) elif inject_rc != 0 and encoded_frames > 0 and not oversize_frames: diagnosis.append( f"synthetic injector encoded {encoded_frames} in-budget frame(s) before failing; inspect inject/summary.json exit_reason for the stream-close cause" ) except Exception: pass server_boundary_summary = summarize_server_uvc_audit( copied_server_audit, width, height, fps, capture_data, args, ) if server_boundary_summary: for item in server_boundary_summary.get("diagnosis") or []: diagnosis.append(str(item)) summary = { "schema": "lesavka.synthetic-rct-probe.orchestrator.v1", "mode": args.mode, "capture_rc": capture_rc, "inject_rc": inject_rc, "diagnosis": diagnosis, "artifact_dir": str(artifact_dir), "capture_artifacts": str(local_capture), "inject_artifacts": str(local_inject), "server_uvc_boundary": server_boundary_summary, "server_uvc_audit_artifacts": str(local_server_audit) if copied_server_audit else None, } (artifact_dir / "run-summary.json").write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n") print(json.dumps(summary, indent=2, sort_keys=True)) print(f"artifact_dir: {artifact_dir}") return 0 if capture_rc == 0 and inject_rc == 0 else 1 def detect_video_device(label: str) -> str: explicit = os.environ.get("LESAVKA_RCT_UVC_DEVICE") if explicit: return explicit try: listing = subprocess.check_output(["v4l2-ctl", "--list-devices"], text=True) except Exception: return "/dev/video2" current_matches = False for line in listing.splitlines(): if not line.startswith(("\t", " ")): current_matches = label.lower() in line.lower() continue value = line.strip() if current_matches and value.startswith("/dev/video"): return value return "/dev/video2" def parse_crop(args: argparse.Namespace, width: int, height: int) -> tuple[int, int, int, int]: if not args.crop: return 0, 0, width, height parts = [part.strip() for part in args.crop.split(",")] if len(parts) != 4: raise SystemExit("--crop must be x,y,width,height") x, y, crop_width, crop_height = [int(part) for part in parts] if crop_width <= 0 or crop_height <= 0: raise SystemExit("--crop width and height must be positive") return x, y, crop_width, crop_height def ffmpeg_cmd(args: argparse.Namespace, width: int, height: int) -> tuple[list[str], int, int, str]: if args.source == "x11": x, y, capture_width, capture_height = parse_crop(args, width, height) display = f"{args.display}+{x},{y}" return ( [ "ffmpeg", "-hide_banner", "-nostdin", "-loglevel", "warning", "-f", "x11grab", "-video_size", f"{capture_width}x{capture_height}", "-framerate", str(args.fps or parse_mode(args.mode)[2]), "-i", display, "-an", "-pix_fmt", "gray", "-f", "rawvideo", "-", ], capture_width, capture_height, display, ) device = detect_video_device(args.device_label) if args.device == "auto" else args.device return ( [ "ffmpeg", "-hide_banner", "-nostdin", "-loglevel", "warning", "-f", "v4l2", "-input_format", "mjpeg", "-video_size", f"{width}x{height}", "-framerate", str(args.fps or parse_mode(args.mode)[2]), "-i", device, "-an", "-pix_fmt", "gray", "-f", "rawvideo", "-", ], width, height, device, ) def marker_cell(width: int, height: int) -> int: return max(6, min(16, min(width, height) // 80)) def fill_rect(frame: bytearray, width: int, height: int, x0: int, y0: int, w: int, h: int, value: int) -> None: for y in range(max(0, y0), min(height, y0 + h)): row = y * width for x in range(max(0, x0), min(width, x0 + w)): frame[row + x] = value def synthetic_base_luma(width: int, height: int, sequence: int, x: int, y: int) -> int: safe_width = max(width, 1) safe_height = max(height, 1) moving_width = min(max(width // 10, 32), safe_width) moving_offset = (sequence * 13) % safe_width center_x = width // 2 center_y = height // 2 block_w = max(width // 24, 24) block_h = max(height // 18, 18) base = 44 + (x * 72 // safe_width) + (y * 52 // safe_height) + ((sequence * 3) % 28) checker = 30 if (((x // block_w) + (y // block_h) + (sequence // 5)) & 1) == 0 else 0 value = min(238, base + checker) moving = (x + safe_width - moving_offset) % safe_width if moving < moving_width: value = min(255, 220 - (y * 54 // safe_height)) elif moving < moving_width + 4: value = 28 if abs(x - center_x) < width // 9 and abs(y - center_y) < height // 12: value = 255 - value // 2 return value def synthetic_marker_luma(width: int, height: int, sequence: int, x: int, y: int) -> int | None: cell = marker_cell(width, height) rows = (MARKER_BITS + MARKER_COLUMNS - 1) // MARKER_COLUMNS if width < (MARKER_COLUMNS + 4) * cell or height < (rows + 4) * cell: return None marker_x = 2 * cell marker_y = 2 * cell if cell <= x < (MARKER_COLUMNS + 3) * cell and cell <= y < (rows + 3) * cell: value = 32 if marker_x - cell <= x < marker_x and marker_y - cell <= y < marker_y: value = 255 elif marker_x + MARKER_COLUMNS * cell <= x < marker_x + (MARKER_COLUMNS + 1) * cell and marker_y - cell <= y < marker_y: value = 0 elif marker_x <= x < marker_x + MARKER_COLUMNS * cell and marker_y <= y < marker_y + rows * cell: col = (x - marker_x) // cell row = (y - marker_y) // cell bit = row * MARKER_COLUMNS + col if bit < MARKER_BITS: value = 255 if ((sequence >> bit) & 1) else 0 return value return None def synthetic_luma(width: int, height: int, sequence: int, x: int, y: int) -> int: marker = synthetic_marker_luma(width, height, sequence, x, y) if marker is not None: return marker return synthetic_base_luma(width, height, sequence, x, y) def synthetic_gray(width: int, height: int, sequence: int) -> bytes: data = bytearray(width * height) for y in range(height): row = y * width for x in range(width): data[row + x] = synthetic_luma(width, height, sequence, x, y) return bytes(data) def draw_marker(frame: bytearray, width: int, height: int, sequence: int) -> None: cell = marker_cell(width, height) rows = (MARKER_BITS + MARKER_COLUMNS - 1) // MARKER_COLUMNS if width < (MARKER_COLUMNS + 4) * cell or height < (rows + 4) * cell: return x0 = 2 * cell y0 = 2 * cell fill_rect(frame, width, height, cell, cell, (MARKER_COLUMNS + 2) * cell, (rows + 2) * cell, 32) fill_rect(frame, width, height, x0 - cell, y0 - cell, cell, cell, 255) fill_rect(frame, width, height, x0 + MARKER_COLUMNS * cell, y0 - cell, cell, cell, 0) for bit in range(MARKER_BITS): col = bit % MARKER_COLUMNS row = bit // MARKER_COLUMNS value = 255 if ((sequence >> bit) & 1) else 0 fill_rect(frame, width, height, x0 + col * cell, y0 + row * cell, cell, cell, value) def cell_mean(frame: bytes, width: int, x0: int, y0: int, cell: int) -> float: total = 0 count = 0 inset = max(1, cell // 4) for y in range(y0 + inset, y0 + cell - inset): row = y * width for x in range(x0 + inset, x0 + cell - inset): total += frame[row + x] count += 1 return total / max(1, count) def decode_sequence(frame: bytes, width: int, height: int) -> tuple[int | None, int]: cell = marker_cell(width, height) rows = (MARKER_BITS + MARKER_COLUMNS - 1) // MARKER_COLUMNS if width < (MARKER_COLUMNS + 4) * cell or height < (rows + 4) * cell: return None, MARKER_BITS x0 = 2 * cell y0 = 2 * cell value = 0 uncertain = 0 for bit in range(MARKER_BITS): col = bit % MARKER_COLUMNS row = bit // MARKER_COLUMNS mean = cell_mean(frame, width, x0 + col * cell, y0 + row * cell, cell) if mean > 165: value |= 1 << bit elif mean >= 90: uncertain += 1 if uncertain > 6: return None, uncertain return value, uncertain def sampled_abs_delta_expected(frame: bytes, width: int, height: int, sequence: int, y0: int, y1: int, x_step: int, y_step: int) -> float: total = 0 count = 0 for y in range(y0, y1, y_step): row = y * width for x in range(0, width, x_step): total += abs(frame[row + x] - synthetic_luma(width, height, sequence, x, y)) count += 1 return total / max(1, count) def band_stats(frame: bytes, width: int, y0: int, y1: int, x_step: int, y_step: int) -> tuple[float, float]: total = 0 total2 = 0 count = 0 for y in range(y0, y1, y_step): row = y * width for x in range(0, width, x_step): value = frame[row + x] total += value total2 += value * value count += 1 mean = total / max(1, count) return mean, max(0.0, total2 / max(1, count) - mean * mean) def shifted_expected_delta(frame: bytes, width: int, height: int, sequence: int, shift: int, args: argparse.Namespace) -> float: x0 = max(0, -shift) x1 = min(width, width - shift) if x0 >= x1: return 0.0 y0 = height // 4 total = 0 count = 0 for y in range(y0, height, args.y_step): row = y * width for x in range(x0, x1, args.x_step): total += abs(frame[row + x] - synthetic_luma(width, height, sequence, x + shift, y)) count += 1 return total / max(1, count) def best_expected_shift(frame: bytes, width: int, height: int, sequence: int, args: argparse.Namespace) -> tuple[int, float, float, float]: zero = shifted_expected_delta(frame, width, height, sequence, 0, args) best = zero best_shift = 0 for shift in [-128, -96, -80, -64, -48, -32, -24, -16, -12, -8, 8, 12, 16, 24, 32, 48, 64, 80, 96, 128]: candidate = shifted_expected_delta(frame, width, height, sequence, shift, args) if candidate < best: best = candidate best_shift = shift improvement = zero / max(best, 0.001) if best_shift else 1.0 return best_shift, zero, best, improvement def candidate_sequences(sequence: int | None, previous_seq: int | None, args: argparse.Namespace) -> list[int]: candidates: set[int] = set() window = max(1, int(args.sequence_window)) if sequence is not None: candidates.update(range(max(0, sequence - window), sequence + window + 1)) if previous_seq is not None: candidates.update(range(max(0, previous_seq - 1), previous_seq + window + 2)) return sorted(candidates) def best_sequence_delta( frame: bytes, width: int, height: int, candidates: list[int], y0: int, y1: int, args: argparse.Namespace, ) -> tuple[int | None, float]: best_seq: int | None = None best_mae = float("inf") for candidate in candidates: mae = sampled_abs_delta_expected(frame, width, height, candidate, y0, y1, args.x_step, args.y_step) if mae < best_mae: best_mae = mae best_seq = candidate return best_seq, 0.0 if best_seq is None else best_mae def band_sequence_profile( frame: bytes, width: int, height: int, sequence: int | None, previous_seq: int | None, args: argparse.Namespace, ) -> dict[str, Any]: candidates = candidate_sequences(sequence, previous_seq, args) if not candidates: return { "best_frame_sequence": None, "best_frame_mae": 0.0, "mixed_band_count": 0, "mixed_band_run_pct": 0.0, "band_sequence_counts": {}, "upper_dominant_sequence": None, "lower_dominant_sequence": None, "sequence_boundary_count": 0, "sequence_marker_mismatch": False, "reasons": [], } best_frame_sequence, best_frame_mae = best_sequence_delta(frame, width, height, candidates, 0, height, args) band_count = max(8, args.bands) band_h = max(1, height // band_count) band_best_sequences: list[int | None] = [] mixed_flags: list[bool] = [] for band in range(band_count): y0 = band * band_h y1 = height if band == band_count - 1 else min(height, y0 + band_h) best_seq, best_mae = best_sequence_delta(frame, width, height, candidates, y0, y1, args) decoded_mae = ( sampled_abs_delta_expected(frame, width, height, sequence, y0, y1, args.x_step, args.y_step) if sequence is not None else float("inf") ) improvement = decoded_mae / max(best_mae, 0.001) is_mixed = ( sequence is not None and best_seq is not None and best_seq != sequence and decoded_mae >= args.mix_mae_threshold and improvement >= args.mix_improvement ) band_best_sequences.append(best_seq) mixed_flags.append(is_mixed) counts = collections.Counter(seq for seq in band_best_sequences if seq is not None) upper_counts = collections.Counter(seq for seq in band_best_sequences[: band_count // 2] if seq is not None) lower_counts = collections.Counter(seq for seq in band_best_sequences[band_count // 2 :] if seq is not None) upper_dominant = upper_counts.most_common(1)[0][0] if upper_counts else None lower_dominant = lower_counts.most_common(1)[0][0] if lower_counts else None mixed_band_count = sum(1 for flag in mixed_flags if flag) mixed_run_pct = max_run(mixed_flags) / max(1, band_count) sequence_boundary_count = sum( 1 for idx in range(1, len(band_best_sequences)) if band_best_sequences[idx] is not None and band_best_sequences[idx - 1] is not None and band_best_sequences[idx] != band_best_sequences[idx - 1] ) reasons: list[str] = [] all_or_nearly_all_foreign = mixed_band_count >= max(1, int(band_count * 0.85)) if sequence is not None and best_frame_sequence is not None and best_frame_sequence != sequence and all_or_nearly_all_foreign: reasons.append("sequence_marker_mismatch") elif mixed_band_count >= max(1, args.mix_min_bands): reasons.append("mixed_sequence_bands") if lower_dominant is not None and upper_dominant == sequence and lower_dominant != sequence: reasons.append("lower_half_frame_mix") if upper_dominant is not None and lower_dominant == sequence and upper_dominant != sequence: reasons.append("upper_half_frame_mix") if sequence_boundary_count > 0: reasons.append("sequence_boundary") return { "best_frame_sequence": best_frame_sequence, "best_frame_mae": best_frame_mae, "mixed_band_count": mixed_band_count, "mixed_band_run_pct": mixed_run_pct, "band_sequence_counts": dict(counts.most_common(6)), "upper_dominant_sequence": upper_dominant, "lower_dominant_sequence": lower_dominant, "sequence_boundary_count": sequence_boundary_count, "sequence_marker_mismatch": "sequence_marker_mismatch" in reasons, "reasons": reasons, } def max_run(flags: list[bool]) -> int: best = 0 current = 0 for flag in flags: current = current + 1 if flag else 0 best = max(best, current) return best def analyze_frame( frame: bytes, width: int, height: int, args: argparse.Namespace, previous_seq: int | None, ) -> dict[str, Any]: sequence, uncertain_bits = decode_sequence(frame, width, height) max_plausible_step = max(120, args.sequence_window * 16) marker_sequence_implausible = ( sequence is not None and previous_seq is not None and abs(sequence - previous_seq) > max_plausible_step ) comparison_sequence = sequence if marker_sequence_implausible: comparison_sequence = previous_seq + 1 if previous_seq is not None else None elif comparison_sequence is None and previous_seq is not None: comparison_sequence = previous_seq + 1 upper_mae = lower_mae = total_mae = 0.0 shift_pixels = 0 shift_zero_delta = shift_best_delta = shift_improvement = 0.0 if comparison_sequence is not None: upper_mae = sampled_abs_delta_expected(frame, width, height, comparison_sequence, 0, height // 2, args.x_step, args.y_step) lower_mae = sampled_abs_delta_expected(frame, width, height, comparison_sequence, height // 2, height, args.x_step, args.y_step) total_mae = sampled_abs_delta_expected(frame, width, height, comparison_sequence, 0, height, args.x_step, args.y_step) shift_pixels, shift_zero_delta, shift_best_delta, shift_improvement = best_expected_shift(frame, width, height, comparison_sequence, args) band_count = max(8, args.bands) band_h = max(1, height // band_count) means: list[float] = [] variances: list[float] = [] for band in range(band_count): y0 = band * band_h y1 = height if band == band_count - 1 else min(height, y0 + band_h) mean, variance = band_stats(frame, width, y0, y1, args.x_step, args.y_step) means.append(mean) variances.append(variance) lower = band_count // 2 lower_flags = [var < args.slab_var for var in variances[lower:]] low_var_run = max_run(lower_flags) / max(1, len(lower_flags)) mean_jumps = [abs(means[idx] - means[idx - 1]) for idx in range(1, band_count)] max_lower_jump = max(mean_jumps[lower:] or [0.0]) sequence_profile = band_sequence_profile(frame, width, height, comparison_sequence, previous_seq, args) reasons: list[str] = [] if sequence is None: reasons.append("marker_decode_failed") elif marker_sequence_implausible: reasons.append("marker_sequence_implausible") elif previous_seq is not None: if sequence == previous_seq: reasons.append("frame_repeat") elif sequence > previous_seq + 1: reasons.append("frame_gap") elif sequence < previous_seq: reasons.append("frame_backwards") if sequence is not None: if lower_mae > args.lower_mae_threshold and lower_mae > max(upper_mae * args.lower_skew_ratio, args.lower_mae_threshold): reasons.append("lower_half_tear") if total_mae > args.mae_threshold and lower_mae <= max(upper_mae * args.lower_skew_ratio, args.lower_mae_threshold): reasons.append("high_mae") if low_var_run >= 0.25 and lower_mae > args.lower_mae_threshold: reasons.append("black_or_gray_slab") if shift_pixels and shift_zero_delta > args.shift_threshold and shift_improvement > args.shift_improvement: reasons.append("horizontal_shift") reasons.extend(sequence_profile["reasons"]) visual_reasons = [reason for reason in reasons if reason not in NON_VISUAL_REASONS] cadence_reasons = [reason for reason in reasons if reason in CADENCE_REASONS] return { "suspicious": bool(reasons), "visual_suspicious": bool(visual_reasons), "reasons": reasons, "visual_reasons": visual_reasons, "cadence_reasons": cadence_reasons, "decoded_sequence": sequence, "comparison_sequence": comparison_sequence, "marker_sequence_implausible": marker_sequence_implausible, "marker_uncertain_bits": uncertain_bits, "upper_mae": round(upper_mae, 3), "lower_mae": round(lower_mae, 3), "total_mae": round(total_mae, 3), "lower_low_variance_run_pct": round(low_var_run, 3), "max_lower_jump": round(max_lower_jump, 3), "shift_pixels": shift_pixels, "shift_zero_delta": round(shift_zero_delta, 3), "shift_best_delta": round(shift_best_delta, 3), "shift_improvement": round(shift_improvement, 3), "best_frame_sequence": sequence_profile["best_frame_sequence"], "best_frame_mae": round(float(sequence_profile["best_frame_mae"]), 3), "mixed_band_count": sequence_profile["mixed_band_count"], "mixed_band_run_pct": round(float(sequence_profile["mixed_band_run_pct"]), 3), "band_sequence_counts": sequence_profile["band_sequence_counts"], "upper_dominant_sequence": sequence_profile["upper_dominant_sequence"], "lower_dominant_sequence": sequence_profile["lower_dominant_sequence"], "sequence_boundary_count": sequence_profile["sequence_boundary_count"], "sequence_marker_mismatch": sequence_profile["sequence_marker_mismatch"], } def write_pgm(path: pathlib.Path, frame: bytes, width: int, height: int) -> None: path.write_bytes(f"P5\n{width} {height}\n255\n".encode() + frame) def run_capture(args: argparse.Namespace) -> int: width, height, fps = mode_dimensions(args) command, capture_width, capture_height, device = ffmpeg_cmd(args, width, height) artifact_dir = pathlib.Path(args.artifact_dir) if args.artifact_dir else pathlib.Path("/tmp") / f"lesavka-synthetic-rct-capture-{timestamp()}" artifact_dir.mkdir(parents=True, exist_ok=True) frame_size = capture_width * capture_height stderr_path = artifact_dir / "ffmpeg.stderr" metrics_path = artifact_dir / "frame-metrics.jsonl" capture_started = time.monotonic() capture_elapsed = 0.0 analysis_elapsed = 0.0 raw_capture_bytes = 0 ffmpeg_rc: int | None = None frame_index = 0 suspicious_count = 0 visual_suspicious_count = 0 reference_artifacts = 0 suspicious_artifacts = 0 previous_seq: int | None = None decoded_frames = 0 reason_counts: collections.Counter[str] = collections.Counter() visual_reason_counts: collections.Counter[str] = collections.Counter() cadence_reason_counts: collections.Counter[str] = collections.Counter() sequence_counts: collections.Counter[int] = collections.Counter() comparison_sequence_counts: collections.Counter[int] = collections.Counter() max_total_mae = max_upper_mae = max_lower_mae = 0.0 max_mixed_band_count = 0 max_sequence_boundary_count = 0 worst: list[dict[str, Any]] = [] def analyze_captured_frame(frame: bytes, elapsed_s: float, metrics: Any) -> None: nonlocal frame_index, suspicious_count, visual_suspicious_count, reference_artifacts, suspicious_artifacts nonlocal previous_seq, decoded_frames, max_total_mae, max_upper_mae, max_lower_mae, worst nonlocal max_mixed_band_count, max_sequence_boundary_count frame_index += 1 result = analyze_frame(frame, capture_width, capture_height, args, previous_seq) decoded_seq = result["decoded_sequence"] comparison_seq = result["comparison_sequence"] if decoded_seq is not None: decoded_frames += 1 sequence_counts[int(decoded_seq)] += 1 if comparison_seq is not None: comparison_sequence_counts[int(comparison_seq)] += 1 previous_seq = int(comparison_seq) result.update({"frame": frame_index, "elapsed_s": round(elapsed_s, 3)}) max_total_mae = max(max_total_mae, float(result["total_mae"])) max_upper_mae = max(max_upper_mae, float(result["upper_mae"])) max_lower_mae = max(max_lower_mae, float(result["lower_mae"])) max_mixed_band_count = max(max_mixed_band_count, int(result["mixed_band_count"])) max_sequence_boundary_count = max(max_sequence_boundary_count, int(result["sequence_boundary_count"])) if result["suspicious"]: suspicious_count += 1 reason_counts.update(result["reasons"]) visual_reason_counts.update(result["visual_reasons"]) cadence_reason_counts.update(result["cadence_reasons"]) worst.append(result) worst = sorted(worst, key=lambda item: (item["lower_mae"], item["total_mae"]), reverse=True)[:30] if result["visual_suspicious"]: visual_suspicious_count += 1 if result["visual_suspicious"] and suspicious_artifacts < args.max_suspicious_artifacts: seq_label = "unknown" if comparison_seq is None else f"seq{int(comparison_seq):08d}" write_pgm(artifact_dir / f"suspicious_{frame_index:06d}_{seq_label}.pgm", frame, capture_width, capture_height) if comparison_seq is not None: write_pgm( artifact_dir / f"expected_{frame_index:06d}_{seq_label}.pgm", synthetic_gray(capture_width, capture_height, int(comparison_seq)), capture_width, capture_height, ) best_seq = result.get("best_frame_sequence") if best_seq is not None and best_seq != comparison_seq: write_pgm( artifact_dir / f"expected_best_{frame_index:06d}_seq{int(best_seq):08d}.pgm", synthetic_gray(capture_width, capture_height, int(best_seq)), capture_width, capture_height, ) suspicious_artifacts += 1 should_reference = frame_index == 1 or (args.reference_every > 0 and frame_index % args.reference_every == 0) if should_reference and reference_artifacts < args.max_reference_artifacts: write_pgm(artifact_dir / f"reference_{frame_index:06d}.pgm", frame, capture_width, capture_height) reference_artifacts += 1 metrics.write(json.dumps(result, sort_keys=True) + "\n") if frame_index % args.progress_every == 0: print(f"frames={frame_index} suspicious={suspicious_count} latest={result}", file=sys.stderr) with stderr_path.open("wb") as err, metrics_path.open("w") as metrics: if args.stream_analyze: (artifact_dir / "command.txt").write_text(" ".join(shlex.quote(part) for part in command) + "\n") proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=err) assert proc.stdout is not None capture_started = time.monotonic() try: while time.monotonic() - capture_started < args.duration: frame = proc.stdout.read(frame_size) if len(frame) != frame_size: break analyze_captured_frame(frame, time.monotonic() - capture_started, metrics) finally: proc.terminate() try: ffmpeg_rc = proc.wait(timeout=3) except subprocess.TimeoutExpired: proc.kill() ffmpeg_rc = proc.wait() capture_elapsed = time.monotonic() - capture_started analysis_elapsed = capture_elapsed else: raw_path = artifact_dir / "capture.raw" capture_command = command[:] if "-an" in capture_command: capture_command[capture_command.index("-an") : capture_command.index("-an")] = ["-t", str(args.duration)] else: capture_command[-1:-1] = ["-t", str(args.duration)] capture_command[-1] = str(raw_path) (artifact_dir / "command.txt").write_text(" ".join(shlex.quote(part) for part in capture_command) + "\n") print(f"capturing raw RCT frames before analysis: {raw_path}", file=sys.stderr) capture_started = time.monotonic() proc = subprocess.run(capture_command, stdout=subprocess.DEVNULL, stderr=err, check=False) capture_elapsed = time.monotonic() - capture_started ffmpeg_rc = proc.returncode raw_capture_bytes = raw_path.stat().st_size if raw_path.exists() else 0 print( f"analyzing captured raw RCT frames bytes={raw_capture_bytes} capture_s={capture_elapsed:.3f}", file=sys.stderr, ) analysis_started = time.monotonic() try: with raw_path.open("rb") as raw: while True: frame = raw.read(frame_size) if len(frame) != frame_size: break analyze_captured_frame(frame, frame_index / max(1, fps), metrics) finally: raw_path.unlink(missing_ok=True) analysis_elapsed = time.monotonic() - analysis_started elapsed = max(0.001, capture_elapsed) summary = { "schema": "lesavka.synthetic-rct-capture.v1", "source": args.source, "device": device, "mode": args.mode, "capture_mode": "stream" if args.stream_analyze else "rawfile", "width": capture_width, "height": capture_height, "fps_requested": fps, "duration_requested_s": args.duration, "duration_observed_s": round(elapsed, 3), "analysis_duration_s": round(analysis_elapsed, 3), "ffmpeg_rc": ffmpeg_rc, "raw_capture_bytes": raw_capture_bytes, "frames": frame_index, "fps_observed": round(frame_index / elapsed, 3), "decoded_frames": decoded_frames, "decoded_pct": round(decoded_frames / frame_index * 100.0, 3) if frame_index else 0.0, "suspicious_frames": suspicious_count, "suspicious_pct": round(suspicious_count / frame_index * 100.0, 3) if frame_index else 0.0, "visual_suspicious_frames": visual_suspicious_count, "visual_suspicious_pct": round(visual_suspicious_count / frame_index * 100.0, 3) if frame_index else 0.0, "reason_counts": dict(reason_counts), "visual_reason_counts": dict(visual_reason_counts), "cadence_reason_counts": dict(cadence_reason_counts), "decoded_sequence_counts": dict(sequence_counts.most_common(12)), "comparison_sequence_counts": dict(comparison_sequence_counts.most_common(12)), "max_total_mae": round(max_total_mae, 3), "max_upper_mae": round(max_upper_mae, 3), "max_lower_mae": round(max_lower_mae, 3), "max_mixed_band_count": max_mixed_band_count, "max_sequence_boundary_count": max_sequence_boundary_count, "worst_frames": worst, "reference_artifacts": reference_artifacts, "suspicious_artifacts": suspicious_artifacts, "artifact_dir": str(artifact_dir), "ffmpeg_stderr": str(stderr_path), } (artifact_dir / "summary.json").write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n") (artifact_dir / "summary.txt").write_text(format_summary(summary)) print(format_summary(summary), end="") print(f"artifact_dir: {artifact_dir}") return 0 if frame_index > 0 else 2 def format_summary(summary: dict[str, Any]) -> str: return "\n".join( [ "Lesavka synthetic RCT UVC comparison probe", f"source: {summary['source']}", f"device: {summary['device']}", f"mode: {summary['mode']} capture={summary['width']}x{summary['height']}@{summary['fps_requested']}", f"frames: {summary['frames']} ({summary['fps_observed']} fps observed)", f"decoded markers: {summary['decoded_frames']} ({summary['decoded_pct']}%)", f"suspicious: {summary['suspicious_frames']} ({summary['suspicious_pct']}%)", f"visual suspicious: {summary['visual_suspicious_frames']} ({summary['visual_suspicious_pct']}%)", f"reasons: {summary['reason_counts']}", f"visual reasons: {summary['visual_reason_counts']}", f"cadence reasons: {summary['cadence_reason_counts']}", f"max mae: total={summary['max_total_mae']} upper={summary['max_upper_mae']} lower={summary['max_lower_mae']}", f"max mixed bands: {summary['max_mixed_band_count']} boundary_changes={summary['max_sequence_boundary_count']}", f"comparison sequence counts: {summary['comparison_sequence_counts']}", f"artifacts: {summary['artifact_dir']}", "", ] ) def run_self_test(args: argparse.Namespace) -> int: width = 320 height = 180 frames = [synthetic_gray(width, height, idx) for idx in range(6)] corrupt = bytearray(synthetic_gray(width, height, 6)) fill_rect(corrupt, width, height, 0, height // 2, width, height // 4, 0) frames.append(bytes(corrupt)) shifted = bytearray(width * height) expected = synthetic_gray(width, height, 7) for y in range(height): row = y * width for x in range(width): src = min(width - 1, x + 24) shifted[row + x] = expected[row + src] frames.append(bytes(shifted)) mixed = bytearray(synthetic_gray(width, height, 8)) lower_next = synthetic_gray(width, height, 9) split_y = height // 2 mixed[split_y * width :] = lower_next[split_y * width :] frames.append(bytes(mixed)) previous_seq: int | None = None records: list[dict[str, Any]] = [] suspicious = 0 for idx, frame in enumerate(frames): result = analyze_frame(frame, width, height, args, previous_seq) if result["comparison_sequence"] is not None: previous_seq = int(result["comparison_sequence"]) result["frame"] = idx records.append(result) suspicious += int(bool(result["suspicious"])) artifact_dir = pathlib.Path(args.artifact_dir) if args.artifact_dir else pathlib.Path("/tmp") / f"lesavka-synthetic-rct-self-test-{timestamp()}" artifact_dir.mkdir(parents=True, exist_ok=True) write_pgm(artifact_dir / "reference_000001.pgm", frames[0], width, height) summary = { "schema": "lesavka.synthetic-rct-probe.self-test.v1", "frames": len(frames), "suspicious_frames": suspicious, "records": records, "artifact_dir": str(artifact_dir), } (artifact_dir / "summary.json").write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n") print(json.dumps(summary, indent=2, sort_keys=True)) return 0 if suspicious >= 3 else 1 def main() -> int: args = parse_args() if args.self_test: return run_self_test(args) if args.capture_only: return run_capture(args) return run_remote_orchestrated(args) if __name__ == "__main__": raise SystemExit(main())