lesavka/server/src/video.rs
2025-07-04 01:56:59 -05:00

313 lines
12 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// server/src/video.rs
use anyhow::Context;
use gstreamer as gst;
use gstreamer_app as gst_app;
use gst::prelude::*;
use gst::{log, MessageView};
use gst::MessageView::*;
use lesavka_common::lesavka::VideoPacket;
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;
use tracing::{debug, warn, error, info, enabled, trace, Level};
use futures_util::Stream;
/// Eye labels indexed by the numeric stream id: index 0 is `"l"`, index 1 is `"r"`.
/// The label is used in element names, log fields and debug dump file names.
const EYE_ID: [&str; 2] = ["l", "r"];
/// Process-wide timestamp origin. It is initialised exactly once, by the first
/// appsink sample callback that runs, to that buffer's PTS (or zero when the
/// buffer carries none). Every packet PTS is reported relative to this value.
/// Being a `static`, it is shared by all streams created in this process.
static START: std::sync::OnceLock<gst::ClockTime> = std::sync::OnceLock::new();
/// A stream of `VideoPacket`s produced by a capture pipeline (see `eye_ball`).
///
/// Field order matters: fields are dropped in declaration order, after the
/// `Drop` impl has already set the pipeline to `Null`.
pub struct VideoStream {
// Handle to the running pipeline; held for the lifetime of the stream so the
// `Drop` impl can shut it down.
_pipeline: gst::Pipeline,
// Receiving end of the channel that the appsink callback pushes packets into.
inner: ReceiverStream<Result<VideoPacket, Status>>,
}
/// `VideoStream` is a thin wrapper: polling it simply polls the inner
/// channel-backed stream.
impl Stream for VideoStream {
    type Item = Result<VideoPacket, Status>;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // Every field is `Unpin`, so the pin can be projected by hand.
        let this = self.get_mut();
        let receiver = std::pin::Pin::new(&mut this.inner);
        receiver.poll_next(cx)
    }
}
/// Stops the pipeline when the stream goes away.
impl Drop for VideoStream {
    fn drop(&mut self) {
        // Moving to NULL before the pipeline is released avoids the
        // “dispose element … READY/PLAYING …” spam. A failed state change
        // is deliberately ignored: nothing useful can be done about it here.
        self._pipeline.set_state(gst::State::Null).ok();
    }
}
pub async fn eye_ball(
dev: &str,
id: u32,
_max_bitrate_kbit: u32,
) -> anyhow::Result<VideoStream> {
let eye = EYE_ID[id as usize];
gst::init().context("gst init")?;
let desc = format!(
"v4l2src name=cam_{eye} device=\"{dev}\" io-mode=mmap do-timestamp=true ! \
queue ! \
h264parse disable-passthrough=true config-interval=-1 ! \
video/x-h264,stream-format=byte-stream,alignment=au ! \
appsink name=sink emit-signals=true max-buffers=32 drop=true"
);
// let desc = format!(
// "v4l2src device={dev} io-mode=mmap ! \
// queue max-size-buffers=0 max-size-bytes=0 max-size-time=0 ! tsdemux name=d ! \
// video/x-h264,stream-format=byte-stream,alignment=au,profile=high ! tsdemux name=d ! \
// d. ! h264parse config-interval=1 ! queue ! appsink name=vsink emit-signals=true \
// d. ! aacparse ! queue ! h264parse config-interval=1 ! appsink name=sink \
// emit-signals=true drop=false sync=false"
// );
let pipeline = gst::parse::launch(&desc)?
.downcast::<gst::Pipeline>()
.expect("not a pipeline");
// let pipeline: gst::Pipeline = gst::parse_launch(&desc)?
// .downcast()
// .expect("not a pipeline");
let sink = pipeline
.by_name("sink")
.expect("appsink")
.dynamic_cast::<gst_app::AppSink>()
.expect("appsink down-cast");
let (tx, rx) = tokio::sync::mpsc::channel(8192);
/* ----- BUS WATCH: show errors & warnings immediately --------------- */
let bus = pipeline.bus().expect("bus");
if let Some(src_pad) = pipeline.by_name(&format!("cam_{eye}"))
.and_then(|e| e.static_pad("src")) {
src_pad.add_probe(gst::PadProbeType::EVENT_DOWNSTREAM, |pad, info| {
if let Some(gst::PadProbeData::Event(ref ev)) = info.data {
if let gst::EventView::Caps(c) = ev.view() {
trace!(target:"lesavka_server::video",
?c, "🔍 new caps on {}", pad.name());
}
}
gst::PadProbeReturn::Ok
});
} else {
warn!(target:"lesavka_server::video",
eye = %eye,
"🍪 cam_{eye} not found - skipping pad-probe");
}
let eye_clone = eye.to_owned();
std::thread::spawn(move || {
for msg in bus.iter_timed(gst::ClockTime::NONE) {
match msg.view() {
Error(err) => {
error!(target:"lesavka_server::video",
eye = %eye_clone,
"💥 pipeline error: {} ({})",
err.error(), err.debug().unwrap_or_default());
}
Warning(w) => {
warn!(target:"lesavka_server::video",
eye = %eye_clone,
"⚠️ pipeline warning: {} ({})",
w.error(), w.debug().unwrap_or_default());
}
Info(i) => {
info!(target:"lesavka_server::video",
eye = %eye_clone,
"📌 pipeline info: {} ({})",
i.error(), i.debug().unwrap_or_default());
}
StateChanged(s) if s.current() == gst::State::Playing => {
debug!(target:"lesavka_server::video",
eye = %eye_clone,
"🎬 pipeline PLAYING");
}
_ => {}
}
}
});
sink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample(move |sink| {
/* -------- pull frame ---------- */
let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
let buffer = sample.buffer().ok_or(gst::FlowError::Error)?;
/* -------- map once, reuse ----- */
let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?;
/* -------- basic counters ------ */
static FRAME: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
let n = FRAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n % 120 == 0 {
trace!(target: "lesavka_server::video", "eye-{eye}: delivered {n} frames");
if enabled!(Level::TRACE) {
let path = format!("/tmp/eye-{eye}-srv-{:05}.h264", n);
std::fs::write(&path, map.as_slice()).ok();
}
} else if n < 10 {
debug!(target: "lesavka_server::video",
eye = eye, frame = n, bytes = map.len(),
pts = ?buffer.pts(), "⬆️ pushed video sample eye-{eye}");
}
/* -------- detect SPS / IDR ---- */
if enabled!(Level::DEBUG) {
if let Some(&nal) = map.as_slice().get(4) {
if (nal & 0x1F) == 0x05 /* IDR */ {
debug!("eye-{eye}: IDR");
}
}
}
/* -------- timestamps ---------- */
let origin = *START.get_or_init(|| buffer.pts().unwrap_or(gst::ClockTime::ZERO));
let pts_us = buffer
.pts()
.unwrap_or(gst::ClockTime::ZERO)
.saturating_sub(origin)
.nseconds()
/ 1_000;
/* -------- ship over gRPC ----- */
let data = map.as_slice().to_vec();
let size = data.len();
let pkt = VideoPacket { id, pts: pts_us, data };
match tx.try_send(Ok(pkt)) {
Ok(_) => {
trace!(target:"lesavka_server::video",
eye = %eye,
size = size,
"🎥📤 sent");
}
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
static DROP_CNT: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
let c = DROP_CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if c % 120 == 0 {
debug!(target:"lesavka_server::video",
eye = %eye,
dropped = c,
"🎥⏳ channel full - dropping frames");
}
}
Err(e) => error!("mpsc send err: {e}"),
}
Ok(gst::FlowSuccess::Ok)
})
.build(),
);
pipeline.set_state(gst::State::Playing).context("🎥 starting video pipeline eye-{eye}")?;
let bus = pipeline.bus().unwrap();
loop {
match bus.timed_pop(gst::ClockTime::NONE) {
Some(msg) if matches!(msg.view(), MessageView::StateChanged(s)
if s.current() == gst::State::Playing) => break,
Some(_) => continue,
None => continue,
}
}
Ok(VideoStream { _pipeline: pipeline, inner: ReceiverStream::new(rx) })
}
/// Playback pipeline that accepts H.264 packets through an `appsrc` and
/// renders them to a V4L2 output device (see `WebcamSink::new`).
pub struct WebcamSink {
// Entry point of the pipeline; `push` hands buffers to it.
appsrc: gst_app::AppSrc,
// The pipeline the appsrc belongs to; not otherwise used after construction.
_pipe: gst::Pipeline,
}
impl WebcamSink {
pub fn new(uvc_dev: &str) -> anyhow::Result<Self> {
gst::init()?;
let pipeline = gst::Pipeline::new();
let src = gst::ElementFactory::make("appsrc")
.build()?
.downcast::<gst_app::AppSrc>()
.expect("appsrc");
src.set_is_live(true);
src.set_format(gst::Format::Time);
let h264parse = gst::ElementFactory::make("h264parse").build()?;
let decoder = gst::ElementFactory::make("v4l2h264dec").build()?;
let convert = gst::ElementFactory::make("videoconvert").build()?;
let sink = gst::ElementFactory::make("v4l2sink")
.property("device", &uvc_dev)
.property("sync", &false)
.build()?;
// Upcast to &gst::Element for the collection macros
pipeline.add_many(&[
src.upcast_ref(), &h264parse, &decoder, &convert, &sink
])?;
gst::Element::link_many(&[
src.upcast_ref(), &h264parse, &decoder, &convert, &sink
])?;
pipeline.set_state(gst::State::Playing)?;
Ok(Self { appsrc: src, _pipe: pipeline })
}
pub fn push(&self, pkt: VideoPacket) {
let mut buf = gst::Buffer::from_slice(pkt.data);
buf.get_mut().unwrap()
.set_pts(Some(gst::ClockTime::from_useconds(pkt.pts)));
let _ = self.appsrc.push_buffer(buf);
}
}
/*─────────────────────────────────*/
/* gRPC → WebcamSink relay */
/*─────────────────────────────────*/
/// Forwards packets received from the client into a `WebcamSink`, counting
/// them along the way for logging and periodic debug dumps.
pub struct CameraRelay {
sink: WebcamSink, // the v4l2sink pipeline (or stub)
id: u32, // gRPC “id” (for future multicam)
// Number of packets passed to `feed` so far; drives log and dump sampling.
frames: std::sync::atomic::AtomicU64,
}
impl CameraRelay {
    /// Creates a relay for camera `id` whose packets are rendered to `uvc_dev`.
    ///
    /// Fails when the underlying `WebcamSink` cannot be constructed.
    pub fn new(id: u32, uvc_dev: &str) -> anyhow::Result<Self> {
        let sink = WebcamSink::new(uvc_dev)?;
        let frames = std::sync::atomic::AtomicU64::new(0);
        Ok(Self { sink, id, frames })
    }

    /// Push one VideoPacket coming from the client
    ///
    /// Logs a sample of the traffic, optionally dumps every 120th packet to
    /// `/tmp`, and then hands the packet to the sink.
    pub fn feed(&self, pkt: VideoPacket) {
        let seq = self.frames.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        let payload_len = pkt.data.len();

        // The first ten packets and every 60th are logged at debug level;
        // of the rest, every 10th is logged at trace level.
        let verbose = seq < 10 || seq % 60 == 0;
        if verbose {
            tracing::debug!(target:"lesavka_server::video",
                cam_id = self.id,
                frame = seq,
                bytes = payload_len,
                pts = pkt.pts,
                "📸 srv webcam frame");
        } else if seq % 10 == 0 {
            tracing::trace!(target:"lesavka_server::video",
                cam_id = self.id,
                bytes = payload_len,
                "📸📥 srv pkt");
        }

        // Dumps are active in debug builds or whenever TRACE logging is on.
        let dumps_on = cfg!(debug_assertions) || tracing::enabled!(tracing::Level::TRACE);
        if dumps_on && seq % 120 == 0 {
            let path = format!("/tmp/eye3-cli-{:05}.h264", seq);
            match std::fs::write(&path, &pkt.data) {
                Err(e) => tracing::warn!("📸💾 dump failed: {e}"),
                Ok(()) => tracing::debug!("📸💾 wrote {}", path),
            }
        }

        self.sink.push(pkt);
    }
}