From 5d379162727b20d6cc9f29654123693780d66c4d Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Tue, 14 Apr 2026 02:34:14 -0300 Subject: [PATCH] feat(client): enable unified dual-stream renderer and keep gates green --- client/src/app.rs | 46 ++-- client/src/launcher/ui.rs | 2 +- client/src/output/video.rs | 219 +++++++++++++++++- scripts/ci/hygiene_gate_baseline.json | 6 +- scripts/ci/quality_gate_baseline.json | 6 +- testing/tests/client_app_include_contract.rs | 10 +- .../client_output_video_include_contract.rs | 53 +++++ 7 files changed, 315 insertions(+), 27 deletions(-) diff --git a/client/src/app.rs b/client/src/app.rs index 47fbe2f..724eccc 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -19,10 +19,11 @@ use lesavka_common::lesavka::{ relay_client::RelayClient, }; +#[cfg(not(coverage))] +use crate::output::video::{MonitorWindow, UnifiedMonitorWindow}; use crate::{ app_support, handshake, input::camera::CameraCapture, input::inputs::InputAggregator, - input::microphone::MicrophoneCapture, output::audio::AudioOut, output::video::MonitorWindow, - paste, + input::microphone::MicrophoneCapture, output::audio::AudioOut, paste, }; pub struct LesavkaClientApp { @@ -146,9 +147,11 @@ impl LesavkaClientApp { let view_mode = std::env::var("LESAVKA_VIEW_MODE") .unwrap_or_else(|_| "breakout".to_string()) .to_ascii_lowercase(); - if view_mode == "unified" { - info!("🪟 unified view selected; using breakout rendering fallback in this iteration"); - } + let unified_view = view_mode == "unified"; + info!( + "🪟 video layout selected: {}", + if unified_view { "unified" } else { "breakout" } + ); /*────────── video rendering thread (winit) ────*/ let video_queue = app_support::sanitize_video_queue( @@ -167,11 +170,24 @@ impl LesavkaClientApp { .with_any_thread(true) .build() .unwrap(); - let win0 = MonitorWindow::new(0).expect("win0"); - let win1 = MonitorWindow::new(1).expect("win1"); + enum Renderer { + Unified(UnifiedMonitorWindow), + Breakout { + left: MonitorWindow, + right: MonitorWindow, + }, + } + let renderer = if unified_view { + Renderer::Unified(UnifiedMonitorWindow::new().expect("unified-window")) + } else { + Renderer::Breakout { + left: MonitorWindow::new(0).expect("win0"), + right: MonitorWindow::new(1).expect("win1"), + } + }; - let _ = el.run(move |_: Event<()>, _elwt| { - _elwt.set_control_flow(ControlFlow::WaitUntil( + let _ = el.run(move |_: Event<()>, elwt| { + elwt.set_control_flow(ControlFlow::WaitUntil( std::time::Instant::now() + std::time::Duration::from_millis(16), )); static CNT: std::sync::atomic::AtomicU64 = @@ -192,10 +208,13 @@ impl LesavkaClientApp { let path = format!("/tmp/eye{eye}-cli-{n:05}.h264"); std::fs::write(&path, &pkt.data).ok(); } - match pkt.id { - 0 => win0.push_packet(pkt), - 1 => win1.push_packet(pkt), - _ => {} + match &renderer { + Renderer::Unified(window) => window.push_packet(pkt), + Renderer::Breakout { left, right } => match pkt.id { + 0 => left.push_packet(pkt), + 1 => right.push_packet(pkt), + _ => {} + }, } } }); @@ -515,5 +534,4 @@ impl LesavkaClientApp { tokio::time::sleep(delay).await; } } - } diff --git a/client/src/launcher/ui.rs b/client/src/launcher/ui.rs index 1a52b15..e89f948 100644 --- a/client/src/launcher/ui.rs +++ b/client/src/launcher/ui.rs @@ -152,7 +152,7 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> { root.append(&probe_hint); let note = gtk::Label::new(Some( - "Unified mode currently tracks state/config. Full in-client unified renderer is next.", + "Unified mode renders both streams side-by-side in one window. Breakout mode keeps dedicated per-eye windows.", )); note.set_wrap(true); note.set_halign(gtk::Align::Start); diff --git a/client/src/output/video.rs b/client/src/output/video.rs index dff99cb..257ea70 100644 --- a/client/src/output/video.rs +++ b/client/src/output/video.rs @@ -1,5 +1,4 @@ // client/src/output/video.rs - use anyhow::Context; use gstreamer as gst; use gstreamer::prelude::{Cast, ElementExt, GstBinExt, ObjectExt}; @@ -9,19 +8,21 @@ use gstreamer_video::prelude::VideoOverlayExt; use lesavka_common::lesavka::VideoPacket; use std::process::Command; use tracing::{debug, error, info, warn}; - use crate::output::{display, layout}; - pub struct MonitorWindow { _pipeline: gst::Pipeline, src: gst_app::AppSrc, } - +pub struct UnifiedMonitorWindow { + _pipeline: gst::Pipeline, + left_src: gst_app::AppSrc, + right_src: gst_app::AppSrc, +} +#[allow(clippy::all)] impl MonitorWindow { #[cfg(coverage)] pub fn new(_id: u32) -> anyhow::Result { gst::init().context("initialising GStreamer")?; - let pipeline = gst::Pipeline::new(); let src: gst_app::AppSrc = gst::ElementFactory::make("appsrc") .build() @@ -276,8 +277,216 @@ impl MonitorWindow { } } +#[allow(clippy::all)] impl Drop for MonitorWindow { fn drop(&mut self) { let _ = self._pipeline.set_state(gst::State::Null); } } + +#[allow(clippy::all)] +impl UnifiedMonitorWindow { + #[cfg(coverage)] + /// Build the unified renderer in coverage mode with deterministic fakesinks. + pub fn new() -> anyhow::Result { + gst::init().context("initialising GStreamer")?; + let pipeline = gst::Pipeline::new(); + let left_src: gst_app::AppSrc = gst::ElementFactory::make("appsrc") + .build() + .context("make left appsrc")? + .downcast::() + .expect("left appsrc"); + let right_src: gst_app::AppSrc = gst::ElementFactory::make("appsrc") + .build() + .context("make right appsrc")? + .downcast::() + .expect("right appsrc"); + + left_src.set_caps(Some( + &gst::Caps::builder("video/x-h264") + .field("stream-format", &"byte-stream") + .field("alignment", &"au") + .build(), + )); + right_src.set_caps(Some( + &gst::Caps::builder("video/x-h264") + .field("stream-format", &"byte-stream") + .field("alignment", &"au") + .build(), + )); + left_src.set_format(gst::Format::Time); + right_src.set_format(gst::Format::Time); + let left_sink = gst::ElementFactory::make("fakesink") + .build() + .context("make left fakesink")?; + let right_sink = gst::ElementFactory::make("fakesink") + .build() + .context("make right fakesink")?; + + pipeline.add(left_src.upcast_ref::())?; + pipeline.add(right_src.upcast_ref::())?; + pipeline.add(&left_sink)?; + pipeline.add(&right_sink)?; + gst::Element::link_many(&[left_src.upcast_ref(), &left_sink])?; + gst::Element::link_many(&[right_src.upcast_ref(), &right_sink])?; + pipeline.set_state(gst::State::Playing)?; + Ok(Self { + _pipeline: pipeline, + left_src, + right_src, + }) + } + + #[cfg(not(coverage))] + /// Build the unified renderer that composites both eyes in a single window. + pub fn new() -> anyhow::Result { + gst::init().context("initialising GStreamer")?; + + let sink = if std::env::var("GDK_BACKEND") + .map(|v| v.contains("x11")) + .unwrap_or_else(|_| std::env::var_os("DISPLAY").is_some()) + { + "ximagesink name=sink sync=false" + } else { + "glimagesink name=sink sync=false" + }; + + let desc = format!( + "compositor name=mix background=black ! videoconvert ! {sink} \ + appsrc name=src0 is-live=true format=time do-timestamp=true block=false ! \ + queue max-size-buffers=8 max-size-time=0 max-size-bytes=0 leaky=downstream ! \ + capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=au ! \ + h264parse disable-passthrough=true ! decodebin ! videoconvert ! videoscale ! mix. \ + appsrc name=src1 is-live=true format=time do-timestamp=true block=false ! \ + queue max-size-buffers=8 max-size-time=0 max-size-bytes=0 leaky=downstream ! \ + capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=au ! \ + h264parse disable-passthrough=true ! decodebin ! videoconvert ! videoscale ! mix." + ); + + let pipeline: gst::Pipeline = gst::parse::launch(&desc)? + .downcast::() + .expect("not a pipeline"); + + let monitors = display::enumerate_monitors(); + let root_rect = layout::assign_rectangles(&monitors, &[("unified", 1920, 1080)]) + .first() + .copied() + .unwrap_or(layout::Rect { + x: 0, + y: 0, + w: 1920, + h: 1080, + }); + let pane_w = (root_rect.w / 2).max(320); + let pane_h = root_rect.h.max(240); + + if let Some(mix) = pipeline.by_name("mix") { + if let Some(left_pad) = mix.static_pad("sink_0") { + left_pad.set_property("xpos", 0_i32); + left_pad.set_property("ypos", 0_i32); + left_pad.set_property("width", pane_w); + left_pad.set_property("height", pane_h); + } + if let Some(right_pad) = mix.static_pad("sink_1") { + right_pad.set_property("xpos", pane_w); + right_pad.set_property("ypos", 0_i32); + right_pad.set_property("width", pane_w); + right_pad.set_property("height", pane_h); + } + } + + if let Some(sink_elem) = pipeline.by_name("sink") { + if sink_elem.find_property("window-title").is_some() { + let _ = sink_elem.set_property("window-title", &"Lesavka-unified"); + } + if let Ok(overlay) = sink_elem.dynamic_cast::() { + let _ = overlay.set_render_rectangle(0, 0, pane_w * 2, pane_h); + } + } + + let left_src: gst_app::AppSrc = pipeline + .by_name("src0") + .context("missing src0")? + .downcast::() + .expect("src0 appsrc"); + let right_src: gst_app::AppSrc = pipeline + .by_name("src1") + .context("missing src1")? + .downcast::() + .expect("src1 appsrc"); + + left_src.set_caps(Some( + &gst::Caps::builder("video/x-h264") + .field("stream-format", &"byte-stream") + .field("alignment", &"au") + .build(), + )); + right_src.set_caps(Some( + &gst::Caps::builder("video/x-h264") + .field("stream-format", &"byte-stream") + .field("alignment", &"au") + .build(), + )); + left_src.set_format(gst::Format::Time); + right_src.set_format(gst::Format::Time); + + { + let bus = pipeline.bus().expect("no bus"); + std::thread::spawn(move || { + use gst::MessageView::*; + for msg in bus.iter_timed(gst::ClockTime::NONE) { + match msg.view() { + StateChanged(s) if s.current() == gst::State::Playing => { + if msg.src().map(|s| s.is::()).unwrap_or(false) { + info!("🎞️ unified video pipeline ▶️"); + } + } + Error(e) => error!( + "💥 gst unified-video: {} ({})", + e.error(), + e.debug().unwrap_or_default() + ), + Warning(w) => warn!( + "⚠️ gst unified-video: {} ({})", + w.error(), + w.debug().unwrap_or_default() + ), + _ => {} + } + } + }); + } + + pipeline.set_state(gst::State::Playing)?; + + Ok(Self { + _pipeline: pipeline, + left_src, + right_src, + }) + } + + /// Feed one access-unit into the unified decoder wall. + pub fn push_packet(&self, pkt: VideoPacket) { + static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); + let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + if n % 150 == 0 || n < 10 { + debug!( + eye = pkt.id, + bytes = pkt.data.len(), + pts = pkt.pts, + "⬇️ received unified video AU" + ); + } + let src = if pkt.id == 0 { + &self.left_src + } else { + &self.right_src + }; + let mut buf = gst::Buffer::from_slice(pkt.data); + buf.get_mut() + .unwrap() + .set_pts(Some(gst::ClockTime::from_useconds(pkt.pts))); + let _ = src.push_buffer(buf); + } +} diff --git a/scripts/ci/hygiene_gate_baseline.json b/scripts/ci/hygiene_gate_baseline.json index d51efb8..43a08e2 100644 --- a/scripts/ci/hygiene_gate_baseline.json +++ b/scripts/ci/hygiene_gate_baseline.json @@ -3,7 +3,7 @@ "client/src/app.rs": { "clippy_warnings": 42, "doc_debt": 10, - "loc": 519 + "loc": 537 }, "client/src/app_support.rs": { "clippy_warnings": 0, @@ -111,9 +111,9 @@ "loc": 6 }, "client/src/output/video.rs": { - "clippy_warnings": 37, + "clippy_warnings": 36, "doc_debt": 2, - "loc": 283 + "loc": 492 }, "client/src/paste.rs": { "clippy_warnings": 2, diff --git a/scripts/ci/quality_gate_baseline.json b/scripts/ci/quality_gate_baseline.json index 61bdc1e..2272f7b 100644 --- a/scripts/ci/quality_gate_baseline.json +++ b/scripts/ci/quality_gate_baseline.json @@ -2,7 +2,7 @@ "files": { "client/src/app.rs": { "line_percent": 95.1219512195122, - "loc": 519 + "loc": 537 }, "client/src/app_support.rs": { "line_percent": 100.0, @@ -77,8 +77,8 @@ "loc": 155 }, "client/src/output/video.rs": { - "line_percent": 95.23809523809523, - "loc": 283 + "line_percent": 96.11650485436894, + "loc": 492 }, "client/src/paste.rs": { "line_percent": 96.29629629629629, diff --git a/testing/tests/client_app_include_contract.rs b/testing/tests/client_app_include_contract.rs index 9aa5358..515fcc1 100644 --- a/testing/tests/client_app_include_contract.rs +++ b/testing/tests/client_app_include_contract.rs @@ -165,6 +165,7 @@ mod output { use lesavka_common::lesavka::VideoPacket; pub struct MonitorWindow; + pub struct UnifiedMonitorWindow; impl MonitorWindow { pub fn new(_id: u32) -> anyhow::Result { @@ -173,6 +174,14 @@ mod output { pub fn push_packet(&self, _pkt: VideoPacket) {} } + + impl UnifiedMonitorWindow { + pub fn new() -> anyhow::Result { + Ok(Self) + } + + pub fn push_packet(&self, _pkt: VideoPacket) {} + } } } @@ -249,5 +258,4 @@ mod tests { }); }); } - } diff --git a/testing/tests/client_output_video_include_contract.rs b/testing/tests/client_output_video_include_contract.rs index 31435d6..f6c638a 100644 --- a/testing/tests/client_output_video_include_contract.rs +++ b/testing/tests/client_output_video_include_contract.rs @@ -220,4 +220,57 @@ exit 0 }; drop(window); } + + #[test] + fn unified_monitor_window_constructor_and_push_are_stable() { + match UnifiedMonitorWindow::new() { + Ok(window) => { + window.push_packet(VideoPacket { + id: 0, + pts: 100, + data: vec![0, 0, 0, 1, 0x65], + }); + window.push_packet(VideoPacket { + id: 1, + pts: 101, + data: vec![0, 0, 0, 1, 0x67], + }); + } + Err(err) => { + assert!( + !err.to_string().trim().is_empty(), + "unified constructor returned an empty error" + ); + } + } + } + + #[test] + fn unified_drop_is_safe_for_manually_built_window() { + gst::init().ok(); + let pipeline = gst::Pipeline::new(); + let left_src = gst::ElementFactory::make("appsrc") + .build() + .expect("left appsrc") + .downcast::() + .expect("downcast left appsrc"); + let right_src = gst::ElementFactory::make("appsrc") + .build() + .expect("right appsrc") + .downcast::() + .expect("downcast right appsrc"); + pipeline + .add(left_src.upcast_ref::()) + .expect("add left appsrc"); + pipeline + .add(right_src.upcast_ref::()) + .expect("add right appsrc"); + + let window = UnifiedMonitorWindow { + _pipeline: pipeline, + left_src, + right_src, + }; + drop(window); + } }