fix(audio): recover stalled speaker capture

This commit is contained in:
Brad Stein 2026-04-21 12:06:40 -03:00
parent b6cf15767d
commit 6fb1ad527e
6 changed files with 228 additions and 36 deletions

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package]
name = "lesavka_client"
version = "0.11.32"
version = "0.11.33"
edition = "2024"
[dependencies]

View File

@ -111,8 +111,8 @@ const LESAVKA_ICON_SEARCH_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/ass
const LAUNCHER_DEFAULT_WIDTH: i32 = 1380;
const LAUNCHER_DEFAULT_HEIGHT: i32 = 860;
const OPERATIONS_RAIL_WIDTH: i32 = 288;
const CAMERA_PREVIEW_VIEWPORT_HEIGHT: i32 = 108;
const CAMERA_PREVIEW_VIEWPORT_WIDTH: i32 = 192;
const CAMERA_PREVIEW_VIEWPORT_HEIGHT: i32 = 144;
const CAMERA_PREVIEW_VIEWPORT_WIDTH: i32 = 256;
pub fn build_launcher_view(
app: &gtk::Application,
@ -306,11 +306,10 @@ pub fn build_launcher_view(
&microphone_test_button,
);
let audio_check_detail = gtk::Label::new(Some(
"Monitor Mic listens locally, Replay Last 3s replays the latest captured mic audio, and Play Tone verifies the speaker path.",
));
let audio_check_detail = gtk::Label::new(Some("Idle"));
audio_check_detail.add_css_class("dim-label");
audio_check_detail.set_wrap(true);
audio_check_detail.set_wrap(false);
audio_check_detail.set_ellipsize(pango::EllipsizeMode::End);
audio_check_detail.set_xalign(0.0);
let audio_check_meter = gtk::ProgressBar::new();
audio_check_meter.add_css_class("audio-check-meter");
@ -322,11 +321,15 @@ pub fn build_launcher_view(
preview_panel.set_hexpand(true);
preview_panel.set_vexpand(false);
preview_panel.set_valign(gtk::Align::Fill);
preview_body.set_vexpand(true);
preview_body.set_spacing(6);
preview_body.set_vexpand(false);
preview_body.set_spacing(8);
let testing_row = gtk::Box::new(gtk::Orientation::Horizontal, 8);
testing_row.set_hexpand(true);
testing_row.set_vexpand(false);
testing_row.set_valign(gtk::Align::Start);
let camera_preview = gtk::Picture::new();
camera_preview.set_can_shrink(false);
camera_preview.set_hexpand(true);
camera_preview.set_hexpand(false);
camera_preview.set_vexpand(false);
camera_preview.set_size_request(
CAMERA_PREVIEW_VIEWPORT_WIDTH,
@ -339,40 +342,56 @@ pub fn build_launcher_view(
camera_status.set_wrap(false);
camera_status.set_ellipsize(pango::EllipsizeMode::End);
camera_status.set_xalign(0.0);
camera_status.set_visible(true);
camera_status.set_visible(false);
let camera_preview_shell = gtk::Box::new(gtk::Orientation::Vertical, 0);
camera_preview_shell.set_hexpand(true);
camera_preview_shell.set_hexpand(false);
camera_preview_shell.set_vexpand(false);
camera_preview_shell.set_size_request(-1, CAMERA_PREVIEW_VIEWPORT_HEIGHT);
camera_preview_shell.set_halign(gtk::Align::Center);
camera_preview_shell.set_size_request(
CAMERA_PREVIEW_VIEWPORT_WIDTH,
CAMERA_PREVIEW_VIEWPORT_HEIGHT,
);
let camera_preview_frame = gtk::AspectFrame::new(0.5, 0.5, 16.0 / 9.0, false);
camera_preview_frame.set_hexpand(true);
camera_preview_frame.set_hexpand(false);
camera_preview_frame.set_vexpand(false);
camera_preview_frame.set_size_request(-1, CAMERA_PREVIEW_VIEWPORT_HEIGHT);
camera_preview_frame.set_size_request(
CAMERA_PREVIEW_VIEWPORT_WIDTH,
CAMERA_PREVIEW_VIEWPORT_HEIGHT,
);
camera_preview_frame.set_child(Some(&camera_preview));
camera_preview_shell.append(&camera_preview_frame);
let webcam_group = build_subgroup("Webcam Preview");
webcam_group.set_hexpand(true);
webcam_group.set_vexpand(false);
webcam_group.set_valign(gtk::Align::Start);
webcam_group.append(&camera_preview_shell);
webcam_group.append(&camera_status);
preview_body.append(&webcam_group);
testing_row.append(&webcam_group);
let playback_group = build_subgroup("Mic Playback");
playback_group.set_vexpand(true);
playback_group.set_valign(gtk::Align::Fill);
let playback_body = gtk::Box::new(gtk::Orientation::Vertical, 6);
let playback_row = gtk::Box::new(gtk::Orientation::Horizontal, 8);
playback_row.set_homogeneous(false);
playback_group.set_hexpand(false);
playback_group.set_vexpand(false);
playback_group.set_valign(gtk::Align::Start);
playback_group.set_size_request(148, -1);
let playback_body = gtk::Box::new(gtk::Orientation::Horizontal, 8);
playback_body.set_vexpand(false);
let playback_controls = gtk::Box::new(gtk::Orientation::Vertical, 6);
playback_controls.set_hexpand(true);
let microphone_replay_button = gtk::Button::with_label("Replay Last 3s");
stabilize_button(&microphone_replay_button, 124);
audio_check_meter.set_hexpand(true);
audio_check_meter.set_show_text(true);
audio_check_meter.set_orientation(gtk::Orientation::Vertical);
audio_check_meter.set_inverted(true);
audio_check_meter.set_hexpand(false);
audio_check_meter.set_vexpand(false);
audio_check_meter.set_size_request(20, CAMERA_PREVIEW_VIEWPORT_HEIGHT - 38);
audio_check_meter.set_show_text(false);
audio_check_meter.set_text(Some("Idle"));
playback_row.append(&microphone_replay_button);
playback_row.append(&audio_check_meter);
playback_body.append(&playback_row);
audio_check_detail.set_visible(false);
playback_body.append(&audio_check_detail);
playback_controls.append(&microphone_replay_button);
playback_controls.append(&audio_check_detail);
playback_body.append(&audio_check_meter);
playback_body.append(&playback_controls);
playback_group.append(&playback_body);
preview_body.append(&playback_group);
testing_row.append(&playback_group);
preview_body.append(&testing_row);
staging_row.append(&preview_panel);
let (connection_panel, connection_body) = build_panel("Session");
@ -817,10 +836,14 @@ pub fn install_css(window: &gtk::ApplicationWindow) {
padding: 10px;
}
progressbar.audio-check-meter trough {
min-width: 14px;
min-height: 10px;
border-radius: 999px;
background: rgba(255, 255, 255, 0.08);
}
progressbar.audio-check-meter.vertical trough {
min-height: 96px;
}
progressbar.audio-check-meter progress {
border-radius: 999px;
background: rgba(91, 179, 162, 0.88);

View File

@ -1,6 +1,6 @@
[package]
name = "lesavka_common"
version = "0.11.32"
version = "0.11.33"
edition = "2024"
build = "build.rs"

View File

@ -17,6 +17,6 @@ mod tests {
#[test]
fn banner_includes_version() {
assert_eq!(banner("0.11.32"), "lesavka-common CLI (v0.11.32)");
assert_eq!(banner("0.11.33"), "lesavka-common CLI (v0.11.33)");
}
}

View File

@ -10,7 +10,7 @@ bench = false
[package]
name = "lesavka_server"
version = "0.11.32"
version = "0.11.33"
edition = "2024"
autobins = false

View File

@ -9,11 +9,14 @@ use gst::MessageView::*;
use gst::prelude::*;
use gstreamer as gst;
use gstreamer_app as gst_app;
use std::sync::{Arc, Mutex};
use std::sync::{
Arc, Mutex,
atomic::{AtomicBool, AtomicU64, Ordering},
};
use std::time::{Duration, Instant};
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;
use tracing::{debug, error, warn};
use tracing::{debug, error, info, warn};
use lesavka_common::lesavka::AudioPacket;
@ -96,6 +99,7 @@ pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result<AudioStream> {
// });
let (tx, rx) = tokio::sync::mpsc::channel(8192);
let source_health = Arc::new(AudioSourceHealth::new());
let bus = pipeline.bus().expect("bus");
std::thread::spawn(move || {
@ -114,6 +118,15 @@ pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result<AudioStream> {
StateChanged(s) if s.current() == gst::State::Playing => {
debug!("🎶 audio pipeline PLAYING")
}
Element(e) => {
if let Some(structure) = e.structure() {
if structure.name() == "level" {
info!("🔊 source audio level {}", structure);
} else {
debug!("🔎 audio element message: {}", structure);
}
}
}
_ => {}
}
}
@ -124,10 +137,16 @@ pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result<AudioStream> {
gst_app::AppSinkCallbacks::builder()
.new_sample({
let tap = tap.clone();
let source_health = source_health.clone();
let tx = tx.clone();
move |s| {
if source_health.is_closed() {
return Err(gst::FlowError::Flushing);
}
let sample = s.pull_sample().map_err(|_| gst::FlowError::Eos)?;
let buffer = sample.buffer().ok_or(gst::FlowError::Error)?;
let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?;
source_health.mark_sample();
// -------- cliptap (minute dumps) ------------
tap.lock().unwrap().feed(map.as_slice());
@ -166,6 +185,13 @@ pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result<AudioStream> {
.set_state(gst::State::Playing)
.context("starting audio pipeline")?;
spawn_audio_source_watchdog(
pipeline.clone(),
source_health,
tx.clone(),
alsa_dev.to_string(),
);
Ok(AudioStream {
_pipeline: pipeline,
inner: ReceiverStream::new(rx),
@ -188,8 +214,10 @@ fn build_pipeline_desc(dev: &str) -> anyhow::Result<String> {
Ok(format!(
concat!(
"alsasrc device=\"{dev}\" do-timestamp=true ! ",
"alsasrc device=\"{dev}\" do-timestamp=true provide-clock=false ",
"use-driver-timestamps=false buffer-time=200000 latency-time=10000 ! ",
"audio/x-raw,format=S16LE,channels=2,rate=48000 ! ",
"level name=source_level interval=1000000000 message=true ! ",
"audioconvert ! audioresample ! {enc} bitrate=192000 ! ",
"aacparse ! ",
"capsfilter caps=audio/mpeg,stream-format=adts,channels=2,rate=48000 ! ",
@ -202,6 +230,147 @@ fn build_pipeline_desc(dev: &str) -> anyhow::Result<String> {
))
}
#[cfg(not(coverage))]
struct AudioSourceHealth {
started_at: Instant,
last_sample_at: Mutex<Instant>,
packets: AtomicU64,
closed: AtomicBool,
}
#[cfg(not(coverage))]
impl AudioSourceHealth {
fn new() -> Self {
let now = Instant::now();
Self {
started_at: now,
last_sample_at: Mutex::new(now),
packets: AtomicU64::new(0),
closed: AtomicBool::new(false),
}
}
fn mark_sample(&self) {
self.packets.fetch_add(1, Ordering::Relaxed);
if let Ok(mut last) = self.last_sample_at.lock() {
*last = Instant::now();
}
}
fn is_closed(&self) -> bool {
self.closed.load(Ordering::Relaxed)
}
fn signal_failure(&self) -> bool {
!self.closed.swap(true, Ordering::Relaxed)
}
fn elapsed(&self) -> Duration {
self.started_at.elapsed()
}
fn idle_for(&self) -> Duration {
self.last_sample_at
.lock()
.map(|last| last.elapsed())
.unwrap_or_else(|_| Duration::from_secs(0))
}
fn packets(&self) -> u64 {
self.packets.load(Ordering::Relaxed)
}
}
#[cfg(not(coverage))]
#[derive(Clone, Copy)]
struct AudioWatchdogPolicy {
startup_grace: Duration,
idle_timeout: Duration,
min_packets_per_second: u64,
}
#[cfg(not(coverage))]
impl AudioWatchdogPolicy {
fn from_env() -> Self {
Self {
startup_grace: env_duration_ms("LESAVKA_AUDIO_SOURCE_GRACE_MS", 3_000),
idle_timeout: env_duration_ms("LESAVKA_AUDIO_SOURCE_IDLE_MS", 1_500),
min_packets_per_second: env_u64("LESAVKA_AUDIO_MIN_PACKETS_PER_SEC", 20),
}
}
}
#[cfg(not(coverage))]
fn env_duration_ms(name: &str, default_ms: u64) -> Duration {
Duration::from_millis(env_u64(name, default_ms))
}
#[cfg(not(coverage))]
fn env_u64(name: &str, default: u64) -> u64 {
std::env::var(name)
.ok()
.and_then(|value| value.parse::<u64>().ok())
.filter(|value| *value > 0)
.unwrap_or(default)
}
/// Watch the remote speaker capture source and fail fast when the USB audio
/// gadget is open but not producing real-time packets.
#[cfg(not(coverage))]
fn spawn_audio_source_watchdog(
pipeline: gst::Pipeline,
health: Arc<AudioSourceHealth>,
tx: tokio::sync::mpsc::Sender<Result<AudioPacket, Status>>,
alsa_dev: String,
) {
let policy = AudioWatchdogPolicy::from_env();
std::thread::spawn(move || {
loop {
std::thread::sleep(Duration::from_millis(250));
if health.is_closed() {
break;
}
let elapsed = health.elapsed();
if elapsed < policy.startup_grace {
continue;
}
let packets = health.packets();
let idle_for = health.idle_for();
let rate = packets as f64 / elapsed.as_secs_f64().max(0.001);
let failure = if packets == 0 {
Some(format!(
"remote speaker capture produced no audio samples after {} ms on {alsa_dev}",
elapsed.as_millis()
))
} else if idle_for >= policy.idle_timeout {
Some(format!(
"remote speaker capture stalled for {} ms on {alsa_dev}",
idle_for.as_millis()
))
} else if (packets / elapsed.as_secs().max(1)) < policy.min_packets_per_second {
Some(format!(
"remote speaker capture cadence is too low on {alsa_dev}: {rate:.1} packets/s, expected at least {} packets/s",
policy.min_packets_per_second
))
} else {
None
};
if let Some(message) = failure {
if health.signal_failure() {
warn!("🔊🛟 {message}; restarting audio capture on next client reconnect");
let _ = pipeline.set_state(gst::State::Null);
let _ = tx.blocking_send(Err(Status::unavailable(message)));
}
break;
}
}
});
}
// ────────────────────── minuteclip helper ───────────────────────────────
pub struct ClipTap {
buf: Vec<u8>,