// client/src/app.rs #![forbid(unsafe_code)] use anyhow::Result; use std::time::Duration; use tokio::{sync::broadcast, task::JoinHandle}; use tokio_stream::{wrappers::BroadcastStream, StreamExt}; use tonic::Request; use tracing::{info, warn, error, debug}; use navka_common::navka::{relay_client::RelayClient, HidReport}; use crate::input::inputs::InputAggregator; pub struct NavkaClientApp { aggregator: Option, server_addr: String, dev_mode: bool, tx: broadcast::Sender, } impl NavkaClientApp { pub fn new() -> Result { info!("Creating navka-client app!"); let dev_mode = std::env::var("NAVKA_DEV_MODE").is_ok(); let addr = std::env::args() .nth(1) .or_else(|| std::env::var("NAVKA_SERVER_ADDR").ok()) .unwrap_or_else(|| "http://127.0.0.1:50051".to_owned()); let (tx, _) = broadcast::channel::(2048); // 🖱️ + ⌨️ burst‑proof let mut aggregator = InputAggregator::new(dev_mode, tx.clone()); aggregator.init()?; // discover & grab Ok(Self { aggregator: Some(aggregator), server_addr: addr, dev_mode, tx, }) } pub async fn run(&mut self) -> Result<()> { info!("Running navka-client app!"); // spawn aggregator let mut aggregator = std::mem::take(&mut self.aggregator).expect("aggregator must exist"); let aggregator_task: JoinHandle> = tokio::spawn(async move { aggregator.run().await }); // dev-mode kill future let suicide_future = async { if self.dev_mode { info!("DEV-mode: will kill itself in 30s"); tokio::time::sleep(Duration::from_secs(30)).await; Err::<(), _>(anyhow::anyhow!("dev-mode timer expired - goodbye cruel world...")) } else { futures::future::pending().await } }; // Combine aggregator + dev + reconnect logic tokio::select! { // aggregator finishes agg_res = aggregator_task => { match agg_res { Ok(Ok(())) => { error!("Aggregator ended normally, exiting."); std::process::exit(0); } Ok(Err(e)) => { error!("Aggregator ended with error: {e}"); std::process::exit(1); } Err(join_err) => { error!("Aggregator task panicked or was cancelled: {join_err}"); std::process::exit(1); } } }, // dev-mode res = suicide_future => { warn!("Dev-mode: {res:?}"); std::process::exit(0); }, // reconnect loop _ = self.reconnect_loop() => { warn!("Reconnect loop ended??"); std::process::exit(0); } } } /// The loop that dials the server, sets up gRPC streaming, /// and waits for inbound to end. Then tries again, unless aggregator ended. async fn reconnect_loop(&self) { loop { // dial the servers info!("📞 dialling {}", self.server_addr); let mut client = match RelayClient::connect(self.server_addr.clone()).await { Ok(c) => c, Err(e) => { error!("connect error {e}, sleeping 1s"); tokio::time::sleep(Duration::from_secs(1)).await; continue; } }; // fresh reader over the *same* broadcast channel let outbound = BroadcastStream::new(self.tx.subscribe()).filter_map(|r| r.ok()); info!("🛫 spawning stream()"); let response = match client.stream(Request::new(outbound)).await { Ok(r) => { info!("✅ stream established"); r }, Err(e) => { error!("stream RPC error: {e}, sleeping 1s"); tokio::time::sleep(Duration::from_secs(1)).await; continue; } }; let mut inbound = response.into_inner(); while let Some(res) = inbound.message().await.transpose() { match res { Ok(report) => { debug!(?report.kind, "↩️ echo from server"); }, Err(e) => { error!("Inbound error: {e}"); break; } } } warn!("🔌 disconnected – retrying in 1 s"); tokio::time::sleep(Duration::from_secs(1)).await; } } }