//! lesavka-server — receive HidReport and write to /dev/hidg0 // sever/src/main.rs #![forbid(unsafe_code)] use futures_util::{Stream, StreamExt}; use std::{pin::Pin, sync::Arc, time::Duration}; use tokio::{fs::{OpenOptions}, io::AsyncWriteExt, sync::Mutex}; use tokio_stream::{wrappers::ReceiverStream}; use tonic::{transport::Server, Request, Response, Status}; use tracing::{info, trace}; use tracing_subscriber::{fmt, EnvFilter}; use udev::{MonitorBuilder}; use usb_gadget::UsbGadgetManager; use lesavka_server::{video, usb_reset}; use lesavka_common::lesavka::{ relay_server::{Relay, RelayServer}, KeyboardReport, MouseReport, MonitorRequest, VideoPacket, }; /*─────────────────── GC311 discovery ───────────────────*/ fn list_gc311_devices() -> anyhow::Result> { let mut v = Vec::new(); for entry in std::fs::read_dir("/sys/class/video4linux")? { let path = entry?.path(); let name = std::fs::read_to_string(path.join("name"))?; if name.to_lowercase().contains("gc311") { v.push( path.file_name() .unwrap() .to_string_lossy() .replace("video", "/dev/video"), ); } } v.sort(); Ok(v) } // /// background task: whenever GC311 disappears, cycle USB port // async fn monitor_gc311_disconnect() -> anyhow::Result<()> { // let mut mon = MonitorBuilder::new()? // .match_subsystem("usb")? // // .match_tag("PRODUCT", "7ca/3311/*")? // vendor: 0x07ca, device 0x3311 // .listen()?; // // Blocking I/O -> move into a dedicated thread // tokio::task::spawn_blocking(move || { // for ev in mon { // `Socket` implements `Iterator` // if ev.event_type() == udev::EventType::Remove { // if let (Some(prod), Some(bus), Some(dev)) = // (ev.attribute_value("PRODUCT"), // ev.attribute_value("busnum"), // ev.attribute_value("devnum")) // { // // 0x07ca / 0x3311 == AVerMedia GC311 // if prod.to_str().map_or(false, |p| p.starts_with("7ca/3311/")) { // usb_reset::cycle_port(bus.to_str().unwrap(), // dev.to_str().unwrap()); // } // } // } // } // }); // // if ev.event_type() == udev::EventType::Remove { // // if let (Some(bus), Some(dev)) = (ev.attribute_value("busnum"), ev.attribute_value("devnum")) { // // usb_reset::cycle_port(bus.to_str().unwrap(), dev.to_str().unwrap()); // // } // // } // // } // Ok(()) // } /*─────────────────── tonic service ─────────────────────*/ struct Handler { kb: Arc>, ms: Arc>, gadget: UsbGadgetManager, } impl Handler { fn make(gadget: UsbGadget) -> anyhow::Result { let kb = OpenOptions::new().write(true).open("/dev/hidg0").await?; let ms = OpenOptions::new().write(true) .custom_flags(libc::O_NONBLOCK) .open("/dev/hidg1").await?; Ok(Self { kb: Arc::new(Mutex::new(kb)), ms: Arc::new(Mutex::new(ms)), gadget }) } } #[tonic::async_trait] impl Relay for Handler { type StreamKeyboardStream = ReceiverStream>; type StreamMouseStream = ReceiverStream>; type CaptureVideoStream = Pin> + Send + Sync + 'static>>; async fn stream_keyboard( &self, req: Request>, ) -> Result, Status> { self.gadget.cycle().map_err(|e| Status::internal(e.to_string()))?; let (tx, rx) = tokio::sync::mpsc::channel(32); let kb = self.kb.clone(); tokio::spawn(async move { let mut s = req.into_inner(); while let Some(pkt) = s.next().await.transpose()? { kb.lock().await.write_all(&pkt.data).await?; tx.send(Ok(pkt)).await;//.ok(); // best-effort echo } Ok::<(), Status>(()) }); Ok(Response::new(ReceiverStream::new(rx))) } async fn stream_mouse( &self, req: Request>, ) -> Result, Status> { let (tx, rx) = tokio::sync::mpsc::channel(4096); // higher burst let ms = self.ms.clone(); tokio::spawn(async move { let mut s = req.into_inner(); let mut boot_mode = true; while let Some(pkt) = s.next().await.transpose()? { loop { match ms.lock().await.write_all(&pkt.data).await { Ok(()) => { trace!("🖱️ wrote {}", pkt.data.iter() .map(|b| format!("{b:02X}")).collect::>().join(" ")); break; } Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { tokio::time::sleep(Duration::from_micros(500)).await; } Err(e) => return Err(Status::internal(format!("hidg1: {e}"))), } } let _ = tx.send(Ok(pkt)).await; } Ok::<(), Status>(()) }); Ok(Response::new(ReceiverStream::new(rx))) } async fn capture_video( &self, req: Request, ) -> Result, Status> { let r = req.into_inner(); let devs = loop { let list = list_gc311_devices() .map_err(|e| Status::internal(format!("enum v4l2: {e}")))?; if !list.is_empty() { break list; } tokio::time::sleep(Duration::from_secs(1)).await; }; let dev = devs .get(r.id as usize) .ok_or_else(|| Status::invalid_argument(format!("monitor id {} absent", r.id)))? .to_owned(); info!("🎥 streaming {dev} at ≤{} kb/s", r.max_bitrate); let s = video::spawn_camera(&dev, r.id, r.max_bitrate) .await .map_err(|e| Status::internal(format!("{e:#?}")))?; Ok(Response::new(Box::pin(s) as _)) } } /*─────────────────── main ──────────────────────────────*/ #[tokio::main(flavor = "multi_thread", worker_threads = 4)] async fn main() -> anyhow::Result<()> { /* logging */ fmt().with_env_filter( EnvFilter::try_from_default_env() .unwrap_or_else(|_| EnvFilter::new("lesavka_server=info")), ) .init(); /* auto‑cycle task */ // tokio::spawn(async { monitor_gc311_disconnect().await.ok(); }); let gadget = UsbGadgetManager::new("lesavka"); gadget.cycle().ok(); let handler = Handler::make(gadget.clone())?; tokio::spawn({ let gadget = gadget.clone(); async move { loop { tokio::time::sleep(Duration::from_secs(4)).await; if LAST_HID_WRITE.elapsed().as_secs() > 3 { warn!("no HID traffic in 3 s – cycling UDC"); let _ = gadget.cycle(); } } } }); println!("🌐 lesavka-server listening on 0.0.0.0:50051"); Server::builder() .add_service(RelayServer::new(handler)) .serve(([0, 0, 0, 0], 50051).into()) .await?; Ok(()) }