use anyhow::Context; use gstreamer as gst; use gstreamer::prelude::*; use gstreamer_app as gst_app; use lesavka_common::lesavka::VideoPacket; use std::fs; use std::path::Path; use std::path::PathBuf; use std::sync::{ Arc, atomic::{AtomicBool, AtomicU64, Ordering}, }; use tracing::warn; use crate::camera::{CameraCodec, CameraConfig}; use crate::video_support::{ contains_hevc_irap, contains_idr, dev_mode_enabled, require_h264_decoder, require_hevc_decoder, reserve_local_pts, }; mod mjpeg_spool; #[cfg(not(coverage))] use gst::MessageView::{Error, StateChanged, Warning}; #[cfg(not(coverage))] use mjpeg_spool::{ MjpegSpoolTiming, freshest_mjpeg_sample, mjpeg_spool_frame_max_bytes, spool_mjpeg_frame_with_timing, }; use mjpeg_spool::{mjpeg_spool_enabled, mjpeg_spool_path}; /// Push H.264 or MJPEG frames into the USB UVC gadget. /// /// Inputs: a UVC device node and the negotiated camera configuration. /// Outputs: a live `WebcamSink` that accepts `VideoPacket`s. /// Why: the UVC sink owns the GStreamer pipeline details for gadget output so /// the relay logic can focus on session lifecycle instead of media plumbing. pub struct WebcamSink { appsrc: gst_app::AppSrc, pipe: gst::Pipeline, clock_aligned: AtomicBool, next_pts_us: AtomicU64, frame_step_us: u64, mjpeg_spool_path: Option, direct_mjpeg_appsrc: Option, normalized_mjpeg_sink: Option, hevc_mjpeg_appsrc: Option, decoded_mjpeg_sink: Option, last_mjpeg_passthrough_bytes: AtomicU64, direct_mjpeg_max_bytes: usize, uvc_width: u16, uvc_height: u16, direct_mjpeg_profile_mismatch_seen: AtomicBool, last_decoded_mjpeg_bytes: AtomicU64, direct_mjpeg_normalize_bypassed: AtomicBool, normalized_mjpeg_miss_count: AtomicU64, normalized_mjpeg_memory_check_count: AtomicU64, decoded_mjpeg_miss_count: AtomicU64, decode_recovery_needs_irap: AtomicBool, #[cfg(not(coverage))] _bus_watch: Option, } fn uvc_sink_session_clock_align_enabled() -> bool { std::env::var("LESAVKA_UVC_SESSION_CLOCK_ALIGN") .ok() .map(|value| { let trimmed = value.trim(); !(trimmed.eq_ignore_ascii_case("0") || trimmed.eq_ignore_ascii_case("false") || trimmed.eq_ignore_ascii_case("no") || trimmed.eq_ignore_ascii_case("off")) }) .unwrap_or(true) } fn uvc_mjpeg_v4l2sink_io_mode() -> String { let value = std::env::var("LESAVKA_UVC_MJPEG_IO_MODE").unwrap_or_else(|_| "mmap".to_string()); let trimmed = value.trim().to_ascii_lowercase(); match trimmed.as_str() { "auto" | "rw" | "mmap" | "userptr" | "dmabuf" | "dmabuf-import" => trimmed, _ => { warn!( value, "invalid LESAVKA_UVC_MJPEG_IO_MODE; falling back to mmap" ); "mmap".to_string() } } } fn positive_u64_env(name: &str, default: u64) -> u64 { std::env::var(name) .ok() .and_then(|value| value.trim().parse::().ok()) .filter(|value| *value > 0) .unwrap_or(default) } fn uvc_appsrc_max_buffers() -> u64 { positive_u64_env("LESAVKA_UVC_APP_MAX_BUFFERS", 4) } fn uvc_appsrc_max_bytes() -> u64 { positive_u64_env("LESAVKA_UVC_APP_MAX_BYTES", 4 * 1024 * 1024) } fn uvc_appsrc_max_time_ns() -> u64 { positive_u64_env("LESAVKA_UVC_APP_MAX_TIME_NS", 200_000_000) } fn uvc_appsrc_leaky_type() -> String { std::env::var("LESAVKA_UVC_APP_LEAKY_TYPE") .ok() .map(|value| value.trim().to_ascii_lowercase()) .filter(|value| matches!(value.as_str(), "downstream" | "upstream" | "none")) .unwrap_or_else(|| "downstream".to_string()) } fn looks_like_mjpeg_frame(data: &[u8]) -> bool { data.len() >= 4 && data.starts_with(&[0xff, 0xd8, 0xff]) } fn looks_like_annex_b_hevc(data: &[u8]) -> bool { data.starts_with(&[0, 0, 0, 1]) || data.starts_with(&[0, 0, 1]) } fn uvc_hevc_freshness_queue_buffers() -> u32 { positive_u64_env("LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS", 2) .min(4) .max(1) as u32 } fn uvc_hevc_decode_miss_limit() -> u64 { positive_u64_env("LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT", 15) } /// Bound the UVC ingress queue so decode/UVC stalls cannot turn into RSS growth. /// /// Inputs: the UVC `appsrc`. Output: side-effect-only GStreamer properties. /// Why: live webcam output should prefer dropping stale frames over buffering /// seconds or minutes of encoded media when the physical sink falls behind. fn configure_uvc_appsrc(appsrc: &gst_app::AppSrc) { appsrc.set_property("block", false); if appsrc.has_property("max-buffers", None) { appsrc.set_property("max-buffers", uvc_appsrc_max_buffers()); } if appsrc.has_property("max-bytes", None) { appsrc.set_property("max-bytes", uvc_appsrc_max_bytes()); } if appsrc.has_property("max-time", None) { appsrc.set_property("max-time", uvc_appsrc_max_time_ns()); } if appsrc.has_property("leaky-type", None) { appsrc.set_property_from_str("leaky-type", &uvc_appsrc_leaky_type()); } } /// Build a tiny leaky queue for the decoded HEVC handoff branch. /// /// Inputs: a stable queue name. Output: configured GStreamer queue element. /// Why: hidden raw/JPEG backlogs are memory leaks in a live webcam path; after /// HEVC is decoded it is safe to drop stale raw frames and keep only the newest /// candidate for MJPEG publication while absorbing normal decoder scheduling /// jitter. #[cfg(not(coverage))] fn build_hevc_freshness_queue(name: &str) -> anyhow::Result { let queue = gst::ElementFactory::make("queue") .name(name) .property("max-size-buffers", uvc_hevc_freshness_queue_buffers()) .property("max-size-bytes", 0u32) .property("max-size-time", 0u64) .build()?; queue.set_property_from_str("leaky", "downstream"); Ok(queue) } #[cfg(not(coverage))] fn direct_mjpeg_normalize_pull_timeout() -> gst::ClockTime { gst::ClockTime::from_mseconds(u64::from( hevc_mjpeg_guard::direct_mjpeg_normalize_pull_timeout_ms(), )) } #[cfg(not(coverage))] fn current_process_rss_kb() -> Option { let status = fs::read_to_string("/proc/self/status").ok()?; status.lines().find_map(|line| { let rest = line.strip_prefix("VmRSS:")?; rest.split_whitespace().next()?.parse::().ok() }) } /// Drain normalized direct-MJPEG output down to the freshest sample. /// /// Inputs: the direct-MJPEG normalization appsink. Output: newest available /// sample, if any. Why: decode/re-encode should sanitize browser-facing MJPEG /// without letting stale frames accumulate behind the live webcam feed. #[cfg(not(coverage))] fn freshest_direct_mjpeg_sample(sink: &gst_app::AppSink) -> Option { let mut newest = sink.try_pull_sample(direct_mjpeg_normalize_pull_timeout()); while let Some(sample) = sink.try_pull_sample(gst::ClockTime::ZERO) { newest = Some(sample); } newest } #[cfg(not(coverage))] fn build_direct_mjpeg_normalize_branch( pipeline: &gst::Pipeline, width: i32, height: i32, _fps: i32, ) -> anyhow::Result<(gst_app::AppSrc, gst_app::AppSink)> { let src = gst::ElementFactory::make("appsrc") .name("direct_mjpeg_normalize_src") .build()? .downcast::() .expect("direct MJPEG normalize appsrc"); src.set_is_live(true); src.set_format(gst::Format::Time); src.set_property("do-timestamp", true); configure_uvc_appsrc(&src); let caps_in = gst::Caps::builder("image/jpeg").build(); src.set_caps(Some(&caps_in)); let caps_mjpeg = gst::Caps::builder("image/jpeg") .field("parsed", true) .field("width", width) .field("height", height) .field("pixel-aspect-ratio", gst::Fraction::new(1, 1)) .field("colorimetry", "2:4:7:1") .build(); let decoder = gst::ElementFactory::make("jpegdec").build()?; let decoded_queue = build_hevc_freshness_queue("direct_mjpeg_normalize_decoded_queue")?; let convert = gst::ElementFactory::make("videoconvert").build()?; let scale = gst::ElementFactory::make("videoscale").build()?; let raw_caps = gst::Caps::builder("video/x-raw") .field("width", width) .field("height", height) .build(); let raw_capsfilter = gst::ElementFactory::make("capsfilter") .property("caps", &raw_caps) .build()?; let encoder = gst::ElementFactory::make("jpegenc") .property( "quality", hevc_mjpeg_guard::direct_mjpeg_jpeg_quality() as i32, ) .build()?; let encoded_caps = gst::ElementFactory::make("capsfilter") .property("caps", &caps_mjpeg) .build()?; let encoded_queue = build_hevc_freshness_queue("direct_mjpeg_normalize_encoded_queue")?; let sink = gst::ElementFactory::make("appsink") .name("direct_mjpeg_normalize_sink") .property("sync", false) .property("enable-last-sample", false) .property("emit-signals", false) .property("max-buffers", 1u32) .property("drop", true) .build()? .downcast::() .expect("direct MJPEG normalize appsink"); pipeline.add_many([ src.upcast_ref(), &decoder, &decoded_queue, &convert, &scale, &raw_capsfilter, &encoder, &encoded_caps, &encoded_queue, sink.upcast_ref(), ])?; gst::Element::link_many([ src.upcast_ref(), &decoder, &decoded_queue, &convert, &scale, &raw_capsfilter, &encoder, &encoded_caps, &encoded_queue, sink.upcast_ref(), ])?; Ok((src, sink)) } #[cfg(not(coverage))] fn add_hevc_mjpeg_spool_branch( pipeline: &gst::Pipeline, width: i32, height: i32, fps: i32, ) -> anyhow::Result<(gst_app::AppSrc, gst_app::AppSink)> { let src = gst::ElementFactory::make("appsrc") .name("dynamic_hevc_mjpeg_src") .build()? .downcast::() .expect("dynamic HEVC appsrc"); src.set_is_live(true); src.set_format(gst::Format::Time); src.set_property("do-timestamp", false); configure_uvc_appsrc(&src); let caps_hevc = gst::Caps::builder("video/x-h265") .field("stream-format", "byte-stream") .field("alignment", "au") .build(); src.set_caps(Some(&caps_hevc)); let caps_mjpeg = gst::Caps::builder("image/jpeg") .field("parsed", true) .field("width", width) .field("height", height) .field("framerate", gst::Fraction::new(fps, 1)) .field("pixel-aspect-ratio", gst::Fraction::new(1, 1)) .field("colorimetry", "2:4:7:1") .build(); let h265parse = gst::ElementFactory::make("h265parse") .property("disable-passthrough", true) .property("config-interval", -1i32) .build()?; let decoder_name = require_hevc_decoder()?; let decoder = gst::ElementFactory::make(decoder_name) .build() .with_context(|| format!("building dynamic HEVC decoder element {decoder_name}"))?; configure_hevc_decoder(&decoder); let decoded_queue = build_hevc_freshness_queue("dynamic_hevc_mjpeg_decoded_queue")?; let convert = gst::ElementFactory::make("videoconvert").build()?; let encoder = gst::ElementFactory::make("jpegenc") .property("quality", hevc_mjpeg_guard::hevc_jpeg_quality() as i32) .build()?; let caps = gst::ElementFactory::make("capsfilter") .property("caps", &caps_mjpeg) .build()?; let encoded_queue = build_hevc_freshness_queue("dynamic_hevc_mjpeg_encoded_queue")?; let sink = gst::ElementFactory::make("appsink") .name("dynamic_hevc_mjpeg_spool_sink") .property("sync", false) .property("enable-last-sample", false) .property("emit-signals", false) .property("max-buffers", 1u32) .property("drop", true) .build()? .downcast::() .expect("dynamic HEVC appsink"); pipeline.add_many([ src.upcast_ref(), &h265parse, &decoder, &decoded_queue, &convert, &encoder, &caps, &encoded_queue, sink.upcast_ref(), ])?; gst::Element::link_many([ src.upcast_ref(), &h265parse, &decoder, &decoded_queue, &convert, &encoder, &caps, &encoded_queue, sink.upcast_ref(), ])?; Ok((src, sink)) } /// Configure conservative recovery knobs on hardware HEVC decoders. /// /// Inputs: a decoder element selected by `require_hevc_decoder`. Output: /// side-effect-only property updates when the element supports them. Why: the /// Pi stateless decoder can otherwise hold onto corrupt or dependency-missing /// pictures after live HEVC packet drops, starving the MJPEG UVC handoff. #[cfg(not(coverage))] fn configure_hevc_decoder(decoder: &gst::Element) { if decoder.has_property("discard-corrupted-frames", None) { decoder.set_property("discard-corrupted-frames", true); } if decoder.has_property("automatic-request-sync-points", None) { decoder.set_property("automatic-request-sync-points", true); } } #[cfg(not(coverage))] struct WebcamBusWatchHandle { alive: Arc, join: Option>, } #[cfg(not(coverage))] impl WebcamBusWatchHandle { fn spawn(bus: gst::Bus, label: &'static str) -> Self { let alive = Arc::new(AtomicBool::new(true)); let alive_flag = Arc::clone(&alive); let join = std::thread::spawn(move || { while alive_flag.load(Ordering::Relaxed) { let Some(msg) = bus.timed_pop(gst::ClockTime::from_mseconds(250)) else { continue; }; match msg.view() { StateChanged(state) if state.current() == gst::State::Playing && msg.src().is_some_and(|src| src.is::()) => { tracing::debug!(target: "lesavka_server::video", label, "📸 UVC webcam pipeline ▶️"); } Error(err) => tracing::error!( target: "lesavka_server::video", label, "📸💥 UVC webcam pipeline error from {:?}: {} ({})", msg.src().map(gst::prelude::GstObjectExt::path_string), err.error(), err.debug().unwrap_or_default() ), Warning(warning) => tracing::warn!( target: "lesavka_server::video", label, "📸⚠️ UVC webcam pipeline warning from {:?}: {} ({})", msg.src().map(gst::prelude::GstObjectExt::path_string), warning.error(), warning.debug().unwrap_or_default() ), _ => {} } } }); Self { alive, join: Some(join), } } } #[cfg(not(coverage))] impl Drop for WebcamBusWatchHandle { fn drop(&mut self) { self.alive.store(false, Ordering::Relaxed); if let Some(join) = self.join.take() { let _ = join.join(); } } } #[cfg(not(coverage))] fn spawn_webcam_bus_logger( pipeline: &gst::Pipeline, label: &'static str, ) -> Option { pipeline .bus() .map(|bus| WebcamBusWatchHandle::spawn(bus, label)) } impl WebcamSink { /// Build a new webcam sink pipeline. /// /// Inputs: the target UVC device plus the selected camera profile. /// Outputs: a sink ready to receive `VideoPacket`s. /// Why: UVC output has its own caps and decoder chain that differs from the /// HDMI sink, so it lives in a dedicated constructor. #[cfg(coverage)] pub fn new(_uvc_dev: &str, cfg: &CameraConfig) -> anyhow::Result { gst::init()?; let pipeline = gst::Pipeline::new(); let clock_align_enabled = uvc_sink_session_clock_align_enabled(); let src = gst::ElementFactory::make("appsrc") .build()? .downcast::() .expect("appsrc"); src.set_is_live(true); src.set_format(gst::Format::Time); src.set_property("do-timestamp", &false); configure_uvc_appsrc(&src); let sink = gst::ElementFactory::make("fakesink") .build() .context("building fakesink")?; if clock_align_enabled { crate::media_timing::prepare_pipeline_clock_sync(&pipeline); crate::media_timing::enable_sink_clock_sync(&sink); } pipeline.add_many(&[src.upcast_ref(), &sink])?; gst::Element::link_many(&[src.upcast_ref(), &sink])?; pipeline.set_state(gst::State::Playing)?; let frame_step_us = (1_000_000u64 / u64::from(cfg.fps.max(1))).max(1); Ok(Self { appsrc: src, pipe: pipeline, clock_aligned: AtomicBool::new(!clock_align_enabled), next_pts_us: AtomicU64::new(0), frame_step_us, mjpeg_spool_path: None, direct_mjpeg_appsrc: None, normalized_mjpeg_sink: None, hevc_mjpeg_appsrc: None, decoded_mjpeg_sink: None, last_mjpeg_passthrough_bytes: AtomicU64::new(0), direct_mjpeg_max_bytes: mjpeg_spool::mjpeg_spool_frame_max_bytes(cfg.fps), uvc_width: cfg.width.min(u32::from(u16::MAX)) as u16, uvc_height: cfg.height.min(u32::from(u16::MAX)) as u16, direct_mjpeg_profile_mismatch_seen: AtomicBool::new(false), last_decoded_mjpeg_bytes: AtomicU64::new(0), direct_mjpeg_normalize_bypassed: AtomicBool::new(false), normalized_mjpeg_miss_count: AtomicU64::new(0), normalized_mjpeg_memory_check_count: AtomicU64::new(0), decoded_mjpeg_miss_count: AtomicU64::new(0), decode_recovery_needs_irap: AtomicBool::new(false), }) } #[cfg(not(coverage))] pub fn new(uvc_dev: &str, cfg: &CameraConfig) -> anyhow::Result { gst::init()?; let pipeline = gst::Pipeline::new(); let clock_align_enabled = uvc_sink_session_clock_align_enabled(); let width = cfg.width as i32; let height = cfg.height as i32; let fps = cfg.fps.max(1) as i32; let use_mjpeg = matches!(cfg.codec, CameraCodec::Mjpeg); let use_hevc = matches!(cfg.codec, CameraCodec::Hevc); let src = gst::ElementFactory::make("appsrc") .build()? .downcast::() .expect("appsrc"); src.set_is_live(true); src.set_format(gst::Format::Time); src.set_property("do-timestamp", false); configure_uvc_appsrc(&src); if clock_align_enabled { crate::media_timing::prepare_pipeline_clock_sync(&pipeline); } let mut mjpeg_spool_file = None; let mut direct_mjpeg_appsrc = None; let mut normalized_mjpeg_sink = None; let mut hevc_mjpeg_appsrc = None; let mut decoded_mjpeg_sink = None; if use_mjpeg && mjpeg_spool_enabled() { let caps_mjpeg = gst::Caps::builder("image/jpeg") .field("parsed", true) .field("width", width) .field("height", height) .field("framerate", gst::Fraction::new(fps, 1)) .field("pixel-aspect-ratio", gst::Fraction::new(1, 1)) .field("colorimetry", "2:4:7:1") .build(); src.set_caps(Some(&caps_mjpeg)); let sink = gst::ElementFactory::make("fakesink") .build() .context("building fakesink for MJPEG UVC spool")?; if clock_align_enabled { crate::media_timing::enable_sink_clock_sync(&sink); } else if sink.has_property("sync", None) { sink.set_property("sync", false); } pipeline.add_many([src.upcast_ref(), &sink])?; gst::Element::link_many([src.upcast_ref(), &sink])?; mjpeg_spool_file = Some(mjpeg_spool_path()); if hevc_mjpeg_guard::direct_mjpeg_normalize_enabled() { match build_direct_mjpeg_normalize_branch(&pipeline, width, height, fps) { Ok((normalize_src, normalize_sink)) => { direct_mjpeg_appsrc = Some(normalize_src); normalized_mjpeg_sink = Some(normalize_sink); tracing::info!( target: "lesavka_server::video", quality = hevc_mjpeg_guard::direct_mjpeg_jpeg_quality(), pull_timeout_ms = hevc_mjpeg_guard::direct_mjpeg_normalize_pull_timeout_ms(), "📸 direct MJPEG UVC spool will normalize frames through jpegdec/jpegenc" ); } Err(err) => { tracing::warn!( target: "lesavka_server::video", %err, "📸⚠️ direct MJPEG normalization unavailable; falling back to guarded passthrough" ); } } } match add_hevc_mjpeg_spool_branch(&pipeline, width, height, fps) { Ok((hevc_src, hevc_sink)) => { hevc_mjpeg_appsrc = Some(hevc_src); decoded_mjpeg_sink = Some(hevc_sink); tracing::info!( target: "lesavka_server::video", "📸 MJPEG UVC spool will also accept live HEVC uplink packets" ); } Err(err) => { tracing::warn!( target: "lesavka_server::video", %err, "📸⚠️ dynamic HEVC->MJPEG branch unavailable; MJPEG UVC spool will accept MJPEG only" ); } } } else if use_mjpeg { let caps_mjpeg = gst::Caps::builder("image/jpeg") .field("parsed", true) .field("width", width) .field("height", height) .field("framerate", gst::Fraction::new(fps, 1)) .field("pixel-aspect-ratio", gst::Fraction::new(1, 1)) .field("colorimetry", "2:4:7:1") .build(); src.set_caps(Some(&caps_mjpeg)); let queue = gst::ElementFactory::make("queue").build()?; let capsfilter = gst::ElementFactory::make("capsfilter") .property("caps", &caps_mjpeg) .build()?; let sink = gst::ElementFactory::make("v4l2sink") .property("device", uvc_dev) .build()?; // Kept as an emergency fallback; normal MJPEG output is brokered // through the UVC helper so only one process owns the gadget node. sink.set_property_from_str("io-mode", &uvc_mjpeg_v4l2sink_io_mode()); if clock_align_enabled { crate::media_timing::enable_sink_clock_sync(&sink); } else if sink.has_property("sync", None) { sink.set_property("sync", false); } pipeline.add_many([src.upcast_ref(), &queue, &capsfilter, &sink])?; gst::Element::link_many([src.upcast_ref(), &queue, &capsfilter, &sink])?; } else if use_hevc { let caps_hevc = gst::Caps::builder("video/x-h265") .field("stream-format", "byte-stream") .field("alignment", "au") .build(); let caps_mjpeg = gst::Caps::builder("image/jpeg") .field("parsed", true) .field("width", width) .field("height", height) .field("framerate", gst::Fraction::new(fps, 1)) .field("pixel-aspect-ratio", gst::Fraction::new(1, 1)) .field("colorimetry", "2:4:7:1") .build(); src.set_caps(Some(&caps_hevc)); let h265parse = gst::ElementFactory::make("h265parse") .property("disable-passthrough", true) .property("config-interval", -1i32) .build()?; let decoder_name = require_hevc_decoder()?; let decoder = gst::ElementFactory::make(decoder_name) .build() .with_context(|| format!("building HEVC decoder element {decoder_name}"))?; configure_hevc_decoder(&decoder); let decoded_queue = build_hevc_freshness_queue("hevc_mjpeg_decoded_queue")?; let convert = gst::ElementFactory::make("videoconvert").build()?; let encoder = gst::ElementFactory::make("jpegenc") .property("quality", hevc_mjpeg_guard::hevc_jpeg_quality() as i32) .build()?; let caps = gst::ElementFactory::make("capsfilter") .property("caps", &caps_mjpeg) .build()?; let encoded_queue = build_hevc_freshness_queue("hevc_mjpeg_encoded_queue")?; tracing::info!( target: "lesavka_server::video", decoder = decoder_name, "📸 HEVC camera uplink will be decoded and emitted as MJPEG/UVC" ); if mjpeg_spool_enabled() { let sink = gst::ElementFactory::make("appsink") .name("hevc_mjpeg_spool_sink") .property("sync", false) .property("enable-last-sample", false) .property("emit-signals", false) .property("max-buffers", 1u32) .property("drop", true) .build()? .downcast::() .expect("appsink"); pipeline.add_many([ src.upcast_ref(), &h265parse, &decoder, &decoded_queue, &convert, &encoder, &caps, &encoded_queue, sink.upcast_ref(), ])?; gst::Element::link_many([ src.upcast_ref(), &h265parse, &decoder, &decoded_queue, &convert, &encoder, &caps, &encoded_queue, sink.upcast_ref(), ])?; mjpeg_spool_file = Some(mjpeg_spool_path()); decoded_mjpeg_sink = Some(sink); } else { let sink = gst::ElementFactory::make("v4l2sink") .property("device", uvc_dev) .build()?; sink.set_property_from_str("io-mode", &uvc_mjpeg_v4l2sink_io_mode()); if clock_align_enabled { crate::media_timing::enable_sink_clock_sync(&sink); } else if sink.has_property("sync", None) { sink.set_property("sync", false); } pipeline.add_many([ src.upcast_ref(), &h265parse, &decoder, &convert, &encoder, &caps, &sink, ])?; gst::Element::link_many([ src.upcast_ref(), &h265parse, &decoder, &convert, &encoder, &caps, &sink, ])?; } } else { let caps_h264 = gst::Caps::builder("video/x-h264") .field("stream-format", "byte-stream") .field("alignment", "au") .build(); let raw_caps = gst::Caps::builder("video/x-raw") .field("format", "YUY2") .field("width", width) .field("height", height) .field("framerate", gst::Fraction::new(fps, 1)) .build(); src.set_caps(Some(&caps_h264)); let h264parse = gst::ElementFactory::make("h264parse").build()?; let decoder_name = require_h264_decoder()?; let decoder = gst::ElementFactory::make(decoder_name) .build() .with_context(|| format!("building decoder element {decoder_name}"))?; let convert = gst::ElementFactory::make("videoconvert").build()?; let scale = gst::ElementFactory::make("videoscale").build()?; let caps = gst::ElementFactory::make("capsfilter") .property("caps", &raw_caps) .build()?; let sink = gst::ElementFactory::make("v4l2sink") .property("device", uvc_dev) .build()?; if clock_align_enabled { crate::media_timing::enable_sink_clock_sync(&sink); } else if sink.has_property("sync", None) { sink.set_property("sync", false); } pipeline.add_many([ src.upcast_ref(), &h264parse, &decoder, &convert, &scale, &caps, &sink, ])?; gst::Element::link_many([ src.upcast_ref(), &h264parse, &decoder, &convert, &scale, &caps, &sink, ])?; } pipeline.set_state(gst::State::Playing)?; let bus_watch = spawn_webcam_bus_logger(&pipeline, "uvc-webcam"); let frame_step_us = (1_000_000u64 / u64::from(cfg.fps.max(1))).max(1); Ok(Self { appsrc: src, pipe: pipeline, clock_aligned: AtomicBool::new(!clock_align_enabled), next_pts_us: AtomicU64::new(0), frame_step_us, mjpeg_spool_path: mjpeg_spool_file, direct_mjpeg_appsrc, normalized_mjpeg_sink, hevc_mjpeg_appsrc, decoded_mjpeg_sink, last_mjpeg_passthrough_bytes: AtomicU64::new(0), direct_mjpeg_max_bytes: mjpeg_spool_frame_max_bytes(cfg.fps), uvc_width: cfg.width.min(u32::from(u16::MAX)) as u16, uvc_height: cfg.height.min(u32::from(u16::MAX)) as u16, direct_mjpeg_profile_mismatch_seen: AtomicBool::new(false), last_decoded_mjpeg_bytes: AtomicU64::new(0), direct_mjpeg_normalize_bypassed: AtomicBool::new(false), normalized_mjpeg_miss_count: AtomicU64::new(0), normalized_mjpeg_memory_check_count: AtomicU64::new(0), decoded_mjpeg_miss_count: AtomicU64::new(0), decode_recovery_needs_irap: AtomicBool::new(false), _bus_watch: bus_watch, }) } /// Push one client frame into the UVC pipeline. /// /// Inputs: the next `VideoPacket` from the gRPC camera stream. /// Outputs: none; the frame is forwarded to the appsrc when possible. /// Why: UVC sinks use a locally monotonic timeline so presentation remains /// stable even when WAN packet timestamps arrive out of order. #[cfg(coverage)] pub fn push(&self, pkt: VideoPacket) { let buf = gst::Buffer::from_slice(pkt.data); let _ = self.appsrc.push_buffer(buf); } #[cfg(not(coverage))] pub fn push(&self, pkt: VideoPacket) { if let Some(path) = &self.mjpeg_spool_path && looks_like_mjpeg_frame(&pkt.data) { self.spool_direct_mjpeg_frame(path, &pkt); return; } let hevc_recovery_frame = self.decoded_mjpeg_sink.is_some() && contains_hevc_irap(&pkt.data); if self.decoded_mjpeg_sink.is_some() && self .decode_recovery_needs_irap .load(std::sync::atomic::Ordering::Relaxed) { if !hevc_recovery_frame { return; } self.decode_recovery_needs_irap .store(false, std::sync::atomic::Ordering::Relaxed); self.decoded_mjpeg_miss_count .store(0, std::sync::atomic::Ordering::Relaxed); tracing::info!( target: "lesavka_server::video", pts = pkt.pts, "📸 HEVC decoded-MJPEG handoff found a recovery keyframe" ); } if self.mjpeg_spool_path.is_some() && self.decoded_mjpeg_sink.is_none() && !looks_like_mjpeg_frame(&pkt.data) { warn!( target:"lesavka_server::video", bytes = pkt.data.len(), hevc_annex_b = looks_like_annex_b_hevc(&pkt.data), "📸⚠️ dropping non-MJPEG packet before UVC spool; no dynamic decoder is available" ); return; } let mut buf = gst::Buffer::from_slice(pkt.data); if let Some(meta) = buf.get_mut() { let pts_us = reserve_local_pts(&self.next_pts_us, pkt.pts, self.frame_step_us); if !self .clock_aligned .swap(true, std::sync::atomic::Ordering::SeqCst) { crate::media_timing::align_pipeline_to_session_clock(&self.pipe, pts_us); } let ts = gst::ClockTime::from_useconds(pts_us); meta.set_pts(Some(ts)); meta.set_dts(Some(ts)); meta.set_duration(Some(gst::ClockTime::from_useconds(self.frame_step_us))); } let hevc_appsrc = self.hevc_mjpeg_appsrc.as_ref().unwrap_or(&self.appsrc); if let Err(err) = hevc_appsrc.push_buffer(buf) { tracing::warn!(target:"lesavka_server::video", %err, "📸⚠️ appsrc push failed"); return; } if let (Some(path), Some(sink)) = (&self.mjpeg_spool_path, &self.decoded_mjpeg_sink) && let Some(sample) = freshest_mjpeg_sample(sink) && let Some(buffer) = sample.buffer() && let Ok(map) = buffer.map_readable() { self.decoded_mjpeg_miss_count .store(0, std::sync::atomic::Ordering::Relaxed); let decoded_pts_us = buffer.pts().map(|pts| pts.nseconds() / 1_000); let timing = MjpegSpoolTiming::hevc_decoded_mjpeg(pkt.pts, decoded_pts_us); let previous_bytes = self .last_decoded_mjpeg_bytes .load(std::sync::atomic::Ordering::Relaxed); let decoded_bytes = map.as_slice().len(); if hevc_mjpeg_guard::should_freeze_decoded_mjpeg_frame(previous_bytes, map.as_slice()) { warn!( target:"lesavka_server::video", previous_bytes, next_bytes = decoded_bytes, "📸⚠️ freezing suspicious decoded HEVC->MJPEG frame" ); return; } if decoded_bytes > self.direct_mjpeg_max_bytes { warn!( target:"lesavka_server::video", previous_bytes, next_bytes = decoded_bytes, max_bytes = self.direct_mjpeg_max_bytes, "📸⚠️ freezing oversized decoded HEVC->MJPEG frame before UVC spool" ); return; } if let Err(err) = spool_mjpeg_frame_with_timing(path, map.as_slice(), Some(timing)) { warn!(target:"lesavka_server::video", %err, "📸⚠️ failed to spool decoded HEVC frame for UVC helper"); } else { self.last_decoded_mjpeg_bytes .store(decoded_bytes as u64, std::sync::atomic::Ordering::Relaxed); } } else if self.decoded_mjpeg_sink.is_some() { let misses = self .decoded_mjpeg_miss_count .fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1; let limit = uvc_hevc_decode_miss_limit(); if misses >= limit { self.decode_recovery_needs_irap .store(true, std::sync::atomic::Ordering::Relaxed); self.decoded_mjpeg_miss_count .store(0, std::sync::atomic::Ordering::Relaxed); warn!( target: "lesavka_server::video", misses, limit, "📸⚠️ HEVC decoded-MJPEG handoff produced no frames; freezing output until the next recovery keyframe" ); } } } #[cfg(not(coverage))] fn spool_direct_mjpeg_frame(&self, path: &Path, pkt: &VideoPacket) { let previous_bytes = self .last_mjpeg_passthrough_bytes .load(std::sync::atomic::Ordering::Relaxed); let inspection = hevc_mjpeg_guard::inspect_mjpeg_frame(&pkt.data); if let (Some(width), Some(height)) = (inspection.width, inspection.height) && (width, height) != (self.uvc_width, self.uvc_height) && !self .direct_mjpeg_profile_mismatch_seen .swap(true, std::sync::atomic::Ordering::Relaxed) { warn!( target:"lesavka_server::video", frame_width = width, frame_height = height, uvc_width = self.uvc_width, uvc_height = self.uvc_height, "📸⚠️ direct MJPEG frame dimensions differ from the live UVC profile; this can make browser output unstable" ); } if let Some(reason) = hevc_mjpeg_guard::direct_mjpeg_reject_reason( previous_bytes, Some(self.direct_mjpeg_max_bytes), Some((self.uvc_width, self.uvc_height)), &pkt.data, ) { warn!( target:"lesavka_server::video", ?reason, previous_bytes, next_bytes = pkt.data.len(), max_bytes = self.direct_mjpeg_max_bytes, frame_width = ?inspection.width, frame_height = ?inspection.height, entropy_bytes = inspection.entropy_bytes, entropy_distinct_bytes = inspection.entropy_distinct_bytes, entropy_dominant_pct = inspection.entropy_dominant_pct, entropy_max_run = inspection.entropy_max_run, "📸⚠️ freezing suspicious direct MJPEG frame before UVC spool" ); return; } if self.direct_mjpeg_appsrc.is_some() && self.normalized_mjpeg_sink.is_some() && !self .direct_mjpeg_normalize_bypassed .load(std::sync::atomic::Ordering::Relaxed) { self.spool_normalized_direct_mjpeg_frame(path, pkt); return; } self.spool_passthrough_direct_mjpeg_frame(path, pkt); } #[cfg(not(coverage))] fn spool_passthrough_direct_mjpeg_frame(&self, path: &Path, pkt: &VideoPacket) { let timing = MjpegSpoolTiming::mjpeg_passthrough(pkt.pts); if let Err(err) = spool_mjpeg_frame_with_timing(path, &pkt.data, Some(timing)) { warn!(target:"lesavka_server::video", %err, "📸⚠️ failed to spool MJPEG frame for UVC helper"); } else { self.last_mjpeg_passthrough_bytes .store(pkt.data.len() as u64, std::sync::atomic::Ordering::Relaxed); } } #[cfg(not(coverage))] fn spool_normalized_direct_mjpeg_frame(&self, path: &Path, pkt: &VideoPacket) { let Some(src) = self.direct_mjpeg_appsrc.as_ref() else { self.spool_passthrough_direct_mjpeg_frame(path, pkt); return; }; let Some(sink) = self.normalized_mjpeg_sink.as_ref() else { self.spool_passthrough_direct_mjpeg_frame(path, pkt); return; }; if self .normalized_mjpeg_memory_check_count .fetch_add(1, std::sync::atomic::Ordering::Relaxed) .is_multiple_of(150) && let Some(limit_kb) = hevc_mjpeg_guard::direct_mjpeg_normalize_rss_limit_kb() && let Some(rss_kb) = current_process_rss_kb() && rss_kb > limit_kb { self.direct_mjpeg_normalize_bypassed .store(true, std::sync::atomic::Ordering::Relaxed); warn!( target:"lesavka_server::video", rss_kb, limit_kb, "📸⚠️ direct MJPEG normalization disabled because server RSS exceeded its safety limit" ); self.spool_passthrough_direct_mjpeg_frame(path, pkt); return; } let buf = gst::Buffer::from_slice(pkt.data.clone()); if let Err(err) = src.push_buffer(buf) { tracing::warn!( target:"lesavka_server::video", %err, "📸⚠️ direct MJPEG normalization appsrc push failed; falling back to guarded passthrough" ); self.direct_mjpeg_normalize_bypassed .store(true, std::sync::atomic::Ordering::Relaxed); self.spool_passthrough_direct_mjpeg_frame(path, pkt); return; } let Some(sample) = freshest_direct_mjpeg_sample(sink) else { let misses = self .normalized_mjpeg_miss_count .fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1; let limit = u64::from(hevc_mjpeg_guard::direct_mjpeg_normalize_miss_limit()); if misses == 1 || misses % 30 == 0 { warn!( target:"lesavka_server::video", misses, limit, "📸⚠️ direct MJPEG normalization produced no fresh frame; freezing last good UVC frame until fallback threshold" ); } if misses >= limit { self.direct_mjpeg_normalize_bypassed .store(true, std::sync::atomic::Ordering::Relaxed); warn!( target:"lesavka_server::video", misses, limit, "📸⚠️ direct MJPEG normalization starved; falling back to guarded passthrough for this webcam session" ); self.spool_passthrough_direct_mjpeg_frame(path, pkt); } return; }; let Some(buffer) = sample.buffer() else { self.direct_mjpeg_normalize_bypassed .store(true, std::sync::atomic::Ordering::Relaxed); warn!( target:"lesavka_server::video", "📸⚠️ direct MJPEG normalization returned an empty sample; falling back to guarded passthrough" ); self.spool_passthrough_direct_mjpeg_frame(path, pkt); return; }; let Ok(map) = buffer.map_readable() else { self.direct_mjpeg_normalize_bypassed .store(true, std::sync::atomic::Ordering::Relaxed); warn!( target:"lesavka_server::video", "📸⚠️ direct MJPEG normalization returned an unreadable sample; falling back to guarded passthrough" ); self.spool_passthrough_direct_mjpeg_frame(path, pkt); return; }; let normalized = map.as_slice(); let previous_bytes = self .last_mjpeg_passthrough_bytes .load(std::sync::atomic::Ordering::Relaxed); if let Some(reason) = hevc_mjpeg_guard::direct_mjpeg_reject_reason( previous_bytes, Some(self.direct_mjpeg_max_bytes), Some((self.uvc_width, self.uvc_height)), normalized, ) { let inspection = hevc_mjpeg_guard::inspect_mjpeg_frame(normalized); warn!( target:"lesavka_server::video", ?reason, previous_bytes, next_bytes = normalized.len(), max_bytes = self.direct_mjpeg_max_bytes, frame_width = ?inspection.width, frame_height = ?inspection.height, entropy_bytes = inspection.entropy_bytes, entropy_distinct_bytes = inspection.entropy_distinct_bytes, entropy_dominant_pct = inspection.entropy_dominant_pct, entropy_max_run = inspection.entropy_max_run, "📸⚠️ freezing suspicious normalized direct MJPEG frame before UVC spool" ); return; } let timing = MjpegSpoolTiming::mjpeg_normalized(pkt.pts); if let Err(err) = spool_mjpeg_frame_with_timing(path, normalized, Some(timing)) { warn!(target:"lesavka_server::video", %err, "📸⚠️ failed to spool normalized direct MJPEG frame for UVC helper"); } else { self.normalized_mjpeg_miss_count .store(0, std::sync::atomic::Ordering::Relaxed); self.last_mjpeg_passthrough_bytes .store(normalized.len() as u64, std::sync::atomic::Ordering::Relaxed); } } } impl Drop for WebcamSink { fn drop(&mut self) { let _ = self.pipe.set_state(gst::State::Null); #[cfg(not(coverage))] { let _ = self._bus_watch.take(); } } } #[cfg(test)] mod tests { #[test] fn mjpeg_spool_byte_guard_accepts_jpeg_and_identifies_hevc_annex_b() { assert!(super::looks_like_mjpeg_frame(&[ 0xff, 0xd8, 0xff, 0xe0, 0x00, 0x10 ])); assert!(!super::looks_like_mjpeg_frame(&[ 0x00, 0x00, 0x00, 0x01, 0x46, 0x01 ])); assert!(super::looks_like_annex_b_hevc(&[ 0x00, 0x00, 0x00, 0x01, 0x46, 0x01 ])); assert!(super::looks_like_annex_b_hevc(&[ 0x00, 0x00, 0x01, 0x26 ])); assert!(!super::looks_like_annex_b_hevc(&[ 0xff, 0xd8, 0xff, 0xdb ])); } #[test] fn uvc_session_clock_alignment_defaults_on_and_accepts_disable_overrides() { temp_env::with_var_unset("LESAVKA_UVC_SESSION_CLOCK_ALIGN", || { assert!(super::uvc_sink_session_clock_align_enabled()); }); for disabled in ["0", "false", "no", "off"] { temp_env::with_var("LESAVKA_UVC_SESSION_CLOCK_ALIGN", Some(disabled), || { assert!(!super::uvc_sink_session_clock_align_enabled()); }); } temp_env::with_var("LESAVKA_UVC_SESSION_CLOCK_ALIGN", Some("1"), || { assert!(super::uvc_sink_session_clock_align_enabled()); }); } #[test] fn mjpeg_uvc_sink_defaults_to_mmap_io_mode_with_safe_override() { temp_env::with_var_unset("LESAVKA_UVC_MJPEG_IO_MODE", || { assert_eq!(super::uvc_mjpeg_v4l2sink_io_mode(), "mmap"); }); temp_env::with_var("LESAVKA_UVC_MJPEG_IO_MODE", Some("mmap"), || { assert_eq!(super::uvc_mjpeg_v4l2sink_io_mode(), "mmap"); }); temp_env::with_var("LESAVKA_UVC_MJPEG_IO_MODE", Some("not-real"), || { assert_eq!(super::uvc_mjpeg_v4l2sink_io_mode(), "mmap"); }); } #[test] fn mjpeg_spool_defaults_on_with_path_override() { temp_env::with_var_unset("LESAVKA_UVC_MJPEG_SPOOL", || { assert!(super::mjpeg_spool_enabled()); }); temp_env::with_var("LESAVKA_UVC_MJPEG_SPOOL", Some("0"), || { assert!(!super::mjpeg_spool_enabled()); }); for disabled in ["false", "no", "off"] { temp_env::with_var("LESAVKA_UVC_MJPEG_SPOOL", Some(disabled), || { assert!(!super::mjpeg_spool_enabled()); }); } temp_env::with_var("LESAVKA_UVC_FRAME_PATH", Some("/tmp/frame.mjpg"), || { assert_eq!( super::mjpeg_spool_path(), std::path::PathBuf::from("/tmp/frame.mjpg") ); }); } #[test] fn uvc_appsrc_limits_default_to_freshness_first_bounds() { temp_env::with_var_unset("LESAVKA_UVC_APP_MAX_BUFFERS", || { temp_env::with_var_unset("LESAVKA_UVC_APP_MAX_BYTES", || { temp_env::with_var_unset("LESAVKA_UVC_APP_MAX_TIME_NS", || { temp_env::with_var_unset("LESAVKA_UVC_APP_LEAKY_TYPE", || { assert_eq!(super::uvc_appsrc_max_buffers(), 4); assert_eq!(super::uvc_appsrc_max_bytes(), 4 * 1024 * 1024); assert_eq!(super::uvc_appsrc_max_time_ns(), 200_000_000); assert_eq!(super::uvc_appsrc_leaky_type(), "downstream"); }); }); }); }); } #[test] fn uvc_appsrc_limits_accept_positive_safe_overrides_only() { temp_env::with_var("LESAVKA_UVC_APP_MAX_BUFFERS", Some("6"), || { temp_env::with_var("LESAVKA_UVC_APP_MAX_BYTES", Some("1048576"), || { temp_env::with_var("LESAVKA_UVC_APP_MAX_TIME_NS", Some("100000000"), || { temp_env::with_var("LESAVKA_UVC_APP_LEAKY_TYPE", Some("upstream"), || { assert_eq!(super::uvc_appsrc_max_buffers(), 6); assert_eq!(super::uvc_appsrc_max_bytes(), 1_048_576); assert_eq!(super::uvc_appsrc_max_time_ns(), 100_000_000); assert_eq!(super::uvc_appsrc_leaky_type(), "upstream"); }); }); }); }); temp_env::with_var("LESAVKA_UVC_APP_MAX_BUFFERS", Some("0"), || { temp_env::with_var("LESAVKA_UVC_APP_MAX_BYTES", Some("nope"), || { temp_env::with_var("LESAVKA_UVC_APP_MAX_TIME_NS", Some("0"), || { temp_env::with_var("LESAVKA_UVC_APP_LEAKY_TYPE", Some("sideways"), || { assert_eq!(super::uvc_appsrc_max_buffers(), 4); assert_eq!(super::uvc_appsrc_max_bytes(), 4 * 1024 * 1024); assert_eq!(super::uvc_appsrc_max_time_ns(), 200_000_000); assert_eq!(super::uvc_appsrc_leaky_type(), "downstream"); }); }); }); }); } #[test] fn hevc_spool_freshness_bounds_default_to_tiny_live_handoff() { temp_env::with_var_unset("LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS", || { temp_env::with_var_unset("LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT", || { assert_eq!(super::uvc_hevc_freshness_queue_buffers(), 2); assert_eq!(super::uvc_hevc_decode_miss_limit(), 15); }); }); } #[test] fn hevc_spool_freshness_bounds_accept_only_small_positive_queue_depths() { temp_env::with_var("LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS", Some("3"), || { temp_env::with_var("LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT", Some("4"), || { assert_eq!(super::uvc_hevc_freshness_queue_buffers(), 3); assert_eq!(super::uvc_hevc_decode_miss_limit(), 4); }); }); temp_env::with_var("LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS", Some("99"), || { assert_eq!(super::uvc_hevc_freshness_queue_buffers(), 4); }); temp_env::with_var("LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS", Some("0"), || { temp_env::with_var("LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT", Some("0"), || { assert_eq!(super::uvc_hevc_freshness_queue_buffers(), 2); assert_eq!(super::uvc_hevc_decode_miss_limit(), 15); }); }); } #[cfg(not(coverage))] #[test] fn webcam_bus_watch_stops_promptly_on_drop() { use gstreamer as gst; use gstreamer::prelude::ElementExt; gst::init().expect("gstreamer init"); let pipeline = gst::Pipeline::new(); let bus = pipeline.bus().expect("pipeline bus"); let started = std::time::Instant::now(); drop(super::WebcamBusWatchHandle::spawn(bus, "test-webcam")); assert!( started.elapsed() < std::time::Duration::from_secs(1), "webcam bus watcher should not outlive dropped webcam sinks" ); } }