// lesavka/server/src/video_sinks/webcam_sink.rs

use anyhow::Context;
use gstreamer as gst;
use gstreamer::prelude::*;
use gstreamer_app as gst_app;
use lesavka_common::lesavka::VideoPacket;
use std::fs;
use std::path::Path;
use std::sync::atomic::{AtomicBool, AtomicU64};
use tracing::warn;
use crate::camera::{CameraCodec, CameraConfig};
use crate::video_support::{contains_idr, dev_mode_enabled, pick_h264_decoder, reserve_local_pts};
/// Push H.264 or MJPEG frames into the USB UVC gadget.
///
/// Inputs: a UVC device node and the negotiated camera configuration.
/// Outputs: a live `WebcamSink` that accepts `VideoPacket`s.
/// Why: the UVC sink owns the GStreamer pipeline details for gadget output so
/// the relay logic can focus on session lifecycle instead of media plumbing.
pub struct WebcamSink {
/// Entry point for frames arriving from the gRPC camera stream.
appsrc: gst_app::AppSrc,
/// Owning handle to the pipeline; driven to `Null` in `Drop` for teardown.
pipe: gst::Pipeline,
/// Next locally generated presentation timestamp, in microseconds
/// (advanced via `reserve_local_pts` in `push`).
next_pts_us: AtomicU64,
/// Nominal per-frame duration in microseconds, derived from the configured fps.
frame_step_us: u64,
}
impl WebcamSink {
/// Build a new webcam sink pipeline.
///
/// Inputs: the target UVC device plus the selected camera profile.
/// Outputs: a sink ready to receive `VideoPacket`s.
/// Why: UVC output has its own caps and decoder chain that differs from the
/// HDMI sink, so it lives in a dedicated constructor. The coverage build
/// swaps the real v4l2 chain for a `fakesink` so tests need no hardware.
#[cfg(coverage)]
pub fn new(_uvc_dev: &str, cfg: &CameraConfig) -> anyhow::Result<Self> {
    gst::init()?;
    let pipeline = gst::Pipeline::new();
    let src = gst::ElementFactory::make("appsrc")
        .build()?
        .downcast::<gst_app::AppSrc>()
        .expect("appsrc");
    src.set_is_live(true);
    src.set_format(gst::Format::Time);
    // Pass the bool by value, matching the non-coverage constructor.
    src.set_property("do-timestamp", false);
    let sink = gst::ElementFactory::make("fakesink")
        .build()
        .context("building fakesink")?;
    // Array form (not `&[...]`) for consistency with the non-coverage path.
    pipeline.add_many([src.upcast_ref(), &sink])?;
    gst::Element::link_many([src.upcast_ref(), &sink])?;
    pipeline.set_state(gst::State::Playing)?;
    // At least 1 us per frame; `fps.max(1)` guards against a zero-fps config.
    let frame_step_us = (1_000_000u64 / u64::from(cfg.fps.max(1))).max(1);
    Ok(Self {
        appsrc: src,
        pipe: pipeline,
        next_pts_us: AtomicU64::new(0),
        frame_step_us,
    })
}
/// Build the real UVC gadget pipeline (non-coverage builds).
///
/// Inputs: the target UVC device node plus the selected camera profile.
/// Outputs: a sink ready to receive `VideoPacket`s.
/// Why: MJPEG can be forwarded to the gadget as-is, while H.264 must be
/// decoded and converted to raw YUY2 before `v4l2sink` will accept it, so
/// the two codecs get distinct element chains.
#[cfg(not(coverage))]
pub fn new(uvc_dev: &str, cfg: &CameraConfig) -> anyhow::Result<Self> {
    gst::init()?;
    let pipeline = gst::Pipeline::new();
    // Checked conversions: a silent `as i32` wrap would produce bogus caps
    // for out-of-range configs instead of a diagnosable error.
    let width = i32::try_from(cfg.width).context("camera width exceeds i32")?;
    let height = i32::try_from(cfg.height).context("camera height exceeds i32")?;
    let fps = i32::try_from(cfg.fps.max(1)).context("camera fps exceeds i32")?;
    let use_mjpeg = matches!(cfg.codec, CameraCodec::Mjpeg);
    let src = gst::ElementFactory::make("appsrc")
        .build()?
        .downcast::<gst_app::AppSrc>()
        .expect("appsrc");
    src.set_is_live(true);
    src.set_format(gst::Format::Time);
    // Timestamps are assigned in `push` from the local monotonic timeline.
    src.set_property("do-timestamp", false);
    // Opt-in backpressure: any value other than "0" enables appsrc blocking.
    let block = std::env::var("LESAVKA_UVC_APP_BLOCK")
        .ok()
        .map(|value| value != "0")
        .unwrap_or(false);
    src.set_property("block", block);
    if use_mjpeg {
        let caps_mjpeg = gst::Caps::builder("image/jpeg")
            .field("parsed", true)
            .field("width", width)
            .field("height", height)
            .field("framerate", gst::Fraction::new(fps, 1))
            .field("pixel-aspect-ratio", gst::Fraction::new(1, 1))
            .field("colorimetry", "2:4:7:1")
            .build();
        src.set_caps(Some(&caps_mjpeg));
        let queue = gst::ElementFactory::make("queue").build()?;
        let capsfilter = gst::ElementFactory::make("capsfilter")
            .property("caps", &caps_mjpeg)
            .build()?;
        // sync=false: frames are pushed as fast as they arrive; the gadget
        // host drives actual presentation timing.
        let sink = gst::ElementFactory::make("v4l2sink")
            .property("device", uvc_dev)
            .property("sync", false)
            .build()?;
        pipeline.add_many([src.upcast_ref(), &queue, &capsfilter, &sink])?;
        gst::Element::link_many([src.upcast_ref(), &queue, &capsfilter, &sink])?;
    } else {
        let caps_h264 = gst::Caps::builder("video/x-h264")
            .field("stream-format", "byte-stream")
            .field("alignment", "au")
            .build();
        let raw_caps = gst::Caps::builder("video/x-raw")
            .field("format", "YUY2")
            .field("width", width)
            .field("height", height)
            .field("framerate", gst::Fraction::new(fps, 1))
            .build();
        src.set_caps(Some(&caps_h264));
        let h264parse = gst::ElementFactory::make("h264parse").build()?;
        // Decoder choice (hardware vs software) is centralized in
        // `pick_h264_decoder` so both video sinks agree.
        let decoder_name = pick_h264_decoder();
        let decoder = gst::ElementFactory::make(decoder_name)
            .build()
            .with_context(|| format!("building decoder element {decoder_name}"))?;
        let convert = gst::ElementFactory::make("videoconvert").build()?;
        let scale = gst::ElementFactory::make("videoscale").build()?;
        let caps = gst::ElementFactory::make("capsfilter")
            .property("caps", &raw_caps)
            .build()?;
        let sink = gst::ElementFactory::make("v4l2sink")
            .property("device", uvc_dev)
            .property("sync", false)
            .build()?;
        pipeline.add_many([
            src.upcast_ref(),
            &h264parse,
            &decoder,
            &convert,
            &scale,
            &caps,
            &sink,
        ])?;
        gst::Element::link_many([
            src.upcast_ref(),
            &h264parse,
            &decoder,
            &convert,
            &scale,
            &caps,
            &sink,
        ])?;
    }
    pipeline.set_state(gst::State::Playing)?;
    // At least 1 us per frame; `fps.max(1)` guards against a zero-fps config.
    let frame_step_us = (1_000_000u64 / u64::from(cfg.fps.max(1))).max(1);
    Ok(Self {
        appsrc: src,
        pipe: pipeline,
        next_pts_us: AtomicU64::new(0),
        frame_step_us,
    })
}
/// Push one client frame into the UVC pipeline.
///
/// Inputs: the next `VideoPacket` from the gRPC camera stream.
/// Outputs: none; the frame is forwarded to the appsrc when possible.
/// Why: UVC sinks use a locally monotonic timeline so presentation remains
/// stable even when WAN packet timestamps arrive out of order.
#[cfg(coverage)]
pub fn push(&self, pkt: VideoPacket) {
    // Coverage builds skip timestamping entirely: wrap the payload and hand
    // it straight to the appsrc, ignoring the push result.
    let _ = self.appsrc.push_buffer(gst::Buffer::from_slice(pkt.data));
}
#[cfg(not(coverage))]
pub fn push(&self, pkt: VideoPacket) {
let mut buf = gst::Buffer::from_slice(pkt.data);
if let Some(meta) = buf.get_mut() {
let pts_us = reserve_local_pts(&self.next_pts_us, pkt.pts, self.frame_step_us);
let ts = gst::ClockTime::from_useconds(pts_us);
meta.set_pts(Some(ts));
meta.set_dts(Some(ts));
meta.set_duration(Some(gst::ClockTime::from_useconds(self.frame_step_us)));
}
if let Err(err) = self.appsrc.push_buffer(buf) {
tracing::warn!(target:"lesavka_server::video", %err, "📸⚠️ appsrc push failed");
}
}
}
impl Drop for WebcamSink {
    /// Tear down the GStreamer pipeline when the sink goes away.
    fn drop(&mut self) {
        // Best-effort shutdown: a state-change failure is not actionable
        // while the sink is being dropped, so the result is discarded.
        self.pipe.set_state(gst::State::Null).ok();
    }
}