diff --git a/Cargo.lock b/Cargo.lock index 4082216..a2fcff3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.19.20" +version = "0.19.21" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.19.20" +version = "0.19.21" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.19.20" +version = "0.19.21" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index 2ebcf84..67e5994 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.19.20" +version = "0.19.21" edition = "2024" [dependencies] diff --git a/client/src/bin/lesavka-sync-analyze.rs b/client/src/bin/lesavka-sync-analyze.rs index 9a70763..1e3eb00 100644 --- a/client/src/bin/lesavka-sync-analyze.rs +++ b/client/src/bin/lesavka-sync-analyze.rs @@ -2,6 +2,8 @@ use anyhow::{Context, Result, bail}; #[cfg(not(coverage))] use serde::Serialize; +#[cfg(not(coverage))] +use std::collections::BTreeSet; #[cfg(any(not(coverage), test))] use std::path::PathBuf; @@ -16,10 +18,25 @@ use lesavka_client::sync_probe::analyze::{ struct SyncAnalyzeOutput<'a> { #[serde(flatten)] report: &'a SyncAnalysisReport, + #[serde(skip_serializing_if = "Option::is_none")] + signature_coverage: Option, calibration: SyncCalibrationRecommendation, verdict: SyncAnalysisVerdict, } +#[cfg(not(coverage))] +#[derive(Serialize)] +struct SignatureCoverage { + expected_event_count: usize, + expected_codes: Vec, + paired_event_count: usize, + paired_server_event_ids: Vec, + paired_codes: Vec, + missing_event_ids: Vec, + missing_codes: Vec, + unknown_pair_identity_count: usize, +} + #[cfg(not(coverage))] fn main() -> Result<()> { let args = parse_args(std::env::args().skip(1))?; @@ -27,9 +44,17 @@ fn main() -> Result<()> { .with_context(|| format!("analyzing sync capture {}", args.capture_path.display()))?; let calibration = report.calibration_recommendation(); let verdict = report.verdict(); - let human_report = format_human_report(&args.capture_path, &report, &calibration, &verdict); + let signature_coverage = signature_coverage(&args.options.event_width_codes, &report); + let human_report = format_human_report( + &args.capture_path, + &report, + signature_coverage.as_ref(), + &calibration, + &verdict, + ); let output = SyncAnalyzeOutput { report: &report, + signature_coverage, calibration, verdict, }; @@ -226,6 +251,7 @@ fn parse_analysis_seconds(raw: &str, label: &str) -> Result { fn format_human_report( capture_path: &std::path::Path, report: &SyncAnalysisReport, + signature_coverage: Option<&SignatureCoverage>, calibration: &SyncCalibrationRecommendation, verdict: &SyncAnalysisVerdict, ) -> String { @@ -246,6 +272,7 @@ fn format_human_report( } else { "reported only; ignored for verdict/calibration because it disagrees with paired pulses" }; + let signature_coverage = format_signature_coverage(signature_coverage); format!( "\ A/V sync report for {capture} @@ -264,6 +291,7 @@ A/V sync report for {capture} - paired window first video/audio: {paired_video:.3} s / {paired_audio:.3} s - unpaired video onsets: {unpaired_video} - unpaired audio onsets: {unpaired_audio} +{signature_coverage}\ - first skew: {first_skew:+.1} ms (audio after video is positive) - last skew: {last_skew:+.1} ms - mean skew: {mean_skew:+.1} ms @@ -297,6 +325,7 @@ A/V sync report for {capture} paired_audio = first_paired_audio, unpaired_video = unpaired_video, unpaired_audio = unpaired_audio, + signature_coverage = signature_coverage, first_skew = report.first_skew_ms, last_skew = report.last_skew_ms, mean_skew = report.mean_skew_ms, @@ -310,6 +339,69 @@ A/V sync report for {capture} ) } +#[cfg(not(coverage))] +fn signature_coverage( + expected_codes: &[u32], + report: &SyncAnalysisReport, +) -> Option { + if expected_codes.is_empty() { + return None; + } + let paired_server_event_ids = report + .paired_events + .iter() + .filter_map(|event| event.server_event_id) + .collect::>() + .into_iter() + .collect::>(); + let paired_codes = paired_server_event_ids + .iter() + .filter_map(|index| expected_codes.get(*index).copied()) + .collect::>(); + let missing_event_ids = expected_codes + .iter() + .enumerate() + .filter_map(|(index, _)| (!paired_server_event_ids.contains(&index)).then_some(index)) + .collect::>(); + let missing_codes = missing_event_ids + .iter() + .filter_map(|index| expected_codes.get(*index).copied()) + .collect::>(); + let unknown_pair_identity_count = report + .paired_events + .iter() + .filter(|event| event.server_event_id.is_none() || event.event_code.is_none()) + .count(); + Some(SignatureCoverage { + expected_event_count: expected_codes.len(), + expected_codes: expected_codes.to_vec(), + paired_event_count: paired_server_event_ids.len(), + paired_server_event_ids, + paired_codes, + missing_event_ids, + missing_codes, + unknown_pair_identity_count, + }) +} + +#[cfg(not(coverage))] +fn format_signature_coverage(coverage: Option<&SignatureCoverage>) -> String { + let Some(coverage) = coverage else { + return String::new(); + }; + let missing_ids = format_usize_list(&coverage.missing_event_ids); + let missing_codes = format_u32_list(&coverage.missing_codes); + format!( + "- expected coded signatures: {}\n- paired coded signatures: {}/{}\n- missing paired signature ids: {}\n- missing paired signature codes: {}\n- paired signatures without identity: {}\n", + coverage.expected_event_count, + coverage.paired_event_count, + coverage.expected_event_count, + missing_ids, + missing_codes, + coverage.unknown_pair_identity_count + ) +} + #[cfg(not(coverage))] fn unpaired_video_onsets(report: &SyncAnalysisReport) -> Vec { unpaired_onsets( @@ -365,6 +457,30 @@ fn format_onset_list(onsets: &[f64]) -> String { formatted.join(", ") } +#[cfg(not(coverage))] +fn format_usize_list(values: &[usize]) -> String { + if values.is_empty() { + return "none".to_string(); + } + values + .iter() + .map(|value| value.to_string()) + .collect::>() + .join(", ") +} + +#[cfg(not(coverage))] +fn format_u32_list(values: &[u32]) -> String { + if values.is_empty() { + return "none".to_string(); + } + values + .iter() + .map(|value| value.to_string()) + .collect::>() + .join(", ") +} + #[cfg(not(coverage))] fn write_report_dir( report_dir: &std::path::Path, @@ -386,16 +502,34 @@ fn write_report_dir( #[cfg(not(coverage))] fn write_events_csv(path: &std::path::Path, report: &SyncAnalysisReport) -> Result<()> { - let mut csv = String::from("event_id,video_time_s,audio_time_s,skew_ms,confidence\n"); + let mut csv = String::from( + "event_id,server_event_id,event_code,video_time_s,audio_time_s,skew_ms,confidence\n", + ); for event in &report.paired_events { csv.push_str(&format!( - "{},{:.9},{:.9},{:.6},{:.6}\n", - event.event_id, event.video_time_s, event.audio_time_s, event.skew_ms, event.confidence + "{},{},{},{:.9},{:.9},{:.6},{:.6}\n", + event.event_id, + optional_usize(event.server_event_id), + optional_u32(event.event_code), + event.video_time_s, + event.audio_time_s, + event.skew_ms, + event.confidence )); } std::fs::write(path, csv).with_context(|| format!("writing {}", path.display())) } +#[cfg(not(coverage))] +fn optional_usize(value: Option) -> String { + value.map(|value| value.to_string()).unwrap_or_default() +} + +#[cfg(not(coverage))] +fn optional_u32(value: Option) -> String { + value.map(|value| value.to_string()).unwrap_or_default() +} + #[cfg(coverage)] fn main() {} @@ -510,6 +644,7 @@ mod tests { let text = super::format_human_report( std::path::Path::new("/tmp/capture.webm"), &report, + None, &calibration, &verdict, ); @@ -520,4 +655,77 @@ mod tests { assert!(text.contains("- unpaired video onsets: 9.461s, 13.367s")); assert!(text.contains("- unpaired audio onsets: 9.135s, 13.135s")); } + + #[test] + fn signature_coverage_reports_missing_and_unknown_coded_pairs() { + let report = SyncAnalysisReport { + video_event_count: 3, + audio_event_count: 3, + paired_event_count: 2, + coded_events: true, + activity_start_delta_ms: 0.0, + raw_first_video_activity_s: 1.0, + raw_first_audio_activity_s: 1.0, + first_skew_ms: 0.0, + last_skew_ms: 0.0, + mean_skew_ms: 0.0, + median_skew_ms: 0.0, + max_abs_skew_ms: 0.0, + drift_ms: 0.0, + skews_ms: vec![0.0, 0.0], + video_onsets_s: vec![1.0, 2.0, 3.0], + audio_onsets_s: vec![1.0, 2.0, 3.0], + paired_events: vec![ + SyncEventPair { + event_id: 0, + server_event_id: Some(0), + event_code: Some(1), + video_time_s: 1.0, + audio_time_s: 1.0, + skew_ms: 0.0, + confidence: 1.0, + }, + SyncEventPair { + event_id: 1, + server_event_id: None, + event_code: None, + video_time_s: 2.0, + audio_time_s: 2.0, + skew_ms: 0.0, + confidence: 0.4, + }, + ], + }; + let calibration = SyncCalibrationRecommendation { + ready: false, + recommended_audio_offset_adjust_us: 0, + recommended_video_offset_adjust_us: 0, + note: "need more pairs".to_string(), + }; + let verdict = SyncAnalysisVerdict { + status: "insufficient_data".to_string(), + passed: false, + p95_abs_skew_ms: 0.0, + max_abs_skew_ms: 0.0, + preferred_p95_abs_skew_ms: 35.0, + acceptable_p95_abs_skew_ms: 80.0, + gross_failure_p95_abs_skew_ms: 250.0, + catastrophic_max_abs_skew_ms: 1_000.0, + reason: "need more pairs".to_string(), + }; + let coverage = super::signature_coverage(&[1, 2, 3], &report); + let text = super::format_human_report( + std::path::Path::new("/tmp/capture.webm"), + &report, + coverage.as_ref(), + &calibration, + &verdict, + ); + + assert!(text.contains("- expected coded signatures: 3")); + assert!(text.contains("- paired coded signatures: 1/3")); + assert!(text.contains("- missing paired signature ids: 1, 2")); + assert!(text.contains("- missing paired signature codes: 2, 3")); + assert!(text.contains("- paired signatures without identity: 1")); + } } diff --git a/client/src/sync_probe/analyze.rs b/client/src/sync_probe/analyze.rs index 1310fd2..d08f4e5 100644 --- a/client/src/sync_probe/analyze.rs +++ b/client/src/sync_probe/analyze.rs @@ -38,24 +38,14 @@ pub fn analyze_capture( } else { let colors = extract_video_colors(capture_path)?; let timestamps = reconcile_video_timestamps(raw_timestamps.clone(), colors.len())?; - match detect_color_coded_video_segments( + let segments = detect_color_coded_video_segments( ×tamps, &colors, &options.event_width_codes, options.pulse_width_s, - ) { - Ok(segments) => (segments, true), - Err(color_error) => { - let brightness = extract_video_brightness(capture_path)?; - let timestamps = reconcile_video_timestamps(raw_timestamps, brightness.len())?; - ( - detect_video_segments(×tamps, &brightness).with_context(|| { - format!("color-coded video pulse detection failed: {color_error}") - })?, - false, - ) - } - } + ) + .context("color-coded video pulse detection failed")?; + (segments, true) }; let audio_samples = extract_audio_samples(capture_path)?; diff --git a/client/src/sync_probe/analyze/media_extract.rs b/client/src/sync_probe/analyze/media_extract.rs index 958904b..761bcf8 100644 --- a/client/src/sync_probe/analyze/media_extract.rs +++ b/client/src/sync_probe/analyze/media_extract.rs @@ -441,8 +441,24 @@ fn palette_match_score(r: u8, g: u8, b: u8) -> f64 { return 0.0; } - const PALETTE: [(u8, u8, u8); 4] = - [(255, 45, 45), (0, 230, 118), (41, 121, 255), (255, 179, 0)]; + const PALETTE: [(u8, u8, u8); 16] = [ + (255, 45, 45), + (0, 230, 118), + (41, 121, 255), + (255, 179, 0), + (216, 27, 96), + (0, 188, 212), + (205, 220, 57), + (126, 87, 194), + (255, 112, 67), + (38, 166, 154), + (255, 64, 129), + (92, 107, 192), + (255, 235, 59), + (105, 240, 174), + (171, 71, 188), + (3, 169, 244), + ]; let best_distance = PALETTE .into_iter() .map(|(pr, pg, pb)| { diff --git a/client/src/sync_probe/analyze/onset_detection/correlation.rs b/client/src/sync_probe/analyze/onset_detection/correlation.rs index ff884c3..3a6b0b9 100644 --- a/client/src/sync_probe/analyze/onset_detection/correlation.rs +++ b/client/src/sync_probe/analyze/onset_detection/correlation.rs @@ -257,13 +257,10 @@ pub(crate) fn correlate_coded_segments( let filtered_video_segments = filter_segments_to_window(&video_segments, common_window); let filtered_audio_segments = filter_segments_to_window(&audio_segments, common_window); if filtered_video_segments.is_empty() || filtered_audio_segments.is_empty() { - return correlate_segments( - &video_segments, - &audio_segments, - pulse_period_s, - pulse_width_s, - 1, - max_pair_gap_s, + bail!( + "coded pulse common window removed one stream entirely; refusing cadence-only fallback for coded proof (video={} audio={} raw activity delta {activity_start_delta_ms:+.1} ms)", + video_segments.len(), + audio_segments.len(), ); } @@ -389,16 +386,6 @@ pub(crate) fn correlate_coded_segments( )); } - if activity_start_delta_ms.abs() >= 1_000.0 { - return correlate_segments( - &video_segments, - &audio_segments, - pulse_period_s, - pulse_width_s, - 1, - max_pair_gap_s, - ); - } bail!( "need at least {MIN_CODED_PAIRS} matching coded pulse pairs; saw {}; raw activity delta was {activity_start_delta_ms:+.1} ms (video={raw_first_video_activity_s:.3}s audio={raw_first_audio_activity_s:.3}s)", pairs.len() diff --git a/client/src/sync_probe/analyze/onset_detection/tests.rs b/client/src/sync_probe/analyze/onset_detection/tests.rs index 6b95fd4..f962541 100644 --- a/client/src/sync_probe/analyze/onset_detection/tests.rs +++ b/client/src/sync_probe/analyze/onset_detection/tests.rs @@ -625,6 +625,38 @@ fn correlate_coded_segments_rejects_nearby_wrong_width_codes() { assert!(correlate_coded_segments(&video, &audio, 1.0, 0.12, &codes, 0.2).is_err()); } +#[test] +fn correlate_coded_segments_refuses_cadence_fallback_when_windows_do_not_overlap() { + fn segment(start_s: f64, code: u32) -> PulseSegment { + let duration_s = 0.12 * f64::from(code); + PulseSegment { + start_s, + end_s: start_s + duration_s, + duration_s, + } + } + + let codes = [1, 2, 3, 4, 5, 6]; + let video = codes + .iter() + .enumerate() + .map(|(tick, code)| segment(100.0 + tick as f64, *code)) + .collect::>(); + let audio = codes + .iter() + .enumerate() + .map(|(tick, code)| segment(tick as f64, *code)) + .collect::>(); + + let error = correlate_coded_segments(&video, &audio, 1.0, 0.12, &codes, 0.5) + .expect_err("coded proof should not fall back to cadence-only pairing"); + + assert!( + error.to_string().contains("refusing cadence-only fallback"), + "unexpected error: {error}" + ); +} + fn assert_sync_report_shape(report: &SyncAnalysisReport, paired_events: usize) { assert_eq!(report.video_event_count, paired_events); assert_eq!(report.audio_event_count, paired_events); diff --git a/common/Cargo.toml b/common/Cargo.toml index 9d83ce4..3034a46 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.19.20" +version = "0.19.21" edition = "2024" build = "build.rs" diff --git a/scripts/manual/run_server_to_rc_mode_matrix.sh b/scripts/manual/run_server_to_rc_mode_matrix.sh index b79d566..5055beb 100755 --- a/scripts/manual/run_server_to_rc_mode_matrix.sh +++ b/scripts/manual/run_server_to_rc_mode_matrix.sh @@ -89,6 +89,7 @@ REMOTE_PULSE_VIDEO_MODE=${REMOTE_PULSE_VIDEO_MODE:-cfr} REMOTE_CAPTURE_STACK=${REMOTE_CAPTURE_STACK:-pulse} REMOTE_AUDIO_SOURCE=${REMOTE_AUDIO_SOURCE:-auto} REMOTE_CAPTURE_ALLOW_ALSA_FALLBACK=${REMOTE_CAPTURE_ALLOW_ALSA_FALLBACK:-0} +REMOTE_CAPTURE_READY_SETTLE_SECONDS=${REMOTE_CAPTURE_READY_SETTLE_SECONDS:-1} LESAVKA_OUTPUT_DELAY_PROBE_AUDIO_DELAY_US=${LESAVKA_OUTPUT_DELAY_PROBE_AUDIO_DELAY_US:-0} STAMP="$(date +%Y%m%d-%H%M%S)" @@ -175,6 +176,7 @@ run_mode_probe() { REMOTE_AUDIO_SOURCE="${REMOTE_AUDIO_SOURCE}" \ REMOTE_CAPTURE_ALLOW_ALSA_FALLBACK="${REMOTE_CAPTURE_ALLOW_ALSA_FALLBACK}" \ REMOTE_CAPTURE_PREROLL_DISCARD_SECONDS="${LESAVKA_SERVER_RC_PREROLL_DISCARD_SECONDS}" \ + REMOTE_CAPTURE_READY_SETTLE_SECONDS="${REMOTE_CAPTURE_READY_SETTLE_SECONDS}" \ PROBE_PREBUILD=0 \ VIDEO_SIZE="${width}x${height}" \ VIDEO_FPS="${fps}" \ @@ -915,6 +917,7 @@ audio = smoothness.get("audio") or {} audio_cadence = audio.get("packet_cadence") or {} audio_rms = audio.get("rms_continuity") or {} verdict = report.get("verdict") or {} +signature_coverage = report.get("signature_coverage") or {} sync_pass = verdict.get("passed") is True freshness_status = freshness.get("status", "unknown") @@ -929,6 +932,9 @@ missing = as_int(video.get("estimated_missing_frames"), 0) undecodable = as_int(video.get("undecodable_frames"), 0) duplicates = as_int(video.get("duplicate_frames"), 0) low_rms = as_int(audio_rms.get("low_rms_window_count"), 0) +signature_expected = as_int(signature_coverage.get("expected_event_count"), 0) +signature_paired = as_int(signature_coverage.get("paired_event_count"), 0) +signature_unknown = as_int(signature_coverage.get("unknown_pair_identity_count"), 0) pair_confidences = [] for event in report.get("paired_events") or []: if not isinstance(event, dict): @@ -966,6 +972,13 @@ if as_bool(require_sync_raw) and not sync_pass: reasons.append(f"sync did not pass: {verdict.get('status', 'unknown')}") if as_bool(require_freshness_raw) and not freshness_pass: reasons.append(f"freshness did not pass: {freshness_status}") +if signature_expected > 0: + if signature_paired < signature_expected: + reasons.append(f"paired coded signatures {signature_paired} < expected {signature_expected}") + if signature_unknown > 0: + reasons.append(f"paired signatures without coded identity {signature_unknown} > 0") +elif report.get("coded_events") is True: + reasons.append("coded signature coverage unavailable") if video_hiccups > as_int(max_video_hiccups_raw): reasons.append(f"video hiccups {video_hiccups} > {max_video_hiccups_raw}") if audio_hiccups > as_int(max_audio_hiccups_raw): @@ -1012,6 +1025,11 @@ artifact = { "video_event_count": as_int(report.get("video_event_count"), 0), "audio_event_count": as_int(report.get("audio_event_count"), 0), "paired_event_count": as_int(report.get("paired_event_count"), 0), + "signature_expected_event_count": signature_expected, + "signature_paired_event_count": signature_paired, + "signature_missing_event_ids": signature_coverage.get("missing_event_ids") or [], + "signature_missing_codes": signature_coverage.get("missing_codes") or [], + "signature_unknown_pair_identity_count": signature_unknown, "paired_confidence_min": min(pair_confidences) if pair_confidences else 0.0, "paired_confidence_median": median(pair_confidences), }, @@ -1277,6 +1295,9 @@ for result in results: " sync evidence: " f"video_onsets={sync.get('video_event_count', 0)} audio_onsets={sync.get('audio_event_count', 0)} " f"pairs={sync.get('paired_event_count', 0)} " + f"coded_pairs={sync.get('signature_paired_event_count', 0)}/{sync.get('signature_expected_event_count', 0)} " + f"missing_codes={sync.get('signature_missing_codes') or []} " + f"unknown_identity={sync.get('signature_unknown_pair_identity_count', 0)} " f"pair_conf_median={sync.get('paired_confidence_median', 0.0):.3f} " f"raw_pair_disagreement={sync.get('activity_pair_disagreement_ms', 0.0):+.1f}ms" ) diff --git a/scripts/manual/run_upstream_av_sync.sh b/scripts/manual/run_upstream_av_sync.sh index 8165cb6..3727bbc 100755 --- a/scripts/manual/run_upstream_av_sync.sh +++ b/scripts/manual/run_upstream_av_sync.sh @@ -44,9 +44,10 @@ REMOTE_AUDIO_SOURCE=${REMOTE_AUDIO_SOURCE:-auto} REMOTE_AUDIO_QUIESCE_USER_AUDIO=${REMOTE_AUDIO_QUIESCE_USER_AUDIO:-auto} REMOTE_CAPTURE_ALLOW_ALSA_FALLBACK=${REMOTE_CAPTURE_ALLOW_ALSA_FALLBACK:-0} REMOTE_CAPTURE_PREROLL_DISCARD_SECONDS=${REMOTE_CAPTURE_PREROLL_DISCARD_SECONDS:-0} +REMOTE_CAPTURE_READY_SETTLE_SECONDS=${REMOTE_CAPTURE_READY_SETTLE_SECONDS:-1} ANALYSIS_NORMALIZE=${ANALYSIS_NORMALIZE:-0} ANALYSIS_SCALE_WIDTH=${ANALYSIS_SCALE_WIDTH:-1280} -ANALYSIS_TIMELINE_WINDOW=${ANALYSIS_TIMELINE_WINDOW:-1} +ANALYSIS_TIMELINE_WINDOW=${ANALYSIS_TIMELINE_WINDOW:-0} ANALYSIS_TIMELINE_WINDOW_PADDING_SECONDS=${ANALYSIS_TIMELINE_WINDOW_PADDING_SECONDS:-1.0} SSH_OPTS=${SSH_OPTS:-"-o BatchMode=yes -o ConnectTimeout=30"} PROBE_PREBUILD=${PROBE_PREBUILD:-1} @@ -1728,12 +1729,6 @@ freshness_min_event_age_ms = min(event_age_min_values) if event_age_min_values e if not event_age_p95_values: freshness_status = "unknown" freshness_reason = "clock-aligned server feed and Tethys capture timestamps were not available" -elif not sync_passed: - freshness_status = "unknown" - freshness_reason = ( - "sync did not pass, so freshness from paired signatures is not trustworthy: " - f"{sync_verdict.get('status', 'unknown')} - {sync_verdict.get('reason', '')}" - ) elif not clock_alignment_available or clock_uncertainty_ms > max_clock_uncertainty_ms: freshness_status = "unknown" freshness_reason = ( @@ -1754,6 +1749,12 @@ elif freshness_worst_event_p95_ms < -clock_uncertainty_ms: f"worst RC event-age p95 {freshness_worst_event_p95_ms:.1f} ms, uncertainty " f"{clock_uncertainty_ms:.1f} ms" ) +elif not sync_passed: + freshness_status = "unknown" + freshness_reason = ( + "sync did not pass, so freshness from paired signatures is not trustworthy: " + f"{sync_verdict.get('status', 'unknown')} - {sync_verdict.get('reason', '')}" + ) elif freshness_worst_event_with_uncertainty_ms <= max_freshness_age_ms and ( freshness_worst_drift_ms is None or freshness_worst_drift_ms <= max_freshness_drift_ms ): @@ -2084,6 +2085,7 @@ ssh ${SSH_OPTS} "${TETHYS_HOST}" bash -s -- \ "${REMOTE_AUDIO_QUIESCE_USER_AUDIO}" \ "${REMOTE_CAPTURE_ALLOW_ALSA_FALLBACK}" \ "${REMOTE_CAPTURE_PREROLL_DISCARD_SECONDS}" \ + "${REMOTE_CAPTURE_READY_SETTLE_SECONDS}" \ > >(tee "${LOCAL_CAPTURE_LOG}") \ 2> >(tee -a "${LOCAL_CAPTURE_LOG}" >&2) <<'REMOTE_CAPTURE_SCRIPT' & set -euo pipefail @@ -2100,6 +2102,7 @@ remote_audio_source=${10} remote_audio_quiesce_user_audio=${11} remote_capture_allow_alsa_fallback=${12} remote_capture_preroll_discard_seconds=${13} +remote_capture_ready_settle_seconds=${14} rm -f "${remote_capture}" @@ -2443,7 +2446,11 @@ gst_decode_chain="$(gst_video_decode_chain)" run_ffmpeg_capture() { local rc=0 - timeout --kill-after=5 --signal=INT "$((capture_seconds + 5))" "$@" &2 +} + +signal_capture_ready() { + if [[ "${remote_capture_ready_settle_seconds}" =~ ^[0-9]+([.][0-9]+)?$ ]]; then + sleep "${remote_capture_ready_settle_seconds}" + fi + printf '%s\n' "__LESAVKA_CAPTURE_READY__" +} + +run_tolerant_capture() { + announce_capture_start + "$@" & + local capture_pid=$! + signal_capture_ready + wait "${capture_pid}" || true +} + quiesce_for_alsa=0 case "${remote_audio_quiesce_user_audio}" in 1|true|yes) @@ -2525,28 +2551,31 @@ discard_preroll_capture() { discard_preroll_capture "${remote_capture_preroll_discard_seconds}" -printf '%s\n' "__LESAVKA_CAPTURE_READY__" -printf 'capture_start_unix_ns=%s\n' "$(date +%s%N)" >&2 - if [[ "${capture_mode}" == "pwpipe" ]]; then printf 'using PipeWire-native mux capture target serial: %s\n' "${pw_audio_target}" >&2 - timeout "${capture_seconds}" pw-record \ - --target "${pw_audio_target}" \ - --rate 48000 \ - --channels 2 \ - --format s16 \ - --raw - \ - | pw-v4l2 ffmpeg -hide_banner -loglevel error -y \ - -thread_queue_size 1024 \ - "${video_args[@]}" \ - -i "${resolved_video_device}" \ - -thread_queue_size 1024 \ - -f s16le -ar 48000 -ac 2 \ - -i pipe:0 \ - -t "${capture_seconds}" \ - -c:v copy \ - -c:a pcm_s16le \ - "${remote_capture}" + announce_capture_start + ( + timeout "${capture_seconds}" pw-record \ + --target "${pw_audio_target}" \ + --rate 48000 \ + --channels 2 \ + --format s16 \ + --raw - \ + | pw-v4l2 ffmpeg -hide_banner -loglevel error -y \ + -thread_queue_size 1024 \ + "${video_args[@]}" \ + -i "${resolved_video_device}" \ + -thread_queue_size 1024 \ + -f s16le -ar 48000 -ac 2 \ + -i pipe:0 \ + -t "${capture_seconds}" \ + -c:v copy \ + -c:a pcm_s16le \ + "${remote_capture}" + ) & + capture_pid=$! + signal_capture_ready + wait "${capture_pid}" elif [[ "${capture_mode}" == "pulse" ]]; then printf 'using Pulse source: %s\n' "${pulse_source}" >&2 case "${remote_pulse_capture_tool}" in @@ -2596,7 +2625,7 @@ elif [[ "${capture_mode}" == "pulse" ]]; then printf 'gst copy mode only supports mjpeg input, got %s\n' "${video_format}" >&2 exit 64 fi - timeout --kill-after=5 --signal=INT "$((capture_seconds + 3))" \ + run_tolerant_capture timeout --kill-after=5 --signal=INT "$((capture_seconds + 3))" \ gst-launch-1.0 -q -e \ matroskamux name=mux ! filesink location="${remote_capture}" \ v4l2src device="${resolved_video_device}" do-timestamp=true ! \ @@ -2605,10 +2634,10 @@ elif [[ "${capture_mode}" == "pulse" ]]; then pulsesrc device="${pulse_source}" do-timestamp=true ! \ audio/x-raw,rate=48000,channels=2 ! \ audioconvert ! audioresample ! audio/x-raw,rate=48000,channels=2 ! \ - queue ! mux. || true + queue ! mux. ;; cfr) - timeout --kill-after=5 --signal=INT "$((capture_seconds + 3))" \ + run_tolerant_capture timeout --kill-after=5 --signal=INT "$((capture_seconds + 3))" \ gst-launch-1.0 -q -e \ matroskamux name=mux ! filesink location="${remote_capture}" \ v4l2src device="${resolved_video_device}" do-timestamp=true ! \ @@ -2621,7 +2650,7 @@ elif [[ "${capture_mode}" == "pulse" ]]; then pulsesrc device="${pulse_source}" do-timestamp=true ! \ audio/x-raw,rate=48000,channels=2 ! \ audioconvert ! audioresample ! audio/x-raw,rate=48000,channels=2 ! \ - queue ! mux. || true + queue ! mux. ;; *) printf 'unsupported REMOTE_PULSE_VIDEO_MODE=%s\n' "${remote_pulse_video_mode}" >&2 diff --git a/server/Cargo.toml b/server/Cargo.toml index ca5e751..68ca5d6 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.19.20" +version = "0.19.21" edition = "2024" autobins = false diff --git a/testing/tests/client_manual_sync_script_contract.rs b/testing/tests/client_manual_sync_script_contract.rs index 67d07cc..d4c2eeb 100644 --- a/testing/tests/client_manual_sync_script_contract.rs +++ b/testing/tests/client_manual_sync_script_contract.rs @@ -49,7 +49,7 @@ fn upstream_sync_script_tunnels_auto_server_addr_through_ssh() { "LESAVKA_OUTPUT_DELAY_SAVE=${LESAVKA_OUTPUT_DELAY_SAVE:-0}", "LESAVKA_OUTPUT_REQUIRE_SYNC_PASS=${LESAVKA_OUTPUT_REQUIRE_SYNC_PASS:-0}", "LESAVKA_OUTPUT_DELAY_TARGET=${LESAVKA_OUTPUT_DELAY_TARGET:-video}", - "LESAVKA_OUTPUT_DELAY_MIN_PAIRS=${LESAVKA_OUTPUT_DELAY_MIN_PAIRS:-8}", + "LESAVKA_OUTPUT_DELAY_MIN_PAIRS=${LESAVKA_OUTPUT_DELAY_MIN_PAIRS:-13}", "LESAVKA_OUTPUT_DELAY_MAX_ABS_SKEW_MS=${LESAVKA_OUTPUT_DELAY_MAX_ABS_SKEW_MS:-5000}", "LESAVKA_OUTPUT_DELAY_MAX_DRIFT_MS=${LESAVKA_OUTPUT_DELAY_MAX_DRIFT_MS:-80}", "LESAVKA_OUTPUT_DELAY_MAX_STEP_US=${LESAVKA_OUTPUT_DELAY_MAX_STEP_US:-1500000}", @@ -57,6 +57,11 @@ fn upstream_sync_script_tunnels_auto_server_addr_through_ssh() { "LESAVKA_OUTPUT_FRESHNESS_MAX_DRIFT_MS=${LESAVKA_OUTPUT_FRESHNESS_MAX_DRIFT_MS:-100}", "LESAVKA_OUTPUT_FRESHNESS_MAX_CLOCK_UNCERTAINTY_MS=${LESAVKA_OUTPUT_FRESHNESS_MAX_CLOCK_UNCERTAINTY_MS:-250}", "REMOTE_CAPTURE_PREROLL_DISCARD_SECONDS=${REMOTE_CAPTURE_PREROLL_DISCARD_SECONDS:-0}", + "REMOTE_CAPTURE_READY_SETTLE_SECONDS=${REMOTE_CAPTURE_READY_SETTLE_SECONDS:-1}", + "remote_capture_ready_settle_seconds=${14}", + "announce_capture_start", + "signal_capture_ready", + "run_tolerant_capture", "discarding %ss of post-enumeration capture before probe", "ffmpeg -nostdin -hide_banner", "\"$@\"