// NOTE(review): removed GitHub web-UI scrape residue that was pasted above the
// source (file name, line/size counters, "Raw Blame History", and the
// ambiguous-Unicode warning). It was not Rust and broke compilation.
// server/src/video.rs
use anyhow::Context;
use futures_util::Stream;
use gst::MessageView;
use gst::MessageView::*;
use gst::prelude::*;
use gstreamer as gst;
use gstreamer_app as gst_app;
use lesavka_common::lesavka::VideoPacket;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;
use tracing::{Level, debug, enabled, error, info, trace, warn};
use crate::camera::{CameraCodec, CameraConfig};
// Eye-name suffixes: index 0 = left ("l"), 1 = right ("r").
const EYE_ID: [&str; 2] = ["l", "r"];
// First PTS ever observed by any eye pipeline; shared zero point for all streams.
static START: std::sync::OnceLock<gst::ClockTime> = std::sync::OnceLock::new();
// Cached "is LESAVKA_DEV_MODE set" flag; see dev_mode_enabled().
static DEV_MODE: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
/// Read `name` from the environment as a `u32`, falling back to `default`
/// when the variable is unset or does not parse.
fn env_u32(name: &str, default: u32) -> u32 {
    match std::env::var(name) {
        Ok(raw) => raw.parse::<u32>().unwrap_or(default),
        Err(_) => default,
    }
}
/// True when the `LESAVKA_DEV_MODE` environment variable is set to any value.
/// The check runs once; the result is cached for the process lifetime.
fn dev_mode_enabled() -> bool {
    let cached = DEV_MODE.get_or_init(|| matches!(std::env::var("LESAVKA_DEV_MODE"), Ok(_)));
    *cached
}
fn pick_h264_decoder() -> &'static str {
if gst::ElementFactory::find("v4l2h264dec").is_some() {
"v4l2h264dec"
} else if gst::ElementFactory::find("v4l2slh264dec").is_some() {
"v4l2slh264dec"
} else if gst::ElementFactory::find("omxh264dec").is_some() {
"omxh264dec"
} else {
"avdec_h264"
}
}
/// Naive Annex-B scan deciding whether `h264` contains an IDR slice
/// (NAL unit type 5).
///
/// Recognizes both 3-byte (`00 00 01`) and 4-byte (`00 00 00 01`) start codes
/// and inspects the NAL header byte that follows each one. Returns `false`
/// for empty or truncated input.
fn contains_idr(h264: &[u8]) -> bool {
    let len = h264.len();
    let mut i = 0;
    // Fix: the previous bound (`i + 4 < len`) demanded 5 bytes and therefore
    // missed a 3-byte start code + NAL header sitting at the very end of the
    // buffer. 4 bytes from `i` are enough for that case.
    while i + 4 <= len {
        if h264[i] == 0 && h264[i + 1] == 0 {
            let offset = if h264[i + 2] == 1 {
                3
            } else if h264[i + 2] == 0 && h264[i + 3] == 1 {
                // h264[i + 3] is in bounds: the loop guard gives i + 3 < len.
                4
            } else {
                i += 1;
                continue;
            };
            let nal_idx = i + offset;
            if nal_idx < len {
                // Low 5 bits of the NAL header byte = nal_unit_type; 5 = IDR.
                if h264[nal_idx] & 0x1F == 5 {
                    return true;
                }
            }
        }
        i += 1;
    }
    false
}
/// A gRPC-ready stream of encoded [`VideoPacket`]s backed by a running
/// GStreamer pipeline.
pub struct VideoStream {
    // Held only to keep the pipeline alive; shut down in Drop.
    _pipeline: gst::Pipeline,
    // Receives packets produced by the appsink callback in eye_ball().
    inner: ReceiverStream<Result<VideoPacket, Status>>,
}
impl Stream for VideoStream {
    type Item = Result<VideoPacket, Status>;

    /// Delegate polling straight to the wrapped receiver stream.
    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // VideoStream is Unpin, so we can safely project to the inner stream.
        let this = self.get_mut();
        std::pin::Pin::new(&mut this.inner).poll_next(cx)
    }
}
impl Drop for VideoStream {
    /// Transition the pipeline to NULL on drop so GStreamer tears it down
    /// cleanly instead of logging "dispose element … READY/PLAYING" spam.
    fn drop(&mut self) {
        self._pipeline.set_state(gst::State::Null).ok();
    }
}
pub async fn eye_ball(dev: &str, id: u32, _max_bitrate_kbit: u32) -> anyhow::Result<VideoStream> {
let eye = EYE_ID[id as usize];
gst::init().context("gst init")?;
let target_fps = env_u32("LESAVKA_EYE_FPS", 25);
let frame_interval_us = if target_fps == 0 {
0
} else {
(1_000_000 / target_fps) as u64
};
let last_sent = Arc::new(AtomicU64::new(0));
let desc = format!(
"v4l2src name=cam_{eye} device=\"{dev}\" io-mode=mmap do-timestamp=true ! \
queue ! \
h264parse disable-passthrough=true config-interval=-1 ! \
video/x-h264,stream-format=byte-stream,alignment=au ! \
appsink name=sink emit-signals=true max-buffers=32 drop=true"
);
// let desc = format!(
// "v4l2src device={dev} io-mode=mmap ! \
// queue max-size-buffers=0 max-size-bytes=0 max-size-time=0 ! tsdemux name=d ! \
// video/x-h264,stream-format=byte-stream,alignment=au,profile=high ! tsdemux name=d ! \
// d. ! h264parse config-interval=1 ! queue ! appsink name=vsink emit-signals=true \
// d. ! aacparse ! queue ! h264parse config-interval=1 ! appsink name=sink \
// emit-signals=true drop=false sync=false"
// );
let pipeline = gst::parse::launch(&desc)?
.downcast::<gst::Pipeline>()
.expect("not a pipeline");
// let pipeline: gst::Pipeline = gst::parse_launch(&desc)?
// .downcast()
// .expect("not a pipeline");
let sink = pipeline
.by_name("sink")
.expect("appsink")
.dynamic_cast::<gst_app::AppSink>()
.expect("appsink down-cast");
let (tx, rx) = tokio::sync::mpsc::channel(8192);
/* ----- BUS WATCH: show errors & warnings immediately --------------- */
let bus = pipeline.bus().expect("bus");
if let Some(src_pad) = pipeline
.by_name(&format!("cam_{eye}"))
.and_then(|e| e.static_pad("src"))
{
src_pad.add_probe(gst::PadProbeType::EVENT_DOWNSTREAM, |pad, info| {
if let Some(gst::PadProbeData::Event(ref ev)) = info.data {
if let gst::EventView::Caps(c) = ev.view() {
trace!(target:"lesavka_server::video",
?c, "🔍 new caps on {}", pad.name());
}
}
gst::PadProbeReturn::Ok
});
} else {
warn!(target:"lesavka_server::video",
eye = %eye,
"🍪 cam_{eye} not found - skipping pad-probe");
}
let eye_clone = eye.to_owned();
std::thread::spawn(move || {
for msg in bus.iter_timed(gst::ClockTime::NONE) {
match msg.view() {
Error(err) => {
error!(target:"lesavka_server::video",
eye = %eye_clone,
"💥 pipeline error: {} ({})",
err.error(), err.debug().unwrap_or_default());
}
Warning(w) => {
warn!(target:"lesavka_server::video",
eye = %eye_clone,
"⚠️ pipeline warning: {} ({})",
w.error(), w.debug().unwrap_or_default());
}
Info(i) => {
info!(target:"lesavka_server::video",
eye = %eye_clone,
"📌 pipeline info: {} ({})",
i.error(), i.debug().unwrap_or_default());
}
StateChanged(s) if s.current() == gst::State::Playing => {
debug!(target:"lesavka_server::video",
eye = %eye_clone,
"🎬 pipeline PLAYING");
}
_ => {}
}
}
});
let last_sent_cloned = last_sent.clone();
sink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample(move |sink| {
/* -------- pull frame ---------- */
let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
let buffer = sample.buffer().ok_or(gst::FlowError::Error)?;
/* -------- map once, reuse ----- */
let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?;
/* -------- basic counters ------ */
static FRAME: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
let n = FRAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n % 120 == 0 && contains_idr(map.as_slice()) {
trace!(target: "lesavka_server::video", "eye-{eye}: delivered {n} frames");
if enabled!(Level::TRACE) {
let path = format!("/tmp/eye-{eye}-srv-{:05}.h264", n);
std::fs::write(&path, map.as_slice()).ok();
}
} else if n < 10 {
debug!(target: "lesavka_server::video",
eye = eye, frame = n, bytes = map.len(),
pts = ?buffer.pts(), "⬆️ pushed video sample eye-{eye}");
}
/* -------- detect SPS / IDR ---- */
if enabled!(Level::DEBUG) {
if let Some(&nal) = map.as_slice().get(4) {
if (nal & 0x1F) == 0x05
/* IDR */
{
debug!("eye-{eye}: IDR");
}
}
}
/* -------- timestamps ---------- */
let origin = *START.get_or_init(|| buffer.pts().unwrap_or(gst::ClockTime::ZERO));
let pts_us = buffer
.pts()
.unwrap_or(gst::ClockTime::ZERO)
.saturating_sub(origin)
.nseconds()
/ 1_000;
if frame_interval_us > 0 {
let last = last_sent_cloned.load(Ordering::Relaxed);
if last != 0 && pts_us.saturating_sub(last) < frame_interval_us {
return Ok(gst::FlowSuccess::Ok);
}
last_sent_cloned.store(pts_us, Ordering::Relaxed);
}
/* -------- ship over gRPC ----- */
let data = map.as_slice().to_vec();
let size = data.len();
let pkt = VideoPacket {
id,
pts: pts_us,
data,
};
match tx.try_send(Ok(pkt)) {
Ok(_) => {
trace!(target:"lesavka_server::video",
eye = %eye,
size = size,
"🎥📤 sent");
}
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
static DROP_CNT: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
let c = DROP_CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if c % 120 == 0 {
debug!(target:"lesavka_server::video",
eye = %eye,
dropped = c,
"🎥⏳ channel full - dropping frames");
}
}
Err(e) => error!("mpsc send err: {e}"),
}
Ok(gst::FlowSuccess::Ok)
})
.build(),
);
pipeline
.set_state(gst::State::Playing)
.context("🎥 starting video pipeline eye-{eye}")?;
let bus = pipeline.bus().unwrap();
loop {
match bus.timed_pop(gst::ClockTime::NONE) {
Some(msg)
if matches!(msg.view(), MessageView::StateChanged(s)
if s.current() == gst::State::Playing) =>
{
break;
}
Some(_) => continue,
None => continue,
}
}
Ok(VideoStream {
_pipeline: pipeline,
inner: ReceiverStream::new(rx),
})
}
/// Feeds received video packets into a V4L2 output device (e.g. a UVC
/// gadget), acting as a virtual webcam.
pub struct WebcamSink {
    // Entry point for frames pushed via WebcamSink::push.
    appsrc: gst_app::AppSrc,
    // Held only to keep the pipeline alive for the sink's lifetime.
    _pipe: gst::Pipeline,
}
impl WebcamSink {
    /// Build and start a pipeline that forwards packets pushed via
    /// [`Self::push`] to the V4L2 device `uvc_dev`.
    ///
    /// MJPEG packets are passed through untouched; H.264 packets are decoded,
    /// converted, and scaled to raw YUY2 at the configured resolution.
    ///
    /// # Errors
    /// Fails if GStreamer init, element creation, linking, or the state
    /// change to PLAYING fails.
    pub fn new(uvc_dev: &str, cfg: &CameraConfig) -> anyhow::Result<Self> {
        gst::init()?;
        let pipeline = gst::Pipeline::new();
        let width = cfg.width as i32;
        let height = cfg.height as i32;
        // Clamp to at least 1 fps so the caps framerate stays valid.
        let fps = cfg.fps.max(1) as i32;
        let use_mjpeg = matches!(cfg.codec, CameraCodec::Mjpeg);
        let src = gst::ElementFactory::make("appsrc")
            .build()?
            .downcast::<gst_app::AppSrc>()
            .expect("appsrc");
        // Live source carrying time-stamped buffers (PTS set in push()).
        src.set_is_live(true);
        src.set_format(gst::Format::Time);
        // LESAVKA_UVC_APP_BLOCK set to anything but "0" makes push_buffer
        // block when the appsrc queue is full; default is drop-on-full.
        let block = std::env::var("LESAVKA_UVC_APP_BLOCK")
            .ok()
            .map(|v| v != "0")
            .unwrap_or(false);
        src.set_property("block", &block);
        if use_mjpeg {
            // appsrc(JPEG) → queue → capsfilter → v4l2sink (no re-encode).
            let caps_mjpeg = gst::Caps::builder("image/jpeg")
                .field("parsed", true)
                .field("width", width)
                .field("height", height)
                .field("framerate", gst::Fraction::new(fps, 1))
                .field("pixel-aspect-ratio", gst::Fraction::new(1, 1))
                // NOTE(review): hard-coded colorimetry — presumably what the
                // UVC gadget negotiates; confirm against the device.
                .field("colorimetry", "2:4:7:1")
                .build();
            src.set_caps(Some(&caps_mjpeg));
            let queue = gst::ElementFactory::make("queue").build()?;
            let capsfilter = gst::ElementFactory::make("capsfilter")
                .property("caps", &caps_mjpeg)
                .build()?;
            // sync=false: write frames as they arrive, no clock waiting.
            let sink = gst::ElementFactory::make("v4l2sink")
                .property("device", &uvc_dev)
                .property("sync", &false)
                .build()?;
            pipeline.add_many(&[src.upcast_ref(), &queue, &capsfilter, &sink])?;
            gst::Element::link_many(&[src.upcast_ref(), &queue, &capsfilter, &sink])?;
        } else {
            // appsrc(H.264) → h264parse → decoder → convert → scale → v4l2sink.
            let caps_h264 = gst::Caps::builder("video/x-h264")
                .field("stream-format", "byte-stream")
                .field("alignment", "au")
                .build();
            let raw_caps = gst::Caps::builder("video/x-raw")
                .field("format", "YUY2")
                .field("width", width)
                .field("height", height)
                .field("framerate", gst::Fraction::new(fps, 1))
                .build();
            src.set_caps(Some(&caps_h264));
            let h264parse = gst::ElementFactory::make("h264parse").build()?;
            // Prefer a hardware decoder when available (see pick_h264_decoder).
            let decoder_name = pick_h264_decoder();
            let decoder = gst::ElementFactory::make(decoder_name)
                .build()
                .with_context(|| format!("building decoder element {decoder_name}"))?;
            let convert = gst::ElementFactory::make("videoconvert").build()?;
            let scale = gst::ElementFactory::make("videoscale").build()?;
            let caps = gst::ElementFactory::make("capsfilter")
                .property("caps", &raw_caps)
                .build()?;
            let sink = gst::ElementFactory::make("v4l2sink")
                .property("device", &uvc_dev)
                .property("sync", &false)
                .build()?;
            pipeline.add_many(&[
                src.upcast_ref(),
                &h264parse,
                &decoder,
                &convert,
                &scale,
                &caps,
                &sink,
            ])?;
            gst::Element::link_many(&[
                src.upcast_ref(),
                &h264parse,
                &decoder,
                &convert,
                &scale,
                &caps,
                &sink,
            ])?;
        }
        pipeline.set_state(gst::State::Playing)?;
        Ok(Self {
            appsrc: src,
            _pipe: pipeline,
        })
    }

    /// Feed one encoded packet into the pipeline, stamping its PTS (µs).
    /// Push failures (e.g. pipeline flushing) are logged, not fatal.
    pub fn push(&self, pkt: VideoPacket) {
        let mut buf = gst::Buffer::from_slice(pkt.data);
        // Freshly created buffer is uniquely owned, so get_mut() cannot fail.
        buf.get_mut()
            .unwrap()
            .set_pts(Some(gst::ClockTime::from_useconds(pkt.pts)));
        if let Err(err) = self.appsrc.push_buffer(buf) {
            tracing::warn!(target:"lesavka_server::video", %err, "📸⚠️ appsrc push failed");
        }
    }
}
/// Renders received video packets on a local HDMI output
/// (sink element chosen by `build_hdmi_sink`).
pub struct HdmiSink {
    // Entry point for frames pushed via HdmiSink::push.
    appsrc: gst_app::AppSrc,
    // Held only to keep the pipeline alive for the sink's lifetime.
    _pipe: gst::Pipeline,
}
impl HdmiSink {
    /// Build and start a pipeline rendering pushed packets on an HDMI output.
    ///
    /// H.264 packets are parsed and decoded; MJPEG packets are JPEG-decoded.
    /// Both paths then convert/scale to the configured raw resolution before
    /// hitting the display sink.
    ///
    /// # Errors
    /// Fails if GStreamer init, element creation, linking, or the state
    /// change to PLAYING fails.
    pub fn new(cfg: &CameraConfig) -> anyhow::Result<Self> {
        gst::init()?;
        let pipeline = gst::Pipeline::new();
        let width = cfg.width as i32;
        let height = cfg.height as i32;
        // Clamp to at least 1 fps so the caps framerate stays valid.
        let fps = cfg.fps.max(1) as i32;
        let src = gst::ElementFactory::make("appsrc")
            .build()?
            .downcast::<gst_app::AppSrc>()
            .expect("appsrc");
        // Live source carrying time-stamped buffers (PTS set in push()).
        src.set_is_live(true);
        src.set_format(gst::Format::Time);
        // Target raw format after decode: requested size and frame rate.
        let raw_caps = gst::Caps::builder("video/x-raw")
            .field("width", width)
            .field("height", height)
            .field("framerate", gst::Fraction::new(fps, 1))
            .build();
        let capsfilter = gst::ElementFactory::make("capsfilter")
            .property("caps", &raw_caps)
            .build()?;
        // Small queue bounds latency between appsrc and the decoder.
        let queue = gst::ElementFactory::make("queue")
            .property("max-size-buffers", 4u32)
            .build()?;
        let convert = gst::ElementFactory::make("videoconvert").build()?;
        let scale = gst::ElementFactory::make("videoscale").build()?;
        let sink = build_hdmi_sink(cfg)?;
        match cfg.codec {
            CameraCodec::H264 => {
                // appsrc(H.264) → queue → parse → decode → convert/scale → sink.
                let caps_h264 = gst::Caps::builder("video/x-h264")
                    .field("stream-format", "byte-stream")
                    .field("alignment", "au")
                    .build();
                src.set_caps(Some(&caps_h264));
                let h264parse = gst::ElementFactory::make("h264parse").build()?;
                // Prefer a hardware decoder when available (see pick_h264_decoder).
                let decoder_name = pick_h264_decoder();
                let decoder = gst::ElementFactory::make(decoder_name)
                    .build()
                    .with_context(|| format!("building decoder element {decoder_name}"))?;
                pipeline.add_many(&[
                    src.upcast_ref(),
                    &queue,
                    &h264parse,
                    &decoder,
                    &convert,
                    &scale,
                    &capsfilter,
                    &sink,
                ])?;
                gst::Element::link_many(&[
                    src.upcast_ref(),
                    &queue,
                    &h264parse,
                    &decoder,
                    &convert,
                    &scale,
                    &capsfilter,
                    &sink,
                ])?;
            }
            CameraCodec::Mjpeg => {
                // appsrc(JPEG) → queue → jpegdec → convert/scale → sink.
                let caps_mjpeg = gst::Caps::builder("image/jpeg")
                    .field("parsed", true)
                    .field("width", width)
                    .field("height", height)
                    .field("framerate", gst::Fraction::new(fps, 1))
                    .build();
                src.set_caps(Some(&caps_mjpeg));
                let jpegdec = gst::ElementFactory::make("jpegdec").build()?;
                pipeline.add_many(&[
                    src.upcast_ref(),
                    &queue,
                    &jpegdec,
                    &convert,
                    &scale,
                    &capsfilter,
                    &sink,
                ])?;
                gst::Element::link_many(&[
                    src.upcast_ref(),
                    &queue,
                    &jpegdec,
                    &convert,
                    &scale,
                    &capsfilter,
                    &sink,
                ])?;
            }
        }
        pipeline.set_state(gst::State::Playing)?;
        Ok(Self {
            appsrc: src,
            _pipe: pipeline,
        })
    }

    /// Feed one encoded packet into the pipeline, stamping its PTS (µs).
    /// Push failures (e.g. pipeline flushing) are logged, not fatal.
    pub fn push(&self, pkt: VideoPacket) {
        let mut buf = gst::Buffer::from_slice(pkt.data);
        // Freshly created buffer is uniquely owned, so get_mut() cannot fail.
        buf.get_mut()
            .unwrap()
            .set_pts(Some(gst::ClockTime::from_useconds(pkt.pts)));
        if let Err(err) = self.appsrc.push_buffer(buf) {
            tracing::warn!(target:"lesavka_server::video", %err, "📺⚠️ HDMI appsrc push failed");
        }
    }
}
/// Choose the display element for HDMI output.
///
/// Priority: the element named in `LESAVKA_HDMI_SINK` (if set), then
/// `kmssink` (optionally pinned to the DRM connector id from `cfg.hdmi`),
/// then `autovideosink` as a last resort.
fn build_hdmi_sink(cfg: &CameraConfig) -> anyhow::Result<gst::Element> {
    // Explicit override via environment variable.
    if let Ok(name) = std::env::var("LESAVKA_HDMI_SINK") {
        return gst::ElementFactory::make(&name)
            .build()
            .context("building HDMI sink");
    }
    if gst::ElementFactory::find("kmssink").is_some() {
        let sink = gst::ElementFactory::make("kmssink").build()?;
        // Pin to a specific DRM connector when configured and supported by
        // this kmssink build.
        if let Some(connector) = cfg.hdmi.as_ref().and_then(|h| h.id) {
            if sink.has_property("connector-id", None) {
                sink.set_property("connector-id", &(connector as i32));
            } else {
                tracing::warn!(
                    target: "lesavka_server::video",
                    %connector,
                    "kmssink does not expose connector-id property; using default connector"
                );
            }
        }
        // Render frames as they arrive; do not sync to the pipeline clock.
        sink.set_property("sync", &false);
        return Ok(sink);
    }
    // Fallback: let GStreamer auto-pick whatever video sink is available.
    let sink = gst::ElementFactory::make("autovideosink")
        .build()
        .context("building HDMI sink")?;
    let _ = sink.set_property("sync", &false);
    Ok(sink)
}
/*─────────────────────────────────*/
/* gRPC → CameraSink relay */
/*─────────────────────────────────*/
/// The concrete output a [`CameraRelay`] forwards packets to.
enum CameraSink {
    // Virtual webcam via a V4L2/UVC gadget device.
    Uvc(WebcamSink),
    // Local HDMI display output.
    Hdmi(HdmiSink),
}
impl CameraSink {
    /// Forward one packet to whichever concrete sink this enum wraps.
    fn push(&self, pkt: VideoPacket) {
        match *self {
            CameraSink::Uvc(ref uvc) => uvc.push(pkt),
            CameraSink::Hdmi(ref hdmi) => hdmi.push(pkt),
        }
    }
}
/// Relays VideoPackets arriving over gRPC into a local camera sink
/// (UVC gadget or HDMI output).
pub struct CameraRelay {
    // Destination for every packet fed into this relay.
    sink: CameraSink,
    id: u32, // gRPC “id” (for future multicam)
    // Count of packets fed so far; drives the logging cadence in feed().
    frames: std::sync::atomic::AtomicU64,
}
impl CameraRelay {
    /// Create a relay that writes packets into the UVC gadget device
    /// at `uvc_dev`.
    pub fn new_uvc(id: u32, uvc_dev: &str, cfg: &CameraConfig) -> anyhow::Result<Self> {
        Ok(Self {
            sink: CameraSink::Uvc(WebcamSink::new(uvc_dev, cfg)?),
            id,
            frames: std::sync::atomic::AtomicU64::new(0),
        })
    }

    /// Create a relay that renders packets on the local HDMI output.
    pub fn new_hdmi(id: u32, cfg: &CameraConfig) -> anyhow::Result<Self> {
        Ok(Self {
            sink: CameraSink::Hdmi(HdmiSink::new(cfg)?),
            id,
            frames: std::sync::atomic::AtomicU64::new(0),
        })
    }

    /// Push one VideoPacket coming from the client
    pub fn feed(&self, pkt: VideoPacket) {
        // Per-relay frame counter; drives the logging cadence below.
        let n = self
            .frames
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        // First 10 frames and every 60th at DEBUG; every 10th of the rest at TRACE.
        if n < 10 || n % 60 == 0 {
            tracing::debug!(target:"lesavka_server::video",
                cam_id = self.id,
                frame = n,
                bytes = pkt.data.len(),
                pts = pkt.pts,
                "📸 srv webcam frame");
        } else if n % 10 == 0 {
            tracing::trace!(target:"lesavka_server::video",
                cam_id = self.id,
                bytes = pkt.data.len(),
                "📸📥 srv pkt");
        }
        // Dev-mode diagnostics: dump IDR-bearing packets to /tmp, but only in
        // debug builds or when TRACE logging is enabled.
        if dev_mode_enabled()
            && (cfg!(debug_assertions) || tracing::enabled!(tracing::Level::TRACE))
            && contains_idr(&pkt.data)
        {
            let path = format!("/tmp/eye3-cli-{n:05}.h264");
            if let Err(e) = std::fs::write(&path, &pkt.data) {
                tracing::warn!("📸💾 dump failed: {e}");
            } else {
                tracing::debug!("📸💾 wrote {}", path);
            }
        }
        self.sink.push(pkt);
    }
}