diff --git a/Cargo.lock b/Cargo.lock index 4c1e394..de0141a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.21.12" +version = "0.21.13" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.21.12" +version = "0.21.13" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.21.12" +version = "0.21.13" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index 20464c3..6008613 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.21.12" +version = "0.21.13" edition = "2024" [dependencies] diff --git a/common/Cargo.toml b/common/Cargo.toml index 340a50c..8c3a988 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.21.12" +version = "0.21.13" edition = "2024" build = "build.rs" diff --git a/scripts/install/server.sh b/scripts/install/server.sh index b91f3e4..7347213 100755 --- a/scripts/install/server.sh +++ b/scripts/install/server.sh @@ -1148,6 +1148,10 @@ fi printf 'LESAVKA_UPSTREAM_PAIR_SLACK_US=%s\n' "${LESAVKA_UPSTREAM_PAIR_SLACK_US:-80000}" printf 'LESAVKA_UPSTREAM_AUDIO_MASTER_WAIT_GRACE_MS=%s\n' "${LESAVKA_UPSTREAM_AUDIO_MASTER_WAIT_GRACE_MS:-350}" printf 'LESAVKA_UPSTREAM_STALE_DROP_MS=%s\n' "${LESAVKA_UPSTREAM_STALE_DROP_MS:-80}" + printf 'LESAVKA_UVC_HEVC_JPEG_QUALITY=%s\n' "${LESAVKA_UVC_HEVC_JPEG_QUALITY:-82}" + printf 'LESAVKA_UVC_HEVC_FREEZE_ON_SIZE_DROP=%s\n' "${LESAVKA_UVC_HEVC_FREEZE_ON_SIZE_DROP:-1}" + printf 'LESAVKA_UVC_HEVC_SIZE_DROP_PCT=%s\n' "${LESAVKA_UVC_HEVC_SIZE_DROP_PCT:-45}" + printf 'LESAVKA_UVC_HEVC_MIN_REFERENCE_BYTES=%s\n' "${LESAVKA_UVC_HEVC_MIN_REFERENCE_BYTES:-65536}" printf 'LESAVKA_SERVER_BIND_ADDR=%s\n' "${INSTALL_SERVER_BIND_ADDR}" printf 'LESAVKA_UVC_CODEC=%s\n' "${INSTALL_UVC_CODEC}" printf 'LESAVKA_UVC_WIDTH=%s\n' "${LESAVKA_UVC_WIDTH:-1280}" diff --git a/server/Cargo.toml b/server/Cargo.toml index bdf8490..f80d839 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.21.12" +version = "0.21.13" edition = "2024" autobins = false diff --git a/server/src/main/relay_service.rs b/server/src/main/relay_service.rs index f51d212..c3ecd65 100644 --- a/server/src/main/relay_service.rs +++ b/server/src/main/relay_service.rs @@ -172,6 +172,41 @@ fn media_v2_drop_late_plan(plan: &PlannedUpstreamPacket) -> bool { plan.late_by >= media_v2_max_live_age() } +#[cfg(not(coverage))] +/// Decide whether server-side HEVC recovery should hold a video packet. +/// +/// Inputs: current recovery state, camera codec, and candidate video packet. +/// Output: true when the packet should not be handed to the decoder yet. Why: +/// server freshness drops can create the same HEVC reference gap as client +/// queue drops, and feeding the next delta frame produces block tearing. +fn media_v2_should_hold_hevc_video_for_recovery( + waiting_for_keyframe: bool, + codec: camera::CameraCodec, + video: Option<&VideoPacket>, +) -> bool { + waiting_for_keyframe + && matches!(codec, camera::CameraCodec::Hevc) + && video.is_some_and(|packet| { + !lesavka_server::video_support::contains_hevc_irap(&packet.data) + }) +} + +#[cfg(not(coverage))] +/// Report whether a video packet can repair a server-side HEVC reference gap. +/// +/// Inputs: camera codec and candidate video packet. Output: true for HEVC IRAP +/// frames. Why: after the server drops a late HEVC packet for freshness, this +/// tells the relay when it is safe to resume motion without visible corruption. +fn media_v2_has_hevc_recovery_keyframe( + codec: camera::CameraCodec, + video: Option<&VideoPacket>, +) -> bool { + matches!(codec, camera::CameraCodec::Hevc) + && video.is_some_and(|packet| { + lesavka_server::video_support::contains_hevc_irap(&packet.data) + }) +} + #[cfg(not(coverage))] /// Keeps `sleep_until_media_v2` explicit because it sits on relay RPC orchestration, where hardware failures must surface without stopping the server. /// Inputs are the typed parameters; output is the return value or side effect. diff --git a/server/src/main/relay_service/upstream_media_rpc.rs b/server/src/main/relay_service/upstream_media_rpc.rs index 325280c..b268c7e 100644 --- a/server/src/main/relay_service/upstream_media_rpc.rs +++ b/server/src/main/relay_service/upstream_media_rpc.rs @@ -57,6 +57,7 @@ impl Handler { let mut inbound = req.into_inner(); let mut last_bundle_session_id = None; let mut last_bundle_seq = None; + let mut waiting_for_hevc_keyframe = false; let mut outcome = "aborted"; let (audio_handoff_tx, audio_handoff_rx) = tokio::sync::mpsc::channel::(32); @@ -123,6 +124,9 @@ impl Handler { continue; }; if facts.has_audio && facts.has_video && facts.capture_span_us > MEDIA_V2_MAX_MIXED_CAPTURE_SPAN_US { + if matches!(camera_cfg.codec, camera::CameraCodec::Hevc) && facts.has_video { + waiting_for_hevc_keyframe = true; + } warn!( rpc_id, session_id = camera_lease.session_id, @@ -148,6 +152,9 @@ impl Handler { let (video_offset_us, audio_offset_us) = upstream_media_rt.playout_offsets(); let Some(schedule) = media_v2_handoff_schedule(facts, audio_offset_us, video_offset_us) else { if facts.has_video { + if matches!(camera_cfg.codec, camera::CameraCodec::Hevc) { + waiting_for_hevc_keyframe = true; + } upstream_media_rt.record_video_freeze( "v2 dropped stale bundled A/V before UVC/UAC handoff", ); @@ -198,25 +205,48 @@ impl Handler { ); break; } - if schedule.video_due_at.is_some() - && let Some(scheduled_video) = prepare_media_v2_video( + let hold_video_for_hevc_recovery = + media_v2_should_hold_hevc_video_for_recovery( + waiting_for_hevc_keyframe, + camera_cfg.codec, + bundle.video.as_ref(), + ); + if hold_video_for_hevc_recovery { + upstream_media_rt.record_video_freeze( + "v2 held HEVC delta frame until next recovery keyframe", + ); + bundle.video.take(); + } + let video_recovers_hevc_gap = + media_v2_has_hevc_recovery_keyframe(camera_cfg.codec, bundle.video.as_ref()); + let scheduled_video = if schedule.video_due_at.is_some() { + prepare_media_v2_video( bundle.video.take(), &upstream_media_rt, bundle_base_remote_pts_us, bundle_epoch, frame_step_us, ) - && video_handoff_tx - .send(scheduled_video) - .await - .is_err() + } else { + None + }; + if let Some(scheduled_video) = scheduled_video { + if video_handoff_tx.send(scheduled_video).await.is_err() { + warn!( + rpc_id, + session_id = camera_lease.session_id, + "📦 v2 video handoff worker stopped while receiving bundled media" + ); + break; + } + if video_recovers_hevc_gap { + waiting_for_hevc_keyframe = false; + } + } else if matches!(camera_cfg.codec, camera::CameraCodec::Hevc) + && facts.has_video + && !hold_video_for_hevc_recovery { - warn!( - rpc_id, - session_id = camera_lease.session_id, - "📦 v2 video handoff worker stopped while receiving bundled media" - ); - break; + waiting_for_hevc_keyframe = true; } } diff --git a/server/src/main/relay_service_tests.rs b/server/src/main/relay_service_tests.rs index 10dd1d0..94d58fe 100644 --- a/server/src/main/relay_service_tests.rs +++ b/server/src/main/relay_service_tests.rs @@ -2,10 +2,13 @@ #[allow(clippy::items_after_test_module)] mod tests { use super::{ - MediaV2BundleFacts, UpstreamStreamCleanup, media_v2_handoff_schedule, - media_v2_frame_step_us, prepare_media_v2_audio, prepare_media_v2_video, - retain_freshest_audio_packet, retain_freshest_video_packet, summarize_media_v2_bundle, + MediaV2BundleFacts, UpstreamStreamCleanup, media_v2_frame_step_us, + media_v2_handoff_schedule, media_v2_has_hevc_recovery_keyframe, + media_v2_should_hold_hevc_video_for_recovery, prepare_media_v2_audio, + prepare_media_v2_video, retain_freshest_audio_packet, retain_freshest_video_packet, + summarize_media_v2_bundle, }; + use crate::camera::CameraCodec; use lesavka_common::lesavka::{AudioPacket, UpstreamMediaBundle, VideoPacket}; use lesavka_server::upstream_media_runtime::{ UpstreamClientTiming, UpstreamMediaKind, UpstreamMediaRuntime, @@ -176,6 +179,39 @@ mod tests { assert!(media_v2_handoff_schedule(facts, 0, 0).is_none()); } + #[test] + /// Keeps server HEVC drop recovery explicit because late-drop freshness can otherwise corrupt decoded video. + fn media_v2_hevc_recovery_holds_delta_until_keyframe() { + let delta = VideoPacket { + data: vec![0, 0, 0, 1, 0x02, 0x01, 0xaa], + ..Default::default() + }; + let keyframe = VideoPacket { + data: vec![0, 0, 0, 1, 0x26, 0x01, 0xaa], + ..Default::default() + }; + + assert!(media_v2_should_hold_hevc_video_for_recovery( + true, + CameraCodec::Hevc, + Some(&delta), + )); + assert!(!media_v2_should_hold_hevc_video_for_recovery( + true, + CameraCodec::Hevc, + Some(&keyframe), + )); + assert!(!media_v2_should_hold_hevc_video_for_recovery( + true, + CameraCodec::Mjpeg, + Some(&delta), + )); + assert!(media_v2_has_hevc_recovery_keyframe( + CameraCodec::Hevc, + Some(&keyframe), + )); + } + #[test] /// Keeps `media_v2_preparation_anchors_audio_and_video_to_one_capture_epoch` explicit because the bundled path must not let network receive cadence become video playout cadence. /// Inputs are one bundle's client capture PTS values; output proves audio diff --git a/server/src/video_sinks.rs b/server/src/video_sinks.rs index c9a9f23..46dd26d 100644 --- a/server/src/video_sinks.rs +++ b/server/src/video_sinks.rs @@ -1,4 +1,5 @@ // Camera sink pipelines for UVC webcam output and HDMI capture adapters. +mod hevc_mjpeg_guard; include!("video_sinks/webcam_sink.rs"); include!("video_sinks/hdmi_sink.rs"); include!("video_sinks/camera_relay.rs"); diff --git a/server/src/video_sinks/hevc_mjpeg_guard.rs b/server/src/video_sinks/hevc_mjpeg_guard.rs new file mode 100644 index 0000000..70ccc7e --- /dev/null +++ b/server/src/video_sinks/hevc_mjpeg_guard.rs @@ -0,0 +1,120 @@ +use crate::video_support::env_u32; + +const DEFAULT_HEVC_JPEG_QUALITY: u32 = 82; +const DEFAULT_HEVC_SIZE_DROP_PCT: u32 = 45; +const DEFAULT_HEVC_MIN_REFERENCE_BYTES: u32 = 64 * 1024; + +/// Resolve the JPEG quality used after HEVC decode. +/// +/// Inputs: optional `LESAVKA_UVC_HEVC_JPEG_QUALITY`, clamped to 1..=100. +/// Output: the `jpegenc` quality value. Why: HEVC ingress must become MJPEG +/// for the existing UVC gadget path, and a slightly smaller JPEG lowers USB +/// and browser pressure without changing the calibrated A/V timing model. +pub(super) fn hevc_jpeg_quality() -> u32 { + env_u32("LESAVKA_UVC_HEVC_JPEG_QUALITY", DEFAULT_HEVC_JPEG_QUALITY).clamp(1, 100) +} + +/// Decide whether suspicious decoded-frame freezing is enabled. +/// +/// Inputs: optional `LESAVKA_UVC_HEVC_FREEZE_ON_SIZE_DROP`. Output: true unless +/// explicitly disabled. Why: when one damaged decoded MJPEG frame appears, a +/// short freeze is less disruptive than showing a grey slab or torn half-frame. +pub(super) fn freeze_on_size_drop_enabled() -> bool { + std::env::var("LESAVKA_UVC_HEVC_FREEZE_ON_SIZE_DROP") + .ok() + .map(|value| { + let trimmed = value.trim(); + !(trimmed.eq_ignore_ascii_case("0") + || trimmed.eq_ignore_ascii_case("false") + || trimmed.eq_ignore_ascii_case("no") + || trimmed.eq_ignore_ascii_case("off")) + }) + .unwrap_or(true) +} + +/// Resolve the frame-size drop percentage that triggers a freeze. +/// +/// Inputs: optional `LESAVKA_UVC_HEVC_SIZE_DROP_PCT`, clamped to 1..=95. +/// Output: next-frame size percentage of the last good frame. Why: the guard +/// should catch sudden damaged-frame collapses while still allowing normal +/// bitrate variation from scene motion and encoder decisions. +pub(super) fn size_drop_pct() -> u32 { + env_u32("LESAVKA_UVC_HEVC_SIZE_DROP_PCT", DEFAULT_HEVC_SIZE_DROP_PCT).clamp(1, 95) +} + +/// Resolve the minimum reference frame size for collapse detection. +/// +/// Inputs: optional `LESAVKA_UVC_HEVC_MIN_REFERENCE_BYTES`. Output: byte count. +/// Why: tiny synthetic or blank frames should not establish a baseline that +/// causes later healthy low-detail frames to be frozen. +pub(super) fn min_reference_bytes() -> u32 { + env_u32( + "LESAVKA_UVC_HEVC_MIN_REFERENCE_BYTES", + DEFAULT_HEVC_MIN_REFERENCE_BYTES, + ) + .max(1) +} + +/// Decide whether a decoded HEVC-to-MJPEG frame should be frozen out. +/// +/// Inputs: byte length of the last successfully spooled decoded MJPEG and the +/// next decoded MJPEG. Output: true when the next frame looks like a damaged +/// collapse. Why: keeping the last good frame preserves freshness and sync +/// better than forwarding a syntactically valid but visually corrupted JPEG. +pub(super) fn should_freeze_decoded_mjpeg(previous_bytes: u64, next_bytes: usize) -> bool { + if !freeze_on_size_drop_enabled() || previous_bytes < u64::from(min_reference_bytes()) { + return false; + } + + let next_bytes = next_bytes as u64; + let threshold_bytes = previous_bytes.saturating_mul(u64::from(size_drop_pct())) / 100; + next_bytes < threshold_bytes +} + +#[cfg(test)] +mod tests { + #[test] + fn hevc_jpeg_quality_defaults_to_moderate_transport_pressure() { + temp_env::with_var_unset("LESAVKA_UVC_HEVC_JPEG_QUALITY", || { + assert_eq!(super::hevc_jpeg_quality(), 82); + }); + + temp_env::with_var("LESAVKA_UVC_HEVC_JPEG_QUALITY", Some("101"), || { + assert_eq!(super::hevc_jpeg_quality(), 100); + }); + + temp_env::with_var("LESAVKA_UVC_HEVC_JPEG_QUALITY", Some("0"), || { + assert_eq!(super::hevc_jpeg_quality(), 1); + }); + } + + #[test] + fn freeze_guard_catches_large_decoded_frame_collapses() { + temp_env::with_vars( + [ + ("LESAVKA_UVC_HEVC_FREEZE_ON_SIZE_DROP", Some("1")), + ("LESAVKA_UVC_HEVC_SIZE_DROP_PCT", Some("45")), + ("LESAVKA_UVC_HEVC_MIN_REFERENCE_BYTES", Some("65536")), + ], + || { + assert!(super::should_freeze_decoded_mjpeg(200_000, 80_000)); + assert!(!super::should_freeze_decoded_mjpeg(200_000, 110_000)); + assert!(!super::should_freeze_decoded_mjpeg(20_000, 1_000)); + }, + ); + } + + #[test] + fn freeze_guard_can_be_disabled_for_diagnostics() { + temp_env::with_vars( + [ + ("LESAVKA_UVC_HEVC_FREEZE_ON_SIZE_DROP", Some("0")), + ("LESAVKA_UVC_HEVC_SIZE_DROP_PCT", Some("95")), + ("LESAVKA_UVC_HEVC_MIN_REFERENCE_BYTES", Some("1")), + ], + || { + assert!(!super::should_freeze_decoded_mjpeg(200_000, 1_000)); + }, + ); + } +} diff --git a/server/src/video_sinks/webcam_sink.rs b/server/src/video_sinks/webcam_sink.rs index 517f08a..afebac1 100644 --- a/server/src/video_sinks/webcam_sink.rs +++ b/server/src/video_sinks/webcam_sink.rs @@ -34,6 +34,7 @@ pub struct WebcamSink { frame_step_us: u64, mjpeg_spool_path: Option, decoded_mjpeg_sink: Option, + last_decoded_mjpeg_bytes: AtomicU64, } fn uvc_sink_session_clock_align_enabled() -> bool { @@ -105,6 +106,7 @@ impl WebcamSink { frame_step_us, mjpeg_spool_path: None, decoded_mjpeg_sink: None, + last_decoded_mjpeg_bytes: AtomicU64::new(0), }) } @@ -216,11 +218,7 @@ impl WebcamSink { .with_context(|| format!("building HEVC decoder element {decoder_name}"))?; let convert = gst::ElementFactory::make("videoconvert").build()?; let encoder = gst::ElementFactory::make("jpegenc") - .property( - "quality", - crate::video_support::env_u32("LESAVKA_UVC_HEVC_JPEG_QUALITY", 90) - .clamp(1, 100) as i32, - ) + .property("quality", hevc_mjpeg_guard::hevc_jpeg_quality() as i32) .build()?; let caps = gst::ElementFactory::make("capsfilter") .property("caps", &caps_mjpeg) @@ -354,6 +352,7 @@ impl WebcamSink { frame_step_us, mjpeg_spool_path: mjpeg_spool_file, decoded_mjpeg_sink, + last_decoded_mjpeg_bytes: AtomicU64::new(0), }) } @@ -407,8 +406,24 @@ impl WebcamSink { { let decoded_pts_us = buffer.pts().map(|pts| pts.nseconds() / 1_000); let timing = MjpegSpoolTiming::hevc_decoded_mjpeg(pkt.pts, decoded_pts_us); + let previous_bytes = self + .last_decoded_mjpeg_bytes + .load(std::sync::atomic::Ordering::Relaxed); + let decoded_bytes = map.as_slice().len(); + if hevc_mjpeg_guard::should_freeze_decoded_mjpeg(previous_bytes, decoded_bytes) { + warn!( + target:"lesavka_server::video", + previous_bytes, + next_bytes = decoded_bytes, + "📸⚠️ freezing suspicious decoded HEVC->MJPEG frame" + ); + return; + } if let Err(err) = spool_mjpeg_frame_with_timing(path, map.as_slice(), Some(timing)) { warn!(target:"lesavka_server::video", %err, "📸⚠️ failed to spool decoded HEVC frame for UVC helper"); + } else { + self.last_decoded_mjpeg_bytes + .store(decoded_bytes as u64, std::sync::atomic::Ordering::Relaxed); } } } diff --git a/server/src/video_support.rs b/server/src/video_support.rs index 14a0362..d9b4e2d 100644 --- a/server/src/video_support.rs +++ b/server/src/video_support.rs @@ -147,6 +147,38 @@ pub fn contains_idr(h264: &[u8]) -> bool { false } +/// Detect whether an HEVC access unit contains an intra recovery point. +/// +/// Inputs: one Annex-B encoded HEVC access unit. Output: `true` when the access +/// unit carries an IRAP NAL. Why: freshness-first upstream media can drop HEVC +/// packets before server decode; after that gap, the decoder must resume from a +/// clean picture instead of a predictive frame with missing references. +#[must_use] +pub fn contains_hevc_irap(hevc: &[u8]) -> bool { + let mut index = 0; + while index + 4 < hevc.len() { + if hevc[index] == 0 && hevc[index + 1] == 0 { + let offset = if hevc[index + 2] == 1 { + 3 + } else if hevc[index + 2] == 0 && hevc[index + 3] == 1 { + 4 + } else { + index += 1; + continue; + }; + let nal_index = index + offset; + if nal_index + 1 < hevc.len() { + let nal_type = (hevc[nal_index] >> 1) & 0x3f; + if (16..=23).contains(&nal_type) { + return true; + } + } + } + index += 1; + } + false +} + /// Compute the next adaptive eye-stream FPS after one reporting window. /// /// Inputs: the current FPS plus the target/min bounds and the sent/dropped @@ -226,8 +258,8 @@ pub fn reserve_local_pts(counter: &AtomicU64, preferred_pts_us: u64, frame_step_ #[cfg(test)] mod tests { use super::{ - adjust_effective_fps, contains_idr, default_eye_fps, env_u32, env_usize, next_local_pts, - reserve_local_pts, should_send_frame, + adjust_effective_fps, contains_hevc_irap, contains_idr, default_eye_fps, env_u32, + env_usize, next_local_pts, reserve_local_pts, should_send_frame, }; use serial_test::serial; use std::sync::atomic::AtomicU64; @@ -248,6 +280,13 @@ mod tests { assert!(!contains_idr(&[0, 0, 0, 1, 0x41, 0x99])); } + #[test] + fn contains_hevc_irap_finds_annex_b_recovery_frames() { + assert!(contains_hevc_irap(&[0, 0, 0, 1, 0x26, 0x01, 0xaa])); + assert!(contains_hevc_irap(&[0, 0, 1, 0x28, 0x01, 0xaa])); + assert!(!contains_hevc_irap(&[0, 0, 0, 1, 0x02, 0x01, 0xaa])); + } + #[test] fn adjust_effective_fps_reacts_to_drop_windows() { assert_eq!(adjust_effective_fps(20, 12, 25, 5, 10), 17);