diff --git a/Cargo.lock b/Cargo.lock index 5baf3ff..5741221 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.22.12" +version = "0.22.13" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.22.12" +version = "0.22.13" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.22.12" +version = "0.22.13" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index 845526d..ef2cfe1 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.22.12" +version = "0.22.13" edition = "2024" [dependencies] diff --git a/common/Cargo.toml b/common/Cargo.toml index 22f5950..43cee7e 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.22.12" +version = "0.22.13" edition = "2024" build = "build.rs" diff --git a/server/Cargo.toml b/server/Cargo.toml index d08d88e..b845afd 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.22.12" +version = "0.22.13" edition = "2024" autobins = false diff --git a/server/src/main.rs b/server/src/main.rs index c047dc0..b90e128 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -97,7 +97,7 @@ struct EyeHub { tx: broadcast::Sender, running: Arc, subscribers: Arc, - abort: tokio::task::AbortHandle, + task: Arc>>>, } include!("main/handler_startup.rs"); diff --git a/server/src/main/eye_hub.rs b/server/src/main/eye_hub.rs index dd7faa4..8657166 100644 --- a/server/src/main/eye_hub.rs +++ b/server/src/main/eye_hub.rs @@ -35,20 +35,40 @@ impl EyeHub { running_for_task.store(false, Ordering::Relaxed); }); - Arc::new(Self { tx: tx.clone(), running: Arc::clone(&running), subscribers: Arc::clone(&subscribers), - abort: task.abort_handle(), + task: Arc::new(Mutex::new(Some(task))), }) } fn shutdown(&self) { if self.running.swap(false, Ordering::AcqRel) { - self.abort.abort(); + if let Ok(mut task) = self.task.try_lock() + && let Some(task) = task.take() + { + task.abort(); + } } } + + async fn shutdown_and_wait(&self, timeout: Duration) { + self.running.store(false, Ordering::Release); + let task = self.task.lock().await.take(); + if let Some(task) = task { + task.abort(); + let _ = tokio::time::timeout(timeout, task).await; + } + } +} + +fn eye_hub_reopen_grace() -> Duration { + std::env::var("LESAVKA_EYE_HUB_REOPEN_GRACE_MS") + .ok() + .and_then(|value| value.trim().parse::().ok()) + .map(Duration::from_millis) + .unwrap_or_else(|| Duration::from_millis(250)) } fn take_conflicting_eye_hubs( diff --git a/server/src/main/eye_video.rs b/server/src/main/eye_video.rs index 8f81e20..eab0437 100644 --- a/server/src/main/eye_video.rs +++ b/server/src/main/eye_video.rs @@ -89,7 +89,7 @@ impl Handler { ); } for hub in stale_hubs { - hub.shutdown(); + hub.shutdown_and_wait(eye_hub_reopen_grace()).await; } let lease = self.capture_power.acquire().await; @@ -147,6 +147,6 @@ async fn forward_eye_hub_packets( } } if subscribers.fetch_sub(1, Ordering::AcqRel) == 1 { - hub_for_task.shutdown(); + hub_for_task.shutdown_and_wait(eye_hub_reopen_grace()).await; } } diff --git a/tests/contract/server/main/server_main_eye_hub_contract.rs b/tests/contract/server/main/server_main_eye_hub_contract.rs index 2531e81..2bfe765 100644 --- a/tests/contract/server/main/server_main_eye_hub_contract.rs +++ b/tests/contract/server/main/server_main_eye_hub_contract.rs @@ -135,6 +135,38 @@ mod server_main_eye_hub { }); } + #[test] + fn eye_hub_shutdown_and_wait_releases_pending_stream_before_reopen() { + let rt = tokio::runtime::Runtime::new().expect("runtime"); + with_capture_power_disabled(|| { + rt.block_on(async { + let hub = EyeHub::spawn( + stream::pending::>(), + CapturePowerManager::new().acquire().await, + ); + hub.shutdown_and_wait(std::time::Duration::from_millis(100)) + .await; + assert!( + !hub.running.load(std::sync::atomic::Ordering::Relaxed), + "conflicting eye hubs must be fully shut down before reopening the same V4L2 card" + ); + }); + }); + } + + #[test] + fn eye_hub_reopen_grace_defaults_to_v4l2_release_window() { + with_var("LESAVKA_EYE_HUB_REOPEN_GRACE_MS", None::<&str>, || { + assert_eq!( + eye_hub_reopen_grace(), + std::time::Duration::from_millis(250) + ); + }); + with_var("LESAVKA_EYE_HUB_REOPEN_GRACE_MS", Some("75"), || { + assert_eq!(eye_hub_reopen_grace(), std::time::Duration::from_millis(75)); + }); + } + #[test] #[cfg(coverage)] fn eye_hub_forwarder_handles_dropped_downstream() {