#[cfg(not(coverage))] use anyhow::{Context, Result}; #[cfg(not(coverage))] use gstreamer as gst; #[cfg(not(coverage))] use gstreamer::prelude::{Cast, ElementExt, GstBinExt}; #[cfg(not(coverage))] use gstreamer_app as gst_app; #[cfg(not(coverage))] use gtk::{gdk, glib}; #[cfg(not(coverage))] use lesavka_common::lesavka::{MonitorRequest, VideoPacket, relay_client::RelayClient}; #[cfg(not(coverage))] use std::sync::atomic::{AtomicBool, Ordering}; #[cfg(not(coverage))] use std::sync::{Arc, Mutex}; #[cfg(not(coverage))] use std::time::Duration; #[cfg(not(coverage))] use tonic::{Request, transport::Channel}; #[cfg(not(coverage))] use tracing::{debug, warn}; #[cfg(not(coverage))] const PREVIEW_WIDTH: i32 = 640; #[cfg(not(coverage))] const PREVIEW_HEIGHT: i32 = 360; #[cfg(not(coverage))] pub struct LauncherPreview { feeds: [PreviewFeed; 2], } #[cfg(not(coverage))] #[derive(Clone)] pub struct PreviewBinding { enabled: Arc, alive: Arc, } #[cfg(not(coverage))] impl LauncherPreview { pub fn new(server_addr: String) -> Result { gst::init().context("initialising preview gstreamer")?; Ok(Self { feeds: [ PreviewFeed::spawn(server_addr.clone(), 0)?, PreviewFeed::spawn(server_addr, 1)?, ], }) } pub fn install_on_picture( &self, monitor_id: usize, picture: >k::Picture, status_label: >k::Label, ) -> Option { self.feeds .get(monitor_id) .map(|feed| feed.install_on_picture(picture, status_label)) } } #[cfg(not(coverage))] impl PreviewBinding { pub fn set_enabled(&self, enabled: bool) { self.enabled.store(enabled, Ordering::Relaxed); } pub fn close(&self) { self.alive.store(false, Ordering::Relaxed); } } #[cfg(not(coverage))] struct PreviewFeed { latest: Arc>>, } #[cfg(not(coverage))] impl PreviewFeed { fn spawn(server_addr: String, monitor_id: u32) -> Result { let latest = Arc::new(Mutex::new(None)); let store = Arc::clone(&latest); std::thread::spawn(move || { if let Err(err) = run_preview_feed(server_addr, monitor_id, store) { warn!(monitor_id, ?err, "launcher preview feed exited"); } }); Ok(Self { latest }) } fn install_on_picture( &self, picture: >k::Picture, status_label: >k::Label, ) -> PreviewBinding { let picture = picture.clone(); let status_label = status_label.clone(); let latest = Arc::clone(&self.latest); let enabled = Arc::new(AtomicBool::new(true)); let alive = Arc::new(AtomicBool::new(true)); let enabled_flag = Arc::clone(&enabled); let alive_flag = Arc::clone(&alive); glib::timeout_add_local(Duration::from_millis(120), move || { if !alive_flag.load(Ordering::Relaxed) { return glib::ControlFlow::Break; } if !enabled_flag.load(Ordering::Relaxed) { return glib::ControlFlow::Continue; } let next = latest.lock().ok().and_then(|mut slot| slot.take()); if let Some(frame) = next { let bytes = glib::Bytes::from_owned(frame.rgba); let texture = gdk::MemoryTexture::new( frame.width, frame.height, gdk::MemoryFormat::R8g8b8a8, &bytes, frame.stride, ); picture.set_paintable(Some(&texture)); status_label.set_text("Live"); } glib::ControlFlow::Continue }); PreviewBinding { enabled, alive } } } #[cfg(not(coverage))] struct PreviewFrame { width: i32, height: i32, stride: usize, rgba: Vec, } #[cfg(not(coverage))] fn run_preview_feed( server_addr: String, monitor_id: u32, latest: Arc>>, ) -> Result<()> { let (pipeline, appsrc, appsink) = build_preview_pipeline()?; pipeline .set_state(gst::State::Playing) .context("starting launcher preview pipeline")?; { let latest = Arc::clone(&latest); let appsink = appsink.clone(); std::thread::spawn(move || { loop { if let Some(sample) = appsink.try_pull_sample(gst::ClockTime::from_mseconds(250)) { if let Some(frame) = sample_to_frame(&sample) { if let Ok(mut slot) = latest.lock() { *slot = Some(frame); } } } } }); } let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .context("building preview tokio runtime")?; let _ = rt.block_on(async move { loop { let channel = match Channel::from_shared(server_addr.clone()) { Ok(endpoint) => match endpoint.tcp_nodelay(true).connect().await { Ok(channel) => channel, Err(err) => { warn!(monitor_id, ?err, "launcher preview connect failed"); tokio::time::sleep(Duration::from_millis(750)).await; continue; } }, Err(err) => { warn!(monitor_id, ?err, "launcher preview endpoint invalid"); tokio::time::sleep(Duration::from_millis(750)).await; continue; } }; let mut cli = RelayClient::new(channel); let req = MonitorRequest { id: monitor_id, max_bitrate: preview_max_bitrate(), }; match cli.capture_video(Request::new(req)).await { Ok(mut stream) => { debug!(monitor_id, "launcher preview connected"); while let Some(item) = stream.get_mut().message().await.transpose() { match item { Ok(pkt) => push_preview_packet(&appsrc, pkt), Err(err) => { warn!(monitor_id, ?err, "launcher preview stream error"); break; } } } } Err(err) => warn!(monitor_id, ?err, "launcher preview rpc failed"), } tokio::time::sleep(Duration::from_millis(750)).await; } #[allow(unreachable_code)] Ok::<(), anyhow::Error>(()) }); Ok(()) } #[cfg(not(coverage))] fn build_preview_pipeline() -> Result<(gst::Pipeline, gst_app::AppSrc, gst_app::AppSink)> { let desc = format!( "appsrc name=src is-live=true format=time do-timestamp=true block=false ! \ queue max-size-buffers=6 max-size-time=0 max-size-bytes=0 leaky=downstream ! \ h264parse disable-passthrough=true ! avdec_h264 ! videoconvert ! videoscale ! \ video/x-raw,format=RGBA,width={PREVIEW_WIDTH},height={PREVIEW_HEIGHT},pixel-aspect-ratio=1/1 ! \ appsink name=sink emit-signals=false sync=false max-buffers=1 drop=true" ); let pipeline = gst::parse::launch(&desc)? .downcast::() .expect("preview pipeline"); let appsrc = pipeline .by_name("src") .context("missing preview appsrc")? .downcast::() .expect("preview appsrc"); appsrc.set_caps(Some( &gst::Caps::builder("video/x-h264") .field("stream-format", &"byte-stream") .field("alignment", &"au") .build(), )); appsrc.set_format(gst::Format::Time); let appsink = pipeline .by_name("sink") .context("missing preview appsink")? .downcast::() .expect("preview appsink"); appsink.set_caps(Some( &gst::Caps::builder("video/x-raw") .field("format", &"RGBA") .field("width", &PREVIEW_WIDTH) .field("height", &PREVIEW_HEIGHT) .build(), )); Ok((pipeline, appsrc, appsink)) } #[cfg(not(coverage))] fn push_preview_packet(appsrc: &gst_app::AppSrc, pkt: VideoPacket) { let mut buf = gst::Buffer::from_slice(pkt.data); if let Some(buf) = buf.get_mut() { buf.set_pts(Some(gst::ClockTime::from_useconds(pkt.pts))); } let _ = appsrc.push_buffer(buf); } #[cfg(not(coverage))] fn sample_to_frame(sample: &gst::Sample) -> Option { let caps = sample.caps()?; let structure = caps.structure(0)?; let width = structure.get::("width").ok()?; let height = structure.get::("height").ok()?; let buffer = sample.buffer()?; let map = buffer.map_readable().ok()?; let rgba = map.as_slice().to_vec(); let stride = rgba.len() / height.max(1) as usize; Some(PreviewFrame { width, height, stride, rgba, }) } #[cfg(not(coverage))] fn preview_max_bitrate() -> u32 { std::env::var("LESAVKA_PREVIEW_MAX_KBIT") .ok() .and_then(|raw| raw.parse::().ok()) .unwrap_or(2_500) }