From d4fab3f95821e9d7046cc59020cb2abc9836488a Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Mon, 20 Apr 2026 21:11:33 -0300 Subject: [PATCH] fix(relay): harden swap safety and camera startup --- client/Cargo.toml | 2 +- client/src/app.rs | 28 +++++++++-- client/src/input/camera.rs | 63 +++++++++++++++++------ client/src/input/inputs.rs | 40 +++++++++++++++ client/src/launcher/ui_components.rs | 19 ++++--- common/Cargo.toml | 2 +- common/src/cli.rs | 2 +- server/Cargo.toml | 2 +- testing/tests/client_inputs_contract.rs | 66 +++++++++++++++++++++++++ 9 files changed, 189 insertions(+), 35 deletions(-) diff --git a/client/Cargo.toml b/client/Cargo.toml index 55a2e54..c222754 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.11.29" +version = "0.11.30" edition = "2024" [dependencies] diff --git a/client/src/app.rs b/client/src/app.rs index e6f5bb3..3e0606c 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -263,15 +263,33 @@ impl LesavkaClientApp { "📸 using camera settings from server" ); } - let cam = Arc::new(CameraCapture::new( + match CameraCapture::new( std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref(), camera_cfg, - )?); - tokio::spawn(Self::cam_loop(vid_ep.clone(), cam)); + ) { + Ok(cam) => { + let cam = Arc::new(cam); + tokio::spawn(Self::cam_loop(vid_ep.clone(), cam)); + } + Err(err) => { + warn!( + "📸 webcam uplink is unavailable for this relay session; continuing without StreamCamera: {err:#}" + ); + } + } } if caps.microphone && std::env::var("LESAVKA_MIC_DISABLE").is_err() { - let mic = Arc::new(MicrophoneCapture::new()?); - tokio::spawn(Self::voice_loop(vid_ep.clone(), mic)); // renamed + match MicrophoneCapture::new() { + Ok(mic) => { + let mic = Arc::new(mic); + tokio::spawn(Self::voice_loop(vid_ep.clone(), mic)); // renamed + } + Err(err) => { + warn!( + "🎤 microphone uplink is unavailable for this relay session; continuing without StreamMicrophone: {err:#}" + ); + } + } } /*────────── central reactor ───────────────────*/ diff --git a/client/src/input/camera.rs b/client/src/input/camera.rs index c752aa1..08973b1 100644 --- a/client/src/input/camera.rs +++ b/client/src/input/camera.rs @@ -92,7 +92,7 @@ impl CameraCapture { .max(1); let keyframe_interval = env_u32("LESAVKA_CAM_KEYFRAME_INTERVAL", fps.min(5)).clamp(1, fps); let (enc, kf_prop) = if use_mjpg_source && !output_mjpeg { - ("x264enc", "key-int-max") + ("x264enc", Some("key-int-max")) } else { Self::choose_encoder() }; @@ -101,11 +101,16 @@ impl CameraCapture { } let _enc_opts = if enc == "x264enc" { let bitrate_kbit = env_u32("LESAVKA_CAM_H264_KBIT", 4500); + let keyframe_opt = kf_prop + .map(|property| format!(" {property}={keyframe_interval}")) + .unwrap_or_default(); format!( - "{enc} tune=zerolatency speed-preset=faster bitrate={bitrate_kbit} {kf_prop}={keyframe_interval}" + "{enc} tune=zerolatency speed-preset=faster bitrate={bitrate_kbit}{keyframe_opt}" ) + } else if let Some(property) = kf_prop { + format!("{enc} {property}={keyframe_interval}") } else { - format!("{enc} {kf_prop}={keyframe_interval}") + enc.to_string() }; if output_mjpeg { tracing::info!("📸 outputting MJPEG frames for UVC (quality={jpeg_quality})"); @@ -337,32 +342,58 @@ impl CameraCapture { } #[cfg(not(coverage))] - fn choose_encoder() -> (&'static str, &'static str) { - match () { - _ if gst::ElementFactory::find("nvh264enc").is_some() => ("nvh264enc", "gop-size"), - _ if gst::ElementFactory::find("vaapih264enc").is_some() => { - ("vaapih264enc", "keyframe-period") - } - _ if gst::ElementFactory::find("v4l2h264enc").is_some() => ("v4l2h264enc", "idrcount"), - _ => ("x264enc", "key-int-max"), + fn choose_encoder() -> (&'static str, Option<&'static str>) { + if gst::ElementFactory::find("nvh264enc").is_some() { + return ( + "nvh264enc", + supported_encoder_property( + "nvh264enc", + &["iframeinterval", "idrinterval", "gop-size"], + ), + ); } + if gst::ElementFactory::find("vaapih264enc").is_some() { + return ( + "vaapih264enc", + supported_encoder_property("vaapih264enc", &["keyframe-period"]), + ); + } + if gst::ElementFactory::find("v4l2h264enc").is_some() { + return ( + "v4l2h264enc", + supported_encoder_property("v4l2h264enc", &["idrcount"]), + ); + } + ("x264enc", Some("key-int-max")) } #[cfg(coverage)] - fn choose_encoder() -> (&'static str, &'static str) { + fn choose_encoder() -> (&'static str, Option<&'static str>) { match std::env::var("LESAVKA_CAM_TEST_ENCODER") .ok() .as_deref() .map(str::trim) { - Some("nvh264enc") => ("nvh264enc", "gop-size"), - Some("vaapih264enc") => ("vaapih264enc", "keyframe-period"), - Some("v4l2h264enc") => ("v4l2h264enc", "idrcount"), - _ => ("x264enc", "key-int-max"), + Some("nvh264enc") => ("nvh264enc", None), + Some("vaapih264enc") => ("vaapih264enc", Some("keyframe-period")), + Some("v4l2h264enc") => ("v4l2h264enc", Some("idrcount")), + _ => ("x264enc", Some("key-int-max")), } } } +#[cfg(not(coverage))] +fn supported_encoder_property( + encoder: &'static str, + properties: &[&'static str], +) -> Option<&'static str> { + let element = gst::ElementFactory::make(encoder).build().ok()?; + properties + .iter() + .copied() + .find(|property| element.find_property(property).is_some()) +} + impl Drop for CameraCapture { fn drop(&mut self) { let _ = self.pipeline.set_state(gst::State::Null); diff --git a/client/src/input/inputs.rs b/client/src/input/inputs.rs index 0066437..c8af0b5 100644 --- a/client/src/input/inputs.rs +++ b/client/src/input/inputs.rs @@ -49,6 +49,8 @@ pub struct InputAggregator { last_quick_toggle_at: Option, pending_release_started_at: Option, pending_release_timeout: Duration, + remote_failsafe_started_at: Option, + remote_failsafe_timeout: Duration, #[cfg(not(coverage))] routing_control_path: Option, #[cfg(not(coverage))] @@ -118,6 +120,8 @@ impl InputAggregator { last_quick_toggle_at: None, pending_release_started_at: None, pending_release_timeout: pending_release_timeout_from_env(), + remote_failsafe_started_at: capture_remote_boot.then(Instant::now), + remote_failsafe_timeout: remote_failsafe_timeout_from_env(), #[cfg(not(coverage))] last_routing_request_raw: routing_control_path .as_deref() @@ -401,6 +405,14 @@ impl InputAggregator { self.pending_release_started_at = Some(Instant::now()); } + if self.remote_failsafe_expired() { + warn!( + "🛟 remote input failsafe expired after {} ms; returning control to this machine", + self.remote_failsafe_timeout.as_millis() + ); + self.begin_local_release(); + } + if self.pending_release || self.pending_kill { let chord_released = if self.pending_keys.is_empty() { !self @@ -469,6 +481,14 @@ impl InputAggregator { self.pending_release = false; self.pending_release_started_at = None; self.pending_keys.clear(); + self.remote_failsafe_started_at = + (!self.remote_failsafe_timeout.is_zero()).then(Instant::now); + if !self.remote_failsafe_timeout.is_zero() { + info!( + "🛟 remote input failsafe armed for {} ms while the swap key path is being re-validated", + self.remote_failsafe_timeout.as_millis() + ); + } self.last_keyboard_report = [0; 8]; } @@ -478,6 +498,7 @@ impl InputAggregator { self.publish_routing_state_if_changed(); return; } + self.remote_failsafe_started_at = None; self.remote_capture_enabled.store(false, Ordering::Relaxed); for k in &mut self.keyboards { k.send_empty_report(); @@ -506,6 +527,7 @@ impl InputAggregator { self.pending_release = false; self.pending_release_started_at = None; self.pending_keys.clear(); + self.remote_failsafe_started_at = None; if focus_launcher { #[cfg(not(coverage))] focus_launcher_on_local_if_enabled(); @@ -521,6 +543,16 @@ impl InputAggregator { .is_some_and(|started_at| started_at.elapsed() >= self.pending_release_timeout) } + fn remote_failsafe_expired(&self) -> bool { + !self.released + && !self.pending_release + && !self.pending_kill + && !self.remote_failsafe_timeout.is_zero() + && self + .remote_failsafe_started_at + .is_some_and(|started_at| started_at.elapsed() >= self.remote_failsafe_timeout) + } + fn capture_pending_keys(&mut self) { self.pending_keys.clear(); for k in &self.keyboards { @@ -939,6 +971,14 @@ fn pending_release_timeout_from_env() -> Duration { Duration::from_millis(millis.max(100)) } +fn remote_failsafe_timeout_from_env() -> Duration { + let millis = std::env::var("LESAVKA_INPUT_REMOTE_FAILSAFE_MS") + .ok() + .and_then(|raw| raw.parse::().ok()) + .unwrap_or(5_000); + Duration::from_millis(millis) +} + #[cfg(not(coverage))] fn focus_launcher_on_local_if_enabled() { if std::env::var("LESAVKA_FOCUS_LAUNCHER_ON_LOCAL") diff --git a/client/src/launcher/ui_components.rs b/client/src/launcher/ui_components.rs index e67e7b6..86980e8 100644 --- a/client/src/launcher/ui_components.rs +++ b/client/src/launcher/ui_components.rs @@ -111,8 +111,8 @@ const LESAVKA_ICON_SEARCH_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/ass const LAUNCHER_DEFAULT_WIDTH: i32 = 1380; const LAUNCHER_DEFAULT_HEIGHT: i32 = 860; const OPERATIONS_RAIL_WIDTH: i32 = 288; -const CAMERA_PREVIEW_VIEWPORT_HEIGHT: i32 = 178; -const CAMERA_PREVIEW_VIEWPORT_WIDTH: i32 = 316; +const CAMERA_PREVIEW_VIEWPORT_HEIGHT: i32 = 202; +const CAMERA_PREVIEW_VIEWPORT_WIDTH: i32 = 360; pub fn build_launcher_view( app: >k::Application, @@ -197,7 +197,7 @@ pub fn build_launcher_view( let staging_row = gtk::Box::new(gtk::Orientation::Horizontal, 8); staging_row.set_hexpand(true); - staging_row.set_vexpand(true); + staging_row.set_vexpand(false); staging_row.set_homogeneous(true); workspace.append(&staging_row); @@ -209,9 +209,8 @@ pub fn build_launcher_view( let (devices_panel, devices_body) = build_panel_with_action("Device Staging", Some(device_refresh_button.upcast_ref())); devices_panel.set_hexpand(true); - devices_panel.set_vexpand(true); + devices_panel.set_vexpand(false); devices_body.set_spacing(8); - devices_body.set_vexpand(true); let control_group = build_subgroup("Control Inputs"); let control_stack = gtk::Box::new(gtk::Orientation::Vertical, 10); @@ -319,9 +318,8 @@ pub fn build_launcher_view( let (preview_panel, preview_body) = build_panel("Device Testing"); preview_panel.set_hexpand(true); - preview_panel.set_vexpand(true); + preview_panel.set_vexpand(false); preview_body.set_spacing(8); - preview_body.set_vexpand(true); let camera_preview = gtk::Picture::new(); camera_preview.set_can_shrink(false); camera_preview.set_hexpand(true); @@ -336,18 +334,18 @@ pub fn build_launcher_view( camera_status.add_css_class("dim-label"); camera_status.set_wrap(true); camera_status.set_xalign(0.0); + camera_status.set_visible(false); let camera_preview_shell = gtk::Box::new(gtk::Orientation::Vertical, 0); camera_preview_shell.set_hexpand(true); - camera_preview_shell.set_vexpand(false); + camera_preview_shell.set_vexpand(true); camera_preview_shell.set_size_request(-1, CAMERA_PREVIEW_VIEWPORT_HEIGHT); let camera_preview_frame = gtk::AspectFrame::new(0.5, 0.5, 16.0 / 9.0, false); camera_preview_frame.set_hexpand(true); - camera_preview_frame.set_vexpand(false); + camera_preview_frame.set_vexpand(true); camera_preview_frame.set_size_request(-1, CAMERA_PREVIEW_VIEWPORT_HEIGHT); camera_preview_frame.set_child(Some(&camera_preview)); camera_preview_shell.append(&camera_preview_frame); let webcam_group = build_subgroup("Webcam Preview"); - webcam_group.set_vexpand(true); webcam_group.append(&camera_preview_shell); webcam_group.append(&camera_status); preview_body.append(&webcam_group); @@ -366,6 +364,7 @@ pub fn build_launcher_view( playback_row.append(&audio_preview_heading); playback_body.append(&playback_row); playback_body.append(&audio_check_meter); + audio_check_detail.set_visible(false); playback_body.append(&audio_check_detail); playback_group.append(&playback_body); preview_body.append(&playback_group); diff --git a/common/Cargo.toml b/common/Cargo.toml index 8680ab7..6f87a54 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.11.29" +version = "0.11.30" edition = "2024" build = "build.rs" diff --git a/common/src/cli.rs b/common/src/cli.rs index 5be5bee..8628df9 100644 --- a/common/src/cli.rs +++ b/common/src/cli.rs @@ -17,6 +17,6 @@ mod tests { #[test] fn banner_includes_version() { - assert_eq!(banner("0.11.29"), "lesavka-common CLI (v0.11.29)"); + assert_eq!(banner("0.11.30"), "lesavka-common CLI (v0.11.30)"); } } diff --git a/server/Cargo.toml b/server/Cargo.toml index ee489f9..157ac80 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.11.29" +version = "0.11.30" edition = "2024" autobins = false diff --git a/testing/tests/client_inputs_contract.rs b/testing/tests/client_inputs_contract.rs index 309c05e..cd0b021 100644 --- a/testing/tests/client_inputs_contract.rs +++ b/testing/tests/client_inputs_contract.rs @@ -452,6 +452,72 @@ mod inputs_contract { }); } + #[test] + #[serial] + fn remote_failsafe_timeout_env_uses_default_and_allows_disable() { + with_var("LESAVKA_INPUT_REMOTE_FAILSAFE_MS", None::<&str>, || { + assert_eq!( + remote_failsafe_timeout_from_env(), + Duration::from_millis(5_000) + ); + }); + with_var("LESAVKA_INPUT_REMOTE_FAILSAFE_MS", Some("0"), || { + assert_eq!(remote_failsafe_timeout_from_env(), Duration::from_millis(0)); + }); + with_var("LESAVKA_INPUT_REMOTE_FAILSAFE_MS", Some("1500"), || { + assert_eq!( + remote_failsafe_timeout_from_env(), + Duration::from_millis(1_500) + ); + }); + } + + #[test] + fn enable_remote_capture_arms_failsafe_and_local_release_clears_it() { + let mut agg = new_aggregator(); + agg.released = true; + agg.pending_release = false; + agg.remote_failsafe_timeout = Duration::from_millis(5_000); + + agg.enable_remote_capture(); + assert!( + agg.remote_failsafe_started_at.is_some(), + "remote capture should arm the temporary failsafe window" + ); + + agg.begin_local_release(); + assert!( + agg.remote_failsafe_started_at.is_none(), + "returning control locally should clear the failsafe timer" + ); + } + + #[tokio::test(flavor = "current_thread")] + async fn run_remote_failsafe_returns_control_to_local_machine() { + let mut agg = new_aggregator(); + agg.remote_failsafe_timeout = Duration::from_millis(1); + agg.remote_failsafe_started_at = + Some(std::time::Instant::now() - Duration::from_millis(10)); + + let result = tokio::time::timeout(Duration::from_millis(120), agg.run()).await; + assert!( + result.is_err(), + "run should keep looping after the failsafe returns control locally" + ); + assert!( + agg.released, + "failsafe expiry should release devices back to the local machine" + ); + assert!( + !agg.pending_release, + "failsafe expiry should complete the local-release handoff" + ); + assert!( + agg.remote_failsafe_started_at.is_none(), + "failsafe timer should clear once local control is restored" + ); + } + #[test] fn observe_quick_toggle_uses_rising_edge_to_avoid_repeat_toggling() { let mut agg = new_aggregator();