From 826bada865cc8e9ff472e5cd7f98319e4a3632f1 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Sat, 2 May 2026 17:48:45 -0300 Subject: [PATCH] feat: add runtime blind sync healer --- AGENTS.md | 14 + Cargo.lock | 6 +- client/Cargo.toml | 2 +- client/src/bin/lesavka-relayctl.rs | 8 + common/Cargo.toml | 2 +- common/proto/lesavka.proto | 2 + .../manual/run_upstream_mirrored_av_sync.sh | 268 ++++++++++++ server/Cargo.toml | 2 +- server/src/blind_healer.rs | 386 ++++++++++++++++++ server/src/calibration.rs | 62 +++ server/src/lib.rs | 1 + server/src/main/handler_startup.rs | 5 + server/src/main/rpc_helpers.rs | 2 + server/src/upstream_media_runtime.rs | 2 + server/src/upstream_media_runtime/state.rs | 4 + server/src/upstream_media_runtime/types.rs | 2 + 16 files changed, 762 insertions(+), 6 deletions(-) create mode 100644 server/src/blind_healer.rs diff --git a/AGENTS.md b/AGENTS.md index 856eecb..bbc337b 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -525,3 +525,17 @@ measurement gaps before tuning any new healing controller. - [x] Include rolling blind metrics in mirrored-probe CSV/JSONL summaries and blind targets. - [x] Add planner tests for rolling timing windows and sink handoff timing. - [ ] Use the next mirrored run only for correlation/tuning: decide whether the controller should adjust playout delay, offset, or drop/freeze policy from these blind metrics. + +## 0.17.27 Runtime Blind Healing Checklist + +Context: if client/server-only timing is stable enough, Lesavka should make +small runtime corrections without waiting for the external probe. The probe +remains the truth judge and root-cause localizer, not the production dependency. + +- [x] Add a server-owned blind healer loop enabled by default with `LESAVKA_UPSTREAM_BLIND_HEAL=0` escape hatch. +- [x] Gate blind healing on rolling sample counts plus client-send, server-receive, queue-age, sink-late, and sink-handoff p95 limits. +- [x] Apply bounded transient offset nudges from sink handoff skew without saving them as site defaults. +- [x] Expose sample counts in `upstream-sync` and mirrored probe artifacts so failed runs can separate "insufficient evidence" from real timing failure. +- [x] Emit `root-cause-summary.json` from mirrored probe runs to classify failing layers instead of eyeballing raw metrics. +- [x] Add unit tests for apply/refuse/target behavior in the blind healer. +- [ ] Next run should identify the failing layer if confirmation still fails: client capture/uplink, network/server receive, server planner, server sink handoff, or external USB/browser/probe boundary. diff --git a/Cargo.lock b/Cargo.lock index ca47ee3..fccac32 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.17.26" +version = "0.17.27" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.17.26" +version = "0.17.27" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.17.26" +version = "0.17.27" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index b2e818f..870ba63 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.17.26" +version = "0.17.27" edition = "2024" [dependencies] diff --git a/client/src/bin/lesavka-relayctl.rs b/client/src/bin/lesavka-relayctl.rs index 22b83f7..f473c99 100644 --- a/client/src/bin/lesavka-relayctl.rs +++ b/client/src/bin/lesavka-relayctl.rs @@ -398,6 +398,14 @@ fn print_upstream_sync(state: lesavka_common::lesavka::UpstreamSyncState) { .map(|value| format!("{value:.1}")) .unwrap_or_else(|| "pending".to_string()) ); + println!( + "planner_client_timing_window_samples={}", + state.client_timing_window_samples + ); + println!( + "planner_sink_handoff_window_samples={}", + state.sink_handoff_window_samples + ); println!("planner_detail={}", state.last_reason); } diff --git a/common/Cargo.toml b/common/Cargo.toml index ad7d76d..dda659e 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.17.26" +version = "0.17.27" edition = "2024" build = "build.rs" diff --git a/common/proto/lesavka.proto b/common/proto/lesavka.proto index 348e727..fb500e2 100644 --- a/common/proto/lesavka.proto +++ b/common/proto/lesavka.proto @@ -136,6 +136,8 @@ message UpstreamSyncState { optional float microphone_sink_late_ms = 31; optional float camera_sink_late_p95_ms = 32; optional float microphone_sink_late_p95_ms = 33; + uint64 client_timing_window_samples = 34; + uint64 sink_handoff_window_samples = 35; } message HandshakeSet { diff --git a/scripts/manual/run_upstream_mirrored_av_sync.sh b/scripts/manual/run_upstream_mirrored_av_sync.sh index c7fd7da..171052c 100755 --- a/scripts/manual/run_upstream_mirrored_av_sync.sh +++ b/scripts/manual/run_upstream_mirrored_av_sync.sh @@ -733,6 +733,7 @@ summarize_adaptive_probe_metrics() { python3 - "${ARTIFACT_DIR}" "${LESAVKA_SYNC_TOTAL_SEGMENTS}" "${LESAVKA_SYNC_CALIBRATION_SEGMENTS}" <<'PY' import csv import json +import math import os import sys from pathlib import Path @@ -788,8 +789,204 @@ def range_for(rows, key): } +def is_number(value): + return isinstance(value, (int, float)) and math.isfinite(float(value)) + + +def abs_exceeds(row, key, threshold): + value = row.get(key) + return is_number(value) and abs(float(value)) > threshold + + +def over(row, key, threshold): + value = row.get(key) + return is_number(value) and float(value) > threshold + + +def under(row, key, threshold): + value = row.get(key) + return not is_number(value) or float(value) < threshold + + +def add_finding(findings, layer, severity, signal, detail, next_step): + findings.append({ + "layer": layer, + "severity": severity, + "signal": signal, + "detail": detail, + "next_step": next_step, + }) + + +def diagnose_segment(row): + findings = [] + paired = row.get("probe_paired_pulses") + status = row.get("probe_status", "missing") + reason = row.get("analysis_failure_reason") or row.get("calibration_note") or row.get("decision_note") or "" + if row.get("probe_passed"): + add_finding( + findings, + "none", + "info", + "probe_passed", + "The external probe judged this segment inside the acceptable sync band.", + "Use this segment as a candidate blind target only if confirmation also passes.", + ) + return findings + + if "video did not contain any recognizable color-coded sync pulses" in reason: + add_finding( + findings, + "probe_video", + "blocker", + "no_video_sync_pulses", + "The analyzer could not see the visual sync code in the browser capture.", + "Fix camera framing/focus/exposure or stimulus visibility before treating sync numbers as real.", + ) + elif isinstance(paired, int) and paired < 3: + add_finding( + findings, + "probe_pairing", + "blocker", + "too_few_matching_pairs", + f"The analyzer saw only {paired} matching pulse pairs, below the minimum verdict floor.", + "Treat this as insufficient probe evidence unless raw activity and server blind metrics agree.", + ) + elif isinstance(paired, int) and paired < 8: + add_finding( + findings, + "probe_pairing", + "warning", + "low_pair_count", + f"The analyzer saw {paired} paired pulses, enough for a verdict but below calibration-ready quality.", + "Use the result for direction, not persistence; improve audio/visual pulse detection if this repeats.", + ) + + if under(row, "planner_client_timing_window_samples_after", 30): + add_finding( + findings, + "server_evidence", + "blocker", + "client_timing_window_underfilled", + "The server did not collect enough client timing sidecar samples for blind diagnosis.", + "Keep the sender connected longer or investigate missing timing metadata before calibration.", + ) + if under(row, "planner_sink_handoff_window_samples_after", 30): + add_finding( + findings, + "server_evidence", + "blocker", + "sink_handoff_window_underfilled", + "The server did not collect enough audio/video sink handoff samples for blind diagnosis.", + "Keep the mirrored run alive longer or check whether one sink is not presenting steadily.", + ) + if over(row, "planner_client_send_abs_skew_p95_ms_after", 250): + add_finding( + findings, + "client_uplink", + "blocker", + "client_send_skew_p95_high", + "Client-side audio/video send timing is already unstable before the server receives it.", + "Look at capture queues, encode pressure, USB device behavior, and client CPU scheduling.", + ) + if over(row, "planner_camera_client_queue_age_p95_ms_after", 150) or over(row, "planner_microphone_client_queue_age_p95_ms_after", 150): + add_finding( + findings, + "client_uplink", + "blocker", + "client_queue_age_p95_high", + "One or both client capture queues are aging packets before send.", + "Prefer dropping stale packets or lowering capture/encode pressure instead of adding offset.", + ) + if over(row, "planner_server_receive_abs_skew_p95_ms_after", 250): + add_finding( + findings, + "network_receive", + "blocker", + "server_receive_skew_p95_high", + "Audio/video timing becomes unstable between client send and server receive.", + "Treat this as network/gRPC receive jitter; heal freshness with drop/reanchor policy, not static calibration.", + ) + if over(row, "planner_camera_sink_late_p95_ms_after", 120) or over(row, "planner_microphone_sink_late_p95_ms_after", 120): + add_finding( + findings, + "server_sink_scheduler", + "blocker", + "sink_late_p95_high", + "Packets are reaching the server but one sink is missing its due time.", + "Tune server scheduler/sink handoff and avoid trusting offset-only fixes until lateness falls.", + ) + if over(row, "planner_sink_handoff_abs_skew_p95_ms_after", 250): + add_finding( + findings, + "server_sink_handoff", + "blocker", + "sink_handoff_skew_p95_high", + "Server sink handoff timing is too jittery for a stable blind correction.", + "First reduce handoff jitter; the blind healer should refuse large p95 instability.", + ) + + required_blind_keys = [ + "planner_client_timing_window_samples_after", + "planner_sink_handoff_window_samples_after", + "planner_client_send_abs_skew_p95_ms_after", + "planner_server_receive_abs_skew_p95_ms_after", + "planner_camera_client_queue_age_p95_ms_after", + "planner_microphone_client_queue_age_p95_ms_after", + "planner_camera_sink_late_p95_ms_after", + "planner_microphone_sink_late_p95_ms_after", + "planner_sink_handoff_abs_skew_p95_ms_after", + ] + stable_blind_metrics = ( + all(is_number(row.get(key)) for key in required_blind_keys) + and not under(row, "planner_client_timing_window_samples_after", 30) + and not under(row, "planner_sink_handoff_window_samples_after", 30) + and not over(row, "planner_client_send_abs_skew_p95_ms_after", 250) + and not over(row, "planner_server_receive_abs_skew_p95_ms_after", 250) + and not over(row, "planner_camera_client_queue_age_p95_ms_after", 150) + and not over(row, "planner_microphone_client_queue_age_p95_ms_after", 150) + and not over(row, "planner_camera_sink_late_p95_ms_after", 120) + and not over(row, "planner_microphone_sink_late_p95_ms_after", 120) + and not over(row, "planner_sink_handoff_abs_skew_p95_ms_after", 250) + ) + if stable_blind_metrics and abs_exceeds(row, "planner_sink_handoff_skew_ms_after", 35): + add_finding( + findings, + "server_calibration", + "warning", + "stable_sink_handoff_offset", + "Blind metrics are stable enough and show a consistent server-side handoff offset.", + "The runtime blind healer should make bounded transient nudges from this signal.", + ) + if stable_blind_metrics and over(row, "probe_p95_abs_skew_ms", 80): + add_finding( + findings, + "external_boundary", + "warning", + "probe_fails_while_blind_metrics_stable", + "Client/server timing looks stable but the browser probe still sees skew.", + "Investigate USB gadget output, browser capture, physical mic/camera setup, or probe detector limits.", + ) + if not findings: + add_finding( + findings, + "unknown", + "warning", + status, + "The segment failed, but no single client/server metric crossed the current diagnostic thresholds.", + "Compare per-pulse events and raise timing trace if this pattern repeats.", + ) + return findings + + +def primary_finding(findings): + severity_rank = {"blocker": 0, "warning": 1, "info": 2} + return sorted(findings, key=lambda item: severity_rank.get(item.get("severity"), 9))[0] + + rows = [] event_rows = [] +diagnoses = [] for segment in range(1, segment_count + 1): segment_dir = root / f"segment-{segment}" report_path = latest_report(segment_dir) @@ -872,11 +1069,27 @@ for segment in range(1, segment_count + 1): "planner_microphone_sink_late_ms_after": as_float(planner_after.get("planner_microphone_sink_late_ms")), "planner_camera_sink_late_p95_ms_after": as_float(planner_after.get("planner_camera_sink_late_p95_ms")), "planner_microphone_sink_late_p95_ms_after": as_float(planner_after.get("planner_microphone_sink_late_p95_ms")), + "planner_client_timing_window_samples_after": as_float(planner_after.get("planner_client_timing_window_samples")), + "planner_sink_handoff_window_samples_after": as_float(planner_after.get("planner_sink_handoff_window_samples")), "active_audio_offset_us_before": as_float(calibration_before.get("calibration_active_audio_offset_us")), "active_audio_offset_us_after": as_float(calibration_after.get("calibration_active_audio_offset_us")), "active_video_offset_us_before": as_float(calibration_before.get("calibration_active_video_offset_us")), "active_video_offset_us_after": as_float(calibration_after.get("calibration_active_video_offset_us")), } + findings = diagnose_segment(row) + primary = primary_finding(findings) + row["diagnostic_layer"] = primary.get("layer", "") + row["diagnostic_severity"] = primary.get("severity", "") + row["diagnostic_signal"] = primary.get("signal", "") + row["diagnostic_detail"] = primary.get("detail", "") + diagnoses.append({ + "segment": segment, + "segment_phase": phase, + "probe_status": row["probe_status"], + "probe_passed": row["probe_passed"], + "primary": primary, + "findings": findings, + }) rows.append(row) for event in report.get("paired_events", []): @@ -961,6 +1174,8 @@ if target_source_rows: "planner_microphone_sink_late_ms_after": range_for(target_source_rows, "planner_microphone_sink_late_ms_after"), "planner_camera_sink_late_p95_ms_after": range_for(target_source_rows, "planner_camera_sink_late_p95_ms_after"), "planner_microphone_sink_late_p95_ms_after": range_for(target_source_rows, "planner_microphone_sink_late_p95_ms_after"), + "planner_client_timing_window_samples_after": range_for(target_source_rows, "planner_client_timing_window_samples_after"), + "planner_sink_handoff_window_samples_after": range_for(target_source_rows, "planner_sink_handoff_window_samples_after"), "active_audio_offset_us_after": range_for(target_source_rows, "active_audio_offset_us_after"), "active_video_offset_us_after": range_for(target_source_rows, "active_video_offset_us_after"), "probe_p95_abs_skew_ms": range_for(target_source_rows, "probe_p95_abs_skew_ms"), @@ -1017,6 +1232,55 @@ else: } confirmation_path.write_text(json.dumps(confirmation, indent=2, sort_keys=True) + "\n", encoding="utf-8") +root_cause_path = root / "root-cause-summary.json" +severity_rank = {"blocker": 0, "warning": 1, "info": 2} +if confirmation_rows: + diagnostic_source_segments = {row["segment"] for row in confirmation_rows} + diagnostic_source = "confirmation segments" +else: + diagnostic_source_segments = {row["segment"] for row in rows} + diagnostic_source = "all segments" +source_diagnoses = [ + diagnosis + for diagnosis in diagnoses + if diagnosis.get("segment") in diagnostic_source_segments +] +if not source_diagnoses: + source_diagnoses = diagnoses +primary_diagnosis = None +if source_diagnoses: + primary_diagnosis = sorted( + source_diagnoses, + key=lambda diagnosis: severity_rank.get( + diagnosis.get("primary", {}).get("severity"), + 9, + ), + )[0].get("primary") +root_cause = { + "ready": bool(source_diagnoses), + "source": diagnostic_source, + "primary": primary_diagnosis or { + "layer": "missing", + "severity": "blocker", + "signal": "no_segments", + "detail": "No segment diagnostics were available.", + "next_step": "Inspect run logs before trusting this artifact.", + }, + "thresholds": { + "min_client_timing_window_samples": 30, + "min_sink_handoff_window_samples": 30, + "max_client_send_abs_skew_p95_ms": 250, + "max_server_receive_abs_skew_p95_ms": 250, + "max_client_queue_age_p95_ms": 150, + "max_sink_late_p95_ms": 120, + "max_sink_handoff_abs_skew_p95_ms": 250, + "stable_sink_handoff_deadband_ms": 35, + "acceptable_probe_p95_abs_skew_ms": 80, + }, + "segment_diagnoses": diagnoses, +} +root_cause_path.write_text(json.dumps(root_cause, indent=2, sort_keys=True) + "\n", encoding="utf-8") + print(f" ↪ segment_metrics_csv={csv_path}") print(f" ↪ segment_metrics_jsonl={jsonl_path}") print(f" ↪ segment_events_csv={events_csv_path}") @@ -1025,6 +1289,10 @@ print(f" ↪ blind_targets_json={target_path}") print(f" ↪ blind_targets_ready={str(bool(target.get('ready'))).lower()}") print(f" ↪ confirmation_summary_json={confirmation_path}") print(f" ↪ confirmation_passed={str(bool(confirmation.get('passed'))).lower()}") +print(f" ↪ root_cause_summary_json={root_cause_path}") +print(f" ↪ root_cause_layer={root_cause.get('primary', {}).get('layer')}") +print(f" ↪ root_cause_signal={root_cause.get('primary', {}).get('signal')}") +print(f" ↪ root_cause_detail={root_cause.get('primary', {}).get('detail')}") PY } diff --git a/server/Cargo.toml b/server/Cargo.toml index 34e4fc6..46ff7c5 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.17.26" +version = "0.17.27" edition = "2024" autobins = false diff --git a/server/src/blind_healer.rs b/server/src/blind_healer.rs new file mode 100644 index 0000000..2f31b81 --- /dev/null +++ b/server/src/blind_healer.rs @@ -0,0 +1,386 @@ +use std::sync::Arc; +use std::time::Duration; + +use crate::calibration::CalibrationStore; +use crate::upstream_media_runtime::{UpstreamMediaRuntime, UpstreamPlannerSnapshot}; + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +enum BlindHealTarget { + Video, + Audio, +} + +#[derive(Clone, Copy, Debug)] +struct BlindHealConfig { + enabled: bool, + target: BlindHealTarget, + min_samples: u64, + deadband_ms: f64, + max_handoff_abs_p95_ms: f64, + max_client_send_abs_p95_ms: f64, + max_server_receive_abs_p95_ms: f64, + max_queue_age_p95_ms: f64, + max_sink_late_p95_ms: f64, + gain: f64, + max_step_us: i64, + interval: Duration, + cooldown: Duration, +} + +#[derive(Clone, Debug, PartialEq)] +struct BlindHealAction { + audio_delta_us: i64, + video_delta_us: i64, + observed_sink_handoff_skew_ms: f64, + observed_client_send_abs_p95_ms: f64, + note: String, +} + +#[cfg(not(coverage))] +pub fn spawn_blind_healer(runtime: Arc, calibration: Arc) { + let config = BlindHealConfig::from_env(); + if !config.enabled { + tracing::info!("upstream blind healer disabled"); + return; + } + + tokio::spawn(async move { + let mut last_adjusted_session_id = 0; + let mut last_adjusted_at: Option = None; + loop { + tokio::time::sleep(config.interval).await; + let snapshot = runtime.snapshot(); + if snapshot.session_id != last_adjusted_session_id { + last_adjusted_session_id = snapshot.session_id; + last_adjusted_at = None; + } + if last_adjusted_at.is_some_and(|last| last.elapsed() < config.cooldown) { + continue; + } + let action = match evaluate_blind_heal_snapshot(&snapshot, config) { + BlindHealDecision::Apply(action) => action, + BlindHealDecision::Wait(reason) => { + tracing::trace!(reason, "upstream blind healer waiting"); + continue; + } + }; + let state = calibration.apply_transient_blind_estimate( + action.audio_delta_us, + action.video_delta_us, + action.observed_sink_handoff_skew_ms as f32, + action.observed_client_send_abs_p95_ms as f32, + action.note.clone(), + ); + last_adjusted_at = Some(tokio::time::Instant::now()); + tracing::info!( + session_id = snapshot.session_id, + audio_delta_us = action.audio_delta_us, + video_delta_us = action.video_delta_us, + sink_handoff_skew_ms = action.observed_sink_handoff_skew_ms, + client_send_abs_p95_ms = action.observed_client_send_abs_p95_ms, + active_audio_offset_us = state.active_audio_offset_us, + active_video_offset_us = state.active_video_offset_us, + "upstream blind healer applied transient calibration nudge" + ); + } + }); +} + +#[derive(Clone, Debug, PartialEq)] +enum BlindHealDecision { + Apply(BlindHealAction), + Wait(&'static str), +} + +fn evaluate_blind_heal_snapshot( + snapshot: &UpstreamPlannerSnapshot, + config: BlindHealConfig, +) -> BlindHealDecision { + if !config.enabled { + return BlindHealDecision::Wait("disabled"); + } + if !matches!(snapshot.phase, "live" | "healing") { + return BlindHealDecision::Wait("not-live"); + } + if snapshot.client_timing_window_samples < config.min_samples { + return BlindHealDecision::Wait("not-enough-client-samples"); + } + if snapshot.sink_handoff_window_samples < config.min_samples { + return BlindHealDecision::Wait("not-enough-sink-samples"); + } + let Some(skew_ms) = snapshot.sink_handoff_skew_ms else { + return BlindHealDecision::Wait("missing-sink-skew"); + }; + if skew_ms.abs() < config.deadband_ms { + return BlindHealDecision::Wait("inside-deadband"); + } + if exceeds( + snapshot.sink_handoff_abs_skew_p95_ms, + config.max_handoff_abs_p95_ms, + ) { + return BlindHealDecision::Wait("sink-handoff-p95-unstable"); + } + if exceeds( + snapshot.client_send_abs_skew_p95_ms, + config.max_client_send_abs_p95_ms, + ) { + return BlindHealDecision::Wait("client-send-p95-unstable"); + } + if exceeds( + snapshot.server_receive_abs_skew_p95_ms, + config.max_server_receive_abs_p95_ms, + ) { + return BlindHealDecision::Wait("server-receive-p95-unstable"); + } + if exceeds( + snapshot.camera_client_queue_age_p95_ms, + config.max_queue_age_p95_ms, + ) || exceeds( + snapshot.microphone_client_queue_age_p95_ms, + config.max_queue_age_p95_ms, + ) { + return BlindHealDecision::Wait("client-queue-p95-unstable"); + } + if exceeds( + snapshot.camera_sink_late_p95_ms, + config.max_sink_late_p95_ms, + ) || exceeds( + snapshot.microphone_sink_late_p95_ms, + config.max_sink_late_p95_ms, + ) { + return BlindHealDecision::Wait("sink-late-p95-unstable"); + } + + let correction_us = clamp_step(skew_ms * config.gain * 1000.0, config.max_step_us); + if correction_us == 0 { + return BlindHealDecision::Wait("rounded-zero"); + } + let (audio_delta_us, video_delta_us) = match config.target { + BlindHealTarget::Audio => (correction_us, 0), + BlindHealTarget::Video => (0, -correction_us), + }; + BlindHealDecision::Apply(BlindHealAction { + audio_delta_us, + video_delta_us, + observed_sink_handoff_skew_ms: skew_ms, + observed_client_send_abs_p95_ms: snapshot.client_send_abs_skew_p95_ms.unwrap_or(0.0), + note: format!( + "runtime blind healer: sink handoff skew {skew_ms:+.1}ms, target={:?}, applying audio {:+.1}ms/video {:+.1}ms", + config.target, + audio_delta_us as f64 / 1000.0, + video_delta_us as f64 / 1000.0 + ), + }) +} + +fn exceeds(value: Option, limit: f64) -> bool { + value.is_none_or(|value| !value.is_finite() || value.abs() > limit) +} + +fn clamp_step(value: f64, max_step_us: i64) -> i64 { + let limit = max_step_us.abs().max(1); + (value.round() as i64).clamp(-limit, limit) +} + +impl BlindHealConfig { + #[cfg(not(coverage))] + fn from_env() -> Self { + Self { + enabled: env_bool("LESAVKA_UPSTREAM_BLIND_HEAL", true), + target: match std::env::var("LESAVKA_UPSTREAM_BLIND_HEAL_TARGET") + .unwrap_or_else(|_| "video".to_string()) + .trim() + .to_ascii_lowercase() + .as_str() + { + "audio" | "mic" | "microphone" => BlindHealTarget::Audio, + _ => BlindHealTarget::Video, + }, + min_samples: env_u64("LESAVKA_UPSTREAM_BLIND_HEAL_MIN_SAMPLES", 30), + deadband_ms: env_f64("LESAVKA_UPSTREAM_BLIND_HEAL_DEADBAND_MS", 35.0), + max_handoff_abs_p95_ms: env_f64( + "LESAVKA_UPSTREAM_BLIND_HEAL_MAX_HANDOFF_P95_MS", + 250.0, + ), + max_client_send_abs_p95_ms: env_f64( + "LESAVKA_UPSTREAM_BLIND_HEAL_MAX_CLIENT_SEND_P95_MS", + 250.0, + ), + max_server_receive_abs_p95_ms: env_f64( + "LESAVKA_UPSTREAM_BLIND_HEAL_MAX_SERVER_RECEIVE_P95_MS", + 250.0, + ), + max_queue_age_p95_ms: env_f64( + "LESAVKA_UPSTREAM_BLIND_HEAL_MAX_QUEUE_AGE_P95_MS", + 150.0, + ), + max_sink_late_p95_ms: env_f64( + "LESAVKA_UPSTREAM_BLIND_HEAL_MAX_SINK_LATE_P95_MS", + 120.0, + ), + gain: env_f64("LESAVKA_UPSTREAM_BLIND_HEAL_GAIN", 0.25).clamp(0.01, 1.0), + max_step_us: env_i64("LESAVKA_UPSTREAM_BLIND_HEAL_MAX_STEP_US", 25_000) + .abs() + .max(1), + interval: Duration::from_millis(env_u64( + "LESAVKA_UPSTREAM_BLIND_HEAL_INTERVAL_MS", + 2_000, + )), + cooldown: Duration::from_millis(env_u64( + "LESAVKA_UPSTREAM_BLIND_HEAL_COOLDOWN_MS", + 8_000, + )), + } + } +} + +#[cfg(not(coverage))] +fn env_bool(name: &str, default: bool) -> bool { + std::env::var(name) + .ok() + .map(|value| { + let trimmed = value.trim(); + !(trimmed.eq_ignore_ascii_case("0") + || trimmed.eq_ignore_ascii_case("false") + || trimmed.eq_ignore_ascii_case("no") + || trimmed.eq_ignore_ascii_case("off")) + }) + .unwrap_or(default) +} + +#[cfg(not(coverage))] +fn env_u64(name: &str, default: u64) -> u64 { + std::env::var(name) + .ok() + .and_then(|value| value.trim().parse::().ok()) + .filter(|value| *value > 0) + .unwrap_or(default) +} + +#[cfg(not(coverage))] +fn env_i64(name: &str, default: i64) -> i64 { + std::env::var(name) + .ok() + .and_then(|value| value.trim().parse::().ok()) + .filter(|value| *value != 0) + .unwrap_or(default) +} + +#[cfg(not(coverage))] +fn env_f64(name: &str, default: f64) -> f64 { + std::env::var(name) + .ok() + .and_then(|value| value.trim().parse::().ok()) + .filter(|value| value.is_finite() && *value > 0.0) + .unwrap_or(default) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn config() -> BlindHealConfig { + BlindHealConfig { + enabled: true, + target: BlindHealTarget::Video, + min_samples: 30, + deadband_ms: 35.0, + max_handoff_abs_p95_ms: 250.0, + max_client_send_abs_p95_ms: 250.0, + max_server_receive_abs_p95_ms: 250.0, + max_queue_age_p95_ms: 150.0, + max_sink_late_p95_ms: 120.0, + gain: 0.25, + max_step_us: 25_000, + interval: Duration::from_millis(2_000), + cooldown: Duration::from_millis(8_000), + } + } + + fn snapshot() -> UpstreamPlannerSnapshot { + UpstreamPlannerSnapshot { + session_id: 7, + phase: "live", + latest_camera_remote_pts_us: Some(1), + latest_microphone_remote_pts_us: Some(1), + last_video_presented_pts_us: Some(1), + last_audio_presented_pts_us: Some(1), + live_lag_ms: Some(100.0), + planner_skew_ms: Some(0.0), + stale_audio_drops: 0, + stale_video_drops: 0, + skew_video_drops: 0, + freshness_reanchors: 0, + startup_timeouts: 0, + video_freezes: 0, + last_reason: "live".to_string(), + client_capture_skew_ms: Some(0.0), + client_send_skew_ms: Some(0.0), + server_receive_skew_ms: Some(0.0), + camera_client_queue_age_ms: Some(5.0), + microphone_client_queue_age_ms: Some(5.0), + camera_server_receive_age_ms: Some(5.0), + microphone_server_receive_age_ms: Some(5.0), + client_capture_abs_skew_p95_ms: Some(20.0), + client_send_abs_skew_p95_ms: Some(20.0), + server_receive_abs_skew_p95_ms: Some(20.0), + camera_client_queue_age_p95_ms: Some(20.0), + microphone_client_queue_age_p95_ms: Some(20.0), + sink_handoff_skew_ms: Some(100.0), + sink_handoff_abs_skew_p95_ms: Some(110.0), + camera_sink_late_ms: Some(0.0), + microphone_sink_late_ms: Some(0.0), + camera_sink_late_p95_ms: Some(10.0), + microphone_sink_late_p95_ms: Some(10.0), + client_timing_window_samples: 60, + sink_handoff_window_samples: 60, + } + } + + #[test] + fn blind_healer_nudges_video_opposite_sink_handoff_skew() { + let decision = evaluate_blind_heal_snapshot(&snapshot(), config()); + assert_eq!( + decision, + BlindHealDecision::Apply(BlindHealAction { + audio_delta_us: 0, + video_delta_us: -25_000, + observed_sink_handoff_skew_ms: 100.0, + observed_client_send_abs_p95_ms: 20.0, + note: "runtime blind healer: sink handoff skew +100.0ms, target=Video, applying audio +0.0ms/video -25.0ms".to_string(), + }) + ); + } + + #[test] + fn blind_healer_refuses_when_under_sampled_or_unstable() { + let mut low_samples = snapshot(); + low_samples.sink_handoff_window_samples = 1; + assert_eq!( + evaluate_blind_heal_snapshot(&low_samples, config()), + BlindHealDecision::Wait("not-enough-sink-samples") + ); + + let mut noisy_network = snapshot(); + noisy_network.server_receive_abs_skew_p95_ms = Some(400.0); + assert_eq!( + evaluate_blind_heal_snapshot(&noisy_network, config()), + BlindHealDecision::Wait("server-receive-p95-unstable") + ); + } + + #[test] + fn blind_healer_can_target_audio_when_requested() { + let mut config = config(); + config.target = BlindHealTarget::Audio; + let decision = evaluate_blind_heal_snapshot(&snapshot(), config); + assert!(matches!( + decision, + BlindHealDecision::Apply(BlindHealAction { + audio_delta_us: 25_000, + video_delta_us: 0, + .. + }) + )); + } +} diff --git a/server/src/calibration.rs b/server/src/calibration.rs index 13c2cfd..d59294c 100644 --- a/server/src/calibration.rs +++ b/server/src/calibration.rs @@ -150,6 +150,36 @@ impl CalibrationStore { persist_snapshot(&self.path, &state)?; Ok(state.to_proto()) } + + pub fn apply_transient_blind_estimate( + &self, + audio_delta_us: i64, + video_delta_us: i64, + observed_delivery_skew_ms: f32, + observed_enqueue_skew_ms: f32, + note: impl Into, + ) -> ProtoCalibrationState { + let mut state = self.state.lock().expect("calibration mutex poisoned"); + state.active_audio_offset_us = + clamp_offset(state.active_audio_offset_us.saturating_add(audio_delta_us)); + state.active_video_offset_us = + clamp_offset(state.active_video_offset_us.saturating_add(video_delta_us)); + state.source = "blind".to_string(); + state.confidence = "runtime-estimated".to_string(); + let note = note.into(); + state.detail = if note.trim().is_empty() { + format!( + "transient blind estimate from relay telemetry: delivery skew {:.1}ms, enqueue skew {:.1}ms", + observed_delivery_skew_ms, observed_enqueue_skew_ms + ) + } else { + note + }; + touch(&mut state); + self.runtime + .set_playout_offsets(state.active_video_offset_us, state.active_audio_offset_us); + state.to_proto() + } } impl CalibrationSnapshot { @@ -736,4 +766,36 @@ mod tests { }, ); } + + #[test] + fn transient_blind_estimate_updates_runtime_without_persisting_active_file_state() { + let dir = tempfile::tempdir().expect("calibration dir"); + let path = dir.path().join("calibration.toml"); + let path_string = path.to_string_lossy().to_string(); + temp_env::with_var( + "LESAVKA_CALIBRATION_PATH", + Some(path_string.as_str()), + || { + let runtime = Arc::new(UpstreamMediaRuntime::new()); + let store = CalibrationStore::load(runtime.clone()); + let before_raw = std::fs::read_to_string(&path).ok(); + + let state = store.apply_transient_blind_estimate( + 0, + -12_000, + 48.0, + 7.0, + "runtime blind healer nudge", + ); + + assert_eq!(state.source, "blind"); + assert_eq!(state.confidence, "runtime-estimated"); + assert_eq!( + runtime.playout_offsets(), + (FACTORY_MJPEG_VIDEO_OFFSET_US - 12_000, 0) + ); + assert_eq!(std::fs::read_to_string(&path).ok(), before_raw); + }, + ); + } } diff --git a/server/src/lib.rs b/server/src/lib.rs index 7f64034..ab7126c 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -5,6 +5,7 @@ pub const REVISION: &str = env!("LESAVKA_GIT_SHA"); pub const BUILD_ID: &str = REVISION; pub mod audio; +pub mod blind_healer; pub mod calibration; pub mod camera; pub mod camera_runtime; diff --git a/server/src/main/handler_startup.rs b/server/src/main/handler_startup.rs index b3dec6c..f5e3de7 100644 --- a/server/src/main/handler_startup.rs +++ b/server/src/main/handler_startup.rs @@ -47,6 +47,11 @@ impl Handler { let upstream_media_rt = Arc::new(UpstreamMediaRuntime::new()); let calibration = Arc::new(CalibrationStore::load(upstream_media_rt.clone())); + #[cfg(not(coverage))] + lesavka_server::blind_healer::spawn_blind_healer( + upstream_media_rt.clone(), + calibration.clone(), + ); Ok(Self { kb: Arc::new(Mutex::new(kb)), diff --git a/server/src/main/rpc_helpers.rs b/server/src/main/rpc_helpers.rs index 8c6af1e..1d98919 100644 --- a/server/src/main/rpc_helpers.rs +++ b/server/src/main/rpc_helpers.rs @@ -227,6 +227,8 @@ impl Handler { microphone_sink_late_p95_ms: snapshot .microphone_sink_late_p95_ms .map(|value| value as f32), + client_timing_window_samples: snapshot.client_timing_window_samples, + sink_handoff_window_samples: snapshot.sink_handoff_window_samples, })) } } diff --git a/server/src/upstream_media_runtime.rs b/server/src/upstream_media_runtime.rs index e5e6cfc..428364c 100644 --- a/server/src/upstream_media_runtime.rs +++ b/server/src/upstream_media_runtime.rs @@ -256,6 +256,8 @@ impl UpstreamMediaRuntime { .map(presentation_late_ms), camera_sink_late_p95_ms: state.camera_sink_late_window_ms.p95(), microphone_sink_late_p95_ms: state.microphone_sink_late_window_ms.p95(), + client_timing_window_samples: state.client_capture_skew_window_ms.len() as u64, + sink_handoff_window_samples: state.sink_handoff_skew_window_ms.len() as u64, } } } diff --git a/server/src/upstream_media_runtime/state.rs b/server/src/upstream_media_runtime/state.rs index 67dae52..99fbce6 100644 --- a/server/src/upstream_media_runtime/state.rs +++ b/server/src/upstream_media_runtime/state.rs @@ -38,6 +38,10 @@ impl UpstreamScalarWindow { pub fn p95(&self) -> Option { percentile(self.values.iter().copied(), 0.95) } + + pub fn len(&self) -> usize { + self.values.len() + } } fn percentile(values: impl Iterator, quantile: f64) -> Option { diff --git a/server/src/upstream_media_runtime/types.rs b/server/src/upstream_media_runtime/types.rs index 356faa3..27b4c2e 100644 --- a/server/src/upstream_media_runtime/types.rs +++ b/server/src/upstream_media_runtime/types.rs @@ -100,4 +100,6 @@ pub struct UpstreamPlannerSnapshot { pub microphone_sink_late_ms: Option, pub camera_sink_late_p95_ms: Option, pub microphone_sink_late_p95_ms: Option, + pub client_timing_window_samples: u64, + pub sink_handoff_window_samples: u64, }