This commit is contained in:
Brad Stein 2025-06-26 14:05:23 -05:00
parent dd9df940db
commit 1110c59f1e
7 changed files with 147 additions and 97 deletions

View File

@ -6,7 +6,7 @@ use anyhow::Result;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use tonic::Request;
use tonic::{Request, transport::Channel};
use tracing::{debug, error, info, warn, trace};
use winit::{
event_loop::EventLoopBuilder,
@ -29,10 +29,10 @@ pub struct LesavkaClientApp {
impl LesavkaClientApp {
pub fn new() -> Result<Self> {
let dev_mode = std::env::var("NAVKA_DEV_MODE").is_ok();
let dev_mode = std::env::var("LESAVKA_DEV_MODE").is_ok();
let server_addr = std::env::args()
.nth(1)
.or_else(|| std::env::var("NAVKA_SERVER_ADDR").ok())
.or_else(|| std::env::var("LESAVKA_SERVER_ADDR").ok())
.unwrap_or_else(|| "http://127.0.0.1:50051".into());
let (kbd_tx, _) = broadcast::channel::<KeyboardReport>(1024);
@ -47,6 +47,19 @@ impl LesavkaClientApp {
}
pub async fn run(&mut self) -> Result<()> {
// ---- build two channels ------------------------------------------------
let hid_ep: Channel = Channel::from_shared(self.server_addr.clone())
.unwrap()
.tcp_nodelay(true)
.concurrency_limit(1)
.http2_keep_alive_interval(Duration::from_secs(15))
.connect_lazy();
let vid_ep: Channel = Channel::from_shared(self.server_addr.clone())
.unwrap()
.tcp_nodelay(true)
.connect_lazy();
/* detach the aggregator before spawn so `self` is not moved */
let aggregator = self.aggregator.take().expect("InputAggregator present");
let agg_task = tokio::spawn(async move {
@ -55,8 +68,8 @@ impl LesavkaClientApp {
});
/* two networking tasks */
let kbd_loop = self.stream_loop_keyboard();
let mou_loop = self.stream_loop_mouse();
let kbd_loop = self.stream_loop_keyboard(hid_ep.clone());
let mou_loop = self.stream_loop_mouse(hid_ep.clone());
/* optional suicide timer */
let suicide = async {
@ -93,7 +106,7 @@ impl LesavkaClientApp {
});
});
let vid_loop = Self::video_loop(self.server_addr.clone(), video_tx);
let vid_loop = Self::video_loop(vid_ep.clone(), video_tx);
tokio::select! {
_ = kbd_loop => unreachable!(),
@ -109,82 +122,79 @@ impl LesavkaClientApp {
}
/*──────────────── keyboard stream ───────────────*/
async fn stream_loop_keyboard(&self) {
async fn stream_loop_keyboard(&self, ep: Channel) {
loop {
info!("⌨️ connect {}", self.server_addr);
let mut cli = match RelayClient::connect(self.server_addr.clone()).await {
Ok(c) => c,
Err(e) => { error!("connect: {e}"); Self::delay().await; continue }
};
// let mut cli = match RelayClient::connect(self.server_addr.clone()).await {
// Ok(c) => c,
// Err(e) => { error!("connect: {e}"); Self::delay().await; continue }
// };
let mut cli = RelayClient::new(ep.clone());
let outbound = BroadcastStream::new(self.kbd_tx.subscribe()).filter_map(|r| r.ok());
let resp = match cli.stream_keyboard(Request::new(outbound)).await {
Ok(r) => r, Err(e) => { error!("stream_keyboard: {e}"); Self::delay().await; continue }
};
let mut inbound = resp.into_inner();
while let Some(m) = inbound.message().await.transpose() {
match m {
Ok(r) => trace!("kbd echo {}B", r.data.len()),
Err(e) => { error!("kbd inbound: {e}"); break }
}
}
// let mut inbound = resp.into_inner();
// while let Some(m) = inbound.message().await.transpose() {
// match m {
// Ok(r) => trace!("kbd echo {}B", r.data.len()),
// Err(e) => { error!("kbd inbound: {e}"); break }
// }
// }
drop(resp);
warn!("⌨️ disconnected");
Self::delay().await;
}
}
/*──────────────── mouse stream ──────────────────*/
async fn stream_loop_mouse(&self) {
async fn stream_loop_mouse(&self, ep: Channel) {
loop {
info!("🖱️ connect {}", self.server_addr);
let mut cli = match RelayClient::connect(self.server_addr.clone()).await {
Ok(c) => c,
Err(e) => { error!("connect: {e}"); Self::delay().await; continue }
};
// let mut cli = match RelayClient::connect(self.server_addr.clone()).await {
// Ok(c) => c,
// Err(e) => { error!("connect: {e}"); Self::delay().await; continue }
// };
let mut cli = RelayClient::new(ep.clone());
let outbound = BroadcastStream::new(self.mou_tx.subscribe()).filter_map(|r| r.ok());
let resp = match cli.stream_mouse(Request::new(outbound)).await {
Ok(r) => r, Err(e) => { error!("stream_mouse: {e}"); Self::delay().await; continue }
};
let mut inbound = resp.into_inner();
while let Some(m) = inbound.message().await.transpose() {
match m {
Ok(r) => trace!("mouse echo {}B", r.data.len()),
Err(e) => { error!("mouse inbound: {e}"); break }
}
}
// let mut inbound = resp.into_inner();
// while let Some(m) = inbound.message().await.transpose() {
// match m {
// Ok(r) => trace!("mouse echo {}B", r.data.len()),
// Err(e) => { error!("mouse inbound: {e}"); break }
// }
// }
drop(resp);
warn!("🖱️ disconnected");
Self::delay().await;
}
}
/*──────────────── monitor stream ────────────────*/
async fn video_loop(addr: String, tx: tokio::sync::mpsc::UnboundedSender<VideoPacket>) {
async fn video_loop(ep: Channel, tx: tokio::sync::mpsc::UnboundedSender<VideoPacket>) {
loop {
match RelayClient::connect(addr.clone()).await {
Ok(mut cli) => {
for monitor_id in 0..=1 {
let req = MonitorRequest { id: monitor_id, max_bitrate: 6_000 };
match cli.capture_video(Request::new(req)).await {
Ok(mut stream) => {
while let Some(pkt) = stream.get_mut().message().await.transpose() {
match pkt {
Ok(p) => { let _ = tx.send(p); },
Err(e) => { error!("video {monitor_id}: {e}"); break }
}
}
let mut cli = RelayClient::new(ep.clone());
for monitor_id in 0..=1 {
let req = MonitorRequest { id: monitor_id, max_bitrate: 6_000 };
if let Ok(mut stream) = cli.capture_video(Request::new(req)).await {
while let Some(pkt) = stream.get_mut().message().await.transpose() {
match pkt {
Ok(p) => { let _ = tx.send(p); }
Err(e) => { error!("video {monitor_id}: {e}"); break }
}
Err(e) => error!("video {monitor_id}: {e}"),
}
}
}
Err(e) => error!("video connect: {e}"),
}
tokio::time::sleep(Duration::from_secs(2)).await;
}
}
}
#[inline(always)]
async fn delay() { tokio::time::sleep(Duration::from_secs(1)).await; }

View File

@ -3,7 +3,7 @@
use std::{collections::HashSet, sync::atomic::{AtomicU32, Ordering}};
use evdev::{Device, EventType, InputEvent, KeyCode};
use tokio::sync::broadcast::Sender;
use tracing::{debug, error, warn};
use tracing::{debug, error, warn, trace};
use lesavka_common::lesavka::KeyboardReport;

View File

@ -2,14 +2,18 @@
use evdev::{Device, EventType, InputEvent, KeyCode, RelativeAxisCode};
use tokio::sync::broadcast::{self, Sender};
use tracing::{debug, error, warn};
use std::time::{Duration, Instant};
use tracing::{debug, error, warn, trace};
use lesavka_common::lesavka::MouseReport;
const SEND_INTERVAL: Duration = Duration::from_micros(2000);
pub struct MouseAggregator {
dev: Device,
tx: Sender<MouseReport>,
dev_mode: bool,
next_send: Instant,
buttons: u8,
last_buttons: u8,
@ -20,7 +24,7 @@ pub struct MouseAggregator {
impl MouseAggregator {
pub fn new(dev: Device, dev_mode: bool, tx: Sender<MouseReport>) -> Self {
Self { dev, tx, dev_mode, buttons:0, last_buttons:0, dx:0, dy:0, wheel:0 }
Self { dev, tx, dev_mode, next_send: Instant::now(), buttons:0, last_buttons:0, dx:0, dy:0, wheel:0 }
}
#[inline] fn slog(&self, f: impl FnOnce()) { if self.dev_mode { f() } }
@ -60,9 +64,8 @@ impl MouseAggregator {
}
fn flush(&mut self) {
if self.dx==0 && self.dy==0 && self.wheel==0 && self.buttons==self.last_buttons {
return;
}
if Instant::now() < self.next_send { return; }
self.next_send += SEND_INTERVAL;
let pkt = [
self.buttons,

View File

@ -21,7 +21,7 @@ fn ensure_runtime_dir() {
}
}
#[tokio::main]
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
ensure_runtime_dir();
@ -41,7 +41,7 @@ async fn main() -> Result<()> {
.with_thread_ids(true)
.with_file(true);
let dev_mode = env::var("NAVKA_DEV_MODE").is_ok();
let dev_mode = env::var("LESAVKA_DEV_MODE").is_ok();
let mut _guard: Option<WorkerGuard> = None; // keep guard alive
/*------------- subscriber setup -----------------------------------*/

View File

@ -25,14 +25,14 @@ impl MonitorWindow {
)?;
// appsrc -> decode -> convert -> autovideosink
let desc = if std::env::var_os("XDG_RUNTIME_DIR").is_some() {
// graphical
let desc = if std::env::var_os("LESAVKA_HW_DEC").is_some() {
"appsrc name=src is-live=true format=time do-timestamp=true ! queue ! \
h264parse ! vaapih264dec low-latency=true ! videoconvert ! \
autovideosink sync=false"
} else {
// fallback
"appsrc name=src is-live=true format=time do-timestamp=true ! \
queue ! h264parse ! decodebin ! videoconvert ! autovideosink sync=false"
} else {
// headless / debugging over ssh
"appsrc name=src is-live=true format=time do-timestamp=true ! \
fakesink sync=false"
};
let pipeline = gst::parse::launch(desc)?

View File

@ -38,8 +38,8 @@ User=root
Group=root
Environment=RUST_LOG=debug
Environment=NAVKA_DEV_MODE=1
Environment=NAVKA_SERVER_ADDR=http://64.25.10.31:50051
Environment=LESAVKA_DEV_MODE=1
Environment=LESAVKA_SERVER_ADDR=http://64.25.10.31:50051
ExecStart=/usr/local/bin/lesavka-client
Restart=no

View File

@ -9,11 +9,10 @@ use tokio::{fs::{OpenOptions}, io::AsyncWriteExt, sync::Mutex};
use tokio_stream::{wrappers::ReceiverStream};
use tonic::{transport::Server, Request, Response, Status};
use anyhow::Context as _;
use tracing::{info, trace, warn, error};
use tracing::{info, trace, error};
use tracing_subscriber::{filter::EnvFilter, fmt, prelude::*};
use tracing_appender::non_blocking;
use tracing_appender::non_blocking::WorkerGuard;
use udev::{MonitorBuilder};
use lesavka_server::{usb_gadget::UsbGadget, video};
@ -144,31 +143,48 @@ impl Relay for Handler {
.map_err(|e| Status::internal(e.to_string()))?;
tokio::time::sleep(Duration::from_millis(500)).await;
}
let (tx, rx) = tokio::sync::mpsc::channel(32);
let (tx, _rx) =
tokio::sync::mpsc::channel::<Result<KeyboardReport, Status>>(32);
let kb = self.kb.clone();
tokio::spawn(async move {
let mut s = req.into_inner();
while let Some(pkt) = s.next().await.transpose()? {
// kb.lock().await.write_all(&pkt.data).await?;
loop {
match kb.lock().await.write_all(&pkt.data).await {
Ok(()) => {
trace!("⌨️ wrote {}", pkt.data.iter()
.map(|b| format!("{b:02X}")).collect::<Vec<_>>().join(" "));
const SPINS: usize = 20;
for _ in 0..SPINS {
match kb.lock().await.write(&pkt.data).await {
// Ok(n) if n == pkt.data.len() => {
// trace!("⌨️ wrote {}", pkt.data.iter()
// .map(|b| format!("{b:02X}")).collect::<Vec<_>>().join(" "));
// break;
// },
// Ok(_) | Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
// std::hint::spin_loop();
// continue; // try again
// }
// Err(e)
// if matches!(e.raw_os_error(),
// Some(libc::EBUSY) | // still opening
// Some(libc::ENODEV) | // gadget notyet configured
// Some(libc::EPIPE) | // host vanished
// Some(libc::EINVAL) | // host hasnt accepted EP config yet
// Some(libc::EAGAIN)) // nonblocking
// => {
// tokio::time::sleep(Duration::from_millis(10)).await;
// continue;
// }
// Err(e) => return Err(Status::internal(e.to_string())),
Ok(n) if n == pkt.data.len() => { // success
trace!("⌨️ wrote {}", pkt.data.iter().map(|b| format!("{b:02X}")).collect::<Vec<_>>().join(" "));
break;
},
Err(e)
if matches!(e.raw_os_error(),
Some(libc::EBUSY) | // still opening
Some(libc::ENODEV) | // gadget notyet configured
Some(libc::EPIPE) | // host vanished
Some(libc::EINVAL) | // host hasnt accepted EP config yet
Some(libc::EAGAIN)) // nonblocking
=> {
tokio::time::sleep(Duration::from_millis(10)).await;
}
Ok(_) => continue,
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
std::hint::spin_loop();
continue;
}
Err(ref e) if e.kind() == std::io::ErrorKind::BrokenPipe => break,
Err(e) => return Err(Status::internal(e.to_string())),
}
}
@ -177,14 +193,17 @@ impl Relay for Handler {
Ok::<(), Status>(())
});
Ok(Response::new(ReceiverStream::new(rx)))
let (_noop_tx, empty_rx) =
tokio::sync::mpsc::channel::<Result<KeyboardReport, Status>>(1);
Ok(Response::new(ReceiverStream::new(empty_rx)))
}
async fn stream_mouse(
&self,
req: Request<tonic::Streaming<MouseReport>>,
) -> Result<Response<Self::StreamMouseStream>, Status> {
let (tx, rx) = tokio::sync::mpsc::channel(4096); // higher burst
let (tx, _rx) =
tokio::sync::mpsc::channel::<Result<MouseReport, Status>>(4096);
let ms = self.ms.clone();
tokio::spawn(async move {
@ -192,23 +211,38 @@ impl Relay for Handler {
let mut boot_mode = true;
while let Some(pkt) = s.next().await.transpose()? {
loop {
match ms.lock().await.write_all(&pkt.data).await {
Ok(()) => {
trace!("🖱️ wrote {}", pkt.data.iter()
.map(|b| format!("{b:02X}")).collect::<Vec<_>>().join(" "));
match ms.lock().await.write(&pkt.data).await {
// Ok(n) if n == pkt.data.len() => {
// trace!("🖱️ wrote {}", pkt.data.iter()
// .map(|b| format!("{b:02X}")).collect::<Vec<_>>().join(" "));
// break;
// }
// Ok(_) | Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
// std::hint::spin_loop();
// continue; // try again
// }
// Err(e)
// if matches!(e.raw_os_error(),
// Some(libc::EBUSY) | // still opening
// Some(libc::ENODEV) | // gadget notyet configured
// Some(libc::EPIPE) | // host vanished
// Some(libc::EINVAL) | // host hasnt accepted EP config yet
// Some(libc::EAGAIN)) // nonblocking
// => {
// tokio::time::sleep(Duration::from_millis(10)).await;
// continue;
// }
// Err(e) => return Err(Status::internal(e.to_string())),
Ok(n) if n == pkt.data.len() => { // success
trace!("⌨️ wrote {}", pkt.data.iter().map(|b| format!("{b:02X}")).collect::<Vec<_>>().join(" "));
break;
}
Err(e)
if matches!(e.raw_os_error(),
Some(libc::EBUSY) | // still opening
Some(libc::ENODEV) | // gadget notyet configured
Some(libc::EPIPE) | // host vanished
Some(libc::EINVAL) | // host hasnt accepted EP config yet
Some(libc::EAGAIN)) // nonblocking
=> {
tokio::time::sleep(Duration::from_millis(10)).await;
Ok(_) => continue,
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
std::hint::spin_loop();
continue;
}
Err(ref e) if e.kind() == std::io::ErrorKind::BrokenPipe => break,
Err(e) => return Err(Status::internal(e.to_string())),
}
}
@ -217,7 +251,9 @@ impl Relay for Handler {
Ok::<(), Status>(())
});
Ok(Response::new(ReceiverStream::new(rx)))
let (_noop_tx, empty_rx) =
tokio::sync::mpsc::channel::<Result<MouseReport, Status>>(1);
Ok(Response::new(ReceiverStream::new(empty_rx)))
}
async fn capture_video(
@ -294,6 +330,7 @@ async fn main() -> anyhow::Result<()> {
info!("🌐 lesavkaserver listening on 0.0.0.0:50051");
if let Err(e) = Server::builder()
.tcp_nodelay(true)
.add_service(RelayServer::new(handler))
.serve(([0, 0, 0, 0], 50051).into())
.await