lesavka/server/src/main.rs

953 lines
34 KiB
Rust
Raw Normal View History

// lesavka-server - gadget cycle guarded by env
2025-06-27 06:56:08 -05:00
// server/src/main.rs
#[allow(clippy::useless_attribute)]
#[forbid(unsafe_code)]
2026-04-20 08:38:26 -03:00
use anyhow::Context;
2025-06-27 06:56:08 -05:00
use futures_util::{Stream, StreamExt};
use std::collections::HashMap;
use std::collections::HashSet;
use std::os::unix::fs::FileTypeExt;
2026-04-20 08:38:26 -03:00
use std::process::Command;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::{backtrace::Backtrace, panic, pin::Pin, sync::Arc, time::Duration};
use tokio::sync::{Mutex, broadcast};
2025-06-27 06:56:08 -05:00
use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::Server;
2025-11-30 16:16:03 -03:00
use tonic::{Request, Response, Status};
use tonic_reflection::server::Builder as ReflBuilder;
2026-04-10 14:05:04 -03:00
use tracing::{debug, error, info, warn};
2025-06-21 05:21:57 -05:00
2025-06-23 07:18:26 -05:00
use lesavka_common::lesavka::{
AudioPacket, CapturePowerCommand, CapturePowerState, Empty, KeyboardReport, MonitorRequest,
MouseReport, PasteReply, PasteRequest, ResetUsbReply, SetCapturePowerRequest, VideoPacket,
relay_server::{Relay, RelayServer},
2025-06-01 16:04:00 -05:00
};
2025-06-02 20:24:00 -05:00
2026-04-10 14:05:04 -03:00
use lesavka_server::{
camera, camera_runtime::CameraRuntime, capture_power::CapturePowerManager, gadget::UsbGadget,
handshake::HandshakeSvc, paste, runtime_support, runtime_support::init_tracing, uvc_runtime,
video,
2026-04-10 14:05:04 -03:00
};
2025-06-27 06:56:08 -05:00
/*──────────────── constants ────────────────*/
2025-07-04 10:17:01 -05:00
const PKG_NAME: &str = env!("CARGO_PKG_NAME");
2026-01-06 11:48:36 -03:00
/// Boxed, pinned, sendable stream of video packets (or gRPC errors); the response type of `capture_video`.
type VideoStream = Pin<Box<dyn Stream<Item = Result<VideoPacket, Status>> + Send>>;
/// Boxed, pinned, sendable stream of audio packets (or gRPC errors); the response type of `capture_audio`.
type AudioStream = Pin<Box<dyn Stream<Item = Result<AudioPacket, Status>> + Send>>;
/// Resolve the filesystem path of HID gadget endpoint `index`.
///
/// When `LESAVKA_HID_DIR` is set (and valid Unicode) the result is
/// `<dir>/hidg<index>`; otherwise it falls back to `/dev/hidg<index>`.
fn hid_endpoint(index: u8) -> String {
    match std::env::var("LESAVKA_HID_DIR") {
        Ok(dir) => format!("{dir}/hidg{index}"),
        Err(_) => format!("/dev/hidg{index}"),
    }
}
/// Pause inserted after each live keyboard report.
///
/// Read from `LESAVKA_LIVE_KEYBOARD_REPORT_DELAY_MS` (whole milliseconds);
/// an unset or unparsable value yields the 8 ms default.
fn live_keyboard_report_delay() -> Duration {
    const DEFAULT_DELAY_MS: u64 = 8;
    let millis = match std::env::var("LESAVKA_LIVE_KEYBOARD_REPORT_DELAY_MS") {
        Ok(raw) => raw.parse::<u64>().unwrap_or(DEFAULT_DELAY_MS),
        Err(_) => DEFAULT_DELAY_MS,
    };
    Duration::from_millis(millis)
}
2025-06-27 06:56:08 -05:00
/*──────────────── Handler ───────────────────*/
2025-06-02 20:41:36 -05:00
struct Handler {
kb: Arc<Mutex<Option<tokio::fs::File>>>,
ms: Arc<Mutex<Option<tokio::fs::File>>>,
2025-06-25 07:46:50 -05:00
gadget: UsbGadget,
2026-01-06 04:38:41 -03:00
did_cycle: Arc<AtomicBool>,
camera_rt: Arc<CameraRuntime>,
capture_power: CapturePowerManager,
eye_hubs: Arc<Mutex<HashMap<EyeHubKey, Arc<EyeHub>>>>,
}
/// Identity of a shared capture hub: the capture source plus the requested
/// frame geometry and rate. Requests with an identical key share one hub.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
struct EyeHubKey {
// Capture source index; `capture_video_reply` only accepts 0 or 1.
source_id: u32,
// Values copied from the `MonitorRequest` fields of the same name.
requested_width: u32,
requested_height: u32,
requested_fps: u32,
}
/// One capture stream fanned out to any number of gRPC subscribers.
struct EyeHub {
// Broadcast side; each subscriber calls `tx.subscribe()`.
tx: broadcast::Sender<VideoPacket>,
// True while the pump task is alive; cleared by the task itself or by `shutdown`.
running: Arc<AtomicBool>,
// Number of forwarder tasks currently attached to this hub.
subscribers: Arc<AtomicUsize>,
// Handle used by `shutdown` to abort the pump task.
abort: tokio::task::AbortHandle,
}
2025-06-24 23:48:06 -05:00
impl Handler {
2025-06-27 06:56:08 -05:00
async fn new(gadget: UsbGadget) -> anyhow::Result<Self> {
#[cfg(not(coverage))]
2026-04-10 14:05:04 -03:00
if runtime_support::allow_gadget_cycle() {
info!("🛠️ Initial USB recovery…");
match UsbGadget::current_controller_state() {
Ok((ctrl, state)) if !UsbGadget::host_enumerated_state(&state) => {
warn!("⚠️ UDC {ctrl} is {state}; forcing gadget recovery before opening HID");
if let Err(error) = gadget.recover_enumeration() {
warn!("⚠️ initial USB recovery did not enumerate the host: {error:#}");
}
}
Ok(_) => {
let _ = gadget.cycle(); // ignore failure - may boot without host
}
Err(error) => {
warn!("⚠️ UDC state unavailable during startup: {error:#}");
let _ = gadget.cycle(); // preserve the old best-effort startup path
}
}
2025-06-26 21:49:29 -05:00
}
#[cfg(not(coverage))]
{
if !runtime_support::allow_gadget_cycle() {
info!(
"🔒 gadget cycle disabled at startup (set LESAVKA_ALLOW_GADGET_CYCLE=1 to enable)"
);
}
info!("🛠️ opening HID endpoints …");
}
let kb_path = hid_endpoint(0);
let ms_path = hid_endpoint(1);
let kb = runtime_support::open_hid_if_ready(&kb_path).await?;
let ms = runtime_support::open_hid_if_ready(&ms_path).await?;
#[cfg(not(coverage))]
if kb.is_some() && ms.is_some() {
info!("✅ HID endpoints ready");
} else {
warn!("⌛ HID endpoints are not ready; relay will keep running and open them lazily");
}
2025-06-26 21:49:29 -05:00
Ok(Self {
kb: Arc::new(Mutex::new(kb)),
ms: Arc::new(Mutex::new(ms)),
gadget,
2026-01-06 04:38:41 -03:00
did_cycle: Arc::new(AtomicBool::new(false)),
camera_rt: Arc::new(CameraRuntime::new()),
capture_power: CapturePowerManager::new(),
eye_hubs: Arc::new(Mutex::new(HashMap::new())),
2025-06-26 21:49:29 -05:00
})
}
2025-07-05 12:45:08 -05:00
/// Re-open both HID endpoints and swap them into the shared slots.
///
/// Both files are opened before either slot is touched, so a failure
/// leaves the previously stored handles in place.
///
/// # Errors
/// Propagates any error returned by `runtime_support::open_hid_if_ready`.
async fn reopen_hid(&self) -> anyhow::Result<()> {
    let kb_new = runtime_support::open_hid_if_ready(&hid_endpoint(0)).await?;
    let ms_new = runtime_support::open_hid_if_ready(&hid_endpoint(1)).await?;
    *self.kb.lock().await = kb_new;
    *self.ms.lock().await = ms_new;
    Ok(())
}
2026-04-20 01:41:57 -03:00
/// Count how many of `/dev/lesavka_l_eye` and `/dev/lesavka_r_eye` currently
/// resolve to a character device (0, 1 or 2).
///
/// Paths that are missing or whose metadata cannot be read count as absent.
fn detected_capture_devices_from_symlinks() -> u32 {
    let mut present = 0_u32;
    for path in ["/dev/lesavka_l_eye", "/dev/lesavka_r_eye"] {
        let is_char_device = match std::fs::metadata(path) {
            Ok(metadata) => metadata.file_type().is_char_device(),
            Err(_) => false,
        };
        if is_char_device {
            present += 1;
        }
    }
    present
}
2026-04-20 01:41:57 -03:00
fn detected_capture_devices_from_udev() -> u32 {
let Ok(mut enumerator) = udev::Enumerator::new() else {
return 0;
};
let _ = enumerator.match_subsystem("video4linux");
let Ok(devices) = enumerator.scan_devices() else {
return 0;
};
devices
.filter(|device| {
device
.attribute_value("index")
.and_then(|value| value.to_str())
== Some("0")
&& device
.property_value("ID_VENDOR_ID")
.and_then(|value| value.to_str())
== Some("07ca")
&& device
.property_value("ID_MODEL_ID")
.and_then(|value| value.to_str())
== Some("3311")
})
.count()
.min(2) as u32
}
async fn active_eye_source_count(&self) -> u32 {
self.eye_hubs
.lock()
.await
.iter()
.filter_map(|(key, hub)| hub.running.load(Ordering::Relaxed).then_some(key.source_id))
2026-04-20 01:41:57 -03:00
.collect::<HashSet<_>>()
.len()
.min(2) as u32
}
async fn with_detected_capture_devices(
&self,
mut state: CapturePowerState,
) -> CapturePowerState {
2026-04-20 01:41:57 -03:00
state.detected_devices = Self::detected_capture_devices_from_udev()
.max(Self::detected_capture_devices_from_symlinks())
.max(self.active_eye_source_count().await);
state
}
/// Open (or join) the shared capture hub for the requested source and return
/// a per-client stream of its packets, re-tagged with the requested monitor id.
///
/// `req.id` must be 0 or 1; `req.source_id` defaults to `req.id` and must
/// also be 0 or 1.
///
/// The forwarder task ends — and with it the client stream — when the client
/// goes away, when the broadcast channel closes, or when the hub is no longer
/// running. The last forwarder to leave shuts the hub down.
///
/// # Errors
/// `invalid_argument` for an out-of-range id, or whatever `eye_hub` returns.
async fn capture_video_reply(
    &self,
    req: MonitorRequest,
) -> Result<Response<VideoStream>, Status> {
    // How long a forwarder waits without a frame before re-checking hub liveness.
    const HUB_LIVENESS_POLL: Duration = Duration::from_millis(500);

    let id = req.id;
    if id > 1 {
        return Err(Status::invalid_argument("monitor id must be 0 or 1"));
    }
    let source_id = req.source_id.unwrap_or(id);
    let dev = match source_id {
        0 => "/dev/lesavka_l_eye",
        1 => "/dev/lesavka_r_eye",
        _ => return Err(Status::invalid_argument("source id must be 0 or 1")),
    };

    #[cfg(not(coverage))]
    {
        let rpc_id = runtime_support::next_stream_id();
        info!(
            rpc_id,
            id,
            source_id,
            max_bitrate = req.max_bitrate,
            requested_width = req.requested_width,
            requested_height = req.requested_height,
            requested_fps = req.requested_fps,
            "🎥 capture_video opened"
        );
        debug!(rpc_id, "🎥 streaming {dev}");
    }

    let hub = self
        .eye_hub(
            dev,
            EyeHubKey {
                source_id,
                requested_width: req.requested_width,
                requested_height: req.requested_height,
                requested_fps: req.requested_fps,
            },
            req.max_bitrate,
        )
        .await?;
    let mut hub_rx = hub.tx.subscribe();
    hub.subscribers.fetch_add(1, Ordering::AcqRel);

    let (tx, rx) = tokio::sync::mpsc::channel(32);
    tokio::spawn(async move {
        loop {
            tokio::select! {
                // Client dropped the response stream: stop now instead of waiting
                // for the next frame to discover it.
                _ = tx.closed() => break,
                received = hub_rx.recv() => match received {
                    Ok(mut pkt) => {
                        pkt.id = id;
                        if tx.send(Ok(pkt)).await.is_err() {
                            break;
                        }
                    }
                    Err(broadcast::error::RecvError::Lagged(_)) => continue,
                    Err(broadcast::error::RecvError::Closed) => break,
                },
                // This task keeps the hub (and therefore a broadcast sender) alive,
                // so `Closed` cannot be relied on to signal a dead hub; poll the
                // `running` flag whenever frames stop arriving.
                _ = tokio::time::sleep(HUB_LIVENESS_POLL) => {
                    if !hub.running.load(Ordering::Acquire) {
                        break;
                    }
                }
            }
        }
        // Last subscriber out stops the capture pump.
        if hub.subscribers.fetch_sub(1, Ordering::AcqRel) == 1 {
            hub.shutdown();
        }
    });
    Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
}
/// Return the running hub for `key`, creating one when none exists.
///
/// Steps, in order:
/// 1. Under the registry lock: hand back a running hub with this exact key,
///    otherwise remove every stopped hub and every hub on the same source
///    with a different key.
/// 2. Outside the lock: shut the removed hubs down, acquire a capture-power
///    lease and open the video source.
/// 3. Under the lock again: if a running hub for `key` appeared meanwhile,
///    discard the new one and use that; otherwise register the new hub.
///
/// # Errors
/// `Status::internal` when the video source cannot be opened.
async fn eye_hub(
    &self,
    dev: &str,
    key: EyeHubKey,
    max_bitrate_kbit: u32,
) -> Result<Arc<EyeHub>, Status> {
    let evicted = {
        let mut registry = self.eye_hubs.lock().await;
        let reusable = registry
            .get(&key)
            .filter(|hub| hub.running.load(Ordering::Relaxed))
            .map(Arc::clone);
        if let Some(hub) = reusable {
            return Ok(hub);
        }
        take_conflicting_eye_hubs(&mut registry, key)
    };

    if !evicted.is_empty() {
        info!(
            source_id = key.source_id,
            requested_width = key.requested_width,
            requested_height = key.requested_height,
            requested_fps = key.requested_fps,
            stale_hubs = evicted.len(),
            "🎥 replacing stale/conflicting eye hubs before opening the source"
        );
    }
    evicted.into_iter().for_each(|hub| hub.shutdown());

    let lease = self.capture_power.acquire().await;
    let opened = video::eye_ball_with_request(
        dev,
        key.source_id,
        max_bitrate_kbit,
        key.requested_width,
        key.requested_height,
        key.requested_fps,
    )
    .await;
    let stream = match opened {
        Ok(stream) => stream,
        Err(e) => return Err(Status::internal(format!("{e:#}"))),
    };
    let fresh = EyeHub::spawn(stream, lease);

    let mut registry = self.eye_hubs.lock().await;
    let winner = registry
        .get(&key)
        .filter(|hub| hub.running.load(Ordering::Relaxed))
        .map(Arc::clone);
    if let Some(existing) = winner {
        // Another caller registered a hub for this key while the source was opening.
        fresh.shutdown();
        return Ok(existing);
    }
    registry.insert(key, Arc::clone(&fresh));
    Ok(fresh)
}
/// Test helper: number of hubs currently registered (running or not).
#[cfg(test)]
async fn eye_hub_count(&self) -> usize {
    let registry = self.eye_hubs.lock().await;
    registry.len()
}
/// Decrypt a paste request and type its text through the keyboard endpoint.
///
/// A typing failure is reported inside the reply (`ok: false` plus the error
/// text), not as a gRPC error.
///
/// # Errors
/// `Status::unauthenticated` when the payload cannot be decrypted.
async fn paste_text_reply(
    &self,
    req: Request<PasteRequest>,
) -> Result<Response<PasteReply>, Status> {
    let request = req.into_inner();
    let text = match paste::decrypt(&request) {
        Ok(text) => text,
        Err(e) => return Err(Status::unauthenticated(format!("{e}"))),
    };
    let reply = match paste::type_text(&self.kb, &hid_endpoint(0), &text).await {
        Ok(_) => PasteReply {
            ok: true,
            error: String::new(),
        },
        Err(e) => PasteReply {
            ok: false,
            error: format!("{e}"),
        },
    };
    Ok(Response::new(reply))
}
async fn reset_usb_reply(&self) -> Result<Response<ResetUsbReply>, Status> {
#[cfg(not(coverage))]
info!("🔴 explicit ResetUsb() called");
match self.gadget.recover_enumeration() {
Ok(_) => {
if let Err(e) = self.reopen_hid().await {
#[cfg(not(coverage))]
error!("💥 reopen HID failed: {e:#}");
return Err(Status::internal(e.to_string()));
}
2026-04-20 08:38:26 -03:00
if let Err(e) = restart_uvc_helper() {
#[cfg(not(coverage))]
error!("💥 restart UVC helper failed: {e:#}");
return Err(Status::internal(e.to_string()));
}
2026-04-21 17:55:26 -03:00
match UsbGadget::current_controller_state() {
Ok((ctrl, state)) if UsbGadget::host_enumerated_state(&state) => {
2026-04-21 17:55:26 -03:00
#[cfg(not(coverage))]
info!(
"✅ USB host enumerated gadget after recovery ctrl={ctrl} state={state}"
);
}
Ok((ctrl, state)) => {
let message = format!(
"USB gadget recovery ran, but UDC {ctrl} is still {state}; the controlled host has not enumerated the relay HID/audio/video gadget"
);
#[cfg(not(coverage))]
warn!("⚠️ {message}");
return Err(Status::failed_precondition(message));
}
Err(err) => {
let message = format!(
"USB gadget recovery ran, but the relay cannot read UDC state: {err:#}"
);
#[cfg(not(coverage))]
warn!("⚠️ {message}");
return Err(Status::failed_precondition(message));
}
}
Ok(Response::new(ResetUsbReply { ok: true }))
}
Err(e) => {
#[cfg(not(coverage))]
error!("💥 USB recovery failed: {e:#}");
let message = format!("{e:#}");
if message.contains("still not attached") || message.contains("not attached") {
Err(Status::failed_precondition(message))
} else {
Err(Status::internal(message))
}
}
}
}
async fn get_capture_power_reply(&self) -> Result<Response<CapturePowerState>, Status> {
2026-04-20 01:41:57 -03:00
let state = self
.capture_power
.snapshot()
.await
2026-04-20 01:41:57 -03:00
.map_err(|e| Status::internal(format!("{e:#}")))?;
Ok(Response::new(
self.with_detected_capture_devices(state).await,
))
}
async fn set_capture_power_reply(
&self,
req: Request<SetCapturePowerRequest>,
) -> Result<Response<CapturePowerState>, Status> {
let req = req.into_inner();
let result = match CapturePowerCommand::try_from(req.command)
.unwrap_or(CapturePowerCommand::Unspecified)
{
CapturePowerCommand::Auto => self.capture_power.set_auto().await,
CapturePowerCommand::ForceOn => self.capture_power.set_manual(true).await,
CapturePowerCommand::ForceOff => self.capture_power.set_manual(false).await,
CapturePowerCommand::Unspecified => self.capture_power.set_manual(req.enabled).await,
};
2026-04-20 01:41:57 -03:00
let state = result.map_err(|e| Status::internal(format!("{e:#}")))?;
Ok(Response::new(
self.with_detected_capture_devices(state).await,
))
}
}
2026-04-20 08:38:26 -03:00
/// Restart `lesavka-uvc.service` through `systemctl`.
///
/// Does nothing when `LESAVKA_GADGET_SYSFS_ROOT` or
/// `LESAVKA_GADGET_CONFIGFS_ROOT` is set. A restart that systemd refuses
/// because the unit is dependency-managed is logged and treated as success.
///
/// # Errors
/// Any other `systemctl` failure, including a failing `reset-failed`.
fn restart_uvc_helper() -> anyhow::Result<()> {
    if std::env::var("LESAVKA_GADGET_SYSFS_ROOT").is_ok()
        || std::env::var("LESAVKA_GADGET_CONFIGFS_ROOT").is_ok()
    {
        return Ok(());
    }
    run_systemctl(&["reset-failed", "lesavka-uvc.service"])?;
    match run_systemctl(&["restart", "lesavka-uvc.service"]) {
        Ok(()) => Ok(()),
        Err(err) if uvc_helper_restart_was_dependency_refused(&err.to_string()) => {
            warn!(
                "lesavka-uvc.service refused a direct restart because it is dependency-managed; USB reset already cycled the gadget"
            );
            Ok(())
        }
        Err(err) => Err(err),
    }
}
/// Run `systemctl <args>` and wait for it to finish.
///
/// # Errors
/// * The process could not be started (with a "running systemctl …" context).
/// * It exited unsuccessfully; the message carries stderr, or stdout when
///   stderr is empty, plus a hint when both hold different non-empty text.
fn run_systemctl(args: &[&str]) -> anyhow::Result<()> {
    let command_line = args.join(" ");
    let output = Command::new("systemctl")
        .args(args)
        .output()
        .with_context(|| format!("running systemctl {}", command_line))?;
    if output.status.success() {
        return Ok(());
    }

    let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
    let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string();
    let (primary, hint) = if stderr.is_empty() {
        (stdout.as_str(), "")
    } else if stdout.is_empty() || stderr == stdout {
        (stderr.as_str(), "")
    } else {
        (stderr.as_str(), " / also see stdout")
    };
    Err(anyhow::anyhow!(
        "systemctl {} failed: {}{}",
        command_line,
        primary,
        hint
    ))
}
/// Whether a `systemctl restart` error message indicates that the restart was
/// refused, i.e. it contains "Operation refused" or
/// "may be requested by dependency only".
fn uvc_helper_restart_was_dependency_refused(message: &str) -> bool {
    message.contains("Operation refused") || message.contains("may be requested by dependency only")
}
impl EyeHub {
fn spawn<S>(mut stream: S, lease: lesavka_server::capture_power::CapturePowerLease) -> Arc<Self>
where
S: Stream<Item = Result<VideoPacket, Status>> + Unpin + Send + 'static,
{
let (tx, _) = broadcast::channel(32);
let running = Arc::new(AtomicBool::new(true));
let subscribers = Arc::new(AtomicUsize::new(0));
let tx_for_task = tx.clone();
let running_for_task = Arc::clone(&running);
let subscribers_for_task = Arc::clone(&subscribers);
let task = tokio::spawn(async move {
let _lease = lease;
let mut idle_ticks = 0_u32;
while running_for_task.load(Ordering::Relaxed) {
match stream.next().await {
Some(Ok(pkt)) => {
let _ = tx_for_task.send(pkt);
if subscribers_for_task.load(Ordering::Relaxed) == 0 {
idle_ticks = idle_ticks.saturating_add(1);
if idle_ticks >= 60 {
break;
}
} else {
idle_ticks = 0;
}
}
Some(Err(err)) => {
warn!(?err, "shared eye hub stream error");
break;
}
None => break,
}
}
running_for_task.store(false, Ordering::Relaxed);
});
let hub = Arc::new(Self {
tx: tx.clone(),
running: Arc::clone(&running),
subscribers: Arc::clone(&subscribers),
abort: task.abort_handle(),
});
hub
}
fn shutdown(&self) {
if self.running.swap(false, Ordering::AcqRel) {
self.abort.abort();
}
}
}
/// Remove from `hubs`, and return, every hub that must go before a hub for
/// `key` is opened: hubs that are no longer running, and hubs on the same
/// source whose key differs from `key`.
///
/// The returned hubs are not shut down here; the caller does that.
fn take_conflicting_eye_hubs(
    hubs: &mut HashMap<EyeHubKey, Arc<EyeHub>>,
    key: EyeHubKey,
) -> Vec<Arc<EyeHub>> {
    let mut removed = Vec::new();
    hubs.retain(|existing_key, hub| {
        let running = hub.running.load(Ordering::Relaxed);
        let conflicting_source = existing_key.source_id == key.source_id && *existing_key != key;
        let keep = running && !conflicting_source;
        if !keep {
            removed.push(Arc::clone(hub));
        }
        keep
    });
    removed
}
2025-06-01 13:31:22 -05:00
2025-06-27 06:56:08 -05:00
/*──────────────── gRPC service ─────────────*/
#[cfg(not(coverage))]
2025-06-02 20:41:36 -05:00
#[tonic::async_trait]
2025-06-16 22:14:52 -05:00
impl Relay for Handler {
2025-06-28 15:45:11 -05:00
/* existing streams ─ unchanged, except: no more auto-reset */
2025-11-30 16:16:03 -03:00
type StreamKeyboardStream = ReceiverStream<Result<KeyboardReport, Status>>;
type StreamMouseStream = ReceiverStream<Result<MouseReport, Status>>;
type CaptureVideoStream = VideoStream;
type CaptureAudioStream = AudioStream;
2025-06-30 19:35:38 -05:00
type StreamMicrophoneStream = ReceiverStream<Result<Empty, Status>>;
2025-07-03 08:19:59 -05:00
type StreamCameraStream = ReceiverStream<Result<Empty, Status>>;
2025-06-01 13:31:22 -05:00
2025-06-17 08:17:23 -05:00
async fn stream_keyboard(
2025-06-01 13:31:22 -05:00
&self,
2025-06-17 08:17:23 -05:00
req: Request<tonic::Streaming<KeyboardReport>>,
) -> Result<Response<Self::StreamKeyboardStream>, Status> {
2026-04-10 14:05:04 -03:00
let rpc_id = runtime_support::next_stream_id();
info!(rpc_id, "⌨️ stream_keyboard opened");
2025-06-27 06:56:08 -05:00
let (tx, rx) = tokio::sync::mpsc::channel(32);
2025-06-12 01:57:08 -05:00
let kb = self.kb.clone();
2026-01-06 04:38:41 -03:00
let ms = self.ms.clone();
let kb_path = hid_endpoint(0);
let ms_path = hid_endpoint(1);
2026-01-06 04:38:41 -03:00
let gadget = self.gadget.clone();
let did_cycle = self.did_cycle.clone();
let session_lease = self.capture_power.acquire_session().await;
let report_delay = live_keyboard_report_delay();
2025-06-02 20:41:36 -05:00
tokio::spawn(async move {
let _session_lease = session_lease;
2025-06-17 08:17:23 -05:00
let mut s = req.into_inner();
2025-06-17 20:54:31 -05:00
while let Some(pkt) = s.next().await.transpose()? {
if let Err(e) = runtime_support::write_hid_report(&kb, &kb_path, &pkt.data).await {
if e.raw_os_error() == Some(libc::EAGAIN) {
debug!(rpc_id, "⌨️ write would block (dropped)");
} else {
warn!(rpc_id, "⌨️ write failed: {e} (dropped)");
2026-04-10 14:05:04 -03:00
runtime_support::recover_hid_if_needed(
&e,
gadget.clone(),
kb.clone(),
ms.clone(),
kb_path.clone(),
ms_path.clone(),
did_cycle.clone(),
)
.await;
}
2025-06-25 15:13:49 -05:00
}
2025-06-27 06:56:08 -05:00
tx.send(Ok(pkt)).await.ok();
if !report_delay.is_zero() {
tokio::time::sleep(report_delay).await;
}
2025-06-17 08:17:23 -05:00
}
info!(rpc_id, "⌨️ stream_keyboard closed");
2025-06-17 08:17:23 -05:00
Ok::<(), Status>(())
});
2025-06-27 06:56:08 -05:00
2025-06-26 15:12:23 -05:00
Ok(Response::new(ReceiverStream::new(rx)))
2025-06-17 08:17:23 -05:00
}
2025-06-16 19:19:14 -05:00
2025-06-17 08:17:23 -05:00
async fn stream_mouse(
&self,
req: Request<tonic::Streaming<MouseReport>>,
) -> Result<Response<Self::StreamMouseStream>, Status> {
2026-04-10 14:05:04 -03:00
let rpc_id = runtime_support::next_stream_id();
info!(rpc_id, "🖱️ stream_mouse opened");
2025-06-28 15:45:11 -05:00
let (tx, rx) = tokio::sync::mpsc::channel(1024);
2025-06-17 08:17:23 -05:00
let ms = self.ms.clone();
2026-01-06 04:38:41 -03:00
let kb = self.kb.clone();
let kb_path = hid_endpoint(0);
let ms_path = hid_endpoint(1);
2026-01-06 04:38:41 -03:00
let gadget = self.gadget.clone();
let did_cycle = self.did_cycle.clone();
let session_lease = self.capture_power.acquire_session().await;
2025-06-16 19:19:14 -05:00
2025-06-17 08:17:23 -05:00
tokio::spawn(async move {
let _session_lease = session_lease;
2025-06-17 08:17:23 -05:00
let mut s = req.into_inner();
2025-06-17 20:54:31 -05:00
while let Some(pkt) = s.next().await.transpose()? {
if let Err(e) = runtime_support::write_hid_report(&ms, &ms_path, &pkt.data).await {
if e.raw_os_error() == Some(libc::EAGAIN) {
debug!(rpc_id, "🖱️ write would block (dropped)");
} else {
warn!(rpc_id, "🖱️ write failed: {e} (dropped)");
2026-04-10 14:05:04 -03:00
runtime_support::recover_hid_if_needed(
&e,
gadget.clone(),
kb.clone(),
ms.clone(),
kb_path.clone(),
ms_path.clone(),
did_cycle.clone(),
)
.await;
}
2025-06-26 20:38:55 -05:00
}
2025-06-27 06:56:08 -05:00
tx.send(Ok(pkt)).await.ok();
2025-06-01 13:31:22 -05:00
}
info!(rpc_id, "🖱️ stream_mouse closed");
2025-06-17 08:17:23 -05:00
Ok::<(), Status>(())
2025-06-01 13:31:22 -05:00
});
2025-06-27 06:56:08 -05:00
2025-06-26 15:12:23 -05:00
Ok(Response::new(ReceiverStream::new(rx)))
2025-06-01 13:31:22 -05:00
}
2025-06-21 05:21:57 -05:00
2025-06-30 19:35:38 -05:00
async fn stream_microphone(
&self,
req: Request<tonic::Streaming<AudioPacket>>,
) -> Result<Response<Self::StreamMicrophoneStream>, Status> {
2026-04-10 14:05:04 -03:00
let rpc_id = runtime_support::next_stream_id();
info!(rpc_id, "🎤 stream_microphone opened");
2025-07-01 17:30:34 -05:00
// 1 ─ build once, early
2025-12-01 03:34:01 -03:00
let uac_dev = std::env::var("LESAVKA_UAC_DEV").unwrap_or_else(|_| "hw:UAC2Gadget,0".into());
info!(%uac_dev, "🎤 stream_microphone using UAC sink");
2026-04-10 14:05:04 -03:00
let mut sink = runtime_support::open_voice_with_retry(&uac_dev)
2025-11-30 16:16:03 -03:00
.await
2025-06-30 19:35:38 -05:00
.map_err(|e| Status::internal(format!("{e:#}")))?;
2025-07-01 17:30:34 -05:00
// 2 ─ dummy outbound stream (same trick as before)
2025-06-30 19:35:38 -05:00
let (tx, rx) = tokio::sync::mpsc::channel(1);
2025-07-01 12:11:52 -05:00
2025-07-01 17:30:34 -05:00
// 3 ─ drive the sink in a background task
2025-06-30 19:35:38 -05:00
tokio::spawn(async move {
let mut inbound = req.into_inner();
2025-11-30 16:16:03 -03:00
static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
2025-07-01 17:30:34 -05:00
2025-06-30 19:35:38 -05:00
while let Some(pkt) = inbound.next().await.transpose()? {
2025-07-01 10:23:51 -05:00
let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n < 10 || n % 300 == 0 {
tracing::trace!(rpc_id, "🎤⬇ srv pkt#{n} {} bytes", pkt.data.len());
2025-06-30 19:35:38 -05:00
}
2025-07-01 17:30:34 -05:00
sink.push(&pkt);
2025-06-30 19:35:38 -05:00
}
2025-11-30 16:16:03 -03:00
sink.finish(); // flush on EOS
2025-06-30 19:35:38 -05:00
let _ = tx.send(Ok(Empty {})).await;
info!(rpc_id, "🎤 stream_microphone closed");
2025-07-01 17:30:34 -05:00
Ok::<(), Status>(())
2025-06-30 19:35:38 -05:00
});
2025-07-01 17:30:34 -05:00
2025-06-30 19:35:38 -05:00
Ok(Response::new(ReceiverStream::new(rx)))
}
2025-07-03 08:19:59 -05:00
async fn stream_camera(
&self,
2025-07-03 09:24:57 -05:00
req: Request<tonic::Streaming<VideoPacket>>,
2025-07-03 08:19:59 -05:00
) -> Result<Response<Self::StreamCameraStream>, Status> {
2026-04-10 14:05:04 -03:00
let rpc_id = runtime_support::next_stream_id();
2026-01-28 17:52:00 -03:00
let cfg = camera::current_camera_config();
info!(
rpc_id,
2026-01-28 17:52:00 -03:00
output = cfg.output.as_str(),
codec = cfg.codec.as_str(),
width = cfg.width,
height = cfg.height,
fps = cfg.fps,
hdmi = cfg.hdmi.as_ref().map(|h| h.name.as_str()).unwrap_or("none"),
"🎥 stream_camera output selected"
);
let (session_id, relay) = self.camera_rt.activate(&cfg).await?;
let camera_rt = self.camera_rt.clone();
info!(rpc_id, session_id, "🎥 stream_camera opened");
2025-11-30 16:16:03 -03:00
2025-07-03 09:24:57 -05:00
// dummy outbound (same pattern as other streams)
2025-07-03 08:19:59 -05:00
let (tx, rx) = tokio::sync::mpsc::channel(1);
2025-11-30 16:16:03 -03:00
2025-07-03 09:24:57 -05:00
tokio::spawn(async move {
let mut s = req.into_inner();
while let Some(pkt) = s.next().await.transpose()? {
if !camera_rt.is_active(session_id) {
info!(rpc_id, session_id, "🎥 stream_camera session superseded");
break;
}
2025-11-30 16:16:03 -03:00
relay.feed(pkt); // ← all logging inside video.rs
2025-07-03 09:24:57 -05:00
}
tx.send(Ok(Empty {})).await.ok();
info!(rpc_id, session_id, "🎥 stream_camera closed");
2025-07-03 09:24:57 -05:00
Ok::<(), Status>(())
});
2025-11-30 16:16:03 -03:00
2025-07-03 08:19:59 -05:00
Ok(Response::new(ReceiverStream::new(rx)))
}
2025-06-21 05:21:57 -05:00
/// gRPC entry point for video capture; delegates to `capture_video_reply`.
async fn capture_video(
    &self,
    req: Request<MonitorRequest>,
) -> Result<Response<Self::CaptureVideoStream>, Status> {
    self.capture_video_reply(req.into_inner()).await
}
2025-06-21 05:21:57 -05:00
2025-06-29 03:46:34 -05:00
async fn capture_audio(
&self,
req: Request<MonitorRequest>,
) -> Result<Response<Self::CaptureAudioStream>, Status> {
2026-04-10 14:05:04 -03:00
let rpc_id = runtime_support::next_stream_id();
2025-06-29 22:57:54 -05:00
// Only one speaker stream for now; both 0/1 → same ALSA dev.
2025-11-30 16:16:03 -03:00
let _id = req.into_inner().id;
2025-06-29 22:57:54 -05:00
// Allow override (`LESAVKA_ALSA_DEV=hw:2,0` for debugging).
2025-11-30 16:16:03 -03:00
let dev = std::env::var("LESAVKA_ALSA_DEV").unwrap_or_else(|_| "hw:UAC2Gadget,0".into());
info!(rpc_id, %dev, "🔊 capture_audio opened");
2025-06-29 22:57:54 -05:00
let s = runtime_support::open_ear_with_retry(&dev, 0)
2025-06-29 22:57:54 -05:00
.await
2026-04-21 13:31:49 -03:00
.map_err(|e| remote_audio_status(format!("{e:#}")))?;
2025-06-29 22:57:54 -05:00
Ok(Response::new(Box::pin(s)))
2025-06-29 03:46:34 -05:00
}
/// gRPC entry point for typing pasted text; delegates to `paste_text_reply`.
async fn paste_text(&self, req: Request<PasteRequest>) -> Result<Response<PasteReply>, Status> {
self.paste_text_reply(req).await
}
2025-07-01 12:11:52 -05:00
/*────────────── USB-reset RPC ────────────*/
2025-11-30 16:16:03 -03:00
/// gRPC entry point for an explicit USB reset; the request carries no data and
/// the work is delegated to `reset_usb_reply`.
async fn reset_usb(&self, _req: Request<Empty>) -> Result<Response<ResetUsbReply>, Status> {
self.reset_usb_reply().await
}
/// gRPC entry point for reading the capture-power state; delegates to
/// `get_capture_power_reply`.
async fn get_capture_power(
&self,
_req: Request<Empty>,
) -> Result<Response<CapturePowerState>, Status> {
self.get_capture_power_reply().await
}
/// gRPC entry point for changing the capture-power mode; delegates to
/// `set_capture_power_reply`.
async fn set_capture_power(
    &self,
    req: Request<SetCapturePowerRequest>,
) -> Result<Response<CapturePowerState>, Status> {
    self.set_capture_power_reply(req).await
}
2025-06-01 13:31:22 -05:00
}
2026-04-21 13:31:49 -03:00
/// Turn an audio-open failure message into a gRPC status: `unavailable` when
/// the message says the remote USB gadget is not attached, `internal` otherwise.
fn remote_audio_status(message: String) -> Status {
    if !message.contains("remote USB gadget is not attached") {
        return Status::internal(message);
    }
    Status::unavailable(message)
}
/// Reduced `Relay` implementation compiled for coverage builds: HID streams
/// write and echo without logging or recovery, hardware-backed audio/camera
/// RPCs are rejected, and the remaining RPCs use the shared `*_reply` helpers.
#[cfg(coverage)]
#[tonic::async_trait]
impl Relay for Handler {
    type StreamKeyboardStream = ReceiverStream<Result<KeyboardReport, Status>>;
    type StreamMouseStream = ReceiverStream<Result<MouseReport, Status>>;
    type CaptureVideoStream = VideoStream;
    type CaptureAudioStream = AudioStream;
    type StreamMicrophoneStream = ReceiverStream<Result<Empty, Status>>;
    type StreamCameraStream = ReceiverStream<Result<Empty, Status>>;

    /// Write each keyboard report (ignoring write errors), echo it, then
    /// apply the configured inter-report delay.
    async fn stream_keyboard(
        &self,
        req: Request<tonic::Streaming<KeyboardReport>>,
    ) -> Result<Response<Self::StreamKeyboardStream>, Status> {
        let (echo_tx, echo_rx) = tokio::sync::mpsc::channel(32);
        let keyboard = self.kb.clone();
        let report_delay = live_keyboard_report_delay();
        tokio::spawn(async move {
            let mut inbound = req.into_inner();
            while let Some(report) = inbound.next().await.transpose()? {
                let endpoint = hid_endpoint(0);
                let _ = runtime_support::write_hid_report(&keyboard, &endpoint, &report.data).await;
                echo_tx.send(Ok(report)).await.ok();
                if !report_delay.is_zero() {
                    tokio::time::sleep(report_delay).await;
                }
            }
            Ok::<(), Status>(())
        });
        Ok(Response::new(ReceiverStream::new(echo_rx)))
    }

    /// Write each mouse report (ignoring write errors) and echo it.
    async fn stream_mouse(
        &self,
        req: Request<tonic::Streaming<MouseReport>>,
    ) -> Result<Response<Self::StreamMouseStream>, Status> {
        let (echo_tx, echo_rx) = tokio::sync::mpsc::channel(32);
        let mouse = self.ms.clone();
        tokio::spawn(async move {
            let mut inbound = req.into_inner();
            while let Some(report) = inbound.next().await.transpose()? {
                let endpoint = hid_endpoint(1);
                let _ = runtime_support::write_hid_report(&mouse, &endpoint, &report.data).await;
                echo_tx.send(Ok(report)).await.ok();
            }
            Ok::<(), Status>(())
        });
        Ok(Response::new(ReceiverStream::new(echo_rx)))
    }

    /// Always rejected in coverage builds.
    async fn stream_microphone(
        &self,
        _req: Request<tonic::Streaming<AudioPacket>>,
    ) -> Result<Response<Self::StreamMicrophoneStream>, Status> {
        let status = Status::internal("microphone sink unavailable in coverage harness");
        Err(status)
    }

    /// Always rejected in coverage builds.
    async fn stream_camera(
        &self,
        _req: Request<tonic::Streaming<VideoPacket>>,
    ) -> Result<Response<Self::StreamCameraStream>, Status> {
        let status = Status::internal("camera stream unavailable in coverage harness");
        Err(status)
    }

    /// Delegates to `capture_video_reply`.
    async fn capture_video(
        &self,
        req: Request<MonitorRequest>,
    ) -> Result<Response<Self::CaptureVideoStream>, Status> {
        let request = req.into_inner();
        self.capture_video_reply(request).await
    }

    /// Always rejected in coverage builds.
    async fn capture_audio(
        &self,
        _req: Request<MonitorRequest>,
    ) -> Result<Response<Self::CaptureAudioStream>, Status> {
        let status = Status::internal("audio capture unavailable in coverage harness");
        Err(status)
    }

    /// Delegates to `paste_text_reply`.
    async fn paste_text(&self, req: Request<PasteRequest>) -> Result<Response<PasteReply>, Status> {
        self.paste_text_reply(req).await
    }

    /// Delegates to `reset_usb_reply`.
    async fn reset_usb(&self, _req: Request<Empty>) -> Result<Response<ResetUsbReply>, Status> {
        self.reset_usb_reply().await
    }

    /// Delegates to `get_capture_power_reply`.
    async fn get_capture_power(
        &self,
        _req: Request<Empty>,
    ) -> Result<Response<CapturePowerState>, Status> {
        self.get_capture_power_reply().await
    }

    /// Delegates to `set_capture_power_reply`.
    async fn set_capture_power(
        &self,
        req: Request<SetCapturePowerRequest>,
    ) -> Result<Response<CapturePowerState>, Status> {
        self.set_capture_power_reply(req).await
    }
}
2025-06-27 06:56:08 -05:00
/*──────────────── main ───────────────────────*/
#[cfg(not(coverage))]
2025-06-27 06:56:08 -05:00
#[tokio::main(worker_threads = 4)]
2025-06-02 20:41:36 -05:00
async fn main() -> anyhow::Result<()> {
2025-06-27 06:56:08 -05:00
let _guard = init_tracing()?;
info!("🚀 {} v{} starting up", PKG_NAME, lesavka_server::VERSION);
2025-06-05 22:44:27 -05:00
2025-06-25 20:00:34 -05:00
panic::set_hook(Box::new(|p| {
let bt = Backtrace::force_capture();
error!("💥 panic: {p}\n{bt}");
}));
2025-11-30 16:16:03 -03:00
let gadget = UsbGadget::new("lesavka");
2026-01-06 11:48:36 -03:00
if std::env::var("LESAVKA_DISABLE_UVC").is_err() {
2026-01-08 00:59:14 -03:00
if std::env::var("LESAVKA_UVC_EXTERNAL").is_ok() {
info!("📷 UVC control helper external; not spawning");
} else {
2026-04-10 14:05:04 -03:00
let bin = uvc_runtime::uvc_ctrl_bin();
tokio::spawn(uvc_runtime::supervise_uvc_control(bin));
2026-01-08 00:59:14 -03:00
}
2026-01-06 04:38:41 -03:00
} else {
info!("📷 UVC disabled (LESAVKA_DISABLE_UVC set)");
2026-01-06 11:48:36 -03:00
}
2025-06-27 06:56:08 -05:00
let handler = Handler::new(gadget.clone()).await?;
2025-06-12 01:57:08 -05:00
2025-06-28 15:45:11 -05:00
info!("🌐 lesavka-server listening on 0.0.0.0:50051");
2025-06-27 06:56:08 -05:00
Server::builder()
.tcp_nodelay(true)
2025-11-30 16:16:03 -03:00
.max_frame_size(Some(2 * 1024 * 1024))
2025-06-27 06:56:08 -05:00
.add_service(RelayServer::new(handler))
2025-07-04 03:41:39 -05:00
.add_service(HandshakeSvc::server())
2025-06-27 19:31:46 -05:00
.add_service(ReflBuilder::configure().build_v1().unwrap())
2025-11-30 16:16:03 -03:00
.serve(([0, 0, 0, 0], 50051).into())
2025-06-27 06:56:08 -05:00
.await?;
2025-06-01 13:31:22 -05:00
Ok(())
}
/// Coverage-build entry point: construct the handler, then return an error
/// instead of entering the gRPC serve loop.
#[cfg(coverage)]
#[tokio::main(worker_threads = 2)]
async fn main() -> anyhow::Result<()> {
    let _handler = Handler::new(UsbGadget::new("lesavka")).await?;
    anyhow::bail!("coverage mode skips live gRPC serve loop")
}