From 4c7bc1f2989bbc6082706555558ca2e250cafbad Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Mon, 2 Jun 2025 20:41:36 -0500 Subject: [PATCH] updates --- server/src/main.rs | 69 +++++++++++++++++++++++++--------------------- 1 file changed, 38 insertions(+), 31 deletions(-) diff --git a/server/src/main.rs b/server/src/main.rs index bbfc761..0959765 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,60 +1,67 @@ -//! navka-server – echoes HID reports back to every client +//! navka-server — receive HidReport and write to /dev/hidg0 #![forbid(unsafe_code)] -use anyhow::Result; -use tokio::{sync::mpsc, task}; -use tokio_stream::wrappers::ReceiverStream; -use tonic::{Request, Response, Status, transport::Server}; -use tonic::async_trait; +use std::{pin::Pin, sync::Arc}; +use tokio::{fs::OpenOptions, io::AsyncWriteExt}; +use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; +use tonic::{transport::Server, Request, Response, Status}; use navka_common::navka::{ relay_server::{Relay, RelayServer}, HidReport, }; -struct RelaySvc { - // shared broadcast channel (unused for now, but kept for future fan-out) - _hub_tx: mpsc::Sender, +struct Handler { + /// shared async handle to /dev/hidg0 + hid: Arc>, } -#[async_trait] -impl Relay for RelaySvc { - type StreamStream = ReceiverStream>; +#[tonic::async_trait] +impl Relay for Handler { + type StreamStream = + Pin> + Send + 'static>>; async fn stream( &self, request: Request>, ) -> Result, Status> { - let mut inbound = request.into_inner(); + let mut in_stream = request.into_inner(); + let hid = self.hid.clone(); - // each connected client gets its own outbound channel - let (tx, rx) = mpsc::channel::>(32); - let outbound = ReceiverStream::new(rx); + // echo everything back to caller & push down USB + let (tx, rx) = tokio::sync::mpsc::channel(32); - // task: read → echo - task::spawn(async move { - while let Some(report) = inbound.message().await? { - // errors here just mean the client hung up – stop the loop - if tx.send(Ok(report)).await.is_err() { - break; - } + tokio::spawn(async move { + while let Some(msg) = in_stream.next().await.transpose()? { + // 1) write to /dev/hidg0 + let mut file = hid.lock().await; + file.write_all(&msg.data).await.map_err(|e| Status::internal(e.to_string()))?; + file.flush().await.ok(); + + // 2) echo back + tx.send(Ok(msg)).await.ok(); } - Ok::<(), Status>(()) + Ok::<_, Status>(()) }); - Ok(Response::new(outbound)) + Ok(Response::new(Box::pin(ReceiverStream::new(rx)))) } } #[tokio::main] -async fn main() -> Result<()> { - // placeholder hub – you’ll reuse this when you forward to the OTG gadget - let (hub_tx, _hub_rx) = mpsc::channel::(32); +async fn main() -> anyhow::Result<()> { + let file = OpenOptions::new() + .write(true) + .open("/dev/hidg0") + .await?; + let hid = Arc::new(tokio::sync::Mutex::new(file)); - println!("🌀 navka-server listening on 0.0.0.0:50051"); + let handler = Handler { hid }; + + println!("🌐 navka-server listening on 0.0.0.0:50051"); Server::builder() - .add_service(RelayServer::new(RelaySvc { _hub_tx: hub_tx })) - .serve("0.0.0.0:50051".parse().unwrap()) + .add_service(RelayServer::new(handler)) + .serve(([0, 0, 0, 0], 50051).into()) .await?; Ok(()) }