From d2677afc465da540fcee5abd5b9b75fb0f2ce2bb Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Sun, 30 Nov 2025 16:16:03 -0300 Subject: [PATCH] install updates - relay service update --- .gitignore | 1 + scripts/install/client.sh | 21 ++++-- scripts/install/server.sh | 25 ++++++- server/src/audio.rs | 137 ++++++++++++++++++++------------------ server/src/gadget.rs | 110 +++++++++++++++++------------- server/src/handshake.rs | 9 ++- server/src/lib.rs | 2 +- server/src/main.rs | 104 +++++++++++++---------------- server/src/video.rs | 132 ++++++++++++++++++++++++++---------- server/tests/hid.rs | 24 +++++-- 10 files changed, 341 insertions(+), 224 deletions(-) diff --git a/.gitignore b/.gitignore index 45e8077..2852ef9 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ override.toml **/*~ *.swp *.swo +*.md diff --git a/scripts/install/client.sh b/scripts/install/client.sh index ceed952..8fb8de7 100755 --- a/scripts/install/client.sh +++ b/scripts/install/client.sh @@ -5,8 +5,19 @@ set -euo pipefail ORIG_USER=${SUDO_USER:-$(id -un)} # 1. packages (Arch) -sudo pacman -Syq --needed --noconfirm git rustup protobuf gcc evtest gstreamer gst-plugins-base gst-plugins-good gst-plugins-bad gst-plugins-ugly gst-libav pipewire pipewire-pulse -yay -S --noconfirm grpcurl-bin +sudo pacman -Syq --needed --noconfirm \ + git rustup protobuf gcc evtest base-devel \ + gstreamer gst-plugins-base gst-plugins-good gst-plugins-bad gst-plugins-ugly gst-libav \ + pipewire pipewire-pulse + +# 1b. yay for AUR bits (run as the invoking user, never root) +if ! command -v yay >/dev/null 2>&1; then + sudo -u "$ORIG_USER" bash -c 'cd /tmp && git clone --depth 1 https://aur.archlinux.org/yay.git && + cd yay && makepkg -si --noconfirm' +fi +sudo -u "$ORIG_USER" yay -S --needed --noconfirm grpcurl-bin + +# 1c. input access sudo usermod -aG input "$ORIG_USER" # 2. Rust tool-chain for both root & user @@ -14,7 +25,9 @@ sudo rustup default stable sudo -u "$ORIG_USER" rustup default stable # 3. clone / update into a user-writable dir -SRC="$HOME/.local/src/lesavka" +USER_HOME=$(getent passwd "$ORIG_USER" | cut -d: -f6) +SRC="$USER_HOME/.local/src/lesavka" +sudo -u "$ORIG_USER" mkdir -p "$(dirname "$SRC")" if [[ -d $SRC/.git ]]; then sudo -u "$ORIG_USER" git -C "$SRC" pull --ff-only else @@ -41,7 +54,7 @@ Group=root Environment=RUST_LOG=debug Environment=LESAVKA_DEV_MODE=1 -Environment=LESAVKA_SERVER_ADDR=http://64.25.10.31:50051 +Environment=LESAVKA_SERVER_ADDR=http://38.28.125.112:50051 ExecStart=/usr/local/bin/lesavka-client Restart=no diff --git a/scripts/install/server.sh b/scripts/install/server.sh index b1322e9..faf3bd2 100755 --- a/scripts/install/server.sh +++ b/scripts/install/server.sh @@ -24,6 +24,7 @@ sudo pacman -Syq --needed --noconfirm git \ pipewire-pulse \ tailscale \ base-devel \ + v4l-utils \ gstreamer \ gst-plugins-base \ gst-plugins-base-libs \ @@ -48,12 +49,30 @@ options uvcvideo quirks=0x200 timeout=10000 EOF echo "==> 2b. Predictable /dev names for each capture card" -# probe all v4l2 devices, keep only the two GC311 capture cards +# ensure relay (GPIO power) is on if present +if systemctl list-unit-files | grep -q '^relay.service'; then + sudo systemctl enable --now relay.service + sleep 2 +fi + +# probe v4l2 devices for GC311s (07ca:3311) mapfile -t GC_VIDEOS < <( - sudo v4l2-ctl --list-devices | + sudo v4l2-ctl --list-devices 2>/dev/null | awk '/Live Gamer MINI/{getline; print $1}' ) +# fallback via udev if v4l2-ctl output is empty/partial +if [ "${#GC_VIDEOS[@]}" -ne 2 ]; then + mapfile -t GC_VIDEOS < <( + for dev in /dev/video*; do + props=$(sudo udevadm info -q property -n "$dev" 2>/dev/null || true) + if echo "$props" | grep -q 'ID_VENDOR_ID=07ca' && echo "$props" | grep -q 'ID_MODEL_ID=3311'; then + echo "$dev" + fi + done | sort -u + ) +fi + if [ "${#GC_VIDEOS[@]}" -ne 2 ]; then echo "❌ Exactly two GC311 capture cards (index0) must be attached!" >&2 printf ' Detected: %s\n' "${GC_VIDEOS[@]}" @@ -89,7 +108,7 @@ sudo -u "$ORIG_USER" rustup default stable echo "==> 4a. Source checkout" SRC_DIR=/var/src/lesavka -REPO_URL=ssh://git@scm.bstein.dev:2242/brad_stein/lesavka.git +REPO_URL=ssh://git@scm.bstein.dev:2242/bstein/lesavka.git if [[ ! -d $SRC_DIR ]]; then sudo mkdir -p /var/src sudo chown "$ORIG_USER":"$ORIG_USER" /var/src diff --git a/server/src/audio.rs b/server/src/audio.rs index 08a4328..a03540a 100644 --- a/server/src/audio.rs +++ b/server/src/audio.rs @@ -1,19 +1,19 @@ // server/src/audio.rs #![forbid(unsafe_code)] -use anyhow::{anyhow, Context}; +use anyhow::{Context, anyhow}; use chrono::Local; use futures_util::Stream; -use gstreamer as gst; -use gstreamer_app as gst_app; -use gst::prelude::*; use gst::ElementFactory; use gst::MessageView::*; +use gst::prelude::*; +use gstreamer as gst; +use gstreamer_app as gst_app; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use tokio_stream::wrappers::ReceiverStream; use tonic::Status; use tracing::{debug, error, warn}; -use std::time::{Instant, Duration, SystemTime, UNIX_EPOCH}; -use std::sync::{Arc, Mutex}; use lesavka_common::lesavka::AudioPacket; @@ -21,7 +21,7 @@ use lesavka_common::lesavka::AudioPacket; /// endpoint) **towards** the client. pub struct AudioStream { _pipeline: gst::Pipeline, - inner: ReceiverStream>, + inner: ReceiverStream>, } impl Stream for AudioStream { @@ -58,17 +58,18 @@ pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result { */ let desc = build_pipeline_desc(alsa_dev)?; - let pipeline: gst::Pipeline = gst::parse::launch(&desc)? - .downcast() - .expect("pipeline"); + let pipeline: gst::Pipeline = gst::parse::launch(&desc)?.downcast().expect("pipeline"); let sink: gst_app::AppSink = pipeline .by_name("asink") .expect("asink") .downcast() .expect("appsink"); - - let tap = Arc::new(Mutex::new(ClipTap::new("🎧 - ear", Duration::from_secs(60)))); + + let tap = Arc::new(Mutex::new(ClipTap::new( + "🎧 - ear", + Duration::from_secs(60), + ))); // sink.connect("underrun", false, |_| { // tracing::warn!("⚠️ USB playback underrun – host muted or not reading"); // None @@ -80,12 +81,19 @@ pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result { std::thread::spawn(move || { for msg in bus.iter_timed(gst::ClockTime::NONE) { match msg.view() { - Error(e) => error!("πŸ’₯ audio pipeline: {} ({})", - e.error(), e.debug().unwrap_or_default()), - Warning(w) => warn!("⚠️ audio pipeline: {} ({})", - w.error(), w.debug().unwrap_or_default()), - StateChanged(s) if s.current() == gst::State::Playing => - debug!("🎢 audio pipeline PLAYING"), + Error(e) => error!( + "πŸ’₯ audio pipeline: {} ({})", + e.error(), + e.debug().unwrap_or_default() + ), + Warning(w) => warn!( + "⚠️ audio pipeline: {} ({})", + w.error(), + w.debug().unwrap_or_default() + ), + StateChanged(s) if s.current() == gst::State::Playing => { + debug!("🎢 audio pipeline PLAYING") + } _ => {} } } @@ -94,52 +102,53 @@ pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result { /*──────────── callbacks ────────────*/ sink.set_callbacks( gst_app::AppSinkCallbacks::builder() - .new_sample({ - let tap = tap.clone(); - move |s| { - let sample = s.pull_sample().map_err(|_| gst::FlowError::Eos)?; - let buffer = sample.buffer().ok_or(gst::FlowError::Error)?; - let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?; + .new_sample({ + let tap = tap.clone(); + move |s| { + let sample = s.pull_sample().map_err(|_| gst::FlowError::Eos)?; + let buffer = sample.buffer().ok_or(gst::FlowError::Error)?; + let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?; - // -------- clip‑tap (minute dumps) ------------ - tap.lock().unwrap().feed(map.as_slice()); + // -------- clip‑tap (minute dumps) ------------ + tap.lock().unwrap().feed(map.as_slice()); - static CNT: std::sync::atomic::AtomicU64 = - std::sync::atomic::AtomicU64::new(0); - let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - if n < 10 || n % 300 == 0 { - debug!("🎧 ear #{n}: {}β€―bytes", map.len()); - } - - let pts_us = buffer - .pts() - .unwrap_or(gst::ClockTime::ZERO) - .nseconds() / 1_000; - - // push non‑blocking; drop oldest on overflow - if tx.try_send(Ok(AudioPacket { - id, - pts: pts_us, - data: map.as_slice().to_vec(), - })).is_err() { - static DROPS: std::sync::atomic::AtomicU64 = - std::sync::atomic::AtomicU64::new(0); - let d = DROPS.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - if d % 300 == 0 { - warn!("πŸŽ§πŸ’” dropped {d} audio AUs (client too slow)"); + static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); + let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + if n < 10 || n % 300 == 0 { + debug!("🎧 ear #{n}: {}β€―bytes", map.len()); } + + let pts_us = buffer.pts().unwrap_or(gst::ClockTime::ZERO).nseconds() / 1_000; + + // push non‑blocking; drop oldest on overflow + if tx + .try_send(Ok(AudioPacket { + id, + pts: pts_us, + data: map.as_slice().to_vec(), + })) + .is_err() + { + static DROPS: std::sync::atomic::AtomicU64 = + std::sync::atomic::AtomicU64::new(0); + let d = DROPS.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + if d % 300 == 0 { + warn!("πŸŽ§πŸ’” dropped {d} audio AUs (client too slow)"); + } + } + Ok(gst::FlowSuccess::Ok) } - Ok(gst::FlowSuccess::Ok) - } - }).build(), + }) + .build(), ); - pipeline.set_state(gst::State::Playing) + pipeline + .set_state(gst::State::Playing) .context("starting audio pipeline")?; Ok(AudioStream { _pipeline: pipeline, - inner: ReceiverStream::new(rx), + inner: ReceiverStream::new(rx), }) } @@ -152,9 +161,7 @@ fn build_pipeline_desc(dev: &str) -> anyhow::Result { .into_iter() .find(|&e| { reg.find_plugin(e).is_some() - || reg - .find_feature(e, ElementFactory::static_type()) - .is_some() + || reg.find_feature(e, ElementFactory::static_type()).is_some() }) .ok_or_else(|| anyhow!("no AAC encoder plugin available"))?; @@ -176,8 +183,8 @@ fn build_pipeline_desc(dev: &str) -> anyhow::Result { // ────────────────────── minute‑clip helper ─────────────────────────────── pub struct ClipTap { - buf: Vec, - tag: &'static str, + buf: Vec, + tag: &'static str, next_dump: Instant, period: Duration, } @@ -222,8 +229,8 @@ impl Drop for ClipTap { // ────────────────────── microphone sink ──────────────────────────────── pub struct Voice { appsrc: gst_app::AppSrc, - _pipe: gst::Pipeline, // keep pipeline alive - tap: ClipTap, + _pipe: gst::Pipeline, // keep pipeline alive + tap: ClipTap, } impl Voice { @@ -236,7 +243,7 @@ impl Voice { let pipeline = gst::Pipeline::new(); // elements - let appsrc = gst::ElementFactory::make("appsrc") + let appsrc = gst::ElementFactory::make("appsrc") .build() .context("make appsrc")? .downcast::() @@ -246,10 +253,10 @@ impl Voice { appsrc.set_format(gst::Format::Time); appsrc.set_is_live(true); - let decodebin = gst::ElementFactory::make("decodebin") + let decodebin = gst::ElementFactory::make("decodebin") .build() .context("make decodebin")?; - let alsa_sink = gst::ElementFactory::make("alsasink") + let alsa_sink = gst::ElementFactory::make("alsasink") .build() .context("make alsasink")?; @@ -259,7 +266,7 @@ impl Voice { appsrc.link(&decodebin)?; /*------------ decodebin autolink ----------------*/ - let sink_clone = alsa_sink.clone(); // keep original for later + let sink_clone = alsa_sink.clone(); // keep original for later decodebin.connect_pad_added(move |_db, pad| { let sink_pad = sink_clone.static_pad("sink").unwrap(); if !sink_pad.is_linked() { diff --git a/server/src/gadget.rs b/server/src/gadget.rs index f298908..c7f4fbc 100644 --- a/server/src/gadget.rs +++ b/server/src/gadget.rs @@ -1,7 +1,13 @@ // server/src/gadget.rs -use std::{fs::{self, OpenOptions}, io::Write, path::Path, thread, time::Duration}; use anyhow::{Context, Result}; -use tracing::{info, warn, trace}; +use std::{ + fs::{self, OpenOptions}, + io::Write, + path::Path, + thread, + time::Duration, +}; +use tracing::{info, trace, warn}; #[derive(Clone)] pub struct UsbGadget { @@ -46,8 +52,10 @@ impl UsbGadget { } thread::sleep(Duration::from_millis(50)); } - Err(anyhow::anyhow!("UDC never reached '{wanted}' (last = {:?})", - fs::read_to_string(&path).unwrap_or_default())) + Err(anyhow::anyhow!( + "UDC never reached '{wanted}' (last = {:?})", + fs::read_to_string(&path).unwrap_or_default() + )) } pub fn wait_state_any(ctrl: &str, limit_ms: u64) -> anyhow::Result { @@ -61,7 +69,9 @@ impl UsbGadget { } std::thread::sleep(std::time::Duration::from_millis(50)); } - Err(anyhow::anyhow!("UDC state did not settle within {limit_ms}β€―ms")) + Err(anyhow::anyhow!( + "UDC state did not settle within {limit_ms}β€―ms" + )) } /// Write `value` (plus β€œ\n”) into a sysfs attribute @@ -81,14 +91,18 @@ impl UsbGadget { } thread::sleep(Duration::from_millis(50)); } - Err(anyhow::anyhow!("⚠️ UDC {ctrl} did not re-appear within {limit_ms}β€―ms")) + Err(anyhow::anyhow!( + "⚠️ UDC {ctrl} did not re-appear within {limit_ms}β€―ms" + )) } /// Scan platform devices when /sys/class/udc is empty fn probe_platform_udc() -> Result> { for entry in fs::read_dir("/sys/bus/platform/devices")? { let p = entry?.file_name().into_string().unwrap(); - if p.ends_with(".usb") { return Ok(Some(p)); } + if p.ends_with(".usb") { + return Ok(Some(p)); + } } Ok(None) } @@ -98,26 +112,29 @@ impl UsbGadget { /// Hard-reset the gadget β†’ identical to a physical cable re-plug pub fn cycle(&self) -> Result<()> { /* 0β€―-β€―ensure we *know* the controller even after a previous crash */ - let ctrl = Self::find_controller() - .or_else(|_| Self::probe_platform_udc()? - .ok_or_else(|| anyhow::anyhow!("no UDC present")))?; + let ctrl = Self::find_controller().or_else(|_| { + Self::probe_platform_udc()?.ok_or_else(|| anyhow::anyhow!("no UDC present")) + })?; /* 1 - detach gadget */ info!("πŸ”Œ detaching gadget from {ctrl}"); // a) drop pull-ups (if the controller offers the switch) let sc = format!("/sys/class/udc/{ctrl}/soft_connect"); - let _ = Self::write_attr(&sc, "0"); // ignore errors - not all HW has it + let _ = Self::write_attr(&sc, "0"); // ignore errors - not all HW has it // b) clear the UDC attribute; the kernel may transiently answer EBUSY for attempt in 1..=10 { match Self::write_attr(self.udc_file, "") { Ok(_) => break, - Err(err) if { - // only swallow EBUSY - err.downcast_ref::() - .and_then(|io| io.raw_os_error()) - == Some(libc::EBUSY) && attempt < 10 - } => { + Err(err) + if { + // only swallow EBUSY + err.downcast_ref::() + .and_then(|io| io.raw_os_error()) + == Some(libc::EBUSY) + && attempt < 10 + } => + { trace!("⏳ UDC busy (attempt {attempt}/10) - retrying…"); thread::sleep(Duration::from_millis(100)); } @@ -146,10 +163,10 @@ impl UsbGadget { Some(libc::EINVAL) | Some(libc::EPERM) | Some(libc::ENOENT) => { warn!("⚠️ soft_connect unsupported ({io}); continuing"); } - _ => return Err(err), // propagate all other errors + _ => return Err(err), // propagate all other errors } } else { - return Err(err); // non-IO errors: propagate + return Err(err); // non-IO errors: propagate } } Ok(_) => { /* success */ } @@ -157,21 +174,20 @@ impl UsbGadget { } /* 5 - wait for host (but tolerate sleep) */ - Self::wait_state(&ctrl, "configured", 6_000) - .or_else(|e| { - // If the host is physically absent (sleep / KVM paused) - // we allow 'not attached' and continue - we can still - // accept keyboard/mouse data and the host will enumerate - // later without another reset. - let last = fs::read_to_string(format!("/sys/class/udc/{ctrl}/state")) - .unwrap_or_default(); - if last.trim() == "not attached" { - warn!("⚠️ host did not enumerate within 6β€―s - continuing (state = {last:?})"); - Ok(()) - } else { - Err(e) - } - })?; + Self::wait_state(&ctrl, "configured", 6_000).or_else(|e| { + // If the host is physically absent (sleep / KVM paused) + // we allow 'not attached' and continue - we can still + // accept keyboard/mouse data and the host will enumerate + // later without another reset. + let last = + fs::read_to_string(format!("/sys/class/udc/{ctrl}/state")).unwrap_or_default(); + if last.trim() == "not attached" { + warn!("⚠️ host did not enumerate within 6β€―s - continuing (state = {last:?})"); + Ok(()) + } else { + Err(e) + } + })?; info!("βœ… USB-gadget cycle complete"); Ok(()) @@ -182,8 +198,10 @@ impl UsbGadget { let cand = ["dwc2", "dwc3"]; for drv in cand { let root = format!("/sys/bus/platform/drivers/{drv}"); - if !Path::new(&root).exists() { continue } - + if !Path::new(&root).exists() { + continue; + } + /*----------- unbind ------------------------------------------------*/ info!("πŸ”§ unbinding UDC driver ({drv})"); for attempt in 1..=20 { @@ -193,34 +211,32 @@ impl UsbGadget { trace!("unbind in-progress (#{attempt}) - waiting…"); thread::sleep(Duration::from_millis(100)); } - Err(err) => return Err(err) - .context("UDC unbind failed irrecoverably"), + Err(err) => return Err(err).context("UDC unbind failed irrecoverably"), } } - thread::sleep(Duration::from_millis(150)); // let the core quiesce + thread::sleep(Duration::from_millis(150)); // let the core quiesce /*----------- bind --------------------------------------------------*/ info!("πŸ”§ binding UDC driver ({drv})"); for attempt in 1..=20 { match Self::write_attr(format!("{root}/bind"), ctrl) { - Ok(_) => return Ok(()), // success πŸŽ‰ + Ok(_) => return Ok(()), // success πŸŽ‰ Err(err) if attempt < 20 && Self::is_still_detaching(&err) => { trace!("bind busy (#{attempt}) - retrying…"); thread::sleep(Duration::from_millis(100)); } - Err(err) => return Err(err) - .context("UDC bind failed irrecoverably"), + Err(err) => return Err(err).context("UDC bind failed irrecoverably"), } } } Err(anyhow::anyhow!("no dwc2/dwc3 driver nodes found")) } - + fn is_still_detaching(err: &anyhow::Error) -> bool { err.downcast_ref::() - .and_then(|io| io.raw_os_error()) - .map_or(false, |code| { - matches!(code, libc::EBUSY | libc::ENOENT | libc::ENODEV) - }) + .and_then(|io| io.raw_os_error()) + .map_or(false, |code| { + matches!(code, libc::EBUSY | libc::ENOENT | libc::ENODEV) + }) } } diff --git a/server/src/handshake.rs b/server/src/handshake.rs index d11f179..903fce9 100644 --- a/server/src/handshake.rs +++ b/server/src/handshake.rs @@ -7,7 +7,7 @@ use lesavka_common::lesavka::{ }; pub struct HandshakeSvc { - pub camera: bool, + pub camera: bool, pub microphone: bool, } @@ -18,7 +18,7 @@ impl Handshake for HandshakeSvc { _req: Request, ) -> Result, Status> { Ok(Response::new(HandshakeSet { - camera: self.camera, + camera: self.camera, microphone: self.microphone, })) } @@ -26,6 +26,9 @@ impl Handshake for HandshakeSvc { impl HandshakeSvc { pub fn server() -> HandshakeServer { - HandshakeServer::new(Self { camera: true, microphone: true }) + HandshakeServer::new(Self { + camera: true, + microphone: true, + }) } } diff --git a/server/src/lib.rs b/server/src/lib.rs index 38c337f..db93be3 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -1,6 +1,6 @@ // server/src/lib.rs pub mod audio; -pub mod video; pub mod gadget; pub mod handshake; +pub mod video; diff --git a/server/src/main.rs b/server/src/main.rs index b4c2f15..e4b4e63 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -2,33 +2,27 @@ // server/src/main.rs #![forbid(unsafe_code)] -use std::{panic, backtrace::Backtrace, pin::Pin, sync::Arc}; -use std::sync::atomic::AtomicBool; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; use anyhow::Context as _; use futures_util::{Stream, StreamExt}; -use tokio::{ - fs::{OpenOptions}, - io::AsyncWriteExt, - sync::Mutex, -}; use gstreamer as gst; +use std::sync::atomic::AtomicBool; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::{backtrace::Backtrace, panic, pin::Pin, sync::Arc}; +use tokio::{fs::OpenOptions, io::AsyncWriteExt, sync::Mutex}; use tokio_stream::wrappers::ReceiverStream; -use tonic::{Request, Response, Status}; use tonic::transport::Server; -use tonic_reflection::server::{Builder as ReflBuilder}; -use tracing::{info, warn, error, trace, debug}; -use tracing_subscriber::{filter::EnvFilter, fmt, prelude::*}; +use tonic::{Request, Response, Status}; +use tonic_reflection::server::Builder as ReflBuilder; +use tracing::{debug, error, info, trace, warn}; use tracing_appender::non_blocking::WorkerGuard; +use tracing_subscriber::{filter::EnvFilter, fmt, prelude::*}; use lesavka_common::lesavka::{ - Empty, ResetUsbReply, + AudioPacket, Empty, KeyboardReport, MonitorRequest, MouseReport, ResetUsbReply, VideoPacket, relay_server::{Relay, RelayServer}, - KeyboardReport, MouseReport, - MonitorRequest, VideoPacket, AudioPacket }; -use lesavka_server::{gadget::UsbGadget, video, audio, handshake::HandshakeSvc}; +use lesavka_server::{audio, gadget::UsbGadget, handshake::HandshakeSvc, video}; /*──────────────── constants ────────────────*/ /// **false**Β = never reset automatically. @@ -39,22 +33,19 @@ const PKG_NAME: &str = env!("CARGO_PKG_NAME"); /*──────────────── logging ───────────────────*/ fn init_tracing() -> anyhow::Result { let file = std::fs::OpenOptions::new() - .create(true).truncate(true).write(true) + .create(true) + .truncate(true) + .write(true) .open("/tmp/lesavka-server.log")?; let (file_writer, guard) = tracing_appender::non_blocking(file); - let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| { - EnvFilter::new("lesavka_server=info,lesavka_server::video=warn") - }); + let env_filter = EnvFilter::try_from_default_env() + .unwrap_or_else(|_| EnvFilter::new("lesavka_server=info,lesavka_server::video=warn")); let filter_str = env_filter.to_string(); tracing_subscriber::registry() .with(env_filter) - .with( - fmt::layer() - .with_target(true) - .with_thread_ids(true), - ) + .with(fmt::layer().with_target(true).with_thread_ids(true)) .with( fmt::layer() .with_writer(file_writer) @@ -69,9 +60,13 @@ fn init_tracing() -> anyhow::Result { /*──────────────── helpers ───────────────────*/ async fn open_with_retry(path: &str) -> anyhow::Result { - for attempt in 1..=200 { // β‰ˆ10β€―s + for attempt in 1..=200 { + // β‰ˆ10β€―s match OpenOptions::new() - .write(true).custom_flags(libc::O_NONBLOCK).open(path).await + .write(true) + .custom_flags(libc::O_NONBLOCK) + .open(path) + .await { Ok(f) => { info!("βœ… {path} opened on attempt #{attempt}"); @@ -88,8 +83,7 @@ async fn open_with_retry(path: &str) -> anyhow::Result { } fn next_minute() -> SystemTime { - let now = SystemTime::now() - .duration_since(UNIX_EPOCH).unwrap(); + let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); let secs = now.as_secs(); let next = (secs / 60 + 1) * 60; UNIX_EPOCH + Duration::from_secs(next) @@ -107,7 +101,7 @@ impl Handler { async fn new(gadget: UsbGadget) -> anyhow::Result { if AUTO_CYCLE { info!("πŸ› οΈ Initial USB reset…"); - let _ = gadget.cycle(); // ignore failure - may boot without host + let _ = gadget.cycle(); // ignore failure - may boot without host } else { info!("πŸ› οΈ AUTO_CYCLE disabled -Β no initial reset"); } @@ -138,10 +132,10 @@ impl Handler { #[tonic::async_trait] impl Relay for Handler { /* existing streams ─ unchanged, except: no more auto-reset */ - type StreamKeyboardStream = ReceiverStream>; - type StreamMouseStream = ReceiverStream>; - type CaptureVideoStream = Pin> + Send>>; - type CaptureAudioStream = Pin> + Send>>; + type StreamKeyboardStream = ReceiverStream>; + type StreamMouseStream = ReceiverStream>; + type CaptureVideoStream = Pin> + Send>>; + type CaptureAudioStream = Pin> + Send>>; type StreamMicrophoneStream = ReceiverStream>; type StreamCameraStream = ReceiverStream>; @@ -191,9 +185,9 @@ impl Relay for Handler { &self, req: Request>, ) -> Result, Status> { - // 1 ─ build once, early - let mut sink = audio::Voice::new("hw:UAC2Gadget,0").await + let mut sink = audio::Voice::new("hw:UAC2Gadget,0") + .await .map_err(|e| Status::internal(format!("{e:#}")))?; // 2 ─ dummy outbound stream (same trick as before) @@ -202,8 +196,7 @@ impl Relay for Handler { // 3 ─ drive the sink in a background task tokio::spawn(async move { let mut inbound = req.into_inner(); - static CNT: std::sync::atomic::AtomicU64 = - std::sync::atomic::AtomicU64::new(0); + static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); while let Some(pkt) = inbound.next().await.transpose()? { let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); @@ -212,7 +205,7 @@ impl Relay for Handler { } sink.push(&pkt); } - sink.finish(); // flush on EOS + sink.finish(); // flush on EOS let _ = tx.send(Ok(Empty {})).await; Ok::<(), Status>(()) }); @@ -225,25 +218,24 @@ impl Relay for Handler { req: Request>, ) -> Result, Status> { // map gRPC camera id β†’ UVC device - let uvc = std::env::var("LESAVKA_UVC_DEV") - .unwrap_or_else(|_| "/dev/video4".into()); - + let uvc = std::env::var("LESAVKA_UVC_DEV").unwrap_or_else(|_| "/dev/video4".into()); + // build once - let relay = video::CameraRelay::new(0, &uvc) - .map_err(|e| Status::internal(format!("{e:#}")))?; - + let relay = + video::CameraRelay::new(0, &uvc).map_err(|e| Status::internal(format!("{e:#}")))?; + // dummy outbound (same pattern as other streams) let (tx, rx) = tokio::sync::mpsc::channel(1); - + tokio::spawn(async move { let mut s = req.into_inner(); while let Some(pkt) = s.next().await.transpose()? { - relay.feed(pkt); // ← all logging inside video.rs + relay.feed(pkt); // ← all logging inside video.rs } tx.send(Ok(Empty {})).await.ok(); Ok::<(), Status>(()) }); - + Ok(Response::new(ReceiverStream::new(rx))) } @@ -269,10 +261,9 @@ impl Relay for Handler { req: Request, ) -> Result, Status> { // Only one speaker stream for now; both 0/1 β†’ same ALSA dev. - let _id = req.into_inner().id; + let _id = req.into_inner().id; // Allow override (`LESAVKA_ALSA_DEV=hw:2,0` for debugging). - let dev = std::env::var("LESAVKA_ALSA_DEV") - .unwrap_or_else(|_| "hw:UAC2Gadget,0".into()); + let dev = std::env::var("LESAVKA_ALSA_DEV").unwrap_or_else(|_| "hw:UAC2Gadget,0".into()); let s = audio::ear(&dev, 0) .await @@ -282,10 +273,7 @@ impl Relay for Handler { } /*────────────── USB-reset RPC ────────────*/ - async fn reset_usb( - &self, - _req: Request, - ) -> Result, Status> { + async fn reset_usb(&self, _req: Request) -> Result, Status> { info!("πŸ”΄ explicit ResetUsb() called"); match self.gadget.cycle() { Ok(_) => { @@ -314,17 +302,17 @@ async fn main() -> anyhow::Result<()> { error!("πŸ’₯ panic: {p}\n{bt}"); })); - let gadget = UsbGadget::new("lesavka"); + let gadget = UsbGadget::new("lesavka"); let handler = Handler::new(gadget.clone()).await?; info!("🌐 lesavka-server listening on 0.0.0.0:50051"); Server::builder() .tcp_nodelay(true) - .max_frame_size(Some(2*1024*1024)) + .max_frame_size(Some(2 * 1024 * 1024)) .add_service(RelayServer::new(handler)) .add_service(HandshakeSvc::server()) .add_service(ReflBuilder::configure().build_v1().unwrap()) - .serve(([0,0,0,0], 50051).into()) + .serve(([0, 0, 0, 0], 50051).into()) .await?; Ok(()) } diff --git a/server/src/video.rs b/server/src/video.rs index 7957e9c..dc20ea9 100644 --- a/server/src/video.rs +++ b/server/src/video.rs @@ -1,16 +1,16 @@ // server/src/video.rs use anyhow::Context; +use futures_util::Stream; +use gst::MessageView::*; +use gst::prelude::*; +use gst::{MessageView, log}; use gstreamer as gst; use gstreamer_app as gst_app; -use gst::prelude::*; -use gst::{log, MessageView}; -use gst::MessageView::*; use lesavka_common::lesavka::VideoPacket; use tokio_stream::wrappers::ReceiverStream; use tonic::Status; -use tracing::{debug, warn, error, info, enabled, trace, Level}; -use futures_util::Stream; +use tracing::{Level, debug, enabled, error, info, trace, warn}; const EYE_ID: [&str; 2] = ["l", "r"]; static START: std::sync::OnceLock = std::sync::OnceLock::new(); @@ -37,11 +37,7 @@ impl Drop for VideoStream { } } -pub async fn eye_ball( - dev: &str, - id: u32, - _max_bitrate_kbit: u32, -) -> anyhow::Result { +pub async fn eye_ball(dev: &str, id: u32, _max_bitrate_kbit: u32) -> anyhow::Result { let eye = EYE_ID[id as usize]; gst::init().context("gst init")?; @@ -79,8 +75,10 @@ pub async fn eye_ball( /* ----- BUS WATCH: show errors & warnings immediately --------------- */ let bus = pipeline.bus().expect("bus"); - if let Some(src_pad) = pipeline.by_name(&format!("cam_{eye}")) - .and_then(|e| e.static_pad("src")) { + if let Some(src_pad) = pipeline + .by_name(&format!("cam_{eye}")) + .and_then(|e| e.static_pad("src")) + { src_pad.add_probe(gst::PadProbeType::EVENT_DOWNSTREAM, |pad, info| { if let Some(gst::PadProbeData::Event(ref ev)) = info.data { if let gst::EventView::Caps(c) = ev.view() { @@ -139,8 +137,7 @@ pub async fn eye_ball( let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?; /* -------- basic counters ------ */ - static FRAME: std::sync::atomic::AtomicU64 = - std::sync::atomic::AtomicU64::new(0); + static FRAME: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); let n = FRAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if n % 120 == 0 { trace!(target: "lesavka_server::video", "eye-{eye}: delivered {n} frames"); @@ -157,7 +154,9 @@ pub async fn eye_ball( /* -------- detect SPS / IDR ---- */ if enabled!(Level::DEBUG) { if let Some(&nal) = map.as_slice().get(4) { - if (nal & 0x1F) == 0x05 /* IDR */ { + if (nal & 0x1F) == 0x05 + /* IDR */ + { debug!("eye-{eye}: IDR"); } } @@ -175,7 +174,11 @@ pub async fn eye_ball( /* -------- ship over gRPC ----- */ let data = map.as_slice().to_vec(); let size = data.len(); - let pkt = VideoPacket { id, pts: pts_us, data }; + let pkt = VideoPacket { + id, + pts: pts_us, + data, + }; match tx.try_send(Ok(pkt)) { Ok(_) => { trace!(target:"lesavka_server::video", @@ -202,22 +205,31 @@ pub async fn eye_ball( .build(), ); - pipeline.set_state(gst::State::Playing).context("πŸŽ₯ starting video pipeline eye-{eye}")?; + pipeline + .set_state(gst::State::Playing) + .context("πŸŽ₯ starting video pipeline eye-{eye}")?; let bus = pipeline.bus().unwrap(); loop { match bus.timed_pop(gst::ClockTime::NONE) { - Some(msg) if matches!(msg.view(), MessageView::StateChanged(s) - if s.current() == gst::State::Playing) => break, + Some(msg) + if matches!(msg.view(), MessageView::StateChanged(s) + if s.current() == gst::State::Playing) => + { + break; + } Some(_) => continue, None => continue, } } - Ok(VideoStream { _pipeline: pipeline, inner: ReceiverStream::new(rx) }) + Ok(VideoStream { + _pipeline: pipeline, + inner: ReceiverStream::new(rx), + }) } pub struct WebcamSink { appsrc: gst_app::AppSrc, - _pipe: gst::Pipeline, + _pipe: gst::Pipeline, } impl WebcamSink { @@ -225,39 +237,85 @@ impl WebcamSink { gst::init()?; let pipeline = gst::Pipeline::new(); + + let caps_h264 = gst::Caps::builder("video/x-h264") + .field("stream-format", "byte-stream") + .field("alignment", "au") + .build(); + let raw_caps = gst::Caps::builder("video/x-raw") + .field("format", "YUY2") + .field("width", 1280i32) + .field("height", 720i32) + .field("framerate", gst::Fraction::new(30, 1)) + .build(); + let src = gst::ElementFactory::make("appsrc") .build()? .downcast::() .expect("appsrc"); src.set_is_live(true); src.set_format(gst::Format::Time); + src.set_caps(Some(&caps_h264)); + src.set_property("block", &true); let h264parse = gst::ElementFactory::make("h264parse").build()?; - let decoder = gst::ElementFactory::make("v4l2h264dec").build()?; - let convert = gst::ElementFactory::make("videoconvert").build()?; - let sink = gst::ElementFactory::make("v4l2sink") - .property("device", &uvc_dev) - .property("sync", &false) - .build()?; + let decoder_name = Self::pick_decoder(); + let decoder = gst::ElementFactory::make(decoder_name) + .build() + .with_context(|| format!("building decoder element {decoder_name}"))?; + let convert = gst::ElementFactory::make("videoconvert").build()?; + let caps = gst::ElementFactory::make("capsfilter") + .property("caps", &raw_caps) + .build()?; + let sink = gst::ElementFactory::make("v4l2sink") + .property("device", &uvc_dev) + .property("sync", &false) + .build()?; // Up‑cast to &gst::Element for the collection macros pipeline.add_many(&[ - src.upcast_ref(), &h264parse, &decoder, &convert, &sink + src.upcast_ref(), + &h264parse, + &decoder, + &convert, + &caps, + &sink, ])?; gst::Element::link_many(&[ - src.upcast_ref(), &h264parse, &decoder, &convert, &sink + src.upcast_ref(), + &h264parse, + &decoder, + &convert, + &caps, + &sink, ])?; pipeline.set_state(gst::State::Playing)?; - Ok(Self { appsrc: src, _pipe: pipeline }) + Ok(Self { + appsrc: src, + _pipe: pipeline, + }) } pub fn push(&self, pkt: VideoPacket) { let mut buf = gst::Buffer::from_slice(pkt.data); - buf.get_mut().unwrap() - .set_pts(Some(gst::ClockTime::from_useconds(pkt.pts))); + buf.get_mut() + .unwrap() + .set_pts(Some(gst::ClockTime::from_useconds(pkt.pts))); let _ = self.appsrc.push_buffer(buf); } + + fn pick_decoder() -> &'static str { + if gst::ElementFactory::find("v4l2h264dec").is_some() { + "v4l2h264dec" + } else if gst::ElementFactory::find("v4l2slh264dec").is_some() { + "v4l2slh264dec" + } else if gst::ElementFactory::find("omxh264dec").is_some() { + "omxh264dec" + } else { + "avdec_h264" + } + } } /*─────────────────────────────────*/ @@ -265,15 +323,15 @@ impl WebcamSink { /*─────────────────────────────────*/ pub struct CameraRelay { - sink: WebcamSink, // the v4l2sink pipeline (or stub) - id: u32, // gRPC β€œid” (for future multi‑cam) + sink: WebcamSink, // the v4l2sink pipeline (or stub) + id: u32, // gRPC β€œid” (for future multi‑cam) frames: std::sync::atomic::AtomicU64, } impl CameraRelay { pub fn new(id: u32, uvc_dev: &str) -> anyhow::Result { Ok(Self { - sink: WebcamSink::new(uvc_dev)?, + sink: WebcamSink::new(uvc_dev)?, id, frames: std::sync::atomic::AtomicU64::new(0), }) @@ -281,7 +339,9 @@ impl CameraRelay { /// Push one VideoPacket coming from the client pub fn feed(&self, pkt: VideoPacket) { - let n = self.frames.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let n = self + .frames + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); if n < 10 || n % 60 == 0 { tracing::debug!(target:"lesavka_server::video", cam_id = self.id, diff --git a/server/tests/hid.rs b/server/tests/hid.rs index 3b2f1bd..6ca3426 100644 --- a/server/tests/hid.rs +++ b/server/tests/hid.rs @@ -1,14 +1,24 @@ #[tokio::test] async fn hid_roundtrip() { use lesavka_common::lesavka::*; - use lesavka_server::RelaySvc; // export the struct in lib.rs + use lesavka_server::RelaySvc; // export the struct in lib.rs let svc = RelaySvc::default(); let (mut cli, srv) = tonic::transport::Channel::balance_channel(1); - tokio::spawn(tonic::transport::server::Server::builder() - .add_service(relay_server::RelayServer::new(svc)) - .serve_with_incoming(srv)); + tokio::spawn( + tonic::transport::server::Server::builder() + .add_service(relay_server::RelayServer::new(svc)) + .serve_with_incoming(srv), + ); - let (mut tx, mut rx) = relay_client::RelayClient::new(cli).stream().await.unwrap().into_inner(); - tx.send(HidReport { data: vec![0,0,4,0,0,0,0,0] }).await.unwrap(); - assert!(rx.message().await.unwrap().is_none()); // nothing echoed yet + let (mut tx, mut rx) = relay_client::RelayClient::new(cli) + .stream() + .await + .unwrap() + .into_inner(); + tx.send(HidReport { + data: vec![0, 0, 4, 0, 0, 0, 0, 0], + }) + .await + .unwrap(); + assert!(rx.message().await.unwrap().is_none()); // nothing echoed yet }