lesavka/client/src/app/session_lifecycle.rs

302 lines
14 KiB
Rust
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

impl LesavkaClientApp {
pub fn new() -> Result<Self> {
let dev_mode = std::env::var("LESAVKA_DEV_MODE").is_ok();
let headless = std::env::var("LESAVKA_HEADLESS").is_ok();
let capture_remote_boot = std::env::var("LESAVKA_CAPTURE_REMOTE")
.map(|value| value != "0")
.unwrap_or(true);
let args = std::env::args().skip(1).collect::<Vec<_>>();
let env_addr = std::env::var("LESAVKA_SERVER_ADDR").ok();
let server_addr = app_support::resolve_server_addr(&args, env_addr.as_deref());
let (kbd_tx, _) = broadcast::channel(1024);
let (mou_tx, _) = broadcast::channel(4096);
let (paste_tx, paste_rx) = mpsc::unbounded_channel();
let (agg, remote_capture_enabled) = if headless {
(None, Arc::new(AtomicBool::new(false)))
} else {
let aggregator = InputAggregator::new_with_capture_mode(
dev_mode,
kbd_tx.clone(),
mou_tx.clone(),
Some(paste_tx),
capture_remote_boot,
);
let remote_capture_enabled = aggregator.remote_capture_enabled_handle();
(Some(aggregator), remote_capture_enabled)
};
Ok(Self {
aggregator: agg,
server_addr,
dev_mode,
headless,
kbd_tx,
mou_tx,
paste_rx: Some(paste_rx),
remote_capture_enabled,
})
}
#[cfg(coverage)]
pub async fn run(&mut self) -> Result<()> {
info!(server = %self.server_addr, "🚦 starting handshake");
let _caps = handshake::negotiate(&self.server_addr).await;
if self.headless {
info!("🧪 headless mode: skipping HID input capture");
} else {
info!("🧪 coverage mode: skipping runtime stream wiring");
}
std::future::pending::<Result<()>>().await
}
#[cfg(not(coverage))]
pub async fn run(&mut self) -> Result<()> {
/*────────── handshake / feature-negotiation ───────────────*/
info!(server = %self.server_addr, "🚦 starting handshake");
let caps = handshake::negotiate(&self.server_addr).await;
tracing::info!("🤝 server capabilities = {:?}", caps);
let camera_cfg = app_support::camera_config_from_caps(&caps);
let initial_cam_source = std::env::var("LESAVKA_CAM_SOURCE").ok();
let initial_cam_profile = initial_camera_profile_id_from_env();
let initial_mic_source = std::env::var("LESAVKA_MIC_SOURCE").ok();
let initial_audio_sink = std::env::var("LESAVKA_AUDIO_SINK").ok();
let media_controls = crate::live_media_control::LiveMediaControls::from_env(
crate::live_media_control::MediaControlState::with_devices(
caps.camera && std::env::var("LESAVKA_CAM_DISABLE").is_err(),
caps.microphone && std::env::var("LESAVKA_MIC_DISABLE").is_err(),
std::env::var("LESAVKA_AUDIO_DISABLE").is_err(),
initial_cam_source.clone(),
initial_cam_profile.clone(),
initial_mic_source.clone(),
initial_audio_sink.clone(),
),
);
let media_state = media_controls.refresh();
let uplink_telemetry = crate::uplink_telemetry::UplinkTelemetryPublisher::from_env(
media_state.camera,
media_state.microphone,
);
/*────────── persistent gRPC channels ──────────*/
let hid_ep = relay_transport::endpoint(&self.server_addr)?
.tcp_nodelay(true)
.concurrency_limit(4)
.http2_keep_alive_interval(Duration::from_secs(15))
.connect_lazy();
let vid_ep = relay_transport::endpoint(&self.server_addr)?
.initial_connection_window_size(4 << 20)
.initial_stream_window_size(4 << 20)
.tcp_nodelay(true)
.connect_lazy();
let mut agg_task = None;
let mut kbd_loop = None;
let mut mou_loop = None;
let mut paste_task = None;
let paste_rx = self.paste_rx.take();
if !self.headless {
/*────────── input aggregator task (grab after handshake) ─────────────*/
let mut aggregator = self.aggregator.take().expect("InputAggregator present");
info!("⌛ grabbing input devices…");
aggregator.init()?; // grab devices now that handshake succeeded
agg_task = Some(tokio::spawn(async move {
let mut a = aggregator;
a.run().await
}));
/*────────── HID streams (never return) ────────*/
kbd_loop = Some(self.stream_loop_keyboard(hid_ep.clone()));
mou_loop = Some(self.stream_loop_mouse(hid_ep.clone()));
if let Some(rx) = paste_rx {
paste_task = Some(Self::paste_loop(hid_ep.clone(), rx));
}
} else {
info!("🧪 headless mode: skipping HID input capture");
}
/*───────── optional 300s auto-exit in dev mode */
let suicide = async {
if self.dev_mode {
tokio::time::sleep(Duration::from_secs(300)).await;
warn!("💀 dev-mode timeout");
std::process::exit(0);
} else {
std::future::pending::<()>().await
}
};
if !self.headless {
let view_mode = std::env::var("LESAVKA_VIEW_MODE")
.unwrap_or_else(|_| "breakout".to_string())
.to_ascii_lowercase();
let unified_view = view_mode == "unified";
let disable_video_render = std::env::var("LESAVKA_DISABLE_VIDEO_RENDER")
.map(|value| value.trim() != "0")
.unwrap_or(false);
info!(
"🪟 video layout selected: {}",
if unified_view { "unified" } else { "breakout" }
);
if disable_video_render {
info!("🪟 launcher preview active; skipping standalone client video windows");
} else {
/*────────── video rendering thread (winit) ────*/
let video_queue = app_support::sanitize_video_queue(
std::env::var("LESAVKA_VIDEO_QUEUE")
.ok()
.and_then(|v| v.parse::<usize>().ok()),
);
let dump_video = std::env::var("LESAVKA_DUMP_VIDEO").is_ok();
let (video_tx, mut video_rx) =
tokio::sync::mpsc::channel::<VideoPacket>(video_queue);
std::thread::spawn(move || {
gtk::init().expect("GTK initialisation failed");
#[allow(deprecated)]
{
let el = EventLoopBuilder::<()>::new()
.with_any_thread(true)
.build()
.unwrap();
enum Renderer {
Unified(UnifiedMonitorWindow),
Breakout {
left: MonitorWindow,
right: MonitorWindow,
},
}
let renderer = if unified_view {
Renderer::Unified(UnifiedMonitorWindow::new().expect("unified-window"))
} else {
Renderer::Breakout {
left: MonitorWindow::new(0).expect("win0"),
right: MonitorWindow::new(1).expect("win1"),
}
};
let _ = el.run(move |_: Event<()>, elwt| {
elwt.set_control_flow(ControlFlow::WaitUntil(
std::time::Instant::now() + std::time::Duration::from_millis(8),
));
static CNT: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
while let Ok(pkt) = video_rx.try_recv() {
CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if CNT.load(std::sync::atomic::Ordering::Relaxed).is_multiple_of(300) {
debug!(
"🎥 received {} video packets",
CNT.load(std::sync::atomic::Ordering::Relaxed)
);
}
if dump_video {
static DUMP_CNT: std::sync::atomic::AtomicU32 =
std::sync::atomic::AtomicU32::new(0);
let n =
DUMP_CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let eye = if pkt.id == 0 { "l" } else { "r" };
let path = format!("/tmp/eye{eye}-cli-{n:05}.h264");
std::fs::write(&path, &pkt.data).ok();
}
match &renderer {
Renderer::Unified(window) => window.push_packet(pkt),
Renderer::Breakout { left, right } => match pkt.id {
0 => left.push_packet(pkt),
1 => right.push_packet(pkt),
_ => {}
},
}
}
});
}
});
/*────────── start video gRPC pullers ──────────*/
let ep_video = vid_ep.clone();
tokio::spawn(Self::video_loop(ep_video, video_tx));
}
/*────────── audio renderer & puller ───────────*/
if std::env::var("LESAVKA_AUDIO_DISABLE").is_err() {
let ep_audio = vid_ep.clone();
tokio::spawn(Self::audio_loop(ep_audio, media_controls.clone()));
} else {
info!("🔇 remote audio disabled for this relay session");
}
} else {
info!("🧪 headless mode: skipping video/audio renderers");
}
/*────────── camera & mic tasks (gated by caps) ───────────*/
if caps.camera && std::env::var("LESAVKA_CAM_DISABLE").is_err() {
if let Some(cfg) = camera_cfg {
info!(
codec = ?cfg.codec,
width = cfg.width,
height = cfg.height,
fps = cfg.fps,
"📸 using negotiated server UVC caps for emitted format; launcher quality still controls local capture"
);
}
let ep = vid_ep.clone();
let cam_telemetry =
uplink_telemetry.handle(crate::uplink_telemetry::UpstreamStreamKind::Camera);
let media_controls = media_controls.clone();
tokio::spawn(Self::cam_loop(
ep,
initial_cam_source.clone(),
initial_cam_profile.clone(),
camera_cfg,
cam_telemetry,
media_controls,
));
}
if caps.microphone && std::env::var("LESAVKA_MIC_DISABLE").is_err() {
let ep = vid_ep.clone();
let mic_telemetry =
uplink_telemetry.handle(crate::uplink_telemetry::UpstreamStreamKind::Microphone);
let media_controls = media_controls.clone();
tokio::spawn(Self::voice_loop(
ep,
initial_mic_source.clone(),
mic_telemetry,
media_controls,
));
}
/*────────── central reactor ───────────────────*/
if self.headless {
tokio::select! {
_ = suicide => { /* handled above */ },
}
} else {
let kbd_loop = kbd_loop.expect("kbd_loop");
let mou_loop = mou_loop.expect("mou_loop");
let agg_task = agg_task.expect("agg_task");
let paste_task = paste_task.expect("paste_task");
tokio::select! {
_ = kbd_loop => { warn!("⚠️⌨️ keyboard stream finished"); },
_ = mou_loop => { warn!("⚠️🖱️ mouse stream finished"); },
_ = paste_task => { warn!("⚠️📋 paste loop finished"); },
_ = suicide => { /* handled above */ },
r = agg_task => {
match r {
Ok(Ok(())) => warn!("input aggregator terminated cleanly"),
Ok(Err(e)) => error!("input aggregator error: {e:?}"),
Err(join_err) => error!("aggregator task panicked: {join_err:?}"),
}
return Ok(());
}
}
}
// The branches above either loop forever or exit the process; this
// point is unreachable but satisfies the type checker.
#[allow(unreachable_code)]
Ok(())
}
}