pre-rewrite

This commit is contained in:
Brad Stein 2025-06-17 08:17:23 -05:00
parent 3f1f6e4893
commit a56a3be347
8 changed files with 181 additions and 260 deletions

View File

@ -7,31 +7,36 @@ use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use tonic::Request;
use tracing::{info, warn, error, debug};
use navka_common::navka::{relay_client::RelayClient, HidReport};
use crate::input::inputs::InputAggregator;
use navka_common::navka::{relay_client::RelayClient, KeyboardReport, MouseReport,};
use input::inputs::InputAggregator;
pub struct NavkaClientApp {
aggregator: Option<InputAggregator>,
server_addr: String,
dev_mode: bool,
tx: broadcast::Sender<HidReport>,
kbd_tx: broadcast::Sender<KeyboardReport>,
mou_tx: broadcast::Sender<MouseReport>,
}
impl NavkaClientApp {
pub fn new() -> Result<Self> {
info!("Creating navka-client app!");
let dev_mode = std::env::var("NAVKA_DEV_MODE").is_ok();
let addr = std::env::args()
let dev_mode = std::env::var("NAVKA_DEV_MODE").is_ok();
let server = std::env::args()
.nth(1)
.or_else(|| std::env::var("NAVKA_SERVER_ADDR").ok())
.unwrap_or_else(|| "http://127.0.0.1:50051".to_owned());
let (tx, _) = broadcast::channel::<HidReport>(2048); // 🖱️ + ⌨️ burstproof
let mut aggregator = InputAggregator::new(dev_mode, tx.clone());
aggregator.init()?; // discover & grab
let (kbd_tx, _) = broadcast::channel::<KeyboardReport>(1024);
let (mou_tx, _) = broadcast::channel::<MouseReport>(4096);
let mut agg = InputAggregator::new(dev_mode, kbd_tx.clone(), mou_tx.clone());
agg.init()?;
Ok(Self {
aggregator: Some(aggregator),
keyboard: Some(kbd_aggregator),
mouse: Some(mou_aggregator),
server_addr: addr,
dev_mode,
tx,
@ -40,18 +45,20 @@ impl NavkaClientApp {
pub async fn run(&mut self) -> Result<()> {
info!("Running navka-client app!");
// spawn aggregator
let mut aggregator = std::mem::take(&mut self.aggregator).expect("aggregator must exist");
let aggregator_task: JoinHandle<Result<()>> = tokio::spawn(async move {
aggregator.run().await
});
/* ─ spawn the input thread ─ */
let agg_task: JoinHandle<Result<()>> =
tokio::spawn(async move { self.aggregator.take().unwrap().run().await });
// dev-mode kill future
/* ─ stream loops ─ */
let kbd_loop = self.stream_loop_keyboard();
let mou_loop = self.stream_loop_mouse();
/* ─ suicide timer for DEV mode ─ */
let suicide_future = async {
if self.dev_mode {
info!("DEV-mode: will kill itself in 30s");
tokio::time::sleep(Duration::from_secs(30)).await;
Err::<(), _>(anyhow::anyhow!("dev-mode timer expired - goodbye cruel world..."))
Err::<(), _>(anyhow::anyhow!("dev-mode timer expired - 💀goodbye💀 cruel world..."))
} else {
futures::future::pending().await
}
@ -89,48 +96,60 @@ impl NavkaClientApp {
}
}
/// The loop that dials the server, sets up gRPC streaming,
/// and waits for inbound to end. Then tries again, unless aggregator ended.
async fn reconnect_loop(&self) {
#[inline(always)]
async fn delay() { tokio::time::sleep(Duration::from_secs(1)).await; }
/*──────────────────────────────── keyboard stream ─────────────────────────*/
async fn stream_loop_keyboard(&self) {
loop {
// dial the servers
info!("📞 dialling {}", self.server_addr);
let mut client = match RelayClient::connect(self.server_addr.clone()).await {
Ok(c) => c,
Err(e) => {
error!("connect error {e}, sleeping 1s");
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
info!("⌨️ dial {}", self.server_addr);
let mut cli = match RelayClient::connect(self.server_addr.clone()).await {
Ok(c) => c, Err(e) => { error!("connect: {e}"); delay().await; continue }
};
let outbound = BroadcastStream::new(self.kbd_tx.subscribe())
.filter_map(|r| r.ok());
info!("⌨️ start stream_keyboard()");
let resp = match cli.stream_keyboard(Request::new(outbound)).await {
Ok(r) => r, Err(e) => { error!("stream_keyboard: {e}"); delay().await; continue }
};
// fresh reader over the *same* broadcast channel
let outbound = BroadcastStream::new(self.tx.subscribe()).filter_map(|r| r.ok());
info!("🛫 spawning stream()");
let response = match client.stream(Request::new(outbound)).await {
Ok(r) => { info!("✅ stream established"); r },
Err(e) => {
error!("stream RPC error: {e}, sleeping 1s");
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
};
let mut inbound = response.into_inner();
while let Some(res) = inbound.message().await.transpose() {
match res {
Ok(report) => {
debug!(?report.kind, "↩️ echo from server");
},
Err(e) => {
error!("Inbound error: {e}");
break;
}
let mut inbound = resp.into_inner();
while let Some(m) = inbound.message().await.transpose() {
match m {
Ok(r) => debug!("↩️ kbd echo {}B", r.data.len()),
Err(e) => { error!("kbd inbound: {e}"); break }
}
}
warn!("🔌 disconnected retrying in 1s");
tokio::time::sleep(Duration::from_secs(1)).await;
warn!("⌨️ disconnected, retry");
delay().await;
}
}
/*──────────────────────────────── mouse stream ────────────────────────────*/
async fn stream_loop_mouse(&self) {
loop {
info!("🖱️ dial {}", self.server_addr);
let mut cli = match RelayClient::connect(self.server_addr.clone()).await {
Ok(c) => c, Err(e) => { error!("connect: {e}"); delay().await; continue }
};
let outbound = BroadcastStream::new(self.mou_tx.subscribe())
.filter_map(|r| r.ok());
info!("🖱️ start stream_mouse()");
let resp = match cli.stream_mouse(Request::new(outbound)).await {
Ok(r) => r, Err(e) => { error!("stream_mouse: {e}"); delay().await; continue }
};
let mut inbound = resp.into_inner();
while let Some(m) = inbound.message().await.transpose() {
match m {
Ok(r) => debug!("↩️ mouse echo {}B", r.data.len()),
Err(e) => { error!("mouse inbound: {e}"); break }
}
}
warn!("🖱️ disconnected, retry");
delay().await;
}
}
}

View File

@ -18,7 +18,8 @@ use crate::input::microphone::MicrophoneCapture;
/// spawns specialized aggregator objects, and can also
/// create stubs for camera/microphone logic if needed.
pub struct InputAggregator {
tx: Sender<HidReport>,
kbd_tx: Sender<KeyboardReport>,
mou_tx: Sender<MouseReport>,
pub dev_mode: bool,
keyboards: Vec<KeyboardAggregator>,
mice: Vec<MouseAggregator>,
@ -27,15 +28,20 @@ pub struct InputAggregator {
}
impl InputAggregator {
pub fn new(dev_mode: bool, tx: Sender<HidReport>) -> Self {
Self {
tx,
dev_mode,
keyboards: Vec::new(),
mice: Vec::new(),
camera: None,
mic: None,
}
pub fn new(
dev_mode: bool,
kbd_tx: Sender<KeyboardReport>,
mou_tx: Sender<MouseReport>,
) -> Self {
Self {
kbd_tx,
mou_tx,
dev_mode,
keyboards: Vec::new(),
mice: Vec::new(),
camera: None,
mic: None,
}
}
/// Called once at startup: enumerates input devices,
@ -74,7 +80,7 @@ impl InputAggregator {
// pass dev_mode to aggregator
// let kbd_agg = KeyboardAggregator::new(dev, self.dev_mode);
let kbd_agg = KeyboardAggregator::new(dev, self.dev_mode, self.tx.clone());
let kbd_agg = KeyboardAggregator::new(dev, self.dev_mode, self.kbd_tx.clone());
self.keyboards.push(kbd_agg);
found_any = true;
continue;
@ -84,7 +90,7 @@ impl InputAggregator {
info!("Grabbed mouse {:?}", dev.name().unwrap_or("UNKNOWN"));
// let mouse_agg = MouseAggregator::new(dev);np
let mouse_agg = MouseAggregator::new(dev, self.dev_mode, self.tx.clone());
let mouse_agg = MouseAggregator::new(dev, self.dev_mode, self.mou_tx.clone());
self.mice.push(mouse_agg);
found_any = true;
continue;

View File

@ -5,20 +5,20 @@ use evdev::{Device, InputEvent, KeyCode, EventType};
use tokio::sync::broadcast::Sender;
use tracing::{warn, error, info, debug};
use navka_common::navka::{HidReport, hid_report};
use navka_common::navka::KeyboardReport;
use crate::input::keymap::{keycode_to_usage, is_modifier};
/// The aggregator logic for a single keyboard device.
pub struct KeyboardAggregator {
dev: Device,
tx: Sender<HidReport>,
tx: Sender<KeyboardReport>,
dev_mode: bool,
pressed_keys: HashSet<KeyCode>,
}
impl KeyboardAggregator {
pub fn new(dev: Device, dev_mode: bool, tx: Sender<HidReport>) -> Self {
pub fn new(dev: Device, dev_mode: bool, tx: Sender<KeyboardReport>) -> Self {
let _ = dev.set_nonblocking(true);
Self {
dev,
@ -37,92 +37,48 @@ impl KeyboardAggregator {
/// Called frequently (e.g. every ~10ms) to fetch + handle events
pub fn process_events(&mut self) {
// Fetch once. Any borrow of `self.dev` ends right here.
let events: Vec<InputEvent> = match self.dev.fetch_events() {
Ok(iter) => iter.collect(),
// Would-block → nothing new
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => return,
// Any other error → log without touching `self.dev`
Err(e) => {
if self.dev_mode {
error!("Keyboard device read error: {e}");
}
return;
}
Ok(it) => it.collect(),
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => return,
Err(e) => { self.d(|| error!("read err: {e}")); return }
};
// Safe: no mutable borrow is active now
if self.dev_mode && !events.is_empty() {
self.dev_log(|| info!(
"Got {} events from dev: {}",
events.len(),
self.dev.name().unwrap_or("???")
));
self.d(|| info!("{} kbd evts from {}", events.len(),
self.dev.name().unwrap_or("???")));
}
for ev in events {
self.handle_event(ev);
}
}
fn handle_event(&mut self, ev: InputEvent) {
if ev.event_type() == EventType::KEY {
if ev.event_type() != EventType::KEY { continue }
let code = KeyCode::new(ev.code());
let val = ev.value(); // 1 = press, 0 = release, 2 = repeat
if self.dev_mode {
self.dev_log(|| info!(
"Keyboard event: code={:?}, value={}, name={:?}",
code, val, self.dev.name()
));
}
match val {
1 => {self.pressed_keys.insert(code);}
0 => {self.pressed_keys.remove(&code);}
2 => {/* repeat */}
match ev.value() {
1 => { self.pressed.insert(code); } // press
0 => { self.pressed.remove(&code); } // release
_ => {}
}
let report = self.build_report();
// TODO: send this somewhere (e.g. an mpsc::Sender)
// For now, just log:
self.dev_log(|| debug!(?report, "Keyboard HID report"));
self.d(|| debug!(?report, "kbd report"));
self.send_report(report);
if self.is_magic_chord() {
self.dev_log(|| warn!("Magic chord pressed => 🧙‍♂️🪄 AVADA KEDAVRA!!! 💥💀"));
std::process::exit(0);
}
if self.magic() { warn!("🧙 magic chord, exiting 🪄 AVADA KEDAVRA!!! 💥💀"); std::process::exit(0) }
}
}
}
fn build_report(&self) -> [u8; 8] {
let mut bytes = [0u8; 8];
let mut out = [0u8; 8];
let mut mods = 0u8;
let mut keys = Vec::new();
let mut normal_keys = Vec::new();
let mut modifiers = 0u8;
for &kc in &self.pressed_keys {
if let Some(m) = is_modifier(kc) {
modifiers |= m;
} else if let Some(usage) = keycode_to_usage(kc) {
normal_keys.push(usage);
}
for &kc in &self.pressed {
if let Some(m) = is_modifier(kc) { mods |= m }
else if let Some(u) = keycode_to_usage(kc) { keys.push(u) }
}
bytes[0] = modifiers;
for (i, keycode) in normal_keys.into_iter().take(6).enumerate() {
bytes[2 + i] = keycode;
}
bytes
out[0] = mods;
for (i, k) in keys.into_iter().take(6).enumerate() { out[2+i] = k }
out
}
fn send_report(&self, report: [u8; 8]) {
let msg = HidReport {
kind: Some(hid_report::Kind::KeyboardReport(report.to_vec())),
};
let msg = KeyboardReport { data: report.to_vec() }
match self.tx.send(msg.clone()) {
Ok(n) => {
@ -135,8 +91,9 @@ impl KeyboardAggregator {
}
}
fn is_magic_chord(&self) -> bool {
self.pressed_keys.contains(&KeyCode::KEY_LEFTCTRL)
&& self.pressed_keys.contains(&KeyCode::KEY_ESC)
#[inline]
fn magic(&self) -> bool {
self.pressed.contains(&KeyCode::KEY_LEFTCTRL)
&& self.pressed.contains(&KeyCode::KEY_ESC)
}
}

View File

@ -63,6 +63,21 @@ pub fn keycode_to_usage(key: KeyCode) -> Option<u8> {
KeyCode::KEY_DOT => Some(0x37),
KeyCode::KEY_SLASH => Some(0x38),
// --- Function keys ------------------------------------------------
KeyCode::KEY_CAPSLOCK => Some(0x39),
KeyCode::KEY_F1 => Some(0x3A),
KeyCode::KEY_F2 => Some(0x3B),
KeyCode::KEY_F3 => Some(0x3C),
KeyCode::KEY_F4 => Some(0x3D),
KeyCode::KEY_F5 => Some(0x3E),
KeyCode::KEY_F6 => Some(0x3F),
KeyCode::KEY_F7 => Some(0x40),
KeyCode::KEY_F8 => Some(0x41),
KeyCode::KEY_F9 => Some(0x42),
KeyCode::KEY_F10 => Some(0x43),
KeyCode::KEY_F11 => Some(0x44),
KeyCode::KEY_F12 => Some(0x45),
// --- Navigation / editing cluster ---------------------------------
KeyCode::KEY_SYSRQ => Some(0x46), // PrintScreen
KeyCode::KEY_SCROLLLOCK => Some(0x47),
@ -98,21 +113,6 @@ pub fn keycode_to_usage(key: KeyCode) -> Option<u8> {
KeyCode::KEY_KPDOT => Some(0x63),
KeyCode::KEY_KPEQUAL => Some(0x67),
// --- Function keys ------------------------------------------------
KeyCode::KEY_CAPSLOCK => Some(0x39),
KeyCode::KEY_F1 => Some(0x3A),
KeyCode::KEY_F2 => Some(0x3B),
KeyCode::KEY_F3 => Some(0x3C),
KeyCode::KEY_F4 => Some(0x3D),
KeyCode::KEY_F5 => Some(0x3E),
KeyCode::KEY_F6 => Some(0x3F),
KeyCode::KEY_F7 => Some(0x40),
KeyCode::KEY_F8 => Some(0x41),
KeyCode::KEY_F9 => Some(0x42),
KeyCode::KEY_F10 => Some(0x43),
KeyCode::KEY_F11 => Some(0x44),
KeyCode::KEY_F12 => Some(0x45),
// --- Misc ---------------------------------------------------------
KeyCode::KEY_102ND => Some(0x64), // “<>” on ISO boards
KeyCode::KEY_MENU => Some(0x65), // Application / Compose

View File

@ -5,12 +5,12 @@ use evdev::{Device, InputEvent, EventType, KeyCode, RelativeAxisCode};
use tokio::sync::broadcast::{self, Sender};
use tracing::{debug, error, warn};
use navka_common::navka::{HidReport, hid_report};
use navka_common::navka::MouseReport;
/// Aggregator for a single mouse device
pub struct MouseAggregator {
dev: Device,
tx: Sender<HidReport>,
tx: Sender<MouseReport>,
dev_mode: bool,
buttons: u8,
@ -21,18 +21,8 @@ pub struct MouseAggregator {
}
impl MouseAggregator {
pub fn new(dev: Device, dev_mode: bool, tx: Sender<HidReport>) -> Self {
Self {
dev,
tx,
dev_mode,
buttons: 0,
last_buttons: 0,
dx: 0,
dy: 0,
wheel: 0,
}
pub fn new(dev: Device, dev_mode: bool, tx: Sender<MouseReport>) -> Self {
Self { dev, tx, dev_mode, buttons:0, last_buttons:0, dx:0, dy:0, wheel:0 }
}
#[inline]
@ -93,7 +83,7 @@ impl MouseAggregator {
EventType::SYNCHRONIZATION => {
// Any sync event is fine we only care about boundaries
self.flush_report();
}
},
_ => {}
}
@ -119,8 +109,8 @@ impl MouseAggregator {
];
/* broadcast — this is nonblocking just like `try_send` on mpsc */
if let Err(broadcast::error::SendError(_)) = self.tx.send(HidReport {
kind: Some(hid_report::Kind::MouseReport(report.to_vec())),
if let Err(broadcast::error::SendError(_)) = self.tx.send(MouseReport {
report.to_vec()
}) {
self.dev_log(|| warn!("❌ no HID receiver (mouse)"));
} else {

View File

@ -1,14 +1,11 @@
syntax = "proto3";
package navka;
message HidReport {
oneof kind {
bytes keyboard_report = 1; // exactly 8 bytes
bytes mouse_report = 2; // exactly 4 bytes (btn, dx, dy, wheel)
}
}
// smaller, fixed-size payloads less allocation and simpler decoding
message KeyboardReport { bytes data = 1; } // exactly 8 bytes
message MouseReport { bytes data = 1; } // exactly 4 bytes
service Relay {
rpc Stream (stream HidReport) returns (stream HidReport);
}
rpc StreamKeyboard (stream KeyboardReport) returns (stream KeyboardReport);
rpc StreamMouse (stream MouseReport) returns (stream MouseReport);
}

View File

@ -20,7 +20,7 @@ else
fi
# 4. build
sudo -u "$ORIG_USER" bash -c "cd '$SRC/client' && cargo build --release"
sudo -u "$ORIG_USER" bash -c "cd '$SRC/client' && cargo clean && cargo build --release"
# 5. install binary
sudo install -Dm755 "$SRC/client/target/release/navka-client" /usr/local/bin/navka-client

View File

@ -23,93 +23,45 @@ struct Handler {
#[tonic::async_trait]
impl Relay for Handler {
type StreamStream =
Pin<Box<dyn Stream<Item = Result<HidReport, Status>> + Send + 'static>>;
type StreamKeyboardStream = ReceiverStream<Result<KeyboardReport, Status>>;
type StreamMouseStream = ReceiverStream<Result<MouseReport, Status>>;
async fn stream(
async fn stream_keyboard(
&self,
request: Request<tonic::Streaming<HidReport>>,
) -> Result<Response<Self::StreamStream>, Status> {
info!("▶️ new client stream from {:?}", request.remote_addr());
let mut in_stream = request.into_inner();
req: Request<tonic::Streaming<KeyboardReport>>,
) -> Result<Response<Self::StreamKeyboardStream>, Status> {
let (tx, rx) = tokio::sync::mpsc::channel(32);
let kb = self.kb.clone();
let ms = self.ms.clone();
let (tx, rx) = tokio::sync::mpsc::channel::<Result<HidReport, Status>>(32);
tokio::spawn(async move {
// catch panics so that they are logged instead of killing the task silently
let task = AssertUnwindSafe(async move {
// perpetually read client → server messages
while let Some(res) = in_stream.next().await {
match res {
/* ──────────────── message received ──────────────── */
Ok(msg) => {
debug!("📥 recv {:?}", &msg.kind); // < always log
// 1. write to the right gadget ---------------------------------
let io_res = match &msg.kind {
Some(hid_report::Kind::KeyboardReport(v)) if v.len() == 8 => {
kb.lock().await.write_all(v).await.map(|_| "⌨️ → /dev/hidg0 (8B)")
}
Some(hid_report::Kind::MouseReport(v)) if v.len() == 4 => {
ms.lock().await.write_all(v).await.map(|_| "🖱️ → /dev/hidg1 (4B)")
}
_ => {
error!(?msg.kind, "⚠️ malformed packet");
continue; // skip echo
}
};
// 2. I/O result -------------------------------------------------
match io_res {
Ok(msg_txt) => info!("{msg_txt}"),
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
trace!("🐛 gadget busy, dropped packet");
continue; // skip echo
}
Err(e) => {
error!("write error: {e}");
continue; // skip echo
}
}
// 3. echo back (besteffort) -----------------------------------
if tx.try_send(Ok(msg)).is_err() {
trace!("↩️ echo buffer full dropped");
}
}
/* ──────────────── benign backpressure error ──────────────── */
Err(status) => {
// Tonic delivers backpressure as UNKNOWN / INTERNAL.
// They are *not* fatal for us log & continue.
warn!("🐛 gRPC backpressure: {status}");
continue; // keep the stream alive
}
}
}
info!("🔚 client closed the upstream");
Ok::<(), Status>(())
})
.catch_unwind()
.await;
if let Err(panic) = task {
// print the panic payload this is what killed the stream earlier
if let Some(s) = panic.downcast_ref::<&str>() {
error!("‼️ stream task panicked: {s}");
} else if let Some(s) = panic.downcast_ref::<String>() {
error!("‼️ stream task panicked: {s}");
} else {
error!("‼️ stream task panicked with unknown payload");
}
let mut s = req.into_inner();
while let Some(r) = s.message().await.transpose()? {
kb.lock().await.write_all(&r.data).await?;
tx.send(Ok(r)).await.ok(); // best-effort echo
}
info!("🔚 client closed the upstream");
Ok::<(), Status>(())
});
/* This is a **writeonly** stream we keep it open forever. */
use futures_util::stream::pending;
Ok(Response::new(Box::pin(pending::<Result<HidReport, Status>>())))
Ok(Response::new(ReceiverStream::new(rx)))
}
async fn stream_mouse(
&self,
req: Request<tonic::Streaming<MouseReport>>,
) -> Result<Response<Self::StreamMouseStream>, Status> {
let (tx, rx) = tokio::sync::mpsc::channel(512); // higher burst
let ms = self.ms.clone();
tokio::spawn(async move {
let mut s = req.into_inner();
while let Some(r) = s.message().await.transpose()? {
ms.lock().await.write_all(&r.data).await?;
tx.send(Ok(r)).await.ok();
}
Ok::<(), Status>(())
});
Ok(Response::new(ReceiverStream::new(rx)))
}
}