#[cfg(not(coverage))] use anyhow::{Context, Result}; #[cfg(not(coverage))] use gstreamer as gst; #[cfg(not(coverage))] use gstreamer::prelude::{Cast, ElementExt, GstBinExt, GstObjectExt}; #[cfg(not(coverage))] use gstreamer_app as gst_app; #[cfg(not(coverage))] use gtk::prelude::WidgetExt; #[cfg(not(coverage))] use gtk::{gdk, glib}; #[cfg(not(coverage))] use lesavka_common::lesavka::{MonitorRequest, VideoPacket, relay_client::RelayClient}; #[cfg(not(coverage))] use std::collections::VecDeque; #[cfg(not(coverage))] use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; #[cfg(not(coverage))] use std::sync::{Arc, Mutex}; #[cfg(not(coverage))] use std::time::{Duration, Instant}; #[cfg(not(coverage))] use tonic::{Request, transport::Channel}; #[cfg(not(coverage))] use tracing::{debug, warn}; #[cfg(not(coverage))] const PREVIEW_WIDTH: i32 = 640; #[cfg(not(coverage))] const PREVIEW_HEIGHT: i32 = 360; #[cfg(not(coverage))] const DEFAULT_EYE_SOURCE_WIDTH: i32 = 1920; #[cfg(not(coverage))] const DEFAULT_EYE_SOURCE_HEIGHT: i32 = 1080; #[cfg(not(coverage))] const PREVIEW_IDLE_STATUS: &str = "Connect relay to preview."; #[cfg(not(coverage))] const TELEMETRY_WINDOW: Duration = Duration::from_secs(5); #[cfg(not(coverage))] pub struct LauncherPreview { server_addr: Arc>, log_sink: Arc>>>, inline_feeds: Arc>, window_feeds: Arc>, } #[cfg(not(coverage))] #[derive(Clone)] pub struct PreviewBinding { enabled: Arc, alive: Arc, active_bindings: Arc, } #[cfg(not(coverage))] #[derive(Clone, Copy, Debug)] pub enum PreviewSurface { Inline, Window, } #[cfg(not(coverage))] #[derive(Clone, Debug, Default, PartialEq)] pub struct PreviewMetricsSnapshot { pub receive_fps: f32, pub present_fps: f32, pub server_fps: f32, pub stream_spread_ms: f32, pub packet_loss_pct: f32, pub dropped_frames: u64, pub queue_depth: u32, pub queue_depth_peak: u32, pub packet_gap_peak_ms: f32, pub present_gap_peak_ms: f32, pub server_source_gap_peak_ms: f32, pub server_send_gap_peak_ms: f32, pub server_queue_peak: u32, pub decoder_label: String, } #[cfg(not(coverage))] #[derive(Clone, Copy, Debug)] struct PreviewProfile { display_width: i32, display_height: i32, requested_width: i32, requested_height: i32, requested_fps: u32, max_bitrate_kbit: u32, } #[cfg(not(coverage))] impl PreviewSurface { fn profile(self) -> PreviewProfile { match self { Self::Inline => PreviewProfile { display_width: preview_dimension("LESAVKA_PREVIEW_WIDTH", PREVIEW_WIDTH), display_height: preview_dimension("LESAVKA_PREVIEW_HEIGHT", PREVIEW_HEIGHT), requested_width: preview_dimension( "LESAVKA_PREVIEW_REQUEST_WIDTH", DEFAULT_EYE_SOURCE_WIDTH, ), requested_height: preview_dimension( "LESAVKA_PREVIEW_REQUEST_HEIGHT", DEFAULT_EYE_SOURCE_HEIGHT, ), requested_fps: preview_bitrate("LESAVKA_PREVIEW_REQUEST_FPS", 30), max_bitrate_kbit: preview_bitrate("LESAVKA_PREVIEW_MAX_KBIT", 12_000), }, Self::Window => PreviewProfile { display_width: preview_dimension("LESAVKA_BREAKOUT_PREVIEW_WIDTH", 1280), display_height: preview_dimension("LESAVKA_BREAKOUT_PREVIEW_HEIGHT", 720), requested_width: preview_dimension( "LESAVKA_BREAKOUT_REQUEST_WIDTH", DEFAULT_EYE_SOURCE_WIDTH, ), requested_height: preview_dimension( "LESAVKA_BREAKOUT_REQUEST_HEIGHT", DEFAULT_EYE_SOURCE_HEIGHT, ), requested_fps: preview_bitrate("LESAVKA_BREAKOUT_REQUEST_FPS", 30), max_bitrate_kbit: preview_bitrate("LESAVKA_BREAKOUT_PREVIEW_MAX_KBIT", 12_000), }, } } } #[cfg(not(coverage))] impl LauncherPreview { pub fn new(server_addr: String) -> Result { gst::init().context("initialising preview gstreamer")?; let server_addr = Arc::new(Mutex::new(server_addr)); let log_sink = Arc::new(Mutex::new(None)); let inline_feeds = Arc::new(Mutex::new([ PreviewFeed::spawn( Arc::clone(&server_addr), 0, PreviewSurface::Inline.profile(), Arc::clone(&log_sink), )?, PreviewFeed::spawn( Arc::clone(&server_addr), 1, PreviewSurface::Inline.profile(), Arc::clone(&log_sink), )?, ])); let window_feeds = Arc::new(Mutex::new([ PreviewFeed::spawn( Arc::clone(&server_addr), 0, PreviewSurface::Window.profile(), Arc::clone(&log_sink), )?, PreviewFeed::spawn( Arc::clone(&server_addr), 1, PreviewSurface::Window.profile(), Arc::clone(&log_sink), )?, ])); Ok(Self { server_addr: Arc::clone(&server_addr), log_sink: Arc::clone(&log_sink), inline_feeds, window_feeds, }) } pub fn set_log_sink(&self, tx: std::sync::mpsc::Sender) { if let Ok(mut slot) = self.log_sink.lock() { *slot = Some(tx); } } pub fn set_server_addr(&self, server_addr: String) { if let Ok(mut slot) = self.server_addr.lock() { *slot = server_addr; } } pub fn set_session_active(&self, active: bool) { if let Ok(feeds) = self.inline_feeds.lock() { for feed in feeds.iter() { feed.set_active(active); } } if let Ok(feeds) = self.window_feeds.lock() { for feed in feeds.iter() { feed.set_active(active); } } } pub fn shutdown_all(&self) { if let Ok(feeds) = self.inline_feeds.lock() { for feed in feeds.iter() { feed.shutdown(); } } if let Ok(feeds) = self.window_feeds.lock() { for feed in feeds.iter() { feed.shutdown(); } } } pub fn install_on_picture( &self, monitor_id: usize, surface: PreviewSurface, picture: >k::Picture, status_label: >k::Label, ) -> Option { match surface { PreviewSurface::Inline => self .inline_feeds .lock() .ok() .and_then(|feeds| feeds.get(monitor_id).cloned()) .map(|feed| feed.install_on_picture(picture, status_label)), PreviewSurface::Window => self .window_feeds .lock() .ok() .and_then(|feeds| feeds.get(monitor_id).cloned()) .map(|feed| feed.install_on_picture(picture, status_label)), } } pub fn snapshot_metrics( &self, monitor_id: usize, surface: PreviewSurface, ) -> Option { match surface { PreviewSurface::Inline => self .inline_feeds .lock() .ok() .and_then(|feeds| feeds.get(monitor_id).cloned()) .map(|feed| feed.snapshot_metrics()), PreviewSurface::Window => self .window_feeds .lock() .ok() .and_then(|feeds| feeds.get(monitor_id).cloned()) .map(|feed| feed.snapshot_metrics()), } } pub fn set_capture_profile( &self, monitor_id: usize, requested_width: i32, requested_height: i32, requested_fps: u32, max_bitrate_kbit: u32, ) { self.rebuild_feed( &self.inline_feeds, monitor_id, Some(( requested_width, requested_height, requested_fps, max_bitrate_kbit, )), None, ); self.rebuild_feed( &self.window_feeds, monitor_id, Some(( requested_width, requested_height, requested_fps, max_bitrate_kbit, )), None, ); } pub fn set_breakout_profile(&self, monitor_id: usize, width: i32, height: i32) { self.rebuild_feed(&self.window_feeds, monitor_id, None, Some((width, height))); } fn rebuild_feed( &self, feeds: &Arc>, monitor_id: usize, requested: Option<(i32, i32, u32, u32)>, display: Option<(i32, i32)>, ) { let Ok(mut feeds) = feeds.lock() else { return; }; let Some(existing) = feeds.get(monitor_id).cloned() else { return; }; let was_active = existing.is_active(); let mut profile = existing.profile(); if let Some((requested_width, requested_height, requested_fps, max_bitrate_kbit)) = requested { profile.requested_width = requested_width.max(2); profile.requested_height = requested_height.max(2); profile.requested_fps = requested_fps.max(1); profile.max_bitrate_kbit = max_bitrate_kbit.max(800); } if let Some((display_width, display_height)) = display { profile.display_width = display_width.max(2); profile.display_height = display_height.max(2); } match PreviewFeed::spawn( Arc::clone(&self.server_addr), monitor_id as u32, profile, Arc::clone(&self.log_sink), ) { Ok(feed) => { if was_active { feed.set_active(true); } existing.shutdown(); feeds[monitor_id] = feed; } Err(err) => { warn!(monitor_id, ?err, "could not rebuild preview feed"); } } } } #[cfg(not(coverage))] impl PreviewBinding { pub fn set_enabled(&self, enabled: bool) { let was_enabled = self.enabled.swap(enabled, Ordering::AcqRel); match (was_enabled, enabled) { (false, true) => { self.active_bindings.fetch_add(1, Ordering::AcqRel); } (true, false) => { self.active_bindings.fetch_sub(1, Ordering::AcqRel); } _ => {} } } pub fn close(&self) { if !self.alive.swap(false, Ordering::AcqRel) { return; } if self.enabled.swap(false, Ordering::AcqRel) { self.active_bindings.fetch_sub(1, Ordering::AcqRel); } } #[cfg(test)] pub(crate) fn test_stub() -> Self { Self { enabled: Arc::new(AtomicBool::new(true)), alive: Arc::new(AtomicBool::new(true)), active_bindings: Arc::new(AtomicUsize::new(1)), } } } #[cfg(not(coverage))] #[derive(Clone)] struct PreviewFeed { shared: Arc>, session_active: Arc, active_bindings: Arc, running: Arc, profile: PreviewProfile, } #[cfg(not(coverage))] struct SharedPreviewState { latest: Option, status: String, generation: u64, clear_picture: bool, last_logged_error: Option, last_logged_status: Option, telemetry: PreviewTelemetry, } #[cfg(not(coverage))] impl SharedPreviewState { fn new() -> Self { Self { latest: None, status: PREVIEW_IDLE_STATUS.to_string(), generation: 1, clear_picture: true, last_logged_error: None, last_logged_status: None, telemetry: PreviewTelemetry::default(), } } fn set_status(&mut self, status: impl Into, clear_picture: bool) { let status = status.into(); let changed = self.status != status || clear_picture; self.status = status.clone(); if clear_picture { self.latest = None; self.clear_picture = true; } if !looks_like_preview_problem(&status) { self.last_logged_error = None; } if changed { self.generation = self.generation.saturating_add(1); } } fn push_frame(&mut self, frame: PreviewFrame) { self.telemetry.record_presented_frame(); self.latest = Some(frame); self.clear_picture = false; self.last_logged_error = None; if self.status != "Live" { self.status = "Live".to_string(); self.generation = self.generation.saturating_add(1); } } } #[cfg(not(coverage))] #[derive(Debug, Default)] struct PreviewTelemetry { packet_times: VecDeque, frame_times: VecDeque, packet_intervals_ms: VecDeque<(Instant, f32)>, frame_intervals_ms: VecDeque<(Instant, f32)>, packet_losses: VecDeque<(Instant, u64)>, dropped_deltas: VecDeque<(Instant, u64)>, queue_depth_samples: VecDeque<(Instant, u32)>, last_packet_at: Option, last_frame_at: Option, last_seq: Option, last_dropped_total: Option, latest_server_fps: u32, latest_queue_depth: u32, latest_server_source_gap_peak_ms: u32, latest_server_send_gap_peak_ms: u32, latest_server_queue_peak: u32, decoder_label: String, } #[cfg(not(coverage))] impl PreviewTelemetry { fn record_packet( &mut self, seq: u64, server_fps: u32, dropped_total: u64, queue_depth: u32, server_source_gap_peak_ms: u32, server_send_gap_peak_ms: u32, server_queue_peak: u32, ) { self.record_packet_at( Instant::now(), seq, server_fps, dropped_total, queue_depth, server_source_gap_peak_ms, server_send_gap_peak_ms, server_queue_peak, ); } fn record_packet_at( &mut self, now: Instant, seq: u64, server_fps: u32, dropped_total: u64, queue_depth: u32, server_source_gap_peak_ms: u32, server_send_gap_peak_ms: u32, server_queue_peak: u32, ) { self.trim(now); self.packet_times.push_back(now); if let Some(previous) = self.last_packet_at.replace(now) { self.packet_intervals_ms.push_back(( now, now.saturating_duration_since(previous).as_secs_f32() * 1000.0, )); } if seq > 0 { if let Some(previous_seq) = self.last_seq && seq > previous_seq + 1 { self.packet_losses .push_back((now, seq.saturating_sub(previous_seq + 1))); } self.last_seq = Some(seq); } if let Some(previous_dropped) = self.last_dropped_total && dropped_total > previous_dropped { self.dropped_deltas .push_back((now, dropped_total.saturating_sub(previous_dropped))); } self.last_dropped_total = Some(dropped_total); self.latest_server_fps = server_fps.max(1); self.latest_queue_depth = queue_depth; self.latest_server_source_gap_peak_ms = server_source_gap_peak_ms; self.latest_server_send_gap_peak_ms = server_send_gap_peak_ms; self.latest_server_queue_peak = server_queue_peak.max(queue_depth); self.queue_depth_samples.push_back((now, queue_depth)); self.trim(now); } fn record_presented_frame(&mut self) { self.record_presented_frame_at(Instant::now()); } fn record_presented_frame_at(&mut self, now: Instant) { self.trim(now); if let Some(previous) = self.last_frame_at.replace(now) { self.frame_intervals_ms.push_back(( now, now.saturating_duration_since(previous).as_secs_f32() * 1000.0, )); } self.frame_times.push_back(now); } fn note_decoder(&mut self, decoder_label: &str) { if !decoder_label.is_empty() { self.decoder_label = decoder_label.to_string(); } } fn snapshot(&mut self) -> PreviewMetricsSnapshot { self.snapshot_at(Instant::now()) } fn snapshot_at(&mut self, now: Instant) -> PreviewMetricsSnapshot { self.trim(now); let receive_fps = events_per_second(&self.packet_times, now); let present_fps = events_per_second(&self.frame_times, now); let delivered = self.packet_times.len() as u64; let packet_losses: u64 = self.packet_losses.iter().map(|(_, loss)| *loss).sum(); let packet_loss_pct = if delivered + packet_losses == 0 { 0.0 } else { packet_losses as f32 * 100.0 / (delivered + packet_losses) as f32 }; let dropped_frames: u64 = self .dropped_deltas .iter() .map(|(_, dropped)| *dropped) .sum(); let queue_depth_peak = self .queue_depth_samples .iter() .map(|(_, depth)| *depth) .max() .unwrap_or(self.latest_queue_depth); PreviewMetricsSnapshot { receive_fps, present_fps, server_fps: self.latest_server_fps as f32, stream_spread_ms: compute_jitter_ms(&self.packet_intervals_ms), packet_loss_pct, dropped_frames, queue_depth: self.latest_queue_depth, queue_depth_peak, packet_gap_peak_ms: compute_peak_gap_ms(&self.packet_intervals_ms), present_gap_peak_ms: compute_peak_gap_ms(&self.frame_intervals_ms), server_source_gap_peak_ms: self.latest_server_source_gap_peak_ms as f32, server_send_gap_peak_ms: self.latest_server_send_gap_peak_ms as f32, server_queue_peak: self.latest_server_queue_peak, decoder_label: self.decoder_label.clone(), } } fn trim(&mut self, now: Instant) { trim_instant_queue(&mut self.packet_times, now); trim_instant_queue(&mut self.frame_times, now); trim_value_queue(&mut self.packet_intervals_ms, now); trim_value_queue(&mut self.frame_intervals_ms, now); trim_value_queue(&mut self.packet_losses, now); trim_value_queue(&mut self.dropped_deltas, now); trim_value_queue(&mut self.queue_depth_samples, now); } } #[cfg(not(coverage))] impl PreviewFeed { fn spawn( server_addr: Arc>, monitor_id: u32, profile: PreviewProfile, log_sink: Arc>>>, ) -> Result { let shared = Arc::new(Mutex::new(SharedPreviewState::new())); let session_active = Arc::new(AtomicBool::new(false)); let active_bindings = Arc::new(AtomicUsize::new(0)); let running = Arc::new(AtomicBool::new(true)); let shared_state = Arc::clone(&shared); let session_active_flag = Arc::clone(&session_active); let active_bindings_flag = Arc::clone(&active_bindings); let running_flag = Arc::clone(&running); std::thread::spawn(move || { if let Err(err) = run_preview_feed( server_addr, monitor_id, profile, session_active_flag, active_bindings_flag, running_flag, shared_state, log_sink, ) { warn!(monitor_id, ?err, "launcher preview feed exited"); } }); Ok(Self { shared, session_active, active_bindings, running, profile, }) } fn profile(&self) -> PreviewProfile { self.profile } fn is_active(&self) -> bool { self.session_active.load(Ordering::Relaxed) } fn set_active(&self, active: bool) { self.session_active.store(active, Ordering::Relaxed); if !active { self.replace_status(PREVIEW_IDLE_STATUS, true); } } fn shutdown(&self) { self.running.store(false, Ordering::Relaxed); self.replace_status(PREVIEW_IDLE_STATUS, true); } fn replace_status(&self, status: impl Into, clear_picture: bool) { if let Ok(mut shared) = self.shared.lock() { shared.set_status(status, clear_picture); } } fn install_on_picture( &self, picture: >k::Picture, status_label: >k::Label, ) -> PreviewBinding { let picture = picture.clone(); let status_label = status_label.clone(); let shared = Arc::clone(&self.shared); let enabled = Arc::new(AtomicBool::new(true)); let alive = Arc::new(AtomicBool::new(true)); let active_bindings = Arc::clone(&self.active_bindings); let enabled_flag = Arc::clone(&enabled); let alive_flag = Arc::clone(&alive); active_bindings.fetch_add(1, Ordering::AcqRel); let mut last_generation = 0_u64; glib::timeout_add_local(Duration::from_millis(120), move || { if !alive_flag.load(Ordering::Relaxed) { return glib::ControlFlow::Break; } if !enabled_flag.load(Ordering::Relaxed) { return glib::ControlFlow::Continue; } let (frame, status, generation, clear_picture) = match shared.lock() { Ok(mut slot) => { let frame = slot.latest.take(); let status = slot.status.clone(); let generation = slot.generation; let clear_picture = slot.clear_picture; slot.clear_picture = false; (frame, status, generation, clear_picture) } Err(_) => return glib::ControlFlow::Continue, }; if clear_picture { picture.set_paintable(Option::<&gdk::Paintable>::None); } if let Some(frame) = frame { let bytes = glib::Bytes::from_owned(frame.rgba); let texture = gdk::MemoryTexture::new( frame.width, frame.height, gdk::MemoryFormat::R8g8b8a8, &bytes, frame.stride, ); picture.set_paintable(Some(&texture)); } if generation != last_generation { status_label.set_text(&status); status_label.set_tooltip_text(Some(&status)); last_generation = generation; } glib::ControlFlow::Continue }); PreviewBinding { enabled, alive, active_bindings, } } fn snapshot_metrics(&self) -> PreviewMetricsSnapshot { self.shared .lock() .map(|mut shared| shared.telemetry.snapshot()) .unwrap_or_default() } } #[cfg(not(coverage))] struct PreviewFrame { width: i32, height: i32, stride: usize, rgba: Vec, } #[cfg(not(coverage))] fn run_preview_feed( server_addr: Arc>, monitor_id: u32, profile: PreviewProfile, session_active: Arc, active_bindings: Arc, running: Arc, shared: Arc>, log_sink: Arc>>>, ) -> Result<()> { let (pipeline, appsrc, appsink) = build_preview_pipeline(profile)?; { let shared = Arc::clone(&shared); pipeline.connect_deep_element_added(move |_, _, element| { if let Some(decoder_label) = preview_decoder_label(element) && let Ok(mut slot) = shared.lock() { slot.telemetry.note_decoder(&decoder_label); } }); } pipeline .set_state(gst::State::Playing) .context("starting launcher preview pipeline")?; { let shared = Arc::clone(&shared); let appsink = appsink.clone(); let running = Arc::clone(&running); std::thread::spawn(move || { loop { if !running.load(Ordering::Relaxed) { break; } if let Some(sample) = appsink.try_pull_sample(gst::ClockTime::from_mseconds(250)) { if let Some(frame) = sample_to_frame(&sample) { if let Ok(mut slot) = shared.lock() { slot.push_frame(frame); } } } } }); } let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .context("building preview tokio runtime")?; let _ = rt.block_on(async move { let mut was_active = false; let mut retry_delay = Duration::from_millis(750); loop { if !running.load(Ordering::Relaxed) { break; } let active_now = session_active.load(Ordering::Relaxed) && active_bindings.load(Ordering::Relaxed) > 0; if !active_now { was_active = false; retry_delay = Duration::from_millis(750); set_shared_status(&shared, &log_sink, monitor_id, PREVIEW_IDLE_STATUS, true); tokio::time::sleep(Duration::from_millis(150)).await; continue; } if !was_active { was_active = true; set_shared_status( &shared, &log_sink, monitor_id, "Waking relay preview...", true, ); tokio::time::sleep(Duration::from_millis(350)).await; } set_shared_status( &shared, &log_sink, monitor_id, "Connecting relay preview...", true, ); let current_addr = match server_addr.lock() { Ok(value) => value.clone(), Err(_) => { set_shared_status( &shared, &log_sink, monitor_id, "Preview address is unavailable.", true, ); tokio::time::sleep(Duration::from_millis(750)).await; continue; } }; let channel = match Channel::from_shared(current_addr.clone()) { Ok(endpoint) => match endpoint.tcp_nodelay(true).connect().await { Ok(channel) => channel, Err(err) => { warn!(monitor_id, ?err, "launcher preview connect failed"); log_preview_issue( &shared, &log_sink, monitor_id, &format!("Preview host is unavailable: {err}"), ); set_shared_status( &shared, &log_sink, monitor_id, "Preview host is unavailable.", true, ); tokio::time::sleep(retry_delay).await; continue; } }, Err(err) => { warn!(monitor_id, ?err, "launcher preview endpoint invalid"); log_preview_issue( &shared, &log_sink, monitor_id, &format!("Preview address is invalid: {err}"), ); set_shared_status( &shared, &log_sink, monitor_id, "Preview address is invalid.", true, ); tokio::time::sleep(retry_delay).await; continue; } }; let mut cli = RelayClient::new(channel); let req = MonitorRequest { id: monitor_id, max_bitrate: profile.max_bitrate_kbit, requested_width: profile.requested_width.max(0) as u32, requested_height: profile.requested_height.max(0) as u32, requested_fps: profile.requested_fps, }; match cli.capture_video(Request::new(req)).await { Ok(mut stream) => { retry_delay = Duration::from_millis(750); debug!(monitor_id, "launcher preview connected"); set_shared_status( &shared, &log_sink, monitor_id, "Waiting for stream...", true, ); loop { if !session_active.load(Ordering::Relaxed) || !running.load(Ordering::Relaxed) || active_bindings.load(Ordering::Relaxed) == 0 { break; } match tokio::time::timeout( Duration::from_millis(300), stream.get_mut().message(), ) .await { Ok(Ok(Some(pkt))) => { record_preview_packet(&shared, &pkt); push_preview_packet(&appsrc, pkt); } Ok(Ok(None)) => { set_shared_status( &shared, &log_sink, monitor_id, "Preview stream ended.", true, ); retry_delay = Duration::from_millis(1_500); break; } Ok(Err(err)) => { warn!(monitor_id, ?err, "launcher preview stream error"); log_preview_issue( &shared, &log_sink, monitor_id, &format!("Preview stream error: {err}"), ); set_shared_status( &shared, &log_sink, monitor_id, "Preview stream error. See session log.", true, ); retry_delay = preview_retry_delay(retry_delay, Some(&err.to_string())); break; } Err(_) => continue, } } } Err(err) => { if preview_startup_condition(&err) { debug!( monitor_id, ?err, "launcher preview waiting for capture pipeline" ); log_preview_issue( &shared, &log_sink, monitor_id, &format!("Waiting for capture pipeline: {err}"), ); set_shared_status( &shared, &log_sink, monitor_id, "Waiting for capture pipeline...", true, ); retry_delay = preview_retry_delay(retry_delay, Some(err.message())); } else { warn!(monitor_id, ?err, "launcher preview rpc failed"); log_preview_issue( &shared, &log_sink, monitor_id, &format!("Preview RPC failed: {err}"), ); set_shared_status( &shared, &log_sink, monitor_id, "Preview RPC failed. See session log.", true, ); retry_delay = preview_retry_delay(retry_delay, Some(err.message())); } } } tokio::time::sleep(retry_delay).await; } #[allow(unreachable_code)] Ok::<(), anyhow::Error>(()) }); let _ = pipeline.set_state(gst::State::Null); Ok(()) } #[cfg(not(coverage))] fn preview_startup_condition(err: &tonic::Status) -> bool { let message = err.message().to_ascii_lowercase(); err.code() == tonic::Code::Internal && (message.contains("starting video pipeline") || message.contains("failed to change its state") || message.contains("resource busy") || message.contains("device or resource busy") || message.contains("no signal") || message.contains("was not ready") || message.contains("no such file or directory")) } #[cfg(not(coverage))] fn preview_retry_delay(current: Duration, message: Option<&str>) -> Duration { let current_ms = current.as_millis() as u64; let mut next_ms = if current_ms < 1_500 { 1_500 } else { current_ms.saturating_mul(2) }; if let Some(message) = message { let message = message.to_ascii_lowercase(); if message.contains("too many open files") || message.contains("failed to change its state") || message.contains("resource busy") || message.contains("device or resource busy") { next_ms = next_ms.max(6_000); } } Duration::from_millis(next_ms.min(30_000)) } #[cfg(not(coverage))] fn set_shared_status( shared: &Arc>, log_sink: &Arc>>>, monitor_id: u32, status: impl Into, clear: bool, ) { let status = status.into(); let should_log = if let Ok(mut slot) = shared.lock() { let should_log = slot.last_logged_status.as_deref() != Some(status.as_str()); if should_log { slot.last_logged_status = Some(status.clone()); } slot.set_status(status.clone(), clear); should_log } else { false }; if should_log { log_preview_status(log_sink, monitor_id, &status); } } #[cfg(not(coverage))] fn log_preview_issue( shared: &Arc>, log_sink: &Arc>>>, monitor_id: u32, message: &str, ) { let should_log = if let Ok(mut slot) = shared.lock() { if slot.last_logged_error.as_deref() == Some(message) { false } else { slot.last_logged_error = Some(message.to_string()); true } } else { false }; if !should_log { return; } if let Ok(slot) = log_sink.lock() && let Some(tx) = slot.as_ref() { let _ = tx.send(format!( "[preview:{}] {message}", preview_eye_label(monitor_id) )); } } #[cfg(not(coverage))] fn log_preview_status( log_sink: &Arc>>>, monitor_id: u32, status: &str, ) { if status == PREVIEW_IDLE_STATUS { return; } let eye = preview_eye_label(monitor_id); let message = match status { "Waking relay preview..." => format!("🪄 {eye} eye is waking the preview spell."), "Connecting relay preview..." => format!("🛰️ dialing the {eye} eye feed."), "Waiting for stream..." => { format!("👀 {eye} eye is connected and waiting for the first frame.") } "Preview stream ended." => format!("🌙 {eye} eye preview stream ended."), "Preview host is unavailable." => format!("💔 {eye} eye cannot reach the preview host."), "Preview address is unavailable." => { format!("🧭 {eye} eye does not have a usable preview address yet.") } "Preview address is invalid." => format!("🧭 {eye} eye was given a bad preview address."), "Waiting for capture pipeline..." => { format!("⏳ {eye} eye is waiting for the capture pipeline to wake up.") } "Preview stream error. See session log." => { format!("💥 {eye} eye hit a preview stream error. See the log spellbook for detail.") } "Preview RPC failed. See session log." => { format!("💥 {eye} eye preview RPC fizzled. See the log spellbook for detail.") } other => format!("🎥 {eye} eye: {other}"), }; if let Ok(slot) = log_sink.lock() && let Some(tx) = slot.as_ref() { let _ = tx.send(format!( "[preview:{}] {message}", preview_eye_label(monitor_id) )); } } #[cfg(not(coverage))] fn preview_eye_label(monitor_id: u32) -> &'static str { match monitor_id { 0 => "left", 1 => "right", _ => "eye", } } #[cfg(not(coverage))] fn looks_like_preview_problem(status: &str) -> bool { let lower = status.to_ascii_lowercase(); lower.contains("unavailable") || lower.contains("invalid") || lower.contains("failed") || lower.contains("waiting for capture pipeline") || lower.contains("error") } #[cfg(not(coverage))] fn build_preview_pipeline( profile: PreviewProfile, ) -> Result<(gst::Pipeline, gst_app::AppSrc, gst_app::AppSink)> { let desc = format!( "appsrc name=src is-live=true format=time do-timestamp=true block=false ! \ queue max-size-buffers=6 max-size-time=0 max-size-bytes=0 leaky=downstream ! \ h264parse disable-passthrough=true ! decodebin name=decoder ! videoconvert ! videoscale ! \ video/x-raw,format=RGBA,width={},height={},pixel-aspect-ratio=1/1 ! \ appsink name=sink emit-signals=false sync=false max-buffers=1 drop=true", profile.display_width, profile.display_height ); let pipeline = gst::parse::launch(&desc)? .downcast::() .expect("preview pipeline"); let appsrc = pipeline .by_name("src") .context("missing preview appsrc")? .downcast::() .expect("preview appsrc"); appsrc.set_caps(Some( &gst::Caps::builder("video/x-h264") .field("stream-format", &"byte-stream") .field("alignment", &"au") .build(), )); appsrc.set_format(gst::Format::Time); let appsink = pipeline .by_name("sink") .context("missing preview appsink")? .downcast::() .expect("preview appsink"); appsink.set_caps(Some( &gst::Caps::builder("video/x-raw") .field("format", &"RGBA") .field("width", &profile.display_width) .field("height", &profile.display_height) .build(), )); Ok((pipeline, appsrc, appsink)) } #[cfg(not(coverage))] fn push_preview_packet(appsrc: &gst_app::AppSrc, pkt: VideoPacket) { let mut buf = gst::Buffer::from_slice(pkt.data); if let Some(buf) = buf.get_mut() { buf.set_pts(Some(gst::ClockTime::from_useconds(pkt.pts))); } let _ = appsrc.push_buffer(buf); } #[cfg(not(coverage))] fn record_preview_packet(shared: &Arc>, pkt: &VideoPacket) { if let Ok(mut slot) = shared.lock() { slot.telemetry.record_packet( pkt.seq, pkt.effective_fps, pkt.dropped_total, pkt.queue_depth, pkt.server_source_gap_peak_ms, pkt.server_send_gap_peak_ms, pkt.server_queue_peak, ); } } #[cfg(not(coverage))] fn sample_to_frame(sample: &gst::Sample) -> Option { let caps = sample.caps()?; let structure = caps.structure(0)?; let width = structure.get::("width").ok()?; let height = structure.get::("height").ok()?; let buffer = sample.buffer()?; let map = buffer.map_readable().ok()?; let rgba = map.as_slice().to_vec(); let stride = rgba.len() / height.max(1) as usize; Some(PreviewFrame { width, height, stride, rgba, }) } #[cfg(not(coverage))] fn preview_decoder_label(element: &gst::Element) -> Option { let factory = element.factory()?; let klass = factory.klass().to_ascii_lowercase(); if !klass.contains("decoder") || !klass.contains("video") { return None; } Some(factory.name().to_string()) } #[cfg(not(coverage))] fn preview_bitrate(var: &str, default: u32) -> u32 { std::env::var(var) .ok() .and_then(|raw| raw.parse::().ok()) .unwrap_or(default) } #[cfg(not(coverage))] fn preview_dimension(var: &str, default: i32) -> i32 { std::env::var(var) .ok() .and_then(|raw| raw.parse::().ok()) .filter(|value| *value > 0) .unwrap_or(default) } #[cfg(not(coverage))] fn events_per_second(events: &VecDeque, now: Instant) -> f32 { let Some(oldest) = events.front().copied() else { return 0.0; }; let span = now .saturating_duration_since(oldest) .as_secs_f32() .clamp(0.25, TELEMETRY_WINDOW.as_secs_f32()); events.len() as f32 / span } #[cfg(not(coverage))] fn trim_instant_queue(queue: &mut VecDeque, now: Instant) { while let Some(oldest) = queue.front().copied() { if now.saturating_duration_since(oldest) > TELEMETRY_WINDOW { let _ = queue.pop_front(); } else { break; } } } #[cfg(not(coverage))] fn trim_value_queue(queue: &mut VecDeque<(Instant, T)>, now: Instant) { while let Some((oldest, _)) = queue.front() { if now.saturating_duration_since(*oldest) > TELEMETRY_WINDOW { let _ = queue.pop_front(); } else { break; } } } #[cfg(not(coverage))] fn compute_jitter_ms(samples: &VecDeque<(Instant, f32)>) -> f32 { if samples.len() < 2 { return 0.0; } let mean = samples.iter().map(|(_, value)| *value).sum::() / samples.len() as f32; samples .iter() .map(|(_, value)| (value - mean).abs()) .sum::() / samples.len() as f32 } #[cfg(not(coverage))] fn compute_peak_gap_ms(samples: &VecDeque<(Instant, f32)>) -> f32 { samples.iter().map(|(_, value)| *value).fold(0.0, f32::max) } #[cfg(test)] mod tests { use super::{ DEFAULT_EYE_SOURCE_HEIGHT, DEFAULT_EYE_SOURCE_WIDTH, PREVIEW_HEIGHT, PREVIEW_WIDTH, PreviewSurface, PreviewTelemetry, }; use std::time::{Duration, Instant}; #[test] fn inline_preview_profile_uses_existing_defaults() { let profile = PreviewSurface::Inline.profile(); assert_eq!(profile.display_width, PREVIEW_WIDTH); assert_eq!(profile.display_height, PREVIEW_HEIGHT); assert_eq!(profile.requested_width, DEFAULT_EYE_SOURCE_WIDTH); assert_eq!(profile.requested_height, DEFAULT_EYE_SOURCE_HEIGHT); assert_eq!(profile.requested_fps, 30); assert_eq!(profile.max_bitrate_kbit, 12_000); } #[test] fn breakout_preview_profile_defaults_to_higher_quality() { let profile = PreviewSurface::Window.profile(); assert_eq!(profile.display_width, 1280); assert_eq!(profile.display_height, 720); assert_eq!(profile.requested_width, DEFAULT_EYE_SOURCE_WIDTH); assert_eq!(profile.requested_height, DEFAULT_EYE_SOURCE_HEIGHT); assert_eq!(profile.requested_fps, 30); assert_eq!(profile.max_bitrate_kbit, 12_000); } #[test] fn preview_telemetry_reports_fps_jitter_loss_and_drop_metrics() { let mut telemetry = PreviewTelemetry::default(); let start = Instant::now(); telemetry.note_decoder("nvh264dec"); telemetry.record_packet_at(start, 1, 30, 0, 1, 41, 38, 2); telemetry.record_presented_frame_at(start + Duration::from_millis(5)); telemetry.record_packet_at(start + Duration::from_millis(33), 2, 30, 0, 1, 41, 38, 2); telemetry.record_presented_frame_at(start + Duration::from_millis(37)); telemetry.record_packet_at(start + Duration::from_millis(80), 4, 27, 2, 3, 77, 88, 4); telemetry.record_presented_frame_at(start + Duration::from_millis(90)); let snapshot = telemetry.snapshot_at(start + Duration::from_millis(120)); assert!(snapshot.receive_fps >= 12.0); assert!(snapshot.present_fps >= 12.0); assert_eq!(snapshot.server_fps, 27.0); assert!(snapshot.stream_spread_ms > 0.0); assert!(snapshot.packet_loss_pct > 0.0); assert_eq!(snapshot.dropped_frames, 2); assert_eq!(snapshot.queue_depth, 3); assert_eq!(snapshot.queue_depth_peak, 3); assert!(snapshot.packet_gap_peak_ms >= 47.0); assert!(snapshot.present_gap_peak_ms >= 53.0); assert_eq!(snapshot.server_source_gap_peak_ms, 77.0); assert_eq!(snapshot.server_send_gap_peak_ms, 88.0); assert_eq!(snapshot.server_queue_peak, 4); assert_eq!(snapshot.decoder_label, "nvh264dec"); } }