unblocked evtest

This commit is contained in:
Brad Stein 2025-06-11 00:37:01 -05:00
parent d4b8208762
commit b8183d6861
4 changed files with 137 additions and 84 deletions

View File

@ -1,7 +1,8 @@
// client/src/app.rs // client/src/app.rs
use anyhow::{Context, Result}; use anyhow::Result;
use tokio::{sync::mpsc, time::Duration, task::JoinHandle}; use std::time::Duration;
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
use tonic::Request; use tonic::Request;
use tracing::{info, warn, error}; use tracing::{info, warn, error};
@ -9,7 +10,7 @@ use navka_common::navka::{relay_client::RelayClient, HidReport};
use crate::input::inputs::InputAggregator; use crate::input::inputs::InputAggregator;
pub struct NavkaClientApp { pub struct NavkaClientApp {
aggregator: InputAggregator, aggregator: Option<InputAggregator>,
server_addr: String, server_addr: String,
dev_mode: bool, dev_mode: bool,
} }
@ -21,11 +22,11 @@ impl NavkaClientApp {
.nth(1) .nth(1)
.or_else(|| std::env::var("NAVKA_SERVER_ADDR").ok()) .or_else(|| std::env::var("NAVKA_SERVER_ADDR").ok())
.unwrap_or_else(|| "http://127.0.0.1:50051".to_owned()); .unwrap_or_else(|| "http://127.0.0.1:50051".to_owned());
let mut aggregator = InputAggregator::new(); let mut aggregator = InputAggregator::new(dev_mode);
aggregator.init()?; // discover & grab aggregator.init()?; // discover & grab
Ok(Self { Ok(Self {
aggregator, aggregator: Some(aggregator),
server_addr: addr, server_addr: addr,
dev_mode, dev_mode,
}) })
@ -33,15 +34,16 @@ impl NavkaClientApp {
pub async fn run(&mut self) -> Result<()> { pub async fn run(&mut self) -> Result<()> {
// spawn aggregator // spawn aggregator
let mut aggregator = std::mem::take(&mut self.aggregator).expect("aggregator must exist");
let aggregator_task: JoinHandle<Result<()>> = tokio::spawn(async move { let aggregator_task: JoinHandle<Result<()>> = tokio::spawn(async move {
self.aggregator.run().await aggregator.run().await
}); });
// dev-mode kill future // dev-mode kill future
let suicide_future = async { let suicide_future = async {
if self.dev_mode { if self.dev_mode {
info!("DEV-mode: will kill itself in 30 s"); info!("DEV-mode: will kill itself in 30s");
tokio::time::sleep(Duration::from_secs(30)).await; tokio::time::sleep(std::time::Duration::from_secs(30)).await;
Err::<(), _>(anyhow::anyhow!("dev-mode timer expired - goodbye cruel world...")) Err::<(), _>(anyhow::anyhow!("dev-mode timer expired - goodbye cruel world..."))
} else { } else {
futures::future::pending().await futures::future::pending().await
@ -50,16 +52,30 @@ impl NavkaClientApp {
// Combine aggregator + dev + reconnect logic // Combine aggregator + dev + reconnect logic
tokio::select! { tokio::select! {
res = aggregator_task => { // aggregator finishes
error!("Aggregator ended: {res:?}"); agg_res = aggregator_task => {
std::process::exit(1); match agg_res {
Ok(Ok(())) => {
error!("Aggregator ended normally, exiting.");
std::process::exit(0);
}
Ok(Err(e)) => {
error!("Aggregator ended with error: {e}");
std::process::exit(1);
}
Err(join_err) => {
error!("Aggregator task panicked or was cancelled: {join_err}");
std::process::exit(1);
}
}
}, },
// dev-mode
res = suicide_future => { res = suicide_future => {
warn!("Dev-mode: {res:?}"); warn!("Dev-mode: {res:?}");
std::process::exit(0); std::process::exit(0);
}, },
// reconnect loop
_ = self.reconnect_loop() => { _ = self.reconnect_loop() => {
// if that loop ends, e.g. user or server
warn!("Reconnect loop ended?? We exit"); warn!("Reconnect loop ended?? We exit");
std::process::exit(0); std::process::exit(0);
} }
@ -70,26 +86,27 @@ impl NavkaClientApp {
/// and waits for inbound to end. Then tries again, unless aggregator ended. /// and waits for inbound to end. Then tries again, unless aggregator ended.
async fn reconnect_loop(&self) { async fn reconnect_loop(&self) {
loop { loop {
// dial the server // dial the servers
info!("Dialing server at: {}", self.server_addr);
let mut client = match RelayClient::connect(self.server_addr.clone()).await { let mut client = match RelayClient::connect(self.server_addr.clone()).await {
Ok(c) => c, Ok(c) => c,
Err(e) => { Err(e) => {
error!("connect error {e}, sleeping 5s"); error!("connect error {e}, sleeping 1s");
tokio::time::sleep(Duration::from_secs(5)).await; tokio::time::sleep(Duration::from_secs(1)).await;
continue; continue;
} }
}; };
// fresh channel for aggregator => we do this with new (tx, rx) // fresh channel for aggregator => we do this with new (tx, rx)
let (tx, rx) = mpsc::channel::<HidReport>(32); let (_tx, rx) = mpsc::channel::<HidReport>(32);
// aggregator can hold 'tx' by storing in some global or so? // aggregator can hold 'tx' by storing in some global or so?
let outbound = ReceiverStream::new(rx); let outbound = ReceiverStream::new(rx);
let response = match client.stream(Request::new(outbound)).await { let response = match client.stream(Request::new(outbound)).await {
Ok(r) => r, Ok(r) => r,
Err(e) => { Err(e) => {
error!("stream RPC error: {e}, sleeping 5s"); error!("stream RPC error: {e}, sleeping 1s");
tokio::time::sleep(Duration::from_secs(5)).await; tokio::time::sleep(Duration::from_secs(1)).await;
continue; continue;
} }
}; };
@ -107,15 +124,8 @@ impl NavkaClientApp {
} }
} }
} }
warn!("Inbound ended. Will try to reconnect in 5s"); warn!("Inbound ended. Will try to reconnect in 1s");
tokio::time::sleep(Duration::from_secs(5)).await; tokio::time::sleep(Duration::from_secs(1)).await;
} }
} }
fn make_channel(&self) -> Result<mpsc::Sender<HidReport>> {
// aggregator's main channel
let (tx, _rx) = mpsc::channel(32);
// aggregator would store tx in self
Ok(tx)
}
} }

View File

@ -1,9 +1,9 @@
// client/src/input/inputs.rs // client/src/input/inputs.rs
use anyhow::{bail, Context, Result}; use anyhow::{bail, Context, Result};
use evdev::{Device, EventType, KeyCode, RelativeAxisType}; use evdev::{Device, EventType, KeyCode, RelativeAxisCode};
use tokio::time::{interval, Duration}; use tokio::time::{interval, Duration};
use tracing::info; use tracing::{debug, info};
use crate::input::keyboard::KeyboardAggregator; use crate::input::keyboard::KeyboardAggregator;
use crate::input::mouse::MouseAggregator; use crate::input::mouse::MouseAggregator;
@ -15,16 +15,17 @@ use crate::input::microphone::MicrophoneCapture;
/// spawns specialized aggregator objects, and can also /// spawns specialized aggregator objects, and can also
/// create stubs for camera/microphone logic if needed. /// create stubs for camera/microphone logic if needed.
pub struct InputAggregator { pub struct InputAggregator {
keyboards: Vec<KeyboardAggregator>, pub dev_mode: bool,
mice: Vec<MouseAggregator>, keyboards: Vec<KeyboardAggregator>,
// Possibly store camera or mic aggregator as well: mice: Vec<MouseAggregator>,
camera: Option<CameraCapture>, camera: Option<CameraCapture>,
mic: Option<MicrophoneCapture>, mic: Option<MicrophoneCapture>,
} }
impl InputAggregator { impl InputAggregator {
pub fn new() -> Self { pub fn new(dev_mode: bool) -> Self {
Self { Self {
dev_mode,
keyboards: Vec::new(), keyboards: Vec::new(),
mice: Vec::new(), mice: Vec::new(),
camera: None, camera: None,
@ -45,9 +46,7 @@ impl InputAggregator {
let path = entry.path(); let path = entry.path();
// skip anything that isn't "event*" // skip anything that isn't "event*"
if !path.file_name() if !path.file_name().map_or(false, |f| f.to_string_lossy().starts_with("event")) {
.map_or(false, |f| f.to_string_lossy().starts_with("event"))
{
continue; continue;
} }
@ -60,23 +59,32 @@ impl InputAggregator {
} }
}; };
// nonblocking so fetch_events never stalls the whole loop
dev.set_nonblocking(true).with_context(|| format!("set_non_blocking {:?}", path))?;
match classify_device(&dev) { match classify_device(&dev) {
DeviceKind::Keyboard => { DeviceKind::Keyboard => {
dev.grab().with_context(|| format!("grabbing keyboard {:?}", path))?; dev.grab().with_context(|| format!("grabbing keyboard {path:?}"))?;
info!("Grabbed keyboard {:?}", dev.name().unwrap_or("UNKNOWN")); info!("Grabbed keyboard {:?}", dev.name().unwrap_or("UNKNOWN"));
let kbd_agg = KeyboardAggregator::new(dev);
// pass dev_mode to aggregator
let kbd_agg = KeyboardAggregator::new(dev, self.dev_mode);
self.keyboards.push(kbd_agg); self.keyboards.push(kbd_agg);
found_any = true; found_any = true;
continue;
} }
DeviceKind::Mouse => { DeviceKind::Mouse => {
dev.grab().with_context(|| format!("grabbing mouse {:?}", path))?; dev.grab().with_context(|| format!("grabbing mouse {path:?}"))?;
info!("Grabbed mouse {:?}", dev.name().unwrap_or("UNKNOWN")); info!("Grabbed mouse {:?}", dev.name().unwrap_or("UNKNOWN"));
let mouse_agg = MouseAggregator::new(dev); let mouse_agg = MouseAggregator::new(dev);
self.mice.push(mouse_agg); self.mice.push(mouse_agg);
found_any = true; found_any = true;
continue;
} }
DeviceKind::Other => { DeviceKind::Other => {
tracing::debug!("Skipping non-kbd/mouse device: {:?}", dev.name().unwrap_or("UNKNOWN")); debug!("Skipping non-kbd/mouse device: {:?}", dev.name().unwrap_or("UNKNOWN"));
continue;
} }
} }
} }
@ -110,11 +118,17 @@ impl InputAggregator {
} }
} }
#[derive(Debug)]
struct Classification {
keyboard: Option<()>,
mouse: Option<()>,
}
/// The classification function /// The classification function
fn classify_device(dev: &Device) -> DeviceKind { fn classify_device(dev: &Device) -> DeviceKind {
let evbits = dev.supported_events(); let evbits = dev.supported_events();
// If it supports typed scancodes => Keyboard // Keyboard logic
if evbits.contains(EventType::KEY) { if evbits.contains(EventType::KEY) {
if let Some(keys) = dev.supported_keys() { if let Some(keys) = dev.supported_keys() {
if keys.contains(KeyCode::KEY_A) || keys.contains(KeyCode::KEY_ENTER) { if keys.contains(KeyCode::KEY_A) || keys.contains(KeyCode::KEY_ENTER) {
@ -122,12 +136,17 @@ fn classify_device(dev: &Device) -> DeviceKind {
} }
} }
} }
// If it supports REL_X / REL_Y => Mouse
if evbits.contains(EventType::REL) { // Mouse logic
if let Some(rel) = dev.supported_relative_axes() { if evbits.contains(EventType::RELATIVE) {
if rel.contains(RelativeAxisType::REL_X) && if let (Some(rel), Some(keys)) =
rel.contains(RelativeAxisType::REL_Y) (dev.supported_relative_axes(), dev.supported_keys())
{ {
let has_xy = rel.contains(RelativeAxisCode::REL_X)
&& rel.contains(RelativeAxisCode::REL_Y);
let has_btn = keys.contains(KeyCode::BTN_LEFT)
|| keys.contains(KeyCode::BTN_RIGHT);
if has_xy && has_btn {
return DeviceKind::Mouse; return DeviceKind::Mouse;
} }
} }

View File

@ -2,60 +2,81 @@
use std::collections::HashSet; use std::collections::HashSet;
use evdev::{Device, InputEvent, KeyCode, EventType}; use evdev::{Device, InputEvent, KeyCode, EventType};
use tracing::warn; use tracing::{warn, error, info, debug};
use navka_common::navka::HidReport;
use crate::input::keymap::{keycode_to_usage, is_modifier}; use crate::input::keymap::{keycode_to_usage, is_modifier};
/// The aggregator logic for a single keyboard device. /// The aggregator logic for a single keyboard device.
pub struct KeyboardAggregator { pub struct KeyboardAggregator {
dev: Device, dev: Device,
dev_mode: bool,
pressed_keys: HashSet<KeyCode>, pressed_keys: HashSet<KeyCode>,
} }
impl KeyboardAggregator { impl KeyboardAggregator {
pub fn new(dev: Device) -> Self { pub fn new(mut dev: Device, dev_mode: bool) -> Self {
let _ = dev.set_nonblocking(true);
Self { Self {
dev, dev,
dev_mode,
pressed_keys: HashSet::new(), pressed_keys: HashSet::new(),
} }
} }
/// Called frequently (e.g. every ~10ms) to fetch + handle events /// Called frequently (e.g. every ~10ms) to fetch + handle events
pub fn process_events(&mut self) { pub fn process_events(&mut self) {
match self.dev.fetch_events() { let events: Vec<InputEvent> = {
Ok(events) => { match self.dev.fetch_events() {
for ev in events { Ok(it) => it.collect(),
self.handle_event(ev); Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => return,
Err(e) => {
error!("Keyboard device read error: {e}");
return;
} }
} }
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { };
// nothing
} if self.dev_mode && !events.is_empty() {
Err(e) => { info!(
tracing::error!("Keyboard device read error: {e}"); "Got {} events from dev: {}",
} events.len(),
self.dev.name().unwrap_or("???")
);
}
for ev in events {
self.handle_event(ev);
} }
} }
fn handle_event(&mut self, ev: InputEvent) { fn handle_event(&mut self, ev: InputEvent) {
if ev.event_type() == EventType::KEY { if ev.event_type() == EventType::KEY {
let code = KeyCode::new(ev.code()); let code = KeyCode::new(ev.code());
match ev.value() { let val = ev.value(); // 1 = press, 0 = release, 2 = repeat
1 => { self.pressed_keys.insert(code); }
0 => { self.pressed_keys.remove(&code); } if self.dev_mode {
2 => { /* repeats, if needed */ } info!(
"Keyboard event: code={:?}, value={}, name={:?}",
code, val, self.dev.name()
);
}
match val {
1 => {self.pressed_keys.insert(code);}
0 => {self.pressed_keys.remove(&code);}
2 => {/* repeat */}
_ => {} _ => {}
} }
let report = self.build_report(); let report = self.build_report();
// TODO: send this somewhere (e.g. an mpsc::Sender) // TODO: send this somewhere (e.g. an mpsc::Sender)
// For now, just log: // For now, just log:
tracing::debug!(?report, "Keyboard HID report"); debug!(?report, "Keyboard HID report");
// optional: magic chord // optional: magic chord
if self.is_magic_chord() { if self.is_magic_chord() {
warn!("Magic chord pressed => exit aggregator??"); warn!("Magic chord pressed => exit aggregator??");
// Or do something else // Or do something else
std::process::exit(0);
} }
} }
} }
@ -82,8 +103,7 @@ impl KeyboardAggregator {
} }
fn is_magic_chord(&self) -> bool { fn is_magic_chord(&self) -> bool {
// example logic self.pressed_keys.contains(&KeyCode::KEY_LEFTCTRL)
self.pressed_keys.contains(&KeyCode::KEY_LEFTCTRL) && && self.pressed_keys.contains(&KeyCode::KEY_ESC)
self.pressed_keys.contains(&KeyCode::KEY_ESC)
} }
} }

View File

@ -1,6 +1,7 @@
// client/src/input/mouse.rs // client/src/input/mouse.rs
use evdev::{Device, InputEvent, EventType, RelativeAxisType}; use evdev::{Device, InputEvent, EventType, RelativeAxisCode};
use tracing::error;
/// Aggregator for a single mouse device /// Aggregator for a single mouse device
pub struct MouseAggregator { pub struct MouseAggregator {
@ -20,26 +21,29 @@ impl MouseAggregator {
} }
pub fn process_events(&mut self) { pub fn process_events(&mut self) {
match self.dev.fetch_events() { let events_vec: Vec<InputEvent> = match self.dev.fetch_events() {
Ok(events) => { Ok(it) => it.collect(),
for ev in events { Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
self.handle_event(ev); return;
}
} }
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {}
Err(e) => { Err(e) => {
tracing::error!("Mouse device read error: {e}"); error!("Mouse device read error: {e}");
return;
} }
};
for ev in events_vec {
self.handle_event(ev);
} }
} }
fn handle_event(&mut self, ev: InputEvent) { fn handle_event(&mut self, ev: InputEvent) {
if ev.event_type() == EventType::REL { if ev.event_type() == EventType::RELATIVE {
match ev.code() { match RelativeAxisCode(ev.code()) {
x if x == RelativeAxisType::REL_X.0 => { RelativeAxisCode::REL_X => {
self.dx += ev.value(); self.dx += ev.value();
} }
y if y == RelativeAxisType::REL_Y.0 => { RelativeAxisCode::REL_Y => {
self.dy += ev.value(); self.dy += ev.value();
} }
_ => {} _ => {}