diff --git a/scripts/manual/run_rct_uvc_artifact_probe.py b/scripts/manual/run_rct_uvc_artifact_probe.py index 3e0913d..ea6d01d 100755 --- a/scripts/manual/run_rct_uvc_artifact_probe.py +++ b/scripts/manual/run_rct_uvc_artifact_probe.py @@ -44,6 +44,11 @@ def parse_args() -> argparse.Namespace: parser.add_argument("--flat-var", type=float, default=18.0) parser.add_argument("--delta-threshold", type=float, default=24.0) parser.add_argument("--jump-threshold", type=float, default=34.0) + parser.add_argument("--tear-threshold", type=float, default=18.0) + parser.add_argument("--shift-threshold", type=float, default=10.0) + parser.add_argument("--shift-improvement", type=float, default=1.35) + parser.add_argument("--blur-delta-threshold", type=float, default=6.0) + parser.add_argument("--blur-var-ratio", type=float, default=0.45) parser.add_argument("--change-threshold", type=float, default=1.0) parser.add_argument("--max-suspicious-artifacts", type=int, default=40) parser.add_argument("--max-reference-artifacts", type=int, default=12) @@ -107,8 +112,24 @@ def run_remote(args: argparse.Namespace) -> int: str(args.delta_threshold), "--jump-threshold", str(args.jump_threshold), + "--tear-threshold", + str(args.tear_threshold), + "--shift-threshold", + str(args.shift_threshold), + "--shift-improvement", + str(args.shift_improvement), + "--blur-delta-threshold", + str(args.blur_delta_threshold), + "--blur-var-ratio", + str(args.blur_var_ratio), + "--change-threshold", + str(args.change_threshold), "--max-suspicious-artifacts", str(args.max_suspicious_artifacts), + "--max-reference-artifacts", + str(args.max_reference_artifacts), + "--reference-every", + str(args.reference_every), "--progress-every", str(args.progress_every), ] @@ -178,6 +199,76 @@ def band_delta( return total / count if count else 0.0 +def band_deltas( + frame: bytes, + previous: bytes | None, + width: int, + height: int, + band_count: int, + x_step: int, + y_step: int, +) -> list[float]: + if previous is None: + return [0.0] * band_count + band_h = max(1, height // band_count) + values: list[float] = [] + for band in range(band_count): + y0 = band * band_h + y1 = height if band == band_count - 1 else min(height, y0 + band_h) + values.append(band_delta(frame, previous, width, y0, y1, x_step, y_step)) + return values + + +def shifted_delta( + frame: bytes, + previous: bytes, + width: int, + y0: int, + y1: int, + shift: int, + x_step: int, + y_step: int, +) -> float: + x0 = max(0, -shift) + x1 = min(width, width - shift) + if x0 >= x1: + return 0.0 + total = 0 + count = 0 + view = memoryview(frame) + prev = memoryview(previous) + for y in range(y0, y1, y_step): + row = y * width + for x in range(x0, x1, x_step): + total += abs(view[row + x] - prev[row + x + shift]) + count += 1 + return total / count if count else 0.0 + + +def best_shift_match( + frame: bytes, + previous: bytes | None, + width: int, + height: int, + x_step: int, + y_step: int, +) -> tuple[int, float, float, float]: + if previous is None: + return 0, 0.0, 0.0, 0.0 + y0 = height // 4 + y1 = height + zero = shifted_delta(frame, previous, width, y0, y1, 0, x_step, y_step) + best = zero + best_shift = 0 + for shift in [-96, -80, -64, -48, -32, -24, -16, -12, -8, 8, 12, 16, 24, 32, 48, 64, 80, 96]: + candidate = shifted_delta(frame, previous, width, y0, y1, shift, x_step, y_step) + if candidate < best: + best = candidate + best_shift = shift + improvement = zero / max(best, 0.001) if best_shift else 1.0 + return best_shift, zero, best, improvement + + def max_run(flags: list[bool]) -> int: best = 0 cur = 0 @@ -207,11 +298,19 @@ def analyze_frame(frame: bytes, previous: bytes | None, args: argparse.Namespace lower_flat_run_pct = max_run(lower_flags) / max(1, len(lower_flags)) upper_delta = band_delta(frame, previous, width, 0, height // 2, args.x_step, args.y_step) lower_delta = band_delta(frame, previous, width, height // 2, height, args.x_step, args.y_step) + temporal_deltas = band_deltas(frame, previous, width, height, band_count, args.x_step, args.y_step) jumps = [abs(means[idx] - means[idx - 1]) for idx in range(1, band_count)] lower_jumps = jumps[half:] max_lower_jump = max(lower_jumps or [0.0]) sorted_jumps = sorted(jumps) median_jump = sorted_jumps[len(sorted_jumps) // 2] if sorted_jumps else 0.0 + sorted_temporal = sorted(temporal_deltas) + median_temporal_delta = sorted_temporal[len(sorted_temporal) // 2] if sorted_temporal else 0.0 + max_temporal_delta = max(temporal_deltas or [0.0]) + max_temporal_band = temporal_deltas.index(max_temporal_delta) if temporal_deltas else 0 + shift_pixels, shift_zero_delta, shift_best_delta, shift_improvement = best_shift_match( + frame, previous, width, height, args.x_step, args.y_step + ) reasons: list[str] = [] temporal_lower_jump = lower_delta > args.delta_threshold and lower_delta > max(upper_delta * 2.2, 8.0) @@ -223,6 +322,23 @@ def analyze_frame(frame: bytes, previous: bytes | None, args: argparse.Namespace ) lower_flat_flash = lower_flat_pct >= 0.25 and temporal_lower_jump lower_slab = lower_flat_run_pct >= 0.33 and max_lower_jump > args.jump_threshold and temporal_lower_jump + temporal_delta_spike = ( + previous is not None + and max_temporal_delta > args.tear_threshold + and max_temporal_delta > max(median_temporal_delta * 3.2, 6.0) + ) + horizontal_shift = ( + bool(shift_pixels) + and shift_zero_delta > args.shift_threshold + and shift_improvement > args.shift_improvement + ) + upper_variance_mean = sum(variances[:half]) / max(1, len(variances[:half])) + lower_blur_flash = ( + previous is not None + and lower_delta > args.blur_delta_threshold + and lower_delta > max(upper_delta * 1.35, 2.0) + and min(variances[half:] or [0.0]) < max(args.flat_var * 4.0, upper_variance_mean * args.blur_var_ratio) + ) if lower_delta_skew: reasons.append("lower_delta_skew") if lower_boundary_jump: @@ -231,8 +347,22 @@ def analyze_frame(frame: bytes, previous: bytes | None, args: argparse.Namespace reasons.append("lower_flat_flash") if lower_slab: reasons.append("lower_slab") + if temporal_delta_spike: + reasons.append("temporal_delta_spike") + if horizontal_shift: + reasons.append("horizontal_shift") + if lower_blur_flash: + reasons.append("lower_blur_flash") - suspicious = bool(lower_delta_skew or lower_boundary_jump or lower_flat_flash or lower_slab) + suspicious = bool( + lower_delta_skew + or lower_boundary_jump + or lower_flat_flash + or lower_slab + or temporal_delta_spike + or horizontal_shift + or lower_blur_flash + ) return { "suspicious": suspicious, "reasons": reasons, @@ -242,6 +372,13 @@ def analyze_frame(frame: bytes, previous: bytes | None, args: argparse.Namespace "lower_flat_run_pct": round(lower_flat_run_pct, 3), "max_lower_jump": round(max_lower_jump, 3), "median_band_jump": round(median_jump, 3), + "max_temporal_delta": round(max_temporal_delta, 3), + "max_temporal_band": max_temporal_band, + "median_temporal_delta": round(median_temporal_delta, 3), + "shift_pixels": shift_pixels, + "shift_zero_delta": round(shift_zero_delta, 3), + "shift_best_delta": round(shift_best_delta, 3), + "shift_improvement": round(shift_improvement, 3), "lower_variance_min": round(min(variances[half:] or [0.0]), 3), "lower_variance_mean": round(sum(variances[half:]) / max(1, len(variances[half:])), 3), } @@ -469,13 +606,37 @@ def synthetic_frame(width: int, height: int, shift: int = 0, corrupt: bool = Fal return bytes(data) +def synthetic_shift_frame(previous: bytes, width: int, height: int, shift: int) -> bytes: + data = bytearray(width * height) + for y in range(height): + row = y * width + for x in range(width): + src = min(width - 1, max(0, x + shift)) + data[row + x] = previous[row + src] + return bytes(data) + + +def synthetic_rich_frame(width: int, height: int) -> bytes: + data = bytearray(width * height) + for y in range(height): + row = y * width + for x in range(width): + data[row + x] = ((x * 13) ^ (y * 7) ^ ((x * y) // 11)) % 256 + return bytes(data) + + def run_self_test(args: argparse.Namespace) -> int: artifact_dir = pathlib.Path(args.artifact_dir) if args.artifact_dir else default_artifact_dir() artifact_dir.mkdir(parents=True, exist_ok=True) args.width = 160 args.height = 90 + args.shift_threshold = 3.0 + args.tear_threshold = 8.0 frames = [synthetic_frame(args.width, args.height, idx) for idx in range(4)] frames.append(synthetic_frame(args.width, args.height, 5, corrupt=True)) + rich = synthetic_rich_frame(args.width, args.height) + frames.append(rich) + frames.append(synthetic_shift_frame(rich, args.width, args.height, 24)) previous = None suspicious = 0 records = [] diff --git a/tests/manual/server/rct/rct_uvc_artifact_probe_manual_contract.rs b/tests/manual/server/rct/rct_uvc_artifact_probe_manual_contract.rs index f965b0d..314c072 100644 --- a/tests/manual/server/rct/rct_uvc_artifact_probe_manual_contract.rs +++ b/tests/manual/server/rct/rct_uvc_artifact_probe_manual_contract.rs @@ -27,9 +27,15 @@ fn rct_uvc_artifact_probe_documents_late_path_lower_half_detection() { "lower_boundary_jump", "lower_flat_flash", "lower_slab", + "temporal_delta_spike", + "horizontal_shift", + "lower_blur_flash", "ffmpeg", "v4l2", "x11grab", + "--tear-threshold", + "--shift-threshold", + "--blur-delta-threshold", "--change-threshold", "--reference-every", "static_pct", @@ -72,7 +78,7 @@ fn rct_uvc_artifact_probe_self_test_flags_synthetic_lower_half_slab() { summary["schema"], "lesavka.rct-uvc-artifact-probe.self-test.v1" ); - assert_eq!(summary["frames"], 5); + assert_eq!(summary["frames"], 7); assert!( summary["suspicious_frames"].as_u64().unwrap_or_default() >= 1, "self-test should detect the synthetic lower-half slab: {summary}" @@ -92,6 +98,24 @@ fn rct_uvc_artifact_probe_self_test_flags_synthetic_lower_half_slab() { || reasons.iter().any(|reason| reason == "lower_flat_flash"), "self-test should classify the lower-half slab/flash: {corrupt}" ); + assert!( + records.iter().any(|record| { + record["reasons"] + .as_array() + .is_some_and(|reasons| reasons.iter().any(|reason| reason == "horizontal_shift")) + }), + "self-test should catch shifted frames: {summary}" + ); + assert!( + records.iter().any(|record| { + record["reasons"].as_array().is_some_and(|reasons| { + reasons + .iter() + .any(|reason| reason == "temporal_delta_spike") + }) + }), + "self-test should catch transient tear spikes: {summary}" + ); assert!( dir.path().join("selftest_suspicious_000005.pgm").exists(), "probe should save visual evidence for suspicious frames"