From bed03404feb7af5d9d86d4619f26f0d5abdd1b3a Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Sun, 1 Jun 2025 21:26:57 -0500 Subject: [PATCH] update to main --- server/src/main.rs | 77 +++++++++++++++++++--------------------------- 1 file changed, 32 insertions(+), 45 deletions(-) diff --git a/server/src/main.rs b/server/src/main.rs index 9293fcc..bbfc761 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,30 +1,25 @@ -//! navka-server – bridge HID reports between client (gRPC) and /dev/hidg0 +//! navka-server – echoes HID reports back to every client #![forbid(unsafe_code)] use anyhow::Result; -use navka_common::navka::{ - hid_report::*, relay_server::Relay, HidReport, RelayServer, -}; -use std::{path::Path, sync::Arc}; -use tokio::{ - fs::OpenOptions, - io::AsyncWriteExt, - signal, - sync::{mpsc, Mutex}, -}; -use tonic::{transport::Server, Request, Response, Status}; +use tokio::{sync::mpsc, task}; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{Request, Response, Status, transport::Server}; +use tonic::async_trait; -/// Implementation of the gRPC service generated by tonic-build. -#[derive(Debug)] -pub struct RelaySvc { - /// Shared handle to /dev/hidg0 (protected by a mutex because several - /// gRPC streams may write concurrently). - hidg: Arc>, +use navka_common::navka::{ + relay_server::{Relay, RelayServer}, + HidReport, +}; + +struct RelaySvc { + // shared broadcast channel (unused for now, but kept for future fan-out) + _hub_tx: mpsc::Sender, } -#[tonic::async_trait] +#[async_trait] impl Relay for RelaySvc { - type StreamStream = tokio_stream::wrappers::ReceiverStream>; + type StreamStream = ReceiverStream>; async fn stream( &self, @@ -32,42 +27,34 @@ impl Relay for RelaySvc { ) -> Result, Status> { let mut inbound = request.into_inner(); - // Channel we’ll use if one day we need to send something back. - let (tx, rx) = mpsc::channel(32); + // each connected client gets its own outbound channel + let (tx, rx) = mpsc::channel::>(32); + let outbound = ReceiverStream::new(rx); - let hidg = self.hidg.clone(); - tokio::spawn(async move { - while let Some(report) = inbound.message().await.transpose()? { - // Write raw 8-byte packet to the gadget device - hidg.lock().await.write_all(&report.data).await?; + // task: read → echo + task::spawn(async move { + while let Some(report) = inbound.message().await? { + // errors here just mean the client hung up – stop the loop + if tx.send(Ok(report)).await.is_err() { + break; + } } - Ok::<(), anyhow::Error>(()) + Ok::<(), Status>(()) }); - Ok(Response::new(tokio_stream::wrappers::ReceiverStream::new(rx))) + Ok(Response::new(outbound)) } } #[tokio::main] async fn main() -> Result<()> { - // Open /dev/hidg0 once and share it. - let hid_dev = if Path::new("/dev/hidg0").exists() { - OpenOptions::new().write(true).open("/dev/hidg0").await? - } else { - anyhow::bail!("/dev/hidg0 not found – is navka-gadget running?"); - }; + // placeholder hub – you’ll reuse this when you forward to the OTG gadget + let (hub_tx, _hub_rx) = mpsc::channel::(32); - let svc = RelayServer::new(RelaySvc { - hidg: Arc::new(Mutex::new(hid_dev)), - }); - - println!("🛰️ navka-server listening on 0.0.0.0:50051"); + println!("🌀 navka-server listening on 0.0.0.0:50051"); Server::builder() - .add_service(svc) - .serve_with_shutdown("0.0.0.0:50051".parse()?, async { - signal::ctrl_c().await.ok(); - }) + .add_service(RelayServer::new(RelaySvc { _hub_tx: hub_tx })) + .serve("0.0.0.0:50051".parse().unwrap()) .await?; - Ok(()) }