lesavka/client/src/app/session_lifecycle.rs

325 lines
15 KiB
Rust
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

impl LesavkaClientApp {
pub fn new() -> Result<Self> {
let dev_mode = std::env::var("LESAVKA_DEV_MODE").is_ok();
let headless = std::env::var("LESAVKA_HEADLESS").is_ok();
let capture_remote_boot = std::env::var("LESAVKA_CAPTURE_REMOTE")
.map(|value| value != "0")
.unwrap_or(true);
let args = std::env::args().skip(1).collect::<Vec<_>>();
let env_addr = std::env::var("LESAVKA_SERVER_ADDR").ok();
let server_addr = app_support::resolve_server_addr(&args, env_addr.as_deref());
let (kbd_tx, _) = broadcast::channel(1024);
let (mou_tx, _) = broadcast::channel(4096);
let (paste_tx, paste_rx) = mpsc::unbounded_channel();
let (agg, remote_capture_enabled) = if headless {
(None, Arc::new(AtomicBool::new(false)))
} else {
let aggregator = InputAggregator::new_with_capture_mode(
dev_mode,
kbd_tx.clone(),
mou_tx.clone(),
Some(paste_tx),
capture_remote_boot,
);
let remote_capture_enabled = aggregator.remote_capture_enabled_handle();
(Some(aggregator), remote_capture_enabled)
};
Ok(Self {
aggregator: agg,
server_addr,
dev_mode,
headless,
kbd_tx,
mou_tx,
paste_rx: Some(paste_rx),
remote_capture_enabled,
})
}
#[cfg(coverage)]
pub async fn run(&mut self) -> Result<()> {
info!(server = %self.server_addr, "🚦 starting handshake");
let _caps = handshake::negotiate(&self.server_addr).await;
if self.headless {
info!("🧪 headless mode: skipping HID input capture");
} else {
info!("🧪 coverage mode: skipping runtime stream wiring");
}
std::future::pending::<Result<()>>().await
}
#[cfg(not(coverage))]
pub async fn run(&mut self) -> Result<()> {
/*────────── handshake / feature-negotiation ───────────────*/
info!(server = %self.server_addr, "🚦 starting handshake");
let caps = handshake::negotiate(&self.server_addr).await;
tracing::info!("🤝 server capabilities = {:?}", caps);
let camera_cfg = app_support::camera_config_from_caps(&caps);
let uplink_telemetry = crate::uplink_telemetry::UplinkTelemetryPublisher::from_env(
caps.camera && std::env::var("LESAVKA_CAM_DISABLE").is_err(),
caps.microphone && std::env::var("LESAVKA_MIC_DISABLE").is_err(),
);
/*────────── persistent gRPC channels ──────────*/
let hid_ep = Channel::from_shared(self.server_addr.clone())?
.tcp_nodelay(true)
.concurrency_limit(4)
.http2_keep_alive_interval(Duration::from_secs(15))
.connect_lazy();
let vid_ep = Channel::from_shared(self.server_addr.clone())?
.initial_connection_window_size(4 << 20)
.initial_stream_window_size(4 << 20)
.tcp_nodelay(true)
.connect_lazy();
let mut agg_task = None;
let mut kbd_loop = None;
let mut mou_loop = None;
let mut paste_task = None;
let paste_rx = self.paste_rx.take();
if !self.headless {
/*────────── input aggregator task (grab after handshake) ─────────────*/
let mut aggregator = self.aggregator.take().expect("InputAggregator present");
info!("⌛ grabbing input devices…");
aggregator.init()?; // grab devices now that handshake succeeded
agg_task = Some(tokio::spawn(async move {
let mut a = aggregator;
a.run().await
}));
/*────────── HID streams (never return) ────────*/
kbd_loop = Some(self.stream_loop_keyboard(hid_ep.clone()));
mou_loop = Some(self.stream_loop_mouse(hid_ep.clone()));
if let Some(rx) = paste_rx {
paste_task = Some(Self::paste_loop(hid_ep.clone(), rx));
}
} else {
info!("🧪 headless mode: skipping HID input capture");
}
/*───────── optional 300s auto-exit in dev mode */
let suicide = async {
if self.dev_mode {
tokio::time::sleep(Duration::from_secs(300)).await;
warn!("💀 dev-mode timeout");
std::process::exit(0);
} else {
std::future::pending::<()>().await
}
};
if !self.headless {
let view_mode = std::env::var("LESAVKA_VIEW_MODE")
.unwrap_or_else(|_| "breakout".to_string())
.to_ascii_lowercase();
let unified_view = view_mode == "unified";
let disable_video_render = std::env::var("LESAVKA_DISABLE_VIDEO_RENDER")
.map(|value| value.trim() != "0")
.unwrap_or(false);
info!(
"🪟 video layout selected: {}",
if unified_view { "unified" } else { "breakout" }
);
if disable_video_render {
info!("🪟 launcher preview active; skipping standalone client video windows");
} else {
/*────────── video rendering thread (winit) ────*/
let video_queue = app_support::sanitize_video_queue(
std::env::var("LESAVKA_VIDEO_QUEUE")
.ok()
.and_then(|v| v.parse::<usize>().ok()),
);
let dump_video = std::env::var("LESAVKA_DUMP_VIDEO").is_ok();
let (video_tx, mut video_rx) =
tokio::sync::mpsc::channel::<VideoPacket>(video_queue);
std::thread::spawn(move || {
gtk::init().expect("GTK initialisation failed");
#[allow(deprecated)]
{
let el = EventLoopBuilder::<()>::new()
.with_any_thread(true)
.build()
.unwrap();
enum Renderer {
Unified(UnifiedMonitorWindow),
Breakout {
left: MonitorWindow,
right: MonitorWindow,
},
}
let renderer = if unified_view {
Renderer::Unified(UnifiedMonitorWindow::new().expect("unified-window"))
} else {
Renderer::Breakout {
left: MonitorWindow::new(0).expect("win0"),
right: MonitorWindow::new(1).expect("win1"),
}
};
let _ = el.run(move |_: Event<()>, elwt| {
elwt.set_control_flow(ControlFlow::WaitUntil(
std::time::Instant::now() + std::time::Duration::from_millis(8),
));
static CNT: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
while let Ok(pkt) = video_rx.try_recv() {
CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if CNT.load(std::sync::atomic::Ordering::Relaxed).is_multiple_of(300) {
debug!(
"🎥 received {} video packets",
CNT.load(std::sync::atomic::Ordering::Relaxed)
);
}
if dump_video {
static DUMP_CNT: std::sync::atomic::AtomicU32 =
std::sync::atomic::AtomicU32::new(0);
let n =
DUMP_CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let eye = if pkt.id == 0 { "l" } else { "r" };
let path = format!("/tmp/eye{eye}-cli-{n:05}.h264");
std::fs::write(&path, &pkt.data).ok();
}
match &renderer {
Renderer::Unified(window) => window.push_packet(pkt),
Renderer::Breakout { left, right } => match pkt.id {
0 => left.push_packet(pkt),
1 => right.push_packet(pkt),
_ => {}
},
}
}
});
}
});
/*────────── start video gRPC pullers ──────────*/
let ep_video = vid_ep.clone();
tokio::spawn(Self::video_loop(ep_video, video_tx));
}
/*────────── audio renderer & puller ───────────*/
if std::env::var("LESAVKA_AUDIO_DISABLE").is_err() {
let audio_out = AudioOut::new()?;
let ep_audio = vid_ep.clone();
tokio::spawn(Self::audio_loop(ep_audio, audio_out));
} else {
info!("🔇 remote audio disabled for this relay session");
}
} else {
info!("🧪 headless mode: skipping video/audio renderers");
}
/*────────── camera & mic tasks (gated by caps) ───────────*/
if caps.camera && std::env::var("LESAVKA_CAM_DISABLE").is_err() {
if let Some(cfg) = camera_cfg {
info!(
codec = ?cfg.codec,
width = cfg.width,
height = cfg.height,
fps = cfg.fps,
"📸 using camera settings from server"
);
}
let ep = vid_ep.clone();
let cam_source = std::env::var("LESAVKA_CAM_SOURCE").ok();
let cam_telemetry =
uplink_telemetry.handle(crate::uplink_telemetry::UpstreamStreamKind::Camera);
tokio::spawn(async move {
let result = tokio::task::spawn_blocking(move || {
CameraCapture::new(cam_source.as_deref(), camera_cfg)
})
.await;
match result {
Ok(Ok(cam)) => {
let cam = Arc::new(cam);
tokio::spawn(Self::cam_loop(ep, cam, cam_telemetry.clone()));
}
Ok(Err(err)) => {
cam_telemetry.record_disconnect(format!(
"webcam uplink setup failed: {err:#}"
));
warn!(
"📸 webcam uplink is unavailable for this relay session; continuing without StreamCamera: {err:#}"
);
}
Err(err) => {
cam_telemetry.record_disconnect(format!(
"webcam uplink setup task failed: {err}"
));
warn!(
"📸 webcam uplink setup task failed before StreamCamera could start: {err}"
);
}
}
});
}
if caps.microphone && std::env::var("LESAVKA_MIC_DISABLE").is_err() {
let ep = vid_ep.clone();
let mic_telemetry =
uplink_telemetry.handle(crate::uplink_telemetry::UpstreamStreamKind::Microphone);
tokio::spawn(async move {
let result = tokio::task::spawn_blocking(MicrophoneCapture::new).await;
match result {
Ok(Ok(mic)) => {
let mic = Arc::new(mic);
tokio::spawn(Self::voice_loop(ep, mic, mic_telemetry.clone()));
}
Ok(Err(err)) => {
mic_telemetry.record_disconnect(format!(
"microphone uplink setup failed: {err:#}"
));
warn!(
"🎤 microphone uplink is unavailable for this relay session; continuing without StreamMicrophone: {err:#}"
);
}
Err(err) => {
mic_telemetry.record_disconnect(format!(
"microphone uplink setup task failed: {err}"
));
warn!(
"🎤 microphone uplink setup task failed before StreamMicrophone could start: {err}"
);
}
}
});
}
/*────────── central reactor ───────────────────*/
if self.headless {
tokio::select! {
_ = suicide => { /* handled above */ },
}
} else {
let kbd_loop = kbd_loop.expect("kbd_loop");
let mou_loop = mou_loop.expect("mou_loop");
let agg_task = agg_task.expect("agg_task");
let paste_task = paste_task.expect("paste_task");
tokio::select! {
_ = kbd_loop => { warn!("⚠️⌨️ keyboard stream finished"); },
_ = mou_loop => { warn!("⚠️🖱️ mouse stream finished"); },
_ = paste_task => { warn!("⚠️📋 paste loop finished"); },
_ = suicide => { /* handled above */ },
r = agg_task => {
match r {
Ok(Ok(())) => warn!("input aggregator terminated cleanly"),
Ok(Err(e)) => error!("input aggregator error: {e:?}"),
Err(join_err) => error!("aggregator task panicked: {join_err:?}"),
}
return Ok(());
}
}
}
// The branches above either loop forever or exit the process; this
// point is unreachable but satisfies the type checker.
#[allow(unreachable_code)]
Ok(())
}
}