// client/src/app.rs use anyhow::{Context, Result}; use tokio::{sync::mpsc, time::Duration, task::JoinHandle}; use tokio_stream::wrappers::ReceiverStream; use tonic::Request; use tracing::{info, warn, error}; use navka_common::navka::{relay_client::RelayClient, HidReport}; use crate::input::inputs::InputAggregator; pub struct NavkaClientApp { aggregator: InputAggregator, server_addr: String, dev_mode: bool, } impl NavkaClientApp { pub fn new() -> Result { let dev_mode = std::env::var("NAVKA_DEV_MODE").is_ok(); let addr = std::env::args() .nth(1) .or_else(|| std::env::var("NAVKA_SERVER_ADDR").ok()) .unwrap_or_else(|| "http://127.0.0.1:50051".to_owned()); let mut aggregator = InputAggregator::new(); aggregator.init()?; // discover & grab Ok(Self { aggregator, server_addr: addr, dev_mode, }) } pub async fn run(&mut self) -> Result<()> { // spawn aggregator let aggregator_task: JoinHandle> = tokio::spawn(async move { self.aggregator.run().await }); // dev-mode kill future let suicide_future = async { if self.dev_mode { info!("DEV-mode: will kill itself in 30 s"); tokio::time::sleep(Duration::from_secs(30)).await; Err::<(), _>(anyhow::anyhow!("dev-mode timer expired - goodbye cruel world...")) } else { futures::future::pending().await } }; // Combine aggregator + dev + reconnect logic tokio::select! { res = aggregator_task => { error!("Aggregator ended: {res:?}"); std::process::exit(1); }, res = suicide_future => { warn!("Dev-mode: {res:?}"); std::process::exit(0); }, _ = self.reconnect_loop() => { // if that loop ends, e.g. user or server warn!("Reconnect loop ended?? We exit"); std::process::exit(0); } } } /// The loop that dials the server, sets up gRPC streaming, /// and waits for inbound to end. Then tries again, unless aggregator ended. async fn reconnect_loop(&self) { loop { // dial the server let mut client = match RelayClient::connect(self.server_addr.clone()).await { Ok(c) => c, Err(e) => { error!("connect error {e}, sleeping 5s"); tokio::time::sleep(Duration::from_secs(5)).await; continue; } }; // fresh channel for aggregator => we do this with new (tx, rx) let (tx, rx) = mpsc::channel::(32); // aggregator can hold 'tx' by storing in some global or so? let outbound = ReceiverStream::new(rx); let response = match client.stream(Request::new(outbound)).await { Ok(r) => r, Err(e) => { error!("stream RPC error: {e}, sleeping 5s"); tokio::time::sleep(Duration::from_secs(5)).await; continue; } }; let mut inbound = response.into_inner(); // read inbound while let Some(res) = inbound.message().await.transpose() { match res { Ok(report) => { tracing::debug!(?report.data, "server inbound"); }, Err(e) => { error!("Inbound error: {e}"); break; } } } warn!("Inbound ended. Will try to reconnect in 5s"); tokio::time::sleep(Duration::from_secs(5)).await; } } fn make_channel(&self) -> Result> { // aggregator's main channel let (tx, _rx) = mpsc::channel(32); // aggregator would store tx in self Ok(tx) } }