server: wait for eye hub release before reopen

This commit is contained in:
Brad Stein 2026-05-12 11:32:21 -03:00
parent 38df734d42
commit 38ef8327d3
8 changed files with 64 additions and 12 deletions

6
Cargo.lock generated
View File

@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]]
name = "lesavka_client"
version = "0.22.12"
version = "0.22.13"
dependencies = [
"anyhow",
"async-stream",
@ -1686,7 +1686,7 @@ dependencies = [
[[package]]
name = "lesavka_common"
version = "0.22.12"
version = "0.22.13"
dependencies = [
"anyhow",
"base64",
@ -1698,7 +1698,7 @@ dependencies = [
[[package]]
name = "lesavka_server"
version = "0.22.12"
version = "0.22.13"
dependencies = [
"anyhow",
"base64",

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package]
name = "lesavka_client"
version = "0.22.12"
version = "0.22.13"
edition = "2024"
[dependencies]

View File

@ -1,6 +1,6 @@
[package]
name = "lesavka_common"
version = "0.22.12"
version = "0.22.13"
edition = "2024"
build = "build.rs"

View File

@ -10,7 +10,7 @@ bench = false
[package]
name = "lesavka_server"
version = "0.22.12"
version = "0.22.13"
edition = "2024"
autobins = false

View File

@ -97,7 +97,7 @@ struct EyeHub {
tx: broadcast::Sender<VideoPacket>,
running: Arc<AtomicBool>,
subscribers: Arc<AtomicUsize>,
abort: tokio::task::AbortHandle,
task: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
}
include!("main/handler_startup.rs");

View File

@ -35,20 +35,40 @@ impl EyeHub {
running_for_task.store(false, Ordering::Relaxed);
});
Arc::new(Self {
tx: tx.clone(),
running: Arc::clone(&running),
subscribers: Arc::clone(&subscribers),
abort: task.abort_handle(),
task: Arc::new(Mutex::new(Some(task))),
})
}
fn shutdown(&self) {
if self.running.swap(false, Ordering::AcqRel) {
self.abort.abort();
if let Ok(mut task) = self.task.try_lock()
&& let Some(task) = task.take()
{
task.abort();
}
}
}
async fn shutdown_and_wait(&self, timeout: Duration) {
self.running.store(false, Ordering::Release);
let task = self.task.lock().await.take();
if let Some(task) = task {
task.abort();
let _ = tokio::time::timeout(timeout, task).await;
}
}
}
fn eye_hub_reopen_grace() -> Duration {
std::env::var("LESAVKA_EYE_HUB_REOPEN_GRACE_MS")
.ok()
.and_then(|value| value.trim().parse::<u64>().ok())
.map(Duration::from_millis)
.unwrap_or_else(|| Duration::from_millis(250))
}
fn take_conflicting_eye_hubs(

View File

@ -89,7 +89,7 @@ impl Handler {
);
}
for hub in stale_hubs {
hub.shutdown();
hub.shutdown_and_wait(eye_hub_reopen_grace()).await;
}
let lease = self.capture_power.acquire().await;
@ -147,6 +147,6 @@ async fn forward_eye_hub_packets(
}
}
if subscribers.fetch_sub(1, Ordering::AcqRel) == 1 {
hub_for_task.shutdown();
hub_for_task.shutdown_and_wait(eye_hub_reopen_grace()).await;
}
}

View File

@ -135,6 +135,38 @@ mod server_main_eye_hub {
});
}
#[test]
fn eye_hub_shutdown_and_wait_releases_pending_stream_before_reopen() {
let rt = tokio::runtime::Runtime::new().expect("runtime");
with_capture_power_disabled(|| {
rt.block_on(async {
let hub = EyeHub::spawn(
stream::pending::<Result<VideoPacket, tonic::Status>>(),
CapturePowerManager::new().acquire().await,
);
hub.shutdown_and_wait(std::time::Duration::from_millis(100))
.await;
assert!(
!hub.running.load(std::sync::atomic::Ordering::Relaxed),
"conflicting eye hubs must be fully shut down before reopening the same V4L2 card"
);
});
});
}
#[test]
fn eye_hub_reopen_grace_defaults_to_v4l2_release_window() {
with_var("LESAVKA_EYE_HUB_REOPEN_GRACE_MS", None::<&str>, || {
assert_eq!(
eye_hub_reopen_grace(),
std::time::Duration::from_millis(250)
);
});
with_var("LESAVKA_EYE_HUB_REOPEN_GRACE_MS", Some("75"), || {
assert_eq!(eye_hub_reopen_grace(), std::time::Duration::from_millis(75));
});
}
#[test]
#[cfg(coverage)]
fn eye_hub_forwarder_handles_dropped_downstream() {