fix(sync): instrument theia sink path

This commit is contained in:
Brad Stein 2026-04-24 19:37:04 -03:00
parent e06f14d27e
commit b687c258e4
10 changed files with 235 additions and 23 deletions

6
Cargo.lock generated
View File

@ -1642,7 +1642,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]] [[package]]
name = "lesavka_client" name = "lesavka_client"
version = "0.13.3" version = "0.13.4"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-stream", "async-stream",
@ -1676,7 +1676,7 @@ dependencies = [
[[package]] [[package]]
name = "lesavka_common" name = "lesavka_common"
version = "0.13.3" version = "0.13.4"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",
@ -1688,7 +1688,7 @@ dependencies = [
[[package]] [[package]]
name = "lesavka_server" name = "lesavka_server"
version = "0.13.3" version = "0.13.4"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package] [package]
name = "lesavka_client" name = "lesavka_client"
version = "0.13.3" version = "0.13.4"
edition = "2024" edition = "2024"
[dependencies] [dependencies]

View File

@ -130,7 +130,7 @@ fn build_pipeline(camera: CameraConfig, _schedule: &PulseSchedule) -> Result<gst
queue max-size-buffers=8 leaky=downstream ! \ queue max-size-buffers=8 leaky=downstream ! \
audioconvert ! audioresample ! audio/x-raw,channels=2,rate={} ! \ audioconvert ! audioresample ! audio/x-raw,channels=2,rate={} ! \
{} ! aacparse ! capsfilter caps=audio/mpeg,stream-format=adts,rate={},channels=2 ! \ {} ! aacparse ! capsfilter caps=audio/mpeg,stream-format=adts,rate={},channels=2 ! \
appsink name=sync_probe_audio_sink emit-signals=false sync=false max-buffers=32 drop=true", appsink name=sync_probe_audio_sink emit-signals=false sync=false max-buffers=256 drop=false",
AUDIO_CHANNELS, AUDIO_CHANNELS,
AUDIO_SAMPLE_RATE, AUDIO_SAMPLE_RATE,
AUDIO_SAMPLE_RATE, AUDIO_SAMPLE_RATE,
@ -290,12 +290,12 @@ fn spawn_audio_thread(
break; break;
} }
drain_audio_samples(&sink, &queue, duration, gst::ClockTime::ZERO); drain_audio_samples(&sink, &queue, duration, gst::ClockTime::from_mseconds(25));
chunk_index = chunk_index.saturating_add(1); chunk_index = chunk_index.saturating_add(1);
} }
let _ = src.end_of_stream(); let _ = src.end_of_stream();
drain_audio_samples(&sink, &queue, duration, gst::ClockTime::from_mseconds(100)); drain_audio_samples(&sink, &queue, duration, gst::ClockTime::from_mseconds(500));
queue.close(); queue.close();
}) })
} }

View File

@ -5,7 +5,10 @@ use crate::input::camera::{CameraCodec, CameraConfig};
use crate::sync_probe::analyze::detect_audio_onsets; use crate::sync_probe::analyze::detect_audio_onsets;
use crate::sync_probe::schedule::PulseSchedule; use crate::sync_probe::schedule::PulseSchedule;
use lesavka_common::lesavka::{AudioPacket, VideoPacket}; use lesavka_common::lesavka::{AudioPacket, VideoPacket};
use std::fs;
use std::process::Command;
use std::time::Duration; use std::time::Duration;
use tempfile::tempdir;
fn stub_camera() -> CameraConfig { fn stub_camera() -> CameraConfig {
CameraConfig { CameraConfig {
@ -16,6 +19,45 @@ fn stub_camera() -> CameraConfig {
} }
} }
/// Decodes an ADTS AAC byte stream into mono signed 16-bit PCM samples.
///
/// The bytes are written to `runtime-probe.aac` inside a fresh temporary
/// directory and handed to the `ffmpeg` executable, which is asked for a
/// single channel at `AUDIO_SAMPLE_RATE`, emitted as raw little-endian
/// `s16le` on stdout.
///
/// # Panics
/// Panics if the temporary directory or input file cannot be created, if
/// `ffmpeg` cannot be run or exits unsuccessfully (its stderr is included in
/// the message), or if fewer than two bytes of PCM come back.
fn decode_adts_aac_to_mono_samples(aac_bytes: &[u8]) -> Vec<i16> {
    let scratch = tempdir().expect("tempdir");
    let aac_path = scratch.path().join("runtime-probe.aac");
    fs::write(&aac_path, aac_bytes).expect("write runtime AAC");

    let sample_rate = super::AUDIO_SAMPLE_RATE.to_string();
    let decoded = Command::new("ffmpeg")
        .args(["-hide_banner", "-loglevel", "error", "-i"])
        .arg(&aac_path)
        .args(["-ac", "1", "-ar"])
        .arg(sample_rate)
        .args(["-f", "s16le", "-acodec", "pcm_s16le", "-"])
        .output()
        .expect("decode runtime AAC with ffmpeg");

    assert!(
        decoded.status.success(),
        "ffmpeg decode failed: {}",
        String::from_utf8_lossy(&decoded.stderr)
    );
    let pcm = decoded.stdout;
    assert!(
        pcm.len() >= 2,
        "decoded runtime AAC did not yield enough PCM bytes"
    );

    // Each sample is two little-endian bytes; a trailing odd byte is ignored.
    pcm.chunks_exact(2)
        .map(|pair| i16::from_le_bytes([pair[0], pair[1]]))
        .collect()
}
#[tokio::test] #[tokio::test]
async fn coverage_stub_exposes_live_video_and_audio_queues() { async fn coverage_stub_exposes_live_video_and_audio_queues() {
let capture = SyncProbeCapture::new( let capture = SyncProbeCapture::new(
@ -129,3 +171,131 @@ fn probe_video_frames_render_distinct_idle_regular_and_marker_patterns() {
); );
assert_ne!(regular, marker); assert_ne!(regular, marker);
} }
/// Drains the audio queue of a three-second runtime capture and checks that
/// the stream is substantial: at least 16 packets, at least 8 000 bytes in
/// total, and at least one packet of 64 bytes or more.
#[cfg(not(coverage))]
#[tokio::test]
async fn runtime_audio_probe_emits_nontrivial_aac_packets() {
    let capture = SyncProbeCapture::new(
        stub_camera(),
        PulseSchedule::new(
            Duration::from_secs(1),
            Duration::from_millis(500),
            Duration::from_millis(120),
            4,
        ),
        Duration::from_secs(3),
    )
    .expect("runtime capture");
    let audio_queue = capture.audio_queue();

    // Record every packet's payload size until the queue yields no packet.
    let mut packet_sizes: Vec<usize> = Vec::new();
    while let Some(packet) = audio_queue.pop_fresh().await.packet {
        packet_sizes.push(packet.data.len());
    }

    let packet_count = packet_sizes.len();
    let total_bytes: usize = packet_sizes.iter().sum();
    let largest_packet = packet_sizes.iter().copied().max().unwrap_or(0);

    assert!(
        packet_count >= 16,
        "expected the runtime probe to emit many AAC packets, got {packet_count}"
    );
    assert!(
        total_bytes >= 8_000,
        "expected the runtime probe to emit a meaningful AAC payload, got {total_bytes} bytes"
    );
    assert!(
        largest_packet >= 64,
        "expected at least one non-trivial AAC packet, largest was {largest_packet} bytes"
    );
}
/// Captures three seconds of runtime probe audio, decodes the concatenated
/// AAC packets through `ffmpeg`, and checks that the first four detected
/// onsets land within 80 ms of 1.0 s, 1.5 s, 2.0 s and 2.5 s.
#[cfg(not(coverage))]
#[tokio::test]
async fn runtime_audio_probe_decodes_detectable_click_onsets() {
    let schedule = PulseSchedule::new(
        Duration::from_secs(1),
        Duration::from_millis(500),
        Duration::from_millis(120),
        4,
    );
    // The schedule is not needed after this call, so it is moved, not cloned.
    let capture = SyncProbeCapture::new(stub_camera(), schedule, Duration::from_secs(3))
        .expect("runtime capture");
    let audio_queue = capture.audio_queue();

    // Concatenate every packet payload until the queue yields no packet.
    let mut aac = Vec::new();
    while let Some(packet) = audio_queue.pop_fresh().await.packet {
        aac.extend_from_slice(&packet.data);
    }
    assert!(
        aac.len() >= 8_000,
        "expected the runtime probe AAC stream to carry a meaningful payload, got {} bytes",
        aac.len()
    );

    let decoded = decode_adts_aac_to_mono_samples(&aac);
    let onsets =
        detect_audio_onsets(&decoded, super::AUDIO_SAMPLE_RATE as u32, 5).expect("audio onsets");
    assert!(
        onsets.len() >= 4,
        "expected at least four decoded click onsets, got {onsets:?}"
    );

    // Only the leading onsets are compared; `zip` stops after the last expected time.
    let expected = [1.0, 1.5, 2.0, 2.5];
    for (actual, expected) in onsets.iter().zip(expected) {
        assert!(
            (*actual - expected).abs() <= 0.08,
            "expected onset near {expected:.3}s, got {actual:.3}s"
        );
    }
}
/// Same onset check with the timing used by the manual harness: a ten-second
/// capture whose first six detected onsets must land within 100 ms of
/// 4 s, 5 s, 6 s, 7 s, 8 s and 9 s.
#[cfg(not(coverage))]
#[tokio::test]
async fn runtime_audio_probe_decodes_detectable_click_onsets_for_manual_harness_timing() {
    let schedule = PulseSchedule::new(
        Duration::from_secs(4),
        Duration::from_secs(1),
        Duration::from_millis(120),
        5,
    );
    // The schedule is not needed after this call, so it is moved, not cloned.
    let capture = SyncProbeCapture::new(stub_camera(), schedule, Duration::from_secs(10))
        .expect("runtime capture");
    let audio_queue = capture.audio_queue();

    // Concatenate every packet payload until the queue yields no packet.
    let mut aac = Vec::new();
    while let Some(packet) = audio_queue.pop_fresh().await.packet {
        aac.extend_from_slice(&packet.data);
    }

    let decoded = decode_adts_aac_to_mono_samples(&aac);
    let onsets =
        detect_audio_onsets(&decoded, super::AUDIO_SAMPLE_RATE as u32, 5).expect("audio onsets");
    assert!(
        onsets.len() >= 6,
        "expected at least six decoded click onsets, got {onsets:?}"
    );

    // Only the leading onsets are compared; `zip` stops after the last expected time.
    let expected = [4.0, 5.0, 6.0, 7.0, 8.0, 9.0];
    for (actual, expected) in onsets.iter().zip(expected) {
        assert!(
            (*actual - expected).abs() <= 0.1,
            "expected onset near {expected:.3}s, got {actual:.3}s"
        );
    }
}

View File

@ -7,6 +7,12 @@ use crate::handshake;
use crate::sync_probe::capture::SyncProbeCapture; use crate::sync_probe::capture::SyncProbeCapture;
use crate::sync_probe::config::{ParseOutcome, ProbeConfig, parse_args_outcome_from, usage}; use crate::sync_probe::config::{ParseOutcome, ProbeConfig, parse_args_outcome_from, usage};
use crate::sync_probe::schedule::PulseSchedule; use crate::sync_probe::schedule::PulseSchedule;
#[cfg(not(coverage))]
use std::fs::File;
#[cfg(not(coverage))]
use std::io::Write;
#[cfg(not(coverage))]
use std::path::PathBuf;
#[cfg(not(coverage))] #[cfg(not(coverage))]
use lesavka_common::lesavka::relay_client::RelayClient; use lesavka_common::lesavka::relay_client::RelayClient;
@ -81,15 +87,33 @@ async fn run_sync_probe(config: ProbeConfig) -> Result<()> {
let audio_task = tokio::spawn(async move { let audio_task = tokio::spawn(async move {
let mut client = RelayClient::new(audio_channel); let mut client = RelayClient::new(audio_channel);
let mut audio_dump = open_debug_dump("LESAVKA_SYNC_PROBE_AUDIO_DUMP")
.context("opening sync probe audio dump")?;
let mut sent_packets = 0u64;
let outbound = async_stream::stream! { let outbound = async_stream::stream! {
loop { loop {
let next = audio_queue.pop_fresh().await; let next = audio_queue.pop_fresh().await;
if let Some(packet) = next.packet { if let Some(packet) = next.packet {
sent_packets = sent_packets.saturating_add(1);
if sent_packets <= 5 || sent_packets.is_multiple_of(500) {
tracing::info!(
packet = sent_packets,
pts = packet.pts,
bytes = packet.data.len(),
"🧪 sync probe microphone packet"
);
}
if let Some(file) = audio_dump.as_mut() {
let _ = file.write_all(&packet.data);
}
yield packet; yield packet;
continue; continue;
} }
break; break;
} }
if let Some(file) = audio_dump.as_mut() {
let _ = file.flush();
}
}; };
let mut response = client let mut response = client
.stream_microphone(Request::new(outbound)) .stream_microphone(Request::new(outbound))
@ -118,6 +142,17 @@ async fn connect(server_addr: &str) -> Result<Channel> {
.with_context(|| format!("connecting to relay at {server_addr}")) .with_context(|| format!("connecting to relay at {server_addr}"))
} }
#[cfg(not(coverage))]
fn open_debug_dump(env_var: &str) -> Result<Option<File>> {
let Some(path) = std::env::var_os(env_var) else {
return Ok(None);
};
let path = PathBuf::from(path);
let file = File::create(&path)
.with_context(|| format!("creating debug dump at {}", path.display()))?;
Ok(Some(file))
}
#[cfg(coverage)] #[cfg(coverage)]
async fn run_sync_probe(_config: ProbeConfig) -> Result<()> { async fn run_sync_probe(_config: ProbeConfig) -> Result<()> {
Ok(()) Ok(())

View File

@ -1,6 +1,6 @@
[package] [package]
name = "lesavka_common" name = "lesavka_common"
version = "0.13.3" version = "0.13.4"
edition = "2024" edition = "2024"
build = "build.rs" build = "build.rs"

View File

@ -71,12 +71,12 @@ alsa_audio_dev="hw:3,0"
pulse_source="" pulse_source=""
if [[ "${remote_audio_source}" == "auto" ]]; then if [[ "${remote_audio_source}" == "auto" ]]; then
if pulse_source="$(resolve_pulse_source)"; then if pulse_source="$(resolve_pulse_source)"; then
audio_mode="pipewire" audio_mode="pulse"
else else
printf 'PipeWire Lesavka source not found; falling back to hw:3,0\n' >&2 printf 'PipeWire Lesavka source not found; falling back to hw:3,0\n' >&2
fi fi
elif [[ "${remote_audio_source}" == pulse:* ]]; then elif [[ "${remote_audio_source}" == pulse:* ]]; then
audio_mode="pipewire" audio_mode="pulse"
pulse_source="${remote_audio_source#pulse:}" pulse_source="${remote_audio_source#pulse:}"
elif [[ "${remote_audio_source}" == alsa:* ]]; then elif [[ "${remote_audio_source}" == alsa:* ]]; then
alsa_audio_dev="${remote_audio_source#alsa:}" alsa_audio_dev="${remote_audio_source#alsa:}"
@ -85,21 +85,15 @@ else
exit 64 exit 64
fi fi
if [[ "${audio_mode}" == "pipewire" ]]; then if [[ "${audio_mode}" == "pulse" ]]; then
printf 'using PipeWire source: %s\n' "${pulse_source}" >&2 printf 'using Pulse source: %s\n' "${pulse_source}" >&2
pw-record --target "${pulse_source}" \
--rate 48000 \
--channels 2 \
--format s16 \
--latency 10ms \
--raw - | \
ffmpeg -hide_banner -loglevel error -y \ ffmpeg -hide_banner -loglevel error -y \
-thread_queue_size 1024 \ -thread_queue_size 1024 \
"${video_args[@]}" \ "${video_args[@]}" \
-i /dev/video0 \ -i /dev/video0 \
-thread_queue_size 1024 \ -thread_queue_size 1024 \
-f s16le -ac 2 -ar 48000 \ -f pulse \
-i pipe:0 \ -i "${pulse_source}" \
-t "${capture_seconds}" \ -t "${capture_seconds}" \
-c:v ffv1 -level 3 -g 1 \ -c:v ffv1 -level 3 -g 1 \
-c:a pcm_s16le \ -c:a pcm_s16le \

View File

@ -10,7 +10,7 @@ bench = false
[package] [package]
name = "lesavka_server" name = "lesavka_server"
version = "0.13.3" version = "0.13.4"
edition = "2024" edition = "2024"
autobins = false autobins = false

View File

@ -98,7 +98,7 @@ fn spawn_pipeline_bus_logger(bus: gst::Bus, label: &'static str, playing_message
Element(e) => { Element(e) => {
if let Some(structure) = e.structure() { if let Some(structure) = e.structure() {
if structure.name() == "level" { if structure.name() == "level" {
info!("🔊 source audio level {}", structure); info!("🔊 {label} audio level {}", structure);
} else { } else {
debug!("🔎 audio element message: {}", structure); debug!("🔎 audio element message: {}", structure);
} }

View File

@ -175,6 +175,11 @@ impl Voice {
.property("caps", &caps) .property("caps", &caps)
.build() .build()
.context("make capsfilter")?; .context("make capsfilter")?;
let level = gst::ElementFactory::make("level")
.property("interval", 1_000_000_000u64)
.property("message", true)
.build()
.context("make voice level probe")?;
let alsa_sink = gst::ElementFactory::make("alsasink") let alsa_sink = gst::ElementFactory::make("alsasink")
.build() .build()
.context("make alsasink")?; .context("make alsasink")?;
@ -212,11 +217,19 @@ impl Voice {
&convert, &convert,
&resample, &resample,
&capsfilter, &capsfilter,
&level,
&delay_queue, &delay_queue,
&alsa_sink, &alsa_sink,
])?; ])?;
appsrc.link(&decodebin)?; appsrc.link(&decodebin)?;
gst::Element::link_many([&convert, &resample, &capsfilter, &delay_queue, &alsa_sink])?; gst::Element::link_many([
&convert,
&resample,
&capsfilter,
&level,
&delay_queue,
&alsa_sink,
])?;
/*------------ decodebin autolink ----------------*/ /*------------ decodebin autolink ----------------*/
let convert_sink = convert let convert_sink = convert