From 51f69343182cbc7d3aa09286d674abbda9631524 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Mon, 27 Apr 2026 06:52:14 -0300 Subject: [PATCH] fix(sync): anchor pulse pairing to startup phase --- Cargo.lock | 6 +-- client/Cargo.toml | 2 +- .../analyze/onset_detection/correlation.rs | 43 +++++++++++++--- .../analyze/onset_detection/tests.rs | 40 +++++++++++++++ common/Cargo.toml | 2 +- scripts/manual/run_upstream_av_sync.sh | 51 ++++++++++++++----- server/Cargo.toml | 2 +- 7 files changed, 120 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5eac170..e9bb1f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1642,7 +1642,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.14.14" +version = "0.14.16" dependencies = [ "anyhow", "async-stream", @@ -1676,7 +1676,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.14.14" +version = "0.14.16" dependencies = [ "anyhow", "base64", @@ -1688,7 +1688,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.14.14" +version = "0.14.16" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index 8c1d181..b0829f7 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.14.15" +version = "0.14.16" edition = "2024" [dependencies] diff --git a/client/src/sync_probe/analyze/onset_detection/correlation.rs b/client/src/sync_probe/analyze/onset_detection/correlation.rs index 18c342b..f8737cb 100644 --- a/client/src/sync_probe/analyze/onset_detection/correlation.rs +++ b/client/src/sync_probe/analyze/onset_detection/correlation.rs @@ -11,6 +11,7 @@ pub(super) use collapse::collapse_segments_by_phase; const MARKER_WIDTH_MULTIPLIER: f64 = 1.5; const PHASE_TOLERANCE_WIDTH_MULTIPLIER: f64 = 2.5; +const STARTUP_PHASE_ANCHOR_TOLERANCE_FRACTION: f64 = 1.0 / 3.0; #[cfg_attr(not(test), allow(dead_code))] pub(super) fn correlate_onsets( @@ -34,6 +35,7 @@ pub(super) fn correlate_onsets( let (video_onsets_s, audio_onsets_s, common_window) = trim_onsets_to_common_activity_window(video_onsets_s, audio_onsets_s, max_pair_gap_s); + let expected_start_skew_ms = (audio_onsets_s[0] - video_onsets_s[0]) * 1000.0; let video_pulses = index_onsets_by_spacing(&video_onsets_s, pulse_period_s); let audio_pulses = index_onsets_by_spacing(&audio_onsets_s, pulse_period_s); let offset_candidates = candidate_index_offsets(&video_pulses, &audio_pulses); @@ -42,6 +44,7 @@ pub(super) fn correlate_onsets( &audio_pulses, &offset_candidates, max_pair_gap_s, + expected_start_skew_ms, ); if skews_ms.is_empty() && video_onsets_s.len() == 1 && audio_onsets_s.len() == 1 { @@ -115,6 +118,7 @@ pub(crate) fn correlate_segments( let (video_onsets_s, audio_onsets_s, common_window) = trim_onsets_to_common_activity_window(&video_onsets_s, &audio_onsets_s, max_pair_gap_s); + let expected_start_skew_ms = (audio_onsets_s[0] - video_onsets_s[0]) * 1000.0; let video_marker_onsets = marker_onsets(&video_segments, pulse_width_s); let audio_marker_onsets = marker_onsets(&audio_segments, pulse_width_s); let video_marker_onsets = common_window.filter_onsets(&video_marker_onsets); @@ -132,6 +136,7 @@ pub(crate) fn correlate_segments( &audio_indexed, &offset_candidates, max_pair_gap_s, + expected_start_skew_ms, ); if skews_ms.is_empty() && video_onsets_s.len() == 1 && audio_onsets_s.len() == 1 { @@ -318,9 +323,12 @@ fn best_skews_for_index_offsets( audio_indexed: &BTreeMap, offset_candidates: &[i64], max_pair_gap_s: f64, + expected_start_skew_ms: f64, ) -> Vec { let max_pair_gap_ms = max_pair_gap_s * 1000.0; - let mut best: Option<(usize, f64, Vec)> = None; + let startup_phase_anchor_tolerance_ms = + max_pair_gap_ms * STARTUP_PHASE_ANCHOR_TOLERANCE_FRACTION; + let mut best: Option<(bool, usize, f64, f64, Vec)> = None; for offset in offset_candidates.iter().copied() { let skews_ms = video_indexed @@ -336,17 +344,38 @@ fn best_skews_for_index_offsets( continue; } - let score = + let mean_abs_skew_ms = skews_ms.iter().map(|skew_ms| skew_ms.abs()).sum::() / skews_ms.len() as f64; + let startup_phase_anchor_error_ms = (skews_ms[0] - expected_start_skew_ms).abs(); + let startup_phase_anchor_consistent = + startup_phase_anchor_error_ms <= startup_phase_anchor_tolerance_ms; match &best { - Some((best_count, best_score, _)) - if skews_ms.len() < *best_count - || (skews_ms.len() == *best_count && score >= *best_score) => {} - _ => best = Some((skews_ms.len(), score, skews_ms)), + Some(( + best_anchor_consistent, + best_count, + best_anchor_error_ms, + best_mean_abs_skew_ms, + _, + )) if startup_phase_anchor_consistent < *best_anchor_consistent + || (startup_phase_anchor_consistent == *best_anchor_consistent + && (skews_ms.len() < *best_count + || (skews_ms.len() == *best_count + && (startup_phase_anchor_error_ms > *best_anchor_error_ms + || (startup_phase_anchor_error_ms == *best_anchor_error_ms + && mean_abs_skew_ms >= *best_mean_abs_skew_ms))))) => {} + _ => { + best = Some(( + startup_phase_anchor_consistent, + skews_ms.len(), + startup_phase_anchor_error_ms, + mean_abs_skew_ms, + skews_ms, + )) + } } } - best.map(|(_, _, skews)| skews).unwrap_or_default() + best.map(|(_, _, _, _, skews)| skews).unwrap_or_default() } pub(super) fn marker_onsets(segments: &[PulseSegment], pulse_width_s: f64) -> Vec { diff --git a/client/src/sync_probe/analyze/onset_detection/tests.rs b/client/src/sync_probe/analyze/onset_detection/tests.rs index ac93f64..50d1e79 100644 --- a/client/src/sync_probe/analyze/onset_detection/tests.rs +++ b/client/src/sync_probe/analyze/onset_detection/tests.rs @@ -127,6 +127,46 @@ fn correlate_onsets_ignores_leading_video_cadence_before_audio_becomes_active() assert!(report.max_abs_skew_ms < 60.0); } +#[test] +fn correlate_onsets_prefers_the_phase_consistent_basin_over_a_larger_alias_cluster() { + let report = correlate_onsets( + &[ + 9.554, 10.5035, 11.487, 12.4365, 13.691, 14.505, 15.624, 16.438, 17.3535, 18.4385, + 19.5575, 20.4395, 21.4905, 22.508, 23.4235, 24.5425, 25.5605, 26.4425, 27.4935, + 28.5105, + ], + &[ + 10.011041666666667, + 11.011041666666667, + 12.011041666666667, + 13.011041666666667, + 14.011041666666667, + 15.011041666666667, + 16.011041666666667, + 17.011041666666667, + 18.011041666666667, + 19.011041666666667, + 20.011041666666667, + 21.011041666666667, + 22.011041666666667, + 23.011041666666667, + 24.011041666666667, + 25.011041666666667, + 26.011041666666667, + 27.011041666666667, + 28.011041666666667, + 29.011041666666667, + ], + 1.0, + 0.5, + ) + .expect("phase-consistent basin"); + + assert!(report.first_skew_ms > 400.0); + assert!(report.median_skew_ms > 350.0); + assert!(report.paired_event_count >= 6); +} + #[test] fn detect_video_onsets_rejects_empty_low_contrast_and_missing_edges() { assert!(detect_video_onsets(&[], &[]).is_err()); diff --git a/common/Cargo.toml b/common/Cargo.toml index 858cfdf..b09e643 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.14.15" +version = "0.14.16" edition = "2024" build = "build.rs" diff --git a/scripts/manual/run_upstream_av_sync.sh b/scripts/manual/run_upstream_av_sync.sh index 13d9bed..b85635e 100755 --- a/scripts/manual/run_upstream_av_sync.sh +++ b/scripts/manual/run_upstream_av_sync.sh @@ -22,6 +22,7 @@ VIDEO_SIZE=${VIDEO_SIZE:-auto} VIDEO_FPS=${VIDEO_FPS:-30} VIDEO_FORMAT=${VIDEO_FORMAT:-mjpeg} REMOTE_CAPTURE_STACK=${REMOTE_CAPTURE_STACK:-pulse} +REMOTE_PULSE_VIDEO_MODE=${REMOTE_PULSE_VIDEO_MODE:-copy} REMOTE_AUDIO_SOURCE=${REMOTE_AUDIO_SOURCE:-auto} REMOTE_AUDIO_QUIESCE_USER_AUDIO=${REMOTE_AUDIO_QUIESCE_USER_AUDIO:-auto} ANALYSIS_NORMALIZE=${ANALYSIS_NORMALIZE:-0} @@ -72,6 +73,7 @@ ssh ${SSH_OPTS} "${TETHYS_HOST}" bash -s -- \ "${VIDEO_FPS}" \ "${VIDEO_FORMAT}" \ "${REMOTE_CAPTURE_STACK}" \ + "${REMOTE_PULSE_VIDEO_MODE}" \ "${REMOTE_AUDIO_SOURCE}" \ "${REMOTE_AUDIO_QUIESCE_USER_AUDIO}" \ > >(tee "${LOCAL_CAPTURE_LOG}") \ @@ -83,8 +85,9 @@ video_size=$3 video_fps=$4 video_format=$5 remote_capture_stack=$6 -remote_audio_source=$7 -remote_audio_quiesce_user_audio=$8 +remote_pulse_video_mode=$7 +remote_audio_source=$8 +remote_audio_quiesce_user_audio=$9 rm -f "${remote_capture}" @@ -288,17 +291,39 @@ if [[ "${capture_mode}" == "pwpipe" ]]; then "${remote_capture}" elif [[ "${capture_mode}" == "pulse" ]]; then printf 'using Pulse source: %s\n' "${pulse_source}" >&2 - ffmpeg -hide_banner -loglevel error -y \ - -thread_queue_size 1024 \ - "${video_args[@]}" \ - -i /dev/video0 \ - -thread_queue_size 1024 \ - -f pulse \ - -i "${pulse_source}" \ - -t "${capture_seconds}" \ - -c:v copy \ - -c:a pcm_s16le \ - "${remote_capture}" + case "${remote_pulse_video_mode}" in + copy) + ffmpeg -hide_banner -loglevel error -y \ + -thread_queue_size 1024 \ + "${video_args[@]}" \ + -i /dev/video0 \ + -thread_queue_size 1024 \ + -f pulse \ + -i "${pulse_source}" \ + -t "${capture_seconds}" \ + -c:v copy \ + -c:a pcm_s16le \ + "${remote_capture}" + ;; + cfr) + ffmpeg -hide_banner -loglevel error -y \ + -thread_queue_size 1024 \ + "${video_args[@]}" \ + -i /dev/video0 \ + -thread_queue_size 1024 \ + -f pulse \ + -i "${pulse_source}" \ + -t "${capture_seconds}" \ + -vf "fps=${video_fps}" \ + -c:v libx264 -preset ultrafast -crf 12 -g 1 -pix_fmt yuv420p \ + -c:a pcm_s16le \ + "${remote_capture}" + ;; + *) + printf 'unsupported REMOTE_PULSE_VIDEO_MODE=%s\n' "${remote_pulse_video_mode}" >&2 + exit 64 + ;; + esac else ffmpeg -hide_banner -loglevel error -y \ -thread_queue_size 1024 \ diff --git a/server/Cargo.toml b/server/Cargo.toml index 3b31221..8e3d63e 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.14.15" +version = "0.14.16" edition = "2024" autobins = false