From ae46daa6f83ed0929e6d3e7c902e35fe0241d074 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Wed, 15 Apr 2026 11:37:43 -0300 Subject: [PATCH] fix(server): wait for eye devices before preview startup --- server/src/video.rs | 229 +++++++++++++----- .../tests/server_video_include_contract.rs | 39 +++ 2 files changed, 208 insertions(+), 60 deletions(-) diff --git a/server/src/video.rs b/server/src/video.rs index b57c95b..4c843c8 100644 --- a/server/src/video.rs +++ b/server/src/video.rs @@ -2,15 +2,16 @@ use anyhow::Context; use futures_util::Stream; -use gst::MessageView; use gst::MessageView::*; use gst::prelude::*; use gstreamer as gst; use gstreamer_app as gst_app; use lesavka_common::lesavka::VideoPacket; +use std::os::unix::fs::FileTypeExt; use std::sync::Arc; use std::sync::OnceLock; use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}; +use tokio::time::{Duration, Instant, sleep}; use tokio_stream::wrappers::ReceiverStream; use tonic::Status; use tracing::{Level, debug, enabled, error, info, trace, warn}; @@ -25,6 +26,8 @@ static START: OnceLock = OnceLock::new(); pub struct VideoStream { _pipeline: gst::Pipeline, + #[cfg(not(coverage))] + _bus_watch: Option, inner: ReceiverStream>, } @@ -42,9 +45,162 @@ impl Stream for VideoStream { impl Drop for VideoStream { fn drop(&mut self) { let _ = self._pipeline.set_state(gst::State::Null); + #[cfg(not(coverage))] + { + let _ = self._bus_watch.take(); + } } } +#[cfg(not(coverage))] +struct BusWatchHandle { + alive: Arc, + join: Option>, +} + +#[cfg(not(coverage))] +impl BusWatchHandle { + fn spawn(bus: gst::Bus, eye: String) -> Self { + let alive = Arc::new(AtomicBool::new(true)); + let alive_flag = Arc::clone(&alive); + let join = std::thread::spawn(move || { + while alive_flag.load(Ordering::Relaxed) { + let Some(msg) = bus.timed_pop(gst::ClockTime::from_mseconds(250)) else { + continue; + }; + match msg.view() { + Error(err) => { + error!( + target:"lesavka_server::video", + eye = %eye, + "💥 pipeline error: {} ({})", + err.error(), + err.debug().unwrap_or_default() + ); + break; + } + Warning(warning) => { + warn!( + target:"lesavka_server::video", + eye = %eye, + "⚠️ pipeline warning: {} ({})", + warning.error(), + warning.debug().unwrap_or_default() + ); + } + Info(info_msg) => { + info!( + target:"lesavka_server::video", + eye = %eye, + "📌 pipeline info: {} ({})", + info_msg.error(), + info_msg.debug().unwrap_or_default() + ); + } + StateChanged(state) if state.current() == gst::State::Playing => { + debug!(target:"lesavka_server::video", eye = %eye, "🎬 pipeline PLAYING"); + } + StateChanged(state) if state.current() == gst::State::Null => { + debug!(target:"lesavka_server::video", eye = %eye, "🛑 pipeline stopped"); + break; + } + Eos(..) => { + debug!(target:"lesavka_server::video", eye = %eye, "🏁 pipeline EOS"); + break; + } + _ => {} + } + } + }); + Self { + alive, + join: Some(join), + } + } +} + +#[cfg(not(coverage))] +impl Drop for BusWatchHandle { + fn drop(&mut self) { + self.alive.store(false, Ordering::Relaxed); + if let Some(join) = self.join.take() { + let _ = join.join(); + } + } +} + +#[cfg(not(coverage))] +fn start_eye_pipeline(pipeline: &gst::Pipeline, bus: &gst::Bus, eye: &str) -> anyhow::Result<()> { + pipeline + .set_state(gst::State::Playing) + .context(format!("🎥 starting video pipeline eye-{eye}"))?; + for _ in 0..20 { + match bus.timed_pop(gst::ClockTime::from_mseconds(200)) { + Some(msg) => match msg.view() { + Error(err) => { + let _ = pipeline.set_state(gst::State::Null); + return Err(anyhow::anyhow!( + "🎥 eye-{eye} pipeline error: {} ({})", + err.error(), + err.debug().unwrap_or_default() + )); + } + StateChanged(state) if state.current() == gst::State::Playing => return Ok(()), + _ => continue, + }, + None => continue, + } + } + Ok(()) +} + +#[cfg(not(coverage))] +fn eye_device_wait_timeout() -> Duration { + Duration::from_millis( + std::env::var("LESAVKA_EYE_DEVICE_WAIT_MS") + .ok() + .and_then(|value| value.parse::().ok()) + .unwrap_or(5_000), + ) +} + +#[cfg(not(coverage))] +fn eye_device_wait_poll() -> Duration { + Duration::from_millis( + std::env::var("LESAVKA_EYE_DEVICE_POLL_MS") + .ok() + .and_then(|value| value.parse::().ok()) + .map(|value| value.max(25)) + .unwrap_or(100), + ) +} + +#[cfg(not(coverage))] +async fn wait_for_eye_device(dev: &str, eye: &str) -> anyhow::Result<()> { + let timeout = eye_device_wait_timeout(); + let poll = eye_device_wait_poll(); + let deadline = Instant::now() + timeout; + let last_detail = loop { + let detail = match tokio::fs::metadata(dev).await { + Ok(metadata) if metadata.file_type().is_char_device() => return Ok(()), + Ok(metadata) => format!("device exists but is not a character device ({metadata:?})"), + Err(err) => err.to_string(), + }; + + if Instant::now() >= deadline { + break detail; + } + + sleep(poll).await; + }; + + Err(anyhow::anyhow!( + "🎥 eye-{eye} device {dev} was not ready within {} ms: {}", + timeout.as_millis(), + last_detail + )) +} + /// Capture one eye stream from the local V4L2 gadget and expose it as a gRPC stream. /// /// Inputs: the V4L2 device node, logical eye id, and negotiated bitrate cap. @@ -59,10 +215,9 @@ pub async fn eye_ball(dev: &str, id: u32, _max_bitrate_kbit: u32) -> anyhow::Res } let coverage_override = std::env::var("LESAVKA_TEST_VIDEO_SOURCE").ok(); - let use_test_src = - dev.eq_ignore_ascii_case("testsrc") - || dev.eq_ignore_ascii_case("videotestsrc") - || coverage_override.as_deref() == Some(dev); + let use_test_src = dev.eq_ignore_ascii_case("testsrc") + || dev.eq_ignore_ascii_case("videotestsrc") + || coverage_override.as_deref() == Some(dev); if !use_test_src { return Err(anyhow::anyhow!("video source unavailable")); } @@ -79,6 +234,8 @@ pub async fn eye_ball(dev: &str, id: u32, _max_bitrate_kbit: u32) -> anyhow::Res Ok(VideoStream { _pipeline: pipeline, + #[cfg(not(coverage))] + _bus_watch: None, inner: ReceiverStream::new(rx), }) } @@ -114,6 +271,9 @@ pub async fn eye_ball(dev: &str, id: u32, max_bitrate_kbit: u32) -> anyhow::Resu let appsink_buffers = env_u32("LESAVKA_EYE_APPSINK_BUFFERS", 8).max(1); let use_test_src = dev.eq_ignore_ascii_case("testsrc") || dev.eq_ignore_ascii_case("videotestsrc"); + if !use_test_src { + wait_for_eye_device(dev, eye).await?; + } let desc = if use_test_src { let test_bitrate = env_u32("LESAVKA_EYE_TESTSRC_KBIT", max_bitrate_kbit.max(800)); format!( @@ -148,7 +308,6 @@ pub async fn eye_ball(dev: &str, id: u32, max_bitrate_kbit: u32) -> anyhow::Resu let chan_capacity = env_usize("LESAVKA_EYE_CHAN_CAPACITY", 256).max(16); let (tx, rx) = tokio::sync::mpsc::channel(chan_capacity); - let bus = pipeline.bus().expect("bus"); if let Some(src_pad) = pipeline .by_name(&format!("cam_{eye}")) .and_then(|element| element.static_pad("src")) @@ -165,45 +324,6 @@ pub async fn eye_ball(dev: &str, id: u32, max_bitrate_kbit: u32) -> anyhow::Resu warn!(target:"lesavka_server::video", eye = %eye, "🍪 cam_{eye} not found - skipping pad-probe"); } - let eye_clone = eye.to_owned(); - std::thread::spawn(move || { - for msg in bus.iter_timed(gst::ClockTime::NONE) { - match msg.view() { - Error(err) => { - error!( - target:"lesavka_server::video", - eye = %eye_clone, - "💥 pipeline error: {} ({})", - err.error(), - err.debug().unwrap_or_default() - ); - } - Warning(warning) => { - warn!( - target:"lesavka_server::video", - eye = %eye_clone, - "⚠️ pipeline warning: {} ({})", - warning.error(), - warning.debug().unwrap_or_default() - ); - } - Info(info_msg) => { - info!( - target:"lesavka_server::video", - eye = %eye_clone, - "📌 pipeline info: {} ({})", - info_msg.error(), - info_msg.debug().unwrap_or_default() - ); - } - StateChanged(state) if state.current() == gst::State::Playing => { - debug!(target:"lesavka_server::video", eye = %eye_clone, "🎬 pipeline PLAYING"); - } - _ => {} - } - } - }); - let eye_name = eye.to_string(); sink.set_callbacks( gst_app::AppSinkCallbacks::builder() @@ -321,24 +441,13 @@ pub async fn eye_ball(dev: &str, id: u32, max_bitrate_kbit: u32) -> anyhow::Resu .build(), ); - pipeline - .set_state(gst::State::Playing) - .context("🎥 starting video pipeline eye-{eye}")?; - let bus = pipeline.bus().unwrap(); - for _ in 0..20 { - match bus.timed_pop(gst::ClockTime::from_mseconds(200)) { - Some(msg) - if matches!(msg.view(), MessageView::StateChanged(state) - if state.current() == gst::State::Playing) => - { - break; - } - Some(_) | None => continue, - } - } + let bus = pipeline.bus().expect("bus"); + start_eye_pipeline(&pipeline, &bus, eye)?; + let bus_watch = BusWatchHandle::spawn(bus, eye.to_owned()); Ok(VideoStream { _pipeline: pipeline, + _bus_watch: Some(bus_watch), inner: ReceiverStream::new(rx), }) } diff --git a/testing/tests/server_video_include_contract.rs b/testing/tests/server_video_include_contract.rs index ad833cd..08204de 100644 --- a/testing/tests/server_video_include_contract.rs +++ b/testing/tests/server_video_include_contract.rs @@ -46,6 +46,8 @@ mod video_include_contract { let mut stream = VideoStream { _pipeline: gst::Pipeline::new(), + #[cfg(not(coverage))] + _bus_watch: None, inner: ReceiverStream::new(rx), }; @@ -66,6 +68,8 @@ mod video_include_contract { let (_tx, rx) = tokio::sync::mpsc::channel(1); let stream = VideoStream { _pipeline: gst::Pipeline::new(), + #[cfg(not(coverage))] + _bus_watch: None, inner: ReceiverStream::new(rx), }; drop(stream); @@ -211,4 +215,39 @@ mod video_include_contract { }); }); } + + #[test] + #[serial] + fn wait_for_eye_device_accepts_existing_character_device() { + let rt = tokio::runtime::Runtime::new().expect("runtime"); + with_var("LESAVKA_EYE_DEVICE_WAIT_MS", Some("50"), || { + rt.block_on(async { + wait_for_eye_device("/dev/null", "l") + .await + .expect("/dev/null should be ready"); + }); + }); + } + + #[test] + #[serial] + fn wait_for_eye_device_times_out_for_missing_path() { + let rt = tokio::runtime::Runtime::new().expect("runtime"); + let dir = tempfile::tempdir().expect("tempdir"); + let missing = dir.path().join("lesavka-eye-missing"); + with_var("LESAVKA_EYE_DEVICE_WAIT_MS", Some("50"), || { + with_var("LESAVKA_EYE_DEVICE_POLL_MS", Some("25"), || { + rt.block_on(async { + let err = wait_for_eye_device( + missing.to_str().expect("utf8 path"), + "r", + ) + .await + .expect_err("missing eye device should time out"); + let rendered = format!("{err:#}"); + assert!(rendered.contains("was not ready within 50 ms")); + }); + }); + }); + } }