video updates

This commit is contained in:
Brad Stein 2025-06-26 18:29:14 -05:00
parent db9239acbc
commit 74cf5a46ee
3 changed files with 62 additions and 90 deletions

View File

@ -123,49 +123,43 @@ impl LesavkaClientApp {
/*──────────────── keyboard stream ───────────────*/ /*──────────────── keyboard stream ───────────────*/
async fn stream_loop_keyboard(&self, ep: Channel) { async fn stream_loop_keyboard(&self, ep: Channel) {
loop { loop {
info!("⌨️ connect {}", self.server_addr); info!("⌨️ dial {}", self.server_addr); // LESAVKA-client
let mut cli = RelayClient::new(ep.clone()); let mut cli = RelayClient::new(ep.clone());
let outbound = BroadcastStream::new(self.kbd_tx.subscribe()).filter_map(|r| r.ok()); // ✅ use kbd_tx here fixes E0271
let outbound = BroadcastStream::new(self.kbd_tx.subscribe())
.filter_map(|r| r.ok());
match cli.stream_keyboard(Request::new(outbound)).await { match cli.stream_keyboard(Request::new(outbound)).await {
Ok(mut resp) => { Ok(mut resp) => {
// Drain echoes so the h2 window never fills up. while let Some(msg) = resp.get_mut().message().await.transpose() {
tokio::spawn(async move { if let Err(e) = msg { warn!("⌨️ server err: {e}"); break; }
while let Some(_) = resp.get_mut().message().await.transpose() {} }
warn!("⌨️ server closed keyboard stream");
});
}
Err(e) => {
error!("stream_keyboard: {e}");
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
} }
Err(e) => warn!("⌨️ connect failed: {e}"),
} }
std::future::pending::<()>().await; tokio::time::sleep(Duration::from_secs(1)).await; // retry
} }
} }
/*──────────────── mouse stream ──────────────────*/ /*──────────────── mouse stream ──────────────────*/
async fn stream_loop_mouse(&self, ep: Channel) { async fn stream_loop_mouse(&self, ep: Channel) {
loop { loop {
info!("🖱️ connect {}", self.server_addr); info!("🖱️ dial {}", self.server_addr);
let mut cli = RelayClient::new(ep.clone()); let mut cli = RelayClient::new(ep.clone());
let outbound = BroadcastStream::new(self.mou_tx.subscribe()).filter_map(|r| r.ok()); let outbound = BroadcastStream::new(self.mou_tx.subscribe())
.filter_map(|r| r.ok());
match cli.stream_mouse(Request::new(outbound)).await { match cli.stream_mouse(Request::new(outbound)).await {
Ok(mut resp) => { Ok(mut resp) => {
tokio::spawn(async move { while let Some(msg) = resp.get_mut().message().await.transpose() {
while let Some(_) = resp.get_mut().message().await.transpose() {} if let Err(e) = msg { warn!("🖱️ server err: {e}"); break; }
warn!("🖱️ server closed mouse stream"); }
});
}
Err(e) => {
error!("stream_mouse: {e}");
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
} }
Err(e) => warn!("🖱️ connect failed: {e}"),
} }
std::future::pending::<()>().await; tokio::time::sleep(Duration::from_secs(1)).await;
} }
} }

View File

@ -9,7 +9,7 @@ use tokio::{fs::{OpenOptions}, io::AsyncWriteExt, sync::Mutex};
use tokio_stream::{wrappers::ReceiverStream}; use tokio_stream::{wrappers::ReceiverStream};
use tonic::{transport::Server, Request, Response, Status}; use tonic::{transport::Server, Request, Response, Status};
use anyhow::Context as _; use anyhow::Context as _;
use tracing::{info, trace, error}; use tracing::{info, trace, warn, error};
use tracing_subscriber::{filter::EnvFilter, fmt, prelude::*}; use tracing_subscriber::{filter::EnvFilter, fmt, prelude::*};
use tracing_appender::non_blocking; use tracing_appender::non_blocking;
use tracing_appender::non_blocking::WorkerGuard; use tracing_appender::non_blocking::WorkerGuard;
@ -110,6 +110,15 @@ impl Handler {
async fn make(gadget: UsbGadget) -> anyhow::Result<Self> { async fn make(gadget: UsbGadget) -> anyhow::Result<Self> {
info!("🛠️ Handler::make - cycling gadget ..."); info!("🛠️ Handler::make - cycling gadget ...");
gadget.cycle()?; gadget.cycle()?;
let ctrl = UsbGadget::find_controller()?;
let state = UsbGadget::wait_state_any(&ctrl, 5_000)?;
match state.as_str() {
"configured" => info!("✅ host enumerated (configured)"),
"not attached" => warn!("⚠️ host absent HID writes will be queued"),
_ => warn!("⚠️ unexpected UDC state: {state}"),
}
tokio::time::sleep(Duration::from_millis(1000)).await; tokio::time::sleep(Duration::from_millis(1000)).await;
info!("🛠️ opening HID endpoints ..."); info!("🛠️ opening HID endpoints ...");
@ -154,38 +163,15 @@ impl Relay for Handler {
const SPINS: usize = 20; const SPINS: usize = 20;
for _ in 0..SPINS { for _ in 0..SPINS {
match kb.lock().await.write(&pkt.data).await { match kb.lock().await.write(&pkt.data).await {
// Ok(n) if n == pkt.data.len() => { Ok(n) if n == pkt.data.len() => break, // success
// trace!("⌨️ wrote {}", pkt.data.iter() Ok(_) => continue, // short write
// .map(|b| format!("{b:02X}")).collect::<Vec<_>>().join(" ")); Err(ref e) if matches!(e.raw_os_error(),
// break; Some(libc::EPIPE)|Some(libc::ENODEV)) => break, // host gone
// },
// Ok(_) | Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
// std::hint::spin_loop();
// continue; // try again
// }
// Err(e)
// if matches!(e.raw_os_error(),
// Some(libc::EBUSY) | // still opening
// Some(libc::ENODEV) | // gadget notyet configured
// Some(libc::EPIPE) | // host vanished
// Some(libc::EINVAL) | // host hasnt accepted EP config yet
// Some(libc::EAGAIN)) // nonblocking
// => {
// tokio::time::sleep(Duration::from_millis(10)).await;
// continue;
// }
// Err(e) => return Err(Status::internal(e.to_string())),
Ok(n) if n == pkt.data.len() => { // success
trace!("⌨️ wrote {}", pkt.data.iter().map(|b| format!("{b:02X}")).collect::<Vec<_>>().join(" "));
break;
}
Ok(_) => continue,
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
std::hint::spin_loop(); std::hint::spin_loop();
continue; continue;
} }
Err(ref e) if e.kind() == std::io::ErrorKind::BrokenPipe => break, Err(e) => { tracing::error!("hid write: {e}"); break; }
Err(e) => return Err(Status::internal(e.to_string())),
} }
} }
tx.send(Ok(pkt)).await;//.ok(); // best-effort echo tx.send(Ok(pkt)).await;//.ok(); // best-effort echo
@ -210,40 +196,18 @@ impl Relay for Handler {
while let Some(pkt) = s.next().await.transpose()? { while let Some(pkt) = s.next().await.transpose()? {
loop { loop {
match ms.lock().await.write(&pkt.data).await { match ms.lock().await.write(&pkt.data).await {
// Ok(n) if n == pkt.data.len() => { Ok(n) if n == pkt.data.len() => break, // success
// trace!("🖱️ wrote {}", pkt.data.iter() Ok(_) => continue, // short write
// .map(|b| format!("{b:02X}")).collect::<Vec<_>>().join(" ")); Err(ref e) if matches!(e.raw_os_error(),
// break; Some(libc::EPIPE)|Some(libc::ENODEV)) => break, // host gone
// } Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
// Ok(_) | Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { std::hint::spin_loop();
// std::hint::spin_loop(); continue;
// continue; // try again }
// } Err(e) => { tracing::error!("hid write: {e}"); break; }
// Err(e)
// if matches!(e.raw_os_error(),
// Some(libc::EBUSY) | // still opening
// Some(libc::ENODEV) | // gadget notyet configured
// Some(libc::EPIPE) | // host vanished
// Some(libc::EINVAL) | // host hasnt accepted EP config yet
// Some(libc::EAGAIN)) // nonblocking
// => {
// tokio::time::sleep(Duration::from_millis(10)).await;
// continue;
// }
// Err(e) => return Err(Status::internal(e.to_string())),
Ok(n) if n == pkt.data.len() => { // success
trace!("⌨️ wrote {}", pkt.data.iter().map(|b| format!("{b:02X}")).collect::<Vec<_>>().join(" "));
break;
}
Ok(_) => continue,
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
std::hint::spin_loop();
continue;
}
Err(ref e) if e.kind() == std::io::ErrorKind::BrokenPipe => break,
Err(e) => return Err(Status::internal(e.to_string())),
} }
} } // <-- closes `loop {`
let _ = tx.send(Ok(pkt)).await; let _ = tx.send(Ok(pkt)).await;
} }
Ok::<(), Status>(()) Ok::<(), Status>(())

View File

@ -20,7 +20,7 @@ impl UsbGadget {
/* helpers */ /* helpers */
/// Find the first controller in /sys/class/udc (e.g. `1000480000.usb`) /// Find the first controller in /sys/class/udc (e.g. `1000480000.usb`)
fn find_controller() -> Result<String> { pub fn find_controller() -> Result<String> {
Ok(fs::read_dir("/sys/class/udc")? Ok(fs::read_dir("/sys/class/udc")?
.next() .next()
.transpose()? .transpose()?
@ -45,6 +45,20 @@ impl UsbGadget {
fs::read_to_string(&path).unwrap_or_default())) fs::read_to_string(&path).unwrap_or_default()))
} }
pub fn wait_state_any(ctrl: &str, limit_ms: u64) -> anyhow::Result<String> {
let path = format!("/sys/class/udc/{ctrl}/state");
for _ in 0..=limit_ms / 50 {
if let Ok(s) = std::fs::read_to_string(&path) {
let s = s.trim();
if matches!(s, "configured" | "not attached") {
return Ok(s.to_owned());
}
}
std::thread::sleep(std::time::Duration::from_millis(50));
}
Err(anyhow::anyhow!("UDC state did not settle within {limit_ms}ms"))
}
/// Write `value` (plus “\n”) into a sysfs attribute /// Write `value` (plus “\n”) into a sysfs attribute
fn write_attr<P: AsRef<Path>>(p: P, value: &str) -> Result<()> { fn write_attr<P: AsRef<Path>>(p: P, value: &str) -> Result<()> {
OpenOptions::new() OpenOptions::new()