#![cfg_attr(coverage, allow(unused_imports))]

use anyhow::Result;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use tokio::sync::{broadcast, mpsc};
use tokio_stream::{StreamExt, wrappers::BroadcastStream};
use tonic::{Request, transport::Channel};
use tracing::{debug, error, info, trace, warn};
use winit::{
    event::Event,
    event_loop::{ControlFlow, EventLoopBuilder},
    platform::wayland::EventLoopBuilderExtWayland,
};

use lesavka_common::lesavka::{
    AudioPacket, KeyboardReport, MonitorRequest, MouseReport, VideoPacket,
    relay_client::RelayClient,
};

#[cfg(not(coverage))]
use crate::output::video::{MonitorWindow, UnifiedMonitorWindow};
use crate::{
    app_support, handshake, input::camera::CameraCapture, input::inputs::InputAggregator,
    input::microphone::MicrophoneCapture, output::audio::AudioOut, paste,
};

/// Top-level client application.
///
/// Owns the input aggregator, the broadcast channels that fan HID reports out
/// to the gRPC stream tasks, and the paste-text queue.  `run()` performs the
/// handshake, wires up every stream loop, and then parks in a central
/// `tokio::select!` reactor.
///
/// NOTE(review): generic parameters on the channel/Option fields were
/// reconstructed from how each field is used below — confirm against the
/// upstream revision.
pub struct LesavkaClientApp {
    /// Grabs evdev-style input devices; `None` in headless mode, and taken
    /// out of the struct once `run()` spawns it.
    aggregator: Option<InputAggregator>,
    server_addr: String,
    dev_mode: bool,
    headless: bool,
    /// Keyboard reports fanned out to the keyboard gRPC stream.
    kbd_tx: broadcast::Sender<KeyboardReport>,
    /// Mouse reports fanned out to the mouse gRPC stream.
    mou_tx: broadcast::Sender<MouseReport>,
    /// Clipboard text produced by the aggregator; consumed by `paste_loop`.
    paste_rx: Option<mpsc::UnboundedReceiver<String>>,
}

impl LesavkaClientApp {
    /// Builds the app from environment variables and CLI arguments.
    ///
    /// Reads `LESAVKA_DEV_MODE`, `LESAVKA_HEADLESS`, `LESAVKA_CAPTURE_REMOTE`
    /// (any value other than `"0"` enables remote capture at boot) and
    /// `LESAVKA_SERVER_ADDR`.  The input aggregator is only constructed when
    /// not headless.
    ///
    /// # Errors
    /// Currently infallible, but returns `Result<Self>` so construction can
    /// grow fallible steps without breaking callers.
    pub fn new() -> Result<Self> {
        let dev_mode = std::env::var("LESAVKA_DEV_MODE").is_ok();
        let headless = std::env::var("LESAVKA_HEADLESS").is_ok();
        let capture_remote_boot = std::env::var("LESAVKA_CAPTURE_REMOTE")
            .map(|value| value != "0")
            .unwrap_or(true);
        let args = std::env::args().skip(1).collect::<Vec<_>>();
        let env_addr = std::env::var("LESAVKA_SERVER_ADDR").ok();
        let server_addr = app_support::resolve_server_addr(&args, env_addr.as_deref());

        // Broadcast capacities: mouse traffic is much denser than keyboard.
        let (kbd_tx, _) = broadcast::channel(1024);
        let (mou_tx, _) = broadcast::channel(4096);
        let (paste_tx, paste_rx) = mpsc::unbounded_channel();

        let agg = if headless {
            None
        } else {
            Some(InputAggregator::new_with_capture_mode(
                dev_mode,
                kbd_tx.clone(),
                mou_tx.clone(),
                Some(paste_tx),
                capture_remote_boot,
            ))
        };

        Ok(Self {
            aggregator: agg,
            server_addr,
            dev_mode,
            headless,
            kbd_tx,
            mou_tx,
            paste_rx: Some(paste_rx),
        })
    }

    /// Coverage-build `run()`: performs the handshake for coverage of that
    /// path, logs which wiring is skipped, then pends forever.
    #[cfg(coverage)]
    pub async fn run(&mut self) -> Result<()> {
        info!(server = %self.server_addr, "🚦 starting handshake");
        let _caps = handshake::negotiate(&self.server_addr).await;
        if self.headless {
            info!("πŸ§ͺ headless mode: skipping HID input capture");
        } else {
            info!("πŸ§ͺ coverage mode: skipping runtime stream wiring");
        }
        // Never resolves; the pending future's type matches our return type.
        std::future::pending::<Result<()>>().await
    }

    /// Main entry point: handshake, channel setup, stream tasks, reactor.
    ///
    /// Only returns (cleanly) when the input-aggregator task ends; all other
    /// select arms log and fall through.
    #[cfg(not(coverage))]
    pub async fn run(&mut self) -> Result<()> {
        /*────────── handshake / feature-negotiation ───────────────*/
        info!(server = %self.server_addr, "🚦 starting handshake");
        let caps = handshake::negotiate(&self.server_addr).await;
        tracing::info!("🀝 server capabilities = {:?}", caps);
        let camera_cfg = app_support::camera_config_from_caps(&caps);

        /*────────── persistent gRPC channels ──────────*/
        // Low-latency channel for HID traffic.
        let hid_ep = Channel::from_shared(self.server_addr.clone())?
            .tcp_nodelay(true)
            .concurrency_limit(4)
            .http2_keep_alive_interval(Duration::from_secs(15))
            .connect_lazy();
        // High-throughput channel for video/audio (4 MiB HTTP/2 windows).
        let vid_ep = Channel::from_shared(self.server_addr.clone())?
            .initial_connection_window_size(4 << 20)
            .initial_stream_window_size(4 << 20)
            .tcp_nodelay(true)
            .connect_lazy();

        let mut agg_task = None;
        let mut kbd_loop = None;
        let mut mou_loop = None;
        let mut paste_task = None;
        let paste_rx = self.paste_rx.take();

        if !self.headless {
            /*────────── input aggregator task (grab after handshake) ─────────────*/
            let mut aggregator = self.aggregator.take().expect("InputAggregator present");
            info!("βŒ› grabbing input devices…");
            aggregator.init()?; // grab devices now that handshake succeeded
            agg_task = Some(tokio::spawn(async move {
                let mut a = aggregator;
                a.run().await
            }));

            /*────────── HID streams (never return) ────────*/
            kbd_loop = Some(self.stream_loop_keyboard(hid_ep.clone()));
            mou_loop = Some(self.stream_loop_mouse(hid_ep.clone()));
            if let Some(rx) = paste_rx {
                paste_task = Some(Self::paste_loop(hid_ep.clone(), rx));
            }
        } else {
            info!("πŸ§ͺ headless mode: skipping HID input capture");
        }

        /*───────── optional 300β€―s auto-exit in dev mode */
        let suicide = async {
            if self.dev_mode {
                tokio::time::sleep(Duration::from_secs(300)).await;
                warn!("πŸ’€ dev-mode timeout");
                std::process::exit(0);
            } else {
                std::future::pending::<()>().await
            }
        };

        if !self.headless {
            let view_mode = std::env::var("LESAVKA_VIEW_MODE")
                .unwrap_or_else(|_| "breakout".to_string())
                .to_ascii_lowercase();
            let unified_view = view_mode == "unified";
            let disable_video_render = std::env::var("LESAVKA_DISABLE_VIDEO_RENDER")
                .map(|value| value.trim() != "0")
                .unwrap_or(false);
            info!(
                "πŸͺŸ video layout selected: {}",
                if unified_view { "unified" } else { "breakout" }
            );
            if disable_video_render {
                info!("πŸͺŸ launcher preview active; skipping standalone client video windows");
            } else {
                /*────────── video rendering thread (winit) ────*/
                let video_queue = app_support::sanitize_video_queue(
                    std::env::var("LESAVKA_VIDEO_QUEUE")
                        .ok()
                        .and_then(|v| v.parse::<usize>().ok()),
                );
                let dump_video = std::env::var("LESAVKA_DUMP_VIDEO").is_ok();
                let (video_tx, mut video_rx) =
                    tokio::sync::mpsc::channel::<VideoPacket>(video_queue);

                // Dedicated OS thread: winit + GTK must own their own loop.
                std::thread::spawn(move || {
                    gtk::init().expect("GTK initialisation failed");
                    #[allow(deprecated)]
                    {
                        let el = EventLoopBuilder::<()>::new()
                            .with_any_thread(true)
                            .build()
                            .unwrap();

                        // Either one composite window, or one window per eye.
                        enum Renderer {
                            Unified(UnifiedMonitorWindow),
                            Breakout {
                                left: MonitorWindow,
                                right: MonitorWindow,
                            },
                        }
                        let renderer = if unified_view {
                            Renderer::Unified(
                                UnifiedMonitorWindow::new().expect("unified-window"),
                            )
                        } else {
                            Renderer::Breakout {
                                left: MonitorWindow::new(0).expect("win0"),
                                right: MonitorWindow::new(1).expect("win1"),
                            }
                        };

                        let _ = el.run(move |_: Event<()>, elwt| {
                            // Poll roughly every 16 ms (~60 Hz) for new packets.
                            elwt.set_control_flow(ControlFlow::WaitUntil(
                                std::time::Instant::now()
                                    + std::time::Duration::from_millis(16),
                            ));
                            static CNT: std::sync::atomic::AtomicU64 =
                                std::sync::atomic::AtomicU64::new(0);
                            while let Ok(pkt) = video_rx.try_recv() {
                                CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                                // Log a heartbeat every 300 packets.
                                if CNT.load(std::sync::atomic::Ordering::Relaxed) % 300 == 0 {
                                    debug!(
                                        "πŸŽ₯ received {} video packets",
                                        CNT.load(std::sync::atomic::Ordering::Relaxed)
                                    );
                                }
                                if dump_video {
                                    // Debug aid: dump raw access units to /tmp.
                                    static DUMP_CNT: std::sync::atomic::AtomicU32 =
                                        std::sync::atomic::AtomicU32::new(0);
                                    let n = DUMP_CNT
                                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                                    let eye = if pkt.id == 0 { "l" } else { "r" };
                                    let path = format!("/tmp/eye{eye}-cli-{n:05}.h264");
                                    std::fs::write(&path, &pkt.data).ok();
                                }
                                match &renderer {
                                    Renderer::Unified(window) => window.push_packet(pkt),
                                    Renderer::Breakout { left, right } => match pkt.id {
                                        0 => left.push_packet(pkt),
                                        1 => right.push_packet(pkt),
                                        _ => {}
                                    },
                                }
                            }
                        });
                    }
                });

                /*────────── start video gRPC pullers ──────────*/
                let ep_video = vid_ep.clone();
                tokio::spawn(Self::video_loop(ep_video, video_tx));
            }

            /*────────── audio renderer & puller ───────────*/
            let audio_out = AudioOut::new()?;
            let ep_audio = vid_ep.clone();
            tokio::spawn(Self::audio_loop(ep_audio, audio_out));
        } else {
            info!("πŸ§ͺ headless mode: skipping video/audio renderers");
        }

        /*────────── camera & mic tasks (gated by caps) ───────────*/
        if caps.camera && std::env::var("LESAVKA_CAM_DISABLE").is_err() {
            if let Some(cfg) = camera_cfg {
                info!(
                    codec = ?cfg.codec,
                    width = cfg.width,
                    height = cfg.height,
                    fps = cfg.fps,
                    "πŸ“Έ using camera settings from server"
                );
            }
            let cam = Arc::new(CameraCapture::new(
                std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref(),
                camera_cfg,
            )?);
            tokio::spawn(Self::cam_loop(vid_ep.clone(), cam));
        }
        if caps.microphone && std::env::var("LESAVKA_MIC_DISABLE").is_err() {
            let mic = Arc::new(MicrophoneCapture::new()?);
            tokio::spawn(Self::voice_loop(vid_ep.clone(), mic)); // renamed
        }

        /*────────── central reactor ───────────────────*/
        if self.headless {
            tokio::select! {
                _ = suicide => { /* handled above */ },
            }
        } else {
            let kbd_loop = kbd_loop.expect("kbd_loop");
            let mou_loop = mou_loop.expect("mou_loop");
            let agg_task = agg_task.expect("agg_task");
            let paste_task = paste_task.expect("paste_task");
            tokio::select! {
                _ = kbd_loop => { warn!("⚠️⌨️ keyboard stream finished"); },
                _ = mou_loop => { warn!("βš οΈπŸ–±οΈ mouse stream finished"); },
                _ = paste_task => { warn!("βš οΈπŸ“‹ paste loop finished"); },
                _ = suicide => { /* handled above */ },
                r = agg_task => {
                    match r {
                        Ok(Ok(())) => warn!("input aggregator terminated cleanly"),
                        Ok(Err(e)) => error!("input aggregator error: {e:?}"),
                        Err(join_err) => error!("aggregator task panicked: {join_err:?}"),
                    }
                    return Ok(());
                }
            }
        }
        // The branches above either loop forever or exit the process; this
        // point is unreachable but satisfies the type checker.
        #[allow(unreachable_code)]
        Ok(())
    }

    /*──────────────── paste loop ───────────────*/
    /// Forwards clipboard text to the server's `PasteText` RPC.
    ///
    /// On a transport error the client is rebuilt so the next item retries on
    /// a fresh connection; a server-side rejection is only logged.
    #[cfg(not(coverage))]
    fn paste_loop(
        ep: Channel,
        mut rx: mpsc::UnboundedReceiver<String>,
    ) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            let mut cli = RelayClient::new(ep.clone());
            while let Some(text) = rx.recv().await {
                match paste::build_paste_request(&text) {
                    Ok(req) => match cli.paste_text(Request::new(req)).await {
                        Ok(resp) => {
                            let reply = resp.get_ref();
                            if !reply.ok {
                                warn!("πŸ“‹ paste rejected: {}", reply.error);
                            } else {
                                debug!("πŸ“‹ paste delivered");
                            }
                        }
                        Err(e) => {
                            warn!("πŸ“‹ paste failed: {e}");
                            // Rebuild the client so the next send re-dials.
                            cli = RelayClient::new(ep.clone());
                        }
                    },
                    Err(e) => {
                        warn!("πŸ“‹ paste build failed: {e}");
                    }
                }
            }
        })
    }

    /*──────────────── keyboard stream ───────────────*/
    /// Endless dial/stream/retry loop pushing keyboard reports to the server.
    /// Each iteration takes a fresh broadcast subscription, so reports sent
    /// while disconnected are dropped rather than replayed.
    #[cfg(not(coverage))]
    async fn stream_loop_keyboard(&self, ep: Channel) {
        loop {
            info!("βŒ¨οΈπŸ€™ Keyboard dial {}", self.server_addr);
            let mut cli = RelayClient::new(ep.clone());
            let outbound =
                BroadcastStream::new(self.kbd_tx.subscribe()).filter_map(|r| r.ok());
            match cli.stream_keyboard(Request::new(outbound)).await {
                Ok(mut resp) => {
                    // Drain server messages until the stream errors or ends.
                    while let Some(msg) = resp.get_mut().message().await.transpose() {
                        if let Err(e) = msg {
                            warn!("⌨️ server err: {e}");
                            break;
                        }
                    }
                }
                Err(e) => warn!("❌⌨️ connect failed: {e}"),
            }
            tokio::time::sleep(Duration::from_secs(1)).await; // retry
        }
    }

    /*──────────────── mouse stream ──────────────────*/
#[cfg(not(coverage))] async fn stream_loop_mouse(&self, ep: Channel) { loop { info!("πŸ–±οΈπŸ€™ Mouse dial {}", self.server_addr); let mut cli = RelayClient::new(ep.clone()); let outbound = BroadcastStream::new(self.mou_tx.subscribe()).filter_map(|r| r.ok()); match cli.stream_mouse(Request::new(outbound)).await { Ok(mut resp) => { while let Some(msg) = resp.get_mut().message().await.transpose() { if let Err(e) = msg { warn!("πŸ–±οΈ server err: {e}"); break; } } } Err(e) => warn!("βŒπŸ–±οΈ connect failed: {e}"), } tokio::time::sleep(Duration::from_secs(1)).await; } } /*──────────────── monitor stream ────────────────*/ #[cfg(not(coverage))] async fn video_loop(ep: Channel, tx: tokio::sync::mpsc::Sender) { let max_bitrate = std::env::var("LESAVKA_VIDEO_MAX_KBIT") .ok() .and_then(|v| v.parse::().ok()) .unwrap_or(6_000); for monitor_id in 0..=1 { let ep = ep.clone(); let tx = tx.clone(); tokio::spawn(async move { loop { let mut cli = RelayClient::new(ep.clone()); let req = MonitorRequest { id: monitor_id, max_bitrate, }; match cli.capture_video(Request::new(req)).await { Ok(mut stream) => { debug!("πŸŽ₯🏁 cli video{monitor_id}: stream opened"); while let Some(res) = stream.get_mut().message().await.transpose() { match res { Ok(pkt) => { trace!( "πŸŽ₯πŸ“₯ cli video{monitor_id}: got {}β€―bytes", pkt.data.len() ); if tx.send(pkt).await.is_err() { warn!("⚠️πŸŽ₯ cli video{monitor_id}: GUI thread gone"); break; } } Err(e) => { error!("❌πŸŽ₯ cli video{monitor_id}: gRPC error: {e}"); break; } } } warn!("⚠️πŸŽ₯ cli video{monitor_id}: stream ended"); } Err(e) => error!("❌πŸŽ₯ video {monitor_id}: {e}"), } tokio::time::sleep(Duration::from_secs(1)).await; } }); } } /*──────────────── audio stream ───────────────*/ #[cfg(not(coverage))] async fn audio_loop(ep: Channel, out: AudioOut) { loop { let mut cli = RelayClient::new(ep.clone()); let req = MonitorRequest { id: 0, max_bitrate: 0, }; match cli.capture_audio(Request::new(req)).await { Ok(mut stream) => { while let Some(res) = 
stream.get_mut().message().await.transpose() { if let Ok(pkt) = res { out.push(pkt); } } } Err(e) => tracing::warn!("βŒπŸ”Š audio stream err: {e}"), } tokio::time::sleep(Duration::from_secs(1)).await; } } /*──────────────── mic stream ─────────────────*/ #[cfg(not(coverage))] async fn voice_loop(ep: Channel, mic: Arc) { let mut delay = Duration::from_secs(1); static FAIL_CNT: AtomicUsize = AtomicUsize::new(0); loop { let mut cli = RelayClient::new(ep.clone()); // 1. create a Tokio MPSC channel let (tx, rx) = tokio::sync::mpsc::channel::(256); let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>(); // 2. spawn a real thread that does the blocking `pull()` let mic_clone = mic.clone(); std::thread::spawn(move || { while stop_rx.try_recv().is_err() { if let Some(pkt) = mic_clone.pull() { trace!("πŸŽ€πŸ“€ cli {} bytes β†’ gRPC", pkt.data.len()); let _ = tx.blocking_send(pkt); } } }); // 3. turn `rx` into an async stream for gRPC let outbound = tokio_stream::wrappers::ReceiverStream::new(rx); match cli.stream_microphone(Request::new(outbound)).await { Ok(mut resp) => while resp.get_mut().message().await.transpose().is_some() {}, Err(e) => { // first failure β†’ warn, subsequent ones β†’ debug if FAIL_CNT.fetch_add(1, Ordering::Relaxed) == 0 { warn!("❌🎀 connect failed: {e}"); warn!("⚠️🎀 further microphone‑stream failures will be logged at DEBUG"); } else { debug!("❌🎀 reconnect failed: {e}"); } delay = app_support::next_delay(delay); } } let _ = stop_tx.send(()); tokio::time::sleep(delay).await; } } /*──────────────── cam stream ───────────────────*/ #[cfg(not(coverage))] async fn cam_loop(ep: Channel, cam: Arc) { let mut delay = Duration::from_secs(1); loop { let mut cli = RelayClient::new(ep.clone()); let (tx, rx) = tokio::sync::mpsc::channel::(256); let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>(); let cam_worker = std::thread::spawn({ let cam = cam.clone(); move || loop { if stop_rx.try_recv().is_ok() { break; } let Some(pkt) = cam.pull() else { 
std::thread::sleep(Duration::from_millis(5)); continue; }; // TRACE every 120 frames only static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if n < 10 || n % 120 == 0 { tracing::trace!("πŸ“Έ cli frame#{n} {} B", pkt.data.len()); } tracing::trace!("πŸ“Έβ¬†οΈ sent webcam AU pts={} {} B", pkt.pts, pkt.data.len()); if tx.blocking_send(pkt).is_err() { break; } } }); let outbound = tokio_stream::wrappers::ReceiverStream::new(rx); match cli.stream_camera(Request::new(outbound)).await { Ok(mut resp) => { delay = Duration::from_secs(1); // got a stream β†’ reset while resp.get_mut().message().await.transpose().is_some() {} } Err(e) if e.code() == tonic::Code::Unimplemented => { tracing::warn!("πŸ“Έ server does not support StreamCamera – giving up"); let _ = stop_tx.send(()); let _ = cam_worker.join(); return; // stop the task completely (#3) } Err(e) => { tracing::warn!("βŒπŸ“Έ connect failed: {e:?}"); delay = app_support::next_delay(delay); // back-off (#2) } } let _ = stop_tx.send(()); let _ = cam_worker.join(); tokio::time::sleep(delay).await; } } }