diff --git a/Cargo.lock b/Cargo.lock index 573cd53..509edae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.22.46" +version = "0.22.47" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.22.46" +version = "0.22.47" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.22.46" +version = "0.22.47" dependencies = [ "anyhow", "base64", diff --git a/Cargo.toml b/Cargo.toml index 1943347..11eaef5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -472,6 +472,10 @@ path = "tests/manual/server/rct/server_rct_mode_matrix_manual_contract.rs" name = "rct_uvc_artifact_probe_manual_contract" path = "tests/manual/server/rct/rct_uvc_artifact_probe_manual_contract.rs" +[[test]] +name = "synthetic_rct_uvc_probe_manual_contract" +path = "tests/manual/server/rct/synthetic_rct_uvc_probe_manual_contract.rs" + [[test]] name = "google_meet_observer_manual_contract" path = "tests/manual/google_meet/google_meet_observer_manual_contract.rs" diff --git a/client/Cargo.toml b/client/Cargo.toml index a438f9a..3bda9ad 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.22.46" +version = "0.22.47" edition = "2024" [dependencies] diff --git a/common/Cargo.toml b/common/Cargo.toml index 20fb27e..59794c0 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.22.46" +version = "0.22.47" edition = "2024" build = "build.rs" diff --git a/scripts/install/server.sh b/scripts/install/server.sh index 313bea6..829a2c1 100755 --- a/scripts/install/server.sh +++ b/scripts/install/server.sh @@ -1524,6 +1524,7 @@ run_as_user env TMPDIR="$TMPDIR" bash -c "cd '$SRC_DIR/server' && cargo clean && echo "==> 5. Install binaries" install_verified_executable "$SRC_DIR/target/release/lesavka-server" /usr/local/bin/lesavka-server "lesavka-server" install_verified_executable "$SRC_DIR/target/release/lesavka-uvc" /usr/local/bin/lesavka-uvc "lesavka-uvc" +install_verified_executable "$SRC_DIR/target/release/lesavka-synthetic-uplink" /usr/local/bin/lesavka-synthetic-uplink "lesavka-synthetic-uplink" install_verified_executable "$SRC_DIR/scripts/daemon/lesavka-core.sh" /usr/local/bin/lesavka-core.sh "lesavka-core.sh" install_verified_executable "$SRC_DIR/scripts/daemon/lesavka-uvc.sh" /usr/local/bin/lesavka-uvc.sh "lesavka-uvc.sh" install_verified_executable "$SRC_DIR/scripts/daemon/lesavka-recovery-ladder.sh" /usr/local/bin/lesavka-recovery-ladder "lesavka-recovery-ladder" diff --git a/scripts/manual/run_synthetic_rct_uvc_probe.py b/scripts/manual/run_synthetic_rct_uvc_probe.py new file mode 100755 index 0000000..daf1f62 --- /dev/null +++ b/scripts/manual/run_synthetic_rct_uvc_probe.py @@ -0,0 +1,710 @@ +#!/usr/bin/env python3 +"""Run synthetic Lesavka uplink media and compare what the RCT receives.""" + +from __future__ import annotations + +import argparse +import collections +import json +import os +import pathlib +import shlex +import shutil +import subprocess +import sys +import time +from typing import Any + +DEFAULT_DEVICE_LABEL = "Lesavka Composite" +DEFAULT_MODES = "1280x720@20,1280x720@30,1920x1080@20,1920x1080@30" +MARKER_BITS = 32 +MARKER_COLUMNS = 16 + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description=( + "Manual synthetic end-to-end probe: Theia sends sequence-coded media " + "through StreamWebcamMedia while Tethys captures the received UVC/X11 " + "frames and compares them to the generated source." + ) + ) + parser.add_argument("--inject-host", default="", help="Theia SSH host, e.g. titan-jh") + parser.add_argument("--rct-host", default="", help="RCT SSH host, e.g. tethys") + parser.add_argument("--server", default="http://127.0.0.1:50051") + parser.add_argument("--inject-binary", default="/usr/local/bin/lesavka-synthetic-uplink") + parser.add_argument("--mode", default="1280x720@30", help=f"one mode; baseline set is {DEFAULT_MODES}") + parser.add_argument("--width", type=int, default=0, help="override capture width") + parser.add_argument("--height", type=int, default=0, help="override capture height") + parser.add_argument("--fps", type=int, default=0, help="override capture fps") + parser.add_argument("--duration", type=float, default=300.0) + parser.add_argument("--source", choices=["device", "x11"], default="device") + parser.add_argument("--device", default="auto") + parser.add_argument("--device-label", default=DEFAULT_DEVICE_LABEL) + parser.add_argument("--display", default=":0") + parser.add_argument("--crop", default="", help="x,y,width,height for --source x11") + parser.add_argument("--artifact-dir", default="") + parser.add_argument("--remote-rct-dir", default="") + parser.add_argument("--remote-inject-dir", default="") + parser.add_argument("--x-step", type=int, default=8) + parser.add_argument("--y-step", type=int, default=4) + parser.add_argument("--bands", type=int, default=24) + parser.add_argument("--mae-threshold", type=float, default=18.0) + parser.add_argument("--lower-mae-threshold", type=float, default=28.0) + parser.add_argument("--lower-skew-ratio", type=float, default=1.8) + parser.add_argument("--slab-var", type=float, default=20.0) + parser.add_argument("--shift-threshold", type=float, default=16.0) + parser.add_argument("--shift-improvement", type=float, default=1.25) + parser.add_argument("--max-suspicious-artifacts", type=int, default=80) + parser.add_argument("--max-reference-artifacts", type=int, default=12) + parser.add_argument("--reference-every", type=int, default=900) + parser.add_argument("--progress-every", type=int, default=150) + parser.add_argument("--capture-only", action="store_true", help=argparse.SUPPRESS) + parser.add_argument("--self-test", action="store_true") + return parser.parse_args() + + +def timestamp() -> str: + return time.strftime("%Y%m%d-%H%M%S", time.gmtime()) + + +def parse_mode(value: str) -> tuple[int, int, int]: + try: + size, fps = value.lower().split("@", 1) + width, height = size.split("x", 1) + return int(width), int(height), int(fps) + except ValueError as exc: + raise SystemExit(f"--mode must look like WIDTHxHEIGHT@FPS, got {value!r}") from exc + + +def mode_dimensions(args: argparse.Namespace) -> tuple[int, int, int]: + width, height, fps = parse_mode(args.mode) + if args.width: + width = args.width + if args.height: + height = args.height + if args.fps: + fps = args.fps + if width <= 0 or height <= 0 or fps <= 0: + raise SystemExit("width, height, and fps must be positive") + return width, height, fps + + +def default_artifact_dir(mode: str) -> pathlib.Path: + safe_mode = mode.replace("@", "-").replace("x", "x") + return pathlib.Path("artifacts/synthetic-rct") / f"{safe_mode}-{timestamp()}" + + +def run_remote_orchestrated(args: argparse.Namespace) -> int: + if not args.inject_host or not args.rct_host: + raise SystemExit("--inject-host and --rct-host are required unless --capture-only or --self-test is used") + if not shutil.which("ssh") or not shutil.which("scp"): + raise SystemExit("ssh and scp are required for the remote synthetic probe") + width, height, fps = mode_dimensions(args) + artifact_dir = pathlib.Path(args.artifact_dir) if args.artifact_dir else default_artifact_dir(args.mode) + artifact_dir.mkdir(parents=True, exist_ok=True) + remote_rct_dir = args.remote_rct_dir or f"/tmp/lesavka-synthetic-rct-capture-{timestamp()}" + remote_inject_dir = args.remote_inject_dir or f"/tmp/lesavka-synthetic-uplink-{timestamp()}" + remote_script = f"/tmp/lesavka-synthetic-rct-probe-{os.getpid()}.py" + script_text = pathlib.Path(__file__).read_text() + subprocess.run( + ["ssh", args.rct_host, f"cat > {shlex.quote(remote_script)} && chmod +x {shlex.quote(remote_script)}"], + input=script_text, + text=True, + check=True, + ) + + capture_cmd = [ + "python3", + remote_script, + "--capture-only", + "--mode", + args.mode, + "--width", + str(width), + "--height", + str(height), + "--fps", + str(fps), + "--duration", + str(args.duration), + "--source", + args.source, + "--device", + args.device, + "--device-label", + args.device_label, + "--display", + args.display, + "--crop", + args.crop, + "--artifact-dir", + remote_rct_dir, + "--x-step", + str(args.x_step), + "--y-step", + str(args.y_step), + "--bands", + str(args.bands), + "--mae-threshold", + str(args.mae_threshold), + "--lower-mae-threshold", + str(args.lower_mae_threshold), + "--lower-skew-ratio", + str(args.lower_skew_ratio), + "--slab-var", + str(args.slab_var), + "--shift-threshold", + str(args.shift_threshold), + "--shift-improvement", + str(args.shift_improvement), + "--max-suspicious-artifacts", + str(args.max_suspicious_artifacts), + "--max-reference-artifacts", + str(args.max_reference_artifacts), + "--reference-every", + str(args.reference_every), + "--progress-every", + str(args.progress_every), + ] + inject_cmd = [ + args.inject_binary, + "--server", + args.server, + "--mode", + args.mode, + "--duration", + str(args.duration + 2.0), + "--artifact-dir", + remote_inject_dir, + "--print-every", + str(args.progress_every), + ] + (artifact_dir / "orchestrator-command.txt").write_text(" ".join(sys.argv) + "\n") + (artifact_dir / "mode.json").write_text( + json.dumps( + { + "schema": "lesavka.synthetic-rct-probe.run.v1", + "mode": args.mode, + "width": width, + "height": height, + "fps": fps, + "source": args.source, + "duration_s": args.duration, + "inject_host": args.inject_host, + "rct_host": args.rct_host, + }, + indent=2, + sort_keys=True, + ) + + "\n" + ) + print(f"starting RCT capture on {args.rct_host}: {remote_rct_dir}", file=sys.stderr) + capture = subprocess.Popen(["ssh", args.rct_host, " ".join(shlex.quote(part) for part in capture_cmd)]) + time.sleep(1.0) + print(f"starting synthetic uplink on {args.inject_host}: {remote_inject_dir}", file=sys.stderr) + inject = subprocess.Popen(["ssh", args.inject_host, " ".join(shlex.quote(part) for part in inject_cmd)]) + inject_rc = inject.wait() + capture_rc = capture.wait() + local_capture = artifact_dir / "capture" + local_inject = artifact_dir / "inject" + subprocess.run(["scp", "-r", f"{args.rct_host}:{remote_rct_dir}", str(local_capture)], check=False) + subprocess.run(["scp", "-r", f"{args.inject_host}:{remote_inject_dir}", str(local_inject)], check=False) + summary = { + "schema": "lesavka.synthetic-rct-probe.orchestrator.v1", + "mode": args.mode, + "capture_rc": capture_rc, + "inject_rc": inject_rc, + "artifact_dir": str(artifact_dir), + "capture_artifacts": str(local_capture), + "inject_artifacts": str(local_inject), + } + (artifact_dir / "run-summary.json").write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n") + print(json.dumps(summary, indent=2, sort_keys=True)) + print(f"artifact_dir: {artifact_dir}") + return 0 if capture_rc == 0 and inject_rc == 0 else 1 + + +def detect_video_device(label: str) -> str: + explicit = os.environ.get("LESAVKA_RCT_UVC_DEVICE") + if explicit: + return explicit + try: + listing = subprocess.check_output(["v4l2-ctl", "--list-devices"], text=True) + except Exception: + return "/dev/video2" + current_matches = False + for line in listing.splitlines(): + if not line.startswith(("\t", " ")): + current_matches = label.lower() in line.lower() + continue + value = line.strip() + if current_matches and value.startswith("/dev/video"): + return value + return "/dev/video2" + + +def parse_crop(args: argparse.Namespace, width: int, height: int) -> tuple[int, int, int, int]: + if not args.crop: + return 0, 0, width, height + parts = [part.strip() for part in args.crop.split(",")] + if len(parts) != 4: + raise SystemExit("--crop must be x,y,width,height") + x, y, crop_width, crop_height = [int(part) for part in parts] + if crop_width <= 0 or crop_height <= 0: + raise SystemExit("--crop width and height must be positive") + return x, y, crop_width, crop_height + + +def ffmpeg_cmd(args: argparse.Namespace, width: int, height: int) -> tuple[list[str], int, int, str]: + if args.source == "x11": + x, y, capture_width, capture_height = parse_crop(args, width, height) + display = f"{args.display}+{x},{y}" + return ( + [ + "ffmpeg", + "-hide_banner", + "-nostdin", + "-loglevel", + "warning", + "-f", + "x11grab", + "-video_size", + f"{capture_width}x{capture_height}", + "-framerate", + str(args.fps or parse_mode(args.mode)[2]), + "-i", + display, + "-an", + "-pix_fmt", + "gray", + "-f", + "rawvideo", + "-", + ], + capture_width, + capture_height, + display, + ) + device = detect_video_device(args.device_label) if args.device == "auto" else args.device + return ( + [ + "ffmpeg", + "-hide_banner", + "-nostdin", + "-loglevel", + "warning", + "-f", + "v4l2", + "-input_format", + "mjpeg", + "-video_size", + f"{width}x{height}", + "-framerate", + str(args.fps or parse_mode(args.mode)[2]), + "-i", + device, + "-an", + "-pix_fmt", + "gray", + "-f", + "rawvideo", + "-", + ], + width, + height, + device, + ) + + +def marker_cell(width: int, height: int) -> int: + return max(6, min(16, min(width, height) // 80)) + + +def fill_rect(frame: bytearray, width: int, height: int, x0: int, y0: int, w: int, h: int, value: int) -> None: + for y in range(max(0, y0), min(height, y0 + h)): + row = y * width + for x in range(max(0, x0), min(width, x0 + w)): + frame[row + x] = value + + +def synthetic_gray(width: int, height: int, sequence: int) -> bytes: + data = bytearray(width * height) + moving_period = max(width // 3, 64) + moving_width = max(width // 18, 12) + moving_offset = (sequence * 17) % moving_period + center_x = width // 2 + center_y = height // 2 + for y in range(height): + row = y * width + for x in range(width): + value = (x * 3 + y * 5 + sequence * 11) & 0xFF + if (x + moving_offset) % moving_period < moving_width: + value = min(255, value + 70) + if abs(x - center_x) < width // 9 and abs(y - center_y) < height // 12: + value = 255 - value // 2 + if y >= height // 2 and (((x // 32) + (y // 24) + sequence) & 1) == 0: + value //= 3 + data[row + x] = value + draw_marker(data, width, height, sequence) + return bytes(data) + + +def draw_marker(frame: bytearray, width: int, height: int, sequence: int) -> None: + cell = marker_cell(width, height) + rows = (MARKER_BITS + MARKER_COLUMNS - 1) // MARKER_COLUMNS + if width < (MARKER_COLUMNS + 4) * cell or height < (rows + 4) * cell: + return + x0 = 2 * cell + y0 = 2 * cell + fill_rect(frame, width, height, cell, cell, (MARKER_COLUMNS + 2) * cell, (rows + 2) * cell, 32) + fill_rect(frame, width, height, x0 - cell, y0 - cell, cell, cell, 255) + fill_rect(frame, width, height, x0 + MARKER_COLUMNS * cell, y0 - cell, cell, cell, 0) + for bit in range(MARKER_BITS): + col = bit % MARKER_COLUMNS + row = bit // MARKER_COLUMNS + value = 255 if ((sequence >> bit) & 1) else 0 + fill_rect(frame, width, height, x0 + col * cell, y0 + row * cell, cell, cell, value) + + +def cell_mean(frame: bytes, width: int, x0: int, y0: int, cell: int) -> float: + total = 0 + count = 0 + inset = max(1, cell // 4) + for y in range(y0 + inset, y0 + cell - inset): + row = y * width + for x in range(x0 + inset, x0 + cell - inset): + total += frame[row + x] + count += 1 + return total / max(1, count) + + +def decode_sequence(frame: bytes, width: int, height: int) -> tuple[int | None, int]: + cell = marker_cell(width, height) + rows = (MARKER_BITS + MARKER_COLUMNS - 1) // MARKER_COLUMNS + if width < (MARKER_COLUMNS + 4) * cell or height < (rows + 4) * cell: + return None, MARKER_BITS + x0 = 2 * cell + y0 = 2 * cell + value = 0 + uncertain = 0 + for bit in range(MARKER_BITS): + col = bit % MARKER_COLUMNS + row = bit // MARKER_COLUMNS + mean = cell_mean(frame, width, x0 + col * cell, y0 + row * cell, cell) + if mean > 165: + value |= 1 << bit + elif mean >= 90: + uncertain += 1 + if uncertain > 6: + return None, uncertain + return value, uncertain + + +def sampled_abs_delta(a: bytes, b: bytes, width: int, y0: int, y1: int, x_step: int, y_step: int) -> float: + total = 0 + count = 0 + for y in range(y0, y1, y_step): + row = y * width + for x in range(0, width, x_step): + total += abs(a[row + x] - b[row + x]) + count += 1 + return total / max(1, count) + + +def band_stats(frame: bytes, width: int, y0: int, y1: int, x_step: int, y_step: int) -> tuple[float, float]: + total = 0 + total2 = 0 + count = 0 + for y in range(y0, y1, y_step): + row = y * width + for x in range(0, width, x_step): + value = frame[row + x] + total += value + total2 += value * value + count += 1 + mean = total / max(1, count) + return mean, max(0.0, total2 / max(1, count) - mean * mean) + + +def shifted_expected_delta(frame: bytes, expected: bytes, width: int, height: int, shift: int, args: argparse.Namespace) -> float: + x0 = max(0, -shift) + x1 = min(width, width - shift) + if x0 >= x1: + return 0.0 + y0 = height // 4 + total = 0 + count = 0 + for y in range(y0, height, args.y_step): + row = y * width + for x in range(x0, x1, args.x_step): + total += abs(frame[row + x] - expected[row + x + shift]) + count += 1 + return total / max(1, count) + + +def best_expected_shift(frame: bytes, expected: bytes, width: int, height: int, args: argparse.Namespace) -> tuple[int, float, float, float]: + zero = shifted_expected_delta(frame, expected, width, height, 0, args) + best = zero + best_shift = 0 + for shift in [-128, -96, -80, -64, -48, -32, -24, -16, -12, -8, 8, 12, 16, 24, 32, 48, 64, 80, 96, 128]: + candidate = shifted_expected_delta(frame, expected, width, height, shift, args) + if candidate < best: + best = candidate + best_shift = shift + improvement = zero / max(best, 0.001) if best_shift else 1.0 + return best_shift, zero, best, improvement + + +def max_run(flags: list[bool]) -> int: + best = 0 + current = 0 + for flag in flags: + current = current + 1 if flag else 0 + best = max(best, current) + return best + + +def analyze_frame( + frame: bytes, + width: int, + height: int, + args: argparse.Namespace, + previous_seq: int | None, +) -> dict[str, Any]: + sequence, uncertain_bits = decode_sequence(frame, width, height) + expected = synthetic_gray(width, height, sequence or 0) if sequence is not None else None + upper_mae = lower_mae = total_mae = 0.0 + shift_pixels = 0 + shift_zero_delta = shift_best_delta = shift_improvement = 0.0 + if expected is not None: + upper_mae = sampled_abs_delta(frame, expected, width, 0, height // 2, args.x_step, args.y_step) + lower_mae = sampled_abs_delta(frame, expected, width, height // 2, height, args.x_step, args.y_step) + total_mae = sampled_abs_delta(frame, expected, width, 0, height, args.x_step, args.y_step) + shift_pixels, shift_zero_delta, shift_best_delta, shift_improvement = best_expected_shift(frame, expected, width, height, args) + + band_count = max(8, args.bands) + band_h = max(1, height // band_count) + means: list[float] = [] + variances: list[float] = [] + for band in range(band_count): + y0 = band * band_h + y1 = height if band == band_count - 1 else min(height, y0 + band_h) + mean, variance = band_stats(frame, width, y0, y1, args.x_step, args.y_step) + means.append(mean) + variances.append(variance) + lower = band_count // 2 + lower_flags = [var < args.slab_var for var in variances[lower:]] + low_var_run = max_run(lower_flags) / max(1, len(lower_flags)) + mean_jumps = [abs(means[idx] - means[idx - 1]) for idx in range(1, band_count)] + max_lower_jump = max(mean_jumps[lower:] or [0.0]) + + reasons: list[str] = [] + if sequence is None: + reasons.append("marker_decode_failed") + elif previous_seq is not None: + if sequence == previous_seq: + reasons.append("frame_repeat") + elif sequence > previous_seq + 1: + reasons.append("frame_gap") + elif sequence < previous_seq: + reasons.append("frame_backwards") + if expected is not None: + if lower_mae > args.lower_mae_threshold and lower_mae > max(upper_mae * args.lower_skew_ratio, args.lower_mae_threshold): + reasons.append("lower_half_tear") + if total_mae > args.mae_threshold and lower_mae <= max(upper_mae * args.lower_skew_ratio, args.lower_mae_threshold): + reasons.append("high_mae") + if low_var_run >= 0.25 and lower_mae > args.lower_mae_threshold: + reasons.append("black_or_gray_slab") + if shift_pixels and shift_zero_delta > args.shift_threshold and shift_improvement > args.shift_improvement: + reasons.append("horizontal_shift") + return { + "suspicious": bool(reasons), + "reasons": reasons, + "decoded_sequence": sequence, + "marker_uncertain_bits": uncertain_bits, + "upper_mae": round(upper_mae, 3), + "lower_mae": round(lower_mae, 3), + "total_mae": round(total_mae, 3), + "lower_low_variance_run_pct": round(low_var_run, 3), + "max_lower_jump": round(max_lower_jump, 3), + "shift_pixels": shift_pixels, + "shift_zero_delta": round(shift_zero_delta, 3), + "shift_best_delta": round(shift_best_delta, 3), + "shift_improvement": round(shift_improvement, 3), + } + + +def write_pgm(path: pathlib.Path, frame: bytes, width: int, height: int) -> None: + path.write_bytes(f"P5\n{width} {height}\n255\n".encode() + frame) + + +def run_capture(args: argparse.Namespace) -> int: + width, height, fps = mode_dimensions(args) + command, capture_width, capture_height, device = ffmpeg_cmd(args, width, height) + artifact_dir = pathlib.Path(args.artifact_dir) if args.artifact_dir else pathlib.Path("/tmp") / f"lesavka-synthetic-rct-capture-{timestamp()}" + artifact_dir.mkdir(parents=True, exist_ok=True) + frame_size = capture_width * capture_height + (artifact_dir / "command.txt").write_text(" ".join(shlex.quote(part) for part in command) + "\n") + stderr_path = artifact_dir / "ffmpeg.stderr" + metrics_path = artifact_dir / "frame-metrics.jsonl" + started = time.monotonic() + frame_index = 0 + suspicious_count = 0 + reference_artifacts = 0 + suspicious_artifacts = 0 + previous_seq: int | None = None + decoded_frames = 0 + reason_counts: collections.Counter[str] = collections.Counter() + max_total_mae = max_upper_mae = max_lower_mae = 0.0 + worst: list[dict[str, Any]] = [] + with stderr_path.open("wb") as err, metrics_path.open("w") as metrics: + proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=err) + assert proc.stdout is not None + try: + while time.monotonic() - started < args.duration: + frame = proc.stdout.read(frame_size) + if len(frame) != frame_size: + break + frame_index += 1 + result = analyze_frame(frame, capture_width, capture_height, args, previous_seq) + decoded_seq = result["decoded_sequence"] + if decoded_seq is not None: + decoded_frames += 1 + previous_seq = int(decoded_seq) + result.update({"frame": frame_index, "elapsed_s": round(time.monotonic() - started, 3)}) + max_total_mae = max(max_total_mae, float(result["total_mae"])) + max_upper_mae = max(max_upper_mae, float(result["upper_mae"])) + max_lower_mae = max(max_lower_mae, float(result["lower_mae"])) + if result["suspicious"]: + suspicious_count += 1 + reason_counts.update(result["reasons"]) + worst.append(result) + worst = sorted(worst, key=lambda item: (item["lower_mae"], item["total_mae"]), reverse=True)[:30] + if suspicious_artifacts < args.max_suspicious_artifacts: + seq_label = "unknown" if decoded_seq is None else f"seq{decoded_seq:08d}" + write_pgm(artifact_dir / f"suspicious_{frame_index:06d}_{seq_label}.pgm", frame, capture_width, capture_height) + if decoded_seq is not None: + write_pgm( + artifact_dir / f"expected_{frame_index:06d}_{seq_label}.pgm", + synthetic_gray(capture_width, capture_height, int(decoded_seq)), + capture_width, + capture_height, + ) + suspicious_artifacts += 1 + should_reference = frame_index == 1 or (args.reference_every > 0 and frame_index % args.reference_every == 0) + if should_reference and reference_artifacts < args.max_reference_artifacts: + write_pgm(artifact_dir / f"reference_{frame_index:06d}.pgm", frame, capture_width, capture_height) + reference_artifacts += 1 + metrics.write(json.dumps(result, sort_keys=True) + "\n") + if frame_index % args.progress_every == 0: + print(f"frames={frame_index} suspicious={suspicious_count} latest={result}", file=sys.stderr) + finally: + proc.terminate() + try: + proc.wait(timeout=3) + except subprocess.TimeoutExpired: + proc.kill() + elapsed = max(0.001, time.monotonic() - started) + summary = { + "schema": "lesavka.synthetic-rct-capture.v1", + "source": args.source, + "device": device, + "mode": args.mode, + "width": capture_width, + "height": capture_height, + "fps_requested": fps, + "duration_requested_s": args.duration, + "duration_observed_s": round(elapsed, 3), + "frames": frame_index, + "fps_observed": round(frame_index / elapsed, 3), + "decoded_frames": decoded_frames, + "decoded_pct": round(decoded_frames / frame_index * 100.0, 3) if frame_index else 0.0, + "suspicious_frames": suspicious_count, + "suspicious_pct": round(suspicious_count / frame_index * 100.0, 3) if frame_index else 0.0, + "reason_counts": dict(reason_counts), + "max_total_mae": round(max_total_mae, 3), + "max_upper_mae": round(max_upper_mae, 3), + "max_lower_mae": round(max_lower_mae, 3), + "worst_frames": worst, + "reference_artifacts": reference_artifacts, + "suspicious_artifacts": suspicious_artifacts, + "artifact_dir": str(artifact_dir), + "ffmpeg_stderr": str(stderr_path), + } + (artifact_dir / "summary.json").write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n") + (artifact_dir / "summary.txt").write_text(format_summary(summary)) + print(format_summary(summary), end="") + print(f"artifact_dir: {artifact_dir}") + return 0 if frame_index > 0 else 2 + + +def format_summary(summary: dict[str, Any]) -> str: + return "\n".join( + [ + "Lesavka synthetic RCT UVC comparison probe", + f"source: {summary['source']}", + f"device: {summary['device']}", + f"mode: {summary['mode']} capture={summary['width']}x{summary['height']}@{summary['fps_requested']}", + f"frames: {summary['frames']} ({summary['fps_observed']} fps observed)", + f"decoded markers: {summary['decoded_frames']} ({summary['decoded_pct']}%)", + f"suspicious: {summary['suspicious_frames']} ({summary['suspicious_pct']}%)", + f"reasons: {summary['reason_counts']}", + f"max mae: total={summary['max_total_mae']} upper={summary['max_upper_mae']} lower={summary['max_lower_mae']}", + f"artifacts: {summary['artifact_dir']}", + "", + ] + ) + + +def run_self_test(args: argparse.Namespace) -> int: + width = 320 + height = 180 + frames = [synthetic_gray(width, height, idx) for idx in range(6)] + corrupt = bytearray(synthetic_gray(width, height, 6)) + fill_rect(corrupt, width, height, 0, height // 2, width, height // 4, 128) + frames.append(bytes(corrupt)) + shifted = bytearray(width * height) + expected = synthetic_gray(width, height, 7) + for y in range(height): + row = y * width + for x in range(width): + src = min(width - 1, x + 24) + shifted[row + x] = expected[row + src] + frames.append(bytes(shifted)) + previous_seq: int | None = None + records: list[dict[str, Any]] = [] + suspicious = 0 + for idx, frame in enumerate(frames): + result = analyze_frame(frame, width, height, args, previous_seq) + if result["decoded_sequence"] is not None: + previous_seq = int(result["decoded_sequence"]) + result["frame"] = idx + records.append(result) + suspicious += int(bool(result["suspicious"])) + artifact_dir = pathlib.Path(args.artifact_dir) if args.artifact_dir else pathlib.Path("/tmp") / f"lesavka-synthetic-rct-self-test-{timestamp()}" + artifact_dir.mkdir(parents=True, exist_ok=True) + write_pgm(artifact_dir / "reference_000001.pgm", frames[0], width, height) + summary = { + "schema": "lesavka.synthetic-rct-probe.self-test.v1", + "frames": len(frames), + "suspicious_frames": suspicious, + "records": records, + "artifact_dir": str(artifact_dir), + } + (artifact_dir / "summary.json").write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n") + print(json.dumps(summary, indent=2, sort_keys=True)) + return 0 if suspicious >= 2 else 1 + + +def main() -> int: + args = parse_args() + if args.self_test: + return run_self_test(args) + if args.capture_only: + return run_capture(args) + return run_remote_orchestrated(args) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/server/Cargo.toml b/server/Cargo.toml index 236a466..9a7de8b 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -8,9 +8,15 @@ path = "src/bin/lesavka-uvc.rs" test = false bench = false +[[bin]] +name = "lesavka-synthetic-uplink" +path = "src/bin/lesavka-synthetic-uplink.rs" +test = false +bench = false + [package] name = "lesavka_server" -version = "0.22.46" +version = "0.22.47" edition = "2024" autobins = false diff --git a/server/src/bin/lesavka-synthetic-uplink.rs b/server/src/bin/lesavka-synthetic-uplink.rs new file mode 100755 index 0000000..1dfce4b --- /dev/null +++ b/server/src/bin/lesavka-synthetic-uplink.rs @@ -0,0 +1,501 @@ +#![forbid(unsafe_code)] + +use std::{path::PathBuf, time::Duration}; + +use anyhow::{Context, Result, bail}; +use gstreamer as gst; +use gstreamer::prelude::*; +use gstreamer_app as gst_app; +use lesavka_common::lesavka::{ + AudioEncoding, AudioPacket, UpstreamMediaBundle, VideoPacket, relay_client::RelayClient, +}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tonic::Request; + +const DEFAULT_SERVER: &str = "http://127.0.0.1:50051"; +const DEFAULT_SAMPLE_RATE: u32 = 48_000; +const DEFAULT_CHANNELS: u32 = 2; +const DEFAULT_JPEG_QUALITY: i32 = 90; +const MARKER_BITS: usize = 32; +const MARKER_COLUMNS: usize = 16; + +#[derive(Clone, Debug)] +struct Args { + server: String, + width: usize, + height: usize, + fps: u32, + duration: Duration, + jpeg_quality: i32, + session_id: u64, + artifact_dir: Option, + print_every: u64, +} + +impl Args { + fn parse() -> Result { + let mut args = Self { + server: DEFAULT_SERVER.to_string(), + width: 1280, + height: 720, + fps: 30, + duration: Duration::from_secs(300), + jpeg_quality: DEFAULT_JPEG_QUALITY, + session_id: unix_millis(), + artifact_dir: None, + print_every: 150, + }; + let mut it = std::env::args().skip(1); + while let Some(flag) = it.next() { + match flag.as_str() { + "--server" => args.server = next_value(&mut it, &flag)?, + "--width" => args.width = parse_next(&mut it, &flag)?, + "--height" => args.height = parse_next(&mut it, &flag)?, + "--fps" => args.fps = parse_next(&mut it, &flag)?, + "--duration" => { + let seconds: f64 = parse_next(&mut it, &flag)?; + args.duration = Duration::from_secs_f64(seconds.max(0.0)); + } + "--jpeg-quality" => args.jpeg_quality = parse_next(&mut it, &flag)?, + "--session-id" => args.session_id = parse_next(&mut it, &flag)?, + "--artifact-dir" => { + args.artifact_dir = Some(PathBuf::from(next_value(&mut it, &flag)?)) + } + "--print-every" => args.print_every = parse_next(&mut it, &flag)?, + "--mode" => { + let value = next_value(&mut it, &flag)?; + let (width, height, fps) = parse_mode(&value)?; + args.width = width; + args.height = height; + args.fps = fps; + } + "--help" | "-h" => { + print_help(); + std::process::exit(0); + } + other => bail!("unknown argument {other:?}; pass --help for usage"), + } + } + if args.width == 0 || args.height == 0 || args.fps == 0 { + bail!("width, height, and fps must be positive"); + } + args.jpeg_quality = args.jpeg_quality.clamp(1, 100); + Ok(args) + } + + fn frame_step_us(&self) -> u64 { + (1_000_000_u64 / u64::from(self.fps)).max(1) + } + + fn total_frames(&self) -> u64 { + let frames = self.duration.as_secs_f64() * f64::from(self.fps); + frames.ceil().max(1.0) as u64 + } +} + +struct MjpegEncoder { + src: gst_app::AppSrc, + sink: gst_app::AppSink, + pipeline: gst::Pipeline, + width: usize, + height: usize, + frame_step_us: u64, +} + +impl MjpegEncoder { + fn new(args: &Args) -> Result { + gst::init().context("gst init")?; + let width = args.width as i32; + let height = args.height as i32; + let fps = args.fps as i32; + let raw_caps = gst::Caps::builder("video/x-raw") + .field("format", "RGB") + .field("width", width) + .field("height", height) + .field("framerate", gst::Fraction::new(fps, 1)) + .build(); + let jpeg_caps = gst::Caps::builder("image/jpeg") + .field("parsed", true) + .field("width", width) + .field("height", height) + .field("framerate", gst::Fraction::new(fps, 1)) + .build(); + let pipeline = gst::Pipeline::new(); + let src = gst::ElementFactory::make("appsrc") + .name("lesavka_synthetic_uplink_src") + .build()? + .downcast::() + .expect("appsrc"); + src.set_is_live(false); + src.set_format(gst::Format::Time); + src.set_property("do-timestamp", false); + src.set_caps(Some(&raw_caps)); + let convert = gst::ElementFactory::make("videoconvert").build()?; + let encoder = gst::ElementFactory::make("jpegenc") + .property("quality", args.jpeg_quality) + .build()?; + let capsfilter = gst::ElementFactory::make("capsfilter") + .property("caps", &jpeg_caps) + .build()?; + let sink = gst::ElementFactory::make("appsink") + .name("lesavka_synthetic_uplink_sink") + .property("sync", false) + .property("emit-signals", false) + .property("max-buffers", 8u32) + .build()? + .downcast::() + .expect("appsink"); + pipeline.add_many([ + src.upcast_ref(), + &convert, + &encoder, + &capsfilter, + sink.upcast_ref(), + ])?; + gst::Element::link_many([ + src.upcast_ref(), + &convert, + &encoder, + &capsfilter, + sink.upcast_ref(), + ])?; + pipeline + .set_state(gst::State::Playing) + .context("starting synthetic MJPEG encoder")?; + Ok(Self { + src, + sink, + pipeline, + width: args.width, + height: args.height, + frame_step_us: args.frame_step_us(), + }) + } + + fn encode(&mut self, sequence: u64) -> Result> { + let pts_us = sequence.saturating_mul(self.frame_step_us); + let mut buffer = + gst::Buffer::from_slice(synthetic_rgb_frame(self.width, self.height, sequence)); + if let Some(meta) = buffer.get_mut() { + let pts = gst::ClockTime::from_useconds(pts_us); + meta.set_pts(Some(pts)); + meta.set_dts(Some(pts)); + meta.set_duration(Some(gst::ClockTime::from_useconds(self.frame_step_us))); + } + self.src + .push_buffer(buffer) + .context("encoding synthetic frame")?; + let sample = self + .sink + .pull_sample() + .context("pulling encoded synthetic frame")?; + let buffer = sample + .buffer() + .context("encoded synthetic frame had no buffer")?; + let map = buffer + .map_readable() + .context("mapping encoded synthetic frame")?; + Ok(map.as_slice().to_vec()) + } +} + +impl Drop for MjpegEncoder { + fn drop(&mut self) { + let _ = self.src.end_of_stream(); + let _ = self.pipeline.set_state(gst::State::Null); + } +} + +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<()> { + let args = Args::parse()?; + if let Some(dir) = &args.artifact_dir { + std::fs::create_dir_all(dir).with_context(|| format!("creating {}", dir.display()))?; + std::fs::write( + dir.join("command.txt"), + std::env::args().collect::>().join(" ") + "\n", + )?; + std::fs::write(dir.join("summary.json"), args_summary_json(&args) + "\n")?; + } + + let channel = tonic::transport::Channel::from_shared(args.server.clone())? + .tcp_nodelay(true) + .connect() + .await + .with_context(|| format!("connecting to {}", args.server))?; + let mut client = RelayClient::new(channel); + let (tx, rx) = mpsc::channel::(8); + let response_task = tokio::spawn(async move { + let response = client + .stream_webcam_media(Request::new(ReceiverStream::new(rx))) + .await + .context("opening StreamWebcamMedia")?; + let mut inbound = response.into_inner(); + while inbound + .message() + .await + .context("reading StreamWebcamMedia response")? + .is_some() + {} + Ok::<(), anyhow::Error>(()) + }); + + let mut encoder = MjpegEncoder::new(&args)?; + let frame_step = Duration::from_micros(args.frame_step_us()); + let started = tokio::time::Instant::now() + Duration::from_millis(250); + let total_frames = args.total_frames(); + eprintln!( + "lesavka synthetic uplink: mode={}x{}@{} frames={} server={} session={}", + args.width, args.height, args.fps, total_frames, args.server, args.session_id + ); + for sequence in 0..total_frames { + tokio::time::sleep_until(started + duration_mul(frame_step, sequence)).await; + let pts_us = sequence.saturating_mul(args.frame_step_us()); + let encoded = encoder.encode(sequence)?; + let bundle = synthetic_bundle(&args, sequence, pts_us, encoded); + tx.send(bundle).await.context("sending synthetic bundle")?; + if args.print_every > 0 && sequence > 0 && sequence % args.print_every == 0 { + eprintln!("sent synthetic frame {sequence}/{total_frames}"); + } + } + drop(tx); + response_task + .await + .context("joining StreamWebcamMedia task")??; + eprintln!("lesavka synthetic uplink complete: frames={total_frames}"); + Ok(()) +} + +fn synthetic_bundle(args: &Args, sequence: u64, pts_us: u64, data: Vec) -> UpstreamMediaBundle { + let video = VideoPacket { + id: 0, + pts: pts_us, + data, + seq: sequence, + effective_fps: args.fps, + client_capture_pts_us: pts_us, + client_send_pts_us: pts_us, + ..Default::default() + }; + let audio = AudioPacket { + id: 0, + pts: pts_us, + data: silence_pcm(args.frame_step_us()), + seq: sequence, + client_capture_pts_us: pts_us, + client_send_pts_us: pts_us, + encoding: AudioEncoding::PcmS16le as i32, + sample_rate: DEFAULT_SAMPLE_RATE, + channels: DEFAULT_CHANNELS, + frame_duration_us: args.frame_step_us().min(u64::from(u32::MAX)) as u32, + ..Default::default() + }; + UpstreamMediaBundle { + session_id: args.session_id, + seq: sequence, + capture_start_us: pts_us, + capture_end_us: pts_us, + video: Some(video), + audio: vec![audio], + audio_sample_rate: DEFAULT_SAMPLE_RATE, + audio_channels: DEFAULT_CHANNELS, + video_width: args.width as u32, + video_height: args.height as u32, + video_fps: args.fps, + audio_encoding: AudioEncoding::PcmS16le as i32, + } +} + +fn silence_pcm(duration_us: u64) -> Vec { + let samples = (u64::from(DEFAULT_SAMPLE_RATE).saturating_mul(duration_us) / 1_000_000).max(1); + let bytes = samples + .saturating_mul(u64::from(DEFAULT_CHANNELS)) + .saturating_mul(std::mem::size_of::() as u64) + .min(usize::MAX as u64) as usize; + vec![0; bytes] +} + +fn synthetic_rgb_frame(width: usize, height: usize, sequence: u64) -> Vec { + let mut frame = vec![0u8; width.saturating_mul(height).saturating_mul(3)]; + let moving_period = (width / 3).max(64); + let moving_width = (width / 18).max(12); + let moving_offset = (sequence as usize).wrapping_mul(17) % moving_period; + for y in 0..height { + for x in 0..width { + let mut value = synthetic_luma( + width, + height, + x, + y, + sequence, + moving_period, + moving_width, + moving_offset, + ); + if y >= height / 2 && (((x / 32) + (y / 24) + sequence as usize) & 1) == 0 { + value /= 3; + } + let offset = (y * width + x) * 3; + frame[offset] = value; + frame[offset + 1] = value; + frame[offset + 2] = value; + } + } + draw_sequence_marker(&mut frame, width, height, sequence); + frame +} + +fn synthetic_luma( + width: usize, + height: usize, + x: usize, + y: usize, + sequence: u64, + moving_period: usize, + moving_width: usize, + moving_offset: usize, +) -> u8 { + let mut value = ((x as u64 * 3 + y as u64 * 5 + sequence.saturating_mul(11)) & 0xff) as u8; + let moving = (x + moving_offset) % moving_period; + if moving < moving_width { + value = value.saturating_add(70); + } + let center_x = width / 2; + let center_y = height / 2; + if x.abs_diff(center_x) < width / 9 && y.abs_diff(center_y) < height / 12 { + value = 255u8.saturating_sub(value / 2); + } + value +} + +fn marker_cell(width: usize, height: usize) -> usize { + (width.min(height) / 80).clamp(6, 16) +} + +fn draw_sequence_marker(frame: &mut [u8], width: usize, height: usize, sequence: u64) { + let cell = marker_cell(width, height); + let rows = MARKER_BITS.div_ceil(MARKER_COLUMNS); + if width < (MARKER_COLUMNS + 4) * cell || height < (rows + 4) * cell { + return; + } + let x0 = 2 * cell; + let y0 = 2 * cell; + fill_rect( + frame, + width, + cell, + cell, + (MARKER_COLUMNS + 2) * cell, + (rows + 2) * cell, + 32, + ); + fill_rect(frame, width, x0 - cell, y0 - cell, cell, cell, 255); + fill_rect( + frame, + width, + x0 + MARKER_COLUMNS * cell, + y0 - cell, + cell, + cell, + 0, + ); + for bit in 0..MARKER_BITS { + let col = bit % MARKER_COLUMNS; + let row = bit / MARKER_COLUMNS; + let value = if ((sequence >> bit) & 1) != 0 { 255 } else { 0 }; + fill_rect( + frame, + width, + x0 + col * cell, + y0 + row * cell, + cell, + cell, + value, + ); + } +} + +fn fill_rect(frame: &mut [u8], width: usize, x0: usize, y0: usize, w: usize, h: usize, value: u8) { + let pixels = frame.len() / 3; + let height = pixels / width.max(1); + let x1 = (x0 + w).min(width); + let y1 = (y0 + h).min(height); + for y in y0..y1 { + for x in x0..x1 { + let offset = (y * width + x) * 3; + if let Some(pixel) = frame.get_mut(offset..offset + 3) { + pixel[0] = value; + pixel[1] = value; + pixel[2] = value; + } + } + } +} + +fn parse_mode(value: &str) -> Result<(usize, usize, u32)> { + let (size, fps) = value + .split_once('@') + .with_context(|| format!("mode must look like WIDTHxHEIGHT@FPS, got {value:?}"))?; + let (width, height) = size + .split_once('x') + .with_context(|| format!("mode must look like WIDTHxHEIGHT@FPS, got {value:?}"))?; + Ok((width.parse()?, height.parse()?, fps.parse()?)) +} + +fn next_value(it: &mut impl Iterator, flag: &str) -> Result { + it.next() + .with_context(|| format!("{flag} requires a value")) +} + +fn parse_next(it: &mut impl Iterator, flag: &str) -> Result +where + T: std::str::FromStr, + T::Err: std::error::Error + Send + Sync + 'static, +{ + Ok(next_value(it, flag)?.parse()?) +} + +fn duration_mul(duration: Duration, count: u64) -> Duration { + Duration::from_nanos( + duration + .as_nanos() + .saturating_mul(u128::from(count)) + .min(u128::from(u64::MAX)) as u64, + ) +} + +fn unix_millis() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() + .min(u128::from(u64::MAX)) as u64 +} + +fn args_summary_json(args: &Args) -> String { + format!( + "{{\"schema\":\"lesavka.synthetic-uplink.v1\",\"server\":{server:?},\"width\":{width},\"height\":{height},\"fps\":{fps},\"duration_s\":{duration:.3},\"session_id\":{session}}}", + server = args.server, + width = args.width, + height = args.height, + fps = args.fps, + duration = args.duration.as_secs_f64(), + session = args.session_id, + ) +} + +fn print_help() { + println!( + "lesavka-synthetic-uplink\n\n\ + Sends sequence-coded synthetic MJPEG plus silent PCM through StreamWebcamMedia.\n\n\ + Options:\n\ + --server URL gRPC endpoint, default {DEFAULT_SERVER}\n\ + --mode WIDTHxHEIGHT@FPS shorthand for width/height/fps\n\ + --width N --height N --fps N\n\ + --duration SECONDS default 300\n\ + --jpeg-quality N default {DEFAULT_JPEG_QUALITY}\n\ + --artifact-dir PATH write command/summary metadata\n\ + --print-every N progress interval in frames" + ); +} diff --git a/tests/manual/server/rct/synthetic_rct_uvc_probe_manual_contract.rs b/tests/manual/server/rct/synthetic_rct_uvc_probe_manual_contract.rs new file mode 100644 index 0000000..d21cf5c --- /dev/null +++ b/tests/manual/server/rct/synthetic_rct_uvc_probe_manual_contract.rs @@ -0,0 +1,136 @@ +// Contract coverage for the synthetic Theia-to-RCT UVC comparison probe. +// +// Scope: inspect the StreamWebcamMedia synthetic injector and the manual RCT +// capture/comparison harness. Why: the remaining UVC corruption is rare and +// stateful, so we need repeatable source-to-received frame evidence. + +use std::{fs, path::PathBuf, process::Command}; + +use serde_json::Value; + +const PROBE_SRC: &str = include_str!(concat!( + env!("CARGO_MANIFEST_DIR"), + "/scripts/manual/run_synthetic_rct_uvc_probe.py" +)); +const INJECTOR_SRC: &str = include_str!(concat!( + env!("CARGO_MANIFEST_DIR"), + "/server/src/bin/lesavka-synthetic-uplink.rs" +)); +const SERVER_CARGO: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/server/Cargo.toml")); +const SERVER_INSTALL: &str = include_str!(concat!( + env!("CARGO_MANIFEST_DIR"), + "/scripts/install/server.sh" +)); + +fn repo_script_path() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("scripts/manual/run_synthetic_rct_uvc_probe.py") +} + +#[test] +fn synthetic_probe_keeps_bundled_network_ingress_and_rct_comparison_markers() { + for expected in [ + "lesavka.synthetic-rct-capture.v1", + "lesavka.synthetic-rct-probe.orchestrator.v1", + "StreamWebcamMedia", + "lesavka-synthetic-uplink", + "--inject-host", + "--rct-host", + "--capture-only", + "--source", + "--mode", + "1280x720@20,1280x720@30,1920x1080@20,1920x1080@30", + "marker_decode_failed", + "frame_repeat", + "frame_gap", + "frame_backwards", + "lower_half_tear", + "black_or_gray_slab", + "horizontal_shift", + "expected_", + "suspicious_", + "decoded_pct", + "max_lower_mae", + "ffmpeg", + "v4l2", + "x11grab", + ] { + assert!( + PROBE_SRC.contains(expected), + "synthetic RCT probe should preserve marker {expected}" + ); + } +} + +#[test] +fn synthetic_injector_enters_the_public_bundled_media_rpc() { + for expected in [ + "name = \"lesavka-synthetic-uplink\"", + "src/bin/lesavka-synthetic-uplink.rs", + "stream_webcam_media(Request::new(ReceiverStream::new(rx)))", + "UpstreamMediaBundle", + "AudioEncoding::PcmS16le", + "silence_pcm(args.frame_step_us())", + "synthetic_rgb_frame", + "draw_sequence_marker", + "image/jpeg", + "jpegenc", + "client_capture_pts_us: pts_us", + "client_send_pts_us: pts_us", + ] { + assert!( + SERVER_CARGO.contains(expected) || INJECTOR_SRC.contains(expected), + "synthetic injector should preserve marker {expected}" + ); + } + assert!( + SERVER_INSTALL.contains( + "install_verified_executable \"$SRC_DIR/target/release/lesavka-synthetic-uplink\" /usr/local/bin/lesavka-synthetic-uplink" + ), + "server installer should install the synthetic uplink binary for Theia lab runs" + ); +} + +#[test] +fn synthetic_probe_self_test_detects_slab_and_shift_categories() { + let dir = tempfile::tempdir().expect("tempdir"); + let output = Command::new("python3") + .arg(repo_script_path()) + .arg("--self-test") + .arg("--artifact-dir") + .arg(dir.path()) + .output() + .expect("run synthetic RCT probe self-test"); + + assert!( + output.status.success(), + "synthetic probe self-test should succeed: stdout={} stderr={}", + String::from_utf8_lossy(&output.stdout), + String::from_utf8_lossy(&output.stderr) + ); + + let summary: Value = serde_json::from_str( + &fs::read_to_string(dir.path().join("summary.json")).expect("summary json"), + ) + .expect("parse summary json"); + assert_eq!( + summary["schema"], + "lesavka.synthetic-rct-probe.self-test.v1" + ); + assert!( + summary["suspicious_frames"].as_u64().unwrap_or_default() >= 2, + "self-test should detect at least the slab and shifted synthetic frames: {summary}" + ); + let records = summary["records"].as_array().expect("records array"); + assert!( + records.iter().any(|record| record["reasons"] + .as_array() + .is_some_and(|reasons| reasons.iter().any(|reason| reason == "black_or_gray_slab"))), + "self-test should include a slab category: {summary}" + ); + assert!( + records.iter().any(|record| record["reasons"] + .as_array() + .is_some_and(|reasons| reasons.iter().any(|reason| reason == "horizontal_shift"))), + "self-test should include a horizontal shift category: {summary}" + ); +}