install updates - relay service update

This commit is contained in:
Brad Stein 2025-11-30 16:16:03 -03:00
parent 4f629facca
commit d2677afc46
10 changed files with 341 additions and 224 deletions

1
.gitignore vendored
View File

@ -9,3 +9,4 @@ override.toml
**/*~
*.swp
*.swo
*.md

View File

@ -5,8 +5,19 @@ set -euo pipefail
ORIG_USER=${SUDO_USER:-$(id -un)}
# 1. packages (Arch)
sudo pacman -Syq --needed --noconfirm git rustup protobuf gcc evtest gstreamer gst-plugins-base gst-plugins-good gst-plugins-bad gst-plugins-ugly gst-libav pipewire pipewire-pulse
yay -S --noconfirm grpcurl-bin
sudo pacman -Syq --needed --noconfirm \
git rustup protobuf gcc evtest base-devel \
gstreamer gst-plugins-base gst-plugins-good gst-plugins-bad gst-plugins-ugly gst-libav \
pipewire pipewire-pulse
# 1b. yay for AUR bits (run as the invoking user, never root)
if ! command -v yay >/dev/null 2>&1; then
sudo -u "$ORIG_USER" bash -c 'cd /tmp && git clone --depth 1 https://aur.archlinux.org/yay.git &&
cd yay && makepkg -si --noconfirm'
fi
sudo -u "$ORIG_USER" yay -S --needed --noconfirm grpcurl-bin
# 1c. input access
sudo usermod -aG input "$ORIG_USER"
# 2. Rust tool-chain for both root & user
@ -14,7 +25,9 @@ sudo rustup default stable
sudo -u "$ORIG_USER" rustup default stable
# 3. clone / update into a user-writable dir
SRC="$HOME/.local/src/lesavka"
USER_HOME=$(getent passwd "$ORIG_USER" | cut -d: -f6)
SRC="$USER_HOME/.local/src/lesavka"
sudo -u "$ORIG_USER" mkdir -p "$(dirname "$SRC")"
if [[ -d $SRC/.git ]]; then
sudo -u "$ORIG_USER" git -C "$SRC" pull --ff-only
else
@ -41,7 +54,7 @@ Group=root
Environment=RUST_LOG=debug
Environment=LESAVKA_DEV_MODE=1
Environment=LESAVKA_SERVER_ADDR=http://64.25.10.31:50051
Environment=LESAVKA_SERVER_ADDR=http://38.28.125.112:50051
ExecStart=/usr/local/bin/lesavka-client
Restart=no

View File

@ -24,6 +24,7 @@ sudo pacman -Syq --needed --noconfirm git \
pipewire-pulse \
tailscale \
base-devel \
v4l-utils \
gstreamer \
gst-plugins-base \
gst-plugins-base-libs \
@ -48,12 +49,30 @@ options uvcvideo quirks=0x200 timeout=10000
EOF
echo "==> 2b. Predictable /dev names for each capture card"
# probe all v4l2 devices, keep only the two GC311 capture cards
# ensure relay (GPIO power) is on if present
if systemctl list-unit-files | grep -q '^relay.service'; then
sudo systemctl enable --now relay.service
sleep 2
fi
# probe v4l2 devices for GC311s (07ca:3311)
mapfile -t GC_VIDEOS < <(
sudo v4l2-ctl --list-devices |
sudo v4l2-ctl --list-devices 2>/dev/null |
awk '/Live Gamer MINI/{getline; print $1}'
)
# fallback via udev if v4l2-ctl output is empty/partial
if [ "${#GC_VIDEOS[@]}" -ne 2 ]; then
mapfile -t GC_VIDEOS < <(
for dev in /dev/video*; do
props=$(sudo udevadm info -q property -n "$dev" 2>/dev/null || true)
if echo "$props" | grep -q 'ID_VENDOR_ID=07ca' && echo "$props" | grep -q 'ID_MODEL_ID=3311'; then
echo "$dev"
fi
done | sort -u
)
fi
if [ "${#GC_VIDEOS[@]}" -ne 2 ]; then
echo "❌ Exactly two GC311 capture cards (index0) must be attached!" >&2
printf ' Detected: %s\n' "${GC_VIDEOS[@]}"
@ -89,7 +108,7 @@ sudo -u "$ORIG_USER" rustup default stable
echo "==> 4a. Source checkout"
SRC_DIR=/var/src/lesavka
REPO_URL=ssh://git@scm.bstein.dev:2242/brad_stein/lesavka.git
REPO_URL=ssh://git@scm.bstein.dev:2242/bstein/lesavka.git
if [[ ! -d $SRC_DIR ]]; then
sudo mkdir -p /var/src
sudo chown "$ORIG_USER":"$ORIG_USER" /var/src

View File

@ -1,19 +1,19 @@
// server/src/audio.rs
#![forbid(unsafe_code)]
use anyhow::{anyhow, Context};
use anyhow::{Context, anyhow};
use chrono::Local;
use futures_util::Stream;
use gstreamer as gst;
use gstreamer_app as gst_app;
use gst::prelude::*;
use gst::ElementFactory;
use gst::MessageView::*;
use gst::prelude::*;
use gstreamer as gst;
use gstreamer_app as gst_app;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;
use tracing::{debug, error, warn};
use std::time::{Instant, Duration, SystemTime, UNIX_EPOCH};
use std::sync::{Arc, Mutex};
use lesavka_common::lesavka::AudioPacket;
@ -21,7 +21,7 @@ use lesavka_common::lesavka::AudioPacket;
/// endpoint) **towards** the client.
pub struct AudioStream {
_pipeline: gst::Pipeline,
inner: ReceiverStream<Result<AudioPacket, Status>>,
inner: ReceiverStream<Result<AudioPacket, Status>>,
}
impl Stream for AudioStream {
@ -58,17 +58,18 @@ pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result<AudioStream> {
*/
let desc = build_pipeline_desc(alsa_dev)?;
let pipeline: gst::Pipeline = gst::parse::launch(&desc)?
.downcast()
.expect("pipeline");
let pipeline: gst::Pipeline = gst::parse::launch(&desc)?.downcast().expect("pipeline");
let sink: gst_app::AppSink = pipeline
.by_name("asink")
.expect("asink")
.downcast()
.expect("appsink");
let tap = Arc::new(Mutex::new(ClipTap::new("🎧 - ear", Duration::from_secs(60))));
let tap = Arc::new(Mutex::new(ClipTap::new(
"🎧 - ear",
Duration::from_secs(60),
)));
// sink.connect("underrun", false, |_| {
// tracing::warn!("⚠️ USB playback underrun host muted or not reading");
// None
@ -80,12 +81,19 @@ pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result<AudioStream> {
std::thread::spawn(move || {
for msg in bus.iter_timed(gst::ClockTime::NONE) {
match msg.view() {
Error(e) => error!("💥 audio pipeline: {} ({})",
e.error(), e.debug().unwrap_or_default()),
Warning(w) => warn!("⚠️ audio pipeline: {} ({})",
w.error(), w.debug().unwrap_or_default()),
StateChanged(s) if s.current() == gst::State::Playing =>
debug!("🎶 audio pipeline PLAYING"),
Error(e) => error!(
"💥 audio pipeline: {} ({})",
e.error(),
e.debug().unwrap_or_default()
),
Warning(w) => warn!(
"⚠️ audio pipeline: {} ({})",
w.error(),
w.debug().unwrap_or_default()
),
StateChanged(s) if s.current() == gst::State::Playing => {
debug!("🎶 audio pipeline PLAYING")
}
_ => {}
}
}
@ -94,52 +102,53 @@ pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result<AudioStream> {
/*──────────── callbacks ────────────*/
sink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample({
let tap = tap.clone();
move |s| {
let sample = s.pull_sample().map_err(|_| gst::FlowError::Eos)?;
let buffer = sample.buffer().ok_or(gst::FlowError::Error)?;
let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?;
.new_sample({
let tap = tap.clone();
move |s| {
let sample = s.pull_sample().map_err(|_| gst::FlowError::Eos)?;
let buffer = sample.buffer().ok_or(gst::FlowError::Error)?;
let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?;
// -------- cliptap (minute dumps) ------------
tap.lock().unwrap().feed(map.as_slice());
// -------- cliptap (minute dumps) ------------
tap.lock().unwrap().feed(map.as_slice());
static CNT: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n < 10 || n % 300 == 0 {
debug!("🎧 ear #{n}: {}bytes", map.len());
}
let pts_us = buffer
.pts()
.unwrap_or(gst::ClockTime::ZERO)
.nseconds() / 1_000;
// push nonblocking; drop oldest on overflow
if tx.try_send(Ok(AudioPacket {
id,
pts: pts_us,
data: map.as_slice().to_vec(),
})).is_err() {
static DROPS: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
let d = DROPS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if d % 300 == 0 {
warn!("🎧💔 dropped {d} audio AUs (client too slow)");
static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n < 10 || n % 300 == 0 {
debug!("🎧 ear #{n}: {}bytes", map.len());
}
let pts_us = buffer.pts().unwrap_or(gst::ClockTime::ZERO).nseconds() / 1_000;
// push nonblocking; drop oldest on overflow
if tx
.try_send(Ok(AudioPacket {
id,
pts: pts_us,
data: map.as_slice().to_vec(),
}))
.is_err()
{
static DROPS: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
let d = DROPS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if d % 300 == 0 {
warn!("🎧💔 dropped {d} audio AUs (client too slow)");
}
}
Ok(gst::FlowSuccess::Ok)
}
Ok(gst::FlowSuccess::Ok)
}
}).build(),
})
.build(),
);
pipeline.set_state(gst::State::Playing)
pipeline
.set_state(gst::State::Playing)
.context("starting audio pipeline")?;
Ok(AudioStream {
_pipeline: pipeline,
inner: ReceiverStream::new(rx),
inner: ReceiverStream::new(rx),
})
}
@ -152,9 +161,7 @@ fn build_pipeline_desc(dev: &str) -> anyhow::Result<String> {
.into_iter()
.find(|&e| {
reg.find_plugin(e).is_some()
|| reg
.find_feature(e, ElementFactory::static_type())
.is_some()
|| reg.find_feature(e, ElementFactory::static_type()).is_some()
})
.ok_or_else(|| anyhow!("no AAC encoder plugin available"))?;
@ -176,8 +183,8 @@ fn build_pipeline_desc(dev: &str) -> anyhow::Result<String> {
// ────────────────────── minuteclip helper ───────────────────────────────
pub struct ClipTap {
buf: Vec<u8>,
tag: &'static str,
buf: Vec<u8>,
tag: &'static str,
next_dump: Instant,
period: Duration,
}
@ -222,8 +229,8 @@ impl Drop for ClipTap {
// ────────────────────── microphone sink ────────────────────────────────
pub struct Voice {
appsrc: gst_app::AppSrc,
_pipe: gst::Pipeline, // keep pipeline alive
tap: ClipTap,
_pipe: gst::Pipeline, // keep pipeline alive
tap: ClipTap,
}
impl Voice {
@ -236,7 +243,7 @@ impl Voice {
let pipeline = gst::Pipeline::new();
// elements
let appsrc = gst::ElementFactory::make("appsrc")
let appsrc = gst::ElementFactory::make("appsrc")
.build()
.context("make appsrc")?
.downcast::<gst_app::AppSrc>()
@ -246,10 +253,10 @@ impl Voice {
appsrc.set_format(gst::Format::Time);
appsrc.set_is_live(true);
let decodebin = gst::ElementFactory::make("decodebin")
let decodebin = gst::ElementFactory::make("decodebin")
.build()
.context("make decodebin")?;
let alsa_sink = gst::ElementFactory::make("alsasink")
let alsa_sink = gst::ElementFactory::make("alsasink")
.build()
.context("make alsasink")?;
@ -259,7 +266,7 @@ impl Voice {
appsrc.link(&decodebin)?;
/*------------ decodebin autolink ----------------*/
let sink_clone = alsa_sink.clone(); // keep original for later
let sink_clone = alsa_sink.clone(); // keep original for later
decodebin.connect_pad_added(move |_db, pad| {
let sink_pad = sink_clone.static_pad("sink").unwrap();
if !sink_pad.is_linked() {

View File

@ -1,7 +1,13 @@
// server/src/gadget.rs
use std::{fs::{self, OpenOptions}, io::Write, path::Path, thread, time::Duration};
use anyhow::{Context, Result};
use tracing::{info, warn, trace};
use std::{
fs::{self, OpenOptions},
io::Write,
path::Path,
thread,
time::Duration,
};
use tracing::{info, trace, warn};
#[derive(Clone)]
pub struct UsbGadget {
@ -46,8 +52,10 @@ impl UsbGadget {
}
thread::sleep(Duration::from_millis(50));
}
Err(anyhow::anyhow!("UDC never reached '{wanted}' (last = {:?})",
fs::read_to_string(&path).unwrap_or_default()))
Err(anyhow::anyhow!(
"UDC never reached '{wanted}' (last = {:?})",
fs::read_to_string(&path).unwrap_or_default()
))
}
pub fn wait_state_any(ctrl: &str, limit_ms: u64) -> anyhow::Result<String> {
@ -61,7 +69,9 @@ impl UsbGadget {
}
std::thread::sleep(std::time::Duration::from_millis(50));
}
Err(anyhow::anyhow!("UDC state did not settle within {limit_ms}ms"))
Err(anyhow::anyhow!(
"UDC state did not settle within {limit_ms}ms"
))
}
/// Write `value` (plus “\n”) into a sysfs attribute
@ -81,14 +91,18 @@ impl UsbGadget {
}
thread::sleep(Duration::from_millis(50));
}
Err(anyhow::anyhow!("⚠️ UDC {ctrl} did not re-appear within {limit_ms}ms"))
Err(anyhow::anyhow!(
"⚠️ UDC {ctrl} did not re-appear within {limit_ms}ms"
))
}
/// Scan platform devices when /sys/class/udc is empty
fn probe_platform_udc() -> Result<Option<String>> {
for entry in fs::read_dir("/sys/bus/platform/devices")? {
let p = entry?.file_name().into_string().unwrap();
if p.ends_with(".usb") { return Ok(Some(p)); }
if p.ends_with(".usb") {
return Ok(Some(p));
}
}
Ok(None)
}
@ -98,26 +112,29 @@ impl UsbGadget {
/// Hard-reset the gadget → identical to a physical cable re-plug
pub fn cycle(&self) -> Result<()> {
/* 0-ensure we *know* the controller even after a previous crash */
let ctrl = Self::find_controller()
.or_else(|_| Self::probe_platform_udc()?
.ok_or_else(|| anyhow::anyhow!("no UDC present")))?;
let ctrl = Self::find_controller().or_else(|_| {
Self::probe_platform_udc()?.ok_or_else(|| anyhow::anyhow!("no UDC present"))
})?;
/* 1 - detach gadget */
info!("🔌 detaching gadget from {ctrl}");
// a) drop pull-ups (if the controller offers the switch)
let sc = format!("/sys/class/udc/{ctrl}/soft_connect");
let _ = Self::write_attr(&sc, "0"); // ignore errors - not all HW has it
let _ = Self::write_attr(&sc, "0"); // ignore errors - not all HW has it
// b) clear the UDC attribute; the kernel may transiently answer EBUSY
for attempt in 1..=10 {
match Self::write_attr(self.udc_file, "") {
Ok(_) => break,
Err(err) if {
// only swallow EBUSY
err.downcast_ref::<std::io::Error>()
.and_then(|io| io.raw_os_error())
== Some(libc::EBUSY) && attempt < 10
} => {
Err(err)
if {
// only swallow EBUSY
err.downcast_ref::<std::io::Error>()
.and_then(|io| io.raw_os_error())
== Some(libc::EBUSY)
&& attempt < 10
} =>
{
trace!("⏳ UDC busy (attempt {attempt}/10) - retrying…");
thread::sleep(Duration::from_millis(100));
}
@ -146,10 +163,10 @@ impl UsbGadget {
Some(libc::EINVAL) | Some(libc::EPERM) | Some(libc::ENOENT) => {
warn!("⚠️ soft_connect unsupported ({io}); continuing");
}
_ => return Err(err), // propagate all other errors
_ => return Err(err), // propagate all other errors
}
} else {
return Err(err); // non-IO errors: propagate
return Err(err); // non-IO errors: propagate
}
}
Ok(_) => { /* success */ }
@ -157,21 +174,20 @@ impl UsbGadget {
}
/* 5 - wait for host (but tolerate sleep) */
Self::wait_state(&ctrl, "configured", 6_000)
.or_else(|e| {
// If the host is physically absent (sleep / KVM paused)
// we allow 'not attached' and continue - we can still
// accept keyboard/mouse data and the host will enumerate
// later without another reset.
let last = fs::read_to_string(format!("/sys/class/udc/{ctrl}/state"))
.unwrap_or_default();
if last.trim() == "not attached" {
warn!("⚠️ host did not enumerate within 6s - continuing (state = {last:?})");
Ok(())
} else {
Err(e)
}
})?;
Self::wait_state(&ctrl, "configured", 6_000).or_else(|e| {
// If the host is physically absent (sleep / KVM paused)
// we allow 'not attached' and continue - we can still
// accept keyboard/mouse data and the host will enumerate
// later without another reset.
let last =
fs::read_to_string(format!("/sys/class/udc/{ctrl}/state")).unwrap_or_default();
if last.trim() == "not attached" {
warn!("⚠️ host did not enumerate within 6s - continuing (state = {last:?})");
Ok(())
} else {
Err(e)
}
})?;
info!("✅ USB-gadget cycle complete");
Ok(())
@ -182,8 +198,10 @@ impl UsbGadget {
let cand = ["dwc2", "dwc3"];
for drv in cand {
let root = format!("/sys/bus/platform/drivers/{drv}");
if !Path::new(&root).exists() { continue }
if !Path::new(&root).exists() {
continue;
}
/*----------- unbind ------------------------------------------------*/
info!("🔧 unbinding UDC driver ({drv})");
for attempt in 1..=20 {
@ -193,34 +211,32 @@ impl UsbGadget {
trace!("unbind in-progress (#{attempt}) - waiting…");
thread::sleep(Duration::from_millis(100));
}
Err(err) => return Err(err)
.context("UDC unbind failed irrecoverably"),
Err(err) => return Err(err).context("UDC unbind failed irrecoverably"),
}
}
thread::sleep(Duration::from_millis(150)); // let the core quiesce
thread::sleep(Duration::from_millis(150)); // let the core quiesce
/*----------- bind --------------------------------------------------*/
info!("🔧 binding UDC driver ({drv})");
for attempt in 1..=20 {
match Self::write_attr(format!("{root}/bind"), ctrl) {
Ok(_) => return Ok(()), // success 🎉
Ok(_) => return Ok(()), // success 🎉
Err(err) if attempt < 20 && Self::is_still_detaching(&err) => {
trace!("bind busy (#{attempt}) - retrying…");
thread::sleep(Duration::from_millis(100));
}
Err(err) => return Err(err)
.context("UDC bind failed irrecoverably"),
Err(err) => return Err(err).context("UDC bind failed irrecoverably"),
}
}
}
Err(anyhow::anyhow!("no dwc2/dwc3 driver nodes found"))
}
fn is_still_detaching(err: &anyhow::Error) -> bool {
err.downcast_ref::<std::io::Error>()
.and_then(|io| io.raw_os_error())
.map_or(false, |code| {
matches!(code, libc::EBUSY | libc::ENOENT | libc::ENODEV)
})
.and_then(|io| io.raw_os_error())
.map_or(false, |code| {
matches!(code, libc::EBUSY | libc::ENOENT | libc::ENODEV)
})
}
}

View File

@ -7,7 +7,7 @@ use lesavka_common::lesavka::{
};
pub struct HandshakeSvc {
pub camera: bool,
pub camera: bool,
pub microphone: bool,
}
@ -18,7 +18,7 @@ impl Handshake for HandshakeSvc {
_req: Request<Empty>,
) -> Result<Response<HandshakeSet>, Status> {
Ok(Response::new(HandshakeSet {
camera: self.camera,
camera: self.camera,
microphone: self.microphone,
}))
}
@ -26,6 +26,9 @@ impl Handshake for HandshakeSvc {
impl HandshakeSvc {
pub fn server() -> HandshakeServer<Self> {
HandshakeServer::new(Self { camera: true, microphone: true })
HandshakeServer::new(Self {
camera: true,
microphone: true,
})
}
}

View File

@ -1,6 +1,6 @@
// server/src/lib.rs
pub mod audio;
pub mod video;
pub mod gadget;
pub mod handshake;
pub mod video;

View File

@ -2,33 +2,27 @@
// server/src/main.rs
#![forbid(unsafe_code)]
use std::{panic, backtrace::Backtrace, pin::Pin, sync::Arc};
use std::sync::atomic::AtomicBool;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use anyhow::Context as _;
use futures_util::{Stream, StreamExt};
use tokio::{
fs::{OpenOptions},
io::AsyncWriteExt,
sync::Mutex,
};
use gstreamer as gst;
use std::sync::atomic::AtomicBool;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{backtrace::Backtrace, panic, pin::Pin, sync::Arc};
use tokio::{fs::OpenOptions, io::AsyncWriteExt, sync::Mutex};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};
use tonic::transport::Server;
use tonic_reflection::server::{Builder as ReflBuilder};
use tracing::{info, warn, error, trace, debug};
use tracing_subscriber::{filter::EnvFilter, fmt, prelude::*};
use tonic::{Request, Response, Status};
use tonic_reflection::server::Builder as ReflBuilder;
use tracing::{debug, error, info, trace, warn};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{filter::EnvFilter, fmt, prelude::*};
use lesavka_common::lesavka::{
Empty, ResetUsbReply,
AudioPacket, Empty, KeyboardReport, MonitorRequest, MouseReport, ResetUsbReply, VideoPacket,
relay_server::{Relay, RelayServer},
KeyboardReport, MouseReport,
MonitorRequest, VideoPacket, AudioPacket
};
use lesavka_server::{gadget::UsbGadget, video, audio, handshake::HandshakeSvc};
use lesavka_server::{audio, gadget::UsbGadget, handshake::HandshakeSvc, video};
/*──────────────── constants ────────────────*/
/// **false** = never reset automatically.
@ -39,22 +33,19 @@ const PKG_NAME: &str = env!("CARGO_PKG_NAME");
/*──────────────── logging ───────────────────*/
fn init_tracing() -> anyhow::Result<WorkerGuard> {
let file = std::fs::OpenOptions::new()
.create(true).truncate(true).write(true)
.create(true)
.truncate(true)
.write(true)
.open("/tmp/lesavka-server.log")?;
let (file_writer, guard) = tracing_appender::non_blocking(file);
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
EnvFilter::new("lesavka_server=info,lesavka_server::video=warn")
});
let env_filter = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new("lesavka_server=info,lesavka_server::video=warn"));
let filter_str = env_filter.to_string();
tracing_subscriber::registry()
.with(env_filter)
.with(
fmt::layer()
.with_target(true)
.with_thread_ids(true),
)
.with(fmt::layer().with_target(true).with_thread_ids(true))
.with(
fmt::layer()
.with_writer(file_writer)
@ -69,9 +60,13 @@ fn init_tracing() -> anyhow::Result<WorkerGuard> {
/*──────────────── helpers ───────────────────*/
async fn open_with_retry(path: &str) -> anyhow::Result<tokio::fs::File> {
for attempt in 1..=200 { // ≈10s
for attempt in 1..=200 {
// ≈10s
match OpenOptions::new()
.write(true).custom_flags(libc::O_NONBLOCK).open(path).await
.write(true)
.custom_flags(libc::O_NONBLOCK)
.open(path)
.await
{
Ok(f) => {
info!("✅ {path} opened on attempt #{attempt}");
@ -88,8 +83,7 @@ async fn open_with_retry(path: &str) -> anyhow::Result<tokio::fs::File> {
}
fn next_minute() -> SystemTime {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH).unwrap();
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
let secs = now.as_secs();
let next = (secs / 60 + 1) * 60;
UNIX_EPOCH + Duration::from_secs(next)
@ -107,7 +101,7 @@ impl Handler {
async fn new(gadget: UsbGadget) -> anyhow::Result<Self> {
if AUTO_CYCLE {
info!("🛠️ Initial USB reset…");
let _ = gadget.cycle(); // ignore failure - may boot without host
let _ = gadget.cycle(); // ignore failure - may boot without host
} else {
info!("🛠️ AUTO_CYCLE disabled - no initial reset");
}
@ -138,10 +132,10 @@ impl Handler {
#[tonic::async_trait]
impl Relay for Handler {
/* existing streams ─ unchanged, except: no more auto-reset */
type StreamKeyboardStream = ReceiverStream<Result<KeyboardReport, Status>>;
type StreamMouseStream = ReceiverStream<Result<MouseReport, Status>>;
type CaptureVideoStream = Pin<Box<dyn Stream<Item=Result<VideoPacket,Status>> + Send>>;
type CaptureAudioStream = Pin<Box<dyn Stream<Item=Result<AudioPacket,Status>> + Send>>;
type StreamKeyboardStream = ReceiverStream<Result<KeyboardReport, Status>>;
type StreamMouseStream = ReceiverStream<Result<MouseReport, Status>>;
type CaptureVideoStream = Pin<Box<dyn Stream<Item = Result<VideoPacket, Status>> + Send>>;
type CaptureAudioStream = Pin<Box<dyn Stream<Item = Result<AudioPacket, Status>> + Send>>;
type StreamMicrophoneStream = ReceiverStream<Result<Empty, Status>>;
type StreamCameraStream = ReceiverStream<Result<Empty, Status>>;
@ -191,9 +185,9 @@ impl Relay for Handler {
&self,
req: Request<tonic::Streaming<AudioPacket>>,
) -> Result<Response<Self::StreamMicrophoneStream>, Status> {
// 1 ─ build once, early
let mut sink = audio::Voice::new("hw:UAC2Gadget,0").await
let mut sink = audio::Voice::new("hw:UAC2Gadget,0")
.await
.map_err(|e| Status::internal(format!("{e:#}")))?;
// 2 ─ dummy outbound stream (same trick as before)
@ -202,8 +196,7 @@ impl Relay for Handler {
// 3 ─ drive the sink in a background task
tokio::spawn(async move {
let mut inbound = req.into_inner();
static CNT: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
while let Some(pkt) = inbound.next().await.transpose()? {
let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
@ -212,7 +205,7 @@ impl Relay for Handler {
}
sink.push(&pkt);
}
sink.finish(); // flush on EOS
sink.finish(); // flush on EOS
let _ = tx.send(Ok(Empty {})).await;
Ok::<(), Status>(())
});
@ -225,25 +218,24 @@ impl Relay for Handler {
req: Request<tonic::Streaming<VideoPacket>>,
) -> Result<Response<Self::StreamCameraStream>, Status> {
// map gRPC camera id → UVC device
let uvc = std::env::var("LESAVKA_UVC_DEV")
.unwrap_or_else(|_| "/dev/video4".into());
let uvc = std::env::var("LESAVKA_UVC_DEV").unwrap_or_else(|_| "/dev/video4".into());
// build once
let relay = video::CameraRelay::new(0, &uvc)
.map_err(|e| Status::internal(format!("{e:#}")))?;
let relay =
video::CameraRelay::new(0, &uvc).map_err(|e| Status::internal(format!("{e:#}")))?;
// dummy outbound (same pattern as other streams)
let (tx, rx) = tokio::sync::mpsc::channel(1);
tokio::spawn(async move {
let mut s = req.into_inner();
while let Some(pkt) = s.next().await.transpose()? {
relay.feed(pkt); // ← all logging inside video.rs
relay.feed(pkt); // ← all logging inside video.rs
}
tx.send(Ok(Empty {})).await.ok();
Ok::<(), Status>(())
});
Ok(Response::new(ReceiverStream::new(rx)))
}
@ -269,10 +261,9 @@ impl Relay for Handler {
req: Request<MonitorRequest>,
) -> Result<Response<Self::CaptureAudioStream>, Status> {
// Only one speaker stream for now; both 0/1 → same ALSA dev.
let _id = req.into_inner().id;
let _id = req.into_inner().id;
// Allow override (`LESAVKA_ALSA_DEV=hw:2,0` for debugging).
let dev = std::env::var("LESAVKA_ALSA_DEV")
.unwrap_or_else(|_| "hw:UAC2Gadget,0".into());
let dev = std::env::var("LESAVKA_ALSA_DEV").unwrap_or_else(|_| "hw:UAC2Gadget,0".into());
let s = audio::ear(&dev, 0)
.await
@ -282,10 +273,7 @@ impl Relay for Handler {
}
/*────────────── USB-reset RPC ────────────*/
async fn reset_usb(
&self,
_req: Request<Empty>,
) -> Result<Response<ResetUsbReply>, Status> {
async fn reset_usb(&self, _req: Request<Empty>) -> Result<Response<ResetUsbReply>, Status> {
info!("🔴 explicit ResetUsb() called");
match self.gadget.cycle() {
Ok(_) => {
@ -314,17 +302,17 @@ async fn main() -> anyhow::Result<()> {
error!("💥 panic: {p}\n{bt}");
}));
let gadget = UsbGadget::new("lesavka");
let gadget = UsbGadget::new("lesavka");
let handler = Handler::new(gadget.clone()).await?;
info!("🌐 lesavka-server listening on 0.0.0.0:50051");
Server::builder()
.tcp_nodelay(true)
.max_frame_size(Some(2*1024*1024))
.max_frame_size(Some(2 * 1024 * 1024))
.add_service(RelayServer::new(handler))
.add_service(HandshakeSvc::server())
.add_service(ReflBuilder::configure().build_v1().unwrap())
.serve(([0,0,0,0], 50051).into())
.serve(([0, 0, 0, 0], 50051).into())
.await?;
Ok(())
}

View File

@ -1,16 +1,16 @@
// server/src/video.rs
use anyhow::Context;
use futures_util::Stream;
use gst::MessageView::*;
use gst::prelude::*;
use gst::{MessageView, log};
use gstreamer as gst;
use gstreamer_app as gst_app;
use gst::prelude::*;
use gst::{log, MessageView};
use gst::MessageView::*;
use lesavka_common::lesavka::VideoPacket;
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;
use tracing::{debug, warn, error, info, enabled, trace, Level};
use futures_util::Stream;
use tracing::{Level, debug, enabled, error, info, trace, warn};
const EYE_ID: [&str; 2] = ["l", "r"];
static START: std::sync::OnceLock<gst::ClockTime> = std::sync::OnceLock::new();
@ -37,11 +37,7 @@ impl Drop for VideoStream {
}
}
pub async fn eye_ball(
dev: &str,
id: u32,
_max_bitrate_kbit: u32,
) -> anyhow::Result<VideoStream> {
pub async fn eye_ball(dev: &str, id: u32, _max_bitrate_kbit: u32) -> anyhow::Result<VideoStream> {
let eye = EYE_ID[id as usize];
gst::init().context("gst init")?;
@ -79,8 +75,10 @@ pub async fn eye_ball(
/* ----- BUS WATCH: show errors & warnings immediately --------------- */
let bus = pipeline.bus().expect("bus");
if let Some(src_pad) = pipeline.by_name(&format!("cam_{eye}"))
.and_then(|e| e.static_pad("src")) {
if let Some(src_pad) = pipeline
.by_name(&format!("cam_{eye}"))
.and_then(|e| e.static_pad("src"))
{
src_pad.add_probe(gst::PadProbeType::EVENT_DOWNSTREAM, |pad, info| {
if let Some(gst::PadProbeData::Event(ref ev)) = info.data {
if let gst::EventView::Caps(c) = ev.view() {
@ -139,8 +137,7 @@ pub async fn eye_ball(
let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?;
/* -------- basic counters ------ */
static FRAME: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
static FRAME: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
let n = FRAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n % 120 == 0 {
trace!(target: "lesavka_server::video", "eye-{eye}: delivered {n} frames");
@ -157,7 +154,9 @@ pub async fn eye_ball(
/* -------- detect SPS / IDR ---- */
if enabled!(Level::DEBUG) {
if let Some(&nal) = map.as_slice().get(4) {
if (nal & 0x1F) == 0x05 /* IDR */ {
if (nal & 0x1F) == 0x05
/* IDR */
{
debug!("eye-{eye}: IDR");
}
}
@ -175,7 +174,11 @@ pub async fn eye_ball(
/* -------- ship over gRPC ----- */
let data = map.as_slice().to_vec();
let size = data.len();
let pkt = VideoPacket { id, pts: pts_us, data };
let pkt = VideoPacket {
id,
pts: pts_us,
data,
};
match tx.try_send(Ok(pkt)) {
Ok(_) => {
trace!(target:"lesavka_server::video",
@ -202,22 +205,31 @@ pub async fn eye_ball(
.build(),
);
pipeline.set_state(gst::State::Playing).context("🎥 starting video pipeline eye-{eye}")?;
pipeline
.set_state(gst::State::Playing)
.context("🎥 starting video pipeline eye-{eye}")?;
let bus = pipeline.bus().unwrap();
loop {
match bus.timed_pop(gst::ClockTime::NONE) {
Some(msg) if matches!(msg.view(), MessageView::StateChanged(s)
if s.current() == gst::State::Playing) => break,
Some(msg)
if matches!(msg.view(), MessageView::StateChanged(s)
if s.current() == gst::State::Playing) =>
{
break;
}
Some(_) => continue,
None => continue,
}
}
Ok(VideoStream { _pipeline: pipeline, inner: ReceiverStream::new(rx) })
Ok(VideoStream {
_pipeline: pipeline,
inner: ReceiverStream::new(rx),
})
}
pub struct WebcamSink {
appsrc: gst_app::AppSrc,
_pipe: gst::Pipeline,
_pipe: gst::Pipeline,
}
impl WebcamSink {
@ -225,39 +237,85 @@ impl WebcamSink {
gst::init()?;
let pipeline = gst::Pipeline::new();
let caps_h264 = gst::Caps::builder("video/x-h264")
.field("stream-format", "byte-stream")
.field("alignment", "au")
.build();
let raw_caps = gst::Caps::builder("video/x-raw")
.field("format", "YUY2")
.field("width", 1280i32)
.field("height", 720i32)
.field("framerate", gst::Fraction::new(30, 1))
.build();
let src = gst::ElementFactory::make("appsrc")
.build()?
.downcast::<gst_app::AppSrc>()
.expect("appsrc");
src.set_is_live(true);
src.set_format(gst::Format::Time);
src.set_caps(Some(&caps_h264));
src.set_property("block", &true);
let h264parse = gst::ElementFactory::make("h264parse").build()?;
let decoder = gst::ElementFactory::make("v4l2h264dec").build()?;
let convert = gst::ElementFactory::make("videoconvert").build()?;
let sink = gst::ElementFactory::make("v4l2sink")
.property("device", &uvc_dev)
.property("sync", &false)
.build()?;
let decoder_name = Self::pick_decoder();
let decoder = gst::ElementFactory::make(decoder_name)
.build()
.with_context(|| format!("building decoder element {decoder_name}"))?;
let convert = gst::ElementFactory::make("videoconvert").build()?;
let caps = gst::ElementFactory::make("capsfilter")
.property("caps", &raw_caps)
.build()?;
let sink = gst::ElementFactory::make("v4l2sink")
.property("device", &uvc_dev)
.property("sync", &false)
.build()?;
// Upcast to &gst::Element for the collection macros
pipeline.add_many(&[
src.upcast_ref(), &h264parse, &decoder, &convert, &sink
src.upcast_ref(),
&h264parse,
&decoder,
&convert,
&caps,
&sink,
])?;
gst::Element::link_many(&[
src.upcast_ref(), &h264parse, &decoder, &convert, &sink
src.upcast_ref(),
&h264parse,
&decoder,
&convert,
&caps,
&sink,
])?;
pipeline.set_state(gst::State::Playing)?;
Ok(Self { appsrc: src, _pipe: pipeline })
Ok(Self {
appsrc: src,
_pipe: pipeline,
})
}
pub fn push(&self, pkt: VideoPacket) {
let mut buf = gst::Buffer::from_slice(pkt.data);
buf.get_mut().unwrap()
.set_pts(Some(gst::ClockTime::from_useconds(pkt.pts)));
buf.get_mut()
.unwrap()
.set_pts(Some(gst::ClockTime::from_useconds(pkt.pts)));
let _ = self.appsrc.push_buffer(buf);
}
fn pick_decoder() -> &'static str {
if gst::ElementFactory::find("v4l2h264dec").is_some() {
"v4l2h264dec"
} else if gst::ElementFactory::find("v4l2slh264dec").is_some() {
"v4l2slh264dec"
} else if gst::ElementFactory::find("omxh264dec").is_some() {
"omxh264dec"
} else {
"avdec_h264"
}
}
}
/*─────────────────────────────────*/
@ -265,15 +323,15 @@ impl WebcamSink {
/*─────────────────────────────────*/
pub struct CameraRelay {
sink: WebcamSink, // the v4l2sink pipeline (or stub)
id: u32, // gRPC “id” (for future multicam)
sink: WebcamSink, // the v4l2sink pipeline (or stub)
id: u32, // gRPC “id” (for future multicam)
frames: std::sync::atomic::AtomicU64,
}
impl CameraRelay {
pub fn new(id: u32, uvc_dev: &str) -> anyhow::Result<Self> {
Ok(Self {
sink: WebcamSink::new(uvc_dev)?,
sink: WebcamSink::new(uvc_dev)?,
id,
frames: std::sync::atomic::AtomicU64::new(0),
})
@ -281,7 +339,9 @@ impl CameraRelay {
/// Push one VideoPacket coming from the client
pub fn feed(&self, pkt: VideoPacket) {
let n = self.frames.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let n = self
.frames
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n < 10 || n % 60 == 0 {
tracing::debug!(target:"lesavka_server::video",
cam_id = self.id,

View File

@ -1,14 +1,24 @@
#[tokio::test]
async fn hid_roundtrip() {
use lesavka_common::lesavka::*;
use lesavka_server::RelaySvc; // export the struct in lib.rs
use lesavka_server::RelaySvc; // export the struct in lib.rs
let svc = RelaySvc::default();
let (mut cli, srv) = tonic::transport::Channel::balance_channel(1);
tokio::spawn(tonic::transport::server::Server::builder()
.add_service(relay_server::RelayServer::new(svc))
.serve_with_incoming(srv));
tokio::spawn(
tonic::transport::server::Server::builder()
.add_service(relay_server::RelayServer::new(svc))
.serve_with_incoming(srv),
);
let (mut tx, mut rx) = relay_client::RelayClient::new(cli).stream().await.unwrap().into_inner();
tx.send(HidReport { data: vec![0,0,4,0,0,0,0,0] }).await.unwrap();
assert!(rx.message().await.unwrap().is_none()); // nothing echoed yet
let (mut tx, mut rx) = relay_client::RelayClient::new(cli)
.stream()
.await
.unwrap()
.into_inner();
tx.send(HidReport {
data: vec![0, 0, 4, 0, 0, 0, 0, 0],
})
.await
.unwrap();
assert!(rx.message().await.unwrap().is_none()); // nothing echoed yet
}