296 lines
9.7 KiB
Rust
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// client/src/output/audio.rs
use anyhow::{Context, Result};
use gst::MessageView::*;
use gst::prelude::*;
use gstreamer as gst;
use gstreamer_app as gst_app;
use std::sync::Mutex;
use tracing::{debug, error, info, warn};
use lesavka_common::lesavka::AudioPacket;
/// Audio playback output backed by a GStreamer pipeline that is fed through an `appsrc`.
///
/// `AudioOut::new` builds the pipeline and sets it to `Playing`; dropping the value
/// sets the pipeline back to `Null`.
pub struct AudioOut {
/// The parsed pipeline; kept so `Drop` can switch it to `Null`.
pipeline: gst::Pipeline,
/// The `appsrc` element named `src` that `push` hands buffers to.
src: gst_app::AppSrc,
/// Timestamp bookkeeping for `push`; behind a mutex because `push` takes `&self`.
timeline: Mutex<AudioTimeline>,
}
/// Bookkeeping `AudioOut::push` uses to rebase incoming packet timestamps.
#[derive(Default)]
struct AudioTimeline {
/// PTS (microseconds) of the first packet seen; later packets are stamped relative to it.
first_remote_pts_us: Option<u64>,
/// Most recent rebased PTS (microseconds), used to keep timestamps from going backwards.
last_local_pts_us: u64,
/// Saturating count of packets processed, used to throttle the debug log.
packets: u64,
}
impl AudioOut {
    /// Builds the audio playback pipeline and sets it to `Playing`.
    ///
    /// The pipeline is `appsrc ! queue ! aacparse ! avdec_aac ! audioconvert !
    /// audioresample ! S16LE/2ch/48000 caps ! queue ! <sink>`, where `<sink>` is
    /// whatever `pick_sink_element` returns. When `LESAVKA_TAP_AUDIO` is exactly
    /// `1`, a `tee` directly after the `appsrc` also writes the incoming stream
    /// to `/tmp/lesavka-audio.aac`.
    ///
    /// # Errors
    ///
    /// Returns an error when GStreamer cannot be initialised, no sink can be
    /// picked, the pipeline description does not parse or does not contain the
    /// expected elements, or the pipeline cannot be set to `Playing`.
    pub fn new() -> anyhow::Result<Self> {
        gst::init().context("initialising GStreamer")?;
        let sink = pick_sink_element()?;

        // Only the exact value "1" enables the dump; unset or anything else disables it.
        let tee_dump = matches!(std::env::var("LESAVKA_TAP_AUDIO").as_deref(), Ok("1"));

        // The tee variant differs from the plain one only by these two fragments,
        // so the shared description is written once.
        let (tee_head, tee_tail) = if tee_dump {
            (
                "tee name=t ! ",
                " t. ! queue ! filesink location=/tmp/lesavka-audio.aac",
            )
        } else {
            ("", "")
        };
        let pipe = format!(
            "appsrc name=src is-live=true format=time do-timestamp=true block=false ! \
             {tee_head}\
             queue max-size-time=500000000 max-size-bytes=0 max-size-buffers=0 ! \
             aacparse ! avdec_aac ! audioconvert ! audioresample ! \
             audio/x-raw,format=S16LE,channels=2,rate=48000 ! \
             queue max-size-time=400000000 max-size-bytes=0 max-size-buffers=0 ! \
             {sink}{tee_tail}"
        );
        if tee_dump {
            warn!("💾 tee to /tmp/lesavka-audio.aac enabled (LESAVKA_TAP_AUDIO=1)");
        }

        // Report unexpected element types as errors instead of panicking inside a
        // constructor that already returns `Result`.
        let pipeline: gst::Pipeline = gst::parse::launch(&pipe)
            .context("parsing audio pipeline description")?
            .downcast::<gst::Pipeline>()
            .map_err(|_| anyhow::anyhow!("audio pipeline description is not a gst::Pipeline"))?;
        let src: gst_app::AppSrc = pipeline
            .by_name("src")
            .context("audio pipeline has no element named `src`")?
            .downcast::<gst_app::AppSrc>()
            .map_err(|_| anyhow::anyhow!("element `src` is not an AppSrc"))?;

        src.set_caps(Some(
            &gst::Caps::builder("audio/mpeg")
                .field("mpegversion", &4i32) // AAC
                .field("stream-format", &"adts") // ADTS frames
                .field("rate", &48_000i32) // 48kHz
                .field("channels", &2i32) // stereo
                .build(),
        ));
        src.set_format(gst::Format::Time);

        #[cfg(not(coverage))]
        {
            let bus = pipeline.bus().context("audio pipeline has no bus")?;
            // Background logger for bus messages; it waits on the bus without a timeout.
            // NOTE(review): no handle is kept and nothing here stops this thread when
            // the `AudioOut` is dropped - confirm that is acceptable.
            std::thread::spawn(move || {
                for msg in bus.iter_timed(gst::ClockTime::NONE) {
                    match msg.view() {
                        Error(e) => error!(
                            "💥 gst error from {:?}: {} ({})",
                            msg.src().map(|s| s.path_string()),
                            e.error(),
                            e.debug().unwrap_or_default()
                        ),
                        Warning(w) => warn!(
                            "⚠️ gst warning from {:?}: {} ({})",
                            msg.src().map(|s| s.path_string()),
                            w.error(),
                            w.debug().unwrap_or_default()
                        ),
                        Element(e) => debug!(
                            "🔎 gst element message: {}",
                            e.structure().map(|s| s.to_string()).unwrap_or_default()
                        ),
                        StateChanged(s) if s.current() == gst::State::Playing => {
                            // The pipeline itself reaching Playing is logged at info
                            // level; individual elements only at debug level.
                            if msg.src().map(|s| s.is::<gst::Pipeline>()).unwrap_or(false) {
                                info!("🔊 audio pipeline ▶️ (sink='{}')", sink);
                            } else {
                                debug!(
                                    "🔊 element {} now ▶️",
                                    msg.src().map(|s| s.name()).unwrap_or_default()
                                );
                            }
                        }
                        _ => {}
                    }
                }
            });
        }

        pipeline
            .set_state(gst::State::Playing)
            .context("starting audio pipeline")?;

        Ok(Self {
            pipeline,
            src,
            timeline: Mutex::new(AudioTimeline::default()),
        })
    }

    /// Wraps one packet's payload in a buffer, stamps it, and pushes it into the `appsrc`.
    ///
    /// The PTS is rebased so the first packet seen maps to zero. A packet whose
    /// rebased PTS is lower than the previous one is stamped one microsecond after
    /// the previous one instead. The first 8 packets and every 600th packet are
    /// logged at debug level. If the timeline mutex cannot be locked, no PTS is set
    /// here and the buffer is pushed as is. A failed push is logged, not returned.
    pub fn push(&self, pkt: AudioPacket) {
        let mut buf = gst::Buffer::from_slice(pkt.data);

        if let Ok(mut timeline) = self.timeline.lock() {
            let base = *timeline.first_remote_pts_us.get_or_insert(pkt.pts);
            let mut local_pts_us = pkt.pts.saturating_sub(base);
            if local_pts_us < timeline.last_local_pts_us {
                // Never hand the pipeline a timestamp that moves backwards.
                local_pts_us = timeline.last_local_pts_us.saturating_add(1);
            }
            timeline.last_local_pts_us = local_pts_us;
            timeline.packets = timeline.packets.saturating_add(1);

            if timeline.packets <= 8 || timeline.packets % 600 == 0 {
                debug!(
                    packet = timeline.packets,
                    remote_pts_us = pkt.pts,
                    local_pts_us,
                    bytes = buf.size(),
                    "🔊 audio packet queued"
                );
            }

            buf.get_mut()
                .expect("a buffer created a few lines above has no other owner")
                .set_pts(Some(gst::ClockTime::from_useconds(local_pts_us)));
        }

        #[cfg(not(coverage))]
        if let Err(e) = self.src.push_buffer(buf) {
            warn!("📉 AppSrc push failed: {e:?}");
        }
        #[cfg(coverage)]
        {
            let _ = self.src.push_buffer(buf);
        }
    }
}
impl Drop for AudioOut {
fn drop(&mut self) {
let _ = self.pipeline.set_state(gst::State::Null);
}
}
/// Chooses the sink element description used at the end of the playback pipeline.
///
/// Preference order: the `LESAVKA_AUDIO_SINK` environment override, then the first
/// entry returned by `list_pw_sinks`, then plain `autoaudiosink`. Every choice is
/// logged.
#[cfg(not(coverage))]
fn pick_sink_element() -> Result<String> {
    match std::env::var("LESAVKA_AUDIO_SINK") {
        Ok(raw) => {
            let element = normalize_sink_override(&raw);
            info!(
                "💪 sink overridden via LESAVKA_AUDIO_SINK={} -> {}",
                raw, element
            );
            Ok(element)
        }
        Err(_) => match list_pw_sinks().into_iter().next() {
            Some((name, st)) => {
                info!("🔈 using PipeWire sink '{}' ({st})", name);
                Ok(pulsesink_device_element(&name))
            }
            None => {
                warn!("🫣 no PipeWire sinks readable - falling back to autoaudiosink");
                Ok("autoaudiosink".to_string())
            }
        },
    }
}
/// Coverage-build variant of the sink picker: same preference order
/// (`LESAVKA_AUDIO_SINK`, first listed sink, `autoaudiosink`) without any logging.
#[cfg(coverage)]
fn pick_sink_element() -> Result<String> {
    let element = match std::env::var("LESAVKA_AUDIO_SINK") {
        Ok(raw) => normalize_sink_override(&raw),
        Err(_) => list_pw_sinks()
            .first()
            .map(|(name, _)| pulsesink_device_element(name))
            .unwrap_or_else(|| "autoaudiosink".to_string()),
    };
    Ok(element)
}
/// Interpret `LESAVKA_AUDIO_SINK` as either a full sink element or bare device.
///
/// The value is trimmed first. If it contains a space, `=` or `!`, or ends in
/// `sink`, it is returned unchanged as an element description; otherwise it is
/// treated as a device name and wrapped by `pulsesink_device_element`.
fn normalize_sink_override(raw: &str) -> String {
    let value = raw.trim();
    let has_launch_syntax = value.chars().any(|c| matches!(c, ' ' | '=' | '!'));
    let looks_like_element = has_launch_syntax || value.ends_with("sink");
    if looks_like_element {
        value.to_string()
    } else {
        pulsesink_device_element(value)
    }
}
/// Builds a `pulsesink` element description targeting `device`.
///
/// Backslashes and double quotes in the device name are backslash-escaped so the
/// name can sit inside the quoted `device="..."` property. Names starting with
/// `bluez_output.` get larger `buffer-time` / `latency-time` values than others.
fn pulsesink_device_element(device: &str) -> String {
    let mut escaped = String::with_capacity(device.len());
    for ch in device.chars() {
        if matches!(ch, '\\' | '"') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }

    let is_bluez = device.starts_with("bluez_output.");
    let buffer_time = if is_bluez { 750_000 } else { 350_000 };
    let latency_time = if is_bluez { 250_000 } else { 100_000 };

    format!(
        "pulsesink device=\"{escaped}\" buffer-time={buffer_time} latency-time={latency_time} sync=true"
    )
}
fn list_pw_sinks() -> Vec<(String, String)> {
let default_sink = std::process::Command::new("pactl")
.args(["info"])
.output()
.ok()
.filter(|output| output.status.success())
.and_then(|output| parse_pactl_default_sink(&String::from_utf8_lossy(&output.stdout)));
if let Ok(output) = std::process::Command::new("pactl")
.args(["list", "short", "sinks"])
.output()
&& output.status.success()
{
return parse_pactl_short_sinks(
&String::from_utf8_lossy(&output.stdout),
default_sink.as_deref(),
);
}
default_sink
.map(|sink| vec![(sink, "DEFAULT".to_string())])
.unwrap_or_default()
}
/// Extracts the default sink name from `pactl info` output.
///
/// Only the first line starting with `Default Sink:` is considered; the rest of
/// that line is trimmed and returned, or `None` if it is empty or no such line
/// exists.
fn parse_pactl_default_sink(stdout: &str) -> Option<String> {
    for line in stdout.lines() {
        if let Some(rest) = line.strip_prefix("Default Sink:") {
            let name = rest.trim();
            // The first matching line decides the outcome, even when it is blank.
            return if name.is_empty() {
                None
            } else {
                Some(name.to_string())
            };
        }
    }
    None
}
fn parse_pactl_short_sinks(stdout: &str, default_sink: Option<&str>) -> Vec<(String, String)> {
let mut sinks = Vec::new();
for line in stdout.lines() {
let columns: Vec<_> = line.split_whitespace().collect();
if columns.len() < 2 {
continue;
}
let name = columns[1].to_string();
let state = columns
.last()
.copied()
.unwrap_or("UNKNOWN")
.to_ascii_uppercase();
sinks.push((name, state));
}
sinks.sort_by_key(|(name, state)| {
(
sink_state_rank(state),
if Some(name.as_str()) == default_sink {
0
} else {
1
},
name.clone(),
)
});
sinks.dedup_by(|left, right| left.0 == right.0);
if let Some(default_sink) = default_sink
&& sinks.iter().all(|(name, _)| name != default_sink)
{
sinks.insert(0, (default_sink.to_string(), "DEFAULT".to_string()));
}
sinks
}
/// Maps an upper-case sink state to its sort rank; lower ranks sort first.
///
/// `RUNNING` is 0, `IDLE` is 1, `SUSPENDED` is 2, and every other string is 3.
fn sink_state_rank(state: &str) -> u8 {
    const RANKS: [(&str, u8); 3] = [("RUNNING", 0), ("IDLE", 1), ("SUSPENDED", 2)];
    RANKS
        .iter()
        .find(|(label, _)| *label == state)
        .map_or(3, |&(_, rank)| rank)
}