diff --git a/client/Cargo.toml b/client/Cargo.toml index 5ebcfb8..7acf683 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.9.1" +version = "0.10.0" edition = "2024" [dependencies] diff --git a/client/src/launcher/diagnostics.rs b/client/src/launcher/diagnostics.rs index da3b956..25dd474 100644 --- a/client/src/launcher/diagnostics.rs +++ b/client/src/launcher/diagnostics.rs @@ -19,6 +19,9 @@ pub struct PerformanceSample { pub left_present_gap_peak_ms: f32, pub left_queue_depth: u32, pub left_queue_peak: u32, + pub left_server_source_gap_peak_ms: f32, + pub left_server_send_gap_peak_ms: f32, + pub left_server_queue_peak: u32, pub left_decoder_label: String, pub right_receive_fps: f32, pub right_present_fps: f32, @@ -28,6 +31,9 @@ pub struct PerformanceSample { pub right_present_gap_peak_ms: f32, pub right_queue_depth: u32, pub right_queue_peak: u32, + pub right_server_source_gap_peak_ms: f32, + pub right_server_send_gap_peak_ms: f32, + pub right_server_queue_peak: u32, pub right_decoder_label: String, pub dropped_frames: u64, pub queue_depth: u32, @@ -93,6 +99,9 @@ pub struct SnapshotReport { pub left_present_gap_peak_ms: f32, pub left_queue_depth: u32, pub left_queue_peak: u32, + pub left_server_source_gap_peak_ms: f32, + pub left_server_send_gap_peak_ms: f32, + pub left_server_queue_peak: u32, pub right_surface: String, pub right_capture_profile: String, pub right_capture_transport: String, @@ -103,6 +112,9 @@ pub struct SnapshotReport { pub right_present_gap_peak_ms: f32, pub right_queue_depth: u32, pub right_queue_peak: u32, + pub right_server_source_gap_peak_ms: f32, + pub right_server_send_gap_peak_ms: f32, + pub right_server_queue_peak: u32, pub selected_camera: Option, pub selected_microphone: Option, pub selected_speaker: Option, @@ -179,6 +191,15 @@ impl SnapshotReport { .unwrap_or(0.0), left_queue_depth: latest.map(|sample| sample.left_queue_depth).unwrap_or(0), left_queue_peak: latest.map(|sample| sample.left_queue_peak).unwrap_or(0), + left_server_source_gap_peak_ms: latest + .map(|sample| sample.left_server_source_gap_peak_ms) + .unwrap_or(0.0), + left_server_send_gap_peak_ms: latest + .map(|sample| sample.left_server_send_gap_peak_ms) + .unwrap_or(0.0), + left_server_queue_peak: latest + .map(|sample| sample.left_server_queue_peak) + .unwrap_or(0), right_surface: state.display_surface(1).label().to_string(), right_capture_profile: format!( "{} | {}x{} | {} fps | {} kbit", @@ -215,6 +236,15 @@ impl SnapshotReport { .unwrap_or(0.0), right_queue_depth: latest.map(|sample| sample.right_queue_depth).unwrap_or(0), right_queue_peak: latest.map(|sample| sample.right_queue_peak).unwrap_or(0), + right_server_source_gap_peak_ms: latest + .map(|sample| sample.right_server_source_gap_peak_ms) + .unwrap_or(0.0), + right_server_send_gap_peak_ms: latest + .map(|sample| sample.right_server_send_gap_peak_ms) + .unwrap_or(0.0), + right_server_queue_peak: latest + .map(|sample| sample.right_server_queue_peak) + .unwrap_or(0), selected_camera: state.devices.camera.clone(), selected_microphone: state.devices.microphone.clone(), selected_speaker: state.devices.speaker.clone(), @@ -266,6 +296,13 @@ impl SnapshotReport { self.left_queue_depth, self.left_queue_peak ); + let _ = writeln!( + text, + " server: gaps={:.0}/{:.0}ms queue-peak={}", + self.left_server_source_gap_peak_ms, + self.left_server_send_gap_peak_ms, + self.left_server_queue_peak + ); let _ = writeln!(text, "right eye"); let _ = writeln!(text, " surface: {}", self.right_surface); let _ = writeln!(text, " capture: {}", self.right_capture_profile); @@ -281,6 +318,13 @@ impl SnapshotReport { self.right_queue_depth, self.right_queue_peak ); + let _ = writeln!( + text, + " server: gaps={:.0}/{:.0}ms queue-peak={}", + self.right_server_source_gap_peak_ms, + self.right_server_send_gap_peak_ms, + self.right_server_queue_peak + ); let _ = writeln!(text); let _ = writeln!(text, "device staging"); let _ = writeln!( @@ -322,7 +366,7 @@ impl SnapshotReport { for sample in &self.recent_samples { let _ = writeln!( text, - " rtt={:.1}ms probe-spread={:.1}ms input-floor={:.1}ms probe-loss={:.1}% video-loss={:.1}% left={:.1}/{:.1}/{:.1}fps right={:.1}/{:.1}/{:.1}fps dropped={} queue={}/{} peaks=l{:.0}/{:.0}ms r{:.0}/{:.0}ms", + " rtt={:.1}ms probe-spread={:.1}ms input-floor={:.1}ms probe-loss={:.1}% video-loss={:.1}% left={:.1}/{:.1}/{:.1}fps right={:.1}/{:.1}/{:.1}fps dropped={} queue={}/{} peaks=l{:.0}/{:.0}ms r{:.0}/{:.0}ms server=l{:.0}/{:.0}/{} r{:.0}/{:.0}/{}", sample.rtt_ms, sample.probe_spread_ms, sample.input_latency_ms, @@ -340,7 +384,13 @@ impl SnapshotReport { sample.left_packet_gap_peak_ms, sample.left_present_gap_peak_ms, sample.right_packet_gap_peak_ms, - sample.right_present_gap_peak_ms + sample.right_present_gap_peak_ms, + sample.left_server_source_gap_peak_ms, + sample.left_server_send_gap_peak_ms, + sample.left_server_queue_peak, + sample.right_server_source_gap_peak_ms, + sample.right_server_send_gap_peak_ms, + sample.right_server_queue_peak ); } } @@ -422,6 +472,28 @@ fn recommendations_for(state: &LauncherState, log: &DiagnosticsLog) -> Vec 60.0 + || (sample.right_packet_gap_peak_ms - sample.right_server_send_gap_peak_ms) > 60.0 + { + items.push( + "Client packet-gap spikes are much larger than the server's send-gap peaks. That points away from the server pipeline and toward network burstiness or client-side receive scheduling." + .to_string(), + ); + } + if sample.left_server_source_gap_peak_ms >= 120.0 + || sample.right_server_source_gap_peak_ms >= 120.0 + { + items.push( + "The server is seeing large source-frame gaps before packets even leave the box. That points at capture cadence or server-side pipeline stalls more than WAN loss." + .to_string(), + ); + } + if sample.left_server_queue_peak >= 4 || sample.right_server_queue_peak >= 4 { + items.push( + "The server-side stream queue is peaking above its steady state. That suggests bursty backpressure is already forming before the client sees it." + .to_string(), + ); + } } let heavy_capture = state.capture_sizes.iter().any(|preset| { matches!( @@ -511,6 +583,9 @@ mod tests { left_present_gap_peak_ms: 60.0, left_queue_depth: n as u32, left_queue_peak: n as u32, + left_server_source_gap_peak_ms: 42.0, + left_server_send_gap_peak_ms: 48.0, + left_server_queue_peak: n as u32 + 1, left_decoder_label: "decodebin".to_string(), right_receive_fps: 30.0, right_present_fps: 28.0, @@ -520,6 +595,9 @@ mod tests { right_present_gap_peak_ms: 75.0, right_queue_depth: n as u32, right_queue_peak: n as u32, + right_server_source_gap_peak_ms: 51.0, + right_server_send_gap_peak_ms: 58.0, + right_server_queue_peak: n as u32 + 1, right_decoder_label: "decodebin".to_string(), dropped_frames: n, queue_depth: n as u32, diff --git a/client/src/launcher/preview.rs b/client/src/launcher/preview.rs index 32aa1fa..d21be16 100644 --- a/client/src/launcher/preview.rs +++ b/client/src/launcher/preview.rs @@ -74,6 +74,9 @@ pub struct PreviewMetricsSnapshot { pub queue_depth_peak: u32, pub packet_gap_peak_ms: f32, pub present_gap_peak_ms: f32, + pub server_source_gap_peak_ms: f32, + pub server_send_gap_peak_ms: f32, + pub server_queue_peak: u32, pub decoder_label: String, } @@ -444,13 +447,34 @@ struct PreviewTelemetry { last_dropped_total: Option, latest_server_fps: u32, latest_queue_depth: u32, + latest_server_source_gap_peak_ms: u32, + latest_server_send_gap_peak_ms: u32, + latest_server_queue_peak: u32, decoder_label: String, } #[cfg(not(coverage))] impl PreviewTelemetry { - fn record_packet(&mut self, seq: u64, server_fps: u32, dropped_total: u64, queue_depth: u32) { - self.record_packet_at(Instant::now(), seq, server_fps, dropped_total, queue_depth); + fn record_packet( + &mut self, + seq: u64, + server_fps: u32, + dropped_total: u64, + queue_depth: u32, + server_source_gap_peak_ms: u32, + server_send_gap_peak_ms: u32, + server_queue_peak: u32, + ) { + self.record_packet_at( + Instant::now(), + seq, + server_fps, + dropped_total, + queue_depth, + server_source_gap_peak_ms, + server_send_gap_peak_ms, + server_queue_peak, + ); } fn record_packet_at( @@ -460,6 +484,9 @@ impl PreviewTelemetry { server_fps: u32, dropped_total: u64, queue_depth: u32, + server_source_gap_peak_ms: u32, + server_send_gap_peak_ms: u32, + server_queue_peak: u32, ) { self.trim(now); self.packet_times.push_back(now); @@ -487,6 +514,9 @@ impl PreviewTelemetry { self.last_dropped_total = Some(dropped_total); self.latest_server_fps = server_fps.max(1); self.latest_queue_depth = queue_depth; + self.latest_server_source_gap_peak_ms = server_source_gap_peak_ms; + self.latest_server_send_gap_peak_ms = server_send_gap_peak_ms; + self.latest_server_queue_peak = server_queue_peak.max(queue_depth); self.queue_depth_samples.push_back((now, queue_depth)); self.trim(now); } @@ -549,6 +579,9 @@ impl PreviewTelemetry { queue_depth_peak, packet_gap_peak_ms: compute_peak_gap_ms(&self.packet_intervals_ms), present_gap_peak_ms: compute_peak_gap_ms(&self.frame_intervals_ms), + server_source_gap_peak_ms: self.latest_server_source_gap_peak_ms as f32, + server_send_gap_peak_ms: self.latest_server_send_gap_peak_ms as f32, + server_queue_peak: self.latest_server_queue_peak, decoder_label: self.decoder_label.clone(), } } @@ -1185,6 +1218,9 @@ fn record_preview_packet(shared: &Arc>, pkt: &VideoPac pkt.effective_fps, pkt.dropped_total, pkt.queue_depth, + pkt.server_source_gap_peak_ms, + pkt.server_send_gap_peak_ms, + pkt.server_queue_peak, ); } } @@ -1321,11 +1357,11 @@ mod tests { let mut telemetry = PreviewTelemetry::default(); let start = Instant::now(); telemetry.note_decoder("nvh264dec"); - telemetry.record_packet_at(start, 1, 30, 0, 1); + telemetry.record_packet_at(start, 1, 30, 0, 1, 41, 38, 2); telemetry.record_presented_frame_at(start + Duration::from_millis(5)); - telemetry.record_packet_at(start + Duration::from_millis(33), 2, 30, 0, 1); + telemetry.record_packet_at(start + Duration::from_millis(33), 2, 30, 0, 1, 41, 38, 2); telemetry.record_presented_frame_at(start + Duration::from_millis(37)); - telemetry.record_packet_at(start + Duration::from_millis(80), 4, 27, 2, 3); + telemetry.record_packet_at(start + Duration::from_millis(80), 4, 27, 2, 3, 77, 88, 4); telemetry.record_presented_frame_at(start + Duration::from_millis(90)); let snapshot = telemetry.snapshot_at(start + Duration::from_millis(120)); @@ -1339,6 +1375,9 @@ mod tests { assert_eq!(snapshot.queue_depth_peak, 3); assert!(snapshot.packet_gap_peak_ms >= 47.0); assert!(snapshot.present_gap_peak_ms >= 53.0); + assert_eq!(snapshot.server_source_gap_peak_ms, 77.0); + assert_eq!(snapshot.server_send_gap_peak_ms, 88.0); + assert_eq!(snapshot.server_queue_peak, 4); assert_eq!(snapshot.decoder_label, "nvh264dec"); } } diff --git a/client/src/launcher/ui.rs b/client/src/launcher/ui.rs index 433972c..50a2017 100644 --- a/client/src/launcher/ui.rs +++ b/client/src/launcher/ui.rs @@ -292,6 +292,9 @@ fn record_diagnostics_sample( left_present_gap_peak_ms: left_metrics.present_gap_peak_ms, left_queue_depth: left_metrics.queue_depth, left_queue_peak: left_metrics.queue_depth_peak, + left_server_source_gap_peak_ms: left_metrics.server_source_gap_peak_ms, + left_server_send_gap_peak_ms: left_metrics.server_send_gap_peak_ms, + left_server_queue_peak: left_metrics.server_queue_peak, left_decoder_label: left_metrics.decoder_label.clone(), right_receive_fps: right_metrics.receive_fps, right_present_fps: right_metrics.present_fps, @@ -301,6 +304,9 @@ fn record_diagnostics_sample( right_present_gap_peak_ms: right_metrics.present_gap_peak_ms, right_queue_depth: right_metrics.queue_depth, right_queue_peak: right_metrics.queue_depth_peak, + right_server_source_gap_peak_ms: right_metrics.server_source_gap_peak_ms, + right_server_send_gap_peak_ms: right_metrics.server_send_gap_peak_ms, + right_server_queue_peak: right_metrics.server_queue_peak, right_decoder_label: right_metrics.decoder_label.clone(), dropped_frames: left_metrics .dropped_frames diff --git a/common/Cargo.toml b/common/Cargo.toml index 9e2ff8d..b722709 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.9.1" +version = "0.10.0" edition = "2024" build = "build.rs" diff --git a/common/proto/lesavka.proto b/common/proto/lesavka.proto index b8d3b00..7f5e7c5 100644 --- a/common/proto/lesavka.proto +++ b/common/proto/lesavka.proto @@ -20,6 +20,9 @@ message VideoPacket { uint32 effective_fps = 5; uint64 dropped_total = 6; uint32 queue_depth = 7; + uint32 server_source_gap_peak_ms = 8; + uint32 server_send_gap_peak_ms = 9; + uint32 server_queue_peak = 10; } message AudioPacket { uint32 id = 1; uint64 pts = 2; bytes data = 3; } diff --git a/common/src/cli.rs b/common/src/cli.rs index 6d9b9c5..0059fa7 100644 --- a/common/src/cli.rs +++ b/common/src/cli.rs @@ -17,6 +17,6 @@ mod tests { #[test] fn banner_includes_version() { - assert_eq!(banner("0.9.1"), "lesavka-common CLI (v0.9.1)"); + assert_eq!(banner("0.10.0"), "lesavka-common CLI (v0.10.0)"); } } diff --git a/server/Cargo.toml b/server/Cargo.toml index 2cb94b5..aae1531 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.9.1" +version = "0.10.0" edition = "2024" autobins = false diff --git a/server/src/video.rs b/server/src/video.rs index 2167bf9..7257855 100644 --- a/server/src/video.rs +++ b/server/src/video.rs @@ -185,6 +185,27 @@ fn round_down_even_u32(value: u32) -> u32 { rounded - (rounded % 2) } +fn reset_stream_telemetry_window( + last_window_sec: &AtomicU64, + current_sec: u64, + source_gap_peak_ms: &AtomicU32, + send_gap_peak_ms: &AtomicU32, + queue_peak_depth: &AtomicU32, +) { + let prev = last_window_sec.load(Ordering::Relaxed); + if current_sec <= prev { + return; + } + if last_window_sec + .compare_exchange(prev, current_sec, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + { + source_gap_peak_ms.store(0, Ordering::Relaxed); + send_gap_peak_ms.store(0, Ordering::Relaxed); + queue_peak_depth.store(0, Ordering::Relaxed); + } +} + #[derive(Clone, Copy, Debug)] struct EyeCaptureRequest { source_width: u32, @@ -387,6 +408,11 @@ pub async fn eye_ball_with_request( let last_adjust_sec = Arc::new(AtomicU64::new(0)); let wait_for_idr = Arc::new(AtomicBool::new(false)); let last_sent = Arc::new(AtomicU64::new(0)); + let last_source_pts = Arc::new(AtomicU64::new(0)); + let source_gap_peak_ms = Arc::new(AtomicU32::new(0)); + let send_gap_peak_ms = Arc::new(AtomicU32::new(0)); + let queue_peak_depth = Arc::new(AtomicU32::new(0)); + let last_telemetry_sec = Arc::new(AtomicU64::new(0)); let packet_seq = Arc::new(AtomicU64::new(0)); let queue_buffers = env_u32("LESAVKA_EYE_QUEUE_BUFFERS", 8).max(1); @@ -472,6 +498,11 @@ pub async fn eye_ball_with_request( let dropped_total_for_cb = Arc::clone(&dropped_total); let packet_seq_for_cb = Arc::clone(&packet_seq); let effective_fps_for_cb = Arc::clone(&effective_fps); + let last_source_pts_for_cb = Arc::clone(&last_source_pts); + let source_gap_peak_ms_for_cb = Arc::clone(&source_gap_peak_ms); + let send_gap_peak_ms_for_cb = Arc::clone(&send_gap_peak_ms); + let queue_peak_depth_for_cb = Arc::clone(&queue_peak_depth); + let last_telemetry_sec_for_cb = Arc::clone(&last_telemetry_sec); sink.set_callbacks( gst_app::AppSinkCallbacks::builder() .new_sample(move |sink| { @@ -510,9 +541,22 @@ pub async fn eye_ball_with_request( .saturating_sub(origin) .nseconds() / 1_000; + let sec = pts_us / 1_000_000; + reset_stream_telemetry_window( + &last_telemetry_sec_for_cb, + sec, + &source_gap_peak_ms_for_cb, + &send_gap_peak_ms_for_cb, + &queue_peak_depth_for_cb, + ); + let previous_source_pts = last_source_pts_for_cb.swap(pts_us, Ordering::Relaxed); + if previous_source_pts > 0 && pts_us > previous_source_pts { + let source_gap_ms = + ((pts_us.saturating_sub(previous_source_pts)) / 1_000) as u32; + source_gap_peak_ms_for_cb.fetch_max(source_gap_ms, Ordering::Relaxed); + } if adaptive { - let sec = pts_us / 1_000_000; let prev = last_adjust_sec.load(Ordering::Relaxed); if sec > prev && last_adjust_sec @@ -549,6 +593,10 @@ pub async fn eye_ball_with_request( if !should_send_frame(last, pts_us, current_fps) { return Ok(gst::FlowSuccess::Ok); } + if last > 0 && pts_us > last { + let send_gap_ms = ((pts_us.saturating_sub(last)) / 1_000) as u32; + send_gap_peak_ms_for_cb.fetch_max(send_gap_ms, Ordering::Relaxed); + } last_sent.store(pts_us, Ordering::Relaxed); if wait_for_idr.load(Ordering::Relaxed) && !is_idr { @@ -558,6 +606,8 @@ pub async fn eye_ball_with_request( let data = map.as_slice().to_vec(); let size = data.len(); let seq = packet_seq_for_cb.fetch_add(1, Ordering::Relaxed) + 1; + let queue_depth = (chan_capacity.saturating_sub(tx.capacity())) as u32; + queue_peak_depth_for_cb.fetch_max(queue_depth, Ordering::Relaxed); let pkt = VideoPacket { id, pts: pts_us, @@ -565,7 +615,10 @@ pub async fn eye_ball_with_request( seq, effective_fps: effective_fps_for_cb.load(Ordering::Relaxed).max(1), dropped_total: dropped_total_for_cb.load(Ordering::Relaxed), - queue_depth: (chan_capacity.saturating_sub(tx.capacity())) as u32, + queue_depth, + server_source_gap_peak_ms: source_gap_peak_ms_for_cb.load(Ordering::Relaxed), + server_send_gap_peak_ms: send_gap_peak_ms_for_cb.load(Ordering::Relaxed), + server_queue_peak: queue_peak_depth_for_cb.load(Ordering::Relaxed), }; match tx.try_send(Ok(pkt)) { Ok(_) => {