From b687c258e4597d3ddec270533352a8d23d241f92 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Fri, 24 Apr 2026 19:37:04 -0300 Subject: [PATCH] fix(sync): instrument theia sink path --- Cargo.lock | 6 +- client/Cargo.toml | 2 +- client/src/sync_probe/capture/runtime.rs | 6 +- client/src/sync_probe/capture/tests.rs | 170 +++++++++++++++++++++++ client/src/sync_probe/runner.rs | 35 +++++ common/Cargo.toml | 2 +- scripts/manual/run_upstream_av_sync.sh | 18 +-- server/Cargo.toml | 2 +- server/src/audio/ear_capture.rs | 2 +- server/src/audio/voice_input.rs | 15 +- 10 files changed, 235 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e0d7575..7e27066 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1642,7 +1642,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.13.3" +version = "0.13.4" dependencies = [ "anyhow", "async-stream", @@ -1676,7 +1676,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.13.3" +version = "0.13.4" dependencies = [ "anyhow", "base64", @@ -1688,7 +1688,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.13.3" +version = "0.13.4" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index ae3f116..1053a83 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.13.3" +version = "0.13.4" edition = "2024" [dependencies] diff --git a/client/src/sync_probe/capture/runtime.rs b/client/src/sync_probe/capture/runtime.rs index d0e0b4b..75eb681 100644 --- a/client/src/sync_probe/capture/runtime.rs +++ b/client/src/sync_probe/capture/runtime.rs @@ -130,7 +130,7 @@ fn build_pipeline(camera: CameraConfig, _schedule: &PulseSchedule) -> Result CameraConfig { CameraConfig { @@ -16,6 +19,45 @@ fn stub_camera() -> CameraConfig { } } +fn decode_adts_aac_to_mono_samples(aac_bytes: &[u8]) -> Vec { + let dir = 
tempdir().expect("tempdir"); + let input = dir.path().join("runtime-probe.aac"); + fs::write(&input, aac_bytes).expect("write runtime AAC"); + + let output = Command::new("ffmpeg") + .arg("-hide_banner") + .arg("-loglevel") + .arg("error") + .arg("-i") + .arg(&input) + .arg("-ac") + .arg("1") + .arg("-ar") + .arg(super::AUDIO_SAMPLE_RATE.to_string()) + .arg("-f") + .arg("s16le") + .arg("-acodec") + .arg("pcm_s16le") + .arg("-") + .output() + .expect("decode runtime AAC with ffmpeg"); + assert!( + output.status.success(), + "ffmpeg decode failed: {}", + String::from_utf8_lossy(&output.stderr) + ); + assert!( + output.stdout.len() >= 2, + "decoded runtime AAC did not yield enough PCM bytes" + ); + + output + .stdout + .chunks_exact(2) + .map(|chunk| i16::from_le_bytes([chunk[0], chunk[1]])) + .collect() +} + #[tokio::test] async fn coverage_stub_exposes_live_video_and_audio_queues() { let capture = SyncProbeCapture::new( @@ -129,3 +171,131 @@ fn probe_video_frames_render_distinct_idle_regular_and_marker_patterns() { ); assert_ne!(regular, marker); } + +#[cfg(not(coverage))] +#[tokio::test] +async fn runtime_audio_probe_emits_nontrivial_aac_packets() { + let capture = SyncProbeCapture::new( + stub_camera(), + PulseSchedule::new( + Duration::from_secs(1), + Duration::from_millis(500), + Duration::from_millis(120), + 4, + ), + Duration::from_secs(3), + ) + .expect("runtime capture"); + + let audio_queue = capture.audio_queue(); + let mut packet_count = 0usize; + let mut total_bytes = 0usize; + let mut largest_packet = 0usize; + + loop { + let next = audio_queue.pop_fresh().await; + let Some(packet) = next.packet else { + break; + }; + packet_count += 1; + total_bytes += packet.data.len(); + largest_packet = largest_packet.max(packet.data.len()); + } + + assert!( + packet_count >= 16, + "expected the runtime probe to emit many AAC packets, got {packet_count}" + ); + assert!( + total_bytes >= 8_000, + "expected the runtime probe to emit a meaningful AAC payload, got 
{total_bytes} bytes" + ); + assert!( + largest_packet >= 64, + "expected at least one non-trivial AAC packet, largest was {largest_packet} bytes" + ); +} + +#[cfg(not(coverage))] +#[tokio::test] +async fn runtime_audio_probe_decodes_detectable_click_onsets() { + let schedule = PulseSchedule::new( + Duration::from_secs(1), + Duration::from_millis(500), + Duration::from_millis(120), + 4, + ); + let capture = SyncProbeCapture::new(stub_camera(), schedule.clone(), Duration::from_secs(3)) + .expect("runtime capture"); + + let audio_queue = capture.audio_queue(); + let mut aac = Vec::new(); + loop { + let next = audio_queue.pop_fresh().await; + let Some(packet) = next.packet else { + break; + }; + aac.extend_from_slice(&packet.data); + } + + assert!( + aac.len() >= 8_000, + "expected the runtime probe AAC stream to carry a meaningful payload, got {} bytes", + aac.len() + ); + + let decoded = decode_adts_aac_to_mono_samples(&aac); + let onsets = + detect_audio_onsets(&decoded, super::AUDIO_SAMPLE_RATE as u32, 5).expect("audio onsets"); + assert!( + onsets.len() >= 4, + "expected at least four decoded click onsets, got {onsets:?}" + ); + + let expected = [1.0, 1.5, 2.0, 2.5]; + for (actual, expected) in onsets.iter().zip(expected) { + assert!( + (*actual - expected).abs() <= 0.08, + "expected onset near {expected:.3}s, got {actual:.3}s" + ); + } +} + +#[cfg(not(coverage))] +#[tokio::test] +async fn runtime_audio_probe_decodes_detectable_click_onsets_for_manual_harness_timing() { + let schedule = PulseSchedule::new( + Duration::from_secs(4), + Duration::from_secs(1), + Duration::from_millis(120), + 5, + ); + let capture = SyncProbeCapture::new(stub_camera(), schedule.clone(), Duration::from_secs(10)) + .expect("runtime capture"); + + let audio_queue = capture.audio_queue(); + let mut aac = Vec::new(); + loop { + let next = audio_queue.pop_fresh().await; + let Some(packet) = next.packet else { + break; + }; + aac.extend_from_slice(&packet.data); + } + + let decoded = 
decode_adts_aac_to_mono_samples(&aac); + let onsets = + detect_audio_onsets(&decoded, super::AUDIO_SAMPLE_RATE as u32, 5).expect("audio onsets"); + assert!( + onsets.len() >= 6, + "expected at least six decoded click onsets, got {onsets:?}" + ); + + let expected = [4.0, 5.0, 6.0, 7.0, 8.0, 9.0]; + for (actual, expected) in onsets.iter().zip(expected) { + assert!( + (*actual - expected).abs() <= 0.1, + "expected onset near {expected:.3}s, got {actual:.3}s" + ); + } +} diff --git a/client/src/sync_probe/runner.rs b/client/src/sync_probe/runner.rs index 3f0d0ed..05b426a 100644 --- a/client/src/sync_probe/runner.rs +++ b/client/src/sync_probe/runner.rs @@ -7,6 +7,12 @@ use crate::handshake; use crate::sync_probe::capture::SyncProbeCapture; use crate::sync_probe::config::{ParseOutcome, ProbeConfig, parse_args_outcome_from, usage}; use crate::sync_probe::schedule::PulseSchedule; +#[cfg(not(coverage))] +use std::fs::File; +#[cfg(not(coverage))] +use std::io::Write; +#[cfg(not(coverage))] +use std::path::PathBuf; #[cfg(not(coverage))] use lesavka_common::lesavka::relay_client::RelayClient; @@ -81,15 +87,33 @@ async fn run_sync_probe(config: ProbeConfig) -> Result<()> { let audio_task = tokio::spawn(async move { let mut client = RelayClient::new(audio_channel); + let mut audio_dump = open_debug_dump("LESAVKA_SYNC_PROBE_AUDIO_DUMP") + .context("opening sync probe audio dump")?; + let mut sent_packets = 0u64; let outbound = async_stream::stream! 
{ loop { let next = audio_queue.pop_fresh().await; if let Some(packet) = next.packet { + sent_packets = sent_packets.saturating_add(1); + if sent_packets <= 5 || sent_packets.is_multiple_of(500) { + tracing::info!( + packet = sent_packets, + pts = packet.pts, + bytes = packet.data.len(), + "🧪 sync probe microphone packet" + ); + } + if let Some(file) = audio_dump.as_mut() { + let _ = file.write_all(&packet.data); + } yield packet; continue; } break; } + if let Some(file) = audio_dump.as_mut() { + let _ = file.flush(); + } }; let mut response = client .stream_microphone(Request::new(outbound)) @@ -118,6 +142,17 @@ async fn connect(server_addr: &str) -> Result { .with_context(|| format!("connecting to relay at {server_addr}")) } +#[cfg(not(coverage))] +fn open_debug_dump(env_var: &str) -> Result<Option<File>> { + let Some(path) = std::env::var_os(env_var) else { + return Ok(None); + }; + let path = PathBuf::from(path); + let file = File::create(&path) + .with_context(|| format!("creating debug dump at {}", path.display()))?; + Ok(Some(file)) +} + #[cfg(coverage)] async fn run_sync_probe(_config: ProbeConfig) -> Result<()> { Ok(()) diff --git a/common/Cargo.toml b/common/Cargo.toml index ff40dbd..5ce6537 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.13.3" +version = "0.13.4" edition = "2024" build = "build.rs" diff --git a/scripts/manual/run_upstream_av_sync.sh b/scripts/manual/run_upstream_av_sync.sh index db4146f..3cac9b9 100755 --- a/scripts/manual/run_upstream_av_sync.sh +++ b/scripts/manual/run_upstream_av_sync.sh @@ -71,12 +71,12 @@ alsa_audio_dev="hw:3,0" pulse_source="" if [[ "${remote_audio_source}" == "auto" ]]; then if pulse_source="$(resolve_pulse_source)"; then - audio_mode="pipewire" + audio_mode="pulse" else printf 'PipeWire Lesavka source not found; falling back to hw:3,0\n' >&2 fi elif [[ "${remote_audio_source}" == pulse:* ]]; then - audio_mode="pipewire" + audio_mode="pulse"
pulse_source="${remote_audio_source#pulse:}" elif [[ "${remote_audio_source}" == alsa:* ]]; then alsa_audio_dev="${remote_audio_source#alsa:}" @@ -85,21 +85,15 @@ else exit 64 fi -if [[ "${audio_mode}" == "pipewire" ]]; then - printf 'using PipeWire source: %s\n' "${pulse_source}" >&2 - pw-record --target "${pulse_source}" \ - --rate 48000 \ - --channels 2 \ - --format s16 \ - --latency 10ms \ - --raw - | \ +if [[ "${audio_mode}" == "pulse" ]]; then + printf 'using Pulse source: %s\n' "${pulse_source}" >&2 ffmpeg -hide_banner -loglevel error -y \ -thread_queue_size 1024 \ "${video_args[@]}" \ -i /dev/video0 \ -thread_queue_size 1024 \ - -f s16le -ac 2 -ar 48000 \ - -i pipe:0 \ + -f pulse \ + -i "${pulse_source}" \ -t "${capture_seconds}" \ -c:v ffv1 -level 3 -g 1 \ -c:a pcm_s16le \ diff --git a/server/Cargo.toml b/server/Cargo.toml index 2fbf7b6..d2daaae 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.13.3" +version = "0.13.4" edition = "2024" autobins = false diff --git a/server/src/audio/ear_capture.rs b/server/src/audio/ear_capture.rs index ef1368b..6f9bd48 100644 --- a/server/src/audio/ear_capture.rs +++ b/server/src/audio/ear_capture.rs @@ -98,7 +98,7 @@ fn spawn_pipeline_bus_logger(bus: gst::Bus, label: &'static str, playing_message Element(e) => { if let Some(structure) = e.structure() { if structure.name() == "level" { - info!("🔊 source audio level {}", structure); + info!("🔊 {label} audio level {}", structure); } else { debug!("🔎 audio element message: {}", structure); } diff --git a/server/src/audio/voice_input.rs b/server/src/audio/voice_input.rs index 4a52c4d..e16b5f6 100644 --- a/server/src/audio/voice_input.rs +++ b/server/src/audio/voice_input.rs @@ -175,6 +175,11 @@ impl Voice { .property("caps", &caps) .build() .context("make capsfilter")?; + let level = gst::ElementFactory::make("level") + .property("interval", 1_000_000_000u64) + .property("message",
true) + .build() + .context("make voice level probe")?; let alsa_sink = gst::ElementFactory::make("alsasink") .build() .context("make alsasink")?; @@ -212,11 +217,19 @@ impl Voice { &convert, &resample, &capsfilter, + &level, &delay_queue, &alsa_sink, ])?; appsrc.link(&decodebin)?; - gst::Element::link_many([&convert, &resample, &capsfilter, &delay_queue, &alsa_sink])?; + gst::Element::link_many([ + &convert, + &resample, + &capsfilter, + &level, + &delay_queue, + &alsa_sink, + ])?; /*------------ decodebin autolink ----------------*/ let convert_sink = convert