fix(sync): remove probe audio pacing distortion

This commit is contained in:
Brad Stein 2026-04-26 01:48:38 -03:00
parent 7116306662
commit 4dea109589
6 changed files with 311 additions and 247 deletions

View File

@ -23,13 +23,16 @@ use tracing::{error, info, trace};
const MIC_GAIN_ENV: &str = "LESAVKA_MIC_GAIN";
const MIC_GAIN_CONTROL_ENV: &str = "LESAVKA_MIC_GAIN_CONTROL";
const MIC_LEVEL_TAP_ENV: &str = "LESAVKA_UPLINK_MIC_LEVEL";
const MIC_SAMPLE_RATE: u64 = 48_000;
const MIC_CHANNELS: usize = 2;
const MIC_SAMPLE_BYTES: usize = std::mem::size_of::<i16>();
pub struct MicrophoneCapture {
#[allow(dead_code)] // kept alive to hold PLAYING state
pipeline: gst::Pipeline,
sink: gst_app::AppSink,
level_tap_running: Option<Arc<AtomicBool>>,
pts_rebaser: crate::live_capture_clock::SourcePtsRebaser,
pts_rebaser: crate::live_capture_clock::DurationPacedSourcePtsRebaser,
}
impl MicrophoneCapture {
@ -48,15 +51,9 @@ impl MicrophoneCapture {
_ => Self::default_source_desc(),
};
debug!("🎤 source: {source_desc}");
let aac = ["avenc_aac", "fdkaacenc", "faac", "opusenc"]
.into_iter()
.find(|e| gst::ElementFactory::find(e).is_some())
.unwrap_or("opusenc");
let parser = parser_for_encoder(aac);
let gain = mic_gain_from_env();
let level_tap_path = mic_level_tap_path();
let desc =
microphone_pipeline_desc(&source_desc, aac, parser, gain, level_tap_path.is_some());
let desc = microphone_pipeline_desc(&source_desc, gain, level_tap_path.is_some());
let pipeline: gst::Pipeline = gst::parse::launch(&desc)?.downcast().expect("pipeline");
let sink: gst_app::AppSink = pipeline.by_name("asink").unwrap().downcast().unwrap();
@ -114,7 +111,7 @@ impl MicrophoneCapture {
pipeline,
sink,
level_tap_running,
pts_rebaser: crate::live_capture_clock::SourcePtsRebaser::default(),
pts_rebaser: crate::live_capture_clock::DurationPacedSourcePtsRebaser::default(),
})
}
@ -125,10 +122,11 @@ impl MicrophoneCapture {
let buf = sample.buffer().unwrap();
let map = buf.map_readable().unwrap();
let source_pts_us = buf.pts().map(|ts| ts.nseconds() / 1_000);
let timing = self.pts_rebaser.rebase_with_lag_cap(
let packet_duration_us = buffer_duration_us(buf, map.len());
let timing = self.pts_rebaser.rebase_with_packet_duration(
source_pts_us,
1,
Some(crate::live_capture_clock::upstream_source_lag_cap()),
packet_duration_us,
crate::live_capture_clock::upstream_source_lag_cap(),
);
let pts = timing.packet_pts_us;
#[cfg(not(coverage))]
@ -276,55 +274,46 @@ fn mic_level_tap_path() -> Option<PathBuf> {
.map(PathBuf::from)
}
/// Pick the parse/caps fragment that matches the chosen audio encoder.
///
/// Opus output only needs a caps pin; AAC output must additionally run
/// through `aacparse` to get ADTS framing before the capsfilter.
fn parser_for_encoder(aac: &str) -> &'static str {
    match aac.contains("opus") {
        true => "capsfilter caps=audio/x-opus,rate=48000,channels=2",
        false => "aacparse ! capsfilter caps=audio/mpeg,stream-format=adts,rate=48000,channels=2",
    }
}
fn microphone_pipeline_desc(
source_desc: &str,
encoder: &str,
parser: &str,
gain: f64,
level_tap_enabled: bool,
) -> String {
fn microphone_pipeline_desc(source_desc: &str, gain: f64, level_tap_enabled: bool) -> String {
let gain = format_mic_gain_for_gst(gain);
if level_tap_enabled {
format!(
"{source_desc} ! \
audioconvert ! audioresample ! \
audio/x-raw,format=S16LE,channels=2,rate=48000 ! \
audio/x-raw,format=S16LE,channels={MIC_CHANNELS},rate={MIC_SAMPLE_RATE} ! \
volume name=mic_input_gain volume={gain} ! \
tee name=t \
t. ! queue max-size-buffers=100 leaky=downstream ! \
audioconvert ! audioresample ! \
audio/x-raw,channels=2,rate=48000 ! \
{encoder} bitrate=128000 ! \
{parser} ! \
t. ! queue max-size-buffers=64 leaky=downstream ! \
audio/x-raw,format=S16LE,channels={MIC_CHANNELS},rate={MIC_SAMPLE_RATE} ! \
appsink name=asink emit-signals=true max-buffers=50 drop=true \
t. ! queue max-size-buffers=8 leaky=downstream ! \
audio/x-raw,format=S16LE,channels=2,rate=48000 ! \
audio/x-raw,format=S16LE,channels={MIC_CHANNELS},rate={MIC_SAMPLE_RATE} ! \
appsink name=level_sink emit-signals=false sync=false max-buffers=8 drop=true"
)
} else {
format!(
"{source_desc} ! \
audioconvert ! audioresample ! \
audio/x-raw,channels=2,rate=48000 ! \
audio/x-raw,format=S16LE,channels={MIC_CHANNELS},rate={MIC_SAMPLE_RATE} ! \
volume name=mic_input_gain volume={gain} ! \
audioconvert ! audioresample ! \
audio/x-raw,channels=2,rate=48000 ! \
{encoder} bitrate=128000 ! \
{parser} ! \
queue max-size-buffers=100 leaky=downstream ! \
queue max-size-buffers=64 leaky=downstream ! \
appsink name=asink emit-signals=true max-buffers=50 drop=true"
)
}
}
/// Best-effort media duration (µs) of one raw mic buffer, never below 1 µs.
///
/// Prefers the buffer's declared GStreamer duration; when absent, derives it
/// from the payload size assuming i16 frames at the mic's fixed channel
/// count and sample rate.
fn buffer_duration_us(buf: &gst::BufferRef, bytes: usize) -> u64 {
    let derive_from_len = || {
        // `.max(1)` guards the division even though the constants are nonzero.
        let frame_bytes = (MIC_CHANNELS * MIC_SAMPLE_BYTES).max(1);
        let frames = (bytes / frame_bytes) as u128;
        ((frames * 1_000_000u128) / MIC_SAMPLE_RATE as u128).min(u64::MAX as u128) as u64
    };
    let micros = match buf.duration() {
        Some(ts) => ts.nseconds() / 1_000,
        None => derive_from_len(),
    };
    // A zero duration would stall duration-based pacing downstream.
    micros.max(1)
}
/// Detect launcher catalog names that should be opened through Pulse directly.
fn looks_like_pulse_source_name(source: &str) -> bool {
let source = source.trim();

View File

@ -104,6 +104,27 @@ pub struct RebasedSourcePts {
pub lag_clamped: bool,
}
#[derive(Debug, Default)]
struct DurationPacedSourcePtsState {
    /// Capture-clock timestamp (µs) the next packet should carry; `None`
    /// until the first packet has been anchored.
    next_packet_pts_us: Option<u64>,
}

/// Rebase encoded packet timing by anchoring once, then pacing by duration.
///
/// Inputs: optional source PTS from one encoded packet stream plus the packet's
/// declared duration and a freshness lag cap.
/// Outputs: packet timestamps on the shared client capture clock that advance
/// by actual media duration instead of trusting potentially stretched parser
/// PTS on every packet.
/// Why: encoded audio parsers can emit packet PTS values that do not track
/// real packet duration faithfully, which can make the server pace audio far
/// too slowly or quickly even when the underlying capture stream is healthy.
#[derive(Debug, Default)]
pub struct DurationPacedSourcePtsRebaser {
    /// Anchors the first packet on the capture clock and supplies
    /// `capture_now_us` used for lag clamping of later packets.
    anchor_rebaser: SourcePtsRebaser,
    /// Interior mutability so `&self` rebasing can advance the paced position.
    state: Mutex<DurationPacedSourcePtsState>,
}
impl SourcePtsRebaser {
/// Translate one source-buffer timestamp onto the shared capture clock.
///
@ -149,12 +170,9 @@ impl SourcePtsRebaser {
used_source_pts = true;
}
if used_source_pts
&& let Some(max_lag) = max_lag
{
let lag_floor_us = capture_now_us.saturating_sub(
max_lag.as_micros().min(u64::MAX as u128) as u64,
);
if used_source_pts && let Some(max_lag) = max_lag {
let lag_floor_us =
capture_now_us.saturating_sub(max_lag.as_micros().min(u64::MAX as u128) as u64);
if packet_pts_us < lag_floor_us {
packet_pts_us = lag_floor_us;
lag_clamped = true;
@ -180,11 +198,49 @@ impl SourcePtsRebaser {
}
}
impl DurationPacedSourcePtsRebaser {
    /// Rebase one encoded packet onto the shared capture clock.
    ///
    /// Inputs: optional packet PTS, the packet media duration in microseconds,
    /// and a freshness lag cap behind the live capture clock.
    /// Outputs: a rebased packet timestamp plus the values used to derive it.
    /// Why: once the first encoded packet is anchored, the safest pacing signal
    /// for compressed audio is its actual packet duration, with a live lag
    /// clamp to keep delayed batches from resurrecting stale timing.
    #[must_use]
    pub fn rebase_with_packet_duration(
        &self,
        source_pts_us: Option<u64>,
        packet_duration_us: u64,
        max_lag: Duration,
    ) -> RebasedSourcePts {
        // Never pace by zero — every packet must advance the clock.
        let step_us = packet_duration_us.max(1);
        let mut rebased = self
            .anchor_rebaser
            .rebase_with_lag_cap(source_pts_us, step_us, Some(max_lag));
        // Oldest acceptable timestamp relative to the live capture clock.
        let max_lag_us = max_lag.as_micros().min(u64::MAX as u128) as u64;
        let lag_floor_us = rebased.capture_now_us.saturating_sub(max_lag_us);
        let mut paced = self
            .state
            .lock()
            .expect("duration paced source pts rebaser mutex poisoned");
        // First packet adopts the anchor's timestamp; later packets follow
        // the duration-paced schedule regardless of parser PTS drift.
        let mut pts_us = match paced.next_packet_pts_us {
            Some(next) => next,
            None => rebased.packet_pts_us,
        };
        if pts_us < lag_floor_us {
            pts_us = lag_floor_us;
            rebased.lag_clamped = true;
        }
        paced.next_packet_pts_us = Some(pts_us.saturating_add(step_us));
        rebased.packet_pts_us = pts_us;
        rebased
    }
}
#[cfg(test)]
mod tests {
use super::{
SourcePtsRebaser, capture_pts_us, packet_age, upstream_source_lag_cap,
upstream_timing_trace_enabled,
DurationPacedSourcePtsRebaser, SourcePtsRebaser, capture_pts_us, packet_age,
upstream_source_lag_cap, upstream_timing_trace_enabled,
};
use std::time::Duration;
@ -249,11 +305,8 @@ mod tests {
let rebased = SourcePtsRebaser::default();
let _first = rebased.rebase_with_lag_cap(Some(1_000_000), 1, None);
std::thread::sleep(Duration::from_millis(8));
let second = rebased.rebase_with_lag_cap(
Some(1_000_001),
1,
Some(Duration::from_millis(2)),
);
let second =
rebased.rebase_with_lag_cap(Some(1_000_001), 1, Some(Duration::from_millis(2)));
assert!(second.used_source_pts);
assert!(second.lag_clamped);
@ -286,4 +339,32 @@ mod tests {
assert_eq!(upstream_source_lag_cap(), Duration::from_millis(90));
});
}
#[test]
fn duration_paced_rebaser_advances_by_packet_duration_when_source_pts_stretch() {
    // The source PTS claims a 52.666 ms jump, but pacing must follow the
    // real 21.333 ms packet duration instead of the stretched parser PTS.
    let lag_cap = Duration::from_millis(250);
    let paced = DurationPacedSourcePtsRebaser::default();
    let first = paced.rebase_with_packet_duration(Some(0), 21_333, lag_cap);
    let second = paced.rebase_with_packet_duration(Some(52_666), 21_333, lag_cap);
    let advance = second.packet_pts_us.saturating_sub(first.packet_pts_us);
    assert_eq!(advance, 21_333);
}
#[test]
fn duration_paced_rebaser_clamps_when_duration_pacing_falls_stale() {
    let rebased = DurationPacedSourcePtsRebaser::default();
    // Anchor with a tiny 2 ms lag cap, then sleep well past it so the paced
    // timestamp would trail the live capture clock if left unclamped.
    let _first = rebased.rebase_with_packet_duration(Some(0), 10_000, Duration::from_millis(2));
    std::thread::sleep(Duration::from_millis(8));
    let second =
        rebased.rebase_with_packet_duration(Some(10_000), 10_000, Duration::from_millis(2));
    // 2.5 ms of slack (cap plus headroom) absorbs scheduler jitter between
    // the clamp and the `capture_now_us` snapshot.
    assert!(
        second.packet_pts_us.saturating_add(2_500) >= second.capture_now_us,
        "duration-paced packet pts should never trail live capture by more than the lag cap"
    );
}
}

View File

@ -160,6 +160,7 @@ fn render_audio_chunk(
pcm
}
#[cfg(test)]
/// Whether a packet PTS (µs) runs past the probe's configured duration.
///
/// `Duration::as_micros` returns `u128`; the previous bare `as u64` cast
/// silently wrapped for durations beyond `u64::MAX` microseconds, which
/// could make an enormous duration look tiny and already "exceeded".
/// Saturate instead so oversized durations can never be exceeded.
fn probe_pts_exceeds_duration(pts_usecs: u64, duration: std::time::Duration) -> bool {
    let duration_usecs = u64::try_from(duration.as_micros()).unwrap_or(u64::MAX);
    pts_usecs > duration_usecs
}

View File

@ -1,5 +1,24 @@
use super::*;
/// Map one probe video packet's source PTS onto the shared capture clock,
/// clamped so it never trails live capture by more than `lag_cap`.
pub(super) fn rebase_probe_packet_pts(
    pts_rebaser: &crate::live_capture_clock::SourcePtsRebaser,
    source_pts_us: u64,
    lag_cap: Duration,
) -> u64 {
    let rebased = pts_rebaser.rebase_with_lag_cap(Some(source_pts_us), 1, Some(lag_cap));
    rebased.packet_pts_us
}
/// Rebase one probe audio packet onto the capture clock, paced by the
/// packet's actual media duration rather than its raw source PTS.
fn rebase_probe_audio_packet_pts(
    pts_rebaser: &crate::live_capture_clock::DurationPacedSourcePtsRebaser,
    source_pts_us: u64,
    packet_duration_us: u64,
    lag_cap: Duration,
) -> crate::live_capture_clock::RebasedSourcePts {
    let source_pts = Some(source_pts_us);
    pts_rebaser.rebase_with_packet_duration(source_pts, packet_duration_us, lag_cap)
}
pub struct SyncProbeCapture {
pipeline: gst::Pipeline,
running: Arc<AtomicBool>,
@ -24,16 +43,6 @@ impl SyncProbeCapture {
.context("missing sync probe video appsink")?
.downcast::<gst_app::AppSink>()
.expect("video appsink");
let audio_src = pipeline
.by_name("sync_probe_audio_src")
.context("missing sync probe audio appsrc")?
.downcast::<gst_app::AppSrc>()
.expect("audio appsrc");
let audio_sink = pipeline
.by_name("sync_probe_audio_sink")
.context("missing sync probe audio appsink")?
.downcast::<gst_app::AppSink>()
.expect("audio appsink");
pipeline
.set_state(gst::State::Playing)
@ -55,8 +64,6 @@ impl SyncProbeCapture {
video_queue.clone(),
);
let audio_thread = spawn_audio_thread(
audio_src,
audio_sink,
schedule,
duration,
probe_start,
@ -124,23 +131,8 @@ fn build_pipeline(camera: CameraConfig, _schedule: &PulseSchedule) -> Result<gst
),
};
let audio_branch = format!(
"appsrc name=sync_probe_audio_src is-live=true format=time do-timestamp=false \
caps=audio/x-raw,format=S16LE,layout=interleaved,channels={},rate={} ! \
queue max-size-buffers=8 leaky=downstream ! \
audioconvert ! audioresample ! audio/x-raw,channels=2,rate={} ! \
{} ! aacparse ! capsfilter caps=audio/mpeg,stream-format=adts,rate={},channels=2 ! \
appsink name=sync_probe_audio_sink emit-signals=false sync=false max-buffers=256 drop=false",
AUDIO_CHANNELS,
AUDIO_SAMPLE_RATE,
AUDIO_SAMPLE_RATE,
pick_aac_encoder()?,
AUDIO_SAMPLE_RATE,
);
let desc = format!("{video_branch} {audio_branch}");
gst::parse::launch(&desc)
.with_context(|| format!("building sync probe pipeline: {desc}"))?
gst::parse::launch(&video_branch)
.with_context(|| format!("building sync probe pipeline: {video_branch}"))?
.downcast::<gst::Pipeline>()
.map_err(|_| anyhow::anyhow!("sync probe description did not build a pipeline"))
}
@ -161,20 +153,6 @@ fn pick_h264_encoder(fps: u32) -> Result<String> {
bail!("no usable H.264 encoder found for sync probe")
}
fn pick_aac_encoder() -> Result<&'static str> {
[
"avenc_aac bitrate=128000",
"fdkaacenc bitrate=128000",
"faac bitrate=128000",
]
.into_iter()
.find(|entry| {
let name = entry.split_ascii_whitespace().next().unwrap_or_default();
gst::ElementFactory::find(name).is_some()
})
.ok_or_else(|| anyhow::anyhow!("no usable AAC encoder found for sync probe"))
}
fn spawn_video_thread(
src: gst_app::AppSrc,
sink: gst_app::AppSink,
@ -186,6 +164,8 @@ fn spawn_video_thread(
queue: FreshPacketQueue<VideoPacket>,
) -> JoinHandle<()> {
thread::spawn(move || {
let pts_rebaser = crate::live_capture_clock::SourcePtsRebaser::default();
let lag_cap = crate::live_capture_clock::upstream_source_lag_cap();
let dark_frame = build_dark_probe_frame(camera.width as usize, camera.height as usize);
let regular_pulse_frame =
build_regular_probe_frame(camera.width as usize, camera.height as usize);
@ -231,9 +211,10 @@ fn spawn_video_thread(
&& let Some(buffer) = sample.buffer()
&& let Ok(map) = buffer.map_readable()
{
let source_pts_us = buffer.pts().unwrap_or(gst::ClockTime::ZERO).nseconds() / 1_000;
let packet = VideoPacket {
id: 2,
pts: buffer.pts().unwrap_or(gst::ClockTime::ZERO).nseconds() / 1_000,
pts: rebase_probe_packet_pts(&pts_rebaser, source_pts_us, lag_cap),
data: map.as_slice().to_vec(),
..Default::default()
};
@ -249,8 +230,6 @@ fn spawn_video_thread(
}
fn spawn_audio_thread(
src: gst_app::AppSrc,
sink: gst_app::AppSink,
schedule: PulseSchedule,
duration: Duration,
probe_start: Instant,
@ -258,7 +237,7 @@ fn spawn_audio_thread(
queue: FreshPacketQueue<AudioPacket>,
) -> JoinHandle<()> {
thread::spawn(move || {
let pts_rebaser = crate::live_capture_clock::SourcePtsRebaser::default();
let pts_rebaser = crate::live_capture_clock::DurationPacedSourcePtsRebaser::default();
let lag_cap = crate::live_capture_clock::upstream_source_lag_cap();
let chunk_duration = Duration::from_millis(AUDIO_CHUNK_MS);
let samples_per_chunk =
@ -279,71 +258,21 @@ fn spawn_audio_thread(
}
let chunk = render_audio_chunk(&schedule, pts, samples_per_chunk);
let mut buffer = gst::Buffer::from_slice(chunk);
if let Some(meta) = buffer.get_mut() {
let pts_time = gst::ClockTime::from_nseconds(pts.as_nanos() as u64);
meta.set_pts(Some(pts_time));
meta.set_dts(Some(pts_time));
meta.set_duration(Some(gst::ClockTime::from_nseconds(
chunk_duration.as_nanos() as u64,
)));
}
if src.push_buffer(buffer).is_err() {
break;
}
drain_audio_samples(
&sink,
&queue,
let timing = rebase_probe_audio_packet_pts(
&pts_rebaser,
pts.as_micros().min(u64::MAX as u128) as u64,
chunk_duration.as_micros().min(u64::MAX as u128) as u64,
lag_cap,
duration,
gst::ClockTime::from_mseconds(25),
);
let packet = AudioPacket {
id: 0,
pts: timing.packet_pts_us,
data: chunk,
};
let _ = queue.push(packet, Duration::ZERO);
chunk_index = chunk_index.saturating_add(1);
}
let _ = src.end_of_stream();
drain_audio_samples(
&sink,
&queue,
&pts_rebaser,
lag_cap,
duration,
gst::ClockTime::from_mseconds(500),
);
queue.close();
})
}
/// Pull encoded audio samples from `sink` and push them onto `queue`.
///
/// Each buffer's source PTS is rebased onto the shared capture clock with
/// `lag_cap`; draining stops early once a PTS runs past the probe `duration`,
/// or when `try_pull_sample` returns `None` (timeout or end of stream).
fn drain_audio_samples(
    sink: &gst_app::AppSink,
    queue: &FreshPacketQueue<AudioPacket>,
    pts_rebaser: &crate::live_capture_clock::SourcePtsRebaser,
    lag_cap: Duration,
    duration: Duration,
    timeout: gst::ClockTime,
) {
    while let Some(sample) = sink.try_pull_sample(timeout) {
        // Samples without a buffer carry no audio payload — skip them.
        let Some(buffer) = sample.buffer() else {
            continue;
        };
        let pts_usecs = buffer.pts().unwrap_or(gst::ClockTime::ZERO).nseconds() / 1_000;
        if probe_pts_exceeds_duration(pts_usecs, duration) {
            break;
        }
        let Ok(map) = buffer.map_readable() else {
            continue;
        };
        let timing = pts_rebaser.rebase_with_lag_cap(Some(pts_usecs), 1, Some(lag_cap));
        let packet = AudioPacket {
            id: 0,
            pts: timing.packet_pts_us,
            data: map.as_slice().to_vec(),
        };
        // Zero freshness window: only the queue's own policy decides dropping.
        let _ = queue.push(packet, Duration::ZERO);
        // NOTE(review): `continue` at the end of a loop body is a no-op, so
        // this branch does nothing — confirm whether a `break` for the
        // non-blocking (zero-timeout) path was intended instead.
        if timeout == gst::ClockTime::ZERO {
            continue;
        }
    }
}

View File

@ -5,10 +5,8 @@ use crate::input::camera::{CameraCodec, CameraConfig};
use crate::sync_probe::analyze::detect_audio_onsets;
use crate::sync_probe::schedule::PulseSchedule;
use lesavka_common::lesavka::{AudioPacket, VideoPacket};
use std::fs;
use std::process::Command;
use std::time::Duration;
use tempfile::tempdir;
use std::time::Instant;
fn stub_camera() -> CameraConfig {
CameraConfig {
@ -19,42 +17,13 @@ fn stub_camera() -> CameraConfig {
}
}
fn decode_adts_aac_to_mono_samples(aac_bytes: &[u8]) -> Vec<i16> {
let dir = tempdir().expect("tempdir");
let input = dir.path().join("runtime-probe.aac");
fs::write(&input, aac_bytes).expect("write runtime AAC");
let output = Command::new("ffmpeg")
.arg("-hide_banner")
.arg("-loglevel")
.arg("error")
.arg("-i")
.arg(&input)
.arg("-ac")
.arg("1")
.arg("-ar")
.arg(super::AUDIO_SAMPLE_RATE.to_string())
.arg("-f")
.arg("s16le")
.arg("-acodec")
.arg("pcm_s16le")
.arg("-")
.output()
.expect("decode runtime AAC with ffmpeg");
assert!(
output.status.success(),
"ffmpeg decode failed: {}",
String::from_utf8_lossy(&output.stderr)
);
assert!(
output.stdout.len() >= 2,
"decoded runtime AAC did not yield enough PCM bytes"
);
output
.stdout
/// Reduce interleaved S16LE PCM bytes to the first (left) channel's samples.
///
/// Each frame is `AUDIO_CHANNELS` little-endian i16 samples; any trailing
/// partial frame is dropped, matching `chunks_exact` semantics. Chunking the
/// raw bytes per frame avoids the previous intermediate `Vec` that
/// materialized every channel's samples before discarding most of them.
fn decode_interleaved_pcm_to_mono_samples(pcm_bytes: &[u8]) -> Vec<i16> {
    let frame_bytes = super::AUDIO_CHANNELS * std::mem::size_of::<i16>();
    pcm_bytes
        .chunks_exact(frame_bytes)
        .map(|frame| i16::from_le_bytes([frame[0], frame[1]]))
        .collect()
}
@ -172,9 +141,24 @@ fn probe_video_frames_render_distinct_idle_regular_and_marker_patterns() {
assert_ne!(regular, marker);
}
#[test]
fn probe_video_pts_are_lag_capped_like_audio() {
    let rebaser = crate::live_capture_clock::SourcePtsRebaser::default();
    // Anchor with a deliberately tiny 2 ms lag cap, then sleep 8 ms so the
    // second packet arrives stale relative to that cap.
    let _first =
        super::runtime::rebase_probe_packet_pts(&rebaser, 1_000_000, Duration::from_millis(2));
    std::thread::sleep(Duration::from_millis(8));
    let second =
        super::runtime::rebase_probe_packet_pts(&rebaser, 1_000_001, Duration::from_millis(2));
    // ~6 ms = 8 ms sleep minus the 2 ms cap: a lag-clamped pts should land at
    // least this far forward on the capture clock.
    assert!(
        second >= 6_000,
        "delayed probe video packets should be pulled forward instead of looking ancient"
    );
}
#[cfg(not(coverage))]
#[tokio::test]
async fn runtime_audio_probe_emits_nontrivial_aac_packets() {
async fn runtime_audio_probe_emits_nontrivial_pcm_packets() {
let capture = SyncProbeCapture::new(
stub_camera(),
PulseSchedule::new(
@ -203,16 +187,16 @@ async fn runtime_audio_probe_emits_nontrivial_aac_packets() {
}
assert!(
packet_count >= 16,
"expected the runtime probe to emit many AAC packets, got {packet_count}"
packet_count >= 120,
"expected the runtime probe to emit many PCM packets, got {packet_count}"
);
assert!(
total_bytes >= 8_000,
"expected the runtime probe to emit a meaningful AAC payload, got {total_bytes} bytes"
total_bytes >= 200_000,
"expected the runtime probe to emit a meaningful PCM payload, got {total_bytes} bytes"
);
assert!(
largest_packet >= 64,
"expected at least one non-trivial AAC packet, largest was {largest_packet} bytes"
largest_packet >= 1_000,
"expected at least one non-trivial PCM packet, largest was {largest_packet} bytes"
);
}
@ -229,22 +213,22 @@ async fn runtime_audio_probe_decodes_detectable_click_onsets() {
.expect("runtime capture");
let audio_queue = capture.audio_queue();
let mut aac = Vec::new();
let mut pcm = Vec::new();
loop {
let next = audio_queue.pop_fresh().await;
let Some(packet) = next.packet else {
break;
};
aac.extend_from_slice(&packet.data);
pcm.extend_from_slice(&packet.data);
}
assert!(
aac.len() >= 8_000,
"expected the runtime probe AAC stream to carry a meaningful payload, got {} bytes",
aac.len()
pcm.len() >= 200_000,
"expected the runtime probe PCM stream to carry a meaningful payload, got {} bytes",
pcm.len()
);
let decoded = decode_adts_aac_to_mono_samples(&aac);
let decoded = decode_interleaved_pcm_to_mono_samples(&pcm);
let onsets =
detect_audio_onsets(&decoded, super::AUDIO_SAMPLE_RATE as u32, 5).expect("audio onsets");
assert!(
@ -274,16 +258,16 @@ async fn runtime_audio_probe_decodes_detectable_click_onsets_for_manual_harness_
.expect("runtime capture");
let audio_queue = capture.audio_queue();
let mut aac = Vec::new();
let mut pcm = Vec::new();
loop {
let next = audio_queue.pop_fresh().await;
let Some(packet) = next.packet else {
break;
};
aac.extend_from_slice(&packet.data);
pcm.extend_from_slice(&packet.data);
}
let decoded = decode_adts_aac_to_mono_samples(&aac);
let decoded = decode_interleaved_pcm_to_mono_samples(&pcm);
let onsets =
detect_audio_onsets(&decoded, super::AUDIO_SAMPLE_RATE as u32, 5).expect("audio onsets");
assert!(
@ -299,3 +283,94 @@ async fn runtime_audio_probe_decodes_detectable_click_onsets_for_manual_harness_
);
}
}
#[cfg(not(coverage))]
#[tokio::test]
async fn runtime_probe_audio_and_video_pts_advance_near_real_time() {
    // Run a real 3 s probe capture and verify both streams' timestamps span
    // roughly the capture window — i.e. pacing is near real time, neither
    // stalled nor stretched.
    let capture_duration = Duration::from_secs(3);
    let capture = SyncProbeCapture::new(
        stub_camera(),
        PulseSchedule::new(
            Duration::from_secs(1),
            Duration::from_millis(500),
            Duration::from_millis(120),
            4,
        ),
        capture_duration,
    )
    .expect("runtime capture");
    let video_queue = capture.video_queue();
    let audio_queue = capture.audio_queue();
    let started = Instant::now();
    // Drain both queues concurrently, tracking first/last pts and packet count;
    // each task ends when its queue yields a `None` packet (stream closed).
    let video_task = tokio::spawn(async move {
        let mut first_pts = None;
        let mut last_pts = None;
        let mut count = 0usize;
        loop {
            let next = video_queue.pop_fresh().await;
            let Some(packet) = next.packet else {
                break;
            };
            first_pts.get_or_insert(packet.pts);
            last_pts = Some(packet.pts);
            count = count.saturating_add(1);
        }
        (first_pts, last_pts, count)
    });
    let audio_task = tokio::spawn(async move {
        let mut first_pts = None;
        let mut last_pts = None;
        let mut count = 0usize;
        loop {
            let next = audio_queue.pop_fresh().await;
            let Some(packet) = next.packet else {
                break;
            };
            first_pts.get_or_insert(packet.pts);
            last_pts = Some(packet.pts);
            count = count.saturating_add(1);
        }
        (first_pts, last_pts, count)
    });
    let (video_first, video_last, video_count) = video_task.await.expect("video drain");
    let (audio_first, audio_last, audio_count) = audio_task.await.expect("audio drain");
    let wall_elapsed = started.elapsed();
    let video_span = video_last.expect("video last pts") - video_first.expect("video first pts");
    let audio_span = audio_last.expect("audio last pts") - audio_first.expect("audio first pts");
    // Emitted even on success so CI logs show the observed pacing numbers.
    eprintln!(
        "runtime probe spans: video_count={video_count} video_span_us={video_span} audio_count={audio_count} audio_span_us={audio_span} wall_elapsed={wall_elapsed:?}"
    );
    assert!(
        video_count >= 60,
        "expected many runtime probe video packets, got {video_count}"
    );
    assert!(
        audio_count >= 60,
        "expected many runtime probe audio packets, got {audio_count}"
    );
    assert!(
        wall_elapsed <= Duration::from_secs(5),
        "runtime probe should not take excessively long locally, took {wall_elapsed:?}"
    );
    // Spans must cover most of the 3 s window (2.4 s floor) and, for audio,
    // not overshoot it by much (3.4 s ceiling) — the pacing-distortion guard.
    assert!(
        video_span >= 2_400_000,
        "video pts should span most of the 3s capture, got {} us",
        video_span
    );
    assert!(
        audio_span >= 2_400_000,
        "audio pts should span most of the 3s capture, got {} us",
        audio_span
    );
    assert!(
        audio_span <= 3_400_000,
        "audio pts should stay near the 3s capture duration, got {} us",
        audio_span
    );
}

View File

@ -29,7 +29,7 @@ impl ClipTap {
return;
}
let ts = chrono::Local::now().format("%Y%m%d-%H%M%S");
let path = format!("/tmp/{}-{}.aac", self.tag, ts);
let path = format!("/tmp/{}-{}.s16le", self.tag, ts);
let _ = std::fs::write(&path, &self.buf);
self.buf.clear();
}
@ -40,6 +40,10 @@ impl Drop for ClipTap {
}
}
const VOICE_SAMPLE_RATE: u64 = 48_000;
const VOICE_CHANNELS: usize = 2;
const VOICE_SAMPLE_BYTES: usize = std::mem::size_of::<i16>();
// ────────────────────── microphone sink ────────────────────────────────
pub struct Voice {
appsrc: gst_app::AppSrc,
@ -55,11 +59,11 @@ impl Drop for Voice {
}
fn voice_input_caps() -> gst::Caps {
gst::Caps::builder("audio/mpeg")
.field("mpegversion", 4i32)
.field("stream-format", "adts")
.field("rate", 48_000i32)
.field("channels", 2i32)
gst::Caps::builder("audio/x-raw")
.field("format", "S16LE")
.field("layout", "interleaved")
.field("rate", VOICE_SAMPLE_RATE as i32)
.field("channels", VOICE_CHANNELS as i32)
.build()
}
@ -174,9 +178,6 @@ impl Voice {
appsrc.set_format(gst::Format::Time);
appsrc.set_is_live(true);
let decodebin = gst::ElementFactory::make("decodebin")
.build()
.context("make decodebin")?;
let convert = gst::ElementFactory::make("audioconvert")
.build()
.context("make audioconvert")?;
@ -185,8 +186,8 @@ impl Voice {
.context("make audioresample")?;
let caps = gst::Caps::builder("audio/x-raw")
.field("format", "S16LE")
.field("channels", 2i32)
.field("rate", 48_000i32)
.field("channels", VOICE_CHANNELS as i32)
.field("rate", VOICE_SAMPLE_RATE as i32)
.build();
let capsfilter = gst::ElementFactory::make("capsfilter")
.property("caps", &caps)
@ -249,7 +250,6 @@ impl Voice {
if delay_queue_enabled {
pipeline.add_many([
appsrc.upcast_ref(),
&decodebin,
&convert,
&resample,
&capsfilter,
@ -261,7 +261,6 @@ impl Voice {
} else {
pipeline.add_many([
appsrc.upcast_ref(),
&decodebin,
&convert,
&resample,
&capsfilter,
@ -270,10 +269,9 @@ impl Voice {
&alsa_sink,
])?;
}
appsrc.link(&decodebin)?;
appsrc.link(&convert)?;
if delay_queue_enabled {
gst::Element::link_many([
&convert,
&resample,
&capsfilter,
&level,
@ -283,7 +281,6 @@ impl Voice {
])?;
} else {
gst::Element::link_many([
&convert,
&resample,
&capsfilter,
&level,
@ -292,25 +289,6 @@ impl Voice {
])?;
}
/*------------ decodebin autolink ----------------*/
let convert_sink = convert
.static_pad("sink")
.context("audioconvert sink pad")?;
decodebin.connect_pad_added(move |_db, pad| {
if convert_sink.is_linked() {
return;
}
let caps = pad.current_caps().unwrap_or_else(|| pad.query_caps(None));
let is_audio = caps
.structure(0)
.map(|s| s.name().starts_with("audio/"))
.unwrap_or(false);
if !is_audio {
return;
}
let _ = pad.link(&convert_sink);
});
let bus = pipeline.bus().context("voice pipeline bus")?;
// underrun ≠ error just show a warning
@ -342,6 +320,9 @@ impl Voice {
let ts = gst::ClockTime::from_useconds(pkt.pts);
meta.set_pts(Some(ts));
meta.set_dts(Some(ts));
meta.set_duration(Some(gst::ClockTime::from_useconds(
voice_packet_duration_us(pkt.data.len()),
)));
}
let _ = self.appsrc.push_buffer(buf);
@ -352,6 +333,14 @@ impl Voice {
}
}
/// Playback duration (µs) of one raw voice packet, never below 1 µs.
///
/// Derived from payload size assuming interleaved i16 frames at the fixed
/// voice channel count and sample rate.
fn voice_packet_duration_us(bytes: usize) -> u64 {
    // `.max(1)` guards the division even though the constants are nonzero.
    let frame_bytes = (VOICE_CHANNELS * VOICE_SAMPLE_BYTES).max(1);
    let frames = (bytes / frame_bytes) as u128;
    let micros =
        ((frames * 1_000_000u128) / VOICE_SAMPLE_RATE as u128).min(u64::MAX as u128) as u64;
    // Zero-duration buffers would confuse downstream pacing.
    micros.max(1)
}
#[cfg(test)]
mod voice_sink_timing_tests {
use crate::camera::update_camera_config;