lesavka/server/src/video.rs

141 lines
5.0 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// server/src/video.rs
use anyhow::Context;
use gstreamer as gst;
use gstreamer_app as gst_app;
use gst::prelude::*;
use gst::{log, MessageView};
use lesavka_common::lesavka::VideoPacket;
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;
use tracing::{debug, enabled, trace, Level};
use futures_util::Stream;
/// Short per-eye labels, indexed by the `id` argument of `eye_ball`; used in
/// log messages and in the names of the debug dump files.
const EYE_ID: [&str; 2] = ["l", "r"];
/// Timestamp origin shared by every stream in the process: set once, by the
/// first sample callback that runs, to that buffer's PTS (or zero when the
/// buffer has none). Packet timestamps are reported relative to this value.
static START: std::sync::OnceLock<gst::ClockTime> = std::sync::OnceLock::new();
/// Stream of `VideoPacket` results produced by a GStreamer pipeline.
///
/// The pipeline is stored alongside the receiver so that it stays alive for
/// as long as the stream does; the `Drop` impl sets it to `Null`.
pub struct VideoStream {
// Held so the pipeline stays alive; only touched again in `Drop`.
_pipeline: gst::Pipeline,
// Receiving side of the channel that the appsink callback sends packets into.
inner: ReceiverStream<Result<VideoPacket, Status>>,
}
/// `VideoStream` yields exactly what its inner channel receiver yields.
impl Stream for VideoStream {
    type Item = Result<VideoPacket, Status>;

    /// Delegates polling to the wrapped `ReceiverStream`.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // Re-pin the inner receiver so it can be polled through the trait method.
        let receiver = std::pin::Pin::new(&mut self.inner);
        receiver.poll_next(cx)
    }
}
impl Drop for VideoStream {
    /// Stops the pipeline when the stream is released.
    fn drop(&mut self) {
        // Taking the pipeline to `Null` before it is disposed avoids the
        // "dispose element ... READY/PLAYING ..." warning spam. This is
        // best-effort: a failed state change is deliberately ignored.
        self._pipeline.set_state(gst::State::Null).ok();
    }
}
pub async fn eye_ball(
dev: &str,
id: u32,
_max_bitrate_kbit: u32,
) -> anyhow::Result<VideoStream> {
let eye = EYE_ID[id as usize];
gst::init().context("gst init")?;
let desc = format!(
"v4l2src device=\"{dev}\" io-mode=mmap ! \
queue ! tsdemux name=d ! \
d. ! h264parse disable-passthrough=true config-interval=-1 ! \
video/x-h264,stream-format=byte-stream,alignment=au ! \
appsink name=vsink emit-signals=true max-buffers=32 drop=true"
);
// let desc = format!(
// "v4l2src device={dev} io-mode=mmap ! \
// queue max-size-buffers=0 max-size-bytes=0 max-size-time=0 ! tsdemux name=d ! \
// video/x-h264,stream-format=byte-stream,alignment=au,profile=high ! tsdemux name=d ! \
// d. ! h264parse config-interval=1 ! queue ! appsink name=vsink emit-signals=true \
// d. ! aacparse ! queue ! h264parse config-interval=1 ! appsink name=sink \
// emit-signals=true drop=false sync=false"
// );
let pipeline = gst::parse::launch(&desc)?
.downcast::<gst::Pipeline>()
.expect("not a pipeline");
let sink = pipeline
.by_name("sink")
.expect("appsink")
.dynamic_cast::<gst_app::AppSink>()
.expect("appsink down-cast");
let (tx, rx) = tokio::sync::mpsc::channel(8192);
sink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample(move |sink| {
/* -------- pull frame ---------- */
let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
let buffer = sample.buffer().ok_or(gst::FlowError::Error)?;
/* -------- map once, reuse ----- */
let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?;
/* -------- basic counters ------ */
static FRAME: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
let n = FRAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n % 120 == 0 {
trace!(target: "lesavka_server::video", "eye-{eye}: delivered {n} frames");
if enabled!(Level::TRACE) {
let path = format!("/tmp/eye-{eye}-srv-{:05}.h264", n);
std::fs::write(&path, map.as_slice()).ok();
}
}
/* -------- detect SPS / IDR ---- */
if enabled!(Level::DEBUG) {
if let Some(&nal) = map.as_slice().get(4) {
if (nal & 0x1F) == 0x05 /* IDR */ {
debug!("eye-{eye}: IDR");
}
}
}
/* -------- timestamps ---------- */
let origin = *START.get_or_init(|| buffer.pts().unwrap_or(gst::ClockTime::ZERO));
let pts_us = buffer
.pts()
.unwrap_or(gst::ClockTime::ZERO)
.saturating_sub(origin)
.nseconds()
/ 1_000;
/* -------- ship over gRPC ----- */
let pkt = VideoPacket {
id,
pts: pts_us,
data: map.as_slice().to_vec(),
};
tracing::trace!("srv→grpc eye-{eye} {} bytes pts={}", pkt.data.len(), pkt.pts);
let _ = tx.try_send(Ok(pkt));
Ok(gst::FlowSuccess::Ok)
})
.build(),
);
pipeline.set_state(gst::State::Playing)?;
let bus = pipeline.bus().unwrap();
loop {
match bus.timed_pop(gst::ClockTime::NONE) {
Some(msg) if matches!(msg.view(), MessageView::StateChanged(s)
if s.current() == gst::State::Playing) => break,
Some(_) => continue,
None => continue,
}
}
Ok(VideoStream { _pipeline: pipeline, inner: ReceiverStream::new(rx) })
}