From f477332834eca9200b2bb0c7812c95a3a7ee6b1a Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Mon, 27 Apr 2026 13:35:18 -0300 Subject: [PATCH] fix(sync): harden probe truth path --- Cargo.lock | 6 +-- client/Cargo.toml | 2 +- .../src/sync_probe/analyze/media_extract.rs | 29 ++--------- .../src/sync_probe/analyze/onset_detection.rs | 16 ++++++ .../analyze/onset_detection/tests.rs | 15 ++++++ client/src/sync_probe/capture.rs | 35 ++++--------- client/src/sync_probe/capture/runtime.rs | 2 +- client/src/sync_probe/capture/tests.rs | 42 +++++++++++++++ common/Cargo.toml | 2 +- scripts/manual/run_upstream_av_sync.sh | 52 +++++++++++++++++-- server/Cargo.toml | 2 +- 11 files changed, 144 insertions(+), 59 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9ee6b5a..86e73ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1642,7 +1642,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.14.18" +version = "0.14.19" dependencies = [ "anyhow", "async-stream", @@ -1676,7 +1676,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.14.18" +version = "0.14.19" dependencies = [ "anyhow", "base64", @@ -1688,7 +1688,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.14.18" +version = "0.14.19" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index 0a98ee7..9e2b3e1 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.14.18" +version = "0.14.19" edition = "2024" [dependencies] diff --git a/client/src/sync_probe/analyze/media_extract.rs b/client/src/sync_probe/analyze/media_extract.rs index b10d3bf..0233a7d 100644 --- a/client/src/sync_probe/analyze/media_extract.rs +++ b/client/src/sync_probe/analyze/media_extract.rs @@ -142,27 +142,8 @@ pub(super) fn run_command(command: &mut Command, description: &str) -> Result u8 { - let crop_start = VIDEO_ANALYSIS_SIDE_PX / 4; - let crop_end = VIDEO_ANALYSIS_SIDE_PX - crop_start; - let mut center_total = 0u64; - let mut center_pixels = 0u64; - let mut border_total = 0u64; - let mut border_pixels = 0u64; - for y in 0..VIDEO_ANALYSIS_SIDE_PX { - for x in 0..VIDEO_ANALYSIS_SIDE_PX { - let value = u64::from(frame[y * VIDEO_ANALYSIS_SIDE_PX + x]); - if (crop_start..crop_end).contains(&x) && (crop_start..crop_end).contains(&y) { - center_total += value; - center_pixels += 1; - } else { - border_total += value; - border_pixels += 1; - } - } - } - let center_mean = center_total / center_pixels.max(1); - let border_mean = border_total / border_pixels.max(1); - center_mean.abs_diff(border_mean).min(u64::from(u8::MAX)) as u8 + let mean = frame.iter().map(|value| u64::from(*value)).sum::() / frame.len().max(1) as u64; + mean.min(u64::from(u8::MAX)) as u8 } #[cfg(test)] @@ -221,7 +202,7 @@ mod tests { &[1, 0], |capture_path| { let parsed = extract_video_brightness(capture_path, 1).expect("video brightness"); - assert_eq!(parsed, vec![15]); + assert_eq!(parsed, vec![16]); }, ); } @@ -245,7 +226,7 @@ mod tests { } #[test] - fn extract_video_brightness_uses_center_weighted_thumbnail_average() { + fn extract_video_brightness_uses_full_frame_thumbnail_average() { let brightness = vec![20u8, 45, 20]; with_fake_media_tools( &frame_json(&[0.0, 0.1, 0.2]), @@ -253,7 +234,7 @@ mod tests { &[1, 0], |capture_path| { let parsed = extract_video_brightness(capture_path, 3).expect("video brightness"); - assert_eq!(parsed, vec![0, 25, 0]); + assert_eq!(parsed, vec![20, 26, 20]); }, ); } diff --git a/client/src/sync_probe/analyze/onset_detection.rs b/client/src/sync_probe/analyze/onset_detection.rs index 0ba0894..ede6d38 100644 --- a/client/src/sync_probe/analyze/onset_detection.rs +++ b/client/src/sync_probe/analyze/onset_detection.rs @@ -11,6 +11,8 @@ pub(super) const DEFAULT_AUDIO_SAMPLE_RATE_HZ: u32 = 48_000; // luma swing into a narrower band, so keep this guard modest and let the // segment logic reject genuinely flat/noisy traces. const MIN_VIDEO_CONTRAST: u8 = 4; +const MAX_VIDEO_ACTIVE_FRAME_FRACTION: f64 = 0.35; +const MAX_VIDEO_FLICKER_SEGMENT_FRAME_MULTIPLIER: f64 = 1.5; #[derive(Clone, Copy, Debug, PartialEq)] pub(crate) struct PulseSegment { @@ -43,6 +45,11 @@ pub(crate) fn detect_video_segments( } let threshold = ((u16::from(min) + u16::from(max)) / 2) as u8; let frame_step_s = median_frame_step_seconds(×tamps_s[..frame_count]).max(1.0 / 120.0); + let active_frames = slice + .iter() + .copied() + .filter(|level| *level >= threshold) + .count(); let mut segments = Vec::new(); let mut previous_active = false; let mut segment_start = 0.0_f64; @@ -83,6 +90,15 @@ pub(crate) fn detect_video_segments( }); } + let active_fraction = active_frames as f64 / frame_count as f64; + let median_segment_duration_s = median(segments.iter().map(|segment| segment.duration_s).collect()); + if active_fraction > MAX_VIDEO_ACTIVE_FRAME_FRACTION + && median_segment_duration_s + <= frame_step_s * MAX_VIDEO_FLICKER_SEGMENT_FRAME_MULTIPLIER + { + bail!("video flash trace looks like frame-to-frame flicker, not sync pulses"); + } + Ok(segments) } diff --git a/client/src/sync_probe/analyze/onset_detection/tests.rs b/client/src/sync_probe/analyze/onset_detection/tests.rs index 50d1e79..cf9fa90 100644 --- a/client/src/sync_probe/analyze/onset_detection/tests.rs +++ b/client/src/sync_probe/analyze/onset_detection/tests.rs @@ -174,6 +174,21 @@ fn detect_video_onsets_rejects_empty_low_contrast_and_missing_edges() { assert!(detect_video_onsets(&[0.0, 0.1, 0.2], &[255, 255, 255]).is_err()); } +#[test] +fn detect_video_onsets_rejects_frame_to_frame_flicker() { + let timestamps = (0..120).map(|index| index as f64 / 30.0).collect::>(); + let brightness = (0..120) + .map(|index| if index % 2 == 0 { 0 } else { 6 }) + .collect::>(); + + let err = detect_video_onsets(×tamps, &brightness).expect_err("flicker should be rejected"); + assert!( + err.to_string() + .contains("frame-to-frame flicker"), + "unexpected error: {err}" + ); +} + #[test] fn detect_audio_onsets_rejects_empty_invalid_and_too_quiet_inputs() { assert!(detect_audio_onsets(&[], 48_000, 5).is_err()); diff --git a/client/src/sync_probe/capture.rs b/client/src/sync_probe/capture.rs index 3a0810c..0177bd2 100644 --- a/client/src/sync_probe/capture.rs +++ b/client/src/sync_probe/capture.rs @@ -66,28 +66,17 @@ const AUDIO_PULSE_AMPLITUDE: f64 = 24_000.0; #[cfg(any(not(coverage), test))] fn build_dark_probe_frame(width: usize, height: usize) -> Vec { - vec![16u8; width.saturating_mul(height).saturating_mul(3)] + vec![16u8; width.saturating_mul(height)] } #[cfg(any(not(coverage), test))] fn build_regular_probe_frame(width: usize, height: usize) -> Vec { - let mut frame = build_dark_probe_frame(width, height); - let x0 = width / 4; - let x1 = width.saturating_sub(x0); - let y0 = height / 4; - let y1 = height.saturating_sub(y0); - fill_rect(&mut frame, width, x0, y0, x1, y1, 255); - frame + vec![240u8; width.saturating_mul(height)] } #[cfg(any(not(coverage), test))] fn build_marker_probe_frame(width: usize, height: usize) -> Vec { - let mut frame = build_dark_probe_frame(width, height); - let x0 = width / 5; - let x1 = width.saturating_sub(x0); - let y0 = height / 5; - let y1 = height.saturating_sub(y0); - fill_rect(&mut frame, width, x0, y0, x1, y1, 255); + let mut frame = build_regular_probe_frame(width, height); let cross_half_w = (width / 48).max(6); let cross_half_h = (height / 48).max(6); @@ -97,19 +86,19 @@ fn build_marker_probe_frame(width: usize, height: usize) -> Vec { &mut frame, width, cx.saturating_sub(cross_half_w), - y0, + 0, (cx + cross_half_w).min(width), - y1, - 255, + height, + 16, ); fill_rect( &mut frame, width, - x0, + 0, cy.saturating_sub(cross_half_h), - x1, + width, (cy + cross_half_h).min(height), - 255, + 16, ); frame } @@ -124,15 +113,13 @@ fn fill_rect( y1: usize, value: u8, ) { - let height = frame.len() / width.saturating_mul(3); + let height = frame.len() / width.max(1); let x1 = x1.min(width); let y1 = y1.min(height); for y in y0.min(height)..y1 { for x in x0.min(width)..x1 { - let offset = (y * width + x) * 3; + let offset = y * width + x; frame[offset] = value; - frame[offset + 1] = value; - frame[offset + 2] = value; } } } diff --git a/client/src/sync_probe/capture/runtime.rs b/client/src/sync_probe/capture/runtime.rs index 8dd4f65..f2adec8 100644 --- a/client/src/sync_probe/capture/runtime.rs +++ b/client/src/sync_probe/capture/runtime.rs @@ -108,7 +108,7 @@ impl Drop for SyncProbeCapture { fn build_pipeline(camera: CameraConfig, _schedule: &PulseSchedule) -> Result { let video_caps = format!( - "video/x-raw,format=RGB,width={},height={},framerate={}/1", + "video/x-raw,format=GRAY8,width={},height={},framerate={}/1", camera.width, camera.height, camera.fps.max(1) diff --git a/client/src/sync_probe/capture/tests.rs b/client/src/sync_probe/capture/tests.rs index 56bd63d..1fda995 100644 --- a/client/src/sync_probe/capture/tests.rs +++ b/client/src/sync_probe/capture/tests.rs @@ -374,3 +374,45 @@ async fn runtime_probe_audio_and_video_pts_advance_near_real_time() { audio_span ); } + +#[cfg(not(coverage))] +#[tokio::test] +async fn runtime_probe_video_packets_change_across_a_pulse_boundary() { + let capture = SyncProbeCapture::new( + stub_camera(), + PulseSchedule::new( + Duration::from_secs(1), + Duration::from_secs(1), + Duration::from_millis(120), + 4, + ), + Duration::from_secs(2), + ) + .expect("runtime capture"); + + let video_queue = capture.video_queue(); + let mut dark_packet = None; + let mut pulse_packet = None; + + loop { + let next = video_queue.pop_fresh().await; + let Some(packet) = next.packet else { + break; + }; + if dark_packet.is_none() && (200_000..800_000).contains(&packet.pts) { + dark_packet = Some(packet.clone()); + } + if pulse_packet.is_none() && (1_000_000..1_120_000).contains(&packet.pts) { + pulse_packet = Some(packet.clone()); + } + if dark_packet.is_some() && pulse_packet.is_some() { + break; + } + } + + let dark_packet = dark_packet.expect("dark packet"); + let pulse_packet = pulse_packet.expect("pulse packet"); + assert_ne!(dark_packet.data, pulse_packet.data); + assert!(!dark_packet.data.is_empty()); + assert!(!pulse_packet.data.is_empty()); +} diff --git a/common/Cargo.toml b/common/Cargo.toml index 89e76e9..1305185 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.14.18" +version = "0.14.19" edition = "2024" build = "build.rs" diff --git a/scripts/manual/run_upstream_av_sync.sh b/scripts/manual/run_upstream_av_sync.sh index 4c11bbc..9c4acab 100755 --- a/scripts/manual/run_upstream_av_sync.sh +++ b/scripts/manual/run_upstream_av_sync.sh @@ -122,6 +122,42 @@ resolve_pulse_source() { ' } +gst_video_source_caps() { + case "${video_format}" in + ""|mjpeg|MJPG) + printf 'image/jpeg,width=%s,height=%s,framerate=%s/1' \ + "${resolved_video_size%x*}" \ + "${resolved_video_size#*x}" \ + "${video_fps}" + ;; + yuyv422|YUYV|yuyv) + printf 'video/x-raw,format=YUY2,width=%s,height=%s,framerate=%s/1' \ + "${resolved_video_size%x*}" \ + "${resolved_video_size#*x}" \ + "${video_fps}" + ;; + *) + printf 'unsupported gst video_format=%s\n' "${video_format}" >&2 + exit 64 + ;; + esac +} + +gst_video_decode_chain() { + case "${video_format}" in + ""|mjpeg|MJPG) + printf 'jpegdec ! ' + ;; + yuyv422|YUYV|yuyv) + printf '' + ;; + *) + printf 'unsupported gst video_format=%s\n' "${video_format}" >&2 + exit 64 + ;; + esac +} + resolve_video_size() { local requested=$1 if [[ "${requested}" != "auto" ]]; then @@ -247,6 +283,8 @@ video_args=(-f video4linux2 -framerate "${video_fps}" -video_size "${resolved_vi if [[ -n "${video_format}" ]]; then video_args+=(-input_format "${video_format}") fi +gst_source_caps="$(gst_video_source_caps)" +gst_decode_chain="$(gst_video_decode_chain)" quiesce_for_alsa=0 case "${remote_audio_quiesce_user_audio}" in @@ -333,11 +371,15 @@ elif [[ "${capture_mode}" == "pulse" ]]; then gst) case "${remote_pulse_video_mode}" in copy) + if [[ "${video_format}" != "mjpeg" && "${video_format}" != "MJPG" && -n "${video_format}" ]]; then + printf 'gst copy mode only supports mjpeg input, got %s\n' "${video_format}" >&2 + exit 64 + fi timeout --signal=INT "$((capture_seconds + 3))" \ gst-launch-1.0 -q -e \ matroskamux name=mux ! filesink location="${remote_capture}" \ v4l2src device=/dev/video0 do-timestamp=true ! \ - image/jpeg,width="${resolved_video_size%x*}",height="${resolved_video_size#*x}",framerate="${video_fps}"/1 ! \ + ${gst_source_caps} ! \ queue ! mux. \ pulsesrc device="${pulse_source}" do-timestamp=true ! \ audio/x-raw,rate=48000,channels=2 ! \ @@ -348,9 +390,11 @@ elif [[ "${capture_mode}" == "pulse" ]]; then gst-launch-1.0 -q -e \ matroskamux name=mux ! filesink location="${remote_capture}" \ v4l2src device=/dev/video0 do-timestamp=true ! \ - image/jpeg,width="${resolved_video_size%x*}",height="${resolved_video_size#*x}",framerate="${video_fps}"/1 ! \ - jpegdec ! videoconvert ! videorate ! video/x-raw,framerate="${video_fps}"/1 ! \ - jpegenc quality=90 ! image/jpeg,framerate="${video_fps}"/1 ! \ + ${gst_source_caps} ! \ + ${gst_decode_chain} \ + videoconvert ! videorate ! video/x-raw,framerate="${video_fps}"/1 ! \ + x264enc tune=zerolatency speed-preset=ultrafast key-int-max=1 bitrate=5000 ! \ + h264parse ! \ queue ! mux. \ pulsesrc device="${pulse_source}" do-timestamp=true ! \ audio/x-raw,rate=48000,channels=2 ! \ diff --git a/server/Cargo.toml b/server/Cargo.toml index 8f5e610..09b35d6 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.14.18" +version = "0.14.19" edition = "2024" autobins = false