diff --git a/Cargo.lock b/Cargo.lock index 3a76b9c..6f86e67 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1642,7 +1642,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.12.3" +version = "0.12.4" dependencies = [ "anyhow", "async-stream", @@ -1676,7 +1676,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.12.3" +version = "0.12.4" dependencies = [ "anyhow", "base64", @@ -1688,7 +1688,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.12.3" +version = "0.12.4" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index a684316..0567936 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.12.3" +version = "0.12.4" edition = "2024" [dependencies] diff --git a/client/src/app.rs b/client/src/app.rs index 1dc4398..f6ad717 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -19,8 +19,7 @@ use winit::{ }; use lesavka_common::lesavka::{ - AudioPacket, Empty, KeyboardReport, MonitorRequest, MouseReport, VideoPacket, - relay_client::RelayClient, + Empty, KeyboardReport, MonitorRequest, MouseReport, VideoPacket, relay_client::RelayClient, }; #[cfg(not(coverage))] diff --git a/client/src/app/session_lifecycle.rs b/client/src/app/session_lifecycle.rs index f65b066..26235a6 100644 --- a/client/src/app/session_lifecycle.rs +++ b/client/src/app/session_lifecycle.rs @@ -58,6 +58,10 @@ impl LesavkaClientApp { let caps = handshake::negotiate(&self.server_addr).await; tracing::info!("🀝 server capabilities = {:?}", caps); let camera_cfg = app_support::camera_config_from_caps(&caps); + let uplink_telemetry = crate::uplink_telemetry::UplinkTelemetryPublisher::from_env( + caps.camera && std::env::var("LESAVKA_CAM_DISABLE").is_err(), + caps.microphone && std::env::var("LESAVKA_MIC_DISABLE").is_err(), + ); /*────────── persistent gRPC channels 
──────────*/ let hid_ep = Channel::from_shared(self.server_addr.clone())? @@ -223,6 +227,8 @@ impl LesavkaClientApp { } let ep = vid_ep.clone(); let cam_source = std::env::var("LESAVKA_CAM_SOURCE").ok(); + let cam_telemetry = + uplink_telemetry.handle(crate::uplink_telemetry::UpstreamStreamKind::Camera); tokio::spawn(async move { let result = tokio::task::spawn_blocking(move || { CameraCapture::new(cam_source.as_deref(), camera_cfg) @@ -231,14 +237,20 @@ impl LesavkaClientApp { match result { Ok(Ok(cam)) => { let cam = Arc::new(cam); - tokio::spawn(Self::cam_loop(ep, cam)); + tokio::spawn(Self::cam_loop(ep, cam, cam_telemetry.clone())); } Ok(Err(err)) => { + cam_telemetry.record_disconnect(format!( + "webcam uplink setup failed: {err:#}" + )); warn!( "πŸ“Έ webcam uplink is unavailable for this relay session; continuing without StreamCamera: {err:#}" ); } Err(err) => { + cam_telemetry.record_disconnect(format!( + "webcam uplink setup task failed: {err}" + )); warn!( "πŸ“Έ webcam uplink setup task failed before StreamCamera could start: {err}" ); @@ -248,19 +260,27 @@ impl LesavkaClientApp { } if caps.microphone && std::env::var("LESAVKA_MIC_DISABLE").is_err() { let ep = vid_ep.clone(); + let mic_telemetry = + uplink_telemetry.handle(crate::uplink_telemetry::UpstreamStreamKind::Microphone); tokio::spawn(async move { let result = tokio::task::spawn_blocking(MicrophoneCapture::new).await; match result { Ok(Ok(mic)) => { let mic = Arc::new(mic); - tokio::spawn(Self::voice_loop(ep, mic)); + tokio::spawn(Self::voice_loop(ep, mic, mic_telemetry.clone())); } Ok(Err(err)) => { + mic_telemetry.record_disconnect(format!( + "microphone uplink setup failed: {err:#}" + )); warn!( "🎀 microphone uplink is unavailable for this relay session; continuing without StreamMicrophone: {err:#}" ); } Err(err) => { + mic_telemetry.record_disconnect(format!( + "microphone uplink setup task failed: {err}" + )); warn!( "🎀 microphone uplink setup task failed before StreamMicrophone could start: 
{err}" ); diff --git a/client/src/app/uplink_media.rs b/client/src/app/uplink_media.rs index 3b4b514..125fd8f 100644 --- a/client/src/app/uplink_media.rs +++ b/client/src/app/uplink_media.rs @@ -1,33 +1,71 @@ impl LesavkaClientApp { /*──────────────── mic stream ─────────────────*/ #[cfg(not(coverage))] - async fn voice_loop(ep: Channel, mic: Arc) { + async fn voice_loop( + ep: Channel, + mic: Arc, + telemetry: crate::uplink_telemetry::UplinkTelemetryHandle, + ) { let mut delay = Duration::from_secs(1); static FAIL_CNT: AtomicUsize = AtomicUsize::new(0); - loop { - let mut cli = RelayClient::new(ep.clone()); - // 1. create a Tokio MPSC channel - let (tx, rx) = tokio::sync::mpsc::channel::(256); + loop { + telemetry.record_reconnect_attempt(); + let mut cli = RelayClient::new(ep.clone()); + let queue = crate::uplink_fresh_queue::FreshPacketQueue::new(AUDIO_UPLINK_QUEUE); let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>(); - // 2. spawn a real thread that does the blocking `pull()` let mic_clone = mic.clone(); + let telemetry_thread = telemetry.clone(); + let queue_thread = queue.clone(); std::thread::spawn(move || { + let mut age_tracker = PacketAgeTracker::default(); while stop_rx.try_recv().is_err() { if let Some(pkt) = mic_clone.pull() { trace!("πŸŽ€πŸ“€ cli {} bytes β†’ gRPC", pkt.data.len()); - let _ = tx.blocking_send(pkt); + let enqueue_age = age_tracker.packet_age(pkt.pts); + let stats = queue_thread.push(pkt, enqueue_age); + if stats.dropped_queue_full > 0 { + telemetry_thread.record_queue_full_drop(stats.dropped_queue_full); + } + telemetry_thread.record_enqueue( + queue_depth_u32(stats.queue_depth), + duration_ms(enqueue_age), + 0.0, + ); } } }); - // 3. turn `rx` into an async stream for gRPC - let outbound = tokio_stream::wrappers::ReceiverStream::new(rx); + let queue_stream = queue.clone(); + let telemetry_stream = telemetry.clone(); + let outbound = async_stream::stream! 
{ + loop { + let next = queue_stream.pop_fresh().await; + if next.dropped_stale > 0 { + telemetry_stream.record_stale_drop(next.dropped_stale); + } + if let Some(packet) = next.packet { + telemetry_stream.record_streamed( + queue_depth_u32(next.queue_depth), + duration_ms(next.delivery_age), + ); + yield packet; + continue; + } + break; + } + }; + match cli.stream_microphone(Request::new(outbound)).await { - Ok(mut resp) => while resp.get_mut().message().await.transpose().is_some() {}, + Ok(mut resp) => { + delay = Duration::from_secs(1); + telemetry.record_connected(); + while resp.get_mut().message().await.transpose().is_some() {} + telemetry.record_disconnect("microphone uplink stream ended"); + } Err(e) => { - // first failure β†’ warn, subsequent ones β†’ debug + telemetry.record_disconnect(format!("microphone uplink connect failed: {e}")); if FAIL_CNT.fetch_add(1, Ordering::Relaxed) == 0 { warn!("❌🎀 connect failed: {e}"); warn!("⚠️🎀 further microphone‑stream failures will be logged at DEBUG"); @@ -37,6 +75,8 @@ impl LesavkaClientApp { delay = app_support::next_delay(delay); } } + + queue.close(); let _ = stop_tx.send(()); tokio::time::sleep(delay).await; } @@ -44,56 +84,142 @@ impl LesavkaClientApp { /*──────────────── cam stream ───────────────────*/ #[cfg(not(coverage))] - async fn cam_loop(ep: Channel, cam: Arc) { + async fn cam_loop( + ep: Channel, + cam: Arc, + telemetry: crate::uplink_telemetry::UplinkTelemetryHandle, + ) { let mut delay = Duration::from_secs(1); + loop { + telemetry.record_reconnect_attempt(); let mut cli = RelayClient::new(ep.clone()); - let (tx, rx) = tokio::sync::mpsc::channel::(256); + let queue = crate::uplink_fresh_queue::FreshPacketQueue::new(VIDEO_UPLINK_QUEUE); let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>(); + let cam_worker = std::thread::spawn({ let cam = cam.clone(); - move || loop { - if stop_rx.try_recv().is_ok() { - break; - } - let Some(pkt) = cam.pull() else { - std::thread::sleep(Duration::from_millis(5)); 
- continue; - }; - // TRACE every 120 frames only - static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); - let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - if n < 10 || n.is_multiple_of(120) { - tracing::trace!("πŸ“Έ cli frame#{n} {} B", pkt.data.len()); - } - tracing::trace!("πŸ“Έβ¬†οΈ sent webcam AU pts={} {} B", pkt.pts, pkt.data.len()); - if tx.blocking_send(pkt).is_err() { - break; + let telemetry = telemetry.clone(); + let queue = queue.clone(); + move || { + let mut age_tracker = PacketAgeTracker::default(); + loop { + if stop_rx.try_recv().is_ok() { + break; + } + let Some(pkt) = cam.pull() else { + std::thread::sleep(Duration::from_millis(5)); + continue; + }; + static CNT: std::sync::atomic::AtomicU64 = + std::sync::atomic::AtomicU64::new(0); + let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + if n < 10 || n.is_multiple_of(120) { + tracing::trace!("πŸ“Έ cli frame#{n} {} B", pkt.data.len()); + } + tracing::trace!("πŸ“Έβ¬†οΈ sent webcam AU pts={} {} B", pkt.pts, pkt.data.len()); + let enqueue_age = age_tracker.packet_age(pkt.pts); + let stats = queue.push(pkt, enqueue_age); + if stats.dropped_queue_full > 0 { + telemetry.record_queue_full_drop(stats.dropped_queue_full); + } + telemetry.record_enqueue( + queue_depth_u32(stats.queue_depth), + duration_ms(enqueue_age), + 0.0, + ); } } }); - let outbound = tokio_stream::wrappers::ReceiverStream::new(rx); + let queue_stream = queue.clone(); + let telemetry_stream = telemetry.clone(); + let outbound = async_stream::stream! 
{ + loop { + let next = queue_stream.pop_fresh().await; + if next.dropped_stale > 0 { + telemetry_stream.record_stale_drop(next.dropped_stale); + } + if let Some(packet) = next.packet { + telemetry_stream.record_streamed( + queue_depth_u32(next.queue_depth), + duration_ms(next.delivery_age), + ); + yield packet; + continue; + } + break; + } + }; + match cli.stream_camera(Request::new(outbound)).await { Ok(mut resp) => { - delay = Duration::from_secs(1); // got a stream β†’ reset + delay = Duration::from_secs(1); + telemetry.record_connected(); while resp.get_mut().message().await.transpose().is_some() {} + telemetry.record_disconnect("camera uplink stream ended"); } Err(e) if e.code() == tonic::Code::Unimplemented => { tracing::warn!("πŸ“Έ server does not support StreamCamera – giving up"); + telemetry.record_disconnect("camera uplink unavailable on server"); + queue.close(); let _ = stop_tx.send(()); let _ = cam_worker.join(); - return; // stop the task completely (#3) + return; } Err(e) => { + telemetry.record_disconnect(format!("camera uplink connect failed: {e}")); tracing::warn!("βŒπŸ“Έ connect failed: {e:?}"); - delay = app_support::next_delay(delay); // back-off (#2) + delay = app_support::next_delay(delay); } } + + queue.close(); let _ = stop_tx.send(()); let _ = cam_worker.join(); tokio::time::sleep(delay).await; } } - +} + +#[cfg(not(coverage))] +const VIDEO_UPLINK_QUEUE: crate::uplink_fresh_queue::FreshQueueConfig = + crate::uplink_fresh_queue::FreshQueueConfig { + capacity: 8, + max_age: Duration::from_millis(350), + }; + +#[cfg(not(coverage))] +const AUDIO_UPLINK_QUEUE: crate::uplink_fresh_queue::FreshQueueConfig = + crate::uplink_fresh_queue::FreshQueueConfig { + capacity: 16, + max_age: Duration::from_millis(400), + }; + +#[cfg(not(coverage))] +fn queue_depth_u32(depth: usize) -> u32 { + depth.try_into().unwrap_or(u32::MAX) +} + +#[cfg(not(coverage))] +fn duration_ms(duration: Duration) -> f32 { + duration.as_secs_f32() * 1_000.0 +} + 
+#[cfg(not(coverage))] +#[derive(Default)] +struct PacketAgeTracker { + origin: Option, +} + +#[cfg(not(coverage))] +impl PacketAgeTracker { + fn packet_age(&mut self, pts_us: u64) -> Duration { + let pts = Duration::from_micros(pts_us); + let now = Instant::now(); + let origin = self + .origin + .get_or_insert_with(|| now.checked_sub(pts).unwrap_or(now)); + now.saturating_duration_since(*origin + pts) + } } diff --git a/client/src/launcher/diagnostics/diagnostics_models.rs b/client/src/launcher/diagnostics/diagnostics_models.rs index 7376c38..edf7ee1 100644 --- a/client/src/launcher/diagnostics/diagnostics_models.rs +++ b/client/src/launcher/diagnostics/diagnostics_models.rs @@ -2,6 +2,8 @@ use serde::{Deserialize, Serialize}; use std::collections::VecDeque; use std::fmt::Write as _; +use crate::uplink_telemetry::UpstreamStreamTelemetry; + use super::{ devices::CameraMode, state::{CaptureSizeChoice, FeedSourcePreset, InputRouting, LauncherState, ViewMode}, @@ -48,6 +50,8 @@ pub struct PerformanceSample { pub right_stream_caps_label: String, pub right_decoded_caps_label: String, pub right_rendered_caps_label: String, + pub upstream_camera: UpstreamStreamTelemetry, + pub upstream_microphone: UpstreamStreamTelemetry, pub dropped_frames: u64, pub queue_depth: u32, } @@ -154,6 +158,8 @@ pub struct SnapshotReport { pub media_channels: MediaChannelState, pub audio_gain_label: String, pub mic_gain_label: String, + pub upstream_camera: UpstreamStreamTelemetry, + pub upstream_microphone: UpstreamStreamTelemetry, pub selected_keyboard: Option, pub selected_mouse: Option, pub status: String, diff --git a/client/src/launcher/diagnostics/recommendations.rs b/client/src/launcher/diagnostics/recommendations.rs index ba81e93..a93bb15 100644 --- a/client/src/launcher/diagnostics/recommendations.rs +++ b/client/src/launcher/diagnostics/recommendations.rs @@ -128,6 +128,53 @@ fn recommendations_for(state: &LauncherState, log: &DiagnosticsLog) -> Vec= 120.0 + || 
sample.upstream_camera.enqueue_age_peak_ms >= 250.0 + || sample.upstream_camera.enqueue_block_peak_ms >= 40.0) + { + items.push( + "The webcam uplink queue is aging packets before they even leave the client. That points at backlog inside the relay child, so freshness-first dropping should help more than extra bitrate." + .to_string(), + ); + } + if sample.upstream_microphone.enabled + && (sample.upstream_microphone.latest_enqueue_age_ms >= 80.0 + || sample.upstream_microphone.enqueue_age_peak_ms >= 180.0 + || sample.upstream_microphone.enqueue_block_peak_ms >= 25.0) + { + items.push( + "The microphone uplink queue is aging live audio before transport. That is a direct path to lip-sync drift, so stale audio needs to be dropped instead of drained." + .to_string(), + ); + } + if sample.upstream_camera.reconnect_count >= 3 || sample.upstream_microphone.reconnect_count >= 3 + { + items.push( + "The upstream media loops have already reconnected several times during this session. Treat repeated connect churn as part of the lag budget, not just a cosmetic log problem." + .to_string(), + ); + } + if sample.upstream_camera.enabled + && (sample.upstream_camera.delivery_age_peak_ms >= 250.0 + || sample.upstream_camera.dropped_queue_full_packets > 0 + || sample.upstream_camera.dropped_stale_packets > 0) + { + items.push( + "The webcam uplink is now choosing freshness over completeness. If delivery age or stale-drop counts keep climbing, the source path is outrunning the live-call budget and needs more than bitrate tweaks." + .to_string(), + ); + } + if sample.upstream_microphone.enabled + && (sample.upstream_microphone.delivery_age_peak_ms >= 180.0 + || sample.upstream_microphone.dropped_queue_full_packets > 0 + || sample.upstream_microphone.dropped_stale_packets > 0) + { + items.push( + "The microphone uplink is dropping or aging live chunks to stay current. A few drops are healthier than minutes of lag, but persistent churn means the upstream path is still unstable." 
+ .to_string(), + ); + } if sample.client_process_cpu_pct >= 85.0 { items.push(if hardware_decode_active { "Client process CPU is high even though hardware decode is active. If motion still looks rough, favor lighter breakout layouts or a cheaper source mode before adding more bitrate." diff --git a/client/src/launcher/diagnostics/snapshot_report.rs b/client/src/launcher/diagnostics/snapshot_report.rs index a87d9ee..0391adf 100644 --- a/client/src/launcher/diagnostics/snapshot_report.rs +++ b/client/src/launcher/diagnostics/snapshot_report.rs @@ -214,6 +214,12 @@ impl SnapshotReport { }, audio_gain_label: state.audio_gain_label(), mic_gain_label: state.mic_gain_label(), + upstream_camera: latest + .map(|sample| sample.upstream_camera.clone()) + .unwrap_or_default(), + upstream_microphone: latest + .map(|sample| sample.upstream_microphone.clone()) + .unwrap_or_default(), selected_keyboard: state.devices.keyboard.clone(), selected_mouse: state.devices.mouse.clone(), status: state.status_line(), @@ -334,6 +340,16 @@ impl SnapshotReport { self.mic_gain_label, self.media_channels.microphone ); + let _ = writeln!( + text, + " uplink camera: {}", + uplink_summary(&self.upstream_camera) + ); + let _ = writeln!( + text, + " uplink microphone: {}", + uplink_summary(&self.upstream_microphone) + ); let _ = writeln!( text, " keyboard: {}", @@ -388,6 +404,12 @@ impl SnapshotReport { sample.right_server_send_gap_peak_ms, sample.right_server_queue_peak ); + let _ = writeln!( + text, + " uplink: cam={} mic={}", + uplink_summary(&sample.upstream_camera), + uplink_summary(&sample.upstream_microphone) + ); } } let _ = writeln!(text); @@ -408,3 +430,36 @@ impl SnapshotReport { text } } + +fn uplink_summary(stream: &crate::uplink_telemetry::UpstreamStreamTelemetry) -> String { + if !stream.enabled { + return "disabled".to_string(); + } + let connection = if stream.connected { + "live" + } else if stream.reconnect_count > 0 { + "reconnecting" + } else { + "idle" + }; + let error = if 
stream.last_error.is_empty() { + "ok".to_string() + } else { + stream.last_error.clone() + }; + format!( + "{connection} queue={}/{} enq-age={:.0}/{:.0}ms delivery={:.0}/{:.0}ms block-peak={:.0}ms reconnects={} streamed={} drops(total/full/stale)={}/{}/{} error={error}", + stream.queue_depth, + stream.queue_peak, + stream.latest_enqueue_age_ms, + stream.enqueue_age_peak_ms, + stream.latest_delivery_age_ms, + stream.delivery_age_peak_ms, + stream.enqueue_block_peak_ms, + stream.reconnect_count, + stream.packets_streamed, + stream.dropped_packets, + stream.dropped_queue_full_packets, + stream.dropped_stale_packets + ) +} diff --git a/client/src/launcher/tests/diagnostics.rs b/client/src/launcher/tests/diagnostics.rs index 2604bd7..ddeada8 100644 --- a/client/src/launcher/tests/diagnostics.rs +++ b/client/src/launcher/tests/diagnostics.rs @@ -2,6 +2,7 @@ use super::*; use crate::launcher::state::{ CaptureSizePreset, DeviceSelection, DisplaySurface, FeedSourcePreset, LauncherState, }; +use crate::uplink_telemetry::UpstreamStreamTelemetry; fn sample(n: u64) -> PerformanceSample { PerformanceSample { @@ -50,6 +51,42 @@ fn sample(n: u64) -> PerformanceSample { "video/x-raw, format=(string)NV12, width=(int)1920, height=(int)1080".to_string(), right_rendered_caps_label: "video/x-raw, format=(string)RGBA, width=(int)1920, height=(int)1080".to_string(), + upstream_camera: UpstreamStreamTelemetry { + enabled: true, + connected: true, + reconnect_count: 2, + queue_depth: 3, + queue_peak: 7, + latest_enqueue_age_ms: 14.0, + enqueue_age_peak_ms: 48.0, + enqueue_block_peak_ms: 5.0, + packets_enqueued: 120, + packets_streamed: 118, + dropped_packets: 0, + dropped_queue_full_packets: 0, + dropped_stale_packets: 0, + latest_delivery_age_ms: 22.0, + delivery_age_peak_ms: 61.0, + last_error: String::new(), + }, + upstream_microphone: UpstreamStreamTelemetry { + enabled: true, + connected: true, + reconnect_count: 1, + queue_depth: 2, + queue_peak: 5, + latest_enqueue_age_ms: 11.0, + 
enqueue_age_peak_ms: 22.0, + enqueue_block_peak_ms: 3.0, + packets_enqueued: 220, + packets_streamed: 216, + dropped_packets: 1, + dropped_queue_full_packets: 1, + dropped_stale_packets: 0, + latest_delivery_age_ms: 19.0, + delivery_age_peak_ms: 37.0, + last_error: String::new(), + }, dropped_frames: n, queue_depth: n as u32, } @@ -119,6 +156,8 @@ fn snapshot_report_contains_state_fields_and_samples() { assert!(report.left_stream_caps_label.contains("video/x-h264")); assert!(report.left_decoded_caps_label.contains("video/x-raw")); assert!(report.left_rendered_caps_label.contains("video/x-raw")); + assert_eq!(report.upstream_camera.queue_peak, 7); + assert_eq!(report.upstream_microphone.reconnect_count, 1); } #[test] @@ -185,6 +224,8 @@ fn snapshot_text_mentions_versions_profiles_and_recommendations() { assert!(text.contains("decoded caps:")); assert!(text.contains("rendered caps:")); assert!(text.contains("media staging")); + assert!(text.contains("uplink camera:")); + assert!(text.contains("uplink microphone:")); assert!(text.contains("current UI state")); assert!(text.contains("recommendations")); } @@ -230,10 +271,66 @@ fn snapshot_text_renders_recent_samples_and_notes() { assert!(text.contains("server: unknown (reachable)")); assert!(text.contains("rtt=23.0ms")); assert!(text.contains("server=lx264enc:42/48/4")); + assert!(text.contains("uplink: cam=live queue=3/7")); assert!(text.contains("notes")); assert!(text.contains("operator changed camera quality during the run")); } +#[test] +fn recommendations_call_out_upstream_queue_age_and_reconnect_churn() { + let mut log = DiagnosticsLog::new(1); + let mut stressed = sample(9); + stressed.upstream_camera.latest_enqueue_age_ms = 180.0; + stressed.upstream_camera.enqueue_age_peak_ms = 320.0; + stressed.upstream_camera.enqueue_block_peak_ms = 55.0; + stressed.upstream_camera.delivery_age_peak_ms = 420.0; + stressed.upstream_camera.dropped_stale_packets = 4; + stressed.upstream_camera.reconnect_count = 4; + 
stressed.upstream_microphone.latest_enqueue_age_ms = 120.0; + stressed.upstream_microphone.enqueue_age_peak_ms = 260.0; + stressed.upstream_microphone.enqueue_block_peak_ms = 31.0; + stressed.upstream_microphone.delivery_age_peak_ms = 240.0; + stressed.upstream_microphone.dropped_queue_full_packets = 2; + log.record(stressed); + + let report = SnapshotReport::from_state( + &LauncherState::new(), + &log, + quality_probe_command().to_string(), + ); + + assert!( + report + .recommendations + .iter() + .any(|item| { item.contains("webcam uplink queue is aging packets") }) + ); + assert!( + report + .recommendations + .iter() + .any(|item| { item.contains("microphone uplink queue is aging live audio") }) + ); + assert!( + report + .recommendations + .iter() + .any(|item| { item.contains("upstream media loops have already reconnected") }) + ); + assert!( + report + .recommendations + .iter() + .any(|item| { item.contains("webcam uplink is now choosing freshness") }) + ); + assert!( + report + .recommendations + .iter() + .any(|item| { item.contains("microphone uplink is dropping or aging live chunks") }) + ); +} + #[test] fn snapshot_report_uses_effective_mirrored_capture_profile() { let mut state = LauncherState::new(); diff --git a/client/src/launcher/tests/ui_runtime.rs b/client/src/launcher/tests/ui_runtime.rs index 56a3f96..3707835 100644 --- a/client/src/launcher/tests/ui_runtime.rs +++ b/client/src/launcher/tests/ui_runtime.rs @@ -289,19 +289,19 @@ fn server_chip_state_tracks_connection_not_just_reachability() { assert_eq!(server_version_label(&state), "-"); state.set_server_available(true); - state.set_server_version(Some("0.12.3".to_string())); + state.set_server_version(Some("0.12.4".to_string())); assert_eq!(server_light_state(&state, false), StatusLightState::Live); - assert_eq!(server_version_label(&state), "v0.12.3"); + assert_eq!(server_version_label(&state), "v0.12.4"); assert_eq!( server_light_state(&state, true), StatusLightState::Connected ); - 
state.set_server_version(Some("v0.12.4".to_string())); + state.set_server_version(Some("v0.12.5".to_string())); assert_eq!(server_light_state(&state, false), StatusLightState::Warning); assert_eq!(server_light_state(&state, true), StatusLightState::Caution); - assert_eq!(server_version_label(&state), "v0.12.4"); + assert_eq!(server_version_label(&state), "v0.12.5"); state.set_server_version(Some(" ".to_string())); assert_eq!(server_light_state(&state, false), StatusLightState::Idle); @@ -413,6 +413,7 @@ fn dock_all_displays_to_preview_closes_popouts_and_resets_surfaces() { &DeviceCatalog::default(), &state_snapshot, ); + present_and_settle(&view.window); let child_proc = Rc::new(RefCell::new(None::)); let left_binding = PreviewBinding::test_stub(); @@ -505,6 +506,73 @@ fn dock_all_displays_to_preview_handles_reentrant_close_callbacks() { assert_eq!(state.borrow().display_surface(0), DisplaySurface::Preview); } +#[gtk::test] +#[serial] +fn dock_display_to_preview_restores_closed_eye_placeholder_when_relay_is_idle() { + if gtk::gdk::Display::default().is_none() { + return; + } + + let app = gtk::Application::builder() + .application_id("dev.lesavka.test-dock-placeholder") + .build(); + let _ = app.register(None::<>k::gio::Cancellable>); + + let state = Rc::new(RefCell::new(LauncherState::new())); + state + .borrow_mut() + .set_display_surface(1, DisplaySurface::Window); + let state_snapshot = state.borrow().clone(); + let view = build_launcher_view( + &app, + "http://127.0.0.1:50051", + &DeviceCatalog::default(), + &state_snapshot, + ); + let child_proc = Rc::new(RefCell::new(None::)); + + *view.widgets.display_panes[1].preview_binding.borrow_mut() = Some(PreviewBinding::test_stub()); + view.widgets.display_panes[1] + .preview_placeholder + .set_visible(false); + + { + let mut popouts = view.popouts.borrow_mut(); + popouts[1] = Some(PopoutWindowHandle { + window: gtk::ApplicationWindow::builder() + .application(&app) + .title("Right") + .build(), + root: 
gtk::Box::new(gtk::Orientation::Vertical, 0), + frame: gtk::AspectFrame::new(0.5, 0.5, 16.0 / 9.0, false), + picture: gtk::Picture::new(), + status_label: gtk::Label::new(None), + binding: PreviewBinding::test_stub(), + }); + } + + dock_display_to_preview(&state, &child_proc, &view.popouts, &view.widgets, 1); + + present_and_settle(&view.window); + + assert_eq!(state.borrow().display_surface(1), DisplaySurface::Preview); + assert!(view.popouts.borrow()[1].is_none()); + assert!( + view.widgets.display_panes[1].picture.paintable().is_none(), + "idle docked pane should not retain a stale preview texture" + ); + assert!( + view.widgets.display_panes[1] + .preview_placeholder + .is_visible(), + "closed-eye placeholder should return when docking back after disconnect" + ); + assert_eq!( + view.widgets.display_panes[1].stream_status.text(), + "Connect relay to preview." + ); +} + fn realistic_device_catalog() -> DeviceCatalog { DeviceCatalog { cameras: vec!["usb-046d_Logitech_BRIO_5F6EB379-video-index0".to_string()], diff --git a/client/src/launcher/ui.rs b/client/src/launcher/ui.rs index a24760c..9948a24 100644 --- a/client/src/launcher/ui.rs +++ b/client/src/launcher/ui.rs @@ -28,8 +28,8 @@ use { refresh_launcher_ui, refresh_test_buttons, routing_name, selected_combo_value, selected_server_addr, shutdown_launcher_runtime, spawn_client_process, stop_child_process, toggle_key_label, update_test_action_result, uplink_camera_preview_path, - uplink_mic_level_path, write_audio_gain_request, write_input_routing_request, - write_input_toggle_key_request, write_mic_gain_request, + uplink_mic_level_path, uplink_telemetry_path, write_audio_gain_request, + write_input_routing_request, write_input_toggle_key_request, write_mic_gain_request, }, crate::handshake::{HandshakeProbe, probe}, crate::output::display::enumerate_monitors, diff --git a/client/src/launcher/ui/diagnostic_sampling.rs b/client/src/launcher/ui/diagnostic_sampling.rs index a4d69e4..e21351e 100644 --- 
a/client/src/launcher/ui/diagnostic_sampling.rs +++ b/client/src/launcher/ui/diagnostic_sampling.rs @@ -69,6 +69,7 @@ fn record_diagnostics_sample( widgets: &super::ui_components::LauncherWidgets, state: &LauncherState, preview: Option<&super::preview::LauncherPreview>, + uplink: Option<&crate::uplink_telemetry::UplinkTelemetrySnapshot>, network: NetworkSnapshot, client_process_cpu_pct: f32, ) { @@ -148,6 +149,10 @@ fn record_diagnostics_sample( right_stream_caps_label: right_metrics.stream_caps_label.clone(), right_decoded_caps_label: right_metrics.decoded_caps_label.clone(), right_rendered_caps_label: right_metrics.rendered_caps_label.clone(), + upstream_camera: uplink.map(|snapshot| snapshot.camera.clone()).unwrap_or_default(), + upstream_microphone: uplink + .map(|snapshot| snapshot.microphone.clone()) + .unwrap_or_default(), dropped_frames: left_metrics .dropped_frames .saturating_add(right_metrics.dropped_frames), diff --git a/client/src/launcher/ui/runtime_poll.rs b/client/src/launcher/ui/runtime_poll.rs index 66aad6d..c33ca06 100644 --- a/client/src/launcher/ui/runtime_poll.rs +++ b/client/src/launcher/ui/runtime_poll.rs @@ -25,6 +25,7 @@ let log_tx = log_tx.clone(); let camera_preview_path = uplink_camera_preview_path(); let mic_level_path = uplink_mic_level_path(); + let uplink_telemetry_path = uplink_telemetry_path(); glib::timeout_add_local(Duration::from_millis(180), move || { let child_running = reap_exited_child(&child_proc); if let Some(preview) = preview.as_ref() { @@ -328,6 +329,8 @@ if now >= next_diagnostics_sample.get() { let network = diagnostics_network.borrow_mut().snapshot(); + let uplink = + crate::uplink_telemetry::load_uplink_telemetry(&uplink_telemetry_path); let client_process_cpu_pct = diagnostics_process .borrow_mut() .sample_percent() @@ -336,6 +339,7 @@ &widgets, &state.borrow(), preview.as_ref().map(|preview| preview.as_ref()), + uplink.as_ref(), network, client_process_cpu_pct, ); diff --git 
a/client/src/launcher/ui_runtime/control_paths.rs b/client/src/launcher/ui_runtime/control_paths.rs index 7c14f0a..763f258 100644 --- a/client/src/launcher/ui_runtime/control_paths.rs +++ b/client/src/launcher/ui_runtime/control_paths.rs @@ -87,6 +87,12 @@ pub fn uplink_mic_level_path() -> PathBuf { .unwrap_or_else(|_| PathBuf::from(DEFAULT_UPLINK_MIC_LEVEL_PATH)) } +pub fn uplink_telemetry_path() -> PathBuf { + std::env::var(UPLINK_TELEMETRY_ENV) + .map(PathBuf::from) + .unwrap_or_else(|_| PathBuf::from(DEFAULT_UPLINK_TELEMETRY_PATH)) +} + pub fn write_input_routing_request(path: &Path, routing: InputRouting) -> Result<()> { std::fs::write( path, diff --git a/client/src/launcher/ui_runtime/display_popouts.rs b/client/src/launcher/ui_runtime/display_popouts.rs index c5c0dbf..22807f6 100644 --- a/client/src/launcher/ui_runtime/display_popouts.rs +++ b/client/src/launcher/ui_runtime/display_popouts.rs @@ -187,6 +187,12 @@ pub fn dock_display_to_preview( state.set_display_surface(monitor_id, DisplaySurface::Preview); } let child_running = child_proc.borrow().is_some(); + if !child_running { + let pane = &widgets.display_panes[monitor_id]; + pane.picture.set_paintable(Option::<&gdk::Paintable>::None); + pane.preview_placeholder.set_visible(true); + pane.stream_status.set_text("Connect relay to preview."); + } let state_snapshot = state.borrow().clone(); refresh_launcher_ui(widgets, &state_snapshot, child_running); } @@ -229,6 +235,13 @@ pub fn dock_all_displays_to_preview( } let child_running = child_proc.borrow().is_some(); + if !child_running { + for pane in &widgets.display_panes { + pane.picture.set_paintable(Option::<&gdk::Paintable>::None); + pane.preview_placeholder.set_visible(true); + pane.stream_status.set_text("Connect relay to preview."); + } + } let state_snapshot = state.borrow().clone(); refresh_launcher_ui(widgets, &state_snapshot, child_running); } @@ -243,9 +256,10 @@ pub fn refresh_display_pane(pane: &DisplayPaneWidgets, surface: DisplaySurface) 
DisplaySurface::Preview => { pane.stack.set_visible_child_name("preview"); pane.action_button.set_label("Break Out"); + pane.preview_placeholder + .set_visible(pane.picture.paintable().is_none()); if pane.preview_binding.borrow().is_none() { pane.stream_status.set_text("Preview unavailable"); - pane.preview_placeholder.set_visible(true); } } DisplaySurface::Window => { diff --git a/client/src/launcher/ui_runtime/process_logs.rs b/client/src/launcher/ui_runtime/process_logs.rs index 3c7155f..075ef6c 100644 --- a/client/src/launcher/ui_runtime/process_logs.rs +++ b/client/src/launcher/ui_runtime/process_logs.rs @@ -45,6 +45,9 @@ pub fn spawn_client_process( let mic_level_path = uplink_mic_level_path(); let _ = std::fs::remove_file(&mic_level_path); command.env(UPLINK_MIC_LEVEL_ENV, mic_level_path); + let uplink_telemetry_path = uplink_telemetry_path(); + let _ = std::fs::remove_file(&uplink_telemetry_path); + command.env(UPLINK_TELEMETRY_ENV, uplink_telemetry_path); for (key, value) in runtime_env_vars(state) { command.env(key, value); } diff --git a/client/src/launcher/ui_runtime/status_refresh.rs b/client/src/launcher/ui_runtime/status_refresh.rs index 159c242..5393a33 100644 --- a/client/src/launcher/ui_runtime/status_refresh.rs +++ b/client/src/launcher/ui_runtime/status_refresh.rs @@ -31,6 +31,7 @@ pub const AUDIO_GAIN_CONTROL_ENV: &str = "LESAVKA_AUDIO_GAIN_CONTROL"; pub const MIC_GAIN_CONTROL_ENV: &str = "LESAVKA_MIC_GAIN_CONTROL"; pub const UPLINK_CAMERA_PREVIEW_ENV: &str = "LESAVKA_UPLINK_CAMERA_PREVIEW"; pub const UPLINK_MIC_LEVEL_ENV: &str = "LESAVKA_UPLINK_MIC_LEVEL"; +pub use crate::uplink_telemetry::{DEFAULT_UPLINK_TELEMETRY_PATH, UPLINK_TELEMETRY_ENV}; pub const DEFAULT_INPUT_CONTROL_PATH: &str = "/tmp/lesavka-launcher-input.control"; pub const DEFAULT_INPUT_STATE_PATH: &str = "/tmp/lesavka-launcher-input.state"; pub const DEFAULT_TOGGLE_KEY_CONTROL_PATH: &str = "/tmp/lesavka-launcher-toggle-key.control"; diff --git a/client/src/lib.rs 
b/client/src/lib.rs
index 29aceb4..7640703 100644
--- a/client/src/lib.rs
+++ b/client/src/lib.rs
@@ -14,6 +14,9 @@ pub mod launcher;
 pub mod layout;
 pub mod output;
 pub mod paste;
+pub(crate) mod uplink_fresh_queue;
+pub(crate) mod uplink_latency_harness;
+pub(crate) mod uplink_telemetry;
 pub(crate) mod video_support;
 
 pub use app::LesavkaClientApp;
diff --git a/client/src/uplink_fresh_queue.rs b/client/src/uplink_fresh_queue.rs
new file mode 100644
index 0000000..30c1bb8
--- /dev/null
+++ b/client/src/uplink_fresh_queue.rs
@@ -0,0 +1,288 @@
+#![cfg_attr(not(test), allow(dead_code))]
+//! Freshness-first bounded queue for live-call upstream media.
+//!
+//! The queue keeps the newest useful packets and drops stale or superseded ones
+//! instead of preserving backlog indefinitely.
+
+use std::{
+    collections::VecDeque,
+    sync::{Arc, Mutex},
+    time::{Duration, Instant},
+};
+use tokio::sync::Notify;
+
+/// Queue admission/configuration for one live media stream.
+#[derive(Clone, Copy, Debug, Eq, PartialEq)]
+pub struct FreshQueueConfig {
+    /// Maximum number of packets retained while the transport catches up.
+    pub capacity: usize,
+    /// Maximum packet age tolerated before the packet is considered stale.
+    pub max_age: Duration,
+}
+
+/// Statistics returned after pushing one packet into the bounded queue.
+#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
+pub struct QueuePushStats {
+    /// Queue depth after the packet was admitted.
+    pub queue_depth: usize,
+    /// Number of older packets dropped because the queue was already full.
+    pub dropped_queue_full: u64,
+}
+
+/// Result of taking the next fresh packet from the queue.
+#[derive(Clone, Debug)]
+pub struct QueuePop<T> {
+    /// Fresh packet ready to send, if any.
+    pub packet: Option<T>,
+    /// Queue depth after removing stale packets and the returned packet.
+    pub queue_depth: usize,
+    /// Number of stale packets discarded before a fresh packet was found.
+    pub dropped_stale: u64,
+    /// Fresh packet age at the moment it left the queue.
+    pub delivery_age: Duration,
+    /// Whether the queue was closed and drained.
+    pub closed: bool,
+}
+
+#[derive(Debug)]
+struct QueuedPacket<T> {
+    packet: T,
+    queued_at: Instant,
+    age_at_enqueue: Duration,
+}
+
+#[derive(Debug)]
+struct QueueState<T> {
+    queue: VecDeque<QueuedPacket<T>>,
+    closed: bool,
+}
+
+/// Shared bounded queue for the blocking capture thread and async gRPC stream.
+#[derive(Debug)]
+pub struct FreshPacketQueue<T> {
+    config: FreshQueueConfig,
+    inner: Arc<Mutex<QueueState<T>>>,
+    notify: Arc<Notify>,
+}
+
+impl<T> Clone for FreshPacketQueue<T> {
+    fn clone(&self) -> Self {
+        Self {
+            config: self.config,
+            inner: Arc::clone(&self.inner),
+            notify: Arc::clone(&self.notify),
+        }
+    }
+}
+
+impl<T> FreshPacketQueue<T> {
+    /// Creates a new bounded queue that favors fresh packets over backlog.
+    #[must_use]
+    pub fn new(config: FreshQueueConfig) -> Self {
+        assert!(config.capacity > 0, "fresh queue capacity must be non-zero");
+        assert!(
+            !config.max_age.is_zero(),
+            "fresh queue max age must be non-zero"
+        );
+        Self {
+            config,
+            inner: Arc::new(Mutex::new(QueueState {
+                queue: VecDeque::with_capacity(config.capacity),
+                closed: false,
+            })),
+            notify: Arc::new(Notify::new()),
+        }
+    }
+
+    /// Pushes a new packet into the queue, dropping the oldest retained packet if full.
+ #[must_use] + pub fn push(&self, packet: T, age_at_enqueue: Duration) -> QueuePushStats { + let mut state = self.inner.lock().expect("fresh queue mutex poisoned"); + if state.closed { + return QueuePushStats::default(); + } + + let mut dropped_queue_full = 0_u64; + if state.queue.len() == self.config.capacity { + let _ = state.queue.pop_front(); + dropped_queue_full = 1; + } + state.queue.push_back(QueuedPacket { + packet, + queued_at: Instant::now(), + age_at_enqueue, + }); + let queue_depth = state.queue.len(); + drop(state); + self.notify.notify_one(); + QueuePushStats { + queue_depth, + dropped_queue_full, + } + } + + /// Closes the queue and wakes any waiting consumer. + pub fn close(&self) { + if let Ok(mut state) = self.inner.lock() { + state.closed = true; + } + self.notify.notify_waiters(); + } + + /// Pops the next fresh packet, discarding any packets that have aged past the live budget. + pub async fn pop_fresh(&self) -> QueuePop { + let mut dropped_stale = 0_u64; + loop { + let wait_for_more = { + let mut state = self.inner.lock().expect("fresh queue mutex poisoned"); + while let Some(front) = state.queue.pop_front() { + let delivery_age = front.age_at_enqueue + front.queued_at.elapsed(); + if delivery_age > self.config.max_age { + dropped_stale = dropped_stale.saturating_add(1); + continue; + } + return QueuePop { + packet: Some(front.packet), + queue_depth: state.queue.len(), + dropped_stale, + delivery_age, + closed: false, + }; + } + if state.closed { + return QueuePop { + packet: None, + queue_depth: 0, + dropped_stale, + delivery_age: Duration::ZERO, + closed: true, + }; + } + self.notify.notified() + }; + wait_for_more.await; + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn push_drops_oldest_when_queue_is_full() { + let queue = FreshPacketQueue::new(FreshQueueConfig { + capacity: 2, + max_age: Duration::from_secs(1), + }); + + let first = queue.push(1_u8, Duration::ZERO); + let second = queue.push(2_u8, 
Duration::ZERO); + let third = queue.push(3_u8, Duration::ZERO); + + assert_eq!(first.dropped_queue_full, 0); + assert_eq!(second.queue_depth, 2); + assert_eq!(third.dropped_queue_full, 1); + + let popped = queue.pop_fresh().await; + assert_eq!(popped.packet, Some(2)); + let popped = queue.pop_fresh().await; + assert_eq!(popped.packet, Some(3)); + } + + #[tokio::test] + async fn pop_fresh_discards_stale_packets_before_returning_live_media() { + let queue = FreshPacketQueue::new(FreshQueueConfig { + capacity: 3, + max_age: Duration::from_millis(60), + }); + + let _ = queue.push(1_u8, Duration::from_millis(40)); + let _ = queue.push(2_u8, Duration::ZERO); + tokio::time::sleep(Duration::from_millis(30)).await; + + let popped = queue.pop_fresh().await; + assert_eq!(popped.packet, Some(2)); + assert_eq!(popped.dropped_stale, 1); + assert!(popped.delivery_age <= Duration::from_millis(40)); + } + + #[tokio::test] + async fn pop_fresh_returns_closed_when_queue_is_drained() { + let queue = FreshPacketQueue::::new(FreshQueueConfig { + capacity: 1, + max_age: Duration::from_secs(1), + }); + + queue.close(); + let popped = queue.pop_fresh().await; + assert!(popped.closed); + assert!(popped.packet.is_none()); + } + + #[tokio::test] + async fn clone_shares_the_same_underlying_queue() { + let queue = FreshPacketQueue::new(FreshQueueConfig { + capacity: 2, + max_age: Duration::from_secs(1), + }); + let cloned = queue.clone(); + + let _ = cloned.push(7_u8, Duration::ZERO); + let popped = queue.pop_fresh().await; + + assert_eq!(popped.packet, Some(7)); + assert_eq!(popped.queue_depth, 0); + } + + #[tokio::test] + async fn push_returns_default_after_close() { + let queue = FreshPacketQueue::new(FreshQueueConfig { + capacity: 2, + max_age: Duration::from_secs(1), + }); + + queue.close(); + let stats = queue.push(9_u8, Duration::ZERO); + let popped = queue.pop_fresh().await; + + assert_eq!(stats, QueuePushStats::default()); + assert!(popped.closed); + assert!(popped.packet.is_none()); 
+ } + + #[tokio::test] + async fn pop_fresh_waits_for_a_future_packet() { + let queue = FreshPacketQueue::new(FreshQueueConfig { + capacity: 2, + max_age: Duration::from_secs(1), + }); + let waiter = queue.clone(); + + let task = tokio::spawn(async move { waiter.pop_fresh().await }); + tokio::time::sleep(Duration::from_millis(20)).await; + let _ = queue.push(5_u8, Duration::from_millis(10)); + + let popped = task.await.expect("waiter task"); + assert_eq!(popped.packet, Some(5)); + assert!(!popped.closed); + assert!(popped.delivery_age >= Duration::from_millis(10)); + } + + #[tokio::test] + async fn pop_fresh_waiter_wakes_when_the_queue_closes() { + let queue = FreshPacketQueue::::new(FreshQueueConfig { + capacity: 2, + max_age: Duration::from_secs(1), + }); + let waiter = queue.clone(); + + let task = tokio::spawn(async move { waiter.pop_fresh().await }); + tokio::time::sleep(Duration::from_millis(20)).await; + queue.close(); + + let popped = task.await.expect("waiter task"); + assert!(popped.closed); + assert!(popped.packet.is_none()); + } +} diff --git a/client/src/uplink_latency_harness.rs b/client/src/uplink_latency_harness.rs new file mode 100644 index 0000000..dd2d801 --- /dev/null +++ b/client/src/uplink_latency_harness.rs @@ -0,0 +1,270 @@ +#![cfg_attr(not(test), allow(dead_code))] +//! Synthetic upstream queue harness for live-call latency experiments. +//! +//! This models the relay child's bounded async queue under temporary downstream stalls. +//! The goal is not to emulate GStreamer perfectly; it is to answer the operational +//! question we care about: does the current queue policy preserve stale media instead +//! of keeping the stream fresh? + +use std::{collections::VecDeque, time::Duration}; + +/// Queue policy to simulate under upstream backpressure. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum UplinkQueuePolicy { + /// Preserve every queued packet, blocking new admission until space returns. 
+ PreserveBacklog, + /// Drop the oldest queued packet when full so the newest packet stays live. + DropOldestWhenFull, +} + +/// Deterministic synthetic queue configuration. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct UplinkHarnessConfig { + /// Packet cadence at the capture side. + pub capture_interval: Duration, + /// Packet cadence at the consumer side. + pub consume_interval: Duration, + /// Number of packets admitted to the async queue before backpressure kicks in. + pub queue_capacity: usize, + /// Total packets the synthetic capture source will attempt to produce. + pub total_packets: usize, + /// Optional one-shot downstream stall start time. + pub stall_after: Option, + /// One-shot downstream stall duration. + pub stall_duration: Duration, +} + +/// Summary from one synthetic run. +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +pub struct UplinkHarnessResult { + /// Packets accepted into the async queue. + pub enqueued_packets: usize, + /// Packets consumed from the async queue. + pub delivered_packets: usize, + /// Packets dropped at queue admission time. + pub dropped_packets: usize, + /// Highest queue depth observed. + pub max_queue_depth: usize, + /// Oldest packet age at queue admission time. + pub max_enqueue_age: Duration, + /// Oldest packet age at playout time. + pub max_delivery_age: Duration, + /// Longest time a producer packet spent blocked before queue admission. + pub max_block_time: Duration, +} + +#[derive(Clone, Copy, Debug)] +struct PendingPacket { + captured_at: Duration, + blocked_at: Option, +} + +/// Runs the synthetic queue harness under the selected policy. 
+#[must_use] +pub fn run_uplink_harness( + config: UplinkHarnessConfig, + policy: UplinkQueuePolicy, +) -> UplinkHarnessResult { + assert!(config.queue_capacity > 0, "queue capacity must be non-zero"); + assert!( + !config.capture_interval.is_zero(), + "capture interval must be non-zero" + ); + assert!( + !config.consume_interval.is_zero(), + "consume interval must be non-zero" + ); + + let mut result = UplinkHarnessResult::default(); + let mut queue: VecDeque = VecDeque::with_capacity(config.queue_capacity); + let mut produced = 0usize; + let mut next_capture_at = Duration::ZERO; + let mut next_consume_at = config.consume_interval; + let mut blocked_packet = None::; + + loop { + let capture_ready = (produced < config.total_packets && blocked_packet.is_none()) + .then_some(next_capture_at); + let consume_ready = ((!queue.is_empty() || blocked_packet.is_some()) + && produced <= config.total_packets) + .then_some(next_allowed_consume_at(next_consume_at, config)); + let next_event = min_option_duration(capture_ready, consume_ready); + let Some(event_time) = next_event else { + break; + }; + let now = event_time; + + if consume_ready == Some(now) { + if let Some(packet) = queue.pop_front() { + result.delivered_packets += 1; + result.max_delivery_age = result.max_delivery_age.max(now - packet.captured_at); + } + next_consume_at = now + config.consume_interval; + if let Some(packet) = blocked_packet.take() { + let enqueue_age = now - packet.captured_at; + result.max_enqueue_age = result.max_enqueue_age.max(enqueue_age); + if let Some(blocked_at) = packet.blocked_at { + result.max_block_time = result.max_block_time.max(now - blocked_at); + } + queue.push_back(PendingPacket { + blocked_at: None, + ..packet + }); + result.enqueued_packets += 1; + result.max_queue_depth = result.max_queue_depth.max(queue.len()); + next_capture_at = now + config.capture_interval; + } + continue; + } + + if capture_ready == Some(now) { + let packet = PendingPacket { + captured_at: now, + 
blocked_at: None, + }; + produced += 1; + if queue.len() < config.queue_capacity { + queue.push_back(packet); + result.enqueued_packets += 1; + result.max_queue_depth = result.max_queue_depth.max(queue.len()); + next_capture_at = now + config.capture_interval; + } else { + match policy { + UplinkQueuePolicy::PreserveBacklog => { + blocked_packet = Some(PendingPacket { + blocked_at: Some(now), + ..packet + }); + } + UplinkQueuePolicy::DropOldestWhenFull => { + let _ = queue.pop_front(); + result.dropped_packets += 1; + queue.push_back(packet); + result.enqueued_packets += 1; + result.max_queue_depth = result.max_queue_depth.max(queue.len()); + next_capture_at = now + config.capture_interval; + } + } + } + } + } + + result +} + +fn next_allowed_consume_at(next_consume_at: Duration, config: UplinkHarnessConfig) -> Duration { + let Some(stall_after) = config.stall_after else { + return next_consume_at; + }; + let stall_end = stall_after + config.stall_duration; + if next_consume_at >= stall_after && next_consume_at < stall_end { + stall_end + } else { + next_consume_at + } +} + +fn min_option_duration(left: Option, right: Option) -> Option { + match (left, right) { + (Some(left), Some(right)) => Some(left.min(right)), + (Some(left), None) => Some(left), + (None, Some(right)) => Some(right), + (None, None) => None, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn camera_stall_config() -> UplinkHarnessConfig { + UplinkHarnessConfig { + capture_interval: Duration::from_millis(33), + consume_interval: Duration::from_millis(33), + queue_capacity: 8, + total_packets: 160, + stall_after: Some(Duration::from_millis(800)), + stall_duration: Duration::from_secs(2), + } + } + + fn microphone_stall_config() -> UplinkHarnessConfig { + UplinkHarnessConfig { + capture_interval: Duration::from_millis(20), + consume_interval: Duration::from_millis(20), + queue_capacity: 16, + total_packets: 320, + stall_after: Some(Duration::from_millis(600)), + stall_duration: 
Duration::from_secs(2), + } + } + + #[test] + fn preserve_backlog_camera_policy_accumulates_stale_video_after_a_stall() { + let result = run_uplink_harness(camera_stall_config(), UplinkQueuePolicy::PreserveBacklog); + + assert_eq!(result.dropped_packets, 0); + assert!(result.max_queue_depth >= 8); + assert!( + result.max_enqueue_age >= Duration::from_millis(1500), + "expected stale video to build, got {:?}", + result.max_enqueue_age + ); + assert!( + result.max_delivery_age >= Duration::from_millis(1700), + "expected stale playout to build, got {:?}", + result.max_delivery_age + ); + assert!( + result.max_block_time >= Duration::from_millis(1500), + "expected capture-side blocking, got {:?}", + result.max_block_time + ); + } + + #[test] + fn preserve_backlog_microphone_policy_accumulates_stale_audio_after_a_stall() { + let result = run_uplink_harness( + microphone_stall_config(), + UplinkQueuePolicy::PreserveBacklog, + ); + + assert_eq!(result.dropped_packets, 0); + assert!(result.max_queue_depth >= 16); + assert!( + result.max_enqueue_age >= Duration::from_millis(1500), + "expected stale audio to build, got {:?}", + result.max_enqueue_age + ); + assert!( + result.max_delivery_age >= Duration::from_millis(1800), + "expected stale audio playout, got {:?}", + result.max_delivery_age + ); + } + + #[test] + fn drop_oldest_policy_keeps_media_live_by_sacrificing_old_packets() { + let camera = + run_uplink_harness(camera_stall_config(), UplinkQueuePolicy::DropOldestWhenFull); + let microphone = run_uplink_harness( + microphone_stall_config(), + UplinkQueuePolicy::DropOldestWhenFull, + ); + + assert!(camera.dropped_packets > 0); + assert!(microphone.dropped_packets > 0); + assert_eq!(camera.max_enqueue_age, Duration::ZERO); + assert_eq!(microphone.max_enqueue_age, Duration::ZERO); + assert!( + camera.max_delivery_age <= Duration::from_millis(350), + "freshness-first video should stay bounded, got {:?}", + camera.max_delivery_age + ); + assert!( + 
microphone.max_delivery_age <= Duration::from_millis(400), + "freshness-first audio should stay bounded, got {:?}", + microphone.max_delivery_age + ); + } +} diff --git a/client/src/uplink_telemetry.rs b/client/src/uplink_telemetry.rs new file mode 100644 index 0000000..2f818db --- /dev/null +++ b/client/src/uplink_telemetry.rs @@ -0,0 +1,301 @@ +//! Shared upstream media telemetry for the relay child and launcher. + +use serde::{Deserialize, Serialize}; +use std::{ + fs, + path::{Path, PathBuf}, + sync::{Arc, Mutex}, + time::{Duration, Instant, SystemTime, UNIX_EPOCH}, +}; + +/// Environment variable used to point the relay child at the upstream telemetry file. +pub const UPLINK_TELEMETRY_ENV: &str = "LESAVKA_UPLINK_TELEMETRY"; + +/// Default JSON file used by the relay child to publish upstream queue telemetry. +pub const DEFAULT_UPLINK_TELEMETRY_PATH: &str = "/tmp/lesavka-uplink-telemetry.json"; + +const FLUSH_INTERVAL: Duration = Duration::from_millis(250); + +/// Camera/microphone telemetry sampled by the launcher diagnostics pane. +#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)] +pub struct UplinkTelemetrySnapshot { + /// Last time the relay child wrote this snapshot, in Unix milliseconds. + pub updated_at_unix_ms: u128, + /// Upstream webcam queue telemetry. + pub camera: UpstreamStreamTelemetry, + /// Upstream microphone queue telemetry. + pub microphone: UpstreamStreamTelemetry, +} + +/// Per-stream state for measuring reconnect churn and queue staleness. +#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)] +pub struct UpstreamStreamTelemetry { + /// Whether the stream is enabled for the current relay session. + pub enabled: bool, + /// Whether the stream is currently attached to an active gRPC uplink. + pub connected: bool, + /// Number of gRPC connection attempts during the current relay child lifetime. + pub reconnect_count: u64, + /// Current bounded queue depth inside the relay child. 
+    pub queue_depth: u32,
+    /// Highest observed bounded queue depth inside the relay child.
+    pub queue_peak: u32,
+    /// Age of the most recently enqueued packet, measured at queue admission time.
+    pub latest_enqueue_age_ms: f32,
+    /// Highest observed packet age at queue admission time.
+    pub enqueue_age_peak_ms: f32,
+    /// Highest observed time spent blocked while enqueuing into the async uplink queue.
+    pub enqueue_block_peak_ms: f32,
+    /// Number of packets admitted into the async uplink queue.
+    pub packets_enqueued: u64,
+    /// Number of packets emitted from the async uplink queue into the gRPC stream.
+    pub packets_streamed: u64,
+    /// Number of packets intentionally dropped before queue admission.
+    pub dropped_packets: u64,
+    /// Number of packets dropped because the bounded queue was already full.
+    pub dropped_queue_full_packets: u64,
+    /// Number of packets dropped because they exceeded the live-call age budget.
+    pub dropped_stale_packets: u64,
+    /// Age of the most recently streamed packet after queue residence was accounted for.
+    pub latest_delivery_age_ms: f32,
+    /// Highest observed streamed packet age after queue residence was accounted for.
+    pub delivery_age_peak_ms: f32,
+    /// Most recent queue/setup failure for this stream, if any.
+    pub last_error: String,
+}
+
+#[derive(Clone, Copy, Debug)]
+pub enum UpstreamStreamKind {
+    Camera,
+    Microphone,
+}
+
+#[derive(Debug)]
+struct TelemetryState {
+    path: PathBuf,
+    snapshot: UplinkTelemetrySnapshot,
+    last_flush: Instant,
+}
+
+/// Shared publisher used by the relay child to update queue telemetry.
+#[derive(Clone, Debug)]
+pub struct UplinkTelemetryPublisher {
+    inner: Arc<Mutex<TelemetryState>>,
+}
+
+/// Per-stream handle used by individual uplink loops.
+#[derive(Clone, Debug)]
+pub struct UplinkTelemetryHandle {
+    kind: UpstreamStreamKind,
+    inner: Arc<Mutex<TelemetryState>>,
+}
+
+impl UplinkTelemetryPublisher {
+    /// Creates a publisher seeded with the current session's enabled uplinks.
+ #[must_use] + pub fn from_env(camera_enabled: bool, microphone_enabled: bool) -> Self { + let path = std::env::var(UPLINK_TELEMETRY_ENV) + .map(PathBuf::from) + .unwrap_or_else(|_| PathBuf::from(DEFAULT_UPLINK_TELEMETRY_PATH)); + Self::new(path, camera_enabled, microphone_enabled) + } + + /// Creates a publisher that writes to an explicit JSON path. + #[must_use] + pub fn new(path: PathBuf, camera_enabled: bool, microphone_enabled: bool) -> Self { + let state = TelemetryState { + path, + snapshot: UplinkTelemetrySnapshot { + updated_at_unix_ms: unix_time_ms(), + camera: UpstreamStreamTelemetry { + enabled: camera_enabled, + ..UpstreamStreamTelemetry::default() + }, + microphone: UpstreamStreamTelemetry { + enabled: microphone_enabled, + ..UpstreamStreamTelemetry::default() + }, + }, + last_flush: Instant::now() - FLUSH_INTERVAL, + }; + let publisher = Self { + inner: Arc::new(Mutex::new(state)), + }; + publisher.flush_now(); + publisher + } + + /// Returns a per-stream handle for camera or microphone updates. + #[must_use] + pub fn handle(&self, kind: UpstreamStreamKind) -> UplinkTelemetryHandle { + UplinkTelemetryHandle { + kind, + inner: Arc::clone(&self.inner), + } + } + + /// Forces an immediate write of the current snapshot. + pub fn flush_now(&self) { + if let Ok(mut state) = self.inner.lock() { + write_snapshot(&mut state, true); + } + } +} + +impl UplinkTelemetryHandle { + /// Records a fresh gRPC connection attempt for this stream. + pub fn record_reconnect_attempt(&self) { + self.update(false, |stream| { + stream.connected = false; + stream.queue_depth = 0; + stream.reconnect_count = stream.reconnect_count.saturating_add(1); + stream.last_error.clear(); + }); + } + + /// Marks the stream as successfully connected to the relay server. + pub fn record_connected(&self) { + self.update(true, |stream| { + stream.connected = true; + stream.last_error.clear(); + }); + } + + /// Marks the stream as disconnected and stores the latest error detail. 
+ pub fn record_disconnect(&self, error: impl AsRef) { + let error = error.as_ref().trim().to_string(); + self.update(true, |stream| { + stream.connected = false; + stream.queue_depth = 0; + stream.latest_enqueue_age_ms = 0.0; + stream.latest_delivery_age_ms = 0.0; + if !error.is_empty() { + stream.last_error = error.clone(); + } + }); + } + + /// Records one packet entering the async uplink queue. + pub fn record_enqueue(&self, queue_depth: u32, enqueue_age_ms: f32, enqueue_block_ms: f32) { + self.update(false, |stream| { + stream.queue_depth = queue_depth; + stream.queue_peak = stream.queue_peak.max(queue_depth); + stream.latest_enqueue_age_ms = enqueue_age_ms.max(0.0); + stream.enqueue_age_peak_ms = stream.enqueue_age_peak_ms.max(enqueue_age_ms.max(0.0)); + stream.enqueue_block_peak_ms = + stream.enqueue_block_peak_ms.max(enqueue_block_ms.max(0.0)); + stream.packets_enqueued = stream.packets_enqueued.saturating_add(1); + stream.last_error.clear(); + }); + } + + /// Records packets dropped because the bounded queue was already full. + pub fn record_queue_full_drop(&self, count: u64) { + self.update(false, |stream| { + stream.dropped_packets = stream.dropped_packets.saturating_add(count); + stream.dropped_queue_full_packets = + stream.dropped_queue_full_packets.saturating_add(count); + }); + } + + /// Records packets dropped because they exceeded the live-call age budget. + pub fn record_stale_drop(&self, count: u64) { + self.update(false, |stream| { + stream.dropped_packets = stream.dropped_packets.saturating_add(count); + stream.dropped_stale_packets = stream.dropped_stale_packets.saturating_add(count); + }); + } + + /// Records one packet emitted from the bounded queue into the gRPC stream. 
+ pub fn record_streamed(&self, queue_depth: u32, delivery_age_ms: f32) { + self.update(false, |stream| { + stream.connected = true; + stream.queue_depth = queue_depth; + stream.latest_delivery_age_ms = delivery_age_ms.max(0.0); + stream.delivery_age_peak_ms = stream.delivery_age_peak_ms.max(delivery_age_ms.max(0.0)); + stream.packets_streamed = stream.packets_streamed.saturating_add(1); + stream.last_error.clear(); + }); + } + + fn update(&self, force_flush: bool, update_stream: impl FnOnce(&mut UpstreamStreamTelemetry)) { + if let Ok(mut state) = self.inner.lock() { + state.snapshot.updated_at_unix_ms = unix_time_ms(); + update_stream(stream_for_kind(&mut state.snapshot, self.kind)); + write_snapshot(&mut state, force_flush); + } + } +} + +/// Loads the latest upstream telemetry snapshot written by the relay child. +pub fn load_uplink_telemetry(path: &Path) -> Option { + let raw = fs::read_to_string(path).ok()?; + serde_json::from_str(&raw).ok() +} + +fn stream_for_kind( + snapshot: &mut UplinkTelemetrySnapshot, + kind: UpstreamStreamKind, +) -> &mut UpstreamStreamTelemetry { + match kind { + UpstreamStreamKind::Camera => &mut snapshot.camera, + UpstreamStreamKind::Microphone => &mut snapshot.microphone, + } +} + +fn write_snapshot(state: &mut TelemetryState, force_flush: bool) { + if !force_flush && state.last_flush.elapsed() < FLUSH_INTERVAL { + return; + } + if let Ok(rendered) = serde_json::to_string(&state.snapshot) { + let _ = fs::write(&state.path, rendered); + state.last_flush = Instant::now(); + } +} + +fn unix_time_ms() -> u128 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|duration| duration.as_millis()) + .unwrap_or_default() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn handle_updates_keep_queue_peaks_and_errors() { + let temp_dir = tempfile::tempdir().expect("tempdir"); + let path = temp_dir.path().join("uplink.json"); + let publisher = UplinkTelemetryPublisher::new(path.clone(), true, false); + let camera = 
publisher.handle(UpstreamStreamKind::Camera); + + camera.record_reconnect_attempt(); + camera.record_connected(); + camera.record_enqueue(3, 41.0, 6.0); + camera.record_enqueue(1, 12.0, 2.0); + camera.record_streamed(0, 55.0); + camera.record_queue_full_drop(2); + camera.record_stale_drop(3); + camera.record_disconnect("stream ended"); + publisher.flush_now(); + + let snapshot = load_uplink_telemetry(&path).expect("load snapshot"); + assert!(snapshot.camera.enabled); + assert!(!snapshot.microphone.enabled); + assert!(!snapshot.camera.connected); + assert_eq!(snapshot.camera.reconnect_count, 1); + assert_eq!(snapshot.camera.queue_peak, 3); + assert_eq!(snapshot.camera.latest_enqueue_age_ms, 0.0); + assert_eq!(snapshot.camera.enqueue_age_peak_ms, 41.0); + assert_eq!(snapshot.camera.enqueue_block_peak_ms, 6.0); + assert_eq!(snapshot.camera.packets_enqueued, 2); + assert_eq!(snapshot.camera.packets_streamed, 1); + assert_eq!(snapshot.camera.dropped_packets, 5); + assert_eq!(snapshot.camera.dropped_queue_full_packets, 2); + assert_eq!(snapshot.camera.dropped_stale_packets, 3); + assert_eq!(snapshot.camera.delivery_age_peak_ms, 55.0); + assert_eq!(snapshot.camera.last_error, "stream ended"); + } +} diff --git a/common/Cargo.toml b/common/Cargo.toml index fd9aba7..5fb01bf 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.12.3" +version = "0.12.4" edition = "2024" build = "build.rs" diff --git a/scripts/ci/quality_gate_baseline.json b/scripts/ci/quality_gate_baseline.json index b2d468d..4ddb5c6 100644 --- a/scripts/ci/quality_gate_baseline.json +++ b/scripts/ci/quality_gate_baseline.json @@ -6,7 +6,7 @@ }, "client/src/app/session_lifecycle.rs": { "line_percent": 97.56, - "loc": 304 + "loc": 324 }, "client/src/app_support.rs": { "line_percent": 100.0, @@ -102,15 +102,15 @@ }, "client/src/launcher/diagnostics/diagnostics_models.rs": { "line_percent": 100.0, - "loc": 164 + "loc": 170 }, 
"client/src/launcher/diagnostics/recommendations.rs": { - "line_percent": 97.56, - "loc": 230 + "line_percent": 97.62, + "loc": 277 }, "client/src/launcher/diagnostics/snapshot_report.rs": { - "line_percent": 99.35, - "loc": 410 + "line_percent": 98.22, + "loc": 465 }, "client/src/launcher/mod.rs": { "line_percent": 100.0, @@ -168,6 +168,18 @@ "line_percent": 100.0, "loc": 82 }, + "client/src/uplink_fresh_queue.rs": { + "line_percent": 100.0, + "loc": 288 + }, + "client/src/uplink_latency_harness.rs": { + "line_percent": 98.65, + "loc": 270 + }, + "client/src/uplink_telemetry.rs": { + "line_percent": 95.76, + "loc": 301 + }, "client/src/video_support.rs": { "line_percent": 97.3, "loc": 56 diff --git a/server/Cargo.toml b/server/Cargo.toml index 3d776e9..8604cb6 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.12.3" +version = "0.12.4" edition = "2024" autobins = false