From 411a2ab3fe8809631a4e7a96e29de4d3748f0fc1 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Sun, 8 Jun 2025 18:11:44 -0500 Subject: [PATCH] client reconnects automatically now --- client/src/app.rs | 120 ++++++++++++++++++++++------------- client/src/input/keyboard.rs | 4 +- 2 files changed, 79 insertions(+), 45 deletions(-) diff --git a/client/src/app.rs b/client/src/app.rs index 527f9c5..4a0b5c7 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -1,9 +1,8 @@ use anyhow::{Context, Result}; -use tokio::sync::mpsc; -use tokio::time::Duration; +use tokio::{sync::mpsc, time::Duration, task::JoinHandle}; use tokio_stream::wrappers::ReceiverStream; use tonic::Request; -use tracing::info; +use tracing::{info, warn, error}; use navka_common::navka::{relay_client::RelayClient, HidReport}; use crate::input::keyboard::KeyboardAggregator; @@ -27,59 +26,94 @@ impl NavkaClientApp { } pub async fn run(&mut self) -> Result<()> { - // 1) Connect to navka-server - let mut client = RelayClient::connect(self.server_addr.clone()) - .await - .with_context(|| format!("failed to connect to {}", self.server_addr))?; + // aggregator + let mut aggregator = KeyboardAggregator::new(self.make_channel()?); + aggregator.init_devices()?; // discover & grab - // 2) Create a bidirectional streaming stub - let (tx, rx) = mpsc::channel::(32); - let outbound = ReceiverStream::new(rx); - let response = client.stream(Request::new(outbound)).await?; - let mut inbound = response.into_inner(); - - // 3) Start reading from all keyboards in a background task - let mut aggregator = KeyboardAggregator::new(tx.clone()); - aggregator.init_devices()?; // discover & grab - let kb_handle = tokio::spawn(async move { - if let Err(e) = aggregator.run().await { - tracing::error!("KeyboardAggregator failed: {e}"); - } + // spawn aggregator + let aggregator_task: JoinHandle> = tokio::spawn(async move { + aggregator.run().await }); - // 4) Add 30 second suicide for dev mode - let suicide_fut = async { - if std::env::var_os("NAVKA_DEV_MODE").is_some() { - tracing::info!("DEV-mode: will exit in 30 s"); + // dev-mode kill future + let suicide_future = async { + if self.dev_mode { + info!("DEV-mode: will kill itself in 30 s"); tokio::time::sleep(Duration::from_secs(30)).await; - Err::<(), _>(anyhow::anyhow!("dev-mode timer expired")) // cause select! branch to win + Err::<(), _>(anyhow::anyhow!("dev-mode timer expired\ngoodbye cruel world...")) } else { futures::future::pending().await } }; - // 5) Inbound loop: we do something with reports from the server, e.g. logging: - let inbound_fut = async { - while let Some(report) = inbound.message().await? { - tracing::debug!(?report.data, "msg from server"); - } - Err::<(), _>(anyhow::anyhow!("server closed stream")) - }; - - // 6) Race the futures + // Combine aggregator + dev + reconnect logic tokio::select! { - res = inbound_fut => { - tracing::warn!("Inbound stream ended: {res:?}"); + res = aggregator_task => { + error!("Aggregator ended: {res:?}"); + std::process::exit(1); }, - res = kb_handle => { - tracing::warn!("Keyboard task finished: {res:?}"); - }, - res = suicide_fut => { - tracing::warn!("Dev-mode shutdown: {res:?}"); + res = suicide_future => { + warn!("Dev-mode: {res:?}"); + std::process::exit(0); }, + _ = self.reconnect_loop() => { + // if that loop ends, e.g. user or server + warn!("Reconnect loop ended?? We exit"); + std::process::exit(0); + } } + } - // 7) If inbound stream ends, stop the input task - Ok(()) + /// The loop that dials the server, sets up gRPC streaming, + /// and waits for inbound to end. Then tries again, unless aggregator ended. + async fn reconnect_loop(&self) { + loop { + // dial the server + let mut client = match RelayClient::connect(self.server_addr.clone()).await { + Ok(c) => c, + Err(e) => { + error!("connect error {e}, sleeping 5s"); + tokio::time::sleep(Duration::from_secs(5)).await; + continue; + } + }; + + // fresh channel for aggregator => we do this with new (tx, rx) + let (tx, rx) = mpsc::channel::(32); + // aggregator can hold 'tx' by storing in some global or so? + + let outbound = ReceiverStream::new(rx); + let response = match client.stream(Request::new(outbound)).await { + Ok(r) => r, + Err(e) => { + error!("stream RPC error: {e}, sleeping 5s"); + tokio::time::sleep(Duration::from_secs(5)).await; + continue; + } + }; + + let mut inbound = response.into_inner(); + // read inbound + while let Some(res) = inbound.message().await.transpose() { + match res { + Ok(report) => { + tracing::debug!(?report.data, "server inbound"); + }, + Err(e) => { + error!("Inbound error: {e}"); + break; + } + } + } + warn!("Inbound ended. Will try to reconnect in 5s"); + tokio::time::sleep(Duration::from_secs(5)).await; + } + } + + fn make_channel(&self) -> Result> { + // aggregator's main channel + let (tx, _rx) = mpsc::channel(32); + // aggregator would store tx in self + Ok(tx) } } diff --git a/client/src/input/keyboard.rs b/client/src/input/keyboard.rs index 40e2a11..7ca6e88 100644 --- a/client/src/input/keyboard.rs +++ b/client/src/input/keyboard.rs @@ -11,8 +11,8 @@ use crate::input::keymap::{keycode_to_usage, is_modifier}; /// Magic chord example: LeftCtrl + LeftAlt + LeftShift + Esc const MAGIC_CHORD: &[KeyCode] = &[ KeyCode::KEY_LEFTCTRL, - KeyCode::KEY_LEFTSHIFT, - KeyCode::KEY_LEFTALT, + // KeyCode::KEY_LEFTSHIFT, + // KeyCode::KEY_LEFTALT, KeyCode::KEY_ESC, ];