
// client/src/output/video.rs
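//! Local video output for the Lesavka client.
//!
//! Two renderers are provided:
//! * [`MonitorWindow`] decodes a single eye's H.264 stream and shows it in its
//!   own window, placed on a physical monitor chosen by
//!   [`layout::assign_rectangles`]. Placement is delegated to the compositor
//!   (swaymsg/hyprctl on Wayland) or to `wmctrl` on X11/Xwayland.
//! * [`UnifiedMonitorWindow`] composites both eyes side by side in one window
//!   using a GStreamer `compositor` element.
//!
//! A minimal usage sketch (the packet source shown here is illustrative and
//! not part of this module; the real call site lives in the client's stream
//! handler):
//!
//! ```ignore
//! let wall = UnifiedMonitorWindow::new()?;   // one window, both eyes
//! while let Some(pkt) = incoming_packets.next().await {
//!     wall.push_packet(pkt);                 // routed by pkt.id (0 = left eye)
//! }
//! ```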
use crate::output::{display, layout};
use anyhow::Context;
use gstreamer as gst;
use gstreamer::prelude::{Cast, ElementExt, GstBinExt, ObjectExt};
use gstreamer_app as gst_app;
use gstreamer_video::VideoOverlay;
use gstreamer_video::prelude::VideoOverlayExt;
use lesavka_common::lesavka::VideoPacket;
use std::process::Command;
use tracing::{debug, error, info, warn};
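/// Renders one eye's H.264 stream in its own sink window.
///
/// The pipeline and its app source are kept alive for the lifetime of the
/// window; dropping the struct sets the pipeline back to `Null`.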
pub struct MonitorWindow {
_pipeline: gst::Pipeline,
src: gst_app::AppSrc,
}
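/// Renders both eyes side by side in a single window via a `compositor`
/// element; each eye feeds its own `appsrc`.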
pub struct UnifiedMonitorWindow {
pipeline: gst::Pipeline,
left_src: gst_app::AppSrc,
right_src: gst_app::AppSrc,
}
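/// Retries `wmctrl` placement of the `eye-{id}` window in a detached thread:
/// un-maximises the window, then moves/resizes it to `rect`, giving up after
/// a fixed number of attempts if the window never maps.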
#[cfg(not(coverage))]
fn spawn_wmctrl_placement(id: u32, rect: layout::Rect) {
let x = rect.x;
let y = rect.y;
let w = rect.w;
let h = rect.h;
std::thread::spawn(move || {
for attempt in 1..=12 {
std::thread::sleep(std::time::Duration::from_millis(300));
let Some(window_id) = nth_lesavka_window_id(id as usize) else {
tracing::debug!("⌛ wmctrl: eye-{id} not mapped yet (attempt {attempt})");
continue;
};
let _ = Command::new("wmctrl")
.args([
"-i",
"-r",
&window_id,
"-b",
"remove,maximized_vert,maximized_horz",
])
.status();
let status = Command::new("wmctrl")
.args(["-i", "-r", &window_id, "-e", &format!("0,{x},{y},{w},{h}")])
.status();
match status {
Ok(st) if st.success() => {
tracing::info!("✅ wmctrl placed eye-{id} via {window_id} (attempt {attempt})");
break;
}
_ => tracing::debug!(
"⌛ wmctrl: eye-{id} not ready for placement (attempt {attempt})"
),
}
}
});
}
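/// Returns the window ID of the `index`-th Lesavka window as reported by
/// `wmctrl -lp` (matched by title), or `None` until at least two Lesavka
/// windows are mapped, so placement only starts once both eye windows exist.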
#[cfg(not(coverage))]
fn nth_lesavka_window_id(index: usize) -> Option<String> {
let out = Command::new("wmctrl").args(["-lp"]).output().ok()?;
if !out.status.success() {
return None;
}
let mut windows = String::from_utf8_lossy(&out.stdout)
.lines()
.filter_map(|line| {
let mut parts = line.split_whitespace();
let id = parts.next()?.to_string();
let _desktop = parts.next()?;
let _pid = parts.next()?;
let _host = parts.next()?;
let title = parts.collect::<Vec<_>>().join(" ");
if title == "lesavka-client" || title.starts_with("Lesavka-eye-") {
Some(id)
} else {
None
}
})
.collect::<Vec<_>>();
windows.sort();
windows.dedup();
if windows.len() < 2 {
return None;
}
windows.get(index).cloned()
}
#[allow(clippy::all)]
impl MonitorWindow {
#[cfg(coverage)]
pub fn new(_id: u32) -> anyhow::Result<Self> {
gst::init().context("initialising GStreamer")?;
let pipeline = gst::Pipeline::new();
let src: gst_app::AppSrc = gst::ElementFactory::make("appsrc")
.build()
.context("make appsrc")?
.downcast::<gst_app::AppSrc>()
.expect("appsrc");
src.set_caps(Some(
&gst::Caps::builder("video/x-h264")
.field("stream-format", &"byte-stream")
.field("alignment", &"au")
.build(),
));
src.set_format(gst::Format::Time);
let sink = gst::ElementFactory::make("fakesink")
.build()
.context("make fakesink")?;
pipeline.add(src.upcast_ref::<gst::Element>())?;
pipeline.add(&sink)?;
gst::Element::link_many(&[src.upcast_ref(), &sink])?;
pipeline.set_state(gst::State::Playing)?;
Ok(Self {
_pipeline: pipeline,
src,
})
}
#[cfg(not(coverage))]
pub fn new(id: u32) -> anyhow::Result<Self> {
gst::init().context("initialising GStreamer")?;
// --- Build pipeline ---------------------------------------------------
let sink = if std::env::var("GDK_BACKEND")
.map(|v| v.contains("x11"))
.unwrap_or_else(|_| std::env::var_os("DISPLAY").is_some())
{
"ximagesink name=sink sync=false"
} else {
"glimagesink name=sink sync=false"
};
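// Live appsrc → leaky queue (drops oldest buffers under backpressure) →
// h264parse → decodebin (auto-selects a decoder) → videoconvert → sink.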
let desc = format!(
"appsrc name=src is-live=true format=time do-timestamp=true block=false ! \
queue max-size-buffers=8 max-size-time=0 max-size-bytes=0 leaky=downstream ! \
capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=au ! \
h264parse disable-passthrough=true ! decodebin ! videoconvert ! {sink}"
);
let pipeline: gst::Pipeline = gst::parse::launch(&desc)?
.downcast::<gst::Pipeline>()
.expect("not a pipeline");
/* -------- placement maths -------------------------------------- */
let monitors = display::enumerate_monitors();
let stream_defs = &[("eye-0", 1920, 1080), ("eye-1", 1920, 1080)];
let rects = layout::assign_rectangles(&monitors, stream_defs);
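// rects[id as usize] is the target rectangle for this eye on its monitor.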
// --- AppSrc -----------------------------------------------------------
let src: gst_app::AppSrc = pipeline
.by_name("src")
.unwrap()
.downcast::<gst_app::AppSrc>()
.unwrap();
src.set_caps(Some(
&gst::Caps::builder("video/x-h264")
.field("stream-format", &"byte-stream")
.field("alignment", &"au")
.build(),
));
src.set_format(gst::Format::Time);
/* -------- move/resize overlay ---------------------------------- */
if let Some(sink_elem) = pipeline.by_name("sink") {
if sink_elem.find_property("window-title").is_some() {
let _ = sink_elem.set_property("window-title", &format!("Lesavka-eye-{id}"));
}
if let Some(r) = rects.get(id as usize) {
if let Ok(overlay) = sink_elem.dynamic_cast::<VideoOverlay>() {
// 1. Tell glimagesink how to crop the texture in its own window
let _ = overlay.set_render_rectangle(0, 0, r.w, r.h);
debug!(
"🔲 eye-{id} → render_rectangle({}, {}, {}, {})",
0, 0, r.w, r.h
);
}
// 2. **Compositor-level** placement (Wayland only)
if std::env::var_os("WAYLAND_DISPLAY").is_some() {
use std::process::{Command, ExitStatus};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
// A small helper struct so the two branches return the same type
struct Placer {
name: &'static str,
run: Arc<dyn Fn(&str) -> std::io::Result<ExitStatus> + Send + Sync>,
}
let placer = if Command::new("swaymsg")
.arg("-t")
.arg("get_tree")
.output()
.map(|out| out.status.success())
.unwrap_or(false)
{
Some(Placer {
name: "swaymsg",
run: Arc::new(|cmd| Command::new("swaymsg").arg(cmd).status()),
})
} else if Command::new("hyprctl")
.arg("version")
.output()
.map(|out| out.status.success())
.unwrap_or(false)
{
Some(Placer {
name: "hyprctl",
run: Arc::new(|cmd| {
Command::new("hyprctl")
.args(["dispatch", "exec", cmd])
.status()
}),
})
} else if std::env::var_os("DISPLAY").is_some()
&& Command::new("wmctrl").arg("-m").output().is_ok()
{
spawn_wmctrl_placement(id, *r);
None
} else {
None
};
if let Some(placer) = placer {
let cmd = match placer.name {
// sway-style criteria string; the hyprctl branch forwards it verbatim via `dispatch exec`
"swaymsg" | "hyprctl" => format!(
r#"[title="^Lesavka-eye-{id}$"] \
resize set {w} {h}; \
move absolute position {x} {y}"#,
w = r.w,
h = r.h,
x = r.x,
y = r.y,
),
_ => String::new(),
};
// Retry in a detached thread - avoids blocking GStreamer
let placename = placer.name;
let runner = placer.run.clone();
thread::spawn(move || {
for attempt in 1..=10 {
thread::sleep(Duration::from_millis(300));
match runner(&cmd) {
Ok(st) if st.success() => {
tracing::info!(
"✅ {placename}: placed eye-{id} (attempt {attempt})"
);
break;
}
_ => tracing::debug!(
"⌛ {placename}: eye-{id} not mapped yet (attempt {attempt})"
),
}
}
});
}
}
// 3. X11 / Xwayland placement via wmctrl
else if std::env::var_os("DISPLAY").is_some() {
spawn_wmctrl_placement(id, *r);
}
}
}
{
let id = id; // move into thread
let bus = pipeline.bus().expect("no bus");
std::thread::spawn(move || {
use gst::MessageView::*;
for msg in bus.iter_timed(gst::ClockTime::NONE) {
match msg.view() {
StateChanged(s) if s.current() == gst::State::Playing => {
if msg.src().map(|s| s.is::<gst::Pipeline>()).unwrap_or(false) {
info!("🎞️ video{id} pipeline ▶️");
}
}
Error(e) => error!(
"💥 gst video{id}: {} ({})",
e.error(),
e.debug().unwrap_or_default()
),
Warning(w) => warn!(
"⚠️ gst video{id}: {} ({})",
w.error(),
w.debug().unwrap_or_default()
),
_ => {}
}
}
});
}
pipeline.set_state(gst::State::Playing)?;
Ok(Self {
_pipeline: pipeline,
src,
})
}
/// Feed one access-unit to the decoder.
pub fn push_packet(&self, pkt: VideoPacket) {
static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n % 150 == 0 || n < 10 {
debug!(
eye = pkt.id,
bytes = pkt.data.len(),
pts = pkt.pts,
"⬇️ received video AU"
);
}
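// Wrap the encoded AU without copying and stamp it with the sender's PTS
// (interpreted as microseconds).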
let mut buf = gst::Buffer::from_slice(pkt.data);
buf.get_mut()
.unwrap()
.set_pts(Some(gst::ClockTime::from_useconds(pkt.pts)));
let _ = self.src.push_buffer(buf); // ignore Eos/flushing
}
}
#[allow(clippy::all)]
impl Drop for MonitorWindow {
fn drop(&mut self) {
let _ = self._pipeline.set_state(gst::State::Null);
}
}
#[allow(clippy::all)]
impl UnifiedMonitorWindow {
#[cfg(coverage)]
/// Build the unified renderer in coverage mode with deterministic fakesinks.
pub fn new() -> anyhow::Result<Self> {
gst::init().context("initialising GStreamer")?;
let pipeline = gst::Pipeline::new();
let left_src: gst_app::AppSrc = gst::ElementFactory::make("appsrc")
.build()
.context("make left appsrc")?
.downcast::<gst_app::AppSrc>()
.expect("left appsrc");
let right_src: gst_app::AppSrc = gst::ElementFactory::make("appsrc")
.build()
.context("make right appsrc")?
.downcast::<gst_app::AppSrc>()
.expect("right appsrc");
left_src.set_caps(Some(
&gst::Caps::builder("video/x-h264")
.field("stream-format", &"byte-stream")
.field("alignment", &"au")
.build(),
));
right_src.set_caps(Some(
&gst::Caps::builder("video/x-h264")
.field("stream-format", &"byte-stream")
.field("alignment", &"au")
.build(),
));
left_src.set_format(gst::Format::Time);
right_src.set_format(gst::Format::Time);
let left_sink = gst::ElementFactory::make("fakesink")
.build()
.context("make left fakesink")?;
let right_sink = gst::ElementFactory::make("fakesink")
.build()
.context("make right fakesink")?;
pipeline.add(left_src.upcast_ref::<gst::Element>())?;
pipeline.add(right_src.upcast_ref::<gst::Element>())?;
pipeline.add(&left_sink)?;
pipeline.add(&right_sink)?;
gst::Element::link_many(&[left_src.upcast_ref(), &left_sink])?;
gst::Element::link_many(&[right_src.upcast_ref(), &right_sink])?;
pipeline.set_state(gst::State::Playing)?;
Ok(Self {
pipeline,
left_src,
right_src,
})
}
#[cfg(not(coverage))]
/// Build the unified renderer that composites both eyes in a single window.
pub fn new() -> anyhow::Result<Self> {
gst::init().context("initialising GStreamer")?;
let sink = if std::env::var("GDK_BACKEND")
.map(|v| v.contains("x11"))
.unwrap_or_else(|_| std::env::var_os("DISPLAY").is_some())
{
"ximagesink name=sink sync=false"
} else {
"glimagesink name=sink sync=false"
};
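// Two independent decode branches (src0/src1) feed the `compositor` pads;
// each branch mirrors the single-eye pipeline: leaky queue → h264parse →
// decodebin → videoconvert → videoscale → mix.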
let desc = format!(
"compositor name=mix background=black ! videoconvert ! {sink} \
appsrc name=src0 is-live=true format=time do-timestamp=true block=false ! \
queue max-size-buffers=8 max-size-time=0 max-size-bytes=0 leaky=downstream ! \
capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=au ! \
h264parse disable-passthrough=true ! decodebin ! videoconvert ! videoscale ! mix. \
appsrc name=src1 is-live=true format=time do-timestamp=true block=false ! \
queue max-size-buffers=8 max-size-time=0 max-size-bytes=0 leaky=downstream ! \
capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=au ! \
h264parse disable-passthrough=true ! decodebin ! videoconvert ! videoscale ! mix."
);
let pipeline: gst::Pipeline = gst::parse::launch(&desc)?
.downcast::<gst::Pipeline>()
.expect("not a pipeline");
let monitors = display::enumerate_monitors();
let root_rect = layout::assign_rectangles(&monitors, &[("unified", 1920, 1080)])
.first()
.copied()
.unwrap_or(layout::Rect {
x: 0,
y: 0,
w: 1920,
h: 1080,
});
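// Split the assigned rectangle into two equal side-by-side panes
// (left eye | right eye), with a floor of 320 px width and 240 px height.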
let pane_w = (root_rect.w / 2).max(320);
let pane_h = root_rect.h.max(240);
if let Some(mix) = pipeline.by_name("mix") {
if let Some(left_pad) = mix.static_pad("sink_0") {
left_pad.set_property("xpos", 0_i32);
left_pad.set_property("ypos", 0_i32);
left_pad.set_property("width", pane_w);
left_pad.set_property("height", pane_h);
}
if let Some(right_pad) = mix.static_pad("sink_1") {
right_pad.set_property("xpos", pane_w);
right_pad.set_property("ypos", 0_i32);
right_pad.set_property("width", pane_w);
right_pad.set_property("height", pane_h);
}
}
if let Some(sink_elem) = pipeline.by_name("sink") {
if sink_elem.find_property("window-title").is_some() {
let _ = sink_elem.set_property("window-title", &"Lesavka-unified");
}
if let Ok(overlay) = sink_elem.dynamic_cast::<VideoOverlay>() {
let _ = overlay.set_render_rectangle(0, 0, pane_w * 2, pane_h);
}
}
let left_src: gst_app::AppSrc = pipeline
.by_name("src0")
.context("missing src0")?
.downcast::<gst_app::AppSrc>()
.expect("src0 appsrc");
let right_src: gst_app::AppSrc = pipeline
.by_name("src1")
.context("missing src1")?
.downcast::<gst_app::AppSrc>()
.expect("src1 appsrc");
left_src.set_caps(Some(
&gst::Caps::builder("video/x-h264")
.field("stream-format", &"byte-stream")
.field("alignment", &"au")
.build(),
));
right_src.set_caps(Some(
&gst::Caps::builder("video/x-h264")
.field("stream-format", &"byte-stream")
.field("alignment", &"au")
.build(),
));
left_src.set_format(gst::Format::Time);
right_src.set_format(gst::Format::Time);
{
let bus = pipeline.bus().expect("no bus");
std::thread::spawn(move || {
use gst::MessageView::*;
for msg in bus.iter_timed(gst::ClockTime::NONE) {
match msg.view() {
StateChanged(s) if s.current() == gst::State::Playing => {
if msg.src().map(|s| s.is::<gst::Pipeline>()).unwrap_or(false) {
info!("🎞️ unified video pipeline ▶️");
}
}
Error(e) => error!(
"💥 gst unified-video: {} ({})",
e.error(),
e.debug().unwrap_or_default()
),
Warning(w) => warn!(
"⚠️ gst unified-video: {} ({})",
w.error(),
w.debug().unwrap_or_default()
),
_ => {}
}
}
});
}
pipeline.set_state(gst::State::Playing)?;
Ok(Self {
pipeline,
left_src,
right_src,
})
}
/// Feed one access-unit into the unified decoder wall.
pub fn push_packet(&self, pkt: VideoPacket) {
static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n % 150 == 0 || n < 10 {
debug!(
eye = pkt.id,
bytes = pkt.data.len(),
pts = pkt.pts,
"⬇️ received unified video AU"
);
}
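// Route by eye ID: packets with id 0 go to the left pane, anything else to the right.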
let src = if pkt.id == 0 {
&self.left_src
} else {
&self.right_src
};
let mut buf = gst::Buffer::from_slice(pkt.data);
buf.get_mut()
.unwrap()
.set_pts(Some(gst::ClockTime::from_useconds(pkt.pts)));
let _ = src.push_buffer(buf);
}
}
#[allow(clippy::all)]
impl Drop for UnifiedMonitorWindow {
fn drop(&mut self) {
let _ = self.pipeline.set_state(gst::State::Null);
}
}