client reconnects automatically now

This commit is contained in:
Brad Stein 2025-06-08 18:11:44 -05:00
parent a518c8acc4
commit 411a2ab3fe
2 changed files with 79 additions and 45 deletions

View File

@ -1,9 +1,8 @@
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use tokio::sync::mpsc; use tokio::{sync::mpsc, time::Duration, task::JoinHandle};
use tokio::time::Duration;
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
use tonic::Request; use tonic::Request;
use tracing::info; use tracing::{info, warn, error};
use navka_common::navka::{relay_client::RelayClient, HidReport}; use navka_common::navka::{relay_client::RelayClient, HidReport};
use crate::input::keyboard::KeyboardAggregator; use crate::input::keyboard::KeyboardAggregator;
@ -27,59 +26,94 @@ impl NavkaClientApp {
} }
pub async fn run(&mut self) -> Result<()> { pub async fn run(&mut self) -> Result<()> {
// 1) Connect to navka-server // aggregator
let mut client = RelayClient::connect(self.server_addr.clone()) let mut aggregator = KeyboardAggregator::new(self.make_channel()?);
.await
.with_context(|| format!("failed to connect to {}", self.server_addr))?;
// 2) Create a bidirectional streaming stub
let (tx, rx) = mpsc::channel::<HidReport>(32);
let outbound = ReceiverStream::new(rx);
let response = client.stream(Request::new(outbound)).await?;
let mut inbound = response.into_inner();
// 3) Start reading from all keyboards in a background task
let mut aggregator = KeyboardAggregator::new(tx.clone());
aggregator.init_devices()?; // discover & grab aggregator.init_devices()?; // discover & grab
let kb_handle = tokio::spawn(async move {
if let Err(e) = aggregator.run().await { // spawn aggregator
tracing::error!("KeyboardAggregator failed: {e}"); let aggregator_task: JoinHandle<Result<()>> = tokio::spawn(async move {
} aggregator.run().await
}); });
// 4) Add 30 second suicide for dev mode // dev-mode kill future
let suicide_fut = async { let suicide_future = async {
if std::env::var_os("NAVKA_DEV_MODE").is_some() { if self.dev_mode {
tracing::info!("DEV-mode: will exit in 30 s"); info!("DEV-mode: will kill itself in 30 s");
tokio::time::sleep(Duration::from_secs(30)).await; tokio::time::sleep(Duration::from_secs(30)).await;
Err::<(), _>(anyhow::anyhow!("dev-mode timer expired")) // cause select! branch to win Err::<(), _>(anyhow::anyhow!("dev-mode timer expired\ngoodbye cruel world..."))
} else { } else {
futures::future::pending().await futures::future::pending().await
} }
}; };
// 5) Inbound loop: we do something with reports from the server, e.g. logging: // Combine aggregator + dev + reconnect logic
let inbound_fut = async { tokio::select! {
while let Some(report) = inbound.message().await? { res = aggregator_task => {
tracing::debug!(?report.data, "msg from server"); error!("Aggregator ended: {res:?}");
std::process::exit(1);
},
res = suicide_future => {
warn!("Dev-mode: {res:?}");
std::process::exit(0);
},
_ = self.reconnect_loop() => {
// if that loop ends, e.g. user or server
warn!("Reconnect loop ended?? We exit");
std::process::exit(0);
}
}
}
/// The loop that dials the server, sets up gRPC streaming,
/// and waits for inbound to end. Then tries again, unless aggregator ended.
async fn reconnect_loop(&self) {
loop {
// dial the server
let mut client = match RelayClient::connect(self.server_addr.clone()).await {
Ok(c) => c,
Err(e) => {
error!("connect error {e}, sleeping 5s");
tokio::time::sleep(Duration::from_secs(5)).await;
continue;
} }
Err::<(), _>(anyhow::anyhow!("server closed stream"))
}; };
// 6) Race the futures // fresh channel for aggregator => we do this with new (tx, rx)
tokio::select! { let (tx, rx) = mpsc::channel::<HidReport>(32);
res = inbound_fut => { // aggregator can hold 'tx' by storing in some global or so?
tracing::warn!("Inbound stream ended: {res:?}");
}, let outbound = ReceiverStream::new(rx);
res = kb_handle => { let response = match client.stream(Request::new(outbound)).await {
tracing::warn!("Keyboard task finished: {res:?}"); Ok(r) => r,
}, Err(e) => {
res = suicide_fut => { error!("stream RPC error: {e}, sleeping 5s");
tracing::warn!("Dev-mode shutdown: {res:?}"); tokio::time::sleep(Duration::from_secs(5)).await;
continue;
}
};
let mut inbound = response.into_inner();
// read inbound
while let Some(res) = inbound.message().await.transpose() {
match res {
Ok(report) => {
tracing::debug!(?report.data, "server inbound");
}, },
Err(e) => {
error!("Inbound error: {e}");
break;
}
}
}
warn!("Inbound ended. Will try to reconnect in 5s");
tokio::time::sleep(Duration::from_secs(5)).await;
}
} }
// 7) If inbound stream ends, stop the input task fn make_channel(&self) -> Result<mpsc::Sender<HidReport>> {
Ok(()) // aggregator's main channel
let (tx, _rx) = mpsc::channel(32);
// aggregator would store tx in self
Ok(tx)
} }
} }

View File

@ -11,8 +11,8 @@ use crate::input::keymap::{keycode_to_usage, is_modifier};
/// Magic chord example: LeftCtrl + LeftAlt + LeftShift + Esc /// Magic chord example: LeftCtrl + LeftAlt + LeftShift + Esc
const MAGIC_CHORD: &[KeyCode] = &[ const MAGIC_CHORD: &[KeyCode] = &[
KeyCode::KEY_LEFTCTRL, KeyCode::KEY_LEFTCTRL,
KeyCode::KEY_LEFTSHIFT, // KeyCode::KEY_LEFTSHIFT,
KeyCode::KEY_LEFTALT, // KeyCode::KEY_LEFTALT,
KeyCode::KEY_ESC, KeyCode::KEY_ESC,
]; ];