// client/src/output/video.rs use anyhow::Context; use gstreamer as gst; use gstreamer::prelude::{Cast, ElementExt, GstBinExt, ObjectExt}; use gstreamer_app as gst_app; use gstreamer_video::VideoOverlay; use gstreamer_video::prelude::VideoOverlayExt; use lesavka_common::lesavka::VideoPacket; use std::process::Command; use tracing::{debug, error, info, warn}; use crate::output::{display, layout}; pub struct MonitorWindow { _pipeline: gst::Pipeline, src: gst_app::AppSrc, } impl MonitorWindow { pub fn new(id: u32) -> anyhow::Result { gst::init().context("initialising GStreamer")?; // --- Build pipeline --------------------------------------------------- let sink = if std::env::var("GDK_BACKEND") .map(|v| v.contains("x11")) .unwrap_or_else(|_| std::env::var_os("DISPLAY").is_some()) { "ximagesink name=sink sync=false" } else { "glimagesink name=sink sync=false" }; let desc = format!( "appsrc name=src is-live=true format=time do-timestamp=true block=false ! \ queue leaky=downstream ! \ capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=au ! \ h264parse disable-passthrough=true ! decodebin ! videoconvert ! {sink}" ); let pipeline: gst::Pipeline = gst::parse::launch(&desc)? .downcast::() .expect("not a pipeline"); /* -------- placement maths -------------------------------------- */ let monitors = display::enumerate_monitors(); let stream_defs = &[("eye-0", 1920, 1080), ("eye-1", 1920, 1080)]; let rects = layout::assign_rectangles(&monitors, stream_defs); // --- AppSrc------------------------------------------------------------ let src: gst_app::AppSrc = pipeline .by_name("src") .unwrap() .downcast::() .unwrap(); src.set_caps(Some( &gst::Caps::builder("video/x-h264") .field("stream-format", &"byte-stream") .field("alignment", &"au") .build(), )); src.set_format(gst::Format::Time); /* -------- move/resize overlay ---------------------------------- */ if let Some(sink_elem) = pipeline.by_name("sink") { if sink_elem.find_property("window-title").is_some() { let _ = sink_elem.set_property("window-title", &format!("Lesavka-eye-{id}")); } if let Ok(overlay) = sink_elem.dynamic_cast::() { if let Some(r) = rects.get(id as usize) { // 1. Tell glimagesink how to crop the texture in its own window let _ = overlay.set_render_rectangle(0, 0, r.w, r.h); debug!( "🔲 eye-{id} → render_rectangle({}, {}, {}, {})", 0, 0, r.w, r.h ); // 2. **Compositor-level** placement (Wayland only) if std::env::var_os("WAYLAND_DISPLAY").is_some() { use std::process::{Command, ExitStatus}; use std::sync::Arc; use std::thread; use std::time::Duration; // A small helper struct so the two branches return the same type struct Placer { name: &'static str, run: Arc std::io::Result + Send + Sync>, } let placer = if Command::new("swaymsg") .arg("-t") .arg("get_tree") .output() .is_ok() { Placer { name: "swaymsg", run: Arc::new(|cmd| Command::new("swaymsg").arg(cmd).status()), } } else if Command::new("hyprctl").arg("version").output().is_ok() { Placer { name: "hyprctl", run: Arc::new(|cmd| { Command::new("hyprctl") .args(["dispatch", "exec", cmd]) .status() }), } } else { Placer { name: "noop", run: Arc::new(|_| { Err(std::io::Error::new( std::io::ErrorKind::Other, "no swaymsg/hyprctl found", )) }), } }; if placer.name != "noop" { let cmd = match placer.name { // Criteria string that works for i3-/sway-compatible IPC "swaymsg" | "hyprctl" => format!( r#"[title="^Lesavka-eye-{id}$"] \ resize set {w} {h}; \ move absolute position {x} {y}"#, w = r.w, h = r.h, x = r.x, y = r.y, ), _ => String::new(), }; // Retry in a detached thread - avoids blocking GStreamer let placename = placer.name; let runner = placer.run.clone(); thread::spawn(move || { for attempt in 1..=10 { thread::sleep(Duration::from_millis(300)); match runner(&cmd) { Ok(st) if st.success() => { tracing::info!( "✅ {placename}: placed eye-{id} (attempt {attempt})" ); break; } _ => tracing::debug!( "⌛ {placename}: eye-{id} not mapped yet (attempt {attempt})" ), } } }); } } // 3. X11 / Xwayland placement via wmctrl else if std::env::var_os("DISPLAY").is_some() { let title = format!("Lesavka-eye-{id}"); let w = r.w; let h = r.h; let x = r.x; let y = r.y; std::thread::spawn(move || { for attempt in 1..=10 { std::thread::sleep(std::time::Duration::from_millis(300)); let status = Command::new("wmctrl") .args(["-r", &title, "-e", &format!("0,{x},{y},{w},{h}")]) .status(); match status { Ok(st) if st.success() => { tracing::info!( "✅ wmctrl placed eye-{id} (attempt {attempt})" ); break; } _ => tracing::debug!( "⌛ wmctrl: eye-{id} not mapped yet (attempt {attempt})" ), } } }); } } } } { let id = id; // move into thread let bus = pipeline.bus().expect("no bus"); std::thread::spawn(move || { use gst::MessageView::*; for msg in bus.iter_timed(gst::ClockTime::NONE) { match msg.view() { StateChanged(s) if s.current() == gst::State::Playing => { if msg.src().map(|s| s.is::()).unwrap_or(false) { info!("🎞️ video{id} pipeline ▶️ (sink='glimagesink')"); } } Error(e) => error!( "💥 gst video{id}: {} ({})", e.error(), e.debug().unwrap_or_default() ), Warning(w) => warn!( "⚠️ gst video{id}: {} ({})", w.error(), w.debug().unwrap_or_default() ), _ => {} } } }); } pipeline.set_state(gst::State::Playing)?; Ok(Self { _pipeline: pipeline, src, }) } /// Feed one access-unit to the decoder. pub fn push_packet(&self, pkt: VideoPacket) { static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if n % 150 == 0 || n < 10 { debug!( eye = pkt.id, bytes = pkt.data.len(), pts = pkt.pts, "⬇️ received video AU" ); } let mut buf = gst::Buffer::from_slice(pkt.data); buf.get_mut() .unwrap() .set_pts(Some(gst::ClockTime::from_useconds(pkt.pts))); let _ = self.src.push_buffer(buf); // ignore Eos/flushing } } impl Drop for MonitorWindow { fn drop(&mut self) { let _ = self._pipeline.set_state(gst::State::Null); } }