lesavka/client/src/launcher/preview.rs

631 lines
21 KiB
Rust

#[cfg(not(coverage))]
use anyhow::{Context, Result};
#[cfg(not(coverage))]
use gstreamer as gst;
#[cfg(not(coverage))]
use gstreamer::prelude::{Cast, ElementExt, GstBinExt};
#[cfg(not(coverage))]
use gstreamer_app as gst_app;
#[cfg(not(coverage))]
use gtk::{gdk, glib};
#[cfg(not(coverage))]
use lesavka_common::lesavka::{MonitorRequest, VideoPacket, relay_client::RelayClient};
#[cfg(not(coverage))]
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
#[cfg(not(coverage))]
use std::sync::{Arc, Mutex};
#[cfg(not(coverage))]
use std::time::Duration;
#[cfg(not(coverage))]
use tonic::{Request, transport::Channel};
#[cfg(not(coverage))]
use tracing::{debug, warn};
#[cfg(not(coverage))]
/// Default frame width for the inline preview surface; can be overridden via
/// the `LESAVKA_PREVIEW_WIDTH` environment variable (see `PreviewSurface::profile`).
const PREVIEW_WIDTH: i32 = 640;
#[cfg(not(coverage))]
/// Default frame height for the inline preview surface; can be overridden via
/// the `LESAVKA_PREVIEW_HEIGHT` environment variable (see `PreviewSurface::profile`).
const PREVIEW_HEIGHT: i32 = 360;
#[cfg(not(coverage))]
/// Status text used whenever a feed is idle (initial state, session inactive,
/// or no enabled bindings).
const PREVIEW_IDLE_STATUS: &str = "Connect relay to preview.";
#[cfg(not(coverage))]
/// Owner of all launcher preview feeds.
///
/// Holds one feed per monitor slot (two slots) for each preview surface, plus
/// the relay address that every feed shares.
pub struct LauncherPreview {
/// Relay address shared with every feed worker; replaced by `set_server_addr`.
server_addr: Arc<Mutex<String>>,
/// Feeds created with the `PreviewSurface::Inline` profile, indexed by monitor id.
inline_feeds: [PreviewFeed; 2],
/// Feeds created with the `PreviewSurface::Window` profile, indexed by monitor id.
window_feeds: [PreviewFeed; 2],
}
#[cfg(not(coverage))]
/// Handle for one picture widget attached to a preview feed.
///
/// Returned by `install_on_picture`; clones share the same flags and counter.
#[derive(Clone)]
pub struct PreviewBinding {
/// Whether the refresh timer should currently update the widget.
enabled: Arc<AtomicBool>,
/// Cleared by `close`; the refresh timer stops once it observes `false`.
alive: Arc<AtomicBool>,
/// The owning feed's count of enabled bindings; the feed worker only
/// streams while this is non-zero.
active_bindings: Arc<AtomicUsize>,
}
#[cfg(not(coverage))]
/// Which kind of preview widget a feed renders into; selects the size and
/// bitrate profile used for that feed.
#[derive(Clone, Copy, Debug)]
pub enum PreviewSurface {
/// Preview using the `LESAVKA_PREVIEW_*` settings (defaults 640x360, 2500 kbit).
Inline,
/// Preview using the `LESAVKA_BREAKOUT_PREVIEW_*` settings (defaults 1280x720, 8000 kbit).
Window,
}
#[cfg(not(coverage))]
/// Output size and bitrate cap for one preview feed.
#[derive(Clone, Copy, Debug)]
struct PreviewProfile {
/// Width of the scaled RGBA frames requested from the decode pipeline.
width: i32,
/// Height of the scaled RGBA frames requested from the decode pipeline.
height: i32,
/// Value sent as `MonitorRequest::max_bitrate` when opening the stream.
max_bitrate_kbit: u32,
}
#[cfg(not(coverage))]
impl PreviewSurface {
    /// Resolves the size/bitrate profile for this surface.
    ///
    /// Each value is read from its surface-specific environment variable and
    /// falls back to the built-in default when the variable is unset or
    /// unusable (see `preview_dimension` / `preview_bitrate`).
    fn profile(self) -> PreviewProfile {
        // (width var, width default, height var, height default, kbit var, kbit default)
        let (width_var, width_default, height_var, height_default, kbit_var, kbit_default) =
            match self {
                Self::Inline => (
                    "LESAVKA_PREVIEW_WIDTH",
                    PREVIEW_WIDTH,
                    "LESAVKA_PREVIEW_HEIGHT",
                    PREVIEW_HEIGHT,
                    "LESAVKA_PREVIEW_MAX_KBIT",
                    2_500,
                ),
                Self::Window => (
                    "LESAVKA_BREAKOUT_PREVIEW_WIDTH",
                    1280,
                    "LESAVKA_BREAKOUT_PREVIEW_HEIGHT",
                    720,
                    "LESAVKA_BREAKOUT_PREVIEW_MAX_KBIT",
                    8_000,
                ),
            };
        PreviewProfile {
            width: preview_dimension(width_var, width_default),
            height: preview_dimension(height_var, height_default),
            max_bitrate_kbit: preview_bitrate(kbit_var, kbit_default),
        }
    }
}
#[cfg(not(coverage))]
impl LauncherPreview {
    /// Initialises GStreamer and spawns the four preview feeds (two monitor
    /// slots for each of the inline and window surfaces).
    ///
    /// # Errors
    /// Returns an error when GStreamer initialisation or a feed spawn fails.
    pub fn new(server_addr: String) -> Result<Self> {
        gst::init().context("initialising preview gstreamer")?;
        let server_addr = Arc::new(Mutex::new(server_addr));
        // Every feed shares the same address slot so later updates reach all workers.
        let spawn_feed = |monitor_id: u32, surface: PreviewSurface| {
            PreviewFeed::spawn(Arc::clone(&server_addr), monitor_id, surface.profile())
        };
        let inline_feeds = [
            spawn_feed(0, PreviewSurface::Inline)?,
            spawn_feed(1, PreviewSurface::Inline)?,
        ];
        let window_feeds = [
            spawn_feed(0, PreviewSurface::Window)?,
            spawn_feed(1, PreviewSurface::Window)?,
        ];
        Ok(Self {
            server_addr,
            inline_feeds,
            window_feeds,
        })
    }

    /// Replaces the relay address used by the feed workers on their next
    /// connection attempt. A poisoned lock is ignored.
    pub fn set_server_addr(&self, server_addr: String) {
        let Ok(mut current) = self.server_addr.lock() else {
            return;
        };
        *current = server_addr;
    }

    /// Marks the relay session active or inactive on every feed.
    pub fn set_session_active(&self, active: bool) {
        self.inline_feeds
            .iter()
            .chain(self.window_feeds.iter())
            .for_each(|feed| feed.set_active(active));
    }

    /// Attaches `picture` and `status_label` to the feed for `monitor_id` on
    /// the given surface.
    ///
    /// Returns `None` when `monitor_id` is outside the available feed slots.
    pub fn install_on_picture(
        &self,
        monitor_id: usize,
        surface: PreviewSurface,
        picture: &gtk::Picture,
        status_label: &gtk::Label,
    ) -> Option<PreviewBinding> {
        let feed = self.feeds_for_surface(surface).get(monitor_id)?;
        Some(feed.install_on_picture(picture, status_label))
    }

    /// Selects the feed array that belongs to `surface`.
    fn feeds_for_surface(&self, surface: PreviewSurface) -> &[PreviewFeed; 2] {
        match surface {
            PreviewSurface::Window => &self.window_feeds,
            PreviewSurface::Inline => &self.inline_feeds,
        }
    }
}
#[cfg(not(coverage))]
impl PreviewBinding {
    /// Enables or disables widget updates for this binding and keeps the
    /// feed's enabled-binding counter in step.
    ///
    /// Calls that do not change the enabled state are no-ops. Calls made
    /// after [`close`](Self::close) are ignored: the refresh timer has already
    /// stopped and `close` will not run its bookkeeping again, so counting
    /// the binding as active would keep the feed streaming with no consumer.
    pub fn set_enabled(&self, enabled: bool) {
        if !self.alive.load(Ordering::Acquire) {
            return;
        }
        let was_enabled = self.enabled.swap(enabled, Ordering::AcqRel);
        match (was_enabled, enabled) {
            (false, true) => {
                self.active_bindings.fetch_add(1, Ordering::AcqRel);
            }
            (true, false) => {
                self.active_bindings.fetch_sub(1, Ordering::AcqRel);
            }
            // State unchanged: the counter already reflects it.
            _ => {}
        }
    }

    /// Permanently detaches this binding.
    ///
    /// The first call clears `alive` (which stops the refresh timer on its
    /// next tick) and releases the binding's slot in the counter if it was
    /// still enabled. Subsequent calls do nothing.
    pub fn close(&self) {
        if !self.alive.swap(false, Ordering::AcqRel) {
            return;
        }
        if self.enabled.swap(false, Ordering::AcqRel) {
            self.active_bindings.fetch_sub(1, Ordering::AcqRel);
        }
    }
}
#[cfg(not(coverage))]
/// UI-side handle for one preview feed; the matching worker thread holds
/// clones of the same three shared values.
struct PreviewFeed {
/// Latest frame and status text published by the worker, read by UI timers.
shared: Arc<Mutex<SharedPreviewState>>,
/// Whether the relay session is active (set through `set_active`).
session_active: Arc<AtomicBool>,
/// Number of enabled bindings attached to this feed.
active_bindings: Arc<AtomicUsize>,
}
#[cfg(not(coverage))]
/// State exchanged between a feed worker and the UI refresh timers.
struct SharedPreviewState {
/// Most recent decoded frame not yet taken by a refresh timer.
latest: Option<PreviewFrame>,
/// Human-readable status shown in the status label.
status: String,
/// Counter bumped when the status is updated; timers compare it with the
/// last value they displayed to decide whether to refresh the label.
generation: u64,
/// Set when the picture widget should be blanked on the next timer tick.
clear_picture: bool,
}
#[cfg(not(coverage))]
impl SharedPreviewState {
    /// Creates the idle state: no frame, idle status text, generation 1 and a
    /// pending request to blank the picture.
    fn new() -> Self {
        Self {
            latest: None,
            status: String::from(PREVIEW_IDLE_STATUS),
            generation: 1,
            clear_picture: true,
        }
    }

    /// Stores a new status text.
    ///
    /// When `clear_picture` is set the pending frame is dropped and the
    /// picture is flagged for blanking. The generation advances when the text
    /// differs from the previous one or when a clear was requested.
    fn set_status(&mut self, status: impl Into<String>, clear_picture: bool) {
        let next = status.into();
        let bump_generation = clear_picture || next != self.status;
        if clear_picture {
            self.latest = None;
            self.clear_picture = true;
        }
        self.status = next;
        if bump_generation {
            self.generation = self.generation.saturating_add(1);
        }
    }

    /// Publishes a decoded frame, cancels any pending blanking request and
    /// switches the status to "Live" (advancing the generation only on that
    /// transition).
    fn push_frame(&mut self, frame: PreviewFrame) {
        self.latest = Some(frame);
        self.clear_picture = false;
        if self.status == "Live" {
            return;
        }
        self.status = "Live".to_string();
        self.generation = self.generation.saturating_add(1);
    }
}
#[cfg(not(coverage))]
impl PreviewFeed {
    /// Creates the shared state for one feed and starts its worker thread,
    /// which runs `run_preview_feed` and logs a warning if it returns an error.
    fn spawn(
        server_addr: Arc<Mutex<String>>,
        monitor_id: u32,
        profile: PreviewProfile,
    ) -> Result<Self> {
        let feed = Self {
            shared: Arc::new(Mutex::new(SharedPreviewState::new())),
            session_active: Arc::new(AtomicBool::new(false)),
            active_bindings: Arc::new(AtomicUsize::new(0)),
        };
        // The worker gets its own handles to the same shared values.
        let worker_shared = Arc::clone(&feed.shared);
        let worker_session = Arc::clone(&feed.session_active);
        let worker_bindings = Arc::clone(&feed.active_bindings);
        std::thread::spawn(move || {
            let outcome = run_preview_feed(
                server_addr,
                monitor_id,
                profile,
                worker_session,
                worker_bindings,
                worker_shared,
            );
            if let Err(err) = outcome {
                warn!(monitor_id, ?err, "launcher preview feed exited");
            }
        });
        Ok(feed)
    }

    /// Records whether the relay session is active; deactivating also resets
    /// the status to the idle text and requests that the picture be cleared.
    fn set_active(&self, active: bool) {
        self.session_active.store(active, Ordering::Relaxed);
        if active {
            return;
        }
        self.replace_status(PREVIEW_IDLE_STATUS, true);
    }

    /// Updates the shared status text; does nothing if the lock is poisoned.
    fn replace_status(&self, status: impl Into<String>, clear_picture: bool) {
        let Ok(mut state) = self.shared.lock() else {
            return;
        };
        state.set_status(status, clear_picture);
    }

    /// Attaches a picture and status label to this feed.
    ///
    /// Registers a 120 ms GLib timer on the local main context that copies
    /// the newest frame and status into the widgets, and returns the binding
    /// that controls it. The binding starts enabled and is counted in
    /// `active_bindings` immediately.
    fn install_on_picture(
        &self,
        picture: &gtk::Picture,
        status_label: &gtk::Label,
    ) -> PreviewBinding {
        let binding = PreviewBinding {
            enabled: Arc::new(AtomicBool::new(true)),
            alive: Arc::new(AtomicBool::new(true)),
            active_bindings: Arc::clone(&self.active_bindings),
        };
        binding.active_bindings.fetch_add(1, Ordering::AcqRel);

        let tick_enabled = Arc::clone(&binding.enabled);
        let tick_alive = Arc::clone(&binding.alive);
        let shared = Arc::clone(&self.shared);
        let picture = picture.clone();
        let status_label = status_label.clone();
        // Generation 0 is never produced by the shared state, so the first
        // tick always writes the label.
        let mut shown_generation = 0_u64;

        glib::timeout_add_local(Duration::from_millis(120), move || {
            if !tick_alive.load(Ordering::Relaxed) {
                return glib::ControlFlow::Break;
            }
            if !tick_enabled.load(Ordering::Relaxed) {
                return glib::ControlFlow::Continue;
            }

            // Snapshot everything under the lock, then release it before
            // touching any widget.
            let Ok(mut state) = shared.lock() else {
                return glib::ControlFlow::Continue;
            };
            let frame = state.latest.take();
            let status = state.status.clone();
            let generation = state.generation;
            let clear_picture = std::mem::replace(&mut state.clear_picture, false);
            drop(state);

            if clear_picture {
                picture.set_paintable(Option::<&gdk::Paintable>::None);
            }
            if let Some(frame) = frame {
                let bytes = glib::Bytes::from_owned(frame.rgba);
                let texture = gdk::MemoryTexture::new(
                    frame.width,
                    frame.height,
                    gdk::MemoryFormat::R8g8b8a8,
                    &bytes,
                    frame.stride,
                );
                picture.set_paintable(Some(&texture));
            }
            if generation != shown_generation {
                status_label.set_text(&status);
                shown_generation = generation;
            }
            glib::ControlFlow::Continue
        });

        binding
    }
}
#[cfg(not(coverage))]
/// One decoded frame copied out of the preview pipeline's appsink.
struct PreviewFrame {
/// Frame width taken from the sample caps.
width: i32,
/// Frame height taken from the sample caps.
height: i32,
/// Bytes per row, derived as buffer length divided by height.
stride: usize,
/// Pixel data copied from the sample buffer (the pipeline caps request RGBA).
rgba: Vec<u8>,
}
#[cfg(not(coverage))]
/// Worker body for one preview feed.
///
/// Builds and starts the decode pipeline, starts a helper thread that moves
/// decoded frames into `shared`, then runs a single-threaded tokio runtime
/// whose loop connects to the relay at the current `server_addr`, requests
/// video for `monitor_id`, and pushes received packets into the pipeline.
/// The loop only streams while `session_active` is set and at least one
/// binding is enabled; otherwise it idles and publishes the idle status.
///
/// Returns an error only if the pipeline cannot be started or the tokio
/// runtime cannot be built. The connection loop itself never terminates.
fn run_preview_feed(
server_addr: Arc<Mutex<String>>,
monitor_id: u32,
profile: PreviewProfile,
session_active: Arc<AtomicBool>,
active_bindings: Arc<AtomicUsize>,
shared: Arc<Mutex<SharedPreviewState>>,
) -> Result<()> {
let (pipeline, appsrc, appsink) = build_preview_pipeline(profile)?;
pipeline
.set_state(gst::State::Playing)
.context("starting launcher preview pipeline")?;
// Frame pump: pull decoded samples (250 ms timeout per attempt) and publish
// each one as the latest frame.
// NOTE(review): this thread has no exit condition and is never joined; if
// the function returns early (e.g. runtime build failure) it keeps running.
{
let shared = Arc::clone(&shared);
let appsink = appsink.clone();
std::thread::spawn(move || {
loop {
if let Some(sample) = appsink.try_pull_sample(gst::ClockTime::from_mseconds(250)) {
if let Some(frame) = sample_to_frame(&sample) {
if let Ok(mut slot) = shared.lock() {
slot.push_frame(frame);
}
}
}
}
});
}
// The network side runs on a current-thread runtime owned by this worker.
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.context("building preview tokio runtime")?;
let _ = rt.block_on(async move {
// `was_active` tracks the idle -> active edge so the "waking" status and
// its short pause happen once per activation.
let mut was_active = false;
let mut retry_delay = Duration::from_millis(750);
loop {
let active_now = session_active.load(Ordering::Relaxed)
&& active_bindings.load(Ordering::Relaxed) > 0;
if !active_now {
// Idle: reset the backoff, publish the idle status and poll again in 150 ms.
was_active = false;
retry_delay = Duration::from_millis(750);
set_shared_status(&shared, PREVIEW_IDLE_STATUS, true);
tokio::time::sleep(Duration::from_millis(150)).await;
continue;
}
if !was_active {
was_active = true;
set_shared_status(&shared, "Waking relay preview...", true);
tokio::time::sleep(Duration::from_millis(350)).await;
}
set_shared_status(&shared, "Connecting relay preview...", true);
// Re-read the address on every attempt so `set_server_addr` takes effect
// on the next reconnect.
let current_addr = match server_addr.lock() {
Ok(value) => value.clone(),
Err(_) => {
set_shared_status(&shared, "Preview address is unavailable.", true);
tokio::time::sleep(Duration::from_millis(750)).await;
continue;
}
};
let channel = match Channel::from_shared(current_addr.clone()) {
Ok(endpoint) => match endpoint.tcp_nodelay(true).connect().await {
Ok(channel) => channel,
Err(err) => {
warn!(monitor_id, ?err, "launcher preview connect failed");
set_shared_status(
&shared,
format!("Preview host is unavailable: {err}"),
true,
);
tokio::time::sleep(retry_delay).await;
continue;
}
},
Err(err) => {
warn!(monitor_id, ?err, "launcher preview endpoint invalid");
set_shared_status(&shared, format!("Preview address is invalid: {err}"), true);
tokio::time::sleep(retry_delay).await;
continue;
}
};
let mut cli = RelayClient::new(channel);
let req = MonitorRequest {
id: monitor_id,
max_bitrate: profile.max_bitrate_kbit,
};
match cli.capture_video(Request::new(req)).await {
Ok(mut stream) => {
// Stream opened: start again from the base retry delay.
retry_delay = Duration::from_millis(750);
debug!(monitor_id, "launcher preview connected");
set_shared_status(&shared, "Waiting for stream...", true);
loop {
// Leave the stream as soon as the session or all bindings go away.
if !session_active.load(Ordering::Relaxed)
|| active_bindings.load(Ordering::Relaxed) == 0
{
break;
}
// The 300 ms timeout bounds how long a quiet stream can delay the
// activity check above.
match tokio::time::timeout(
Duration::from_millis(300),
stream.get_mut().message(),
)
.await
{
Ok(Ok(Some(pkt))) => push_preview_packet(&appsrc, pkt),
Ok(Ok(None)) => {
set_shared_status(&shared, "Preview stream ended.", true);
retry_delay = Duration::from_millis(1_500);
break;
}
Ok(Err(err)) => {
warn!(monitor_id, ?err, "launcher preview stream error");
set_shared_status(
&shared,
format!("Preview stream error: {err}"),
true,
);
retry_delay = preview_retry_delay(retry_delay, Some(&err.to_string()));
break;
}
// Timed out waiting for a packet: loop to re-check activity.
Err(_) => continue,
}
}
}
Err(err) => {
// Startup-type failures are logged at debug level with a dedicated
// status; every other RPC failure is logged as a warning. Both back off.
if preview_startup_condition(&err) {
debug!(
monitor_id,
?err,
"launcher preview waiting for capture pipeline"
);
set_shared_status(&shared, "Waiting for capture pipeline...", true);
retry_delay = preview_retry_delay(retry_delay, Some(err.message()));
} else {
warn!(monitor_id, ?err, "launcher preview rpc failed");
set_shared_status(&shared, format!("Preview RPC failed: {err}"), true);
retry_delay = preview_retry_delay(retry_delay, Some(err.message()));
}
}
}
// Pause before the next connection attempt (also taken after a stream
// that ended because the feed went inactive).
tokio::time::sleep(retry_delay).await;
}
// Never reached; present only to give the async block a concrete result type.
#[allow(unreachable_code)]
Ok::<(), anyhow::Error>(())
});
// NOTE(review): the result of `block_on` is discarded and the pipeline is
// never set back to Null here.
Ok(())
}
#[cfg(not(coverage))]
/// Reports whether an RPC failure looks like the capture pipeline still
/// starting up: the status code is `Internal` and the message contains one of
/// the known startup phrases (compared case-insensitively for ASCII).
fn preview_startup_condition(err: &tonic::Status) -> bool {
    const STARTUP_MARKERS: [&str; 5] = [
        "starting video pipeline",
        "failed to change its state",
        "resource busy",
        "device or resource busy",
        "no signal",
    ];
    if err.code() != tonic::Code::Internal {
        return false;
    }
    let lowered = err.message().to_ascii_lowercase();
    STARTUP_MARKERS
        .iter()
        .any(|marker| lowered.contains(*marker))
}
#[cfg(not(coverage))]
/// Computes the next reconnect delay.
///
/// Delays below 1.5 s jump to 1.5 s; otherwise the delay doubles. When
/// `message` mentions a resource-exhaustion or busy-device condition the
/// result is at least 6 s. The result never exceeds 30 s.
fn preview_retry_delay(current: Duration, message: Option<&str>) -> Duration {
    const BASE_MS: u64 = 1_500;
    const BUSY_FLOOR_MS: u64 = 6_000;
    const CEILING_MS: u64 = 30_000;
    const BUSY_MARKERS: [&str; 4] = [
        "too many open files",
        "failed to change its state",
        "resource busy",
        "device or resource busy",
    ];

    let current_ms = current.as_millis() as u64;
    let backoff_ms = if current_ms >= BASE_MS {
        current_ms.saturating_mul(2)
    } else {
        BASE_MS
    };

    let looks_busy = message.map_or(false, |text| {
        let lowered = text.to_ascii_lowercase();
        BUSY_MARKERS.iter().any(|marker| lowered.contains(*marker))
    });
    let floor_ms = if looks_busy { BUSY_FLOOR_MS } else { 0 };

    Duration::from_millis(backoff_ms.max(floor_ms).min(CEILING_MS))
}
#[cfg(not(coverage))]
/// Updates the status text in the shared preview state, optionally requesting
/// that the picture be cleared. Does nothing if the lock is poisoned.
fn set_shared_status(
    shared: &Arc<Mutex<SharedPreviewState>>,
    status: impl Into<String>,
    clear: bool,
) {
    let Ok(mut state) = shared.lock() else {
        return;
    };
    state.set_status(status, clear);
}
#[cfg(not(coverage))]
fn build_preview_pipeline(
profile: PreviewProfile,
) -> Result<(gst::Pipeline, gst_app::AppSrc, gst_app::AppSink)> {
let desc = format!(
"appsrc name=src is-live=true format=time do-timestamp=true block=false ! \
queue max-size-buffers=6 max-size-time=0 max-size-bytes=0 leaky=downstream ! \
h264parse disable-passthrough=true ! avdec_h264 ! videoconvert ! videoscale ! \
video/x-raw,format=RGBA,width={},height={},pixel-aspect-ratio=1/1 ! \
appsink name=sink emit-signals=false sync=false max-buffers=1 drop=true"
,
profile.width,
profile.height
);
let pipeline = gst::parse::launch(&desc)?
.downcast::<gst::Pipeline>()
.expect("preview pipeline");
let appsrc = pipeline
.by_name("src")
.context("missing preview appsrc")?
.downcast::<gst_app::AppSrc>()
.expect("preview appsrc");
appsrc.set_caps(Some(
&gst::Caps::builder("video/x-h264")
.field("stream-format", &"byte-stream")
.field("alignment", &"au")
.build(),
));
appsrc.set_format(gst::Format::Time);
let appsink = pipeline
.by_name("sink")
.context("missing preview appsink")?
.downcast::<gst_app::AppSink>()
.expect("preview appsink");
appsink.set_caps(Some(
&gst::Caps::builder("video/x-raw")
.field("format", &"RGBA")
.field("width", &profile.width)
.field("height", &profile.height)
.build(),
));
Ok((pipeline, appsrc, appsink))
}
#[cfg(not(coverage))]
/// Wraps a relay video packet in a GStreamer buffer stamped with the packet's
/// PTS (interpreted as microseconds) and pushes it into `appsrc`. A failed
/// push is ignored.
fn push_preview_packet(appsrc: &gst_app::AppSrc, pkt: VideoPacket) {
    let pts = gst::ClockTime::from_useconds(pkt.pts);
    let mut buffer = gst::Buffer::from_slice(pkt.data);
    if let Some(writable) = buffer.get_mut() {
        writable.set_pts(Some(pts));
    }
    let _ = appsrc.push_buffer(buffer);
}
#[cfg(not(coverage))]
/// Copies a decoded sample into an owned [`PreviewFrame`].
///
/// Width and height come from the sample caps; the stride is derived as
/// buffer length divided by height.
///
/// Returns `None` when the sample has no caps, no buffer, or the buffer cannot
/// be mapped, and also when the geometry is unusable: non-positive width or
/// height, or fewer than `width * 4` bytes per row. Such frames are dropped
/// here because the UI passes them straight to `gdk::MemoryTexture::new`,
/// which expects positive dimensions and a stride covering a full RGBA row.
fn sample_to_frame(sample: &gst::Sample) -> Option<PreviewFrame> {
    let caps = sample.caps()?;
    let structure = caps.structure(0)?;
    let width = structure.get::<i32>("width").ok()?;
    let height = structure.get::<i32>("height").ok()?;

    // Negative values fail the conversion; zero is filtered out explicitly.
    let rows = usize::try_from(height).ok().filter(|rows| *rows > 0)?;
    let min_stride = usize::try_from(width)
        .ok()
        .filter(|columns| *columns > 0)?
        .checked_mul(4)?;

    let buffer = sample.buffer()?;
    let map = buffer.map_readable().ok()?;
    let rgba = map.as_slice().to_vec();
    let stride = rgba.len() / rows;
    if stride < min_stride {
        // Buffer too small for the advertised geometry.
        return None;
    }
    Some(PreviewFrame {
        width,
        height,
        stride,
        rgba,
    })
}
#[cfg(not(coverage))]
/// Reads a bitrate from the environment variable `var`, falling back to
/// `default` when the variable is unset, not valid Unicode, or not a `u32`.
fn preview_bitrate(var: &str, default: u32) -> u32 {
    match std::env::var(var) {
        Ok(raw) => raw.parse::<u32>().unwrap_or(default),
        Err(_) => default,
    }
}
#[cfg(not(coverage))]
/// Reads a frame dimension from the environment variable `var`, falling back
/// to `default` when the variable is unset, not an `i32`, or not strictly
/// positive.
fn preview_dimension(var: &str, default: i32) -> i32 {
    let parsed = std::env::var(var)
        .ok()
        .and_then(|raw| raw.parse::<i32>().ok());
    match parsed {
        Some(value) if value > 0 => value,
        _ => default,
    }
}
#[cfg(test)]
mod tests {
    use super::{PREVIEW_HEIGHT, PREVIEW_WIDTH, PreviewSurface};

    /// The inline surface falls back to the module-level defaults.
    /// Relies on the `LESAVKA_PREVIEW_*` variables not being set in the test
    /// environment, since `profile` reads them.
    #[test]
    fn inline_preview_profile_uses_existing_defaults() {
        let inline = PreviewSurface::Inline.profile();
        assert_eq!(
            (inline.width, inline.height, inline.max_bitrate_kbit),
            (PREVIEW_WIDTH, PREVIEW_HEIGHT, 2_500)
        );
    }

    /// The breakout window surface defaults to 1280x720 at 8000 kbit.
    /// Relies on the `LESAVKA_BREAKOUT_PREVIEW_*` variables not being set in
    /// the test environment, since `profile` reads them.
    #[test]
    fn breakout_preview_profile_defaults_to_higher_quality() {
        let window = PreviewSurface::Window.profile();
        assert_eq!(
            (window.width, window.height, window.max_bitrate_kbit),
            (1280, 720, 8_000)
        );
    }
}