media: packetize live microphone uplink
This commit is contained in:
parent
d628c1a634
commit
db83f24dde
@ -228,3 +228,4 @@ Context: 0.16.x proved that queue tweaks and static calibration cannot guarantee
|
|||||||
- 2026-05-02: 0.17.2 mirrored probe and Google Meet test showed major improvement but persistent sub-second late video. Root cause follow-up: the temporary `+350ms` factory MJPEG video playout offset matched the observed browser skew and also made the server skew guard freeze video against its own offset. Patch 0.17.3 restores factory video offset to `0ms`, migrates untouched `+350ms` install/calibration defaults back to `0ms`, and makes the skew guard offset-aware for intentional site calibration.
|
- 2026-05-02: 0.17.2 mirrored probe and Google Meet test showed major improvement but persistent sub-second late video. Root cause follow-up: the temporary `+350ms` factory MJPEG video playout offset matched the observed browser skew and also made the server skew guard freeze video against its own offset. Patch 0.17.3 restores factory video offset to `0ms`, migrates untouched `+350ms` install/calibration defaults back to `0ms`, and makes the skew guard offset-aware for intentional site calibration.
|
||||||
- 2026-05-02: 0.17.3 Google Meet manual test improved to roughly sub-second/near-quarter-second lip sync, but the mirrored analyzer could not pair pulses and the user still heard choppy background audio. Client logs showed Pulse microphone packets arriving unevenly with ages around `90-240ms`; patch 0.17.4 lowers Pulse mic `buffer-time`/`latency-time`, bounds the mic queue/appsink, and keeps mirrored-probe after-run planner diagnostics even when analysis fails.
|
- 2026-05-02: 0.17.3 Google Meet manual test improved to roughly sub-second/near-quarter-second lip sync, but the mirrored analyzer could not pair pulses and the user still heard choppy background audio. Client logs showed Pulse microphone packets arriving unevenly with ages around `90-240ms`; patch 0.17.4 lowers Pulse mic `buffer-time`/`latency-time`, bounds the mic queue/appsink, and keeps mirrored-probe after-run planner diagnostics even when analysis fails.
|
||||||
- 2026-05-02: 0.17.4 mirrored run was salvageable after an SCP banner timeout, but analysis still failed with no close pulse pairs. The client log still showed `180-240ms` microphone delivery ages, pointing at server playout sleeps backpressuring the gRPC microphone stream. Patch 0.17.5 drains inbound microphone packets while waiting for scheduled UAC playout and retries browser-capture SCP fetches.
|
- 2026-05-02: 0.17.4 mirrored run was salvageable after an SCP banner timeout, but analysis still failed with no close pulse pairs. The client log still showed `180-240ms` microphone delivery ages, pointing at server playout sleeps backpressuring the gRPC microphone stream. Patch 0.17.5 drains inbound microphone packets while waiting for scheduled UAC playout and retries browser-capture SCP fetches.
|
||||||
|
- 2026-05-02: 0.17.5 mirrored run still failed with insufficient paired evidence, and the client log still showed recurring `180-240ms` microphone packet age while camera age stayed near zero. Patch 0.17.6 splits oversized mic samples into `20ms` timestamped packets and keeps a short fresh server-side audio window instead of collapsing every pending burst to one newest chunk, aiming to preserve lip sync without making background audio choppy.
|
||||||
|
|||||||
6
Cargo.lock
generated
6
Cargo.lock
generated
@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "lesavka_client"
|
name = "lesavka_client"
|
||||||
version = "0.17.5"
|
version = "0.17.6"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"async-stream",
|
"async-stream",
|
||||||
@ -1686,7 +1686,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "lesavka_common"
|
name = "lesavka_common"
|
||||||
version = "0.17.5"
|
version = "0.17.6"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"base64",
|
"base64",
|
||||||
@ -1698,7 +1698,7 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "lesavka_server"
|
name = "lesavka_server"
|
||||||
version = "0.17.5"
|
version = "0.17.6"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"base64",
|
"base64",
|
||||||
|
|||||||
@ -4,7 +4,7 @@ path = "src/main.rs"
|
|||||||
|
|
||||||
[package]
|
[package]
|
||||||
name = "lesavka_client"
|
name = "lesavka_client"
|
||||||
version = "0.17.5"
|
version = "0.17.6"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
|||||||
@ -8,9 +8,10 @@ use shell_escape::unix::escape;
|
|||||||
#[cfg(not(coverage))]
|
#[cfg(not(coverage))]
|
||||||
use std::sync::atomic::{AtomicU64, Ordering};
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
use std::{
|
use std::{
|
||||||
|
collections::VecDeque,
|
||||||
path::{Path as StdPath, PathBuf},
|
path::{Path as StdPath, PathBuf},
|
||||||
sync::{
|
sync::{
|
||||||
Arc,
|
Arc, Mutex,
|
||||||
atomic::{AtomicBool, Ordering as AtomicOrdering},
|
atomic::{AtomicBool, Ordering as AtomicOrdering},
|
||||||
},
|
},
|
||||||
thread,
|
thread,
|
||||||
@ -25,11 +26,13 @@ const MIC_GAIN_CONTROL_ENV: &str = "LESAVKA_MIC_GAIN_CONTROL";
|
|||||||
const MIC_LEVEL_TAP_ENV: &str = "LESAVKA_UPLINK_MIC_LEVEL";
|
const MIC_LEVEL_TAP_ENV: &str = "LESAVKA_UPLINK_MIC_LEVEL";
|
||||||
const MIC_PULSE_BUFFER_TIME_ENV: &str = "LESAVKA_MIC_PULSE_BUFFER_TIME_US";
|
const MIC_PULSE_BUFFER_TIME_ENV: &str = "LESAVKA_MIC_PULSE_BUFFER_TIME_US";
|
||||||
const MIC_PULSE_LATENCY_TIME_ENV: &str = "LESAVKA_MIC_PULSE_LATENCY_TIME_US";
|
const MIC_PULSE_LATENCY_TIME_ENV: &str = "LESAVKA_MIC_PULSE_LATENCY_TIME_US";
|
||||||
|
const MIC_PACKET_TARGET_DURATION_ENV: &str = "LESAVKA_MIC_PACKET_TARGET_US";
|
||||||
const MIC_SAMPLE_RATE: u64 = 48_000;
|
const MIC_SAMPLE_RATE: u64 = 48_000;
|
||||||
const MIC_CHANNELS: usize = 2;
|
const MIC_CHANNELS: usize = 2;
|
||||||
const MIC_SAMPLE_BYTES: usize = std::mem::size_of::<i16>();
|
const MIC_SAMPLE_BYTES: usize = std::mem::size_of::<i16>();
|
||||||
const DEFAULT_MIC_PULSE_BUFFER_TIME_US: u64 = 40_000;
|
const DEFAULT_MIC_PULSE_BUFFER_TIME_US: u64 = 40_000;
|
||||||
const DEFAULT_MIC_PULSE_LATENCY_TIME_US: u64 = 10_000;
|
const DEFAULT_MIC_PULSE_LATENCY_TIME_US: u64 = 10_000;
|
||||||
|
const DEFAULT_MIC_PACKET_TARGET_DURATION_US: u64 = 20_000;
|
||||||
const MIC_MAIN_QUEUE_MAX_BUFFERS: u32 = 8;
|
const MIC_MAIN_QUEUE_MAX_BUFFERS: u32 = 8;
|
||||||
const MIC_MAIN_QUEUE_MAX_TIME_NS: u64 = 80_000_000;
|
const MIC_MAIN_QUEUE_MAX_TIME_NS: u64 = 80_000_000;
|
||||||
const MIC_APPSINK_MAX_BUFFERS: u32 = 8;
|
const MIC_APPSINK_MAX_BUFFERS: u32 = 8;
|
||||||
@ -40,6 +43,7 @@ pub struct MicrophoneCapture {
|
|||||||
sink: gst_app::AppSink,
|
sink: gst_app::AppSink,
|
||||||
level_tap_running: Option<Arc<AtomicBool>>,
|
level_tap_running: Option<Arc<AtomicBool>>,
|
||||||
pts_rebaser: crate::live_capture_clock::DurationPacedSourcePtsRebaser,
|
pts_rebaser: crate::live_capture_clock::DurationPacedSourcePtsRebaser,
|
||||||
|
pending_packets: Mutex<VecDeque<AudioPacket>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MicrophoneCapture {
|
impl MicrophoneCapture {
|
||||||
@ -119,11 +123,15 @@ impl MicrophoneCapture {
|
|||||||
sink,
|
sink,
|
||||||
level_tap_running,
|
level_tap_running,
|
||||||
pts_rebaser: crate::live_capture_clock::DurationPacedSourcePtsRebaser::default(),
|
pts_rebaser: crate::live_capture_clock::DurationPacedSourcePtsRebaser::default(),
|
||||||
|
pending_packets: Mutex::default(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Blocking pull; call from an async wrapper
|
/// Blocking pull; call from an async wrapper
|
||||||
pub fn pull(&self) -> Option<AudioPacket> {
|
pub fn pull(&self) -> Option<AudioPacket> {
|
||||||
|
if let Some(packet) = self.pending_packets.lock().ok()?.pop_front() {
|
||||||
|
return Some(packet);
|
||||||
|
}
|
||||||
match self.sink.pull_sample() {
|
match self.sink.pull_sample() {
|
||||||
Ok(sample) => {
|
Ok(sample) => {
|
||||||
let buf = sample.buffer().unwrap();
|
let buf = sample.buffer().unwrap();
|
||||||
@ -136,6 +144,10 @@ impl MicrophoneCapture {
|
|||||||
crate::live_capture_clock::upstream_source_lag_cap(),
|
crate::live_capture_clock::upstream_source_lag_cap(),
|
||||||
);
|
);
|
||||||
let pts = timing.packet_pts_us;
|
let pts = timing.packet_pts_us;
|
||||||
|
let target_bytes = mic_packet_target_bytes();
|
||||||
|
let mut packets = split_audio_sample(pts, map.as_slice(), target_bytes);
|
||||||
|
let packet_count = packets.len();
|
||||||
|
let first_packet = packets.pop_front();
|
||||||
#[cfg(not(coverage))]
|
#[cfg(not(coverage))]
|
||||||
{
|
{
|
||||||
static CNT: AtomicU64 = AtomicU64::new(0);
|
static CNT: AtomicU64 = AtomicU64::new(0);
|
||||||
@ -155,18 +167,26 @@ impl MicrophoneCapture {
|
|||||||
used_source_pts = timing.used_source_pts,
|
used_source_pts = timing.used_source_pts,
|
||||||
lag_clamped = timing.lag_clamped,
|
lag_clamped = timing.lag_clamped,
|
||||||
bytes = map.len(),
|
bytes = map.len(),
|
||||||
|
packet_duration_us,
|
||||||
|
split_packets = packet_count,
|
||||||
|
target_packet_bytes = target_bytes,
|
||||||
"🎤 upstream microphone timing sample"
|
"🎤 upstream microphone timing sample"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
if n < 10 || n.is_multiple_of(300) {
|
if n < 10 || n.is_multiple_of(300) {
|
||||||
trace!("🎤⇧ cli pkt#{n} {} bytes", map.len());
|
trace!(
|
||||||
|
"🎤⇧ cli sample#{n} {} bytes -> {} packet(s)",
|
||||||
|
map.len(),
|
||||||
|
packet_count
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Some(AudioPacket {
|
if !packets.is_empty()
|
||||||
id: 0,
|
&& let Ok(mut pending) = self.pending_packets.lock()
|
||||||
pts,
|
{
|
||||||
data: map.as_slice().to_vec(),
|
pending.extend(packets);
|
||||||
})
|
}
|
||||||
|
first_packet
|
||||||
}
|
}
|
||||||
Err(_) => None,
|
Err(_) => None,
|
||||||
}
|
}
|
||||||
@ -330,6 +350,48 @@ fn pcm_payload_duration_us(bytes: usize) -> u64 {
|
|||||||
((frames as u128 * 1_000_000u128) / MIC_SAMPLE_RATE as u128).min(u64::MAX as u128) as u64
|
((frames as u128 * 1_000_000u128) / MIC_SAMPLE_RATE as u128).min(u64::MAX as u128) as u64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn split_audio_sample(base_pts_us: u64, data: &[u8], target_bytes: usize) -> VecDeque<AudioPacket> {
|
||||||
|
let frame_bytes = (MIC_CHANNELS * MIC_SAMPLE_BYTES).max(1);
|
||||||
|
let target_bytes = frame_aligned_packet_bytes(target_bytes.max(frame_bytes));
|
||||||
|
let mut packets = VecDeque::new();
|
||||||
|
let mut offset = 0usize;
|
||||||
|
while offset < data.len() {
|
||||||
|
let remaining = data.len() - offset;
|
||||||
|
let mut take = remaining.min(target_bytes);
|
||||||
|
if remaining > take {
|
||||||
|
take -= take % frame_bytes;
|
||||||
|
if take == 0 {
|
||||||
|
take = frame_bytes.min(remaining);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let end = offset.saturating_add(take).min(data.len());
|
||||||
|
if end == offset {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
packets.push_back(AudioPacket {
|
||||||
|
id: 0,
|
||||||
|
pts: base_pts_us.saturating_add(pcm_payload_duration_us(offset)),
|
||||||
|
data: data[offset..end].to_vec(),
|
||||||
|
});
|
||||||
|
offset = end;
|
||||||
|
}
|
||||||
|
packets
|
||||||
|
}
|
||||||
|
|
||||||
|
fn mic_packet_target_bytes() -> usize {
|
||||||
|
let frame_bytes = MIC_CHANNELS * MIC_SAMPLE_BYTES;
|
||||||
|
let target_us = mic_packet_target_duration_us().clamp(1_000, 100_000);
|
||||||
|
let frames = ((MIC_SAMPLE_RATE as u128 * target_us as u128) / 1_000_000u128)
|
||||||
|
.max(1)
|
||||||
|
.min(usize::MAX as u128) as usize;
|
||||||
|
frame_aligned_packet_bytes(frames.saturating_mul(frame_bytes))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn frame_aligned_packet_bytes(bytes: usize) -> usize {
|
||||||
|
let frame_bytes = (MIC_CHANNELS * MIC_SAMPLE_BYTES).max(1);
|
||||||
|
((bytes / frame_bytes).max(1)).saturating_mul(frame_bytes)
|
||||||
|
}
|
||||||
|
|
||||||
/// Rejects bogus capture timestamps before they can poison mic PTS rebasing.
|
/// Rejects bogus capture timestamps before they can poison mic PTS rebasing.
|
||||||
fn duration_matches_pcm_payload(reported_us: u64, payload_us: u64) -> bool {
|
fn duration_matches_pcm_payload(reported_us: u64, payload_us: u64) -> bool {
|
||||||
if reported_us == 0 {
|
if reported_us == 0 {
|
||||||
@ -354,6 +416,13 @@ fn mic_pulse_latency_time_us() -> u64 {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn mic_packet_target_duration_us() -> u64 {
|
||||||
|
positive_u64_env(
|
||||||
|
MIC_PACKET_TARGET_DURATION_ENV,
|
||||||
|
DEFAULT_MIC_PACKET_TARGET_DURATION_US,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
fn positive_u64_env(name: &str, default_value: u64) -> u64 {
|
fn positive_u64_env(name: &str, default_value: u64) -> u64 {
|
||||||
std::env::var(name)
|
std::env::var(name)
|
||||||
.ok()
|
.ok()
|
||||||
@ -464,7 +533,7 @@ impl Drop for MicrophoneCapture {
|
|||||||
mod tests {
|
mod tests {
|
||||||
use super::{
|
use super::{
|
||||||
MIC_CHANNELS, MIC_SAMPLE_BYTES, MIC_SAMPLE_RATE, buffer_duration_us,
|
MIC_CHANNELS, MIC_SAMPLE_BYTES, MIC_SAMPLE_RATE, buffer_duration_us,
|
||||||
pcm_payload_duration_us,
|
mic_packet_target_bytes, pcm_payload_duration_us, split_audio_sample,
|
||||||
};
|
};
|
||||||
use gstreamer as gst;
|
use gstreamer as gst;
|
||||||
|
|
||||||
@ -508,4 +577,35 @@ mod tests {
|
|||||||
|
|
||||||
assert_eq!(buffer_duration_us(buffer.as_ref(), bytes), 20_000);
|
assert_eq!(buffer_duration_us(buffer.as_ref(), bytes), 20_000);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn oversized_microphone_samples_split_into_live_sized_packets() {
|
||||||
|
let bytes_per_frame = MIC_CHANNELS * MIC_SAMPLE_BYTES;
|
||||||
|
let hundred_ms_bytes = (MIC_SAMPLE_RATE as usize / 10) * bytes_per_frame;
|
||||||
|
let data = vec![7_u8; hundred_ms_bytes];
|
||||||
|
|
||||||
|
let packets = split_audio_sample(1_000_000, &data, mic_packet_target_bytes());
|
||||||
|
|
||||||
|
assert_eq!(packets.len(), 5);
|
||||||
|
assert!(packets.iter().all(|packet| packet.id == 0));
|
||||||
|
assert!(packets.iter().all(|packet| packet.data.len() == 3_840));
|
||||||
|
assert_eq!(packets.front().map(|packet| packet.pts), Some(1_000_000));
|
||||||
|
assert_eq!(packets.get(1).map(|packet| packet.pts), Some(1_020_000));
|
||||||
|
assert_eq!(packets.back().map(|packet| packet.pts), Some(1_080_000));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn trailing_microphone_packet_keeps_remaining_bytes() {
|
||||||
|
let bytes_per_frame = MIC_CHANNELS * MIC_SAMPLE_BYTES;
|
||||||
|
let forty_five_ms_bytes = ((MIC_SAMPLE_RATE as usize * 45) / 1_000) * bytes_per_frame;
|
||||||
|
let data = vec![9_u8; forty_five_ms_bytes];
|
||||||
|
|
||||||
|
let packets = split_audio_sample(5_000, &data, mic_packet_target_bytes());
|
||||||
|
|
||||||
|
assert_eq!(packets.len(), 3);
|
||||||
|
assert_eq!(packets[0].data.len(), 3_840);
|
||||||
|
assert_eq!(packets[1].data.len(), 3_840);
|
||||||
|
assert_eq!(packets[2].data.len(), 960);
|
||||||
|
assert_eq!(packets[2].pts, 45_000);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "lesavka_common"
|
name = "lesavka_common"
|
||||||
version = "0.17.5"
|
version = "0.17.6"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
build = "build.rs"
|
build = "build.rs"
|
||||||
|
|
||||||
|
|||||||
@ -269,6 +269,7 @@ start_real_lesavka_client() {
|
|||||||
LESAVKA_SERVER_ADDR="${RESOLVED_LESAVKA_SERVER_ADDR}" \
|
LESAVKA_SERVER_ADDR="${RESOLVED_LESAVKA_SERVER_ADDR}" \
|
||||||
LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \
|
LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \
|
||||||
LESAVKA_MEDIA_CONTROL="${MEDIA_CONTROL}" \
|
LESAVKA_MEDIA_CONTROL="${MEDIA_CONTROL}" \
|
||||||
|
LESAVKA_UPSTREAM_TIMING_TRACE="${LESAVKA_UPSTREAM_TIMING_TRACE:-1}" \
|
||||||
RUST_LOG="${RUST_LOG:-warn,lesavka_client::app=info,lesavka_client::input::camera=info,lesavka_client::input::microphone=info}" \
|
RUST_LOG="${RUST_LOG:-warn,lesavka_client::app=info,lesavka_client::input::camera=info,lesavka_client::input::microphone=info}" \
|
||||||
"${REPO_ROOT}/target/debug/lesavka-client" --no-launcher --server "${RESOLVED_LESAVKA_SERVER_ADDR}"
|
"${REPO_ROOT}/target/debug/lesavka-client" --no-launcher --server "${RESOLVED_LESAVKA_SERVER_ADDR}"
|
||||||
) >"${CLIENT_LOG}" 2>&1 &
|
) >"${CLIENT_LOG}" 2>&1 &
|
||||||
|
|||||||
@ -10,7 +10,7 @@ bench = false
|
|||||||
|
|
||||||
[package]
|
[package]
|
||||||
name = "lesavka_server"
|
name = "lesavka_server"
|
||||||
version = "0.17.5"
|
version = "0.17.6"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
autobins = false
|
autobins = false
|
||||||
|
|
||||||
|
|||||||
@ -21,17 +21,18 @@ fn retain_freshest_video_packet(
|
|||||||
dropped
|
dropped
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(coverage)]
|
||||||
|
const AUDIO_PENDING_LIVE_WINDOW_PACKETS: usize = 8;
|
||||||
|
|
||||||
#[cfg(coverage)]
|
#[cfg(coverage)]
|
||||||
fn retain_freshest_audio_packet(
|
fn retain_freshest_audio_packet(
|
||||||
pending: &mut std::collections::VecDeque<AudioPacket>,
|
pending: &mut std::collections::VecDeque<AudioPacket>,
|
||||||
) -> usize {
|
) -> usize {
|
||||||
if pending.len() <= 1 {
|
if pending.len() <= AUDIO_PENDING_LIVE_WINDOW_PACKETS {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
let newest = pending.pop_back().expect("non-empty pending audio queue");
|
let dropped = pending.len() - AUDIO_PENDING_LIVE_WINDOW_PACKETS;
|
||||||
let dropped = pending.len();
|
pending.drain(..dropped);
|
||||||
pending.clear();
|
|
||||||
pending.push_back(newest);
|
|
||||||
dropped
|
dropped
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -31,27 +31,20 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn retain_freshest_audio_packet_keeps_only_the_latest_chunk() {
|
fn retain_freshest_audio_packet_keeps_a_short_live_window() {
|
||||||
let mut pending = std::collections::VecDeque::from(vec![
|
let mut pending = (0..10)
|
||||||
AudioPacket {
|
.map(|idx| AudioPacket {
|
||||||
pts: 100,
|
pts: idx * 100,
|
||||||
..Default::default()
|
..Default::default()
|
||||||
},
|
})
|
||||||
AudioPacket {
|
.collect::<std::collections::VecDeque<_>>();
|
||||||
pts: 200,
|
|
||||||
..Default::default()
|
|
||||||
},
|
|
||||||
AudioPacket {
|
|
||||||
pts: 300,
|
|
||||||
..Default::default()
|
|
||||||
},
|
|
||||||
]);
|
|
||||||
|
|
||||||
let dropped = retain_freshest_audio_packet(&mut pending);
|
let dropped = retain_freshest_audio_packet(&mut pending);
|
||||||
|
|
||||||
assert_eq!(dropped, 2);
|
assert_eq!(dropped, 2);
|
||||||
assert_eq!(pending.len(), 1);
|
assert_eq!(pending.len(), 8);
|
||||||
assert_eq!(pending.front().map(|pkt| pkt.pts), Some(300));
|
assert_eq!(pending.front().map(|pkt| pkt.pts), Some(200));
|
||||||
|
assert_eq!(pending.back().map(|pkt| pkt.pts), Some(900));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|||||||
@ -23,17 +23,19 @@ fn retain_freshest_video_packet(
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(not(coverage))]
|
#[cfg(not(coverage))]
|
||||||
/// Keeps only the newest microphone packet while startup pairing is healing.
|
const AUDIO_PENDING_LIVE_WINDOW_PACKETS: usize = 8;
|
||||||
|
|
||||||
|
#[cfg(not(coverage))]
|
||||||
|
/// Keeps a tiny newest microphone window so playout can stay smooth without
|
||||||
|
/// draining old audio.
|
||||||
fn retain_freshest_audio_packet(
|
fn retain_freshest_audio_packet(
|
||||||
pending: &mut std::collections::VecDeque<AudioPacket>,
|
pending: &mut std::collections::VecDeque<AudioPacket>,
|
||||||
) -> usize {
|
) -> usize {
|
||||||
if pending.len() <= 1 {
|
if pending.len() <= AUDIO_PENDING_LIVE_WINDOW_PACKETS {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
let newest = pending.pop_back().expect("non-empty pending audio queue");
|
let dropped = pending.len() - AUDIO_PENDING_LIVE_WINDOW_PACKETS;
|
||||||
let dropped = pending.len();
|
pending.drain(..dropped);
|
||||||
pending.clear();
|
|
||||||
pending.push_back(newest);
|
|
||||||
dropped
|
dropped
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -313,6 +313,7 @@ JSON
|
|||||||
sink,
|
sink,
|
||||||
level_tap_running: Some(std::sync::Arc::clone(&running)),
|
level_tap_running: Some(std::sync::Arc::clone(&running)),
|
||||||
pts_rebaser: crate::live_capture_clock::DurationPacedSourcePtsRebaser::default(),
|
pts_rebaser: crate::live_capture_clock::DurationPacedSourcePtsRebaser::default(),
|
||||||
|
pending_packets: Default::default(),
|
||||||
};
|
};
|
||||||
assert!(
|
assert!(
|
||||||
cap.pull().is_none(),
|
cap.pull().is_none(),
|
||||||
@ -436,6 +437,7 @@ JSON
|
|||||||
sink,
|
sink,
|
||||||
level_tap_running: None,
|
level_tap_running: None,
|
||||||
pts_rebaser: crate::live_capture_clock::DurationPacedSourcePtsRebaser::default(),
|
pts_rebaser: crate::live_capture_clock::DurationPacedSourcePtsRebaser::default(),
|
||||||
|
pending_packets: Default::default(),
|
||||||
};
|
};
|
||||||
let first_pkt = cap.pull().expect("first audio packet");
|
let first_pkt = cap.pull().expect("first audio packet");
|
||||||
let second_pkt = cap.pull().expect("second audio packet");
|
let second_pkt = cap.pull().expect("second audio packet");
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user