From fa2233f59e80fd219117c0e8632ccc072b498b56 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Sun, 19 Apr 2026 04:24:27 -0300 Subject: [PATCH] lesavka: share mirrored sources and tighten launcher fit --- client/Cargo.toml | 2 +- client/src/launcher/ui.rs | 9 ++ client/src/launcher/ui_components.rs | 47 +++--- common/Cargo.toml | 2 +- common/src/cli.rs | 2 +- server/Cargo.toml | 2 +- server/src/main.rs | 152 +++++++++++++++--- testing/tests/server_main_binary_contract.rs | 3 + .../server_main_binary_extra_contract.rs | 24 +-- testing/tests/server_main_rpc_contract.rs | 62 ++++++- 10 files changed, 241 insertions(+), 64 deletions(-) diff --git a/client/Cargo.toml b/client/Cargo.toml index b24116e..ccb19da 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.11.12" +version = "0.11.13" edition = "2024" [dependencies] diff --git a/client/src/launcher/ui.rs b/client/src/launcher/ui.rs index ac839d1..bfb3f05 100644 --- a/client/src/launcher/ui.rs +++ b/client/src/launcher/ui.rs @@ -441,6 +441,13 @@ fn normalize_breakout_limit(width: u32, height: u32) -> (u32, u32) { .unwrap_or((width.max(2), height.max(2))) } +#[cfg(not(coverage))] +fn launcher_default_size(width: u32, height: u32) -> (i32, i32) { + let max_width = width.saturating_sub(48).max(640) as i32; + let max_height = height.saturating_sub(72).max(520) as i32; + (1380.min(max_width), 860.min(max_height)) +} + #[cfg(not(coverage))] fn rebind_inline_preview( preview: &super::preview::LauncherPreview, @@ -618,6 +625,8 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> { } let view = build_launcher_view(app, server_addr.as_ref(), &catalog, &state.borrow()); let window = view.window.clone(); + let (launcher_width, launcher_height) = launcher_default_size(display_width, display_height); + window.set_default_size(launcher_width, launcher_height); let server_entry = view.server_entry.clone(); let camera_combo = view.camera_combo.clone(); let microphone_combo = view.microphone_combo.clone(); diff --git a/client/src/launcher/ui_components.rs b/client/src/launcher/ui_components.rs index becc760..dee4555 100644 --- a/client/src/launcher/ui_components.rs +++ b/client/src/launcher/ui_components.rs @@ -102,10 +102,9 @@ pub struct LauncherView { pub const LESAVKA_ICON_NAME: &str = "dev.lesavka.launcher"; const LESAVKA_ICON_SEARCH_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/assets/icons"); -const LAUNCHER_DEFAULT_WIDTH: i32 = 1510; -const LAUNCHER_DEFAULT_HEIGHT: i32 = 930; -const OPERATIONS_RAIL_WIDTH: i32 = 304; -const STAGING_COMBO_WIDTH: i32 = 690; +const LAUNCHER_DEFAULT_WIDTH: i32 = 1380; +const LAUNCHER_DEFAULT_HEIGHT: i32 = 860; +const OPERATIONS_RAIL_WIDTH: i32 = 288; const CAMERA_PREVIEW_VIEWPORT_HEIGHT: i32 = 178; const CAMERA_PREVIEW_VIEWPORT_WIDTH: i32 = 316; @@ -263,8 +262,8 @@ pub fn build_launcher_view( media_grid.set_row_spacing(10); media_grid.set_column_spacing(8); media_group.append(&media_grid); - camera_combo.set_size_request(STAGING_COMBO_WIDTH, -1); - speaker_combo.set_size_request(STAGING_COMBO_WIDTH, -1); + camera_combo.set_size_request(0, -1); + speaker_combo.set_size_request(0, -1); attach_device_row(&media_grid, 0, "Camera", &camera_combo, &camera_test_button); attach_device_row( &media_grid, @@ -288,7 +287,7 @@ pub fn build_launcher_view( microphone_test_button.set_tooltip_text(Some( "Monitor the selected microphone through the selected speaker until you stop the test.", )); - microphone_combo.set_size_request(STAGING_COMBO_WIDTH, -1); + microphone_combo.set_size_request(0, -1); attach_device_row( &media_grid, 2, @@ -1173,41 +1172,45 @@ fn build_display_pane(title: &str, capture_path: &str) -> DisplayPaneWidgets { feed_source_combo.set_tooltip_text(Some( "Choose which physical eye feed appears in this pane. Off disables the pane; the opposite-eye option mirrors the other physical feed while preserving a separate stream load for realistic validation.", )); - feed_source_combo.set_size_request(150, -1); + feed_source_combo.set_size_request(118, -1); let capture_resolution_combo = gtk::ComboBoxText::new(); capture_resolution_combo.set_tooltip_text(Some( "Choose the eye-stream source mode for this feed. Source keeps the HDMI device's own H.264 stream; cheaper source-device modes will appear here once the hardware proves it supports them.", )); - capture_resolution_combo.set_size_request(240, -1); + capture_resolution_combo.set_size_request(0, -1); + capture_resolution_combo.set_hexpand(true); let capture_fps_combo = gtk::ComboBoxText::new(); capture_fps_combo.set_tooltip_text(Some( "Source pass-through uses the HDMI device's own cadence. This control will wake back up if we add a proven source-side fps option later.", )); - capture_fps_combo.set_size_request(120, -1); + capture_fps_combo.set_size_request(96, -1); let capture_bitrate_combo = gtk::ComboBoxText::new(); capture_bitrate_combo.set_tooltip_text(Some( "Source pass-through uses the eye device's own H.264 bitrate behavior. This control stays disabled until the hardware exposes a real source-side bitrate mode we can trust.", )); - capture_bitrate_combo.set_size_request(170, -1); + capture_bitrate_combo.set_size_request(0, -1); + capture_bitrate_combo.set_hexpand(true); let breakout_combo = gtk::ComboBoxText::new(); breakout_combo.set_tooltip_text(Some( "Choose the client-side breakout window size for this eye feed. Source Size preserves the feed's own dimensions; Display Size fills the effective monitor size.", )); - breakout_combo.set_size_request(260, -1); + breakout_combo.set_size_request(0, -1); + breakout_combo.set_hexpand(true); let action_button = gtk::Button::with_label("Break Out"); stabilize_button(&action_button, 104); action_button.set_halign(gtk::Align::End); - let capture_row = gtk::Box::new(gtk::Orientation::Horizontal, 8); - capture_row.append(&feed_source_combo); - capture_row.append(&capture_resolution_combo); - capture_row.append(&capture_fps_combo); - capture_row.append(&capture_bitrate_combo); - let breakout_row = gtk::Box::new(gtk::Orientation::Horizontal, 8); - breakout_row.append(&breakout_combo); - breakout_row.append(&action_button); let footer_shell = gtk::Box::new(gtk::Orientation::Vertical, 6); - footer_shell.append(&capture_row); - footer_shell.append(&breakout_row); + let controls_grid = gtk::Grid::new(); + controls_grid.set_column_spacing(8); + controls_grid.set_row_spacing(8); + controls_grid.set_hexpand(true); + controls_grid.attach(&feed_source_combo, 0, 0, 1, 1); + controls_grid.attach(&capture_resolution_combo, 1, 0, 1, 1); + controls_grid.attach(&capture_fps_combo, 0, 1, 1, 1); + controls_grid.attach(&capture_bitrate_combo, 1, 1, 1, 1); + controls_grid.attach(&breakout_combo, 0, 2, 1, 1); + controls_grid.attach(&action_button, 1, 2, 1, 1); + footer_shell.append(&controls_grid); root.append(&footer_shell); DisplayPaneWidgets { diff --git a/common/Cargo.toml b/common/Cargo.toml index 5e72e99..f4ee62a 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.11.12" +version = "0.11.13" edition = "2024" build = "build.rs" diff --git a/common/src/cli.rs b/common/src/cli.rs index 17599ac..a8155d1 100644 --- a/common/src/cli.rs +++ b/common/src/cli.rs @@ -17,6 +17,6 @@ mod tests { #[test] fn banner_includes_version() { - assert_eq!(banner("0.11.12"), "lesavka-common CLI (v0.11.12)"); + assert_eq!(banner("0.11.13"), "lesavka-common CLI (v0.11.13)"); } } diff --git a/server/Cargo.toml b/server/Cargo.toml index 996d26f..33dc615 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.11.12" +version = "0.11.13" edition = "2024" autobins = false diff --git a/server/src/main.rs b/server/src/main.rs index 494f05e..3168643 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -3,9 +3,10 @@ #[allow(clippy::useless_attribute)] #[forbid(unsafe_code)] use futures_util::{Stream, StreamExt}; -use std::sync::atomic::AtomicBool; +use std::collections::HashMap; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::{backtrace::Backtrace, panic, pin::Pin, sync::Arc, time::Duration}; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, broadcast}; use tokio_stream::wrappers::ReceiverStream; use tonic::transport::Server; use tonic::{Request, Response, Status}; @@ -52,6 +53,21 @@ struct Handler { did_cycle: Arc, camera_rt: Arc, capture_power: CapturePowerManager, + eye_hubs: Arc>>>, +} + +#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] +struct EyeHubKey { + source_id: u32, + requested_width: u32, + requested_height: u32, + requested_fps: u32, +} + +struct EyeHub { + tx: broadcast::Sender, + running: Arc, + subscribers: Arc, } impl Handler { @@ -82,6 +98,7 @@ impl Handler { did_cycle: Arc::new(AtomicBool::new(false)), camera_rt: Arc::new(CameraRuntime::new()), capture_power: CapturePowerManager::new(), + eye_hubs: Arc::new(Mutex::new(HashMap::new())), }) } @@ -98,7 +115,13 @@ impl Handler { req: MonitorRequest, ) -> Result, Status> { let id = req.id; + if id > 1 { + return Err(Status::invalid_argument("monitor id must be 0 or 1")); + } let source_id = req.source_id.unwrap_or(id); + if source_id > 1 { + return Err(Status::invalid_argument("source id must be 0 or 1")); + } let dev = match source_id { 0 => "/dev/lesavka_l_eye", 1 => "/dev/lesavka_r_eye", @@ -121,21 +144,75 @@ impl Handler { debug!(rpc_id, "🎥 streaming {dev}"); } + let hub = self + .eye_hub( + dev, + EyeHubKey { + source_id, + requested_width: req.requested_width, + requested_height: req.requested_height, + requested_fps: req.requested_fps, + }, + req.max_bitrate, + ) + .await?; + + let mut hub_rx = hub.tx.subscribe(); + hub.subscribers.fetch_add(1, Ordering::AcqRel); + let subscribers = Arc::clone(&hub.subscribers); + let (tx, rx) = tokio::sync::mpsc::channel(32); + tokio::spawn(async move { + loop { + match hub_rx.recv().await { + Ok(mut pkt) => { + pkt.id = id; + if tx.send(Ok(pkt)).await.is_err() { + break; + } + } + Err(broadcast::error::RecvError::Lagged(_)) => continue, + Err(broadcast::error::RecvError::Closed) => break, + } + } + subscribers.fetch_sub(1, Ordering::AcqRel); + }); + + Ok(Response::new(Box::pin(ReceiverStream::new(rx)))) + } + + async fn eye_hub( + &self, + dev: &str, + key: EyeHubKey, + max_bitrate_kbit: u32, + ) -> Result, Status> { + let mut hubs = self.eye_hubs.lock().await; + if let Some(hub) = hubs.get(&key) + && hub.running.load(Ordering::Relaxed) + { + return Ok(Arc::clone(hub)); + } + let lease = self.capture_power.acquire().await; let stream = video::eye_ball_with_request( dev, - id, - req.max_bitrate, - req.requested_width, - req.requested_height, - req.requested_fps, + key.source_id, + max_bitrate_kbit, + key.requested_width, + key.requested_height, + key.requested_fps, ) .await .map_err(|e| Status::internal(format!("{e:#}")))?; - Ok(Response::new(Box::pin(GuardedVideoStream { - inner: stream, - _lease: lease, - }))) + + let hub = EyeHub::spawn(stream, lease); + hubs.insert(key, Arc::clone(&hub)); + Ok(hub) + } + + #[cfg(test)] + async fn eye_hub_count(&self) -> usize { + self.eye_hubs.lock().await.len() } async fn paste_text_reply( @@ -204,22 +281,47 @@ impl Handler { } } -struct GuardedVideoStream { - inner: S, - _lease: lesavka_server::capture_power::CapturePowerLease, -} +impl EyeHub { + fn spawn(mut stream: S, lease: lesavka_server::capture_power::CapturePowerLease) -> Arc + where + S: Stream> + Unpin + Send + 'static, + { + let (tx, _) = broadcast::channel(32); + let running = Arc::new(AtomicBool::new(true)); + let subscribers = Arc::new(AtomicUsize::new(0)); + let hub = Arc::new(Self { + tx: tx.clone(), + running: Arc::clone(&running), + subscribers: Arc::clone(&subscribers), + }); -impl Stream for GuardedVideoStream -where - S: Stream> + Unpin, -{ - type Item = Result; + tokio::spawn(async move { + let _lease = lease; + let mut idle_ticks = 0_u32; + while running.load(Ordering::Relaxed) { + match stream.next().await { + Some(Ok(pkt)) => { + let _ = tx.send(pkt); + if subscribers.load(Ordering::Relaxed) == 0 { + idle_ticks = idle_ticks.saturating_add(1); + if idle_ticks >= 60 { + break; + } + } else { + idle_ticks = 0; + } + } + Some(Err(err)) => { + warn!(?err, "shared eye hub stream error"); + break; + } + None => break, + } + } + running.store(false, Ordering::Relaxed); + }); - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - Pin::new(&mut self.inner).poll_next(cx) + hub } } diff --git a/testing/tests/server_main_binary_contract.rs b/testing/tests/server_main_binary_contract.rs index d8b4ea6..085553a 100644 --- a/testing/tests/server_main_binary_contract.rs +++ b/testing/tests/server_main_binary_contract.rs @@ -50,6 +50,9 @@ mod server_main_binary { did_cycle: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)), camera_rt: std::sync::Arc::new(CameraRuntime::new()), capture_power: CapturePowerManager::new(), + eye_hubs: std::sync::Arc::new(tokio::sync::Mutex::new( + std::collections::HashMap::new(), + )), }, ) } diff --git a/testing/tests/server_main_binary_extra_contract.rs b/testing/tests/server_main_binary_extra_contract.rs index af10a1b..5c551d5 100644 --- a/testing/tests/server_main_binary_extra_contract.rs +++ b/testing/tests/server_main_binary_extra_contract.rs @@ -92,6 +92,9 @@ mod server_main_binary_extra { did_cycle: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)), camera_rt: std::sync::Arc::new(CameraRuntime::new()), capture_power: CapturePowerManager::new(), + eye_hubs: std::sync::Arc::new(tokio::sync::Mutex::new( + std::collections::HashMap::new(), + )), }, ) } @@ -458,6 +461,9 @@ mod server_main_binary_extra { did_cycle: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)), camera_rt: std::sync::Arc::new(CameraRuntime::new()), capture_power: CapturePowerManager::new(), + eye_hubs: std::sync::Arc::new(tokio::sync::Mutex::new( + std::collections::HashMap::new(), + )), }; let rt = tokio::runtime::Runtime::new().expect("runtime"); let reply = rt @@ -471,7 +477,7 @@ mod server_main_binary_extra { } #[test] - fn guarded_video_stream_forwards_inner_packets() { + fn shared_eye_hub_forwards_inner_packets() { let rt = tokio::runtime::Runtime::new().expect("runtime"); rt.block_on(async { let lease = CapturePowerManager::new().acquire().await; @@ -481,20 +487,14 @@ mod server_main_binary_extra { data: vec![9, 8, 7], ..Default::default() }; - let mut guarded = GuardedVideoStream { - inner: stream::iter(vec![Ok(packet.clone())]), - _lease: lease, - }; - - let observed = guarded - .next() - .await - .expect("guarded stream item") - .expect("packet"); + let hub = EyeHub::spawn(stream::iter(vec![Ok(packet.clone())]), lease); + hub.subscribers + .fetch_add(1, std::sync::atomic::Ordering::AcqRel); + let mut rx = hub.tx.subscribe(); + let observed = rx.recv().await.expect("hub packet"); assert_eq!(observed.id, packet.id); assert_eq!(observed.pts, packet.pts); assert_eq!(observed.data, packet.data); - assert!(guarded.next().await.is_none()); }); } } diff --git a/testing/tests/server_main_rpc_contract.rs b/testing/tests/server_main_rpc_contract.rs index c88dc00..965b14a 100644 --- a/testing/tests/server_main_rpc_contract.rs +++ b/testing/tests/server_main_rpc_contract.rs @@ -51,6 +51,9 @@ mod server_main_rpc { did_cycle: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)), camera_rt: std::sync::Arc::new(CameraRuntime::new()), capture_power: CapturePowerManager::new(), + eye_hubs: std::sync::Arc::new(tokio::sync::Mutex::new( + std::collections::HashMap::new(), + )), }, ) } @@ -119,7 +122,7 @@ mod server_main_rpc { #[test] #[cfg(coverage)] #[serial] - fn capture_video_returns_guarded_stream_when_coverage_source_is_overridden() { + fn capture_video_returns_stream_when_coverage_source_is_overridden() { let (_dir, handler) = build_handler_for_tests(); let rt = tokio::runtime::Runtime::new().expect("runtime"); with_var( @@ -151,6 +154,60 @@ mod server_main_rpc { ); } + #[test] + #[cfg(coverage)] + #[serial] + fn mirrored_capture_requests_share_one_source_hub_but_keep_logical_ids() { + let (_dir, handler) = build_handler_for_tests(); + let rt = tokio::runtime::Runtime::new().expect("runtime"); + with_var( + "LESAVKA_TEST_VIDEO_SOURCE", + Some("/dev/lesavka_r_eye"), + || { + let (left_packet, right_packet, hub_count) = rt.block_on(async { + let mut left = handler + .capture_video(tonic::Request::new(MonitorRequest { + id: 0, + max_bitrate: 3_000, + requested_width: 1920, + requested_height: 1080, + requested_fps: 60, + source_id: Some(1), + })) + .await + .expect("left stream") + .into_inner(); + let mut right = handler + .capture_video(tonic::Request::new(MonitorRequest { + id: 1, + max_bitrate: 3_000, + requested_width: 1920, + requested_height: 1080, + requested_fps: 60, + source_id: Some(1), + })) + .await + .expect("right stream") + .into_inner(); + let left_packet = left.next().await.expect("left item").expect("left packet"); + let right_packet = right + .next() + .await + .expect("right item") + .expect("right packet"); + let hub_count = handler.eye_hub_count().await; + (left_packet, right_packet, hub_count) + }); + + assert_eq!(left_packet.id, 0); + assert_eq!(right_packet.id, 1); + assert!(!left_packet.data.is_empty()); + assert!(!right_packet.data.is_empty()); + assert_eq!(hub_count, 1); + }, + ); + } + #[test] #[serial] fn paste_text_accepts_encrypted_payload_and_returns_reply() { @@ -350,6 +407,9 @@ mod server_main_rpc { )), camera_rt: std::sync::Arc::new(CameraRuntime::new()), capture_power: CapturePowerManager::new(), + eye_hubs: std::sync::Arc::new(tokio::sync::Mutex::new( + std::collections::HashMap::new(), + )), }; with_var(