From 4c505be4b2f3ce533a54d0c68b38e1f5a97c25b0 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Fri, 17 Apr 2026 04:35:41 -0300 Subject: [PATCH] lesavka: harden local handoff and launcher shutdown --- client/Cargo.toml | 2 +- client/src/app.rs | 52 +++++++- client/src/input/inputs.rs | 13 ++ client/src/launcher/preview.rs | 13 ++ client/src/launcher/ui.rs | 54 +++++++- client/src/launcher/ui_runtime.rs | 133 +++++++++++++++++++ common/Cargo.toml | 2 +- common/src/cli.rs | 2 +- server/Cargo.toml | 2 +- testing/tests/client_app_include_contract.rs | 105 +++++++++++++-- 10 files changed, 353 insertions(+), 25 deletions(-) diff --git a/client/Cargo.toml b/client/Cargo.toml index 9aa5cc8..606b0d4 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.8.0" +version = "0.8.1" edition = "2024" [dependencies] diff --git a/client/src/app.rs b/client/src/app.rs index 7bdbc74..dabc6b1 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -2,7 +2,7 @@ use anyhow::Result; use std::sync::Arc; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::time::Duration; use tokio::sync::{broadcast, mpsc}; use tokio_stream::{ @@ -37,6 +37,7 @@ pub struct LesavkaClientApp { kbd_tx: broadcast::Sender, mou_tx: broadcast::Sender, paste_rx: Option>, + remote_capture_enabled: Arc, } impl LesavkaClientApp { @@ -54,16 +55,18 @@ impl LesavkaClientApp { let (mou_tx, _) = broadcast::channel(4096); let (paste_tx, paste_rx) = mpsc::unbounded_channel(); - let agg = if headless { - None + let (agg, remote_capture_enabled) = if headless { + (None, Arc::new(AtomicBool::new(false))) } else { - Some(InputAggregator::new_with_capture_mode( + let aggregator = InputAggregator::new_with_capture_mode( dev_mode, kbd_tx.clone(), mou_tx.clone(), Some(paste_tx), capture_remote_boot, - )) + ); + let remote_capture_enabled = aggregator.remote_capture_enabled_handle(); + (Some(aggregator), remote_capture_enabled) }; Ok(Self { @@ -74,6 +77,7 @@ impl LesavkaClientApp { kbd_tx, mou_tx, paste_rx: Some(paste_rx), + remote_capture_enabled, }) } @@ -340,9 +344,18 @@ impl LesavkaClientApp { loop { info!("⌨️🤙 Keyboard dial {}", self.server_addr); let mut cli = RelayClient::new(ep.clone()); + let capture_enabled = Arc::clone(&self.remote_capture_enabled); + let mut remote_capture_was_enabled = capture_enabled.load(Ordering::Relaxed); let outbound = - BroadcastStream::new(self.kbd_tx.subscribe()).filter_map(keyboard_stream_report); + BroadcastStream::new(self.kbd_tx.subscribe()).filter_map(move |report| { + let remote_capture_enabled = capture_enabled.load(Ordering::Relaxed); + keyboard_stream_report( + report, + remote_capture_enabled, + &mut remote_capture_was_enabled, + ) + }); match cli.stream_keyboard(Request::new(outbound)).await { Ok(mut resp) => { @@ -365,9 +378,18 @@ impl LesavkaClientApp { loop { info!("🖱️🤙 Mouse dial {}", self.server_addr); let mut cli = RelayClient::new(ep.clone()); + let capture_enabled = Arc::clone(&self.remote_capture_enabled); + let mut remote_capture_was_enabled = capture_enabled.load(Ordering::Relaxed); let outbound = - BroadcastStream::new(self.mou_tx.subscribe()).filter_map(mouse_stream_report); + BroadcastStream::new(self.mou_tx.subscribe()).filter_map(move |report| { + let remote_capture_enabled = capture_enabled.load(Ordering::Relaxed); + mouse_stream_report( + report, + remote_capture_enabled, + &mut remote_capture_was_enabled, + ) + }); match cli.stream_mouse(Request::new(outbound)).await { Ok(mut resp) => { @@ -561,7 +583,15 @@ impl LesavkaClientApp { pub(crate) fn keyboard_stream_report( report: Result, + remote_capture_enabled: bool, + remote_capture_was_enabled: &mut bool, ) -> Option { + if !remote_capture_enabled { + let emit_reset = *remote_capture_was_enabled; + *remote_capture_was_enabled = false; + return emit_reset.then_some(KeyboardReport { data: vec![0; 8] }); + } + *remote_capture_was_enabled = true; match report { Ok(report) => Some(report), Err(BroadcastStreamRecvError::Lagged(skipped)) => { @@ -576,7 +606,15 @@ pub(crate) fn keyboard_stream_report( pub(crate) fn mouse_stream_report( report: Result, + remote_capture_enabled: bool, + remote_capture_was_enabled: &mut bool, ) -> Option { + if !remote_capture_enabled { + let emit_reset = *remote_capture_was_enabled; + *remote_capture_was_enabled = false; + return emit_reset.then_some(MouseReport { data: vec![0; 4] }); + } + *remote_capture_was_enabled = true; match report { Ok(report) => Some(report), Err(BroadcastStreamRecvError::Lagged(skipped)) => { diff --git a/client/src/input/inputs.rs b/client/src/input/inputs.rs index c8a90dd..ae95df3 100644 --- a/client/src/input/inputs.rs +++ b/client/src/input/inputs.rs @@ -7,6 +7,10 @@ use evdev::{AbsoluteAxisCode, Device, EventType, KeyCode, RelativeAxisCode}; use std::collections::HashSet; #[cfg(not(coverage))] use std::path::{Path, PathBuf}; +use std::sync::{ + Arc, + atomic::{AtomicBool, Ordering}, +}; use std::time::Instant; use tokio::{ sync::broadcast::Sender, @@ -55,6 +59,7 @@ pub struct InputAggregator { routing_state_path: Option, #[cfg(not(coverage))] published_remote_capture: Option, + remote_capture_enabled: Arc, } impl InputAggregator { @@ -120,9 +125,14 @@ impl InputAggregator { routing_state_path, #[cfg(not(coverage))] published_remote_capture: None, + remote_capture_enabled: Arc::new(AtomicBool::new(capture_remote_boot)), } } + pub fn remote_capture_enabled_handle(&self) -> Arc { + Arc::clone(&self.remote_capture_enabled) + } + #[cfg(coverage)] pub fn init(&mut self) -> Result<()> { let paths = std::fs::read_dir("/dev/input").context("Failed to read /dev/input")?; @@ -360,6 +370,7 @@ impl InputAggregator { } if want_kill && !self.pending_kill { warn!("🧙 magic chord - killing 🪄 AVADA KEDAVRA!!! 💥💀⚰️"); + self.remote_capture_enabled.store(false, Ordering::Relaxed); for k in &mut self.keyboards { k.send_empty_report(); k.set_send(false); @@ -433,6 +444,7 @@ impl InputAggregator { } fn enable_remote_capture(&mut self) { + self.remote_capture_enabled.store(true, Ordering::Relaxed); for k in &mut self.keyboards { k.reset_state(); k.set_send(true); @@ -449,6 +461,7 @@ impl InputAggregator { } fn begin_local_release(&mut self) { + self.remote_capture_enabled.store(false, Ordering::Relaxed); for k in &mut self.keyboards { k.send_empty_report(); k.set_send(false); diff --git a/client/src/launcher/preview.rs b/client/src/launcher/preview.rs index 1874627..4689949 100644 --- a/client/src/launcher/preview.rs +++ b/client/src/launcher/preview.rs @@ -187,6 +187,19 @@ impl LauncherPreview { } } + pub fn shutdown_all(&self) { + if let Ok(feeds) = self.inline_feeds.lock() { + for feed in feeds.iter() { + feed.shutdown(); + } + } + if let Ok(feeds) = self.window_feeds.lock() { + for feed in feeds.iter() { + feed.shutdown(); + } + } + } + pub fn install_on_picture( &self, monitor_id: usize, diff --git a/client/src/launcher/ui.rs b/client/src/launcher/ui.rs index 39debb4..e6be2e4 100644 --- a/client/src/launcher/ui.rs +++ b/client/src/launcher/ui.rs @@ -20,8 +20,8 @@ use { input_control_path, input_state_path, next_input_routing, open_diagnostics_popout, open_popout_window, open_session_log_popout, path_marker, present_popout_windows, read_input_routing_state, reap_exited_child, refresh_launcher_ui, refresh_test_buttons, - routing_name, selected_combo_value, selected_server_addr, spawn_client_process, - stop_child_process, toggle_key_label, update_test_action_result, + routing_name, selected_combo_value, selected_server_addr, shutdown_launcher_runtime, + spawn_client_process, stop_child_process, toggle_key_label, update_test_action_result, write_input_routing_request, }, crate::handshake::{HandshakeProbe, probe}, @@ -519,6 +519,56 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> { let popouts = Rc::clone(&view.popouts); let diagnostics_popout = Rc::clone(&view.diagnostics_popout); let log_popout = Rc::clone(&view.log_popout); + let shutdown_cleaned = Rc::new(Cell::new(false)); + + { + let shutdown_cleaned = Rc::clone(&shutdown_cleaned); + let child_proc = Rc::clone(&child_proc); + let tests = Rc::clone(&tests); + let preview = preview.clone(); + let widgets = widgets.clone(); + let popouts = Rc::clone(&popouts); + let diagnostics_popout = Rc::clone(&diagnostics_popout); + let log_popout = Rc::clone(&log_popout); + window.connect_close_request(move |_| { + if !shutdown_cleaned.replace(true) { + shutdown_launcher_runtime( + &child_proc, + &tests, + preview.as_deref(), + &widgets, + &popouts, + &diagnostics_popout, + &log_popout, + ); + } + glib::Propagation::Proceed + }); + } + + { + let shutdown_cleaned = Rc::clone(&shutdown_cleaned); + let child_proc = Rc::clone(&child_proc); + let tests = Rc::clone(&tests); + let preview = preview.clone(); + let widgets = widgets.clone(); + let popouts = Rc::clone(&popouts); + let diagnostics_popout = Rc::clone(&diagnostics_popout); + let log_popout = Rc::clone(&log_popout); + app.connect_shutdown(move |_| { + if !shutdown_cleaned.replace(true) { + shutdown_launcher_runtime( + &child_proc, + &tests, + preview.as_deref(), + &widgets, + &popouts, + &diagnostics_popout, + &log_popout, + ); + } + }); + } { let mut tests = tests.borrow_mut(); diff --git a/client/src/launcher/ui_runtime.rs b/client/src/launcher/ui_runtime.rs index 41be295..718040d 100644 --- a/client/src/launcher/ui_runtime.rs +++ b/client/src/launcher/ui_runtime.rs @@ -982,6 +982,59 @@ pub fn stop_child_process(child_proc: &Rc>>) { } } +pub fn shutdown_launcher_runtime( + child_proc: &Rc>>, + tests: &Rc>, + preview: Option<&LauncherPreview>, + widgets: &LauncherWidgets, + popouts: &Rc; 2]>>, + diagnostics_popout: &Rc>>, + log_popout: &Rc>>, +) { + stop_child_process(child_proc); + tests.borrow_mut().stop_all(); + + if let Some(preview) = preview { + preview.set_session_active(false); + preview.shutdown_all(); + } + + for pane in &widgets.display_panes { + if let Some(binding) = pane.preview_binding.borrow_mut().take() { + binding.close(); + } + pane.picture.set_paintable(Option::<&gdk::Paintable>::None); + pane.stream_status.set_text(""); + } + + let mut detached_popouts = Vec::new(); + { + let mut slots = popouts.borrow_mut(); + for slot in slots.iter_mut() { + if let Some(handle) = slot.take() { + detached_popouts.push(handle); + } + } + } + for handle in detached_popouts { + handle.binding.close(); + handle + .picture + .set_paintable(Option::<&gdk::Paintable>::None); + handle.window.set_child(Option::<>k::Widget>::None); + handle.window.hide(); + } + + if let Some(window) = diagnostics_popout.borrow_mut().take() { + window.set_child(Option::<>k::Widget>::None); + window.hide(); + } + if let Some(window) = log_popout.borrow_mut().take() { + window.set_child(Option::<>k::Widget>::None); + window.hide(); + } +} + pub fn reap_exited_child(child_proc: &Rc>>) -> bool { let mut slot = child_proc.borrow_mut(); match slot.as_mut() { @@ -1223,4 +1276,84 @@ mod tests { assert!(popouts.borrow().iter().all(|handle| handle.is_none())); assert_eq!(state.borrow().display_surface(0), DisplaySurface::Preview); } + + #[test] + fn shutdown_launcher_runtime_closes_preview_bindings_and_popouts() { + if gtk::init().is_err() || gtk::gdk::Display::default().is_none() { + return; + } + + let app = gtk::Application::builder() + .application_id("dev.lesavka.test-shutdown") + .build(); + let _ = app.register(None::<>k::gio::Cancellable>); + + let state = Rc::new(RefCell::new(LauncherState::new())); + let state_snapshot = state.borrow().clone(); + let view = build_launcher_view( + &app, + "http://127.0.0.1:50051", + &DeviceCatalog::default(), + &state_snapshot, + ); + let child_proc = Rc::new(RefCell::new(None::)); + let tests = Rc::new(RefCell::new(DeviceTestController::new())); + + let left_binding = PreviewBinding::test_stub(); + let right_binding = PreviewBinding::test_stub(); + *view.widgets.display_panes[0].preview_binding.borrow_mut() = Some(left_binding.clone()); + *view.widgets.display_panes[1].preview_binding.borrow_mut() = Some(right_binding.clone()); + + { + let mut popouts = view.popouts.borrow_mut(); + popouts[0] = Some(PopoutWindowHandle { + window: gtk::ApplicationWindow::builder() + .application(&app) + .title("Left") + .build(), + picture: gtk::Picture::new(), + status_label: gtk::Label::new(None), + binding: PreviewBinding::test_stub(), + }); + } + + *view.diagnostics_popout.borrow_mut() = Some( + gtk::ApplicationWindow::builder() + .application(&app) + .title("Diagnostics") + .build(), + ); + *view.log_popout.borrow_mut() = Some( + gtk::ApplicationWindow::builder() + .application(&app) + .title("Log") + .build(), + ); + + shutdown_launcher_runtime( + &child_proc, + &tests, + None, + &view.widgets, + &view.popouts, + &view.diagnostics_popout, + &view.log_popout, + ); + + assert!(view.popouts.borrow().iter().all(|handle| handle.is_none())); + assert!( + view.widgets.display_panes[0] + .preview_binding + .borrow() + .is_none() + ); + assert!( + view.widgets.display_panes[1] + .preview_binding + .borrow() + .is_none() + ); + assert!(view.diagnostics_popout.borrow().is_none()); + assert!(view.log_popout.borrow().is_none()); + } } diff --git a/common/Cargo.toml b/common/Cargo.toml index 89aa4f7..912958e 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.8.0" +version = "0.8.1" edition = "2024" build = "build.rs" diff --git a/common/src/cli.rs b/common/src/cli.rs index b4cdbe1..0a44b77 100644 --- a/common/src/cli.rs +++ b/common/src/cli.rs @@ -17,6 +17,6 @@ mod tests { #[test] fn banner_includes_version() { - assert_eq!(banner("0.8.0"), "lesavka-common CLI (v0.8.0)"); + assert_eq!(banner("0.8.1"), "lesavka-common CLI (v0.8.1)"); } } diff --git a/server/Cargo.toml b/server/Cargo.toml index e86e411..b013078 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.8.0" +version = "0.8.1" edition = "2024" autobins = false diff --git a/testing/tests/client_app_include_contract.rs b/testing/tests/client_app_include_contract.rs index c8568c7..9880689 100644 --- a/testing/tests/client_app_include_contract.rs +++ b/testing/tests/client_app_include_contract.rs @@ -102,6 +102,7 @@ mod input { pub mod inputs { use lesavka_common::lesavka::{KeyboardReport, MouseReport}; + use std::sync::{Arc, atomic::AtomicBool}; use tokio::sync::{broadcast::Sender, mpsc::UnboundedSender}; pub struct InputAggregator { @@ -109,6 +110,7 @@ mod input { _mou_tx: Sender, _dev_mode: bool, _paste_tx: Option>, + remote_capture_enabled: Arc, } impl InputAggregator { @@ -123,6 +125,7 @@ mod input { _mou_tx: mou_tx, _dev_mode: dev_mode, _paste_tx: paste_tx, + remote_capture_enabled: Arc::new(AtomicBool::new(true)), } } @@ -131,9 +134,13 @@ mod input { kbd_tx: Sender, mou_tx: Sender, paste_tx: Option>, - _capture_remote_boot: bool, + capture_remote_boot: bool, ) -> Self { - Self::new(dev_mode, kbd_tx, mou_tx, paste_tx) + let aggregator = Self::new(dev_mode, kbd_tx, mou_tx, paste_tx); + aggregator + .remote_capture_enabled + .store(capture_remote_boot, std::sync::atomic::Ordering::Relaxed); + aggregator } pub fn init(&mut self) -> anyhow::Result<()> { @@ -145,6 +152,10 @@ mod input { #[allow(unreachable_code)] Ok(()) } + + pub fn remote_capture_enabled_handle(&self) -> Arc { + Arc::clone(&self.remote_capture_enabled) + } } } } @@ -268,27 +279,97 @@ mod tests { #[test] fn keyboard_stream_report_turns_lag_into_a_clean_reset() { - let pkt = keyboard_stream_report(Err(BroadcastStreamRecvError::Lagged(3))) - .expect("lagged keyboard item should produce reset"); + let mut remote_capture_was_enabled = true; + let pkt = keyboard_stream_report( + Err(BroadcastStreamRecvError::Lagged(3)), + true, + &mut remote_capture_was_enabled, + ) + .expect("lagged keyboard item should produce reset"); assert_eq!(pkt.data, vec![0; 8]); - let passthrough = keyboard_stream_report(Ok(KeyboardReport { - data: vec![1, 2, 3], - })) + let passthrough = keyboard_stream_report( + Ok(KeyboardReport { + data: vec![1, 2, 3], + }), + true, + &mut remote_capture_was_enabled, + ) .expect("ok keyboard item should pass through"); assert_eq!(passthrough.data, vec![1, 2, 3]); } #[test] fn mouse_stream_report_turns_lag_into_a_neutral_packet() { - let pkt = mouse_stream_report(Err(BroadcastStreamRecvError::Lagged(5))) - .expect("lagged mouse item should produce neutral packet"); + let mut remote_capture_was_enabled = true; + let pkt = mouse_stream_report( + Err(BroadcastStreamRecvError::Lagged(5)), + true, + &mut remote_capture_was_enabled, + ) + .expect("lagged mouse item should produce neutral packet"); assert_eq!(pkt.data, vec![0; 4]); - let passthrough = mouse_stream_report(Ok(MouseReport { - data: vec![9, 8, 7, 6], - })) + let passthrough = mouse_stream_report( + Ok(MouseReport { + data: vec![9, 8, 7, 6], + }), + true, + &mut remote_capture_was_enabled, + ) .expect("ok mouse item should pass through"); assert_eq!(passthrough.data, vec![9, 8, 7, 6]); } + + #[test] + fn keyboard_stream_report_blocks_stale_packets_after_local_handoff() { + let mut remote_capture_was_enabled = true; + let reset = keyboard_stream_report( + Ok(KeyboardReport { + data: vec![4, 0, 5, 0, 0, 0, 0, 0], + }), + false, + &mut remote_capture_was_enabled, + ) + .expect("switching local should emit one reset"); + assert_eq!(reset.data, vec![0; 8]); + + let dropped = keyboard_stream_report( + Ok(KeyboardReport { + data: vec![1, 2, 3], + }), + false, + &mut remote_capture_was_enabled, + ); + assert!( + dropped.is_none(), + "stale keyboard packets should be dropped locally" + ); + } + + #[test] + fn mouse_stream_report_blocks_stale_packets_after_local_handoff() { + let mut remote_capture_was_enabled = true; + let reset = mouse_stream_report( + Ok(MouseReport { + data: vec![9, 8, 7, 6], + }), + false, + &mut remote_capture_was_enabled, + ) + .expect("switching local should emit one neutral mouse packet"); + assert_eq!(reset.data, vec![0; 4]); + + let dropped = mouse_stream_report( + Ok(MouseReport { + data: vec![1, 1, 1, 1], + }), + false, + &mut remote_capture_was_enabled, + ); + assert!( + dropped.is_none(), + "stale mouse packets should be dropped locally" + ); + } }