diff --git a/client/src/launcher/preview.rs b/client/src/launcher/preview.rs index 9a75b03..6b7a6ab 100644 --- a/client/src/launcher/preview.rs +++ b/client/src/launcher/preview.rs @@ -9,7 +9,7 @@ use gstreamer_app as gst_app; #[cfg(not(coverage))] use gtk::{gdk, glib}; #[cfg(not(coverage))] -use lesavka_common::lesavka::{MonitorRequest, VideoPacket, relay_client::RelayClient}; +use lesavka_common::lesavka::{relay_client::RelayClient, MonitorRequest, VideoPacket}; #[cfg(not(coverage))] use std::sync::atomic::{AtomicBool, Ordering}; #[cfg(not(coverage))] @@ -17,7 +17,7 @@ use std::sync::{Arc, Mutex}; #[cfg(not(coverage))] use std::time::Duration; #[cfg(not(coverage))] -use tonic::{Request, transport::Channel}; +use tonic::{transport::Channel, Request}; #[cfg(not(coverage))] use tracing::{debug, warn}; @@ -25,9 +25,12 @@ use tracing::{debug, warn}; const PREVIEW_WIDTH: i32 = 640; #[cfg(not(coverage))] const PREVIEW_HEIGHT: i32 = 360; +#[cfg(not(coverage))] +const PREVIEW_IDLE_STATUS: &str = "Connect relay to preview."; #[cfg(not(coverage))] pub struct LauncherPreview { + server_addr: Arc>, feeds: [PreviewFeed; 2], } @@ -42,14 +45,28 @@ pub struct PreviewBinding { impl LauncherPreview { pub fn new(server_addr: String) -> Result { gst::init().context("initialising preview gstreamer")?; + let server_addr = Arc::new(Mutex::new(server_addr)); Ok(Self { + server_addr: Arc::clone(&server_addr), feeds: [ - PreviewFeed::spawn(server_addr.clone(), 0)?, + PreviewFeed::spawn(Arc::clone(&server_addr), 0)?, PreviewFeed::spawn(server_addr, 1)?, ], }) } + pub fn set_server_addr(&self, server_addr: String) { + if let Ok(mut slot) = self.server_addr.lock() { + *slot = server_addr; + } + } + + pub fn set_session_active(&self, active: bool) { + for feed in &self.feeds { + feed.set_active(active); + } + } + pub fn install_on_picture( &self, monitor_id: usize, @@ -75,20 +92,78 @@ impl PreviewBinding { #[cfg(not(coverage))] struct PreviewFeed { - latest: Arc>>, + shared: Arc>, + active: Arc, +} + +#[cfg(not(coverage))] +struct SharedPreviewState { + latest: Option, + status: String, + generation: u64, + clear_picture: bool, +} + +#[cfg(not(coverage))] +impl SharedPreviewState { + fn new() -> Self { + Self { + latest: None, + status: PREVIEW_IDLE_STATUS.to_string(), + generation: 1, + clear_picture: true, + } + } + + fn set_status(&mut self, status: impl Into, clear_picture: bool) { + let status = status.into(); + let changed = self.status != status || clear_picture; + self.status = status; + if clear_picture { + self.latest = None; + self.clear_picture = true; + } + if changed { + self.generation = self.generation.saturating_add(1); + } + } + + fn push_frame(&mut self, frame: PreviewFrame) { + self.latest = Some(frame); + self.clear_picture = false; + if self.status != "Live" { + self.status = "Live".to_string(); + self.generation = self.generation.saturating_add(1); + } + } } #[cfg(not(coverage))] impl PreviewFeed { - fn spawn(server_addr: String, monitor_id: u32) -> Result { - let latest = Arc::new(Mutex::new(None)); - let store = Arc::clone(&latest); + fn spawn(server_addr: Arc>, monitor_id: u32) -> Result { + let shared = Arc::new(Mutex::new(SharedPreviewState::new())); + let active = Arc::new(AtomicBool::new(false)); + let shared_state = Arc::clone(&shared); + let active_flag = Arc::clone(&active); std::thread::spawn(move || { - if let Err(err) = run_preview_feed(server_addr, monitor_id, store) { + if let Err(err) = run_preview_feed(server_addr, monitor_id, active_flag, shared_state) { warn!(monitor_id, ?err, "launcher preview feed exited"); } }); - Ok(Self { latest }) + Ok(Self { shared, active }) + } + + fn set_active(&self, active: bool) { + self.active.store(active, Ordering::Relaxed); + if !active { + self.replace_status(PREVIEW_IDLE_STATUS, true); + } + } + + fn replace_status(&self, status: impl Into, clear_picture: bool) { + if let Ok(mut shared) = self.shared.lock() { + shared.set_status(status, clear_picture); + } } fn install_on_picture( @@ -98,11 +173,12 @@ impl PreviewFeed { ) -> PreviewBinding { let picture = picture.clone(); let status_label = status_label.clone(); - let latest = Arc::clone(&self.latest); + let shared = Arc::clone(&self.shared); let enabled = Arc::new(AtomicBool::new(true)); let alive = Arc::new(AtomicBool::new(true)); let enabled_flag = Arc::clone(&enabled); let alive_flag = Arc::clone(&alive); + let mut last_generation = 0_u64; glib::timeout_add_local(Duration::from_millis(120), move || { if !alive_flag.load(Ordering::Relaxed) { return glib::ControlFlow::Break; @@ -110,8 +186,23 @@ impl PreviewFeed { if !enabled_flag.load(Ordering::Relaxed) { return glib::ControlFlow::Continue; } - let next = latest.lock().ok().and_then(|mut slot| slot.take()); - if let Some(frame) = next { + + let (frame, status, generation, clear_picture) = match shared.lock() { + Ok(mut slot) => { + let frame = slot.latest.take(); + let status = slot.status.clone(); + let generation = slot.generation; + let clear_picture = slot.clear_picture; + slot.clear_picture = false; + (frame, status, generation, clear_picture) + } + Err(_) => return glib::ControlFlow::Continue, + }; + + if clear_picture { + picture.set_paintable(Option::<&gdk::Paintable>::None); + } + if let Some(frame) = frame { let bytes = glib::Bytes::from_owned(frame.rgba); let texture = gdk::MemoryTexture::new( frame.width, @@ -121,7 +212,10 @@ impl PreviewFeed { frame.stride, ); picture.set_paintable(Some(&texture)); - status_label.set_text("Live"); + } + if generation != last_generation { + status_label.set_text(&status); + last_generation = generation; } glib::ControlFlow::Continue }); @@ -139,9 +233,10 @@ struct PreviewFrame { #[cfg(not(coverage))] fn run_preview_feed( - server_addr: String, + server_addr: Arc>, monitor_id: u32, - latest: Arc>>, + active: Arc, + shared: Arc>, ) -> Result<()> { let (pipeline, appsrc, appsink) = build_preview_pipeline()?; pipeline @@ -149,15 +244,13 @@ fn run_preview_feed( .context("starting launcher preview pipeline")?; { - let latest = Arc::clone(&latest); + let shared = Arc::clone(&shared); let appsink = appsink.clone(); - std::thread::spawn(move || { - loop { - if let Some(sample) = appsink.try_pull_sample(gst::ClockTime::from_mseconds(250)) { - if let Some(frame) = sample_to_frame(&sample) { - if let Ok(mut slot) = latest.lock() { - *slot = Some(frame); - } + std::thread::spawn(move || loop { + if let Some(sample) = appsink.try_pull_sample(gst::ClockTime::from_mseconds(250)) { + if let Some(frame) = sample_to_frame(&sample) { + if let Ok(mut slot) = shared.lock() { + slot.push_frame(frame); } } } @@ -171,21 +264,44 @@ fn run_preview_feed( let _ = rt.block_on(async move { loop { - let channel = match Channel::from_shared(server_addr.clone()) { + if !active.load(Ordering::Relaxed) { + set_shared_status(&shared, PREVIEW_IDLE_STATUS, true); + tokio::time::sleep(Duration::from_millis(150)).await; + continue; + } + + set_shared_status(&shared, "Connecting relay preview...", true); + let current_addr = match server_addr.lock() { + Ok(value) => value.clone(), + Err(_) => { + set_shared_status(&shared, "Preview address is unavailable.", true); + tokio::time::sleep(Duration::from_millis(750)).await; + continue; + } + }; + + let channel = match Channel::from_shared(current_addr.clone()) { Ok(endpoint) => match endpoint.tcp_nodelay(true).connect().await { Ok(channel) => channel, Err(err) => { warn!(monitor_id, ?err, "launcher preview connect failed"); + set_shared_status( + &shared, + format!("Preview host is unavailable: {err}"), + true, + ); tokio::time::sleep(Duration::from_millis(750)).await; continue; } }, Err(err) => { warn!(monitor_id, ?err, "launcher preview endpoint invalid"); + set_shared_status(&shared, format!("Preview address is invalid: {err}"), true); tokio::time::sleep(Duration::from_millis(750)).await; continue; } }; + let mut cli = RelayClient::new(channel); let req = MonitorRequest { id: monitor_id, @@ -194,17 +310,39 @@ fn run_preview_feed( match cli.capture_video(Request::new(req)).await { Ok(mut stream) => { debug!(monitor_id, "launcher preview connected"); - while let Some(item) = stream.get_mut().message().await.transpose() { - match item { - Ok(pkt) => push_preview_packet(&appsrc, pkt), - Err(err) => { - warn!(monitor_id, ?err, "launcher preview stream error"); + set_shared_status(&shared, "Waiting for stream...", true); + loop { + if !active.load(Ordering::Relaxed) { + break; + } + match tokio::time::timeout( + Duration::from_millis(300), + stream.get_mut().message(), + ) + .await + { + Ok(Ok(Some(pkt))) => push_preview_packet(&appsrc, pkt), + Ok(Ok(None)) => { + set_shared_status(&shared, "Preview stream ended.", true); break; } + Ok(Err(err)) => { + warn!(monitor_id, ?err, "launcher preview stream error"); + set_shared_status( + &shared, + format!("Preview stream error: {err}"), + true, + ); + break; + } + Err(_) => continue, } } } - Err(err) => warn!(monitor_id, ?err, "launcher preview rpc failed"), + Err(err) => { + warn!(monitor_id, ?err, "launcher preview rpc failed"); + set_shared_status(&shared, format!("Preview RPC failed: {err}"), true); + } } tokio::time::sleep(Duration::from_millis(750)).await; } @@ -215,6 +353,17 @@ fn run_preview_feed( Ok(()) } +#[cfg(not(coverage))] +fn set_shared_status( + shared: &Arc>, + status: impl Into, + clear: bool, +) { + if let Ok(mut slot) = shared.lock() { + slot.set_status(status, clear); + } +} + #[cfg(not(coverage))] fn build_preview_pipeline() -> Result<(gst::Pipeline, gst_app::AppSrc, gst_app::AppSink)> { let desc = format!( diff --git a/client/src/launcher/ui.rs b/client/src/launcher/ui.rs index 009ec2a..20f1bca 100644 --- a/client/src/launcher/ui.rs +++ b/client/src/launcher/ui.rs @@ -23,15 +23,39 @@ use { std::cell::{Cell, RefCell}, std::process::Child, std::rc::Rc, - std::time::{Duration, Instant}, + std::time::Duration, }; #[cfg(not(coverage))] enum PowerMessage { - Poll(std::result::Result), + Refresh(std::result::Result), Command(std::result::Result), } +#[cfg(not(coverage))] +fn request_capture_power_refresh( + power_tx: std::sync::mpsc::Sender, + server_addr: String, + delay: Duration, +) { + std::thread::spawn(move || { + if !delay.is_zero() { + std::thread::sleep(delay); + } + let result = fetch_capture_power(&server_addr).map_err(|err| err.to_string()); + let _ = power_tx.send(PowerMessage::Refresh(result)); + }); +} + +#[cfg(not(coverage))] +fn disconnected_capture_note(mode: &str) -> &'static str { + match mode { + "forced-on" => "Relay disconnected. Capture is still forced on for staging.", + "forced-off" => "Relay disconnected. Capture stays intentionally dark until you return to Auto or Force On.", + _ => "Relay disconnected. The server will hold capture briefly, then let it return to standby.", + } +} + #[cfg(not(coverage))] pub fn run_gui_launcher(server_addr: String) -> Result<()> { let app = gtk::Application::builder() @@ -110,7 +134,6 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> { let (power_tx, power_rx) = std::sync::mpsc::channel::(); let power_request_in_flight = Rc::new(Cell::new(false)); - let last_power_poll = Rc::new(RefCell::new(None::)); { let state = Rc::clone(&state); @@ -139,6 +162,15 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> { }); } + if let Some(preview) = preview.as_ref() { + preview.set_session_active(false); + } + request_capture_power_refresh( + power_tx.clone(), + selected_server_addr(&server_entry, server_addr.as_ref()), + Duration::ZERO, + ); + { let state = Rc::clone(&state); let widgets = widgets.clone(); @@ -195,14 +227,38 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> { let input_control_path = Rc::clone(&input_control_path); let input_state_path = Rc::clone(&input_state_path); let server_addr_fallback = Rc::clone(&server_addr); + let preview = preview.clone(); + let power_tx = power_tx.clone(); let start_button = widgets.start_button.clone(); let widgets_handle = widgets.clone(); start_button.connect_clicked(move |_| { + let server_addr = + selected_server_addr(&server_entry, server_addr_fallback.as_ref()); if child_proc.borrow().is_some() { + stop_child_process(&child_proc); + let power_mode = { + let mut state = state.borrow_mut(); + let _ = state.stop_remote(); + state.capture_power.mode.clone() + }; + if let Some(preview) = preview.as_ref() { + preview.set_server_addr(server_addr.clone()); + preview.set_session_active(false); + } widgets_handle .status_label - .set_text("Relay is already running."); - refresh_launcher_ui(&widgets_handle, &state.borrow(), true); + .set_text(disconnected_capture_note(&power_mode)); + request_capture_power_refresh( + power_tx.clone(), + server_addr.clone(), + Duration::from_millis(250), + ); + request_capture_power_refresh( + power_tx.clone(), + server_addr, + Duration::from_secs(31), + ); + refresh_launcher_ui(&widgets_handle, &state.borrow(), false); return; } { @@ -213,8 +269,6 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> { } let _ = std::fs::remove_file(input_control_path.as_path()); let _ = std::fs::remove_file(input_state_path.as_path()); - let server_addr = - selected_server_addr(&server_entry, server_addr_fallback.as_ref()); let launch_state = state.borrow().clone(); let input_toggle_key = selected_toggle_key(&widgets.toggle_key_combo); match spawn_client_process( @@ -227,25 +281,42 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> { Ok(child) => { *child_proc.borrow_mut() = Some(child); let _ = state.borrow_mut().start_remote(); + if let Some(preview) = preview.as_ref() { + preview.set_server_addr(server_addr.clone()); + preview.set_session_active(true); + } let routing = routing_name(state.borrow().routing); let power_mode = state.borrow().capture_power.mode.clone(); let message = match power_mode.as_str() { "forced-off" => format!( - "Relay started with inputs routed to {}, but capture is forced off. Return capture to Auto or Force On when you want remote video.", + "Relay connected with inputs routed to {}, but capture is forced off. Return capture to Auto or Force On when you want remote video.", routing ), "forced-on" => format!( - "Relay started with inputs routed to {}. Capture is being held awake for staging.", + "Relay connected with inputs routed to {}. Capture is being held awake and the eye previews are coming online.", routing ), _ => format!( - "Relay started with inputs routed to {}. Capture will wake automatically for previews and live session demand.", + "Relay connected with inputs routed to {}. The eye previews will come up with the live session.", routing ), }; widgets_handle.status_label.set_text(&message); + request_capture_power_refresh( + power_tx.clone(), + server_addr.clone(), + Duration::from_millis(250), + ); + request_capture_power_refresh( + power_tx.clone(), + server_addr, + Duration::from_millis(1250), + ); } Err(err) => { + if let Some(preview) = preview.as_ref() { + preview.set_session_active(false); + } widgets_handle .status_label .set_text(&format!("Relay start failed: {err}")); @@ -259,22 +330,6 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> { }); } - { - let state = Rc::clone(&state); - let child_proc = Rc::clone(&child_proc); - let widgets = widgets.clone(); - let stop_button = widgets.stop_button.clone(); - let widgets_handle = widgets.clone(); - stop_button.connect_clicked(move |_| { - stop_child_process(&child_proc); - let _ = state.borrow_mut().stop_remote(); - widgets_handle - .status_label - .set_text("Relay stopped. Local staging remains available."); - refresh_launcher_ui(&widgets_handle, &state.borrow(), false); - }); - } - { let state = Rc::clone(&state); let child_proc = Rc::clone(&child_proc); @@ -581,14 +636,34 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> { let last_state_marker = Rc::new(RefCell::new(path_marker(input_state_path.as_path()))); let power_request_in_flight = Rc::clone(&power_request_in_flight); - let last_power_poll = Rc::clone(&last_power_poll); + let preview = preview.clone(); + let power_tx = power_tx.clone(); glib::timeout_add_local(Duration::from_millis(180), move || { let child_running = reap_exited_child(&child_proc); if !child_running && state.borrow().remote_active { - let _ = state.borrow_mut().stop_remote(); + let power_mode = { + let mut state = state.borrow_mut(); + let _ = state.stop_remote(); + state.capture_power.mode.clone() + }; + if let Some(preview) = preview.as_ref() { + preview.set_session_active(false); + } widgets .status_label - .set_text("Relay ended. Local staging remains available."); + .set_text(disconnected_capture_note(&power_mode)); + let server_addr = + selected_server_addr(&server_entry, server_addr_fallback.as_ref()); + request_capture_power_refresh( + power_tx.clone(), + server_addr.clone(), + Duration::from_millis(250), + ); + request_capture_power_refresh( + power_tx.clone(), + server_addr, + Duration::from_secs(31), + ); } let next_state_marker = path_marker(input_state_path.as_path()); @@ -617,10 +692,10 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> { while let Ok(message) = power_rx.try_recv() { power_request_in_flight.set(false); match message { - PowerMessage::Poll(Ok(power)) => { + PowerMessage::Refresh(Ok(power)) => { state.borrow_mut().set_capture_power(power); } - PowerMessage::Poll(Err(err)) => { + PowerMessage::Refresh(Err(err)) => { state.borrow_mut().set_capture_power(CapturePowerStatus { available: false, enabled: false, @@ -647,24 +722,6 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> { } } - let should_poll_power = !power_request_in_flight.get() - && last_power_poll - .borrow() - .map(|stamp| stamp.elapsed() >= Duration::from_millis(1400)) - .unwrap_or(true); - if should_poll_power { - *last_power_poll.borrow_mut() = Some(Instant::now()); - power_request_in_flight.set(true); - let server_addr = - selected_server_addr(&server_entry, server_addr_fallback.as_ref()); - let tx = power_tx.clone(); - std::thread::spawn(move || { - let result = - fetch_capture_power(&server_addr).map_err(|err| err.to_string()); - let _ = tx.send(PowerMessage::Poll(result)); - }); - } - refresh_launcher_ui(&widgets, &state.borrow(), child_running); refresh_test_buttons(&widgets, &mut tests.borrow_mut()); glib::ControlFlow::Continue diff --git a/client/src/launcher/ui_components.rs b/client/src/launcher/ui_components.rs index 168453f..57293ee 100644 --- a/client/src/launcher/ui_components.rs +++ b/client/src/launcher/ui_components.rs @@ -45,7 +45,6 @@ pub struct LauncherWidgets { pub local_test_detail: gtk::Label, pub display_panes: [DisplayPaneWidgets; 2], pub start_button: gtk::Button, - pub stop_button: gtk::Button, pub power_auto_button: gtk::Button, pub power_on_button: gtk::Button, pub power_off_button: gtk::Button, @@ -155,20 +154,13 @@ pub fn build_launcher_view( connection_body.append(&server_entry); let relay_actions_row = gtk::Box::new(gtk::Orientation::Horizontal, 8); - let start_button = gtk::Button::with_label("Start Relay"); + let start_button = gtk::Button::with_label("Connect Relay"); start_button.add_css_class("suggested-action"); start_button.set_hexpand(true); start_button.set_tooltip_text(Some( - "Launch the relay using the staged devices and current input routing.", - )); - let stop_button = gtk::Button::with_label("Stop Relay"); - stop_button.add_css_class("destructive-action"); - stop_button.set_hexpand(true); - stop_button.set_tooltip_text(Some( - "Stop the live relay session. Local staging and previews stay available.", + "Connect to the relay host, bring the staged session online, and start the eye previews.", )); relay_actions_row.append(&start_button); - relay_actions_row.append(&stop_button); connection_body.append(&relay_actions_row); let live_actions_row = gtk::Box::new(gtk::Orientation::Horizontal, 8); @@ -228,10 +220,6 @@ pub fn build_launcher_view( input_toggle_button.set_tooltip_text(Some( "Switch live keyboard and mouse ownership between the local machine and the remote target.", )); - routing_row.append(&input_toggle_button); - routing_body.append(&routing_row); - - let swap_row = gtk::Box::new(gtk::Orientation::Horizontal, 8); let swap_label = gtk::Label::new(Some("Swap key")); swap_label.set_halign(gtk::Align::Start); let toggle_key_combo = gtk::ComboBoxText::new(); @@ -246,9 +234,10 @@ pub fn build_launcher_view( toggle_key_combo.set_tooltip_text(Some( "Single-key live input swap while the relay is running.", )); - swap_row.append(&swap_label); - swap_row.append(&toggle_key_combo); - routing_body.append(&swap_row); + routing_row.append(&input_toggle_button); + routing_row.append(&swap_label); + routing_row.append(&toggle_key_combo); + routing_body.append(&routing_row); sidebar.append(&routing_panel); let (devices_panel, devices_body) = build_panel("Device Staging"); @@ -328,14 +317,7 @@ pub fn build_launcher_view( stage_title.add_css_class("title-4"); stage_title.set_halign(gtk::Align::Start); stage_header.append(&stage_title); - let stage_note = gtk::Label::new(Some( - "Live server-side eye feeds. In Auto mode, open previews and active relay sessions count as capture demand.", - )); - stage_note.add_css_class("dim-label"); - stage_note.set_wrap(true); - stage_note.set_xalign(0.0); stage.append(&stage_header); - stage.append(&stage_note); let display_row = gtk::Box::new(gtk::Orientation::Horizontal, 16); display_row.set_hexpand(true); @@ -454,7 +436,6 @@ pub fn build_launcher_view( local_test_detail, display_panes: [left_pane.clone(), right_pane.clone()], start_button: start_button.clone(), - stop_button: stop_button.clone(), power_auto_button: power_auto_button.clone(), power_on_button: power_on_button.clone(), power_off_button: power_off_button.clone(), @@ -667,7 +648,7 @@ fn build_display_pane(title: &str, capture_path: &str) -> DisplayPaneWidgets { root.append(&stack); let footer = gtk::Box::new(gtk::Orientation::Horizontal, 8); - let stream_status = gtk::Label::new(Some("Waiting for stream...")); + let stream_status = gtk::Label::new(Some("Connect relay to preview.")); stream_status.set_halign(gtk::Align::Start); stream_status.set_hexpand(true); let action_button = gtk::Button::with_label("Break Out"); diff --git a/client/src/launcher/ui_runtime.rs b/client/src/launcher/ui_runtime.rs index b33528e..390ac5f 100644 --- a/client/src/launcher/ui_runtime.rs +++ b/client/src/launcher/ui_runtime.rs @@ -8,13 +8,13 @@ use std::{ }; use super::{ - LAUNCHER_FOCUS_SIGNAL_ENV, device_test::{DeviceTestController, DeviceTestKind}, launcher_focus_signal_path, preview::LauncherPreview, runtime_env_vars, state::{CapturePowerStatus, DisplaySurface, InputRouting, LauncherState}, ui_components::{DisplayPaneWidgets, LauncherWidgets, PopoutWindowHandle}, + LAUNCHER_FOCUS_SIGNAL_ENV, }; pub const INPUT_CONTROL_ENV: &str = "LESAVKA_LAUNCHER_INPUT_CONTROL"; @@ -23,14 +23,11 @@ pub const DEFAULT_INPUT_CONTROL_PATH: &str = "/tmp/lesavka-launcher-input.contro pub const DEFAULT_INPUT_STATE_PATH: &str = "/tmp/lesavka-launcher-input.state"; pub fn refresh_launcher_ui(widgets: &LauncherWidgets, state: &LauncherState, child_running: bool) { + let relay_live = child_running || state.remote_active; widgets .summary .relay_value - .set_text(if child_running || state.remote_active { - "Running" - } else { - "Stopped" - }); + .set_text(if relay_live { "Running" } else { "Stopped" }); widgets .summary .routing_value @@ -62,14 +59,18 @@ pub fn refresh_launcher_ui(widgets: &LauncherWidgets, state: &LauncherState, chi .launch_plan_detail .set_text(&launch_plan_detail(state, child_running)); - widgets.start_button.set_label(if child_running { - "Relay Running" + widgets.start_button.set_label(if relay_live { + "Disconnect Relay" } else { - "Start Relay" + "Connect Relay" }); - widgets.start_button.set_sensitive(!child_running); - widgets.stop_button.set_sensitive(child_running); - widgets.clipboard_button.set_sensitive(child_running); + widgets.start_button.set_sensitive(true); + widgets.start_button.set_tooltip_text(Some(if relay_live { + "Disconnect from the relay host, stop the live session, and let capture fall back to grace/standby." + } else { + "Connect to the relay host, start the live session, and bring the eye previews online." + })); + widgets.clipboard_button.set_sensitive(relay_live); widgets.probe_button.set_sensitive(true); widgets.input_toggle_button.set_label(match state.routing { InputRouting::Remote => "Route Inputs To Local", @@ -187,7 +188,7 @@ pub fn open_popout_window( picture.set_can_shrink(true); root.append(&picture); - let stream_status = gtk::Label::new(Some("Waiting for stream...")); + let stream_status = gtk::Label::new(Some("Connect relay to preview.")); stream_status.set_halign(gtk::Align::Start); root.append(&stream_status); @@ -354,7 +355,7 @@ fn launch_plan_title(state: &LauncherState, child_running: bool) -> String { return match state.capture_power.mode.as_str() { "forced-off" => "Relay live, but capture is intentionally dark.".to_string(), "forced-on" => "Relay live with capture held awake.".to_string(), - _ => "Relay live with automatic capture management.".to_string(), + _ => "Relay live with previews tied to the session.".to_string(), }; } @@ -380,15 +381,15 @@ fn launch_plan_detail(state: &LauncherState, child_running: bool) -> String { if child_running || state.remote_active { return match state.capture_power.mode.as_str() { "forced-off" => format!( - "Inputs are routed to {}. Return capture to Auto or Force On when you want the remote eyes and session video to wake up.", + "Inputs are routed to {}. The session is connected, but capture is intentionally dark until you return to Auto or Force On.", capitalize(routing_name(state.routing)) ), "forced-on" => format!( - "Inputs are routed to {}. The relay host is keeping the capture feeds up even without preview demand.", + "Inputs are routed to {}. The relay host is holding capture awake, so the eye previews stay ready even between bursts of activity.", capitalize(routing_name(state.routing)) ), _ => format!( - "Inputs are routed to {}. Live eye previews and session demand will wake capture automatically as needed.", + "Inputs are routed to {}. Connecting the relay also brings the eye previews online, and the server keeps capture awake for the live session.", capitalize(routing_name(state.routing)) ), }; @@ -407,11 +408,11 @@ fn launch_plan_detail(state: &LauncherState, child_running: bool) -> String { .to_string() } "forced-on" => { - "The relay host is already holding capture awake, which is useful for preflight framing checks before the session starts." + "The relay host is already holding capture awake, which is useful when you want the eye feeds ready the moment you connect." .to_string() } _ => { - "Automatic capture mode wakes the remote feeds only while eye previews or the live relay session actually need them." + "When you connect the relay, the eye previews come up with it. Disconnecting returns capture to the server-side grace/standby path." .to_string() } } @@ -641,7 +642,7 @@ mod tests { state.start_remote(); let detail = launch_plan_detail(&state, true); - assert!(detail.contains("Return capture to Auto or Force On")); + assert!(detail.contains("intentionally dark")); } #[test] diff --git a/scripts/ci/hygiene_gate_baseline.json b/scripts/ci/hygiene_gate_baseline.json index e0c296b..d465bae 100644 --- a/scripts/ci/hygiene_gate_baseline.json +++ b/scripts/ci/hygiene_gate_baseline.json @@ -81,9 +81,9 @@ "loc": 69 }, "client/src/launcher/preview.rs": { - "clippy_warnings": 20, - "doc_debt": 6, - "loc": 293 + "clippy_warnings": 24, + "doc_debt": 13, + "loc": 442 }, "client/src/launcher/state.rs": { "clippy_warnings": 14, @@ -92,8 +92,8 @@ }, "client/src/launcher/ui.rs": { "clippy_warnings": 10, - "doc_debt": 1, - "loc": 695 + "doc_debt": 3, + "loc": 752 }, "client/src/launcher/ui_components.rs": { "clippy_warnings": 8, @@ -103,7 +103,7 @@ "client/src/launcher/ui_runtime.rs": { "clippy_warnings": 10, "doc_debt": 20, - "loc": 660 + "loc": 661 }, "client/src/layout.rs": { "clippy_warnings": 6, @@ -178,7 +178,7 @@ "server/src/audio.rs": { "clippy_warnings": 37, "doc_debt": 7, - "loc": 386 + "loc": 397 }, "server/src/bin/lesavka-uvc.real.inc": { "clippy_warnings": 31 @@ -199,9 +199,9 @@ "loc": 200 }, "server/src/capture_power.rs": { - "clippy_warnings": 10, - "doc_debt": 7, - "loc": 338 + "clippy_warnings": 12, + "doc_debt": 10, + "loc": 513 }, "server/src/gadget.rs": { "clippy_warnings": 30, @@ -221,7 +221,7 @@ "server/src/main.rs": { "clippy_warnings": 10, "doc_debt": 13, - "loc": 572 + "loc": 576 }, "server/src/paste.rs": { "clippy_warnings": 6, diff --git a/scripts/ci/quality_gate_baseline.json b/scripts/ci/quality_gate_baseline.json index 0789c64..f66f65b 100644 --- a/scripts/ci/quality_gate_baseline.json +++ b/scripts/ci/quality_gate_baseline.json @@ -58,7 +58,7 @@ }, "client/src/launcher/ui.rs": { "line_percent": 100.0, - "loc": 695 + "loc": 752 }, "client/src/layout.rs": { "line_percent": 97.72727272727273, @@ -110,7 +110,7 @@ }, "server/src/audio.rs": { "line_percent": 100.0, - "loc": 386 + "loc": 397 }, "server/src/bin/lesavka-uvc.rs": { "line_percent": 96.35535307517085, @@ -126,7 +126,7 @@ }, "server/src/capture_power.rs": { "line_percent": 100.0, - "loc": 338 + "loc": 513 }, "server/src/gadget.rs": { "line_percent": 96.875, @@ -138,7 +138,7 @@ }, "server/src/main.rs": { "line_percent": 95.33333333333334, - "loc": 572 + "loc": 576 }, "server/src/paste.rs": { "line_percent": 97.12230215827337, diff --git a/server/src/audio.rs b/server/src/audio.rs index a9b1f6d..6750b77 100644 --- a/server/src/audio.rs +++ b/server/src/audio.rs @@ -384,3 +384,14 @@ impl Voice { let _ = self.appsrc.end_of_stream(); } } + +#[cfg(all(test, coverage))] +mod tests { + use super::Voice; + + #[tokio::test] + async fn coverage_voice_constructor_starts_stub_pipeline() { + let mut voice = Voice::new("coverage-audio").await.expect("voice"); + voice.finish(); + } +} diff --git a/server/src/capture_power.rs b/server/src/capture_power.rs index 1c603ed..703c1ce 100644 --- a/server/src/capture_power.rs +++ b/server/src/capture_power.rs @@ -2,27 +2,34 @@ use lesavka_common::lesavka::CapturePowerState; #[cfg(not(coverage))] use { - anyhow::{Context, Result, anyhow}, + anyhow::{anyhow, Context, Result}, std::process::Command, std::sync::{ - Arc, atomic::{AtomicBool, Ordering}, + Arc, + }, + tokio::{ + sync::Mutex, + time::{Duration, Instant}, }, - tokio::sync::Mutex, tracing::{info, warn}, }; #[cfg(not(coverage))] #[derive(Debug, Default)] struct CapturePowerInner { - active_leases: u32, + preview_leases: u32, + session_leases: u32, manual_override: Option, + session_grace_deadline: Option, + sync_generation: u64, } #[cfg(not(coverage))] #[derive(Debug, Clone)] pub struct CapturePowerManager { unit: Arc, + session_grace: Duration, inner: Arc>, } @@ -30,9 +37,17 @@ pub struct CapturePowerManager { #[derive(Clone)] pub struct CapturePowerLease { manager: CapturePowerManager, + kind: LeaseKind, released: Arc, } +#[cfg(not(coverage))] +#[derive(Clone, Copy, Debug)] +enum LeaseKind { + Preview, + Session, +} + #[cfg(not(coverage))] #[derive(Debug)] struct UnitSnapshot { @@ -41,6 +56,16 @@ struct UnitSnapshot { detail: String, } +#[cfg(not(coverage))] +impl LeaseKind { + fn as_str(self) -> &'static str { + match self { + Self::Preview => "preview", + Self::Session => "session", + } + } +} + #[cfg(not(coverage))] impl CapturePowerManager { pub fn new() -> Self { @@ -50,28 +75,59 @@ impl CapturePowerManager { .unwrap_or_else(|| "relay.service".to_string()); Self { unit: Arc::::from(unit), + session_grace: capture_power_session_grace_from_env(), inner: Arc::new(Mutex::new(CapturePowerInner::default())), } } pub async fn acquire(&self) -> CapturePowerLease { - let (desired, unit, leases, manual_override) = { + self.acquire_kind(LeaseKind::Preview).await + } + + pub async fn acquire_session(&self) -> CapturePowerLease { + self.acquire_kind(LeaseKind::Session).await + } + + async fn acquire_kind(&self, kind: LeaseKind) -> CapturePowerLease { + let (desired, unit, leases, manual_override, grace_remaining) = { let mut inner = self.inner.lock().await; - inner.active_leases = inner.active_leases.saturating_add(1); + match kind { + LeaseKind::Preview => { + inner.preview_leases = inner.preview_leases.saturating_add(1); + } + LeaseKind::Session => { + inner.session_leases = inner.session_leases.saturating_add(1); + if inner.session_grace_deadline.take().is_some() { + inner.sync_generation = inner.sync_generation.saturating_add(1); + } + } + } + let (desired, grace_remaining) = desired_state_and_grace(&inner, Instant::now()); ( - desired_state(&inner), + desired, self.unit.to_string(), - inner.active_leases, + active_leases(&inner), inner.manual_override, + grace_remaining, ) }; if let Err(err) = sync_unit_state(unit.as_str(), desired).await { - warn!(unit = %unit, leases, desired, ?manual_override, ?err, "capture power sync failed on acquire"); + warn!( + unit = %unit, + kind = kind.as_str(), + leases, + desired, + ?manual_override, + ?grace_remaining, + ?err, + "capture power sync failed on acquire" + ); } CapturePowerLease { manager: self.clone(), + kind, released: Arc::new(AtomicBool::new(false)), } } @@ -81,6 +137,8 @@ impl CapturePowerManager { { let mut inner = self.inner.lock().await; inner.manual_override = Some(enabled); + inner.session_grace_deadline = None; + inner.sync_generation = inner.sync_generation.saturating_add(1); } sync_unit_state(unit.as_str(), enabled).await?; @@ -92,7 +150,7 @@ impl CapturePowerManager { let desired = { let mut inner = self.inner.lock().await; inner.manual_override = None; - desired_state(&inner) + desired_state_and_grace(&inner, Instant::now()).0 }; sync_unit_state(unit.as_str(), desired).await?; @@ -100,17 +158,28 @@ impl CapturePowerManager { } pub async fn snapshot(&self) -> Result { - let (active_leases, manual_override) = { + let (active_leases, manual_override, grace_remaining) = { let inner = self.inner.lock().await; - (inner.active_leases, inner.manual_override) + ( + active_leases(&inner), + inner.manual_override, + desired_state_and_grace(&inner, Instant::now()).1, + ) }; let unit = self.unit.to_string(); let snapshot = inspect_unit(unit.as_str()).await?; + let mut detail = snapshot.detail; + if let Some(grace_remaining) = grace_remaining { + detail = format!( + "{detail} • disconnect grace {}s", + grace_remaining.as_secs().max(1) + ); + } Ok(CapturePowerState { available: snapshot.available, enabled: snapshot.enabled, unit, - detail: snapshot.detail, + detail, active_leases, mode: match manual_override { Some(true) => "forced-on".to_string(), @@ -120,36 +189,109 @@ impl CapturePowerManager { }) } - async fn release_one(&self) { - let (desired, unit, leases, manual_override) = { + async fn release_one(&self, kind: LeaseKind) { + let (desired, unit, leases, manual_override, grace_remaining, grace_sync) = { let mut inner = self.inner.lock().await; - inner.active_leases = inner.active_leases.saturating_sub(1); + match kind { + LeaseKind::Preview => { + inner.preview_leases = inner.preview_leases.saturating_sub(1); + } + LeaseKind::Session => { + inner.session_leases = inner.session_leases.saturating_sub(1); + if inner.session_leases == 0 { + let deadline = Instant::now() + self.session_grace; + inner.session_grace_deadline = Some(deadline); + inner.sync_generation = inner.sync_generation.saturating_add(1); + } + } + } + let grace_sync = match kind { + LeaseKind::Session if inner.session_leases == 0 => inner + .session_grace_deadline + .map(|deadline| (inner.sync_generation, deadline)), + _ => None, + }; + let (desired, grace_remaining) = desired_state_and_grace(&inner, Instant::now()); ( - desired_state(&inner), + desired, self.unit.to_string(), - inner.active_leases, + active_leases(&inner), inner.manual_override, + grace_remaining, + grace_sync, ) }; if let Err(err) = sync_unit_state(unit.as_str(), desired).await { warn!( unit = %unit, + kind = kind.as_str(), leases, desired, ?manual_override, + ?grace_remaining, ?err, "capture power sync failed on release" ); } else { info!( unit = %unit, + kind = kind.as_str(), leases, desired, ?manual_override, + ?grace_remaining, "capture power synced" ); } + + if let Some((generation, deadline)) = grace_sync { + self.schedule_grace_sync(generation, deadline); + } + } + + fn schedule_grace_sync(&self, generation: u64, deadline: Instant) { + let manager = self.clone(); + tokio::spawn(async move { + tokio::time::sleep_until(deadline).await; + let (desired, unit, leases, manual_override, grace_remaining, current_generation) = { + let inner = manager.inner.lock().await; + let (desired, grace_remaining) = desired_state_and_grace(&inner, Instant::now()); + ( + desired, + manager.unit.to_string(), + active_leases(&inner), + inner.manual_override, + grace_remaining, + inner.sync_generation, + ) + }; + if current_generation != generation { + return; + } + if let Err(err) = sync_unit_state(unit.as_str(), desired).await { + warn!( + unit = %unit, + generation, + leases, + desired, + ?manual_override, + ?grace_remaining, + ?err, + "capture power sync failed after grace" + ); + } else { + info!( + unit = %unit, + generation, + leases, + desired, + ?manual_override, + ?grace_remaining, + "capture power synced after grace" + ); + } + }); } } @@ -160,15 +302,37 @@ impl Drop for CapturePowerLease { return; } let manager = self.manager.clone(); + let kind = self.kind; tokio::spawn(async move { - manager.release_one().await; + manager.release_one(kind).await; }); } } #[cfg(not(coverage))] -fn desired_state(inner: &CapturePowerInner) -> bool { - inner.manual_override.unwrap_or(inner.active_leases > 0) +fn active_leases(inner: &CapturePowerInner) -> u32 { + inner.preview_leases.saturating_add(inner.session_leases) +} + +#[cfg(not(coverage))] +fn desired_state_and_grace(inner: &CapturePowerInner, now: Instant) -> (bool, Option) { + if let Some(manual_override) = inner.manual_override { + return (manual_override, None); + } + let grace_remaining = inner + .session_grace_deadline + .and_then(|deadline| deadline.checked_duration_since(now)); + let desired = inner.preview_leases > 0 || inner.session_leases > 0 || grace_remaining.is_some(); + (desired, grace_remaining) +} + +#[cfg(not(coverage))] +fn capture_power_session_grace_from_env() -> Duration { + std::env::var("LESAVKA_CAPTURE_POWER_GRACE_SECS") + .ok() + .and_then(|raw| raw.parse::().ok()) + .map(Duration::from_secs) + .unwrap_or_else(|| Duration::from_secs(30)) } #[cfg(not(coverage))] @@ -258,6 +422,10 @@ impl CapturePowerManager { CapturePowerLease } + pub async fn acquire_session(&self) -> CapturePowerLease { + CapturePowerLease + } + pub async fn set_manual(&self, enabled: bool) -> anyhow::Result { Ok(CapturePowerState { available: true, @@ -304,6 +472,13 @@ impl CapturePowerManager { mod tests { use super::*; + #[tokio::test] + async fn coverage_stub_supports_preview_and_session_leases() { + let manager = CapturePowerManager::new(); + let _preview = manager.acquire().await; + let _session = manager.acquire_session().await; + } + #[tokio::test] async fn coverage_stub_reports_auto_snapshot() { let state = CapturePowerManager::new() diff --git a/server/src/main.rs b/server/src/main.rs index ca6efb9..8c98f40 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -13,9 +13,9 @@ use tonic_reflection::server::Builder as ReflBuilder; use tracing::{debug, error, info, warn}; use lesavka_common::lesavka::{ + relay_server::{Relay, RelayServer}, AudioPacket, CapturePowerCommand, CapturePowerState, Empty, KeyboardReport, MonitorRequest, MouseReport, PasteReply, PasteRequest, ResetUsbReply, SetCapturePowerRequest, VideoPacket, - relay_server::{Relay, RelayServer}, }; use lesavka_server::{ @@ -227,8 +227,10 @@ impl Relay for Handler { let ms = self.ms.clone(); let gadget = self.gadget.clone(); let did_cycle = self.did_cycle.clone(); + let session_lease = self.capture_power.acquire_session().await; tokio::spawn(async move { + let _session_lease = session_lease; let mut s = req.into_inner(); while let Some(pkt) = s.next().await.transpose()? { if let Err(e) = runtime_support::write_hid_report(&kb, &pkt.data).await { @@ -266,8 +268,10 @@ impl Relay for Handler { let kb = self.kb.clone(); let gadget = self.gadget.clone(); let did_cycle = self.did_cycle.clone(); + let session_lease = self.capture_power.acquire_session().await; tokio::spawn(async move { + let _session_lease = session_lease; let mut s = req.into_inner(); while let Some(pkt) = s.next().await.transpose()? { if let Err(e) = runtime_support::write_hid_report(&ms, &pkt.data).await {