media: fall back when mjpeg normalization starves

This commit is contained in:
Brad Stein 2026-05-16 04:52:15 -03:00
parent cd6241dbfa
commit 46538c4f44
11 changed files with 99 additions and 26 deletions

6
Cargo.lock generated
View File

@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]]
name = "lesavka_client"
version = "0.22.45"
version = "0.22.46"
dependencies = [
"anyhow",
"async-stream",
@ -1686,7 +1686,7 @@ dependencies = [
[[package]]
name = "lesavka_common"
version = "0.22.45"
version = "0.22.46"
dependencies = [
"anyhow",
"base64",
@ -1698,7 +1698,7 @@ dependencies = [
[[package]]
name = "lesavka_server"
version = "0.22.45"
version = "0.22.46"
dependencies = [
"anyhow",
"base64",

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package]
name = "lesavka_client"
version = "0.22.45"
version = "0.22.46"
edition = "2024"
[dependencies]

View File

@ -1,6 +1,6 @@
[package]
name = "lesavka_common"
version = "0.22.45"
version = "0.22.46"
edition = "2024"
build = "build.rs"

View File

@ -612,6 +612,7 @@ These entries are intentionally concise because most are manual lab or CI harnes
| `LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY` | server direct-MJPEG normalization JPEG quality; defaults to `72` to reduce browser-facing UVC bitstream pressure |
| `LESAVKA_UVC_DIRECT_MJPEG_MIN_REFERENCE_BYTES` | server direct-MJPEG guard baseline; frames smaller than this do not establish the last-good reference |
| `LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE` | server direct-MJPEG normalization toggle; defaults on so camera MJPEG is decoded/re-encoded before the UVC helper |
| `LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT` | server direct-MJPEG normalization recovery threshold; after this many consecutive empty pulls, the session falls back to guarded passthrough |
| `LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS` | server direct-MJPEG normalization appsink timeout; defaults to `25`ms and is capped at `50`ms to avoid live backlog |
| `LESAVKA_UVC_DIRECT_MJPEG_SIZE_DROP_PCT` | server direct-MJPEG corruption guard threshold; frames below this percentage of the last good reference are frozen out |
| `LESAVKA_UVC_DIRECT_MJPEG_VISUAL_GUARD` | server direct-MJPEG corruption guard toggle; defaults on so obvious collapsed or flat payloads freeze the last good frame |

View File

@ -1611,6 +1611,7 @@ SERVER_ENV_TMP=$(mktemp)
printf 'LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE:-1}"
printf 'LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY:-72}"
printf 'LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS:-25}"
printf 'LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT:-30}"
printf 'LESAVKA_UVC_DIRECT_MJPEG_VISUAL_GUARD=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_VISUAL_GUARD:-1}"
printf 'LESAVKA_UVC_DIRECT_MJPEG_SIZE_DROP_PCT=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_SIZE_DROP_PCT:-18}"
printf 'LESAVKA_UVC_DIRECT_MJPEG_MIN_REFERENCE_BYTES=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_MIN_REFERENCE_BYTES:-49152}"

View File

@ -10,7 +10,7 @@ bench = false
[package]
name = "lesavka_server"
version = "0.22.45"
version = "0.22.46"
edition = "2024"
autobins = false

View File

@ -11,6 +11,7 @@ const DEFAULT_DIRECT_MJPEG_PROFILE_MISMATCH_REJECT: bool = false;
const DEFAULT_DIRECT_MJPEG_NORMALIZE: bool = true;
const DEFAULT_DIRECT_MJPEG_JPEG_QUALITY: u32 = 72;
const DEFAULT_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS: u32 = 25;
const DEFAULT_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT: u32 = 30;
/// Summarizes one compressed MJPEG frame without fully decoding pixels.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
@ -234,6 +235,20 @@ pub(super) fn direct_mjpeg_normalize_pull_timeout_ms() -> u32 {
.min(50)
}
/// Bound how many consecutive normalization misses are allowed before bypass.
///
/// Inputs: optional `LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT`, clamped
/// to 1..=300. Output: miss count. Why: the direct-MJPEG normalizer is an
/// optional sanitizer, not a reason to freeze the conference camera forever if
/// GStreamer negotiation starves on one host.
pub(super) fn direct_mjpeg_normalize_miss_limit() -> u32 {
env_u32(
"LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT",
DEFAULT_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT,
)
.clamp(1, 300)
}
/// Return whether a decoded buffer looks like one complete JPEG image.
///
/// Inputs: decoded MJPEG bytes. Output: true when SOI, SOS, and EOI markers
@ -528,11 +543,16 @@ mod tests {
"LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS",
None::<&str>,
),
(
"LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT",
None::<&str>,
),
],
|| {
assert!(super::direct_mjpeg_normalize_enabled());
assert_eq!(super::direct_mjpeg_jpeg_quality(), 72);
assert_eq!(super::direct_mjpeg_normalize_pull_timeout_ms(), 25);
assert_eq!(super::direct_mjpeg_normalize_miss_limit(), 30);
},
);
@ -544,17 +564,30 @@ mod tests {
"LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS",
Some("999"),
),
(
"LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT",
Some("999"),
),
],
|| {
assert!(!super::direct_mjpeg_normalize_enabled());
assert_eq!(super::direct_mjpeg_jpeg_quality(), 100);
assert_eq!(super::direct_mjpeg_normalize_pull_timeout_ms(), 50);
assert_eq!(super::direct_mjpeg_normalize_miss_limit(), 300);
},
);
temp_env::with_var("LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY", Some("0"), || {
assert_eq!(super::direct_mjpeg_jpeg_quality(), 1);
});
temp_env::with_var(
"LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT",
Some("0"),
|| {
assert_eq!(super::direct_mjpeg_normalize_miss_limit(), 1);
},
);
}
#[test]

View File

@ -52,6 +52,7 @@ pub struct WebcamSink {
uvc_height: u16,
direct_mjpeg_profile_mismatch_seen: AtomicBool,
last_decoded_mjpeg_bytes: AtomicU64,
direct_mjpeg_normalize_bypassed: AtomicBool,
normalized_mjpeg_miss_count: AtomicU64,
decoded_mjpeg_miss_count: AtomicU64,
decode_recovery_needs_irap: AtomicBool,
@ -199,7 +200,7 @@ fn build_direct_mjpeg_normalize_branch(
pipeline: &gst::Pipeline,
width: i32,
height: i32,
fps: i32,
_fps: i32,
) -> anyhow::Result<(gst_app::AppSrc, gst_app::AppSink)> {
let src = gst::ElementFactory::make("appsrc")
.name("direct_mjpeg_normalize_src")
@ -208,22 +209,18 @@ fn build_direct_mjpeg_normalize_branch(
.expect("direct MJPEG normalize appsrc");
src.set_is_live(true);
src.set_format(gst::Format::Time);
src.set_property("do-timestamp", false);
src.set_property("do-timestamp", true);
configure_uvc_appsrc(&src);
let caps_in = gst::Caps::builder("image/jpeg")
.field("framerate", gst::Fraction::new(fps, 1))
.build();
let caps_in = gst::Caps::builder("image/jpeg").build();
src.set_caps(Some(&caps_in));
let caps_mjpeg = gst::Caps::builder("image/jpeg")
.field("parsed", true)
.field("width", width)
.field("height", height)
.field("framerate", gst::Fraction::new(fps, 1))
.field("pixel-aspect-ratio", gst::Fraction::new(1, 1))
.field("colorimetry", "2:4:7:1")
.build();
let jpegparse = gst::ElementFactory::make("jpegparse").build()?;
let decoder = gst::ElementFactory::make("jpegdec").build()?;
let decoded_queue = build_hevc_freshness_queue("direct_mjpeg_normalize_decoded_queue")?;
let convert = gst::ElementFactory::make("videoconvert").build()?;
@ -231,7 +228,6 @@ fn build_direct_mjpeg_normalize_branch(
let raw_caps = gst::Caps::builder("video/x-raw")
.field("width", width)
.field("height", height)
.field("framerate", gst::Fraction::new(fps, 1))
.build();
let raw_capsfilter = gst::ElementFactory::make("capsfilter")
.property("caps", &raw_caps)
@ -259,7 +255,6 @@ fn build_direct_mjpeg_normalize_branch(
pipeline.add_many([
src.upcast_ref(),
&jpegparse,
&decoder,
&decoded_queue,
&convert,
@ -272,7 +267,6 @@ fn build_direct_mjpeg_normalize_branch(
])?;
gst::Element::link_many([
src.upcast_ref(),
&jpegparse,
&decoder,
&decoded_queue,
&convert,
@ -507,6 +501,7 @@ impl WebcamSink {
uvc_height: cfg.height.min(u32::from(u16::MAX)) as u16,
direct_mjpeg_profile_mismatch_seen: AtomicBool::new(false),
last_decoded_mjpeg_bytes: AtomicU64::new(0),
direct_mjpeg_normalize_bypassed: AtomicBool::new(false),
normalized_mjpeg_miss_count: AtomicU64::new(0),
decoded_mjpeg_miss_count: AtomicU64::new(0),
decode_recovery_needs_irap: AtomicBool::new(false),
@ -810,6 +805,7 @@ impl WebcamSink {
uvc_height: cfg.height.min(u32::from(u16::MAX)) as u16,
direct_mjpeg_profile_mismatch_seen: AtomicBool::new(false),
last_decoded_mjpeg_bytes: AtomicU64::new(0),
direct_mjpeg_normalize_bypassed: AtomicBool::new(false),
normalized_mjpeg_miss_count: AtomicU64::new(0),
decoded_mjpeg_miss_count: AtomicU64::new(0),
decode_recovery_needs_irap: AtomicBool::new(false),
@ -996,11 +992,21 @@ impl WebcamSink {
return;
}
if self.direct_mjpeg_appsrc.is_some() && self.normalized_mjpeg_sink.is_some() {
if self.direct_mjpeg_appsrc.is_some()
&& self.normalized_mjpeg_sink.is_some()
&& !self
.direct_mjpeg_normalize_bypassed
.load(std::sync::atomic::Ordering::Relaxed)
{
self.spool_normalized_direct_mjpeg_frame(path, pkt);
return;
}
self.spool_passthrough_direct_mjpeg_frame(path, pkt);
}
#[cfg(not(coverage))]
fn spool_passthrough_direct_mjpeg_frame(&self, path: &Path, pkt: &VideoPacket) {
let timing = MjpegSpoolTiming::mjpeg_passthrough(pkt.pts);
if let Err(err) = spool_mjpeg_frame_with_timing(path, &pkt.data, Some(timing)) {
warn!(target:"lesavka_server::video", %err, "📸⚠️ failed to spool MJPEG frame for UVC helper");
@ -1013,25 +1019,24 @@ impl WebcamSink {
#[cfg(not(coverage))]
fn spool_normalized_direct_mjpeg_frame(&self, path: &Path, pkt: &VideoPacket) {
let Some(src) = self.direct_mjpeg_appsrc.as_ref() else {
self.spool_passthrough_direct_mjpeg_frame(path, pkt);
return;
};
let Some(sink) = self.normalized_mjpeg_sink.as_ref() else {
self.spool_passthrough_direct_mjpeg_frame(path, pkt);
return;
};
let mut buf = gst::Buffer::from_slice(pkt.data.clone());
if let Some(meta) = buf.get_mut() {
let ts = gst::ClockTime::from_useconds(pkt.pts);
meta.set_pts(Some(ts));
meta.set_dts(Some(ts));
meta.set_duration(Some(gst::ClockTime::from_useconds(self.frame_step_us)));
}
let buf = gst::Buffer::from_slice(pkt.data.clone());
if let Err(err) = src.push_buffer(buf) {
tracing::warn!(
target:"lesavka_server::video",
%err,
"📸⚠️ direct MJPEG normalization appsrc push failed"
"📸⚠️ direct MJPEG normalization appsrc push failed; falling back to guarded passthrough"
);
self.direct_mjpeg_normalize_bypassed
.store(true, std::sync::atomic::Ordering::Relaxed);
self.spool_passthrough_direct_mjpeg_frame(path, pkt);
return;
}
@ -1040,19 +1045,46 @@ impl WebcamSink {
.normalized_mjpeg_miss_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
+ 1;
let limit = u64::from(hevc_mjpeg_guard::direct_mjpeg_normalize_miss_limit());
if misses == 1 || misses % 30 == 0 {
warn!(
target:"lesavka_server::video",
misses,
"📸⚠️ direct MJPEG normalization produced no fresh frame; freezing last good UVC frame"
limit,
"📸⚠️ direct MJPEG normalization produced no fresh frame; freezing last good UVC frame until fallback threshold"
);
}
if misses >= limit {
self.direct_mjpeg_normalize_bypassed
.store(true, std::sync::atomic::Ordering::Relaxed);
warn!(
target:"lesavka_server::video",
misses,
limit,
"📸⚠️ direct MJPEG normalization starved; falling back to guarded passthrough for this webcam session"
);
self.spool_passthrough_direct_mjpeg_frame(path, pkt);
}
return;
};
let Some(buffer) = sample.buffer() else {
self.direct_mjpeg_normalize_bypassed
.store(true, std::sync::atomic::Ordering::Relaxed);
warn!(
target:"lesavka_server::video",
"📸⚠️ direct MJPEG normalization returned an empty sample; falling back to guarded passthrough"
);
self.spool_passthrough_direct_mjpeg_frame(path, pkt);
return;
};
let Ok(map) = buffer.map_readable() else {
self.direct_mjpeg_normalize_bypassed
.store(true, std::sync::atomic::Ordering::Relaxed);
warn!(
target:"lesavka_server::video",
"📸⚠️ direct MJPEG normalization returned an unreadable sample; falling back to guarded passthrough"
);
self.spool_passthrough_direct_mjpeg_frame(path, pkt);
return;
};
let normalized = map.as_slice();

View File

@ -123,12 +123,15 @@ fn server_hevc_recovery_and_freshest_spool_paths_remain_wired() {
"last_decoded_mjpeg_bytes",
"last_mjpeg_passthrough_bytes",
"direct_mjpeg_normalize_src",
"direct_mjpeg_normalize_bypassed",
"mjpeg_normalized",
"should_freeze_decoded_mjpeg_frame(previous_bytes, map.as_slice())",
"direct_mjpeg_reject_reason(",
"spool_direct_mjpeg_frame",
"spool_passthrough_direct_mjpeg_frame",
"freezing suspicious decoded HEVC->MJPEG frame",
"freezing suspicious direct MJPEG frame before UVC spool",
"direct MJPEG normalization starved; falling back to guarded passthrough",
] {
assert!(
WEBCAM_SINK.contains(marker),

View File

@ -38,6 +38,7 @@ fn server_install_pins_hdmi_camera_and_display_defaults() {
"LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE=%s",
"LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY=%s",
"LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS=%s",
"LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT=%s",
"LESAVKA_SERVER_BIND_ADDR=%s",
"/etc/lesavka/uvc.env",
"LESAVKA_UVC_MAXPACKET=",
@ -175,6 +176,7 @@ fn server_install_pins_hdmi_camera_and_display_defaults() {
assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE:-1}"));
assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY:-72}"));
assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS:-25}"));
assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT:-30}"));
assert!(
SERVER_INSTALL.contains("lesavka_server::video=info"),
"server installs should not leave the hot webcam frame path at debug logging by default"

View File

@ -86,6 +86,7 @@ fn server_env_persists_runtime_profile_and_tls_settings() {
"LESAVKA_UPSTREAM_MJPEG_VIDEO_PLAYOUT_MODE_OFFSETS_US=%s",
"LESAVKA_UVC_HEVC_FREEZE_ON_SIZE_DROP=%s",
"LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE=%s",
"LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT=%s",
"LESAVKA_SERVER_BIND_ADDR=%s",
"LESAVKA_REQUIRE_TLS=%s",
"LESAVKA_TLS_CLIENT_CA=%s",