//! navka-server — receive HidReport and write to /dev/hidg0 // sever/src/main.rs #![forbid(unsafe_code)] use std::{pin::Pin, sync::Arc, panic::AssertUnwindSafe}; use tokio::{fs::{File, OpenOptions}, io::AsyncWriteExt, sync::Mutex}; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use tonic::{transport::Server, Request, Response, Status}; use tracing::{error, info, trace, warn, debug}; use tracing_subscriber::{fmt, EnvFilter}; use futures_util::FutureExt; use navka_common::navka::{ relay_server::{Relay, RelayServer}, hid_report, HidReport, }; struct Handler { kb: Arc>, ms: Arc>, } #[tonic::async_trait] impl Relay for Handler { }type StreamStream = Pin> + Send + 'static>>; async fn stream( &self, request: Request>, ) -> Result, Status> { info!("▶️ new client stream from {:?}", request.remote_addr()); let mut in_stream = request.into_inner(); let kb = self.kb.clone(); let ms = self.ms.clone(); let (tx, rx) = tokio::sync::mpsc::channel::>(32); tokio::spawn(async move { // catch panics so that they are logged instead of killing the task silently let task = AssertUnwindSafe(async move { // perpetually read client → server messages while let Some(res) = in_stream.next().await { match res { /* ──────────────── message received ──────────────── */ Ok(msg) => { debug!("📥 recv {:?}", &msg.kind); // <‑‑ always log // 1. write to the right gadget --------------------------------- let io_res = match &msg.kind { Some(hid_report::Kind::KeyboardReport(v)) if v.len() == 8 => { kb.lock().await.write_all(v).await.map(|_| "⌨️ → /dev/hidg0 (8 B)") } Some(hid_report::Kind::MouseReport(v)) if v.len() == 4 => { ms.lock().await.write_all(v).await.map(|_| "🖱️ → /dev/hidg1 (4 B)") } _ => { error!(?msg.kind, "⚠️ malformed packet"); continue; // skip echo } }; // 2. I/O result ------------------------------------------------- match io_res { Ok(msg_txt) => info!("{msg_txt}"), Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { trace!("🐛 gadget busy, dropped packet"); continue; // skip echo } Err(e) => { error!("write error: {e}"); continue; // skip echo } } // 3. echo back (best‑effort) ----------------------------------- if tx.try_send(Ok(msg)).is_err() { trace!("↩️ echo buffer full – dropped"); } } /* ──────────────── benign back‑pressure error ──────────────── */ Err(status) => { // Tonic delivers back‑pressure as UNKNOWN / INTERNAL. // They are *not* fatal for us – log & continue. warn!("🐛 gRPC back‑pressure: {status}"); continue; // keep the stream alive } } } info!("🔚 client closed the upstream"); Ok::<(), Status>(()) }) .catch_unwind() .await; if let Err(panic) = task { // print the panic payload – this is what killed the stream earlier if let Some(s) = panic.downcast_ref::<&str>() { error!("‼️ stream task panicked: {s}"); } else if let Some(s) = panic.downcast_ref::() { error!("‼️ stream task panicked: {s}"); } else { error!("‼️ stream task panicked with unknown payload"); } } info!("🔚 client closed the upstream"); }); /* This is a **write‑only** stream – we keep it open forever. */ use futures_util::stream::pending; Ok(Response::new(Box::pin(pending::>()))) } } #[tokio::main] async fn main() -> anyhow::Result<()> { fmt().with_env_filter( // honour RUST_LOG but fall back to very chatty defaults EnvFilter::try_from_default_env().unwrap_or_else(|_| { EnvFilter::new( "navka_client=trace,\ navka_server=trace,\ tonic=debug,\ h2=debug,\ tower=debug", ) }), ) .with_target(true) .with_thread_ids(true) .with_file(true) .init(); let kb = OpenOptions::new() .write(true) .read(true) .custom_flags(libc::O_NONBLOCK) .open("/dev/hidg0") .await?; let ms = OpenOptions::new() .write(true) .read(true) .custom_flags(libc::O_NONBLOCK) .open("/dev/hidg1") .await?; let handler = Handler { kb: Arc::new(Mutex::new(kb)), ms: Arc::new(Mutex::new(ms)), }; println!("🌐 navka-server listening on 0.0.0.0:50051"); Server::builder() .add_service(RelayServer::new(handler)) .serve(([0, 0, 0, 0], 50051).into()) .await?; Ok(()) }