diff --git a/Cargo.lock b/Cargo.lock index 9332684..4c1e394 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.21.11" +version = "0.21.12" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.21.11" +version = "0.21.12" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.21.11" +version = "0.21.12" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index dd6fd6f..20464c3 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.21.11" +version = "0.21.12" edition = "2024" [dependencies] diff --git a/client/src/app/uplink_media.rs b/client/src/app/uplink_media.rs index a7c3103..bf81533 100644 --- a/client/src/app/uplink_media.rs +++ b/client/src/app/uplink_media.rs @@ -6,6 +6,8 @@ include!("uplink_media/camera_loop.rs"); include!("uplink_media/media_source_requirements.rs"); +include!("uplink_media/video_keyframes.rs"); + include!("uplink_media/bundled_media_queue.rs"); include!("uplink_media/uplink_queue_metadata.rs"); diff --git a/client/src/app/uplink_media/tests/mod.rs b/client/src/app/uplink_media/tests/mod.rs index 11447b9..6c08262 100644 --- a/client/src/app/uplink_media/tests/mod.rs +++ b/client/src/app/uplink_media/tests/mod.rs @@ -148,6 +148,43 @@ use super::*; assert!(!disables_upstream_epoch_auto_heal("yes")); } + #[test] + /// Keeps HEVC recovery explicit because freshness drops can otherwise resume from predictive frames. + fn hevc_keyframe_detection_recognizes_irap_access_units() { + assert!(contains_hevc_irap(&[0, 0, 0, 1, 0x26, 0x01, 0xaa])); + assert!(contains_hevc_irap(&[0, 0, 1, 0x28, 0x01, 0xaa])); + assert!(!contains_hevc_irap(&[0, 0, 0, 1, 0x02, 0x01, 0xaa])); + } + + #[test] + /// Keeps queue-drop recovery explicit because stale HEVC bundles should not corrupt the decoder. + fn hevc_recovery_holds_delta_bundle_until_next_keyframe() { + let delta_bundle = UpstreamMediaBundle { + video: Some(VideoPacket { + data: vec![0, 0, 0, 1, 0x02, 0x01, 0xaa], + ..Default::default() + }), + ..Default::default() + }; + let keyframe_bundle = UpstreamMediaBundle { + video: Some(VideoPacket { + data: vec![0, 0, 0, 1, 0x26, 0x01, 0xaa], + ..Default::default() + }), + ..Default::default() + }; + + assert!(should_hold_hevc_bundle_for_keyframe_recovery( + true, + &delta_bundle + )); + assert!(!should_hold_hevc_bundle_for_keyframe_recovery( + true, + &keyframe_bundle + )); + assert!(bundle_has_hevc_recovery_keyframe(&keyframe_bundle)); + } + /// Verifies the live uplink queue emits one physically bundled HEVC frame and PCM span. /// /// Inputs: pre-stamped HEVC video plus two nearby audio packets, exactly as diff --git a/client/src/app/uplink_media/video_keyframes.rs b/client/src/app/uplink_media/video_keyframes.rs new file mode 100644 index 0000000..a566f9a --- /dev/null +++ b/client/src/app/uplink_media/video_keyframes.rs @@ -0,0 +1,90 @@ +#[cfg(not(coverage))] +/// Detect whether an Annex-B HEVC access unit contains an intra recovery point. +/// +/// Inputs: one encoded HEVC packet in byte-stream form. Output: `true` when the +/// packet carries an IRAP NAL unit. Why: the freshness-first uplink may drop +/// predictive frames, and the server decoder should only resume from a frame +/// that can rebuild a clean picture. +fn contains_hevc_irap(data: &[u8]) -> bool { + let mut offset = 0; + while let Some((start, prefix_len)) = find_annex_b_start_code(data, offset) { + let nal_index = start + prefix_len; + if nal_index + 1 < data.len() { + let nal_type = (data[nal_index] >> 1) & 0x3f; + if (16..=23).contains(&nal_type) { + return true; + } + } + offset = nal_index.saturating_add(2); + } + false +} + +#[cfg(not(coverage))] +/// Locate the next Annex-B start code in an encoded video packet. +/// +/// Inputs: encoded bytes plus the search offset. Output: the start index and +/// prefix length. Why: both three-byte and four-byte start codes appear in +/// HEVC streams, and keyframe recovery must handle both without allocating. +fn find_annex_b_start_code(data: &[u8], from: usize) -> Option<(usize, usize)> { + let mut index = from; + while index + 3 < data.len() { + if data[index] == 0 && data[index + 1] == 0 { + if data[index + 2] == 1 { + return Some((index, 3)); + } + if index + 4 < data.len() && data[index + 2] == 0 && data[index + 3] == 1 { + return Some((index, 4)); + } + } + index += 1; + } + None +} + +#[cfg(not(coverage))] +/// Decide whether a bundled HEVC packet is safe after a freshness drop. +/// +/// Inputs: the recovery state and the next outbound bundle. Output: `true` +/// when the bundle should be held back. Why: sending a non-keyframe immediately +/// after dropping older HEVC packets can make the server decode from missing +/// references, which shows up as tearing or grey corrupted frames. +fn should_hold_hevc_bundle_for_keyframe_recovery( + waiting_for_keyframe: bool, + bundle: &UpstreamMediaBundle, +) -> bool { + waiting_for_keyframe + && bundle + .video + .as_ref() + .is_some_and(|video| !contains_hevc_irap(&video.data)) +} + +#[cfg(not(coverage))] +/// Report whether a bundle carries the HEVC recovery frame we were waiting for. +/// +/// Inputs: an outbound media bundle. Output: true when its video packet can +/// restart HEVC prediction. Why: the freshness policy should clear recovery +/// mode as soon as the next clean picture reaches the server. +fn bundle_has_hevc_recovery_keyframe(bundle: &UpstreamMediaBundle) -> bool { + bundle + .video + .as_ref() + .is_some_and(|video| contains_hevc_irap(&video.data)) +} + +#[cfg(not(coverage))] +/// Resolve whether the active upstream camera codec needs HEVC recovery. +/// +/// Inputs: the negotiated camera config plus optional env fallback. Output: +/// `true` only for HEVC/H.265. Why: MJPEG frames are independent, so the extra +/// keyframe wait should only apply to predictive video transport. +fn upstream_camera_uses_hevc(camera_cfg: Option) -> bool { + if camera_cfg.is_some_and(|cfg| matches!(cfg.codec, crate::input::camera::CameraCodec::Hevc)) { + return true; + } + std::env::var("LESAVKA_CAM_CODEC") + .ok() + .map(|value| value.trim().to_ascii_lowercase()) + .is_some_and(|value| matches!(value.as_str(), "hevc" | "h265" | "h.265")) +} diff --git a/client/src/app/uplink_media/webcam_media_loop.rs b/client/src/app/uplink_media/webcam_media_loop.rs index d09fd10..252bea5 100644 --- a/client/src/app/uplink_media/webcam_media_loop.rs +++ b/client/src/app/uplink_media/webcam_media_loop.rs @@ -16,6 +16,7 @@ impl LesavkaClientApp { ) { let mut delay = Duration::from_secs(1); let mut startup_epoch_heal_delay = upstream_epoch_auto_heal_delay(); + let recover_hevc_after_drops = upstream_camera_uses_hevc(camera_cfg); static FAIL_CNT: AtomicUsize = AtomicUsize::new(0); loop { @@ -123,6 +124,7 @@ impl LesavkaClientApp { let microphone_telemetry_stream = microphone_telemetry.clone(); let drop_log_stream = Arc::clone(&drop_log); let outbound = async_stream::stream! { + let mut waiting_for_hevc_keyframe = false; loop { let next = queue_stream.pop_fresh().await; if next.dropped_stale > 0 { @@ -135,8 +137,31 @@ impl LesavkaClientApp { next.queue_depth, duration_ms(next.delivery_age), ); + if recover_hevc_after_drops { + waiting_for_hevc_keyframe = true; + } } if let Some(mut bundle) = next.packet { + if recover_hevc_after_drops + && should_hold_hevc_bundle_for_keyframe_recovery( + waiting_for_hevc_keyframe, + &bundle, + ) + { + camera_telemetry_stream.record_stale_drop(1); + microphone_telemetry_stream.record_stale_drop(1); + log_uplink_drop( + &drop_log_stream, + UplinkDropReason::Stale, + 1, + next.queue_depth, + duration_ms(next.delivery_age), + ); + continue; + } + if recover_hevc_after_drops && bundle_has_hevc_recovery_keyframe(&bundle) { + waiting_for_hevc_keyframe = false; + } let queue_depth = queue_depth_u32(next.queue_depth); let delivery_age_ms = duration_ms(next.delivery_age); if bundle.video.is_some() { @@ -175,6 +200,7 @@ impl LesavkaClientApp { let active_camera_source = active_camera_source.clone(); let active_camera_profile = active_camera_profile.clone(); std::thread::spawn(move || { + let mut waiting_for_hevc_keyframe = false; while !stop.load(Ordering::Relaxed) { let state = media_controls.refresh(); let desired_source = @@ -190,10 +216,27 @@ impl LesavkaClientApp { break; } if let Some(mut pkt) = camera.pull() { + if recover_hevc_after_drops + && waiting_for_hevc_keyframe + && !contains_hevc_irap(&pkt.data) + { + continue; + } let _ = stamp_video_timing_metadata_at_enqueue(&mut pkt); + let is_recovery_keyframe = recover_hevc_after_drops + && contains_hevc_irap(&pkt.data); match event_tx.try_send(BundledCaptureEvent::Video(pkt)) { - Ok(()) => {} - Err(std::sync::mpsc::TrySendError::Full(_)) => continue, + Ok(()) => { + if is_recovery_keyframe { + waiting_for_hevc_keyframe = false; + } + } + Err(std::sync::mpsc::TrySendError::Full(_)) => { + if recover_hevc_after_drops { + waiting_for_hevc_keyframe = true; + } + continue; + } Err(std::sync::mpsc::TrySendError::Disconnected(_)) => break, } } diff --git a/client/src/input/camera.rs b/client/src/input/camera.rs index 4fc169a..f285514 100644 --- a/client/src/input/camera.rs +++ b/client/src/input/camera.rs @@ -66,7 +66,8 @@ include!("camera/bus_and_encoder.rs"); #[cfg(test)] mod tests { use super::{ - CameraCapture, CameraCodec, CameraConfig, resolved_capture_profile, resolved_output_profile, + CameraCapture, CameraCodec, CameraConfig, hevc_keyframe_interval, resolved_capture_profile, + resolved_output_profile, }; use serial_test::serial; @@ -194,6 +195,31 @@ mod tests { }); } + #[test] + #[serial] + /// HEVC should recover quickly after freshness drops without changing H.264 knobs. + fn hevc_keyframe_interval_defaults_short_and_honors_overrides() { + temp_env::with_vars( + [ + ("LESAVKA_CAM_KEYFRAME_INTERVAL", None::<&str>), + ("LESAVKA_CAM_HEVC_KEYFRAME_INTERVAL", None::<&str>), + ], + || { + assert_eq!(hevc_keyframe_interval(30), 3); + assert_eq!(hevc_keyframe_interval(2), 2); + }, + ); + temp_env::with_vars( + [ + ("LESAVKA_CAM_KEYFRAME_INTERVAL", Some("5")), + ("LESAVKA_CAM_HEVC_KEYFRAME_INTERVAL", Some("1")), + ], + || { + assert_eq!(hevc_keyframe_interval(30), 1); + }, + ); + } + #[cfg(coverage)] #[test] /// Coverage builds use a deterministic HEVC encoder choice. diff --git a/client/src/input/camera/capture_pipeline.rs b/client/src/input/camera/capture_pipeline.rs index 5dc56b6..d0bd4d8 100644 --- a/client/src/input/camera/capture_pipeline.rs +++ b/client/src/input/camera/capture_pipeline.rs @@ -62,7 +62,11 @@ impl CameraCapture { let capture_profile = capture_profile_override.unwrap_or_else(|| resolved_capture_profile(cfg)); let (capture_width, capture_height, capture_fps) = capture_profile; let (width, height, fps) = resolved_output_profile(cfg, capture_profile); - let keyframe_interval = env_u32("LESAVKA_CAM_KEYFRAME_INTERVAL", fps.min(5)).clamp(1, fps); + let keyframe_interval = if output_hevc { + hevc_keyframe_interval(fps) + } else { + env_u32("LESAVKA_CAM_KEYFRAME_INTERVAL", fps.min(5)).clamp(1, fps) + }; let source_profile = camera_source_profile(allow_mjpg_source); let use_mjpg_source = source_profile == CameraSourceProfile::Mjpeg; let passthrough_mjpg_source = @@ -346,6 +350,18 @@ fn env_flag_enabled(name: &str) -> bool { }) } +/// Choose the live HEVC keyframe cadence. +/// +/// Inputs: target FPS plus optional `LESAVKA_CAM_KEYFRAME_INTERVAL` and +/// `LESAVKA_CAM_HEVC_KEYFRAME_INTERVAL` overrides. Output: GOP length in +/// frames. Why: the uplink intentionally drops stale HEVC bundles for +/// freshness, so short GOPs keep decoder recovery below a human-visible blink. +fn hevc_keyframe_interval(fps: u32) -> u32 { + let fps = fps.max(1); + let generic_default = env_u32("LESAVKA_CAM_KEYFRAME_INTERVAL", fps.min(3)); + env_u32("LESAVKA_CAM_HEVC_KEYFRAME_INTERVAL", generic_default).clamp(1, fps) +} + /// Keeps `log_camera_first_packet` explicit because it sits on camera selection, where negotiated profiles must match the server output contract. /// Inputs are the typed parameters; output is the return value or side effect. fn log_camera_first_packet(packet_index: u64, bytes: usize, pts_us: u64) { diff --git a/client/src/sync_probe/capture/video_packets.rs b/client/src/sync_probe/capture/video_packets.rs index e40a328..b2b9495 100644 --- a/client/src/sync_probe/capture/video_packets.rs +++ b/client/src/sync_probe/capture/video_packets.rs @@ -264,7 +264,7 @@ fn pick_hevc_encoder(fps: u32) -> Result { /// real webcam uplink; a one-second GOP made coded flashes less representative /// than Lesavka's default live-call HEVC pipeline. fn low_latency_hevc_keyframe_interval(fps: u32) -> u32 { - fps.clamp(1, 5) + fps.clamp(1, 3) } /// Select the visual signature for a video timestamp. @@ -406,9 +406,9 @@ mod tests { fn low_latency_hevc_keyframe_interval_matches_live_camera_default() { assert_eq!(super::low_latency_hevc_keyframe_interval(0), 1); assert_eq!(super::low_latency_hevc_keyframe_interval(1), 1); - assert_eq!(super::low_latency_hevc_keyframe_interval(5), 5); - assert_eq!(super::low_latency_hevc_keyframe_interval(20), 5); - assert_eq!(super::low_latency_hevc_keyframe_interval(30), 5); + assert_eq!(super::low_latency_hevc_keyframe_interval(5), 3); + assert_eq!(super::low_latency_hevc_keyframe_interval(20), 3); + assert_eq!(super::low_latency_hevc_keyframe_interval(30), 3); } /// Verifies encoded packet timestamps come from the encoder output sample. diff --git a/common/Cargo.toml b/common/Cargo.toml index 3020764..340a50c 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.21.11" +version = "0.21.12" edition = "2024" build = "build.rs" diff --git a/server/Cargo.toml b/server/Cargo.toml index fde60ef..bdf8490 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.21.11" +version = "0.21.12" edition = "2024" autobins = false diff --git a/server/src/bin/lesavka-uvc.real.inc b/server/src/bin/lesavka-uvc.real.inc index 6f579e5..2722e93 100644 --- a/server/src/bin/lesavka-uvc.real.inc +++ b/server/src/bin/lesavka-uvc.real.inc @@ -45,7 +45,7 @@ const V4L2_BUF_TYPE_VIDEO_OUTPUT: u32 = 2; const V4L2_MEMORY_MMAP: u32 = 1; const V4L2_FIELD_NONE: u32 = 1; const V4L2_PIX_FMT_MJPEG: u32 = u32::from_le_bytes(*b"MJPG"); -const MAX_MJPEG_FRAME_BYTES: usize = 1024 * 1024; +const MAX_MJPEG_FRAME_BYTES: usize = 8 * 1024 * 1024; const EMPTY_MJPEG_FRAME: &[u8] = &[0xff, 0xd8, 0xff, 0xd9]; const DEFAULT_UVC_BUFFER_COUNT: u32 = 2; const DEFAULT_UVC_IDLE_PUMP_MS: u64 = 2; @@ -400,10 +400,11 @@ impl UvcVideoStream { let Some(buffer) = self.buffers.get(index as usize) else { return Ok(()); }; - let bytes = self.latest_frame.len().min(buffer.len); + let frame = self.frame_for_buffer(buffer.len); + let bytes = frame.len(); if bytes > 0 { unsafe { - std::ptr::copy_nonoverlapping(self.latest_frame.as_ptr(), buffer.ptr, bytes); + std::ptr::copy_nonoverlapping(frame.as_ptr(), buffer.ptr, bytes); } } else { unsafe { @@ -428,8 +429,9 @@ impl UvcVideoStream { if stale && looks_like_mjpeg_frame(&self.latest_frame) { return; } + let max_frame_bytes = self.frame_payload_limit(); if let Ok(frame) = std::fs::read(&self.frame_path) - && frame.len() <= MAX_MJPEG_FRAME_BYTES + && frame.len() <= max_frame_bytes && looks_like_mjpeg_frame(&frame) { self.latest_frame = frame; @@ -437,6 +439,22 @@ impl UvcVideoStream { self.latest_frame = EMPTY_MJPEG_FRAME.to_vec(); } } + + fn frame_payload_limit(&self) -> usize { + self.buffers + .iter() + .map(|buffer| buffer.len) + .min() + .unwrap_or(MAX_MJPEG_FRAME_BYTES) + } + + fn frame_for_buffer(&self, buffer_len: usize) -> &[u8] { + if self.latest_frame.len() <= buffer_len && looks_like_mjpeg_frame(&self.latest_frame) { + &self.latest_frame + } else { + EMPTY_MJPEG_FRAME + } + } } impl Drop for UvcVideoStream { diff --git a/testing/tests/server_uvc_binary_extra_contract.rs b/testing/tests/server_uvc_binary_extra_contract.rs index 8399e87..8fbc486 100644 --- a/testing/tests/server_uvc_binary_extra_contract.rs +++ b/testing/tests/server_uvc_binary_extra_contract.rs @@ -400,6 +400,29 @@ mod uvc_binary_extra { )); } + #[test] + fn uvc_frame_refresh_rejects_frames_that_would_be_truncated() { + let frame = NamedTempFile::new().expect("tmp frame"); + let mut stream = UvcVideoStream::new(-1); + stream.frame_path = frame.path().to_path_buf(); + stream.latest_frame = vec![0xff, 0xd8, 0x11, 0xff, 0xd9]; + stream.buffers.push(MmapBuffer { + ptr: std::ptr::null_mut(), + len: 8, + }); + + fs::write( + frame.path(), + [0xff, 0xd8, 1, 2, 3, 4, 5, 6, 7, 8, 0xff, 0xd9], + ) + .expect("write oversize frame"); + stream.refresh_latest_frame(); + + assert_eq!(stream.latest_frame, vec![0xff, 0xd8, 0x11, 0xff, 0xd9]); + assert_eq!(stream.frame_payload_limit(), 8); + assert_eq!(stream.frame_for_buffer(4), EMPTY_MJPEG_FRAME); + } + #[test] fn compute_payload_cap_clamps_limit_pct_bounds() { with_var("LESAVKA_UVC_MAXPAYLOAD_LIMIT", None::<&str>, || {