302 lines
9.9 KiB
Rust
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// client/src/output/audio.rs
use anyhow::{Context, Result};
use gst::MessageView::*;
use gst::prelude::*;
use gstreamer as gst;
use gstreamer_app as gst_app;
use std::sync::Mutex;
use tracing::{debug, error, info, warn};
use lesavka_common::lesavka::AudioPacket;
/// Audio playback output backed by a GStreamer pipeline.
///
/// Packets are fed in through [`AudioOut::push`]; dropping the value sets the
/// pipeline to the `Null` state.
pub struct AudioOut {
/// The pipeline built and started by `AudioOut::new`.
pipeline: gst::Pipeline,
/// The element named `src` in the pipeline, into which packet buffers are pushed.
src: gst_app::AppSrc,
/// Per-stream packet statistics, updated on every push and used for debug logging.
timeline: Mutex<AudioTimeline>,
}
/// Running statistics about the audio packets seen so far.
///
/// Only used for diagnostics: the values feed the debug log emitted while
/// packets are turned into buffers.
#[derive(Default)]
struct AudioTimeline {
/// `pts` of the most recently seen packet, or `None` before the first packet
/// (presumably microseconds, going by the `_us` suffix).
last_remote_pts_us: Option<u64>,
/// Number of packets seen so far; incremented with saturation.
packets: u64,
}
impl AudioOut {
    /// Builds the AAC playback pipeline, starts it, and returns a handle for feeding it.
    ///
    /// The output element is chosen by `pick_sink_element`. Setting the
    /// environment variable `LESAVKA_TAP_AUDIO=1` additionally tees the
    /// still-encoded stream into `/tmp/lesavka-audio.aac`.
    ///
    /// # Errors
    ///
    /// Fails when GStreamer cannot be initialised, when no sink element can be
    /// picked, when the pipeline description does not parse, or when the
    /// pipeline cannot be switched to the `Playing` state.
    ///
    /// # Panics
    ///
    /// Panics if the parsed description is not a `gst::Pipeline`, has no bus, or
    /// does not contain an `AppSrc` named `src`. Each of these would mean the
    /// description built by this type is itself broken.
    pub fn new() -> anyhow::Result<Self> {
        gst::init().context("initialising GStreamer")?;
        let sink = pick_sink_element()?;
        let tee_dump = std::env::var("LESAVKA_TAP_AUDIO").is_ok_and(|v| v == "1");
        let pipe = Self::pipeline_description(&sink, tee_dump);
        if tee_dump {
            warn!("💾 tee to /tmp/lesavka-audio.aac enabled (LESAVKA_TAP_AUDIO=1)");
        }
        // The sink fragment may come straight from the environment, so a parse
        // failure is a realistic runtime error and deserves context.
        let pipeline: gst::Pipeline = gst::parse::launch(&pipe)
            .context("parsing audio pipeline description")?
            .downcast::<gst::Pipeline>()
            .expect("not a pipeline");
        let src: gst_app::AppSrc = pipeline
            .by_name("src")
            .expect("no src element")
            .downcast::<gst_app::AppSrc>()
            .expect("src not an AppSrc");
        src.set_caps(Some(
            &gst::Caps::builder("audio/mpeg")
                .field("mpegversion", &4i32) // AAC
                .field("stream-format", &"adts") // ADTS frames
                .field("rate", &48_000i32) // 48kHz
                .field("channels", &2i32) // stereo
                .build(),
        ));
        src.set_format(gst::Format::Time);
        // Bus logging is compiled out of coverage builds.
        #[cfg(not(coverage))]
        {
            let bus = pipeline.bus().expect("a gst::Pipeline always owns a bus");
            // Forward bus traffic to the tracing log from a dedicated thread.
            std::thread::spawn(move || {
                for msg in bus.iter_timed(gst::ClockTime::NONE) {
                    match msg.view() {
                        Error(e) => error!(
                            "💥 gst error from {:?}: {} ({})",
                            msg.src().map(|s| s.path_string()),
                            e.error(),
                            e.debug().unwrap_or_default()
                        ),
                        Warning(w) => warn!(
                            "⚠️ gst warning from {:?}: {} ({})",
                            msg.src().map(|s| s.path_string()),
                            w.error(),
                            w.debug().unwrap_or_default()
                        ),
                        Element(e) => {
                            if let Some(structure) = e.structure() {
                                // `level` structures are logged at info, everything else at debug.
                                if structure.name() == "level" {
                                    info!("🔊 decoded audio level {}", structure);
                                } else {
                                    debug!("🔎 gst element message: {}", structure);
                                }
                            }
                        }
                        StateChanged(s) if s.current() == gst::State::Playing => {
                            // Distinguish the pipeline itself from its child elements.
                            if msg.src().map(|s| s.is::<gst::Pipeline>()).unwrap_or(false) {
                                info!("🔊 audio pipeline ▶️ (sink='{}')", sink);
                            } else {
                                debug!(
                                    "🔊 element {} now ▶️",
                                    msg.src().map(|s| s.name()).unwrap_or_default()
                                );
                            }
                        }
                        _ => {}
                    }
                }
            });
        }
        pipeline
            .set_state(gst::State::Playing)
            .context("starting audio pipeline")?;
        Ok(Self {
            pipeline,
            src,
            timeline: Mutex::new(AudioTimeline::default()),
        })
    }

    /// Returns the `gst-launch` style description of the playback pipeline.
    ///
    /// `sink` is spliced in verbatim as the final element. With `tee_dump` set,
    /// a `tee` is inserted right after the `appsrc` and a second branch writes
    /// the undecoded stream to `/tmp/lesavka-audio.aac`.
    fn pipeline_description(sink: &str, tee_dump: bool) -> String {
        // Both variants share one template so the decode chain cannot drift
        // between the plain and the tapped pipeline.
        let (tee_branch, file_tap) = if tee_dump {
            (
                "tee name=t ! ",
                " t. ! queue ! filesink location=/tmp/lesavka-audio.aac",
            )
        } else {
            ("", "")
        };
        format!(
            "appsrc name=src is-live=true format=time do-timestamp=true block=false ! \
             {tee_branch}\
             queue max-size-time=500000000 max-size-bytes=0 max-size-buffers=0 ! \
             aacparse ! avdec_aac ! audioconvert ! audioresample ! \
             audio/x-raw,format=S16LE,channels=2,rate=48000 ! \
             level name=remote_audio_level interval=1000000000 message=true ! \
             queue max-size-time=400000000 max-size-bytes=0 max-size-buffers=0 ! \
             {sink}{file_tap}"
        )
    }

    /// Queues one audio packet for playback.
    ///
    /// The packet is converted by `live_audio_buffer` and pushed into the
    /// `appsrc`. A failed push never propagates: normal builds log a warning,
    /// coverage builds discard the result silently.
    pub fn push(&self, pkt: AudioPacket) {
        let buf = live_audio_buffer(pkt, &self.timeline);
        #[cfg(not(coverage))]
        if let Err(e) = self.src.push_buffer(buf) {
            warn!("📉 AppSrc push failed: {e:?}");
        }
        #[cfg(coverage)]
        {
            let _ = self.src.push_buffer(buf);
        }
    }
}
/// Wraps a packet's payload in a `gst::Buffer` and records it in `timeline`.
///
/// No timestamp is set on the buffer here; the packet's `pts` is only used to
/// update the statistics and the debug log. The first eight packets and every
/// 600th packet after that are logged. If the timeline mutex cannot be locked,
/// the statistics are skipped and the buffer is still returned.
fn live_audio_buffer(pkt: AudioPacket, timeline: &Mutex<AudioTimeline>) -> gst::Buffer {
    let pts = pkt.pts;
    let buffer = gst::Buffer::from_slice(pkt.data);
    if let Ok(mut stats) = timeline.lock() {
        // Distance to the previous packet's pts; `None` for the very first packet.
        let remote_gap_us = stats
            .last_remote_pts_us
            .map(|previous| pts.saturating_sub(previous));
        stats.last_remote_pts_us = Some(pts);
        stats.packets = stats.packets.saturating_add(1);

        let worth_logging = stats.packets <= 8 || stats.packets % 600 == 0;
        if worth_logging {
            debug!(
                packet = stats.packets,
                remote_pts_us = pts,
                remote_gap_us,
                bytes = buffer.size(),
                "🔊 audio packet queued for live appsrc timestamping"
            );
        }
    }
    buffer
}
impl Drop for AudioOut {
    /// Sets the pipeline to the `Null` state when the output goes away.
    fn drop(&mut self) {
        // Best effort: the outcome of the state change is deliberately ignored.
        self.pipeline.set_state(gst::State::Null).ok();
    }
}
/// Chooses the sink fragment appended to the playback pipeline (logging variant).
///
/// Order of preference: the `LESAVKA_AUDIO_SINK` override, then the first sink
/// returned by `list_pw_sinks`, then `autoaudiosink`. Every choice is logged.
#[cfg(not(coverage))]
fn pick_sink_element() -> Result<String> {
    if let Ok(requested) = std::env::var("LESAVKA_AUDIO_SINK") {
        let element = normalize_sink_override(&requested);
        info!(
            "💪 sink overridden via LESAVKA_AUDIO_SINK={} -> {}",
            requested, element
        );
        return Ok(element);
    }

    let element = match list_pw_sinks().first() {
        Some((n, st)) => {
            info!("🔈 using PipeWire sink '{}' ({st})", n);
            pulsesink_device_element(n)
        }
        None => {
            warn!("🫣 no PipeWire sinks readable - falling back to autoaudiosink");
            "autoaudiosink".to_string()
        }
    };
    Ok(element)
}
/// Chooses the sink fragment appended to the playback pipeline (coverage variant).
///
/// Same preference order as the logging variant — `LESAVKA_AUDIO_SINK`, then the
/// first sink from `list_pw_sinks`, then `autoaudiosink` — without any logging.
#[cfg(coverage)]
fn pick_sink_element() -> Result<String> {
    let element = match std::env::var("LESAVKA_AUDIO_SINK") {
        Ok(requested) => normalize_sink_override(&requested),
        Err(_) => match list_pw_sinks().first() {
            Some((name, _)) => pulsesink_device_element(name),
            None => "autoaudiosink".to_string(),
        },
    };
    Ok(element)
}
/// Turns a `LESAVKA_AUDIO_SINK` value into a pipeline sink fragment.
///
/// After trimming, the value is passed through unchanged when it looks like an
/// element description: it contains a space, `=` or `!`, or it ends in `sink`.
/// Anything else is treated as a bare device name and wrapped by
/// `pulsesink_device_element`.
fn normalize_sink_override(raw: &str) -> String {
    let candidate = raw.trim();
    let has_pipeline_syntax = candidate.chars().any(|c| matches!(c, ' ' | '=' | '!'));
    let names_sink_element = candidate.ends_with("sink");

    if has_pipeline_syntax || names_sink_element {
        candidate.to_owned()
    } else {
        pulsesink_device_element(candidate)
    }
}
/// Formats a `pulsesink` element description targeting `device`.
///
/// Backslashes and double quotes in the device name are backslash-escaped so
/// the name survives inside the quoted `device="…"` property. Devices whose
/// name starts with `bluez_output.` get larger `buffer-time` / `latency-time`
/// values than all other devices.
fn pulsesink_device_element(device: &str) -> String {
    let is_bluez = device.starts_with("bluez_output.");
    let buffer_time = if is_bluez { 750_000 } else { 350_000 };
    let latency_time = if is_bluez { 250_000 } else { 100_000 };

    // Single pass: prefix every `\` and `"` with a backslash.
    let mut escaped = String::with_capacity(device.len());
    for ch in device.chars() {
        if matches!(ch, '\\' | '"') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }

    format!(
        "pulsesink device=\"{escaped}\" buffer-time={buffer_time} latency-time={latency_time} sync=true"
    )
}
fn list_pw_sinks() -> Vec<(String, String)> {
let default_sink = std::process::Command::new("pactl")
.args(["info"])
.output()
.ok()
.filter(|output| output.status.success())
.and_then(|output| parse_pactl_default_sink(&String::from_utf8_lossy(&output.stdout)));
if let Ok(output) = std::process::Command::new("pactl")
.args(["list", "short", "sinks"])
.output()
&& output.status.success()
{
return parse_pactl_short_sinks(
&String::from_utf8_lossy(&output.stdout),
default_sink.as_deref(),
);
}
default_sink
.map(|sink| vec![(sink, "DEFAULT".to_string())])
.unwrap_or_default()
}
/// Extracts the default sink name from `pactl info` output.
///
/// Only the first line that starts with `Default Sink:` is considered; the
/// rest of that line is trimmed. Returns `None` when no such line exists or
/// when the trimmed value is empty.
fn parse_pactl_default_sink(stdout: &str) -> Option<String> {
    for line in stdout.lines() {
        if let Some(rest) = line.strip_prefix("Default Sink:") {
            let name = rest.trim();
            // The first matching line decides, even if it turns out to be blank.
            return if name.is_empty() {
                None
            } else {
                Some(name.to_owned())
            };
        }
    }
    None
}
/// Parses `pactl list short sinks` output into `(name, state)` pairs, best first.
///
/// Each line is split on whitespace: the second column is the sink name and the
/// last column after it is the state (upper-cased). Lines with fewer than two
/// columns are skipped, and a line without any column after the name gets the
/// state `UNKNOWN`.
///
/// The result is ordered by state (see `sink_state_rank`), then default sink
/// first, then name. Every name appears once, keeping its best-ranked entry.
/// If `default_sink` is given but absent from the listing, it is inserted at
/// the front with the state `DEFAULT`.
fn parse_pactl_short_sinks(stdout: &str, default_sink: Option<&str>) -> Vec<(String, String)> {
    let mut sinks: Vec<(String, String)> = stdout
        .lines()
        .filter_map(|line| {
            let mut columns = line.split_whitespace();
            let name = columns.nth(1)?;
            // Only a column *after* the name can be the state; otherwise the
            // name itself would be reported as the state.
            let state = columns
                .last()
                .map_or_else(|| "UNKNOWN".to_string(), str::to_ascii_uppercase);
            Some((name.to_string(), state))
        })
        .collect();

    // Stable sort on borrowed keys: state rank, then default sink, then name.
    sinks.sort_by(|(left_name, left_state), (right_name, right_state)| {
        let left_is_default = default_sink == Some(left_name.as_str());
        let right_is_default = default_sink == Some(right_name.as_str());
        sink_state_rank(left_state)
            .cmp(&sink_state_rank(right_state))
            // `true` sorts after `false`, so compare reversed to put the default first.
            .then(right_is_default.cmp(&left_is_default))
            .then_with(|| left_name.cmp(right_name))
    });

    // Duplicates of a name need not be adjacent after sorting by state, so
    // track the names already kept instead of relying on `dedup`.
    let mut seen = std::collections::HashSet::new();
    sinks.retain(|(name, _)| seen.insert(name.clone()));

    if let Some(missing) = default_sink.filter(|wanted| !seen.contains(*wanted)) {
        sinks.insert(0, (missing.to_string(), "DEFAULT".to_string()));
    }
    sinks
}

/// Ranks a sink state for ordering; lower is preferred.
///
/// `RUNNING` < `IDLE` < `SUSPENDED` < anything else. The match is
/// case-sensitive and expects the upper-case spelling.
fn sink_state_rank(state: &str) -> u8 {
    match state {
        "RUNNING" => 0,
        "IDLE" => 1,
        "SUSPENDED" => 2,
        _ => 3,
    }
}