// client/src/output/video.rs use std::process::Command; use std::time::Duration; use std::thread; use anyhow::Context; use gstreamer as gst; use gstreamer_app as gst_app; use gstreamer_video::{prelude::*, VideoOverlay}; use gst::prelude::*; use lesavka_common::lesavka::VideoPacket; use tracing::{error, info, warn, debug}; use gstreamer_video as gst_video; use gstreamer_video::glib::Type; use crate::output::{display, layout}; /* ---------- pipeline ---------------------------------------------------- * ┌────────────┐ H.264/AU ┌─────────┐ Decoded ┌─────────────┐ * │ AppSrc │────────────► decodebin ├──────────► glimagesink │ * └────────────┘ (autoplug) (overlay) | * ----------------------------------------------------------------------*/ const PIPELINE_DESC: &str = concat!( "appsrc name=src is-live=true format=time do-timestamp=true block=false ! ", "queue leaky=downstream ! ", "capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=au ! ", "h264parse disable-passthrough=true ! decodebin ! videoconvert ! ", "glimagesink name=sink sync=false" ); pub struct MonitorWindow { _pipeline: gst::Pipeline, src: gst_app::AppSrc, } impl MonitorWindow { pub fn new(id: u32) -> anyhow::Result { gst::init().context("initialising GStreamer")?; // --- Build pipeline --------------------------------------------------- let pipeline: gst::Pipeline = gst::parse::launch(PIPELINE_DESC)? .downcast::() .expect("not a pipeline"); /* -------- placement maths -------------------------------------- */ let monitors = display::enumerate_monitors(); let stream_defs = &[("eye-0", 1920, 1080), ("eye-1", 1920, 1080)]; let rects = layout::assign_rectangles(&monitors, stream_defs); // --- AppSrc------------------------------------------------------------ let src: gst_app::AppSrc = pipeline .by_name("src") .unwrap() .downcast::() .unwrap(); src.set_caps(Some(&gst::Caps::builder("video/x-h264") .field("stream-format", &"byte-stream") .field("alignment", &"au") .build())); src.set_format(gst::Format::Time); /* -------- move/resize overlay ---------------------------------- */ if let Some(sink_elem) = pipeline.by_name("sink") { if let Ok(overlay) = sink_elem.dynamic_cast::() { if let Some(r) = rects.get(id as usize) { // 1. Tell glimagesink how to crop the texture in its own window overlay.set_render_rectangle(r.x, r.y, r.w, r.h); debug!( "🔲 eye-{id} → render_rectangle({}, {}, {}, {})", r.x, r.y, r.w, r.h ); // 2. **Compositor-level** placement (Wayland only) if std::env::var_os("WAYLAND_DISPLAY").is_some() { use std::process::{Command, ExitStatus}; use std::sync::Arc; use std::thread; use std::time::Duration; // A small helper struct so the two branches return the same type struct Placer { name: &'static str, run: Arc std::io::Result + Send + Sync>, } let placer = if Command::new("swaymsg").arg("-t").arg("get_tree") .output().is_ok() { Placer { name: "swaymsg", run: Arc::new(|cmd| Command::new("swaymsg").arg(cmd).status()), } } else if Command::new("hyprctl").arg("version").output().is_ok() { Placer { name: "hyprctl", run: Arc::new(|cmd| { Command::new("hyprctl") .args(["dispatch", "exec", cmd]) .status() }), } } else { Placer { name: "noop", run: Arc::new(|_| { Err(std::io::Error::new( std::io::ErrorKind::Other, "no swaymsg/hyprctl found", )) }), } }; if placer.name != "noop" { // Criteria string that works for i3-/sway-compatible IPC let criteria = format!( r#"[title="^Lesavka-eye-{id}$"] \ resize set {w} {h}; \ move absolute position {x} {y}"#, w = r.w, h = r.h, x = r.x, y = r.y, ); // Retry in a detached thread - avoids blocking GStreamer let placename = placer.name; let runner = placer.run.clone(); thread::spawn(move || { for attempt in 1..=10 { thread::sleep(Duration::from_millis(300)); match runner(&criteria) { Ok(st) if st.success() => { tracing::info!( "✅ {placename}: placed eye-{id} (attempt {attempt})" ); break; } _ => tracing::debug!( "⌛ {placename}: eye-{id} not mapped yet (attempt {attempt})" ), } } }); } } } } } pipeline.set_state(gst::State::Playing)?; Ok(Self { _pipeline: pipeline, src }) } /// Feed one access-unit to the decoder. pub fn push_packet(&self, pkt: VideoPacket) { static CNT : std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if n % 150 == 0 || n < 10 { debug!(eye = pkt.id, bytes = pkt.data.len(), pts = pkt.pts, "⬇️ received video AU"); } let mut buf = gst::Buffer::from_slice(pkt.data); buf.get_mut() .unwrap() .set_pts(Some(gst::ClockTime::from_useconds(pkt.pts))); let _ = self.src.push_buffer(buf); // ignore Eos/flushing } }