use anyhow::{Context, Result}; use tokio::sync::mpsc; use tokio::time::Duration; use tokio_stream::wrappers::ReceiverStream; use tonic::Request; use tracing::info; use navka_common::navka::{relay_client::RelayClient, HidReport}; use crate::input::keyboard::KeyboardAggregator; pub struct NavkaClientApp { server_addr: String, dev_mode: bool, } impl NavkaClientApp { pub fn new() -> Result { let dev_mode = std::env::var("NAVKA_DEV_MODE").is_ok(); let addr = std::env::args() .nth(1) .or_else(|| std::env::var("NAVKA_SERVER_ADDR").ok()) .unwrap_or_else(|| "http://127.0.0.1:50051".to_owned()); Ok(Self { server_addr: addr, dev_mode, }) } pub async fn run(&mut self) -> Result<()> { // 1) Connect to navka-server let mut client = RelayClient::connect(self.server_addr.clone()) .await .with_context(|| format!("failed to connect to {}", self.server_addr))?; // 2) Create a bidirectional streaming stub let (tx, rx) = mpsc::channel::(32); let outbound = ReceiverStream::new(rx); let response = client.stream(Request::new(outbound)).await?; let mut inbound = response.into_inner(); // 3) Start reading from all keyboards in a background task let mut aggregator = KeyboardAggregator::new(tx.clone()); aggregator.init_devices()?; // discover & grab let kb_handle = tokio::spawn(async move { if let Err(e) = aggregator.run().await { tracing::error!("KeyboardAggregator failed: {e}"); } }); // 4) Add 30 second suicide for dev mode let suicide_fut = async { if std::env::var_os("NAVKA_DEV_MODE").is_some() { tracing::info!("DEV-mode: will exit in 30 s"); tokio::time::sleep(Duration::from_secs(30)).await; Err::<(), _>(anyhow::anyhow!("dev-mode timer expired")) // cause select! branch to win } else { futures::future::pending().await } }; // 5) Inbound loop: we do something with reports from the server, e.g. logging: let inbound_fut = async { while let Some(report) = inbound.message().await? { tracing::debug!(?report.data, "msg from server"); } Err::<(), _>(anyhow::anyhow!("server closed stream")) }; // 6) Race the futures tokio::select! { res = inbound_fut => { tracing::warn!("Inbound stream ended: {res:?}"); }, res = kb_handle => { tracing::warn!("Keyboard task finished: {res:?}"); }, res = suicide_fut => { tracing::warn!("Dev-mode shutdown: {res:?}"); }, } // 7) If inbound stream ends, stop the input task Ok(()) } }