fix(client): restore audio recovery and input capture

This commit is contained in:
Brad Stein 2026-04-21 12:46:47 -03:00
parent 6fb1ad527e
commit e3f9fd2610
12 changed files with 278 additions and 77 deletions

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package]
name = "lesavka_client"
version = "0.11.33"
version = "0.11.34"
edition = "2024"
[dependencies]

View File

@ -3,7 +3,7 @@
use anyhow::Result;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::Duration;
use std::time::{Duration, Instant};
use tokio::sync::{broadcast, mpsc};
use tokio_stream::{
StreamExt,
@ -18,7 +18,7 @@ use winit::{
};
use lesavka_common::lesavka::{
AudioPacket, KeyboardReport, MonitorRequest, MouseReport, VideoPacket,
AudioPacket, Empty, KeyboardReport, MonitorRequest, MouseReport, VideoPacket,
relay_client::RelayClient,
};
@ -498,6 +498,8 @@ impl LesavkaClientApp {
/*──────────────── audio stream ───────────────*/
#[cfg(not(coverage))]
async fn audio_loop(ep: Channel, out: AudioOut) {
let mut consecutive_source_failures = 0_u32;
let mut last_usb_recovery_at: Option<Instant> = None;
loop {
let mut cli = RelayClient::new(ep.clone());
let req = MonitorRequest {
@ -537,7 +539,15 @@ impl LesavkaClientApp {
break;
}
Ok(Err(err)) => {
tracing::warn!("❌🔊 audio stream recv error: {err}");
let message = err.to_string();
tracing::warn!("❌🔊 audio stream recv error: {message}");
Self::maybe_recover_audio_usb(
&ep,
&mut consecutive_source_failures,
&mut last_usb_recovery_at,
&message,
)
.await;
break;
}
Err(_) => {
@ -551,12 +561,71 @@ impl LesavkaClientApp {
}
}
}
Err(e) => tracing::warn!("❌🔊 audio stream err: {e}"),
Err(e) => {
let message = e.to_string();
tracing::warn!("❌🔊 audio stream err: {message}");
Self::maybe_recover_audio_usb(
&ep,
&mut consecutive_source_failures,
&mut last_usb_recovery_at,
&message,
)
.await;
}
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
#[cfg(not(coverage))]
async fn maybe_recover_audio_usb(
ep: &Channel,
consecutive_source_failures: &mut u32,
last_usb_recovery_at: &mut Option<Instant>,
message: &str,
) {
if !audio_usb_auto_recover_enabled() || !is_recoverable_remote_audio_error(message) {
return;
}
*consecutive_source_failures = consecutive_source_failures.saturating_add(1);
let threshold = audio_usb_recover_after();
if *consecutive_source_failures < threshold {
tracing::warn!(
failures = *consecutive_source_failures,
threshold,
"🔊🛟 remote speaker capture is unhealthy; waiting before USB recovery"
);
return;
}
let cooldown = audio_usb_recover_cooldown();
if last_usb_recovery_at.is_some_and(|last| last.elapsed() < cooldown) {
tracing::warn!(
cooldown_ms = cooldown.as_millis(),
"🔊🛟 remote speaker capture is still unhealthy, but USB recovery is cooling down"
);
return;
}
*consecutive_source_failures = 0;
*last_usb_recovery_at = Some(Instant::now());
tracing::warn!("🔊🛟 requesting USB gadget recovery for stalled remote speaker capture");
let mut cli = RelayClient::new(ep.clone());
match cli.reset_usb(Request::new(Empty {})).await {
Ok(reply) => {
if reply.into_inner().ok {
tracing::warn!("🔊🛟 USB gadget recovery completed; audio will reconnect");
} else {
tracing::warn!("🔊🛟 USB gadget recovery returned ok=false");
}
}
Err(err) => {
tracing::warn!("🔊🛟 USB gadget recovery failed: {err}");
}
}
}
/*──────────────── mic stream ─────────────────*/
#[cfg(not(coverage))]
async fn voice_loop(ep: Channel, mic: Arc<MicrophoneCapture>) {
@ -655,6 +724,43 @@ impl LesavkaClientApp {
}
}
#[cfg(not(coverage))]
fn audio_usb_auto_recover_enabled() -> bool {
std::env::var("LESAVKA_AUDIO_AUTO_RECOVER_USB")
.map(|raw| {
!matches!(
raw.trim().to_ascii_lowercase().as_str(),
"0" | "false" | "no" | "off"
)
})
.unwrap_or(true)
}
#[cfg(not(coverage))]
fn audio_usb_recover_after() -> u32 {
std::env::var("LESAVKA_AUDIO_AUTO_RECOVER_AFTER")
.ok()
.and_then(|raw| raw.parse::<u32>().ok())
.filter(|value| *value > 0)
.unwrap_or(3)
}
#[cfg(not(coverage))]
fn audio_usb_recover_cooldown() -> Duration {
let millis = std::env::var("LESAVKA_AUDIO_AUTO_RECOVER_COOLDOWN_MS")
.ok()
.and_then(|raw| raw.parse::<u64>().ok())
.unwrap_or(60_000);
Duration::from_millis(millis)
}
#[cfg(not(coverage))]
fn is_recoverable_remote_audio_error(message: &str) -> bool {
message.contains("remote speaker capture produced no audio samples")
|| message.contains("remote speaker capture stalled")
|| message.contains("remote speaker capture cadence is too low")
}
pub(crate) fn keyboard_stream_report(
report: Result<KeyboardReport, BroadcastStreamRecvError>,
remote_capture_enabled: bool,

View File

@ -42,6 +42,8 @@ pub struct InputAggregator {
mice: Vec<MouseAggregator>,
selected_keyboard_path: Option<String>,
selected_mouse_path: Option<String>,
#[cfg(not(coverage))]
known_input_paths: HashSet<PathBuf>,
capture_remote_boot: bool,
quick_toggle_key: Option<KeyCode>,
quick_toggle_down: bool,
@ -113,6 +115,8 @@ impl InputAggregator {
mice: Vec::new(),
selected_keyboard_path: input_device_override_from_env("LESAVKA_KEYBOARD_DEVICE"),
selected_mouse_path: input_device_override_from_env("LESAVKA_MOUSE_DEVICE"),
#[cfg(not(coverage))]
known_input_paths: HashSet::new(),
capture_remote_boot,
quick_toggle_key,
quick_toggle_down: false,
@ -210,8 +214,18 @@ impl InputAggregator {
#[cfg(not(coverage))]
pub fn init(&mut self) -> Result<()> {
let paths = std::fs::read_dir("/dev/input").context("Failed to read /dev/input")?;
let found_any = self.scan_input_devices(self.capture_remote_boot, true)?;
if !found_any {
bail!("No suitable keyboard/mouse devices found or none grabbed.");
}
Ok(())
}
#[cfg(not(coverage))]
fn scan_input_devices(&mut self, remote_active: bool, fail_grab: bool) -> Result<bool> {
let paths = std::fs::read_dir("/dev/input").context("Failed to read /dev/input")?;
let mut found_any = false;
for entry in paths {
@ -224,6 +238,9 @@ impl InputAggregator {
{
continue;
}
if self.known_input_paths.contains(&path) {
continue;
}
let mut dev = match Device::open(&path) {
Ok(d) => d,
@ -240,18 +257,25 @@ impl InputAggregator {
DeviceKind::Keyboard => {
if !matches_selected_input_device(&path, self.selected_keyboard_path.as_deref())
{
self.known_input_paths.insert(path);
continue;
}
if self.capture_remote_boot {
dev.grab()
.with_context(|| format!("grabbing keyboard {path:?}"))?;
if remote_active {
if let Err(err) = dev.grab() {
if fail_grab {
return Err(err)
.with_context(|| format!("grabbing keyboard {path:?}"));
}
warn!("❌ grab keyboard {}: {err}", path.display());
continue;
}
info!(
"🤏🖱️ Grabbed keyboard {:?}",
dev.name().unwrap_or("UNKNOWN")
);
} else {
info!(
"⌨️ local-input boot mode; keyboard left ungrabbed {:?}",
"⌨️ local-input mode; keyboard staged ungrabbed {:?}",
dev.name().unwrap_or("UNKNOWN")
);
}
@ -262,54 +286,57 @@ impl InputAggregator {
self.kbd_tx.clone(),
self.paste_tx.clone(),
);
kbd_agg.set_send(self.capture_remote_boot);
if !self.capture_remote_boot {
kbd_agg.set_send(remote_active);
if !remote_active {
kbd_agg.set_grab(false);
}
self.known_input_paths.insert(path);
self.keyboards.push(kbd_agg);
found_any = true;
continue;
}
DeviceKind::Mouse => {
if !matches_selected_input_device(&path, self.selected_mouse_path.as_deref()) {
self.known_input_paths.insert(path);
continue;
}
if self.capture_remote_boot {
dev.grab()
.with_context(|| format!("grabbing mouse {path:?}"))?;
if remote_active {
if let Err(err) = dev.grab() {
if fail_grab {
return Err(err)
.with_context(|| format!("grabbing mouse {path:?}"));
}
warn!("❌ grab mouse {}: {err}", path.display());
continue;
}
info!("🤏⌨️ Grabbed mouse {:?}", dev.name().unwrap_or("UNKNOWN"));
} else {
info!(
"🖱️ local-input boot mode; mouse left ungrabbed {:?}",
"🖱️ local-input mode; mouse staged ungrabbed {:?}",
dev.name().unwrap_or("UNKNOWN")
);
}
let mut mouse_agg =
MouseAggregator::new(dev, self.dev_mode, self.mou_tx.clone());
mouse_agg.set_send(self.capture_remote_boot);
if !self.capture_remote_boot {
mouse_agg.set_send(remote_active);
if !remote_active {
mouse_agg.set_grab(false);
}
self.known_input_paths.insert(path);
self.mice.push(mouse_agg);
found_any = true;
continue;
}
DeviceKind::Other => {
debug!(
"Skipping non-kbd/mouse device: {:?}",
dev.name().unwrap_or("UNKNOWN")
);
continue;
self.known_input_paths.insert(path);
}
}
}
if !found_any {
bail!("No suitable keyboard/mouse devices found or none grabbed.");
}
Ok(())
Ok(found_any)
}
#[cfg(coverage)]
@ -362,10 +389,21 @@ impl InputAggregator {
// Example approach: poll each aggregator in a simple loop
let mut tick = interval(Duration::from_millis(10));
let mut current = Layout::SideBySide;
let input_rescan_interval = input_rescan_interval_from_env();
let mut last_input_rescan_at = Instant::now();
self.publish_routing_state_if_changed();
loop {
let mut want_kill = false;
self.process_keyboard_updates();
if !input_rescan_interval.is_zero()
&& last_input_rescan_at.elapsed() >= input_rescan_interval
{
last_input_rescan_at = Instant::now();
let remote_active = self.remote_capture_active();
if let Err(err) = self.scan_input_devices(remote_active, false) {
warn!("⚠️ input device rescan failed: {err:#}");
}
}
for kbd in &self.keyboards {
want_kill |= kbd.magic_kill();
}
@ -553,6 +591,13 @@ impl InputAggregator {
.is_some_and(|started_at| started_at.elapsed() >= self.remote_failsafe_timeout)
}
fn remote_capture_active(&self) -> bool {
!self.released
&& !self.pending_release
&& !self.pending_kill
&& self.remote_capture_enabled.load(Ordering::Relaxed)
}
fn capture_pending_keys(&mut self) {
self.pending_keys.clear();
for k in &self.keyboards {
@ -979,6 +1024,19 @@ fn remote_failsafe_timeout_from_env() -> Duration {
Duration::from_millis(millis)
}
#[cfg(not(coverage))]
fn input_rescan_interval_from_env() -> Duration {
let millis = std::env::var("LESAVKA_INPUT_RESCAN_MS")
.ok()
.and_then(|raw| raw.parse::<u64>().ok())
.unwrap_or(1_000);
if millis == 0 {
Duration::ZERO
} else {
Duration::from_millis(millis.max(250))
}
}
#[cfg(not(coverage))]
fn focus_launcher_on_local_if_enabled() {
if std::env::var("LESAVKA_FOCUS_LAUNCHER_ON_LOCAL")

View File

@ -269,13 +269,9 @@ impl KeyboardAggregator {
return false;
}
// If V is pressed with any paste modifier down, arm a possible paste chord.
// This prevents leaking Ctrl+V / Alt+V while user is completing chord order.
if code == KeyCode::KEY_V
&& value == 1
&& ((self.has_key(KeyCode::KEY_LEFTCTRL) || self.has_key(KeyCode::KEY_RIGHTCTRL))
|| (self.has_key(KeyCode::KEY_LEFTALT) || self.has_key(KeyCode::KEY_RIGHTALT)))
{
// Only intercept the complete configured Lesavka paste chord. Plain
// app shortcuts such as Ctrl+V must keep travelling to the remote HID.
if code == KeyCode::KEY_V && value == 1 && self.paste_chord_active() {
self.paste_chord_armed = true;
}

View File

@ -1159,7 +1159,7 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> {
let message = if usb_audio_kernel_support_missing() {
"Device staging refreshed. USB audio devices may still stay invisible until the host boots a kernel with snd_usb_audio available; reconnect the relay if you want the live session to use a new webcam, mic, or speaker."
} else {
"Device staging refreshed. Newly attached devices are ready for local tests; reconnect the relay if you want the live session to use a new webcam, mic, or speaker."
"Device staging refreshed. Newly attached devices are ready for local tests; all-keyboard/all-mouse relay sessions will pick up new input devices automatically."
};
widgets_handle.status_label.set_text(message);
}

View File

@ -311,6 +311,7 @@ pub fn build_launcher_view(
audio_check_detail.set_wrap(false);
audio_check_detail.set_ellipsize(pango::EllipsizeMode::End);
audio_check_detail.set_xalign(0.0);
audio_check_detail.set_visible(false);
let audio_check_meter = gtk::ProgressBar::new();
audio_check_meter.add_css_class("audio-check-meter");
audio_check_meter.set_show_text(false);
@ -371,24 +372,22 @@ pub fn build_launcher_view(
playback_group.set_hexpand(false);
playback_group.set_vexpand(false);
playback_group.set_valign(gtk::Align::Start);
playback_group.set_size_request(148, -1);
let playback_body = gtk::Box::new(gtk::Orientation::Horizontal, 8);
playback_group.set_size_request(72, -1);
let playback_body = gtk::Box::new(gtk::Orientation::Vertical, 6);
playback_body.set_halign(gtk::Align::Center);
playback_body.set_vexpand(false);
let playback_controls = gtk::Box::new(gtk::Orientation::Vertical, 6);
playback_controls.set_hexpand(true);
let microphone_replay_button = gtk::Button::with_label("Replay Last 3s");
stabilize_button(&microphone_replay_button, 124);
let microphone_replay_button = gtk::Button::with_label("Replay");
stabilize_button(&microphone_replay_button, 70);
audio_check_meter.set_orientation(gtk::Orientation::Vertical);
audio_check_meter.set_inverted(true);
audio_check_meter.set_hexpand(false);
audio_check_meter.set_vexpand(false);
audio_check_meter.set_size_request(20, CAMERA_PREVIEW_VIEWPORT_HEIGHT - 38);
audio_check_meter.set_halign(gtk::Align::Center);
audio_check_meter.set_size_request(20, CAMERA_PREVIEW_VIEWPORT_HEIGHT - 40);
audio_check_meter.set_show_text(false);
audio_check_meter.set_text(Some("Idle"));
playback_controls.append(&microphone_replay_button);
playback_controls.append(&audio_check_detail);
playback_body.append(&audio_check_meter);
playback_body.append(&playback_controls);
playback_body.append(&microphone_replay_button);
playback_group.append(&playback_body);
testing_row.append(&playback_group);
preview_body.append(&testing_row);

View File

@ -152,9 +152,9 @@ pub fn refresh_test_buttons(widgets: &LauncherWidgets, tests: &mut DeviceTestCon
widgets
.microphone_replay_button
.set_label(if microphone_replay_running {
"Stop Replay"
"Stop"
} else {
"Replay Last 3s"
"Replay"
});
widgets
.microphone_replay_button
@ -586,7 +586,7 @@ fn local_test_detail(
}
if active.is_empty() {
"Local checks are idle. Use Start Preview, Monitor Mic, Replay Last 3s, or Play Tone before you launch."
"Local checks are idle. Use Start Preview, Monitor Mic, Replay, or Play Tone before you launch."
.to_string()
} else {
format!(

View File

@ -1,6 +1,6 @@
[package]
name = "lesavka_common"
version = "0.11.33"
version = "0.11.34"
edition = "2024"
build = "build.rs"

View File

@ -17,6 +17,6 @@ mod tests {
#[test]
fn banner_includes_version() {
assert_eq!(banner("0.11.33"), "lesavka-common CLI (v0.11.33)");
assert_eq!(banner("0.11.34"), "lesavka-common CLI (v0.11.34)");
}
}

View File

@ -10,7 +10,7 @@ bench = false
[package]
name = "lesavka_server"
version = "0.11.33"
version = "0.11.34"
edition = "2024"
autobins = false

View File

@ -381,34 +381,53 @@ impl Handler {
}
fn restart_uvc_helper() -> anyhow::Result<()> {
for args in [
["reset-failed", "lesavka-uvc.service"].as_slice(),
["restart", "lesavka-uvc.service"].as_slice(),
] {
let output = Command::new("systemctl")
.args(args)
.output()
.with_context(|| format!("running systemctl {}", args.join(" ")))?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string();
anyhow::bail!(
"systemctl {} failed: {}{}",
args.join(" "),
if stderr.is_empty() {
stdout.as_str()
} else {
stderr.as_str()
},
if stderr.is_empty() || stdout.is_empty() || stderr == stdout {
""
} else {
" / also see stdout"
}
);
}
if std::env::var("LESAVKA_GADGET_SYSFS_ROOT").is_ok()
|| std::env::var("LESAVKA_GADGET_CONFIGFS_ROOT").is_ok()
{
return Ok(());
}
Ok(())
run_systemctl(&["reset-failed", "lesavka-uvc.service"])?;
match run_systemctl(&["restart", "lesavka-uvc.service"]) {
Ok(()) => Ok(()),
Err(err) if uvc_helper_restart_was_dependency_refused(&err.to_string()) => {
warn!(
"lesavka-uvc.service refused a direct restart because it is dependency-managed; USB reset already cycled the gadget"
);
Ok(())
}
Err(err) => Err(err),
}
}
fn run_systemctl(args: &[&str]) -> anyhow::Result<()> {
let output = Command::new("systemctl")
.args(args)
.output()
.with_context(|| format!("running systemctl {}", args.join(" ")))?;
if output.status.success() {
return Ok(());
}
let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string();
anyhow::bail!(
"systemctl {} failed: {}{}",
args.join(" "),
if stderr.is_empty() {
stdout.as_str()
} else {
stderr.as_str()
},
if stderr.is_empty() || stdout.is_empty() || stderr == stdout {
""
} else {
" / also see stdout"
}
);
}
fn uvc_helper_restart_was_dependency_refused(message: &str) -> bool {
message.contains("Operation refused") || message.contains("may be requested by dependency only")
}
impl EyeHub {

View File

@ -240,6 +240,29 @@ mod keyboard_contract_extra {
});
}
#[test]
#[serial]
fn ctrl_v_reaches_remote_when_lesavka_paste_chord_requires_alt() {
let Some(dev) =
open_any_keyboard_device().or_else(|| build_keyboard("lesavka-include-kbd-ctrl-v-app"))
else {
return;
};
let (mut agg, _) = new_aggregator(dev);
agg.paste_enabled = true;
agg.pressed_keys.insert(evdev::KeyCode::KEY_LEFTCTRL);
agg.pressed_keys.insert(evdev::KeyCode::KEY_V);
with_var("LESAVKA_CLIPBOARD_CHORD", Some("ctrl+alt+v"), || {
assert!(
!agg.try_handle_paste_event(evdev::KeyCode::KEY_V, 1),
"plain Ctrl+V should relay as an app shortcut, not trigger Lesavka paste"
);
});
assert!(!agg.paste_chord_armed);
assert!(!agg.paste_chord_consumed);
}
#[test]
#[serial]
fn paste_via_rpc_returns_false_without_sender() {