lesavka: add live stream telemetry

This commit is contained in:
Brad Stein 2026-04-16 21:18:34 -03:00
parent 77f80398ce
commit e8670a1c58
13 changed files with 688 additions and 24 deletions

View File

@ -2,7 +2,7 @@
#![forbid(unsafe_code)]
use lesavka_common::lesavka::{self as pb, handshake_client::HandshakeClient};
use std::time::Duration;
use std::time::{Duration, Instant};
use tokio::time::timeout;
use tonic::{Code, transport::Endpoint};
use tracing::{info, warn};
@ -23,6 +23,14 @@ pub struct PeerCaps {
pub eye_fps: Option<u32>,
}
#[derive(Default, Clone, Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub struct HandshakeProbe {
pub caps: PeerCaps,
pub rtt_ms: Option<f32>,
pub reachable: bool,
}
fn likely_port_typo_hint(uri: &str) -> Option<&'static str> {
if uri.contains(":5005") && !uri.contains(":50051") {
Some("possible typo: lesavka server listens on port 50051")
@ -82,6 +90,66 @@ pub async fn negotiate(uri: &str) -> PeerCaps {
}
}
#[cfg(coverage)]
pub async fn probe(uri: &str) -> HandshakeProbe {
if likely_port_typo_hint(uri).is_some() {
return HandshakeProbe::default();
}
let started = Instant::now();
let ep = match Endpoint::from_shared(uri.to_owned()) {
Ok(ep) => ep
.tcp_nodelay(true)
.http2_keep_alive_interval(Duration::from_secs(15))
.connect_timeout(Duration::from_secs(5)),
Err(_) => return HandshakeProbe::default(),
};
let channel = match timeout(Duration::from_secs(8), ep.connect()).await {
Ok(Ok(channel)) => channel,
_ => return HandshakeProbe::default(),
};
let mut cli = HandshakeClient::new(channel);
let rtt_ms = started.elapsed().as_secs_f32() * 1000.0;
match timeout(Duration::from_secs(5), cli.get_capabilities(pb::Empty {})).await {
Ok(Ok(rsp)) => {
let rsp = rsp.get_ref();
HandshakeProbe {
caps: PeerCaps {
camera: rsp.camera,
microphone: rsp.microphone,
server_version: (!rsp.server_version.is_empty())
.then_some(rsp.server_version.clone()),
camera_output: (!rsp.camera_output.is_empty())
.then_some(rsp.camera_output.clone()),
camera_codec: (!rsp.camera_codec.is_empty())
.then_some(rsp.camera_codec.clone()),
camera_width: (rsp.camera_width != 0).then_some(rsp.camera_width),
camera_height: (rsp.camera_height != 0).then_some(rsp.camera_height),
camera_fps: (rsp.camera_fps != 0).then_some(rsp.camera_fps),
eye_width: (rsp.eye_width != 0).then_some(rsp.eye_width),
eye_height: (rsp.eye_height != 0).then_some(rsp.eye_height),
eye_fps: (rsp.eye_fps != 0).then_some(rsp.eye_fps),
},
rtt_ms: Some(rtt_ms),
reachable: true,
}
}
Ok(Err(e)) if e.code() == Code::Unimplemented => HandshakeProbe {
caps: PeerCaps::default(),
rtt_ms: Some(rtt_ms),
reachable: true,
},
Ok(Err(_)) => HandshakeProbe {
caps: PeerCaps::default(),
rtt_ms: Some(rtt_ms),
reachable: true,
},
Err(_) => HandshakeProbe::default(),
}
}
#[cfg(not(coverage))]
pub async fn negotiate(uri: &str) -> PeerCaps {
info!(%uri, "🤝 dial handshake");
@ -196,9 +264,93 @@ pub async fn negotiate(uri: &str) -> PeerCaps {
PeerCaps::default()
}
#[cfg(not(coverage))]
pub async fn probe(uri: &str) -> HandshakeProbe {
info!(%uri, "🧪 probing handshake");
let Some(hint) = likely_port_typo_hint(uri) else {
let started = Instant::now();
let ep = match Endpoint::from_shared(uri.to_owned()) {
Ok(ep) => ep
.tcp_nodelay(true)
.http2_keep_alive_interval(Duration::from_secs(15))
.connect_timeout(Duration::from_secs(5)),
Err(e) => {
warn!("🧪 invalid probe endpoint '{uri}': {e}");
return HandshakeProbe::default();
}
};
let channel = match timeout(Duration::from_secs(8), ep.connect()).await {
Ok(Ok(channel)) => channel,
Ok(Err(e)) => {
warn!("🧪 handshake probe connect failed: {e}");
return HandshakeProbe::default();
}
Err(_) => {
warn!("🧪 handshake probe timed out");
return HandshakeProbe::default();
}
};
let mut cli = HandshakeClient::new(channel);
let rtt_ms = started.elapsed().as_secs_f32() * 1000.0;
return match timeout(Duration::from_secs(5), cli.get_capabilities(pb::Empty {})).await {
Ok(Ok(rsp)) => {
let rsp = rsp.get_ref();
let caps = PeerCaps {
camera: rsp.camera,
microphone: rsp.microphone,
server_version: (!rsp.server_version.is_empty())
.then_some(rsp.server_version.clone()),
camera_output: (!rsp.camera_output.is_empty())
.then_some(rsp.camera_output.clone()),
camera_codec: (!rsp.camera_codec.is_empty())
.then_some(rsp.camera_codec.clone()),
camera_width: (rsp.camera_width != 0).then_some(rsp.camera_width),
camera_height: (rsp.camera_height != 0).then_some(rsp.camera_height),
camera_fps: (rsp.camera_fps != 0).then_some(rsp.camera_fps),
eye_width: (rsp.eye_width != 0).then_some(rsp.eye_width),
eye_height: (rsp.eye_height != 0).then_some(rsp.eye_height),
eye_fps: (rsp.eye_fps != 0).then_some(rsp.eye_fps),
};
info!(rtt_ms, ?caps, "🧪 handshake probe ok");
HandshakeProbe {
caps,
rtt_ms: Some(rtt_ms),
reachable: true,
}
}
Ok(Err(e)) if e.code() == Code::Unimplemented => {
warn!("🧪 handshake probe reached a server without the handshake service");
HandshakeProbe {
caps: PeerCaps::default(),
rtt_ms: Some(rtt_ms),
reachable: true,
}
}
Ok(Err(e)) => {
warn!("🧪 handshake probe RPC failed: {e}");
HandshakeProbe {
caps: PeerCaps::default(),
rtt_ms: Some(rtt_ms),
reachable: true,
}
}
Err(_) => {
warn!("🧪 handshake probe RPC timed out");
HandshakeProbe::default()
}
};
};
warn!("🧪 handshake probe endpoint '{uri}' looks wrong ({hint})");
HandshakeProbe::default()
}
#[cfg(test)]
mod tests {
use super::{PeerCaps, likely_port_typo_hint, negotiate};
use super::{HandshakeProbe, PeerCaps, likely_port_typo_hint, negotiate, probe};
#[test]
fn likely_port_typo_hint_flags_common_port_mistype() {
@ -220,4 +372,10 @@ mod tests {
let caps = negotiate("http://127.0.0.1:5005").await;
assert_eq!(caps, PeerCaps::default());
}
#[tokio::test]
async fn probe_returns_defaults_for_port_typo_hint() {
let probe_result = probe("http://127.0.0.1:5005").await;
assert_eq!(probe_result, HandshakeProbe::default());
}
}

View File

@ -214,6 +214,7 @@ impl CameraCapture {
id: 2,
pts,
data: map.as_slice().to_vec(),
..Default::default()
})
}

View File

@ -7,9 +7,16 @@ use super::state::{InputRouting, LauncherState, ViewMode};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PerformanceSample {
pub rtt_ms: f32,
pub jitter_ms: f32,
pub input_latency_ms: f32,
pub left_fps: f32,
pub right_fps: f32,
pub probe_loss_pct: f32,
pub video_loss_pct: f32,
pub left_receive_fps: f32,
pub left_present_fps: f32,
pub left_server_fps: f32,
pub right_receive_fps: f32,
pub right_present_fps: f32,
pub right_server_fps: f32,
pub dropped_frames: u64,
pub queue_depth: u32,
}
@ -224,11 +231,18 @@ impl SnapshotReport {
for sample in &self.recent_samples {
let _ = writeln!(
text,
" rtt={:.1}ms input={:.1}ms left={:.1}fps right={:.1}fps dropped={} queue={}",
" rtt={:.1}ms jitter={:.1}ms input-floor={:.1}ms probe-loss={:.1}% video-loss={:.1}% left={:.1}/{:.1}/{:.1}fps right={:.1}/{:.1}/{:.1}fps dropped={} queue={}",
sample.rtt_ms,
sample.jitter_ms,
sample.input_latency_ms,
sample.left_fps,
sample.right_fps,
sample.probe_loss_pct,
sample.video_loss_pct,
sample.left_receive_fps,
sample.left_present_fps,
sample.left_server_fps,
sample.right_receive_fps,
sample.right_present_fps,
sample.right_server_fps,
sample.dropped_frames,
sample.queue_depth
);
@ -267,10 +281,38 @@ fn recommendations_for(state: &LauncherState, log: &DiagnosticsLog) -> Vec<Strin
}
if log.is_empty() {
items.push(
"Live RTT, jitter, packet-loss, and queue samples are the next diagnostics tranche; this panel currently reports launcher/session state."
"Live stream samples will appear here after the launcher collects a few probe windows. Leave the relay up for a few seconds to populate RTT, jitter, loss, and fps."
.to_string(),
);
}
if let Some(sample) = log.latest() {
if sample.probe_loss_pct >= 3.0 || sample.jitter_ms >= 18.0 {
items.push(
"Probe jitter/loss is elevated. A wired client connection or a gentler capture profile will usually help more than changing the breakout size."
.to_string(),
);
}
if sample.video_loss_pct >= 2.0 || sample.dropped_frames > 0 {
items.push(
"Video packets are arriving with gaps or server-side drops. Try 900p or 720p capture first, then watch whether dropped frames and video-loss fall."
.to_string(),
);
}
if sample.left_present_fps + 1.0 < sample.left_receive_fps
|| sample.right_present_fps + 1.0 < sample.right_receive_fps
{
items.push(
"The client is receiving more frames than it is presenting. That points at local decode/render pressure, so prefer lighter breakout sizes or hardware decode."
.to_string(),
);
}
if sample.queue_depth > 8 {
items.push(
"The preview queue is backing up. When queue depth climbs, expect laggy mouse feel and delayed visual response even if raw fps still looks okay."
.to_string(),
);
}
}
let heavy_capture = state.capture_sizes.iter().any(|preset| {
matches!(
preset,
@ -305,9 +347,16 @@ mod tests {
fn sample(n: u64) -> PerformanceSample {
PerformanceSample {
rtt_ms: 20.0 + n as f32,
jitter_ms: 3.0 + n as f32,
input_latency_ms: 10.0 + n as f32,
left_fps: 30.0,
right_fps: 30.0,
probe_loss_pct: n as f32,
video_loss_pct: (n as f32) * 0.5,
left_receive_fps: 30.0,
left_present_fps: 29.0,
left_server_fps: 30.0,
right_receive_fps: 30.0,
right_present_fps: 28.0,
right_server_fps: 30.0,
dropped_frames: n,
queue_depth: n as u32,
}

View File

@ -13,11 +13,13 @@ use gtk::{gdk, glib};
#[cfg(not(coverage))]
use lesavka_common::lesavka::{MonitorRequest, VideoPacket, relay_client::RelayClient};
#[cfg(not(coverage))]
use std::collections::VecDeque;
#[cfg(not(coverage))]
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
#[cfg(not(coverage))]
use std::sync::{Arc, Mutex};
#[cfg(not(coverage))]
use std::time::Duration;
use std::time::{Duration, Instant};
#[cfg(not(coverage))]
use tonic::{Request, transport::Channel};
#[cfg(not(coverage))]
@ -33,6 +35,8 @@ const DEFAULT_EYE_SOURCE_WIDTH: i32 = 1920;
const DEFAULT_EYE_SOURCE_HEIGHT: i32 = 1080;
#[cfg(not(coverage))]
const PREVIEW_IDLE_STATUS: &str = "Connect relay to preview.";
#[cfg(not(coverage))]
const TELEMETRY_WINDOW: Duration = Duration::from_secs(5);
#[cfg(not(coverage))]
pub struct LauncherPreview {
@ -57,6 +61,18 @@ pub enum PreviewSurface {
Window,
}
#[cfg(not(coverage))]
#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub struct PreviewMetricsSnapshot {
pub receive_fps: f32,
pub present_fps: f32,
pub server_fps: f32,
pub jitter_ms: f32,
pub packet_loss_pct: f32,
pub dropped_frames: u64,
pub queue_depth: u32,
}
#[cfg(not(coverage))]
#[derive(Clone, Copy, Debug)]
struct PreviewProfile {
@ -194,6 +210,27 @@ impl LauncherPreview {
}
}
pub fn snapshot_metrics(
&self,
monitor_id: usize,
surface: PreviewSurface,
) -> Option<PreviewMetricsSnapshot> {
match surface {
PreviewSurface::Inline => self
.inline_feeds
.lock()
.ok()
.and_then(|feeds| feeds.get(monitor_id).cloned())
.map(|feed| feed.snapshot_metrics()),
PreviewSurface::Window => self
.window_feeds
.lock()
.ok()
.and_then(|feeds| feeds.get(monitor_id).cloned())
.map(|feed| feed.snapshot_metrics()),
}
}
pub fn set_capture_profile(
&self,
monitor_id: usize,
@ -329,6 +366,7 @@ struct SharedPreviewState {
clear_picture: bool,
last_logged_error: Option<String>,
last_logged_status: Option<String>,
telemetry: PreviewTelemetry,
}
#[cfg(not(coverage))]
@ -341,6 +379,7 @@ impl SharedPreviewState {
clear_picture: true,
last_logged_error: None,
last_logged_status: None,
telemetry: PreviewTelemetry::default(),
}
}
@ -361,6 +400,7 @@ impl SharedPreviewState {
}
fn push_frame(&mut self, frame: PreviewFrame) {
self.telemetry.record_presented_frame();
self.latest = Some(frame);
self.clear_picture = false;
self.last_logged_error = None;
@ -371,6 +411,113 @@ impl SharedPreviewState {
}
}
#[cfg(not(coverage))]
#[derive(Debug, Default)]
struct PreviewTelemetry {
packet_times: VecDeque<Instant>,
frame_times: VecDeque<Instant>,
packet_intervals_ms: VecDeque<(Instant, f32)>,
packet_losses: VecDeque<(Instant, u64)>,
dropped_deltas: VecDeque<(Instant, u64)>,
last_packet_at: Option<Instant>,
last_seq: Option<u64>,
last_dropped_total: Option<u64>,
latest_server_fps: u32,
latest_queue_depth: u32,
}
#[cfg(not(coverage))]
impl PreviewTelemetry {
fn record_packet(&mut self, seq: u64, server_fps: u32, dropped_total: u64, queue_depth: u32) {
self.record_packet_at(Instant::now(), seq, server_fps, dropped_total, queue_depth);
}
fn record_packet_at(
&mut self,
now: Instant,
seq: u64,
server_fps: u32,
dropped_total: u64,
queue_depth: u32,
) {
self.trim(now);
self.packet_times.push_back(now);
if let Some(previous) = self.last_packet_at.replace(now) {
self.packet_intervals_ms.push_back((
now,
now.saturating_duration_since(previous).as_secs_f32() * 1000.0,
));
}
if seq > 0 {
if let Some(previous_seq) = self.last_seq
&& seq > previous_seq + 1
{
self.packet_losses
.push_back((now, seq.saturating_sub(previous_seq + 1)));
}
self.last_seq = Some(seq);
}
if let Some(previous_dropped) = self.last_dropped_total
&& dropped_total > previous_dropped
{
self.dropped_deltas
.push_back((now, dropped_total.saturating_sub(previous_dropped)));
}
self.last_dropped_total = Some(dropped_total);
self.latest_server_fps = server_fps.max(1);
self.latest_queue_depth = queue_depth;
self.trim(now);
}
fn record_presented_frame(&mut self) {
self.record_presented_frame_at(Instant::now());
}
fn record_presented_frame_at(&mut self, now: Instant) {
self.trim(now);
self.frame_times.push_back(now);
}
fn snapshot(&mut self) -> PreviewMetricsSnapshot {
self.snapshot_at(Instant::now())
}
fn snapshot_at(&mut self, now: Instant) -> PreviewMetricsSnapshot {
self.trim(now);
let receive_fps = events_per_second(&self.packet_times, now);
let present_fps = events_per_second(&self.frame_times, now);
let delivered = self.packet_times.len() as u64;
let packet_losses: u64 = self.packet_losses.iter().map(|(_, loss)| *loss).sum();
let packet_loss_pct = if delivered + packet_losses == 0 {
0.0
} else {
packet_losses as f32 * 100.0 / (delivered + packet_losses) as f32
};
let dropped_frames: u64 = self
.dropped_deltas
.iter()
.map(|(_, dropped)| *dropped)
.sum();
PreviewMetricsSnapshot {
receive_fps,
present_fps,
server_fps: self.latest_server_fps as f32,
jitter_ms: compute_jitter_ms(&self.packet_intervals_ms),
packet_loss_pct,
dropped_frames,
queue_depth: self.latest_queue_depth,
}
}
fn trim(&mut self, now: Instant) {
trim_instant_queue(&mut self.packet_times, now);
trim_instant_queue(&mut self.frame_times, now);
trim_value_queue(&mut self.packet_intervals_ms, now);
trim_value_queue(&mut self.packet_losses, now);
trim_value_queue(&mut self.dropped_deltas, now);
}
}
#[cfg(not(coverage))]
impl PreviewFeed {
fn spawn(
@ -498,6 +645,13 @@ impl PreviewFeed {
active_bindings,
}
}
fn snapshot_metrics(&self) -> PreviewMetricsSnapshot {
self.shared
.lock()
.map(|mut shared| shared.telemetry.snapshot())
.unwrap_or_default()
}
}
#[cfg(not(coverage))]
@ -674,7 +828,10 @@ fn run_preview_feed(
)
.await
{
Ok(Ok(Some(pkt))) => push_preview_packet(&appsrc, pkt),
Ok(Ok(Some(pkt))) => {
record_preview_packet(&shared, &pkt);
push_preview_packet(&appsrc, pkt);
}
Ok(Ok(None)) => {
set_shared_status(
&shared,
@ -964,6 +1121,18 @@ fn push_preview_packet(appsrc: &gst_app::AppSrc, pkt: VideoPacket) {
let _ = appsrc.push_buffer(buf);
}
#[cfg(not(coverage))]
fn record_preview_packet(shared: &Arc<Mutex<SharedPreviewState>>, pkt: &VideoPacket) {
if let Ok(mut slot) = shared.lock() {
slot.telemetry.record_packet(
pkt.seq,
pkt.effective_fps,
pkt.dropped_total,
pkt.queue_depth,
);
}
}
#[cfg(not(coverage))]
fn sample_to_frame(sample: &gst::Sample) -> Option<PreviewFrame> {
let caps = sample.caps()?;
@ -999,12 +1168,60 @@ fn preview_dimension(var: &str, default: i32) -> i32 {
.unwrap_or(default)
}
#[cfg(not(coverage))]
fn events_per_second(events: &VecDeque<Instant>, now: Instant) -> f32 {
let Some(oldest) = events.front().copied() else {
return 0.0;
};
let span = now
.saturating_duration_since(oldest)
.as_secs_f32()
.clamp(0.25, TELEMETRY_WINDOW.as_secs_f32());
events.len() as f32 / span
}
#[cfg(not(coverage))]
fn trim_instant_queue(queue: &mut VecDeque<Instant>, now: Instant) {
while let Some(oldest) = queue.front().copied() {
if now.saturating_duration_since(oldest) > TELEMETRY_WINDOW {
let _ = queue.pop_front();
} else {
break;
}
}
}
#[cfg(not(coverage))]
fn trim_value_queue<T>(queue: &mut VecDeque<(Instant, T)>, now: Instant) {
while let Some((oldest, _)) = queue.front() {
if now.saturating_duration_since(*oldest) > TELEMETRY_WINDOW {
let _ = queue.pop_front();
} else {
break;
}
}
}
#[cfg(not(coverage))]
fn compute_jitter_ms(samples: &VecDeque<(Instant, f32)>) -> f32 {
if samples.len() < 2 {
return 0.0;
}
let mean = samples.iter().map(|(_, value)| *value).sum::<f32>() / samples.len() as f32;
samples
.iter()
.map(|(_, value)| (value - mean).abs())
.sum::<f32>()
/ samples.len() as f32
}
#[cfg(test)]
mod tests {
use super::{
DEFAULT_EYE_SOURCE_HEIGHT, DEFAULT_EYE_SOURCE_WIDTH, PREVIEW_HEIGHT, PREVIEW_WIDTH,
PreviewSurface,
PreviewSurface, PreviewTelemetry,
};
use std::time::{Duration, Instant};
#[test]
fn inline_preview_profile_uses_existing_defaults() {
@ -1027,4 +1244,25 @@ mod tests {
assert_eq!(profile.requested_fps, 30);
assert_eq!(profile.max_bitrate_kbit, 12_000);
}
#[test]
fn preview_telemetry_reports_fps_jitter_loss_and_drop_metrics() {
let mut telemetry = PreviewTelemetry::default();
let start = Instant::now();
telemetry.record_packet_at(start, 1, 30, 0, 1);
telemetry.record_presented_frame_at(start + Duration::from_millis(5));
telemetry.record_packet_at(start + Duration::from_millis(33), 2, 30, 0, 1);
telemetry.record_presented_frame_at(start + Duration::from_millis(37));
telemetry.record_packet_at(start + Duration::from_millis(80), 4, 27, 2, 3);
telemetry.record_presented_frame_at(start + Duration::from_millis(90));
let snapshot = telemetry.snapshot_at(start + Duration::from_millis(120));
assert!(snapshot.receive_fps >= 12.0);
assert!(snapshot.present_fps >= 12.0);
assert_eq!(snapshot.server_fps, 27.0);
assert!(snapshot.jitter_ms > 0.0);
assert!(snapshot.packet_loss_pct > 0.0);
assert_eq!(snapshot.dropped_frames, 2);
assert_eq!(snapshot.queue_depth, 3);
}
}

View File

@ -5,7 +5,7 @@ use {
super::clipboard::send_clipboard_text_to_remote,
super::device_test::{DeviceTestController, DeviceTestKind},
super::devices::DeviceCatalog,
super::diagnostics::quality_probe_command,
super::diagnostics::{PerformanceSample, quality_probe_command},
super::launcher_clipboard_control_path,
super::launcher_focus_signal_path,
super::power::{fetch_capture_power, set_capture_power_mode},
@ -24,15 +24,16 @@ use {
stop_child_process, toggle_key_label, update_test_action_result,
write_input_routing_request,
},
crate::handshake::{PeerCaps, negotiate},
crate::handshake::{HandshakeProbe, probe},
crate::output::display::enumerate_monitors,
gtk::glib,
gtk::prelude::*,
lesavka_common::lesavka::CapturePowerCommand,
std::cell::{Cell, RefCell},
std::collections::VecDeque,
std::process::Command,
std::rc::Rc,
std::time::Duration,
std::time::{Duration, Instant},
};
#[cfg(not(coverage))]
@ -48,7 +49,7 @@ enum RelayMessage {
#[cfg(not(coverage))]
enum CapsMessage {
Refresh(PeerCaps),
Refresh(HandshakeProbe),
}
#[cfg(not(coverage))]
@ -56,6 +57,86 @@ enum ClipboardMessage {
Finished(std::result::Result<String, String>),
}
#[cfg(not(coverage))]
const NETWORK_TELEMETRY_WINDOW: Duration = Duration::from_secs(8);
#[cfg(not(coverage))]
#[derive(Default)]
struct NetworkTelemetry {
rtt_samples: VecDeque<(Instant, f32)>,
failures: VecDeque<Instant>,
}
#[cfg(not(coverage))]
#[derive(Clone, Copy, Debug, Default)]
struct NetworkSnapshot {
rtt_ms: f32,
jitter_ms: f32,
probe_loss_pct: f32,
}
#[cfg(not(coverage))]
impl NetworkTelemetry {
fn record(&mut self, probe: &HandshakeProbe) {
let now = Instant::now();
self.trim(now);
if let Some(rtt_ms) = probe.rtt_ms {
self.rtt_samples.push_back((now, rtt_ms));
} else {
self.failures.push_back(now);
}
self.trim(now);
}
fn snapshot(&mut self) -> NetworkSnapshot {
let now = Instant::now();
self.trim(now);
let rtt_ms = self.rtt_samples.back().map(|(_, rtt)| *rtt).unwrap_or(0.0);
let jitter_ms = network_jitter_ms(&self.rtt_samples);
let probe_count = self.rtt_samples.len() + self.failures.len();
let probe_loss_pct = if probe_count == 0 {
0.0
} else {
self.failures.len() as f32 * 100.0 / probe_count as f32
};
NetworkSnapshot {
rtt_ms,
jitter_ms,
probe_loss_pct,
}
}
fn trim(&mut self, now: Instant) {
while let Some((oldest, _)) = self.rtt_samples.front().copied() {
if now.saturating_duration_since(oldest) > NETWORK_TELEMETRY_WINDOW {
let _ = self.rtt_samples.pop_front();
} else {
break;
}
}
while let Some(oldest) = self.failures.front().copied() {
if now.saturating_duration_since(oldest) > NETWORK_TELEMETRY_WINDOW {
let _ = self.failures.pop_front();
} else {
break;
}
}
}
}
#[cfg(not(coverage))]
fn network_jitter_ms(samples: &VecDeque<(Instant, f32)>) -> f32 {
if samples.len() < 2 {
return 0.0;
}
let mean = samples.iter().map(|(_, value)| *value).sum::<f32>() / samples.len() as f32;
samples
.iter()
.map(|(_, value)| (value - mean).abs())
.sum::<f32>()
/ samples.len() as f32
}
#[cfg(not(coverage))]
fn request_capture_power_refresh(
power_tx: std::sync::mpsc::Sender<PowerMessage>,
@ -96,11 +177,11 @@ fn request_handshake_caps(
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build();
let caps = match runtime {
Ok(runtime) => runtime.block_on(negotiate(&server_addr)),
Err(_) => PeerCaps::default(),
let probe = match runtime {
Ok(runtime) => runtime.block_on(probe(&server_addr)),
Err(_) => HandshakeProbe::default(),
};
let _ = caps_tx.send(CapsMessage::Refresh(caps));
let _ = caps_tx.send(CapsMessage::Refresh(probe));
});
}
@ -135,6 +216,60 @@ fn refresh_eye_feed_controls(
}
}
#[cfg(not(coverage))]
fn record_diagnostics_sample(
widgets: &super::ui_components::LauncherWidgets,
state: &LauncherState,
preview: Option<&super::preview::LauncherPreview>,
network: NetworkSnapshot,
) {
let left_metrics = preview
.and_then(|preview| {
preview.snapshot_metrics(
0,
match state.display_surface(0) {
DisplaySurface::Preview => super::preview::PreviewSurface::Inline,
DisplaySurface::Window => super::preview::PreviewSurface::Window,
},
)
})
.unwrap_or_default();
let right_metrics = preview
.and_then(|preview| {
preview.snapshot_metrics(
1,
match state.display_surface(1) {
DisplaySurface::Preview => super::preview::PreviewSurface::Inline,
DisplaySurface::Window => super::preview::PreviewSurface::Window,
},
)
})
.unwrap_or_default();
widgets
.diagnostics_log
.borrow_mut()
.record(PerformanceSample {
rtt_ms: network.rtt_ms,
jitter_ms: network.jitter_ms,
input_latency_ms: network.rtt_ms * 0.5,
probe_loss_pct: network.probe_loss_pct,
video_loss_pct: left_metrics
.packet_loss_pct
.max(right_metrics.packet_loss_pct),
left_receive_fps: left_metrics.receive_fps,
left_present_fps: left_metrics.present_fps,
left_server_fps: left_metrics.server_fps,
right_receive_fps: right_metrics.receive_fps,
right_present_fps: right_metrics.present_fps,
right_server_fps: right_metrics.server_fps,
dropped_frames: left_metrics
.dropped_frames
.saturating_add(right_metrics.dropped_frames),
queue_depth: left_metrics.queue_depth.max(right_metrics.queue_depth),
});
}
#[cfg(not(coverage))]
fn largest_monitor_size() -> (u32, u32) {
let (width, height) = enumerate_monitors()
@ -385,6 +520,12 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> {
let (relay_tx, relay_rx) = std::sync::mpsc::channel::<RelayMessage>();
let relay_request_in_flight = Rc::new(Cell::new(false));
let (caps_tx, caps_rx) = std::sync::mpsc::channel::<CapsMessage>();
let caps_request_in_flight = Rc::new(Cell::new(false));
let diagnostics_network = Rc::new(RefCell::new(NetworkTelemetry::default()));
let next_diagnostics_probe =
Rc::new(Cell::new(Instant::now() + Duration::from_millis(250)));
let next_diagnostics_sample =
Rc::new(Cell::new(Instant::now() + Duration::from_secs(1)));
let (clipboard_tx, clipboard_rx) = std::sync::mpsc::channel::<ClipboardMessage>();
let (log_tx, log_rx) = std::sync::mpsc::channel::<String>();
@ -470,6 +611,7 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> {
selected_server_addr(&server_entry, server_addr.as_ref()),
Duration::ZERO,
);
caps_request_in_flight.set(true);
request_handshake_caps(
caps_tx.clone(),
selected_server_addr(&server_entry, server_addr.as_ref()),
@ -486,6 +628,7 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> {
let preview = preview.clone();
let power_tx = power_tx.clone();
let caps_tx = caps_tx.clone();
let caps_request_in_flight = Rc::clone(&caps_request_in_flight);
server_entry.connect_changed(move |_| {
let server_addr =
selected_server_addr(&server_entry_read, server_addr_fallback.as_ref());
@ -503,6 +646,7 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> {
server_addr.clone(),
Duration::from_millis(150),
);
caps_request_in_flight.set(true);
request_handshake_caps(caps_tx.clone(), server_addr, Duration::from_millis(150));
});
}
@ -1222,6 +1366,11 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> {
let relay_request_in_flight = Rc::clone(&relay_request_in_flight);
let preview = preview.clone();
let power_tx = power_tx.clone();
let caps_tx = caps_tx.clone();
let caps_request_in_flight = Rc::clone(&caps_request_in_flight);
let diagnostics_network = Rc::clone(&diagnostics_network);
let next_diagnostics_probe = Rc::clone(&next_diagnostics_probe);
let next_diagnostics_sample = Rc::clone(&next_diagnostics_sample);
let log_tx = log_tx.clone();
glib::timeout_add_local(Duration::from_millis(180), move || {
let child_running = reap_exited_child(&child_proc);
@ -1435,10 +1584,18 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> {
}
while let Ok(message) = caps_rx.try_recv() {
caps_request_in_flight.set(false);
match message {
CapsMessage::Refresh(caps) => {
CapsMessage::Refresh(probe_result) => {
diagnostics_network.borrow_mut().record(&probe_result);
let caps = probe_result.caps;
{
let mut state = state.borrow_mut();
if probe_result.reachable {
state.set_server_available(true);
} else if child_proc.borrow().is_none() {
state.set_server_available(false);
}
state.set_server_version(caps.server_version.clone());
}
if let (Some(width), Some(height)) =
@ -1490,6 +1647,26 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> {
}
}
let now = Instant::now();
if now >= next_diagnostics_probe.get() && !caps_request_in_flight.get() {
caps_request_in_flight.set(true);
let server_addr =
selected_server_addr(&server_entry, server_addr_fallback.as_ref());
request_handshake_caps(caps_tx.clone(), server_addr, Duration::ZERO);
next_diagnostics_probe.set(now + Duration::from_secs(2));
}
if now >= next_diagnostics_sample.get() {
let network = diagnostics_network.borrow_mut().snapshot();
record_diagnostics_sample(
&widgets,
&state.borrow(),
preview.as_ref().map(|preview| preview.as_ref()),
network,
);
next_diagnostics_sample.set(now + Duration::from_secs(1));
}
let child_running = child_proc.borrow().is_some();
refresh_launcher_ui(&widgets, &state.borrow(), child_running);
refresh_test_buttons(&widgets, &mut tests.borrow_mut());

View File

@ -12,7 +12,15 @@ message MonitorRequest {
uint32 requested_height = 4;
uint32 requested_fps = 5;
}
message VideoPacket { uint32 id = 1; uint64 pts = 2; bytes data = 3; }
message VideoPacket {
uint32 id = 1;
uint64 pts = 2;
bytes data = 3;
uint64 seq = 4;
uint32 effective_fps = 5;
uint64 dropped_total = 6;
uint32 queue_depth = 7;
}
message AudioPacket { uint32 id = 1; uint64 pts = 2; bytes data = 3; }
message ResetUsbReply { bool ok = 1; } // true = success

View File

@ -322,6 +322,7 @@ pub async fn eye_ball_with_request(
id: id.min(1),
pts: 0,
data: vec![0, 0, 0, 1, 0x65, 0x88, 0x84],
..Default::default()
}));
Ok(VideoStream {
@ -381,10 +382,12 @@ pub async fn eye_ball_with_request(
let effective_fps = Arc::new(AtomicU32::new(target_fps));
let dropped_window = Arc::new(AtomicU64::new(0));
let dropped_total = Arc::new(AtomicU64::new(0));
let sent_window = Arc::new(AtomicU64::new(0));
let last_adjust_sec = Arc::new(AtomicU64::new(0));
let wait_for_idr = Arc::new(AtomicBool::new(false));
let last_sent = Arc::new(AtomicU64::new(0));
let packet_seq = Arc::new(AtomicU64::new(0));
let queue_buffers = env_u32("LESAVKA_EYE_QUEUE_BUFFERS", 8).max(1);
let appsink_buffers = env_u32("LESAVKA_EYE_APPSINK_BUFFERS", 8).max(1);
@ -461,6 +464,9 @@ pub async fn eye_ball_with_request(
}
let eye_name = eye.to_string();
let dropped_total_for_cb = Arc::clone(&dropped_total);
let packet_seq_for_cb = Arc::clone(&packet_seq);
let effective_fps_for_cb = Arc::clone(&effective_fps);
sink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample(move |sink| {
@ -546,7 +552,16 @@ pub async fn eye_ball_with_request(
let data = map.as_slice().to_vec();
let size = data.len();
let pkt = VideoPacket { id, pts: pts_us, data };
let seq = packet_seq_for_cb.fetch_add(1, Ordering::Relaxed) + 1;
let pkt = VideoPacket {
id,
pts: pts_us,
data,
seq,
effective_fps: effective_fps_for_cb.load(Ordering::Relaxed).max(1),
dropped_total: dropped_total_for_cb.load(Ordering::Relaxed),
queue_depth: (chan_capacity.saturating_sub(tx.capacity())) as u32,
};
match tx.try_send(Ok(pkt)) {
Ok(_) => {
sent_window.fetch_add(1, Ordering::Relaxed);
@ -557,6 +572,7 @@ pub async fn eye_ball_with_request(
}
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
dropped_window.fetch_add(1, Ordering::Relaxed);
dropped_total_for_cb.fetch_add(1, Ordering::Relaxed);
wait_for_idr.store(true, Ordering::Relaxed);
static DROP_CNT: AtomicU64 = AtomicU64::new(0);
let dropped = DROP_CNT.fetch_add(1, Ordering::Relaxed);

View File

@ -107,6 +107,7 @@ mod video_include_contract {
id: 0,
pts: 5,
data: vec![0, 0, 0, 1, 0x67],
..Default::default()
});
}
});
@ -198,6 +199,7 @@ exit 0
id: 1,
pts: 12_345,
data: vec![0, 0, 0, 1, 0x65],
..Default::default()
});
}
@ -229,11 +231,13 @@ exit 0
id: 0,
pts: 100,
data: vec![0, 0, 0, 1, 0x65],
..Default::default()
});
window.push_packet(VideoPacket {
id: 1,
pts: 101,
data: vec![0, 0, 0, 1, 0x67],
..Default::default()
});
}
Err(err) => {

View File

@ -130,6 +130,7 @@ fn monitor_window_constructor_and_push_are_stable() {
id: 0,
pts: 0,
data: vec![0, 0, 0, 1, 0x65],
..Default::default()
});
}
Err(err) => {

View File

@ -394,6 +394,7 @@ mod server_main_binary_extra {
id: 2,
pts: 1,
data: vec![0, 1, 2, 3],
..Default::default()
})
.await
.expect("send camera packet");
@ -478,6 +479,7 @@ mod server_main_binary_extra {
id: 2,
pts: 42,
data: vec![9, 8, 7],
..Default::default()
};
let mut guarded = GuardedVideoStream {
inner: stream::iter(vec![Ok(packet.clone())]),

View File

@ -39,6 +39,7 @@ mod video_include_contract {
id: 1,
pts: 42,
data: vec![1, 2, 3, 4],
..Default::default()
}))
.await
.expect("send packet");

View File

@ -32,6 +32,7 @@ fn webcam_sink_constructor_is_stable_for_missing_uvc_device() {
id: 2,
pts: 0,
data: vec![0xFF, 0xD8, 0xFF, 0xD9],
..Default::default()
}),
Err(err) => assert!(!err.to_string().trim().is_empty()),
}
@ -47,6 +48,7 @@ fn hdmi_sink_constructor_and_push_are_stable_with_override() {
id: 2,
pts: 0,
data: vec![0, 0, 0, 1, 0x65],
..Default::default()
}),
Err(err) => assert!(!err.to_string().trim().is_empty()),
}
@ -63,6 +65,7 @@ fn camera_relay_hdmi_constructor_and_feed_are_stable() {
id: 2,
pts: 123,
data: vec![0xFF, 0xD8, 0xFF, 0xD9],
..Default::default()
}),
Err(err) => assert!(!err.to_string().trim().is_empty()),
}
@ -78,6 +81,7 @@ fn camera_relay_uvc_constructor_is_stable_for_missing_device() {
id: 2,
pts: 321,
data: vec![0xFF, 0xD8, 0xFF, 0xD9],
..Default::default()
}),
Err(err) => assert!(!err.to_string().trim().is_empty()),
}
@ -92,6 +96,7 @@ fn webcam_sink_h264_constructor_path_is_stable() {
id: 3,
pts: 55,
data: vec![0, 0, 0, 1, 0x65],
..Default::default()
}),
Err(err) => assert!(!err.to_string().trim().is_empty()),
}
@ -107,6 +112,7 @@ fn hdmi_sink_mjpeg_constructor_path_is_stable() {
id: 4,
pts: 99,
data: vec![0xFF, 0xD8, 0xFF, 0xD9],
..Default::default()
}),
Err(err) => assert!(!err.to_string().trim().is_empty()),
}

View File

@ -80,6 +80,7 @@ mod video_sinks_include_contract {
id: 8,
pts: 1,
data: vec![0xFF, 0xD8, 0xFF, 0xD9],
..Default::default()
});
}
});
@ -95,6 +96,7 @@ mod video_sinks_include_contract {
id: 9,
pts: 2,
data: vec![0xFF, 0xD8, 0xFF, 0xD9],
..Default::default()
});
}
}
@ -109,6 +111,7 @@ mod video_sinks_include_contract {
id: 3,
pts: 3,
data: vec![0, 0, 0, 1, 0x65, 0x88, 0x84],
..Default::default()
});
}
});