//! lesavka/server/src/audio/voice_input.rs
//!
//! Voice (microphone) AAC stream → GStreamer decode → ALSA (UAC gadget)
//! playback sink, plus a rolling /tmp debug tap of the incoming stream.
/// Rolling debug tap: keeps the most recent ~256 KB of encoded audio and
/// periodically dumps it to /tmp as a timestamped `.aac` clip.
pub struct ClipTap {
    buf: Vec<u8>,        // rolling window of the newest encoded bytes (capped at 256 KB)
    tag: &'static str,   // filename prefix for dumped clips
    next_dump: Instant,  // deadline for the next periodic dump
    period: Duration,    // interval between dumps
}
impl ClipTap {
    /// Create a tap labelled `tag` that dumps a clip every `period`.
    pub fn new(tag: &'static str, period: Duration) -> Self {
        Self {
            // Slightly above the 256 KB cap so feed() rarely reallocates.
            buf: Vec::with_capacity(260_000),
            tag,
            next_dump: Instant::now() + period,
            period,
        }
    }

    /// Append encoded bytes, keep only the newest 256 KB, and dump a clip
    /// to disk once `period` has elapsed since the last dump.
    pub fn feed(&mut self, bytes: &[u8]) {
        self.buf.extend_from_slice(bytes);
        if self.buf.len() > 256_000 {
            self.buf.drain(..self.buf.len() - 256_000);
        }
        if Instant::now() >= self.next_dump {
            self.flush();
            // Re-anchor the deadline to *now*. The previous scheme
            // (`next_dump += period`) stayed in the past after an idle gap
            // of several periods, so every subsequent feed() dumped a
            // near-empty clip until the deadline caught up.
            self.next_dump = Instant::now() + self.period;
        }
    }

    /// Write the buffered bytes to /tmp/<tag>-<timestamp>.aac and clear
    /// the buffer. No-op when the buffer is empty.
    pub fn flush(&mut self) {
        if self.buf.is_empty() {
            return;
        }
        let ts = chrono::Local::now().format("%Y%m%d-%H%M%S");
        let path = format!("/tmp/{}-{}.aac", self.tag, ts);
        // Best-effort debug artifact; a failed write is not worth surfacing.
        let _ = std::fs::write(&path, &self.buf);
        self.buf.clear();
    }
}
impl Drop for ClipTap {
    /// Persist whatever is still buffered so the final partial clip is not lost.
    fn drop(&mut self) {
        self.flush();
    }
}
// ────────────────────── microphone sink ────────────────────────────────
/// Playback sink for the remote voice stream: AAC packets handed to
/// `push()` are fed into a GStreamer pipeline that renders to an ALSA device.
pub struct Voice {
    appsrc: gst_app::AppSrc, // entry point for incoming encoded buffers
    _pipe: gst::Pipeline,    // keep pipeline alive
    tap: ClipTap,            // rolling /tmp debug dump of the incoming stream
}
impl Drop for Voice {
    /// Best-effort teardown: drive the pipeline to Null so GStreamer can
    /// release its resources; a failed state change here is not actionable.
    fn drop(&mut self) {
        self._pipe.set_state(gst::State::Null).ok();
    }
}
/// Caps for the bytes `Voice::push` feeds in: ADTS-framed MPEG-4 AAC,
/// 48 kHz stereo.
fn voice_input_caps() -> gst::Caps {
    let builder = gst::Caps::builder("audio/mpeg");
    builder
        .field("mpegversion", 4i32)
        .field("stream-format", "adts")
        .field("rate", 48_000i32)
        .field("channels", 2i32)
        .build()
}
/// alsasink `buffer-time` in µs; overridable via LESAVKA_UAC_BUFFER_TIME_US
/// (positive values only), defaulting to 20 ms.
fn voice_sink_buffer_time_us() -> i64 {
    positive_voice_sink_timing_env("LESAVKA_UAC_BUFFER_TIME_US", 20_000)
}
/// alsasink `latency-time` in µs; overridable via LESAVKA_UAC_LATENCY_TIME_US
/// (positive values only), defaulting to 5 ms.
fn voice_sink_latency_time_us() -> i64 {
    positive_voice_sink_timing_env("LESAVKA_UAC_LATENCY_TIME_US", 5_000)
}
fn voice_sink_compensation_us() -> i64 {
std::env::var("LESAVKA_UAC_COMPENSATION_US")
.ok()
.and_then(|value| value.trim().parse::<i64>().ok())
.filter(|value| *value >= 0)
.unwrap_or_else(default_voice_sink_compensation_us)
}
/// Default compensation: HDMI output gets a delay (105 ms unless
/// LESAVKA_UAC_HDMI_COMPENSATION_US overrides it); every other output gets 0.
fn default_voice_sink_compensation_us() -> i64 {
    match crate::camera::current_camera_config().output {
        crate::camera::CameraOutput::Hdmi => {
            non_negative_voice_sink_timing_env("LESAVKA_UAC_HDMI_COMPENSATION_US", 105_000)
        }
        _ => 0,
    }
}
/// Read `name` from the environment and parse it as a strictly positive i64.
/// Unset, unparsable, zero, or negative values all fall back to `default`.
fn positive_voice_sink_timing_env(name: &str, default: i64) -> i64 {
    match std::env::var(name) {
        Ok(raw) => match raw.trim().parse::<i64>() {
            Ok(value) if value > 0 => value,
            _ => default,
        },
        Err(_) => default,
    }
}
/// Read `name` from the environment and parse it as a non-negative i64
/// (zero allowed). Unset, unparsable, or negative values fall back to `default`.
fn non_negative_voice_sink_timing_env(name: &str, default: i64) -> i64 {
    match std::env::var(name) {
        Ok(raw) => match raw.trim().parse::<i64>() {
            Ok(value) if value >= 0 => value,
            _ => default,
        },
        Err(_) => default,
    }
}
impl Voice {
    /// Coverage builds only: a stub appsrc → fakesink pipeline so the code
    /// path runs without real audio hardware.
    #[cfg(coverage)]
    pub async fn new(_alsa_dev: &str) -> anyhow::Result<Self> {
        gst::init().context("gst init")?;
        let pipeline = gst::Pipeline::new();
        let appsrc = gst::ElementFactory::make("appsrc")
            .build()
            .context("make appsrc")?
            .downcast::<gst_app::AppSrc>()
            .expect("appsrc");
        appsrc.set_format(gst::Format::Time);
        appsrc.set_is_live(true);
        appsrc.set_caps(Some(&voice_input_caps()));
        let sink = gst::ElementFactory::make("fakesink")
            .build()
            .context("make fakesink")?;
        pipeline.add_many(&[appsrc.upcast_ref(), &sink])?;
        gst::Element::link_many(&[appsrc.upcast_ref(), &sink])?;
        start_pipeline_or_reset(&pipeline, "starting voice pipeline")?;
        Ok(Self {
            appsrc,
            _pipe: pipeline,
            tap: ClipTap::new("voice", Duration::from_secs(60)),
        })
    }

    /// Build and start the real playback pipeline:
    ///
    /// appsrc (AAC/ADTS) → decodebin → audioconvert → audioresample →
    /// capsfilter (S16LE, 2 ch, 48 kHz) → queue (fixed delay) → alsasink.
    ///
    /// `alsa_dev` is the ALSA device name opened by the sink.
    #[cfg(not(coverage))]
    pub async fn new(alsa_dev: &str) -> anyhow::Result<Self> {
        use gst::prelude::*;
        gst::init().context("gst init")?;
        // pipeline
        let pipeline = gst::Pipeline::new();
        // elements
        let appsrc = gst::ElementFactory::make("appsrc")
            .build()
            .context("make appsrc")?
            .downcast::<gst_app::AppSrc>()
            .unwrap();
        // dedicated AppSrc helpers exist and avoid the needless `?`
        appsrc.set_caps(Some(&voice_input_caps()));
        appsrc.set_format(gst::Format::Time);
        appsrc.set_is_live(true);
        let decodebin = gst::ElementFactory::make("decodebin")
            .build()
            .context("make decodebin")?;
        let convert = gst::ElementFactory::make("audioconvert")
            .build()
            .context("make audioconvert")?;
        let resample = gst::ElementFactory::make("audioresample")
            .build()
            .context("make audioresample")?;
        // Pin the decoded stream to the exact raw format handed to alsasink.
        let caps = gst::Caps::builder("audio/x-raw")
            .field("format", "S16LE")
            .field("channels", 2i32)
            .field("rate", 48_000i32)
            .build();
        let capsfilter = gst::ElementFactory::make("capsfilter")
            .property("caps", &caps)
            .build()
            .context("make capsfilter")?;
        let alsa_sink = gst::ElementFactory::make("alsasink")
            .build()
            .context("make alsasink")?;
        // Queue used purely as a fixed delay line (configured below).
        let delay_queue = gst::ElementFactory::make("queue")
            .build()
            .context("make voice delay queue")?;
        let buffer_time_us = voice_sink_buffer_time_us();
        let latency_time_us = voice_sink_latency_time_us();
        let compensation_us = voice_sink_compensation_us();
        alsa_sink.set_property("device", alsa_dev);
        // Free-running sink: render buffers as they arrive rather than
        // syncing to the pipeline clock, don't let the sink gate async state
        // changes or provide the clock — all in service of low latency.
        alsa_sink.set_property("sync", false);
        alsa_sink.set_property("async", false);
        alsa_sink.set_property("enable-last-sample", false);
        alsa_sink.set_property("provide-clock", false);
        alsa_sink.set_property("buffer-time", buffer_time_us);
        alsa_sink.set_property("latency-time", latency_time_us);
        // µs → ns for the queue's time-based properties.
        let compensation_ns = (compensation_us.max(0) as u64).saturating_mul(1_000);
        // Unlimited buffer/byte counts; min-threshold-time holds audio back
        // for `compensation_ns` — presumably to line playback up with the
        // slower HDMI video path (see default_voice_sink_compensation_us).
        delay_queue.set_property("max-size-buffers", 0u32);
        delay_queue.set_property("max-size-bytes", 0u32);
        delay_queue.set_property("max-size-time", compensation_ns);
        delay_queue.set_property("min-threshold-time", compensation_ns);
        tracing::info!(
            %alsa_dev,
            buffer_time_us,
            latency_time_us,
            compensation_us,
            "🎤 UAC sink low-latency timing armed"
        );
        pipeline.add_many([
            appsrc.upcast_ref(),
            &decodebin,
            &convert,
            &resample,
            &capsfilter,
            &delay_queue,
            &alsa_sink,
        ])?;
        appsrc.link(&decodebin)?;
        gst::Element::link_many([&convert, &resample, &capsfilter, &delay_queue, &alsa_sink])?;
        /*------------ decodebin autolink ----------------*/
        // decodebin exposes source pads only after identifying the stream,
        // so link its first audio pad to audioconvert from this callback.
        let convert_sink = convert
            .static_pad("sink")
            .context("audioconvert sink pad")?;
        decodebin.connect_pad_added(move |_db, pad| {
            if convert_sink.is_linked() {
                return; // already wired — ignore any further pads
            }
            // Prefer negotiated caps; fall back to a caps query for pads
            // that have none yet.
            let caps = pad.current_caps().unwrap_or_else(|| pad.query_caps(None));
            let is_audio = caps
                .structure(0)
                .map(|s| s.name().starts_with("audio/"))
                .unwrap_or(false);
            if !is_audio {
                return;
            }
            // Link failures are ignored; the bus logger surfaces pipeline errors.
            let _ = pad.link(&convert_sink);
        });
        let bus = pipeline.bus().context("voice pipeline bus")?;
        // underrun ≠ error just show a warning
        // let _id = alsa_sink.connect("underrun", false, |_| {
        //     tracing::warn!("⚠️ USB playback underrun host muted/not reading");
        //     None
        // });
        start_pipeline_or_reset(&pipeline, "starting voice pipeline")?;
        spawn_pipeline_bus_logger(bus, "voice", "🎤 voice pipeline ▶️");
        Ok(Self {
            appsrc,
            _pipe: pipeline,
            tap: ClipTap::new("voice", Duration::from_secs(60)),
        })
    }

    /// Feed one encoded packet into the pipeline (and the debug tap).
    /// `pkt.pts` is interpreted as microseconds (per from_useconds below) —
    /// TODO confirm against the sender.
    pub fn push(&mut self, pkt: &AudioPacket) {
        self.tap.feed(&pkt.data);
        let mut buf = gst::Buffer::from_slice(pkt.data.clone());
        // The freshly created buffer is uniquely owned, so get_mut can't fail.
        buf.get_mut()
            .unwrap()
            .set_pts(Some(gst::ClockTime::from_useconds(pkt.pts)));
        // Push errors (e.g. pipeline flushing/stopped) are deliberately ignored.
        let _ = self.appsrc.push_buffer(buf);
    }

    /// Flush the debug tap and signal end-of-stream to the pipeline.
    pub fn finish(&mut self) {
        self.tap.flush();
        let _ = self.appsrc.end_of_stream();
    }
}
#[cfg(test)]
mod voice_sink_timing_tests {
    use crate::camera::update_camera_config;

    use super::{voice_sink_buffer_time_us, voice_sink_latency_time_us};
    use super::{default_voice_sink_compensation_us, voice_sink_compensation_us};

    // NOTE(review): each test pins LESAVKA_CAM_OUTPUT and then calls
    // update_camera_config() before asserting, since the compensation
    // default branches on the current camera output; the ordering suggests
    // the camera config is cached — confirm in crate::camera. temp_env
    // restores every variable when its closure exits.

    /// With no overrides and UVC output: low-latency defaults, zero compensation.
    #[test]
    fn voice_sink_timing_defaults_stay_live_call_friendly() {
        temp_env::with_var_unset("LESAVKA_UAC_BUFFER_TIME_US", || {
            temp_env::with_var_unset("LESAVKA_UAC_LATENCY_TIME_US", || {
                temp_env::with_var_unset("LESAVKA_UAC_COMPENSATION_US", || {
                    temp_env::with_var_unset("LESAVKA_UAC_HDMI_COMPENSATION_US", || {
                        temp_env::with_var("LESAVKA_CAM_OUTPUT", Some("uvc"), || {
                            update_camera_config();
                            assert_eq!(voice_sink_buffer_time_us(), 20_000);
                            assert_eq!(voice_sink_latency_time_us(), 5_000);
                            assert_eq!(voice_sink_compensation_us(), 0);
                        });
                    });
                });
            });
        });
    }

    /// Positive overrides are honored; zero/negative ones fall back to defaults.
    #[test]
    fn voice_sink_timing_env_accepts_positive_overrides_only() {
        // Valid positive overrides are used verbatim.
        temp_env::with_var("LESAVKA_CAM_OUTPUT", Some("uvc"), || {
            update_camera_config();
            temp_env::with_var("LESAVKA_UAC_BUFFER_TIME_US", Some("42000"), || {
                temp_env::with_var("LESAVKA_UAC_LATENCY_TIME_US", Some("7000"), || {
                    assert_eq!(voice_sink_buffer_time_us(), 42_000);
                    assert_eq!(voice_sink_latency_time_us(), 7_000);
                    assert_eq!(voice_sink_compensation_us(), 0);
                });
            });
        });
        // Zero/negative timing overrides are rejected, but a non-negative
        // compensation override is accepted (compensation allows 0).
        temp_env::with_var("LESAVKA_CAM_OUTPUT", Some("uvc"), || {
            update_camera_config();
            temp_env::with_var("LESAVKA_UAC_BUFFER_TIME_US", Some("0"), || {
                temp_env::with_var("LESAVKA_UAC_LATENCY_TIME_US", Some("-5"), || {
                    temp_env::with_var("LESAVKA_UAC_COMPENSATION_US", Some("166667"), || {
                        assert_eq!(voice_sink_buffer_time_us(), 20_000);
                        assert_eq!(voice_sink_latency_time_us(), 5_000);
                        assert_eq!(voice_sink_compensation_us(), 166_667);
                    });
                });
            });
        });
        // A negative compensation override is rejected → UVC default of 0.
        temp_env::with_var("LESAVKA_CAM_OUTPUT", Some("uvc"), || {
            update_camera_config();
            temp_env::with_var("LESAVKA_UAC_COMPENSATION_US", Some("-5"), || {
                temp_env::with_var("LESAVKA_UAC_BUFFER_TIME_US", Some("0"), || {
                    temp_env::with_var("LESAVKA_UAC_LATENCY_TIME_US", Some("-5"), || {
                        assert_eq!(voice_sink_buffer_time_us(), 20_000);
                        assert_eq!(voice_sink_latency_time_us(), 5_000);
                        assert_eq!(voice_sink_compensation_us(), 0);
                    });
                });
            });
        });
    }

    /// HDMI output with no overrides gets the built-in 105 ms delay.
    #[test]
    fn hdmi_sink_compensation_defaults_to_hdmi_specific_delay() {
        temp_env::with_var_unset("LESAVKA_UAC_COMPENSATION_US", || {
            temp_env::with_var_unset("LESAVKA_UAC_HDMI_COMPENSATION_US", || {
                temp_env::with_var("LESAVKA_CAM_OUTPUT", Some("hdmi"), || {
                    update_camera_config();
                    assert_eq!(default_voice_sink_compensation_us(), 105_000);
                    assert_eq!(voice_sink_compensation_us(), 105_000);
                });
            });
        });
    }

    /// An explicit LESAVKA_UAC_COMPENSATION_US beats the HDMI-specific
    /// default, even when that default is itself overridden.
    #[test]
    fn explicit_compensation_override_wins_over_hdmi_default() {
        temp_env::with_var("LESAVKA_CAM_OUTPUT", Some("hdmi"), || {
            update_camera_config();
            temp_env::with_var("LESAVKA_UAC_HDMI_COMPENSATION_US", Some("120000"), || {
                temp_env::with_var("LESAVKA_UAC_COMPENSATION_US", Some("90000"), || {
                    assert_eq!(default_voice_sink_compensation_us(), 120_000);
                    assert_eq!(voice_sink_compensation_us(), 90_000);
                });
            });
        });
    }
}