lesavka/server/src/video.rs

107 lines
3.8 KiB
Rust
Raw Normal View History

2025-06-21 05:21:57 -05:00
// server/src/video.rs
use anyhow::Context;
use gstreamer as gst;
use gstreamer_app as gst_app;
use gst::prelude::*;
2025-06-27 14:01:29 -05:00
use gst::log;
2025-06-23 07:18:26 -05:00
use lesavka_common::lesavka::VideoPacket;
2025-06-21 05:21:57 -05:00
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;
2025-06-27 22:51:50 -05:00
use tracing::{debug, enabled, trace, Level};
2025-06-21 05:21:57 -05:00
2025-06-28 03:34:48 -05:00
const EYE_ID: [&str; 2] = ["l", "r"];
2025-06-27 14:01:29 -05:00
static START: std::sync::OnceLock<gst::ClockTime> = std::sync::OnceLock::new();
2025-06-27 22:51:50 -05:00
pub async fn eye_ball(
2025-06-21 05:21:57 -05:00
dev: &str,
id: u32,
2025-06-25 16:52:26 -05:00
_max_bitrate_kbit: u32,
2025-06-21 05:21:57 -05:00
) -> anyhow::Result<ReceiverStream<Result<VideoPacket, Status>>> {
2025-06-28 03:34:48 -05:00
let eye = EYE_ID[id as usize];
2025-06-21 05:21:57 -05:00
gst::init().context("gst init")?;
let desc = format!(
2025-06-27 14:01:29 -05:00
"v4l2src device={dev} io-mode=mmap ! \
2025-06-28 01:08:57 -05:00
video/x-h264,stream-format=byte-stream,alignment=au,profile=high ! \
h264parse config-interval=-1 ! \
2025-06-27 14:01:29 -05:00
appsink name=sink emit-signals=true drop=true sync=false"
2025-06-27 22:51:50 -05:00
);
2025-06-21 05:21:57 -05:00
2025-06-23 20:51:52 -05:00
let pipeline = gst::parse::launch(&desc)?
.downcast::<gst::Pipeline>()
2025-06-27 22:51:50 -05:00
.expect("pipeline downcast");
2025-06-21 05:21:57 -05:00
2025-06-23 20:57:18 -05:00
let sink = pipeline
.by_name("sink")
.expect("appsink")
.dynamic_cast::<gst_app::AppSink>()
2025-06-27 22:51:50 -05:00
.expect("appsink downcast");
2025-06-23 20:57:18 -05:00
2025-06-21 05:21:57 -05:00
let (tx, rx) = tokio::sync::mpsc::channel(256);
sink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample(move |sink| {
2025-06-27 22:51:50 -05:00
/* -------- pull frame ---------- */
2025-06-21 05:21:57 -05:00
let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
let buffer = sample.buffer().ok_or(gst::FlowError::Error)?;
2025-06-28 01:08:57 -05:00
/* -------- map once, reuse ----- */
let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?;
2025-06-27 22:51:50 -05:00
/* -------- basic counters ------ */
static FRAME: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
let n = FRAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n % 120 == 0 {
2025-06-28 03:34:48 -05:00
// trace!("eye{eye}: delivered {n} frames");
trace!(target: "lesavka_server::video", "eye-{eye}: delivered {n} frames");
2025-06-28 01:08:57 -05:00
if enabled!(Level::TRACE) {
2025-06-28 03:34:48 -05:00
let path = format!("/tmp/eye-{eye}-srv-{:05}.h264", n);
2025-06-28 01:08:57 -05:00
std::fs::write(&path, map.as_slice()).ok();
}
2025-06-27 22:51:50 -05:00
}
2025-06-23 20:45:21 -05:00
2025-06-27 22:51:50 -05:00
// write first IDR to disk (quick sanity check)
if n == 0 {
2025-06-28 00:05:13 -05:00
// let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?;
2025-06-28 03:34:48 -05:00
std::fs::write(format!("/tmp/eye-{eye}-idr.h264"), map.as_slice()).ok();
2025-06-27 22:51:50 -05:00
}
/* -------- detect SPS / IDR ---- */
2025-06-26 17:12:59 -05:00
if enabled!(Level::DEBUG) {
2025-06-27 22:51:50 -05:00
if let Some(&nal) = map.as_slice().get(4) {
if (nal & 0x1F) == 0x05 /* IDR */ {
2025-06-28 03:34:48 -05:00
debug!("eye-{eye}: IDR");
2025-06-26 17:12:59 -05:00
}
}
}
2025-06-27 22:51:50 -05:00
/* -------- timestamps ---------- */
let origin = *START.get_or_init(|| buffer.pts().unwrap_or(gst::ClockTime::ZERO));
let pts_us = buffer
.pts()
.unwrap_or(gst::ClockTime::ZERO)
.saturating_sub(origin)
.nseconds()
/ 1_000;
/* -------- ship over gRPC ----- */
2025-06-21 05:21:57 -05:00
let pkt = VideoPacket {
id,
2025-06-23 20:45:21 -05:00
pts: pts_us,
2025-06-21 05:21:57 -05:00
data: map.as_slice().to_vec(),
};
2025-06-28 03:34:48 -05:00
tracing::trace!("srv→grpc eye-{eye} {} bytes pts={}", pkt.data.len(), pkt.pts);
2025-06-27 22:51:50 -05:00
let _ = tx.try_send(Ok(pkt));
2025-06-21 05:21:57 -05:00
Ok(gst::FlowSuccess::Ok)
})
.build(),
);
pipeline.set_state(gst::State::Playing)?;
Ok(ReceiverStream::new(rx))
}