lesavka/server/src/main.rs
2025-06-16 19:42:26 -05:00

158 lines
6.1 KiB
Rust
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! navka-server — receive HidReport messages over gRPC and write them to the HID gadgets (/dev/hidg0 for keyboard, /dev/hidg1 for mouse)
// main.rs
#![forbid(unsafe_code)]
use std::{pin::Pin, sync::Arc, panic::AssertUnwindSafe};
use tokio::{fs::{File, OpenOptions}, io::AsyncWriteExt, sync::Mutex};
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tonic::{transport::Server, Request, Response, Status};
use tracing::{error, info, trace, warn, debug};
use tracing_subscriber::{fmt, EnvFilter};
use futures_util::FutureExt;
use navka_common::navka::{
relay_server::{Relay, RelayServer},
hid_report,
HidReport,
};
/// State of the gRPC `Relay` service: the two device files that incoming
/// HID reports are written to.
///
/// Each file is wrapped in `Arc<Mutex<_>>` so that `stream` can clone the
/// handles into the task it spawns per client and serialize writes to the
/// same device.
struct Handler {
/// Keyboard device; `main` opens `/dev/hidg0` for this field.
kb: Arc<Mutex<File>>,
/// Mouse device; `main` opens `/dev/hidg1` for this field.
ms: Arc<Mutex<File>>,
}
#[tonic::async_trait]
impl Relay for Handler {
type StreamStream =
Pin<Box<dyn Stream<Item = Result<HidReport, Status>> + Send + 'static>>;
async fn stream(
&self,
request: Request<tonic::Streaming<HidReport>>,
) -> Result<Response<Self::StreamStream>, Status> {
info!("▶️ new client stream from {:?}", request.remote_addr());
let mut in_stream = request.into_inner();
let kb = self.kb.clone();
let ms = self.ms.clone();
let (tx, rx) = tokio::sync::mpsc::channel(32);
tokio::spawn(async move {
// catch panics so that they are logged instead of killing the task silently
let task = AssertUnwindSafe(async move {
// perpetually read client → server messages
while let Some(res) = in_stream.next().await {
match res {
/* ──────────────── message received ──────────────── */
Ok(msg) => {
debug!("📥 recv {:?}", &msg.kind); // < always log
// 1. write to the right gadget ---------------------------------
let io_res = match &msg.kind {
Some(hid_report::Kind::KeyboardReport(v)) if v.len() == 8 => {
kb.lock().await.write_all(v).await.map(|_| "⌨️ → /dev/hidg0 (8B)")
}
Some(hid_report::Kind::MouseReport(v)) if v.len() == 4 => {
ms.lock().await.write_all(v).await.map(|_| "🖱️ → /dev/hidg1 (4B)")
}
_ => {
error!(?msg.kind, "⚠️ malformed packet");
continue; // skip echo
}
};
// 2. I/O result -------------------------------------------------
match io_res {
Ok(msg_txt) => info!("{msg_txt}"),
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
trace!("🐛 gadget busy, dropped packet");
continue; // skip echo
}
Err(e) => {
error!("write error: {e}");
continue; // skip echo
}
}
// 3. echo back (besteffort) -----------------------------------
if tx.try_send(Ok(msg)).is_err() {
trace!("↩️ echo buffer full dropped");
}
}
/* ──────────────── benign backpressure error ──────────────── */
Err(status) => {
// Tonic delivers backpressure as UNKNOWN / INTERNAL.
// They are *not* fatal for us log & continue.
warn!("🐛 gRPC backpressure: {status}");
continue; // keep the stream alive
}
}
}
info!("🔚 client closed the upstream");
Ok::<(), Status>(())
})
.catch_unwind()
.await;
if let Err(panic) = task {
// print the panic payload this is what killed the stream earlier
if let Some(s) = panic.downcast_ref::<&str>() {
error!("‼️ stream task panicked: {s}");
} else if let Some(s) = panic.downcast_ref::<String>() {
error!("‼️ stream task panicked: {s}");
} else {
error!("‼️ stream task panicked with unknown payload");
}
}
});
Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
fmt().with_env_filter(
// honour RUST_LOG but fall back to very chatty defaults
EnvFilter::try_from_default_env().unwrap_or_else(|_| {
EnvFilter::new(
"navka_client=trace,\
navka_server=trace,\
tonic=debug,\
h2=debug,\
tower=debug",
)
}),
)
.with_target(true)
.with_thread_ids(true)
.with_file(true)
.init();
let kb = OpenOptions::new()
.write(true)
.read(true)
.custom_flags(libc::O_NONBLOCK)
.open("/dev/hidg0")
.await?;
let ms = OpenOptions::new()
.write(true)
.read(true)
.custom_flags(libc::O_NONBLOCK)
.open("/dev/hidg1")
.await?;
let handler = Handler {
kb: Arc::new(Mutex::new(kb)),
ms: Arc::new(Mutex::new(ms)),
};
println!("🌐 navka-server listening on 0.0.0.0:50051");
Server::builder()
.add_service(RelayServer::new(handler))
.serve(([0, 0, 0, 0], 50051).into())
.await?;
Ok(())
}