From 83f5d7124d7cf05e317394e0a25a5902990571c2 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Thu, 26 Jun 2025 15:12:23 -0500 Subject: [PATCH] mouse speed fix --- client/src/app.rs | 65 +++++++++++++++++++++++---------------- client/src/input/mouse.rs | 2 +- server/src/main.rs | 12 +++----- 3 files changed, 43 insertions(+), 36 deletions(-) diff --git a/client/src/app.rs b/client/src/app.rs index c9d2a80..02c2d7f 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -132,20 +132,26 @@ impl LesavkaClientApp { let mut cli = RelayClient::new(ep.clone()); let outbound = BroadcastStream::new(self.kbd_tx.subscribe()).filter_map(|r| r.ok()); - let resp = match cli.stream_keyboard(Request::new(outbound)).await { - Ok(r) => r, Err(e) => { error!("stream_keyboard: {e}"); Self::delay().await; continue } - }; + match cli.stream_keyboard(Request::new(outbound)).await { + Ok(mut resp) => { + // spawn a task just to drain echoes (keeps h2 window happy) + tokio::spawn(async move { + while let Some(_)= resp.get_mut().message().await.transpose() {} + warn!("⌨️ server closed stream"); + }); + } + Err(e) => { + error!("stream_keyboard: {e}"); + Self::delay().await; + continue; + } + } + // from now on we just park – connection persists until it errors + futures::future::pending::<()>().await; - // let mut inbound = resp.into_inner(); - // while let Some(m) = inbound.message().await.transpose() { - // match m { - // Ok(r) => trace!("kbd echo {} B", r.data.len()), - // Err(e) => { error!("kbd inbound: {e}"); break } - // } - // } - drop(resp); - warn!("⌨️ disconnected"); - Self::delay().await; + // drop(resp); + // warn!("⌨️ disconnected"); + // Self::delay().await; } } @@ -160,20 +166,25 @@ impl LesavkaClientApp { let mut cli = RelayClient::new(ep.clone()); let outbound = BroadcastStream::new(self.mou_tx.subscribe()).filter_map(|r| r.ok()); - let resp = match cli.stream_mouse(Request::new(outbound)).await { - Ok(r) => r, Err(e) => { error!("stream_mouse: {e}"); Self::delay().await; continue } - }; - - // let mut inbound = resp.into_inner(); - // while let Some(m) = inbound.message().await.transpose() { - // match m { - // Ok(r) => trace!("mouse echo {} B", r.data.len()), - // Err(e) => { error!("mouse inbound: {e}"); break } - // } - // } - drop(resp); - warn!("🖱️ disconnected"); - Self::delay().await; + match cli.stream_mouse(Request::new(outbound)).await { + Ok(mut resp) => { + // spawn a task just to drain echoes (keeps h2 window happy) + tokio::spawn(async move { + while let Some(_)= resp.get_mut().message().await.transpose() {} + warn!("⌨️ server closed stream"); + }); + } + Err(e) => { + error!("stream_mouse: {e}"); + Self::delay().await; + continue; + } + } + // from now on we just park – connection persists until it errors + futures::future::pending::<()>().await; + // drop(resp); + // warn!("🖱️ disconnected"); + // Self::delay().await; } } diff --git a/client/src/input/mouse.rs b/client/src/input/mouse.rs index 96568f9..3de7978 100644 --- a/client/src/input/mouse.rs +++ b/client/src/input/mouse.rs @@ -7,7 +7,7 @@ use tracing::{debug, error, warn, trace}; use lesavka_common::lesavka::MouseReport; -const SEND_INTERVAL: Duration = Duration::from_micros(2000); +const SEND_INTERVAL: Duration = Duration::from_micros(250); pub struct MouseAggregator { dev: Device, diff --git a/server/src/main.rs b/server/src/main.rs index 1468041..30e79b2 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -143,7 +143,7 @@ impl Relay for Handler { .map_err(|e| Status::internal(e.to_string()))?; tokio::time::sleep(Duration::from_millis(500)).await; } - let (tx, _rx) = + let (tx, rx) = tokio::sync::mpsc::channel::>(32); let kb = self.kb.clone(); @@ -193,16 +193,14 @@ impl Relay for Handler { Ok::<(), Status>(()) }); - let (_noop_tx, empty_rx) = - tokio::sync::mpsc::channel::>(1); - Ok(Response::new(ReceiverStream::new(empty_rx))) + Ok(Response::new(ReceiverStream::new(rx))) } async fn stream_mouse( &self, req: Request>, ) -> Result, Status> { - let (tx, _rx) = + let (tx, rx) = tokio::sync::mpsc::channel::>(4096); let ms = self.ms.clone(); @@ -251,9 +249,7 @@ impl Relay for Handler { Ok::<(), Status>(()) }); - let (_noop_tx, empty_rx) = - tokio::sync::mpsc::channel::>(1); - Ok(Response::new(ReceiverStream::new(empty_rx))) + Ok(Response::new(ReceiverStream::new(rx))) } async fn capture_video(