probe: decouple rct capture from analysis

This commit is contained in:
Brad Stein 2026-05-17 10:09:43 -03:00
parent f56bb4a400
commit bebba543fe
6 changed files with 124 additions and 57 deletions

6
Cargo.lock generated
View File

@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]] [[package]]
name = "lesavka_client" name = "lesavka_client"
version = "0.22.52" version = "0.22.53"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-stream", "async-stream",
@ -1686,7 +1686,7 @@ dependencies = [
[[package]] [[package]]
name = "lesavka_common" name = "lesavka_common"
version = "0.22.52" version = "0.22.53"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",
@ -1698,7 +1698,7 @@ dependencies = [
[[package]] [[package]]
name = "lesavka_server" name = "lesavka_server"
version = "0.22.52" version = "0.22.53"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package] [package]
name = "lesavka_client" name = "lesavka_client"
version = "0.22.52" version = "0.22.53"
edition = "2024" edition = "2024"
[dependencies] [dependencies]

View File

@ -1,6 +1,6 @@
[package] [package]
name = "lesavka_common" name = "lesavka_common"
version = "0.22.52" version = "0.22.53"
edition = "2024" edition = "2024"
build = "build.rs" build = "build.rs"

View File

@ -57,7 +57,12 @@ def parse_args() -> argparse.Namespace:
help="start RCT capture before synthetic uplink; default starts uplink first so superseded injectors fail fast", help="start RCT capture before synthetic uplink; default starts uplink first so superseded injectors fail fast",
) )
parser.add_argument("--inject-warmup-s", type=float, default=1.25) parser.add_argument("--inject-warmup-s", type=float, default=1.25)
parser.add_argument("--capture-finish-grace-s", type=float, default=5.0) parser.add_argument(
"--capture-finish-grace-s",
type=float,
default=0.0,
help="seconds to wait for capture after injector exits; 0 waits indefinitely",
)
parser.add_argument("--jpeg-quality", type=int, default=DEFAULT_JPEG_QUALITY) parser.add_argument("--jpeg-quality", type=int, default=DEFAULT_JPEG_QUALITY)
parser.add_argument( parser.add_argument(
"--inject-max-frame-bytes", "--inject-max-frame-bytes",
@ -78,6 +83,11 @@ def parse_args() -> argparse.Namespace:
parser.add_argument("--max-reference-artifacts", type=int, default=12) parser.add_argument("--max-reference-artifacts", type=int, default=12)
parser.add_argument("--reference-every", type=int, default=900) parser.add_argument("--reference-every", type=int, default=900)
parser.add_argument("--progress-every", type=int, default=150) parser.add_argument("--progress-every", type=int, default=150)
parser.add_argument(
"--stream-analyze",
action="store_true",
help="debug path: analyze ffmpeg stdout directly instead of spooling raw frames first",
)
parser.add_argument("--capture-only", action="store_true", help=argparse.SUPPRESS) parser.add_argument("--capture-only", action="store_true", help=argparse.SUPPRESS)
parser.add_argument("--self-test", action="store_true") parser.add_argument("--self-test", action="store_true")
return parser.parse_args() return parser.parse_args()
@ -199,6 +209,8 @@ def run_remote_orchestrated(args: argparse.Namespace) -> int:
"--progress-every", "--progress-every",
str(args.progress_every), str(args.progress_every),
] ]
if args.stream_analyze:
capture_cmd.append("--stream-analyze")
inject_cmd = [ inject_cmd = [
args.inject_binary, args.inject_binary,
"--server", "--server",
@ -267,7 +279,9 @@ def run_remote_orchestrated(args: argparse.Namespace) -> int:
inject_status = inject_process.poll() inject_status = inject_process.poll()
if inject_status is not None: if inject_status is not None:
if inject_status == 0: if inject_status == 0:
deadline = time.monotonic() + max(0.0, args.capture_finish_grace_s) if args.capture_finish_grace_s <= 0:
return capture_process.wait(), inject_status
deadline = time.monotonic() + args.capture_finish_grace_s
while time.monotonic() < deadline: while time.monotonic() < deadline:
capture_status = capture_process.poll() capture_status = capture_process.poll()
if capture_status is not None: if capture_status is not None:
@ -708,10 +722,13 @@ def run_capture(args: argparse.Namespace) -> int:
artifact_dir = pathlib.Path(args.artifact_dir) if args.artifact_dir else pathlib.Path("/tmp") / f"lesavka-synthetic-rct-capture-{timestamp()}" artifact_dir = pathlib.Path(args.artifact_dir) if args.artifact_dir else pathlib.Path("/tmp") / f"lesavka-synthetic-rct-capture-{timestamp()}"
artifact_dir.mkdir(parents=True, exist_ok=True) artifact_dir.mkdir(parents=True, exist_ok=True)
frame_size = capture_width * capture_height frame_size = capture_width * capture_height
(artifact_dir / "command.txt").write_text(" ".join(shlex.quote(part) for part in command) + "\n")
stderr_path = artifact_dir / "ffmpeg.stderr" stderr_path = artifact_dir / "ffmpeg.stderr"
metrics_path = artifact_dir / "frame-metrics.jsonl" metrics_path = artifact_dir / "frame-metrics.jsonl"
started = time.monotonic() capture_started = time.monotonic()
capture_elapsed = 0.0
analysis_elapsed = 0.0
raw_capture_bytes = 0
ffmpeg_rc: int | None = None
frame_index = 0 frame_index = 0
suspicious_count = 0 suspicious_count = 0
reference_artifacts = 0 reference_artifacts = 0
@ -722,65 +739,111 @@ def run_capture(args: argparse.Namespace) -> int:
sequence_counts: collections.Counter[int] = collections.Counter() sequence_counts: collections.Counter[int] = collections.Counter()
max_total_mae = max_upper_mae = max_lower_mae = 0.0 max_total_mae = max_upper_mae = max_lower_mae = 0.0
worst: list[dict[str, Any]] = [] worst: list[dict[str, Any]] = []
with stderr_path.open("wb") as err, metrics_path.open("w") as metrics:
proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=err) def analyze_captured_frame(frame: bytes, elapsed_s: float, metrics: Any) -> None:
assert proc.stdout is not None nonlocal frame_index, suspicious_count, reference_artifacts, suspicious_artifacts
try: nonlocal previous_seq, decoded_frames, max_total_mae, max_upper_mae, max_lower_mae, worst
while time.monotonic() - started < args.duration: frame_index += 1
frame = proc.stdout.read(frame_size) result = analyze_frame(frame, capture_width, capture_height, args, previous_seq)
if len(frame) != frame_size: decoded_seq = result["decoded_sequence"]
break if decoded_seq is not None:
frame_index += 1 decoded_frames += 1
result = analyze_frame(frame, capture_width, capture_height, args, previous_seq) sequence_counts[int(decoded_seq)] += 1
decoded_seq = result["decoded_sequence"] previous_seq = int(decoded_seq)
result.update({"frame": frame_index, "elapsed_s": round(elapsed_s, 3)})
max_total_mae = max(max_total_mae, float(result["total_mae"]))
max_upper_mae = max(max_upper_mae, float(result["upper_mae"]))
max_lower_mae = max(max_lower_mae, float(result["lower_mae"]))
if result["suspicious"]:
suspicious_count += 1
reason_counts.update(result["reasons"])
worst.append(result)
worst = sorted(worst, key=lambda item: (item["lower_mae"], item["total_mae"]), reverse=True)[:30]
if suspicious_artifacts < args.max_suspicious_artifacts:
seq_label = "unknown" if decoded_seq is None else f"seq{decoded_seq:08d}"
write_pgm(artifact_dir / f"suspicious_{frame_index:06d}_{seq_label}.pgm", frame, capture_width, capture_height)
if decoded_seq is not None: if decoded_seq is not None:
decoded_frames += 1 write_pgm(
sequence_counts[int(decoded_seq)] += 1 artifact_dir / f"expected_{frame_index:06d}_{seq_label}.pgm",
previous_seq = int(decoded_seq) synthetic_gray(capture_width, capture_height, int(decoded_seq)),
result.update({"frame": frame_index, "elapsed_s": round(time.monotonic() - started, 3)}) capture_width,
max_total_mae = max(max_total_mae, float(result["total_mae"])) capture_height,
max_upper_mae = max(max_upper_mae, float(result["upper_mae"])) )
max_lower_mae = max(max_lower_mae, float(result["lower_mae"])) suspicious_artifacts += 1
if result["suspicious"]: should_reference = frame_index == 1 or (args.reference_every > 0 and frame_index % args.reference_every == 0)
suspicious_count += 1 if should_reference and reference_artifacts < args.max_reference_artifacts:
reason_counts.update(result["reasons"]) write_pgm(artifact_dir / f"reference_{frame_index:06d}.pgm", frame, capture_width, capture_height)
worst.append(result) reference_artifacts += 1
worst = sorted(worst, key=lambda item: (item["lower_mae"], item["total_mae"]), reverse=True)[:30] metrics.write(json.dumps(result, sort_keys=True) + "\n")
if suspicious_artifacts < args.max_suspicious_artifacts: if frame_index % args.progress_every == 0:
seq_label = "unknown" if decoded_seq is None else f"seq{decoded_seq:08d}" print(f"frames={frame_index} suspicious={suspicious_count} latest={result}", file=sys.stderr)
write_pgm(artifact_dir / f"suspicious_{frame_index:06d}_{seq_label}.pgm", frame, capture_width, capture_height)
if decoded_seq is not None: with stderr_path.open("wb") as err, metrics_path.open("w") as metrics:
write_pgm( if args.stream_analyze:
artifact_dir / f"expected_{frame_index:06d}_{seq_label}.pgm", (artifact_dir / "command.txt").write_text(" ".join(shlex.quote(part) for part in command) + "\n")
synthetic_gray(capture_width, capture_height, int(decoded_seq)), proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=err)
capture_width, assert proc.stdout is not None
capture_height, capture_started = time.monotonic()
)
suspicious_artifacts += 1
should_reference = frame_index == 1 or (args.reference_every > 0 and frame_index % args.reference_every == 0)
if should_reference and reference_artifacts < args.max_reference_artifacts:
write_pgm(artifact_dir / f"reference_{frame_index:06d}.pgm", frame, capture_width, capture_height)
reference_artifacts += 1
metrics.write(json.dumps(result, sort_keys=True) + "\n")
if frame_index % args.progress_every == 0:
print(f"frames={frame_index} suspicious={suspicious_count} latest={result}", file=sys.stderr)
finally:
proc.terminate()
try: try:
proc.wait(timeout=3) while time.monotonic() - capture_started < args.duration:
except subprocess.TimeoutExpired: frame = proc.stdout.read(frame_size)
proc.kill() if len(frame) != frame_size:
elapsed = max(0.001, time.monotonic() - started) break
analyze_captured_frame(frame, time.monotonic() - capture_started, metrics)
finally:
proc.terminate()
try:
ffmpeg_rc = proc.wait(timeout=3)
except subprocess.TimeoutExpired:
proc.kill()
ffmpeg_rc = proc.wait()
capture_elapsed = time.monotonic() - capture_started
analysis_elapsed = capture_elapsed
else:
raw_path = artifact_dir / "capture.raw"
capture_command = command[:]
if "-an" in capture_command:
capture_command[capture_command.index("-an") : capture_command.index("-an")] = ["-t", str(args.duration)]
else:
capture_command[-1:-1] = ["-t", str(args.duration)]
capture_command[-1] = str(raw_path)
(artifact_dir / "command.txt").write_text(" ".join(shlex.quote(part) for part in capture_command) + "\n")
print(f"capturing raw RCT frames before analysis: {raw_path}", file=sys.stderr)
capture_started = time.monotonic()
proc = subprocess.run(capture_command, stdout=subprocess.DEVNULL, stderr=err, check=False)
capture_elapsed = time.monotonic() - capture_started
ffmpeg_rc = proc.returncode
raw_capture_bytes = raw_path.stat().st_size if raw_path.exists() else 0
print(
f"analyzing captured raw RCT frames bytes={raw_capture_bytes} capture_s={capture_elapsed:.3f}",
file=sys.stderr,
)
analysis_started = time.monotonic()
try:
with raw_path.open("rb") as raw:
while True:
frame = raw.read(frame_size)
if len(frame) != frame_size:
break
analyze_captured_frame(frame, frame_index / max(1, fps), metrics)
finally:
raw_path.unlink(missing_ok=True)
analysis_elapsed = time.monotonic() - analysis_started
elapsed = max(0.001, capture_elapsed)
summary = { summary = {
"schema": "lesavka.synthetic-rct-capture.v1", "schema": "lesavka.synthetic-rct-capture.v1",
"source": args.source, "source": args.source,
"device": device, "device": device,
"mode": args.mode, "mode": args.mode,
"capture_mode": "stream" if args.stream_analyze else "rawfile",
"width": capture_width, "width": capture_width,
"height": capture_height, "height": capture_height,
"fps_requested": fps, "fps_requested": fps,
"duration_requested_s": args.duration, "duration_requested_s": args.duration,
"duration_observed_s": round(elapsed, 3), "duration_observed_s": round(elapsed, 3),
"analysis_duration_s": round(analysis_elapsed, 3),
"ffmpeg_rc": ffmpeg_rc,
"raw_capture_bytes": raw_capture_bytes,
"frames": frame_index, "frames": frame_index,
"fps_observed": round(frame_index / elapsed, 3), "fps_observed": round(frame_index / elapsed, 3),
"decoded_frames": decoded_frames, "decoded_frames": decoded_frames,

View File

@ -16,7 +16,7 @@ bench = false
[package] [package]
name = "lesavka_server" name = "lesavka_server"
version = "0.22.52" version = "0.22.53"
edition = "2024" edition = "2024"
autobins = false autobins = false

View File

@ -43,6 +43,7 @@ fn synthetic_probe_keeps_bundled_network_ingress_and_rct_comparison_markers() {
"--capture-finish-grace-s", "--capture-finish-grace-s",
"--jpeg-quality", "--jpeg-quality",
"--inject-max-frame-bytes", "--inject-max-frame-bytes",
"--stream-analyze",
"--source", "--source",
"--mode", "--mode",
"1280x720@20,1280x720@30,1920x1080@20,1920x1080@30", "1280x720@20,1280x720@30,1920x1080@20,1920x1080@30",
@ -56,6 +57,9 @@ fn synthetic_probe_keeps_bundled_network_ingress_and_rct_comparison_markers() {
"expected_", "expected_",
"suspicious_", "suspicious_",
"decoded_pct", "decoded_pct",
"capture_mode",
"raw_capture_bytes",
"analysis_duration_s",
"diagnosis", "diagnosis",
"encoded_oversize_frames", "encoded_oversize_frames",
"sent_frames", "sent_frames",