diff --git a/client/src/app.rs b/client/src/app.rs index 2f425c3..c9d2a80 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -6,7 +6,7 @@ use anyhow::Result; use std::time::Duration; use tokio::sync::broadcast; use tokio_stream::{wrappers::BroadcastStream, StreamExt}; -use tonic::Request; +use tonic::{Request, transport::Channel}; use tracing::{debug, error, info, warn, trace}; use winit::{ event_loop::EventLoopBuilder, @@ -29,10 +29,10 @@ pub struct LesavkaClientApp { impl LesavkaClientApp { pub fn new() -> Result { - let dev_mode = std::env::var("NAVKA_DEV_MODE").is_ok(); + let dev_mode = std::env::var("LESAVKA_DEV_MODE").is_ok(); let server_addr = std::env::args() .nth(1) - .or_else(|| std::env::var("NAVKA_SERVER_ADDR").ok()) + .or_else(|| std::env::var("LESAVKA_SERVER_ADDR").ok()) .unwrap_or_else(|| "http://127.0.0.1:50051".into()); let (kbd_tx, _) = broadcast::channel::(1024); @@ -47,6 +47,19 @@ impl LesavkaClientApp { } pub async fn run(&mut self) -> Result<()> { + // ---- build two channels ------------------------------------------------ + let hid_ep: Channel = Channel::from_shared(self.server_addr.clone()) + .unwrap() + .tcp_nodelay(true) + .concurrency_limit(1) + .http2_keep_alive_interval(Duration::from_secs(15)) + .connect_lazy(); + + let vid_ep: Channel = Channel::from_shared(self.server_addr.clone()) + .unwrap() + .tcp_nodelay(true) + .connect_lazy(); + /* detach the aggregator before spawn so `self` is not moved */ let aggregator = self.aggregator.take().expect("InputAggregator present"); let agg_task = tokio::spawn(async move { @@ -55,8 +68,8 @@ impl LesavkaClientApp { }); /* two networking tasks */ - let kbd_loop = self.stream_loop_keyboard(); - let mou_loop = self.stream_loop_mouse(); + let kbd_loop = self.stream_loop_keyboard(hid_ep.clone()); + let mou_loop = self.stream_loop_mouse(hid_ep.clone()); /* optional suicide timer */ let suicide = async { @@ -93,7 +106,7 @@ impl LesavkaClientApp { }); }); - let vid_loop = Self::video_loop(self.server_addr.clone(), video_tx); + let vid_loop = Self::video_loop(vid_ep.clone(), video_tx); tokio::select! { _ = kbd_loop => unreachable!(), @@ -109,82 +122,79 @@ impl LesavkaClientApp { } /*──────────────── keyboard stream ───────────────*/ - async fn stream_loop_keyboard(&self) { + async fn stream_loop_keyboard(&self, ep: Channel) { loop { info!("⌨️ connect {}", self.server_addr); - let mut cli = match RelayClient::connect(self.server_addr.clone()).await { - Ok(c) => c, - Err(e) => { error!("connect: {e}"); Self::delay().await; continue } - }; + // let mut cli = match RelayClient::connect(self.server_addr.clone()).await { + // Ok(c) => c, + // Err(e) => { error!("connect: {e}"); Self::delay().await; continue } + // }; + let mut cli = RelayClient::new(ep.clone()); let outbound = BroadcastStream::new(self.kbd_tx.subscribe()).filter_map(|r| r.ok()); let resp = match cli.stream_keyboard(Request::new(outbound)).await { Ok(r) => r, Err(e) => { error!("stream_keyboard: {e}"); Self::delay().await; continue } }; - let mut inbound = resp.into_inner(); - while let Some(m) = inbound.message().await.transpose() { - match m { - Ok(r) => trace!("kbd echo {} B", r.data.len()), - Err(e) => { error!("kbd inbound: {e}"); break } - } - } + // let mut inbound = resp.into_inner(); + // while let Some(m) = inbound.message().await.transpose() { + // match m { + // Ok(r) => trace!("kbd echo {} B", r.data.len()), + // Err(e) => { error!("kbd inbound: {e}"); break } + // } + // } + drop(resp); warn!("⌨️ disconnected"); Self::delay().await; } } /*──────────────── mouse stream ──────────────────*/ - async fn stream_loop_mouse(&self) { + async fn stream_loop_mouse(&self, ep: Channel) { loop { info!("🖱️ connect {}", self.server_addr); - let mut cli = match RelayClient::connect(self.server_addr.clone()).await { - Ok(c) => c, - Err(e) => { error!("connect: {e}"); Self::delay().await; continue } - }; + // let mut cli = match RelayClient::connect(self.server_addr.clone()).await { + // Ok(c) => c, + // Err(e) => { error!("connect: {e}"); Self::delay().await; continue } + // }; + let mut cli = RelayClient::new(ep.clone()); let outbound = BroadcastStream::new(self.mou_tx.subscribe()).filter_map(|r| r.ok()); let resp = match cli.stream_mouse(Request::new(outbound)).await { Ok(r) => r, Err(e) => { error!("stream_mouse: {e}"); Self::delay().await; continue } }; - let mut inbound = resp.into_inner(); - while let Some(m) = inbound.message().await.transpose() { - match m { - Ok(r) => trace!("mouse echo {} B", r.data.len()), - Err(e) => { error!("mouse inbound: {e}"); break } - } - } + // let mut inbound = resp.into_inner(); + // while let Some(m) = inbound.message().await.transpose() { + // match m { + // Ok(r) => trace!("mouse echo {} B", r.data.len()), + // Err(e) => { error!("mouse inbound: {e}"); break } + // } + // } + drop(resp); warn!("🖱️ disconnected"); Self::delay().await; } } /*──────────────── monitor stream ────────────────*/ - async fn video_loop(addr: String, tx: tokio::sync::mpsc::UnboundedSender) { + async fn video_loop(ep: Channel, tx: tokio::sync::mpsc::UnboundedSender) { loop { - match RelayClient::connect(addr.clone()).await { - Ok(mut cli) => { - for monitor_id in 0..=1 { - let req = MonitorRequest { id: monitor_id, max_bitrate: 6_000 }; - match cli.capture_video(Request::new(req)).await { - Ok(mut stream) => { - while let Some(pkt) = stream.get_mut().message().await.transpose() { - match pkt { - Ok(p) => { let _ = tx.send(p); }, - Err(e) => { error!("video {monitor_id}: {e}"); break } - } - } + let mut cli = RelayClient::new(ep.clone()); + for monitor_id in 0..=1 { + let req = MonitorRequest { id: monitor_id, max_bitrate: 6_000 }; + if let Ok(mut stream) = cli.capture_video(Request::new(req)).await { + while let Some(pkt) = stream.get_mut().message().await.transpose() { + match pkt { + Ok(p) => { let _ = tx.send(p); } + Err(e) => { error!("video {monitor_id}: {e}"); break } } - Err(e) => error!("video {monitor_id}: {e}"), } } } - Err(e) => error!("video connect: {e}"), - } tokio::time::sleep(Duration::from_secs(2)).await; } - } + } #[inline(always)] async fn delay() { tokio::time::sleep(Duration::from_secs(1)).await; } diff --git a/client/src/input/keyboard.rs b/client/src/input/keyboard.rs index 9b42676..1778919 100644 --- a/client/src/input/keyboard.rs +++ b/client/src/input/keyboard.rs @@ -3,7 +3,7 @@ use std::{collections::HashSet, sync::atomic::{AtomicU32, Ordering}}; use evdev::{Device, EventType, InputEvent, KeyCode}; use tokio::sync::broadcast::Sender; -use tracing::{debug, error, warn}; +use tracing::{debug, error, warn, trace}; use lesavka_common::lesavka::KeyboardReport; diff --git a/client/src/input/mouse.rs b/client/src/input/mouse.rs index 7f39b8c..96568f9 100644 --- a/client/src/input/mouse.rs +++ b/client/src/input/mouse.rs @@ -2,14 +2,18 @@ use evdev::{Device, EventType, InputEvent, KeyCode, RelativeAxisCode}; use tokio::sync::broadcast::{self, Sender}; -use tracing::{debug, error, warn}; +use std::time::{Duration, Instant}; +use tracing::{debug, error, warn, trace}; use lesavka_common::lesavka::MouseReport; +const SEND_INTERVAL: Duration = Duration::from_micros(2000); + pub struct MouseAggregator { dev: Device, tx: Sender, dev_mode: bool, + next_send: Instant, buttons: u8, last_buttons: u8, @@ -20,7 +24,7 @@ pub struct MouseAggregator { impl MouseAggregator { pub fn new(dev: Device, dev_mode: bool, tx: Sender) -> Self { - Self { dev, tx, dev_mode, buttons:0, last_buttons:0, dx:0, dy:0, wheel:0 } + Self { dev, tx, dev_mode, next_send: Instant::now(), buttons:0, last_buttons:0, dx:0, dy:0, wheel:0 } } #[inline] fn slog(&self, f: impl FnOnce()) { if self.dev_mode { f() } } @@ -60,9 +64,8 @@ impl MouseAggregator { } fn flush(&mut self) { - if self.dx==0 && self.dy==0 && self.wheel==0 && self.buttons==self.last_buttons { - return; - } + if Instant::now() < self.next_send { return; } + self.next_send += SEND_INTERVAL; let pkt = [ self.buttons, diff --git a/client/src/main.rs b/client/src/main.rs index a7b2125..5e13f9b 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -21,7 +21,7 @@ fn ensure_runtime_dir() { } } -#[tokio::main] +#[tokio::main(flavor = "current_thread")] async fn main() -> Result<()> { ensure_runtime_dir(); @@ -41,7 +41,7 @@ async fn main() -> Result<()> { .with_thread_ids(true) .with_file(true); - let dev_mode = env::var("NAVKA_DEV_MODE").is_ok(); + let dev_mode = env::var("LESAVKA_DEV_MODE").is_ok(); let mut _guard: Option = None; // keep guard alive /*------------- subscriber setup -----------------------------------*/ diff --git a/client/src/output/video.rs b/client/src/output/video.rs index d8c400d..f25cb4b 100644 --- a/client/src/output/video.rs +++ b/client/src/output/video.rs @@ -25,14 +25,14 @@ impl MonitorWindow { )?; // appsrc -> decode -> convert -> autovideosink - let desc = if std::env::var_os("XDG_RUNTIME_DIR").is_some() { - // graphical + let desc = if std::env::var_os("LESAVKA_HW_DEC").is_some() { + "appsrc name=src is-live=true format=time do-timestamp=true ! queue ! \ + h264parse ! vaapih264dec low-latency=true ! videoconvert ! \ + autovideosink sync=false" + } else { + // fallback "appsrc name=src is-live=true format=time do-timestamp=true ! \ queue ! h264parse ! decodebin ! videoconvert ! autovideosink sync=false" - } else { - // headless / debugging over ssh - "appsrc name=src is-live=true format=time do-timestamp=true ! \ - fakesink sync=false" }; let pipeline = gst::parse::launch(desc)? diff --git a/scripts/install-client.sh b/scripts/install-client.sh index 3df9053..a35f2ce 100755 --- a/scripts/install-client.sh +++ b/scripts/install-client.sh @@ -38,8 +38,8 @@ User=root Group=root Environment=RUST_LOG=debug -Environment=NAVKA_DEV_MODE=1 -Environment=NAVKA_SERVER_ADDR=http://64.25.10.31:50051 +Environment=LESAVKA_DEV_MODE=1 +Environment=LESAVKA_SERVER_ADDR=http://64.25.10.31:50051 ExecStart=/usr/local/bin/lesavka-client Restart=no diff --git a/server/src/main.rs b/server/src/main.rs index 4e4bd31..1468041 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -9,11 +9,10 @@ use tokio::{fs::{OpenOptions}, io::AsyncWriteExt, sync::Mutex}; use tokio_stream::{wrappers::ReceiverStream}; use tonic::{transport::Server, Request, Response, Status}; use anyhow::Context as _; -use tracing::{info, trace, warn, error}; +use tracing::{info, trace, error}; use tracing_subscriber::{filter::EnvFilter, fmt, prelude::*}; use tracing_appender::non_blocking; use tracing_appender::non_blocking::WorkerGuard; -use udev::{MonitorBuilder}; use lesavka_server::{usb_gadget::UsbGadget, video}; @@ -144,31 +143,48 @@ impl Relay for Handler { .map_err(|e| Status::internal(e.to_string()))?; tokio::time::sleep(Duration::from_millis(500)).await; } - let (tx, rx) = tokio::sync::mpsc::channel(32); + let (tx, _rx) = + tokio::sync::mpsc::channel::>(32); let kb = self.kb.clone(); tokio::spawn(async move { let mut s = req.into_inner(); while let Some(pkt) = s.next().await.transpose()? { // kb.lock().await.write_all(&pkt.data).await?; - loop { - match kb.lock().await.write_all(&pkt.data).await { - Ok(()) => { - trace!("⌨️ wrote {}", pkt.data.iter() - .map(|b| format!("{b:02X}")).collect::>().join(" ")); + const SPINS: usize = 20; + for _ in 0..SPINS { + match kb.lock().await.write(&pkt.data).await { + // Ok(n) if n == pkt.data.len() => { + // trace!("⌨️ wrote {}", pkt.data.iter() + // .map(|b| format!("{b:02X}")).collect::>().join(" ")); + // break; + // }, + // Ok(_) | Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + // std::hint::spin_loop(); + // continue; // try again + // } + // Err(e) + // if matches!(e.raw_os_error(), + // Some(libc::EBUSY) | // still opening + // Some(libc::ENODEV) | // gadget not‑yet configured + // Some(libc::EPIPE) | // host vanished + // Some(libc::EINVAL) | // host hasn’t accepted EP config yet + // Some(libc::EAGAIN)) // non‑blocking + // => { + // tokio::time::sleep(Duration::from_millis(10)).await; + // continue; + // } + // Err(e) => return Err(Status::internal(e.to_string())), + Ok(n) if n == pkt.data.len() => { // success + trace!("⌨️ wrote {}", pkt.data.iter().map(|b| format!("{b:02X}")).collect::>().join(" ")); break; - }, - Err(e) - if matches!(e.raw_os_error(), - Some(libc::EBUSY) | // still opening - Some(libc::ENODEV) | // gadget not‑yet configured - Some(libc::EPIPE) | // host vanished - Some(libc::EINVAL) | // host hasn’t accepted EP config yet - Some(libc::EAGAIN)) // non‑blocking - => { - tokio::time::sleep(Duration::from_millis(10)).await; + } + Ok(_) => continue, + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + std::hint::spin_loop(); continue; } + Err(ref e) if e.kind() == std::io::ErrorKind::BrokenPipe => break, Err(e) => return Err(Status::internal(e.to_string())), } } @@ -177,14 +193,17 @@ impl Relay for Handler { Ok::<(), Status>(()) }); - Ok(Response::new(ReceiverStream::new(rx))) + let (_noop_tx, empty_rx) = + tokio::sync::mpsc::channel::>(1); + Ok(Response::new(ReceiverStream::new(empty_rx))) } async fn stream_mouse( &self, req: Request>, ) -> Result, Status> { - let (tx, rx) = tokio::sync::mpsc::channel(4096); // higher burst + let (tx, _rx) = + tokio::sync::mpsc::channel::>(4096); let ms = self.ms.clone(); tokio::spawn(async move { @@ -192,23 +211,38 @@ impl Relay for Handler { let mut boot_mode = true; while let Some(pkt) = s.next().await.transpose()? { loop { - match ms.lock().await.write_all(&pkt.data).await { - Ok(()) => { - trace!("🖱️ wrote {}", pkt.data.iter() - .map(|b| format!("{b:02X}")).collect::>().join(" ")); + match ms.lock().await.write(&pkt.data).await { + // Ok(n) if n == pkt.data.len() => { + // trace!("🖱️ wrote {}", pkt.data.iter() + // .map(|b| format!("{b:02X}")).collect::>().join(" ")); + // break; + // } + // Ok(_) | Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + // std::hint::spin_loop(); + // continue; // try again + // } + // Err(e) + // if matches!(e.raw_os_error(), + // Some(libc::EBUSY) | // still opening + // Some(libc::ENODEV) | // gadget not‑yet configured + // Some(libc::EPIPE) | // host vanished + // Some(libc::EINVAL) | // host hasn’t accepted EP config yet + // Some(libc::EAGAIN)) // non‑blocking + // => { + // tokio::time::sleep(Duration::from_millis(10)).await; + // continue; + // } + // Err(e) => return Err(Status::internal(e.to_string())), + Ok(n) if n == pkt.data.len() => { // success + trace!("⌨️ wrote {}", pkt.data.iter().map(|b| format!("{b:02X}")).collect::>().join(" ")); break; } - Err(e) - if matches!(e.raw_os_error(), - Some(libc::EBUSY) | // still opening - Some(libc::ENODEV) | // gadget not‑yet configured - Some(libc::EPIPE) | // host vanished - Some(libc::EINVAL) | // host hasn’t accepted EP config yet - Some(libc::EAGAIN)) // non‑blocking - => { - tokio::time::sleep(Duration::from_millis(10)).await; + Ok(_) => continue, + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + std::hint::spin_loop(); continue; } + Err(ref e) if e.kind() == std::io::ErrorKind::BrokenPipe => break, Err(e) => return Err(Status::internal(e.to_string())), } } @@ -217,7 +251,9 @@ impl Relay for Handler { Ok::<(), Status>(()) }); - Ok(Response::new(ReceiverStream::new(rx))) + let (_noop_tx, empty_rx) = + tokio::sync::mpsc::channel::>(1); + Ok(Response::new(ReceiverStream::new(empty_rx))) } async fn capture_video( @@ -294,6 +330,7 @@ async fn main() -> anyhow::Result<()> { info!("🌐 lesavka‑server listening on 0.0.0.0:50051"); if let Err(e) = Server::builder() + .tcp_nodelay(true) .add_service(RelayServer::new(handler)) .serve(([0, 0, 0, 0], 50051).into()) .await