diff --git a/Cargo.lock b/Cargo.lock index 0e2dc20..2996cf8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.22.54" +version = "0.22.55" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.22.54" +version = "0.22.55" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.22.54" +version = "0.22.55" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index 98ea340..cf96872 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.22.54" +version = "0.22.55" edition = "2024" [dependencies] diff --git a/client/src/app/downlink_media.rs b/client/src/app/downlink_media.rs index 4696f75..47dcd3a 100644 --- a/client/src/app/downlink_media.rs +++ b/client/src/app/downlink_media.rs @@ -11,6 +11,7 @@ impl LesavkaClientApp { let tx = tx.clone(); tokio::spawn(async move { let mut dropped_packets = 0_u64; + let mut wait_for_idr = false; loop { let mut cli = RelayClient::new(ep.clone()); let req = MonitorRequest { @@ -31,20 +32,30 @@ impl LesavkaClientApp { "πŸŽ₯πŸ“₯ cli video{monitor_id}: got {}β€―bytes", pkt.data.len() ); + let is_idr = crate::video_support::contains_idr(&pkt.data); + if wait_for_idr && !is_idr { + dropped_packets = dropped_packets.saturating_add(1); + continue; + } match tx.try_send(pkt) { - Ok(()) => {} + Ok(()) => { + if is_idr { + wait_for_idr = false; + } + } Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => { warn!("⚠️πŸŽ₯ cli video{monitor_id}: GUI thread gone"); break; } Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { dropped_packets = dropped_packets.saturating_add(1); + wait_for_idr = true; if dropped_packets <= 8 || dropped_packets.is_multiple_of(300) { debug!( dropped_packets, - "πŸŽ₯πŸͺ‚ cli video{monitor_id}: dropping stale packet because the renderer queue is full" + "πŸŽ₯πŸͺ‚ cli video{monitor_id}: dropping stale packet; waiting for IDR before resuming decoder" ); } } diff --git a/client/src/output/video/monitor_window.rs b/client/src/output/video/monitor_window.rs index 4dbe744..9ab8047 100644 --- a/client/src/output/video/monitor_window.rs +++ b/client/src/output/video/monitor_window.rs @@ -308,8 +308,8 @@ impl MonitorWindow { let decoder_fragment = h264_decoder_launch_fragment(&decoder_name); let desc = format!( - "appsrc name=src is-live=true format=time do-timestamp=true block=false ! \ - queue max-size-buffers=2 max-size-time=0 max-size-bytes=0 leaky=downstream ! \ + "appsrc name=src is-live=true format=time do-timestamp=true block=true max-buffers=8 max-time=0 max-bytes=0 ! \ + queue max-size-buffers=8 max-size-time=0 max-size-bytes=0 ! \ capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=au ! \ h264parse disable-passthrough=true ! {decoder_fragment} ! videoconvert ! {sink}" ); diff --git a/client/src/output/video/unified_monitor.rs b/client/src/output/video/unified_monitor.rs index 40330f0..ee391a6 100644 --- a/client/src/output/video/unified_monitor.rs +++ b/client/src/output/video/unified_monitor.rs @@ -77,12 +77,12 @@ impl UnifiedMonitorWindow { let decoder_fragment1 = h264_decoder_launch_fragment_named(&decoder_name, "decoder1"); let desc = format!( "compositor name=mix background=black ! videoconvert ! {sink} \ - appsrc name=src0 is-live=true format=time do-timestamp=true block=false ! \ - queue max-size-buffers=2 max-size-time=0 max-size-bytes=0 leaky=downstream ! \ + appsrc name=src0 is-live=true format=time do-timestamp=true block=true max-buffers=8 max-time=0 max-bytes=0 ! \ + queue max-size-buffers=8 max-size-time=0 max-size-bytes=0 ! \ capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=au ! \ h264parse disable-passthrough=true ! {decoder_fragment0} ! videoconvert ! videoscale ! mix. \ - appsrc name=src1 is-live=true format=time do-timestamp=true block=false ! \ - queue max-size-buffers=2 max-size-time=0 max-size-bytes=0 leaky=downstream ! \ + appsrc name=src1 is-live=true format=time do-timestamp=true block=true max-buffers=8 max-time=0 max-bytes=0 ! \ + queue max-size-buffers=8 max-size-time=0 max-size-bytes=0 ! \ capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=au ! \ h264parse disable-passthrough=true ! {decoder_fragment1} ! videoconvert ! videoscale ! mix." ); diff --git a/client/src/video_support.rs b/client/src/video_support.rs index 0b6273a..7492810 100644 --- a/client/src/video_support.rs +++ b/client/src/video_support.rs @@ -184,6 +184,34 @@ pub fn h264_decoder_launch_fragment_named(decoder_name: &str, element_name: &str } } +/// Detect whether an Annex-B H.264 access unit carries an IDR frame. +/// +/// Inputs: one H.264 access unit. Output: true when it includes NAL type 5. +/// Why: if the downstream renderer drops a predicted frame, resuming on the +/// next non-IDR packet can smear until a keyframe repairs decoder state. +#[must_use] +pub fn contains_idr(h264: &[u8]) -> bool { + let mut index = 0; + while index + 4 < h264.len() { + if h264[index] == 0 && h264[index + 1] == 0 { + let offset = if h264[index + 2] == 1 { + 3 + } else if h264[index + 2] == 0 && h264[index + 3] == 1 { + 4 + } else { + index += 1; + continue; + }; + let nal_index = index + offset; + if nal_index < h264.len() && (h264[nal_index] & 0x1F) == 5 { + return true; + } + } + index += 1; + } + false +} + fn buildable_decoder(name: &str) -> bool { #[cfg(coverage)] if std::env::var("TEST_FAIL_GST_INIT").is_ok() { diff --git a/common/Cargo.toml b/common/Cargo.toml index d2f5ed9..8f89fbb 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.22.54" +version = "0.22.55" edition = "2024" build = "build.rs" diff --git a/scripts/manual/run_rct_uvc_artifact_probe.py b/scripts/manual/run_rct_uvc_artifact_probe.py index ea6d01d..30d6e2d 100755 --- a/scripts/manual/run_rct_uvc_artifact_probe.py +++ b/scripts/manual/run_rct_uvc_artifact_probe.py @@ -54,6 +54,11 @@ def parse_args() -> argparse.Namespace: parser.add_argument("--max-reference-artifacts", type=int, default=12) parser.add_argument("--reference-every", type=int, default=900) parser.add_argument("--progress-every", type=int, default=150) + parser.add_argument( + "--stream-analyze", + action="store_true", + help="debug path: analyze ffmpeg stdout directly instead of spooling raw frames first", + ) parser.add_argument("--self-test", action="store_true") return parser.parse_args() @@ -133,6 +138,8 @@ def run_remote(args: argparse.Namespace) -> int: "--progress-every", str(args.progress_every), ] + if args.stream_analyze: + remote_cmd.append("--stream-analyze") print(f"running remote RCT UVC probe on {args.host}: {remote_artifact_dir}", file=sys.stderr) rc = subprocess.run(["ssh", args.host, " ".join(shlex.quote(part) for part in remote_cmd)]).returncode local_artifact_dir.parent.mkdir(parents=True, exist_ok=True) @@ -461,11 +468,14 @@ def run_capture(args: argparse.Namespace) -> int: artifact_dir.mkdir(parents=True, exist_ok=True) device = detect_video_device(args.device_label) if args.device == "auto" else args.device command = ffmpeg_cmd(device, args) - (artifact_dir / "command.txt").write_text(" ".join(shlex.quote(part) for part in command) + "\n") frame_size = args.width * args.height stderr_path = artifact_dir / "ffmpeg.stderr" jsonl_path = artifact_dir / "frame-metrics.jsonl" started = time.monotonic() + capture_elapsed = 0.0 + analysis_elapsed = 0.0 + raw_capture_bytes = 0 + ffmpeg_rc: int | None = None previous: bytes | None = None frame_index = 0 suspicious_count = 0 @@ -478,78 +488,120 @@ def run_capture(args: argparse.Namespace) -> int: max_upper_delta = 0.0 max_lower_delta = 0.0 max_lower_jump_seen = 0.0 - with stderr_path.open("wb") as err, jsonl_path.open("w") as metrics: - proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=err) - assert proc.stdout is not None - try: - while time.monotonic() - started < args.duration: - frame = proc.stdout.read(frame_size) - if len(frame) != frame_size: - break - frame_index += 1 - result = analyze_frame(frame, previous, args) - previous = frame - result.update({"frame": frame_index, "elapsed_s": round(time.monotonic() - started, 3)}) - max_upper_delta = max(max_upper_delta, float(result["upper_delta"])) - max_lower_delta = max(max_lower_delta, float(result["lower_delta"])) - max_lower_jump_seen = max(max_lower_jump_seen, float(result["max_lower_jump"])) - if frame_index > 1: - if float(result["upper_delta"]) + float(result["lower_delta"]) >= args.change_threshold: - changed_frames += 1 - else: - static_frames += 1 - if result["suspicious"]: - suspicious_count += 1 - reason_counts.update(result["reasons"]) - worst.append(result) - worst = sorted( - worst, - key=lambda item: (item["max_lower_jump"], item["lower_delta"]), - reverse=True, - )[:20] - if artifacts_written < args.max_suspicious_artifacts: - write_pgm( - artifact_dir / f"suspicious_{frame_index:06d}.pgm", - frame, - args.width, - args.height, - ) - artifacts_written += 1 - should_write_reference = ( - frame_index == 1 - or (args.reference_every > 0 and frame_index % args.reference_every == 0) + def analyze_captured_frame(frame: bytes, elapsed_s: float, metrics: Any) -> None: + nonlocal previous, frame_index, suspicious_count, artifacts_written, reference_artifacts_written + nonlocal changed_frames, static_frames, max_upper_delta, max_lower_delta, max_lower_jump_seen, worst + frame_index += 1 + result = analyze_frame(frame, previous, args) + previous = frame + result.update({"frame": frame_index, "elapsed_s": round(elapsed_s, 3)}) + max_upper_delta = max(max_upper_delta, float(result["upper_delta"])) + max_lower_delta = max(max_lower_delta, float(result["lower_delta"])) + max_lower_jump_seen = max(max_lower_jump_seen, float(result["max_lower_jump"])) + if frame_index > 1: + if float(result["upper_delta"]) + float(result["lower_delta"]) >= args.change_threshold: + changed_frames += 1 + else: + static_frames += 1 + if result["suspicious"]: + suspicious_count += 1 + reason_counts.update(result["reasons"]) + worst.append(result) + worst = sorted( + worst, + key=lambda item: (item["max_lower_jump"], item["lower_delta"]), + reverse=True, + )[:20] + if artifacts_written < args.max_suspicious_artifacts: + write_pgm( + artifact_dir / f"suspicious_{frame_index:06d}.pgm", + frame, + args.width, + args.height, ) - if should_write_reference and reference_artifacts_written < args.max_reference_artifacts: - write_pgm( - artifact_dir / f"reference_{frame_index:06d}.pgm", - frame, - args.width, - args.height, - ) - reference_artifacts_written += 1 - if frame_index <= 5 or result["suspicious"] or frame_index % args.progress_every == 0: - metrics.write(json.dumps(result, sort_keys=True) + "\n") - if frame_index % args.progress_every == 0: - print( - f"frames={frame_index} suspicious={suspicious_count} latest={result}", - file=sys.stderr, - ) - finally: - proc.terminate() + artifacts_written += 1 + should_write_reference = frame_index == 1 or (args.reference_every > 0 and frame_index % args.reference_every == 0) + if should_write_reference and reference_artifacts_written < args.max_reference_artifacts: + write_pgm( + artifact_dir / f"reference_{frame_index:06d}.pgm", + frame, + args.width, + args.height, + ) + reference_artifacts_written += 1 + if frame_index <= 5 or result["suspicious"] or frame_index % args.progress_every == 0: + metrics.write(json.dumps(result, sort_keys=True) + "\n") + if frame_index % args.progress_every == 0: + print( + f"frames={frame_index} suspicious={suspicious_count} latest={result}", + file=sys.stderr, + ) + + with stderr_path.open("wb") as err, jsonl_path.open("w") as metrics: + if args.stream_analyze: + (artifact_dir / "command.txt").write_text(" ".join(shlex.quote(part) for part in command) + "\n") + proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=err) + assert proc.stdout is not None + started = time.monotonic() try: - proc.wait(timeout=3) - except subprocess.TimeoutExpired: - proc.kill() - elapsed = max(0.001, time.monotonic() - started) + while time.monotonic() - started < args.duration: + frame = proc.stdout.read(frame_size) + if len(frame) != frame_size: + break + analyze_captured_frame(frame, time.monotonic() - started, metrics) + finally: + proc.terminate() + try: + ffmpeg_rc = proc.wait(timeout=3) + except subprocess.TimeoutExpired: + proc.kill() + ffmpeg_rc = proc.wait() + capture_elapsed = time.monotonic() - started + analysis_elapsed = capture_elapsed + else: + raw_path = artifact_dir / "capture.raw" + capture_command = command[:] + if "-an" in capture_command: + capture_command[capture_command.index("-an") : capture_command.index("-an")] = ["-t", str(args.duration)] + else: + capture_command[-1:-1] = ["-t", str(args.duration)] + capture_command[-1] = str(raw_path) + (artifact_dir / "command.txt").write_text(" ".join(shlex.quote(part) for part in capture_command) + "\n") + print(f"capturing raw RCT frames before analysis: {raw_path}", file=sys.stderr) + started = time.monotonic() + proc = subprocess.run(capture_command, stdout=subprocess.DEVNULL, stderr=err, check=False) + capture_elapsed = time.monotonic() - started + ffmpeg_rc = proc.returncode + raw_capture_bytes = raw_path.stat().st_size if raw_path.exists() else 0 + print( + f"analyzing captured raw RCT frames bytes={raw_capture_bytes} capture_s={capture_elapsed:.3f}", + file=sys.stderr, + ) + analysis_started = time.monotonic() + try: + with raw_path.open("rb") as raw: + while True: + frame = raw.read(frame_size) + if len(frame) != frame_size: + break + analyze_captured_frame(frame, frame_index / max(1, args.fps), metrics) + finally: + raw_path.unlink(missing_ok=True) + analysis_elapsed = time.monotonic() - analysis_started + elapsed = max(0.001, capture_elapsed) summary = { "schema": "lesavka.rct-uvc-artifact-probe.v1", "source": args.source, "device": device, + "capture_mode": "stream" if args.stream_analyze else "rawfile", "width": args.width, "height": args.height, "fps_requested": args.fps, "duration_requested_s": args.duration, "duration_observed_s": round(elapsed, 3), + "analysis_duration_s": round(analysis_elapsed, 3), + "ffmpeg_rc": ffmpeg_rc, + "raw_capture_bytes": raw_capture_bytes, "frames": frame_index, "fps_observed": round(frame_index / elapsed, 3), "suspicious_frames": suspicious_count, diff --git a/server/Cargo.toml b/server/Cargo.toml index d436897..692d8ea 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -16,7 +16,7 @@ bench = false [package] name = "lesavka_server" -version = "0.22.54" +version = "0.22.55" edition = "2024" autobins = false diff --git a/tests/chaos/downstream/video/downstream_video_stall_chaos_contract.rs b/tests/chaos/downstream/video/downstream_video_stall_chaos_contract.rs index c0085c2..5f7e5cf 100644 --- a/tests/chaos/downstream/video/downstream_video_stall_chaos_contract.rs +++ b/tests/chaos/downstream/video/downstream_video_stall_chaos_contract.rs @@ -8,9 +8,11 @@ const SERVER_EYE_CAPTURE: &str = include_str!("../../../../server/src/video/eye_capture.rs"); const SERVER_STREAM_CORE: &str = include_str!("../../../../server/src/video/stream_core.rs"); +const CLIENT_DOWNLINK_MEDIA: &str = include_str!("../../../../client/src/app/downlink_media.rs"); const CLIENT_MONITOR: &str = include_str!("../../../../client/src/output/video/monitor_window.rs"); const CLIENT_UNIFIED_MONITOR: &str = include_str!("../../../../client/src/output/video/unified_monitor.rs"); +const CLIENT_VIDEO_SUPPORT: &str = include_str!("../../../../client/src/video_support.rs"); #[test] fn source_stalls_are_reported_on_the_stream_and_in_logs() { @@ -66,15 +68,37 @@ fn dropped_frames_increment_telemetry_and_force_decoder_recovery_on_idr() { } #[test] -fn client_decoder_queues_drop_downstream_when_display_lags() { +fn client_renderer_backpressure_recovers_on_idr_instead_of_smearing_decoder_state() { + for marker in [ + "crate::video_support::contains_idr(&pkt.data)", + "if wait_for_idr && !is_idr", + "wait_for_idr = true", + "wait_for_idr = false", + "waiting for IDR before resuming decoder", + ] { + assert!( + CLIENT_DOWNLINK_MEDIA.contains(marker), + "client downlink should preserve IDR recovery marker {marker}" + ); + } + for marker in ["pub fn contains_idr", "NAL type 5"] { + assert!( + CLIENT_VIDEO_SUPPORT.contains(marker), + "client H.264 helpers should preserve marker {marker}" + ); + } for source in [CLIENT_MONITOR, CLIENT_UNIFIED_MONITOR] { assert!( - source.matches("leaky=downstream").count() >= 1, - "client display path should drop stale frames under display lag" + !source.contains("leaky=downstream"), + "client display path should not drop arbitrary H.264 access units before decode" ); assert!( - source.contains("block=false"), - "client display appsrc should not block network receive loops" + source.contains("block=true max-buffers=8 max-time=0 max-bytes=0"), + "client display appsrc should apply bounded backpressure" + ); + assert!( + source.contains("queue max-size-buffers=8 max-size-time=0 max-size-bytes=0"), + "client display queue should be bounded without pre-decoder leakage" ); } } diff --git a/tests/compatibility/client/video_support/client_video_support_include_contract.rs b/tests/compatibility/client/video_support/client_video_support_include_contract.rs index 6548282..e867c22 100644 --- a/tests/compatibility/client/video_support/client_video_support_include_contract.rs +++ b/tests/compatibility/client/video_support/client_video_support_include_contract.rs @@ -137,6 +137,18 @@ fn vulkan_decoder_fragment_downloads_gpu_memory_before_cpu_sinks() { assert!(named.contains("vulkandownload")); } +#[test] +fn contains_idr_handles_short_and_multi_nal_annex_b_streams() { + assert!(video_support::contains_idr(&[0, 0, 0, 1, 0x65, 0x88])); + assert!(video_support::contains_idr(&[ + 0, 0, 0, 1, 0x41, 0x99, 0, 0, 1, 0x65, 0x88 + ])); + assert!(!video_support::contains_idr(&[ + 0, 0, 0, 1, 0x41, 0x99, 0, 0, 1, 0x61, 0x88 + ])); + assert!(!video_support::contains_idr(&[0, 0, 2, 0x65])); +} + #[test] #[serial] fn decoder_auto_order_can_prefer_software_for_driver_comparisons() { diff --git a/tests/performance/downstream/video/downstream_video_latency_budget_contract.rs b/tests/performance/downstream/video/downstream_video_latency_budget_contract.rs index 34909f7..73eebfa 100644 --- a/tests/performance/downstream/video/downstream_video_latency_budget_contract.rs +++ b/tests/performance/downstream/video/downstream_video_latency_budget_contract.rs @@ -3,11 +3,12 @@ // Scope: verify that both server capture and client display paths are biased // toward bounded freshness rather than unbounded smoothness debt. // Targets: `server/src/video/eye_capture.rs` and -// `client/src/output/video/monitor_window.rs`. +// `client/src/app/downlink_media.rs` plus the client monitor renderers. // Why: downstream desktop usability depends on seeing input effects quickly; // stale-but-perfect frames are worse than bounded drops. const SERVER_EYE_CAPTURE: &str = include_str!("../../../../server/src/video/eye_capture.rs"); +const CLIENT_DOWNLINK_MEDIA: &str = include_str!("../../../../client/src/app/downlink_media.rs"); const CLIENT_MONITOR: &str = include_str!("../../../../client/src/output/video/monitor_window.rs"); const CLIENT_UNIFIED_MONITOR: &str = include_str!("../../../../client/src/output/video/unified_monitor.rs"); @@ -17,7 +18,7 @@ fn percentile_95(mut values: Vec) -> u64 { values[((values.len() - 1) as f64 * 0.95).ceil() as usize] } -fn simulate_leaky_display_queue( +fn simulate_bounded_display_queue( source_interval_ms: u64, display_interval_ms: u64, frames: u64, @@ -50,7 +51,7 @@ fn simulate_leaky_display_queue( #[test] fn downstream_display_queue_model_keeps_p95_age_under_interactive_budget() { - let (p95_age_ms, dropped) = simulate_leaky_display_queue(16, 33, 600, 2); + let (p95_age_ms, dropped) = simulate_bounded_display_queue(16, 33, 600, 2); assert!( p95_age_ms < 150, @@ -81,14 +82,25 @@ fn server_capture_path_uses_bounded_leaky_queues_and_try_send() { } #[test] -fn client_display_path_is_live_nonblocking_and_leaky() { +fn client_display_path_is_bounded_and_uses_idr_recovery_after_drops() { + for marker in [ + "TrySendError::Full", + "wait_for_idr = true", + "if wait_for_idr && !is_idr", + "crate::video_support::contains_idr(&pkt.data)", + ] { + assert!( + CLIENT_DOWNLINK_MEDIA.contains(marker), + "client downlink should preserve latency/recovery marker {marker}" + ); + } for source in [CLIENT_MONITOR, CLIENT_UNIFIED_MONITOR] { for marker in [ "appsrc name=src", "is-live=true", "do-timestamp=true", - "block=false", - "queue max-size-buffers=2 max-size-time=0 max-size-bytes=0 leaky=downstream", + "block=true max-buffers=8 max-time=0 max-bytes=0", + "queue max-size-buffers=8 max-size-time=0 max-size-bytes=0", "sync=false", ] { assert!( @@ -96,5 +108,9 @@ fn client_display_path_is_live_nonblocking_and_leaky() { "client downstream display should preserve marker {marker}" ); } + assert!( + !source.contains("leaky=downstream"), + "client H.264 decoder input should not leak arbitrary access units" + ); } }