video updates

This commit is contained in:
Brad Stein 2025-06-26 17:26:28 -05:00
parent 0940004c70
commit db9239acbc
3 changed files with 98 additions and 117 deletions

View File

@ -1,23 +1,22 @@
// client/src/app.rs
#![forbid(unsafe_code)]
use anyhow::Result;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use tonic::{Request, transport::Channel};
use tracing::{debug, error, info, warn, trace};
use tonic::{transport::Channel, Request};
use tracing::{error, info, warn};
use winit::{
event::Event,
event_loop::EventLoopBuilder,
platform::wayland::EventLoopBuilderExtWayland,
event::Event,
};
use lesavka_common::lesavka::{relay_client::RelayClient, KeyboardReport, MouseReport, MonitorRequest, VideoPacket};
use lesavka_common::lesavka::{
relay_client::RelayClient, KeyboardReport, MonitorRequest, MouseReport, VideoPacket,
};
use crate::input::inputs::InputAggregator;
use crate::output::video::MonitorWindow;
use crate::{input::inputs::InputAggregator, output::video::MonitorWindow};
pub struct LesavkaClientApp {
aggregator: Option<InputAggregator>,
@ -29,72 +28,63 @@ pub struct LesavkaClientApp {
impl LesavkaClientApp {
pub fn new() -> Result<Self> {
let dev_mode = std::env::var("LESAVKA_DEV_MODE").is_ok();
let dev_mode = std::env::var("LESAVKA_DEV_MODE").is_ok();
let server_addr = std::env::args()
.nth(1)
.or_else(|| std::env::var("LESAVKA_SERVER_ADDR").ok())
.unwrap_or_else(|| "http://127.0.0.1:50051".into());
let (kbd_tx, _) = broadcast::channel::<KeyboardReport>(1024);
let (mou_tx, _) = broadcast::channel::<MouseReport>(4096);
let (kbd_tx, _) = broadcast::channel(1024);
let (mou_tx, _) = broadcast::channel(4096);
let mut agg = InputAggregator::new(dev_mode, kbd_tx.clone(), mou_tx.clone());
agg.init()?; // grab devices
agg.init()?; // grab devices immediately
Ok(Self { aggregator: Some(agg),
server_addr, dev_mode,
kbd_tx, mou_tx })
Ok(Self { aggregator: Some(agg), server_addr, dev_mode, kbd_tx, mou_tx })
}
pub async fn run(&mut self) -> Result<()> {
// ---- build two channels ------------------------------------------------
let hid_ep: Channel = Channel::from_shared(self.server_addr.clone())
.unwrap()
/*────────── persistent gRPC channels ──────────*/
let hid_ep = Channel::from_shared(self.server_addr.clone())?
.tcp_nodelay(true)
.concurrency_limit(1)
.http2_keep_alive_interval(Duration::from_secs(15))
.connect_lazy();
let vid_ep: Channel = Channel::from_shared(self.server_addr.clone())
.unwrap()
let vid_ep = Channel::from_shared(self.server_addr.clone())?
.tcp_nodelay(true)
.connect_lazy();
/* detach the aggregator before spawn so `self` is not moved */
/*────────── input aggregator task ─────────────*/
let aggregator = self.aggregator.take().expect("InputAggregator present");
let agg_task = tokio::spawn(async move {
let mut agg = aggregator;
agg.run().await
let agg_task = tokio::spawn(async move {
let mut a = aggregator;
a.run().await
});
/* two networking tasks */
/*────────── HID streams (never return) ────────*/
let kbd_loop = self.stream_loop_keyboard(hid_ep.clone());
let mou_loop = self.stream_loop_mouse(hid_ep.clone());
/* optional suicide timer */
/*────────── optional 30s autoexit in dev mode */
let suicide = async {
if self.dev_mode {
tokio::time::sleep(Duration::from_secs(30)).await;
warn!("💀 devmode timeout 💀");
// self.aggregator.keyboards.dev.ungrab();
// self.aggregator.mice.dev.ungrab();
warn!("💀 devmode timeout");
std::process::exit(0);
} else { futures::future::pending::<()>().await }
} else {
std::future::pending::<()>().await
}
};
/* video windows use a dedicated eventloop thread */
/*────────── video rendering thread (winit) ────*/
let (video_tx, mut video_rx) = tokio::sync::mpsc::unbounded_channel::<VideoPacket>();
// let (event_tx, event_rx) = std::sync::mpsc::channel();
let (_event_tx, _event_rx) = std::sync::mpsc::channel::<()>();
std::thread::spawn(move || {
let el = EventLoopBuilder::<()>::new()
.with_any_thread(true)
.build()
.unwrap();
let el = EventLoopBuilder::<()>::new().with_any_thread(true).build().unwrap();
let win0 = MonitorWindow::new(0, &el).expect("win0");
let win1 = MonitorWindow::new(1, &el).expect("win1");
let _ = el.run(move |_: Event<()>, _| {
while let Ok(pkt) = video_rx.try_recv() {
match pkt.id {
@ -106,52 +96,52 @@ impl LesavkaClientApp {
});
});
let vid_loop = Self::video_loop(vid_ep.clone(), video_tx);
/*────────── start video gRPC pullers ──────────*/
tokio::spawn(Self::video_loop(vid_ep.clone(), video_tx));
/*────────── central reactor ───────────────────*/
tokio::select! {
_ = kbd_loop => unreachable!(),
_ = mou_loop => unreachable!(),
_ = vid_loop => unreachable!(),
_ = suicide => unreachable!(),
// _ = suicide => { warn!("devmode timeout"); std::process::exit(0) },
r = agg_task => {
error!("aggregator task ended: {r:?}");
std::process::exit(1)
_ = kbd_loop => { warn!("⚠️⌨️ keyboard stream finished"); },
_ = mou_loop => { warn!("⚠️🖱️ mouse stream finished"); },
_ = suicide => { /* handled above */ },
r = agg_task => {
match r {
Ok(Ok(())) => warn!("input aggregator terminated cleanly"),
Ok(Err(e)) => error!("input aggregator error: {e:?}"),
Err(join_err) => error!("aggregator task panicked: {join_err:?}"),
}
std::process::exit(1);
}
}
// The branches above either loop forever or exit the process; this
// point is unreachable but satisfies the type checker.
#[allow(unreachable_code)]
Ok(())
}
/*──────────────── keyboard stream ───────────────*/
async fn stream_loop_keyboard(&self, ep: Channel) {
loop {
info!("⌨️ connect {}", self.server_addr);
// let mut cli = match RelayClient::connect(self.server_addr.clone()).await {
// Ok(c) => c,
// Err(e) => { error!("connect: {e}"); Self::delay().await; continue }
// };
let mut cli = RelayClient::new(ep.clone());
let outbound = BroadcastStream::new(self.kbd_tx.subscribe()).filter_map(|r| r.ok());
match cli.stream_keyboard(Request::new(outbound)).await {
Ok(mut resp) => {
// spawn a task just to drain echoes (keeps h2 window happy)
// Drain echoes so the h2 window never fills up.
tokio::spawn(async move {
while let Some(_)= resp.get_mut().message().await.transpose() {}
warn!("⌨️ server closed stream");
while let Some(_) = resp.get_mut().message().await.transpose() {}
warn!("⌨️ server closed keyboard stream");
});
}
Err(e) => {
error!("stream_keyboard: {e}");
Self::delay().await;
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
}
// from now on we just park connection persists until it errors
futures::future::pending::<()>().await;
// drop(resp);
// warn!("⌨️ disconnected");
// Self::delay().await;
std::future::pending::<()>().await;
}
}
@ -159,32 +149,23 @@ impl LesavkaClientApp {
async fn stream_loop_mouse(&self, ep: Channel) {
loop {
info!("🖱️ connect {}", self.server_addr);
// let mut cli = match RelayClient::connect(self.server_addr.clone()).await {
// Ok(c) => c,
// Err(e) => { error!("connect: {e}"); Self::delay().await; continue }
// };
let mut cli = RelayClient::new(ep.clone());
let outbound = BroadcastStream::new(self.mou_tx.subscribe()).filter_map(|r| r.ok());
match cli.stream_mouse(Request::new(outbound)).await {
Ok(mut resp) => {
// spawn a task just to drain echoes (keeps h2 window happy)
tokio::spawn(async move {
while let Some(_)= resp.get_mut().message().await.transpose() {}
warn!("⌨️ server closed stream");
while let Some(_) = resp.get_mut().message().await.transpose() {}
warn!("🖱️ server closed mouse stream");
});
}
Err(e) => {
error!("stream_mouse: {e}");
Self::delay().await;
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
}
}
// from now on we just park connection persists until it errors
futures::future::pending::<()>().await;
// drop(resp);
// warn!("🖱️ disconnected");
// Self::delay().await;
std::future::pending::<()>().await;
}
}
@ -194,8 +175,8 @@ impl LesavkaClientApp {
tx: tokio::sync::mpsc::UnboundedSender<VideoPacket>,
) {
for monitor_id in 0..=1 {
let tx = tx.clone();
let ep = ep.clone();
let ep = ep.clone();
let tx = tx.clone();
tokio::spawn(async move {
loop {
let mut cli = RelayClient::new(ep.clone());
@ -204,8 +185,8 @@ impl LesavkaClientApp {
Ok(mut stream) => {
while let Some(pkt) = stream.get_mut().message().await.transpose() {
match pkt {
Ok(p) => { let _ = tx.send(p); }
Err(e) => { error!("video {monitor_id}: {e}"); break }
Ok(p) => { let _ = tx.send(p); }
Err(e) => { error!("video {monitor_id}: {e}"); break; }
}
}
}
@ -216,8 +197,4 @@ impl LesavkaClientApp {
});
}
}
#[inline(always)]
async fn delay() { tokio::time::sleep(Duration::from_secs(1)).await; }
}

View File

@ -1,64 +1,66 @@
// client/src/output/video.rs
use gstreamer as gst;
use gstreamer_app as gst_app;
use gst::prelude::*;
use gst_app::prelude::*;
use lesavka_common::lesavka::VideoPacket;
use winit::window::{Window, WindowAttributes};
use winit::event_loop::EventLoop;
use winit::{
event_loop::EventLoop,
window::{Window, WindowAttributes},
};
pub struct MonitorWindow {
id: u32,
id: u32,
_window: Window,
src: gst_app::AppSrc,
src: gst_app::AppSrc,
}
impl MonitorWindow {
pub fn new(id: u32, el: &EventLoop<()>) -> anyhow::Result<Self> {
gst::init()?;
gst::init()?; // idempotent
/*────────────────────── window ──────────────────────*/
let window = el.create_window(
WindowAttributes::default()
.with_title(format!("Lesavkamonitor{id}"))
.with_decorations(false)
.with_decorations(false),
)?;
/*────────────────────── pipeline ────────────────────*/
let caps = gst::Caps::builder("video/x-h264")
.field("stream-format", &"byte-stream")
.field("alignment", &"au")
.field("alignment", &"au")
.build();
// Optional HW decode with VAAPI if LESAVKA_HW_DEC is set.
let desc = if std::env::var_os("LESAVKA_HW_DEC").is_some() {
"appsrc name=src is-live=true format=time do-timestamp=true block=false ! \
capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=au ! \
queue max-size-buffers=0 max-size-bytes=0 max-size-time=0 leaky=downstream ! \
h264parse ! vaapih264dec low-latency=true ! videoconvert ! \
autovideosink sync=false max-lateness=-1"
concat!(
"appsrc name=src is-live=true format=time do-timestamp=true block=false ! ",
"capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=au ! ",
"queue max-size-buffers=0 max-size-bytes=0 max-size-time=0 leaky=downstream ! ",
"h264parse ! vaapih264dec low-latency=true ! videoconvert ! ",
"autovideosink sync=false",
)
} else {
"appsrc name=src is-live=true format=time do-timestamp=true block=false ! \
capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=au ! \
queue max-size-buffers=0 max-size-bytes=0 max-size-time=0 leaky=downstream ! \
h264parse ! decodebin ! videoconvert ! autovideosink sync=false max-lateness=-1"
concat!(
"appsrc name=src is-live=true format=time do-timestamp=true block=false ! ",
"capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=au ! ",
"queue max-size-buffers=0 max-size-bytes=0 max-size-time=0 leaky=downstream ! ",
"h264parse ! decodebin ! videoconvert ! autovideosink sync=false",
)
};
let pipeline = gst::parse::launch(desc)?
.downcast::<gst::Pipeline>()
.unwrap();
.expect("pipeline downcast");
let src = pipeline.by_name("src").unwrap()
.downcast::<gst_app::AppSrc>().unwrap();
let src = pipeline
.by_name("src")
.expect("appsink")
.downcast::<gst_app::AppSrc>()
.expect("appsink downcast");
src.set_caps(Some(&caps));
// use the dedicated helpers from `AppSrcExt` instead of the generic
// `set_property`, which returns `()` (hence the earlier E0277).
src.set_format(gst::Format::Time); // timestamps are in running-time
// “blocksize=0” → deliver whole access-units (no chunking). The generic
// `set_property()` API returns a `Result<(), glib::BoolError>` so we just
// unwrap: this property always exists on AppSrc.
src.set_property("blocksize", &0u32);
src.set_format(gst::Format::Time); // runningtime PTS
src.set_property("blocksize", &0u32); // whole AU per buffer
src.set_latency(gst::ClockTime::NONE, gst::ClockTime::NONE);
pipeline.set_state(gst::State::Playing)?;
@ -66,10 +68,12 @@ impl MonitorWindow {
Ok(Self { id, _window: window, src })
}
/// Push one encoded accessunit into the local pipeline.
pub fn push_packet(&self, pkt: VideoPacket) {
let mut buf = gst::Buffer::from_slice(pkt.data);
buf.get_mut().unwrap()
.set_pts(Some(gst::ClockTime::from_useconds(pkt.pts)));
let _ = self.src.push_buffer(buf);
if let Some(mut b) = buf.get_mut() {
b.set_pts(Some(gst::ClockTime::from_useconds(pkt.pts)));
}
let _ = self.src.push_buffer(buf); // ignore Eos / Flushing
}
}

View File

@ -104,7 +104,7 @@ After=network.target lesavka-core.service
[Service]
ExecStart=/usr/local/bin/lesavka-server
Restart=always
Environment=RUST_LOG=lesavka_server=info,lesavka_server::usb_gadget=info
Environment=RUST_LOG=lesavka_server=debug,lesavka_server::usb_gadget=info
Environment=RUST_BACKTRACE=1
Restart=always
RestartSec=5