This commit is contained in:
Brad Stein 2025-06-17 20:54:31 -05:00
parent a56a3be347
commit f8ae813750
5 changed files with 191 additions and 293 deletions

View File

@ -1,16 +1,17 @@
// client/src/app.rs
#![forbid(unsafe_code)]
use anyhow::Result;
use std::time::Duration;
use tokio::{sync::broadcast, task::JoinHandle};
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use tonic::Request;
use tracing::{info, warn, error, debug};
use tracing::{debug, error, info, warn};
use navka_common::navka::{relay_client::RelayClient, KeyboardReport, MouseReport,};
use input::inputs::InputAggregator;
use navka_common::navka::{relay_client::RelayClient, KeyboardReport, MouseReport};
use crate::input::inputs::InputAggregator;
pub struct NavkaClientApp {
aggregator: Option<InputAggregator>,
@ -22,134 +23,105 @@ pub struct NavkaClientApp {
impl NavkaClientApp {
pub fn new() -> Result<Self> {
info!("Creating navka-client app!");
let dev_mode = std::env::var("NAVKA_DEV_MODE").is_ok();
let server = std::env::args()
let dev_mode = std::env::var("NAVKA_DEV_MODE").is_ok();
let server_addr = std::env::args()
.nth(1)
.or_else(|| std::env::var("NAVKA_SERVER_ADDR").ok())
.unwrap_or_else(|| "http://127.0.0.1:50051".to_owned());
.unwrap_or_else(|| "http://127.0.0.1:50051".into());
let (kbd_tx, _) = broadcast::channel::<KeyboardReport>(1024);
let (mou_tx, _) = broadcast::channel::<MouseReport>(4096);
let mut agg = InputAggregator::new(dev_mode, kbd_tx.clone(), mou_tx.clone());
agg.init()?;
Ok(Self {
keyboard: Some(kbd_aggregator),
mouse: Some(mou_aggregator),
server_addr: addr,
dev_mode,
tx,
})
let mut agg = InputAggregator::new(dev_mode, kbd_tx.clone(), mou_tx.clone());
agg.init()?; // grab devices
Ok(Self { aggregator: Some(agg),
server_addr, dev_mode,
kbd_tx, mou_tx })
}
pub async fn run(&mut self) -> Result<()> {
info!("Running navka-client app!");
/* ─ spawn the input thread ─ */
let agg_task: JoinHandle<Result<()>> =
tokio::spawn(async move { self.aggregator.take().unwrap().run().await });
/* detach the aggregator before spawn so `self` is not moved */
let aggregator = self.aggregator.take().expect("InputAggregator present");
let agg_task = tokio::spawn(async move { aggregator.run().await });
/* ─ stream loops ─ */
/* two networking tasks */
let kbd_loop = self.stream_loop_keyboard();
let mou_loop = self.stream_loop_mouse();
/* ─ suicide timer for DEV mode ─ */
let suicide_future = async {
/* optional suicide timer */
let suicide = async {
if self.dev_mode {
info!("DEV-mode: will kill itself in 30s");
tokio::time::sleep(Duration::from_secs(30)).await;
Err::<(), _>(anyhow::anyhow!("dev-mode timer expired - 💀goodbye💀 cruel world..."))
} else {
futures::future::pending().await
}
warn!("devmode timeout");
std::process::exit(0);
} else { futures::future::pending::<()>().await }
};
// Combine aggregator + dev + reconnect logic
tokio::select! {
// aggregator finishes
agg_res = aggregator_task => {
match agg_res {
Ok(Ok(())) => {
error!("Aggregator ended normally, exiting.");
std::process::exit(0);
}
Ok(Err(e)) => {
error!("Aggregator ended with error: {e}");
std::process::exit(1);
}
Err(join_err) => {
error!("Aggregator task panicked or was cancelled: {join_err}");
std::process::exit(1);
}
}
},
// dev-mode
res = suicide_future => {
warn!("Dev-mode: {res:?}");
std::process::exit(0);
},
// reconnect loop
_ = self.reconnect_loop() => {
warn!("Reconnect loop ended??");
std::process::exit(0);
_ = kbd_loop => unreachable!(),
_ = mou_loop => unreachable!(),
_ = suicide => unreachable!(),
// _ = suicide => { warn!("devmode timeout"); std::process::exit(0) },
r = agg_task => {
error!("aggregator task ended: {r:?}");
std::process::exit(1)
}
}
}
/*──────────────── keyboard stream ───────────────*/
async fn stream_loop_keyboard(&self) {
loop {
info!("⌨️ connect {}", self.server_addr);
let mut cli = match RelayClient::connect(self.server_addr.clone()).await {
Ok(c) => c,
Err(e) => { error!("connect: {e}"); Self::delay().await; continue }
};
let outbound = BroadcastStream::new(self.kbd_tx.subscribe()).filter_map(|r| r.ok());
let resp = match cli.stream_keyboard(Request::new(outbound)).await {
Ok(r) => r, Err(e) => { error!("stream_keyboard: {e}"); Self::delay().await; continue }
};
let mut inbound = resp.into_inner();
while let Some(m) = inbound.message().await.transpose() {
match m {
Ok(r) => debug!("kbd echo {}B", r.data.len()),
Err(e) => { error!("kbd inbound: {e}"); break }
}
}
warn!("⌨️ disconnected");
Self::delay().await;
}
}
/*──────────────── mouse stream ──────────────────*/
async fn stream_loop_mouse(&self) {
loop {
info!("🖱️ connect {}", self.server_addr);
let mut cli = match RelayClient::connect(self.server_addr.clone()).await {
Ok(c) => c,
Err(e) => { error!("connect: {e}"); Self::delay().await; continue }
};
let outbound = BroadcastStream::new(self.mou_tx.subscribe()).filter_map(|r| r.ok());
let resp = match cli.stream_mouse(Request::new(outbound)).await {
Ok(r) => r, Err(e) => { error!("stream_mouse: {e}"); Self::delay().await; continue }
};
let mut inbound = resp.into_inner();
while let Some(m) = inbound.message().await.transpose() {
match m {
Ok(r) => debug!("mouse echo {}B", r.data.len()),
Err(e) => { error!("mouse inbound: {e}"); break }
}
}
warn!("🖱️ disconnected");
Self::delay().await;
}
}
#[inline(always)]
async fn delay() { tokio::time::sleep(Duration::from_secs(1)).await; }
/*──────────────────────────────── keyboard stream ─────────────────────────*/
async fn stream_loop_keyboard(&self) {
loop {
info!("⌨️ dial {}", self.server_addr);
let mut cli = match RelayClient::connect(self.server_addr.clone()).await {
Ok(c) => c, Err(e) => { error!("connect: {e}"); delay().await; continue }
};
let outbound = BroadcastStream::new(self.kbd_tx.subscribe())
.filter_map(|r| r.ok());
info!("⌨️ start stream_keyboard()");
let resp = match cli.stream_keyboard(Request::new(outbound)).await {
Ok(r) => r, Err(e) => { error!("stream_keyboard: {e}"); delay().await; continue }
};
let mut inbound = resp.into_inner();
while let Some(m) = inbound.message().await.transpose() {
match m {
Ok(r) => debug!("↩️ kbd echo {}B", r.data.len()),
Err(e) => { error!("kbd inbound: {e}"); break }
}
}
warn!("⌨️ disconnected, retry");
delay().await;
}
}
/*──────────────────────────────── mouse stream ────────────────────────────*/
async fn stream_loop_mouse(&self) {
loop {
info!("🖱️ dial {}", self.server_addr);
let mut cli = match RelayClient::connect(self.server_addr.clone()).await {
Ok(c) => c, Err(e) => { error!("connect: {e}"); delay().await; continue }
};
let outbound = BroadcastStream::new(self.mou_tx.subscribe())
.filter_map(|r| r.ok());
info!("🖱️ start stream_mouse()");
let resp = match cli.stream_mouse(Request::new(outbound)).await {
Ok(r) => r, Err(e) => { error!("stream_mouse: {e}"); delay().await; continue }
};
let mut inbound = resp.into_inner();
while let Some(m) = inbound.message().await.transpose() {
match m {
Ok(r) => debug!("↩️ mouse echo {}B", r.data.len()),
Err(e) => { error!("mouse inbound: {e}"); break }
}
}
warn!("🖱️ disconnected, retry");
delay().await;
}
}
}

View File

@ -2,46 +2,31 @@
use anyhow::{bail, Context, Result};
use evdev::{Device, EventType, KeyCode, RelativeAxisCode};
use tokio::time::{interval, Duration};
use tokio::sync::broadcast::Sender;
use tokio::{sync::broadcast::Sender, time::{interval, Duration}};
use tracing::{debug, info};
use navka_common::navka::HidReport;
use navka_common::navka::{KeyboardReport, MouseReport};
use crate::input::keyboard::KeyboardAggregator;
use crate::input::mouse::MouseAggregator;
use crate::input::camera::CameraCapture;
use crate::input::microphone::MicrophoneCapture;
use super::{keyboard::KeyboardAggregator, mouse::MouseAggregator,
camera::CameraCapture, microphone::MicrophoneCapture};
/// A top-level aggregator that enumerates /dev/input/event*,
/// classifies them as keyboard vs. mouse vs. other,
/// spawns specialized aggregator objects, and can also
/// create stubs for camera/microphone logic if needed.
pub struct InputAggregator {
kbd_tx: Sender<KeyboardReport>,
mou_tx: Sender<MouseReport>,
pub dev_mode: bool,
keyboards: Vec<KeyboardAggregator>,
mice: Vec<MouseAggregator>,
camera: Option<CameraCapture>,
mic: Option<MicrophoneCapture>,
kbd_tx: Sender<KeyboardReport>,
mou_tx: Sender<MouseReport>,
dev_mode: bool,
keyboards: Vec<KeyboardAggregator>,
mice: Vec<MouseAggregator>,
camera: Option<CameraCapture>,
mic: Option<MicrophoneCapture>,
}
impl InputAggregator {
pub fn new(
dev_mode: bool,
kbd_tx: Sender<KeyboardReport>,
mou_tx: Sender<MouseReport>,
) -> Self {
Self {
kbd_tx,
mou_tx,
dev_mode,
keyboards: Vec::new(),
mice: Vec::new(),
camera: None,
mic: None,
}
pub fn new(dev_mode: bool,
kbd_tx: Sender<KeyboardReport>,
mou_tx: Sender<MouseReport>) -> Self {
Self { kbd_tx, mou_tx, dev_mode,
keyboards: Vec::new(), mice: Vec::new(),
camera: None, mic: None }
}
/// Called once at startup: enumerates input devices,
@ -89,7 +74,7 @@ impl InputAggregator {
dev.grab().with_context(|| format!("grabbing mouse {path:?}"))?;
info!("Grabbed mouse {:?}", dev.name().unwrap_or("UNKNOWN"));
// let mouse_agg = MouseAggregator::new(dev);np
// let mouse_agg = MouseAggregator::new(dev);
let mouse_agg = MouseAggregator::new(dev, self.dev_mode, self.mou_tx.clone());
self.mice.push(mouse_agg);
found_any = true;

View File

@ -1,99 +1,78 @@
// client/src/input/keyboard.rs
use std::collections::HashSet;
use evdev::{Device, InputEvent, KeyCode, EventType};
use evdev::{Device, EventType, InputEvent, KeyCode};
use tokio::sync::broadcast::Sender;
use tracing::{warn, error, info, debug};
use tracing::{debug, error, info, warn};
use navka_common::navka::KeyboardReport;
use crate::input::keymap::{keycode_to_usage, is_modifier};
use super::keymap::{is_modifier, keycode_to_usage};
/// The aggregator logic for a single keyboard device.
pub struct KeyboardAggregator {
dev: Device,
tx: Sender<KeyboardReport>,
tx: Sender<KeyboardReport>,
dev_mode: bool,
pressed_keys: HashSet<KeyCode>,
}
impl KeyboardAggregator {
pub fn new(dev: Device, dev_mode: bool, tx: Sender<KeyboardReport>) -> Self {
let _ = dev.set_nonblocking(true);
Self {
dev,
tx,
dev_mode,
pressed_keys: HashSet::new(),
}
let _ = dev.set_nonblocking(true);
Self { dev, tx, dev_mode, pressed_keys: HashSet::new() }
}
#[inline]
fn dev_log(&self, record: impl FnOnce()) {
if self.dev_mode {
record();
}
}
/// Called frequently (e.g. every ~10ms) to fetch + handle events
pub fn process_events(&mut self) {
let events: Vec<InputEvent> = match self.dev.fetch_events() {
// --- first fetch, then log (avoids aliasing borrow) ---
let result = self.dev.fetch_events();
let events: Vec<InputEvent> = match result {
Ok(it) => it.collect(),
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => return,
Err(e) => { self.d(|| error!("read err: {e}")); return }
Err(e) => { if self.dev_mode { error!("read error: {e}"); } return }
};
if self.dev_mode && !events.is_empty() {
self.d(|| info!("{} kbd evts from {}", events.len(),
self.dev.name().unwrap_or("???")));
info!("{} kbd evts from {}", events.len(), self.dev.name().unwrap_or("?"));
}
for ev in events {
if ev.event_type() != EventType::KEY { continue }
let code = KeyCode::new(ev.code());
match ev.value() {
1 => { self.pressed.insert(code); } // press
0 => { self.pressed.remove(&code); } // release
1 => { self.pressed_keys.insert(code); } // press
0 => { self.pressed_keys.remove(&code); } // release
_ => {}
}
let report = self.build_report();
self.d(|| debug!(?report, "kbd report"));
self.send_report(report);
if self.magic() { warn!("🧙 magic chord, exiting 🪄 AVADA KEDAVRA!!! 💥💀"); std::process::exit(0) }
if self.dev_mode { debug!(?rep, "kbd"); }
let _ = self.tx.send(KeyboardReport { data: report.to_vec() });
if self.is_magic() {
warn!("🧙 magic chord exiting 🪄 AVADA KEDAVRA!!! 💥💀");
std::process::exit(0);
}
}
}
}
fn build_report(&self) -> [u8; 8] {
let mut out = [0u8; 8];
let mut mods = 0u8;
let mut keys = Vec::new();
for &kc in &self.pressed {
if let Some(m) = is_modifier(kc) { mods |= m }
for &kc in &self.pressed_keys {
if let Some(m) = is_modifier(kc) { mods |= m }
else if let Some(u) = keycode_to_usage(kc) { keys.push(u) }
}
out[0] = mods;
for (i, k) in keys.into_iter().take(6).enumerate() { out[2+i] = k }
out
}
fn send_report(&self, report: [u8; 8]) {
let msg = KeyboardReport { data: report.to_vec() }
match self.tx.send(msg.clone()) {
Ok(n) => {
self.dev_log(|| info!("📤 sent HID report → {n} subscriber(s)"));
}
Err(e) => {
self.dev_log(|| warn!("❌ send failed: {e}"));
let _ = self.tx.send(msg);
}
}
}
#[inline]
fn magic(&self) -> bool {
self.pressed.contains(&KeyCode::KEY_LEFTCTRL)
&& self.pressed.contains(&KeyCode::KEY_ESC)
fn is_magic(&self) -> bool {
self.pressed_keys.contains(&KeyCode::KEY_LEFTCTRL)
&& self.pressed_keys.contains(&KeyCode::KEY_ESC)
}
}

View File

@ -1,22 +1,20 @@
// client/src/input/mouse.rs
use anyhow::Result;
use evdev::{Device, InputEvent, EventType, KeyCode, RelativeAxisCode};
use evdev::{Device, EventType, InputEvent, KeyCode, RelativeAxisCode};
use tokio::sync::broadcast::{self, Sender};
use tracing::{debug, error, warn};
use navka_common::navka::MouseReport;
/// Aggregator for a single mouse device
pub struct MouseAggregator {
dev: Device,
tx: Sender<MouseReport>,
tx: Sender<MouseReport>,
dev_mode: bool,
buttons: u8,
last_buttons: u8,
dx: i8,
dy: i8,
dx: i8,
dy: i8,
wheel: i8,
}
@ -25,102 +23,65 @@ impl MouseAggregator {
Self { dev, tx, dev_mode, buttons:0, last_buttons:0, dx:0, dy:0, wheel:0 }
}
#[inline]
fn dev_log(&self, record: impl FnOnce()) {
if self.dev_mode {
record();
}
}
#[inline]
fn set_btn(&mut self, bit: u8, val: i32) {
if val != 0 {
self.buttons |= 1 << bit;
} else {
self.buttons &= !(1 << bit);
}
}
#[inline] fn slog(&self, f: impl FnOnce()) { if self.dev_mode { f() } }
pub fn process_events(&mut self) {
/* 1 ─ read a nonblocking batch */
let events: Vec<InputEvent> = match self.dev.fetch_events() {
let result = self.dev.fetch_events();
let evts: Vec<InputEvent> = match result {
Ok(it) => it.collect(),
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => return,
Err(e) => { if self.dev_mode { error!("🖱️ read error: {e}"); } return; }
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => return,
Err(e) => { if self.dev_mode { error!("mouse read err: {e}"); } return }
};
if self.dev_mode && !events.is_empty() {
self.dev_log(|| debug!("🖱️ {} events from {}", events.len(),
self.dev.name().unwrap_or("UNKNOWN")));
if self.dev_mode && !evts.is_empty() {
debug!("🖱️ {} evts from {}", evts.len(), self.dev.name().unwrap_or("?"));
}
/* 2 ─ aggregate */
for ev in events {
match ev.event_type() {
/* ---------- buttons ---------- */
EventType::KEY => match ev.code() {
c if c == KeyCode::BTN_LEFT.0 => self.set_btn(0, ev.value()),
c if c == KeyCode::BTN_RIGHT.0 => self.set_btn(1, ev.value()),
c if c == KeyCode::BTN_MIDDLE.0 => self.set_btn(2, ev.value()),
for e in evts {
match e.event_type() {
EventType::KEY => match e.code() {
c if c == KeyCode::BTN_LEFT.0 => self.set_btn(0, e.value()),
c if c == KeyCode::BTN_RIGHT.0 => self.set_btn(1, e.value()),
c if c == KeyCode::BTN_MIDDLE.0 => self.set_btn(2, e.value()),
_ => {}
},
/* ----------- axes ------------ */
EventType::RELATIVE => match ev.code() {
c if c == RelativeAxisCode::REL_X.0 => {
self.dx = self.dx.saturating_add(ev.value().clamp(-127, 127) as i8);
}
c if c == RelativeAxisCode::REL_Y.0 => {
self.dy = self.dy.saturating_add(ev.value().clamp(-127, 127) as i8);
}
c if c == RelativeAxisCode::REL_WHEEL.0 => {
self.wheel = self.wheel.saturating_add(ev.value().clamp(-1, 1) as i8);
}
EventType::RELATIVE => match e.code() {
c if c == RelativeAxisCode::REL_X.0 =>
self.dx = self.dx.saturating_add(e.value().clamp(-127,127) as i8),
c if c == RelativeAxisCode::REL_Y.0 =>
self.dy = self.dy.saturating_add(e.value().clamp(-127,127) as i8),
c if c == RelativeAxisCode::REL_WHEEL.0 =>
self.wheel = self.wheel.saturating_add(e.value().clamp(-1,1) as i8),
_ => {}
},
/* ---- batch delimiter -------- */
EventType::SYNCHRONIZATION => {
// Any sync event is fine we only care about boundaries
self.flush_report();
},
EventType::SYNCHRONIZATION => self.flush(),
_ => {}
}
}
}
/// Build & send HID packet, then clear deltas
fn flush_report(&mut self) {
/* Nothing changed ⇒ nothing to send */
if self.dx == 0
&& self.dy == 0
&& self.wheel == 0
&& self.buttons == self.last_buttons
{
fn flush(&mut self) {
if self.dx==0 && self.dy==0 && self.wheel==0 && self.buttons==self.last_buttons {
return;
}
let report = [
let pkt = [
self.buttons,
self.dx.clamp(-127, 127) as u8,
self.dy.clamp(-127, 127) as u8,
self.dx.clamp(-127,127) as u8,
self.dy.clamp(-127,127) as u8,
self.wheel as u8,
];
/* broadcast — this is nonblocking just like `try_send` on mpsc */
if let Err(broadcast::error::SendError(_)) = self.tx.send(MouseReport {
report.to_vec()
}) {
self.dev_log(|| warn!("❌ no HID receiver (mouse)"));
} else {
self.dev_log(|| debug!("📤 HID mouse {:?}", report));
}
if let Err(broadcast::error::SendError(_)) =
self.tx.send(MouseReport { data: pkt.to_vec() })
{ if self.dev_mode { warn!("❌ no HID receiver (mouse)"); } }
else if self.dev_mode { debug!("📤 mouse {:?}", pkt) }
/* reset deltas for next frame */
self.dx = 0;
self.dy = 0;
self.wheel = 0;
self.last_buttons = self.buttons;
self.dx=0; self.dy=0; self.wheel=0; self.last_buttons=self.buttons;
}
#[inline] fn set_btn(&mut self, bit: u8, val: i32) {
if val!=0 { self.buttons |= 1<<bit } else { self.buttons &= !(1<<bit) }
}
}

View File

@ -12,13 +12,12 @@ use futures_util::FutureExt;
use navka_common::navka::{
relay_server::{Relay, RelayServer},
hid_report,
HidReport,
KeyboardReport, MouseReport,
};
struct Handler {
kb: Arc<Mutex<File>>,
ms: Arc<Mutex<File>>,
kb: Arc<Mutex<tokio::fs::File>>,
ms: Arc<Mutex<tokio::fs::File>>,
}
#[tonic::async_trait]
@ -35,9 +34,9 @@ impl Relay for Handler {
tokio::spawn(async move {
let mut s = req.into_inner();
while let Some(r) = s.message().await.transpose()? {
kb.lock().await.write_all(&r.data).await?;
tx.send(Ok(r)).await.ok(); // best-effort echo
while let Some(pkt) = s.next().await.transpose()? {
kb.lock().await.write_all(&pkt.data).await?;
tx.send(Ok(pkt)).await;//.ok(); // best-effort echo
}
Ok::<(), Status>(())
});
@ -54,9 +53,9 @@ impl Relay for Handler {
tokio::spawn(async move {
let mut s = req.into_inner();
while let Some(r) = s.message().await.transpose()? {
ms.lock().await.write_all(&r.data).await?;
tx.send(Ok(r)).await.ok();
while let Some(pkt) = s.next().await.transpose()? {
ms.lock().await.write_all(&pkt.data).await?;
tx.send(Ok(pkt)).await;//.ok();
}
Ok::<(), Status>(())
});
@ -69,31 +68,33 @@ impl Relay for Handler {
async fn main() -> anyhow::Result<()> {
fmt().with_env_filter(
// honour RUST_LOG but fall back to very chatty defaults
EnvFilter::try_from_default_env().unwrap_or_else(|_| {
EnvFilter::try_from_default_env().unwrap_or_else(|_| //{
EnvFilter::new(
"navka_client=trace,\
navka_server=trace,\
tonic=debug,\
h2=debug,\
tower=debug",
// "navka_client=trace,\
// navka_server=trace,\
// tonic=debug,\
// h2=debug,\
// tower=debug",
"navka_server=info"
)
}),
//}
),
)
.with_target(true)
.with_thread_ids(true)
.with_file(true)
// .with_target(true)
// .with_thread_ids(true)
// .with_file(true)
.init();
let kb = OpenOptions::new()
.write(true)
.read(true)
// .read(true)
.custom_flags(libc::O_NONBLOCK)
.open("/dev/hidg0")
.await?;
let ms = OpenOptions::new()
.write(true)
.read(true)
// .read(true)
.custom_flags(libc::O_NONBLOCK)
.open("/dev/hidg1")
.await?;