diff --git a/client/src/app.rs b/client/src/app.rs index ac5048a..5a749b3 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -26,7 +26,7 @@ impl NavkaClientApp { .or_else(|| std::env::var("NAVKA_SERVER_ADDR").ok()) .unwrap_or_else(|| "http://127.0.0.1:50051".to_owned()); - let (tx, _) = broadcast::channel::(128); + let (tx, _) = broadcast::channel::(2048); // πŸ–±οΈ + ⌨️ burst‑proof let mut aggregator = InputAggregator::new(dev_mode, tx.clone()); aggregator.init()?; // discover & grab diff --git a/server/src/main.rs b/server/src/main.rs index 001dcaa..14f248c 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -36,55 +36,73 @@ impl Relay for Handler { let (tx, rx) = tokio::sync::mpsc::channel(32); tokio::spawn(async move { - loop { - match in_stream.next().await { - /* ──────────────── message received ──────────────── */ - Some(Ok(msg)) => { - // 1. write to the right gadget --------------------------------- - let io_res = match &msg.kind { - Some(hid_report::Kind::KeyboardReport(v)) if v.len() == 8 => { - kb.lock().await.write_all(v).await.map(|_| "⌨️ β†’ /dev/hidg0 (8β€―B)") + // catch panics so that they are logged instead of killing the task silently + let task = std::panic::AssertUnwindSafe(async move { + // perpetually read client β†’ server messages + while let Some(res) = in_stream.next().await { + match res { + /* ──────────────── message received ──────────────── */ + Ok(msg) => { + debug!("πŸ“₯ recv {:?}", &msg.kind); // <‑‑ always log + + // 1. write to the right gadget --------------------------------- + let io_res = match &msg.kind { + Some(hid_report::Kind::KeyboardReport(v)) if v.len() == 8 => { + kb.lock().await.write_all(v).await.map(|_| "⌨️ β†’ /dev/hidg0 (8β€―B)") + } + Some(hid_report::Kind::MouseReport(v)) if v.len() == 4 => { + ms.lock().await.write_all(v).await.map(|_| "πŸ–±οΈ β†’ /dev/hidg1 (4β€―B)") + } + _ => { + error!(?msg.kind, "⚠️ malformed packet"); + continue; // skip echo + } + }; + + // 2. I/O result ------------------------------------------------- + match io_res { + Ok(msg_txt) => info!("{msg_txt}"), + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + trace!("πŸ› gadget busy, dropped packet"); + continue; // skip echo + } + Err(e) => { + error!("write error: {e}"); + continue; // skip echo + } } - Some(hid_report::Kind::MouseReport(v)) if v.len() == 4 => { - ms.lock().await.write_all(v).await.map(|_| "πŸ–±οΈ β†’ /dev/hidg1 (4β€―B)") - } - _ => { - error!(?msg.kind, "⚠️ malformed packet"); - continue; // skip echo - } - }; - - // 2. I/O result ------------------------------------------------- - match io_res { - Ok(msg_txt) => info!("{msg_txt}"), - Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { - trace!("πŸ› gadget busy, dropped packet"); - continue; // skip echo - } - Err(e) => { - error!("write error: {e}"); - continue; // skip echo + + // 3. echo back (best‑effort) ----------------------------------- + if tx.try_send(Ok(msg)).is_err() { + trace!("↩️ echo buffer full – dropped"); } } - - // 3. echo back (best‑effort) ----------------------------------- - let _ = tx.try_send(Ok(msg)); + + /* ──────────────── benign back‑pressure error ──────────────── */ + Err(status) => { + // Tonic delivers back‑pressure as UNKNOWN / INTERNAL. + // They are *not* fatal for us – log & continue. + warn!("πŸ› gRPC back‑pressure: {status}"); + continue; // keep the stream alive + } } - - /* ──────────────── benign back‑pressure error ──────────────── */ - Some(Err(status)) => { - trace!("grpc recv error (ignored): {status}"); - continue; // keep the stream alive - } - - /* ──────────────── client closed the stream ──────────────── */ - None => break, + } + info!("πŸ”š client closed the upstream"); + Ok::<(), Status>(()) + }) + .catch_unwind() + .await; + + if let Err(panic) = task { + // print the panic payload – this is what killed the stream earlier + if let Some(s) = panic.downcast_ref::<&str>() { + error!("‼️ stream task panicked: {s}"); + } else if let Some(s) = panic.downcast_ref::() { + error!("‼️ stream task panicked: {s}"); + } else { + error!("‼️ stream task panicked with unknown payload"); } } - - info!("πŸ”š client stream closed"); - // dropping `tx` here terminates the serverβ†’client stream gracefully - Ok::<(), Status>(()) }); Ok(Response::new(Box::pin(ReceiverStream::new(rx))))