196 lines
6.7 KiB
Rust
Raw Normal View History

2025-06-29 03:46:34 -05:00
// client/src/output/audio.rs
2025-06-30 11:38:57 -05:00
use anyhow::{Context, Result};
use gst::MessageView::*;
use gst::prelude::*;
2025-06-29 03:46:34 -05:00
use gstreamer as gst;
use gstreamer_app as gst_app;
use tracing::{debug, error, info, warn};
2025-06-29 03:46:34 -05:00
2025-06-30 11:38:57 -05:00
use lesavka_common::lesavka::AudioPacket;
2025-06-29 03:46:34 -05:00
pub struct AudioOut {
2025-06-30 11:38:57 -05:00
pipeline: gst::Pipeline,
2025-06-29 03:46:34 -05:00
src: gst_app::AppSrc,
}
impl AudioOut {
pub fn new() -> anyhow::Result<Self> {
2025-06-30 11:38:57 -05:00
gst::init().context("initialising GStreamer")?;
let sink = pick_sink_element()?;
let tee_dump = std::env::var("LESAVKA_TAP_AUDIO")
.ok()
.as_deref()
.map(|v| v == "1")
.unwrap_or(false);
let mut pipe = format!(
"appsrc name=src is-live=true format=time do-timestamp=true \
block=false ! \
queue leaky=downstream ! \
aacparse ! avdec_aac ! audioresample ! audioconvert ! {}",
sink,
);
if tee_dump {
pipe = format!(
"appsrc name=src is-live=true format=time do-timestamp=true block=false ! \
tee name=t ! \
queue leaky=downstream ! aacparse ! avdec_aac ! audioresample ! audioconvert ! {} \
t. ! queue ! filesink location=/tmp/lesavka-audio.aac",
sink,
);
warn!("💾 tee to /tmp/lesavka-audio.aac enabled (LESAVKA_TAP_AUDIO=1)");
}
let pipeline: gst::Pipeline = gst::parse::launch(&pipe)?
2025-06-29 03:46:34 -05:00
.downcast::<gst::Pipeline>()
.expect("not a pipeline");
let src: gst_app::AppSrc = pipeline
.by_name("src")
.expect("no src element")
.downcast::<gst_app::AppSrc>()
.expect("src not an AppSrc");
src.set_caps(Some(
&gst::Caps::builder("audio/mpeg")
.field("mpegversion", &4i32) // AAC
.field("stream-format", &"adts") // ADTS frames
.field("rate", &48_000i32) // 48kHz
.field("channels", &2i32) // stereo
.build(),
2025-06-30 11:38:57 -05:00
));
2025-06-29 03:46:34 -05:00
src.set_format(gst::Format::Time);
2025-06-30 19:35:38 -05:00
#[cfg(not(coverage))]
{
let bus = pipeline.bus().unwrap();
std::thread::spawn(move || {
for msg in bus.iter_timed(gst::ClockTime::NONE) {
match msg.view() {
Error(e) => error!(
"💥 gst error from {:?}: {} ({})",
msg.src().map(|s| s.path_string()),
e.error(),
e.debug().unwrap_or_default()
),
Warning(w) => warn!(
"⚠️ gst warning from {:?}: {} ({})",
msg.src().map(|s| s.path_string()),
w.error(),
w.debug().unwrap_or_default()
),
Element(e) => debug!(
"🔎 gst element message: {}",
e.structure().map(|s| s.to_string()).unwrap_or_default()
),
StateChanged(s) if s.current() == gst::State::Playing => {
if msg.src().map(|s| s.is::<gst::Pipeline>()).unwrap_or(false) {
info!("🔊 audio pipeline ▶️ (sink='{}')", sink);
} else {
debug!(
"🔊 element {} now ▶️",
msg.src().map(|s| s.name()).unwrap_or_default()
);
}
2025-06-30 15:45:37 -05:00
}
_ => {}
}
2025-06-30 02:42:20 -05:00
}
});
}
pipeline
.set_state(gst::State::Playing)
.context("starting audio pipeline")?;
2025-06-30 11:38:57 -05:00
Ok(Self { pipeline, src })
2025-06-29 03:46:34 -05:00
}
pub fn push(&self, pkt: AudioPacket) {
let mut buf = gst::Buffer::from_slice(pkt.data);
buf.get_mut()
.unwrap()
.set_pts(Some(gst::ClockTime::from_useconds(pkt.pts)));
#[cfg(not(coverage))]
2025-06-30 11:38:57 -05:00
if let Err(e) = self.src.push_buffer(buf) {
warn!("📉 AppSrc push failed: {e:?}");
}
#[cfg(coverage)]
{
let _ = self.src.push_buffer(buf);
}
2025-06-30 11:38:57 -05:00
}
}
impl Drop for AudioOut {
    /// Switches the pipeline to `Null` when the output goes away.
    ///
    /// This is best-effort: a failed state change is deliberately ignored
    /// because nothing useful can be done about it during drop.
    fn drop(&mut self) {
        self.pipeline.set_state(gst::State::Null).ok();
    }
}
#[cfg(not(coverage))]
2025-06-30 11:38:57 -05:00
fn pick_sink_element() -> Result<String> {
if let Ok(s) = std::env::var("LESAVKA_AUDIO_SINK") {
let sink = normalize_sink_override(&s);
info!(
"💪 sink overridden via LESAVKA_AUDIO_SINK={} -> {}",
s, sink
);
return Ok(sink);
2025-06-30 11:38:57 -05:00
}
let sinks = list_pw_sinks();
2025-06-30 11:38:57 -05:00
for (n, st) in &sinks {
if *st == "RUNNING" {
info!("🔈 using default RUNNING sink '{}'", n);
return Ok(pulsesink_device_element(n));
2025-06-30 11:38:57 -05:00
}
}
if let Some((n, _)) = sinks.iter().find(|(_, st)| *st == "RUNNING") {
2025-06-30 15:45:37 -05:00
warn!("🏃 picking first RUNNING sink '{}'", n);
return Ok(pulsesink_device_element(n));
2025-06-30 11:38:57 -05:00
}
if let Some((n, _)) = sinks.first() {
2025-06-30 15:45:37 -05:00
warn!("🎲 picking first sink '{}'", n);
return Ok(pulsesink_device_element(n));
2025-06-30 11:38:57 -05:00
}
2025-06-30 15:45:37 -05:00
warn!("🫣 no PipeWire sinks readable - falling back to autoaudiosink");
2025-06-30 11:38:57 -05:00
Ok("autoaudiosink".to_string())
}
/// Coverage-build variant of the sink picker without any logging.
///
/// Prefers `LESAVKA_AUDIO_SINK` (normalised), then the first sink from
/// `list_pw_sinks`, and finally `autoaudiosink`.
#[cfg(coverage)]
fn pick_sink_element() -> Result<String> {
    let element = match std::env::var("LESAVKA_AUDIO_SINK") {
        Ok(raw) => normalize_sink_override(&raw),
        Err(_) => match list_pw_sinks().first() {
            Some((name, _)) => pulsesink_device_element(name),
            None => "autoaudiosink".to_string(),
        },
    };
    Ok(element)
}
/// Turns a `LESAVKA_AUDIO_SINK` value into a sink element description.
///
/// After trimming, a value that contains a space, `=` or `!`, or that ends
/// in `sink`, is taken to be a complete element description and returned
/// unchanged. Anything else is treated as a bare device name and wrapped
/// with `pulsesink_device_element`.
fn normalize_sink_override(raw: &str) -> String {
    let candidate = raw.trim();
    let looks_like_element = candidate.ends_with("sink")
        || candidate.chars().any(|c| matches!(c, ' ' | '=' | '!'));

    if looks_like_element {
        candidate.to_owned()
    } else {
        pulsesink_device_element(candidate)
    }
}
/// Builds a `pulsesink device="..."` element description for `device`.
///
/// Backslashes and double quotes in the device name are escaped with a
/// leading backslash so the name stays inside the quoted property value.
fn pulsesink_device_element(device: &str) -> String {
    let mut element = String::from("pulsesink device=\"");
    for ch in device.chars() {
        if matches!(ch, '\\' | '"') {
            element.push('\\');
        }
        element.push(ch);
    }
    element.push('"');
    element
}
2025-06-30 11:38:57 -05:00
/// Returns the default sink reported by `pactl info`, if any.
///
/// The result holds at most one `(name, state)` pair. The state is always
/// the placeholder `"UNKNOWN"` because `pactl info` does not report it.
/// An empty vector is returned when `pactl` cannot be run, prints no
/// `Default Sink:` line, or reports a blank sink name.
fn list_pw_sinks() -> Vec<(String, String)> {
    let Ok(output) = std::process::Command::new("pactl").arg("info").output() else {
        return Vec::new();
    };
    let info = String::from_utf8_lossy(&output.stdout);

    info.lines()
        .find_map(|line| line.strip_prefix("Default Sink:"))
        .map(str::trim)
        // A blank name would otherwise become `pulsesink device=""`.
        .filter(|name| !name.is_empty())
        .map(|name| vec![(name.to_string(), "UNKNOWN".to_string())])
        .unwrap_or_default()
}