From e0a472b714cdf29fc76f7a60e3415ea7e1bdd556 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Sun, 19 Apr 2026 14:14:14 -0300 Subject: [PATCH] lesavka: release source hub switches cleanly --- client/Cargo.toml | 2 +- client/src/launcher/state.rs | 49 ++++++++-- client/src/launcher/ui.rs | 2 +- client/src/launcher/ui_components.rs | 4 +- common/Cargo.toml | 2 +- common/src/cli.rs | 2 +- server/Cargo.toml | 2 +- server/src/main.rs | 94 +++++++++++++++---- .../server_main_binary_extra_contract.rs | 71 ++++++++++++++ 9 files changed, 196 insertions(+), 32 deletions(-) diff --git a/client/Cargo.toml b/client/Cargo.toml index 2aa382d..b5f08da 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.11.14" +version = "0.11.15" edition = "2024" [dependencies] diff --git a/client/src/launcher/state.rs b/client/src/launcher/state.rs index afdb7d8..e71d7c4 100644 --- a/client/src/launcher/state.rs +++ b/client/src/launcher/state.rs @@ -565,6 +565,17 @@ impl LauncherState { .map(|source_id| self.capture_size_choice(source_id)) } + pub fn effective_preview_source_size(&self, monitor_id: usize) -> PreviewSourceSize { + let capture = self + .display_capture_size_choice(monitor_id) + .unwrap_or_else(|| self.capture_size_choice(monitor_id)); + PreviewSourceSize { + width: capture.width.max(1) as u32, + height: capture.height.max(1) as u32, + fps: capture.fps.max(1), + } + } + pub fn capture_size_options(&self) -> Vec { capture_size_options(self.preview_source) } @@ -594,16 +605,16 @@ impl LauncherState { breakout_size_choice( self.breakout_limit, self.breakout_display, - self.preview_source, + self.effective_preview_source_size(monitor_id), self.breakout_size_preset(monitor_id), ) } - pub fn breakout_size_options(&self) -> Vec { + pub fn breakout_size_options(&self, monitor_id: usize) -> Vec { breakout_size_options( self.breakout_limit, self.breakout_display, - self.preview_source, + self.effective_preview_source_size(monitor_id), ) } @@ -1008,6 +1019,23 @@ mod tests { assert!(state.display_capture_size_choice(0).is_none()); } + #[test] + fn mirrored_panes_use_their_effective_source_size_for_breakout_source_labels() { + let mut state = LauncherState::new(); + state.set_capture_size_preset(1, CaptureSizePreset::P720); + state.set_feed_source_preset(0, FeedSourcePreset::OtherEye); + + let mirrored_source = state.effective_preview_source_size(0); + assert_eq!(mirrored_source.width, 1280); + assert_eq!(mirrored_source.height, 720); + assert_eq!(mirrored_source.fps, 60); + + let mirrored_breakout = state.breakout_size_choice(0); + assert_eq!(mirrored_breakout.preset, BreakoutSizePreset::Source); + assert_eq!(mirrored_breakout.width, 1280); + assert_eq!(mirrored_breakout.height, 720); + } + #[test] fn selecting_auto_or_blank_clears_explicit_device() { let mut state = LauncherState::new(); @@ -1127,9 +1155,14 @@ mod tests { assert_eq!(compact_capture.fps, 60); assert_eq!(compact_capture.max_bitrate_kbit, 2_500); + let effective_source = state.effective_preview_source_size(0); + assert_eq!(effective_source.width, 720); + assert_eq!(effective_source.height, 480); + assert_eq!(effective_source.fps, 60); + let display = state.breakout_size_choice(0); - assert_eq!(display.width, 1920); - assert_eq!(display.height, 1080); + assert_eq!(display.width, 852); + assert_eq!(display.height, 480); state.set_breakout_size_preset(0, BreakoutSizePreset::P360); let smaller = state.breakout_size_choice(0); @@ -1149,12 +1182,12 @@ mod tests { assert_eq!(capture_options[0].fps, 60); assert_eq!(capture_options[0].max_bitrate_kbit, 18_000); - let breakout_options = state.breakout_size_options(); + let breakout_options = state.breakout_size_options(0); assert!(breakout_options.len() >= 5); assert!(breakout_options.iter().any(|choice| { choice.preset == BreakoutSizePreset::Source - && choice.width == 1920 - && choice.height == 1080 + && choice.width == 852 + && choice.height == 480 })); } diff --git a/client/src/launcher/ui.rs b/client/src/launcher/ui.rs index 8adf7b2..9fcb34c 100644 --- a/client/src/launcher/ui.rs +++ b/client/src/launcher/ui.rs @@ -256,7 +256,7 @@ fn refresh_eye_feed_controls( } super::ui_components::sync_breakout_size_combo( &widgets.display_panes[monitor_id].breakout_combo, - state.breakout_size_options(), + state.breakout_size_options(monitor_id), state.breakout_size_preset(monitor_id), ); } diff --git a/client/src/launcher/ui_components.rs b/client/src/launcher/ui_components.rs index c987aa0..ffc3798 100644 --- a/client/src/launcher/ui_components.rs +++ b/client/src/launcher/ui_components.rs @@ -639,12 +639,12 @@ pub fn build_launcher_view( } sync_breakout_size_combo( &left_pane.breakout_combo, - state.breakout_size_options(), + state.breakout_size_options(0), state.breakout_size_preset(0), ); sync_breakout_size_combo( &right_pane.breakout_combo, - state.breakout_size_options(), + state.breakout_size_options(1), state.breakout_size_preset(1), ); diff --git a/common/Cargo.toml b/common/Cargo.toml index 06c2151..30fca70 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.11.14" +version = "0.11.15" edition = "2024" build = "build.rs" diff --git a/common/src/cli.rs b/common/src/cli.rs index 81ee032..4107760 100644 --- a/common/src/cli.rs +++ b/common/src/cli.rs @@ -17,6 +17,6 @@ mod tests { #[test] fn banner_includes_version() { - assert_eq!(banner("0.11.14"), "lesavka-common CLI (v0.11.14)"); + assert_eq!(banner("0.11.15"), "lesavka-common CLI (v0.11.15)"); } } diff --git a/server/Cargo.toml b/server/Cargo.toml index b28b6b3..c985af2 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.11.14" +version = "0.11.15" edition = "2024" autobins = false diff --git a/server/src/main.rs b/server/src/main.rs index 3168643..bc61d18 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -68,6 +68,7 @@ struct EyeHub { tx: broadcast::Sender, running: Arc, subscribers: Arc, + abort: tokio::task::AbortHandle, } impl Handler { @@ -160,6 +161,7 @@ impl Handler { let mut hub_rx = hub.tx.subscribe(); hub.subscribers.fetch_add(1, Ordering::AcqRel); let subscribers = Arc::clone(&hub.subscribers); + let hub_for_task = Arc::clone(&hub); let (tx, rx) = tokio::sync::mpsc::channel(32); tokio::spawn(async move { loop { @@ -174,7 +176,9 @@ impl Handler { Err(broadcast::error::RecvError::Closed) => break, } } - subscribers.fetch_sub(1, Ordering::AcqRel); + if subscribers.fetch_sub(1, Ordering::AcqRel) == 1 { + hub_for_task.shutdown(); + } }); Ok(Response::new(Box::pin(ReceiverStream::new(rx)))) @@ -186,11 +190,27 @@ impl Handler { key: EyeHubKey, max_bitrate_kbit: u32, ) -> Result, Status> { - let mut hubs = self.eye_hubs.lock().await; - if let Some(hub) = hubs.get(&key) - && hub.running.load(Ordering::Relaxed) - { - return Ok(Arc::clone(hub)); + let stale_hubs = { + let mut hubs = self.eye_hubs.lock().await; + if let Some(hub) = hubs.get(&key) + && hub.running.load(Ordering::Relaxed) + { + return Ok(Arc::clone(hub)); + } + take_conflicting_eye_hubs(&mut hubs, key) + }; + if !stale_hubs.is_empty() { + info!( + source_id = key.source_id, + requested_width = key.requested_width, + requested_height = key.requested_height, + requested_fps = key.requested_fps, + stale_hubs = stale_hubs.len(), + "🎥 replacing stale/conflicting eye hubs before opening the source" + ); + } + for hub in stale_hubs { + hub.shutdown(); } let lease = self.capture_power.acquire().await; @@ -206,6 +226,13 @@ impl Handler { .map_err(|e| Status::internal(format!("{e:#}")))?; let hub = EyeHub::spawn(stream, lease); + let mut hubs = self.eye_hubs.lock().await; + if let Some(existing) = hubs.get(&key) + && existing.running.load(Ordering::Relaxed) + { + hub.shutdown(); + return Ok(Arc::clone(existing)); + } hubs.insert(key, Arc::clone(&hub)); Ok(hub) } @@ -289,20 +316,17 @@ impl EyeHub { let (tx, _) = broadcast::channel(32); let running = Arc::new(AtomicBool::new(true)); let subscribers = Arc::new(AtomicUsize::new(0)); - let hub = Arc::new(Self { - tx: tx.clone(), - running: Arc::clone(&running), - subscribers: Arc::clone(&subscribers), - }); - - tokio::spawn(async move { + let tx_for_task = tx.clone(); + let running_for_task = Arc::clone(&running); + let subscribers_for_task = Arc::clone(&subscribers); + let task = tokio::spawn(async move { let _lease = lease; let mut idle_ticks = 0_u32; - while running.load(Ordering::Relaxed) { + while running_for_task.load(Ordering::Relaxed) { match stream.next().await { Some(Ok(pkt)) => { - let _ = tx.send(pkt); - if subscribers.load(Ordering::Relaxed) == 0 { + let _ = tx_for_task.send(pkt); + if subscribers_for_task.load(Ordering::Relaxed) == 0 { idle_ticks = idle_ticks.saturating_add(1); if idle_ticks >= 60 { break; @@ -318,11 +342,47 @@ impl EyeHub { None => break, } } - running.store(false, Ordering::Relaxed); + running_for_task.store(false, Ordering::Relaxed); + }); + let hub = Arc::new(Self { + tx: tx.clone(), + running: Arc::clone(&running), + subscribers: Arc::clone(&subscribers), + abort: task.abort_handle(), }); hub } + + fn shutdown(&self) { + if self.running.swap(false, Ordering::AcqRel) { + self.abort.abort(); + } + } +} + +fn take_conflicting_eye_hubs( + hubs: &mut HashMap>, + key: EyeHubKey, +) -> Vec> { + let stale_keys: Vec<_> = hubs + .iter() + .filter_map(|(existing_key, hub)| { + let running = hub.running.load(Ordering::Relaxed); + let conflicting_source = + existing_key.source_id == key.source_id && *existing_key != key; + if !running || conflicting_source { + Some(*existing_key) + } else { + None + } + }) + .collect(); + + stale_keys + .into_iter() + .filter_map(|existing_key| hubs.remove(&existing_key)) + .collect() } /*──────────────── gRPC service ─────────────*/ diff --git a/testing/tests/server_main_binary_extra_contract.rs b/testing/tests/server_main_binary_extra_contract.rs index 5c551d5..c10d54c 100644 --- a/testing/tests/server_main_binary_extra_contract.rs +++ b/testing/tests/server_main_binary_extra_contract.rs @@ -497,4 +497,75 @@ mod server_main_binary_extra { assert_eq!(observed.data, packet.data); }); } + + #[test] + fn conflicting_eye_hubs_for_the_same_source_are_pruned_before_reopen() { + let rt = tokio::runtime::Runtime::new().expect("runtime"); + rt.block_on(async { + let requested_key = EyeHubKey { + source_id: 1, + requested_width: 1280, + requested_height: 720, + requested_fps: 60, + }; + let stale_same_source_key = EyeHubKey { + source_id: 1, + requested_width: 1920, + requested_height: 1080, + requested_fps: 60, + }; + let keep_other_source_key = EyeHubKey { + source_id: 0, + requested_width: 1920, + requested_height: 1080, + requested_fps: 60, + }; + let stale_same_source = EyeHub::spawn( + stream::pending::>(), + CapturePowerManager::new().acquire().await, + ); + let stopped_other_source = EyeHub::spawn( + stream::pending::>(), + CapturePowerManager::new().acquire().await, + ); + stopped_other_source.shutdown(); + let keep_other_source = EyeHub::spawn( + stream::pending::>(), + CapturePowerManager::new().acquire().await, + ); + + let mut hubs = std::collections::HashMap::new(); + hubs.insert(stale_same_source_key, stale_same_source.clone()); + hubs.insert( + EyeHubKey { + source_id: 0, + requested_width: 1280, + requested_height: 720, + requested_fps: 60, + }, + stopped_other_source, + ); + hubs.insert(keep_other_source_key, keep_other_source.clone()); + + let removed = take_conflicting_eye_hubs(&mut hubs, requested_key); + + assert_eq!(removed.len(), 2); + assert!(!hubs.contains_key(&stale_same_source_key)); + assert!(hubs.contains_key(&keep_other_source_key)); + }); + } + + #[test] + fn eye_hub_shutdown_marks_the_hub_as_not_running() { + let rt = tokio::runtime::Runtime::new().expect("runtime"); + rt.block_on(async { + let hub = EyeHub::spawn( + stream::pending::>(), + CapturePowerManager::new().acquire().await, + ); + assert!(hub.running.load(std::sync::atomic::Ordering::Relaxed)); + hub.shutdown(); + assert!(!hub.running.load(std::sync::atomic::Ordering::Relaxed)); + }); + } }