// server/src/audio.rs
#![cfg_attr(coverage, allow(dead_code, unused_imports, unused_variables))]
#![forbid(unsafe_code)]

use std::sync::{
    Arc, Mutex,
    atomic::{AtomicBool, AtomicU64, Ordering},
};
use std::time::{Duration, Instant};

use anyhow::{Context, anyhow};
use futures_util::Stream;
use gst::ElementFactory;
use gst::MessageView::*;
use gst::prelude::*;
use gstreamer as gst;
use gstreamer_app as gst_app;
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;
use tracing::{debug, error, info, warn};

use lesavka_common::lesavka::AudioPacket;
/// “Speaker” stream coming **from** the remote host (UAC2‑gadget playback
/// endpoint) **towards** the client.
pub struct AudioStream {
    // Held only to keep the GStreamer capture pipeline alive; torn down in Drop.
    _pipeline: gst::Pipeline,
    // Encoded AAC access units produced by the appsink callback in `ear()`.
    inner: ReceiverStream<Result<AudioPacket, Status>>,
}
impl Stream for AudioStream {
    type Item = Result<AudioPacket, Status>;

    /// Delegate polling to the inner channel receiver; items are pushed by
    /// the GStreamer appsink callback installed in `ear()`.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        std::pin::Pin::new(&mut self.inner).poll_next(cx)
    }
}
impl Drop for AudioStream {
    /// Stop the capture pipeline when the client stream goes away; the
    /// shutdown result is deliberately ignored (best effort on teardown).
    fn drop(&mut self) {
        let _ = self._pipeline.set_state(gst::State::Null);
    }
}
/*───────────────────────────────────────────────────────────────────────────*/
/* ear() - capture from ALSA (“speaker”) and push AAC AUs via gRPC           */
/*───────────────────────────────────────────────────────────────────────────*/

/// Coverage stub of [`ear`]: performs the same input validation as the real
/// implementation but never opens ALSA; the returned stream yields nothing.
#[cfg(coverage)]
pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result<AudioStream> {
    let _ = id;

    // Mirror the real implementation's validation surface so error paths
    // are exercised under coverage builds.
    if alsa_dev.contains('"') {
        return Err(anyhow!("invalid ALSA device string"));
    }
    let unavailable =
        alsa_dev.contains("UAC2Gadget") || alsa_dev.contains("DefinitelyMissing");
    if unavailable {
        return Err(anyhow!("ALSA source not available"));
    }

    let _ = gst::init();
    // Channel is never written to: the stub stream simply stays empty.
    let (_sender, receiver) = tokio::sync::mpsc::channel(1);
    Ok(AudioStream {
        _pipeline: gst::Pipeline::new(),
        inner: ReceiverStream::new(receiver),
    })
}
/// Capture the remote host's "speaker" audio from ALSA (`alsa_dev`), encode
/// it to AAC/ADTS, and return it as a gRPC packet stream tagged with `id`.
///
/// A bus-drain thread forwards pipeline messages to the logs, and a watchdog
/// thread fails the stream fast if the gadget is open but silent.
#[cfg(not(coverage))]
pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result<AudioStream> {
    // NB: one *logical* speaker → id==0. A 2nd logical stream could be
    // added later (for multi‑channel) without changing the client.
    gst::init().context("gst init")?;

    /*──────────── pipeline description ────────────
     *
     * ALSA (UAC2 gadget)            AAC+ADTS         AppSink
     * ┌───────────┐ raw 48 kHz  ┌─────────┐ AU/ADTS ┌──────────┐
     * │ alsasrc   │────────────►│ voaacenc│────────►│ appsink  │
     * └───────────┘             └─────────┘         └──────────┘
     */
    let desc = build_pipeline_desc(alsa_dev)?;

    let pipeline: gst::Pipeline = gst::parse::launch(&desc)?.downcast().expect("pipeline");

    let sink: gst_app::AppSink = pipeline
        .by_name("asink")
        .expect("asink")
        .downcast()
        .expect("appsink");

    // Rolling debug clip of the encoded stream, dumped to /tmp once a minute.
    let tap = Arc::new(Mutex::new(ClipTap::new(
        "🎧 - ear",
        Duration::from_secs(60),
    )));
    // sink.connect("underrun", false, |_| {
    //     tracing::warn!("⚠️ USB playback underrun – host muted or not reading");
    //     None
    // });

    let (tx, rx) = tokio::sync::mpsc::channel(8192);
    let source_health = Arc::new(AudioSourceHealth::new());

    // Drain the GStreamer bus on a dedicated thread; the iterator blocks
    // until the bus is destroyed with the pipeline.
    let bus = pipeline.bus().expect("bus");
    std::thread::spawn(move || {
        for msg in bus.iter_timed(gst::ClockTime::NONE) {
            match msg.view() {
                Error(e) => error!(
                    "💥 audio pipeline: {} ({})",
                    e.error(),
                    e.debug().unwrap_or_default()
                ),
                Warning(w) => warn!(
                    "⚠️ audio pipeline: {} ({})",
                    w.error(),
                    w.debug().unwrap_or_default()
                ),
                StateChanged(s) if s.current() == gst::State::Playing => {
                    debug!("🎶 audio pipeline PLAYING")
                }
                Element(e) => {
                    if let Some(structure) = e.structure() {
                        // Periodic loudness reports from the `level` element.
                        if structure.name() == "level" {
                            info!("🔊 source audio level {}", structure);
                        } else {
                            debug!("🔎 audio element message: {}", structure);
                        }
                    }
                }
                _ => {}
            }
        }
    });

    /*──────────── callbacks ────────────*/
    sink.set_callbacks(
        gst_app::AppSinkCallbacks::builder()
            .new_sample({
                let tap = tap.clone();
                let source_health = source_health.clone();
                let tx = tx.clone();
                move |s| {
                    // Watchdog declared the source dead → stop pulling.
                    if source_health.is_closed() {
                        return Err(gst::FlowError::Flushing);
                    }
                    let sample = s.pull_sample().map_err(|_| gst::FlowError::Eos)?;
                    let buffer = sample.buffer().ok_or(gst::FlowError::Error)?;
                    let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?;
                    source_health.mark_sample();

                    // -------- clip‑tap (minute dumps) ------------
                    tap.lock().unwrap().feed(map.as_slice());

                    // Log the first few AUs, then one in every 300.
                    static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
                    let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                    if n < 10 || n % 300 == 0 {
                        debug!("🎧 ear #{n}: {} bytes", map.len());
                    }

                    // PTS is carried to the client in microseconds.
                    let pts_us = buffer.pts().unwrap_or(gst::ClockTime::ZERO).nseconds() / 1_000;

                    // Push non-blocking; when the channel is full the *new*
                    // AU is discarded (try_send never evicts the backlog).
                    if tx
                        .try_send(Ok(AudioPacket {
                            id,
                            pts: pts_us,
                            data: map.as_slice().to_vec(),
                        }))
                        .is_err()
                    {
                        static DROPS: std::sync::atomic::AtomicU64 =
                            std::sync::atomic::AtomicU64::new(0);
                        let d = DROPS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                        if d % 300 == 0 {
                            warn!("🎧💔 dropped {d} audio AUs (client too slow)");
                        }
                    }
                    Ok(gst::FlowSuccess::Ok)
                }
            })
            .build(),
    );

    pipeline
        .set_state(gst::State::Playing)
        .context("starting audio pipeline")?;

    // Fail fast when the gadget is open but not producing real-time packets.
    spawn_audio_source_watchdog(
        pipeline.clone(),
        source_health,
        tx.clone(),
        alsa_dev.to_string(),
    );

    Ok(AudioStream {
        _pipeline: pipeline,
        inner: ReceiverStream::new(rx),
    })
}
/*────────────────────────── build_pipeline_desc ───────────────────────────*/
|
||
#[cfg(not(coverage))]
|
||
fn build_pipeline_desc(dev: &str) -> anyhow::Result<String> {
|
||
let reg = gst::Registry::get();
|
||
|
||
// first available encoder
|
||
let enc = ["fdkaacenc", "voaacenc", "avenc_aac"]
|
||
.into_iter()
|
||
.find(|&e| {
|
||
reg.find_plugin(e).is_some()
|
||
|| reg.find_feature(e, ElementFactory::static_type()).is_some()
|
||
})
|
||
.ok_or_else(|| anyhow!("no AAC encoder plugin available"))?;
|
||
|
||
Ok(format!(
|
||
concat!(
|
||
"alsasrc device=\"{dev}\" do-timestamp=true provide-clock=false ",
|
||
"use-driver-timestamps=false buffer-time=200000 latency-time=10000 ! ",
|
||
"audio/x-raw,format=S16LE,channels=2,rate=48000 ! ",
|
||
"level name=source_level interval=1000000000 message=true ! ",
|
||
"audioconvert ! audioresample ! {enc} bitrate=192000 ! ",
|
||
"aacparse ! ",
|
||
"capsfilter caps=audio/mpeg,stream-format=adts,channels=2,rate=48000 ! ",
|
||
"tee name=t ",
|
||
"t. ! queue ! appsink name=asink emit-signals=true ",
|
||
"t. ! queue ! appsink name=debugtap emit-signals=true max-buffers=500 drop=true"
|
||
),
|
||
dev = dev,
|
||
enc = enc
|
||
))
|
||
}
|
||
|
||
/// Liveness bookkeeping for the speaker-capture source, shared between the
/// appsink callback (producer) and the watchdog thread (observer).
#[cfg(not(coverage))]
struct AudioSourceHealth {
    // Capture start time; the watchdog grace period is measured from here.
    started_at: Instant,
    // Timestamp of the most recently delivered sample.
    last_sample_at: Mutex<Instant>,
    // Total samples delivered since start.
    packets: AtomicU64,
    // Latched true once the source has failed or the stream is shutting down.
    closed: AtomicBool,
}
#[cfg(not(coverage))]
|
||
impl AudioSourceHealth {
|
||
fn new() -> Self {
|
||
let now = Instant::now();
|
||
Self {
|
||
started_at: now,
|
||
last_sample_at: Mutex::new(now),
|
||
packets: AtomicU64::new(0),
|
||
closed: AtomicBool::new(false),
|
||
}
|
||
}
|
||
|
||
fn mark_sample(&self) {
|
||
self.packets.fetch_add(1, Ordering::Relaxed);
|
||
if let Ok(mut last) = self.last_sample_at.lock() {
|
||
*last = Instant::now();
|
||
}
|
||
}
|
||
|
||
fn is_closed(&self) -> bool {
|
||
self.closed.load(Ordering::Relaxed)
|
||
}
|
||
|
||
fn signal_failure(&self) -> bool {
|
||
!self.closed.swap(true, Ordering::Relaxed)
|
||
}
|
||
|
||
fn elapsed(&self) -> Duration {
|
||
self.started_at.elapsed()
|
||
}
|
||
|
||
fn idle_for(&self) -> Duration {
|
||
self.last_sample_at
|
||
.lock()
|
||
.map(|last| last.elapsed())
|
||
.unwrap_or_else(|_| Duration::from_secs(0))
|
||
}
|
||
|
||
fn packets(&self) -> u64 {
|
||
self.packets.load(Ordering::Relaxed)
|
||
}
|
||
}
|
||
|
||
/// Thresholds for the capture watchdog; each field can be overridden with an
/// environment variable (see `from_env`).
#[cfg(not(coverage))]
#[derive(Clone, Copy)]
struct AudioWatchdogPolicy {
    // No checks are performed for this long after startup.
    startup_grace: Duration,
    // Maximum tolerated gap between two samples.
    idle_timeout: Duration,
    // Minimum acceptable average packet cadence.
    min_packets_per_second: u64,
}
#[cfg(not(coverage))]
|
||
impl AudioWatchdogPolicy {
|
||
fn from_env() -> Self {
|
||
Self {
|
||
startup_grace: env_duration_ms("LESAVKA_AUDIO_SOURCE_GRACE_MS", 3_000),
|
||
idle_timeout: env_duration_ms("LESAVKA_AUDIO_SOURCE_IDLE_MS", 1_500),
|
||
min_packets_per_second: env_u64("LESAVKA_AUDIO_MIN_PACKETS_PER_SEC", 20),
|
||
}
|
||
}
|
||
}
|
||
|
||
#[cfg(not(coverage))]
|
||
fn env_duration_ms(name: &str, default_ms: u64) -> Duration {
|
||
Duration::from_millis(env_u64(name, default_ms))
|
||
}
|
||
|
||
/// Positive integer from the environment variable `name`; unset, unparsable,
/// or zero values all fall back to `default`.
#[cfg(not(coverage))]
fn env_u64(name: &str, default: u64) -> u64 {
    match std::env::var(name) {
        Ok(raw) => match raw.parse::<u64>() {
            Ok(v) if v > 0 => v,
            _ => default,
        },
        Err(_) => default,
    }
}
/// Watch the remote speaker capture source and fail fast when the USB audio
|
||
/// gadget is open but not producing real-time packets.
|
||
#[cfg(not(coverage))]
|
||
fn spawn_audio_source_watchdog(
|
||
pipeline: gst::Pipeline,
|
||
health: Arc<AudioSourceHealth>,
|
||
tx: tokio::sync::mpsc::Sender<Result<AudioPacket, Status>>,
|
||
alsa_dev: String,
|
||
) {
|
||
let policy = AudioWatchdogPolicy::from_env();
|
||
std::thread::spawn(move || {
|
||
loop {
|
||
std::thread::sleep(Duration::from_millis(250));
|
||
if health.is_closed() {
|
||
break;
|
||
}
|
||
|
||
let elapsed = health.elapsed();
|
||
if elapsed < policy.startup_grace {
|
||
continue;
|
||
}
|
||
|
||
let packets = health.packets();
|
||
let idle_for = health.idle_for();
|
||
let rate = packets as f64 / elapsed.as_secs_f64().max(0.001);
|
||
|
||
let failure = if packets == 0 {
|
||
Some(format!(
|
||
"remote speaker capture produced no audio samples after {} ms on {alsa_dev}",
|
||
elapsed.as_millis()
|
||
))
|
||
} else if idle_for >= policy.idle_timeout {
|
||
Some(format!(
|
||
"remote speaker capture stalled for {} ms on {alsa_dev}",
|
||
idle_for.as_millis()
|
||
))
|
||
} else if (packets / elapsed.as_secs().max(1)) < policy.min_packets_per_second {
|
||
Some(format!(
|
||
"remote speaker capture cadence is too low on {alsa_dev}: {rate:.1} packets/s, expected at least {} packets/s",
|
||
policy.min_packets_per_second
|
||
))
|
||
} else {
|
||
None
|
||
};
|
||
|
||
if let Some(message) = failure {
|
||
if health.signal_failure() {
|
||
warn!("🔊🛟 {message}; restarting audio capture on next client reconnect");
|
||
let _ = pipeline.set_state(gst::State::Null);
|
||
let _ = tx.blocking_send(Err(Status::unavailable(message)));
|
||
}
|
||
break;
|
||
}
|
||
}
|
||
});
|
||
}
|
||
|
||
// ────────────────────── minute‑clip helper ───────────────────────────────

/// Rolling capture of the most recent encoded audio, periodically dumped to
/// `/tmp` for offline debugging.
pub struct ClipTap {
    // Rolling byte buffer, capped near 256 kB (oldest bytes evicted first).
    buf: Vec<u8>,
    // Short label used in the dump file name.
    tag: &'static str,
    // Deadline for the next dump.
    next_dump: Instant,
    // Interval between dumps.
    period: Duration,
}
impl ClipTap {
|
||
pub fn new(tag: &'static str, period: Duration) -> Self {
|
||
Self {
|
||
buf: Vec::with_capacity(260_000),
|
||
tag,
|
||
next_dump: Instant::now() + period,
|
||
period,
|
||
}
|
||
}
|
||
pub fn feed(&mut self, bytes: &[u8]) {
|
||
self.buf.extend_from_slice(bytes);
|
||
if self.buf.len() > 256_000 {
|
||
self.buf.drain(..self.buf.len() - 256_000);
|
||
}
|
||
if Instant::now() >= self.next_dump {
|
||
self.flush();
|
||
self.next_dump += self.period;
|
||
}
|
||
}
|
||
pub fn flush(&mut self) {
|
||
if self.buf.is_empty() {
|
||
return;
|
||
}
|
||
let ts = chrono::Local::now().format("%Y%m%d-%H%M%S");
|
||
let path = format!("/tmp/{}-{}.aac", self.tag, ts);
|
||
let _ = std::fs::write(&path, &self.buf);
|
||
self.buf.clear();
|
||
}
|
||
}
|
||
impl Drop for ClipTap {
    /// Best effort: dump whatever is still buffered before the tap goes away.
    fn drop(&mut self) {
        self.flush()
    }
}
// ────────────────────── microphone sink ────────────────────────────────

/// "Microphone" playback path: AAC packets received from the client are fed
/// into a GStreamer pipeline that decodes them and plays them out on the
/// ALSA device given to `Voice::new`.
pub struct Voice {
    appsrc: gst_app::AppSrc, // entry point for incoming AAC packets
    _pipe: gst::Pipeline, // keep pipeline alive
    tap: ClipTap, // rolling /tmp debug clip of the incoming stream
}
impl Voice {
    /// Coverage stub: appsrc → fakesink, so no real ALSA device is opened.
    #[cfg(coverage)]
    pub async fn new(_alsa_dev: &str) -> anyhow::Result<Self> {
        gst::init().context("gst init")?;

        let pipeline = gst::Pipeline::new();
        let appsrc = gst::ElementFactory::make("appsrc")
            .build()
            .context("make appsrc")?
            .downcast::<gst_app::AppSrc>()
            .expect("appsrc");
        appsrc.set_format(gst::Format::Time);
        appsrc.set_is_live(true);

        let sink = gst::ElementFactory::make("fakesink")
            .build()
            .context("make fakesink")?;
        pipeline.add_many(&[appsrc.upcast_ref(), &sink])?;
        gst::Element::link_many(&[appsrc.upcast_ref(), &sink])?;
        pipeline.set_state(gst::State::Playing)?;

        Ok(Self {
            appsrc,
            _pipe: pipeline,
            tap: ClipTap::new("voice", Duration::from_secs(60)),
        })
    }

    /// Build the playback pipeline:
    /// appsrc (AAC) → decodebin → audioconvert → audioresample →
    /// capsfilter (S16LE, stereo, 48 kHz) → alsasink (`alsa_dev`).
    ///
    /// decodebin's source pad only appears once the stream type is known, so
    /// it is linked to audioconvert lazily in a `pad-added` handler.
    #[cfg(not(coverage))]
    pub async fn new(alsa_dev: &str) -> anyhow::Result<Self> {
        use gst::prelude::*;

        gst::init().context("gst init")?;

        // pipeline
        let pipeline = gst::Pipeline::new();

        // elements
        let appsrc = gst::ElementFactory::make("appsrc")
            .build()
            .context("make appsrc")?
            .downcast::<gst_app::AppSrc>()
            .unwrap();

        // dedicated AppSrc helpers exist and avoid the needless `?`
        appsrc.set_format(gst::Format::Time);
        appsrc.set_is_live(true);

        let decodebin = gst::ElementFactory::make("decodebin")
            .build()
            .context("make decodebin")?;
        let convert = gst::ElementFactory::make("audioconvert")
            .build()
            .context("make audioconvert")?;
        let resample = gst::ElementFactory::make("audioresample")
            .build()
            .context("make audioresample")?;
        let caps = gst::Caps::builder("audio/x-raw")
            .field("format", "S16LE")
            .field("channels", 2i32)
            .field("rate", 48_000i32)
            .build();
        let capsfilter = gst::ElementFactory::make("capsfilter")
            .property("caps", &caps)
            .build()
            .context("make capsfilter")?;
        let alsa_sink = gst::ElementFactory::make("alsasink")
            .build()
            .context("make alsasink")?;

        // Don't let the sink throttle or stall the pipeline: no clock sync,
        // no async state changes, no last-sample retention.
        alsa_sink.set_property("device", &alsa_dev);
        alsa_sink.set_property("sync", false);
        alsa_sink.set_property("async", false);
        alsa_sink.set_property("enable-last-sample", false);

        pipeline.add_many(&[
            appsrc.upcast_ref(),
            &decodebin,
            &convert,
            &resample,
            &capsfilter,
            &alsa_sink,
        ])?;
        appsrc.link(&decodebin)?;
        gst::Element::link_many(&[&convert, &resample, &capsfilter, &alsa_sink])?;

        /*------------ decodebin autolink ----------------*/
        let convert_sink = convert
            .static_pad("sink")
            .context("audioconvert sink pad")?;
        decodebin.connect_pad_added(move |_db, pad| {
            // Only the first audio pad is linked; later pads are ignored.
            if convert_sink.is_linked() {
                return;
            }
            let caps = pad.current_caps().unwrap_or_else(|| pad.query_caps(None));
            let is_audio = caps
                .structure(0)
                .map(|s| s.name().starts_with("audio/"))
                .unwrap_or(false);
            if !is_audio {
                return;
            }
            // Link failure is non-fatal here; the bus will surface errors.
            let _ = pad.link(&convert_sink);
        });

        // underrun ≠ error – just show a warning
        // let _id = alsa_sink.connect("underrun", false, |_| {
        //     tracing::warn!("⚠️ USB playback underrun – host muted/not reading");
        //     None
        // });

        pipeline.set_state(gst::State::Playing)?;

        Ok(Self {
            appsrc,
            _pipe: pipeline,
            tap: ClipTap::new("voice", Duration::from_secs(60)),
        })
    }

    /// Feed one received AAC packet into the playback pipeline. Push errors
    /// are ignored (the pipeline may be flushing during shutdown).
    pub fn push(&mut self, pkt: &AudioPacket) {
        self.tap.feed(&pkt.data);

        let mut buf = gst::Buffer::from_slice(pkt.data.clone());
        // get_mut() cannot fail here: the buffer was just created and has a
        // single owner.
        buf.get_mut()
            .unwrap()
            .set_pts(Some(gst::ClockTime::from_useconds(pkt.pts)));

        let _ = self.appsrc.push_buffer(buf);
    }

    /// Flush the debug tap and signal end-of-stream to the pipeline.
    pub fn finish(&mut self) {
        self.tap.flush();
        let _ = self.appsrc.end_of_stream();
    }
}
// Coverage-only smoke test: exercises the stub `Voice` constructor so the
// coverage build runs this module without real audio hardware.
#[cfg(all(test, coverage))]
mod tests {
    use super::Voice;

    #[tokio::test]
    async fn coverage_voice_constructor_starts_stub_pipeline() {
        let mut voice = Voice::new("coverage-audio").await.expect("voice");
        voice.finish();
    }
}