//! lesavka-server // sever/src/main.rs #![forbid(unsafe_code)] use std::sync::atomic::{AtomicBool, Ordering}; use std::{panic, backtrace::Backtrace, pin::Pin, sync::Arc, time::Duration}; use futures_util::{Stream, StreamExt}; use tokio::{fs::{OpenOptions}, io::AsyncWriteExt, sync::Mutex}; use tokio_stream::{wrappers::ReceiverStream}; use tonic::{transport::Server, Request, Response, Status}; use anyhow::Context as _; use tracing::{info, trace, warn, error}; use tracing_subscriber::{filter::EnvFilter, fmt, prelude::*}; use tracing_appender::non_blocking; use tracing_appender::non_blocking::WorkerGuard; use udev::{MonitorBuilder}; use lesavka_server::{usb_gadget::UsbGadget, video}; use lesavka_common::lesavka::{ relay_server::{Relay, RelayServer}, KeyboardReport, MouseReport, MonitorRequest, VideoPacket, }; // ───────────────── helper ───────────────────── fn init_tracing() -> anyhow::Result { // 1. create file writer + guard let file = std::fs::OpenOptions::new() .create(true).write(true).append(true) .open("/tmp/lesavka-server.log")?; let (file_writer, guard) = non_blocking(file); // 2. build subscriber once let env = EnvFilter::try_from_default_env() .unwrap_or_else(|_| EnvFilter::new("lesavka_server=info")); let console_layer = fmt::layer() .with_target(true) .with_thread_ids(true); let file_layer = fmt::layer() .with_writer(file_writer) .with_ansi(false); tracing_subscriber::registry() .with(env) .with(console_layer) .with(file_layer) .init(); Ok(guard) } async fn open_with_retry(path: &str) -> anyhow::Result { const MAX_ATTEMPTS: usize = 200; // ≈ 10 s (@50 ms) for attempt in 1..=MAX_ATTEMPTS { match OpenOptions::new() .write(true) .custom_flags(libc::O_NONBLOCK) .open(path) .await { Ok(f) => { info!("✅ {path} opened on attempt #{attempt}"); return Ok(f); } Err(e) if e.raw_os_error() == Some(libc::EBUSY) => { trace!("⏳ {path} busy, retry #{attempt}"); tokio::time::sleep(Duration::from_millis(50)).await; } Err(e) => return Err(e).with_context(|| format!("💥 opening {path}")), } } Err(anyhow::anyhow!("💥 timeout waiting for {path} to become available")) } fn owners_of(path: &str) -> String { use std::{fs, os::unix::fs::MetadataExt}; let Ok(target_ino) = fs::metadata(path).map(|m| m.ino()) else { return "-".into() }; let mut pids = Vec::new(); if let Ok(entries) = fs::read_dir("/proc") { for e in entries.flatten() { let file_name = e.file_name(); let pid = file_name.to_string_lossy().into_owned(); if !pid.chars().all(|c| c.is_ascii_digit()) { continue } let fd_dir = e.path().join("fd"); for fd in fs::read_dir(fd_dir).into_iter().flatten().flatten() { if let Ok(meta) = fd.metadata() { if meta.ino() == target_ino { pids.push(pid.to_string()); break; } } } } } if pids.is_empty() { "-".into() } else { pids.join(",") } } /*─────────────────── tonic service ─────────────────────*/ struct Handler { kb: Arc>, ms: Arc>, gadget: UsbGadget, did_cycle: AtomicBool, } impl Handler { async fn make(gadget: UsbGadget) -> anyhow::Result { info!("🛠️ Handler::make - cycling gadget ..."); gadget.cycle()?; tokio::time::sleep(Duration::from_millis(1000)).await; info!("🛠️ opening HID endpoints ..."); let kb = open_with_retry("/dev/hidg0").await?; let ms = open_with_retry("/dev/hidg1").await?; info!("✅ HID endpoints ready"); Ok(Self { kb: Arc::new(Mutex::new(kb)), ms: Arc::new(Mutex::new(ms)), gadget, did_cycle: AtomicBool::new(true), }) } } #[tonic::async_trait] impl Relay for Handler { type StreamKeyboardStream = ReceiverStream>; type StreamMouseStream = ReceiverStream>; type CaptureVideoStream = Pin> + Send + Sync + 'static>>; async fn stream_keyboard( &self, req: Request>, ) -> Result, Status> { // self.gadget.cycle().map_err(|e| Status::internal(e.to_string()))?; if !self.did_cycle.swap(true, Ordering::SeqCst) { self.gadget .cycle() .map_err(|e| Status::internal(e.to_string()))?; tokio::time::sleep(Duration::from_millis(500)).await; } let (tx, rx) = tokio::sync::mpsc::channel(32); let kb = self.kb.clone(); tokio::spawn(async move { let mut s = req.into_inner(); while let Some(pkt) = s.next().await.transpose()? { // kb.lock().await.write_all(&pkt.data).await?; loop { match kb.lock().await.write_all(&pkt.data).await { Ok(()) => { trace!("⌨️ wrote {}", pkt.data.iter() .map(|b| format!("{b:02X}")).collect::>().join(" ")); break; }, Err(e) if matches!(e.raw_os_error(), Some(libc::EBUSY) | Some(libc::EAGAIN)) => { tokio::time::sleep(Duration::from_millis(10)).await; continue; } Err(e) => return Err(Status::internal(e.to_string())), } } tx.send(Ok(pkt)).await;//.ok(); // best-effort echo } Ok::<(), Status>(()) }); Ok(Response::new(ReceiverStream::new(rx))) } async fn stream_mouse( &self, req: Request>, ) -> Result, Status> { let (tx, rx) = tokio::sync::mpsc::channel(4096); // higher burst let ms = self.ms.clone(); tokio::spawn(async move { let mut s = req.into_inner(); let mut boot_mode = true; while let Some(pkt) = s.next().await.transpose()? { loop { match ms.lock().await.write_all(&pkt.data).await { Ok(()) => { trace!("🖱️ wrote {}", pkt.data.iter() .map(|b| format!("{b:02X}")).collect::>().join(" ")); break; } Err(e) if matches!(e.raw_os_error(), Some(libc::EBUSY) | Some(libc::EAGAIN)) => { tokio::time::sleep(Duration::from_millis(10)).await; continue; } Err(e) => return Err(Status::internal(e.to_string())), } } let _ = tx.send(Ok(pkt)).await; } Ok::<(), Status>(()) }); Ok(Response::new(ReceiverStream::new(rx))) } async fn capture_video( &self, req: Request, ) -> Result, Status> { let r = req.into_inner(); // let devs = loop { // let list = list_gc311_devices() // .map_err(|e| Status::internal(format!("enum v4l2: {e}")))?; // if !list.is_empty() { break list; } // tokio::time::sleep(Duration::from_secs(1)).await; // }; // let dev = devs // .get(r.id as usize) // .ok_or_else(|| Status::invalid_argument(format!("monitor id {} absent", r.id)))? // .to_owned(); let dev = match r.id { 0 => "/dev/lesavka_l_eye", 1 => "/dev/lesavka_r_eye", _ => return Err(Status::invalid_argument("monitor id must be 0 or 1")), } .to_string(); info!("🎥 streaming {dev} at ≤{} kb/s", r.max_bitrate); let s = video::spawn_camera(&dev, r.id, r.max_bitrate) .await .map_err(|e| Status::internal(format!("{e:#?}")))?; Ok(Response::new(Box::pin(s) as _)) } } /*─────────────────── main ──────────────────────────────*/ #[tokio::main(flavor = "multi_thread", worker_threads = 4)] async fn main() -> anyhow::Result<()> { /* logging */ let _log_guard: WorkerGuard = init_tracing()?; panic::set_hook(Box::new(|p| { let bt = Backtrace::force_capture(); error!("💥 panic: {p}\n{bt}"); })); /* auto‑cycle task */ // tokio::spawn(async { monitor_gc311_disconnect().await.ok(); }); let gadget = UsbGadget::new("lesavka"); let handler = match Handler::make(gadget.clone()).await { Ok(h) => h, Err(e) => { error!("💥 failed to create Handler: {e:#}"); std::process::exit(1); } }; // tokio::spawn({ // let gadget = gadget.clone(); // async move { // loop { // tokio::time::sleep(Duration::from_secs(4)).await; // if LAST_HID_WRITE.elapsed().as_secs() > 3 { // warn!("no HID traffic in 3 s - cycling UDC"); // let _ = gadget.cycle(); // } // } // } // }); info!("🌐 lesavka‑server listening on 0.0.0.0:50051"); if let Err(e) = Server::builder() .add_service(RelayServer::new(handler)) .serve(([0, 0, 0, 0], 50051).into()) .await { error!("💥 gRPC server exited: {e:#}"); std::process::exit(1); } Ok(()) }