lesavka/client/src/app.rs

509 lines
22 KiB
Rust
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#![cfg_attr(coverage, allow(unused_imports))]
use anyhow::Result;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use tokio::sync::{broadcast, mpsc};
use tokio_stream::{StreamExt, wrappers::BroadcastStream};
use tonic::{Request, transport::Channel};
use tracing::{debug, error, info, trace, warn};
use winit::{
event::Event,
event_loop::{ControlFlow, EventLoopBuilder},
platform::wayland::EventLoopBuilderExtWayland,
};
use lesavka_common::lesavka::{
AudioPacket, KeyboardReport, MonitorRequest, MouseReport, VideoPacket,
relay_client::RelayClient,
};
use crate::{
app_support, handshake, input::camera::CameraCapture, input::inputs::InputAggregator,
input::microphone::MicrophoneCapture, output::audio::AudioOut, output::video::MonitorWindow,
paste,
};
/// Top-level client application: owns the input-capture pipeline and the
/// channels that feed the gRPC relay streams.
pub struct LesavkaClientApp {
    // Present until `run()` takes it to spawn the capture task;
    // `None` in headless mode.
    aggregator: Option<InputAggregator>,
    // gRPC server address resolved from CLI args / LESAVKA_SERVER_ADDR.
    server_addr: String,
    // Set via LESAVKA_DEV_MODE: enables the 300 s auto-exit watchdog in `run()`.
    dev_mode: bool,
    // Set via LESAVKA_HEADLESS: skip HID capture and video/audio rendering.
    headless: bool,
    // Broadcast fan-out of keyboard reports to the keyboard stream task.
    kbd_tx: broadcast::Sender<KeyboardReport>,
    // Broadcast fan-out of mouse reports to the mouse stream task.
    mou_tx: broadcast::Sender<MouseReport>,
    // Clipboard paste pipe; taken by `run()` to drive the paste loop.
    paste_rx: Option<mpsc::UnboundedReceiver<String>>,
}
impl LesavkaClientApp {
/// Build the client application from environment variables and CLI arguments.
///
/// Reads `LESAVKA_DEV_MODE`, `LESAVKA_HEADLESS` and `LESAVKA_SERVER_ADDR`,
/// resolves the server address, and wires up the keyboard/mouse broadcast
/// channels plus the clipboard paste pipe. In non-headless mode an
/// `InputAggregator` is constructed here; devices are grabbed later in `run`.
pub fn new() -> Result<Self> {
    let dev_mode = std::env::var("LESAVKA_DEV_MODE").is_ok();
    let headless = std::env::var("LESAVKA_HEADLESS").is_ok();

    // CLI arguments (minus argv[0]) plus the env override feed the resolver.
    let cli_args: Vec<String> = std::env::args().skip(1).collect();
    let addr_override = std::env::var("LESAVKA_SERVER_ADDR").ok();
    let server_addr = app_support::resolve_server_addr(&cli_args, addr_override.as_deref());

    // Fan-out channels for HID reports and the unbounded paste pipe.
    let (kbd_tx, _) = broadcast::channel(1024);
    let (mou_tx, _) = broadcast::channel(4096);
    let (paste_tx, paste_rx) = mpsc::unbounded_channel();

    // Headless builds never capture input, so no aggregator is created
    // (and the paste sender is simply dropped with it).
    let aggregator = match headless {
        true => None,
        false => Some(InputAggregator::new(
            dev_mode,
            kbd_tx.clone(),
            mou_tx.clone(),
            Some(paste_tx),
        )),
    };

    Ok(Self {
        aggregator,
        server_addr,
        dev_mode,
        headless,
        kbd_tx,
        mou_tx,
        paste_rx: Some(paste_rx),
    })
}
#[cfg(coverage)]
/// Coverage-build stand-in for `run`: negotiates with the server, logs what
/// is being skipped, then awaits forever so the process stays alive.
pub async fn run(&mut self) -> Result<()> {
    info!(server = %self.server_addr, "🚦 starting handshake");
    let _caps = handshake::negotiate(&self.server_addr).await;
    match self.headless {
        true => info!("🧪 headless mode: skipping HID input capture"),
        false => info!("🧪 coverage mode: skipping runtime stream wiring"),
    }
    std::future::pending::<Result<()>>().await
}
#[cfg(not(coverage))]
/// Main client entry point.
///
/// Sequence: capability handshake → two persistent lazy gRPC channels
/// (low-latency HID, high-throughput A/V) → in non-headless mode, grab input
/// devices and start the HID/paste streams plus the GTK/winit render thread
/// and audio output → capability-gated camera/microphone uplinks → park in a
/// `select!` until one of the long-lived tasks finishes.
pub async fn run(&mut self) -> Result<()> {
    /*────────── handshake / feature-negotiation ───────────────*/
    info!(server = %self.server_addr, "🚦 starting handshake");
    let caps = handshake::negotiate(&self.server_addr).await;
    tracing::info!("🤝 server capabilities = {:?}", caps);
    let camera_cfg = app_support::camera_config_from_caps(&caps);
    /*────────── persistent gRPC channels ──────────*/
    // HID channel: nodelay + 15 s HTTP/2 keep-alive, limited concurrency.
    let hid_ep = Channel::from_shared(self.server_addr.clone())?
        .tcp_nodelay(true)
        .concurrency_limit(4)
        .http2_keep_alive_interval(Duration::from_secs(15))
        .connect_lazy();
    // A/V channel: 4 MiB connection and stream windows for bulk video.
    let vid_ep = Channel::from_shared(self.server_addr.clone())?
        .initial_connection_window_size(4 << 20)
        .initial_stream_window_size(4 << 20)
        .tcp_nodelay(true)
        .connect_lazy();
    let mut agg_task = None;
    let mut kbd_loop = None;
    let mut mou_loop = None;
    let mut paste_task = None;
    let paste_rx = self.paste_rx.take();
    if !self.headless {
        /*────────── input aggregator task (grab after handshake) ─────────────*/
        let mut aggregator = self.aggregator.take().expect("InputAggregator present");
        info!("⌛ grabbing input devices…");
        aggregator.init()?; // grab devices now that handshake succeeded
        agg_task = Some(tokio::spawn(async move {
            let mut a = aggregator;
            a.run().await
        }));
        /*────────── HID streams (never return) ────────*/
        kbd_loop = Some(self.stream_loop_keyboard(hid_ep.clone()));
        mou_loop = Some(self.stream_loop_mouse(hid_ep.clone()));
        if let Some(rx) = paste_rx {
            paste_task = Some(Self::paste_loop(hid_ep.clone(), rx));
        }
    } else {
        info!("🧪 headless mode: skipping HID input capture");
    }
    /*───────── optional 300s auto-exit in dev mode */
    let suicide = async {
        if self.dev_mode {
            tokio::time::sleep(Duration::from_secs(300)).await;
            warn!("💀 dev-mode timeout");
            std::process::exit(0);
        } else {
            std::future::pending::<()>().await
        }
    };
    if !self.headless {
        /*────────── video rendering thread (winit) ────*/
        let video_queue = app_support::sanitize_video_queue(
            std::env::var("LESAVKA_VIDEO_QUEUE")
                .ok()
                .and_then(|v| v.parse::<usize>().ok()),
        );
        let dump_video = std::env::var("LESAVKA_DUMP_VIDEO").is_ok();
        let (video_tx, mut video_rx) = tokio::sync::mpsc::channel::<VideoPacket>(video_queue);
        // Dedicated OS thread owns GTK and the winit event loop; video
        // packets arrive over `video_rx` and are drained on every ~16 ms tick.
        std::thread::spawn(move || {
            gtk::init().expect("GTK initialisation failed");
            #[allow(deprecated)]
            {
                let el = EventLoopBuilder::<()>::new()
                    .with_any_thread(true)
                    .build()
                    .unwrap();
                let win0 = MonitorWindow::new(0).expect("win0");
                let win1 = MonitorWindow::new(1).expect("win1");
                let _ = el.run(move |_: Event<()>, _elwt| {
                    _elwt.set_control_flow(ControlFlow::WaitUntil(
                        std::time::Instant::now() + std::time::Duration::from_millis(16),
                    ));
                    // Throttled counter: one DEBUG line per 300 received packets.
                    static CNT: std::sync::atomic::AtomicU64 =
                        std::sync::atomic::AtomicU64::new(0);
                    while let Ok(pkt) = video_rx.try_recv() {
                        CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                        if CNT.load(std::sync::atomic::Ordering::Relaxed) % 300 == 0 {
                            debug!(
                                "🎥 received {} video packets",
                                CNT.load(std::sync::atomic::Ordering::Relaxed)
                            );
                        }
                        if dump_video {
                            // LESAVKA_DUMP_VIDEO: dump each packet to /tmp for
                            // offline inspection; write errors are ignored.
                            static DUMP_CNT: std::sync::atomic::AtomicU32 =
                                std::sync::atomic::AtomicU32::new(0);
                            let n = DUMP_CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                            let eye = if pkt.id == 0 { "l" } else { "r" };
                            let path = format!("/tmp/eye{eye}-cli-{n:05}.h264");
                            std::fs::write(&path, &pkt.data).ok();
                        }
                        // Route the packet to the window matching its monitor id.
                        match pkt.id {
                            0 => win0.push_packet(pkt),
                            1 => win1.push_packet(pkt),
                            _ => {}
                        }
                    }
                });
            }
        });
        /*────────── start video gRPC pullers ──────────*/
        let ep_video = vid_ep.clone();
        tokio::spawn(Self::video_loop(ep_video, video_tx));
        /*────────── audio renderer & puller ───────────*/
        let audio_out = AudioOut::new()?;
        let ep_audio = vid_ep.clone();
        tokio::spawn(Self::audio_loop(ep_audio, audio_out));
    } else {
        info!("🧪 headless mode: skipping video/audio renderers");
    }
    /*────────── camera & mic tasks (gated by caps) ───────────*/
    if caps.camera && std::env::var("LESAVKA_CAM_DISABLE").is_err() {
        if let Some(cfg) = camera_cfg {
            info!(
                codec = ?cfg.codec,
                width = cfg.width,
                height = cfg.height,
                fps = cfg.fps,
                "📸 using camera settings from server"
            );
        }
        let cam = Arc::new(CameraCapture::new(
            std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref(),
            camera_cfg,
        )?);
        tokio::spawn(Self::cam_loop(vid_ep.clone(), cam));
    }
    if caps.microphone && std::env::var("LESAVKA_MIC_DISABLE").is_err() {
        let mic = Arc::new(MicrophoneCapture::new()?);
        tokio::spawn(Self::voice_loop(vid_ep.clone(), mic)); // renamed
    }
    /*────────── central reactor ───────────────────*/
    if self.headless {
        tokio::select! {
            _ = suicide => { /* handled above */ },
        }
    } else {
        let kbd_loop = kbd_loop.expect("kbd_loop");
        let mou_loop = mou_loop.expect("mou_loop");
        let agg_task = agg_task.expect("agg_task");
        let paste_task = paste_task.expect("paste_task");
        tokio::select! {
            _ = kbd_loop => { warn!("⚠️⌨️ keyboard stream finished"); },
            _ = mou_loop => { warn!("⚠️🖱️ mouse stream finished"); },
            _ = paste_task => { warn!("⚠️📋 paste loop finished"); },
            _ = suicide => { /* handled above */ },
            r = agg_task => {
                match r {
                    Ok(Ok(())) => warn!("input aggregator terminated cleanly"),
                    Ok(Err(e)) => error!("input aggregator error: {e:?}"),
                    Err(join_err) => error!("aggregator task panicked: {join_err:?}"),
                }
                return Ok(());
            }
        }
    }
    // The branches above either loop forever or exit the process; this
    // point is unreachable but satisfies the type checker.
    #[allow(unreachable_code)]
    Ok(())
}
/*──────────────── paste loop ───────────────*/
#[cfg(not(coverage))]
/// Spawn the clipboard-paste forwarder: each text received on `rx` is built
/// into a paste request and sent to the relay; on transport failure the
/// client is recreated so the next paste redials lazily.
fn paste_loop(
    ep: Channel,
    mut rx: mpsc::UnboundedReceiver<String>,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let mut cli = RelayClient::new(ep.clone());
        while let Some(text) = rx.recv().await {
            // Build the request first; a malformed paste is logged and skipped.
            let req = match paste::build_paste_request(&text) {
                Ok(r) => r,
                Err(e) => {
                    warn!("📋 paste build failed: {e}");
                    continue;
                }
            };
            match cli.paste_text(Request::new(req)).await {
                Ok(resp) => {
                    let reply = resp.get_ref();
                    if reply.ok {
                        debug!("📋 paste delivered");
                    } else {
                        warn!("📋 paste rejected: {}", reply.error);
                    }
                }
                Err(e) => {
                    warn!("📋 paste failed: {e}");
                    // Drop the broken client; a fresh one dials on next use.
                    cli = RelayClient::new(ep.clone());
                }
            }
        }
    })
}
/*──────────────── keyboard stream ───────────────*/
#[cfg(not(coverage))]
/// Forever-loop: dial the relay, stream keyboard reports from the broadcast
/// channel, and redial one second after any disconnect or error.
async fn stream_loop_keyboard(&self, ep: Channel) {
    loop {
        info!("⌨️🤙 Keyboard dial {}", self.server_addr);
        let mut cli = RelayClient::new(ep.clone());
        // Lagged broadcast entries surface as Err and are silently dropped.
        let reports = BroadcastStream::new(self.kbd_tx.subscribe()).filter_map(|r| r.ok());
        match cli.stream_keyboard(Request::new(reports)).await {
            Err(e) => warn!("❌⌨️ connect failed: {e}"),
            Ok(mut resp) => loop {
                match resp.get_mut().message().await.transpose() {
                    None => break, // stream closed cleanly
                    Some(Ok(_)) => {}
                    Some(Err(e)) => {
                        warn!("⌨️ server err: {e}");
                        break;
                    }
                }
            },
        }
        tokio::time::sleep(Duration::from_secs(1)).await; // redial after a beat
    }
}
/*──────────────── mouse stream ──────────────────*/
#[cfg(not(coverage))]
/// Forever-loop: dial the relay, stream mouse reports from the broadcast
/// channel, and redial one second after any disconnect or error.
async fn stream_loop_mouse(&self, ep: Channel) {
    loop {
        info!("🖱️🤙 Mouse dial {}", self.server_addr);
        let mut cli = RelayClient::new(ep.clone());
        // Lagged broadcast entries surface as Err and are silently dropped.
        let reports = BroadcastStream::new(self.mou_tx.subscribe()).filter_map(|r| r.ok());
        match cli.stream_mouse(Request::new(reports)).await {
            Err(e) => warn!("❌🖱️ connect failed: {e}"),
            Ok(mut resp) => loop {
                match resp.get_mut().message().await.transpose() {
                    None => break, // stream closed cleanly
                    Some(Ok(_)) => {}
                    Some(Err(e)) => {
                        warn!("🖱️ server err: {e}");
                        break;
                    }
                }
            },
        }
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}
/*──────────────── monitor stream ────────────────*/
#[cfg(not(coverage))]
async fn video_loop(ep: Channel, tx: tokio::sync::mpsc::Sender<VideoPacket>) {
let max_bitrate = std::env::var("LESAVKA_VIDEO_MAX_KBIT")
.ok()
.and_then(|v| v.parse::<u32>().ok())
.unwrap_or(4_000);
for monitor_id in 0..=1 {
let ep = ep.clone();
let tx = tx.clone();
tokio::spawn(async move {
loop {
let mut cli = RelayClient::new(ep.clone());
let req = MonitorRequest {
id: monitor_id,
max_bitrate,
};
match cli.capture_video(Request::new(req)).await {
Ok(mut stream) => {
debug!("🎥🏁 cli video{monitor_id}: stream opened");
while let Some(res) = stream.get_mut().message().await.transpose() {
match res {
Ok(pkt) => {
trace!(
"🎥📥 cli video{monitor_id}: got {}bytes",
pkt.data.len()
);
if tx.send(pkt).await.is_err() {
warn!("⚠️🎥 cli video{monitor_id}: GUI thread gone");
break;
}
}
Err(e) => {
error!("❌🎥 cli video{monitor_id}: gRPC error: {e}");
break;
}
}
}
warn!("⚠️🎥 cli video{monitor_id}: stream ended");
}
Err(e) => error!("❌🎥 video {monitor_id}: {e}"),
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
});
}
}
/*──────────────── audio stream ───────────────*/
#[cfg(not(coverage))]
/// Reconnecting audio puller: forwards every successfully received packet to
/// the audio output; per-message errors are skipped, not fatal.
async fn audio_loop(ep: Channel, out: AudioOut) {
    loop {
        let mut cli = RelayClient::new(ep.clone());
        let req = MonitorRequest {
            id: 0,
            max_bitrate: 0,
        };
        match cli.capture_audio(Request::new(req)).await {
            Err(e) => tracing::warn!("❌🔊 audio stream err: {e}"),
            Ok(mut stream) => {
                // Pump until the stream yields None; Err frames are ignored
                // and polling simply continues.
                while let Some(frame) = stream.get_mut().message().await.transpose() {
                    if let Ok(pkt) = frame {
                        out.push(pkt);
                    }
                }
            }
        }
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}
/*──────────────── mic stream ─────────────────*/
#[cfg(not(coverage))]
/// Microphone uplink: a dedicated OS thread performs the blocking `pull()`
/// and feeds packets through a bounded channel into a client-streaming RPC.
/// Reconnects with back-off on failure; the first failure logs at WARN,
/// subsequent ones at DEBUG.
async fn voice_loop(ep: Channel, mic: Arc<MicrophoneCapture>) {
    let mut delay = Duration::from_secs(1);
    static FAIL_CNT: AtomicUsize = AtomicUsize::new(0);
    loop {
        let mut cli = RelayClient::new(ep.clone());
        // 1. Tokio MPSC channel bridges the capture thread → gRPC stream.
        let (tx, rx) = tokio::sync::mpsc::channel::<AudioPacket>(256);
        let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>();
        // 2. Spawn a real thread that does the blocking `pull()`.
        let mic_clone = mic.clone();
        std::thread::spawn(move || {
            while stop_rx.try_recv().is_err() {
                match mic_clone.pull() {
                    Some(pkt) => {
                        trace!("🎤📤 cli {} bytes → gRPC", pkt.data.len());
                        let _ = tx.blocking_send(pkt);
                    }
                    // FIX: no packet available — sleep briefly instead of
                    // busy-spinning a whole core (mirrors cam_loop's worker).
                    None => std::thread::sleep(Duration::from_millis(5)),
                }
            }
        });
        // 3. Turn `rx` into an async stream for gRPC.
        let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
        match cli.stream_microphone(Request::new(outbound)).await {
            Ok(mut resp) => {
                // FIX: got a stream → reset the back-off so a later drop
                // redials quickly again (same policy as cam_loop).
                delay = Duration::from_secs(1);
                while resp.get_mut().message().await.transpose().is_some() {}
            }
            Err(e) => {
                // First failure → warn, subsequent ones → debug.
                if FAIL_CNT.fetch_add(1, Ordering::Relaxed) == 0 {
                    warn!("❌🎤 connect failed: {e}");
                    warn!("⚠️🎤 further microphonestream failures will be logged at DEBUG");
                } else {
                    debug!("❌🎤 reconnect failed: {e}");
                }
                delay = app_support::next_delay(delay);
            }
        }
        // Signal the capture thread to wind down before redialing.
        let _ = stop_tx.send(());
        tokio::time::sleep(delay).await;
    }
}
/*──────────────── cam stream ───────────────────*/
#[cfg(not(coverage))]
/// Webcam uplink: a worker thread pulls encoded frames and pushes them into
/// a client-streaming RPC; reconnects with back-off (`app_support::next_delay`),
/// and gives up permanently if the server reports `Unimplemented`.
async fn cam_loop(ep: Channel, cam: Arc<CameraCapture>) {
    let mut delay = Duration::from_secs(1);
    loop {
        let mut cli = RelayClient::new(ep.clone());
        // Bounded bridge: capture thread → async gRPC stream.
        let (tx, rx) = tokio::sync::mpsc::channel::<VideoPacket>(256);
        let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>();
        let cam_worker = std::thread::spawn({
            let cam = cam.clone();
            move || loop {
                // Cooperative shutdown: any signal on stop_rx ends the worker.
                if stop_rx.try_recv().is_ok() {
                    break;
                }
                // No frame ready yet → short sleep instead of spinning.
                let Some(pkt) = cam.pull() else {
                    std::thread::sleep(Duration::from_millis(5));
                    continue;
                };
                // TRACE every 120 frames only
                static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
                let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                if n < 10 || n % 120 == 0 {
                    tracing::trace!("📸 cli frame#{n} {} B", pkt.data.len());
                }
                tracing::trace!("📸⬆️ sent webcam AU pts={} {} B", pkt.pts, pkt.data.len());
                // Receiver dropped (stream torn down) → exit the worker.
                if tx.blocking_send(pkt).is_err() {
                    break;
                }
            }
        });
        let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
        match cli.stream_camera(Request::new(outbound)).await {
            Ok(mut resp) => {
                delay = Duration::from_secs(1); // got a stream → reset
                // Drain any server messages until the stream ends.
                while resp.get_mut().message().await.transpose().is_some() {}
            }
            Err(e) if e.code() == tonic::Code::Unimplemented => {
                // Server lacks the RPC entirely — retrying is pointless.
                tracing::warn!("📸 server does not support StreamCamera giving up");
                let _ = stop_tx.send(());
                let _ = cam_worker.join();
                return; // stop the task completely (#3)
            }
            Err(e) => {
                tracing::warn!("❌📸 connect failed: {e:?}");
                delay = app_support::next_delay(delay); // back-off (#2)
            }
        }
        // Normal teardown: stop and join the worker, then wait out the delay.
        let _ = stop_tx.send(());
        let _ = cam_worker.join();
        tokio::time::sleep(delay).await;
    }
}
}