From b190e9431743eab983b9cd87f2171f179232d106 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Mon, 27 Apr 2026 04:49:44 -0300 Subject: [PATCH] fix(sync): favor pulse truth and trim onset pairing --- Cargo.lock | 6 +- client/Cargo.toml | 2 +- .../analyze/onset_detection/correlation.rs | 83 ++++++++++++++++--- .../analyze/onset_detection/tests.rs | 21 ++++- common/Cargo.toml | 2 +- scripts/manual/run_upstream_av_sync.sh | 29 +++++-- server/Cargo.toml | 2 +- 7 files changed, 120 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 45a490d..5eac170 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1642,7 +1642,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.14.13" +version = "0.14.14" dependencies = [ "anyhow", "async-stream", @@ -1676,7 +1676,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.14.13" +version = "0.14.14" dependencies = [ "anyhow", "base64", @@ -1688,7 +1688,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.14.13" +version = "0.14.14" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index eaa8e68..9563a61 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.14.13" +version = "0.14.14" edition = "2024" [dependencies] diff --git a/client/src/sync_probe/analyze/onset_detection/correlation.rs b/client/src/sync_probe/analyze/onset_detection/correlation.rs index 7223c45..18c342b 100644 --- a/client/src/sync_probe/analyze/onset_detection/correlation.rs +++ b/client/src/sync_probe/analyze/onset_detection/correlation.rs @@ -1,9 +1,9 @@ -use anyhow::{Result, bail}; +use anyhow::{bail, Result}; use std::collections::BTreeMap; use crate::sync_probe::analyze::report::SyncAnalysisReport; -use super::{PulseSegment, median}; +use super::{median, PulseSegment}; #[path = "correlation_collapse.rs"] mod collapse; @@ -32,8 +32,10 @@ pub(super) fn correlate_onsets( bail!("pulse period must stay positive"); } - let video_pulses = index_onsets_by_spacing(video_onsets_s, pulse_period_s); - let audio_pulses = index_onsets_by_spacing(audio_onsets_s, pulse_period_s); + let (video_onsets_s, audio_onsets_s, common_window) = + trim_onsets_to_common_activity_window(video_onsets_s, audio_onsets_s, max_pair_gap_s); + let video_pulses = index_onsets_by_spacing(&video_onsets_s, pulse_period_s); + let audio_pulses = index_onsets_by_spacing(&audio_onsets_s, pulse_period_s); let offset_candidates = candidate_index_offsets(&video_pulses, &audio_pulses); let mut skews_ms = best_skews_for_index_offsets( &video_pulses, @@ -43,8 +45,8 @@ pub(super) fn correlate_onsets( ); if skews_ms.is_empty() && video_onsets_s.len() == 1 && audio_onsets_s.len() == 1 { - let video_phase_s = estimate_phase(video_onsets_s, pulse_period_s); - let audio_phase_s = estimate_phase(audio_onsets_s, pulse_period_s); + let video_phase_s = estimate_phase(&video_onsets_s, pulse_period_s); + let audio_phase_s = estimate_phase(&audio_onsets_s, pulse_period_s); let phase_skew_ms = shortest_wrapped_difference(audio_phase_s - video_phase_s, pulse_period_s) * 1000.0; if phase_skew_ms.abs() <= max_pair_gap_s * 1000.0 { @@ -57,8 +59,8 @@ pub(super) fn correlate_onsets( } Ok(sync_report_from_skews( - video_onsets_s, - audio_onsets_s, + common_window.filter_onsets(&video_onsets_s), + common_window.filter_onsets(&audio_onsets_s), skews_ms, )) } @@ -111,8 +113,12 @@ pub(crate) fn correlate_segments( bail!("audio onset list is empty"); } + let (video_onsets_s, audio_onsets_s, common_window) = + trim_onsets_to_common_activity_window(&video_onsets_s, &audio_onsets_s, max_pair_gap_s); let video_marker_onsets = marker_onsets(&video_segments, pulse_width_s); let audio_marker_onsets = marker_onsets(&audio_segments, pulse_width_s); + let video_marker_onsets = common_window.filter_onsets(&video_marker_onsets); + let audio_marker_onsets = common_window.filter_onsets(&audio_marker_onsets); let video_indexed = index_onsets_by_spacing(&video_onsets_s, pulse_period_s); let audio_indexed = index_onsets_by_spacing(&audio_onsets_s, pulse_period_s); let offset_candidates = marker_index_offsets( @@ -143,12 +149,69 @@ pub(crate) fn correlate_segments( } Ok(sync_report_from_skews( - &video_onsets_s, - &audio_onsets_s, + video_onsets_s, + audio_onsets_s, skews_ms, )) } +#[derive(Clone, Copy)] +struct CommonActivityWindow { + start_s: f64, + end_s: f64, +} + +impl CommonActivityWindow { + fn filter_onsets(self, onsets_s: &[f64]) -> &[f64] { + let start = onsets_s.partition_point(|onset_s| *onset_s < self.start_s); + let end = onsets_s.partition_point(|onset_s| *onset_s <= self.end_s); + &onsets_s[start..end] + } +} + +fn trim_onsets_to_common_activity_window<'a>( + video_onsets_s: &'a [f64], + audio_onsets_s: &'a [f64], + max_pair_gap_s: f64, +) -> (&'a [f64], &'a [f64], CommonActivityWindow) { + let common_window = CommonActivityWindow { + start_s: (video_onsets_s + .first() + .copied() + .expect("validated video onset list is not empty") + .max( + audio_onsets_s + .first() + .copied() + .expect("validated audio onset list is not empty"), + ) + - max_pair_gap_s) + .max(0.0), + end_s: video_onsets_s + .last() + .copied() + .expect("validated video onset list is not empty") + .min( + audio_onsets_s + .last() + .copied() + .expect("validated audio onset list is not empty"), + ) + + max_pair_gap_s, + }; + let trimmed_video_onsets_s = common_window.filter_onsets(video_onsets_s); + let trimmed_audio_onsets_s = common_window.filter_onsets(audio_onsets_s); + if trimmed_video_onsets_s.is_empty() || trimmed_audio_onsets_s.is_empty() { + return (video_onsets_s, audio_onsets_s, common_window); + } + + ( + trimmed_video_onsets_s, + trimmed_audio_onsets_s, + common_window, + ) +} + fn segment_phase_tolerance(pulse_period_s: f64, pulse_width_s: f64, max_pair_gap_s: f64) -> f64 { (pulse_width_s * PHASE_TOLERANCE_WIDTH_MULTIPLIER) .max(max_pair_gap_s.min(pulse_period_s / 3.0)) diff --git a/client/src/sync_probe/analyze/onset_detection/tests.rs b/client/src/sync_probe/analyze/onset_detection/tests.rs index f7d2fb6..ac93f64 100644 --- a/client/src/sync_probe/analyze/onset_detection/tests.rs +++ b/client/src/sync_probe/analyze/onset_detection/tests.rs @@ -3,8 +3,8 @@ use super::correlation::{ index_onsets_by_spacing, marker_index_offsets, marker_onsets, shortest_wrapped_difference, }; use super::{ - PulseSegment, correlate_segments, detect_audio_onsets, detect_audio_segments, - detect_video_onsets, detect_video_segments, median, + correlate_segments, detect_audio_onsets, detect_audio_segments, detect_video_onsets, + detect_video_segments, median, PulseSegment, }; use crate::sync_probe::analyze::report::SyncAnalysisReport; use std::collections::BTreeMap; @@ -110,6 +110,23 @@ fn correlate_onsets_single_pulse_uses_phase_fallback() { assert!((report.first_skew_ms - 100.0).abs() < 0.001); } +#[test] +fn correlate_onsets_ignores_leading_video_cadence_before_audio_becomes_active() { + let report = correlate_onsets( + &[0.15, 1.15, 2.15, 3.15, 10.15, 11.15, 12.15], + &[10.20, 11.20, 12.20], + 1.0, + 0.2, + ) + .expect("correlated report"); + + assert_eq!(report.video_event_count, 3); + assert_eq!(report.audio_event_count, 3); + assert_eq!(report.paired_event_count, 3); + assert!((report.median_skew_ms - 50.0).abs() < 0.001); + assert!(report.max_abs_skew_ms < 60.0); +} + #[test] fn detect_video_onsets_rejects_empty_low_contrast_and_missing_edges() { assert!(detect_video_onsets(&[], &[]).is_err()); diff --git a/common/Cargo.toml b/common/Cargo.toml index 69dbc37..6784033 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.14.13" +version = "0.14.14" edition = "2024" build = "build.rs" diff --git a/scripts/manual/run_upstream_av_sync.sh b/scripts/manual/run_upstream_av_sync.sh index 1235e6e..0abed76 100755 --- a/scripts/manual/run_upstream_av_sync.sh +++ b/scripts/manual/run_upstream_av_sync.sh @@ -21,7 +21,7 @@ LOCAL_OUTPUT_DIR=${LOCAL_OUTPUT_DIR:-"${REPO_ROOT}/tmp"} VIDEO_SIZE=${VIDEO_SIZE:-auto} VIDEO_FPS=${VIDEO_FPS:-30} VIDEO_FORMAT=${VIDEO_FORMAT:-mjpeg} -REMOTE_CAPTURE_STACK=${REMOTE_CAPTURE_STACK:-auto} +REMOTE_CAPTURE_STACK=${REMOTE_CAPTURE_STACK:-pulse} REMOTE_AUDIO_SOURCE=${REMOTE_AUDIO_SOURCE:-auto} REMOTE_AUDIO_QUIESCE_USER_AUDIO=${REMOTE_AUDIO_QUIESCE_USER_AUDIO:-auto} ANALYSIS_NORMALIZE=${ANALYSIS_NORMALIZE:-1} @@ -40,6 +40,7 @@ mkdir -p "${LOCAL_OUTPUT_DIR}" STAMP="$(date +%Y%m%d-%H%M%S)" LOCAL_CAPTURE="${LOCAL_OUTPUT_DIR}/lesavka-upstream-av-sync-${STAMP}.mkv" LOCAL_ANALYSIS_JSON="${LOCAL_CAPTURE%.mkv}.json" +LOCAL_CAPTURE_LOG="${LOCAL_CAPTURE%.mkv}.capture.log" if [[ "${LOCAL_AUDIO_SANITY}" != "0" ]]; then echo "==> verifying local speaker-to-mic sanity before upstream sync run" @@ -72,7 +73,9 @@ ssh ${SSH_OPTS} "${TETHYS_HOST}" bash -s -- \ "${VIDEO_FORMAT}" \ "${REMOTE_CAPTURE_STACK}" \ "${REMOTE_AUDIO_SOURCE}" \ - "${REMOTE_AUDIO_QUIESCE_USER_AUDIO}" <<'REMOTE_CAPTURE_SCRIPT' & + "${REMOTE_AUDIO_QUIESCE_USER_AUDIO}" \ + > >(tee "${LOCAL_CAPTURE_LOG}") \ + 2> >(tee -a "${LOCAL_CAPTURE_LOG}" >&2) <<'REMOTE_CAPTURE_SCRIPT' & set -euo pipefail remote_capture=$1 capture_seconds=$2 @@ -174,13 +177,13 @@ pw_audio_target="" case "${remote_capture_stack}" in auto) - if command -v pw-record >/dev/null 2>&1 \ - && command -v pw-v4l2 >/dev/null 2>&1 \ - && pw_audio_target="$(resolve_pw_audio_target)"; then - capture_mode="pwpipe" - elif [[ "${remote_audio_source}" == "auto" ]]; then + if [[ "${remote_audio_source}" == "auto" ]]; then if pulse_source="$(resolve_pulse_source)"; then capture_mode="pulse" + elif command -v pw-record >/dev/null 2>&1 \ + && command -v pw-v4l2 >/dev/null 2>&1 \ + && pw_audio_target="$(resolve_pw_audio_target)"; then + capture_mode="pwpipe" else printf 'PipeWire Lesavka source not found; falling back to hw:3,0\n' >&2 fi @@ -326,6 +329,11 @@ probe_status=0 capture_status=0 wait "${capture_pid}" || capture_status=$? +capture_v4l2_fault=0 +if [[ -f "${LOCAL_CAPTURE_LOG}" ]] \ + && grep -q 'VIDIOC_QBUF): Bad file descriptor' "${LOCAL_CAPTURE_LOG}"; then + capture_v4l2_fault=1 +fi if ssh ${SSH_OPTS} "${TETHYS_HOST}" "test -f '${REMOTE_CAPTURE}'"; then remote_fetch_capture="${REMOTE_CAPTURE}" @@ -430,6 +438,10 @@ else ) fi +if [[ "${capture_v4l2_fault}" -eq 1 ]]; then + echo "warning: Tethys video capture reported VIDIOC_QBUF / Bad file descriptor; treat unstable skew or analyzer failures as host-capture suspect" >&2 +fi + echo "==> done" if [[ -f "${LOCAL_CAPTURE}" ]]; then echo "capture: ${LOCAL_CAPTURE}" @@ -437,3 +449,6 @@ fi if [[ -f "${LOCAL_ANALYSIS_JSON}" ]]; then echo "analysis_json: ${LOCAL_ANALYSIS_JSON}" fi +if [[ -f "${LOCAL_CAPTURE_LOG}" ]]; then + echo "capture_log: ${LOCAL_CAPTURE_LOG}" +fi diff --git a/server/Cargo.toml b/server/Cargo.toml index 1df0b19..68c71a3 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.14.13" +version = "0.14.14" edition = "2024" autobins = false