fix: recover downstream video on idr after drops

This commit is contained in:
Brad Stein 2026-05-17 13:18:45 -03:00
parent ffe32958b5
commit dca8b20528
12 changed files with 230 additions and 87 deletions

6
Cargo.lock generated
View File

@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]]
name = "lesavka_client"
version = "0.22.54"
version = "0.22.55"
dependencies = [
"anyhow",
"async-stream",
@ -1686,7 +1686,7 @@ dependencies = [
[[package]]
name = "lesavka_common"
version = "0.22.54"
version = "0.22.55"
dependencies = [
"anyhow",
"base64",
@ -1698,7 +1698,7 @@ dependencies = [
[[package]]
name = "lesavka_server"
version = "0.22.54"
version = "0.22.55"
dependencies = [
"anyhow",
"base64",

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package]
name = "lesavka_client"
version = "0.22.54"
version = "0.22.55"
edition = "2024"
[dependencies]

View File

@ -11,6 +11,7 @@ impl LesavkaClientApp {
let tx = tx.clone();
tokio::spawn(async move {
let mut dropped_packets = 0_u64;
let mut wait_for_idr = false;
loop {
let mut cli = RelayClient::new(ep.clone());
let req = MonitorRequest {
@ -31,20 +32,30 @@ impl LesavkaClientApp {
"🎥📥 cli video{monitor_id}: got {}bytes",
pkt.data.len()
);
let is_idr = crate::video_support::contains_idr(&pkt.data);
if wait_for_idr && !is_idr {
dropped_packets = dropped_packets.saturating_add(1);
continue;
}
match tx.try_send(pkt) {
Ok(()) => {}
Ok(()) => {
if is_idr {
wait_for_idr = false;
}
}
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
warn!("⚠️🎥 cli video{monitor_id}: GUI thread gone");
break;
}
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
dropped_packets = dropped_packets.saturating_add(1);
wait_for_idr = true;
if dropped_packets <= 8
|| dropped_packets.is_multiple_of(300)
{
debug!(
dropped_packets,
"🎥🪂 cli video{monitor_id}: dropping stale packet because the renderer queue is full"
"🎥🪂 cli video{monitor_id}: dropping stale packet; waiting for IDR before resuming decoder"
);
}
}

View File

@ -308,8 +308,8 @@ impl MonitorWindow {
let decoder_fragment = h264_decoder_launch_fragment(&decoder_name);
let desc = format!(
"appsrc name=src is-live=true format=time do-timestamp=true block=false ! \
queue max-size-buffers=2 max-size-time=0 max-size-bytes=0 leaky=downstream ! \
"appsrc name=src is-live=true format=time do-timestamp=true block=true max-buffers=8 max-time=0 max-bytes=0 ! \
queue max-size-buffers=8 max-size-time=0 max-size-bytes=0 ! \
capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=au ! \
h264parse disable-passthrough=true ! {decoder_fragment} ! videoconvert ! {sink}"
);

View File

@ -77,12 +77,12 @@ impl UnifiedMonitorWindow {
let decoder_fragment1 = h264_decoder_launch_fragment_named(&decoder_name, "decoder1");
let desc = format!(
"compositor name=mix background=black ! videoconvert ! {sink} \
appsrc name=src0 is-live=true format=time do-timestamp=true block=false ! \
queue max-size-buffers=2 max-size-time=0 max-size-bytes=0 leaky=downstream ! \
appsrc name=src0 is-live=true format=time do-timestamp=true block=true max-buffers=8 max-time=0 max-bytes=0 ! \
queue max-size-buffers=8 max-size-time=0 max-size-bytes=0 ! \
capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=au ! \
h264parse disable-passthrough=true ! {decoder_fragment0} ! videoconvert ! videoscale ! mix. \
appsrc name=src1 is-live=true format=time do-timestamp=true block=false ! \
queue max-size-buffers=2 max-size-time=0 max-size-bytes=0 leaky=downstream ! \
appsrc name=src1 is-live=true format=time do-timestamp=true block=true max-buffers=8 max-time=0 max-bytes=0 ! \
queue max-size-buffers=8 max-size-time=0 max-size-bytes=0 ! \
capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=au ! \
h264parse disable-passthrough=true ! {decoder_fragment1} ! videoconvert ! videoscale ! mix."
);

View File

@ -184,6 +184,34 @@ pub fn h264_decoder_launch_fragment_named(decoder_name: &str, element_name: &str
}
}
/// Detect whether an Annex-B H.264 access unit carries an IDR frame.
///
/// Inputs: one H.264 access unit. Output: true when it includes NAL type 5.
/// Why: if the downstream renderer drops a predicted frame, resuming on the
/// next non-IDR packet can smear until a keyframe repairs decoder state.
///
/// Both 3-byte (`00 00 01`) and 4-byte (`00 00 00 01`) start codes are
/// recognised. A 4-byte start code always ends with a 3-byte one, so scanning
/// for `00 00 01 <header>` covers both forms, including a NAL header that is
/// the very last byte of the buffer. Inputs shorter than four bytes cannot
/// hold a start code plus a header and yield `false`.
#[must_use]
pub fn contains_idr(h264: &[u8]) -> bool {
    // Low five bits of the NAL header byte hold nal_unit_type.
    const NAL_TYPE_MASK: u8 = 0x1F;
    // nal_unit_type 5: coded slice of an IDR picture.
    const NAL_TYPE_IDR: u8 = 5;

    h264.windows(4).any(|window| {
        window[0] == 0
            && window[1] == 0
            && window[2] == 1
            && (window[3] & NAL_TYPE_MASK) == NAL_TYPE_IDR
    })
}
fn buildable_decoder(name: &str) -> bool {
#[cfg(coverage)]
if std::env::var("TEST_FAIL_GST_INIT").is_ok() {

View File

@ -1,6 +1,6 @@
[package]
name = "lesavka_common"
version = "0.22.54"
version = "0.22.55"
edition = "2024"
build = "build.rs"

View File

@ -54,6 +54,11 @@ def parse_args() -> argparse.Namespace:
parser.add_argument("--max-reference-artifacts", type=int, default=12)
parser.add_argument("--reference-every", type=int, default=900)
parser.add_argument("--progress-every", type=int, default=150)
parser.add_argument(
"--stream-analyze",
action="store_true",
help="debug path: analyze ffmpeg stdout directly instead of spooling raw frames first",
)
parser.add_argument("--self-test", action="store_true")
return parser.parse_args()
@ -133,6 +138,8 @@ def run_remote(args: argparse.Namespace) -> int:
"--progress-every",
str(args.progress_every),
]
if args.stream_analyze:
remote_cmd.append("--stream-analyze")
print(f"running remote RCT UVC probe on {args.host}: {remote_artifact_dir}", file=sys.stderr)
rc = subprocess.run(["ssh", args.host, " ".join(shlex.quote(part) for part in remote_cmd)]).returncode
local_artifact_dir.parent.mkdir(parents=True, exist_ok=True)
@ -461,11 +468,14 @@ def run_capture(args: argparse.Namespace) -> int:
artifact_dir.mkdir(parents=True, exist_ok=True)
device = detect_video_device(args.device_label) if args.device == "auto" else args.device
command = ffmpeg_cmd(device, args)
(artifact_dir / "command.txt").write_text(" ".join(shlex.quote(part) for part in command) + "\n")
frame_size = args.width * args.height
stderr_path = artifact_dir / "ffmpeg.stderr"
jsonl_path = artifact_dir / "frame-metrics.jsonl"
started = time.monotonic()
capture_elapsed = 0.0
analysis_elapsed = 0.0
raw_capture_bytes = 0
ffmpeg_rc: int | None = None
previous: bytes | None = None
frame_index = 0
suspicious_count = 0
@ -478,78 +488,120 @@ def run_capture(args: argparse.Namespace) -> int:
max_upper_delta = 0.0
max_lower_delta = 0.0
max_lower_jump_seen = 0.0
with stderr_path.open("wb") as err, jsonl_path.open("w") as metrics:
proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=err)
assert proc.stdout is not None
try:
while time.monotonic() - started < args.duration:
frame = proc.stdout.read(frame_size)
if len(frame) != frame_size:
break
frame_index += 1
result = analyze_frame(frame, previous, args)
previous = frame
result.update({"frame": frame_index, "elapsed_s": round(time.monotonic() - started, 3)})
max_upper_delta = max(max_upper_delta, float(result["upper_delta"]))
max_lower_delta = max(max_lower_delta, float(result["lower_delta"]))
max_lower_jump_seen = max(max_lower_jump_seen, float(result["max_lower_jump"]))
if frame_index > 1:
if float(result["upper_delta"]) + float(result["lower_delta"]) >= args.change_threshold:
changed_frames += 1
else:
static_frames += 1
if result["suspicious"]:
suspicious_count += 1
reason_counts.update(result["reasons"])
worst.append(result)
worst = sorted(
worst,
key=lambda item: (item["max_lower_jump"], item["lower_delta"]),
reverse=True,
)[:20]
if artifacts_written < args.max_suspicious_artifacts:
write_pgm(
artifact_dir / f"suspicious_{frame_index:06d}.pgm",
frame,
args.width,
args.height,
)
artifacts_written += 1
should_write_reference = (
frame_index == 1
or (args.reference_every > 0 and frame_index % args.reference_every == 0)
def analyze_captured_frame(frame: bytes, elapsed_s: float, metrics: Any) -> None:
nonlocal previous, frame_index, suspicious_count, artifacts_written, reference_artifacts_written
nonlocal changed_frames, static_frames, max_upper_delta, max_lower_delta, max_lower_jump_seen, worst
frame_index += 1
result = analyze_frame(frame, previous, args)
previous = frame
result.update({"frame": frame_index, "elapsed_s": round(elapsed_s, 3)})
max_upper_delta = max(max_upper_delta, float(result["upper_delta"]))
max_lower_delta = max(max_lower_delta, float(result["lower_delta"]))
max_lower_jump_seen = max(max_lower_jump_seen, float(result["max_lower_jump"]))
if frame_index > 1:
if float(result["upper_delta"]) + float(result["lower_delta"]) >= args.change_threshold:
changed_frames += 1
else:
static_frames += 1
if result["suspicious"]:
suspicious_count += 1
reason_counts.update(result["reasons"])
worst.append(result)
worst = sorted(
worst,
key=lambda item: (item["max_lower_jump"], item["lower_delta"]),
reverse=True,
)[:20]
if artifacts_written < args.max_suspicious_artifacts:
write_pgm(
artifact_dir / f"suspicious_{frame_index:06d}.pgm",
frame,
args.width,
args.height,
)
if should_write_reference and reference_artifacts_written < args.max_reference_artifacts:
write_pgm(
artifact_dir / f"reference_{frame_index:06d}.pgm",
frame,
args.width,
args.height,
)
reference_artifacts_written += 1
if frame_index <= 5 or result["suspicious"] or frame_index % args.progress_every == 0:
metrics.write(json.dumps(result, sort_keys=True) + "\n")
if frame_index % args.progress_every == 0:
print(
f"frames={frame_index} suspicious={suspicious_count} latest={result}",
file=sys.stderr,
)
finally:
proc.terminate()
artifacts_written += 1
should_write_reference = frame_index == 1 or (args.reference_every > 0 and frame_index % args.reference_every == 0)
if should_write_reference and reference_artifacts_written < args.max_reference_artifacts:
write_pgm(
artifact_dir / f"reference_{frame_index:06d}.pgm",
frame,
args.width,
args.height,
)
reference_artifacts_written += 1
if frame_index <= 5 or result["suspicious"] or frame_index % args.progress_every == 0:
metrics.write(json.dumps(result, sort_keys=True) + "\n")
if frame_index % args.progress_every == 0:
print(
f"frames={frame_index} suspicious={suspicious_count} latest={result}",
file=sys.stderr,
)
with stderr_path.open("wb") as err, jsonl_path.open("w") as metrics:
if args.stream_analyze:
(artifact_dir / "command.txt").write_text(" ".join(shlex.quote(part) for part in command) + "\n")
proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=err)
assert proc.stdout is not None
started = time.monotonic()
try:
proc.wait(timeout=3)
except subprocess.TimeoutExpired:
proc.kill()
elapsed = max(0.001, time.monotonic() - started)
while time.monotonic() - started < args.duration:
frame = proc.stdout.read(frame_size)
if len(frame) != frame_size:
break
analyze_captured_frame(frame, time.monotonic() - started, metrics)
finally:
proc.terminate()
try:
ffmpeg_rc = proc.wait(timeout=3)
except subprocess.TimeoutExpired:
proc.kill()
ffmpeg_rc = proc.wait()
capture_elapsed = time.monotonic() - started
analysis_elapsed = capture_elapsed
else:
raw_path = artifact_dir / "capture.raw"
capture_command = command[:]
if "-an" in capture_command:
capture_command[capture_command.index("-an") : capture_command.index("-an")] = ["-t", str(args.duration)]
else:
capture_command[-1:-1] = ["-t", str(args.duration)]
capture_command[-1] = str(raw_path)
(artifact_dir / "command.txt").write_text(" ".join(shlex.quote(part) for part in capture_command) + "\n")
print(f"capturing raw RCT frames before analysis: {raw_path}", file=sys.stderr)
started = time.monotonic()
proc = subprocess.run(capture_command, stdout=subprocess.DEVNULL, stderr=err, check=False)
capture_elapsed = time.monotonic() - started
ffmpeg_rc = proc.returncode
raw_capture_bytes = raw_path.stat().st_size if raw_path.exists() else 0
print(
f"analyzing captured raw RCT frames bytes={raw_capture_bytes} capture_s={capture_elapsed:.3f}",
file=sys.stderr,
)
analysis_started = time.monotonic()
try:
with raw_path.open("rb") as raw:
while True:
frame = raw.read(frame_size)
if len(frame) != frame_size:
break
analyze_captured_frame(frame, frame_index / max(1, args.fps), metrics)
finally:
raw_path.unlink(missing_ok=True)
analysis_elapsed = time.monotonic() - analysis_started
elapsed = max(0.001, capture_elapsed)
summary = {
"schema": "lesavka.rct-uvc-artifact-probe.v1",
"source": args.source,
"device": device,
"capture_mode": "stream" if args.stream_analyze else "rawfile",
"width": args.width,
"height": args.height,
"fps_requested": args.fps,
"duration_requested_s": args.duration,
"duration_observed_s": round(elapsed, 3),
"analysis_duration_s": round(analysis_elapsed, 3),
"ffmpeg_rc": ffmpeg_rc,
"raw_capture_bytes": raw_capture_bytes,
"frames": frame_index,
"fps_observed": round(frame_index / elapsed, 3),
"suspicious_frames": suspicious_count,

View File

@ -16,7 +16,7 @@ bench = false
[package]
name = "lesavka_server"
version = "0.22.54"
version = "0.22.55"
edition = "2024"
autobins = false

View File

@ -8,9 +8,11 @@
const SERVER_EYE_CAPTURE: &str = include_str!("../../../../server/src/video/eye_capture.rs");
const SERVER_STREAM_CORE: &str = include_str!("../../../../server/src/video/stream_core.rs");
const CLIENT_DOWNLINK_MEDIA: &str = include_str!("../../../../client/src/app/downlink_media.rs");
const CLIENT_MONITOR: &str = include_str!("../../../../client/src/output/video/monitor_window.rs");
const CLIENT_UNIFIED_MONITOR: &str =
include_str!("../../../../client/src/output/video/unified_monitor.rs");
const CLIENT_VIDEO_SUPPORT: &str = include_str!("../../../../client/src/video_support.rs");
#[test]
fn source_stalls_are_reported_on_the_stream_and_in_logs() {
@ -66,15 +68,37 @@ fn dropped_frames_increment_telemetry_and_force_decoder_recovery_on_idr() {
}
#[test]
fn client_decoder_queues_drop_downstream_when_display_lags() {
fn client_renderer_backpressure_recovers_on_idr_instead_of_smearing_decoder_state() {
for marker in [
"crate::video_support::contains_idr(&pkt.data)",
"if wait_for_idr && !is_idr",
"wait_for_idr = true",
"wait_for_idr = false",
"waiting for IDR before resuming decoder",
] {
assert!(
CLIENT_DOWNLINK_MEDIA.contains(marker),
"client downlink should preserve IDR recovery marker {marker}"
);
}
for marker in ["pub fn contains_idr", "NAL type 5"] {
assert!(
CLIENT_VIDEO_SUPPORT.contains(marker),
"client H.264 helpers should preserve marker {marker}"
);
}
for source in [CLIENT_MONITOR, CLIENT_UNIFIED_MONITOR] {
assert!(
source.matches("leaky=downstream").count() >= 1,
"client display path should drop stale frames under display lag"
!source.contains("leaky=downstream"),
"client display path should not drop arbitrary H.264 access units before decode"
);
assert!(
source.contains("block=false"),
"client display appsrc should not block network receive loops"
source.contains("block=true max-buffers=8 max-time=0 max-bytes=0"),
"client display appsrc should apply bounded backpressure"
);
assert!(
source.contains("queue max-size-buffers=8 max-size-time=0 max-size-bytes=0"),
"client display queue should be bounded without pre-decoder leakage"
);
}
}

View File

@ -137,6 +137,18 @@ fn vulkan_decoder_fragment_downloads_gpu_memory_before_cpu_sinks() {
assert!(named.contains("vulkandownload"));
}
#[test]
fn contains_idr_handles_short_and_multi_nal_annex_b_streams() {
    // Each case pairs an Annex-B byte sequence with the expected IDR verdict:
    // a lone IDR NAL, a non-IDR slice followed by an IDR, two non-IDR slices,
    // and a byte run that is not a valid start code at all.
    let cases: [(&[u8], bool); 4] = [
        (&[0u8, 0, 0, 1, 0x65, 0x88][..], true),
        (&[0u8, 0, 0, 1, 0x41, 0x99, 0, 0, 1, 0x65, 0x88][..], true),
        (&[0u8, 0, 0, 1, 0x41, 0x99, 0, 0, 1, 0x61, 0x88][..], false),
        (&[0u8, 0, 2, 0x65][..], false),
    ];

    for (stream, expected) in cases {
        assert_eq!(
            video_support::contains_idr(stream),
            expected,
            "unexpected IDR verdict for {stream:02x?}"
        );
    }
}
#[test]
#[serial]
fn decoder_auto_order_can_prefer_software_for_driver_comparisons() {

View File

@ -3,11 +3,12 @@
// Scope: verify that both server capture and client display paths are biased
// toward bounded freshness rather than unbounded smoothness debt.
// Targets: `server/src/video/eye_capture.rs` and
// `client/src/output/video/monitor_window.rs`.
// `client/src/app/downlink_media.rs` plus the client monitor renderers.
// Why: downstream desktop usability depends on seeing input effects quickly;
// stale-but-perfect frames are worse than bounded drops.
const SERVER_EYE_CAPTURE: &str = include_str!("../../../../server/src/video/eye_capture.rs");
const CLIENT_DOWNLINK_MEDIA: &str = include_str!("../../../../client/src/app/downlink_media.rs");
const CLIENT_MONITOR: &str = include_str!("../../../../client/src/output/video/monitor_window.rs");
const CLIENT_UNIFIED_MONITOR: &str =
include_str!("../../../../client/src/output/video/unified_monitor.rs");
@ -17,7 +18,7 @@ fn percentile_95(mut values: Vec<u64>) -> u64 {
values[((values.len() - 1) as f64 * 0.95).ceil() as usize]
}
fn simulate_leaky_display_queue(
fn simulate_bounded_display_queue(
source_interval_ms: u64,
display_interval_ms: u64,
frames: u64,
@ -50,7 +51,7 @@ fn simulate_leaky_display_queue(
#[test]
fn downstream_display_queue_model_keeps_p95_age_under_interactive_budget() {
let (p95_age_ms, dropped) = simulate_leaky_display_queue(16, 33, 600, 2);
let (p95_age_ms, dropped) = simulate_bounded_display_queue(16, 33, 600, 2);
assert!(
p95_age_ms < 150,
@ -81,14 +82,25 @@ fn server_capture_path_uses_bounded_leaky_queues_and_try_send() {
}
#[test]
fn client_display_path_is_live_nonblocking_and_leaky() {
fn client_display_path_is_bounded_and_uses_idr_recovery_after_drops() {
for marker in [
"TrySendError::Full",
"wait_for_idr = true",
"if wait_for_idr && !is_idr",
"crate::video_support::contains_idr(&pkt.data)",
] {
assert!(
CLIENT_DOWNLINK_MEDIA.contains(marker),
"client downlink should preserve latency/recovery marker {marker}"
);
}
for source in [CLIENT_MONITOR, CLIENT_UNIFIED_MONITOR] {
for marker in [
"appsrc name=src",
"is-live=true",
"do-timestamp=true",
"block=false",
"queue max-size-buffers=2 max-size-time=0 max-size-bytes=0 leaky=downstream",
"block=true max-buffers=8 max-time=0 max-bytes=0",
"queue max-size-buffers=8 max-size-time=0 max-size-bytes=0",
"sync=false",
] {
assert!(
@ -96,5 +108,9 @@ fn client_display_path_is_live_nonblocking_and_leaky() {
"client downstream display should preserve marker {marker}"
);
}
assert!(
!source.contains("leaky=downstream"),
"client H.264 decoder input should not leak arbitrary access units"
);
}
}