lesavka/client/src/uplink_telemetry.rs

337 lines
13 KiB
Rust

//! Shared upstream media telemetry for the relay child and launcher.
use serde::{Deserialize, Serialize};
use std::{
fs,
path::{Path, PathBuf},
sync::{Arc, Mutex},
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};
/// Environment variable used to point the relay child at the upstream telemetry file.
///
/// When the variable is unset or does not hold valid Unicode,
/// `UplinkTelemetryPublisher::from_env` falls back to [`DEFAULT_UPLINK_TELEMETRY_PATH`].
pub const UPLINK_TELEMETRY_ENV: &str = "LESAVKA_UPLINK_TELEMETRY";
/// Default JSON file used by the relay child to publish upstream queue telemetry.
pub const DEFAULT_UPLINK_TELEMETRY_PATH: &str = "/tmp/lesavka-uplink-telemetry.json";
/// Minimum time between two non-forced snapshot writes.
///
/// Forced flushes ignore this throttle; all other updates only reach the file once this much
/// time has passed since the previous write attempt.
const FLUSH_INTERVAL: Duration = Duration::from_millis(250);
/// Camera/microphone telemetry sampled by the launcher diagnostics pane.
///
/// The relay child serializes this structure to JSON; `load_uplink_telemetry` parses it back.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
pub struct UplinkTelemetrySnapshot {
/// Time of the most recent telemetry update, in Unix milliseconds.
///
/// Refreshed whenever a stream handle records an event, which may be more recent than the
/// moment the snapshot was last written to disk because non-forced writes are throttled.
pub updated_at_unix_ms: u128,
/// Upstream webcam queue telemetry.
pub camera: UpstreamStreamTelemetry,
/// Upstream microphone queue telemetry.
pub microphone: UpstreamStreamTelemetry,
}
/// Per-stream state for measuring reconnect churn and queue staleness.
///
/// Counters and peak values only ever grow during the lifetime of a publisher, while the
/// "latest"/"current" fields are reset when the stream disconnects or is disabled.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
pub struct UpstreamStreamTelemetry {
/// Whether the stream is enabled for the current relay session.
pub enabled: bool,
/// Whether the stream is currently attached to an active gRPC uplink.
///
/// Set when a connection or a streamed packet is recorded; cleared on reconnect attempts,
/// disconnects, and when the stream is disabled.
pub connected: bool,
/// Number of gRPC connection attempts during the current relay child lifetime.
pub reconnect_count: u64,
/// Current bounded queue depth inside the relay child.
///
/// Reset to 0 on reconnect attempts, disconnects, and when the stream is disabled.
pub queue_depth: u32,
/// Highest observed bounded queue depth inside the relay child.
pub queue_peak: u32,
/// Age of the most recently enqueued packet, measured at queue admission time.
///
/// Negative samples are recorded as 0; reset to 0 on disconnect or when disabled.
pub latest_enqueue_age_ms: f32,
/// Highest observed packet age at queue admission time.
pub enqueue_age_peak_ms: f32,
/// Highest observed time spent blocked while enqueuing into the async uplink queue.
pub enqueue_block_peak_ms: f32,
/// Number of packets admitted into the async uplink queue.
pub packets_enqueued: u64,
/// Number of packets emitted from the async uplink queue into the gRPC stream.
pub packets_streamed: u64,
/// Number of packets intentionally dropped before queue admission.
///
/// This is the running total of both queue-full and stale drops.
pub dropped_packets: u64,
/// Number of packets dropped because the bounded queue was already full.
pub dropped_queue_full_packets: u64,
/// Number of packets dropped because they exceeded the live-call age budget.
pub dropped_stale_packets: u64,
/// Age of the most recently streamed packet after queue residence was accounted for.
///
/// Negative samples are recorded as 0; reset to 0 on disconnect or when disabled.
pub latest_delivery_age_ms: f32,
/// Highest observed streamed packet age after queue residence was accounted for.
pub delivery_age_peak_ms: f32,
/// Most recent queue/setup failure for this stream, if any.
///
/// Empty when no error is recorded. Stored by a disconnect that carries a non-empty message
/// and cleared again by later reconnect, connect, enqueue, or stream events.
pub last_error: String,
}
/// Selects which stream of an `UplinkTelemetrySnapshot` a telemetry handle updates.
#[derive(Clone, Copy, Debug)]
pub enum UpstreamStreamKind {
/// The upstream webcam stream (`UplinkTelemetrySnapshot::camera`).
Camera,
/// The upstream microphone stream (`UplinkTelemetrySnapshot::microphone`).
Microphone,
}
/// Mutable telemetry state shared between the publisher and its per-stream handles.
#[derive(Debug)]
struct TelemetryState {
/// Destination file that receives the JSON-rendered snapshot.
path: PathBuf,
/// Latest in-memory telemetry for both streams.
snapshot: UplinkTelemetrySnapshot,
/// Time of the most recent write attempt; used to throttle non-forced writes.
last_flush: Instant,
}
/// Shared publisher used by the relay child to update queue telemetry.
///
/// Cloning is cheap: every clone, and every handle created from it, refers to the same
/// mutex-protected state and therefore to the same output file.
#[derive(Clone, Debug)]
pub struct UplinkTelemetryPublisher {
/// State shared with all handles created from this publisher.
inner: Arc<Mutex<TelemetryState>>,
}
/// Per-stream handle used by individual uplink loops.
///
/// Every recorded event is applied to the stream selected by `kind` inside the state shared
/// with the publisher that created the handle.
#[derive(Clone, Debug)]
pub struct UplinkTelemetryHandle {
/// Which stream of the shared snapshot this handle updates.
kind: UpstreamStreamKind,
/// State shared with the originating publisher and its other handles.
inner: Arc<Mutex<TelemetryState>>,
}
impl UplinkTelemetryPublisher {
    /// Creates a publisher seeded with the current session's enabled uplinks.
    ///
    /// The output path is read from [`UPLINK_TELEMETRY_ENV`]. When the variable is unset or does
    /// not contain valid Unicode, [`DEFAULT_UPLINK_TELEMETRY_PATH`] is used instead.
    #[must_use]
    pub fn from_env(camera_enabled: bool, microphone_enabled: bool) -> Self {
        let path = std::env::var(UPLINK_TELEMETRY_ENV)
            .map(PathBuf::from)
            .unwrap_or_else(|_| PathBuf::from(DEFAULT_UPLINK_TELEMETRY_PATH));
        Self::new(path, camera_enabled, microphone_enabled)
    }

    /// Creates a publisher that writes to an explicit JSON path.
    ///
    /// Both streams start from their default (zeroed) telemetry with only the `enabled` flag
    /// taken from the arguments. The initial snapshot is written to `path` right away so a
    /// reader sees the session state before any packet has been recorded; a failed write is
    /// ignored.
    #[must_use]
    pub fn new(path: PathBuf, camera_enabled: bool, microphone_enabled: bool) -> Self {
        let now = Instant::now();
        let state = TelemetryState {
            path,
            snapshot: UplinkTelemetrySnapshot {
                updated_at_unix_ms: unix_time_ms(),
                camera: UpstreamStreamTelemetry {
                    enabled: camera_enabled,
                    ..UpstreamStreamTelemetry::default()
                },
                microphone: UpstreamStreamTelemetry {
                    enabled: microphone_enabled,
                    ..UpstreamStreamTelemetry::default()
                },
            },
            // Backdate the last flush so the throttle window counts as already elapsed.
            // `Instant - Duration` panics when the result cannot be represented, so use the
            // checked form and fall back to "now" in that case.
            last_flush: now.checked_sub(FLUSH_INTERVAL).unwrap_or(now),
        };
        let publisher = Self {
            inner: Arc::new(Mutex::new(state)),
        };
        publisher.flush_now();
        publisher
    }

    /// Returns a per-stream handle for camera or microphone updates.
    ///
    /// The handle shares this publisher's state, so its updates end up in the same file.
    #[must_use]
    pub fn handle(&self, kind: UpstreamStreamKind) -> UplinkTelemetryHandle {
        UplinkTelemetryHandle {
            kind,
            inner: Arc::clone(&self.inner),
        }
    }

    /// Forces an immediate write of the current snapshot.
    ///
    /// Bypasses the flush throttle. Nothing is written if the shared state's mutex is poisoned.
    pub fn flush_now(&self) {
        if let Ok(mut state) = self.inner.lock() {
            write_snapshot(&mut state, true);
        }
    }
}
impl UplinkTelemetryHandle {
    /// Updates whether this feed is intentionally active in the current relay session.
    ///
    /// Disabling a stream also clears its live state (connection flag, queue depth, latest
    /// ages, and last error). The snapshot is flushed immediately.
    pub fn record_enabled(&self, enabled: bool) {
        self.update(true, |stream| {
            stream.enabled = enabled;
            if !enabled {
                stream.connected = false;
                stream.queue_depth = 0;
                stream.latest_enqueue_age_ms = 0.0;
                stream.latest_delivery_age_ms = 0.0;
                stream.last_error.clear();
            }
        });
    }

    /// Records a fresh gRPC connection attempt for this stream.
    ///
    /// Marks the stream as not connected, empties the reported queue depth, bumps the reconnect
    /// counter, and clears the last error. The write is subject to the flush throttle.
    pub fn record_reconnect_attempt(&self) {
        self.update(false, |stream| {
            stream.connected = false;
            stream.queue_depth = 0;
            stream.reconnect_count = stream.reconnect_count.saturating_add(1);
            stream.last_error.clear();
        });
    }

    /// Marks the stream as successfully connected to the relay server.
    ///
    /// Clears the last error and flushes the snapshot immediately.
    pub fn record_connected(&self) {
        self.update(true, |stream| {
            stream.connected = true;
            stream.last_error.clear();
        });
    }

    /// Marks the stream as disconnected and stores the latest error detail.
    ///
    /// `error` is trimmed first; an empty message leaves the previously stored error untouched.
    /// The queue depth and latest ages are reset and the snapshot is flushed immediately.
    pub fn record_disconnect(&self, error: impl AsRef<str>) {
        let error = error.as_ref().trim().to_owned();
        self.update(true, |stream| {
            stream.connected = false;
            stream.queue_depth = 0;
            stream.latest_enqueue_age_ms = 0.0;
            stream.latest_delivery_age_ms = 0.0;
            if !error.is_empty() {
                // The closure runs at most once, so the owned message can simply be moved in.
                stream.last_error = error;
            }
        });
    }

    /// Records one packet entering the async uplink queue.
    ///
    /// `queue_depth` is the depth reported by the caller for this admission, `enqueue_age_ms`
    /// the packet age at admission, and `enqueue_block_ms` the time spent blocked while
    /// enqueuing. Negative or non-finite durations are recorded as 0. The write is subject to
    /// the flush throttle.
    pub fn record_enqueue(&self, queue_depth: u32, enqueue_age_ms: f32, enqueue_block_ms: f32) {
        let enqueue_age_ms = Self::sanitize_ms(enqueue_age_ms);
        let enqueue_block_ms = Self::sanitize_ms(enqueue_block_ms);
        self.update(false, |stream| {
            stream.queue_depth = queue_depth;
            stream.queue_peak = stream.queue_peak.max(queue_depth);
            stream.latest_enqueue_age_ms = enqueue_age_ms;
            stream.enqueue_age_peak_ms = stream.enqueue_age_peak_ms.max(enqueue_age_ms);
            stream.enqueue_block_peak_ms = stream.enqueue_block_peak_ms.max(enqueue_block_ms);
            stream.packets_enqueued = stream.packets_enqueued.saturating_add(1);
            stream.last_error.clear();
        });
    }

    /// Records packets dropped because the bounded queue was already full.
    ///
    /// Adds `count` to both the total and the queue-full drop counters (saturating).
    pub fn record_queue_full_drop(&self, count: u64) {
        self.update(false, |stream| {
            stream.dropped_packets = stream.dropped_packets.saturating_add(count);
            stream.dropped_queue_full_packets =
                stream.dropped_queue_full_packets.saturating_add(count);
        });
    }

    /// Records packets dropped because they exceeded the live-call age budget.
    ///
    /// Adds `count` to both the total and the stale drop counters (saturating).
    pub fn record_stale_drop(&self, count: u64) {
        self.update(false, |stream| {
            stream.dropped_packets = stream.dropped_packets.saturating_add(count);
            stream.dropped_stale_packets = stream.dropped_stale_packets.saturating_add(count);
        });
    }

    /// Records one packet emitted from the bounded queue into the gRPC stream.
    ///
    /// Also marks the stream as connected and clears the last error. A negative or non-finite
    /// `delivery_age_ms` is recorded as 0. The write is subject to the flush throttle.
    pub fn record_streamed(&self, queue_depth: u32, delivery_age_ms: f32) {
        let delivery_age_ms = Self::sanitize_ms(delivery_age_ms);
        self.update(false, |stream| {
            stream.connected = true;
            stream.queue_depth = queue_depth;
            stream.latest_delivery_age_ms = delivery_age_ms;
            stream.delivery_age_peak_ms = stream.delivery_age_peak_ms.max(delivery_age_ms);
            stream.packets_streamed = stream.packets_streamed.saturating_add(1);
            stream.last_error.clear();
        });
    }

    /// Normalizes a millisecond sample before it is stored in the snapshot.
    ///
    /// Negative values become 0. NaN and infinities also become 0: a non-finite value would be
    /// rendered as JSON `null`, which cannot be parsed back into an `f32`, and because peaks
    /// never decrease a single such sample would make every later snapshot unreadable.
    fn sanitize_ms(sample_ms: f32) -> f32 {
        if sample_ms.is_finite() {
            sample_ms.max(0.0)
        } else {
            0.0
        }
    }

    /// Applies `update_stream` to this handle's stream and publishes the result.
    ///
    /// Refreshes the snapshot timestamp, then writes the snapshot; with `force_flush` unset the
    /// write is skipped while the flush throttle is active. Does nothing if the shared state's
    /// mutex is poisoned.
    fn update(&self, force_flush: bool, update_stream: impl FnOnce(&mut UpstreamStreamTelemetry)) {
        if let Ok(mut state) = self.inner.lock() {
            state.snapshot.updated_at_unix_ms = unix_time_ms();
            update_stream(stream_for_kind(&mut state.snapshot, self.kind));
            write_snapshot(&mut state, force_flush);
        }
    }
}
/// Reads and parses the upstream telemetry snapshot stored at `path`.
///
/// Returns `None` when the file cannot be read as text or when its contents do not deserialize
/// into an [`UplinkTelemetrySnapshot`].
pub fn load_uplink_telemetry(path: &Path) -> Option<UplinkTelemetrySnapshot> {
    fs::read_to_string(path)
        .ok()
        .and_then(|contents| serde_json::from_str(&contents).ok())
}
fn stream_for_kind(
snapshot: &mut UplinkTelemetrySnapshot,
kind: UpstreamStreamKind,
) -> &mut UpstreamStreamTelemetry {
match kind {
UpstreamStreamKind::Camera => &mut snapshot.camera,
UpstreamStreamKind::Microphone => &mut snapshot.microphone,
}
}
fn write_snapshot(state: &mut TelemetryState, force_flush: bool) {
if !force_flush && state.last_flush.elapsed() < FLUSH_INTERVAL {
return;
}
if let Ok(rendered) = serde_json::to_string(&state.snapshot) {
let _ = fs::write(&state.path, rendered);
state.last_flush = Instant::now();
}
}
/// Returns the current wall-clock time as whole milliseconds since the Unix epoch.
///
/// Yields 0 when the system clock reports a time earlier than the epoch.
fn unix_time_ms() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    }
}
// Round-trip tests: events are recorded through a handle, flushed to a temporary file, and the
// file is parsed back with `load_uplink_telemetry` before asserting on the result.
#[cfg(test)]
mod tests {
use super::*;
/// Counters and peaks should accumulate across events while a disconnect resets live values
/// and keeps its error message.
#[test]
fn handle_updates_keep_queue_peaks_and_errors() {
let temp_dir = tempfile::tempdir().expect("tempdir");
let path = temp_dir.path().join("uplink.json");
// Camera enabled, microphone disabled.
let publisher = UplinkTelemetryPublisher::new(path.clone(), true, false);
let camera = publisher.handle(UpstreamStreamKind::Camera);
camera.record_reconnect_attempt();
camera.record_connected();
// Two admissions: the first one carries the larger depth, age, and block time.
camera.record_enqueue(3, 41.0, 6.0);
camera.record_enqueue(1, 12.0, 2.0);
camera.record_streamed(0, 55.0);
camera.record_queue_full_drop(2);
camera.record_stale_drop(3);
// The disconnect comes last so its error message is what remains in the snapshot.
camera.record_disconnect("stream ended");
publisher.flush_now();
let snapshot = load_uplink_telemetry(&path).expect("load snapshot");
assert!(snapshot.camera.enabled);
assert!(!snapshot.microphone.enabled);
assert!(!snapshot.camera.connected);
assert_eq!(snapshot.camera.reconnect_count, 1);
// Peaks keep the maximum seen, not the most recent sample.
assert_eq!(snapshot.camera.queue_peak, 3);
// The latest enqueue age was reset by the disconnect.
assert_eq!(snapshot.camera.latest_enqueue_age_ms, 0.0);
assert_eq!(snapshot.camera.enqueue_age_peak_ms, 41.0);
assert_eq!(snapshot.camera.enqueue_block_peak_ms, 6.0);
assert_eq!(snapshot.camera.packets_enqueued, 2);
assert_eq!(snapshot.camera.packets_streamed, 1);
// Total drops are the sum of queue-full (2) and stale (3) drops.
assert_eq!(snapshot.camera.dropped_packets, 5);
assert_eq!(snapshot.camera.dropped_queue_full_packets, 2);
assert_eq!(snapshot.camera.dropped_stale_packets, 3);
assert_eq!(snapshot.camera.delivery_age_peak_ms, 55.0);
assert_eq!(snapshot.camera.last_error, "stream ended");
}
/// Soft-paused streams should report disabled without looking stale or crashed.
#[test]
fn handle_enabled_updates_expose_live_soft_pause_state() {
let temp_dir = tempfile::tempdir().expect("tempdir");
let path = temp_dir.path().join("uplink.json");
let publisher = UplinkTelemetryPublisher::new(path.clone(), true, true);
let microphone = publisher.handle(UpstreamStreamKind::Microphone);
// Build up some live state first so that disabling has something to clear.
microphone.record_connected();
microphone.record_enqueue(4, 12.0, 0.0);
microphone.record_enabled(false);
publisher.flush_now();
let snapshot = load_uplink_telemetry(&path).expect("load snapshot");
assert!(!snapshot.microphone.enabled);
assert!(!snapshot.microphone.connected);
assert_eq!(snapshot.microphone.queue_depth, 0);
assert_eq!(snapshot.microphone.latest_enqueue_age_ms, 0.0);
assert!(snapshot.microphone.last_error.is_empty());
}
}