// client/src/output/video.rs use crate::output::{display, layout}; use anyhow::Context; use gstreamer as gst; use gstreamer::prelude::{Cast, ElementExt, GstBinExt, ObjectExt}; use gstreamer_app as gst_app; use gstreamer_video::VideoOverlay; use gstreamer_video::prelude::VideoOverlayExt; use lesavka_common::lesavka::VideoPacket; use std::process::Command; use tracing::{debug, error, info, warn}; /// Pick the first H.264 decoder that can be built on this client. fn pick_h264_decoder() -> String { if let Ok(raw) = std::env::var("LESAVKA_H264_DECODER") { let name = raw.trim(); if name.eq_ignore_ascii_case("decodebin") { return "decodebin".to_string(); } if !name.is_empty() && buildable_decoder(name) { return name.to_string(); } } for name in [ "avdec_h264", "openh264dec", "nvh264dec", "nvh264sldec", "vah264dec", "vaapih264dec", "v4l2h264dec", "v4l2slh264dec", ] { if buildable_decoder(name) { return name.to_string(); } } "decodebin".to_string() } #[cfg(coverage)] /// Probe decoder availability, with a coverage hook for fallback behavior. fn buildable_decoder(name: &str) -> bool { if std::env::var("LESAVKA_TEST_DISABLE_H264_DECODERS").is_ok() { return false; } gst::ElementFactory::find(name).is_some() && gst::ElementFactory::make(name).build().is_ok() } #[cfg(not(coverage))] /// Probe decoder availability through the local GStreamer registry. fn buildable_decoder(name: &str) -> bool { gst::ElementFactory::find(name).is_some() && gst::ElementFactory::make(name).build().is_ok() } pub struct MonitorWindow { _pipeline: gst::Pipeline, src: gst_app::AppSrc, } pub struct UnifiedMonitorWindow { pipeline: gst::Pipeline, left_src: gst_app::AppSrc, right_src: gst_app::AppSrc, } #[cfg(not(coverage))] /// Place an eye window with wmctrl once the compositor maps it. fn spawn_wmctrl_placement(id: u32, rect: layout::Rect) { let x = rect.x; let y = rect.y; let w = rect.w; let h = rect.h; std::thread::spawn(move || { for attempt in 1..=12 { std::thread::sleep(std::time::Duration::from_millis(300)); let Some(window_id) = nth_lesavka_window_id(id as usize) else { tracing::debug!("⌛ wmctrl: eye-{id} not mapped yet (attempt {attempt})"); continue; }; let _ = Command::new("wmctrl") .args([ "-i", "-r", &window_id, "-b", "remove,maximized_vert,maximized_horz", ]) .status(); let status = Command::new("wmctrl") .args(["-i", "-r", &window_id, "-e", &format!("0,{x},{y},{w},{h}")]) .status(); match status { Ok(st) if st.success() => { tracing::info!("✅ wmctrl placed eye-{id} via {window_id} (attempt {attempt})"); break; } _ => tracing::debug!( "⌛ wmctrl: eye-{id} not ready for placement (attempt {attempt})" ), } } }); } #[cfg(not(coverage))] /// Return the nth Lesavka video window ID from wmctrl output. fn nth_lesavka_window_id(index: usize) -> Option { let out = Command::new("wmctrl").args(["-lp"]).output().ok()?; if !out.status.success() { return None; } let mut windows = String::from_utf8_lossy(&out.stdout) .lines() .filter_map(|line| { let mut parts = line.split_whitespace(); let id = parts.next()?.to_string(); let _desktop = parts.next()?; let _pid = parts.next()?; let _host = parts.next()?; let title = parts.collect::>().join(" "); if title == "lesavka-client" || title.starts_with("Lesavka-eye-") { Some(id) } else { None } }) .collect::>(); windows.sort(); windows.dedup(); if windows.len() < 2 { return None; } windows.get(index).cloned() } #[allow(clippy::all)] impl MonitorWindow { #[cfg(coverage)] /// Build a deterministic fakesink monitor for coverage tests. pub fn new(_id: u32) -> anyhow::Result { gst::init().context("initialising GStreamer")?; let pipeline = gst::Pipeline::new(); let src: gst_app::AppSrc = gst::ElementFactory::make("appsrc") .build() .context("make appsrc")? .downcast::() .expect("appsrc"); src.set_caps(Some( &gst::Caps::builder("video/x-h264") .field("stream-format", &"byte-stream") .field("alignment", &"au") .build(), )); src.set_format(gst::Format::Time); let sink = gst::ElementFactory::make("fakesink") .build() .context("make fakesink")?; pipeline.add(src.upcast_ref::())?; pipeline.add(&sink)?; gst::Element::link_many(&[src.upcast_ref(), &sink])?; pipeline.set_state(gst::State::Playing)?; Ok(Self { _pipeline: pipeline, src, }) } #[cfg(not(coverage))] /// Build a live monitor window for one remote eye stream. pub fn new(id: u32) -> anyhow::Result { gst::init().context("initialising GStreamer")?; // --- Build pipeline --------------------------------------------------- let decoder_name = pick_h264_decoder(); let sink = if std::env::var("GDK_BACKEND") .map(|v| v.contains("x11")) .unwrap_or_else(|_| std::env::var_os("DISPLAY").is_some()) { "ximagesink name=sink sync=false" } else { "glimagesink name=sink sync=false" }; let desc = format!( "appsrc name=src is-live=true format=time do-timestamp=true block=false ! \ queue max-size-buffers=8 max-size-time=0 max-size-bytes=0 leaky=downstream ! \ capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=au ! \ h264parse disable-passthrough=true ! {decoder_name} name=decoder ! videoconvert ! {sink}" ); let pipeline: gst::Pipeline = gst::parse::launch(&desc)? .downcast::() .expect("not a pipeline"); /* -------- placement maths -------------------------------------- */ let monitors = display::enumerate_monitors(); let stream_defs = &[("eye-0", 1920, 1080), ("eye-1", 1920, 1080)]; let rects = layout::assign_rectangles(&monitors, stream_defs); // --- AppSrc------------------------------------------------------------ let src: gst_app::AppSrc = pipeline .by_name("src") .unwrap() .downcast::() .unwrap(); src.set_caps(Some( &gst::Caps::builder("video/x-h264") .field("stream-format", &"byte-stream") .field("alignment", &"au") .build(), )); src.set_format(gst::Format::Time); /* -------- move/resize overlay ---------------------------------- */ if let Some(sink_elem) = pipeline.by_name("sink") { if sink_elem.find_property("window-title").is_some() { let _ = sink_elem.set_property("window-title", &format!("Lesavka-eye-{id}")); } if let Some(r) = rects.get(id as usize) { if let Ok(overlay) = sink_elem.dynamic_cast::() { // 1. Tell glimagesink how to crop the texture in its own window let _ = overlay.set_render_rectangle(0, 0, r.w, r.h); debug!( "🔲 eye-{id} → render_rectangle({}, {}, {}, {})", 0, 0, r.w, r.h ); } // 2. **Compositor-level** placement (Wayland only) if std::env::var_os("WAYLAND_DISPLAY").is_some() { use std::process::{Command, ExitStatus}; use std::sync::Arc; use std::thread; use std::time::Duration; // A small helper struct so the two branches return the same type struct Placer { name: &'static str, run: Arc std::io::Result + Send + Sync>, } let placer = if Command::new("swaymsg") .arg("-t") .arg("get_tree") .output() .map(|out| out.status.success()) .unwrap_or(false) { Some(Placer { name: "swaymsg", run: Arc::new(|cmd| Command::new("swaymsg").arg(cmd).status()), }) } else if Command::new("hyprctl") .arg("version") .output() .map(|out| out.status.success()) .unwrap_or(false) { Some(Placer { name: "hyprctl", run: Arc::new(|cmd| { Command::new("hyprctl") .args(["dispatch", "exec", cmd]) .status() }), }) } else if std::env::var_os("DISPLAY").is_some() && Command::new("wmctrl").arg("-m").output().is_ok() { spawn_wmctrl_placement(id, *r); None } else { None }; if let Some(placer) = placer { let cmd = match placer.name { // Criteria string that works for i3-/sway-compatible IPC "swaymsg" | "hyprctl" => format!( r#"[title="^Lesavka-eye-{id}$"] \ resize set {w} {h}; \ move absolute position {x} {y}"#, w = r.w, h = r.h, x = r.x, y = r.y, ), _ => String::new(), }; // Retry in a detached thread - avoids blocking GStreamer let placename = placer.name; let runner = placer.run.clone(); thread::spawn(move || { for attempt in 1..=10 { thread::sleep(Duration::from_millis(300)); match runner(&cmd) { Ok(st) if st.success() => { tracing::info!( "✅ {placename}: placed eye-{id} (attempt {attempt})" ); break; } _ => tracing::debug!( "⌛ {placename}: eye-{id} not mapped yet (attempt {attempt})" ), } } }); } } // 3. X11 / Xwayland placement via wmctrl else if std::env::var_os("DISPLAY").is_some() { spawn_wmctrl_placement(id, *r); } } } { let id = id; // move into thread let bus = pipeline.bus().expect("no bus"); std::thread::spawn(move || { use gst::MessageView::*; for msg in bus.iter_timed(gst::ClockTime::NONE) { match msg.view() { StateChanged(s) if s.current() == gst::State::Playing => { if msg.src().map(|s| s.is::()).unwrap_or(false) { info!("🎞️ video{id} pipeline ▶️ (sink='glimagesink')"); info!("🎞️ video{id} decoder → {decoder_name}"); } } Error(e) => error!( "💥 gst video{id}: {} ({})", e.error(), e.debug().unwrap_or_default() ), Warning(w) => warn!( "⚠️ gst video{id}: {} ({})", w.error(), w.debug().unwrap_or_default() ), _ => {} } } }); } pipeline.set_state(gst::State::Playing)?; Ok(Self { _pipeline: pipeline, src, }) } /// Feed one access-unit to the decoder. pub fn push_packet(&self, pkt: VideoPacket) { static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if n % 150 == 0 || n < 10 { debug!( eye = pkt.id, bytes = pkt.data.len(), pts = pkt.pts, "⬇️ received video AU" ); } let mut buf = gst::Buffer::from_slice(pkt.data); buf.get_mut() .unwrap() .set_pts(Some(gst::ClockTime::from_useconds(pkt.pts))); let _ = self.src.push_buffer(buf); // ignore Eos/flushing } }