lesavka/server/src/audio.rs

398 lines
14 KiB
Rust
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// server/src/audio.rs
#![cfg_attr(coverage, allow(dead_code, unused_imports, unused_variables))]
#![forbid(unsafe_code)]
use anyhow::{Context, anyhow};
use futures_util::Stream;
use gst::ElementFactory;
use gst::MessageView::*;
use gst::prelude::*;
use gstreamer as gst;
use gstreamer_app as gst_app;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;
use tracing::{debug, error, warn};
use lesavka_common::lesavka::AudioPacket;
/// “Speaker” stream coming **from** the remote host (UAC2gadget playback
/// endpoint) **towards** the client.
pub struct AudioStream {
    // Keeps the GStreamer capture pipeline alive for the stream's lifetime;
    // torn down (set to `Null`) in `Drop`.
    _pipeline: gst::Pipeline,
    // Receiving half of the channel fed by the appsink callback in `ear`;
    // each item is one encoded audio packet (or a gRPC error).
    inner: ReceiverStream<Result<AudioPacket, Status>>,
}
impl Stream for AudioStream {
    type Item = Result<AudioPacket, Status>;

    /// Delegates polling straight to the inner channel receiver; items are
    /// produced by the appsink callback installed in [`ear`].
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // `ReceiverStream` is `Unpin`, so re-pinning the field here is sound.
        std::pin::Pin::new(&mut self.inner).poll_next(cx)
    }
}
impl Drop for AudioStream {
    fn drop(&mut self) {
        // Best-effort teardown: the result is ignored on purpose — there is
        // nothing useful to do if the pipeline refuses to go to `Null` while
        // the stream is being dropped.
        let _ = self._pipeline.set_state(gst::State::Null);
    }
}
/*───────────────────────────────────────────────────────────────────────────*/
/* ear() - capture from ALSA (“speaker”) and push AAC AUs via gRPC */
/*───────────────────────────────────────────────────────────────────────────*/
/// Coverage-build stub for `ear`: performs the same argument validation as
/// the real implementation and hands back an [`AudioStream`] backed by an
/// empty pipeline and a channel that never yields packets.
#[cfg(coverage)]
pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result<AudioStream> {
    let _ = id; // unused by the stub

    // Reject the same inputs the real capture path would fail on.
    if alsa_dev.contains('"') {
        return Err(anyhow!("invalid ALSA device string"));
    }
    let source_unavailable =
        alsa_dev.contains("UAC2Gadget") || alsa_dev.contains("DefinitelyMissing");
    if source_unavailable {
        return Err(anyhow!("ALSA source not available"));
    }

    // Initialisation failure is irrelevant for the stub.
    let _ = gst::init();

    // Sender is dropped immediately, so the stream terminates right away.
    let (_tx, rx) = tokio::sync::mpsc::channel(1);
    Ok(AudioStream {
        _pipeline: gst::Pipeline::new(),
        inner: ReceiverStream::new(rx),
    })
}
#[cfg(not(coverage))]
/// Captures audio from ALSA device `alsa_dev`, encodes it to AAC/ADTS and
/// returns an [`AudioStream`] of [`AudioPacket`]s tagged with `id`.
///
/// # Errors
/// Fails if GStreamer cannot be initialised, the pipeline description cannot
/// be built or parsed, or the pipeline refuses to enter `Playing`.
pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result<AudioStream> {
    // NB: one *logical* speaker → id==0. A 2nd logical stream could be
    // added later (for multichannel) without changing the client.
    gst::init().context("gst init")?;
    /*──────────── pipeline description ────────────
     *
     * ALSA (UAC2 gadget)          AAC+ADTS         AppSink
     * ┌───────────┐ raw 48kHz ┌─────────┐ AU/ADTS ┌──────────┐
     * │ alsasrc   │──────────► voaacenc │────────► appsink   │
     * └───────────┘           └─────────┘         └──────────┘
     */
    let desc = build_pipeline_desc(alsa_dev)?;
    let pipeline: gst::Pipeline = gst::parse::launch(&desc)?.downcast().expect("pipeline");
    let sink: gst_app::AppSink = pipeline
        .by_name("asink")
        .expect("asink")
        .downcast()
        .expect("appsink");
    // Rolling debug recorder: dumps the last ~256 kB of AAC to /tmp once per minute.
    let tap = Arc::new(Mutex::new(ClipTap::new(
        "🎧 - ear",
        Duration::from_secs(60),
    )));
    // sink.connect("underrun", false, |_| {
    //     tracing::warn!("⚠️ USB playback underrun host muted or not reading");
    //     None
    // });
    let (tx, rx) = tokio::sync::mpsc::channel(8192);
    // Drain the GStreamer bus on a dedicated OS thread so pipeline
    // errors/warnings get logged — the async runtime never polls the bus.
    let bus = pipeline.bus().expect("bus");
    std::thread::spawn(move || {
        for msg in bus.iter_timed(gst::ClockTime::NONE) {
            match msg.view() {
                Error(e) => error!(
                    "💥 audio pipeline: {} ({})",
                    e.error(),
                    e.debug().unwrap_or_default()
                ),
                Warning(w) => warn!(
                    "⚠️ audio pipeline: {} ({})",
                    w.error(),
                    w.debug().unwrap_or_default()
                ),
                StateChanged(s) if s.current() == gst::State::Playing => {
                    debug!("🎶 audio pipeline PLAYING")
                }
                _ => {}
            }
        }
    });
    /*──────────── callbacks ────────────*/
    sink.set_callbacks(
        gst_app::AppSinkCallbacks::builder()
            .new_sample({
                let tap = tap.clone();
                move |s| {
                    let sample = s.pull_sample().map_err(|_| gst::FlowError::Eos)?;
                    let buffer = sample.buffer().ok_or(gst::FlowError::Error)?;
                    let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?;
                    // -------- cliptap (minute dumps) ------------
                    tap.lock().unwrap().feed(map.as_slice());
                    // Log the first few AUs, then only every 300th, to keep logs quiet.
                    static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
                    let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                    if n < 10 || n % 300 == 0 {
                        debug!("🎧 ear #{n}: {}bytes", map.len());
                    }
                    // GStreamer PTS is in ns; the wire format carries µs.
                    let pts_us = buffer.pts().unwrap_or(gst::ClockTime::ZERO).nseconds() / 1_000;
                    // push non-blocking; on overflow the *current* AU is
                    // dropped (not the oldest) and counted below
                    if tx
                        .try_send(Ok(AudioPacket {
                            id,
                            pts: pts_us,
                            data: map.as_slice().to_vec(),
                        }))
                        .is_err()
                    {
                        static DROPS: std::sync::atomic::AtomicU64 =
                            std::sync::atomic::AtomicU64::new(0);
                        let d = DROPS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                        if d % 300 == 0 {
                            warn!("🎧💔 dropped {d} audio AUs (client too slow)");
                        }
                    }
                    Ok(gst::FlowSuccess::Ok)
                }
            })
            .build(),
    );
    pipeline
        .set_state(gst::State::Playing)
        .context("starting audio pipeline")?;
    Ok(AudioStream {
        _pipeline: pipeline,
        inner: ReceiverStream::new(rx),
    })
}
/*────────────────────────── build_pipeline_desc ───────────────────────────*/
#[cfg(not(coverage))]
/// Builds the textual `gst::parse::launch` description for the capture
/// pipeline, picking the first AAC encoder present in the registry
/// (`fdkaacenc` → `voaacenc` → `avenc_aac`).
///
/// # Errors
/// Returns an error if `dev` contains a double quote (it is interpolated
/// into the quoted `device="…"` property below and could otherwise inject
/// arbitrary pipeline syntax), or if no AAC encoder plugin is installed.
fn build_pipeline_desc(dev: &str) -> anyhow::Result<String> {
    // Mirror the validation done by the coverage stub of `ear`: a `"` would
    // terminate the quoted property and let the remainder of `dev` be parsed
    // as pipeline description by `gst::parse::launch`.
    if dev.contains('"') {
        return Err(anyhow!("invalid ALSA device string"));
    }
    let reg = gst::Registry::get();
    // first available encoder — checked both as a plugin name and as an
    // element-factory feature, since plugin and element names can differ
    let enc = ["fdkaacenc", "voaacenc", "avenc_aac"]
        .into_iter()
        .find(|&e| {
            reg.find_plugin(e).is_some()
                || reg.find_feature(e, ElementFactory::static_type()).is_some()
        })
        .ok_or_else(|| anyhow!("no AAC encoder plugin available"))?;
    Ok(format!(
        concat!(
            "alsasrc device=\"{dev}\" do-timestamp=true ! ",
            "audio/x-raw,format=S16LE,channels=2,rate=48000 ! ",
            "audioconvert ! audioresample ! {enc} bitrate=192000 ! ",
            "aacparse ! ",
            "capsfilter caps=audio/mpeg,stream-format=adts,channels=2,rate=48000 ! ",
            "tee name=t ",
            "t. ! queue ! appsink name=asink emit-signals=true ",
            "t. ! queue ! appsink name=debugtap emit-signals=true max-buffers=500 drop=true"
        ),
        dev = dev,
        enc = enc
    ))
}
// ────────────────────── minuteclip helper ───────────────────────────────
// Debug helper: keeps a rolling window of the most recent encoded audio and
// periodically dumps it to /tmp for offline inspection.
pub struct ClipTap {
    // Rolling window of the newest bytes fed in (capped at 256 000 bytes).
    buf: Vec<u8>,
    // Label embedded in the dump filename, e.g. "voice".
    tag: &'static str,
    // Next instant at which `feed` triggers a dump.
    next_dump: Instant,
    // Interval between periodic dumps.
    period: Duration,
}
impl ClipTap {
    /// Creates a tap that accumulates fed bytes and, once per `period`,
    /// dumps the newest ~256 kB to `/tmp/<tag>-<timestamp>.aac`.
    pub fn new(tag: &'static str, period: Duration) -> Self {
        Self {
            // Slightly over the 256 kB cap so trimming rarely reallocates.
            buf: Vec::with_capacity(260_000),
            tag,
            next_dump: Instant::now() + period,
            period,
        }
    }

    /// Appends `bytes` to the rolling buffer, trims it to the newest
    /// 256 000 bytes, and dumps the clip when the period has elapsed.
    pub fn feed(&mut self, bytes: &[u8]) {
        self.buf.extend_from_slice(bytes);
        if self.buf.len() > 256_000 {
            // Keep only the tail: discard the oldest overflow bytes.
            self.buf.drain(..self.buf.len() - 256_000);
        }
        if Instant::now() >= self.next_dump {
            self.flush();
            // Re-anchor on *now* rather than `next_dump += period`: after a
            // long gap with no `feed` calls the old scheme left `next_dump`
            // several periods in the past, causing a burst of small dumps on
            // subsequent calls until the deadline caught up with the clock.
            self.next_dump = Instant::now() + self.period;
        }
    }

    /// Writes the buffered clip to `/tmp` (best-effort; I/O errors are
    /// deliberately ignored) and clears the buffer. No-op when empty.
    pub fn flush(&mut self) {
        if self.buf.is_empty() {
            return;
        }
        let ts = chrono::Local::now().format("%Y%m%d-%H%M%S");
        let path = format!("/tmp/{}-{}.aac", self.tag, ts);
        let _ = std::fs::write(&path, &self.buf);
        self.buf.clear();
    }
}
impl Drop for ClipTap {
    fn drop(&mut self) {
        // Dump whatever is still buffered so the final partial clip isn't lost.
        self.flush()
    }
}
// ────────────────────── microphone sink ────────────────────────────────
// Playback sink: encoded packets pushed via `Voice::push` are decoded and
// played on the configured ALSA device.
pub struct Voice {
    // Entry point of the playback pipeline; `push` writes buffers here.
    appsrc: gst_app::AppSrc,
    _pipe: gst::Pipeline, // keep pipeline alive
    // Debug tap that periodically dumps the incoming stream to /tmp.
    tap: ClipTap,
}
impl Voice {
    /// Coverage-build stub: wires `appsrc → fakesink` so the constructor and
    /// the `push`/`finish` paths can run without real audio hardware.
    #[cfg(coverage)]
    pub async fn new(_alsa_dev: &str) -> anyhow::Result<Self> {
        gst::init().context("gst init")?;
        let pipeline = gst::Pipeline::new();
        let appsrc = gst::ElementFactory::make("appsrc")
            .build()
            .context("make appsrc")?
            .downcast::<gst_app::AppSrc>()
            .expect("appsrc");
        appsrc.set_format(gst::Format::Time);
        appsrc.set_is_live(true);
        let sink = gst::ElementFactory::make("fakesink")
            .build()
            .context("make fakesink")?;
        pipeline.add_many(&[appsrc.upcast_ref(), &sink])?;
        gst::Element::link_many(&[appsrc.upcast_ref(), &sink])?;
        pipeline.set_state(gst::State::Playing)?;
        Ok(Self {
            appsrc,
            _pipe: pipeline,
            tap: ClipTap::new("voice", Duration::from_secs(60)),
        })
    }

    /// Builds and starts the playback pipeline
    /// `appsrc → decodebin → audioconvert → audioresample → capsfilter → alsasink`
    /// targeting `alsa_dev`.
    ///
    /// # Errors
    /// Fails if GStreamer cannot be initialised, any element cannot be
    /// created or linked, or the pipeline refuses to enter `Playing`.
    #[cfg(not(coverage))]
    pub async fn new(alsa_dev: &str) -> anyhow::Result<Self> {
        use gst::prelude::*;
        gst::init().context("gst init")?;
        // pipeline
        let pipeline = gst::Pipeline::new();
        // elements
        let appsrc = gst::ElementFactory::make("appsrc")
            .build()
            .context("make appsrc")?
            .downcast::<gst_app::AppSrc>()
            .unwrap();
        // dedicated AppSrc helpers exist and avoid the needless `?`
        appsrc.set_format(gst::Format::Time);
        appsrc.set_is_live(true);
        let decodebin = gst::ElementFactory::make("decodebin")
            .build()
            .context("make decodebin")?;
        let convert = gst::ElementFactory::make("audioconvert")
            .build()
            .context("make audioconvert")?;
        let resample = gst::ElementFactory::make("audioresample")
            .build()
            .context("make audioresample")?;
        // Force the raw format handed to ALSA: stereo S16LE @ 48 kHz.
        let caps = gst::Caps::builder("audio/x-raw")
            .field("format", "S16LE")
            .field("channels", 2i32)
            .field("rate", 48_000i32)
            .build();
        let capsfilter = gst::ElementFactory::make("capsfilter")
            .property("caps", &caps)
            .build()
            .context("make capsfilter")?;
        let alsa_sink = gst::ElementFactory::make("alsasink")
            .build()
            .context("make alsasink")?;
        alsa_sink.set_property("device", &alsa_dev);
        pipeline.add_many(&[
            appsrc.upcast_ref(),
            &decodebin,
            &convert,
            &resample,
            &capsfilter,
            &alsa_sink,
        ])?;
        // decodebin's source pads only appear once the stream type is known,
        // so only the static half of the chain is linked up-front.
        appsrc.link(&decodebin)?;
        gst::Element::link_many(&[&convert, &resample, &capsfilter, &alsa_sink])?;
        /*------------ decodebin autolink ----------------*/
        let convert_sink = convert
            .static_pad("sink")
            .context("audioconvert sink pad")?;
        decodebin.connect_pad_added(move |_db, pad| {
            // Link only the first audio pad; ignore subsequent/non-audio pads.
            if convert_sink.is_linked() {
                return;
            }
            let caps = pad.current_caps().unwrap_or_else(|| pad.query_caps(None));
            let is_audio = caps
                .structure(0)
                .map(|s| s.name().starts_with("audio/"))
                .unwrap_or(false);
            if !is_audio {
                return;
            }
            // Best-effort: a failed link must not panic inside the callback.
            let _ = pad.link(&convert_sink);
        });
        // underrun ≠ error just show a warning
        // let _id = alsa_sink.connect("underrun", false, |_| {
        //     tracing::warn!("⚠️ USB playback underrun host muted/not reading");
        //     None
        // });
        pipeline.set_state(gst::State::Playing)?;
        Ok(Self {
            appsrc,
            _pipe: pipeline,
            tap: ClipTap::new("voice", Duration::from_secs(60)),
        })
    }

    /// Feeds one encoded packet into the pipeline, stamping the buffer with
    /// the packet's µs PTS. Push errors (e.g. flushing pipeline) are ignored.
    pub fn push(&mut self, pkt: &AudioPacket) {
        self.tap.feed(&pkt.data);
        let mut buf = gst::Buffer::from_slice(pkt.data.clone());
        // A freshly created buffer is uniquely owned, so `get_mut` cannot fail.
        buf.get_mut()
            .unwrap()
            .set_pts(Some(gst::ClockTime::from_useconds(pkt.pts)));
        let _ = self.appsrc.push_buffer(buf);
    }

    /// Flushes the debug tap and signals end-of-stream to the pipeline.
    pub fn finish(&mut self) {
        self.tap.flush();
        let _ = self.appsrc.end_of_stream();
    }
}
#[cfg(all(test, coverage))]
mod tests {
    use super::Voice;

    /// Smoke-test: the coverage stub pipeline (`appsrc → fakesink`) can be
    /// constructed and shut down without error.
    #[tokio::test]
    async fn coverage_voice_constructor_starts_stub_pipeline() {
        let mut voice = Voice::new("coverage-audio").await.expect("voice");
        voice.finish();
    }
}