feat(launcher): tie relay session to previews and capture power

This commit is contained in:
Brad Stein 2026-04-15 04:11:47 -03:00
parent 308ea1bf85
commit b53ea917d7
9 changed files with 540 additions and 162 deletions

View File

@ -9,7 +9,7 @@ use gstreamer_app as gst_app;
#[cfg(not(coverage))]
use gtk::{gdk, glib};
#[cfg(not(coverage))]
use lesavka_common::lesavka::{MonitorRequest, VideoPacket, relay_client::RelayClient};
use lesavka_common::lesavka::{relay_client::RelayClient, MonitorRequest, VideoPacket};
#[cfg(not(coverage))]
use std::sync::atomic::{AtomicBool, Ordering};
#[cfg(not(coverage))]
@ -17,7 +17,7 @@ use std::sync::{Arc, Mutex};
#[cfg(not(coverage))]
use std::time::Duration;
#[cfg(not(coverage))]
use tonic::{Request, transport::Channel};
use tonic::{transport::Channel, Request};
#[cfg(not(coverage))]
use tracing::{debug, warn};
@ -25,9 +25,12 @@ use tracing::{debug, warn};
const PREVIEW_WIDTH: i32 = 640;
#[cfg(not(coverage))]
const PREVIEW_HEIGHT: i32 = 360;
#[cfg(not(coverage))]
const PREVIEW_IDLE_STATUS: &str = "Connect relay to preview.";
#[cfg(not(coverage))]
pub struct LauncherPreview {
server_addr: Arc<Mutex<String>>,
feeds: [PreviewFeed; 2],
}
@ -42,14 +45,28 @@ pub struct PreviewBinding {
impl LauncherPreview {
pub fn new(server_addr: String) -> Result<Self> {
gst::init().context("initialising preview gstreamer")?;
let server_addr = Arc::new(Mutex::new(server_addr));
Ok(Self {
server_addr: Arc::clone(&server_addr),
feeds: [
PreviewFeed::spawn(server_addr.clone(), 0)?,
PreviewFeed::spawn(Arc::clone(&server_addr), 0)?,
PreviewFeed::spawn(server_addr, 1)?,
],
})
}
pub fn set_server_addr(&self, server_addr: String) {
if let Ok(mut slot) = self.server_addr.lock() {
*slot = server_addr;
}
}
pub fn set_session_active(&self, active: bool) {
for feed in &self.feeds {
feed.set_active(active);
}
}
pub fn install_on_picture(
&self,
monitor_id: usize,
@ -75,20 +92,78 @@ impl PreviewBinding {
#[cfg(not(coverage))]
struct PreviewFeed {
latest: Arc<Mutex<Option<PreviewFrame>>>,
shared: Arc<Mutex<SharedPreviewState>>,
active: Arc<AtomicBool>,
}
#[cfg(not(coverage))]
struct SharedPreviewState {
latest: Option<PreviewFrame>,
status: String,
generation: u64,
clear_picture: bool,
}
#[cfg(not(coverage))]
impl SharedPreviewState {
fn new() -> Self {
Self {
latest: None,
status: PREVIEW_IDLE_STATUS.to_string(),
generation: 1,
clear_picture: true,
}
}
fn set_status(&mut self, status: impl Into<String>, clear_picture: bool) {
let status = status.into();
let changed = self.status != status || clear_picture;
self.status = status;
if clear_picture {
self.latest = None;
self.clear_picture = true;
}
if changed {
self.generation = self.generation.saturating_add(1);
}
}
fn push_frame(&mut self, frame: PreviewFrame) {
self.latest = Some(frame);
self.clear_picture = false;
if self.status != "Live" {
self.status = "Live".to_string();
self.generation = self.generation.saturating_add(1);
}
}
}
#[cfg(not(coverage))]
impl PreviewFeed {
fn spawn(server_addr: String, monitor_id: u32) -> Result<Self> {
let latest = Arc::new(Mutex::new(None));
let store = Arc::clone(&latest);
fn spawn(server_addr: Arc<Mutex<String>>, monitor_id: u32) -> Result<Self> {
let shared = Arc::new(Mutex::new(SharedPreviewState::new()));
let active = Arc::new(AtomicBool::new(false));
let shared_state = Arc::clone(&shared);
let active_flag = Arc::clone(&active);
std::thread::spawn(move || {
if let Err(err) = run_preview_feed(server_addr, monitor_id, store) {
if let Err(err) = run_preview_feed(server_addr, monitor_id, active_flag, shared_state) {
warn!(monitor_id, ?err, "launcher preview feed exited");
}
});
Ok(Self { latest })
Ok(Self { shared, active })
}
fn set_active(&self, active: bool) {
self.active.store(active, Ordering::Relaxed);
if !active {
self.replace_status(PREVIEW_IDLE_STATUS, true);
}
}
fn replace_status(&self, status: impl Into<String>, clear_picture: bool) {
if let Ok(mut shared) = self.shared.lock() {
shared.set_status(status, clear_picture);
}
}
fn install_on_picture(
@ -98,11 +173,12 @@ impl PreviewFeed {
) -> PreviewBinding {
let picture = picture.clone();
let status_label = status_label.clone();
let latest = Arc::clone(&self.latest);
let shared = Arc::clone(&self.shared);
let enabled = Arc::new(AtomicBool::new(true));
let alive = Arc::new(AtomicBool::new(true));
let enabled_flag = Arc::clone(&enabled);
let alive_flag = Arc::clone(&alive);
let mut last_generation = 0_u64;
glib::timeout_add_local(Duration::from_millis(120), move || {
if !alive_flag.load(Ordering::Relaxed) {
return glib::ControlFlow::Break;
@ -110,8 +186,23 @@ impl PreviewFeed {
if !enabled_flag.load(Ordering::Relaxed) {
return glib::ControlFlow::Continue;
}
let next = latest.lock().ok().and_then(|mut slot| slot.take());
if let Some(frame) = next {
let (frame, status, generation, clear_picture) = match shared.lock() {
Ok(mut slot) => {
let frame = slot.latest.take();
let status = slot.status.clone();
let generation = slot.generation;
let clear_picture = slot.clear_picture;
slot.clear_picture = false;
(frame, status, generation, clear_picture)
}
Err(_) => return glib::ControlFlow::Continue,
};
if clear_picture {
picture.set_paintable(Option::<&gdk::Paintable>::None);
}
if let Some(frame) = frame {
let bytes = glib::Bytes::from_owned(frame.rgba);
let texture = gdk::MemoryTexture::new(
frame.width,
@ -121,7 +212,10 @@ impl PreviewFeed {
frame.stride,
);
picture.set_paintable(Some(&texture));
status_label.set_text("Live");
}
if generation != last_generation {
status_label.set_text(&status);
last_generation = generation;
}
glib::ControlFlow::Continue
});
@ -139,9 +233,10 @@ struct PreviewFrame {
#[cfg(not(coverage))]
fn run_preview_feed(
server_addr: String,
server_addr: Arc<Mutex<String>>,
monitor_id: u32,
latest: Arc<Mutex<Option<PreviewFrame>>>,
active: Arc<AtomicBool>,
shared: Arc<Mutex<SharedPreviewState>>,
) -> Result<()> {
let (pipeline, appsrc, appsink) = build_preview_pipeline()?;
pipeline
@ -149,15 +244,13 @@ fn run_preview_feed(
.context("starting launcher preview pipeline")?;
{
let latest = Arc::clone(&latest);
let shared = Arc::clone(&shared);
let appsink = appsink.clone();
std::thread::spawn(move || {
loop {
if let Some(sample) = appsink.try_pull_sample(gst::ClockTime::from_mseconds(250)) {
if let Some(frame) = sample_to_frame(&sample) {
if let Ok(mut slot) = latest.lock() {
*slot = Some(frame);
}
std::thread::spawn(move || loop {
if let Some(sample) = appsink.try_pull_sample(gst::ClockTime::from_mseconds(250)) {
if let Some(frame) = sample_to_frame(&sample) {
if let Ok(mut slot) = shared.lock() {
slot.push_frame(frame);
}
}
}
@ -171,21 +264,44 @@ fn run_preview_feed(
let _ = rt.block_on(async move {
loop {
let channel = match Channel::from_shared(server_addr.clone()) {
if !active.load(Ordering::Relaxed) {
set_shared_status(&shared, PREVIEW_IDLE_STATUS, true);
tokio::time::sleep(Duration::from_millis(150)).await;
continue;
}
set_shared_status(&shared, "Connecting relay preview...", true);
let current_addr = match server_addr.lock() {
Ok(value) => value.clone(),
Err(_) => {
set_shared_status(&shared, "Preview address is unavailable.", true);
tokio::time::sleep(Duration::from_millis(750)).await;
continue;
}
};
let channel = match Channel::from_shared(current_addr.clone()) {
Ok(endpoint) => match endpoint.tcp_nodelay(true).connect().await {
Ok(channel) => channel,
Err(err) => {
warn!(monitor_id, ?err, "launcher preview connect failed");
set_shared_status(
&shared,
format!("Preview host is unavailable: {err}"),
true,
);
tokio::time::sleep(Duration::from_millis(750)).await;
continue;
}
},
Err(err) => {
warn!(monitor_id, ?err, "launcher preview endpoint invalid");
set_shared_status(&shared, format!("Preview address is invalid: {err}"), true);
tokio::time::sleep(Duration::from_millis(750)).await;
continue;
}
};
let mut cli = RelayClient::new(channel);
let req = MonitorRequest {
id: monitor_id,
@ -194,17 +310,39 @@ fn run_preview_feed(
match cli.capture_video(Request::new(req)).await {
Ok(mut stream) => {
debug!(monitor_id, "launcher preview connected");
while let Some(item) = stream.get_mut().message().await.transpose() {
match item {
Ok(pkt) => push_preview_packet(&appsrc, pkt),
Err(err) => {
warn!(monitor_id, ?err, "launcher preview stream error");
set_shared_status(&shared, "Waiting for stream...", true);
loop {
if !active.load(Ordering::Relaxed) {
break;
}
match tokio::time::timeout(
Duration::from_millis(300),
stream.get_mut().message(),
)
.await
{
Ok(Ok(Some(pkt))) => push_preview_packet(&appsrc, pkt),
Ok(Ok(None)) => {
set_shared_status(&shared, "Preview stream ended.", true);
break;
}
Ok(Err(err)) => {
warn!(monitor_id, ?err, "launcher preview stream error");
set_shared_status(
&shared,
format!("Preview stream error: {err}"),
true,
);
break;
}
Err(_) => continue,
}
}
}
Err(err) => warn!(monitor_id, ?err, "launcher preview rpc failed"),
Err(err) => {
warn!(monitor_id, ?err, "launcher preview rpc failed");
set_shared_status(&shared, format!("Preview RPC failed: {err}"), true);
}
}
tokio::time::sleep(Duration::from_millis(750)).await;
}
@ -215,6 +353,17 @@ fn run_preview_feed(
Ok(())
}
#[cfg(not(coverage))]
fn set_shared_status(
shared: &Arc<Mutex<SharedPreviewState>>,
status: impl Into<String>,
clear: bool,
) {
if let Ok(mut slot) = shared.lock() {
slot.set_status(status, clear);
}
}
#[cfg(not(coverage))]
fn build_preview_pipeline() -> Result<(gst::Pipeline, gst_app::AppSrc, gst_app::AppSink)> {
let desc = format!(

View File

@ -23,15 +23,39 @@ use {
std::cell::{Cell, RefCell},
std::process::Child,
std::rc::Rc,
std::time::{Duration, Instant},
std::time::Duration,
};
#[cfg(not(coverage))]
enum PowerMessage {
Poll(std::result::Result<CapturePowerStatus, String>),
Refresh(std::result::Result<CapturePowerStatus, String>),
Command(std::result::Result<CapturePowerStatus, String>),
}
#[cfg(not(coverage))]
fn request_capture_power_refresh(
power_tx: std::sync::mpsc::Sender<PowerMessage>,
server_addr: String,
delay: Duration,
) {
std::thread::spawn(move || {
if !delay.is_zero() {
std::thread::sleep(delay);
}
let result = fetch_capture_power(&server_addr).map_err(|err| err.to_string());
let _ = power_tx.send(PowerMessage::Refresh(result));
});
}
#[cfg(not(coverage))]
fn disconnected_capture_note(mode: &str) -> &'static str {
match mode {
"forced-on" => "Relay disconnected. Capture is still forced on for staging.",
"forced-off" => "Relay disconnected. Capture stays intentionally dark until you return to Auto or Force On.",
_ => "Relay disconnected. The server will hold capture briefly, then let it return to standby.",
}
}
#[cfg(not(coverage))]
pub fn run_gui_launcher(server_addr: String) -> Result<()> {
let app = gtk::Application::builder()
@ -110,7 +134,6 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> {
let (power_tx, power_rx) = std::sync::mpsc::channel::<PowerMessage>();
let power_request_in_flight = Rc::new(Cell::new(false));
let last_power_poll = Rc::new(RefCell::new(None::<Instant>));
{
let state = Rc::clone(&state);
@ -139,6 +162,15 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> {
});
}
if let Some(preview) = preview.as_ref() {
preview.set_session_active(false);
}
request_capture_power_refresh(
power_tx.clone(),
selected_server_addr(&server_entry, server_addr.as_ref()),
Duration::ZERO,
);
{
let state = Rc::clone(&state);
let widgets = widgets.clone();
@ -195,14 +227,38 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> {
let input_control_path = Rc::clone(&input_control_path);
let input_state_path = Rc::clone(&input_state_path);
let server_addr_fallback = Rc::clone(&server_addr);
let preview = preview.clone();
let power_tx = power_tx.clone();
let start_button = widgets.start_button.clone();
let widgets_handle = widgets.clone();
start_button.connect_clicked(move |_| {
let server_addr =
selected_server_addr(&server_entry, server_addr_fallback.as_ref());
if child_proc.borrow().is_some() {
stop_child_process(&child_proc);
let power_mode = {
let mut state = state.borrow_mut();
let _ = state.stop_remote();
state.capture_power.mode.clone()
};
if let Some(preview) = preview.as_ref() {
preview.set_server_addr(server_addr.clone());
preview.set_session_active(false);
}
widgets_handle
.status_label
.set_text("Relay is already running.");
refresh_launcher_ui(&widgets_handle, &state.borrow(), true);
.set_text(disconnected_capture_note(&power_mode));
request_capture_power_refresh(
power_tx.clone(),
server_addr.clone(),
Duration::from_millis(250),
);
request_capture_power_refresh(
power_tx.clone(),
server_addr,
Duration::from_secs(31),
);
refresh_launcher_ui(&widgets_handle, &state.borrow(), false);
return;
}
{
@ -213,8 +269,6 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> {
}
let _ = std::fs::remove_file(input_control_path.as_path());
let _ = std::fs::remove_file(input_state_path.as_path());
let server_addr =
selected_server_addr(&server_entry, server_addr_fallback.as_ref());
let launch_state = state.borrow().clone();
let input_toggle_key = selected_toggle_key(&widgets.toggle_key_combo);
match spawn_client_process(
@ -227,25 +281,42 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> {
Ok(child) => {
*child_proc.borrow_mut() = Some(child);
let _ = state.borrow_mut().start_remote();
if let Some(preview) = preview.as_ref() {
preview.set_server_addr(server_addr.clone());
preview.set_session_active(true);
}
let routing = routing_name(state.borrow().routing);
let power_mode = state.borrow().capture_power.mode.clone();
let message = match power_mode.as_str() {
"forced-off" => format!(
"Relay started with inputs routed to {}, but capture is forced off. Return capture to Auto or Force On when you want remote video.",
"Relay connected with inputs routed to {}, but capture is forced off. Return capture to Auto or Force On when you want remote video.",
routing
),
"forced-on" => format!(
"Relay started with inputs routed to {}. Capture is being held awake for staging.",
"Relay connected with inputs routed to {}. Capture is being held awake and the eye previews are coming online.",
routing
),
_ => format!(
"Relay started with inputs routed to {}. Capture will wake automatically for previews and live session demand.",
"Relay connected with inputs routed to {}. The eye previews will come up with the live session.",
routing
),
};
widgets_handle.status_label.set_text(&message);
request_capture_power_refresh(
power_tx.clone(),
server_addr.clone(),
Duration::from_millis(250),
);
request_capture_power_refresh(
power_tx.clone(),
server_addr,
Duration::from_millis(1250),
);
}
Err(err) => {
if let Some(preview) = preview.as_ref() {
preview.set_session_active(false);
}
widgets_handle
.status_label
.set_text(&format!("Relay start failed: {err}"));
@ -259,22 +330,6 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> {
});
}
{
let state = Rc::clone(&state);
let child_proc = Rc::clone(&child_proc);
let widgets = widgets.clone();
let stop_button = widgets.stop_button.clone();
let widgets_handle = widgets.clone();
stop_button.connect_clicked(move |_| {
stop_child_process(&child_proc);
let _ = state.borrow_mut().stop_remote();
widgets_handle
.status_label
.set_text("Relay stopped. Local staging remains available.");
refresh_launcher_ui(&widgets_handle, &state.borrow(), false);
});
}
{
let state = Rc::clone(&state);
let child_proc = Rc::clone(&child_proc);
@ -581,14 +636,34 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> {
let last_state_marker =
Rc::new(RefCell::new(path_marker(input_state_path.as_path())));
let power_request_in_flight = Rc::clone(&power_request_in_flight);
let last_power_poll = Rc::clone(&last_power_poll);
let preview = preview.clone();
let power_tx = power_tx.clone();
glib::timeout_add_local(Duration::from_millis(180), move || {
let child_running = reap_exited_child(&child_proc);
if !child_running && state.borrow().remote_active {
let _ = state.borrow_mut().stop_remote();
let power_mode = {
let mut state = state.borrow_mut();
let _ = state.stop_remote();
state.capture_power.mode.clone()
};
if let Some(preview) = preview.as_ref() {
preview.set_session_active(false);
}
widgets
.status_label
.set_text("Relay ended. Local staging remains available.");
.set_text(disconnected_capture_note(&power_mode));
let server_addr =
selected_server_addr(&server_entry, server_addr_fallback.as_ref());
request_capture_power_refresh(
power_tx.clone(),
server_addr.clone(),
Duration::from_millis(250),
);
request_capture_power_refresh(
power_tx.clone(),
server_addr,
Duration::from_secs(31),
);
}
let next_state_marker = path_marker(input_state_path.as_path());
@ -617,10 +692,10 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> {
while let Ok(message) = power_rx.try_recv() {
power_request_in_flight.set(false);
match message {
PowerMessage::Poll(Ok(power)) => {
PowerMessage::Refresh(Ok(power)) => {
state.borrow_mut().set_capture_power(power);
}
PowerMessage::Poll(Err(err)) => {
PowerMessage::Refresh(Err(err)) => {
state.borrow_mut().set_capture_power(CapturePowerStatus {
available: false,
enabled: false,
@ -647,24 +722,6 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> {
}
}
let should_poll_power = !power_request_in_flight.get()
&& last_power_poll
.borrow()
.map(|stamp| stamp.elapsed() >= Duration::from_millis(1400))
.unwrap_or(true);
if should_poll_power {
*last_power_poll.borrow_mut() = Some(Instant::now());
power_request_in_flight.set(true);
let server_addr =
selected_server_addr(&server_entry, server_addr_fallback.as_ref());
let tx = power_tx.clone();
std::thread::spawn(move || {
let result =
fetch_capture_power(&server_addr).map_err(|err| err.to_string());
let _ = tx.send(PowerMessage::Poll(result));
});
}
refresh_launcher_ui(&widgets, &state.borrow(), child_running);
refresh_test_buttons(&widgets, &mut tests.borrow_mut());
glib::ControlFlow::Continue

View File

@ -45,7 +45,6 @@ pub struct LauncherWidgets {
pub local_test_detail: gtk::Label,
pub display_panes: [DisplayPaneWidgets; 2],
pub start_button: gtk::Button,
pub stop_button: gtk::Button,
pub power_auto_button: gtk::Button,
pub power_on_button: gtk::Button,
pub power_off_button: gtk::Button,
@ -155,20 +154,13 @@ pub fn build_launcher_view(
connection_body.append(&server_entry);
let relay_actions_row = gtk::Box::new(gtk::Orientation::Horizontal, 8);
let start_button = gtk::Button::with_label("Start Relay");
let start_button = gtk::Button::with_label("Connect Relay");
start_button.add_css_class("suggested-action");
start_button.set_hexpand(true);
start_button.set_tooltip_text(Some(
"Launch the relay using the staged devices and current input routing.",
));
let stop_button = gtk::Button::with_label("Stop Relay");
stop_button.add_css_class("destructive-action");
stop_button.set_hexpand(true);
stop_button.set_tooltip_text(Some(
"Stop the live relay session. Local staging and previews stay available.",
"Connect to the relay host, bring the staged session online, and start the eye previews.",
));
relay_actions_row.append(&start_button);
relay_actions_row.append(&stop_button);
connection_body.append(&relay_actions_row);
let live_actions_row = gtk::Box::new(gtk::Orientation::Horizontal, 8);
@ -228,10 +220,6 @@ pub fn build_launcher_view(
input_toggle_button.set_tooltip_text(Some(
"Switch live keyboard and mouse ownership between the local machine and the remote target.",
));
routing_row.append(&input_toggle_button);
routing_body.append(&routing_row);
let swap_row = gtk::Box::new(gtk::Orientation::Horizontal, 8);
let swap_label = gtk::Label::new(Some("Swap key"));
swap_label.set_halign(gtk::Align::Start);
let toggle_key_combo = gtk::ComboBoxText::new();
@ -246,9 +234,10 @@ pub fn build_launcher_view(
toggle_key_combo.set_tooltip_text(Some(
"Single-key live input swap while the relay is running.",
));
swap_row.append(&swap_label);
swap_row.append(&toggle_key_combo);
routing_body.append(&swap_row);
routing_row.append(&input_toggle_button);
routing_row.append(&swap_label);
routing_row.append(&toggle_key_combo);
routing_body.append(&routing_row);
sidebar.append(&routing_panel);
let (devices_panel, devices_body) = build_panel("Device Staging");
@ -328,14 +317,7 @@ pub fn build_launcher_view(
stage_title.add_css_class("title-4");
stage_title.set_halign(gtk::Align::Start);
stage_header.append(&stage_title);
let stage_note = gtk::Label::new(Some(
"Live server-side eye feeds. In Auto mode, open previews and active relay sessions count as capture demand.",
));
stage_note.add_css_class("dim-label");
stage_note.set_wrap(true);
stage_note.set_xalign(0.0);
stage.append(&stage_header);
stage.append(&stage_note);
let display_row = gtk::Box::new(gtk::Orientation::Horizontal, 16);
display_row.set_hexpand(true);
@ -454,7 +436,6 @@ pub fn build_launcher_view(
local_test_detail,
display_panes: [left_pane.clone(), right_pane.clone()],
start_button: start_button.clone(),
stop_button: stop_button.clone(),
power_auto_button: power_auto_button.clone(),
power_on_button: power_on_button.clone(),
power_off_button: power_off_button.clone(),
@ -667,7 +648,7 @@ fn build_display_pane(title: &str, capture_path: &str) -> DisplayPaneWidgets {
root.append(&stack);
let footer = gtk::Box::new(gtk::Orientation::Horizontal, 8);
let stream_status = gtk::Label::new(Some("Waiting for stream..."));
let stream_status = gtk::Label::new(Some("Connect relay to preview."));
stream_status.set_halign(gtk::Align::Start);
stream_status.set_hexpand(true);
let action_button = gtk::Button::with_label("Break Out");

View File

@ -8,13 +8,13 @@ use std::{
};
use super::{
LAUNCHER_FOCUS_SIGNAL_ENV,
device_test::{DeviceTestController, DeviceTestKind},
launcher_focus_signal_path,
preview::LauncherPreview,
runtime_env_vars,
state::{CapturePowerStatus, DisplaySurface, InputRouting, LauncherState},
ui_components::{DisplayPaneWidgets, LauncherWidgets, PopoutWindowHandle},
LAUNCHER_FOCUS_SIGNAL_ENV,
};
pub const INPUT_CONTROL_ENV: &str = "LESAVKA_LAUNCHER_INPUT_CONTROL";
@ -23,14 +23,11 @@ pub const DEFAULT_INPUT_CONTROL_PATH: &str = "/tmp/lesavka-launcher-input.contro
pub const DEFAULT_INPUT_STATE_PATH: &str = "/tmp/lesavka-launcher-input.state";
pub fn refresh_launcher_ui(widgets: &LauncherWidgets, state: &LauncherState, child_running: bool) {
let relay_live = child_running || state.remote_active;
widgets
.summary
.relay_value
.set_text(if child_running || state.remote_active {
"Running"
} else {
"Stopped"
});
.set_text(if relay_live { "Running" } else { "Stopped" });
widgets
.summary
.routing_value
@ -62,14 +59,18 @@ pub fn refresh_launcher_ui(widgets: &LauncherWidgets, state: &LauncherState, chi
.launch_plan_detail
.set_text(&launch_plan_detail(state, child_running));
widgets.start_button.set_label(if child_running {
"Relay Running"
widgets.start_button.set_label(if relay_live {
"Disconnect Relay"
} else {
"Start Relay"
"Connect Relay"
});
widgets.start_button.set_sensitive(!child_running);
widgets.stop_button.set_sensitive(child_running);
widgets.clipboard_button.set_sensitive(child_running);
widgets.start_button.set_sensitive(true);
widgets.start_button.set_tooltip_text(Some(if relay_live {
"Disconnect from the relay host, stop the live session, and let capture fall back to grace/standby."
} else {
"Connect to the relay host, start the live session, and bring the eye previews online."
}));
widgets.clipboard_button.set_sensitive(relay_live);
widgets.probe_button.set_sensitive(true);
widgets.input_toggle_button.set_label(match state.routing {
InputRouting::Remote => "Route Inputs To Local",
@ -187,7 +188,7 @@ pub fn open_popout_window(
picture.set_can_shrink(true);
root.append(&picture);
let stream_status = gtk::Label::new(Some("Waiting for stream..."));
let stream_status = gtk::Label::new(Some("Connect relay to preview."));
stream_status.set_halign(gtk::Align::Start);
root.append(&stream_status);
@ -354,7 +355,7 @@ fn launch_plan_title(state: &LauncherState, child_running: bool) -> String {
return match state.capture_power.mode.as_str() {
"forced-off" => "Relay live, but capture is intentionally dark.".to_string(),
"forced-on" => "Relay live with capture held awake.".to_string(),
_ => "Relay live with automatic capture management.".to_string(),
_ => "Relay live with previews tied to the session.".to_string(),
};
}
@ -380,15 +381,15 @@ fn launch_plan_detail(state: &LauncherState, child_running: bool) -> String {
if child_running || state.remote_active {
return match state.capture_power.mode.as_str() {
"forced-off" => format!(
"Inputs are routed to {}. Return capture to Auto or Force On when you want the remote eyes and session video to wake up.",
"Inputs are routed to {}. The session is connected, but capture is intentionally dark until you return to Auto or Force On.",
capitalize(routing_name(state.routing))
),
"forced-on" => format!(
"Inputs are routed to {}. The relay host is keeping the capture feeds up even without preview demand.",
"Inputs are routed to {}. The relay host is holding capture awake, so the eye previews stay ready even between bursts of activity.",
capitalize(routing_name(state.routing))
),
_ => format!(
"Inputs are routed to {}. Live eye previews and session demand will wake capture automatically as needed.",
"Inputs are routed to {}. Connecting the relay also brings the eye previews online, and the server keeps capture awake for the live session.",
capitalize(routing_name(state.routing))
),
};
@ -407,11 +408,11 @@ fn launch_plan_detail(state: &LauncherState, child_running: bool) -> String {
.to_string()
}
"forced-on" => {
"The relay host is already holding capture awake, which is useful for preflight framing checks before the session starts."
"The relay host is already holding capture awake, which is useful when you want the eye feeds ready the moment you connect."
.to_string()
}
_ => {
"Automatic capture mode wakes the remote feeds only while eye previews or the live relay session actually need them."
"When you connect the relay, the eye previews come up with it. Disconnecting returns capture to the server-side grace/standby path."
.to_string()
}
}
@ -641,7 +642,7 @@ mod tests {
state.start_remote();
let detail = launch_plan_detail(&state, true);
assert!(detail.contains("Return capture to Auto or Force On"));
assert!(detail.contains("intentionally dark"));
}
#[test]

View File

@ -81,9 +81,9 @@
"loc": 69
},
"client/src/launcher/preview.rs": {
"clippy_warnings": 20,
"doc_debt": 6,
"loc": 293
"clippy_warnings": 24,
"doc_debt": 13,
"loc": 442
},
"client/src/launcher/state.rs": {
"clippy_warnings": 14,
@ -92,8 +92,8 @@
},
"client/src/launcher/ui.rs": {
"clippy_warnings": 10,
"doc_debt": 1,
"loc": 695
"doc_debt": 3,
"loc": 752
},
"client/src/launcher/ui_components.rs": {
"clippy_warnings": 8,
@ -103,7 +103,7 @@
"client/src/launcher/ui_runtime.rs": {
"clippy_warnings": 10,
"doc_debt": 20,
"loc": 660
"loc": 661
},
"client/src/layout.rs": {
"clippy_warnings": 6,
@ -178,7 +178,7 @@
"server/src/audio.rs": {
"clippy_warnings": 37,
"doc_debt": 7,
"loc": 386
"loc": 397
},
"server/src/bin/lesavka-uvc.real.inc": {
"clippy_warnings": 31
@ -199,9 +199,9 @@
"loc": 200
},
"server/src/capture_power.rs": {
"clippy_warnings": 10,
"doc_debt": 7,
"loc": 338
"clippy_warnings": 12,
"doc_debt": 10,
"loc": 513
},
"server/src/gadget.rs": {
"clippy_warnings": 30,
@ -221,7 +221,7 @@
"server/src/main.rs": {
"clippy_warnings": 10,
"doc_debt": 13,
"loc": 572
"loc": 576
},
"server/src/paste.rs": {
"clippy_warnings": 6,

View File

@ -58,7 +58,7 @@
},
"client/src/launcher/ui.rs": {
"line_percent": 100.0,
"loc": 695
"loc": 752
},
"client/src/layout.rs": {
"line_percent": 97.72727272727273,
@ -110,7 +110,7 @@
},
"server/src/audio.rs": {
"line_percent": 100.0,
"loc": 386
"loc": 397
},
"server/src/bin/lesavka-uvc.rs": {
"line_percent": 96.35535307517085,
@ -126,7 +126,7 @@
},
"server/src/capture_power.rs": {
"line_percent": 100.0,
"loc": 338
"loc": 513
},
"server/src/gadget.rs": {
"line_percent": 96.875,
@ -138,7 +138,7 @@
},
"server/src/main.rs": {
"line_percent": 95.33333333333334,
"loc": 572
"loc": 576
},
"server/src/paste.rs": {
"line_percent": 97.12230215827337,

View File

@ -384,3 +384,14 @@ impl Voice {
let _ = self.appsrc.end_of_stream();
}
}
#[cfg(all(test, coverage))]
mod tests {
use super::Voice;
#[tokio::test]
async fn coverage_voice_constructor_starts_stub_pipeline() {
let mut voice = Voice::new("coverage-audio").await.expect("voice");
voice.finish();
}
}

View File

@ -2,27 +2,34 @@ use lesavka_common::lesavka::CapturePowerState;
#[cfg(not(coverage))]
use {
anyhow::{Context, Result, anyhow},
anyhow::{anyhow, Context, Result},
std::process::Command,
std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
Arc,
},
tokio::{
sync::Mutex,
time::{Duration, Instant},
},
tokio::sync::Mutex,
tracing::{info, warn},
};
#[cfg(not(coverage))]
#[derive(Debug, Default)]
struct CapturePowerInner {
active_leases: u32,
preview_leases: u32,
session_leases: u32,
manual_override: Option<bool>,
session_grace_deadline: Option<Instant>,
sync_generation: u64,
}
#[cfg(not(coverage))]
#[derive(Debug, Clone)]
pub struct CapturePowerManager {
unit: Arc<str>,
session_grace: Duration,
inner: Arc<Mutex<CapturePowerInner>>,
}
@ -30,9 +37,17 @@ pub struct CapturePowerManager {
#[derive(Clone)]
pub struct CapturePowerLease {
manager: CapturePowerManager,
kind: LeaseKind,
released: Arc<AtomicBool>,
}
#[cfg(not(coverage))]
#[derive(Clone, Copy, Debug)]
enum LeaseKind {
Preview,
Session,
}
#[cfg(not(coverage))]
#[derive(Debug)]
struct UnitSnapshot {
@ -41,6 +56,16 @@ struct UnitSnapshot {
detail: String,
}
#[cfg(not(coverage))]
impl LeaseKind {
fn as_str(self) -> &'static str {
match self {
Self::Preview => "preview",
Self::Session => "session",
}
}
}
#[cfg(not(coverage))]
impl CapturePowerManager {
pub fn new() -> Self {
@ -50,28 +75,59 @@ impl CapturePowerManager {
.unwrap_or_else(|| "relay.service".to_string());
Self {
unit: Arc::<str>::from(unit),
session_grace: capture_power_session_grace_from_env(),
inner: Arc::new(Mutex::new(CapturePowerInner::default())),
}
}
pub async fn acquire(&self) -> CapturePowerLease {
let (desired, unit, leases, manual_override) = {
self.acquire_kind(LeaseKind::Preview).await
}
pub async fn acquire_session(&self) -> CapturePowerLease {
self.acquire_kind(LeaseKind::Session).await
}
async fn acquire_kind(&self, kind: LeaseKind) -> CapturePowerLease {
let (desired, unit, leases, manual_override, grace_remaining) = {
let mut inner = self.inner.lock().await;
inner.active_leases = inner.active_leases.saturating_add(1);
match kind {
LeaseKind::Preview => {
inner.preview_leases = inner.preview_leases.saturating_add(1);
}
LeaseKind::Session => {
inner.session_leases = inner.session_leases.saturating_add(1);
if inner.session_grace_deadline.take().is_some() {
inner.sync_generation = inner.sync_generation.saturating_add(1);
}
}
}
let (desired, grace_remaining) = desired_state_and_grace(&inner, Instant::now());
(
desired_state(&inner),
desired,
self.unit.to_string(),
inner.active_leases,
active_leases(&inner),
inner.manual_override,
grace_remaining,
)
};
if let Err(err) = sync_unit_state(unit.as_str(), desired).await {
warn!(unit = %unit, leases, desired, ?manual_override, ?err, "capture power sync failed on acquire");
warn!(
unit = %unit,
kind = kind.as_str(),
leases,
desired,
?manual_override,
?grace_remaining,
?err,
"capture power sync failed on acquire"
);
}
CapturePowerLease {
manager: self.clone(),
kind,
released: Arc::new(AtomicBool::new(false)),
}
}
@ -81,6 +137,8 @@ impl CapturePowerManager {
{
let mut inner = self.inner.lock().await;
inner.manual_override = Some(enabled);
inner.session_grace_deadline = None;
inner.sync_generation = inner.sync_generation.saturating_add(1);
}
sync_unit_state(unit.as_str(), enabled).await?;
@ -92,7 +150,7 @@ impl CapturePowerManager {
let desired = {
let mut inner = self.inner.lock().await;
inner.manual_override = None;
desired_state(&inner)
desired_state_and_grace(&inner, Instant::now()).0
};
sync_unit_state(unit.as_str(), desired).await?;
@ -100,17 +158,28 @@ impl CapturePowerManager {
}
pub async fn snapshot(&self) -> Result<CapturePowerState> {
let (active_leases, manual_override) = {
let (active_leases, manual_override, grace_remaining) = {
let inner = self.inner.lock().await;
(inner.active_leases, inner.manual_override)
(
active_leases(&inner),
inner.manual_override,
desired_state_and_grace(&inner, Instant::now()).1,
)
};
let unit = self.unit.to_string();
let snapshot = inspect_unit(unit.as_str()).await?;
let mut detail = snapshot.detail;
if let Some(grace_remaining) = grace_remaining {
detail = format!(
"{detail} • disconnect grace {}s",
grace_remaining.as_secs().max(1)
);
}
Ok(CapturePowerState {
available: snapshot.available,
enabled: snapshot.enabled,
unit,
detail: snapshot.detail,
detail,
active_leases,
mode: match manual_override {
Some(true) => "forced-on".to_string(),
@ -120,36 +189,109 @@ impl CapturePowerManager {
})
}
async fn release_one(&self) {
let (desired, unit, leases, manual_override) = {
async fn release_one(&self, kind: LeaseKind) {
let (desired, unit, leases, manual_override, grace_remaining, grace_sync) = {
let mut inner = self.inner.lock().await;
inner.active_leases = inner.active_leases.saturating_sub(1);
match kind {
LeaseKind::Preview => {
inner.preview_leases = inner.preview_leases.saturating_sub(1);
}
LeaseKind::Session => {
inner.session_leases = inner.session_leases.saturating_sub(1);
if inner.session_leases == 0 {
let deadline = Instant::now() + self.session_grace;
inner.session_grace_deadline = Some(deadline);
inner.sync_generation = inner.sync_generation.saturating_add(1);
}
}
}
let grace_sync = match kind {
LeaseKind::Session if inner.session_leases == 0 => inner
.session_grace_deadline
.map(|deadline| (inner.sync_generation, deadline)),
_ => None,
};
let (desired, grace_remaining) = desired_state_and_grace(&inner, Instant::now());
(
desired_state(&inner),
desired,
self.unit.to_string(),
inner.active_leases,
active_leases(&inner),
inner.manual_override,
grace_remaining,
grace_sync,
)
};
if let Err(err) = sync_unit_state(unit.as_str(), desired).await {
warn!(
unit = %unit,
kind = kind.as_str(),
leases,
desired,
?manual_override,
?grace_remaining,
?err,
"capture power sync failed on release"
);
} else {
info!(
unit = %unit,
kind = kind.as_str(),
leases,
desired,
?manual_override,
?grace_remaining,
"capture power synced"
);
}
if let Some((generation, deadline)) = grace_sync {
self.schedule_grace_sync(generation, deadline);
}
}
fn schedule_grace_sync(&self, generation: u64, deadline: Instant) {
let manager = self.clone();
tokio::spawn(async move {
tokio::time::sleep_until(deadline).await;
let (desired, unit, leases, manual_override, grace_remaining, current_generation) = {
let inner = manager.inner.lock().await;
let (desired, grace_remaining) = desired_state_and_grace(&inner, Instant::now());
(
desired,
manager.unit.to_string(),
active_leases(&inner),
inner.manual_override,
grace_remaining,
inner.sync_generation,
)
};
if current_generation != generation {
return;
}
if let Err(err) = sync_unit_state(unit.as_str(), desired).await {
warn!(
unit = %unit,
generation,
leases,
desired,
?manual_override,
?grace_remaining,
?err,
"capture power sync failed after grace"
);
} else {
info!(
unit = %unit,
generation,
leases,
desired,
?manual_override,
?grace_remaining,
"capture power synced after grace"
);
}
});
}
}
@ -160,15 +302,37 @@ impl Drop for CapturePowerLease {
return;
}
let manager = self.manager.clone();
let kind = self.kind;
tokio::spawn(async move {
manager.release_one().await;
manager.release_one(kind).await;
});
}
}
#[cfg(not(coverage))]
fn desired_state(inner: &CapturePowerInner) -> bool {
inner.manual_override.unwrap_or(inner.active_leases > 0)
fn active_leases(inner: &CapturePowerInner) -> u32 {
inner.preview_leases.saturating_add(inner.session_leases)
}
#[cfg(not(coverage))]
fn desired_state_and_grace(inner: &CapturePowerInner, now: Instant) -> (bool, Option<Duration>) {
if let Some(manual_override) = inner.manual_override {
return (manual_override, None);
}
let grace_remaining = inner
.session_grace_deadline
.and_then(|deadline| deadline.checked_duration_since(now));
let desired = inner.preview_leases > 0 || inner.session_leases > 0 || grace_remaining.is_some();
(desired, grace_remaining)
}
#[cfg(not(coverage))]
fn capture_power_session_grace_from_env() -> Duration {
std::env::var("LESAVKA_CAPTURE_POWER_GRACE_SECS")
.ok()
.and_then(|raw| raw.parse::<u64>().ok())
.map(Duration::from_secs)
.unwrap_or_else(|| Duration::from_secs(30))
}
#[cfg(not(coverage))]
@ -258,6 +422,10 @@ impl CapturePowerManager {
CapturePowerLease
}
pub async fn acquire_session(&self) -> CapturePowerLease {
CapturePowerLease
}
pub async fn set_manual(&self, enabled: bool) -> anyhow::Result<CapturePowerState> {
Ok(CapturePowerState {
available: true,
@ -304,6 +472,13 @@ impl CapturePowerManager {
mod tests {
use super::*;
#[tokio::test]
async fn coverage_stub_supports_preview_and_session_leases() {
let manager = CapturePowerManager::new();
let _preview = manager.acquire().await;
let _session = manager.acquire_session().await;
}
#[tokio::test]
async fn coverage_stub_reports_auto_snapshot() {
let state = CapturePowerManager::new()

View File

@ -13,9 +13,9 @@ use tonic_reflection::server::Builder as ReflBuilder;
use tracing::{debug, error, info, warn};
use lesavka_common::lesavka::{
relay_server::{Relay, RelayServer},
AudioPacket, CapturePowerCommand, CapturePowerState, Empty, KeyboardReport, MonitorRequest,
MouseReport, PasteReply, PasteRequest, ResetUsbReply, SetCapturePowerRequest, VideoPacket,
relay_server::{Relay, RelayServer},
};
use lesavka_server::{
@ -227,8 +227,10 @@ impl Relay for Handler {
let ms = self.ms.clone();
let gadget = self.gadget.clone();
let did_cycle = self.did_cycle.clone();
let session_lease = self.capture_power.acquire_session().await;
tokio::spawn(async move {
let _session_lease = session_lease;
let mut s = req.into_inner();
while let Some(pkt) = s.next().await.transpose()? {
if let Err(e) = runtime_support::write_hid_report(&kb, &pkt.data).await {
@ -266,8 +268,10 @@ impl Relay for Handler {
let kb = self.kb.clone();
let gadget = self.gadget.clone();
let did_cycle = self.did_cycle.clone();
let session_lease = self.capture_power.acquire_session().await;
tokio::spawn(async move {
let _session_lease = session_lease;
let mut s = req.into_inner();
while let Some(pkt) = s.next().await.transpose()? {
if let Err(e) = runtime_support::write_hid_report(&ms, &pkt.data).await {