fix(media): bound upstream latency and restore idle eye placeholder
This commit is contained in:
parent
e0b2b70b29
commit
ce11632c89
6
Cargo.lock
generated
6
Cargo.lock
generated
@ -1642,7 +1642,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
|
||||
|
||||
[[package]]
|
||||
name = "lesavka_client"
|
||||
version = "0.12.3"
|
||||
version = "0.12.4"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-stream",
|
||||
@ -1676,7 +1676,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lesavka_common"
|
||||
version = "0.12.3"
|
||||
version = "0.12.4"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"base64",
|
||||
@ -1688,7 +1688,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lesavka_server"
|
||||
version = "0.12.3"
|
||||
version = "0.12.4"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"base64",
|
||||
|
||||
@ -4,7 +4,7 @@ path = "src/main.rs"
|
||||
|
||||
[package]
|
||||
name = "lesavka_client"
|
||||
version = "0.12.3"
|
||||
version = "0.12.4"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
|
||||
@ -19,8 +19,7 @@ use winit::{
|
||||
};
|
||||
|
||||
use lesavka_common::lesavka::{
|
||||
AudioPacket, Empty, KeyboardReport, MonitorRequest, MouseReport, VideoPacket,
|
||||
relay_client::RelayClient,
|
||||
Empty, KeyboardReport, MonitorRequest, MouseReport, VideoPacket, relay_client::RelayClient,
|
||||
};
|
||||
|
||||
#[cfg(not(coverage))]
|
||||
|
||||
@ -58,6 +58,10 @@ impl LesavkaClientApp {
|
||||
let caps = handshake::negotiate(&self.server_addr).await;
|
||||
tracing::info!("🤝 server capabilities = {:?}", caps);
|
||||
let camera_cfg = app_support::camera_config_from_caps(&caps);
|
||||
let uplink_telemetry = crate::uplink_telemetry::UplinkTelemetryPublisher::from_env(
|
||||
caps.camera && std::env::var("LESAVKA_CAM_DISABLE").is_err(),
|
||||
caps.microphone && std::env::var("LESAVKA_MIC_DISABLE").is_err(),
|
||||
);
|
||||
|
||||
/*────────── persistent gRPC channels ──────────*/
|
||||
let hid_ep = Channel::from_shared(self.server_addr.clone())?
|
||||
@ -223,6 +227,8 @@ impl LesavkaClientApp {
|
||||
}
|
||||
let ep = vid_ep.clone();
|
||||
let cam_source = std::env::var("LESAVKA_CAM_SOURCE").ok();
|
||||
let cam_telemetry =
|
||||
uplink_telemetry.handle(crate::uplink_telemetry::UpstreamStreamKind::Camera);
|
||||
tokio::spawn(async move {
|
||||
let result = tokio::task::spawn_blocking(move || {
|
||||
CameraCapture::new(cam_source.as_deref(), camera_cfg)
|
||||
@ -231,14 +237,20 @@ impl LesavkaClientApp {
|
||||
match result {
|
||||
Ok(Ok(cam)) => {
|
||||
let cam = Arc::new(cam);
|
||||
tokio::spawn(Self::cam_loop(ep, cam));
|
||||
tokio::spawn(Self::cam_loop(ep, cam, cam_telemetry.clone()));
|
||||
}
|
||||
Ok(Err(err)) => {
|
||||
cam_telemetry.record_disconnect(format!(
|
||||
"webcam uplink setup failed: {err:#}"
|
||||
));
|
||||
warn!(
|
||||
"📸 webcam uplink is unavailable for this relay session; continuing without StreamCamera: {err:#}"
|
||||
);
|
||||
}
|
||||
Err(err) => {
|
||||
cam_telemetry.record_disconnect(format!(
|
||||
"webcam uplink setup task failed: {err}"
|
||||
));
|
||||
warn!(
|
||||
"📸 webcam uplink setup task failed before StreamCamera could start: {err}"
|
||||
);
|
||||
@ -248,19 +260,27 @@ impl LesavkaClientApp {
|
||||
}
|
||||
if caps.microphone && std::env::var("LESAVKA_MIC_DISABLE").is_err() {
|
||||
let ep = vid_ep.clone();
|
||||
let mic_telemetry =
|
||||
uplink_telemetry.handle(crate::uplink_telemetry::UpstreamStreamKind::Microphone);
|
||||
tokio::spawn(async move {
|
||||
let result = tokio::task::spawn_blocking(MicrophoneCapture::new).await;
|
||||
match result {
|
||||
Ok(Ok(mic)) => {
|
||||
let mic = Arc::new(mic);
|
||||
tokio::spawn(Self::voice_loop(ep, mic));
|
||||
tokio::spawn(Self::voice_loop(ep, mic, mic_telemetry.clone()));
|
||||
}
|
||||
Ok(Err(err)) => {
|
||||
mic_telemetry.record_disconnect(format!(
|
||||
"microphone uplink setup failed: {err:#}"
|
||||
));
|
||||
warn!(
|
||||
"🎤 microphone uplink is unavailable for this relay session; continuing without StreamMicrophone: {err:#}"
|
||||
);
|
||||
}
|
||||
Err(err) => {
|
||||
mic_telemetry.record_disconnect(format!(
|
||||
"microphone uplink setup task failed: {err}"
|
||||
));
|
||||
warn!(
|
||||
"🎤 microphone uplink setup task failed before StreamMicrophone could start: {err}"
|
||||
);
|
||||
|
||||
@ -1,33 +1,71 @@
|
||||
impl LesavkaClientApp {
|
||||
/*──────────────── mic stream ─────────────────*/
|
||||
#[cfg(not(coverage))]
|
||||
async fn voice_loop(ep: Channel, mic: Arc<MicrophoneCapture>) {
|
||||
async fn voice_loop(
|
||||
ep: Channel,
|
||||
mic: Arc<MicrophoneCapture>,
|
||||
telemetry: crate::uplink_telemetry::UplinkTelemetryHandle,
|
||||
) {
|
||||
let mut delay = Duration::from_secs(1);
|
||||
static FAIL_CNT: AtomicUsize = AtomicUsize::new(0);
|
||||
loop {
|
||||
let mut cli = RelayClient::new(ep.clone());
|
||||
|
||||
// 1. create a Tokio MPSC channel
|
||||
let (tx, rx) = tokio::sync::mpsc::channel::<AudioPacket>(256);
|
||||
loop {
|
||||
telemetry.record_reconnect_attempt();
|
||||
let mut cli = RelayClient::new(ep.clone());
|
||||
let queue = crate::uplink_fresh_queue::FreshPacketQueue::new(AUDIO_UPLINK_QUEUE);
|
||||
let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>();
|
||||
|
||||
// 2. spawn a real thread that does the blocking `pull()`
|
||||
let mic_clone = mic.clone();
|
||||
let telemetry_thread = telemetry.clone();
|
||||
let queue_thread = queue.clone();
|
||||
std::thread::spawn(move || {
|
||||
let mut age_tracker = PacketAgeTracker::default();
|
||||
while stop_rx.try_recv().is_err() {
|
||||
if let Some(pkt) = mic_clone.pull() {
|
||||
trace!("🎤📤 cli {} bytes → gRPC", pkt.data.len());
|
||||
let _ = tx.blocking_send(pkt);
|
||||
let enqueue_age = age_tracker.packet_age(pkt.pts);
|
||||
let stats = queue_thread.push(pkt, enqueue_age);
|
||||
if stats.dropped_queue_full > 0 {
|
||||
telemetry_thread.record_queue_full_drop(stats.dropped_queue_full);
|
||||
}
|
||||
telemetry_thread.record_enqueue(
|
||||
queue_depth_u32(stats.queue_depth),
|
||||
duration_ms(enqueue_age),
|
||||
0.0,
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// 3. turn `rx` into an async stream for gRPC
|
||||
let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
|
||||
let queue_stream = queue.clone();
|
||||
let telemetry_stream = telemetry.clone();
|
||||
let outbound = async_stream::stream! {
|
||||
loop {
|
||||
let next = queue_stream.pop_fresh().await;
|
||||
if next.dropped_stale > 0 {
|
||||
telemetry_stream.record_stale_drop(next.dropped_stale);
|
||||
}
|
||||
if let Some(packet) = next.packet {
|
||||
telemetry_stream.record_streamed(
|
||||
queue_depth_u32(next.queue_depth),
|
||||
duration_ms(next.delivery_age),
|
||||
);
|
||||
yield packet;
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
match cli.stream_microphone(Request::new(outbound)).await {
|
||||
Ok(mut resp) => while resp.get_mut().message().await.transpose().is_some() {},
|
||||
Ok(mut resp) => {
|
||||
delay = Duration::from_secs(1);
|
||||
telemetry.record_connected();
|
||||
while resp.get_mut().message().await.transpose().is_some() {}
|
||||
telemetry.record_disconnect("microphone uplink stream ended");
|
||||
}
|
||||
Err(e) => {
|
||||
// first failure → warn, subsequent ones → debug
|
||||
telemetry.record_disconnect(format!("microphone uplink connect failed: {e}"));
|
||||
if FAIL_CNT.fetch_add(1, Ordering::Relaxed) == 0 {
|
||||
warn!("❌🎤 connect failed: {e}");
|
||||
warn!("⚠️🎤 further microphone‑stream failures will be logged at DEBUG");
|
||||
@ -37,6 +75,8 @@ impl LesavkaClientApp {
|
||||
delay = app_support::next_delay(delay);
|
||||
}
|
||||
}
|
||||
|
||||
queue.close();
|
||||
let _ = stop_tx.send(());
|
||||
tokio::time::sleep(delay).await;
|
||||
}
|
||||
@ -44,15 +84,26 @@ impl LesavkaClientApp {
|
||||
|
||||
/*──────────────── cam stream ───────────────────*/
|
||||
#[cfg(not(coverage))]
|
||||
async fn cam_loop(ep: Channel, cam: Arc<CameraCapture>) {
|
||||
async fn cam_loop(
|
||||
ep: Channel,
|
||||
cam: Arc<CameraCapture>,
|
||||
telemetry: crate::uplink_telemetry::UplinkTelemetryHandle,
|
||||
) {
|
||||
let mut delay = Duration::from_secs(1);
|
||||
|
||||
loop {
|
||||
telemetry.record_reconnect_attempt();
|
||||
let mut cli = RelayClient::new(ep.clone());
|
||||
let (tx, rx) = tokio::sync::mpsc::channel::<VideoPacket>(256);
|
||||
let queue = crate::uplink_fresh_queue::FreshPacketQueue::new(VIDEO_UPLINK_QUEUE);
|
||||
let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>();
|
||||
|
||||
let cam_worker = std::thread::spawn({
|
||||
let cam = cam.clone();
|
||||
move || loop {
|
||||
let telemetry = telemetry.clone();
|
||||
let queue = queue.clone();
|
||||
move || {
|
||||
let mut age_tracker = PacketAgeTracker::default();
|
||||
loop {
|
||||
if stop_rx.try_recv().is_ok() {
|
||||
break;
|
||||
}
|
||||
@ -60,40 +111,115 @@ impl LesavkaClientApp {
|
||||
std::thread::sleep(Duration::from_millis(5));
|
||||
continue;
|
||||
};
|
||||
// TRACE every 120 frames only
|
||||
static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
|
||||
static CNT: std::sync::atomic::AtomicU64 =
|
||||
std::sync::atomic::AtomicU64::new(0);
|
||||
let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
if n < 10 || n.is_multiple_of(120) {
|
||||
tracing::trace!("📸 cli frame#{n} {} B", pkt.data.len());
|
||||
}
|
||||
tracing::trace!("📸⬆️ sent webcam AU pts={} {} B", pkt.pts, pkt.data.len());
|
||||
if tx.blocking_send(pkt).is_err() {
|
||||
break;
|
||||
let enqueue_age = age_tracker.packet_age(pkt.pts);
|
||||
let stats = queue.push(pkt, enqueue_age);
|
||||
if stats.dropped_queue_full > 0 {
|
||||
telemetry.record_queue_full_drop(stats.dropped_queue_full);
|
||||
}
|
||||
telemetry.record_enqueue(
|
||||
queue_depth_u32(stats.queue_depth),
|
||||
duration_ms(enqueue_age),
|
||||
0.0,
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
|
||||
let queue_stream = queue.clone();
|
||||
let telemetry_stream = telemetry.clone();
|
||||
let outbound = async_stream::stream! {
|
||||
loop {
|
||||
let next = queue_stream.pop_fresh().await;
|
||||
if next.dropped_stale > 0 {
|
||||
telemetry_stream.record_stale_drop(next.dropped_stale);
|
||||
}
|
||||
if let Some(packet) = next.packet {
|
||||
telemetry_stream.record_streamed(
|
||||
queue_depth_u32(next.queue_depth),
|
||||
duration_ms(next.delivery_age),
|
||||
);
|
||||
yield packet;
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
match cli.stream_camera(Request::new(outbound)).await {
|
||||
Ok(mut resp) => {
|
||||
delay = Duration::from_secs(1); // got a stream → reset
|
||||
delay = Duration::from_secs(1);
|
||||
telemetry.record_connected();
|
||||
while resp.get_mut().message().await.transpose().is_some() {}
|
||||
telemetry.record_disconnect("camera uplink stream ended");
|
||||
}
|
||||
Err(e) if e.code() == tonic::Code::Unimplemented => {
|
||||
tracing::warn!("📸 server does not support StreamCamera – giving up");
|
||||
telemetry.record_disconnect("camera uplink unavailable on server");
|
||||
queue.close();
|
||||
let _ = stop_tx.send(());
|
||||
let _ = cam_worker.join();
|
||||
return; // stop the task completely (#3)
|
||||
return;
|
||||
}
|
||||
Err(e) => {
|
||||
telemetry.record_disconnect(format!("camera uplink connect failed: {e}"));
|
||||
tracing::warn!("❌📸 connect failed: {e:?}");
|
||||
delay = app_support::next_delay(delay); // back-off (#2)
|
||||
delay = app_support::next_delay(delay);
|
||||
}
|
||||
}
|
||||
|
||||
queue.close();
|
||||
let _ = stop_tx.send(());
|
||||
let _ = cam_worker.join();
|
||||
tokio::time::sleep(delay).await;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#[cfg(not(coverage))]
|
||||
const VIDEO_UPLINK_QUEUE: crate::uplink_fresh_queue::FreshQueueConfig =
|
||||
crate::uplink_fresh_queue::FreshQueueConfig {
|
||||
capacity: 8,
|
||||
max_age: Duration::from_millis(350),
|
||||
};
|
||||
|
||||
#[cfg(not(coverage))]
|
||||
const AUDIO_UPLINK_QUEUE: crate::uplink_fresh_queue::FreshQueueConfig =
|
||||
crate::uplink_fresh_queue::FreshQueueConfig {
|
||||
capacity: 16,
|
||||
max_age: Duration::from_millis(400),
|
||||
};
|
||||
|
||||
#[cfg(not(coverage))]
|
||||
fn queue_depth_u32(depth: usize) -> u32 {
|
||||
depth.try_into().unwrap_or(u32::MAX)
|
||||
}
|
||||
|
||||
#[cfg(not(coverage))]
|
||||
fn duration_ms(duration: Duration) -> f32 {
|
||||
duration.as_secs_f32() * 1_000.0
|
||||
}
|
||||
|
||||
#[cfg(not(coverage))]
|
||||
#[derive(Default)]
|
||||
struct PacketAgeTracker {
|
||||
origin: Option<Instant>,
|
||||
}
|
||||
|
||||
#[cfg(not(coverage))]
|
||||
impl PacketAgeTracker {
|
||||
fn packet_age(&mut self, pts_us: u64) -> Duration {
|
||||
let pts = Duration::from_micros(pts_us);
|
||||
let now = Instant::now();
|
||||
let origin = self
|
||||
.origin
|
||||
.get_or_insert_with(|| now.checked_sub(pts).unwrap_or(now));
|
||||
now.saturating_duration_since(*origin + pts)
|
||||
}
|
||||
}
|
||||
|
||||
@ -2,6 +2,8 @@ use serde::{Deserialize, Serialize};
|
||||
use std::collections::VecDeque;
|
||||
use std::fmt::Write as _;
|
||||
|
||||
use crate::uplink_telemetry::UpstreamStreamTelemetry;
|
||||
|
||||
use super::{
|
||||
devices::CameraMode,
|
||||
state::{CaptureSizeChoice, FeedSourcePreset, InputRouting, LauncherState, ViewMode},
|
||||
@ -48,6 +50,8 @@ pub struct PerformanceSample {
|
||||
pub right_stream_caps_label: String,
|
||||
pub right_decoded_caps_label: String,
|
||||
pub right_rendered_caps_label: String,
|
||||
pub upstream_camera: UpstreamStreamTelemetry,
|
||||
pub upstream_microphone: UpstreamStreamTelemetry,
|
||||
pub dropped_frames: u64,
|
||||
pub queue_depth: u32,
|
||||
}
|
||||
@ -154,6 +158,8 @@ pub struct SnapshotReport {
|
||||
pub media_channels: MediaChannelState,
|
||||
pub audio_gain_label: String,
|
||||
pub mic_gain_label: String,
|
||||
pub upstream_camera: UpstreamStreamTelemetry,
|
||||
pub upstream_microphone: UpstreamStreamTelemetry,
|
||||
pub selected_keyboard: Option<String>,
|
||||
pub selected_mouse: Option<String>,
|
||||
pub status: String,
|
||||
|
||||
@ -128,6 +128,53 @@ fn recommendations_for(state: &LauncherState, log: &DiagnosticsLog) -> Vec<Strin
|
||||
.to_string(),
|
||||
);
|
||||
}
|
||||
if sample.upstream_camera.enabled
|
||||
&& (sample.upstream_camera.latest_enqueue_age_ms >= 120.0
|
||||
|| sample.upstream_camera.enqueue_age_peak_ms >= 250.0
|
||||
|| sample.upstream_camera.enqueue_block_peak_ms >= 40.0)
|
||||
{
|
||||
items.push(
|
||||
"The webcam uplink queue is aging packets before they even leave the client. That points at backlog inside the relay child, so freshness-first dropping should help more than extra bitrate."
|
||||
.to_string(),
|
||||
);
|
||||
}
|
||||
if sample.upstream_microphone.enabled
|
||||
&& (sample.upstream_microphone.latest_enqueue_age_ms >= 80.0
|
||||
|| sample.upstream_microphone.enqueue_age_peak_ms >= 180.0
|
||||
|| sample.upstream_microphone.enqueue_block_peak_ms >= 25.0)
|
||||
{
|
||||
items.push(
|
||||
"The microphone uplink queue is aging live audio before transport. That is a direct path to lip-sync drift, so stale audio needs to be dropped instead of drained."
|
||||
.to_string(),
|
||||
);
|
||||
}
|
||||
if sample.upstream_camera.reconnect_count >= 3 || sample.upstream_microphone.reconnect_count >= 3
|
||||
{
|
||||
items.push(
|
||||
"The upstream media loops have already reconnected several times during this session. Treat repeated connect churn as part of the lag budget, not just a cosmetic log problem."
|
||||
.to_string(),
|
||||
);
|
||||
}
|
||||
if sample.upstream_camera.enabled
|
||||
&& (sample.upstream_camera.delivery_age_peak_ms >= 250.0
|
||||
|| sample.upstream_camera.dropped_queue_full_packets > 0
|
||||
|| sample.upstream_camera.dropped_stale_packets > 0)
|
||||
{
|
||||
items.push(
|
||||
"The webcam uplink is now choosing freshness over completeness. If delivery age or stale-drop counts keep climbing, the source path is outrunning the live-call budget and needs more than bitrate tweaks."
|
||||
.to_string(),
|
||||
);
|
||||
}
|
||||
if sample.upstream_microphone.enabled
|
||||
&& (sample.upstream_microphone.delivery_age_peak_ms >= 180.0
|
||||
|| sample.upstream_microphone.dropped_queue_full_packets > 0
|
||||
|| sample.upstream_microphone.dropped_stale_packets > 0)
|
||||
{
|
||||
items.push(
|
||||
"The microphone uplink is dropping or aging live chunks to stay current. A few drops are healthier than minutes of lag, but persistent churn means the upstream path is still unstable."
|
||||
.to_string(),
|
||||
);
|
||||
}
|
||||
if sample.client_process_cpu_pct >= 85.0 {
|
||||
items.push(if hardware_decode_active {
|
||||
"Client process CPU is high even though hardware decode is active. If motion still looks rough, favor lighter breakout layouts or a cheaper source mode before adding more bitrate."
|
||||
|
||||
@ -214,6 +214,12 @@ impl SnapshotReport {
|
||||
},
|
||||
audio_gain_label: state.audio_gain_label(),
|
||||
mic_gain_label: state.mic_gain_label(),
|
||||
upstream_camera: latest
|
||||
.map(|sample| sample.upstream_camera.clone())
|
||||
.unwrap_or_default(),
|
||||
upstream_microphone: latest
|
||||
.map(|sample| sample.upstream_microphone.clone())
|
||||
.unwrap_or_default(),
|
||||
selected_keyboard: state.devices.keyboard.clone(),
|
||||
selected_mouse: state.devices.mouse.clone(),
|
||||
status: state.status_line(),
|
||||
@ -334,6 +340,16 @@ impl SnapshotReport {
|
||||
self.mic_gain_label,
|
||||
self.media_channels.microphone
|
||||
);
|
||||
let _ = writeln!(
|
||||
text,
|
||||
" uplink camera: {}",
|
||||
uplink_summary(&self.upstream_camera)
|
||||
);
|
||||
let _ = writeln!(
|
||||
text,
|
||||
" uplink microphone: {}",
|
||||
uplink_summary(&self.upstream_microphone)
|
||||
);
|
||||
let _ = writeln!(
|
||||
text,
|
||||
" keyboard: {}",
|
||||
@ -388,6 +404,12 @@ impl SnapshotReport {
|
||||
sample.right_server_send_gap_peak_ms,
|
||||
sample.right_server_queue_peak
|
||||
);
|
||||
let _ = writeln!(
|
||||
text,
|
||||
" uplink: cam={} mic={}",
|
||||
uplink_summary(&sample.upstream_camera),
|
||||
uplink_summary(&sample.upstream_microphone)
|
||||
);
|
||||
}
|
||||
}
|
||||
let _ = writeln!(text);
|
||||
@ -408,3 +430,36 @@ impl SnapshotReport {
|
||||
text
|
||||
}
|
||||
}
|
||||
|
||||
fn uplink_summary(stream: &crate::uplink_telemetry::UpstreamStreamTelemetry) -> String {
|
||||
if !stream.enabled {
|
||||
return "disabled".to_string();
|
||||
}
|
||||
let connection = if stream.connected {
|
||||
"live"
|
||||
} else if stream.reconnect_count > 0 {
|
||||
"reconnecting"
|
||||
} else {
|
||||
"idle"
|
||||
};
|
||||
let error = if stream.last_error.is_empty() {
|
||||
"ok".to_string()
|
||||
} else {
|
||||
stream.last_error.clone()
|
||||
};
|
||||
format!(
|
||||
"{connection} queue={}/{} enq-age={:.0}/{:.0}ms delivery={:.0}/{:.0}ms block-peak={:.0}ms reconnects={} streamed={} drops(total/full/stale)={}/{}/{} error={error}",
|
||||
stream.queue_depth,
|
||||
stream.queue_peak,
|
||||
stream.latest_enqueue_age_ms,
|
||||
stream.enqueue_age_peak_ms,
|
||||
stream.latest_delivery_age_ms,
|
||||
stream.delivery_age_peak_ms,
|
||||
stream.enqueue_block_peak_ms,
|
||||
stream.reconnect_count,
|
||||
stream.packets_streamed,
|
||||
stream.dropped_packets,
|
||||
stream.dropped_queue_full_packets,
|
||||
stream.dropped_stale_packets
|
||||
)
|
||||
}
|
||||
|
||||
@ -2,6 +2,7 @@ use super::*;
|
||||
use crate::launcher::state::{
|
||||
CaptureSizePreset, DeviceSelection, DisplaySurface, FeedSourcePreset, LauncherState,
|
||||
};
|
||||
use crate::uplink_telemetry::UpstreamStreamTelemetry;
|
||||
|
||||
fn sample(n: u64) -> PerformanceSample {
|
||||
PerformanceSample {
|
||||
@ -50,6 +51,42 @@ fn sample(n: u64) -> PerformanceSample {
|
||||
"video/x-raw, format=(string)NV12, width=(int)1920, height=(int)1080".to_string(),
|
||||
right_rendered_caps_label:
|
||||
"video/x-raw, format=(string)RGBA, width=(int)1920, height=(int)1080".to_string(),
|
||||
upstream_camera: UpstreamStreamTelemetry {
|
||||
enabled: true,
|
||||
connected: true,
|
||||
reconnect_count: 2,
|
||||
queue_depth: 3,
|
||||
queue_peak: 7,
|
||||
latest_enqueue_age_ms: 14.0,
|
||||
enqueue_age_peak_ms: 48.0,
|
||||
enqueue_block_peak_ms: 5.0,
|
||||
packets_enqueued: 120,
|
||||
packets_streamed: 118,
|
||||
dropped_packets: 0,
|
||||
dropped_queue_full_packets: 0,
|
||||
dropped_stale_packets: 0,
|
||||
latest_delivery_age_ms: 22.0,
|
||||
delivery_age_peak_ms: 61.0,
|
||||
last_error: String::new(),
|
||||
},
|
||||
upstream_microphone: UpstreamStreamTelemetry {
|
||||
enabled: true,
|
||||
connected: true,
|
||||
reconnect_count: 1,
|
||||
queue_depth: 2,
|
||||
queue_peak: 5,
|
||||
latest_enqueue_age_ms: 11.0,
|
||||
enqueue_age_peak_ms: 22.0,
|
||||
enqueue_block_peak_ms: 3.0,
|
||||
packets_enqueued: 220,
|
||||
packets_streamed: 216,
|
||||
dropped_packets: 1,
|
||||
dropped_queue_full_packets: 1,
|
||||
dropped_stale_packets: 0,
|
||||
latest_delivery_age_ms: 19.0,
|
||||
delivery_age_peak_ms: 37.0,
|
||||
last_error: String::new(),
|
||||
},
|
||||
dropped_frames: n,
|
||||
queue_depth: n as u32,
|
||||
}
|
||||
@ -119,6 +156,8 @@ fn snapshot_report_contains_state_fields_and_samples() {
|
||||
assert!(report.left_stream_caps_label.contains("video/x-h264"));
|
||||
assert!(report.left_decoded_caps_label.contains("video/x-raw"));
|
||||
assert!(report.left_rendered_caps_label.contains("video/x-raw"));
|
||||
assert_eq!(report.upstream_camera.queue_peak, 7);
|
||||
assert_eq!(report.upstream_microphone.reconnect_count, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@ -185,6 +224,8 @@ fn snapshot_text_mentions_versions_profiles_and_recommendations() {
|
||||
assert!(text.contains("decoded caps:"));
|
||||
assert!(text.contains("rendered caps:"));
|
||||
assert!(text.contains("media staging"));
|
||||
assert!(text.contains("uplink camera:"));
|
||||
assert!(text.contains("uplink microphone:"));
|
||||
assert!(text.contains("current UI state"));
|
||||
assert!(text.contains("recommendations"));
|
||||
}
|
||||
@ -230,10 +271,66 @@ fn snapshot_text_renders_recent_samples_and_notes() {
|
||||
assert!(text.contains("server: unknown (reachable)"));
|
||||
assert!(text.contains("rtt=23.0ms"));
|
||||
assert!(text.contains("server=lx264enc:42/48/4"));
|
||||
assert!(text.contains("uplink: cam=live queue=3/7"));
|
||||
assert!(text.contains("notes"));
|
||||
assert!(text.contains("operator changed camera quality during the run"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn recommendations_call_out_upstream_queue_age_and_reconnect_churn() {
|
||||
let mut log = DiagnosticsLog::new(1);
|
||||
let mut stressed = sample(9);
|
||||
stressed.upstream_camera.latest_enqueue_age_ms = 180.0;
|
||||
stressed.upstream_camera.enqueue_age_peak_ms = 320.0;
|
||||
stressed.upstream_camera.enqueue_block_peak_ms = 55.0;
|
||||
stressed.upstream_camera.delivery_age_peak_ms = 420.0;
|
||||
stressed.upstream_camera.dropped_stale_packets = 4;
|
||||
stressed.upstream_camera.reconnect_count = 4;
|
||||
stressed.upstream_microphone.latest_enqueue_age_ms = 120.0;
|
||||
stressed.upstream_microphone.enqueue_age_peak_ms = 260.0;
|
||||
stressed.upstream_microphone.enqueue_block_peak_ms = 31.0;
|
||||
stressed.upstream_microphone.delivery_age_peak_ms = 240.0;
|
||||
stressed.upstream_microphone.dropped_queue_full_packets = 2;
|
||||
log.record(stressed);
|
||||
|
||||
let report = SnapshotReport::from_state(
|
||||
&LauncherState::new(),
|
||||
&log,
|
||||
quality_probe_command().to_string(),
|
||||
);
|
||||
|
||||
assert!(
|
||||
report
|
||||
.recommendations
|
||||
.iter()
|
||||
.any(|item| { item.contains("webcam uplink queue is aging packets") })
|
||||
);
|
||||
assert!(
|
||||
report
|
||||
.recommendations
|
||||
.iter()
|
||||
.any(|item| { item.contains("microphone uplink queue is aging live audio") })
|
||||
);
|
||||
assert!(
|
||||
report
|
||||
.recommendations
|
||||
.iter()
|
||||
.any(|item| { item.contains("upstream media loops have already reconnected") })
|
||||
);
|
||||
assert!(
|
||||
report
|
||||
.recommendations
|
||||
.iter()
|
||||
.any(|item| { item.contains("webcam uplink is now choosing freshness") })
|
||||
);
|
||||
assert!(
|
||||
report
|
||||
.recommendations
|
||||
.iter()
|
||||
.any(|item| { item.contains("microphone uplink is dropping or aging live chunks") })
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn snapshot_report_uses_effective_mirrored_capture_profile() {
|
||||
let mut state = LauncherState::new();
|
||||
|
||||
@ -289,19 +289,19 @@ fn server_chip_state_tracks_connection_not_just_reachability() {
|
||||
assert_eq!(server_version_label(&state), "-");
|
||||
|
||||
state.set_server_available(true);
|
||||
state.set_server_version(Some("0.12.3".to_string()));
|
||||
state.set_server_version(Some("0.12.4".to_string()));
|
||||
assert_eq!(server_light_state(&state, false), StatusLightState::Live);
|
||||
assert_eq!(server_version_label(&state), "v0.12.3");
|
||||
assert_eq!(server_version_label(&state), "v0.12.4");
|
||||
|
||||
assert_eq!(
|
||||
server_light_state(&state, true),
|
||||
StatusLightState::Connected
|
||||
);
|
||||
|
||||
state.set_server_version(Some("v0.12.4".to_string()));
|
||||
state.set_server_version(Some("v0.12.5".to_string()));
|
||||
assert_eq!(server_light_state(&state, false), StatusLightState::Warning);
|
||||
assert_eq!(server_light_state(&state, true), StatusLightState::Caution);
|
||||
assert_eq!(server_version_label(&state), "v0.12.4");
|
||||
assert_eq!(server_version_label(&state), "v0.12.5");
|
||||
|
||||
state.set_server_version(Some(" ".to_string()));
|
||||
assert_eq!(server_light_state(&state, false), StatusLightState::Idle);
|
||||
@ -413,6 +413,7 @@ fn dock_all_displays_to_preview_closes_popouts_and_resets_surfaces() {
|
||||
&DeviceCatalog::default(),
|
||||
&state_snapshot,
|
||||
);
|
||||
present_and_settle(&view.window);
|
||||
let child_proc = Rc::new(RefCell::new(None::<RelayChild>));
|
||||
|
||||
let left_binding = PreviewBinding::test_stub();
|
||||
@ -505,6 +506,73 @@ fn dock_all_displays_to_preview_handles_reentrant_close_callbacks() {
|
||||
assert_eq!(state.borrow().display_surface(0), DisplaySurface::Preview);
|
||||
}
|
||||
|
||||
#[gtk::test]
|
||||
#[serial]
|
||||
fn dock_display_to_preview_restores_closed_eye_placeholder_when_relay_is_idle() {
|
||||
if gtk::gdk::Display::default().is_none() {
|
||||
return;
|
||||
}
|
||||
|
||||
let app = gtk::Application::builder()
|
||||
.application_id("dev.lesavka.test-dock-placeholder")
|
||||
.build();
|
||||
let _ = app.register(None::<>k::gio::Cancellable>);
|
||||
|
||||
let state = Rc::new(RefCell::new(LauncherState::new()));
|
||||
state
|
||||
.borrow_mut()
|
||||
.set_display_surface(1, DisplaySurface::Window);
|
||||
let state_snapshot = state.borrow().clone();
|
||||
let view = build_launcher_view(
|
||||
&app,
|
||||
"http://127.0.0.1:50051",
|
||||
&DeviceCatalog::default(),
|
||||
&state_snapshot,
|
||||
);
|
||||
let child_proc = Rc::new(RefCell::new(None::<RelayChild>));
|
||||
|
||||
*view.widgets.display_panes[1].preview_binding.borrow_mut() = Some(PreviewBinding::test_stub());
|
||||
view.widgets.display_panes[1]
|
||||
.preview_placeholder
|
||||
.set_visible(false);
|
||||
|
||||
{
|
||||
let mut popouts = view.popouts.borrow_mut();
|
||||
popouts[1] = Some(PopoutWindowHandle {
|
||||
window: gtk::ApplicationWindow::builder()
|
||||
.application(&app)
|
||||
.title("Right")
|
||||
.build(),
|
||||
root: gtk::Box::new(gtk::Orientation::Vertical, 0),
|
||||
frame: gtk::AspectFrame::new(0.5, 0.5, 16.0 / 9.0, false),
|
||||
picture: gtk::Picture::new(),
|
||||
status_label: gtk::Label::new(None),
|
||||
binding: PreviewBinding::test_stub(),
|
||||
});
|
||||
}
|
||||
|
||||
dock_display_to_preview(&state, &child_proc, &view.popouts, &view.widgets, 1);
|
||||
|
||||
present_and_settle(&view.window);
|
||||
|
||||
assert_eq!(state.borrow().display_surface(1), DisplaySurface::Preview);
|
||||
assert!(view.popouts.borrow()[1].is_none());
|
||||
assert!(
|
||||
view.widgets.display_panes[1].picture.paintable().is_none(),
|
||||
"idle docked pane should not retain a stale preview texture"
|
||||
);
|
||||
assert!(
|
||||
view.widgets.display_panes[1]
|
||||
.preview_placeholder
|
||||
.is_visible(),
|
||||
"closed-eye placeholder should return when docking back after disconnect"
|
||||
);
|
||||
assert_eq!(
|
||||
view.widgets.display_panes[1].stream_status.text(),
|
||||
"Connect relay to preview."
|
||||
);
|
||||
}
|
||||
|
||||
fn realistic_device_catalog() -> DeviceCatalog {
|
||||
DeviceCatalog {
|
||||
cameras: vec!["usb-046d_Logitech_BRIO_5F6EB379-video-index0".to_string()],
|
||||
|
||||
@ -28,8 +28,8 @@ use {
|
||||
refresh_launcher_ui, refresh_test_buttons, routing_name, selected_combo_value,
|
||||
selected_server_addr, shutdown_launcher_runtime, spawn_client_process, stop_child_process,
|
||||
toggle_key_label, update_test_action_result, uplink_camera_preview_path,
|
||||
uplink_mic_level_path, write_audio_gain_request, write_input_routing_request,
|
||||
write_input_toggle_key_request, write_mic_gain_request,
|
||||
uplink_mic_level_path, uplink_telemetry_path, write_audio_gain_request,
|
||||
write_input_routing_request, write_input_toggle_key_request, write_mic_gain_request,
|
||||
},
|
||||
crate::handshake::{HandshakeProbe, probe},
|
||||
crate::output::display::enumerate_monitors,
|
||||
|
||||
@ -69,6 +69,7 @@ fn record_diagnostics_sample(
|
||||
widgets: &super::ui_components::LauncherWidgets,
|
||||
state: &LauncherState,
|
||||
preview: Option<&super::preview::LauncherPreview>,
|
||||
uplink: Option<&crate::uplink_telemetry::UplinkTelemetrySnapshot>,
|
||||
network: NetworkSnapshot,
|
||||
client_process_cpu_pct: f32,
|
||||
) {
|
||||
@ -148,6 +149,10 @@ fn record_diagnostics_sample(
|
||||
right_stream_caps_label: right_metrics.stream_caps_label.clone(),
|
||||
right_decoded_caps_label: right_metrics.decoded_caps_label.clone(),
|
||||
right_rendered_caps_label: right_metrics.rendered_caps_label.clone(),
|
||||
upstream_camera: uplink.map(|snapshot| snapshot.camera.clone()).unwrap_or_default(),
|
||||
upstream_microphone: uplink
|
||||
.map(|snapshot| snapshot.microphone.clone())
|
||||
.unwrap_or_default(),
|
||||
dropped_frames: left_metrics
|
||||
.dropped_frames
|
||||
.saturating_add(right_metrics.dropped_frames),
|
||||
|
||||
@ -25,6 +25,7 @@
|
||||
let log_tx = log_tx.clone();
|
||||
let camera_preview_path = uplink_camera_preview_path();
|
||||
let mic_level_path = uplink_mic_level_path();
|
||||
let uplink_telemetry_path = uplink_telemetry_path();
|
||||
glib::timeout_add_local(Duration::from_millis(180), move || {
|
||||
let child_running = reap_exited_child(&child_proc);
|
||||
if let Some(preview) = preview.as_ref() {
|
||||
@ -328,6 +329,8 @@
|
||||
|
||||
if now >= next_diagnostics_sample.get() {
|
||||
let network = diagnostics_network.borrow_mut().snapshot();
|
||||
let uplink =
|
||||
crate::uplink_telemetry::load_uplink_telemetry(&uplink_telemetry_path);
|
||||
let client_process_cpu_pct = diagnostics_process
|
||||
.borrow_mut()
|
||||
.sample_percent()
|
||||
@ -336,6 +339,7 @@
|
||||
&widgets,
|
||||
&state.borrow(),
|
||||
preview.as_ref().map(|preview| preview.as_ref()),
|
||||
uplink.as_ref(),
|
||||
network,
|
||||
client_process_cpu_pct,
|
||||
);
|
||||
|
||||
@ -87,6 +87,12 @@ pub fn uplink_mic_level_path() -> PathBuf {
|
||||
.unwrap_or_else(|_| PathBuf::from(DEFAULT_UPLINK_MIC_LEVEL_PATH))
|
||||
}
|
||||
|
||||
pub fn uplink_telemetry_path() -> PathBuf {
|
||||
std::env::var(UPLINK_TELEMETRY_ENV)
|
||||
.map(PathBuf::from)
|
||||
.unwrap_or_else(|_| PathBuf::from(DEFAULT_UPLINK_TELEMETRY_PATH))
|
||||
}
|
||||
|
||||
pub fn write_input_routing_request(path: &Path, routing: InputRouting) -> Result<()> {
|
||||
std::fs::write(
|
||||
path,
|
||||
|
||||
@ -187,6 +187,12 @@ pub fn dock_display_to_preview(
|
||||
state.set_display_surface(monitor_id, DisplaySurface::Preview);
|
||||
}
|
||||
let child_running = child_proc.borrow().is_some();
|
||||
if !child_running {
|
||||
let pane = &widgets.display_panes[monitor_id];
|
||||
pane.picture.set_paintable(Option::<&gdk::Paintable>::None);
|
||||
pane.preview_placeholder.set_visible(true);
|
||||
pane.stream_status.set_text("Connect relay to preview.");
|
||||
}
|
||||
let state_snapshot = state.borrow().clone();
|
||||
refresh_launcher_ui(widgets, &state_snapshot, child_running);
|
||||
}
|
||||
@ -229,6 +235,13 @@ pub fn dock_all_displays_to_preview(
|
||||
}
|
||||
|
||||
let child_running = child_proc.borrow().is_some();
|
||||
if !child_running {
|
||||
for pane in &widgets.display_panes {
|
||||
pane.picture.set_paintable(Option::<&gdk::Paintable>::None);
|
||||
pane.preview_placeholder.set_visible(true);
|
||||
pane.stream_status.set_text("Connect relay to preview.");
|
||||
}
|
||||
}
|
||||
let state_snapshot = state.borrow().clone();
|
||||
refresh_launcher_ui(widgets, &state_snapshot, child_running);
|
||||
}
|
||||
@ -243,9 +256,10 @@ pub fn refresh_display_pane(pane: &DisplayPaneWidgets, surface: DisplaySurface)
|
||||
DisplaySurface::Preview => {
|
||||
pane.stack.set_visible_child_name("preview");
|
||||
pane.action_button.set_label("Break Out");
|
||||
pane.preview_placeholder
|
||||
.set_visible(pane.picture.paintable().is_none());
|
||||
if pane.preview_binding.borrow().is_none() {
|
||||
pane.stream_status.set_text("Preview unavailable");
|
||||
pane.preview_placeholder.set_visible(true);
|
||||
}
|
||||
}
|
||||
DisplaySurface::Window => {
|
||||
|
||||
@ -45,6 +45,9 @@ pub fn spawn_client_process(
|
||||
let mic_level_path = uplink_mic_level_path();
|
||||
let _ = std::fs::remove_file(&mic_level_path);
|
||||
command.env(UPLINK_MIC_LEVEL_ENV, mic_level_path);
|
||||
let uplink_telemetry_path = uplink_telemetry_path();
|
||||
let _ = std::fs::remove_file(&uplink_telemetry_path);
|
||||
command.env(UPLINK_TELEMETRY_ENV, uplink_telemetry_path);
|
||||
for (key, value) in runtime_env_vars(state) {
|
||||
command.env(key, value);
|
||||
}
|
||||
|
||||
@ -31,6 +31,7 @@ pub const AUDIO_GAIN_CONTROL_ENV: &str = "LESAVKA_AUDIO_GAIN_CONTROL";
|
||||
pub const MIC_GAIN_CONTROL_ENV: &str = "LESAVKA_MIC_GAIN_CONTROL";
|
||||
pub const UPLINK_CAMERA_PREVIEW_ENV: &str = "LESAVKA_UPLINK_CAMERA_PREVIEW";
|
||||
pub const UPLINK_MIC_LEVEL_ENV: &str = "LESAVKA_UPLINK_MIC_LEVEL";
|
||||
pub use crate::uplink_telemetry::{DEFAULT_UPLINK_TELEMETRY_PATH, UPLINK_TELEMETRY_ENV};
|
||||
pub const DEFAULT_INPUT_CONTROL_PATH: &str = "/tmp/lesavka-launcher-input.control";
|
||||
pub const DEFAULT_INPUT_STATE_PATH: &str = "/tmp/lesavka-launcher-input.state";
|
||||
pub const DEFAULT_TOGGLE_KEY_CONTROL_PATH: &str = "/tmp/lesavka-launcher-toggle-key.control";
|
||||
|
||||
@ -14,6 +14,9 @@ pub mod launcher;
|
||||
pub mod layout;
|
||||
pub mod output;
|
||||
pub mod paste;
|
||||
pub(crate) mod uplink_fresh_queue;
|
||||
pub(crate) mod uplink_latency_harness;
|
||||
pub(crate) mod uplink_telemetry;
|
||||
pub(crate) mod video_support;
|
||||
|
||||
pub use app::LesavkaClientApp;
|
||||
|
||||
288
client/src/uplink_fresh_queue.rs
Normal file
288
client/src/uplink_fresh_queue.rs
Normal file
@ -0,0 +1,288 @@
|
||||
#![cfg_attr(not(test), allow(dead_code))]
|
||||
//! Freshness-first bounded queue for live-call upstream media.
|
||||
//!
|
||||
//! The queue keeps the newest useful packets and drops stale or superseded ones
|
||||
//! instead of preserving backlog indefinitely.
|
||||
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
sync::{Arc, Mutex},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tokio::sync::Notify;
|
||||
|
||||
/// Queue admission/configuration for one live media stream.
///
/// Both fields must be non-zero; [`FreshPacketQueue::new`] asserts this.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct FreshQueueConfig {
    /// Maximum number of packets retained while the transport catches up.
    pub capacity: usize,
    /// Maximum packet age tolerated before the packet is considered stale.
    pub max_age: Duration,
}
|
||||
|
||||
/// Statistics returned after pushing one packet into the bounded queue.
///
/// A default (all-zero) value is returned when the push was refused because
/// the queue had already been closed.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct QueuePushStats {
    /// Queue depth after the packet was admitted.
    pub queue_depth: usize,
    /// Number of older packets dropped because the queue was already full.
    pub dropped_queue_full: u64,
}
|
||||
|
||||
/// Result of taking the next fresh packet from the queue.
#[derive(Clone, Debug)]
pub struct QueuePop<T> {
    /// Fresh packet ready to send, if any. `None` only when `closed` is true.
    pub packet: Option<T>,
    /// Queue depth after removing stale packets and the returned packet.
    pub queue_depth: usize,
    /// Number of stale packets discarded before a fresh packet was found.
    pub dropped_stale: u64,
    /// Fresh packet age at the moment it left the queue (`Duration::ZERO` when closed).
    pub delivery_age: Duration,
    /// Whether the queue was closed and drained.
    pub closed: bool,
}
|
||||
|
||||
/// One packet plus the timing context needed to judge its freshness later.
#[derive(Debug)]
struct QueuedPacket<T> {
    /// The queued media payload.
    packet: T,
    /// Instant at which the packet was admitted into this queue.
    queued_at: Instant,
    /// Age the packet had already accumulated before admission.
    age_at_enqueue: Duration,
}
|
||||
|
||||
/// Mutex-protected queue interior shared between producer and consumer.
#[derive(Debug)]
struct QueueState<T> {
    /// Retained packets, oldest at the front.
    queue: VecDeque<QueuedPacket<T>>,
    /// Set by [`FreshPacketQueue::close`]; no further admissions once true.
    closed: bool,
}
|
||||
|
||||
/// Shared bounded queue for the blocking capture thread and async gRPC stream.
#[derive(Debug)]
pub struct FreshPacketQueue<T> {
    /// Admission policy (capacity and live-age budget).
    config: FreshQueueConfig,
    /// Shared queue state guarded by a mutex.
    inner: Arc<Mutex<QueueState<T>>>,
    /// Wakes the async consumer when a packet is pushed or the queue closes.
    notify: Arc<Notify>,
}
|
||||
|
||||
impl<T> Clone for FreshPacketQueue<T> {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
config: self.config,
|
||||
inner: Arc::clone(&self.inner),
|
||||
notify: Arc::clone(&self.notify),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> FreshPacketQueue<T> {
    /// Creates a new bounded queue that favors fresh packets over backlog.
    ///
    /// # Panics
    /// Panics when `config.capacity` is zero or `config.max_age` is zero.
    #[must_use]
    pub fn new(config: FreshQueueConfig) -> Self {
        assert!(config.capacity > 0, "fresh queue capacity must be non-zero");
        assert!(
            !config.max_age.is_zero(),
            "fresh queue max age must be non-zero"
        );
        Self {
            config,
            inner: Arc::new(Mutex::new(QueueState {
                queue: VecDeque::with_capacity(config.capacity),
                closed: false,
            })),
            notify: Arc::new(Notify::new()),
        }
    }

    /// Pushes a new packet into the queue, dropping the oldest retained packet if full.
    ///
    /// `age_at_enqueue` is how old the packet already was before admission; it is
    /// added to queue residence time when freshness is judged in [`Self::pop_fresh`].
    /// Returns default (all-zero) stats when the queue has been closed.
    #[must_use]
    pub fn push(&self, packet: T, age_at_enqueue: Duration) -> QueuePushStats {
        let mut state = self.inner.lock().expect("fresh queue mutex poisoned");
        if state.closed {
            return QueuePushStats::default();
        }

        let mut dropped_queue_full = 0_u64;
        if state.queue.len() == self.config.capacity {
            // Freshness-first policy: evict the oldest packet rather than refuse the newest.
            let _ = state.queue.pop_front();
            dropped_queue_full = 1;
        }
        state.queue.push_back(QueuedPacket {
            packet,
            queued_at: Instant::now(),
            age_at_enqueue,
        });
        let queue_depth = state.queue.len();
        // Release the lock before waking the consumer so it can acquire it immediately.
        drop(state);
        self.notify.notify_one();
        QueuePushStats {
            queue_depth,
            dropped_queue_full,
        }
    }

    /// Closes the queue and wakes any waiting consumer.
    pub fn close(&self) {
        if let Ok(mut state) = self.inner.lock() {
            state.closed = true;
        }
        self.notify.notify_waiters();
    }

    /// Pops the next fresh packet, discarding any packets that have aged past the live budget.
    ///
    /// Waits when the queue is empty; returns with `closed == true` once the queue
    /// has been closed and contains no fresh packets.
    pub async fn pop_fresh(&self) -> QueuePop<T> {
        let mut dropped_stale = 0_u64;
        loop {
            let wait_for_more = {
                let mut state = self.inner.lock().expect("fresh queue mutex poisoned");
                while let Some(front) = state.queue.pop_front() {
                    // Total age = age accumulated before admission + time spent queued.
                    let delivery_age = front.age_at_enqueue + front.queued_at.elapsed();
                    if delivery_age > self.config.max_age {
                        dropped_stale = dropped_stale.saturating_add(1);
                        continue;
                    }
                    return QueuePop {
                        packet: Some(front.packet),
                        queue_depth: state.queue.len(),
                        dropped_stale,
                        delivery_age,
                        closed: false,
                    };
                }
                if state.closed {
                    return QueuePop {
                        packet: None,
                        queue_depth: 0,
                        dropped_stale,
                        delivery_age: Duration::ZERO,
                        closed: true,
                    };
                }
                // The `Notified` future is created before the lock is released; a
                // `notify_one` racing in between stores a permit inside `Notify`,
                // so the subsequent await does not lose the wakeup.
                self.notify.notified()
            };
            wait_for_more.await;
        }
    }
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn push_drops_oldest_when_queue_is_full() {
        let queue = FreshPacketQueue::new(FreshQueueConfig {
            capacity: 2,
            max_age: Duration::from_secs(1),
        });

        let first = queue.push(1_u8, Duration::ZERO);
        let second = queue.push(2_u8, Duration::ZERO);
        let third = queue.push(3_u8, Duration::ZERO);

        assert_eq!(first.dropped_queue_full, 0);
        assert_eq!(second.queue_depth, 2);
        assert_eq!(third.dropped_queue_full, 1);

        // Packet 1 was evicted by the third push; 2 and 3 remain, in FIFO order.
        let popped = queue.pop_fresh().await;
        assert_eq!(popped.packet, Some(2));
        let popped = queue.pop_fresh().await;
        assert_eq!(popped.packet, Some(3));
    }

    #[tokio::test]
    async fn pop_fresh_discards_stale_packets_before_returning_live_media() {
        let queue = FreshPacketQueue::new(FreshQueueConfig {
            capacity: 3,
            max_age: Duration::from_millis(60),
        });

        // Packet 1 starts 40ms old; after the 30ms sleep its total age exceeds
        // the 60ms budget, while packet 2 (age 0 at enqueue) stays fresh.
        let _ = queue.push(1_u8, Duration::from_millis(40));
        let _ = queue.push(2_u8, Duration::ZERO);
        tokio::time::sleep(Duration::from_millis(30)).await;

        let popped = queue.pop_fresh().await;
        assert_eq!(popped.packet, Some(2));
        assert_eq!(popped.dropped_stale, 1);
        assert!(popped.delivery_age <= Duration::from_millis(40));
    }

    #[tokio::test]
    async fn pop_fresh_returns_closed_when_queue_is_drained() {
        let queue = FreshPacketQueue::<u8>::new(FreshQueueConfig {
            capacity: 1,
            max_age: Duration::from_secs(1),
        });

        queue.close();
        let popped = queue.pop_fresh().await;
        assert!(popped.closed);
        assert!(popped.packet.is_none());
    }

    #[tokio::test]
    async fn clone_shares_the_same_underlying_queue() {
        let queue = FreshPacketQueue::new(FreshQueueConfig {
            capacity: 2,
            max_age: Duration::from_secs(1),
        });
        let cloned = queue.clone();

        // A push through the clone is visible through the original handle.
        let _ = cloned.push(7_u8, Duration::ZERO);
        let popped = queue.pop_fresh().await;

        assert_eq!(popped.packet, Some(7));
        assert_eq!(popped.queue_depth, 0);
    }

    #[tokio::test]
    async fn push_returns_default_after_close() {
        let queue = FreshPacketQueue::new(FreshQueueConfig {
            capacity: 2,
            max_age: Duration::from_secs(1),
        });

        queue.close();
        let stats = queue.push(9_u8, Duration::ZERO);
        let popped = queue.pop_fresh().await;

        // Closed queues refuse admission and report zeroed stats.
        assert_eq!(stats, QueuePushStats::default());
        assert!(popped.closed);
        assert!(popped.packet.is_none());
    }

    #[tokio::test]
    async fn pop_fresh_waits_for_a_future_packet() {
        let queue = FreshPacketQueue::new(FreshQueueConfig {
            capacity: 2,
            max_age: Duration::from_secs(1),
        });
        let waiter = queue.clone();

        // Start the consumer before any packet exists; it must block, not return.
        let task = tokio::spawn(async move { waiter.pop_fresh().await });
        tokio::time::sleep(Duration::from_millis(20)).await;
        let _ = queue.push(5_u8, Duration::from_millis(10));

        let popped = task.await.expect("waiter task");
        assert_eq!(popped.packet, Some(5));
        assert!(!popped.closed);
        assert!(popped.delivery_age >= Duration::from_millis(10));
    }

    #[tokio::test]
    async fn pop_fresh_waiter_wakes_when_the_queue_closes() {
        let queue = FreshPacketQueue::<u8>::new(FreshQueueConfig {
            capacity: 2,
            max_age: Duration::from_secs(1),
        });
        let waiter = queue.clone();

        let task = tokio::spawn(async move { waiter.pop_fresh().await });
        tokio::time::sleep(Duration::from_millis(20)).await;
        queue.close();

        let popped = task.await.expect("waiter task");
        assert!(popped.closed);
        assert!(popped.packet.is_none());
    }
}
|
||||
270
client/src/uplink_latency_harness.rs
Normal file
270
client/src/uplink_latency_harness.rs
Normal file
@ -0,0 +1,270 @@
|
||||
#![cfg_attr(not(test), allow(dead_code))]
|
||||
//! Synthetic upstream queue harness for live-call latency experiments.
|
||||
//!
|
||||
//! This models the relay child's bounded async queue under temporary downstream stalls.
|
||||
//! The goal is not to emulate GStreamer perfectly; it is to answer the operational
|
||||
//! question we care about: does the current queue policy preserve stale media instead
|
||||
//! of keeping the stream fresh?
|
||||
|
||||
use std::{collections::VecDeque, time::Duration};
|
||||
|
||||
/// Queue policy to simulate under upstream backpressure.
///
/// `DropOldestWhenFull` is the freshness-first policy; `PreserveBacklog`
/// models a lossless queue that lets latency accumulate.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum UplinkQueuePolicy {
    /// Preserve every queued packet, blocking new admission until space returns.
    PreserveBacklog,
    /// Drop the oldest queued packet when full so the newest packet stays live.
    DropOldestWhenFull,
}
|
||||
|
||||
/// Deterministic synthetic queue configuration.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct UplinkHarnessConfig {
    /// Packet cadence at the capture side.
    pub capture_interval: Duration,
    /// Packet cadence at the consumer side.
    pub consume_interval: Duration,
    /// Number of packets admitted to the async queue before backpressure kicks in.
    pub queue_capacity: usize,
    /// Total packets the synthetic capture source will attempt to produce.
    pub total_packets: usize,
    /// Optional one-shot downstream stall start time; `None` disables the stall.
    pub stall_after: Option<Duration>,
    /// One-shot downstream stall duration (ignored when `stall_after` is `None`).
    pub stall_duration: Duration,
}
|
||||
|
||||
/// Summary from one synthetic run. All durations are in the harness's virtual
/// timeline, not wall-clock time.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct UplinkHarnessResult {
    /// Packets accepted into the async queue.
    pub enqueued_packets: usize,
    /// Packets consumed from the async queue.
    pub delivered_packets: usize,
    /// Packets dropped at queue admission time.
    pub dropped_packets: usize,
    /// Highest queue depth observed.
    pub max_queue_depth: usize,
    /// Oldest packet age at queue admission time.
    pub max_enqueue_age: Duration,
    /// Oldest packet age at playout time.
    pub max_delivery_age: Duration,
    /// Longest time a producer packet spent blocked before queue admission.
    pub max_block_time: Duration,
}
|
||||
|
||||
/// A synthetic packet flowing through the harness.
#[derive(Clone, Copy, Debug)]
struct PendingPacket {
    /// Virtual time at which the packet was captured.
    captured_at: Duration,
    /// Virtual time at which admission was blocked (set only under
    /// `PreserveBacklog` when the queue was full), if any.
    blocked_at: Option<Duration>,
}
|
||||
|
||||
/// Runs the synthetic queue harness under the selected policy.
///
/// Simulates a fixed-rate producer and consumer exchanging packets through a
/// bounded queue on a virtual timeline, with an optional one-shot downstream
/// stall, and reports the latency/backlog extremes observed.
///
/// # Panics
/// Panics when the queue capacity is zero or either interval is zero.
#[must_use]
pub fn run_uplink_harness(
    config: UplinkHarnessConfig,
    policy: UplinkQueuePolicy,
) -> UplinkHarnessResult {
    assert!(config.queue_capacity > 0, "queue capacity must be non-zero");
    assert!(
        !config.capture_interval.is_zero(),
        "capture interval must be non-zero"
    );
    assert!(
        !config.consume_interval.is_zero(),
        "consume interval must be non-zero"
    );

    let mut result = UplinkHarnessResult::default();
    let mut queue: VecDeque<PendingPacket> = VecDeque::with_capacity(config.queue_capacity);
    let mut produced = 0usize;
    let mut next_capture_at = Duration::ZERO;
    let mut next_consume_at = config.consume_interval;
    // Under `PreserveBacklog`, a packet that found the queue full parks here
    // until a consume frees a slot.
    let mut blocked_packet = None::<PendingPacket>;

    loop {
        // Next capture event — suppressed while the source is exhausted or a
        // packet is blocked awaiting admission.
        let capture_ready = (produced < config.total_packets && blocked_packet.is_none())
            .then_some(next_capture_at);
        // Next consume event, deferred past the configured stall window if one applies.
        // NOTE(review): `produced <= config.total_packets` always holds here, since
        // `produced` only increments while `< total_packets`; the guard is redundant.
        let consume_ready = ((!queue.is_empty() || blocked_packet.is_some())
            && produced <= config.total_packets)
            .then_some(next_allowed_consume_at(next_consume_at, config));
        // Advance virtual time to the earliest pending event; no events means done.
        let next_event = min_option_duration(capture_ready, consume_ready);
        let Some(event_time) = next_event else {
            break;
        };
        let now = event_time;

        if consume_ready == Some(now) {
            if let Some(packet) = queue.pop_front() {
                result.delivered_packets += 1;
                result.max_delivery_age = result.max_delivery_age.max(now - packet.captured_at);
            }
            next_consume_at = now + config.consume_interval;
            // Consuming freed one slot: admit the blocked packet, recording how
            // stale it became and how long the producer was blocked.
            if let Some(packet) = blocked_packet.take() {
                let enqueue_age = now - packet.captured_at;
                result.max_enqueue_age = result.max_enqueue_age.max(enqueue_age);
                if let Some(blocked_at) = packet.blocked_at {
                    result.max_block_time = result.max_block_time.max(now - blocked_at);
                }
                queue.push_back(PendingPacket {
                    blocked_at: None,
                    ..packet
                });
                result.enqueued_packets += 1;
                result.max_queue_depth = result.max_queue_depth.max(queue.len());
                next_capture_at = now + config.capture_interval;
            }
            continue;
        }

        if capture_ready == Some(now) {
            let packet = PendingPacket {
                captured_at: now,
                blocked_at: None,
            };
            produced += 1;
            if queue.len() < config.queue_capacity {
                queue.push_back(packet);
                result.enqueued_packets += 1;
                result.max_queue_depth = result.max_queue_depth.max(queue.len());
                next_capture_at = now + config.capture_interval;
            } else {
                match policy {
                    UplinkQueuePolicy::PreserveBacklog => {
                        // Producer blocks: the packet waits (and ages) until a
                        // consume event frees space.
                        blocked_packet = Some(PendingPacket {
                            blocked_at: Some(now),
                            ..packet
                        });
                    }
                    UplinkQueuePolicy::DropOldestWhenFull => {
                        // Freshness-first: sacrifice the oldest queued packet so
                        // the newest one is admitted immediately.
                        let _ = queue.pop_front();
                        result.dropped_packets += 1;
                        queue.push_back(packet);
                        result.enqueued_packets += 1;
                        result.max_queue_depth = result.max_queue_depth.max(queue.len());
                        next_capture_at = now + config.capture_interval;
                    }
                }
            }
        }
    }

    result
}
|
||||
|
||||
fn next_allowed_consume_at(next_consume_at: Duration, config: UplinkHarnessConfig) -> Duration {
|
||||
let Some(stall_after) = config.stall_after else {
|
||||
return next_consume_at;
|
||||
};
|
||||
let stall_end = stall_after + config.stall_duration;
|
||||
if next_consume_at >= stall_after && next_consume_at < stall_end {
|
||||
stall_end
|
||||
} else {
|
||||
next_consume_at
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the smaller of two optional durations, treating `None` as absent
/// (never as the minimum).
fn min_option_duration(left: Option<Duration>, right: Option<Duration>) -> Option<Duration> {
    // Chain the zero-or-one elements of each Option and take the minimum of
    // whatever is present; an empty chain yields None.
    left.into_iter().chain(right).min()
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /// Camera-like cadence (~30 fps) with a 2 s downstream stall starting at 800 ms.
    fn camera_stall_config() -> UplinkHarnessConfig {
        UplinkHarnessConfig {
            capture_interval: Duration::from_millis(33),
            consume_interval: Duration::from_millis(33),
            queue_capacity: 8,
            total_packets: 160,
            stall_after: Some(Duration::from_millis(800)),
            stall_duration: Duration::from_secs(2),
        }
    }

    /// Microphone-like cadence (20 ms frames) with a 2 s stall starting at 600 ms.
    fn microphone_stall_config() -> UplinkHarnessConfig {
        UplinkHarnessConfig {
            capture_interval: Duration::from_millis(20),
            consume_interval: Duration::from_millis(20),
            queue_capacity: 16,
            total_packets: 320,
            stall_after: Some(Duration::from_millis(600)),
            stall_duration: Duration::from_secs(2),
        }
    }

    #[test]
    fn preserve_backlog_camera_policy_accumulates_stale_video_after_a_stall() {
        let result = run_uplink_harness(camera_stall_config(), UplinkQueuePolicy::PreserveBacklog);

        // Lossless policy: nothing dropped, but latency and blocking build up.
        assert_eq!(result.dropped_packets, 0);
        assert!(result.max_queue_depth >= 8);
        assert!(
            result.max_enqueue_age >= Duration::from_millis(1500),
            "expected stale video to build, got {:?}",
            result.max_enqueue_age
        );
        assert!(
            result.max_delivery_age >= Duration::from_millis(1700),
            "expected stale playout to build, got {:?}",
            result.max_delivery_age
        );
        assert!(
            result.max_block_time >= Duration::from_millis(1500),
            "expected capture-side blocking, got {:?}",
            result.max_block_time
        );
    }

    #[test]
    fn preserve_backlog_microphone_policy_accumulates_stale_audio_after_a_stall() {
        let result = run_uplink_harness(
            microphone_stall_config(),
            UplinkQueuePolicy::PreserveBacklog,
        );

        assert_eq!(result.dropped_packets, 0);
        assert!(result.max_queue_depth >= 16);
        assert!(
            result.max_enqueue_age >= Duration::from_millis(1500),
            "expected stale audio to build, got {:?}",
            result.max_enqueue_age
        );
        assert!(
            result.max_delivery_age >= Duration::from_millis(1800),
            "expected stale audio playout, got {:?}",
            result.max_delivery_age
        );
    }

    #[test]
    fn drop_oldest_policy_keeps_media_live_by_sacrificing_old_packets() {
        let camera =
            run_uplink_harness(camera_stall_config(), UplinkQueuePolicy::DropOldestWhenFull);
        let microphone = run_uplink_harness(
            microphone_stall_config(),
            UplinkQueuePolicy::DropOldestWhenFull,
        );

        assert!(camera.dropped_packets > 0);
        assert!(microphone.dropped_packets > 0);
        // Drop-oldest never blocks the producer, so every packet is admitted
        // at age zero.
        assert_eq!(camera.max_enqueue_age, Duration::ZERO);
        assert_eq!(microphone.max_enqueue_age, Duration::ZERO);
        assert!(
            camera.max_delivery_age <= Duration::from_millis(350),
            "freshness-first video should stay bounded, got {:?}",
            camera.max_delivery_age
        );
        assert!(
            microphone.max_delivery_age <= Duration::from_millis(400),
            "freshness-first audio should stay bounded, got {:?}",
            microphone.max_delivery_age
        );
    }
}
|
||||
301
client/src/uplink_telemetry.rs
Normal file
301
client/src/uplink_telemetry.rs
Normal file
@ -0,0 +1,301 @@
|
||||
//! Shared upstream media telemetry for the relay child and launcher.
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{
|
||||
fs,
|
||||
path::{Path, PathBuf},
|
||||
sync::{Arc, Mutex},
|
||||
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
|
||||
};
|
||||
|
||||
/// Environment variable used to point the relay child at the upstream telemetry file.
pub const UPLINK_TELEMETRY_ENV: &str = "LESAVKA_UPLINK_TELEMETRY";

/// Default JSON file used by the relay child to publish upstream queue telemetry.
pub const DEFAULT_UPLINK_TELEMETRY_PATH: &str = "/tmp/lesavka-uplink-telemetry.json";

/// Minimum interval between non-forced snapshot writes (see `write_snapshot`).
const FLUSH_INTERVAL: Duration = Duration::from_millis(250);
|
||||
|
||||
/// Camera/microphone telemetry sampled by the launcher diagnostics pane.
///
/// Serialized to JSON by the relay child and read back by the launcher via
/// `load_uplink_telemetry`.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
pub struct UplinkTelemetrySnapshot {
    /// Last time the relay child wrote this snapshot, in Unix milliseconds.
    pub updated_at_unix_ms: u128,
    /// Upstream webcam queue telemetry.
    pub camera: UpstreamStreamTelemetry,
    /// Upstream microphone queue telemetry.
    pub microphone: UpstreamStreamTelemetry,
}
|
||||
|
||||
/// Per-stream state for measuring reconnect churn and queue staleness.
///
/// All `*_ms` fields are milliseconds; negative inputs are clamped to zero by
/// the recording methods before being stored.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
pub struct UpstreamStreamTelemetry {
    /// Whether the stream is enabled for the current relay session.
    pub enabled: bool,
    /// Whether the stream is currently attached to an active gRPC uplink.
    pub connected: bool,
    /// Number of gRPC connection attempts during the current relay child lifetime.
    pub reconnect_count: u64,
    /// Current bounded queue depth inside the relay child.
    pub queue_depth: u32,
    /// Highest observed bounded queue depth inside the relay child.
    pub queue_peak: u32,
    /// Age of the most recently enqueued packet, measured at queue admission time.
    pub latest_enqueue_age_ms: f32,
    /// Highest observed packet age at queue admission time.
    pub enqueue_age_peak_ms: f32,
    /// Highest observed time spent blocked while enqueuing into the async uplink queue.
    pub enqueue_block_peak_ms: f32,
    /// Number of packets admitted into the async uplink queue.
    pub packets_enqueued: u64,
    /// Number of packets emitted from the async uplink queue into the gRPC stream.
    pub packets_streamed: u64,
    /// Number of packets intentionally dropped before queue admission.
    pub dropped_packets: u64,
    /// Number of packets dropped because the bounded queue was already full.
    pub dropped_queue_full_packets: u64,
    /// Number of packets dropped because they exceeded the live-call age budget.
    pub dropped_stale_packets: u64,
    /// Age of the most recently streamed packet after queue residence was accounted for.
    pub latest_delivery_age_ms: f32,
    /// Highest observed streamed packet age after queue residence was accounted for.
    pub delivery_age_peak_ms: f32,
    /// Most recent queue/setup failure for this stream; empty when none.
    pub last_error: String,
}
|
||||
|
||||
/// Which upstream media stream a telemetry handle reports on.
#[derive(Clone, Copy, Debug)]
pub enum UpstreamStreamKind {
    /// Upstream webcam stream.
    Camera,
    /// Upstream microphone stream.
    Microphone,
}
|
||||
|
||||
/// Mutex-protected telemetry state shared by the publisher and its handles.
#[derive(Debug)]
struct TelemetryState {
    /// JSON file the snapshot is persisted to.
    path: PathBuf,
    /// Latest in-memory telemetry values.
    snapshot: UplinkTelemetrySnapshot,
    /// Last time the snapshot was written to disk (used for rate limiting).
    last_flush: Instant,
}
|
||||
|
||||
/// Shared publisher used by the relay child to update queue telemetry.
#[derive(Clone, Debug)]
pub struct UplinkTelemetryPublisher {
    /// State shared with every per-stream handle.
    inner: Arc<Mutex<TelemetryState>>,
}
|
||||
|
||||
/// Per-stream handle used by individual uplink loops.
#[derive(Clone, Debug)]
pub struct UplinkTelemetryHandle {
    /// Which stream this handle updates.
    kind: UpstreamStreamKind,
    /// Telemetry state shared with the publisher and sibling handles.
    inner: Arc<Mutex<TelemetryState>>,
}
|
||||
|
||||
impl UplinkTelemetryPublisher {
|
||||
/// Creates a publisher seeded with the current session's enabled uplinks.
|
||||
#[must_use]
|
||||
pub fn from_env(camera_enabled: bool, microphone_enabled: bool) -> Self {
|
||||
let path = std::env::var(UPLINK_TELEMETRY_ENV)
|
||||
.map(PathBuf::from)
|
||||
.unwrap_or_else(|_| PathBuf::from(DEFAULT_UPLINK_TELEMETRY_PATH));
|
||||
Self::new(path, camera_enabled, microphone_enabled)
|
||||
}
|
||||
|
||||
/// Creates a publisher that writes to an explicit JSON path.
|
||||
#[must_use]
|
||||
pub fn new(path: PathBuf, camera_enabled: bool, microphone_enabled: bool) -> Self {
|
||||
let state = TelemetryState {
|
||||
path,
|
||||
snapshot: UplinkTelemetrySnapshot {
|
||||
updated_at_unix_ms: unix_time_ms(),
|
||||
camera: UpstreamStreamTelemetry {
|
||||
enabled: camera_enabled,
|
||||
..UpstreamStreamTelemetry::default()
|
||||
},
|
||||
microphone: UpstreamStreamTelemetry {
|
||||
enabled: microphone_enabled,
|
||||
..UpstreamStreamTelemetry::default()
|
||||
},
|
||||
},
|
||||
last_flush: Instant::now() - FLUSH_INTERVAL,
|
||||
};
|
||||
let publisher = Self {
|
||||
inner: Arc::new(Mutex::new(state)),
|
||||
};
|
||||
publisher.flush_now();
|
||||
publisher
|
||||
}
|
||||
|
||||
/// Returns a per-stream handle for camera or microphone updates.
|
||||
#[must_use]
|
||||
pub fn handle(&self, kind: UpstreamStreamKind) -> UplinkTelemetryHandle {
|
||||
UplinkTelemetryHandle {
|
||||
kind,
|
||||
inner: Arc::clone(&self.inner),
|
||||
}
|
||||
}
|
||||
|
||||
/// Forces an immediate write of the current snapshot.
|
||||
pub fn flush_now(&self) {
|
||||
if let Ok(mut state) = self.inner.lock() {
|
||||
write_snapshot(&mut state, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl UplinkTelemetryHandle {
|
||||
/// Records a fresh gRPC connection attempt for this stream.
|
||||
pub fn record_reconnect_attempt(&self) {
|
||||
self.update(false, |stream| {
|
||||
stream.connected = false;
|
||||
stream.queue_depth = 0;
|
||||
stream.reconnect_count = stream.reconnect_count.saturating_add(1);
|
||||
stream.last_error.clear();
|
||||
});
|
||||
}
|
||||
|
||||
/// Marks the stream as successfully connected to the relay server.
|
||||
pub fn record_connected(&self) {
|
||||
self.update(true, |stream| {
|
||||
stream.connected = true;
|
||||
stream.last_error.clear();
|
||||
});
|
||||
}
|
||||
|
||||
/// Marks the stream as disconnected and stores the latest error detail.
|
||||
pub fn record_disconnect(&self, error: impl AsRef<str>) {
|
||||
let error = error.as_ref().trim().to_string();
|
||||
self.update(true, |stream| {
|
||||
stream.connected = false;
|
||||
stream.queue_depth = 0;
|
||||
stream.latest_enqueue_age_ms = 0.0;
|
||||
stream.latest_delivery_age_ms = 0.0;
|
||||
if !error.is_empty() {
|
||||
stream.last_error = error.clone();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// Records one packet entering the async uplink queue.
|
||||
pub fn record_enqueue(&self, queue_depth: u32, enqueue_age_ms: f32, enqueue_block_ms: f32) {
|
||||
self.update(false, |stream| {
|
||||
stream.queue_depth = queue_depth;
|
||||
stream.queue_peak = stream.queue_peak.max(queue_depth);
|
||||
stream.latest_enqueue_age_ms = enqueue_age_ms.max(0.0);
|
||||
stream.enqueue_age_peak_ms = stream.enqueue_age_peak_ms.max(enqueue_age_ms.max(0.0));
|
||||
stream.enqueue_block_peak_ms =
|
||||
stream.enqueue_block_peak_ms.max(enqueue_block_ms.max(0.0));
|
||||
stream.packets_enqueued = stream.packets_enqueued.saturating_add(1);
|
||||
stream.last_error.clear();
|
||||
});
|
||||
}
|
||||
|
||||
/// Records packets dropped because the bounded queue was already full.
|
||||
pub fn record_queue_full_drop(&self, count: u64) {
|
||||
self.update(false, |stream| {
|
||||
stream.dropped_packets = stream.dropped_packets.saturating_add(count);
|
||||
stream.dropped_queue_full_packets =
|
||||
stream.dropped_queue_full_packets.saturating_add(count);
|
||||
});
|
||||
}
|
||||
|
||||
/// Records packets dropped because they exceeded the live-call age budget.
|
||||
pub fn record_stale_drop(&self, count: u64) {
|
||||
self.update(false, |stream| {
|
||||
stream.dropped_packets = stream.dropped_packets.saturating_add(count);
|
||||
stream.dropped_stale_packets = stream.dropped_stale_packets.saturating_add(count);
|
||||
});
|
||||
}
|
||||
|
||||
/// Records one packet emitted from the bounded queue into the gRPC stream.
|
||||
pub fn record_streamed(&self, queue_depth: u32, delivery_age_ms: f32) {
|
||||
self.update(false, |stream| {
|
||||
stream.connected = true;
|
||||
stream.queue_depth = queue_depth;
|
||||
stream.latest_delivery_age_ms = delivery_age_ms.max(0.0);
|
||||
stream.delivery_age_peak_ms = stream.delivery_age_peak_ms.max(delivery_age_ms.max(0.0));
|
||||
stream.packets_streamed = stream.packets_streamed.saturating_add(1);
|
||||
stream.last_error.clear();
|
||||
});
|
||||
}
|
||||
|
||||
fn update(&self, force_flush: bool, update_stream: impl FnOnce(&mut UpstreamStreamTelemetry)) {
|
||||
if let Ok(mut state) = self.inner.lock() {
|
||||
state.snapshot.updated_at_unix_ms = unix_time_ms();
|
||||
update_stream(stream_for_kind(&mut state.snapshot, self.kind));
|
||||
write_snapshot(&mut state, force_flush);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Loads the latest upstream telemetry snapshot written by the relay child.
|
||||
pub fn load_uplink_telemetry(path: &Path) -> Option<UplinkTelemetrySnapshot> {
|
||||
let raw = fs::read_to_string(path).ok()?;
|
||||
serde_json::from_str(&raw).ok()
|
||||
}
|
||||
|
||||
/// Maps a stream kind to its mutable telemetry slot inside the shared snapshot.
fn stream_for_kind(
    snapshot: &mut UplinkTelemetrySnapshot,
    kind: UpstreamStreamKind,
) -> &mut UpstreamStreamTelemetry {
    // Exhaustive match on purpose: adding a new stream kind must force this
    // mapping to be extended at compile time.
    match kind {
        UpstreamStreamKind::Camera => &mut snapshot.camera,
        UpstreamStreamKind::Microphone => &mut snapshot.microphone,
    }
}
|
||||
|
||||
fn write_snapshot(state: &mut TelemetryState, force_flush: bool) {
|
||||
if !force_flush && state.last_flush.elapsed() < FLUSH_INTERVAL {
|
||||
return;
|
||||
}
|
||||
if let Ok(rendered) = serde_json::to_string(&state.snapshot) {
|
||||
let _ = fs::write(&state.path, rendered);
|
||||
state.last_flush = Instant::now();
|
||||
}
|
||||
}
|
||||
|
||||
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// A system clock set before the epoch yields 0 instead of panicking.
fn unix_time_ms() -> u128 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |duration| duration.as_millis())
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    // End-to-end check of the publisher round trip: record events through a
    // camera handle, force a flush, then reload the JSON snapshot from disk
    // and verify peaks, counters, and error text survived.
    #[test]
    fn handle_updates_keep_queue_peaks_and_errors() {
        let temp_dir = tempfile::tempdir().expect("tempdir");
        let path = temp_dir.path().join("uplink.json");
        // Args: (path, camera enabled, microphone enabled).
        let publisher = UplinkTelemetryPublisher::new(path.clone(), true, false);
        let camera = publisher.handle(UpstreamStreamKind::Camera);

        camera.record_reconnect_attempt();
        camera.record_connected();
        // Two enqueues: peaks should keep the larger of each pair.
        camera.record_enqueue(3, 41.0, 6.0);
        camera.record_enqueue(1, 12.0, 2.0);
        camera.record_streamed(0, 55.0);
        camera.record_queue_full_drop(2);
        camera.record_stale_drop(3);
        camera.record_disconnect("stream ended");
        // Bypass the FLUSH_INTERVAL rate limiter so the file is written now.
        publisher.flush_now();

        let snapshot = load_uplink_telemetry(&path).expect("load snapshot");
        assert!(snapshot.camera.enabled);
        assert!(!snapshot.microphone.enabled);
        // record_disconnect clears the connected flag set by record_streamed.
        assert!(!snapshot.camera.connected);
        assert_eq!(snapshot.camera.reconnect_count, 1);
        assert_eq!(snapshot.camera.queue_peak, 3);
        // NOTE(review): 0.0 here implies record_disconnect resets the "latest"
        // gauges (last enqueue recorded 12.0) — defined outside this chunk;
        // confirm against its implementation.
        assert_eq!(snapshot.camera.latest_enqueue_age_ms, 0.0);
        assert_eq!(snapshot.camera.enqueue_age_peak_ms, 41.0);
        assert_eq!(snapshot.camera.enqueue_block_peak_ms, 6.0);
        assert_eq!(snapshot.camera.packets_enqueued, 2);
        assert_eq!(snapshot.camera.packets_streamed, 1);
        // Aggregate drops = queue-full (2) + stale (3).
        assert_eq!(snapshot.camera.dropped_packets, 5);
        assert_eq!(snapshot.camera.dropped_queue_full_packets, 2);
        assert_eq!(snapshot.camera.dropped_stale_packets, 3);
        assert_eq!(snapshot.camera.delivery_age_peak_ms, 55.0);
        assert_eq!(snapshot.camera.last_error, "stream ended");
    }
}
|
||||
@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "lesavka_common"
|
||||
version = "0.12.3"
|
||||
version = "0.12.4"
|
||||
edition = "2024"
|
||||
build = "build.rs"
|
||||
|
||||
|
||||
@ -6,7 +6,7 @@
|
||||
},
|
||||
"client/src/app/session_lifecycle.rs": {
|
||||
"line_percent": 97.56,
|
||||
"loc": 304
|
||||
"loc": 324
|
||||
},
|
||||
"client/src/app_support.rs": {
|
||||
"line_percent": 100.0,
|
||||
@ -102,15 +102,15 @@
|
||||
},
|
||||
"client/src/launcher/diagnostics/diagnostics_models.rs": {
|
||||
"line_percent": 100.0,
|
||||
"loc": 164
|
||||
"loc": 170
|
||||
},
|
||||
"client/src/launcher/diagnostics/recommendations.rs": {
|
||||
"line_percent": 97.56,
|
||||
"loc": 230
|
||||
"line_percent": 97.62,
|
||||
"loc": 277
|
||||
},
|
||||
"client/src/launcher/diagnostics/snapshot_report.rs": {
|
||||
"line_percent": 99.35,
|
||||
"loc": 410
|
||||
"line_percent": 98.22,
|
||||
"loc": 465
|
||||
},
|
||||
"client/src/launcher/mod.rs": {
|
||||
"line_percent": 100.0,
|
||||
@ -168,6 +168,18 @@
|
||||
"line_percent": 100.0,
|
||||
"loc": 82
|
||||
},
|
||||
"client/src/uplink_fresh_queue.rs": {
|
||||
"line_percent": 100.0,
|
||||
"loc": 288
|
||||
},
|
||||
"client/src/uplink_latency_harness.rs": {
|
||||
"line_percent": 98.65,
|
||||
"loc": 270
|
||||
},
|
||||
"client/src/uplink_telemetry.rs": {
|
||||
"line_percent": 95.76,
|
||||
"loc": 301
|
||||
},
|
||||
"client/src/video_support.rs": {
|
||||
"line_percent": 97.3,
|
||||
"loc": 56
|
||||
|
||||
@ -10,7 +10,7 @@ bench = false
|
||||
|
||||
[package]
|
||||
name = "lesavka_server"
|
||||
version = "0.12.3"
|
||||
version = "0.12.4"
|
||||
edition = "2024"
|
||||
autobins = false
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user