From 4ef2c9337e35f0f9e40c3ee8c9d22a0cec03bb90 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Wed, 25 Jun 2025 15:13:49 -0500 Subject: [PATCH] updates --- server/src/lib.rs | 1 - server/src/main.rs | 86 ++++++++++++++-------------------------- server/src/usb_gadget.rs | 1 + server/src/usb_reset.rs | 21 ---------- server/src/video.rs | 5 +-- 5 files changed, 32 insertions(+), 82 deletions(-) delete mode 100644 server/src/usb_reset.rs diff --git a/server/src/lib.rs b/server/src/lib.rs index 33cfb8b..f149978 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -1,5 +1,4 @@ // server/src/lib.rs pub mod video; -pub mod usb_reset; pub mod usb_gadget; diff --git a/server/src/main.rs b/server/src/main.rs index 2d9ea76..9e9e6ee 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -4,6 +4,7 @@ use futures_util::{Stream, StreamExt}; use std::{pin::Pin, sync::Arc, time::Duration}; +use std::sync::atomic::{AtomicBool, Ordering}; use tokio::{fs::{OpenOptions}, io::AsyncWriteExt, sync::Mutex}; use tokio_stream::{wrappers::ReceiverStream}; use tonic::{transport::Server, Request, Response, Status}; @@ -11,7 +12,7 @@ use tracing::{info, trace, warn}; use tracing_subscriber::{fmt, EnvFilter}; use udev::{MonitorBuilder}; -use lesavka_server::{usb_gadget::UsbGadget, video, usb_reset}; +use lesavka_server::{usb_gadget::UsbGadget, video}; use lesavka_common::lesavka::{ relay_server::{Relay, RelayServer}, @@ -19,76 +20,25 @@ use lesavka_common::lesavka::{ MonitorRequest, VideoPacket, }; -/*─────────────────── GC311 discovery ───────────────────*/ -fn list_gc311_devices() -> anyhow::Result> { - let mut v = Vec::new(); - for entry in std::fs::read_dir("/sys/class/video4linux")? { - let path = entry?.path(); - let name = std::fs::read_to_string(path.join("name"))?; - if name.to_lowercase().contains("gc311") { - v.push( - path.file_name() - .unwrap() - .to_string_lossy() - .replace("video", "/dev/video"), - ); - } - } - v.sort(); - Ok(v) -} - -// /// background task: whenever GC311 disappears, cycle USB port -// async fn monitor_gc311_disconnect() -> anyhow::Result<()> { -// let mut mon = MonitorBuilder::new()? -// .match_subsystem("usb")? -// // .match_tag("PRODUCT", "7ca/3311/*")? // vendor: 0x07ca, device 0x3311 -// .listen()?; - -// // Blocking I/O -> move into a dedicated thread -// tokio::task::spawn_blocking(move || { -// for ev in mon { // `Socket` implements `Iterator` -// if ev.event_type() == udev::EventType::Remove { -// if let (Some(prod), Some(bus), Some(dev)) = -// (ev.attribute_value("PRODUCT"), -// ev.attribute_value("busnum"), -// ev.attribute_value("devnum")) -// { -// // 0x07ca / 0x3311 == AVerMedia GC311 -// if prod.to_str().map_or(false, |p| p.starts_with("7ca/3311/")) { -// usb_reset::cycle_port(bus.to_str().unwrap(), -// dev.to_str().unwrap()); -// } -// } -// } -// } -// }); -// // if ev.event_type() == udev::EventType::Remove { -// // if let (Some(bus), Some(dev)) = (ev.attribute_value("busnum"), ev.attribute_value("devnum")) { -// // usb_reset::cycle_port(bus.to_str().unwrap(), dev.to_str().unwrap()); -// // } -// // } -// // } -// Ok(()) -// } - /*─────────────────── tonic service ─────────────────────*/ struct Handler { kb: Arc>, ms: Arc>, gadget: UsbGadget, + did_cycle: AtomicBool, } impl Handler { async fn make(gadget: UsbGadget) -> anyhow::Result { - gadget.cycle().ok(); let kb = OpenOptions::new().write(true).open("/dev/hidg0").await?; let ms = OpenOptions::new().write(true) .custom_flags(libc::O_NONBLOCK) .open("/dev/hidg1").await?; Ok(Self { kb: Arc::new(Mutex::new(kb)), ms: Arc::new(Mutex::new(ms)), - gadget }) + gadget, + did_cycle: AtomicBool::new(false), + }) } } @@ -103,13 +53,35 @@ impl Relay for Handler { req: Request>, ) -> Result, Status> { // self.gadget.cycle().map_err(|e| Status::internal(e.to_string()))?; + if !self.did_cycle.swap(true, Ordering::SeqCst) { + self.gadget + .cycle() + .map_err(|e| Status::internal(e.to_string()))?; + tokio::time::sleep(Duration::from_millis(500)).await; + } let (tx, rx) = tokio::sync::mpsc::channel(32); let kb = self.kb.clone(); tokio::spawn(async move { let mut s = req.into_inner(); while let Some(pkt) = s.next().await.transpose()? { - kb.lock().await.write_all(&pkt.data).await?; + // kb.lock().await.write_all(&pkt.data).await?; + for attempt in 0..50 { + match kb.lock().await.write_all(&pkt.data).await { + Ok(()) => { + trace!("⌨️ wrote {}", pkt.data.iter() + .map(|b| format!("{b:02X}")).collect::>().join(" ")); + break; + }, + Err(e) if e.raw_os_error() == Some(libc::EBUSY) => { + tokio::time::sleep(Duration::from_millis(20)).await; + } + Err(e) => return Err(Status::internal(e.to_string())), + } + if attempt == 49 { + return Err(Status::internal("hidg0 stayed BUSY")); + } + } tx.send(Ok(pkt)).await;//.ok(); // best-effort echo } Ok::<(), Status>(()) diff --git a/server/src/usb_gadget.rs b/server/src/usb_gadget.rs index 7c7903f..44b1998 100644 --- a/server/src/usb_gadget.rs +++ b/server/src/usb_gadget.rs @@ -26,6 +26,7 @@ impl UsbGadget { .transpose()? .context("no UDC present")? .file_name(); + info!("UDC‑cycle: re‑attaching to {}", udc_name.to_string_lossy()); OpenOptions::new().write(true).open(self.udc_file)? .write_all(udc_name.to_str().unwrap().as_bytes())?; info!("USB‑gadget cycled"); diff --git a/server/src/usb_reset.rs b/server/src/usb_reset.rs deleted file mode 100644 index b941dd4..0000000 --- a/server/src/usb_reset.rs +++ /dev/null @@ -1,21 +0,0 @@ -// server/src/usb_reset.rs -//! Helpers to (re‑)power GC311 if udev reports a disconnect. - -use std::process::Command; -use tracing::{info, warn}; - -/// Try to cycle power on the USB port where the GC311 was. -/// Works only when the Pi is behind a hub that supports per‑port power control. -/// Uses `uhubctl`, which must be `apt install uhubctl`. -pub fn cycle_port(busnum: &str, devnum: &str) { - warn!("GC311 disappeared ({}:{}), cycling port power", busnum, devnum); - // example: uhubctl -l 1-1 -p 2 -a cycle - // mapping port# requires lsusb -t; we fall back to a generic bus reset - let _ = Command::new("sh") - .arg("-c") - .arg(format!("echo 0 | sudo tee /sys/bus/usb/devices/{busnum}-{devnum}/authorized; \ - sleep 1; \ - echo 1 | sudo tee /sys/bus/usb/devices/{busnum}-{devnum}/authorized")) - .status(); - info!("port cycle issued"); -} diff --git a/server/src/video.rs b/server/src/video.rs index 37a5c47..a5cc3f6 100644 --- a/server/src/video.rs +++ b/server/src/video.rs @@ -17,9 +17,8 @@ pub async fn spawn_camera( // v4l2src → H.264 already, we only parse & relay let desc = format!( - "v4l2src device={dev} io-mode=dmabuf ! queue ! h264parse config-interval=-1 ! \ - video/x-h264,stream-format=byte-stream,profile=baseline,level=4,\ - bitrate={max_bitrate_kbit}000 ! appsink name=sink emit-signals=true sync=false" + "v4l2src device={dev} io-mode=auto ! queue ! h264parse config-interval=-1 ! \ + appsink name=sink emit-signals=true sync=false" ); let pipeline = gst::parse::launch(&desc)?