diff --git a/.gitignore b/.gitignore index 0de3165..00d6f99 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ dist/ coverage/ logs/ captures/ +tmp/ override.toml .cache/sccache/ /unit-graph.json diff --git a/Cargo.lock b/Cargo.lock index f9d36e6..312432a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.16.3" +version = "0.16.4" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.16.3" +version = "0.16.4" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.16.3" +version = "0.16.4" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index 4a6cc3b..33afe89 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.16.3" +version = "0.16.4" edition = "2024" [dependencies] diff --git a/client/src/app/downlink_media.rs b/client/src/app/downlink_media.rs index d6e4213..226a9ce 100644 --- a/client/src/app/downlink_media.rs +++ b/client/src/app/downlink_media.rs @@ -68,12 +68,30 @@ impl LesavkaClientApp { /*──────────────── audio stream ───────────────*/ #[cfg(not(coverage))] - async fn audio_loop(ep: Channel, out: AudioOut) { + async fn audio_loop( + ep: Channel, + out: AudioOut, + media_controls: crate::live_media_control::LiveMediaControls, + ) { let mut consecutive_source_failures = 0_u32; let mut last_usb_recovery_at: Option = None; let mut delay = Duration::from_secs(1); let mut audio_failure_log = AudioFailureLogLimiter::default(); + let mut paused = false; loop { + if !media_controls.refresh().audio { + if !paused { + tracing::info!("🔇 remote audio soft-paused"); + paused = true; + } + tokio::time::sleep(Duration::from_millis(100)).await; + continue; + } + if paused { + tracing::info!("🔊 remote audio resumed"); + paused = false; + delay = Duration::from_secs(1); + } let mut cli = RelayClient::new(ep.clone()); let req = MonitorRequest { id: 0, @@ -90,6 +108,10 @@ impl LesavkaClientApp { let mut warned_no_packets = false; delay = Duration::from_secs(1); loop { + if !media_controls.refresh().audio { + tracing::info!("🔇 remote audio soft-paused; closing capture stream"); + break; + } match tokio::time::timeout( Duration::from_secs(1), stream.get_mut().message(), diff --git a/client/src/app/session_lifecycle.rs b/client/src/app/session_lifecycle.rs index 962a6aa..e803bfc 100644 --- a/client/src/app/session_lifecycle.rs +++ b/client/src/app/session_lifecycle.rs @@ -58,9 +58,17 @@ impl LesavkaClientApp { let caps = handshake::negotiate(&self.server_addr).await; tracing::info!("🤝 server capabilities = {:?}", caps); let camera_cfg = app_support::camera_config_from_caps(&caps); + let media_controls = crate::live_media_control::LiveMediaControls::from_env( + crate::live_media_control::MediaControlState::new( + caps.camera && std::env::var("LESAVKA_CAM_DISABLE").is_err(), + caps.microphone && std::env::var("LESAVKA_MIC_DISABLE").is_err(), + std::env::var("LESAVKA_AUDIO_DISABLE").is_err(), + ), + ); + let media_state = media_controls.refresh(); let uplink_telemetry = crate::uplink_telemetry::UplinkTelemetryPublisher::from_env( - caps.camera && std::env::var("LESAVKA_CAM_DISABLE").is_err(), - caps.microphone && std::env::var("LESAVKA_MIC_DISABLE").is_err(), + media_state.camera, + media_state.microphone, ); /*────────── persistent gRPC channels ──────────*/ @@ -207,7 +215,11 @@ impl LesavkaClientApp { if std::env::var("LESAVKA_AUDIO_DISABLE").is_err() { let audio_out = AudioOut::new()?; let ep_audio = vid_ep.clone(); - tokio::spawn(Self::audio_loop(ep_audio, audio_out)); + tokio::spawn(Self::audio_loop( + ep_audio, + audio_out, + media_controls.clone(), + )); } else { info!("🔇 remote audio disabled for this relay session"); } @@ -229,6 +241,7 @@ impl LesavkaClientApp { let cam_source = std::env::var("LESAVKA_CAM_SOURCE").ok(); let cam_telemetry = uplink_telemetry.handle(crate::uplink_telemetry::UpstreamStreamKind::Camera); + let media_controls = media_controls.clone(); tokio::spawn(async move { let result = tokio::task::spawn_blocking(move || { CameraCapture::new(cam_source.as_deref(), camera_cfg) @@ -237,7 +250,12 @@ impl LesavkaClientApp { match result { Ok(Ok(cam)) => { let cam = Arc::new(cam); - tokio::spawn(Self::cam_loop(ep, cam, cam_telemetry.clone())); + tokio::spawn(Self::cam_loop( + ep, + cam, + cam_telemetry.clone(), + media_controls.clone(), + )); } Ok(Err(err)) => { cam_telemetry.record_disconnect(format!( @@ -262,12 +280,18 @@ impl LesavkaClientApp { let ep = vid_ep.clone(); let mic_telemetry = uplink_telemetry.handle(crate::uplink_telemetry::UpstreamStreamKind::Microphone); + let media_controls = media_controls.clone(); tokio::spawn(async move { let result = tokio::task::spawn_blocking(MicrophoneCapture::new).await; match result { Ok(Ok(mic)) => { let mic = Arc::new(mic); - tokio::spawn(Self::voice_loop(ep, mic, mic_telemetry.clone())); + tokio::spawn(Self::voice_loop( + ep, + mic, + mic_telemetry.clone(), + media_controls.clone(), + )); } Ok(Err(err)) => { mic_telemetry.record_disconnect(format!( diff --git a/client/src/app/uplink_media.rs b/client/src/app/uplink_media.rs index dfab5c6..6f75c21 100644 --- a/client/src/app/uplink_media.rs +++ b/client/src/app/uplink_media.rs @@ -5,6 +5,7 @@ impl LesavkaClientApp { ep: Channel, mic: Arc, telemetry: crate::uplink_telemetry::UplinkTelemetryHandle, + media_controls: crate::live_media_control::LiveMediaControls, ) { let mut delay = Duration::from_secs(1); static FAIL_CNT: AtomicUsize = AtomicUsize::new(0); @@ -53,8 +54,24 @@ impl LesavkaClientApp { let telemetry_thread = telemetry.clone(); let queue_thread = queue.clone(); let drop_log_thread = Arc::clone(&drop_log); + let media_controls_thread = media_controls.clone(); let mic_worker = std::thread::spawn(move || { + let mut paused = false; while stop_rx.try_recv().is_err() { + if !media_controls_thread.refresh().microphone { + if !paused { + telemetry_thread.record_enabled(false); + tracing::info!("🎤 microphone uplink soft-paused"); + paused = true; + } + std::thread::sleep(Duration::from_millis(20)); + continue; + } + if paused { + telemetry_thread.record_enabled(true); + tracing::info!("🎤 microphone uplink resumed"); + paused = false; + } if let Some(pkt) = mic_clone.pull() { trace!("🎤📤 cli {} bytes → gRPC", pkt.data.len()); let enqueue_age = crate::live_capture_clock::packet_age(pkt.pts); @@ -108,6 +125,7 @@ impl LesavkaClientApp { ep: Channel, cam: Arc, telemetry: crate::uplink_telemetry::UplinkTelemetryHandle, + media_controls: crate::live_media_control::LiveMediaControls, ) { let mut delay = Duration::from_secs(1); @@ -154,10 +172,25 @@ impl LesavkaClientApp { let telemetry = telemetry.clone(); let queue = queue.clone(); let drop_log = Arc::clone(&drop_log); + let media_controls = media_controls.clone(); move || loop { if stop_rx.try_recv().is_ok() { break; } + if !media_controls.refresh().camera { + telemetry.record_enabled(false); + tracing::info!("📸 webcam uplink soft-paused"); + while stop_rx.try_recv().is_err() + && !media_controls.refresh().camera + { + std::thread::sleep(Duration::from_millis(25)); + } + if stop_rx.try_recv().is_ok() { + break; + } + telemetry.record_enabled(true); + tracing::info!("📸 webcam uplink resumed"); + } let Some(pkt) = cam.pull() else { std::thread::sleep(Duration::from_millis(5)); continue; diff --git a/client/src/input/microphone.rs b/client/src/input/microphone.rs index 538aa51..bca07ec 100644 --- a/client/src/input/microphone.rs +++ b/client/src/input/microphone.rs @@ -303,17 +303,33 @@ fn microphone_pipeline_desc(source_desc: &str, gain: f64, level_tap_enabled: boo } fn buffer_duration_us(buf: &gst::BufferRef, bytes: usize) -> u64 { + let payload_duration_us = pcm_payload_duration_us(bytes); buf.duration() .map(|ts| ts.nseconds() / 1_000) - .unwrap_or_else(|| { - let bytes_per_frame = MIC_CHANNELS * MIC_SAMPLE_BYTES; - let frames = bytes / bytes_per_frame.max(1); - ((frames as u128 * 1_000_000u128) / MIC_SAMPLE_RATE as u128).min(u64::MAX as u128) - as u64 - }) + .filter(|duration_us| duration_matches_pcm_payload(*duration_us, payload_duration_us)) + .unwrap_or(payload_duration_us) .max(1) } +fn pcm_payload_duration_us(bytes: usize) -> u64 { + let bytes_per_frame = MIC_CHANNELS * MIC_SAMPLE_BYTES; + let frames = bytes / bytes_per_frame.max(1); + ((frames as u128 * 1_000_000u128) / MIC_SAMPLE_RATE as u128).min(u64::MAX as u128) as u64 +} + +/// Rejects bogus capture timestamps before they can poison mic PTS rebasing. +fn duration_matches_pcm_payload(reported_us: u64, payload_us: u64) -> bool { + if reported_us == 0 { + return false; + } + if payload_us <= 1 { + return true; + } + let lower = (payload_us / 8).max(1); + let upper = payload_us.saturating_mul(8); + reported_us >= lower && reported_us <= upper +} + /// Detect launcher catalog names that should be opened through Pulse directly. fn looks_like_pulse_source_name(source: &str) -> bool { let source = source.trim(); @@ -411,3 +427,53 @@ impl Drop for MicrophoneCapture { let _ = self.pipeline.set_state(gst::State::Null); } } + +#[cfg(test)] +mod tests { + use super::{ + MIC_CHANNELS, MIC_SAMPLE_BYTES, MIC_SAMPLE_RATE, buffer_duration_us, + pcm_payload_duration_us, + }; + use gstreamer as gst; + + fn buffer_with_duration(size: usize, duration: Option) -> gst::Buffer { + gst::init().ok(); + let mut buffer = gst::Buffer::with_size(size).expect("test buffer"); + buffer + .get_mut() + .expect("test buffer should be uniquely owned") + .set_duration(duration); + buffer + } + + #[test] + fn mic_payload_duration_uses_pcm_frame_count() { + let ten_ms_bytes = (MIC_SAMPLE_RATE as usize / 100) * MIC_CHANNELS * MIC_SAMPLE_BYTES; + + assert_eq!(pcm_payload_duration_us(ten_ms_bytes), 10_000); + } + + #[test] + fn zero_reported_duration_falls_back_to_pcm_payload_duration() { + let bytes = 1_024 * MIC_CHANNELS * MIC_SAMPLE_BYTES; + let buffer = buffer_with_duration(bytes, Some(gst::ClockTime::ZERO)); + + assert_eq!(buffer_duration_us(buffer.as_ref(), bytes), 21_333); + } + + #[test] + fn implausibly_tiny_reported_duration_falls_back_to_payload_duration() { + let bytes = 1_024 * MIC_CHANNELS * MIC_SAMPLE_BYTES; + let buffer = buffer_with_duration(bytes, Some(gst::ClockTime::from_useconds(1))); + + assert_eq!(buffer_duration_us(buffer.as_ref(), bytes), 21_333); + } + + #[test] + fn plausible_reported_duration_is_preserved() { + let bytes = 1_024 * MIC_CHANNELS * MIC_SAMPLE_BYTES; + let buffer = buffer_with_duration(bytes, Some(gst::ClockTime::from_useconds(20_000))); + + assert_eq!(buffer_duration_us(buffer.as_ref(), bytes), 20_000); + } +} diff --git a/client/src/launcher/mod.rs b/client/src/launcher/mod.rs index 8867387..4eaa352 100644 --- a/client/src/launcher/mod.rs +++ b/client/src/launcher/mod.rs @@ -162,9 +162,7 @@ pub fn runtime_env_vars(state: &LauncherState) -> BTreeMap { if matches!(state.view_mode, ViewMode::Unified) { envs.insert("LESAVKA_DISABLE_VIDEO_RENDER".to_string(), "1".to_string()); } - if state.channels.camera - && let Some(camera) = state.devices.camera.as_ref() - { + if let Some(camera) = state.devices.camera.as_ref() { envs.insert("LESAVKA_CAM_SOURCE".to_string(), camera.clone()); if let Some(mode) = state.camera_quality { envs.insert("LESAVKA_CAM_WIDTH".to_string(), mode.width.to_string()); @@ -178,16 +176,12 @@ pub fn runtime_env_vars(state: &LauncherState) -> BTreeMap { } else { envs.insert("LESAVKA_CAM_DISABLE".to_string(), "1".to_string()); } - if state.channels.microphone - && let Some(microphone) = state.devices.microphone.as_ref() - { + if let Some(microphone) = state.devices.microphone.as_ref() { envs.insert("LESAVKA_MIC_SOURCE".to_string(), microphone.clone()); } else { envs.insert("LESAVKA_MIC_DISABLE".to_string(), "1".to_string()); } - if state.channels.audio - && let Some(speaker) = state.devices.speaker.as_ref() - { + if let Some(speaker) = state.devices.speaker.as_ref() { envs.insert("LESAVKA_AUDIO_SINK".to_string(), speaker.clone()); } else { envs.insert("LESAVKA_AUDIO_DISABLE".to_string(), "1".to_string()); diff --git a/client/src/launcher/tests/mod.rs b/client/src/launcher/tests/mod.rs index 6f97e60..b00f1fe 100644 --- a/client/src/launcher/tests/mod.rs +++ b/client/src/launcher/tests/mod.rs @@ -287,7 +287,7 @@ fn runtime_env_vars_emit_selected_audio_gain() { } #[test] -fn runtime_env_vars_use_channel_toggles_for_media_inclusion() { +fn runtime_env_vars_keep_selected_media_available_for_live_soft_pause() { let mut state = LauncherState::new(); let envs = runtime_env_vars(&state); @@ -298,6 +298,15 @@ fn runtime_env_vars_use_channel_toggles_for_media_inclusion() { state.select_camera(Some("/dev/video0".to_string())); state.select_microphone(Some("alsa_input.usb".to_string())); state.select_speaker(Some("alsa_output.usb".to_string())); + let envs = runtime_env_vars(&state); + assert!(!envs.contains_key("LESAVKA_CAM_DISABLE")); + assert!(!envs.contains_key("LESAVKA_MIC_DISABLE")); + assert!(!envs.contains_key("LESAVKA_AUDIO_DISABLE")); + assert_eq!( + envs.get("LESAVKA_CAM_SOURCE"), + Some(&"/dev/video0".to_string()) + ); + state.set_camera_channel_enabled(true); state.set_microphone_channel_enabled(true); let envs = runtime_env_vars(&state); @@ -307,7 +316,7 @@ fn runtime_env_vars_use_channel_toggles_for_media_inclusion() { state.set_audio_channel_enabled(false); let envs = runtime_env_vars(&state); - assert_eq!(envs.get("LESAVKA_AUDIO_DISABLE"), Some(&"1".to_string())); + assert!(!envs.contains_key("LESAVKA_AUDIO_DISABLE")); } #[test] diff --git a/client/src/launcher/tests/ui_runtime.rs b/client/src/launcher/tests/ui_runtime.rs index f47ddf6..09fd8e5 100644 --- a/client/src/launcher/tests/ui_runtime.rs +++ b/client/src/launcher/tests/ui_runtime.rs @@ -5,6 +5,7 @@ use crate::launcher::{ state::{BreakoutSizePreset, LauncherState, PreviewSourceSize}, ui_components::build_launcher_view, }; +use crate::uplink_telemetry::UpstreamStreamTelemetry; use gtk::prelude::*; use serial_test::serial; use std::{ @@ -399,6 +400,88 @@ fn server_chip_state_tracks_connection_not_just_reachability() { assert_eq!(server_version_label(&state), "???"); } +#[test] +fn uac_chip_uses_live_microphone_flow_not_only_server_caps() { + let mut state = LauncherState::new(); + state.set_server_available(true); + state.set_server_media_caps(Some(true), Some(true), Some("uvc".to_string()), None); + state.set_microphone_channel_enabled(true); + + assert_eq!( + recovery_uac_health(&state, false, None), + (StatusLightState::Live, "Ready".to_string()) + ); + assert_eq!( + recovery_uac_health(&state, true, None), + (StatusLightState::Caution, "No Flow".to_string()) + ); + + let healthy = UpstreamStreamTelemetry { + enabled: true, + connected: true, + packets_streamed: 24, + latest_delivery_age_ms: 42.0, + latest_enqueue_age_ms: 12.0, + queue_depth: 1, + ..UpstreamStreamTelemetry::default() + }; + assert_eq!( + recovery_uac_health(&state, true, Some(&healthy)), + (StatusLightState::Live, "Flowing".to_string()) + ); + + state.set_microphone_channel_enabled(false); + assert_eq!( + recovery_uac_health(&state, true, Some(&healthy)), + (StatusLightState::Idle, "Paused".to_string()) + ); +} + +#[test] +fn uvc_chip_degrades_when_live_camera_frames_are_not_flowing() { + let mut state = LauncherState::new(); + state.set_server_available(true); + state.set_server_media_caps( + Some(true), + Some(true), + Some("uvc".to_string()), + Some("mjpeg".to_string()), + ); + state.set_camera_channel_enabled(true); + + assert_eq!( + recovery_uvc_health(&state, false, None), + (StatusLightState::Live, "MJPEG".to_string()) + ); + assert_eq!( + recovery_uvc_health(&state, true, None), + (StatusLightState::Caution, "No Frames".to_string()) + ); + + let healthy = UpstreamStreamTelemetry { + enabled: true, + connected: true, + packets_streamed: 12, + latest_delivery_age_ms: 48.0, + latest_enqueue_age_ms: 20.0, + queue_depth: 3, + ..UpstreamStreamTelemetry::default() + }; + assert_eq!( + recovery_uvc_health(&state, true, Some(&healthy)), + (StatusLightState::Live, "MJPEG".to_string()) + ); + + let lagging = UpstreamStreamTelemetry { + latest_delivery_age_ms: 321.0, + ..healthy + }; + assert_eq!( + recovery_uvc_health(&state, true, Some(&lagging)), + (StatusLightState::Caution, "Lagging".to_string()) + ); +} + #[test] fn capture_power_detail_mentions_detected_eyes_when_powered() { let power = CapturePowerStatus { @@ -478,6 +561,22 @@ fn write_mic_gain_request_formats_live_control_file() { assert!(raw.starts_with("3.250 "), "{raw}"); } +#[test] +fn write_media_control_request_formats_soft_pause_state() { + let dir = tempfile::tempdir().expect("tempdir"); + let path = dir.path().join("media.control"); + let mut state = LauncherState::new(); + state.set_camera_channel_enabled(true); + state.set_microphone_channel_enabled(false); + state.set_audio_channel_enabled(true); + + write_media_control_request(&path, &state).expect("write media control"); + let raw = std::fs::read_to_string(path).expect("read media control"); + assert!(raw.contains("camera=1"), "{raw}"); + assert!(raw.contains("microphone=0"), "{raw}"); + assert!(raw.contains("audio=1"), "{raw}"); +} + #[gtk::test] #[serial] fn dock_all_displays_to_preview_closes_popouts_and_resets_surfaces() { diff --git a/client/src/launcher/ui.rs b/client/src/launcher/ui.rs index bfcc152..7af663c 100644 --- a/client/src/launcher/ui.rs +++ b/client/src/launcher/ui.rs @@ -27,14 +27,15 @@ use { RelayChild, append_session_log_for_level, apply_popout_window_size, attach_child_log_streams, audio_gain_control_path, capture_swap_key, copy_plain_text, copy_session_log, dock_all_displays_to_preview, dock_display_to_preview, - input_control_path, input_state_path, input_toggle_control_path, mic_gain_control_path, - next_input_routing, open_diagnostics_popout, open_popout_window, open_session_log_popout, - path_marker, present_popout_windows, read_input_routing_state, reap_exited_child, - refresh_launcher_ui, refresh_test_buttons, routing_name, selected_combo_value, - selected_server_addr, shutdown_launcher_runtime, spawn_client_process, stop_child_process, - toggle_key_label, update_test_action_result, uplink_camera_preview_path, - uplink_mic_level_path, uplink_telemetry_path, write_audio_gain_request, - write_input_routing_request, write_input_toggle_key_request, write_mic_gain_request, + input_control_path, input_state_path, input_toggle_control_path, media_control_path, + mic_gain_control_path, next_input_routing, open_diagnostics_popout, open_popout_window, + open_session_log_popout, path_marker, present_popout_windows, read_input_routing_state, + reap_exited_child, refresh_launcher_ui, refresh_test_buttons, routing_name, + selected_combo_value, selected_server_addr, shutdown_launcher_runtime, + spawn_client_process, stop_child_process, toggle_key_label, update_test_action_result, + uplink_camera_preview_path, uplink_mic_level_path, uplink_telemetry_path, + write_audio_gain_request, write_input_routing_request, write_input_toggle_key_request, + write_media_control_request, write_mic_gain_request, }, crate::handshake::{HandshakeProbe, probe}, crate::output::display::enumerate_monitors, diff --git a/client/src/launcher/ui/control_requests.rs b/client/src/launcher/ui/control_requests.rs index 2e6acc3..a07a8aa 100644 --- a/client/src/launcher/ui/control_requests.rs +++ b/client/src/launcher/ui/control_requests.rs @@ -103,6 +103,38 @@ fn apply_mic_gain_change( true } +#[cfg(not(coverage))] +/// Apply a live media soft-pause/resume request without restarting USB gadget functions. +fn apply_media_control_change( + state_snapshot: &LauncherState, + widgets: &super::ui_components::LauncherWidgets, + child_proc: &Rc>>, + feed_label: &str, + enabled: bool, +) { + let relay_live = child_proc + .try_borrow() + .map(|child| child.is_some()) + .unwrap_or(false); + let action = if enabled { "resumed" } else { "soft-paused" }; + if relay_live { + let path = media_control_path(); + match write_media_control_request(&path, state_snapshot) { + Ok(()) => widgets + .status_label + .set_text(&format!("{feed_label} {action} for the live relay.")), + Err(err) => widgets.status_label.set_text(&format!( + "{feed_label} will be {action} on the next relay launch, but the live soft-pause control could not be written: {err}" + )), + } + } else { + widgets.status_label.set_text(&format!( + "{feed_label} will start {} on the next relay launch.", + if enabled { "enabled" } else { "paused" } + )); + } +} + #[cfg(not(coverage))] /// Refresh relay capture-power state in the background so GTK stays responsive. fn request_capture_power_refresh( diff --git a/client/src/launcher/ui/media_device_bindings.rs b/client/src/launcher/ui/media_device_bindings.rs index 674112c..ef5d55a 100644 --- a/client/src/launcher/ui/media_device_bindings.rs +++ b/client/src/launcher/ui/media_device_bindings.rs @@ -87,16 +87,21 @@ let tests = Rc::clone(&tests); let toggle = widgets.camera_channel_toggle.clone(); toggle.connect_toggled(move |toggle| { + let enabled = toggle.is_active(); if let Ok(mut state) = state.try_borrow_mut() { - state.set_camera_channel_enabled(toggle.is_active()); + state.set_camera_channel_enabled(enabled); } - if !toggle.is_active() { + if !enabled { tests.borrow_mut().stop_camera_preview(); - widgets - .status_label - .set_text("Camera stream disabled. Webcam preview stopped."); } if let Ok(state_snapshot) = state.try_borrow().map(|state| state.clone()) { + apply_media_control_change( + &state_snapshot, + &widgets, + &child_proc, + "Camera", + enabled, + ); refresh_launcher_ui( &widgets, &state_snapshot, @@ -114,18 +119,23 @@ let tests = Rc::clone(&tests); let toggle = widgets.microphone_channel_toggle.clone(); toggle.connect_toggled(move |toggle| { + let enabled = toggle.is_active(); if let Ok(mut state) = state.try_borrow_mut() { - state.set_microphone_channel_enabled(toggle.is_active()); + state.set_microphone_channel_enabled(enabled); } - if !toggle.is_active() { + if !enabled { let mut tests = tests.borrow_mut(); tests.stop_microphone_monitor(); tests.stop_microphone_replay(); - widgets - .status_label - .set_text("Mic stream disabled. Mic monitor and replay stopped."); } if let Ok(state_snapshot) = state.try_borrow().map(|state| state.clone()) { + apply_media_control_change( + &state_snapshot, + &widgets, + &child_proc, + "Mic", + enabled, + ); refresh_launcher_ui( &widgets, &state_snapshot, @@ -143,18 +153,23 @@ let tests = Rc::clone(&tests); let toggle = widgets.audio_channel_toggle.clone(); toggle.connect_toggled(move |toggle| { + let enabled = toggle.is_active(); if let Ok(mut state) = state.try_borrow_mut() { - state.set_audio_channel_enabled(toggle.is_active()); + state.set_audio_channel_enabled(enabled); } - if !toggle.is_active() { + if !enabled { let mut tests = tests.borrow_mut(); tests.stop_speaker_test(); tests.stop_microphone_replay(); - widgets - .status_label - .set_text("Speaker stream disabled. Local audio playback stopped."); } if let Ok(state_snapshot) = state.try_borrow().map(|state| state.clone()) { + apply_media_control_change( + &state_snapshot, + &widgets, + &child_proc, + "Speaker", + enabled, + ); refresh_launcher_ui( &widgets, &state_snapshot, diff --git a/client/src/launcher/ui_runtime/control_paths.rs b/client/src/launcher/ui_runtime/control_paths.rs index 22f094c..a2054d8 100644 --- a/client/src/launcher/ui_runtime/control_paths.rs +++ b/client/src/launcher/ui_runtime/control_paths.rs @@ -88,6 +88,12 @@ pub fn mic_gain_control_path() -> PathBuf { .unwrap_or_else(|_| PathBuf::from(DEFAULT_MIC_GAIN_CONTROL_PATH)) } +pub fn media_control_path() -> PathBuf { + std::env::var(MEDIA_CONTROL_ENV) + .map(PathBuf::from) + .unwrap_or_else(|_| PathBuf::from(DEFAULT_MEDIA_CONTROL_PATH)) +} + pub fn uplink_camera_preview_path() -> PathBuf { std::env::var(UPLINK_CAMERA_PREVIEW_ENV) .map(PathBuf::from) @@ -126,6 +132,18 @@ pub fn write_mic_gain_request(path: &Path, gain_percent: u32) -> Result<()> { Ok(()) } +pub fn write_media_control_request(path: &Path, state: &LauncherState) -> Result<()> { + crate::live_media_control::write_media_control_request( + path, + crate::live_media_control::MediaControlState::new( + state.channels.camera, + state.channels.microphone, + state.channels.audio, + ), + )?; + Ok(()) +} + pub fn write_input_toggle_key_request(path: &Path, swap_key: &str) -> Result<()> { std::fs::write( path, diff --git a/client/src/launcher/ui_runtime/process_logs.rs b/client/src/launcher/ui_runtime/process_logs.rs index 075ef6c..9e0b986 100644 --- a/client/src/launcher/ui_runtime/process_logs.rs +++ b/client/src/launcher/ui_runtime/process_logs.rs @@ -39,6 +39,9 @@ pub fn spawn_client_process( let mic_gain_path = mic_gain_control_path(); let _ = write_mic_gain_request(&mic_gain_path, state.mic_gain_percent); command.env(MIC_GAIN_CONTROL_ENV, mic_gain_path); + let media_control_path = media_control_path(); + let _ = write_media_control_request(&media_control_path, state); + command.env(MEDIA_CONTROL_ENV, media_control_path); let camera_preview_path = uplink_camera_preview_path(); let _ = std::fs::remove_file(&camera_preview_path); command.env(UPLINK_CAMERA_PREVIEW_ENV, camera_preview_path); diff --git a/client/src/launcher/ui_runtime/status_details.rs b/client/src/launcher/ui_runtime/status_details.rs index 046a352..7d8ffa4 100644 --- a/client/src/launcher/ui_runtime/status_details.rs +++ b/client/src/launcher/ui_runtime/status_details.rs @@ -139,19 +139,35 @@ fn recovery_usb_health(state: &LauncherState) -> (StatusLightState, String) { } /// Summarize whether the UAC microphone/audio function is advertised by the relay. -fn recovery_uac_health(state: &LauncherState) -> (StatusLightState, String) { +fn recovery_uac_health( + state: &LauncherState, + relay_live: bool, + stream: Option<&crate::uplink_telemetry::UpstreamStreamTelemetry>, +) -> (StatusLightState, String) { if !state.server_available { return (StatusLightState::Idle, "Offline".to_string()); } - match state.server_microphone { - Some(true) => (StatusLightState::Live, "Ready".to_string()), - Some(false) => (StatusLightState::Warning, "Missing".to_string()), - None => (StatusLightState::Caution, "Unknown".to_string()), + if state.server_microphone == Some(false) { + return (StatusLightState::Warning, "Missing".to_string()); } + if state.server_microphone.is_none() { + return (StatusLightState::Caution, "Unknown".to_string()); + } + if !relay_live { + return (StatusLightState::Live, "Ready".to_string()); + } + if !state.channels.microphone { + return (StatusLightState::Idle, "Paused".to_string()); + } + media_stream_health(stream, MediaStreamKind::Microphone) } /// Summarize whether the UVC camera function is advertised with the expected codec. -fn recovery_uvc_health(state: &LauncherState) -> (StatusLightState, String) { +fn recovery_uvc_health( + state: &LauncherState, + relay_live: bool, + stream: Option<&crate::uplink_telemetry::UpstreamStreamTelemetry>, +) -> (StatusLightState, String) { if !state.server_available { return (StatusLightState::Idle, "Offline".to_string()); } @@ -160,22 +176,88 @@ fn recovery_uvc_health(state: &LauncherState) -> (StatusLightState, String) { .as_deref() .map(|value| value.to_ascii_uppercase()) .unwrap_or_else(|| "READY".to_string()); - match state.server_camera { - Some(true) => { - if matches!(state.server_camera_output.as_deref(), Some("uvc")) { - (StatusLightState::Live, codec) - } else { - let value = state - .server_camera_output - .as_deref() - .map(|output| format!("{}/{}", output.to_ascii_uppercase(), codec)) - .unwrap_or(codec); - (StatusLightState::Caution, value) - } - } - Some(false) => (StatusLightState::Warning, "Missing".to_string()), - None => (StatusLightState::Caution, "Unknown".to_string()), + if state.server_camera == Some(false) { + return (StatusLightState::Warning, "Missing".to_string()); } + if state.server_camera.is_none() { + return (StatusLightState::Caution, "Unknown".to_string()); + } + if !matches!(state.server_camera_output.as_deref(), Some("uvc")) { + let value = state + .server_camera_output + .as_deref() + .map(|output| format!("{}/{}", output.to_ascii_uppercase(), codec)) + .unwrap_or(codec); + return (StatusLightState::Caution, value); + } + if !relay_live { + return (StatusLightState::Live, codec); + } + if !state.channels.camera { + return (StatusLightState::Idle, "Paused".to_string()); + } + let (health, label) = media_stream_health(stream, MediaStreamKind::Camera); + if matches!(health, StatusLightState::Live) { + (health, codec) + } else { + (health, label) + } +} + +#[derive(Clone, Copy)] +enum MediaStreamKind { + Camera, + Microphone, +} + +/// Converts live uplink telemetry into the small, glanceable UAC/UVC chip state. +fn media_stream_health( + stream: Option<&crate::uplink_telemetry::UpstreamStreamTelemetry>, + kind: MediaStreamKind, +) -> (StatusLightState, String) { + let Some(stream) = stream else { + return match kind { + MediaStreamKind::Camera => (StatusLightState::Caution, "No Frames".to_string()), + MediaStreamKind::Microphone => (StatusLightState::Caution, "No Flow".to_string()), + }; + }; + if !stream.enabled { + return (StatusLightState::Idle, "Paused".to_string()); + } + if !stream.last_error.trim().is_empty() { + return (StatusLightState::Warning, "Error".to_string()); + } + if !stream.connected { + return match kind { + MediaStreamKind::Camera => (StatusLightState::Caution, "No Frames".to_string()), + MediaStreamKind::Microphone => (StatusLightState::Caution, "No Flow".to_string()), + }; + } + if stream.packets_streamed == 0 { + let label = if stream.packets_enqueued > 0 { + "Queued" + } else { + match kind { + MediaStreamKind::Camera => "No Frames", + MediaStreamKind::Microphone => "No Flow", + } + }; + return (StatusLightState::Caution, label.to_string()); + } + + let (delivery_budget_ms, enqueue_budget_ms, queue_pressure, healthy_label) = match kind { + MediaStreamKind::Camera => (250.0, 250.0, 24, "Frames"), + MediaStreamKind::Microphone => (180.0, 120.0, 12, "Flowing"), + }; + if stream.latest_delivery_age_ms > delivery_budget_ms + || stream.latest_enqueue_age_ms > enqueue_budget_ms + { + return (StatusLightState::Caution, "Lagging".to_string()); + } + if stream.queue_depth >= queue_pressure { + return (StatusLightState::Caution, "Dropping".to_string()); + } + (StatusLightState::Live, healthy_label.to_string()) } fn gpio_light_state(power: &CapturePowerStatus) -> StatusLightState { diff --git a/client/src/launcher/ui_runtime/status_refresh.rs b/client/src/launcher/ui_runtime/status_refresh.rs index 76beb06..cf2b9ca 100644 --- a/client/src/launcher/ui_runtime/status_refresh.rs +++ b/client/src/launcher/ui_runtime/status_refresh.rs @@ -29,6 +29,7 @@ pub const INPUT_STATE_ENV: &str = "LESAVKA_LAUNCHER_INPUT_STATE"; pub const TOGGLE_KEY_CONTROL_ENV: &str = "LESAVKA_LAUNCHER_TOGGLE_KEY_CONTROL"; pub const AUDIO_GAIN_CONTROL_ENV: &str = "LESAVKA_AUDIO_GAIN_CONTROL"; pub const MIC_GAIN_CONTROL_ENV: &str = "LESAVKA_MIC_GAIN_CONTROL"; +pub const MEDIA_CONTROL_ENV: &str = crate::live_media_control::MEDIA_CONTROL_ENV; pub const UPLINK_CAMERA_PREVIEW_ENV: &str = "LESAVKA_UPLINK_CAMERA_PREVIEW"; pub const UPLINK_MIC_LEVEL_ENV: &str = "LESAVKA_UPLINK_MIC_LEVEL"; pub use crate::uplink_telemetry::{DEFAULT_UPLINK_TELEMETRY_PATH, UPLINK_TELEMETRY_ENV}; @@ -37,6 +38,7 @@ pub const DEFAULT_INPUT_STATE_PATH: &str = "/tmp/lesavka-launcher-input.state"; pub const DEFAULT_TOGGLE_KEY_CONTROL_PATH: &str = "/tmp/lesavka-launcher-toggle-key.control"; pub const DEFAULT_AUDIO_GAIN_CONTROL_PATH: &str = "/tmp/lesavka-audio-gain.control"; pub const DEFAULT_MIC_GAIN_CONTROL_PATH: &str = "/tmp/lesavka-mic-gain.control"; +pub const DEFAULT_MEDIA_CONTROL_PATH: &str = crate::live_media_control::DEFAULT_MEDIA_CONTROL_PATH; pub const DEFAULT_UPLINK_CAMERA_PREVIEW_PATH: &str = "/tmp/lesavka-uplink-camera-preview.rgba"; pub const DEFAULT_UPLINK_MIC_LEVEL_PATH: &str = "/tmp/lesavka-uplink-mic-level.value"; @@ -44,6 +46,7 @@ pub type RelayChild = Child; pub fn refresh_launcher_ui(widgets: &LauncherWidgets, state: &LauncherState, child_running: bool) { let relay_live = child_running || state.remote_active; + let latest_sample = widgets.diagnostics_log.borrow().latest().cloned(); let server_label = server_version_label(state); set_status_light( &widgets.summary.relay_light, @@ -86,11 +89,23 @@ pub fn refresh_launcher_ui(widgets: &LauncherWidgets, state: &LauncherState, chi set_status_light(&widgets.summary.usb_light, usb_state); widgets.summary.usb_value.set_text(&usb_value); widgets.summary.usb_value.set_tooltip_text(Some(&usb_value)); - let (uac_state, uac_value) = recovery_uac_health(state); + let (uac_state, uac_value) = recovery_uac_health( + state, + relay_live, + latest_sample + .as_ref() + .map(|sample| &sample.upstream_microphone), + ); set_status_light(&widgets.summary.uac_light, uac_state); widgets.summary.uac_value.set_text(&uac_value); widgets.summary.uac_value.set_tooltip_text(Some(&uac_value)); - let (uvc_state, uvc_value) = recovery_uvc_health(state); + let (uvc_state, uvc_value) = recovery_uvc_health( + state, + relay_live, + latest_sample + .as_ref() + .map(|sample| &sample.upstream_camera), + ); set_status_light(&widgets.summary.uvc_light, uvc_state); widgets.summary.uvc_value.set_text(&uvc_value); widgets.summary.uvc_value.set_tooltip_text(Some(&uvc_value)); @@ -177,9 +192,29 @@ pub fn refresh_launcher_ui(widgets: &LauncherWidgets, state: &LauncherState, chi .set_sensitive(!relay_live && state.channels.audio); widgets.keyboard_combo.set_sensitive(!relay_live); widgets.mouse_combo.set_sensitive(!relay_live); - widgets.camera_channel_toggle.set_sensitive(!relay_live); - widgets.microphone_channel_toggle.set_sensitive(!relay_live); - widgets.audio_channel_toggle.set_sensitive(!relay_live); + widgets + .camera_channel_toggle + .set_sensitive(!relay_live || state.devices.camera.is_some()); + widgets + .microphone_channel_toggle + .set_sensitive(!relay_live || state.devices.microphone.is_some()); + widgets + .audio_channel_toggle + .set_sensitive(!relay_live || state.devices.speaker.is_some()); + let media_toggle_tooltip = if relay_live { + "Soft-pause or resume this feed in the running relay without resetting USB. Use Recover for hard device resets." + } else { + "Choose which media feeds are enabled when the next relay session starts." + }; + widgets + .camera_channel_toggle + .set_tooltip_text(Some(media_toggle_tooltip)); + widgets + .microphone_channel_toggle + .set_tooltip_text(Some(media_toggle_tooltip)); + widgets + .audio_channel_toggle + .set_tooltip_text(Some(media_toggle_tooltip)); widgets .camera_test_button .set_sensitive(!relay_live && state.channels.camera); diff --git a/client/src/lib.rs b/client/src/lib.rs index a15cb01..8988ebb 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -13,6 +13,7 @@ pub mod input; pub mod launcher; pub mod layout; pub(crate) mod live_capture_clock; +pub(crate) mod live_media_control; pub mod output; pub mod paste; pub mod relay_transport; diff --git a/client/src/live_media_control.rs b/client/src/live_media_control.rs new file mode 100644 index 0000000..61796b3 --- /dev/null +++ b/client/src/live_media_control.rs @@ -0,0 +1,200 @@ +//! Live media feed controls shared by the launcher and relay child. + +use std::{ + fs, + path::{Path, PathBuf}, + sync::{Arc, Mutex}, + time::{SystemTime, UNIX_EPOCH}, +}; + +pub const MEDIA_CONTROL_ENV: &str = "LESAVKA_MEDIA_CONTROL"; +pub const DEFAULT_MEDIA_CONTROL_PATH: &str = "/tmp/lesavka-media.control"; + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub(crate) struct MediaControlState { + pub camera: bool, + pub microphone: bool, + pub audio: bool, +} + +impl MediaControlState { + #[must_use] + pub const fn new(camera: bool, microphone: bool, audio: bool) -> Self { + Self { + camera, + microphone, + audio, + } + } +} + +#[derive(Clone, Debug)] +pub(crate) struct LiveMediaControls { + path: PathBuf, + inner: Arc>, +} + +#[derive(Debug)] +struct LiveMediaControlsInner { + state: MediaControlState, +} + +impl LiveMediaControls { + #[must_use] + pub fn from_env(initial: MediaControlState) -> Self { + let path = std::env::var(MEDIA_CONTROL_ENV) + .map(PathBuf::from) + .unwrap_or_else(|_| PathBuf::from(DEFAULT_MEDIA_CONTROL_PATH)); + let controls = Self { + path, + inner: Arc::new(Mutex::new(LiveMediaControlsInner { state: initial })), + }; + let _ = controls.refresh(); + controls + } + + /// Reloads the launcher-written soft-pause switches, falling back safely on read errors. + pub fn refresh(&self) -> MediaControlState { + let Ok(mut inner) = self.inner.lock() else { + return MediaControlState::new(true, true, true); + }; + if let Ok(raw) = fs::read_to_string(&self.path) + && let Some(state) = parse_media_control_state(&raw) + { + inner.state = state; + } + inner.state + } +} + +/// Writes one atomic-ish soft-pause request for the running relay child to poll. +pub(crate) fn write_media_control_request( + path: &Path, + state: MediaControlState, +) -> std::io::Result<()> { + fs::write( + path, + format!( + "camera={} microphone={} audio={} {}\n", + bool_flag(state.camera), + bool_flag(state.microphone), + bool_flag(state.audio), + control_request_nonce() + ), + ) +} + +/// Parses the small launcher control-file grammar used across process boundaries. +fn parse_media_control_state(raw: &str) -> Option { + let mut camera = None; + let mut microphone = None; + let mut audio = None; + for token in raw.split_ascii_whitespace() { + let Some((key, value)) = token.split_once('=') else { + continue; + }; + match key { + "camera" => camera = Some(parse_bool_flag(value)?), + "microphone" | "mic" => microphone = Some(parse_bool_flag(value)?), + "audio" | "speaker" => audio = Some(parse_bool_flag(value)?), + _ => {} + } + } + Some(MediaControlState { + camera: camera?, + microphone: microphone?, + audio: audio?, + }) +} + +fn parse_bool_flag(value: &str) -> Option { + match value.trim().to_ascii_lowercase().as_str() { + "1" | "true" | "on" | "yes" => Some(true), + "0" | "false" | "off" | "no" => Some(false), + _ => None, + } +} + +const fn bool_flag(enabled: bool) -> &'static str { + if enabled { "1" } else { "0" } +} + +fn control_request_nonce() -> u128 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|duration| duration.as_nanos()) + .unwrap_or_default() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parses_media_control_state_from_launcher_file() { + assert_eq!( + parse_media_control_state("camera=1 microphone=0 audio=true 123"), + Some(MediaControlState::new(true, false, true)) + ); + } + + #[test] + fn live_media_controls_refresh_after_file_changes() { + let dir = tempfile::tempdir().expect("tempdir"); + let path = dir.path().join("media.control"); + write_media_control_request(&path, MediaControlState::new(true, true, false)) + .expect("write initial controls"); + let controls = LiveMediaControls { + path: path.clone(), + inner: Arc::new(Mutex::new(LiveMediaControlsInner { + state: MediaControlState::new(false, false, false), + })), + }; + assert_eq!( + controls.refresh(), + MediaControlState::new(true, true, false) + ); + + std::thread::sleep(std::time::Duration::from_millis(5)); + write_media_control_request(&path, MediaControlState::new(false, true, true)) + .expect("write updated controls"); + assert_eq!( + controls.refresh(), + MediaControlState::new(false, true, true) + ); + } + + #[test] + fn from_env_default_path_fallback_is_safe() { + let controls = LiveMediaControls::from_env(MediaControlState::new(true, false, true)); + let _ = controls.refresh(); + } + + #[test] + fn parser_tolerates_unknown_tokens_and_rejects_invalid_flags() { + assert_eq!( + parse_media_control_state("camera=on extra=ignored microphone=no audio=off"), + Some(MediaControlState::new(true, false, false)) + ); + assert_eq!( + parse_media_control_state("camera=maybe microphone=1 audio=1"), + None + ); + } + + #[test] + fn refresh_falls_back_to_all_enabled_if_lock_is_poisoned() { + let controls = LiveMediaControls { + path: PathBuf::from("/definitely/not/a/real/lesavka-media.control"), + inner: Arc::new(Mutex::new(LiveMediaControlsInner { + state: MediaControlState::new(false, false, false), + })), + }; + let inner = Arc::clone(&controls.inner); + let _ = std::panic::catch_unwind(move || { + let _guard = inner.lock().expect("lock"); + panic!("poison media controls lock"); + }); + assert_eq!(controls.refresh(), MediaControlState::new(true, true, true)); + } +} diff --git a/client/src/uplink_telemetry.rs b/client/src/uplink_telemetry.rs index 2f818db..510336b 100644 --- a/client/src/uplink_telemetry.rs +++ b/client/src/uplink_telemetry.rs @@ -143,6 +143,20 @@ impl UplinkTelemetryPublisher { } impl UplinkTelemetryHandle { + /// Updates whether this feed is intentionally active in the current relay session. + pub fn record_enabled(&self, enabled: bool) { + self.update(true, |stream| { + stream.enabled = enabled; + if !enabled { + stream.connected = false; + stream.queue_depth = 0; + stream.latest_enqueue_age_ms = 0.0; + stream.latest_delivery_age_ms = 0.0; + stream.last_error.clear(); + } + }); + } + /// Records a fresh gRPC connection attempt for this stream. pub fn record_reconnect_attempt(&self) { self.update(false, |stream| { @@ -298,4 +312,25 @@ mod tests { assert_eq!(snapshot.camera.delivery_age_peak_ms, 55.0); assert_eq!(snapshot.camera.last_error, "stream ended"); } + + /// Soft-paused streams should report disabled without looking stale or crashed. + #[test] + fn handle_enabled_updates_expose_live_soft_pause_state() { + let temp_dir = tempfile::tempdir().expect("tempdir"); + let path = temp_dir.path().join("uplink.json"); + let publisher = UplinkTelemetryPublisher::new(path.clone(), true, true); + let microphone = publisher.handle(UpstreamStreamKind::Microphone); + + microphone.record_connected(); + microphone.record_enqueue(4, 12.0, 0.0); + microphone.record_enabled(false); + publisher.flush_now(); + + let snapshot = load_uplink_telemetry(&path).expect("load snapshot"); + assert!(!snapshot.microphone.enabled); + assert!(!snapshot.microphone.connected); + assert_eq!(snapshot.microphone.queue_depth, 0); + assert_eq!(snapshot.microphone.latest_enqueue_age_ms, 0.0); + assert!(snapshot.microphone.last_error.is_empty()); + } } diff --git a/common/Cargo.toml b/common/Cargo.toml index dfa4462..73549ec 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.16.3" +version = "0.16.4" edition = "2024" build = "build.rs" diff --git a/docs/operational-env.md b/docs/operational-env.md index 9dc47b0..42cc4fb 100644 --- a/docs/operational-env.md +++ b/docs/operational-env.md @@ -162,6 +162,7 @@ from `LESAVKA_CLIENT_PKI_SSH_SOURCE` over SSH. Runtime clients require the insta | `LESAVKA_LIVE_KEYBOARD_REPORT_DELAY_MS` | input routing/clipboard override | | `LESAVKA_LIVE_MODIFIER_DELAY_MS` | input routing/clipboard override | | `LESAVKA_MAX_SPEED` | document near use before promoting to operator config | +| `LESAVKA_MEDIA_CONTROL` | launcher-to-relay live media control file; used for soft-pausing camera, mic, or speaker streams without hard-resetting USB | | `LESAVKA_MEDIA_GATE_PUSHGATEWAY_JOB` | CI metrics destination override | | `LESAVKA_MIC_DISABLE` | client media capture/playback override | | `LESAVKA_MIC_DISABLE_PIPEWIRE` | client media capture/playback override | @@ -304,6 +305,7 @@ from `LESAVKA_CLIENT_PKI_SSH_SOURCE` over SSH. Runtime clients require the insta | `LESAVKA_UAC_SANITY_VOLUME` | manual UAC sanity probe override | | `LESAVKA_UPLINK_TELEMETRY` | launcher/uplink telemetry path override | | `LESAVKA_UPSTREAM_CAMERA_STARTUP_GRACE_MS` | upstream A/V timing override | +| `LESAVKA_UPSTREAM_CAMERA_STARTUP_GRACE_US` | upstream A/V timing override used by test contracts and server startup grace handling | | `LESAVKA_UPSTREAM_REANCHOR_LATE_MS` | upstream A/V timing override | | `LESAVKA_UPSTREAM_SOURCE_LAG_CAP_MS` | upstream A/V timing override | | `LESAVKA_UVC_CONTROL_READ_ONLY` | UVC helper runtime override | diff --git a/scripts/ci/hygiene_gate_baseline.json b/scripts/ci/hygiene_gate_baseline.json index 38f2723..83d1840 100644 --- a/scripts/ci/hygiene_gate_baseline.json +++ b/scripts/ci/hygiene_gate_baseline.json @@ -13,7 +13,7 @@ "client/src/app/downlink_media.rs": { "clippy_warnings": 0, "doc_debt": 3, - "loc": 209 + "loc": 231 }, "client/src/app/input_streams.rs": { "clippy_warnings": 0, @@ -23,12 +23,12 @@ "client/src/app/session_lifecycle.rs": { "clippy_warnings": 0, "doc_debt": 3, - "loc": 324 + "loc": 348 }, "client/src/app/uplink_media.rs": { "clippy_warnings": 0, "doc_debt": 2, - "loc": 329 + "loc": 362 }, "client/src/app_support.rs": { "clippy_warnings": 0, @@ -148,7 +148,7 @@ "client/src/input/microphone.rs": { "clippy_warnings": 0, "doc_debt": 12, - "loc": 413 + "loc": 479 }, "client/src/input/mod.rs": { "clippy_warnings": 0, @@ -293,7 +293,7 @@ "client/src/launcher/ui.rs": { "clippy_warnings": 0, "doc_debt": 1, - "loc": 193 + "loc": 194 }, "client/src/launcher/ui/activation_context.rs": { "clippy_warnings": 0, @@ -308,7 +308,7 @@ "client/src/launcher/ui/control_requests.rs": { "clippy_warnings": 0, "doc_debt": 1, - "loc": 214 + "loc": 246 }, "client/src/launcher/ui/device_refresh_binding.rs": { "clippy_warnings": 0, @@ -338,7 +338,7 @@ "client/src/launcher/ui/media_device_bindings.rs": { "clippy_warnings": 0, "doc_debt": 0, - "loc": 167 + "loc": 182 }, "client/src/launcher/ui/message_and_network_state.rs": { "clippy_warnings": 0, @@ -458,7 +458,7 @@ "client/src/launcher/ui_runtime/control_paths.rs": { "clippy_warnings": 0, "doc_debt": 9, - "loc": 257 + "loc": 275 }, "client/src/launcher/ui_runtime/display_popouts.rs": { "clippy_warnings": 0, @@ -473,7 +473,7 @@ "client/src/launcher/ui_runtime/process_logs.rs": { "clippy_warnings": 0, "doc_debt": 5, - "loc": 216 + "loc": 219 }, "client/src/launcher/ui_runtime/report_popouts.rs": { "clippy_warnings": 0, @@ -483,12 +483,12 @@ "client/src/launcher/ui_runtime/status_details.rs": { "clippy_warnings": 0, "doc_debt": 12, - "loc": 345 + "loc": 427 }, "client/src/launcher/ui_runtime/status_refresh.rs": { "clippy_warnings": 0, "doc_debt": 3, - "loc": 316 + "loc": 351 }, "client/src/layout.rs": { "clippy_warnings": 0, @@ -498,13 +498,18 @@ "client/src/lib.rs": { "clippy_warnings": 0, "doc_debt": 0, - "loc": 25 + "loc": 26 }, "client/src/live_capture_clock.rs": { "clippy_warnings": 0, "doc_debt": 7, "loc": 429 }, + "client/src/live_media_control.rs": { + "loc": 200, + "clippy_warnings": 0, + "doc_debt": 3 + }, "client/src/main.rs": { "clippy_warnings": 0, "doc_debt": 2, @@ -648,7 +653,7 @@ "client/src/uplink_telemetry.rs": { "clippy_warnings": 0, "doc_debt": 4, - "loc": 301 + "loc": 336 }, "client/src/video_support.rs": { "clippy_warnings": 0, @@ -808,7 +813,7 @@ "server/src/main.rs": { "clippy_warnings": 0, "doc_debt": 1, - "loc": 99 + "loc": 100 }, "server/src/main/entrypoint.rs": { "clippy_warnings": 0, @@ -832,8 +837,8 @@ }, "server/src/main/relay_service.rs": { "clippy_warnings": 0, - "doc_debt": 5, - "loc": 485 + "doc_debt": 3, + "loc": 498 }, "server/src/main/relay_service_coverage.rs": { "clippy_warnings": 0, @@ -843,7 +848,12 @@ "server/src/main/relay_service_tests.rs": { "clippy_warnings": 0, "doc_debt": 1, - "loc": 30 + "loc": 60 + }, + "server/src/main/relay_stream_lifecycle.rs": { + "clippy_warnings": 0, + "doc_debt": 0, + "loc": 130 }, "server/src/main/rpc_helpers.rs": { "clippy_warnings": 0, diff --git a/scripts/ci/quality_gate_baseline.json b/scripts/ci/quality_gate_baseline.json index 72a02e7..de3dcce 100644 --- a/scripts/ci/quality_gate_baseline.json +++ b/scripts/ci/quality_gate_baseline.json @@ -6,7 +6,7 @@ }, "client/src/app/session_lifecycle.rs": { "line_percent": 97.56, - "loc": 324 + "loc": 348 }, "client/src/app_support.rs": { "line_percent": 100.0, @@ -93,8 +93,8 @@ "loc": 196 }, "client/src/input/microphone.rs": { - "line_percent": 100.0, - "loc": 413 + "line_percent": 99.63, + "loc": 479 }, "client/src/input/mouse.rs": { "line_percent": 98.85, @@ -146,7 +146,7 @@ }, "client/src/launcher/ui.rs": { "line_percent": 100.0, - "loc": 193 + "loc": 194 }, "client/src/launcher/ui/session_preview_coverage.rs": { "line_percent": 100.0, @@ -160,6 +160,10 @@ "line_percent": 99.08, "loc": 429 }, + "client/src/live_media_control.rs": { + "line_percent": 100.0, + "loc": 203 + }, "client/src/main.rs": { "line_percent": 100.0, "loc": 101 @@ -249,8 +253,8 @@ "loc": 284 }, "client/src/uplink_telemetry.rs": { - "line_percent": 95.76, - "loc": 301 + "line_percent": 96.89, + "loc": 336 }, "client/src/video_support.rs": { "line_percent": 97.3, @@ -358,7 +362,7 @@ }, "server/src/main.rs": { "line_percent": 100.0, - "loc": 99 + "loc": 100 }, "server/src/main/entrypoint.rs": { "line_percent": 100.0, @@ -384,6 +388,10 @@ "line_percent": 96.53, "loc": 301 }, + "server/src/main/relay_stream_lifecycle.rs": { + "line_percent": 100.0, + "loc": 130 + }, "server/src/main/rpc_helpers.rs": { "line_percent": 100.0, "loc": 118 diff --git a/server/Cargo.toml b/server/Cargo.toml index 79d2434..1e29a85 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.16.3" +version = "0.16.4" edition = "2024" autobins = false diff --git a/server/src/main.rs b/server/src/main.rs index a0445f6..864a38c 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -94,6 +94,7 @@ include!("main/rpc_helpers.rs"); include!("main/usb_recovery_helpers.rs"); include!("main/eye_hub.rs"); +include!("main/relay_stream_lifecycle.rs"); include!("main/relay_service.rs"); include!("main/relay_service_coverage.rs"); include!("main/entrypoint.rs"); diff --git a/server/src/main/relay_service.rs b/server/src/main/relay_service.rs index 601ae60..46615e9 100644 --- a/server/src/main/relay_service.rs +++ b/server/src/main/relay_service.rs @@ -1,31 +1,6 @@ -/*──────────────── gRPC service ─────────────*/ -#[cfg(not(coverage))] -fn upstream_stale_drop_budget() -> Duration { - let drop_ms = std::env::var("LESAVKA_UPSTREAM_STALE_DROP_MS") - .ok() - .and_then(|value| value.trim().parse::().ok()) - .unwrap_or(80); - Duration::from_millis(drop_ms) -} - -#[cfg(not(coverage))] -fn retain_freshest_video_packet( - pending: &mut std::collections::VecDeque, -) -> usize { - if pending.len() <= 1 { - return 0; - } - let newest = pending.pop_back().expect("non-empty pending video queue"); - let dropped = pending.len(); - pending.clear(); - pending.push_back(newest); - dropped -} - #[cfg(not(coverage))] #[tonic::async_trait] impl Relay for Handler { - /* existing streams ─ unchanged, except: no more auto-reset */ type StreamKeyboardStream = ReceiverStream>; type StreamMouseStream = ReceiverStream>; type CaptureVideoStream = VideoStream; @@ -145,11 +120,11 @@ impl Relay for Handler { session_id = lease.session_id, "🎤 stream_microphone stood down before the sink became available" ); + self.upstream_media_rt.close_microphone(lease.generation); return Err(Status::aborted( "microphone stream superseded before sink became available", )); }; - // 1 ─ build once, early let uac_dev = std::env::var("LESAVKA_UAC_DEV").unwrap_or_else(|_| "hw:UAC2Gadget,0".into()); info!(%uac_dev, "🎤 stream_microphone using UAC sink"); let mut sink = runtime_support::open_voice_with_retry(&uac_dev) @@ -159,13 +134,17 @@ impl Relay for Handler { Status::internal(format!("{e:#}")) })?; - // 2 ─ dummy outbound stream (same trick as before) let (tx, rx) = tokio::sync::mpsc::channel(1); let upstream_media_rt = self.upstream_media_rt.clone(); - // 3 ─ drive the sink in a background task tokio::spawn(async move { let _microphone_sink_permit = microphone_sink_permit; + let mut cleanup = UpstreamStreamCleanup::microphone( + upstream_media_rt.clone(), + lease.generation, + rpc_id, + lease.session_id, + ); let mut inbound = req.into_inner(); let mut pending = std::collections::VecDeque::new(); let mut inbound_closed = false; @@ -175,6 +154,7 @@ impl Relay for Handler { loop { if !upstream_media_rt.is_microphone_active(lease.generation) { info!(rpc_id, session_id = lease.session_id, "🎤 stream_microphone session superseded"); + cleanup.mark_superseded(); break; } if !inbound_closed { @@ -183,14 +163,24 @@ impl Relay for Handler { _ = tokio::time::sleep(Duration::from_millis(50)) => None, }; if let Some(next_packet) = next_packet { - match next_packet.transpose()? { - Some(pkt) => pending.push_back(pkt), - None => inbound_closed = true, + match next_packet.transpose() { + Ok(Some(pkt)) => pending.push_back(pkt), + Ok(None) => inbound_closed = true, + Err(err) => { + cleanup.mark_aborted(); + warn!( + rpc_id, + session_id = lease.session_id, + "🎤 stream_microphone inbound error before clean EOF: {err}" + ); + break; + } } } } let Some(mut pkt) = pending.pop_front() else { if inbound_closed { + cleanup.mark_closed(); break; } continue; @@ -237,9 +227,7 @@ impl Relay for Handler { sink.push(&pkt); } sink.finish(); // flush on EOS - upstream_media_rt.close_microphone(lease.generation); let _ = tx.send(Ok(Empty {})).await; - info!(rpc_id, session_id = lease.session_id, "🎤 stream_microphone closed"); Ok::<(), Status>(()) }); @@ -265,26 +253,43 @@ impl Relay for Handler { ); let upstream_lease = self.upstream_media_rt.activate_camera(); - let (session_id, relay, _relay_reused) = self.camera_rt.activate(&cfg).await?; + let (camera_session_id, relay, _relay_reused) = self.camera_rt.activate(&cfg).await?; let camera_rt = self.camera_rt.clone(); let upstream_media_rt = self.upstream_media_rt.clone(); - info!(rpc_id, session_id, "🎥 stream_camera opened"); + info!( + rpc_id, + session_id = upstream_lease.session_id, + camera_session_id, + "🎥 stream_camera opened" + ); let frame_step_us = (1_000_000u64 / u64::from(cfg.fps.max(1))).max(1); - // dummy outbound (same pattern as other streams) let (tx, rx) = tokio::sync::mpsc::channel(1); tokio::spawn(async move { + let mut cleanup = UpstreamStreamCleanup::camera( + upstream_media_rt.clone(), + upstream_lease.generation, + rpc_id, + upstream_lease.session_id, + camera_session_id, + ); let mut s = req.into_inner(); let mut pending = std::collections::VecDeque::new(); let mut inbound_closed = false; let stale_drop_budget = upstream_stale_drop_budget(); let mut startup_video_settled = false; loop { - if !camera_rt.is_active(session_id) + if !camera_rt.is_active(camera_session_id) || !upstream_media_rt.is_camera_active(upstream_lease.generation) { - info!(rpc_id, session_id, "🎥 stream_camera session superseded"); + info!( + rpc_id, + session_id = upstream_lease.session_id, + camera_session_id, + "🎥 stream_camera session superseded" + ); + cleanup.mark_superseded(); break; } if !inbound_closed { @@ -293,25 +298,37 @@ impl Relay for Handler { _ = tokio::time::sleep(Duration::from_millis(50)) => None, }; if let Some(next_packet) = next_packet { - match next_packet.transpose()? { - Some(pkt) => { + match next_packet.transpose() { + Ok(Some(pkt)) => { pending.push_back(pkt); let coalesced = retain_freshest_video_packet(&mut pending); if coalesced > 0 { tracing::debug!( rpc_id, - session_id, + session_id = upstream_lease.session_id, + camera_session_id, dropped = coalesced, "🎥 coalesced stale upstream video backlog down to the freshest frame" ); } } - None => inbound_closed = true, + Ok(None) => inbound_closed = true, + Err(err) => { + cleanup.mark_aborted(); + warn!( + rpc_id, + session_id = upstream_lease.session_id, + camera_session_id, + "🎥 stream_camera inbound error before clean EOF: {err}" + ); + break; + } } } } let Some(mut pkt) = pending.pop_front() else { if inbound_closed { + cleanup.mark_closed(); break; } continue; @@ -321,7 +338,8 @@ impl Relay for Handler { if inbound_closed { tracing::debug!( rpc_id, - session_id, + session_id = upstream_lease.session_id, + camera_session_id, pts = pkt.pts, "🎥 dropping trailing upstream video frame because no paired audio arrived before stream close" ); @@ -341,7 +359,8 @@ impl Relay for Handler { { tracing::warn!( rpc_id, - session_id, + session_id = upstream_lease.session_id, + camera_session_id, pts = plan.local_pts_us, "🎥 upstream video frame dropped because the audio master never caught up inside the pairing window" ); @@ -352,7 +371,8 @@ impl Relay for Handler { if startup_video_settled { tracing::warn!( rpc_id, - session_id, + session_id = upstream_lease.session_id, + camera_session_id, late_by_ms = plan.late_by.as_millis(), pts = plan.local_pts_us, dropped_pending = coalesced, @@ -361,7 +381,8 @@ impl Relay for Handler { } else { tracing::debug!( rpc_id, - session_id, + session_id = upstream_lease.session_id, + camera_session_id, late_by_ms = plan.late_by.as_millis(), pts = plan.local_pts_us, dropped_pending = coalesced, @@ -379,7 +400,8 @@ impl Relay for Handler { if startup_video_settled { tracing::warn!( rpc_id, - session_id, + session_id = upstream_lease.session_id, + camera_session_id, late_by_ms = actual_late_by.as_millis(), pts = plan.local_pts_us, dropped_pending = coalesced, @@ -388,7 +410,8 @@ impl Relay for Handler { } else { tracing::debug!( rpc_id, - session_id, + session_id = upstream_lease.session_id, + camera_session_id, late_by_ms = actual_late_by.as_millis(), pts = plan.local_pts_us, dropped_pending = coalesced, @@ -401,9 +424,7 @@ impl Relay for Handler { startup_video_settled = true; relay.feed(pkt); // ← all logging inside video.rs } - upstream_media_rt.close_camera(upstream_lease.generation); tx.send(Ok(Empty {})).await.ok(); - info!(rpc_id, session_id, "🎥 stream_camera closed"); Ok::<(), Status>(()) }); @@ -473,13 +494,5 @@ impl Relay for Handler { } } -fn remote_audio_status(message: String) -> Status { - if message.contains("remote USB gadget is not attached") { - Status::unavailable(message) - } else { - Status::internal(message) - } -} - #[cfg(test)] include!("relay_service_tests.rs"); diff --git a/server/src/main/relay_service_tests.rs b/server/src/main/relay_service_tests.rs index 5a445e3..95986ed 100644 --- a/server/src/main/relay_service_tests.rs +++ b/server/src/main/relay_service_tests.rs @@ -1,8 +1,10 @@ -#[cfg(test)] +#[cfg(all(test, not(coverage)))] #[allow(clippy::items_after_test_module)] mod tests { - use super::retain_freshest_video_packet; + use super::{UpstreamStreamCleanup, retain_freshest_video_packet}; use lesavka_common::lesavka::VideoPacket; + use lesavka_server::upstream_media_runtime::UpstreamMediaRuntime; + use std::sync::Arc; #[test] fn retain_freshest_video_packet_keeps_only_the_latest_frame() { @@ -27,4 +29,32 @@ mod tests { assert_eq!(pending.len(), 1); assert_eq!(pending.front().map(|pkt| pkt.pts), Some(300)); } + + #[test] + fn upstream_cleanup_guard_closes_its_microphone_generation() { + let runtime = Arc::new(UpstreamMediaRuntime::new()); + let lease = runtime.activate_microphone(); + + { + let _guard = + UpstreamStreamCleanup::microphone(runtime.clone(), lease.generation, 1, lease.session_id); + } + + assert!(!runtime.is_microphone_active(lease.generation)); + } + + #[test] + fn upstream_cleanup_guard_cannot_close_a_newer_camera_generation() { + let runtime = Arc::new(UpstreamMediaRuntime::new()); + let first = runtime.activate_camera(); + let second = runtime.activate_camera(); + + { + let _guard = + UpstreamStreamCleanup::camera(runtime.clone(), first.generation, 2, first.session_id, 7); + } + + assert!(!runtime.is_camera_active(first.generation)); + assert!(runtime.is_camera_active(second.generation)); + } } diff --git a/server/src/main/relay_stream_lifecycle.rs b/server/src/main/relay_stream_lifecycle.rs new file mode 100644 index 0000000..5ce20d8 --- /dev/null +++ b/server/src/main/relay_stream_lifecycle.rs @@ -0,0 +1,130 @@ +#[cfg(not(coverage))] +fn upstream_stale_drop_budget() -> Duration { + let drop_ms = std::env::var("LESAVKA_UPSTREAM_STALE_DROP_MS") + .ok() + .and_then(|value| value.trim().parse::().ok()) + .unwrap_or(80); + Duration::from_millis(drop_ms) +} + +#[cfg(not(coverage))] +/// Keeps only the newest webcam packet when the host path is already behind. +fn retain_freshest_video_packet( + pending: &mut std::collections::VecDeque, +) -> usize { + if pending.len() <= 1 { + return 0; + } + let newest = pending.pop_back().expect("non-empty pending video queue"); + let dropped = pending.len(); + pending.clear(); + pending.push_back(newest); + dropped +} + +#[cfg(not(coverage))] +#[derive(Clone, Copy, Debug)] +enum UpstreamStreamCleanupKind { + Microphone, + Camera, +} + +#[cfg(not(coverage))] +struct UpstreamStreamCleanup { + runtime: Arc, + kind: UpstreamStreamCleanupKind, + generation: u64, + rpc_id: u64, + session_id: u64, + camera_session_id: Option, + outcome: &'static str, +} + +#[cfg(not(coverage))] +impl UpstreamStreamCleanup { + fn microphone( + runtime: Arc, + generation: u64, + rpc_id: u64, + session_id: u64, + ) -> Self { + Self { + runtime, + kind: UpstreamStreamCleanupKind::Microphone, + generation, + rpc_id, + session_id, + camera_session_id: None, + outcome: "aborted", + } + } + + fn camera( + runtime: Arc, + generation: u64, + rpc_id: u64, + session_id: u64, + camera_session_id: u64, + ) -> Self { + Self { + runtime, + kind: UpstreamStreamCleanupKind::Camera, + generation, + rpc_id, + session_id, + camera_session_id: Some(camera_session_id), + outcome: "aborted", + } + } + + fn mark_closed(&mut self) { + self.outcome = "closed"; + } + + fn mark_superseded(&mut self) { + self.outcome = "superseded"; + } + + fn mark_aborted(&mut self) { + self.outcome = "aborted"; + } +} + +#[cfg(not(coverage))] +impl Drop for UpstreamStreamCleanup { + /// Closes only the stream generation owned by this RPC lifecycle guard. + fn drop(&mut self) { + match self.kind { + UpstreamStreamCleanupKind::Microphone => { + self.runtime.close_microphone(self.generation); + info!( + rpc_id = self.rpc_id, + session_id = self.session_id, + generation = self.generation, + outcome = self.outcome, + "🎤 stream_microphone lifecycle ended" + ); + } + UpstreamStreamCleanupKind::Camera => { + self.runtime.close_camera(self.generation); + info!( + rpc_id = self.rpc_id, + session_id = self.session_id, + camera_session_id = self.camera_session_id.unwrap_or_default(), + generation = self.generation, + outcome = self.outcome, + "🎥 stream_camera lifecycle ended" + ); + } + } + } +} + +/// Maps expected remote-audio availability failures onto retryable gRPC status codes. +fn remote_audio_status(message: String) -> Status { + if message.contains("remote USB gadget is not attached") { + Status::unavailable(message) + } else { + Status::internal(message) + } +} diff --git a/testing/tests/client_app_include_contract.rs b/testing/tests/client_app_include_contract.rs index a4ce9b6..f5ac4a8 100644 --- a/testing/tests/client_app_include_contract.rs +++ b/testing/tests/client_app_include_contract.rs @@ -37,6 +37,10 @@ mod uplink_fresh_queue; #[allow(warnings)] mod uplink_telemetry; +#[path = "../../client/src/live_media_control.rs"] +#[allow(warnings)] +mod live_media_control; + mod app_support { use super::handshake::PeerCaps; use std::time::Duration; diff --git a/testing/tests/client_launcher_runtime_contract.rs b/testing/tests/client_launcher_runtime_contract.rs index cdb6f24..b0b16ff 100644 --- a/testing/tests/client_launcher_runtime_contract.rs +++ b/testing/tests/client_launcher_runtime_contract.rs @@ -93,11 +93,21 @@ fn relay_address_entry_is_locked_while_relay_is_live() { assert!(UI_RUNTIME_SRC.contains(".set_sensitive(!relay_live && state.channels.microphone);")); assert!(UI_RUNTIME_SRC.contains("widgets.keyboard_combo.set_sensitive(!relay_live);")); assert!(UI_RUNTIME_SRC.contains("widgets.mouse_combo.set_sensitive(!relay_live);")); - assert!(UI_RUNTIME_SRC.contains("widgets.camera_channel_toggle.set_sensitive(!relay_live);")); assert!( - UI_RUNTIME_SRC.contains("widgets.microphone_channel_toggle.set_sensitive(!relay_live);") + UI_RUNTIME_SRC + .contains(".camera_channel_toggle\n .set_sensitive(!relay_live || state.devices.camera.is_some());") ); - assert!(UI_RUNTIME_SRC.contains("widgets.audio_channel_toggle.set_sensitive(!relay_live);")); + assert!( + UI_RUNTIME_SRC.contains( + ".microphone_channel_toggle\n .set_sensitive(!relay_live || state.devices.microphone.is_some());" + ) + ); + assert!( + UI_RUNTIME_SRC.contains( + ".audio_channel_toggle\n .set_sensitive(!relay_live || state.devices.speaker.is_some());" + ) + ); + assert!(UI_RUNTIME_SRC.contains("Soft-pause or resume this feed in the running relay")); assert!(UI_RUNTIME_SRC.contains("\"Connect\"")); assert!(UI_RUNTIME_SRC.contains("\"Disconnect\"")); }