lesavka: add runtime pressure diagnostics

This commit is contained in:
Brad Stein 2026-04-17 06:14:54 -03:00
parent 8f04300aba
commit f10e8a00ae
15 changed files with 333 additions and 20 deletions

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package]
name = "lesavka_client"
version = "0.10.0"
version = "0.11.0"
edition = "2024"
[dependencies]

View File

@ -10,6 +10,8 @@ pub struct PerformanceSample {
pub probe_spread_ms: f32,
pub input_latency_ms: f32,
pub probe_loss_pct: f32,
pub client_process_cpu_pct: f32,
pub server_process_cpu_pct: f32,
pub video_loss_pct: f32,
pub left_receive_fps: f32,
pub left_present_fps: f32,
@ -22,6 +24,7 @@ pub struct PerformanceSample {
pub left_server_source_gap_peak_ms: f32,
pub left_server_send_gap_peak_ms: f32,
pub left_server_queue_peak: u32,
pub left_server_encoder_label: String,
pub left_decoder_label: String,
pub right_receive_fps: f32,
pub right_present_fps: f32,
@ -34,6 +37,7 @@ pub struct PerformanceSample {
pub right_server_source_gap_peak_ms: f32,
pub right_server_send_gap_peak_ms: f32,
pub right_server_queue_peak: u32,
pub right_server_encoder_label: String,
pub right_decoder_label: String,
pub dropped_frames: u64,
pub queue_depth: u32,
@ -87,6 +91,8 @@ pub struct SnapshotReport {
pub view_mode: ViewMode,
pub remote_active: bool,
pub power_state: String,
pub client_process_cpu_pct: f32,
pub server_process_cpu_pct: f32,
pub preview_source: String,
pub client_display_limit: String,
pub left_surface: String,
@ -102,6 +108,7 @@ pub struct SnapshotReport {
pub left_server_source_gap_peak_ms: f32,
pub left_server_send_gap_peak_ms: f32,
pub left_server_queue_peak: u32,
pub left_server_encoder_label: String,
pub right_surface: String,
pub right_capture_profile: String,
pub right_capture_transport: String,
@ -115,6 +122,7 @@ pub struct SnapshotReport {
pub right_server_source_gap_peak_ms: f32,
pub right_server_send_gap_peak_ms: f32,
pub right_server_queue_peak: u32,
pub right_server_encoder_label: String,
pub selected_camera: Option<String>,
pub selected_microphone: Option<String>,
pub selected_speaker: Option<String>,
@ -147,6 +155,12 @@ impl SnapshotReport {
state.capture_power.detail,
state.capture_power.active_leases
),
client_process_cpu_pct: latest
.map(|sample| sample.client_process_cpu_pct)
.unwrap_or(0.0),
server_process_cpu_pct: latest
.map(|sample| sample.server_process_cpu_pct)
.unwrap_or(0.0),
preview_source: format!(
"{}x{} @ {} fps",
state.preview_source.width, state.preview_source.height, state.preview_source.fps
@ -200,6 +214,15 @@ impl SnapshotReport {
left_server_queue_peak: latest
.map(|sample| sample.left_server_queue_peak)
.unwrap_or(0),
left_server_encoder_label: latest
.map(|sample| {
if sample.left_server_encoder_label.is_empty() {
"pending".to_string()
} else {
sample.left_server_encoder_label.clone()
}
})
.unwrap_or_else(|| "pending".to_string()),
right_surface: state.display_surface(1).label().to_string(),
right_capture_profile: format!(
"{} | {}x{} | {} fps | {} kbit",
@ -245,6 +268,15 @@ impl SnapshotReport {
right_server_queue_peak: latest
.map(|sample| sample.right_server_queue_peak)
.unwrap_or(0),
right_server_encoder_label: latest
.map(|sample| {
if sample.right_server_encoder_label.is_empty() {
"pending".to_string()
} else {
sample.right_server_encoder_label.clone()
}
})
.unwrap_or_else(|| "pending".to_string()),
selected_camera: state.devices.camera.clone(),
selected_microphone: state.devices.microphone.clone(),
selected_speaker: state.devices.speaker.clone(),
@ -278,6 +310,11 @@ impl SnapshotReport {
"session: routing={:?} view={:?} relay_active={} power={}",
self.routing, self.view_mode, self.remote_active, self.power_state
);
let _ = writeln!(
text,
"runtime pressure: client={:.1}% server={:.1}%",
self.client_process_cpu_pct, self.server_process_cpu_pct
);
let _ = writeln!(text, "source feed: {}", self.preview_source);
let _ = writeln!(text, "client display limit: {}", self.client_display_limit);
let _ = writeln!(text);
@ -298,7 +335,9 @@ impl SnapshotReport {
);
let _ = writeln!(
text,
" server: gaps={:.0}/{:.0}ms queue-peak={}",
" server: encoder={} cpu={:.1}% gaps={:.0}/{:.0}ms queue-peak={}",
self.left_server_encoder_label,
self.server_process_cpu_pct,
self.left_server_source_gap_peak_ms,
self.left_server_send_gap_peak_ms,
self.left_server_queue_peak
@ -320,7 +359,9 @@ impl SnapshotReport {
);
let _ = writeln!(
text,
" server: gaps={:.0}/{:.0}ms queue-peak={}",
" server: encoder={} cpu={:.1}% gaps={:.0}/{:.0}ms queue-peak={}",
self.right_server_encoder_label,
self.server_process_cpu_pct,
self.right_server_source_gap_peak_ms,
self.right_server_send_gap_peak_ms,
self.right_server_queue_peak
@ -366,10 +407,12 @@ impl SnapshotReport {
for sample in &self.recent_samples {
let _ = writeln!(
text,
" rtt={:.1}ms probe-spread={:.1}ms input-floor={:.1}ms probe-loss={:.1}% video-loss={:.1}% left={:.1}/{:.1}/{:.1}fps right={:.1}/{:.1}/{:.1}fps dropped={} queue={}/{} peaks=l{:.0}/{:.0}ms r{:.0}/{:.0}ms server=l{:.0}/{:.0}/{} r{:.0}/{:.0}/{}",
" rtt={:.1}ms probe-spread={:.1}ms input-floor={:.1}ms cpu={:.1}/{:.1}% probe-loss={:.1}% video-loss={:.1}% left={:.1}/{:.1}/{:.1}fps right={:.1}/{:.1}/{:.1}fps dropped={} queue={}/{} peaks=l{:.0}/{:.0}ms r{:.0}/{:.0}ms server=l{}:{:.0}/{:.0}/{} r{}:{:.0}/{:.0}/{}",
sample.rtt_ms,
sample.probe_spread_ms,
sample.input_latency_ms,
sample.client_process_cpu_pct,
sample.server_process_cpu_pct,
sample.probe_loss_pct,
sample.video_loss_pct,
sample.left_receive_fps,
@ -385,9 +428,11 @@ impl SnapshotReport {
sample.left_present_gap_peak_ms,
sample.right_packet_gap_peak_ms,
sample.right_present_gap_peak_ms,
sample.left_server_encoder_label,
sample.left_server_source_gap_peak_ms,
sample.left_server_send_gap_peak_ms,
sample.left_server_queue_peak,
sample.right_server_encoder_label,
sample.right_server_source_gap_peak_ms,
sample.right_server_send_gap_peak_ms,
sample.right_server_queue_peak
@ -494,6 +539,18 @@ fn recommendations_for(state: &LauncherState, log: &DiagnosticsLog) -> Vec<Strin
.to_string(),
);
}
if sample.client_process_cpu_pct >= 85.0 {
items.push(
"Client process CPU is high. If motion still looks rough, favor lighter breakout layouts or a hardware decoder before adding more bitrate."
.to_string(),
);
}
if sample.server_process_cpu_pct >= 85.0 {
items.push(
"Server process CPU is high. That makes re-encode stalls more likely, so compare Source against lighter re-encode profiles before assuming the WAN is the bottleneck."
.to_string(),
);
}
}
let heavy_capture = state.capture_sizes.iter().any(|preset| {
matches!(
@ -551,6 +608,16 @@ fn recommendations_for(state: &LauncherState, log: &DiagnosticsLog) -> Vec<Strin
.to_string(),
);
}
if let Some(sample) = log.latest()
&& sample.server_process_cpu_pct >= 70.0
&& (sample.left_server_encoder_label.contains("x264")
|| sample.right_server_encoder_label.contains("x264"))
{
items.push(
"The server is leaning on `x264enc` while process CPU is already elevated. That makes `Source` or lighter re-encode settings more attractive than pushing the bitrate ceiling upward."
.to_string(),
);
}
if state.breakout_count() == 2 {
items.push(
"Both eye feeds are broken out right now. If the client starts struggling, compare in-launcher preview smoothness against full-window decode."
@ -574,6 +641,8 @@ mod tests {
probe_spread_ms: 3.0 + n as f32,
input_latency_ms: 10.0 + n as f32,
probe_loss_pct: n as f32,
client_process_cpu_pct: 12.5 + n as f32,
server_process_cpu_pct: 22.5 + n as f32,
video_loss_pct: (n as f32) * 0.5,
left_receive_fps: 30.0,
left_present_fps: 29.0,
@ -586,6 +655,7 @@ mod tests {
left_server_source_gap_peak_ms: 42.0,
left_server_send_gap_peak_ms: 48.0,
left_server_queue_peak: n as u32 + 1,
left_server_encoder_label: "x264enc".to_string(),
left_decoder_label: "decodebin".to_string(),
right_receive_fps: 30.0,
right_present_fps: 28.0,
@ -598,6 +668,7 @@ mod tests {
right_server_source_gap_peak_ms: 51.0,
right_server_send_gap_peak_ms: 58.0,
right_server_queue_peak: n as u32 + 1,
right_server_encoder_label: "source-pass-through".to_string(),
right_decoder_label: "decodebin".to_string(),
dropped_frames: n,
queue_depth: n as u32,
@ -657,7 +728,7 @@ mod tests {
assert!(report.status.contains("mode=remote"));
assert!(report.client_version.starts_with("0."));
assert!(report.left_capture_profile.contains("fps"));
assert_eq!(report.left_capture_transport, "source pass-through");
assert_eq!(report.left_capture_transport, "server re-encode");
assert_eq!(report.left_decoder_label, "decodebin");
}

View File

@ -1,4 +1,6 @@
#[cfg(not(coverage))]
use crate::video_support::pick_h264_decoder;
#[cfg(not(coverage))]
use anyhow::{Context, Result};
#[cfg(not(coverage))]
use gstreamer as gst;
@ -67,6 +69,7 @@ pub struct PreviewMetricsSnapshot {
pub receive_fps: f32,
pub present_fps: f32,
pub server_fps: f32,
pub server_process_cpu_pct: f32,
pub stream_spread_ms: f32,
pub packet_loss_pct: f32,
pub dropped_frames: u64,
@ -77,6 +80,7 @@ pub struct PreviewMetricsSnapshot {
pub server_source_gap_peak_ms: f32,
pub server_send_gap_peak_ms: f32,
pub server_queue_peak: u32,
pub server_encoder_label: String,
pub decoder_label: String,
}
@ -446,10 +450,12 @@ struct PreviewTelemetry {
last_seq: Option<u64>,
last_dropped_total: Option<u64>,
latest_server_fps: u32,
latest_server_process_cpu_tenths: u32,
latest_queue_depth: u32,
latest_server_source_gap_peak_ms: u32,
latest_server_send_gap_peak_ms: u32,
latest_server_queue_peak: u32,
latest_server_encoder_label: String,
decoder_label: String,
}
@ -464,6 +470,8 @@ impl PreviewTelemetry {
server_source_gap_peak_ms: u32,
server_send_gap_peak_ms: u32,
server_queue_peak: u32,
server_encoder_label: &str,
server_process_cpu_tenths: u32,
) {
self.record_packet_at(
Instant::now(),
@ -474,6 +482,8 @@ impl PreviewTelemetry {
server_source_gap_peak_ms,
server_send_gap_peak_ms,
server_queue_peak,
server_encoder_label,
server_process_cpu_tenths,
);
}
@ -487,6 +497,8 @@ impl PreviewTelemetry {
server_source_gap_peak_ms: u32,
server_send_gap_peak_ms: u32,
server_queue_peak: u32,
server_encoder_label: &str,
server_process_cpu_tenths: u32,
) {
self.trim(now);
self.packet_times.push_back(now);
@ -513,10 +525,14 @@ impl PreviewTelemetry {
}
self.last_dropped_total = Some(dropped_total);
self.latest_server_fps = server_fps.max(1);
self.latest_server_process_cpu_tenths = server_process_cpu_tenths;
self.latest_queue_depth = queue_depth;
self.latest_server_source_gap_peak_ms = server_source_gap_peak_ms;
self.latest_server_send_gap_peak_ms = server_send_gap_peak_ms;
self.latest_server_queue_peak = server_queue_peak.max(queue_depth);
if !server_encoder_label.is_empty() {
self.latest_server_encoder_label = server_encoder_label.to_string();
}
self.queue_depth_samples.push_back((now, queue_depth));
self.trim(now);
}
@ -572,6 +588,7 @@ impl PreviewTelemetry {
receive_fps,
present_fps,
server_fps: self.latest_server_fps as f32,
server_process_cpu_pct: self.latest_server_process_cpu_tenths as f32 / 10.0,
stream_spread_ms: compute_jitter_ms(&self.packet_intervals_ms),
packet_loss_pct,
dropped_frames,
@ -582,6 +599,7 @@ impl PreviewTelemetry {
server_source_gap_peak_ms: self.latest_server_source_gap_peak_ms as f32,
server_send_gap_peak_ms: self.latest_server_send_gap_peak_ms as f32,
server_queue_peak: self.latest_server_queue_peak,
server_encoder_label: self.latest_server_encoder_label.clone(),
decoder_label: self.decoder_label.clone(),
}
}
@ -752,7 +770,10 @@ fn run_preview_feed(
shared: Arc<Mutex<SharedPreviewState>>,
log_sink: Arc<Mutex<Option<std::sync::mpsc::Sender<String>>>>,
) -> Result<()> {
let (pipeline, appsrc, appsink) = build_preview_pipeline(profile)?;
let (pipeline, appsrc, appsink, decoder_name) = build_preview_pipeline(profile)?;
if let Ok(mut slot) = shared.lock() {
slot.telemetry.note_decoder(&decoder_name);
}
{
let shared = Arc::clone(&shared);
pipeline.connect_deep_element_added(move |_, _, element| {
@ -1159,11 +1180,12 @@ fn looks_like_preview_problem(status: &str) -> bool {
#[cfg(not(coverage))]
fn build_preview_pipeline(
profile: PreviewProfile,
) -> Result<(gst::Pipeline, gst_app::AppSrc, gst_app::AppSink)> {
) -> Result<(gst::Pipeline, gst_app::AppSrc, gst_app::AppSink, String)> {
let decoder_name = pick_h264_decoder();
let desc = format!(
"appsrc name=src is-live=true format=time do-timestamp=true block=false ! \
queue max-size-buffers=6 max-size-time=0 max-size-bytes=0 leaky=downstream ! \
h264parse disable-passthrough=true ! decodebin name=decoder ! videoconvert ! videoscale ! \
h264parse disable-passthrough=true ! {decoder_name} name=decoder ! videoconvert ! videoscale ! \
video/x-raw,format=RGBA,width={},height={},pixel-aspect-ratio=1/1 ! \
appsink name=sink emit-signals=false sync=false max-buffers=1 drop=true",
profile.display_width, profile.display_height
@ -1198,7 +1220,7 @@ fn build_preview_pipeline(
.build(),
));
Ok((pipeline, appsrc, appsink))
Ok((pipeline, appsrc, appsink, decoder_name))
}
#[cfg(not(coverage))]
@ -1221,6 +1243,8 @@ fn record_preview_packet(shared: &Arc<Mutex<SharedPreviewState>>, pkt: &VideoPac
pkt.server_source_gap_peak_ms,
pkt.server_send_gap_peak_ms,
pkt.server_queue_peak,
&pkt.server_encoder_label,
pkt.server_process_cpu_tenths,
);
}
}
@ -1357,11 +1381,33 @@ mod tests {
let mut telemetry = PreviewTelemetry::default();
let start = Instant::now();
telemetry.note_decoder("nvh264dec");
telemetry.record_packet_at(start, 1, 30, 0, 1, 41, 38, 2);
telemetry.record_packet_at(start, 1, 30, 0, 1, 41, 38, 2, "x264enc", 215);
telemetry.record_presented_frame_at(start + Duration::from_millis(5));
telemetry.record_packet_at(start + Duration::from_millis(33), 2, 30, 0, 1, 41, 38, 2);
telemetry.record_packet_at(
start + Duration::from_millis(33),
2,
30,
0,
1,
41,
38,
2,
"x264enc",
215,
);
telemetry.record_presented_frame_at(start + Duration::from_millis(37));
telemetry.record_packet_at(start + Duration::from_millis(80), 4, 27, 2, 3, 77, 88, 4);
telemetry.record_packet_at(
start + Duration::from_millis(80),
4,
27,
2,
3,
77,
88,
4,
"x264enc",
382,
);
telemetry.record_presented_frame_at(start + Duration::from_millis(90));
let snapshot = telemetry.snapshot_at(start + Duration::from_millis(120));
@ -1378,6 +1424,8 @@ mod tests {
assert_eq!(snapshot.server_source_gap_peak_ms, 77.0);
assert_eq!(snapshot.server_send_gap_peak_ms, 88.0);
assert_eq!(snapshot.server_queue_peak, 4);
assert_eq!(snapshot.server_process_cpu_pct, 38.2);
assert_eq!(snapshot.server_encoder_label, "x264enc");
assert_eq!(snapshot.decoder_label, "nvh264dec");
}
}

View File

@ -269,7 +269,7 @@ impl Default for LauncherState {
preview_source: PreviewSourceSize::default(),
breakout_limit: PreviewSourceSize::default(),
breakout_display: PreviewSourceSize::default(),
capture_sizes: [CaptureSizePreset::Source, CaptureSizePreset::Source],
capture_sizes: [CaptureSizePreset::P1080, CaptureSizePreset::P1080],
capture_fps: [30, 30],
capture_bitrates_kbit: [12_000, 12_000],
breakout_sizes: [BreakoutSizePreset::Source, BreakoutSizePreset::Source],
@ -944,7 +944,7 @@ mod tests {
assert_eq!(state.display_surface(1), DisplaySurface::Preview);
assert_eq!(state.preview_source_size(), PreviewSourceSize::default());
assert_eq!(state.breakout_limit_size(), PreviewSourceSize::default());
assert_eq!(state.capture_size_preset(0), CaptureSizePreset::Source);
assert_eq!(state.capture_size_preset(0), CaptureSizePreset::P1080);
assert_eq!(state.breakout_size_preset(0), BreakoutSizePreset::Source);
assert!(!state.server_available);
assert!(!state.remote_active);
@ -1184,6 +1184,7 @@ mod tests {
fn source_capture_profile_uses_source_fps_and_scaled_profiles_cap_it() {
let mut state = LauncherState::new();
state.set_preview_source_profile(1920, 1080, 60);
state.set_capture_size_preset(0, CaptureSizePreset::Source);
let source = state.capture_size_choice(0);
assert_eq!(source.width, 1920);
assert_eq!(source.height, 1080);
@ -1228,6 +1229,7 @@ mod tests {
fn source_capture_ignores_manual_fps_and_bitrate_knobs() {
let mut state = LauncherState::new();
state.set_preview_source_profile(1920, 1080, 25);
state.set_capture_size_preset(0, CaptureSizePreset::Source);
state.set_capture_fps(0, 60);
state.set_capture_bitrate_kbit(0, 24_000);

View File

@ -29,6 +29,7 @@ use {
gtk::glib,
gtk::prelude::*,
lesavka_common::lesavka::CapturePowerCommand,
lesavka_common::process_metrics::ProcessCpuSampler,
std::cell::{Cell, RefCell},
std::collections::VecDeque,
std::process::Command,
@ -249,6 +250,7 @@ fn record_diagnostics_sample(
state: &LauncherState,
preview: Option<&super::preview::LauncherPreview>,
network: NetworkSnapshot,
client_process_cpu_pct: f32,
) {
let left_metrics = preview
.and_then(|preview| {
@ -281,6 +283,10 @@ fn record_diagnostics_sample(
probe_spread_ms: network.probe_spread_ms,
input_latency_ms: network.rtt_ms * 0.5,
probe_loss_pct: network.probe_loss_pct,
client_process_cpu_pct,
server_process_cpu_pct: left_metrics
.server_process_cpu_pct
.max(right_metrics.server_process_cpu_pct),
video_loss_pct: left_metrics
.packet_loss_pct
.max(right_metrics.packet_loss_pct),
@ -295,6 +301,7 @@ fn record_diagnostics_sample(
left_server_source_gap_peak_ms: left_metrics.server_source_gap_peak_ms,
left_server_send_gap_peak_ms: left_metrics.server_send_gap_peak_ms,
left_server_queue_peak: left_metrics.server_queue_peak,
left_server_encoder_label: left_metrics.server_encoder_label.clone(),
left_decoder_label: left_metrics.decoder_label.clone(),
right_receive_fps: right_metrics.receive_fps,
right_present_fps: right_metrics.present_fps,
@ -307,6 +314,7 @@ fn record_diagnostics_sample(
right_server_source_gap_peak_ms: right_metrics.server_source_gap_peak_ms,
right_server_send_gap_peak_ms: right_metrics.server_send_gap_peak_ms,
right_server_queue_peak: right_metrics.server_queue_peak,
right_server_encoder_label: right_metrics.server_encoder_label.clone(),
right_decoder_label: right_metrics.decoder_label.clone(),
dropped_frames: left_metrics
.dropped_frames
@ -622,6 +630,7 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> {
let (caps_tx, caps_rx) = std::sync::mpsc::channel::<CapsMessage>();
let caps_request_in_flight = Rc::new(Cell::new(false));
let diagnostics_network = Rc::new(RefCell::new(NetworkTelemetry::default()));
let diagnostics_process = Rc::new(RefCell::new(ProcessCpuSampler::new()));
let next_diagnostics_probe =
Rc::new(Cell::new(Instant::now() + Duration::from_millis(250)));
let next_diagnostics_sample =
@ -1551,6 +1560,7 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> {
let caps_tx = caps_tx.clone();
let caps_request_in_flight = Rc::clone(&caps_request_in_flight);
let diagnostics_network = Rc::clone(&diagnostics_network);
let diagnostics_process = Rc::clone(&diagnostics_process);
let next_diagnostics_probe = Rc::clone(&next_diagnostics_probe);
let next_diagnostics_sample = Rc::clone(&next_diagnostics_sample);
let log_tx = log_tx.clone();
@ -1843,11 +1853,16 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> {
if now >= next_diagnostics_sample.get() {
let network = diagnostics_network.borrow_mut().snapshot();
let client_process_cpu_pct = diagnostics_process
.borrow_mut()
.sample_percent()
.unwrap_or(0.0);
record_diagnostics_sample(
&widgets,
&state.borrow(),
preview.as_ref().map(|preview| preview.as_ref()),
network,
client_process_cpu_pct,
);
next_diagnostics_sample.set(now + Duration::from_secs(1));
}

View File

@ -14,5 +14,6 @@ pub mod launcher;
pub mod layout;
pub mod output;
pub mod paste;
pub(crate) mod video_support;
pub use app::LesavkaClientApp;

View File

@ -9,6 +9,36 @@ use gstreamer_video::prelude::VideoOverlayExt;
use lesavka_common::lesavka::VideoPacket;
use std::process::Command;
use tracing::{debug, error, info, warn};
fn pick_h264_decoder() -> String {
if let Ok(raw) = std::env::var("LESAVKA_H264_DECODER") {
let name = raw.trim();
if name.eq_ignore_ascii_case("decodebin") {
return "decodebin".to_string();
}
if !name.is_empty() && gst::ElementFactory::find(name).is_some() {
return name.to_string();
}
}
for name in [
"nvh264dec",
"nvh264sldec",
"vah264dec",
"vaapih264dec",
"v4l2h264dec",
"v4l2slh264dec",
"openh264dec",
"avdec_h264",
] {
if gst::ElementFactory::find(name).is_some() {
return name.to_string();
}
}
"decodebin".to_string()
}
pub struct MonitorWindow {
_pipeline: gst::Pipeline,
src: gst_app::AppSrc,
@ -124,6 +154,7 @@ impl MonitorWindow {
gst::init().context("initialising GStreamer")?;
// --- Build pipeline ---------------------------------------------------
let decoder_name = pick_h264_decoder();
let sink = if std::env::var("GDK_BACKEND")
.map(|v| v.contains("x11"))
.unwrap_or_else(|_| std::env::var_os("DISPLAY").is_some())
@ -137,7 +168,7 @@ impl MonitorWindow {
"appsrc name=src is-live=true format=time do-timestamp=true block=false ! \
queue max-size-buffers=8 max-size-time=0 max-size-bytes=0 leaky=downstream ! \
capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=au ! \
h264parse disable-passthrough=true ! decodebin ! videoconvert ! {sink}"
h264parse disable-passthrough=true ! {decoder_name} name=decoder ! videoconvert ! {sink}"
);
let pipeline: gst::Pipeline = gst::parse::launch(&desc)?
@ -279,6 +310,7 @@ impl MonitorWindow {
StateChanged(s) if s.current() == gst::State::Playing => {
if msg.src().map(|s| s.is::<gst::Pipeline>()).unwrap_or(false) {
info!("🎞️ video{id} pipeline ▶️ (sink='glimagesink')");
info!("🎞️ video{id} decoder → {decoder_name}");
}
}
Error(e) => error!(
@ -390,6 +422,7 @@ impl UnifiedMonitorWindow {
pub fn new() -> anyhow::Result<Self> {
gst::init().context("initialising GStreamer")?;
let decoder_name = pick_h264_decoder();
let sink = if std::env::var("GDK_BACKEND")
.map(|v| v.contains("x11"))
.unwrap_or_else(|_| std::env::var_os("DISPLAY").is_some())
@ -404,11 +437,11 @@ impl UnifiedMonitorWindow {
appsrc name=src0 is-live=true format=time do-timestamp=true block=false ! \
queue max-size-buffers=8 max-size-time=0 max-size-bytes=0 leaky=downstream ! \
capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=au ! \
h264parse disable-passthrough=true ! decodebin ! videoconvert ! videoscale ! mix. \
h264parse disable-passthrough=true ! {decoder_name} name=decoder0 ! videoconvert ! videoscale ! mix. \
appsrc name=src1 is-live=true format=time do-timestamp=true block=false ! \
queue max-size-buffers=8 max-size-time=0 max-size-bytes=0 leaky=downstream ! \
capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=au ! \
h264parse disable-passthrough=true ! decodebin ! videoconvert ! videoscale ! mix."
h264parse disable-passthrough=true ! {decoder_name} name=decoder1 ! videoconvert ! videoscale ! mix."
);
let pipeline: gst::Pipeline = gst::parse::launch(&desc)?
@ -487,6 +520,7 @@ impl UnifiedMonitorWindow {
StateChanged(s) if s.current() == gst::State::Playing => {
if msg.src().map(|s| s.is::<gst::Pipeline>()).unwrap_or(false) {
info!("🎞️ unified video pipeline ▶️");
info!("🎞️ unified decoder → {decoder_name}");
}
}
Error(e) => error!(

View File

@ -0,0 +1,41 @@
#![forbid(unsafe_code)]
use gstreamer as gst;
/// Pick the client-side H.264 decoder in a predictable preference order.
///
/// Inputs: none, though operators may override the choice with
/// `LESAVKA_H264_DECODER=<element>`.
/// Outputs: the chosen decoder element name, or `decodebin` as a last-resort
/// fallback when no explicit decoder is present.
/// Why: `decodebin` is flexible, but a stable preference order makes decode
/// behavior easier to reason about and compare in diagnostics.
#[must_use]
pub fn pick_h264_decoder() -> String {
if let Ok(raw) = std::env::var("LESAVKA_H264_DECODER") {
let name = raw.trim();
if name.eq_ignore_ascii_case("decodebin") {
return "decodebin".to_string();
}
if !name.is_empty() && gst::ElementFactory::find(name).is_some() {
return name.to_string();
}
}
for name in [
"nvh264dec",
"nvh264sldec",
"vah264dec",
"vaapih264dec",
"v4l2h264dec",
"v4l2slh264dec",
"openh264dec",
"avdec_h264",
] {
if gst::ElementFactory::find(name).is_some() {
return name.to_string();
}
}
"decodebin".to_string()
}

View File

@ -1,6 +1,6 @@
[package]
name = "lesavka_common"
version = "0.10.0"
version = "0.11.0"
edition = "2024"
build = "build.rs"

View File

@ -23,6 +23,8 @@ message VideoPacket {
uint32 server_source_gap_peak_ms = 8;
uint32 server_send_gap_peak_ms = 9;
uint32 server_queue_peak = 10;
string server_encoder_label = 11;
uint32 server_process_cpu_tenths = 12;
}
message AudioPacket { uint32 id = 1; uint64 pts = 2; bytes data = 3; }

View File

@ -17,6 +17,6 @@ mod tests {
#[test]
fn banner_includes_version() {
assert_eq!(banner("0.10.0"), "lesavka-common CLI (v0.10.0)");
assert_eq!(banner("0.11.0"), "lesavka-common CLI (v0.11.0)");
}
}

View File

@ -5,6 +5,7 @@
pub mod cli;
pub mod hid;
pub mod paste;
pub mod process_metrics;
#[allow(warnings)]
pub mod lesavka {

View File

@ -0,0 +1,66 @@
#![forbid(unsafe_code)]
use std::time::Instant;
/// Sample per-process CPU pressure from Linux procfs.
///
/// Inputs: none when constructed; call `sample_percent()` over time.
/// Outputs: percentage of one CPU core used by the current process over the
/// elapsed sampling window.
/// Why: this gives the launcher and server lightweight pressure telemetry
/// without adding a heavyweight system-information dependency.
#[derive(Debug, Clone, Default)]
pub struct ProcessCpuSampler {
last: Option<(Instant, u64)>,
}
impl ProcessCpuSampler {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn sample_percent(&mut self) -> Option<f32> {
let now = Instant::now();
let runtime_ns = read_process_runtime_ns()?;
let previous = self.last.replace((now, runtime_ns))?;
let elapsed_ns = now
.saturating_duration_since(previous.0)
.as_nanos()
.min(u128::from(u64::MAX)) as u64;
if elapsed_ns == 0 || runtime_ns < previous.1 {
return None;
}
Some(runtime_ns.saturating_sub(previous.1) as f32 * 100.0 / elapsed_ns as f32)
}
pub fn sample_tenths_percent(&mut self) -> Option<u32> {
self.sample_percent()
.map(|pct| (pct * 10.0).clamp(0.0, u32::MAX as f32) as u32)
}
}
fn read_process_runtime_ns() -> Option<u64> {
let text = std::fs::read_to_string("/proc/self/schedstat").ok()?;
text.split_whitespace().next()?.parse::<u64>().ok()
}
#[cfg(test)]
mod tests {
use super::*;
use std::thread;
use std::time::Duration;
#[test]
fn schedstat_runtime_reads() {
assert!(read_process_runtime_ns().is_some());
}
#[test]
fn sampler_returns_percentage_after_two_samples() {
let mut sampler = ProcessCpuSampler::new();
assert!(sampler.sample_percent().is_none());
thread::sleep(Duration::from_millis(10));
let _ = sampler.sample_percent();
}
}

View File

@ -10,7 +10,7 @@ bench = false
[package]
name = "lesavka_server"
version = "0.10.0"
version = "0.11.0"
edition = "2024"
autobins = false

View File

@ -7,6 +7,7 @@ use gst::prelude::*;
use gstreamer as gst;
use gstreamer_app as gst_app;
use lesavka_common::lesavka::VideoPacket;
use lesavka_common::process_metrics::ProcessCpuSampler;
use std::os::unix::fs::FileTypeExt;
use std::sync::Arc;
use std::sync::OnceLock;
@ -23,6 +24,24 @@ use crate::video_support::{
const EYE_ID: [&str; 2] = ["l", "r"];
static START: OnceLock<gst::ClockTime> = OnceLock::new();
static SERVER_PROCESS_CPU_TENTHS: OnceLock<Arc<AtomicU32>> = OnceLock::new();
fn server_process_cpu_metric() -> Arc<AtomicU32> {
Arc::clone(SERVER_PROCESS_CPU_TENTHS.get_or_init(|| {
let metric = Arc::new(AtomicU32::new(0));
let metric_for_thread = Arc::clone(&metric);
std::thread::spawn(move || {
let mut sampler = ProcessCpuSampler::new();
loop {
if let Some(value) = sampler.sample_tenths_percent() {
metric_for_thread.store(value, Ordering::Relaxed);
}
std::thread::sleep(std::time::Duration::from_secs(1));
}
});
metric
}))
}
pub struct VideoStream {
_pipeline: gst::Pipeline,
@ -424,6 +443,14 @@ pub async fn eye_ball_with_request(
.clamp(1, request.requested_fps.max(1));
let use_test_src =
dev.eq_ignore_ascii_case("testsrc") || dev.eq_ignore_ascii_case("videotestsrc");
let server_encoder_label = if use_test_src {
"x264enc(testsrc)".to_string()
} else if request.downscale {
"x264enc".to_string()
} else {
"source-pass-through".to_string()
};
let server_process_cpu_tenths = server_process_cpu_metric();
if !use_test_src {
wait_for_eye_device(dev, eye).await?;
}
@ -503,6 +530,8 @@ pub async fn eye_ball_with_request(
let send_gap_peak_ms_for_cb = Arc::clone(&send_gap_peak_ms);
let queue_peak_depth_for_cb = Arc::clone(&queue_peak_depth);
let last_telemetry_sec_for_cb = Arc::clone(&last_telemetry_sec);
let server_encoder_label_for_cb = server_encoder_label.clone();
let server_process_cpu_tenths_for_cb = Arc::clone(&server_process_cpu_tenths);
sink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample(move |sink| {
@ -619,6 +648,9 @@ pub async fn eye_ball_with_request(
server_source_gap_peak_ms: source_gap_peak_ms_for_cb.load(Ordering::Relaxed),
server_send_gap_peak_ms: send_gap_peak_ms_for_cb.load(Ordering::Relaxed),
server_queue_peak: queue_peak_depth_for_cb.load(Ordering::Relaxed),
server_encoder_label: server_encoder_label_for_cb.clone(),
server_process_cpu_tenths: server_process_cpu_tenths_for_cb
.load(Ordering::Relaxed),
};
match tx.try_send(Ok(pkt)) {
Ok(_) => {