#!/usr/bin/env python3
"""Capture the RCT-facing UVC webcam and score lower-half video corruption."""

from __future__ import annotations

import argparse
import collections
import json
import os
import pathlib
import shlex
import shutil
import subprocess
import sys
import time
from typing import Any

DEFAULT_DEVICE_LABEL = "Lesavka Composite"


def positive_int(value: str) -> int:
    """Parse *value* as an integer >= 1 (for use as an argparse ``type=``).

    Raises:
        argparse.ArgumentTypeError: if *value* is not an integer or is < 1.
    """
    try:
        number = int(value)
    except ValueError as exc:
        raise argparse.ArgumentTypeError(f"expected an integer, got {value!r}") from exc
    if number < 1:
        raise argparse.ArgumentTypeError(f"expected a positive integer, got {number}")
    return number


def is_nth_frame(frame_index: int, every: int) -> bool:
    """Return True when *frame_index* is a multiple of *every*.

    A non-positive *every* disables the cadence (always False) instead of
    dividing by zero.
    """
    return every > 0 and frame_index % every == 0


def parse_args() -> argparse.Namespace:
    """Build the command-line parser and parse ``sys.argv``."""
    parser = argparse.ArgumentParser(
        description=(
            "Long-running Lesavka RCT UVC artifact probe. It captures decoded "
            "grayscale frames from the host-side UVC device, flags lower-half "
            "slabs/tears, and writes JSON/PGM artifacts for review."
        )
    )
    parser.add_argument("--host", default="", help="optional SSH host, e.g. tethys")
    parser.add_argument("--source", choices=["device", "x11"], default="device")
    parser.add_argument("--device", default="auto", help="video device or auto")
    parser.add_argument("--display", default=":0", help="X11 display for --source x11")
    parser.add_argument("--crop", default="", help="X11 crop as x,y,width,height for --source x11")
    parser.add_argument("--device-label", default=DEFAULT_DEVICE_LABEL)
    # Geometry and sampling steps must be >= 1: a zero step makes range()
    # raise, and a zero-sized frame would spin the capture loop on empty reads.
    parser.add_argument("--width", type=positive_int, default=1280)
    parser.add_argument("--height", type=positive_int, default=720)
    parser.add_argument("--fps", type=int, default=30)
    parser.add_argument("--duration", type=float, default=180.0)
    parser.add_argument("--artifact-dir", default="")
    parser.add_argument("--remote-artifact-dir", default="")
    parser.add_argument("--x-step", type=positive_int, default=8)
    parser.add_argument("--y-step", type=positive_int, default=4)
    parser.add_argument("--bands", type=int, default=24)
    parser.add_argument("--flat-var", type=float, default=18.0)
    parser.add_argument("--delta-threshold", type=float, default=24.0)
    parser.add_argument("--jump-threshold", type=float, default=34.0)
    parser.add_argument("--tear-threshold", type=float, default=18.0)
    parser.add_argument("--shift-threshold", type=float, default=10.0)
    parser.add_argument("--shift-improvement", type=float, default=1.35)
    parser.add_argument("--blur-delta-threshold", type=float, default=6.0)
    parser.add_argument("--blur-var-ratio", type=float, default=0.45)
    parser.add_argument("--change-threshold", type=float, default=1.0)
    parser.add_argument("--max-suspicious-artifacts", type=int, default=40)
    parser.add_argument("--max-reference-artifacts", type=int, default=12)
    parser.add_argument("--reference-every", type=int, default=900)
    parser.add_argument("--progress-every", type=int, default=150)
    parser.add_argument("--self-test", action="store_true")
    return parser.parse_args()


def timestamp() -> str:
    """Return the current UTC time formatted as ``YYYYmmdd-HHMMSS``."""
    return time.strftime("%Y%m%d-%H%M%S", time.gmtime())


def default_artifact_dir() -> pathlib.Path:
    """Return a timestamped artifact directory path under ``/tmp``."""
    return pathlib.Path("/tmp") / f"lesavka-rct-uvc-artifact-probe-{timestamp()}"


def run_remote(args: argparse.Namespace) -> int:
    """Copy this script to ``args.host`` over SSH, run it there, fetch artifacts.

    Returns the exit status of the remote probe. The ``scp`` of the artifact
    directory is best-effort and does not affect the return value.
    """
    local_artifact_dir = pathlib.Path(args.artifact_dir or f"artifacts/rct-uvc/{timestamp()}")
    remote_artifact_dir = args.remote_artifact_dir or f"/tmp/lesavka-rct-uvc-artifact-probe-{timestamp()}"
    remote_script = f"/tmp/lesavka-rct-uvc-artifact-probe-{os.getpid()}.py"
    script_text = pathlib.Path(__file__).read_text()
    # Stream the script body over stdin so no local temp copy is needed.
    subprocess.run(
        ["ssh", args.host, f"cat > {shlex.quote(remote_script)} && chmod +x {shlex.quote(remote_script)}"],
        input=script_text,
        text=True,
        check=True,
    )
    remote_cmd = [
        "python3",
        remote_script,
        "--source",
        args.source,
        "--device",
        args.device,
        "--device-label",
        args.device_label,
        "--display",
        args.display,
        "--crop",
        args.crop,
        "--width",
        str(args.width),
        "--height",
        str(args.height),
        "--fps",
        str(args.fps),
        "--duration",
        str(args.duration),
        "--artifact-dir",
        remote_artifact_dir,
        "--x-step",
        str(args.x_step),
        "--y-step",
        str(args.y_step),
        "--bands",
        str(args.bands),
        "--flat-var",
        str(args.flat_var),
        "--delta-threshold",
        str(args.delta_threshold),
        "--jump-threshold",
        str(args.jump_threshold),
        "--tear-threshold",
        str(args.tear_threshold),
        "--shift-threshold",
        str(args.shift_threshold),
        "--shift-improvement",
        str(args.shift_improvement),
        "--blur-delta-threshold",
        str(args.blur_delta_threshold),
        "--blur-var-ratio",
        str(args.blur_var_ratio),
        "--change-threshold",
        str(args.change_threshold),
        "--max-suspicious-artifacts",
        str(args.max_suspicious_artifacts),
        "--max-reference-artifacts",
        str(args.max_reference_artifacts),
        "--reference-every",
        str(args.reference_every),
        "--progress-every",
        str(args.progress_every),
    ]
    print(f"running remote RCT UVC probe on {args.host}: {remote_artifact_dir}", file=sys.stderr)
    # ssh hands a single string to the remote shell, so quote every part.
    rc = subprocess.run(["ssh", args.host, " ".join(shlex.quote(part) for part in remote_cmd)]).returncode
    local_artifact_dir.parent.mkdir(parents=True, exist_ok=True)
    subprocess.run(
        ["scp", "-r", f"{args.host}:{remote_artifact_dir}", str(local_artifact_dir)],
        check=False,
    )
    print(f"artifact_dir: {local_artifact_dir}")
    return rc


def detect_video_device(label: str) -> str:
    """Return the first ``/dev/video*`` node listed under a device matching *label*.

    The ``LESAVKA_RCT_UVC_DEVICE`` environment variable, when set, wins.
    Falls back to ``/dev/video2`` if ``v4l2-ctl`` cannot be run or nothing
    matches (case-insensitive substring match on the device header line).
    """
    explicit = os.environ.get("LESAVKA_RCT_UVC_DEVICE")
    if explicit:
        return explicit
    try:
        listing = subprocess.check_output(["v4l2-ctl", "--list-devices"], text=True)
    except (OSError, subprocess.SubprocessError, ValueError):
        # Missing binary, non-zero exit, or undecodable output: best-effort fallback.
        return "/dev/video2"
    current_matches = False
    for line in listing.splitlines():
        # Unindented lines are device headers; indented lines are their nodes.
        if not line.startswith(("\t", " ")):
            current_matches = label.lower() in line.lower()
            continue
        value = line.strip()
        if current_matches and value.startswith("/dev/video"):
            return value
    return "/dev/video2"


def band_stats(frame: bytes, width: int, y0: int, y1: int, x_step: int, y_step: int) -> tuple[float, float]:
    """Return ``(mean, variance)`` of sampled pixels in rows ``[y0, y1)``.

    Pixels are sampled every *x_step* columns and *y_step* rows; *frame* is
    indexed as one byte per pixel, row-major. Returns ``(0.0, 0.0)`` when no
    pixel is sampled.
    """
    total = 0
    total2 = 0
    count = 0
    view = memoryview(frame)
    for y in range(y0, y1, y_step):
        row = y * width
        for x in range(0, width, x_step):
            value = view[row + x]
            total += value
            total2 += value * value
            count += 1
    if count == 0:
        return 0.0, 0.0
    mean = total / count
    # Clamp: floating-point rounding can push E[x^2] - E[x]^2 slightly below zero.
    variance = max(0.0, (total2 / count) - (mean * mean))
    return mean, variance


def band_delta(
    frame: bytes, previous: bytes | None, width: int, y0: int, y1: int, x_step: int, y_step: int
) -> float:
    """Return the mean absolute difference between *frame* and *previous*.

    Only rows ``[y0, y1)`` are sampled, every *x_step* columns and *y_step*
    rows. Returns 0.0 when *previous* is None or nothing is sampled.
    """
    if previous is None:
        return 0.0
    total = 0
    count = 0
    view = memoryview(frame)
    prev = memoryview(previous)
    for y in range(y0, y1, y_step):
        row = y * width
        for x in range(0, width, x_step):
            total += abs(view[row + x] - prev[row + x])
            count += 1
    return total / count if count else 0.0


def band_deltas(
    frame: bytes,
    previous: bytes | None,
    width: int,
    height: int,
    band_count: int,
    x_step: int,
    y_step: int,
) -> list[float]:
    """Return the per-band temporal delta for *band_count* horizontal bands.

    The last band absorbs any rows left over by the integer division.
    Returns all zeros when *previous* is None.
    """
    if previous is None:
        return [0.0] * band_count
    band_h = max(1, height // band_count)
    values: list[float] = []
    for band in range(band_count):
        y0 = band * band_h
        y1 = height if band == band_count - 1 else min(height, y0 + band_h)
        values.append(band_delta(frame, previous, width, y0, y1, x_step, y_step))
    return values


def shifted_delta(
    frame: bytes,
    previous: bytes,
    width: int,
    y0: int,
    y1: int,
    shift: int,
    x_step: int,
    y_step: int,
) -> float:
    """Return the mean absolute difference of *frame* vs *previous* shifted by *shift* columns.

    Pixel ``x`` of *frame* is compared with pixel ``x + shift`` of *previous*;
    only columns where both indices stay inside the row are sampled.
    """
    x0 = max(0, -shift)
    x1 = min(width, width - shift)
    if x0 >= x1:
        return 0.0
    total = 0
    count = 0
    view = memoryview(frame)
    prev = memoryview(previous)
    for y in range(y0, y1, y_step):
        row = y * width
        for x in range(x0, x1, x_step):
            total += abs(view[row + x] - prev[row + x + shift])
            count += 1
    return total / count if count else 0.0


def best_shift_match(
    frame: bytes,
    previous: bytes | None,
    width: int,
    height: int,
    x_step: int,
    y_step: int,
) -> tuple[int, float, float, float]:
    """Find the horizontal shift that best aligns *frame* with *previous*.

    Only the bottom three quarters of the frame are compared. Returns
    ``(best_shift, zero_shift_delta, best_delta, improvement)`` where
    *improvement* is ``zero_shift_delta / best_delta`` (1.0 when no non-zero
    shift beats the unshifted comparison). Returns zeros when *previous* is
    None.
    """
    if previous is None:
        return 0, 0.0, 0.0, 0.0
    y0 = height // 4
    y1 = height
    zero = shifted_delta(frame, previous, width, y0, y1, 0, x_step, y_step)
    best = zero
    best_shift = 0
    for shift in [-96, -80, -64, -48, -32, -24, -16, -12, -8, 8, 12, 16, 24, 32, 48, 64, 80, 96]:
        candidate = shifted_delta(frame, previous, width, y0, y1, shift, x_step, y_step)
        if candidate < best:
            best = candidate
            best_shift = shift
    # Floor the denominator so a perfect shifted match does not divide by zero.
    improvement = zero / max(best, 0.001) if best_shift else 1.0
    return best_shift, zero, best, improvement


def max_run(flags: list[bool]) -> int:
    """Return the length of the longest consecutive run of truthy *flags*."""
    best = 0
    cur = 0
    for flag in flags:
        cur = cur + 1 if flag else 0
        best = max(best, cur)
    return best


def analyze_frame(frame: bytes, previous: bytes | None, args: argparse.Namespace) -> dict[str, Any]:
    """Score one grayscale frame against the previous one.

    Reads geometry, sampling steps and thresholds from *args* and returns a
    dict with a ``suspicious`` flag, the list of triggered ``reasons`` and the
    rounded metrics behind them. At least 8 horizontal bands are always used.
    """
    width = args.width
    height = args.height
    band_count = max(8, args.bands)
    band_h = max(1, height // band_count)
    means: list[float] = []
    variances: list[float] = []
    for band in range(band_count):
        y0 = band * band_h
        y1 = height if band == band_count - 1 else min(height, y0 + band_h)
        mean, variance = band_stats(frame, width, y0, y1, args.x_step, args.y_step)
        means.append(mean)
        variances.append(variance)

    half = band_count // 2
    lower_flags = [var < args.flat_var for var in variances[half:]]
    lower_flat_pct = sum(lower_flags) / max(1, len(lower_flags))
    lower_flat_run_pct = max_run(lower_flags) / max(1, len(lower_flags))
    upper_delta = band_delta(frame, previous, width, 0, height // 2, args.x_step, args.y_step)
    lower_delta = band_delta(frame, previous, width, height // 2, height, args.x_step, args.y_step)
    temporal_deltas = band_deltas(frame, previous, width, height, band_count, args.x_step, args.y_step)
    # jumps[i] is the mean-brightness step between band i and band i + 1.
    jumps = [abs(means[idx] - means[idx - 1]) for idx in range(1, band_count)]
    lower_jumps = jumps[half:]
    max_lower_jump = max(lower_jumps or [0.0])
    sorted_jumps = sorted(jumps)
    median_jump = sorted_jumps[len(sorted_jumps) // 2] if sorted_jumps else 0.0
    sorted_temporal = sorted(temporal_deltas)
    median_temporal_delta = sorted_temporal[len(sorted_temporal) // 2] if sorted_temporal else 0.0
    max_temporal_delta = max(temporal_deltas or [0.0])
    max_temporal_band = temporal_deltas.index(max_temporal_delta) if temporal_deltas else 0
    shift_pixels, shift_zero_delta, shift_best_delta, shift_improvement = best_shift_match(
        frame, previous, width, height, args.x_step, args.y_step
    )

    reasons: list[str] = []
    # The lower half changed a lot, and much more than the upper half did.
    temporal_lower_jump = lower_delta > args.delta_threshold and lower_delta > max(upper_delta * 2.2, 8.0)
    lower_delta_skew = temporal_lower_jump
    lower_boundary_jump = (
        temporal_lower_jump
        and max_lower_jump > args.jump_threshold
        and max_lower_jump > max(median_jump * 3.0, 8.0)
    )
    lower_flat_flash = lower_flat_pct >= 0.25 and temporal_lower_jump
    lower_slab = lower_flat_run_pct >= 0.33 and max_lower_jump > args.jump_threshold and temporal_lower_jump
    temporal_delta_spike = (
        previous is not None
        and max_temporal_delta > args.tear_threshold
        and max_temporal_delta > max(median_temporal_delta * 3.2, 6.0)
    )
    horizontal_shift = (
        bool(shift_pixels)
        and shift_zero_delta > args.shift_threshold
        and shift_improvement > args.shift_improvement
    )
    upper_variance_mean = sum(variances[:half]) / max(1, len(variances[:half]))
    lower_blur_flash = (
        previous is not None
        and lower_delta > args.blur_delta_threshold
        and lower_delta > max(upper_delta * 1.35, 2.0)
        and min(variances[half:] or [0.0]) < max(args.flat_var * 4.0, upper_variance_mean * args.blur_var_ratio)
    )
    if lower_delta_skew:
        reasons.append("lower_delta_skew")
    if lower_boundary_jump:
        reasons.append("lower_boundary_jump")
    if lower_flat_flash:
        reasons.append("lower_flat_flash")
    if lower_slab:
        reasons.append("lower_slab")
    if temporal_delta_spike:
        reasons.append("temporal_delta_spike")
    if horizontal_shift:
        reasons.append("horizontal_shift")
    if lower_blur_flash:
        reasons.append("lower_blur_flash")

    suspicious = bool(
        lower_delta_skew
        or lower_boundary_jump
        or lower_flat_flash
        or lower_slab
        or temporal_delta_spike
        or horizontal_shift
        or lower_blur_flash
    )
    return {
        "suspicious": suspicious,
        "reasons": reasons,
        "upper_delta": round(upper_delta, 3),
        "lower_delta": round(lower_delta, 3),
        "lower_flat_pct": round(lower_flat_pct, 3),
        "lower_flat_run_pct": round(lower_flat_run_pct, 3),
        "max_lower_jump": round(max_lower_jump, 3),
        "median_band_jump": round(median_jump, 3),
        "max_temporal_delta": round(max_temporal_delta, 3),
        "max_temporal_band": max_temporal_band,
        "median_temporal_delta": round(median_temporal_delta, 3),
        "shift_pixels": shift_pixels,
        "shift_zero_delta": round(shift_zero_delta, 3),
        "shift_best_delta": round(shift_best_delta, 3),
        "shift_improvement": round(shift_improvement, 3),
        "lower_variance_min": round(min(variances[half:] or [0.0]), 3),
        "lower_variance_mean": round(sum(variances[half:]) / max(1, len(variances[half:])), 3),
    }


def write_pgm(path: pathlib.Path, frame: bytes, width: int, height: int) -> None:
    """Write *frame* to *path* as a binary (P5) PGM with a max value of 255."""
    path.write_bytes(f"P5\n{width} {height}\n255\n".encode() + frame)


def parse_crop(value: str, args: argparse.Namespace) -> tuple[int, int, int, int]:
    """Parse an ``x,y,width,height`` crop string.

    An empty *value* yields ``(0, 0, args.width, args.height)``. Otherwise the
    parsed width/height are also written back to ``args.width`` and
    ``args.height`` so later frame-size calculations match the crop.

    Raises:
        SystemExit: if *value* is malformed or has a non-positive size.
    """
    if not value:
        return (0, 0, args.width, args.height)
    parts = [part.strip() for part in value.split(",")]
    if len(parts) != 4:
        raise SystemExit("--crop must be x,y,width,height")
    try:
        x, y, width, height = [int(part) for part in parts]
    except ValueError as exc:
        raise SystemExit("--crop values must be integers") from exc
    if width <= 0 or height <= 0:
        raise SystemExit("--crop width and height must be positive")
    args.width = width
    args.height = height
    return (x, y, width, height)


def ffmpeg_cmd(device: str, args: argparse.Namespace) -> list[str]:
    """Build the ffmpeg argv that emits raw 8-bit grayscale frames on stdout.

    For ``--source x11`` the crop is parsed first (which may update
    ``args.width``/``args.height``); otherwise *device* is opened via v4l2
    requesting MJPEG input.
    """
    if args.source == "x11":
        x, y, width, height = parse_crop(args.crop, args)
        display = f"{args.display}+{x},{y}"
        return [
            "ffmpeg",
            "-hide_banner",
            "-nostdin",
            "-loglevel",
            "warning",
            "-f",
            "x11grab",
            "-video_size",
            f"{width}x{height}",
            "-framerate",
            str(args.fps),
            "-i",
            display,
            "-an",
            "-pix_fmt",
            "gray",
            "-f",
            "rawvideo",
            "-",
        ]
    return [
        "ffmpeg",
        "-hide_banner",
        "-nostdin",
        "-loglevel",
        "warning",
        "-f",
        "v4l2",
        "-input_format",
        "mjpeg",
        "-video_size",
        f"{args.width}x{args.height}",
        "-framerate",
        str(args.fps),
        "-i",
        device,
        "-an",
        "-pix_fmt",
        "gray",
        "-f",
        "rawvideo",
        "-",
    ]


def run_capture(args: argparse.Namespace) -> int:
    """Capture frames via ffmpeg, analyze each one, and write artifacts.

    Writes ``command.txt``, ``ffmpeg.stderr``, ``frame-metrics.jsonl``,
    ``summary.json``, ``summary.txt`` and PGM snapshots into the artifact
    directory. Returns 0 when at least one frame was read, otherwise 2.
    """
    artifact_dir = pathlib.Path(args.artifact_dir) if args.artifact_dir else default_artifact_dir()
    artifact_dir.mkdir(parents=True, exist_ok=True)
    device = detect_video_device(args.device_label) if args.device == "auto" else args.device
    # Build the command before computing frame_size: an x11 crop may rewrite
    # args.width / args.height.
    command = ffmpeg_cmd(device, args)
    (artifact_dir / "command.txt").write_text(" ".join(shlex.quote(part) for part in command) + "\n")
    frame_size = args.width * args.height
    stderr_path = artifact_dir / "ffmpeg.stderr"
    jsonl_path = artifact_dir / "frame-metrics.jsonl"

    started = time.monotonic()
    previous: bytes | None = None
    frame_index = 0
    suspicious_count = 0
    artifacts_written = 0
    reference_artifacts_written = 0
    changed_frames = 0
    static_frames = 0
    reason_counts: collections.Counter[str] = collections.Counter()
    worst: list[dict[str, Any]] = []
    max_upper_delta = 0.0
    max_lower_delta = 0.0
    max_lower_jump_seen = 0.0

    with stderr_path.open("wb") as err, jsonl_path.open("w") as metrics:
        proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=err)
        assert proc.stdout is not None  # guaranteed by stdout=PIPE; narrows the type
        try:
            while time.monotonic() - started < args.duration:
                frame = proc.stdout.read(frame_size)
                if len(frame) != frame_size:
                    # Short read: ffmpeg exited or the stream ended.
                    break
                frame_index += 1
                result = analyze_frame(frame, previous, args)
                previous = frame
                result.update({"frame": frame_index, "elapsed_s": round(time.monotonic() - started, 3)})
                max_upper_delta = max(max_upper_delta, float(result["upper_delta"]))
                max_lower_delta = max(max_lower_delta, float(result["lower_delta"]))
                max_lower_jump_seen = max(max_lower_jump_seen, float(result["max_lower_jump"]))
                if frame_index > 1:
                    if float(result["upper_delta"]) + float(result["lower_delta"]) >= args.change_threshold:
                        changed_frames += 1
                    else:
                        static_frames += 1
                if result["suspicious"]:
                    suspicious_count += 1
                    reason_counts.update(result["reasons"])
                    worst.append(result)
                    # Keep only the 20 worst frames so the summary stays bounded.
                    worst = sorted(
                        worst,
                        key=lambda item: (item["max_lower_jump"], item["lower_delta"]),
                        reverse=True,
                    )[:20]
                    if artifacts_written < args.max_suspicious_artifacts:
                        write_pgm(
                            artifact_dir / f"suspicious_{frame_index:06d}.pgm",
                            frame,
                            args.width,
                            args.height,
                        )
                        artifacts_written += 1
                should_write_reference = frame_index == 1 or is_nth_frame(frame_index, args.reference_every)
                if should_write_reference and reference_artifacts_written < args.max_reference_artifacts:
                    write_pgm(
                        artifact_dir / f"reference_{frame_index:06d}.pgm",
                        frame,
                        args.width,
                        args.height,
                    )
                    reference_artifacts_written += 1
                # --progress-every <= 0 disables periodic output instead of
                # raising ZeroDivisionError.
                progress_due = is_nth_frame(frame_index, args.progress_every)
                if frame_index <= 5 or result["suspicious"] or progress_due:
                    metrics.write(json.dumps(result, sort_keys=True) + "\n")
                if progress_due:
                    print(
                        f"frames={frame_index} suspicious={suspicious_count} latest={result}",
                        file=sys.stderr,
                    )
        finally:
            proc.terminate()
            try:
                proc.wait(timeout=3)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()  # reap the child so it does not linger as a zombie
            proc.stdout.close()

    elapsed = max(0.001, time.monotonic() - started)
    summary = {
        "schema": "lesavka.rct-uvc-artifact-probe.v1",
        "source": args.source,
        "device": device,
        "width": args.width,
        "height": args.height,
        "fps_requested": args.fps,
        "duration_requested_s": args.duration,
        "duration_observed_s": round(elapsed, 3),
        "frames": frame_index,
        "fps_observed": round(frame_index / elapsed, 3),
        "suspicious_frames": suspicious_count,
        "suspicious_pct": round((suspicious_count / frame_index * 100.0) if frame_index else 0.0, 3),
        "changed_frames": changed_frames,
        "static_frames": static_frames,
        "static_pct": round((static_frames / max(1, frame_index - 1) * 100.0) if frame_index > 1 else 0.0, 3),
        "max_upper_delta": round(max_upper_delta, 3),
        "max_lower_delta": round(max_lower_delta, 3),
        "max_lower_jump_seen": round(max_lower_jump_seen, 3),
        "reason_counts": dict(reason_counts),
        "worst_frames": worst,
        "reference_artifacts": reference_artifacts_written,
        "suspicious_artifacts": artifacts_written,
        "artifact_dir": str(artifact_dir),
        "ffmpeg_stderr": str(stderr_path),
    }
    (artifact_dir / "summary.json").write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n")
    (artifact_dir / "summary.txt").write_text(format_summary(summary))
    print(format_summary(summary), end="")
    print(f"artifact_dir: {artifact_dir}")
    return 0 if frame_index > 0 else 2


def format_summary(summary: dict[str, Any]) -> str:
    """Render *summary* as a newline-terminated, human-readable text block."""
    lines = [
        "Lesavka RCT UVC artifact probe",
        f"source: {summary.get('source', 'device')}",
        f"device: {summary['device']}",
        f"mode: {summary['width']}x{summary['height']}@{summary['fps_requested']}",
        f"frames: {summary['frames']} ({summary['fps_observed']} fps observed)",
        f"suspicious: {summary['suspicious_frames']} ({summary['suspicious_pct']}%)",
        f"static: {summary.get('static_frames', 0)} ({summary.get('static_pct', 0.0)}%)",
        f"max deltas: upper={summary.get('max_upper_delta', 0.0)} lower={summary.get('max_lower_delta', 0.0)}",
        f"reasons: {summary['reason_counts']}",
        f"reference artifacts: {summary.get('reference_artifacts', 0)}",
        f"suspicious artifacts: {summary.get('suspicious_artifacts', 0)}",
        f"artifacts: {summary['artifact_dir']}",
        "",
    ]
    return "\n".join(lines)


def synthetic_frame(width: int, height: int, shift: int = 0, corrupt: bool = False) -> bytes:
    """Return a diagonal-gradient test frame; *corrupt* flattens the lower half to 128."""
    data = bytearray(width * height)
    for y in range(height):
        row = y * width
        for x in range(width):
            data[row + x] = (x // 8 + y // 4 + shift) % 256
    if corrupt:
        for y in range(height // 2, height):
            row = y * width
            data[row : row + width] = bytes([128]) * width
    return bytes(data)


def synthetic_shift_frame(previous: bytes, width: int, height: int, shift: int) -> bytes:
    """Return *previous* shifted horizontally by *shift* columns, clamping at the row edges."""
    data = bytearray(width * height)
    for y in range(height):
        row = y * width
        for x in range(width):
            src = min(width - 1, max(0, x + shift))
            data[row + x] = previous[row + src]
    return bytes(data)


def synthetic_rich_frame(width: int, height: int) -> bytes:
    """Return a high-detail deterministic test frame built from XOR-mixed coordinates."""
    data = bytearray(width * height)
    for y in range(height):
        row = y * width
        for x in range(width):
            data[row + x] = ((x * 13) ^ (y * 7) ^ ((x * y) // 11)) % 256
    return bytes(data)


def run_self_test(args: argparse.Namespace) -> int:
    """Run the analyzer over synthetic frames; return 0 if any frame is flagged.

    Overrides ``args.width``, ``args.height``, ``args.shift_threshold`` and
    ``args.tear_threshold`` for the small synthetic frames, and writes
    ``summary.json`` plus PGM snapshots into the artifact directory.
    """
    artifact_dir = pathlib.Path(args.artifact_dir) if args.artifact_dir else default_artifact_dir()
    artifact_dir.mkdir(parents=True, exist_ok=True)
    args.width = 160
    args.height = 90
    args.shift_threshold = 3.0
    args.tear_threshold = 8.0
    frames = [synthetic_frame(args.width, args.height, idx) for idx in range(4)]
    frames.append(synthetic_frame(args.width, args.height, 5, corrupt=True))
    rich = synthetic_rich_frame(args.width, args.height)
    frames.append(rich)
    frames.append(synthetic_shift_frame(rich, args.width, args.height, 24))
    previous = None
    suspicious = 0
    records = []
    for idx, frame in enumerate(frames, start=1):
        result = analyze_frame(frame, previous, args)
        previous = frame
        result["frame"] = idx
        records.append(result)
        if idx == 1:
            write_pgm(artifact_dir / "reference_000001.pgm", frame, args.width, args.height)
        if result["suspicious"]:
            suspicious += 1
            write_pgm(artifact_dir / f"selftest_suspicious_{idx:06d}.pgm", frame, args.width, args.height)
    summary = {
        "schema": "lesavka.rct-uvc-artifact-probe.self-test.v1",
        "frames": len(frames),
        "suspicious_frames": suspicious,
        "records": records,
        "artifact_dir": str(artifact_dir),
    }
    (artifact_dir / "summary.json").write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n")
    print(json.dumps(summary, indent=2, sort_keys=True))
    return 0 if suspicious >= 1 else 1


def main() -> int:
    """Dispatch to self-test, remote (SSH) capture, or local capture."""
    args = parse_args()
    if args.self_test:
        return run_self_test(args)
    if args.host and args.host not in {"localhost", "127.0.0.1"}:
        if not shutil.which("ssh") or not shutil.which("scp"):
            print("ssh and scp are required for --host", file=sys.stderr)
            return 2
        return run_remote(args)
    return run_capture(args)


if __name__ == "__main__":
    raise SystemExit(main())