client: mic selection defaults improved

Brad Stein 2025-12-01 11:38:51 -03:00
parent c274e8ce18
commit 3458b42a11
17 changed files with 579 additions and 361 deletions

client/src/app.rs

@ -1,30 +1,28 @@
#![forbid(unsafe_code)]
use anyhow::Result;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use tokio::sync::broadcast;
use tokio_stream::{StreamExt, wrappers::BroadcastStream};
use tonic::{Request, transport::Channel};
use tracing::{debug, error, info, trace, warn};
use winit::{
event::Event,
event_loop::{ControlFlow, EventLoopBuilder},
platform::wayland::EventLoopBuilderExtWayland,
};
use lesavka_common::lesavka::{
AudioPacket, KeyboardReport, MonitorRequest, MouseReport, VideoPacket,
relay_client::RelayClient,
};
use crate::{
handshake, input::camera::CameraCapture, input::inputs::InputAggregator,
input::microphone::MicrophoneCapture, output::audio::AudioOut, output::video::MonitorWindow,
};
pub struct LesavkaClientApp {
aggregator: Option<InputAggregator>,
@ -47,7 +45,13 @@ impl LesavkaClientApp {
let agg = InputAggregator::new(dev_mode, kbd_tx.clone(), mou_tx.clone());
Ok(Self {
aggregator: Some(agg),
server_addr,
dev_mode,
kbd_tx,
mou_tx,
})
}
pub async fn run(&mut self) -> Result<()> {
@ -64,8 +68,8 @@ impl LesavkaClientApp {
.connect_lazy();
let vid_ep = Channel::from_shared(self.server_addr.clone())?
.initial_connection_window_size(4 << 20)
.initial_stream_window_size(4 << 20)
.tcp_nodelay(true)
.connect_lazy();
@ -98,19 +102,27 @@ impl LesavkaClientApp {
std::thread::spawn(move || {
gtk::init().expect("GTK initialisation failed");
let el = EventLoopBuilder::<()>::new()
.with_any_thread(true)
.build()
.unwrap();
let win0 = MonitorWindow::new(0).expect("win0");
let win1 = MonitorWindow::new(1).expect("win1");
let _ = el.run(move |_: Event<()>, _elwt| {
_elwt.set_control_flow(ControlFlow::WaitUntil(
std::time::Instant::now() + std::time::Duration::from_millis(16),
));
static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
static DUMP_CNT: std::sync::atomic::AtomicU32 =
std::sync::atomic::AtomicU32::new(0);
while let Ok(pkt) = video_rx.try_recv() {
CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if CNT.load(std::sync::atomic::Ordering::Relaxed) % 300 == 0 {
debug!(
"🎥 received {} video packets",
CNT.load(std::sync::atomic::Ordering::Relaxed)
);
}
let n = DUMP_CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n % 120 == 0 {
@ -139,7 +151,7 @@ impl LesavkaClientApp {
/*────────── camera & mic tasks (gated by caps) ───────────*/
if caps.camera && std::env::var("LESAVKA_CAM_DISABLE").is_err() {
let cam = Arc::new(CameraCapture::new(
std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref(),
)?);
tokio::spawn(Self::cam_loop(vid_ep.clone(), cam));
}
@ -175,13 +187,15 @@ impl LesavkaClientApp {
info!("⌨️🤙 Keyboard dial {}", self.server_addr);
let mut cli = RelayClient::new(ep.clone());
let outbound = BroadcastStream::new(self.kbd_tx.subscribe()).filter_map(|r| r.ok());
match cli.stream_keyboard(Request::new(outbound)).await {
Ok(mut resp) => {
while let Some(msg) = resp.get_mut().message().await.transpose() {
if let Err(e) = msg {
warn!("⌨️ server err: {e}");
break;
}
}
}
Err(e) => warn!("❌⌨️ connect failed: {e}"),
@ -196,13 +210,15 @@ impl LesavkaClientApp {
info!("🖱️🤙 Mouse dial {}", self.server_addr);
let mut cli = RelayClient::new(ep.clone());
let outbound = BroadcastStream::new(self.mou_tx.subscribe()).filter_map(|r| r.ok());
match cli.stream_mouse(Request::new(outbound)).await {
Ok(mut resp) => {
while let Some(msg) = resp.get_mut().message().await.transpose() {
if let Err(e) = msg {
warn!("🖱️ server err: {e}");
break;
}
}
}
Err(e) => warn!("❌🖱️ connect failed: {e}"),
@ -212,24 +228,27 @@ impl LesavkaClientApp {
}
/*──────────────── monitor stream ────────────────*/
async fn video_loop(ep: Channel, tx: tokio::sync::mpsc::UnboundedSender<VideoPacket>) {
for monitor_id in 0..=1 {
let ep = ep.clone();
let tx = tx.clone();
tokio::spawn(async move {
loop {
let mut cli = RelayClient::new(ep.clone());
let req = MonitorRequest {
id: monitor_id,
max_bitrate: 6_000,
};
match cli.capture_video(Request::new(req)).await {
Ok(mut stream) => {
debug!("🎥🏁 cli video{monitor_id}: stream opened");
while let Some(res) = stream.get_mut().message().await.transpose() {
match res {
Ok(pkt) => {
trace!(
"🎥📥 cli video{monitor_id}: got {}bytes",
pkt.data.len()
);
if tx.send(pkt).is_err() {
warn!("⚠️🎥 cli video{monitor_id}: GUI thread gone");
break;
@ -255,11 +274,16 @@ impl LesavkaClientApp {
async fn audio_loop(ep: Channel, out: AudioOut) {
loop {
let mut cli = RelayClient::new(ep.clone());
let req = MonitorRequest {
id: 0,
max_bitrate: 0,
};
match cli.capture_audio(Request::new(req)).await {
Ok(mut stream) => {
while let Some(res) = stream.get_mut().message().await.transpose() {
if let Ok(pkt) = res {
out.push(pkt);
}
}
}
Err(e) => tracing::warn!("❌🔊 audio stream err: {e}"),
@ -293,9 +317,7 @@ impl LesavkaClientApp {
// 3. turn `rx` into an async stream for gRPC
let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
match cli.stream_microphone(Request::new(outbound)).await {
Ok(mut resp) => while resp.get_mut().message().await.transpose().is_some() {},
Err(e) => {
// first failure → warn, subsequent ones → debug
if FAIL_CNT.fetch_add(1, Ordering::Relaxed) == 0 {
@ -330,8 +352,7 @@ impl LesavkaClientApp {
if n < 10 || n % 120 == 0 {
tracing::trace!("📸 cli frame#{n} {} B", pkt.data.len());
}
tracing::trace!("📸⬆️ sent webcam AU pts={} {} B", pkt.pts, pkt.data.len());
let _ = tx.try_send(pkt);
}
}
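Note: the keyboard and mouse dialers above share one outbound pattern: fan a broadcast channel into the gRPC request stream, silently dropping Lagged errors from slow receivers. A minimal sketch of that pattern, assuming only the tokio/tokio-stream APIs already imported in this file (outbound_reports is an illustrative name, not part of the commit):

use tokio_stream::{StreamExt, wrappers::BroadcastStream};

fn outbound_reports(
    tx: &tokio::sync::broadcast::Sender<KeyboardReport>,
) -> impl tokio_stream::Stream<Item = KeyboardReport> {
    // BroadcastStream yields Result<T, BroadcastStreamRecvError>;
    // filter_map(ok) drops Lagged notifications instead of ending the RPC.
    BroadcastStream::new(tx.subscribe()).filter_map(|r| r.ok())
}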

client/src/handshake.rs

@ -1,10 +1,10 @@
// client/src/handshake.rs
#![forbid(unsafe_code)]
use lesavka_common::lesavka::{self as pb, handshake_client::HandshakeClient};
use std::time::Duration;
use tokio::time::timeout;
use tonic::{Code, transport::Endpoint};
use tracing::{info, warn};
#[derive(Default, Clone, Copy, Debug)]

client/src/input/camera.rs

@ -2,9 +2,9 @@
#![forbid(unsafe_code)]
use anyhow::Context;
use gst::prelude::*;
use gstreamer as gst;
use gstreamer_app as gst_app;
use lesavka_common::lesavka::VideoPacket;
pub struct CameraCapture {
@ -20,8 +20,7 @@ impl CameraCapture {
// Pick device (prefers V4L2 nodes with capture capability)
let dev = match device_fragment {
Some(path) if path.starts_with("/dev/") => path.to_string(),
Some(fragment) => Self::find_device(fragment).unwrap_or_else(|| "/dev/video0".into()),
None => "/dev/video0".into(),
};
@ -91,7 +90,11 @@ impl CameraCapture {
let buf = sample.buffer()?;
let map = buf.map_readable().ok()?;
let pts = buf.pts().unwrap_or(gst::ClockTime::ZERO).nseconds() / 1_000;
Some(VideoPacket {
id: 2,
pts,
data: map.as_slice().to_vec(),
})
}
/// Fuzzy-match devices under `/dev/v4l/by-id`, preferring capture nodes
@ -134,8 +137,7 @@ impl CameraCapture {
.and_then(|d| d.query_caps().ok())
.map(|caps| {
let bits = caps.capabilities.bits();
(bits & V4L2_CAP_VIDEO_CAPTURE != 0) || (bits & V4L2_CAP_VIDEO_CAPTURE_MPLANE != 0)
})
.unwrap_or(false)
}
@ -155,7 +157,7 @@ impl CameraCapture {
fn pick_encoder() -> (&'static str, &'static str) {
let encoders = &[
("nvh264enc", "video/x-raw(memory:NVMM),format=NV12"),
("vaapih264enc", "video/x-raw,format=NV12"),
("v4l2h264enc", "video/x-raw"), // RPi, Jetson, etc.
("x264enc", "video/x-raw"), // software
];
@ -170,12 +172,15 @@ impl CameraCapture {
fn choose_encoder() -> (&'static str, &'static str, &'static str) {
match () {
_ if gst::ElementFactory::find("nvh264enc").is_some() => {
("nvh264enc", "gop-size", "30")
}
_ if gst::ElementFactory::find("vaapih264enc").is_some() => {
("vaapih264enc", "keyframe-period", "30")
}
_ if gst::ElementFactory::find("v4l2h264enc").is_some() => {
("v4l2h264enc", "idrcount", "30")
}
_ => ("x264enc", "key-int-max", "30"),
}
}
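Note: pick_encoder and choose_encoder overlap; each candidate pairs an encoder factory with the property that controls its keyframe interval (gop-size, keyframe-period, idrcount, key-int-max), all set to one keyframe per 30 frames. A hedged sketch of applying such a triple; the helper name is illustrative and uses the software fallback:

use gstreamer as gst;
use gst::prelude::*;

fn build_h264_encoder() -> Result<gst::Element, gst::glib::BoolError> {
    let (factory, key_prop, val) = ("x264enc", "key-int-max", "30");
    let enc = gst::ElementFactory::make(factory).build()?;
    enc.set_property_from_str(key_prop, val); // one keyframe every 30 frames
    Ok(enc)
}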

client/src/input/inputs.rs

@ -1,8 +1,11 @@
// client/src/input/inputs.rs
use anyhow::{Context, Result, bail};
use evdev::{Device, EventType, KeyCode, RelativeAxisCode};
use tokio::{
sync::broadcast::Sender,
time::{Duration, interval},
};
use tracing::{debug, info, warn};
use lesavka_common::lesavka::{KeyboardReport, MouseReport};
@ -21,19 +24,26 @@ pub struct InputAggregator {
}
impl InputAggregator {
pub fn new(
dev_mode: bool,
kbd_tx: Sender<KeyboardReport>,
mou_tx: Sender<MouseReport>,
) -> Self {
Self {
kbd_tx,
mou_tx,
dev_mode,
released: false,
magic_active: false,
keyboards: Vec::new(),
mice: Vec::new(),
}
}
/// Called once at startup: enumerates input devices,
/// classifies them, and constructs an aggregator struct per type.
pub fn init(&mut self) -> Result<()> {
let paths = std::fs::read_dir("/dev/input").context("Failed to read /dev/input")?;
let mut found_any = false;
@ -42,7 +52,10 @@ impl InputAggregator {
let path = entry.path();
// skip anything that isn't "event*"
if !path
.file_name()
.map_or(false, |f| f.to_string_lossy().starts_with("event"))
{
continue;
}
@ -56,12 +69,17 @@ impl InputAggregator {
};
// non-blocking so fetch_events never stalls the whole loop
dev.set_nonblocking(true)
.with_context(|| format!("set_non_blocking {:?}", path))?;
match classify_device(&dev) {
DeviceKind::Keyboard => {
dev.grab()
.with_context(|| format!("grabbing keyboard {path:?}"))?;
info!(
"🤏🖱️ Grabbed keyboard {:?}",
dev.name().unwrap_or("UNKNOWN")
);
// pass dev_mode to aggregator
// let kbd_agg = KeyboardAggregator::new(dev, self.dev_mode);
@ -71,7 +89,8 @@ impl InputAggregator {
continue;
}
DeviceKind::Mouse => {
dev.grab()
.with_context(|| format!("grabbing mouse {path:?}"))?;
info!("🤏⌨️ Grabbed mouse {:?}", dev.name().unwrap_or("UNKNOWN"));
// let mouse_agg = MouseAggregator::new(dev);
@ -81,7 +100,10 @@ impl InputAggregator {
continue;
}
DeviceKind::Other => {
debug!(
"Skipping non-kbd/mouse device: {:?}",
dev.name().unwrap_or("UNKNOWN")
);
continue;
}
}
@ -110,7 +132,9 @@ impl InputAggregator {
want_kill |= kbd.magic_kill();
}
if magic_now && !self.magic_active {
self.toggle_grab();
}
if (magic_left || magic_right) && self.magic_active {
current = match current {
Layout::SideBySide => Layout::FullLeft,
@ -139,8 +163,14 @@ impl InputAggregator {
} else {
tracing::info!("🧙 magic chord - freeing devices 🪄 EXPELLIARMUS!!! 🔓🕊️");
}
for k in &mut self.keyboards {
k.set_grab(self.released);
k.set_send(self.released);
}
for m in &mut self.mice {
m.set_grab(self.released);
m.set_send(self.released);
}
self.released = !self.released;
}
}
@ -160,13 +190,10 @@ fn classify_device(dev: &Device) -> DeviceKind {
// Mouse logic
if evbits.contains(EventType::RELATIVE) {
if let (Some(rel), Some(keys)) = (dev.supported_relative_axes(), dev.supported_keys()) {
let has_xy =
rel.contains(RelativeAxisCode::REL_X) && rel.contains(RelativeAxisCode::REL_Y);
let has_btn = keys.contains(KeyCode::BTN_LEFT) || keys.contains(KeyCode::BTN_RIGHT);
if has_xy && has_btn {
return DeviceKind::Mouse;
}

client/src/input/keyboard.rs

@ -1,7 +1,10 @@
// client/src/input/keyboard.rs
use evdev::{Device, EventType, InputEvent, KeyCode};
use std::{
collections::HashSet,
sync::atomic::{AtomicU32, Ordering},
};
use tokio::sync::broadcast::Sender;
use tracing::{debug, error, trace};
@ -24,11 +27,21 @@ static SEQ: AtomicU32 = AtomicU32::new(0);
impl KeyboardAggregator {
pub fn new(dev: Device, dev_mode: bool, tx: Sender<KeyboardReport>) -> Self {
let _ = dev.set_nonblocking(true);
Self {
dev,
tx,
dev_mode,
sending_disabled: false,
pressed_keys: HashSet::new(),
}
}
pub fn set_grab(&mut self, grab: bool) {
let _ = if grab {
self.dev.grab()
} else {
self.dev.ungrab()
};
}
pub fn set_send(&mut self, send: bool) {
@ -40,28 +53,47 @@ impl KeyboardAggregator {
let events: Vec<InputEvent> = match self.dev.fetch_events() {
Ok(it) => it.collect(),
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => return,
Err(e) => {
if self.dev_mode {
error!("⌨️❌ read error: {e}");
}
return;
}
};
if self.dev_mode && !events.is_empty() {
trace!(
"⌨️ {} kbd evts from {}",
events.len(),
self.dev.name().unwrap_or("?")
);
}
for ev in events {
if ev.event_type() != EventType::KEY {
continue;
}
let code = KeyCode::new(ev.code());
match ev.value() {
1 => {
self.pressed_keys.insert(code);
} // press
0 => {
self.pressed_keys.remove(&code);
} // release
_ => {}
}
let report = self.build_report();
// Generate a local sequence number for debugging/log-merge only.
let id = SEQ.fetch_add(1, Ordering::Relaxed);
if self.dev_mode {
debug!(seq = id, ?report, "kbd");
}
if !self.sending_disabled {
let _ = self.tx.send(KeyboardReport {
data: report.to_vec(),
});
}
}
}
@ -72,16 +104,23 @@ impl KeyboardAggregator {
let mut keys = Vec::new();
for &kc in &self.pressed_keys {
if let Some(m) = is_modifier(kc) {
mods |= m
} else if let Some(u) = keycode_to_usage(kc) {
keys.push(u)
}
}
out[0] = mods;
for (i, k) in keys.into_iter().take(6).enumerate() {
out[2 + i] = k
}
out
}
pub fn has_key(&self, kc: KeyCode) -> bool {
self.pressed_keys.contains(&kc)
}
pub fn magic_grab(&self) -> bool {
self.has_key(KeyCode::KEY_LEFTCTRL)
@ -102,8 +141,7 @@ impl KeyboardAggregator {
}
pub fn magic_kill(&self) -> bool {
self.has_key(KeyCode::KEY_LEFTCTRL) && self.has_key(KeyCode::KEY_ESC)
}
}
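Note: build_report assembles the standard 8-byte HID boot-keyboard report: byte 0 is the modifier bitmask, byte 1 is reserved, and bytes 2..8 hold up to six concurrent usage codes. A self-contained illustration (values from the HID usage tables):

let mut out = [0u8; 8];
out[0] = 0x02; // bit 1 = LeftShift in the modifier byte
out[2] = 0x04; // usage 0x04 = 'a', so the report encodes Shift+A
assert_eq!(out, [0x02, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00]);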

client/src/input/microphone.rs

@ -3,13 +3,13 @@
#![forbid(unsafe_code)]
use anyhow::{Context, Result};
use gst::prelude::*;
use gstreamer as gst;
use gstreamer_app as gst_app;
use lesavka_common::lesavka::AudioPacket;
use shell_escape::unix::escape;
use std::sync::atomic::{AtomicU64, Ordering};
use tracing::{debug, error, info, trace, warn};
pub struct MicrophoneCapture {
#[allow(dead_code)] // kept alive to hold PLAYING state
@ -23,12 +23,16 @@ impl MicrophoneCapture {
/* pulsesrc (default mic) → AAC/ADTS → appsink -------------------*/
// Optional override: LESAVKA_MIC_SOURCE=<pulsedevicename>
// If not provided or not found, fall back to first non-monitor source.
let device_arg = match std::env::var("LESAVKA_MIC_SOURCE") {
Ok(s) if !s.is_empty() => match Self::pulse_source_by_substr(&s) {
Some(full) => format!("device={}", escape(full.into())),
None => {
warn!("🎤 requested mic '{s}' not found; using default");
Self::default_source_arg()
}
},
_ => Self::default_source_arg(),
};
debug!("🎤 device: {device_arg}");
let aac = ["avenc_aac", "fdkaacenc", "faac", "opusenc"]
@ -51,14 +55,8 @@ impl MicrophoneCapture {
appsink name=asink emit-signals=true max-buffers=50 drop=true"
);
let pipeline: gst::Pipeline = gst::parse::launch(&desc)?.downcast().expect("pipeline");
let sink: gst_app::AppSink = pipeline.by_name("asink").unwrap().downcast().unwrap();
/* ─── bus for diagnostics ───────────────────────────────────────*/
{
@ -67,20 +65,30 @@ impl MicrophoneCapture {
use gst::MessageView::*;
for msg in bus.iter_timed(gst::ClockTime::NONE) {
match msg.view() {
StateChanged(s)
if s.current() == gst::State::Playing
&& msg.src().map(|s| s.is::<gst::Pipeline>()).unwrap_or(false) =>
{
info!("🎤 mic pipeline ▶️ (source=pulsesrc)")
}
Error(e) => error!(
"🎤💥 mic: {} ({})",
e.error(),
e.debug().unwrap_or_default()
),
Warning(w) => warn!(
"🎤⚠️ mic: {} ({})",
w.error(),
w.debug().unwrap_or_default()
),
_ => {}
}
}
});
}
pipeline
.set_state(gst::State::Playing)
.context("start mic pipeline")?;
Ok(Self { pipeline, sink })
@ -98,8 +106,11 @@ impl MicrophoneCapture {
if n < 10 || n % 300 == 0 {
trace!("🎤⇧ cli pkt#{n} {} bytes", map.len());
}
Some(AudioPacket {
id: 0,
pts,
data: map.as_slice().to_vec(),
})
}
Err(_) => None,
}
@ -107,15 +118,39 @@ impl MicrophoneCapture {
fn pulse_source_by_substr(fragment: &str) -> Option<String> {
use std::process::Command;
let out = Command::new("pactl")
.args(["list", "short", "sources"])
.output()
.ok()?;
let list = String::from_utf8_lossy(&out.stdout);
list.lines().find_map(|ln| {
let mut cols = ln.split_whitespace();
let _id = cols.next()?;
let name = cols.next()?; // column #1
if name.contains(fragment) {
Some(name.to_owned())
} else {
None
}
})
}
/// Pick the first non-monitor Pulse source if available; otherwise empty.
fn default_source_arg() -> String {
use std::process::Command;
let out = Command::new("pactl")
.args(["list", "short", "sources"])
.output();
if let Ok(out) = out {
let list = String::from_utf8_lossy(&out.stdout);
if let Some(name) = list
.lines()
.filter_map(|ln| ln.split_whitespace().nth(1))
.find(|name| !name.ends_with(".monitor"))
{
return format!("device={}", escape(name.into()));
}
}
String::new()
}
}
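Note: this file carries the behavioral change in the commit. pulse_source_by_substr matches the requested fragment against column 1 of `pactl list short sources`, and the new default_source_arg skips monitor sources instead of silently passing the fragment through. An illustrative check with made-up pactl output:

let list = "1\talsa_output.pci-0000_00_1f.3.analog-stereo.monitor\tmodule-alsa-card.c\n\
            2\talsa_input.usb-Blue_Yeti-00.analog-stereo\tmodule-alsa-card.c\n";
let hit = list.lines().find_map(|ln| {
    let name = ln.split_whitespace().nth(1)?;
    name.contains("Yeti").then(|| name.to_owned())
});
assert_eq!(hit.as_deref(), Some("alsa_input.usb-Blue_Yeti-00.analog-stereo"));
// default_source_arg() would skip line 1 because it ends with ".monitor".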

client/src/input/mod.rs

@ -1,8 +1,8 @@
// client/src/input/mod.rs
pub mod camera; // stub for camera
pub mod inputs; // the aggregator that scans /dev/input and spawns sub-aggregators
pub mod keyboard; // existing keyboard aggregator logic (minus scanning)
pub mod keymap; // keyboard keymap logic
pub mod microphone; // stub for mic
pub mod mouse; // a stub aggregator for mice

client/src/input/mouse.rs

@ -1,9 +1,9 @@
// client/src/input/mouse.rs
use evdev::{Device, EventType, InputEvent, KeyCode, RelativeAxisCode};
use std::time::{Duration, Instant};
use tokio::sync::broadcast::{self, Sender};
use tracing::{debug, error, trace, warn};
use lesavka_common::lesavka::MouseReport;
@ -25,14 +25,33 @@ pub struct MouseAggregator {
impl MouseAggregator {
pub fn new(dev: Device, dev_mode: bool, tx: Sender<MouseReport>) -> Self {
Self {
dev,
tx,
dev_mode,
sending_disabled: false,
next_send: Instant::now(),
buttons: 0,
last_buttons: 0,
dx: 0,
dy: 0,
wheel: 0,
}
}
#[inline]
#[allow(dead_code)]
fn slog(&self, f: impl FnOnce()) {
if self.dev_mode {
f()
}
}
pub fn set_grab(&mut self, grab: bool) {
let _ = if grab {
self.dev.grab()
} else {
self.dev.ungrab()
};
}
pub fn set_send(&mut self, send: bool) {
@ -43,11 +62,20 @@ impl MouseAggregator {
let evts: Vec<InputEvent> = match self.dev.fetch_events() {
Ok(it) => it.collect(),
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => return,
Err(e) => {
if self.dev_mode {
error!("🖱️❌ mouse read err: {e}");
}
return;
}
};
if self.dev_mode && !evts.is_empty() {
trace!(
"🖱️ {} evts from {}",
evts.len(),
self.dev.name().unwrap_or("?")
);
}
for e in evts {
@ -59,12 +87,15 @@ impl MouseAggregator {
_ => {}
},
EventType::RELATIVE => match e.code() {
c if c == RelativeAxisCode::REL_X.0 => {
self.dx = self.dx.saturating_add(e.value().clamp(-127, 127) as i8)
}
c if c == RelativeAxisCode::REL_Y.0 => {
self.dy = self.dy.saturating_add(e.value().clamp(-127, 127) as i8)
}
c if c == RelativeAxisCode::REL_WHEEL.0 => {
self.wheel = self.wheel.saturating_add(e.value().clamp(-1, 1) as i8)
}
_ => {}
},
EventType::SYNCHRONIZATION => self.flush(),
@ -74,13 +105,15 @@ impl MouseAggregator {
}
fn flush(&mut self) {
if self.buttons == self.last_buttons && Instant::now() < self.next_send {
return;
}
self.next_send = Instant::now() + SEND_INTERVAL;
let pkt = [
self.buttons,
self.dx.clamp(-127, 127) as u8,
self.dy.clamp(-127, 127) as u8,
self.wheel as u8,
];
@ -88,17 +121,27 @@ impl MouseAggregator {
if let Err(broadcast::error::SendError(_)) =
self.tx.send(MouseReport { data: pkt.to_vec() })
{
if self.dev_mode {
warn!("❌🖱️ no HID receiver (mouse)");
}
} else if self.dev_mode {
debug!("📤🖱️ mouse {:?}", pkt);
}
}
self.dx = 0;
self.dy = 0;
self.wheel = 0;
self.last_buttons = self.buttons;
}
#[inline]
fn set_btn(&mut self, bit: u8, val: i32) {
if val != 0 {
self.buttons |= 1 << bit
} else {
self.buttons &= !(1 << bit)
}
}
}
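Note: flush() packs the accumulated state into a 4-byte HID-style report, [buttons, dx, dy, wheel], with the deltas as two's-complement i8 stored in u8. For example:

let pkt: [u8; 4] = [0b0000_0001, (-5i8) as u8, 3i8 as u8, 0];
assert_eq!(pkt, [0x01, 0xFB, 0x03, 0x00]); // left button held, 5 left, 3 down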

client/src/layout.rs

@ -1,9 +1,9 @@
// client/src/layout.rs - Wayland-only window placement utilities
#![forbid(unsafe_code)]
use serde_json::Value;
use std::process::Command;
use tracing::{info, warn};
/// The three layouts we cycle through.
#[derive(Clone, Copy)]
@ -16,9 +16,7 @@ pub enum Layout {
/// Move/resize a window titled “Lesavka-eye-{eye}” using `swaymsg`.
fn place_window(eye: u32, x: i32, y: i32, w: i32, h: i32) {
let title = format!("Lesavka-eye-{eye}");
let cmd = format!(r#"[title="^{title}$"] resize set {w} {h}; move position {x} {y}"#);
match Command::new("swaymsg").arg(cmd).status() {
Ok(st) if st.success() => info!("✅ placed eye{eye} {w}×{h}@{x},{y}"),
@ -35,15 +33,23 @@ pub fn apply(layout: Layout) {
.output()
{
Ok(o) => o.stdout,
Err(e) => {
warn!("get_outputs failed: {e}");
return;
}
};
let Ok(Value::Array(outputs)) = serde_json::from_slice::<Value>(&out) else {
warn!("unexpected JSON from swaymsg");
return;
};
let Some(rect) = outputs
.iter()
.find(|o| o.get("focused").and_then(Value::as_bool) == Some(true))
.and_then(|o| o.get("rect"))
else {
return;
};
// helper to read an i64 → i32 with defaults
let g = |k: &str| rect.get(k).and_then(Value::as_i64).unwrap_or(0) as i32;
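Note: place_window drives sway through its criteria syntax. With illustrative values (eye 0, 1280x720 at the origin) the generated command is:

let (eye, x, y, w, h) = (0u32, 0, 0, 1280, 720);
let cmd = format!(r#"[title="^Lesavka-eye-{eye}$"] resize set {w} {h}; move position {x} {y}"#);
assert_eq!(cmd, r#"[title="^Lesavka-eye-0$"] resize set 1280 720; move position 0 0"#);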

client/src/lib.rs

@ -3,9 +3,9 @@
#![forbid(unsafe_code)]
pub mod app;
pub mod handshake;
pub mod input;
pub mod layout;
pub mod output;
pub use app::LesavkaClientApp;

client/src/main.rs

@ -72,7 +72,10 @@ async fn main() -> Result<()> {
.with(file_layer)
.init();
tracing::info!(
"📜 lesavka-client running in DEV mode → {}",
log_path.display()
);
} else {
tracing_subscriber::registry()
.with(env_filter)

client/src/output/audio.rs

@ -1,11 +1,11 @@
// client/src/output/audio.rs
use anyhow::{Context, Result};
use gst::MessageView::*;
use gst::prelude::*;
use gstreamer as gst;
use gstreamer_app as gst_app;
use tracing::{debug, error, info, warn};
use lesavka_common::lesavka::AudioPacket;
@ -58,12 +58,13 @@ impl AudioOut {
.downcast::<gst_app::AppSrc>()
.expect("src not an AppSrc");
src.set_caps(Some(
&gst::Caps::builder("audio/mpeg")
.field("mpegversion", &4i32) // AAC
.field("stream-format", &"adts") // ADTS frames
.field("rate", &48_000i32) // 48kHz
.field("channels", &2i32) // stereo
.build(),
));
src.set_format(gst::Format::Time);
@ -72,34 +73,40 @@ impl AudioOut {
std::thread::spawn(move || {
for msg in bus.iter_timed(gst::ClockTime::NONE) {
match msg.view() {
Error(e) => error!(
"💥 gst error from {:?}: {} ({})",
msg.src().map(|s| s.path_string()),
e.error(),
e.debug().unwrap_or_default()
),
Warning(w) => warn!(
"⚠️ gst warning from {:?}: {} ({})",
msg.src().map(|s| s.path_string()),
w.error(),
w.debug().unwrap_or_default()
),
Element(e) => debug!(
"🔎 gst element message: {}",
e.structure().map(|s| s.to_string()).unwrap_or_default()
),
StateChanged(s) if s.current() == gst::State::Playing => {
if msg.src().map(|s| s.is::<gst::Pipeline>()).unwrap_or(false) {
info!("🔊 audio pipeline ▶️ (sink='{}')", sink);
} else {
debug!(
"🔊 element {} now ▶️",
msg.src().map(|s| s.name()).unwrap_or_default()
);
}
}
},
_ => {}
}
}
});
pipeline
.set_state(gst::State::Playing)
.context("starting audio pipeline")?;
Ok(Self { pipeline, src })
}
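Note: AudioOut::push is called from app.rs but its body is outside this diff. A sketch of what it presumably does with the appsrc configured above (pts is in microseconds on the capture side, so ClockTime::from_useconds is assumed here):

pub fn push(&self, pkt: AudioPacket) {
    let mut buf = gst::Buffer::from_slice(pkt.data);
    if let Some(b) = buf.get_mut() {
        b.set_pts(gst::ClockTime::from_useconds(pkt.pts));
    }
    let _ = self.src.push_buffer(buf); // ignore flow errors during teardown
}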

client/src/output/display.rs

@ -1,8 +1,8 @@
// client/src/output/display.rs
use gtk::gdk;
use gtk::prelude::ListModelExt;
use gtk::prelude::*;
use tracing::debug;
#[derive(Clone, Debug)]
@ -41,9 +41,16 @@ pub fn enumerate_monitors() -> Vec<MonitorInfo> {
debug!(
"🖥️ monitor: {:?}, connector={:?}, geom={:?}, scale={}",
m.model(),
connector,
geometry,
scale_factor
);
MonitorInfo {
geometry,
scale_factor,
is_internal,
}
})
.collect();

client/src/output/layout.rs

@ -4,21 +4,34 @@ use super::display::MonitorInfo;
use tracing::debug;
#[derive(Clone, Copy, Debug)]
pub struct Rect {
pub x: i32,
pub y: i32,
pub w: i32,
pub h: i32,
}
/// Compute rectangles for N video streams (all 16:9 here).
pub fn assign_rectangles(
monitors: &[MonitorInfo],
streams: &[(&str, i32, i32)], // (name, w, h)
) -> Vec<Rect> {
let mut rects = vec![
Rect {
x: 0,
y: 0,
w: 0,
h: 0
};
streams.len()
];
match monitors.len() {
0 => return rects, // impossible, but keep compiler happy
1 => {
// One monitor: side-by-side layout
let m = &monitors[0].geometry;
let total_native_width: i32 = streams.iter().map(|(_, w, _)| *w).sum();
let scale = f64::min(
m.width() as f64 / total_native_width as f64,
m.height() as f64 / streams[0].2 as f64,
@ -29,14 +42,21 @@ pub fn assign_rectangles(
for (idx, &(_, w, h)) in streams.iter().enumerate() {
let ww = (w as f64 * scale).round() as i32;
let hh = (h as f64 * scale).round() as i32;
rects[idx] = Rect {
x,
y: m.y(),
w: ww,
h: hh,
};
x += ww;
}
}
_ => {
// ≥2 monitors: map 1-to-1 until we run out
for (idx, stream) in streams.iter().enumerate() {
if idx >= monitors.len() {
break;
}
let m = &monitors[idx];
let geom = m.geometry;
@ -52,7 +72,12 @@ pub fn assign_rectangles(
let hh = (h as f64 * scale).round() as i32;
let xx = geom.x() + (geom.width() - ww) / 2;
let yy = geom.y() + (geom.height() - hh) / 2;
rects[idx] = Rect {
x: xx,
y: yy,
w: ww,
h: hh,
};
}
}
}
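Note: in the single-monitor branch the shared scale factor is min(monitor_w / total_native_width, monitor_h / stream_h). Worked example, two 1920x1080 streams on a 2560x1440 output:

let (mw, mh) = (2560f64, 1440f64);
let (w, h) = (1920f64, 1080f64);
let scale = f64::min(mw / (2.0 * w), mh / h); // min(0.667, 1.333)
let (ww, hh) = ((w * scale).round() as i32, (h * scale).round() as i32);
assert_eq!((ww, hh), (1280, 720)); // the two eyes sit side by side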

client/src/output/mod.rs

@ -1,6 +1,6 @@
// client/src/output/mod.rs
pub mod audio;
pub mod display;
pub mod layout;
pub mod video;

client/src/output/video.rs

@ -1,14 +1,14 @@
// client/src/output/video.rs
use anyhow::Context;
use gstreamer as gst;
use gstreamer::prelude::{Cast, ElementExt, GstBinExt, ObjectExt};
use gstreamer_app as gst_app;
use gstreamer_video::VideoOverlay;
use gstreamer_video::prelude::VideoOverlayExt;
use lesavka_common::lesavka::VideoPacket;
use std::process::Command;
use tracing::{debug, error, info, warn};
use crate::output::{display, layout};
@ -54,10 +54,12 @@ impl MonitorWindow {
.downcast::<gst_app::AppSrc>()
.unwrap();
src.set_caps(Some(
&gst::Caps::builder("video/x-h264")
.field("stream-format", &"byte-stream")
.field("alignment", &"au")
.build(),
));
src.set_format(gst::Format::Time);
/* -------- move/resize overlay ---------------------------------- */
@ -87,8 +89,11 @@ impl MonitorWindow {
run: Arc<dyn Fn(&str) -> std::io::Result<ExitStatus> + Send + Sync>,
}
let placer = if Command::new("swaymsg")
.arg("-t")
.arg("get_tree")
.output()
.is_ok()
{
Placer {
name: "swaymsg",
@ -162,12 +167,7 @@ impl MonitorWindow {
for attempt in 1..=10 {
std::thread::sleep(std::time::Duration::from_millis(300));
let status = Command::new("wmctrl")
.args(["-r", &title, "-e", &format!("0,{x},{y},{w},{h}")])
.status();
match status {
Ok(st) if st.success() => {
@ -195,14 +195,8 @@ impl MonitorWindow {
for msg in bus.iter_timed(gst::ClockTime::NONE) {
match msg.view() {
StateChanged(s) if s.current() == gst::State::Playing => {
if msg.src().map(|s| s.is::<gst::Pipeline>()).unwrap_or(false) {
info!("🎞️ video{id} pipeline ▶️ (sink='glimagesink')");
}
}
Error(e) => error!(
@ -223,16 +217,23 @@ impl MonitorWindow {
pipeline.set_state(gst::State::Playing)?;
Ok(Self {
_pipeline: pipeline,
src,
})
}
/// Feed one access-unit to the decoder.
pub fn push_packet(&self, pkt: VideoPacket) {
static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n % 150 == 0 || n < 10 {
debug!(
eye = pkt.id,
bytes = pkt.data.len(),
pts = pkt.pts,
"⬇️ received video AU"
);
}
let mut buf = gst::Buffer::from_slice(pkt.data);
buf.get_mut()