lesavka/client/src/app.rs
2025-06-25 12:31:48 -05:00

193 lines
7.4 KiB
Rust
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// client/src/app.rs
#![forbid(unsafe_code)]
use anyhow::Result;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use tonic::Request;
use tracing::{debug, error, info, warn};
use winit::{
event_loop::EventLoopBuilder,
// platform::x11::EventLoopBuilderExtX11,
platform::wayland::EventLoopBuilderExtWayland,
event::Event,
};
use lesavka_common::lesavka::{relay_client::RelayClient, KeyboardReport, MouseReport, MonitorRequest, VideoPacket};
use crate::input::inputs::InputAggregator;
use crate::output::video::MonitorWindow;
pub struct LesavkaClientApp {
aggregator: Option<InputAggregator>,
server_addr: String,
dev_mode: bool,
kbd_tx: broadcast::Sender<KeyboardReport>,
mou_tx: broadcast::Sender<MouseReport>,
}
impl LesavkaClientApp {
pub fn new() -> Result<Self> {
let dev_mode = std::env::var("NAVKA_DEV_MODE").is_ok();
let server_addr = std::env::args()
.nth(1)
.or_else(|| std::env::var("NAVKA_SERVER_ADDR").ok())
.unwrap_or_else(|| "http://127.0.0.1:50051".into());
let (kbd_tx, _) = broadcast::channel::<KeyboardReport>(1024);
let (mou_tx, _) = broadcast::channel::<MouseReport>(4096);
let mut agg = InputAggregator::new(dev_mode, kbd_tx.clone(), mou_tx.clone());
agg.init()?; // grab devices
Ok(Self { aggregator: Some(agg),
server_addr, dev_mode,
kbd_tx, mou_tx })
}
pub async fn run(&mut self) -> Result<()> {
/* detach the aggregator before spawn so `self` is not moved */
let aggregator = self.aggregator.take().expect("InputAggregator present");
let agg_task = tokio::spawn(async move {
let mut agg = aggregator;
agg.run().await
});
/* two networking tasks */
let kbd_loop = self.stream_loop_keyboard();
let mou_loop = self.stream_loop_mouse();
/* optional suicide timer */
let suicide = async {
if self.dev_mode {
tokio::time::sleep(Duration::from_secs(30)).await;
warn!("devmode timeout");
// self.aggregator.keyboards.dev.ungrab();
// self.aggregator.mice.dev.ungrab();
std::process::exit(0);
} else { futures::future::pending::<()>().await }
};
/* video windows use a dedicated eventloop thread */
let (video_tx, mut video_rx) = tokio::sync::mpsc::unbounded_channel::<VideoPacket>();
// let (event_tx, event_rx) = std::sync::mpsc::channel();
let (_event_tx, _event_rx) = std::sync::mpsc::channel::<()>();
std::thread::spawn(move || {
let el = EventLoopBuilder::<()>::new()
.with_any_thread(true)
.build()
.unwrap();
let win0 = MonitorWindow::new(0, &el).expect("win0");
let win1 = MonitorWindow::new(1, &el).expect("win1");
el.run(move |_: Event<()>, _| {
while let Ok(pkt) = video_rx.try_recv() {
match pkt.id {
0 => win0.push_packet(pkt),
1 => win1.push_packet(pkt),
_ => {}
}
}
});
});
let vid_loop = Self::video_loop(self.server_addr.clone(), video_tx);
tokio::select! {
_ = kbd_loop => unreachable!(),
_ = mou_loop => unreachable!(),
_ = vid_loop => unreachable!(),
_ = suicide => unreachable!(),
// _ = suicide => { warn!("devmode timeout"); std::process::exit(0) },
r = agg_task => {
error!("aggregator task ended: {r:?}");
std::process::exit(1)
}
}
}
/*──────────────── keyboard stream ───────────────*/
async fn stream_loop_keyboard(&self) {
loop {
info!("⌨️ connect {}", self.server_addr);
let mut cli = match RelayClient::connect(self.server_addr.clone()).await {
Ok(c) => c,
Err(e) => { error!("connect: {e}"); Self::delay().await; continue }
};
let outbound = BroadcastStream::new(self.kbd_tx.subscribe()).filter_map(|r| r.ok());
let resp = match cli.stream_keyboard(Request::new(outbound)).await {
Ok(r) => r, Err(e) => { error!("stream_keyboard: {e}"); Self::delay().await; continue }
};
let mut inbound = resp.into_inner();
while let Some(m) = inbound.message().await.transpose() {
match m {
Ok(r) => debug!("kbd echo {}B", r.data.len()),
Err(e) => { error!("kbd inbound: {e}"); break }
}
}
warn!("⌨️ disconnected");
Self::delay().await;
}
}
/*──────────────── mouse stream ──────────────────*/
async fn stream_loop_mouse(&self) {
loop {
info!("🖱️ connect {}", self.server_addr);
let mut cli = match RelayClient::connect(self.server_addr.clone()).await {
Ok(c) => c,
Err(e) => { error!("connect: {e}"); Self::delay().await; continue }
};
let outbound = BroadcastStream::new(self.mou_tx.subscribe()).filter_map(|r| r.ok());
let resp = match cli.stream_mouse(Request::new(outbound)).await {
Ok(r) => r, Err(e) => { error!("stream_mouse: {e}"); Self::delay().await; continue }
};
let mut inbound = resp.into_inner();
while let Some(m) = inbound.message().await.transpose() {
match m {
Ok(r) => debug!("mouse echo {}B", r.data.len()),
Err(e) => { error!("mouse inbound: {e}"); break }
}
}
warn!("🖱️ disconnected");
Self::delay().await;
}
}
/*──────────────── monitor stream ────────────────*/
async fn video_loop(addr: String, tx: tokio::sync::mpsc::UnboundedSender<VideoPacket>) {
loop {
match RelayClient::connect(addr.clone()).await {
Ok(mut cli) => {
for monitor_id in 0..=1 {
let req = MonitorRequest { id: monitor_id, max_bitrate: 6_000 };
match cli.capture_video(Request::new(req)).await {
Ok(mut stream) => {
while let Some(pkt) = stream.get_mut().message().await.transpose() {
match pkt {
Ok(p) => { let _ = tx.send(p); },
Err(e) => { error!("video {monitor_id}: {e}"); break }
}
}
}
Err(e) => error!("video {monitor_id}: {e}"),
}
}
}
Err(e) => error!("video connect: {e}"),
}
tokio::time::sleep(Duration::from_secs(2)).await;
}
}
#[inline(always)]
async fn delay() { tokio::time::sleep(Duration::from_secs(1)).await; }
}