probe: classify mixed synthetic rct frames

This commit is contained in:
Brad Stein 2026-05-17 16:19:04 -03:00
parent 3ca0efc641
commit 5667608707
2 changed files with 244 additions and 15 deletions

View File

@ -24,6 +24,7 @@ DEFAULT_UVC_MAX_PACKET = 1024
MARKER_BITS = 32
MARKER_COLUMNS = 16
CADENCE_REASONS = {"frame_repeat", "frame_gap", "frame_backwards"}
NON_VISUAL_REASONS = CADENCE_REASONS | {"sequence_marker_mismatch"}
def parse_args() -> argparse.Namespace:
@ -80,6 +81,25 @@ def parse_args() -> argparse.Namespace:
parser.add_argument("--slab-var", type=float, default=20.0)
parser.add_argument("--shift-threshold", type=float, default=16.0)
parser.add_argument("--shift-improvement", type=float, default=1.25)
parser.add_argument(
"--sequence-window",
type=int,
default=3,
help="adjacent synthetic source-frame window to test when classifying mixed/teared frames",
)
parser.add_argument(
"--mix-mae-threshold",
type=float,
default=1.5,
help="minimum decoded-frame band MAE before an adjacent-frame improvement can count as a mixed-frame tear",
)
parser.add_argument(
"--mix-improvement",
type=float,
default=1.8,
help="required decoded-frame/best-adjacent MAE ratio for mixed-frame band classification",
)
parser.add_argument("--mix-min-bands", type=int, default=2)
parser.add_argument("--max-suspicious-artifacts", type=int, default=80)
parser.add_argument("--max-reference-artifacts", type=int, default=12)
parser.add_argument("--reference-every", type=int, default=900)
@ -201,6 +221,14 @@ def run_remote_orchestrated(args: argparse.Namespace) -> int:
str(args.shift_threshold),
"--shift-improvement",
str(args.shift_improvement),
"--sequence-window",
str(args.sequence_window),
"--mix-mae-threshold",
str(args.mix_mae_threshold),
"--mix-improvement",
str(args.mix_improvement),
"--mix-min-bands",
str(args.mix_min_bands),
"--max-suspicious-artifacts",
str(args.max_suspicious_artifacts),
"--max-reference-artifacts",
@ -667,6 +695,124 @@ def best_expected_shift(frame: bytes, width: int, height: int, sequence: int, ar
return best_shift, zero, best, improvement
def candidate_sequences(sequence: int | None, previous_seq: int | None, args: argparse.Namespace) -> list[int]:
candidates: set[int] = set()
window = max(1, int(args.sequence_window))
if sequence is not None:
candidates.update(range(max(0, sequence - window), sequence + window + 1))
if previous_seq is not None:
candidates.update(range(max(0, previous_seq - 1), previous_seq + window + 2))
return sorted(candidates)
def best_sequence_delta(
frame: bytes,
width: int,
height: int,
candidates: list[int],
y0: int,
y1: int,
args: argparse.Namespace,
) -> tuple[int | None, float]:
best_seq: int | None = None
best_mae = float("inf")
for candidate in candidates:
mae = sampled_abs_delta_expected(frame, width, height, candidate, y0, y1, args.x_step, args.y_step)
if mae < best_mae:
best_mae = mae
best_seq = candidate
return best_seq, 0.0 if best_seq is None else best_mae
def band_sequence_profile(
frame: bytes,
width: int,
height: int,
sequence: int | None,
previous_seq: int | None,
args: argparse.Namespace,
) -> dict[str, Any]:
candidates = candidate_sequences(sequence, previous_seq, args)
if not candidates:
return {
"best_frame_sequence": None,
"best_frame_mae": 0.0,
"mixed_band_count": 0,
"mixed_band_run_pct": 0.0,
"band_sequence_counts": {},
"upper_dominant_sequence": None,
"lower_dominant_sequence": None,
"sequence_boundary_count": 0,
"sequence_marker_mismatch": False,
"reasons": [],
}
best_frame_sequence, best_frame_mae = best_sequence_delta(frame, width, height, candidates, 0, height, args)
band_count = max(8, args.bands)
band_h = max(1, height // band_count)
band_best_sequences: list[int | None] = []
mixed_flags: list[bool] = []
for band in range(band_count):
y0 = band * band_h
y1 = height if band == band_count - 1 else min(height, y0 + band_h)
best_seq, best_mae = best_sequence_delta(frame, width, height, candidates, y0, y1, args)
decoded_mae = (
sampled_abs_delta_expected(frame, width, height, sequence, y0, y1, args.x_step, args.y_step)
if sequence is not None
else float("inf")
)
improvement = decoded_mae / max(best_mae, 0.001)
is_mixed = (
sequence is not None
and best_seq is not None
and best_seq != sequence
and decoded_mae >= args.mix_mae_threshold
and improvement >= args.mix_improvement
)
band_best_sequences.append(best_seq)
mixed_flags.append(is_mixed)
counts = collections.Counter(seq for seq in band_best_sequences if seq is not None)
upper_counts = collections.Counter(seq for seq in band_best_sequences[: band_count // 2] if seq is not None)
lower_counts = collections.Counter(seq for seq in band_best_sequences[band_count // 2 :] if seq is not None)
upper_dominant = upper_counts.most_common(1)[0][0] if upper_counts else None
lower_dominant = lower_counts.most_common(1)[0][0] if lower_counts else None
mixed_band_count = sum(1 for flag in mixed_flags if flag)
mixed_run_pct = max_run(mixed_flags) / max(1, band_count)
sequence_boundary_count = sum(
1
for idx in range(1, len(band_best_sequences))
if band_best_sequences[idx] is not None
and band_best_sequences[idx - 1] is not None
and band_best_sequences[idx] != band_best_sequences[idx - 1]
)
reasons: list[str] = []
all_or_nearly_all_foreign = mixed_band_count >= max(1, int(band_count * 0.85))
if sequence is not None and best_frame_sequence is not None and best_frame_sequence != sequence and all_or_nearly_all_foreign:
reasons.append("sequence_marker_mismatch")
elif mixed_band_count >= max(1, args.mix_min_bands):
reasons.append("mixed_sequence_bands")
if lower_dominant is not None and upper_dominant == sequence and lower_dominant != sequence:
reasons.append("lower_half_frame_mix")
if upper_dominant is not None and lower_dominant == sequence and upper_dominant != sequence:
reasons.append("upper_half_frame_mix")
if sequence_boundary_count > 0:
reasons.append("sequence_boundary")
return {
"best_frame_sequence": best_frame_sequence,
"best_frame_mae": best_frame_mae,
"mixed_band_count": mixed_band_count,
"mixed_band_run_pct": mixed_run_pct,
"band_sequence_counts": dict(counts.most_common(6)),
"upper_dominant_sequence": upper_dominant,
"lower_dominant_sequence": lower_dominant,
"sequence_boundary_count": sequence_boundary_count,
"sequence_marker_mismatch": "sequence_marker_mismatch" in reasons,
"reasons": reasons,
}
def max_run(flags: list[bool]) -> int:
best = 0
current = 0
@ -684,14 +830,25 @@ def analyze_frame(
previous_seq: int | None,
) -> dict[str, Any]:
sequence, uncertain_bits = decode_sequence(frame, width, height)
max_plausible_step = max(120, args.sequence_window * 16)
marker_sequence_implausible = (
sequence is not None
and previous_seq is not None
and abs(sequence - previous_seq) > max_plausible_step
)
comparison_sequence = sequence
if marker_sequence_implausible:
comparison_sequence = previous_seq + 1 if previous_seq is not None else None
elif comparison_sequence is None and previous_seq is not None:
comparison_sequence = previous_seq + 1
upper_mae = lower_mae = total_mae = 0.0
shift_pixels = 0
shift_zero_delta = shift_best_delta = shift_improvement = 0.0
if sequence is not None:
upper_mae = sampled_abs_delta_expected(frame, width, height, sequence, 0, height // 2, args.x_step, args.y_step)
lower_mae = sampled_abs_delta_expected(frame, width, height, sequence, height // 2, height, args.x_step, args.y_step)
total_mae = sampled_abs_delta_expected(frame, width, height, sequence, 0, height, args.x_step, args.y_step)
shift_pixels, shift_zero_delta, shift_best_delta, shift_improvement = best_expected_shift(frame, width, height, sequence, args)
if comparison_sequence is not None:
upper_mae = sampled_abs_delta_expected(frame, width, height, comparison_sequence, 0, height // 2, args.x_step, args.y_step)
lower_mae = sampled_abs_delta_expected(frame, width, height, comparison_sequence, height // 2, height, args.x_step, args.y_step)
total_mae = sampled_abs_delta_expected(frame, width, height, comparison_sequence, 0, height, args.x_step, args.y_step)
shift_pixels, shift_zero_delta, shift_best_delta, shift_improvement = best_expected_shift(frame, width, height, comparison_sequence, args)
band_count = max(8, args.bands)
band_h = max(1, height // band_count)
@ -708,10 +865,13 @@ def analyze_frame(
low_var_run = max_run(lower_flags) / max(1, len(lower_flags))
mean_jumps = [abs(means[idx] - means[idx - 1]) for idx in range(1, band_count)]
max_lower_jump = max(mean_jumps[lower:] or [0.0])
sequence_profile = band_sequence_profile(frame, width, height, comparison_sequence, previous_seq, args)
reasons: list[str] = []
if sequence is None:
reasons.append("marker_decode_failed")
elif marker_sequence_implausible:
reasons.append("marker_sequence_implausible")
elif previous_seq is not None:
if sequence == previous_seq:
reasons.append("frame_repeat")
@ -728,7 +888,8 @@ def analyze_frame(
reasons.append("black_or_gray_slab")
if shift_pixels and shift_zero_delta > args.shift_threshold and shift_improvement > args.shift_improvement:
reasons.append("horizontal_shift")
visual_reasons = [reason for reason in reasons if reason not in CADENCE_REASONS]
reasons.extend(sequence_profile["reasons"])
visual_reasons = [reason for reason in reasons if reason not in NON_VISUAL_REASONS]
cadence_reasons = [reason for reason in reasons if reason in CADENCE_REASONS]
return {
"suspicious": bool(reasons),
@ -737,6 +898,8 @@ def analyze_frame(
"visual_reasons": visual_reasons,
"cadence_reasons": cadence_reasons,
"decoded_sequence": sequence,
"comparison_sequence": comparison_sequence,
"marker_sequence_implausible": marker_sequence_implausible,
"marker_uncertain_bits": uncertain_bits,
"upper_mae": round(upper_mae, 3),
"lower_mae": round(lower_mae, 3),
@ -747,6 +910,15 @@ def analyze_frame(
"shift_zero_delta": round(shift_zero_delta, 3),
"shift_best_delta": round(shift_best_delta, 3),
"shift_improvement": round(shift_improvement, 3),
"best_frame_sequence": sequence_profile["best_frame_sequence"],
"best_frame_mae": round(float(sequence_profile["best_frame_mae"]), 3),
"mixed_band_count": sequence_profile["mixed_band_count"],
"mixed_band_run_pct": round(float(sequence_profile["mixed_band_run_pct"]), 3),
"band_sequence_counts": sequence_profile["band_sequence_counts"],
"upper_dominant_sequence": sequence_profile["upper_dominant_sequence"],
"lower_dominant_sequence": sequence_profile["lower_dominant_sequence"],
"sequence_boundary_count": sequence_profile["sequence_boundary_count"],
"sequence_marker_mismatch": sequence_profile["sequence_marker_mismatch"],
}
@ -778,23 +950,32 @@ def run_capture(args: argparse.Namespace) -> int:
visual_reason_counts: collections.Counter[str] = collections.Counter()
cadence_reason_counts: collections.Counter[str] = collections.Counter()
sequence_counts: collections.Counter[int] = collections.Counter()
comparison_sequence_counts: collections.Counter[int] = collections.Counter()
max_total_mae = max_upper_mae = max_lower_mae = 0.0
max_mixed_band_count = 0
max_sequence_boundary_count = 0
worst: list[dict[str, Any]] = []
def analyze_captured_frame(frame: bytes, elapsed_s: float, metrics: Any) -> None:
nonlocal frame_index, suspicious_count, visual_suspicious_count, reference_artifacts, suspicious_artifacts
nonlocal previous_seq, decoded_frames, max_total_mae, max_upper_mae, max_lower_mae, worst
nonlocal max_mixed_band_count, max_sequence_boundary_count
frame_index += 1
result = analyze_frame(frame, capture_width, capture_height, args, previous_seq)
decoded_seq = result["decoded_sequence"]
comparison_seq = result["comparison_sequence"]
if decoded_seq is not None:
decoded_frames += 1
sequence_counts[int(decoded_seq)] += 1
previous_seq = int(decoded_seq)
if comparison_seq is not None:
comparison_sequence_counts[int(comparison_seq)] += 1
previous_seq = int(comparison_seq)
result.update({"frame": frame_index, "elapsed_s": round(elapsed_s, 3)})
max_total_mae = max(max_total_mae, float(result["total_mae"]))
max_upper_mae = max(max_upper_mae, float(result["upper_mae"]))
max_lower_mae = max(max_lower_mae, float(result["lower_mae"]))
max_mixed_band_count = max(max_mixed_band_count, int(result["mixed_band_count"]))
max_sequence_boundary_count = max(max_sequence_boundary_count, int(result["sequence_boundary_count"]))
if result["suspicious"]:
suspicious_count += 1
reason_counts.update(result["reasons"])
@ -805,12 +986,20 @@ def run_capture(args: argparse.Namespace) -> int:
if result["visual_suspicious"]:
visual_suspicious_count += 1
if result["visual_suspicious"] and suspicious_artifacts < args.max_suspicious_artifacts:
seq_label = "unknown" if decoded_seq is None else f"seq{decoded_seq:08d}"
seq_label = "unknown" if comparison_seq is None else f"seq{int(comparison_seq):08d}"
write_pgm(artifact_dir / f"suspicious_{frame_index:06d}_{seq_label}.pgm", frame, capture_width, capture_height)
if decoded_seq is not None:
if comparison_seq is not None:
write_pgm(
artifact_dir / f"expected_{frame_index:06d}_{seq_label}.pgm",
synthetic_gray(capture_width, capture_height, int(decoded_seq)),
synthetic_gray(capture_width, capture_height, int(comparison_seq)),
capture_width,
capture_height,
)
best_seq = result.get("best_frame_sequence")
if best_seq is not None and best_seq != comparison_seq:
write_pgm(
artifact_dir / f"expected_best_{frame_index:06d}_seq{int(best_seq):08d}.pgm",
synthetic_gray(capture_width, capture_height, int(best_seq)),
capture_width,
capture_height,
)
@ -901,9 +1090,12 @@ def run_capture(args: argparse.Namespace) -> int:
"visual_reason_counts": dict(visual_reason_counts),
"cadence_reason_counts": dict(cadence_reason_counts),
"decoded_sequence_counts": dict(sequence_counts.most_common(12)),
"comparison_sequence_counts": dict(comparison_sequence_counts.most_common(12)),
"max_total_mae": round(max_total_mae, 3),
"max_upper_mae": round(max_upper_mae, 3),
"max_lower_mae": round(max_lower_mae, 3),
"max_mixed_band_count": max_mixed_band_count,
"max_sequence_boundary_count": max_sequence_boundary_count,
"worst_frames": worst,
"reference_artifacts": reference_artifacts,
"suspicious_artifacts": suspicious_artifacts,
@ -932,6 +1124,8 @@ def format_summary(summary: dict[str, Any]) -> str:
f"visual reasons: {summary['visual_reason_counts']}",
f"cadence reasons: {summary['cadence_reason_counts']}",
f"max mae: total={summary['max_total_mae']} upper={summary['max_upper_mae']} lower={summary['max_lower_mae']}",
f"max mixed bands: {summary['max_mixed_band_count']} boundary_changes={summary['max_sequence_boundary_count']}",
f"comparison sequence counts: {summary['comparison_sequence_counts']}",
f"artifacts: {summary['artifact_dir']}",
"",
]
@ -953,13 +1147,18 @@ def run_self_test(args: argparse.Namespace) -> int:
src = min(width - 1, x + 24)
shifted[row + x] = expected[row + src]
frames.append(bytes(shifted))
mixed = bytearray(synthetic_gray(width, height, 8))
lower_next = synthetic_gray(width, height, 9)
split_y = height // 2
mixed[split_y * width :] = lower_next[split_y * width :]
frames.append(bytes(mixed))
previous_seq: int | None = None
records: list[dict[str, Any]] = []
suspicious = 0
for idx, frame in enumerate(frames):
result = analyze_frame(frame, width, height, args, previous_seq)
if result["decoded_sequence"] is not None:
previous_seq = int(result["decoded_sequence"])
if result["comparison_sequence"] is not None:
previous_seq = int(result["comparison_sequence"])
result["frame"] = idx
records.append(result)
suspicious += int(bool(result["suspicious"]))
@ -975,7 +1174,7 @@ def run_self_test(args: argparse.Namespace) -> int:
}
(artifact_dir / "summary.json").write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n")
print(json.dumps(summary, indent=2, sort_keys=True))
return 0 if suspicious >= 2 else 1
return 0 if suspicious >= 3 else 1
def main() -> int:

View File

@ -44,6 +44,10 @@ fn synthetic_probe_keeps_bundled_network_ingress_and_rct_comparison_markers() {
"--jpeg-quality",
"--inject-max-frame-bytes",
"--stream-analyze",
"--sequence-window",
"--mix-mae-threshold",
"--mix-improvement",
"--mix-min-bands",
"--source",
"--mode",
"1280x720@20,1280x720@30,1920x1080@20,1920x1080@30",
@ -52,9 +56,16 @@ fn synthetic_probe_keeps_bundled_network_ingress_and_rct_comparison_markers() {
"frame_gap",
"frame_backwards",
"lower_half_tear",
"mixed_sequence_bands",
"lower_half_frame_mix",
"upper_half_frame_mix",
"sequence_boundary",
"sequence_marker_mismatch",
"marker_sequence_implausible",
"black_or_gray_slab",
"horizontal_shift",
"expected_",
"expected_best_",
"suspicious_",
"decoded_pct",
"capture_mode",
@ -63,6 +74,17 @@ fn synthetic_probe_keeps_bundled_network_ingress_and_rct_comparison_markers() {
"visual_suspicious",
"visual_reason_counts",
"cadence_reason_counts",
"best_frame_sequence",
"comparison_sequence",
"comparison_sequence_counts",
"best_frame_mae",
"mixed_band_count",
"mixed_band_run_pct",
"band_sequence_counts",
"upper_dominant_sequence",
"lower_dominant_sequence",
"max_mixed_band_count",
"max_sequence_boundary_count",
"diagnosis",
"encoded_oversize_frames",
"sent_frames",
@ -146,8 +168,8 @@ fn synthetic_probe_self_test_detects_slab_and_shift_categories() {
"lesavka.synthetic-rct-probe.self-test.v1"
);
assert!(
summary["suspicious_frames"].as_u64().unwrap_or_default() >= 2,
"self-test should detect at least the slab and shifted synthetic frames: {summary}"
summary["suspicious_frames"].as_u64().unwrap_or_default() >= 3,
"self-test should detect at least the slab, shifted, and mixed synthetic frames: {summary}"
);
let records = summary["records"].as_array().expect("records array");
assert!(
@ -162,4 +184,12 @@ fn synthetic_probe_self_test_detects_slab_and_shift_categories() {
.is_some_and(|reasons| reasons.iter().any(|reason| reason == "horizontal_shift"))),
"self-test should include a horizontal shift category: {summary}"
);
assert!(
records.iter().any(
|record| record["reasons"].as_array().is_some_and(|reasons| reasons
.iter()
.any(|reason| reason == "mixed_sequence_bands"))
),
"self-test should include a mixed-frame category: {summary}"
);
}