This commit is contained in:
Brad Stein 2025-06-15 22:15:50 -05:00
parent 4a8ad7c3b3
commit cf23c31a60
4 changed files with 52 additions and 10 deletions

View File

@ -7,7 +7,7 @@ use tokio::{sync::mpsc, sync::broadcast, task::JoinHandle};
use tokio_stream::StreamExt as _;
use tokio_stream::wrappers::{ReceiverStream, BroadcastStream};
use tonic::Request;
use tracing::{info, warn, error};
use tracing::{info, warn, error, debug};
use navka_common::navka::{relay_client::RelayClient, HidReport};
use crate::input::inputs::InputAggregator;
@ -94,7 +94,7 @@ impl NavkaClientApp {
async fn reconnect_loop(&self) {
loop {
// dial the servers
info!("Dialing server at: {}", self.server_addr);
info!("📞 dialling {}", self.server_addr);
let mut client = match RelayClient::connect(self.server_addr.clone()).await {
Ok(c) => c,
Err(e) => {
@ -105,9 +105,12 @@ impl NavkaClientApp {
};
// fresh reader over the *same* broadcast channel
let outbound = BroadcastStream::new(self.tx.subscribe()).filter_map(|r| async { r.ok() });
let mut rx = self.tx.subscribe();
let outbound = BroadcastStream::new(rx.clone()).filter_map(|r| async { r.ok() });
info!("🛫 spawning stream()");
let response = match client.stream(Request::new(outbound)).await {
Ok(r) => r,
Ok(r) => { info!("✅ stream established"); r },
Err(e) => {
error!("stream RPC error: {e}, sleeping 1s");
tokio::time::sleep(Duration::from_secs(1)).await;
@ -119,7 +122,7 @@ impl NavkaClientApp {
while let Some(res) = inbound.message().await.transpose() {
match res {
Ok(report) => {
tracing::debug!(?report.kind, "server inbound");
debug!(?report.kind, "↩️ echo from server");
},
Err(e) => {
error!("Inbound error: {e}");
@ -127,7 +130,7 @@ impl NavkaClientApp {
}
}
}
warn!("Diconnected. Inbound ended. Will try to reconnect in 1s");
warn!("🔌 disconnected retrying in 1s");
tokio::time::sleep(Duration::from_secs(1)).await;
}
}

View File

@ -115,10 +115,10 @@ impl KeyboardAggregator {
match self.tx.try_send(msg.clone()) {
Ok(n) => {
tracing::trace!("queued → {} receiver(s)", n);
info!("📤 sent HID report → {n} subscriber(s)");
}
Err(e) => {
tracing::warn!("try_send dropped report ({e}); falling back to send()");
tracing::warn!("❌ try_send failed: {e}");
let _ = self.tx.send(msg);
}
}

View File

@ -8,6 +8,7 @@ use std::env;
use std::fs::OpenOptions;
use tracing_subscriber::fmt;
use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;
use tracing_subscriber::{fmt, EnvFilter};
#[tokio::main]
async fn main() -> Result<()> {
@ -37,4 +38,21 @@ async fn main() -> Result<()> {
let mut app = NavkaClientApp::new()?;
app.run().await
}
}
fmt()
.with_env_filter(
// honour RUST_LOG but fall back to very chatty defaults
EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new(
// componentlevel granularity
"navka_client=trace,\
navka_server=trace,\
tonic=debug,\
h2=debug,\
tower=debug"))
)
.with_target(true) // show module path
.with_thread_ids(true)
.with_file(true)
.init();

View File

@ -7,6 +7,7 @@ use tokio::{fs::{File, OpenOptions}, io::AsyncWriteExt, sync::Mutex};
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tonic::{transport::Server, Request, Response, Status};
use tracing::{info, error};
use tracing_subscriber::{fmt, EnvFilter};
use navka_common::navka::{
relay_server::{Relay, RelayServer},
@ -40,23 +41,26 @@ impl Relay for Handler {
match msg.kind {
Some(hid_report::Kind::KeyboardReport(ref v)) if v.len() == 8 => {
kb.lock().await.write_all(v).await?;
trace!(" └─ wrote 8B to /dev/hidg0");
}
Some(hid_report::Kind::MouseReport(ref v)) if v.len() == 4 => {
ms.lock().await.write_all(v).await?;
trace!(" └─ wrote 4B to /dev/hidg1");
}
_ => {
error!(?msg.kind, "⚠️ malformed packet");
let bad_len = match &msg.kind {
Some(hid_report::Kind::KeyboardReport(v)) => v.len(),
Some(hid_report::Kind::MouseReport(v)) => v.len(),
_ => 0,
};
error!("bad {:?} packet len={}", msg.kind, bad_len);
continue;
}
}
info!("HID report forwarded");
let _ = tx.send(Ok(msg)).await;
}
info!("🔚 client stream closed");
Ok::<_, Status>(())
});
@ -94,3 +98,20 @@ async fn main() -> anyhow::Result<()> {
.await?;
Ok(())
}
fmt()
.with_env_filter(
// honour RUST_LOG but fall back to very chatty defaults
EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new(
// componentlevel granularity
"navka_client=trace,\
navka_server=trace,\
tonic=debug,\
h2=debug,\
tower=debug"))
)
.with_target(true) // show module path
.with_thread_ids(true)
.with_file(true)
.init();