lesavka/client/src/app.rs

809 lines
34 KiB
Rust
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#![cfg_attr(coverage, allow(unused_imports))]
use anyhow::Result;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use tokio::sync::{broadcast, mpsc};
use tokio_stream::{
StreamExt,
wrappers::{BroadcastStream, errors::BroadcastStreamRecvError},
};
use tonic::{Request, transport::Channel};
use tracing::{debug, error, info, trace, warn};
use winit::{
event::Event,
event_loop::{ControlFlow, EventLoopBuilder},
platform::wayland::EventLoopBuilderExtWayland,
};
use lesavka_common::lesavka::{
AudioPacket, Empty, KeyboardReport, MonitorRequest, MouseReport, VideoPacket,
relay_client::RelayClient,
};
#[cfg(not(coverage))]
use crate::output::video::{MonitorWindow, UnifiedMonitorWindow};
use crate::{
app_support, handshake, input::camera::CameraCapture, input::inputs::InputAggregator,
input::microphone::MicrophoneCapture, output::audio::AudioOut, paste,
};
/// Top-level client application state: wires local HID capture to the
/// remote relay and fans out keyboard/mouse reports to the gRPC streams.
pub struct LesavkaClientApp {
    // Local input capture; `None` in headless mode (and taken by `run()`).
    aggregator: Option<InputAggregator>,
    // gRPC endpoint of the relay server (resolved from CLI args / env).
    server_addr: String,
    // `LESAVKA_DEV_MODE` set: enables the 300 s auto-exit watchdog.
    dev_mode: bool,
    // `LESAVKA_HEADLESS` set: skip HID capture and video/audio renderers.
    headless: bool,
    // Broadcast fan-out of keyboard reports; each (re)dialed keyboard
    // stream gets its own subscription.
    kbd_tx: broadcast::Sender<KeyboardReport>,
    // Broadcast fan-out of mouse reports (denser traffic, larger buffer).
    mou_tx: broadcast::Sender<MouseReport>,
    // Clipboard text queued for forwarding; taken once by `run()`.
    paste_rx: Option<mpsc::UnboundedReceiver<String>>,
    // Shared flag toggled by the aggregator: when false the HID streams
    // emit a single reset report and then go silent.
    remote_capture_enabled: Arc<AtomicBool>,
}
impl LesavkaClientApp {
/// Builds the client from environment variables and CLI arguments.
///
/// Environment knobs read here:
/// * `LESAVKA_DEV_MODE`       – any value enables dev mode (300 s auto-exit).
/// * `LESAVKA_HEADLESS`       – any value disables local HID capture.
/// * `LESAVKA_CAPTURE_REMOTE` – boot-time capture toggle; anything but the
///   exact string `"0"` (including unset) starts with capture enabled.
/// * `LESAVKA_SERVER_ADDR`    – fallback server address when no CLI arg.
///
/// # Errors
/// Currently always returns `Ok`; the `Result` is kept so construction can
/// become fallible without breaking callers.
pub fn new() -> Result<Self> {
    let dev_mode = std::env::var("LESAVKA_DEV_MODE").is_ok();
    let headless = std::env::var("LESAVKA_HEADLESS").is_ok();
    // NOTE(review): unlike LESAVKA_DISABLE_VIDEO_RENDER in `run`, this
    // value is not trimmed — `" 0 "` would still enable capture.
    let capture_remote_boot = std::env::var("LESAVKA_CAPTURE_REMOTE")
        .map(|value| value != "0")
        .unwrap_or(true);
    let args = std::env::args().skip(1).collect::<Vec<_>>();
    let env_addr = std::env::var("LESAVKA_SERVER_ADDR").ok();
    let server_addr = app_support::resolve_server_addr(&args, env_addr.as_deref());
    // Broadcast channels so each reconnected gRPC stream re-subscribes;
    // mouse traffic is far denser, hence the larger buffer.
    let (kbd_tx, _) = broadcast::channel(1024);
    let (mou_tx, _) = broadcast::channel(4096);
    let (paste_tx, paste_rx) = mpsc::unbounded_channel();
    let (agg, remote_capture_enabled) = if headless {
        // No local devices to capture; the flag stays permanently false.
        (None, Arc::new(AtomicBool::new(false)))
    } else {
        let aggregator = InputAggregator::new_with_capture_mode(
            dev_mode,
            kbd_tx.clone(),
            mou_tx.clone(),
            Some(paste_tx),
            capture_remote_boot,
        );
        let remote_capture_enabled = aggregator.remote_capture_enabled_handle();
        (Some(aggregator), remote_capture_enabled)
    };
    Ok(Self {
        aggregator: agg,
        server_addr,
        dev_mode,
        headless,
        kbd_tx,
        mou_tx,
        paste_rx: Some(paste_rx),
        remote_capture_enabled,
    })
}
/// Coverage-build stub of `run`: exercises the handshake path, then parks
/// forever without wiring any runtime streams.
#[cfg(coverage)]
pub async fn run(&mut self) -> Result<()> {
    info!(server = %self.server_addr, "🚦 starting handshake");
    let _caps = handshake::negotiate(&self.server_addr).await;
    if self.headless {
        info!("🧪 headless mode: skipping HID input capture");
    } else {
        info!("🧪 coverage mode: skipping runtime stream wiring");
    }
    // Never resolves; the caller is expected to be cancelled externally.
    std::future::pending::<Result<()>>().await
}
/// Main client entry point: negotiates capabilities with the server, wires
/// up every input/output stream, then parks in a central `select!` reactor.
///
/// Spawned subsystems (each with its own reconnect loop):
/// * keyboard / mouse / paste uplinks on a latency-tuned HID channel;
/// * video + audio downlinks on a throughput-tuned media channel;
/// * optional camera / microphone uplinks, gated by server capabilities.
///
/// # Errors
/// Fails if a gRPC endpoint cannot be built from `server_addr`, if the
/// input aggregator cannot grab devices, or if `AudioOut` setup fails.
#[cfg(not(coverage))]
pub async fn run(&mut self) -> Result<()> {
    /*────────── handshake / feature-negotiation ───────────────*/
    info!(server = %self.server_addr, "🚦 starting handshake");
    let caps = handshake::negotiate(&self.server_addr).await;
    tracing::info!("🤝 server capabilities = {:?}", caps);
    let camera_cfg = app_support::camera_config_from_caps(&caps);
    /*────────── persistent gRPC channels ──────────*/
    // HID endpoint: latency-tuned (nodelay, keep-alive, capped concurrency).
    let hid_ep = Channel::from_shared(self.server_addr.clone())?
        .tcp_nodelay(true)
        .concurrency_limit(4)
        .http2_keep_alive_interval(Duration::from_secs(15))
        .connect_lazy();
    // Media endpoint: throughput-tuned (4 MiB HTTP/2 flow-control windows).
    let vid_ep = Channel::from_shared(self.server_addr.clone())?
        .initial_connection_window_size(4 << 20)
        .initial_stream_window_size(4 << 20)
        .tcp_nodelay(true)
        .connect_lazy();
    // Options because headless mode spawns none of these.
    let mut agg_task = None;
    let mut kbd_loop = None;
    let mut mou_loop = None;
    let mut paste_task = None;
    let paste_rx = self.paste_rx.take();
    if !self.headless {
        /*────────── input aggregator task (grab after handshake) ─────────────*/
        let mut aggregator = self.aggregator.take().expect("InputAggregator present");
        info!("⌛ grabbing input devices…");
        aggregator.init()?; // grab devices now that handshake succeeded
        agg_task = Some(tokio::spawn(async move {
            let mut a = aggregator;
            a.run().await
        }));
        /*────────── HID streams (never return) ────────*/
        kbd_loop = Some(self.stream_loop_keyboard(hid_ep.clone()));
        mou_loop = Some(self.stream_loop_mouse(hid_ep.clone()));
        if let Some(rx) = paste_rx {
            paste_task = Some(Self::paste_loop(hid_ep.clone(), rx));
        }
    } else {
        info!("🧪 headless mode: skipping HID input capture");
    }
    /*───────── optional 300s auto-exit in dev mode */
    let suicide = async {
        if self.dev_mode {
            tokio::time::sleep(Duration::from_secs(300)).await;
            warn!("💀 dev-mode timeout");
            std::process::exit(0);
        } else {
            std::future::pending::<()>().await
        }
    };
    if !self.headless {
        // "unified" renders both monitors in one window; anything else
        // (default "breakout") opens one window per monitor.
        let view_mode = std::env::var("LESAVKA_VIEW_MODE")
            .unwrap_or_else(|_| "breakout".to_string())
            .to_ascii_lowercase();
        let unified_view = view_mode == "unified";
        let disable_video_render = std::env::var("LESAVKA_DISABLE_VIDEO_RENDER")
            .map(|value| value.trim() != "0")
            .unwrap_or(false);
        info!(
            "🪟 video layout selected: {}",
            if unified_view { "unified" } else { "breakout" }
        );
        if disable_video_render {
            info!("🪟 launcher preview active; skipping standalone client video windows");
        } else {
            /*────────── video rendering thread (winit) ────*/
            let video_queue = app_support::sanitize_video_queue(
                std::env::var("LESAVKA_VIDEO_QUEUE")
                    .ok()
                    .and_then(|v| v.parse::<usize>().ok()),
            );
            let dump_video = std::env::var("LESAVKA_DUMP_VIDEO").is_ok();
            let (video_tx, mut video_rx) =
                tokio::sync::mpsc::channel::<VideoPacket>(video_queue);
            // Dedicated OS thread: GTK + winit must not run on tokio
            // worker threads.
            std::thread::spawn(move || {
                gtk::init().expect("GTK initialisation failed");
                #[allow(deprecated)]
                {
                    let el = EventLoopBuilder::<()>::new()
                        .with_any_thread(true)
                        .build()
                        .unwrap();
                    enum Renderer {
                        Unified(UnifiedMonitorWindow),
                        Breakout {
                            left: MonitorWindow,
                            right: MonitorWindow,
                        },
                    }
                    let renderer = if unified_view {
                        Renderer::Unified(UnifiedMonitorWindow::new().expect("unified-window"))
                    } else {
                        Renderer::Breakout {
                            left: MonitorWindow::new(0).expect("win0"),
                            right: MonitorWindow::new(1).expect("win1"),
                        }
                    };
                    let _ = el.run(move |_: Event<()>, elwt| {
                        // Wake roughly every 16 ms (~60 Hz) to drain packets.
                        elwt.set_control_flow(ControlFlow::WaitUntil(
                            std::time::Instant::now() + std::time::Duration::from_millis(16),
                        ));
                        static CNT: std::sync::atomic::AtomicU64 =
                            std::sync::atomic::AtomicU64::new(0);
                        while let Ok(pkt) = video_rx.try_recv() {
                            CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                            if CNT.load(std::sync::atomic::Ordering::Relaxed) % 300 == 0 {
                                debug!(
                                    "🎥 received {} video packets",
                                    CNT.load(std::sync::atomic::Ordering::Relaxed)
                                );
                            }
                            if dump_video {
                                // Debug aid: dump raw access units to /tmp.
                                static DUMP_CNT: std::sync::atomic::AtomicU32 =
                                    std::sync::atomic::AtomicU32::new(0);
                                let n =
                                    DUMP_CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                                let eye = if pkt.id == 0 { "l" } else { "r" };
                                let path = format!("/tmp/eye{eye}-cli-{n:05}.h264");
                                std::fs::write(&path, &pkt.data).ok();
                            }
                            match &renderer {
                                Renderer::Unified(window) => window.push_packet(pkt),
                                Renderer::Breakout { left, right } => match pkt.id {
                                    0 => left.push_packet(pkt),
                                    1 => right.push_packet(pkt),
                                    _ => {}
                                },
                            }
                        }
                    });
                }
            });
            /*────────── start video gRPC pullers ──────────*/
            let ep_video = vid_ep.clone();
            tokio::spawn(Self::video_loop(ep_video, video_tx));
        }
        /*────────── audio renderer & puller ───────────*/
        if std::env::var("LESAVKA_AUDIO_DISABLE").is_err() {
            let audio_out = AudioOut::new()?;
            let ep_audio = vid_ep.clone();
            tokio::spawn(Self::audio_loop(ep_audio, audio_out));
        } else {
            info!("🔇 remote audio disabled for this relay session");
        }
    } else {
        info!("🧪 headless mode: skipping video/audio renderers");
    }
    /*────────── camera & mic tasks (gated by caps) ───────────*/
    if caps.camera && std::env::var("LESAVKA_CAM_DISABLE").is_err() {
        if let Some(cfg) = camera_cfg {
            info!(
                codec = ?cfg.codec,
                width = cfg.width,
                height = cfg.height,
                fps = cfg.fps,
                "📸 using camera settings from server"
            );
        }
        let ep = vid_ep.clone();
        let cam_source = std::env::var("LESAVKA_CAM_SOURCE").ok();
        tokio::spawn(async move {
            // Camera probing may block; keep it off the async workers.
            let result = tokio::task::spawn_blocking(move || {
                CameraCapture::new(cam_source.as_deref(), camera_cfg)
            })
            .await;
            match result {
                Ok(Ok(cam)) => {
                    let cam = Arc::new(cam);
                    tokio::spawn(Self::cam_loop(ep, cam));
                }
                Ok(Err(err)) => {
                    warn!(
                        "📸 webcam uplink is unavailable for this relay session; continuing without StreamCamera: {err:#}"
                    );
                }
                Err(err) => {
                    warn!(
                        "📸 webcam uplink setup task failed before StreamCamera could start: {err}"
                    );
                }
            }
        });
    }
    if caps.microphone && std::env::var("LESAVKA_MIC_DISABLE").is_err() {
        let ep = vid_ep.clone();
        tokio::spawn(async move {
            let result = tokio::task::spawn_blocking(MicrophoneCapture::new).await;
            match result {
                Ok(Ok(mic)) => {
                    let mic = Arc::new(mic);
                    tokio::spawn(Self::voice_loop(ep, mic));
                }
                Ok(Err(err)) => {
                    warn!(
                        "🎤 microphone uplink is unavailable for this relay session; continuing without StreamMicrophone: {err:#}"
                    );
                }
                Err(err) => {
                    warn!(
                        "🎤 microphone uplink setup task failed before StreamMicrophone could start: {err}"
                    );
                }
            }
        });
    }
    /*────────── central reactor ───────────────────*/
    if self.headless {
        tokio::select! {
            _ = suicide => { /* handled above */ },
        }
    } else {
        // NOTE(review): these expects hold on a first call (the flags are
        // always set in the !headless branch above), but `paste_rx` is
        // `take()`n — a second `run()` on the same instance would panic
        // at `paste_task`. Confirm `run` is only invoked once.
        let kbd_loop = kbd_loop.expect("kbd_loop");
        let mou_loop = mou_loop.expect("mou_loop");
        let agg_task = agg_task.expect("agg_task");
        let paste_task = paste_task.expect("paste_task");
        tokio::select! {
            _ = kbd_loop => { warn!("⚠️⌨️ keyboard stream finished"); },
            _ = mou_loop => { warn!("⚠️🖱️ mouse stream finished"); },
            _ = paste_task => { warn!("⚠️📋 paste loop finished"); },
            _ = suicide => { /* handled above */ },
            r = agg_task => {
                match r {
                    Ok(Ok(())) => warn!("input aggregator terminated cleanly"),
                    Ok(Err(e)) => error!("input aggregator error: {e:?}"),
                    Err(join_err) => error!("aggregator task panicked: {join_err:?}"),
                }
                return Ok(());
            }
        }
    }
    // The branches above either loop forever or exit the process; this
    // point is unreachable but satisfies the type checker.
    #[allow(unreachable_code)]
    Ok(())
}
/*──────────────── paste loop ───────────────*/
/// Spawns the clipboard-forwarding task: each string received on `rx` is
/// built into a `PasteText` request and sent to the relay. The task ends
/// only when the sending half of `rx` is dropped.
#[cfg(not(coverage))]
fn paste_loop(
    ep: Channel,
    mut rx: mpsc::UnboundedReceiver<String>,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let mut cli = RelayClient::new(ep.clone());
        while let Some(text) = rx.recv().await {
            match paste::build_paste_request(&text) {
                Ok(req) => match cli.paste_text(Request::new(req)).await {
                    Ok(resp) => {
                        let reply = resp.get_ref();
                        if !reply.ok {
                            warn!("📋 paste rejected: {}", reply.error);
                        } else {
                            debug!("📋 paste delivered");
                        }
                    }
                    Err(e) => {
                        warn!("📋 paste failed: {e}");
                        // Rebuild the client so the next paste redials.
                        cli = RelayClient::new(ep.clone());
                    }
                },
                Err(e) => {
                    warn!("📋 paste build failed: {e}");
                }
            }
        }
    })
}
/*──────────────── keyboard stream ───────────────*/
/// Keyboard uplink: forever (re)dials `StreamKeyboard`, feeding it a
/// filtered view of the keyboard broadcast channel, and retries after 1 s
/// on any error. Mirrors `stream_loop_mouse` — keep the two in sync.
#[cfg(not(coverage))]
async fn stream_loop_keyboard(&self, ep: Channel) {
    loop {
        info!("⌨️🤙 Keyboard dial {}", self.server_addr);
        let mut cli = RelayClient::new(ep.clone());
        let capture_enabled = Arc::clone(&self.remote_capture_enabled);
        // Tracks the capture flag across reports so the filter can emit a
        // single all-zero reset frame on the enabled→disabled edge.
        let mut remote_capture_was_enabled = capture_enabled.load(Ordering::Relaxed);
        let outbound =
            BroadcastStream::new(self.kbd_tx.subscribe()).filter_map(move |report| {
                let remote_capture_enabled = capture_enabled.load(Ordering::Relaxed);
                keyboard_stream_report(
                    report,
                    remote_capture_enabled,
                    &mut remote_capture_was_enabled,
                )
            });
        match cli.stream_keyboard(Request::new(outbound)).await {
            Ok(mut resp) => {
                // Drain server messages until the stream errors or closes.
                while let Some(msg) = resp.get_mut().message().await.transpose() {
                    if let Err(e) = msg {
                        warn!("⌨️ server err: {e}");
                        break;
                    }
                }
            }
            Err(e) => warn!("❌⌨️ connect failed: {e}"),
        }
        tokio::time::sleep(Duration::from_secs(1)).await; // retry
    }
}
/*──────────────── mouse stream ──────────────────*/
/// Mouse uplink: forever (re)dials `StreamMouse`, feeding it a filtered
/// view of the mouse broadcast channel, and retries after 1 s on any
/// error. Mirrors `stream_loop_keyboard` — keep the two in sync.
#[cfg(not(coverage))]
async fn stream_loop_mouse(&self, ep: Channel) {
    loop {
        info!("🖱️🤙 Mouse dial {}", self.server_addr);
        let mut cli = RelayClient::new(ep.clone());
        let capture_enabled = Arc::clone(&self.remote_capture_enabled);
        // Tracks the capture flag across reports so the filter can emit a
        // single neutral frame on the enabled→disabled edge.
        let mut remote_capture_was_enabled = capture_enabled.load(Ordering::Relaxed);
        let outbound =
            BroadcastStream::new(self.mou_tx.subscribe()).filter_map(move |report| {
                let remote_capture_enabled = capture_enabled.load(Ordering::Relaxed);
                mouse_stream_report(
                    report,
                    remote_capture_enabled,
                    &mut remote_capture_was_enabled,
                )
            });
        match cli.stream_mouse(Request::new(outbound)).await {
            Ok(mut resp) => {
                // Drain server messages until the stream errors or closes.
                while let Some(msg) = resp.get_mut().message().await.transpose() {
                    if let Err(e) = msg {
                        warn!("🖱️ server err: {e}");
                        break;
                    }
                }
            }
            Err(e) => warn!("❌🖱️ connect failed: {e}"),
        }
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}
/*──────────────── monitor stream ────────────────*/
/// Spawns one self-reconnecting `CaptureVideo` puller per monitor (ids 0
/// and 1) and forwards decoded packets to the GUI thread via `tx`. The
/// function itself returns right after spawning the two tasks.
#[cfg(not(coverage))]
async fn video_loop(ep: Channel, tx: tokio::sync::mpsc::Sender<VideoPacket>) {
    // Bitrate cap in kbit/s; overridable via LESAVKA_VIDEO_MAX_KBIT.
    let max_bitrate = std::env::var("LESAVKA_VIDEO_MAX_KBIT")
        .ok()
        .and_then(|v| v.parse::<u32>().ok())
        .unwrap_or(6_000);
    for monitor_id in 0..=1 {
        let ep = ep.clone();
        let tx = tx.clone();
        tokio::spawn(async move {
            loop {
                let mut cli = RelayClient::new(ep.clone());
                // Zero width/height/fps → let the server pick defaults.
                let req = MonitorRequest {
                    id: monitor_id,
                    max_bitrate,
                    requested_width: 0,
                    requested_height: 0,
                    requested_fps: 0,
                    source_id: None,
                };
                match cli.capture_video(Request::new(req)).await {
                    Ok(mut stream) => {
                        debug!("🎥🏁 cli video{monitor_id}: stream opened");
                        while let Some(res) = stream.get_mut().message().await.transpose() {
                            match res {
                                Ok(pkt) => {
                                    trace!(
                                        "🎥📥 cli video{monitor_id}: got {}bytes",
                                        pkt.data.len()
                                    );
                                    // Closed receiver means the render
                                    // thread died; abandon this stream.
                                    if tx.send(pkt).await.is_err() {
                                        warn!("⚠️🎥 cli video{monitor_id}: GUI thread gone");
                                        break;
                                    }
                                }
                                Err(e) => {
                                    error!("❌🎥 cli video{monitor_id}: gRPC error: {e}");
                                    break;
                                }
                            }
                        }
                        warn!("⚠️🎥 cli video{monitor_id}: stream ended");
                    }
                    Err(e) => error!("❌🎥 video {monitor_id}: {e}"),
                }
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        });
    }
}
/*──────────────── audio stream ───────────────*/
/// Audio downlink: pulls `CaptureAudio` packets and pushes them into the
/// local renderer, reconnecting every second. A 1 s receive timeout lets
/// the loop notice a connected-but-silent stream; recoverable source
/// errors may trigger a USB gadget reset via `maybe_recover_audio_usb`.
#[cfg(not(coverage))]
async fn audio_loop(ep: Channel, out: AudioOut) {
    // Failure bookkeeping shared across reconnect attempts.
    let mut consecutive_source_failures = 0_u32;
    let mut last_usb_recovery_at: Option<Instant> = None;
    loop {
        let mut cli = RelayClient::new(ep.clone());
        let req = MonitorRequest {
            id: 0,
            max_bitrate: 0,
            requested_width: 0,
            requested_height: 0,
            requested_fps: 0,
            source_id: None,
        };
        match cli.capture_audio(Request::new(req)).await {
            Ok(mut stream) => {
                tracing::info!("🔊 audio stream opened");
                let mut packet_count: u64 = 0;
                let mut warned_no_packets = false;
                loop {
                    match tokio::time::timeout(
                        Duration::from_secs(1),
                        stream.get_mut().message(),
                    )
                    .await
                    {
                        Ok(Ok(Some(pkt))) => {
                            packet_count = packet_count.saturating_add(1);
                            // Log the first few packets, then sample 1/600.
                            if packet_count <= 8 || packet_count % 600 == 0 {
                                tracing::info!(
                                    packet = packet_count,
                                    bytes = pkt.data.len(),
                                    remote_pts_us = pkt.pts,
                                    "🔊 audio packet received"
                                );
                            }
                            out.push(pkt);
                        }
                        Ok(Ok(None)) => {
                            tracing::warn!(packets = packet_count, "⚠️🔊 audio stream ended");
                            break;
                        }
                        Ok(Err(err)) => {
                            let message = err.to_string();
                            tracing::warn!("❌🔊 audio stream recv error: {message}");
                            Self::maybe_recover_audio_usb(
                                &ep,
                                &mut consecutive_source_failures,
                                &mut last_usb_recovery_at,
                                &message,
                            )
                            .await;
                            break;
                        }
                        Err(_) => {
                            // Timeout tick: warn once if nothing ever came.
                            if packet_count == 0 && !warned_no_packets {
                                warned_no_packets = true;
                                tracing::warn!(
                                    "⚠️🔊 audio stream connected but no packets received yet; source may be idle or unavailable"
                                );
                            }
                        }
                    }
                }
            }
            Err(e) => {
                let message = e.to_string();
                tracing::warn!("❌🔊 audio stream err: {message}");
                Self::maybe_recover_audio_usb(
                    &ep,
                    &mut consecutive_source_failures,
                    &mut last_usb_recovery_at,
                    &message,
                )
                .await;
            }
        }
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}
/// Conditionally asks the server to reset its USB audio gadget.
///
/// A reset fires only when all of the following hold: auto-recovery is
/// enabled (`audio_usb_auto_recover_enabled`), the error text matches a
/// known-recoverable pattern, at least `audio_usb_recover_after()`
/// consecutive failures have accumulated, and the cooldown since the last
/// reset has elapsed. The failure counter is cleared once a recovery is
/// actually attempted.
#[cfg(not(coverage))]
async fn maybe_recover_audio_usb(
    ep: &Channel,
    consecutive_source_failures: &mut u32,
    last_usb_recovery_at: &mut Option<Instant>,
    message: &str,
) {
    if !audio_usb_auto_recover_enabled() || !is_recoverable_remote_audio_error(message) {
        return;
    }
    *consecutive_source_failures = consecutive_source_failures.saturating_add(1);
    let threshold = audio_usb_recover_after();
    if *consecutive_source_failures < threshold {
        tracing::warn!(
            failures = *consecutive_source_failures,
            threshold,
            "🔊🛟 remote speaker capture is unhealthy; waiting before USB recovery"
        );
        return;
    }
    let cooldown = audio_usb_recover_cooldown();
    if last_usb_recovery_at.is_some_and(|last| last.elapsed() < cooldown) {
        tracing::warn!(
            cooldown_ms = cooldown.as_millis(),
            "🔊🛟 remote speaker capture is still unhealthy, but USB recovery is cooling down"
        );
        return;
    }
    *consecutive_source_failures = 0;
    *last_usb_recovery_at = Some(Instant::now());
    tracing::warn!("🔊🛟 requesting USB gadget recovery for stalled remote speaker capture");
    let mut cli = RelayClient::new(ep.clone());
    match cli.reset_usb(Request::new(Empty {})).await {
        Ok(reply) => {
            if reply.into_inner().ok {
                tracing::warn!("🔊🛟 USB gadget recovery completed; audio will reconnect");
            } else {
                tracing::warn!("🔊🛟 USB gadget recovery returned ok=false");
            }
        }
        Err(err) => {
            tracing::warn!("🔊🛟 USB gadget recovery failed: {err}");
        }
    }
}
/*──────────────── mic stream ─────────────────*/
/// Microphone uplink: streams captured audio to `StreamMicrophone`,
/// reconnecting with exponential back-off on failure.
///
/// A dedicated OS thread performs the blocking `MicrophoneCapture::pull`
/// and feeds packets into an async channel that backs the gRPC request
/// stream; the thread is signalled to stop whenever the stream ends so a
/// fresh one is spawned for the next attempt.
#[cfg(not(coverage))]
async fn voice_loop(ep: Channel, mic: Arc<MicrophoneCapture>) {
    let mut delay = Duration::from_secs(1);
    // Counts connect failures so only the first is logged at WARN.
    static FAIL_CNT: AtomicUsize = AtomicUsize::new(0);
    loop {
        let mut cli = RelayClient::new(ep.clone());
        // 1. create a Tokio MPSC channel bridging capture → gRPC
        let (tx, rx) = tokio::sync::mpsc::channel::<AudioPacket>(256);
        let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>();
        // 2. spawn a real thread that does the blocking `pull()`
        // NOTE(review): if `pull()` is non-blocking and returns None this
        // spins without sleeping (cam_loop sleeps 5 ms) — confirm pull
        // blocks internally.
        let mic_clone = mic.clone();
        std::thread::spawn(move || {
            while stop_rx.try_recv().is_err() {
                if let Some(pkt) = mic_clone.pull() {
                    trace!("🎤📤 cli {} bytes → gRPC", pkt.data.len());
                    let _ = tx.blocking_send(pkt);
                }
            }
        });
        // 3. turn `rx` into an async stream for gRPC
        let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
        match cli.stream_microphone(Request::new(outbound)).await {
            Ok(mut resp) => {
                // Connected: reset the back-off (as cam_loop does) so a
                // later disconnect after a healthy stream retries promptly
                // instead of keeping an old, inflated delay forever.
                delay = Duration::from_secs(1);
                while resp.get_mut().message().await.transpose().is_some() {}
            }
            Err(e) => {
                // first failure → warn, subsequent ones → debug
                if FAIL_CNT.fetch_add(1, Ordering::Relaxed) == 0 {
                    warn!("❌🎤 connect failed: {e}");
                    warn!("⚠️🎤 further microphone stream failures will be logged at DEBUG");
                } else {
                    debug!("❌🎤 reconnect failed: {e}");
                }
                delay = app_support::next_delay(delay);
            }
        }
        let _ = stop_tx.send(());
        tokio::time::sleep(delay).await;
    }
}
/*──────────────── cam stream ───────────────────*/
/// Camera uplink: streams encoded webcam frames to `StreamCamera` with
/// exponential back-off on failure, and gives up permanently when the
/// server reports `Unimplemented`. A helper thread runs the blocking
/// `pull()` and is stopped + joined before every reconnect attempt.
#[cfg(not(coverage))]
async fn cam_loop(ep: Channel, cam: Arc<CameraCapture>) {
    let mut delay = Duration::from_secs(1);
    loop {
        let mut cli = RelayClient::new(ep.clone());
        let (tx, rx) = tokio::sync::mpsc::channel::<VideoPacket>(256);
        let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>();
        let cam_worker = std::thread::spawn({
            let cam = cam.clone();
            move || loop {
                if stop_rx.try_recv().is_ok() {
                    break;
                }
                // No frame ready: back off briefly instead of spinning.
                let Some(pkt) = cam.pull() else {
                    std::thread::sleep(Duration::from_millis(5));
                    continue;
                };
                // TRACE every 120 frames only
                static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
                let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                if n < 10 || n % 120 == 0 {
                    tracing::trace!("📸 cli frame#{n} {} B", pkt.data.len());
                }
                tracing::trace!("📸⬆️ sent webcam AU pts={} {} B", pkt.pts, pkt.data.len());
                // Receiver dropped → the gRPC stream ended; stop capturing.
                if tx.blocking_send(pkt).is_err() {
                    break;
                }
            }
        });
        let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
        match cli.stream_camera(Request::new(outbound)).await {
            Ok(mut resp) => {
                delay = Duration::from_secs(1); // got a stream → reset
                while resp.get_mut().message().await.transpose().is_some() {}
            }
            Err(e) if e.code() == tonic::Code::Unimplemented => {
                tracing::warn!("📸 server does not support StreamCamera giving up");
                let _ = stop_tx.send(());
                let _ = cam_worker.join();
                return; // stop the task completely (#3)
            }
            Err(e) => {
                tracing::warn!("❌📸 connect failed: {e:?}");
                delay = app_support::next_delay(delay); // back-off (#2)
            }
        }
        let _ = stop_tx.send(());
        let _ = cam_worker.join();
        tokio::time::sleep(delay).await;
    }
}
}
/// Whether automatic USB gadget recovery for stalled remote audio is
/// enabled via `LESAVKA_AUDIO_AUTO_RECOVER_USB`. Opt-in: unset means off,
/// and "0"/"false"/"no"/"off" (case-insensitive, trimmed) also disable it;
/// any other value enables it.
#[cfg(not(coverage))]
fn audio_usb_auto_recover_enabled() -> bool {
    let Ok(raw) = std::env::var("LESAVKA_AUDIO_AUTO_RECOVER_USB") else {
        return false;
    };
    let normalized = raw.trim().to_ascii_lowercase();
    !["0", "false", "no", "off"].contains(&normalized.as_str())
}
/// Number of consecutive recoverable audio failures required before a USB
/// recovery is attempted (`LESAVKA_AUDIO_AUTO_RECOVER_AFTER`). Defaults to
/// 3; zero and unparsable values also fall back to the default.
#[cfg(not(coverage))]
fn audio_usb_recover_after() -> u32 {
    const DEFAULT: u32 = 3;
    match std::env::var("LESAVKA_AUDIO_AUTO_RECOVER_AFTER") {
        Ok(raw) => match raw.parse::<u32>() {
            Ok(value) if value > 0 => value,
            _ => DEFAULT,
        },
        Err(_) => DEFAULT,
    }
}
/// Minimum interval between USB recovery attempts, taken from
/// `LESAVKA_AUDIO_AUTO_RECOVER_COOLDOWN_MS` (milliseconds). Defaults to
/// one minute when unset or unparsable.
#[cfg(not(coverage))]
fn audio_usb_recover_cooldown() -> Duration {
    const DEFAULT_MS: u64 = 60_000;
    let configured = std::env::var("LESAVKA_AUDIO_AUTO_RECOVER_COOLDOWN_MS")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok());
    Duration::from_millis(configured.unwrap_or(DEFAULT_MS))
}
/// Heuristic match against server-side error strings that indicate the
/// remote speaker capture pipeline is stuck and a USB reset may help.
#[cfg(not(coverage))]
fn is_recoverable_remote_audio_error(message: &str) -> bool {
    const PATTERNS: [&str; 3] = [
        "remote speaker capture produced no audio samples",
        "remote speaker capture stalled",
        "remote speaker capture cadence is too low",
    ];
    PATTERNS.iter().any(|needle| message.contains(needle))
}
/// Filters one broadcast keyboard report for the uplink stream.
///
/// While remote capture is disabled, traffic is swallowed; the first call
/// after the enabled→disabled edge emits one all-zero "release everything"
/// report. When enabled, live reports pass through, and a lagged broadcast
/// receiver is replaced by a clean reset report.
pub(crate) fn keyboard_stream_report(
    report: Result<KeyboardReport, BroadcastStreamRecvError>,
    remote_capture_enabled: bool,
    remote_capture_was_enabled: &mut bool,
) -> Option<KeyboardReport> {
    if !remote_capture_enabled {
        // Detect the falling edge while recording the disabled state.
        let falling_edge = std::mem::replace(remote_capture_was_enabled, false);
        if falling_edge {
            return Some(KeyboardReport { data: vec![0; 8] });
        }
        return None;
    }
    *remote_capture_was_enabled = true;
    match report {
        Ok(live) => Some(live),
        Err(BroadcastStreamRecvError::Lagged(skipped)) => {
            warn!(
                skipped,
                "⌨️ live keyboard stream lagged; sending a clean reset report before continuing"
            );
            Some(KeyboardReport { data: vec![0; 8] })
        }
    }
}
/// Filters one broadcast mouse report for the uplink stream.
///
/// While remote capture is disabled, traffic is swallowed; the first call
/// after the enabled→disabled edge emits one neutral all-zero report.
/// When enabled, live reports pass through, and a lagged broadcast
/// receiver is replaced by a neutral report.
pub(crate) fn mouse_stream_report(
    report: Result<MouseReport, BroadcastStreamRecvError>,
    remote_capture_enabled: bool,
    remote_capture_was_enabled: &mut bool,
) -> Option<MouseReport> {
    if !remote_capture_enabled {
        // Detect the falling edge while recording the disabled state.
        let falling_edge = std::mem::replace(remote_capture_was_enabled, false);
        if falling_edge {
            return Some(MouseReport { data: vec![0; 4] });
        }
        return None;
    }
    *remote_capture_was_enabled = true;
    match report {
        Ok(live) => Some(live),
        Err(BroadcastStreamRecvError::Lagged(skipped)) => {
            warn!(
                skipped,
                "🖱️ live mouse stream lagged; sending a neutral report before continuing"
            );
            Some(MouseReport { data: vec![0; 4] })
        }
    }
}