#![forbid(unsafe_code)] use anyhow::Context as _; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::time::Duration; use std::{collections::BTreeSet, fs}; use tokio::fs::OpenOptions; use tokio::io::AsyncWriteExt; use tokio::sync::Mutex; use tracing::{error, info, trace, warn}; use tracing_appender::non_blocking::WorkerGuard; use tracing_subscriber::{filter::EnvFilter, fmt, prelude::*}; use crate::{audio, gadget::UsbGadget}; static STREAM_SEQ: AtomicU64 = AtomicU64::new(1); /// Initialise structured tracing for the server process. /// /// Inputs: none; configuration is read from `RUST_LOG`. /// Outputs: the non-blocking file writer guard that must stay alive for the /// lifetime of the process. /// Why: the server writes both to stdout and a local log file so field logs are /// still available after a transient SSH disconnect. #[cfg(coverage)] pub fn init_tracing() -> anyhow::Result { let (_writer, guard) = tracing_appender::non_blocking(std::io::sink()); Ok(guard) } #[cfg(not(coverage))] pub fn init_tracing() -> anyhow::Result { let file = std::fs::OpenOptions::new() .create(true) .truncate(true) .write(true) .open("/tmp/lesavka-server.log")?; let (file_writer, guard) = tracing_appender::non_blocking(file); let env_filter = EnvFilter::try_from_default_env() .unwrap_or_else(|_| EnvFilter::new("lesavka_server=info,lesavka_server::video=warn")); let filter_str = env_filter.to_string(); tracing_subscriber::registry() .with(env_filter) .with(fmt::layer().with_target(true).with_thread_ids(true)) .with( fmt::layer() .with_writer(file_writer) .with_ansi(false) .with_target(true) .with_level(true), ) .init(); tracing::info!("📜 effective RUST_LOG = \"{}\"", filter_str); Ok(guard) } /// Open a HID gadget endpoint with bounded retry logic. /// /// Inputs: the path of the gadget device node to open. /// Outputs: a writable non-blocking file handle once the kernel reports the /// endpoint as ready. /// Why: gadget endpoints frequently flap during cable changes, so the server /// must wait for readiness instead of failing the whole process immediately. #[cfg(coverage)] pub async fn open_with_retry(path: &str) -> anyhow::Result { open_hid_file(path) .await .with_context(|| format!("opening {path}")) } #[cfg(not(coverage))] pub async fn open_with_retry(path: &str) -> anyhow::Result { for attempt in 1..=200 { match open_hid_file(path).await { Ok(file) => { info!("✅ {path} opened on attempt #{attempt}"); return Ok(file); } Err(error) if hid_endpoint_open_is_temporarily_unavailable(error.raw_os_error()) || error.raw_os_error() == Some(libc::EBUSY) => { trace!("⏳ {path} unavailable ({error})… retry #{attempt}"); tokio::time::sleep(Duration::from_millis(50)).await; } Err(error) => return Err(error).with_context(|| format!("opening {path}")), } } Err(anyhow::anyhow!("timeout waiting for {path}")) } async fn open_hid_file(path: &str) -> std::io::Result { OpenOptions::new() .write(true) .custom_flags(libc::O_NONBLOCK) .open(path) .await } pub async fn open_hid_if_ready(path: &str) -> anyhow::Result> { match open_hid_file(path).await { Ok(file) => { info!("✅ {path} opened"); Ok(Some(file)) } Err(error) if hid_endpoint_open_is_temporarily_unavailable(error.raw_os_error()) => { warn!("⌛ {path} is not ready yet ({error}); relay will retry lazily"); Ok(None) } Err(error) => Err(error).with_context(|| format!("opening {path}")), } } #[must_use] pub fn hid_endpoint_open_is_temporarily_unavailable(code: Option) -> bool { matches!( code, Some(libc::ENOENT) | Some(libc::ENODEV) | Some(libc::ENXIO) ) } /// Check whether gadget auto-recovery is enabled. /// /// Inputs: none. /// Outputs: `true` only when the explicit recovery opt-in env var is present. /// Why: cycling the whole USB gadget can be disruptive, so operators must /// choose that behavior deliberately on each deployment. #[must_use] pub fn allow_gadget_cycle() -> bool { std::env::var("LESAVKA_ALLOW_GADGET_CYCLE").is_ok() } /// Return whether a HID write error should trigger recovery. /// /// Inputs: the raw `errno` value observed while writing to a HID gadget. /// Outputs: `true` when the error is consistent with a lost USB connection. /// Why: only transport-level failures should cause device reopen and gadget /// cycling; transient backpressure is handled elsewhere. #[must_use] pub fn should_recover_hid_error(code: Option) -> bool { matches!( code, Some(libc::ENOTCONN) | Some(libc::ESHUTDOWN) | Some(libc::EPIPE) ) || hid_endpoint_open_is_temporarily_unavailable(code) } /// Recover the HID endpoints after a transport failure. /// /// Inputs: the write error plus the current gadget and file handles. /// Outputs: none; recovery runs asynchronously and updates the shared handles /// in place when reopening succeeds. /// Why: streams should survive cable resets without dropping the entire server /// process or requiring a manual restart from the operator. #[cfg(coverage)] pub async fn recover_hid_if_needed( err: &std::io::Error, gadget: UsbGadget, kb: Arc>>, ms: Arc>>, _kb_path: String, _ms_path: String, did_cycle: Arc, ) { let code = err.raw_os_error(); if !should_recover_hid_error(code) { return; } if did_cycle .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) .is_err() { return; } let allow_cycle = allow_gadget_cycle(); tokio::spawn(async move { if allow_cycle { let _ = tokio::task::spawn_blocking(move || gadget.cycle()).await; } else { let _ = (kb, ms); } tokio::time::sleep(Duration::from_secs(2)).await; did_cycle.store(false, Ordering::SeqCst); }); } #[cfg(not(coverage))] pub async fn recover_hid_if_needed( err: &std::io::Error, gadget: UsbGadget, kb: Arc>>, ms: Arc>>, kb_path: String, ms_path: String, did_cycle: Arc, ) { let code = err.raw_os_error(); if !should_recover_hid_error(code) { return; } if did_cycle .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) .is_err() { return; } let allow_cycle = allow_gadget_cycle(); tokio::spawn(async move { if allow_cycle { warn!("🔁 HID transport down (errno={code:?}) - cycling gadget"); match tokio::task::spawn_blocking(move || gadget.cycle()).await { Ok(Ok(())) => info!("✅ USB gadget cycle complete (auto-recover)"), Ok(Err(error)) => error!("💥 USB gadget cycle failed: {error:#}"), Err(error) => error!("💥 USB gadget cycle task panicked: {error:#}"), } } else { warn!( "🔒 HID transport down (errno={code:?}) - gadget cycle disabled; set LESAVKA_ALLOW_GADGET_CYCLE=1 to enable" ); } if let Err(error) = async { let kb_new = open_hid_if_ready(&kb_path).await?; let ms_new = open_hid_if_ready(&ms_path).await?; *kb.lock().await = kb_new; *ms.lock().await = ms_new; Ok::<(), anyhow::Error>(()) } .await { error!("💥 HID reopen failed: {error:#}"); } tokio::time::sleep(Duration::from_secs(2)).await; did_cycle.store(false, Ordering::SeqCst); }); } /// Open the UAC sink with retry logic. /// /// Inputs: the ALSA device string that should receive microphone audio. /// Outputs: a ready-to-use `Voice` sink. /// Why: the USB audio gadget can appear after the RPC stream has already been /// negotiated, so the server retries briefly before declaring the sink broken. #[cfg(coverage)] pub async fn open_voice_with_retry(uac_dev: &str) -> anyhow::Result { audio::Voice::new(uac_dev).await } #[cfg(not(coverage))] pub async fn open_voice_with_retry(uac_dev: &str) -> anyhow::Result { let candidates = preferred_uac_device_candidates(uac_dev); let (attempts, delay_ms) = audio_init_retry_policy(); let mut last_error: Option = None; for attempt in 1..=attempts { for candidate in &candidates { match audio::Voice::new(candidate).await { Ok(voice) => { if attempt > 1 || candidate != uac_dev { info!( requested = %uac_dev, resolved = %candidate, attempt, "🎤 microphone sink recovered" ); } return Ok(voice); } Err(error) => { warn!( requested = %uac_dev, candidate = %candidate, attempt, "⚠️ microphone sink init failed: {error:#}" ); last_error = Some(error); } } } tokio::time::sleep(Duration::from_millis(delay_ms)).await; } Err(last_error.unwrap_or_else(|| anyhow::anyhow!("microphone sink init failed"))) } /// Open the UAC capture source with retry logic. /// /// Inputs: the preferred ALSA device string plus the logical stream id. /// Outputs: a ready-to-stream AAC capture pipeline. /// Why: the USB gadget card name is not always stable, so the server should /// retry both the preferred name and any discovered aliases before failing. #[cfg(coverage)] pub async fn open_ear_with_retry(alsa_dev: &str, id: u32) -> anyhow::Result { audio::ear(alsa_dev, id).await } #[cfg(not(coverage))] pub async fn open_ear_with_retry(alsa_dev: &str, id: u32) -> anyhow::Result { let candidates = preferred_uac_device_candidates(alsa_dev); let (attempts, delay_ms) = audio_init_retry_policy(); let mut last_error: Option = None; for attempt in 1..=attempts { for candidate in &candidates { match audio::ear(candidate, id).await { Ok(stream) => { if attempt > 1 || candidate != alsa_dev { info!( requested = %alsa_dev, resolved = %candidate, attempt, "🔊 audio source recovered" ); } return Ok(stream); } Err(error) => { warn!( requested = %alsa_dev, candidate = %candidate, attempt, "⚠️ audio source init failed: {error:#}" ); last_error = Some(error); } } } tokio::time::sleep(Duration::from_millis(delay_ms)).await; } Err(last_error.unwrap_or_else(|| anyhow::anyhow!("audio source init failed"))) } #[cfg(not(coverage))] fn audio_init_retry_policy() -> (u32, u64) { let attempts = std::env::var("LESAVKA_AUDIO_INIT_ATTEMPTS") .ok() .and_then(|value| value.parse::().ok()) .or_else(|| { std::env::var("LESAVKA_MIC_INIT_ATTEMPTS") .ok() .and_then(|value| value.parse::().ok()) }) .unwrap_or(20) .max(1); let delay_ms = std::env::var("LESAVKA_AUDIO_INIT_DELAY_MS") .ok() .and_then(|value| value.parse::().ok()) .or_else(|| { std::env::var("LESAVKA_MIC_INIT_DELAY_MS") .ok() .and_then(|value| value.parse::().ok()) }) .unwrap_or(250); (attempts, delay_ms) } #[cfg(not(coverage))] fn preferred_uac_device_candidates(preferred: &str) -> Vec { let mut out = Vec::new(); let mut seen = BTreeSet::new(); let auto_family = [ "hw:UAC2Gadget,0", "hw:UAC2_Gadget,0", "hw:Composite,0", "hw:Lesavka,0", ]; let allow_aliases = auto_family.contains(&preferred); push_audio_candidate_family(&mut out, &mut seen, preferred); if allow_aliases { for detected in detect_uac_card_candidates() { push_audio_candidate_family(&mut out, &mut seen, &detected); } for alias in auto_family { push_audio_candidate_family(&mut out, &mut seen, alias); } } out } #[cfg(not(coverage))] fn push_audio_candidate_family( out: &mut Vec, seen: &mut BTreeSet, candidate: &str, ) { let trimmed = candidate.trim(); if trimmed.is_empty() { return; } push_audio_candidate(out, seen, trimmed); if let Some(rest) = trimmed.strip_prefix("hw:") { push_audio_candidate(out, seen, &format!("plughw:{rest}")); } else if let Some(rest) = trimmed.strip_prefix("plughw:") { push_audio_candidate(out, seen, &format!("hw:{rest}")); } } #[cfg(not(coverage))] fn push_audio_candidate(out: &mut Vec, seen: &mut BTreeSet, candidate: &str) { let trimmed = candidate.trim(); if trimmed.is_empty() { return; } if seen.insert(trimmed.to_string()) { out.push(trimmed.to_string()); } } #[cfg(not(coverage))] fn detect_uac_card_candidates() -> Vec { let mut out = Vec::new(); let mut seen = BTreeSet::new(); let card_data = fs::read_to_string("/proc/asound/cards").ok(); let numeric_card_ids = card_data .as_deref() .map(parse_uac_numeric_card_ids) .unwrap_or_default(); if let Some(cards) = card_data.as_deref() { for candidate in parse_uac_named_card_candidates(cards) { push_audio_candidate(&mut out, &mut seen, &candidate); } } if let Ok(pcm) = fs::read_to_string("/proc/asound/pcm") { for candidate in parse_uac_pcm_candidates(&pcm, &numeric_card_ids) { push_audio_candidate(&mut out, &mut seen, &candidate); } } out } #[cfg(not(coverage))] fn parse_uac_named_card_candidates(cards: &str) -> Vec { cards .lines() .filter_map(|line| { let lower = line.to_ascii_lowercase(); if !(lower.contains("uac2") || lower.contains("gadget") || lower.contains("composite") || lower.contains("lesavka")) { return None; } let start = line.find('[')?; let end = line[start + 1..].find(']')?; let card_id = line[start + 1..start + 1 + end].trim(); (!card_id.is_empty()).then(|| format!("hw:{card_id},0")) }) .collect() } #[cfg(not(coverage))] fn parse_uac_numeric_card_ids(cards: &str) -> BTreeSet { cards .lines() .filter_map(|line| { let lower = line.to_ascii_lowercase(); if !(lower.contains("uac2") || lower.contains("gadget") || lower.contains("composite") || lower.contains("lesavka")) { return None; } line.split_whitespace() .next() .filter(|candidate| candidate.chars().all(|ch| ch.is_ascii_digit())) .map(|candidate| candidate.to_string()) }) .collect() } #[cfg(not(coverage))] fn parse_uac_pcm_candidates(pcm: &str, numeric_card_ids: &BTreeSet) -> Vec { pcm.lines() .filter_map(|line| { let (prefix, _) = line.split_once(':')?; let (card_id, device_id) = prefix.split_once('-')?; let normalized_card = card_id.trim_start_matches('0'); let normalized_card = if normalized_card.is_empty() { "0" } else { normalized_card }; let normalized_device = device_id.trim_start_matches('0'); let normalized_device = if normalized_device.is_empty() { "0" } else { normalized_device }; numeric_card_ids .contains(normalized_card) .then(|| format!("hw:{normalized_card},{normalized_device}")) }) .collect() } /// Allocate a stream identifier for logging and correlation. /// /// Inputs: none. /// Outputs: a monotonically increasing identifier. /// Why: the server multiplexes several long-lived streams, so log lines need a /// cheap correlation id that is stable across retries. #[must_use] pub fn next_stream_id() -> u64 { STREAM_SEQ.fetch_add(1, Ordering::Relaxed) } /// Write one HID report with a short bounded retry loop. /// /// Inputs: the shared gadget file handle plus the already-encoded report. /// Outputs: `Ok(())` when the report reached the kernel buffer, or the final /// write error after retrying transient backpressure. /// Why: a brief retry window avoids dropping reports during momentary gadget /// stalls without blocking the stream task indefinitely. #[cfg(coverage)] pub async fn write_hid_report( dev: &Arc>>, path: &str, data: &[u8], ) -> std::io::Result<()> { let mut file = dev.lock().await; if file.is_none() { *file = Some(open_hid_file(path).await?); } if let Some(file) = file.as_mut() { file.write_all(data).await } else { Err(std::io::Error::new( std::io::ErrorKind::NotConnected, "HID endpoint is not open", )) } } #[cfg(not(coverage))] pub async fn write_hid_report( dev: &Arc>>, path: &str, data: &[u8], ) -> std::io::Result<()> { let attempts = std::env::var("LESAVKA_HID_WRITE_RETRIES") .ok() .and_then(|value| value.parse::().ok()) .unwrap_or(24) .max(1); let base_delay_ms = std::env::var("LESAVKA_HID_WRITE_RETRY_DELAY_MS") .ok() .and_then(|value| value.parse::().ok()) .unwrap_or(2) .max(1); let mut last_error: Option = None; for attempt in 0..attempts { let mut file = dev.lock().await; if file.is_none() { match open_hid_file(path).await { Ok(opened) => { info!("✅ {path} opened lazily"); *file = Some(opened); } Err(error) => return Err(error), } } let Some(file_handle) = file.as_mut() else { return Err(std::io::Error::new( std::io::ErrorKind::NotConnected, "HID endpoint is not open", )); }; match file_handle.write_all(data).await { Ok(()) => return Ok(()), Err(error) if error.kind() == std::io::ErrorKind::WouldBlock || error.raw_os_error() == Some(libc::EAGAIN) => { last_error = Some(error); } Err(error) => { if should_recover_hid_error(error.raw_os_error()) { *file = None; } return Err(error); } } drop(file); tokio::time::sleep(Duration::from_millis((attempt as u64 + 1) * base_delay_ms)).await; } Err(last_error.unwrap_or_else(|| std::io::Error::from_raw_os_error(libc::EAGAIN))) } #[cfg(test)] mod tests { use super::{ allow_gadget_cycle, detect_uac_card_candidates, next_stream_id, open_with_retry, parse_uac_named_card_candidates, parse_uac_numeric_card_ids, parse_uac_pcm_candidates, preferred_uac_device_candidates, should_recover_hid_error, write_hid_report, }; use serial_test::serial; use std::collections::BTreeSet; use std::sync::Arc; use temp_env::with_var; use tempfile::NamedTempFile; use tokio::io::AsyncWriteExt; use tokio::sync::Mutex; #[test] #[serial] fn allow_gadget_cycle_tracks_env_presence() { with_var("LESAVKA_ALLOW_GADGET_CYCLE", None::<&str>, || { assert!(!allow_gadget_cycle()); }); with_var("LESAVKA_ALLOW_GADGET_CYCLE", Some("1"), || { assert!(allow_gadget_cycle()); }); } #[test] fn should_recover_hid_error_matches_transport_failures() { assert!(should_recover_hid_error(Some(libc::ENOTCONN))); assert!(should_recover_hid_error(Some(libc::ESHUTDOWN))); assert!(should_recover_hid_error(Some(libc::EPIPE))); assert!(!should_recover_hid_error(Some(libc::EAGAIN))); assert!(!should_recover_hid_error(None)); } #[test] fn next_stream_id_monotonically_increments() { let first = next_stream_id(); let second = next_stream_id(); assert!(second > first); } #[test] fn preferred_uac_device_candidates_keeps_custom_override_only() { let candidates = preferred_uac_device_candidates("hw:7,0"); assert_eq!(candidates, vec!["hw:7,0", "plughw:7,0"]); } #[test] fn preferred_uac_device_candidates_expands_known_aliases() { let candidates = preferred_uac_device_candidates("hw:UAC2Gadget,0"); assert!(candidates.iter().any(|value| value == "hw:UAC2Gadget,0")); assert!( candidates .iter() .any(|value| value == "plughw:UAC2Gadget,0") ); assert!(candidates.iter().any(|value| value == "hw:UAC2_Gadget,0")); assert!(candidates.iter().any(|value| value == "hw:Composite,0")); } #[test] fn detect_uac_card_candidates_returns_hw_names_only() { let live = detect_uac_card_candidates(); assert!(live.iter().all(|value| value.starts_with("hw:"))); } #[test] fn parse_uac_card_helpers_collect_named_and_numeric_candidates() { let cards = "\ 0 [PCH ]: HDA-Intel - HDA Intel PCH\n\ 2 [UAC2Gadget ]: USB-Audio - UAC2Gadget\n\ Lesavka USB Audio\n"; assert_eq!( parse_uac_named_card_candidates(cards), vec!["hw:UAC2Gadget,0"] ); assert!( parse_uac_numeric_card_ids(cards).contains("2"), "expected numeric card index for the gadget card" ); } #[test] fn parse_uac_pcm_candidates_expands_all_matching_device_indexes() { let pcm = "\ 00-00: PCH device : playback 1 : capture 1\n\ 02-00: USB Audio : USB Audio : playback 1 : capture 1\n\ 02-01: USB Audio #1 : USB Audio #1 : playback 1 : capture 1\n"; let ids = BTreeSet::from(["2".to_string()]); assert_eq!( parse_uac_pcm_candidates(pcm, &ids), vec!["hw:2,0", "hw:2,1"] ); } #[tokio::test] #[serial] async fn open_with_retry_opens_existing_file() { let tmp = NamedTempFile::new().expect("temp file"); let mut file = open_with_retry(tmp.path().to_str().unwrap()) .await .expect("open should succeed"); file.write_all(b"ok").await.expect("write temp file"); file.sync_all().await.expect("sync temp file"); assert_eq!( tokio::fs::read(tmp.path()).await.expect("read temp file"), b"ok" ); } #[tokio::test] #[serial] async fn write_hid_report_writes_bytes() { let tmp = NamedTempFile::new().expect("temp file"); let file = tokio::fs::OpenOptions::new() .write(true) .truncate(true) .open(tmp.path()) .await .expect("open temp file"); let shared = Arc::new(Mutex::new(Some(file))); write_hid_report(&shared, tmp.path().to_str().unwrap(), &[1, 2, 3, 4]) .await .expect("write succeeds"); let contents = tokio::fs::read(tmp.path()) .await .expect("read back temp file"); assert_eq!(&contents, &[1, 2, 3, 4]); } }