diff --git a/Cargo.lock b/Cargo.lock index 006044b..573cd53 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.22.45" +version = "0.22.46" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.22.45" +version = "0.22.46" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.22.45" +version = "0.22.46" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index abec7f2..a438f9a 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.22.45" +version = "0.22.46" edition = "2024" [dependencies] diff --git a/common/Cargo.toml b/common/Cargo.toml index d3b8086..20fb27e 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.22.45" +version = "0.22.46" edition = "2024" build = "build.rs" diff --git a/docs/operational-env.md b/docs/operational-env.md index 7fd52f1..cb41361 100644 --- a/docs/operational-env.md +++ b/docs/operational-env.md @@ -612,6 +612,7 @@ These entries are intentionally concise because most are manual lab or CI harnes | `LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY` | server direct-MJPEG normalization JPEG quality; defaults to `72` to reduce browser-facing UVC bitstream pressure | | `LESAVKA_UVC_DIRECT_MJPEG_MIN_REFERENCE_BYTES` | server direct-MJPEG guard baseline; frames smaller than this do not establish the last-good reference | | `LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE` | server direct-MJPEG normalization toggle; defaults on so camera MJPEG is decoded/re-encoded before the UVC helper | +| `LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT` | server direct-MJPEG normalization recovery threshold; after this many consecutive empty pulls, the session falls back to guarded passthrough | | `LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS` | server direct-MJPEG normalization appsink timeout; defaults to `25`ms and is capped at `50`ms to avoid live backlog | | `LESAVKA_UVC_DIRECT_MJPEG_SIZE_DROP_PCT` | server direct-MJPEG corruption guard threshold; frames below this percentage of the last good reference are frozen out | | `LESAVKA_UVC_DIRECT_MJPEG_VISUAL_GUARD` | server direct-MJPEG corruption guard toggle; defaults on so obvious collapsed or flat payloads freeze the last good frame | diff --git a/scripts/install/server.sh b/scripts/install/server.sh index cef00d1..313bea6 100755 --- a/scripts/install/server.sh +++ b/scripts/install/server.sh @@ -1611,6 +1611,7 @@ SERVER_ENV_TMP=$(mktemp) printf 'LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE:-1}" printf 'LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY:-72}" printf 'LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS:-25}" + printf 'LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT:-30}" printf 'LESAVKA_UVC_DIRECT_MJPEG_VISUAL_GUARD=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_VISUAL_GUARD:-1}" printf 'LESAVKA_UVC_DIRECT_MJPEG_SIZE_DROP_PCT=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_SIZE_DROP_PCT:-18}" printf 'LESAVKA_UVC_DIRECT_MJPEG_MIN_REFERENCE_BYTES=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_MIN_REFERENCE_BYTES:-49152}" diff --git a/server/Cargo.toml b/server/Cargo.toml index 5134341..236a466 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.22.45" +version = "0.22.46" edition = "2024" autobins = false diff --git a/server/src/video_sinks/hevc_mjpeg_guard.rs b/server/src/video_sinks/hevc_mjpeg_guard.rs index 45acacb..d57adc9 100644 --- a/server/src/video_sinks/hevc_mjpeg_guard.rs +++ b/server/src/video_sinks/hevc_mjpeg_guard.rs @@ -11,6 +11,7 @@ const DEFAULT_DIRECT_MJPEG_PROFILE_MISMATCH_REJECT: bool = false; const DEFAULT_DIRECT_MJPEG_NORMALIZE: bool = true; const DEFAULT_DIRECT_MJPEG_JPEG_QUALITY: u32 = 72; const DEFAULT_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS: u32 = 25; +const DEFAULT_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT: u32 = 30; /// Summarizes one compressed MJPEG frame without fully decoding pixels. #[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] @@ -234,6 +235,20 @@ pub(super) fn direct_mjpeg_normalize_pull_timeout_ms() -> u32 { .min(50) } +/// Bound how many consecutive normalization misses are allowed before bypass. +/// +/// Inputs: optional `LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT`, clamped +/// to 1..=300. Output: miss count. Why: the direct-MJPEG normalizer is an +/// optional sanitizer, not a reason to freeze the conference camera forever if +/// GStreamer negotiation starves on one host. +pub(super) fn direct_mjpeg_normalize_miss_limit() -> u32 { + env_u32( + "LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT", + DEFAULT_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT, + ) + .clamp(1, 300) +} + /// Return whether a decoded buffer looks like one complete JPEG image. /// /// Inputs: decoded MJPEG bytes. Output: true when SOI, SOS, and EOI markers @@ -528,11 +543,16 @@ mod tests { "LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS", None::<&str>, ), + ( + "LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT", + None::<&str>, + ), ], || { assert!(super::direct_mjpeg_normalize_enabled()); assert_eq!(super::direct_mjpeg_jpeg_quality(), 72); assert_eq!(super::direct_mjpeg_normalize_pull_timeout_ms(), 25); + assert_eq!(super::direct_mjpeg_normalize_miss_limit(), 30); }, ); @@ -544,17 +564,30 @@ mod tests { "LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS", Some("999"), ), + ( + "LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT", + Some("999"), + ), ], || { assert!(!super::direct_mjpeg_normalize_enabled()); assert_eq!(super::direct_mjpeg_jpeg_quality(), 100); assert_eq!(super::direct_mjpeg_normalize_pull_timeout_ms(), 50); + assert_eq!(super::direct_mjpeg_normalize_miss_limit(), 300); }, ); temp_env::with_var("LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY", Some("0"), || { assert_eq!(super::direct_mjpeg_jpeg_quality(), 1); }); + + temp_env::with_var( + "LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT", + Some("0"), + || { + assert_eq!(super::direct_mjpeg_normalize_miss_limit(), 1); + }, + ); } #[test] diff --git a/server/src/video_sinks/webcam_sink.rs b/server/src/video_sinks/webcam_sink.rs index f2b2681..d9db098 100644 --- a/server/src/video_sinks/webcam_sink.rs +++ b/server/src/video_sinks/webcam_sink.rs @@ -52,6 +52,7 @@ pub struct WebcamSink { uvc_height: u16, direct_mjpeg_profile_mismatch_seen: AtomicBool, last_decoded_mjpeg_bytes: AtomicU64, + direct_mjpeg_normalize_bypassed: AtomicBool, normalized_mjpeg_miss_count: AtomicU64, decoded_mjpeg_miss_count: AtomicU64, decode_recovery_needs_irap: AtomicBool, @@ -199,7 +200,7 @@ fn build_direct_mjpeg_normalize_branch( pipeline: &gst::Pipeline, width: i32, height: i32, - fps: i32, + _fps: i32, ) -> anyhow::Result<(gst_app::AppSrc, gst_app::AppSink)> { let src = gst::ElementFactory::make("appsrc") .name("direct_mjpeg_normalize_src") @@ -208,22 +209,18 @@ fn build_direct_mjpeg_normalize_branch( .expect("direct MJPEG normalize appsrc"); src.set_is_live(true); src.set_format(gst::Format::Time); - src.set_property("do-timestamp", false); + src.set_property("do-timestamp", true); configure_uvc_appsrc(&src); - let caps_in = gst::Caps::builder("image/jpeg") - .field("framerate", gst::Fraction::new(fps, 1)) - .build(); + let caps_in = gst::Caps::builder("image/jpeg").build(); src.set_caps(Some(&caps_in)); let caps_mjpeg = gst::Caps::builder("image/jpeg") .field("parsed", true) .field("width", width) .field("height", height) - .field("framerate", gst::Fraction::new(fps, 1)) .field("pixel-aspect-ratio", gst::Fraction::new(1, 1)) .field("colorimetry", "2:4:7:1") .build(); - let jpegparse = gst::ElementFactory::make("jpegparse").build()?; let decoder = gst::ElementFactory::make("jpegdec").build()?; let decoded_queue = build_hevc_freshness_queue("direct_mjpeg_normalize_decoded_queue")?; let convert = gst::ElementFactory::make("videoconvert").build()?; @@ -231,7 +228,6 @@ fn build_direct_mjpeg_normalize_branch( let raw_caps = gst::Caps::builder("video/x-raw") .field("width", width) .field("height", height) - .field("framerate", gst::Fraction::new(fps, 1)) .build(); let raw_capsfilter = gst::ElementFactory::make("capsfilter") .property("caps", &raw_caps) @@ -259,7 +255,6 @@ fn build_direct_mjpeg_normalize_branch( pipeline.add_many([ src.upcast_ref(), - &jpegparse, &decoder, &decoded_queue, &convert, @@ -272,7 +267,6 @@ fn build_direct_mjpeg_normalize_branch( ])?; gst::Element::link_many([ src.upcast_ref(), - &jpegparse, &decoder, &decoded_queue, &convert, @@ -507,6 +501,7 @@ impl WebcamSink { uvc_height: cfg.height.min(u32::from(u16::MAX)) as u16, direct_mjpeg_profile_mismatch_seen: AtomicBool::new(false), last_decoded_mjpeg_bytes: AtomicU64::new(0), + direct_mjpeg_normalize_bypassed: AtomicBool::new(false), normalized_mjpeg_miss_count: AtomicU64::new(0), decoded_mjpeg_miss_count: AtomicU64::new(0), decode_recovery_needs_irap: AtomicBool::new(false), @@ -810,6 +805,7 @@ impl WebcamSink { uvc_height: cfg.height.min(u32::from(u16::MAX)) as u16, direct_mjpeg_profile_mismatch_seen: AtomicBool::new(false), last_decoded_mjpeg_bytes: AtomicU64::new(0), + direct_mjpeg_normalize_bypassed: AtomicBool::new(false), normalized_mjpeg_miss_count: AtomicU64::new(0), decoded_mjpeg_miss_count: AtomicU64::new(0), decode_recovery_needs_irap: AtomicBool::new(false), @@ -996,11 +992,21 @@ impl WebcamSink { return; } - if self.direct_mjpeg_appsrc.is_some() && self.normalized_mjpeg_sink.is_some() { + if self.direct_mjpeg_appsrc.is_some() + && self.normalized_mjpeg_sink.is_some() + && !self + .direct_mjpeg_normalize_bypassed + .load(std::sync::atomic::Ordering::Relaxed) + { self.spool_normalized_direct_mjpeg_frame(path, pkt); return; } + self.spool_passthrough_direct_mjpeg_frame(path, pkt); + } + + #[cfg(not(coverage))] + fn spool_passthrough_direct_mjpeg_frame(&self, path: &Path, pkt: &VideoPacket) { let timing = MjpegSpoolTiming::mjpeg_passthrough(pkt.pts); if let Err(err) = spool_mjpeg_frame_with_timing(path, &pkt.data, Some(timing)) { warn!(target:"lesavka_server::video", %err, "📸⚠️ failed to spool MJPEG frame for UVC helper"); @@ -1013,25 +1019,24 @@ impl WebcamSink { #[cfg(not(coverage))] fn spool_normalized_direct_mjpeg_frame(&self, path: &Path, pkt: &VideoPacket) { let Some(src) = self.direct_mjpeg_appsrc.as_ref() else { + self.spool_passthrough_direct_mjpeg_frame(path, pkt); return; }; let Some(sink) = self.normalized_mjpeg_sink.as_ref() else { + self.spool_passthrough_direct_mjpeg_frame(path, pkt); return; }; - let mut buf = gst::Buffer::from_slice(pkt.data.clone()); - if let Some(meta) = buf.get_mut() { - let ts = gst::ClockTime::from_useconds(pkt.pts); - meta.set_pts(Some(ts)); - meta.set_dts(Some(ts)); - meta.set_duration(Some(gst::ClockTime::from_useconds(self.frame_step_us))); - } + let buf = gst::Buffer::from_slice(pkt.data.clone()); if let Err(err) = src.push_buffer(buf) { tracing::warn!( target:"lesavka_server::video", %err, - "📸⚠️ direct MJPEG normalization appsrc push failed" + "📸⚠️ direct MJPEG normalization appsrc push failed; falling back to guarded passthrough" ); + self.direct_mjpeg_normalize_bypassed + .store(true, std::sync::atomic::Ordering::Relaxed); + self.spool_passthrough_direct_mjpeg_frame(path, pkt); return; } @@ -1040,19 +1045,46 @@ impl WebcamSink { .normalized_mjpeg_miss_count .fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1; + let limit = u64::from(hevc_mjpeg_guard::direct_mjpeg_normalize_miss_limit()); if misses == 1 || misses % 30 == 0 { warn!( target:"lesavka_server::video", misses, - "📸⚠️ direct MJPEG normalization produced no fresh frame; freezing last good UVC frame" + limit, + "📸⚠️ direct MJPEG normalization produced no fresh frame; freezing last good UVC frame until fallback threshold" ); } + if misses >= limit { + self.direct_mjpeg_normalize_bypassed + .store(true, std::sync::atomic::Ordering::Relaxed); + warn!( + target:"lesavka_server::video", + misses, + limit, + "📸⚠️ direct MJPEG normalization starved; falling back to guarded passthrough for this webcam session" + ); + self.spool_passthrough_direct_mjpeg_frame(path, pkt); + } return; }; let Some(buffer) = sample.buffer() else { + self.direct_mjpeg_normalize_bypassed + .store(true, std::sync::atomic::Ordering::Relaxed); + warn!( + target:"lesavka_server::video", + "📸⚠️ direct MJPEG normalization returned an empty sample; falling back to guarded passthrough" + ); + self.spool_passthrough_direct_mjpeg_frame(path, pkt); return; }; let Ok(map) = buffer.map_readable() else { + self.direct_mjpeg_normalize_bypassed + .store(true, std::sync::atomic::Ordering::Relaxed); + warn!( + target:"lesavka_server::video", + "📸⚠️ direct MJPEG normalization returned an unreadable sample; falling back to guarded passthrough" + ); + self.spool_passthrough_direct_mjpeg_frame(path, pkt); return; }; let normalized = map.as_slice(); diff --git a/tests/chaos/server/video_sinks/hevc_mjpeg_guard_chaos_contract.rs b/tests/chaos/server/video_sinks/hevc_mjpeg_guard_chaos_contract.rs index f58db92..1a129d5 100644 --- a/tests/chaos/server/video_sinks/hevc_mjpeg_guard_chaos_contract.rs +++ b/tests/chaos/server/video_sinks/hevc_mjpeg_guard_chaos_contract.rs @@ -123,12 +123,15 @@ fn server_hevc_recovery_and_freshest_spool_paths_remain_wired() { "last_decoded_mjpeg_bytes", "last_mjpeg_passthrough_bytes", "direct_mjpeg_normalize_src", + "direct_mjpeg_normalize_bypassed", "mjpeg_normalized", "should_freeze_decoded_mjpeg_frame(previous_bytes, map.as_slice())", "direct_mjpeg_reject_reason(", "spool_direct_mjpeg_frame", + "spool_passthrough_direct_mjpeg_frame", "freezing suspicious decoded HEVC->MJPEG frame", "freezing suspicious direct MJPEG frame before UVC spool", + "direct MJPEG normalization starved; falling back to guarded passthrough", ] { assert!( WEBCAM_SINK.contains(marker), diff --git a/tests/installer/scripts/install/server_install_script_contract.rs b/tests/installer/scripts/install/server_install_script_contract.rs index 3b83759..81d3606 100644 --- a/tests/installer/scripts/install/server_install_script_contract.rs +++ b/tests/installer/scripts/install/server_install_script_contract.rs @@ -38,6 +38,7 @@ fn server_install_pins_hdmi_camera_and_display_defaults() { "LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE=%s", "LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY=%s", "LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS=%s", + "LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT=%s", "LESAVKA_SERVER_BIND_ADDR=%s", "/etc/lesavka/uvc.env", "LESAVKA_UVC_MAXPACKET=", @@ -175,6 +176,7 @@ fn server_install_pins_hdmi_camera_and_display_defaults() { assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE:-1}")); assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY:-72}")); assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS:-25}")); + assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT:-30}")); assert!( SERVER_INSTALL.contains("lesavka_server::video=info"), "server installs should not leave the hot webcam frame path at debug logging by default" diff --git a/tests/system/scripts/install/systemd_unit_env_contract.rs b/tests/system/scripts/install/systemd_unit_env_contract.rs index 204e344..7849105 100644 --- a/tests/system/scripts/install/systemd_unit_env_contract.rs +++ b/tests/system/scripts/install/systemd_unit_env_contract.rs @@ -86,6 +86,7 @@ fn server_env_persists_runtime_profile_and_tls_settings() { "LESAVKA_UPSTREAM_MJPEG_VIDEO_PLAYOUT_MODE_OFFSETS_US=%s", "LESAVKA_UVC_HEVC_FREEZE_ON_SIZE_DROP=%s", "LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE=%s", + "LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT=%s", "LESAVKA_SERVER_BIND_ADDR=%s", "LESAVKA_REQUIRE_TLS=%s", "LESAVKA_TLS_CLIENT_CA=%s",