lesavka/server/src/video.rs

715 lines
26 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// server/src/video.rs
use anyhow::Context;
use futures_util::Stream;
use gst::MessageView;
use gst::MessageView::*;
use gst::prelude::*;
use gstreamer as gst;
use gstreamer_app as gst_app;
use lesavka_common::lesavka::VideoPacket;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;
use tracing::{Level, debug, enabled, error, info, trace, warn};
use crate::camera::{CameraCodec, CameraConfig};
const EYE_ID: [&str; 2] = ["l", "r"];
static START: std::sync::OnceLock<gst::ClockTime> = std::sync::OnceLock::new();
static DEV_MODE: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
fn env_u32(name: &str, default: u32) -> u32 {
std::env::var(name)
.ok()
.and_then(|v| v.parse::<u32>().ok())
.unwrap_or(default)
}
fn dev_mode_enabled() -> bool {
*DEV_MODE.get_or_init(|| std::env::var("LESAVKA_DEV_MODE").is_ok())
}
fn pick_h264_decoder() -> &'static str {
if gst::ElementFactory::find("v4l2h264dec").is_some() {
"v4l2h264dec"
} else if gst::ElementFactory::find("v4l2slh264dec").is_some() {
"v4l2slh264dec"
} else if gst::ElementFactory::find("omxh264dec").is_some() {
"omxh264dec"
} else {
"avdec_h264"
}
}
fn contains_idr(h264: &[u8]) -> bool {
// naive AnnexB scan for H.264 IDR (NAL type 5)
let mut i = 0;
while i + 4 < h264.len() {
// find start code 0x000001 or 0x00000001
if h264[i] == 0 && h264[i + 1] == 0 {
let offset = if h264[i + 2] == 1 {
3
} else if h264[i + 2] == 0 && h264[i + 3] == 1 {
4
} else {
i += 1;
continue;
};
let nal_idx = i + offset;
if nal_idx < h264.len() {
let nal = h264[nal_idx] & 0x1F;
if nal == 5 {
return true;
}
}
}
i += 1;
}
false
}
pub struct VideoStream {
_pipeline: gst::Pipeline,
inner: ReceiverStream<Result<VideoPacket, Status>>,
}
impl Stream for VideoStream {
type Item = Result<VideoPacket, Status>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
Stream::poll_next(std::pin::Pin::new(&mut self.inner), cx)
}
}
impl Drop for VideoStream {
fn drop(&mut self) {
// shut down nicely - avoids the “dispose element … READY/PLAYING …” spam
let _ = self._pipeline.set_state(gst::State::Null);
}
}
pub async fn eye_ball(dev: &str, id: u32, max_bitrate_kbit: u32) -> anyhow::Result<VideoStream> {
let eye = EYE_ID[id as usize];
gst::init().context("gst init")?;
let bitrate_default_fps = match max_bitrate_kbit {
0 => 25,
1..=2_500 => 15,
2_501..=4_000 => 20,
_ => 25,
};
let target_fps = env_u32("LESAVKA_EYE_FPS", bitrate_default_fps).max(1);
let min_fps = env_u32("LESAVKA_EYE_MIN_FPS", 12).clamp(1, target_fps);
let adaptive = std::env::var("LESAVKA_EYE_ADAPTIVE")
.map(|v| v != "0")
.unwrap_or(true);
info!(
target: "lesavka_server::video",
eye = %eye,
max_bitrate_kbit,
target_fps,
min_fps,
adaptive,
"🎥 eye stream profile selected"
);
let effective_fps = Arc::new(std::sync::atomic::AtomicU32::new(target_fps));
let dropped_window = Arc::new(AtomicU64::new(0));
let sent_window = Arc::new(AtomicU64::new(0));
let last_adjust_sec = Arc::new(AtomicU64::new(0));
let wait_for_idr = Arc::new(AtomicBool::new(false));
let last_sent = Arc::new(AtomicU64::new(0));
let desc = format!(
"v4l2src name=cam_{eye} device=\"{dev}\" io-mode=mmap do-timestamp=true ! \
queue ! \
h264parse disable-passthrough=true config-interval=-1 ! \
video/x-h264,stream-format=byte-stream,alignment=au ! \
appsink name=sink emit-signals=true max-buffers=32 drop=true"
);
// let desc = format!(
// "v4l2src device={dev} io-mode=mmap ! \
// queue max-size-buffers=0 max-size-bytes=0 max-size-time=0 ! tsdemux name=d ! \
// video/x-h264,stream-format=byte-stream,alignment=au,profile=high ! tsdemux name=d ! \
// d. ! h264parse config-interval=1 ! queue ! appsink name=vsink emit-signals=true \
// d. ! aacparse ! queue ! h264parse config-interval=1 ! appsink name=sink \
// emit-signals=true drop=false sync=false"
// );
let pipeline = gst::parse::launch(&desc)?
.downcast::<gst::Pipeline>()
.expect("not a pipeline");
// let pipeline: gst::Pipeline = gst::parse_launch(&desc)?
// .downcast()
// .expect("not a pipeline");
let sink = pipeline
.by_name("sink")
.expect("appsink")
.dynamic_cast::<gst_app::AppSink>()
.expect("appsink down-cast");
let (tx, rx) = tokio::sync::mpsc::channel(8192);
/* ----- BUS WATCH: show errors & warnings immediately --------------- */
let bus = pipeline.bus().expect("bus");
if let Some(src_pad) = pipeline
.by_name(&format!("cam_{eye}"))
.and_then(|e| e.static_pad("src"))
{
src_pad.add_probe(gst::PadProbeType::EVENT_DOWNSTREAM, |pad, info| {
if let Some(gst::PadProbeData::Event(ref ev)) = info.data {
if let gst::EventView::Caps(c) = ev.view() {
trace!(target:"lesavka_server::video",
?c, "🔍 new caps on {}", pad.name());
}
}
gst::PadProbeReturn::Ok
});
} else {
warn!(target:"lesavka_server::video",
eye = %eye,
"🍪 cam_{eye} not found - skipping pad-probe");
}
let eye_clone = eye.to_owned();
std::thread::spawn(move || {
for msg in bus.iter_timed(gst::ClockTime::NONE) {
match msg.view() {
Error(err) => {
error!(target:"lesavka_server::video",
eye = %eye_clone,
"💥 pipeline error: {} ({})",
err.error(), err.debug().unwrap_or_default());
}
Warning(w) => {
warn!(target:"lesavka_server::video",
eye = %eye_clone,
"⚠️ pipeline warning: {} ({})",
w.error(), w.debug().unwrap_or_default());
}
Info(i) => {
info!(target:"lesavka_server::video",
eye = %eye_clone,
"📌 pipeline info: {} ({})",
i.error(), i.debug().unwrap_or_default());
}
StateChanged(s) if s.current() == gst::State::Playing => {
debug!(target:"lesavka_server::video",
eye = %eye_clone,
"🎬 pipeline PLAYING");
}
_ => {}
}
}
});
let last_sent_cloned = last_sent.clone();
let effective_fps_cloned = effective_fps.clone();
let dropped_window_cloned = dropped_window.clone();
let sent_window_cloned = sent_window.clone();
let last_adjust_sec_cloned = last_adjust_sec.clone();
let wait_for_idr_cloned = wait_for_idr.clone();
let eye_name = eye.to_string();
sink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample(move |sink| {
/* -------- pull frame ---------- */
let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
let buffer = sample.buffer().ok_or(gst::FlowError::Error)?;
/* -------- map once, reuse ----- */
let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?;
let is_idr = contains_idr(map.as_slice());
/* -------- basic counters ------ */
static FRAME: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
let n = FRAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n % 120 == 0 && is_idr {
trace!(target: "lesavka_server::video", "eye-{eye}: delivered {n} frames");
if enabled!(Level::TRACE) {
let path = format!("/tmp/eye-{eye}-srv-{:05}.h264", n);
std::fs::write(&path, map.as_slice()).ok();
}
} else if n < 10 {
debug!(target: "lesavka_server::video",
eye = eye, frame = n, bytes = map.len(),
pts = ?buffer.pts(), "⬆️ pushed video sample eye-{eye}");
}
/* -------- detect SPS / IDR ---- */
if enabled!(Level::DEBUG) && is_idr {
debug!("eye-{eye}: IDR");
}
/* -------- timestamps ---------- */
let origin = *START.get_or_init(|| buffer.pts().unwrap_or(gst::ClockTime::ZERO));
let pts_us = buffer
.pts()
.unwrap_or(gst::ClockTime::ZERO)
.saturating_sub(origin)
.nseconds()
/ 1_000;
if adaptive {
let sec = pts_us / 1_000_000;
let prev = last_adjust_sec_cloned.load(Ordering::Relaxed);
if sec > prev
&& last_adjust_sec_cloned
.compare_exchange(prev, sec, Ordering::SeqCst, Ordering::SeqCst)
.is_ok()
{
let dropped = dropped_window_cloned.swap(0, Ordering::Relaxed);
let sent = sent_window_cloned.swap(0, Ordering::Relaxed);
let total = dropped + sent;
if total > 0 {
let drop_ratio = dropped as f64 / total as f64;
let mut fps = effective_fps_cloned.load(Ordering::Relaxed).max(1);
if drop_ratio > 0.20 && fps > min_fps {
fps = fps.saturating_sub(2).max(min_fps);
effective_fps_cloned.store(fps, Ordering::Relaxed);
warn!(
target: "lesavka_server::video",
eye = %eye_name,
fps,
drop_ratio = %format_args!("{drop_ratio:.2}"),
"🎥 adaptive eye fps ↓"
);
} else if dropped == 0 && drop_ratio < 0.02 && fps < target_fps {
fps = (fps + 1).min(target_fps);
effective_fps_cloned.store(fps, Ordering::Relaxed);
info!(
target: "lesavka_server::video",
eye = %eye_name,
fps,
"🎥 adaptive eye fps ↑"
);
}
}
}
}
let cur_fps = effective_fps_cloned.load(Ordering::Relaxed).max(1);
let frame_interval_us = 1_000_000u64 / cur_fps as u64;
if frame_interval_us > 0 {
let last = last_sent_cloned.load(Ordering::Relaxed);
if last != 0 && pts_us.saturating_sub(last) < frame_interval_us {
return Ok(gst::FlowSuccess::Ok);
}
last_sent_cloned.store(pts_us, Ordering::Relaxed);
}
if wait_for_idr_cloned.load(Ordering::Relaxed) && !is_idr {
return Ok(gst::FlowSuccess::Ok);
}
/* -------- ship over gRPC ----- */
let data = map.as_slice().to_vec();
let size = data.len();
let pkt = VideoPacket {
id,
pts: pts_us,
data,
};
match tx.try_send(Ok(pkt)) {
Ok(_) => {
sent_window_cloned.fetch_add(1, Ordering::Relaxed);
if is_idr {
wait_for_idr_cloned.store(false, Ordering::Relaxed);
}
trace!(target:"lesavka_server::video",
eye = %eye,
size = size,
"🎥📤 sent");
}
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
dropped_window_cloned.fetch_add(1, Ordering::Relaxed);
wait_for_idr_cloned.store(true, Ordering::Relaxed);
static DROP_CNT: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
let c = DROP_CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if c % 120 == 0 {
debug!(target:"lesavka_server::video",
eye = %eye,
dropped = c,
"🎥⏳ channel full - dropping frames");
}
}
Err(e) => error!("mpsc send err: {e}"),
}
Ok(gst::FlowSuccess::Ok)
})
.build(),
);
pipeline
.set_state(gst::State::Playing)
.context("🎥 starting video pipeline eye-{eye}")?;
let bus = pipeline.bus().unwrap();
loop {
match bus.timed_pop(gst::ClockTime::NONE) {
Some(msg)
if matches!(msg.view(), MessageView::StateChanged(s)
if s.current() == gst::State::Playing) =>
{
break;
}
Some(_) => continue,
None => continue,
}
}
Ok(VideoStream {
_pipeline: pipeline,
inner: ReceiverStream::new(rx),
})
}
pub struct WebcamSink {
appsrc: gst_app::AppSrc,
_pipe: gst::Pipeline,
}
impl WebcamSink {
pub fn new(uvc_dev: &str, cfg: &CameraConfig) -> anyhow::Result<Self> {
gst::init()?;
let pipeline = gst::Pipeline::new();
let width = cfg.width as i32;
let height = cfg.height as i32;
let fps = cfg.fps.max(1) as i32;
let use_mjpeg = matches!(cfg.codec, CameraCodec::Mjpeg);
let src = gst::ElementFactory::make("appsrc")
.build()?
.downcast::<gst_app::AppSrc>()
.expect("appsrc");
src.set_is_live(true);
src.set_format(gst::Format::Time);
let block = std::env::var("LESAVKA_UVC_APP_BLOCK")
.ok()
.map(|v| v != "0")
.unwrap_or(false);
src.set_property("block", &block);
if use_mjpeg {
let caps_mjpeg = gst::Caps::builder("image/jpeg")
.field("parsed", true)
.field("width", width)
.field("height", height)
.field("framerate", gst::Fraction::new(fps, 1))
.field("pixel-aspect-ratio", gst::Fraction::new(1, 1))
.field("colorimetry", "2:4:7:1")
.build();
src.set_caps(Some(&caps_mjpeg));
let queue = gst::ElementFactory::make("queue").build()?;
let capsfilter = gst::ElementFactory::make("capsfilter")
.property("caps", &caps_mjpeg)
.build()?;
let sink = gst::ElementFactory::make("v4l2sink")
.property("device", &uvc_dev)
.property("sync", &false)
.build()?;
pipeline.add_many(&[src.upcast_ref(), &queue, &capsfilter, &sink])?;
gst::Element::link_many(&[src.upcast_ref(), &queue, &capsfilter, &sink])?;
} else {
let caps_h264 = gst::Caps::builder("video/x-h264")
.field("stream-format", "byte-stream")
.field("alignment", "au")
.build();
let raw_caps = gst::Caps::builder("video/x-raw")
.field("format", "YUY2")
.field("width", width)
.field("height", height)
.field("framerate", gst::Fraction::new(fps, 1))
.build();
src.set_caps(Some(&caps_h264));
let h264parse = gst::ElementFactory::make("h264parse").build()?;
let decoder_name = pick_h264_decoder();
let decoder = gst::ElementFactory::make(decoder_name)
.build()
.with_context(|| format!("building decoder element {decoder_name}"))?;
let convert = gst::ElementFactory::make("videoconvert").build()?;
let scale = gst::ElementFactory::make("videoscale").build()?;
let caps = gst::ElementFactory::make("capsfilter")
.property("caps", &raw_caps)
.build()?;
let sink = gst::ElementFactory::make("v4l2sink")
.property("device", &uvc_dev)
.property("sync", &false)
.build()?;
pipeline.add_many(&[
src.upcast_ref(),
&h264parse,
&decoder,
&convert,
&scale,
&caps,
&sink,
])?;
gst::Element::link_many(&[
src.upcast_ref(),
&h264parse,
&decoder,
&convert,
&scale,
&caps,
&sink,
])?;
}
pipeline.set_state(gst::State::Playing)?;
Ok(Self {
appsrc: src,
_pipe: pipeline,
})
}
pub fn push(&self, pkt: VideoPacket) {
let mut buf = gst::Buffer::from_slice(pkt.data);
buf.get_mut()
.unwrap()
.set_pts(Some(gst::ClockTime::from_useconds(pkt.pts)));
if let Err(err) = self.appsrc.push_buffer(buf) {
tracing::warn!(target:"lesavka_server::video", %err, "📸⚠️ appsrc push failed");
}
}
}
pub struct HdmiSink {
appsrc: gst_app::AppSrc,
_pipe: gst::Pipeline,
}
impl HdmiSink {
pub fn new(cfg: &CameraConfig) -> anyhow::Result<Self> {
gst::init()?;
let pipeline = gst::Pipeline::new();
let width = cfg.width as i32;
let height = cfg.height as i32;
let fps = cfg.fps.max(1) as i32;
let src = gst::ElementFactory::make("appsrc")
.build()?
.downcast::<gst_app::AppSrc>()
.expect("appsrc");
src.set_is_live(true);
src.set_format(gst::Format::Time);
let raw_caps = gst::Caps::builder("video/x-raw")
.field("width", width)
.field("height", height)
.field("framerate", gst::Fraction::new(fps, 1))
.build();
let capsfilter = gst::ElementFactory::make("capsfilter")
.property("caps", &raw_caps)
.build()?;
let queue = gst::ElementFactory::make("queue")
.property("max-size-buffers", 4u32)
.build()?;
let convert = gst::ElementFactory::make("videoconvert").build()?;
let scale = gst::ElementFactory::make("videoscale").build()?;
let sink = build_hdmi_sink(cfg)?;
match cfg.codec {
CameraCodec::H264 => {
let caps_h264 = gst::Caps::builder("video/x-h264")
.field("stream-format", "byte-stream")
.field("alignment", "au")
.build();
src.set_caps(Some(&caps_h264));
let h264parse = gst::ElementFactory::make("h264parse").build()?;
let decoder_name = pick_h264_decoder();
let decoder = gst::ElementFactory::make(decoder_name)
.build()
.with_context(|| format!("building decoder element {decoder_name}"))?;
pipeline.add_many(&[
src.upcast_ref(),
&queue,
&h264parse,
&decoder,
&convert,
&scale,
&capsfilter,
&sink,
])?;
gst::Element::link_many(&[
src.upcast_ref(),
&queue,
&h264parse,
&decoder,
&convert,
&scale,
&capsfilter,
&sink,
])?;
}
CameraCodec::Mjpeg => {
let caps_mjpeg = gst::Caps::builder("image/jpeg")
.field("parsed", true)
.field("width", width)
.field("height", height)
.field("framerate", gst::Fraction::new(fps, 1))
.build();
src.set_caps(Some(&caps_mjpeg));
let jpegdec = gst::ElementFactory::make("jpegdec").build()?;
pipeline.add_many(&[
src.upcast_ref(),
&queue,
&jpegdec,
&convert,
&scale,
&capsfilter,
&sink,
])?;
gst::Element::link_many(&[
src.upcast_ref(),
&queue,
&jpegdec,
&convert,
&scale,
&capsfilter,
&sink,
])?;
}
}
pipeline.set_state(gst::State::Playing)?;
Ok(Self {
appsrc: src,
_pipe: pipeline,
})
}
pub fn push(&self, pkt: VideoPacket) {
let mut buf = gst::Buffer::from_slice(pkt.data);
buf.get_mut()
.unwrap()
.set_pts(Some(gst::ClockTime::from_useconds(pkt.pts)));
if let Err(err) = self.appsrc.push_buffer(buf) {
tracing::warn!(target:"lesavka_server::video", %err, "📺⚠️ HDMI appsrc push failed");
}
}
}
fn build_hdmi_sink(cfg: &CameraConfig) -> anyhow::Result<gst::Element> {
if let Ok(name) = std::env::var("LESAVKA_HDMI_SINK") {
return gst::ElementFactory::make(&name)
.build()
.context("building HDMI sink");
}
if gst::ElementFactory::find("kmssink").is_some() {
let sink = gst::ElementFactory::make("kmssink").build()?;
if let Some(connector) = cfg.hdmi.as_ref().and_then(|h| h.id) {
if sink.has_property("connector-id", None) {
sink.set_property("connector-id", &(connector as i32));
} else {
tracing::warn!(
target: "lesavka_server::video",
%connector,
"kmssink does not expose connector-id property; using default connector"
);
}
}
sink.set_property("sync", &false);
return Ok(sink);
}
let sink = gst::ElementFactory::make("autovideosink")
.build()
.context("building HDMI sink")?;
let _ = sink.set_property("sync", &false);
Ok(sink)
}
/*─────────────────────────────────*/
/* gRPC → CameraSink relay */
/*─────────────────────────────────*/
enum CameraSink {
Uvc(WebcamSink),
Hdmi(HdmiSink),
}
impl CameraSink {
fn push(&self, pkt: VideoPacket) {
match self {
CameraSink::Uvc(sink) => sink.push(pkt),
CameraSink::Hdmi(sink) => sink.push(pkt),
}
}
}
pub struct CameraRelay {
sink: CameraSink,
id: u32, // gRPC “id” (for future multicam)
frames: std::sync::atomic::AtomicU64,
}
impl CameraRelay {
pub fn new_uvc(id: u32, uvc_dev: &str, cfg: &CameraConfig) -> anyhow::Result<Self> {
Ok(Self {
sink: CameraSink::Uvc(WebcamSink::new(uvc_dev, cfg)?),
id,
frames: std::sync::atomic::AtomicU64::new(0),
})
}
pub fn new_hdmi(id: u32, cfg: &CameraConfig) -> anyhow::Result<Self> {
Ok(Self {
sink: CameraSink::Hdmi(HdmiSink::new(cfg)?),
id,
frames: std::sync::atomic::AtomicU64::new(0),
})
}
/// Push one VideoPacket coming from the client
pub fn feed(&self, pkt: VideoPacket) {
let n = self
.frames
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n < 10 || n % 60 == 0 {
tracing::debug!(target:"lesavka_server::video",
cam_id = self.id,
frame = n,
bytes = pkt.data.len(),
pts = pkt.pts,
"📸 srv webcam frame");
} else if n % 10 == 0 {
tracing::trace!(target:"lesavka_server::video",
cam_id = self.id,
bytes = pkt.data.len(),
"📸📥 srv pkt");
}
if dev_mode_enabled()
&& (cfg!(debug_assertions) || tracing::enabled!(tracing::Level::TRACE))
&& contains_idr(&pkt.data)
{
let path = format!("/tmp/eye3-cli-{n:05}.h264");
if let Err(e) = std::fs::write(&path, &pkt.data) {
tracing::warn!("📸💾 dump failed: {e}");
} else {
tracing::debug!("📸💾 wrote {}", path);
}
}
self.sink.push(pkt);
}
}