From 00696baa5eddf046be8b159ce1a110d6406f79bb Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Sat, 28 Jun 2025 15:45:11 -0500 Subject: [PATCH] Video Reliability fixes --- server/src/main.rs | 14 +++++++------- server/src/usb_gadget.rs | 40 ++++++++++++++++++++-------------------- server/src/video.rs | 32 ++++++++++++++++++++++++-------- 3 files changed, 51 insertions(+), 35 deletions(-) diff --git a/server/src/main.rs b/server/src/main.rs index b0c0c2c..eb06437 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,4 +1,4 @@ -//! lesavka‑server – **auto‑cycle disabled** +//! lesavka-server - **auto-cycle disabled** // server/src/main.rs #![forbid(unsafe_code)] @@ -79,9 +79,9 @@ impl Handler { async fn new(gadget: UsbGadget) -> anyhow::Result { if AUTO_CYCLE { info!("🛠️ Initial USB reset…"); - let _ = gadget.cycle(); // ignore failure – may boot without host + let _ = gadget.cycle(); // ignore failure - may boot without host } else { - info!("🛠️ AUTO_CYCLE disabled – no initial reset"); + info!("🛠️ AUTO_CYCLE disabled - no initial reset"); } info!("🛠️ opening HID endpoints …"); @@ -101,7 +101,7 @@ impl Handler { /*──────────────── gRPC service ─────────────*/ #[tonic::async_trait] impl Relay for Handler { - /* existing streams ─ unchanged, except: no more auto‑reset */ + /* existing streams ─ unchanged, except: no more auto-reset */ type StreamKeyboardStream = ReceiverStream>; type StreamMouseStream = ReceiverStream>; type CaptureVideoStream = Pin> + Send + Sync>>; @@ -131,7 +131,7 @@ impl Relay for Handler { &self, req: Request>, ) -> Result, Status> { - let (tx, rx) = tokio::sync::mpsc::channel(4096); + let (tx, rx) = tokio::sync::mpsc::channel(1024); let ms = self.ms.clone(); tokio::spawn(async move { @@ -165,7 +165,7 @@ impl Relay for Handler { Ok(Response::new(Box::pin(s))) } - /*────────────── USB‑reset RPC ───────────*/ + /*────────────── USB-reset RPC ───────────*/ async fn reset_usb( &self, _req: Request, @@ -194,7 +194,7 @@ async fn main() -> anyhow::Result<()> { let gadget = UsbGadget::new("lesavka"); let handler = Handler::new(gadget.clone()).await?; - info!("🌐 lesavka‑server listening on 0.0.0.0:50051"); + info!("🌐 lesavka-server listening on 0.0.0.0:50051"); Server::builder() .tcp_nodelay(true) .max_frame_size(Some(2*1024*1024)) diff --git a/server/src/usb_gadget.rs b/server/src/usb_gadget.rs index 98ed2f1..f982f93 100644 --- a/server/src/usb_gadget.rs +++ b/server/src/usb_gadget.rs @@ -22,7 +22,7 @@ impl UsbGadget { Ok(std::fs::read_to_string(p)?.trim().to_owned()) } - /*–––– helpers ––––*/ + /*---- helpers ----*/ /// Find the first controller in /sys/class/udc (e.g. `1000480000.usb`) pub fn find_controller() -> Result { @@ -35,7 +35,7 @@ impl UsbGadget { .into_owned()) } - /// Busy‑loop (≤ `limit_ms`) until `state` matches `wanted` + /// Busy-loop (≤ `limit_ms`) until `state` matches `wanted` fn wait_state(ctrl: &str, wanted: &str, limit_ms: u64) -> Result<()> { let path = format!("/sys/class/udc/{ctrl}/state"); for _ in 0..=limit_ms / 50 { @@ -81,7 +81,7 @@ impl UsbGadget { } thread::sleep(Duration::from_millis(50)); } - Err(anyhow::anyhow!("⚠️ UDC {ctrl} did not re‑appear within {limit_ms} ms")) + Err(anyhow::anyhow!("⚠️ UDC {ctrl} did not re-appear within {limit_ms} ms")) } /// Scan platform devices when /sys/class/udc is empty @@ -93,20 +93,20 @@ impl UsbGadget { Ok(None) } - /*–––– public API ––––*/ + /*---- public API ----*/ - /// Hard‑reset the gadget → identical to a physical cable re‑plug + /// Hard-reset the gadget → identical to a physical cable re-plug pub fn cycle(&self) -> Result<()> { - /* 0 – ensure we *know* the controller even after a previous crash */ + /* 0 - ensure we *know* the controller even after a previous crash */ let ctrl = Self::find_controller() .or_else(|_| Self::probe_platform_udc()? .ok_or_else(|| anyhow::anyhow!("no UDC present")))?; - /* 1 – detach gadget */ + /* 1 - detach gadget */ info!("🔌 detaching gadget from {ctrl}"); - // a) drop pull‑ups (if the controller offers the switch) + // a) drop pull-ups (if the controller offers the switch) let sc = format!("/sys/class/udc/{ctrl}/soft_connect"); - let _ = Self::write_attr(&sc, "0"); // ignore errors – not all HW has it + let _ = Self::write_attr(&sc, "0"); // ignore errors - not all HW has it // b) clear the UDC attribute; the kernel may transiently answer EBUSY for attempt in 1..=10 { @@ -118,7 +118,7 @@ impl UsbGadget { .and_then(|io| io.raw_os_error()) == Some(libc::EBUSY) && attempt < 10 } => { - trace!("⏳ UDC busy (attempt {attempt}/10) – retrying…"); + trace!("⏳ UDC busy (attempt {attempt}/10) - retrying…"); thread::sleep(Duration::from_millis(100)); } Err(err) => return Err(err), @@ -126,14 +126,14 @@ impl UsbGadget { } Self::wait_state(&ctrl, "not attached", 3_000)?; - /* 2 – reset driver */ + /* 2 - reset driver */ Self::rebind_driver(&ctrl)?; - /* 3 – wait UDC node to re‑appear */ + /* 3 - wait UDC node to re-appear */ Self::wait_udc_present(&ctrl, 3_000)?; - /* 4 – re‑attach + pull‑up */ - info!("🔌 re‑attaching gadget to {ctrl}"); + /* 4 - re-attach + pull-up */ + info!("🔌 re-attaching gadget to {ctrl}"); Self::write_attr(self.udc_file, &ctrl)?; if Path::new(&sc).exists() { // try to set the pull-up; ignore if the kernel rejects it @@ -156,24 +156,24 @@ impl UsbGadget { } } - /* 5 – wait for host (but tolerate sleep) */ + /* 5 - wait for host (but tolerate sleep) */ Self::wait_state(&ctrl, "configured", 6_000) .or_else(|e| { // If the host is physically absent (sleep / KVM paused) - // we allow 'not attached' and continue – we can still + // we allow 'not attached' and continue - we can still // accept keyboard/mouse data and the host will enumerate // later without another reset. let last = fs::read_to_string(format!("/sys/class/udc/{ctrl}/state")) .unwrap_or_default(); if last.trim() == "not attached" { - warn!("⚠️ host did not enumerate within 6 s – continuing (state = {last:?})"); + warn!("⚠️ host did not enumerate within 6 s - continuing (state = {last:?})"); Ok(()) } else { Err(e) } })?; - info!("✅ USB‑gadget cycle complete"); + info!("✅ USB-gadget cycle complete"); Ok(()) } @@ -190,7 +190,7 @@ impl UsbGadget { match Self::write_attr(format!("{root}/unbind"), ctrl) { Ok(_) => break, Err(err) if attempt < 20 && Self::is_still_detaching(&err) => { - trace!("unbind in‑progress (#{attempt}) – waiting…"); + trace!("unbind in-progress (#{attempt}) - waiting…"); thread::sleep(Duration::from_millis(100)); } Err(err) => return Err(err) @@ -205,7 +205,7 @@ impl UsbGadget { match Self::write_attr(format!("{root}/bind"), ctrl) { Ok(_) => return Ok(()), // success 🎉 Err(err) if attempt < 20 && Self::is_still_detaching(&err) => { - trace!("bind busy (#{attempt}) – retrying…"); + trace!("bind busy (#{attempt}) - retrying…"); thread::sleep(Duration::from_millis(100)); } Err(err) => return Err(err) diff --git a/server/src/video.rs b/server/src/video.rs index e55d655..be41145 100644 --- a/server/src/video.rs +++ b/server/src/video.rs @@ -4,7 +4,7 @@ use anyhow::Context; use gstreamer as gst; use gstreamer_app as gst_app; use gst::prelude::*; -use gst::log; +use gst::{log, MessageView}; use lesavka_common::lesavka::VideoPacket; use tokio_stream::wrappers::ReceiverStream; use tonic::Status; @@ -21,24 +21,31 @@ pub async fn eye_ball( let eye = EYE_ID[id as usize]; gst::init().context("gst init")?; + // let desc = format!( + // "v4l2src device={dev} io-mode=mmap ! \ + // video/x-h264,stream-format=byte-stream,alignment=au,profile=high ! \ + // h264parse config-interval=-1 ! \ + // appsink name=sink emit-signals=true drop=true sync=false" + // ); let desc = format!( - "v4l2src device={dev} io-mode=mmap ! \ - video/x-h264,stream-format=byte-stream,alignment=au,profile=high ! \ - h264parse config-interval=-1 ! \ - appsink name=sink emit-signals=true drop=true sync=false" + "v4l2src device={dev} io-mode=mmap ! videorate skip-to-first=true ! \ + queue2 max-size-buffers=0 max-size-bytes=0 min-threshold-time=10000000 ! \ + video/x-h264,stream-format=byte-stream,alignment=au,profile=high ! \ + h264parse config-interval=1 ! \ + appsink name=sink emit-signals=true drop=false sync=false" ); let pipeline = gst::parse::launch(&desc)? .downcast::() - .expect("pipeline down‑cast"); + .expect("pipeline down-cast"); let sink = pipeline .by_name("sink") .expect("appsink") .dynamic_cast::() - .expect("appsink down‑cast"); + .expect("appsink down-cast"); - let (tx, rx) = tokio::sync::mpsc::channel(256); + let (tx, rx) = tokio::sync::mpsc::channel(8192); sink.set_callbacks( gst_app::AppSinkCallbacks::builder() @@ -102,5 +109,14 @@ pub async fn eye_ball( ); pipeline.set_state(gst::State::Playing)?; + let bus = pipeline.bus().unwrap(); + loop { + match bus.timed_pop(gst::ClockTime::NONE) { + Some(msg) if matches!(msg.view(), MessageView::StateChanged(s) + if s.current() == gst::State::Playing) => break, + Some(_) => continue, + None => continue, + } + } Ok(ReceiverStream::new(rx)) }