lesavka/server/src/audio.rs

118 lines
4.7 KiB
Rust
Raw Normal View History

2025-06-29 22:57:54 -05:00
// server/src/audio.rs
#![forbid(unsafe_code)]
2025-06-29 03:46:34 -05:00
use anyhow::Context;
use futures_util::Stream;
use gstreamer as gst;
use gstreamer_app as gst_app;
2025-06-29 22:57:54 -05:00
use gst::prelude::*;
2025-06-29 03:46:34 -05:00
use lesavka_common::lesavka::AudioPacket;
use tokio_stream::wrappers::ReceiverStream;
use tonic::Status;
2025-06-29 22:57:54 -05:00
use tracing::{debug, error, warn};
2025-06-29 03:46:34 -05:00
2025-06-29 22:57:54 -05:00
/// “Speaker” stream coming **from** the remote host (UAC2gadget playback
/// endpoint) **towards** the client.
2025-06-29 03:46:34 -05:00
pub struct AudioStream {
2025-06-29 22:57:54 -05:00
_pipeline: gst::Pipeline,
inner: ReceiverStream<Result<AudioPacket, Status>>,
2025-06-29 03:46:34 -05:00
}
impl Stream for AudioStream {
    type Item = Result<AudioPacket, Status>;

    /// Polls the wrapped receiver stream and returns whatever it yields.
    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // Re-pin just the receiver field and delegate the poll to it.
        let this = self.get_mut();
        std::pin::Pin::new(&mut this.inner).poll_next(cx)
    }
}
2025-06-29 22:57:54 -05:00
impl Drop for AudioStream {
fn drop(&mut self) {
let _ = self._pipeline.set_state(gst::State::Null);
}
}
/*───────────────────────────────────────────────────────────────────────────*/
/* eye_ear() - capture from ALSA (“speaker”) and push AAC AUs via gRPC */
/*───────────────────────────────────────────────────────────────────────────*/
pub async fn eye_ear(alsa_dev: &str, id: u32) -> anyhow::Result<AudioStream> {
// NB: one *logical* speaker → id==0. A 2nd logical stream could be
// added later (for multichannel) without changing the client.
gst::init().context("gst init")?;
/*──────────── pipeline description ────────────
*
* ALSA (UAC2 gadget) AAC+ADTS AppSink
* raw 48kHz AU/ADTS
* alsasrc voaacenc appsink
*
*/
let desc = format!(
"alsasrc name=audsrc device=\"{alsa_dev}\" do-timestamp=true ! \
audio/x-raw,channels=2,rate=48000 ! \
voaacenc bitrate=192000 ! aacparse ! queue ! \
appsink name=asink emit-signals=true max-buffers=64 drop=true"
);
let pipeline: gst::Pipeline = gst::parse::launch(&desc)?
.downcast()
.expect("pipeline");
let sink: gst_app::AppSink = pipeline
.by_name("asink")
.expect("asink")
.downcast()
.expect("appsink");
let (tx, rx) = tokio::sync::mpsc::channel(8192);
/*──────────── callbacks ────────────*/
sink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample(move |s| {
let sample = s.pull_sample().map_err(|_| gst::FlowError::Eos)?;
let buffer = sample.buffer().ok_or(gst::FlowError::Error)?;
let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?;
static CNT: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n < 10 || n % 300 == 0 {
debug!("🔊 eyeear #{n}: {}bytes", map.len());
}
let pts_us = buffer
.pts()
.unwrap_or(gst::ClockTime::ZERO)
.nseconds() / 1_000;
// push nonblocking; drop oldest on overflow
if tx.try_send(Ok(AudioPacket {
id,
pts: pts_us,
data: map.as_slice().to_vec(),
})).is_err() {
static DROPS: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
let d = DROPS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if d % 300 == 0 {
warn!("🔊💔 dropped {d} audio AUs (client too slow)");
}
}
Ok(gst::FlowSuccess::Ok)
})
.build(),
);
pipeline.set_state(gst::State::Playing)
.context("starting audio pipeline")?;
2025-06-29 17:24:19 -05:00
Ok(AudioStream {
2025-06-29 22:57:54 -05:00
_pipeline: pipeline,
inner: ReceiverStream::new(rx),
2025-06-29 17:24:19 -05:00
})
2025-06-29 03:46:34 -05:00
}