Video Reliability fixes

This commit is contained in:
Brad Stein 2025-06-28 15:45:11 -05:00
parent 70a22040e5
commit 00696baa5e
3 changed files with 51 additions and 35 deletions

View File

@ -1,4 +1,4 @@
//! lesavkaserver **autocycle disabled** //! lesavka-server - **auto-cycle disabled**
// server/src/main.rs // server/src/main.rs
#![forbid(unsafe_code)] #![forbid(unsafe_code)]
@ -79,9 +79,9 @@ impl Handler {
async fn new(gadget: UsbGadget) -> anyhow::Result<Self> { async fn new(gadget: UsbGadget) -> anyhow::Result<Self> {
if AUTO_CYCLE { if AUTO_CYCLE {
info!("🛠️ Initial USB reset…"); info!("🛠️ Initial USB reset…");
let _ = gadget.cycle(); // ignore failure may boot without host let _ = gadget.cycle(); // ignore failure - may boot without host
} else { } else {
info!("🛠️ AUTO_CYCLE disabled  no initial reset"); info!("🛠️ AUTO_CYCLE disabled - no initial reset");
} }
info!("🛠️ opening HID endpoints …"); info!("🛠️ opening HID endpoints …");
@ -101,7 +101,7 @@ impl Handler {
/*──────────────── gRPC service ─────────────*/ /*──────────────── gRPC service ─────────────*/
#[tonic::async_trait] #[tonic::async_trait]
impl Relay for Handler { impl Relay for Handler {
/* existing streams ─ unchanged, except: no more autoreset */ /* existing streams ─ unchanged, except: no more auto-reset */
type StreamKeyboardStream = ReceiverStream<Result<KeyboardReport, Status>>; type StreamKeyboardStream = ReceiverStream<Result<KeyboardReport, Status>>;
type StreamMouseStream = ReceiverStream<Result<MouseReport, Status>>; type StreamMouseStream = ReceiverStream<Result<MouseReport, Status>>;
type CaptureVideoStream = Pin<Box<dyn Stream<Item=Result<VideoPacket,Status>> + Send + Sync>>; type CaptureVideoStream = Pin<Box<dyn Stream<Item=Result<VideoPacket,Status>> + Send + Sync>>;
@ -131,7 +131,7 @@ impl Relay for Handler {
&self, &self,
req: Request<tonic::Streaming<MouseReport>>, req: Request<tonic::Streaming<MouseReport>>,
) -> Result<Response<Self::StreamMouseStream>, Status> { ) -> Result<Response<Self::StreamMouseStream>, Status> {
let (tx, rx) = tokio::sync::mpsc::channel(4096); let (tx, rx) = tokio::sync::mpsc::channel(1024);
let ms = self.ms.clone(); let ms = self.ms.clone();
tokio::spawn(async move { tokio::spawn(async move {
@ -165,7 +165,7 @@ impl Relay for Handler {
Ok(Response::new(Box::pin(s))) Ok(Response::new(Box::pin(s)))
} }
/*────────────── USBreset RPC ───────────*/ /*────────────── USB-reset RPC ───────────*/
async fn reset_usb( async fn reset_usb(
&self, &self,
_req: Request<ResetUsbRequest>, _req: Request<ResetUsbRequest>,
@ -194,7 +194,7 @@ async fn main() -> anyhow::Result<()> {
let gadget = UsbGadget::new("lesavka"); let gadget = UsbGadget::new("lesavka");
let handler = Handler::new(gadget.clone()).await?; let handler = Handler::new(gadget.clone()).await?;
info!("🌐 lesavkaserver listening on 0.0.0.0:50051"); info!("🌐 lesavka-server listening on 0.0.0.0:50051");
Server::builder() Server::builder()
.tcp_nodelay(true) .tcp_nodelay(true)
.max_frame_size(Some(2*1024*1024)) .max_frame_size(Some(2*1024*1024))

View File

@ -22,7 +22,7 @@ impl UsbGadget {
Ok(std::fs::read_to_string(p)?.trim().to_owned()) Ok(std::fs::read_to_string(p)?.trim().to_owned())
} }
/* helpers */ /*---- helpers ----*/
/// Find the first controller in /sys/class/udc (e.g. `1000480000.usb`) /// Find the first controller in /sys/class/udc (e.g. `1000480000.usb`)
pub fn find_controller() -> Result<String> { pub fn find_controller() -> Result<String> {
@ -35,7 +35,7 @@ impl UsbGadget {
.into_owned()) .into_owned())
} }
/// Busyloop (≤ `limit_ms`) until `state` matches `wanted` /// Busy-loop (≤ `limit_ms`) until `state` matches `wanted`
fn wait_state(ctrl: &str, wanted: &str, limit_ms: u64) -> Result<()> { fn wait_state(ctrl: &str, wanted: &str, limit_ms: u64) -> Result<()> {
let path = format!("/sys/class/udc/{ctrl}/state"); let path = format!("/sys/class/udc/{ctrl}/state");
for _ in 0..=limit_ms / 50 { for _ in 0..=limit_ms / 50 {
@ -81,7 +81,7 @@ impl UsbGadget {
} }
thread::sleep(Duration::from_millis(50)); thread::sleep(Duration::from_millis(50));
} }
Err(anyhow::anyhow!("⚠️ UDC {ctrl} did not reappear within {limit_ms}ms")) Err(anyhow::anyhow!("⚠️ UDC {ctrl} did not re-appear within {limit_ms}ms"))
} }
/// Scan platform devices when /sys/class/udc is empty /// Scan platform devices when /sys/class/udc is empty
@ -93,20 +93,20 @@ impl UsbGadget {
Ok(None) Ok(None)
} }
/* public API */ /*---- public API ----*/
/// Hardreset the gadget → identical to a physical cable replug /// Hard-reset the gadget → identical to a physical cable re-plug
pub fn cycle(&self) -> Result<()> { pub fn cycle(&self) -> Result<()> {
/* 0ensure we *know* the controller even after a previous crash */ /* 0-ensure we *know* the controller even after a previous crash */
let ctrl = Self::find_controller() let ctrl = Self::find_controller()
.or_else(|_| Self::probe_platform_udc()? .or_else(|_| Self::probe_platform_udc()?
.ok_or_else(|| anyhow::anyhow!("no UDC present")))?; .ok_or_else(|| anyhow::anyhow!("no UDC present")))?;
/* 1 detach gadget */ /* 1 - detach gadget */
info!("🔌 detaching gadget from {ctrl}"); info!("🔌 detaching gadget from {ctrl}");
// a) drop pullups (if the controller offers the switch) // a) drop pull-ups (if the controller offers the switch)
let sc = format!("/sys/class/udc/{ctrl}/soft_connect"); let sc = format!("/sys/class/udc/{ctrl}/soft_connect");
let _ = Self::write_attr(&sc, "0"); // ignore errors not all HW has it let _ = Self::write_attr(&sc, "0"); // ignore errors - not all HW has it
// b) clear the UDC attribute; the kernel may transiently answer EBUSY // b) clear the UDC attribute; the kernel may transiently answer EBUSY
for attempt in 1..=10 { for attempt in 1..=10 {
@ -118,7 +118,7 @@ impl UsbGadget {
.and_then(|io| io.raw_os_error()) .and_then(|io| io.raw_os_error())
== Some(libc::EBUSY) && attempt < 10 == Some(libc::EBUSY) && attempt < 10
} => { } => {
trace!("⏳ UDC busy (attempt {attempt}/10) retrying…"); trace!("⏳ UDC busy (attempt {attempt}/10) - retrying…");
thread::sleep(Duration::from_millis(100)); thread::sleep(Duration::from_millis(100));
} }
Err(err) => return Err(err), Err(err) => return Err(err),
@ -126,14 +126,14 @@ impl UsbGadget {
} }
Self::wait_state(&ctrl, "not attached", 3_000)?; Self::wait_state(&ctrl, "not attached", 3_000)?;
/* 2 reset driver */ /* 2 - reset driver */
Self::rebind_driver(&ctrl)?; Self::rebind_driver(&ctrl)?;
/* 3 wait UDC node to reappear */ /* 3 - wait UDC node to re-appear */
Self::wait_udc_present(&ctrl, 3_000)?; Self::wait_udc_present(&ctrl, 3_000)?;
/* 4 reattach + pullup */ /* 4 - re-attach + pull-up */
info!("🔌 reattaching gadget to {ctrl}"); info!("🔌 re-attaching gadget to {ctrl}");
Self::write_attr(self.udc_file, &ctrl)?; Self::write_attr(self.udc_file, &ctrl)?;
if Path::new(&sc).exists() { if Path::new(&sc).exists() {
// try to set the pull-up; ignore if the kernel rejects it // try to set the pull-up; ignore if the kernel rejects it
@ -156,24 +156,24 @@ impl UsbGadget {
} }
} }
/* 5 wait for host (but tolerate sleep) */ /* 5 - wait for host (but tolerate sleep) */
Self::wait_state(&ctrl, "configured", 6_000) Self::wait_state(&ctrl, "configured", 6_000)
.or_else(|e| { .or_else(|e| {
// If the host is physically absent (sleep / KVM paused) // If the host is physically absent (sleep / KVM paused)
// we allow 'not attached' and continue we can still // we allow 'not attached' and continue - we can still
// accept keyboard/mouse data and the host will enumerate // accept keyboard/mouse data and the host will enumerate
// later without another reset. // later without another reset.
let last = fs::read_to_string(format!("/sys/class/udc/{ctrl}/state")) let last = fs::read_to_string(format!("/sys/class/udc/{ctrl}/state"))
.unwrap_or_default(); .unwrap_or_default();
if last.trim() == "not attached" { if last.trim() == "not attached" {
warn!("⚠️ host did not enumerate within 6s continuing (state = {last:?})"); warn!("⚠️ host did not enumerate within 6s - continuing (state = {last:?})");
Ok(()) Ok(())
} else { } else {
Err(e) Err(e)
} }
})?; })?;
info!("✅ USBgadget cycle complete"); info!("✅ USB-gadget cycle complete");
Ok(()) Ok(())
} }
@ -190,7 +190,7 @@ impl UsbGadget {
match Self::write_attr(format!("{root}/unbind"), ctrl) { match Self::write_attr(format!("{root}/unbind"), ctrl) {
Ok(_) => break, Ok(_) => break,
Err(err) if attempt < 20 && Self::is_still_detaching(&err) => { Err(err) if attempt < 20 && Self::is_still_detaching(&err) => {
trace!("unbind inprogress (#{attempt}) waiting…"); trace!("unbind in-progress (#{attempt}) - waiting…");
thread::sleep(Duration::from_millis(100)); thread::sleep(Duration::from_millis(100));
} }
Err(err) => return Err(err) Err(err) => return Err(err)
@ -205,7 +205,7 @@ impl UsbGadget {
match Self::write_attr(format!("{root}/bind"), ctrl) { match Self::write_attr(format!("{root}/bind"), ctrl) {
Ok(_) => return Ok(()), // success 🎉 Ok(_) => return Ok(()), // success 🎉
Err(err) if attempt < 20 && Self::is_still_detaching(&err) => { Err(err) if attempt < 20 && Self::is_still_detaching(&err) => {
trace!("bind busy (#{attempt}) retrying…"); trace!("bind busy (#{attempt}) - retrying…");
thread::sleep(Duration::from_millis(100)); thread::sleep(Duration::from_millis(100));
} }
Err(err) => return Err(err) Err(err) => return Err(err)

View File

@ -4,7 +4,7 @@ use anyhow::Context;
use gstreamer as gst; use gstreamer as gst;
use gstreamer_app as gst_app; use gstreamer_app as gst_app;
use gst::prelude::*; use gst::prelude::*;
use gst::log; use gst::{log, MessageView};
use lesavka_common::lesavka::VideoPacket; use lesavka_common::lesavka::VideoPacket;
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
use tonic::Status; use tonic::Status;
@ -21,24 +21,31 @@ pub async fn eye_ball(
let eye = EYE_ID[id as usize]; let eye = EYE_ID[id as usize];
gst::init().context("gst init")?; gst::init().context("gst init")?;
// let desc = format!(
// "v4l2src device={dev} io-mode=mmap ! \
// video/x-h264,stream-format=byte-stream,alignment=au,profile=high ! \
// h264parse config-interval=-1 ! \
// appsink name=sink emit-signals=true drop=true sync=false"
// );
let desc = format!( let desc = format!(
"v4l2src device={dev} io-mode=mmap ! \ "v4l2src device={dev} io-mode=mmap ! videorate skip-to-first=true ! \
video/x-h264,stream-format=byte-stream,alignment=au,profile=high ! \ queue2 max-size-buffers=0 max-size-bytes=0 min-threshold-time=10000000 ! \
h264parse config-interval=-1 ! \ video/x-h264,stream-format=byte-stream,alignment=au,profile=high ! \
appsink name=sink emit-signals=true drop=true sync=false" h264parse config-interval=1 ! \
appsink name=sink emit-signals=true drop=false sync=false"
); );
let pipeline = gst::parse::launch(&desc)? let pipeline = gst::parse::launch(&desc)?
.downcast::<gst::Pipeline>() .downcast::<gst::Pipeline>()
.expect("pipeline downcast"); .expect("pipeline down-cast");
let sink = pipeline let sink = pipeline
.by_name("sink") .by_name("sink")
.expect("appsink") .expect("appsink")
.dynamic_cast::<gst_app::AppSink>() .dynamic_cast::<gst_app::AppSink>()
.expect("appsink downcast"); .expect("appsink down-cast");
let (tx, rx) = tokio::sync::mpsc::channel(256); let (tx, rx) = tokio::sync::mpsc::channel(8192);
sink.set_callbacks( sink.set_callbacks(
gst_app::AppSinkCallbacks::builder() gst_app::AppSinkCallbacks::builder()
@ -102,5 +109,14 @@ pub async fn eye_ball(
); );
pipeline.set_state(gst::State::Playing)?; pipeline.set_state(gst::State::Playing)?;
let bus = pipeline.bus().unwrap();
loop {
match bus.timed_pop(gst::ClockTime::NONE) {
Some(msg) if matches!(msg.view(), MessageView::StateChanged(s)
if s.current() == gst::State::Playing) => break,
Some(_) => continue,
None => continue,
}
}
Ok(ReceiverStream::new(rx)) Ok(ReceiverStream::new(rx))
} }