From 1a24db04d60976cde910fbceacd33da9ff6f0637 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Fri, 17 Apr 2026 05:50:24 -0300 Subject: [PATCH] lesavka: deepen stream diagnostics --- client/Cargo.toml | 2 +- client/src/launcher/diagnostics.rs | 143 ++++++++++++++++++++++++++++- client/src/launcher/preview.rs | 75 +++++++++++++-- client/src/launcher/ui.rs | 12 +++ common/Cargo.toml | 2 +- common/src/cli.rs | 2 +- server/Cargo.toml | 2 +- 7 files changed, 226 insertions(+), 12 deletions(-) diff --git a/client/Cargo.toml b/client/Cargo.toml index ca9c021..797f756 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.8.3" +version = "0.9.0" edition = "2024" [dependencies] diff --git a/client/src/launcher/diagnostics.rs b/client/src/launcher/diagnostics.rs index 12dbe10..da3b956 100644 --- a/client/src/launcher/diagnostics.rs +++ b/client/src/launcher/diagnostics.rs @@ -14,9 +14,21 @@ pub struct PerformanceSample { pub left_receive_fps: f32, pub left_present_fps: f32, pub left_server_fps: f32, + pub left_stream_spread_ms: f32, + pub left_packet_gap_peak_ms: f32, + pub left_present_gap_peak_ms: f32, + pub left_queue_depth: u32, + pub left_queue_peak: u32, + pub left_decoder_label: String, pub right_receive_fps: f32, pub right_present_fps: f32, pub right_server_fps: f32, + pub right_stream_spread_ms: f32, + pub right_packet_gap_peak_ms: f32, + pub right_present_gap_peak_ms: f32, + pub right_queue_depth: u32, + pub right_queue_peak: u32, + pub right_decoder_label: String, pub dropped_frames: u64, pub queue_depth: u32, } @@ -75,10 +87,22 @@ pub struct SnapshotReport { pub left_capture_profile: String, pub left_capture_transport: String, pub left_breakout_profile: String, + pub left_decoder_label: String, + pub left_stream_spread_ms: f32, + pub left_packet_gap_peak_ms: f32, + pub left_present_gap_peak_ms: f32, + pub left_queue_depth: u32, + pub left_queue_peak: u32, pub right_surface: String, pub right_capture_profile: String, pub right_capture_transport: String, pub right_breakout_profile: String, + pub right_decoder_label: String, + pub right_stream_spread_ms: f32, + pub right_packet_gap_peak_ms: f32, + pub right_present_gap_peak_ms: f32, + pub right_queue_depth: u32, + pub right_queue_peak: u32, pub selected_camera: Option, pub selected_microphone: Option, pub selected_speaker: Option, @@ -97,6 +121,7 @@ impl SnapshotReport { let right_capture = state.capture_size_choice(1); let left_breakout = state.breakout_size_choice(0); let right_breakout = state.breakout_size_choice(1); + let latest = log.latest(); Self { client_version: crate::VERSION.to_string(), server_version: state.server_version.clone(), @@ -134,6 +159,26 @@ impl SnapshotReport { left_breakout.width, left_breakout.height ), + left_decoder_label: latest + .map(|sample| { + if sample.left_decoder_label.is_empty() { + "pending".to_string() + } else { + sample.left_decoder_label.clone() + } + }) + .unwrap_or_else(|| "pending".to_string()), + left_stream_spread_ms: latest + .map(|sample| sample.left_stream_spread_ms) + .unwrap_or(0.0), + left_packet_gap_peak_ms: latest + .map(|sample| sample.left_packet_gap_peak_ms) + .unwrap_or(0.0), + left_present_gap_peak_ms: latest + .map(|sample| sample.left_present_gap_peak_ms) + .unwrap_or(0.0), + left_queue_depth: latest.map(|sample| sample.left_queue_depth).unwrap_or(0), + left_queue_peak: latest.map(|sample| sample.left_queue_peak).unwrap_or(0), right_surface: state.display_surface(1).label().to_string(), right_capture_profile: format!( "{} | {}x{} | {} fps | {} kbit", @@ -150,6 +195,26 @@ impl SnapshotReport { right_breakout.width, right_breakout.height ), + right_decoder_label: latest + .map(|sample| { + if sample.right_decoder_label.is_empty() { + "pending".to_string() + } else { + sample.right_decoder_label.clone() + } + }) + .unwrap_or_else(|| "pending".to_string()), + right_stream_spread_ms: latest + .map(|sample| sample.right_stream_spread_ms) + .unwrap_or(0.0), + right_packet_gap_peak_ms: latest + .map(|sample| sample.right_packet_gap_peak_ms) + .unwrap_or(0.0), + right_present_gap_peak_ms: latest + .map(|sample| sample.right_present_gap_peak_ms) + .unwrap_or(0.0), + right_queue_depth: latest.map(|sample| sample.right_queue_depth).unwrap_or(0), + right_queue_peak: latest.map(|sample| sample.right_queue_peak).unwrap_or(0), selected_camera: state.devices.camera.clone(), selected_microphone: state.devices.microphone.clone(), selected_speaker: state.devices.speaker.clone(), @@ -191,11 +256,31 @@ impl SnapshotReport { let _ = writeln!(text, " capture: {}", self.left_capture_profile); let _ = writeln!(text, " transport: {}", self.left_capture_transport); let _ = writeln!(text, " breakout: {}", self.left_breakout_profile); + let _ = writeln!( + text, + " live: decoder={} spread={:.1}ms gaps={:.0}/{:.0}ms queue={}/{}", + self.left_decoder_label, + self.left_stream_spread_ms, + self.left_packet_gap_peak_ms, + self.left_present_gap_peak_ms, + self.left_queue_depth, + self.left_queue_peak + ); let _ = writeln!(text, "right eye"); let _ = writeln!(text, " surface: {}", self.right_surface); let _ = writeln!(text, " capture: {}", self.right_capture_profile); let _ = writeln!(text, " transport: {}", self.right_capture_transport); let _ = writeln!(text, " breakout: {}", self.right_breakout_profile); + let _ = writeln!( + text, + " live: decoder={} spread={:.1}ms gaps={:.0}/{:.0}ms queue={}/{}", + self.right_decoder_label, + self.right_stream_spread_ms, + self.right_packet_gap_peak_ms, + self.right_present_gap_peak_ms, + self.right_queue_depth, + self.right_queue_peak + ); let _ = writeln!(text); let _ = writeln!(text, "device staging"); let _ = writeln!( @@ -237,7 +322,7 @@ impl SnapshotReport { for sample in &self.recent_samples { let _ = writeln!( text, - " rtt={:.1}ms probe-spread={:.1}ms input-floor={:.1}ms probe-loss={:.1}% video-loss={:.1}% left={:.1}/{:.1}/{:.1}fps right={:.1}/{:.1}/{:.1}fps dropped={} queue={}", + " rtt={:.1}ms probe-spread={:.1}ms input-floor={:.1}ms probe-loss={:.1}% video-loss={:.1}% left={:.1}/{:.1}/{:.1}fps right={:.1}/{:.1}/{:.1}fps dropped={} queue={}/{} peaks=l{:.0}/{:.0}ms r{:.0}/{:.0}ms", sample.rtt_ms, sample.probe_spread_ms, sample.input_latency_ms, @@ -250,7 +335,12 @@ impl SnapshotReport { sample.right_present_fps, sample.right_server_fps, sample.dropped_frames, - sample.queue_depth + sample.queue_depth, + sample.left_queue_peak.max(sample.right_queue_peak), + sample.left_packet_gap_peak_ms, + sample.left_present_gap_peak_ms, + sample.right_packet_gap_peak_ms, + sample.right_present_gap_peak_ms ); } } @@ -312,12 +402,26 @@ fn recommendations_for(state: &LauncherState, log: &DiagnosticsLog) -> Vec 40.0 + || (sample.right_present_gap_peak_ms - sample.right_packet_gap_peak_ms) > 40.0 + { + items.push( + "Present-gap spikes are materially larger than packet-gap spikes. That usually means the client decode/render path is stalling after packets arrive." + .to_string(), + ); + } if sample.queue_depth > 8 { items.push( "The preview queue is backing up. When queue depth climbs, expect laggy mouse feel and delayed visual response even if raw fps still looks okay." .to_string(), ); } + if sample.left_queue_peak >= 4 || sample.right_queue_peak >= 4 { + items.push( + "Queue depth is spiking even if the latest sample looks calm. That points at bursty backpressure rather than steady-state overload." + .to_string(), + ); + } } let heavy_capture = state.capture_sizes.iter().any(|preset| { matches!( @@ -354,6 +458,27 @@ fn recommendations_for(state: &LauncherState, log: &DiagnosticsLog) -> Vec= 140.0 || sample.right_packet_gap_peak_ms >= 140.0) + { + items.push( + "Packet-gap spikes are high without packet loss. That means the stream is arriving in bursts, which usually points at source cadence, encoder stalls, or local decoder starvation more than raw WAN loss." + .to_string(), + ); + } + if let Some(sample) = log.latest() + && ((sample.left_decoder_label.contains("avdec") + && sample.left_present_fps + 1.0 < sample.left_receive_fps) + || (sample.right_decoder_label.contains("avdec") + && sample.right_present_fps + 1.0 < sample.right_receive_fps)) + { + items.push( + "At least one eye is falling back to `avdec_*` while presentation lags behind receive. A hardware decode path would likely help more than extra bitrate." + .to_string(), + ); + } if state.breakout_count() == 2 { items.push( "Both eye feeds are broken out right now. If the client starts struggling, compare in-launcher preview smoothness against full-window decode." @@ -381,9 +506,21 @@ mod tests { left_receive_fps: 30.0, left_present_fps: 29.0, left_server_fps: 30.0, + left_stream_spread_ms: 4.0, + left_packet_gap_peak_ms: 55.0, + left_present_gap_peak_ms: 60.0, + left_queue_depth: n as u32, + left_queue_peak: n as u32, + left_decoder_label: "decodebin".to_string(), right_receive_fps: 30.0, right_present_fps: 28.0, right_server_fps: 30.0, + right_stream_spread_ms: 5.0, + right_packet_gap_peak_ms: 65.0, + right_present_gap_peak_ms: 75.0, + right_queue_depth: n as u32, + right_queue_peak: n as u32, + right_decoder_label: "decodebin".to_string(), dropped_frames: n, queue_depth: n as u32, } @@ -443,6 +580,7 @@ mod tests { assert!(report.client_version.starts_with("0.")); assert!(report.left_capture_profile.contains("fps")); assert_eq!(report.left_capture_transport, "source pass-through"); + assert_eq!(report.left_decoder_label, "decodebin"); } #[test] @@ -470,6 +608,7 @@ mod tests { assert!(text.contains("client: v")); assert!(text.contains("left eye")); assert!(text.contains("transport:")); + assert!(text.contains("live: decoder=")); assert!(text.contains("recommendations")); } diff --git a/client/src/launcher/preview.rs b/client/src/launcher/preview.rs index 4689949..32aa1fa 100644 --- a/client/src/launcher/preview.rs +++ b/client/src/launcher/preview.rs @@ -3,7 +3,7 @@ use anyhow::{Context, Result}; #[cfg(not(coverage))] use gstreamer as gst; #[cfg(not(coverage))] -use gstreamer::prelude::{Cast, ElementExt, GstBinExt}; +use gstreamer::prelude::{Cast, ElementExt, GstBinExt, GstObjectExt}; #[cfg(not(coverage))] use gstreamer_app as gst_app; #[cfg(not(coverage))] @@ -62,15 +62,19 @@ pub enum PreviewSurface { } #[cfg(not(coverage))] -#[derive(Clone, Copy, Debug, Default, PartialEq)] +#[derive(Clone, Debug, Default, PartialEq)] pub struct PreviewMetricsSnapshot { pub receive_fps: f32, pub present_fps: f32, pub server_fps: f32, - pub jitter_ms: f32, + pub stream_spread_ms: f32, pub packet_loss_pct: f32, pub dropped_frames: u64, pub queue_depth: u32, + pub queue_depth_peak: u32, + pub packet_gap_peak_ms: f32, + pub present_gap_peak_ms: f32, + pub decoder_label: String, } #[cfg(not(coverage))] @@ -430,13 +434,17 @@ struct PreviewTelemetry { packet_times: VecDeque, frame_times: VecDeque, packet_intervals_ms: VecDeque<(Instant, f32)>, + frame_intervals_ms: VecDeque<(Instant, f32)>, packet_losses: VecDeque<(Instant, u64)>, dropped_deltas: VecDeque<(Instant, u64)>, + queue_depth_samples: VecDeque<(Instant, u32)>, last_packet_at: Option, + last_frame_at: Option, last_seq: Option, last_dropped_total: Option, latest_server_fps: u32, latest_queue_depth: u32, + decoder_label: String, } #[cfg(not(coverage))] @@ -479,6 +487,7 @@ impl PreviewTelemetry { self.last_dropped_total = Some(dropped_total); self.latest_server_fps = server_fps.max(1); self.latest_queue_depth = queue_depth; + self.queue_depth_samples.push_back((now, queue_depth)); self.trim(now); } @@ -488,9 +497,21 @@ impl PreviewTelemetry { fn record_presented_frame_at(&mut self, now: Instant) { self.trim(now); + if let Some(previous) = self.last_frame_at.replace(now) { + self.frame_intervals_ms.push_back(( + now, + now.saturating_duration_since(previous).as_secs_f32() * 1000.0, + )); + } self.frame_times.push_back(now); } + fn note_decoder(&mut self, decoder_label: &str) { + if !decoder_label.is_empty() { + self.decoder_label = decoder_label.to_string(); + } + } + fn snapshot(&mut self) -> PreviewMetricsSnapshot { self.snapshot_at(Instant::now()) } @@ -511,14 +532,24 @@ impl PreviewTelemetry { .iter() .map(|(_, dropped)| *dropped) .sum(); + let queue_depth_peak = self + .queue_depth_samples + .iter() + .map(|(_, depth)| *depth) + .max() + .unwrap_or(self.latest_queue_depth); PreviewMetricsSnapshot { receive_fps, present_fps, server_fps: self.latest_server_fps as f32, - jitter_ms: compute_jitter_ms(&self.packet_intervals_ms), + stream_spread_ms: compute_jitter_ms(&self.packet_intervals_ms), packet_loss_pct, dropped_frames, queue_depth: self.latest_queue_depth, + queue_depth_peak, + packet_gap_peak_ms: compute_peak_gap_ms(&self.packet_intervals_ms), + present_gap_peak_ms: compute_peak_gap_ms(&self.frame_intervals_ms), + decoder_label: self.decoder_label.clone(), } } @@ -526,8 +557,10 @@ impl PreviewTelemetry { trim_instant_queue(&mut self.packet_times, now); trim_instant_queue(&mut self.frame_times, now); trim_value_queue(&mut self.packet_intervals_ms, now); + trim_value_queue(&mut self.frame_intervals_ms, now); trim_value_queue(&mut self.packet_losses, now); trim_value_queue(&mut self.dropped_deltas, now); + trim_value_queue(&mut self.queue_depth_samples, now); } } @@ -687,6 +720,16 @@ fn run_preview_feed( log_sink: Arc>>>, ) -> Result<()> { let (pipeline, appsrc, appsink) = build_preview_pipeline(profile)?; + { + let shared = Arc::clone(&shared); + pipeline.connect_deep_element_added(move |_, _, element| { + if let Some(decoder_label) = preview_decoder_label(element) + && let Ok(mut slot) = shared.lock() + { + slot.telemetry.note_decoder(&decoder_label); + } + }); + } pipeline .set_state(gst::State::Playing) .context("starting launcher preview pipeline")?; @@ -1087,7 +1130,7 @@ fn build_preview_pipeline( let desc = format!( "appsrc name=src is-live=true format=time do-timestamp=true block=false ! \ queue max-size-buffers=6 max-size-time=0 max-size-bytes=0 leaky=downstream ! \ - h264parse disable-passthrough=true ! decodebin ! videoconvert ! videoscale ! \ + h264parse disable-passthrough=true ! decodebin name=decoder ! videoconvert ! videoscale ! \ video/x-raw,format=RGBA,width={},height={},pixel-aspect-ratio=1/1 ! \ appsink name=sink emit-signals=false sync=false max-buffers=1 drop=true", profile.display_width, profile.display_height @@ -1164,6 +1207,16 @@ fn sample_to_frame(sample: &gst::Sample) -> Option { }) } +#[cfg(not(coverage))] +fn preview_decoder_label(element: &gst::Element) -> Option { + let factory = element.factory()?; + let klass = factory.klass().to_ascii_lowercase(); + if !klass.contains("decoder") || !klass.contains("video") { + return None; + } + Some(factory.name().to_string()) +} + #[cfg(not(coverage))] fn preview_bitrate(var: &str, default: u32) -> u32 { std::env::var(var) @@ -1228,6 +1281,11 @@ fn compute_jitter_ms(samples: &VecDeque<(Instant, f32)>) -> f32 { / samples.len() as f32 } +#[cfg(not(coverage))] +fn compute_peak_gap_ms(samples: &VecDeque<(Instant, f32)>) -> f32 { + samples.iter().map(|(_, value)| *value).fold(0.0, f32::max) +} + #[cfg(test)] mod tests { use super::{ @@ -1262,6 +1320,7 @@ mod tests { fn preview_telemetry_reports_fps_jitter_loss_and_drop_metrics() { let mut telemetry = PreviewTelemetry::default(); let start = Instant::now(); + telemetry.note_decoder("nvh264dec"); telemetry.record_packet_at(start, 1, 30, 0, 1); telemetry.record_presented_frame_at(start + Duration::from_millis(5)); telemetry.record_packet_at(start + Duration::from_millis(33), 2, 30, 0, 1); @@ -1273,9 +1332,13 @@ mod tests { assert!(snapshot.receive_fps >= 12.0); assert!(snapshot.present_fps >= 12.0); assert_eq!(snapshot.server_fps, 27.0); - assert!(snapshot.jitter_ms > 0.0); + assert!(snapshot.stream_spread_ms > 0.0); assert!(snapshot.packet_loss_pct > 0.0); assert_eq!(snapshot.dropped_frames, 2); assert_eq!(snapshot.queue_depth, 3); + assert_eq!(snapshot.queue_depth_peak, 3); + assert!(snapshot.packet_gap_peak_ms >= 47.0); + assert!(snapshot.present_gap_peak_ms >= 53.0); + assert_eq!(snapshot.decoder_label, "nvh264dec"); } } diff --git a/client/src/launcher/ui.rs b/client/src/launcher/ui.rs index a7cd82a..433972c 100644 --- a/client/src/launcher/ui.rs +++ b/client/src/launcher/ui.rs @@ -287,9 +287,21 @@ fn record_diagnostics_sample( left_receive_fps: left_metrics.receive_fps, left_present_fps: left_metrics.present_fps, left_server_fps: left_metrics.server_fps, + left_stream_spread_ms: left_metrics.stream_spread_ms, + left_packet_gap_peak_ms: left_metrics.packet_gap_peak_ms, + left_present_gap_peak_ms: left_metrics.present_gap_peak_ms, + left_queue_depth: left_metrics.queue_depth, + left_queue_peak: left_metrics.queue_depth_peak, + left_decoder_label: left_metrics.decoder_label.clone(), right_receive_fps: right_metrics.receive_fps, right_present_fps: right_metrics.present_fps, right_server_fps: right_metrics.server_fps, + right_stream_spread_ms: right_metrics.stream_spread_ms, + right_packet_gap_peak_ms: right_metrics.packet_gap_peak_ms, + right_present_gap_peak_ms: right_metrics.present_gap_peak_ms, + right_queue_depth: right_metrics.queue_depth, + right_queue_peak: right_metrics.queue_depth_peak, + right_decoder_label: right_metrics.decoder_label.clone(), dropped_frames: left_metrics .dropped_frames .saturating_add(right_metrics.dropped_frames), diff --git a/common/Cargo.toml b/common/Cargo.toml index a85ade1..a2c9f9e 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.8.3" +version = "0.9.0" edition = "2024" build = "build.rs" diff --git a/common/src/cli.rs b/common/src/cli.rs index 590f859..8003b5f 100644 --- a/common/src/cli.rs +++ b/common/src/cli.rs @@ -17,6 +17,6 @@ mod tests { #[test] fn banner_includes_version() { - assert_eq!(banner("0.8.3"), "lesavka-common CLI (v0.8.3)"); + assert_eq!(banner("0.9.0"), "lesavka-common CLI (v0.9.0)"); } } diff --git a/server/Cargo.toml b/server/Cargo.toml index 63afc5d..d9d1d3b 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.8.3" +version = "0.9.0" edition = "2024" autobins = false