tools: harden rct uvc artifact probe

This commit is contained in:
Brad Stein 2026-05-15 19:52:15 -03:00
parent 850a9fcd42
commit 6e56ed0825
2 changed files with 51 additions and 0 deletions

View File

@ -44,7 +44,10 @@ def parse_args() -> argparse.Namespace:
parser.add_argument("--flat-var", type=float, default=18.0)
parser.add_argument("--delta-threshold", type=float, default=24.0)
parser.add_argument("--jump-threshold", type=float, default=34.0)
parser.add_argument("--change-threshold", type=float, default=1.0)
parser.add_argument("--max-suspicious-artifacts", type=int, default=40)
parser.add_argument("--max-reference-artifacts", type=int, default=12)
parser.add_argument("--reference-every", type=int, default=900)
parser.add_argument("--progress-every", type=int, default=150)
parser.add_argument("--self-test", action="store_true")
return parser.parse_args()
@ -330,8 +333,14 @@ def run_capture(args: argparse.Namespace) -> int:
frame_index = 0
suspicious_count = 0
artifacts_written = 0
reference_artifacts_written = 0
changed_frames = 0
static_frames = 0
reason_counts: collections.Counter[str] = collections.Counter()
worst: list[dict[str, Any]] = []
max_upper_delta = 0.0
max_lower_delta = 0.0
max_lower_jump_seen = 0.0
with stderr_path.open("wb") as err, jsonl_path.open("w") as metrics:
proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=err)
assert proc.stdout is not None
@ -344,6 +353,14 @@ def run_capture(args: argparse.Namespace) -> int:
result = analyze_frame(frame, previous, args)
previous = frame
result.update({"frame": frame_index, "elapsed_s": round(time.monotonic() - started, 3)})
max_upper_delta = max(max_upper_delta, float(result["upper_delta"]))
max_lower_delta = max(max_lower_delta, float(result["lower_delta"]))
max_lower_jump_seen = max(max_lower_jump_seen, float(result["max_lower_jump"]))
if frame_index > 1:
if float(result["upper_delta"]) + float(result["lower_delta"]) >= args.change_threshold:
changed_frames += 1
else:
static_frames += 1
if result["suspicious"]:
suspicious_count += 1
reason_counts.update(result["reasons"])
@ -361,6 +378,18 @@ def run_capture(args: argparse.Namespace) -> int:
args.height,
)
artifacts_written += 1
should_write_reference = (
frame_index == 1
or (args.reference_every > 0 and frame_index % args.reference_every == 0)
)
if should_write_reference and reference_artifacts_written < args.max_reference_artifacts:
write_pgm(
artifact_dir / f"reference_{frame_index:06d}.pgm",
frame,
args.width,
args.height,
)
reference_artifacts_written += 1
if frame_index <= 5 or result["suspicious"] or frame_index % args.progress_every == 0:
metrics.write(json.dumps(result, sort_keys=True) + "\n")
if frame_index % args.progress_every == 0:
@ -388,8 +417,16 @@ def run_capture(args: argparse.Namespace) -> int:
"fps_observed": round(frame_index / elapsed, 3),
"suspicious_frames": suspicious_count,
"suspicious_pct": round((suspicious_count / frame_index * 100.0) if frame_index else 0.0, 3),
"changed_frames": changed_frames,
"static_frames": static_frames,
"static_pct": round((static_frames / max(1, frame_index - 1) * 100.0) if frame_index > 1 else 0.0, 3),
"max_upper_delta": round(max_upper_delta, 3),
"max_lower_delta": round(max_lower_delta, 3),
"max_lower_jump_seen": round(max_lower_jump_seen, 3),
"reason_counts": dict(reason_counts),
"worst_frames": worst,
"reference_artifacts": reference_artifacts_written,
"suspicious_artifacts": artifacts_written,
"artifact_dir": str(artifact_dir),
"ffmpeg_stderr": str(stderr_path),
}
@ -408,7 +445,11 @@ def format_summary(summary: dict[str, Any]) -> str:
f"mode: {summary['width']}x{summary['height']}@{summary['fps_requested']}",
f"frames: {summary['frames']} ({summary['fps_observed']} fps observed)",
f"suspicious: {summary['suspicious_frames']} ({summary['suspicious_pct']}%)",
f"static: {summary.get('static_frames', 0)} ({summary.get('static_pct', 0.0)}%)",
f"max deltas: upper={summary.get('max_upper_delta', 0.0)} lower={summary.get('max_lower_delta', 0.0)}",
f"reasons: {summary['reason_counts']}",
f"reference artifacts: {summary.get('reference_artifacts', 0)}",
f"suspicious artifacts: {summary.get('suspicious_artifacts', 0)}",
f"artifacts: {summary['artifact_dir']}",
"",
]
@ -443,6 +484,8 @@ def run_self_test(args: argparse.Namespace) -> int:
previous = frame
result["frame"] = idx
records.append(result)
if idx == 1:
write_pgm(artifact_dir / "reference_000001.pgm", frame, args.width, args.height)
if result["suspicious"]:
suspicious += 1
write_pgm(artifact_dir / f"selftest_suspicious_{idx:06d}.pgm", frame, args.width, args.height)

View File

@ -30,6 +30,10 @@ fn rct_uvc_artifact_probe_documents_late_path_lower_half_detection() {
"ffmpeg",
"v4l2",
"x11grab",
"--change-threshold",
"--reference-every",
"static_pct",
"reference_",
"--source",
"--crop",
"PGM",
@ -73,6 +77,10 @@ fn rct_uvc_artifact_probe_self_test_flags_synthetic_lower_half_slab() {
summary["suspicious_frames"].as_u64().unwrap_or_default() >= 1,
"self-test should detect the synthetic lower-half slab: {summary}"
);
assert!(
dir.path().join("reference_000001.pgm").exists(),
"probe should save a reference frame so no-artifact runs prove the crop"
);
let records = summary["records"].as_array().expect("records array");
let corrupt = records
.iter()