// server/src/video.rs use anyhow::Context; use futures_util::Stream; use gst::prelude::*; use gst::MessageView::*; use gst::MessageView; use gstreamer as gst; use gstreamer_app as gst_app; use lesavka_common::lesavka::VideoPacket; use tokio_stream::wrappers::ReceiverStream; use tonic::Status; use tracing::{Level, debug, enabled, error, info, trace, warn}; const EYE_ID: [&str; 2] = ["l", "r"]; static START: std::sync::OnceLock = std::sync::OnceLock::new(); static DEV_MODE: std::sync::OnceLock = std::sync::OnceLock::new(); fn env_u32(name: &str, default: u32) -> u32 { std::env::var(name) .ok() .and_then(|v| v.parse::().ok()) .unwrap_or(default) } fn dev_mode_enabled() -> bool { *DEV_MODE .get_or_init(|| std::env::var("LESAVKA_DEV_MODE").is_ok()) } fn contains_idr(h264: &[u8]) -> bool { // naive Annex‑B scan for H.264 IDR (NAL type 5) let mut i = 0; while i + 4 < h264.len() { // find start code 0x000001 or 0x00000001 if h264[i] == 0 && h264[i + 1] == 0 { let offset = if h264[i + 2] == 1 { 3 } else if h264[i + 2] == 0 && h264[i + 3] == 1 { 4 } else { i += 1; continue; }; let nal_idx = i + offset; if nal_idx < h264.len() { let nal = h264[nal_idx] & 0x1F; if nal == 5 { return true; } } } i += 1; } false } pub struct VideoStream { _pipeline: gst::Pipeline, inner: ReceiverStream>, } impl Stream for VideoStream { type Item = Result; fn poll_next( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { Stream::poll_next(std::pin::Pin::new(&mut self.inner), cx) } } impl Drop for VideoStream { fn drop(&mut self) { // shut down nicely - avoids the “dispose element … READY/PLAYING …” spam let _ = self._pipeline.set_state(gst::State::Null); } } pub async fn eye_ball(dev: &str, id: u32, _max_bitrate_kbit: u32) -> anyhow::Result { let eye = EYE_ID[id as usize]; gst::init().context("gst init")?; let desc = format!( "v4l2src name=cam_{eye} device=\"{dev}\" io-mode=mmap do-timestamp=true ! \ queue ! \ h264parse disable-passthrough=true config-interval=-1 ! \ video/x-h264,stream-format=byte-stream,alignment=au ! \ appsink name=sink emit-signals=true max-buffers=32 drop=true" ); // let desc = format!( // "v4l2src device={dev} io-mode=mmap ! \ // queue max-size-buffers=0 max-size-bytes=0 max-size-time=0 ! tsdemux name=d ! \ // video/x-h264,stream-format=byte-stream,alignment=au,profile=high ! tsdemux name=d ! \ // d. ! h264parse config-interval=1 ! queue ! appsink name=vsink emit-signals=true \ // d. ! aacparse ! queue ! h264parse config-interval=1 ! appsink name=sink \ // emit-signals=true drop=false sync=false" // ); let pipeline = gst::parse::launch(&desc)? .downcast::() .expect("not a pipeline"); // let pipeline: gst::Pipeline = gst::parse_launch(&desc)? // .downcast() // .expect("not a pipeline"); let sink = pipeline .by_name("sink") .expect("appsink") .dynamic_cast::() .expect("appsink down-cast"); let (tx, rx) = tokio::sync::mpsc::channel(8192); /* ----- BUS WATCH: show errors & warnings immediately --------------- */ let bus = pipeline.bus().expect("bus"); if let Some(src_pad) = pipeline .by_name(&format!("cam_{eye}")) .and_then(|e| e.static_pad("src")) { src_pad.add_probe(gst::PadProbeType::EVENT_DOWNSTREAM, |pad, info| { if let Some(gst::PadProbeData::Event(ref ev)) = info.data { if let gst::EventView::Caps(c) = ev.view() { trace!(target:"lesavka_server::video", ?c, "🔍 new caps on {}", pad.name()); } } gst::PadProbeReturn::Ok }); } else { warn!(target:"lesavka_server::video", eye = %eye, "🍪 cam_{eye} not found - skipping pad-probe"); } let eye_clone = eye.to_owned(); std::thread::spawn(move || { for msg in bus.iter_timed(gst::ClockTime::NONE) { match msg.view() { Error(err) => { error!(target:"lesavka_server::video", eye = %eye_clone, "💥 pipeline error: {} ({})", err.error(), err.debug().unwrap_or_default()); } Warning(w) => { warn!(target:"lesavka_server::video", eye = %eye_clone, "⚠️ pipeline warning: {} ({})", w.error(), w.debug().unwrap_or_default()); } Info(i) => { info!(target:"lesavka_server::video", eye = %eye_clone, "📌 pipeline info: {} ({})", i.error(), i.debug().unwrap_or_default()); } StateChanged(s) if s.current() == gst::State::Playing => { debug!(target:"lesavka_server::video", eye = %eye_clone, "🎬 pipeline PLAYING"); } _ => {} } } }); sink.set_callbacks( gst_app::AppSinkCallbacks::builder() .new_sample(move |sink| { /* -------- pull frame ---------- */ let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?; let buffer = sample.buffer().ok_or(gst::FlowError::Error)?; /* -------- map once, reuse ----- */ let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?; /* -------- basic counters ------ */ static FRAME: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); let n = FRAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if n % 120 == 0 && contains_idr(map.as_slice()) { trace!(target: "lesavka_server::video", "eye-{eye}: delivered {n} frames"); if enabled!(Level::TRACE) { let path = format!("/tmp/eye-{eye}-srv-{:05}.h264", n); std::fs::write(&path, map.as_slice()).ok(); } } else if n < 10 { debug!(target: "lesavka_server::video", eye = eye, frame = n, bytes = map.len(), pts = ?buffer.pts(), "⬆️ pushed video sample eye-{eye}"); } /* -------- detect SPS / IDR ---- */ if enabled!(Level::DEBUG) { if let Some(&nal) = map.as_slice().get(4) { if (nal & 0x1F) == 0x05 /* IDR */ { debug!("eye-{eye}: IDR"); } } } /* -------- timestamps ---------- */ let origin = *START.get_or_init(|| buffer.pts().unwrap_or(gst::ClockTime::ZERO)); let pts_us = buffer .pts() .unwrap_or(gst::ClockTime::ZERO) .saturating_sub(origin) .nseconds() / 1_000; /* -------- ship over gRPC ----- */ let data = map.as_slice().to_vec(); let size = data.len(); let pkt = VideoPacket { id, pts: pts_us, data, }; match tx.try_send(Ok(pkt)) { Ok(_) => { trace!(target:"lesavka_server::video", eye = %eye, size = size, "🎥📤 sent"); } Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { static DROP_CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); let c = DROP_CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if c % 120 == 0 { debug!(target:"lesavka_server::video", eye = %eye, dropped = c, "🎥⏳ channel full - dropping frames"); } } Err(e) => error!("mpsc send err: {e}"), } Ok(gst::FlowSuccess::Ok) }) .build(), ); pipeline .set_state(gst::State::Playing) .context("🎥 starting video pipeline eye-{eye}")?; let bus = pipeline.bus().unwrap(); loop { match bus.timed_pop(gst::ClockTime::NONE) { Some(msg) if matches!(msg.view(), MessageView::StateChanged(s) if s.current() == gst::State::Playing) => { break; } Some(_) => continue, None => continue, } } Ok(VideoStream { _pipeline: pipeline, inner: ReceiverStream::new(rx), }) } pub struct WebcamSink { appsrc: gst_app::AppSrc, _pipe: gst::Pipeline, } impl WebcamSink { pub fn new(uvc_dev: &str) -> anyhow::Result { gst::init()?; let pipeline = gst::Pipeline::new(); let width = env_u32("LESAVKA_UVC_WIDTH", 1280) as i32; let height = env_u32("LESAVKA_UVC_HEIGHT", 720) as i32; let fps = env_u32("LESAVKA_UVC_FPS", 30).max(1) as i32; let caps_h264 = gst::Caps::builder("video/x-h264") .field("stream-format", "byte-stream") .field("alignment", "au") .build(); let raw_caps = gst::Caps::builder("video/x-raw") .field("format", "YUY2") .field("width", width) .field("height", height) .field("framerate", gst::Fraction::new(fps, 1)) .build(); let src = gst::ElementFactory::make("appsrc") .build()? .downcast::() .expect("appsrc"); src.set_is_live(true); src.set_format(gst::Format::Time); src.set_caps(Some(&caps_h264)); src.set_property("block", &true); let h264parse = gst::ElementFactory::make("h264parse").build()?; let decoder_name = Self::pick_decoder(); let decoder = gst::ElementFactory::make(decoder_name) .build() .with_context(|| format!("building decoder element {decoder_name}"))?; let convert = gst::ElementFactory::make("videoconvert").build()?; let scale = gst::ElementFactory::make("videoscale").build()?; let caps = gst::ElementFactory::make("capsfilter") .property("caps", &raw_caps) .build()?; let sink = gst::ElementFactory::make("v4l2sink") .property("device", &uvc_dev) .property("sync", &false) .build()?; // Up‑cast to &gst::Element for the collection macros pipeline.add_many(&[ src.upcast_ref(), &h264parse, &decoder, &convert, &scale, &caps, &sink, ])?; gst::Element::link_many(&[ src.upcast_ref(), &h264parse, &decoder, &convert, &scale, &caps, &sink, ])?; pipeline.set_state(gst::State::Playing)?; Ok(Self { appsrc: src, _pipe: pipeline, }) } pub fn push(&self, pkt: VideoPacket) { let mut buf = gst::Buffer::from_slice(pkt.data); buf.get_mut() .unwrap() .set_pts(Some(gst::ClockTime::from_useconds(pkt.pts))); let _ = self.appsrc.push_buffer(buf); } fn pick_decoder() -> &'static str { if gst::ElementFactory::find("v4l2h264dec").is_some() { "v4l2h264dec" } else if gst::ElementFactory::find("v4l2slh264dec").is_some() { "v4l2slh264dec" } else if gst::ElementFactory::find("omxh264dec").is_some() { "omxh264dec" } else { "avdec_h264" } } } /*─────────────────────────────────*/ /* gRPC → WebcamSink relay */ /*─────────────────────────────────*/ pub struct CameraRelay { sink: WebcamSink, // the v4l2sink pipeline (or stub) id: u32, // gRPC “id” (for future multi‑cam) frames: std::sync::atomic::AtomicU64, } impl CameraRelay { pub fn new(id: u32, uvc_dev: &str) -> anyhow::Result { Ok(Self { sink: WebcamSink::new(uvc_dev)?, id, frames: std::sync::atomic::AtomicU64::new(0), }) } /// Push one VideoPacket coming from the client pub fn feed(&self, pkt: VideoPacket) { let n = self .frames .fetch_add(1, std::sync::atomic::Ordering::Relaxed); if n < 10 || n % 60 == 0 { tracing::debug!(target:"lesavka_server::video", cam_id = self.id, frame = n, bytes = pkt.data.len(), pts = pkt.pts, "📸 srv webcam frame"); } else if n % 10 == 0 { tracing::trace!(target:"lesavka_server::video", cam_id = self.id, bytes = pkt.data.len(), "📸📥 srv pkt"); } if dev_mode_enabled() && (cfg!(debug_assertions) || tracing::enabled!(tracing::Level::TRACE)) && contains_idr(&pkt.data) { let path = format!("/tmp/eye3-cli-{n:05}.h264"); if let Err(e) = std::fs::write(&path, &pkt.data) { tracing::warn!("📸💾 dump failed: {e}"); } else { tracing::debug!("📸💾 wrote {}", path); } } self.sink.push(pkt); } }