#![forbid(unsafe_code)] use anyhow::Result; use std::time::Duration; use tokio::sync::broadcast; use tokio_stream::{wrappers::BroadcastStream, StreamExt}; use tonic::{transport::Channel, Request}; use tracing::{error, trace, debug, info, warn}; use winit::{ event::Event, event_loop::{EventLoopBuilder, ControlFlow}, platform::wayland::EventLoopBuilderExtWayland, }; use lesavka_common::lesavka::{ relay_client::RelayClient, KeyboardReport, MonitorRequest, MouseReport, VideoPacket, }; use crate::{input::inputs::InputAggregator, input::microphone::MicrophoneCapture, output::video::MonitorWindow, output::audio::AudioOut}; pub struct LesavkaClientApp { aggregator: Option, server_addr: String, dev_mode: bool, kbd_tx: broadcast::Sender, mou_tx: broadcast::Sender, } impl LesavkaClientApp { pub fn new() -> Result { let dev_mode = std::env::var("LESAVKA_DEV_MODE").is_ok(); let server_addr = std::env::args() .nth(1) .or_else(|| std::env::var("LESAVKA_SERVER_ADDR").ok()) .unwrap_or_else(|| "http://127.0.0.1:50051".into()); let (kbd_tx, _) = broadcast::channel(1024); let (mou_tx, _) = broadcast::channel(4096); let mut agg = InputAggregator::new(dev_mode, kbd_tx.clone(), mou_tx.clone()); agg.init()?; // grab devices immediately Ok(Self { aggregator: Some(agg), server_addr, dev_mode, kbd_tx, mou_tx }) } pub async fn run(&mut self) -> Result<()> { /*────────── persistent gRPC channels ──────────*/ let hid_ep = Channel::from_shared(self.server_addr.clone())? .tcp_nodelay(true) .concurrency_limit(1) .http2_keep_alive_interval(Duration::from_secs(15)) .connect_lazy(); let vid_ep = Channel::from_shared(self.server_addr.clone())? .initial_connection_window_size(4<<20) .initial_stream_window_size(4<<20) .tcp_nodelay(true) .connect_lazy(); /*────────── input aggregator task ─────────────*/ let aggregator = self.aggregator.take().expect("InputAggregator present"); let agg_task = tokio::spawn(async move { let mut a = aggregator; a.run().await }); /*────────── HID streams (never return) ────────*/ let kbd_loop = self.stream_loop_keyboard(hid_ep.clone()); let mou_loop = self.stream_loop_mouse(hid_ep.clone()); /*───────── optional 300 s auto-exit in dev mode */ let suicide = async { if self.dev_mode { tokio::time::sleep(Duration::from_secs(300)).await; warn!("💀 dev-mode timeout"); std::process::exit(0); } else { std::future::pending::<()>().await } }; /*────────── video rendering thread (winit) ────*/ let (video_tx, mut video_rx) = tokio::sync::mpsc::unbounded_channel::(); std::thread::spawn(move || { gtk::init().expect("GTK initialisation failed"); let el = EventLoopBuilder::<()>::new().with_any_thread(true).build().unwrap(); let win0 = MonitorWindow::new(0).expect("win0"); let win1 = MonitorWindow::new(1).expect("win1"); let _ = el.run(move |_: Event<()>, _elwt| { _elwt.set_control_flow(ControlFlow::WaitUntil( std::time::Instant::now() + std::time::Duration::from_millis(16))); static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); static DUMP_CNT: std::sync::atomic::AtomicU32 = std::sync::atomic::AtomicU32::new(0); while let Ok(pkt) = video_rx.try_recv() { CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if CNT.load(std::sync::atomic::Ordering::Relaxed) % 300 == 0 { debug!("🎥 received {} video packets", CNT.load(std::sync::atomic::Ordering::Relaxed)); } let n = DUMP_CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if n % 120 == 0 { let eye = if pkt.id == 0 { "l" } else { "r" }; let path = format!("/tmp/eye{eye}-cli-{n:05}.h264"); std::fs::write(&path, &pkt.data).ok(); } match pkt.id { 0 => win0.push_packet(pkt), 1 => win1.push_packet(pkt), _ => {} } } }); }); /*────────── start video gRPC pullers ──────────*/ let ep_video = vid_ep.clone(); tokio::spawn(Self::video_loop(ep_video, video_tx)); /*────────── audio renderer & puller ───────────*/ let audio_out = AudioOut::new()?; let ep_audio = vid_ep.clone(); tokio::spawn(Self::audio_loop(ep_audio, audio_out)); /*────────── microphone gRPC pusher ───────────*/ let mic = Arc::new(MicrophoneCapture::new()?); let ep_mic = vid_ep.clone(); tokio::spawn(Self::mic_loop(ep_mic, mic)); /*────────── central reactor ───────────────────*/ tokio::select! { _ = kbd_loop => { warn!("⚠️⌨️ keyboard stream finished"); }, _ = mou_loop => { warn!("⚠️🖱️ mouse stream finished"); }, _ = suicide => { /* handled above */ }, r = agg_task => { match r { Ok(Ok(())) => warn!("input aggregator terminated cleanly"), Ok(Err(e)) => error!("input aggregator error: {e:?}"), Err(join_err) => error!("aggregator task panicked: {join_err:?}"), } std::process::exit(1); } } // The branches above either loop forever or exit the process; this // point is unreachable but satisfies the type checker. #[allow(unreachable_code)] Ok(()) } /*──────────────── keyboard stream ───────────────*/ async fn stream_loop_keyboard(&self, ep: Channel) { loop { info!("⌨️🤙 Keyboard dial {}", self.server_addr); let mut cli = RelayClient::new(ep.clone()); let outbound = BroadcastStream::new(self.kbd_tx.subscribe()) .filter_map(|r| r.ok()); match cli.stream_keyboard(Request::new(outbound)).await { Ok(mut resp) => { while let Some(msg) = resp.get_mut().message().await.transpose() { if let Err(e) = msg { warn!("⌨️ server err: {e}"); break; } } } Err(e) => warn!("⌨️ connect failed: {e}"), } tokio::time::sleep(Duration::from_secs(1)).await; // retry } } /*──────────────── mouse stream ──────────────────*/ async fn stream_loop_mouse(&self, ep: Channel) { loop { info!("🖱️🤙 Mouse dial {}", self.server_addr); let mut cli = RelayClient::new(ep.clone()); let outbound = BroadcastStream::new(self.mou_tx.subscribe()) .filter_map(|r| r.ok()); match cli.stream_mouse(Request::new(outbound)).await { Ok(mut resp) => { while let Some(msg) = resp.get_mut().message().await.transpose() { if let Err(e) = msg { warn!("🖱️ server err: {e}"); break; } } } Err(e) => warn!("🖱️ connect failed: {e}"), } tokio::time::sleep(Duration::from_secs(1)).await; } } /*──────────────── monitor stream ────────────────*/ async fn video_loop( ep: Channel, tx: tokio::sync::mpsc::UnboundedSender, ) { for monitor_id in 0..=1 { let ep = ep.clone(); let tx = tx.clone(); tokio::spawn(async move { loop { let mut cli = RelayClient::new(ep.clone()); let req = MonitorRequest { id: monitor_id, max_bitrate: 6_000 }; match cli.capture_video(Request::new(req)).await { Ok(mut stream) => { debug!("🎥 cli video{monitor_id}: stream opened"); while let Some(res) = stream.get_mut().message().await.transpose() { match res { Ok(pkt) => { trace!("🎥 cli video{monitor_id}: got {} bytes", pkt.data.len()); if tx.send(pkt).is_err() { warn!("⚠️🎥 cli video{monitor_id}: GUI thread gone"); break; } } Err(e) => { error!("❌🎥 cli video{monitor_id}: gRPC error: {e}"); break; } } } warn!("⚠️🎥 cli video{monitor_id}: stream ended"); } Err(e) => error!("❌🎥 video {monitor_id}: {e}"), } tokio::time::sleep(Duration::from_secs(1)).await; } }); } } /*──────────────── audio stream ───────────────*/ async fn audio_loop(ep: Channel, out: AudioOut) { loop { let mut cli = RelayClient::new(ep.clone()); let req = MonitorRequest { id: 0, max_bitrate: 0 }; match cli.capture_audio(Request::new(req)).await { Ok(mut stream) => { while let Some(res) = stream.get_mut().message().await.transpose() { if let Ok(pkt) = res { out.push(pkt); } } } Err(e) => tracing::warn!("audio stream err: {e}"), } tokio::time::sleep(Duration::from_secs(1)).await; } } /*──────────────── mic stream ─────────────────*/ async fn mic_loop(ep: Channel, mic: Arc) { loop { let mut cli = RelayClient::new(ep.clone()); let mic_clone = mic.clone(); let stream = async_stream::stream! { loop { if let Some(pkt) = mic_clone.pull() { yield pkt; } else { break; // EOS – should not happen } } }; match cli.stream_microphone(Request::new(stream)).await { Ok(mut resp) => { while resp.get_mut().message().await?.is_some() {} } Err(e) => warn!("🎤 connect failed: {e}"), } tokio::time::sleep(Duration::from_secs(1)).await; } } }