// lesavka/server/src/video.rs (viewer metadata: 297 lines, 12 KiB, Rust)

// server/src/video.rs
use anyhow::Context;
use futures_util::Stream;
use gst::MessageView;
use gst::MessageView::*;
use gst::prelude::*;
use gstreamer as gst;
use gstreamer_app as gst_app;
use lesavka_common::lesavka::VideoPacket;
use std::sync::Arc;
use std::sync::OnceLock;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;
use tracing::{Level, debug, enabled, error, info, trace, warn};
pub use crate::video_sinks::{CameraRelay, HdmiSink, WebcamSink};
use crate::video_support::{
adjust_effective_fps, contains_idr, default_eye_fps, env_u32, env_usize, should_send_frame,
};
/// Short labels for the two logical eyes, indexed by the wire-level eye id (0 = "l", 1 = "r").
const EYE_ID: [&str; 2] = ["l", "r"];
/// First observed buffer PTS, used as the timestamp origin for outgoing packets.
/// NOTE(review): this is process-wide, so whichever eye produces a buffer first
/// anchors BOTH eyes — presumably intentional for inter-eye sync; confirm.
static START: OnceLock<gst::ClockTime> = OnceLock::new();
/// gRPC-facing stream of H.264 access units backed by a live GStreamer pipeline.
pub struct VideoStream {
// Held only to keep the capture pipeline alive; Drop sets it to Null.
_pipeline: gst::Pipeline,
// Receiver half of the appsink → gRPC mpsc channel.
inner: ReceiverStream<Result<VideoPacket, Status>>,
}
impl Stream for VideoStream {
    type Item = Result<VideoPacket, Status>;

    /// Delegates polling straight to the wrapped receiver stream; this type
    /// adds no buffering or transformation of its own.
    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // Both fields are Unpin, so unpinning the whole struct is sound.
        let this = self.get_mut();
        std::pin::Pin::new(&mut this.inner).poll_next(cx)
    }
}
impl Drop for VideoStream {
    /// Stop the capture pipeline as soon as the stream handle goes away so the
    /// V4L2 device is released for the next subscriber.
    fn drop(&mut self) {
        // Best-effort teardown: a failed state change during drop is not
        // actionable (there is no caller to report it to), so it is discarded.
        if self._pipeline.set_state(gst::State::Null).is_err() {
            // intentionally ignored
        }
    }
}
/// Capture one eye stream from the local V4L2 gadget and expose it as a gRPC stream.
///
/// Inputs: the V4L2 device node, logical eye id, and negotiated bitrate cap.
/// Outputs: a `VideoStream` that yields H.264 access units for the requested eye.
/// Why: the server keeps bitrate-aware pacing close to the capture pipeline so it can drop
/// frames before they build up in gRPC queues and destabilize downstream playback.
pub async fn eye_ball(dev: &str, id: u32, max_bitrate_kbit: u32) -> anyhow::Result<VideoStream> {
let eye = EYE_ID[id as usize];
gst::init().context("gst init")?;
let target_fps = env_u32("LESAVKA_EYE_FPS", default_eye_fps(max_bitrate_kbit)).max(1);
let min_fps = env_u32("LESAVKA_EYE_MIN_FPS", 12).clamp(1, target_fps);
let adaptive = std::env::var("LESAVKA_EYE_ADAPTIVE")
.map(|value| value != "0")
.unwrap_or(true);
info!(
target: "lesavka_server::video",
eye = %eye,
max_bitrate_kbit,
target_fps,
min_fps,
adaptive,
"🎥 eye stream profile selected"
);
let effective_fps = Arc::new(AtomicU32::new(target_fps));
let dropped_window = Arc::new(AtomicU64::new(0));
let sent_window = Arc::new(AtomicU64::new(0));
let last_adjust_sec = Arc::new(AtomicU64::new(0));
let wait_for_idr = Arc::new(AtomicBool::new(false));
let last_sent = Arc::new(AtomicU64::new(0));
let queue_buffers = env_u32("LESAVKA_EYE_QUEUE_BUFFERS", 8).max(1);
let appsink_buffers = env_u32("LESAVKA_EYE_APPSINK_BUFFERS", 8).max(1);
let desc = format!(
"v4l2src name=cam_{eye} device=\"{dev}\" io-mode=mmap do-timestamp=true ! \
queue max-size-buffers={queue_buffers} max-size-time=0 max-size-bytes=0 leaky=downstream ! \
h264parse disable-passthrough=true config-interval=-1 ! \
video/x-h264,stream-format=byte-stream,alignment=au ! \
appsink name=sink emit-signals=true max-buffers={appsink_buffers} drop=true"
);
let pipeline = gst::parse::launch(&desc)?
.downcast::<gst::Pipeline>()
.expect("not a pipeline");
let sink = pipeline
.by_name("sink")
.expect("appsink")
.dynamic_cast::<gst_app::AppSink>()
.expect("appsink down-cast");
let chan_capacity = env_usize("LESAVKA_EYE_CHAN_CAPACITY", 256).max(16);
let (tx, rx) = tokio::sync::mpsc::channel(chan_capacity);
let bus = pipeline.bus().expect("bus");
if let Some(src_pad) = pipeline
.by_name(&format!("cam_{eye}"))
.and_then(|element| element.static_pad("src"))
{
src_pad.add_probe(gst::PadProbeType::EVENT_DOWNSTREAM, |pad, info| {
if let Some(gst::PadProbeData::Event(ref event)) = info.data {
if let gst::EventView::Caps(caps) = event.view() {
trace!(target:"lesavka_server::video", ?caps, "🔍 new caps on {}", pad.name());
}
}
gst::PadProbeReturn::Ok
});
} else {
warn!(target:"lesavka_server::video", eye = %eye, "🍪 cam_{eye} not found - skipping pad-probe");
}
let eye_clone = eye.to_owned();
std::thread::spawn(move || {
for msg in bus.iter_timed(gst::ClockTime::NONE) {
match msg.view() {
Error(err) => {
error!(
target:"lesavka_server::video",
eye = %eye_clone,
"💥 pipeline error: {} ({})",
err.error(),
err.debug().unwrap_or_default()
);
}
Warning(warning) => {
warn!(
target:"lesavka_server::video",
eye = %eye_clone,
"⚠️ pipeline warning: {} ({})",
warning.error(),
warning.debug().unwrap_or_default()
);
}
Info(info_msg) => {
info!(
target:"lesavka_server::video",
eye = %eye_clone,
"📌 pipeline info: {} ({})",
info_msg.error(),
info_msg.debug().unwrap_or_default()
);
}
StateChanged(state) if state.current() == gst::State::Playing => {
debug!(target:"lesavka_server::video", eye = %eye_clone, "🎬 pipeline PLAYING");
}
_ => {}
}
}
});
let eye_name = eye.to_string();
sink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample(move |sink| {
let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
let buffer = sample.buffer().ok_or(gst::FlowError::Error)?;
let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?;
let is_idr = contains_idr(map.as_slice());
static FRAME: AtomicU64 = AtomicU64::new(0);
let frame = FRAME.fetch_add(1, Ordering::Relaxed);
if frame % 120 == 0 && is_idr {
trace!(target: "lesavka_server::video", "eye-{eye}: delivered {frame} frames");
if enabled!(Level::TRACE) {
let path = format!("/tmp/eye-{eye}-srv-{frame:05}.h264");
std::fs::write(&path, map.as_slice()).ok();
}
} else if frame < 10 {
debug!(
target: "lesavka_server::video",
eye = eye,
frame,
bytes = map.len(),
pts = ?buffer.pts(),
"⬆️ pushed video sample eye-{eye}"
);
}
if enabled!(Level::TRACE) && is_idr {
trace!("eye-{eye}: IDR");
}
let origin = *START.get_or_init(|| buffer.pts().unwrap_or(gst::ClockTime::ZERO));
let pts_us = buffer
.pts()
.unwrap_or(gst::ClockTime::ZERO)
.saturating_sub(origin)
.nseconds()
/ 1_000;
if adaptive {
let sec = pts_us / 1_000_000;
let prev = last_adjust_sec.load(Ordering::Relaxed);
if sec > prev
&& last_adjust_sec
.compare_exchange(prev, sec, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
let dropped = dropped_window.swap(0, Ordering::Relaxed);
let sent = sent_window.swap(0, Ordering::Relaxed);
let current = effective_fps.load(Ordering::Relaxed).max(1);
let next = adjust_effective_fps(current, min_fps, target_fps, dropped, sent);
if next != current {
effective_fps.store(next, Ordering::Relaxed);
if next < current {
warn!(
target: "lesavka_server::video",
eye = %eye_name,
fps = next,
"🎥 adaptive eye fps ↓"
);
} else {
info!(
target: "lesavka_server::video",
eye = %eye_name,
fps = next,
"🎥 adaptive eye fps ↑"
);
}
}
}
}
let current_fps = effective_fps.load(Ordering::Relaxed).max(1);
let last = last_sent.load(Ordering::Relaxed);
if !should_send_frame(last, pts_us, current_fps) {
return Ok(gst::FlowSuccess::Ok);
}
last_sent.store(pts_us, Ordering::Relaxed);
if wait_for_idr.load(Ordering::Relaxed) && !is_idr {
return Ok(gst::FlowSuccess::Ok);
}
let data = map.as_slice().to_vec();
let size = data.len();
let pkt = VideoPacket { id, pts: pts_us, data };
match tx.try_send(Ok(pkt)) {
Ok(_) => {
sent_window.fetch_add(1, Ordering::Relaxed);
if is_idr {
wait_for_idr.store(false, Ordering::Relaxed);
}
trace!(target:"lesavka_server::video", eye = %eye, size = size, "🎥📤 sent");
}
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
dropped_window.fetch_add(1, Ordering::Relaxed);
wait_for_idr.store(true, Ordering::Relaxed);
static DROP_CNT: AtomicU64 = AtomicU64::new(0);
let dropped = DROP_CNT.fetch_add(1, Ordering::Relaxed);
if dropped % 120 == 0 {
debug!(
target:"lesavka_server::video",
eye = %eye,
dropped,
"🎥⏳ channel full - dropping frames"
);
}
}
Err(error) => error!("mpsc send err: {error}"),
}
Ok(gst::FlowSuccess::Ok)
})
.build(),
);
pipeline
.set_state(gst::State::Playing)
.context("🎥 starting video pipeline eye-{eye}")?;
let bus = pipeline.bus().unwrap();
loop {
match bus.timed_pop(gst::ClockTime::NONE) {
Some(msg)
if matches!(msg.view(), MessageView::StateChanged(state)
if state.current() == gst::State::Playing) =>
{
break;
}
Some(_) | None => continue,
}
}
Ok(VideoStream {
_pipeline: pipeline,
inner: ReceiverStream::new(rx),
})
}