From 3458b42a119fda28a06745e46a7014ec7f2df102 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Mon, 1 Dec 2025 11:38:51 -0300 Subject: [PATCH] client: mic selection defaults improved --- client/src/app.rs | 139 +++++++++++++++++++-------------- client/src/handshake.rs | 10 +-- client/src/input/camera.rs | 51 ++++++------ client/src/input/inputs.rs | 91 +++++++++++++-------- client/src/input/keyboard.rs | 72 +++++++++++++---- client/src/input/keymap.rs | 78 +++++++++--------- client/src/input/microphone.rs | 109 +++++++++++++++++--------- client/src/input/mod.rs | 12 +-- client/src/input/mouse.rs | 95 ++++++++++++++++------ client/src/layout.rs | 36 +++++---- client/src/lib.rs | 6 +- client/src/main.rs | 7 +- client/src/output/audio.rs | 67 +++++++++------- client/src/output/display.rs | 29 ++++--- client/src/output/layout.rs | 57 ++++++++++---- client/src/output/mod.rs | 4 +- client/src/output/video.rs | 77 +++++++++--------- 17 files changed, 579 insertions(+), 361 deletions(-) diff --git a/client/src/app.rs b/client/src/app.rs index 2b8acf6..b6d5ca4 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -1,30 +1,28 @@ #![forbid(unsafe_code)] use anyhow::Result; -use std::time::Duration; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::time::Duration; use tokio::sync::broadcast; -use tokio_stream::{wrappers::BroadcastStream, StreamExt}; -use tonic::{transport::Channel, Request}; -use tracing::{error, trace, debug, info, warn}; +use tokio_stream::{StreamExt, wrappers::BroadcastStream}; +use tonic::{Request, transport::Channel}; +use tracing::{debug, error, info, trace, warn}; use winit::{ event::Event, - event_loop::{EventLoopBuilder, ControlFlow}, + event_loop::{ControlFlow, EventLoopBuilder}, platform::wayland::EventLoopBuilderExtWayland, }; use lesavka_common::lesavka::{ - relay_client::RelayClient, KeyboardReport, - MonitorRequest, MouseReport, VideoPacket, AudioPacket + AudioPacket, KeyboardReport, MonitorRequest, MouseReport, VideoPacket, + relay_client::RelayClient, }; -use crate::{handshake, - input::inputs::InputAggregator, - input::microphone::MicrophoneCapture, - input::camera::CameraCapture, - output::video::MonitorWindow, - output::audio::AudioOut}; +use crate::{ + handshake, input::camera::CameraCapture, input::inputs::InputAggregator, + input::microphone::MicrophoneCapture, output::audio::AudioOut, output::video::MonitorWindow, +}; pub struct LesavkaClientApp { aggregator: Option, @@ -36,7 +34,7 @@ pub struct LesavkaClientApp { impl LesavkaClientApp { pub fn new() -> Result { - let dev_mode = std::env::var("LESAVKA_DEV_MODE").is_ok(); + let dev_mode = std::env::var("LESAVKA_DEV_MODE").is_ok(); let server_addr = std::env::args() .nth(1) .or_else(|| std::env::var("LESAVKA_SERVER_ADDR").ok()) @@ -47,7 +45,13 @@ impl LesavkaClientApp { let agg = InputAggregator::new(dev_mode, kbd_tx.clone(), mou_tx.clone()); - Ok(Self { aggregator: Some(agg), server_addr, dev_mode, kbd_tx, mou_tx }) + Ok(Self { + aggregator: Some(agg), + server_addr, + dev_mode, + kbd_tx, + mou_tx, + }) } pub async fn run(&mut self) -> Result<()> { @@ -64,16 +68,16 @@ impl LesavkaClientApp { .connect_lazy(); let vid_ep = Channel::from_shared(self.server_addr.clone())? - .initial_connection_window_size(4<<20) - .initial_stream_window_size(4<<20) + .initial_connection_window_size(4 << 20) + .initial_stream_window_size(4 << 20) .tcp_nodelay(true) .connect_lazy(); /*────────── input aggregator task (grab after handshake) ─────────────*/ let mut aggregator = self.aggregator.take().expect("InputAggregator present"); info!("⌛ grabbing input devices…"); - aggregator.init()?; // grab devices now that handshake succeeded - let agg_task = tokio::spawn(async move { + aggregator.init()?; // grab devices now that handshake succeeded + let agg_task = tokio::spawn(async move { let mut a = aggregator; a.run().await }); @@ -98,19 +102,27 @@ impl LesavkaClientApp { std::thread::spawn(move || { gtk::init().expect("GTK initialisation failed"); - let el = EventLoopBuilder::<()>::new().with_any_thread(true).build().unwrap(); + let el = EventLoopBuilder::<()>::new() + .with_any_thread(true) + .build() + .unwrap(); let win0 = MonitorWindow::new(0).expect("win0"); let win1 = MonitorWindow::new(1).expect("win1"); let _ = el.run(move |_: Event<()>, _elwt| { _elwt.set_control_flow(ControlFlow::WaitUntil( - std::time::Instant::now() + std::time::Duration::from_millis(16))); + std::time::Instant::now() + std::time::Duration::from_millis(16), + )); static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); - static DUMP_CNT: std::sync::atomic::AtomicU32 = std::sync::atomic::AtomicU32::new(0); + static DUMP_CNT: std::sync::atomic::AtomicU32 = + std::sync::atomic::AtomicU32::new(0); while let Ok(pkt) = video_rx.try_recv() { CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if CNT.load(std::sync::atomic::Ordering::Relaxed) % 300 == 0 { - debug!("🎥 received {} video packets", CNT.load(std::sync::atomic::Ordering::Relaxed)); + debug!( + "🎥 received {} video packets", + CNT.load(std::sync::atomic::Ordering::Relaxed) + ); } let n = DUMP_CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if n % 120 == 0 { @@ -133,19 +145,19 @@ impl LesavkaClientApp { /*────────── audio renderer & puller ───────────*/ let audio_out = AudioOut::new()?; - let ep_audio = vid_ep.clone(); + let ep_audio = vid_ep.clone(); tokio::spawn(Self::audio_loop(ep_audio, audio_out)); /*────────── camera & mic tasks (gated by caps) ───────────*/ if caps.camera && std::env::var("LESAVKA_CAM_DISABLE").is_err() { let cam = Arc::new(CameraCapture::new( - std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref() + std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref(), )?); tokio::spawn(Self::cam_loop(vid_ep.clone(), cam)); } if caps.microphone { let mic = Arc::new(MicrophoneCapture::new()?); - tokio::spawn(Self::voice_loop(vid_ep.clone(), mic)); // renamed + tokio::spawn(Self::voice_loop(vid_ep.clone(), mic)); // renamed } /*────────── central reactor ───────────────────*/ @@ -174,19 +186,21 @@ impl LesavkaClientApp { loop { info!("⌨️🤙 Keyboard dial {}", self.server_addr); let mut cli = RelayClient::new(ep.clone()); - - let outbound = BroadcastStream::new(self.kbd_tx.subscribe()) - .filter_map(|r| r.ok()); - + + let outbound = BroadcastStream::new(self.kbd_tx.subscribe()).filter_map(|r| r.ok()); + match cli.stream_keyboard(Request::new(outbound)).await { Ok(mut resp) => { while let Some(msg) = resp.get_mut().message().await.transpose() { - if let Err(e) = msg { warn!("⌨️ server err: {e}"); break; } + if let Err(e) = msg { + warn!("⌨️ server err: {e}"); + break; + } } } Err(e) => warn!("❌⌨️ connect failed: {e}"), } - tokio::time::sleep(Duration::from_secs(1)).await; // retry + tokio::time::sleep(Duration::from_secs(1)).await; // retry } } @@ -195,14 +209,16 @@ impl LesavkaClientApp { loop { info!("🖱️🤙 Mouse dial {}", self.server_addr); let mut cli = RelayClient::new(ep.clone()); - - let outbound = BroadcastStream::new(self.mou_tx.subscribe()) - .filter_map(|r| r.ok()); - + + let outbound = BroadcastStream::new(self.mou_tx.subscribe()).filter_map(|r| r.ok()); + match cli.stream_mouse(Request::new(outbound)).await { Ok(mut resp) => { while let Some(msg) = resp.get_mut().message().await.transpose() { - if let Err(e) = msg { warn!("🖱️ server err: {e}"); break; } + if let Err(e) = msg { + warn!("🖱️ server err: {e}"); + break; + } } } Err(e) => warn!("❌🖱️ connect failed: {e}"), @@ -212,24 +228,27 @@ impl LesavkaClientApp { } /*──────────────── monitor stream ────────────────*/ - async fn video_loop( - ep: Channel, - tx: tokio::sync::mpsc::UnboundedSender, - ) { + async fn video_loop(ep: Channel, tx: tokio::sync::mpsc::UnboundedSender) { for monitor_id in 0..=1 { let ep = ep.clone(); let tx = tx.clone(); tokio::spawn(async move { loop { let mut cli = RelayClient::new(ep.clone()); - let req = MonitorRequest { id: monitor_id, max_bitrate: 6_000 }; + let req = MonitorRequest { + id: monitor_id, + max_bitrate: 6_000, + }; match cli.capture_video(Request::new(req)).await { Ok(mut stream) => { debug!("🎥🏁 cli video{monitor_id}: stream opened"); while let Some(res) = stream.get_mut().message().await.transpose() { match res { Ok(pkt) => { - trace!("🎥📥 cli video{monitor_id}: got {} bytes", pkt.data.len()); + trace!( + "🎥📥 cli video{monitor_id}: got {} bytes", + pkt.data.len() + ); if tx.send(pkt).is_err() { warn!("⚠️🎥 cli video{monitor_id}: GUI thread gone"); break; @@ -255,11 +274,16 @@ impl LesavkaClientApp { async fn audio_loop(ep: Channel, out: AudioOut) { loop { let mut cli = RelayClient::new(ep.clone()); - let req = MonitorRequest { id: 0, max_bitrate: 0 }; + let req = MonitorRequest { + id: 0, + max_bitrate: 0, + }; match cli.capture_audio(Request::new(req)).await { Ok(mut stream) => { while let Some(res) = stream.get_mut().message().await.transpose() { - if let Ok(pkt) = res { out.push(pkt); } + if let Ok(pkt) = res { + out.push(pkt); + } } } Err(e) => tracing::warn!("❌🔊 audio stream err: {e}"), @@ -274,11 +298,11 @@ impl LesavkaClientApp { static FAIL_CNT: AtomicUsize = AtomicUsize::new(0); loop { let mut cli = RelayClient::new(ep.clone()); - + // 1. create a Tokio MPSC channel let (tx, rx) = tokio::sync::mpsc::channel::(256); let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>(); - + // 2. spawn a real thread that does the blocking `pull()` let mic_clone = mic.clone(); std::thread::spawn(move || { @@ -289,14 +313,12 @@ impl LesavkaClientApp { } } }); - + // 3. turn `rx` into an async stream for gRPC let outbound = tokio_stream::wrappers::ReceiverStream::new(rx); - match cli.stream_microphone(Request::new(outbound)).await { - Ok(mut resp) => { - while resp.get_mut().message().await.transpose().is_some() {} - } - Err(e) => { + match cli.stream_microphone(Request::new(outbound)).await { + Ok(mut resp) => while resp.get_mut().message().await.transpose().is_some() {}, + Err(e) => { // first failure → warn, subsequent ones → debug if FAIL_CNT.fetch_add(1, Ordering::Relaxed) == 0 { warn!("❌🎤 connect failed: {e}"); @@ -330,8 +352,7 @@ impl LesavkaClientApp { if n < 10 || n % 120 == 0 { tracing::trace!("📸 cli frame#{n} {} B", pkt.data.len()); } - tracing::trace!("📸⬆️ sent webcam AU pts={} {} B", - pkt.pts, pkt.data.len()); + tracing::trace!("📸⬆️ sent webcam AU pts={} {} B", pkt.pts, pkt.data.len()); let _ = tx.try_send(pkt); } } @@ -339,14 +360,14 @@ impl LesavkaClientApp { let outbound = tokio_stream::wrappers::ReceiverStream::new(rx); match cli.stream_camera(Request::new(outbound)).await { - Ok(_) => delay = Duration::from_secs(1), // got a stream → reset + Ok(_) => delay = Duration::from_secs(1), // got a stream → reset Err(e) if e.code() == tonic::Code::Unimplemented => { tracing::warn!("📸 server does not support StreamCamera – giving up"); - return; // stop the task completely (#3) + return; // stop the task completely (#3) } Err(e) => { tracing::warn!("❌📸 connect failed: {e:?}"); - delay = next_delay(delay); // back-off (#2) + delay = next_delay(delay); // back-off (#2) } } tokio::time::sleep(delay).await; @@ -357,6 +378,6 @@ impl LesavkaClientApp { fn next_delay(cur: std::time::Duration) -> std::time::Duration { match cur.as_secs() { 1..=15 => cur * 2, - _ => std::time::Duration::from_secs(30), + _ => std::time::Duration::from_secs(30), } } diff --git a/client/src/handshake.rs b/client/src/handshake.rs index 783029e..c1d0a61 100644 --- a/client/src/handshake.rs +++ b/client/src/handshake.rs @@ -1,16 +1,16 @@ // client/src/handshake.rs #![forbid(unsafe_code)] -use std::time::Duration; use lesavka_common::lesavka::{self as pb, handshake_client::HandshakeClient}; -use tonic::{Code, transport::Endpoint}; +use std::time::Duration; use tokio::time::timeout; +use tonic::{Code, transport::Endpoint}; use tracing::{info, warn}; #[derive(Default, Clone, Copy, Debug)] pub struct PeerCaps { - pub camera: bool, - pub microphone: bool, + pub camera: bool, + pub microphone: bool, } pub async fn negotiate(uri: &str) -> PeerCaps { @@ -34,7 +34,7 @@ pub async fn negotiate(uri: &str) -> PeerCaps { match timeout(Duration::from_secs(5), cli.get_capabilities(pb::Empty {})).await { Ok(Ok(rsp)) => { let caps = PeerCaps { - camera: rsp.get_ref().camera, + camera: rsp.get_ref().camera, microphone: rsp.get_ref().microphone, }; info!(?caps, "🤝 handshake ok"); diff --git a/client/src/input/camera.rs b/client/src/input/camera.rs index ab59bb7..ee949fa 100644 --- a/client/src/input/camera.rs +++ b/client/src/input/camera.rs @@ -2,15 +2,15 @@ #![forbid(unsafe_code)] use anyhow::Context; +use gst::prelude::*; use gstreamer as gst; use gstreamer_app as gst_app; -use gst::prelude::*; use lesavka_common::lesavka::VideoPacket; pub struct CameraCapture { - #[allow(dead_code)] // kept alive to hold PLAYING state + #[allow(dead_code)] // kept alive to hold PLAYING state pipeline: gst::Pipeline, - sink: gst_app::AppSink, + sink: gst_app::AppSink, } impl CameraCapture { @@ -20,8 +20,7 @@ impl CameraCapture { // Pick device (prefers V4L2 nodes with capture capability) let dev = match device_fragment { Some(path) if path.starts_with("/dev/") => path.to_string(), - Some(fragment) => Self::find_device(fragment) - .unwrap_or_else(|| "/dev/video0".into()), + Some(fragment) => Self::find_device(fragment).unwrap_or_else(|| "/dev/video0".into()), None => "/dev/video0".into(), }; @@ -46,7 +45,7 @@ impl CameraCapture { _ => ("video/x-raw,width=1280,height=720", "videoconvert !"), - }; + }; // let desc = format!( // "v4l2src device={dev} do-timestamp=true ! {raw_caps},width=1280,height=720 ! \ @@ -88,10 +87,14 @@ impl CameraCapture { pub fn pull(&self) -> Option { let sample = self.sink.pull_sample().ok()?; - let buf = sample.buffer()?; - let map = buf.map_readable().ok()?; + let buf = sample.buffer()?; + let map = buf.map_readable().ok()?; let pts = buf.pts().unwrap_or(gst::ClockTime::ZERO).nseconds() / 1_000; - Some(VideoPacket { id: 2, pts, data: map.as_slice().to_vec() }) + Some(VideoPacket { + id: 2, + pts, + data: map.as_slice().to_vec(), + }) } /// Fuzzy‑match devices under `/dev/v4l/by-id`, preferring capture nodes @@ -134,8 +137,7 @@ impl CameraCapture { .and_then(|d| d.query_caps().ok()) .map(|caps| { let bits = caps.capabilities.bits(); - (bits & V4L2_CAP_VIDEO_CAPTURE != 0) - || (bits & V4L2_CAP_VIDEO_CAPTURE_MPLANE != 0) + (bits & V4L2_CAP_VIDEO_CAPTURE != 0) || (bits & V4L2_CAP_VIDEO_CAPTURE_MPLANE != 0) }) .unwrap_or(false) } @@ -151,13 +153,13 @@ impl CameraCapture { Self { pipeline, sink } } - #[allow(dead_code)] // helper kept for future heuristics + #[allow(dead_code)] // helper kept for future heuristics fn pick_encoder() -> (&'static str, &'static str) { let encoders = &[ - ("nvh264enc", "video/x-raw(memory:NVMM),format=NV12"), - ("vaapih264enc","video/x-raw,format=NV12"), - ("v4l2h264enc", "video/x-raw"), // RPi, Jetson, etc. - ("x264enc", "video/x-raw"), // software + ("nvh264enc", "video/x-raw(memory:NVMM),format=NV12"), + ("vaapih264enc", "video/x-raw,format=NV12"), + ("v4l2h264enc", "video/x-raw"), // RPi, Jetson, etc. + ("x264enc", "video/x-raw"), // software ]; for (name, caps) in encoders { if gst::ElementFactory::find(name).is_some() { @@ -170,13 +172,16 @@ impl CameraCapture { fn choose_encoder() -> (&'static str, &'static str, &'static str) { match () { - _ if gst::ElementFactory::find("nvh264enc").is_some() => - ("nvh264enc", "gop-size", "30"), - _ if gst::ElementFactory::find("vaapih264enc").is_some() => - ("vaapih264enc","keyframe-period","30"), - _ if gst::ElementFactory::find("v4l2h264enc").is_some() => - ("v4l2h264enc","idrcount", "30"), - _ => ("x264enc", "key-int-max", "30"), + _ if gst::ElementFactory::find("nvh264enc").is_some() => { + ("nvh264enc", "gop-size", "30") + } + _ if gst::ElementFactory::find("vaapih264enc").is_some() => { + ("vaapih264enc", "keyframe-period", "30") + } + _ if gst::ElementFactory::find("v4l2h264enc").is_some() => { + ("v4l2h264enc", "idrcount", "30") + } + _ => ("x264enc", "key-int-max", "30"), } } } diff --git a/client/src/input/inputs.rs b/client/src/input/inputs.rs index d3a3346..5d32329 100644 --- a/client/src/input/inputs.rs +++ b/client/src/input/inputs.rs @@ -1,8 +1,11 @@ // client/src/input/inputs.rs -use anyhow::{bail, Context, Result}; +use anyhow::{Context, Result, bail}; use evdev::{Device, EventType, KeyCode, RelativeAxisCode}; -use tokio::{sync::broadcast::Sender, time::{interval, Duration}}; +use tokio::{ + sync::broadcast::Sender, + time::{Duration, interval}, +}; use tracing::{debug, info, warn}; use lesavka_common::lesavka::{KeyboardReport, MouseReport}; @@ -21,19 +24,26 @@ pub struct InputAggregator { } impl InputAggregator { - pub fn new(dev_mode: bool, - kbd_tx: Sender, - mou_tx: Sender) -> Self { - Self { kbd_tx, mou_tx, dev_mode, released: false, magic_active: false, - keyboards: Vec::new(), mice: Vec::new() - } + pub fn new( + dev_mode: bool, + kbd_tx: Sender, + mou_tx: Sender, + ) -> Self { + Self { + kbd_tx, + mou_tx, + dev_mode, + released: false, + magic_active: false, + keyboards: Vec::new(), + mice: Vec::new(), + } } /// Called once at startup: enumerates input devices, /// classifies them, and constructs a aggregator struct per type. pub fn init(&mut self) -> Result<()> { - let paths = std::fs::read_dir("/dev/input") - .context("Failed to read /dev/input")?; + let paths = std::fs::read_dir("/dev/input").context("Failed to read /dev/input")?; let mut found_any = false; @@ -42,7 +52,10 @@ impl InputAggregator { let path = entry.path(); // skip anything that isn't "event*" - if !path.file_name().map_or(false, |f| f.to_string_lossy().starts_with("event")) { + if !path + .file_name() + .map_or(false, |f| f.to_string_lossy().starts_with("event")) + { continue; } @@ -56,12 +69,17 @@ impl InputAggregator { }; // non-blocking so fetch_events never stalls the whole loop - dev.set_nonblocking(true).with_context(|| format!("set_non_blocking {:?}", path))?; + dev.set_nonblocking(true) + .with_context(|| format!("set_non_blocking {:?}", path))?; match classify_device(&dev) { DeviceKind::Keyboard => { - dev.grab().with_context(|| format!("grabbing keyboard {path:?}"))?; - info!("🤏🖱️ Grabbed keyboard {:?}", dev.name().unwrap_or("UNKNOWN")); + dev.grab() + .with_context(|| format!("grabbing keyboard {path:?}"))?; + info!( + "🤏🖱️ Grabbed keyboard {:?}", + dev.name().unwrap_or("UNKNOWN") + ); // pass dev_mode to aggregator // let kbd_agg = KeyboardAggregator::new(dev, self.dev_mode); @@ -71,7 +89,8 @@ impl InputAggregator { continue; } DeviceKind::Mouse => { - dev.grab().with_context(|| format!("grabbing mouse {path:?}"))?; + dev.grab() + .with_context(|| format!("grabbing mouse {path:?}"))?; info!("🤏⌨️ Grabbed mouse {:?}", dev.name().unwrap_or("UNKNOWN")); // let mouse_agg = MouseAggregator::new(dev); @@ -81,7 +100,10 @@ impl InputAggregator { continue; } DeviceKind::Other => { - debug!("Skipping non-kbd/mouse device: {:?}", dev.name().unwrap_or("UNKNOWN")); + debug!( + "Skipping non-kbd/mouse device: {:?}", + dev.name().unwrap_or("UNKNOWN") + ); continue; } } @@ -104,24 +126,26 @@ impl InputAggregator { let magic_now = self.keyboards.iter().any(|k| k.magic_grab()); let magic_left = self.keyboards.iter().any(|k| k.magic_left()); let magic_right = self.keyboards.iter().any(|k| k.magic_right()); - let mut want_kill = false; + let mut want_kill = false; for kbd in &mut self.keyboards { kbd.process_events(); - want_kill |= kbd.magic_kill(); + want_kill |= kbd.magic_kill(); } - if magic_now && !self.magic_active { self.toggle_grab(); } + if magic_now && !self.magic_active { + self.toggle_grab(); + } if (magic_left || magic_right) && self.magic_active { current = match current { Layout::SideBySide => Layout::FullLeft, - Layout::FullLeft => Layout::FullRight, - Layout::FullRight => Layout::SideBySide, + Layout::FullLeft => Layout::FullRight, + Layout::FullRight => Layout::SideBySide, }; apply_layout(current); } - if want_kill { + if want_kill { warn!("🧙 magic chord - killing 🪄 AVADA KEDAVRA!!! 💥💀⚰️"); - std::process::exit(0); + std::process::exit(0); } for mouse in &mut self.mice { @@ -139,8 +163,14 @@ impl InputAggregator { } else { tracing::info!("🧙 magic chord - freeing devices 🪄 EXPELLIARMUS!!! 🔓🕊️"); } - for k in &mut self.keyboards { k.set_grab(self.released); k.set_send(self.released); } - for m in &mut self.mice { m.set_grab(self.released); m.set_send(self.released); } + for k in &mut self.keyboards { + k.set_grab(self.released); + k.set_send(self.released); + } + for m in &mut self.mice { + m.set_grab(self.released); + m.set_send(self.released); + } self.released = !self.released; } } @@ -160,13 +190,10 @@ fn classify_device(dev: &Device) -> DeviceKind { // Mouse logic if evbits.contains(EventType::RELATIVE) { - if let (Some(rel), Some(keys)) = - (dev.supported_relative_axes(), dev.supported_keys()) - { - let has_xy = rel.contains(RelativeAxisCode::REL_X) - && rel.contains(RelativeAxisCode::REL_Y); - let has_btn = keys.contains(KeyCode::BTN_LEFT) - || keys.contains(KeyCode::BTN_RIGHT); + if let (Some(rel), Some(keys)) = (dev.supported_relative_axes(), dev.supported_keys()) { + let has_xy = + rel.contains(RelativeAxisCode::REL_X) && rel.contains(RelativeAxisCode::REL_Y); + let has_btn = keys.contains(KeyCode::BTN_LEFT) || keys.contains(KeyCode::BTN_RIGHT); if has_xy && has_btn { return DeviceKind::Mouse; } diff --git a/client/src/input/keyboard.rs b/client/src/input/keyboard.rs index 6ce8fd6..a203789 100644 --- a/client/src/input/keyboard.rs +++ b/client/src/input/keyboard.rs @@ -1,7 +1,10 @@ // client/src/input/keyboard.rs -use std::{collections::HashSet, sync::atomic::{AtomicU32, Ordering}}; use evdev::{Device, EventType, InputEvent, KeyCode}; +use std::{ + collections::HashSet, + sync::atomic::{AtomicU32, Ordering}, +}; use tokio::sync::broadcast::Sender; use tracing::{debug, error, trace}; @@ -11,7 +14,7 @@ use super::keymap::{is_modifier, keycode_to_usage}; pub struct KeyboardAggregator { dev: Device, - tx: Sender, + tx: Sender, dev_mode: bool, sending_disabled: bool, pressed_keys: HashSet, @@ -24,11 +27,21 @@ static SEQ: AtomicU32 = AtomicU32::new(0); impl KeyboardAggregator { pub fn new(dev: Device, dev_mode: bool, tx: Sender) -> Self { let _ = dev.set_nonblocking(true); - Self { dev, tx, dev_mode, sending_disabled: false, pressed_keys: HashSet::new()} + Self { + dev, + tx, + dev_mode, + sending_disabled: false, + pressed_keys: HashSet::new(), + } } pub fn set_grab(&mut self, grab: bool) { - let _ = if grab { self.dev.grab() } else { self.dev.ungrab() }; + let _ = if grab { + self.dev.grab() + } else { + self.dev.ungrab() + }; } pub fn set_send(&mut self, send: bool) { @@ -40,28 +53,47 @@ impl KeyboardAggregator { let events: Vec = match self.dev.fetch_events() { Ok(it) => it.collect(), Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => return, - Err(e) => { if self.dev_mode { error!("⌨️❌ read error: {e}"); } return } + Err(e) => { + if self.dev_mode { + error!("⌨️❌ read error: {e}"); + } + return; + } }; if self.dev_mode && !events.is_empty() { - trace!("⌨️ {} kbd evts from {}", events.len(), self.dev.name().unwrap_or("?")); + trace!( + "⌨️ {} kbd evts from {}", + events.len(), + self.dev.name().unwrap_or("?") + ); } for ev in events { - if ev.event_type() != EventType::KEY { continue } + if ev.event_type() != EventType::KEY { + continue; + } let code = KeyCode::new(ev.code()); match ev.value() { - 1 => { self.pressed_keys.insert(code); } // press - 0 => { self.pressed_keys.remove(&code); } // release + 1 => { + self.pressed_keys.insert(code); + } // press + 0 => { + self.pressed_keys.remove(&code); + } // release _ => {} } let report = self.build_report(); // Generate a local sequence number for debugging/log-merge only. let id = SEQ.fetch_add(1, Ordering::Relaxed); - if self.dev_mode { debug!(seq = id, ?report, "kbd"); } + if self.dev_mode { + debug!(seq = id, ?report, "kbd"); + } if !self.sending_disabled { - let _ = self.tx.send(KeyboardReport { data: report.to_vec() }); + let _ = self.tx.send(KeyboardReport { + data: report.to_vec(), + }); } } } @@ -72,16 +104,23 @@ impl KeyboardAggregator { let mut keys = Vec::new(); for &kc in &self.pressed_keys { - if let Some(m) = is_modifier(kc) { mods |= m } - else if let Some(u) = keycode_to_usage(kc) { keys.push(u) } + if let Some(m) = is_modifier(kc) { + mods |= m + } else if let Some(u) = keycode_to_usage(kc) { + keys.push(u) + } } out[0] = mods; - for (i, k) in keys.into_iter().take(6).enumerate() { out[2+i] = k } + for (i, k) in keys.into_iter().take(6).enumerate() { + out[2 + i] = k + } out } - pub fn has_key(&self, kc: KeyCode) -> bool { self.pressed_keys.contains(&kc) } + pub fn has_key(&self, kc: KeyCode) -> bool { + self.pressed_keys.contains(&kc) + } pub fn magic_grab(&self) -> bool { self.has_key(KeyCode::KEY_LEFTCTRL) @@ -102,8 +141,7 @@ impl KeyboardAggregator { } pub fn magic_kill(&self) -> bool { - self.has_key(KeyCode::KEY_LEFTCTRL) - && self.has_key(KeyCode::KEY_ESC) + self.has_key(KeyCode::KEY_LEFTCTRL) && self.has_key(KeyCode::KEY_ESC) } } diff --git a/client/src/input/keymap.rs b/client/src/input/keymap.rs index 388d54f..b4d3192 100644 --- a/client/src/input/keymap.rs +++ b/client/src/input/keymap.rs @@ -77,46 +77,46 @@ pub fn keycode_to_usage(key: KeyCode) -> Option { KeyCode::KEY_F10 => Some(0x43), KeyCode::KEY_F11 => Some(0x44), KeyCode::KEY_F12 => Some(0x45), - + // --- Navigation / editing cluster --------------------------------- - KeyCode::KEY_SYSRQ => Some(0x46), // Print-Screen - KeyCode::KEY_SCROLLLOCK => Some(0x47), - KeyCode::KEY_PAUSE => Some(0x48), - KeyCode::KEY_INSERT => Some(0x49), - KeyCode::KEY_HOME => Some(0x4A), - KeyCode::KEY_PAGEUP => Some(0x4B), - KeyCode::KEY_DELETE => Some(0x4C), - KeyCode::KEY_END => Some(0x4D), - KeyCode::KEY_PAGEDOWN => Some(0x4E), - KeyCode::KEY_RIGHT => Some(0x4F), - KeyCode::KEY_LEFT => Some(0x50), - KeyCode::KEY_DOWN => Some(0x51), - KeyCode::KEY_UP => Some(0x52), + KeyCode::KEY_SYSRQ => Some(0x46), // Print-Screen + KeyCode::KEY_SCROLLLOCK => Some(0x47), + KeyCode::KEY_PAUSE => Some(0x48), + KeyCode::KEY_INSERT => Some(0x49), + KeyCode::KEY_HOME => Some(0x4A), + KeyCode::KEY_PAGEUP => Some(0x4B), + KeyCode::KEY_DELETE => Some(0x4C), + KeyCode::KEY_END => Some(0x4D), + KeyCode::KEY_PAGEDOWN => Some(0x4E), + KeyCode::KEY_RIGHT => Some(0x4F), + KeyCode::KEY_LEFT => Some(0x50), + KeyCode::KEY_DOWN => Some(0x51), + KeyCode::KEY_UP => Some(0x52), // --- Keypad / Num-lock block -------------------------------------- - KeyCode::KEY_NUMLOCK => Some(0x53), - KeyCode::KEY_KPSLASH => Some(0x54), - KeyCode::KEY_KPASTERISK => Some(0x55), - KeyCode::KEY_KPMINUS => Some(0x56), - KeyCode::KEY_KPPLUS => Some(0x57), - KeyCode::KEY_KPENTER => Some(0x58), - KeyCode::KEY_KP1 => Some(0x59), - KeyCode::KEY_KP2 => Some(0x5A), - KeyCode::KEY_KP3 => Some(0x5B), - KeyCode::KEY_KP4 => Some(0x5C), - KeyCode::KEY_KP5 => Some(0x5D), - KeyCode::KEY_KP6 => Some(0x5E), - KeyCode::KEY_KP7 => Some(0x5F), - KeyCode::KEY_KP8 => Some(0x60), - KeyCode::KEY_KP9 => Some(0x61), - KeyCode::KEY_KP0 => Some(0x62), - KeyCode::KEY_KPDOT => Some(0x63), - KeyCode::KEY_KPEQUAL => Some(0x67), + KeyCode::KEY_NUMLOCK => Some(0x53), + KeyCode::KEY_KPSLASH => Some(0x54), + KeyCode::KEY_KPASTERISK => Some(0x55), + KeyCode::KEY_KPMINUS => Some(0x56), + KeyCode::KEY_KPPLUS => Some(0x57), + KeyCode::KEY_KPENTER => Some(0x58), + KeyCode::KEY_KP1 => Some(0x59), + KeyCode::KEY_KP2 => Some(0x5A), + KeyCode::KEY_KP3 => Some(0x5B), + KeyCode::KEY_KP4 => Some(0x5C), + KeyCode::KEY_KP5 => Some(0x5D), + KeyCode::KEY_KP6 => Some(0x5E), + KeyCode::KEY_KP7 => Some(0x5F), + KeyCode::KEY_KP8 => Some(0x60), + KeyCode::KEY_KP9 => Some(0x61), + KeyCode::KEY_KP0 => Some(0x62), + KeyCode::KEY_KPDOT => Some(0x63), + KeyCode::KEY_KPEQUAL => Some(0x67), // --- Misc --------------------------------------------------------- - KeyCode::KEY_102ND => Some(0x64), // “<>” on ISO boards - KeyCode::KEY_MENU => Some(0x65), // Application / Compose - + KeyCode::KEY_102ND => Some(0x64), // “<>” on ISO boards + KeyCode::KEY_MENU => Some(0x65), // Application / Compose + // We'll handle modifiers (ctrl, shift, alt, meta) in `is_modifier()` _ => None, } @@ -125,13 +125,13 @@ pub fn keycode_to_usage(key: KeyCode) -> Option { /// If a key is a modifier, return the bit(s) to set in HID byte[0]. pub fn is_modifier(key: KeyCode) -> Option { match key { - KeyCode::KEY_LEFTCTRL => Some(0x01), + KeyCode::KEY_LEFTCTRL => Some(0x01), KeyCode::KEY_LEFTSHIFT => Some(0x02), - KeyCode::KEY_LEFTALT => Some(0x04), - KeyCode::KEY_LEFTMETA => Some(0x08), + KeyCode::KEY_LEFTALT => Some(0x04), + KeyCode::KEY_LEFTMETA => Some(0x08), KeyCode::KEY_RIGHTCTRL => Some(0x10), KeyCode::KEY_RIGHTSHIFT => Some(0x20), - KeyCode::KEY_RIGHTALT => Some(0x40), + KeyCode::KEY_RIGHTALT => Some(0x40), KeyCode::KEY_RIGHTMETA => Some(0x80), _ => None, } diff --git a/client/src/input/microphone.rs b/client/src/input/microphone.rs index 7e9f733..c692fe1 100644 --- a/client/src/input/microphone.rs +++ b/client/src/input/microphone.rs @@ -3,32 +3,36 @@ #![forbid(unsafe_code)] use anyhow::{Context, Result}; +use gst::prelude::*; use gstreamer as gst; use gstreamer_app as gst_app; -use gst::prelude::*; use lesavka_common::lesavka::AudioPacket; -use tracing::{debug, error, info, warn, trace}; use shell_escape::unix::escape; use std::sync::atomic::{AtomicU64, Ordering}; +use tracing::{debug, error, info, trace, warn}; pub struct MicrophoneCapture { - #[allow(dead_code)] // kept alive to hold PLAYING state + #[allow(dead_code)] // kept alive to hold PLAYING state pipeline: gst::Pipeline, - sink: gst_app::AppSink, + sink: gst_app::AppSink, } impl MicrophoneCapture { pub fn new() -> Result { - gst::init().ok(); // idempotent + gst::init().ok(); // idempotent /* pulsesrc (default mic) → AAC/ADTS → appsink -------------------*/ // Optional override: LESAVKA_MIC_SOURCE= + // If not provided or not found, fall back to first non-monitor source. let device_arg = match std::env::var("LESAVKA_MIC_SOURCE") { - Ok(s) if !s.is_empty() => { - let full = Self::pulse_source_by_substr(&s).unwrap_or(s); - format!("device={}", escape(full.into())) - } - _ => String::new(), + Ok(s) if !s.is_empty() => match Self::pulse_source_by_substr(&s) { + Some(full) => format!("device={}", escape(full.into())), + None => { + warn!("🎤 requested mic '{s}' not found; using default"); + Self::default_source_arg() + } + }, + _ => Self::default_source_arg(), }; debug!("🎤 device: {device_arg}"); let aac = ["avenc_aac", "fdkaacenc", "faac", "opusenc"] @@ -51,14 +55,8 @@ impl MicrophoneCapture { appsink name=asink emit-signals=true max-buffers=50 drop=true" ); - let pipeline: gst::Pipeline = gst::parse::launch(&desc)? - .downcast() - .expect("pipeline"); - let sink: gst_app::AppSink = pipeline - .by_name("asink") - .unwrap() - .downcast() - .unwrap(); + let pipeline: gst::Pipeline = gst::parse::launch(&desc)?.downcast().expect("pipeline"); + let sink: gst_app::AppSink = pipeline.by_name("asink").unwrap().downcast().unwrap(); /* ─── bus for diagnostics ───────────────────────────────────────*/ { @@ -67,20 +65,30 @@ impl MicrophoneCapture { use gst::MessageView::*; for msg in bus.iter_timed(gst::ClockTime::NONE) { match msg.view() { - StateChanged(s) if s.current() == gst::State::Playing - && msg.src().map(|s| s.is::()).unwrap_or(false) => - info!("🎤 mic pipeline ▶️ (source=pulsesrc)"), - Error(e) => - error!("🎤💥 mic: {} ({})", e.error(), e.debug().unwrap_or_default()), - Warning(w) => - warn!("🎤⚠️ mic: {} ({})", w.error(), w.debug().unwrap_or_default()), + StateChanged(s) + if s.current() == gst::State::Playing + && msg.src().map(|s| s.is::()).unwrap_or(false) => + { + info!("🎤 mic pipeline ▶️ (source=pulsesrc)") + } + Error(e) => error!( + "🎤💥 mic: {} ({})", + e.error(), + e.debug().unwrap_or_default() + ), + Warning(w) => warn!( + "🎤⚠️ mic: {} ({})", + w.error(), + w.debug().unwrap_or_default() + ), _ => {} } } }); } - pipeline.set_state(gst::State::Playing) + pipeline + .set_state(gst::State::Playing) .context("start mic pipeline")?; Ok(Self { pipeline, sink }) @@ -98,8 +106,11 @@ impl MicrophoneCapture { if n < 10 || n % 300 == 0 { trace!("🎤⇧ cli pkt#{n} {} bytes", map.len()); } - Some(AudioPacket { id: 0, pts, data: map.as_slice().to_vec() }) - + Some(AudioPacket { + id: 0, + pts, + data: map.as_slice().to_vec(), + }) } Err(_) => None, } @@ -107,15 +118,39 @@ impl MicrophoneCapture { fn pulse_source_by_substr(fragment: &str) -> Option { use std::process::Command; - let out = Command::new("pactl").args(["list", "short", "sources"]) - .output().ok()?; + let out = Command::new("pactl") + .args(["list", "short", "sources"]) + .output() + .ok()?; let list = String::from_utf8_lossy(&out.stdout); - list.lines() - .find_map(|ln| { - let mut cols = ln.split_whitespace(); - let _id = cols.next()?; - let name = cols.next()?; // column #1 - if name.contains(fragment) { Some(name.to_owned()) } else { None } - }) + list.lines().find_map(|ln| { + let mut cols = ln.split_whitespace(); + let _id = cols.next()?; + let name = cols.next()?; // column #1 + if name.contains(fragment) { + Some(name.to_owned()) + } else { + None + } + }) + } + + /// Pick the first non-monitor Pulse source if available; otherwise empty. + fn default_source_arg() -> String { + use std::process::Command; + let out = Command::new("pactl") + .args(["list", "short", "sources"]) + .output(); + if let Ok(out) = out { + let list = String::from_utf8_lossy(&out.stdout); + if let Some(name) = list + .lines() + .filter_map(|ln| ln.split_whitespace().nth(1)) + .find(|name| !name.ends_with(".monitor")) + { + return format!("device={}", escape(name.into())); + } + } + String::new() } } diff --git a/client/src/input/mod.rs b/client/src/input/mod.rs index 1da92ad..cbe2cff 100644 --- a/client/src/input/mod.rs +++ b/client/src/input/mod.rs @@ -1,8 +1,8 @@ // client/src/input/mod.rs -pub mod inputs; // the aggregator that scans /dev/input and spawns sub-aggregators -pub mod keyboard; // existing keyboard aggregator logic (minus scanning) -pub mod mouse; // a stub aggregator for mice -pub mod camera; // stub for camera -pub mod microphone; // stub for mic -pub mod keymap; // keyboard keymap logic +pub mod camera; // stub for camera +pub mod inputs; // the aggregator that scans /dev/input and spawns sub-aggregators +pub mod keyboard; // existing keyboard aggregator logic (minus scanning) +pub mod keymap; +pub mod microphone; // stub for mic +pub mod mouse; // a stub aggregator for mice // keyboard keymap logic diff --git a/client/src/input/mouse.rs b/client/src/input/mouse.rs index 1757dec..8547a5d 100644 --- a/client/src/input/mouse.rs +++ b/client/src/input/mouse.rs @@ -1,9 +1,9 @@ // client/src/input/mouse.rs use evdev::{Device, EventType, InputEvent, KeyCode, RelativeAxisCode}; -use tokio::sync::broadcast::{self, Sender}; use std::time::{Duration, Instant}; -use tracing::{debug, error, warn, trace}; +use tokio::sync::broadcast::{self, Sender}; +use tracing::{debug, error, trace, warn}; use lesavka_common::lesavka::MouseReport; @@ -11,60 +11,91 @@ const SEND_INTERVAL: Duration = Duration::from_millis(1); pub struct MouseAggregator { dev: Device, - tx: Sender, + tx: Sender, dev_mode: bool, sending_disabled: bool, next_send: Instant, buttons: u8, last_buttons: u8, - dx: i8, - dy: i8, + dx: i8, + dy: i8, wheel: i8, } impl MouseAggregator { pub fn new(dev: Device, dev_mode: bool, tx: Sender) -> Self { - Self { dev, tx, dev_mode, sending_disabled: false, next_send: Instant::now(), buttons:0, last_buttons:0, dx:0, dy:0, wheel:0 } + Self { + dev, + tx, + dev_mode, + sending_disabled: false, + next_send: Instant::now(), + buttons: 0, + last_buttons: 0, + dx: 0, + dy: 0, + wheel: 0, + } } #[inline] #[allow(dead_code)] - fn slog(&self, f: impl FnOnce()) { if self.dev_mode { f() } } + fn slog(&self, f: impl FnOnce()) { + if self.dev_mode { + f() + } + } pub fn set_grab(&mut self, grab: bool) { - let _ = if grab { self.dev.grab() } else { self.dev.ungrab() }; + let _ = if grab { + self.dev.grab() + } else { + self.dev.ungrab() + }; } pub fn set_send(&mut self, send: bool) { self.sending_disabled = !send; } - + pub fn process_events(&mut self) { let evts: Vec = match self.dev.fetch_events() { Ok(it) => it.collect(), Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => return, - Err(e) => { if self.dev_mode { error!("🖱️❌ mouse read err: {e}"); } return } + Err(e) => { + if self.dev_mode { + error!("🖱️❌ mouse read err: {e}"); + } + return; + } }; if self.dev_mode && !evts.is_empty() { - trace!("🖱️ {} evts from {}", evts.len(), self.dev.name().unwrap_or("?")); + trace!( + "🖱️ {} evts from {}", + evts.len(), + self.dev.name().unwrap_or("?") + ); } for e in evts { match e.event_type() { EventType::KEY => match e.code() { - c if c == KeyCode::BTN_LEFT.0 => self.set_btn(0, e.value()), - c if c == KeyCode::BTN_RIGHT.0 => self.set_btn(1, e.value()), + c if c == KeyCode::BTN_LEFT.0 => self.set_btn(0, e.value()), + c if c == KeyCode::BTN_RIGHT.0 => self.set_btn(1, e.value()), c if c == KeyCode::BTN_MIDDLE.0 => self.set_btn(2, e.value()), _ => {} }, EventType::RELATIVE => match e.code() { - c if c == RelativeAxisCode::REL_X.0 => - self.dx = self.dx.saturating_add(e.value().clamp(-127,127) as i8), - c if c == RelativeAxisCode::REL_Y.0 => - self.dy = self.dy.saturating_add(e.value().clamp(-127,127) as i8), - c if c == RelativeAxisCode::REL_WHEEL.0 => - self.wheel = self.wheel.saturating_add(e.value().clamp(-1,1) as i8), + c if c == RelativeAxisCode::REL_X.0 => { + self.dx = self.dx.saturating_add(e.value().clamp(-127, 127) as i8) + } + c if c == RelativeAxisCode::REL_Y.0 => { + self.dy = self.dy.saturating_add(e.value().clamp(-127, 127) as i8) + } + c if c == RelativeAxisCode::REL_WHEEL.0 => { + self.wheel = self.wheel.saturating_add(e.value().clamp(-1, 1) as i8) + } _ => {} }, EventType::SYNCHRONIZATION => self.flush(), @@ -74,13 +105,15 @@ impl MouseAggregator { } fn flush(&mut self) { - if self.buttons == self.last_buttons && Instant::now() < self.next_send { return; } + if self.buttons == self.last_buttons && Instant::now() < self.next_send { + return; + } self.next_send = Instant::now() + SEND_INTERVAL; let pkt = [ self.buttons, - self.dx.clamp(-127,127) as u8, - self.dy.clamp(-127,127) as u8, + self.dx.clamp(-127, 127) as u8, + self.dy.clamp(-127, 127) as u8, self.wheel as u8, ]; @@ -88,17 +121,27 @@ impl MouseAggregator { if let Err(broadcast::error::SendError(_)) = self.tx.send(MouseReport { data: pkt.to_vec() }) { - if self.dev_mode { warn!("❌🖱️ no HID receiver (mouse)"); } + if self.dev_mode { + warn!("❌🖱️ no HID receiver (mouse)"); + } } else if self.dev_mode { debug!("📤🖱️ mouse {:?}", pkt); } } - self.dx=0; self.dy=0; self.wheel=0; self.last_buttons=self.buttons; + self.dx = 0; + self.dy = 0; + self.wheel = 0; + self.last_buttons = self.buttons; } - #[inline] fn set_btn(&mut self, bit: u8, val: i32) { - if val!=0 { self.buttons |= 1<