From 02bdc5d76b908ae9234cb284855e9acc3eb30ce0 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Mon, 30 Jun 2025 19:35:38 -0500 Subject: [PATCH] Mic Setup --- client/Cargo.toml | 3 +- client/src/app.rs | 49 +++++++++++++++++----- client/src/input/inputs.rs | 6 +-- client/src/input/microphone.rs | 74 ++++++++++++++++++++++++++++++---- client/src/output/audio.rs | 6 +-- client/src/output/video.rs | 33 +++++++++++++++ common/Cargo.toml | 2 +- common/proto/lesavka.proto | 16 ++++---- scripts/daemon/lesavka-core.sh | 4 ++ server/Cargo.toml | 2 +- server/src/audio.rs | 24 +++++++++++ server/src/main.rs | 46 ++++++++++++++++++--- 12 files changed, 224 insertions(+), 41 deletions(-) diff --git a/client/Cargo.toml b/client/Cargo.toml index 03e2de4..990e335 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.4.0" +version = "0.5.0" edition = "2024" [dependencies] @@ -27,6 +27,7 @@ winit = "0.30" raw-window-handle = "0.6" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +async-stream = "0.3" [build-dependencies] prost-build = "0.13" diff --git a/client/src/app.rs b/client/src/app.rs index f34361b..37fbb52 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -17,6 +17,7 @@ use lesavka_common::lesavka::{ }; use crate::{input::inputs::InputAggregator, + input::microphone::MicrophoneCapture, output::video::MonitorWindow, output::audio::AudioOut}; @@ -98,7 +99,7 @@ impl LesavkaClientApp { while let Ok(pkt) = video_rx.try_recv() { CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if CNT.load(std::sync::atomic::Ordering::Relaxed) % 300 == 0 { - tracing::debug!("πŸŽ₯ received {} video packets", CNT.load(std::sync::atomic::Ordering::Relaxed)); + debug!("πŸŽ₯ received {} video packets", CNT.load(std::sync::atomic::Ordering::Relaxed)); } let n = DUMP_CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if n % 120 == 0 { @@ -125,6 +126,11 @@ impl LesavkaClientApp { tokio::spawn(Self::audio_loop(ep_audio, audio_out)); + /*────────── microphone gRPC pusher ───────────*/ + let mic = Arc::new(MicrophoneCapture::new()?); + let ep_mic = vid_ep.clone(); + tokio::spawn(Self::mic_loop(ep_mic, mic)); + /*────────── central reactor ───────────────────*/ tokio::select! { _ = kbd_loop => { warn!("⚠️⌨️ keyboard stream finished"); }, @@ -149,20 +155,19 @@ impl LesavkaClientApp { /*──────────────── keyboard stream ───────────────*/ async fn stream_loop_keyboard(&self, ep: Channel) { loop { - info!("βŒ¨οΈπŸ€™ dial {}", self.server_addr); // LESAVKA-client + info!("βŒ¨οΈπŸ€™ Keyboard dial {}", self.server_addr); let mut cli = RelayClient::new(ep.clone()); - // βœ… use kbd_tx here - fixes E0271 let outbound = BroadcastStream::new(self.kbd_tx.subscribe()) .filter_map(|r| r.ok()); match cli.stream_keyboard(Request::new(outbound)).await { Ok(mut resp) => { while let Some(msg) = resp.get_mut().message().await.transpose() { - if let Err(e) = msg { warn!("⌨️ server err: {e}"); break; } + if let Err(e) = msg { warn!("⌨️ server err: {e}"); break; } } } - Err(e) => warn!("⌨️ connect failed: {e}"), + Err(e) => warn!("⌨️ connect failed: {e}"), } tokio::time::sleep(Duration::from_secs(1)).await; // retry } @@ -171,7 +176,7 @@ impl LesavkaClientApp { /*──────────────── mouse stream ──────────────────*/ async fn stream_loop_mouse(&self, ep: Channel) { loop { - info!("πŸ–±οΈπŸ€™ dial {}", self.server_addr); + info!("πŸ–±οΈπŸ€™ Mouse dial {}", self.server_addr); let mut cli = RelayClient::new(ep.clone()); let outbound = BroadcastStream::new(self.mou_tx.subscribe()) @@ -180,10 +185,10 @@ impl LesavkaClientApp { match cli.stream_mouse(Request::new(outbound)).await { Ok(mut resp) => { while let Some(msg) = resp.get_mut().message().await.transpose() { - if let Err(e) = msg { warn!("πŸ–±οΈ server err: {e}"); break; } + if let Err(e) = msg { warn!("πŸ–±οΈ server err: {e}"); break; } } } - Err(e) => warn!("πŸ–±οΈ connect failed: {e}"), + Err(e) => warn!("πŸ–±οΈ connect failed: {e}"), } tokio::time::sleep(Duration::from_secs(1)).await; } @@ -203,7 +208,7 @@ impl LesavkaClientApp { let req = MonitorRequest { id: monitor_id, max_bitrate: 6_000 }; match cli.capture_video(Request::new(req)).await { Ok(mut stream) => { - info!("πŸŽ₯ cli video{monitor_id}: stream opened"); + debug!("πŸŽ₯ cli video{monitor_id}: stream opened"); while let Some(res) = stream.get_mut().message().await.transpose() { match res { Ok(pkt) => { @@ -245,4 +250,30 @@ impl LesavkaClientApp { tokio::time::sleep(Duration::from_secs(1)).await; } } + + /*──────────────── mic stream ─────────────────*/ + async fn mic_loop(ep: Channel, mic: Arc) { + loop { + let mut cli = RelayClient::new(ep.clone()); + + let mic_clone = mic.clone(); + let stream = async_stream::stream! { + loop { + if let Some(pkt) = mic_clone.pull() { + yield pkt; + } else { + break; // EOS – should not happen + } + } + }; + + match cli.stream_microphone(Request::new(stream)).await { + Ok(mut resp) => { + while resp.get_mut().message().await?.is_some() {} + } + Err(e) => warn!("🎀 connect failed: {e}"), + } + tokio::time::sleep(Duration::from_secs(1)).await; + } + } } diff --git a/client/src/input/inputs.rs b/client/src/input/inputs.rs index 31e1aac..7b138bc 100644 --- a/client/src/input/inputs.rs +++ b/client/src/input/inputs.rs @@ -20,7 +20,6 @@ pub struct InputAggregator { keyboards: Vec, mice: Vec, camera: Option, - mic: Option, } impl InputAggregator { @@ -29,7 +28,7 @@ impl InputAggregator { mou_tx: Sender) -> Self { Self { kbd_tx, mou_tx, dev_mode, released: false, magic_active: false, keyboards: Vec::new(), mice: Vec::new(), - camera: None, mic: None } + camera: None } } /// Called once at startup: enumerates input devices, @@ -96,7 +95,6 @@ impl InputAggregator { // Stubs for camera / mic: self.camera = Some(CameraCapture::new_stub()); - self.mic = Some(MicrophoneCapture::new_stub()); Ok(()) } @@ -135,8 +133,6 @@ impl InputAggregator { mouse.process_events(); } - // camera / mic stubs go here - self.magic_active = magic_now; tick.tick().await; } diff --git a/client/src/input/microphone.rs b/client/src/input/microphone.rs index 2c5a61f..57ba3b7 100644 --- a/client/src/input/microphone.rs +++ b/client/src/input/microphone.rs @@ -1,19 +1,77 @@ // client/src/input/microphone.rs -use anyhow::Result; +#![forbid(unsafe_code)] + +use anyhow::{Context, Result}; +use gstreamer as gst; +use gstreamer_app as gst_app; +use gst::prelude::*; +use lesavka_common::lesavka::AudioPacket; +use tracing::{debug, error, info, warn}; pub struct MicrophoneCapture { - // no real fields yet + pipeline: gst::Pipeline, + sink: gst_app::AppSink, } impl MicrophoneCapture { - pub fn new_stub() -> Self { - // real code would open /dev/snd, or use cpal / rodio / etc - MicrophoneCapture {} + pub fn new() -> Result { + gst::init().ok(); // idempotent + + /* pulsesrc (default mic) β†’ AAC/ADTS β†’ appsink -------------------*/ + let desc = concat!( + "pulsesrc do-timestamp=true ! ", + "audio/x-raw,format=S16LE,channels=2,rate=48000 ! ", + "audioconvert ! audioresample ! avenc_aac bitrate=128000 ! ", + "aacparse ! capsfilter caps=audio/mpeg,stream-format=adts,rate=48000,channels=2 ! ", + "appsink name=asink emit-signals=true max-buffers=50 drop=true" + ); + + let pipeline: gst::Pipeline = gst::parse::launch(desc)? + .downcast() + .expect("pipeline"); + let sink: gst_app::AppSink = pipeline + .by_name("asink") + .unwrap() + .downcast() + .unwrap(); + + /* ─── bus for diagnostics ───────────────────────────────────────*/ + { + let bus = pipeline.bus().unwrap(); + std::thread::spawn(move || { + use gst::MessageView::*; + for msg in bus.iter_timed(gst::ClockTime::NONE) { + match msg.view() { + StateChanged(s) if s.current() == gst::State::Playing + && msg.src().map(|s| s.is::()).unwrap_or(false) => + info!("🎀 mic pipeline ▢️ (source=pulsesrc)"), + Error(e) => + error!("🎀πŸ’₯ mic: {} ({})", e.error(), e.debug().unwrap_or_default()), + Warning(w) => + warn!("🎀⚠️ mic: {} ({})", w.error(), w.debug().unwrap_or_default()), + _ => {} + } + } + }); + } + + pipeline.set_state(gst::State::Playing) + .context("start mic pipeline")?; + + Ok(Self { pipeline, sink }) } - pub fn capture_audio(&mut self) -> Result<()> { - // no-op - Ok(()) + /// Blocking pull; call from an async wrapper + pub fn pull(&self) -> Option { + match self.sink.pull_sample() { + Ok(sample) => { + let buf = sample.buffer().unwrap(); + let map = buf.map_readable().unwrap(); + let pts = buf.pts().unwrap_or(gst::ClockTime::ZERO).nseconds() / 1_000; + Some(AudioPacket { id: 0, pts, data: map.as_slice().to_vec() }) + } + Err(_) => None, + } } } diff --git a/client/src/output/audio.rs b/client/src/output/audio.rs index e045ef8..6154990 100644 --- a/client/src/output/audio.rs +++ b/client/src/output/audio.rs @@ -66,7 +66,7 @@ impl AudioOut { .build() )); src.set_format(gst::Format::Time); - + // ── 4. Log *all* warnings/errors from the bus ────────────────────── let bus = pipeline.bus().unwrap(); std::thread::spawn(move || { @@ -88,9 +88,9 @@ impl AudioOut { .map(|s| s.is::()) .unwrap_or(false) { - info!("πŸ”Š audio pipeline PLAYING (sink='{}')", sink); + info!("πŸ”Š audio pipeline ▢️ (sink='{}')", sink); } else { - debug!("πŸ”Š element {} now PLAYING", + debug!("πŸ”Š element {} now ▢️", msg.src().map(|s| s.name()).unwrap_or_default()); } }, diff --git a/client/src/output/video.rs b/client/src/output/video.rs index 374efa0..ec8170e 100644 --- a/client/src/output/video.rs +++ b/client/src/output/video.rs @@ -149,6 +149,39 @@ impl MonitorWindow { } } + { + let id = id; // move into thread + let bus = pipeline.bus().expect("no bus"); + std::thread::spawn(move || { + use gst::MessageView::*; + for msg in bus.iter_timed(gst::ClockTime::NONE) { + match msg.view() { + StateChanged(s) if s.current() == gst::State::Playing => { + if msg + .src() + .map(|s| s.is::()) + .unwrap_or(false) + { + info!( + "🎞️ video{id} pipeline ▢️ (sink='glimagesink')" + ); + } + } + Error(e) => error!( + "πŸ’₯ gst video{id}: {} ({})", + e.error(), + e.debug().unwrap_or_default() + ), + Warning(w) => warn!( + "⚠️ gst video{id}: {} ({})", + w.error(), + w.debug().unwrap_or_default() + ), + _ => {} + } + } + }); + } pipeline.set_state(gst::State::Playing)?; diff --git a/common/Cargo.toml b/common/Cargo.toml index 1e07584..4628718 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.4.0" +version = "0.5.0" edition = "2024" build = "build.rs" diff --git a/common/proto/lesavka.proto b/common/proto/lesavka.proto index 950b6f9..4ee8de2 100644 --- a/common/proto/lesavka.proto +++ b/common/proto/lesavka.proto @@ -8,13 +8,15 @@ message MonitorRequest { uint32 id = 1; uint32 max_bitrate = 2; } message VideoPacket { uint32 id = 1; uint64 pts = 2; bytes data = 3; } message AudioPacket { uint32 id = 1; uint64 pts = 2; bytes data = 3; } -message ResetUsbRequest {} // empty body -message ResetUsbReply { bool ok = 1; } // true = success +message ResetUsbReply { bool ok = 1; } // true = success + +message Empty {} service Relay { - rpc StreamKeyboard (stream KeyboardReport) returns (stream KeyboardReport); - rpc StreamMouse (stream MouseReport) returns (stream MouseReport); - rpc CaptureVideo (MonitorRequest) returns (stream VideoPacket); - rpc CaptureAudio (MonitorRequest) returns (stream AudioPacket); - rpc ResetUsb (ResetUsbRequest) returns (ResetUsbReply); + rpc StreamKeyboard (stream KeyboardReport) returns (stream KeyboardReport); + rpc StreamMouse (stream MouseReport) returns (stream MouseReport); + rpc CaptureVideo (MonitorRequest) returns (stream VideoPacket); + rpc CaptureAudio (MonitorRequest) returns (stream AudioPacket); + rpc StreamMicrophone (stream AudioPacket) returns (stream Empty) {} + rpc ResetUsb (Empty) returns (ResetUsbReply); } diff --git a/scripts/daemon/lesavka-core.sh b/scripts/daemon/lesavka-core.sh index 43c051a..e3c2ab5 100644 --- a/scripts/daemon/lesavka-core.sh +++ b/scripts/daemon/lesavka-core.sh @@ -122,6 +122,10 @@ ln -s $G/functions/hid.usb0 $G/configs/c.1/ ln -s $G/functions/hid.usb1 $G/configs/c.1/ ln -s "$U" "$G/configs/c.1/" +echo "Lesavka Keyboard" >$G/functions/hid.usb0/os_desc/interface +echo "Lesavka Mouse" >$G/functions/hid.usb1/os_desc/interface +echo "Lesavka Mic+Spkr" >$U/os_desc/interface + #────────────────────────────────────────────────── # 4. Bind gadget #────────────────────────────────────────────────── diff --git a/server/Cargo.toml b/server/Cargo.toml index e0b1280..eba7ff2 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_server" -version = "0.4.0" +version = "0.5.0" edition = "2024" [dependencies] diff --git a/server/src/audio.rs b/server/src/audio.rs index ffbd476..741e117 100644 --- a/server/src/audio.rs +++ b/server/src/audio.rs @@ -136,6 +136,30 @@ pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result { }) } +pub async fn voice( + alsa_dev: &str, +) -> anyhow::Result<(gst::Pipeline, gst_app::AppSrc)> { + gst::init()?; + + let desc = format!( + "appsrc name=src is-live=true format=time do-timestamp=true ! \ + aacparse ! avdec_aac ! audioconvert ! audioresample ! \ + alsasink device=\"{alsa_dev}\"" + ); + let pipeline: gst::Pipeline = gst::parse::launch(&desc)? + .downcast() + .unwrap(); + + let src: gst_app::AppSrc = pipeline + .by_name("src") + .unwrap() + .downcast() + .unwrap(); + + pipeline.set_state(gst::State::Playing)?; + Ok((pipeline, src)) +} + /*────────────────────────── build_pipeline_desc ───────────────────────────*/ fn build_pipeline_desc(dev: &str) -> anyhow::Result { let reg = gst::Registry::get(); diff --git a/server/src/main.rs b/server/src/main.rs index 206b2c0..bb514a8 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -11,6 +11,7 @@ use tokio::{ io::AsyncWriteExt, sync::Mutex, }; +use gstreamer as gst; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status}; use tonic::transport::Server; @@ -20,7 +21,7 @@ use tracing_subscriber::{filter::EnvFilter, fmt, prelude::*}; use tracing_appender::non_blocking::WorkerGuard; use lesavka_common::lesavka::{ - ResetUsbRequest, ResetUsbReply, + Empty, ResetUsbReply, relay_server::{Relay, RelayServer}, KeyboardReport, MouseReport, MonitorRequest, VideoPacket, AudioPacket @@ -118,10 +119,11 @@ impl Handler { #[tonic::async_trait] impl Relay for Handler { /* existing streams ─ unchanged, except: no more auto-reset */ - type StreamKeyboardStream = ReceiverStream>; - type StreamMouseStream = ReceiverStream>; - type CaptureVideoStream = Pin> + Send>>; - type CaptureAudioStream = Pin> + Send>>; + type StreamKeyboardStream = ReceiverStream>; + type StreamMouseStream = ReceiverStream>; + type CaptureVideoStream = Pin> + Send>>; + type CaptureAudioStream = Pin> + Send>>; + type StreamMicrophoneStream = ReceiverStream>; async fn stream_keyboard( &self, @@ -165,6 +167,38 @@ impl Relay for Handler { Ok(Response::new(ReceiverStream::new(rx))) } + async fn stream_microphone( + &self, + req: Request>, + ) -> Result, Status> { + + // Build playback pipeline (AppSrc β†’ alsasink) + let (_pipeline, src) = audio::voice("hw:UAC2Gadget,0") + .await + .map_err(|e| Status::internal(format!("{e:#}")))?; + + // channel just to satisfy the β€œstream Empty” return type + let (tx, rx) = tokio::sync::mpsc::channel(1); + + // forward packets from gRPC to AppSrc + tokio::spawn(async move { + let mut inbound = req.into_inner(); + while let Some(pkt) = inbound.next().await.transpose()? { + let mut buf = gst::Buffer::from_slice(pkt.data); + buf.get_mut().unwrap() + .set_pts(Some(gst::ClockTime::from_useconds(pkt.pts))); + if let Err(e) = src.push_buffer(buf) { + warn!("🎀 AppSrc push failed: {e:?}"); + } + } + // optional: send a single Empty to show EOS + let _ = tx.send(Ok(Empty {})).await; + Result::<(), Status>::Ok(()) + }); + + Ok(Response::new(ReceiverStream::new(rx))) + } + async fn capture_video( &self, req: Request, @@ -202,7 +236,7 @@ impl Relay for Handler { /*────────────── USB-reset RPC ───────────*/ async fn reset_usb( &self, - _req: Request, + _req: Request, ) -> Result, Status> { info!("πŸ”΄ explicit ResetUsb() called"); match self.gadget.cycle() {