fix(relay): harden swap safety and camera startup

This commit is contained in:
Brad Stein 2026-04-20 21:11:33 -03:00
parent 3526bf7b6d
commit d4fab3f958
9 changed files with 189 additions and 35 deletions

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package] [package]
name = "lesavka_client" name = "lesavka_client"
version = "0.11.29" version = "0.11.30"
edition = "2024" edition = "2024"
[dependencies] [dependencies]

View File

@ -263,15 +263,33 @@ impl LesavkaClientApp {
"📸 using camera settings from server" "📸 using camera settings from server"
); );
} }
let cam = Arc::new(CameraCapture::new( match CameraCapture::new(
std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref(), std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref(),
camera_cfg, camera_cfg,
)?); ) {
tokio::spawn(Self::cam_loop(vid_ep.clone(), cam)); Ok(cam) => {
let cam = Arc::new(cam);
tokio::spawn(Self::cam_loop(vid_ep.clone(), cam));
}
Err(err) => {
warn!(
"📸 webcam uplink is unavailable for this relay session; continuing without StreamCamera: {err:#}"
);
}
}
} }
if caps.microphone && std::env::var("LESAVKA_MIC_DISABLE").is_err() { if caps.microphone && std::env::var("LESAVKA_MIC_DISABLE").is_err() {
let mic = Arc::new(MicrophoneCapture::new()?); match MicrophoneCapture::new() {
tokio::spawn(Self::voice_loop(vid_ep.clone(), mic)); // renamed Ok(mic) => {
let mic = Arc::new(mic);
tokio::spawn(Self::voice_loop(vid_ep.clone(), mic)); // renamed
}
Err(err) => {
warn!(
"🎤 microphone uplink is unavailable for this relay session; continuing without StreamMicrophone: {err:#}"
);
}
}
} }
/*────────── central reactor ───────────────────*/ /*────────── central reactor ───────────────────*/

View File

@ -92,7 +92,7 @@ impl CameraCapture {
.max(1); .max(1);
let keyframe_interval = env_u32("LESAVKA_CAM_KEYFRAME_INTERVAL", fps.min(5)).clamp(1, fps); let keyframe_interval = env_u32("LESAVKA_CAM_KEYFRAME_INTERVAL", fps.min(5)).clamp(1, fps);
let (enc, kf_prop) = if use_mjpg_source && !output_mjpeg { let (enc, kf_prop) = if use_mjpg_source && !output_mjpeg {
("x264enc", "key-int-max") ("x264enc", Some("key-int-max"))
} else { } else {
Self::choose_encoder() Self::choose_encoder()
}; };
@ -101,11 +101,16 @@ impl CameraCapture {
} }
let _enc_opts = if enc == "x264enc" { let _enc_opts = if enc == "x264enc" {
let bitrate_kbit = env_u32("LESAVKA_CAM_H264_KBIT", 4500); let bitrate_kbit = env_u32("LESAVKA_CAM_H264_KBIT", 4500);
let keyframe_opt = kf_prop
.map(|property| format!(" {property}={keyframe_interval}"))
.unwrap_or_default();
format!( format!(
"{enc} tune=zerolatency speed-preset=faster bitrate={bitrate_kbit} {kf_prop}={keyframe_interval}" "{enc} tune=zerolatency speed-preset=faster bitrate={bitrate_kbit}{keyframe_opt}"
) )
} else if let Some(property) = kf_prop {
format!("{enc} {property}={keyframe_interval}")
} else { } else {
format!("{enc} {kf_prop}={keyframe_interval}") enc.to_string()
}; };
if output_mjpeg { if output_mjpeg {
tracing::info!("📸 outputting MJPEG frames for UVC (quality={jpeg_quality})"); tracing::info!("📸 outputting MJPEG frames for UVC (quality={jpeg_quality})");
@ -337,32 +342,58 @@ impl CameraCapture {
} }
#[cfg(not(coverage))] #[cfg(not(coverage))]
fn choose_encoder() -> (&'static str, &'static str) { fn choose_encoder() -> (&'static str, Option<&'static str>) {
match () { if gst::ElementFactory::find("nvh264enc").is_some() {
_ if gst::ElementFactory::find("nvh264enc").is_some() => ("nvh264enc", "gop-size"), return (
_ if gst::ElementFactory::find("vaapih264enc").is_some() => { "nvh264enc",
("vaapih264enc", "keyframe-period") supported_encoder_property(
} "nvh264enc",
_ if gst::ElementFactory::find("v4l2h264enc").is_some() => ("v4l2h264enc", "idrcount"), &["iframeinterval", "idrinterval", "gop-size"],
_ => ("x264enc", "key-int-max"), ),
);
} }
if gst::ElementFactory::find("vaapih264enc").is_some() {
return (
"vaapih264enc",
supported_encoder_property("vaapih264enc", &["keyframe-period"]),
);
}
if gst::ElementFactory::find("v4l2h264enc").is_some() {
return (
"v4l2h264enc",
supported_encoder_property("v4l2h264enc", &["idrcount"]),
);
}
("x264enc", Some("key-int-max"))
} }
#[cfg(coverage)] #[cfg(coverage)]
fn choose_encoder() -> (&'static str, &'static str) { fn choose_encoder() -> (&'static str, Option<&'static str>) {
match std::env::var("LESAVKA_CAM_TEST_ENCODER") match std::env::var("LESAVKA_CAM_TEST_ENCODER")
.ok() .ok()
.as_deref() .as_deref()
.map(str::trim) .map(str::trim)
{ {
Some("nvh264enc") => ("nvh264enc", "gop-size"), Some("nvh264enc") => ("nvh264enc", None),
Some("vaapih264enc") => ("vaapih264enc", "keyframe-period"), Some("vaapih264enc") => ("vaapih264enc", Some("keyframe-period")),
Some("v4l2h264enc") => ("v4l2h264enc", "idrcount"), Some("v4l2h264enc") => ("v4l2h264enc", Some("idrcount")),
_ => ("x264enc", "key-int-max"), _ => ("x264enc", Some("key-int-max")),
} }
} }
} }
#[cfg(not(coverage))]
fn supported_encoder_property(
encoder: &'static str,
properties: &[&'static str],
) -> Option<&'static str> {
let element = gst::ElementFactory::make(encoder).build().ok()?;
properties
.iter()
.copied()
.find(|property| element.find_property(property).is_some())
}
impl Drop for CameraCapture { impl Drop for CameraCapture {
fn drop(&mut self) { fn drop(&mut self) {
let _ = self.pipeline.set_state(gst::State::Null); let _ = self.pipeline.set_state(gst::State::Null);

View File

@ -49,6 +49,8 @@ pub struct InputAggregator {
last_quick_toggle_at: Option<Instant>, last_quick_toggle_at: Option<Instant>,
pending_release_started_at: Option<Instant>, pending_release_started_at: Option<Instant>,
pending_release_timeout: Duration, pending_release_timeout: Duration,
remote_failsafe_started_at: Option<Instant>,
remote_failsafe_timeout: Duration,
#[cfg(not(coverage))] #[cfg(not(coverage))]
routing_control_path: Option<PathBuf>, routing_control_path: Option<PathBuf>,
#[cfg(not(coverage))] #[cfg(not(coverage))]
@ -118,6 +120,8 @@ impl InputAggregator {
last_quick_toggle_at: None, last_quick_toggle_at: None,
pending_release_started_at: None, pending_release_started_at: None,
pending_release_timeout: pending_release_timeout_from_env(), pending_release_timeout: pending_release_timeout_from_env(),
remote_failsafe_started_at: capture_remote_boot.then(Instant::now),
remote_failsafe_timeout: remote_failsafe_timeout_from_env(),
#[cfg(not(coverage))] #[cfg(not(coverage))]
last_routing_request_raw: routing_control_path last_routing_request_raw: routing_control_path
.as_deref() .as_deref()
@ -401,6 +405,14 @@ impl InputAggregator {
self.pending_release_started_at = Some(Instant::now()); self.pending_release_started_at = Some(Instant::now());
} }
if self.remote_failsafe_expired() {
warn!(
"🛟 remote input failsafe expired after {} ms; returning control to this machine",
self.remote_failsafe_timeout.as_millis()
);
self.begin_local_release();
}
if self.pending_release || self.pending_kill { if self.pending_release || self.pending_kill {
let chord_released = if self.pending_keys.is_empty() { let chord_released = if self.pending_keys.is_empty() {
!self !self
@ -469,6 +481,14 @@ impl InputAggregator {
self.pending_release = false; self.pending_release = false;
self.pending_release_started_at = None; self.pending_release_started_at = None;
self.pending_keys.clear(); self.pending_keys.clear();
self.remote_failsafe_started_at =
(!self.remote_failsafe_timeout.is_zero()).then(Instant::now);
if !self.remote_failsafe_timeout.is_zero() {
info!(
"🛟 remote input failsafe armed for {} ms while the swap key path is being re-validated",
self.remote_failsafe_timeout.as_millis()
);
}
self.last_keyboard_report = [0; 8]; self.last_keyboard_report = [0; 8];
} }
@ -478,6 +498,7 @@ impl InputAggregator {
self.publish_routing_state_if_changed(); self.publish_routing_state_if_changed();
return; return;
} }
self.remote_failsafe_started_at = None;
self.remote_capture_enabled.store(false, Ordering::Relaxed); self.remote_capture_enabled.store(false, Ordering::Relaxed);
for k in &mut self.keyboards { for k in &mut self.keyboards {
k.send_empty_report(); k.send_empty_report();
@ -506,6 +527,7 @@ impl InputAggregator {
self.pending_release = false; self.pending_release = false;
self.pending_release_started_at = None; self.pending_release_started_at = None;
self.pending_keys.clear(); self.pending_keys.clear();
self.remote_failsafe_started_at = None;
if focus_launcher { if focus_launcher {
#[cfg(not(coverage))] #[cfg(not(coverage))]
focus_launcher_on_local_if_enabled(); focus_launcher_on_local_if_enabled();
@ -521,6 +543,16 @@ impl InputAggregator {
.is_some_and(|started_at| started_at.elapsed() >= self.pending_release_timeout) .is_some_and(|started_at| started_at.elapsed() >= self.pending_release_timeout)
} }
fn remote_failsafe_expired(&self) -> bool {
!self.released
&& !self.pending_release
&& !self.pending_kill
&& !self.remote_failsafe_timeout.is_zero()
&& self
.remote_failsafe_started_at
.is_some_and(|started_at| started_at.elapsed() >= self.remote_failsafe_timeout)
}
fn capture_pending_keys(&mut self) { fn capture_pending_keys(&mut self) {
self.pending_keys.clear(); self.pending_keys.clear();
for k in &self.keyboards { for k in &self.keyboards {
@ -939,6 +971,14 @@ fn pending_release_timeout_from_env() -> Duration {
Duration::from_millis(millis.max(100)) Duration::from_millis(millis.max(100))
} }
fn remote_failsafe_timeout_from_env() -> Duration {
let millis = std::env::var("LESAVKA_INPUT_REMOTE_FAILSAFE_MS")
.ok()
.and_then(|raw| raw.parse::<u64>().ok())
.unwrap_or(5_000);
Duration::from_millis(millis)
}
#[cfg(not(coverage))] #[cfg(not(coverage))]
fn focus_launcher_on_local_if_enabled() { fn focus_launcher_on_local_if_enabled() {
if std::env::var("LESAVKA_FOCUS_LAUNCHER_ON_LOCAL") if std::env::var("LESAVKA_FOCUS_LAUNCHER_ON_LOCAL")

View File

@ -111,8 +111,8 @@ const LESAVKA_ICON_SEARCH_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/ass
const LAUNCHER_DEFAULT_WIDTH: i32 = 1380; const LAUNCHER_DEFAULT_WIDTH: i32 = 1380;
const LAUNCHER_DEFAULT_HEIGHT: i32 = 860; const LAUNCHER_DEFAULT_HEIGHT: i32 = 860;
const OPERATIONS_RAIL_WIDTH: i32 = 288; const OPERATIONS_RAIL_WIDTH: i32 = 288;
const CAMERA_PREVIEW_VIEWPORT_HEIGHT: i32 = 178; const CAMERA_PREVIEW_VIEWPORT_HEIGHT: i32 = 202;
const CAMERA_PREVIEW_VIEWPORT_WIDTH: i32 = 316; const CAMERA_PREVIEW_VIEWPORT_WIDTH: i32 = 360;
pub fn build_launcher_view( pub fn build_launcher_view(
app: &gtk::Application, app: &gtk::Application,
@ -197,7 +197,7 @@ pub fn build_launcher_view(
let staging_row = gtk::Box::new(gtk::Orientation::Horizontal, 8); let staging_row = gtk::Box::new(gtk::Orientation::Horizontal, 8);
staging_row.set_hexpand(true); staging_row.set_hexpand(true);
staging_row.set_vexpand(true); staging_row.set_vexpand(false);
staging_row.set_homogeneous(true); staging_row.set_homogeneous(true);
workspace.append(&staging_row); workspace.append(&staging_row);
@ -209,9 +209,8 @@ pub fn build_launcher_view(
let (devices_panel, devices_body) = let (devices_panel, devices_body) =
build_panel_with_action("Device Staging", Some(device_refresh_button.upcast_ref())); build_panel_with_action("Device Staging", Some(device_refresh_button.upcast_ref()));
devices_panel.set_hexpand(true); devices_panel.set_hexpand(true);
devices_panel.set_vexpand(true); devices_panel.set_vexpand(false);
devices_body.set_spacing(8); devices_body.set_spacing(8);
devices_body.set_vexpand(true);
let control_group = build_subgroup("Control Inputs"); let control_group = build_subgroup("Control Inputs");
let control_stack = gtk::Box::new(gtk::Orientation::Vertical, 10); let control_stack = gtk::Box::new(gtk::Orientation::Vertical, 10);
@ -319,9 +318,8 @@ pub fn build_launcher_view(
let (preview_panel, preview_body) = build_panel("Device Testing"); let (preview_panel, preview_body) = build_panel("Device Testing");
preview_panel.set_hexpand(true); preview_panel.set_hexpand(true);
preview_panel.set_vexpand(true); preview_panel.set_vexpand(false);
preview_body.set_spacing(8); preview_body.set_spacing(8);
preview_body.set_vexpand(true);
let camera_preview = gtk::Picture::new(); let camera_preview = gtk::Picture::new();
camera_preview.set_can_shrink(false); camera_preview.set_can_shrink(false);
camera_preview.set_hexpand(true); camera_preview.set_hexpand(true);
@ -336,18 +334,18 @@ pub fn build_launcher_view(
camera_status.add_css_class("dim-label"); camera_status.add_css_class("dim-label");
camera_status.set_wrap(true); camera_status.set_wrap(true);
camera_status.set_xalign(0.0); camera_status.set_xalign(0.0);
camera_status.set_visible(false);
let camera_preview_shell = gtk::Box::new(gtk::Orientation::Vertical, 0); let camera_preview_shell = gtk::Box::new(gtk::Orientation::Vertical, 0);
camera_preview_shell.set_hexpand(true); camera_preview_shell.set_hexpand(true);
camera_preview_shell.set_vexpand(false); camera_preview_shell.set_vexpand(true);
camera_preview_shell.set_size_request(-1, CAMERA_PREVIEW_VIEWPORT_HEIGHT); camera_preview_shell.set_size_request(-1, CAMERA_PREVIEW_VIEWPORT_HEIGHT);
let camera_preview_frame = gtk::AspectFrame::new(0.5, 0.5, 16.0 / 9.0, false); let camera_preview_frame = gtk::AspectFrame::new(0.5, 0.5, 16.0 / 9.0, false);
camera_preview_frame.set_hexpand(true); camera_preview_frame.set_hexpand(true);
camera_preview_frame.set_vexpand(false); camera_preview_frame.set_vexpand(true);
camera_preview_frame.set_size_request(-1, CAMERA_PREVIEW_VIEWPORT_HEIGHT); camera_preview_frame.set_size_request(-1, CAMERA_PREVIEW_VIEWPORT_HEIGHT);
camera_preview_frame.set_child(Some(&camera_preview)); camera_preview_frame.set_child(Some(&camera_preview));
camera_preview_shell.append(&camera_preview_frame); camera_preview_shell.append(&camera_preview_frame);
let webcam_group = build_subgroup("Webcam Preview"); let webcam_group = build_subgroup("Webcam Preview");
webcam_group.set_vexpand(true);
webcam_group.append(&camera_preview_shell); webcam_group.append(&camera_preview_shell);
webcam_group.append(&camera_status); webcam_group.append(&camera_status);
preview_body.append(&webcam_group); preview_body.append(&webcam_group);
@ -366,6 +364,7 @@ pub fn build_launcher_view(
playback_row.append(&audio_preview_heading); playback_row.append(&audio_preview_heading);
playback_body.append(&playback_row); playback_body.append(&playback_row);
playback_body.append(&audio_check_meter); playback_body.append(&audio_check_meter);
audio_check_detail.set_visible(false);
playback_body.append(&audio_check_detail); playback_body.append(&audio_check_detail);
playback_group.append(&playback_body); playback_group.append(&playback_body);
preview_body.append(&playback_group); preview_body.append(&playback_group);

View File

@ -1,6 +1,6 @@
[package] [package]
name = "lesavka_common" name = "lesavka_common"
version = "0.11.29" version = "0.11.30"
edition = "2024" edition = "2024"
build = "build.rs" build = "build.rs"

View File

@ -17,6 +17,6 @@ mod tests {
#[test] #[test]
fn banner_includes_version() { fn banner_includes_version() {
assert_eq!(banner("0.11.29"), "lesavka-common CLI (v0.11.29)"); assert_eq!(banner("0.11.30"), "lesavka-common CLI (v0.11.30)");
} }
} }

View File

@ -10,7 +10,7 @@ bench = false
[package] [package]
name = "lesavka_server" name = "lesavka_server"
version = "0.11.29" version = "0.11.30"
edition = "2024" edition = "2024"
autobins = false autobins = false

View File

@ -452,6 +452,72 @@ mod inputs_contract {
}); });
} }
#[test]
#[serial]
fn remote_failsafe_timeout_env_uses_default_and_allows_disable() {
with_var("LESAVKA_INPUT_REMOTE_FAILSAFE_MS", None::<&str>, || {
assert_eq!(
remote_failsafe_timeout_from_env(),
Duration::from_millis(5_000)
);
});
with_var("LESAVKA_INPUT_REMOTE_FAILSAFE_MS", Some("0"), || {
assert_eq!(remote_failsafe_timeout_from_env(), Duration::from_millis(0));
});
with_var("LESAVKA_INPUT_REMOTE_FAILSAFE_MS", Some("1500"), || {
assert_eq!(
remote_failsafe_timeout_from_env(),
Duration::from_millis(1_500)
);
});
}
#[test]
fn enable_remote_capture_arms_failsafe_and_local_release_clears_it() {
let mut agg = new_aggregator();
agg.released = true;
agg.pending_release = false;
agg.remote_failsafe_timeout = Duration::from_millis(5_000);
agg.enable_remote_capture();
assert!(
agg.remote_failsafe_started_at.is_some(),
"remote capture should arm the temporary failsafe window"
);
agg.begin_local_release();
assert!(
agg.remote_failsafe_started_at.is_none(),
"returning control locally should clear the failsafe timer"
);
}
#[tokio::test(flavor = "current_thread")]
async fn run_remote_failsafe_returns_control_to_local_machine() {
let mut agg = new_aggregator();
agg.remote_failsafe_timeout = Duration::from_millis(1);
agg.remote_failsafe_started_at =
Some(std::time::Instant::now() - Duration::from_millis(10));
let result = tokio::time::timeout(Duration::from_millis(120), agg.run()).await;
assert!(
result.is_err(),
"run should keep looping after the failsafe returns control locally"
);
assert!(
agg.released,
"failsafe expiry should release devices back to the local machine"
);
assert!(
!agg.pending_release,
"failsafe expiry should complete the local-release handoff"
);
assert!(
agg.remote_failsafe_started_at.is_none(),
"failsafe timer should clear once local control is restored"
);
}
#[test] #[test]
fn observe_quick_toggle_uses_rising_edge_to_avoid_repeat_toggling() { fn observe_quick_toggle_uses_rising_edge_to_avoid_repeat_toggling() {
let mut agg = new_aggregator(); let mut agg = new_aggregator();