media: normalize direct mjpeg before uvc spool

This commit is contained in:
Brad Stein 2026-05-16 02:53:49 -03:00
parent 51b8ffe39a
commit cd6241dbfa
12 changed files with 382 additions and 6 deletions

6
Cargo.lock generated
View File

@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]] [[package]]
name = "lesavka_client" name = "lesavka_client"
version = "0.22.44" version = "0.22.45"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-stream", "async-stream",
@ -1686,7 +1686,7 @@ dependencies = [
[[package]] [[package]]
name = "lesavka_common" name = "lesavka_common"
version = "0.22.44" version = "0.22.45"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",
@ -1698,7 +1698,7 @@ dependencies = [
[[package]] [[package]]
name = "lesavka_server" name = "lesavka_server"
version = "0.22.44" version = "0.22.45"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package] [package]
name = "lesavka_client" name = "lesavka_client"
version = "0.22.44" version = "0.22.45"
edition = "2024" edition = "2024"
[dependencies] [dependencies]

View File

@ -1,6 +1,6 @@
[package] [package]
name = "lesavka_common" name = "lesavka_common"
version = "0.22.44" version = "0.22.45"
edition = "2024" edition = "2024"
build = "build.rs" build = "build.rs"

View File

@ -609,6 +609,12 @@ These entries are intentionally concise because most are manual lab or CI harnes
| `LESAVKA_UPSTREAM_AUTO_HEAL_AFTER_MS` | client live bundled-upstream startup heal delay; defaults to `3000`ms before issuing the safe audio-epoch recovery | | `LESAVKA_UPSTREAM_AUTO_HEAL_AFTER_MS` | client live bundled-upstream startup heal delay; defaults to `3000`ms before issuing the safe audio-epoch recovery |
| `LESAVKA_UPSTREAM_SOURCE_LEAD_CAP_MS` | server upstream media timing override; bounds live source lead or playout behavior while tuning client-to-server transport | | `LESAVKA_UPSTREAM_SOURCE_LEAD_CAP_MS` | server upstream media timing override; bounds live source lead or playout behavior while tuning client-to-server transport |
| `LESAVKA_UVC_CONFIGFS_BASE` | server UVC gadget mode/configfs override used by runtime reconfiguration and hardware-in-the-loop probes | | `LESAVKA_UVC_CONFIGFS_BASE` | server UVC gadget mode/configfs override used by runtime reconfiguration and hardware-in-the-loop probes |
| `LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY` | server direct-MJPEG normalization JPEG quality; defaults to `72` to reduce browser-facing UVC bitstream pressure |
| `LESAVKA_UVC_DIRECT_MJPEG_MIN_REFERENCE_BYTES` | server direct-MJPEG guard baseline; frames smaller than this do not establish the last-good reference |
| `LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE` | server direct-MJPEG normalization toggle; defaults on so camera MJPEG is decoded/re-encoded before the UVC helper |
| `LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS` | server direct-MJPEG normalization appsink timeout; defaults to `25`ms and is capped at `50`ms to avoid live backlog |
| `LESAVKA_UVC_DIRECT_MJPEG_SIZE_DROP_PCT` | server direct-MJPEG corruption guard threshold; frames below this percentage of the last good reference are frozen out |
| `LESAVKA_UVC_DIRECT_MJPEG_VISUAL_GUARD` | server direct-MJPEG corruption guard toggle; defaults on so obvious collapsed or flat payloads freeze the last good frame |
| `LESAVKA_UVC_HEVC_DOMINANT_BYTE_PCT` | server HEVC-to-MJPEG corruption guard threshold; flat decoded MJPEG payloads with one byte at or above this percentage are frozen out, default `92` | | `LESAVKA_UVC_HEVC_DOMINANT_BYTE_PCT` | server HEVC-to-MJPEG corruption guard threshold; flat decoded MJPEG payloads with one byte at or above this percentage are frozen out, default `92` |
| `LESAVKA_UVC_HEVC_FREEZE_ON_SIZE_DROP` | server HEVC-to-MJPEG corruption guard toggle; defaults on so suspicious decoded frame collapses freeze the last good MJPEG frame | | `LESAVKA_UVC_HEVC_FREEZE_ON_SIZE_DROP` | server HEVC-to-MJPEG corruption guard toggle; defaults on so suspicious decoded frame collapses freeze the last good MJPEG frame |
| `LESAVKA_UVC_HEVC_JPEG_QUALITY` | server HEVC-to-MJPEG UVC bridge JPEG quality; defaults to `72` to lower UVC payload pressure while keeping RCT output compatible | | `LESAVKA_UVC_HEVC_JPEG_QUALITY` | server HEVC-to-MJPEG UVC bridge JPEG quality; defaults to `72` to lower UVC payload pressure while keeping RCT output compatible |

View File

@ -1608,6 +1608,12 @@ SERVER_ENV_TMP=$(mktemp)
printf 'LESAVKA_UVC_HEVC_SPOOL_PULL_TIMEOUT_MS=%s\n' "${LESAVKA_UVC_HEVC_SPOOL_PULL_TIMEOUT_MS:-20}" printf 'LESAVKA_UVC_HEVC_SPOOL_PULL_TIMEOUT_MS=%s\n' "${LESAVKA_UVC_HEVC_SPOOL_PULL_TIMEOUT_MS:-20}"
printf 'LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS=%s\n' "${LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS:-2}" printf 'LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS=%s\n' "${LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS:-2}"
printf 'LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT=%s\n' "${LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT:-15}" printf 'LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT=%s\n' "${LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT:-15}"
printf 'LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE:-1}"
printf 'LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY:-72}"
printf 'LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS:-25}"
printf 'LESAVKA_UVC_DIRECT_MJPEG_VISUAL_GUARD=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_VISUAL_GUARD:-1}"
printf 'LESAVKA_UVC_DIRECT_MJPEG_SIZE_DROP_PCT=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_SIZE_DROP_PCT:-18}"
printf 'LESAVKA_UVC_DIRECT_MJPEG_MIN_REFERENCE_BYTES=%s\n' "${LESAVKA_UVC_DIRECT_MJPEG_MIN_REFERENCE_BYTES:-49152}"
printf 'LESAVKA_SERVER_BIND_ADDR=%s\n' "${INSTALL_SERVER_BIND_ADDR}" printf 'LESAVKA_SERVER_BIND_ADDR=%s\n' "${INSTALL_SERVER_BIND_ADDR}"
printf 'LESAVKA_UVC_CODEC=%s\n' "${INSTALL_UVC_CODEC}" printf 'LESAVKA_UVC_CODEC=%s\n' "${INSTALL_UVC_CODEC}"
printf 'LESAVKA_UVC_WIDTH=%s\n' "${LESAVKA_UVC_WIDTH:-1280}" printf 'LESAVKA_UVC_WIDTH=%s\n' "${LESAVKA_UVC_WIDTH:-1280}"

View File

@ -10,7 +10,7 @@ bench = false
[package] [package]
name = "lesavka_server" name = "lesavka_server"
version = "0.22.44" version = "0.22.45"
edition = "2024" edition = "2024"
autobins = false autobins = false

View File

@ -8,6 +8,9 @@ const DEFAULT_HEVC_DOMINANT_BYTE_PCT: u32 = 92;
const DEFAULT_DIRECT_MJPEG_SIZE_DROP_PCT: u32 = 18; const DEFAULT_DIRECT_MJPEG_SIZE_DROP_PCT: u32 = 18;
const DEFAULT_DIRECT_MJPEG_MIN_REFERENCE_BYTES: u32 = 48 * 1024; const DEFAULT_DIRECT_MJPEG_MIN_REFERENCE_BYTES: u32 = 48 * 1024;
const DEFAULT_DIRECT_MJPEG_PROFILE_MISMATCH_REJECT: bool = false; const DEFAULT_DIRECT_MJPEG_PROFILE_MISMATCH_REJECT: bool = false;
const DEFAULT_DIRECT_MJPEG_NORMALIZE: bool = true;
const DEFAULT_DIRECT_MJPEG_JPEG_QUALITY: u32 = 72;
const DEFAULT_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS: u32 = 25;
/// Summarizes one compressed MJPEG frame without fully decoding pixels. /// Summarizes one compressed MJPEG frame without fully decoding pixels.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] #[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
@ -185,6 +188,52 @@ pub(super) fn direct_mjpeg_reject_profile_mismatch_enabled() -> bool {
.unwrap_or(DEFAULT_DIRECT_MJPEG_PROFILE_MISMATCH_REJECT) .unwrap_or(DEFAULT_DIRECT_MJPEG_PROFILE_MISMATCH_REJECT)
} }
/// Decide whether direct MJPEG should be normalized before UVC spool.
///
/// Inputs: optional `LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE`. Output: true unless
/// explicitly disabled. Why: Google Meet/Firefox can expose lower-half grey
/// slabs from otherwise complete camera JPEGs; a local decode/re-encode gives
/// the RCT a simpler, freshly bounded MJPEG bitstream.
pub(super) fn direct_mjpeg_normalize_enabled() -> bool {
std::env::var("LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE")
.ok()
.map(|value| {
let trimmed = value.trim();
!(trimmed.eq_ignore_ascii_case("0")
|| trimmed.eq_ignore_ascii_case("false")
|| trimmed.eq_ignore_ascii_case("no")
|| trimmed.eq_ignore_ascii_case("off"))
})
.unwrap_or(DEFAULT_DIRECT_MJPEG_NORMALIZE)
}
/// Resolve JPEG quality for normalized direct MJPEG frames.
///
/// Inputs: optional `LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY`, clamped to
/// 1..=100. Output: the `jpegenc` quality value. Why: direct MJPEG
/// normalization should reduce browser-facing bitstream complexity without
/// creating a new hidden bandwidth spike.
pub(super) fn direct_mjpeg_jpeg_quality() -> u32 {
env_u32(
"LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY",
DEFAULT_DIRECT_MJPEG_JPEG_QUALITY,
)
.clamp(1, 100)
}
/// Bound how long direct MJPEG normalization may wait for a fresh sample.
///
/// Inputs: optional `LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS`,
/// clamped to 0..=50. Output: timeout in milliseconds. Why: normalization is
/// safer than raw passthrough, but it must not build a live webcam backlog.
pub(super) fn direct_mjpeg_normalize_pull_timeout_ms() -> u32 {
env_u32(
"LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS",
DEFAULT_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS,
)
.min(50)
}
/// Return whether a decoded buffer looks like one complete JPEG image. /// Return whether a decoded buffer looks like one complete JPEG image.
/// ///
/// Inputs: decoded MJPEG bytes. Output: true when SOI, SOS, and EOI markers /// Inputs: decoded MJPEG bytes. Output: true when SOI, SOS, and EOI markers
@ -469,6 +518,45 @@ mod tests {
}); });
} }
#[test]
fn direct_mjpeg_normalization_defaults_on_and_clamps_tuning() {
temp_env::with_vars(
[
("LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE", None::<&str>),
("LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY", None::<&str>),
(
"LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS",
None::<&str>,
),
],
|| {
assert!(super::direct_mjpeg_normalize_enabled());
assert_eq!(super::direct_mjpeg_jpeg_quality(), 72);
assert_eq!(super::direct_mjpeg_normalize_pull_timeout_ms(), 25);
},
);
temp_env::with_vars(
[
("LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE", Some("off")),
("LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY", Some("101")),
(
"LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS",
Some("999"),
),
],
|| {
assert!(!super::direct_mjpeg_normalize_enabled());
assert_eq!(super::direct_mjpeg_jpeg_quality(), 100);
assert_eq!(super::direct_mjpeg_normalize_pull_timeout_ms(), 50);
},
);
temp_env::with_var("LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY", Some("0"), || {
assert_eq!(super::direct_mjpeg_jpeg_quality(), 1);
});
}
#[test] #[test]
fn freeze_guard_catches_large_decoded_frame_collapses() { fn freeze_guard_catches_large_decoded_frame_collapses() {
temp_env::with_vars( temp_env::with_vars(

View File

@ -36,6 +36,20 @@ impl MjpegSpoolTiming {
} }
} }
/// Build metadata for direct MJPEG after local decode/re-encode.
///
/// Inputs: the upstream packet PTS in microseconds. Output: timing metadata
/// labeled as normalized MJPEG. Why: browser-visible UVC corruption can
/// happen after a syntactically valid camera JPEG, so probes need to know
/// when the server has intentionally emitted a clean re-encoded frame.
pub(super) fn mjpeg_normalized(source_pts_us: u64) -> Self {
Self {
profile: "mjpeg-normalized",
source_pts_us: Some(source_pts_us),
decoded_pts_us: None,
}
}
/// Build metadata for decoded HEVC entering the MJPEG UVC helper. /// Build metadata for decoded HEVC entering the MJPEG UVC helper.
/// ///
/// Inputs: upstream packet PTS plus the decoded appsink buffer PTS. /// Inputs: upstream packet PTS plus the decoded appsink buffer PTS.
@ -542,6 +556,24 @@ mod tests {
assert!(record.contains("\"decoded_pts_us\":null")); assert!(record.contains("\"decoded_pts_us\":null"));
} }
/// Verifies normalized direct-MJPEG handoffs are distinguishable.
///
/// Input: an upstream MJPEG packet PTS after decode/re-encode. Output:
/// metadata with the normalized profile marker. Why: RCT artifact probes
/// need to separate raw passthrough from the safer browser-facing path.
#[test]
fn mjpeg_normalized_metadata_uses_source_pts_and_profile_marker() {
let record = super::format_mjpeg_spool_metadata(
9,
101,
super::MjpegSpoolTiming::mjpeg_normalized(66_000),
);
assert!(record.contains("\"profile\":\"mjpeg-normalized\""));
assert!(record.contains("\"source_pts_us\":66000"));
assert!(record.contains("\"decoded_pts_us\":null"));
}
/// Verifies frame spooling preserves default behavior unless metadata is enabled. /// Verifies frame spooling preserves default behavior unless metadata is enabled.
/// ///
/// Input: a temporary frame path plus disabled metadata env vars. Output: /// Input: a temporary frame path plus disabled metadata env vars. Output:

View File

@ -42,6 +42,8 @@ pub struct WebcamSink {
next_pts_us: AtomicU64, next_pts_us: AtomicU64,
frame_step_us: u64, frame_step_us: u64,
mjpeg_spool_path: Option<PathBuf>, mjpeg_spool_path: Option<PathBuf>,
direct_mjpeg_appsrc: Option<gst_app::AppSrc>,
normalized_mjpeg_sink: Option<gst_app::AppSink>,
hevc_mjpeg_appsrc: Option<gst_app::AppSrc>, hevc_mjpeg_appsrc: Option<gst_app::AppSrc>,
decoded_mjpeg_sink: Option<gst_app::AppSink>, decoded_mjpeg_sink: Option<gst_app::AppSink>,
last_mjpeg_passthrough_bytes: AtomicU64, last_mjpeg_passthrough_bytes: AtomicU64,
@ -50,6 +52,7 @@ pub struct WebcamSink {
uvc_height: u16, uvc_height: u16,
direct_mjpeg_profile_mismatch_seen: AtomicBool, direct_mjpeg_profile_mismatch_seen: AtomicBool,
last_decoded_mjpeg_bytes: AtomicU64, last_decoded_mjpeg_bytes: AtomicU64,
normalized_mjpeg_miss_count: AtomicU64,
decoded_mjpeg_miss_count: AtomicU64, decoded_mjpeg_miss_count: AtomicU64,
decode_recovery_needs_irap: AtomicBool, decode_recovery_needs_irap: AtomicBool,
#[cfg(not(coverage))] #[cfg(not(coverage))]
@ -170,6 +173,119 @@ fn build_hevc_freshness_queue(name: &str) -> anyhow::Result<gst::Element> {
Ok(queue) Ok(queue)
} }
#[cfg(not(coverage))]
fn direct_mjpeg_normalize_pull_timeout() -> gst::ClockTime {
gst::ClockTime::from_mseconds(u64::from(
hevc_mjpeg_guard::direct_mjpeg_normalize_pull_timeout_ms(),
))
}
/// Drain normalized direct-MJPEG output down to the freshest sample.
///
/// Inputs: the direct-MJPEG normalization appsink. Output: newest available
/// sample, if any. Why: decode/re-encode should sanitize browser-facing MJPEG
/// without letting stale frames accumulate behind the live webcam feed.
#[cfg(not(coverage))]
fn freshest_direct_mjpeg_sample(sink: &gst_app::AppSink) -> Option<gst::Sample> {
let mut newest = sink.try_pull_sample(direct_mjpeg_normalize_pull_timeout());
while let Some(sample) = sink.try_pull_sample(gst::ClockTime::ZERO) {
newest = Some(sample);
}
newest
}
#[cfg(not(coverage))]
fn build_direct_mjpeg_normalize_branch(
pipeline: &gst::Pipeline,
width: i32,
height: i32,
fps: i32,
) -> anyhow::Result<(gst_app::AppSrc, gst_app::AppSink)> {
let src = gst::ElementFactory::make("appsrc")
.name("direct_mjpeg_normalize_src")
.build()?
.downcast::<gst_app::AppSrc>()
.expect("direct MJPEG normalize appsrc");
src.set_is_live(true);
src.set_format(gst::Format::Time);
src.set_property("do-timestamp", false);
configure_uvc_appsrc(&src);
let caps_in = gst::Caps::builder("image/jpeg")
.field("framerate", gst::Fraction::new(fps, 1))
.build();
src.set_caps(Some(&caps_in));
let caps_mjpeg = gst::Caps::builder("image/jpeg")
.field("parsed", true)
.field("width", width)
.field("height", height)
.field("framerate", gst::Fraction::new(fps, 1))
.field("pixel-aspect-ratio", gst::Fraction::new(1, 1))
.field("colorimetry", "2:4:7:1")
.build();
let jpegparse = gst::ElementFactory::make("jpegparse").build()?;
let decoder = gst::ElementFactory::make("jpegdec").build()?;
let decoded_queue = build_hevc_freshness_queue("direct_mjpeg_normalize_decoded_queue")?;
let convert = gst::ElementFactory::make("videoconvert").build()?;
let scale = gst::ElementFactory::make("videoscale").build()?;
let raw_caps = gst::Caps::builder("video/x-raw")
.field("width", width)
.field("height", height)
.field("framerate", gst::Fraction::new(fps, 1))
.build();
let raw_capsfilter = gst::ElementFactory::make("capsfilter")
.property("caps", &raw_caps)
.build()?;
let encoder = gst::ElementFactory::make("jpegenc")
.property(
"quality",
hevc_mjpeg_guard::direct_mjpeg_jpeg_quality() as i32,
)
.build()?;
let encoded_caps = gst::ElementFactory::make("capsfilter")
.property("caps", &caps_mjpeg)
.build()?;
let encoded_queue = build_hevc_freshness_queue("direct_mjpeg_normalize_encoded_queue")?;
let sink = gst::ElementFactory::make("appsink")
.name("direct_mjpeg_normalize_sink")
.property("sync", false)
.property("enable-last-sample", false)
.property("emit-signals", false)
.property("max-buffers", 1u32)
.property("drop", true)
.build()?
.downcast::<gst_app::AppSink>()
.expect("direct MJPEG normalize appsink");
pipeline.add_many([
src.upcast_ref(),
&jpegparse,
&decoder,
&decoded_queue,
&convert,
&scale,
&raw_capsfilter,
&encoder,
&encoded_caps,
&encoded_queue,
sink.upcast_ref(),
])?;
gst::Element::link_many([
src.upcast_ref(),
&jpegparse,
&decoder,
&decoded_queue,
&convert,
&scale,
&raw_capsfilter,
&encoder,
&encoded_caps,
&encoded_queue,
sink.upcast_ref(),
])?;
Ok((src, sink))
}
#[cfg(not(coverage))] #[cfg(not(coverage))]
fn add_hevc_mjpeg_spool_branch( fn add_hevc_mjpeg_spool_branch(
pipeline: &gst::Pipeline, pipeline: &gst::Pipeline,
@ -381,6 +497,8 @@ impl WebcamSink {
next_pts_us: AtomicU64::new(0), next_pts_us: AtomicU64::new(0),
frame_step_us, frame_step_us,
mjpeg_spool_path: None, mjpeg_spool_path: None,
direct_mjpeg_appsrc: None,
normalized_mjpeg_sink: None,
hevc_mjpeg_appsrc: None, hevc_mjpeg_appsrc: None,
decoded_mjpeg_sink: None, decoded_mjpeg_sink: None,
last_mjpeg_passthrough_bytes: AtomicU64::new(0), last_mjpeg_passthrough_bytes: AtomicU64::new(0),
@ -389,6 +507,7 @@ impl WebcamSink {
uvc_height: cfg.height.min(u32::from(u16::MAX)) as u16, uvc_height: cfg.height.min(u32::from(u16::MAX)) as u16,
direct_mjpeg_profile_mismatch_seen: AtomicBool::new(false), direct_mjpeg_profile_mismatch_seen: AtomicBool::new(false),
last_decoded_mjpeg_bytes: AtomicU64::new(0), last_decoded_mjpeg_bytes: AtomicU64::new(0),
normalized_mjpeg_miss_count: AtomicU64::new(0),
decoded_mjpeg_miss_count: AtomicU64::new(0), decoded_mjpeg_miss_count: AtomicU64::new(0),
decode_recovery_needs_irap: AtomicBool::new(false), decode_recovery_needs_irap: AtomicBool::new(false),
}) })
@ -420,6 +539,8 @@ impl WebcamSink {
} }
let mut mjpeg_spool_file = None; let mut mjpeg_spool_file = None;
let mut direct_mjpeg_appsrc = None;
let mut normalized_mjpeg_sink = None;
let mut hevc_mjpeg_appsrc = None; let mut hevc_mjpeg_appsrc = None;
let mut decoded_mjpeg_sink = None; let mut decoded_mjpeg_sink = None;
@ -445,6 +566,27 @@ impl WebcamSink {
pipeline.add_many([src.upcast_ref(), &sink])?; pipeline.add_many([src.upcast_ref(), &sink])?;
gst::Element::link_many([src.upcast_ref(), &sink])?; gst::Element::link_many([src.upcast_ref(), &sink])?;
mjpeg_spool_file = Some(mjpeg_spool_path()); mjpeg_spool_file = Some(mjpeg_spool_path());
if hevc_mjpeg_guard::direct_mjpeg_normalize_enabled() {
match build_direct_mjpeg_normalize_branch(&pipeline, width, height, fps) {
Ok((normalize_src, normalize_sink)) => {
direct_mjpeg_appsrc = Some(normalize_src);
normalized_mjpeg_sink = Some(normalize_sink);
tracing::info!(
target: "lesavka_server::video",
quality = hevc_mjpeg_guard::direct_mjpeg_jpeg_quality(),
pull_timeout_ms = hevc_mjpeg_guard::direct_mjpeg_normalize_pull_timeout_ms(),
"📸 direct MJPEG UVC spool will normalize frames through jpegdec/jpegenc"
);
}
Err(err) => {
tracing::warn!(
target: "lesavka_server::video",
%err,
"📸⚠️ direct MJPEG normalization unavailable; falling back to guarded passthrough"
);
}
}
}
match add_hevc_mjpeg_spool_branch(&pipeline, width, height, fps) { match add_hevc_mjpeg_spool_branch(&pipeline, width, height, fps) {
Ok((hevc_src, hevc_sink)) => { Ok((hevc_src, hevc_sink)) => {
hevc_mjpeg_appsrc = Some(hevc_src); hevc_mjpeg_appsrc = Some(hevc_src);
@ -658,6 +800,8 @@ impl WebcamSink {
next_pts_us: AtomicU64::new(0), next_pts_us: AtomicU64::new(0),
frame_step_us, frame_step_us,
mjpeg_spool_path: mjpeg_spool_file, mjpeg_spool_path: mjpeg_spool_file,
direct_mjpeg_appsrc,
normalized_mjpeg_sink,
hevc_mjpeg_appsrc, hevc_mjpeg_appsrc,
decoded_mjpeg_sink, decoded_mjpeg_sink,
last_mjpeg_passthrough_bytes: AtomicU64::new(0), last_mjpeg_passthrough_bytes: AtomicU64::new(0),
@ -666,6 +810,7 @@ impl WebcamSink {
uvc_height: cfg.height.min(u32::from(u16::MAX)) as u16, uvc_height: cfg.height.min(u32::from(u16::MAX)) as u16,
direct_mjpeg_profile_mismatch_seen: AtomicBool::new(false), direct_mjpeg_profile_mismatch_seen: AtomicBool::new(false),
last_decoded_mjpeg_bytes: AtomicU64::new(0), last_decoded_mjpeg_bytes: AtomicU64::new(0),
normalized_mjpeg_miss_count: AtomicU64::new(0),
decoded_mjpeg_miss_count: AtomicU64::new(0), decoded_mjpeg_miss_count: AtomicU64::new(0),
decode_recovery_needs_irap: AtomicBool::new(false), decode_recovery_needs_irap: AtomicBool::new(false),
_bus_watch: bus_watch, _bus_watch: bus_watch,
@ -850,6 +995,12 @@ impl WebcamSink {
); );
return; return;
} }
if self.direct_mjpeg_appsrc.is_some() && self.normalized_mjpeg_sink.is_some() {
self.spool_normalized_direct_mjpeg_frame(path, pkt);
return;
}
let timing = MjpegSpoolTiming::mjpeg_passthrough(pkt.pts); let timing = MjpegSpoolTiming::mjpeg_passthrough(pkt.pts);
if let Err(err) = spool_mjpeg_frame_with_timing(path, &pkt.data, Some(timing)) { if let Err(err) = spool_mjpeg_frame_with_timing(path, &pkt.data, Some(timing)) {
warn!(target:"lesavka_server::video", %err, "📸⚠️ failed to spool MJPEG frame for UVC helper"); warn!(target:"lesavka_server::video", %err, "📸⚠️ failed to spool MJPEG frame for UVC helper");
@ -858,6 +1009,90 @@ impl WebcamSink {
.store(pkt.data.len() as u64, std::sync::atomic::Ordering::Relaxed); .store(pkt.data.len() as u64, std::sync::atomic::Ordering::Relaxed);
} }
} }
#[cfg(not(coverage))]
fn spool_normalized_direct_mjpeg_frame(&self, path: &Path, pkt: &VideoPacket) {
let Some(src) = self.direct_mjpeg_appsrc.as_ref() else {
return;
};
let Some(sink) = self.normalized_mjpeg_sink.as_ref() else {
return;
};
let mut buf = gst::Buffer::from_slice(pkt.data.clone());
if let Some(meta) = buf.get_mut() {
let ts = gst::ClockTime::from_useconds(pkt.pts);
meta.set_pts(Some(ts));
meta.set_dts(Some(ts));
meta.set_duration(Some(gst::ClockTime::from_useconds(self.frame_step_us)));
}
if let Err(err) = src.push_buffer(buf) {
tracing::warn!(
target:"lesavka_server::video",
%err,
"📸⚠️ direct MJPEG normalization appsrc push failed"
);
return;
}
let Some(sample) = freshest_direct_mjpeg_sample(sink) else {
let misses = self
.normalized_mjpeg_miss_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
+ 1;
if misses == 1 || misses % 30 == 0 {
warn!(
target:"lesavka_server::video",
misses,
"📸⚠️ direct MJPEG normalization produced no fresh frame; freezing last good UVC frame"
);
}
return;
};
let Some(buffer) = sample.buffer() else {
return;
};
let Ok(map) = buffer.map_readable() else {
return;
};
let normalized = map.as_slice();
let previous_bytes = self
.last_mjpeg_passthrough_bytes
.load(std::sync::atomic::Ordering::Relaxed);
if let Some(reason) = hevc_mjpeg_guard::direct_mjpeg_reject_reason(
previous_bytes,
Some(self.direct_mjpeg_max_bytes),
Some((self.uvc_width, self.uvc_height)),
normalized,
) {
let inspection = hevc_mjpeg_guard::inspect_mjpeg_frame(normalized);
warn!(
target:"lesavka_server::video",
?reason,
previous_bytes,
next_bytes = normalized.len(),
max_bytes = self.direct_mjpeg_max_bytes,
frame_width = ?inspection.width,
frame_height = ?inspection.height,
entropy_bytes = inspection.entropy_bytes,
entropy_distinct_bytes = inspection.entropy_distinct_bytes,
entropy_dominant_pct = inspection.entropy_dominant_pct,
entropy_max_run = inspection.entropy_max_run,
"📸⚠️ freezing suspicious normalized direct MJPEG frame before UVC spool"
);
return;
}
let timing = MjpegSpoolTiming::mjpeg_normalized(pkt.pts);
if let Err(err) = spool_mjpeg_frame_with_timing(path, normalized, Some(timing)) {
warn!(target:"lesavka_server::video", %err, "📸⚠️ failed to spool normalized direct MJPEG frame for UVC helper");
} else {
self.normalized_mjpeg_miss_count
.store(0, std::sync::atomic::Ordering::Relaxed);
self.last_mjpeg_passthrough_bytes
.store(normalized.len() as u64, std::sync::atomic::Ordering::Relaxed);
}
}
} }
impl Drop for WebcamSink { impl Drop for WebcamSink {

View File

@ -122,6 +122,8 @@ fn server_hevc_recovery_and_freshest_spool_paths_remain_wired() {
"freshest_mjpeg_sample(sink)", "freshest_mjpeg_sample(sink)",
"last_decoded_mjpeg_bytes", "last_decoded_mjpeg_bytes",
"last_mjpeg_passthrough_bytes", "last_mjpeg_passthrough_bytes",
"direct_mjpeg_normalize_src",
"mjpeg_normalized",
"should_freeze_decoded_mjpeg_frame(previous_bytes, map.as_slice())", "should_freeze_decoded_mjpeg_frame(previous_bytes, map.as_slice())",
"direct_mjpeg_reject_reason(", "direct_mjpeg_reject_reason(",
"spool_direct_mjpeg_frame", "spool_direct_mjpeg_frame",

View File

@ -35,6 +35,9 @@ fn server_install_pins_hdmi_camera_and_display_defaults() {
"LESAVKA_UVC_HEVC_SPOOL_PULL_TIMEOUT_MS=%s", "LESAVKA_UVC_HEVC_SPOOL_PULL_TIMEOUT_MS=%s",
"LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS=%s", "LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS=%s",
"LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT=%s", "LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT=%s",
"LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE=%s",
"LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY=%s",
"LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS=%s",
"LESAVKA_SERVER_BIND_ADDR=%s", "LESAVKA_SERVER_BIND_ADDR=%s",
"/etc/lesavka/uvc.env", "/etc/lesavka/uvc.env",
"LESAVKA_UVC_MAXPACKET=", "LESAVKA_UVC_MAXPACKET=",
@ -169,6 +172,9 @@ fn server_install_pins_hdmi_camera_and_display_defaults() {
assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_HEVC_SPOOL_PULL_TIMEOUT_MS:-20}")); assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_HEVC_SPOOL_PULL_TIMEOUT_MS:-20}"));
assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS:-2}")); assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS:-2}"));
assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT:-15}")); assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT:-15}"));
assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE:-1}"));
assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_DIRECT_MJPEG_JPEG_QUALITY:-72}"));
assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE_PULL_TIMEOUT_MS:-25}"));
assert!( assert!(
SERVER_INSTALL.contains("lesavka_server::video=info"), SERVER_INSTALL.contains("lesavka_server::video=info"),
"server installs should not leave the hot webcam frame path at debug logging by default" "server installs should not leave the hot webcam frame path at debug logging by default"

View File

@ -85,6 +85,7 @@ fn server_env_persists_runtime_profile_and_tls_settings() {
"LESAVKA_UPSTREAM_HEVC_VIDEO_PLAYOUT_MODE_OFFSETS_US=%s", "LESAVKA_UPSTREAM_HEVC_VIDEO_PLAYOUT_MODE_OFFSETS_US=%s",
"LESAVKA_UPSTREAM_MJPEG_VIDEO_PLAYOUT_MODE_OFFSETS_US=%s", "LESAVKA_UPSTREAM_MJPEG_VIDEO_PLAYOUT_MODE_OFFSETS_US=%s",
"LESAVKA_UVC_HEVC_FREEZE_ON_SIZE_DROP=%s", "LESAVKA_UVC_HEVC_FREEZE_ON_SIZE_DROP=%s",
"LESAVKA_UVC_DIRECT_MJPEG_NORMALIZE=%s",
"LESAVKA_SERVER_BIND_ADDR=%s", "LESAVKA_SERVER_BIND_ADDR=%s",
"LESAVKA_REQUIRE_TLS=%s", "LESAVKA_REQUIRE_TLS=%s",
"LESAVKA_TLS_CLIENT_CA=%s", "LESAVKA_TLS_CLIENT_CA=%s",