// server/src/video.rs use anyhow::Context; use gstreamer as gst; use gstreamer_app as gst_app; use gst::prelude::*; use lesavka_common::lesavka::VideoPacket; use tokio_stream::wrappers::ReceiverStream; use tonic::Status; pub async fn spawn_camera( dev: &str, id: u32, max_bitrate_kbit: u32, ) -> anyhow::Result>> { gst::init().context("gst init")?; // v4l2src → H.264 already, we only parse & relay let desc = format!( "v4l2src device={dev} io-mode=dmabuf ! queue ! h264parse config-interval=1 ! \ video/x-h264,stream-format=byte-stream,profile=baseline,level=4,\ bitrate={max_bitrate_kbit}000 ! appsink name=sink emit-signals=true sync=false" ); let pipeline = gst::parse_launch(&desc)?.downcast::()?; let sink = pipeline .by_name("sink") .expect("appsink") .dynamic_cast::() .expect("appsink downcast"); let (tx, rx) = tokio::sync::mpsc::channel(256); sink.set_callbacks( gst_app::AppSinkCallbacks::builder() .new_sample(move |sink| { let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?; let buffer = sample.buffer().ok_or(gst::FlowError::Error)?; let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?; let pkt = VideoPacket { id, pts: buffer.pts().nseconds() / 1_000, // → µs data: map.as_slice().to_vec(), }; // ignore back‑pressure: drop oldest if channel is full let _ = tx.try_send(Ok(pkt)); Ok(gst::FlowSuccess::Ok) }) .build(), ); pipeline.set_state(gst::State::Playing)?; Ok(ReceiverStream::new(rx)) }