diff --git a/client/src/app.rs b/client/src/app.rs index 02c2d7f..f72afc9 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -75,7 +75,7 @@ impl LesavkaClientApp { let suicide = async { if self.dev_mode { tokio::time::sleep(Duration::from_secs(30)).await; - warn!("dev‑mode timeout"); + warn!("💀 dev‑mode timeout 💀"); // self.aggregator.keyboards.dev.ungrab(); // self.aggregator.mice.dev.ungrab(); std::process::exit(0); @@ -189,23 +189,34 @@ impl LesavkaClientApp { } /*──────────────── monitor stream ────────────────*/ - async fn video_loop(ep: Channel, tx: tokio::sync::mpsc::UnboundedSender) { - loop { - let mut cli = RelayClient::new(ep.clone()); - for monitor_id in 0..=1 { + async fn video_loop( + ep: Channel, + tx: tokio::sync::mpsc::UnboundedSender, + ) { + for monitor_id in 0..=1 { + let tx = tx.clone(); + let ep = ep.clone(); + tokio::spawn(async move { + loop { + let mut cli = RelayClient::new(ep.clone()); let req = MonitorRequest { id: monitor_id, max_bitrate: 6_000 }; - if let Ok(mut stream) = cli.capture_video(Request::new(req)).await { - while let Some(pkt) = stream.get_mut().message().await.transpose() { - match pkt { - Ok(p) => { let _ = tx.send(p); } - Err(e) => { error!("video {monitor_id}: {e}"); break } + match cli.capture_video(Request::new(req)).await { + Ok(mut stream) => { + while let Some(pkt) = stream.get_mut().message().await.transpose() { + match pkt { + Ok(p) => { let _ = tx.send(p); } + Err(e) => { error!("video {monitor_id}: {e}"); break } + } } } + Err(e) => error!("video {monitor_id}: {e}"), } + tokio::time::sleep(Duration::from_secs(1)).await; } - tokio::time::sleep(Duration::from_secs(2)).await; + }); } } + #[inline(always)] async fn delay() { tokio::time::sleep(Duration::from_secs(1)).await; } diff --git a/client/src/input/mouse.rs b/client/src/input/mouse.rs index 3de7978..78d1d2f 100644 --- a/client/src/input/mouse.rs +++ b/client/src/input/mouse.rs @@ -7,7 +7,7 @@ use tracing::{debug, error, warn, trace}; use 
lesavka_common::lesavka::MouseReport; -const SEND_INTERVAL: Duration = Duration::from_micros(250); +const SEND_INTERVAL: Duration = Duration::from_micros(50); pub struct MouseAggregator { dev: Device, diff --git a/client/src/output/video.rs b/client/src/output/video.rs index f25cb4b..bf21337 100644 --- a/client/src/output/video.rs +++ b/client/src/output/video.rs @@ -24,26 +24,43 @@ impl MonitorWindow { .with_decorations(false) )?; - // appsrc -> decode -> convert -> autovideosink + let caps = gst::Caps::builder("video/x-h264") + .field("stream-format", &"byte-stream") + .field("alignment", &"au") + .build(); + let desc = if std::env::var_os("LESAVKA_HW_DEC").is_some() { - "appsrc name=src is-live=true format=time do-timestamp=true ! queue ! \ - h264parse ! vaapih264dec low-latency=true ! videoconvert ! \ - autovideosink sync=false" + "appsrc name=src is-live=true format=time do-timestamp=true block=false ! \ + capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=au ! \ + queue max-size-buffers=0 max-size-bytes=0 max-size-time=0 leaky=downstream ! \ + h264parse ! vaapih264dec low-latency=true ! videoconvert ! \ + autovideosink sync=false max-lateness=-1" } else { - // fallback - "appsrc name=src is-live=true format=time do-timestamp=true ! \ - queue ! h264parse ! decodebin ! videoconvert ! autovideosink sync=false" + "appsrc name=src is-live=true format=time do-timestamp=true block=false ! \ + capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=au ! \ + queue max-size-buffers=0 max-size-bytes=0 max-size-time=0 leaky=downstream ! \ + h264parse ! decodebin ! videoconvert ! autovideosink sync=false max-lateness=-1" }; let pipeline = gst::parse::launch(desc)? 
.downcast::<gst::Pipeline>() .unwrap(); - - let src = pipeline - .by_name("src").unwrap() + + let src = pipeline.by_name("src").unwrap() .downcast::<gst_app::AppSrc>().unwrap(); + src.set_caps(Some(&caps)); + + // use the dedicated helpers from `AppSrcExt` instead of the generic + // `set_property`, which returns `()` (hence the earlier E0277). + src.set_format(gst::Format::Time); // timestamps are in running-time + + // “blocksize=0” → deliver whole access-units (no chunking). Note: + // `set_property` returns `()` and panics if the property is missing; + // `blocksize` always exists on AppSrc, so the plain call is fine. + src.set_property("blocksize", &0u32); src.set_latency(gst::ClockTime::NONE, gst::ClockTime::NONE); + pipeline.set_state(gst::State::Playing)?; Ok(Self { id, _window: window, src }) diff --git a/server/src/main.rs b/server/src/main.rs index 30e79b2..fdcce9f 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -327,6 +327,7 @@ async fn main() -> anyhow::Result<()> { if let Err(e) = Server::builder() .tcp_nodelay(true) + .max_frame_size(Some(256 * 1024)) .add_service(RelayServer::new(handler)) .serve(([0, 0, 0, 0], 50051).into()) .await diff --git a/server/src/video.rs b/server/src/video.rs index b149dc0..d48787e 100644 --- a/server/src/video.rs +++ b/server/src/video.rs @@ -15,10 +15,13 @@ pub async fn spawn_camera( ) -> anyhow::Result>> { gst::init().context("gst init")?; - // v4l2src → H.264 already, we only parse & relay + // IMPORTANT: keep one AU per buffer, include regular SPS/PPS let desc = format!( - "v4l2src device={dev} io-mode=auto ! queue ! h264parse config-interval=-1 ! \ - appsink name=sink emit-signals=true sync=false" + "v4l2src device={dev} io-mode=dmabuf ! \ + video/x-h264,alignment=au,stream-format=byte-stream ! \ + queue max-size-buffers=0 max-size-bytes=0 max-size-time=0 ! \ + h264parse config-interval=1 ! \ + appsink name=sink emit-signals=true sync=false drop=true" );  let pipeline = gst::parse::launch(&desc)?