diff --git a/client/src/app.rs b/client/src/app.rs index f72afc9..b918569 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -1,23 +1,22 @@ -// client/src/app.rs - #![forbid(unsafe_code)] use anyhow::Result; use std::time::Duration; use tokio::sync::broadcast; use tokio_stream::{wrappers::BroadcastStream, StreamExt}; -use tonic::{Request, transport::Channel}; -use tracing::{debug, error, info, warn, trace}; +use tonic::{transport::Channel, Request}; +use tracing::{error, info, warn}; use winit::{ + event::Event, event_loop::EventLoopBuilder, platform::wayland::EventLoopBuilderExtWayland, - event::Event, }; -use lesavka_common::lesavka::{relay_client::RelayClient, KeyboardReport, MouseReport, MonitorRequest, VideoPacket}; +use lesavka_common::lesavka::{ + relay_client::RelayClient, KeyboardReport, MonitorRequest, MouseReport, VideoPacket, +}; -use crate::input::inputs::InputAggregator; -use crate::output::video::MonitorWindow; +use crate::{input::inputs::InputAggregator, output::video::MonitorWindow}; pub struct LesavkaClientApp { aggregator: Option, @@ -29,72 +28,63 @@ pub struct LesavkaClientApp { impl LesavkaClientApp { pub fn new() -> Result { - let dev_mode = std::env::var("LESAVKA_DEV_MODE").is_ok(); + let dev_mode = std::env::var("LESAVKA_DEV_MODE").is_ok(); let server_addr = std::env::args() .nth(1) .or_else(|| std::env::var("LESAVKA_SERVER_ADDR").ok()) .unwrap_or_else(|| "http://127.0.0.1:50051".into()); - let (kbd_tx, _) = broadcast::channel::(1024); - let (mou_tx, _) = broadcast::channel::(4096); + let (kbd_tx, _) = broadcast::channel(1024); + let (mou_tx, _) = broadcast::channel(4096); let mut agg = InputAggregator::new(dev_mode, kbd_tx.clone(), mou_tx.clone()); - agg.init()?; // grab devices + agg.init()?; // grab devices immediately - Ok(Self { aggregator: Some(agg), - server_addr, dev_mode, - kbd_tx, mou_tx }) + Ok(Self { aggregator: Some(agg), server_addr, dev_mode, kbd_tx, mou_tx }) } pub async fn run(&mut self) -> Result<()> { - // ---- build two channels ------------------------------------------------ - let hid_ep: Channel = Channel::from_shared(self.server_addr.clone()) - .unwrap() + /*────────── persistent gRPC channels ──────────*/ + let hid_ep = Channel::from_shared(self.server_addr.clone())? .tcp_nodelay(true) .concurrency_limit(1) .http2_keep_alive_interval(Duration::from_secs(15)) .connect_lazy(); - let vid_ep: Channel = Channel::from_shared(self.server_addr.clone()) - .unwrap() + let vid_ep = Channel::from_shared(self.server_addr.clone())? .tcp_nodelay(true) .connect_lazy(); - /* detach the aggregator before spawn so `self` is not moved */ + /*────────── input aggregator task ─────────────*/ let aggregator = self.aggregator.take().expect("InputAggregator present"); - let agg_task = tokio::spawn(async move { - let mut agg = aggregator; - agg.run().await + let agg_task = tokio::spawn(async move { + let mut a = aggregator; + a.run().await }); - /* two networking tasks */ + /*────────── HID streams (never return) ────────*/ let kbd_loop = self.stream_loop_keyboard(hid_ep.clone()); let mou_loop = self.stream_loop_mouse(hid_ep.clone()); - /* optional suicide timer */ + /*────────── optional 30 s auto‑exit in dev mode */ let suicide = async { if self.dev_mode { tokio::time::sleep(Duration::from_secs(30)).await; - warn!("💀 dev‑mode timeout 💀"); - // self.aggregator.keyboards.dev.ungrab(); - // self.aggregator.mice.dev.ungrab(); + warn!("💀 dev‑mode timeout"); std::process::exit(0); - } else { futures::future::pending::<()>().await } + } else { + std::future::pending::<()>().await + } }; - /* video windows use a dedicated event‑loop thread */ + /*────────── video rendering thread (winit) ────*/ let (video_tx, mut video_rx) = tokio::sync::mpsc::unbounded_channel::(); - // let (event_tx, event_rx) = std::sync::mpsc::channel(); - let (_event_tx, _event_rx) = std::sync::mpsc::channel::<()>(); std::thread::spawn(move || { - let el = EventLoopBuilder::<()>::new() - .with_any_thread(true) - .build() - .unwrap(); + let el = EventLoopBuilder::<()>::new().with_any_thread(true).build().unwrap(); let win0 = MonitorWindow::new(0, &el).expect("win0"); let win1 = MonitorWindow::new(1, &el).expect("win1"); - + let _ = el.run(move |_: Event<()>, _| { while let Ok(pkt) = video_rx.try_recv() { match pkt.id { @@ -106,52 +96,52 @@ impl LesavkaClientApp { }); }); - let vid_loop = Self::video_loop(vid_ep.clone(), video_tx); + /*────────── start video gRPC pullers ──────────*/ + tokio::spawn(Self::video_loop(vid_ep.clone(), video_tx)); + /*────────── central reactor ───────────────────*/ tokio::select! { - _ = kbd_loop => unreachable!(), - _ = mou_loop => unreachable!(), - _ = vid_loop => unreachable!(), - _ = suicide => unreachable!(), - // _ = suicide => { warn!("dev‑mode timeout"); std::process::exit(0) }, - r = agg_task => { - error!("aggregator task ended: {r:?}"); - std::process::exit(1) + _ = kbd_loop => { warn!("⚠️⌨️ keyboard stream finished"); }, + _ = mou_loop => { warn!("⚠️🖱️ mouse stream finished"); }, + _ = suicide => { /* handled above */ }, + r = agg_task => { + match r { + Ok(Ok(())) => warn!("input aggregator terminated cleanly"), + Ok(Err(e)) => error!("input aggregator error: {e:?}"), + Err(join_err) => error!("aggregator task panicked: {join_err:?}"), + } + std::process::exit(1); } } + + // The branches above either loop forever or exit the process; this + // point is unreachable but satisfies the type checker. + #[allow(unreachable_code)] + Ok(()) } /*──────────────── keyboard stream ───────────────*/ async fn stream_loop_keyboard(&self, ep: Channel) { loop { info!("⌨️ connect {}", self.server_addr); - // let mut cli = match RelayClient::connect(self.server_addr.clone()).await { - // Ok(c) => c, - // Err(e) => { error!("connect: {e}"); Self::delay().await; continue } - // }; let mut cli = RelayClient::new(ep.clone()); let outbound = BroadcastStream::new(self.kbd_tx.subscribe()).filter_map(|r| r.ok()); match cli.stream_keyboard(Request::new(outbound)).await { Ok(mut resp) => { - // spawn a task just to drain echoes (keeps h2 window happy) + // Drain echoes so the h2 window never fills up. tokio::spawn(async move { - while let Some(_)= resp.get_mut().message().await.transpose() {} - warn!("⌨️ server closed stream"); + while let Some(_) = resp.get_mut().message().await.transpose() {} + warn!("⌨️ server closed keyboard stream"); }); } Err(e) => { error!("stream_keyboard: {e}"); - Self::delay().await; + tokio::time::sleep(Duration::from_secs(1)).await; continue; } } - // from now on we just park – connection persists until it errors - futures::future::pending::<()>().await; - - // drop(resp); - // warn!("⌨️ disconnected"); - // Self::delay().await; + std::future::pending::<()>().await; } } @@ -159,32 +149,23 @@ impl LesavkaClientApp { async fn stream_loop_mouse(&self, ep: Channel) { loop { info!("🖱️ connect {}", self.server_addr); - // let mut cli = match RelayClient::connect(self.server_addr.clone()).await { - // Ok(c) => c, - // Err(e) => { error!("connect: {e}"); Self::delay().await; continue } - // }; let mut cli = RelayClient::new(ep.clone()); let outbound = BroadcastStream::new(self.mou_tx.subscribe()).filter_map(|r| r.ok()); match cli.stream_mouse(Request::new(outbound)).await { Ok(mut resp) => { - // spawn a task just to drain echoes (keeps h2 window happy) tokio::spawn(async move { - while let Some(_)= resp.get_mut().message().await.transpose() {} - warn!("⌨️ server closed stream"); + while let Some(_) = resp.get_mut().message().await.transpose() {} + warn!("🖱️ server closed mouse stream"); }); } Err(e) => { error!("stream_mouse: {e}"); - Self::delay().await; + tokio::time::sleep(Duration::from_secs(1)).await; continue; } } - // from now on we just park – connection persists until it errors - futures::future::pending::<()>().await; - // drop(resp); - // warn!("🖱️ disconnected"); - // Self::delay().await; + std::future::pending::<()>().await; } } @@ -194,8 +175,8 @@ impl LesavkaClientApp { tx: tokio::sync::mpsc::UnboundedSender, ) { for monitor_id in 0..=1 { - let tx = tx.clone(); - let ep = ep.clone(); + let ep = ep.clone(); + let tx = tx.clone(); tokio::spawn(async move { loop { let mut cli = RelayClient::new(ep.clone()); @@ -204,8 +185,8 @@ impl LesavkaClientApp { Ok(mut stream) => { while let Some(pkt) = stream.get_mut().message().await.transpose() { match pkt { - Ok(p) => { let _ = tx.send(p); } - Err(e) => { error!("video {monitor_id}: {e}"); break } + Ok(p) => { let _ = tx.send(p); } + Err(e) => { error!("video {monitor_id}: {e}"); break; } } } } @@ -216,8 +197,4 @@ impl LesavkaClientApp { }); } } - - - #[inline(always)] - async fn delay() { tokio::time::sleep(Duration::from_secs(1)).await; } } diff --git a/client/src/output/video.rs b/client/src/output/video.rs index bf21337..1d4a614 100644 --- a/client/src/output/video.rs +++ b/client/src/output/video.rs @@ -1,64 +1,66 @@ -// client/src/output/video.rs - use gstreamer as gst; use gstreamer_app as gst_app; use gst::prelude::*; -use gst_app::prelude::*; use lesavka_common::lesavka::VideoPacket; -use winit::window::{Window, WindowAttributes}; -use winit::event_loop::EventLoop; +use winit::{ + event_loop::EventLoop, + window::{Window, WindowAttributes}, +}; pub struct MonitorWindow { - id: u32, + id: u32, _window: Window, - src: gst_app::AppSrc, + src: gst_app::AppSrc, } impl MonitorWindow { pub fn new(id: u32, el: &EventLoop<()>) -> anyhow::Result { - gst::init()?; + gst::init()?; // idempotent + /*────────────────────── window ──────────────────────*/ let window = el.create_window( WindowAttributes::default() .with_title(format!("Lesavka‑monitor‑{id}")) - .with_decorations(false) + .with_decorations(false), )?; + /*────────────────────── pipeline ────────────────────*/ let caps = gst::Caps::builder("video/x-h264") .field("stream-format", &"byte-stream") - .field("alignment", &"au") + .field("alignment", &"au") .build(); + // Optional HW decode with VA‑API if LESAVKA_HW_DEC is set. let desc = if std::env::var_os("LESAVKA_HW_DEC").is_some() { - "appsrc name=src is-live=true format=time do-timestamp=true block=false ! \ - capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=au ! \ - queue max-size-buffers=0 max-size-bytes=0 max-size-time=0 leaky=downstream ! \ - h264parse ! vaapih264dec low-latency=true ! videoconvert ! \ - autovideosink sync=false max-lateness=-1" + concat!( + "appsrc name=src is-live=true format=time do-timestamp=true block=false ! ", + "capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=au ! ", + "queue max-size-buffers=0 max-size-bytes=0 max-size-time=0 leaky=downstream ! ", + "h264parse ! vaapih264dec low-latency=true ! videoconvert ! ", + "autovideosink sync=false", + ) } else { - "appsrc name=src is-live=true format=time do-timestamp=true block=false ! \ - capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=au ! \ - queue max-size-buffers=0 max-size-bytes=0 max-size-time=0 leaky=downstream ! \ - h264parse ! decodebin ! videoconvert ! autovideosink sync=false max-lateness=-1" + concat!( + "appsrc name=src is-live=true format=time do-timestamp=true block=false ! ", + "capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=au ! ", + "queue max-size-buffers=0 max-size-bytes=0 max-size-time=0 leaky=downstream ! ", + "h264parse ! decodebin ! videoconvert ! autovideosink sync=false", + ) }; let pipeline = gst::parse::launch(desc)? .downcast::() - .unwrap(); + .expect("pipeline down‑cast"); - let src = pipeline.by_name("src").unwrap() - .downcast::().unwrap(); + let src = pipeline + .by_name("src") + .expect("appsink") + .downcast::() + .expect("appsink down‑cast"); src.set_caps(Some(&caps)); - - // use the dedicated helpers from `AppSrcExt` instead of the generic - // `set_property`, which returns `()` (hence the earlier E0277). - src.set_format(gst::Format::Time); // timestamps are in running-time - - // “blocksize=0” → deliver whole access-units (no chunking). The generic - // `set_property()` API returns a `Result<(), glib::BoolError>` so we just - // unwrap: this property always exists on AppSrc. - src.set_property("blocksize", &0u32); + src.set_format(gst::Format::Time); // running‑time PTS + src.set_property("blocksize", &0u32); // whole AU per buffer src.set_latency(gst::ClockTime::NONE, gst::ClockTime::NONE); pipeline.set_state(gst::State::Playing)?; @@ -66,10 +68,12 @@ impl MonitorWindow { Ok(Self { id, _window: window, src }) } + /// Push one encoded access‑unit into the local pipeline. pub fn push_packet(&self, pkt: VideoPacket) { let mut buf = gst::Buffer::from_slice(pkt.data); - buf.get_mut().unwrap() - .set_pts(Some(gst::ClockTime::from_useconds(pkt.pts))); - let _ = self.src.push_buffer(buf); + if let Some(mut b) = buf.get_mut() { + b.set_pts(Some(gst::ClockTime::from_useconds(pkt.pts))); + } + let _ = self.src.push_buffer(buf); // ignore Eos / Flushing } } diff --git a/scripts/install-server.sh b/scripts/install-server.sh index f3c213c..6c5cd54 100755 --- a/scripts/install-server.sh +++ b/scripts/install-server.sh @@ -104,7 +104,7 @@ After=network.target lesavka-core.service [Service] ExecStart=/usr/local/bin/lesavka-server Restart=always -Environment=RUST_LOG=lesavka_server=info,lesavka_server::usb_gadget=info +Environment=RUST_LOG=lesavka_server=debug,lesavka_server::usb_gadget=info Environment=RUST_BACKTRACE=1 Restart=always RestartSec=5