diff --git a/client/src/handshake.rs b/client/src/handshake.rs index fb6e984..3e07e98 100644 --- a/client/src/handshake.rs +++ b/client/src/handshake.rs @@ -2,7 +2,7 @@ #![forbid(unsafe_code)] use lesavka_common::lesavka::{self as pb, handshake_client::HandshakeClient}; -use std::time::Duration; +use std::time::{Duration, Instant}; use tokio::time::timeout; use tonic::{Code, transport::Endpoint}; use tracing::{info, warn}; @@ -23,6 +23,14 @@ pub struct PeerCaps { pub eye_fps: Option, } +#[derive(Default, Clone, Debug)] +#[cfg_attr(test, derive(PartialEq))] +pub struct HandshakeProbe { + pub caps: PeerCaps, + pub rtt_ms: Option, + pub reachable: bool, +} + fn likely_port_typo_hint(uri: &str) -> Option<&'static str> { if uri.contains(":5005") && !uri.contains(":50051") { Some("possible typo: lesavka server listens on port 50051") @@ -82,6 +90,66 @@ pub async fn negotiate(uri: &str) -> PeerCaps { } } +#[cfg(coverage)] +pub async fn probe(uri: &str) -> HandshakeProbe { + if likely_port_typo_hint(uri).is_some() { + return HandshakeProbe::default(); + } + + let started = Instant::now(); + let ep = match Endpoint::from_shared(uri.to_owned()) { + Ok(ep) => ep + .tcp_nodelay(true) + .http2_keep_alive_interval(Duration::from_secs(15)) + .connect_timeout(Duration::from_secs(5)), + Err(_) => return HandshakeProbe::default(), + }; + + let channel = match timeout(Duration::from_secs(8), ep.connect()).await { + Ok(Ok(channel)) => channel, + _ => return HandshakeProbe::default(), + }; + + let mut cli = HandshakeClient::new(channel); + let rtt_ms = started.elapsed().as_secs_f32() * 1000.0; + match timeout(Duration::from_secs(5), cli.get_capabilities(pb::Empty {})).await { + Ok(Ok(rsp)) => { + let rsp = rsp.get_ref(); + HandshakeProbe { + caps: PeerCaps { + camera: rsp.camera, + microphone: rsp.microphone, + server_version: (!rsp.server_version.is_empty()) + .then_some(rsp.server_version.clone()), + camera_output: (!rsp.camera_output.is_empty()) + .then_some(rsp.camera_output.clone()), + camera_codec: (!rsp.camera_codec.is_empty()) + .then_some(rsp.camera_codec.clone()), + camera_width: (rsp.camera_width != 0).then_some(rsp.camera_width), + camera_height: (rsp.camera_height != 0).then_some(rsp.camera_height), + camera_fps: (rsp.camera_fps != 0).then_some(rsp.camera_fps), + eye_width: (rsp.eye_width != 0).then_some(rsp.eye_width), + eye_height: (rsp.eye_height != 0).then_some(rsp.eye_height), + eye_fps: (rsp.eye_fps != 0).then_some(rsp.eye_fps), + }, + rtt_ms: Some(rtt_ms), + reachable: true, + } + } + Ok(Err(e)) if e.code() == Code::Unimplemented => HandshakeProbe { + caps: PeerCaps::default(), + rtt_ms: Some(rtt_ms), + reachable: true, + }, + Ok(Err(_)) => HandshakeProbe { + caps: PeerCaps::default(), + rtt_ms: Some(rtt_ms), + reachable: true, + }, + Err(_) => HandshakeProbe::default(), + } +} + #[cfg(not(coverage))] pub async fn negotiate(uri: &str) -> PeerCaps { info!(%uri, "๐Ÿค dial handshake"); @@ -196,9 +264,93 @@ pub async fn negotiate(uri: &str) -> PeerCaps { PeerCaps::default() } +#[cfg(not(coverage))] +pub async fn probe(uri: &str) -> HandshakeProbe { + info!(%uri, "๐Ÿงช probing handshake"); + + let Some(hint) = likely_port_typo_hint(uri) else { + let started = Instant::now(); + let ep = match Endpoint::from_shared(uri.to_owned()) { + Ok(ep) => ep + .tcp_nodelay(true) + .http2_keep_alive_interval(Duration::from_secs(15)) + .connect_timeout(Duration::from_secs(5)), + Err(e) => { + warn!("๐Ÿงช invalid probe endpoint '{uri}': {e}"); + return HandshakeProbe::default(); + } + }; + + let channel = match timeout(Duration::from_secs(8), ep.connect()).await { + Ok(Ok(channel)) => channel, + Ok(Err(e)) => { + warn!("๐Ÿงช handshake probe connect failed: {e}"); + return HandshakeProbe::default(); + } + Err(_) => { + warn!("๐Ÿงช handshake probe timed out"); + return HandshakeProbe::default(); + } + }; + + let mut cli = HandshakeClient::new(channel); + let rtt_ms = started.elapsed().as_secs_f32() * 1000.0; + return match timeout(Duration::from_secs(5), cli.get_capabilities(pb::Empty {})).await { + Ok(Ok(rsp)) => { + let rsp = rsp.get_ref(); + let caps = PeerCaps { + camera: rsp.camera, + microphone: rsp.microphone, + server_version: (!rsp.server_version.is_empty()) + .then_some(rsp.server_version.clone()), + camera_output: (!rsp.camera_output.is_empty()) + .then_some(rsp.camera_output.clone()), + camera_codec: (!rsp.camera_codec.is_empty()) + .then_some(rsp.camera_codec.clone()), + camera_width: (rsp.camera_width != 0).then_some(rsp.camera_width), + camera_height: (rsp.camera_height != 0).then_some(rsp.camera_height), + camera_fps: (rsp.camera_fps != 0).then_some(rsp.camera_fps), + eye_width: (rsp.eye_width != 0).then_some(rsp.eye_width), + eye_height: (rsp.eye_height != 0).then_some(rsp.eye_height), + eye_fps: (rsp.eye_fps != 0).then_some(rsp.eye_fps), + }; + info!(rtt_ms, ?caps, "๐Ÿงช handshake probe ok"); + HandshakeProbe { + caps, + rtt_ms: Some(rtt_ms), + reachable: true, + } + } + Ok(Err(e)) if e.code() == Code::Unimplemented => { + warn!("๐Ÿงช handshake probe reached a server without the handshake service"); + HandshakeProbe { + caps: PeerCaps::default(), + rtt_ms: Some(rtt_ms), + reachable: true, + } + } + Ok(Err(e)) => { + warn!("๐Ÿงช handshake probe RPC failed: {e}"); + HandshakeProbe { + caps: PeerCaps::default(), + rtt_ms: Some(rtt_ms), + reachable: true, + } + } + Err(_) => { + warn!("๐Ÿงช handshake probe RPC timed out"); + HandshakeProbe::default() + } + }; + }; + + warn!("๐Ÿงช handshake probe endpoint '{uri}' looks wrong ({hint})"); + HandshakeProbe::default() +} + #[cfg(test)] mod tests { - use super::{PeerCaps, likely_port_typo_hint, negotiate}; + use super::{HandshakeProbe, PeerCaps, likely_port_typo_hint, negotiate, probe}; #[test] fn likely_port_typo_hint_flags_common_port_mistype() { @@ -220,4 +372,10 @@ mod tests { let caps = negotiate("http://127.0.0.1:5005").await; assert_eq!(caps, PeerCaps::default()); } + + #[tokio::test] + async fn probe_returns_defaults_for_port_typo_hint() { + let probe_result = probe("http://127.0.0.1:5005").await; + assert_eq!(probe_result, HandshakeProbe::default()); + } } diff --git a/client/src/input/camera.rs b/client/src/input/camera.rs index fb5e728..47fbd7d 100644 --- a/client/src/input/camera.rs +++ b/client/src/input/camera.rs @@ -214,6 +214,7 @@ impl CameraCapture { id: 2, pts, data: map.as_slice().to_vec(), + ..Default::default() }) } diff --git a/client/src/launcher/diagnostics.rs b/client/src/launcher/diagnostics.rs index ad5f736..b9a7c1c 100644 --- a/client/src/launcher/diagnostics.rs +++ b/client/src/launcher/diagnostics.rs @@ -7,9 +7,16 @@ use super::state::{InputRouting, LauncherState, ViewMode}; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct PerformanceSample { pub rtt_ms: f32, + pub jitter_ms: f32, pub input_latency_ms: f32, - pub left_fps: f32, - pub right_fps: f32, + pub probe_loss_pct: f32, + pub video_loss_pct: f32, + pub left_receive_fps: f32, + pub left_present_fps: f32, + pub left_server_fps: f32, + pub right_receive_fps: f32, + pub right_present_fps: f32, + pub right_server_fps: f32, pub dropped_frames: u64, pub queue_depth: u32, } @@ -224,11 +231,18 @@ impl SnapshotReport { for sample in &self.recent_samples { let _ = writeln!( text, - " rtt={:.1}ms input={:.1}ms left={:.1}fps right={:.1}fps dropped={} queue={}", + " rtt={:.1}ms jitter={:.1}ms input-floor={:.1}ms probe-loss={:.1}% video-loss={:.1}% left={:.1}/{:.1}/{:.1}fps right={:.1}/{:.1}/{:.1}fps dropped={} queue={}", sample.rtt_ms, + sample.jitter_ms, sample.input_latency_ms, - sample.left_fps, - sample.right_fps, + sample.probe_loss_pct, + sample.video_loss_pct, + sample.left_receive_fps, + sample.left_present_fps, + sample.left_server_fps, + sample.right_receive_fps, + sample.right_present_fps, + sample.right_server_fps, sample.dropped_frames, sample.queue_depth ); @@ -267,10 +281,38 @@ fn recommendations_for(state: &LauncherState, log: &DiagnosticsLog) -> Vec= 3.0 || sample.jitter_ms >= 18.0 { + items.push( + "Probe jitter/loss is elevated. A wired client connection or a gentler capture profile will usually help more than changing the breakout size." + .to_string(), + ); + } + if sample.video_loss_pct >= 2.0 || sample.dropped_frames > 0 { + items.push( + "Video packets are arriving with gaps or server-side drops. Try 900p or 720p capture first, then watch whether dropped frames and video-loss fall." + .to_string(), + ); + } + if sample.left_present_fps + 1.0 < sample.left_receive_fps + || sample.right_present_fps + 1.0 < sample.right_receive_fps + { + items.push( + "The client is receiving more frames than it is presenting. That points at local decode/render pressure, so prefer lighter breakout sizes or hardware decode." + .to_string(), + ); + } + if sample.queue_depth > 8 { + items.push( + "The preview queue is backing up. When queue depth climbs, expect laggy mouse feel and delayed visual response even if raw fps still looks okay." + .to_string(), + ); + } + } let heavy_capture = state.capture_sizes.iter().any(|preset| { matches!( preset, @@ -305,9 +347,16 @@ mod tests { fn sample(n: u64) -> PerformanceSample { PerformanceSample { rtt_ms: 20.0 + n as f32, + jitter_ms: 3.0 + n as f32, input_latency_ms: 10.0 + n as f32, - left_fps: 30.0, - right_fps: 30.0, + probe_loss_pct: n as f32, + video_loss_pct: (n as f32) * 0.5, + left_receive_fps: 30.0, + left_present_fps: 29.0, + left_server_fps: 30.0, + right_receive_fps: 30.0, + right_present_fps: 28.0, + right_server_fps: 30.0, dropped_frames: n, queue_depth: n as u32, } diff --git a/client/src/launcher/preview.rs b/client/src/launcher/preview.rs index 1b974ca..1874627 100644 --- a/client/src/launcher/preview.rs +++ b/client/src/launcher/preview.rs @@ -13,11 +13,13 @@ use gtk::{gdk, glib}; #[cfg(not(coverage))] use lesavka_common::lesavka::{MonitorRequest, VideoPacket, relay_client::RelayClient}; #[cfg(not(coverage))] +use std::collections::VecDeque; +#[cfg(not(coverage))] use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; #[cfg(not(coverage))] use std::sync::{Arc, Mutex}; #[cfg(not(coverage))] -use std::time::Duration; +use std::time::{Duration, Instant}; #[cfg(not(coverage))] use tonic::{Request, transport::Channel}; #[cfg(not(coverage))] @@ -33,6 +35,8 @@ const DEFAULT_EYE_SOURCE_WIDTH: i32 = 1920; const DEFAULT_EYE_SOURCE_HEIGHT: i32 = 1080; #[cfg(not(coverage))] const PREVIEW_IDLE_STATUS: &str = "Connect relay to preview."; +#[cfg(not(coverage))] +const TELEMETRY_WINDOW: Duration = Duration::from_secs(5); #[cfg(not(coverage))] pub struct LauncherPreview { @@ -57,6 +61,18 @@ pub enum PreviewSurface { Window, } +#[cfg(not(coverage))] +#[derive(Clone, Copy, Debug, Default, PartialEq)] +pub struct PreviewMetricsSnapshot { + pub receive_fps: f32, + pub present_fps: f32, + pub server_fps: f32, + pub jitter_ms: f32, + pub packet_loss_pct: f32, + pub dropped_frames: u64, + pub queue_depth: u32, +} + #[cfg(not(coverage))] #[derive(Clone, Copy, Debug)] struct PreviewProfile { @@ -194,6 +210,27 @@ impl LauncherPreview { } } + pub fn snapshot_metrics( + &self, + monitor_id: usize, + surface: PreviewSurface, + ) -> Option { + match surface { + PreviewSurface::Inline => self + .inline_feeds + .lock() + .ok() + .and_then(|feeds| feeds.get(monitor_id).cloned()) + .map(|feed| feed.snapshot_metrics()), + PreviewSurface::Window => self + .window_feeds + .lock() + .ok() + .and_then(|feeds| feeds.get(monitor_id).cloned()) + .map(|feed| feed.snapshot_metrics()), + } + } + pub fn set_capture_profile( &self, monitor_id: usize, @@ -329,6 +366,7 @@ struct SharedPreviewState { clear_picture: bool, last_logged_error: Option, last_logged_status: Option, + telemetry: PreviewTelemetry, } #[cfg(not(coverage))] @@ -341,6 +379,7 @@ impl SharedPreviewState { clear_picture: true, last_logged_error: None, last_logged_status: None, + telemetry: PreviewTelemetry::default(), } } @@ -361,6 +400,7 @@ impl SharedPreviewState { } fn push_frame(&mut self, frame: PreviewFrame) { + self.telemetry.record_presented_frame(); self.latest = Some(frame); self.clear_picture = false; self.last_logged_error = None; @@ -371,6 +411,113 @@ impl SharedPreviewState { } } +#[cfg(not(coverage))] +#[derive(Debug, Default)] +struct PreviewTelemetry { + packet_times: VecDeque, + frame_times: VecDeque, + packet_intervals_ms: VecDeque<(Instant, f32)>, + packet_losses: VecDeque<(Instant, u64)>, + dropped_deltas: VecDeque<(Instant, u64)>, + last_packet_at: Option, + last_seq: Option, + last_dropped_total: Option, + latest_server_fps: u32, + latest_queue_depth: u32, +} + +#[cfg(not(coverage))] +impl PreviewTelemetry { + fn record_packet(&mut self, seq: u64, server_fps: u32, dropped_total: u64, queue_depth: u32) { + self.record_packet_at(Instant::now(), seq, server_fps, dropped_total, queue_depth); + } + + fn record_packet_at( + &mut self, + now: Instant, + seq: u64, + server_fps: u32, + dropped_total: u64, + queue_depth: u32, + ) { + self.trim(now); + self.packet_times.push_back(now); + if let Some(previous) = self.last_packet_at.replace(now) { + self.packet_intervals_ms.push_back(( + now, + now.saturating_duration_since(previous).as_secs_f32() * 1000.0, + )); + } + if seq > 0 { + if let Some(previous_seq) = self.last_seq + && seq > previous_seq + 1 + { + self.packet_losses + .push_back((now, seq.saturating_sub(previous_seq + 1))); + } + self.last_seq = Some(seq); + } + if let Some(previous_dropped) = self.last_dropped_total + && dropped_total > previous_dropped + { + self.dropped_deltas + .push_back((now, dropped_total.saturating_sub(previous_dropped))); + } + self.last_dropped_total = Some(dropped_total); + self.latest_server_fps = server_fps.max(1); + self.latest_queue_depth = queue_depth; + self.trim(now); + } + + fn record_presented_frame(&mut self) { + self.record_presented_frame_at(Instant::now()); + } + + fn record_presented_frame_at(&mut self, now: Instant) { + self.trim(now); + self.frame_times.push_back(now); + } + + fn snapshot(&mut self) -> PreviewMetricsSnapshot { + self.snapshot_at(Instant::now()) + } + + fn snapshot_at(&mut self, now: Instant) -> PreviewMetricsSnapshot { + self.trim(now); + let receive_fps = events_per_second(&self.packet_times, now); + let present_fps = events_per_second(&self.frame_times, now); + let delivered = self.packet_times.len() as u64; + let packet_losses: u64 = self.packet_losses.iter().map(|(_, loss)| *loss).sum(); + let packet_loss_pct = if delivered + packet_losses == 0 { + 0.0 + } else { + packet_losses as f32 * 100.0 / (delivered + packet_losses) as f32 + }; + let dropped_frames: u64 = self + .dropped_deltas + .iter() + .map(|(_, dropped)| *dropped) + .sum(); + PreviewMetricsSnapshot { + receive_fps, + present_fps, + server_fps: self.latest_server_fps as f32, + jitter_ms: compute_jitter_ms(&self.packet_intervals_ms), + packet_loss_pct, + dropped_frames, + queue_depth: self.latest_queue_depth, + } + } + + fn trim(&mut self, now: Instant) { + trim_instant_queue(&mut self.packet_times, now); + trim_instant_queue(&mut self.frame_times, now); + trim_value_queue(&mut self.packet_intervals_ms, now); + trim_value_queue(&mut self.packet_losses, now); + trim_value_queue(&mut self.dropped_deltas, now); + } +} + #[cfg(not(coverage))] impl PreviewFeed { fn spawn( @@ -498,6 +645,13 @@ impl PreviewFeed { active_bindings, } } + + fn snapshot_metrics(&self) -> PreviewMetricsSnapshot { + self.shared + .lock() + .map(|mut shared| shared.telemetry.snapshot()) + .unwrap_or_default() + } } #[cfg(not(coverage))] @@ -674,7 +828,10 @@ fn run_preview_feed( ) .await { - Ok(Ok(Some(pkt))) => push_preview_packet(&appsrc, pkt), + Ok(Ok(Some(pkt))) => { + record_preview_packet(&shared, &pkt); + push_preview_packet(&appsrc, pkt); + } Ok(Ok(None)) => { set_shared_status( &shared, @@ -964,6 +1121,18 @@ fn push_preview_packet(appsrc: &gst_app::AppSrc, pkt: VideoPacket) { let _ = appsrc.push_buffer(buf); } +#[cfg(not(coverage))] +fn record_preview_packet(shared: &Arc>, pkt: &VideoPacket) { + if let Ok(mut slot) = shared.lock() { + slot.telemetry.record_packet( + pkt.seq, + pkt.effective_fps, + pkt.dropped_total, + pkt.queue_depth, + ); + } +} + #[cfg(not(coverage))] fn sample_to_frame(sample: &gst::Sample) -> Option { let caps = sample.caps()?; @@ -999,12 +1168,60 @@ fn preview_dimension(var: &str, default: i32) -> i32 { .unwrap_or(default) } +#[cfg(not(coverage))] +fn events_per_second(events: &VecDeque, now: Instant) -> f32 { + let Some(oldest) = events.front().copied() else { + return 0.0; + }; + let span = now + .saturating_duration_since(oldest) + .as_secs_f32() + .clamp(0.25, TELEMETRY_WINDOW.as_secs_f32()); + events.len() as f32 / span +} + +#[cfg(not(coverage))] +fn trim_instant_queue(queue: &mut VecDeque, now: Instant) { + while let Some(oldest) = queue.front().copied() { + if now.saturating_duration_since(oldest) > TELEMETRY_WINDOW { + let _ = queue.pop_front(); + } else { + break; + } + } +} + +#[cfg(not(coverage))] +fn trim_value_queue(queue: &mut VecDeque<(Instant, T)>, now: Instant) { + while let Some((oldest, _)) = queue.front() { + if now.saturating_duration_since(*oldest) > TELEMETRY_WINDOW { + let _ = queue.pop_front(); + } else { + break; + } + } +} + +#[cfg(not(coverage))] +fn compute_jitter_ms(samples: &VecDeque<(Instant, f32)>) -> f32 { + if samples.len() < 2 { + return 0.0; + } + let mean = samples.iter().map(|(_, value)| *value).sum::() / samples.len() as f32; + samples + .iter() + .map(|(_, value)| (value - mean).abs()) + .sum::() + / samples.len() as f32 +} + #[cfg(test)] mod tests { use super::{ DEFAULT_EYE_SOURCE_HEIGHT, DEFAULT_EYE_SOURCE_WIDTH, PREVIEW_HEIGHT, PREVIEW_WIDTH, - PreviewSurface, + PreviewSurface, PreviewTelemetry, }; + use std::time::{Duration, Instant}; #[test] fn inline_preview_profile_uses_existing_defaults() { @@ -1027,4 +1244,25 @@ mod tests { assert_eq!(profile.requested_fps, 30); assert_eq!(profile.max_bitrate_kbit, 12_000); } + + #[test] + fn preview_telemetry_reports_fps_jitter_loss_and_drop_metrics() { + let mut telemetry = PreviewTelemetry::default(); + let start = Instant::now(); + telemetry.record_packet_at(start, 1, 30, 0, 1); + telemetry.record_presented_frame_at(start + Duration::from_millis(5)); + telemetry.record_packet_at(start + Duration::from_millis(33), 2, 30, 0, 1); + telemetry.record_presented_frame_at(start + Duration::from_millis(37)); + telemetry.record_packet_at(start + Duration::from_millis(80), 4, 27, 2, 3); + telemetry.record_presented_frame_at(start + Duration::from_millis(90)); + + let snapshot = telemetry.snapshot_at(start + Duration::from_millis(120)); + assert!(snapshot.receive_fps >= 12.0); + assert!(snapshot.present_fps >= 12.0); + assert_eq!(snapshot.server_fps, 27.0); + assert!(snapshot.jitter_ms > 0.0); + assert!(snapshot.packet_loss_pct > 0.0); + assert_eq!(snapshot.dropped_frames, 2); + assert_eq!(snapshot.queue_depth, 3); + } } diff --git a/client/src/launcher/ui.rs b/client/src/launcher/ui.rs index 55191af..b5b5501 100644 --- a/client/src/launcher/ui.rs +++ b/client/src/launcher/ui.rs @@ -5,7 +5,7 @@ use { super::clipboard::send_clipboard_text_to_remote, super::device_test::{DeviceTestController, DeviceTestKind}, super::devices::DeviceCatalog, - super::diagnostics::quality_probe_command, + super::diagnostics::{PerformanceSample, quality_probe_command}, super::launcher_clipboard_control_path, super::launcher_focus_signal_path, super::power::{fetch_capture_power, set_capture_power_mode}, @@ -24,15 +24,16 @@ use { stop_child_process, toggle_key_label, update_test_action_result, write_input_routing_request, }, - crate::handshake::{PeerCaps, negotiate}, + crate::handshake::{HandshakeProbe, probe}, crate::output::display::enumerate_monitors, gtk::glib, gtk::prelude::*, lesavka_common::lesavka::CapturePowerCommand, std::cell::{Cell, RefCell}, + std::collections::VecDeque, std::process::Command, std::rc::Rc, - std::time::Duration, + std::time::{Duration, Instant}, }; #[cfg(not(coverage))] @@ -48,7 +49,7 @@ enum RelayMessage { #[cfg(not(coverage))] enum CapsMessage { - Refresh(PeerCaps), + Refresh(HandshakeProbe), } #[cfg(not(coverage))] @@ -56,6 +57,86 @@ enum ClipboardMessage { Finished(std::result::Result), } +#[cfg(not(coverage))] +const NETWORK_TELEMETRY_WINDOW: Duration = Duration::from_secs(8); + +#[cfg(not(coverage))] +#[derive(Default)] +struct NetworkTelemetry { + rtt_samples: VecDeque<(Instant, f32)>, + failures: VecDeque, +} + +#[cfg(not(coverage))] +#[derive(Clone, Copy, Debug, Default)] +struct NetworkSnapshot { + rtt_ms: f32, + jitter_ms: f32, + probe_loss_pct: f32, +} + +#[cfg(not(coverage))] +impl NetworkTelemetry { + fn record(&mut self, probe: &HandshakeProbe) { + let now = Instant::now(); + self.trim(now); + if let Some(rtt_ms) = probe.rtt_ms { + self.rtt_samples.push_back((now, rtt_ms)); + } else { + self.failures.push_back(now); + } + self.trim(now); + } + + fn snapshot(&mut self) -> NetworkSnapshot { + let now = Instant::now(); + self.trim(now); + let rtt_ms = self.rtt_samples.back().map(|(_, rtt)| *rtt).unwrap_or(0.0); + let jitter_ms = network_jitter_ms(&self.rtt_samples); + let probe_count = self.rtt_samples.len() + self.failures.len(); + let probe_loss_pct = if probe_count == 0 { + 0.0 + } else { + self.failures.len() as f32 * 100.0 / probe_count as f32 + }; + NetworkSnapshot { + rtt_ms, + jitter_ms, + probe_loss_pct, + } + } + + fn trim(&mut self, now: Instant) { + while let Some((oldest, _)) = self.rtt_samples.front().copied() { + if now.saturating_duration_since(oldest) > NETWORK_TELEMETRY_WINDOW { + let _ = self.rtt_samples.pop_front(); + } else { + break; + } + } + while let Some(oldest) = self.failures.front().copied() { + if now.saturating_duration_since(oldest) > NETWORK_TELEMETRY_WINDOW { + let _ = self.failures.pop_front(); + } else { + break; + } + } + } +} + +#[cfg(not(coverage))] +fn network_jitter_ms(samples: &VecDeque<(Instant, f32)>) -> f32 { + if samples.len() < 2 { + return 0.0; + } + let mean = samples.iter().map(|(_, value)| *value).sum::() / samples.len() as f32; + samples + .iter() + .map(|(_, value)| (value - mean).abs()) + .sum::() + / samples.len() as f32 +} + #[cfg(not(coverage))] fn request_capture_power_refresh( power_tx: std::sync::mpsc::Sender, @@ -96,11 +177,11 @@ fn request_handshake_caps( let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() .build(); - let caps = match runtime { - Ok(runtime) => runtime.block_on(negotiate(&server_addr)), - Err(_) => PeerCaps::default(), + let probe = match runtime { + Ok(runtime) => runtime.block_on(probe(&server_addr)), + Err(_) => HandshakeProbe::default(), }; - let _ = caps_tx.send(CapsMessage::Refresh(caps)); + let _ = caps_tx.send(CapsMessage::Refresh(probe)); }); } @@ -135,6 +216,60 @@ fn refresh_eye_feed_controls( } } +#[cfg(not(coverage))] +fn record_diagnostics_sample( + widgets: &super::ui_components::LauncherWidgets, + state: &LauncherState, + preview: Option<&super::preview::LauncherPreview>, + network: NetworkSnapshot, +) { + let left_metrics = preview + .and_then(|preview| { + preview.snapshot_metrics( + 0, + match state.display_surface(0) { + DisplaySurface::Preview => super::preview::PreviewSurface::Inline, + DisplaySurface::Window => super::preview::PreviewSurface::Window, + }, + ) + }) + .unwrap_or_default(); + let right_metrics = preview + .and_then(|preview| { + preview.snapshot_metrics( + 1, + match state.display_surface(1) { + DisplaySurface::Preview => super::preview::PreviewSurface::Inline, + DisplaySurface::Window => super::preview::PreviewSurface::Window, + }, + ) + }) + .unwrap_or_default(); + + widgets + .diagnostics_log + .borrow_mut() + .record(PerformanceSample { + rtt_ms: network.rtt_ms, + jitter_ms: network.jitter_ms, + input_latency_ms: network.rtt_ms * 0.5, + probe_loss_pct: network.probe_loss_pct, + video_loss_pct: left_metrics + .packet_loss_pct + .max(right_metrics.packet_loss_pct), + left_receive_fps: left_metrics.receive_fps, + left_present_fps: left_metrics.present_fps, + left_server_fps: left_metrics.server_fps, + right_receive_fps: right_metrics.receive_fps, + right_present_fps: right_metrics.present_fps, + right_server_fps: right_metrics.server_fps, + dropped_frames: left_metrics + .dropped_frames + .saturating_add(right_metrics.dropped_frames), + queue_depth: left_metrics.queue_depth.max(right_metrics.queue_depth), + }); +} + #[cfg(not(coverage))] fn largest_monitor_size() -> (u32, u32) { let (width, height) = enumerate_monitors() @@ -385,6 +520,12 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> { let (relay_tx, relay_rx) = std::sync::mpsc::channel::(); let relay_request_in_flight = Rc::new(Cell::new(false)); let (caps_tx, caps_rx) = std::sync::mpsc::channel::(); + let caps_request_in_flight = Rc::new(Cell::new(false)); + let diagnostics_network = Rc::new(RefCell::new(NetworkTelemetry::default())); + let next_diagnostics_probe = + Rc::new(Cell::new(Instant::now() + Duration::from_millis(250))); + let next_diagnostics_sample = + Rc::new(Cell::new(Instant::now() + Duration::from_secs(1))); let (clipboard_tx, clipboard_rx) = std::sync::mpsc::channel::(); let (log_tx, log_rx) = std::sync::mpsc::channel::(); @@ -470,6 +611,7 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> { selected_server_addr(&server_entry, server_addr.as_ref()), Duration::ZERO, ); + caps_request_in_flight.set(true); request_handshake_caps( caps_tx.clone(), selected_server_addr(&server_entry, server_addr.as_ref()), @@ -486,6 +628,7 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> { let preview = preview.clone(); let power_tx = power_tx.clone(); let caps_tx = caps_tx.clone(); + let caps_request_in_flight = Rc::clone(&caps_request_in_flight); server_entry.connect_changed(move |_| { let server_addr = selected_server_addr(&server_entry_read, server_addr_fallback.as_ref()); @@ -503,6 +646,7 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> { server_addr.clone(), Duration::from_millis(150), ); + caps_request_in_flight.set(true); request_handshake_caps(caps_tx.clone(), server_addr, Duration::from_millis(150)); }); } @@ -1222,6 +1366,11 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> { let relay_request_in_flight = Rc::clone(&relay_request_in_flight); let preview = preview.clone(); let power_tx = power_tx.clone(); + let caps_tx = caps_tx.clone(); + let caps_request_in_flight = Rc::clone(&caps_request_in_flight); + let diagnostics_network = Rc::clone(&diagnostics_network); + let next_diagnostics_probe = Rc::clone(&next_diagnostics_probe); + let next_diagnostics_sample = Rc::clone(&next_diagnostics_sample); let log_tx = log_tx.clone(); glib::timeout_add_local(Duration::from_millis(180), move || { let child_running = reap_exited_child(&child_proc); @@ -1435,10 +1584,18 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> { } while let Ok(message) = caps_rx.try_recv() { + caps_request_in_flight.set(false); match message { - CapsMessage::Refresh(caps) => { + CapsMessage::Refresh(probe_result) => { + diagnostics_network.borrow_mut().record(&probe_result); + let caps = probe_result.caps; { let mut state = state.borrow_mut(); + if probe_result.reachable { + state.set_server_available(true); + } else if child_proc.borrow().is_none() { + state.set_server_available(false); + } state.set_server_version(caps.server_version.clone()); } if let (Some(width), Some(height)) = @@ -1490,6 +1647,26 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> { } } + let now = Instant::now(); + if now >= next_diagnostics_probe.get() && !caps_request_in_flight.get() { + caps_request_in_flight.set(true); + let server_addr = + selected_server_addr(&server_entry, server_addr_fallback.as_ref()); + request_handshake_caps(caps_tx.clone(), server_addr, Duration::ZERO); + next_diagnostics_probe.set(now + Duration::from_secs(2)); + } + + if now >= next_diagnostics_sample.get() { + let network = diagnostics_network.borrow_mut().snapshot(); + record_diagnostics_sample( + &widgets, + &state.borrow(), + preview.as_ref().map(|preview| preview.as_ref()), + network, + ); + next_diagnostics_sample.set(now + Duration::from_secs(1)); + } + let child_running = child_proc.borrow().is_some(); refresh_launcher_ui(&widgets, &state.borrow(), child_running); refresh_test_buttons(&widgets, &mut tests.borrow_mut()); diff --git a/common/proto/lesavka.proto b/common/proto/lesavka.proto index 519186b..b8d3b00 100644 --- a/common/proto/lesavka.proto +++ b/common/proto/lesavka.proto @@ -12,7 +12,15 @@ message MonitorRequest { uint32 requested_height = 4; uint32 requested_fps = 5; } -message VideoPacket { uint32 id = 1; uint64 pts = 2; bytes data = 3; } +message VideoPacket { + uint32 id = 1; + uint64 pts = 2; + bytes data = 3; + uint64 seq = 4; + uint32 effective_fps = 5; + uint64 dropped_total = 6; + uint32 queue_depth = 7; +} message AudioPacket { uint32 id = 1; uint64 pts = 2; bytes data = 3; } message ResetUsbReply { bool ok = 1; } // true = success diff --git a/server/src/video.rs b/server/src/video.rs index b8bbe50..06569f8 100644 --- a/server/src/video.rs +++ b/server/src/video.rs @@ -322,6 +322,7 @@ pub async fn eye_ball_with_request( id: id.min(1), pts: 0, data: vec![0, 0, 0, 1, 0x65, 0x88, 0x84], + ..Default::default() })); Ok(VideoStream { @@ -381,10 +382,12 @@ pub async fn eye_ball_with_request( let effective_fps = Arc::new(AtomicU32::new(target_fps)); let dropped_window = Arc::new(AtomicU64::new(0)); + let dropped_total = Arc::new(AtomicU64::new(0)); let sent_window = Arc::new(AtomicU64::new(0)); let last_adjust_sec = Arc::new(AtomicU64::new(0)); let wait_for_idr = Arc::new(AtomicBool::new(false)); let last_sent = Arc::new(AtomicU64::new(0)); + let packet_seq = Arc::new(AtomicU64::new(0)); let queue_buffers = env_u32("LESAVKA_EYE_QUEUE_BUFFERS", 8).max(1); let appsink_buffers = env_u32("LESAVKA_EYE_APPSINK_BUFFERS", 8).max(1); @@ -461,6 +464,9 @@ pub async fn eye_ball_with_request( } let eye_name = eye.to_string(); + let dropped_total_for_cb = Arc::clone(&dropped_total); + let packet_seq_for_cb = Arc::clone(&packet_seq); + let effective_fps_for_cb = Arc::clone(&effective_fps); sink.set_callbacks( gst_app::AppSinkCallbacks::builder() .new_sample(move |sink| { @@ -546,7 +552,16 @@ pub async fn eye_ball_with_request( let data = map.as_slice().to_vec(); let size = data.len(); - let pkt = VideoPacket { id, pts: pts_us, data }; + let seq = packet_seq_for_cb.fetch_add(1, Ordering::Relaxed) + 1; + let pkt = VideoPacket { + id, + pts: pts_us, + data, + seq, + effective_fps: effective_fps_for_cb.load(Ordering::Relaxed).max(1), + dropped_total: dropped_total_for_cb.load(Ordering::Relaxed), + queue_depth: (chan_capacity.saturating_sub(tx.capacity())) as u32, + }; match tx.try_send(Ok(pkt)) { Ok(_) => { sent_window.fetch_add(1, Ordering::Relaxed); @@ -557,6 +572,7 @@ pub async fn eye_ball_with_request( } Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { dropped_window.fetch_add(1, Ordering::Relaxed); + dropped_total_for_cb.fetch_add(1, Ordering::Relaxed); wait_for_idr.store(true, Ordering::Relaxed); static DROP_CNT: AtomicU64 = AtomicU64::new(0); let dropped = DROP_CNT.fetch_add(1, Ordering::Relaxed); diff --git a/testing/tests/client_output_video_include_contract.rs b/testing/tests/client_output_video_include_contract.rs index f775fda..3993f8b 100644 --- a/testing/tests/client_output_video_include_contract.rs +++ b/testing/tests/client_output_video_include_contract.rs @@ -107,6 +107,7 @@ mod video_include_contract { id: 0, pts: 5, data: vec![0, 0, 0, 1, 0x67], + ..Default::default() }); } }); @@ -198,6 +199,7 @@ exit 0 id: 1, pts: 12_345, data: vec![0, 0, 0, 1, 0x65], + ..Default::default() }); } @@ -229,11 +231,13 @@ exit 0 id: 0, pts: 100, data: vec![0, 0, 0, 1, 0x65], + ..Default::default() }); window.push_packet(VideoPacket { id: 1, pts: 101, data: vec![0, 0, 0, 1, 0x67], + ..Default::default() }); } Err(err) => { diff --git a/testing/tests/client_runtime_smoke_contract.rs b/testing/tests/client_runtime_smoke_contract.rs index 440bbd2..f72e966 100644 --- a/testing/tests/client_runtime_smoke_contract.rs +++ b/testing/tests/client_runtime_smoke_contract.rs @@ -130,6 +130,7 @@ fn monitor_window_constructor_and_push_are_stable() { id: 0, pts: 0, data: vec![0, 0, 0, 1, 0x65], + ..Default::default() }); } Err(err) => { diff --git a/testing/tests/server_main_binary_extra_contract.rs b/testing/tests/server_main_binary_extra_contract.rs index 100ec8a..af10a1b 100644 --- a/testing/tests/server_main_binary_extra_contract.rs +++ b/testing/tests/server_main_binary_extra_contract.rs @@ -394,6 +394,7 @@ mod server_main_binary_extra { id: 2, pts: 1, data: vec![0, 1, 2, 3], + ..Default::default() }) .await .expect("send camera packet"); @@ -478,6 +479,7 @@ mod server_main_binary_extra { id: 2, pts: 42, data: vec![9, 8, 7], + ..Default::default() }; let mut guarded = GuardedVideoStream { inner: stream::iter(vec![Ok(packet.clone())]), diff --git a/testing/tests/server_video_include_contract.rs b/testing/tests/server_video_include_contract.rs index 134b52d..3cf0d07 100644 --- a/testing/tests/server_video_include_contract.rs +++ b/testing/tests/server_video_include_contract.rs @@ -39,6 +39,7 @@ mod video_include_contract { id: 1, pts: 42, data: vec![1, 2, 3, 4], + ..Default::default() })) .await .expect("send packet"); diff --git a/testing/tests/server_video_sink_smoke_contract.rs b/testing/tests/server_video_sink_smoke_contract.rs index e499fb7..9d87916 100644 --- a/testing/tests/server_video_sink_smoke_contract.rs +++ b/testing/tests/server_video_sink_smoke_contract.rs @@ -32,6 +32,7 @@ fn webcam_sink_constructor_is_stable_for_missing_uvc_device() { id: 2, pts: 0, data: vec![0xFF, 0xD8, 0xFF, 0xD9], + ..Default::default() }), Err(err) => assert!(!err.to_string().trim().is_empty()), } @@ -47,6 +48,7 @@ fn hdmi_sink_constructor_and_push_are_stable_with_override() { id: 2, pts: 0, data: vec![0, 0, 0, 1, 0x65], + ..Default::default() }), Err(err) => assert!(!err.to_string().trim().is_empty()), } @@ -63,6 +65,7 @@ fn camera_relay_hdmi_constructor_and_feed_are_stable() { id: 2, pts: 123, data: vec![0xFF, 0xD8, 0xFF, 0xD9], + ..Default::default() }), Err(err) => assert!(!err.to_string().trim().is_empty()), } @@ -78,6 +81,7 @@ fn camera_relay_uvc_constructor_is_stable_for_missing_device() { id: 2, pts: 321, data: vec![0xFF, 0xD8, 0xFF, 0xD9], + ..Default::default() }), Err(err) => assert!(!err.to_string().trim().is_empty()), } @@ -92,6 +96,7 @@ fn webcam_sink_h264_constructor_path_is_stable() { id: 3, pts: 55, data: vec![0, 0, 0, 1, 0x65], + ..Default::default() }), Err(err) => assert!(!err.to_string().trim().is_empty()), } @@ -107,6 +112,7 @@ fn hdmi_sink_mjpeg_constructor_path_is_stable() { id: 4, pts: 99, data: vec![0xFF, 0xD8, 0xFF, 0xD9], + ..Default::default() }), Err(err) => assert!(!err.to_string().trim().is_empty()), } diff --git a/testing/tests/server_video_sinks_include_contract.rs b/testing/tests/server_video_sinks_include_contract.rs index d8f7857..eca2b9c 100644 --- a/testing/tests/server_video_sinks_include_contract.rs +++ b/testing/tests/server_video_sinks_include_contract.rs @@ -80,6 +80,7 @@ mod video_sinks_include_contract { id: 8, pts: 1, data: vec![0xFF, 0xD8, 0xFF, 0xD9], + ..Default::default() }); } }); @@ -95,6 +96,7 @@ mod video_sinks_include_contract { id: 9, pts: 2, data: vec![0xFF, 0xD8, 0xFF, 0xD9], + ..Default::default() }); } } @@ -109,6 +111,7 @@ mod video_sinks_include_contract { id: 3, pts: 3, data: vec![0, 0, 0, 1, 0x65, 0x88, 0x84], + ..Default::default() }); } });