diff --git a/Cargo.lock b/Cargo.lock index 9549099..01b30ac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.22.52" +version = "0.22.53" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.22.52" +version = "0.22.53" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.22.52" +version = "0.22.53" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index e1f4e7b..604f141 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.22.52" +version = "0.22.53" edition = "2024" [dependencies] diff --git a/common/Cargo.toml b/common/Cargo.toml index 7e23786..1f4ffd8 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.22.52" +version = "0.22.53" edition = "2024" build = "build.rs" diff --git a/scripts/manual/run_synthetic_rct_uvc_probe.py b/scripts/manual/run_synthetic_rct_uvc_probe.py index 59149cf..2b3674b 100755 --- a/scripts/manual/run_synthetic_rct_uvc_probe.py +++ b/scripts/manual/run_synthetic_rct_uvc_probe.py @@ -57,7 +57,12 @@ def parse_args() -> argparse.Namespace: help="start RCT capture before synthetic uplink; default starts uplink first so superseded injectors fail fast", ) parser.add_argument("--inject-warmup-s", type=float, default=1.25) - parser.add_argument("--capture-finish-grace-s", type=float, default=5.0) + parser.add_argument( + "--capture-finish-grace-s", + type=float, + default=0.0, + help="seconds to wait for capture after injector exits; 0 waits indefinitely", + ) parser.add_argument("--jpeg-quality", type=int, default=DEFAULT_JPEG_QUALITY) parser.add_argument( "--inject-max-frame-bytes", @@ -78,6 +83,11 @@ def parse_args() -> argparse.Namespace: parser.add_argument("--max-reference-artifacts", type=int, default=12) parser.add_argument("--reference-every", type=int, default=900) parser.add_argument("--progress-every", type=int, default=150) + parser.add_argument( + "--stream-analyze", + action="store_true", + help="debug path: analyze ffmpeg stdout directly instead of spooling raw frames first", + ) parser.add_argument("--capture-only", action="store_true", help=argparse.SUPPRESS) parser.add_argument("--self-test", action="store_true") return parser.parse_args() @@ -199,6 +209,8 @@ def run_remote_orchestrated(args: argparse.Namespace) -> int: "--progress-every", str(args.progress_every), ] + if args.stream_analyze: + capture_cmd.append("--stream-analyze") inject_cmd = [ args.inject_binary, "--server", @@ -267,7 +279,9 @@ def run_remote_orchestrated(args: argparse.Namespace) -> int: inject_status = inject_process.poll() if inject_status is not None: if inject_status == 0: - deadline = time.monotonic() + max(0.0, args.capture_finish_grace_s) + if args.capture_finish_grace_s <= 0: + return capture_process.wait(), inject_status + deadline = time.monotonic() + args.capture_finish_grace_s while time.monotonic() < deadline: capture_status = capture_process.poll() if capture_status is not None: @@ -708,10 +722,13 @@ def run_capture(args: argparse.Namespace) -> int: artifact_dir = pathlib.Path(args.artifact_dir) if args.artifact_dir else pathlib.Path("/tmp") / f"lesavka-synthetic-rct-capture-{timestamp()}" artifact_dir.mkdir(parents=True, exist_ok=True) frame_size = capture_width * capture_height - (artifact_dir / "command.txt").write_text(" ".join(shlex.quote(part) for part in command) + "\n") stderr_path = artifact_dir / "ffmpeg.stderr" metrics_path = artifact_dir / "frame-metrics.jsonl" - started = time.monotonic() + capture_started = time.monotonic() + capture_elapsed = 0.0 + analysis_elapsed = 0.0 + raw_capture_bytes = 0 + ffmpeg_rc: int | None = None frame_index = 0 suspicious_count = 0 reference_artifacts = 0 @@ -722,65 +739,111 @@ def run_capture(args: argparse.Namespace) -> int: sequence_counts: collections.Counter[int] = collections.Counter() max_total_mae = max_upper_mae = max_lower_mae = 0.0 worst: list[dict[str, Any]] = [] - with stderr_path.open("wb") as err, metrics_path.open("w") as metrics: - proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=err) - assert proc.stdout is not None - try: - while time.monotonic() - started < args.duration: - frame = proc.stdout.read(frame_size) - if len(frame) != frame_size: - break - frame_index += 1 - result = analyze_frame(frame, capture_width, capture_height, args, previous_seq) - decoded_seq = result["decoded_sequence"] + + def analyze_captured_frame(frame: bytes, elapsed_s: float, metrics: Any) -> None: + nonlocal frame_index, suspicious_count, reference_artifacts, suspicious_artifacts + nonlocal previous_seq, decoded_frames, max_total_mae, max_upper_mae, max_lower_mae, worst + frame_index += 1 + result = analyze_frame(frame, capture_width, capture_height, args, previous_seq) + decoded_seq = result["decoded_sequence"] + if decoded_seq is not None: + decoded_frames += 1 + sequence_counts[int(decoded_seq)] += 1 + previous_seq = int(decoded_seq) + result.update({"frame": frame_index, "elapsed_s": round(elapsed_s, 3)}) + max_total_mae = max(max_total_mae, float(result["total_mae"])) + max_upper_mae = max(max_upper_mae, float(result["upper_mae"])) + max_lower_mae = max(max_lower_mae, float(result["lower_mae"])) + if result["suspicious"]: + suspicious_count += 1 + reason_counts.update(result["reasons"]) + worst.append(result) + worst = sorted(worst, key=lambda item: (item["lower_mae"], item["total_mae"]), reverse=True)[:30] + if suspicious_artifacts < args.max_suspicious_artifacts: + seq_label = "unknown" if decoded_seq is None else f"seq{decoded_seq:08d}" + write_pgm(artifact_dir / f"suspicious_{frame_index:06d}_{seq_label}.pgm", frame, capture_width, capture_height) if decoded_seq is not None: - decoded_frames += 1 - sequence_counts[int(decoded_seq)] += 1 - previous_seq = int(decoded_seq) - result.update({"frame": frame_index, "elapsed_s": round(time.monotonic() - started, 3)}) - max_total_mae = max(max_total_mae, float(result["total_mae"])) - max_upper_mae = max(max_upper_mae, float(result["upper_mae"])) - max_lower_mae = max(max_lower_mae, float(result["lower_mae"])) - if result["suspicious"]: - suspicious_count += 1 - reason_counts.update(result["reasons"]) - worst.append(result) - worst = sorted(worst, key=lambda item: (item["lower_mae"], item["total_mae"]), reverse=True)[:30] - if suspicious_artifacts < args.max_suspicious_artifacts: - seq_label = "unknown" if decoded_seq is None else f"seq{decoded_seq:08d}" - write_pgm(artifact_dir / f"suspicious_{frame_index:06d}_{seq_label}.pgm", frame, capture_width, capture_height) - if decoded_seq is not None: - write_pgm( - artifact_dir / f"expected_{frame_index:06d}_{seq_label}.pgm", - synthetic_gray(capture_width, capture_height, int(decoded_seq)), - capture_width, - capture_height, - ) - suspicious_artifacts += 1 - should_reference = frame_index == 1 or (args.reference_every > 0 and frame_index % args.reference_every == 0) - if should_reference and reference_artifacts < args.max_reference_artifacts: - write_pgm(artifact_dir / f"reference_{frame_index:06d}.pgm", frame, capture_width, capture_height) - reference_artifacts += 1 - metrics.write(json.dumps(result, sort_keys=True) + "\n") - if frame_index % args.progress_every == 0: - print(f"frames={frame_index} suspicious={suspicious_count} latest={result}", file=sys.stderr) - finally: - proc.terminate() + write_pgm( + artifact_dir / f"expected_{frame_index:06d}_{seq_label}.pgm", + synthetic_gray(capture_width, capture_height, int(decoded_seq)), + capture_width, + capture_height, + ) + suspicious_artifacts += 1 + should_reference = frame_index == 1 or (args.reference_every > 0 and frame_index % args.reference_every == 0) + if should_reference and reference_artifacts < args.max_reference_artifacts: + write_pgm(artifact_dir / f"reference_{frame_index:06d}.pgm", frame, capture_width, capture_height) + reference_artifacts += 1 + metrics.write(json.dumps(result, sort_keys=True) + "\n") + if frame_index % args.progress_every == 0: + print(f"frames={frame_index} suspicious={suspicious_count} latest={result}", file=sys.stderr) + + with stderr_path.open("wb") as err, metrics_path.open("w") as metrics: + if args.stream_analyze: + (artifact_dir / "command.txt").write_text(" ".join(shlex.quote(part) for part in command) + "\n") + proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=err) + assert proc.stdout is not None + capture_started = time.monotonic() try: - proc.wait(timeout=3) - except subprocess.TimeoutExpired: - proc.kill() - elapsed = max(0.001, time.monotonic() - started) + while time.monotonic() - capture_started < args.duration: + frame = proc.stdout.read(frame_size) + if len(frame) != frame_size: + break + analyze_captured_frame(frame, time.monotonic() - capture_started, metrics) + finally: + proc.terminate() + try: + ffmpeg_rc = proc.wait(timeout=3) + except subprocess.TimeoutExpired: + proc.kill() + ffmpeg_rc = proc.wait() + capture_elapsed = time.monotonic() - capture_started + analysis_elapsed = capture_elapsed + else: + raw_path = artifact_dir / "capture.raw" + capture_command = command[:] + if "-an" in capture_command: + capture_command[capture_command.index("-an") : capture_command.index("-an")] = ["-t", str(args.duration)] + else: + capture_command[-1:-1] = ["-t", str(args.duration)] + capture_command[-1] = str(raw_path) + (artifact_dir / "command.txt").write_text(" ".join(shlex.quote(part) for part in capture_command) + "\n") + print(f"capturing raw RCT frames before analysis: {raw_path}", file=sys.stderr) + capture_started = time.monotonic() + proc = subprocess.run(capture_command, stdout=subprocess.DEVNULL, stderr=err, check=False) + capture_elapsed = time.monotonic() - capture_started + ffmpeg_rc = proc.returncode + raw_capture_bytes = raw_path.stat().st_size if raw_path.exists() else 0 + print( + f"analyzing captured raw RCT frames bytes={raw_capture_bytes} capture_s={capture_elapsed:.3f}", + file=sys.stderr, + ) + analysis_started = time.monotonic() + try: + with raw_path.open("rb") as raw: + while True: + frame = raw.read(frame_size) + if len(frame) != frame_size: + break + analyze_captured_frame(frame, frame_index / max(1, fps), metrics) + finally: + raw_path.unlink(missing_ok=True) + analysis_elapsed = time.monotonic() - analysis_started + elapsed = max(0.001, capture_elapsed) summary = { "schema": "lesavka.synthetic-rct-capture.v1", "source": args.source, "device": device, "mode": args.mode, + "capture_mode": "stream" if args.stream_analyze else "rawfile", "width": capture_width, "height": capture_height, "fps_requested": fps, "duration_requested_s": args.duration, "duration_observed_s": round(elapsed, 3), + "analysis_duration_s": round(analysis_elapsed, 3), + "ffmpeg_rc": ffmpeg_rc, + "raw_capture_bytes": raw_capture_bytes, "frames": frame_index, "fps_observed": round(frame_index / elapsed, 3), "decoded_frames": decoded_frames, diff --git a/server/Cargo.toml b/server/Cargo.toml index 5e25276..7b851e7 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -16,7 +16,7 @@ bench = false [package] name = "lesavka_server" -version = "0.22.52" +version = "0.22.53" edition = "2024" autobins = false diff --git a/tests/manual/server/rct/synthetic_rct_uvc_probe_manual_contract.rs b/tests/manual/server/rct/synthetic_rct_uvc_probe_manual_contract.rs index 8c95e49..077408a 100644 --- a/tests/manual/server/rct/synthetic_rct_uvc_probe_manual_contract.rs +++ b/tests/manual/server/rct/synthetic_rct_uvc_probe_manual_contract.rs @@ -43,6 +43,7 @@ fn synthetic_probe_keeps_bundled_network_ingress_and_rct_comparison_markers() { "--capture-finish-grace-s", "--jpeg-quality", "--inject-max-frame-bytes", + "--stream-analyze", "--source", "--mode", "1280x720@20,1280x720@30,1920x1080@20,1920x1080@30", @@ -56,6 +57,9 @@ fn synthetic_probe_keeps_bundled_network_ingress_and_rct_comparison_markers() { "expected_", "suspicious_", "decoded_pct", + "capture_mode", + "raw_capture_bytes", + "analysis_duration_s", "diagnosis", "encoded_oversize_frames", "sent_frames",