459 lines
16 KiB
Rust
459 lines
16 KiB
Rust
#![forbid(unsafe_code)]
|
|
|
|
use anyhow::Context;
|
|
use gstreamer as gst;
|
|
use gstreamer::prelude::*;
|
|
use gstreamer_app as gst_app;
|
|
use lesavka_common::lesavka::VideoPacket;
|
|
use std::sync::atomic::AtomicU64;
|
|
use tracing::warn;
|
|
|
|
use crate::camera::{CameraCodec, CameraConfig};
|
|
use crate::video_support::{contains_idr, dev_mode_enabled, next_local_pts, pick_h264_decoder};
|
|
|
|
/// Push H.264 or MJPEG frames into the USB UVC gadget.
|
|
///
|
|
/// Inputs: a UVC device node and the negotiated camera configuration.
|
|
/// Outputs: a live `WebcamSink` that accepts `VideoPacket`s.
|
|
/// Why: the UVC sink owns the GStreamer pipeline details for gadget output so
|
|
/// the relay logic can focus on session lifecycle instead of media plumbing.
|
|
pub struct WebcamSink {
|
|
appsrc: gst_app::AppSrc,
|
|
pipe: gst::Pipeline,
|
|
next_pts_us: AtomicU64,
|
|
frame_step_us: u64,
|
|
}
|
|
|
|
impl WebcamSink {
|
|
/// Build a new webcam sink pipeline.
|
|
///
|
|
/// Inputs: the target UVC device plus the selected camera profile.
|
|
/// Outputs: a sink ready to receive `VideoPacket`s.
|
|
/// Why: UVC output has its own caps and decoder chain that differs from the
|
|
/// HDMI sink, so it lives in a dedicated constructor.
|
|
pub fn new(uvc_dev: &str, cfg: &CameraConfig) -> anyhow::Result<Self> {
|
|
gst::init()?;
|
|
|
|
let pipeline = gst::Pipeline::new();
|
|
|
|
let width = cfg.width as i32;
|
|
let height = cfg.height as i32;
|
|
let fps = cfg.fps.max(1) as i32;
|
|
let use_mjpeg = matches!(cfg.codec, CameraCodec::Mjpeg);
|
|
|
|
let src = gst::ElementFactory::make("appsrc")
|
|
.build()?
|
|
.downcast::<gst_app::AppSrc>()
|
|
.expect("appsrc");
|
|
src.set_is_live(true);
|
|
src.set_format(gst::Format::Time);
|
|
src.set_property("do-timestamp", &false);
|
|
let block = std::env::var("LESAVKA_UVC_APP_BLOCK")
|
|
.ok()
|
|
.map(|value| value != "0")
|
|
.unwrap_or(false);
|
|
src.set_property("block", &block);
|
|
|
|
if use_mjpeg {
|
|
let caps_mjpeg = gst::Caps::builder("image/jpeg")
|
|
.field("parsed", true)
|
|
.field("width", width)
|
|
.field("height", height)
|
|
.field("framerate", gst::Fraction::new(fps, 1))
|
|
.field("pixel-aspect-ratio", gst::Fraction::new(1, 1))
|
|
.field("colorimetry", "2:4:7:1")
|
|
.build();
|
|
src.set_caps(Some(&caps_mjpeg));
|
|
|
|
let queue = gst::ElementFactory::make("queue").build()?;
|
|
let capsfilter = gst::ElementFactory::make("capsfilter")
|
|
.property("caps", &caps_mjpeg)
|
|
.build()?;
|
|
let sink = gst::ElementFactory::make("v4l2sink")
|
|
.property("device", &uvc_dev)
|
|
.property("sync", &false)
|
|
.build()?;
|
|
|
|
pipeline.add_many(&[src.upcast_ref(), &queue, &capsfilter, &sink])?;
|
|
gst::Element::link_many(&[src.upcast_ref(), &queue, &capsfilter, &sink])?;
|
|
} else {
|
|
let caps_h264 = gst::Caps::builder("video/x-h264")
|
|
.field("stream-format", "byte-stream")
|
|
.field("alignment", "au")
|
|
.build();
|
|
let raw_caps = gst::Caps::builder("video/x-raw")
|
|
.field("format", "YUY2")
|
|
.field("width", width)
|
|
.field("height", height)
|
|
.field("framerate", gst::Fraction::new(fps, 1))
|
|
.build();
|
|
src.set_caps(Some(&caps_h264));
|
|
|
|
let h264parse = gst::ElementFactory::make("h264parse").build()?;
|
|
let decoder_name = pick_h264_decoder();
|
|
let decoder = gst::ElementFactory::make(decoder_name)
|
|
.build()
|
|
.with_context(|| format!("building decoder element {decoder_name}"))?;
|
|
let convert = gst::ElementFactory::make("videoconvert").build()?;
|
|
let scale = gst::ElementFactory::make("videoscale").build()?;
|
|
let caps = gst::ElementFactory::make("capsfilter")
|
|
.property("caps", &raw_caps)
|
|
.build()?;
|
|
let sink = gst::ElementFactory::make("v4l2sink")
|
|
.property("device", &uvc_dev)
|
|
.property("sync", &false)
|
|
.build()?;
|
|
|
|
pipeline.add_many(&[
|
|
src.upcast_ref(),
|
|
&h264parse,
|
|
&decoder,
|
|
&convert,
|
|
&scale,
|
|
&caps,
|
|
&sink,
|
|
])?;
|
|
gst::Element::link_many(&[
|
|
src.upcast_ref(),
|
|
&h264parse,
|
|
&decoder,
|
|
&convert,
|
|
&scale,
|
|
&caps,
|
|
&sink,
|
|
])?;
|
|
}
|
|
pipeline.set_state(gst::State::Playing)?;
|
|
|
|
let frame_step_us = (1_000_000u64 / u64::from(cfg.fps.max(1))).max(1);
|
|
Ok(Self {
|
|
appsrc: src,
|
|
pipe: pipeline,
|
|
next_pts_us: AtomicU64::new(0),
|
|
frame_step_us,
|
|
})
|
|
}
|
|
|
|
/// Push one client frame into the UVC pipeline.
|
|
///
|
|
/// Inputs: the next `VideoPacket` from the gRPC camera stream.
|
|
/// Outputs: none; the frame is forwarded to the appsrc when possible.
|
|
/// Why: UVC sinks use a locally monotonic timeline so presentation remains
|
|
/// stable even when WAN packet timestamps arrive out of order.
|
|
pub fn push(&self, pkt: VideoPacket) {
|
|
let mut buf = gst::Buffer::from_slice(pkt.data);
|
|
if let Some(meta) = buf.get_mut() {
|
|
let pts_us = next_local_pts(&self.next_pts_us, self.frame_step_us);
|
|
let ts = gst::ClockTime::from_useconds(pts_us);
|
|
meta.set_pts(Some(ts));
|
|
meta.set_dts(Some(ts));
|
|
meta.set_duration(Some(gst::ClockTime::from_useconds(self.frame_step_us)));
|
|
}
|
|
if let Err(err) = self.appsrc.push_buffer(buf) {
|
|
tracing::warn!(target:"lesavka_server::video", %err, "📸⚠️ appsrc push failed");
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Drop for WebcamSink {
|
|
fn drop(&mut self) {
|
|
let _ = self.pipe.set_state(gst::State::Null);
|
|
}
|
|
}
|
|
|
|
/// Push H.264 or MJPEG frames into the HDMI display pipeline.
|
|
///
|
|
/// Inputs: the negotiated camera configuration.
|
|
/// Outputs: a live `HdmiSink` ready to display frames.
|
|
/// Why: HDMI output uses a different sink selection and conversion chain than
|
|
/// the USB gadget, so it warrants a dedicated implementation.
|
|
pub struct HdmiSink {
|
|
appsrc: gst_app::AppSrc,
|
|
pipe: gst::Pipeline,
|
|
next_pts_us: AtomicU64,
|
|
frame_step_us: u64,
|
|
}
|
|
|
|
impl HdmiSink {
|
|
/// Build a new HDMI sink pipeline.
|
|
///
|
|
/// Inputs: the selected camera configuration, including optional connector
|
|
/// metadata for `kmssink`.
|
|
/// Outputs: a sink ready to receive `VideoPacket`s.
|
|
/// Why: display output must honor connector pinning and decoder selection
|
|
/// while keeping the relay code agnostic of GStreamer details.
|
|
pub fn new(cfg: &CameraConfig) -> anyhow::Result<Self> {
|
|
gst::init()?;
|
|
|
|
let pipeline = gst::Pipeline::new();
|
|
let width = cfg.width as i32;
|
|
let height = cfg.height as i32;
|
|
let fps = cfg.fps.max(1) as i32;
|
|
|
|
let src = gst::ElementFactory::make("appsrc")
|
|
.build()?
|
|
.downcast::<gst_app::AppSrc>()
|
|
.expect("appsrc");
|
|
src.set_is_live(true);
|
|
src.set_format(gst::Format::Time);
|
|
src.set_property("do-timestamp", &false);
|
|
|
|
let raw_caps = gst::Caps::builder("video/x-raw")
|
|
.field("width", width)
|
|
.field("height", height)
|
|
.field("framerate", gst::Fraction::new(fps, 1))
|
|
.build();
|
|
let capsfilter = gst::ElementFactory::make("capsfilter")
|
|
.property("caps", &raw_caps)
|
|
.build()?;
|
|
|
|
let queue = gst::ElementFactory::make("queue")
|
|
.property("max-size-buffers", 4u32)
|
|
.build()?;
|
|
let convert = gst::ElementFactory::make("videoconvert").build()?;
|
|
let rate = gst::ElementFactory::make("videorate").build()?;
|
|
let scale = gst::ElementFactory::make("videoscale").build()?;
|
|
let sink = build_hdmi_sink(cfg)?;
|
|
|
|
match cfg.codec {
|
|
CameraCodec::H264 => {
|
|
let caps_h264 = gst::Caps::builder("video/x-h264")
|
|
.field("stream-format", "byte-stream")
|
|
.field("alignment", "au")
|
|
.build();
|
|
src.set_caps(Some(&caps_h264));
|
|
let h264parse = gst::ElementFactory::make("h264parse").build()?;
|
|
let decoder_name = pick_h264_decoder();
|
|
let decoder = gst::ElementFactory::make(decoder_name)
|
|
.build()
|
|
.with_context(|| format!("building decoder element {decoder_name}"))?;
|
|
|
|
pipeline.add_many(&[
|
|
src.upcast_ref(),
|
|
&queue,
|
|
&h264parse,
|
|
&decoder,
|
|
&rate,
|
|
&convert,
|
|
&scale,
|
|
&capsfilter,
|
|
&sink,
|
|
])?;
|
|
gst::Element::link_many(&[
|
|
src.upcast_ref(),
|
|
&queue,
|
|
&h264parse,
|
|
&decoder,
|
|
&rate,
|
|
&convert,
|
|
&scale,
|
|
&capsfilter,
|
|
&sink,
|
|
])?;
|
|
}
|
|
CameraCodec::Mjpeg => {
|
|
let caps_mjpeg = gst::Caps::builder("image/jpeg")
|
|
.field("parsed", true)
|
|
.field("width", width)
|
|
.field("height", height)
|
|
.field("framerate", gst::Fraction::new(fps, 1))
|
|
.build();
|
|
src.set_caps(Some(&caps_mjpeg));
|
|
let jpegdec = gst::ElementFactory::make("jpegdec").build()?;
|
|
|
|
pipeline.add_many(&[
|
|
src.upcast_ref(),
|
|
&queue,
|
|
&jpegdec,
|
|
&rate,
|
|
&convert,
|
|
&scale,
|
|
&capsfilter,
|
|
&sink,
|
|
])?;
|
|
gst::Element::link_many(&[
|
|
src.upcast_ref(),
|
|
&queue,
|
|
&jpegdec,
|
|
&rate,
|
|
&convert,
|
|
&scale,
|
|
&capsfilter,
|
|
&sink,
|
|
])?;
|
|
}
|
|
}
|
|
|
|
pipeline.set_state(gst::State::Playing)?;
|
|
let frame_step_us = (1_000_000u64 / u64::from(cfg.fps.max(1))).max(1);
|
|
Ok(Self {
|
|
appsrc: src,
|
|
pipe: pipeline,
|
|
next_pts_us: AtomicU64::new(0),
|
|
frame_step_us,
|
|
})
|
|
}
|
|
|
|
/// Push one client frame into the HDMI pipeline.
|
|
///
|
|
/// Inputs: the next `VideoPacket` from the gRPC camera stream.
|
|
/// Outputs: none; the frame is forwarded to the appsrc when possible.
|
|
/// Why: display playback uses the same local monotonic PTS policy as UVC to
|
|
/// avoid visible glitches when remote timestamps jitter.
|
|
pub fn push(&self, pkt: VideoPacket) {
|
|
let mut buf = gst::Buffer::from_slice(pkt.data);
|
|
if let Some(meta) = buf.get_mut() {
|
|
let pts_us = next_local_pts(&self.next_pts_us, self.frame_step_us);
|
|
let ts = gst::ClockTime::from_useconds(pts_us);
|
|
meta.set_pts(Some(ts));
|
|
meta.set_dts(Some(ts));
|
|
meta.set_duration(Some(gst::ClockTime::from_useconds(self.frame_step_us)));
|
|
}
|
|
if let Err(err) = self.appsrc.push_buffer(buf) {
|
|
tracing::warn!(target:"lesavka_server::video", %err, "📺⚠️ HDMI appsrc push failed");
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Drop for HdmiSink {
|
|
fn drop(&mut self) {
|
|
let _ = self.pipe.set_state(gst::State::Null);
|
|
}
|
|
}
|
|
|
|
fn build_hdmi_sink(cfg: &CameraConfig) -> anyhow::Result<gst::Element> {
|
|
if let Ok(name) = std::env::var("LESAVKA_HDMI_SINK") {
|
|
return gst::ElementFactory::make(&name)
|
|
.build()
|
|
.context("building HDMI sink");
|
|
}
|
|
|
|
if gst::ElementFactory::find("kmssink").is_some() {
|
|
let sink = gst::ElementFactory::make("kmssink").build()?;
|
|
if sink.has_property("driver-name", None) {
|
|
let driver = std::env::var("LESAVKA_HDMI_DRIVER").unwrap_or_else(|_| "vc4".to_string());
|
|
sink.set_property("driver-name", &driver);
|
|
}
|
|
if let Some(connector) = cfg.hdmi.as_ref().and_then(|hdmi| hdmi.id) {
|
|
if sink.has_property("connector-id", None) {
|
|
sink.set_property("connector-id", &(connector as i32));
|
|
} else {
|
|
tracing::warn!(
|
|
target: "lesavka_server::video",
|
|
%connector,
|
|
"kmssink does not expose connector-id property; using default connector"
|
|
);
|
|
}
|
|
}
|
|
if sink.has_property("force-modesetting", None) {
|
|
sink.set_property("force-modesetting", &true);
|
|
}
|
|
sink.set_property("sync", &false);
|
|
return Ok(sink);
|
|
}
|
|
|
|
let sink = gst::ElementFactory::make("autovideosink")
|
|
.build()
|
|
.context("building HDMI sink")?;
|
|
let _ = sink.set_property("sync", &false);
|
|
Ok(sink)
|
|
}
|
|
|
|
enum CameraSink {
|
|
Uvc(WebcamSink),
|
|
Hdmi(HdmiSink),
|
|
}
|
|
|
|
impl CameraSink {
|
|
fn push(&self, pkt: VideoPacket) {
|
|
match self {
|
|
CameraSink::Uvc(sink) => sink.push(pkt),
|
|
CameraSink::Hdmi(sink) => sink.push(pkt),
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Forward camera packets from gRPC into either a UVC or HDMI sink.
|
|
///
|
|
/// Inputs: packets received from the client camera stream.
|
|
/// Outputs: none; packets are forwarded to the configured sink.
|
|
/// Why: camera sessions share the same logging and dev-mode dump behavior even
|
|
/// though their physical sinks differ.
|
|
pub struct CameraRelay {
|
|
sink: CameraSink,
|
|
id: u32,
|
|
frames: AtomicU64,
|
|
}
|
|
|
|
impl CameraRelay {
|
|
/// Build a relay that targets the USB UVC gadget.
|
|
///
|
|
/// Inputs: the logical camera id, UVC device node, and camera config.
|
|
/// Outputs: a relay that writes frames into the gadget pipeline.
|
|
/// Why: keeping constructors explicit avoids accidental sink mismatches.
|
|
pub fn new_uvc(id: u32, uvc_dev: &str, cfg: &CameraConfig) -> anyhow::Result<Self> {
|
|
Ok(Self {
|
|
sink: CameraSink::Uvc(WebcamSink::new(uvc_dev, cfg)?),
|
|
id,
|
|
frames: AtomicU64::new(0),
|
|
})
|
|
}
|
|
|
|
/// Build a relay that targets the HDMI output pipeline.
|
|
///
|
|
/// Inputs: the logical camera id plus the camera config.
|
|
/// Outputs: a relay that writes frames into the display pipeline.
|
|
/// Why: the camera runtime reuses this constructor when the negotiated
|
|
/// output mode selects HDMI instead of UVC.
|
|
pub fn new_hdmi(id: u32, cfg: &CameraConfig) -> anyhow::Result<Self> {
|
|
Ok(Self {
|
|
sink: CameraSink::Hdmi(HdmiSink::new(cfg)?),
|
|
id,
|
|
frames: AtomicU64::new(0),
|
|
})
|
|
}
|
|
|
|
/// Push one `VideoPacket` coming from the client.
|
|
///
|
|
/// Inputs: the next packet from the camera stream.
|
|
/// Outputs: none; the packet is logged and forwarded to the sink.
|
|
/// Why: centralizing frame logging and dev-mode dump behavior keeps the
|
|
/// transport session logic separate from media sink mechanics.
|
|
pub fn feed(&self, pkt: VideoPacket) {
|
|
let frame = self
|
|
.frames
|
|
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
|
if frame < 10 || frame % 60 == 0 {
|
|
tracing::debug!(
|
|
target:"lesavka_server::video",
|
|
cam_id = self.id,
|
|
frame,
|
|
bytes = pkt.data.len(),
|
|
pts = pkt.pts,
|
|
"📸 srv webcam frame"
|
|
);
|
|
} else if frame % 10 == 0 {
|
|
tracing::trace!(
|
|
target:"lesavka_server::video",
|
|
cam_id = self.id,
|
|
bytes = pkt.data.len(),
|
|
"📸📥 srv pkt"
|
|
);
|
|
}
|
|
|
|
if dev_mode_enabled()
|
|
&& (cfg!(debug_assertions) || tracing::enabled!(tracing::Level::TRACE))
|
|
&& contains_idr(&pkt.data)
|
|
{
|
|
let path = format!("/tmp/eye3-cli-{frame:05}.h264");
|
|
if let Err(error) = std::fs::write(&path, &pkt.data) {
|
|
warn!("📸💾 dump failed: {error}");
|
|
} else {
|
|
tracing::debug!("📸💾 wrote {}", path);
|
|
}
|
|
}
|
|
|
|
self.sink.push(pkt);
|
|
}
|
|
}
|