This commit is contained in:
Brad Stein 2025-06-02 20:41:36 -05:00
parent 89f440b0fe
commit 4c7bc1f298

View File

@ -1,60 +1,67 @@
//! navka-server echoes HID reports back to every client //! navka-server — receive HidReport and write to /dev/hidg0
#![forbid(unsafe_code)] #![forbid(unsafe_code)]
use anyhow::Result; use std::{pin::Pin, sync::Arc};
use tokio::{sync::mpsc, task}; use tokio::{fs::OpenOptions, io::AsyncWriteExt};
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tonic::{Request, Response, Status, transport::Server}; use tonic::{transport::Server, Request, Response, Status};
use tonic::async_trait;
use navka_common::navka::{ use navka_common::navka::{
relay_server::{Relay, RelayServer}, relay_server::{Relay, RelayServer},
HidReport, HidReport,
}; };
struct RelaySvc { struct Handler {
// shared broadcast channel (unused for now, but kept for future fan-out) /// shared async handle to /dev/hidg0
_hub_tx: mpsc::Sender<HidReport>, hid: Arc<tokio::sync::Mutex<tokio::fs::File>>,
} }
#[async_trait] #[tonic::async_trait]
impl Relay for RelaySvc { impl Relay for Handler {
type StreamStream = ReceiverStream<Result<HidReport, Status>>; type StreamStream =
Pin<Box<dyn Stream<Item = Result<HidReport, Status>> + Send + 'static>>;
async fn stream( async fn stream(
&self, &self,
request: Request<tonic::Streaming<HidReport>>, request: Request<tonic::Streaming<HidReport>>,
) -> Result<Response<Self::StreamStream>, Status> { ) -> Result<Response<Self::StreamStream>, Status> {
let mut inbound = request.into_inner(); let mut in_stream = request.into_inner();
let hid = self.hid.clone();
// each connected client gets its own outbound channel // echo everything back to caller & push down USB
let (tx, rx) = mpsc::channel::<Result<HidReport, Status>>(32); let (tx, rx) = tokio::sync::mpsc::channel(32);
let outbound = ReceiverStream::new(rx);
// task: read → echo tokio::spawn(async move {
task::spawn(async move { while let Some(msg) = in_stream.next().await.transpose()? {
while let Some(report) = inbound.message().await? { // 1) write to /dev/hidg0
// errors here just mean the client hung up stop the loop let mut file = hid.lock().await;
if tx.send(Ok(report)).await.is_err() { file.write_all(&msg.data).await.map_err(|e| Status::internal(e.to_string()))?;
break; file.flush().await.ok();
// 2) echo back
tx.send(Ok(msg)).await.ok();
} }
} Ok::<_, Status>(())
Ok::<(), Status>(())
}); });
Ok(Response::new(outbound)) Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
} }
} }
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> anyhow::Result<()> {
// placeholder hub youll reuse this when you forward to the OTG gadget let file = OpenOptions::new()
let (hub_tx, _hub_rx) = mpsc::channel::<HidReport>(32); .write(true)
.open("/dev/hidg0")
.await?;
let hid = Arc::new(tokio::sync::Mutex::new(file));
println!("🌀 navka-server listening on 0.0.0.0:50051"); let handler = Handler { hid };
println!("🌐 navka-server listening on 0.0.0.0:50051");
Server::builder() Server::builder()
.add_service(RelayServer::new(RelaySvc { _hub_tx: hub_tx })) .add_service(RelayServer::new(handler))
.serve("0.0.0.0:50051".parse().unwrap()) .serve(([0, 0, 0, 0], 50051).into())
.await?; .await?;
Ok(()) Ok(())
} }