diff --git a/client/src/app.rs b/client/src/app.rs index 7d1c4ac..b465f98 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -32,6 +32,7 @@ pub struct LesavkaClientApp { aggregator: Option, server_addr: String, dev_mode: bool, + headless: bool, kbd_tx: broadcast::Sender, mou_tx: broadcast::Sender, } @@ -39,6 +40,7 @@ pub struct LesavkaClientApp { impl LesavkaClientApp { pub fn new() -> Result { let dev_mode = std::env::var("LESAVKA_DEV_MODE").is_ok(); + let headless = std::env::var("LESAVKA_HEADLESS").is_ok(); let server_addr = std::env::args() .nth(1) .or_else(|| std::env::var("LESAVKA_SERVER_ADDR").ok()) @@ -47,12 +49,17 @@ impl LesavkaClientApp { let (kbd_tx, _) = broadcast::channel(1024); let (mou_tx, _) = broadcast::channel(4096); - let agg = InputAggregator::new(dev_mode, kbd_tx.clone(), mou_tx.clone()); + let agg = if headless { + None + } else { + Some(InputAggregator::new(dev_mode, kbd_tx.clone(), mou_tx.clone())) + }; Ok(Self { - aggregator: Some(agg), + aggregator: agg, server_addr, dev_mode, + headless, kbd_tx, mou_tx, }) @@ -98,18 +105,25 @@ impl LesavkaClientApp { .tcp_nodelay(true) .connect_lazy(); - /*────────── input aggregator task (grab after handshake) ─────────────*/ - let mut aggregator = self.aggregator.take().expect("InputAggregator present"); - info!("⌛ grabbing input devices…"); - aggregator.init()?; // grab devices now that handshake succeeded - let agg_task = tokio::spawn(async move { - let mut a = aggregator; - a.run().await - }); + let mut agg_task = None; + let mut kbd_loop = None; + let mut mou_loop = None; + if !self.headless { + /*────────── input aggregator task (grab after handshake) ─────────────*/ + let mut aggregator = self.aggregator.take().expect("InputAggregator present"); + info!("⌛ grabbing input devices…"); + aggregator.init()?; // grab devices now that handshake succeeded + agg_task = Some(tokio::spawn(async move { + let mut a = aggregator; + a.run().await + })); - /*────────── HID streams (never return) ────────*/ - let kbd_loop = self.stream_loop_keyboard(hid_ep.clone()); - let mou_loop = self.stream_loop_mouse(hid_ep.clone()); + /*────────── HID streams (never return) ────────*/ + kbd_loop = Some(self.stream_loop_keyboard(hid_ep.clone())); + mou_loop = Some(self.stream_loop_mouse(hid_ep.clone())); + } else { + info!("🧪 headless mode: skipping HID input capture"); + } /*───────── optional 300 s auto-exit in dev mode */ let suicide = async { @@ -122,57 +136,61 @@ impl LesavkaClientApp { } }; - /*────────── video rendering thread (winit) ────*/ - let (video_tx, mut video_rx) = tokio::sync::mpsc::unbounded_channel::(); + if !self.headless { + /*────────── video rendering thread (winit) ────*/ + let (video_tx, mut video_rx) = tokio::sync::mpsc::unbounded_channel::(); - std::thread::spawn(move || { - gtk::init().expect("GTK initialisation failed"); - let el = EventLoopBuilder::<()>::new() - .with_any_thread(true) - .build() - .unwrap(); - let win0 = MonitorWindow::new(0).expect("win0"); - let win1 = MonitorWindow::new(1).expect("win1"); + std::thread::spawn(move || { + gtk::init().expect("GTK initialisation failed"); + let el = EventLoopBuilder::<()>::new() + .with_any_thread(true) + .build() + .unwrap(); + let win0 = MonitorWindow::new(0).expect("win0"); + let win1 = MonitorWindow::new(1).expect("win1"); - let _ = el.run(move |_: Event<()>, _elwt| { - _elwt.set_control_flow(ControlFlow::WaitUntil( - std::time::Instant::now() + std::time::Duration::from_millis(16), - )); - static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); - static DUMP_CNT: std::sync::atomic::AtomicU32 = - std::sync::atomic::AtomicU32::new(0); - while let Ok(pkt) = video_rx.try_recv() { - CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - if CNT.load(std::sync::atomic::Ordering::Relaxed) % 300 == 0 { - debug!( - "🎥 received {} video packets", - CNT.load(std::sync::atomic::Ordering::Relaxed) - ); + let _ = el.run(move |_: Event<()>, _elwt| { + _elwt.set_control_flow(ControlFlow::WaitUntil( + std::time::Instant::now() + std::time::Duration::from_millis(16), + )); + static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); + static DUMP_CNT: std::sync::atomic::AtomicU32 = + std::sync::atomic::AtomicU32::new(0); + while let Ok(pkt) = video_rx.try_recv() { + CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + if CNT.load(std::sync::atomic::Ordering::Relaxed) % 300 == 0 { + debug!( + "🎥 received {} video packets", + CNT.load(std::sync::atomic::Ordering::Relaxed) + ); + } + let n = DUMP_CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + if n % 120 == 0 { + let eye = if pkt.id == 0 { "l" } else { "r" }; + let path = format!("/tmp/eye{eye}-cli-{n:05}.h264"); + std::fs::write(&path, &pkt.data).ok(); + } + match pkt.id { + 0 => win0.push_packet(pkt), + 1 => win1.push_packet(pkt), + _ => {} + } } - let n = DUMP_CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - if n % 120 == 0 { - let eye = if pkt.id == 0 { "l" } else { "r" }; - let path = format!("/tmp/eye{eye}-cli-{n:05}.h264"); - std::fs::write(&path, &pkt.data).ok(); - } - match pkt.id { - 0 => win0.push_packet(pkt), - 1 => win1.push_packet(pkt), - _ => {} - } - } + }); }); - }); - /*────────── start video gRPC pullers ──────────*/ - let ep_video = vid_ep.clone(); - tokio::spawn(Self::video_loop(ep_video, video_tx)); + /*────────── start video gRPC pullers ──────────*/ + let ep_video = vid_ep.clone(); + tokio::spawn(Self::video_loop(ep_video, video_tx)); - /*────────── audio renderer & puller ───────────*/ - let audio_out = AudioOut::new()?; - let ep_audio = vid_ep.clone(); + /*────────── audio renderer & puller ───────────*/ + let audio_out = AudioOut::new()?; + let ep_audio = vid_ep.clone(); - tokio::spawn(Self::audio_loop(ep_audio, audio_out)); + tokio::spawn(Self::audio_loop(ep_audio, audio_out)); + } else { + info!("🧪 headless mode: skipping video/audio renderers"); + } /*────────── camera & mic tasks (gated by caps) ───────────*/ if caps.camera && std::env::var("LESAVKA_CAM_DISABLE").is_err() { if let Some(cfg) = camera_cfg { @@ -190,23 +208,32 @@ impl LesavkaClientApp { )?); tokio::spawn(Self::cam_loop(vid_ep.clone(), cam)); } - if caps.microphone { + if caps.microphone && std::env::var("LESAVKA_MIC_DISABLE").is_err() { let mic = Arc::new(MicrophoneCapture::new()?); tokio::spawn(Self::voice_loop(vid_ep.clone(), mic)); // renamed } /*────────── central reactor ───────────────────*/ - tokio::select! { - _ = kbd_loop => { warn!("⚠️⌨️ keyboard stream finished"); }, - _ = mou_loop => { warn!("⚠️🖱️ mouse stream finished"); }, - _ = suicide => { /* handled above */ }, - r = agg_task => { - match r { - Ok(Ok(())) => warn!("input aggregator terminated cleanly"), - Ok(Err(e)) => error!("input aggregator error: {e:?}"), - Err(join_err) => error!("aggregator task panicked: {join_err:?}"), + if self.headless { + tokio::select! { + _ = suicide => { /* handled above */ }, + } + } else { + let kbd_loop = kbd_loop.expect("kbd_loop"); + let mou_loop = mou_loop.expect("mou_loop"); + let agg_task = agg_task.expect("agg_task"); + tokio::select! { + _ = kbd_loop => { warn!("⚠️⌨️ keyboard stream finished"); }, + _ = mou_loop => { warn!("⚠️🖱️ mouse stream finished"); }, + _ = suicide => { /* handled above */ }, + r = agg_task => { + match r { + Ok(Ok(())) => warn!("input aggregator terminated cleanly"), + Ok(Err(e)) => error!("input aggregator error: {e:?}"), + Err(join_err) => error!("aggregator task panicked: {join_err:?}"), + } + std::process::exit(1); } - std::process::exit(1); } } diff --git a/client/src/main.rs b/client/src/main.rs index ab05991..ede4402 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -23,7 +23,10 @@ fn ensure_runtime_dir() { #[tokio::main(flavor = "current_thread")] async fn main() -> Result<()> { - ensure_runtime_dir(); + let headless = env::var("LESAVKA_HEADLESS").is_ok(); + if !headless { + ensure_runtime_dir(); + } /*------------- common filter & stderr layer ------------------------*/ let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| { diff --git a/server/src/video.rs b/server/src/video.rs index 696eefd..c429792 100644 --- a/server/src/video.rs +++ b/server/src/video.rs @@ -546,7 +546,15 @@ fn build_hdmi_sink(cfg: &CameraConfig) -> anyhow::Result { if gst::ElementFactory::find("kmssink").is_some() { let sink = gst::ElementFactory::make("kmssink").build()?; if let Some(connector) = cfg.hdmi.as_ref().and_then(|h| h.id) { - sink.set_property("connector-id", &connector); + if sink.has_property("connector-id", None) { + sink.set_property("connector-id", &(connector as i32)); + } else { + tracing::warn!( + target: "lesavka_server::video", + %connector, + "kmssink does not expose connector-id property; using default connector" + ); + } } sink.set_property("sync", &false); return Ok(sink);