diff --git a/Cargo.lock b/Cargo.lock index 40235a4..c97e816 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.16.12" +version = "0.16.13" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.16.12" +version = "0.16.13" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.16.12" +version = "0.16.13" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index 60ab076..0caf2f7 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.16.12" +version = "0.16.13" edition = "2024" [dependencies] diff --git a/client/src/bin/lesavka-sync-analyze.rs b/client/src/bin/lesavka-sync-analyze.rs index fd5f002..46b3b15 100644 --- a/client/src/bin/lesavka-sync-analyze.rs +++ b/client/src/bin/lesavka-sync-analyze.rs @@ -23,7 +23,7 @@ struct SyncAnalyzeOutput<'a> { #[cfg(not(coverage))] fn main() -> Result<()> { let args = parse_args(std::env::args().skip(1))?; - let report = analyze_capture(&args.capture_path, &SyncAnalysisOptions::default()) + let report = analyze_capture(&args.capture_path, &args.options) .with_context(|| format!("analyzing sync capture {}", args.capture_path.display()))?; let calibration = report.calibration_recommendation(); let verdict = report.verdict(); @@ -51,11 +51,12 @@ fn main() -> Result<()> { } #[cfg(any(not(coverage), test))] -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, PartialEq)] struct AnalyzeArgs { capture_path: PathBuf, emit_json: bool, report_dir: Option, + options: SyncAnalysisOptions, } #[cfg(any(not(coverage), test))] @@ -66,13 +67,16 @@ where { let args = args.into_iter().map(Into::into).collect::>(); if args.is_empty() || 
args.iter().any(|arg| arg == "--help" || arg == "-h") { - println!("Usage: lesavka-sync-analyze [--json] [--report-dir ]"); + println!( + "Usage: lesavka-sync-analyze [--json] [--report-dir ] [--event-width-codes 1,2,1,3]" + ); std::process::exit(0); } let mut emit_json = false; let mut report_dir = None::; let mut capture_path = None::; + let mut options = SyncAnalysisOptions::default(); let mut iter = args.into_iter(); while let Some(arg) = iter.next() { if arg == "--json" { @@ -93,6 +97,20 @@ where report_dir = Some(PathBuf::from(dir)); continue; } + if arg == "--event-width-codes" { + let Some(raw_codes) = iter.next() else { + bail!("--event-width-codes requires a comma-separated list"); + }; + options.event_width_codes = parse_event_width_codes(&raw_codes)?; + continue; + } + if let Some(raw_codes) = arg.strip_prefix("--event-width-codes=") { + if raw_codes.is_empty() { + bail!("--event-width-codes requires a comma-separated list"); + } + options.event_width_codes = parse_event_width_codes(raw_codes)?; + continue; + } if capture_path.is_some() { bail!("unexpected extra argument `{arg}`"); } @@ -104,9 +122,34 @@ where capture_path, emit_json, report_dir, + options, }) } +#[cfg(any(not(coverage), test))] +fn parse_event_width_codes(raw: &str) -> Result> { + let codes = raw + .split(',') + .filter_map(|part| { + let trimmed = part.trim(); + (!trimmed.is_empty()).then_some(trimmed) + }) + .map(|part| { + let code = part + .parse::() + .with_context(|| format!("parsing event width code `{part}`"))?; + if code == 0 { + bail!("event width codes must be positive"); + } + Ok(code) + }) + .collect::>>()?; + if codes.is_empty() { + bail!("event width code list must not be empty"); + } + Ok(codes) +} + #[cfg(not(coverage))] fn format_human_report( capture_path: &std::path::Path, @@ -213,9 +256,17 @@ mod tests { ); } + #[test] + fn parse_args_accepts_event_width_codes() { + let args = parse_args(["capture.mkv", "--event-width-codes", "1,2,1,3"]).expect("args"); + 
assert_eq!(args.options.event_width_codes, vec![1, 2, 1, 3]); + } + #[test] fn parse_args_rejects_extra_positional_arguments() { assert!(parse_args(["one.mkv", "two.mkv"]).is_err()); + assert!(parse_args(["one.mkv", "--event-width-codes", ""]).is_err()); + assert!(parse_args(["one.mkv", "--event-width-codes", "0"]).is_err()); } #[test] diff --git a/client/src/sync_probe/analyze.rs b/client/src/sync_probe/analyze.rs index 3ac23ab..f43d7ad 100644 --- a/client/src/sync_probe/analyze.rs +++ b/client/src/sync_probe/analyze.rs @@ -11,7 +11,8 @@ use std::path::Path; use media_extract::{extract_audio_samples, extract_video_brightness, extract_video_timestamps}; use onset_detection::{ - DEFAULT_AUDIO_SAMPLE_RATE_HZ, correlate_segments, detect_audio_segments, detect_video_segments, + DEFAULT_AUDIO_SAMPLE_RATE_HZ, correlate_coded_segments, correlate_segments, + detect_audio_segments, detect_video_segments, }; pub use onset_detection::{detect_audio_onsets, detect_video_onsets}; @@ -38,14 +39,25 @@ pub fn analyze_capture( options.audio_window_ms, )?; - correlate_segments( - &video_segments, - &audio_segments, - options.pulse_period_s, - options.pulse_width_s, - options.marker_tick_period, - options.max_pair_gap_s, - ) + if options.event_width_codes.is_empty() { + correlate_segments( + &video_segments, + &audio_segments, + options.pulse_period_s, + options.pulse_width_s, + options.marker_tick_period, + options.max_pair_gap_s, + ) + } else { + correlate_coded_segments( + &video_segments, + &audio_segments, + options.pulse_period_s, + options.pulse_width_s, + &options.event_width_codes, + options.max_pair_gap_s, + ) + } } fn reconcile_video_timestamps(timestamps: Vec, frame_count: usize) -> Result> { diff --git a/client/src/sync_probe/analyze/onset_detection.rs b/client/src/sync_probe/analyze/onset_detection.rs index bed42aa..1a00e0f 100644 --- a/client/src/sync_probe/analyze/onset_detection.rs +++ b/client/src/sync_probe/analyze/onset_detection.rs @@ -4,7 +4,7 @@ mod correlation; 
#[cfg(test)] mod tests; -pub(crate) use correlation::correlate_segments; +pub(crate) use correlation::{correlate_coded_segments, correlate_segments}; pub(super) const DEFAULT_AUDIO_SAMPLE_RATE_HZ: u32 = 48_000; // Real HDMI/capture paths can preserve pulse shape while compressing its absolute diff --git a/client/src/sync_probe/analyze/onset_detection/correlation.rs b/client/src/sync_probe/analyze/onset_detection/correlation.rs index b86290a..a20115d 100644 --- a/client/src/sync_probe/analyze/onset_detection/correlation.rs +++ b/client/src/sync_probe/analyze/onset_detection/correlation.rs @@ -12,6 +12,7 @@ pub(super) use collapse::collapse_segments_by_phase; const MARKER_WIDTH_MULTIPLIER: f64 = 1.5; const PHASE_TOLERANCE_WIDTH_MULTIPLIER: f64 = 2.5; const STARTUP_PHASE_ANCHOR_TOLERANCE_FRACTION: f64 = 1.0 / 3.0; +const MIN_CODED_PAIRS: usize = 3; #[cfg_attr(not(test), allow(dead_code))] pub(super) fn correlate_onsets( @@ -174,6 +175,122 @@ pub(crate) fn correlate_segments( )) } +pub(crate) fn correlate_coded_segments( + video_segments: &[PulseSegment], + audio_segments: &[PulseSegment], + pulse_period_s: f64, + pulse_width_s: f64, + event_width_codes: &[u32], + max_pair_gap_s: f64, +) -> Result { + if event_width_codes.is_empty() { + bail!("event width code sequence must not be empty"); + } + if event_width_codes.iter().any(|code| *code == 0) { + bail!("event width codes must stay positive"); + } + if pulse_period_s <= 0.0 { + bail!("pulse period must stay positive"); + } + if pulse_width_s <= 0.0 { + bail!("pulse width must stay positive"); + } + if max_pair_gap_s <= 0.0 { + bail!("max pair gap must stay positive"); + } + + let phase_tolerance_s = segment_phase_tolerance(pulse_period_s, pulse_width_s, max_pair_gap_s); + let video_segments = + collapse_segments_by_phase(video_segments, pulse_period_s, phase_tolerance_s); + let audio_segments = + collapse_segments_by_phase(audio_segments, pulse_period_s, phase_tolerance_s); + if video_segments.is_empty() { + 
bail!("video onset list is empty"); + } + if audio_segments.is_empty() { + bail!("audio onset list is empty"); + } + + let video_onsets_s = video_segments + .iter() + .map(|segment| segment.start_s) + .collect::>(); + let audio_onsets_s = audio_segments + .iter() + .map(|segment| segment.start_s) + .collect::>(); + let activity_start_delta_ms = (audio_onsets_s[0] - video_onsets_s[0]) * 1000.0; + let (_, _, common_window) = + trim_onsets_to_common_activity_window(&video_onsets_s, &audio_onsets_s, max_pair_gap_s); + let filtered_video_segments = filter_segments_to_window(&video_segments, common_window); + let filtered_audio_segments = filter_segments_to_window(&audio_segments, common_window); + if filtered_video_segments.is_empty() || filtered_audio_segments.is_empty() { + return correlate_segments( + &video_segments, + &audio_segments, + pulse_period_s, + pulse_width_s, + 1, + max_pair_gap_s, + ); + } + + let expected_start_skew_ms = + (filtered_audio_segments[0].start_s - filtered_video_segments[0].start_s) * 1000.0; + let video_indexed = index_coded_segments_by_spacing( + &filtered_video_segments, + pulse_period_s, + pulse_width_s, + event_width_codes, + ); + let audio_indexed = index_coded_segments_by_spacing( + &filtered_audio_segments, + pulse_period_s, + pulse_width_s, + event_width_codes, + ); + let offset_candidates = candidate_coded_index_offsets(&video_indexed, &audio_indexed); + let pairs = best_coded_pairs_for_index_offsets( + &video_indexed, + &audio_indexed, + &offset_candidates, + max_pair_gap_s, + expected_start_skew_ms, + ); + + if pairs.len() < MIN_CODED_PAIRS { + if activity_start_delta_ms.abs() >= 1_000.0 { + return correlate_segments( + &video_segments, + &audio_segments, + pulse_period_s, + pulse_width_s, + 1, + max_pair_gap_s, + ); + } + bail!( + "need at least {MIN_CODED_PAIRS} matching coded pulse pairs; saw {}", + pairs.len() + ); + } + + let video_onsets_s = filtered_video_segments + .iter() + .map(|segment| segment.start_s) + 
.collect::>(); + let audio_onsets_s = filtered_audio_segments + .iter() + .map(|segment| segment.start_s) + .collect::>(); + Ok(sync_report_from_pairs( + &video_onsets_s, + &audio_onsets_s, + activity_start_delta_ms, + pairs, + )) +} + #[derive(Clone, Copy)] struct CommonActivityWindow { start_s: f64, @@ -231,6 +348,19 @@ fn trim_onsets_to_common_activity_window<'a>( ) } +fn filter_segments_to_window( + segments: &[PulseSegment], + common_window: CommonActivityWindow, +) -> Vec { + segments + .iter() + .copied() + .filter(|segment| { + segment.start_s >= common_window.start_s && segment.start_s <= common_window.end_s + }) + .collect() +} + fn segment_phase_tolerance(pulse_period_s: f64, pulse_width_s: f64, max_pair_gap_s: f64) -> f64 { (pulse_width_s * PHASE_TOLERANCE_WIDTH_MULTIPLIER) .max(max_pair_gap_s.min(pulse_period_s / 3.0)) @@ -270,6 +400,70 @@ pub(super) fn index_onsets_by_spacing(onsets_s: &[f64], pulse_period_s: f64) -> indexed } +#[derive(Clone, Copy, Debug, PartialEq)] +struct CodedPulseSegment { + start_s: f64, + code: u32, +} + +fn index_coded_segments_by_spacing( + segments: &[PulseSegment], + pulse_period_s: f64, + pulse_width_s: f64, + event_width_codes: &[u32], +) -> BTreeMap { + let mut indexed = BTreeMap::new(); + let Some(first_segment) = segments.first().copied() else { + return indexed; + }; + + let mut pulse_index = 0_i64; + let mut previous_start = first_segment.start_s; + indexed.insert( + pulse_index, + CodedPulseSegment { + start_s: first_segment.start_s, + code: nearest_event_width_code( + first_segment.duration_s, + pulse_width_s, + event_width_codes, + ), + }, + ); + for segment in segments.iter().copied().skip(1) { + let pulse_steps = ((segment.start_s - previous_start) / pulse_period_s) + .round() + .max(1.0) as i64; + pulse_index += pulse_steps; + indexed.insert( + pulse_index, + CodedPulseSegment { + start_s: segment.start_s, + code: nearest_event_width_code( + segment.duration_s, + pulse_width_s, + event_width_codes, + ), + }, + 
); + previous_start = segment.start_s; + } + indexed +} + +fn nearest_event_width_code(duration_s: f64, pulse_width_s: f64, event_width_codes: &[u32]) -> u32 { + let ratio = duration_s / pulse_width_s.max(f64::EPSILON); + event_width_codes + .iter() + .copied() + .min_by(|left, right| { + let left_error = (ratio - f64::from(*left)).abs(); + let right_error = (ratio - f64::from(*right)).abs(); + left_error.total_cmp(&right_error) + }) + .unwrap_or(1) +} + pub(super) fn candidate_index_offsets( video_indexed: &BTreeMap, audio_indexed: &BTreeMap, @@ -298,6 +492,34 @@ pub(super) fn candidate_index_offsets( (audio_min - video_max..=audio_max - video_min).collect() } +fn candidate_coded_index_offsets( + video_indexed: &BTreeMap, + audio_indexed: &BTreeMap, +) -> Vec { + if video_indexed.is_empty() || audio_indexed.is_empty() { + return Vec::new(); + } + + let video_min = *video_indexed + .keys() + .next() + .expect("non-empty indexed video map has a first key"); + let video_max = *video_indexed + .keys() + .next_back() + .expect("non-empty indexed video map has a last key"); + let audio_min = *audio_indexed + .keys() + .next() + .expect("non-empty indexed audio map has a first key"); + let audio_max = *audio_indexed + .keys() + .next_back() + .expect("non-empty indexed audio map has a last key"); + + (audio_min - video_max..=audio_max - video_min).collect() +} + pub(super) fn marker_index_offsets( video_indexed: &BTreeMap, audio_indexed: &BTreeMap, @@ -420,6 +642,70 @@ fn best_pairs_for_index_offsets( best.map(|(_, _, _, _, pairs)| pairs).unwrap_or_default() } +fn best_coded_pairs_for_index_offsets( + video_indexed: &BTreeMap, + audio_indexed: &BTreeMap, + offset_candidates: &[i64], + max_pair_gap_s: f64, + expected_start_skew_ms: f64, +) -> Vec { + let max_pair_gap_ms = max_pair_gap_s * 1000.0; + let startup_phase_anchor_tolerance_ms = + max_pair_gap_ms * STARTUP_PHASE_ANCHOR_TOLERANCE_FRACTION; + let mut best: Option<(bool, usize, f64, f64, Vec)> = None; + + for offset 
in offset_candidates.iter().copied() { + let pairs = video_indexed + .iter() + .filter_map(|(pulse_index, video)| { + audio_indexed + .get(&(pulse_index + offset)) + .filter(|audio| audio.code == video.code) + .map(|audio| { + let skew_ms = (audio.start_s - video.start_s) * 1000.0; + MatchedOnsetPair::new(video.start_s, audio.start_s, skew_ms, max_pair_gap_s) + }) + }) + .filter(|pair| pair.skew_ms.abs() <= max_pair_gap_ms) + .collect::>(); + if pairs.is_empty() { + continue; + } + + let mean_abs_skew_ms = + pairs.iter().map(|pair| pair.skew_ms.abs()).sum::() / pairs.len() as f64; + let startup_phase_anchor_error_ms = (pairs[0].skew_ms - expected_start_skew_ms).abs(); + let startup_phase_anchor_consistent = + startup_phase_anchor_error_ms <= startup_phase_anchor_tolerance_ms; + match &best { + Some(( + best_anchor_consistent, + best_count, + best_anchor_error_ms, + best_mean_abs_skew_ms, + _, + )) if (!startup_phase_anchor_consistent && *best_anchor_consistent) + || (startup_phase_anchor_consistent == *best_anchor_consistent + && (pairs.len() < *best_count + || (pairs.len() == *best_count + && (startup_phase_anchor_error_ms > *best_anchor_error_ms + || (startup_phase_anchor_error_ms == *best_anchor_error_ms + && mean_abs_skew_ms >= *best_mean_abs_skew_ms))))) => {} + _ => { + best = Some(( + startup_phase_anchor_consistent, + pairs.len(), + startup_phase_anchor_error_ms, + mean_abs_skew_ms, + pairs, + )) + } + } + } + + best.map(|(_, _, _, _, pairs)| pairs).unwrap_or_default() +} + pub(super) fn marker_onsets(segments: &[PulseSegment], pulse_width_s: f64) -> Vec { let threshold = pulse_width_s * MARKER_WIDTH_MULTIPLIER; segments diff --git a/client/src/sync_probe/analyze/onset_detection/tests.rs b/client/src/sync_probe/analyze/onset_detection/tests.rs index b807b78..fab946e 100644 --- a/client/src/sync_probe/analyze/onset_detection/tests.rs +++ b/client/src/sync_probe/analyze/onset_detection/tests.rs @@ -3,8 +3,8 @@ use super::correlation::{ 
index_onsets_by_spacing, marker_index_offsets, marker_onsets, shortest_wrapped_difference, }; use super::{ - PulseSegment, correlate_segments, detect_audio_onsets, detect_audio_segments, - detect_video_onsets, detect_video_segments, median, + PulseSegment, correlate_coded_segments, correlate_segments, detect_audio_onsets, + detect_audio_segments, detect_video_onsets, detect_video_segments, median, }; use crate::sync_probe::analyze::report::SyncAnalysisReport; use std::collections::BTreeMap; @@ -272,6 +272,64 @@ fn correlate_segments_preserves_whole_period_delay_evidence() { ); } +#[test] +fn correlate_coded_segments_matches_preserved_event_width_codes() { + fn segment(start_s: f64, code: u32) -> PulseSegment { + let duration_s = 0.12 * f64::from(code); + PulseSegment { + start_s, + end_s: start_s + duration_s, + duration_s, + } + } + + let codes = [1, 2, 1, 3, 2, 4, 1, 1]; + let video = codes + .iter() + .enumerate() + .map(|(tick, code)| segment(tick as f64, *code)) + .collect::>(); + let audio = codes + .iter() + .enumerate() + .map(|(tick, code)| segment(tick as f64 + 0.045, *code)) + .collect::>(); + + let report = + correlate_coded_segments(&video, &audio, 1.0, 0.12, &codes, 0.2).expect("coded report"); + + assert_eq!(report.paired_event_count, codes.len()); + assert!((report.median_skew_ms - 45.0).abs() < 1.0); + assert!(report.max_abs_skew_ms < 50.0); +} + +#[test] +fn correlate_coded_segments_rejects_nearby_wrong_width_codes() { + fn segment(start_s: f64, code: u32) -> PulseSegment { + let duration_s = 0.12 * f64::from(code); + PulseSegment { + start_s, + end_s: start_s + duration_s, + duration_s, + } + } + + let codes = [1, 2, 1, 3, 2, 4]; + let video = codes + .iter() + .enumerate() + .map(|(tick, code)| segment(tick as f64, *code)) + .collect::>(); + let shifted_audio_codes = [3, 1, 4, 1, 2, 1]; + let audio = shifted_audio_codes + .iter() + .enumerate() + .map(|(tick, code)| segment(tick as f64 + 0.02, *code)) + .collect::>(); + + 
assert!(correlate_coded_segments(&video, &audio, 1.0, 0.12, &codes, 0.2).is_err()); +} + fn assert_sync_report_shape(report: &SyncAnalysisReport, paired_events: usize) { assert_eq!(report.video_event_count, paired_events); assert_eq!(report.audio_event_count, paired_events); diff --git a/client/src/sync_probe/analyze/report.rs b/client/src/sync_probe/analyze/report.rs index 6a0fa7b..bee3edd 100644 --- a/client/src/sync_probe/analyze/report.rs +++ b/client/src/sync_probe/analyze/report.rs @@ -237,6 +237,7 @@ pub struct SyncAnalysisOptions { pub pulse_period_s: f64, pub pulse_width_s: f64, pub marker_tick_period: u32, + pub event_width_codes: Vec, } impl Default for SyncAnalysisOptions { @@ -247,6 +248,7 @@ impl Default for SyncAnalysisOptions { pulse_period_s: DEFAULT_PULSE_PERIOD_S, pulse_width_s: DEFAULT_PULSE_WIDTH_S, marker_tick_period: DEFAULT_MARKER_TICK_PERIOD, + event_width_codes: Vec::new(), } } } @@ -263,6 +265,7 @@ mod tests { assert!((options.pulse_period_s - 1.0).abs() < f64::EPSILON); assert!((options.pulse_width_s - 0.12).abs() < f64::EPSILON); assert_eq!(options.marker_tick_period, 5); + assert!(options.event_width_codes.is_empty()); } #[test] diff --git a/common/Cargo.toml b/common/Cargo.toml index 047293e..842121a 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.16.12" +version = "0.16.13" edition = "2024" build = "build.rs" diff --git a/scripts/manual/browser_consumer_probe.py b/scripts/manual/browser_consumer_probe.py index 5018735..f42e539 100755 --- a/scripts/manual/browser_consumer_probe.py +++ b/scripts/manual/browser_consumer_probe.py @@ -293,6 +293,10 @@ class ProbeHandler(http.server.BaseHTTPRequestHandler): pass +class ReusableTcpServer(socketserver.TCPServer): + allow_reuse_address = True + + def main() -> None: args = parse_args() state = ProbeState(Path(args.output), Path(args.status), args.duration_seconds) @@ -301,7 +305,7 @@ def main() -> None: pass Handler.state 
= state - with socketserver.TCPServer((args.host, args.port), Handler) as httpd: + with ReusableTcpServer((args.host, args.port), Handler) as httpd: httpd.serve_forever() diff --git a/scripts/manual/local_av_stimulus.py b/scripts/manual/local_av_stimulus.py new file mode 100755 index 0000000..ff81229 --- /dev/null +++ b/scripts/manual/local_av_stimulus.py @@ -0,0 +1,259 @@ +#!/usr/bin/env python3 +"""Serve a local A/V stimulus page for the mirrored upstream sync probe.""" + +from __future__ import annotations + +import argparse +import http.server +import json +import socketserver +import threading +import time +from pathlib import Path + +DEFAULT_EVENT_WIDTH_CODES = "1,2,1,3,2,4,1,1,3,1,4,2,1,2,3,4,1,3,2,2,4,1,2,4,3,1,1,4,2,3,1,2" + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Serve Lesavka local A/V sync stimulus") + parser.add_argument("--host", default="127.0.0.1") + parser.add_argument("--port", type=int, default=18444) + parser.add_argument("--status", default="/tmp/lesavka-local-av-stimulus-status.json") + parser.add_argument("--duration-seconds", type=int, default=20) + parser.add_argument("--warmup-seconds", type=int, default=4) + parser.add_argument("--pulse-period-ms", type=int, default=1000) + parser.add_argument("--pulse-width-ms", type=int, default=120) + parser.add_argument("--marker-tick-period", type=int, default=5) + parser.add_argument("--event-width-codes", default=DEFAULT_EVENT_WIDTH_CODES) + args = parser.parse_args() + args.event_width_codes = parse_event_width_codes(args.event_width_codes) + return args + + +def parse_event_width_codes(raw: str) -> list[int]: + codes = [int(part.strip()) for part in raw.split(",") if part.strip()] + if not codes: + raise SystemExit("--event-width-codes must contain at least one integer") + if any(code < 1 for code in codes): + raise SystemExit("--event-width-codes values must be positive") + return codes + + +class StimulusState: + def __init__(self, status_path: 
Path, args: argparse.Namespace) -> None: + self.status_path = status_path + self.args = args + self.lock = threading.Lock() + self.start_token = 0 + self.status = { + "booted_at": time.time(), + "ready": False, + "started": False, + "completed": False, + "last_error": None, + "page_message": "booting", + "last_update": time.time(), + } + self.write_status() + + def write_status(self) -> None: + self.status_path.parent.mkdir(parents=True, exist_ok=True) + tmp = self.status_path.with_suffix(".tmp") + tmp.write_text(json.dumps(self.status, indent=2, sort_keys=True), encoding="utf-8") + tmp.replace(self.status_path) + + def update(self, payload: dict) -> None: + with self.lock: + self.status.update(payload) + self.status["last_update"] = time.time() + self.write_status() + + def snapshot(self) -> dict: + with self.lock: + snap = dict(self.status) + snap.update({ + "start_token": self.start_token, + "duration_seconds": self.args.duration_seconds, + "warmup_seconds": self.args.warmup_seconds, + "pulse_period_ms": self.args.pulse_period_ms, + "pulse_width_ms": self.args.pulse_width_ms, + "marker_tick_period": self.args.marker_tick_period, + "event_width_codes": self.args.event_width_codes, + }) + return snap + + def request_start(self) -> dict: + with self.lock: + self.start_token += 1 + self.status.update({ + "started": False, + "completed": False, + "last_error": None, + "page_message": "start requested", + "start_requested_at": time.time(), + }) + self.write_status() + return self.snapshot() + + +def page_html() -> str: + return """ + + + +Lesavka Local A/V Stimulus + + + +
LESAVKA
booting...
+ + +""" + + +class StimulusHandler(http.server.BaseHTTPRequestHandler): + state: StimulusState + + def _send(self, code: int, body: bytes, content_type: str = "application/json") -> None: + self.send_response(code) + self.send_header("Content-Type", content_type) + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def do_GET(self) -> None: + if self.path in ("/", "/index.html"): + self.state.update({"page_message": "html served"}) + self._send(200, page_html().encode("utf-8"), "text/html; charset=utf-8") + return + if self.path == "/command": + self._send(200, json.dumps(self.state.snapshot()).encode("utf-8")) + return + if self.path == "/status": + self._send(200, json.dumps(self.state.snapshot()).encode("utf-8")) + return + self._send(404, b"not found", "text/plain; charset=utf-8") + + def do_POST(self) -> None: + length = int(self.headers.get("Content-Length", "0")) + body = self.rfile.read(length) + if self.path == "/status": + payload = json.loads(body.decode("utf-8")) + self.state.update(payload) + self._send(200, json.dumps(self.state.snapshot()).encode("utf-8")) + return + if self.path == "/start": + self._send(200, json.dumps(self.state.request_start()).encode("utf-8")) + return + self._send(404, b"not found", "text/plain; charset=utf-8") + + def log_message(self, fmt: str, *args) -> None: + pass + + +class ReusableTcpServer(socketserver.TCPServer): + allow_reuse_address = True + + +def main() -> None: + args = parse_args() + state = StimulusState(Path(args.status), args) + + class Handler(StimulusHandler): + pass + + Handler.state = state + with ReusableTcpServer((args.host, args.port), Handler) as httpd: + httpd.serve_forever() + + +if __name__ == "__main__": + main() diff --git a/scripts/manual/run_upstream_browser_av_sync.sh b/scripts/manual/run_upstream_browser_av_sync.sh index 24d232d..a8b7e0d 100755 --- a/scripts/manual/run_upstream_browser_av_sync.sh +++ 
b/scripts/manual/run_upstream_browser_av_sync.sh @@ -13,6 +13,9 @@ REPO_ROOT="$(cd -- "${SCRIPT_DIR}/../.." >/dev/null 2>&1 && pwd)" TETHYS_HOST=${TETHYS_HOST:-tethys} LESAVKA_SERVER_ADDR=${LESAVKA_SERVER_ADDR:-https://38.28.125.112:50051} PROBE_DURATION_SECONDS=${PROBE_DURATION_SECONDS:-15} +BROWSER_RECORD_SECONDS=${BROWSER_RECORD_SECONDS:-${PROBE_DURATION_SECONDS}} +BROWSER_SYNC_DRIVER_COMMAND=${BROWSER_SYNC_DRIVER_COMMAND:-} +SYNC_ANALYZE_EVENT_WIDTH_CODES=${SYNC_ANALYZE_EVENT_WIDTH_CODES:-} BROWSER_PORT=${BROWSER_PORT:-18443} REMOTE_SCRIPT=${REMOTE_SCRIPT:-/tmp/lesavka-browser-consumer-probe.py} REMOTE_CAPTURE=${REMOTE_CAPTURE:-/tmp/lesavka-browser-av-sync.webm} @@ -38,7 +41,7 @@ ssh ${SSH_OPTS} "${TETHYS_HOST}" bash -s -- \ "${REMOTE_CAPTURE}" \ "${REMOTE_STATUS}" \ "${REMOTE_PROFILE_DIR}" \ - "${PROBE_DURATION_SECONDS}" \ + "${BROWSER_RECORD_SECONDS}" \ "${BROWSER_PORT}" \ "${DISPLAY_ENV}" \ "${REMOTE_RUNTIME_DIR}" <<'REMOTE_SETUP' @@ -129,16 +132,21 @@ ssh ${SSH_OPTS} "${TETHYS_HOST}" "curl -fsS -X POST http://127.0.0.1:${BROWSER_P sleep 1 -echo "==> running local Lesavka sync probe against ${LESAVKA_SERVER_ADDR}" -( - cd "${REPO_ROOT}" - cargo run -p lesavka_client --bin lesavka-sync-probe -- \ - --server "${LESAVKA_SERVER_ADDR}" \ - --duration-seconds "${PROBE_DURATION_SECONDS}" -) +if [[ -n "${BROWSER_SYNC_DRIVER_COMMAND}" ]]; then + echo "==> running custom browser sync driver" + bash -lc "${BROWSER_SYNC_DRIVER_COMMAND}" +else + echo "==> running local Lesavka sync probe against ${LESAVKA_SERVER_ADDR}" + ( + cd "${REPO_ROOT}" + cargo run -p lesavka_client --bin lesavka-sync-probe -- \ + --server "${LESAVKA_SERVER_ADDR}" \ + --duration-seconds "${PROBE_DURATION_SECONDS}" + ) +fi echo "==> waiting for browser recording upload" -dealine_upload=$(( $(date +%s) + PROBE_DURATION_SECONDS + 60 )) +deadline_upload=$(( $(date +%s) + PROBE_DURATION_SECONDS + 60 )) while true; do status_json=$(ssh ${SSH_OPTS} "${TETHYS_HOST}" "test -f '${REMOTE_STATUS}' && cat 
'${REMOTE_STATUS}'" || true) if [[ -n "${status_json}" ]]; then @@ -148,7 +156,7 @@ while true; do break fi fi - if (( $(date +%s) >= dealine_upload )); then + if (( $(date +%s) >= deadline_upload )); then echo "browser recording was not uploaded before timeout" >&2 [[ -n "${status_json:-}" ]] && echo "last status: ${status_json}" >&2 exit 1 @@ -161,11 +169,15 @@ echo "==> fetching capture back to ${LOCAL_CAPTURE}" scp ${SSH_OPTS} "${TETHYS_HOST}:${REMOTE_CAPTURE}" "${LOCAL_CAPTURE}" echo "==> analyzing browser capture" +analyze_args=(--report-dir "${LOCAL_REPORT_DIR}") +if [[ -n "${SYNC_ANALYZE_EVENT_WIDTH_CODES}" ]]; then + analyze_args+=(--event-width-codes "${SYNC_ANALYZE_EVENT_WIDTH_CODES}") +fi +analyze_args+=("${LOCAL_CAPTURE}") ( cd "${REPO_ROOT}" cargo run -p lesavka_client --bin lesavka-sync-analyze -- \ - --report-dir "${LOCAL_REPORT_DIR}" \ - "${LOCAL_CAPTURE}" + "${analyze_args[@]}" ) echo "==> done" diff --git a/scripts/manual/run_upstream_mirrored_av_sync.sh b/scripts/manual/run_upstream_mirrored_av_sync.sh new file mode 100755 index 0000000..2c1a4da --- /dev/null +++ b/scripts/manual/run_upstream_mirrored_av_sync.sh @@ -0,0 +1,208 @@ +#!/usr/bin/env bash +# scripts/manual/run_upstream_mirrored_av_sync.sh +# Manual: full mirrored upstream A/V sync probe. +# +# This probe intentionally uses the normal lesavka-client capture path as the +# sender. A local browser stimulus is captured by the real webcam and real mic, +# Lesavka relays those live captures to Theia, and a Tethys browser records the +# Lesavka UVC/UAC devices via getUserMedia before analysis. + +set -euo pipefail + +SCRIPT_DIR="$(cd -- "$(dirname "${BASH_SOURCE[0]}")" >/dev/null 2>&1 && pwd)" +REPO_ROOT="$(cd -- "${SCRIPT_DIR}/../.." 
>/dev/null 2>&1 && pwd)"
+
+# Tunables: every value below can be overridden from the environment.
+LESAVKA_SERVER_ADDR=${LESAVKA_SERVER_ADDR:-auto}
+LESAVKA_SERVER_HOST=${LESAVKA_SERVER_HOST:-theia}
+LESAVKA_SERVER_SCHEME=${LESAVKA_SERVER_SCHEME:-https}
+LESAVKA_SERVER_PORT=${LESAVKA_SERVER_PORT:-50051}
+LESAVKA_TLS_DOMAIN=${LESAVKA_TLS_DOMAIN:-lesavka-server}
+PROBE_DURATION_SECONDS=${PROBE_DURATION_SECONDS:-20}
+PROBE_WARMUP_SECONDS=${PROBE_WARMUP_SECONDS:-4}
+PROBE_PULSE_PERIOD_MS=${PROBE_PULSE_PERIOD_MS:-1000}
+PROBE_PULSE_WIDTH_MS=${PROBE_PULSE_WIDTH_MS:-120}
+PROBE_MARKER_TICK_PERIOD=${PROBE_MARKER_TICK_PERIOD:-5}
+PROBE_EVENT_WIDTH_CODES=${PROBE_EVENT_WIDTH_CODES:-1,2,1,3,2,4,1,1,3,1,4,2,1,2,3,4,1,3,2,2,4,1,2,4,3,1,1,4,2,3,1,2}
+STIMULUS_PORT=${STIMULUS_PORT:-18444}
+STIMULUS_SETTLE_SECONDS=${STIMULUS_SETTLE_SECONDS:-10}
+LOCAL_OUTPUT_DIR=${LOCAL_OUTPUT_DIR:-"${REPO_ROOT}/tmp"}
+SSH_OPTS=${SSH_OPTS:-"-o BatchMode=yes -o ConnectTimeout=5"}
+LOCAL_BROWSER=${LOCAL_BROWSER:-firefox}
+
+# Per-run artifact directory; all logs and status files of this run live here.
+mkdir -p "${LOCAL_OUTPUT_DIR}"
+STAMP="$(date +%Y%m%d-%H%M%S)"
+ARTIFACT_DIR="${LOCAL_OUTPUT_DIR%/}/lesavka-mirrored-av-sync-${STAMP}"
+mkdir -p "${ARTIFACT_DIR}"
+STIMULUS_STATUS="${ARTIFACT_DIR}/stimulus-status.json"
+STIMULUS_PROFILE="${ARTIFACT_DIR}/stimulus-firefox-profile"
+CLIENT_LOG="${ARTIFACT_DIR}/lesavka-client.log"
+MEDIA_CONTROL="${ARTIFACT_DIR}/media.control"
+RESOLVED_LESAVKA_SERVER_ADDR=""
+SERVER_TUNNEL_PID=""
+STIMULUS_PID=""
+STIMULUS_BROWSER_PID=""
+CLIENT_PID=""
+
+# Best-effort teardown of every background process this script started.
+# Runs from the EXIT trap, so errors are ignored (set +e) and output is muted.
+cleanup() {
+  set +e
+  [[ -n "${CLIENT_PID}" ]] && kill "${CLIENT_PID}" >/dev/null 2>&1
+  [[ -n "${STIMULUS_BROWSER_PID}" ]] && kill "${STIMULUS_BROWSER_PID}" >/dev/null 2>&1
+  [[ -n "${STIMULUS_PID}" ]] && kill "${STIMULUS_PID}" >/dev/null 2>&1
+  [[ -n "${SERVER_TUNNEL_PID}" ]] && kill "${SERVER_TUNNEL_PID}" >/dev/null 2>&1
+}
+trap cleanup EXIT
+
+# Print a currently free TCP port on 127.0.0.1 (the kernel picks it via port 0).
+pick_local_port() {
+  python3 - <<'PY'
+import socket
+with socket.socket() as s:
+    s.bind(('127.0.0.1', 0))
+    print(s.getsockname()[1])
+PY
+}
+
+# Poll an HTTP URL with curl until it answers successfully.
+#   $1 = url, $2 = timeout in seconds. Returns 1 on timeout.
+wait_for_url() {
+  local url=$1
+  local timeout_seconds=$2
+  local deadline=$(( $(date +%s) + timeout_seconds ))
+  until curl -fsS "${url}" >/dev/null 2>&1; do
+    if (( $(date +%s) >= deadline )); then
+      echo "Timed out waiting for ${url}" >&2
+      return 1
+    fi
+    sleep 0.2
+  done
+}
+
+# Poll a TCP endpoint until a connection succeeds.
+#   $1 = host, $2 = port, $3 = timeout in seconds. Exits non-zero on timeout.
+wait_for_tcp() {
+  local host=$1
+  local port=$2
+  local timeout_seconds=$3
+  python3 - "$host" "$port" "$timeout_seconds" <<'PY'
+import socket
+import sys
+import time
+
+host = sys.argv[1]
+port = int(sys.argv[2])
+deadline = time.monotonic() + float(sys.argv[3])
+last_error = None
+while time.monotonic() < deadline:
+    try:
+        with socket.create_connection((host, port), timeout=0.5):
+            sys.exit(0)
+    except OSError as exc:
+        last_error = exc
+        time.sleep(0.2)
+
+print(f"Timed out waiting for TCP {host}:{port}: {last_error}", file=sys.stderr)
+sys.exit(1)
+PY
+}
+
+# Resolve RESOLVED_LESAVKA_SERVER_ADDR.
+# An explicit LESAVKA_SERVER_ADDR is used verbatim; "auto" opens an SSH local
+# port forward to the server host and points the client at the forwarded port.
+start_server_tunnel_if_needed() {
+  if [[ "${LESAVKA_SERVER_ADDR}" != "auto" ]]; then
+    RESOLVED_LESAVKA_SERVER_ADDR="${LESAVKA_SERVER_ADDR}"
+    return
+  fi
+  local port
+  # Keep the tunnel log with the rest of this run's artifacts instead of a
+  # fixed /tmp path that concurrent or repeated runs would overwrite.
+  local tunnel_log="${ARTIFACT_DIR}/server-tunnel.log"
+  port="$(pick_local_port)"
+  echo "==> opening SSH tunnel to ${LESAVKA_SERVER_HOST}:127.0.0.1:${LESAVKA_SERVER_PORT} on localhost:${port}"
+  # SSH_OPTS is intentionally unquoted so it splits into separate ssh options.
+  ssh ${SSH_OPTS} -N \
+    -o ExitOnForwardFailure=yes \
+    -L "127.0.0.1:${port}:127.0.0.1:${LESAVKA_SERVER_PORT}" \
+    "${LESAVKA_SERVER_HOST}" >"${tunnel_log}" 2>&1 &
+  SERVER_TUNNEL_PID=$!
+  # Fail explicitly (and point at the ssh log) rather than continuing with a
+  # dead tunnel.
+  if ! wait_for_tcp "127.0.0.1" "${port}" 5; then
+    echo "SSH tunnel to ${LESAVKA_SERVER_HOST} did not come up; see ${tunnel_log}" >&2
+    exit 1
+  fi
+  RESOLVED_LESAVKA_SERVER_ADDR="${LESAVKA_SERVER_SCHEME}://127.0.0.1:${port}"
+  echo "==> resolved Lesavka server addr: ${RESOLVED_LESAVKA_SERVER_ADDR}"
+  echo " ↪ tunneled to ${LESAVKA_SERVER_HOST}:127.0.0.1:${LESAVKA_SERVER_PORT}"
+}
+
+# Start the local stimulus HTTP server, open it in a throwaway browser profile
+# with autoplay allowed, then pause so the operator can aim webcam/microphone.
+start_local_stimulus() {
+  echo "==> starting local A/V stimulus server"
+  python3 "${REPO_ROOT}/scripts/manual/local_av_stimulus.py" \
+    --port "${STIMULUS_PORT}" \
+    --status "${STIMULUS_STATUS}" \
+    --duration-seconds "${PROBE_DURATION_SECONDS}" \
+    --warmup-seconds "${PROBE_WARMUP_SECONDS}" \
+    --pulse-period-ms "${PROBE_PULSE_PERIOD_MS}" \
+    --pulse-width-ms "${PROBE_PULSE_WIDTH_MS}" \
+    --marker-tick-period "${PROBE_MARKER_TICK_PERIOD}" \
+    --event-width-codes "${PROBE_EVENT_WIDTH_CODES}" \
+    >"${ARTIFACT_DIR}/stimulus-server.log" 2>&1 &
+  STIMULUS_PID=$!
+  wait_for_url "http://127.0.0.1:${STIMULUS_PORT}/status" 10
+
+  mkdir -p "${STIMULUS_PROFILE}"
+  cat >"${STIMULUS_PROFILE}/user.js" <<'PREFS'
+user_pref("media.autoplay.default", 0);
+user_pref("media.autoplay.blocking_policy", 0);
+user_pref("toolkit.telemetry.reportingpolicy.firstRun", false);
+user_pref("browser.shell.checkDefaultBrowser", false);
+user_pref("browser.tabs.warnOnClose", false);
+user_pref("browser.startup.page", 1);
+user_pref("browser.aboutwelcome.enabled", false);
+PREFS
+  # The homepage pref embeds the port, so it is appended outside the quoted heredoc.
+  printf 'user_pref("browser.startup.homepage", "http://127.0.0.1:%s/");\n' "${STIMULUS_PORT}" >>"${STIMULUS_PROFILE}/user.js"
+  echo "==> opening local stimulus browser"
+  "${LOCAL_BROWSER}" --new-instance --no-remote --profile "${STIMULUS_PROFILE}" \
+    "http://127.0.0.1:${STIMULUS_PORT}/" \
+    >"${ARTIFACT_DIR}/stimulus-browser.log" 2>&1 &
+  STIMULUS_BROWSER_PID=$!
+
+  echo "==> position check"
+  echo " Point the real webcam at the stimulus window and keep the selected microphone hearing the tone."
+  echo " Waiting ${STIMULUS_SETTLE_SECONDS}s before starting the mirrored capture."
+  sleep "${STIMULUS_SETTLE_SECONDS}"
+}
+
+# Launch the real headless lesavka-client in the background and verify that it
+# is still alive a few seconds later.
+start_real_lesavka_client() {
+  echo "camera=1 microphone=1 audio=0 $(date +%s%N)" >"${MEDIA_CONTROL}"
+  echo "==> starting real headless lesavka-client sender"
+  (
+    cd "${REPO_ROOT}" || exit 1
+    # exec replaces the subshell with the client, so CLIENT_PID is the client
+    # itself: cleanup() then terminates it and the liveness check below probes
+    # the right process instead of a wrapper shell.
+    LESAVKA_HEADLESS=1 \
+      LESAVKA_SERVER_ADDR="${RESOLVED_LESAVKA_SERVER_ADDR}" \
+      LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \
+      LESAVKA_MEDIA_CONTROL="${MEDIA_CONTROL}" \
+      RUST_LOG="${RUST_LOG:-warn,lesavka_client::app=info,lesavka_client::input::camera=info,lesavka_client::input::microphone=info}" \
+      exec "${REPO_ROOT}/target/debug/lesavka-client" --no-launcher --server "${RESOLVED_LESAVKA_SERVER_ADDR}"
+  ) >"${CLIENT_LOG}" 2>&1 &
+  CLIENT_PID=$!
+  sleep 4
+  if ! kill -0 "${CLIENT_PID}" >/dev/null 2>&1; then
+    echo "lesavka-client exited before mirrored probe could start; see ${CLIENT_LOG}" >&2
+    exit 1
+  fi
+}
+
+# Delegate capture and analysis to the browser sync script, handing it a driver
+# command that starts the local stimulus and waits for the probe to finish.
+run_browser_capture_with_real_driver() {
+  local record_seconds=$((PROBE_DURATION_SECONDS + 3))
+  local wait_seconds=$((PROBE_DURATION_SECONDS + 2))
+  local driver_command="curl -fsS -X POST http://127.0.0.1:${STIMULUS_PORT}/start >/dev/null; sleep ${wait_seconds}"
+  echo "==> starting Tethys browser consumer and mirrored driver"
+  BROWSER_RECORD_SECONDS="${record_seconds}" \
+    PROBE_DURATION_SECONDS="${PROBE_DURATION_SECONDS}" \
+    BROWSER_SYNC_DRIVER_COMMAND="${driver_command}" \
+    SYNC_ANALYZE_EVENT_WIDTH_CODES="${PROBE_EVENT_WIDTH_CODES}" \
+    LOCAL_OUTPUT_DIR="${ARTIFACT_DIR}" \
+    LESAVKA_SERVER_ADDR="${RESOLVED_LESAVKA_SERVER_ADDR}" \
+    "${REPO_ROOT}/scripts/manual/run_upstream_browser_av_sync.sh"
+}
+
+echo "==> prebuilding real client and analyzer"
+(
+  cd "${REPO_ROOT}"
+  cargo build -p lesavka_client --bin lesavka-client --bin lesavka-sync-analyze >/dev/null
+)
+
+start_server_tunnel_if_needed
+start_local_stimulus
+start_real_lesavka_client
+run_browser_capture_with_real_driver
+
+echo "==> mirrored probe complete"
+echo "artifact_dir: ${ARTIFACT_DIR}"
+echo "client_log: ${CLIENT_LOG}"
+echo "stimulus_status: ${STIMULUS_STATUS}"
diff --git a/server/Cargo.toml b/server/Cargo.toml
index 5a8e580..bce8f7c 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -10,7 +10,7 @@ bench = false
 
 [package]
 name = "lesavka_server"
-version = "0.16.12"
+version = "0.16.13"
 edition = "2024"
 autobins = false
 
diff --git a/testing/tests/client_manual_sync_script_contract.rs b/testing/tests/client_manual_sync_script_contract.rs
index 68739fb..5a9fbd4 100644
--- a/testing/tests/client_manual_sync_script_contract.rs
+++ b/testing/tests/client_manual_sync_script_contract.rs
@@ -6,6 +6,11 @@
 //! port is not exposed on the public SSH endpoint.
 
 const SYNC_SCRIPT: &str = include_str!("../../scripts/manual/run_upstream_av_sync.sh");
+const BROWSER_SYNC_SCRIPT: &str =
+    include_str!("../../scripts/manual/run_upstream_browser_av_sync.sh");
+const MIRRORED_SYNC_SCRIPT: &str =
+    include_str!("../../scripts/manual/run_upstream_mirrored_av_sync.sh");
+const LOCAL_STIMULUS: &str = include_str!("../../scripts/manual/local_av_stimulus.py");
 
 #[test]
 fn upstream_sync_script_tunnels_auto_server_addr_through_ssh() {
@@ -55,3 +60,65 @@ fn upstream_sync_script_tunnels_auto_server_addr_through_ssh() {
         "auto server resolution should not guess a public gRPC host when SSH is already required"
     );
 }
+
+#[test]
+fn browser_sync_script_can_delegate_to_a_real_path_driver() {
+    for expected in [
+        "BROWSER_RECORD_SECONDS=${BROWSER_RECORD_SECONDS:-${PROBE_DURATION_SECONDS}}",
+        "BROWSER_SYNC_DRIVER_COMMAND=${BROWSER_SYNC_DRIVER_COMMAND:-}",
+        "SYNC_ANALYZE_EVENT_WIDTH_CODES=${SYNC_ANALYZE_EVENT_WIDTH_CODES:-}",
+        "==> running custom browser sync driver",
+        "bash -lc \"${BROWSER_SYNC_DRIVER_COMMAND}\"",
+        "--event-width-codes",
+        "--report-dir \"${LOCAL_REPORT_DIR}\"",
+    ] {
+        assert!(
+            BROWSER_SYNC_SCRIPT.contains(expected),
+            "browser sync script should contain {expected}"
+        );
+    }
+}
+
+#[test]
+fn mirrored_sync_script_uses_real_client_capture_path() {
+    for expected in [
+        "local_av_stimulus.py",
+        "lesavka-client",
+        "LESAVKA_HEADLESS=1",
+        "LESAVKA_MEDIA_CONTROL=\"${MEDIA_CONTROL}\"",
+        "--no-launcher --server \"${RESOLVED_LESAVKA_SERVER_ADDR}\"",
+        "BROWSER_SYNC_DRIVER_COMMAND=\"${driver_command}\"",
+        "SYNC_ANALYZE_EVENT_WIDTH_CODES=\"${PROBE_EVENT_WIDTH_CODES}\"",
+        "run_upstream_browser_av_sync.sh",
+        "Point the real webcam at the stimulus window",
+    ] {
+        assert!(
+            MIRRORED_SYNC_SCRIPT.contains(expected),
+            "mirrored sync script should contain {expected}"
+        );
+    }
+    assert!(
+        !MIRRORED_SYNC_SCRIPT.contains("lesavka-sync-probe"),
+        "mirrored sync must not use the synthetic direct sender"
+    );
+}
+
+#[test]
+fn local_stimulus_matches_sync_analyzer_pulse_contract() {
+    for expected in [
+        "--warmup-seconds",
+        "--pulse-period-ms",
+        "--pulse-width-ms",
+        "--marker-tick-period",
+        "--event-width-codes",
+        "event_width_codes",
+        "widthCode",
+        "oscillator.frequency.value = 880",
+        "Point the real webcam at this window",
+    ] {
+        assert!(
+            LOCAL_STIMULUS.contains(expected),
+            "local stimulus should contain {expected}"
+        );
+    }
+}