From f8ae81375062c394a7568ff1f83855f3c0c68bd1 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Tue, 17 Jun 2025 20:54:31 -0500 Subject: [PATCH] rewrite --- client/src/app.rs | 192 +++++++++++++++-------------------- client/src/input/inputs.rs | 51 ++++------ client/src/input/keyboard.rs | 77 +++++--------- client/src/input/mouse.rs | 119 ++++++++-------------- server/src/main.rs | 45 ++++---- 5 files changed, 191 insertions(+), 293 deletions(-) diff --git a/client/src/app.rs b/client/src/app.rs index 655f677..0c17221 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -1,16 +1,17 @@ // client/src/app.rs + #![forbid(unsafe_code)] + use anyhow::Result; use std::time::Duration; use tokio::{sync::broadcast, task::JoinHandle}; use tokio_stream::{wrappers::BroadcastStream, StreamExt}; use tonic::Request; -use tracing::{info, warn, error, debug}; +use tracing::{debug, error, info, warn}; -use navka_common::navka::{relay_client::RelayClient, KeyboardReport, MouseReport,}; - -use input::inputs::InputAggregator; +use navka_common::navka::{relay_client::RelayClient, KeyboardReport, MouseReport}; +use crate::input::inputs::InputAggregator; pub struct NavkaClientApp { aggregator: Option, @@ -22,134 +23,105 @@ pub struct NavkaClientApp { impl NavkaClientApp { pub fn new() -> Result { - info!("Creating navka-client app!"); - let dev_mode = std::env::var("NAVKA_DEV_MODE").is_ok(); - let server = std::env::args() + let dev_mode = std::env::var("NAVKA_DEV_MODE").is_ok(); + let server_addr = std::env::args() .nth(1) .or_else(|| std::env::var("NAVKA_SERVER_ADDR").ok()) - .unwrap_or_else(|| "http://127.0.0.1:50051".to_owned()); + .unwrap_or_else(|| "http://127.0.0.1:50051".into()); let (kbd_tx, _) = broadcast::channel::(1024); let (mou_tx, _) = broadcast::channel::(4096); - let mut agg = InputAggregator::new(dev_mode, kbd_tx.clone(), mou_tx.clone()); - agg.init()?; - Ok(Self { - keyboard: Some(kbd_aggregator), - mouse: Some(mou_aggregator), - server_addr: addr, - dev_mode, - tx, - }) + let mut agg = InputAggregator::new(dev_mode, kbd_tx.clone(), mou_tx.clone()); + agg.init()?; // grab devices + + Ok(Self { aggregator: Some(agg), + server_addr, dev_mode, + kbd_tx, mou_tx }) } pub async fn run(&mut self) -> Result<()> { - info!("Running navka-client app!"); - /* ─ spawn the input thread ─ */ - let agg_task: JoinHandle> = - tokio::spawn(async move { self.aggregator.take().unwrap().run().await }); + /* detach the aggregator before spawn so `self` is not moved */ + let aggregator = self.aggregator.take().expect("InputAggregator present"); + let agg_task = tokio::spawn(async move { aggregator.run().await }); - /* ─ stream loops ─ */ + /* two networking tasks */ let kbd_loop = self.stream_loop_keyboard(); let mou_loop = self.stream_loop_mouse(); - /* ─ suicide timer for DEV mode ─ */ - let suicide_future = async { + /* optional suicide timer */ + let suicide = async { if self.dev_mode { - info!("DEV-mode: will kill itself in 30s"); tokio::time::sleep(Duration::from_secs(30)).await; - Err::<(), _>(anyhow::anyhow!("dev-mode timer expired - 💀goodbye💀 cruel world...")) - } else { - futures::future::pending().await - } + warn!("dev‑mode timeout"); + std::process::exit(0); + } else { futures::future::pending::<()>().await } }; - // Combine aggregator + dev + reconnect logic tokio::select! { - // aggregator finishes - agg_res = aggregator_task => { - match agg_res { - Ok(Ok(())) => { - error!("Aggregator ended normally, exiting."); - std::process::exit(0); - } - Ok(Err(e)) => { - error!("Aggregator ended with error: {e}"); - std::process::exit(1); - } - Err(join_err) => { - error!("Aggregator task panicked or was cancelled: {join_err}"); - std::process::exit(1); - } - } - }, - // dev-mode - res = suicide_future => { - warn!("Dev-mode: {res:?}"); - std::process::exit(0); - }, - // reconnect loop - _ = self.reconnect_loop() => { - warn!("Reconnect loop ended??"); - std::process::exit(0); + _ = kbd_loop => unreachable!(), + _ = mou_loop => unreachable!(), + _ = suicide => unreachable!(), + // _ = suicide => { warn!("dev‑mode timeout"); std::process::exit(0) }, + r = agg_task => { + error!("aggregator task ended: {r:?}"); + std::process::exit(1) } } } + /*──────────────── keyboard stream ───────────────*/ + async fn stream_loop_keyboard(&self) { + loop { + info!("⌨️ connect {}", self.server_addr); + let mut cli = match RelayClient::connect(self.server_addr.clone()).await { + Ok(c) => c, + Err(e) => { error!("connect: {e}"); Self::delay().await; continue } + }; + + let outbound = BroadcastStream::new(self.kbd_tx.subscribe()).filter_map(|r| r.ok()); + let resp = match cli.stream_keyboard(Request::new(outbound)).await { + Ok(r) => r, Err(e) => { error!("stream_keyboard: {e}"); Self::delay().await; continue } + }; + + let mut inbound = resp.into_inner(); + while let Some(m) = inbound.message().await.transpose() { + match m { + Ok(r) => debug!("kbd echo {} B", r.data.len()), + Err(e) => { error!("kbd inbound: {e}"); break } + } + } + warn!("⌨️ disconnected"); + Self::delay().await; + } + } + + /*──────────────── mouse stream ──────────────────*/ + async fn stream_loop_mouse(&self) { + loop { + info!("🖱️ connect {}", self.server_addr); + let mut cli = match RelayClient::connect(self.server_addr.clone()).await { + Ok(c) => c, + Err(e) => { error!("connect: {e}"); Self::delay().await; continue } + }; + + let outbound = BroadcastStream::new(self.mou_tx.subscribe()).filter_map(|r| r.ok()); + let resp = match cli.stream_mouse(Request::new(outbound)).await { + Ok(r) => r, Err(e) => { error!("stream_mouse: {e}"); Self::delay().await; continue } + }; + + let mut inbound = resp.into_inner(); + while let Some(m) = inbound.message().await.transpose() { + match m { + Ok(r) => debug!("mouse echo {} B", r.data.len()), + Err(e) => { error!("mouse inbound: {e}"); break } + } + } + warn!("🖱️ disconnected"); + Self::delay().await; + } + } + #[inline(always)] async fn delay() { tokio::time::sleep(Duration::from_secs(1)).await; } - - /*──────────────────────────────── keyboard stream ─────────────────────────*/ - async fn stream_loop_keyboard(&self) { - loop { - info!("⌨️ dial {}", self.server_addr); - let mut cli = match RelayClient::connect(self.server_addr.clone()).await { - Ok(c) => c, Err(e) => { error!("connect: {e}"); delay().await; continue } - }; - let outbound = BroadcastStream::new(self.kbd_tx.subscribe()) - .filter_map(|r| r.ok()); - - info!("⌨️ start stream_keyboard()"); - let resp = match cli.stream_keyboard(Request::new(outbound)).await { - Ok(r) => r, Err(e) => { error!("stream_keyboard: {e}"); delay().await; continue } - }; - - let mut inbound = resp.into_inner(); - while let Some(m) = inbound.message().await.transpose() { - match m { - Ok(r) => debug!("↩️ kbd echo {} B", r.data.len()), - Err(e) => { error!("kbd inbound: {e}"); break } - } - } - warn!("⌨️ disconnected, retry"); - delay().await; - } - } - - /*──────────────────────────────── mouse stream ────────────────────────────*/ - async fn stream_loop_mouse(&self) { - loop { - info!("🖱️ dial {}", self.server_addr); - let mut cli = match RelayClient::connect(self.server_addr.clone()).await { - Ok(c) => c, Err(e) => { error!("connect: {e}"); delay().await; continue } - }; - let outbound = BroadcastStream::new(self.mou_tx.subscribe()) - .filter_map(|r| r.ok()); - - info!("🖱️ start stream_mouse()"); - let resp = match cli.stream_mouse(Request::new(outbound)).await { - Ok(r) => r, Err(e) => { error!("stream_mouse: {e}"); delay().await; continue } - }; - - let mut inbound = resp.into_inner(); - while let Some(m) = inbound.message().await.transpose() { - match m { - Ok(r) => debug!("↩️ mouse echo {} B", r.data.len()), - Err(e) => { error!("mouse inbound: {e}"); break } - } - } - warn!("🖱️ disconnected, retry"); - delay().await; - } - } } diff --git a/client/src/input/inputs.rs b/client/src/input/inputs.rs index 5cc147f..1b5b251 100644 --- a/client/src/input/inputs.rs +++ b/client/src/input/inputs.rs @@ -2,46 +2,31 @@ use anyhow::{bail, Context, Result}; use evdev::{Device, EventType, KeyCode, RelativeAxisCode}; -use tokio::time::{interval, Duration}; -use tokio::sync::broadcast::Sender; +use tokio::{sync::broadcast::Sender, time::{interval, Duration}}; use tracing::{debug, info}; -use navka_common::navka::HidReport; +use navka_common::navka::{KeyboardReport, MouseReport}; -use crate::input::keyboard::KeyboardAggregator; -use crate::input::mouse::MouseAggregator; -use crate::input::camera::CameraCapture; -use crate::input::microphone::MicrophoneCapture; +use super::{keyboard::KeyboardAggregator, mouse::MouseAggregator, + camera::CameraCapture, microphone::MicrophoneCapture}; -/// A top-level aggregator that enumerates /dev/input/event*, -/// classifies them as keyboard vs. mouse vs. other, -/// spawns specialized aggregator objects, and can also -/// create stubs for camera/microphone logic if needed. pub struct InputAggregator { - kbd_tx: Sender, - mou_tx: Sender, - pub dev_mode: bool, - keyboards: Vec, - mice: Vec, - camera: Option, - mic: Option, + kbd_tx: Sender, + mou_tx: Sender, + dev_mode: bool, + keyboards: Vec, + mice: Vec, + camera: Option, + mic: Option, } impl InputAggregator { - pub fn new( - dev_mode: bool, - kbd_tx: Sender, - mou_tx: Sender, - ) -> Self { - Self { - kbd_tx, - mou_tx, - dev_mode, - keyboards: Vec::new(), - mice: Vec::new(), - camera: None, - mic: None, - } + pub fn new(dev_mode: bool, + kbd_tx: Sender, + mou_tx: Sender) -> Self { + Self { kbd_tx, mou_tx, dev_mode, + keyboards: Vec::new(), mice: Vec::new(), + camera: None, mic: None } } /// Called once at startup: enumerates input devices, @@ -89,7 +74,7 @@ impl InputAggregator { dev.grab().with_context(|| format!("grabbing mouse {path:?}"))?; info!("Grabbed mouse {:?}", dev.name().unwrap_or("UNKNOWN")); - // let mouse_agg = MouseAggregator::new(dev);np + // let mouse_agg = MouseAggregator::new(dev); let mouse_agg = MouseAggregator::new(dev, self.dev_mode, self.mou_tx.clone()); self.mice.push(mouse_agg); found_any = true; diff --git a/client/src/input/keyboard.rs b/client/src/input/keyboard.rs index 86267d0..84b71d9 100644 --- a/client/src/input/keyboard.rs +++ b/client/src/input/keyboard.rs @@ -1,99 +1,78 @@ // client/src/input/keyboard.rs use std::collections::HashSet; -use evdev::{Device, InputEvent, KeyCode, EventType}; +use evdev::{Device, EventType, InputEvent, KeyCode}; use tokio::sync::broadcast::Sender; -use tracing::{warn, error, info, debug}; +use tracing::{debug, error, info, warn}; use navka_common::navka::KeyboardReport; -use crate::input::keymap::{keycode_to_usage, is_modifier}; +use super::keymap::{is_modifier, keycode_to_usage}; -/// The aggregator logic for a single keyboard device. pub struct KeyboardAggregator { dev: Device, - tx: Sender, + tx: Sender, dev_mode: bool, pressed_keys: HashSet, } impl KeyboardAggregator { pub fn new(dev: Device, dev_mode: bool, tx: Sender) -> Self { - let _ = dev.set_nonblocking(true); - Self { - dev, - tx, - dev_mode, - pressed_keys: HashSet::new(), - } + let _ = dev.set_nonblocking(true); + Self { dev, tx, dev_mode, pressed_keys: HashSet::new() } } - #[inline] - fn dev_log(&self, record: impl FnOnce()) { - if self.dev_mode { - record(); - } - } - - /// Called frequently (e.g. every ~10ms) to fetch + handle events pub fn process_events(&mut self) { - let events: Vec = match self.dev.fetch_events() { + // --- first fetch, then log (avoids aliasing borrow) --- + let result = self.dev.fetch_events(); + let events: Vec = match result { Ok(it) => it.collect(), Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => return, - Err(e) => { self.d(|| error!("read err: {e}")); return } + Err(e) => { if self.dev_mode { error!("read error: {e}"); } return } }; if self.dev_mode && !events.is_empty() { - self.d(|| info!("{} kbd evts from {}", events.len(), - self.dev.name().unwrap_or("???"))); + info!("{} kbd evts from {}", events.len(), self.dev.name().unwrap_or("?")); } for ev in events { if ev.event_type() != EventType::KEY { continue } let code = KeyCode::new(ev.code()); match ev.value() { - 1 => { self.pressed.insert(code); } // press - 0 => { self.pressed.remove(&code); } // release + 1 => { self.pressed_keys.insert(code); } // press + 0 => { self.pressed_keys.remove(&code); } // release _ => {} } + let report = self.build_report(); - self.d(|| debug!(?report, "kbd report")); - self.send_report(report); - if self.magic() { warn!("🧙 magic chord, exiting 🪄 AVADA KEDAVRA!!! 💥💀"); std::process::exit(0) } + if self.dev_mode { debug!(?rep, "kbd"); } + let _ = self.tx.send(KeyboardReport { data: report.to_vec() }); + + if self.is_magic() { + warn!("🧙 magic chord – exiting 🪄 AVADA KEDAVRA!!! 💥💀"); + std::process::exit(0); + } } - } + } fn build_report(&self) -> [u8; 8] { let mut out = [0u8; 8]; let mut mods = 0u8; let mut keys = Vec::new(); - for &kc in &self.pressed { - if let Some(m) = is_modifier(kc) { mods |= m } + for &kc in &self.pressed_keys { + if let Some(m) = is_modifier(kc) { mods |= m } else if let Some(u) = keycode_to_usage(kc) { keys.push(u) } } + out[0] = mods; for (i, k) in keys.into_iter().take(6).enumerate() { out[2+i] = k } out } - fn send_report(&self, report: [u8; 8]) { - let msg = KeyboardReport { data: report.to_vec() } - - match self.tx.send(msg.clone()) { - Ok(n) => { - self.dev_log(|| info!("📤 sent HID report → {n} subscriber(s)")); - } - Err(e) => { - self.dev_log(|| warn!("❌ send failed: {e}")); - let _ = self.tx.send(msg); - } - } - } - #[inline] - fn magic(&self) -> bool { - self.pressed.contains(&KeyCode::KEY_LEFTCTRL) - && self.pressed.contains(&KeyCode::KEY_ESC) + fn is_magic(&self) -> bool { + self.pressed_keys.contains(&KeyCode::KEY_LEFTCTRL) + && self.pressed_keys.contains(&KeyCode::KEY_ESC) } } diff --git a/client/src/input/mouse.rs b/client/src/input/mouse.rs index 8ced1de..539f1fe 100644 --- a/client/src/input/mouse.rs +++ b/client/src/input/mouse.rs @@ -1,22 +1,20 @@ // client/src/input/mouse.rs -use anyhow::Result; -use evdev::{Device, InputEvent, EventType, KeyCode, RelativeAxisCode}; +use evdev::{Device, EventType, InputEvent, KeyCode, RelativeAxisCode}; use tokio::sync::broadcast::{self, Sender}; use tracing::{debug, error, warn}; use navka_common::navka::MouseReport; -/// Aggregator for a single mouse device pub struct MouseAggregator { dev: Device, - tx: Sender, + tx: Sender, dev_mode: bool, buttons: u8, last_buttons: u8, - dx: i8, - dy: i8, + dx: i8, + dy: i8, wheel: i8, } @@ -25,102 +23,65 @@ impl MouseAggregator { Self { dev, tx, dev_mode, buttons:0, last_buttons:0, dx:0, dy:0, wheel:0 } } - #[inline] - fn dev_log(&self, record: impl FnOnce()) { - if self.dev_mode { - record(); - } - } - - #[inline] - fn set_btn(&mut self, bit: u8, val: i32) { - if val != 0 { - self.buttons |= 1 << bit; - } else { - self.buttons &= !(1 << bit); - } - } + #[inline] fn slog(&self, f: impl FnOnce()) { if self.dev_mode { f() } } pub fn process_events(&mut self) { - /* 1 ─ read a non‑blocking batch */ - let events: Vec = match self.dev.fetch_events() { + let result = self.dev.fetch_events(); + let evts: Vec = match result { Ok(it) => it.collect(), - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => return, - Err(e) => { if self.dev_mode { error!("🖱️ read error: {e}"); } return; } + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => return, + Err(e) => { if self.dev_mode { error!("mouse read err: {e}"); } return } }; - if self.dev_mode && !events.is_empty() { - self.dev_log(|| debug!("🖱️ {} events from {}", events.len(), - self.dev.name().unwrap_or("UNKNOWN"))); + if self.dev_mode && !evts.is_empty() { + debug!("🖱️ {} evts from {}", evts.len(), self.dev.name().unwrap_or("?")); } - /* 2 ─ aggregate */ - for ev in events { - match ev.event_type() { - /* ---------- buttons ---------- */ - EventType::KEY => match ev.code() { - c if c == KeyCode::BTN_LEFT.0 => self.set_btn(0, ev.value()), - c if c == KeyCode::BTN_RIGHT.0 => self.set_btn(1, ev.value()), - c if c == KeyCode::BTN_MIDDLE.0 => self.set_btn(2, ev.value()), + for e in evts { + match e.event_type() { + EventType::KEY => match e.code() { + c if c == KeyCode::BTN_LEFT.0 => self.set_btn(0, e.value()), + c if c == KeyCode::BTN_RIGHT.0 => self.set_btn(1, e.value()), + c if c == KeyCode::BTN_MIDDLE.0 => self.set_btn(2, e.value()), _ => {} }, - - /* ----------- axes ------------ */ - EventType::RELATIVE => match ev.code() { - c if c == RelativeAxisCode::REL_X.0 => { - self.dx = self.dx.saturating_add(ev.value().clamp(-127, 127) as i8); - } - c if c == RelativeAxisCode::REL_Y.0 => { - self.dy = self.dy.saturating_add(ev.value().clamp(-127, 127) as i8); - } - c if c == RelativeAxisCode::REL_WHEEL.0 => { - self.wheel = self.wheel.saturating_add(ev.value().clamp(-1, 1) as i8); - } + EventType::RELATIVE => match e.code() { + c if c == RelativeAxisCode::REL_X.0 => + self.dx = self.dx.saturating_add(e.value().clamp(-127,127) as i8), + c if c == RelativeAxisCode::REL_Y.0 => + self.dy = self.dy.saturating_add(e.value().clamp(-127,127) as i8), + c if c == RelativeAxisCode::REL_WHEEL.0 => + self.wheel = self.wheel.saturating_add(e.value().clamp(-1,1) as i8), _ => {} }, - - /* ---- batch delimiter -------- */ - EventType::SYNCHRONIZATION => { - // Any sync event is fine – we only care about boundaries - self.flush_report(); - }, - + EventType::SYNCHRONIZATION => self.flush(), _ => {} } } } - /// Build & send HID packet, then clear deltas - fn flush_report(&mut self) { - /* Nothing changed ⇒ nothing to send */ - if self.dx == 0 - && self.dy == 0 - && self.wheel == 0 - && self.buttons == self.last_buttons - { + fn flush(&mut self) { + if self.dx==0 && self.dy==0 && self.wheel==0 && self.buttons==self.last_buttons { return; } - let report = [ + let pkt = [ self.buttons, - self.dx.clamp(-127, 127) as u8, - self.dy.clamp(-127, 127) as u8, + self.dx.clamp(-127,127) as u8, + self.dy.clamp(-127,127) as u8, self.wheel as u8, ]; - /* broadcast — this is non‑blocking just like `try_send` on mpsc */ - if let Err(broadcast::error::SendError(_)) = self.tx.send(MouseReport { - report.to_vec() - }) { - self.dev_log(|| warn!("❌ no HID receiver (mouse)")); - } else { - self.dev_log(|| debug!("📤 HID mouse {:?}", report)); - } + if let Err(broadcast::error::SendError(_)) = + self.tx.send(MouseReport { data: pkt.to_vec() }) + { if self.dev_mode { warn!("❌ no HID receiver (mouse)"); } } + else if self.dev_mode { debug!("📤 mouse {:?}", pkt) } - /* reset deltas for next frame */ - self.dx = 0; - self.dy = 0; - self.wheel = 0; - self.last_buttons = self.buttons; + self.dx=0; self.dy=0; self.wheel=0; self.last_buttons=self.buttons; + } + + #[inline] fn set_btn(&mut self, bit: u8, val: i32) { + if val!=0 { self.buttons |= 1<>, - ms: Arc>, + kb: Arc>, + ms: Arc>, } #[tonic::async_trait] @@ -35,9 +34,9 @@ impl Relay for Handler { tokio::spawn(async move { let mut s = req.into_inner(); - while let Some(r) = s.message().await.transpose()? { - kb.lock().await.write_all(&r.data).await?; - tx.send(Ok(r)).await.ok(); // best-effort echo + while let Some(pkt) = s.next().await.transpose()? { + kb.lock().await.write_all(&pkt.data).await?; + tx.send(Ok(pkt)).await;//.ok(); // best-effort echo } Ok::<(), Status>(()) }); @@ -54,9 +53,9 @@ impl Relay for Handler { tokio::spawn(async move { let mut s = req.into_inner(); - while let Some(r) = s.message().await.transpose()? { - ms.lock().await.write_all(&r.data).await?; - tx.send(Ok(r)).await.ok(); + while let Some(pkt) = s.next().await.transpose()? { + ms.lock().await.write_all(&pkt.data).await?; + tx.send(Ok(pkt)).await;//.ok(); } Ok::<(), Status>(()) }); @@ -69,31 +68,33 @@ impl Relay for Handler { async fn main() -> anyhow::Result<()> { fmt().with_env_filter( // honour RUST_LOG but fall back to very chatty defaults - EnvFilter::try_from_default_env().unwrap_or_else(|_| { + EnvFilter::try_from_default_env().unwrap_or_else(|_| //{ EnvFilter::new( - "navka_client=trace,\ - navka_server=trace,\ - tonic=debug,\ - h2=debug,\ - tower=debug", + // "navka_client=trace,\ + // navka_server=trace,\ + // tonic=debug,\ + // h2=debug,\ + // tower=debug", + "navka_server=info" ) - }), + //} + ), ) - .with_target(true) - .with_thread_ids(true) - .with_file(true) + // .with_target(true) + // .with_thread_ids(true) + // .with_file(true) .init(); let kb = OpenOptions::new() .write(true) - .read(true) + // .read(true) .custom_flags(libc::O_NONBLOCK) .open("/dev/hidg0") .await?; let ms = OpenOptions::new() .write(true) - .read(true) + // .read(true) .custom_flags(libc::O_NONBLOCK) .open("/dev/hidg1") .await?;