fix(input): recover HID transport reliably

This commit is contained in:
Brad Stein 2026-04-21 17:13:31 -03:00
parent a6b03cea34
commit 2cce2c165c
12 changed files with 353 additions and 255 deletions

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package]
name = "lesavka_client"
version = "0.11.40"
version = "0.11.41"
edition = "2024"
[dependencies]

View File

@ -1027,7 +1027,7 @@ fn remote_failsafe_timeout_from_env() -> Duration {
let millis = std::env::var("LESAVKA_INPUT_REMOTE_FAILSAFE_MS")
.ok()
.and_then(|raw| raw.parse::<u64>().ok())
.unwrap_or(60_000);
.unwrap_or(0);
Duration::from_millis(millis)
}

View File

@ -109,11 +109,13 @@ pub struct LauncherView {
pub const LESAVKA_ICON_NAME: &str = "dev.lesavka.launcher";
const LESAVKA_ICON_SEARCH_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/assets/icons");
const LAUNCHER_DEFAULT_WIDTH: i32 = 1188;
const LAUNCHER_DEFAULT_HEIGHT: i32 = 710;
const LAUNCHER_DEFAULT_WIDTH: i32 = 1360;
const LAUNCHER_DEFAULT_HEIGHT: i32 = 780;
const OPERATIONS_RAIL_WIDTH: i32 = 288;
const CAMERA_PREVIEW_VIEWPORT_HEIGHT: i32 = 72;
const CAMERA_PREVIEW_VIEWPORT_WIDTH: i32 = 128;
const CAMERA_PREVIEW_VIEWPORT_HEIGHT: i32 = 158;
const CAMERA_PREVIEW_VIEWPORT_WIDTH: i32 = 280;
const EYE_PREVIEW_MIN_HEIGHT: i32 = 226;
const EYE_PREVIEW_MIN_WIDTH: i32 = 402;
const SIDE_LOG_HEIGHT: i32 = 104;
pub fn build_launcher_view(
@ -338,8 +340,10 @@ pub fn build_launcher_view(
device_body_height_group.add_widget(&testing_row);
let camera_preview = gtk::Picture::new();
camera_preview.set_can_shrink(false);
camera_preview.set_hexpand(false);
camera_preview.set_vexpand(false);
camera_preview.set_hexpand(true);
camera_preview.set_vexpand(true);
camera_preview.set_halign(gtk::Align::Fill);
camera_preview.set_valign(gtk::Align::Fill);
camera_preview.set_size_request(
CAMERA_PREVIEW_VIEWPORT_WIDTH,
CAMERA_PREVIEW_VIEWPORT_HEIGHT,
@ -353,16 +357,19 @@ pub fn build_launcher_view(
camera_status.set_xalign(0.0);
camera_status.set_visible(false);
let camera_preview_shell = gtk::Box::new(gtk::Orientation::Vertical, 0);
camera_preview_shell.set_hexpand(false);
camera_preview_shell.set_vexpand(false);
camera_preview_shell.set_halign(gtk::Align::Center);
camera_preview_shell.set_hexpand(true);
camera_preview_shell.set_vexpand(true);
camera_preview_shell.set_halign(gtk::Align::Fill);
camera_preview_shell.set_valign(gtk::Align::Fill);
camera_preview_shell.set_size_request(
CAMERA_PREVIEW_VIEWPORT_WIDTH,
CAMERA_PREVIEW_VIEWPORT_HEIGHT,
);
let camera_preview_frame = gtk::AspectFrame::new(0.5, 0.5, 16.0 / 9.0, false);
camera_preview_frame.set_hexpand(false);
camera_preview_frame.set_vexpand(false);
camera_preview_frame.set_hexpand(true);
camera_preview_frame.set_vexpand(true);
camera_preview_frame.set_halign(gtk::Align::Fill);
camera_preview_frame.set_valign(gtk::Align::Fill);
camera_preview_frame.set_size_request(
CAMERA_PREVIEW_VIEWPORT_WIDTH,
CAMERA_PREVIEW_VIEWPORT_HEIGHT,
@ -1174,14 +1181,19 @@ fn build_display_pane(title: &str, capture_path: &str) -> DisplayPaneWidgets {
root.set_hexpand(true);
root.set_vexpand(true);
let header = gtk::Box::new(gtk::Orientation::Horizontal, 8);
header.set_hexpand(true);
let title_label = gtk::Label::new(Some(title));
title_label.add_css_class("title-4");
title_label.set_halign(gtk::Align::Start);
title_label.set_hexpand(true);
let capture_label = gtk::Label::new(Some(capture_path));
capture_label.add_css_class("dim-label");
capture_label.set_halign(gtk::Align::Start);
root.append(&title_label);
root.append(&capture_label);
capture_label.set_halign(gtk::Align::End);
capture_label.set_ellipsize(pango::EllipsizeMode::Start);
header.append(&title_label);
header.append(&capture_label);
root.append(&header);
let picture = gtk::Picture::new();
picture.set_hexpand(true);
@ -1190,20 +1202,20 @@ fn build_display_pane(title: &str, capture_path: &str) -> DisplayPaneWidgets {
picture.set_valign(gtk::Align::Fill);
picture.set_can_shrink(true);
picture.set_keep_aspect_ratio(true);
picture.set_size_request(220, 124);
picture.set_size_request(EYE_PREVIEW_MIN_WIDTH, EYE_PREVIEW_MIN_HEIGHT);
let preview_box = gtk::Box::new(gtk::Orientation::Vertical, 0);
preview_box.set_hexpand(true);
preview_box.set_vexpand(true);
preview_box.set_halign(gtk::Align::Fill);
preview_box.set_valign(gtk::Align::Fill);
preview_box.set_size_request(220, 124);
preview_box.set_size_request(EYE_PREVIEW_MIN_WIDTH, EYE_PREVIEW_MIN_HEIGHT);
let preview_frame = gtk::AspectFrame::new(0.5, 0.5, 16.0 / 9.0, false);
preview_frame.set_hexpand(true);
preview_frame.set_vexpand(true);
preview_frame.set_halign(gtk::Align::Fill);
preview_frame.set_valign(gtk::Align::Fill);
preview_frame.set_size_request(220, 124);
preview_frame.set_size_request(EYE_PREVIEW_MIN_WIDTH, EYE_PREVIEW_MIN_HEIGHT);
preview_frame.set_child(Some(&picture));
preview_box.append(&preview_frame);
@ -1219,7 +1231,7 @@ fn build_display_pane(title: &str, capture_path: &str) -> DisplayPaneWidgets {
placeholder_box.add_css_class("display-placeholder");
placeholder_box.set_hexpand(true);
placeholder_box.set_vexpand(true);
placeholder_box.set_size_request(220, 124);
placeholder_box.set_size_request(EYE_PREVIEW_MIN_WIDTH, EYE_PREVIEW_MIN_HEIGHT);
placeholder_box.append(&placeholder);
let stack = gtk::Stack::new();

View File

@ -93,7 +93,10 @@ pub fn refresh_launcher_ui(widgets: &LauncherWidgets, state: &LauncherState, chi
.usb_recover_button
.set_sensitive(state.server_available);
widgets.device_refresh_button.set_sensitive(true);
widgets.input_toggle_button.set_label("Change Routing");
widgets.input_toggle_button.set_label(match state.routing {
InputRouting::Remote => "Route Local",
InputRouting::Local => "Route Remote",
});
widgets
.input_toggle_button
.set_tooltip_text(Some(match state.routing {

View File

@ -1,6 +1,6 @@
[package]
name = "lesavka_common"
version = "0.11.40"
version = "0.11.41"
edition = "2024"
build = "build.rs"

View File

@ -0,0 +1,23 @@
#!/usr/bin/env bash
# Guard local/remote keyboard and mouse routing before pushing input changes.
set -euo pipefail
ROOT=$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")/../.." && pwd)
cd "$ROOT"
INPUT_TESTS=(
--test client_app_include_contract
--test client_inputs_contract
--test client_inputs_extra_contract
--test client_keyboard_activation_contract
--test client_keyboard_shift_contract
--test client_keyboard_include_contract
--test client_keyboard_include_extra_contract
--test client_mouse_include_contract
--test client_mouse_include_extra_contract
--test server_runtime_smoke_contract
)
cargo fmt --all -- --check
cargo check -q --bin lesavka-client --bin lesavka-server
cargo test -q -p lesavka_testing "${INPUT_TESTS[@]}"

View File

@ -14,7 +14,7 @@ VIDEO_TESTS=(
--test server_video_sink_smoke_contract
)
VIDEO_IGNORE_REGEX='(/common/src/(hid|paste|process_metrics)\.rs|/server/src/(audio|camera|gadget|paste|runtime_support|uvc_runtime)\.rs)'
VIDEO_IGNORE_REGEX='(/common/src/(hid|paste|process_metrics)\.rs|/server/src/(audio|camera|camera_runtime|capture_power|gadget|paste|runtime_support|uvc_runtime)\.rs)'
cargo fmt --all -- --check
cargo check -q --bin lesavka-client --bin lesavka-server

View File

@ -237,6 +237,7 @@ Environment=LESAVKA_EYE_MIN_FPS=12
Environment=LESAVKA_EYE_FPS=20
Environment=LESAVKA_MIC_INIT_ATTEMPTS=5
Environment=LESAVKA_MIC_INIT_DELAY_MS=250
Environment=LESAVKA_ALLOW_GADGET_CYCLE=1
Restart=always
RestartSec=5
StandardError=append:/tmp/lesavka-server.stderr

View File

@ -10,7 +10,7 @@ bench = false
[package]
name = "lesavka_server"
version = "0.11.40"
version = "0.11.41"
edition = "2024"
autobins = false

View File

@ -346,6 +346,14 @@ async fn inspect_unit(unit: &str) -> Result<UnitSnapshot> {
#[cfg(not(coverage))]
fn inspect_unit_blocking(unit: &str) -> Result<UnitSnapshot> {
if capture_power_unit_disabled(unit) {
return Ok(UnitSnapshot {
available: false,
enabled: false,
detail: "disabled".to_string(),
});
}
let output = Command::new("systemctl")
.args([
"show",
@ -393,6 +401,10 @@ async fn sync_unit_state(unit: &str, enabled: bool) -> Result<()> {
#[cfg(not(coverage))]
fn sync_unit_state_blocking(unit: &str, enabled: bool) -> Result<()> {
if capture_power_unit_disabled(unit) {
return Ok(());
}
let action = if enabled { "start" } else { "stop" };
let status = Command::new("systemctl")
.args([action, unit])
@ -405,6 +417,14 @@ fn sync_unit_state_blocking(unit: &str, enabled: bool) -> Result<()> {
}
}
#[cfg(not(coverage))]
fn capture_power_unit_disabled(unit: &str) -> bool {
matches!(
unit.trim().to_ascii_lowercase().as_str(),
"0" | "off" | "none" | "disabled"
)
}
#[cfg(coverage)]
#[derive(Debug, Clone, Default)]
pub struct CapturePowerManager;

View File

@ -462,16 +462,19 @@ mod inputs_contract {
#[test]
#[serial]
fn remote_failsafe_timeout_env_uses_default_and_allows_disable() {
fn remote_failsafe_timeout_env_is_opt_in_and_allows_disable() {
with_var("LESAVKA_INPUT_REMOTE_FAILSAFE_MS", None::<&str>, || {
assert_eq!(remote_failsafe_timeout_from_env(), Duration::from_millis(0));
});
with_var("LESAVKA_INPUT_REMOTE_FAILSAFE_MS", Some("0"), || {
assert_eq!(remote_failsafe_timeout_from_env(), Duration::from_millis(0));
});
with_var("LESAVKA_INPUT_REMOTE_FAILSAFE_MS", Some("60000"), || {
assert_eq!(
remote_failsafe_timeout_from_env(),
Duration::from_millis(60_000)
);
});
with_var("LESAVKA_INPUT_REMOTE_FAILSAFE_MS", Some("0"), || {
assert_eq!(remote_failsafe_timeout_from_env(), Duration::from_millis(0));
});
with_var("LESAVKA_INPUT_REMOTE_FAILSAFE_MS", Some("1500"), || {
assert_eq!(
remote_failsafe_timeout_from_env(),
@ -500,6 +503,22 @@ mod inputs_contract {
);
}
#[test]
fn enable_remote_capture_does_not_auto_cutoff_when_failsafe_is_disabled() {
let mut agg = new_aggregator();
agg.released = true;
agg.pending_release = false;
agg.remote_failsafe_timeout = Duration::ZERO;
agg.enable_remote_capture();
assert!(
agg.remote_failsafe_started_at.is_none(),
"normal remote input sessions should not silently flip back to local"
);
assert!(agg.remote_capture_enabled.load(Ordering::Relaxed));
assert!(!agg.released);
}
#[tokio::test(flavor = "current_thread")]
async fn run_remote_failsafe_returns_control_to_local_machine() {
let mut agg = new_aggregator();

View File

@ -45,6 +45,10 @@ mod server_main_binary_extra {
});
}
fn with_capture_power_disabled(f: impl FnOnce()) {
with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), f);
}
fn build_fake_gadget_tree(base: &Path, ctrl: &str, gadget_name: &str, state: &str) {
write_file(
&base.join(format!("sys/class/udc/{ctrl}/state")),
@ -54,6 +58,8 @@ mod server_main_binary_extra {
&base.join(format!("cfg/{gadget_name}/UDC")),
&format!("{ctrl}\n"),
);
write_file(&base.join("sys/bus/platform/drivers/dwc3/unbind"), "");
write_file(&base.join("sys/bus/platform/drivers/dwc3/bind"), "");
}
fn build_handler_for_tests_with_modes(
@ -148,52 +154,54 @@ mod server_main_binary_extra {
#[serial]
fn stream_keyboard_writes_reports_to_hid_file() {
let rt = tokio::runtime::Runtime::new().expect("runtime");
rt.block_on(async {
let (dir, handler) = build_handler_for_tests();
let kb_path = dir.path().join("hidg0.bin");
with_capture_power_disabled(|| {
rt.block_on(async {
let (dir, handler) = build_handler_for_tests();
let kb_path = dir.path().join("hidg0.bin");
let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("bind");
let addr = listener.local_addr().expect("addr");
drop(listener);
let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("bind");
let addr = listener.local_addr().expect("addr");
drop(listener);
let server = tokio::spawn(async move {
let _ = tonic::transport::Server::builder()
.add_service(RelayServer::new(handler))
.serve(addr)
.await;
let server = tokio::spawn(async move {
let _ = tonic::transport::Server::builder()
.add_service(RelayServer::new(handler))
.serve(addr)
.await;
});
let channel = connect_with_retry(addr).await;
let mut cli = RelayClient::new(channel);
let (tx, rx) = tokio::sync::mpsc::channel(4);
tx.send(KeyboardReport {
data: vec![1, 2, 3, 4, 5, 6, 7, 8],
})
.await
.expect("send keyboard packet");
drop(tx);
let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
let mut resp = cli
.stream_keyboard(tonic::Request::new(outbound))
.await
.expect("stream keyboard");
let echoed = resp
.get_mut()
.message()
.await
.expect("grpc result")
.expect("echo packet");
assert_eq!(echoed.data, vec![1, 2, 3, 4, 5, 6, 7, 8]);
tokio::time::sleep(std::time::Duration::from_millis(30)).await;
let written = std::fs::read(&kb_path).expect("read hidg0 file");
assert!(
!written.is_empty(),
"keyboard stream should write HID bytes to target file"
);
server.abort();
});
let channel = connect_with_retry(addr).await;
let mut cli = RelayClient::new(channel);
let (tx, rx) = tokio::sync::mpsc::channel(4);
tx.send(KeyboardReport {
data: vec![1, 2, 3, 4, 5, 6, 7, 8],
})
.await
.expect("send keyboard packet");
drop(tx);
let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
let mut resp = cli
.stream_keyboard(tonic::Request::new(outbound))
.await
.expect("stream keyboard");
let echoed = resp
.get_mut()
.message()
.await
.expect("grpc result")
.expect("echo packet");
assert_eq!(echoed.data, vec![1, 2, 3, 4, 5, 6, 7, 8]);
tokio::time::sleep(std::time::Duration::from_millis(30)).await;
let written = std::fs::read(&kb_path).expect("read hidg0 file");
assert!(
!written.is_empty(),
"keyboard stream should write HID bytes to target file"
);
server.abort();
});
}
@ -201,52 +209,54 @@ mod server_main_binary_extra {
#[serial]
fn stream_mouse_writes_reports_to_hid_file() {
let rt = tokio::runtime::Runtime::new().expect("runtime");
rt.block_on(async {
let (dir, handler) = build_handler_for_tests();
let ms_path = dir.path().join("hidg1.bin");
with_capture_power_disabled(|| {
rt.block_on(async {
let (dir, handler) = build_handler_for_tests();
let ms_path = dir.path().join("hidg1.bin");
let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("bind");
let addr = listener.local_addr().expect("addr");
drop(listener);
let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("bind");
let addr = listener.local_addr().expect("addr");
drop(listener);
let server = tokio::spawn(async move {
let _ = tonic::transport::Server::builder()
.add_service(RelayServer::new(handler))
.serve(addr)
.await;
let server = tokio::spawn(async move {
let _ = tonic::transport::Server::builder()
.add_service(RelayServer::new(handler))
.serve(addr)
.await;
});
let channel = connect_with_retry(addr).await;
let mut cli = RelayClient::new(channel);
let (tx, rx) = tokio::sync::mpsc::channel(4);
tx.send(MouseReport {
data: vec![8, 7, 6, 5, 4, 3, 2, 1],
})
.await
.expect("send mouse packet");
drop(tx);
let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
let mut resp = cli
.stream_mouse(tonic::Request::new(outbound))
.await
.expect("stream mouse");
let echoed = resp
.get_mut()
.message()
.await
.expect("grpc result")
.expect("echo packet");
assert_eq!(echoed.data, vec![8, 7, 6, 5, 4, 3, 2, 1]);
tokio::time::sleep(std::time::Duration::from_millis(30)).await;
let written = std::fs::read(&ms_path).expect("read hidg1 file");
assert!(
!written.is_empty(),
"mouse stream should write HID bytes to target file"
);
server.abort();
});
let channel = connect_with_retry(addr).await;
let mut cli = RelayClient::new(channel);
let (tx, rx) = tokio::sync::mpsc::channel(4);
tx.send(MouseReport {
data: vec![8, 7, 6, 5, 4, 3, 2, 1],
})
.await
.expect("send mouse packet");
drop(tx);
let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
let mut resp = cli
.stream_mouse(tonic::Request::new(outbound))
.await
.expect("stream mouse");
let echoed = resp
.get_mut()
.message()
.await
.expect("grpc result")
.expect("echo packet");
assert_eq!(echoed.data, vec![8, 7, 6, 5, 4, 3, 2, 1]);
tokio::time::sleep(std::time::Duration::from_millis(30)).await;
let written = std::fs::read(&ms_path).expect("read hidg1 file");
assert!(
!written.is_empty(),
"mouse stream should write HID bytes to target file"
);
server.abort();
});
}
@ -254,44 +264,46 @@ mod server_main_binary_extra {
#[serial]
fn stream_keyboard_recovers_when_hid_write_fails() {
let rt = tokio::runtime::Runtime::new().expect("runtime");
rt.block_on(async {
let (_dir, handler) = build_handler_for_tests_with_modes(false, true);
with_capture_power_disabled(|| {
rt.block_on(async {
let (_dir, handler) = build_handler_for_tests_with_modes(false, true);
let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("bind");
let addr = listener.local_addr().expect("addr");
drop(listener);
let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("bind");
let addr = listener.local_addr().expect("addr");
drop(listener);
let server = tokio::spawn(async move {
let _ = tonic::transport::Server::builder()
.add_service(RelayServer::new(handler))
.serve(addr)
.await;
let server = tokio::spawn(async move {
let _ = tonic::transport::Server::builder()
.add_service(RelayServer::new(handler))
.serve(addr)
.await;
});
let channel = connect_with_retry(addr).await;
let mut cli = RelayClient::new(channel);
let (tx, rx) = tokio::sync::mpsc::channel(4);
tx.send(KeyboardReport {
data: vec![11, 12, 13, 14, 15, 16, 17, 18],
})
.await
.expect("send keyboard packet");
drop(tx);
let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
let mut resp = cli
.stream_keyboard(tonic::Request::new(outbound))
.await
.expect("stream keyboard");
let echoed = resp
.get_mut()
.message()
.await
.expect("grpc result")
.expect("echo packet");
assert_eq!(echoed.data, vec![11, 12, 13, 14, 15, 16, 17, 18]);
server.abort();
});
let channel = connect_with_retry(addr).await;
let mut cli = RelayClient::new(channel);
let (tx, rx) = tokio::sync::mpsc::channel(4);
tx.send(KeyboardReport {
data: vec![11, 12, 13, 14, 15, 16, 17, 18],
})
.await
.expect("send keyboard packet");
drop(tx);
let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
let mut resp = cli
.stream_keyboard(tonic::Request::new(outbound))
.await
.expect("stream keyboard");
let echoed = resp
.get_mut()
.message()
.await
.expect("grpc result")
.expect("echo packet");
assert_eq!(echoed.data, vec![11, 12, 13, 14, 15, 16, 17, 18]);
server.abort();
});
}
@ -299,44 +311,46 @@ mod server_main_binary_extra {
#[serial]
fn stream_mouse_recovers_when_hid_write_fails() {
let rt = tokio::runtime::Runtime::new().expect("runtime");
rt.block_on(async {
let (_dir, handler) = build_handler_for_tests_with_modes(true, false);
with_capture_power_disabled(|| {
rt.block_on(async {
let (_dir, handler) = build_handler_for_tests_with_modes(true, false);
let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("bind");
let addr = listener.local_addr().expect("addr");
drop(listener);
let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("bind");
let addr = listener.local_addr().expect("addr");
drop(listener);
let server = tokio::spawn(async move {
let _ = tonic::transport::Server::builder()
.add_service(RelayServer::new(handler))
.serve(addr)
.await;
let server = tokio::spawn(async move {
let _ = tonic::transport::Server::builder()
.add_service(RelayServer::new(handler))
.serve(addr)
.await;
});
let channel = connect_with_retry(addr).await;
let mut cli = RelayClient::new(channel);
let (tx, rx) = tokio::sync::mpsc::channel(4);
tx.send(MouseReport {
data: vec![21, 22, 23, 24, 25, 26, 27, 28],
})
.await
.expect("send mouse packet");
drop(tx);
let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
let mut resp = cli
.stream_mouse(tonic::Request::new(outbound))
.await
.expect("stream mouse");
let echoed = resp
.get_mut()
.message()
.await
.expect("grpc result")
.expect("echo packet");
assert_eq!(echoed.data, vec![21, 22, 23, 24, 25, 26, 27, 28]);
server.abort();
});
let channel = connect_with_retry(addr).await;
let mut cli = RelayClient::new(channel);
let (tx, rx) = tokio::sync::mpsc::channel(4);
tx.send(MouseReport {
data: vec![21, 22, 23, 24, 25, 26, 27, 28],
})
.await
.expect("send mouse packet");
drop(tx);
let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
let mut resp = cli
.stream_mouse(tonic::Request::new(outbound))
.await
.expect("stream mouse");
let echoed = resp
.get_mut()
.message()
.await
.expect("grpc result")
.expect("echo packet");
assert_eq!(echoed.data, vec![21, 22, 23, 24, 25, 26, 27, 28]);
server.abort();
});
}
@ -433,7 +447,7 @@ mod server_main_binary_extra {
std::fs::create_dir_all(&hid_dir).expect("create hid dir");
std::fs::write(hid_dir.join("hidg0"), "").expect("create hidg0");
std::fs::write(hid_dir.join("hidg1"), "").expect("create hidg1");
build_fake_gadget_tree(dir.path(), "fake-ctrl.usb", "lesavka", "configured");
build_fake_gadget_tree(dir.path(), "fake-ctrl.usb", "lesavka", "not attached");
with_fake_gadget_roots(&dir.path().join("sys"), &dir.path().join("cfg"), || {
with_var(
@ -479,93 +493,99 @@ mod server_main_binary_extra {
#[test]
fn shared_eye_hub_forwards_inner_packets() {
let rt = tokio::runtime::Runtime::new().expect("runtime");
rt.block_on(async {
let lease = CapturePowerManager::new().acquire().await;
let packet = VideoPacket {
id: 2,
pts: 42,
data: vec![9, 8, 7],
..Default::default()
};
let hub = EyeHub::spawn(stream::iter(vec![Ok(packet.clone())]), lease);
hub.subscribers
.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
let mut rx = hub.tx.subscribe();
let observed = rx.recv().await.expect("hub packet");
assert_eq!(observed.id, packet.id);
assert_eq!(observed.pts, packet.pts);
assert_eq!(observed.data, packet.data);
with_capture_power_disabled(|| {
rt.block_on(async {
let lease = CapturePowerManager::new().acquire().await;
let packet = VideoPacket {
id: 2,
pts: 42,
data: vec![9, 8, 7],
..Default::default()
};
let hub = EyeHub::spawn(stream::iter(vec![Ok(packet.clone())]), lease);
hub.subscribers
.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
let mut rx = hub.tx.subscribe();
let observed = rx.recv().await.expect("hub packet");
assert_eq!(observed.id, packet.id);
assert_eq!(observed.pts, packet.pts);
assert_eq!(observed.data, packet.data);
});
});
}
#[test]
fn conflicting_eye_hubs_for_the_same_source_are_pruned_before_reopen() {
let rt = tokio::runtime::Runtime::new().expect("runtime");
rt.block_on(async {
let requested_key = EyeHubKey {
source_id: 1,
requested_width: 1280,
requested_height: 720,
requested_fps: 60,
};
let stale_same_source_key = EyeHubKey {
source_id: 1,
requested_width: 1920,
requested_height: 1080,
requested_fps: 60,
};
let keep_other_source_key = EyeHubKey {
source_id: 0,
requested_width: 1920,
requested_height: 1080,
requested_fps: 60,
};
let stale_same_source = EyeHub::spawn(
stream::pending::<Result<VideoPacket, tonic::Status>>(),
CapturePowerManager::new().acquire().await,
);
let stopped_other_source = EyeHub::spawn(
stream::pending::<Result<VideoPacket, tonic::Status>>(),
CapturePowerManager::new().acquire().await,
);
stopped_other_source.shutdown();
let keep_other_source = EyeHub::spawn(
stream::pending::<Result<VideoPacket, tonic::Status>>(),
CapturePowerManager::new().acquire().await,
);
let mut hubs = std::collections::HashMap::new();
hubs.insert(stale_same_source_key, stale_same_source.clone());
hubs.insert(
EyeHubKey {
source_id: 0,
with_capture_power_disabled(|| {
rt.block_on(async {
let requested_key = EyeHubKey {
source_id: 1,
requested_width: 1280,
requested_height: 720,
requested_fps: 60,
},
stopped_other_source,
);
hubs.insert(keep_other_source_key, keep_other_source.clone());
};
let stale_same_source_key = EyeHubKey {
source_id: 1,
requested_width: 1920,
requested_height: 1080,
requested_fps: 60,
};
let keep_other_source_key = EyeHubKey {
source_id: 0,
requested_width: 1920,
requested_height: 1080,
requested_fps: 60,
};
let stale_same_source = EyeHub::spawn(
stream::pending::<Result<VideoPacket, tonic::Status>>(),
CapturePowerManager::new().acquire().await,
);
let stopped_other_source = EyeHub::spawn(
stream::pending::<Result<VideoPacket, tonic::Status>>(),
CapturePowerManager::new().acquire().await,
);
stopped_other_source.shutdown();
let keep_other_source = EyeHub::spawn(
stream::pending::<Result<VideoPacket, tonic::Status>>(),
CapturePowerManager::new().acquire().await,
);
let removed = take_conflicting_eye_hubs(&mut hubs, requested_key);
let mut hubs = std::collections::HashMap::new();
hubs.insert(stale_same_source_key, stale_same_source.clone());
hubs.insert(
EyeHubKey {
source_id: 0,
requested_width: 1280,
requested_height: 720,
requested_fps: 60,
},
stopped_other_source,
);
hubs.insert(keep_other_source_key, keep_other_source.clone());
assert_eq!(removed.len(), 2);
assert!(!hubs.contains_key(&stale_same_source_key));
assert!(hubs.contains_key(&keep_other_source_key));
let removed = take_conflicting_eye_hubs(&mut hubs, requested_key);
assert_eq!(removed.len(), 2);
assert!(!hubs.contains_key(&stale_same_source_key));
assert!(hubs.contains_key(&keep_other_source_key));
});
});
}
#[test]
fn eye_hub_shutdown_marks_the_hub_as_not_running() {
let rt = tokio::runtime::Runtime::new().expect("runtime");
rt.block_on(async {
let hub = EyeHub::spawn(
stream::pending::<Result<VideoPacket, tonic::Status>>(),
CapturePowerManager::new().acquire().await,
);
assert!(hub.running.load(std::sync::atomic::Ordering::Relaxed));
hub.shutdown();
assert!(!hub.running.load(std::sync::atomic::Ordering::Relaxed));
with_capture_power_disabled(|| {
rt.block_on(async {
let hub = EyeHub::spawn(
stream::pending::<Result<VideoPacket, tonic::Status>>(),
CapturePowerManager::new().acquire().await,
);
assert!(hub.running.load(std::sync::atomic::Ordering::Relaxed));
hub.shutdown();
assert!(!hub.running.load(std::sync::atomic::Ordering::Relaxed));
});
});
}
}