#![forbid(unsafe_code)]

use anyhow::Context as _;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::Duration;
use tokio::fs::OpenOptions;
use tokio::io::AsyncWriteExt;
use tokio::sync::Mutex;
use tracing::{error, info, trace, warn};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{filter::EnvFilter, fmt, prelude::*};

use crate::{audio, gadget::UsbGadget};

/// Counter backing [`next_stream_id`]; the first id handed out is 1.
static STREAM_SEQ: AtomicU64 = AtomicU64::new(1);

/// Initialise structured tracing for the server process.
///
/// Inputs: none; the filter is read from `RUST_LOG`, falling back to
/// `lesavka_server=info,lesavka_server::video=warn` when it is unset or cannot
/// be parsed.
/// Outputs: the non-blocking file writer guard that must stay alive for the
/// lifetime of the process.
/// Errors: fails when `/tmp/lesavka-server.log` cannot be created/truncated, or
/// when a global tracing subscriber has already been installed (instead of
/// panicking).
/// Why: the server writes both to stdout and a local log file so field logs are
/// still available after a transient SSH disconnect.
pub fn init_tracing() -> anyhow::Result<WorkerGuard> {
    const LOG_PATH: &str = "/tmp/lesavka-server.log";

    // Truncate on every start so the file only ever holds the current run.
    let file = std::fs::OpenOptions::new()
        .create(true)
        .truncate(true)
        .write(true)
        .open(LOG_PATH)
        .with_context(|| format!("opening log file {LOG_PATH}"))?;
    let (file_writer, guard) = tracing_appender::non_blocking(file);

    let env_filter = EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| EnvFilter::new("lesavka_server=info,lesavka_server::video=warn"));
    // Captured before `env_filter` is moved into the registry so the effective
    // filter can be logged once tracing is live.
    let filter_str = env_filter.to_string();

    tracing_subscriber::registry()
        .with(env_filter)
        // Human-facing stdout layer (ANSI colours left at the default).
        .with(fmt::layer().with_target(true).with_thread_ids(true))
        // Plain-text file layer for post-mortem inspection.
        .with(
            fmt::layer()
                .with_writer(file_writer)
                .with_ansi(false)
                .with_target(true)
                .with_level(true),
        )
        .try_init()
        .context("installing global tracing subscriber")?;

    info!("📜 effective RUST_LOG = \"{}\"", filter_str);
    Ok(guard)
}

/// Open a HID gadget endpoint with bounded retry logic.
///
/// Inputs: the path of the gadget device node to open.
/// Outputs: a writable, `O_NONBLOCK` file handle once the open succeeds.
/// Errors: `EBUSY` is retried (200 attempts, 50 ms apart, roughly ten seconds)
/// before giving up with a timeout error; any other open error is returned
/// immediately with the path attached as context.
/// Why: gadget endpoints frequently flap during cable changes, so the server
/// must wait for readiness instead of failing the whole process immediately.
pub async fn open_with_retry(path: &str) -> anyhow::Result { for attempt in 1..=200 { match OpenOptions::new() .write(true) .custom_flags(libc::O_NONBLOCK) .open(path) .await { Ok(file) => { info!("✅ {path} opened on attempt #{attempt}"); return Ok(file); } Err(error) if error.raw_os_error() == Some(libc::EBUSY) => { trace!("⏳ {path} busy… retry #{attempt}"); tokio::time::sleep(Duration::from_millis(50)).await; } Err(error) => return Err(error).with_context(|| format!("opening {path}")), } } Err(anyhow::anyhow!("timeout waiting for {path}")) } /// Check whether gadget auto-recovery is enabled. /// /// Inputs: none. /// Outputs: `true` only when the explicit recovery opt-in env var is present. /// Why: cycling the whole USB gadget can be disruptive, so operators must /// choose that behavior deliberately on each deployment. #[must_use] pub fn allow_gadget_cycle() -> bool { std::env::var("LESAVKA_ALLOW_GADGET_CYCLE").is_ok() } /// Return whether a HID write error should trigger recovery. /// /// Inputs: the raw `errno` value observed while writing to a HID gadget. /// Outputs: `true` when the error is consistent with a lost USB connection. /// Why: only transport-level failures should cause device reopen and gadget /// cycling; transient backpressure is handled elsewhere. #[must_use] pub fn should_recover_hid_error(code: Option) -> bool { matches!( code, Some(libc::ENOTCONN) | Some(libc::ESHUTDOWN) | Some(libc::EPIPE) ) } /// Recover the HID endpoints after a transport failure. /// /// Inputs: the write error plus the current gadget and file handles. /// Outputs: none; recovery runs asynchronously and updates the shared handles /// in place when reopening succeeds. /// Why: streams should survive cable resets without dropping the entire server /// process or requiring a manual restart from the operator. 
pub async fn recover_hid_if_needed( err: &std::io::Error, gadget: UsbGadget, kb: Arc>, ms: Arc>, did_cycle: Arc, ) { let code = err.raw_os_error(); if !should_recover_hid_error(code) { return; } if did_cycle .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) .is_err() { return; } let allow_cycle = allow_gadget_cycle(); tokio::spawn(async move { if allow_cycle { warn!("🔁 HID transport down (errno={code:?}) - cycling gadget"); match tokio::task::spawn_blocking(move || gadget.cycle()).await { Ok(Ok(())) => info!("✅ USB gadget cycle complete (auto-recover)"), Ok(Err(error)) => error!("💥 USB gadget cycle failed: {error:#}"), Err(error) => error!("💥 USB gadget cycle task panicked: {error:#}"), } } else { warn!( "🔒 HID transport down (errno={code:?}) - gadget cycle disabled; set LESAVKA_ALLOW_GADGET_CYCLE=1 to enable" ); } if let Err(error) = async { let kb_new = open_with_retry("/dev/hidg0").await?; let ms_new = open_with_retry("/dev/hidg1").await?; *kb.lock().await = kb_new; *ms.lock().await = ms_new; Ok::<(), anyhow::Error>(()) } .await { error!("💥 HID reopen failed: {error:#}"); } tokio::time::sleep(Duration::from_secs(2)).await; did_cycle.store(false, Ordering::SeqCst); }); } /// Open the UAC sink with retry logic. /// /// Inputs: the ALSA device string that should receive microphone audio. /// Outputs: a ready-to-use `Voice` sink. /// Why: the USB audio gadget can appear after the RPC stream has already been /// negotiated, so the server retries briefly before declaring the sink broken. 
pub async fn open_voice_with_retry(uac_dev: &str) -> anyhow::Result { let attempts = std::env::var("LESAVKA_MIC_INIT_ATTEMPTS") .ok() .and_then(|value| value.parse::().ok()) .unwrap_or(5) .max(1); let delay_ms = std::env::var("LESAVKA_MIC_INIT_DELAY_MS") .ok() .and_then(|value| value.parse::().ok()) .unwrap_or(250); let mut last_error: Option = None; for attempt in 1..=attempts { match audio::Voice::new(uac_dev).await { Ok(voice) => { if attempt > 1 { info!(%uac_dev, attempt, "🎤 microphone sink recovered"); } return Ok(voice); } Err(error) => { warn!(%uac_dev, attempt, "⚠️ microphone sink init failed: {error:#}"); last_error = Some(error); tokio::time::sleep(Duration::from_millis(delay_ms)).await; } } } Err(last_error.unwrap_or_else(|| anyhow::anyhow!("microphone sink init failed"))) } /// Allocate a stream identifier for logging and correlation. /// /// Inputs: none. /// Outputs: a monotonically increasing identifier. /// Why: the server multiplexes several long-lived streams, so log lines need a /// cheap correlation id that is stable across retries. #[must_use] pub fn next_stream_id() -> u64 { STREAM_SEQ.fetch_add(1, Ordering::Relaxed) } /// Write one HID report with a short bounded retry loop. /// /// Inputs: the shared gadget file handle plus the already-encoded report. /// Outputs: `Ok(())` when the report reached the kernel buffer, or the final /// write error after retrying transient backpressure. /// Why: a brief retry window avoids dropping reports during momentary gadget /// stalls without blocking the stream task indefinitely. 
pub async fn write_hid_report( dev: &Arc>, data: &[u8], ) -> std::io::Result<()> { let mut last_error: Option = None; for attempt in 0..5 { let mut file = dev.lock().await; match file.write_all(data).await { Ok(()) => return Ok(()), Err(error) if error.kind() == std::io::ErrorKind::WouldBlock || error.raw_os_error() == Some(libc::EAGAIN) => { last_error = Some(error); } Err(error) => return Err(error), } drop(file); tokio::time::sleep(Duration::from_millis((attempt as u64 + 1) * 2)).await; } Err(last_error.unwrap_or_else(|| std::io::Error::from_raw_os_error(libc::EAGAIN))) } #[cfg(test)] mod tests { use super::{ allow_gadget_cycle, next_stream_id, open_with_retry, should_recover_hid_error, write_hid_report, }; use serial_test::serial; use std::sync::Arc; use temp_env::with_var; use tempfile::NamedTempFile; use tokio::io::AsyncWriteExt; use tokio::sync::Mutex; #[test] #[serial] fn allow_gadget_cycle_tracks_env_presence() { with_var("LESAVKA_ALLOW_GADGET_CYCLE", None::<&str>, || { assert!(!allow_gadget_cycle()); }); with_var("LESAVKA_ALLOW_GADGET_CYCLE", Some("1"), || { assert!(allow_gadget_cycle()); }); } #[test] fn should_recover_hid_error_matches_transport_failures() { assert!(should_recover_hid_error(Some(libc::ENOTCONN))); assert!(should_recover_hid_error(Some(libc::ESHUTDOWN))); assert!(should_recover_hid_error(Some(libc::EPIPE))); assert!(!should_recover_hid_error(Some(libc::EAGAIN))); assert!(!should_recover_hid_error(None)); } #[test] fn next_stream_id_monotonically_increments() { let first = next_stream_id(); let second = next_stream_id(); assert!(second > first); } #[tokio::test] #[serial] async fn open_with_retry_opens_existing_file() { let tmp = NamedTempFile::new().expect("temp file"); let mut file = open_with_retry(tmp.path().to_str().unwrap()) .await .expect("open should succeed"); file.write_all(b"ok").await.expect("write temp file"); file.sync_all().await.expect("sync temp file"); assert_eq!( tokio::fs::read(tmp.path()).await.expect("read temp 
file"), b"ok" ); } #[tokio::test] #[serial] async fn write_hid_report_writes_bytes() { let tmp = NamedTempFile::new().expect("temp file"); let file = tokio::fs::OpenOptions::new() .write(true) .truncate(true) .open(tmp.path()) .await .expect("open temp file"); let shared = Arc::new(Mutex::new(file)); write_hid_report(&shared, &[1, 2, 3, 4]) .await .expect("write succeeds"); let contents = tokio::fs::read(tmp.path()) .await .expect("read back temp file"); assert_eq!(&contents, &[1, 2, 3, 4]); } }