use anyhow::Context; use gstreamer as gst; use gstreamer::prelude::*; use gstreamer_app as gst_app; use std::fs; use lesavka_common::lesavka::VideoPacket; use std::path::{Path, PathBuf}; use std::sync::{ Arc, atomic::{AtomicBool, AtomicU64, Ordering}, }; use tracing::warn; use crate::camera::{CameraCodec, CameraConfig}; use crate::video_support::{ contains_idr, dev_mode_enabled, require_h264_decoder, require_hevc_decoder, }; #[path = "webcam_sink/constructor.rs"] mod constructor; #[path = "webcam_sink/frame_handoff.rs"] mod frame_handoff; mod mjpeg_spool; #[cfg(not(coverage))] use gst::MessageView::{Error, StateChanged, Warning}; /// Push H.264 or MJPEG frames into the USB UVC gadget. /// /// Inputs: a UVC device node and the negotiated camera configuration. /// Outputs: a live `WebcamSink` that accepts `VideoPacket`s. /// Why: the UVC sink owns the GStreamer pipeline details for gadget output so /// the relay logic can focus on session lifecycle instead of media plumbing. pub struct WebcamSink { appsrc: gst_app::AppSrc, pipe: gst::Pipeline, clock_aligned: AtomicBool, next_pts_us: AtomicU64, frame_step_us: u64, mjpeg_spool_path: Option, direct_mjpeg_appsrc: Option, normalized_mjpeg_sink: Option, hevc_mjpeg_appsrc: Option, decoded_mjpeg_sink: Option, last_mjpeg_passthrough_bytes: AtomicU64, direct_mjpeg_max_bytes: usize, uvc_width: u16, uvc_height: u16, direct_mjpeg_profile_mismatch_seen: AtomicBool, last_decoded_mjpeg_bytes: AtomicU64, direct_mjpeg_normalize_bypassed: AtomicBool, normalized_mjpeg_miss_count: AtomicU64, normalized_mjpeg_memory_check_count: AtomicU64, decoded_mjpeg_miss_count: AtomicU64, decode_recovery_needs_irap: AtomicBool, #[cfg(not(coverage))] _bus_watch: Option, } fn uvc_sink_session_clock_align_enabled() -> bool { std::env::var("LESAVKA_UVC_SESSION_CLOCK_ALIGN") .ok() .map(|value| { let trimmed = value.trim(); !(trimmed.eq_ignore_ascii_case("0") || trimmed.eq_ignore_ascii_case("false") || trimmed.eq_ignore_ascii_case("no") || trimmed.eq_ignore_ascii_case("off")) }) .unwrap_or(true) } fn uvc_mjpeg_v4l2sink_io_mode() -> String { let value = std::env::var("LESAVKA_UVC_MJPEG_IO_MODE").unwrap_or_else(|_| "mmap".to_string()); let trimmed = value.trim().to_ascii_lowercase(); match trimmed.as_str() { "auto" | "rw" | "mmap" | "userptr" | "dmabuf" | "dmabuf-import" => trimmed, _ => { warn!( value, "invalid LESAVKA_UVC_MJPEG_IO_MODE; falling back to mmap" ); "mmap".to_string() } } } fn positive_u64_env(name: &str, default: u64) -> u64 { std::env::var(name) .ok() .and_then(|value| value.trim().parse::().ok()) .filter(|value| *value > 0) .unwrap_or(default) } fn uvc_appsrc_max_buffers() -> u64 { positive_u64_env("LESAVKA_UVC_APP_MAX_BUFFERS", 4) } fn uvc_appsrc_max_bytes() -> u64 { positive_u64_env("LESAVKA_UVC_APP_MAX_BYTES", 4 * 1024 * 1024) } fn uvc_appsrc_max_time_ns() -> u64 { positive_u64_env("LESAVKA_UVC_APP_MAX_TIME_NS", 200_000_000) } fn uvc_appsrc_leaky_type() -> String { std::env::var("LESAVKA_UVC_APP_LEAKY_TYPE") .ok() .map(|value| value.trim().to_ascii_lowercase()) .filter(|value| matches!(value.as_str(), "downstream" | "upstream" | "none")) .unwrap_or_else(|| "downstream".to_string()) } fn looks_like_mjpeg_frame(data: &[u8]) -> bool { data.len() >= 4 && data.starts_with(&[0xff, 0xd8, 0xff]) } fn looks_like_annex_b_hevc(data: &[u8]) -> bool { data.starts_with(&[0, 0, 0, 1]) || data.starts_with(&[0, 0, 1]) } fn uvc_hevc_freshness_queue_buffers() -> u32 { positive_u64_env("LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS", 2) .clamp(1, 4) as u32 } fn uvc_hevc_decode_miss_limit() -> u64 { positive_u64_env("LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT", 15) } /// Bound the UVC ingress queue so decode/UVC stalls cannot turn into RSS growth. /// /// Inputs: the UVC `appsrc`. Output: side-effect-only GStreamer properties. /// Why: live webcam output should prefer dropping stale frames over buffering /// seconds or minutes of encoded media when the physical sink falls behind. fn configure_uvc_appsrc(appsrc: &gst_app::AppSrc) { appsrc.set_property("block", false); if appsrc.has_property("max-buffers", None) { appsrc.set_property("max-buffers", uvc_appsrc_max_buffers()); } if appsrc.has_property("max-bytes", None) { appsrc.set_property("max-bytes", uvc_appsrc_max_bytes()); } if appsrc.has_property("max-time", None) { appsrc.set_property("max-time", uvc_appsrc_max_time_ns()); } if appsrc.has_property("leaky-type", None) { appsrc.set_property_from_str("leaky-type", &uvc_appsrc_leaky_type()); } } /// Build a tiny leaky queue for the decoded HEVC handoff branch. /// /// Inputs: a stable queue name. Output: configured GStreamer queue element. /// Why: hidden raw/JPEG backlogs are memory leaks in a live webcam path; after /// HEVC is decoded it is safe to drop stale raw frames and keep only the newest /// candidate for MJPEG publication while absorbing normal decoder scheduling /// jitter. #[cfg(not(coverage))] fn build_hevc_freshness_queue(name: &str) -> anyhow::Result { let queue = gst::ElementFactory::make("queue") .name(name) .property("max-size-buffers", uvc_hevc_freshness_queue_buffers()) .property("max-size-bytes", 0u32) .property("max-size-time", 0u64) .build()?; queue.set_property_from_str("leaky", "downstream"); Ok(queue) } #[cfg(not(coverage))] fn direct_mjpeg_normalize_pull_timeout() -> gst::ClockTime { gst::ClockTime::from_mseconds(u64::from( hevc_mjpeg_guard::direct_mjpeg_normalize_pull_timeout_ms(), )) } #[cfg(not(coverage))] fn current_process_rss_kb() -> Option { let status = fs::read_to_string("/proc/self/status").ok()?; status.lines().find_map(|line| { let rest = line.strip_prefix("VmRSS:")?; rest.split_whitespace().next()?.parse::().ok() }) } /// Drain normalized direct-MJPEG output down to the freshest sample. /// /// Inputs: the direct-MJPEG normalization appsink. Output: newest available /// sample, if any. Why: decode/re-encode should sanitize browser-facing MJPEG /// without letting stale frames accumulate behind the live webcam feed. #[cfg(not(coverage))] fn freshest_direct_mjpeg_sample(sink: &gst_app::AppSink) -> Option { let mut newest = sink.try_pull_sample(direct_mjpeg_normalize_pull_timeout()); while let Some(sample) = sink.try_pull_sample(gst::ClockTime::ZERO) { newest = Some(sample); } newest } #[cfg(not(coverage))] fn build_direct_mjpeg_normalize_branch( pipeline: &gst::Pipeline, width: i32, height: i32, _fps: i32, ) -> anyhow::Result<(gst_app::AppSrc, gst_app::AppSink)> { let src = gst::ElementFactory::make("appsrc") .name("direct_mjpeg_normalize_src") .build()? .downcast::() .expect("direct MJPEG normalize appsrc"); src.set_is_live(true); src.set_format(gst::Format::Time); src.set_property("do-timestamp", true); configure_uvc_appsrc(&src); let caps_in = gst::Caps::builder("image/jpeg").build(); src.set_caps(Some(&caps_in)); let caps_mjpeg = gst::Caps::builder("image/jpeg") .field("parsed", true) .field("width", width) .field("height", height) .field("pixel-aspect-ratio", gst::Fraction::new(1, 1)) .field("colorimetry", "2:4:7:1") .build(); let decoder = gst::ElementFactory::make("jpegdec").build()?; let decoded_queue = build_hevc_freshness_queue("direct_mjpeg_normalize_decoded_queue")?; let convert = gst::ElementFactory::make("videoconvert").build()?; let scale = gst::ElementFactory::make("videoscale").build()?; let raw_caps = gst::Caps::builder("video/x-raw") .field("width", width) .field("height", height) .build(); let raw_capsfilter = gst::ElementFactory::make("capsfilter") .property("caps", &raw_caps) .build()?; let encoder = gst::ElementFactory::make("jpegenc") .property( "quality", hevc_mjpeg_guard::direct_mjpeg_jpeg_quality() as i32, ) .build()?; let encoded_caps = gst::ElementFactory::make("capsfilter") .property("caps", &caps_mjpeg) .build()?; let encoded_queue = build_hevc_freshness_queue("direct_mjpeg_normalize_encoded_queue")?; let sink = gst::ElementFactory::make("appsink") .name("direct_mjpeg_normalize_sink") .property("sync", false) .property("enable-last-sample", false) .property("emit-signals", false) .property("max-buffers", 1u32) .property("drop", true) .build()? .downcast::() .expect("direct MJPEG normalize appsink"); pipeline.add_many([ src.upcast_ref(), &decoder, &decoded_queue, &convert, &scale, &raw_capsfilter, &encoder, &encoded_caps, &encoded_queue, sink.upcast_ref(), ])?; gst::Element::link_many([ src.upcast_ref(), &decoder, &decoded_queue, &convert, &scale, &raw_capsfilter, &encoder, &encoded_caps, &encoded_queue, sink.upcast_ref(), ])?; Ok((src, sink)) } #[cfg(not(coverage))] fn add_hevc_mjpeg_spool_branch( pipeline: &gst::Pipeline, width: i32, height: i32, fps: i32, ) -> anyhow::Result<(gst_app::AppSrc, gst_app::AppSink)> { let src = gst::ElementFactory::make("appsrc") .name("dynamic_hevc_mjpeg_src") .build()? .downcast::() .expect("dynamic HEVC appsrc"); src.set_is_live(true); src.set_format(gst::Format::Time); src.set_property("do-timestamp", false); configure_uvc_appsrc(&src); let caps_hevc = gst::Caps::builder("video/x-h265") .field("stream-format", "byte-stream") .field("alignment", "au") .build(); src.set_caps(Some(&caps_hevc)); let caps_mjpeg = gst::Caps::builder("image/jpeg") .field("parsed", true) .field("width", width) .field("height", height) .field("framerate", gst::Fraction::new(fps, 1)) .field("pixel-aspect-ratio", gst::Fraction::new(1, 1)) .field("colorimetry", "2:4:7:1") .build(); let h265parse = gst::ElementFactory::make("h265parse") .property("disable-passthrough", true) .property("config-interval", -1i32) .build()?; let decoder_name = require_hevc_decoder()?; let decoder = gst::ElementFactory::make(decoder_name) .build() .with_context(|| format!("building dynamic HEVC decoder element {decoder_name}"))?; configure_hevc_decoder(&decoder); let decoded_queue = build_hevc_freshness_queue("dynamic_hevc_mjpeg_decoded_queue")?; let convert = gst::ElementFactory::make("videoconvert").build()?; let encoder = gst::ElementFactory::make("jpegenc") .property("quality", hevc_mjpeg_guard::hevc_jpeg_quality() as i32) .build()?; let caps = gst::ElementFactory::make("capsfilter") .property("caps", &caps_mjpeg) .build()?; let encoded_queue = build_hevc_freshness_queue("dynamic_hevc_mjpeg_encoded_queue")?; let sink = gst::ElementFactory::make("appsink") .name("dynamic_hevc_mjpeg_spool_sink") .property("sync", false) .property("enable-last-sample", false) .property("emit-signals", false) .property("max-buffers", 1u32) .property("drop", true) .build()? .downcast::() .expect("dynamic HEVC appsink"); pipeline.add_many([ src.upcast_ref(), &h265parse, &decoder, &decoded_queue, &convert, &encoder, &caps, &encoded_queue, sink.upcast_ref(), ])?; gst::Element::link_many([ src.upcast_ref(), &h265parse, &decoder, &decoded_queue, &convert, &encoder, &caps, &encoded_queue, sink.upcast_ref(), ])?; Ok((src, sink)) } /// Configure conservative recovery knobs on hardware HEVC decoders. /// /// Inputs: a decoder element selected by `require_hevc_decoder`. Output: /// side-effect-only property updates when the element supports them. Why: the /// Pi stateless decoder can otherwise hold onto corrupt or dependency-missing /// pictures after live HEVC packet drops, starving the MJPEG UVC handoff. #[cfg(not(coverage))] fn configure_hevc_decoder(decoder: &gst::Element) { if decoder.has_property("discard-corrupted-frames", None) { decoder.set_property("discard-corrupted-frames", true); } if decoder.has_property("automatic-request-sync-points", None) { decoder.set_property("automatic-request-sync-points", true); } } #[cfg(not(coverage))] struct WebcamBusWatchHandle { alive: Arc, join: Option>, } #[cfg(not(coverage))] impl WebcamBusWatchHandle { fn spawn(bus: gst::Bus, label: &'static str) -> Self { let alive = Arc::new(AtomicBool::new(true)); let alive_flag = Arc::clone(&alive); let join = std::thread::spawn(move || { while alive_flag.load(Ordering::Relaxed) { let Some(msg) = bus.timed_pop(gst::ClockTime::from_mseconds(250)) else { continue; }; match msg.view() { StateChanged(state) if state.current() == gst::State::Playing && msg.src().is_some_and(|src| src.is::()) => { tracing::debug!(target: "lesavka_server::video", label, "📸 UVC webcam pipeline ▶️"); } Error(err) => tracing::error!( target: "lesavka_server::video", label, "📸💥 UVC webcam pipeline error from {:?}: {} ({})", msg.src().map(gst::prelude::GstObjectExt::path_string), err.error(), err.debug().unwrap_or_default() ), Warning(warning) => tracing::warn!( target: "lesavka_server::video", label, "📸⚠️ UVC webcam pipeline warning from {:?}: {} ({})", msg.src().map(gst::prelude::GstObjectExt::path_string), warning.error(), warning.debug().unwrap_or_default() ), _ => {} } } }); Self { alive, join: Some(join), } } } #[cfg(not(coverage))] impl Drop for WebcamBusWatchHandle { fn drop(&mut self) { self.alive.store(false, Ordering::Relaxed); if let Some(join) = self.join.take() { let _ = join.join(); } } } #[cfg(not(coverage))] fn spawn_webcam_bus_logger( pipeline: &gst::Pipeline, label: &'static str, ) -> Option { pipeline .bus() .map(|bus| WebcamBusWatchHandle::spawn(bus, label)) } impl Drop for WebcamSink { fn drop(&mut self) { let _ = self.pipe.set_state(gst::State::Null); #[cfg(not(coverage))] { let _ = self._bus_watch.take(); } } }