lesavka/server/src/video_sinks/webcam_sink.rs

1044 lines
40 KiB
Rust
Raw Normal View History

use anyhow::Context;
use gstreamer as gst;
use gstreamer::prelude::*;
use gstreamer_app as gst_app;
use lesavka_common::lesavka::VideoPacket;
use std::fs;
use std::path::Path;
use std::path::PathBuf;
2026-05-12 10:46:56 -03:00
use std::sync::{
Arc,
atomic::{AtomicBool, AtomicU64, Ordering},
};
use tracing::warn;
use crate::camera::{CameraCodec, CameraConfig};
use crate::video_support::{
2026-05-12 10:46:56 -03:00
contains_hevc_irap, contains_idr, dev_mode_enabled, require_h264_decoder,
require_hevc_decoder, reserve_local_pts,
};
mod mjpeg_spool;
2026-05-12 10:46:56 -03:00
#[cfg(not(coverage))]
use gst::MessageView::{Error, StateChanged, Warning};
#[cfg(not(coverage))]
2026-05-14 05:18:36 -03:00
use mjpeg_spool::{
MjpegSpoolTiming, freshest_mjpeg_sample, mjpeg_spool_frame_max_bytes,
spool_mjpeg_frame_with_timing,
};
use mjpeg_spool::{mjpeg_spool_enabled, mjpeg_spool_path};
/// Push H.264 or MJPEG frames into the USB UVC gadget.
///
/// Inputs: a UVC device node and the negotiated camera configuration.
/// Outputs: a live `WebcamSink` that accepts `VideoPacket`s.
/// Why: the UVC sink owns the GStreamer pipeline details for gadget output so
/// the relay logic can focus on session lifecycle instead of media plumbing.
pub struct WebcamSink {
appsrc: gst_app::AppSrc,
pipe: gst::Pipeline,
clock_aligned: AtomicBool,
next_pts_us: AtomicU64,
frame_step_us: u64,
mjpeg_spool_path: Option<PathBuf>,
hevc_mjpeg_appsrc: Option<gst_app::AppSrc>,
decoded_mjpeg_sink: Option<gst_app::AppSink>,
last_mjpeg_passthrough_bytes: AtomicU64,
2026-05-14 05:18:36 -03:00
direct_mjpeg_max_bytes: usize,
uvc_width: u16,
uvc_height: u16,
direct_mjpeg_profile_mismatch_seen: AtomicBool,
last_decoded_mjpeg_bytes: AtomicU64,
2026-05-12 10:46:56 -03:00
decoded_mjpeg_miss_count: AtomicU64,
decode_recovery_needs_irap: AtomicBool,
#[cfg(not(coverage))]
_bus_watch: Option<WebcamBusWatchHandle>,
}
fn uvc_sink_session_clock_align_enabled() -> bool {
std::env::var("LESAVKA_UVC_SESSION_CLOCK_ALIGN")
.ok()
.map(|value| {
let trimmed = value.trim();
!(trimmed.eq_ignore_ascii_case("0")
|| trimmed.eq_ignore_ascii_case("false")
|| trimmed.eq_ignore_ascii_case("no")
|| trimmed.eq_ignore_ascii_case("off"))
})
.unwrap_or(true)
}
fn uvc_mjpeg_v4l2sink_io_mode() -> String {
let value = std::env::var("LESAVKA_UVC_MJPEG_IO_MODE").unwrap_or_else(|_| "mmap".to_string());
let trimmed = value.trim().to_ascii_lowercase();
match trimmed.as_str() {
"auto" | "rw" | "mmap" | "userptr" | "dmabuf" | "dmabuf-import" => trimmed,
_ => {
warn!(
value,
"invalid LESAVKA_UVC_MJPEG_IO_MODE; falling back to mmap"
);
"mmap".to_string()
}
}
}
2026-05-12 03:02:57 -03:00
fn positive_u64_env(name: &str, default: u64) -> u64 {
std::env::var(name)
.ok()
.and_then(|value| value.trim().parse::<u64>().ok())
.filter(|value| *value > 0)
.unwrap_or(default)
}
fn uvc_appsrc_max_buffers() -> u64 {
positive_u64_env("LESAVKA_UVC_APP_MAX_BUFFERS", 4)
}
fn uvc_appsrc_max_bytes() -> u64 {
positive_u64_env("LESAVKA_UVC_APP_MAX_BYTES", 4 * 1024 * 1024)
}
fn uvc_appsrc_max_time_ns() -> u64 {
positive_u64_env("LESAVKA_UVC_APP_MAX_TIME_NS", 200_000_000)
}
fn uvc_appsrc_leaky_type() -> String {
std::env::var("LESAVKA_UVC_APP_LEAKY_TYPE")
.ok()
.map(|value| value.trim().to_ascii_lowercase())
.filter(|value| matches!(value.as_str(), "downstream" | "upstream" | "none"))
.unwrap_or_else(|| "downstream".to_string())
}
fn looks_like_mjpeg_frame(data: &[u8]) -> bool {
data.len() >= 4 && data.starts_with(&[0xff, 0xd8, 0xff])
}
fn looks_like_annex_b_hevc(data: &[u8]) -> bool {
data.starts_with(&[0, 0, 0, 1]) || data.starts_with(&[0, 0, 1])
}
2026-05-12 10:46:56 -03:00
fn uvc_hevc_freshness_queue_buffers() -> u32 {
positive_u64_env("LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS", 2)
2026-05-12 10:46:56 -03:00
.min(4)
.max(1) as u32
}
fn uvc_hevc_decode_miss_limit() -> u64 {
positive_u64_env("LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT", 15)
}
2026-05-12 03:02:57 -03:00
/// Bound the UVC ingress queue so decode/UVC stalls cannot turn into RSS growth.
///
/// Inputs: the UVC `appsrc`. Output: side-effect-only GStreamer properties.
/// Why: live webcam output should prefer dropping stale frames over buffering
/// seconds or minutes of encoded media when the physical sink falls behind.
fn configure_uvc_appsrc(appsrc: &gst_app::AppSrc) {
appsrc.set_property("block", false);
if appsrc.has_property("max-buffers", None) {
appsrc.set_property("max-buffers", uvc_appsrc_max_buffers());
}
if appsrc.has_property("max-bytes", None) {
appsrc.set_property("max-bytes", uvc_appsrc_max_bytes());
}
if appsrc.has_property("max-time", None) {
appsrc.set_property("max-time", uvc_appsrc_max_time_ns());
}
if appsrc.has_property("leaky-type", None) {
appsrc.set_property_from_str("leaky-type", &uvc_appsrc_leaky_type());
}
}
/// Build a tiny leaky queue for the decoded HEVC handoff branch.
2026-05-12 10:46:56 -03:00
///
/// Inputs: a stable queue name. Output: configured GStreamer queue element.
/// Why: hidden raw/JPEG backlogs are memory leaks in a live webcam path; after
/// HEVC is decoded it is safe to drop stale raw frames and keep only the newest
/// candidate for MJPEG publication while absorbing normal decoder scheduling
/// jitter.
2026-05-12 10:46:56 -03:00
#[cfg(not(coverage))]
fn build_hevc_freshness_queue(name: &str) -> anyhow::Result<gst::Element> {
let queue = gst::ElementFactory::make("queue")
.name(name)
.property("max-size-buffers", uvc_hevc_freshness_queue_buffers())
.property("max-size-bytes", 0u32)
.property("max-size-time", 0u64)
.build()?;
queue.set_property_from_str("leaky", "downstream");
Ok(queue)
}
#[cfg(not(coverage))]
fn add_hevc_mjpeg_spool_branch(
pipeline: &gst::Pipeline,
width: i32,
height: i32,
fps: i32,
) -> anyhow::Result<(gst_app::AppSrc, gst_app::AppSink)> {
let src = gst::ElementFactory::make("appsrc")
.name("dynamic_hevc_mjpeg_src")
.build()?
.downcast::<gst_app::AppSrc>()
.expect("dynamic HEVC appsrc");
src.set_is_live(true);
src.set_format(gst::Format::Time);
src.set_property("do-timestamp", false);
configure_uvc_appsrc(&src);
let caps_hevc = gst::Caps::builder("video/x-h265")
.field("stream-format", "byte-stream")
.field("alignment", "au")
.build();
src.set_caps(Some(&caps_hevc));
let caps_mjpeg = gst::Caps::builder("image/jpeg")
.field("parsed", true)
.field("width", width)
.field("height", height)
.field("framerate", gst::Fraction::new(fps, 1))
.field("pixel-aspect-ratio", gst::Fraction::new(1, 1))
.field("colorimetry", "2:4:7:1")
.build();
let h265parse = gst::ElementFactory::make("h265parse")
.property("disable-passthrough", true)
.property("config-interval", -1i32)
.build()?;
let decoder_name = require_hevc_decoder()?;
let decoder = gst::ElementFactory::make(decoder_name)
.build()
.with_context(|| format!("building dynamic HEVC decoder element {decoder_name}"))?;
configure_hevc_decoder(&decoder);
let decoded_queue = build_hevc_freshness_queue("dynamic_hevc_mjpeg_decoded_queue")?;
let convert = gst::ElementFactory::make("videoconvert").build()?;
let encoder = gst::ElementFactory::make("jpegenc")
.property("quality", hevc_mjpeg_guard::hevc_jpeg_quality() as i32)
.build()?;
let caps = gst::ElementFactory::make("capsfilter")
.property("caps", &caps_mjpeg)
.build()?;
let encoded_queue = build_hevc_freshness_queue("dynamic_hevc_mjpeg_encoded_queue")?;
let sink = gst::ElementFactory::make("appsink")
.name("dynamic_hevc_mjpeg_spool_sink")
.property("sync", false)
.property("enable-last-sample", false)
.property("emit-signals", false)
.property("max-buffers", 1u32)
.property("drop", true)
.build()?
.downcast::<gst_app::AppSink>()
.expect("dynamic HEVC appsink");
pipeline.add_many([
src.upcast_ref(),
&h265parse,
&decoder,
&decoded_queue,
&convert,
&encoder,
&caps,
&encoded_queue,
sink.upcast_ref(),
])?;
gst::Element::link_many([
src.upcast_ref(),
&h265parse,
&decoder,
&decoded_queue,
&convert,
&encoder,
&caps,
&encoded_queue,
sink.upcast_ref(),
])?;
Ok((src, sink))
}
/// Configure conservative recovery knobs on hardware HEVC decoders.
///
/// Inputs: a decoder element selected by `require_hevc_decoder`. Output:
/// side-effect-only property updates when the element supports them. Why: the
/// Pi stateless decoder can otherwise hold onto corrupt or dependency-missing
/// pictures after live HEVC packet drops, starving the MJPEG UVC handoff.
#[cfg(not(coverage))]
fn configure_hevc_decoder(decoder: &gst::Element) {
if decoder.has_property("discard-corrupted-frames", None) {
decoder.set_property("discard-corrupted-frames", true);
}
if decoder.has_property("automatic-request-sync-points", None) {
decoder.set_property("automatic-request-sync-points", true);
}
}
2026-05-12 10:46:56 -03:00
#[cfg(not(coverage))]
struct WebcamBusWatchHandle {
alive: Arc<AtomicBool>,
join: Option<std::thread::JoinHandle<()>>,
}
#[cfg(not(coverage))]
impl WebcamBusWatchHandle {
fn spawn(bus: gst::Bus, label: &'static str) -> Self {
let alive = Arc::new(AtomicBool::new(true));
let alive_flag = Arc::clone(&alive);
let join = std::thread::spawn(move || {
while alive_flag.load(Ordering::Relaxed) {
let Some(msg) = bus.timed_pop(gst::ClockTime::from_mseconds(250)) else {
continue;
};
match msg.view() {
StateChanged(state)
if state.current() == gst::State::Playing
&& msg.src().is_some_and(|src| src.is::<gst::Pipeline>()) =>
{
tracing::debug!(target: "lesavka_server::video", label, "📸 UVC webcam pipeline ▶️");
}
Error(err) => tracing::error!(
target: "lesavka_server::video",
label,
"📸💥 UVC webcam pipeline error from {:?}: {} ({})",
msg.src().map(gst::prelude::GstObjectExt::path_string),
err.error(),
err.debug().unwrap_or_default()
),
Warning(warning) => tracing::warn!(
target: "lesavka_server::video",
label,
"📸⚠️ UVC webcam pipeline warning from {:?}: {} ({})",
msg.src().map(gst::prelude::GstObjectExt::path_string),
warning.error(),
warning.debug().unwrap_or_default()
),
_ => {}
}
}
});
Self {
alive,
join: Some(join),
}
}
}
#[cfg(not(coverage))]
impl Drop for WebcamBusWatchHandle {
fn drop(&mut self) {
self.alive.store(false, Ordering::Relaxed);
if let Some(join) = self.join.take() {
let _ = join.join();
}
}
}
#[cfg(not(coverage))]
fn spawn_webcam_bus_logger(
pipeline: &gst::Pipeline,
label: &'static str,
) -> Option<WebcamBusWatchHandle> {
pipeline
.bus()
.map(|bus| WebcamBusWatchHandle::spawn(bus, label))
}
impl WebcamSink {
/// Build a new webcam sink pipeline.
///
/// Inputs: the target UVC device plus the selected camera profile.
/// Outputs: a sink ready to receive `VideoPacket`s.
/// Why: UVC output has its own caps and decoder chain that differs from the
/// HDMI sink, so it lives in a dedicated constructor.
#[cfg(coverage)]
pub fn new(_uvc_dev: &str, cfg: &CameraConfig) -> anyhow::Result<Self> {
gst::init()?;
let pipeline = gst::Pipeline::new();
let clock_align_enabled = uvc_sink_session_clock_align_enabled();
let src = gst::ElementFactory::make("appsrc")
.build()?
.downcast::<gst_app::AppSrc>()
.expect("appsrc");
src.set_is_live(true);
src.set_format(gst::Format::Time);
src.set_property("do-timestamp", &false);
2026-05-12 03:02:57 -03:00
configure_uvc_appsrc(&src);
let sink = gst::ElementFactory::make("fakesink")
.build()
.context("building fakesink")?;
if clock_align_enabled {
crate::media_timing::prepare_pipeline_clock_sync(&pipeline);
crate::media_timing::enable_sink_clock_sync(&sink);
}
pipeline.add_many(&[src.upcast_ref(), &sink])?;
gst::Element::link_many(&[src.upcast_ref(), &sink])?;
pipeline.set_state(gst::State::Playing)?;
let frame_step_us = (1_000_000u64 / u64::from(cfg.fps.max(1))).max(1);
Ok(Self {
appsrc: src,
pipe: pipeline,
clock_aligned: AtomicBool::new(!clock_align_enabled),
next_pts_us: AtomicU64::new(0),
frame_step_us,
mjpeg_spool_path: None,
hevc_mjpeg_appsrc: None,
decoded_mjpeg_sink: None,
last_mjpeg_passthrough_bytes: AtomicU64::new(0),
2026-05-14 05:18:36 -03:00
direct_mjpeg_max_bytes: mjpeg_spool::mjpeg_spool_frame_max_bytes(cfg.fps),
uvc_width: cfg.width.min(u32::from(u16::MAX)) as u16,
uvc_height: cfg.height.min(u32::from(u16::MAX)) as u16,
direct_mjpeg_profile_mismatch_seen: AtomicBool::new(false),
last_decoded_mjpeg_bytes: AtomicU64::new(0),
2026-05-12 10:46:56 -03:00
decoded_mjpeg_miss_count: AtomicU64::new(0),
decode_recovery_needs_irap: AtomicBool::new(false),
})
}
#[cfg(not(coverage))]
pub fn new(uvc_dev: &str, cfg: &CameraConfig) -> anyhow::Result<Self> {
gst::init()?;
let pipeline = gst::Pipeline::new();
let clock_align_enabled = uvc_sink_session_clock_align_enabled();
let width = cfg.width as i32;
let height = cfg.height as i32;
let fps = cfg.fps.max(1) as i32;
let use_mjpeg = matches!(cfg.codec, CameraCodec::Mjpeg);
let use_hevc = matches!(cfg.codec, CameraCodec::Hevc);
let src = gst::ElementFactory::make("appsrc")
.build()?
.downcast::<gst_app::AppSrc>()
.expect("appsrc");
src.set_is_live(true);
src.set_format(gst::Format::Time);
src.set_property("do-timestamp", false);
2026-05-12 03:02:57 -03:00
configure_uvc_appsrc(&src);
if clock_align_enabled {
crate::media_timing::prepare_pipeline_clock_sync(&pipeline);
}
let mut mjpeg_spool_file = None;
let mut hevc_mjpeg_appsrc = None;
let mut decoded_mjpeg_sink = None;
if use_mjpeg && mjpeg_spool_enabled() {
let caps_mjpeg = gst::Caps::builder("image/jpeg")
.field("parsed", true)
.field("width", width)
.field("height", height)
.field("framerate", gst::Fraction::new(fps, 1))
.field("pixel-aspect-ratio", gst::Fraction::new(1, 1))
.field("colorimetry", "2:4:7:1")
.build();
src.set_caps(Some(&caps_mjpeg));
let sink = gst::ElementFactory::make("fakesink")
.build()
.context("building fakesink for MJPEG UVC spool")?;
if clock_align_enabled {
crate::media_timing::enable_sink_clock_sync(&sink);
} else if sink.has_property("sync", None) {
sink.set_property("sync", false);
}
pipeline.add_many([src.upcast_ref(), &sink])?;
gst::Element::link_many([src.upcast_ref(), &sink])?;
mjpeg_spool_file = Some(mjpeg_spool_path());
match add_hevc_mjpeg_spool_branch(&pipeline, width, height, fps) {
Ok((hevc_src, hevc_sink)) => {
hevc_mjpeg_appsrc = Some(hevc_src);
decoded_mjpeg_sink = Some(hevc_sink);
tracing::info!(
target: "lesavka_server::video",
"📸 MJPEG UVC spool will also accept live HEVC uplink packets"
);
}
Err(err) => {
tracing::warn!(
target: "lesavka_server::video",
%err,
"📸⚠️ dynamic HEVC->MJPEG branch unavailable; MJPEG UVC spool will accept MJPEG only"
);
}
}
} else if use_mjpeg {
let caps_mjpeg = gst::Caps::builder("image/jpeg")
.field("parsed", true)
.field("width", width)
.field("height", height)
.field("framerate", gst::Fraction::new(fps, 1))
.field("pixel-aspect-ratio", gst::Fraction::new(1, 1))
.field("colorimetry", "2:4:7:1")
.build();
src.set_caps(Some(&caps_mjpeg));
let queue = gst::ElementFactory::make("queue").build()?;
let capsfilter = gst::ElementFactory::make("capsfilter")
.property("caps", &caps_mjpeg)
.build()?;
let sink = gst::ElementFactory::make("v4l2sink")
.property("device", uvc_dev)
.build()?;
// Kept as an emergency fallback; normal MJPEG output is brokered
// through the UVC helper so only one process owns the gadget node.
sink.set_property_from_str("io-mode", &uvc_mjpeg_v4l2sink_io_mode());
if clock_align_enabled {
crate::media_timing::enable_sink_clock_sync(&sink);
} else if sink.has_property("sync", None) {
sink.set_property("sync", false);
}
pipeline.add_many([src.upcast_ref(), &queue, &capsfilter, &sink])?;
gst::Element::link_many([src.upcast_ref(), &queue, &capsfilter, &sink])?;
} else if use_hevc {
let caps_hevc = gst::Caps::builder("video/x-h265")
.field("stream-format", "byte-stream")
.field("alignment", "au")
.build();
let caps_mjpeg = gst::Caps::builder("image/jpeg")
.field("parsed", true)
.field("width", width)
.field("height", height)
.field("framerate", gst::Fraction::new(fps, 1))
.field("pixel-aspect-ratio", gst::Fraction::new(1, 1))
.field("colorimetry", "2:4:7:1")
.build();
src.set_caps(Some(&caps_hevc));
let h265parse = gst::ElementFactory::make("h265parse")
.property("disable-passthrough", true)
.property("config-interval", -1i32)
.build()?;
let decoder_name = require_hevc_decoder()?;
let decoder = gst::ElementFactory::make(decoder_name)
.build()
.with_context(|| format!("building HEVC decoder element {decoder_name}"))?;
configure_hevc_decoder(&decoder);
2026-05-12 10:46:56 -03:00
let decoded_queue = build_hevc_freshness_queue("hevc_mjpeg_decoded_queue")?;
let convert = gst::ElementFactory::make("videoconvert").build()?;
let encoder = gst::ElementFactory::make("jpegenc")
.property("quality", hevc_mjpeg_guard::hevc_jpeg_quality() as i32)
.build()?;
let caps = gst::ElementFactory::make("capsfilter")
.property("caps", &caps_mjpeg)
.build()?;
2026-05-12 10:46:56 -03:00
let encoded_queue = build_hevc_freshness_queue("hevc_mjpeg_encoded_queue")?;
tracing::info!(
target: "lesavka_server::video",
decoder = decoder_name,
"📸 HEVC camera uplink will be decoded and emitted as MJPEG/UVC"
);
if mjpeg_spool_enabled() {
let sink = gst::ElementFactory::make("appsink")
.name("hevc_mjpeg_spool_sink")
2026-05-12 10:46:56 -03:00
.property("sync", false)
.property("enable-last-sample", false)
.property("emit-signals", false)
2026-05-12 10:46:56 -03:00
.property("max-buffers", 1u32)
.property("drop", true)
.build()?
.downcast::<gst_app::AppSink>()
.expect("appsink");
pipeline.add_many([
src.upcast_ref(),
&h265parse,
&decoder,
2026-05-12 10:46:56 -03:00
&decoded_queue,
&convert,
&encoder,
&caps,
2026-05-12 10:46:56 -03:00
&encoded_queue,
sink.upcast_ref(),
])?;
gst::Element::link_many([
src.upcast_ref(),
&h265parse,
&decoder,
2026-05-12 10:46:56 -03:00
&decoded_queue,
&convert,
&encoder,
&caps,
2026-05-12 10:46:56 -03:00
&encoded_queue,
sink.upcast_ref(),
])?;
mjpeg_spool_file = Some(mjpeg_spool_path());
decoded_mjpeg_sink = Some(sink);
} else {
let sink = gst::ElementFactory::make("v4l2sink")
.property("device", uvc_dev)
.build()?;
sink.set_property_from_str("io-mode", &uvc_mjpeg_v4l2sink_io_mode());
if clock_align_enabled {
crate::media_timing::enable_sink_clock_sync(&sink);
} else if sink.has_property("sync", None) {
sink.set_property("sync", false);
}
pipeline.add_many([
src.upcast_ref(),
&h265parse,
&decoder,
&convert,
&encoder,
&caps,
&sink,
])?;
gst::Element::link_many([
src.upcast_ref(),
&h265parse,
&decoder,
&convert,
&encoder,
&caps,
&sink,
])?;
}
} else {
let caps_h264 = gst::Caps::builder("video/x-h264")
.field("stream-format", "byte-stream")
.field("alignment", "au")
.build();
let raw_caps = gst::Caps::builder("video/x-raw")
.field("format", "YUY2")
.field("width", width)
.field("height", height)
.field("framerate", gst::Fraction::new(fps, 1))
.build();
src.set_caps(Some(&caps_h264));
let h264parse = gst::ElementFactory::make("h264parse").build()?;
let decoder_name = require_h264_decoder()?;
let decoder = gst::ElementFactory::make(decoder_name)
.build()
.with_context(|| format!("building decoder element {decoder_name}"))?;
let convert = gst::ElementFactory::make("videoconvert").build()?;
let scale = gst::ElementFactory::make("videoscale").build()?;
let caps = gst::ElementFactory::make("capsfilter")
.property("caps", &raw_caps)
.build()?;
let sink = gst::ElementFactory::make("v4l2sink")
.property("device", uvc_dev)
.build()?;
if clock_align_enabled {
crate::media_timing::enable_sink_clock_sync(&sink);
} else if sink.has_property("sync", None) {
sink.set_property("sync", false);
}
pipeline.add_many([
src.upcast_ref(),
&h264parse,
&decoder,
&convert,
&scale,
&caps,
&sink,
])?;
gst::Element::link_many([
src.upcast_ref(),
&h264parse,
&decoder,
&convert,
&scale,
&caps,
&sink,
])?;
}
pipeline.set_state(gst::State::Playing)?;
2026-05-12 10:46:56 -03:00
let bus_watch = spawn_webcam_bus_logger(&pipeline, "uvc-webcam");
let frame_step_us = (1_000_000u64 / u64::from(cfg.fps.max(1))).max(1);
Ok(Self {
appsrc: src,
pipe: pipeline,
clock_aligned: AtomicBool::new(!clock_align_enabled),
next_pts_us: AtomicU64::new(0),
frame_step_us,
mjpeg_spool_path: mjpeg_spool_file,
hevc_mjpeg_appsrc,
decoded_mjpeg_sink,
last_mjpeg_passthrough_bytes: AtomicU64::new(0),
2026-05-14 05:18:36 -03:00
direct_mjpeg_max_bytes: mjpeg_spool_frame_max_bytes(cfg.fps),
uvc_width: cfg.width.min(u32::from(u16::MAX)) as u16,
uvc_height: cfg.height.min(u32::from(u16::MAX)) as u16,
direct_mjpeg_profile_mismatch_seen: AtomicBool::new(false),
last_decoded_mjpeg_bytes: AtomicU64::new(0),
2026-05-12 10:46:56 -03:00
decoded_mjpeg_miss_count: AtomicU64::new(0),
decode_recovery_needs_irap: AtomicBool::new(false),
_bus_watch: bus_watch,
})
}
/// Push one client frame into the UVC pipeline.
///
/// Inputs: the next `VideoPacket` from the gRPC camera stream.
/// Outputs: none; the frame is forwarded to the appsrc when possible.
/// Why: UVC sinks use a locally monotonic timeline so presentation remains
/// stable even when WAN packet timestamps arrive out of order.
#[cfg(coverage)]
pub fn push(&self, pkt: VideoPacket) {
let buf = gst::Buffer::from_slice(pkt.data);
let _ = self.appsrc.push_buffer(buf);
}
#[cfg(not(coverage))]
pub fn push(&self, pkt: VideoPacket) {
if let Some(path) = &self.mjpeg_spool_path
&& looks_like_mjpeg_frame(&pkt.data)
{
self.spool_direct_mjpeg_frame(path, &pkt);
return;
}
2026-05-12 10:46:56 -03:00
let hevc_recovery_frame =
self.decoded_mjpeg_sink.is_some() && contains_hevc_irap(&pkt.data);
if self.decoded_mjpeg_sink.is_some()
&& self
.decode_recovery_needs_irap
.load(std::sync::atomic::Ordering::Relaxed)
{
if !hevc_recovery_frame {
return;
}
self.decode_recovery_needs_irap
.store(false, std::sync::atomic::Ordering::Relaxed);
self.decoded_mjpeg_miss_count
.store(0, std::sync::atomic::Ordering::Relaxed);
tracing::info!(
target: "lesavka_server::video",
pts = pkt.pts,
"📸 HEVC decoded-MJPEG handoff found a recovery keyframe"
);
}
if self.mjpeg_spool_path.is_some()
&& self.decoded_mjpeg_sink.is_none()
&& !looks_like_mjpeg_frame(&pkt.data)
{
warn!(
target:"lesavka_server::video",
bytes = pkt.data.len(),
hevc_annex_b = looks_like_annex_b_hevc(&pkt.data),
"📸⚠️ dropping non-MJPEG packet before UVC spool; no dynamic decoder is available"
);
return;
}
let mut buf = gst::Buffer::from_slice(pkt.data);
if let Some(meta) = buf.get_mut() {
let pts_us = reserve_local_pts(&self.next_pts_us, pkt.pts, self.frame_step_us);
if !self
.clock_aligned
.swap(true, std::sync::atomic::Ordering::SeqCst)
{
crate::media_timing::align_pipeline_to_session_clock(&self.pipe, pts_us);
}
let ts = gst::ClockTime::from_useconds(pts_us);
meta.set_pts(Some(ts));
meta.set_dts(Some(ts));
meta.set_duration(Some(gst::ClockTime::from_useconds(self.frame_step_us)));
}
let hevc_appsrc = self.hevc_mjpeg_appsrc.as_ref().unwrap_or(&self.appsrc);
if let Err(err) = hevc_appsrc.push_buffer(buf) {
tracing::warn!(target:"lesavka_server::video", %err, "📸⚠️ appsrc push failed");
return;
}
if let (Some(path), Some(sink)) = (&self.mjpeg_spool_path, &self.decoded_mjpeg_sink)
&& let Some(sample) = freshest_mjpeg_sample(sink)
&& let Some(buffer) = sample.buffer()
&& let Ok(map) = buffer.map_readable()
{
2026-05-12 10:46:56 -03:00
self.decoded_mjpeg_miss_count
.store(0, std::sync::atomic::Ordering::Relaxed);
let decoded_pts_us = buffer.pts().map(|pts| pts.nseconds() / 1_000);
let timing = MjpegSpoolTiming::hevc_decoded_mjpeg(pkt.pts, decoded_pts_us);
let previous_bytes = self
.last_decoded_mjpeg_bytes
.load(std::sync::atomic::Ordering::Relaxed);
let decoded_bytes = map.as_slice().len();
if hevc_mjpeg_guard::should_freeze_decoded_mjpeg_frame(previous_bytes, map.as_slice())
{
warn!(
target:"lesavka_server::video",
previous_bytes,
next_bytes = decoded_bytes,
"📸⚠️ freezing suspicious decoded HEVC->MJPEG frame"
);
return;
}
if decoded_bytes > self.direct_mjpeg_max_bytes {
warn!(
target:"lesavka_server::video",
previous_bytes,
next_bytes = decoded_bytes,
max_bytes = self.direct_mjpeg_max_bytes,
"📸⚠️ freezing oversized decoded HEVC->MJPEG frame before UVC spool"
);
return;
}
if let Err(err) = spool_mjpeg_frame_with_timing(path, map.as_slice(), Some(timing)) {
warn!(target:"lesavka_server::video", %err, "📸⚠️ failed to spool decoded HEVC frame for UVC helper");
} else {
self.last_decoded_mjpeg_bytes
.store(decoded_bytes as u64, std::sync::atomic::Ordering::Relaxed);
}
2026-05-12 10:46:56 -03:00
} else if self.decoded_mjpeg_sink.is_some() {
let misses = self
.decoded_mjpeg_miss_count
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
+ 1;
let limit = uvc_hevc_decode_miss_limit();
if misses >= limit {
self.decode_recovery_needs_irap
.store(true, std::sync::atomic::Ordering::Relaxed);
self.decoded_mjpeg_miss_count
.store(0, std::sync::atomic::Ordering::Relaxed);
warn!(
target: "lesavka_server::video",
misses,
limit,
"📸⚠️ HEVC decoded-MJPEG handoff produced no frames; freezing output until the next recovery keyframe"
);
}
}
}
#[cfg(not(coverage))]
fn spool_direct_mjpeg_frame(&self, path: &Path, pkt: &VideoPacket) {
let previous_bytes = self
.last_mjpeg_passthrough_bytes
.load(std::sync::atomic::Ordering::Relaxed);
2026-05-14 05:18:36 -03:00
let inspection = hevc_mjpeg_guard::inspect_mjpeg_frame(&pkt.data);
if let (Some(width), Some(height)) = (inspection.width, inspection.height)
&& (width, height) != (self.uvc_width, self.uvc_height)
&& !self
.direct_mjpeg_profile_mismatch_seen
.swap(true, std::sync::atomic::Ordering::Relaxed)
{
warn!(
target:"lesavka_server::video",
frame_width = width,
frame_height = height,
uvc_width = self.uvc_width,
uvc_height = self.uvc_height,
"📸⚠️ direct MJPEG frame dimensions differ from the live UVC profile; this can make browser output unstable"
);
}
if let Some(reason) = hevc_mjpeg_guard::direct_mjpeg_reject_reason(
previous_bytes,
Some(self.direct_mjpeg_max_bytes),
Some((self.uvc_width, self.uvc_height)),
&pkt.data,
) {
warn!(
target:"lesavka_server::video",
2026-05-14 05:18:36 -03:00
?reason,
previous_bytes,
next_bytes = pkt.data.len(),
2026-05-14 05:18:36 -03:00
max_bytes = self.direct_mjpeg_max_bytes,
frame_width = ?inspection.width,
frame_height = ?inspection.height,
entropy_bytes = inspection.entropy_bytes,
entropy_distinct_bytes = inspection.entropy_distinct_bytes,
entropy_dominant_pct = inspection.entropy_dominant_pct,
entropy_max_run = inspection.entropy_max_run,
"📸⚠️ freezing suspicious direct MJPEG frame before UVC spool"
);
return;
}
let timing = MjpegSpoolTiming::mjpeg_passthrough(pkt.pts);
if let Err(err) = spool_mjpeg_frame_with_timing(path, &pkt.data, Some(timing)) {
warn!(target:"lesavka_server::video", %err, "📸⚠️ failed to spool MJPEG frame for UVC helper");
} else {
self.last_mjpeg_passthrough_bytes
.store(pkt.data.len() as u64, std::sync::atomic::Ordering::Relaxed);
}
}
}
impl Drop for WebcamSink {
fn drop(&mut self) {
let _ = self.pipe.set_state(gst::State::Null);
2026-05-12 10:46:56 -03:00
#[cfg(not(coverage))]
{
let _ = self._bus_watch.take();
}
}
}
#[cfg(test)]
mod tests {
#[test]
fn mjpeg_spool_byte_guard_accepts_jpeg_and_identifies_hevc_annex_b() {
assert!(super::looks_like_mjpeg_frame(&[
0xff, 0xd8, 0xff, 0xe0, 0x00, 0x10
]));
assert!(!super::looks_like_mjpeg_frame(&[
0x00, 0x00, 0x00, 0x01, 0x46, 0x01
]));
assert!(super::looks_like_annex_b_hevc(&[
0x00, 0x00, 0x00, 0x01, 0x46, 0x01
]));
assert!(super::looks_like_annex_b_hevc(&[
0x00, 0x00, 0x01, 0x26
]));
assert!(!super::looks_like_annex_b_hevc(&[
0xff, 0xd8, 0xff, 0xdb
]));
}
#[test]
fn uvc_session_clock_alignment_defaults_on_and_accepts_disable_overrides() {
temp_env::with_var_unset("LESAVKA_UVC_SESSION_CLOCK_ALIGN", || {
assert!(super::uvc_sink_session_clock_align_enabled());
});
for disabled in ["0", "false", "no", "off"] {
temp_env::with_var("LESAVKA_UVC_SESSION_CLOCK_ALIGN", Some(disabled), || {
assert!(!super::uvc_sink_session_clock_align_enabled());
});
}
temp_env::with_var("LESAVKA_UVC_SESSION_CLOCK_ALIGN", Some("1"), || {
assert!(super::uvc_sink_session_clock_align_enabled());
});
}
#[test]
fn mjpeg_uvc_sink_defaults_to_mmap_io_mode_with_safe_override() {
temp_env::with_var_unset("LESAVKA_UVC_MJPEG_IO_MODE", || {
assert_eq!(super::uvc_mjpeg_v4l2sink_io_mode(), "mmap");
});
temp_env::with_var("LESAVKA_UVC_MJPEG_IO_MODE", Some("mmap"), || {
assert_eq!(super::uvc_mjpeg_v4l2sink_io_mode(), "mmap");
});
temp_env::with_var("LESAVKA_UVC_MJPEG_IO_MODE", Some("not-real"), || {
assert_eq!(super::uvc_mjpeg_v4l2sink_io_mode(), "mmap");
});
}
#[test]
fn mjpeg_spool_defaults_on_with_path_override() {
temp_env::with_var_unset("LESAVKA_UVC_MJPEG_SPOOL", || {
assert!(super::mjpeg_spool_enabled());
});
temp_env::with_var("LESAVKA_UVC_MJPEG_SPOOL", Some("0"), || {
assert!(!super::mjpeg_spool_enabled());
});
for disabled in ["false", "no", "off"] {
temp_env::with_var("LESAVKA_UVC_MJPEG_SPOOL", Some(disabled), || {
assert!(!super::mjpeg_spool_enabled());
});
}
temp_env::with_var("LESAVKA_UVC_FRAME_PATH", Some("/tmp/frame.mjpg"), || {
assert_eq!(
super::mjpeg_spool_path(),
std::path::PathBuf::from("/tmp/frame.mjpg")
);
});
}
2026-05-12 03:02:57 -03:00
#[test]
fn uvc_appsrc_limits_default_to_freshness_first_bounds() {
temp_env::with_var_unset("LESAVKA_UVC_APP_MAX_BUFFERS", || {
temp_env::with_var_unset("LESAVKA_UVC_APP_MAX_BYTES", || {
temp_env::with_var_unset("LESAVKA_UVC_APP_MAX_TIME_NS", || {
temp_env::with_var_unset("LESAVKA_UVC_APP_LEAKY_TYPE", || {
assert_eq!(super::uvc_appsrc_max_buffers(), 4);
assert_eq!(super::uvc_appsrc_max_bytes(), 4 * 1024 * 1024);
assert_eq!(super::uvc_appsrc_max_time_ns(), 200_000_000);
assert_eq!(super::uvc_appsrc_leaky_type(), "downstream");
});
});
});
});
}
#[test]
fn uvc_appsrc_limits_accept_positive_safe_overrides_only() {
temp_env::with_var("LESAVKA_UVC_APP_MAX_BUFFERS", Some("6"), || {
temp_env::with_var("LESAVKA_UVC_APP_MAX_BYTES", Some("1048576"), || {
temp_env::with_var("LESAVKA_UVC_APP_MAX_TIME_NS", Some("100000000"), || {
temp_env::with_var("LESAVKA_UVC_APP_LEAKY_TYPE", Some("upstream"), || {
assert_eq!(super::uvc_appsrc_max_buffers(), 6);
assert_eq!(super::uvc_appsrc_max_bytes(), 1_048_576);
assert_eq!(super::uvc_appsrc_max_time_ns(), 100_000_000);
assert_eq!(super::uvc_appsrc_leaky_type(), "upstream");
});
});
});
});
temp_env::with_var("LESAVKA_UVC_APP_MAX_BUFFERS", Some("0"), || {
temp_env::with_var("LESAVKA_UVC_APP_MAX_BYTES", Some("nope"), || {
temp_env::with_var("LESAVKA_UVC_APP_MAX_TIME_NS", Some("0"), || {
temp_env::with_var("LESAVKA_UVC_APP_LEAKY_TYPE", Some("sideways"), || {
assert_eq!(super::uvc_appsrc_max_buffers(), 4);
assert_eq!(super::uvc_appsrc_max_bytes(), 4 * 1024 * 1024);
assert_eq!(super::uvc_appsrc_max_time_ns(), 200_000_000);
assert_eq!(super::uvc_appsrc_leaky_type(), "downstream");
});
});
});
});
}
2026-05-12 10:46:56 -03:00
#[test]
fn hevc_spool_freshness_bounds_default_to_tiny_live_handoff() {
2026-05-12 10:46:56 -03:00
temp_env::with_var_unset("LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS", || {
temp_env::with_var_unset("LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT", || {
assert_eq!(super::uvc_hevc_freshness_queue_buffers(), 2);
2026-05-12 10:46:56 -03:00
assert_eq!(super::uvc_hevc_decode_miss_limit(), 15);
});
});
}
#[test]
fn hevc_spool_freshness_bounds_accept_only_small_positive_queue_depths() {
temp_env::with_var("LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS", Some("3"), || {
temp_env::with_var("LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT", Some("4"), || {
assert_eq!(super::uvc_hevc_freshness_queue_buffers(), 3);
assert_eq!(super::uvc_hevc_decode_miss_limit(), 4);
});
});
temp_env::with_var("LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS", Some("99"), || {
assert_eq!(super::uvc_hevc_freshness_queue_buffers(), 4);
});
temp_env::with_var("LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS", Some("0"), || {
temp_env::with_var("LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT", Some("0"), || {
assert_eq!(super::uvc_hevc_freshness_queue_buffers(), 2);
2026-05-12 10:46:56 -03:00
assert_eq!(super::uvc_hevc_decode_miss_limit(), 15);
});
});
}
#[cfg(not(coverage))]
#[test]
fn webcam_bus_watch_stops_promptly_on_drop() {
use gstreamer as gst;
use gstreamer::prelude::ElementExt;
gst::init().expect("gstreamer init");
let pipeline = gst::Pipeline::new();
let bus = pipeline.bus().expect("pipeline bus");
let started = std::time::Instant::now();
drop(super::WebcamBusWatchHandle::spawn(bus, "test-webcam"));
assert!(
started.elapsed() < std::time::Duration::from_secs(1),
"webcam bus watcher should not outlive dropped webcam sinks"
);
}
}