From db83f24dde5f2d8a919223688462a2991b5004cc Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Sat, 2 May 2026 03:15:19 -0300 Subject: [PATCH] media: packetize live microphone uplink --- AGENTS.md | 1 + Cargo.lock | 6 +- client/Cargo.toml | 2 +- client/src/input/microphone.rs | 116 ++++++++++++++++-- common/Cargo.toml | 2 +- .../manual/run_upstream_mirrored_av_sync.sh | 1 + server/Cargo.toml | 2 +- server/src/main/relay_service_coverage.rs | 11 +- server/src/main/relay_service_tests.rs | 25 ++-- server/src/main/relay_stream_lifecycle.rs | 14 ++- .../client_microphone_include_contract.rs | 2 + 11 files changed, 141 insertions(+), 41 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 0734338..f6f2d4d 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -228,3 +228,4 @@ Context: 0.16.x proved that queue tweaks and static calibration cannot guarantee - 2026-05-02: 0.17.2 mirrored probe and Google Meet test showed major improvement but persistent sub-second late video. Root cause follow-up: the temporary `+350ms` factory MJPEG video playout offset matched the observed browser skew and also made the server skew guard freeze video against its own offset. Patch 0.17.3 restores factory video offset to `0ms`, migrates untouched `+350ms` install/calibration defaults back to `0ms`, and makes the skew guard offset-aware for intentional site calibration. - 2026-05-02: 0.17.3 Google Meet manual test improved to roughly sub-second/near-quarter-second lip sync, but the mirrored analyzer could not pair pulses and the user still heard choppy background audio. Client logs showed Pulse microphone packets arriving unevenly with ages around `90-240ms`; patch 0.17.4 lowers Pulse mic `buffer-time`/`latency-time`, bounds the mic queue/appsink, and keeps mirrored-probe after-run planner diagnostics even when analysis fails. - 2026-05-02: 0.17.4 mirrored run was salvageable after an SCP banner timeout, but analysis still failed with no close pulse pairs. The client log still showed `180-240ms` microphone delivery ages, pointing at server playout sleeps backpressuring the gRPC microphone stream. Patch 0.17.5 drains inbound microphone packets while waiting for scheduled UAC playout and retries browser-capture SCP fetches. +- 2026-05-02: 0.17.5 mirrored run still failed with insufficient paired evidence, and the client log still showed recurring `180-240ms` microphone packet age while camera age stayed near zero. Patch 0.17.6 splits oversized mic samples into `20ms` timestamped packets and keeps a short fresh server-side audio window instead of collapsing every pending burst to one newest chunk, aiming to preserve lip sync without making background audio choppy. diff --git a/Cargo.lock b/Cargo.lock index 72c7020..9abfa9e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.17.5" +version = "0.17.6" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.17.5" +version = "0.17.6" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.17.5" +version = "0.17.6" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index 2bdb65a..083e239 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.17.5" +version = "0.17.6" edition = "2024" [dependencies] diff --git a/client/src/input/microphone.rs b/client/src/input/microphone.rs index f03447e..7d148af 100644 --- a/client/src/input/microphone.rs +++ b/client/src/input/microphone.rs @@ -8,9 +8,10 @@ use shell_escape::unix::escape; #[cfg(not(coverage))] use std::sync::atomic::{AtomicU64, Ordering}; use std::{ + collections::VecDeque, path::{Path as StdPath, PathBuf}, sync::{ - Arc, + Arc, Mutex, atomic::{AtomicBool, Ordering as AtomicOrdering}, }, thread, @@ -25,11 +26,13 @@ const MIC_GAIN_CONTROL_ENV: &str = "LESAVKA_MIC_GAIN_CONTROL"; const MIC_LEVEL_TAP_ENV: &str = "LESAVKA_UPLINK_MIC_LEVEL"; const MIC_PULSE_BUFFER_TIME_ENV: &str = "LESAVKA_MIC_PULSE_BUFFER_TIME_US"; const MIC_PULSE_LATENCY_TIME_ENV: &str = "LESAVKA_MIC_PULSE_LATENCY_TIME_US"; +const MIC_PACKET_TARGET_DURATION_ENV: &str = "LESAVKA_MIC_PACKET_TARGET_US"; const MIC_SAMPLE_RATE: u64 = 48_000; const MIC_CHANNELS: usize = 2; const MIC_SAMPLE_BYTES: usize = std::mem::size_of::(); const DEFAULT_MIC_PULSE_BUFFER_TIME_US: u64 = 40_000; const DEFAULT_MIC_PULSE_LATENCY_TIME_US: u64 = 10_000; +const DEFAULT_MIC_PACKET_TARGET_DURATION_US: u64 = 20_000; const MIC_MAIN_QUEUE_MAX_BUFFERS: u32 = 8; const MIC_MAIN_QUEUE_MAX_TIME_NS: u64 = 80_000_000; const MIC_APPSINK_MAX_BUFFERS: u32 = 8; @@ -40,6 +43,7 @@ pub struct MicrophoneCapture { sink: gst_app::AppSink, level_tap_running: Option>, pts_rebaser: crate::live_capture_clock::DurationPacedSourcePtsRebaser, + pending_packets: Mutex>, } impl MicrophoneCapture { @@ -119,11 +123,15 @@ impl MicrophoneCapture { sink, level_tap_running, pts_rebaser: crate::live_capture_clock::DurationPacedSourcePtsRebaser::default(), + pending_packets: Mutex::default(), }) } /// Blocking pull; call from an async wrapper pub fn pull(&self) -> Option { + if let Some(packet) = self.pending_packets.lock().ok()?.pop_front() { + return Some(packet); + } match self.sink.pull_sample() { Ok(sample) => { let buf = sample.buffer().unwrap(); @@ -136,6 +144,10 @@ impl MicrophoneCapture { crate::live_capture_clock::upstream_source_lag_cap(), ); let pts = timing.packet_pts_us; + let target_bytes = mic_packet_target_bytes(); + let mut packets = split_audio_sample(pts, map.as_slice(), target_bytes); + let packet_count = packets.len(); + let first_packet = packets.pop_front(); #[cfg(not(coverage))] { static CNT: AtomicU64 = AtomicU64::new(0); @@ -155,18 +167,26 @@ impl MicrophoneCapture { used_source_pts = timing.used_source_pts, lag_clamped = timing.lag_clamped, bytes = map.len(), + packet_duration_us, + split_packets = packet_count, + target_packet_bytes = target_bytes, "🎤 upstream microphone timing sample" ); } if n < 10 || n.is_multiple_of(300) { - trace!("🎤⇧ cli pkt#{n} {} bytes", map.len()); + trace!( + "🎤⇧ cli sample#{n} {} bytes -> {} packet(s)", + map.len(), + packet_count + ); } } - Some(AudioPacket { - id: 0, - pts, - data: map.as_slice().to_vec(), - }) + if !packets.is_empty() + && let Ok(mut pending) = self.pending_packets.lock() + { + pending.extend(packets); + } + first_packet } Err(_) => None, } @@ -330,6 +350,48 @@ fn pcm_payload_duration_us(bytes: usize) -> u64 { ((frames as u128 * 1_000_000u128) / MIC_SAMPLE_RATE as u128).min(u64::MAX as u128) as u64 } +fn split_audio_sample(base_pts_us: u64, data: &[u8], target_bytes: usize) -> VecDeque { + let frame_bytes = (MIC_CHANNELS * MIC_SAMPLE_BYTES).max(1); + let target_bytes = frame_aligned_packet_bytes(target_bytes.max(frame_bytes)); + let mut packets = VecDeque::new(); + let mut offset = 0usize; + while offset < data.len() { + let remaining = data.len() - offset; + let mut take = remaining.min(target_bytes); + if remaining > take { + take -= take % frame_bytes; + if take == 0 { + take = frame_bytes.min(remaining); + } + } + let end = offset.saturating_add(take).min(data.len()); + if end == offset { + break; + } + packets.push_back(AudioPacket { + id: 0, + pts: base_pts_us.saturating_add(pcm_payload_duration_us(offset)), + data: data[offset..end].to_vec(), + }); + offset = end; + } + packets +} + +fn mic_packet_target_bytes() -> usize { + let frame_bytes = MIC_CHANNELS * MIC_SAMPLE_BYTES; + let target_us = mic_packet_target_duration_us().clamp(1_000, 100_000); + let frames = ((MIC_SAMPLE_RATE as u128 * target_us as u128) / 1_000_000u128) + .max(1) + .min(usize::MAX as u128) as usize; + frame_aligned_packet_bytes(frames.saturating_mul(frame_bytes)) +} + +fn frame_aligned_packet_bytes(bytes: usize) -> usize { + let frame_bytes = (MIC_CHANNELS * MIC_SAMPLE_BYTES).max(1); + ((bytes / frame_bytes).max(1)).saturating_mul(frame_bytes) +} + /// Rejects bogus capture timestamps before they can poison mic PTS rebasing. fn duration_matches_pcm_payload(reported_us: u64, payload_us: u64) -> bool { if reported_us == 0 { @@ -354,6 +416,13 @@ fn mic_pulse_latency_time_us() -> u64 { ) } +fn mic_packet_target_duration_us() -> u64 { + positive_u64_env( + MIC_PACKET_TARGET_DURATION_ENV, + DEFAULT_MIC_PACKET_TARGET_DURATION_US, + ) +} + fn positive_u64_env(name: &str, default_value: u64) -> u64 { std::env::var(name) .ok() @@ -464,7 +533,7 @@ impl Drop for MicrophoneCapture { mod tests { use super::{ MIC_CHANNELS, MIC_SAMPLE_BYTES, MIC_SAMPLE_RATE, buffer_duration_us, - pcm_payload_duration_us, + mic_packet_target_bytes, pcm_payload_duration_us, split_audio_sample, }; use gstreamer as gst; @@ -508,4 +577,35 @@ mod tests { assert_eq!(buffer_duration_us(buffer.as_ref(), bytes), 20_000); } + + #[test] + fn oversized_microphone_samples_split_into_live_sized_packets() { + let bytes_per_frame = MIC_CHANNELS * MIC_SAMPLE_BYTES; + let hundred_ms_bytes = (MIC_SAMPLE_RATE as usize / 10) * bytes_per_frame; + let data = vec![7_u8; hundred_ms_bytes]; + + let packets = split_audio_sample(1_000_000, &data, mic_packet_target_bytes()); + + assert_eq!(packets.len(), 5); + assert!(packets.iter().all(|packet| packet.id == 0)); + assert!(packets.iter().all(|packet| packet.data.len() == 3_840)); + assert_eq!(packets.front().map(|packet| packet.pts), Some(1_000_000)); + assert_eq!(packets.get(1).map(|packet| packet.pts), Some(1_020_000)); + assert_eq!(packets.back().map(|packet| packet.pts), Some(1_080_000)); + } + + #[test] + fn trailing_microphone_packet_keeps_remaining_bytes() { + let bytes_per_frame = MIC_CHANNELS * MIC_SAMPLE_BYTES; + let forty_five_ms_bytes = ((MIC_SAMPLE_RATE as usize * 45) / 1_000) * bytes_per_frame; + let data = vec![9_u8; forty_five_ms_bytes]; + + let packets = split_audio_sample(5_000, &data, mic_packet_target_bytes()); + + assert_eq!(packets.len(), 3); + assert_eq!(packets[0].data.len(), 3_840); + assert_eq!(packets[1].data.len(), 3_840); + assert_eq!(packets[2].data.len(), 960); + assert_eq!(packets[2].pts, 45_000); + } } diff --git a/common/Cargo.toml b/common/Cargo.toml index 61622d2..c709df9 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.17.5" +version = "0.17.6" edition = "2024" build = "build.rs" diff --git a/scripts/manual/run_upstream_mirrored_av_sync.sh b/scripts/manual/run_upstream_mirrored_av_sync.sh index 1129786..a29868c 100755 --- a/scripts/manual/run_upstream_mirrored_av_sync.sh +++ b/scripts/manual/run_upstream_mirrored_av_sync.sh @@ -269,6 +269,7 @@ start_real_lesavka_client() { LESAVKA_SERVER_ADDR="${RESOLVED_LESAVKA_SERVER_ADDR}" \ LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \ LESAVKA_MEDIA_CONTROL="${MEDIA_CONTROL}" \ + LESAVKA_UPSTREAM_TIMING_TRACE="${LESAVKA_UPSTREAM_TIMING_TRACE:-1}" \ RUST_LOG="${RUST_LOG:-warn,lesavka_client::app=info,lesavka_client::input::camera=info,lesavka_client::input::microphone=info}" \ "${REPO_ROOT}/target/debug/lesavka-client" --no-launcher --server "${RESOLVED_LESAVKA_SERVER_ADDR}" ) >"${CLIENT_LOG}" 2>&1 & diff --git a/server/Cargo.toml b/server/Cargo.toml index 32b8048..9081848 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.17.5" +version = "0.17.6" edition = "2024" autobins = false diff --git a/server/src/main/relay_service_coverage.rs b/server/src/main/relay_service_coverage.rs index cf51ccf..72181b8 100644 --- a/server/src/main/relay_service_coverage.rs +++ b/server/src/main/relay_service_coverage.rs @@ -21,17 +21,18 @@ fn retain_freshest_video_packet( dropped } +#[cfg(coverage)] +const AUDIO_PENDING_LIVE_WINDOW_PACKETS: usize = 8; + #[cfg(coverage)] fn retain_freshest_audio_packet( pending: &mut std::collections::VecDeque, ) -> usize { - if pending.len() <= 1 { + if pending.len() <= AUDIO_PENDING_LIVE_WINDOW_PACKETS { return 0; } - let newest = pending.pop_back().expect("non-empty pending audio queue"); - let dropped = pending.len(); - pending.clear(); - pending.push_back(newest); + let dropped = pending.len() - AUDIO_PENDING_LIVE_WINDOW_PACKETS; + pending.drain(..dropped); dropped } diff --git a/server/src/main/relay_service_tests.rs b/server/src/main/relay_service_tests.rs index f18bfa3..28716f4 100644 --- a/server/src/main/relay_service_tests.rs +++ b/server/src/main/relay_service_tests.rs @@ -31,27 +31,20 @@ mod tests { } #[test] - fn retain_freshest_audio_packet_keeps_only_the_latest_chunk() { - let mut pending = std::collections::VecDeque::from(vec![ - AudioPacket { - pts: 100, + fn retain_freshest_audio_packet_keeps_a_short_live_window() { + let mut pending = (0..10) + .map(|idx| AudioPacket { + pts: idx * 100, ..Default::default() - }, - AudioPacket { - pts: 200, - ..Default::default() - }, - AudioPacket { - pts: 300, - ..Default::default() - }, - ]); + }) + .collect::>(); let dropped = retain_freshest_audio_packet(&mut pending); assert_eq!(dropped, 2); - assert_eq!(pending.len(), 1); - assert_eq!(pending.front().map(|pkt| pkt.pts), Some(300)); + assert_eq!(pending.len(), 8); + assert_eq!(pending.front().map(|pkt| pkt.pts), Some(200)); + assert_eq!(pending.back().map(|pkt| pkt.pts), Some(900)); } #[test] diff --git a/server/src/main/relay_stream_lifecycle.rs b/server/src/main/relay_stream_lifecycle.rs index 9293ad7..36a5a62 100644 --- a/server/src/main/relay_stream_lifecycle.rs +++ b/server/src/main/relay_stream_lifecycle.rs @@ -23,17 +23,19 @@ fn retain_freshest_video_packet( } #[cfg(not(coverage))] -/// Keeps only the newest microphone packet while startup pairing is healing. +const AUDIO_PENDING_LIVE_WINDOW_PACKETS: usize = 8; + +#[cfg(not(coverage))] +/// Keeps a tiny newest microphone window so playout can stay smooth without +/// draining old audio. fn retain_freshest_audio_packet( pending: &mut std::collections::VecDeque, ) -> usize { - if pending.len() <= 1 { + if pending.len() <= AUDIO_PENDING_LIVE_WINDOW_PACKETS { return 0; } - let newest = pending.pop_back().expect("non-empty pending audio queue"); - let dropped = pending.len(); - pending.clear(); - pending.push_back(newest); + let dropped = pending.len() - AUDIO_PENDING_LIVE_WINDOW_PACKETS; + pending.drain(..dropped); dropped } diff --git a/testing/tests/client_microphone_include_contract.rs b/testing/tests/client_microphone_include_contract.rs index ccf1e8e..49ea161 100644 --- a/testing/tests/client_microphone_include_contract.rs +++ b/testing/tests/client_microphone_include_contract.rs @@ -313,6 +313,7 @@ JSON sink, level_tap_running: Some(std::sync::Arc::clone(&running)), pts_rebaser: crate::live_capture_clock::DurationPacedSourcePtsRebaser::default(), + pending_packets: Default::default(), }; assert!( cap.pull().is_none(), @@ -436,6 +437,7 @@ JSON sink, level_tap_running: None, pts_rebaser: crate::live_capture_clock::DurationPacedSourcePtsRebaser::default(), + pending_packets: Default::default(), }; let first_pkt = cap.pull().expect("first audio packet"); let second_pkt = cap.pull().expect("second audio packet");