// server/src/video.rs use anyhow::Context; use gstreamer as gst; use gstreamer_app as gst_app; use gst::prelude::*; use gst::log; use lesavka_common::lesavka::VideoPacket; use tokio_stream::wrappers::ReceiverStream; use tonic::Status; use tracing::{debug, enabled, trace, Level}; static START: std::sync::OnceLock = std::sync::OnceLock::new(); pub async fn eye_ball( dev: &str, id: u32, _max_bitrate_kbit: u32, ) -> anyhow::Result>> { gst::init().context("gst init")?; let desc = format!( "v4l2src device={dev} io-mode=mmap ! \ video/x-h264,stream-format=byte-stream,alignment=au ! \ h264parse config-interval=1 ! \ appsink name=sink emit-signals=true drop=true sync=false" ); let pipeline = gst::parse::launch(&desc)? .downcast::() .expect("pipeline down‑cast"); let sink = pipeline .by_name("sink") .expect("appsink") .dynamic_cast::() .expect("appsink down‑cast"); let (tx, rx) = tokio::sync::mpsc::channel(256); sink.set_callbacks( gst_app::AppSinkCallbacks::builder() .new_sample(move |sink| { /* -------- pull frame ---------- */ let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?; let buffer = sample.buffer().ok_or(gst::FlowError::Error)?; /* -------- basic counters ------ */ static FRAME: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); let n = FRAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if n % 120 == 0 { trace!("eye{id}: delivered {n} frames"); } /* -------- map once, reuse ----- */ let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?; // write first IDR to disk (quick sanity check) if n == 0 { std::fs::write(format!("/tmp/eye{id}-idr.h264"), map.as_slice()).ok(); } /* -------- detect SPS / IDR ---- */ if enabled!(Level::DEBUG) { if let Some(&nal) = map.as_slice().get(4) { if (nal & 0x1F) == 0x05 /* IDR */ { debug!("eye{id}: IDR"); } } } /* -------- timestamps ---------- */ let origin = *START.get_or_init(|| buffer.pts().unwrap_or(gst::ClockTime::ZERO)); let pts_us = buffer .pts() .unwrap_or(gst::ClockTime::ZERO) .saturating_sub(origin) .nseconds() / 1_000; /* -------- ship over gRPC ----- */ let pkt = VideoPacket { id, pts: pts_us, data: map.as_slice().to_vec(), }; let _ = tx.try_send(Ok(pkt)); Ok(gst::FlowSuccess::Ok) }) .build(), ); pipeline.set_state(gst::State::Playing)?; Ok(ReceiverStream::new(rx)) }