fix(server): wait for eye devices before preview startup

This commit is contained in:
Brad Stein 2026-04-15 11:37:43 -03:00
parent 0ab5ce82ed
commit ae46daa6f8
2 changed files with 208 additions and 60 deletions

View File

@ -2,15 +2,16 @@
use anyhow::Context;
use futures_util::Stream;
use gst::MessageView;
use gst::MessageView::*;
use gst::prelude::*;
use gstreamer as gst;
use gstreamer_app as gst_app;
use lesavka_common::lesavka::VideoPacket;
use std::os::unix::fs::FileTypeExt;
use std::sync::Arc;
use std::sync::OnceLock;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use tokio::time::{Duration, Instant, sleep};
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;
use tracing::{Level, debug, enabled, error, info, trace, warn};
@ -25,6 +26,8 @@ static START: OnceLock<gst::ClockTime> = OnceLock::new();
pub struct VideoStream {
_pipeline: gst::Pipeline,
#[cfg(not(coverage))]
_bus_watch: Option<BusWatchHandle>,
inner: ReceiverStream<Result<VideoPacket, Status>>,
}
@ -42,9 +45,162 @@ impl Stream for VideoStream {
impl Drop for VideoStream {
fn drop(&mut self) {
let _ = self._pipeline.set_state(gst::State::Null);
#[cfg(not(coverage))]
{
let _ = self._bus_watch.take();
}
}
}
#[cfg(not(coverage))]
struct BusWatchHandle {
alive: Arc<AtomicBool>,
join: Option<std::thread::JoinHandle<()>>,
}
#[cfg(not(coverage))]
impl BusWatchHandle {
fn spawn(bus: gst::Bus, eye: String) -> Self {
let alive = Arc::new(AtomicBool::new(true));
let alive_flag = Arc::clone(&alive);
let join = std::thread::spawn(move || {
while alive_flag.load(Ordering::Relaxed) {
let Some(msg) = bus.timed_pop(gst::ClockTime::from_mseconds(250)) else {
continue;
};
match msg.view() {
Error(err) => {
error!(
target:"lesavka_server::video",
eye = %eye,
"💥 pipeline error: {} ({})",
err.error(),
err.debug().unwrap_or_default()
);
break;
}
Warning(warning) => {
warn!(
target:"lesavka_server::video",
eye = %eye,
"⚠️ pipeline warning: {} ({})",
warning.error(),
warning.debug().unwrap_or_default()
);
}
Info(info_msg) => {
info!(
target:"lesavka_server::video",
eye = %eye,
"📌 pipeline info: {} ({})",
info_msg.error(),
info_msg.debug().unwrap_or_default()
);
}
StateChanged(state) if state.current() == gst::State::Playing => {
debug!(target:"lesavka_server::video", eye = %eye, "🎬 pipeline PLAYING");
}
StateChanged(state) if state.current() == gst::State::Null => {
debug!(target:"lesavka_server::video", eye = %eye, "🛑 pipeline stopped");
break;
}
Eos(..) => {
debug!(target:"lesavka_server::video", eye = %eye, "🏁 pipeline EOS");
break;
}
_ => {}
}
}
});
Self {
alive,
join: Some(join),
}
}
}
#[cfg(not(coverage))]
impl Drop for BusWatchHandle {
fn drop(&mut self) {
self.alive.store(false, Ordering::Relaxed);
if let Some(join) = self.join.take() {
let _ = join.join();
}
}
}
#[cfg(not(coverage))]
fn start_eye_pipeline(pipeline: &gst::Pipeline, bus: &gst::Bus, eye: &str) -> anyhow::Result<()> {
pipeline
.set_state(gst::State::Playing)
.context(format!("🎥 starting video pipeline eye-{eye}"))?;
for _ in 0..20 {
match bus.timed_pop(gst::ClockTime::from_mseconds(200)) {
Some(msg) => match msg.view() {
Error(err) => {
let _ = pipeline.set_state(gst::State::Null);
return Err(anyhow::anyhow!(
"🎥 eye-{eye} pipeline error: {} ({})",
err.error(),
err.debug().unwrap_or_default()
));
}
StateChanged(state) if state.current() == gst::State::Playing => return Ok(()),
_ => continue,
},
None => continue,
}
}
Ok(())
}
#[cfg(not(coverage))]
fn eye_device_wait_timeout() -> Duration {
Duration::from_millis(
std::env::var("LESAVKA_EYE_DEVICE_WAIT_MS")
.ok()
.and_then(|value| value.parse::<u64>().ok())
.unwrap_or(5_000),
)
}
#[cfg(not(coverage))]
fn eye_device_wait_poll() -> Duration {
Duration::from_millis(
std::env::var("LESAVKA_EYE_DEVICE_POLL_MS")
.ok()
.and_then(|value| value.parse::<u64>().ok())
.map(|value| value.max(25))
.unwrap_or(100),
)
}
#[cfg(not(coverage))]
async fn wait_for_eye_device(dev: &str, eye: &str) -> anyhow::Result<()> {
let timeout = eye_device_wait_timeout();
let poll = eye_device_wait_poll();
let deadline = Instant::now() + timeout;
let last_detail = loop {
let detail = match tokio::fs::metadata(dev).await {
Ok(metadata) if metadata.file_type().is_char_device() => return Ok(()),
Ok(metadata) => format!("device exists but is not a character device ({metadata:?})"),
Err(err) => err.to_string(),
};
if Instant::now() >= deadline {
break detail;
}
sleep(poll).await;
};
Err(anyhow::anyhow!(
"🎥 eye-{eye} device {dev} was not ready within {} ms: {}",
timeout.as_millis(),
last_detail
))
}
/// Capture one eye stream from the local V4L2 gadget and expose it as a gRPC stream.
///
/// Inputs: the V4L2 device node, logical eye id, and negotiated bitrate cap.
@ -59,10 +215,9 @@ pub async fn eye_ball(dev: &str, id: u32, _max_bitrate_kbit: u32) -> anyhow::Res
}
let coverage_override = std::env::var("LESAVKA_TEST_VIDEO_SOURCE").ok();
let use_test_src =
dev.eq_ignore_ascii_case("testsrc")
|| dev.eq_ignore_ascii_case("videotestsrc")
|| coverage_override.as_deref() == Some(dev);
let use_test_src = dev.eq_ignore_ascii_case("testsrc")
|| dev.eq_ignore_ascii_case("videotestsrc")
|| coverage_override.as_deref() == Some(dev);
if !use_test_src {
return Err(anyhow::anyhow!("video source unavailable"));
}
@ -79,6 +234,8 @@ pub async fn eye_ball(dev: &str, id: u32, _max_bitrate_kbit: u32) -> anyhow::Res
Ok(VideoStream {
_pipeline: pipeline,
#[cfg(not(coverage))]
_bus_watch: None,
inner: ReceiverStream::new(rx),
})
}
@ -114,6 +271,9 @@ pub async fn eye_ball(dev: &str, id: u32, max_bitrate_kbit: u32) -> anyhow::Resu
let appsink_buffers = env_u32("LESAVKA_EYE_APPSINK_BUFFERS", 8).max(1);
let use_test_src =
dev.eq_ignore_ascii_case("testsrc") || dev.eq_ignore_ascii_case("videotestsrc");
if !use_test_src {
wait_for_eye_device(dev, eye).await?;
}
let desc = if use_test_src {
let test_bitrate = env_u32("LESAVKA_EYE_TESTSRC_KBIT", max_bitrate_kbit.max(800));
format!(
@ -148,7 +308,6 @@ pub async fn eye_ball(dev: &str, id: u32, max_bitrate_kbit: u32) -> anyhow::Resu
let chan_capacity = env_usize("LESAVKA_EYE_CHAN_CAPACITY", 256).max(16);
let (tx, rx) = tokio::sync::mpsc::channel(chan_capacity);
let bus = pipeline.bus().expect("bus");
if let Some(src_pad) = pipeline
.by_name(&format!("cam_{eye}"))
.and_then(|element| element.static_pad("src"))
@ -165,45 +324,6 @@ pub async fn eye_ball(dev: &str, id: u32, max_bitrate_kbit: u32) -> anyhow::Resu
warn!(target:"lesavka_server::video", eye = %eye, "🍪 cam_{eye} not found - skipping pad-probe");
}
let eye_clone = eye.to_owned();
std::thread::spawn(move || {
for msg in bus.iter_timed(gst::ClockTime::NONE) {
match msg.view() {
Error(err) => {
error!(
target:"lesavka_server::video",
eye = %eye_clone,
"💥 pipeline error: {} ({})",
err.error(),
err.debug().unwrap_or_default()
);
}
Warning(warning) => {
warn!(
target:"lesavka_server::video",
eye = %eye_clone,
"⚠️ pipeline warning: {} ({})",
warning.error(),
warning.debug().unwrap_or_default()
);
}
Info(info_msg) => {
info!(
target:"lesavka_server::video",
eye = %eye_clone,
"📌 pipeline info: {} ({})",
info_msg.error(),
info_msg.debug().unwrap_or_default()
);
}
StateChanged(state) if state.current() == gst::State::Playing => {
debug!(target:"lesavka_server::video", eye = %eye_clone, "🎬 pipeline PLAYING");
}
_ => {}
}
}
});
let eye_name = eye.to_string();
sink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
@ -321,24 +441,13 @@ pub async fn eye_ball(dev: &str, id: u32, max_bitrate_kbit: u32) -> anyhow::Resu
.build(),
);
pipeline
.set_state(gst::State::Playing)
.context("🎥 starting video pipeline eye-{eye}")?;
let bus = pipeline.bus().unwrap();
for _ in 0..20 {
match bus.timed_pop(gst::ClockTime::from_mseconds(200)) {
Some(msg)
if matches!(msg.view(), MessageView::StateChanged(state)
if state.current() == gst::State::Playing) =>
{
break;
}
Some(_) | None => continue,
}
}
let bus = pipeline.bus().expect("bus");
start_eye_pipeline(&pipeline, &bus, eye)?;
let bus_watch = BusWatchHandle::spawn(bus, eye.to_owned());
Ok(VideoStream {
_pipeline: pipeline,
_bus_watch: Some(bus_watch),
inner: ReceiverStream::new(rx),
})
}

View File

@ -46,6 +46,8 @@ mod video_include_contract {
let mut stream = VideoStream {
_pipeline: gst::Pipeline::new(),
#[cfg(not(coverage))]
_bus_watch: None,
inner: ReceiverStream::new(rx),
};
@ -66,6 +68,8 @@ mod video_include_contract {
let (_tx, rx) = tokio::sync::mpsc::channel(1);
let stream = VideoStream {
_pipeline: gst::Pipeline::new(),
#[cfg(not(coverage))]
_bus_watch: None,
inner: ReceiverStream::new(rx),
};
drop(stream);
@ -211,4 +215,39 @@ mod video_include_contract {
});
});
}
#[test]
#[serial]
fn wait_for_eye_device_accepts_existing_character_device() {
let rt = tokio::runtime::Runtime::new().expect("runtime");
with_var("LESAVKA_EYE_DEVICE_WAIT_MS", Some("50"), || {
rt.block_on(async {
wait_for_eye_device("/dev/null", "l")
.await
.expect("/dev/null should be ready");
});
});
}
#[test]
#[serial]
fn wait_for_eye_device_times_out_for_missing_path() {
let rt = tokio::runtime::Runtime::new().expect("runtime");
let dir = tempfile::tempdir().expect("tempdir");
let missing = dir.path().join("lesavka-eye-missing");
with_var("LESAVKA_EYE_DEVICE_WAIT_MS", Some("50"), || {
with_var("LESAVKA_EYE_DEVICE_POLL_MS", Some("25"), || {
rt.block_on(async {
let err = wait_for_eye_device(
missing.to_str().expect("utf8 path"),
"r",
)
.await
.expect_err("missing eye device should time out");
let rendered = format!("{err:#}");
assert!(rendered.contains("was not ready within 50 ms"));
});
});
});
}
}