From b8183d68611e9ffd2f6d471f0e8d398b5ee171d1 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Wed, 11 Jun 2025 00:37:01 -0500 Subject: [PATCH] unblocked evtest --- client/src/app.rs | 66 +++++++++++++++++++++--------------- client/src/input/inputs.rs | 63 ++++++++++++++++++++++------------ client/src/input/keyboard.rs | 64 ++++++++++++++++++++++------------ client/src/input/mouse.rs | 28 ++++++++------- 4 files changed, 137 insertions(+), 84 deletions(-) diff --git a/client/src/app.rs b/client/src/app.rs index 158cf05..8316d94 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -1,7 +1,8 @@ // client/src/app.rs -use anyhow::{Context, Result}; -use tokio::{sync::mpsc, time::Duration, task::JoinHandle}; +use anyhow::Result; +use std::time::Duration; +use tokio::{sync::mpsc, task::JoinHandle}; use tokio_stream::wrappers::ReceiverStream; use tonic::Request; use tracing::{info, warn, error}; @@ -9,7 +10,7 @@ use navka_common::navka::{relay_client::RelayClient, HidReport}; use crate::input::inputs::InputAggregator; pub struct NavkaClientApp { - aggregator: InputAggregator, + aggregator: Option, server_addr: String, dev_mode: bool, } @@ -21,11 +22,11 @@ impl NavkaClientApp { .nth(1) .or_else(|| std::env::var("NAVKA_SERVER_ADDR").ok()) .unwrap_or_else(|| "http://127.0.0.1:50051".to_owned()); - let mut aggregator = InputAggregator::new(); + let mut aggregator = InputAggregator::new(dev_mode); aggregator.init()?; // discover & grab - + Ok(Self { - aggregator, + aggregator: Some(aggregator), server_addr: addr, dev_mode, }) @@ -33,15 +34,16 @@ impl NavkaClientApp { pub async fn run(&mut self) -> Result<()> { // spawn aggregator + let mut aggregator = std::mem::take(&mut self.aggregator).expect("aggregator must exist"); let aggregator_task: JoinHandle> = tokio::spawn(async move { - self.aggregator.run().await + aggregator.run().await }); // dev-mode kill future let suicide_future = async { if self.dev_mode { - info!("DEV-mode: will kill itself in 30 s"); - tokio::time::sleep(Duration::from_secs(30)).await; + info!("DEV-mode: will kill itself in 30s"); + tokio::time::sleep(std::time::Duration::from_secs(30)).await; Err::<(), _>(anyhow::anyhow!("dev-mode timer expired - goodbye cruel world...")) } else { futures::future::pending().await @@ -50,16 +52,30 @@ impl NavkaClientApp { // Combine aggregator + dev + reconnect logic tokio::select! { - res = aggregator_task => { - error!("Aggregator ended: {res:?}"); - std::process::exit(1); + // aggregator finishes + agg_res = aggregator_task => { + match agg_res { + Ok(Ok(())) => { + error!("Aggregator ended normally, exiting."); + std::process::exit(0); + } + Ok(Err(e)) => { + error!("Aggregator ended with error: {e}"); + std::process::exit(1); + } + Err(join_err) => { + error!("Aggregator task panicked or was cancelled: {join_err}"); + std::process::exit(1); + } + } }, + // dev-mode res = suicide_future => { warn!("Dev-mode: {res:?}"); std::process::exit(0); }, + // reconnect loop _ = self.reconnect_loop() => { - // if that loop ends, e.g. user or server warn!("Reconnect loop ended?? We exit"); std::process::exit(0); } @@ -70,26 +86,27 @@ impl NavkaClientApp { /// and waits for inbound to end. Then tries again, unless aggregator ended. async fn reconnect_loop(&self) { loop { - // dial the server + // dial the servers + info!("Dialing server at: {}", self.server_addr); let mut client = match RelayClient::connect(self.server_addr.clone()).await { Ok(c) => c, Err(e) => { - error!("connect error {e}, sleeping 5s"); - tokio::time::sleep(Duration::from_secs(5)).await; + error!("connect error {e}, sleeping 1s"); + tokio::time::sleep(Duration::from_secs(1)).await; continue; } }; // fresh channel for aggregator => we do this with new (tx, rx) - let (tx, rx) = mpsc::channel::(32); + let (_tx, rx) = mpsc::channel::(32); // aggregator can hold 'tx' by storing in some global or so? let outbound = ReceiverStream::new(rx); let response = match client.stream(Request::new(outbound)).await { Ok(r) => r, Err(e) => { - error!("stream RPC error: {e}, sleeping 5s"); - tokio::time::sleep(Duration::from_secs(5)).await; + error!("stream RPC error: {e}, sleeping 1s"); + tokio::time::sleep(Duration::from_secs(1)).await; continue; } }; @@ -107,15 +124,8 @@ impl NavkaClientApp { } } } - warn!("Inbound ended. Will try to reconnect in 5s"); - tokio::time::sleep(Duration::from_secs(5)).await; + warn!("Inbound ended. Will try to reconnect in 1s"); + tokio::time::sleep(Duration::from_secs(1)).await; } } - - fn make_channel(&self) -> Result> { - // aggregator's main channel - let (tx, _rx) = mpsc::channel(32); - // aggregator would store tx in self - Ok(tx) - } } diff --git a/client/src/input/inputs.rs b/client/src/input/inputs.rs index 42265a5..05f02ed 100644 --- a/client/src/input/inputs.rs +++ b/client/src/input/inputs.rs @@ -1,9 +1,9 @@ // client/src/input/inputs.rs use anyhow::{bail, Context, Result}; -use evdev::{Device, EventType, KeyCode, RelativeAxisType}; +use evdev::{Device, EventType, KeyCode, RelativeAxisCode}; use tokio::time::{interval, Duration}; -use tracing::info; +use tracing::{debug, info}; use crate::input::keyboard::KeyboardAggregator; use crate::input::mouse::MouseAggregator; @@ -15,16 +15,17 @@ use crate::input::microphone::MicrophoneCapture; /// spawns specialized aggregator objects, and can also /// create stubs for camera/microphone logic if needed. pub struct InputAggregator { - keyboards: Vec, - mice: Vec, - // Possibly store camera or mic aggregator as well: - camera: Option, - mic: Option, + pub dev_mode: bool, + keyboards: Vec, + mice: Vec, + camera: Option, + mic: Option, } impl InputAggregator { - pub fn new() -> Self { + pub fn new(dev_mode: bool) -> Self { Self { + dev_mode, keyboards: Vec::new(), mice: Vec::new(), camera: None, @@ -45,9 +46,7 @@ impl InputAggregator { let path = entry.path(); // skip anything that isn't "event*" - if !path.file_name() - .map_or(false, |f| f.to_string_lossy().starts_with("event")) - { + if !path.file_name().map_or(false, |f| f.to_string_lossy().starts_with("event")) { continue; } @@ -60,23 +59,32 @@ impl InputAggregator { } }; + // non‑blocking so fetch_events never stalls the whole loop + dev.set_nonblocking(true).with_context(|| format!("set_non_blocking {:?}", path))?; + match classify_device(&dev) { DeviceKind::Keyboard => { - dev.grab().with_context(|| format!("grabbing keyboard {:?}", path))?; + dev.grab().with_context(|| format!("grabbing keyboard {path:?}"))?; info!("Grabbed keyboard {:?}", dev.name().unwrap_or("UNKNOWN")); - let kbd_agg = KeyboardAggregator::new(dev); + + // pass dev_mode to aggregator + let kbd_agg = KeyboardAggregator::new(dev, self.dev_mode); self.keyboards.push(kbd_agg); found_any = true; + continue; } DeviceKind::Mouse => { - dev.grab().with_context(|| format!("grabbing mouse {:?}", path))?; + dev.grab().with_context(|| format!("grabbing mouse {path:?}"))?; info!("Grabbed mouse {:?}", dev.name().unwrap_or("UNKNOWN")); + let mouse_agg = MouseAggregator::new(dev); self.mice.push(mouse_agg); found_any = true; + continue; } DeviceKind::Other => { - tracing::debug!("Skipping non-kbd/mouse device: {:?}", dev.name().unwrap_or("UNKNOWN")); + debug!("Skipping non-kbd/mouse device: {:?}", dev.name().unwrap_or("UNKNOWN")); + continue; } } } @@ -110,11 +118,17 @@ impl InputAggregator { } } +#[derive(Debug)] +struct Classification { + keyboard: Option<()>, + mouse: Option<()>, +} + /// The classification function fn classify_device(dev: &Device) -> DeviceKind { let evbits = dev.supported_events(); - // If it supports typed scancodes => Keyboard + // Keyboard logic if evbits.contains(EventType::KEY) { if let Some(keys) = dev.supported_keys() { if keys.contains(KeyCode::KEY_A) || keys.contains(KeyCode::KEY_ENTER) { @@ -122,12 +136,17 @@ fn classify_device(dev: &Device) -> DeviceKind { } } } - // If it supports REL_X / REL_Y => Mouse - if evbits.contains(EventType::REL) { - if let Some(rel) = dev.supported_relative_axes() { - if rel.contains(RelativeAxisType::REL_X) && - rel.contains(RelativeAxisType::REL_Y) - { + + // Mouse logic + if evbits.contains(EventType::RELATIVE) { + if let (Some(rel), Some(keys)) = + (dev.supported_relative_axes(), dev.supported_keys()) + { + let has_xy = rel.contains(RelativeAxisCode::REL_X) + && rel.contains(RelativeAxisCode::REL_Y); + let has_btn = keys.contains(KeyCode::BTN_LEFT) + || keys.contains(KeyCode::BTN_RIGHT); + if has_xy && has_btn { return DeviceKind::Mouse; } } diff --git a/client/src/input/keyboard.rs b/client/src/input/keyboard.rs index aca2ce4..c3bcf0b 100644 --- a/client/src/input/keyboard.rs +++ b/client/src/input/keyboard.rs @@ -2,60 +2,81 @@ use std::collections::HashSet; use evdev::{Device, InputEvent, KeyCode, EventType}; -use tracing::warn; +use tracing::{warn, error, info, debug}; -use navka_common::navka::HidReport; use crate::input::keymap::{keycode_to_usage, is_modifier}; /// The aggregator logic for a single keyboard device. pub struct KeyboardAggregator { dev: Device, + dev_mode: bool, pressed_keys: HashSet, } impl KeyboardAggregator { - pub fn new(dev: Device) -> Self { + pub fn new(mut dev: Device, dev_mode: bool) -> Self { + let _ = dev.set_nonblocking(true); Self { dev, + dev_mode, pressed_keys: HashSet::new(), } } /// Called frequently (e.g. every ~10ms) to fetch + handle events pub fn process_events(&mut self) { - match self.dev.fetch_events() { - Ok(events) => { - for ev in events { - self.handle_event(ev); + let events: Vec = { + match self.dev.fetch_events() { + Ok(it) => it.collect(), + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => return, + Err(e) => { + error!("Keyboard device read error: {e}"); + return; } } - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { - // nothing - } - Err(e) => { - tracing::error!("Keyboard device read error: {e}"); - } + }; + + if self.dev_mode && !events.is_empty() { + info!( + "Got {} events from dev: {}", + events.len(), + self.dev.name().unwrap_or("???") + ); } - } + + for ev in events { + self.handle_event(ev); + } + } fn handle_event(&mut self, ev: InputEvent) { if ev.event_type() == EventType::KEY { let code = KeyCode::new(ev.code()); - match ev.value() { - 1 => { self.pressed_keys.insert(code); } - 0 => { self.pressed_keys.remove(&code); } - 2 => { /* repeats, if needed */ } + let val = ev.value(); // 1 = press, 0 = release, 2 = repeat + + if self.dev_mode { + info!( + "Keyboard event: code={:?}, value={}, name={:?}", + code, val, self.dev.name() + ); + } + + match val { + 1 => {self.pressed_keys.insert(code);} + 0 => {self.pressed_keys.remove(&code);} + 2 => {/* repeat */} _ => {} } let report = self.build_report(); // TODO: send this somewhere (e.g. an mpsc::Sender) // For now, just log: - tracing::debug!(?report, "Keyboard HID report"); + debug!(?report, "Keyboard HID report"); // optional: magic chord if self.is_magic_chord() { warn!("Magic chord pressed => exit aggregator??"); // Or do something else + std::process::exit(0); } } } @@ -82,8 +103,7 @@ impl KeyboardAggregator { } fn is_magic_chord(&self) -> bool { - // example logic - self.pressed_keys.contains(&KeyCode::KEY_LEFTCTRL) && - self.pressed_keys.contains(&KeyCode::KEY_ESC) + self.pressed_keys.contains(&KeyCode::KEY_LEFTCTRL) + && self.pressed_keys.contains(&KeyCode::KEY_ESC) } } diff --git a/client/src/input/mouse.rs b/client/src/input/mouse.rs index 279ac8d..18dbed4 100644 --- a/client/src/input/mouse.rs +++ b/client/src/input/mouse.rs @@ -1,6 +1,7 @@ // client/src/input/mouse.rs -use evdev::{Device, InputEvent, EventType, RelativeAxisType}; +use evdev::{Device, InputEvent, EventType, RelativeAxisCode}; +use tracing::error; /// Aggregator for a single mouse device pub struct MouseAggregator { @@ -20,26 +21,29 @@ impl MouseAggregator { } pub fn process_events(&mut self) { - match self.dev.fetch_events() { - Ok(events) => { - for ev in events { - self.handle_event(ev); - } + let events_vec: Vec = match self.dev.fetch_events() { + Ok(it) => it.collect(), + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + return; } - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {} Err(e) => { - tracing::error!("Mouse device read error: {e}"); + error!("Mouse device read error: {e}"); + return; } + }; + + for ev in events_vec { + self.handle_event(ev); } } fn handle_event(&mut self, ev: InputEvent) { - if ev.event_type() == EventType::REL { - match ev.code() { - x if x == RelativeAxisType::REL_X.0 => { + if ev.event_type() == EventType::RELATIVE { + match RelativeAxisCode(ev.code()) { + RelativeAxisCode::REL_X => { self.dx += ev.value(); } - y if y == RelativeAxisType::REL_Y.0 => { + RelativeAxisCode::REL_Y => { self.dy += ev.value(); } _ => {}