use anyhow::Context as _; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::time::Duration; use std::{collections::BTreeSet, fs}; use tokio::fs::OpenOptions; use tokio::io::AsyncWriteExt; use tokio::sync::Mutex; use tracing::{error, info, trace, warn}; use tracing_appender::non_blocking::WorkerGuard; use tracing_subscriber::{filter::EnvFilter, fmt, prelude::*}; use crate::{audio, gadget::UsbGadget}; static STREAM_SEQ: AtomicU64 = AtomicU64::new(1); /// Initialise structured tracing for the server process. /// /// Inputs: none; configuration is read from `RUST_LOG`. /// Outputs: the non-blocking file writer guard that must stay alive for the /// lifetime of the process. /// Why: the server writes both to stdout and a local log file so field logs are /// still available after a transient SSH disconnect. #[cfg(coverage)] pub fn init_tracing() -> anyhow::Result { let (_writer, guard) = tracing_appender::non_blocking(std::io::sink()); Ok(guard) } #[cfg(not(coverage))] pub fn init_tracing() -> anyhow::Result { let env_filter = EnvFilter::try_from_default_env() .unwrap_or_else(|_| EnvFilter::new("lesavka_server=info,lesavka_server::video=warn")); let filter_str = env_filter.to_string(); let file = open_server_log_file(); let log_open_error = file.as_ref().err().map(ToString::to_string); let (file_writer, guard) = match file { Ok(file) => { let (writer, guard) = tracing_appender::non_blocking(file); (Some(writer), guard) } Err(_) => { let (_writer, guard) = tracing_appender::non_blocking(std::io::sink()); (None, guard) } }; let registry = tracing_subscriber::registry() .with(env_filter) .with(fmt::layer().with_target(true).with_thread_ids(true)); if let Some(file_writer) = file_writer { registry .with( fmt::layer() .with_writer(file_writer) .with_ansi(false) .with_target(true) .with_level(true), ) .init(); } else { registry.init(); } tracing::info!("📜 effective RUST_LOG = \"{}\"", filter_str); if let Some(error) = log_open_error { tracing::warn!("file logging disabled: {error}"); } Ok(guard) } #[cfg(not(coverage))] fn open_server_log_file() -> std::io::Result { let preferred = std::env::var("LESAVKA_SERVER_LOG_PATH") .unwrap_or_else(|_| "/var/log/lesavka/server.log".to_string()); for path in [preferred.as_str(), "/tmp/lesavka-server.log"] { match std::fs::OpenOptions::new() .create(true) .truncate(true) .write(true) .open(path) { Ok(file) => return Ok(file), Err(error) if path != "/tmp/lesavka-server.log" => { eprintln!("lesavka-server: failed to open {path}: {error}; trying /tmp fallback"); } Err(error) => return Err(error), } } unreachable!("static log path list is non-empty") } /// Open a HID gadget endpoint with bounded retry logic. /// /// Inputs: the path of the gadget device node to open. /// Outputs: a writable non-blocking file handle once the kernel reports the /// endpoint as ready. /// Why: gadget endpoints frequently flap during cable changes, so the server /// must wait for readiness instead of failing the whole process immediately. #[cfg(coverage)] pub async fn open_with_retry(path: &str) -> anyhow::Result { open_hid_file(path) .await .with_context(|| format!("opening {path}")) } #[cfg(not(coverage))] pub async fn open_with_retry(path: &str) -> anyhow::Result { for attempt in 1..=200 { match open_hid_file(path).await { Ok(file) => { info!("✅ {path} opened on attempt #{attempt}"); return Ok(file); } Err(error) if hid_endpoint_open_is_temporarily_unavailable(error.raw_os_error()) || error.raw_os_error() == Some(libc::EBUSY) => { trace!("⏳ {path} unavailable ({error})… retry #{attempt}"); tokio::time::sleep(Duration::from_millis(50)).await; } Err(error) => return Err(error).with_context(|| format!("opening {path}")), } } Err(anyhow::anyhow!("timeout waiting for {path}")) } async fn open_hid_file(path: &str) -> std::io::Result { OpenOptions::new() .write(true) .custom_flags(libc::O_NONBLOCK) .open(path) .await } pub async fn open_hid_if_ready(path: &str) -> anyhow::Result> { match open_hid_file(path).await { Ok(file) => { info!("✅ {path} opened"); Ok(Some(file)) } Err(error) if hid_endpoint_open_is_temporarily_unavailable(error.raw_os_error()) => { warn!("⌛ {path} is not ready yet ({error}); relay will retry lazily"); Ok(None) } Err(error) => Err(error).with_context(|| format!("opening {path}")), } } #[must_use] pub fn hid_endpoint_open_is_temporarily_unavailable(code: Option) -> bool { matches!( code, Some(libc::ENOENT) | Some(libc::ENODEV) | Some(libc::ENXIO) ) } /// Check whether the standalone UVC helper owns the gadget device. /// /// Inputs: process environment. /// Outputs: `true` when UVC is enabled and supervised by systemd instead of /// the server process. /// Why: automatic whole-gadget resets can wedge configfs if they race the UVC /// helper's open video-output node. #[must_use] pub fn external_uvc_helper_owns_gadget() -> bool { std::env::var("LESAVKA_UVC_EXTERNAL").is_ok() && std::env::var("LESAVKA_DISABLE_UVC").is_err() } /// Check whether gadget auto-recovery is enabled. /// /// Inputs: none. /// Outputs: `true` only when the explicit recovery opt-in env var is present. /// Why: cycling the whole USB gadget can be disruptive, and it is especially /// unsafe while the external UVC helper owns the gadget video node. #[must_use] pub fn allow_gadget_cycle() -> bool { std::env::var("LESAVKA_ALLOW_GADGET_CYCLE").is_ok() && (!external_uvc_helper_owns_gadget() || std::env::var("LESAVKA_ALLOW_EXTERNAL_UVC_GADGET_CYCLE").is_ok()) } /// Return whether a HID write error should trigger recovery. /// /// Inputs: the raw `errno` value observed while writing to a HID gadget. /// Outputs: `true` when the error is consistent with a lost USB connection. /// Why: only transport-level failures should cause device reopen and gadget /// cycling; transient backpressure is handled elsewhere. #[must_use] pub fn should_recover_hid_error(code: Option) -> bool { matches!( code, Some(libc::ENOTCONN) | Some(libc::ESHUTDOWN) | Some(libc::EPIPE) ) || hid_endpoint_open_is_temporarily_unavailable(code) } /// Recover the HID endpoints after a transport failure. /// /// Inputs: the write error plus the current gadget and file handles. /// Outputs: none; recovery runs asynchronously and updates the shared handles /// in place when reopening succeeds. /// Why: streams should survive cable resets without dropping the entire server /// process or requiring a manual restart from the operator. #[cfg(coverage)] pub async fn recover_hid_if_needed( err: &std::io::Error, gadget: UsbGadget, kb: Arc>>, ms: Arc>>, _kb_path: String, _ms_path: String, did_cycle: Arc, ) { let code = err.raw_os_error(); if !should_recover_hid_error(code) { return; } if did_cycle .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) .is_err() { return; } let allow_cycle = allow_gadget_cycle(); tokio::spawn(async move { if allow_cycle { let _ = tokio::task::spawn_blocking(move || gadget.cycle()).await; } else { let _ = (kb, ms); } tokio::time::sleep(Duration::from_secs(2)).await; did_cycle.store(false, Ordering::SeqCst); }); } #[cfg(not(coverage))] pub async fn recover_hid_if_needed( err: &std::io::Error, gadget: UsbGadget, kb: Arc>>, ms: Arc>>, kb_path: String, ms_path: String, did_cycle: Arc, ) { let code = err.raw_os_error(); if !should_recover_hid_error(code) { return; } if did_cycle .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) .is_err() { return; } let allow_cycle = allow_gadget_cycle(); tokio::spawn(async move { if allow_cycle { warn!("🔁 HID transport down (errno={code:?}) - aggressively recovering gadget"); match tokio::task::spawn_blocking(move || gadget.recover_enumeration()).await { Ok(Ok(())) => info!("✅ USB gadget recovery complete (auto-recover)"), Ok(Err(error)) => error!("💥 USB gadget recovery failed: {error:#}"), Err(error) => error!("💥 USB gadget recovery task panicked: {error:#}"), } } else { warn!( "🔒 HID transport down (errno={code:?}) - gadget cycle disabled; set LESAVKA_ALLOW_GADGET_CYCLE=1 to enable" ); } if let Err(error) = async { let kb_new = open_hid_if_ready(&kb_path).await?; let ms_new = open_hid_if_ready(&ms_path).await?; *kb.lock().await = kb_new; *ms.lock().await = ms_new; Ok::<(), anyhow::Error>(()) } .await { error!("💥 HID reopen failed: {error:#}"); } tokio::time::sleep(Duration::from_secs(2)).await; did_cycle.store(false, Ordering::SeqCst); }); }