server updates

This commit is contained in:
Brad Stein 2025-06-16 19:19:14 -05:00
parent de2c99731f
commit 59f02adcf8
2 changed files with 62 additions and 44 deletions

View File

@ -26,7 +26,7 @@ impl NavkaClientApp {
.or_else(|| std::env::var("NAVKA_SERVER_ADDR").ok())
.unwrap_or_else(|| "http://127.0.0.1:50051".to_owned());
let (tx, _) = broadcast::channel::<HidReport>(128);
let (tx, _) = broadcast::channel::<HidReport>(2048); // 🖱️ + ⌨️ burstproof
let mut aggregator = InputAggregator::new(dev_mode, tx.clone());
aggregator.init()?; // discover & grab

View File

@ -36,55 +36,73 @@ impl Relay for Handler {
let (tx, rx) = tokio::sync::mpsc::channel(32);
tokio::spawn(async move {
loop {
match in_stream.next().await {
/* ──────────────── message received ──────────────── */
Some(Ok(msg)) => {
// 1. write to the right gadget ---------------------------------
let io_res = match &msg.kind {
Some(hid_report::Kind::KeyboardReport(v)) if v.len() == 8 => {
kb.lock().await.write_all(v).await.map(|_| "⌨️ → /dev/hidg0 (8B)")
// catch panics so that they are logged instead of killing the task silently
let task = std::panic::AssertUnwindSafe(async move {
// perpetually read client → server messages
while let Some(res) = in_stream.next().await {
match res {
/* ──────────────── message received ──────────────── */
Ok(msg) => {
debug!("📥 recv {:?}", &msg.kind); // < always log
// 1. write to the right gadget ---------------------------------
let io_res = match &msg.kind {
Some(hid_report::Kind::KeyboardReport(v)) if v.len() == 8 => {
kb.lock().await.write_all(v).await.map(|_| "⌨️ → /dev/hidg0 (8B)")
}
Some(hid_report::Kind::MouseReport(v)) if v.len() == 4 => {
ms.lock().await.write_all(v).await.map(|_| "🖱️ → /dev/hidg1 (4B)")
}
_ => {
error!(?msg.kind, "⚠️ malformed packet");
continue; // skip echo
}
};
// 2. I/O result -------------------------------------------------
match io_res {
Ok(msg_txt) => info!("{msg_txt}"),
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
trace!("🐛 gadget busy, dropped packet");
continue; // skip echo
}
Err(e) => {
error!("write error: {e}");
continue; // skip echo
}
}
Some(hid_report::Kind::MouseReport(v)) if v.len() == 4 => {
ms.lock().await.write_all(v).await.map(|_| "🖱️ → /dev/hidg1 (4B)")
}
_ => {
error!(?msg.kind, "⚠️ malformed packet");
continue; // skip echo
}
};
// 2. I/O result -------------------------------------------------
match io_res {
Ok(msg_txt) => info!("{msg_txt}"),
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
trace!("🐛 gadget busy, dropped packet");
continue; // skip echo
}
Err(e) => {
error!("write error: {e}");
continue; // skip echo
// 3. echo back (besteffort) -----------------------------------
if tx.try_send(Ok(msg)).is_err() {
trace!("↩️ echo buffer full dropped");
}
}
// 3. echo back (besteffort) -----------------------------------
let _ = tx.try_send(Ok(msg));
/* ──────────────── benign backpressure error ──────────────── */
Err(status) => {
// Tonic delivers backpressure as UNKNOWN / INTERNAL.
// They are *not* fatal for us log & continue.
warn!("🐛 gRPC backpressure: {status}");
continue; // keep the stream alive
}
}
/* ──────────────── benign backpressure error ──────────────── */
Some(Err(status)) => {
trace!("grpc recv error (ignored): {status}");
continue; // keep the stream alive
}
/* ──────────────── client closed the stream ──────────────── */
None => break,
}
info!("🔚 client closed the upstream");
Ok::<(), Status>(())
})
.catch_unwind()
.await;
if let Err(panic) = task {
// print the panic payload this is what killed the stream earlier
if let Some(s) = panic.downcast_ref::<&str>() {
error!("‼️ stream task panicked: {s}");
} else if let Some(s) = panic.downcast_ref::<String>() {
error!("‼️ stream task panicked: {s}");
} else {
error!("‼️ stream task panicked with unknown payload");
}
}
info!("🔚 client stream closed");
// dropping `tx` here terminates the server→client stream gracefully
Ok::<(), Status>(())
});
Ok(Response::new(Box::pin(ReceiverStream::new(rx))))