From 74cf5a46ee1d0c9ebaac9cedfecdebf4e103a0fa Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Thu, 26 Jun 2025 18:29:14 -0500 Subject: [PATCH] video updates --- client/src/app.rs | 48 ++++++++++------------ server/src/main.rs | 88 ++++++++++++---------------------------- server/src/usb_gadget.rs | 16 +++++++- 3 files changed, 62 insertions(+), 90 deletions(-) diff --git a/client/src/app.rs b/client/src/app.rs index b918569..a9d3cc2 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -123,49 +123,43 @@ impl LesavkaClientApp { /*──────────────── keyboard stream ───────────────*/ async fn stream_loop_keyboard(&self, ep: Channel) { loop { - info!("⌨️ connect {}", self.server_addr); + info!("⌨️ dial {}", self.server_addr); // LESAVKA-client let mut cli = RelayClient::new(ep.clone()); - - let outbound = BroadcastStream::new(self.kbd_tx.subscribe()).filter_map(|r| r.ok()); + + // ✅ use kbd_tx here – fixes E0271 + let outbound = BroadcastStream::new(self.kbd_tx.subscribe()) + .filter_map(|r| r.ok()); + match cli.stream_keyboard(Request::new(outbound)).await { Ok(mut resp) => { - // Drain echoes so the h2 window never fills up. - tokio::spawn(async move { - while let Some(_) = resp.get_mut().message().await.transpose() {} - warn!("⌨️ server closed keyboard stream"); - }); - } - Err(e) => { - error!("stream_keyboard: {e}"); - tokio::time::sleep(Duration::from_secs(1)).await; - continue; + while let Some(msg) = resp.get_mut().message().await.transpose() { + if let Err(e) = msg { warn!("⌨️ server err: {e}"); break; } + } } + Err(e) => warn!("⌨️ connect failed: {e}"), } - std::future::pending::<()>().await; + tokio::time::sleep(Duration::from_secs(1)).await; // retry } } /*──────────────── mouse stream ──────────────────*/ async fn stream_loop_mouse(&self, ep: Channel) { loop { - info!("🖱️ connect {}", self.server_addr); + info!("🖱️ dial {}", self.server_addr); let mut cli = RelayClient::new(ep.clone()); - - let outbound = BroadcastStream::new(self.mou_tx.subscribe()).filter_map(|r| r.ok()); + + let outbound = BroadcastStream::new(self.mou_tx.subscribe()) + .filter_map(|r| r.ok()); + match cli.stream_mouse(Request::new(outbound)).await { Ok(mut resp) => { - tokio::spawn(async move { - while let Some(_) = resp.get_mut().message().await.transpose() {} - warn!("🖱️ server closed mouse stream"); - }); - } - Err(e) => { - error!("stream_mouse: {e}"); - tokio::time::sleep(Duration::from_secs(1)).await; - continue; + while let Some(msg) = resp.get_mut().message().await.transpose() { + if let Err(e) = msg { warn!("🖱️ server err: {e}"); break; } + } } + Err(e) => warn!("🖱️ connect failed: {e}"), } - std::future::pending::<()>().await; + tokio::time::sleep(Duration::from_secs(1)).await; } } diff --git a/server/src/main.rs b/server/src/main.rs index fdcce9f..d96d248 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -9,7 +9,7 @@ use tokio::{fs::{OpenOptions}, io::AsyncWriteExt, sync::Mutex}; use tokio_stream::{wrappers::ReceiverStream}; use tonic::{transport::Server, Request, Response, Status}; use anyhow::Context as _; -use tracing::{info, trace, error}; +use tracing::{info, trace, warn, error}; use tracing_subscriber::{filter::EnvFilter, fmt, prelude::*}; use tracing_appender::non_blocking; use tracing_appender::non_blocking::WorkerGuard; @@ -110,6 +110,15 @@ impl Handler { async fn make(gadget: UsbGadget) -> anyhow::Result { info!("🛠️ Handler::make - cycling gadget ..."); gadget.cycle()?; + + let ctrl = UsbGadget::find_controller()?; + let state = UsbGadget::wait_state_any(&ctrl, 5_000)?; + match state.as_str() { + "configured" => info!("✅ host enumerated (configured)"), + "not attached" => warn!("⚠️ host absent – HID writes will be queued"), + _ => warn!("⚠️ unexpected UDC state: {state}"), + } + tokio::time::sleep(Duration::from_millis(1000)).await; info!("🛠️ opening HID endpoints ..."); @@ -154,38 +163,15 @@ impl Relay for Handler { const SPINS: usize = 20; for _ in 0..SPINS { match kb.lock().await.write(&pkt.data).await { - // Ok(n) if n == pkt.data.len() => { - // trace!("⌨️ wrote {}", pkt.data.iter() - // .map(|b| format!("{b:02X}")).collect::>().join(" ")); - // break; - // }, - // Ok(_) | Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { - // std::hint::spin_loop(); - // continue; // try again - // } - // Err(e) - // if matches!(e.raw_os_error(), - // Some(libc::EBUSY) | // still opening - // Some(libc::ENODEV) | // gadget not‑yet configured - // Some(libc::EPIPE) | // host vanished - // Some(libc::EINVAL) | // host hasn’t accepted EP config yet - // Some(libc::EAGAIN)) // non‑blocking - // => { - // tokio::time::sleep(Duration::from_millis(10)).await; - // continue; - // } - // Err(e) => return Err(Status::internal(e.to_string())), - Ok(n) if n == pkt.data.len() => { // success - trace!("⌨️ wrote {}", pkt.data.iter().map(|b| format!("{b:02X}")).collect::>().join(" ")); - break; - } - Ok(_) => continue, + Ok(n) if n == pkt.data.len() => break, // success + Ok(_) => continue, // short write + Err(ref e) if matches!(e.raw_os_error(), + Some(libc::EPIPE)|Some(libc::ENODEV)) => break, // host gone Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { std::hint::spin_loop(); continue; } - Err(ref e) if e.kind() == std::io::ErrorKind::BrokenPipe => break, - Err(e) => return Err(Status::internal(e.to_string())), + Err(e) => { tracing::error!("hid write: {e}"); break; } } } tx.send(Ok(pkt)).await;//.ok(); // best-effort echo @@ -210,40 +196,18 @@ impl Relay for Handler { while let Some(pkt) = s.next().await.transpose()? { loop { match ms.lock().await.write(&pkt.data).await { - // Ok(n) if n == pkt.data.len() => { - // trace!("🖱️ wrote {}", pkt.data.iter() - // .map(|b| format!("{b:02X}")).collect::>().join(" ")); - // break; - // } - // Ok(_) | Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { - // std::hint::spin_loop(); - // continue; // try again - // } - // Err(e) - // if matches!(e.raw_os_error(), - // Some(libc::EBUSY) | // still opening - // Some(libc::ENODEV) | // gadget not‑yet configured - // Some(libc::EPIPE) | // host vanished - // Some(libc::EINVAL) | // host hasn’t accepted EP config yet - // Some(libc::EAGAIN)) // non‑blocking - // => { - // tokio::time::sleep(Duration::from_millis(10)).await; - // continue; - // } - // Err(e) => return Err(Status::internal(e.to_string())), - Ok(n) if n == pkt.data.len() => { // success - trace!("⌨️ wrote {}", pkt.data.iter().map(|b| format!("{b:02X}")).collect::>().join(" ")); - break; - } - Ok(_) => continue, - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { - std::hint::spin_loop(); - continue; - } - Err(ref e) if e.kind() == std::io::ErrorKind::BrokenPipe => break, - Err(e) => return Err(Status::internal(e.to_string())), + Ok(n) if n == pkt.data.len() => break, // success + Ok(_) => continue, // short write + Err(ref e) if matches!(e.raw_os_error(), + Some(libc::EPIPE)|Some(libc::ENODEV)) => break, // host gone + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + std::hint::spin_loop(); + continue; + } + Err(e) => { tracing::error!("hid write: {e}"); break; } } - } + } // <-- closes `loop {` + let _ = tx.send(Ok(pkt)).await; } Ok::<(), Status>(()) diff --git a/server/src/usb_gadget.rs b/server/src/usb_gadget.rs index 34c29ae..ca8e5be 100644 --- a/server/src/usb_gadget.rs +++ b/server/src/usb_gadget.rs @@ -20,7 +20,7 @@ impl UsbGadget { /*–––– helpers ––––*/ /// Find the first controller in /sys/class/udc (e.g. `1000480000.usb`) - fn find_controller() -> Result { + pub fn find_controller() -> Result { Ok(fs::read_dir("/sys/class/udc")? .next() .transpose()? @@ -45,6 +45,20 @@ impl UsbGadget { fs::read_to_string(&path).unwrap_or_default())) } + pub fn wait_state_any(ctrl: &str, limit_ms: u64) -> anyhow::Result { + let path = format!("/sys/class/udc/{ctrl}/state"); + for _ in 0..=limit_ms / 50 { + if let Ok(s) = std::fs::read_to_string(&path) { + let s = s.trim(); + if matches!(s, "configured" | "not attached") { + return Ok(s.to_owned()); + } + } + std::thread::sleep(std::time::Duration::from_millis(50)); + } + Err(anyhow::anyhow!("UDC state did not settle within {limit_ms} ms")) + } + /// Write `value` (plus “\n”) into a sysfs attribute fn write_attr>(p: P, value: &str) -> Result<()> { OpenOptions::new()