175 lines
6.8 KiB
Rust
175 lines
6.8 KiB
Rust
// client/src/output/audio.rs
|
||
|
||
use anyhow::{Context, Result};
|
||
use gstreamer as gst;
|
||
use gstreamer_app as gst_app;
|
||
use gst::prelude::*;
|
||
use tracing::{error, info, warn, debug};
|
||
|
||
use lesavka_common::lesavka::AudioPacket;
|
||
|
||
pub struct AudioOut {
|
||
pipeline: gst::Pipeline,
|
||
src: gst_app::AppSrc,
|
||
}
|
||
|
||
impl AudioOut {
|
||
pub fn new() -> anyhow::Result<Self> {
|
||
gst::init().context("initialising GStreamer")?;
|
||
|
||
// ── 1. Decide which sink element to instantiate ────────────────────
|
||
let sink = pick_sink_element()?;
|
||
|
||
// Operator can request a tee to /tmp via LESAVKA_TAP_AUDIO=1
|
||
let tee_dump = std::env::var("LESAVKA_TAP_AUDIO")
|
||
.ok()
|
||
.as_deref()
|
||
.map(|v| v == "1")
|
||
.unwrap_or(false);
|
||
|
||
// ── 2. Assemble pipeline description string ────────────────────────
|
||
let mut pipe = format!(
|
||
"appsrc name=src is-live=true format=time do-timestamp=true \
|
||
block=false ! \
|
||
queue leaky=downstream ! \
|
||
aacparse ! avdec_aac ! audioresample ! audioconvert ! {}",
|
||
sink,
|
||
);
|
||
if tee_dump {
|
||
pipe = format!(
|
||
"appsrc name=src is-live=true format=time do-timestamp=true block=false ! \
|
||
tee name=t ! \
|
||
queue leaky=downstream ! aacparse ! avdec_aac ! audioresample ! audioconvert ! {} \
|
||
t. ! queue ! filesink location=/tmp/lesavka-audio.aac",
|
||
sink,
|
||
);
|
||
warn!("💾 tee to /tmp/lesavka-audio.aac enabled (LESAVKA_TAP_AUDIO=1)");
|
||
}
|
||
|
||
// ── 3. Create the pipeline & fetch the AppSrc ───────────────────────
|
||
let pipeline: gst::Pipeline = gst::parse::launch(&pipe)?
|
||
.downcast::<gst::Pipeline>()
|
||
.expect("not a pipeline");
|
||
|
||
let src: gst_app::AppSrc = pipeline
|
||
.by_name("src")
|
||
.expect("no src element")
|
||
.downcast::<gst_app::AppSrc>()
|
||
.expect("src not an AppSrc");
|
||
|
||
src.set_caps(Some(&gst::Caps::builder("audio/mpeg")
|
||
.field("mpegversion", &4i32) // AAC
|
||
.field("stream-format", &"adts") // ADTS frames
|
||
.field("rate", &48_000i32) // 48 kHz
|
||
.field("channels", &2i32) // stereo
|
||
.build()
|
||
));
|
||
src.set_format(gst::Format::Time);
|
||
|
||
// ── 4. Log *all* warnings/errors from the bus ──────────────────────
|
||
let bus = pipeline.bus().unwrap();
|
||
std::thread::spawn(move || {
|
||
use gst::MessageView::*;
|
||
for msg in bus.iter_timed(gst::ClockTime::NONE) {
|
||
match msg.view() {
|
||
Error(e) => error!("💥 gst error from {:?}: {} ({})",
|
||
msg.src().map(|s| s.path_string()),
|
||
e.error(), e.debug().unwrap_or_default()),
|
||
Warning(w) => warn!("⚠️ gst warning from {:?}: {} ({})",
|
||
msg.src().map(|s| s.path_string()),
|
||
w.error(), w.debug().unwrap_or_default()),
|
||
Element(e) => debug!("🔎 gst element message: {}", e
|
||
.structure()
|
||
.map(|s| s.to_string())
|
||
.unwrap_or_default()),
|
||
StateChanged(s) if s.current() == gst::State::Playing =>
|
||
info!("🔊 audio pipeline PLAYING (sink='{}')", sink),
|
||
_ => {}
|
||
}
|
||
}
|
||
});
|
||
|
||
pipeline.set_state(gst::State::Playing).context("starting audio pipeline")?;
|
||
|
||
Ok(Self { pipeline, src })
|
||
}
|
||
|
||
pub fn push(&self, pkt: AudioPacket) {
|
||
let mut buf = gst::Buffer::from_slice(pkt.data);
|
||
buf.get_mut()
|
||
.unwrap()
|
||
.set_pts(Some(gst::ClockTime::from_useconds(pkt.pts)));
|
||
if let Err(e) = self.src.push_buffer(buf) {
|
||
warn!("📉 AppSrc push failed: {e:?}");
|
||
}
|
||
}
|
||
}
|
||
|
||
impl Drop for AudioOut {
|
||
fn drop(&mut self) {
|
||
// put the whole pipeline back to NULL so GStreamer can dispose cleanly
|
||
let _ = self.pipeline.set_state(gst::State::Null);
|
||
}
|
||
}
|
||
|
||
/*──────────────── helper: sink selection ─────────────────────────────*/
|
||
fn pick_sink_element() -> Result<String> {
|
||
// 1. Operator override
|
||
if let Ok(s) = std::env::var("LESAVKA_AUDIO_SINK") {
|
||
info!("🎛️ sink overridden via LESAVKA_AUDIO_SINK={}", s);
|
||
return Ok(s);
|
||
}
|
||
|
||
// 2. Query PipeWire for default & running sinks
|
||
// (works even if PulseAudio is present because PipeWire mimics it)
|
||
let sinks = list_pw_sinks(); // Vec<(name,state)>
|
||
for (n, st) in &sinks {
|
||
if *st == "RUNNING" {
|
||
info!("🔈 using default RUNNING sink '{}'", n);
|
||
return Ok(format!("pulsesink device={}", n));
|
||
}
|
||
}
|
||
|
||
// 3. First RUNNING sink
|
||
if let Some((n, _)) = sinks.iter().find(|(_, st)| *st == "RUNNING") {
|
||
warn!("🪄 picking first RUNNING sink '{}'", n);
|
||
return Ok(format!("pulsesink device={}", n));
|
||
}
|
||
// 4. Anything
|
||
if let Some((n, _)) = sinks.first() {
|
||
warn!("🪄 picking first sink '{}'", n);
|
||
return Ok(format!("pulsesink device={}", n));
|
||
}
|
||
|
||
// Fallback – let autoaudiosink try its luck
|
||
warn!("😬 no PipeWire sinks readable – falling back to autoaudiosink");
|
||
Ok("autoaudiosink".to_string())
|
||
}
|
||
|
||
/// Minimal PipeWire sink enumerator (no extra crate required).
///
/// Runs `pw-cli ls Node` and returns `(name, state)` for every output node
/// found in its standard output. Returns an empty list when the command
/// cannot be run.
fn list_pw_sinks() -> Vec<(String, String)> {
    match std::process::Command::new("pw-cli")
        .args(["ls", "Node"])
        .output()
    {
        Ok(output) => parse_pw_sinks(&String::from_utf8_lossy(&output.stdout)),
        Err(_) => Vec::new(),
    }
}

/// Extracts `(name, state)` pairs from `pw-cli ls Node` style text.
///
/// Each line is split on `│`. A line counts as a sink when its third column
/// starts with `alsa_output.`; the state is the second whitespace-separated
/// word of the fourth column, or `UNKNOWN` when that is missing.
fn parse_pw_sinks(text: &str) -> Vec<(String, String)> {
    text.lines()
        .filter_map(|line| {
            // Example: " 36 │ node.alive = true │ alsa_output.pci-0000_2f_00.4.iec958-stereo │ state: SUSPENDED ..."
            let mut cols = line.split('│').map(str::trim).skip(2);
            let name = cols.next().filter(|col| col.starts_with("alsa_output."))?;
            let state = cols
                .next()
                .and_then(|col| col.split_whitespace().nth(1))
                .unwrap_or("UNKNOWN");
            Some((name.to_owned(), state.to_owned()))
        })
        .collect()
}
|