#![forbid(unsafe_code)]

use anyhow::Result;
use std::time::Duration;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::broadcast;
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use tonic::{transport::Channel, Request};
use tracing::{error, trace, debug, info, warn};
use winit::{
    event::Event,
    event_loop::{EventLoopBuilder, ControlFlow},
    platform::wayland::EventLoopBuilderExtWayland,
};
use lesavka_common::lesavka::{
    relay_client::RelayClient, KeyboardReport, MonitorRequest, MouseReport, VideoPacket, AudioPacket,
};
use crate::{
    input::inputs::InputAggregator,
    input::microphone::MicrophoneCapture,
    input::camera::CameraCapture,
    output::video::MonitorWindow,
    output::audio::AudioOut,
};

/// Client side of the Lesavka relay: grabs local input devices, pushes
/// keyboard/mouse/microphone/camera streams to the server and renders the
/// monitor video and audio coming back.
pub struct LesavkaClientApp {
    aggregator: Option<InputAggregator>,
    camera: Option<Arc<CameraCapture>>, // opened in `new()`, handed to the camera task by `run()`
    server_addr: String,
    dev_mode: bool,
    kbd_tx: broadcast::Sender<KeyboardReport>,
    mou_tx: broadcast::Sender<MouseReport>,
}

impl LesavkaClientApp {
    /// Reads configuration from argv/env and grabs the input devices and the
    /// camera up front, so permission problems surface before any dialling.
    pub fn new() -> Result<Self> {
        let dev_mode = std::env::var("LESAVKA_DEV_MODE").is_ok();
        let server_addr = std::env::args()
            .nth(1)
            .or_else(|| std::env::var("LESAVKA_SERVER_ADDR").ok())
            .unwrap_or_else(|| "http://127.0.0.1:50051".into());

        let (kbd_tx, _) = broadcast::channel(1024);
        let (mou_tx, _) = broadcast::channel(4096);

        let mut agg = InputAggregator::new(dev_mode, kbd_tx.clone(), mou_tx.clone());
        agg.init()?; // grab devices immediately

        let cam = if std::env::var("LESAVKA_CAM_DISABLE").is_ok() {
            None
        } else {
            Some(Arc::new(CameraCapture::new(
                std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref(),
            )?))
        };

        Ok(Self {
            aggregator: Some(agg),
            camera: cam,
            server_addr,
            dev_mode,
            kbd_tx,
            mou_tx,
        })
    }
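    /// Wires everything together and never returns in normal operation: two
    /// lazy gRPC channels (low-latency HID, high-throughput A/V), the
    /// input-aggregator task, the winit/GTK render thread, the video, audio,
    /// camera and microphone tasks, then a `tokio::select!` that only falls
    /// through when something fatal happens.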
    pub async fn run(&mut self) -> Result<()> {
        /*────────── persistent gRPC channels ──────────*/
        let hid_ep = Channel::from_shared(self.server_addr.clone())?
            .tcp_nodelay(true)
            .concurrency_limit(4)
            .http2_keep_alive_interval(Duration::from_secs(15))
            .connect_lazy();
        let vid_ep = Channel::from_shared(self.server_addr.clone())?
            .initial_connection_window_size(4 << 20)
            .initial_stream_window_size(4 << 20)
            .tcp_nodelay(true)
            .connect_lazy();

        /*────────── input aggregator task ─────────────*/
        // Take owned resources out of `self` before the HID stream futures
        // below start borrowing `&self`.
        let aggregator = self.aggregator.take().expect("InputAggregator present");
        let camera = self.camera.take();
        let agg_task = tokio::spawn(async move {
            let mut a = aggregator;
            a.run().await
        });

        /*────────── HID streams (never return) ────────*/
        let kbd_loop = self.stream_loop_keyboard(hid_ep.clone());
        let mou_loop = self.stream_loop_mouse(hid_ep.clone());

        /*───────── optional 300 s auto-exit in dev mode */
        let suicide = async {
            if self.dev_mode {
                tokio::time::sleep(Duration::from_secs(300)).await;
                warn!("💀 dev-mode timeout");
                std::process::exit(0);
            } else {
                std::future::pending::<()>().await
            }
        };

        /*────────── video rendering thread (winit) ────*/
        let (video_tx, mut video_rx) = tokio::sync::mpsc::unbounded_channel::<VideoPacket>();
        std::thread::spawn(move || {
            gtk::init().expect("GTK initialisation failed");
            let el = EventLoopBuilder::<()>::new().with_any_thread(true).build().unwrap();
            let win0 = MonitorWindow::new(0).expect("win0");
            let win1 = MonitorWindow::new(1).expect("win1");
            let _ = el.run(move |_: Event<()>, elwt| {
                // Wake roughly once per frame (~60 Hz) instead of busy-looping.
                elwt.set_control_flow(ControlFlow::WaitUntil(
                    std::time::Instant::now() + std::time::Duration::from_millis(16),
                ));
                static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
                static DUMP_CNT: std::sync::atomic::AtomicU32 = std::sync::atomic::AtomicU32::new(0);
                while let Ok(pkt) = video_rx.try_recv() {
                    CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                    if CNT.load(std::sync::atomic::Ordering::Relaxed) % 300 == 0 {
                        debug!(
                            "🎥 received {} video packets",
                            CNT.load(std::sync::atomic::Ordering::Relaxed)
                        );
                    }
                    // Periodically dump a raw packet for offline inspection.
                    let n = DUMP_CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                    if n % 120 == 0 {
                        let eye = if pkt.id == 0 { "l" } else { "r" };
                        let path = format!("/tmp/eye{eye}-cli-{n:05}.h264");
                        std::fs::write(&path, &pkt.data).ok();
                    }
                    match pkt.id {
                        0 => win0.push_packet(pkt),
                        1 => win1.push_packet(pkt),
                        _ => {}
                    }
                }
            });
        });

        /*────────── start video gRPC pullers ──────────*/
        let ep_video = vid_ep.clone();
        tokio::spawn(Self::video_loop(ep_video, video_tx));

        /*────────── audio renderer & puller ───────────*/
        let audio_out = AudioOut::new()?;
        let ep_audio = vid_ep.clone();
        tokio::spawn(Self::audio_loop(ep_audio, audio_out));

        /*────────── camera & mic tasks ───────────────*/
        if let Some(cam) = camera {
            tokio::spawn(Self::cam_loop(vid_ep.clone(), cam));
        }
        let mic = Arc::new(MicrophoneCapture::new()?);
        tokio::spawn(Self::mic_loop(vid_ep.clone(), mic));

        /*────────── central reactor ───────────────────*/
        tokio::select! {
            _ = kbd_loop => { warn!("⚠️⌨️ keyboard stream finished"); },
            _ = mou_loop => { warn!("⚠️🖱️ mouse stream finished"); },
            _ = suicide => { /* handled above */ },
            r = agg_task => {
                match r {
                    Ok(Ok(())) => warn!("input aggregator terminated cleanly"),
                    Ok(Err(e)) => error!("input aggregator error: {e:?}"),
                    Err(join_err) => error!("aggregator task panicked: {join_err:?}"),
                }
                std::process::exit(1);
            }
        }

        // The branches above either loop forever or exit the process; this
        // point is unreachable in practice but satisfies the type checker.
        #[allow(unreachable_code)]
        return Ok(());
    }
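    // Every stream loop below follows the same pattern: build a fresh
    // `RelayClient` over the shared lazy channel, run the RPC until it ends
    // or errors, then retry after a fixed one-second back-off, so a server
    // restart costs at most one redial.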
    /*──────────────── keyboard stream ───────────────*/
    async fn stream_loop_keyboard(&self, ep: Channel) {
        loop {
            info!("⌨️🤙 Keyboard dial {}", self.server_addr);
            let mut cli = RelayClient::new(ep.clone());
            let outbound = BroadcastStream::new(self.kbd_tx.subscribe()).filter_map(|r| r.ok());
            match cli.stream_keyboard(Request::new(outbound)).await {
                Ok(mut resp) => {
                    while let Some(msg) = resp.get_mut().message().await.transpose() {
                        if let Err(e) = msg {
                            warn!("⌨️ server err: {e}");
                            break;
                        }
                    }
                }
                Err(e) => warn!("❌⌨️ connect failed: {e}"),
            }
            tokio::time::sleep(Duration::from_secs(1)).await; // retry
        }
    }

    /*──────────────── mouse stream ──────────────────*/
    async fn stream_loop_mouse(&self, ep: Channel) {
        loop {
            info!("🖱️🤙 Mouse dial {}", self.server_addr);
            let mut cli = RelayClient::new(ep.clone());
            let outbound = BroadcastStream::new(self.mou_tx.subscribe()).filter_map(|r| r.ok());
            match cli.stream_mouse(Request::new(outbound)).await {
                Ok(mut resp) => {
                    while let Some(msg) = resp.get_mut().message().await.transpose() {
                        if let Err(e) = msg {
                            warn!("🖱️ server err: {e}");
                            break;
                        }
                    }
                }
                Err(e) => warn!("❌🖱️ connect failed: {e}"),
            }
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    }

    /*──────────────── monitor stream ────────────────*/
    async fn video_loop(ep: Channel, tx: tokio::sync::mpsc::UnboundedSender<VideoPacket>) {
        // One puller task per monitor; packets are funnelled to the render
        // thread through the unbounded channel.
        for monitor_id in 0..=1 {
            let ep = ep.clone();
            let tx = tx.clone();
            tokio::spawn(async move {
                loop {
                    let mut cli = RelayClient::new(ep.clone());
                    let req = MonitorRequest { id: monitor_id, max_bitrate: 6_000 };
                    match cli.capture_video(Request::new(req)).await {
                        Ok(mut stream) => {
                            debug!("🎥🏁 cli video{monitor_id}: stream opened");
                            while let Some(res) = stream.get_mut().message().await.transpose() {
                                match res {
                                    Ok(pkt) => {
                                        trace!("🎥📥 cli video{monitor_id}: got {} bytes", pkt.data.len());
                                        if tx.send(pkt).is_err() {
                                            warn!("⚠️🎥 cli video{monitor_id}: GUI thread gone");
                                            break;
                                        }
                                    }
                                    Err(e) => {
                                        error!("❌🎥 cli video{monitor_id}: gRPC error: {e}");
                                        break;
                                    }
                                }
                            }
                            warn!("⚠️🎥 cli video{monitor_id}: stream ended");
                        }
                        Err(e) => error!("❌🎥 video {monitor_id}: {e}"),
                    }
                    tokio::time::sleep(Duration::from_secs(1)).await;
                }
            });
        }
    }

    /*──────────────── audio stream ───────────────*/
    async fn audio_loop(ep: Channel, out: AudioOut) {
        loop {
            let mut cli = RelayClient::new(ep.clone());
            let req = MonitorRequest { id: 0, max_bitrate: 0 };
            match cli.capture_audio(Request::new(req)).await {
                Ok(mut stream) => {
                    while let Some(res) = stream.get_mut().message().await.transpose() {
                        if let Ok(pkt) = res {
                            out.push(pkt);
                        }
                    }
                }
                Err(e) => tracing::warn!("❌🔊 audio stream err: {e}"),
            }
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    }
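    // The microphone and camera loops below bridge blocking `pull()` capture
    // to the async gRPC request stream: a dedicated OS thread feeds a bounded
    // Tokio MPSC channel via `blocking_send`, and `ReceiverStream` exposes
    // the receiving end as the outbound stream. The channel capacity (256)
    // is what provides back-pressure on the capture thread.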
    /*──────────────── mic stream ─────────────────*/
    async fn mic_loop(ep: Channel, mic: Arc<MicrophoneCapture>) {
        static FAIL_CNT: AtomicUsize = AtomicUsize::new(0);
        loop {
            let mut cli = RelayClient::new(ep.clone());
            // 1. create a Tokio MPSC channel
            let (tx, rx) = tokio::sync::mpsc::channel::<AudioPacket>(256);
            let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>();
            // 2. spawn a real thread that does the blocking `pull()`
            let mic_clone = mic.clone();
            std::thread::spawn(move || {
                while stop_rx.try_recv().is_err() {
                    if let Some(pkt) = mic_clone.pull() {
                        trace!("🎤📤 cli {} bytes → gRPC", pkt.data.len());
                        let _ = tx.blocking_send(pkt);
                    }
                }
            });
            // 3. turn `rx` into an async stream for gRPC
            let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
            match cli.stream_microphone(Request::new(outbound)).await {
                Ok(mut resp) => {
                    while resp.get_mut().message().await.transpose().is_some() {}
                }
                Err(e) => {
                    // first failure → warn, subsequent ones → debug
                    if FAIL_CNT.fetch_add(1, Ordering::Relaxed) == 0 {
                        warn!("❌🎤 connect failed: {e}");
                        warn!("⚠️🎤 further microphone-stream failures will be logged at DEBUG");
                    } else {
                        debug!("❌🎤 reconnect failed: {e}");
                    }
                }
            }
            let _ = stop_tx.send(());
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    }

    /*──────────────── cam stream ───────────────────*/
    async fn cam_loop(ep: Channel, cam: Arc<CameraCapture>) {
        loop {
            let mut cli = RelayClient::new(ep.clone());
            let (tx, rx) = tokio::sync::mpsc::channel::<VideoPacket>(256);
            std::thread::spawn({
                let cam = cam.clone();
                move || {
                    while let Some(pkt) = cam.pull() {
                        // TRACE the first few frames, then only every 120th
                        static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
                        let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                        if n < 10 || n % 120 == 0 {
                            tracing::trace!("📸 cli frame#{n} {} B", pkt.data.len());
                        }
                        let _ = tx.blocking_send(pkt);
                    }
                }
            });
            let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
            if let Err(e) = cli.stream_camera(Request::new(outbound)).await {
                tracing::warn!("❌📸 connect failed: {e}");
            }
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    }
}
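// Expected wiring from the binary crate, as a sketch only (the real entry
// point lives outside this file and the tracing-subscriber setup here is an
// assumption):
//
//     #[tokio::main]
//     async fn main() -> anyhow::Result<()> {
//         tracing_subscriber::fmt::init(); // hypothetical logging setup
//         LesavkaClientApp::new()?.run().await
//     }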