// server/src/video.rs use anyhow::Context; use futures_util::Stream; use gst::MessageView::*; use gst::prelude::*; use gstreamer as gst; use gstreamer_app as gst_app; use lesavka_common::lesavka::VideoPacket; use lesavka_common::process_metrics::ProcessCpuSampler; use std::os::unix::fs::FileTypeExt; use std::sync::Arc; use std::sync::OnceLock; use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}; use tokio::time::{Duration, Instant, sleep}; use tokio_stream::wrappers::ReceiverStream; use tonic::Status; use tracing::{Level, debug, enabled, error, info, trace, warn}; pub use crate::video_sinks::{CameraRelay, HdmiSink, WebcamSink}; use crate::video_support::{ adjust_effective_fps, contains_idr, default_eye_fps, env_u32, env_usize, should_send_frame, }; const EYE_ID: [&str; 2] = ["l", "r"]; static START: OnceLock = OnceLock::new(); static SERVER_PROCESS_CPU_TENTHS: OnceLock> = OnceLock::new(); fn server_process_cpu_metric() -> Arc { Arc::clone(SERVER_PROCESS_CPU_TENTHS.get_or_init(|| { let metric = Arc::new(AtomicU32::new(0)); let metric_for_thread = Arc::clone(&metric); std::thread::spawn(move || { let mut sampler = ProcessCpuSampler::new(); loop { if let Some(value) = sampler.sample_tenths_percent() { metric_for_thread.store(value, Ordering::Relaxed); } std::thread::sleep(std::time::Duration::from_secs(1)); } }); metric })) } pub struct VideoStream { _pipeline: gst::Pipeline, #[cfg(not(coverage))] _bus_watch: Option, inner: ReceiverStream>, } impl Stream for VideoStream { type Item = Result; fn poll_next( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { Stream::poll_next(std::pin::Pin::new(&mut self.inner), cx) } } impl Drop for VideoStream { fn drop(&mut self) { let _ = self._pipeline.set_state(gst::State::Null); #[cfg(not(coverage))] { let _ = self._bus_watch.take(); } } } #[cfg(not(coverage))] struct BusWatchHandle { alive: Arc, join: Option>, } #[cfg(not(coverage))] impl BusWatchHandle { fn spawn(bus: gst::Bus, eye: String) -> Self { let alive = Arc::new(AtomicBool::new(true)); let alive_flag = Arc::clone(&alive); let join = std::thread::spawn(move || { while alive_flag.load(Ordering::Relaxed) { let Some(msg) = bus.timed_pop(gst::ClockTime::from_mseconds(250)) else { continue; }; match msg.view() { Error(err) => { error!( target:"lesavka_server::video", eye = %eye, "💥 pipeline error: {} ({})", err.error(), err.debug().unwrap_or_default() ); break; } Warning(warning) => { warn!( target:"lesavka_server::video", eye = %eye, "⚠️ pipeline warning: {} ({})", warning.error(), warning.debug().unwrap_or_default() ); } Info(info_msg) => { info!( target:"lesavka_server::video", eye = %eye, "📌 pipeline info: {} ({})", info_msg.error(), info_msg.debug().unwrap_or_default() ); } StateChanged(state) if state.current() == gst::State::Playing => { debug!(target:"lesavka_server::video", eye = %eye, "🎬 pipeline PLAYING"); } StateChanged(state) if state.current() == gst::State::Null => { debug!(target:"lesavka_server::video", eye = %eye, "🛑 pipeline stopped"); break; } Eos(..) => { debug!(target:"lesavka_server::video", eye = %eye, "🏁 pipeline EOS"); break; } _ => {} } } }); Self { alive, join: Some(join), } } } #[cfg(not(coverage))] impl Drop for BusWatchHandle { fn drop(&mut self) { self.alive.store(false, Ordering::Relaxed); if let Some(join) = self.join.take() { let _ = join.join(); } } } #[cfg(not(coverage))] fn start_eye_pipeline(pipeline: &gst::Pipeline, bus: &gst::Bus, eye: &str) -> anyhow::Result<()> { pipeline .set_state(gst::State::Playing) .context(format!("🎥 starting video pipeline eye-{eye}"))?; for _ in 0..20 { match bus.timed_pop(gst::ClockTime::from_mseconds(200)) { Some(msg) => match msg.view() { Error(err) => { let _ = pipeline.set_state(gst::State::Null); return Err(anyhow::anyhow!( "🎥 eye-{eye} pipeline error: {} ({})", err.error(), err.debug().unwrap_or_default() )); } StateChanged(state) if state.current() == gst::State::Playing => return Ok(()), _ => continue, }, None => continue, } } Ok(()) } fn eye_device_wait_timeout() -> Duration { Duration::from_millis( std::env::var("LESAVKA_EYE_DEVICE_WAIT_MS") .ok() .and_then(|value| value.parse::().ok()) .unwrap_or(5_000), ) } fn eye_device_wait_poll() -> Duration { Duration::from_millis( std::env::var("LESAVKA_EYE_DEVICE_POLL_MS") .ok() .and_then(|value| value.parse::().ok()) .map(|value| value.max(25)) .unwrap_or(100), ) } pub fn eye_source_profile() -> (u32, u32, u32) { let width = round_down_even_u32(env_u32("LESAVKA_EYE_SOURCE_WIDTH", 1920).max(320)); let height = round_down_even_u32(env_u32("LESAVKA_EYE_SOURCE_HEIGHT", 1080).max(180)); let fps = env_u32("LESAVKA_EYE_SOURCE_FPS", 30).max(1); (width, height, fps) } fn round_down_even_u32(value: u32) -> u32 { let rounded = value.max(2); rounded - (rounded % 2) } fn reset_stream_telemetry_window( last_window_sec: &AtomicU64, current_sec: u64, source_gap_peak_ms: &AtomicU32, send_gap_peak_ms: &AtomicU32, queue_peak_depth: &AtomicU32, ) { let prev = last_window_sec.load(Ordering::Relaxed); if current_sec <= prev { return; } if last_window_sec .compare_exchange(prev, current_sec, Ordering::SeqCst, Ordering::SeqCst) .is_ok() { source_gap_peak_ms.store(0, Ordering::Relaxed); send_gap_peak_ms.store(0, Ordering::Relaxed); queue_peak_depth.store(0, Ordering::Relaxed); } } #[derive(Clone, Copy, Debug)] struct EyeCaptureRequest { source_width: u32, source_height: u32, requested_width: u32, requested_height: u32, requested_fps: u32, max_bitrate_kbit: u32, downscale: bool, } fn normalize_eye_capture_request( requested_width: u32, requested_height: u32, requested_fps: u32, max_bitrate_kbit: u32, ) -> EyeCaptureRequest { let (source_width, source_height, source_fps) = eye_source_profile(); let requested_width = if requested_width == 0 { source_width } else { round_down_even_u32(requested_width.min(source_width).max(320)) }; let requested_height = if requested_height == 0 { source_height } else { round_down_even_u32(requested_height.min(source_height).max(180)) }; let requested_fps = if requested_fps == 0 { source_fps.max(1) } else { requested_fps.max(1).min(source_fps.max(1)) }; EyeCaptureRequest { source_width, source_height, requested_width, requested_height, requested_fps, max_bitrate_kbit: max_bitrate_kbit.max(800), downscale: requested_width < source_width || requested_height < source_height, } } #[cfg(not(coverage))] async fn wait_for_eye_device(dev: &str, eye: &str) -> anyhow::Result<()> { let timeout = eye_device_wait_timeout(); let poll = eye_device_wait_poll(); let deadline = Instant::now() + timeout; let last_detail = loop { let detail = match tokio::fs::metadata(dev).await { Ok(metadata) if metadata.file_type().is_char_device() => return Ok(()), Ok(metadata) => format!("device exists but is not a character device ({metadata:?})"), Err(err) => err.to_string(), }; if Instant::now() >= deadline { break detail; } sleep(poll).await; }; Err(anyhow::anyhow!( "🎥 eye-{eye} device {dev} was not ready within {} ms: {}", timeout.as_millis(), last_detail )) } #[cfg(coverage)] async fn wait_for_eye_device(dev: &str, eye: &str) -> anyhow::Result<()> { let timeout = eye_device_wait_timeout(); let poll = eye_device_wait_poll(); let deadline = Instant::now() + timeout; let last_detail = loop { let detail = match tokio::fs::metadata(dev).await { Ok(metadata) if metadata.file_type().is_char_device() => return Ok(()), Ok(metadata) => format!("device exists but is not a character device ({metadata:?})"), Err(err) => err.to_string(), }; if Instant::now() >= deadline { break detail; } sleep(poll).await; }; Err(anyhow::anyhow!( "🎥 eye-{eye} device {dev} was not ready within {} ms: {}", timeout.as_millis(), last_detail )) } /// Capture one eye stream from the local V4L2 gadget and expose it as a gRPC stream. /// /// Inputs: the V4L2 device node, logical eye id, and negotiated bitrate cap. /// Outputs: a `VideoStream` that yields H.264 access units for the requested eye. /// Why: the server keeps bitrate-aware pacing close to the capture pipeline so it can drop /// frames before they build up in gRPC queues and destabilize downstream playback. #[cfg(coverage)] pub async fn eye_ball(dev: &str, id: u32, _max_bitrate_kbit: u32) -> anyhow::Result { eye_ball_with_request(dev, id, _max_bitrate_kbit, 0, 0, 0).await } #[cfg(coverage)] pub async fn eye_ball_with_request( dev: &str, id: u32, _max_bitrate_kbit: u32, _requested_width: u32, _requested_height: u32, _requested_fps: u32, ) -> anyhow::Result { let _ = EYE_ID[id as usize]; if dev.contains('"') { return Err(anyhow::anyhow!("invalid video source")); } let coverage_override = std::env::var("LESAVKA_TEST_VIDEO_SOURCE").ok(); let use_test_src = dev.eq_ignore_ascii_case("testsrc") || dev.eq_ignore_ascii_case("videotestsrc") || coverage_override.as_deref() == Some(dev); if !use_test_src { return Err(anyhow::anyhow!("video source unavailable")); } let _ = gst::init(); let pipeline = gst::Pipeline::new(); let (tx, rx) = tokio::sync::mpsc::channel(64); let _ = tx.try_send(Ok(VideoPacket { id: id.min(1), pts: 0, data: vec![0, 0, 0, 1, 0x65, 0x88, 0x84], ..Default::default() })); Ok(VideoStream { _pipeline: pipeline, #[cfg(not(coverage))] _bus_watch: None, inner: ReceiverStream::new(rx), }) } #[cfg(not(coverage))] pub async fn eye_ball(dev: &str, id: u32, max_bitrate_kbit: u32) -> anyhow::Result { eye_ball_with_request(dev, id, max_bitrate_kbit, 0, 0, 0).await } #[cfg(not(coverage))] pub async fn eye_ball_with_request( dev: &str, id: u32, max_bitrate_kbit: u32, requested_width: u32, requested_height: u32, requested_fps: u32, ) -> anyhow::Result { let eye = EYE_ID[id as usize]; gst::init().context("gst init")?; let request = normalize_eye_capture_request( requested_width, requested_height, requested_fps, max_bitrate_kbit, ); let target_fps = if requested_fps > 0 { request.requested_fps } else { env_u32("LESAVKA_EYE_FPS", default_eye_fps(max_bitrate_kbit)).max(1) }; let min_fps = env_u32("LESAVKA_EYE_MIN_FPS", 12).clamp(1, target_fps); let adaptive = std::env::var("LESAVKA_EYE_ADAPTIVE") .map(|value| value != "0") .unwrap_or(true); info!( target: "lesavka_server::video", eye = %eye, max_bitrate_kbit, source_width = request.source_width, source_height = request.source_height, requested_width = request.requested_width, requested_height = request.requested_height, requested_fps = request.requested_fps, target_fps, min_fps, adaptive, "🎥 eye stream profile selected" ); let effective_fps = Arc::new(AtomicU32::new(target_fps)); let dropped_window = Arc::new(AtomicU64::new(0)); let dropped_total = Arc::new(AtomicU64::new(0)); let sent_window = Arc::new(AtomicU64::new(0)); let last_adjust_sec = Arc::new(AtomicU64::new(0)); let wait_for_idr = Arc::new(AtomicBool::new(false)); let last_sent = Arc::new(AtomicU64::new(0)); let last_source_pts = Arc::new(AtomicU64::new(0)); let source_gap_peak_ms = Arc::new(AtomicU32::new(0)); let send_gap_peak_ms = Arc::new(AtomicU32::new(0)); let queue_peak_depth = Arc::new(AtomicU32::new(0)); let last_telemetry_sec = Arc::new(AtomicU64::new(0)); let packet_seq = Arc::new(AtomicU64::new(0)); let queue_buffers = env_u32("LESAVKA_EYE_QUEUE_BUFFERS", 8).max(1); let appsink_buffers = env_u32("LESAVKA_EYE_APPSINK_BUFFERS", 8).max(1); let keyframe_interval = env_u32( "LESAVKA_EYE_KEYFRAME_INTERVAL", request.requested_fps.max(1).min(5), ) .clamp(1, request.requested_fps.max(1)); let use_test_src = dev.eq_ignore_ascii_case("testsrc") || dev.eq_ignore_ascii_case("videotestsrc"); let server_encoder_label = if use_test_src { "x264enc(testsrc)".to_string() } else if request.downscale { "x264enc".to_string() } else { "source-pass-through".to_string() }; let server_process_cpu_tenths = server_process_cpu_metric(); if !use_test_src { wait_for_eye_device(dev, eye).await?; } let desc = if use_test_src { let test_bitrate = env_u32("LESAVKA_EYE_TESTSRC_KBIT", request.max_bitrate_kbit); format!( "videotestsrc name=cam_{eye} is-live=true pattern=smpte ! \ video/x-raw,width={},height={},framerate={}/1 ! \ queue max-size-buffers={queue_buffers} max-size-time=0 max-size-bytes=0 leaky=downstream ! \ x264enc tune=zerolatency speed-preset=veryfast bitrate={test_bitrate} key-int-max={keyframe_interval} ! \ h264parse disable-passthrough=true config-interval=-1 ! \ video/x-h264,stream-format=byte-stream,alignment=au ! \ appsink name=sink emit-signals=true max-buffers={appsink_buffers} drop=true", request.requested_width, request.requested_height, request.requested_fps, ) } else if request.downscale { format!( "v4l2src name=cam_{eye} device=\"{dev}\" io-mode=mmap do-timestamp=true ! \ queue max-size-buffers={queue_buffers} max-size-time=0 max-size-bytes=0 leaky=downstream ! \ h264parse disable-passthrough=true config-interval=-1 ! avdec_h264 ! videoconvert ! \ videoscale ! videorate ! video/x-raw,width={},height={},framerate={}/1,pixel-aspect-ratio=1/1 ! \ x264enc tune=zerolatency speed-preset=faster bitrate={} key-int-max={} ! \ h264parse disable-passthrough=true config-interval=-1 ! \ video/x-h264,stream-format=byte-stream,alignment=au ! \ appsink name=sink emit-signals=true max-buffers={appsink_buffers} drop=true", request.requested_width, request.requested_height, request.requested_fps, request.max_bitrate_kbit, keyframe_interval, ) } else { format!( "v4l2src name=cam_{eye} device=\"{dev}\" io-mode=mmap do-timestamp=true ! \ queue max-size-buffers={queue_buffers} max-size-time=0 max-size-bytes=0 leaky=downstream ! \ h264parse disable-passthrough=true config-interval=-1 ! \ video/x-h264,stream-format=byte-stream,alignment=au ! \ appsink name=sink emit-signals=true max-buffers={appsink_buffers} drop=true" ) }; let pipeline = gst::parse::launch(&desc)? .downcast::() .expect("not a pipeline"); let sink = pipeline .by_name("sink") .expect("appsink") .dynamic_cast::() .expect("appsink down-cast"); let chan_capacity = env_usize("LESAVKA_EYE_CHAN_CAPACITY", 256).max(16); let (tx, rx) = tokio::sync::mpsc::channel(chan_capacity); if let Some(src_pad) = pipeline .by_name(&format!("cam_{eye}")) .and_then(|element| element.static_pad("src")) { src_pad.add_probe(gst::PadProbeType::EVENT_DOWNSTREAM, |pad, info| { if let Some(gst::PadProbeData::Event(ref event)) = info.data { if let gst::EventView::Caps(caps) = event.view() { trace!(target:"lesavka_server::video", ?caps, "🔍 new caps on {}", pad.name()); } } gst::PadProbeReturn::Ok }); } else { warn!(target:"lesavka_server::video", eye = %eye, "🍪 cam_{eye} not found - skipping pad-probe"); } let eye_name = eye.to_string(); let dropped_total_for_cb = Arc::clone(&dropped_total); let packet_seq_for_cb = Arc::clone(&packet_seq); let effective_fps_for_cb = Arc::clone(&effective_fps); let last_source_pts_for_cb = Arc::clone(&last_source_pts); let source_gap_peak_ms_for_cb = Arc::clone(&source_gap_peak_ms); let send_gap_peak_ms_for_cb = Arc::clone(&send_gap_peak_ms); let queue_peak_depth_for_cb = Arc::clone(&queue_peak_depth); let last_telemetry_sec_for_cb = Arc::clone(&last_telemetry_sec); let server_encoder_label_for_cb = server_encoder_label.clone(); let server_process_cpu_tenths_for_cb = Arc::clone(&server_process_cpu_tenths); sink.set_callbacks( gst_app::AppSinkCallbacks::builder() .new_sample(move |sink| { let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?; let buffer = sample.buffer().ok_or(gst::FlowError::Error)?; let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?; let is_idr = contains_idr(map.as_slice()); static FRAME: AtomicU64 = AtomicU64::new(0); let frame = FRAME.fetch_add(1, Ordering::Relaxed); if frame % 120 == 0 && is_idr { trace!(target: "lesavka_server::video", "eye-{eye}: delivered {frame} frames"); if enabled!(Level::TRACE) { let path = format!("/tmp/eye-{eye}-srv-{frame:05}.h264"); std::fs::write(&path, map.as_slice()).ok(); } } else if frame < 10 { debug!( target: "lesavka_server::video", eye = eye, frame, bytes = map.len(), pts = ?buffer.pts(), "⬆️ pushed video sample eye-{eye}" ); } if enabled!(Level::TRACE) && is_idr { trace!("eye-{eye}: IDR"); } let origin = *START.get_or_init(|| buffer.pts().unwrap_or(gst::ClockTime::ZERO)); let pts_us = buffer .pts() .unwrap_or(gst::ClockTime::ZERO) .saturating_sub(origin) .nseconds() / 1_000; let sec = pts_us / 1_000_000; reset_stream_telemetry_window( &last_telemetry_sec_for_cb, sec, &source_gap_peak_ms_for_cb, &send_gap_peak_ms_for_cb, &queue_peak_depth_for_cb, ); let previous_source_pts = last_source_pts_for_cb.swap(pts_us, Ordering::Relaxed); if previous_source_pts > 0 && pts_us > previous_source_pts { let source_gap_ms = ((pts_us.saturating_sub(previous_source_pts)) / 1_000) as u32; source_gap_peak_ms_for_cb.fetch_max(source_gap_ms, Ordering::Relaxed); } if adaptive { let prev = last_adjust_sec.load(Ordering::Relaxed); if sec > prev && last_adjust_sec .compare_exchange(prev, sec, Ordering::SeqCst, Ordering::SeqCst) .is_ok() { let dropped = dropped_window.swap(0, Ordering::Relaxed); let sent = sent_window.swap(0, Ordering::Relaxed); let current = effective_fps.load(Ordering::Relaxed).max(1); let next = adjust_effective_fps(current, min_fps, target_fps, dropped, sent); if next != current { effective_fps.store(next, Ordering::Relaxed); if next < current { warn!( target: "lesavka_server::video", eye = %eye_name, fps = next, "🎥 adaptive eye fps ↓" ); } else { info!( target: "lesavka_server::video", eye = %eye_name, fps = next, "🎥 adaptive eye fps ↑" ); } } } } let current_fps = effective_fps.load(Ordering::Relaxed).max(1); let last = last_sent.load(Ordering::Relaxed); if !should_send_frame(last, pts_us, current_fps) { return Ok(gst::FlowSuccess::Ok); } if last > 0 && pts_us > last { let send_gap_ms = ((pts_us.saturating_sub(last)) / 1_000) as u32; send_gap_peak_ms_for_cb.fetch_max(send_gap_ms, Ordering::Relaxed); } last_sent.store(pts_us, Ordering::Relaxed); if wait_for_idr.load(Ordering::Relaxed) && !is_idr { return Ok(gst::FlowSuccess::Ok); } let data = map.as_slice().to_vec(); let size = data.len(); let seq = packet_seq_for_cb.fetch_add(1, Ordering::Relaxed) + 1; let queue_depth = (chan_capacity.saturating_sub(tx.capacity())) as u32; queue_peak_depth_for_cb.fetch_max(queue_depth, Ordering::Relaxed); let pkt = VideoPacket { id, pts: pts_us, data, seq, effective_fps: effective_fps_for_cb.load(Ordering::Relaxed).max(1), dropped_total: dropped_total_for_cb.load(Ordering::Relaxed), queue_depth, server_source_gap_peak_ms: source_gap_peak_ms_for_cb.load(Ordering::Relaxed), server_send_gap_peak_ms: send_gap_peak_ms_for_cb.load(Ordering::Relaxed), server_queue_peak: queue_peak_depth_for_cb.load(Ordering::Relaxed), server_encoder_label: server_encoder_label_for_cb.clone(), server_process_cpu_tenths: server_process_cpu_tenths_for_cb .load(Ordering::Relaxed), }; match tx.try_send(Ok(pkt)) { Ok(_) => { sent_window.fetch_add(1, Ordering::Relaxed); if is_idr { wait_for_idr.store(false, Ordering::Relaxed); } trace!(target:"lesavka_server::video", eye = %eye, size = size, "🎥📤 sent"); } Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { dropped_window.fetch_add(1, Ordering::Relaxed); dropped_total_for_cb.fetch_add(1, Ordering::Relaxed); wait_for_idr.store(true, Ordering::Relaxed); static DROP_CNT: AtomicU64 = AtomicU64::new(0); let dropped = DROP_CNT.fetch_add(1, Ordering::Relaxed); if dropped % 120 == 0 { debug!( target:"lesavka_server::video", eye = %eye, dropped, "🎥⏳ channel full - dropping frames" ); } } Err(error) => error!("mpsc send err: {error}"), } Ok(gst::FlowSuccess::Ok) }) .build(), ); let bus = pipeline.bus().expect("bus"); start_eye_pipeline(&pipeline, &bus, eye)?; let bus_watch = BusWatchHandle::spawn(bus, eye.to_owned()); Ok(VideoStream { _pipeline: pipeline, _bus_watch: Some(bus_watch), inner: ReceiverStream::new(rx), }) }