lesavka/client/src/app.rs

224 lines
8.6 KiB
Rust
Raw Normal View History

2025-06-08 22:24:14 -05:00
// client/src/app.rs
2025-06-17 20:54:31 -05:00
2025-06-15 20:19:27 -05:00
#![forbid(unsafe_code)]
2025-06-17 20:54:31 -05:00
2025-06-11 00:37:01 -05:00
use anyhow::Result;
use std::time::Duration;
2025-06-24 23:48:06 -05:00
use tokio::sync::broadcast;
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
2025-06-26 14:05:23 -05:00
use tonic::{Request, transport::Channel};
2025-06-26 04:32:08 -05:00
use tracing::{debug, error, info, warn, trace};
2025-06-24 23:48:06 -05:00
use winit::{
event_loop::EventLoopBuilder,
2025-06-25 09:21:39 -05:00
platform::wayland::EventLoopBuilderExtWayland,
2025-06-24 23:48:06 -05:00
event::Event,
};
2025-06-15 20:19:27 -05:00
2025-06-23 07:18:26 -05:00
use lesavka_common::lesavka::{relay_client::RelayClient, KeyboardReport, MouseReport, MonitorRequest, VideoPacket};
2025-06-17 08:17:23 -05:00
2025-06-24 23:48:06 -05:00
use crate::input::inputs::InputAggregator;
use crate::output::video::MonitorWindow;
2025-06-08 04:11:58 -05:00
2025-06-23 07:18:26 -05:00
pub struct LesavkaClientApp {
2025-06-11 00:37:01 -05:00
aggregator: Option<InputAggregator>,
2025-06-08 04:11:58 -05:00
server_addr: String,
2025-06-08 13:11:31 -05:00
dev_mode: bool,
2025-06-17 08:17:23 -05:00
kbd_tx: broadcast::Sender<KeyboardReport>,
mou_tx: broadcast::Sender<MouseReport>,
2025-06-08 04:11:58 -05:00
}
2025-06-23 07:18:26 -05:00
impl LesavkaClientApp {
2025-06-08 04:11:58 -05:00
pub fn new() -> Result<Self> {
2025-06-26 14:05:23 -05:00
let dev_mode = std::env::var("LESAVKA_DEV_MODE").is_ok();
2025-06-17 20:54:31 -05:00
let server_addr = std::env::args()
2025-06-08 04:11:58 -05:00
.nth(1)
2025-06-26 14:05:23 -05:00
.or_else(|| std::env::var("LESAVKA_SERVER_ADDR").ok())
2025-06-17 20:54:31 -05:00
.unwrap_or_else(|| "http://127.0.0.1:50051".into());
2025-06-15 20:19:27 -05:00
2025-06-17 08:17:23 -05:00
let (kbd_tx, _) = broadcast::channel::<KeyboardReport>(1024);
let (mou_tx, _) = broadcast::channel::<MouseReport>(4096);
2025-06-17 20:54:31 -05:00
2025-06-17 08:17:23 -05:00
let mut agg = InputAggregator::new(dev_mode, kbd_tx.clone(), mou_tx.clone());
2025-06-17 20:54:31 -05:00
agg.init()?; // grab devices
Ok(Self { aggregator: Some(agg),
server_addr, dev_mode,
kbd_tx, mou_tx })
2025-06-08 04:11:58 -05:00
}
pub async fn run(&mut self) -> Result<()> {
2025-06-26 14:05:23 -05:00
// ---- build two channels ------------------------------------------------
let hid_ep: Channel = Channel::from_shared(self.server_addr.clone())
.unwrap()
.tcp_nodelay(true)
.concurrency_limit(1)
.http2_keep_alive_interval(Duration::from_secs(15))
.connect_lazy();
let vid_ep: Channel = Channel::from_shared(self.server_addr.clone())
.unwrap()
.tcp_nodelay(true)
.connect_lazy();
2025-06-17 20:54:31 -05:00
/* detach the aggregator before spawn so `self` is not moved */
let aggregator = self.aggregator.take().expect("InputAggregator present");
2025-06-17 22:02:33 -05:00
let agg_task = tokio::spawn(async move {
let mut agg = aggregator;
agg.run().await
});
2025-06-08 04:11:58 -05:00
2025-06-17 20:54:31 -05:00
/* two networking tasks */
2025-06-26 14:05:23 -05:00
let kbd_loop = self.stream_loop_keyboard(hid_ep.clone());
let mou_loop = self.stream_loop_mouse(hid_ep.clone());
2025-06-17 08:17:23 -05:00
2025-06-17 20:54:31 -05:00
/* optional suicide timer */
let suicide = async {
2025-06-08 18:11:44 -05:00
if self.dev_mode {
2025-06-15 20:19:27 -05:00
tokio::time::sleep(Duration::from_secs(30)).await;
2025-06-26 16:17:31 -05:00
warn!("💀 devmode timeout 💀");
2025-06-24 23:48:06 -05:00
// self.aggregator.keyboards.dev.ungrab();
// self.aggregator.mice.dev.ungrab();
2025-06-17 20:54:31 -05:00
std::process::exit(0);
} else { futures::future::pending::<()>().await }
2025-06-08 13:35:23 -05:00
};
2025-06-08 13:11:31 -05:00
2025-06-21 05:21:57 -05:00
/* video windows use a dedicated eventloop thread */
let (video_tx, mut video_rx) = tokio::sync::mpsc::unbounded_channel::<VideoPacket>();
2025-06-24 23:48:06 -05:00
// let (event_tx, event_rx) = std::sync::mpsc::channel();
let (_event_tx, _event_rx) = std::sync::mpsc::channel::<()>();
2025-06-21 05:21:57 -05:00
std::thread::spawn(move || {
2025-06-25 07:46:50 -05:00
let el = EventLoopBuilder::<()>::new()
2025-06-24 23:48:06 -05:00
.with_any_thread(true)
.build()
.unwrap();
2025-06-21 05:21:57 -05:00
let win0 = MonitorWindow::new(0, &el).expect("win0");
let win1 = MonitorWindow::new(1, &el).expect("win1");
2025-06-24 23:48:06 -05:00
2025-06-25 16:23:50 -05:00
let _ = el.run(move |_: Event<()>, _| {
2025-06-21 05:21:57 -05:00
while let Ok(pkt) = video_rx.try_recv() {
match pkt.id {
0 => win0.push_packet(pkt),
1 => win1.push_packet(pkt),
_ => {}
}
}
});
});
2025-06-26 14:05:23 -05:00
let vid_loop = Self::video_loop(vid_ep.clone(), video_tx);
2025-06-21 05:21:57 -05:00
2025-06-08 13:35:23 -05:00
tokio::select! {
2025-06-17 20:54:31 -05:00
_ = kbd_loop => unreachable!(),
_ = mou_loop => unreachable!(),
2025-06-21 05:21:57 -05:00
_ = vid_loop => unreachable!(),
2025-06-17 20:54:31 -05:00
_ = suicide => unreachable!(),
// _ = suicide => { warn!("devmode timeout"); std::process::exit(0) },
r = agg_task => {
error!("aggregator task ended: {r:?}");
std::process::exit(1)
2025-06-08 18:11:44 -05:00
}
2025-06-08 04:11:58 -05:00
}
2025-06-08 18:11:44 -05:00
}
2025-06-17 20:54:31 -05:00
/*──────────────── keyboard stream ───────────────*/
2025-06-26 14:05:23 -05:00
async fn stream_loop_keyboard(&self, ep: Channel) {
2025-06-08 18:11:44 -05:00
loop {
2025-06-17 20:54:31 -05:00
info!("⌨️ connect {}", self.server_addr);
2025-06-26 14:05:23 -05:00
// let mut cli = match RelayClient::connect(self.server_addr.clone()).await {
// Ok(c) => c,
// Err(e) => { error!("connect: {e}"); Self::delay().await; continue }
// };
let mut cli = RelayClient::new(ep.clone());
2025-06-08 18:11:44 -05:00
2025-06-17 20:54:31 -05:00
let outbound = BroadcastStream::new(self.kbd_tx.subscribe()).filter_map(|r| r.ok());
2025-06-26 15:12:23 -05:00
match cli.stream_keyboard(Request::new(outbound)).await {
Ok(mut resp) => {
// spawn a task just to drain echoes (keeps h2 window happy)
tokio::spawn(async move {
while let Some(_)= resp.get_mut().message().await.transpose() {}
warn!("⌨️ server closed stream");
});
}
Err(e) => {
error!("stream_keyboard: {e}");
Self::delay().await;
continue;
}
}
// from now on we just park connection persists until it errors
futures::future::pending::<()>().await;
// drop(resp);
// warn!("⌨️ disconnected");
// Self::delay().await;
2025-06-17 08:17:23 -05:00
}
}
2025-06-17 20:54:31 -05:00
/*──────────────── mouse stream ──────────────────*/
2025-06-26 14:05:23 -05:00
async fn stream_loop_mouse(&self, ep: Channel) {
2025-06-17 08:17:23 -05:00
loop {
2025-06-17 20:54:31 -05:00
info!("🖱️ connect {}", self.server_addr);
2025-06-26 14:05:23 -05:00
// let mut cli = match RelayClient::connect(self.server_addr.clone()).await {
// Ok(c) => c,
// Err(e) => { error!("connect: {e}"); Self::delay().await; continue }
// };
let mut cli = RelayClient::new(ep.clone());
2025-06-08 18:11:44 -05:00
2025-06-17 20:54:31 -05:00
let outbound = BroadcastStream::new(self.mou_tx.subscribe()).filter_map(|r| r.ok());
2025-06-26 15:12:23 -05:00
match cli.stream_mouse(Request::new(outbound)).await {
Ok(mut resp) => {
// spawn a task just to drain echoes (keeps h2 window happy)
tokio::spawn(async move {
while let Some(_)= resp.get_mut().message().await.transpose() {}
warn!("⌨️ server closed stream");
});
}
Err(e) => {
error!("stream_mouse: {e}");
Self::delay().await;
continue;
}
}
// from now on we just park connection persists until it errors
futures::future::pending::<()>().await;
// drop(resp);
// warn!("🖱️ disconnected");
// Self::delay().await;
2025-06-08 18:11:44 -05:00
}
}
2025-06-17 20:54:31 -05:00
2025-06-21 05:21:57 -05:00
/*──────────────── monitor stream ────────────────*/
2025-06-26 16:17:31 -05:00
async fn video_loop(
ep: Channel,
tx: tokio::sync::mpsc::UnboundedSender<VideoPacket>,
) {
for monitor_id in 0..=1 {
let tx = tx.clone();
let ep = ep.clone();
tokio::spawn(async move {
loop {
let mut cli = RelayClient::new(ep.clone());
2025-06-26 14:05:23 -05:00
let req = MonitorRequest { id: monitor_id, max_bitrate: 6_000 };
2025-06-26 16:17:31 -05:00
match cli.capture_video(Request::new(req)).await {
Ok(mut stream) => {
while let Some(pkt) = stream.get_mut().message().await.transpose() {
match pkt {
Ok(p) => { let _ = tx.send(p); }
Err(e) => { error!("video {monitor_id}: {e}"); break }
}
2025-06-21 05:21:57 -05:00
}
}
2025-06-26 16:17:31 -05:00
Err(e) => error!("video {monitor_id}: {e}"),
2025-06-21 05:21:57 -05:00
}
2025-06-26 16:17:31 -05:00
tokio::time::sleep(Duration::from_secs(1)).await;
2025-06-21 05:21:57 -05:00
}
2025-06-26 16:17:31 -05:00
});
2025-06-21 05:21:57 -05:00
}
2025-06-26 14:05:23 -05:00
}
2025-06-26 16:17:31 -05:00
2025-06-21 05:21:57 -05:00
2025-06-17 20:54:31 -05:00
/// Pauses for one second; used as the back-off between connection attempts.
#[inline(always)]
async fn delay() {
    let pause = Duration::from_secs(1);
    tokio::time::sleep(pause).await;
}
2025-06-08 04:11:58 -05:00
}