From a56a3be347004a3ba5aface30c48f0fdc02afa2a Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Tue, 17 Jun 2025 08:17:23 -0500 Subject: [PATCH] pre-rewrite --- client/src/app.rs | 125 ++++++++++++++++++++--------------- client/src/input/inputs.rs | 30 +++++---- client/src/input/keyboard.rs | 105 +++++++++-------------------- client/src/input/keymap.rs | 30 ++++----- client/src/input/mouse.rs | 24 ++----- common/proto/navka.proto | 15 ++--- scripts/install-client.sh | 2 +- server/src/main.rs | 110 +++++++++--------------------- 8 files changed, 181 insertions(+), 260 deletions(-) diff --git a/client/src/app.rs b/client/src/app.rs index 5a749b3..655f677 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -7,31 +7,36 @@ use tokio_stream::{wrappers::BroadcastStream, StreamExt}; use tonic::Request; use tracing::{info, warn, error, debug}; -use navka_common::navka::{relay_client::RelayClient, HidReport}; -use crate::input::inputs::InputAggregator; +use navka_common::navka::{relay_client::RelayClient, KeyboardReport, MouseReport,}; + +use input::inputs::InputAggregator; + pub struct NavkaClientApp { aggregator: Option, server_addr: String, dev_mode: bool, - tx: broadcast::Sender, + kbd_tx: broadcast::Sender, + mou_tx: broadcast::Sender, } impl NavkaClientApp { pub fn new() -> Result { info!("Creating navka-client app!"); - let dev_mode = std::env::var("NAVKA_DEV_MODE").is_ok(); - let addr = std::env::args() + let dev_mode = std::env::var("NAVKA_DEV_MODE").is_ok(); + let server = std::env::args() .nth(1) .or_else(|| std::env::var("NAVKA_SERVER_ADDR").ok()) .unwrap_or_else(|| "http://127.0.0.1:50051".to_owned()); - let (tx, _) = broadcast::channel::(2048); // πŸ–±οΈ + ⌨️ burst‑proof - let mut aggregator = InputAggregator::new(dev_mode, tx.clone()); - aggregator.init()?; // discover & grab + let (kbd_tx, _) = broadcast::channel::(1024); + let (mou_tx, _) = broadcast::channel::(4096); + let mut agg = InputAggregator::new(dev_mode, kbd_tx.clone(), mou_tx.clone()); + agg.init()?; Ok(Self { - aggregator: Some(aggregator), + keyboard: Some(kbd_aggregator), + mouse: Some(mou_aggregator), server_addr: addr, dev_mode, tx, @@ -40,18 +45,20 @@ impl NavkaClientApp { pub async fn run(&mut self) -> Result<()> { info!("Running navka-client app!"); - // spawn aggregator - let mut aggregator = std::mem::take(&mut self.aggregator).expect("aggregator must exist"); - let aggregator_task: JoinHandle> = tokio::spawn(async move { - aggregator.run().await - }); + /* ─ spawn the input thread ─ */ + let agg_task: JoinHandle> = + tokio::spawn(async move { self.aggregator.take().unwrap().run().await }); - // dev-mode kill future + /* ─ stream loops ─ */ + let kbd_loop = self.stream_loop_keyboard(); + let mou_loop = self.stream_loop_mouse(); + + /* ─ suicide timer for DEV mode ─ */ let suicide_future = async { if self.dev_mode { info!("DEV-mode: will kill itself in 30s"); tokio::time::sleep(Duration::from_secs(30)).await; - Err::<(), _>(anyhow::anyhow!("dev-mode timer expired - goodbye cruel world...")) + Err::<(), _>(anyhow::anyhow!("dev-mode timer expired - πŸ’€goodbyeπŸ’€ cruel world...")) } else { futures::future::pending().await } @@ -89,48 +96,60 @@ impl NavkaClientApp { } } - /// The loop that dials the server, sets up gRPC streaming, - /// and waits for inbound to end. Then tries again, unless aggregator ended. - async fn reconnect_loop(&self) { + #[inline(always)] + async fn delay() { tokio::time::sleep(Duration::from_secs(1)).await; } + + /*──────────────────────────────── keyboard stream ─────────────────────────*/ + async fn stream_loop_keyboard(&self) { loop { - // dial the servers - info!("πŸ“ž dialling {}", self.server_addr); - let mut client = match RelayClient::connect(self.server_addr.clone()).await { - Ok(c) => c, - Err(e) => { - error!("connect error {e}, sleeping 1s"); - tokio::time::sleep(Duration::from_secs(1)).await; - continue; - } + info!("⌨️ dial {}", self.server_addr); + let mut cli = match RelayClient::connect(self.server_addr.clone()).await { + Ok(c) => c, Err(e) => { error!("connect: {e}"); delay().await; continue } + }; + let outbound = BroadcastStream::new(self.kbd_tx.subscribe()) + .filter_map(|r| r.ok()); + + info!("⌨️ start stream_keyboard()"); + let resp = match cli.stream_keyboard(Request::new(outbound)).await { + Ok(r) => r, Err(e) => { error!("stream_keyboard: {e}"); delay().await; continue } }; - // fresh reader over the *same* broadcast channel - let outbound = BroadcastStream::new(self.tx.subscribe()).filter_map(|r| r.ok()); - - info!("πŸ›« spawning stream()"); - let response = match client.stream(Request::new(outbound)).await { - Ok(r) => { info!("βœ… stream established"); r }, - Err(e) => { - error!("stream RPC error: {e}, sleeping 1s"); - tokio::time::sleep(Duration::from_secs(1)).await; - continue; - } - }; - - let mut inbound = response.into_inner(); - while let Some(res) = inbound.message().await.transpose() { - match res { - Ok(report) => { - debug!(?report.kind, "↩️ echo from server"); - }, - Err(e) => { - error!("Inbound error: {e}"); - break; - } + let mut inbound = resp.into_inner(); + while let Some(m) = inbound.message().await.transpose() { + match m { + Ok(r) => debug!("↩️ kbd echo {}β€―B", r.data.len()), + Err(e) => { error!("kbd inbound: {e}"); break } } } - warn!("πŸ”Œ disconnected – retrying in 1β€―s"); - tokio::time::sleep(Duration::from_secs(1)).await; + warn!("⌨️ disconnected, retry"); + delay().await; + } + } + + /*──────────────────────────────── mouse stream ────────────────────────────*/ + async fn stream_loop_mouse(&self) { + loop { + info!("πŸ–±οΈ dial {}", self.server_addr); + let mut cli = match RelayClient::connect(self.server_addr.clone()).await { + Ok(c) => c, Err(e) => { error!("connect: {e}"); delay().await; continue } + }; + let outbound = BroadcastStream::new(self.mou_tx.subscribe()) + .filter_map(|r| r.ok()); + + info!("πŸ–±οΈ start stream_mouse()"); + let resp = match cli.stream_mouse(Request::new(outbound)).await { + Ok(r) => r, Err(e) => { error!("stream_mouse: {e}"); delay().await; continue } + }; + + let mut inbound = resp.into_inner(); + while let Some(m) = inbound.message().await.transpose() { + match m { + Ok(r) => debug!("↩️ mouse echo {}β€―B", r.data.len()), + Err(e) => { error!("mouse inbound: {e}"); break } + } + } + warn!("πŸ–±οΈ disconnected, retry"); + delay().await; } } } diff --git a/client/src/input/inputs.rs b/client/src/input/inputs.rs index b9ab8c2..5cc147f 100644 --- a/client/src/input/inputs.rs +++ b/client/src/input/inputs.rs @@ -18,7 +18,8 @@ use crate::input::microphone::MicrophoneCapture; /// spawns specialized aggregator objects, and can also /// create stubs for camera/microphone logic if needed. pub struct InputAggregator { - tx: Sender, + kbd_tx: Sender, + mou_tx: Sender, pub dev_mode: bool, keyboards: Vec, mice: Vec, @@ -27,15 +28,20 @@ pub struct InputAggregator { } impl InputAggregator { - pub fn new(dev_mode: bool, tx: Sender) -> Self { - Self { - tx, - dev_mode, - keyboards: Vec::new(), - mice: Vec::new(), - camera: None, - mic: None, - } + pub fn new( + dev_mode: bool, + kbd_tx: Sender, + mou_tx: Sender, + ) -> Self { + Self { + kbd_tx, + mou_tx, + dev_mode, + keyboards: Vec::new(), + mice: Vec::new(), + camera: None, + mic: None, + } } /// Called once at startup: enumerates input devices, @@ -74,7 +80,7 @@ impl InputAggregator { // pass dev_mode to aggregator // let kbd_agg = KeyboardAggregator::new(dev, self.dev_mode); - let kbd_agg = KeyboardAggregator::new(dev, self.dev_mode, self.tx.clone()); + let kbd_agg = KeyboardAggregator::new(dev, self.dev_mode, self.kbd_tx.clone()); self.keyboards.push(kbd_agg); found_any = true; continue; @@ -84,7 +90,7 @@ impl InputAggregator { info!("Grabbed mouse {:?}", dev.name().unwrap_or("UNKNOWN")); // let mouse_agg = MouseAggregator::new(dev);np - let mouse_agg = MouseAggregator::new(dev, self.dev_mode, self.tx.clone()); + let mouse_agg = MouseAggregator::new(dev, self.dev_mode, self.mou_tx.clone()); self.mice.push(mouse_agg); found_any = true; continue; diff --git a/client/src/input/keyboard.rs b/client/src/input/keyboard.rs index a2dd640..86267d0 100644 --- a/client/src/input/keyboard.rs +++ b/client/src/input/keyboard.rs @@ -5,20 +5,20 @@ use evdev::{Device, InputEvent, KeyCode, EventType}; use tokio::sync::broadcast::Sender; use tracing::{warn, error, info, debug}; -use navka_common::navka::{HidReport, hid_report}; +use navka_common::navka::KeyboardReport; use crate::input::keymap::{keycode_to_usage, is_modifier}; /// The aggregator logic for a single keyboard device. pub struct KeyboardAggregator { dev: Device, - tx: Sender, + tx: Sender, dev_mode: bool, pressed_keys: HashSet, } impl KeyboardAggregator { - pub fn new(dev: Device, dev_mode: bool, tx: Sender) -> Self { + pub fn new(dev: Device, dev_mode: bool, tx: Sender) -> Self { let _ = dev.set_nonblocking(true); Self { dev, @@ -37,92 +37,48 @@ impl KeyboardAggregator { /// Called frequently (e.g. every ~10ms) to fetch + handle events pub fn process_events(&mut self) { - // Fetch once. Any borrow of `self.dev` ends right here. let events: Vec = match self.dev.fetch_events() { - Ok(iter) => iter.collect(), - - // Would-block β†’ nothing new - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => return, - - // Any other error β†’ log without touching `self.dev` - Err(e) => { - if self.dev_mode { - error!("Keyboard device read error: {e}"); - } - return; - } + Ok(it) => it.collect(), + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => return, + Err(e) => { self.d(|| error!("read err: {e}")); return } }; - - // Safe: no mutable borrow is active now + if self.dev_mode && !events.is_empty() { - self.dev_log(|| info!( - "Got {} events from dev: {}", - events.len(), - self.dev.name().unwrap_or("???") - )); + self.d(|| info!("{} kbd evts from {}", events.len(), + self.dev.name().unwrap_or("???"))); } - + for ev in events { - self.handle_event(ev); - } - } - - fn handle_event(&mut self, ev: InputEvent) { - if ev.event_type() == EventType::KEY { + if ev.event_type() != EventType::KEY { continue } let code = KeyCode::new(ev.code()); - let val = ev.value(); // 1 = press, 0 = release, 2 = repeat - - if self.dev_mode { - self.dev_log(|| info!( - "Keyboard event: code={:?}, value={}, name={:?}", - code, val, self.dev.name() - )); - } - - match val { - 1 => {self.pressed_keys.insert(code);} - 0 => {self.pressed_keys.remove(&code);} - 2 => {/* repeat */} + match ev.value() { + 1 => { self.pressed.insert(code); } // press + 0 => { self.pressed.remove(&code); } // release _ => {} } - let report = self.build_report(); - // TODO: send this somewhere (e.g. an mpsc::Sender) - // For now, just log: - self.dev_log(|| debug!(?report, "Keyboard HID report")); + self.d(|| debug!(?report, "kbd report")); self.send_report(report); - if self.is_magic_chord() { - self.dev_log(|| warn!("Magic chord pressed => πŸ§™β€β™‚οΈπŸͺ„ AVADA KEDAVRA!!! πŸ’₯πŸ’€")); - std::process::exit(0); - } + if self.magic() { warn!("πŸ§™ magic chord, exiting πŸͺ„ AVADA KEDAVRA!!! πŸ’₯πŸ’€"); std::process::exit(0) } } - } + } fn build_report(&self) -> [u8; 8] { - let mut bytes = [0u8; 8]; + let mut out = [0u8; 8]; + let mut mods = 0u8; + let mut keys = Vec::new(); - let mut normal_keys = Vec::new(); - let mut modifiers = 0u8; - - for &kc in &self.pressed_keys { - if let Some(m) = is_modifier(kc) { - modifiers |= m; - } else if let Some(usage) = keycode_to_usage(kc) { - normal_keys.push(usage); - } + for &kc in &self.pressed { + if let Some(m) = is_modifier(kc) { mods |= m } + else if let Some(u) = keycode_to_usage(kc) { keys.push(u) } } - - bytes[0] = modifiers; - for (i, keycode) in normal_keys.into_iter().take(6).enumerate() { - bytes[2 + i] = keycode; - } - bytes + out[0] = mods; + for (i, k) in keys.into_iter().take(6).enumerate() { out[2+i] = k } + out } fn send_report(&self, report: [u8; 8]) { - let msg = HidReport { - kind: Some(hid_report::Kind::KeyboardReport(report.to_vec())), - }; + let msg = KeyboardReport { data: report.to_vec() } match self.tx.send(msg.clone()) { Ok(n) => { @@ -135,8 +91,9 @@ impl KeyboardAggregator { } } - fn is_magic_chord(&self) -> bool { - self.pressed_keys.contains(&KeyCode::KEY_LEFTCTRL) - && self.pressed_keys.contains(&KeyCode::KEY_ESC) + #[inline] + fn magic(&self) -> bool { + self.pressed.contains(&KeyCode::KEY_LEFTCTRL) + && self.pressed.contains(&KeyCode::KEY_ESC) } } diff --git a/client/src/input/keymap.rs b/client/src/input/keymap.rs index 08683bc..3df59b7 100644 --- a/client/src/input/keymap.rs +++ b/client/src/input/keymap.rs @@ -63,6 +63,21 @@ pub fn keycode_to_usage(key: KeyCode) -> Option { KeyCode::KEY_DOT => Some(0x37), KeyCode::KEY_SLASH => Some(0x38), + // --- Function keys ------------------------------------------------ + KeyCode::KEY_CAPSLOCK => Some(0x39), + KeyCode::KEY_F1 => Some(0x3A), + KeyCode::KEY_F2 => Some(0x3B), + KeyCode::KEY_F3 => Some(0x3C), + KeyCode::KEY_F4 => Some(0x3D), + KeyCode::KEY_F5 => Some(0x3E), + KeyCode::KEY_F6 => Some(0x3F), + KeyCode::KEY_F7 => Some(0x40), + KeyCode::KEY_F8 => Some(0x41), + KeyCode::KEY_F9 => Some(0x42), + KeyCode::KEY_F10 => Some(0x43), + KeyCode::KEY_F11 => Some(0x44), + KeyCode::KEY_F12 => Some(0x45), + // --- Navigation / editing cluster --------------------------------- KeyCode::KEY_SYSRQ => Some(0x46), // Print‑Screen KeyCode::KEY_SCROLLLOCK => Some(0x47), @@ -98,21 +113,6 @@ pub fn keycode_to_usage(key: KeyCode) -> Option { KeyCode::KEY_KPDOT => Some(0x63), KeyCode::KEY_KPEQUAL => Some(0x67), - // --- Function keys ------------------------------------------------ - KeyCode::KEY_CAPSLOCK => Some(0x39), - KeyCode::KEY_F1 => Some(0x3A), - KeyCode::KEY_F2 => Some(0x3B), - KeyCode::KEY_F3 => Some(0x3C), - KeyCode::KEY_F4 => Some(0x3D), - KeyCode::KEY_F5 => Some(0x3E), - KeyCode::KEY_F6 => Some(0x3F), - KeyCode::KEY_F7 => Some(0x40), - KeyCode::KEY_F8 => Some(0x41), - KeyCode::KEY_F9 => Some(0x42), - KeyCode::KEY_F10 => Some(0x43), - KeyCode::KEY_F11 => Some(0x44), - KeyCode::KEY_F12 => Some(0x45), - // --- Misc --------------------------------------------------------- KeyCode::KEY_102ND => Some(0x64), // β€œ<>” on ISO boards KeyCode::KEY_MENU => Some(0x65), // Application / Compose diff --git a/client/src/input/mouse.rs b/client/src/input/mouse.rs index 57edbfc..8ced1de 100644 --- a/client/src/input/mouse.rs +++ b/client/src/input/mouse.rs @@ -5,12 +5,12 @@ use evdev::{Device, InputEvent, EventType, KeyCode, RelativeAxisCode}; use tokio::sync::broadcast::{self, Sender}; use tracing::{debug, error, warn}; -use navka_common::navka::{HidReport, hid_report}; +use navka_common::navka::MouseReport; /// Aggregator for a single mouse device pub struct MouseAggregator { dev: Device, - tx: Sender, + tx: Sender, dev_mode: bool, buttons: u8, @@ -21,18 +21,8 @@ pub struct MouseAggregator { } impl MouseAggregator { - pub fn new(dev: Device, dev_mode: bool, tx: Sender) -> Self { - Self { - dev, - tx, - dev_mode, - - buttons: 0, - last_buttons: 0, - dx: 0, - dy: 0, - wheel: 0, - } + pub fn new(dev: Device, dev_mode: bool, tx: Sender) -> Self { + Self { dev, tx, dev_mode, buttons:0, last_buttons:0, dx:0, dy:0, wheel:0 } } #[inline] @@ -93,7 +83,7 @@ impl MouseAggregator { EventType::SYNCHRONIZATION => { // Any sync event is fine – we only care about boundaries self.flush_report(); - } + }, _ => {} } @@ -119,8 +109,8 @@ impl MouseAggregator { ]; /* broadcast β€” this is non‑blocking just like `try_send` on mpsc */ - if let Err(broadcast::error::SendError(_)) = self.tx.send(HidReport { - kind: Some(hid_report::Kind::MouseReport(report.to_vec())), + if let Err(broadcast::error::SendError(_)) = self.tx.send(MouseReport { + report.to_vec() }) { self.dev_log(|| warn!("❌ no HID receiver (mouse)")); } else { diff --git a/common/proto/navka.proto b/common/proto/navka.proto index 3eeea2e..8ce78c4 100644 --- a/common/proto/navka.proto +++ b/common/proto/navka.proto @@ -1,14 +1,11 @@ syntax = "proto3"; package navka; -message HidReport { - oneof kind { - bytes keyboard_report = 1; // exactly 8 bytes - bytes mouse_report = 2; // exactly 4 bytes (btn, dx, dy, wheel) - } -} - +// smaller, fixed-size payloads β‡’ less allocation and simpler decoding +message KeyboardReport { bytes data = 1; } // exactly 8 bytes +message MouseReport { bytes data = 1; } // exactly 4 bytes service Relay { - rpc Stream (stream HidReport) returns (stream HidReport); -} + rpc StreamKeyboard (stream KeyboardReport) returns (stream KeyboardReport); + rpc StreamMouse (stream MouseReport) returns (stream MouseReport); +} \ No newline at end of file diff --git a/scripts/install-client.sh b/scripts/install-client.sh index be3c0a2..53e46a6 100755 --- a/scripts/install-client.sh +++ b/scripts/install-client.sh @@ -20,7 +20,7 @@ else fi # 4. build -sudo -u "$ORIG_USER" bash -c "cd '$SRC/client' && cargo build --release" +sudo -u "$ORIG_USER" bash -c "cd '$SRC/client' && cargo clean && cargo build --release" # 5. install binary sudo install -Dm755 "$SRC/client/target/release/navka-client" /usr/local/bin/navka-client diff --git a/server/src/main.rs b/server/src/main.rs index be352b4..fdc213d 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -23,93 +23,45 @@ struct Handler { #[tonic::async_trait] impl Relay for Handler { - type StreamStream = - Pin> + Send + 'static>>; + type StreamKeyboardStream = ReceiverStream>; + type StreamMouseStream = ReceiverStream>; - async fn stream( + async fn stream_keyboard( &self, - request: Request>, - ) -> Result, Status> { - info!("▢️ new client stream from {:?}", request.remote_addr()); - let mut in_stream = request.into_inner(); + req: Request>, + ) -> Result, Status> { + let (tx, rx) = tokio::sync::mpsc::channel(32); let kb = self.kb.clone(); - let ms = self.ms.clone(); - let (tx, rx) = tokio::sync::mpsc::channel::>(32); tokio::spawn(async move { - // catch panics so that they are logged instead of killing the task silently - let task = AssertUnwindSafe(async move { - // perpetually read client β†’ server messages - while let Some(res) = in_stream.next().await { - match res { - /* ──────────────── message received ──────────────── */ - Ok(msg) => { - debug!("πŸ“₯ recv {:?}", &msg.kind); // <‑‑ always log - - // 1. write to the right gadget --------------------------------- - let io_res = match &msg.kind { - Some(hid_report::Kind::KeyboardReport(v)) if v.len() == 8 => { - kb.lock().await.write_all(v).await.map(|_| "⌨️ β†’ /dev/hidg0 (8β€―B)") - } - Some(hid_report::Kind::MouseReport(v)) if v.len() == 4 => { - ms.lock().await.write_all(v).await.map(|_| "πŸ–±οΈ β†’ /dev/hidg1 (4β€―B)") - } - _ => { - error!(?msg.kind, "⚠️ malformed packet"); - continue; // skip echo - } - }; - - // 2. I/O result ------------------------------------------------- - match io_res { - Ok(msg_txt) => info!("{msg_txt}"), - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { - trace!("πŸ› gadget busy, dropped packet"); - continue; // skip echo - } - Err(e) => { - error!("write error: {e}"); - continue; // skip echo - } - } - - // 3. echo back (best‑effort) ----------------------------------- - if tx.try_send(Ok(msg)).is_err() { - trace!("↩️ echo buffer full – dropped"); - } - } - - /* ──────────────── benign back‑pressure error ──────────────── */ - Err(status) => { - // Tonic delivers back‑pressure as UNKNOWN / INTERNAL. - // They are *not* fatal for us – log & continue. - warn!("πŸ› gRPC back‑pressure: {status}"); - continue; // keep the stream alive - } - } - } - info!("πŸ”š client closed the upstream"); - Ok::<(), Status>(()) - }) - .catch_unwind() - .await; - - if let Err(panic) = task { - // print the panic payload – this is what killed the stream earlier - if let Some(s) = panic.downcast_ref::<&str>() { - error!("‼️ stream task panicked: {s}"); - } else if let Some(s) = panic.downcast_ref::() { - error!("‼️ stream task panicked: {s}"); - } else { - error!("‼️ stream task panicked with unknown payload"); - } + let mut s = req.into_inner(); + while let Some(r) = s.message().await.transpose()? { + kb.lock().await.write_all(&r.data).await?; + tx.send(Ok(r)).await.ok(); // best-effort echo } - info!("πŸ”š client closed the upstream"); + Ok::<(), Status>(()) }); - /* This is a **write‑only** stream – we keep it open forever. */ - use futures_util::stream::pending; - Ok(Response::new(Box::pin(pending::>()))) + Ok(Response::new(ReceiverStream::new(rx))) + } + + async fn stream_mouse( + &self, + req: Request>, + ) -> Result, Status> { + let (tx, rx) = tokio::sync::mpsc::channel(512); // higher burst + let ms = self.ms.clone(); + + tokio::spawn(async move { + let mut s = req.into_inner(); + while let Some(r) = s.message().await.transpose()? { + ms.lock().await.write_all(&r.data).await?; + tx.send(Ok(r)).await.ok(); + } + Ok::<(), Status>(()) + }); + + Ok(Response::new(ReceiverStream::new(rx))) } }