lesavka/client/src/launcher/preview.rs

2147 lines
73 KiB
Rust

#[cfg(not(coverage))]
use crate::video_support::pick_h264_decoder;
#[cfg(not(coverage))]
use anyhow::{Context, Result};
#[cfg(not(coverage))]
use gstreamer as gst;
#[cfg(not(coverage))]
use gstreamer::prelude::{Cast, ElementExt, GstBinExt, GstObjectExt, PadExt};
#[cfg(not(coverage))]
use gstreamer_app as gst_app;
#[cfg(not(coverage))]
use gtk::prelude::WidgetExt;
#[cfg(not(coverage))]
use gtk::{gdk, glib};
use lesavka_common::{
eye_source::eye_source_mode_for_request,
lesavka::{MonitorRequest, VideoPacket, relay_client::RelayClient},
};
#[cfg(not(coverage))]
use std::collections::VecDeque;
#[cfg(not(coverage))]
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
#[cfg(not(coverage))]
use std::sync::{Arc, Mutex};
#[cfg(not(coverage))]
use std::time::{Duration, Instant};
#[cfg(not(coverage))]
use tonic::{Request, transport::Channel};
#[cfg(not(coverage))]
use tracing::{debug, warn};
#[cfg(not(coverage))]
// Default on-screen size (px) of the inline preview picture widget.
const PREVIEW_WIDTH: i32 = 960;
#[cfg(not(coverage))]
const PREVIEW_HEIGHT: i32 = 540;
#[cfg(not(coverage))]
// Capture resolution/rate/bitrate requested from the relay for the inline
// preview; these alias the eye-source defaults declared below.
const INLINE_PREVIEW_REQUEST_WIDTH: i32 = DEFAULT_EYE_SOURCE_WIDTH;
#[cfg(not(coverage))]
const INLINE_PREVIEW_REQUEST_HEIGHT: i32 = DEFAULT_EYE_SOURCE_HEIGHT;
#[cfg(not(coverage))]
const INLINE_PREVIEW_REQUEST_FPS: u32 = DEFAULT_EYE_SOURCE_FPS;
#[cfg(not(coverage))]
const INLINE_PREVIEW_MAX_KBIT: u32 = DEFAULT_EYE_SOURCE_MAX_KBIT;
#[cfg(not(coverage))]
// Baseline eye-source capture parameters (1080p @ 60 fps, up to 18 Mbit/s).
const DEFAULT_EYE_SOURCE_WIDTH: i32 = 1920;
#[cfg(not(coverage))]
const DEFAULT_EYE_SOURCE_HEIGHT: i32 = 1080;
#[cfg(not(coverage))]
const DEFAULT_EYE_SOURCE_FPS: u32 = 60;
#[cfg(not(coverage))]
const DEFAULT_EYE_SOURCE_MAX_KBIT: u32 = 18_000;
#[cfg(not(coverage))]
// Status text shown when no relay session is active (never logged).
const PREVIEW_IDLE_STATUS: &str = "Connect relay to preview.";
#[cfg(not(coverage))]
// Rolling window over which telemetry samples are retained.
const TELEMETRY_WINDOW: Duration = Duration::from_secs(5);
#[cfg(not(coverage))]
/// Owns the launcher's preview feeds: two inline (embedded in the launcher
/// UI) and two window (breakout) feeds, one per eye/monitor index.
pub struct LauncherPreview {
    /// gRPC relay address shared with every feed worker thread.
    server_addr: Arc<Mutex<String>>,
    /// Optional channel that mirrors preview status lines into the session log.
    log_sink: Arc<Mutex<Option<std::sync::mpsc::Sender<String>>>>,
    /// Feeds rendered inside the launcher window (index = monitor id).
    inline_feeds: Arc<Mutex<[PreviewFeed; 2]>>,
    /// Feeds rendered in breakout windows (index = monitor id).
    window_feeds: Arc<Mutex<[PreviewFeed; 2]>>,
}
#[cfg(not(coverage))]
#[derive(Clone)]
/// Handle returned by `install_on_picture`; lets the UI pause/resume or
/// permanently detach one picture binding.
pub struct PreviewBinding {
    /// Whether this binding currently counts toward `active_bindings`.
    enabled: Arc<AtomicBool>,
    /// Cleared by `close()`; the render timer stops once this is false.
    alive: Arc<AtomicBool>,
    /// Shared count of enabled bindings across the owning feed; streaming
    /// only runs while it is non-zero.
    active_bindings: Arc<AtomicUsize>,
}
#[cfg(not(coverage))]
#[derive(Clone, Copy, Debug)]
/// Where a preview is rendered: embedded in the launcher (`Inline`) or in
/// a separate breakout window (`Window`).
pub enum PreviewSurface {
    Inline,
    Window,
}
#[cfg(not(coverage))]
#[derive(Clone, Debug, Default, PartialEq)]
/// Point-in-time view of one feed's telemetry for display in the UI.
/// Windowed rates/peaks are computed over the rolling `TELEMETRY_WINDOW`.
pub struct PreviewMetricsSnapshot {
    /// Packets received per second from the relay.
    pub receive_fps: f32,
    /// Decoded frames published (presented) per second.
    pub present_fps: f32,
    /// Frame rate last reported by the server.
    pub server_fps: f32,
    /// Server process CPU usage in percent (server reports tenths of a percent).
    pub server_process_cpu_pct: f32,
    /// Spread (jitter) of inter-packet arrival intervals, ms.
    pub stream_spread_ms: f32,
    /// Estimated packet loss derived from sequence-number gaps, percent.
    pub packet_loss_pct: f32,
    /// Frames the server reported dropped within the window.
    pub dropped_frames: u64,
    /// Most recent queue depth reported with a packet.
    pub queue_depth: u32,
    /// Peak queue depth observed within the window.
    pub queue_depth_peak: u32,
    /// Largest gap between received packets, ms.
    pub packet_gap_peak_ms: f32,
    /// Largest gap between presented frames, ms.
    pub present_gap_peak_ms: f32,
    /// Server-reported peak capture-source gap, ms.
    pub server_source_gap_peak_ms: f32,
    /// Server-reported peak send gap, ms.
    pub server_send_gap_peak_ms: f32,
    /// Server-reported peak queue depth (at least the latest depth).
    pub server_queue_peak: u32,
    /// Human-readable encoder name reported by the server.
    pub server_encoder_label: String,
    /// Local H.264 decoder element in use.
    pub decoder_label: String,
    // Caps summaries captured at the encoded-stream, post-decode, and
    // rendered stages of the local pipeline.
    pub stream_caps_label: String,
    pub decoded_caps_label: String,
    pub rendered_caps_label: String,
}
#[cfg(not(coverage))]
#[derive(Clone, Copy, Debug)]
/// Capture and display parameters for one preview feed.
struct PreviewProfile {
    /// Server-side monitor to capture from (sent as `source_id` in `MonitorRequest`).
    source_monitor_id: u32,
    /// Local render size of the preview surface, px.
    display_width: i32,
    display_height: i32,
    /// Resolution requested from the relay, px.
    requested_width: i32,
    requested_height: i32,
    /// Frame rate requested from the relay.
    requested_fps: u32,
    /// Upper bound for the encoded stream bitrate, kbit/s.
    max_bitrate_kbit: u32,
}
#[cfg(not(coverage))]
impl PreviewSurface {
    /// Builds the capture/display profile for this surface, honouring the
    /// matching `LESAVKA_*` environment overrides and falling back to the
    /// compiled-in defaults.
    ///
    /// NOTE(review): `preview_bitrate` is used for the fps overrides too —
    /// presumably it is a generic `u32` env parser despite the name; confirm.
    fn profile(self) -> PreviewProfile {
        match self {
            Self::Inline => {
                let display_width = preview_dimension("LESAVKA_PREVIEW_WIDTH", PREVIEW_WIDTH);
                let display_height = preview_dimension("LESAVKA_PREVIEW_HEIGHT", PREVIEW_HEIGHT);
                let requested_width = preview_dimension(
                    "LESAVKA_PREVIEW_REQUEST_WIDTH",
                    INLINE_PREVIEW_REQUEST_WIDTH,
                );
                let requested_height = preview_dimension(
                    "LESAVKA_PREVIEW_REQUEST_HEIGHT",
                    INLINE_PREVIEW_REQUEST_HEIGHT,
                );
                let requested_fps =
                    preview_bitrate("LESAVKA_PREVIEW_REQUEST_FPS", INLINE_PREVIEW_REQUEST_FPS);
                let max_bitrate_kbit =
                    preview_bitrate("LESAVKA_PREVIEW_MAX_KBIT", INLINE_PREVIEW_MAX_KBIT);
                PreviewProfile {
                    source_monitor_id: 0,
                    display_width,
                    display_height,
                    requested_width,
                    requested_height,
                    requested_fps,
                    max_bitrate_kbit,
                }
            }
            Self::Window => {
                let display_width = preview_dimension("LESAVKA_BREAKOUT_PREVIEW_WIDTH", 1280);
                let display_height = preview_dimension("LESAVKA_BREAKOUT_PREVIEW_HEIGHT", 720);
                let requested_width = preview_dimension(
                    "LESAVKA_BREAKOUT_REQUEST_WIDTH",
                    DEFAULT_EYE_SOURCE_WIDTH,
                );
                let requested_height = preview_dimension(
                    "LESAVKA_BREAKOUT_REQUEST_HEIGHT",
                    DEFAULT_EYE_SOURCE_HEIGHT,
                );
                let requested_fps =
                    preview_bitrate("LESAVKA_BREAKOUT_REQUEST_FPS", DEFAULT_EYE_SOURCE_FPS);
                let max_bitrate_kbit = preview_bitrate(
                    "LESAVKA_BREAKOUT_PREVIEW_MAX_KBIT",
                    DEFAULT_EYE_SOURCE_MAX_KBIT,
                );
                PreviewProfile {
                    source_monitor_id: 0,
                    display_width,
                    display_height,
                    requested_width,
                    requested_height,
                    requested_fps,
                    max_bitrate_kbit,
                }
            }
        }
    }
}
#[cfg(not(coverage))]
impl LauncherPreview {
    /// Initialises GStreamer and spawns the four preview feeds (inline and
    /// window surfaces, one per eye/monitor).
    pub fn new(server_addr: String) -> Result<Self> {
        gst::init().context("initialising preview gstreamer")?;
        let server_addr = Arc::new(Mutex::new(server_addr));
        let log_sink = Arc::new(Mutex::new(None));
        let inline_feeds = Arc::new(Mutex::new([
            PreviewFeed::spawn(
                Arc::clone(&server_addr),
                0,
                PreviewSurface::Inline.profile(),
                Arc::clone(&log_sink),
            )?,
            PreviewFeed::spawn(
                Arc::clone(&server_addr),
                1,
                PreviewSurface::Inline.profile(),
                Arc::clone(&log_sink),
            )?,
        ]));
        let window_feeds = Arc::new(Mutex::new([
            PreviewFeed::spawn(
                Arc::clone(&server_addr),
                0,
                PreviewSurface::Window.profile(),
                Arc::clone(&log_sink),
            )?,
            PreviewFeed::spawn(
                Arc::clone(&server_addr),
                1,
                PreviewSurface::Window.profile(),
                Arc::clone(&log_sink),
            )?,
        ]));
        // The locals still own an Arc each; move them in directly instead of
        // taking an extra clone.
        Ok(Self {
            server_addr,
            log_sink,
            inline_feeds,
            window_feeds,
        })
    }
    /// Returns the feed array backing `surface`.
    fn surface_feeds(&self, surface: PreviewSurface) -> &Arc<Mutex<[PreviewFeed; 2]>> {
        match surface {
            PreviewSurface::Inline => &self.inline_feeds,
            PreviewSurface::Window => &self.window_feeds,
        }
    }
    /// Clones the feed handle for `monitor_id` on `surface`.
    ///
    /// Returns `None` when the lock is poisoned or the id is out of range.
    /// Cloning a `PreviewFeed` is cheap (shared `Arc` state), and doing so
    /// lets callers work without holding the feeds lock.
    fn feed_for(&self, monitor_id: usize, surface: PreviewSurface) -> Option<PreviewFeed> {
        self.surface_feeds(surface)
            .lock()
            .ok()
            .and_then(|feeds| feeds.get(monitor_id).cloned())
    }
    /// Registers the channel that receives preview status/error log lines.
    pub fn set_log_sink(&self, tx: std::sync::mpsc::Sender<String>) {
        if let Ok(mut slot) = self.log_sink.lock() {
            *slot = Some(tx);
        }
    }
    /// Updates the relay address used by all feeds on their next connect.
    pub fn set_server_addr(&self, server_addr: String) {
        if let Ok(mut slot) = self.server_addr.lock() {
            *slot = server_addr;
        }
    }
    /// Marks every feed active or idle; idle feeds revert to the idle status.
    pub fn set_session_active(&self, active: bool) {
        for feeds in [&self.inline_feeds, &self.window_feeds] {
            if let Ok(feeds) = feeds.lock() {
                for feed in feeds.iter() {
                    feed.set_active(active);
                }
            }
        }
    }
    /// Signals every feed's worker threads to stop.
    pub fn shutdown_all(&self) {
        for feeds in [&self.inline_feeds, &self.window_feeds] {
            if let Ok(feeds) = feeds.lock() {
                for feed in feeds.iter() {
                    feed.shutdown();
                }
            }
        }
    }
    /// Binds the feed for `monitor_id`/`surface` to a GTK picture and status
    /// label; returns the binding handle, or `None` when the lookup fails.
    pub fn install_on_picture(
        &self,
        monitor_id: usize,
        surface: PreviewSurface,
        picture: &gtk::Picture,
        status_label: &gtk::Label,
    ) -> Option<PreviewBinding> {
        self.feed_for(monitor_id, surface)
            .map(|feed| feed.install_on_picture(picture, status_label))
    }
    /// Current telemetry snapshot for one feed, or `None` when the lookup fails.
    pub fn snapshot_metrics(
        &self,
        monitor_id: usize,
        surface: PreviewSurface,
    ) -> Option<PreviewMetricsSnapshot> {
        self.feed_for(monitor_id, surface)
            .map(|feed| feed.snapshot_metrics())
    }
    /// Applies a new capture profile to both surfaces of `monitor_id`.
    ///
    /// The inline feed receives values sanitized by `sanitize_preview_request`
    /// (inline previews are size/bitrate limited), while the window feed gets
    /// the raw values; `rebuild_feed` clamps both to safe minimums.
    pub fn set_capture_profile(
        &self,
        monitor_id: usize,
        source_monitor_id: usize,
        requested_width: i32,
        requested_height: i32,
        requested_fps: u32,
        max_bitrate_kbit: u32,
    ) {
        let (
            inline_requested_width,
            inline_requested_height,
            inline_requested_fps,
            inline_max_bitrate_kbit,
        ) = sanitize_preview_request(
            requested_width,
            requested_height,
            requested_fps,
            max_bitrate_kbit,
        );
        self.rebuild_feed(
            &self.inline_feeds,
            monitor_id,
            Some((
                source_monitor_id,
                inline_requested_width,
                inline_requested_height,
                inline_requested_fps,
                inline_max_bitrate_kbit,
            )),
            None,
        );
        self.rebuild_feed(
            &self.window_feeds,
            monitor_id,
            Some((
                source_monitor_id,
                requested_width,
                requested_height,
                requested_fps,
                max_bitrate_kbit,
            )),
            None,
        );
    }
    /// Resizes the breakout-window display surface for `monitor_id`.
    pub fn set_breakout_profile(&self, monitor_id: usize, width: i32, height: i32) {
        self.rebuild_feed(&self.window_feeds, monitor_id, None, Some((width, height)));
    }
    /// Test accessor: the effective profile of one feed as a flat tuple.
    #[cfg(test)]
    pub(crate) fn profile_for_test(
        &self,
        monitor_id: usize,
        surface: PreviewSurface,
    ) -> Option<(u32, i32, i32, i32, i32, u32, u32)> {
        let profile = self.feed_for(monitor_id, surface)?.profile();
        Some((
            profile.source_monitor_id,
            profile.display_width,
            profile.display_height,
            profile.requested_width,
            profile.requested_height,
            profile.requested_fps,
            profile.max_bitrate_kbit,
        ))
    }
    /// Test accessor: whether one feed is the disabled placeholder.
    #[cfg(test)]
    pub(crate) fn feed_disabled_for_test(
        &self,
        monitor_id: usize,
        surface: PreviewSurface,
    ) -> Option<bool> {
        Some(self.feed_for(monitor_id, surface)?.is_disabled())
    }
    /// Test helper: force one feed into the "streaming wanted" state.
    #[cfg(test)]
    pub(crate) fn activate_surface_for_test(&self, monitor_id: usize, surface: PreviewSurface) {
        if let Some(feed) = self.feed_for(monitor_id, surface) {
            feed.session_active.store(true, Ordering::Relaxed);
            feed.active_bindings.fetch_add(1, Ordering::AcqRel);
        }
    }
    /// Replaces the feed at `monitor_id` with one built from its current
    /// profile merged with the optional `requested` capture parameters and
    /// `display` size. Disabled feeds stay disabled; on spawn failure the
    /// existing feed is kept running.
    fn rebuild_feed(
        &self,
        feeds: &Arc<Mutex<[PreviewFeed; 2]>>,
        monitor_id: usize,
        requested: Option<(usize, i32, i32, u32, u32)>,
        display: Option<(i32, i32)>,
    ) {
        let Ok(mut feeds) = feeds.lock() else {
            return;
        };
        let Some(existing) = feeds.get(monitor_id).cloned() else {
            return;
        };
        let was_active = existing.is_active();
        let keep_disabled = existing.is_disabled();
        let mut profile = existing.profile();
        if let Some((
            source_monitor_id,
            requested_width,
            requested_height,
            requested_fps,
            max_bitrate_kbit,
        )) = requested
        {
            // Clamp to sane minimums so a bad caller can't request a
            // degenerate capture.
            profile.source_monitor_id = source_monitor_id as u32;
            profile.requested_width = requested_width.max(2);
            profile.requested_height = requested_height.max(2);
            profile.requested_fps = requested_fps.max(1);
            profile.max_bitrate_kbit = max_bitrate_kbit.max(800);
        }
        if let Some((display_width, display_height)) = display {
            profile.display_width = display_width.max(2);
            profile.display_height = display_height.max(2);
        }
        let next_feed = if keep_disabled {
            Some(PreviewFeed::spawn_disabled(profile))
        } else {
            match PreviewFeed::spawn(
                Arc::clone(&self.server_addr),
                monitor_id as u32,
                profile,
                Arc::clone(&self.log_sink),
            ) {
                Ok(feed) => Some(feed),
                Err(err) => {
                    warn!(monitor_id, ?err, "could not rebuild preview feed");
                    None
                }
            }
        };
        if let Some(feed) = next_feed {
            if was_active {
                feed.set_active(true);
            }
            // Shut the old feed down only once the replacement is ready.
            existing.shutdown();
            feeds[monitor_id] = feed;
        }
    }
    /// Enables or disables both surfaces of `monitor_id`.
    pub fn set_monitor_enabled(&self, monitor_id: usize, enabled: bool) {
        self.set_feed_enabled(&self.inline_feeds, monitor_id, enabled);
        self.set_feed_enabled(&self.window_feeds, monitor_id, enabled);
    }
    /// Swaps one feed between a live worker and the disabled placeholder.
    /// No-op when the feed is already in the requested state or when
    /// enabling fails to spawn a worker.
    fn set_feed_enabled(
        &self,
        feeds: &Arc<Mutex<[PreviewFeed; 2]>>,
        monitor_id: usize,
        enabled: bool,
    ) {
        let Ok(mut feeds) = feeds.lock() else {
            return;
        };
        let Some(existing) = feeds.get(monitor_id).cloned() else {
            return;
        };
        if existing.is_disabled() == !enabled {
            return;
        }
        let was_active = existing.is_active();
        let profile = existing.profile();
        let replacement = if enabled {
            match PreviewFeed::spawn(
                Arc::clone(&self.server_addr),
                monitor_id as u32,
                profile,
                Arc::clone(&self.log_sink),
            ) {
                Ok(feed) => feed,
                Err(err) => {
                    warn!(monitor_id, ?err, "could not enable preview feed");
                    return;
                }
            }
        } else {
            PreviewFeed::spawn_disabled(profile)
        };
        if was_active {
            replacement.set_active(true);
        }
        existing.shutdown();
        feeds[monitor_id] = replacement;
    }
}
#[cfg(not(coverage))]
impl PreviewBinding {
    /// Pauses or resumes this binding's rendering, keeping the feed's shared
    /// `active_bindings` counter in sync with the transition.
    ///
    /// Calls after `close()` are ignored: previously a late
    /// `set_enabled(true)` on a closed binding incremented `active_bindings`
    /// with nothing left to decrement it, permanently inflating the count
    /// and keeping the stream "wanted" forever.
    pub fn set_enabled(&self, enabled: bool) {
        if !self.alive.load(Ordering::Acquire) {
            return;
        }
        let was_enabled = self.enabled.swap(enabled, Ordering::AcqRel);
        match (was_enabled, enabled) {
            (false, true) => {
                self.active_bindings.fetch_add(1, Ordering::AcqRel);
            }
            (true, false) => {
                self.active_bindings.fetch_sub(1, Ordering::AcqRel);
            }
            // No transition — counter untouched.
            _ => {}
        }
    }
    /// Permanently detaches the binding (stops the render timer).
    /// Idempotent: only the first call releases this binding's contribution
    /// to `active_bindings`, and only if it was still enabled.
    pub fn close(&self) {
        if !self.alive.swap(false, Ordering::AcqRel) {
            return;
        }
        if self.enabled.swap(false, Ordering::AcqRel) {
            self.active_bindings.fetch_sub(1, Ordering::AcqRel);
        }
    }
    /// Test helper: an enabled, alive binding with one registered user.
    #[cfg(test)]
    pub(crate) fn test_stub() -> Self {
        Self {
            enabled: Arc::new(AtomicBool::new(true)),
            alive: Arc::new(AtomicBool::new(true)),
            active_bindings: Arc::new(AtomicUsize::new(1)),
        }
    }
}
#[cfg(not(coverage))]
#[derive(Clone)]
/// One preview stream worker plus the state it shares with the UI.
/// Cloning is cheap: every field is an `Arc` or `Copy`.
struct PreviewFeed {
    /// Frame/status/telemetry state polled by the GTK render timer.
    shared: Arc<Mutex<SharedPreviewState>>,
    /// True while a relay session is active for this feed.
    session_active: Arc<AtomicBool>,
    /// Number of enabled picture bindings; streaming only runs when > 0.
    active_bindings: Arc<AtomicUsize>,
    /// Cleared by `shutdown()` to stop the worker threads.
    running: Arc<AtomicBool>,
    /// Capture/display profile this feed was spawned with.
    profile: PreviewProfile,
    /// True for inert placeholder feeds created by `spawn_disabled`.
    disabled: bool,
}
#[cfg(not(coverage))]
/// State shared between the feed worker threads and the GTK render timer.
struct SharedPreviewState {
    /// Most recent decoded frame, consumed (`take`n) by the render timer.
    latest: Option<PreviewFrame>,
    /// Current human-readable status line.
    status: String,
    /// Bumped whenever the status changes so the UI can cheaply detect updates.
    generation: u64,
    /// When true the render timer clears the picture widget once.
    clear_picture: bool,
    /// Last error mirrored to the session log (deduplication).
    last_logged_error: Option<String>,
    /// Last status mirrored to the session log (deduplication).
    last_logged_status: Option<String>,
    /// Rolling stream/render statistics.
    telemetry: PreviewTelemetry,
}
#[cfg(not(coverage))]
impl SharedPreviewState {
fn new() -> Self {
Self {
latest: None,
status: PREVIEW_IDLE_STATUS.to_string(),
generation: 1,
clear_picture: true,
last_logged_error: None,
last_logged_status: None,
telemetry: PreviewTelemetry::default(),
}
}
fn set_status(&mut self, status: impl Into<String>, clear_picture: bool) {
let status = status.into();
let changed = self.status != status || clear_picture;
self.status = status.clone();
if clear_picture {
self.latest = None;
self.clear_picture = true;
}
if !looks_like_preview_problem(&status) {
self.last_logged_error = None;
}
if changed {
self.generation = self.generation.saturating_add(1);
}
}
fn push_frame(&mut self, frame: PreviewFrame) {
self.telemetry.record_presented_frame();
self.latest = Some(frame);
self.clear_picture = false;
self.last_logged_error = None;
if self.status != "Live" {
self.status = "Live".to_string();
self.generation = self.generation.saturating_add(1);
}
}
}
#[cfg(not(coverage))]
#[derive(Debug, Default)]
/// Rolling stream statistics. The sample queues are pruned by `trim`
/// (helpers defined elsewhere in this file) — presumably down to
/// `TELEMETRY_WINDOW`; confirm against `trim_instant_queue`/`trim_value_queue`.
struct PreviewTelemetry {
    /// Arrival times of received packets.
    packet_times: VecDeque<Instant>,
    /// Presentation times of published frames.
    frame_times: VecDeque<Instant>,
    /// (timestamp, gap-ms) pairs between consecutive packets.
    packet_intervals_ms: VecDeque<(Instant, f32)>,
    /// (timestamp, gap-ms) pairs between consecutive presented frames.
    frame_intervals_ms: VecDeque<(Instant, f32)>,
    /// (timestamp, lost-count) entries derived from sequence-number gaps.
    packet_losses: VecDeque<(Instant, u64)>,
    /// (timestamp, newly-dropped) deltas of the server's cumulative drop counter.
    dropped_deltas: VecDeque<(Instant, u64)>,
    /// (timestamp, depth) samples of the reported queue depth.
    queue_depth_samples: VecDeque<(Instant, u32)>,
    // Most recent event times / counters used to compute the deltas above.
    last_packet_at: Option<Instant>,
    last_frame_at: Option<Instant>,
    last_seq: Option<u64>,
    last_dropped_total: Option<u64>,
    // Latest pass-through readings reported by the server.
    latest_server_fps: u32,
    latest_server_process_cpu_tenths: u32,
    latest_queue_depth: u32,
    latest_server_source_gap_peak_ms: u32,
    latest_server_send_gap_peak_ms: u32,
    latest_server_queue_peak: u32,
    latest_server_encoder_label: String,
    // Labels describing the local decode/render chain.
    decoder_label: String,
    stream_caps_label: String,
    decoded_caps_label: String,
    rendered_caps_label: String,
}
#[cfg(not(coverage))]
impl PreviewTelemetry {
    /// Records a received video packet using the current time.
    ///
    /// `seq` is the packet sequence number (0 means "no sequence info");
    /// the `server_*` values are telemetry reported alongside the packet.
    fn record_packet(
        &mut self,
        seq: u64,
        server_fps: u32,
        dropped_total: u64,
        queue_depth: u32,
        server_source_gap_peak_ms: u32,
        server_send_gap_peak_ms: u32,
        server_queue_peak: u32,
        server_encoder_label: &str,
        server_process_cpu_tenths: u32,
    ) {
        self.record_packet_at(
            Instant::now(),
            seq,
            server_fps,
            dropped_total,
            queue_depth,
            server_source_gap_peak_ms,
            server_send_gap_peak_ms,
            server_queue_peak,
            server_encoder_label,
            server_process_cpu_tenths,
        );
    }
    /// Core of `record_packet` with an explicit timestamp (testable).
    fn record_packet_at(
        &mut self,
        now: Instant,
        seq: u64,
        server_fps: u32,
        dropped_total: u64,
        queue_depth: u32,
        server_source_gap_peak_ms: u32,
        server_send_gap_peak_ms: u32,
        server_queue_peak: u32,
        server_encoder_label: &str,
        server_process_cpu_tenths: u32,
    ) {
        // Prune stale samples before inserting (and again at the end) so
        // the queues never carry entries older than the telemetry window.
        self.trim(now);
        self.packet_times.push_back(now);
        // Inter-packet gap in milliseconds, feeding jitter/peak-gap stats.
        if let Some(previous) = self.last_packet_at.replace(now) {
            self.packet_intervals_ms.push_back((
                now,
                now.saturating_duration_since(previous).as_secs_f32() * 1000.0,
            ));
        }
        // seq == 0 is treated as "no sequence info" and skipped entirely.
        if seq > 0 {
            if let Some(previous_seq) = self.last_seq
                && seq > previous_seq + 1
            {
                // A jump in sequence numbers means packets were lost.
                self.packet_losses
                    .push_back((now, seq.saturating_sub(previous_seq + 1)));
            }
            self.last_seq = Some(seq);
        }
        // Only increases of the server's cumulative drop counter count.
        if let Some(previous_dropped) = self.last_dropped_total
            && dropped_total > previous_dropped
        {
            self.dropped_deltas
                .push_back((now, dropped_total.saturating_sub(previous_dropped)));
        }
        self.last_dropped_total = Some(dropped_total);
        // Latest server readings; fps is floored at 1 and the reported
        // queue peak is never allowed below the current depth.
        self.latest_server_fps = server_fps.max(1);
        self.latest_server_process_cpu_tenths = server_process_cpu_tenths;
        self.latest_queue_depth = queue_depth;
        self.latest_server_source_gap_peak_ms = server_source_gap_peak_ms;
        self.latest_server_send_gap_peak_ms = server_send_gap_peak_ms;
        self.latest_server_queue_peak = server_queue_peak.max(queue_depth);
        if !server_encoder_label.is_empty() {
            self.latest_server_encoder_label = server_encoder_label.to_string();
        }
        self.queue_depth_samples.push_back((now, queue_depth));
        self.trim(now);
    }
    /// Records a presented (published) frame at the current time.
    fn record_presented_frame(&mut self) {
        self.record_presented_frame_at(Instant::now());
    }
    /// Core of `record_presented_frame` with an explicit timestamp.
    fn record_presented_frame_at(&mut self, now: Instant) {
        self.trim(now);
        // Inter-frame gap in milliseconds, for present-gap peaks.
        if let Some(previous) = self.last_frame_at.replace(now) {
            self.frame_intervals_ms.push_back((
                now,
                now.saturating_duration_since(previous).as_secs_f32() * 1000.0,
            ));
        }
        self.frame_times.push_back(now);
    }
    /// Remembers the active decoder label (empty labels are ignored).
    fn note_decoder(&mut self, decoder_label: &str) {
        if !decoder_label.is_empty() {
            self.decoder_label = decoder_label.to_string();
        }
    }
    /// Remembers the caps of the incoming (encoded) stream.
    fn note_stream_caps(&mut self, caps_label: &str) {
        if !caps_label.is_empty() {
            self.stream_caps_label = caps_label.to_string();
        }
    }
    /// Remembers the caps observed after decode.
    fn note_decoded_caps(&mut self, caps_label: &str) {
        if !caps_label.is_empty() {
            self.decoded_caps_label = caps_label.to_string();
        }
    }
    /// Remembers the caps of frames handed to the renderer.
    fn note_rendered_caps(&mut self, caps_label: &str) {
        if !caps_label.is_empty() {
            self.rendered_caps_label = caps_label.to_string();
        }
    }
    /// Snapshot of the current statistics, evaluated at `Instant::now()`.
    fn snapshot(&mut self) -> PreviewMetricsSnapshot {
        self.snapshot_at(Instant::now())
    }
    /// Core of `snapshot` with an explicit evaluation time (testable).
    fn snapshot_at(&mut self, now: Instant) -> PreviewMetricsSnapshot {
        self.trim(now);
        let receive_fps = events_per_second(&self.packet_times, now);
        let present_fps = events_per_second(&self.frame_times, now);
        let delivered = self.packet_times.len() as u64;
        let packet_losses: u64 = self.packet_losses.iter().map(|(_, loss)| *loss).sum();
        // Loss percentage over delivered + lost; guard the empty window.
        let packet_loss_pct = if delivered + packet_losses == 0 {
            0.0
        } else {
            packet_losses as f32 * 100.0 / (delivered + packet_losses) as f32
        };
        let dropped_frames: u64 = self
            .dropped_deltas
            .iter()
            .map(|(_, dropped)| *dropped)
            .sum();
        // Windowed peak; fall back to the latest reading when the sample
        // queue is empty.
        let queue_depth_peak = self
            .queue_depth_samples
            .iter()
            .map(|(_, depth)| *depth)
            .max()
            .unwrap_or(self.latest_queue_depth);
        PreviewMetricsSnapshot {
            receive_fps,
            present_fps,
            server_fps: self.latest_server_fps as f32,
            // Server reports CPU in tenths of a percent.
            server_process_cpu_pct: self.latest_server_process_cpu_tenths as f32 / 10.0,
            stream_spread_ms: compute_jitter_ms(&self.packet_intervals_ms),
            packet_loss_pct,
            dropped_frames,
            queue_depth: self.latest_queue_depth,
            queue_depth_peak,
            packet_gap_peak_ms: compute_peak_gap_ms(&self.packet_intervals_ms),
            present_gap_peak_ms: compute_peak_gap_ms(&self.frame_intervals_ms),
            server_source_gap_peak_ms: self.latest_server_source_gap_peak_ms as f32,
            server_send_gap_peak_ms: self.latest_server_send_gap_peak_ms as f32,
            server_queue_peak: self.latest_server_queue_peak,
            server_encoder_label: self.latest_server_encoder_label.clone(),
            decoder_label: self.decoder_label.clone(),
            stream_caps_label: self.stream_caps_label.clone(),
            decoded_caps_label: self.decoded_caps_label.clone(),
            rendered_caps_label: self.rendered_caps_label.clone(),
        }
    }
    /// Drops stale samples from every queue (helpers defined elsewhere in
    /// this file).
    fn trim(&mut self, now: Instant) {
        trim_instant_queue(&mut self.packet_times, now);
        trim_instant_queue(&mut self.frame_times, now);
        trim_value_queue(&mut self.packet_intervals_ms, now);
        trim_value_queue(&mut self.frame_intervals_ms, now);
        trim_value_queue(&mut self.packet_losses, now);
        trim_value_queue(&mut self.dropped_deltas, now);
        trim_value_queue(&mut self.queue_depth_samples, now);
    }
}
#[cfg(not(coverage))]
impl PreviewFeed {
    /// Spawns a live feed: a background thread that runs the decode pipeline
    /// and relay reconnect loop, publishing frames into `shared`.
    fn spawn(
        server_addr: Arc<Mutex<String>>,
        monitor_id: u32,
        profile: PreviewProfile,
        log_sink: Arc<Mutex<Option<std::sync::mpsc::Sender<String>>>>,
    ) -> Result<Self> {
        let shared = Arc::new(Mutex::new(SharedPreviewState::new()));
        let session_active = Arc::new(AtomicBool::new(false));
        let active_bindings = Arc::new(AtomicUsize::new(0));
        let running = Arc::new(AtomicBool::new(true));
        let shared_state = Arc::clone(&shared);
        let session_active_flag = Arc::clone(&session_active);
        let active_bindings_flag = Arc::clone(&active_bindings);
        let running_flag = Arc::clone(&running);
        // The worker exits (and logs) when `running` is cleared or the
        // feed loop fails.
        std::thread::spawn(move || {
            if let Err(err) = run_preview_feed(
                server_addr,
                monitor_id,
                profile,
                session_active_flag,
                active_bindings_flag,
                running_flag,
                shared_state,
                log_sink,
            ) {
                warn!(monitor_id, ?err, "launcher preview feed exited");
            }
        });
        Ok(Self {
            shared,
            session_active,
            active_bindings,
            running,
            profile,
            disabled: false,
        })
    }
    /// Builds an inert placeholder feed (no thread, no pipeline) whose
    /// status permanently reads "Feed disabled.".
    fn spawn_disabled(profile: PreviewProfile) -> Self {
        let shared = Arc::new(Mutex::new(SharedPreviewState::new()));
        if let Ok(mut slot) = shared.lock() {
            slot.set_status("Feed disabled.", true);
        }
        Self {
            shared,
            session_active: Arc::new(AtomicBool::new(false)),
            active_bindings: Arc::new(AtomicUsize::new(0)),
            // Already "not running": there is no worker to stop.
            running: Arc::new(AtomicBool::new(false)),
            profile,
            disabled: true,
        }
    }
    /// Profile this feed was spawned with.
    fn profile(&self) -> PreviewProfile {
        self.profile
    }
    /// Whether this is a `spawn_disabled` placeholder.
    fn is_disabled(&self) -> bool {
        self.disabled
    }
    /// Whether a relay session is currently marked active for this feed.
    fn is_active(&self) -> bool {
        self.session_active.load(Ordering::Relaxed)
    }
    /// Flags the relay session active/idle; deactivating a live feed resets
    /// the status to idle and clears the picture.
    fn set_active(&self, active: bool) {
        self.session_active.store(active, Ordering::Relaxed);
        if !active && !self.disabled {
            self.replace_status(PREVIEW_IDLE_STATUS, true);
        }
    }
    /// Stops the worker threads and resets the visible status.
    fn shutdown(&self) {
        self.running.store(false, Ordering::Relaxed);
        self.replace_status(
            if self.disabled {
                "Feed disabled."
            } else {
                PREVIEW_IDLE_STATUS
            },
            true,
        );
    }
    /// Directly sets the shared status (no session-log mirroring).
    fn replace_status(&self, status: impl Into<String>, clear_picture: bool) {
        if let Ok(mut shared) = self.shared.lock() {
            shared.set_status(status, clear_picture);
        }
    }
    /// Binds this feed to a GTK picture and status label.
    ///
    /// Installs a 120 ms GLib timer on the current main context that drains
    /// the latest frame into a `gdk::MemoryTexture` and refreshes the label
    /// when the status generation changes. Returns the binding handle used
    /// to pause or permanently remove the timer.
    fn install_on_picture(
        &self,
        picture: &gtk::Picture,
        status_label: &gtk::Label,
    ) -> PreviewBinding {
        let picture = picture.clone();
        let status_label = status_label.clone();
        let shared = Arc::clone(&self.shared);
        let enabled = Arc::new(AtomicBool::new(true));
        let alive = Arc::new(AtomicBool::new(true));
        let active_bindings = Arc::clone(&self.active_bindings);
        let enabled_flag = Arc::clone(&enabled);
        let alive_flag = Arc::clone(&alive);
        // The new binding starts enabled, so it counts immediately.
        active_bindings.fetch_add(1, Ordering::AcqRel);
        let mut last_generation = 0_u64;
        glib::timeout_add_local(Duration::from_millis(120), move || {
            // `close()` permanently ends the timer; a disabled binding
            // merely skips the work for this tick.
            if !alive_flag.load(Ordering::Relaxed) {
                return glib::ControlFlow::Break;
            }
            if !enabled_flag.load(Ordering::Relaxed) {
                return glib::ControlFlow::Continue;
            }
            // Take the pending frame (if any) and copy out the status
            // fields while holding the lock as briefly as possible.
            let (frame, status, generation, clear_picture) = match shared.lock() {
                Ok(mut slot) => {
                    let frame = slot.latest.take();
                    let status = slot.status.clone();
                    let generation = slot.generation;
                    let clear_picture = slot.clear_picture;
                    slot.clear_picture = false;
                    (frame, status, generation, clear_picture)
                }
                Err(_) => return glib::ControlFlow::Continue,
            };
            if clear_picture {
                picture.set_paintable(Option::<&gdk::Paintable>::None);
            }
            if let Some(frame) = frame {
                // Wrap the RGBA bytes in a texture for the picture widget.
                let bytes = glib::Bytes::from_owned(frame.rgba);
                let texture = gdk::MemoryTexture::new(
                    frame.width,
                    frame.height,
                    gdk::MemoryFormat::R8g8b8a8,
                    &bytes,
                    frame.stride,
                );
                picture.set_paintable(Some(&texture));
            }
            // Only touch the label when the status generation advanced.
            if generation != last_generation {
                status_label.set_text(&status);
                status_label.set_tooltip_text(Some(&status));
                last_generation = generation;
            }
            glib::ControlFlow::Continue
        });
        PreviewBinding {
            enabled,
            alive,
            active_bindings,
        }
    }
    /// Current telemetry snapshot; defaults when the lock is poisoned.
    fn snapshot_metrics(&self) -> PreviewMetricsSnapshot {
        self.shared
            .lock()
            .map(|mut shared| shared.telemetry.snapshot())
            .unwrap_or_default()
    }
}
#[cfg(not(coverage))]
/// A decoded frame ready to be uploaded as a `gdk::MemoryTexture`.
struct PreviewFrame {
    /// Frame dimensions in pixels.
    width: i32,
    height: i32,
    /// Row stride of `rgba` in bytes.
    stride: usize,
    /// Pixel data in RGBA8 order (see `gdk::MemoryFormat::R8g8b8a8`).
    rgba: Vec<u8>,
}
#[cfg(not(coverage))]
/// Worker entry point for one preview feed.
///
/// Builds the local decode pipeline, spawns a sample-pull thread that
/// publishes decoded frames into `shared`, then runs a reconnect loop on a
/// local tokio runtime that streams `VideoPacket`s from the relay into the
/// pipeline's appsrc. The loop only streams while `session_active` is set
/// AND at least one picture binding is registered; it exits when `running`
/// is cleared, after which the pipeline is torn down.
fn run_preview_feed(
    server_addr: Arc<Mutex<String>>,
    monitor_id: u32,
    profile: PreviewProfile,
    session_active: Arc<AtomicBool>,
    active_bindings: Arc<AtomicUsize>,
    running: Arc<AtomicBool>,
    shared: Arc<Mutex<SharedPreviewState>>,
    log_sink: Arc<Mutex<Option<std::sync::mpsc::Sender<String>>>>,
) -> Result<()> {
    let (pipeline, appsrc, appsink, decoder_name) = build_preview_pipeline(profile)?;
    // Optional elements looked up by name for caps telemetry.
    let parser = pipeline.by_name("preview_parse");
    let decoder = pipeline.by_name("decoder");
    if let Ok(mut slot) = shared.lock() {
        slot.telemetry.note_decoder(&decoder_name);
    }
    {
        // Decoder elements may be added dynamically; record their label for
        // telemetry as soon as they appear anywhere in the bin.
        let shared = Arc::clone(&shared);
        pipeline.connect_deep_element_added(move |_, _, element| {
            if let Some(decoder_label) = preview_decoder_label(element)
                && let Ok(mut slot) = shared.lock()
            {
                slot.telemetry.note_decoder(&decoder_label);
            }
        });
    }
    pipeline
        .set_state(gst::State::Playing)
        .context("starting launcher preview pipeline")?;
    {
        // Sample-pull thread: drains decoded frames from the appsink and
        // records the caps seen at each stage of the pipeline.
        let shared = Arc::clone(&shared);
        let appsink = appsink.clone();
        let parser = parser.clone();
        let decoder = decoder.clone();
        let running = Arc::clone(&running);
        std::thread::spawn(move || {
            loop {
                if !running.load(Ordering::Relaxed) {
                    break;
                }
                // Bounded wait so the `running` flag is re-checked regularly.
                if let Some(sample) = appsink.try_pull_sample(gst::ClockTime::from_mseconds(250)) {
                    if let Some(parser) = parser.as_ref() {
                        record_preview_caps(&shared, parser, "src", PreviewCapsKind::Stream);
                    }
                    if let Some(decoder) = decoder.as_ref() {
                        record_preview_caps(&shared, decoder, "src", PreviewCapsKind::Decoded);
                    }
                    if let Some(caps) = sample.caps() {
                        let caps_label = preview_caps_summary(&caps);
                        if !caps_label.is_empty()
                            && let Ok(mut slot) = shared.lock()
                        {
                            slot.telemetry.note_rendered_caps(&caps_label);
                        }
                    }
                    if let Some(frame) = sample_to_frame(&sample) {
                        if let Ok(mut slot) = shared.lock() {
                            slot.push_frame(frame);
                        }
                    }
                }
            }
        });
    }
    // A single-threaded runtime is enough: the loop below is the only task.
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .context("building preview tokio runtime")?;
    let _ = rt.block_on(async move {
        let mut was_active = false;
        let mut retry_delay = Duration::from_millis(750);
        loop {
            if !running.load(Ordering::Relaxed) {
                break;
            }
            // Stream only while the session is active AND someone is watching.
            let active_now = session_active.load(Ordering::Relaxed)
                && active_bindings.load(Ordering::Relaxed) > 0;
            if !active_now {
                was_active = false;
                retry_delay = Duration::from_millis(750);
                set_shared_status(&shared, &log_sink, monitor_id, PREVIEW_IDLE_STATUS, true);
                tokio::time::sleep(Duration::from_millis(150)).await;
                continue;
            }
            if !was_active {
                // Short grace period after (re)activation before dialing.
                was_active = true;
                set_shared_status(
                    &shared,
                    &log_sink,
                    monitor_id,
                    "Waking relay preview...",
                    true,
                );
                tokio::time::sleep(Duration::from_millis(350)).await;
            }
            set_shared_status(
                &shared,
                &log_sink,
                monitor_id,
                "Connecting relay preview...",
                true,
            );
            // Re-read the address each attempt so updates take effect on
            // the next reconnect.
            let current_addr = match server_addr.lock() {
                Ok(value) => value.clone(),
                Err(_) => {
                    set_shared_status(
                        &shared,
                        &log_sink,
                        monitor_id,
                        "Preview address is unavailable.",
                        true,
                    );
                    tokio::time::sleep(Duration::from_millis(750)).await;
                    continue;
                }
            };
            let channel = match Channel::from_shared(current_addr.clone()) {
                Ok(endpoint) => match endpoint.tcp_nodelay(true).connect().await {
                    Ok(channel) => channel,
                    Err(err) => {
                        warn!(monitor_id, ?err, "launcher preview connect failed");
                        log_preview_issue(
                            &shared,
                            &log_sink,
                            monitor_id,
                            &format!("Preview host is unavailable: {err}"),
                        );
                        set_shared_status(
                            &shared,
                            &log_sink,
                            monitor_id,
                            "Preview host is unavailable.",
                            true,
                        );
                        tokio::time::sleep(retry_delay).await;
                        continue;
                    }
                },
                Err(err) => {
                    warn!(monitor_id, ?err, "launcher preview endpoint invalid");
                    log_preview_issue(
                        &shared,
                        &log_sink,
                        monitor_id,
                        &format!("Preview address is invalid: {err}"),
                    );
                    set_shared_status(
                        &shared,
                        &log_sink,
                        monitor_id,
                        "Preview address is invalid.",
                        true,
                    );
                    tokio::time::sleep(retry_delay).await;
                    continue;
                }
            };
            let mut cli = RelayClient::new(channel);
            let req = MonitorRequest {
                id: monitor_id,
                max_bitrate: profile.max_bitrate_kbit,
                requested_width: profile.requested_width.max(0) as u32,
                requested_height: profile.requested_height.max(0) as u32,
                requested_fps: profile.requested_fps,
                source_id: Some(profile.source_monitor_id),
            };
            match cli.capture_video(Request::new(req)).await {
                Ok(mut stream) => {
                    // Successful connect resets the back-off.
                    retry_delay = Duration::from_millis(750);
                    debug!(monitor_id, "launcher preview connected");
                    set_shared_status(
                        &shared,
                        &log_sink,
                        monitor_id,
                        "Waiting for stream...",
                        true,
                    );
                    // Inner receive loop: forward packets into the appsrc
                    // until the feed goes idle, stops, or the stream ends.
                    loop {
                        if !session_active.load(Ordering::Relaxed)
                            || !running.load(Ordering::Relaxed)
                            || active_bindings.load(Ordering::Relaxed) == 0
                        {
                            break;
                        }
                        // Timeout keeps the stop conditions responsive even
                        // when no packets arrive.
                        match tokio::time::timeout(
                            Duration::from_millis(300),
                            stream.get_mut().message(),
                        )
                        .await
                        {
                            Ok(Ok(Some(pkt))) => {
                                record_preview_packet(&shared, &pkt);
                                push_preview_packet(&appsrc, pkt);
                            }
                            Ok(Ok(None)) => {
                                set_shared_status(
                                    &shared,
                                    &log_sink,
                                    monitor_id,
                                    "Preview stream ended.",
                                    true,
                                );
                                retry_delay = Duration::from_millis(1_500);
                                break;
                            }
                            Ok(Err(err)) => {
                                warn!(monitor_id, ?err, "launcher preview stream error");
                                log_preview_issue(
                                    &shared,
                                    &log_sink,
                                    monitor_id,
                                    &format!("Preview stream error: {err}"),
                                );
                                set_shared_status(
                                    &shared,
                                    &log_sink,
                                    monitor_id,
                                    "Preview stream error. See session log.",
                                    true,
                                );
                                retry_delay =
                                    preview_retry_delay(retry_delay, Some(&err.to_string()));
                                break;
                            }
                            // Timeout: no packet yet — re-check stop flags.
                            Err(_) => continue,
                        }
                    }
                }
                Err(err) => {
                    // Distinguish "server still warming up" from hard failures.
                    if preview_startup_condition(&err) {
                        debug!(
                            monitor_id,
                            ?err,
                            "launcher preview waiting for capture pipeline"
                        );
                        log_preview_issue(
                            &shared,
                            &log_sink,
                            monitor_id,
                            &format!("Waiting for capture pipeline: {err}"),
                        );
                        set_shared_status(
                            &shared,
                            &log_sink,
                            monitor_id,
                            "Waiting for capture pipeline...",
                            true,
                        );
                        retry_delay = preview_retry_delay(retry_delay, Some(err.message()));
                    } else {
                        warn!(monitor_id, ?err, "launcher preview rpc failed");
                        log_preview_issue(
                            &shared,
                            &log_sink,
                            monitor_id,
                            &format!("Preview RPC failed: {err}"),
                        );
                        set_shared_status(
                            &shared,
                            &log_sink,
                            monitor_id,
                            "Preview RPC failed. See session log.",
                            true,
                        );
                        retry_delay = preview_retry_delay(retry_delay, Some(err.message()));
                    }
                }
            }
            tokio::time::sleep(retry_delay).await;
        }
        #[allow(unreachable_code)]
        Ok::<(), anyhow::Error>(())
    });
    // Tear down the pipeline once the loop exits; failure here is benign.
    let _ = pipeline.set_state(gst::State::Null);
    Ok(())
}
#[cfg(not(coverage))]
/// Returns true when a capture RPC failure looks like a transient start-up
/// condition on the server (capture pipeline still warming up) rather than
/// a hard error worth warning about.
fn preview_startup_condition(err: &tonic::Status) -> bool {
    if err.code() != tonic::Code::Internal {
        return false;
    }
    const STARTUP_HINTS: [&str; 7] = [
        "starting video pipeline",
        "failed to change its state",
        "resource busy",
        "device or resource busy",
        "no signal",
        "was not ready",
        "no such file or directory",
    ];
    let message = err.message().to_ascii_lowercase();
    STARTUP_HINTS.iter().any(|hint| message.contains(hint))
}
#[cfg(not(coverage))]
/// Computes the next reconnect back-off from the current one.
///
/// Starts at 1.5 s and doubles per failure, capped at 30 s. Errors whose
/// message suggests the server needs time to settle (fd exhaustion, busy
/// devices, pipeline state failures) are floored at 6 s.
fn preview_retry_delay(current: Duration, message: Option<&str>) -> Duration {
    let current_ms = current.as_millis() as u64;
    let mut delay_ms = if current_ms < 1_500 {
        1_500
    } else {
        current_ms.saturating_mul(2)
    };
    let slow_recovery = message.map(str::to_ascii_lowercase).is_some_and(|m| {
        m.contains("too many open files")
            || m.contains("failed to change its state")
            || m.contains("resource busy")
            || m.contains("device or resource busy")
    });
    if slow_recovery {
        delay_ms = delay_ms.max(6_000);
    }
    Duration::from_millis(delay_ms.min(30_000))
}
#[cfg(not(coverage))]
/// Stores `status` into the shared state and, when the text differs from
/// the last status that was logged, mirrors it to the session log.
fn set_shared_status(
    shared: &Arc<Mutex<SharedPreviewState>>,
    log_sink: &Arc<Mutex<Option<std::sync::mpsc::Sender<String>>>>,
    monitor_id: u32,
    status: impl Into<String>,
    clear: bool,
) {
    let status = status.into();
    let mut announce = false;
    if let Ok(mut state) = shared.lock() {
        // Deduplicate log lines: announce only on a changed status.
        announce = state.last_logged_status.as_deref() != Some(status.as_str());
        if announce {
            state.last_logged_status = Some(status.clone());
        }
        state.set_status(status.clone(), clear);
    }
    if announce {
        log_preview_status(log_sink, monitor_id, &status);
    }
}
#[cfg(not(coverage))]
/// Sends `message` to the session log, suppressing repeats: each distinct
/// error is logged once until the shared state's dedup slot is reset.
fn log_preview_issue(
    shared: &Arc<Mutex<SharedPreviewState>>,
    log_sink: &Arc<Mutex<Option<std::sync::mpsc::Sender<String>>>>,
    monitor_id: u32,
    message: &str,
) {
    let first_occurrence = match shared.lock() {
        Ok(mut state) => {
            if state.last_logged_error.as_deref() == Some(message) {
                false
            } else {
                state.last_logged_error = Some(message.to_string());
                true
            }
        }
        // A poisoned lock means we cannot dedup — stay quiet.
        Err(_) => false,
    };
    if !first_occurrence {
        return;
    }
    if let Ok(slot) = log_sink.lock()
        && let Some(tx) = slot.as_ref()
    {
        let _ = tx.send(format!(
            "[preview:{}] {message}",
            preview_eye_label(monitor_id)
        ));
    }
}
#[cfg(not(coverage))]
/// Translates a terse status line into a friendlier session-log message.
/// The idle status is intentionally never logged.
fn log_preview_status(
    log_sink: &Arc<Mutex<Option<std::sync::mpsc::Sender<String>>>>,
    monitor_id: u32,
    status: &str,
) {
    if status == PREVIEW_IDLE_STATUS {
        return;
    }
    let eye = preview_eye_label(monitor_id);
    let message = match status {
        "Waking relay preview..." => format!("🪄 {eye} eye is waking the preview spell."),
        "Connecting relay preview..." => format!("🛰️ dialing the {eye} eye feed."),
        "Waiting for stream..." => {
            format!("👀 {eye} eye is connected and waiting for the first frame.")
        }
        "Preview stream ended." => format!("🌙 {eye} eye preview stream ended."),
        "Preview host is unavailable." => format!("💔 {eye} eye cannot reach the preview host."),
        "Preview address is unavailable." => {
            format!("🧭 {eye} eye does not have a usable preview address yet.")
        }
        "Preview address is invalid." => format!("🧭 {eye} eye was given a bad preview address."),
        "Waiting for capture pipeline..." => {
            format!("{eye} eye is waiting for the capture pipeline to wake up.")
        }
        "Preview stream error. See session log." => {
            format!("💥 {eye} eye hit a preview stream error. See the log spellbook for detail.")
        }
        "Preview RPC failed. See session log." => {
            format!("💥 {eye} eye preview RPC fizzled. See the log spellbook for detail.")
        }
        // Unrecognized statuses are forwarded verbatim.
        other => format!("🎥 {eye} eye: {other}"),
    };
    if let Ok(slot) = log_sink.lock()
        && let Some(tx) = slot.as_ref()
    {
        let _ = tx.send(format!("[preview:{eye}] {message}"));
    }
}
#[cfg(not(coverage))]
/// Human-readable label for a preview monitor: 0 is the left eye, 1 the
/// right, and any other id falls back to the generic "eye".
fn preview_eye_label(monitor_id: u32) -> &'static str {
    if monitor_id == 0 {
        "left"
    } else if monitor_id == 1 {
        "right"
    } else {
        "eye"
    }
}
#[cfg(not(coverage))]
/// Case-insensitive check for the substrings that mark a degraded preview
/// status (used to decide when a status deserves attention).
fn looks_like_preview_problem(status: &str) -> bool {
    const MARKERS: [&str; 5] = [
        "unavailable",
        "invalid",
        "failed",
        "waiting for capture pipeline",
        "error",
    ];
    let normalized = status.to_ascii_lowercase();
    MARKERS.iter().any(|marker| normalized.contains(marker))
}
#[cfg(not(coverage))]
/// Builds the GStreamer preview pipeline:
/// `appsrc` (H.264 byte-stream) -> leaky queue -> `h264parse` -> decoder ->
/// `videoconvert` -> `videoscale` -> RGBA `appsink`.
///
/// Returns the pipeline, the `appsrc`/`appsink` handles, and the name of the
/// selected H.264 decoder element.
fn build_preview_pipeline(
    profile: PreviewProfile,
) -> Result<(gst::Pipeline, gst_app::AppSrc, gst_app::AppSink, String)> {
    // Best available H.264 decoder element name on this machine.
    let decoder_name = pick_h264_decoder();
    // NOTE(review): assumes eye_source_mode_for_request maps the requested
    // geometry onto a supported capture mode and reports its width/height —
    // confirm against lesavka_common::eye_source.
    let source_mode = eye_source_mode_for_request(
        profile.requested_width.max(2) as u32,
        profile.requested_height.max(2) as u32,
    );
    // Scale the source down to the display budget (never upscaling).
    let (render_width, render_height) =
        preview_render_size(profile, source_mode.width, source_mode.height);
    // Leaky downstream queue keeps the preview live even if decode stalls;
    // appsink drops stale frames so we always show the latest one.
    let desc = format!(
        "appsrc name=src is-live=true format=time do-timestamp=true block=false ! \
         queue max-size-buffers=6 max-size-time=0 max-size-bytes=0 leaky=downstream ! \
         h264parse name=preview_parse disable-passthrough=true ! {decoder_name} name=decoder ! videoconvert ! \
         videoscale add-borders=false ! \
         video/x-raw,format=RGBA,width=(int){render_width},height=(int){render_height},pixel-aspect-ratio=1/1 ! \
         appsink name=sink emit-signals=false sync=false max-buffers=1 drop=true",
    );
    let pipeline = gst::parse::launch(&desc)?
        .downcast::<gst::Pipeline>()
        .expect("preview pipeline");
    let appsrc = pipeline
        .by_name("src")
        .context("missing preview appsrc")?
        .downcast::<gst_app::AppSrc>()
        .expect("preview appsrc");
    // Incoming packets are Annex-B byte-stream H.264, one access unit each.
    appsrc.set_caps(Some(
        &gst::Caps::builder("video/x-h264")
            .field("stream-format", &"byte-stream")
            .field("alignment", &"au")
            .build(),
    ));
    appsrc.set_format(gst::Format::Time);
    let appsink = pipeline
        .by_name("sink")
        .context("missing preview appsink")?
        .downcast::<gst_app::AppSink>()
        .expect("preview appsink");
    // Pin the sink to the exact RGBA geometry the renderer expects.
    appsink.set_caps(Some(
        &gst::Caps::builder("video/x-raw")
            .field("format", &"RGBA")
            .field("width", &(render_width as i32))
            .field("height", &(render_height as i32))
            .field("pixel-aspect-ratio", &gst::Fraction::new(1, 1))
            .build(),
    ));
    Ok((pipeline, appsrc, appsink, decoder_name))
}
#[cfg(not(coverage))]
/// Fits the source geometry into the profile's display budget with a
/// uniform scale, never upscaling. Both dimensions are snapped down to
/// even values (minimum 2x2).
fn preview_render_size(
    profile: PreviewProfile,
    source_width: u32,
    source_height: u32,
) -> (i32, i32) {
    // Snap a dimension down to the nearest even value, never below 2.
    fn snap_even(value: i32) -> i32 {
        let floored = value.max(2);
        floored - (floored % 2)
    }
    let src_w = source_width.max(2) as f32;
    let src_h = source_height.max(2) as f32;
    let budget_w = profile.display_width.max(2) as f32;
    let budget_h = profile.display_height.max(2) as f32;
    // Clamp the low end so a degenerate display budget stays sane.
    let scale = (budget_w / src_w).min(budget_h / src_h).min(1.0).max(0.01);
    let width = snap_even((src_w * scale).round() as i32).max(2);
    let height = snap_even((src_h * scale).round() as i32).max(2);
    (width, height)
}
#[cfg(not(coverage))]
/// Wraps an encoded packet in a `GstBuffer`, stamps the presentation time
/// (`pts` arrives in microseconds), and hands it to the appsrc. Push errors
/// are ignored; pipeline teardown handles a dead source.
fn push_preview_packet(appsrc: &gst_app::AppSrc, pkt: VideoPacket) {
    let mut buffer = gst::Buffer::from_slice(pkt.data);
    if let Some(inner) = buffer.get_mut() {
        inner.set_pts(Some(gst::ClockTime::from_useconds(pkt.pts)));
    }
    let _ = appsrc.push_buffer(buffer);
}
#[cfg(not(coverage))]
/// Which pipeline stage a caps snapshot was taken from.
#[derive(Clone, Copy)]
enum PreviewCapsKind {
    /// Caps on the encoded H.264 stream entering the decoder.
    Stream,
    /// Caps on the raw video leaving the decoder.
    Decoded,
}
#[cfg(not(coverage))]
/// Reads the negotiated caps from the named static pad of `element`,
/// summarizes them, and stashes the summary in the telemetry slot that
/// matches `kind`. Silently returns when the pad, caps, or summary is
/// missing/empty.
fn record_preview_caps(
    shared: &Arc<Mutex<SharedPreviewState>>,
    element: &gst::Element,
    pad_name: &str,
    kind: PreviewCapsKind,
) {
    let caps = match element
        .static_pad(pad_name)
        .and_then(|pad| pad.current_caps())
    {
        Some(caps) => caps,
        None => return,
    };
    let summary = preview_caps_summary(&caps);
    if summary.is_empty() {
        return;
    }
    if let Ok(mut state) = shared.lock() {
        match kind {
            PreviewCapsKind::Stream => state.telemetry.note_stream_caps(&summary),
            PreviewCapsKind::Decoded => state.telemetry.note_decoded_caps(&summary),
        }
    }
}
#[cfg(not(coverage))]
/// Renders caps through their `Display` impl; callers treat an empty string
/// as "nothing negotiated yet".
fn preview_caps_summary(caps: &impl std::fmt::Display) -> String {
    format!("{caps}")
}
#[cfg(not(coverage))]
/// Folds a received `VideoPacket`'s bookkeeping fields into the shared
/// telemetry state: sequence number, server-side fps/drop/queue stats,
/// server gap peaks, the server encoder label, and server process CPU
/// (in tenths of a percent). A poisoned lock drops the sample.
fn record_preview_packet(shared: &Arc<Mutex<SharedPreviewState>>, pkt: &VideoPacket) {
    if let Ok(mut slot) = shared.lock() {
        slot.telemetry.record_packet(
            pkt.seq,
            pkt.effective_fps,
            pkt.dropped_total,
            pkt.queue_depth,
            pkt.server_source_gap_peak_ms,
            pkt.server_send_gap_peak_ms,
            pkt.server_queue_peak,
            &pkt.server_encoder_label,
            pkt.server_process_cpu_tenths,
        );
    }
}
#[cfg(not(coverage))]
/// Converts a decoded appsink sample into an owned RGBA `PreviewFrame`.
///
/// Returns `None` when the sample lacks caps, width/height fields, or a
/// readable buffer.
fn sample_to_frame(sample: &gst::Sample) -> Option<PreviewFrame> {
    let caps = sample.caps()?;
    let structure = caps.structure(0)?;
    let width = structure.get::<i32>("width").ok()?;
    let height = structure.get::<i32>("height").ok()?;
    let buffer = sample.buffer()?;
    let map = buffer.map_readable().ok()?;
    // Copy the mapped pixels so the frame outlives the GStreamer buffer.
    let rgba = map.as_slice().to_vec();
    // NOTE(review): stride is derived as total_len / height, which assumes
    // tightly packed rows from the converter; if any element adds row
    // padding this mis-slices the image — confirm against VideoMeta.
    let stride = rgba.len() / height.max(1) as usize;
    Some(PreviewFrame {
        width,
        height,
        stride,
        rgba,
    })
}
#[cfg(not(coverage))]
/// Returns the element's factory name, but only when the factory registers
/// itself as a video decoder (klass contains both "decoder" and "video").
fn preview_decoder_label(element: &gst::Element) -> Option<String> {
    let factory = element.factory()?;
    let klass = factory.klass().to_ascii_lowercase();
    let is_video_decoder = klass.contains("decoder") && klass.contains("video");
    is_video_decoder.then(|| factory.name().to_string())
}
#[cfg(not(coverage))]
/// Environment override for a preview bitrate: an unset or unparsable
/// variable falls back to `default`.
fn preview_bitrate(var: &str, default: u32) -> u32 {
    match std::env::var(var) {
        Ok(raw) => raw.parse::<u32>().unwrap_or(default),
        Err(_) => default,
    }
}
#[cfg(not(coverage))]
/// Environment override for a preview dimension: only strictly positive
/// parsed values are accepted; anything else keeps `default`.
fn preview_dimension(var: &str, default: i32) -> i32 {
    let parsed = std::env::var(var)
        .ok()
        .and_then(|raw| raw.parse::<i32>().ok());
    match parsed {
        Some(value) if value > 0 => value,
        _ => default,
    }
}
#[cfg(not(coverage))]
/// Clamps a requested capture profile to the pipeline's minimum sane
/// values: at least a 2x2 frame, one frame per second, and 800 kbit/s.
fn sanitize_preview_request(
    requested_width: i32,
    requested_height: i32,
    requested_fps: u32,
    max_bitrate_kbit: u32,
) -> (i32, i32, u32, u32) {
    let width = requested_width.max(2);
    let height = requested_height.max(2);
    let fps = requested_fps.max(1);
    let bitrate = max_bitrate_kbit.max(800);
    (width, height, fps, bitrate)
}
#[cfg(not(coverage))]
/// Rate of events over the time span covered by the queue.
///
/// The span is clamped to `[0.25 s, TELEMETRY_WINDOW]` so a near-empty queue
/// does not produce a wild spike and a stale queue does not dilute the rate.
/// An empty queue reports 0.0.
// Fix: the original carried a duplicated `#[cfg(not(coverage))]` attribute;
// one copy is redundant and has been removed.
fn events_per_second(events: &VecDeque<Instant>, now: Instant) -> f32 {
    let Some(oldest) = events.front().copied() else {
        return 0.0;
    };
    let span = now
        .saturating_duration_since(oldest)
        .as_secs_f32()
        .clamp(0.25, TELEMETRY_WINDOW.as_secs_f32());
    events.len() as f32 / span
}
#[cfg(not(coverage))]
/// Drops timestamps older than the telemetry window from the front of the
/// queue. Entries arrive in time order, so trimming stops at the first
/// timestamp still inside the window.
fn trim_instant_queue(queue: &mut VecDeque<Instant>, now: Instant) {
    loop {
        match queue.front().copied() {
            Some(oldest) if now.saturating_duration_since(oldest) > TELEMETRY_WINDOW => {
                let _ = queue.pop_front();
            }
            _ => break,
        }
    }
}
#[cfg(not(coverage))]
/// Same trimming policy as `trim_instant_queue`, but for timestamped
/// `(Instant, T)` value pairs.
fn trim_value_queue<T>(queue: &mut VecDeque<(Instant, T)>, now: Instant) {
    while queue
        .front()
        .is_some_and(|(oldest, _)| now.saturating_duration_since(*oldest) > TELEMETRY_WINDOW)
    {
        let _ = queue.pop_front();
    }
}
#[cfg(not(coverage))]
/// Mean absolute deviation of the sampled gap values, in milliseconds.
/// Fewer than two samples have no spread by definition.
fn compute_jitter_ms(samples: &VecDeque<(Instant, f32)>) -> f32 {
    let count = samples.len();
    if count < 2 {
        return 0.0;
    }
    let total: f32 = samples.iter().map(|(_, value)| *value).sum();
    let mean = total / count as f32;
    let deviation: f32 = samples
        .iter()
        .map(|(_, value)| (value - mean).abs())
        .sum();
    deviation / count as f32
}
#[cfg(not(coverage))]
/// Largest recorded gap value in the window; an empty queue reports 0.0.
fn compute_peak_gap_ms(samples: &VecDeque<(Instant, f32)>) -> f32 {
    let mut peak = 0.0_f32;
    for (_, value) in samples {
        peak = peak.max(*value);
    }
    peak
}
#[cfg(test)]
mod tests {
    // Unit tests for the pure preview helpers plus loopback "wire" tests
    // that spin up an in-process tonic Relay server and assert exactly what
    // MonitorRequest the preview sends for each capture profile.
    use super::{
        DEFAULT_EYE_SOURCE_HEIGHT, DEFAULT_EYE_SOURCE_WIDTH, INLINE_PREVIEW_MAX_KBIT,
        INLINE_PREVIEW_REQUEST_FPS, INLINE_PREVIEW_REQUEST_HEIGHT, INLINE_PREVIEW_REQUEST_WIDTH,
        LauncherPreview, PREVIEW_HEIGHT, PREVIEW_WIDTH, PreviewSurface, PreviewTelemetry,
        preview_render_size, sanitize_preview_request,
    };
    use crate::launcher::state::{CaptureSizePreset, LauncherState};
    use futures::stream;
    use lesavka_common::lesavka::relay_server::{Relay, RelayServer};
    use lesavka_common::lesavka::{MonitorRequest, VideoPacket};
    use serial_test::serial;
    use std::pin::Pin;
    use std::sync::{Arc, Mutex};
    use std::time::{Duration, Instant};
    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;
    use tonic::{Request, Response, Status};
    // Minimal Relay implementation that records every MonitorRequest it
    // receives via capture_video and answers everything else with empty or
    // placeholder streams.
    #[derive(Clone, Default)]
    struct ProbeRelay {
        // Shared so the test body can inspect requests after the server
        // task takes ownership of the relay.
        requests: Arc<Mutex<Vec<MonitorRequest>>>,
    }
    #[tonic::async_trait]
    impl Relay for ProbeRelay {
        type StreamKeyboardStream = Pin<
            Box<
                dyn futures::Stream<Item = Result<lesavka_common::lesavka::KeyboardReport, Status>>
                    + Send,
            >,
        >;
        type StreamMouseStream = Pin<
            Box<
                dyn futures::Stream<Item = Result<lesavka_common::lesavka::MouseReport, Status>>
                    + Send,
            >,
        >;
        type CaptureVideoStream =
            Pin<Box<dyn futures::Stream<Item = Result<VideoPacket, Status>> + Send>>;
        type CaptureAudioStream = Pin<
            Box<
                dyn futures::Stream<Item = Result<lesavka_common::lesavka::AudioPacket, Status>>
                    + Send,
            >,
        >;
        type StreamMicrophoneStream = Pin<
            Box<dyn futures::Stream<Item = Result<lesavka_common::lesavka::Empty, Status>> + Send>,
        >;
        type StreamCameraStream = Pin<
            Box<dyn futures::Stream<Item = Result<lesavka_common::lesavka::Empty, Status>> + Send>,
        >;
        async fn stream_keyboard(
            &self,
            _request: Request<tonic::Streaming<lesavka_common::lesavka::KeyboardReport>>,
        ) -> Result<Response<Self::StreamKeyboardStream>, Status> {
            Ok(Response::new(Box::pin(stream::empty())))
        }
        async fn stream_mouse(
            &self,
            _request: Request<tonic::Streaming<lesavka_common::lesavka::MouseReport>>,
        ) -> Result<Response<Self::StreamMouseStream>, Status> {
            Ok(Response::new(Box::pin(stream::empty())))
        }
        async fn capture_video(
            &self,
            request: Request<MonitorRequest>,
        ) -> Result<Response<Self::CaptureVideoStream>, Status> {
            // Record the request for later assertion; the returned stream
            // stays open but never yields a packet.
            self.requests.lock().unwrap().push(request.into_inner());
            let (_tx, rx) = mpsc::channel(1);
            Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
        }
        async fn capture_audio(
            &self,
            _request: Request<MonitorRequest>,
        ) -> Result<Response<Self::CaptureAudioStream>, Status> {
            Ok(Response::new(Box::pin(stream::empty())))
        }
        async fn stream_microphone(
            &self,
            _request: Request<tonic::Streaming<lesavka_common::lesavka::AudioPacket>>,
        ) -> Result<Response<Self::StreamMicrophoneStream>, Status> {
            Ok(Response::new(Box::pin(stream::empty())))
        }
        async fn stream_camera(
            &self,
            _request: Request<tonic::Streaming<VideoPacket>>,
        ) -> Result<Response<Self::StreamCameraStream>, Status> {
            Ok(Response::new(Box::pin(stream::empty())))
        }
        async fn paste_text(
            &self,
            _request: Request<lesavka_common::lesavka::PasteRequest>,
        ) -> Result<Response<lesavka_common::lesavka::PasteReply>, Status> {
            Ok(Response::new(lesavka_common::lesavka::PasteReply {
                ok: true,
                error: String::new(),
            }))
        }
        async fn reset_usb(
            &self,
            _request: Request<lesavka_common::lesavka::Empty>,
        ) -> Result<Response<lesavka_common::lesavka::ResetUsbReply>, Status> {
            Ok(Response::new(lesavka_common::lesavka::ResetUsbReply {
                ok: true,
            }))
        }
        async fn get_capture_power(
            &self,
            _request: Request<lesavka_common::lesavka::Empty>,
        ) -> Result<Response<lesavka_common::lesavka::CapturePowerState>, Status> {
            // Canned "healthy" power state for both power RPCs.
            Ok(Response::new(lesavka_common::lesavka::CapturePowerState {
                available: true,
                enabled: true,
                unit: "relay.service".to_string(),
                detail: "active/running".to_string(),
                active_leases: 1,
                mode: "auto".to_string(),
                detected_devices: 2,
            }))
        }
        async fn set_capture_power(
            &self,
            _request: Request<lesavka_common::lesavka::SetCapturePowerRequest>,
        ) -> Result<Response<lesavka_common::lesavka::CapturePowerState>, Status> {
            self.get_capture_power(Request::new(lesavka_common::lesavka::Empty {}))
                .await
        }
    }
    // The inline surface should request the lightweight defaults.
    #[test]
    fn inline_preview_profile_uses_lightweight_defaults() {
        let profile = PreviewSurface::Inline.profile();
        assert_eq!(profile.display_width, PREVIEW_WIDTH);
        assert_eq!(profile.display_height, PREVIEW_HEIGHT);
        assert_eq!(profile.requested_width, INLINE_PREVIEW_REQUEST_WIDTH);
        assert_eq!(profile.requested_height, INLINE_PREVIEW_REQUEST_HEIGHT);
        assert_eq!(profile.requested_fps, INLINE_PREVIEW_REQUEST_FPS);
        assert_eq!(profile.max_bitrate_kbit, INLINE_PREVIEW_MAX_KBIT);
    }
    // The breakout window gets a bigger display budget and full source quality.
    #[test]
    fn breakout_preview_profile_defaults_to_higher_quality() {
        let profile = PreviewSurface::Window.profile();
        assert_eq!(profile.display_width, 1280);
        assert_eq!(profile.display_height, 720);
        assert_eq!(profile.requested_width, DEFAULT_EYE_SOURCE_WIDTH);
        assert_eq!(profile.requested_height, DEFAULT_EYE_SOURCE_HEIGHT);
        assert_eq!(profile.requested_fps, 60);
        assert_eq!(profile.max_bitrate_kbit, 18_000);
    }
    // 1080p source scaled into the inline 960x540 budget.
    #[test]
    fn preview_render_size_fits_source_into_display_budget() {
        let profile = PreviewSurface::Inline.profile();
        assert_eq!(preview_render_size(profile, 1920, 1080), (960, 540));
    }
    // A source already at or below the budget must not be upscaled.
    #[test]
    fn preview_render_size_never_upscales_beyond_source_geometry() {
        let profile = PreviewSurface::Window.profile();
        assert_eq!(preview_render_size(profile, 1280, 720), (1280, 720));
    }
    // Valid requests pass through the sanitizer untouched.
    #[test]
    fn preview_request_sanitizer_keeps_requested_source_geometry() {
        let adapted = sanitize_preview_request(1920, 1080, 60, 18_000);
        assert_eq!(adapted, (1920, 1080, 60, 18_000));
    }
    // Zeroed requests are clamped to the minimum sane values.
    #[test]
    fn preview_request_sanitizer_clamps_invalid_values() {
        let adapted = sanitize_preview_request(0, 0, 0, 0);
        assert_eq!(adapted, (2, 2, 1, 800));
    }
    // End-to-end telemetry accounting over a short synthetic packet trace:
    // fps, jitter, loss (seq 3 is skipped), drops, queue depth, gap peaks,
    // server stats, and the encoder/decoder labels.
    #[test]
    fn preview_telemetry_reports_fps_jitter_loss_and_drop_metrics() {
        let mut telemetry = PreviewTelemetry::default();
        let start = Instant::now();
        telemetry.note_decoder("nvh264dec");
        telemetry.record_packet_at(start, 1, 30, 0, 1, 41, 38, 2, "x264enc", 215);
        telemetry.record_presented_frame_at(start + Duration::from_millis(5));
        telemetry.record_packet_at(
            start + Duration::from_millis(33),
            2,
            30,
            0,
            1,
            41,
            38,
            2,
            "x264enc",
            215,
        );
        telemetry.record_presented_frame_at(start + Duration::from_millis(37));
        telemetry.record_packet_at(
            start + Duration::from_millis(80),
            4,
            27,
            2,
            3,
            77,
            88,
            4,
            "x264enc",
            382,
        );
        telemetry.record_presented_frame_at(start + Duration::from_millis(90));
        let snapshot = telemetry.snapshot_at(start + Duration::from_millis(120));
        assert!(snapshot.receive_fps >= 12.0);
        assert!(snapshot.present_fps >= 12.0);
        assert_eq!(snapshot.server_fps, 27.0);
        assert!(snapshot.stream_spread_ms > 0.0);
        assert!(snapshot.packet_loss_pct > 0.0);
        assert_eq!(snapshot.dropped_frames, 2);
        assert_eq!(snapshot.queue_depth, 3);
        assert_eq!(snapshot.queue_depth_peak, 3);
        assert!(snapshot.packet_gap_peak_ms >= 47.0);
        assert!(snapshot.present_gap_peak_ms >= 53.0);
        assert_eq!(snapshot.server_source_gap_peak_ms, 77.0);
        assert_eq!(snapshot.server_send_gap_peak_ms, 88.0);
        assert_eq!(snapshot.server_queue_peak, 4);
        // 382 tenths of a percent -> 38.2%.
        assert_eq!(snapshot.server_process_cpu_pct, 38.2);
        assert_eq!(snapshot.server_encoder_label, "x264enc");
        assert_eq!(snapshot.decoder_label, "nvh264dec");
    }
    // Wire test: the default capture profile for eye 1 goes out as a
    // 1920x1080@60 / 18 Mbit request. #[serial] because these tests bind
    // real sockets and share process-wide state.
    #[test]
    #[serial]
    fn inline_preview_requests_selected_source_profile_on_wire() {
        let relay = ProbeRelay::default();
        let requests = relay.requests.clone();
        // The runtime must outlive the test body so the spawned server
        // keeps serving while the preview connects.
        let rt = tokio::runtime::Runtime::new().expect("runtime");
        let addr = rt.block_on(async move {
            // Bind to discover a free port, then release it for tonic.
            let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("bind");
            let addr = listener.local_addr().expect("addr");
            drop(listener);
            tokio::spawn(async move {
                let _ = tonic::transport::Server::builder()
                    .add_service(RelayServer::new(relay))
                    .serve(addr)
                    .await;
            });
            addr
        });
        let preview = LauncherPreview::new(format!("http://{addr}")).expect("preview");
        let state = LauncherState::default();
        let capture = state.capture_size_choice(1);
        preview.set_capture_profile(
            1,
            1,
            capture.width,
            capture.height,
            capture.fps,
            capture.max_bitrate_kbit,
        );
        preview.activate_surface_for_test(1, PreviewSurface::Inline);
        // Poll until the probe relay records the request or we time out.
        let deadline = Instant::now() + Duration::from_secs(5);
        while Instant::now() < deadline {
            if let Some(request) = requests.lock().unwrap().last().cloned() {
                assert_eq!(request.id, 1);
                assert_eq!(request.source_id, Some(1));
                assert_eq!(request.requested_width, 1920);
                assert_eq!(request.requested_height, 1080);
                assert_eq!(request.requested_fps, 60);
                assert_eq!(request.max_bitrate, 18_000);
                preview.shutdown_all();
                return;
            }
            std::thread::sleep(Duration::from_millis(50));
        }
        preview.shutdown_all();
        panic!("preview did not issue a capture request within timeout");
    }
    // Wire test: an explicit 1080p preset produces the same honest
    // 1920x1080@60 / 18 Mbit request.
    #[test]
    #[serial]
    fn inline_preview_requests_honest_source_profile_on_wire() {
        let relay = ProbeRelay::default();
        let requests = relay.requests.clone();
        let rt = tokio::runtime::Runtime::new().expect("runtime");
        let addr = rt.block_on(async move {
            let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("bind");
            let addr = listener.local_addr().expect("addr");
            drop(listener);
            tokio::spawn(async move {
                let _ = tonic::transport::Server::builder()
                    .add_service(RelayServer::new(relay))
                    .serve(addr)
                    .await;
            });
            addr
        });
        let preview = LauncherPreview::new(format!("http://{addr}")).expect("preview");
        let mut state = LauncherState::default();
        state.set_capture_size_preset(1, CaptureSizePreset::P1080);
        let capture = state.capture_size_choice(1);
        preview.set_capture_profile(
            1,
            1,
            capture.width,
            capture.height,
            capture.fps,
            capture.max_bitrate_kbit,
        );
        preview.activate_surface_for_test(1, PreviewSurface::Inline);
        let deadline = Instant::now() + Duration::from_secs(5);
        while Instant::now() < deadline {
            if let Some(request) = requests.lock().unwrap().last().cloned() {
                assert_eq!(request.id, 1);
                assert_eq!(request.source_id, Some(1));
                assert_eq!(request.requested_width, 1920);
                assert_eq!(request.requested_height, 1080);
                assert_eq!(request.requested_fps, 60);
                assert_eq!(request.max_bitrate, 18_000);
                preview.shutdown_all();
                return;
            }
            std::thread::sleep(Duration::from_millis(50));
        }
        preview.shutdown_all();
        panic!("preview did not issue a source capture request within timeout");
    }
    // Wire test: the 720p preset requests native 1280x720@60 / 12 Mbit.
    #[test]
    #[serial]
    fn inline_preview_requests_native_720p_source_mode_on_wire() {
        let relay = ProbeRelay::default();
        let requests = relay.requests.clone();
        let rt = tokio::runtime::Runtime::new().expect("runtime");
        let addr = rt.block_on(async move {
            let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("bind");
            let addr = listener.local_addr().expect("addr");
            drop(listener);
            tokio::spawn(async move {
                let _ = tonic::transport::Server::builder()
                    .add_service(RelayServer::new(relay))
                    .serve(addr)
                    .await;
            });
            addr
        });
        let preview = LauncherPreview::new(format!("http://{addr}")).expect("preview");
        let mut state = LauncherState::default();
        state.set_capture_size_preset(1, CaptureSizePreset::P720);
        let capture = state.capture_size_choice(1);
        preview.set_capture_profile(
            1,
            1,
            capture.width,
            capture.height,
            capture.fps,
            capture.max_bitrate_kbit,
        );
        preview.activate_surface_for_test(1, PreviewSurface::Inline);
        let deadline = Instant::now() + Duration::from_secs(5);
        while Instant::now() < deadline {
            if let Some(request) = requests.lock().unwrap().last().cloned() {
                assert_eq!(request.id, 1);
                assert_eq!(request.source_id, Some(1));
                assert_eq!(request.requested_width, 1280);
                assert_eq!(request.requested_height, 720);
                assert_eq!(request.requested_fps, 60);
                assert_eq!(request.max_bitrate, 12_000);
                preview.shutdown_all();
                return;
            }
            std::thread::sleep(Duration::from_millis(50));
        }
        preview.shutdown_all();
        panic!("preview did not issue a 720p source capture request within timeout");
    }
    // Wire test: legacy sub-720p presets (P480) fall forward to the 720p
    // request rather than asking for an unsupported low mode.
    #[test]
    #[serial]
    fn inline_preview_legacy_low_modes_fall_forward_to_720p_on_wire() {
        let relay = ProbeRelay::default();
        let requests = relay.requests.clone();
        let rt = tokio::runtime::Runtime::new().expect("runtime");
        let addr = rt.block_on(async move {
            let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("bind");
            let addr = listener.local_addr().expect("addr");
            drop(listener);
            tokio::spawn(async move {
                let _ = tonic::transport::Server::builder()
                    .add_service(RelayServer::new(relay))
                    .serve(addr)
                    .await;
            });
            addr
        });
        let preview = LauncherPreview::new(format!("http://{addr}")).expect("preview");
        let mut state = LauncherState::default();
        state.set_capture_size_preset(1, CaptureSizePreset::P480);
        let capture = state.capture_size_choice(1);
        preview.set_capture_profile(
            1,
            1,
            capture.width,
            capture.height,
            capture.fps,
            capture.max_bitrate_kbit,
        );
        preview.activate_surface_for_test(1, PreviewSurface::Inline);
        let deadline = Instant::now() + Duration::from_secs(5);
        while Instant::now() < deadline {
            if let Some(request) = requests.lock().unwrap().last().cloned() {
                assert_eq!(request.id, 1);
                assert_eq!(request.source_id, Some(1));
                assert_eq!(request.requested_width, 1280);
                assert_eq!(request.requested_height, 720);
                assert_eq!(request.requested_fps, 60);
                assert_eq!(request.max_bitrate, 12_000);
                preview.shutdown_all();
                return;
            }
            std::thread::sleep(Duration::from_millis(50));
        }
        preview.shutdown_all();
        panic!("preview did not issue a 720p fallback source capture request within timeout");
    }
    // Wire test: monitor 0 can mirror eye 1 — the request carries id 0 but
    // source_id Some(1), proving the two are distinct on the wire.
    #[test]
    #[serial]
    fn preview_can_request_other_eye_as_a_distinct_stream() {
        let relay = ProbeRelay::default();
        let requests = relay.requests.clone();
        let rt = tokio::runtime::Runtime::new().expect("runtime");
        let addr = rt.block_on(async move {
            let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("bind");
            let addr = listener.local_addr().expect("addr");
            drop(listener);
            tokio::spawn(async move {
                let _ = tonic::transport::Server::builder()
                    .add_service(RelayServer::new(relay))
                    .serve(addr)
                    .await;
            });
            addr
        });
        let preview = LauncherPreview::new(format!("http://{addr}")).expect("preview");
        preview.set_capture_profile(0, 1, 1920, 1080, 30, 12_000);
        preview.activate_surface_for_test(0, PreviewSurface::Inline);
        let deadline = Instant::now() + Duration::from_secs(5);
        while Instant::now() < deadline {
            if let Some(request) = requests.lock().unwrap().last().cloned() {
                assert_eq!(request.id, 0);
                assert_eq!(request.source_id, Some(1));
                assert_eq!(request.requested_width, 1920);
                assert_eq!(request.requested_height, 1080);
                preview.shutdown_all();
                return;
            }
            std::thread::sleep(Duration::from_millis(50));
        }
        preview.shutdown_all();
        panic!("preview did not issue a mirrored capture request within timeout");
    }
}