From f10e8a00ae84c9325744eee33c0585bec72b3947 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Fri, 17 Apr 2026 06:14:54 -0300 Subject: [PATCH] lesavka: add runtime pressure diagnostics --- client/Cargo.toml | 2 +- client/src/launcher/diagnostics.rs | 79 ++++++++++++++++++++++++++++-- client/src/launcher/preview.rs | 62 ++++++++++++++++++++--- client/src/launcher/state.rs | 6 ++- client/src/launcher/ui.rs | 15 ++++++ client/src/lib.rs | 1 + client/src/output/video.rs | 40 +++++++++++++-- client/src/video_support.rs | 41 ++++++++++++++++ common/Cargo.toml | 2 +- common/proto/lesavka.proto | 2 + common/src/cli.rs | 2 +- common/src/lib.rs | 1 + common/src/process_metrics.rs | 66 +++++++++++++++++++++++++ server/Cargo.toml | 2 +- server/src/video.rs | 32 ++++++++++++ 15 files changed, 333 insertions(+), 20 deletions(-) create mode 100644 client/src/video_support.rs create mode 100644 common/src/process_metrics.rs diff --git a/client/Cargo.toml b/client/Cargo.toml index 7acf683..7d4cff1 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.10.0" +version = "0.11.0" edition = "2024" [dependencies] diff --git a/client/src/launcher/diagnostics.rs b/client/src/launcher/diagnostics.rs index 25dd474..e5a0bad 100644 --- a/client/src/launcher/diagnostics.rs +++ b/client/src/launcher/diagnostics.rs @@ -10,6 +10,8 @@ pub struct PerformanceSample { pub probe_spread_ms: f32, pub input_latency_ms: f32, pub probe_loss_pct: f32, + pub client_process_cpu_pct: f32, + pub server_process_cpu_pct: f32, pub video_loss_pct: f32, pub left_receive_fps: f32, pub left_present_fps: f32, @@ -22,6 +24,7 @@ pub struct PerformanceSample { pub left_server_source_gap_peak_ms: f32, pub left_server_send_gap_peak_ms: f32, pub left_server_queue_peak: u32, + pub left_server_encoder_label: String, pub left_decoder_label: String, pub right_receive_fps: f32, pub right_present_fps: f32, @@ -34,6 +37,7 @@ pub struct PerformanceSample { pub right_server_source_gap_peak_ms: f32, pub right_server_send_gap_peak_ms: f32, pub right_server_queue_peak: u32, + pub right_server_encoder_label: String, pub right_decoder_label: String, pub dropped_frames: u64, pub queue_depth: u32, @@ -87,6 +91,8 @@ pub struct SnapshotReport { pub view_mode: ViewMode, pub remote_active: bool, pub power_state: String, + pub client_process_cpu_pct: f32, + pub server_process_cpu_pct: f32, pub preview_source: String, pub client_display_limit: String, pub left_surface: String, @@ -102,6 +108,7 @@ pub struct SnapshotReport { pub left_server_source_gap_peak_ms: f32, pub left_server_send_gap_peak_ms: f32, pub left_server_queue_peak: u32, + pub left_server_encoder_label: String, pub right_surface: String, pub right_capture_profile: String, pub right_capture_transport: String, @@ -115,6 +122,7 @@ pub struct SnapshotReport { pub right_server_source_gap_peak_ms: f32, pub right_server_send_gap_peak_ms: f32, pub right_server_queue_peak: u32, + pub right_server_encoder_label: String, pub selected_camera: Option, pub selected_microphone: Option, pub selected_speaker: Option, @@ -147,6 +155,12 @@ impl SnapshotReport { state.capture_power.detail, state.capture_power.active_leases ), + client_process_cpu_pct: latest + .map(|sample| sample.client_process_cpu_pct) + .unwrap_or(0.0), + server_process_cpu_pct: latest + .map(|sample| sample.server_process_cpu_pct) + .unwrap_or(0.0), preview_source: format!( "{}x{} @ {} fps", state.preview_source.width, state.preview_source.height, state.preview_source.fps @@ -200,6 +214,15 @@ impl SnapshotReport { left_server_queue_peak: latest .map(|sample| sample.left_server_queue_peak) .unwrap_or(0), + left_server_encoder_label: latest + .map(|sample| { + if sample.left_server_encoder_label.is_empty() { + "pending".to_string() + } else { + sample.left_server_encoder_label.clone() + } + }) + .unwrap_or_else(|| "pending".to_string()), right_surface: state.display_surface(1).label().to_string(), right_capture_profile: format!( "{} | {}x{} | {} fps | {} kbit", @@ -245,6 +268,15 @@ impl SnapshotReport { right_server_queue_peak: latest .map(|sample| sample.right_server_queue_peak) .unwrap_or(0), + right_server_encoder_label: latest + .map(|sample| { + if sample.right_server_encoder_label.is_empty() { + "pending".to_string() + } else { + sample.right_server_encoder_label.clone() + } + }) + .unwrap_or_else(|| "pending".to_string()), selected_camera: state.devices.camera.clone(), selected_microphone: state.devices.microphone.clone(), selected_speaker: state.devices.speaker.clone(), @@ -278,6 +310,11 @@ impl SnapshotReport { "session: routing={:?} view={:?} relay_active={} power={}", self.routing, self.view_mode, self.remote_active, self.power_state ); + let _ = writeln!( + text, + "runtime pressure: client={:.1}% server={:.1}%", + self.client_process_cpu_pct, self.server_process_cpu_pct + ); let _ = writeln!(text, "source feed: {}", self.preview_source); let _ = writeln!(text, "client display limit: {}", self.client_display_limit); let _ = writeln!(text); @@ -298,7 +335,9 @@ impl SnapshotReport { ); let _ = writeln!( text, - " server: gaps={:.0}/{:.0}ms queue-peak={}", + " server: encoder={} cpu={:.1}% gaps={:.0}/{:.0}ms queue-peak={}", + self.left_server_encoder_label, + self.server_process_cpu_pct, self.left_server_source_gap_peak_ms, self.left_server_send_gap_peak_ms, self.left_server_queue_peak @@ -320,7 +359,9 @@ impl SnapshotReport { ); let _ = writeln!( text, - " server: gaps={:.0}/{:.0}ms queue-peak={}", + " server: encoder={} cpu={:.1}% gaps={:.0}/{:.0}ms queue-peak={}", + self.right_server_encoder_label, + self.server_process_cpu_pct, self.right_server_source_gap_peak_ms, self.right_server_send_gap_peak_ms, self.right_server_queue_peak @@ -366,10 +407,12 @@ impl SnapshotReport { for sample in &self.recent_samples { let _ = writeln!( text, - " rtt={:.1}ms probe-spread={:.1}ms input-floor={:.1}ms probe-loss={:.1}% video-loss={:.1}% left={:.1}/{:.1}/{:.1}fps right={:.1}/{:.1}/{:.1}fps dropped={} queue={}/{} peaks=l{:.0}/{:.0}ms r{:.0}/{:.0}ms server=l{:.0}/{:.0}/{} r{:.0}/{:.0}/{}", + " rtt={:.1}ms probe-spread={:.1}ms input-floor={:.1}ms cpu={:.1}/{:.1}% probe-loss={:.1}% video-loss={:.1}% left={:.1}/{:.1}/{:.1}fps right={:.1}/{:.1}/{:.1}fps dropped={} queue={}/{} peaks=l{:.0}/{:.0}ms r{:.0}/{:.0}ms server=l{}:{:.0}/{:.0}/{} r{}:{:.0}/{:.0}/{}", sample.rtt_ms, sample.probe_spread_ms, sample.input_latency_ms, + sample.client_process_cpu_pct, + sample.server_process_cpu_pct, sample.probe_loss_pct, sample.video_loss_pct, sample.left_receive_fps, @@ -385,9 +428,11 @@ impl SnapshotReport { sample.left_present_gap_peak_ms, sample.right_packet_gap_peak_ms, sample.right_present_gap_peak_ms, + sample.left_server_encoder_label, sample.left_server_source_gap_peak_ms, sample.left_server_send_gap_peak_ms, sample.left_server_queue_peak, + sample.right_server_encoder_label, sample.right_server_source_gap_peak_ms, sample.right_server_send_gap_peak_ms, sample.right_server_queue_peak @@ -494,6 +539,18 @@ fn recommendations_for(state: &LauncherState, log: &DiagnosticsLog) -> Vec= 85.0 { + items.push( + "Client process CPU is high. If motion still looks rough, favor lighter breakout layouts or a hardware decoder before adding more bitrate." + .to_string(), + ); + } + if sample.server_process_cpu_pct >= 85.0 { + items.push( + "Server process CPU is high. That makes re-encode stalls more likely, so compare Source against lighter re-encode profiles before assuming the WAN is the bottleneck." + .to_string(), + ); + } } let heavy_capture = state.capture_sizes.iter().any(|preset| { matches!( @@ -551,6 +608,16 @@ fn recommendations_for(state: &LauncherState, log: &DiagnosticsLog) -> Vec= 70.0 + && (sample.left_server_encoder_label.contains("x264") + || sample.right_server_encoder_label.contains("x264")) + { + items.push( + "The server is leaning on `x264enc` while process CPU is already elevated. That makes `Source` or lighter re-encode settings more attractive than pushing the bitrate ceiling upward." + .to_string(), + ); + } if state.breakout_count() == 2 { items.push( "Both eye feeds are broken out right now. If the client starts struggling, compare in-launcher preview smoothness against full-window decode." @@ -574,6 +641,8 @@ mod tests { probe_spread_ms: 3.0 + n as f32, input_latency_ms: 10.0 + n as f32, probe_loss_pct: n as f32, + client_process_cpu_pct: 12.5 + n as f32, + server_process_cpu_pct: 22.5 + n as f32, video_loss_pct: (n as f32) * 0.5, left_receive_fps: 30.0, left_present_fps: 29.0, @@ -586,6 +655,7 @@ mod tests { left_server_source_gap_peak_ms: 42.0, left_server_send_gap_peak_ms: 48.0, left_server_queue_peak: n as u32 + 1, + left_server_encoder_label: "x264enc".to_string(), left_decoder_label: "decodebin".to_string(), right_receive_fps: 30.0, right_present_fps: 28.0, @@ -598,6 +668,7 @@ mod tests { right_server_source_gap_peak_ms: 51.0, right_server_send_gap_peak_ms: 58.0, right_server_queue_peak: n as u32 + 1, + right_server_encoder_label: "source-pass-through".to_string(), right_decoder_label: "decodebin".to_string(), dropped_frames: n, queue_depth: n as u32, @@ -657,7 +728,7 @@ mod tests { assert!(report.status.contains("mode=remote")); assert!(report.client_version.starts_with("0.")); assert!(report.left_capture_profile.contains("fps")); - assert_eq!(report.left_capture_transport, "source pass-through"); + assert_eq!(report.left_capture_transport, "server re-encode"); assert_eq!(report.left_decoder_label, "decodebin"); } diff --git a/client/src/launcher/preview.rs b/client/src/launcher/preview.rs index d21be16..3ca39a9 100644 --- a/client/src/launcher/preview.rs +++ b/client/src/launcher/preview.rs @@ -1,4 +1,6 @@ #[cfg(not(coverage))] +use crate::video_support::pick_h264_decoder; +#[cfg(not(coverage))] use anyhow::{Context, Result}; #[cfg(not(coverage))] use gstreamer as gst; @@ -67,6 +69,7 @@ pub struct PreviewMetricsSnapshot { pub receive_fps: f32, pub present_fps: f32, pub server_fps: f32, + pub server_process_cpu_pct: f32, pub stream_spread_ms: f32, pub packet_loss_pct: f32, pub dropped_frames: u64, @@ -77,6 +80,7 @@ pub struct PreviewMetricsSnapshot { pub server_source_gap_peak_ms: f32, pub server_send_gap_peak_ms: f32, pub server_queue_peak: u32, + pub server_encoder_label: String, pub decoder_label: String, } @@ -446,10 +450,12 @@ struct PreviewTelemetry { last_seq: Option, last_dropped_total: Option, latest_server_fps: u32, + latest_server_process_cpu_tenths: u32, latest_queue_depth: u32, latest_server_source_gap_peak_ms: u32, latest_server_send_gap_peak_ms: u32, latest_server_queue_peak: u32, + latest_server_encoder_label: String, decoder_label: String, } @@ -464,6 +470,8 @@ impl PreviewTelemetry { server_source_gap_peak_ms: u32, server_send_gap_peak_ms: u32, server_queue_peak: u32, + server_encoder_label: &str, + server_process_cpu_tenths: u32, ) { self.record_packet_at( Instant::now(), @@ -474,6 +482,8 @@ impl PreviewTelemetry { server_source_gap_peak_ms, server_send_gap_peak_ms, server_queue_peak, + server_encoder_label, + server_process_cpu_tenths, ); } @@ -487,6 +497,8 @@ impl PreviewTelemetry { server_source_gap_peak_ms: u32, server_send_gap_peak_ms: u32, server_queue_peak: u32, + server_encoder_label: &str, + server_process_cpu_tenths: u32, ) { self.trim(now); self.packet_times.push_back(now); @@ -513,10 +525,14 @@ impl PreviewTelemetry { } self.last_dropped_total = Some(dropped_total); self.latest_server_fps = server_fps.max(1); + self.latest_server_process_cpu_tenths = server_process_cpu_tenths; self.latest_queue_depth = queue_depth; self.latest_server_source_gap_peak_ms = server_source_gap_peak_ms; self.latest_server_send_gap_peak_ms = server_send_gap_peak_ms; self.latest_server_queue_peak = server_queue_peak.max(queue_depth); + if !server_encoder_label.is_empty() { + self.latest_server_encoder_label = server_encoder_label.to_string(); + } self.queue_depth_samples.push_back((now, queue_depth)); self.trim(now); } @@ -572,6 +588,7 @@ impl PreviewTelemetry { receive_fps, present_fps, server_fps: self.latest_server_fps as f32, + server_process_cpu_pct: self.latest_server_process_cpu_tenths as f32 / 10.0, stream_spread_ms: compute_jitter_ms(&self.packet_intervals_ms), packet_loss_pct, dropped_frames, @@ -582,6 +599,7 @@ impl PreviewTelemetry { server_source_gap_peak_ms: self.latest_server_source_gap_peak_ms as f32, server_send_gap_peak_ms: self.latest_server_send_gap_peak_ms as f32, server_queue_peak: self.latest_server_queue_peak, + server_encoder_label: self.latest_server_encoder_label.clone(), decoder_label: self.decoder_label.clone(), } } @@ -752,7 +770,10 @@ fn run_preview_feed( shared: Arc>, log_sink: Arc>>>, ) -> Result<()> { - let (pipeline, appsrc, appsink) = build_preview_pipeline(profile)?; + let (pipeline, appsrc, appsink, decoder_name) = build_preview_pipeline(profile)?; + if let Ok(mut slot) = shared.lock() { + slot.telemetry.note_decoder(&decoder_name); + } { let shared = Arc::clone(&shared); pipeline.connect_deep_element_added(move |_, _, element| { @@ -1159,11 +1180,12 @@ fn looks_like_preview_problem(status: &str) -> bool { #[cfg(not(coverage))] fn build_preview_pipeline( profile: PreviewProfile, -) -> Result<(gst::Pipeline, gst_app::AppSrc, gst_app::AppSink)> { +) -> Result<(gst::Pipeline, gst_app::AppSrc, gst_app::AppSink, String)> { + let decoder_name = pick_h264_decoder(); let desc = format!( "appsrc name=src is-live=true format=time do-timestamp=true block=false ! \ queue max-size-buffers=6 max-size-time=0 max-size-bytes=0 leaky=downstream ! \ - h264parse disable-passthrough=true ! decodebin name=decoder ! videoconvert ! videoscale ! \ + h264parse disable-passthrough=true ! {decoder_name} name=decoder ! videoconvert ! videoscale ! \ video/x-raw,format=RGBA,width={},height={},pixel-aspect-ratio=1/1 ! \ appsink name=sink emit-signals=false sync=false max-buffers=1 drop=true", profile.display_width, profile.display_height @@ -1198,7 +1220,7 @@ fn build_preview_pipeline( .build(), )); - Ok((pipeline, appsrc, appsink)) + Ok((pipeline, appsrc, appsink, decoder_name)) } #[cfg(not(coverage))] @@ -1221,6 +1243,8 @@ fn record_preview_packet(shared: &Arc>, pkt: &VideoPac pkt.server_source_gap_peak_ms, pkt.server_send_gap_peak_ms, pkt.server_queue_peak, + &pkt.server_encoder_label, + pkt.server_process_cpu_tenths, ); } } @@ -1357,11 +1381,33 @@ mod tests { let mut telemetry = PreviewTelemetry::default(); let start = Instant::now(); telemetry.note_decoder("nvh264dec"); - telemetry.record_packet_at(start, 1, 30, 0, 1, 41, 38, 2); + telemetry.record_packet_at(start, 1, 30, 0, 1, 41, 38, 2, "x264enc", 215); telemetry.record_presented_frame_at(start + Duration::from_millis(5)); - telemetry.record_packet_at(start + Duration::from_millis(33), 2, 30, 0, 1, 41, 38, 2); + telemetry.record_packet_at( + start + Duration::from_millis(33), + 2, + 30, + 0, + 1, + 41, + 38, + 2, + "x264enc", + 215, + ); telemetry.record_presented_frame_at(start + Duration::from_millis(37)); - telemetry.record_packet_at(start + Duration::from_millis(80), 4, 27, 2, 3, 77, 88, 4); + telemetry.record_packet_at( + start + Duration::from_millis(80), + 4, + 27, + 2, + 3, + 77, + 88, + 4, + "x264enc", + 382, + ); telemetry.record_presented_frame_at(start + Duration::from_millis(90)); let snapshot = telemetry.snapshot_at(start + Duration::from_millis(120)); @@ -1378,6 +1424,8 @@ mod tests { assert_eq!(snapshot.server_source_gap_peak_ms, 77.0); assert_eq!(snapshot.server_send_gap_peak_ms, 88.0); assert_eq!(snapshot.server_queue_peak, 4); + assert_eq!(snapshot.server_process_cpu_pct, 38.2); + assert_eq!(snapshot.server_encoder_label, "x264enc"); assert_eq!(snapshot.decoder_label, "nvh264dec"); } } diff --git a/client/src/launcher/state.rs b/client/src/launcher/state.rs index ffbe3e0..ab9ea53 100644 --- a/client/src/launcher/state.rs +++ b/client/src/launcher/state.rs @@ -269,7 +269,7 @@ impl Default for LauncherState { preview_source: PreviewSourceSize::default(), breakout_limit: PreviewSourceSize::default(), breakout_display: PreviewSourceSize::default(), - capture_sizes: [CaptureSizePreset::Source, CaptureSizePreset::Source], + capture_sizes: [CaptureSizePreset::P1080, CaptureSizePreset::P1080], capture_fps: [30, 30], capture_bitrates_kbit: [12_000, 12_000], breakout_sizes: [BreakoutSizePreset::Source, BreakoutSizePreset::Source], @@ -944,7 +944,7 @@ mod tests { assert_eq!(state.display_surface(1), DisplaySurface::Preview); assert_eq!(state.preview_source_size(), PreviewSourceSize::default()); assert_eq!(state.breakout_limit_size(), PreviewSourceSize::default()); - assert_eq!(state.capture_size_preset(0), CaptureSizePreset::Source); + assert_eq!(state.capture_size_preset(0), CaptureSizePreset::P1080); assert_eq!(state.breakout_size_preset(0), BreakoutSizePreset::Source); assert!(!state.server_available); assert!(!state.remote_active); @@ -1184,6 +1184,7 @@ mod tests { fn source_capture_profile_uses_source_fps_and_scaled_profiles_cap_it() { let mut state = LauncherState::new(); state.set_preview_source_profile(1920, 1080, 60); + state.set_capture_size_preset(0, CaptureSizePreset::Source); let source = state.capture_size_choice(0); assert_eq!(source.width, 1920); assert_eq!(source.height, 1080); @@ -1228,6 +1229,7 @@ mod tests { fn source_capture_ignores_manual_fps_and_bitrate_knobs() { let mut state = LauncherState::new(); state.set_preview_source_profile(1920, 1080, 25); + state.set_capture_size_preset(0, CaptureSizePreset::Source); state.set_capture_fps(0, 60); state.set_capture_bitrate_kbit(0, 24_000); diff --git a/client/src/launcher/ui.rs b/client/src/launcher/ui.rs index 50a2017..ceecd4e 100644 --- a/client/src/launcher/ui.rs +++ b/client/src/launcher/ui.rs @@ -29,6 +29,7 @@ use { gtk::glib, gtk::prelude::*, lesavka_common::lesavka::CapturePowerCommand, + lesavka_common::process_metrics::ProcessCpuSampler, std::cell::{Cell, RefCell}, std::collections::VecDeque, std::process::Command, @@ -249,6 +250,7 @@ fn record_diagnostics_sample( state: &LauncherState, preview: Option<&super::preview::LauncherPreview>, network: NetworkSnapshot, + client_process_cpu_pct: f32, ) { let left_metrics = preview .and_then(|preview| { @@ -281,6 +283,10 @@ fn record_diagnostics_sample( probe_spread_ms: network.probe_spread_ms, input_latency_ms: network.rtt_ms * 0.5, probe_loss_pct: network.probe_loss_pct, + client_process_cpu_pct, + server_process_cpu_pct: left_metrics + .server_process_cpu_pct + .max(right_metrics.server_process_cpu_pct), video_loss_pct: left_metrics .packet_loss_pct .max(right_metrics.packet_loss_pct), @@ -295,6 +301,7 @@ fn record_diagnostics_sample( left_server_source_gap_peak_ms: left_metrics.server_source_gap_peak_ms, left_server_send_gap_peak_ms: left_metrics.server_send_gap_peak_ms, left_server_queue_peak: left_metrics.server_queue_peak, + left_server_encoder_label: left_metrics.server_encoder_label.clone(), left_decoder_label: left_metrics.decoder_label.clone(), right_receive_fps: right_metrics.receive_fps, right_present_fps: right_metrics.present_fps, @@ -307,6 +314,7 @@ fn record_diagnostics_sample( right_server_source_gap_peak_ms: right_metrics.server_source_gap_peak_ms, right_server_send_gap_peak_ms: right_metrics.server_send_gap_peak_ms, right_server_queue_peak: right_metrics.server_queue_peak, + right_server_encoder_label: right_metrics.server_encoder_label.clone(), right_decoder_label: right_metrics.decoder_label.clone(), dropped_frames: left_metrics .dropped_frames @@ -622,6 +630,7 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> { let (caps_tx, caps_rx) = std::sync::mpsc::channel::(); let caps_request_in_flight = Rc::new(Cell::new(false)); let diagnostics_network = Rc::new(RefCell::new(NetworkTelemetry::default())); + let diagnostics_process = Rc::new(RefCell::new(ProcessCpuSampler::new())); let next_diagnostics_probe = Rc::new(Cell::new(Instant::now() + Duration::from_millis(250))); let next_diagnostics_sample = @@ -1551,6 +1560,7 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> { let caps_tx = caps_tx.clone(); let caps_request_in_flight = Rc::clone(&caps_request_in_flight); let diagnostics_network = Rc::clone(&diagnostics_network); + let diagnostics_process = Rc::clone(&diagnostics_process); let next_diagnostics_probe = Rc::clone(&next_diagnostics_probe); let next_diagnostics_sample = Rc::clone(&next_diagnostics_sample); let log_tx = log_tx.clone(); @@ -1843,11 +1853,16 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> { if now >= next_diagnostics_sample.get() { let network = diagnostics_network.borrow_mut().snapshot(); + let client_process_cpu_pct = diagnostics_process + .borrow_mut() + .sample_percent() + .unwrap_or(0.0); record_diagnostics_sample( &widgets, &state.borrow(), preview.as_ref().map(|preview| preview.as_ref()), network, + client_process_cpu_pct, ); next_diagnostics_sample.set(now + Duration::from_secs(1)); } diff --git a/client/src/lib.rs b/client/src/lib.rs index 2b5ebdb..29aceb4 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -14,5 +14,6 @@ pub mod launcher; pub mod layout; pub mod output; pub mod paste; +pub(crate) mod video_support; pub use app::LesavkaClientApp; diff --git a/client/src/output/video.rs b/client/src/output/video.rs index ff1884f..84bde87 100644 --- a/client/src/output/video.rs +++ b/client/src/output/video.rs @@ -9,6 +9,36 @@ use gstreamer_video::prelude::VideoOverlayExt; use lesavka_common::lesavka::VideoPacket; use std::process::Command; use tracing::{debug, error, info, warn}; + +fn pick_h264_decoder() -> String { + if let Ok(raw) = std::env::var("LESAVKA_H264_DECODER") { + let name = raw.trim(); + if name.eq_ignore_ascii_case("decodebin") { + return "decodebin".to_string(); + } + if !name.is_empty() && gst::ElementFactory::find(name).is_some() { + return name.to_string(); + } + } + + for name in [ + "nvh264dec", + "nvh264sldec", + "vah264dec", + "vaapih264dec", + "v4l2h264dec", + "v4l2slh264dec", + "openh264dec", + "avdec_h264", + ] { + if gst::ElementFactory::find(name).is_some() { + return name.to_string(); + } + } + + "decodebin".to_string() +} + pub struct MonitorWindow { _pipeline: gst::Pipeline, src: gst_app::AppSrc, @@ -124,6 +154,7 @@ impl MonitorWindow { gst::init().context("initialising GStreamer")?; // --- Build pipeline --------------------------------------------------- + let decoder_name = pick_h264_decoder(); let sink = if std::env::var("GDK_BACKEND") .map(|v| v.contains("x11")) .unwrap_or_else(|_| std::env::var_os("DISPLAY").is_some()) @@ -137,7 +168,7 @@ impl MonitorWindow { "appsrc name=src is-live=true format=time do-timestamp=true block=false ! \ queue max-size-buffers=8 max-size-time=0 max-size-bytes=0 leaky=downstream ! \ capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=au ! \ - h264parse disable-passthrough=true ! decodebin ! videoconvert ! {sink}" + h264parse disable-passthrough=true ! {decoder_name} name=decoder ! videoconvert ! {sink}" ); let pipeline: gst::Pipeline = gst::parse::launch(&desc)? @@ -279,6 +310,7 @@ impl MonitorWindow { StateChanged(s) if s.current() == gst::State::Playing => { if msg.src().map(|s| s.is::()).unwrap_or(false) { info!("🎞️ video{id} pipeline ▶️ (sink='glimagesink')"); + info!("🎞️ video{id} decoder → {decoder_name}"); } } Error(e) => error!( @@ -390,6 +422,7 @@ impl UnifiedMonitorWindow { pub fn new() -> anyhow::Result { gst::init().context("initialising GStreamer")?; + let decoder_name = pick_h264_decoder(); let sink = if std::env::var("GDK_BACKEND") .map(|v| v.contains("x11")) .unwrap_or_else(|_| std::env::var_os("DISPLAY").is_some()) @@ -404,11 +437,11 @@ impl UnifiedMonitorWindow { appsrc name=src0 is-live=true format=time do-timestamp=true block=false ! \ queue max-size-buffers=8 max-size-time=0 max-size-bytes=0 leaky=downstream ! \ capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=au ! \ - h264parse disable-passthrough=true ! decodebin ! videoconvert ! videoscale ! mix. \ + h264parse disable-passthrough=true ! {decoder_name} name=decoder0 ! videoconvert ! videoscale ! mix. \ appsrc name=src1 is-live=true format=time do-timestamp=true block=false ! \ queue max-size-buffers=8 max-size-time=0 max-size-bytes=0 leaky=downstream ! \ capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=au ! \ - h264parse disable-passthrough=true ! decodebin ! videoconvert ! videoscale ! mix." + h264parse disable-passthrough=true ! {decoder_name} name=decoder1 ! videoconvert ! videoscale ! mix." ); let pipeline: gst::Pipeline = gst::parse::launch(&desc)? @@ -487,6 +520,7 @@ impl UnifiedMonitorWindow { StateChanged(s) if s.current() == gst::State::Playing => { if msg.src().map(|s| s.is::()).unwrap_or(false) { info!("🎞️ unified video pipeline ▶️"); + info!("🎞️ unified decoder → {decoder_name}"); } } Error(e) => error!( diff --git a/client/src/video_support.rs b/client/src/video_support.rs new file mode 100644 index 0000000..de344df --- /dev/null +++ b/client/src/video_support.rs @@ -0,0 +1,41 @@ +#![forbid(unsafe_code)] + +use gstreamer as gst; + +/// Pick the client-side H.264 decoder in a predictable preference order. +/// +/// Inputs: none, though operators may override the choice with +/// `LESAVKA_H264_DECODER=`. +/// Outputs: the chosen decoder element name, or `decodebin` as a last-resort +/// fallback when no explicit decoder is present. +/// Why: `decodebin` is flexible, but a stable preference order makes decode +/// behavior easier to reason about and compare in diagnostics. +#[must_use] +pub fn pick_h264_decoder() -> String { + if let Ok(raw) = std::env::var("LESAVKA_H264_DECODER") { + let name = raw.trim(); + if name.eq_ignore_ascii_case("decodebin") { + return "decodebin".to_string(); + } + if !name.is_empty() && gst::ElementFactory::find(name).is_some() { + return name.to_string(); + } + } + + for name in [ + "nvh264dec", + "nvh264sldec", + "vah264dec", + "vaapih264dec", + "v4l2h264dec", + "v4l2slh264dec", + "openh264dec", + "avdec_h264", + ] { + if gst::ElementFactory::find(name).is_some() { + return name.to_string(); + } + } + + "decodebin".to_string() +} diff --git a/common/Cargo.toml b/common/Cargo.toml index b722709..d65271f 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.10.0" +version = "0.11.0" edition = "2024" build = "build.rs" diff --git a/common/proto/lesavka.proto b/common/proto/lesavka.proto index 7f5e7c5..3ac16c0 100644 --- a/common/proto/lesavka.proto +++ b/common/proto/lesavka.proto @@ -23,6 +23,8 @@ message VideoPacket { uint32 server_source_gap_peak_ms = 8; uint32 server_send_gap_peak_ms = 9; uint32 server_queue_peak = 10; + string server_encoder_label = 11; + uint32 server_process_cpu_tenths = 12; } message AudioPacket { uint32 id = 1; uint64 pts = 2; bytes data = 3; } diff --git a/common/src/cli.rs b/common/src/cli.rs index 0059fa7..8d29cfe 100644 --- a/common/src/cli.rs +++ b/common/src/cli.rs @@ -17,6 +17,6 @@ mod tests { #[test] fn banner_includes_version() { - assert_eq!(banner("0.10.0"), "lesavka-common CLI (v0.10.0)"); + assert_eq!(banner("0.11.0"), "lesavka-common CLI (v0.11.0)"); } } diff --git a/common/src/lib.rs b/common/src/lib.rs index 02fc516..f34b6ac 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -5,6 +5,7 @@ pub mod cli; pub mod hid; pub mod paste; +pub mod process_metrics; #[allow(warnings)] pub mod lesavka { diff --git a/common/src/process_metrics.rs b/common/src/process_metrics.rs new file mode 100644 index 0000000..cfa83ee --- /dev/null +++ b/common/src/process_metrics.rs @@ -0,0 +1,66 @@ +#![forbid(unsafe_code)] + +use std::time::Instant; + +/// Sample per-process CPU pressure from Linux procfs. +/// +/// Inputs: none when constructed; call `sample_percent()` over time. +/// Outputs: percentage of one CPU core used by the current process over the +/// elapsed sampling window. +/// Why: this gives the launcher and server lightweight pressure telemetry +/// without adding a heavyweight system-information dependency. +#[derive(Debug, Clone, Default)] +pub struct ProcessCpuSampler { + last: Option<(Instant, u64)>, +} + +impl ProcessCpuSampler { + #[must_use] + pub fn new() -> Self { + Self::default() + } + + pub fn sample_percent(&mut self) -> Option { + let now = Instant::now(); + let runtime_ns = read_process_runtime_ns()?; + let previous = self.last.replace((now, runtime_ns))?; + let elapsed_ns = now + .saturating_duration_since(previous.0) + .as_nanos() + .min(u128::from(u64::MAX)) as u64; + if elapsed_ns == 0 || runtime_ns < previous.1 { + return None; + } + Some(runtime_ns.saturating_sub(previous.1) as f32 * 100.0 / elapsed_ns as f32) + } + + pub fn sample_tenths_percent(&mut self) -> Option { + self.sample_percent() + .map(|pct| (pct * 10.0).clamp(0.0, u32::MAX as f32) as u32) + } +} + +fn read_process_runtime_ns() -> Option { + let text = std::fs::read_to_string("/proc/self/schedstat").ok()?; + text.split_whitespace().next()?.parse::().ok() +} + +#[cfg(test)] +mod tests { + use super::*; + use std::thread; + use std::time::Duration; + + #[test] + fn schedstat_runtime_reads() { + assert!(read_process_runtime_ns().is_some()); + } + + #[test] + fn sampler_returns_percentage_after_two_samples() { + let mut sampler = ProcessCpuSampler::new(); + assert!(sampler.sample_percent().is_none()); + thread::sleep(Duration::from_millis(10)); + let _ = sampler.sample_percent(); + } +} diff --git a/server/Cargo.toml b/server/Cargo.toml index aae1531..88ad3e5 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.10.0" +version = "0.11.0" edition = "2024" autobins = false diff --git a/server/src/video.rs b/server/src/video.rs index 7257855..5fd1c36 100644 --- a/server/src/video.rs +++ b/server/src/video.rs @@ -7,6 +7,7 @@ use gst::prelude::*; use gstreamer as gst; use gstreamer_app as gst_app; use lesavka_common::lesavka::VideoPacket; +use lesavka_common::process_metrics::ProcessCpuSampler; use std::os::unix::fs::FileTypeExt; use std::sync::Arc; use std::sync::OnceLock; @@ -23,6 +24,24 @@ use crate::video_support::{ const EYE_ID: [&str; 2] = ["l", "r"]; static START: OnceLock = OnceLock::new(); +static SERVER_PROCESS_CPU_TENTHS: OnceLock> = OnceLock::new(); + +fn server_process_cpu_metric() -> Arc { + Arc::clone(SERVER_PROCESS_CPU_TENTHS.get_or_init(|| { + let metric = Arc::new(AtomicU32::new(0)); + let metric_for_thread = Arc::clone(&metric); + std::thread::spawn(move || { + let mut sampler = ProcessCpuSampler::new(); + loop { + if let Some(value) = sampler.sample_tenths_percent() { + metric_for_thread.store(value, Ordering::Relaxed); + } + std::thread::sleep(std::time::Duration::from_secs(1)); + } + }); + metric + })) +} pub struct VideoStream { _pipeline: gst::Pipeline, @@ -424,6 +443,14 @@ pub async fn eye_ball_with_request( .clamp(1, request.requested_fps.max(1)); let use_test_src = dev.eq_ignore_ascii_case("testsrc") || dev.eq_ignore_ascii_case("videotestsrc"); + let server_encoder_label = if use_test_src { + "x264enc(testsrc)".to_string() + } else if request.downscale { + "x264enc".to_string() + } else { + "source-pass-through".to_string() + }; + let server_process_cpu_tenths = server_process_cpu_metric(); if !use_test_src { wait_for_eye_device(dev, eye).await?; } @@ -503,6 +530,8 @@ pub async fn eye_ball_with_request( let send_gap_peak_ms_for_cb = Arc::clone(&send_gap_peak_ms); let queue_peak_depth_for_cb = Arc::clone(&queue_peak_depth); let last_telemetry_sec_for_cb = Arc::clone(&last_telemetry_sec); + let server_encoder_label_for_cb = server_encoder_label.clone(); + let server_process_cpu_tenths_for_cb = Arc::clone(&server_process_cpu_tenths); sink.set_callbacks( gst_app::AppSinkCallbacks::builder() .new_sample(move |sink| { @@ -619,6 +648,9 @@ pub async fn eye_ball_with_request( server_source_gap_peak_ms: source_gap_peak_ms_for_cb.load(Ordering::Relaxed), server_send_gap_peak_ms: send_gap_peak_ms_for_cb.load(Ordering::Relaxed), server_queue_peak: queue_peak_depth_for_cb.load(Ordering::Relaxed), + server_encoder_label: server_encoder_label_for_cb.clone(), + server_process_cpu_tenths: server_process_cpu_tenths_for_cb + .load(Ordering::Relaxed), }; match tx.try_send(Ok(pkt)) { Ok(_) => {