lesavka/client/src/app.rs

133 lines
4.7 KiB
Rust
Raw Normal View History

2025-06-08 22:24:14 -05:00
// client/src/app.rs
2025-06-11 00:37:01 -05:00
use anyhow::Result;
use std::time::Duration;
use tokio::{sync::mpsc, task::JoinHandle};
2025-06-08 04:11:58 -05:00
use tokio_stream::wrappers::ReceiverStream;
use tonic::Request;
2025-06-08 18:11:44 -05:00
use tracing::{info, warn, error};
2025-06-08 13:11:31 -05:00
use navka_common::navka::{relay_client::RelayClient, HidReport};
2025-06-08 22:24:14 -05:00
use crate::input::inputs::InputAggregator;
2025-06-08 04:11:58 -05:00
pub struct NavkaClientApp {
2025-06-11 00:37:01 -05:00
aggregator: Option<InputAggregator>,
2025-06-08 04:11:58 -05:00
server_addr: String,
2025-06-08 13:11:31 -05:00
dev_mode: bool,
2025-06-08 04:11:58 -05:00
}
impl NavkaClientApp {
pub fn new() -> Result<Self> {
2025-06-08 13:11:31 -05:00
let dev_mode = std::env::var("NAVKA_DEV_MODE").is_ok();
2025-06-08 04:11:58 -05:00
let addr = std::env::args()
.nth(1)
.or_else(|| std::env::var("NAVKA_SERVER_ADDR").ok())
.unwrap_or_else(|| "http://127.0.0.1:50051".to_owned());
2025-06-12 01:48:48 -05:00
let (tx, _rx) = mpsc::channel::<HidReport>(64);
let mut aggregator = InputAggregator::new(dev_mode, tx);
2025-06-08 22:24:14 -05:00
aggregator.init()?; // discover & grab
2025-06-11 00:37:01 -05:00
2025-06-08 13:11:31 -05:00
Ok(Self {
2025-06-11 00:37:01 -05:00
aggregator: Some(aggregator),
2025-06-08 13:11:31 -05:00
server_addr: addr,
dev_mode,
})
2025-06-08 04:11:58 -05:00
}
pub async fn run(&mut self) -> Result<()> {
2025-06-08 18:11:44 -05:00
// spawn aggregator
2025-06-11 00:37:01 -05:00
let mut aggregator = std::mem::take(&mut self.aggregator).expect("aggregator must exist");
2025-06-08 18:11:44 -05:00
let aggregator_task: JoinHandle<Result<()>> = tokio::spawn(async move {
2025-06-11 00:37:01 -05:00
aggregator.run().await
2025-06-08 04:11:58 -05:00
});
2025-06-08 18:11:44 -05:00
// dev-mode kill future
let suicide_future = async {
if self.dev_mode {
2025-06-11 00:37:01 -05:00
info!("DEV-mode: will kill itself in 30s");
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
2025-06-08 22:24:14 -05:00
Err::<(), _>(anyhow::anyhow!("dev-mode timer expired - goodbye cruel world..."))
2025-06-08 13:35:23 -05:00
} else {
futures::future::pending().await
2025-06-08 13:11:31 -05:00
}
2025-06-08 13:35:23 -05:00
};
2025-06-08 13:11:31 -05:00
2025-06-08 18:11:44 -05:00
// Combine aggregator + dev + reconnect logic
2025-06-08 13:35:23 -05:00
tokio::select! {
2025-06-11 00:37:01 -05:00
// aggregator finishes
agg_res = aggregator_task => {
match agg_res {
Ok(Ok(())) => {
error!("Aggregator ended normally, exiting.");
std::process::exit(0);
}
Ok(Err(e)) => {
error!("Aggregator ended with error: {e}");
std::process::exit(1);
}
Err(join_err) => {
error!("Aggregator task panicked or was cancelled: {join_err}");
std::process::exit(1);
}
}
2025-06-08 13:35:23 -05:00
},
2025-06-11 00:37:01 -05:00
// dev-mode
2025-06-08 18:11:44 -05:00
res = suicide_future => {
warn!("Dev-mode: {res:?}");
std::process::exit(0);
2025-06-08 13:35:23 -05:00
},
2025-06-11 00:37:01 -05:00
// reconnect loop
2025-06-08 18:11:44 -05:00
_ = self.reconnect_loop() => {
warn!("Reconnect loop ended?? We exit");
std::process::exit(0);
}
2025-06-08 04:11:58 -05:00
}
2025-06-08 18:11:44 -05:00
}
/// The loop that dials the server, sets up gRPC streaming,
/// and waits for inbound to end. Then tries again, unless aggregator ended.
async fn reconnect_loop(&self) {
loop {
2025-06-11 00:37:01 -05:00
// dial the servers
info!("Dialing server at: {}", self.server_addr);
2025-06-08 18:11:44 -05:00
let mut client = match RelayClient::connect(self.server_addr.clone()).await {
Ok(c) => c,
Err(e) => {
2025-06-11 00:37:01 -05:00
error!("connect error {e}, sleeping 1s");
tokio::time::sleep(Duration::from_secs(1)).await;
2025-06-08 18:11:44 -05:00
continue;
}
};
// fresh channel for aggregator => we do this with new (tx, rx)
let (tx, rx) = mpsc::channel::<HidReport>(64);
2025-06-12 01:48:48 -05:00
let _unused = InputAggregator::new(self.dev_mode, tx.clone()); // TODO: wire this up properly
2025-06-08 18:11:44 -05:00
let outbound = ReceiverStream::new(rx);
let response = match client.stream(Request::new(outbound)).await {
Ok(r) => r,
Err(e) => {
2025-06-11 00:37:01 -05:00
error!("stream RPC error: {e}, sleeping 1s");
tokio::time::sleep(Duration::from_secs(1)).await;
2025-06-08 18:11:44 -05:00
continue;
}
};
let mut inbound = response.into_inner();
// read inbound
while let Some(res) = inbound.message().await.transpose() {
match res {
Ok(report) => {
2025-06-12 01:48:48 -05:00
tracing::debug!(?report.kind, "server inbound");
2025-06-08 18:11:44 -05:00
},
Err(e) => {
error!("Inbound error: {e}");
break;
}
}
}
2025-06-11 00:37:01 -05:00
warn!("Inbound ended. Will try to reconnect in 1s");
tokio::time::sleep(Duration::from_secs(1)).await;
2025-06-08 18:11:44 -05:00
}
}
2025-06-08 04:11:58 -05:00
}