lesavka/server/src/audio.rs

304 lines
11 KiB
Rust
Raw Normal View History

2025-06-29 22:57:54 -05:00
// server/src/audio.rs
#![forbid(unsafe_code)]
2025-06-30 14:20:07 -05:00
use anyhow::{anyhow, Context};
use chrono::Local;
2025-06-29 03:46:34 -05:00
use futures_util::Stream;
use gstreamer as gst;
use gstreamer_app as gst_app;
2025-06-29 22:57:54 -05:00
use gst::prelude::*;
2025-07-01 17:30:34 -05:00
use gst::ElementFactory;
2025-06-30 02:42:20 -05:00
use gst::MessageView::*;
2025-06-29 03:46:34 -05:00
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;
2025-06-29 22:57:54 -05:00
use tracing::{debug, error, warn};
2025-07-01 17:30:34 -05:00
use std::time::{Instant, Duration, SystemTime, UNIX_EPOCH};
use std::sync::{Arc, Mutex};
use lesavka_common::lesavka::AudioPacket;
2025-06-29 03:46:34 -05:00
2025-06-29 22:57:54 -05:00
/// “Speaker” stream coming **from** the remote host (UAC2gadget playback
/// endpoint) **towards** the client.
2025-06-29 03:46:34 -05:00
pub struct AudioStream {
2025-06-29 22:57:54 -05:00
_pipeline: gst::Pipeline,
inner: ReceiverStream<Result<AudioPacket, Status>>,
2025-06-29 03:46:34 -05:00
}
/// `AudioStream` is a thin wrapper: every poll is delegated to the inner
/// `ReceiverStream`.
impl Stream for AudioStream {
    type Item = Result<AudioPacket, Status>;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // Un-pin the wrapper, then re-pin just the field we forward to.
        let this = self.get_mut();
        let inner = std::pin::Pin::new(&mut this.inner);
        inner.poll_next(cx)
    }
}
2025-06-29 22:57:54 -05:00
/// Stops the capture pipeline when the stream goes away.
impl Drop for AudioStream {
    fn drop(&mut self) {
        let pipeline = &self._pipeline;
        // The state-change result is deliberately discarded: there is nothing
        // a destructor could do about a failure.
        let _ = pipeline.set_state(gst::State::Null);
    }
}
/*───────────────────────────────────────────────────────────────────────────*/
2025-06-30 15:45:37 -05:00
/* ear() - capture from ALSA (“speaker”) and push AAC AUs via gRPC */
2025-06-29 22:57:54 -05:00
/*───────────────────────────────────────────────────────────────────────────*/
2025-06-30 02:42:20 -05:00
pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result<AudioStream> {
2025-06-29 22:57:54 -05:00
// NB: one *logical* speaker → id==0. A 2nd logical stream could be
// added later (for multichannel) without changing the client.
gst::init().context("gst init")?;
/*──────────── pipeline description ────────────
*
* ALSA (UAC2 gadget) AAC+ADTS AppSink
* raw 48kHz AU/ADTS
* alsasrc voaacenc appsink
*
*/
2025-07-01 17:30:34 -05:00
let desc = build_pipeline_desc(alsa_dev)?;
2025-06-29 22:57:54 -05:00
let pipeline: gst::Pipeline = gst::parse::launch(&desc)?
.downcast()
.expect("pipeline");
let sink: gst_app::AppSink = pipeline
.by_name("asink")
.expect("asink")
.downcast()
.expect("appsink");
2025-07-01 17:30:34 -05:00
let tap = Arc::new(Mutex::new(ClipTap::new("🎧 - ear", Duration::from_secs(60))));
sink.connect("underrun", false, |_| {
tracing::warn!("⚠️ USB playback underrun host muted or not reading");
None
});
2025-06-29 22:57:54 -05:00
let (tx, rx) = tokio::sync::mpsc::channel(8192);
2025-06-30 02:42:20 -05:00
let bus = pipeline.bus().expect("bus");
2025-06-30 02:03:01 -05:00
std::thread::spawn(move || {
for msg in bus.iter_timed(gst::ClockTime::NONE) {
match msg.view() {
Error(e) => error!("💥 audio pipeline: {} ({})",
2025-06-30 02:42:20 -05:00
e.error(), e.debug().unwrap_or_default()),
2025-06-30 02:03:01 -05:00
Warning(w) => warn!("⚠️ audio pipeline: {} ({})",
2025-06-30 02:42:20 -05:00
w.error(), w.debug().unwrap_or_default()),
2025-06-30 02:03:01 -05:00
StateChanged(s) if s.current() == gst::State::Playing =>
debug!("🎶 audio pipeline PLAYING"),
_ => {}
}
}
});
2025-06-29 22:57:54 -05:00
/*──────────── callbacks ────────────*/
sink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
2025-07-01 17:30:34 -05:00
.new_sample({
let tap = tap.clone();
move |s| {
2025-06-29 22:57:54 -05:00
let sample = s.pull_sample().map_err(|_| gst::FlowError::Eos)?;
let buffer = sample.buffer().ok_or(gst::FlowError::Error)?;
let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?;
2025-07-01 17:30:34 -05:00
// -------- cliptap (minute dumps) ------------
tap.lock().unwrap().feed(map.as_slice());
2025-06-29 22:57:54 -05:00
static CNT: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n < 10 || n % 300 == 0 {
2025-06-30 02:42:20 -05:00
debug!("🎧 ear #{n}: {}bytes", map.len());
2025-06-29 22:57:54 -05:00
}
let pts_us = buffer
.pts()
.unwrap_or(gst::ClockTime::ZERO)
.nseconds() / 1_000;
// push nonblocking; drop oldest on overflow
if tx.try_send(Ok(AudioPacket {
id,
pts: pts_us,
data: map.as_slice().to_vec(),
})).is_err() {
static DROPS: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
let d = DROPS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if d % 300 == 0 {
2025-06-30 02:42:20 -05:00
warn!("🎧💔 dropped {d} audio AUs (client too slow)");
2025-06-29 22:57:54 -05:00
}
}
Ok(gst::FlowSuccess::Ok)
2025-07-01 17:30:34 -05:00
}
}).build(),
2025-06-29 22:57:54 -05:00
);
pipeline.set_state(gst::State::Playing)
.context("starting audio pipeline")?;
2025-06-29 17:24:19 -05:00
Ok(AudioStream {
2025-06-29 22:57:54 -05:00
_pipeline: pipeline,
inner: ReceiverStream::new(rx),
2025-06-29 17:24:19 -05:00
})
2025-06-29 03:46:34 -05:00
}
2025-06-30 01:19:28 -05:00
2025-06-30 14:20:07 -05:00
/*────────────────────────── build_pipeline_desc ───────────────────────────*/
2025-06-30 01:19:28 -05:00
fn build_pipeline_desc(dev: &str) -> anyhow::Result<String> {
2025-06-30 11:38:57 -05:00
let reg = gst::Registry::get();
2025-06-30 12:34:27 -05:00
2025-06-30 14:20:07 -05:00
// first available encoder
let enc = ["fdkaacenc", "voaacenc", "avenc_aac"]
2025-06-30 01:19:28 -05:00
.into_iter()
2025-06-30 11:38:57 -05:00
.find(|&e| {
reg.find_plugin(e).is_some()
2025-06-30 14:20:07 -05:00
|| reg
.find_feature(e, ElementFactory::static_type())
.is_some()
2025-06-30 11:38:57 -05:00
})
.ok_or_else(|| anyhow!("no AAC encoder plugin available"))?;
2025-06-30 01:19:28 -05:00
Ok(format!(
2025-06-30 14:20:07 -05:00
concat!(
"alsasrc device=\"{dev}\" do-timestamp=true ! ",
"audio/x-raw,format=S16LE,channels=2,rate=48000 ! ",
"audioconvert ! audioresample ! {enc} bitrate=192000 ! ",
"aacparse ! ",
"capsfilter caps=audio/mpeg,stream-format=adts,channels=2,rate=48000 ! ",
"tee name=t ",
"t. ! queue ! appsink name=asink emit-signals=true ",
"t. ! queue ! appsink name=debugtap emit-signals=true max-buffers=500 drop=true"
),
dev = dev,
enc = enc
2025-06-30 01:19:28 -05:00
))
2025-06-30 14:20:07 -05:00
}
2025-07-01 17:30:34 -05:00
// ────────────────────── minuteclip helper ───────────────────────────────
pub struct ClipTap {
buf: Vec<u8>,
tag: &'static str,
next_dump: Instant,
period: Duration,
}
2025-06-30 14:20:07 -05:00
2025-07-01 17:30:34 -05:00
impl ClipTap {
pub fn new(tag: &'static str, period: Duration) -> Self {
Self {
buf: Vec::with_capacity(260_000),
tag,
next_dump: Instant::now() + period,
period,
2025-06-30 14:20:07 -05:00
}
2025-07-01 17:30:34 -05:00
}
pub fn feed(&mut self, bytes: &[u8]) {
self.buf.extend_from_slice(bytes);
if self.buf.len() > 256_000 {
self.buf.drain(..self.buf.len() - 256_000);
}
if Instant::now() >= self.next_dump {
self.flush();
self.next_dump += self.period;
}
}
pub fn flush(&mut self) {
if self.buf.is_empty() {
return;
}
let ts = chrono::Local::now().format("%Y%m%d-%H%M%S");
let path = format!("/tmp/{}-{}.aac", self.tag, ts);
if std::fs::write(&path, &self.buf).is_ok() {
tracing::debug!("📼 wrote {} clip → {}", self.tag, path);
}
self.buf.clear();
}
}
impl Drop for ClipTap {
fn drop(&mut self) {
self.flush()
}
}
// ────────────────────── microphone sink ────────────────────────────────
pub struct Voice {
appsrc: gst_app::AppSrc,
_pipe: gst::Pipeline, // keep pipeline alive
tap: ClipTap,
}
2025-06-30 14:20:07 -05:00
2025-07-01 17:30:34 -05:00
impl Voice {
pub async fn new(alsa_dev: &str) -> anyhow::Result<Self> {
use gst::prelude::*;
gst::init().context("gst init")?;
// pipeline
let pipeline = gst::Pipeline::new();
// elements
let appsrc = gst::ElementFactory::make("appsrc")
.build()
.context("make appsrc")?
.downcast::<gst_app::AppSrc>()
2025-06-30 14:20:07 -05:00
.unwrap();
2025-07-01 17:30:34 -05:00
// dedicated AppSrc helpers exist and avoid the needless `?`
appsrc.set_format(gst::Format::Time);
appsrc.set_is_live(true);
let decodebin = gst::ElementFactory::make("decodebin")
.build()
.context("make decodebin")?;
let alsa_sink = gst::ElementFactory::make("alsasink")
.build()
.context("make alsasink")?;
alsa_sink.set_property("device", &alsa_dev);
pipeline.add_many(&[appsrc.upcast_ref(), &decodebin, &alsa_sink])?;
appsrc.link(&decodebin)?;
/*------------ decodebin autolink ----------------*/
let sink_clone = alsa_sink.clone(); // keep original for later
decodebin.connect_pad_added(move |_db, pad| {
let sink_pad = sink_clone.static_pad("sink").unwrap();
if !sink_pad.is_linked() {
let _ = pad.link(&sink_pad);
}
});
// underrun ≠ error just show a warning
let _id = alsa_sink.connect("underrun", false, |_| {
tracing::warn!("⚠️ USB playback underrun host muted/not reading");
None
});
pipeline.set_state(gst::State::Playing)?;
Ok(Self {
appsrc,
_pipe: pipeline,
tap: ClipTap::new("voice", Duration::from_secs(60)),
})
}
pub fn push(&mut self, pkt: &AudioPacket) {
use gst::prelude::*;
self.tap.feed(&pkt.data);
let mut buf = gst::Buffer::from_slice(pkt.data.clone());
buf.get_mut()
.unwrap()
.set_pts(Some(gst::ClockTime::from_useconds(pkt.pts)));
if let Err(e) = self.appsrc.push_buffer(buf) {
tracing::warn!("🎤 AppSrc push failed: {e:?}");
}
}
pub fn finish(&mut self) {
self.tap.flush();
let _ = self.appsrc.end_of_stream();
2025-06-30 14:20:07 -05:00
}
2025-07-01 17:30:34 -05:00
}