From 955cfb3b7bb15a485ad47d4b86d17c3e06a8fcf9 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Thu, 14 May 2026 19:05:32 -0300 Subject: [PATCH] media: pace uvc output and add rct artifact probe --- Cargo.lock | 6 +- Cargo.toml | 4 + client/Cargo.toml | 2 +- common/Cargo.toml | 2 +- scripts/install/server.sh | 1 + scripts/manual/run_rct_uvc_artifact_probe.py | 474 ++++++++++++++++++ server/Cargo.toml | 2 +- server/src/bin/lesavka-uvc.real.inc | 48 +- server/src/bin/lesavka_uvc/coverage_model.rs | 6 + .../src/bin/lesavka_uvc/coverage_startup.rs | 20 +- .../server/uvc/server_uvc_binary_contract.rs | 4 + .../uvc/server_uvc_binary_extra_contract.rs | 15 +- .../rct_uvc_artifact_probe_manual_contract.rs | 91 ++++ 13 files changed, 659 insertions(+), 16 deletions(-) create mode 100755 scripts/manual/run_rct_uvc_artifact_probe.py create mode 100644 tests/manual/server/rct/rct_uvc_artifact_probe_manual_contract.rs diff --git a/Cargo.lock b/Cargo.lock index 995009a..24321e5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.22.32" +version = "0.22.33" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.22.32" +version = "0.22.33" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.22.32" +version = "0.22.33" dependencies = [ "anyhow", "base64", diff --git a/Cargo.toml b/Cargo.toml index f272243..1943347 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -468,6 +468,10 @@ path = "tests/manual/client/sync_probe/client_server_rc_matrix_script_contract.r name = "server_rct_mode_matrix_manual_contract" path = "tests/manual/server/rct/server_rct_mode_matrix_manual_contract.rs" +[[test]] +name = "rct_uvc_artifact_probe_manual_contract" +path = "tests/manual/server/rct/rct_uvc_artifact_probe_manual_contract.rs" + [[test]] name = "google_meet_observer_manual_contract" path = "tests/manual/google_meet/google_meet_observer_manual_contract.rs" diff --git a/client/Cargo.toml b/client/Cargo.toml index f5d8038..20a518a 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.22.32" +version = "0.22.33" edition = "2024" [dependencies] diff --git a/common/Cargo.toml b/common/Cargo.toml index ca34060..97786d9 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.22.32" +version = "0.22.33" edition = "2024" build = "build.rs" diff --git a/scripts/install/server.sh b/scripts/install/server.sh index a64a130..870724a 100755 --- a/scripts/install/server.sh +++ b/scripts/install/server.sh @@ -355,6 +355,7 @@ LESAVKA_UVC_HEIGHT=$(uvc_env_value LESAVKA_UVC_HEIGHT 720) LESAVKA_UVC_CODEC=${INSTALL_UVC_CODEC} LESAVKA_UVC_BLOCKING=$(uvc_env_value LESAVKA_UVC_BLOCKING 1) LESAVKA_UVC_CONTROL_READ_ONLY=$(uvc_env_value LESAVKA_UVC_CONTROL_READ_ONLY 0) +LESAVKA_UVC_QUEUE_PACING=$(uvc_env_value LESAVKA_UVC_QUEUE_PACING 1) LESAVKA_UVC_MAXBURST=$(uvc_env_value LESAVKA_UVC_MAXBURST 0) LESAVKA_UVC_BULK=$(uvc_env_value LESAVKA_UVC_BULK 1) LESAVKA_UVC_FRAME_SIZE_GUARD=$(uvc_env_value LESAVKA_UVC_FRAME_SIZE_GUARD 1) diff --git a/scripts/manual/run_rct_uvc_artifact_probe.py b/scripts/manual/run_rct_uvc_artifact_probe.py new file mode 100755 index 0000000..6184e8e --- /dev/null +++ b/scripts/manual/run_rct_uvc_artifact_probe.py @@ -0,0 +1,474 @@ +#!/usr/bin/env python3 +"""Capture the RCT-facing UVC webcam and score lower-half video corruption.""" + +from __future__ import annotations + +import argparse +import collections +import json +import os +import pathlib +import shlex +import shutil +import subprocess +import sys +import time +from typing import Any + +DEFAULT_DEVICE_LABEL = "Lesavka Composite" + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description=( + "Long-running Lesavka RCT UVC artifact probe. It captures decoded " + "grayscale frames from the host-side UVC device, flags lower-half " + "slabs/tears, and writes JSON/PGM artifacts for review." + ) + ) + parser.add_argument("--host", default="", help="optional SSH host, e.g. tethys") + parser.add_argument("--source", choices=["device", "x11"], default="device") + parser.add_argument("--device", default="auto", help="video device or auto") + parser.add_argument("--display", default=":0", help="X11 display for --source x11") + parser.add_argument("--crop", default="", help="X11 crop as x,y,width,height for --source x11") + parser.add_argument("--device-label", default=DEFAULT_DEVICE_LABEL) + parser.add_argument("--width", type=int, default=1280) + parser.add_argument("--height", type=int, default=720) + parser.add_argument("--fps", type=int, default=30) + parser.add_argument("--duration", type=float, default=180.0) + parser.add_argument("--artifact-dir", default="") + parser.add_argument("--remote-artifact-dir", default="") + parser.add_argument("--x-step", type=int, default=8) + parser.add_argument("--y-step", type=int, default=4) + parser.add_argument("--bands", type=int, default=24) + parser.add_argument("--flat-var", type=float, default=18.0) + parser.add_argument("--delta-threshold", type=float, default=24.0) + parser.add_argument("--jump-threshold", type=float, default=34.0) + parser.add_argument("--max-suspicious-artifacts", type=int, default=40) + parser.add_argument("--progress-every", type=int, default=150) + parser.add_argument("--self-test", action="store_true") + return parser.parse_args() + + +def timestamp() -> str: + return time.strftime("%Y%m%d-%H%M%S", time.gmtime()) + + +def default_artifact_dir() -> pathlib.Path: + return pathlib.Path("/tmp") / f"lesavka-rct-uvc-artifact-probe-{timestamp()}" + + +def run_remote(args: argparse.Namespace) -> int: + local_artifact_dir = pathlib.Path(args.artifact_dir or f"artifacts/rct-uvc/{timestamp()}") + remote_artifact_dir = args.remote_artifact_dir or f"/tmp/lesavka-rct-uvc-artifact-probe-{timestamp()}" + remote_script = f"/tmp/lesavka-rct-uvc-artifact-probe-{os.getpid()}.py" + script_text = pathlib.Path(__file__).read_text() + subprocess.run( + ["ssh", args.host, f"cat > {shlex.quote(remote_script)} && chmod +x {shlex.quote(remote_script)}"], + input=script_text, + text=True, + check=True, + ) + remote_cmd = [ + "python3", + remote_script, + "--source", + args.source, + "--device", + args.device, + "--device-label", + args.device_label, + "--display", + args.display, + "--crop", + args.crop, + "--width", + str(args.width), + "--height", + str(args.height), + "--fps", + str(args.fps), + "--duration", + str(args.duration), + "--artifact-dir", + remote_artifact_dir, + "--x-step", + str(args.x_step), + "--y-step", + str(args.y_step), + "--bands", + str(args.bands), + "--flat-var", + str(args.flat_var), + "--delta-threshold", + str(args.delta_threshold), + "--jump-threshold", + str(args.jump_threshold), + "--max-suspicious-artifacts", + str(args.max_suspicious_artifacts), + "--progress-every", + str(args.progress_every), + ] + print(f"running remote RCT UVC probe on {args.host}: {remote_artifact_dir}", file=sys.stderr) + rc = subprocess.run(["ssh", args.host, " ".join(shlex.quote(part) for part in remote_cmd)]).returncode + local_artifact_dir.parent.mkdir(parents=True, exist_ok=True) + subprocess.run( + ["scp", "-r", f"{args.host}:{remote_artifact_dir}", str(local_artifact_dir)], + check=False, + ) + print(f"artifact_dir: {local_artifact_dir}") + return rc + + +def detect_video_device(label: str) -> str: + explicit = os.environ.get("LESAVKA_RCT_UVC_DEVICE") + if explicit: + return explicit + try: + listing = subprocess.check_output(["v4l2-ctl", "--list-devices"], text=True) + except Exception: + return "/dev/video2" + current_matches = False + for line in listing.splitlines(): + if not line.startswith(("\t", " ")): + current_matches = label.lower() in line.lower() + continue + value = line.strip() + if current_matches and value.startswith("/dev/video"): + return value + return "/dev/video2" + + +def band_stats(frame: bytes, width: int, y0: int, y1: int, x_step: int, y_step: int) -> tuple[float, float]: + total = 0 + total2 = 0 + count = 0 + view = memoryview(frame) + for y in range(y0, y1, y_step): + row = y * width + for x in range(0, width, x_step): + value = view[row + x] + total += value + total2 += value * value + count += 1 + if count == 0: + return 0.0, 0.0 + mean = total / count + variance = max(0.0, (total2 / count) - (mean * mean)) + return mean, variance + + +def band_delta( + frame: bytes, previous: bytes | None, width: int, y0: int, y1: int, x_step: int, y_step: int +) -> float: + if previous is None: + return 0.0 + total = 0 + count = 0 + view = memoryview(frame) + prev = memoryview(previous) + for y in range(y0, y1, y_step): + row = y * width + for x in range(0, width, x_step): + total += abs(view[row + x] - prev[row + x]) + count += 1 + return total / count if count else 0.0 + + +def max_run(flags: list[bool]) -> int: + best = 0 + cur = 0 + for flag in flags: + cur = cur + 1 if flag else 0 + best = max(best, cur) + return best + + +def analyze_frame(frame: bytes, previous: bytes | None, args: argparse.Namespace) -> dict[str, Any]: + width = args.width + height = args.height + band_count = max(8, args.bands) + band_h = max(1, height // band_count) + means: list[float] = [] + variances: list[float] = [] + for band in range(band_count): + y0 = band * band_h + y1 = height if band == band_count - 1 else min(height, y0 + band_h) + mean, variance = band_stats(frame, width, y0, y1, args.x_step, args.y_step) + means.append(mean) + variances.append(variance) + + half = band_count // 2 + lower_flags = [var < args.flat_var for var in variances[half:]] + lower_flat_pct = sum(lower_flags) / max(1, len(lower_flags)) + lower_flat_run_pct = max_run(lower_flags) / max(1, len(lower_flags)) + upper_delta = band_delta(frame, previous, width, 0, height // 2, args.x_step, args.y_step) + lower_delta = band_delta(frame, previous, width, height // 2, height, args.x_step, args.y_step) + jumps = [abs(means[idx] - means[idx - 1]) for idx in range(1, band_count)] + lower_jumps = jumps[half:] + max_lower_jump = max(lower_jumps or [0.0]) + sorted_jumps = sorted(jumps) + median_jump = sorted_jumps[len(sorted_jumps) // 2] if sorted_jumps else 0.0 + + reasons: list[str] = [] + temporal_lower_jump = lower_delta > args.delta_threshold and lower_delta > max(upper_delta * 2.2, 8.0) + lower_delta_skew = temporal_lower_jump + lower_boundary_jump = ( + temporal_lower_jump + and max_lower_jump > args.jump_threshold + and max_lower_jump > max(median_jump * 3.0, 8.0) + ) + lower_flat_flash = lower_flat_pct >= 0.25 and temporal_lower_jump + lower_slab = lower_flat_run_pct >= 0.33 and max_lower_jump > args.jump_threshold and temporal_lower_jump + if lower_delta_skew: + reasons.append("lower_delta_skew") + if lower_boundary_jump: + reasons.append("lower_boundary_jump") + if lower_flat_flash: + reasons.append("lower_flat_flash") + if lower_slab: + reasons.append("lower_slab") + + suspicious = bool(lower_delta_skew or lower_boundary_jump or lower_flat_flash or lower_slab) + return { + "suspicious": suspicious, + "reasons": reasons, + "upper_delta": round(upper_delta, 3), + "lower_delta": round(lower_delta, 3), + "lower_flat_pct": round(lower_flat_pct, 3), + "lower_flat_run_pct": round(lower_flat_run_pct, 3), + "max_lower_jump": round(max_lower_jump, 3), + "median_band_jump": round(median_jump, 3), + "lower_variance_min": round(min(variances[half:] or [0.0]), 3), + "lower_variance_mean": round(sum(variances[half:]) / max(1, len(variances[half:])), 3), + } + + +def write_pgm(path: pathlib.Path, frame: bytes, width: int, height: int) -> None: + path.write_bytes(f"P5\n{width} {height}\n255\n".encode() + frame) + + +def parse_crop(value: str, args: argparse.Namespace) -> tuple[int, int, int, int]: + if not value: + return (0, 0, args.width, args.height) + parts = [part.strip() for part in value.split(",")] + if len(parts) != 4: + raise SystemExit("--crop must be x,y,width,height") + try: + x, y, width, height = [int(part) for part in parts] + except ValueError as exc: + raise SystemExit("--crop values must be integers") from exc + if width <= 0 or height <= 0: + raise SystemExit("--crop width and height must be positive") + args.width = width + args.height = height + return (x, y, width, height) + + +def ffmpeg_cmd(device: str, args: argparse.Namespace) -> list[str]: + if args.source == "x11": + x, y, width, height = parse_crop(args.crop, args) + display = f"{args.display}+{x},{y}" + return [ + "ffmpeg", + "-hide_banner", + "-nostdin", + "-loglevel", + "warning", + "-f", + "x11grab", + "-video_size", + f"{width}x{height}", + "-framerate", + str(args.fps), + "-i", + display, + "-an", + "-pix_fmt", + "gray", + "-f", + "rawvideo", + "-", + ] + + return [ + "ffmpeg", + "-hide_banner", + "-nostdin", + "-loglevel", + "warning", + "-f", + "v4l2", + "-input_format", + "mjpeg", + "-video_size", + f"{args.width}x{args.height}", + "-framerate", + str(args.fps), + "-i", + device, + "-an", + "-pix_fmt", + "gray", + "-f", + "rawvideo", + "-", + ] + + +def run_capture(args: argparse.Namespace) -> int: + artifact_dir = pathlib.Path(args.artifact_dir) if args.artifact_dir else default_artifact_dir() + artifact_dir.mkdir(parents=True, exist_ok=True) + device = detect_video_device(args.device_label) if args.device == "auto" else args.device + command = ffmpeg_cmd(device, args) + (artifact_dir / "command.txt").write_text(" ".join(shlex.quote(part) for part in command) + "\n") + frame_size = args.width * args.height + stderr_path = artifact_dir / "ffmpeg.stderr" + jsonl_path = artifact_dir / "frame-metrics.jsonl" + started = time.monotonic() + previous: bytes | None = None + frame_index = 0 + suspicious_count = 0 + artifacts_written = 0 + reason_counts: collections.Counter[str] = collections.Counter() + worst: list[dict[str, Any]] = [] + with stderr_path.open("wb") as err, jsonl_path.open("w") as metrics: + proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=err) + assert proc.stdout is not None + try: + while time.monotonic() - started < args.duration: + frame = proc.stdout.read(frame_size) + if len(frame) != frame_size: + break + frame_index += 1 + result = analyze_frame(frame, previous, args) + previous = frame + result.update({"frame": frame_index, "elapsed_s": round(time.monotonic() - started, 3)}) + if result["suspicious"]: + suspicious_count += 1 + reason_counts.update(result["reasons"]) + worst.append(result) + worst = sorted( + worst, + key=lambda item: (item["max_lower_jump"], item["lower_delta"]), + reverse=True, + )[:20] + if artifacts_written < args.max_suspicious_artifacts: + write_pgm( + artifact_dir / f"suspicious_{frame_index:06d}.pgm", + frame, + args.width, + args.height, + ) + artifacts_written += 1 + if frame_index <= 5 or result["suspicious"] or frame_index % args.progress_every == 0: + metrics.write(json.dumps(result, sort_keys=True) + "\n") + if frame_index % args.progress_every == 0: + print( + f"frames={frame_index} suspicious={suspicious_count} latest={result}", + file=sys.stderr, + ) + finally: + proc.terminate() + try: + proc.wait(timeout=3) + except subprocess.TimeoutExpired: + proc.kill() + elapsed = max(0.001, time.monotonic() - started) + summary = { + "schema": "lesavka.rct-uvc-artifact-probe.v1", + "source": args.source, + "device": device, + "width": args.width, + "height": args.height, + "fps_requested": args.fps, + "duration_requested_s": args.duration, + "duration_observed_s": round(elapsed, 3), + "frames": frame_index, + "fps_observed": round(frame_index / elapsed, 3), + "suspicious_frames": suspicious_count, + "suspicious_pct": round((suspicious_count / frame_index * 100.0) if frame_index else 0.0, 3), + "reason_counts": dict(reason_counts), + "worst_frames": worst, + "artifact_dir": str(artifact_dir), + "ffmpeg_stderr": str(stderr_path), + } + (artifact_dir / "summary.json").write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n") + (artifact_dir / "summary.txt").write_text(format_summary(summary)) + print(format_summary(summary), end="") + print(f"artifact_dir: {artifact_dir}") + return 0 if frame_index > 0 else 2 + + +def format_summary(summary: dict[str, Any]) -> str: + lines = [ + "Lesavka RCT UVC artifact probe", + f"source: {summary.get('source', 'device')}", + f"device: {summary['device']}", + f"mode: {summary['width']}x{summary['height']}@{summary['fps_requested']}", + f"frames: {summary['frames']} ({summary['fps_observed']} fps observed)", + f"suspicious: {summary['suspicious_frames']} ({summary['suspicious_pct']}%)", + f"reasons: {summary['reason_counts']}", + f"artifacts: {summary['artifact_dir']}", + "", + ] + return "\n".join(lines) + + +def synthetic_frame(width: int, height: int, shift: int = 0, corrupt: bool = False) -> bytes: + data = bytearray(width * height) + for y in range(height): + row = y * width + for x in range(width): + data[row + x] = (x // 8 + y // 4 + shift) % 256 + if corrupt: + for y in range(height // 2, height): + row = y * width + data[row : row + width] = bytes([128]) * width + return bytes(data) + + +def run_self_test(args: argparse.Namespace) -> int: + artifact_dir = pathlib.Path(args.artifact_dir) if args.artifact_dir else default_artifact_dir() + artifact_dir.mkdir(parents=True, exist_ok=True) + args.width = 160 + args.height = 90 + frames = [synthetic_frame(args.width, args.height, idx) for idx in range(4)] + frames.append(synthetic_frame(args.width, args.height, 5, corrupt=True)) + previous = None + suspicious = 0 + records = [] + for idx, frame in enumerate(frames, start=1): + result = analyze_frame(frame, previous, args) + previous = frame + result["frame"] = idx + records.append(result) + if result["suspicious"]: + suspicious += 1 + write_pgm(artifact_dir / f"selftest_suspicious_{idx:06d}.pgm", frame, args.width, args.height) + summary = { + "schema": "lesavka.rct-uvc-artifact-probe.self-test.v1", + "frames": len(frames), + "suspicious_frames": suspicious, + "records": records, + "artifact_dir": str(artifact_dir), + } + (artifact_dir / "summary.json").write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n") + print(json.dumps(summary, indent=2, sort_keys=True)) + return 0 if suspicious >= 1 else 1 + + +def main() -> int: + args = parse_args() + if args.self_test: + return run_self_test(args) + if args.host and args.host not in {"localhost", "127.0.0.1"}: + if not shutil.which("ssh") or not shutil.which("scp"): + print("ssh and scp are required for --host", file=sys.stderr) + return 2 + return run_remote(args) + return run_capture(args) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/server/Cargo.toml b/server/Cargo.toml index f2a2768..7c1a339 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.22.32" +version = "0.22.33" edition = "2024" autobins = false diff --git a/server/src/bin/lesavka-uvc.real.inc b/server/src/bin/lesavka-uvc.real.inc index df3ecca..1c4507b 100644 --- a/server/src/bin/lesavka-uvc.real.inc +++ b/server/src/bin/lesavka-uvc.real.inc @@ -244,6 +244,8 @@ struct UvcVideoStream { frame_path: std::path::PathBuf, latest_frame: Vec, frame_max_bytes: usize, + frame_period: Option, + next_queue_at: Option, streaming: bool, stats: UvcVideoStats, } @@ -257,6 +259,8 @@ struct UvcVideoStats { rejected_invalid: u64, fallback_idle: u64, latest_bytes: usize, + paced_sleeps: u64, + paced_sleep_ms: u64, last_report: Option, } @@ -268,6 +272,8 @@ impl UvcVideoStream { frame_path: frame_spool_path(), latest_frame: IDLE_MJPEG_FRAME.to_vec(), frame_max_bytes: MAX_MJPEG_FRAME_BYTES, + frame_period: None, + next_queue_at: None, streaming: false, stats: UvcVideoStats::default(), } @@ -276,6 +282,8 @@ impl UvcVideoStream { fn start(&mut self, cfg: UvcConfig) -> Result<()> { self.stop(); self.frame_max_bytes = uvc_frame_max_bytes(cfg); + self.frame_period = uvc_queue_period(cfg.fps); + self.next_queue_at = None; self.set_format(cfg)?; self.request_buffers(uvc_buffer_count())?; for index in 0..self.buffers.len() { @@ -417,6 +425,9 @@ impl UvcVideoStream { } fn queue_buffer(&mut self, index: u32) -> Result<()> { + if self.streaming { + self.pace_queue_if_needed(); + } self.refresh_latest_frame(); let Some(buffer) = self.buffers.get(index as usize) else { return Ok(()); @@ -453,6 +464,23 @@ impl UvcVideoStream { Ok(()) } + fn pace_queue_if_needed(&mut self) { + let Some(period) = self.frame_period else { + return; + }; + let now = Instant::now(); + if let Some(deadline) = self.next_queue_at { + if let Some(delay) = deadline.checked_duration_since(now) { + if !delay.is_zero() { + thread::sleep(delay); + self.stats.paced_sleeps += 1; + self.stats.paced_sleep_ms += delay.as_millis().min(u128::from(u64::MAX)) as u64; + } + } + } + self.next_queue_at = Some(Instant::now() + period); + } + fn refresh_latest_frame(&mut self) { let stale = frame_spool_is_stale(&self.frame_path, frame_spool_max_age()); if stale && looks_like_mjpeg_frame(&self.latest_frame) { @@ -512,7 +540,7 @@ impl UvcVideoStream { } self.stats.last_report = Some(now); eprintln!( - "[lesavka-uvc] video stats queued={} reloaded={} stale_replay={} rejected_oversize={} rejected_invalid={} fallback_idle={} latest_bytes={} frame_cap={}", + "[lesavka-uvc] video stats queued={} reloaded={} stale_replay={} rejected_oversize={} rejected_invalid={} fallback_idle={} latest_bytes={} frame_cap={} paced_sleeps={} paced_sleep_ms={}", self.stats.queued, self.stats.reloaded, self.stats.replayed_stale, @@ -520,7 +548,9 @@ impl UvcVideoStream { self.stats.rejected_invalid, self.stats.fallback_idle, self.stats.latest_bytes, - self.frame_payload_limit() + self.frame_payload_limit(), + self.stats.paced_sleeps, + self.stats.paced_sleep_ms ); if let Some(path) = uvc_stats_path() { let _ = write_atomic_text( @@ -603,6 +633,14 @@ fn uvc_idle_pump_sleep() -> Duration { )) } +fn uvc_queue_period(fps: u32) -> Option { + if !env_flag_enabled("LESAVKA_UVC_QUEUE_PACING", true) { + return None; + } + let fps = fps.max(1); + Some(Duration::from_nanos(1_000_000_000 / u64::from(fps))) +} + /// Bound MJPEG frames before they enter the USB UVC helper. /// /// Inputs: UVC mode plus optional `LESAVKA_UVC_FRAME_MAX_BYTES` or @@ -700,7 +738,7 @@ fn write_atomic_text(path: &std::path::Path, text: &str) -> Result<()> { fn uvc_stats_snapshot_json(stats: &UvcVideoStats, frame_cap: usize) -> String { format!( - "{{\"queued\":{},\"reloaded\":{},\"stale_replay\":{},\"rejected_oversize\":{},\"rejected_invalid\":{},\"fallback_idle\":{},\"latest_bytes\":{},\"frame_cap\":{}}}\n", + "{{\"queued\":{},\"reloaded\":{},\"stale_replay\":{},\"rejected_oversize\":{},\"rejected_invalid\":{},\"fallback_idle\":{},\"latest_bytes\":{},\"frame_cap\":{},\"paced_sleeps\":{},\"paced_sleep_ms\":{}}}\n", stats.queued, stats.reloaded, stats.replayed_stale, @@ -708,7 +746,9 @@ fn uvc_stats_snapshot_json(stats: &UvcVideoStats, frame_cap: usize) -> String { stats.rejected_invalid, stats.fallback_idle, stats.latest_bytes, - frame_cap + frame_cap, + stats.paced_sleeps, + stats.paced_sleep_ms ) } diff --git a/server/src/bin/lesavka_uvc/coverage_model.rs b/server/src/bin/lesavka_uvc/coverage_model.rs index 1581090..d747644 100644 --- a/server/src/bin/lesavka_uvc/coverage_model.rs +++ b/server/src/bin/lesavka_uvc/coverage_model.rs @@ -162,6 +162,8 @@ struct UvcVideoStream { frame_path: std::path::PathBuf, latest_frame: Vec, frame_max_bytes: usize, + frame_period: Option, + next_queue_at: Option, } #[cfg(coverage)] @@ -174,6 +176,8 @@ struct UvcVideoStats { rejected_invalid: u64, fallback_idle: u64, latest_bytes: usize, + paced_sleeps: u64, + paced_sleep_ms: u64, } #[cfg(coverage)] @@ -184,6 +188,8 @@ impl UvcVideoStream { frame_path: frame_spool_path(), latest_frame: IDLE_MJPEG_FRAME.to_vec(), frame_max_bytes: MAX_MJPEG_FRAME_BYTES, + frame_period: None, + next_queue_at: None, } } diff --git a/server/src/bin/lesavka_uvc/coverage_startup.rs b/server/src/bin/lesavka_uvc/coverage_startup.rs index a4fe670..0f93b52 100644 --- a/server/src/bin/lesavka_uvc/coverage_startup.rs +++ b/server/src/bin/lesavka_uvc/coverage_startup.rs @@ -151,6 +151,20 @@ fn uvc_idle_pump_sleep() -> std::time::Duration { )) } +#[cfg(coverage)] +/// Returns the frame queue pacing period for the negotiated UVC frame rate. +/// +/// Inputs: `LESAVKA_UVC_QUEUE_PACING` plus the active frame rate. Output: +/// `None` when pacing is explicitly disabled. Why: the RCT-facing host must +/// not be overfed faster than the advertised descriptor cadence. +fn uvc_queue_period(fps: u32) -> Option { + if !env_flag_enabled("LESAVKA_UVC_QUEUE_PACING", true) { + return None; + } + let fps = fps.max(1); + Some(std::time::Duration::from_nanos(1_000_000_000 / u64::from(fps))) +} + #[cfg(coverage)] /// Bound MJPEG frames before they enter the USB UVC helper. /// @@ -268,7 +282,7 @@ fn write_atomic_text(path: &std::path::Path, text: &str) -> Result<()> { #[cfg(coverage)] fn uvc_stats_snapshot_json(stats: &UvcVideoStats, frame_cap: usize) -> String { format!( - "{{\"queued\":{},\"reloaded\":{},\"stale_replay\":{},\"rejected_oversize\":{},\"rejected_invalid\":{},\"fallback_idle\":{},\"latest_bytes\":{},\"frame_cap\":{}}}\n", + "{{\"queued\":{},\"reloaded\":{},\"stale_replay\":{},\"rejected_oversize\":{},\"rejected_invalid\":{},\"fallback_idle\":{},\"latest_bytes\":{},\"frame_cap\":{},\"paced_sleeps\":{},\"paced_sleep_ms\":{}}}\n", stats.queued, stats.reloaded, stats.replayed_stale, @@ -276,7 +290,9 @@ fn uvc_stats_snapshot_json(stats: &UvcVideoStats, frame_cap: usize) -> String { stats.rejected_invalid, stats.fallback_idle, stats.latest_bytes, - frame_cap + frame_cap, + stats.paced_sleeps, + stats.paced_sleep_ms ) } diff --git a/tests/contract/server/uvc/server_uvc_binary_contract.rs b/tests/contract/server/uvc/server_uvc_binary_contract.rs index 59befe2..8d1d477 100644 --- a/tests/contract/server/uvc/server_uvc_binary_contract.rs +++ b/tests/contract/server/uvc/server_uvc_binary_contract.rs @@ -360,12 +360,16 @@ mod uvc_binary { rejected_invalid: 3, fallback_idle: 4, latest_bytes: 77_036, + paced_sleeps: 5, + paced_sleep_ms: 123, last_report: None, }; let json = uvc_stats_snapshot_json(&stats, 333_333); assert!(json.contains("\"queued\":7")); assert!(json.contains("\"latest_bytes\":77036")); assert!(json.contains("\"frame_cap\":333333")); + assert!(json.contains("\"paced_sleeps\":5")); + assert!(json.contains("\"paced_sleep_ms\":123")); let dir = tempfile::tempdir().expect("tempdir"); let path = dir.path().join("uvc").join("stats.json"); diff --git a/tests/contract/server/uvc/server_uvc_binary_extra_contract.rs b/tests/contract/server/uvc/server_uvc_binary_extra_contract.rs index 86cffb1..889b653 100644 --- a/tests/contract/server/uvc/server_uvc_binary_extra_contract.rs +++ b/tests/contract/server/uvc/server_uvc_binary_extra_contract.rs @@ -250,7 +250,7 @@ mod uvc_binary_extra { assert_eq!(out[2], 1); assert_eq!(out[3], 1); assert_eq!(read_le32(&out, 4), 333_333); - assert_eq!(read_le32(&out, 18), 1920 * 1080 * 2); + assert_eq!(read_le32(&out, 18), state.cfg.frame_size); assert_eq!(read_le32(&out, 22), state.cfg.max_packet); } @@ -360,6 +360,10 @@ mod uvc_binary_extra { with_var("LESAVKA_UVC_FRAME_MAX_AGE_MS", None::<&str>, || { assert_eq!(uvc_buffer_count(), 2); assert_eq!(uvc_idle_pump_sleep(), std::time::Duration::from_millis(2)); + assert_eq!( + uvc_queue_period(30), + Some(std::time::Duration::from_nanos(33_333_333)) + ); assert_eq!( frame_spool_max_age(), Some(std::time::Duration::from_millis(1_000)) @@ -375,9 +379,12 @@ mod uvc_binary_extra { with_var("LESAVKA_UVC_BUFFER_COUNT", Some("99"), || { with_var("LESAVKA_UVC_IDLE_PUMP_MS", Some("11"), || { with_var("LESAVKA_UVC_FRAME_MAX_AGE_MS", Some("0"), || { - assert_eq!(uvc_buffer_count(), 8); - assert_eq!(uvc_idle_pump_sleep(), std::time::Duration::from_millis(11)); - assert_eq!(frame_spool_max_age(), None); + with_var("LESAVKA_UVC_QUEUE_PACING", Some("0"), || { + assert_eq!(uvc_buffer_count(), 8); + assert_eq!(uvc_idle_pump_sleep(), std::time::Duration::from_millis(11)); + assert_eq!(uvc_queue_period(30), None); + assert_eq!(frame_spool_max_age(), None); + }); }); }); }); diff --git a/tests/manual/server/rct/rct_uvc_artifact_probe_manual_contract.rs b/tests/manual/server/rct/rct_uvc_artifact_probe_manual_contract.rs new file mode 100644 index 0000000..7445ed4 --- /dev/null +++ b/tests/manual/server/rct/rct_uvc_artifact_probe_manual_contract.rs @@ -0,0 +1,91 @@ +// Contract coverage for the RCT-side UVC artifact probe. +// +// Scope: the manual long-running UVC corruption detector used against tethys. +// Targets: `scripts/manual/run_rct_uvc_artifact_probe.py`. +// Why: lower-half UVC tears are late-path artifacts, so we need a durable +// host-side probe that can catch them from the same device Google Meet reads. + +use std::{fs, path::PathBuf, process::Command}; + +use serde_json::Value; + +const PROBE_SRC: &str = include_str!(concat!( + env!("CARGO_MANIFEST_DIR"), + "/scripts/manual/run_rct_uvc_artifact_probe.py" +)); + +fn repo_script_path() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("scripts/manual/run_rct_uvc_artifact_probe.py") +} + +#[test] +fn rct_uvc_artifact_probe_documents_late_path_lower_half_detection() { + for expected in [ + "lesavka.rct-uvc-artifact-probe.v1", + "Lesavka Composite", + "lower_delta_skew", + "lower_boundary_jump", + "lower_flat_flash", + "lower_slab", + "ffmpeg", + "v4l2", + "x11grab", + "--source", + "--crop", + "PGM", + "--host", + ] { + assert!( + PROBE_SRC.contains(expected), + "RCT UVC artifact probe should preserve marker {expected}" + ); + } +} + +#[test] +fn rct_uvc_artifact_probe_self_test_flags_synthetic_lower_half_slab() { + let dir = tempfile::tempdir().expect("tempdir"); + let output = Command::new("python3") + .arg(repo_script_path()) + .arg("--self-test") + .arg("--artifact-dir") + .arg(dir.path()) + .output() + .expect("run rct uvc artifact probe self-test"); + + assert!( + output.status.success(), + "probe self-test should succeed: stdout={} stderr={}", + String::from_utf8_lossy(&output.stdout), + String::from_utf8_lossy(&output.stderr) + ); + + let summary: Value = serde_json::from_str( + &fs::read_to_string(dir.path().join("summary.json")).expect("summary json"), + ) + .expect("parse summary json"); + assert_eq!( + summary["schema"], + "lesavka.rct-uvc-artifact-probe.self-test.v1" + ); + assert_eq!(summary["frames"], 5); + assert!( + summary["suspicious_frames"].as_u64().unwrap_or_default() >= 1, + "self-test should detect the synthetic lower-half slab: {summary}" + ); + let records = summary["records"].as_array().expect("records array"); + let corrupt = records + .iter() + .find(|record| record["suspicious"].as_bool().unwrap_or(false)) + .expect("suspicious record"); + let reasons = corrupt["reasons"].as_array().expect("reasons array"); + assert!( + reasons.iter().any(|reason| reason == "lower_slab") + || reasons.iter().any(|reason| reason == "lower_flat_flash"), + "self-test should classify the lower-half slab/flash: {corrupt}" + ); + assert!( + dir.path().join("selftest_suspicious_000005.pgm").exists(), + "probe should save visual evidence for suspicious frames" + ); +}