// lesavka-server - gadget cycle guarded by env // server/src/main.rs #[allow(clippy::useless_attribute)] #[forbid(unsafe_code)] use anyhow::Context; use futures_util::{Stream, StreamExt}; use std::collections::HashMap; use std::collections::HashSet; use std::os::unix::fs::FileTypeExt; use std::process::Command; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::{backtrace::Backtrace, panic, pin::Pin, sync::Arc, time::Duration}; use tokio::sync::{Mutex, broadcast}; use tokio_stream::wrappers::ReceiverStream; use tonic::transport::Server; use tonic::{Request, Response, Status}; use tonic_reflection::server::Builder as ReflBuilder; use tracing::{debug, error, info, warn}; use lesavka_common::lesavka::{ AudioPacket, CapturePowerCommand, CapturePowerState, Empty, KeyboardReport, MonitorRequest, MouseReport, PasteReply, PasteRequest, ResetUsbReply, SetCapturePowerRequest, VideoPacket, relay_server::{Relay, RelayServer}, }; use lesavka_server::{ camera, camera_runtime::CameraRuntime, capture_power::CapturePowerManager, gadget::UsbGadget, handshake::HandshakeSvc, paste, runtime_support, runtime_support::init_tracing, uvc_runtime, video, }; /*──────────────── constants ────────────────*/ const PKG_NAME: &str = env!("CARGO_PKG_NAME"); type VideoStream = Pin> + Send>>; type AudioStream = Pin> + Send>>; fn hid_endpoint(index: u8) -> String { std::env::var("LESAVKA_HID_DIR") .map(|dir| format!("{dir}/hidg{index}")) .unwrap_or_else(|_| format!("/dev/hidg{index}")) } fn live_keyboard_report_delay() -> Duration { std::env::var("LESAVKA_LIVE_KEYBOARD_REPORT_DELAY_MS") .ok() .and_then(|value| value.parse::().ok()) .map(Duration::from_millis) .unwrap_or_else(|| Duration::from_millis(8)) } /*──────────────── Handler ───────────────────*/ struct Handler { kb: Arc>>, ms: Arc>>, gadget: UsbGadget, did_cycle: Arc, camera_rt: Arc, capture_power: CapturePowerManager, eye_hubs: Arc>>>, } #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] struct EyeHubKey { source_id: u32, requested_width: u32, requested_height: u32, requested_fps: u32, } struct EyeHub { tx: broadcast::Sender, running: Arc, subscribers: Arc, abort: tokio::task::AbortHandle, } impl Handler { async fn new(gadget: UsbGadget) -> anyhow::Result { #[cfg(not(coverage))] if runtime_support::allow_gadget_cycle() { info!("🛠️ Initial USB recovery…"); match UsbGadget::current_controller_state() { Ok((ctrl, state)) if !UsbGadget::host_enumerated_state(&state) => { warn!("⚠️ UDC {ctrl} is {state}; forcing gadget recovery before opening HID"); if let Err(error) = gadget.recover_enumeration() { warn!("⚠️ initial USB recovery did not enumerate the host: {error:#}"); } } Ok(_) => { let _ = gadget.cycle(); // ignore failure - may boot without host } Err(error) => { warn!("⚠️ UDC state unavailable during startup: {error:#}"); let _ = gadget.cycle(); // preserve the old best-effort startup path } } } #[cfg(not(coverage))] { if !runtime_support::allow_gadget_cycle() { info!( "🔒 gadget cycle disabled at startup (set LESAVKA_ALLOW_GADGET_CYCLE=1 to enable)" ); } info!("🛠️ opening HID endpoints …"); } let kb_path = hid_endpoint(0); let ms_path = hid_endpoint(1); let kb = runtime_support::open_hid_if_ready(&kb_path).await?; let ms = runtime_support::open_hid_if_ready(&ms_path).await?; #[cfg(not(coverage))] if kb.is_some() && ms.is_some() { info!("✅ HID endpoints ready"); } else { warn!("⌛ HID endpoints are not ready; relay will keep running and open them lazily"); } Ok(Self { kb: Arc::new(Mutex::new(kb)), ms: Arc::new(Mutex::new(ms)), gadget, did_cycle: Arc::new(AtomicBool::new(false)), camera_rt: Arc::new(CameraRuntime::new()), capture_power: CapturePowerManager::new(), eye_hubs: Arc::new(Mutex::new(HashMap::new())), }) } async fn reopen_hid(&self) -> anyhow::Result<()> { let kb_new = runtime_support::open_hid_if_ready(&hid_endpoint(0)).await?; let ms_new = runtime_support::open_hid_if_ready(&hid_endpoint(1)).await?; *self.kb.lock().await = kb_new; *self.ms.lock().await = ms_new; Ok(()) } fn detected_capture_devices_from_symlinks() -> u32 { ["/dev/lesavka_l_eye", "/dev/lesavka_r_eye"] .into_iter() .filter(|path| { std::fs::metadata(path) .ok() .is_some_and(|metadata| metadata.file_type().is_char_device()) }) .count() as u32 } fn detected_capture_devices_from_udev() -> u32 { let Ok(mut enumerator) = udev::Enumerator::new() else { return 0; }; let _ = enumerator.match_subsystem("video4linux"); let Ok(devices) = enumerator.scan_devices() else { return 0; }; devices .filter(|device| { device .attribute_value("index") .and_then(|value| value.to_str()) == Some("0") && device .property_value("ID_VENDOR_ID") .and_then(|value| value.to_str()) == Some("07ca") && device .property_value("ID_MODEL_ID") .and_then(|value| value.to_str()) == Some("3311") }) .count() .min(2) as u32 } async fn active_eye_source_count(&self) -> u32 { self.eye_hubs .lock() .await .iter() .filter_map(|(key, hub)| hub.running.load(Ordering::Relaxed).then_some(key.source_id)) .collect::>() .len() .min(2) as u32 } async fn with_detected_capture_devices( &self, mut state: CapturePowerState, ) -> CapturePowerState { state.detected_devices = Self::detected_capture_devices_from_udev() .max(Self::detected_capture_devices_from_symlinks()) .max(self.active_eye_source_count().await); state } async fn capture_video_reply( &self, req: MonitorRequest, ) -> Result, Status> { let id = req.id; if id > 1 { return Err(Status::invalid_argument("monitor id must be 0 or 1")); } let source_id = req.source_id.unwrap_or(id); if source_id > 1 { return Err(Status::invalid_argument("source id must be 0 or 1")); } let dev = match source_id { 0 => "/dev/lesavka_l_eye", 1 => "/dev/lesavka_r_eye", _ => return Err(Status::invalid_argument("source id must be 0 or 1")), }; #[cfg(not(coverage))] { let rpc_id = runtime_support::next_stream_id(); info!( rpc_id, id, source_id, max_bitrate = req.max_bitrate, requested_width = req.requested_width, requested_height = req.requested_height, requested_fps = req.requested_fps, "🎥 capture_video opened" ); debug!(rpc_id, "🎥 streaming {dev}"); } let hub = self .eye_hub( dev, EyeHubKey { source_id, requested_width: req.requested_width, requested_height: req.requested_height, requested_fps: req.requested_fps, }, req.max_bitrate, ) .await?; let mut hub_rx = hub.tx.subscribe(); hub.subscribers.fetch_add(1, Ordering::AcqRel); let subscribers = Arc::clone(&hub.subscribers); let hub_for_task = Arc::clone(&hub); let (tx, rx) = tokio::sync::mpsc::channel(32); tokio::spawn(async move { loop { match hub_rx.recv().await { Ok(mut pkt) => { pkt.id = id; if tx.send(Ok(pkt)).await.is_err() { break; } } Err(broadcast::error::RecvError::Lagged(_)) => continue, Err(broadcast::error::RecvError::Closed) => break, } } if subscribers.fetch_sub(1, Ordering::AcqRel) == 1 { hub_for_task.shutdown(); } }); Ok(Response::new(Box::pin(ReceiverStream::new(rx)))) } async fn eye_hub( &self, dev: &str, key: EyeHubKey, max_bitrate_kbit: u32, ) -> Result, Status> { let stale_hubs = { let mut hubs = self.eye_hubs.lock().await; if let Some(hub) = hubs.get(&key) && hub.running.load(Ordering::Relaxed) { return Ok(Arc::clone(hub)); } take_conflicting_eye_hubs(&mut hubs, key) }; if !stale_hubs.is_empty() { info!( source_id = key.source_id, requested_width = key.requested_width, requested_height = key.requested_height, requested_fps = key.requested_fps, stale_hubs = stale_hubs.len(), "🎥 replacing stale/conflicting eye hubs before opening the source" ); } for hub in stale_hubs { hub.shutdown(); } let lease = self.capture_power.acquire().await; let stream = video::eye_ball_with_request( dev, key.source_id, max_bitrate_kbit, key.requested_width, key.requested_height, key.requested_fps, ) .await .map_err(|e| Status::internal(format!("{e:#}")))?; let hub = EyeHub::spawn(stream, lease); let mut hubs = self.eye_hubs.lock().await; if let Some(existing) = hubs.get(&key) && existing.running.load(Ordering::Relaxed) { hub.shutdown(); return Ok(Arc::clone(existing)); } hubs.insert(key, Arc::clone(&hub)); Ok(hub) } #[cfg(test)] async fn eye_hub_count(&self) -> usize { self.eye_hubs.lock().await.len() } async fn paste_text_reply( &self, req: Request, ) -> Result, Status> { let req = req.into_inner(); let text = paste::decrypt(&req).map_err(|e| Status::unauthenticated(format!("{e}")))?; if let Err(e) = paste::type_text(&self.kb, &hid_endpoint(0), &text).await { return Ok(Response::new(PasteReply { ok: false, error: format!("{e}"), })); } Ok(Response::new(PasteReply { ok: true, error: String::new(), })) } async fn reset_usb_reply(&self) -> Result, Status> { #[cfg(not(coverage))] info!("🔴 explicit ResetUsb() called"); match self.gadget.recover_enumeration() { Ok(_) => { if let Err(e) = self.reopen_hid().await { #[cfg(not(coverage))] error!("💥 reopen HID failed: {e:#}"); return Err(Status::internal(e.to_string())); } if let Err(e) = restart_uvc_helper() { #[cfg(not(coverage))] error!("💥 restart UVC helper failed: {e:#}"); return Err(Status::internal(e.to_string())); } match UsbGadget::current_controller_state() { Ok((ctrl, state)) if UsbGadget::host_enumerated_state(&state) => { #[cfg(not(coverage))] info!( "✅ USB host enumerated gadget after recovery ctrl={ctrl} state={state}" ); } Ok((ctrl, state)) => { let message = format!( "USB gadget recovery ran, but UDC {ctrl} is still {state}; the controlled host has not enumerated the relay HID/audio/video gadget" ); #[cfg(not(coverage))] warn!("⚠️ {message}"); return Err(Status::failed_precondition(message)); } Err(err) => { let message = format!( "USB gadget recovery ran, but the relay cannot read UDC state: {err:#}" ); #[cfg(not(coverage))] warn!("⚠️ {message}"); return Err(Status::failed_precondition(message)); } } Ok(Response::new(ResetUsbReply { ok: true })) } Err(e) => { #[cfg(not(coverage))] error!("💥 USB recovery failed: {e:#}"); let message = format!("{e:#}"); if message.contains("still not attached") || message.contains("not attached") { Err(Status::failed_precondition(message)) } else { Err(Status::internal(message)) } } } } async fn get_capture_power_reply(&self) -> Result, Status> { let state = self .capture_power .snapshot() .await .map_err(|e| Status::internal(format!("{e:#}")))?; Ok(Response::new( self.with_detected_capture_devices(state).await, )) } async fn set_capture_power_reply( &self, req: Request, ) -> Result, Status> { let req = req.into_inner(); let result = match CapturePowerCommand::try_from(req.command) .unwrap_or(CapturePowerCommand::Unspecified) { CapturePowerCommand::Auto => self.capture_power.set_auto().await, CapturePowerCommand::ForceOn => self.capture_power.set_manual(true).await, CapturePowerCommand::ForceOff => self.capture_power.set_manual(false).await, CapturePowerCommand::Unspecified => self.capture_power.set_manual(req.enabled).await, }; let state = result.map_err(|e| Status::internal(format!("{e:#}")))?; Ok(Response::new( self.with_detected_capture_devices(state).await, )) } } fn restart_uvc_helper() -> anyhow::Result<()> { if std::env::var("LESAVKA_GADGET_SYSFS_ROOT").is_ok() || std::env::var("LESAVKA_GADGET_CONFIGFS_ROOT").is_ok() { return Ok(()); } run_systemctl(&["reset-failed", "lesavka-uvc.service"])?; match run_systemctl(&["restart", "lesavka-uvc.service"]) { Ok(()) => Ok(()), Err(err) if uvc_helper_restart_was_dependency_refused(&err.to_string()) => { warn!( "lesavka-uvc.service refused a direct restart because it is dependency-managed; USB reset already cycled the gadget" ); Ok(()) } Err(err) => Err(err), } } fn run_systemctl(args: &[&str]) -> anyhow::Result<()> { let output = Command::new("systemctl") .args(args) .output() .with_context(|| format!("running systemctl {}", args.join(" ")))?; if output.status.success() { return Ok(()); } let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string(); let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string(); anyhow::bail!( "systemctl {} failed: {}{}", args.join(" "), if stderr.is_empty() { stdout.as_str() } else { stderr.as_str() }, if stderr.is_empty() || stdout.is_empty() || stderr == stdout { "" } else { " / also see stdout" } ); } fn uvc_helper_restart_was_dependency_refused(message: &str) -> bool { message.contains("Operation refused") || message.contains("may be requested by dependency only") } impl EyeHub { fn spawn(mut stream: S, lease: lesavka_server::capture_power::CapturePowerLease) -> Arc where S: Stream> + Unpin + Send + 'static, { let (tx, _) = broadcast::channel(32); let running = Arc::new(AtomicBool::new(true)); let subscribers = Arc::new(AtomicUsize::new(0)); let tx_for_task = tx.clone(); let running_for_task = Arc::clone(&running); let subscribers_for_task = Arc::clone(&subscribers); let task = tokio::spawn(async move { let _lease = lease; let mut idle_ticks = 0_u32; while running_for_task.load(Ordering::Relaxed) { match stream.next().await { Some(Ok(pkt)) => { let _ = tx_for_task.send(pkt); if subscribers_for_task.load(Ordering::Relaxed) == 0 { idle_ticks = idle_ticks.saturating_add(1); if idle_ticks >= 60 { break; } } else { idle_ticks = 0; } } Some(Err(err)) => { warn!(?err, "shared eye hub stream error"); break; } None => break, } } running_for_task.store(false, Ordering::Relaxed); }); let hub = Arc::new(Self { tx: tx.clone(), running: Arc::clone(&running), subscribers: Arc::clone(&subscribers), abort: task.abort_handle(), }); hub } fn shutdown(&self) { if self.running.swap(false, Ordering::AcqRel) { self.abort.abort(); } } } fn take_conflicting_eye_hubs( hubs: &mut HashMap>, key: EyeHubKey, ) -> Vec> { let stale_keys: Vec<_> = hubs .iter() .filter_map(|(existing_key, hub)| { let running = hub.running.load(Ordering::Relaxed); let conflicting_source = existing_key.source_id == key.source_id && *existing_key != key; if !running || conflicting_source { Some(*existing_key) } else { None } }) .collect(); stale_keys .into_iter() .filter_map(|existing_key| hubs.remove(&existing_key)) .collect() } /*──────────────── gRPC service ─────────────*/ #[cfg(not(coverage))] #[tonic::async_trait] impl Relay for Handler { /* existing streams ─ unchanged, except: no more auto-reset */ type StreamKeyboardStream = ReceiverStream>; type StreamMouseStream = ReceiverStream>; type CaptureVideoStream = VideoStream; type CaptureAudioStream = AudioStream; type StreamMicrophoneStream = ReceiverStream>; type StreamCameraStream = ReceiverStream>; async fn stream_keyboard( &self, req: Request>, ) -> Result, Status> { let rpc_id = runtime_support::next_stream_id(); info!(rpc_id, "⌨️ stream_keyboard opened"); let (tx, rx) = tokio::sync::mpsc::channel(32); let kb = self.kb.clone(); let ms = self.ms.clone(); let kb_path = hid_endpoint(0); let ms_path = hid_endpoint(1); let gadget = self.gadget.clone(); let did_cycle = self.did_cycle.clone(); let session_lease = self.capture_power.acquire_session().await; let report_delay = live_keyboard_report_delay(); tokio::spawn(async move { let _session_lease = session_lease; let mut s = req.into_inner(); while let Some(pkt) = s.next().await.transpose()? { if let Err(e) = runtime_support::write_hid_report(&kb, &kb_path, &pkt.data).await { if e.raw_os_error() == Some(libc::EAGAIN) { debug!(rpc_id, "⌨️ write would block (dropped)"); } else { warn!(rpc_id, "⌨️ write failed: {e} (dropped)"); runtime_support::recover_hid_if_needed( &e, gadget.clone(), kb.clone(), ms.clone(), kb_path.clone(), ms_path.clone(), did_cycle.clone(), ) .await; } } tx.send(Ok(pkt)).await.ok(); if !report_delay.is_zero() { tokio::time::sleep(report_delay).await; } } info!(rpc_id, "⌨️ stream_keyboard closed"); Ok::<(), Status>(()) }); Ok(Response::new(ReceiverStream::new(rx))) } async fn stream_mouse( &self, req: Request>, ) -> Result, Status> { let rpc_id = runtime_support::next_stream_id(); info!(rpc_id, "🖱️ stream_mouse opened"); let (tx, rx) = tokio::sync::mpsc::channel(1024); let ms = self.ms.clone(); let kb = self.kb.clone(); let kb_path = hid_endpoint(0); let ms_path = hid_endpoint(1); let gadget = self.gadget.clone(); let did_cycle = self.did_cycle.clone(); let session_lease = self.capture_power.acquire_session().await; tokio::spawn(async move { let _session_lease = session_lease; let mut s = req.into_inner(); while let Some(pkt) = s.next().await.transpose()? { if let Err(e) = runtime_support::write_hid_report(&ms, &ms_path, &pkt.data).await { if e.raw_os_error() == Some(libc::EAGAIN) { debug!(rpc_id, "🖱️ write would block (dropped)"); } else { warn!(rpc_id, "🖱️ write failed: {e} (dropped)"); runtime_support::recover_hid_if_needed( &e, gadget.clone(), kb.clone(), ms.clone(), kb_path.clone(), ms_path.clone(), did_cycle.clone(), ) .await; } } tx.send(Ok(pkt)).await.ok(); } info!(rpc_id, "🖱️ stream_mouse closed"); Ok::<(), Status>(()) }); Ok(Response::new(ReceiverStream::new(rx))) } /// Accept synthetic upstream microphone packets without ALSA hardware. async fn stream_microphone( &self, req: Request>, ) -> Result, Status> { let rpc_id = runtime_support::next_stream_id(); info!(rpc_id, "🎤 stream_microphone opened"); // 1 ─ build once, early let uac_dev = std::env::var("LESAVKA_UAC_DEV").unwrap_or_else(|_| "hw:UAC2Gadget,0".into()); info!(%uac_dev, "🎤 stream_microphone using UAC sink"); let mut sink = runtime_support::open_voice_with_retry(&uac_dev) .await .map_err(|e| Status::internal(format!("{e:#}")))?; // 2 ─ dummy outbound stream (same trick as before) let (tx, rx) = tokio::sync::mpsc::channel(1); // 3 ─ drive the sink in a background task tokio::spawn(async move { let mut inbound = req.into_inner(); static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); while let Some(pkt) = inbound.next().await.transpose()? { let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if n < 5 || n % 3_000 == 0 { tracing::info!(rpc_id, "🎤⬇ srv pkt#{n} {} bytes", pkt.data.len()); } sink.push(&pkt); } sink.finish(); // flush on EOS let _ = tx.send(Ok(Empty {})).await; info!(rpc_id, "🎤 stream_microphone closed"); Ok::<(), Status>(()) }); Ok(Response::new(ReceiverStream::new(rx))) } /// Accept synthetic upstream webcam packets without UVC/HDMI hardware. async fn stream_camera( &self, req: Request>, ) -> Result, Status> { let rpc_id = runtime_support::next_stream_id(); let cfg = camera::current_camera_config(); info!( rpc_id, output = cfg.output.as_str(), codec = cfg.codec.as_str(), width = cfg.width, height = cfg.height, fps = cfg.fps, hdmi = cfg.hdmi.as_ref().map(|h| h.name.as_str()).unwrap_or("none"), "🎥 stream_camera output selected" ); let (session_id, relay) = self.camera_rt.activate(&cfg).await?; let camera_rt = self.camera_rt.clone(); info!(rpc_id, session_id, "🎥 stream_camera opened"); // dummy outbound (same pattern as other streams) let (tx, rx) = tokio::sync::mpsc::channel(1); tokio::spawn(async move { let mut s = req.into_inner(); while let Some(pkt) = s.next().await.transpose()? { if !camera_rt.is_active(session_id) { info!(rpc_id, session_id, "🎥 stream_camera session superseded"); break; } relay.feed(pkt); // ← all logging inside video.rs } tx.send(Ok(Empty {})).await.ok(); info!(rpc_id, session_id, "🎥 stream_camera closed"); Ok::<(), Status>(()) }); Ok(Response::new(ReceiverStream::new(rx))) } async fn capture_video( &self, req: Request, ) -> Result, Status> { self.capture_video_reply(req.into_inner()).await } async fn capture_audio( &self, req: Request, ) -> Result, Status> { let rpc_id = runtime_support::next_stream_id(); // Only one speaker stream for now; both 0/1 → same ALSA dev. let _id = req.into_inner().id; // Allow override (`LESAVKA_ALSA_DEV=hw:2,0` for debugging). let dev = std::env::var("LESAVKA_ALSA_DEV").unwrap_or_else(|_| "hw:UAC2Gadget,0".into()); info!(rpc_id, %dev, "🔊 capture_audio opened"); let s = runtime_support::open_ear_with_retry(&dev, 0) .await .map_err(|e| remote_audio_status(format!("{e:#}")))?; Ok(Response::new(Box::pin(s))) } async fn paste_text(&self, req: Request) -> Result, Status> { self.paste_text_reply(req).await } /*────────────── USB-reset RPC ────────────*/ async fn reset_usb(&self, _req: Request) -> Result, Status> { self.reset_usb_reply().await } async fn get_capture_power( &self, _req: Request, ) -> Result, Status> { self.get_capture_power_reply().await } async fn set_capture_power( &self, req: Request, ) -> Result, Status> { self.set_capture_power_reply(req).await } } fn remote_audio_status(message: String) -> Status { if message.contains("remote USB gadget is not attached") { Status::unavailable(message) } else { Status::internal(message) } } #[cfg(coverage)] #[tonic::async_trait] impl Relay for Handler { type StreamKeyboardStream = ReceiverStream>; type StreamMouseStream = ReceiverStream>; type CaptureVideoStream = VideoStream; type CaptureAudioStream = AudioStream; type StreamMicrophoneStream = ReceiverStream>; type StreamCameraStream = ReceiverStream>; async fn stream_keyboard( &self, req: Request>, ) -> Result, Status> { let (tx, rx) = tokio::sync::mpsc::channel(32); let kb = self.kb.clone(); let report_delay = live_keyboard_report_delay(); tokio::spawn(async move { let mut s = req.into_inner(); while let Some(pkt) = s.next().await.transpose()? { let _ = runtime_support::write_hid_report(&kb, &hid_endpoint(0), &pkt.data).await; tx.send(Ok(pkt)).await.ok(); if !report_delay.is_zero() { tokio::time::sleep(report_delay).await; } } Ok::<(), Status>(()) }); Ok(Response::new(ReceiverStream::new(rx))) } async fn stream_mouse( &self, req: Request>, ) -> Result, Status> { let (tx, rx) = tokio::sync::mpsc::channel(32); let ms = self.ms.clone(); tokio::spawn(async move { let mut s = req.into_inner(); while let Some(pkt) = s.next().await.transpose()? { let _ = runtime_support::write_hid_report(&ms, &hid_endpoint(1), &pkt.data).await; tx.send(Ok(pkt)).await.ok(); } Ok::<(), Status>(()) }); Ok(Response::new(ReceiverStream::new(rx))) } async fn stream_microphone( &self, req: Request>, ) -> Result, Status> { let uac_dev = std::env::var("LESAVKA_UAC_DEV").unwrap_or_else(|_| "hw:UAC2Gadget,0".into()); let mut sink = runtime_support::open_voice_with_retry(&uac_dev) .await .map_err(|e| Status::internal(format!("{e:#}")))?; let (tx, rx) = tokio::sync::mpsc::channel(1); tokio::spawn(async move { let mut inbound = req.into_inner(); while let Some(pkt) = inbound.next().await.transpose()? { sink.push(&pkt); } sink.finish(); let _ = tx.send(Ok(Empty {})).await; Ok::<(), Status>(()) }); Ok(Response::new(ReceiverStream::new(rx))) } async fn stream_camera( &self, req: Request>, ) -> Result, Status> { let cfg = camera::current_camera_config(); let (session_id, relay) = self.camera_rt.activate(&cfg).await?; let camera_rt = self.camera_rt.clone(); let (tx, rx) = tokio::sync::mpsc::channel(1); tokio::spawn(async move { let mut s = req.into_inner(); while let Some(pkt) = s.next().await.transpose()? { if !camera_rt.is_active(session_id) { break; } relay.feed(pkt); } tx.send(Ok(Empty {})).await.ok(); Ok::<(), Status>(()) }); Ok(Response::new(ReceiverStream::new(rx))) } async fn capture_video( &self, req: Request, ) -> Result, Status> { self.capture_video_reply(req.into_inner()).await } async fn capture_audio( &self, _req: Request, ) -> Result, Status> { Err(Status::internal( "audio capture unavailable in coverage harness", )) } async fn paste_text(&self, req: Request) -> Result, Status> { self.paste_text_reply(req).await } async fn reset_usb(&self, _req: Request) -> Result, Status> { self.reset_usb_reply().await } async fn get_capture_power( &self, _req: Request, ) -> Result, Status> { self.get_capture_power_reply().await } async fn set_capture_power( &self, req: Request, ) -> Result, Status> { self.set_capture_power_reply(req).await } } /*──────────────── main ───────────────────────*/ #[cfg(not(coverage))] #[tokio::main(worker_threads = 4)] async fn main() -> anyhow::Result<()> { let _guard = init_tracing()?; info!("🚀 {} v{} starting up", PKG_NAME, lesavka_server::VERSION); panic::set_hook(Box::new(|p| { let bt = Backtrace::force_capture(); error!("💥 panic: {p}\n{bt}"); })); let gadget = UsbGadget::new("lesavka"); if std::env::var("LESAVKA_DISABLE_UVC").is_err() { if std::env::var("LESAVKA_UVC_EXTERNAL").is_ok() { info!("📷 UVC control helper external; not spawning"); } else { let bin = uvc_runtime::uvc_ctrl_bin(); tokio::spawn(uvc_runtime::supervise_uvc_control(bin)); } } else { info!("📷 UVC disabled (LESAVKA_DISABLE_UVC set)"); } let handler = Handler::new(gadget.clone()).await?; info!("🌐 lesavka-server listening on 0.0.0.0:50051"); Server::builder() .tcp_nodelay(true) .max_frame_size(Some(2 * 1024 * 1024)) .add_service(RelayServer::new(handler)) .add_service(HandshakeSvc::server()) .add_service(ReflBuilder::configure().build_v1().unwrap()) .serve(([0, 0, 0, 0], 50051).into()) .await?; Ok(()) } #[cfg(coverage)] #[tokio::main(worker_threads = 2)] async fn main() -> anyhow::Result<()> { let gadget = UsbGadget::new("lesavka"); let _handler = Handler::new(gadget).await?; Err(anyhow::anyhow!("coverage mode skips live gRPC serve loop")) }