lesavka/server/src/main.rs

161 lines
6.3 KiB
Rust
Raw Normal View History

2025-06-02 20:41:36 -05:00
//! navka-server — receive HidReport messages and write them to /dev/hidg0 (keyboard) and /dev/hidg1 (mouse)
2025-06-16 21:47:01 -05:00
// server/src/main.rs
2025-06-01 16:04:00 -05:00
#![forbid(unsafe_code)]
2025-06-01 13:31:22 -05:00
2025-06-16 19:42:26 -05:00
use std::{pin::Pin, sync::Arc, panic::AssertUnwindSafe};
2025-06-12 01:59:17 -05:00
use tokio::{fs::{File, OpenOptions}, io::AsyncWriteExt, sync::Mutex};
2025-06-02 20:41:36 -05:00
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tonic::{transport::Server, Request, Response, Status};
2025-06-16 19:22:40 -05:00
use tracing::{error, info, trace, warn, debug};
2025-06-15 22:15:50 -05:00
use tracing_subscriber::{fmt, EnvFilter};
2025-06-16 19:42:26 -05:00
use futures_util::FutureExt;
2025-06-01 21:26:57 -05:00
2025-06-01 16:04:00 -05:00
use navka_common::navka::{
2025-06-01 21:26:57 -05:00
relay_server::{Relay, RelayServer},
2025-06-12 02:02:07 -05:00
hid_report,
2025-06-01 21:26:57 -05:00
HidReport,
2025-06-01 16:04:00 -05:00
};
2025-06-02 20:24:00 -05:00
2025-06-02 20:41:36 -05:00
/// gRPC service state: shared handles to the two HID gadget device files.
///
/// Each file sits behind an async mutex so that concurrently running client
/// streams take turns writing to it.
struct Handler {
    /// Device that receives keyboard reports.
    kb: Arc<Mutex<File>>,
    /// Device that receives mouse reports.
    ms: Arc<Mutex<File>>,
}
2025-06-01 13:31:22 -05:00
2025-06-02 20:41:36 -05:00
#[tonic::async_trait]
impl Relay for Handler {
type StreamStream =
Pin<Box<dyn Stream<Item = Result<HidReport, Status>> + Send + 'static>>;
2025-06-01 13:31:22 -05:00
async fn stream(
&self,
request: Request<tonic::Streaming<HidReport>>,
2025-06-02 20:24:30 -05:00
) -> Result<Response<Self::StreamStream>, Status> {
2025-06-15 21:39:07 -05:00
info!("▶️ new client stream from {:?}", request.remote_addr());
2025-06-02 20:41:36 -05:00
let mut in_stream = request.into_inner();
2025-06-12 01:57:08 -05:00
let kb = self.kb.clone();
let ms = self.ms.clone();
2025-06-02 20:41:36 -05:00
let (tx, rx) = tokio::sync::mpsc::channel(32);
tokio::spawn(async move {
2025-06-16 19:19:14 -05:00
// catch panics so that they are logged instead of killing the task silently
2025-06-16 19:42:26 -05:00
let task = AssertUnwindSafe(async move {
2025-06-16 19:19:14 -05:00
// perpetually read client → server messages
while let Some(res) = in_stream.next().await {
match res {
/* ──────────────── message received ──────────────── */
Ok(msg) => {
debug!("📥 recv {:?}", &msg.kind); // < always log
// 1. write to the right gadget ---------------------------------
let io_res = match &msg.kind {
Some(hid_report::Kind::KeyboardReport(v)) if v.len() == 8 => {
kb.lock().await.write_all(v).await.map(|_| "⌨️ → /dev/hidg0 (8B)")
}
Some(hid_report::Kind::MouseReport(v)) if v.len() == 4 => {
ms.lock().await.write_all(v).await.map(|_| "🖱️ → /dev/hidg1 (4B)")
}
_ => {
error!(?msg.kind, "⚠️ malformed packet");
continue; // skip echo
}
};
// 2. I/O result -------------------------------------------------
match io_res {
Ok(msg_txt) => info!("{msg_txt}"),
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
trace!("🐛 gadget busy, dropped packet");
continue; // skip echo
}
Err(e) => {
error!("write error: {e}");
continue; // skip echo
}
}
2025-06-16 19:19:14 -05:00
// 3. echo back (besteffort) -----------------------------------
if tx.try_send(Ok(msg)).is_err() {
trace!("↩️ echo buffer full dropped");
}
}
2025-06-16 19:19:14 -05:00
/* ──────────────── benign backpressure error ──────────────── */
Err(status) => {
// Tonic delivers backpressure as UNKNOWN / INTERNAL.
// They are *not* fatal for us log & continue.
warn!("🐛 gRPC backpressure: {status}");
continue; // keep the stream alive
}
2025-06-06 00:21:20 -05:00
}
2025-06-16 19:19:14 -05:00
}
info!("🔚 client closed the upstream");
Ok::<(), Status>(())
})
.catch_unwind()
.await;
if let Err(panic) = task {
// print the panic payload this is what killed the stream earlier
if let Some(s) = panic.downcast_ref::<&str>() {
error!("‼️ stream task panicked: {s}");
} else if let Some(s) = panic.downcast_ref::<String>() {
error!("‼️ stream task panicked: {s}");
} else {
error!("‼️ stream task panicked with unknown payload");
2025-06-06 00:21:20 -05:00
}
2025-06-01 13:31:22 -05:00
}
2025-06-16 21:47:01 -05:00
info!("🔚 client closed the upstream");
2025-06-01 13:31:22 -05:00
});
2025-06-16 21:47:01 -05:00
/* This is a **writeonly** stream we keep it open forever. */
use futures_util::stream::pending;
Ok(Response::new(Box::pin(pending::<Result<HidReport, Status>>())))
2025-06-01 13:31:22 -05:00
}
}
#[tokio::main]
2025-06-02 20:41:36 -05:00
async fn main() -> anyhow::Result<()> {
2025-06-16 00:10:03 -05:00
fmt().with_env_filter(
2025-06-16 00:05:39 -05:00
// honour RUST_LOG but fall back to very chatty defaults
EnvFilter::try_from_default_env().unwrap_or_else(|_| {
EnvFilter::new(
"navka_client=trace,\
navka_server=trace,\
tonic=debug,\
h2=debug,\
tower=debug",
)
}),
)
.with_target(true)
.with_thread_ids(true)
.with_file(true)
.init();
2025-06-05 22:44:27 -05:00
2025-06-12 01:57:08 -05:00
let kb = OpenOptions::new()
2025-06-02 20:41:36 -05:00
.write(true)
2025-06-06 00:04:55 -05:00
.read(true)
.custom_flags(libc::O_NONBLOCK)
2025-06-02 20:41:36 -05:00
.open("/dev/hidg0")
.await?;
2025-06-12 01:57:08 -05:00
let ms = OpenOptions::new()
.write(true)
.read(true)
.custom_flags(libc::O_NONBLOCK)
.open("/dev/hidg1")
.await?;
let handler = Handler {
kb: Arc::new(Mutex::new(kb)),
ms: Arc::new(Mutex::new(ms)),
};
2025-06-01 16:04:00 -05:00
2025-06-02 20:41:36 -05:00
println!("🌐 navka-server listening on 0.0.0.0:50051");
2025-06-01 13:31:22 -05:00
Server::builder()
2025-06-02 20:41:36 -05:00
.add_service(RelayServer::new(handler))
.serve(([0, 0, 0, 0], 50051).into())
2025-06-01 13:31:22 -05:00
.await?;
Ok(())
}