lesavka/server/src/video_sinks/webcam_sink.rs

480 lines
16 KiB
Rust
Raw Normal View History

use anyhow::Context;
use gstreamer as gst;
use gstreamer::prelude::*;
use gstreamer_app as gst_app;
use std::fs;
use lesavka_common::lesavka::VideoPacket;
use std::path::{Path, PathBuf};
2026-05-12 10:46:56 -03:00
use std::sync::{
Arc,
atomic::{AtomicBool, AtomicU64, Ordering},
};
use tracing::warn;
use crate::camera::{CameraCodec, CameraConfig};
use crate::video_support::{
contains_idr, dev_mode_enabled, require_h264_decoder, require_hevc_decoder,
};
#[path = "webcam_sink/constructor.rs"]
mod constructor;
#[path = "webcam_sink/frame_handoff.rs"]
mod frame_handoff;
mod mjpeg_spool;
2026-05-12 10:46:56 -03:00
#[cfg(not(coverage))]
use gst::MessageView::{Error, StateChanged, Warning};
/// Push H.264 or MJPEG frames into the USB UVC gadget.
///
/// Inputs: a UVC device node and the negotiated camera configuration.
/// Outputs: a live `WebcamSink` that accepts `VideoPacket`s.
/// Why: the UVC sink owns the GStreamer pipeline details for gadget output so
/// the relay logic can focus on session lifecycle instead of media plumbing.
pub struct WebcamSink {
appsrc: gst_app::AppSrc,
pipe: gst::Pipeline,
clock_aligned: AtomicBool,
next_pts_us: AtomicU64,
frame_step_us: u64,
mjpeg_spool_path: Option<PathBuf>,
direct_mjpeg_appsrc: Option<gst_app::AppSrc>,
normalized_mjpeg_sink: Option<gst_app::AppSink>,
hevc_mjpeg_appsrc: Option<gst_app::AppSrc>,
decoded_mjpeg_sink: Option<gst_app::AppSink>,
last_mjpeg_passthrough_bytes: AtomicU64,
2026-05-14 05:18:36 -03:00
direct_mjpeg_max_bytes: usize,
uvc_width: u16,
uvc_height: u16,
direct_mjpeg_profile_mismatch_seen: AtomicBool,
unexpected_mjpeg_in_hevc_seen: AtomicBool,
last_decoded_mjpeg_bytes: AtomicU64,
direct_mjpeg_normalize_bypassed: AtomicBool,
normalized_mjpeg_miss_count: AtomicU64,
normalized_mjpeg_memory_check_count: AtomicU64,
2026-05-12 10:46:56 -03:00
decoded_mjpeg_miss_count: AtomicU64,
decode_recovery_needs_irap: AtomicBool,
#[cfg(not(coverage))]
_bus_watch: Option<WebcamBusWatchHandle>,
}
fn uvc_sink_session_clock_align_enabled() -> bool {
std::env::var("LESAVKA_UVC_SESSION_CLOCK_ALIGN")
.ok()
.map(|value| {
let trimmed = value.trim();
!(trimmed.eq_ignore_ascii_case("0")
|| trimmed.eq_ignore_ascii_case("false")
|| trimmed.eq_ignore_ascii_case("no")
|| trimmed.eq_ignore_ascii_case("off"))
})
.unwrap_or(true)
}
fn uvc_mjpeg_v4l2sink_io_mode() -> String {
let value = std::env::var("LESAVKA_UVC_MJPEG_IO_MODE").unwrap_or_else(|_| "mmap".to_string());
let trimmed = value.trim().to_ascii_lowercase();
match trimmed.as_str() {
"auto" | "rw" | "mmap" | "userptr" | "dmabuf" | "dmabuf-import" => trimmed,
_ => {
warn!(
value,
"invalid LESAVKA_UVC_MJPEG_IO_MODE; falling back to mmap"
);
"mmap".to_string()
}
}
}
2026-05-12 03:02:57 -03:00
fn positive_u64_env(name: &str, default: u64) -> u64 {
std::env::var(name)
.ok()
.and_then(|value| value.trim().parse::<u64>().ok())
.filter(|value| *value > 0)
.unwrap_or(default)
}
fn uvc_appsrc_max_buffers() -> u64 {
positive_u64_env("LESAVKA_UVC_APP_MAX_BUFFERS", 4)
}
fn uvc_appsrc_max_bytes() -> u64 {
positive_u64_env("LESAVKA_UVC_APP_MAX_BYTES", 4 * 1024 * 1024)
}
fn uvc_appsrc_max_time_ns() -> u64 {
positive_u64_env("LESAVKA_UVC_APP_MAX_TIME_NS", 200_000_000)
}
fn uvc_appsrc_leaky_type() -> String {
std::env::var("LESAVKA_UVC_APP_LEAKY_TYPE")
.ok()
.map(|value| value.trim().to_ascii_lowercase())
.filter(|value| matches!(value.as_str(), "downstream" | "upstream" | "none"))
.unwrap_or_else(|| "downstream".to_string())
}
fn looks_like_mjpeg_frame(data: &[u8]) -> bool {
data.len() >= 4 && data.starts_with(&[0xff, 0xd8, 0xff])
}
fn looks_like_annex_b_hevc(data: &[u8]) -> bool {
data.starts_with(&[0, 0, 0, 1]) || data.starts_with(&[0, 0, 1])
}
2026-05-12 10:46:56 -03:00
fn uvc_hevc_freshness_queue_buffers() -> u32 {
positive_u64_env("LESAVKA_UVC_HEVC_FRESHNESS_QUEUE_BUFFERS", 2)
2026-05-16 17:47:58 -03:00
.clamp(1, 4) as u32
2026-05-12 10:46:56 -03:00
}
fn uvc_hevc_decode_miss_limit() -> u64 {
positive_u64_env("LESAVKA_UVC_HEVC_DECODE_MISS_LIMIT", 15)
}
2026-05-12 03:02:57 -03:00
/// Bound the UVC ingress queue so decode/UVC stalls cannot turn into RSS growth.
///
/// Inputs: the UVC `appsrc`. Output: side-effect-only GStreamer properties.
/// Why: live webcam output should prefer dropping stale frames over buffering
/// seconds or minutes of encoded media when the physical sink falls behind.
fn configure_uvc_appsrc(appsrc: &gst_app::AppSrc) {
appsrc.set_property("block", false);
if appsrc.has_property("max-buffers", None) {
appsrc.set_property("max-buffers", uvc_appsrc_max_buffers());
}
if appsrc.has_property("max-bytes", None) {
appsrc.set_property("max-bytes", uvc_appsrc_max_bytes());
}
if appsrc.has_property("max-time", None) {
appsrc.set_property("max-time", uvc_appsrc_max_time_ns());
}
if appsrc.has_property("leaky-type", None) {
appsrc.set_property_from_str("leaky-type", &uvc_appsrc_leaky_type());
}
}
/// Build a tiny leaky queue for the decoded HEVC handoff branch.
2026-05-12 10:46:56 -03:00
///
/// Inputs: a stable queue name. Output: configured GStreamer queue element.
/// Why: hidden raw/JPEG backlogs are memory leaks in a live webcam path; after
/// HEVC is decoded it is safe to drop stale raw frames and keep only the newest
/// candidate for MJPEG publication while absorbing normal decoder scheduling
/// jitter.
2026-05-12 10:46:56 -03:00
#[cfg(not(coverage))]
fn build_hevc_freshness_queue(name: &str) -> anyhow::Result<gst::Element> {
let queue = gst::ElementFactory::make("queue")
.name(name)
.property("max-size-buffers", uvc_hevc_freshness_queue_buffers())
.property("max-size-bytes", 0u32)
.property("max-size-time", 0u64)
.build()?;
queue.set_property_from_str("leaky", "downstream");
Ok(queue)
}
#[cfg(not(coverage))]
fn direct_mjpeg_normalize_pull_timeout() -> gst::ClockTime {
gst::ClockTime::from_mseconds(u64::from(
hevc_mjpeg_guard::direct_mjpeg_normalize_pull_timeout_ms(),
))
}
#[cfg(not(coverage))]
fn current_process_rss_kb() -> Option<u64> {
let status = fs::read_to_string("/proc/self/status").ok()?;
status.lines().find_map(|line| {
let rest = line.strip_prefix("VmRSS:")?;
rest.split_whitespace().next()?.parse::<u64>().ok()
})
}
/// Drain normalized direct-MJPEG output down to the freshest sample.
///
/// Inputs: the direct-MJPEG normalization appsink. Output: newest available
/// sample, if any. Why: decode/re-encode should sanitize browser-facing MJPEG
/// without letting stale frames accumulate behind the live webcam feed.
#[cfg(not(coverage))]
fn freshest_direct_mjpeg_sample(sink: &gst_app::AppSink) -> Option<gst::Sample> {
let mut newest = sink.try_pull_sample(direct_mjpeg_normalize_pull_timeout());
while let Some(sample) = sink.try_pull_sample(gst::ClockTime::ZERO) {
newest = Some(sample);
}
newest
}
#[cfg(not(coverage))]
fn build_direct_mjpeg_normalize_branch(
pipeline: &gst::Pipeline,
width: i32,
height: i32,
_fps: i32,
) -> anyhow::Result<(gst_app::AppSrc, gst_app::AppSink)> {
let src = gst::ElementFactory::make("appsrc")
.name("direct_mjpeg_normalize_src")
.build()?
.downcast::<gst_app::AppSrc>()
.expect("direct MJPEG normalize appsrc");
src.set_is_live(true);
src.set_format(gst::Format::Time);
src.set_property("do-timestamp", true);
configure_uvc_appsrc(&src);
let caps_in = gst::Caps::builder("image/jpeg").build();
src.set_caps(Some(&caps_in));
let caps_mjpeg = gst::Caps::builder("image/jpeg")
.field("parsed", true)
.field("width", width)
.field("height", height)
.field("pixel-aspect-ratio", gst::Fraction::new(1, 1))
.build();
2026-05-19 12:01:13 -03:00
let input_parser = gst::ElementFactory::make("jpegparse")
.name("direct_mjpeg_normalize_input_parse")
.build()?;
let decoder = gst::ElementFactory::make("jpegdec").build()?;
let decoded_queue = build_hevc_freshness_queue("direct_mjpeg_normalize_decoded_queue")?;
let convert = gst::ElementFactory::make("videoconvert").build()?;
let scale = gst::ElementFactory::make("videoscale").build()?;
let raw_caps = gst::Caps::builder("video/x-raw")
.field("width", width)
.field("height", height)
.build();
let raw_capsfilter = gst::ElementFactory::make("capsfilter")
.property("caps", &raw_caps)
.build()?;
let encoder = gst::ElementFactory::make("jpegenc")
.property(
"quality",
hevc_mjpeg_guard::direct_mjpeg_jpeg_quality() as i32,
)
.build()?;
2026-05-19 12:01:13 -03:00
let output_parser = gst::ElementFactory::make("jpegparse")
.name("direct_mjpeg_normalize_output_parse")
.build()?;
let encoded_caps = gst::ElementFactory::make("capsfilter")
.property("caps", &caps_mjpeg)
.build()?;
let encoded_queue = build_hevc_freshness_queue("direct_mjpeg_normalize_encoded_queue")?;
let sink = gst::ElementFactory::make("appsink")
.name("direct_mjpeg_normalize_sink")
.property("sync", false)
.property("enable-last-sample", false)
.property("emit-signals", false)
.property("max-buffers", 1u32)
.property("drop", true)
.build()?
.downcast::<gst_app::AppSink>()
.expect("direct MJPEG normalize appsink");
pipeline.add_many([
src.upcast_ref(),
2026-05-19 12:01:13 -03:00
&input_parser,
&decoder,
&decoded_queue,
&convert,
&scale,
&raw_capsfilter,
&encoder,
2026-05-19 12:01:13 -03:00
&output_parser,
&encoded_caps,
&encoded_queue,
sink.upcast_ref(),
])?;
gst::Element::link_many([
src.upcast_ref(),
2026-05-19 12:01:13 -03:00
&input_parser,
&decoder,
&decoded_queue,
&convert,
&scale,
&raw_capsfilter,
&encoder,
2026-05-19 12:01:13 -03:00
&output_parser,
&encoded_caps,
&encoded_queue,
sink.upcast_ref(),
])?;
Ok((src, sink))
}
#[cfg(not(coverage))]
fn add_hevc_mjpeg_spool_branch(
pipeline: &gst::Pipeline,
width: i32,
height: i32,
2026-05-19 12:01:13 -03:00
_fps: i32,
) -> anyhow::Result<(gst_app::AppSrc, gst_app::AppSink)> {
let src = gst::ElementFactory::make("appsrc")
.name("dynamic_hevc_mjpeg_src")
.build()?
.downcast::<gst_app::AppSrc>()
.expect("dynamic HEVC appsrc");
src.set_is_live(true);
src.set_format(gst::Format::Time);
src.set_property("do-timestamp", false);
configure_uvc_appsrc(&src);
let caps_hevc = gst::Caps::builder("video/x-h265")
.field("stream-format", "byte-stream")
.field("alignment", "au")
.build();
src.set_caps(Some(&caps_hevc));
let caps_mjpeg = gst::Caps::builder("image/jpeg")
.field("parsed", true)
.field("width", width)
.field("height", height)
.field("pixel-aspect-ratio", gst::Fraction::new(1, 1))
.build();
let h265parse = gst::ElementFactory::make("h265parse")
.property("disable-passthrough", true)
.property("config-interval", -1i32)
.build()?;
let decoder_name = require_hevc_decoder()?;
let decoder = gst::ElementFactory::make(decoder_name)
.build()
.with_context(|| format!("building dynamic HEVC decoder element {decoder_name}"))?;
configure_hevc_decoder(&decoder);
let decoded_queue = build_hevc_freshness_queue("dynamic_hevc_mjpeg_decoded_queue")?;
let convert = gst::ElementFactory::make("videoconvert").build()?;
let encoder = gst::ElementFactory::make("jpegenc")
.property("quality", hevc_mjpeg_guard::hevc_jpeg_quality() as i32)
.build()?;
2026-05-19 12:01:13 -03:00
let jpegparse = gst::ElementFactory::make("jpegparse")
.name("dynamic_hevc_mjpeg_output_parse")
.build()?;
let caps = gst::ElementFactory::make("capsfilter")
.property("caps", &caps_mjpeg)
.build()?;
let encoded_queue = build_hevc_freshness_queue("dynamic_hevc_mjpeg_encoded_queue")?;
let sink = gst::ElementFactory::make("appsink")
.name("dynamic_hevc_mjpeg_spool_sink")
.property("sync", false)
.property("enable-last-sample", false)
.property("emit-signals", false)
.property("max-buffers", 1u32)
.property("drop", true)
.build()?
.downcast::<gst_app::AppSink>()
.expect("dynamic HEVC appsink");
pipeline.add_many([
src.upcast_ref(),
&h265parse,
&decoder,
&decoded_queue,
&convert,
&encoder,
2026-05-19 12:01:13 -03:00
&jpegparse,
&caps,
&encoded_queue,
sink.upcast_ref(),
])?;
gst::Element::link_many([
src.upcast_ref(),
&h265parse,
&decoder,
&decoded_queue,
&convert,
&encoder,
2026-05-19 12:01:13 -03:00
&jpegparse,
&caps,
&encoded_queue,
sink.upcast_ref(),
])?;
Ok((src, sink))
}
/// Configure conservative recovery knobs on hardware HEVC decoders.
///
/// Inputs: a decoder element selected by `require_hevc_decoder`. Output:
/// side-effect-only property updates when the element supports them. Why: the
/// Pi stateless decoder can otherwise hold onto corrupt or dependency-missing
/// pictures after live HEVC packet drops, starving the MJPEG UVC handoff.
#[cfg(not(coverage))]
fn configure_hevc_decoder(decoder: &gst::Element) {
if decoder.has_property("discard-corrupted-frames", None) {
decoder.set_property("discard-corrupted-frames", true);
}
if decoder.has_property("automatic-request-sync-points", None) {
decoder.set_property("automatic-request-sync-points", true);
}
}
2026-05-12 10:46:56 -03:00
#[cfg(not(coverage))]
struct WebcamBusWatchHandle {
alive: Arc<AtomicBool>,
join: Option<std::thread::JoinHandle<()>>,
}
#[cfg(not(coverage))]
impl WebcamBusWatchHandle {
fn spawn(bus: gst::Bus, label: &'static str) -> Self {
let alive = Arc::new(AtomicBool::new(true));
let alive_flag = Arc::clone(&alive);
let join = std::thread::spawn(move || {
while alive_flag.load(Ordering::Relaxed) {
let Some(msg) = bus.timed_pop(gst::ClockTime::from_mseconds(250)) else {
continue;
};
match msg.view() {
StateChanged(state)
if state.current() == gst::State::Playing
&& msg.src().is_some_and(|src| src.is::<gst::Pipeline>()) =>
{
tracing::debug!(target: "lesavka_server::video", label, "📸 UVC webcam pipeline ▶️");
}
Error(err) => tracing::error!(
target: "lesavka_server::video",
label,
"📸💥 UVC webcam pipeline error from {:?}: {} ({})",
msg.src().map(gst::prelude::GstObjectExt::path_string),
err.error(),
err.debug().unwrap_or_default()
),
Warning(warning) => tracing::warn!(
target: "lesavka_server::video",
label,
"📸⚠️ UVC webcam pipeline warning from {:?}: {} ({})",
msg.src().map(gst::prelude::GstObjectExt::path_string),
warning.error(),
warning.debug().unwrap_or_default()
),
_ => {}
}
}
});
Self {
alive,
join: Some(join),
}
}
}
#[cfg(not(coverage))]
impl Drop for WebcamBusWatchHandle {
fn drop(&mut self) {
self.alive.store(false, Ordering::Relaxed);
if let Some(join) = self.join.take() {
let _ = join.join();
}
}
}
#[cfg(not(coverage))]
fn spawn_webcam_bus_logger(
pipeline: &gst::Pipeline,
label: &'static str,
) -> Option<WebcamBusWatchHandle> {
pipeline
.bus()
.map(|bus| WebcamBusWatchHandle::spawn(bus, label))
}
impl Drop for WebcamSink {
fn drop(&mut self) {
let _ = self.pipe.set_state(gst::State::Null);
2026-05-12 10:46:56 -03:00
#[cfg(not(coverage))]
{
let _ = self._bus_watch.take();
}
}
}