diff --git a/Cargo.lock b/Cargo.lock index e8b0e2d..6e1fc35 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.22.10" +version = "0.22.11" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.22.10" +version = "0.22.11" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.22.10" +version = "0.22.11" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index bec7c47..10ed5a3 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.22.10" +version = "0.22.11" edition = "2024" [dependencies] diff --git a/common/Cargo.toml b/common/Cargo.toml index 236f389..6108e0d 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.22.10" +version = "0.22.11" edition = "2024" build = "build.rs" diff --git a/scripts/install/server.sh b/scripts/install/server.sh index 795fe03..ec54e80 100755 --- a/scripts/install/server.sh +++ b/scripts/install/server.sh @@ -1405,6 +1405,8 @@ SERVER_ENV_TMP=$(mktemp) printf 'LESAVKA_UVC_HEVC_MIN_REFERENCE_BYTES=%s\n' "${LESAVKA_UVC_HEVC_MIN_REFERENCE_BYTES:-65536}" printf 'LESAVKA_UVC_HEVC_MIN_PAYLOAD_DISTINCT_BYTES=%s\n' "${LESAVKA_UVC_HEVC_MIN_PAYLOAD_DISTINCT_BYTES:-12}" printf 'LESAVKA_UVC_HEVC_DOMINANT_BYTE_PCT=%s\n' "${LESAVKA_UVC_HEVC_DOMINANT_BYTE_PCT:-92}" + printf 'LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS=%s\n' "${LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS:-1}" + printf 'LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT=%s\n' "${LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT:-15}" printf 'LESAVKA_SERVER_BIND_ADDR=%s\n' "${INSTALL_SERVER_BIND_ADDR}" printf 'LESAVKA_UVC_CODEC=%s\n' "${INSTALL_UVC_CODEC}" printf 'LESAVKA_UVC_WIDTH=%s\n' "${LESAVKA_UVC_WIDTH:-1280}" @@ -1527,7 +1529,7 @@ TimeoutStopSec=10 KillSignal=SIGTERM KillMode=control-group Restart=always -Environment=RUST_LOG=lesavka_server=info,lesavka_server::audio=info,lesavka_server::video=debug,lesavka_server::gadget=info +Environment=RUST_LOG=lesavka_server=info,lesavka_server::audio=info,lesavka_server::video=info,lesavka_server::gadget=info Environment=RUST_BACKTRACE=1 Environment=GST_DEBUG="*:2,alsasink:6,alsasrc:6" Environment=LESAVKA_UVC_EXTERNAL=1 diff --git a/server/Cargo.toml b/server/Cargo.toml index ffd58b9..6740a7b 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.22.10" +version = "0.22.11" edition = "2024" autobins = false diff --git a/server/src/video_sinks/webcam_sink.rs b/server/src/video_sinks/webcam_sink.rs index 7730c2d..9612077 100644 --- a/server/src/video_sinks/webcam_sink.rs +++ b/server/src/video_sinks/webcam_sink.rs @@ -6,16 +6,22 @@ use lesavka_common::lesavka::VideoPacket; use std::fs; use std::path::Path; use std::path::PathBuf; -use std::sync::atomic::{AtomicBool, AtomicU64}; +use std::sync::{ + Arc, + atomic::{AtomicBool, AtomicU64, Ordering}, +}; use tracing::warn; use crate::camera::{CameraCodec, CameraConfig}; use crate::video_support::{ - contains_idr, dev_mode_enabled, require_h264_decoder, require_hevc_decoder, reserve_local_pts, + contains_hevc_irap, contains_idr, dev_mode_enabled, require_h264_decoder, + require_hevc_decoder, reserve_local_pts, }; mod mjpeg_spool; +#[cfg(not(coverage))] +use gst::MessageView::{Error, StateChanged, Warning}; #[cfg(not(coverage))] use mjpeg_spool::{freshest_mjpeg_sample, spool_mjpeg_frame_with_timing, MjpegSpoolTiming}; use mjpeg_spool::{mjpeg_spool_enabled, mjpeg_spool_path}; @@ -35,6 +41,10 @@ pub struct WebcamSink { mjpeg_spool_path: Option, decoded_mjpeg_sink: Option, last_decoded_mjpeg_bytes: AtomicU64, + decoded_mjpeg_miss_count: AtomicU64, + decode_recovery_needs_irap: AtomicBool, + #[cfg(not(coverage))] + _bus_watch: Option, } fn uvc_sink_session_clock_align_enabled() -> bool { @@ -93,6 +103,16 @@ fn uvc_appsrc_leaky_type() -> String { .unwrap_or_else(|| "downstream".to_string()) } +fn uvc_hevc_freshness_queue_buffers() -> u32 { + positive_u64_env("LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS", 1) + .min(4) + .max(1) as u32 +} + +fn uvc_hevc_decode_miss_limit() -> u64 { + positive_u64_env("LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT", 15) +} + /// Bound the UVC ingress queue so decode/UVC stalls cannot turn into RSS growth. /// /// Inputs: the UVC `appsrc`. Output: side-effect-only GStreamer properties. @@ -114,6 +134,94 @@ fn configure_uvc_appsrc(appsrc: &gst_app::AppSrc) { } } +/// Build a single-frame leaky queue for the decoded HEVC handoff branch. +/// +/// Inputs: a stable queue name. Output: configured GStreamer queue element. +/// Why: hidden raw/JPEG backlogs are memory leaks in a live webcam path; after +/// HEVC is decoded it is safe to drop stale raw frames and keep only the newest +/// candidate for MJPEG publication. +#[cfg(not(coverage))] +fn build_hevc_freshness_queue(name: &str) -> anyhow::Result { + let queue = gst::ElementFactory::make("queue") + .name(name) + .property("max-size-buffers", uvc_hevc_freshness_queue_buffers()) + .property("max-size-bytes", 0u32) + .property("max-size-time", 0u64) + .build()?; + queue.set_property_from_str("leaky", "downstream"); + Ok(queue) +} + +#[cfg(not(coverage))] +struct WebcamBusWatchHandle { + alive: Arc, + join: Option>, +} + +#[cfg(not(coverage))] +impl WebcamBusWatchHandle { + fn spawn(bus: gst::Bus, label: &'static str) -> Self { + let alive = Arc::new(AtomicBool::new(true)); + let alive_flag = Arc::clone(&alive); + let join = std::thread::spawn(move || { + while alive_flag.load(Ordering::Relaxed) { + let Some(msg) = bus.timed_pop(gst::ClockTime::from_mseconds(250)) else { + continue; + }; + match msg.view() { + StateChanged(state) + if state.current() == gst::State::Playing + && msg.src().is_some_and(|src| src.is::()) => + { + tracing::debug!(target: "lesavka_server::video", label, "📸 UVC webcam pipeline ▶️"); + } + Error(err) => tracing::error!( + target: "lesavka_server::video", + label, + "📸💥 UVC webcam pipeline error from {:?}: {} ({})", + msg.src().map(gst::prelude::GstObjectExt::path_string), + err.error(), + err.debug().unwrap_or_default() + ), + Warning(warning) => tracing::warn!( + target: "lesavka_server::video", + label, + "📸⚠️ UVC webcam pipeline warning from {:?}: {} ({})", + msg.src().map(gst::prelude::GstObjectExt::path_string), + warning.error(), + warning.debug().unwrap_or_default() + ), + _ => {} + } + } + }); + Self { + alive, + join: Some(join), + } + } +} + +#[cfg(not(coverage))] +impl Drop for WebcamBusWatchHandle { + fn drop(&mut self) { + self.alive.store(false, Ordering::Relaxed); + if let Some(join) = self.join.take() { + let _ = join.join(); + } + } +} + +#[cfg(not(coverage))] +fn spawn_webcam_bus_logger( + pipeline: &gst::Pipeline, + label: &'static str, +) -> Option { + pipeline + .bus() + .map(|bus| WebcamBusWatchHandle::spawn(bus, label)) +} + impl WebcamSink { /// Build a new webcam sink pipeline. /// @@ -157,6 +265,8 @@ impl WebcamSink { mjpeg_spool_path: None, decoded_mjpeg_sink: None, last_decoded_mjpeg_bytes: AtomicU64::new(0), + decoded_mjpeg_miss_count: AtomicU64::new(0), + decode_recovery_needs_irap: AtomicBool::new(false), }) } @@ -262,6 +372,7 @@ impl WebcamSink { let decoder = gst::ElementFactory::make(decoder_name) .build() .with_context(|| format!("building HEVC decoder element {decoder_name}"))?; + let decoded_queue = build_hevc_freshness_queue("hevc_mjpeg_decoded_queue")?; let convert = gst::ElementFactory::make("videoconvert").build()?; let encoder = gst::ElementFactory::make("jpegenc") .property("quality", hevc_mjpeg_guard::hevc_jpeg_quality() as i32) @@ -269,6 +380,7 @@ impl WebcamSink { let caps = gst::ElementFactory::make("capsfilter") .property("caps", &caps_mjpeg) .build()?; + let encoded_queue = build_hevc_freshness_queue("hevc_mjpeg_encoded_queue")?; tracing::info!( target: "lesavka_server::video", @@ -279,9 +391,10 @@ impl WebcamSink { if mjpeg_spool_enabled() { let sink = gst::ElementFactory::make("appsink") .name("hevc_mjpeg_spool_sink") - .property("sync", clock_align_enabled) + .property("sync", false) + .property("enable-last-sample", false) .property("emit-signals", false) - .property("max-buffers", 2u32) + .property("max-buffers", 1u32) .property("drop", true) .build()? .downcast::() @@ -290,18 +403,22 @@ impl WebcamSink { src.upcast_ref(), &h265parse, &decoder, + &decoded_queue, &convert, &encoder, &caps, + &encoded_queue, sink.upcast_ref(), ])?; gst::Element::link_many([ src.upcast_ref(), &h265parse, &decoder, + &decoded_queue, &convert, &encoder, &caps, + &encoded_queue, sink.upcast_ref(), ])?; mjpeg_spool_file = Some(mjpeg_spool_path()); @@ -388,6 +505,7 @@ impl WebcamSink { ])?; } pipeline.set_state(gst::State::Playing)?; + let bus_watch = spawn_webcam_bus_logger(&pipeline, "uvc-webcam"); let frame_step_us = (1_000_000u64 / u64::from(cfg.fps.max(1))).max(1); Ok(Self { @@ -399,6 +517,9 @@ impl WebcamSink { mjpeg_spool_path: mjpeg_spool_file, decoded_mjpeg_sink, last_decoded_mjpeg_bytes: AtomicU64::new(0), + decoded_mjpeg_miss_count: AtomicU64::new(0), + decode_recovery_needs_irap: AtomicBool::new(false), + _bus_watch: bus_watch, }) } @@ -416,6 +537,27 @@ impl WebcamSink { #[cfg(not(coverage))] pub fn push(&self, pkt: VideoPacket) { + let hevc_recovery_frame = + self.decoded_mjpeg_sink.is_some() && contains_hevc_irap(&pkt.data); + if self.decoded_mjpeg_sink.is_some() + && self + .decode_recovery_needs_irap + .load(std::sync::atomic::Ordering::Relaxed) + { + if !hevc_recovery_frame { + return; + } + self.decode_recovery_needs_irap + .store(false, std::sync::atomic::Ordering::Relaxed); + self.decoded_mjpeg_miss_count + .store(0, std::sync::atomic::Ordering::Relaxed); + tracing::info!( + target: "lesavka_server::video", + pts = pkt.pts, + "📸 HEVC decoded-MJPEG handoff found a recovery keyframe" + ); + } + if let Some(path) = &self.mjpeg_spool_path && self.decoded_mjpeg_sink.is_none() { @@ -450,6 +592,8 @@ impl WebcamSink { && let Some(buffer) = sample.buffer() && let Ok(map) = buffer.map_readable() { + self.decoded_mjpeg_miss_count + .store(0, std::sync::atomic::Ordering::Relaxed); let decoded_pts_us = buffer.pts().map(|pts| pts.nseconds() / 1_000); let timing = MjpegSpoolTiming::hevc_decoded_mjpeg(pkt.pts, decoded_pts_us); let previous_bytes = self @@ -472,6 +616,24 @@ impl WebcamSink { self.last_decoded_mjpeg_bytes .store(decoded_bytes as u64, std::sync::atomic::Ordering::Relaxed); } + } else if self.decoded_mjpeg_sink.is_some() { + let misses = self + .decoded_mjpeg_miss_count + .fetch_add(1, std::sync::atomic::Ordering::Relaxed) + + 1; + let limit = uvc_hevc_decode_miss_limit(); + if misses >= limit { + self.decode_recovery_needs_irap + .store(true, std::sync::atomic::Ordering::Relaxed); + self.decoded_mjpeg_miss_count + .store(0, std::sync::atomic::Ordering::Relaxed); + warn!( + target: "lesavka_server::video", + misses, + limit, + "📸⚠️ HEVC decoded-MJPEG handoff produced no frames; freezing output until the next recovery keyframe" + ); + } } } } @@ -479,6 +641,10 @@ impl WebcamSink { impl Drop for WebcamSink { fn drop(&mut self) { let _ = self.pipe.set_state(gst::State::Null); + #[cfg(not(coverage))] + { + let _ = self._bus_watch.take(); + } } } @@ -583,4 +749,54 @@ mod tests { }); }); } + + #[test] + fn hevc_spool_freshness_bounds_default_to_single_frame_recovery() { + temp_env::with_var_unset("LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS", || { + temp_env::with_var_unset("LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT", || { + assert_eq!(super::uvc_hevc_freshness_queue_buffers(), 1); + assert_eq!(super::uvc_hevc_decode_miss_limit(), 15); + }); + }); + } + + #[test] + fn hevc_spool_freshness_bounds_accept_only_small_positive_queue_depths() { + temp_env::with_var("LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS", Some("3"), || { + temp_env::with_var("LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT", Some("4"), || { + assert_eq!(super::uvc_hevc_freshness_queue_buffers(), 3); + assert_eq!(super::uvc_hevc_decode_miss_limit(), 4); + }); + }); + + temp_env::with_var("LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS", Some("99"), || { + assert_eq!(super::uvc_hevc_freshness_queue_buffers(), 4); + }); + + temp_env::with_var("LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS", Some("0"), || { + temp_env::with_var("LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT", Some("0"), || { + assert_eq!(super::uvc_hevc_freshness_queue_buffers(), 1); + assert_eq!(super::uvc_hevc_decode_miss_limit(), 15); + }); + }); + } + + #[cfg(not(coverage))] + #[test] + fn webcam_bus_watch_stops_promptly_on_drop() { + use gstreamer as gst; + use gstreamer::prelude::ElementExt; + + gst::init().expect("gstreamer init"); + let pipeline = gst::Pipeline::new(); + let bus = pipeline.bus().expect("pipeline bus"); + let started = std::time::Instant::now(); + + drop(super::WebcamBusWatchHandle::spawn(bus, "test-webcam")); + + assert!( + started.elapsed() < std::time::Duration::from_secs(1), + "webcam bus watcher should not outlive dropped webcam sinks" + ); + } } diff --git a/tests/api/server/upstream_media_runtime/server_upstream_media_v2_handoff_contract.rs b/tests/api/server/upstream_media_runtime/server_upstream_media_v2_handoff_contract.rs index 5a47fef..f951448 100644 --- a/tests/api/server/upstream_media_runtime/server_upstream_media_v2_handoff_contract.rs +++ b/tests/api/server/upstream_media_runtime/server_upstream_media_v2_handoff_contract.rs @@ -116,14 +116,21 @@ fn hevc_ingress_decodes_to_existing_mjpeg_uvc_path() { "let use_hevc = matches!(cfg.codec, CameraCodec::Hevc);", "video/x-h265", "h265parse", - "pick_hevc_decoder()", + "require_hevc_decoder()", "jpegenc", "LESAVKA_UVC_HEVC_JPEG_QUALITY", "LESAVKA_UVC_HEVC_SPOOL_PULL_TIMEOUT_MS", + "LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT", "image/jpeg", + "hevc_mjpeg_decoded_queue", + "hevc_mjpeg_encoded_queue", "hevc_mjpeg_spool_sink", - ".property(\"sync\", clock_align_enabled)", + ".property(\"sync\", false)", + ".property(\"enable-last-sample\", false)", + "decode_recovery_needs_irap", + "contains_hevc_irap(&pkt.data)", "failed to spool decoded HEVC frame for UVC helper", + "produced no frames; freezing output until the next recovery keyframe", "HEVC camera uplink will be decoded and emitted as MJPEG/UVC", ] { assert!( diff --git a/tests/installer/scripts/install/server_install_script_contract.rs b/tests/installer/scripts/install/server_install_script_contract.rs index 0a609b4..4b7cd09 100644 --- a/tests/installer/scripts/install/server_install_script_contract.rs +++ b/tests/installer/scripts/install/server_install_script_contract.rs @@ -32,6 +32,8 @@ fn server_install_pins_hdmi_camera_and_display_defaults() { "LESAVKA_UPSTREAM_PAIR_SLACK_US=%s", "LESAVKA_UPSTREAM_AUDIO_MASTER_WAIT_GRACE_MS=%s", "LESAVKA_UPSTREAM_STALE_DROP_MS=%s", + "LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS=%s", + "LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT=%s", "LESAVKA_SERVER_BIND_ADDR=%s", "/etc/lesavka/uvc.env", "LESAVKA_UVC_MAXPACKET=", @@ -141,6 +143,12 @@ fn server_install_pins_hdmi_camera_and_display_defaults() { assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_PAIR_SLACK_US:-80000}")); assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_AUDIO_MASTER_WAIT_GRACE_MS:-350}")); assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_STALE_DROP_MS:-80}")); + assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS:-1}")); + assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT:-15}")); + assert!( + SERVER_INSTALL.contains("lesavka_server::video=info"), + "server installs should not leave the hot webcam frame path at debug logging by default" + ); assert!( SERVER_INSTALL.contains("LESAVKA_UPSTREAM_MJPEG_VIDEO_PLAYOUT_MODE_OFFSETS_US"), "installer should persist MJPEG-specific calibration maps"