From 8ebb4a8b9200669d36a0690b96bc869a99b6f36d Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Tue, 1 Jul 2025 10:23:51 -0500 Subject: [PATCH] increased mic logging --- client/Cargo.toml | 1 + client/src/app.rs | 43 +++++++++++++------------ client/src/input/inputs.rs | 3 +- client/src/input/microphone.rs | 47 +++++++++++++++++++++++----- client/src/tests/integration_test.rs | 4 +-- scripts/install/server.sh | 4 ++- scripts/manual/audio-clip-fetch.sh | 2 +- scripts/manual/audio-mic-fetch.sh | 14 +++++++++ server/src/main.rs | 7 +++++ 9 files changed, 91 insertions(+), 34 deletions(-) create mode 100644 scripts/manual/audio-mic-fetch.sh diff --git a/client/Cargo.toml b/client/Cargo.toml index 990e335..b63f8a6 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -28,6 +28,7 @@ raw-window-handle = "0.6" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" async-stream = "0.3" +shell-escape = "0.1" [build-dependencies] prost-build = "0.13" diff --git a/client/src/app.rs b/client/src/app.rs index 1eb5154..cb29551 100644 --- a/client/src/app.rs +++ b/client/src/app.rs @@ -14,7 +14,8 @@ use winit::{ }; use lesavka_common::lesavka::{ - relay_client::RelayClient, KeyboardReport, MonitorRequest, MouseReport, VideoPacket, + relay_client::RelayClient, KeyboardReport, + MonitorRequest, MouseReport, VideoPacket, AudioPacket }; use crate::{input::inputs::InputAggregator, @@ -51,7 +52,7 @@ impl LesavkaClientApp { /*────────── persistent gRPC channels ──────────*/ let hid_ep = Channel::from_shared(self.server_addr.clone())? .tcp_nodelay(true) - .concurrency_limit(1) + .concurrency_limit(4) .http2_keep_alive_interval(Duration::from_secs(15)) .connect_lazy(); @@ -256,32 +257,34 @@ impl LesavkaClientApp { async fn mic_loop(ep: Channel, mic: Arc) { loop { let mut cli = RelayClient::new(ep.clone()); - + + // 1. create a Tokio MPSC channel + let (tx, rx) = tokio::sync::mpsc::channel::(256); + let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>(); + + // 2. spawn a real thread that does the blocking `pull()` let mic_clone = mic.clone(); - let stream = async_stream::stream! { - loop { + std::thread::spawn(move || { + while stop_rx.try_recv().is_err() { if let Some(pkt) = mic_clone.pull() { - yield pkt; - } else { - break; // EOS (should never happen) + trace!("🎤📤 cli {} bytes → gRPC", pkt.data.len()); + let _ = tx.blocking_send(pkt); } } - }; + }); + + // 3. turn `rx` into an async stream for gRPC + let outbound = tokio_stream::wrappers::ReceiverStream::new(rx); - match cli.stream_microphone(Request::new(stream)).await { + match cli.stream_microphone(Request::new(outbound)).await { Ok(mut resp) => { - // Drain the (mostly empty) response stream - while let Some(res) = resp.get_mut().message().await.transpose() { - if let Err(e) = res { - warn!("🎤 server err: {e}"); - break; - } - } + // Drain and ignore Empty replies + while resp.get_mut().message().await.transpose().is_some() {} } Err(e) => warn!("🎤 connect failed: {e}"), } - - tokio::time::sleep(Duration::from_secs(1)).await; // retry + let _ = stop_tx.send(()); + tokio::time::sleep(Duration::from_secs(1)).await; } - } + } } diff --git a/client/src/input/inputs.rs b/client/src/input/inputs.rs index 7b138bc..f471f43 100644 --- a/client/src/input/inputs.rs +++ b/client/src/input/inputs.rs @@ -7,8 +7,7 @@ use tracing::{debug, info, warn}; use lesavka_common::lesavka::{KeyboardReport, MouseReport}; -use super::{keyboard::KeyboardAggregator, mouse::MouseAggregator, - camera::CameraCapture, microphone::MicrophoneCapture}; +use super::{keyboard::KeyboardAggregator, mouse::MouseAggregator, camera::CameraCapture}; use crate::layout::{Layout, apply as apply_layout}; pub struct InputAggregator { diff --git a/client/src/input/microphone.rs b/client/src/input/microphone.rs index 57ba3b7..936dcd5 100644 --- a/client/src/input/microphone.rs +++ b/client/src/input/microphone.rs @@ -7,7 +7,9 @@ use gstreamer as gst; use gstreamer_app as gst_app; use gst::prelude::*; use lesavka_common::lesavka::AudioPacket; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info, warn, trace}; +use shell_escape::unix::escape; +use std::sync::atomic::{AtomicU64, Ordering}; pub struct MicrophoneCapture { pipeline: gst::Pipeline, @@ -19,15 +21,24 @@ impl MicrophoneCapture { gst::init().ok(); // idempotent /* pulsesrc (default mic) → AAC/ADTS → appsink -------------------*/ - let desc = concat!( - "pulsesrc do-timestamp=true ! ", - "audio/x-raw,format=S16LE,channels=2,rate=48000 ! ", - "audioconvert ! audioresample ! avenc_aac bitrate=128000 ! ", - "aacparse ! capsfilter caps=audio/mpeg,stream-format=adts,rate=48000,channels=2 ! ", - "appsink name=asink emit-signals=true max-buffers=50 drop=true" + // Optional override: LESAVKA_MIC_SOURCE= + let device_arg = match std::env::var("LESAVKA_MIC_SOURCE") { + Ok(s) if !s.is_empty() => { + let full = Self::pulse_source_by_substr(&s).unwrap_or(s); + format!("device={}", escape(full.into())) + } + _ => String::new(), + }; + debug!("🎤 device: {device_arg}"); + let desc = format!( + "pulsesrc {device_arg} do-timestamp=true ! \ + audio/x-raw,format=S16LE,channels=2,rate=48000 ! \ + audioconvert ! audioresample ! avenc_aac bitrate=128000 ! \ + aacparse ! capsfilter caps=audio/mpeg,stream-format=adts,rate=48000,channels=2 ! \ + appsink name=asink emit-signals=true max-buffers=50 drop=true" ); - let pipeline: gst::Pipeline = gst::parse::launch(desc)? + let pipeline: gst::Pipeline = gst::parse::launch(&desc)? .downcast() .expect("pipeline"); let sink: gst_app::AppSink = pipeline @@ -69,9 +80,29 @@ impl MicrophoneCapture { let buf = sample.buffer().unwrap(); let map = buf.map_readable().unwrap(); let pts = buf.pts().unwrap_or(gst::ClockTime::ZERO).nseconds() / 1_000; + static CNT: AtomicU64 = AtomicU64::new(0); + let n = CNT.fetch_add(1, Ordering::Relaxed); + if n < 10 || n % 300 == 0 { + trace!("🎤⇧ cli pkt#{n} {} bytes", map.len()); + } Some(AudioPacket { id: 0, pts, data: map.as_slice().to_vec() }) + } Err(_) => None, } } + + fn pulse_source_by_substr(fragment: &str) -> Option { + use std::process::Command; + let out = Command::new("pactl").args(["list", "short", "sources"]) + .output().ok()?; + let list = String::from_utf8_lossy(&out.stdout); + list.lines() + .find_map(|ln| { + let mut cols = ln.split_whitespace(); + let _id = cols.next()?; + let name = cols.next()?; // column #1 + if name.contains(fragment) { Some(name.to_owned()) } else { None } + }) + } } diff --git a/client/src/tests/integration_test.rs b/client/src/tests/integration_test.rs index f351460..7b7a3f1 100644 --- a/client/src/tests/integration_test.rs +++ b/client/src/tests/integration_test.rs @@ -9,7 +9,7 @@ mod tests { fn test_keycode_mapping() { assert_eq!(keycode_to_usage(Key::KEY_A), Some(0x04)); assert_eq!(keycode_to_usage(Key::KEY_Z), Some(0x1D)); - assert_eq!(keycode_to_usage(Key::KEY_LEFTCTRL), Some(0)); // this is "handled" by is_modifier + assert_eq!(keycode_to_usage(Key::KEY_LEFTCTRL), Some(0)); assert!(is_modifier(Key::KEY_LEFTCTRL).is_some()); } -} \ No newline at end of file +} diff --git a/scripts/install/server.sh b/scripts/install/server.sh index ae4af81..3bef8ac 100755 --- a/scripts/install/server.sh +++ b/scripts/install/server.sh @@ -20,7 +20,8 @@ sudo pacman -Syq --needed --noconfirm git \ gst-plugins-bad-libs \ gst-plugins-ugly \ gst-libav \ - tcpdump + tcpdump \ + lsof if ! command -v yay >/dev/null 2>&1; then echo "==> 1b. installing yay from AUR ..." sudo -u "$ORIG_USER" bash -c ' @@ -123,6 +124,7 @@ ExecStart=/usr/local/bin/lesavka-server Restart=always Environment=RUST_LOG=lesavka_server=trace,lesavka_server::audio=trace,lesavka_server::video=trace,lesavka_server::gadget=trace Environment=RUST_BACKTRACE=1 +Environment=GST_DEBUG="*:2,alsasink:6,alsasrc:6" Restart=always RestartSec=5 StandardError=append:/tmp/lesavka-server.stderr diff --git a/scripts/manual/audio-clip-fetch.sh b/scripts/manual/audio-clip-fetch.sh index 8d4ae24..f8ec154 100644 --- a/scripts/manual/audio-clip-fetch.sh +++ b/scripts/manual/audio-clip-fetch.sh @@ -4,7 +4,7 @@ # Pull & play the most recent 1 s AAC clip from lesavka‑server PI_HOST="nikto@192.168.42.253" # adjust REMOTE_DIR="/tmp" -DEST="$(mktemp -u).aac" +DEST="$(mktemp -u).wav" scp "${PI_HOST}:${REMOTE_DIR}/ear-*.aac" "$DEST" 2>/dev/null \ || { echo "❌ no clip files yet"; exit 1; } diff --git a/scripts/manual/audio-mic-fetch.sh b/scripts/manual/audio-mic-fetch.sh new file mode 100644 index 0000000..8d4ae24 --- /dev/null +++ b/scripts/manual/audio-mic-fetch.sh @@ -0,0 +1,14 @@ +#!/usr/bin/env bash +# scripts/manual/audio-clip-fetch.sh + +# Pull & play the most recent 1 s AAC clip from lesavka‑server +PI_HOST="nikto@192.168.42.253" # adjust +REMOTE_DIR="/tmp" +DEST="$(mktemp -u).aac" + +scp "${PI_HOST}:${REMOTE_DIR}/ear-*.aac" "$DEST" 2>/dev/null \ + || { echo "❌ no clip files yet"; exit 1; } + +LATEST=$(ls -1t ear-*.aac | head -n1) +echo "🎧 playing ${LATEST} ..." +gst-play-1.0 --quiet "${LATEST}" diff --git a/server/src/main.rs b/server/src/main.rs index bb514a8..cd3f046 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -184,6 +184,13 @@ impl Relay for Handler { tokio::spawn(async move { let mut inbound = req.into_inner(); while let Some(pkt) = inbound.next().await.transpose()? { + static CNT: std::sync::atomic::AtomicU64 = + std::sync::atomic::AtomicU64::new(0); + let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + if n < 10 || n % 300 == 0 { + trace!("🎤⬇ srv pkt#{n} {} bytes", pkt.data.len()); + } + let mut buf = gst::Buffer::from_slice(pkt.data); buf.get_mut().unwrap() .set_pts(Some(gst::ClockTime::from_useconds(pkt.pts)));