#[cfg(not(coverage))] #[tonic::async_trait] impl Relay for Handler { type StreamKeyboardStream = ReceiverStream>; type StreamMouseStream = ReceiverStream>; type CaptureVideoStream = VideoStream; type CaptureAudioStream = AudioStream; type StreamMicrophoneStream = ReceiverStream>; type StreamCameraStream = ReceiverStream>; async fn stream_keyboard( &self, req: Request>, ) -> Result, Status> { let rpc_id = runtime_support::next_stream_id(); info!(rpc_id, "⌨️ stream_keyboard opened"); let (tx, rx) = tokio::sync::mpsc::channel(32); let kb = self.kb.clone(); let ms = self.ms.clone(); let kb_path = hid_endpoint(0); let ms_path = hid_endpoint(1); let gadget = self.gadget.clone(); let did_cycle = self.did_cycle.clone(); let session_lease = self.capture_power.acquire_session().await; let report_delay = live_keyboard_report_delay(); tokio::spawn(async move { let _session_lease = session_lease; let mut s = req.into_inner(); while let Some(pkt) = s.next().await.transpose()? { if let Err(e) = runtime_support::write_hid_report(&kb, &kb_path, &pkt.data).await { if e.raw_os_error() == Some(libc::EAGAIN) { debug!(rpc_id, "⌨️ write would block (dropped)"); } else { warn!(rpc_id, "⌨️ write failed: {e} (dropped)"); runtime_support::recover_hid_if_needed( &e, gadget.clone(), kb.clone(), ms.clone(), kb_path.clone(), ms_path.clone(), did_cycle.clone(), ) .await; } } tx.send(Ok(pkt)).await.ok(); if !report_delay.is_zero() { tokio::time::sleep(report_delay).await; } } info!(rpc_id, "⌨️ stream_keyboard closed"); Ok::<(), Status>(()) }); Ok(Response::new(ReceiverStream::new(rx))) } async fn stream_mouse( &self, req: Request>, ) -> Result, Status> { let rpc_id = runtime_support::next_stream_id(); info!(rpc_id, "🖱️ stream_mouse opened"); let (tx, rx) = tokio::sync::mpsc::channel(1024); let ms = self.ms.clone(); let kb = self.kb.clone(); let kb_path = hid_endpoint(0); let ms_path = hid_endpoint(1); let gadget = self.gadget.clone(); let did_cycle = self.did_cycle.clone(); let session_lease = self.capture_power.acquire_session().await; tokio::spawn(async move { let _session_lease = session_lease; let mut s = req.into_inner(); while let Some(pkt) = s.next().await.transpose()? { if let Err(e) = runtime_support::write_hid_report(&ms, &ms_path, &pkt.data).await { if e.raw_os_error() == Some(libc::EAGAIN) { debug!(rpc_id, "🖱️ write would block (dropped)"); } else { warn!(rpc_id, "🖱️ write failed: {e} (dropped)"); runtime_support::recover_hid_if_needed( &e, gadget.clone(), kb.clone(), ms.clone(), kb_path.clone(), ms_path.clone(), did_cycle.clone(), ) .await; } } tx.send(Ok(pkt)).await.ok(); } info!(rpc_id, "🖱️ stream_mouse closed"); Ok::<(), Status>(()) }); Ok(Response::new(ReceiverStream::new(rx))) } /// Accept synthetic upstream microphone packets without ALSA hardware. async fn stream_microphone( &self, req: Request>, ) -> Result, Status> { let rpc_id = runtime_support::next_stream_id(); let lease = self.upstream_media_rt.activate_microphone(); info!(rpc_id, session_id = lease.session_id, "🎤 stream_microphone opened"); let Some(microphone_sink_permit) = self .upstream_media_rt .reserve_microphone_sink(lease.generation) .await else { info!( rpc_id, session_id = lease.session_id, "🎤 stream_microphone stood down before the sink became available" ); self.upstream_media_rt.close_microphone(lease.generation); return Err(Status::aborted( "microphone stream superseded before sink became available", )); }; let uac_dev = std::env::var("LESAVKA_UAC_DEV").unwrap_or_else(|_| "hw:UAC2Gadget,0".into()); info!(%uac_dev, "🎤 stream_microphone using UAC sink"); let mut sink = runtime_support::open_voice_with_retry(&uac_dev) .await .map_err(|e| { self.upstream_media_rt.close_microphone(lease.generation); Status::internal(format!("{e:#}")) })?; let (tx, rx) = tokio::sync::mpsc::channel(1); let upstream_media_rt = self.upstream_media_rt.clone(); tokio::spawn(async move { let _microphone_sink_permit = microphone_sink_permit; let mut cleanup = UpstreamStreamCleanup::microphone( upstream_media_rt.clone(), lease.generation, rpc_id, lease.session_id, ); let mut inbound = req.into_inner(); let mut pending = std::collections::VecDeque::new(); let mut inbound_closed = false; let stale_drop_budget = upstream_stale_drop_budget(); static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); loop { if !upstream_media_rt.is_microphone_active(lease.generation) { info!(rpc_id, session_id = lease.session_id, "🎤 stream_microphone session superseded"); cleanup.mark_superseded(); break; } if !inbound_closed { let next_packet = tokio::select! { packet = inbound.next() => Some(packet), _ = tokio::time::sleep(Duration::from_millis(50)) => None, }; if let Some(next_packet) = next_packet { match next_packet.transpose() { Ok(Some(pkt)) => pending.push_back(pkt), Ok(None) => inbound_closed = true, Err(err) => { cleanup.mark_aborted(); warn!( rpc_id, session_id = lease.session_id, "🎤 stream_microphone inbound error before clean EOF: {err}" ); break; } } } } let Some(mut pkt) = pending.pop_front() else { if inbound_closed { cleanup.mark_closed(); break; } continue; }; let plan = match upstream_media_rt.plan_audio_pts(pkt.pts) { lesavka_server::upstream_media_runtime::UpstreamPlanDecision::AwaitingPair => { pending.push_front(pkt); continue; } lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropBeforeOverlap => { continue; } lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan, }; if plan.late_by > stale_drop_budget { tracing::warn!( rpc_id, session_id = lease.session_id, late_by_ms = plan.late_by.as_millis(), pts = plan.local_pts_us, "🎤 upstream audio packet dropped after missing its freshness budget" ); continue; } tokio::time::sleep_until(plan.due_at).await; let actual_late_by = tokio::time::Instant::now() .checked_duration_since(plan.due_at) .unwrap_or_default(); if actual_late_by > stale_drop_budget { tracing::warn!( rpc_id, session_id = lease.session_id, late_by_ms = actual_late_by.as_millis(), pts = plan.local_pts_us, "🎤 upstream audio packet dropped after waking too late for fresh playout" ); continue; } pkt.pts = plan.local_pts_us; let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if n < 5 || n.is_multiple_of(3_000) { tracing::info!(rpc_id, "🎤⬇ srv pkt#{n} {} bytes", pkt.data.len()); } sink.push(&pkt); } sink.finish(); // flush on EOS let _ = tx.send(Ok(Empty {})).await; Ok::<(), Status>(()) }); Ok(Response::new(ReceiverStream::new(rx))) } /// Accept synthetic upstream webcam packets without UVC/HDMI hardware. async fn stream_camera( &self, req: Request>, ) -> Result, Status> { let rpc_id = runtime_support::next_stream_id(); let cfg = camera::current_camera_config(); info!( rpc_id, output = cfg.output.as_str(), codec = cfg.codec.as_str(), width = cfg.width, height = cfg.height, fps = cfg.fps, hdmi = cfg.hdmi.as_ref().map(|h| h.name.as_str()).unwrap_or("none"), "🎥 stream_camera output selected" ); let upstream_lease = self.upstream_media_rt.activate_camera(); let (camera_session_id, relay, _relay_reused) = self.camera_rt.activate(&cfg).await?; let camera_rt = self.camera_rt.clone(); let upstream_media_rt = self.upstream_media_rt.clone(); info!( rpc_id, session_id = upstream_lease.session_id, camera_session_id, "🎥 stream_camera opened" ); let frame_step_us = (1_000_000u64 / u64::from(cfg.fps.max(1))).max(1); let (tx, rx) = tokio::sync::mpsc::channel(1); tokio::spawn(async move { let mut cleanup = UpstreamStreamCleanup::camera( upstream_media_rt.clone(), upstream_lease.generation, rpc_id, upstream_lease.session_id, camera_session_id, ); let mut s = req.into_inner(); let mut pending = std::collections::VecDeque::new(); let mut inbound_closed = false; let stale_drop_budget = upstream_stale_drop_budget(); let mut startup_video_settled = false; loop { if !camera_rt.is_active(camera_session_id) || !upstream_media_rt.is_camera_active(upstream_lease.generation) { info!( rpc_id, session_id = upstream_lease.session_id, camera_session_id, "🎥 stream_camera session superseded" ); cleanup.mark_superseded(); break; } if !inbound_closed { let next_packet = tokio::select! { packet = s.next() => Some(packet), _ = tokio::time::sleep(Duration::from_millis(50)) => None, }; if let Some(next_packet) = next_packet { match next_packet.transpose() { Ok(Some(pkt)) => { pending.push_back(pkt); let coalesced = retain_freshest_video_packet(&mut pending); if coalesced > 0 { tracing::debug!( rpc_id, session_id = upstream_lease.session_id, camera_session_id, dropped = coalesced, "🎥 coalesced stale upstream video backlog down to the freshest frame" ); } } Ok(None) => inbound_closed = true, Err(err) => { cleanup.mark_aborted(); warn!( rpc_id, session_id = upstream_lease.session_id, camera_session_id, "🎥 stream_camera inbound error before clean EOF: {err}" ); break; } } } } let Some(mut pkt) = pending.pop_front() else { if inbound_closed { cleanup.mark_closed(); break; } continue; }; let plan = match upstream_media_rt.plan_video_pts(pkt.pts, frame_step_us) { lesavka_server::upstream_media_runtime::UpstreamPlanDecision::AwaitingPair => { if inbound_closed { tracing::debug!( rpc_id, session_id = upstream_lease.session_id, camera_session_id, pts = pkt.pts, "🎥 dropping trailing upstream video frame because no paired audio arrived before stream close" ); continue; } pending.push_front(pkt); continue; } lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropBeforeOverlap => { continue; } lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan, }; if !upstream_media_rt .wait_for_audio_master(plan.local_pts_us, plan.due_at) .await { tracing::warn!( rpc_id, session_id = upstream_lease.session_id, camera_session_id, pts = plan.local_pts_us, "🎥 upstream video frame dropped because the audio master never caught up inside the pairing window" ); continue; } if plan.late_by > stale_drop_budget { let coalesced = retain_freshest_video_packet(&mut pending); if startup_video_settled { tracing::warn!( rpc_id, session_id = upstream_lease.session_id, camera_session_id, late_by_ms = plan.late_by.as_millis(), pts = plan.local_pts_us, dropped_pending = coalesced, "🎥 upstream video frame dropped after missing its freshness budget" ); } else { tracing::debug!( rpc_id, session_id = upstream_lease.session_id, camera_session_id, late_by_ms = plan.late_by.as_millis(), pts = plan.local_pts_us, dropped_pending = coalesced, "🎥 dropping startup-stale upstream video until the playout window settles" ); } continue; } tokio::time::sleep_until(plan.due_at).await; let actual_late_by = tokio::time::Instant::now() .checked_duration_since(plan.due_at) .unwrap_or_default(); if actual_late_by > stale_drop_budget { let coalesced = retain_freshest_video_packet(&mut pending); if startup_video_settled { tracing::warn!( rpc_id, session_id = upstream_lease.session_id, camera_session_id, late_by_ms = actual_late_by.as_millis(), pts = plan.local_pts_us, dropped_pending = coalesced, "🎥 upstream video frame dropped after waking too late for fresh playout" ); } else { tracing::debug!( rpc_id, session_id = upstream_lease.session_id, camera_session_id, late_by_ms = actual_late_by.as_millis(), pts = plan.local_pts_us, dropped_pending = coalesced, "🎥 dropping startup-stale upstream video after a late wake until the playout window settles" ); } continue; } pkt.pts = plan.local_pts_us; startup_video_settled = true; relay.feed(pkt); // ← all logging inside video.rs } tx.send(Ok(Empty {})).await.ok(); Ok::<(), Status>(()) }); Ok(Response::new(ReceiverStream::new(rx))) } async fn capture_video( &self, req: Request, ) -> Result, Status> { self.capture_video_reply(req.into_inner()).await } async fn capture_audio( &self, req: Request, ) -> Result, Status> { let rpc_id = runtime_support::next_stream_id(); // Only one speaker stream for now; both 0/1 → same ALSA dev. let _id = req.into_inner().id; // Allow override (`LESAVKA_ALSA_DEV=hw:2,0` for debugging). let dev = std::env::var("LESAVKA_ALSA_DEV").unwrap_or_else(|_| "hw:UAC2Gadget,0".into()); info!(rpc_id, %dev, "🔊 capture_audio opened"); let s = runtime_support::open_ear_with_retry(&dev, 0) .await .map_err(|e| remote_audio_status(format!("{e:#}")))?; Ok(Response::new(Box::pin(s))) } async fn paste_text(&self, req: Request) -> Result, Status> { self.paste_text_reply(req).await } /*────────────── USB-reset RPC ────────────*/ async fn reset_usb(&self, _req: Request) -> Result, Status> { self.reset_usb_reply().await } async fn get_capture_power( &self, _req: Request, ) -> Result, Status> { self.get_capture_power_reply().await } async fn set_capture_power( &self, req: Request, ) -> Result, Status> { self.set_capture_power_reply(req).await } async fn get_calibration( &self, _req: Request, ) -> Result, Status> { self.get_calibration_reply().await } async fn calibrate( &self, req: Request, ) -> Result, Status> { self.calibrate_reply(req).await } } #[cfg(test)] include!("relay_service_tests.rs");