release: ship lesavka 0.16.11

This commit is contained in:
Brad Stein 2026-04-30 22:23:29 -03:00
parent c12f5bf50c
commit 0968f5aa8d
16 changed files with 877 additions and 167 deletions

96
AGENTS.md Normal file
View File

@ -0,0 +1,96 @@
# Lesavka Agent Notes
## A/V Sync Probe And Lip-Sync Validation Checklist
Context: Google Meet testing on 2026-04-30 showed audio roughly 8 seconds behind video even though internal client/server telemetry reported fresh uplink packets. Treat this as a product correctness failure, not a calibration issue. Do not resume blind lip-sync tuning until the probe can explain where delay appears.
### Operating Principles
- Avoid hard-resetting USB, UVC, UAC, display managers, or remote hosts unless the user explicitly approves it.
- Prefer observation and reversible user-space probes before changing media pipelines.
- Treat Tethys-only SSH/device inspection as a development luxury, not a production dependency.
- Do not claim lip sync is fixed from internal telemetry alone; require end-to-end device-level evidence.
- Keep this checklist updated as work lands.
### Phase 1: Build The Probe
- [x] Create this tracked checklist in `AGENTS.md`.
- [x] Inventory existing `client/src/sync_probe/` code and decide what can be reused.
- Reuse the existing synthetic beacon in `client/src/sync_probe/`.
- Reuse the existing Tethys capture harness in `scripts/manual/run_upstream_av_sync.sh`.
- Reuse and extend `lesavka-sync-analyze`; current gap is structured evidence output, not capture generation.
- [x] Define the phase-1 output contract:
- [x] `report.json`
- [x] `report.txt`
- [x] per-event rows with event id, video time, audio time, skew, and confidence
- [x] pass/fail verdict using preferred/acceptable/catastrophic thresholds
- [x] Add a deterministic local sync beacon source:
- [x] video flash pattern with event identity or cadence
- [x] simultaneous audio click/beep
- [x] stable event schedule suitable for automated detection
- [x] Add a Tethys-side capture probe:
- [x] capture Lesavka UVC video device
- [x] capture Lesavka UAC microphone device
- [x] record enough raw evidence for debugging when detection fails
- [x] detect video flashes
- [x] detect audio clicks
- [x] pair events and compute skew
- [x] Add a runner that can launch or instruct the Tethys probe safely over SSH without rebooting or restarting the desktop.
- [x] Store probe artifacts under `/tmp/lesavka-sync-probe-*` by default.
- [x] Keep the probe usable without Google Meet first; Google Meet validation is a later application-level check.
### Phase 2: Use Probe To Root-Cause Desync
- [x] Run probe through direct Lesavka UVC/UAC devices on Tethys.
- First live run reached the devices but exposed analyzer/tooling gaps instead of a valid skew report.
- Fixed the manual probe tunnel to preserve HTTPS/mTLS through SSH (`LESAVKA_SERVER_SCHEME=https`, `LESAVKA_TLS_DOMAIN=lesavka-server`).
- Fixed analyzer handling for MJPEG captures whose FFprobe metadata over-reports frames versus decodable video frames.
- [x] Compare client-generated event times against Tethys-observed times.
- The preserved Tethys capture had 323 decodable frames with constant brightness, so no video flash reached UVC.
- Server logs show the probe entered a stale upstream session and dropped audio as ~326 seconds late.
- [x] Identify whether delay appears before server planning, at server UAC sink, at UVC helper, inside Tethys device capture, or inside browser/WebRTC.
- Current root cause is server planning/session lifecycle, before UVC/UAC sink output.
- A previous one-sided microphone session started at 2026-04-30T22:59:52Z; the new probe at 2026-05-01T00:57:08Z inherited its stale playout epoch.
- [x] Add diagnostics for whichever stage is hiding delay.
- Existing server lifecycle/planning logs were enough to isolate this run; next gate should preserve these as structured artifacts.
- [x] Do not tune calibration offsets until gross backlog is ruled out.
- No calibration offsets were changed during the stale-session investigation.
- Current evidence points at lifecycle/session planning, not an offset problem.
### Phase 3: Fix Lesavka With Evidence
- [x] If stale upstream lifecycle is confirmed, reset shared A/V timing anchors when a new stream replaces an existing owner.
- Added a lifecycle guard so normal camera/microphone stream replacement clears stale shared timing anchors before re-pairing.
- Kept soft microphone recovery intentionally separate so it supersedes the mic owner without disturbing an active healthy camera/shared clock.
- Added regression coverage for stale timing-anchor replacement and soft microphone recovery preservation.
- [ ] If UAC sink backlog is confirmed, make UAC output freshness-bounded.
- [ ] If audio progress is marked too early, move/augment progress telemetry to reflect actual sink emission readiness.
- [ ] If UVC and UAC are using incompatible freshness semantics, unify them behind one live-media policy.
- [ ] If browser/WebRTC adds delay after devices are already synced, document the application boundary and add browser-specific mitigation or guidance.
### Phase 4: Gate And Release Criteria
- [x] Add deterministic unit/integration tests for probe analysis logic.
- [x] Add a hardware-in-the-loop/manual gate artifact schema for real Tethys probe runs.
- [x] Update `scripts/ci/media_reliability_gate.sh` to report probe evidence when present.
- Gate now reads `LESAVKA_SYNC_PROBE_REPORT_JSON`, `LESAVKA_SYNC_PROBE_REPORT_DIR`, or `target/media-reliability-gate/sync-probe/report.json`.
- Gate emits sync-probe verdict/check metrics, skew metrics, event counts, and a verdict info metric.
- [x] Require a fresh probe report before declaring lip sync fixed.
- Gate now supports `LESAVKA_REQUIRE_SYNC_PROBE=1`, which fails media reliability when a valid passing probe report is absent.
- Product/release judgment still requires a new live Theia/Tethys probe after the lifecycle fix is installed.
- [ ] Suggested thresholds:
- [x] preferred: p95 skew <= 35 ms
- [x] acceptable: p95 skew <= 80 ms
- [x] gross failure: sustained skew > 250 ms
- [x] catastrophic failure: any sustained skew near or above 1000 ms
### Open Questions
- [x] Decide whether the phase-1 beacon should run as a separate binary, a hidden client mode, or both.
- [x] Decide whether Tethys probe should be Rust-only, shell plus GStreamer, or a hybrid.
- [ ] Confirm whether sudo/Vault access is available for installing missing probe dependencies on Theia/Tethys.
- Non-sudo server journal inspection worked; noninteractive sudo over SSH still needs an explicit TTY/password path.
### Validation Evidence
- [x] `cargo test -p lesavka_server upstream_media_runtime::tests::lifecycle`
- [x] `cargo test -p lesavka_client sync_probe::analyze`
- [x] `cargo test -p lesavka_testing upstream_sync_script_tunnels_auto_server_addr_through_ssh`
- [x] `bash -n scripts/ci/media_reliability_gate.sh`
- [x] `cargo test -p lesavka_testing media_reliability_gate_reports_direct_sync_probe_evidence`
- [x] `LESAVKA_REQUIRE_SYNC_PROBE=1 ./scripts/ci/media_reliability_gate.sh`
- Used a synthetic passing report at `target/media-reliability-gate/sync-probe/report.json` to verify gate parsing/enforcement.
- This validates CI glue only; a real Theia/Tethys probe is still required for product judgment.

6
Cargo.lock generated
View File

@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]] [[package]]
name = "lesavka_client" name = "lesavka_client"
version = "0.16.9" version = "0.16.11"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-stream", "async-stream",
@ -1686,7 +1686,7 @@ dependencies = [
[[package]] [[package]]
name = "lesavka_common" name = "lesavka_common"
version = "0.16.9" version = "0.16.11"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",
@ -1698,7 +1698,7 @@ dependencies = [
[[package]] [[package]]
name = "lesavka_server" name = "lesavka_server"
version = "0.16.9" version = "0.16.11"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package] [package]
name = "lesavka_client" name = "lesavka_client"
version = "0.16.9" version = "0.16.11"
edition = "2024" edition = "2024"
[dependencies] [dependencies]

View File

@ -2,10 +2,13 @@
use anyhow::{Context, Result, bail}; use anyhow::{Context, Result, bail};
#[cfg(not(coverage))] #[cfg(not(coverage))]
use serde::Serialize; use serde::Serialize;
#[cfg(any(not(coverage), test))]
use std::path::PathBuf;
#[cfg(not(coverage))] #[cfg(not(coverage))]
use lesavka_client::sync_probe::analyze::{ use lesavka_client::sync_probe::analyze::{
SyncAnalysisOptions, SyncAnalysisReport, SyncCalibrationRecommendation, analyze_capture, SyncAnalysisOptions, SyncAnalysisReport, SyncAnalysisVerdict, SyncCalibrationRecommendation,
analyze_capture,
}; };
#[cfg(not(coverage))] #[cfg(not(coverage))]
@ -14,80 +17,173 @@ struct SyncAnalyzeOutput<'a> {
#[serde(flatten)] #[serde(flatten)]
report: &'a SyncAnalysisReport, report: &'a SyncAnalysisReport,
calibration: SyncCalibrationRecommendation, calibration: SyncCalibrationRecommendation,
verdict: SyncAnalysisVerdict,
} }
#[cfg(not(coverage))] #[cfg(not(coverage))]
fn main() -> Result<()> { fn main() -> Result<()> {
let (capture_path, emit_json) = parse_args(std::env::args().skip(1))?; let args = parse_args(std::env::args().skip(1))?;
let report = analyze_capture(&capture_path, &SyncAnalysisOptions::default()) let report = analyze_capture(&args.capture_path, &SyncAnalysisOptions::default())
.with_context(|| format!("analyzing sync capture {}", capture_path.display()))?; .with_context(|| format!("analyzing sync capture {}", args.capture_path.display()))?;
let calibration = report.calibration_recommendation(); let calibration = report.calibration_recommendation();
let verdict = report.verdict();
if emit_json { let human_report = format_human_report(&args.capture_path, &report, &calibration, &verdict);
let output = SyncAnalyzeOutput { let output = SyncAnalyzeOutput {
report: &report, report: &report,
calibration, calibration,
verdict,
}; };
if let Some(report_dir) = &args.report_dir {
write_report_dir(report_dir, &human_report, &output)?;
}
if args.emit_json {
println!( println!(
"{}", "{}",
serde_json::to_string_pretty(&output).context("serializing JSON report")? serde_json::to_string_pretty(&output).context("serializing JSON report")?
); );
} else { } else {
println!("A/V sync report for {}", capture_path.display()); print!("{human_report}");
println!("- video onsets: {}", report.video_event_count);
println!("- audio onsets: {}", report.audio_event_count);
println!("- paired pulses: {}", report.paired_event_count);
println!(
"- first skew: {:+.1} ms (audio after video is positive)",
report.first_skew_ms
);
println!("- last skew: {:+.1} ms", report.last_skew_ms);
println!("- mean skew: {:+.1} ms", report.mean_skew_ms);
println!("- median skew: {:+.1} ms", report.median_skew_ms);
println!("- max abs skew: {:.1} ms", report.max_abs_skew_ms);
println!("- drift: {:+.1} ms", report.drift_ms);
println!("- calibration ready: {}", calibration.ready);
println!(
"- recommended audio offset adjust: {:+} us",
calibration.recommended_audio_offset_adjust_us
);
println!(
"- alternative video offset adjust: {:+} us",
calibration.recommended_video_offset_adjust_us
);
println!("- calibration note: {}", calibration.note);
} }
Ok(()) Ok(())
} }
#[cfg(any(not(coverage), test))] #[cfg(any(not(coverage), test))]
fn parse_args<I, S>(args: I) -> Result<(std::path::PathBuf, bool)> #[derive(Debug, PartialEq, Eq)]
struct AnalyzeArgs {
capture_path: PathBuf,
emit_json: bool,
report_dir: Option<PathBuf>,
}
#[cfg(any(not(coverage), test))]
fn parse_args<I, S>(args: I) -> Result<AnalyzeArgs>
where where
I: IntoIterator<Item = S>, I: IntoIterator<Item = S>,
S: Into<String>, S: Into<String>,
{ {
let args = args.into_iter().map(Into::into).collect::<Vec<_>>(); let args = args.into_iter().map(Into::into).collect::<Vec<_>>();
if args.is_empty() || args.iter().any(|arg| arg == "--help" || arg == "-h") { if args.is_empty() || args.iter().any(|arg| arg == "--help" || arg == "-h") {
println!("Usage: lesavka-sync-analyze <capture.mkv> [--json]"); println!("Usage: lesavka-sync-analyze <capture.mkv> [--json] [--report-dir <dir>]");
std::process::exit(0); std::process::exit(0);
} }
let mut emit_json = false; let mut emit_json = false;
let mut capture_path = None::<std::path::PathBuf>; let mut report_dir = None::<PathBuf>;
for arg in args { let mut capture_path = None::<PathBuf>;
let mut iter = args.into_iter();
while let Some(arg) = iter.next() {
if arg == "--json" { if arg == "--json" {
emit_json = true; emit_json = true;
continue; continue;
} }
if arg == "--report-dir" {
let Some(dir) = iter.next() else {
bail!("--report-dir requires a directory");
};
report_dir = Some(PathBuf::from(dir));
continue;
}
if let Some(dir) = arg.strip_prefix("--report-dir=") {
if dir.is_empty() {
bail!("--report-dir requires a directory");
}
report_dir = Some(PathBuf::from(dir));
continue;
}
if capture_path.is_some() { if capture_path.is_some() {
bail!("unexpected extra argument `{arg}`"); bail!("unexpected extra argument `{arg}`");
} }
capture_path = Some(std::path::PathBuf::from(arg)); capture_path = Some(PathBuf::from(arg));
} }
let capture_path = capture_path.context("capture path is required")?; let capture_path = capture_path.context("capture path is required")?;
Ok((capture_path, emit_json)) Ok(AnalyzeArgs {
capture_path,
emit_json,
report_dir,
})
}
#[cfg(not(coverage))]
fn format_human_report(
capture_path: &std::path::Path,
report: &SyncAnalysisReport,
calibration: &SyncCalibrationRecommendation,
verdict: &SyncAnalysisVerdict,
) -> String {
format!(
"\
A/V sync report for {capture}
- verdict: {status} ({passed})
- verdict reason: {reason}
- p95 abs skew: {p95:.1} ms
- video onsets: {video_events}
- audio onsets: {audio_events}
- paired pulses: {paired_events}
- first skew: {first_skew:+.1} ms (audio after video is positive)
- last skew: {last_skew:+.1} ms
- mean skew: {mean_skew:+.1} ms
- median skew: {median_skew:+.1} ms
- max abs skew: {max_abs:.1} ms
- drift: {drift:+.1} ms
- calibration ready: {cal_ready}
- recommended audio offset adjust: {audio_adjust:+} us
- alternative video offset adjust: {video_adjust:+} us
- calibration note: {cal_note}
",
capture = capture_path.display(),
status = verdict.status,
passed = if verdict.passed { "pass" } else { "fail" },
reason = verdict.reason,
p95 = verdict.p95_abs_skew_ms,
video_events = report.video_event_count,
audio_events = report.audio_event_count,
paired_events = report.paired_event_count,
first_skew = report.first_skew_ms,
last_skew = report.last_skew_ms,
mean_skew = report.mean_skew_ms,
median_skew = report.median_skew_ms,
max_abs = report.max_abs_skew_ms,
drift = report.drift_ms,
cal_ready = calibration.ready,
audio_adjust = calibration.recommended_audio_offset_adjust_us,
video_adjust = calibration.recommended_video_offset_adjust_us,
cal_note = calibration.note,
)
}
#[cfg(not(coverage))]
fn write_report_dir(
report_dir: &std::path::Path,
human_report: &str,
output: &SyncAnalyzeOutput<'_>,
) -> Result<()> {
std::fs::create_dir_all(report_dir)
.with_context(|| format!("creating report directory {}", report_dir.display()))?;
std::fs::write(report_dir.join("report.txt"), human_report)
.with_context(|| format!("writing {}", report_dir.join("report.txt").display()))?;
std::fs::write(
report_dir.join("report.json"),
serde_json::to_string_pretty(output).context("serializing JSON report")?,
)
.with_context(|| format!("writing {}", report_dir.join("report.json").display()))?;
write_events_csv(&report_dir.join("events.csv"), output.report)?;
Ok(())
}
#[cfg(not(coverage))]
fn write_events_csv(path: &std::path::Path, report: &SyncAnalysisReport) -> Result<()> {
let mut csv = String::from("event_id,video_time_s,audio_time_s,skew_ms,confidence\n");
for event in &report.paired_events {
csv.push_str(&format!(
"{},{:.9},{:.9},{:.6},{:.6}\n",
event.event_id, event.video_time_s, event.audio_time_s, event.skew_ms, event.confidence
));
}
std::fs::write(path, csv).with_context(|| format!("writing {}", path.display()))
} }
#[cfg(coverage)] #[cfg(coverage)]
@ -99,9 +195,20 @@ mod tests {
#[test] #[test]
fn parse_args_accepts_capture_path_and_json_flag() { fn parse_args_accepts_capture_path_and_json_flag() {
let (path, json) = parse_args(["capture.mkv", "--json"]).expect("args"); let args = parse_args(["capture.mkv", "--json"]).expect("args");
assert_eq!(path, std::path::PathBuf::from("capture.mkv")); assert_eq!(args.capture_path, std::path::PathBuf::from("capture.mkv"));
assert!(json); assert!(args.emit_json);
assert_eq!(args.report_dir, None);
}
#[test]
fn parse_args_accepts_report_dir() {
let args = parse_args(["capture.mkv", "--report-dir", "/tmp/probe"]).expect("args");
assert_eq!(args.capture_path, std::path::PathBuf::from("capture.mkv"));
assert_eq!(
args.report_dir,
Some(std::path::PathBuf::from("/tmp/probe"))
);
} }
#[test] #[test]

View File

@ -6,7 +6,7 @@ mod report;
#[cfg(test)] #[cfg(test)]
pub(super) mod test_support; pub(super) mod test_support;
use anyhow::Result; use anyhow::{Result, bail};
use std::path::Path; use std::path::Path;
use media_extract::{extract_audio_samples, extract_video_brightness, extract_video_timestamps}; use media_extract::{extract_audio_samples, extract_video_brightness, extract_video_timestamps};
@ -15,7 +15,10 @@ use onset_detection::{
}; };
pub use onset_detection::{detect_audio_onsets, detect_video_onsets}; pub use onset_detection::{detect_audio_onsets, detect_video_onsets};
pub use report::{SyncAnalysisOptions, SyncAnalysisReport, SyncCalibrationRecommendation}; pub use report::{
SyncAnalysisOptions, SyncAnalysisReport, SyncAnalysisVerdict, SyncCalibrationRecommendation,
SyncEventPair,
};
/// Analyzes a captured upstream sync-probe file by extracting video and audio /// Analyzes a captured upstream sync-probe file by extracting video and audio
/// pulses, then correlating them into skew and drift metrics. /// pulses, then correlating them into skew and drift metrics.
@ -23,8 +26,9 @@ pub fn analyze_capture(
capture_path: &Path, capture_path: &Path,
options: &SyncAnalysisOptions, options: &SyncAnalysisOptions,
) -> Result<SyncAnalysisReport> { ) -> Result<SyncAnalysisReport> {
let timestamps = extract_video_timestamps(capture_path)?; let raw_timestamps = extract_video_timestamps(capture_path)?;
let brightness = extract_video_brightness(capture_path, timestamps.len())?; let brightness = extract_video_brightness(capture_path)?;
let timestamps = reconcile_video_timestamps(raw_timestamps, brightness.len())?;
let video_segments = detect_video_segments(&timestamps, &brightness)?; let video_segments = detect_video_segments(&timestamps, &brightness)?;
let audio_samples = extract_audio_samples(capture_path)?; let audio_samples = extract_audio_samples(capture_path)?;
@ -44,6 +48,36 @@ pub fn analyze_capture(
) )
} }
fn reconcile_video_timestamps(timestamps: Vec<f64>, frame_count: usize) -> Result<Vec<f64>> {
if frame_count == 0 {
bail!("capture did not contain any decoded video brightness frames");
}
if timestamps.len() == frame_count {
return Ok(timestamps);
}
let first = timestamps.first().copied();
let last = timestamps.last().copied();
if let (Some(first), Some(last)) = (first, last)
&& frame_count > 1
&& last > first
{
let step = (last - first) / (frame_count - 1) as f64;
return Ok((0..frame_count)
.map(|index| first + index as f64 * step)
.collect());
}
if timestamps.len() > frame_count {
return Ok(timestamps.into_iter().take(frame_count).collect());
}
bail!(
"ffprobe returned {} video timestamps for {frame_count} decoded brightness frames and no usable duration",
timestamps.len()
)
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::test_support::{ use super::test_support::{
@ -51,6 +85,7 @@ mod tests {
with_fake_media_tools, with_fake_media_tools,
}; };
use super::{SyncAnalysisOptions, analyze_capture}; use super::{SyncAnalysisOptions, analyze_capture};
use crate::sync_probe::analyze::reconcile_video_timestamps;
#[test] #[test]
fn analyze_capture_runs_against_fake_media_tools() { fn analyze_capture_runs_against_fake_media_tools() {
@ -84,4 +119,42 @@ mod tests {
}, },
); );
} }
#[test]
fn analyze_capture_synthesizes_timestamps_when_mjpeg_metadata_overreports_frames() {
let metadata_timestamps = (0..301)
.map(|index| index as f64 * 0.004)
.collect::<Vec<_>>();
let brightness = (0..25)
.map(|index| if matches!(index, 0 | 10 | 20) { 250 } else { 5 })
.collect::<Vec<_>>();
let audio = click_track_samples(&[0.05, 0.55, 1.05], 67_000);
with_fake_media_tools(
&frame_json(&metadata_timestamps),
&thumbnail_video_bytes(&brightness),
&audio_samples_to_bytes(&audio),
|capture_path| {
let report = analyze_capture(
capture_path,
&SyncAnalysisOptions {
pulse_period_s: 0.5,
..SyncAnalysisOptions::default()
},
)
.expect("analysis report");
assert_eq!(report.video_event_count, 3);
assert_eq!(report.audio_event_count, 3);
assert_eq!(report.paired_event_count, 3);
assert!(report.max_abs_skew_ms < 120.0);
},
);
}
#[test]
fn reconcile_video_timestamps_resamples_metadata_span_to_decoded_frame_count() {
let reconciled = reconcile_video_timestamps(vec![0.0, 0.004, 0.008, 1.0], 3)
.expect("reconciled timestamps");
assert_eq!(reconciled, vec![0.0, 0.5, 1.0]);
}
} }

View File

@ -46,13 +46,7 @@ pub(super) fn extract_video_timestamps(capture_path: &Path) -> Result<Vec<f64>>
Ok(timestamps) Ok(timestamps)
} }
pub(super) fn extract_video_brightness( pub(super) fn extract_video_brightness(capture_path: &Path) -> Result<Vec<u8>> {
capture_path: &Path,
expected_frames: usize,
) -> Result<Vec<u8>> {
if expected_frames == 0 {
bail!("expected at least one video frame when extracting brightness");
}
let output = run_command( let output = run_command(
Command::new("ffmpeg") Command::new("ffmpeg")
.arg("-hide_banner") .arg("-hide_banner")
@ -87,15 +81,10 @@ pub(super) fn extract_video_brightness(
); );
} }
let extracted_frames = output.len() / frame_pixels; let extracted_frames = output.len() / frame_pixels;
if extracted_frames < expected_frames {
bail!(
"ffmpeg emitted only {extracted_frames} brightness frames for {expected_frames} expected timestamps"
);
}
Ok(output Ok(output
.chunks_exact(frame_pixels) .chunks_exact(frame_pixels)
.take(expected_frames) .take(extracted_frames)
.map(summarize_frame_brightness) .map(summarize_frame_brightness)
.collect()) .collect())
} }
@ -201,8 +190,8 @@ mod tests {
&thumbnail_video_bytes(&brightness), &thumbnail_video_bytes(&brightness),
&[1, 0], &[1, 0],
|capture_path| { |capture_path| {
let parsed = extract_video_brightness(capture_path, 1).expect("video brightness"); let parsed = extract_video_brightness(capture_path).expect("video brightness");
assert_eq!(parsed, vec![16]); assert_eq!(parsed, vec![16, 40, 77]);
}, },
); );
} }
@ -214,8 +203,7 @@ mod tests {
&[], &[],
&[1, 0], &[1, 0],
|capture_path| { |capture_path| {
let error = let error = extract_video_brightness(capture_path).expect_err("empty brightness");
extract_video_brightness(capture_path, 1).expect_err("empty brightness");
assert!( assert!(
error error
.to_string() .to_string()
@ -233,7 +221,7 @@ mod tests {
&thumbnail_video_bytes(&brightness), &thumbnail_video_bytes(&brightness),
&[1, 0], &[1, 0],
|capture_path| { |capture_path| {
let parsed = extract_video_brightness(capture_path, 3).expect("video brightness"); let parsed = extract_video_brightness(capture_path).expect("video brightness");
assert_eq!(parsed, vec![20, 26, 20]); assert_eq!(parsed, vec![20, 26, 20]);
}, },
); );
@ -242,8 +230,7 @@ mod tests {
#[test] #[test]
fn extract_video_brightness_rejects_truncated_frame_data() { fn extract_video_brightness_rejects_truncated_frame_data() {
with_fake_media_tools(&frame_json(&[0.0]), &[1, 2, 3], &[1, 0], |capture_path| { with_fake_media_tools(&frame_json(&[0.0]), &[1, 2, 3], &[1, 0], |capture_path| {
let error = let error = extract_video_brightness(capture_path).expect_err("truncated frame bytes");
extract_video_brightness(capture_path, 1).expect_err("truncated frame bytes");
assert!(error.to_string().contains("not divisible")); assert!(error.to_string().contains("not divisible"));
}); });
} }

View File

@ -1,7 +1,7 @@
use anyhow::{Result, bail}; use anyhow::{Result, bail};
use std::collections::BTreeMap; use std::collections::BTreeMap;
use crate::sync_probe::analyze::report::SyncAnalysisReport; use crate::sync_probe::analyze::report::{SyncAnalysisReport, SyncEventPair};
use super::{PulseSegment, median}; use super::{PulseSegment, median};
@ -39,7 +39,7 @@ pub(super) fn correlate_onsets(
let video_pulses = index_onsets_by_spacing(video_onsets_s, pulse_period_s); let video_pulses = index_onsets_by_spacing(video_onsets_s, pulse_period_s);
let audio_pulses = index_onsets_by_spacing(audio_onsets_s, pulse_period_s); let audio_pulses = index_onsets_by_spacing(audio_onsets_s, pulse_period_s);
let offset_candidates = candidate_index_offsets(&video_pulses, &audio_pulses); let offset_candidates = candidate_index_offsets(&video_pulses, &audio_pulses);
let mut skews_ms = best_skews_for_index_offsets( let mut pairs = best_pairs_for_index_offsets(
&video_pulses, &video_pulses,
&audio_pulses, &audio_pulses,
&offset_candidates, &offset_candidates,
@ -47,24 +47,29 @@ pub(super) fn correlate_onsets(
expected_start_skew_ms, expected_start_skew_ms,
); );
if skews_ms.is_empty() && video_onsets_s.len() == 1 && audio_onsets_s.len() == 1 { if pairs.is_empty() && video_onsets_s.len() == 1 && audio_onsets_s.len() == 1 {
let video_phase_s = estimate_phase(video_onsets_s, pulse_period_s); let video_phase_s = estimate_phase(video_onsets_s, pulse_period_s);
let audio_phase_s = estimate_phase(audio_onsets_s, pulse_period_s); let audio_phase_s = estimate_phase(audio_onsets_s, pulse_period_s);
let phase_skew_ms = let phase_skew_ms =
shortest_wrapped_difference(audio_phase_s - video_phase_s, pulse_period_s) * 1000.0; shortest_wrapped_difference(audio_phase_s - video_phase_s, pulse_period_s) * 1000.0;
if phase_skew_ms.abs() <= max_pair_gap_s * 1000.0 { if phase_skew_ms.abs() <= max_pair_gap_s * 1000.0 {
skews_ms.push(phase_skew_ms); pairs.push(MatchedOnsetPair::new(
video_onsets_s[0],
audio_onsets_s[0],
phase_skew_ms,
max_pair_gap_s,
));
} }
} }
if skews_ms.is_empty() { if pairs.is_empty() {
bail!("no audio/video pulse pairs were close enough to compare"); bail!("no audio/video pulse pairs were close enough to compare");
} }
Ok(sync_report_from_skews( Ok(sync_report_from_pairs(
common_window.filter_onsets(video_onsets_s), common_window.filter_onsets(video_onsets_s),
common_window.filter_onsets(audio_onsets_s), common_window.filter_onsets(audio_onsets_s),
skews_ms, pairs,
)) ))
} }
@ -131,7 +136,7 @@ pub(crate) fn correlate_segments(
video_marker_onsets, video_marker_onsets,
audio_marker_onsets, audio_marker_onsets,
); );
let mut skews_ms = best_skews_for_index_offsets( let mut pairs = best_pairs_for_index_offsets(
&video_indexed, &video_indexed,
&audio_indexed, &audio_indexed,
&offset_candidates, &offset_candidates,
@ -139,24 +144,29 @@ pub(crate) fn correlate_segments(
expected_start_skew_ms, expected_start_skew_ms,
); );
if skews_ms.is_empty() && video_onsets_s.len() == 1 && audio_onsets_s.len() == 1 { if pairs.is_empty() && video_onsets_s.len() == 1 && audio_onsets_s.len() == 1 {
let video_phase_s = estimate_phase(video_onsets_s, pulse_period_s); let video_phase_s = estimate_phase(video_onsets_s, pulse_period_s);
let audio_phase_s = estimate_phase(audio_onsets_s, pulse_period_s); let audio_phase_s = estimate_phase(audio_onsets_s, pulse_period_s);
let phase_skew_ms = let phase_skew_ms =
shortest_wrapped_difference(audio_phase_s - video_phase_s, pulse_period_s) * 1000.0; shortest_wrapped_difference(audio_phase_s - video_phase_s, pulse_period_s) * 1000.0;
if phase_skew_ms.abs() <= max_pair_gap_s * 1000.0 { if phase_skew_ms.abs() <= max_pair_gap_s * 1000.0 {
skews_ms.push(phase_skew_ms); pairs.push(MatchedOnsetPair::new(
video_onsets_s[0],
audio_onsets_s[0],
phase_skew_ms,
max_pair_gap_s,
));
} }
} }
if skews_ms.is_empty() { if pairs.is_empty() {
bail!("no audio/video pulse pairs were close enough to compare"); bail!("no audio/video pulse pairs were close enough to compare");
} }
Ok(sync_report_from_skews( Ok(sync_report_from_pairs(
video_onsets_s, video_onsets_s,
audio_onsets_s, audio_onsets_s,
skews_ms, pairs,
)) ))
} }
@ -318,35 +328,63 @@ fn pulse_indices_for_onsets(indexed: &BTreeMap<i64, f64>, marker_onsets: &[f64])
.collect() .collect()
} }
fn best_skews_for_index_offsets( #[derive(Clone, Debug, PartialEq)]
struct MatchedOnsetPair {
video_time_s: f64,
audio_time_s: f64,
skew_ms: f64,
confidence: f64,
}
impl MatchedOnsetPair {
fn new(video_time_s: f64, audio_time_s: f64, skew_ms: f64, max_pair_gap_s: f64) -> Self {
let max_pair_gap_ms = max_pair_gap_s * 1000.0;
let confidence = if max_pair_gap_ms <= 0.0 {
0.0
} else {
(1.0 - (skew_ms.abs() / max_pair_gap_ms)).clamp(0.0, 1.0)
};
Self {
video_time_s,
audio_time_s,
skew_ms,
confidence,
}
}
}
fn best_pairs_for_index_offsets(
video_indexed: &BTreeMap<i64, f64>, video_indexed: &BTreeMap<i64, f64>,
audio_indexed: &BTreeMap<i64, f64>, audio_indexed: &BTreeMap<i64, f64>,
offset_candidates: &[i64], offset_candidates: &[i64],
max_pair_gap_s: f64, max_pair_gap_s: f64,
expected_start_skew_ms: f64, expected_start_skew_ms: f64,
) -> Vec<f64> { ) -> Vec<MatchedOnsetPair> {
let max_pair_gap_ms = max_pair_gap_s * 1000.0; let max_pair_gap_ms = max_pair_gap_s * 1000.0;
let startup_phase_anchor_tolerance_ms = let startup_phase_anchor_tolerance_ms =
max_pair_gap_ms * STARTUP_PHASE_ANCHOR_TOLERANCE_FRACTION; max_pair_gap_ms * STARTUP_PHASE_ANCHOR_TOLERANCE_FRACTION;
let mut best: Option<(bool, usize, f64, f64, Vec<f64>)> = None; let mut best: Option<(bool, usize, f64, f64, Vec<MatchedOnsetPair>)> = None;
for offset in offset_candidates.iter().copied() { for offset in offset_candidates.iter().copied() {
let skews_ms = video_indexed let pairs = video_indexed
.iter() .iter()
.filter_map(|(pulse_index, video_time)| { .filter_map(|(pulse_index, video_time)| {
audio_indexed audio_indexed
.get(&(pulse_index + offset)) .get(&(pulse_index + offset))
.map(|audio_time| (audio_time - video_time) * 1000.0) .map(|audio_time| {
let skew_ms = (audio_time - video_time) * 1000.0;
MatchedOnsetPair::new(*video_time, *audio_time, skew_ms, max_pair_gap_s)
}) })
.filter(|skew_ms| skew_ms.abs() <= max_pair_gap_ms) })
.filter(|pair| pair.skew_ms.abs() <= max_pair_gap_ms)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
if skews_ms.is_empty() { if pairs.is_empty() {
continue; continue;
} }
let mean_abs_skew_ms = let mean_abs_skew_ms =
skews_ms.iter().map(|skew_ms| skew_ms.abs()).sum::<f64>() / skews_ms.len() as f64; pairs.iter().map(|pair| pair.skew_ms.abs()).sum::<f64>() / pairs.len() as f64;
let startup_phase_anchor_error_ms = (skews_ms[0] - expected_start_skew_ms).abs(); let startup_phase_anchor_error_ms = (pairs[0].skew_ms - expected_start_skew_ms).abs();
let startup_phase_anchor_consistent = let startup_phase_anchor_consistent =
startup_phase_anchor_error_ms <= startup_phase_anchor_tolerance_ms; startup_phase_anchor_error_ms <= startup_phase_anchor_tolerance_ms;
match &best { match &best {
@ -358,24 +396,24 @@ fn best_skews_for_index_offsets(
_, _,
)) if (!startup_phase_anchor_consistent && *best_anchor_consistent) )) if (!startup_phase_anchor_consistent && *best_anchor_consistent)
|| (startup_phase_anchor_consistent == *best_anchor_consistent || (startup_phase_anchor_consistent == *best_anchor_consistent
&& (skews_ms.len() < *best_count && (pairs.len() < *best_count
|| (skews_ms.len() == *best_count || (pairs.len() == *best_count
&& (startup_phase_anchor_error_ms > *best_anchor_error_ms && (startup_phase_anchor_error_ms > *best_anchor_error_ms
|| (startup_phase_anchor_error_ms == *best_anchor_error_ms || (startup_phase_anchor_error_ms == *best_anchor_error_ms
&& mean_abs_skew_ms >= *best_mean_abs_skew_ms))))) => {} && mean_abs_skew_ms >= *best_mean_abs_skew_ms))))) => {}
_ => { _ => {
best = Some(( best = Some((
startup_phase_anchor_consistent, startup_phase_anchor_consistent,
skews_ms.len(), pairs.len(),
startup_phase_anchor_error_ms, startup_phase_anchor_error_ms,
mean_abs_skew_ms, mean_abs_skew_ms,
skews_ms, pairs,
)) ))
} }
} }
} }
best.map(|(_, _, _, _, skews)| skews).unwrap_or_default() best.map(|(_, _, _, _, pairs)| pairs).unwrap_or_default()
} }
pub(super) fn marker_onsets(segments: &[PulseSegment], pulse_width_s: f64) -> Vec<f64> { pub(super) fn marker_onsets(segments: &[PulseSegment], pulse_width_s: f64) -> Vec<f64> {
@ -392,11 +430,26 @@ pub(super) fn shortest_wrapped_difference(delta_s: f64, pulse_period_s: f64) ->
((delta_s + half_period).rem_euclid(pulse_period_s)) - half_period ((delta_s + half_period).rem_euclid(pulse_period_s)) - half_period
} }
fn sync_report_from_skews( fn sync_report_from_pairs(
video_onsets_s: &[f64], video_onsets_s: &[f64],
audio_onsets_s: &[f64], audio_onsets_s: &[f64],
skews_ms: Vec<f64>, pairs: Vec<MatchedOnsetPair>,
) -> SyncAnalysisReport { ) -> SyncAnalysisReport {
let paired_events = pairs
.iter()
.enumerate()
.map(|(event_id, pair)| SyncEventPair {
event_id,
video_time_s: pair.video_time_s,
audio_time_s: pair.audio_time_s,
skew_ms: pair.skew_ms,
confidence: pair.confidence,
})
.collect::<Vec<_>>();
let skews_ms = paired_events
.iter()
.map(|event| event.skew_ms)
.collect::<Vec<_>>();
let mut sorted_skews = skews_ms.clone(); let mut sorted_skews = skews_ms.clone();
sorted_skews.sort_by(|left, right| left.total_cmp(right)); sorted_skews.sort_by(|left, right| left.total_cmp(right));
let first_skew_ms = *skews_ms.first().expect("paired skew list is not empty"); let first_skew_ms = *skews_ms.first().expect("paired skew list is not empty");
@ -422,5 +475,6 @@ fn sync_report_from_skews(
skews_ms, skews_ms,
video_onsets_s: video_onsets_s.to_vec(), video_onsets_s: video_onsets_s.to_vec(),
audio_onsets_s: audio_onsets_s.to_vec(), audio_onsets_s: audio_onsets_s.to_vec(),
paired_events,
} }
} }

View File

@ -8,6 +8,11 @@ const DEFAULT_MARKER_TICK_PERIOD: u32 = 5;
const CALIBRATION_MIN_PAIRED_EVENTS: usize = 8; const CALIBRATION_MIN_PAIRED_EVENTS: usize = 8;
const CALIBRATION_MAX_DRIFT_MS: f64 = 40.0; const CALIBRATION_MAX_DRIFT_MS: f64 = 40.0;
const CALIBRATION_SETTLED_SKEW_MS: f64 = 5.0; const CALIBRATION_SETTLED_SKEW_MS: f64 = 5.0;
const VERDICT_MIN_PAIRED_EVENTS: usize = 3;
const VERDICT_PREFERRED_P95_ABS_SKEW_MS: f64 = 35.0;
const VERDICT_ACCEPTABLE_P95_ABS_SKEW_MS: f64 = 80.0;
const VERDICT_GROSS_FAILURE_P95_ABS_SKEW_MS: f64 = 250.0;
const VERDICT_CATASTROPHIC_MAX_ABS_SKEW_MS: f64 = 1_000.0;
#[derive(Clone, Debug, PartialEq, Serialize)] #[derive(Clone, Debug, PartialEq, Serialize)]
pub struct SyncAnalysisReport { pub struct SyncAnalysisReport {
@ -23,6 +28,29 @@ pub struct SyncAnalysisReport {
pub skews_ms: Vec<f64>, pub skews_ms: Vec<f64>,
pub video_onsets_s: Vec<f64>, pub video_onsets_s: Vec<f64>,
pub audio_onsets_s: Vec<f64>, pub audio_onsets_s: Vec<f64>,
pub paired_events: Vec<SyncEventPair>,
}
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct SyncEventPair {
pub event_id: usize,
pub video_time_s: f64,
pub audio_time_s: f64,
pub skew_ms: f64,
pub confidence: f64,
}
#[derive(Clone, Debug, PartialEq, Serialize)]
pub struct SyncAnalysisVerdict {
pub status: String,
pub passed: bool,
pub p95_abs_skew_ms: f64,
pub max_abs_skew_ms: f64,
pub preferred_p95_abs_skew_ms: f64,
pub acceptable_p95_abs_skew_ms: f64,
pub gross_failure_p95_abs_skew_ms: f64,
pub catastrophic_max_abs_skew_ms: f64,
pub reason: String,
} }
#[derive(Clone, Debug, PartialEq, Eq, Serialize)] #[derive(Clone, Debug, PartialEq, Eq, Serialize)]
@ -34,6 +62,88 @@ pub struct SyncCalibrationRecommendation {
} }
impl SyncAnalysisReport { impl SyncAnalysisReport {
#[must_use]
pub fn verdict(&self) -> SyncAnalysisVerdict {
let p95_abs_skew_ms = percentile_abs(&self.skews_ms, 0.95);
let base = SyncAnalysisVerdict {
status: String::new(),
passed: false,
p95_abs_skew_ms,
max_abs_skew_ms: self.max_abs_skew_ms,
preferred_p95_abs_skew_ms: VERDICT_PREFERRED_P95_ABS_SKEW_MS,
acceptable_p95_abs_skew_ms: VERDICT_ACCEPTABLE_P95_ABS_SKEW_MS,
gross_failure_p95_abs_skew_ms: VERDICT_GROSS_FAILURE_P95_ABS_SKEW_MS,
catastrophic_max_abs_skew_ms: VERDICT_CATASTROPHIC_MAX_ABS_SKEW_MS,
reason: String::new(),
};
if self.paired_event_count < VERDICT_MIN_PAIRED_EVENTS {
return SyncAnalysisVerdict {
status: "insufficient_data".to_string(),
reason: format!(
"need at least {VERDICT_MIN_PAIRED_EVENTS} paired events; saw {}",
self.paired_event_count
),
..base
};
}
if self.max_abs_skew_ms >= VERDICT_CATASTROPHIC_MAX_ABS_SKEW_MS {
return SyncAnalysisVerdict {
status: "catastrophic_failure".to_string(),
reason: format!(
"max skew {:.1} ms is at or above the {:.1} ms catastrophic boundary",
self.max_abs_skew_ms, VERDICT_CATASTROPHIC_MAX_ABS_SKEW_MS
),
..base
};
}
if p95_abs_skew_ms > VERDICT_GROSS_FAILURE_P95_ABS_SKEW_MS {
return SyncAnalysisVerdict {
status: "gross_failure".to_string(),
reason: format!(
"p95 skew {:.1} ms exceeds the {:.1} ms gross-failure boundary",
p95_abs_skew_ms, VERDICT_GROSS_FAILURE_P95_ABS_SKEW_MS
),
..base
};
}
if p95_abs_skew_ms <= VERDICT_PREFERRED_P95_ABS_SKEW_MS {
return SyncAnalysisVerdict {
status: "preferred".to_string(),
passed: true,
reason: format!(
"p95 skew {:.1} ms is inside the preferred {:.1} ms band",
p95_abs_skew_ms, VERDICT_PREFERRED_P95_ABS_SKEW_MS
),
..base
};
}
if p95_abs_skew_ms <= VERDICT_ACCEPTABLE_P95_ABS_SKEW_MS {
return SyncAnalysisVerdict {
status: "acceptable".to_string(),
passed: true,
reason: format!(
"p95 skew {:.1} ms is inside the acceptable {:.1} ms band",
p95_abs_skew_ms, VERDICT_ACCEPTABLE_P95_ABS_SKEW_MS
),
..base
};
}
SyncAnalysisVerdict {
status: "gross_failure".to_string(),
reason: format!(
"p95 skew {:.1} ms exceeds the {:.1} ms acceptable band",
p95_abs_skew_ms, VERDICT_ACCEPTABLE_P95_ABS_SKEW_MS
),
..base
}
}
#[must_use] #[must_use]
pub fn calibration_recommendation(&self) -> SyncCalibrationRecommendation { pub fn calibration_recommendation(&self) -> SyncCalibrationRecommendation {
if self.paired_event_count < CALIBRATION_MIN_PAIRED_EVENTS { if self.paired_event_count < CALIBRATION_MIN_PAIRED_EVENTS {
@ -83,6 +193,19 @@ impl SyncAnalysisReport {
} }
} }
fn percentile_abs(values: &[f64], percentile: f64) -> f64 {
if values.is_empty() {
return 0.0;
}
let mut sorted = values.iter().copied().map(f64::abs).collect::<Vec<_>>();
sorted.sort_by(|left, right| left.total_cmp(right));
let index = ((sorted.len() as f64 * percentile).ceil() as usize)
.saturating_sub(1)
.min(sorted.len() - 1);
sorted[index]
}
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]
pub struct SyncAnalysisOptions { pub struct SyncAnalysisOptions {
pub audio_window_ms: u32, pub audio_window_ms: u32,
@ -133,6 +256,7 @@ mod tests {
skews_ms: vec![20.0; 4], skews_ms: vec![20.0; 4],
video_onsets_s: vec![], video_onsets_s: vec![],
audio_onsets_s: vec![], audio_onsets_s: vec![],
paired_events: vec![],
}; };
let recommendation = report.calibration_recommendation(); let recommendation = report.calibration_recommendation();
@ -160,6 +284,7 @@ mod tests {
skews_ms: vec![10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0], skews_ms: vec![10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0],
video_onsets_s: vec![], video_onsets_s: vec![],
audio_onsets_s: vec![], audio_onsets_s: vec![],
paired_events: vec![],
}; };
let recommendation = report.calibration_recommendation(); let recommendation = report.calibration_recommendation();
@ -183,6 +308,7 @@ mod tests {
skews_ms: vec![28.0, 30.0, 32.0], skews_ms: vec![28.0, 30.0, 32.0],
video_onsets_s: vec![], video_onsets_s: vec![],
audio_onsets_s: vec![], audio_onsets_s: vec![],
paired_events: vec![],
}; };
let recommendation = report.calibration_recommendation(); let recommendation = report.calibration_recommendation();
@ -207,6 +333,7 @@ mod tests {
skews_ms: vec![3.0, 4.0], skews_ms: vec![3.0, 4.0],
video_onsets_s: vec![], video_onsets_s: vec![],
audio_onsets_s: vec![], audio_onsets_s: vec![],
paired_events: vec![],
}; };
let recommendation = report.calibration_recommendation(); let recommendation = report.calibration_recommendation();
@ -214,4 +341,50 @@ mod tests {
assert_eq!(recommendation.recommended_audio_offset_adjust_us, -4_000); assert_eq!(recommendation.recommended_audio_offset_adjust_us, -4_000);
assert!(recommendation.note.contains("already within the settled")); assert!(recommendation.note.contains("already within the settled"));
} }
#[test]
fn verdict_passes_preferred_skew_band() {
let report = SyncAnalysisReport {
video_event_count: 5,
audio_event_count: 5,
paired_event_count: 5,
first_skew_ms: 10.0,
last_skew_ms: 20.0,
mean_skew_ms: 15.0,
median_skew_ms: 15.0,
max_abs_skew_ms: 20.0,
drift_ms: 10.0,
skews_ms: vec![10.0, 12.0, 15.0, 18.0, 20.0],
video_onsets_s: vec![],
audio_onsets_s: vec![],
paired_events: vec![],
};
let verdict = report.verdict();
assert!(verdict.passed);
assert_eq!(verdict.status, "preferred");
}
#[test]
fn verdict_flags_catastrophic_desync() {
let report = SyncAnalysisReport {
video_event_count: 5,
audio_event_count: 5,
paired_event_count: 5,
first_skew_ms: 8_000.0,
last_skew_ms: 8_000.0,
mean_skew_ms: 8_000.0,
median_skew_ms: 8_000.0,
max_abs_skew_ms: 8_000.0,
drift_ms: 0.0,
skews_ms: vec![8_000.0; 5],
video_onsets_s: vec![],
audio_onsets_s: vec![],
paired_events: vec![],
};
let verdict = report.verdict();
assert!(!verdict.passed);
assert_eq!(verdict.status, "catastrophic_failure");
}
} }

View File

@ -1,6 +1,6 @@
[package] [package]
name = "lesavka_common" name = "lesavka_common"
version = "0.16.9" version = "0.16.11"
edition = "2024" edition = "2024"
build = "build.rs" build = "build.rs"

View File

@ -8,8 +8,12 @@ TEST_LOG="${REPORT_DIR}/cargo-test.log"
SUMMARY_JSON="${REPORT_DIR}/summary.json" SUMMARY_JSON="${REPORT_DIR}/summary.json"
SUMMARY_TXT="${REPORT_DIR}/summary.txt" SUMMARY_TXT="${REPORT_DIR}/summary.txt"
METRICS_FILE="${REPORT_DIR}/metrics.prom" METRICS_FILE="${REPORT_DIR}/metrics.prom"
STATUS_FILE="${REPORT_DIR}/gate-status.txt"
PUSHGATEWAY_URL=${QUALITY_GATE_PUSHGATEWAY_URL:-} PUSHGATEWAY_URL=${QUALITY_GATE_PUSHGATEWAY_URL:-}
PUSHGATEWAY_JOB=${LESAVKA_MEDIA_GATE_PUSHGATEWAY_JOB:-lesavka-media-reliability-gate} PUSHGATEWAY_JOB=${LESAVKA_MEDIA_GATE_PUSHGATEWAY_JOB:-lesavka-media-reliability-gate}
SYNC_PROBE_REPORT_JSON=${LESAVKA_SYNC_PROBE_REPORT_JSON:-}
SYNC_PROBE_REPORT_DIR=${LESAVKA_SYNC_PROBE_REPORT_DIR:-}
REQUIRE_SYNC_PROBE=${LESAVKA_REQUIRE_SYNC_PROBE:-0}
mkdir -p "${REPORT_DIR}" mkdir -p "${REPORT_DIR}"
cd "${ROOT_DIR}" cd "${ROOT_DIR}"
@ -64,7 +68,7 @@ set -e
end_seconds=$(date +%s) end_seconds=$(date +%s)
duration_seconds=$((end_seconds - start_seconds)) duration_seconds=$((end_seconds - start_seconds))
python3 - "${SUMMARY_JSON}" "${SUMMARY_TXT}" "${METRICS_FILE}" "${status}" "${duration_seconds}" "${branch}" "${commit}" "${build_url}" "${REPORT_DIR}" <<'PY' python3 - "${SUMMARY_JSON}" "${SUMMARY_TXT}" "${METRICS_FILE}" "${STATUS_FILE}" "${status}" "${duration_seconds}" "${branch}" "${commit}" "${build_url}" "${REPORT_DIR}" "${SYNC_PROBE_REPORT_JSON}" "${SYNC_PROBE_REPORT_DIR}" "${REQUIRE_SYNC_PROBE}" <<'PY'
import json import json
import pathlib import pathlib
import sys import sys
@ -73,17 +77,61 @@ from datetime import datetime, timezone
summary_path = pathlib.Path(sys.argv[1]) summary_path = pathlib.Path(sys.argv[1])
text_path = pathlib.Path(sys.argv[2]) text_path = pathlib.Path(sys.argv[2])
metrics_path = pathlib.Path(sys.argv[3]) metrics_path = pathlib.Path(sys.argv[3])
status = int(sys.argv[4]) status_path = pathlib.Path(sys.argv[4])
duration_seconds = int(sys.argv[5]) status = int(sys.argv[5])
branch = sys.argv[6] duration_seconds = int(sys.argv[6])
commit = sys.argv[7] branch = sys.argv[7]
build_url = sys.argv[8] commit = sys.argv[8]
report_dir = pathlib.Path(sys.argv[9]) build_url = sys.argv[9]
report_dir = pathlib.Path(sys.argv[10])
sync_probe_report_json = sys.argv[11]
sync_probe_report_dir = sys.argv[12]
require_sync_probe = sys.argv[13] == '1'
manual_report = report_dir / 'manual-soak.json' manual_report = report_dir / 'manual-soak.json'
def esc(value: str) -> str: def esc(value: str) -> str:
return value.replace('\\', r'\\').replace('\n', r'\n').replace('"', r'\"') return value.replace('\\', r'\\').replace('\n', r'\n').replace('"', r'\"')
def num(report: dict, path: list[str], default: float = 0.0) -> float:
current = report
for key in path:
if not isinstance(current, dict) or key not in current:
return default
current = current[key]
try:
return float(current)
except (TypeError, ValueError):
return default
def discover_sync_probe_report() -> tuple[pathlib.Path | None, dict | None, str]:
candidates = []
if sync_probe_report_json:
candidates.append(pathlib.Path(sync_probe_report_json))
if sync_probe_report_dir:
candidates.append(pathlib.Path(sync_probe_report_dir) / 'report.json')
candidates.append(report_dir / 'sync-probe' / 'report.json')
for path in candidates:
if not path.exists():
continue
try:
return path, json.loads(path.read_text(encoding='utf-8')), ''
except json.JSONDecodeError as exc:
return path, None, f'invalid JSON: {exc}'
return None, None, 'no report.json found; set LESAVKA_SYNC_PROBE_REPORT_JSON or LESAVKA_SYNC_PROBE_REPORT_DIR'
sync_probe_path, sync_probe_report, sync_probe_error = discover_sync_probe_report()
sync_probe_verdict = {}
sync_probe_check_status = 'not_applicable'
sync_probe_why = 'requires real Theia -> Lesavka -> Tethys UVC/UAC hardware evidence'
if sync_probe_report is not None:
sync_probe_verdict = sync_probe_report.get('verdict', {})
sync_probe_check_status = 'ok' if bool(sync_probe_verdict.get('passed')) else 'failed'
sync_probe_why = sync_probe_verdict.get('reason') or 'sync probe report was present'
elif require_sync_probe:
sync_probe_check_status = 'failed'
sync_probe_why = sync_probe_error
manual_checks = [ manual_checks = [
{ {
'name': 'zoom_equivalent_webcam_consumer', 'name': 'zoom_equivalent_webcam_consumer',
@ -100,6 +148,11 @@ manual_checks = [
'status': 'not_applicable' if not manual_report.exists() else 'reported', 'status': 'not_applicable' if not manual_report.exists() else 'reported',
'why': 'requires the Theia HDMI -> UGREEN -> Tethys USB path', 'why': 'requires the Theia HDMI -> UGREEN -> Tethys USB path',
}, },
{
'name': 'direct_upstream_av_sync_probe',
'status': sync_probe_check_status,
'why': sync_probe_why,
},
] ]
tracked_signals = [ tracked_signals = [
@ -116,23 +169,39 @@ tracked_signals = [
'synthetic_moving_pattern_distortion', 'synthetic_moving_pattern_distortion',
] ]
final_status = status
if require_sync_probe and sync_probe_check_status != 'ok':
final_status = 1
summary = { summary = {
'suite': 'lesavka', 'suite': 'lesavka',
'branch': branch, 'branch': branch,
'commit': commit, 'commit': commit,
'build_url': build_url, 'build_url': build_url,
'generated_at': datetime.now(timezone.utc).isoformat(), 'generated_at': datetime.now(timezone.utc).isoformat(),
'status': 'ok' if status == 0 else 'failed', 'status': 'ok' if final_status == 0 else 'failed',
'deterministic_status': 'ok' if status == 0 else 'failed',
'duration_seconds': duration_seconds, 'duration_seconds': duration_seconds,
'deterministic_tests': 'cargo test -p lesavka_testing media reliability contract subset', 'deterministic_tests': 'cargo test -p lesavka_testing media reliability contract subset',
'tracked_media_signals': tracked_signals, 'tracked_media_signals': tracked_signals,
'manual_checks': manual_checks, 'manual_checks': manual_checks,
'sync_probe': {
'required': require_sync_probe,
'status': sync_probe_check_status,
'report_json': '' if sync_probe_path is None else str(sync_probe_path),
'verdict': sync_probe_verdict,
'paired_event_count': 0 if sync_probe_report is None else sync_probe_report.get('paired_event_count', 0),
'median_skew_ms': 0.0 if sync_probe_report is None else sync_probe_report.get('median_skew_ms', 0.0),
'drift_ms': 0.0 if sync_probe_report is None else sync_probe_report.get('drift_ms', 0.0),
},
} }
summary_path.write_text(json.dumps(summary, indent=2, sort_keys=True) + '\n', encoding='utf-8') summary_path.write_text(json.dumps(summary, indent=2, sort_keys=True) + '\n', encoding='utf-8')
status_path.write_text(str(final_status) + '\n', encoding='utf-8')
lines = [ lines = [
'media reliability gate report', 'media reliability gate report',
f'status: {summary["status"]}', f'status: {summary["status"]}',
f'deterministic_status: {summary["deterministic_status"]}',
f'branch: {branch}', f'branch: {branch}',
f'commit: {commit}', f'commit: {commit}',
f'duration_seconds: {duration_seconds}', f'duration_seconds: {duration_seconds}',
@ -150,16 +219,33 @@ lines = [
] ]
for check in manual_checks: for check in manual_checks:
lines.append(f'- {check["name"]}: {check["status"]} ({check["why"]})') lines.append(f'- {check["name"]}: {check["status"]} ({check["why"]})')
lines.extend([
'',
'sync probe evidence',
f'- required: {require_sync_probe}',
f'- status: {sync_probe_check_status}',
f'- report_json: {summary["sync_probe"]["report_json"] or "none"}',
f'- p95_abs_skew_ms: {num(sync_probe_report or {}, ["verdict", "p95_abs_skew_ms"]):.1f}',
f'- max_abs_skew_ms: {num(sync_probe_report or {}, ["max_abs_skew_ms"]):.1f}',
f'- median_skew_ms: {num(sync_probe_report or {}, ["median_skew_ms"]):.1f}',
f'- drift_ms: {num(sync_probe_report or {}, ["drift_ms"]):+.1f}',
])
text_path.write_text('\n'.join(lines) + '\n', encoding='utf-8') text_path.write_text('\n'.join(lines) + '\n', encoding='utf-8')
labels = f'suite="lesavka",branch="{esc(branch)}",commit="{esc(commit)}"' labels = f'suite="lesavka",branch="{esc(branch)}",commit="{esc(commit)}"'
ok = 1 if status == 0 else 0 ok = 1 if final_status == 0 else 0
failed = 0 if status == 0 else 1 failed = 0 if final_status == 0 else 1
probe_ok = 1 if sync_probe_check_status == 'ok' else 0
probe_failed = 1 if sync_probe_check_status == 'failed' else 0
probe_not_applicable = 1 if sync_probe_check_status == 'not_applicable' else 0
metrics = [ metrics = [
'# HELP platform_quality_gate_checks_total Check outcomes from the latest lesavka gate run.', '# HELP platform_quality_gate_checks_total Check outcomes from the latest lesavka gate run.',
'# TYPE platform_quality_gate_checks_total gauge', '# TYPE platform_quality_gate_checks_total gauge',
f'platform_quality_gate_checks_total{{{labels},check="media_reliability",status="ok"}} {ok}', f'platform_quality_gate_checks_total{{{labels},check="media_reliability",status="ok"}} {ok}',
f'platform_quality_gate_checks_total{{{labels},check="media_reliability",status="failed"}} {failed}', f'platform_quality_gate_checks_total{{{labels},check="media_reliability",status="failed"}} {failed}',
f'platform_quality_gate_checks_total{{{labels},check="sync_probe",status="ok"}} {probe_ok}',
f'platform_quality_gate_checks_total{{{labels},check="sync_probe",status="failed"}} {probe_failed}',
f'platform_quality_gate_checks_total{{{labels},check="sync_probe",status="not_applicable"}} {probe_not_applicable}',
'# HELP lesavka_media_reliability_manual_check_info Manual media reliability evidence slots.', '# HELP lesavka_media_reliability_manual_check_info Manual media reliability evidence slots.',
'# TYPE lesavka_media_reliability_manual_check_info gauge', '# TYPE lesavka_media_reliability_manual_check_info gauge',
] ]
@ -167,10 +253,28 @@ for check in manual_checks:
metrics.append( metrics.append(
f'lesavka_media_reliability_manual_check_info{{{labels},check="{esc(check["name"])}",status="{esc(check["status"])}"}} 1' f'lesavka_media_reliability_manual_check_info{{{labels},check="{esc(check["name"])}",status="{esc(check["status"])}"}} 1'
) )
metrics.extend([
'# HELP lesavka_sync_probe_skew_ms Last direct UVC/UAC sync-probe skew values.',
'# TYPE lesavka_sync_probe_skew_ms gauge',
f'lesavka_sync_probe_skew_ms{{{labels},stat="p95_abs"}} {num(sync_probe_report or {}, ["verdict", "p95_abs_skew_ms"]):.3f}',
f'lesavka_sync_probe_skew_ms{{{labels},stat="max_abs"}} {num(sync_probe_report or {}, ["max_abs_skew_ms"]):.3f}',
f'lesavka_sync_probe_skew_ms{{{labels},stat="median"}} {num(sync_probe_report or {}, ["median_skew_ms"]):.3f}',
f'lesavka_sync_probe_skew_ms{{{labels},stat="drift"}} {num(sync_probe_report or {}, ["drift_ms"]):.3f}',
'# HELP lesavka_sync_probe_events_total Last direct UVC/UAC sync-probe event counts.',
'# TYPE lesavka_sync_probe_events_total gauge',
f'lesavka_sync_probe_events_total{{{labels},event_type="paired"}} {int(num(sync_probe_report or {}, ["paired_event_count"]))}',
f'lesavka_sync_probe_events_total{{{labels},event_type="video"}} {int(num(sync_probe_report or {}, ["video_event_count"]))}',
f'lesavka_sync_probe_events_total{{{labels},event_type="audio"}} {int(num(sync_probe_report or {}, ["audio_event_count"]))}',
'# HELP lesavka_sync_probe_verdict_info Last direct UVC/UAC sync-probe verdict.',
'# TYPE lesavka_sync_probe_verdict_info gauge',
f'lesavka_sync_probe_verdict_info{{{labels},status="{esc(sync_probe_check_status)}",verdict="{esc(str(sync_probe_verdict.get("status", "")))}",reason="{esc(sync_probe_why)}"}} 1',
])
metrics_path.write_text('\n'.join(metrics) + '\n', encoding='utf-8') metrics_path.write_text('\n'.join(metrics) + '\n', encoding='utf-8')
print(text_path.read_text(encoding='utf-8')) print(text_path.read_text(encoding='utf-8'))
PY PY
status=$(cat "${STATUS_FILE}")
if [[ -n "${PUSHGATEWAY_URL}" ]]; then if [[ -n "${PUSHGATEWAY_URL}" ]]; then
curl --fail --silent --show-error \ curl --fail --silent --show-error \
--data-binary @"${METRICS_FILE}" \ --data-binary @"${METRICS_FILE}" \

View File

@ -14,6 +14,8 @@ TETHYS_HOST=${TETHYS_HOST:-tethys}
LESAVKA_SERVER_HOST=${LESAVKA_SERVER_HOST:-theia} LESAVKA_SERVER_HOST=${LESAVKA_SERVER_HOST:-theia}
LESAVKA_SERVER_CONNECT_HOST=${LESAVKA_SERVER_CONNECT_HOST:-38.28.125.112} LESAVKA_SERVER_CONNECT_HOST=${LESAVKA_SERVER_CONNECT_HOST:-38.28.125.112}
LESAVKA_SERVER_ADDR=${LESAVKA_SERVER_ADDR:-auto} LESAVKA_SERVER_ADDR=${LESAVKA_SERVER_ADDR:-auto}
LESAVKA_SERVER_SCHEME=${LESAVKA_SERVER_SCHEME:-https}
LESAVKA_TLS_DOMAIN=${LESAVKA_TLS_DOMAIN:-lesavka-server}
PROBE_DURATION_SECONDS=${PROBE_DURATION_SECONDS:-10} PROBE_DURATION_SECONDS=${PROBE_DURATION_SECONDS:-10}
PROBE_WARMUP_SECONDS=${PROBE_WARMUP_SECONDS:-4} PROBE_WARMUP_SECONDS=${PROBE_WARMUP_SECONDS:-4}
PROBE_TIMEOUT_SECONDS=${PROBE_TIMEOUT_SECONDS:-$((PROBE_DURATION_SECONDS + PROBE_WARMUP_SECONDS + 20))} PROBE_TIMEOUT_SECONDS=${PROBE_TIMEOUT_SECONDS:-$((PROBE_DURATION_SECONDS + PROBE_WARMUP_SECONDS + 20))}
@ -23,8 +25,7 @@ PROBE_TIMEOUT_SECONDS=${PROBE_TIMEOUT_SECONDS:-$((PROBE_DURATION_SECONDS + PROBE
LEAD_IN_SECONDS=${LEAD_IN_SECONDS:-0} LEAD_IN_SECONDS=${LEAD_IN_SECONDS:-0}
TAIL_SECONDS=${TAIL_SECONDS:-2} TAIL_SECONDS=${TAIL_SECONDS:-2}
CAPTURE_SECONDS=${CAPTURE_SECONDS:-$((PROBE_DURATION_SECONDS + PROBE_WARMUP_SECONDS + LEAD_IN_SECONDS + TAIL_SECONDS))} CAPTURE_SECONDS=${CAPTURE_SECONDS:-$((PROBE_DURATION_SECONDS + PROBE_WARMUP_SECONDS + LEAD_IN_SECONDS + TAIL_SECONDS))}
REMOTE_CAPTURE=${REMOTE_CAPTURE:-/tmp/lesavka-upstream-av-sync.mkv} LOCAL_OUTPUT_DIR=${LOCAL_OUTPUT_DIR:-/tmp}
LOCAL_OUTPUT_DIR=${LOCAL_OUTPUT_DIR:-"${REPO_ROOT}/tmp"}
REMOTE_VIDEO_DEVICE=${REMOTE_VIDEO_DEVICE:-auto} REMOTE_VIDEO_DEVICE=${REMOTE_VIDEO_DEVICE:-auto}
VIDEO_SIZE=${VIDEO_SIZE:-auto} VIDEO_SIZE=${VIDEO_SIZE:-auto}
VIDEO_FPS=${VIDEO_FPS:-auto} VIDEO_FPS=${VIDEO_FPS:-auto}
@ -50,11 +51,15 @@ REMOTE_EXPECT_CAM_OUTPUT=${REMOTE_EXPECT_CAM_OUTPUT:-uvc}
REMOTE_EXPECT_UVC_CODEC=${REMOTE_EXPECT_UVC_CODEC:-mjpeg} REMOTE_EXPECT_UVC_CODEC=${REMOTE_EXPECT_UVC_CODEC:-mjpeg}
CAPTURE_READY_MARKER="__LESAVKA_CAPTURE_READY__" CAPTURE_READY_MARKER="__LESAVKA_CAPTURE_READY__"
mkdir -p "${LOCAL_OUTPUT_DIR}"
STAMP="$(date +%Y%m%d-%H%M%S)" STAMP="$(date +%Y%m%d-%H%M%S)"
LOCAL_CAPTURE="${LOCAL_OUTPUT_DIR}/lesavka-upstream-av-sync-${STAMP}.mkv" REMOTE_CAPTURE=${REMOTE_CAPTURE:-"/tmp/lesavka-sync-probe-${STAMP}.mkv"}
LOCAL_ANALYSIS_JSON="${LOCAL_CAPTURE%.mkv}.json" LOCAL_REPORT_DIR="${LOCAL_OUTPUT_DIR%/}/lesavka-sync-probe-${STAMP}"
LOCAL_CAPTURE_LOG="${LOCAL_CAPTURE%.mkv}.capture.log" LOCAL_CAPTURE="${LOCAL_REPORT_DIR}/capture.mkv"
LOCAL_ANALYSIS_JSON="${LOCAL_REPORT_DIR}/report.json"
LOCAL_REPORT_TXT="${LOCAL_REPORT_DIR}/report.txt"
LOCAL_EVENTS_CSV="${LOCAL_REPORT_DIR}/events.csv"
LOCAL_CAPTURE_LOG="${LOCAL_REPORT_DIR}/capture.log"
mkdir -p "${LOCAL_REPORT_DIR}"
RESOLVED_LESAVKA_SERVER_ADDR="" RESOLVED_LESAVKA_SERVER_ADDR=""
SERVER_TUNNEL_PID="" SERVER_TUNNEL_PID=""
SERVER_TUNNEL_REMOTE_PORT="" SERVER_TUNNEL_REMOTE_PORT=""
@ -133,12 +138,12 @@ resolve_server_addr() {
port="${bind_addr##*:}" port="${bind_addr##*:}"
if [[ "${port}" =~ ^[0-9]+$ ]]; then if [[ "${port}" =~ ^[0-9]+$ ]]; then
start_server_tunnel "${port}" start_server_tunnel "${port}"
RESOLVED_LESAVKA_SERVER_ADDR="http://127.0.0.1:${SERVER_TUNNEL_LOCAL_PORT}" RESOLVED_LESAVKA_SERVER_ADDR="${LESAVKA_SERVER_SCHEME}://127.0.0.1:${SERVER_TUNNEL_LOCAL_PORT}"
return 0 return 0
fi fi
start_server_tunnel "50051" start_server_tunnel "50051"
RESOLVED_LESAVKA_SERVER_ADDR="http://127.0.0.1:${SERVER_TUNNEL_LOCAL_PORT}" RESOLVED_LESAVKA_SERVER_ADDR="${LESAVKA_SERVER_SCHEME}://127.0.0.1:${SERVER_TUNNEL_LOCAL_PORT}"
} }
preflight_server_path() { preflight_server_path() {
@ -766,6 +771,7 @@ probe_status=0
probe_timed_out=0 probe_timed_out=0
( (
cd "${REPO_ROOT}" cd "${REPO_ROOT}"
LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \
timeout --signal=INT "${PROBE_TIMEOUT_SECONDS}" "${PROBE_BIN}" \ timeout --signal=INT "${PROBE_TIMEOUT_SECONDS}" "${PROBE_BIN}" \
--server "${RESOLVED_LESAVKA_SERVER_ADDR}" \ --server "${RESOLVED_LESAVKA_SERVER_ADDR}" \
--duration-seconds "${PROBE_DURATION_SECONDS}" \ --duration-seconds "${PROBE_DURATION_SECONDS}" \
@ -864,27 +870,51 @@ if [[ "${REMOTE_ANALYZE}" != "0" ]]; then
exit 92 exit 92
fi fi
echo "==> remote analysis summary" echo "==> remote analysis summary"
python - <<'PY' "${LOCAL_ANALYSIS_JSON}" python - <<'PY' "${LOCAL_ANALYSIS_JSON}" "${LOCAL_REPORT_TXT}" "${LOCAL_EVENTS_CSV}"
import csv
import json import json
import pathlib import pathlib
import sys import sys
report = json.loads(pathlib.Path(sys.argv[1]).read_text()) report = json.loads(pathlib.Path(sys.argv[1]).read_text())
print(f"A/V sync report for {sys.argv[1]}") verdict = report.get('verdict', {})
print(f"- video onsets: {report['video_event_count']}")
print(f"- audio onsets: {report['audio_event_count']}")
print(f"- paired pulses: {report['paired_event_count']}")
print(f"- first skew: {report['first_skew_ms']:+.1f} ms (audio after video is positive)")
print(f"- last skew: {report['last_skew_ms']:+.1f} ms")
print(f"- mean skew: {report['mean_skew_ms']:+.1f} ms")
print(f"- median skew: {report['median_skew_ms']:+.1f} ms")
print(f"- max abs skew: {report['max_abs_skew_ms']:.1f} ms")
print(f"- drift: {report['drift_ms']:+.1f} ms")
cal = report.get('calibration', {}) cal = report.get('calibration', {})
print(f"- calibration ready: {cal.get('ready')}") lines = [
print(f"- recommended audio offset adjust: {int(cal.get('recommended_audio_offset_adjust_us', 0)):+d} us") f"A/V sync report for {sys.argv[1]}",
print(f"- alternative video offset adjust: {int(cal.get('recommended_video_offset_adjust_us', 0)):+d} us") f"- verdict: {verdict.get('status', 'unknown')} ({'pass' if verdict.get('passed') else 'fail'})",
print(f"- calibration note: {cal.get('note', '')}") f"- verdict reason: {verdict.get('reason', '')}",
f"- p95 abs skew: {float(verdict.get('p95_abs_skew_ms', 0.0)):.1f} ms",
f"- video onsets: {report['video_event_count']}",
f"- audio onsets: {report['audio_event_count']}",
f"- paired pulses: {report['paired_event_count']}",
f"- first skew: {report['first_skew_ms']:+.1f} ms (audio after video is positive)",
f"- last skew: {report['last_skew_ms']:+.1f} ms",
f"- mean skew: {report['mean_skew_ms']:+.1f} ms",
f"- median skew: {report['median_skew_ms']:+.1f} ms",
f"- max abs skew: {report['max_abs_skew_ms']:.1f} ms",
f"- drift: {report['drift_ms']:+.1f} ms",
f"- calibration ready: {cal.get('ready')}",
f"- recommended audio offset adjust: {int(cal.get('recommended_audio_offset_adjust_us', 0)):+d} us",
f"- alternative video offset adjust: {int(cal.get('recommended_video_offset_adjust_us', 0)):+d} us",
f"- calibration note: {cal.get('note', '')}",
]
summary = "\n".join(lines) + "\n"
pathlib.Path(sys.argv[2]).write_text(summary)
with pathlib.Path(sys.argv[3]).open("w", newline="") as handle:
writer = csv.DictWriter(
handle,
fieldnames=["event_id", "video_time_s", "audio_time_s", "skew_ms", "confidence"],
)
writer.writeheader()
for event in report.get("paired_events", []):
writer.writerow({
"event_id": event.get("event_id"),
"video_time_s": event.get("video_time_s"),
"audio_time_s": event.get("audio_time_s"),
"skew_ms": event.get("skew_ms"),
"confidence": event.get("confidence"),
})
print(summary, end="")
PY PY
else else
if [[ ! -f "${LOCAL_CAPTURE}" ]]; then if [[ ! -f "${LOCAL_CAPTURE}" ]]; then
@ -894,7 +924,7 @@ else
echo "==> analyzing capture" echo "==> analyzing capture"
( (
cd "${REPO_ROOT}" cd "${REPO_ROOT}"
"${ANALYZE_BIN}" "${LOCAL_CAPTURE}" "${ANALYZE_BIN}" "${LOCAL_CAPTURE}" --report-dir "${LOCAL_REPORT_DIR}"
) )
fi fi
@ -903,11 +933,18 @@ if [[ "${capture_v4l2_fault}" -eq 1 ]]; then
fi fi
echo "==> done" echo "==> done"
echo "artifact_dir: ${LOCAL_REPORT_DIR}"
if [[ -f "${LOCAL_CAPTURE}" ]]; then if [[ -f "${LOCAL_CAPTURE}" ]]; then
echo "capture: ${LOCAL_CAPTURE}" echo "capture: ${LOCAL_CAPTURE}"
fi fi
if [[ -f "${LOCAL_ANALYSIS_JSON}" ]]; then if [[ -f "${LOCAL_ANALYSIS_JSON}" ]]; then
echo "analysis_json: ${LOCAL_ANALYSIS_JSON}" echo "report_json: ${LOCAL_ANALYSIS_JSON}"
fi
if [[ -f "${LOCAL_REPORT_TXT}" ]]; then
echo "report_txt: ${LOCAL_REPORT_TXT}"
fi
if [[ -f "${LOCAL_EVENTS_CSV}" ]]; then
echo "events_csv: ${LOCAL_EVENTS_CSV}"
fi fi
if [[ -f "${LOCAL_CAPTURE_LOG}" ]]; then if [[ -f "${LOCAL_CAPTURE_LOG}" ]]; then
echo "capture_log: ${LOCAL_CAPTURE_LOG}" echo "capture_log: ${LOCAL_CAPTURE_LOG}"

View File

@ -10,7 +10,7 @@ bench = false
[package] [package]
name = "lesavka_server" name = "lesavka_server"
version = "0.16.9" version = "0.16.11"
edition = "2024" edition = "2024"
autobins = false autobins = false

View File

@ -2,13 +2,13 @@ impl UpstreamMediaRuntime {
/// Activate a camera stream as the current owner for the session. /// Activate a camera stream as the current owner for the session.
#[must_use] #[must_use]
pub fn activate_camera(&self) -> UpstreamStreamLease { pub fn activate_camera(&self) -> UpstreamStreamLease {
self.activate(UpstreamMediaKind::Camera) self.activate(UpstreamMediaKind::Camera, true)
} }
/// Activate a microphone stream as the current owner for the session. /// Activate a microphone stream as the current owner for the session.
#[must_use] #[must_use]
pub fn activate_microphone(&self) -> UpstreamStreamLease { pub fn activate_microphone(&self) -> UpstreamStreamLease {
self.activate(UpstreamMediaKind::Microphone) self.activate(UpstreamMediaKind::Microphone, true)
} }
/// Reserve the single live microphone sink slot for one generation. /// Reserve the single live microphone sink slot for one generation.
@ -30,7 +30,7 @@ impl UpstreamMediaRuntime {
self.is_microphone_active(generation).then_some(permit) self.is_microphone_active(generation).then_some(permit)
} }
fn activate(&self, kind: UpstreamMediaKind) -> UpstreamStreamLease { fn activate(&self, kind: UpstreamMediaKind, reset_on_replace: bool) -> UpstreamStreamLease {
let generation = match kind { let generation = match kind {
UpstreamMediaKind::Camera => { UpstreamMediaKind::Camera => {
self.next_camera_generation.fetch_add(1, Ordering::SeqCst) + 1 self.next_camera_generation.fetch_add(1, Ordering::SeqCst) + 1
@ -46,21 +46,17 @@ impl UpstreamMediaRuntime {
.state .state
.lock() .lock()
.expect("upstream media state mutex poisoned"); .expect("upstream media state mutex poisoned");
if state.active_camera_generation.is_none() && state.active_microphone_generation.is_none() let replacing_existing_owner = match kind {
{ UpstreamMediaKind::Camera => state.active_camera_generation.is_some(),
UpstreamMediaKind::Microphone => state.active_microphone_generation.is_some(),
};
let starting_new_session =
state.active_camera_generation.is_none() && state.active_microphone_generation.is_none();
if starting_new_session {
state.session_id = self.next_session_id.fetch_add(1, Ordering::SeqCst) + 1; state.session_id = self.next_session_id.fetch_add(1, Ordering::SeqCst) + 1;
state.first_camera_remote_pts_us = None; reset_timing_anchors(&mut state);
state.first_microphone_remote_pts_us = None; } else if reset_on_replace && replacing_existing_owner {
state.camera_startup_ready = false; reset_timing_anchors(&mut state);
state.session_base_remote_pts_us = None;
state.last_video_local_pts_us = None;
state.last_audio_local_pts_us = None;
state.camera_packet_count = 0;
state.microphone_packet_count = 0;
state.startup_anchor_logged = false;
state.playout_epoch = None;
state.pairing_anchor_deadline = None;
state.catastrophic_reanchor_done = false;
} }
match kind { match kind {
UpstreamMediaKind::Camera => state.active_camera_generation = Some(generation), UpstreamMediaKind::Camera => state.active_camera_generation = Some(generation),
@ -112,7 +108,7 @@ impl UpstreamMediaRuntime {
/// Why: UAC recovery should force the active gRPC/audio sink to drain and /// Why: UAC recovery should force the active gRPC/audio sink to drain and
/// reconnect, not reset UDC or disturb UVC/HID. /// reconnect, not reset UDC or disturb UVC/HID.
pub fn soft_recover_microphone(&self) { pub fn soft_recover_microphone(&self) {
let lease = self.activate_microphone(); let lease = self.activate(UpstreamMediaKind::Microphone, false);
self.close_microphone(lease.generation); self.close_microphone(lease.generation);
} }
@ -134,6 +130,14 @@ impl UpstreamMediaRuntime {
} }
if state.active_camera_generation.is_none() && state.active_microphone_generation.is_none() if state.active_camera_generation.is_none() && state.active_microphone_generation.is_none()
{ {
reset_timing_anchors(&mut state);
}
self.pairing_state_notify.notify_waiters();
self.audio_progress_notify.notify_waiters();
}
}
fn reset_timing_anchors(state: &mut UpstreamClockState) {
state.first_camera_remote_pts_us = None; state.first_camera_remote_pts_us = None;
state.first_microphone_remote_pts_us = None; state.first_microphone_remote_pts_us = None;
state.camera_startup_ready = false; state.camera_startup_ready = false;
@ -146,8 +150,4 @@ impl UpstreamMediaRuntime {
state.playout_epoch = None; state.playout_epoch = None;
state.pairing_anchor_deadline = None; state.pairing_anchor_deadline = None;
state.catastrophic_reanchor_done = false; state.catastrophic_reanchor_done = false;
}
self.pairing_state_notify.notify_waiters();
self.audio_progress_notify.notify_waiters();
}
} }

View File

@ -27,6 +27,55 @@ fn replacing_one_kind_keeps_the_session_but_preempts_the_old_owner() {
assert!(runtime.is_microphone_active(second.generation)); assert!(runtime.is_microphone_active(second.generation));
} }
#[test]
#[serial(upstream_media_runtime)]
fn replacing_one_kind_resets_stale_timing_anchors_before_repairing() {
temp_env::with_var(
"LESAVKA_UPSTREAM_CAMERA_STARTUP_GRACE_MS",
Some("0"),
|| {
let runtime = runtime_without_offsets();
let first_camera = runtime.activate_camera();
let first_microphone = runtime.activate_microphone();
assert!(matches!(
runtime.plan_video_pts(1_000_000, 16_666),
super::UpstreamPlanDecision::AwaitingPair
));
let stale_audio = play(runtime.plan_audio_pts(1_000_000));
let stale_video = play(runtime.plan_video_pts(1_000_000, 16_666));
assert_eq!(stale_audio.local_pts_us, stale_video.local_pts_us);
let replacement_camera = runtime.activate_camera();
assert_eq!(replacement_camera.session_id, first_camera.session_id);
assert!(!runtime.is_camera_active(first_camera.generation));
assert!(runtime.is_microphone_active(first_microphone.generation));
assert!(matches!(
runtime.plan_video_pts(6_700_000_000, 16_666),
super::UpstreamPlanDecision::AwaitingPair
));
let repaired_audio = play(runtime.plan_audio_pts(6_700_010_000));
assert!(matches!(
runtime.plan_video_pts(6_700_000_000, 16_666),
super::UpstreamPlanDecision::DropBeforeOverlap
));
let repaired_video = play(runtime.plan_video_pts(6_700_026_666, 16_666));
assert_eq!(repaired_audio.local_pts_us, 0);
assert_eq!(repaired_video.local_pts_us, 16_666);
assert!(
repaired_audio.due_at > tokio::time::Instant::now(),
"replacement stream should get a fresh playout budget instead of inheriting stale lateness"
);
assert!(
repaired_video.due_at > tokio::time::Instant::now(),
"replacement video should also get a fresh playout budget after re-pairing"
);
},
);
}
#[test] #[test]
#[serial(upstream_media_runtime)] #[serial(upstream_media_runtime)]
fn closing_the_last_stream_resets_the_next_session_anchor() { fn closing_the_last_stream_resets_the_next_session_anchor() {

View File

@ -16,9 +16,16 @@ fn upstream_sync_script_tunnels_auto_server_addr_through_ssh() {
"start_server_tunnel", "start_server_tunnel",
"ExitOnForwardFailure=yes", "ExitOnForwardFailure=yes",
"127.0.0.1:${local_port}:127.0.0.1:${remote_port}", "127.0.0.1:${local_port}:127.0.0.1:${remote_port}",
"RESOLVED_LESAVKA_SERVER_ADDR=\"http://127.0.0.1:${SERVER_TUNNEL_LOCAL_PORT}\"", "LESAVKA_SERVER_SCHEME=${LESAVKA_SERVER_SCHEME:-https}",
"LESAVKA_TLS_DOMAIN=${LESAVKA_TLS_DOMAIN:-lesavka-server}",
"RESOLVED_LESAVKA_SERVER_ADDR=\"${LESAVKA_SERVER_SCHEME}://127.0.0.1:${SERVER_TUNNEL_LOCAL_PORT}\"",
"LESAVKA_TLS_DOMAIN=\"${LESAVKA_TLS_DOMAIN}\"",
"tunneled to ${LESAVKA_SERVER_HOST}:127.0.0.1:${SERVER_TUNNEL_REMOTE_PORT}", "tunneled to ${LESAVKA_SERVER_HOST}:127.0.0.1:${SERVER_TUNNEL_REMOTE_PORT}",
"CAPTURE_READY_MARKER=\"__LESAVKA_CAPTURE_READY__\"", "CAPTURE_READY_MARKER=\"__LESAVKA_CAPTURE_READY__\"",
"LOCAL_OUTPUT_DIR=${LOCAL_OUTPUT_DIR:-/tmp}",
"LOCAL_REPORT_DIR=\"${LOCAL_OUTPUT_DIR%/}/lesavka-sync-probe-${STAMP}\"",
"LOCAL_ANALYSIS_JSON=\"${LOCAL_REPORT_DIR}/report.json\"",
"LOCAL_EVENTS_CSV=\"${LOCAL_REPORT_DIR}/events.csv\"",
"LEAD_IN_SECONDS=${LEAD_IN_SECONDS:-0}", "LEAD_IN_SECONDS=${LEAD_IN_SECONDS:-0}",
"PROBE_TIMEOUT_SECONDS=${PROBE_TIMEOUT_SECONDS:-$((PROBE_DURATION_SECONDS + PROBE_WARMUP_SECONDS + 20))}", "PROBE_TIMEOUT_SECONDS=${PROBE_TIMEOUT_SECONDS:-$((PROBE_DURATION_SECONDS + PROBE_WARMUP_SECONDS + 20))}",
"timeout --signal=INT \"${PROBE_TIMEOUT_SECONDS}\" \"${PROBE_BIN}\"", "timeout --signal=INT \"${PROBE_TIMEOUT_SECONDS}\" \"${PROBE_BIN}\"",
@ -33,6 +40,8 @@ fn upstream_sync_script_tunnels_auto_server_addr_through_ssh() {
"resolve_alsa_audio_device", "resolve_alsa_audio_device",
"PipeWire Lesavka source not found; falling back to ALSA device", "PipeWire Lesavka source not found; falling back to ALSA device",
"Lesavka audio source not found in PipeWire or ALSA; capture host does not currently expose the gadget microphone.", "Lesavka audio source not found in PipeWire or ALSA; capture host does not currently expose the gadget microphone.",
"artifact_dir: ${LOCAL_REPORT_DIR}",
"events_csv: ${LOCAL_EVENTS_CSV}",
] { ] {
assert!( assert!(
SYNC_SCRIPT.contains(expected), SYNC_SCRIPT.contains(expected),

View File

@ -7,6 +7,7 @@
const PERFORMANCE_GATE: &str = include_str!("../../scripts/ci/performance_gate.sh"); const PERFORMANCE_GATE: &str = include_str!("../../scripts/ci/performance_gate.sh");
const PLATFORM_GATE: &str = include_str!("../../scripts/ci/platform_quality_gate.sh"); const PLATFORM_GATE: &str = include_str!("../../scripts/ci/platform_quality_gate.sh");
const MEDIA_GATE: &str = include_str!("../../scripts/ci/media_reliability_gate.sh");
#[test] #[test]
fn performance_gate_tracks_av_and_interaction_latency_contracts() { fn performance_gate_tracks_av_and_interaction_latency_contracts() {
@ -40,3 +41,23 @@ fn platform_gate_runs_performance_before_media_reliability() {
"performance regressions should fail before broader media packaging checks" "performance regressions should fail before broader media packaging checks"
); );
} }
#[test]
fn media_reliability_gate_reports_direct_sync_probe_evidence() {
for expected in [
"LESAVKA_SYNC_PROBE_REPORT_JSON",
"LESAVKA_SYNC_PROBE_REPORT_DIR",
"LESAVKA_REQUIRE_SYNC_PROBE",
"direct_upstream_av_sync_probe",
"check=\"sync_probe\"",
"lesavka_sync_probe_skew_ms",
"lesavka_sync_probe_events_total",
"lesavka_sync_probe_verdict_info",
"gate-status.txt",
] {
assert!(
MEDIA_GATE.contains(expected),
"media reliability gate should include {expected}"
);
}
}