From cd6241dbfaf7c9035aefac8ae2bff05f75d6e557 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Sat, 16 May 2026 02:53:49 -0300 Subject: [PATCH] media: normalize direct mjpeg before uvc spool --- Cargo.lock | 6 +- client/Cargo.toml | 2 +- common/Cargo.toml | 2 +- docs/operational-env.md | 6 + scripts/install/server.sh | 6 + server/Cargo.toml | 2 +- server/src/video_sinks/hevc_mjpeg_guard.rs | 88 +++++++ server/src/video_sinks/mjpeg_spool.rs | 32 +++ server/src/video_sinks/webcam_sink.rs | 235 ++++++++++++++++++ .../hevc_mjpeg_guard_chaos_contract.rs | 2 + .../install/server_install_script_contract.rs | 6 + .../install/systemd_unit_env_contract.rs | 1 + 12 files changed, 382 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 46c476e..006044b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.22.44" +version = "0.22.45" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.22.44" +version = "0.22.45" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.22.44" +version = "0.22.45" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index cdab257..abec7f2 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.22.44" +version = "0.22.45" edition = "2024" [dependencies] diff --git a/common/Cargo.toml b/common/Cargo.toml index b3ef4b0..d3b8086 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.22.44" +version = "0.22.45" edition = "2024" build = "build.rs" diff --git a/docs/operational-env.md b/docs/operational-env.md index 2128cc2..7fd52f1 100644 --- a/docs/operational-env.md +++ b/docs/operational-env.md @@ -609,6 +609,12 @@ These entries are intentionally concise because most are manual lab or CI harnes | `LESAVKA_UPSTREAM_AUTO_HEAL_AFTER_MS` | client live bundled-upstream startup heal delay; defaults to `3000`ms before issuing the safe audio-epoch recovery | | `LESAVKA_UPSTREAM_SOURCE_LEAD_CAP_MS` | server upstream media timing override; bounds live source lead or playout behavior while tuning client-to-server transport | | `LESAVKA_UVC_CONFIGFS_BASE` | server UVC gadget mode/configfs override used by runtime reconfiguration and hardware-in-the-loop probes | +| `LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY` | server direct-MJPEG normalization JPEG quality; defaults to `72` to reduce browser-facing UVC bitstream pressure | +| `LESAVKA_UVC_DIRECT_MJPEG_MIN_REFERENCE_BYTES` | server direct-MJPEG guard baseline; frames smaller than this do not establish the last-good reference | +| `LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE` | server direct-MJPEG normalization toggle; defaults on so camera MJPEG is decoded/re-encoded before the UVC helper | +| `LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS` | server direct-MJPEG normalization appsink timeout; defaults to `25`ms and is capped at `50`ms to avoid live backlog | +| `LESAVKA_UVC_DIRECT_MJPEG_SIZE_DROP_PCT` | server direct-MJPEG corruption guard threshold; frames below this percentage of the last good reference are frozen out | +| `LESAVKA_UVC_DIRECT_MJPEG_VISUAL_GUARD` | server direct-MJPEG corruption guard toggle; defaults on so obvious collapsed or flat payloads freeze the last good frame | | `LESAVKA_UVC_HEVC_DOMINANT_BYTE_PCT` | server HEVC-to-MJPEG corruption guard threshold; flat decoded MJPEG payloads with one byte at or above this percentage are frozen out, default `92` | | `LESAVKA_UVC_HEVC_FREEZE_ON_SIZE_DROP` | server HEVC-to-MJPEG corruption guard toggle; defaults on so suspicious decoded frame collapses freeze the last good MJPEG frame | | `LESAVKA_UVC_HEVC_JPEG_QUALITY` | server HEVC-to-MJPEG UVC bridge JPEG quality; defaults to `72` to lower UVC payload pressure while keeping RCT output compatible | diff --git a/scripts/install/server.sh b/scripts/install/server.sh index 0833252..cef00d1 100755 --- a/scripts/install/server.sh +++ b/scripts/install/server.sh @@ -1608,6 +1608,12 @@ SERVER_ENV_TMP=$(mktemp) printf 'LESAVKA_UVC_HEVC_SPOOL_PULL_TIMEOUT_MS=%s\n' "${LESAVKA_UVC_HEVC_SPOOL_PULL_TIMEOUT_MS:-20}" printf 'LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS=%s\n' "${LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS:-2}" printf 'LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT=%s\n' "${LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT:-15}" + printf 'LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE:-1}" + printf 'LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY:-72}" + printf 'LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS:-25}" + printf 'LESAVKA_UVC_DIRECT_MJPEG_VISUAL_GUARD=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_VISUAL_GUARD:-1}" + printf 'LESAVKA_UVC_DIRECT_MJPEG_SIZE_DROP_PCT=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_SIZE_DROP_PCT:-18}" + printf 'LESAVKA_UVC_DIRECT_MJPEG_MIN_REFERENCE_BYTES=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_MIN_REFERENCE_BYTES:-49152}" printf 'LESAVKA_SERVER_BIND_ADDR=%s\n' "${INSTALL_SERVER_BIND_ADDR}" printf 'LESAVKA_UVC_CODEC=%s\n' "${INSTALL_UVC_CODEC}" printf 'LESAVKA_UVC_WIDTH=%s\n' "${LESAVKA_UVC_WIDTH:-1280}" diff --git a/server/Cargo.toml b/server/Cargo.toml index cae0b2e..5134341 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.22.44" +version = "0.22.45" edition = "2024" autobins = false diff --git a/server/src/video_sinks/hevc_mjpeg_guard.rs b/server/src/video_sinks/hevc_mjpeg_guard.rs index 9139799..45acacb 100644 --- a/server/src/video_sinks/hevc_mjpeg_guard.rs +++ b/server/src/video_sinks/hevc_mjpeg_guard.rs @@ -8,6 +8,9 @@ const DEFAULT_HEVC_DOMINANT_BYTE_PCT: u32 = 92; const DEFAULT_DIRECT_MJPEG_SIZE_DROP_PCT: u32 = 18; const DEFAULT_DIRECT_MJPEG_MIN_REFERENCE_BYTES: u32 = 48 * 1024; const DEFAULT_DIRECT_MJPEG_PROFILE_MISMATCH_REJECT: bool = false; +const DEFAULT_DIRECT_MJPEG_NORMALIZE: bool = true; +const DEFAULT_DIRECT_MJPEG_JPEG_QUALITY: u32 = 72; +const DEFAULT_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS: u32 = 25; /// Summarizes one compressed MJPEG frame without fully decoding pixels. #[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] @@ -185,6 +188,52 @@ pub(super) fn direct_mjpeg_reject_profile_mismatch_enabled() -> bool { .unwrap_or(DEFAULT_DIRECT_MJPEG_PROFILE_MISMATCH_REJECT) } +/// Decide whether direct MJPEG should be normalized before UVC spool. +/// +/// Inputs: optional `LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE`. Output: true unless +/// explicitly disabled. Why: Google Meet/Firefox can expose lower-half grey +/// slabs from otherwise complete camera JPEGs; a local decode/re-encode gives +/// the RCT a simpler, freshly bounded MJPEG bitstream. +pub(super) fn direct_mjpeg_normalize_enabled() -> bool { + std::env::var("LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE") + .ok() + .map(|value| { + let trimmed = value.trim(); + !(trimmed.eq_ignore_ascii_case("0") + || trimmed.eq_ignore_ascii_case("false") + || trimmed.eq_ignore_ascii_case("no") + || trimmed.eq_ignore_ascii_case("off")) + }) + .unwrap_or(DEFAULT_DIRECT_MJPEG_NORMALIZE) +} + +/// Resolve JPEG quality for normalized direct MJPEG frames. +/// +/// Inputs: optional `LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY`, clamped to +/// 1..=100. Output: the `jpegenc` quality value. Why: direct MJPEG +/// normalization should reduce browser-facing bitstream complexity without +/// creating a new hidden bandwidth spike. +pub(super) fn direct_mjpeg_jpeg_quality() -> u32 { + env_u32( + "LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY", + DEFAULT_DIRECT_MJPEG_JPEG_QUALITY, + ) + .clamp(1, 100) +} + +/// Bound how long direct MJPEG normalization may wait for a fresh sample. +/// +/// Inputs: optional `LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS`, +/// clamped to 0..=50. Output: timeout in milliseconds. Why: normalization is +/// safer than raw passthrough, but it must not build a live webcam backlog. +pub(super) fn direct_mjpeg_normalize_pull_timeout_ms() -> u32 { + env_u32( + "LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS", + DEFAULT_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS, + ) + .min(50) +} + /// Return whether a decoded buffer looks like one complete JPEG image. /// /// Inputs: decoded MJPEG bytes. Output: true when SOI, SOS, and EOI markers @@ -469,6 +518,45 @@ mod tests { }); } + #[test] + fn direct_mjpeg_normalization_defaults_on_and_clamps_tuning() { + temp_env::with_vars( + [ + ("LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE", None::<&str>), + ("LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY", None::<&str>), + ( + "LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS", + None::<&str>, + ), + ], + || { + assert!(super::direct_mjpeg_normalize_enabled()); + assert_eq!(super::direct_mjpeg_jpeg_quality(), 72); + assert_eq!(super::direct_mjpeg_normalize_pull_timeout_ms(), 25); + }, + ); + + temp_env::with_vars( + [ + ("LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE", Some("off")), + ("LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY", Some("101")), + ( + "LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS", + Some("999"), + ), + ], + || { + assert!(!super::direct_mjpeg_normalize_enabled()); + assert_eq!(super::direct_mjpeg_jpeg_quality(), 100); + assert_eq!(super::direct_mjpeg_normalize_pull_timeout_ms(), 50); + }, + ); + + temp_env::with_var("LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY", Some("0"), || { + assert_eq!(super::direct_mjpeg_jpeg_quality(), 1); + }); + } + #[test] fn freeze_guard_catches_large_decoded_frame_collapses() { temp_env::with_vars( diff --git a/server/src/video_sinks/mjpeg_spool.rs b/server/src/video_sinks/mjpeg_spool.rs index 09d5e5e..762ea8d 100644 --- a/server/src/video_sinks/mjpeg_spool.rs +++ b/server/src/video_sinks/mjpeg_spool.rs @@ -36,6 +36,20 @@ impl MjpegSpoolTiming { } } + /// Build metadata for direct MJPEG after local decode/re-encode. + /// + /// Inputs: the upstream packet PTS in microseconds. Output: timing metadata + /// labeled as normalized MJPEG. Why: browser-visible UVC corruption can + /// happen after a syntactically valid camera JPEG, so probes need to know + /// when the server has intentionally emitted a clean re-encoded frame. + pub(super) fn mjpeg_normalized(source_pts_us: u64) -> Self { + Self { + profile: "mjpeg-normalized", + source_pts_us: Some(source_pts_us), + decoded_pts_us: None, + } + } + /// Build metadata for decoded HEVC entering the MJPEG UVC helper. /// /// Inputs: upstream packet PTS plus the decoded appsink buffer PTS. @@ -542,6 +556,24 @@ mod tests { assert!(record.contains("\"decoded_pts_us\":null")); } + /// Verifies normalized direct-MJPEG handoffs are distinguishable. + /// + /// Input: an upstream MJPEG packet PTS after decode/re-encode. Output: + /// metadata with the normalized profile marker. Why: RCT artifact probes + /// need to separate raw passthrough from the safer browser-facing path. + #[test] + fn mjpeg_normalized_metadata_uses_source_pts_and_profile_marker() { + let record = super::format_mjpeg_spool_metadata( + 9, + 101, + super::MjpegSpoolTiming::mjpeg_normalized(66_000), + ); + + assert!(record.contains("\"profile\":\"mjpeg-normalized\"")); + assert!(record.contains("\"source_pts_us\":66000")); + assert!(record.contains("\"decoded_pts_us\":null")); + } + /// Verifies frame spooling preserves default behavior unless metadata is enabled. /// /// Input: a temporary frame path plus disabled metadata env vars. Output: diff --git a/server/src/video_sinks/webcam_sink.rs b/server/src/video_sinks/webcam_sink.rs index a720cc2..f2b2681 100644 --- a/server/src/video_sinks/webcam_sink.rs +++ b/server/src/video_sinks/webcam_sink.rs @@ -42,6 +42,8 @@ pub struct WebcamSink { next_pts_us: AtomicU64, frame_step_us: u64, mjpeg_spool_path: Option, + direct_mjpeg_appsrc: Option, + normalized_mjpeg_sink: Option, hevc_mjpeg_appsrc: Option, decoded_mjpeg_sink: Option, last_mjpeg_passthrough_bytes: AtomicU64, @@ -50,6 +52,7 @@ pub struct WebcamSink { uvc_height: u16, direct_mjpeg_profile_mismatch_seen: AtomicBool, last_decoded_mjpeg_bytes: AtomicU64, + normalized_mjpeg_miss_count: AtomicU64, decoded_mjpeg_miss_count: AtomicU64, decode_recovery_needs_irap: AtomicBool, #[cfg(not(coverage))] @@ -170,6 +173,119 @@ fn build_hevc_freshness_queue(name: &str) -> anyhow::Result { Ok(queue) } +#[cfg(not(coverage))] +fn direct_mjpeg_normalize_pull_timeout() -> gst::ClockTime { + gst::ClockTime::from_mseconds(u64::from( + hevc_mjpeg_guard::direct_mjpeg_normalize_pull_timeout_ms(), + )) +} + +/// Drain normalized direct-MJPEG output down to the freshest sample. +/// +/// Inputs: the direct-MJPEG normalization appsink. Output: newest available +/// sample, if any. Why: decode/re-encode should sanitize browser-facing MJPEG +/// without letting stale frames accumulate behind the live webcam feed. +#[cfg(not(coverage))] +fn freshest_direct_mjpeg_sample(sink: &gst_app::AppSink) -> Option { + let mut newest = sink.try_pull_sample(direct_mjpeg_normalize_pull_timeout()); + while let Some(sample) = sink.try_pull_sample(gst::ClockTime::ZERO) { + newest = Some(sample); + } + newest +} + +#[cfg(not(coverage))] +fn build_direct_mjpeg_normalize_branch( + pipeline: &gst::Pipeline, + width: i32, + height: i32, + fps: i32, +) -> anyhow::Result<(gst_app::AppSrc, gst_app::AppSink)> { + let src = gst::ElementFactory::make("appsrc") + .name("direct_mjpeg_normalize_src") + .build()? + .downcast::() + .expect("direct MJPEG normalize appsrc"); + src.set_is_live(true); + src.set_format(gst::Format::Time); + src.set_property("do-timestamp", false); + configure_uvc_appsrc(&src); + let caps_in = gst::Caps::builder("image/jpeg") + .field("framerate", gst::Fraction::new(fps, 1)) + .build(); + src.set_caps(Some(&caps_in)); + + let caps_mjpeg = gst::Caps::builder("image/jpeg") + .field("parsed", true) + .field("width", width) + .field("height", height) + .field("framerate", gst::Fraction::new(fps, 1)) + .field("pixel-aspect-ratio", gst::Fraction::new(1, 1)) + .field("colorimetry", "2:4:7:1") + .build(); + let jpegparse = gst::ElementFactory::make("jpegparse").build()?; + let decoder = gst::ElementFactory::make("jpegdec").build()?; + let decoded_queue = build_hevc_freshness_queue("direct_mjpeg_normalize_decoded_queue")?; + let convert = gst::ElementFactory::make("videoconvert").build()?; + let scale = gst::ElementFactory::make("videoscale").build()?; + let raw_caps = gst::Caps::builder("video/x-raw") + .field("width", width) + .field("height", height) + .field("framerate", gst::Fraction::new(fps, 1)) + .build(); + let raw_capsfilter = gst::ElementFactory::make("capsfilter") + .property("caps", &raw_caps) + .build()?; + let encoder = gst::ElementFactory::make("jpegenc") + .property( + "quality", + hevc_mjpeg_guard::direct_mjpeg_jpeg_quality() as i32, + ) + .build()?; + let encoded_caps = gst::ElementFactory::make("capsfilter") + .property("caps", &caps_mjpeg) + .build()?; + let encoded_queue = build_hevc_freshness_queue("direct_mjpeg_normalize_encoded_queue")?; + let sink = gst::ElementFactory::make("appsink") + .name("direct_mjpeg_normalize_sink") + .property("sync", false) + .property("enable-last-sample", false) + .property("emit-signals", false) + .property("max-buffers", 1u32) + .property("drop", true) + .build()? + .downcast::() + .expect("direct MJPEG normalize appsink"); + + pipeline.add_many([ + src.upcast_ref(), + &jpegparse, + &decoder, + &decoded_queue, + &convert, + &scale, + &raw_capsfilter, + &encoder, + &encoded_caps, + &encoded_queue, + sink.upcast_ref(), + ])?; + gst::Element::link_many([ + src.upcast_ref(), + &jpegparse, + &decoder, + &decoded_queue, + &convert, + &scale, + &raw_capsfilter, + &encoder, + &encoded_caps, + &encoded_queue, + sink.upcast_ref(), + ])?; + Ok((src, sink)) +} + #[cfg(not(coverage))] fn add_hevc_mjpeg_spool_branch( pipeline: &gst::Pipeline, @@ -381,6 +497,8 @@ impl WebcamSink { next_pts_us: AtomicU64::new(0), frame_step_us, mjpeg_spool_path: None, + direct_mjpeg_appsrc: None, + normalized_mjpeg_sink: None, hevc_mjpeg_appsrc: None, decoded_mjpeg_sink: None, last_mjpeg_passthrough_bytes: AtomicU64::new(0), @@ -389,6 +507,7 @@ impl WebcamSink { uvc_height: cfg.height.min(u32::from(u16::MAX)) as u16, direct_mjpeg_profile_mismatch_seen: AtomicBool::new(false), last_decoded_mjpeg_bytes: AtomicU64::new(0), + normalized_mjpeg_miss_count: AtomicU64::new(0), decoded_mjpeg_miss_count: AtomicU64::new(0), decode_recovery_needs_irap: AtomicBool::new(false), }) @@ -420,6 +539,8 @@ impl WebcamSink { } let mut mjpeg_spool_file = None; + let mut direct_mjpeg_appsrc = None; + let mut normalized_mjpeg_sink = None; let mut hevc_mjpeg_appsrc = None; let mut decoded_mjpeg_sink = None; @@ -445,6 +566,27 @@ impl WebcamSink { pipeline.add_many([src.upcast_ref(), &sink])?; gst::Element::link_many([src.upcast_ref(), &sink])?; mjpeg_spool_file = Some(mjpeg_spool_path()); + if hevc_mjpeg_guard::direct_mjpeg_normalize_enabled() { + match build_direct_mjpeg_normalize_branch(&pipeline, width, height, fps) { + Ok((normalize_src, normalize_sink)) => { + direct_mjpeg_appsrc = Some(normalize_src); + normalized_mjpeg_sink = Some(normalize_sink); + tracing::info!( + target: "lesavka_server::video", + quality = hevc_mjpeg_guard::direct_mjpeg_jpeg_quality(), + pull_timeout_ms = hevc_mjpeg_guard::direct_mjpeg_normalize_pull_timeout_ms(), + "📸 direct MJPEG UVC spool will normalize frames through jpegdec/jpegenc" + ); + } + Err(err) => { + tracing::warn!( + target: "lesavka_server::video", + %err, + "📸⚠️ direct MJPEG normalization unavailable; falling back to guarded passthrough" + ); + } + } + } match add_hevc_mjpeg_spool_branch(&pipeline, width, height, fps) { Ok((hevc_src, hevc_sink)) => { hevc_mjpeg_appsrc = Some(hevc_src); @@ -658,6 +800,8 @@ impl WebcamSink { next_pts_us: AtomicU64::new(0), frame_step_us, mjpeg_spool_path: mjpeg_spool_file, + direct_mjpeg_appsrc, + normalized_mjpeg_sink, hevc_mjpeg_appsrc, decoded_mjpeg_sink, last_mjpeg_passthrough_bytes: AtomicU64::new(0), @@ -666,6 +810,7 @@ impl WebcamSink { uvc_height: cfg.height.min(u32::from(u16::MAX)) as u16, direct_mjpeg_profile_mismatch_seen: AtomicBool::new(false), last_decoded_mjpeg_bytes: AtomicU64::new(0), + normalized_mjpeg_miss_count: AtomicU64::new(0), decoded_mjpeg_miss_count: AtomicU64::new(0), decode_recovery_needs_irap: AtomicBool::new(false), _bus_watch: bus_watch, @@ -850,6 +995,12 @@ impl WebcamSink { ); return; } + + if self.direct_mjpeg_appsrc.is_some() && self.normalized_mjpeg_sink.is_some() { + self.spool_normalized_direct_mjpeg_frame(path, pkt); + return; + } + let timing = MjpegSpoolTiming::mjpeg_passthrough(pkt.pts); if let Err(err) = spool_mjpeg_frame_with_timing(path, &pkt.data, Some(timing)) { warn!(target:"lesavka_server::video", %err, "📸⚠️ failed to spool MJPEG frame for UVC helper"); @@ -858,6 +1009,90 @@ impl WebcamSink { .store(pkt.data.len() as u64, std::sync::atomic::Ordering::Relaxed); } } + + #[cfg(not(coverage))] + fn spool_normalized_direct_mjpeg_frame(&self, path: &Path, pkt: &VideoPacket) { + let Some(src) = self.direct_mjpeg_appsrc.as_ref() else { + return; + }; + let Some(sink) = self.normalized_mjpeg_sink.as_ref() else { + return; + }; + + let mut buf = gst::Buffer::from_slice(pkt.data.clone()); + if let Some(meta) = buf.get_mut() { + let ts = gst::ClockTime::from_useconds(pkt.pts); + meta.set_pts(Some(ts)); + meta.set_dts(Some(ts)); + meta.set_duration(Some(gst::ClockTime::from_useconds(self.frame_step_us))); + } + if let Err(err) = src.push_buffer(buf) { + tracing::warn!( + target:"lesavka_server::video", + %err, + "📸⚠️ direct MJPEG normalization appsrc push failed" + ); + return; + } + + let Some(sample) = freshest_direct_mjpeg_sample(sink) else { + let misses = self + .normalized_mjpeg_miss_count + .fetch_add(1, std::sync::atomic::Ordering::Relaxed) + + 1; + if misses == 1 || misses % 30 == 0 { + warn!( + target:"lesavka_server::video", + misses, + "📸⚠️ direct MJPEG normalization produced no fresh frame; freezing last good UVC frame" + ); + } + return; + }; + let Some(buffer) = sample.buffer() else { + return; + }; + let Ok(map) = buffer.map_readable() else { + return; + }; + let normalized = map.as_slice(); + let previous_bytes = self + .last_mjpeg_passthrough_bytes + .load(std::sync::atomic::Ordering::Relaxed); + if let Some(reason) = hevc_mjpeg_guard::direct_mjpeg_reject_reason( + previous_bytes, + Some(self.direct_mjpeg_max_bytes), + Some((self.uvc_width, self.uvc_height)), + normalized, + ) { + let inspection = hevc_mjpeg_guard::inspect_mjpeg_frame(normalized); + warn!( + target:"lesavka_server::video", + ?reason, + previous_bytes, + next_bytes = normalized.len(), + max_bytes = self.direct_mjpeg_max_bytes, + frame_width = ?inspection.width, + frame_height = ?inspection.height, + entropy_bytes = inspection.entropy_bytes, + entropy_distinct_bytes = inspection.entropy_distinct_bytes, + entropy_dominant_pct = inspection.entropy_dominant_pct, + entropy_max_run = inspection.entropy_max_run, + "📸⚠️ freezing suspicious normalized direct MJPEG frame before UVC spool" + ); + return; + } + + let timing = MjpegSpoolTiming::mjpeg_normalized(pkt.pts); + if let Err(err) = spool_mjpeg_frame_with_timing(path, normalized, Some(timing)) { + warn!(target:"lesavka_server::video", %err, "📸⚠️ failed to spool normalized direct MJPEG frame for UVC helper"); + } else { + self.normalized_mjpeg_miss_count + .store(0, std::sync::atomic::Ordering::Relaxed); + self.last_mjpeg_passthrough_bytes + .store(normalized.len() as u64, std::sync::atomic::Ordering::Relaxed); + } + } } impl Drop for WebcamSink { diff --git a/tests/chaos/server/video_sinks/hevc_mjpeg_guard_chaos_contract.rs b/tests/chaos/server/video_sinks/hevc_mjpeg_guard_chaos_contract.rs index 8a7b369..f58db92 100644 --- a/tests/chaos/server/video_sinks/hevc_mjpeg_guard_chaos_contract.rs +++ b/tests/chaos/server/video_sinks/hevc_mjpeg_guard_chaos_contract.rs @@ -122,6 +122,8 @@ fn server_hevc_recovery_and_freshest_spool_paths_remain_wired() { "freshest_mjpeg_sample(sink)", "last_decoded_mjpeg_bytes", "last_mjpeg_passthrough_bytes", + "direct_mjpeg_normalize_src", + "mjpeg_normalized", "should_freeze_decoded_mjpeg_frame(previous_bytes, map.as_slice())", "direct_mjpeg_reject_reason(", "spool_direct_mjpeg_frame", diff --git a/tests/installer/scripts/install/server_install_script_contract.rs b/tests/installer/scripts/install/server_install_script_contract.rs index 6d608ce..3b83759 100644 --- a/tests/installer/scripts/install/server_install_script_contract.rs +++ b/tests/installer/scripts/install/server_install_script_contract.rs @@ -35,6 +35,9 @@ fn server_install_pins_hdmi_camera_and_display_defaults() { "LESAVKA_UVC_HEVC_SPOOL_PULL_TIMEOUT_MS=%s", "LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS=%s", "LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT=%s", + "LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE=%s", + "LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY=%s", + "LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS=%s", "LESAVKA_SERVER_BIND_ADDR=%s", "/etc/lesavka/uvc.env", "LESAVKA_UVC_MAXPACKET=", @@ -169,6 +172,9 @@ fn server_install_pins_hdmi_camera_and_display_defaults() { assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_HEVC_SPOOL_PULL_TIMEOUT_MS:-20}")); assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS:-2}")); assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT:-15}")); + assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE:-1}")); + assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY:-72}")); + assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS:-25}")); assert!( SERVER_INSTALL.contains("lesavka_server::video=info"), "server installs should not leave the hot webcam frame path at debug logging by default" diff --git a/tests/system/scripts/install/systemd_unit_env_contract.rs b/tests/system/scripts/install/systemd_unit_env_contract.rs index f42ff38..204e344 100644 --- a/tests/system/scripts/install/systemd_unit_env_contract.rs +++ b/tests/system/scripts/install/systemd_unit_env_contract.rs @@ -85,6 +85,7 @@ fn server_env_persists_runtime_profile_and_tls_settings() { "LESAVKA_UPSTREAM_HEVC_VIDEO_PLAYOUT_MODE_OFFSETS_US=%s", "LESAVKA_UPSTREAM_MJPEG_VIDEO_PLAYOUT_MODE_OFFSETS_US=%s", "LESAVKA_UVC_HEVC_FREEZE_ON_SIZE_DROP=%s", + "LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE=%s", "LESAVKA_SERVER_BIND_ADDR=%s", "LESAVKA_REQUIRE_TLS=%s", "LESAVKA_TLS_CLIENT_CA=%s",