0.4.0 mostly complete milestone

This commit is contained in:
Brad Stein 2025-06-29 03:46:34 -05:00
parent c781d8b98e
commit 112deaf5be
24 changed files with 271 additions and 104 deletions

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package]
name = "lesavka_client"
version = "0.3.0"
version = "0.4.0"
edition = "2024"
[dependencies]

View File

@ -16,7 +16,9 @@ use lesavka_common::lesavka::{
relay_client::RelayClient, KeyboardReport, MonitorRequest, MouseReport, VideoPacket,
};
use crate::{input::inputs::InputAggregator, output::video::MonitorWindow};
use crate::{input::inputs::InputAggregator,
output::video::MonitorWindow,
output::audio::AudioOut};
pub struct LesavkaClientApp {
aggregator: Option<InputAggregator>,
@ -84,8 +86,8 @@ impl LesavkaClientApp {
std::thread::spawn(move || {
let el = EventLoopBuilder::<()>::new().with_any_thread(true).build().unwrap();
let win0 = MonitorWindow::new(0, &el).expect("win0");
let win1 = MonitorWindow::new(1, &el).expect("win1");
let win0 = MonitorWindow::new(0).expect("win0");
let win1 = MonitorWindow::new(1).expect("win1");
let _ = el.run(move |_: Event<()>, _elwt| {
_elwt.set_control_flow(ControlFlow::WaitUntil(
@ -113,7 +115,14 @@ impl LesavkaClientApp {
});
/*────────── start video gRPC pullers ──────────*/
tokio::spawn(Self::video_loop(vid_ep.clone(), video_tx));
let ep_video = vid_ep.clone();
tokio::spawn(Self::video_loop(ep_video, video_tx));
/*────────── audio renderer & puller ───────────*/
let audio_out = AudioOut::new()?;
let ep_audio = vid_ep.clone();
tokio::spawn(Self::audio_loop(ep_audio, audio_out));
/*────────── central reactor ───────────────────*/
tokio::select! {
@ -218,4 +227,21 @@ impl LesavkaClientApp {
});
}
}
/*──────────────── audio stream ───────────────*/
async fn audio_loop(ep: Channel, out: AudioOut) {
loop {
let mut cli = RelayClient::new(ep.clone());
let req = MonitorRequest { id: 0, max_bitrate: 0 };
match cli.capture_audio(Request::new(req)).await {
Ok(mut stream) => {
while let Some(res) = stream.get_mut().message().await.transpose() {
if let Ok(pkt) = res { out.push(pkt); }
}
}
Err(e) => tracing::warn!("audio stream err: {e}"),
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}

View File

@ -1,7 +1,7 @@
// client/src/input/inputs.rs
use anyhow::{bail, Context, Result};
use evdev::{Device, EventType, KeyCode, RelativeAxisCode};
use evdev::{Device, EventType, KeyCode, RelativeAxisCode, ReadFlag, WriteFlag};
use tokio::{sync::broadcast::Sender, time::{interval, Duration}};
use tracing::{debug, info, warn};
@ -49,12 +49,13 @@ impl InputAggregator {
}
// attempt open
let mut dev = match Device::open(&path) {
let mut dev = match Device::open_with_options(
&path,
ReadFlag::NORMAL | ReadFlag::BLOCKING,
WriteFlag::NORMAL, // <-- write access needed for EVIOCGRAB
) {
Ok(d) => d,
Err(e) => {
tracing::debug!("Skipping {:?}, open error {e}", path);
continue;
}
Err(e) => { }
};
// non-blocking so fetch_events never stalls the whole loop

View File

@ -94,3 +94,12 @@ impl KeyboardAggregator {
&& self.has_key(KeyCode::KEY_ESC)
}
}
impl Drop for KeyboardAggregator {
fn drop(&mut self) {
let _ = self.dev.ungrab();
let _ = self.tx.send(KeyboardReport {
data: [0; 8].into(),
});
}
}

View File

@ -72,8 +72,8 @@ impl MouseAggregator {
}
fn flush(&mut self) {
if Instant::now() < self.next_send { return; }
self.next_send += SEND_INTERVAL;
if self.buttons == self.last_buttons && Instant::now() < self.next_send { return; }
self.next_send = Instant::now() + SEND_INTERVAL;
let pkt = [
self.buttons,
@ -100,3 +100,12 @@ impl MouseAggregator {
}
}
impl Drop for MouseAggregator {
fn drop(&mut self) {
let _ = self.dev.ungrab();
let _ = self.tx.send(MouseReport {
data: [0; 8].into(),
});
}
}

View File

@ -0,0 +1,46 @@
// client/src/output/audio.rs
use gstreamer as gst;
use gstreamer_app as gst_app;
use lesavka_common::lesavka::AudioPacket;
use gst::prelude::*;
pub struct AudioOut {
src: gst_app::AppSrc,
}
impl AudioOut {
pub fn new() -> anyhow::Result<Self> {
gst::init()?;
// Autoaudiosink picks PipeWire / Pulse etc.
const PIPE: &str =
"appsrc name=src is-live=true format=time do-timestamp=true block=false ! \
queue leaky=downstream ! aacparse ! avdec_aac ! audioresample ! autoaudiosink";
// `parse_launch()` returns `gst::Element`; downcast manually and
// map the error into anyhow ourselves (no `?` on the downcast).
let pipeline: gst::Pipeline = gst::parse::launch(PIPE)?
.downcast::<gst::Pipeline>()
.expect("not a pipeline");
let src: gst_app::AppSrc = pipeline
.by_name("src")
.expect("no src element")
.downcast::<gst_app::AppSrc>()
.expect("src not an AppSrc");
src.set_format(gst::Format::Time);
pipeline.set_state(gst::State::Playing)?;
Ok(Self { src })
}
pub fn push(&self, pkt: AudioPacket) {
let mut buf = gst::Buffer::from_slice(pkt.data);
buf.get_mut()
.unwrap()
.set_pts(Some(gst::ClockTime::from_useconds(pkt.pts)));
let _ = self.src.push_buffer(buf);
}
}

View File

@ -1,3 +1,4 @@
// client/src/output/mod.rs
pub mod audio;
pub mod video;

View File

@ -1,84 +1,70 @@
// client/src/output/video.rs
use anyhow::Context;
use gstreamer as gst;
use gstreamer_app as gst_app;
use gst::prelude::*;
use lesavka_common::lesavka::VideoPacket;
use winit::{
event_loop::EventLoop,
window::{Window, WindowAttributes},
};
const DESC: &str = concat!(
/* ---------- pipeline ----------------------------------------------------
* H.264/AU Decoded
* AppSrc decodebin glimagesink
* (autoplug) (overlay) |
* ----------------------------------------------------------------------*/
const PIPELINE_DESC: &str = concat!(
"appsrc name=src is-live=true format=time do-timestamp=true block=false ! ",
"queue max-size-buffers=0 max-size-bytes=0 max-size-time=0 leaky=downstream ! ",
"queue leaky=downstream ! ",
"capsfilter caps=video/x-h264,stream-format=byte-stream,alignment=au ! ",
"h264parse disable-passthrough=true ! decodebin ! ",
"queue ! videoconvert ! autovideosink sync=false"
"h264parse disable-passthrough=true ! decodebin ! videoconvert ! ",
"glimagesink name=sink sync=false"
);
pub struct MonitorWindow {
id: u32,
_window: Window,
src: gst_app::AppSrc,
_pipeline: gst::Pipeline,
src: gst_app::AppSrc,
}
impl MonitorWindow {
pub fn new(id: u32, el: &EventLoop<()>) -> anyhow::Result<Self> {
gst::init()?; // idempotent
pub fn new(_id: u32) -> anyhow::Result<Self> {
gst::init().context("initialising GStreamer")?;
/* ---------- Wayland / X11 window ------------- */
let window = el
.create_window(
WindowAttributes::default()
.with_title(format!("Lesavka-eye-{id}"))
.with_decorations(true),
)?;
// --- Build pipeline ------------------------------------------------
let pipeline: gst::Pipeline = gst::parse::launch(PIPELINE_DESC)?
.downcast::<gst::Pipeline>()
.expect("not a pipeline");
/* ---------- GStreamer pipeline --------------- */
let caps = gst::Caps::builder("video/x-h264")
// Optional: turn the sink fullscreen when LESAVKA_FULLSCREEN=1
if std::env::var("LESAVKA_FULLSCREEN").is_ok() {
if let Some(sink) = pipeline.by_name("sink") {
sink.set_property_from_str("fullscreen", "true");
sink.set_property("force-aspect-ratio", &true);
// Wayland & GL sinks accept it :contentReference[oaicite:1]{index=1}
}
}
/* ---------- AppSrc ------------------------------------------------- */
let src: gst_app::AppSrc = pipeline
.by_name("src")
.unwrap()
.downcast::<gst_app::AppSrc>()
.unwrap();
src.set_caps(Some(&gst::Caps::builder("video/x-h264")
.field("stream-format", &"byte-stream")
.field("alignment", &"au")
.build();
// let pipeline = gst::parse::launch(DESC)?
// .downcast::<gst::Pipeline>()
// .expect("pipeline down-cast");
let pipeline: gst::Pipeline = gst::parse::launch(DESC)?
.downcast()
.expect("pipeline");
// let src = pipeline
// .by_name("src")
// .expect("appsrc element not found")
// .downcast::<gst_app::AppSrc>()
// .expect("appsrc down-cast");
let src: gst_app::AppSrc = pipeline.by_name("src")
.expect("appsrc")
.downcast()
.expect("appsrc");
src.set_caps(Some(&caps));
src.set_format(gst::Format::Time); // downstream clock
src.set_property("blocksize", &0u32); // one AU per buffer
// NOTE: set_property() and friends return (), so no `?`
src.set_property("do-timestamp", &true);
src.set_latency(gst::ClockTime::NONE, gst::ClockTime::NONE);
.field("alignment", &"au")
.build()));
src.set_format(gst::Format::Time);
pipeline.set_state(gst::State::Playing)?;
Ok(Self { id, _window: window, src })
Ok(Self { _pipeline: pipeline, src })
}
/// Feed one H.264 access-unit into the pipeline.
/// Feed one access-unit to the decoder.
pub fn push_packet(&self, pkt: VideoPacket) {
// Mutable so we can set the PTS:
let mut buf = gst::Buffer::from_slice(pkt.data);
// if let Some(ref mut b) = buf.get_mut() {
// b.set_pts(Some(gst::ClockTime::from_useconds(pkt.pts)));
// }
{
let b = buf.get_mut().unwrap();
b.set_pts(Some(gst::ClockTime::from_useconds(pkt.pts)));
}
let _ = self.src.push_buffer(buf); // ignore Eos / flushing
buf.get_mut()
.unwrap()
.set_pts(Some(gst::ClockTime::from_useconds(pkt.pts)));
let _ = self.src.push_buffer(buf); // ignore Eos/flushing
}
}

View File

@ -1,11 +1,12 @@
[[bin]]
name = "lesavka-common"
path = "build.rs"
[package]
name = "lesavka_common"
version = "0.3.0"
version = "0.4.0"
edition = "2024"
build = "build.rs"
[lib]
name = "lesavka_common"
path = "src/lib.rs"
[dependencies]
tonic = { version = "0.13", features = ["transport"] }
@ -14,6 +15,6 @@ prost = "0.13"
[build-dependencies]
tonic-build = { version = "0.13", features = ["prost"] }
[lib]
name = "lesavka_common"
path = "src/lib.rs"
[[bin]]
name = "lesavka-common"
path = "src/bin/cli.rs"

View File

@ -6,6 +6,7 @@ message MouseReport { bytes data = 1; }
message MonitorRequest { uint32 id = 1; uint32 max_bitrate = 2; }
message VideoPacket { uint32 id = 1; uint64 pts = 2; bytes data = 3; }
message AudioPacket { uint32 id = 1; uint64 pts = 2; bytes data = 3; }
message ResetUsbRequest {} // empty body
message ResetUsbReply { bool ok = 1; } // true = success
@ -14,5 +15,6 @@ service Relay {
rpc StreamKeyboard (stream KeyboardReport) returns (stream KeyboardReport);
rpc StreamMouse (stream MouseReport) returns (stream MouseReport);
rpc CaptureVideo (MonitorRequest) returns (stream VideoPacket);
rpc CaptureAudio (MonitorRequest) returns (stream AudioPacket);
rpc ResetUsb (ResetUsbRequest) returns (ResetUsbReply);
}

3
common/src/bin/cli.rs Normal file
View File

@ -0,0 +1,3 @@
fn main() {
lesavka_common::run_cli();
}

View File

@ -4,3 +4,7 @@
pub mod lesavka {
include!(concat!(env!("OUT_DIR"), "/lesavka.rs"));
}
pub fn run_cli() {
println!("lesavka-common CLI (v{})", env!("CARGO_PKG_VERSION"));
}

View File

@ -1,5 +1,5 @@
#!/usr/bin/env bash
# lesavka-core.sh - background stealth daemon to present gadget as usb hub of genuine devices
# scripts/daemon/lesavka-core.sh - background stealth daemon to present gadget as usb hub of genuine devices
# Proven Pi-5 configfs gadget: HID keyboard+mouse
# Still need Web Cam Support + stereo UAC2

View File

@ -5,8 +5,9 @@ set -euo pipefail
ORIG_USER=${SUDO_USER:-$(id -un)}
# 1. packages (Arch)
sudo pacman -Syq --needed --noconfirm git rustup protobuf gcc evtest gstreamer gst-plugins-base gst-plugins-good gst-plugins-bad gst-plugins-ugly gst-libav
sudo pacman -Syq --needed --noconfirm git rustup protobuf gcc evtest gstreamer gst-plugins-base gst-plugins-good gst-plugins-bad gst-plugins-ugly gst-libav pipewire pipewire-pulse
yay -S --noconfirm grpcurl-bin
sudo usermod -aG input "$ORIG_USER"
# 2. Rust tool-chain for both root & user
sudo rustup default stable

View File

@ -105,7 +105,7 @@ After=network.target lesavka-core.service
[Service]
ExecStart=/usr/local/bin/lesavka-server
Restart=always
Environment=RUST_LOG=lesavka_server=info,lesavka_server::video=trace,lesavka_server::usb_gadget=debug
Environment=RUST_LOG=lesavka_server=info,lesavka_server::audio=info,lesavka_server::video=info,lesavka_server::usb_gadget=info
Environment=RUST_BACKTRACE=1
Restart=always
RestartSec=5

View File

@ -6,5 +6,5 @@ grpcurl \
-import-path ./../../common/proto \
-proto lesavka.proto \
-d '{}' \
192.168.42.253:50051 \
64.25.10.31:50051 \
lesavka.Relay/ResetUsb

View File

@ -0,0 +1,4 @@
#!/usr/bin/env bash
# scripts/manual/vpn-open.sh
sudo openvpn --config ~/cyberghost/openvpn.ovpn

View File

@ -0,0 +1,4 @@
#!/usr/bin/env bash
# scripts/manual/vpn-test.sh
set -x IP $(curl -s https://api.ipify.org) && echo $IP && curl http://ip-api.com/json/$IP?fields=country,city,lat,lon

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package]
name = "lesavka_server"
version = "0.3.0"
version = "0.4.0"
edition = "2024"
[dependencies]

59
server/src/audio.rs Normal file
View File

@ -0,0 +1,59 @@
use anyhow::Context;
use futures_util::Stream;
use gstreamer as gst;
use gstreamer_app as gst_app;
use lesavka_common::lesavka::AudioPacket;
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;
use tracing::{debug, trace};
use gst::prelude::*;
const PIPE: &str = "appsrc name=audsrc is-live=true do-timestamp=true ! aacparse ! queue ! appsink name=asink emit-signals=true";
pub struct AudioStream {
_pipe: gst::Pipeline,
inner: ReceiverStream<Result<AudioPacket, Status>>,
}
impl Stream for AudioStream {
type Item = Result<AudioPacket, Status>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
Stream::poll_next(std::pin::Pin::new(&mut self.inner), cx)
}
}
pub async fn eye_ear(
dev: &str,
id: u32,
) -> anyhow::Result<AudioStream> {
gst::init().context("gst init")?;
let desc = format!("v4l2src device={dev} io-mode=mmap !
queue ! tsdemux ! aacparse !
queue ! appsink name=asink emit-signals=true");
let pipe: gst::Pipeline = gst::parse::launch(&desc)?
.downcast()
.expect("pipeline");
let sink = pipe.by_name("asink").expect("asink").downcast::<gst_app::AppSink>().unwrap();
let (tx, rx) = tokio::sync::mpsc::channel(8192);
sink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample(move |s| {
let samp = s.pull_sample().map_err(|_| gst::FlowError::Eos)?;
let buf = samp.buffer().ok_or(gst::FlowError::Error)?;
let map = buf.map_readable().map_err(|_| gst::FlowError::Error)?;
let pts = buf.pts().unwrap_or(gst::ClockTime::ZERO).nseconds()/1_000;
let pkt = AudioPacket { id, pts, data: map.as_slice().to_vec() };
trace!("srv→grpc audio-{id} {}", pkt.data.len());
let _ = tx.try_send(Ok(pkt));
Ok(gst::FlowSuccess::Ok)
}).build()
);
pipe.set_state(gst::State::Playing)?;
Ok(AudioStream{ _pipe: pipe, inner: ReceiverStream::new(rx) })
}

View File

@ -1,4 +1,4 @@
// server/src/usb_gadget.rs
// server/src/gadget.rs
use std::{fs::{self, OpenOptions}, io::Write, path::Path, thread, time::Duration};
use anyhow::{Context, Result};
use tracing::{info, warn, trace};

View File

@ -1,4 +1,5 @@
// server/src/lib.rs
pub mod audio;
pub mod video;
pub mod usb_gadget;
pub mod gadget;

View File

@ -22,10 +22,11 @@ use tracing_appender::non_blocking::WorkerGuard;
use lesavka_common::lesavka::{
ResetUsbRequest, ResetUsbReply,
relay_server::{Relay, RelayServer},
KeyboardReport, MouseReport, MonitorRequest, VideoPacket,
KeyboardReport, MouseReport,
MonitorRequest, VideoPacket, AudioPacket
};
use lesavka_server::{usb_gadget::UsbGadget, video};
use lesavka_server::{usb_gadget::UsbGadget, video, audio};
/*──────────────── constants ────────────────*/
/// **false** = never reset automatically.
@ -120,6 +121,7 @@ impl Relay for Handler {
type StreamKeyboardStream = ReceiverStream<Result<KeyboardReport, Status>>;
type StreamMouseStream = ReceiverStream<Result<MouseReport, Status>>;
type CaptureVideoStream = Pin<Box<dyn Stream<Item=Result<VideoPacket,Status>> + Send>>;
type CaptureAudioStream = Pin<Box<dyn Stream<Item=Result<AudioPacket,Status>> + Send>>;
async fn stream_keyboard(
&self,
@ -180,6 +182,20 @@ impl Relay for Handler {
Ok(Response::new(Box::pin(s)))
}
async fn capture_audio(
&self,
req: Request<MonitorRequest>,
) -> Result<Response<Self::CaptureAudioStream>, Status> {
let id = req.into_inner().id;
let dev = match id { 0 => "/dev/lesavka_l_eye",
1 => "/dev/lesavka_r_eye",
_ => return Err(Status::invalid_argument("monitor id must be 0 or 1")) };
let s = audio::eye_ear(dev, id)
.await
.map_err(|e| Status::internal(format!("{e:#}")))?;
Ok(Response::new(Box::pin(s)))
}
/*────────────── USB-reset RPC ───────────*/
async fn reset_usb(
&self,

View File

@ -46,24 +46,18 @@ pub async fn eye_ball(
// let desc = format!(
// "v4l2src device={dev} io-mode=mmap ! \
// video/x-h264,stream-format=byte-stream,alignment=au,profile=high ! \
// h264parse config-interval=-1 ! \
// appsink name=sink emit-signals=true drop=true sync=false"
// queue max-size-time=0 ! tsdemux name=d ! \
// d. ! h264parse config-interval=1 ! queue ! appsink name=vsink emit-signals=true \
// d. ! aacparse ! queue ! appsink name=asink emit-signals=true"
// );
let desc = format!(
"v4l2src device={dev} io-mode=mmap ! \
queue max-size-buffers=0 max-size-bytes=0 max-size-time=0 ! \
video/x-h264,stream-format=byte-stream,alignment=au,profile=high ! \
h264parse config-interval=1 ! \
appsink name=sink emit-signals=true drop=false sync=false"
queue max-size-buffers=0 max-size-bytes=0 max-size-time=0 ! tsdemux name=d ! \
video/x-h264,stream-format=byte-stream,alignment=au,profile=high ! tsdemux name=d ! \
d. ! h264parse config-interval=1 ! queue ! appsink name=vsink emit-signals=true \
d. ! aacparse ! queue ! h264parse config-interval=1 ! appsink name=sink \
emit-signals=true drop=false sync=false"
);
// let desc = format!(
// "v4l2src device={dev} io-mode=mmap ! videorate skip-to-first=true ! \
// queue2 max-size-buffers=0 max-size-bytes=0 min-threshold-time=10000000 ! \
// video/x-h264,stream-format=byte-stream,alignment=au,profile=high ! \
// h264parse config-interval=1 ! \
// appsink name=sink emit-signals=true drop=false sync=false"
// );
let pipeline = gst::parse::launch(&desc)?
.downcast::<gst::Pipeline>()