media: disable leaking mjpeg normalizer by default

This commit is contained in:
Brad Stein 2026-05-16 11:32:04 -03:00
parent 260cb263f6
commit 980332a5cb
15 changed files with 135 additions and 22 deletions

6
Cargo.lock generated
View File

@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]]
name = "lesavka_client"
version = "0.22.47"
version = "0.22.48"
dependencies = [
"anyhow",
"async-stream",
@ -1686,7 +1686,7 @@ dependencies = [
[[package]]
name = "lesavka_common"
version = "0.22.47"
version = "0.22.48"
dependencies = [
"anyhow",
"base64",
@ -1698,7 +1698,7 @@ dependencies = [
[[package]]
name = "lesavka_server"
version = "0.22.47"
version = "0.22.48"
dependencies = [
"anyhow",
"base64",

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package]
name = "lesavka_client"
version = "0.22.47"
version = "0.22.48"
edition = "2024"
[dependencies]

View File

@ -1,6 +1,6 @@
[package]
name = "lesavka_common"
version = "0.22.47"
version = "0.22.48"
edition = "2024"
build = "build.rs"

View File

@ -609,11 +609,12 @@ These entries are intentionally concise because most are manual lab or CI harnes
| `LESAVKA_UPSTREAM_AUTO_HEAL_AFTER_MS` | client live bundled-upstream startup heal delay; defaults to `3000`ms before issuing the safe audio-epoch recovery |
| `LESAVKA_UPSTREAM_SOURCE_LEAD_CAP_MS` | server upstream media timing override; bounds live source lead or playout behavior while tuning client-to-server transport |
| `LESAVKA_UVC_CONFIGFS_BASE` | server UVC gadget mode/configfs override used by runtime reconfiguration and hardware-in-the-loop probes |
| `LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY` | server direct-MJPEG normalization JPEG quality; defaults to `72` to reduce browser-facing UVC bitstream pressure |
| `LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY` | server direct-MJPEG normalization JPEG quality; defaults to `72` when the opt-in normalizer is enabled |
| `LESAVKA_UVC_DIRECT_MJPEG_MIN_REFERENCE_BYTES` | server direct-MJPEG guard baseline; frames smaller than this do not establish the last-good reference |
| `LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE` | server direct-MJPEG normalization toggle; defaults on so camera MJPEG is decoded/re-encoded before the UVC helper |
| `LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE` | server direct-MJPEG normalization toggle; defaults off because the native GStreamer decode/re-encode branch can retain RSS during long calls |
| `LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT` | server direct-MJPEG normalization recovery threshold; after this many consecutive empty pulls, the session falls back to guarded passthrough |
| `LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS` | server direct-MJPEG normalization appsink timeout; defaults to `25`ms and is capped at `50`ms to avoid live backlog |
| `LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_RSS_LIMIT_MB` | server direct-MJPEG normalization RSS safety ceiling; defaults to `768`, and `0` disables this opt-in branch guard |
| `LESAVKA_UVC_DIRECT_MJPEG_SIZE_DROP_PCT` | server direct-MJPEG corruption guard threshold; frames below this percentage of the last good reference are frozen out |
| `LESAVKA_UVC_DIRECT_MJPEG_VISUAL_GUARD` | server direct-MJPEG corruption guard toggle; defaults on so obvious collapsed or flat payloads freeze the last good frame |
| `LESAVKA_UVC_HEVC_DOMINANT_BYTE_PCT` | server HEVC-to-MJPEG corruption guard threshold; flat decoded MJPEG payloads with one byte at or above this percentage are frozen out, default `92` |

View File

@ -1609,10 +1609,11 @@ SERVER_ENV_TMP=$(mktemp)
printf 'LESAVKA_UVC_HEVC_SPOOL_PULL_TIMEOUT_MS=%s\n' "${LESAVKA_UVC_HEVC_SPOOL_PULL_TIMEOUT_MS:-20}"
printf 'LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS=%s\n' "${LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS:-2}"
printf 'LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT=%s\n' "${LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT:-15}"
printf 'LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE:-1}"
printf 'LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE:-0}"
printf 'LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY:-72}"
printf 'LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS:-25}"
printf 'LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT:-30}"
printf 'LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_RSS_LIMIT_MB=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_RSS_LIMIT_MB:-768}"
printf 'LESAVKA_UVC_DIRECT_MJPEG_VISUAL_GUARD=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_VISUAL_GUARD:-1}"
printf 'LESAVKA_UVC_DIRECT_MJPEG_SIZE_DROP_PCT=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_SIZE_DROP_PCT:-18}"
printf 'LESAVKA_UVC_DIRECT_MJPEG_MIN_REFERENCE_BYTES=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_MIN_REFERENCE_BYTES:-49152}"

View File

@ -16,7 +16,7 @@ bench = false
[package]
name = "lesavka_server"
version = "0.22.47"
version = "0.22.48"
edition = "2024"
autobins = false

View File

@ -116,6 +116,36 @@ impl CameraRuntime {
self.generation.load(Ordering::Relaxed) == session_id
}
/// Release the active relay when the owning stream has ended.
///
/// Inputs: the camera session id returned by `activate`.
/// Outputs: true only when that session was still current and was
/// superseded. Why: keeping a completed UVC session's GStreamer pipeline
/// alive preserves native buffers and makes allocator retention look like
/// a server leak after the client UI disconnects.
pub async fn release_if_active(&self, session_id: u64) -> bool {
if self
.generation
.compare_exchange(
session_id,
session_id.saturating_add(1),
Ordering::SeqCst,
Ordering::Relaxed,
)
.is_err()
{
return false;
}
let mut slot = self.slot.lock().await;
let released = slot.take().is_some();
info!(
session_id,
released, "🎥 camera relay released after stream lifecycle ended"
);
true
}
/// Supersede the active camera stream and drop the userspace relay sink.
///
/// Inputs: none.

View File

@ -262,6 +262,7 @@ impl Handler {
relay.feed(pkt); // ← all logging inside video.rs
upstream_media_rt.mark_video_presented(presented_pts, plan.due_at);
}
camera_rt.release_if_active(camera_session_id).await;
tx.send(Ok(Empty {})).await.ok();
Ok::<(), Status>(())
});

View File

@ -377,6 +377,7 @@ impl Handler {
}
upstream_media_rt.close_camera(camera_lease.generation);
upstream_media_rt.close_microphone(microphone_lease.generation);
camera_rt.release_if_active(camera_session_id).await;
info!(
rpc_id,
session_id = camera_lease.session_id,

View File

@ -8,10 +8,11 @@ const DEFAULT_HEVC_DOMINANT_BYTE_PCT: u32 = 92;
const DEFAULT_DIRECT_MJPEG_SIZE_DROP_PCT: u32 = 18;
const DEFAULT_DIRECT_MJPEG_MIN_REFERENCE_BYTES: u32 = 48 * 1024;
const DEFAULT_DIRECT_MJPEG_PROFILE_MISMATCH_REJECT: bool = false;
const DEFAULT_DIRECT_MJPEG_NORMALIZE: bool = true;
const DEFAULT_DIRECT_MJPEG_NORMALIZE: bool = false;
const DEFAULT_DIRECT_MJPEG_JPEG_QUALITY: u32 = 72;
const DEFAULT_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS: u32 = 25;
const DEFAULT_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT: u32 = 30;
const DEFAULT_DIRECT_MJPEG_NORMALIZE_RSS_LIMIT_MB: u32 = 768;
/// Summarizes one compressed MJPEG frame without fully decoding pixels.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
@ -192,18 +193,19 @@ pub(super) fn direct_mjpeg_reject_profile_mismatch_enabled() -> bool {
/// Decide whether direct MJPEG should be normalized before UVC spool.
///
/// Inputs: optional `LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE`. Output: true unless
/// explicitly disabled. Why: Google Meet/Firefox can expose lower-half grey
/// slabs from otherwise complete camera JPEGs; a local decode/re-encode gives
/// the RCT a simpler, freshly bounded MJPEG bitstream.
/// explicitly enabled. Why: this path runs native GStreamer JPEG decode/encode
/// inside `lesavka-server`; it is useful for lab comparisons, but field
/// evidence showed long-running RSS growth from the native allocator/buffer
/// pools, so guarded passthrough is the production-safe default.
pub(super) fn direct_mjpeg_normalize_enabled() -> bool {
std::env::var("LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE")
.ok()
.map(|value| {
let trimmed = value.trim();
!(trimmed.eq_ignore_ascii_case("0")
|| trimmed.eq_ignore_ascii_case("false")
|| trimmed.eq_ignore_ascii_case("no")
|| trimmed.eq_ignore_ascii_case("off"))
trimmed.eq_ignore_ascii_case("1")
|| trimmed.eq_ignore_ascii_case("true")
|| trimmed.eq_ignore_ascii_case("yes")
|| trimmed.eq_ignore_ascii_case("on")
})
.unwrap_or(DEFAULT_DIRECT_MJPEG_NORMALIZE)
}
@ -249,6 +251,21 @@ pub(super) fn direct_mjpeg_normalize_miss_limit() -> u32 {
.clamp(1, 300)
}
/// Resolve the RSS ceiling for the optional direct-MJPEG normalizer.
///
/// Inputs: optional `LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_RSS_LIMIT_MB`.
/// Output: `None` when set to zero, otherwise a kilobyte ceiling. Why: if an
/// operator enables the native JPEG normalizer for lab diagnostics, the server
/// should still self-disable that branch before allocator retention threatens
/// the Pi.
pub(super) fn direct_mjpeg_normalize_rss_limit_kb() -> Option<u64> {
let mb = env_u32(
"LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_RSS_LIMIT_MB",
DEFAULT_DIRECT_MJPEG_NORMALIZE_RSS_LIMIT_MB,
);
(mb > 0).then_some(u64::from(mb) * 1024)
}
/// Return whether a decoded buffer looks like one complete JPEG image.
///
/// Inputs: decoded MJPEG bytes. Output: true when SOI, SOS, and EOI markers
@ -534,7 +551,7 @@ mod tests {
}
#[test]
fn direct_mjpeg_normalization_defaults_on_and_clamps_tuning() {
fn direct_mjpeg_normalization_defaults_off_and_clamps_tuning() {
temp_env::with_vars(
[
("LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE", None::<&str>),
@ -547,18 +564,26 @@ mod tests {
"LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT",
None::<&str>,
),
(
"LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_RSS_LIMIT_MB",
None::<&str>,
),
],
|| {
assert!(super::direct_mjpeg_normalize_enabled());
assert!(!super::direct_mjpeg_normalize_enabled());
assert_eq!(super::direct_mjpeg_jpeg_quality(), 72);
assert_eq!(super::direct_mjpeg_normalize_pull_timeout_ms(), 25);
assert_eq!(super::direct_mjpeg_normalize_miss_limit(), 30);
assert_eq!(
super::direct_mjpeg_normalize_rss_limit_kb(),
Some(768 * 1024)
);
},
);
temp_env::with_vars(
[
("LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE", Some("off")),
("LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE", Some("on")),
("LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY", Some("101")),
(
"LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS",
@ -568,12 +593,17 @@ mod tests {
"LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT",
Some("999"),
),
(
"LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_RSS_LIMIT_MB",
Some("0"),
),
],
|| {
assert!(!super::direct_mjpeg_normalize_enabled());
assert!(super::direct_mjpeg_normalize_enabled());
assert_eq!(super::direct_mjpeg_jpeg_quality(), 100);
assert_eq!(super::direct_mjpeg_normalize_pull_timeout_ms(), 50);
assert_eq!(super::direct_mjpeg_normalize_miss_limit(), 300);
assert_eq!(super::direct_mjpeg_normalize_rss_limit_kb(), None);
},
);

View File

@ -54,6 +54,7 @@ pub struct WebcamSink {
last_decoded_mjpeg_bytes: AtomicU64,
direct_mjpeg_normalize_bypassed: AtomicBool,
normalized_mjpeg_miss_count: AtomicU64,
normalized_mjpeg_memory_check_count: AtomicU64,
decoded_mjpeg_miss_count: AtomicU64,
decode_recovery_needs_irap: AtomicBool,
#[cfg(not(coverage))]
@ -181,6 +182,15 @@ fn direct_mjpeg_normalize_pull_timeout() -> gst::ClockTime {
))
}
#[cfg(not(coverage))]
fn current_process_rss_kb() -> Option<u64> {
let status = fs::read_to_string("/proc/self/status").ok()?;
status.lines().find_map(|line| {
let rest = line.strip_prefix("VmRSS:")?;
rest.split_whitespace().next()?.parse::<u64>().ok()
})
}
/// Drain normalized direct-MJPEG output down to the freshest sample.
///
/// Inputs: the direct-MJPEG normalization appsink. Output: newest available
@ -503,6 +513,7 @@ impl WebcamSink {
last_decoded_mjpeg_bytes: AtomicU64::new(0),
direct_mjpeg_normalize_bypassed: AtomicBool::new(false),
normalized_mjpeg_miss_count: AtomicU64::new(0),
normalized_mjpeg_memory_check_count: AtomicU64::new(0),
decoded_mjpeg_miss_count: AtomicU64::new(0),
decode_recovery_needs_irap: AtomicBool::new(false),
})
@ -807,6 +818,7 @@ impl WebcamSink {
last_decoded_mjpeg_bytes: AtomicU64::new(0),
direct_mjpeg_normalize_bypassed: AtomicBool::new(false),
normalized_mjpeg_miss_count: AtomicU64::new(0),
normalized_mjpeg_memory_check_count: AtomicU64::new(0),
decoded_mjpeg_miss_count: AtomicU64::new(0),
decode_recovery_needs_irap: AtomicBool::new(false),
_bus_watch: bus_watch,
@ -1027,6 +1039,26 @@ impl WebcamSink {
return;
};
if self
.normalized_mjpeg_memory_check_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
.is_multiple_of(150)
&& let Some(limit_kb) = hevc_mjpeg_guard::direct_mjpeg_normalize_rss_limit_kb()
&& let Some(rss_kb) = current_process_rss_kb()
&& rss_kb > limit_kb
{
self.direct_mjpeg_normalize_bypassed
.store(true, std::sync::atomic::Ordering::Relaxed);
warn!(
target:"lesavka_server::video",
rss_kb,
limit_kb,
"📸⚠️ direct MJPEG normalization disabled because server RSS exceeded its safety limit"
);
self.spool_passthrough_direct_mjpeg_frame(path, pkt);
return;
}
let buf = gst::Buffer::from_slice(pkt.data.clone());
if let Err(err) = src.push_buffer(buf) {
tracing::warn!(

View File

@ -125,6 +125,8 @@ fn server_hevc_recovery_and_freshest_spool_paths_remain_wired() {
"direct_mjpeg_normalize_src",
"direct_mjpeg_normalize_bypassed",
"mjpeg_normalized",
"current_process_rss_kb",
"direct_mjpeg_normalize_rss_limit_kb",
"should_freeze_decoded_mjpeg_frame(previous_bytes, map.as_slice())",
"direct_mjpeg_reject_reason(",
"spool_direct_mjpeg_frame",

View File

@ -21,6 +21,18 @@ fn default_runtime_starts_without_an_active_generation() {
assert!(!runtime.is_active(1));
}
#[test]
fn release_if_active_supersedes_only_the_current_generation() {
let runtime = CameraRuntime::new();
let rt = Runtime::new().expect("runtime");
assert!(rt.block_on(runtime.release_if_active(0)));
assert!(!runtime.is_active(0));
assert!(runtime.is_active(1));
assert!(!rt.block_on(runtime.release_if_active(0)));
assert!(runtime.is_active(1));
}
#[test]
#[serial]
fn activate_rejects_uvc_when_disabled_and_bumps_generation() {

View File

@ -39,6 +39,7 @@ fn server_install_pins_hdmi_camera_and_display_defaults() {
"LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY=%s",
"LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS=%s",
"LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT=%s",
"LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_RSS_LIMIT_MB=%s",
"LESAVKA_SERVER_BIND_ADDR=%s",
"/etc/lesavka/uvc.env",
"LESAVKA_UVC_MAXPACKET=",
@ -173,10 +174,11 @@ fn server_install_pins_hdmi_camera_and_display_defaults() {
assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_HEVC_SPOOL_PULL_TIMEOUT_MS:-20}"));
assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS:-2}"));
assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT:-15}"));
assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE:-1}"));
assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE:-0}"));
assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY:-72}"));
assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS:-25}"));
assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT:-30}"));
assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_RSS_LIMIT_MB:-768}"));
assert!(
SERVER_INSTALL.contains("lesavka_server::video=info"),
"server installs should not leave the hot webcam frame path at debug logging by default"

View File

@ -87,6 +87,7 @@ fn server_env_persists_runtime_profile_and_tls_settings() {
"LESAVKA_UVC_HEVC_FREEZE_ON_SIZE_DROP=%s",
"LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE=%s",
"LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_MISS_LIMIT=%s",
"LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_RSS_LIMIT_MB=%s",
"LESAVKA_SERVER_BIND_ADDR=%s",
"LESAVKA_REQUIRE_TLS=%s",
"LESAVKA_TLS_CLIENT_CA=%s",