From 20e33f03d57c4ff9445f4a9681291a8c05aadfd9 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Fri, 27 Jun 2025 06:56:08 -0500 Subject: [PATCH] HID reset modified --- common/build.rs | 2 +- common/proto/lesavka.proto | 28 ++-- server/Cargo.toml | 4 + server/src/main.rs | 297 +++++++++++-------------------------- 4 files changed, 102 insertions(+), 229 deletions(-) diff --git a/common/build.rs b/common/build.rs index 467ee6c..e324590 100644 --- a/common/build.rs +++ b/common/build.rs @@ -1,4 +1,4 @@ - +// common/build.rs fn main() { tonic_build::configure() diff --git a/common/proto/lesavka.proto b/common/proto/lesavka.proto index 1a72e2c..69825c5 100644 --- a/common/proto/lesavka.proto +++ b/common/proto/lesavka.proto @@ -1,26 +1,18 @@ syntax = "proto3"; package lesavka; -// smaller, fixed-size payloads -> less allocation and simpler decoding -message KeyboardReport { bytes data = 1; } // exactly 8 bytes -message MouseReport { bytes data = 1; } // exactly 4 bytes +message KeyboardReport { bytes data = 1; } +message MouseReport { bytes data = 1; } -// ------------ video ------------ -message MonitorRequest { - uint32 id = 1; // 0/1 for now - uint32 max_bitrate = 2; // kb/s – client hints, server may ignore -} +message MonitorRequest { uint32 id = 1; uint32 max_bitrate = 2; } +message VideoPacket { uint32 id = 1; uint64 pts = 2; bytes data = 3; } -message VideoPacket { - uint32 id = 1; // monitor id - uint64 pts = 2; // monotonically increasing micro‑seconds - bytes data = 3; // full H.264 access‑unit (length‑prefixed) -} +message ResetUsbRequest {} // empty body +message ResetUsbReply { bool ok = 1; } // true = success service Relay { - rpc StreamKeyboard (stream KeyboardReport) returns (stream KeyboardReport); - rpc StreamMouse (stream MouseReport) returns (stream MouseReport); - - // client requests one monitor, server pushes raw H.264 - rpc CaptureVideo (MonitorRequest) returns (stream VideoPacket); + rpc StreamKeyboard (stream KeyboardReport) returns (stream KeyboardReport); + rpc StreamMouse (stream MouseReport) returns (stream MouseReport); + rpc CaptureVideo (MonitorRequest) returns (stream VideoPacket); + rpc ResetUsb (ResetUsbRequest) returns (ResetUsbReply); } diff --git a/server/Cargo.toml b/server/Cargo.toml index d3b640b..95cd08e 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -22,3 +22,7 @@ gstreamer = { version = "0.23", features = ["v1_22"] } gstreamer-app = { version = "0.23", features = ["v1_22"] } gstreamer-video = "0.23" udev = "0.8" +prost-types = "0.13" + +[build-dependencies] +prost-build = "0.13" diff --git a/server/src/main.rs b/server/src/main.rs index e2b6f9f..add0e86 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,113 +1,72 @@ -//! lesavka-server -// sever/src/main.rs +//! lesavka‑server – **auto‑cycle disabled** +// server/src/main.rs #![forbid(unsafe_code)] -use std::sync::atomic::{AtomicBool, Ordering}; use std::{panic, backtrace::Backtrace, pin::Pin, sync::Arc, time::Duration}; -use futures_util::{Stream, StreamExt}; -use tokio::{fs::{OpenOptions}, io::AsyncWriteExt, sync::Mutex}; -use tokio_stream::{wrappers::ReceiverStream}; -use tonic::{transport::Server, Request, Response, Status}; +use std::sync::atomic::AtomicBool; use anyhow::Context as _; -use tracing::{info, trace, warn, error}; +use futures_util::{Stream, StreamExt}; +use tokio::{ + fs::{OpenOptions}, + io::AsyncWriteExt, + sync::Mutex, +}; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{Request, Response, Status}; +use tonic::transport::Server; +use tracing::{info, warn, error, trace}; use tracing_subscriber::{filter::EnvFilter, fmt, prelude::*}; -use tracing_appender::non_blocking; use tracing_appender::non_blocking::WorkerGuard; +use lesavka_common::lesavka::{ + ResetUsbRequest, ResetUsbReply, + relay_server::{Relay, RelayServer}, + KeyboardReport, MouseReport, MonitorRequest, VideoPacket, +}; + use lesavka_server::{usb_gadget::UsbGadget, video}; -use lesavka_common::lesavka::{ - relay_server::{Relay, RelayServer}, - KeyboardReport, MouseReport, - MonitorRequest, VideoPacket, -}; +/*──────────────── constants ────────────────*/ +/// **false** = never reset automatically. +const AUTO_CYCLE: bool = false; -// ───────────────── helper ───────────────────── +/*──────────────── logging ───────────────────*/ fn init_tracing() -> anyhow::Result { - // 1. create file writer + guard let file = std::fs::OpenOptions::new() - .create(true).write(true).append(true) + .create(true).append(true).write(true) .open("/tmp/lesavka-server.log")?; - let (file_writer, guard) = non_blocking(file); - - // 2. build subscriber once - let env = EnvFilter::try_from_default_env() - .unwrap_or_else(|_| {EnvFilter::new("lesavka_server=info")}); - let console_layer = fmt::layer() - .with_target(true) - .with_thread_ids(true); - let file_layer = fmt::layer() - .with_writer(file_writer) - .with_ansi(false); + let (file_writer, guard) = tracing_appender::non_blocking(file); tracing_subscriber::registry() - .with(env) - .with(console_layer) - .with(file_layer) + .with(EnvFilter::try_from_default_env() + .unwrap_or_else(|_| EnvFilter::new("lesavka_server=info"))) + .with(fmt::layer().with_target(true).with_thread_ids(true)) + .with(fmt::layer().with_writer(file_writer).with_ansi(false)) .init(); - Ok(guard) } +/*──────────────── helpers ───────────────────*/ async fn open_with_retry(path: &str) -> anyhow::Result { - const MAX_ATTEMPTS: usize = 200; // ≈ 10 s (@50 ms) - for attempt in 1..=MAX_ATTEMPTS { + for attempt in 1..=200 { // ≈10 s match OpenOptions::new() - .write(true) - .custom_flags(libc::O_NONBLOCK) - .open(path) - .await + .write(true).custom_flags(libc::O_NONBLOCK).open(path).await { Ok(f) => { info!("✅ {path} opened on attempt #{attempt}"); return Ok(f); } Err(e) if e.raw_os_error() == Some(libc::EBUSY) => { - trace!("⏳ {path} busy, retry #{attempt}"); + trace!("⏳ {path} busy… retry #{attempt}"); tokio::time::sleep(Duration::from_millis(50)).await; } - Err(e) => return Err(e).with_context(|| format!("💥 opening {path}")), + Err(e) => return Err(e).with_context(|| format!("opening {path}")), } } - Err(anyhow::anyhow!("💥 timeout waiting for {path} to become available")) + Err(anyhow::anyhow!("timeout waiting for {path}")) } -fn owners_of(path: &str) -> String { - use std::{fs, os::unix::fs::MetadataExt}; - - let Ok(target_ino) = fs::metadata(path).map(|m| m.ino()) else { return "-".into() }; - - let mut pids = Vec::new(); - if let Ok(entries) = fs::read_dir("/proc") { - for e in entries.flatten() { - let file_name = e.file_name(); - let pid = file_name.to_string_lossy().into_owned(); - if !pid.chars().all(|c| c.is_ascii_digit()) { continue } - - let fd_dir = e.path().join("fd"); - for fd in fs::read_dir(fd_dir).into_iter().flatten().flatten() { - if let Ok(meta) = fd.metadata() { - if meta.ino() == target_ino { - pids.push(pid.to_string()); - break; - } - } - } - } - } - if pids.is_empty() { "-".into() } else { pids.join(",") } -} - -async fn wait_configured(ctrl: &str, limit_ms: u64) -> anyhow::Result<()> { - for _ in 0..=limit_ms/50 { - let s = UsbGadget::state(ctrl)?; - if s.trim() == "configured" { return Ok(()) } - tokio::time::sleep(Duration::from_millis(50)).await; - } - Err(anyhow::anyhow!("host never configured")) -} - -/*─────────────────── tonic service ─────────────────────*/ +/*──────────────── Handler ───────────────────*/ struct Handler { kb: Arc>, ms: Arc>, @@ -116,45 +75,19 @@ struct Handler { } impl Handler { - async fn make(gadget: UsbGadget) -> anyhow::Result { - info!("🛠️ Handler::make - cycling gadget ..."); - gadget.cycle()?; - - let ctrl = UsbGadget::find_controller()?; - let configured = wait_configured(&ctrl, 10_000).await.is_ok(); - if configured { - info!("✅ host enumerated (configured)"); + async fn new(gadget: UsbGadget) -> anyhow::Result { + if AUTO_CYCLE { + info!("🛠️ Initial USB reset…"); + let _ = gadget.cycle(); // ignore failure – may boot without host } else { - warn!("⚠️ host absent – queuing HID traffic"); - } - let state = UsbGadget::wait_state_any(&ctrl, 5_000)?; - match state.as_str() { - "configured" => info!("✅ host enumerated (configured)"), - "not attached" => warn!("⚠️ host absent – HID writes will be queued"), - _ => warn!("⚠️ unexpected UDC state: {state}"), + info!("🛠️ AUTO_CYCLE disabled – no initial reset"); } - tokio::time::sleep(Duration::from_millis(1000)).await; - - info!("🛠️ opening HID endpoints ..."); + info!("🛠️ opening HID endpoints …"); let kb = open_with_retry("/dev/hidg0").await?; let ms = open_with_retry("/dev/hidg1").await?; - info!("✅ HID endpoints ready"); - Ok(Self { - kb: Arc::new(Mutex::new(kb)), - ms: Arc::new(Mutex::new(ms)), - gadget, - did_cycle: AtomicBool::new(true), - }) - } - async fn degraded(gadget: UsbGadget) -> anyhow::Result { - info!("🛠️ Handler::degraded - opening HID endpoints ..."); - let kb = open_with_retry("/dev/hidg0").await?; - let ms = open_with_retry("/dev/hidg1").await?; - - info!("✅ HID endpoints ready"); Ok(Self { kb: Arc::new(Mutex::new(kb)), ms: Arc::new(Mutex::new(ms)), @@ -164,54 +97,32 @@ impl Handler { } } +/*──────────────── gRPC service ─────────────*/ #[tonic::async_trait] impl Relay for Handler { + /* existing streams ─ unchanged, except: no more auto‑reset */ type StreamKeyboardStream = ReceiverStream>; type StreamMouseStream = ReceiverStream>; - type CaptureVideoStream = Pin> + Send + Sync + 'static>>; + type CaptureVideoStream = Pin> + Send + Sync>>; async fn stream_keyboard( &self, req: Request>, ) -> Result, Status> { - // self.gadget.cycle().map_err(|e| Status::internal(e.to_string()))?; - if !self.did_cycle.swap(true, Ordering::SeqCst) { - self.gadget - .cycle() - .map_err(|e| Status::internal(e.to_string()))?; - tokio::time::sleep(Duration::from_millis(500)).await; - } - let (tx, rx) = - tokio::sync::mpsc::channel::>(32); + let (tx, rx) = tokio::sync::mpsc::channel(32); let kb = self.kb.clone(); - let gadget = self.gadget.clone(); - let ctrl = UsbGadget::find_controller().unwrap_or_default(); tokio::spawn(async move { let mut s = req.into_inner(); while let Some(pkt) = s.next().await.transpose()? { - /* try to write once */ - let mut guard = kb.lock().await; - if let Err(e) = guard.write_all(&pkt.data).await { - /* host vanished ? */ - if matches!(e.raw_os_error(), - Some(libc::ESHUTDOWN)|Some(libc::ENODEV)|Some(libc::EPIPE)) { - warn!("host disappeared – recycling gadget"); - gadget.cycle().map_err(|e| Status::internal(e.to_string()))?; - wait_configured(&ctrl, 10_000).await - .map_err(|e| Status::internal(e.to_string()))?; - /* reopen endpoint & swap into mutex */ - *guard = open_with_retry("/dev/hidg0").await - .map_err(|e| Status::internal(e.to_string()))?; - } else { - return Err(Status::internal(e.to_string())); - } + if let Err(e) = kb.lock().await.write_all(&pkt.data).await { + warn!("⌨️ write failed: {e} (dropped)"); } - drop(guard); /* release lock before await */ - tx.send(Ok(pkt)).await.ok(); /* best‑effort echo */ + tx.send(Ok(pkt)).await.ok(); } Ok::<(), Status>(()) }); + Ok(Response::new(ReceiverStream::new(rx))) } @@ -219,37 +130,20 @@ impl Relay for Handler { &self, req: Request>, ) -> Result, Status> { - let (tx, rx) = - tokio::sync::mpsc::channel::>(4096); + let (tx, rx) = tokio::sync::mpsc::channel(4096); let ms = self.ms.clone(); - let gadget = self.gadget.clone(); - let ctrl = UsbGadget::find_controller().unwrap_or_default(); tokio::spawn(async move { let mut s = req.into_inner(); while let Some(pkt) = s.next().await.transpose()? { - /* try to write once */ - let mut guard = ms.lock().await; - if let Err(e) = guard.write_all(&pkt.data).await { - /* host vanished ? */ - if matches!(e.raw_os_error(), - Some(libc::ESHUTDOWN)|Some(libc::ENODEV)|Some(libc::EPIPE)) { - warn!("host disappeared – recycling gadget"); - gadget.cycle().map_err(|e| Status::internal(e.to_string()))?; - wait_configured(&ctrl, 10_000).await - .map_err(|e| Status::internal(e.to_string()))?; - /* reopen endpoint & swap into mutex */ - *guard = open_with_retry("/dev/hidg1").await - .map_err(|e| Status::internal(e.to_string()))?; - } else { - return Err(Status::internal(e.to_string())); - } + if let Err(e) = ms.lock().await.write_all(&pkt.data).await { + warn!("🖱️ write failed: {e} (dropped)"); } - drop(guard); /* release lock before await */ - tx.send(Ok(pkt)).await.ok(); /* best‑effort echo */ + tx.send(Ok(pkt)).await.ok(); } Ok::<(), Status>(()) }); + Ok(Response::new(ReceiverStream::new(rx))) } @@ -257,71 +151,54 @@ impl Relay for Handler { &self, req: Request, ) -> Result, Status> { - let r = req.into_inner(); - - // let devs = loop { - // let list = list_gc311_devices() - // .map_err(|e| Status::internal(format!("enum v4l2: {e}")))?; - // if !list.is_empty() { break list; } - // tokio::time::sleep(Duration::from_secs(1)).await; - // }; - - // let dev = devs - // .get(r.id as usize) - // .ok_or_else(|| Status::invalid_argument(format!("monitor id {} absent", r.id)))? - // .to_owned(); - - let dev = match r.id { + let id = req.into_inner().id; + let dev = match id { 0 => "/dev/lesavka_l_eye", 1 => "/dev/lesavka_r_eye", _ => return Err(Status::invalid_argument("monitor id must be 0 or 1")), - } - .to_string(); - - info!("🎥 streaming {dev} at ≤{} kb/s", r.max_bitrate); - - let s = video::spawn_camera(&dev, r.id, r.max_bitrate) + }; + info!("🎥 streaming {dev}"); + let s = video::spawn_camera(dev, id, 6_000) .await - .map_err(|e| Status::internal(format!("{e:#?}")))?; + .map_err(|e| Status::internal(format!("{e:#}")))?; + Ok(Response::new(Box::pin(s))) + } - Ok(Response::new(Box::pin(s) as _)) + /*────────────── USB‑reset RPC ───────────*/ + async fn reset_usb( + &self, + _req: Request, + ) -> Result, Status> { + info!("🔴 explicit ResetUsb() called"); + match self.gadget.cycle() { + Ok(_) => Ok(Response::new(ResetUsbReply { ok: true })), + Err(e) => { + error!("💥 cycle failed: {e:#}"); + Err(Status::internal(e.to_string())) + } + } } } -/*─────────────────── main ──────────────────────────────*/ -#[tokio::main(flavor = "multi_thread", worker_threads = 4)] +/*──────────────── main ───────────────────────*/ +#[tokio::main(worker_threads = 4)] async fn main() -> anyhow::Result<()> { - /* logging */ - let _log_guard: WorkerGuard = init_tracing()?; + let _guard = init_tracing()?; panic::set_hook(Box::new(|p| { let bt = Backtrace::force_capture(); error!("💥 panic: {p}\n{bt}"); })); - /* auto‑cycle task */ - // tokio::spawn(async { monitor_gc311_disconnect().await.ok(); }); - - let gadget = UsbGadget::new("lesavka"); - let handler = match Handler::make(gadget.clone()).await { - Ok(h) => h, - Err(e) => { - error!("💥 handler degraded (host offline): {e:#}"); - Handler::degraded(gadget.clone()).await? - } - }; + let gadget = UsbGadget::new("lesavka"); + let handler = Handler::new(gadget.clone()).await?; info!("🌐 lesavka‑server listening on 0.0.0.0:50051"); - - if let Err(e) = Server::builder() - .tcp_nodelay(true) - .max_frame_size(Some(256 * 1024)) - .add_service(RelayServer::new(handler)) - .serve(([0, 0, 0, 0], 50051).into()) - .await - { - error!("💥 gRPC server exited: {e:#}"); - std::process::exit(1); - } + Server::builder() + .tcp_nodelay(true) + .max_frame_size(Some(256*1024)) + .add_service(RelayServer::new(handler)) + .serve(([0,0,0,0], 50051).into()) + .await?; Ok(()) }