// NOTE(review): several generic argument lists in this impl look like they
// were lost from this copy of the file (`-> Result {`, `collect::>()`,
// `pending::>()`, `parse::()`, `channel::(…)`). The code tokens are left
// exactly as found; confirm against version control before building.
impl LesavkaClientApp {
    /// Builds the client from environment variables and command-line arguments.
    ///
    /// Environment read here:
    /// - `LESAVKA_DEV_MODE`, `LESAVKA_HEADLESS`: enabled when `std::env::var`
    ///   returns `Ok` (the value itself is not inspected).
    /// - `LESAVKA_CAPTURE_REMOTE`: any value other than `"0"` enables it;
    ///   when the variable cannot be read the default is `true`.
    /// - `LESAVKA_SERVER_ADDR`: optional, passed together with the CLI
    ///   arguments (program name skipped) to `app_support::resolve_server_addr`.
    ///
    /// In headless mode no `InputAggregator` is constructed and the
    /// remote-capture flag is a fresh `AtomicBool` initialised to `false`;
    /// otherwise the flag is the handle returned by the aggregator.
    ///
    /// Every path visible in this function returns `Ok`.
    pub fn new() -> Result {
        let dev_mode = std::env::var("LESAVKA_DEV_MODE").is_ok();
        let headless = std::env::var("LESAVKA_HEADLESS").is_ok();
        let capture_remote_boot = std::env::var("LESAVKA_CAPTURE_REMOTE")
            .map(|value| value != "0")
            .unwrap_or(true);
        // NOTE(review): turbofish arguments missing here — presumably a Vec of
        // the argument strings; confirm.
        let args = std::env::args().skip(1).collect::>();
        let env_addr = std::env::var("LESAVKA_SERVER_ADDR").ok();
        let server_addr = app_support::resolve_server_addr(&args, env_addr.as_deref());
        // Only the senders are kept; the initial broadcast receivers are
        // dropped immediately.
        let (kbd_tx, _) = broadcast::channel(1024);
        let (mou_tx, _) = broadcast::channel(4096);
        let (paste_tx, paste_rx) = mpsc::unbounded_channel();
        let (agg, remote_capture_enabled) = if headless {
            // Headless: `paste_tx` is never handed to anyone and is dropped at
            // the end of this function.
            (None, Arc::new(AtomicBool::new(false)))
        } else {
            let aggregator = InputAggregator::new_with_capture_mode(
                dev_mode,
                kbd_tx.clone(),
                mou_tx.clone(),
                Some(paste_tx),
                capture_remote_boot,
            );
            let remote_capture_enabled = aggregator.remote_capture_enabled_handle();
            (Some(aggregator), remote_capture_enabled)
        };
        Ok(Self {
            aggregator: agg,
            server_addr,
            dev_mode,
            headless,
            kbd_tx,
            mou_tx,
            // Wrapped in `Option` so `run` can move the receiver out with `take()`.
            paste_rx: Some(paste_rx),
            remote_capture_enabled,
        })
    }

    /// Coverage-build variant of `run`.
    ///
    /// Logs the handshake start, awaits `handshake::negotiate` (the result is
    /// discarded), logs which mode is active and then awaits
    /// `std::future::pending`, so this function does not return on its own.
    /// No endpoints, tasks, threads or device grabs are set up here.
    #[cfg(coverage)]
    pub async fn run(&mut self) -> Result<()> {
        info!(server = %self.server_addr, "🚦 starting handshake");
        let _caps = handshake::negotiate(&self.server_addr).await;
        if self.headless {
            info!("πŸ§ͺ headless mode: skipping HID input capture");
        } else {
            info!("πŸ§ͺ coverage mode: skipping runtime stream wiring");
        }
        // NOTE(review): turbofish arguments missing — the awaited value must
        // have this function's return type; confirm.
        std::future::pending::>().await
    }

    /// Runs the client: handshake, endpoint setup, input capture, video/audio
    /// rendering, optional camera/microphone uplinks, then waits on the
    /// long-running pieces.
    ///
    /// # Errors
    /// Propagates (via `?`) failures from `relay_transport::endpoint`,
    /// `aggregator.init()` and `AudioOut::new()`. Camera and microphone setup
    /// failures are logged and recorded in telemetry but do not fail `run`.
    ///
    /// # Panics
    /// In non-headless mode this panics if `self.aggregator` is `None`
    /// (it is moved out with `take()`, so a second call would hit this) or if
    /// no paste task was created because `self.paste_rx` was already `None`.
    /// The render thread (not this task) panics if GTK initialisation, event
    /// loop construction or window creation fails.
    ///
    /// In dev mode the process is terminated with `std::process::exit(0)`
    /// 300 seconds after the final wait starts polling the timer.
    #[cfg(not(coverage))]
    pub async fn run(&mut self) -> Result<()> {
        /*────────── handshake / feature-negotiation ───────────────*/
        info!(server = %self.server_addr, "🚦 starting handshake");
        let caps = handshake::negotiate(&self.server_addr).await;
        tracing::info!("🀝 server capabilities = {:?}", caps);
        let camera_cfg = app_support::camera_config_from_caps(&caps);
        // The two flags use the same conditions that gate the camera and
        // microphone tasks further down: capability advertised AND the
        // corresponding *_DISABLE variable not readable.
        let uplink_telemetry = crate::uplink_telemetry::UplinkTelemetryPublisher::from_env(
            caps.camera && std::env::var("LESAVKA_CAM_DISABLE").is_err(),
            caps.microphone && std::env::var("LESAVKA_MIC_DISABLE").is_err(),
        );

        /*────────── persistent gRPC channels ──────────*/
        // Two separately configured channels to the same address: `hid_ep` is
        // cloned for keyboard/mouse/paste, `vid_ep` for video/audio/camera/mic.
        // `connect_lazy()` — presumably defers the actual connection until
        // first use; confirm against the transport library.
        let hid_ep = relay_transport::endpoint(&self.server_addr)?
            .tcp_nodelay(true)
            .concurrency_limit(4)
            .http2_keep_alive_interval(Duration::from_secs(15))
            .connect_lazy();
        // `4 << 20` = 4_194_304.
        let vid_ep = relay_transport::endpoint(&self.server_addr)?
            .initial_connection_window_size(4 << 20)
            .initial_stream_window_size(4 << 20)
            .tcp_nodelay(true)
            .connect_lazy();

        // Filled in only on the non-headless path; unwrapped again in the
        // central reactor below.
        let mut agg_task = None;
        let mut kbd_loop = None;
        let mut mou_loop = None;
        let mut paste_task = None;
        // Moves the receiver out of `self`; subsequent calls would see `None`.
        let paste_rx = self.paste_rx.take();
        if !self.headless {
            /*────────── input aggregator task (grab after handshake) ─────────────*/
            let mut aggregator = self.aggregator.take().expect("InputAggregator present");
            info!("βŒ› grabbing input devices…");
            aggregator.init()?; // grab devices only after the handshake call has returned
            agg_task = Some(tokio::spawn(async move {
                let mut a = aggregator;
                a.run().await
            }));
            /*────────── HID streams (never return) ────────*/
            kbd_loop = Some(self.stream_loop_keyboard(hid_ep.clone()));
            mou_loop = Some(self.stream_loop_mouse(hid_ep.clone()));
            if let Some(rx) = paste_rx {
                paste_task = Some(Self::paste_loop(hid_ep.clone(), rx));
            }
        } else {
            info!("πŸ§ͺ headless mode: skipping HID input capture");
        }

        /*───────── optional 300 s auto-exit in dev mode */
        // This async block is lazy: the timer only starts once `suicide` is
        // polled by one of the `select!` calls in the central reactor.
        // Outside dev mode it never completes.
        let suicide = async {
            if self.dev_mode {
                tokio::time::sleep(Duration::from_secs(300)).await;
                warn!("πŸ’€ dev-mode timeout");
                std::process::exit(0);
            } else {
                std::future::pending::<()>().await
            }
        };

        if !self.headless {
            // Layout selection: exactly "unified" (case-insensitive) picks the
            // unified window; anything else, or an unreadable variable, means
            // breakout.
            let view_mode = std::env::var("LESAVKA_VIEW_MODE")
                .unwrap_or_else(|_| "breakout".to_string())
                .to_ascii_lowercase();
            let unified_view = view_mode == "unified";
            // Rendering is disabled when the variable is set to anything whose
            // trimmed value is not "0"; unreadable means not disabled.
            let disable_video_render = std::env::var("LESAVKA_DISABLE_VIDEO_RENDER")
                .map(|value| value.trim() != "0")
                .unwrap_or(false);
            info!(
                "πŸͺŸ video layout selected: {}",
                if unified_view { "unified" } else { "breakout" }
            );
            if disable_video_render {
                info!("πŸͺŸ launcher preview active; skipping standalone client video windows");
            } else {
                /*────────── video rendering thread (winit) ────*/
                // NOTE(review): the `parse` turbofish and the `channel`
                // element type are missing in this copy; confirm the intended
                // types.
                let video_queue = app_support::sanitize_video_queue(
                    std::env::var("LESAVKA_VIDEO_QUEUE")
                        .ok()
                        .and_then(|v| v.parse::().ok()),
                );
                let dump_video = std::env::var("LESAVKA_DUMP_VIDEO").is_ok();
                let (video_tx, mut video_rx) = tokio::sync::mpsc::channel::(video_queue);
                // Dedicated OS thread for GTK and the window event loop; its
                // JoinHandle is dropped, so the thread is detached and a panic
                // in it is not observed by `run`.
                std::thread::spawn(move || {
                    gtk::init().expect("GTK initialisation failed");
                    #[allow(deprecated)]
                    {
                        let el = EventLoopBuilder::<()>::new()
                            .with_any_thread(true)
                            .build()
                            .unwrap();
                        // The two layouts chosen by `unified_view`: one window
                        // receiving every packet, or one window per packet id.
                        enum Renderer {
                            Unified(UnifiedMonitorWindow),
                            Breakout {
                                left: MonitorWindow,
                                right: MonitorWindow,
                            },
                        }
                        let renderer = if unified_view {
                            Renderer::Unified(UnifiedMonitorWindow::new().expect("unified-window"))
                        } else {
                            Renderer::Breakout {
                                left: MonitorWindow::new(0).expect("win0"),
                                right: MonitorWindow::new(1).expect("win1"),
                            }
                        };
                        // The event itself is ignored; each callback invocation
                        // re-arms the wake-up and drains the packet queue.
                        let _ = el.run(move |_: Event<()>, elwt| {
                            // Presumably asks the loop to wake again ~8 ms from
                            // now so the queue keeps being polled — confirm
                            // against the windowing library.
                            elwt.set_control_flow(ControlFlow::WaitUntil(
                                std::time::Instant::now() + std::time::Duration::from_millis(8),
                            ));
                            // Function-local static: the count persists across
                            // callback invocations.
                            static CNT: std::sync::atomic::AtomicU64 =
                                std::sync::atomic::AtomicU64::new(0);
                            // Non-blocking drain of everything currently queued.
                            while let Ok(pkt) = video_rx.try_recv() {
                                CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                                // Progress log every 300th packet.
                                if CNT.load(std::sync::atomic::Ordering::Relaxed).is_multiple_of(300) {
                                    debug!(
                                        "πŸŽ₯ received {} video packets",
                                        CNT.load(std::sync::atomic::Ordering::Relaxed)
                                    );
                                }
                                if dump_video {
                                    // One file per packet under /tmp, numbered
                                    // by a separate persistent counter. "l" for
                                    // id 0, "r" for every other id. Write
                                    // errors are discarded by `.ok()`.
                                    static DUMP_CNT: std::sync::atomic::AtomicU32 =
                                        std::sync::atomic::AtomicU32::new(0);
                                    let n = DUMP_CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                                    let eye = if pkt.id == 0 { "l" } else { "r" };
                                    let path = format!("/tmp/eye{eye}-cli-{n:05}.h264");
                                    std::fs::write(&path, &pkt.data).ok();
                                }
                                match &renderer {
                                    Renderer::Unified(window) => window.push_packet(pkt),
                                    // Breakout: id 0 -> left, id 1 -> right,
                                    // any other id is dropped silently.
                                    Renderer::Breakout { left, right } => match pkt.id {
                                        0 => left.push_packet(pkt),
                                        1 => right.push_packet(pkt),
                                        _ => {}
                                    },
                                }
                            }
                        });
                    }
                });

                /*────────── start video gRPC pullers ──────────*/
                // Detached task; it owns the sending half of the packet queue.
                let ep_video = vid_ep.clone();
                tokio::spawn(Self::video_loop(ep_video, video_tx));
            }

            /*────────── audio renderer & puller ───────────*/
            // Audio runs regardless of `disable_video_render`. Unlike the
            // camera/microphone paths below, a failure in `AudioOut::new()`
            // is propagated and aborts `run`.
            if std::env::var("LESAVKA_AUDIO_DISABLE").is_err() {
                let audio_out = AudioOut::new()?;
                let ep_audio = vid_ep.clone();
                tokio::spawn(Self::audio_loop(ep_audio, audio_out));
            } else {
                info!("πŸ”‡ remote audio disabled for this relay session");
            }
        } else {
            info!("πŸ§ͺ headless mode: skipping video/audio renderers");
        }

        /*────────── camera & mic tasks (gated by caps) ───────────*/
        // Not gated on `headless`: these uplinks start in headless mode too
        // when the server advertises the capability.
        if caps.camera && std::env::var("LESAVKA_CAM_DISABLE").is_err() {
            if let Some(cfg) = camera_cfg {
                info!(
                    codec = ?cfg.codec,
                    width = cfg.width,
                    height = cfg.height,
                    fps = cfg.fps,
                    "πŸ“Έ using camera settings from server"
                );
            }
            let ep = vid_ep.clone();
            let cam_source = std::env::var("LESAVKA_CAM_SOURCE").ok();
            let cam_telemetry =
                uplink_telemetry.handle(crate::uplink_telemetry::UpstreamStreamKind::Camera);
            // Detached task: construct the capture via `spawn_blocking`, then
            // either start the camera loop or record why it could not start.
            tokio::spawn(async move {
                let result = tokio::task::spawn_blocking(move || {
                    CameraCapture::new(cam_source.as_deref(), camera_cfg)
                })
                .await;
                match result {
                    // Outer `Ok`: the blocking task completed; inner `Ok`:
                    // the capture was constructed.
                    Ok(Ok(cam)) => {
                        let cam = Arc::new(cam);
                        tokio::spawn(Self::cam_loop(ep, cam, cam_telemetry.clone()));
                    }
                    // Constructor returned an error.
                    Ok(Err(err)) => {
                        cam_telemetry.record_disconnect(format!(
                            "webcam uplink setup failed: {err:#}"
                        ));
                        warn!(
                            "πŸ“Έ webcam uplink is unavailable for this relay session; continuing without StreamCamera: {err:#}"
                        );
                    }
                    // The blocking task itself failed to complete (join error).
                    Err(err) => {
                        cam_telemetry.record_disconnect(format!(
                            "webcam uplink setup task failed: {err}"
                        ));
                        warn!(
                            "πŸ“Έ webcam uplink setup task failed before StreamCamera could start: {err}"
                        );
                    }
                }
            });
        }
        if caps.microphone && std::env::var("LESAVKA_MIC_DISABLE").is_err() {
            let ep = vid_ep.clone();
            let mic_telemetry =
                uplink_telemetry.handle(crate::uplink_telemetry::UpstreamStreamKind::Microphone);
            // Same structure as the camera task above.
            tokio::spawn(async move {
                let result = tokio::task::spawn_blocking(MicrophoneCapture::new).await;
                match result {
                    Ok(Ok(mic)) => {
                        let mic = Arc::new(mic);
                        tokio::spawn(Self::voice_loop(ep, mic, mic_telemetry.clone()));
                    }
                    Ok(Err(err)) => {
                        mic_telemetry.record_disconnect(format!(
                            "microphone uplink setup failed: {err:#}"
                        ));
                        warn!(
                            "🎀 microphone uplink is unavailable for this relay session; continuing without StreamMicrophone: {err:#}"
                        );
                    }
                    Err(err) => {
                        mic_telemetry.record_disconnect(format!(
                            "microphone uplink setup task failed: {err}"
                        ));
                        warn!(
                            "🎀 microphone uplink setup task failed before StreamMicrophone could start: {err}"
                        );
                    }
                }
            });
        }

        /*────────── central reactor ───────────────────*/
        if self.headless {
            // Only the dev-mode timer is awaited here; outside dev mode this
            // never completes, so `run` stays parked.
            tokio::select! {
                _ = suicide => { /* handled above */ },
            }
        } else {
            // All four were set on the non-headless path above; `paste_task`
            // additionally requires that `self.paste_rx` held a receiver.
            let kbd_loop = kbd_loop.expect("kbd_loop");
            let mou_loop = mou_loop.expect("mou_loop");
            let agg_task = agg_task.expect("agg_task");
            let paste_task = paste_task.expect("paste_task");
            // First arm to complete wins; the remaining futures are dropped
            // when the `select!` finishes.
            tokio::select! {
                _ = kbd_loop => { warn!("⚠️⌨️ keyboard stream finished"); },
                _ = mou_loop => { warn!("βš οΈπŸ–±οΈ mouse stream finished"); },
                _ = paste_task => { warn!("βš οΈπŸ“‹ paste loop finished"); },
                _ = suicide => { /* handled above */ },
                r = agg_task => {
                    // Outer layer is the task join result, inner layer the
                    // aggregator's own result. `run` returns `Ok(())` in all
                    // three cases; errors are only logged.
                    match r {
                        Ok(Ok(())) => warn!("input aggregator terminated cleanly"),
                        Ok(Err(e)) => error!("input aggregator error: {e:?}"),
                        Err(join_err) => error!("aggregator task panicked: {join_err:?}"),
                    }
                    return Ok(());
                }
            }
        }
        // Reached only if a keyboard, mouse or paste arm above completes
        // (those arms log and fall through). The stream loops are described
        // above as never returning, in which case this line is not executed
        // and mainly satisfies the type checker.
        #[allow(unreachable_code)]
        Ok(())
    }
}