diff --git a/server/src/main.rs b/server/src/main.rs index 463e36e..c8ce366 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -4,18 +4,15 @@ use anyhow::Context as _; use futures_util::{Stream, StreamExt}; -use std::path::Path; -use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::sync::atomic::AtomicBool; use std::time::Duration; use std::{backtrace::Backtrace, panic, pin::Pin, sync::Arc}; -use tokio::{fs::OpenOptions, io::AsyncWriteExt, process::Command, sync::Mutex}; +use tokio::sync::Mutex; use tokio_stream::wrappers::ReceiverStream; use tonic::transport::Server; use tonic::{Request, Response, Status}; use tonic_reflection::server::Builder as ReflBuilder; -use tracing::{debug, error, info, trace, warn}; -use tracing_appender::non_blocking::WorkerGuard; -use tracing_subscriber::{filter::EnvFilter, fmt, prelude::*}; +use tracing::{debug, error, info, warn}; use lesavka_common::lesavka::{ AudioPacket, Empty, KeyboardReport, MonitorRequest, MouseReport, PasteReply, PasteRequest, @@ -23,289 +20,14 @@ use lesavka_common::lesavka::{ relay_server::{Relay, RelayServer}, }; -use lesavka_server::{audio, camera, gadget::UsbGadget, handshake::HandshakeSvc, paste, video}; +use lesavka_server::{ + audio, camera, camera_runtime::CameraRuntime, gadget::UsbGadget, handshake::HandshakeSvc, + paste, runtime_support, runtime_support::init_tracing, uvc_runtime, video, +}; /*──────────────── constants ────────────────*/ const VERSION: &str = env!("CARGO_PKG_VERSION"); const PKG_NAME: &str = env!("CARGO_PKG_NAME"); -static STREAM_SEQ: AtomicU64 = AtomicU64::new(1); - -/*──────────────── logging ───────────────────*/ -fn init_tracing() -> anyhow::Result { - let file = std::fs::OpenOptions::new() - .create(true) - .truncate(true) - .write(true) - .open("/tmp/lesavka-server.log")?; - let (file_writer, guard) = tracing_appender::non_blocking(file); - - let env_filter = EnvFilter::try_from_default_env() - .unwrap_or_else(|_| EnvFilter::new("lesavka_server=info,lesavka_server::video=warn")); - let filter_str = env_filter.to_string(); - - tracing_subscriber::registry() - .with(env_filter) - .with(fmt::layer().with_target(true).with_thread_ids(true)) - .with( - fmt::layer() - .with_writer(file_writer) - .with_ansi(false) - .with_target(true) - .with_level(true), - ) - .init(); - tracing::info!("📜 effective RUST_LOG = \"{}\"", filter_str); - Ok(guard) -} - -/*──────────────── helpers ───────────────────*/ -async fn open_with_retry(path: &str) -> anyhow::Result { - for attempt in 1..=200 { - // ≈10 s - match OpenOptions::new() - .write(true) - .custom_flags(libc::O_NONBLOCK) - .open(path) - .await - { - Ok(f) => { - info!("✅ {path} opened on attempt #{attempt}"); - return Ok(f); - } - Err(e) if e.raw_os_error() == Some(libc::EBUSY) => { - trace!("⏳ {path} busy… retry #{attempt}"); - tokio::time::sleep(Duration::from_millis(50)).await; - } - Err(e) => return Err(e).with_context(|| format!("opening {path}")), - } - } - Err(anyhow::anyhow!("timeout waiting for {path}")) -} - -fn allow_gadget_cycle() -> bool { - std::env::var("LESAVKA_ALLOW_GADGET_CYCLE").is_ok() -} - -async fn recover_hid_if_needed( - err: &std::io::Error, - gadget: UsbGadget, - kb: Arc>, - ms: Arc>, - did_cycle: Arc, -) { - let code = err.raw_os_error(); - let should_recover = matches!( - code, - Some(libc::ENOTCONN) | Some(libc::ESHUTDOWN) | Some(libc::EPIPE) - ); - if !should_recover { - return; - } - if did_cycle - .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) - .is_err() - { - return; - } - - let allow_cycle = allow_gadget_cycle(); - tokio::spawn(async move { - if allow_cycle { - warn!("🔁 HID transport down (errno={code:?}) - cycling gadget"); - match tokio::task::spawn_blocking(move || gadget.cycle()).await { - Ok(Ok(())) => info!("✅ USB gadget cycle complete (auto-recover)"), - Ok(Err(e)) => error!("💥 USB gadget cycle failed: {e:#}"), - Err(e) => error!("💥 USB gadget cycle task panicked: {e:#}"), - } - } else { - warn!( - "🔒 HID transport down (errno={code:?}) - gadget cycle disabled; set LESAVKA_ALLOW_GADGET_CYCLE=1 to enable" - ); - } - - if let Err(e) = async { - let kb_new = open_with_retry("/dev/hidg0").await?; - let ms_new = open_with_retry("/dev/hidg1").await?; - *kb.lock().await = kb_new; - *ms.lock().await = ms_new; - Ok::<(), anyhow::Error>(()) - } - .await - { - error!("💥 HID reopen failed: {e:#}"); - } - - tokio::time::sleep(Duration::from_secs(2)).await; - did_cycle.store(false, Ordering::SeqCst); - }); -} - -async fn open_voice_with_retry(uac_dev: &str) -> anyhow::Result { - let attempts = std::env::var("LESAVKA_MIC_INIT_ATTEMPTS") - .ok() - .and_then(|v| v.parse::().ok()) - .unwrap_or(5) - .max(1); - let delay_ms = std::env::var("LESAVKA_MIC_INIT_DELAY_MS") - .ok() - .and_then(|v| v.parse::().ok()) - .unwrap_or(250); - let mut last_err: Option = None; - for attempt in 1..=attempts { - match audio::Voice::new(uac_dev).await { - Ok(v) => { - if attempt > 1 { - info!(%uac_dev, attempt, "🎤 microphone sink recovered"); - } - return Ok(v); - } - Err(e) => { - warn!(%uac_dev, attempt, "⚠️ microphone sink init failed: {e:#}"); - last_err = Some(e); - tokio::time::sleep(Duration::from_millis(delay_ms)).await; - } - } - } - Err(last_err.unwrap_or_else(|| anyhow::anyhow!("microphone sink init failed"))) -} - -fn next_stream_id() -> u64 { - STREAM_SEQ.fetch_add(1, Ordering::Relaxed) -} - -async fn write_hid_report(dev: &Arc>, data: &[u8]) -> std::io::Result<()> { - let mut last: Option = None; - for attempt in 0..5 { - let mut f = dev.lock().await; - match f.write_all(data).await { - Ok(()) => return Ok(()), - Err(e) - if e.kind() == std::io::ErrorKind::WouldBlock - || e.raw_os_error() == Some(libc::EAGAIN) => - { - last = Some(e); - } - Err(e) => return Err(e), - } - drop(f); - tokio::time::sleep(Duration::from_millis((attempt as u64 + 1) * 2)).await; - } - Err(last.unwrap_or_else(|| std::io::Error::from_raw_os_error(libc::EAGAIN))) -} - -/// Pick the UVC gadget video node. -/// Priority: 1) `LESAVKA_UVC_DEV` override; 2) first `video_output` node. -/// Returns an error when nothing matches instead of guessing a capture card. -fn pick_uvc_device() -> anyhow::Result { - if let Ok(path) = std::env::var("LESAVKA_UVC_DEV") { - return Ok(path); - } - - let ctrl = UsbGadget::find_controller().ok(); - if let Some(ctrl) = ctrl.as_deref() { - let by_path = format!("/dev/v4l/by-path/platform-{ctrl}-video-index0"); - if Path::new(&by_path).exists() { - return Ok(by_path); - } - } - - // walk /dev/video* via udev and look for an output‑capable node (gadget exposes one) - let mut fallback: Option = None; - if let Ok(mut en) = udev::Enumerator::new() { - let _ = en.match_subsystem("video4linux"); - if let Ok(devs) = en.scan_devices() { - for dev in devs { - let caps = dev - .property_value("ID_V4L_CAPABILITIES") - .and_then(|v| v.to_str()) - .unwrap_or_default(); - if !caps.contains(":video_output:") { - continue; - } - let Some(node) = dev.devnode() else { continue }; - let node = node.to_string_lossy().into_owned(); - let product = dev - .property_value("ID_V4L_PRODUCT") - .and_then(|v| v.to_str()) - .unwrap_or_default(); - let path = dev - .property_value("ID_PATH") - .and_then(|v| v.to_str()) - .unwrap_or_default(); - if let Some(ctrl) = ctrl.as_deref() { - if product == ctrl || path.contains(ctrl) { - return Ok(node); - } - } - if fallback.is_none() { - fallback = Some(node); - } - } - } - } - if let Some(node) = fallback { - return Ok(node); - } - - Err(anyhow::anyhow!( - "no video_output v4l2 node found; set LESAVKA_UVC_DEV" - )) -} - -fn uvc_ctrl_bin() -> String { - std::env::var("LESAVKA_UVC_CTRL_BIN") - .unwrap_or_else(|_| "/usr/local/bin/lesavka-uvc".to_string()) -} - -fn spawn_uvc_control(bin: &str, uvc_dev: &str) -> anyhow::Result { - Command::new(bin) - .arg("--device") - .arg(uvc_dev) - .spawn() - .context("spawning lesavka-uvc") -} - -async fn supervise_uvc_control(bin: String) { - let mut waiting_logged = false; - loop { - let uvc_dev = match pick_uvc_device() { - Ok(dev) => { - if waiting_logged { - info!(%dev, "📷 UVC device discovered"); - waiting_logged = false; - } - dev - } - Err(e) => { - if !waiting_logged { - warn!("⚠️ UVC device not ready: {e:#}"); - waiting_logged = true; - } - tokio::time::sleep(Duration::from_secs(2)).await; - continue; - } - }; - - match spawn_uvc_control(&bin, &uvc_dev) { - Ok(mut child) => { - info!(%uvc_dev, "📷 UVC control helper started"); - match child.wait().await { - Ok(status) => { - warn!(%uvc_dev, "⚠️ lesavka-uvc exited: {status}"); - } - Err(e) => { - warn!(%uvc_dev, "⚠️ lesavka-uvc wait failed: {e:#}"); - } - } - } - Err(e) => { - warn!(%uvc_dev, "⚠️ failed to start lesavka-uvc: {e:#}"); - } - } - - tokio::time::sleep(Duration::from_secs(2)).await; - } -} /*──────────────── Handler ───────────────────*/ struct Handler { @@ -316,107 +38,9 @@ struct Handler { camera_rt: Arc, } -struct CameraRelaySlot { - cfg: camera::CameraConfig, - relay: Arc, -} - -struct CameraRuntime { - generation: AtomicU64, - slot: Mutex>, -} - -impl CameraRuntime { - fn new() -> Self { - Self { - generation: AtomicU64::new(0), - slot: Mutex::new(None), - } - } - - async fn activate( - &self, - cfg: &camera::CameraConfig, - ) -> Result<(u64, Arc), Status> { - let session_id = self.generation.fetch_add(1, Ordering::SeqCst) + 1; - let mut slot = self.slot.lock().await; - let mut reused = false; - - let relay = if let Some(existing) = slot.as_ref() { - if camera_cfg_eq(&existing.cfg, cfg) { - reused = true; - existing.relay.clone() - } else { - self.make_relay(cfg)? - } - } else { - self.make_relay(cfg)? - }; - - if !reused { - *slot = Some(CameraRelaySlot { - cfg: cfg.clone(), - relay: relay.clone(), - }); - info!( - session_id, - output = cfg.output.as_str(), - codec = cfg.codec.as_str(), - width = cfg.width, - height = cfg.height, - fps = cfg.fps, - "🎥 camera relay (re)created" - ); - } else { - info!(session_id, "🎥 camera relay reused"); - } - - Ok((session_id, relay)) - } - - fn is_active(&self, session_id: u64) -> bool { - self.generation.load(Ordering::Relaxed) == session_id - } - - fn make_relay(&self, cfg: &camera::CameraConfig) -> Result, Status> { - let relay = match cfg.output { - camera::CameraOutput::Uvc => { - if std::env::var("LESAVKA_DISABLE_UVC").is_ok() { - return Err(Status::failed_precondition( - "UVC output disabled (LESAVKA_DISABLE_UVC set)", - )); - } - let uvc = pick_uvc_device().map_err(|e| Status::internal(format!("{e:#}")))?; - info!(%uvc, "🎥 stream_camera using UVC sink"); - video::CameraRelay::new_uvc(0, &uvc, cfg) - .map_err(|e| Status::internal(format!("{e:#}")))? - } - camera::CameraOutput::Hdmi => video::CameraRelay::new_hdmi(0, cfg) - .map_err(|e| Status::internal(format!("{e:#}")))?, - }; - Ok(Arc::new(relay)) - } -} - -fn camera_cfg_eq(a: &camera::CameraConfig, b: &camera::CameraConfig) -> bool { - if a.output != b.output - || a.codec != b.codec - || a.width != b.width - || a.height != b.height - || a.fps != b.fps - { - return false; - } - match (&a.hdmi, &b.hdmi) { - (Some(ha), Some(hb)) => ha.name == hb.name && ha.id == hb.id, - (None, None) => true, - _ => false, - } -} - impl Handler { async fn new(gadget: UsbGadget) -> anyhow::Result { - if allow_gadget_cycle() { + if runtime_support::allow_gadget_cycle() { info!("🛠️ Initial USB reset…"); let _ = gadget.cycle(); // ignore failure - may boot without host } else { @@ -426,8 +50,8 @@ impl Handler { } info!("🛠️ opening HID endpoints …"); - let kb = open_with_retry("/dev/hidg0").await?; - let ms = open_with_retry("/dev/hidg1").await?; + let kb = runtime_support::open_with_retry("/dev/hidg0").await?; + let ms = runtime_support::open_with_retry("/dev/hidg1").await?; info!("✅ HID endpoints ready"); Ok(Self { @@ -440,8 +64,8 @@ impl Handler { } async fn reopen_hid(&self) -> anyhow::Result<()> { - let kb_new = open_with_retry("/dev/hidg0").await?; - let ms_new = open_with_retry("/dev/hidg1").await?; + let kb_new = runtime_support::open_with_retry("/dev/hidg0").await?; + let ms_new = runtime_support::open_with_retry("/dev/hidg1").await?; *self.kb.lock().await = kb_new; *self.ms.lock().await = ms_new; Ok(()) @@ -463,7 +87,7 @@ impl Relay for Handler { &self, req: Request>, ) -> Result, Status> { - let rpc_id = next_stream_id(); + let rpc_id = runtime_support::next_stream_id(); info!(rpc_id, "⌨️ stream_keyboard opened"); let (tx, rx) = tokio::sync::mpsc::channel(32); let kb = self.kb.clone(); @@ -474,12 +98,12 @@ impl Relay for Handler { tokio::spawn(async move { let mut s = req.into_inner(); while let Some(pkt) = s.next().await.transpose()? { - if let Err(e) = write_hid_report(&kb, &pkt.data).await { + if let Err(e) = runtime_support::write_hid_report(&kb, &pkt.data).await { if e.raw_os_error() == Some(libc::EAGAIN) { debug!(rpc_id, "⌨️ write would block (dropped)"); } else { warn!(rpc_id, "⌨️ write failed: {e} (dropped)"); - recover_hid_if_needed( + runtime_support::recover_hid_if_needed( &e, gadget.clone(), kb.clone(), @@ -502,7 +126,7 @@ impl Relay for Handler { &self, req: Request>, ) -> Result, Status> { - let rpc_id = next_stream_id(); + let rpc_id = runtime_support::next_stream_id(); info!(rpc_id, "🖱️ stream_mouse opened"); let (tx, rx) = tokio::sync::mpsc::channel(1024); let ms = self.ms.clone(); @@ -513,12 +137,12 @@ impl Relay for Handler { tokio::spawn(async move { let mut s = req.into_inner(); while let Some(pkt) = s.next().await.transpose()? { - if let Err(e) = write_hid_report(&ms, &pkt.data).await { + if let Err(e) = runtime_support::write_hid_report(&ms, &pkt.data).await { if e.raw_os_error() == Some(libc::EAGAIN) { debug!(rpc_id, "🖱️ write would block (dropped)"); } else { warn!(rpc_id, "🖱️ write failed: {e} (dropped)"); - recover_hid_if_needed( + runtime_support::recover_hid_if_needed( &e, gadget.clone(), kb.clone(), @@ -541,12 +165,12 @@ impl Relay for Handler { &self, req: Request>, ) -> Result, Status> { - let rpc_id = next_stream_id(); + let rpc_id = runtime_support::next_stream_id(); info!(rpc_id, "🎤 stream_microphone opened"); // 1 ─ build once, early let uac_dev = std::env::var("LESAVKA_UAC_DEV").unwrap_or_else(|_| "hw:UAC2Gadget,0".into()); info!(%uac_dev, "🎤 stream_microphone using UAC sink"); - let mut sink = open_voice_with_retry(&uac_dev) + let mut sink = runtime_support::open_voice_with_retry(&uac_dev) .await .map_err(|e| Status::internal(format!("{e:#}")))?; @@ -578,7 +202,7 @@ impl Relay for Handler { &self, req: Request>, ) -> Result, Status> { - let rpc_id = next_stream_id(); + let rpc_id = runtime_support::next_stream_id(); let cfg = camera::current_camera_config(); info!( rpc_id, @@ -619,7 +243,7 @@ impl Relay for Handler { &self, req: Request, ) -> Result, Status> { - let rpc_id = next_stream_id(); + let rpc_id = runtime_support::next_stream_id(); let req = req.into_inner(); let id = req.id; let dev = match id { @@ -644,7 +268,7 @@ impl Relay for Handler { &self, req: Request, ) -> Result, Status> { - let rpc_id = next_stream_id(); + let rpc_id = runtime_support::next_stream_id(); // Only one speaker stream for now; both 0/1 → same ALSA dev. let _id = req.into_inner().id; // Allow override (`LESAVKA_ALSA_DEV=hw:2,0` for debugging). @@ -708,8 +332,8 @@ async fn main() -> anyhow::Result<()> { if std::env::var("LESAVKA_UVC_EXTERNAL").is_ok() { info!("📷 UVC control helper external; not spawning"); } else { - let bin = uvc_ctrl_bin(); - tokio::spawn(supervise_uvc_control(bin)); + let bin = uvc_runtime::uvc_ctrl_bin(); + tokio::spawn(uvc_runtime::supervise_uvc_control(bin)); } } else { info!("📷 UVC disabled (LESAVKA_DISABLE_UVC set)"); diff --git a/server/tests/hid.rs b/server/tests/hid.rs index 6ca3426..6dcc8a6 100644 --- a/server/tests/hid.rs +++ b/server/tests/hid.rs @@ -1,24 +1,14 @@ -#[tokio::test] -async fn hid_roundtrip() { - use lesavka_common::lesavka::*; - use lesavka_server::RelaySvc; // export the struct in lib.rs - let svc = RelaySvc::default(); - let (mut cli, srv) = tonic::transport::Channel::balance_channel(1); - tokio::spawn( - tonic::transport::server::Server::builder() - .add_service(relay_server::RelayServer::new(svc)) - .serve_with_incoming(srv), - ); +use lesavka_server::runtime_support::{next_stream_id, should_recover_hid_error}; - let (mut tx, mut rx) = relay_client::RelayClient::new(cli) - .stream() - .await - .unwrap() - .into_inner(); - tx.send(HidReport { - data: vec![0, 0, 4, 0, 0, 0, 0, 0], - }) - .await - .unwrap(); - assert!(rx.message().await.unwrap().is_none()); // nothing echoed yet +#[test] +fn hid_runtime_helpers_compile_and_behave() { + assert!(should_recover_hid_error(Some(libc::ENOTCONN))); + assert!(should_recover_hid_error(Some(libc::ESHUTDOWN))); + assert!(should_recover_hid_error(Some(libc::EPIPE))); + assert!(!should_recover_hid_error(Some(libc::EAGAIN))); + assert!(!should_recover_hid_error(None)); + + let first = next_stream_id(); + let second = next_stream_id(); + assert!(second > first); }