lesavka: harden local handoff and launcher shutdown

This commit is contained in:
Brad Stein 2026-04-17 04:35:41 -03:00
parent 68510c086a
commit 4c505be4b2
10 changed files with 353 additions and 25 deletions

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package]
name = "lesavka_client"
version = "0.8.0"
version = "0.8.1"
edition = "2024"
[dependencies]

View File

@ -2,7 +2,7 @@
use anyhow::Result;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::Duration;
use tokio::sync::{broadcast, mpsc};
use tokio_stream::{
@ -37,6 +37,7 @@ pub struct LesavkaClientApp {
kbd_tx: broadcast::Sender<KeyboardReport>,
mou_tx: broadcast::Sender<MouseReport>,
paste_rx: Option<mpsc::UnboundedReceiver<String>>,
remote_capture_enabled: Arc<AtomicBool>,
}
impl LesavkaClientApp {
@ -54,16 +55,18 @@ impl LesavkaClientApp {
let (mou_tx, _) = broadcast::channel(4096);
let (paste_tx, paste_rx) = mpsc::unbounded_channel();
let agg = if headless {
None
let (agg, remote_capture_enabled) = if headless {
(None, Arc::new(AtomicBool::new(false)))
} else {
Some(InputAggregator::new_with_capture_mode(
let aggregator = InputAggregator::new_with_capture_mode(
dev_mode,
kbd_tx.clone(),
mou_tx.clone(),
Some(paste_tx),
capture_remote_boot,
))
);
let remote_capture_enabled = aggregator.remote_capture_enabled_handle();
(Some(aggregator), remote_capture_enabled)
};
Ok(Self {
@ -74,6 +77,7 @@ impl LesavkaClientApp {
kbd_tx,
mou_tx,
paste_rx: Some(paste_rx),
remote_capture_enabled,
})
}
@ -340,9 +344,18 @@ impl LesavkaClientApp {
loop {
info!("⌨️🤙 Keyboard dial {}", self.server_addr);
let mut cli = RelayClient::new(ep.clone());
let capture_enabled = Arc::clone(&self.remote_capture_enabled);
let mut remote_capture_was_enabled = capture_enabled.load(Ordering::Relaxed);
let outbound =
BroadcastStream::new(self.kbd_tx.subscribe()).filter_map(keyboard_stream_report);
BroadcastStream::new(self.kbd_tx.subscribe()).filter_map(move |report| {
let remote_capture_enabled = capture_enabled.load(Ordering::Relaxed);
keyboard_stream_report(
report,
remote_capture_enabled,
&mut remote_capture_was_enabled,
)
});
match cli.stream_keyboard(Request::new(outbound)).await {
Ok(mut resp) => {
@ -365,9 +378,18 @@ impl LesavkaClientApp {
loop {
info!("🖱️🤙 Mouse dial {}", self.server_addr);
let mut cli = RelayClient::new(ep.clone());
let capture_enabled = Arc::clone(&self.remote_capture_enabled);
let mut remote_capture_was_enabled = capture_enabled.load(Ordering::Relaxed);
let outbound =
BroadcastStream::new(self.mou_tx.subscribe()).filter_map(mouse_stream_report);
BroadcastStream::new(self.mou_tx.subscribe()).filter_map(move |report| {
let remote_capture_enabled = capture_enabled.load(Ordering::Relaxed);
mouse_stream_report(
report,
remote_capture_enabled,
&mut remote_capture_was_enabled,
)
});
match cli.stream_mouse(Request::new(outbound)).await {
Ok(mut resp) => {
@ -561,7 +583,15 @@ impl LesavkaClientApp {
pub(crate) fn keyboard_stream_report(
report: Result<KeyboardReport, BroadcastStreamRecvError>,
remote_capture_enabled: bool,
remote_capture_was_enabled: &mut bool,
) -> Option<KeyboardReport> {
if !remote_capture_enabled {
let emit_reset = *remote_capture_was_enabled;
*remote_capture_was_enabled = false;
return emit_reset.then_some(KeyboardReport { data: vec![0; 8] });
}
*remote_capture_was_enabled = true;
match report {
Ok(report) => Some(report),
Err(BroadcastStreamRecvError::Lagged(skipped)) => {
@ -576,7 +606,15 @@ pub(crate) fn keyboard_stream_report(
pub(crate) fn mouse_stream_report(
report: Result<MouseReport, BroadcastStreamRecvError>,
remote_capture_enabled: bool,
remote_capture_was_enabled: &mut bool,
) -> Option<MouseReport> {
if !remote_capture_enabled {
let emit_reset = *remote_capture_was_enabled;
*remote_capture_was_enabled = false;
return emit_reset.then_some(MouseReport { data: vec![0; 4] });
}
*remote_capture_was_enabled = true;
match report {
Ok(report) => Some(report),
Err(BroadcastStreamRecvError::Lagged(skipped)) => {

View File

@ -7,6 +7,10 @@ use evdev::{AbsoluteAxisCode, Device, EventType, KeyCode, RelativeAxisCode};
use std::collections::HashSet;
#[cfg(not(coverage))]
use std::path::{Path, PathBuf};
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use std::time::Instant;
use tokio::{
sync::broadcast::Sender,
@ -55,6 +59,7 @@ pub struct InputAggregator {
routing_state_path: Option<PathBuf>,
#[cfg(not(coverage))]
published_remote_capture: Option<bool>,
remote_capture_enabled: Arc<AtomicBool>,
}
impl InputAggregator {
@ -120,9 +125,14 @@ impl InputAggregator {
routing_state_path,
#[cfg(not(coverage))]
published_remote_capture: None,
remote_capture_enabled: Arc::new(AtomicBool::new(capture_remote_boot)),
}
}
pub fn remote_capture_enabled_handle(&self) -> Arc<AtomicBool> {
Arc::clone(&self.remote_capture_enabled)
}
#[cfg(coverage)]
pub fn init(&mut self) -> Result<()> {
let paths = std::fs::read_dir("/dev/input").context("Failed to read /dev/input")?;
@ -360,6 +370,7 @@ impl InputAggregator {
}
if want_kill && !self.pending_kill {
warn!("🧙 magic chord - killing 🪄 AVADA KEDAVRA!!! 💥💀⚰️");
self.remote_capture_enabled.store(false, Ordering::Relaxed);
for k in &mut self.keyboards {
k.send_empty_report();
k.set_send(false);
@ -433,6 +444,7 @@ impl InputAggregator {
}
fn enable_remote_capture(&mut self) {
self.remote_capture_enabled.store(true, Ordering::Relaxed);
for k in &mut self.keyboards {
k.reset_state();
k.set_send(true);
@ -449,6 +461,7 @@ impl InputAggregator {
}
fn begin_local_release(&mut self) {
self.remote_capture_enabled.store(false, Ordering::Relaxed);
for k in &mut self.keyboards {
k.send_empty_report();
k.set_send(false);

View File

@ -187,6 +187,19 @@ impl LauncherPreview {
}
}
pub fn shutdown_all(&self) {
if let Ok(feeds) = self.inline_feeds.lock() {
for feed in feeds.iter() {
feed.shutdown();
}
}
if let Ok(feeds) = self.window_feeds.lock() {
for feed in feeds.iter() {
feed.shutdown();
}
}
}
pub fn install_on_picture(
&self,
monitor_id: usize,

View File

@ -20,8 +20,8 @@ use {
input_control_path, input_state_path, next_input_routing, open_diagnostics_popout,
open_popout_window, open_session_log_popout, path_marker, present_popout_windows,
read_input_routing_state, reap_exited_child, refresh_launcher_ui, refresh_test_buttons,
routing_name, selected_combo_value, selected_server_addr, spawn_client_process,
stop_child_process, toggle_key_label, update_test_action_result,
routing_name, selected_combo_value, selected_server_addr, shutdown_launcher_runtime,
spawn_client_process, stop_child_process, toggle_key_label, update_test_action_result,
write_input_routing_request,
},
crate::handshake::{HandshakeProbe, probe},
@ -519,6 +519,56 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> {
let popouts = Rc::clone(&view.popouts);
let diagnostics_popout = Rc::clone(&view.diagnostics_popout);
let log_popout = Rc::clone(&view.log_popout);
let shutdown_cleaned = Rc::new(Cell::new(false));
{
let shutdown_cleaned = Rc::clone(&shutdown_cleaned);
let child_proc = Rc::clone(&child_proc);
let tests = Rc::clone(&tests);
let preview = preview.clone();
let widgets = widgets.clone();
let popouts = Rc::clone(&popouts);
let diagnostics_popout = Rc::clone(&diagnostics_popout);
let log_popout = Rc::clone(&log_popout);
window.connect_close_request(move |_| {
if !shutdown_cleaned.replace(true) {
shutdown_launcher_runtime(
&child_proc,
&tests,
preview.as_deref(),
&widgets,
&popouts,
&diagnostics_popout,
&log_popout,
);
}
glib::Propagation::Proceed
});
}
{
let shutdown_cleaned = Rc::clone(&shutdown_cleaned);
let child_proc = Rc::clone(&child_proc);
let tests = Rc::clone(&tests);
let preview = preview.clone();
let widgets = widgets.clone();
let popouts = Rc::clone(&popouts);
let diagnostics_popout = Rc::clone(&diagnostics_popout);
let log_popout = Rc::clone(&log_popout);
app.connect_shutdown(move |_| {
if !shutdown_cleaned.replace(true) {
shutdown_launcher_runtime(
&child_proc,
&tests,
preview.as_deref(),
&widgets,
&popouts,
&diagnostics_popout,
&log_popout,
);
}
});
}
{
let mut tests = tests.borrow_mut();

View File

@ -982,6 +982,59 @@ pub fn stop_child_process(child_proc: &Rc<RefCell<Option<RelayChild>>>) {
}
}
pub fn shutdown_launcher_runtime(
child_proc: &Rc<RefCell<Option<RelayChild>>>,
tests: &Rc<RefCell<DeviceTestController>>,
preview: Option<&LauncherPreview>,
widgets: &LauncherWidgets,
popouts: &Rc<RefCell<[Option<PopoutWindowHandle>; 2]>>,
diagnostics_popout: &Rc<RefCell<Option<gtk::ApplicationWindow>>>,
log_popout: &Rc<RefCell<Option<gtk::ApplicationWindow>>>,
) {
stop_child_process(child_proc);
tests.borrow_mut().stop_all();
if let Some(preview) = preview {
preview.set_session_active(false);
preview.shutdown_all();
}
for pane in &widgets.display_panes {
if let Some(binding) = pane.preview_binding.borrow_mut().take() {
binding.close();
}
pane.picture.set_paintable(Option::<&gdk::Paintable>::None);
pane.stream_status.set_text("");
}
let mut detached_popouts = Vec::new();
{
let mut slots = popouts.borrow_mut();
for slot in slots.iter_mut() {
if let Some(handle) = slot.take() {
detached_popouts.push(handle);
}
}
}
for handle in detached_popouts {
handle.binding.close();
handle
.picture
.set_paintable(Option::<&gdk::Paintable>::None);
handle.window.set_child(Option::<&gtk::Widget>::None);
handle.window.hide();
}
if let Some(window) = diagnostics_popout.borrow_mut().take() {
window.set_child(Option::<&gtk::Widget>::None);
window.hide();
}
if let Some(window) = log_popout.borrow_mut().take() {
window.set_child(Option::<&gtk::Widget>::None);
window.hide();
}
}
pub fn reap_exited_child(child_proc: &Rc<RefCell<Option<RelayChild>>>) -> bool {
let mut slot = child_proc.borrow_mut();
match slot.as_mut() {
@ -1223,4 +1276,84 @@ mod tests {
assert!(popouts.borrow().iter().all(|handle| handle.is_none()));
assert_eq!(state.borrow().display_surface(0), DisplaySurface::Preview);
}
#[test]
fn shutdown_launcher_runtime_closes_preview_bindings_and_popouts() {
if gtk::init().is_err() || gtk::gdk::Display::default().is_none() {
return;
}
let app = gtk::Application::builder()
.application_id("dev.lesavka.test-shutdown")
.build();
let _ = app.register(None::<&gtk::gio::Cancellable>);
let state = Rc::new(RefCell::new(LauncherState::new()));
let state_snapshot = state.borrow().clone();
let view = build_launcher_view(
&app,
"http://127.0.0.1:50051",
&DeviceCatalog::default(),
&state_snapshot,
);
let child_proc = Rc::new(RefCell::new(None::<RelayChild>));
let tests = Rc::new(RefCell::new(DeviceTestController::new()));
let left_binding = PreviewBinding::test_stub();
let right_binding = PreviewBinding::test_stub();
*view.widgets.display_panes[0].preview_binding.borrow_mut() = Some(left_binding.clone());
*view.widgets.display_panes[1].preview_binding.borrow_mut() = Some(right_binding.clone());
{
let mut popouts = view.popouts.borrow_mut();
popouts[0] = Some(PopoutWindowHandle {
window: gtk::ApplicationWindow::builder()
.application(&app)
.title("Left")
.build(),
picture: gtk::Picture::new(),
status_label: gtk::Label::new(None),
binding: PreviewBinding::test_stub(),
});
}
*view.diagnostics_popout.borrow_mut() = Some(
gtk::ApplicationWindow::builder()
.application(&app)
.title("Diagnostics")
.build(),
);
*view.log_popout.borrow_mut() = Some(
gtk::ApplicationWindow::builder()
.application(&app)
.title("Log")
.build(),
);
shutdown_launcher_runtime(
&child_proc,
&tests,
None,
&view.widgets,
&view.popouts,
&view.diagnostics_popout,
&view.log_popout,
);
assert!(view.popouts.borrow().iter().all(|handle| handle.is_none()));
assert!(
view.widgets.display_panes[0]
.preview_binding
.borrow()
.is_none()
);
assert!(
view.widgets.display_panes[1]
.preview_binding
.borrow()
.is_none()
);
assert!(view.diagnostics_popout.borrow().is_none());
assert!(view.log_popout.borrow().is_none());
}
}

View File

@ -1,6 +1,6 @@
[package]
name = "lesavka_common"
version = "0.8.0"
version = "0.8.1"
edition = "2024"
build = "build.rs"

View File

@ -17,6 +17,6 @@ mod tests {
#[test]
fn banner_includes_version() {
assert_eq!(banner("0.8.0"), "lesavka-common CLI (v0.8.0)");
assert_eq!(banner("0.8.1"), "lesavka-common CLI (v0.8.1)");
}
}

View File

@ -10,7 +10,7 @@ bench = false
[package]
name = "lesavka_server"
version = "0.8.0"
version = "0.8.1"
edition = "2024"
autobins = false

View File

@ -102,6 +102,7 @@ mod input {
pub mod inputs {
use lesavka_common::lesavka::{KeyboardReport, MouseReport};
use std::sync::{Arc, atomic::AtomicBool};
use tokio::sync::{broadcast::Sender, mpsc::UnboundedSender};
pub struct InputAggregator {
@ -109,6 +110,7 @@ mod input {
_mou_tx: Sender<MouseReport>,
_dev_mode: bool,
_paste_tx: Option<UnboundedSender<String>>,
remote_capture_enabled: Arc<AtomicBool>,
}
impl InputAggregator {
@ -123,6 +125,7 @@ mod input {
_mou_tx: mou_tx,
_dev_mode: dev_mode,
_paste_tx: paste_tx,
remote_capture_enabled: Arc::new(AtomicBool::new(true)),
}
}
@ -131,9 +134,13 @@ mod input {
kbd_tx: Sender<KeyboardReport>,
mou_tx: Sender<MouseReport>,
paste_tx: Option<UnboundedSender<String>>,
_capture_remote_boot: bool,
capture_remote_boot: bool,
) -> Self {
Self::new(dev_mode, kbd_tx, mou_tx, paste_tx)
let aggregator = Self::new(dev_mode, kbd_tx, mou_tx, paste_tx);
aggregator
.remote_capture_enabled
.store(capture_remote_boot, std::sync::atomic::Ordering::Relaxed);
aggregator
}
pub fn init(&mut self) -> anyhow::Result<()> {
@ -145,6 +152,10 @@ mod input {
#[allow(unreachable_code)]
Ok(())
}
pub fn remote_capture_enabled_handle(&self) -> Arc<AtomicBool> {
Arc::clone(&self.remote_capture_enabled)
}
}
}
}
@ -268,27 +279,97 @@ mod tests {
#[test]
fn keyboard_stream_report_turns_lag_into_a_clean_reset() {
let pkt = keyboard_stream_report(Err(BroadcastStreamRecvError::Lagged(3)))
.expect("lagged keyboard item should produce reset");
let mut remote_capture_was_enabled = true;
let pkt = keyboard_stream_report(
Err(BroadcastStreamRecvError::Lagged(3)),
true,
&mut remote_capture_was_enabled,
)
.expect("lagged keyboard item should produce reset");
assert_eq!(pkt.data, vec![0; 8]);
let passthrough = keyboard_stream_report(Ok(KeyboardReport {
data: vec![1, 2, 3],
}))
let passthrough = keyboard_stream_report(
Ok(KeyboardReport {
data: vec![1, 2, 3],
}),
true,
&mut remote_capture_was_enabled,
)
.expect("ok keyboard item should pass through");
assert_eq!(passthrough.data, vec![1, 2, 3]);
}
#[test]
fn mouse_stream_report_turns_lag_into_a_neutral_packet() {
let pkt = mouse_stream_report(Err(BroadcastStreamRecvError::Lagged(5)))
.expect("lagged mouse item should produce neutral packet");
let mut remote_capture_was_enabled = true;
let pkt = mouse_stream_report(
Err(BroadcastStreamRecvError::Lagged(5)),
true,
&mut remote_capture_was_enabled,
)
.expect("lagged mouse item should produce neutral packet");
assert_eq!(pkt.data, vec![0; 4]);
let passthrough = mouse_stream_report(Ok(MouseReport {
data: vec![9, 8, 7, 6],
}))
let passthrough = mouse_stream_report(
Ok(MouseReport {
data: vec![9, 8, 7, 6],
}),
true,
&mut remote_capture_was_enabled,
)
.expect("ok mouse item should pass through");
assert_eq!(passthrough.data, vec![9, 8, 7, 6]);
}
#[test]
fn keyboard_stream_report_blocks_stale_packets_after_local_handoff() {
let mut remote_capture_was_enabled = true;
let reset = keyboard_stream_report(
Ok(KeyboardReport {
data: vec![4, 0, 5, 0, 0, 0, 0, 0],
}),
false,
&mut remote_capture_was_enabled,
)
.expect("switching local should emit one reset");
assert_eq!(reset.data, vec![0; 8]);
let dropped = keyboard_stream_report(
Ok(KeyboardReport {
data: vec![1, 2, 3],
}),
false,
&mut remote_capture_was_enabled,
);
assert!(
dropped.is_none(),
"stale keyboard packets should be dropped locally"
);
}
#[test]
fn mouse_stream_report_blocks_stale_packets_after_local_handoff() {
let mut remote_capture_was_enabled = true;
let reset = mouse_stream_report(
Ok(MouseReport {
data: vec![9, 8, 7, 6],
}),
false,
&mut remote_capture_was_enabled,
)
.expect("switching local should emit one neutral mouse packet");
assert_eq!(reset.data, vec![0; 4]);
let dropped = mouse_stream_report(
Ok(MouseReport {
data: vec![1, 1, 1, 1],
}),
false,
&mut remote_capture_was_enabled,
);
assert!(
dropped.is_none(),
"stale mouse packets should be dropped locally"
);
}
}