Compare commits

...

23 Commits

Author SHA1 Message Date
2e5f162d2c install mic deps 2025-12-01 16:21:39 -03:00
103220a05a server: mic dependency updates in install script 2025-12-01 15:54:03 -03:00
3458b42a11 client: mic selection defaults improved 2025-12-01 11:38:51 -03:00
c274e8ce18 client: debugging web cam 2025-12-01 03:34:01 -03:00
5be0837f45 client: debugging lines added for hang 2025-12-01 01:29:41 -03:00
8e4b0eabeb client: debugging lines added for hang 2025-12-01 01:24:12 -03:00
d5435666f4 client: debugging lines added for hang 2025-12-01 01:21:27 -03:00
76770b9c41 client: moved handshake earlier to avoid input traps 2025-12-01 01:13:34 -03:00
35196ee8f3 client: addressed unsafe block 2025-12-01 00:14:47 -03:00
6194848b0e client: addressed warning in build 2025-12-01 00:11:23 -03:00
3aef06001b client: screen relay switched to ximagesink 2025-12-01 00:07:09 -03:00
39e4458967 client: webcam fixes on missing glimagesink property 2025-12-01 00:00:28 -03:00
1ece995ec1 client: switched to VideoOverlayExt 2025-11-30 23:52:07 -03:00
d1e86f36f1 client: switched to VideoOverlayExt 2025-11-30 23:50:09 -03:00
7585dd9430 client: switched to VideoOverlayExt 2025-11-30 23:47:41 -03:00
5d68b87dfb client: forgot some imports 2025-11-30 23:44:26 -03:00
b56789d081 client: webcam fixes on v41 constants 2025-11-30 23:41:29 -03:00
3cdf5c0718 core: permissions change, client: weyland window support 2025-11-30 23:07:31 -03:00
4f69556235 core: ignore UVC header strings errors 2025-11-30 18:28:07 -03:00
29e86791ed core: tolerate uvc control class failures 2025-11-30 18:22:48 -03:00
e5e5cd2630 core: tolerate missing uvc control speeds 2025-11-30 18:09:52 -03:00
e223d90db4 install updates - path errors fixed 2025-11-30 18:01:54 -03:00
d2677afc46 install updates - relay service update 2025-11-30 16:16:03 -03:00
33 changed files with 1398 additions and 684 deletions

1
.gitignore vendored
View File

@ -9,3 +9,4 @@ override.toml
**/*~
*.swp
*.swo
*.md

38
AGENTS.md Normal file
View File

@ -0,0 +1,38 @@
# Repository Guidelines
## Project Structure & Module Organization
Rust is split into three crates: `server/` (gRPC relay and device control with async tests in `server/tests`), `client/` (GTK + GStreamer desktop agent; `src/input/` and `src/output/` hold HID and media pipelines), and `common/` (shared protobuf schema in `common/proto/lesavka.proto`, compiled via `build.rs`). `scripts/install/*.sh` provision Arch-based hosts, `scripts/manual/*.sh` run hardware diagnostics, and `scripts/daemon/lesavka-core.sh` handles USB gadget bring-up.
## Build, Test, and Development Commands
- `cargo fmt --all` — run rustfmt across every crate before committing.
- `cargo clippy --workspace --all-targets -D warnings` — enforce lint cleanliness against desktop, server, and shared code.
- `cargo build --workspace --all-targets` — compile every binary/lib in debug mode; prefer `--release` for deployment artifacts referenced by install scripts.
- `cargo test --workspace` — executes unit tests plus async scenarios such as `server/tests/hid.rs`.
- `cargo run -p lesavka_server` / `cargo run -p lesavka_client` — start each side locally; set `LESAVKA_SERVER_ADDR` in the environment when pointing the client at a remote relay.
## Coding Style & Naming Conventions
Follow Rust 2024 idioms: four-space indentation, `snake_case` for modules/functions, and `CamelCase` for types and protobuf messages. Keep gRPC traits in `common` authoritative and prefer `tonic` streams over manual channels. Never hand-edit generated files in `target/` or the `OUT_DIR`; update `.proto` files and rebuild instead. Document hardware constants inline.
## Testing Guidelines
Async tests should use `#[tokio::test(flavor = "multi_thread")]` when they spawn background servers, mirroring `hid_roundtrip`. Give tests descriptive names and keep them near the code they exercise (`client/src/tests/`, `server/tests/`). Cover new RPCs end-to-end by instantiating the tonic server in-process and asserting on gRPC clients.
## Commit & Pull Request Guidelines
Recent history favors terse, imperative summaries (“usb fix”), so keep titles under ~50 characters and describe the subsystem. Commits should stay focused (proto update plus regeneration in one commit, client UI tweaks in another). Pull requests must include context, manual verification steps (`cargo test --workspace`, notable hardware checks), relevant logs or screenshots, and linked issues. Call out script or deployment changes so operators know when to re-run `scripts/install/server.sh` or update systemd units.
## Deployment Notes
For reproducible installs, prefer `scripts/install/server.sh --ref <branch>` to provision capture nodes (udev rules, GStreamer stack) and drop `lesavka-core.service`, while `scripts/install/client.sh` handles desktop prerequisites and systemd activation. When editing these scripts, test on an Arch VM. Keep secret material (VPN credentials, Tailscale auth) out of git and load them via environment variables or systemd drop-ins.
Install scripts must stay idempotent and self-contained: add any new runtime dependencies (e.g., audio/ALSA tools needed for mic bring-up) to the respective install script so a rerun on an arbitrary server/client brings the box to a ready state.
## Current Setup
- Server runs on Raspberry Pi 5 host `titan-jh` (ssh alias configured) and is already provisioned; client setup happens on this machine.
- HDMI capture uses two USB AVerMedia/GC311 devices with power/data split; AC relays on GPIO keep them off unless needed (control scripts already on the Pi).
- USB gadget exposes HID/UAC/UVC; webcam feed is expected from the client into the gadget UVC node (default `LESAVKA_UVC_DEV=/dev/video4`).
- Client ↔ server address: `http://38.28.125.112:50051` (was `64.25.10.31`).
- Webcam uplink is the remaining piece to confirm end-to-end after client install; server-side audio/video capture is in place.
## Session Notes (Dec 1, 2025)
- Server change pending deployment: `server/src/main.rs` now auto-picks UVC sink via `LESAVKA_UVC_DEV` or first `:video_output:` node (titan-jh gadget is `/dev/video0`); errors if none. `stream_microphone` honors `LESAVKA_UAC_DEV`.
- Client change: mic capture falls back to first non-`.monitor` Pulse source if `LESAVKA_MIC_SOURCE` missing/not found.
- Action: rerun `scripts/install/server.sh --ref feature/webcam-caps` on titan-jh (optionally set `LESAVKA_UVC_DEV=/dev/video0`, `LESAVKA_UAC_DEV=plughw:UAC2Gadget,0`), then restart service. Check `/tmp/lesavka-server.log` for “stream_camera using UVC sink”.
- Tethys display: SDDM running but greeter not; capture shows black+cursor. Need greeter/desktop on HDMI head (GC311 input) once display/cable is correct; investigate sddm greeter crash after reboot.

View File

@ -29,6 +29,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
async-stream = "0.3"
shell-escape = "0.1"
v4l = "0.14"
[build-dependencies]
prost-build = "0.13"

View File

@ -1,30 +1,28 @@
#![forbid(unsafe_code)]
use anyhow::Result;
use std::time::Duration;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use tokio::sync::broadcast;
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use tonic::{transport::Channel, Request};
use tracing::{error, trace, debug, info, warn};
use tokio_stream::{StreamExt, wrappers::BroadcastStream};
use tonic::{Request, transport::Channel};
use tracing::{debug, error, info, trace, warn};
use winit::{
event::Event,
event_loop::{EventLoopBuilder, ControlFlow},
event_loop::{ControlFlow, EventLoopBuilder},
platform::wayland::EventLoopBuilderExtWayland,
};
use lesavka_common::lesavka::{
relay_client::RelayClient, KeyboardReport,
MonitorRequest, MouseReport, VideoPacket, AudioPacket
AudioPacket, KeyboardReport, MonitorRequest, MouseReport, VideoPacket,
relay_client::RelayClient,
};
use crate::{handshake,
input::inputs::InputAggregator,
input::microphone::MicrophoneCapture,
input::camera::CameraCapture,
output::video::MonitorWindow,
output::audio::AudioOut};
use crate::{
handshake, input::camera::CameraCapture, input::inputs::InputAggregator,
input::microphone::MicrophoneCapture, output::audio::AudioOut, output::video::MonitorWindow,
};
pub struct LesavkaClientApp {
aggregator: Option<InputAggregator>,
@ -36,7 +34,7 @@ pub struct LesavkaClientApp {
impl LesavkaClientApp {
pub fn new() -> Result<Self> {
let dev_mode = std::env::var("LESAVKA_DEV_MODE").is_ok();
let dev_mode = std::env::var("LESAVKA_DEV_MODE").is_ok();
let server_addr = std::env::args()
.nth(1)
.or_else(|| std::env::var("LESAVKA_SERVER_ADDR").ok())
@ -45,14 +43,20 @@ impl LesavkaClientApp {
let (kbd_tx, _) = broadcast::channel(1024);
let (mou_tx, _) = broadcast::channel(4096);
let mut agg = InputAggregator::new(dev_mode, kbd_tx.clone(), mou_tx.clone());
agg.init()?; // grab devices immediately
let agg = InputAggregator::new(dev_mode, kbd_tx.clone(), mou_tx.clone());
Ok(Self { aggregator: Some(agg), server_addr, dev_mode, kbd_tx, mou_tx })
Ok(Self {
aggregator: Some(agg),
server_addr,
dev_mode,
kbd_tx,
mou_tx,
})
}
pub async fn run(&mut self) -> Result<()> {
/*────────── handshake / feature-negotiation ───────────────*/
info!(server = %self.server_addr, "🚦 starting handshake");
let caps = handshake::negotiate(&self.server_addr).await;
tracing::info!("🤝 server capabilities = {:?}", caps);
@ -64,14 +68,16 @@ impl LesavkaClientApp {
.connect_lazy();
let vid_ep = Channel::from_shared(self.server_addr.clone())?
.initial_connection_window_size(4<<20)
.initial_stream_window_size(4<<20)
.initial_connection_window_size(4 << 20)
.initial_stream_window_size(4 << 20)
.tcp_nodelay(true)
.connect_lazy();
/*────────── input aggregator task ─────────────*/
let aggregator = self.aggregator.take().expect("InputAggregator present");
let agg_task = tokio::spawn(async move {
/*────────── input aggregator task (grab after handshake) ─────────────*/
let mut aggregator = self.aggregator.take().expect("InputAggregator present");
info!("⌛ grabbing input devices…");
aggregator.init()?; // grab devices now that handshake succeeded
let agg_task = tokio::spawn(async move {
let mut a = aggregator;
a.run().await
});
@ -96,19 +102,27 @@ impl LesavkaClientApp {
std::thread::spawn(move || {
gtk::init().expect("GTK initialisation failed");
let el = EventLoopBuilder::<()>::new().with_any_thread(true).build().unwrap();
let el = EventLoopBuilder::<()>::new()
.with_any_thread(true)
.build()
.unwrap();
let win0 = MonitorWindow::new(0).expect("win0");
let win1 = MonitorWindow::new(1).expect("win1");
let _ = el.run(move |_: Event<()>, _elwt| {
_elwt.set_control_flow(ControlFlow::WaitUntil(
std::time::Instant::now() + std::time::Duration::from_millis(16)));
std::time::Instant::now() + std::time::Duration::from_millis(16),
));
static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
static DUMP_CNT: std::sync::atomic::AtomicU32 = std::sync::atomic::AtomicU32::new(0);
static DUMP_CNT: std::sync::atomic::AtomicU32 =
std::sync::atomic::AtomicU32::new(0);
while let Ok(pkt) = video_rx.try_recv() {
CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if CNT.load(std::sync::atomic::Ordering::Relaxed) % 300 == 0 {
debug!("🎥 received {} video packets", CNT.load(std::sync::atomic::Ordering::Relaxed));
debug!(
"🎥 received {} video packets",
CNT.load(std::sync::atomic::Ordering::Relaxed)
);
}
let n = DUMP_CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n % 120 == 0 {
@ -131,19 +145,19 @@ impl LesavkaClientApp {
/*────────── audio renderer & puller ───────────*/
let audio_out = AudioOut::new()?;
let ep_audio = vid_ep.clone();
let ep_audio = vid_ep.clone();
tokio::spawn(Self::audio_loop(ep_audio, audio_out));
/*────────── camera & mic tasks (gated by caps) ───────────*/
if caps.camera && std::env::var("LESAVKA_CAM_DISABLE").is_err() {
let cam = Arc::new(CameraCapture::new(
std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref()
std::env::var("LESAVKA_CAM_SOURCE").ok().as_deref(),
)?);
tokio::spawn(Self::cam_loop(vid_ep.clone(), cam));
}
if caps.microphone {
let mic = Arc::new(MicrophoneCapture::new()?);
tokio::spawn(Self::voice_loop(vid_ep.clone(), mic)); // renamed
tokio::spawn(Self::voice_loop(vid_ep.clone(), mic)); // renamed
}
/*────────── central reactor ───────────────────*/
@ -172,19 +186,21 @@ impl LesavkaClientApp {
loop {
info!("⌨️🤙 Keyboard dial {}", self.server_addr);
let mut cli = RelayClient::new(ep.clone());
let outbound = BroadcastStream::new(self.kbd_tx.subscribe())
.filter_map(|r| r.ok());
let outbound = BroadcastStream::new(self.kbd_tx.subscribe()).filter_map(|r| r.ok());
match cli.stream_keyboard(Request::new(outbound)).await {
Ok(mut resp) => {
while let Some(msg) = resp.get_mut().message().await.transpose() {
if let Err(e) = msg { warn!("⌨️ server err: {e}"); break; }
if let Err(e) = msg {
warn!("⌨️ server err: {e}");
break;
}
}
}
Err(e) => warn!("❌⌨️ connect failed: {e}"),
}
tokio::time::sleep(Duration::from_secs(1)).await; // retry
tokio::time::sleep(Duration::from_secs(1)).await; // retry
}
}
@ -193,14 +209,16 @@ impl LesavkaClientApp {
loop {
info!("🖱️🤙 Mouse dial {}", self.server_addr);
let mut cli = RelayClient::new(ep.clone());
let outbound = BroadcastStream::new(self.mou_tx.subscribe())
.filter_map(|r| r.ok());
let outbound = BroadcastStream::new(self.mou_tx.subscribe()).filter_map(|r| r.ok());
match cli.stream_mouse(Request::new(outbound)).await {
Ok(mut resp) => {
while let Some(msg) = resp.get_mut().message().await.transpose() {
if let Err(e) = msg { warn!("🖱️ server err: {e}"); break; }
if let Err(e) = msg {
warn!("🖱️ server err: {e}");
break;
}
}
}
Err(e) => warn!("❌🖱️ connect failed: {e}"),
@ -210,24 +228,27 @@ impl LesavkaClientApp {
}
/*──────────────── monitor stream ────────────────*/
async fn video_loop(
ep: Channel,
tx: tokio::sync::mpsc::UnboundedSender<VideoPacket>,
) {
async fn video_loop(ep: Channel, tx: tokio::sync::mpsc::UnboundedSender<VideoPacket>) {
for monitor_id in 0..=1 {
let ep = ep.clone();
let tx = tx.clone();
tokio::spawn(async move {
loop {
let mut cli = RelayClient::new(ep.clone());
let req = MonitorRequest { id: monitor_id, max_bitrate: 6_000 };
let req = MonitorRequest {
id: monitor_id,
max_bitrate: 6_000,
};
match cli.capture_video(Request::new(req)).await {
Ok(mut stream) => {
debug!("🎥🏁 cli video{monitor_id}: stream opened");
while let Some(res) = stream.get_mut().message().await.transpose() {
match res {
Ok(pkt) => {
trace!("🎥📥 cli video{monitor_id}: got {}bytes", pkt.data.len());
trace!(
"🎥📥 cli video{monitor_id}: got {}bytes",
pkt.data.len()
);
if tx.send(pkt).is_err() {
warn!("⚠️🎥 cli video{monitor_id}: GUI thread gone");
break;
@ -253,11 +274,16 @@ impl LesavkaClientApp {
async fn audio_loop(ep: Channel, out: AudioOut) {
loop {
let mut cli = RelayClient::new(ep.clone());
let req = MonitorRequest { id: 0, max_bitrate: 0 };
let req = MonitorRequest {
id: 0,
max_bitrate: 0,
};
match cli.capture_audio(Request::new(req)).await {
Ok(mut stream) => {
while let Some(res) = stream.get_mut().message().await.transpose() {
if let Ok(pkt) = res { out.push(pkt); }
if let Ok(pkt) = res {
out.push(pkt);
}
}
}
Err(e) => tracing::warn!("❌🔊 audio stream err: {e}"),
@ -272,11 +298,11 @@ impl LesavkaClientApp {
static FAIL_CNT: AtomicUsize = AtomicUsize::new(0);
loop {
let mut cli = RelayClient::new(ep.clone());
// 1. create a Tokio MPSC channel
let (tx, rx) = tokio::sync::mpsc::channel::<AudioPacket>(256);
let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>();
// 2. spawn a real thread that does the blocking `pull()`
let mic_clone = mic.clone();
std::thread::spawn(move || {
@ -287,14 +313,12 @@ impl LesavkaClientApp {
}
}
});
// 3. turn `rx` into an async stream for gRPC
let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
match cli.stream_microphone(Request::new(outbound)).await {
Ok(mut resp) => {
while resp.get_mut().message().await.transpose().is_some() {}
}
Err(e) => {
match cli.stream_microphone(Request::new(outbound)).await {
Ok(mut resp) => while resp.get_mut().message().await.transpose().is_some() {},
Err(e) => {
// first failure → warn, subsequent ones → debug
if FAIL_CNT.fetch_add(1, Ordering::Relaxed) == 0 {
warn!("❌🎤 connect failed: {e}");
@ -328,8 +352,7 @@ impl LesavkaClientApp {
if n < 10 || n % 120 == 0 {
tracing::trace!("📸 cli frame#{n} {} B", pkt.data.len());
}
tracing::trace!("📸⬆️ sent webcam AU pts={} {} B",
pkt.pts, pkt.data.len());
tracing::trace!("📸⬆️ sent webcam AU pts={} {} B", pkt.pts, pkt.data.len());
let _ = tx.try_send(pkt);
}
}
@ -337,14 +360,14 @@ impl LesavkaClientApp {
let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
match cli.stream_camera(Request::new(outbound)).await {
Ok(_) => delay = Duration::from_secs(1), // got a stream → reset
Ok(_) => delay = Duration::from_secs(1), // got a stream → reset
Err(e) if e.code() == tonic::Code::Unimplemented => {
tracing::warn!("📸 server does not support StreamCamera giving up");
return; // stop the task completely (#3)
return; // stop the task completely (#3)
}
Err(e) => {
tracing::warn!("❌📸 connect failed: {e:?}");
delay = next_delay(delay); // back-off (#2)
delay = next_delay(delay); // back-off (#2)
}
}
tokio::time::sleep(delay).await;
@ -355,6 +378,6 @@ impl LesavkaClientApp {
fn next_delay(cur: std::time::Duration) -> std::time::Duration {
match cur.as_secs() {
1..=15 => cur * 2,
_ => std::time::Duration::from_secs(30),
_ => std::time::Duration::from_secs(30),
}
}

View File

@ -2,28 +2,55 @@
#![forbid(unsafe_code)]
use lesavka_common::lesavka::{self as pb, handshake_client::HandshakeClient};
use tonic::Code;
use std::time::Duration;
use tokio::time::timeout;
use tonic::{Code, transport::Endpoint};
use tracing::{info, warn};
#[derive(Default, Clone, Copy, Debug)]
pub struct PeerCaps {
pub camera: bool,
pub microphone: bool,
pub camera: bool,
pub microphone: bool,
}
pub async fn negotiate(uri: &str) -> PeerCaps {
let mut cli = HandshakeClient::connect(uri.to_owned())
.await
.expect("\"dial handshake\"");
info!(%uri, "🤝 dial handshake");
match cli.get_capabilities(pb::Empty {}).await {
Ok(rsp) => PeerCaps {
camera: rsp.get_ref().camera,
microphone: rsp.get_ref().microphone,
},
Err(e) if e.code() == Code::Unimplemented => {
// ↺ old server pretend it supports nothing special.
let ep = Endpoint::from_shared(uri.to_owned())
.expect("handshake endpoint")
.tcp_nodelay(true)
.http2_keep_alive_interval(Duration::from_secs(15))
.connect_timeout(Duration::from_secs(5));
let channel = timeout(Duration::from_secs(8), ep.connect())
.await
.expect("handshake connect timeout")
.expect("handshake connect failed");
info!("🤝 handshake channel connected");
let mut cli = HandshakeClient::new(channel);
info!("🤝 fetching capabilities…");
match timeout(Duration::from_secs(5), cli.get_capabilities(pb::Empty {})).await {
Ok(Ok(rsp)) => {
let caps = PeerCaps {
camera: rsp.get_ref().camera,
microphone: rsp.get_ref().microphone,
};
info!(?caps, "🤝 handshake ok");
caps
}
Ok(Err(e)) if e.code() == Code::Unimplemented => {
warn!("🤝 handshake not implemented on server assuming defaults");
PeerCaps::default()
}
Ok(Err(e)) => {
warn!("🤝 handshake failed: {e} assuming defaults");
PeerCaps::default()
}
Err(_) => {
warn!("🤝 handshake timed out assuming defaults");
PeerCaps::default()
}
Err(e) => panic!("\"handshake failed: {e}\""),
}
}

View File

@ -1,26 +1,28 @@
// client/src/input/camera.rs
#![forbid(unsafe_code)]
use anyhow::Result;
use anyhow::Context;
use gst::prelude::*;
use gstreamer as gst;
use gstreamer_app as gst_app;
use gst::prelude::*;
use lesavka_common::lesavka::VideoPacket;
pub struct CameraCapture {
#[allow(dead_code)] // kept alive to hold PLAYING state
pipeline: gst::Pipeline,
sink: gst_app::AppSink,
sink: gst_app::AppSink,
}
impl CameraCapture {
pub fn new(device_fragment: Option<&str>) -> anyhow::Result<Self> {
gst::init().ok();
// Pick device
let dev = device_fragment
.and_then(Self::find_device)
.unwrap_or_else(|| "/dev/video0".into());
// Pick device (prefers V4L2 nodes with capture capability)
let dev = match device_fragment {
Some(path) if path.starts_with("/dev/") => path.to_string(),
Some(fragment) => Self::find_device(fragment).unwrap_or_else(|| "/dev/video0".into()),
None => "/dev/video0".into(),
};
// let (enc, raw_caps) = Self::pick_encoder();
// (NVIDIA → VA-API → software x264).
@ -43,7 +45,7 @@ impl CameraCapture {
_ =>
("video/x-raw,width=1280,height=720",
"videoconvert !"),
};
};
// let desc = format!(
// "v4l2src device={dev} do-timestamp=true ! {raw_caps},width=1280,height=720 ! \
@ -85,26 +87,61 @@ impl CameraCapture {
pub fn pull(&self) -> Option<VideoPacket> {
let sample = self.sink.pull_sample().ok()?;
let buf = sample.buffer()?;
let map = buf.map_readable().ok()?;
let buf = sample.buffer()?;
let map = buf.map_readable().ok()?;
let pts = buf.pts().unwrap_or(gst::ClockTime::ZERO).nseconds() / 1_000;
Some(VideoPacket { id: 2, pts, data: map.as_slice().to_vec() })
Some(VideoPacket {
id: 2,
pts,
data: map.as_slice().to_vec(),
})
}
/// Fuzzymatch devices under `/dev/v4l/by-id`
/// Fuzzymatch devices under `/dev/v4l/by-id`, preferring capture nodes
fn find_device(substr: &str) -> Option<String> {
let dir = std::fs::read_dir("/dev/v4l/by-id").ok()?;
for e in dir.flatten() {
let p = e.path();
if p.file_name()?.to_string_lossy().contains(substr) {
if let Ok(target) = std::fs::read_link(&p) {
return Some(format!("/dev/{}", target.file_name()?.to_string_lossy()));
let wanted = substr.to_ascii_lowercase();
let mut matches: Vec<_> = std::fs::read_dir("/dev/v4l/by-id")
.ok()?
.flatten()
.filter_map(|e| {
let p = e.path();
let name = p.file_name()?.to_string_lossy().to_ascii_lowercase();
if name.contains(&wanted) {
Some(p)
} else {
None
}
})
.collect();
// deterministic order
matches.sort();
for p in matches {
if let Ok(target) = std::fs::read_link(&p) {
let dev = format!("/dev/{}", target.file_name()?.to_string_lossy());
if Self::is_capture(&dev) {
return Some(dev);
}
}
}
None
}
fn is_capture(dev: &str) -> bool {
const V4L2_CAP_VIDEO_CAPTURE: u32 = 0x0000_0001;
const V4L2_CAP_VIDEO_CAPTURE_MPLANE: u32 = 0x0000_1000;
v4l::Device::with_path(dev)
.ok()
.and_then(|d| d.query_caps().ok())
.map(|caps| {
let bits = caps.capabilities.bits();
(bits & V4L2_CAP_VIDEO_CAPTURE != 0) || (bits & V4L2_CAP_VIDEO_CAPTURE_MPLANE != 0)
})
.unwrap_or(false)
}
/// Cheap stub used when the webcam is disabled
pub fn new_stub() -> Self {
let pipeline = gst::Pipeline::new();
@ -116,13 +153,13 @@ impl CameraCapture {
Self { pipeline, sink }
}
#[allow(dead_code)] // helper kept for future heuristics
#[allow(dead_code)] // helper kept for future heuristics
fn pick_encoder() -> (&'static str, &'static str) {
let encoders = &[
("nvh264enc", "video/x-raw(memory:NVMM),format=NV12"),
("vaapih264enc","video/x-raw,format=NV12"),
("v4l2h264enc", "video/x-raw"), // RPi, Jetson, etc.
("x264enc", "video/x-raw"), // software
("nvh264enc", "video/x-raw(memory:NVMM),format=NV12"),
("vaapih264enc", "video/x-raw,format=NV12"),
("v4l2h264enc", "video/x-raw"), // RPi, Jetson, etc.
("x264enc", "video/x-raw"), // software
];
for (name, caps) in encoders {
if gst::ElementFactory::find(name).is_some() {
@ -135,13 +172,16 @@ impl CameraCapture {
fn choose_encoder() -> (&'static str, &'static str, &'static str) {
match () {
_ if gst::ElementFactory::find("nvh264enc").is_some() =>
("nvh264enc", "gop-size", "30"),
_ if gst::ElementFactory::find("vaapih264enc").is_some() =>
("vaapih264enc","keyframe-period","30"),
_ if gst::ElementFactory::find("v4l2h264enc").is_some() =>
("v4l2h264enc","idrcount", "30"),
_ => ("x264enc", "key-int-max", "30"),
_ if gst::ElementFactory::find("nvh264enc").is_some() => {
("nvh264enc", "gop-size", "30")
}
_ if gst::ElementFactory::find("vaapih264enc").is_some() => {
("vaapih264enc", "keyframe-period", "30")
}
_ if gst::ElementFactory::find("v4l2h264enc").is_some() => {
("v4l2h264enc", "idrcount", "30")
}
_ => ("x264enc", "key-int-max", "30"),
}
}
}

View File

@ -1,9 +1,12 @@
// client/src/input/inputs.rs
use anyhow::{bail, Context, Result};
use anyhow::{Context, Result, bail};
use evdev::{Device, EventType, KeyCode, RelativeAxisCode};
use tokio::{sync::broadcast::Sender, time::{interval, Duration}};
use tracing::{debug, info, warn, trace};
use tokio::{
sync::broadcast::Sender,
time::{Duration, interval},
};
use tracing::{debug, info, warn};
use lesavka_common::lesavka::{KeyboardReport, MouseReport};
@ -21,19 +24,26 @@ pub struct InputAggregator {
}
impl InputAggregator {
pub fn new(dev_mode: bool,
kbd_tx: Sender<KeyboardReport>,
mou_tx: Sender<MouseReport>) -> Self {
Self { kbd_tx, mou_tx, dev_mode, released: false, magic_active: false,
keyboards: Vec::new(), mice: Vec::new()
}
pub fn new(
dev_mode: bool,
kbd_tx: Sender<KeyboardReport>,
mou_tx: Sender<MouseReport>,
) -> Self {
Self {
kbd_tx,
mou_tx,
dev_mode,
released: false,
magic_active: false,
keyboards: Vec::new(),
mice: Vec::new(),
}
}
/// Called once at startup: enumerates input devices,
/// classifies them, and constructs a aggregator struct per type.
pub fn init(&mut self) -> Result<()> {
let paths = std::fs::read_dir("/dev/input")
.context("Failed to read /dev/input")?;
let paths = std::fs::read_dir("/dev/input").context("Failed to read /dev/input")?;
let mut found_any = false;
@ -42,7 +52,10 @@ impl InputAggregator {
let path = entry.path();
// skip anything that isn't "event*"
if !path.file_name().map_or(false, |f| f.to_string_lossy().starts_with("event")) {
if !path
.file_name()
.map_or(false, |f| f.to_string_lossy().starts_with("event"))
{
continue;
}
@ -56,12 +69,17 @@ impl InputAggregator {
};
// non-blocking so fetch_events never stalls the whole loop
dev.set_nonblocking(true).with_context(|| format!("set_non_blocking {:?}", path))?;
dev.set_nonblocking(true)
.with_context(|| format!("set_non_blocking {:?}", path))?;
match classify_device(&dev) {
DeviceKind::Keyboard => {
dev.grab().with_context(|| format!("grabbing keyboard {path:?}"))?;
info!("🤏🖱️ Grabbed keyboard {:?}", dev.name().unwrap_or("UNKNOWN"));
dev.grab()
.with_context(|| format!("grabbing keyboard {path:?}"))?;
info!(
"🤏🖱️ Grabbed keyboard {:?}",
dev.name().unwrap_or("UNKNOWN")
);
// pass dev_mode to aggregator
// let kbd_agg = KeyboardAggregator::new(dev, self.dev_mode);
@ -71,7 +89,8 @@ impl InputAggregator {
continue;
}
DeviceKind::Mouse => {
dev.grab().with_context(|| format!("grabbing mouse {path:?}"))?;
dev.grab()
.with_context(|| format!("grabbing mouse {path:?}"))?;
info!("🤏⌨️ Grabbed mouse {:?}", dev.name().unwrap_or("UNKNOWN"));
// let mouse_agg = MouseAggregator::new(dev);
@ -81,7 +100,10 @@ impl InputAggregator {
continue;
}
DeviceKind::Other => {
debug!("Skipping non-kbd/mouse device: {:?}", dev.name().unwrap_or("UNKNOWN"));
debug!(
"Skipping non-kbd/mouse device: {:?}",
dev.name().unwrap_or("UNKNOWN")
);
continue;
}
}
@ -104,24 +126,26 @@ impl InputAggregator {
let magic_now = self.keyboards.iter().any(|k| k.magic_grab());
let magic_left = self.keyboards.iter().any(|k| k.magic_left());
let magic_right = self.keyboards.iter().any(|k| k.magic_right());
let mut want_kill = false;
let mut want_kill = false;
for kbd in &mut self.keyboards {
kbd.process_events();
want_kill |= kbd.magic_kill();
want_kill |= kbd.magic_kill();
}
if magic_now && !self.magic_active { self.toggle_grab(); }
if magic_now && !self.magic_active {
self.toggle_grab();
}
if (magic_left || magic_right) && self.magic_active {
current = match current {
Layout::SideBySide => Layout::FullLeft,
Layout::FullLeft => Layout::FullRight,
Layout::FullRight => Layout::SideBySide,
Layout::FullLeft => Layout::FullRight,
Layout::FullRight => Layout::SideBySide,
};
apply_layout(current);
}
if want_kill {
if want_kill {
warn!("🧙 magic chord - killing 🪄 AVADA KEDAVRA!!! 💥💀⚰️");
std::process::exit(0);
std::process::exit(0);
}
for mouse in &mut self.mice {
@ -139,18 +163,18 @@ impl InputAggregator {
} else {
tracing::info!("🧙 magic chord - freeing devices 🪄 EXPELLIARMUS!!! 🔓🕊️");
}
for k in &mut self.keyboards { k.set_grab(self.released); k.set_send(self.released); }
for m in &mut self.mice { m.set_grab(self.released); m.set_send(self.released); }
for k in &mut self.keyboards {
k.set_grab(self.released);
k.set_send(self.released);
}
for m in &mut self.mice {
m.set_grab(self.released);
m.set_send(self.released);
}
self.released = !self.released;
}
}
#[derive(Debug)]
struct Classification {
keyboard: Option<()>,
mouse: Option<()>,
}
/// The classification function
fn classify_device(dev: &Device) -> DeviceKind {
let evbits = dev.supported_events();
@ -166,13 +190,10 @@ fn classify_device(dev: &Device) -> DeviceKind {
// Mouse logic
if evbits.contains(EventType::RELATIVE) {
if let (Some(rel), Some(keys)) =
(dev.supported_relative_axes(), dev.supported_keys())
{
let has_xy = rel.contains(RelativeAxisCode::REL_X)
&& rel.contains(RelativeAxisCode::REL_Y);
let has_btn = keys.contains(KeyCode::BTN_LEFT)
|| keys.contains(KeyCode::BTN_RIGHT);
if let (Some(rel), Some(keys)) = (dev.supported_relative_axes(), dev.supported_keys()) {
let has_xy =
rel.contains(RelativeAxisCode::REL_X) && rel.contains(RelativeAxisCode::REL_Y);
let has_btn = keys.contains(KeyCode::BTN_LEFT) || keys.contains(KeyCode::BTN_RIGHT);
if has_xy && has_btn {
return DeviceKind::Mouse;
}

View File

@ -1,7 +1,10 @@
// client/src/input/keyboard.rs
use std::{collections::HashSet, sync::atomic::{AtomicU32, Ordering}};
use evdev::{Device, EventType, InputEvent, KeyCode};
use std::{
collections::HashSet,
sync::atomic::{AtomicU32, Ordering},
};
use tokio::sync::broadcast::Sender;
use tracing::{debug, error, trace};
@ -11,7 +14,7 @@ use super::keymap::{is_modifier, keycode_to_usage};
pub struct KeyboardAggregator {
dev: Device,
tx: Sender<KeyboardReport>,
tx: Sender<KeyboardReport>,
dev_mode: bool,
sending_disabled: bool,
pressed_keys: HashSet<KeyCode>,
@ -24,11 +27,21 @@ static SEQ: AtomicU32 = AtomicU32::new(0);
impl KeyboardAggregator {
pub fn new(dev: Device, dev_mode: bool, tx: Sender<KeyboardReport>) -> Self {
let _ = dev.set_nonblocking(true);
Self { dev, tx, dev_mode, sending_disabled: false, pressed_keys: HashSet::new()}
Self {
dev,
tx,
dev_mode,
sending_disabled: false,
pressed_keys: HashSet::new(),
}
}
pub fn set_grab(&mut self, grab: bool) {
let _ = if grab { self.dev.grab() } else { self.dev.ungrab() };
let _ = if grab {
self.dev.grab()
} else {
self.dev.ungrab()
};
}
pub fn set_send(&mut self, send: bool) {
@ -40,28 +53,47 @@ impl KeyboardAggregator {
let events: Vec<InputEvent> = match self.dev.fetch_events() {
Ok(it) => it.collect(),
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => return,
Err(e) => { if self.dev_mode { error!("⌨️❌ read error: {e}"); } return }
Err(e) => {
if self.dev_mode {
error!("⌨️❌ read error: {e}");
}
return;
}
};
if self.dev_mode && !events.is_empty() {
trace!("⌨️ {} kbd evts from {}", events.len(), self.dev.name().unwrap_or("?"));
trace!(
"⌨️ {} kbd evts from {}",
events.len(),
self.dev.name().unwrap_or("?")
);
}
for ev in events {
if ev.event_type() != EventType::KEY { continue }
if ev.event_type() != EventType::KEY {
continue;
}
let code = KeyCode::new(ev.code());
match ev.value() {
1 => { self.pressed_keys.insert(code); } // press
0 => { self.pressed_keys.remove(&code); } // release
1 => {
self.pressed_keys.insert(code);
} // press
0 => {
self.pressed_keys.remove(&code);
} // release
_ => {}
}
let report = self.build_report();
// Generate a local sequence number for debugging/log-merge only.
let id = SEQ.fetch_add(1, Ordering::Relaxed);
if self.dev_mode { debug!(seq = id, ?report, "kbd"); }
if self.dev_mode {
debug!(seq = id, ?report, "kbd");
}
if !self.sending_disabled {
let _ = self.tx.send(KeyboardReport { data: report.to_vec() });
let _ = self.tx.send(KeyboardReport {
data: report.to_vec(),
});
}
}
}
@ -72,16 +104,23 @@ impl KeyboardAggregator {
let mut keys = Vec::new();
for &kc in &self.pressed_keys {
if let Some(m) = is_modifier(kc) { mods |= m }
else if let Some(u) = keycode_to_usage(kc) { keys.push(u) }
if let Some(m) = is_modifier(kc) {
mods |= m
} else if let Some(u) = keycode_to_usage(kc) {
keys.push(u)
}
}
out[0] = mods;
for (i, k) in keys.into_iter().take(6).enumerate() { out[2+i] = k }
for (i, k) in keys.into_iter().take(6).enumerate() {
out[2 + i] = k
}
out
}
pub fn has_key(&self, kc: KeyCode) -> bool { self.pressed_keys.contains(&kc) }
pub fn has_key(&self, kc: KeyCode) -> bool {
self.pressed_keys.contains(&kc)
}
pub fn magic_grab(&self) -> bool {
self.has_key(KeyCode::KEY_LEFTCTRL)
@ -102,8 +141,7 @@ impl KeyboardAggregator {
}
pub fn magic_kill(&self) -> bool {
self.has_key(KeyCode::KEY_LEFTCTRL)
&& self.has_key(KeyCode::KEY_ESC)
self.has_key(KeyCode::KEY_LEFTCTRL) && self.has_key(KeyCode::KEY_ESC)
}
}

View File

@ -77,46 +77,46 @@ pub fn keycode_to_usage(key: KeyCode) -> Option<u8> {
KeyCode::KEY_F10 => Some(0x43),
KeyCode::KEY_F11 => Some(0x44),
KeyCode::KEY_F12 => Some(0x45),
// --- Navigation / editing cluster ---------------------------------
KeyCode::KEY_SYSRQ => Some(0x46), // Print-Screen
KeyCode::KEY_SCROLLLOCK => Some(0x47),
KeyCode::KEY_PAUSE => Some(0x48),
KeyCode::KEY_INSERT => Some(0x49),
KeyCode::KEY_HOME => Some(0x4A),
KeyCode::KEY_PAGEUP => Some(0x4B),
KeyCode::KEY_DELETE => Some(0x4C),
KeyCode::KEY_END => Some(0x4D),
KeyCode::KEY_PAGEDOWN => Some(0x4E),
KeyCode::KEY_RIGHT => Some(0x4F),
KeyCode::KEY_LEFT => Some(0x50),
KeyCode::KEY_DOWN => Some(0x51),
KeyCode::KEY_UP => Some(0x52),
KeyCode::KEY_SYSRQ => Some(0x46), // Print-Screen
KeyCode::KEY_SCROLLLOCK => Some(0x47),
KeyCode::KEY_PAUSE => Some(0x48),
KeyCode::KEY_INSERT => Some(0x49),
KeyCode::KEY_HOME => Some(0x4A),
KeyCode::KEY_PAGEUP => Some(0x4B),
KeyCode::KEY_DELETE => Some(0x4C),
KeyCode::KEY_END => Some(0x4D),
KeyCode::KEY_PAGEDOWN => Some(0x4E),
KeyCode::KEY_RIGHT => Some(0x4F),
KeyCode::KEY_LEFT => Some(0x50),
KeyCode::KEY_DOWN => Some(0x51),
KeyCode::KEY_UP => Some(0x52),
// --- Keypad / Num-lock block --------------------------------------
KeyCode::KEY_NUMLOCK => Some(0x53),
KeyCode::KEY_KPSLASH => Some(0x54),
KeyCode::KEY_KPASTERISK => Some(0x55),
KeyCode::KEY_KPMINUS => Some(0x56),
KeyCode::KEY_KPPLUS => Some(0x57),
KeyCode::KEY_KPENTER => Some(0x58),
KeyCode::KEY_KP1 => Some(0x59),
KeyCode::KEY_KP2 => Some(0x5A),
KeyCode::KEY_KP3 => Some(0x5B),
KeyCode::KEY_KP4 => Some(0x5C),
KeyCode::KEY_KP5 => Some(0x5D),
KeyCode::KEY_KP6 => Some(0x5E),
KeyCode::KEY_KP7 => Some(0x5F),
KeyCode::KEY_KP8 => Some(0x60),
KeyCode::KEY_KP9 => Some(0x61),
KeyCode::KEY_KP0 => Some(0x62),
KeyCode::KEY_KPDOT => Some(0x63),
KeyCode::KEY_KPEQUAL => Some(0x67),
KeyCode::KEY_NUMLOCK => Some(0x53),
KeyCode::KEY_KPSLASH => Some(0x54),
KeyCode::KEY_KPASTERISK => Some(0x55),
KeyCode::KEY_KPMINUS => Some(0x56),
KeyCode::KEY_KPPLUS => Some(0x57),
KeyCode::KEY_KPENTER => Some(0x58),
KeyCode::KEY_KP1 => Some(0x59),
KeyCode::KEY_KP2 => Some(0x5A),
KeyCode::KEY_KP3 => Some(0x5B),
KeyCode::KEY_KP4 => Some(0x5C),
KeyCode::KEY_KP5 => Some(0x5D),
KeyCode::KEY_KP6 => Some(0x5E),
KeyCode::KEY_KP7 => Some(0x5F),
KeyCode::KEY_KP8 => Some(0x60),
KeyCode::KEY_KP9 => Some(0x61),
KeyCode::KEY_KP0 => Some(0x62),
KeyCode::KEY_KPDOT => Some(0x63),
KeyCode::KEY_KPEQUAL => Some(0x67),
// --- Misc ---------------------------------------------------------
KeyCode::KEY_102ND => Some(0x64), // “<>” on ISO boards
KeyCode::KEY_MENU => Some(0x65), // Application / Compose
KeyCode::KEY_102ND => Some(0x64), // “<>” on ISO boards
KeyCode::KEY_MENU => Some(0x65), // Application / Compose
// We'll handle modifiers (ctrl, shift, alt, meta) in `is_modifier()`
_ => None,
}
@ -125,13 +125,13 @@ pub fn keycode_to_usage(key: KeyCode) -> Option<u8> {
/// If a key is a modifier, return the bit(s) to set in HID byte[0].
pub fn is_modifier(key: KeyCode) -> Option<u8> {
match key {
KeyCode::KEY_LEFTCTRL => Some(0x01),
KeyCode::KEY_LEFTCTRL => Some(0x01),
KeyCode::KEY_LEFTSHIFT => Some(0x02),
KeyCode::KEY_LEFTALT => Some(0x04),
KeyCode::KEY_LEFTMETA => Some(0x08),
KeyCode::KEY_LEFTALT => Some(0x04),
KeyCode::KEY_LEFTMETA => Some(0x08),
KeyCode::KEY_RIGHTCTRL => Some(0x10),
KeyCode::KEY_RIGHTSHIFT => Some(0x20),
KeyCode::KEY_RIGHTALT => Some(0x40),
KeyCode::KEY_RIGHTALT => Some(0x40),
KeyCode::KEY_RIGHTMETA => Some(0x80),
_ => None,
}

View File

@ -3,31 +3,36 @@
#![forbid(unsafe_code)]
use anyhow::{Context, Result};
use gst::prelude::*;
use gstreamer as gst;
use gstreamer_app as gst_app;
use gst::prelude::*;
use lesavka_common::lesavka::AudioPacket;
use tracing::{debug, error, info, warn, trace};
use shell_escape::unix::escape;
use std::sync::atomic::{AtomicU64, Ordering};
use tracing::{debug, error, info, trace, warn};
pub struct MicrophoneCapture {
#[allow(dead_code)] // kept alive to hold PLAYING state
pipeline: gst::Pipeline,
sink: gst_app::AppSink,
sink: gst_app::AppSink,
}
impl MicrophoneCapture {
pub fn new() -> Result<Self> {
gst::init().ok(); // idempotent
gst::init().ok(); // idempotent
/* pulsesrc (default mic) → AAC/ADTS → appsink -------------------*/
// Optional override: LESAVKA_MIC_SOURCE=<pulsedevicename>
// If not provided or not found, fall back to first non-monitor source.
let device_arg = match std::env::var("LESAVKA_MIC_SOURCE") {
Ok(s) if !s.is_empty() => {
let full = Self::pulse_source_by_substr(&s).unwrap_or(s);
format!("device={}", escape(full.into()))
}
_ => String::new(),
Ok(s) if !s.is_empty() => match Self::pulse_source_by_substr(&s) {
Some(full) => format!("device={}", escape(full.into())),
None => {
warn!("🎤 requested mic '{s}' not found; using default");
Self::default_source_arg()
}
},
_ => Self::default_source_arg(),
};
debug!("🎤 device: {device_arg}");
let aac = ["avenc_aac", "fdkaacenc", "faac", "opusenc"]
@ -50,14 +55,8 @@ impl MicrophoneCapture {
appsink name=asink emit-signals=true max-buffers=50 drop=true"
);
let pipeline: gst::Pipeline = gst::parse::launch(&desc)?
.downcast()
.expect("pipeline");
let sink: gst_app::AppSink = pipeline
.by_name("asink")
.unwrap()
.downcast()
.unwrap();
let pipeline: gst::Pipeline = gst::parse::launch(&desc)?.downcast().expect("pipeline");
let sink: gst_app::AppSink = pipeline.by_name("asink").unwrap().downcast().unwrap();
/* ─── bus for diagnostics ───────────────────────────────────────*/
{
@ -66,20 +65,30 @@ impl MicrophoneCapture {
use gst::MessageView::*;
for msg in bus.iter_timed(gst::ClockTime::NONE) {
match msg.view() {
StateChanged(s) if s.current() == gst::State::Playing
&& msg.src().map(|s| s.is::<gst::Pipeline>()).unwrap_or(false) =>
info!("🎤 mic pipeline ▶️ (source=pulsesrc)"),
Error(e) =>
error!("🎤💥 mic: {} ({})", e.error(), e.debug().unwrap_or_default()),
Warning(w) =>
warn!("🎤⚠️ mic: {} ({})", w.error(), w.debug().unwrap_or_default()),
StateChanged(s)
if s.current() == gst::State::Playing
&& msg.src().map(|s| s.is::<gst::Pipeline>()).unwrap_or(false) =>
{
info!("🎤 mic pipeline ▶️ (source=pulsesrc)")
}
Error(e) => error!(
"🎤💥 mic: {} ({})",
e.error(),
e.debug().unwrap_or_default()
),
Warning(w) => warn!(
"🎤⚠️ mic: {} ({})",
w.error(),
w.debug().unwrap_or_default()
),
_ => {}
}
}
});
}
pipeline.set_state(gst::State::Playing)
pipeline
.set_state(gst::State::Playing)
.context("start mic pipeline")?;
Ok(Self { pipeline, sink })
@ -97,8 +106,11 @@ impl MicrophoneCapture {
if n < 10 || n % 300 == 0 {
trace!("🎤⇧ cli pkt#{n} {} bytes", map.len());
}
Some(AudioPacket { id: 0, pts, data: map.as_slice().to_vec() })
Some(AudioPacket {
id: 0,
pts,
data: map.as_slice().to_vec(),
})
}
Err(_) => None,
}
@ -106,15 +118,39 @@ impl MicrophoneCapture {
fn pulse_source_by_substr(fragment: &str) -> Option<String> {
use std::process::Command;
let out = Command::new("pactl").args(["list", "short", "sources"])
.output().ok()?;
let out = Command::new("pactl")
.args(["list", "short", "sources"])
.output()
.ok()?;
let list = String::from_utf8_lossy(&out.stdout);
list.lines()
.find_map(|ln| {
let mut cols = ln.split_whitespace();
let _id = cols.next()?;
let name = cols.next()?; // column #1
if name.contains(fragment) { Some(name.to_owned()) } else { None }
})
list.lines().find_map(|ln| {
let mut cols = ln.split_whitespace();
let _id = cols.next()?;
let name = cols.next()?; // column #1
if name.contains(fragment) {
Some(name.to_owned())
} else {
None
}
})
}
/// Pick the first non-monitor Pulse source if available; otherwise empty.
fn default_source_arg() -> String {
use std::process::Command;
let out = Command::new("pactl")
.args(["list", "short", "sources"])
.output();
if let Ok(out) = out {
let list = String::from_utf8_lossy(&out.stdout);
if let Some(name) = list
.lines()
.filter_map(|ln| ln.split_whitespace().nth(1))
.find(|name| !name.ends_with(".monitor"))
{
return format!("device={}", escape(name.into()));
}
}
String::new()
}
}

View File

@ -1,8 +1,8 @@
// client/src/input/mod.rs
pub mod inputs; // the aggregator that scans /dev/input and spawns sub-aggregators
pub mod keyboard; // existing keyboard aggregator logic (minus scanning)
pub mod mouse; // a stub aggregator for mice
pub mod camera; // stub for camera
pub mod microphone; // stub for mic
pub mod keymap; // keyboard keymap logic
pub mod camera; // stub for camera
pub mod inputs; // the aggregator that scans /dev/input and spawns sub-aggregators
pub mod keyboard; // existing keyboard aggregator logic (minus scanning)
pub mod keymap;
pub mod microphone; // stub for mic
pub mod mouse; // a stub aggregator for mice // keyboard keymap logic

View File

@ -1,9 +1,9 @@
// client/src/input/mouse.rs
use evdev::{Device, EventType, InputEvent, KeyCode, RelativeAxisCode};
use tokio::sync::broadcast::{self, Sender};
use std::time::{Duration, Instant};
use tracing::{debug, error, warn, trace};
use tokio::sync::broadcast::{self, Sender};
use tracing::{debug, error, trace, warn};
use lesavka_common::lesavka::MouseReport;
@ -11,58 +11,91 @@ const SEND_INTERVAL: Duration = Duration::from_millis(1);
pub struct MouseAggregator {
dev: Device,
tx: Sender<MouseReport>,
tx: Sender<MouseReport>,
dev_mode: bool,
sending_disabled: bool,
next_send: Instant,
buttons: u8,
last_buttons: u8,
dx: i8,
dy: i8,
dx: i8,
dy: i8,
wheel: i8,
}
impl MouseAggregator {
pub fn new(dev: Device, dev_mode: bool, tx: Sender<MouseReport>) -> Self {
Self { dev, tx, dev_mode, sending_disabled: false, next_send: Instant::now(), buttons:0, last_buttons:0, dx:0, dy:0, wheel:0 }
Self {
dev,
tx,
dev_mode,
sending_disabled: false,
next_send: Instant::now(),
buttons: 0,
last_buttons: 0,
dx: 0,
dy: 0,
wheel: 0,
}
}
#[inline] fn slog(&self, f: impl FnOnce()) { if self.dev_mode { f() } }
#[inline]
#[allow(dead_code)]
fn slog(&self, f: impl FnOnce()) {
if self.dev_mode {
f()
}
}
pub fn set_grab(&mut self, grab: bool) {
let _ = if grab { self.dev.grab() } else { self.dev.ungrab() };
let _ = if grab {
self.dev.grab()
} else {
self.dev.ungrab()
};
}
pub fn set_send(&mut self, send: bool) {
self.sending_disabled = !send;
}
pub fn process_events(&mut self) {
let evts: Vec<InputEvent> = match self.dev.fetch_events() {
Ok(it) => it.collect(),
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => return,
Err(e) => { if self.dev_mode { error!("🖱️❌ mouse read err: {e}"); } return }
Err(e) => {
if self.dev_mode {
error!("🖱️❌ mouse read err: {e}");
}
return;
}
};
if self.dev_mode && !evts.is_empty() {
trace!("🖱️ {} evts from {}", evts.len(), self.dev.name().unwrap_or("?"));
trace!(
"🖱️ {} evts from {}",
evts.len(),
self.dev.name().unwrap_or("?")
);
}
for e in evts {
match e.event_type() {
EventType::KEY => match e.code() {
c if c == KeyCode::BTN_LEFT.0 => self.set_btn(0, e.value()),
c if c == KeyCode::BTN_RIGHT.0 => self.set_btn(1, e.value()),
c if c == KeyCode::BTN_LEFT.0 => self.set_btn(0, e.value()),
c if c == KeyCode::BTN_RIGHT.0 => self.set_btn(1, e.value()),
c if c == KeyCode::BTN_MIDDLE.0 => self.set_btn(2, e.value()),
_ => {}
},
EventType::RELATIVE => match e.code() {
c if c == RelativeAxisCode::REL_X.0 =>
self.dx = self.dx.saturating_add(e.value().clamp(-127,127) as i8),
c if c == RelativeAxisCode::REL_Y.0 =>
self.dy = self.dy.saturating_add(e.value().clamp(-127,127) as i8),
c if c == RelativeAxisCode::REL_WHEEL.0 =>
self.wheel = self.wheel.saturating_add(e.value().clamp(-1,1) as i8),
c if c == RelativeAxisCode::REL_X.0 => {
self.dx = self.dx.saturating_add(e.value().clamp(-127, 127) as i8)
}
c if c == RelativeAxisCode::REL_Y.0 => {
self.dy = self.dy.saturating_add(e.value().clamp(-127, 127) as i8)
}
c if c == RelativeAxisCode::REL_WHEEL.0 => {
self.wheel = self.wheel.saturating_add(e.value().clamp(-1, 1) as i8)
}
_ => {}
},
EventType::SYNCHRONIZATION => self.flush(),
@ -72,13 +105,15 @@ impl MouseAggregator {
}
fn flush(&mut self) {
if self.buttons == self.last_buttons && Instant::now() < self.next_send { return; }
if self.buttons == self.last_buttons && Instant::now() < self.next_send {
return;
}
self.next_send = Instant::now() + SEND_INTERVAL;
let pkt = [
self.buttons,
self.dx.clamp(-127,127) as u8,
self.dy.clamp(-127,127) as u8,
self.dx.clamp(-127, 127) as u8,
self.dy.clamp(-127, 127) as u8,
self.wheel as u8,
];
@ -86,17 +121,27 @@ impl MouseAggregator {
if let Err(broadcast::error::SendError(_)) =
self.tx.send(MouseReport { data: pkt.to_vec() })
{
if self.dev_mode { warn!("❌🖱️ no HID receiver (mouse)"); }
if self.dev_mode {
warn!("❌🖱️ no HID receiver (mouse)");
}
} else if self.dev_mode {
debug!("📤🖱️ mouse {:?}", pkt);
}
}
self.dx=0; self.dy=0; self.wheel=0; self.last_buttons=self.buttons;
self.dx = 0;
self.dy = 0;
self.wheel = 0;
self.last_buttons = self.buttons;
}
#[inline] fn set_btn(&mut self, bit: u8, val: i32) {
if val!=0 { self.buttons |= 1<<bit } else { self.buttons &= !(1<<bit) }
#[inline]
fn set_btn(&mut self, bit: u8, val: i32) {
if val != 0 {
self.buttons |= 1 << bit
} else {
self.buttons &= !(1 << bit)
}
}
}
@ -108,4 +153,3 @@ impl Drop for MouseAggregator {
});
}
}

View File

@ -1,29 +1,27 @@
// client/src/layout.rs - Wayland-only window placement utilities
#![forbid(unsafe_code)]
use serde_json::Value;
use std::process::Command;
use tracing::{info, warn};
use serde_json::Value;
/// The three layouts we cycle through.
#[derive(Clone, Copy)]
pub enum Layout {
SideBySide, // two halves on the current monitor
FullLeft, // left eye full-screen, right hidden
FullRight, // right eye full-screen, left hidden
SideBySide, // two halves on the current monitor
FullLeft, // left eye full-screen, right hidden
FullRight, // right eye full-screen, left hidden
}
/// Move/resize a window titled “Lesavka-eye-{eye}” using `swaymsg`.
fn place_window(eye: u32, x: i32, y: i32, w: i32, h: i32) {
let title = format!("Lesavka-eye-{eye}");
let cmd = format!(
r#"[title="^{title}$"] resize set {w} {h}; move position {x} {y}"#
);
let cmd = format!(r#"[title="^{title}$"] resize set {w} {h}; move position {x} {y}"#);
match Command::new("swaymsg").arg(cmd).status() {
Ok(st) if st.success() => info!("✅ placed eye{eye} {w}×{h}@{x},{y}"),
Ok(st) => warn!("⚠️ swaymsg exited with {st}"),
Err(e) => warn!("⚠️ swaymsg failed: {e}"),
Ok(st) => warn!("⚠️ swaymsg exited with {st}"),
Err(e) => warn!("⚠️ swaymsg failed: {e}"),
}
}
@ -35,15 +33,23 @@ pub fn apply(layout: Layout) {
.output()
{
Ok(o) => o.stdout,
Err(e) => { warn!("get_outputs failed: {e}"); return; }
Err(e) => {
warn!("get_outputs failed: {e}");
return;
}
};
let Ok(Value::Array(outputs)) = serde_json::from_slice::<Value>(&out) else {
warn!("unexpected JSON from swaymsg"); return;
warn!("unexpected JSON from swaymsg");
return;
};
let Some(rect) = outputs.iter()
let Some(rect) = outputs
.iter()
.find(|o| o.get("focused").and_then(Value::as_bool) == Some(true))
.and_then(|o| o.get("rect")) else { return; };
.and_then(|o| o.get("rect"))
else {
return;
};
// helper to read an i64 → i32 with defaults
let g = |k: &str| rect.get(k).and_then(Value::as_i64).unwrap_or(0) as i32;
@ -52,8 +58,8 @@ pub fn apply(layout: Layout) {
match layout {
Layout::SideBySide => {
let w_half = w / 2;
place_window(0, x, y, w_half, h);
place_window(1, x + w_half, y, w_half, h);
place_window(0, x, y, w_half, h);
place_window(1, x + w_half, y, w_half, h);
}
Layout::FullLeft => {
place_window(0, x, y, w, h);

View File

@ -3,9 +3,9 @@
#![forbid(unsafe_code)]
pub mod app;
pub mod input;
pub mod output;
pub mod layout;
pub mod handshake;
pub mod input;
pub mod layout;
pub mod output;
pub use app::LesavkaClientApp;

View File

@ -28,11 +28,14 @@ async fn main() -> Result<()> {
/*------------- common filter & stderr layer ------------------------*/
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
EnvFilter::new(
"lesavka_client=trace,\
lesavka_server=trace,\
tonic=debug,\
h2=debug,\
tower=debug",
"warn,\
lesavka_client::app=info,\
lesavka_client::input::camera=debug,\
lesavka_client::output::video=info,\
lesavka_client::output::audio=info,\
tonic=warn,\
h2=warn,\
tower=warn",
)
});
@ -42,7 +45,7 @@ async fn main() -> Result<()> {
.with_file(true);
let dev_mode = env::var("LESAVKA_DEV_MODE").is_ok();
let mut _guard: Option<WorkerGuard> = None; // keep guard alive
let mut _guard: Option<WorkerGuard> = None; // keep guard alive
/*------------- subscriber setup -----------------------------------*/
if dev_mode {
@ -69,7 +72,10 @@ async fn main() -> Result<()> {
.with(file_layer)
.init();
tracing::info!("📜 lesavka-client running in DEV mode → {}", log_path.display());
tracing::info!(
"📜 lesavka-client running in DEV mode → {}",
log_path.display()
);
} else {
tracing_subscriber::registry()
.with(env_filter)

View File

@ -1,11 +1,11 @@
// client/src/output/audio.rs
use anyhow::{Context, Result};
use gst::MessageView::*;
use gst::prelude::*;
use gstreamer as gst;
use gstreamer_app as gst_app;
use gst::prelude::*;
use gst::MessageView::*;
use tracing::{error, info, warn, debug};
use tracing::{debug, error, info, warn};
use lesavka_common::lesavka::AudioPacket;
@ -57,13 +57,14 @@ impl AudioOut {
.expect("no src element")
.downcast::<gst_app::AppSrc>()
.expect("src not an AppSrc");
src.set_caps(Some(&gst::Caps::builder("audio/mpeg")
.field("mpegversion", &4i32) // AAC
.field("stream-format", &"adts") // ADTS frames
.field("rate", &48_000i32) // 48kHz
.field("channels", &2i32) // stereo
.build()
src.set_caps(Some(
&gst::Caps::builder("audio/mpeg")
.field("mpegversion", &4i32) // AAC
.field("stream-format", &"adts") // ADTS frames
.field("rate", &48_000i32) // 48kHz
.field("channels", &2i32) // stereo
.build(),
));
src.set_format(gst::Format::Time);
@ -72,34 +73,40 @@ impl AudioOut {
std::thread::spawn(move || {
for msg in bus.iter_timed(gst::ClockTime::NONE) {
match msg.view() {
Error(e) => error!("💥 gst error from {:?}: {} ({})",
msg.src().map(|s| s.path_string()),
e.error(), e.debug().unwrap_or_default()),
Warning(w) => warn!("⚠️ gst warning from {:?}: {} ({})",
msg.src().map(|s| s.path_string()),
w.error(), w.debug().unwrap_or_default()),
Element(e) => debug!("🔎 gst element message: {}", e
.structure()
.map(|s| s.to_string())
.unwrap_or_default()),
Error(e) => error!(
"💥 gst error from {:?}: {} ({})",
msg.src().map(|s| s.path_string()),
e.error(),
e.debug().unwrap_or_default()
),
Warning(w) => warn!(
"⚠️ gst warning from {:?}: {} ({})",
msg.src().map(|s| s.path_string()),
w.error(),
w.debug().unwrap_or_default()
),
Element(e) => debug!(
"🔎 gst element message: {}",
e.structure().map(|s| s.to_string()).unwrap_or_default()
),
StateChanged(s) if s.current() == gst::State::Playing => {
if msg
.src()
.map(|s| s.is::<gst::Pipeline>())
.unwrap_or(false)
{
if msg.src().map(|s| s.is::<gst::Pipeline>()).unwrap_or(false) {
info!("🔊 audio pipeline ▶️ (sink='{}')", sink);
} else {
debug!("🔊 element {} now ▶️",
msg.src().map(|s| s.name()).unwrap_or_default());
debug!(
"🔊 element {} now ▶️",
msg.src().map(|s| s.name()).unwrap_or_default()
);
}
},
}
_ => {}
}
}
});
pipeline.set_state(gst::State::Playing).context("starting audio pipeline")?;
pipeline
.set_state(gst::State::Playing)
.context("starting audio pipeline")?;
Ok(Self { pipeline, src })
}
@ -131,7 +138,7 @@ fn pick_sink_element() -> Result<String> {
}
// 2. Query PipeWire for default & running sinks
let sinks = list_pw_sinks(); // Vec<(name,state)>
let sinks = list_pw_sinks(); // Vec<(name,state)>
for (n, st) in &sinks {
if *st == "RUNNING" {
info!("🔈 using default RUNNING sink '{}'", n);
@ -157,20 +164,16 @@ fn pick_sink_element() -> Result<String> {
}
fn list_pw_sinks() -> Vec<(String, String)> {
let mut out = Vec::new();
if out.is_empty() {
// ── PulseAudio / pactl fallback ────────────────────────────────
if let Ok(info) = std::process::Command::new("pactl")
.args(["info"])
.output()
.map(|o| String::from_utf8_lossy(&o.stdout).to_string())
{
if let Some(line) = info.lines().find(|l| l.starts_with("Default Sink:")) {
let def = line["Default Sink:".len()..].trim();
return vec![(def.to_string(), "UNKNOWN".to_string())];
}
// ── PulseAudio / pactl fallback ────────────────────────────────
if let Ok(info) = std::process::Command::new("pactl")
.args(["info"])
.output()
.map(|o| String::from_utf8_lossy(&o.stdout).to_string())
{
if let Some(line) = info.lines().find(|l| l.starts_with("Default Sink:")) {
let def = line["Default Sink:".len()..].trim();
return vec![(def.to_string(), "UNKNOWN".to_string())];
}
}
out
Vec::new()
}

View File

@ -1,15 +1,15 @@
// client/src/output/display.rs
use gtk::gdk;
use gtk::prelude::ListModelExt;
use gtk::prelude::*;
use gtk::gdk;
use tracing::debug;
#[derive(Clone, Debug)]
pub struct MonitorInfo {
pub geometry: gdk::Rectangle,
pub geometry: gdk::Rectangle,
pub scale_factor: i32,
pub is_internal: bool,
pub is_internal: bool,
}
/// Enumerate monitors sorted by our desired priority.
@ -22,28 +22,35 @@ pub fn enumerate_monitors() -> Vec<MonitorInfo> {
is_internal: false,
}];
};
let model = display.monitors(); // gio::ListModel
let model = display.monitors(); // gio::ListModel
let mut list: Vec<_> = (0..model.n_items())
.filter_map(|i| model.item(i))
.filter_map(|obj| obj.downcast::<gdk::Monitor>().ok())
.map(|m| {
// -------- internal vs external ----------------------------------
let connector = m.connector().unwrap_or_default(); // e.g. "eDP-1"
let connector = m.connector().unwrap_or_default(); // e.g. "eDP-1"
let is_internal = connector.starts_with("eDP")
|| connector.starts_with("LVDS")
|| connector.starts_with("DSI")
|| connector.to_ascii_lowercase().contains("internal");
|| connector.starts_with("LVDS")
|| connector.starts_with("DSI")
|| connector.to_ascii_lowercase().contains("internal");
// -------- geometry / scale --------------------------------------
let geometry = m.geometry();
let geometry = m.geometry();
let scale_factor = m.scale_factor();
debug!(
"🖥️ monitor: {:?}, connector={:?}, geom={:?}, scale={}",
m.model(), connector, geometry, scale_factor
m.model(),
connector,
geometry,
scale_factor
);
MonitorInfo { geometry, scale_factor, is_internal }
MonitorInfo {
geometry,
scale_factor,
is_internal,
}
})
.collect();

View File

@ -4,23 +4,36 @@ use super::display::MonitorInfo;
use tracing::debug;
#[derive(Clone, Copy, Debug)]
pub struct Rect { pub x: i32, pub y: i32, pub w: i32, pub h: i32 }
pub struct Rect {
pub x: i32,
pub y: i32,
pub w: i32,
pub h: i32,
}
/// Compute rectangles for N video streams (all 16:9 here).
pub fn assign_rectangles(
monitors: &[MonitorInfo],
streams: &[(&str, i32, i32)], // (name, w, h)
streams: &[(&str, i32, i32)], // (name, w, h)
) -> Vec<Rect> {
let mut rects = vec![Rect { x:0, y:0, w:0, h:0 }; streams.len()];
let mut rects = vec![
Rect {
x: 0,
y: 0,
w: 0,
h: 0
};
streams.len()
];
match monitors.len() {
0 => return rects, // impossible, but keep compiler happy
0 => return rects, // impossible, but keep compiler happy
1 => {
// One monitor: side-by-side layout
let m = &monitors[0].geometry;
let total_native_width: i32 = streams.iter().map(|(_,w,_)| *w).sum();
let total_native_width: i32 = streams.iter().map(|(_, w, _)| *w).sum();
let scale = f64::min(
m.width() as f64 / total_native_width as f64,
m.width() as f64 / total_native_width as f64,
m.height() as f64 / streams[0].2 as f64,
);
debug!("one-monitor scale = {}", scale);
@ -29,30 +42,42 @@ pub fn assign_rectangles(
for (idx, &(_, w, h)) in streams.iter().enumerate() {
let ww = (w as f64 * scale).round() as i32;
let hh = (h as f64 * scale).round() as i32;
rects[idx] = Rect { x, y: m.y(), w: ww, h: hh };
rects[idx] = Rect {
x,
y: m.y(),
w: ww,
h: hh,
};
x += ww;
}
}
_ => {
// ≥2 monitors: map 1-to-1 until we run out
for (idx, stream) in streams.iter().enumerate() {
if idx >= monitors.len() { break; }
let m = &monitors[idx];
let geom = m.geometry;
if idx >= monitors.len() {
break;
}
let m = &monitors[idx];
let geom = m.geometry;
let (w, h) = (stream.1, stream.2);
let scale = f64::min(
geom.width() as f64 / w as f64,
geom.width() as f64 / w as f64,
geom.height() as f64 / h as f64,
);
debug!("monitor#{idx} scale = {scale}");
let ww = (w as f64 * scale).round() as i32;
let hh = (h as f64 * scale).round() as i32;
let xx = geom.x() + (geom.width() - ww) / 2;
let xx = geom.x() + (geom.width() - ww) / 2;
let yy = geom.y() + (geom.height() - hh) / 2;
rects[idx] = Rect { x: xx, y: yy, w: ww, h: hh };
rects[idx] = Rect {
x: xx,
y: yy,
w: ww,
h: hh,
};
}
}
}

View File

@ -1,6 +1,6 @@
// client/src/output/mod.rs
pub mod audio;
pub mod video;
pub mod layout;
pub mod display;
pub mod layout;
pub mod video;

View File

@ -1,36 +1,20 @@
// client/src/output/video.rs
use std::process::Command;
use std::time::Duration;
use std::thread;
use anyhow::Context;
use gstreamer as gst;
use gstreamer::prelude::{Cast, ElementExt, GstBinExt, ObjectExt};
use gstreamer_app as gst_app;
use gstreamer_video::{prelude::*, VideoOverlay};
use gst::prelude::*;
use gstreamer_video::VideoOverlay;
use gstreamer_video::prelude::VideoOverlayExt;
use lesavka_common::lesavka::VideoPacket;
use tracing::{error, info, warn, debug};
use gstreamer_video as gst_video;
use gstreamer_video::glib::Type;
use std::process::Command;
use tracing::{debug, error, info, warn};
use crate::output::{display, layout};
/* ---------- pipeline ----------------------------------------------------
* H.264/AU Decoded
* AppSrc decodebin glimagesink
* (autoplug) (overlay) |
* ----------------------------------------------------------------------*/
const PIPELINE_DESC: &str = concat!(
"appsrc name=src is-live=true format=time do-timestamp=true block=false ! ",
"queue leaky=downstream ! ",
"capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=au ! ",
"h264parse disable-passthrough=true ! decodebin ! videoconvert ! ",
"glimagesink name=sink sync=false"
);
pub struct MonitorWindow {
_pipeline: gst::Pipeline,
src: gst_app::AppSrc,
src: gst_app::AppSrc,
}
impl MonitorWindow {
@ -38,14 +22,30 @@ impl MonitorWindow {
gst::init().context("initialising GStreamer")?;
// --- Build pipeline ---------------------------------------------------
let pipeline: gst::Pipeline = gst::parse::launch(PIPELINE_DESC)?
let sink = if std::env::var("GDK_BACKEND")
.map(|v| v.contains("x11"))
.unwrap_or_else(|_| std::env::var_os("DISPLAY").is_some())
{
"ximagesink name=sink sync=false"
} else {
"glimagesink name=sink sync=false"
};
let desc = format!(
"appsrc name=src is-live=true format=time do-timestamp=true block=false ! \
queue leaky=downstream ! \
capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=au ! \
h264parse disable-passthrough=true ! decodebin ! videoconvert ! {sink}"
);
let pipeline: gst::Pipeline = gst::parse::launch(&desc)?
.downcast::<gst::Pipeline>()
.expect("not a pipeline");
/* -------- placement maths -------------------------------------- */
let monitors = display::enumerate_monitors();
let monitors = display::enumerate_monitors();
let stream_defs = &[("eye-0", 1920, 1080), ("eye-1", 1920, 1080)];
let rects = layout::assign_rectangles(&monitors, stream_defs);
let rects = layout::assign_rectangles(&monitors, stream_defs);
// --- AppSrc------------------------------------------------------------
let src: gst_app::AppSrc = pipeline
@ -54,38 +54,46 @@ impl MonitorWindow {
.downcast::<gst_app::AppSrc>()
.unwrap();
src.set_caps(Some(&gst::Caps::builder("video/x-h264")
.field("stream-format", &"byte-stream")
.field("alignment", &"au")
.build()));
src.set_caps(Some(
&gst::Caps::builder("video/x-h264")
.field("stream-format", &"byte-stream")
.field("alignment", &"au")
.build(),
));
src.set_format(gst::Format::Time);
/* -------- move/resize overlay ---------------------------------- */
if let Some(sink_elem) = pipeline.by_name("sink") {
if sink_elem.find_property("window-title").is_some() {
let _ = sink_elem.set_property("window-title", &format!("Lesavka-eye-{id}"));
}
if let Ok(overlay) = sink_elem.dynamic_cast::<VideoOverlay>() {
if let Some(r) = rects.get(id as usize) {
// 1. Tell glimagesink how to crop the texture in its own window
overlay.set_render_rectangle(r.x, r.y, r.w, r.h);
let _ = overlay.set_render_rectangle(r.x, r.y, r.w, r.h);
debug!(
"🔲 eye-{id} → render_rectangle({}, {}, {}, {})",
r.x, r.y, r.w, r.h
);
// 2. **Compositor-level** placement (Wayland only)
if std::env::var_os("WAYLAND_DISPLAY").is_some() {
use std::process::{Command, ExitStatus};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
// A small helper struct so the two branches return the same type
struct Placer {
name: &'static str,
run: Arc<dyn Fn(&str) -> std::io::Result<ExitStatus> + Send + Sync>,
run: Arc<dyn Fn(&str) -> std::io::Result<ExitStatus> + Send + Sync>,
}
let placer = if Command::new("swaymsg").arg("-t").arg("get_tree")
.output().is_ok()
let placer = if Command::new("swaymsg")
.arg("-t")
.arg("get_tree")
.output()
.is_ok()
{
Placer {
name: "swaymsg",
@ -111,26 +119,29 @@ impl MonitorWindow {
}),
}
};
if placer.name != "noop" {
// Criteria string that works for i3-/sway-compatible IPC
let criteria = format!(
r#"[title="^Lesavka-eye-{id}$"] \
resize set {w} {h}; \
move absolute position {x} {y}"#,
w = r.w,
h = r.h,
x = r.x,
y = r.y,
);
let cmd = match placer.name {
// Criteria string that works for i3-/sway-compatible IPC
"swaymsg" | "hyprctl" => format!(
r#"[title="^Lesavka-eye-{id}$"] \
resize set {w} {h}; \
move absolute position {x} {y}"#,
w = r.w,
h = r.h,
x = r.x,
y = r.y,
),
_ => String::new(),
};
// Retry in a detached thread - avoids blocking GStreamer
let placename = placer.name;
let runner = placer.run.clone();
let runner = placer.run.clone();
thread::spawn(move || {
for attempt in 1..=10 {
thread::sleep(Duration::from_millis(300));
match runner(&criteria) {
match runner(&cmd) {
Ok(st) if st.success() => {
tracing::info!(
"✅ {placename}: placed eye-{id} (attempt {attempt})"
@ -145,26 +156,47 @@ impl MonitorWindow {
});
}
}
// 3. X11 / Xwayland placement via wmctrl
else if std::env::var_os("DISPLAY").is_some() {
let title = format!("Lesavka-eye-{id}");
let w = r.w;
let h = r.h;
let x = r.x;
let y = r.y;
std::thread::spawn(move || {
for attempt in 1..=10 {
std::thread::sleep(std::time::Duration::from_millis(300));
let status = Command::new("wmctrl")
.args(["-r", &title, "-e", &format!("0,{x},{y},{w},{h}")])
.status();
match status {
Ok(st) if st.success() => {
tracing::info!(
"✅ wmctrl placed eye-{id} (attempt {attempt})"
);
break;
}
_ => tracing::debug!(
"⌛ wmctrl: eye-{id} not mapped yet (attempt {attempt})"
),
}
}
});
}
}
}
}
{
let id = id; // move into thread
let id = id; // move into thread
let bus = pipeline.bus().expect("no bus");
std::thread::spawn(move || {
use gst::MessageView::*;
for msg in bus.iter_timed(gst::ClockTime::NONE) {
match msg.view() {
StateChanged(s) if s.current() == gst::State::Playing => {
if msg
.src()
.map(|s| s.is::<gst::Pipeline>())
.unwrap_or(false)
{
info!(
"🎞️ video{id} pipeline ▶️ (sink='glimagesink')"
);
if msg.src().map(|s| s.is::<gst::Pipeline>()).unwrap_or(false) {
info!("🎞️ video{id} pipeline ▶️ (sink='glimagesink')");
}
}
Error(e) => error!(
@ -185,16 +217,23 @@ impl MonitorWindow {
pipeline.set_state(gst::State::Playing)?;
Ok(Self { _pipeline: pipeline, src })
Ok(Self {
_pipeline: pipeline,
src,
})
}
/// Feed one access-unit to the decoder.
pub fn push_packet(&self, pkt: VideoPacket) {
static CNT : std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n % 150 == 0 || n < 10 {
debug!(eye = pkt.id, bytes = pkt.data.len(), pts = pkt.pts, "⬇️ received video AU");
debug!(
eye = pkt.id,
bytes = pkt.data.len(),
pts = pkt.pts,
"⬇️ received video AU"
);
}
let mut buf = gst::Buffer::from_slice(pkt.data);
buf.get_mut()

View File

@ -161,44 +161,45 @@ for s in fs hs ss; do
done
# ── 4. VideoControl interface ─────────────────────────────────────
set +e # relax errors for configfs quirks
mkdir -p "$F/control/header/h" # real dir mandatory
mkdir -p "$F/control/class" # parent once
mkdir -p "$F/control/class/fs" "$F/control/class/hs" "$F/control/class/ss" 2>/dev/null || true
echo "[lesavka-core] ★ directory tree just before links:"
tree -L 3 "$F/control" | sed 's/^/[lesavka-core] /'
for s in fs hs ss; do
# ensure the perspeed dir exists (created by kernel)
mkdir -p "$F/control/class/$s" # harmless if already there
# create the mandatory *symlink inside* that directory:
ln -snf ../../header/h "$F/control/class/$s/h"
# best-effort: some UDCs reject certain speeds; skip on failure
if mkdir -p "$F/control/class/$s" 2>/dev/null; then
ln -snf "$F/control/header/h" "$F/control/class/$s/h" 2>/dev/null || \
log "⚠️ control/class/$s/h link missing (continuing)"
else
log "⚠️ skipping control/class/$s (mkdir failed)"
fi
done
for s in fs hs ss; do
[ -L "$F/control/class/$s/h" ] || {
echo "[lesavkacore] ❌ $s/h link missing, aborting" >&2
exit 1
}
[ -L "$F/control/class/$s/h" ] || log "⚠️ $s/h link missing (continuing)"
done
echo "[lesavka-core] ★ directory tree just before bind:"
tree -L 3 "$F/control" | sed 's/^/[lesavka-core] /'
for s in fs hs ss; do
[ -L "$F/control/class/$s" ] || {
echo "[lesavka-core] ❌ $s link missing, gadget aborting" >&2
exit 1
}
[ -L "$F/control/class/$s" ] || log "⚠️ $s link missing (continuing)"
done
set -e # back to strict mode
# optional: hide unsupported controls
echo 0 >"$F/control/terminal/camera/default/bmControls" 2>/dev/null || true
echo 0 >"$F/control/processing/default/bmControls" 2>/dev/null || true
# friendly label
mkdir -p "$F/control/header/strings/0x409"
echo "Lesavka UVC" >"$F/control/header/strings/0x409/label"
set +e
mkdir -p "$F/control/header/strings/0x409" 2>/dev/null || log "⚠️ skipping control/header strings (mkdir failed)"
echo "Lesavka UVC" >"$F/control/header/strings/0x409/label" 2>/dev/null || log "⚠️ unable to set UVC label (continuing)"
set -e
# ----------------------- configuration -----------------------------
mkdir -p "$G/configs/c.1/strings/0x409"

View File

@ -5,8 +5,20 @@ set -euo pipefail
ORIG_USER=${SUDO_USER:-$(id -un)}
# 1. packages (Arch)
sudo pacman -Syq --needed --noconfirm git rustup protobuf gcc evtest gstreamer gst-plugins-base gst-plugins-good gst-plugins-bad gst-plugins-ugly gst-libav pipewire pipewire-pulse
yay -S --noconfirm grpcurl-bin
sudo pacman -Syq --needed --noconfirm \
git rustup protobuf gcc clang evtest base-devel \
gstreamer gst-plugins-base gst-plugins-good gst-plugins-bad gst-plugins-ugly gst-libav \
pipewire pipewire-pulse \
wmctrl qt6-tools
# 1b. yay for AUR bits (run as the invoking user, never root)
if ! command -v yay >/dev/null 2>&1; then
sudo -u "$ORIG_USER" bash -c 'cd /tmp && git clone --depth 1 https://aur.archlinux.org/yay.git &&
cd yay && makepkg -si --noconfirm'
fi
sudo -u "$ORIG_USER" yay -S --needed --noconfirm grpcurl-bin
# 1c. input access
sudo usermod -aG input "$ORIG_USER"
# 2. Rust tool-chain for both root & user
@ -14,7 +26,9 @@ sudo rustup default stable
sudo -u "$ORIG_USER" rustup default stable
# 3. clone / update into a user-writable dir
SRC="$HOME/.local/src/lesavka"
USER_HOME=$(getent passwd "$ORIG_USER" | cut -d: -f6)
SRC="$USER_HOME/.local/src/lesavka"
sudo -u "$ORIG_USER" mkdir -p "$(dirname "$SRC")"
if [[ -d $SRC/.git ]]; then
sudo -u "$ORIG_USER" git -C "$SRC" pull --ff-only
else
@ -41,7 +55,7 @@ Group=root
Environment=RUST_LOG=debug
Environment=LESAVKA_DEV_MODE=1
Environment=LESAVKA_SERVER_ADDR=http://64.25.10.31:50051
Environment=LESAVKA_SERVER_ADDR=http://38.28.125.112:50051
ExecStart=/usr/local/bin/lesavka-client
Restart=no

View File

@ -20,10 +20,12 @@ sudo pacman -Syq --needed --noconfirm git \
rustup \
protobuf \
gcc \
alsa-utils \
pipewire \
pipewire-pulse \
tailscale \
base-devel \
v4l-utils \
gstreamer \
gst-plugins-base \
gst-plugins-base-libs \
@ -42,18 +44,40 @@ if ! command -v yay >/dev/null 2>&1; then
fi
# yay -S --noconfirm grpcurl-bin
echo "==> 1c. GPIO permissions for relay"
echo 'z /dev/gpiochip* 0660 root gpio -' | sudo tee /etc/tmpfiles.d/gpiochip.conf >/dev/null
sudo systemd-tmpfiles --create /etc/tmpfiles.d/gpiochip.conf || true
echo "==> 2a. Kernel-driver tweaks"
cat <<'EOF' | sudo tee /etc/modprobe.d/gc311-stream.conf >/dev/null
options uvcvideo quirks=0x200 timeout=10000
EOF
echo "==> 2b. Predictable /dev names for each capture card"
# probe all v4l2 devices, keep only the two GC311 capture cards
# ensure relay (GPIO power) is on if present
if systemctl list-unit-files | grep -q '^relay.service'; then
sudo systemctl enable --now relay.service
sleep 2
fi
# probe v4l2 devices for GC311s (07ca:3311)
mapfile -t GC_VIDEOS < <(
sudo v4l2-ctl --list-devices |
sudo v4l2-ctl --list-devices 2>/dev/null |
awk '/Live Gamer MINI/{getline; print $1}'
)
# fallback via udev if v4l2-ctl output is empty/partial
if [ "${#GC_VIDEOS[@]}" -ne 2 ]; then
mapfile -t GC_VIDEOS < <(
for dev in /dev/video*; do
props=$(sudo udevadm info -q property -n "$dev" 2>/dev/null || true)
if echo "$props" | grep -q 'ID_VENDOR_ID=07ca' && echo "$props" | grep -q 'ID_MODEL_ID=3311'; then
echo "$dev"
fi
done | sort -u
)
fi
if [ "${#GC_VIDEOS[@]}" -ne 2 ]; then
echo "❌ Exactly two GC311 capture cards (index0) must be attached!" >&2
printf ' Detected: %s\n' "${GC_VIDEOS[@]}"
@ -89,7 +113,7 @@ sudo -u "$ORIG_USER" rustup default stable
echo "==> 4a. Source checkout"
SRC_DIR=/var/src/lesavka
REPO_URL=ssh://git@scm.bstein.dev:2242/brad_stein/lesavka.git
REPO_URL=ssh://git@scm.bstein.dev:2242/bstein/lesavka.git
if [[ ! -d $SRC_DIR ]]; then
sudo mkdir -p /var/src
sudo chown "$ORIG_USER":"$ORIG_USER" /var/src
@ -124,7 +148,8 @@ Requires=sys-kernel-config.mount
Type=oneshot
ExecStart=/usr/local/bin/lesavka-core.sh
RemainAfterExit=yes
CapabilityBoundingSet=CAP_SYS_ADMIN
CapabilityBoundingSet=CAP_SYS_ADMIN CAP_SYS_MODULE
AmbientCapabilities=CAP_SYS_MODULE
MountFlags=slave
[Install]

107
scripts/manual/eval_lesavka.sh Executable file
View File

@ -0,0 +1,107 @@
#!/usr/bin/env bash
# scripts/manual/eval_lesavka.sh - iterative health check for lesavka client/server/gadget
# - Locally: probes TCP + gRPC handshake on LESAVKA_SERVER_ADDR
# - Optional: if TETHYS_HOST is set, ssh to run lsusb + dmesg tail (enumeration check)
# - Optional: if THEIA_HOST is set, ssh to show core/server status + hidg/uvc presence
#
# Env:
# LESAVKA_SERVER_ADDR (default http://38.28.125.112:50051)
# ITER=0 (loop forever) or number of iterations
# SLEEP=10 (seconds between iterations)
# TETHYS_HOST=host (ssh target for target machine; requires key auth)
# THEIA_HOST=host (ssh target for server/gadget Pi)
# SSH_OPTS="-o ConnectTimeout=5" (optional extra ssh flags)
set -euo pipefail
SERVER=${LESAVKA_SERVER_ADDR:-http://38.28.125.112:50051}
# default to a few iterations instead of infinite to avoid unintentional long runs
ITER=${ITER:-5}
SLEEP=${SLEEP:-10}
SSH_OPTS=${SSH_OPTS:-"-o ConnectTimeout=5 -o BatchMode=yes"}
SCRIPT_DIR="$(cd -- "$(dirname "${BASH_SOURCE[0]}")" >/dev/null 2>&1 && pwd)"
PROTO_DIR="${PROTO_DIR:-${SCRIPT_DIR}/../../common/proto}"
hostport=${SERVER#http://}
hostport=${hostport#https://}
host=${hostport%%:*}
port=${hostport##*:}
has_nc() { command -v nc >/dev/null 2>&1; }
has_grpc() { command -v grpcurl >/dev/null 2>&1; }
probe_server() {
echo "==> [local] $(date -Is) probing $SERVER"
if has_nc; then
if nc -zw3 "$host" "$port"; then
echo " tcp: OK (port reachable)"
else
echo " tcp: FAIL (port unreachable)"
fi
else
echo " tcp: skipped (nc not present)"
fi
if has_grpc; then
if [[ -f "${PROTO_DIR}/lesavka.proto" ]]; then
if out=$(grpcurl -plaintext -max-time 5 \
-import-path "${PROTO_DIR}" -proto lesavka.proto \
"$host:$port" lesavka.Handshake/GetCapabilities 2>&1); then
echo " gRPC Handshake (proto): OK → $out"
else
echo " gRPC Handshake (proto): FAIL → $out"
fi
else
if out=$(grpcurl -plaintext -max-time 5 "$host:$port" list 2>&1); then
echo " gRPC list (reflection): $out"
else
echo " gRPC list (reflection) FAIL → $out"
fi
fi
else
echo " gRPC: skipped (grpcurl not present)"
fi
}
probe_tethys() {
[[ -z "${TETHYS_HOST:-}" ]] && return
echo "==> [tethys] $(date -Is) checking lsusb + dmesg tail on $TETHYS_HOST"
ssh $SSH_OPTS "$TETHYS_HOST" '
lsusb;
echo "--- /dev/hidraw* ---";
ls /dev/hidraw* 2>/dev/null || true;
echo "--- dmesg (USB tail) ---";
dmesg | tail -n 20
' || echo " ssh to $TETHYS_HOST failed"
}
probe_theia() {
[[ -z "${THEIA_HOST:-}" ]] && return
echo "==> [theia] $(date -Is) checking services/device nodes on $THEIA_HOST"
ssh $SSH_OPTS "$THEIA_HOST" '
systemctl --no-pager --quiet is-active lesavka-core && echo "lesavka-core: active" || echo "lesavka-core: INACTIVE";
systemctl --no-pager --quiet is-active lesavka-server && echo "lesavka-server: active" || echo "lesavka-server: INACTIVE";
echo "--- hidg nodes ---";
ls -l /dev/hidg0 /dev/hidg1 2>/dev/null || true;
echo "--- video nodes ---";
ls -l /dev/video* 2>/dev/null | head;
echo "--- recent server log ---";
journalctl -u lesavka-server -n 20 --no-pager
' || echo " ssh to $THEIA_HOST failed"
}
count=0
while :; do
probe_server
probe_theia
probe_tethys
count=$((count + 1))
if [[ "$ITER" -gt 0 && "$count" -ge "$ITER" ]]; then
break
fi
echo "==> sleeping ${SLEEP}s (iteration $count complete)"
sleep "$SLEEP"
done
echo "Done."

View File

@ -0,0 +1,56 @@
#!/usr/bin/env bash
# scripts/manual/kde-start-tethys.sh
#
# Start/restart SDDM on tethys and set display geometry over :0.
# Intended for remote use after SSH-ing into tethys.
#
# Env overrides:
# MODE=1920x1080 (preferred mode)
# RATE=60 (refresh rate)
# OUTPUTS="HDMI-1 DP-1" (space-separated outputs to try)
# DISPLAY=:0 (X display; default :0)
# XAUTHORITY=... (override cookie; otherwise auto-detected from SDDM)
set -euo pipefail
MODE=${MODE:-1920x1080}
RATE=${RATE:-60}
OUTPUTS=${OUTPUTS:-"HDMI-1 DP-1"}
DISPLAY=${DISPLAY:-:0}
log() { printf "[kde-start] %s\n" "$*"; }
log "restarting sddm.service"
sudo systemctl restart sddm
sleep 2
# find SDDM Xauthority if not provided
if [[ -z "${XAUTHORITY:-}" ]]; then
XAUTHORITY=$(ls /var/run/sddm/*/xauth_* 2>/dev/null | head -n1 || true)
fi
if [[ -z "${XAUTHORITY:-}" ]]; then
log "warning: no XAUTHORITY found; xrandr may fail"
fi
# wait for X to come up
for attempt in {1..15}; do
if DISPLAY=$DISPLAY XAUTHORITY=${XAUTHORITY:-} xrandr --query >/dev/null 2>&1; then
break
fi
sleep 1
done
log "setting mode ${MODE}@${RATE} on outputs: ${OUTPUTS}"
for out in $OUTPUTS; do
if DISPLAY=$DISPLAY XAUTHORITY=${XAUTHORITY:-} xrandr --output "$out" --mode "$MODE" --rate "$RATE" --primary >/dev/null 2>&1; then
log "set $out to ${MODE}@${RATE}"
else
log "skip $out (xrandr failed)"
fi
done
log "current xrandr:"
DISPLAY=$DISPLAY XAUTHORITY=${XAUTHORITY:-} xrandr --query || true
log "done."

View File

@ -1,10 +1,14 @@
#!/usr/bin/env bash
# scripts/manual/usb-reset.sh
# scripts/manual/usb-reset.sh - trigger USB reset RPC on the server
set -euo pipefail
SCRIPT_DIR="$(cd -- "$(dirname "${BASH_SOURCE[0]}")" >/dev/null 2>&1 && pwd)"
PROTO_DIR="${SCRIPT_DIR}/../../common/proto"
grpcurl \
-plaintext \
-import-path ./../../common/proto \
-import-path "${PROTO_DIR}" \
-proto lesavka.proto \
-d '{}' \
64.25.10.31:50051 \
38.28.125.112:50051 \
lesavka.Relay/ResetUsb

View File

@ -1,19 +1,19 @@
// server/src/audio.rs
#![forbid(unsafe_code)]
use anyhow::{anyhow, Context};
use anyhow::{Context, anyhow};
use chrono::Local;
use futures_util::Stream;
use gstreamer as gst;
use gstreamer_app as gst_app;
use gst::prelude::*;
use gst::ElementFactory;
use gst::MessageView::*;
use gst::prelude::*;
use gstreamer as gst;
use gstreamer_app as gst_app;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;
use tracing::{debug, error, warn};
use std::time::{Instant, Duration, SystemTime, UNIX_EPOCH};
use std::sync::{Arc, Mutex};
use lesavka_common::lesavka::AudioPacket;
@ -21,7 +21,7 @@ use lesavka_common::lesavka::AudioPacket;
/// endpoint) **towards** the client.
pub struct AudioStream {
_pipeline: gst::Pipeline,
inner: ReceiverStream<Result<AudioPacket, Status>>,
inner: ReceiverStream<Result<AudioPacket, Status>>,
}
impl Stream for AudioStream {
@ -58,17 +58,18 @@ pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result<AudioStream> {
*/
let desc = build_pipeline_desc(alsa_dev)?;
let pipeline: gst::Pipeline = gst::parse::launch(&desc)?
.downcast()
.expect("pipeline");
let pipeline: gst::Pipeline = gst::parse::launch(&desc)?.downcast().expect("pipeline");
let sink: gst_app::AppSink = pipeline
.by_name("asink")
.expect("asink")
.downcast()
.expect("appsink");
let tap = Arc::new(Mutex::new(ClipTap::new("🎧 - ear", Duration::from_secs(60))));
let tap = Arc::new(Mutex::new(ClipTap::new(
"🎧 - ear",
Duration::from_secs(60),
)));
// sink.connect("underrun", false, |_| {
// tracing::warn!("⚠️ USB playback underrun host muted or not reading");
// None
@ -80,12 +81,19 @@ pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result<AudioStream> {
std::thread::spawn(move || {
for msg in bus.iter_timed(gst::ClockTime::NONE) {
match msg.view() {
Error(e) => error!("💥 audio pipeline: {} ({})",
e.error(), e.debug().unwrap_or_default()),
Warning(w) => warn!("⚠️ audio pipeline: {} ({})",
w.error(), w.debug().unwrap_or_default()),
StateChanged(s) if s.current() == gst::State::Playing =>
debug!("🎶 audio pipeline PLAYING"),
Error(e) => error!(
"💥 audio pipeline: {} ({})",
e.error(),
e.debug().unwrap_or_default()
),
Warning(w) => warn!(
"⚠️ audio pipeline: {} ({})",
w.error(),
w.debug().unwrap_or_default()
),
StateChanged(s) if s.current() == gst::State::Playing => {
debug!("🎶 audio pipeline PLAYING")
}
_ => {}
}
}
@ -94,52 +102,53 @@ pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result<AudioStream> {
/*──────────── callbacks ────────────*/
sink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample({
let tap = tap.clone();
move |s| {
let sample = s.pull_sample().map_err(|_| gst::FlowError::Eos)?;
let buffer = sample.buffer().ok_or(gst::FlowError::Error)?;
let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?;
.new_sample({
let tap = tap.clone();
move |s| {
let sample = s.pull_sample().map_err(|_| gst::FlowError::Eos)?;
let buffer = sample.buffer().ok_or(gst::FlowError::Error)?;
let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?;
// -------- cliptap (minute dumps) ------------
tap.lock().unwrap().feed(map.as_slice());
// -------- cliptap (minute dumps) ------------
tap.lock().unwrap().feed(map.as_slice());
static CNT: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n < 10 || n % 300 == 0 {
debug!("🎧 ear #{n}: {}bytes", map.len());
}
let pts_us = buffer
.pts()
.unwrap_or(gst::ClockTime::ZERO)
.nseconds() / 1_000;
// push nonblocking; drop oldest on overflow
if tx.try_send(Ok(AudioPacket {
id,
pts: pts_us,
data: map.as_slice().to_vec(),
})).is_err() {
static DROPS: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
let d = DROPS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if d % 300 == 0 {
warn!("🎧💔 dropped {d} audio AUs (client too slow)");
static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n < 10 || n % 300 == 0 {
debug!("🎧 ear #{n}: {}bytes", map.len());
}
let pts_us = buffer.pts().unwrap_or(gst::ClockTime::ZERO).nseconds() / 1_000;
// push nonblocking; drop oldest on overflow
if tx
.try_send(Ok(AudioPacket {
id,
pts: pts_us,
data: map.as_slice().to_vec(),
}))
.is_err()
{
static DROPS: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
let d = DROPS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if d % 300 == 0 {
warn!("🎧💔 dropped {d} audio AUs (client too slow)");
}
}
Ok(gst::FlowSuccess::Ok)
}
Ok(gst::FlowSuccess::Ok)
}
}).build(),
})
.build(),
);
pipeline.set_state(gst::State::Playing)
pipeline
.set_state(gst::State::Playing)
.context("starting audio pipeline")?;
Ok(AudioStream {
_pipeline: pipeline,
inner: ReceiverStream::new(rx),
inner: ReceiverStream::new(rx),
})
}
@ -152,9 +161,7 @@ fn build_pipeline_desc(dev: &str) -> anyhow::Result<String> {
.into_iter()
.find(|&e| {
reg.find_plugin(e).is_some()
|| reg
.find_feature(e, ElementFactory::static_type())
.is_some()
|| reg.find_feature(e, ElementFactory::static_type()).is_some()
})
.ok_or_else(|| anyhow!("no AAC encoder plugin available"))?;
@ -176,8 +183,8 @@ fn build_pipeline_desc(dev: &str) -> anyhow::Result<String> {
// ────────────────────── minuteclip helper ───────────────────────────────
pub struct ClipTap {
buf: Vec<u8>,
tag: &'static str,
buf: Vec<u8>,
tag: &'static str,
next_dump: Instant,
period: Duration,
}
@ -222,8 +229,8 @@ impl Drop for ClipTap {
// ────────────────────── microphone sink ────────────────────────────────
pub struct Voice {
appsrc: gst_app::AppSrc,
_pipe: gst::Pipeline, // keep pipeline alive
tap: ClipTap,
_pipe: gst::Pipeline, // keep pipeline alive
tap: ClipTap,
}
impl Voice {
@ -236,7 +243,7 @@ impl Voice {
let pipeline = gst::Pipeline::new();
// elements
let appsrc = gst::ElementFactory::make("appsrc")
let appsrc = gst::ElementFactory::make("appsrc")
.build()
.context("make appsrc")?
.downcast::<gst_app::AppSrc>()
@ -246,10 +253,10 @@ impl Voice {
appsrc.set_format(gst::Format::Time);
appsrc.set_is_live(true);
let decodebin = gst::ElementFactory::make("decodebin")
let decodebin = gst::ElementFactory::make("decodebin")
.build()
.context("make decodebin")?;
let alsa_sink = gst::ElementFactory::make("alsasink")
let alsa_sink = gst::ElementFactory::make("alsasink")
.build()
.context("make alsasink")?;
@ -259,7 +266,7 @@ impl Voice {
appsrc.link(&decodebin)?;
/*------------ decodebin autolink ----------------*/
let sink_clone = alsa_sink.clone(); // keep original for later
let sink_clone = alsa_sink.clone(); // keep original for later
decodebin.connect_pad_added(move |_db, pad| {
let sink_pad = sink_clone.static_pad("sink").unwrap();
if !sink_pad.is_linked() {

View File

@ -1,7 +1,13 @@
// server/src/gadget.rs
use std::{fs::{self, OpenOptions}, io::Write, path::Path, thread, time::Duration};
use anyhow::{Context, Result};
use tracing::{info, warn, trace};
use std::{
fs::{self, OpenOptions},
io::Write,
path::Path,
thread,
time::Duration,
};
use tracing::{info, trace, warn};
#[derive(Clone)]
pub struct UsbGadget {
@ -46,8 +52,10 @@ impl UsbGadget {
}
thread::sleep(Duration::from_millis(50));
}
Err(anyhow::anyhow!("UDC never reached '{wanted}' (last = {:?})",
fs::read_to_string(&path).unwrap_or_default()))
Err(anyhow::anyhow!(
"UDC never reached '{wanted}' (last = {:?})",
fs::read_to_string(&path).unwrap_or_default()
))
}
pub fn wait_state_any(ctrl: &str, limit_ms: u64) -> anyhow::Result<String> {
@ -61,7 +69,9 @@ impl UsbGadget {
}
std::thread::sleep(std::time::Duration::from_millis(50));
}
Err(anyhow::anyhow!("UDC state did not settle within {limit_ms}ms"))
Err(anyhow::anyhow!(
"UDC state did not settle within {limit_ms}ms"
))
}
/// Write `value` (plus “\n”) into a sysfs attribute
@ -81,14 +91,18 @@ impl UsbGadget {
}
thread::sleep(Duration::from_millis(50));
}
Err(anyhow::anyhow!("⚠️ UDC {ctrl} did not re-appear within {limit_ms}ms"))
Err(anyhow::anyhow!(
"⚠️ UDC {ctrl} did not re-appear within {limit_ms}ms"
))
}
/// Scan platform devices when /sys/class/udc is empty
fn probe_platform_udc() -> Result<Option<String>> {
for entry in fs::read_dir("/sys/bus/platform/devices")? {
let p = entry?.file_name().into_string().unwrap();
if p.ends_with(".usb") { return Ok(Some(p)); }
if p.ends_with(".usb") {
return Ok(Some(p));
}
}
Ok(None)
}
@ -98,26 +112,29 @@ impl UsbGadget {
/// Hard-reset the gadget → identical to a physical cable re-plug
pub fn cycle(&self) -> Result<()> {
/* 0-ensure we *know* the controller even after a previous crash */
let ctrl = Self::find_controller()
.or_else(|_| Self::probe_platform_udc()?
.ok_or_else(|| anyhow::anyhow!("no UDC present")))?;
let ctrl = Self::find_controller().or_else(|_| {
Self::probe_platform_udc()?.ok_or_else(|| anyhow::anyhow!("no UDC present"))
})?;
/* 1 - detach gadget */
info!("🔌 detaching gadget from {ctrl}");
// a) drop pull-ups (if the controller offers the switch)
let sc = format!("/sys/class/udc/{ctrl}/soft_connect");
let _ = Self::write_attr(&sc, "0"); // ignore errors - not all HW has it
let _ = Self::write_attr(&sc, "0"); // ignore errors - not all HW has it
// b) clear the UDC attribute; the kernel may transiently answer EBUSY
for attempt in 1..=10 {
match Self::write_attr(self.udc_file, "") {
Ok(_) => break,
Err(err) if {
// only swallow EBUSY
err.downcast_ref::<std::io::Error>()
.and_then(|io| io.raw_os_error())
== Some(libc::EBUSY) && attempt < 10
} => {
Err(err)
if {
// only swallow EBUSY
err.downcast_ref::<std::io::Error>()
.and_then(|io| io.raw_os_error())
== Some(libc::EBUSY)
&& attempt < 10
} =>
{
trace!("⏳ UDC busy (attempt {attempt}/10) - retrying…");
thread::sleep(Duration::from_millis(100));
}
@ -146,10 +163,10 @@ impl UsbGadget {
Some(libc::EINVAL) | Some(libc::EPERM) | Some(libc::ENOENT) => {
warn!("⚠️ soft_connect unsupported ({io}); continuing");
}
_ => return Err(err), // propagate all other errors
_ => return Err(err), // propagate all other errors
}
} else {
return Err(err); // non-IO errors: propagate
return Err(err); // non-IO errors: propagate
}
}
Ok(_) => { /* success */ }
@ -157,21 +174,20 @@ impl UsbGadget {
}
/* 5 - wait for host (but tolerate sleep) */
Self::wait_state(&ctrl, "configured", 6_000)
.or_else(|e| {
// If the host is physically absent (sleep / KVM paused)
// we allow 'not attached' and continue - we can still
// accept keyboard/mouse data and the host will enumerate
// later without another reset.
let last = fs::read_to_string(format!("/sys/class/udc/{ctrl}/state"))
.unwrap_or_default();
if last.trim() == "not attached" {
warn!("⚠️ host did not enumerate within 6s - continuing (state = {last:?})");
Ok(())
} else {
Err(e)
}
})?;
Self::wait_state(&ctrl, "configured", 6_000).or_else(|e| {
// If the host is physically absent (sleep / KVM paused)
// we allow 'not attached' and continue - we can still
// accept keyboard/mouse data and the host will enumerate
// later without another reset.
let last =
fs::read_to_string(format!("/sys/class/udc/{ctrl}/state")).unwrap_or_default();
if last.trim() == "not attached" {
warn!("⚠️ host did not enumerate within 6s - continuing (state = {last:?})");
Ok(())
} else {
Err(e)
}
})?;
info!("✅ USB-gadget cycle complete");
Ok(())
@ -182,8 +198,10 @@ impl UsbGadget {
let cand = ["dwc2", "dwc3"];
for drv in cand {
let root = format!("/sys/bus/platform/drivers/{drv}");
if !Path::new(&root).exists() { continue }
if !Path::new(&root).exists() {
continue;
}
/*----------- unbind ------------------------------------------------*/
info!("🔧 unbinding UDC driver ({drv})");
for attempt in 1..=20 {
@ -193,34 +211,32 @@ impl UsbGadget {
trace!("unbind in-progress (#{attempt}) - waiting…");
thread::sleep(Duration::from_millis(100));
}
Err(err) => return Err(err)
.context("UDC unbind failed irrecoverably"),
Err(err) => return Err(err).context("UDC unbind failed irrecoverably"),
}
}
thread::sleep(Duration::from_millis(150)); // let the core quiesce
thread::sleep(Duration::from_millis(150)); // let the core quiesce
/*----------- bind --------------------------------------------------*/
info!("🔧 binding UDC driver ({drv})");
for attempt in 1..=20 {
match Self::write_attr(format!("{root}/bind"), ctrl) {
Ok(_) => return Ok(()), // success 🎉
Ok(_) => return Ok(()), // success 🎉
Err(err) if attempt < 20 && Self::is_still_detaching(&err) => {
trace!("bind busy (#{attempt}) - retrying…");
thread::sleep(Duration::from_millis(100));
}
Err(err) => return Err(err)
.context("UDC bind failed irrecoverably"),
Err(err) => return Err(err).context("UDC bind failed irrecoverably"),
}
}
}
Err(anyhow::anyhow!("no dwc2/dwc3 driver nodes found"))
}
fn is_still_detaching(err: &anyhow::Error) -> bool {
err.downcast_ref::<std::io::Error>()
.and_then(|io| io.raw_os_error())
.map_or(false, |code| {
matches!(code, libc::EBUSY | libc::ENOENT | libc::ENODEV)
})
.and_then(|io| io.raw_os_error())
.map_or(false, |code| {
matches!(code, libc::EBUSY | libc::ENOENT | libc::ENODEV)
})
}
}

View File

@ -7,7 +7,7 @@ use lesavka_common::lesavka::{
};
pub struct HandshakeSvc {
pub camera: bool,
pub camera: bool,
pub microphone: bool,
}
@ -18,7 +18,7 @@ impl Handshake for HandshakeSvc {
_req: Request<Empty>,
) -> Result<Response<HandshakeSet>, Status> {
Ok(Response::new(HandshakeSet {
camera: self.camera,
camera: self.camera,
microphone: self.microphone,
}))
}
@ -26,6 +26,9 @@ impl Handshake for HandshakeSvc {
impl HandshakeSvc {
pub fn server() -> HandshakeServer<Self> {
HandshakeServer::new(Self { camera: true, microphone: true })
HandshakeServer::new(Self {
camera: true,
microphone: true,
})
}
}

View File

@ -1,6 +1,6 @@
// server/src/lib.rs
pub mod audio;
pub mod video;
pub mod gadget;
pub mod handshake;
pub mod video;

View File

@ -2,33 +2,27 @@
// server/src/main.rs
#![forbid(unsafe_code)]
use std::{panic, backtrace::Backtrace, pin::Pin, sync::Arc};
use std::sync::atomic::AtomicBool;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use anyhow::Context as _;
use futures_util::{Stream, StreamExt};
use tokio::{
fs::{OpenOptions},
io::AsyncWriteExt,
sync::Mutex,
};
use gstreamer as gst;
use std::sync::atomic::AtomicBool;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{backtrace::Backtrace, panic, pin::Pin, sync::Arc};
use tokio::{fs::OpenOptions, io::AsyncWriteExt, sync::Mutex};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};
use tonic::transport::Server;
use tonic_reflection::server::{Builder as ReflBuilder};
use tracing::{info, warn, error, trace, debug};
use tracing_subscriber::{filter::EnvFilter, fmt, prelude::*};
use tonic::{Request, Response, Status};
use tonic_reflection::server::Builder as ReflBuilder;
use tracing::{debug, error, info, trace, warn};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{filter::EnvFilter, fmt, prelude::*};
use lesavka_common::lesavka::{
Empty, ResetUsbReply,
AudioPacket, Empty, KeyboardReport, MonitorRequest, MouseReport, ResetUsbReply, VideoPacket,
relay_server::{Relay, RelayServer},
KeyboardReport, MouseReport,
MonitorRequest, VideoPacket, AudioPacket
};
use lesavka_server::{gadget::UsbGadget, video, audio, handshake::HandshakeSvc};
use lesavka_server::{audio, gadget::UsbGadget, handshake::HandshakeSvc, video};
/*──────────────── constants ────────────────*/
/// **false** = never reset automatically.
@ -39,22 +33,19 @@ const PKG_NAME: &str = env!("CARGO_PKG_NAME");
/*──────────────── logging ───────────────────*/
fn init_tracing() -> anyhow::Result<WorkerGuard> {
let file = std::fs::OpenOptions::new()
.create(true).truncate(true).write(true)
.create(true)
.truncate(true)
.write(true)
.open("/tmp/lesavka-server.log")?;
let (file_writer, guard) = tracing_appender::non_blocking(file);
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
EnvFilter::new("lesavka_server=info,lesavka_server::video=warn")
});
let env_filter = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new("lesavka_server=info,lesavka_server::video=warn"));
let filter_str = env_filter.to_string();
tracing_subscriber::registry()
.with(env_filter)
.with(
fmt::layer()
.with_target(true)
.with_thread_ids(true),
)
.with(fmt::layer().with_target(true).with_thread_ids(true))
.with(
fmt::layer()
.with_writer(file_writer)
@ -69,9 +60,13 @@ fn init_tracing() -> anyhow::Result<WorkerGuard> {
/*──────────────── helpers ───────────────────*/
async fn open_with_retry(path: &str) -> anyhow::Result<tokio::fs::File> {
for attempt in 1..=200 { // ≈10s
for attempt in 1..=200 {
// ≈10s
match OpenOptions::new()
.write(true).custom_flags(libc::O_NONBLOCK).open(path).await
.write(true)
.custom_flags(libc::O_NONBLOCK)
.open(path)
.await
{
Ok(f) => {
info!("✅ {path} opened on attempt #{attempt}");
@ -88,13 +83,43 @@ async fn open_with_retry(path: &str) -> anyhow::Result<tokio::fs::File> {
}
fn next_minute() -> SystemTime {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH).unwrap();
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
let secs = now.as_secs();
let next = (secs / 60 + 1) * 60;
UNIX_EPOCH + Duration::from_secs(next)
}
/// Pick the UVC gadget video node.
/// Priority: 1) `LESAVKA_UVC_DEV` override; 2) first `video_output` node.
/// Returns an error when nothing matches instead of guessing a capture card.
fn pick_uvc_device() -> anyhow::Result<String> {
if let Ok(path) = std::env::var("LESAVKA_UVC_DEV") {
return Ok(path);
}
// walk /dev/video* via udev and look for an outputcapable node (gadget exposes one)
if let Ok(mut en) = udev::Enumerator::new() {
let _ = en.match_subsystem("video4linux");
if let Ok(devs) = en.scan_devices() {
for dev in devs {
let caps = dev
.property_value("ID_V4L_CAPABILITIES")
.and_then(|v| v.to_str())
.unwrap_or_default();
if caps.contains(":video_output:") {
if let Some(node) = dev.devnode() {
return Ok(node.to_string_lossy().into_owned());
}
}
}
}
}
Err(anyhow::anyhow!(
"no video_output v4l2 node found; set LESAVKA_UVC_DEV"
))
}
/*──────────────── Handler ───────────────────*/
struct Handler {
kb: Arc<Mutex<tokio::fs::File>>,
@ -107,7 +132,7 @@ impl Handler {
async fn new(gadget: UsbGadget) -> anyhow::Result<Self> {
if AUTO_CYCLE {
info!("🛠️ Initial USB reset…");
let _ = gadget.cycle(); // ignore failure - may boot without host
let _ = gadget.cycle(); // ignore failure - may boot without host
} else {
info!("🛠️ AUTO_CYCLE disabled - no initial reset");
}
@ -138,10 +163,10 @@ impl Handler {
#[tonic::async_trait]
impl Relay for Handler {
/* existing streams ─ unchanged, except: no more auto-reset */
type StreamKeyboardStream = ReceiverStream<Result<KeyboardReport, Status>>;
type StreamMouseStream = ReceiverStream<Result<MouseReport, Status>>;
type CaptureVideoStream = Pin<Box<dyn Stream<Item=Result<VideoPacket,Status>> + Send>>;
type CaptureAudioStream = Pin<Box<dyn Stream<Item=Result<AudioPacket,Status>> + Send>>;
type StreamKeyboardStream = ReceiverStream<Result<KeyboardReport, Status>>;
type StreamMouseStream = ReceiverStream<Result<MouseReport, Status>>;
type CaptureVideoStream = Pin<Box<dyn Stream<Item = Result<VideoPacket, Status>> + Send>>;
type CaptureAudioStream = Pin<Box<dyn Stream<Item = Result<AudioPacket, Status>> + Send>>;
type StreamMicrophoneStream = ReceiverStream<Result<Empty, Status>>;
type StreamCameraStream = ReceiverStream<Result<Empty, Status>>;
@ -191,9 +216,11 @@ impl Relay for Handler {
&self,
req: Request<tonic::Streaming<AudioPacket>>,
) -> Result<Response<Self::StreamMicrophoneStream>, Status> {
// 1 ─ build once, early
let mut sink = audio::Voice::new("hw:UAC2Gadget,0").await
let uac_dev = std::env::var("LESAVKA_UAC_DEV").unwrap_or_else(|_| "hw:UAC2Gadget,0".into());
info!(%uac_dev, "🎤 stream_microphone using UAC sink");
let mut sink = audio::Voice::new(&uac_dev)
.await
.map_err(|e| Status::internal(format!("{e:#}")))?;
// 2 ─ dummy outbound stream (same trick as before)
@ -202,8 +229,7 @@ impl Relay for Handler {
// 3 ─ drive the sink in a background task
tokio::spawn(async move {
let mut inbound = req.into_inner();
static CNT: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
while let Some(pkt) = inbound.next().await.transpose()? {
let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
@ -212,7 +238,7 @@ impl Relay for Handler {
}
sink.push(&pkt);
}
sink.finish(); // flush on EOS
sink.finish(); // flush on EOS
let _ = tx.send(Ok(Empty {})).await;
Ok::<(), Status>(())
});
@ -225,25 +251,25 @@ impl Relay for Handler {
req: Request<tonic::Streaming<VideoPacket>>,
) -> Result<Response<Self::StreamCameraStream>, Status> {
// map gRPC camera id → UVC device
let uvc = std::env::var("LESAVKA_UVC_DEV")
.unwrap_or_else(|_| "/dev/video4".into());
let uvc = pick_uvc_device().map_err(|e| Status::internal(format!("{e:#}")))?;
info!(%uvc, "🎥 stream_camera using UVC sink");
// build once
let relay = video::CameraRelay::new(0, &uvc)
.map_err(|e| Status::internal(format!("{e:#}")))?;
let relay =
video::CameraRelay::new(0, &uvc).map_err(|e| Status::internal(format!("{e:#}")))?;
// dummy outbound (same pattern as other streams)
let (tx, rx) = tokio::sync::mpsc::channel(1);
tokio::spawn(async move {
let mut s = req.into_inner();
while let Some(pkt) = s.next().await.transpose()? {
relay.feed(pkt); // ← all logging inside video.rs
relay.feed(pkt); // ← all logging inside video.rs
}
tx.send(Ok(Empty {})).await.ok();
Ok::<(), Status>(())
});
Ok(Response::new(ReceiverStream::new(rx)))
}
@ -269,10 +295,9 @@ impl Relay for Handler {
req: Request<MonitorRequest>,
) -> Result<Response<Self::CaptureAudioStream>, Status> {
// Only one speaker stream for now; both 0/1 → same ALSA dev.
let _id = req.into_inner().id;
let _id = req.into_inner().id;
// Allow override (`LESAVKA_ALSA_DEV=hw:2,0` for debugging).
let dev = std::env::var("LESAVKA_ALSA_DEV")
.unwrap_or_else(|_| "hw:UAC2Gadget,0".into());
let dev = std::env::var("LESAVKA_ALSA_DEV").unwrap_or_else(|_| "hw:UAC2Gadget,0".into());
let s = audio::ear(&dev, 0)
.await
@ -282,10 +307,7 @@ impl Relay for Handler {
}
/*────────────── USB-reset RPC ────────────*/
async fn reset_usb(
&self,
_req: Request<Empty>,
) -> Result<Response<ResetUsbReply>, Status> {
async fn reset_usb(&self, _req: Request<Empty>) -> Result<Response<ResetUsbReply>, Status> {
info!("🔴 explicit ResetUsb() called");
match self.gadget.cycle() {
Ok(_) => {
@ -314,17 +336,17 @@ async fn main() -> anyhow::Result<()> {
error!("💥 panic: {p}\n{bt}");
}));
let gadget = UsbGadget::new("lesavka");
let gadget = UsbGadget::new("lesavka");
let handler = Handler::new(gadget.clone()).await?;
info!("🌐 lesavka-server listening on 0.0.0.0:50051");
Server::builder()
.tcp_nodelay(true)
.max_frame_size(Some(2*1024*1024))
.max_frame_size(Some(2 * 1024 * 1024))
.add_service(RelayServer::new(handler))
.add_service(HandshakeSvc::server())
.add_service(ReflBuilder::configure().build_v1().unwrap())
.serve(([0,0,0,0], 50051).into())
.serve(([0, 0, 0, 0], 50051).into())
.await?;
Ok(())
}

View File

@ -1,19 +1,52 @@
// server/src/video.rs
use anyhow::Context;
use futures_util::Stream;
use gst::prelude::*;
use gst::MessageView::*;
use gst::MessageView;
use gstreamer as gst;
use gstreamer_app as gst_app;
use gst::prelude::*;
use gst::{log, MessageView};
use gst::MessageView::*;
use lesavka_common::lesavka::VideoPacket;
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;
use tracing::{debug, warn, error, info, enabled, trace, Level};
use futures_util::Stream;
use tracing::{Level, debug, enabled, error, info, trace, warn};
const EYE_ID: [&str; 2] = ["l", "r"];
static START: std::sync::OnceLock<gst::ClockTime> = std::sync::OnceLock::new();
static DEV_MODE: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
fn dev_mode_enabled() -> bool {
*DEV_MODE
.get_or_init(|| std::env::var("LESAVKA_DEV_MODE").is_ok())
}
fn contains_idr(h264: &[u8]) -> bool {
// naive AnnexB scan for H.264 IDR (NAL type 5)
let mut i = 0;
while i + 4 < h264.len() {
// find start code 0x000001 or 0x00000001
if h264[i] == 0 && h264[i + 1] == 0 {
let offset = if h264[i + 2] == 1 {
3
} else if h264[i + 2] == 0 && h264[i + 3] == 1 {
4
} else {
i += 1;
continue;
};
let nal_idx = i + offset;
if nal_idx < h264.len() {
let nal = h264[nal_idx] & 0x1F;
if nal == 5 {
return true;
}
}
}
i += 1;
}
false
}
pub struct VideoStream {
_pipeline: gst::Pipeline,
@ -37,11 +70,7 @@ impl Drop for VideoStream {
}
}
pub async fn eye_ball(
dev: &str,
id: u32,
_max_bitrate_kbit: u32,
) -> anyhow::Result<VideoStream> {
pub async fn eye_ball(dev: &str, id: u32, _max_bitrate_kbit: u32) -> anyhow::Result<VideoStream> {
let eye = EYE_ID[id as usize];
gst::init().context("gst init")?;
@ -79,8 +108,10 @@ pub async fn eye_ball(
/* ----- BUS WATCH: show errors & warnings immediately --------------- */
let bus = pipeline.bus().expect("bus");
if let Some(src_pad) = pipeline.by_name(&format!("cam_{eye}"))
.and_then(|e| e.static_pad("src")) {
if let Some(src_pad) = pipeline
.by_name(&format!("cam_{eye}"))
.and_then(|e| e.static_pad("src"))
{
src_pad.add_probe(gst::PadProbeType::EVENT_DOWNSTREAM, |pad, info| {
if let Some(gst::PadProbeData::Event(ref ev)) = info.data {
if let gst::EventView::Caps(c) = ev.view() {
@ -139,10 +170,9 @@ pub async fn eye_ball(
let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?;
/* -------- basic counters ------ */
static FRAME: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
static FRAME: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
let n = FRAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n % 120 == 0 {
if n % 120 == 0 && contains_idr(map.as_slice()) {
trace!(target: "lesavka_server::video", "eye-{eye}: delivered {n} frames");
if enabled!(Level::TRACE) {
let path = format!("/tmp/eye-{eye}-srv-{:05}.h264", n);
@ -157,7 +187,9 @@ pub async fn eye_ball(
/* -------- detect SPS / IDR ---- */
if enabled!(Level::DEBUG) {
if let Some(&nal) = map.as_slice().get(4) {
if (nal & 0x1F) == 0x05 /* IDR */ {
if (nal & 0x1F) == 0x05
/* IDR */
{
debug!("eye-{eye}: IDR");
}
}
@ -175,7 +207,11 @@ pub async fn eye_ball(
/* -------- ship over gRPC ----- */
let data = map.as_slice().to_vec();
let size = data.len();
let pkt = VideoPacket { id, pts: pts_us, data };
let pkt = VideoPacket {
id,
pts: pts_us,
data,
};
match tx.try_send(Ok(pkt)) {
Ok(_) => {
trace!(target:"lesavka_server::video",
@ -202,22 +238,31 @@ pub async fn eye_ball(
.build(),
);
pipeline.set_state(gst::State::Playing).context("🎥 starting video pipeline eye-{eye}")?;
pipeline
.set_state(gst::State::Playing)
.context("🎥 starting video pipeline eye-{eye}")?;
let bus = pipeline.bus().unwrap();
loop {
match bus.timed_pop(gst::ClockTime::NONE) {
Some(msg) if matches!(msg.view(), MessageView::StateChanged(s)
if s.current() == gst::State::Playing) => break,
Some(msg)
if matches!(msg.view(), MessageView::StateChanged(s)
if s.current() == gst::State::Playing) =>
{
break;
}
Some(_) => continue,
None => continue,
}
}
Ok(VideoStream { _pipeline: pipeline, inner: ReceiverStream::new(rx) })
Ok(VideoStream {
_pipeline: pipeline,
inner: ReceiverStream::new(rx),
})
}
pub struct WebcamSink {
appsrc: gst_app::AppSrc,
_pipe: gst::Pipeline,
_pipe: gst::Pipeline,
}
impl WebcamSink {
@ -225,39 +270,85 @@ impl WebcamSink {
gst::init()?;
let pipeline = gst::Pipeline::new();
let caps_h264 = gst::Caps::builder("video/x-h264")
.field("stream-format", "byte-stream")
.field("alignment", "au")
.build();
let raw_caps = gst::Caps::builder("video/x-raw")
.field("format", "YUY2")
.field("width", 1280i32)
.field("height", 720i32)
.field("framerate", gst::Fraction::new(30, 1))
.build();
let src = gst::ElementFactory::make("appsrc")
.build()?
.downcast::<gst_app::AppSrc>()
.expect("appsrc");
src.set_is_live(true);
src.set_format(gst::Format::Time);
src.set_caps(Some(&caps_h264));
src.set_property("block", &true);
let h264parse = gst::ElementFactory::make("h264parse").build()?;
let decoder = gst::ElementFactory::make("v4l2h264dec").build()?;
let convert = gst::ElementFactory::make("videoconvert").build()?;
let sink = gst::ElementFactory::make("v4l2sink")
.property("device", &uvc_dev)
.property("sync", &false)
.build()?;
let decoder_name = Self::pick_decoder();
let decoder = gst::ElementFactory::make(decoder_name)
.build()
.with_context(|| format!("building decoder element {decoder_name}"))?;
let convert = gst::ElementFactory::make("videoconvert").build()?;
let caps = gst::ElementFactory::make("capsfilter")
.property("caps", &raw_caps)
.build()?;
let sink = gst::ElementFactory::make("v4l2sink")
.property("device", &uvc_dev)
.property("sync", &false)
.build()?;
// Upcast to &gst::Element for the collection macros
pipeline.add_many(&[
src.upcast_ref(), &h264parse, &decoder, &convert, &sink
src.upcast_ref(),
&h264parse,
&decoder,
&convert,
&caps,
&sink,
])?;
gst::Element::link_many(&[
src.upcast_ref(), &h264parse, &decoder, &convert, &sink
src.upcast_ref(),
&h264parse,
&decoder,
&convert,
&caps,
&sink,
])?;
pipeline.set_state(gst::State::Playing)?;
Ok(Self { appsrc: src, _pipe: pipeline })
Ok(Self {
appsrc: src,
_pipe: pipeline,
})
}
pub fn push(&self, pkt: VideoPacket) {
let mut buf = gst::Buffer::from_slice(pkt.data);
buf.get_mut().unwrap()
.set_pts(Some(gst::ClockTime::from_useconds(pkt.pts)));
buf.get_mut()
.unwrap()
.set_pts(Some(gst::ClockTime::from_useconds(pkt.pts)));
let _ = self.appsrc.push_buffer(buf);
}
fn pick_decoder() -> &'static str {
if gst::ElementFactory::find("v4l2h264dec").is_some() {
"v4l2h264dec"
} else if gst::ElementFactory::find("v4l2slh264dec").is_some() {
"v4l2slh264dec"
} else if gst::ElementFactory::find("omxh264dec").is_some() {
"omxh264dec"
} else {
"avdec_h264"
}
}
}
/*─────────────────────────────────*/
@ -265,15 +356,15 @@ impl WebcamSink {
/*─────────────────────────────────*/
pub struct CameraRelay {
sink: WebcamSink, // the v4l2sink pipeline (or stub)
id: u32, // gRPC “id” (for future multicam)
sink: WebcamSink, // the v4l2sink pipeline (or stub)
id: u32, // gRPC “id” (for future multicam)
frames: std::sync::atomic::AtomicU64,
}
impl CameraRelay {
pub fn new(id: u32, uvc_dev: &str) -> anyhow::Result<Self> {
Ok(Self {
sink: WebcamSink::new(uvc_dev)?,
sink: WebcamSink::new(uvc_dev)?,
id,
frames: std::sync::atomic::AtomicU64::new(0),
})
@ -281,7 +372,9 @@ impl CameraRelay {
/// Push one VideoPacket coming from the client
pub fn feed(&self, pkt: VideoPacket) {
let n = self.frames.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let n = self
.frames
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n < 10 || n % 60 == 0 {
tracing::debug!(target:"lesavka_server::video",
cam_id = self.id,
@ -296,14 +389,15 @@ impl CameraRelay {
"📸📥 srv pkt");
}
if cfg!(debug_assertions) || tracing::enabled!(tracing::Level::TRACE) {
if n % 120 == 0 {
let path = format!("/tmp/eye3-cli-{n:05}.h264");
if let Err(e) = std::fs::write(&path, &pkt.data) {
tracing::warn!("📸💾 dump failed: {e}");
} else {
tracing::debug!("📸💾 wrote {}", path);
}
if dev_mode_enabled()
&& (cfg!(debug_assertions) || tracing::enabled!(tracing::Level::TRACE))
&& contains_idr(&pkt.data)
{
let path = format!("/tmp/eye3-cli-{n:05}.h264");
if let Err(e) = std::fs::write(&path, &pkt.data) {
tracing::warn!("📸💾 dump failed: {e}");
} else {
tracing::debug!("📸💾 wrote {}", path);
}
}

View File

@ -1,14 +1,24 @@
#[tokio::test]
async fn hid_roundtrip() {
use lesavka_common::lesavka::*;
use lesavka_server::RelaySvc; // export the struct in lib.rs
use lesavka_server::RelaySvc; // export the struct in lib.rs
let svc = RelaySvc::default();
let (mut cli, srv) = tonic::transport::Channel::balance_channel(1);
tokio::spawn(tonic::transport::server::Server::builder()
.add_service(relay_server::RelayServer::new(svc))
.serve_with_incoming(srv));
tokio::spawn(
tonic::transport::server::Server::builder()
.add_service(relay_server::RelayServer::new(svc))
.serve_with_incoming(srv),
);
let (mut tx, mut rx) = relay_client::RelayClient::new(cli).stream().await.unwrap().into_inner();
tx.send(HidReport { data: vec![0,0,4,0,0,0,0,0] }).await.unwrap();
assert!(rx.message().await.unwrap().is_none()); // nothing echoed yet
let (mut tx, mut rx) = relay_client::RelayClient::new(cli)
.stream()
.await
.unwrap()
.into_inner();
tx.send(HidReport {
data: vec![0, 0, 4, 0, 0, 0, 0, 0],
})
.await
.unwrap();
assert!(rx.message().await.unwrap().is_none()); // nothing echoed yet
}