use std::sync::Arc; use std::time::Duration; use crate::calibration::CalibrationStore; use crate::upstream_media_runtime::{UpstreamMediaRuntime, UpstreamPlannerSnapshot}; #[derive(Clone, Copy, Debug, Eq, PartialEq)] enum BlindHealTarget { Video, Audio, } #[derive(Clone, Copy, Debug)] struct BlindHealConfig { enabled: bool, target: BlindHealTarget, min_samples: u64, deadband_ms: f64, max_handoff_abs_p95_ms: f64, max_client_send_abs_p95_ms: f64, max_server_receive_abs_p95_ms: f64, max_queue_age_p95_ms: f64, max_sink_late_p95_ms: f64, gain: f64, max_step_us: i64, interval: Duration, cooldown: Duration, } #[derive(Clone, Debug, PartialEq)] struct BlindHealAction { audio_delta_us: i64, video_delta_us: i64, observed_sink_handoff_skew_ms: f64, observed_client_send_abs_p95_ms: f64, note: String, } #[cfg(not(coverage))] pub fn spawn_blind_healer(runtime: Arc, calibration: Arc) { let config = BlindHealConfig::from_env(); if !config.enabled { tracing::info!("upstream blind healer disabled"); return; } tokio::spawn(async move { let mut last_adjusted_session_id = 0; let mut last_adjusted_at: Option = None; loop { tokio::time::sleep(config.interval).await; let snapshot = runtime.snapshot(); if snapshot.session_id != last_adjusted_session_id { last_adjusted_session_id = snapshot.session_id; last_adjusted_at = None; } if last_adjusted_at.is_some_and(|last| last.elapsed() < config.cooldown) { continue; } let action = match evaluate_blind_heal_snapshot(&snapshot, config) { BlindHealDecision::Apply(action) => action, BlindHealDecision::Wait(reason) => { tracing::trace!(reason, "upstream blind healer waiting"); continue; } }; let state = calibration.apply_transient_blind_estimate( action.audio_delta_us, action.video_delta_us, action.observed_sink_handoff_skew_ms as f32, action.observed_client_send_abs_p95_ms as f32, action.note.clone(), ); last_adjusted_at = Some(tokio::time::Instant::now()); tracing::info!( session_id = snapshot.session_id, audio_delta_us = action.audio_delta_us, video_delta_us = action.video_delta_us, sink_handoff_skew_ms = action.observed_sink_handoff_skew_ms, client_send_abs_p95_ms = action.observed_client_send_abs_p95_ms, active_audio_offset_us = state.active_audio_offset_us, active_video_offset_us = state.active_video_offset_us, "upstream blind healer applied transient calibration nudge" ); } }); } #[derive(Clone, Debug, PartialEq)] enum BlindHealDecision { Apply(BlindHealAction), Wait(&'static str), } fn evaluate_blind_heal_snapshot( snapshot: &UpstreamPlannerSnapshot, config: BlindHealConfig, ) -> BlindHealDecision { if !config.enabled { return BlindHealDecision::Wait("disabled"); } if !matches!(snapshot.phase, "live" | "healing") { return BlindHealDecision::Wait("not-live"); } if snapshot.client_timing_window_samples < config.min_samples { return BlindHealDecision::Wait("not-enough-client-samples"); } if snapshot.sink_handoff_window_samples < config.min_samples { return BlindHealDecision::Wait("not-enough-sink-samples"); } let Some(skew_ms) = snapshot.sink_handoff_skew_ms else { return BlindHealDecision::Wait("missing-sink-skew"); }; if skew_ms.abs() < config.deadband_ms { return BlindHealDecision::Wait("inside-deadband"); } if exceeds( snapshot.sink_handoff_abs_skew_p95_ms, config.max_handoff_abs_p95_ms, ) { return BlindHealDecision::Wait("sink-handoff-p95-unstable"); } if exceeds( snapshot.client_send_abs_skew_p95_ms, config.max_client_send_abs_p95_ms, ) { return BlindHealDecision::Wait("client-send-p95-unstable"); } if exceeds( snapshot.server_receive_abs_skew_p95_ms, config.max_server_receive_abs_p95_ms, ) { return BlindHealDecision::Wait("server-receive-p95-unstable"); } if exceeds( snapshot.camera_client_queue_age_p95_ms, config.max_queue_age_p95_ms, ) || exceeds( snapshot.microphone_client_queue_age_p95_ms, config.max_queue_age_p95_ms, ) { return BlindHealDecision::Wait("client-queue-p95-unstable"); } if exceeds( snapshot.camera_sink_late_p95_ms, config.max_sink_late_p95_ms, ) || exceeds( snapshot.microphone_sink_late_p95_ms, config.max_sink_late_p95_ms, ) { return BlindHealDecision::Wait("sink-late-p95-unstable"); } let correction_us = clamp_step(skew_ms * config.gain * 1000.0, config.max_step_us); if correction_us == 0 { return BlindHealDecision::Wait("rounded-zero"); } let (audio_delta_us, video_delta_us) = match config.target { BlindHealTarget::Audio => (correction_us, 0), BlindHealTarget::Video => (0, -correction_us), }; BlindHealDecision::Apply(BlindHealAction { audio_delta_us, video_delta_us, observed_sink_handoff_skew_ms: skew_ms, observed_client_send_abs_p95_ms: snapshot.client_send_abs_skew_p95_ms.unwrap_or(0.0), note: format!( "runtime blind healer: sink handoff skew {skew_ms:+.1}ms, target={:?}, applying audio {:+.1}ms/video {:+.1}ms", config.target, audio_delta_us as f64 / 1000.0, video_delta_us as f64 / 1000.0 ), }) } fn exceeds(value: Option, limit: f64) -> bool { value.is_none_or(|value| !value.is_finite() || value.abs() > limit) } fn clamp_step(value: f64, max_step_us: i64) -> i64 { let limit = max_step_us.abs().max(1); (value.round() as i64).clamp(-limit, limit) } impl BlindHealConfig { #[cfg(not(coverage))] fn from_env() -> Self { Self { enabled: env_bool("LESAVKA_UPSTREAM_BLIND_HEAL", true), target: match std::env::var("LESAVKA_UPSTREAM_BLIND_HEAL_TARGET") .unwrap_or_else(|_| "video".to_string()) .trim() .to_ascii_lowercase() .as_str() { "audio" | "mic" | "microphone" => BlindHealTarget::Audio, _ => BlindHealTarget::Video, }, min_samples: env_u64("LESAVKA_UPSTREAM_BLIND_HEAL_MIN_SAMPLES", 30), deadband_ms: env_f64("LESAVKA_UPSTREAM_BLIND_HEAL_DEADBAND_MS", 35.0), max_handoff_abs_p95_ms: env_f64( "LESAVKA_UPSTREAM_BLIND_HEAL_MAX_HANDOFF_P95_MS", 250.0, ), max_client_send_abs_p95_ms: env_f64( "LESAVKA_UPSTREAM_BLIND_HEAL_MAX_CLIENT_SEND_P95_MS", 250.0, ), max_server_receive_abs_p95_ms: env_f64( "LESAVKA_UPSTREAM_BLIND_HEAL_MAX_SERVER_RECEIVE_P95_MS", 250.0, ), max_queue_age_p95_ms: env_f64( "LESAVKA_UPSTREAM_BLIND_HEAL_MAX_QUEUE_AGE_P95_MS", 150.0, ), max_sink_late_p95_ms: env_f64( "LESAVKA_UPSTREAM_BLIND_HEAL_MAX_SINK_LATE_P95_MS", 120.0, ), gain: env_f64("LESAVKA_UPSTREAM_BLIND_HEAL_GAIN", 0.25).clamp(0.01, 1.0), max_step_us: env_i64("LESAVKA_UPSTREAM_BLIND_HEAL_MAX_STEP_US", 25_000) .abs() .max(1), interval: Duration::from_millis(env_u64( "LESAVKA_UPSTREAM_BLIND_HEAL_INTERVAL_MS", 2_000, )), cooldown: Duration::from_millis(env_u64( "LESAVKA_UPSTREAM_BLIND_HEAL_COOLDOWN_MS", 8_000, )), } } } #[cfg(not(coverage))] fn env_bool(name: &str, default: bool) -> bool { std::env::var(name) .ok() .map(|value| { let trimmed = value.trim(); !(trimmed.eq_ignore_ascii_case("0") || trimmed.eq_ignore_ascii_case("false") || trimmed.eq_ignore_ascii_case("no") || trimmed.eq_ignore_ascii_case("off")) }) .unwrap_or(default) } #[cfg(not(coverage))] fn env_u64(name: &str, default: u64) -> u64 { std::env::var(name) .ok() .and_then(|value| value.trim().parse::().ok()) .filter(|value| *value > 0) .unwrap_or(default) } #[cfg(not(coverage))] fn env_i64(name: &str, default: i64) -> i64 { std::env::var(name) .ok() .and_then(|value| value.trim().parse::().ok()) .filter(|value| *value != 0) .unwrap_or(default) } #[cfg(not(coverage))] fn env_f64(name: &str, default: f64) -> f64 { std::env::var(name) .ok() .and_then(|value| value.trim().parse::().ok()) .filter(|value| value.is_finite() && *value > 0.0) .unwrap_or(default) } #[cfg(test)] mod tests { use super::*; fn config() -> BlindHealConfig { BlindHealConfig { enabled: true, target: BlindHealTarget::Video, min_samples: 30, deadband_ms: 35.0, max_handoff_abs_p95_ms: 250.0, max_client_send_abs_p95_ms: 250.0, max_server_receive_abs_p95_ms: 250.0, max_queue_age_p95_ms: 150.0, max_sink_late_p95_ms: 120.0, gain: 0.25, max_step_us: 25_000, interval: Duration::from_millis(2_000), cooldown: Duration::from_millis(8_000), } } fn snapshot() -> UpstreamPlannerSnapshot { UpstreamPlannerSnapshot { session_id: 7, phase: "live", latest_camera_remote_pts_us: Some(1), latest_microphone_remote_pts_us: Some(1), last_video_presented_pts_us: Some(1), last_audio_presented_pts_us: Some(1), live_lag_ms: Some(100.0), planner_skew_ms: Some(0.0), stale_audio_drops: 0, stale_video_drops: 0, skew_video_drops: 0, freshness_reanchors: 0, startup_timeouts: 0, video_freezes: 0, last_reason: "live".to_string(), client_capture_skew_ms: Some(0.0), client_send_skew_ms: Some(0.0), server_receive_skew_ms: Some(0.0), camera_client_queue_age_ms: Some(5.0), microphone_client_queue_age_ms: Some(5.0), camera_server_receive_age_ms: Some(5.0), microphone_server_receive_age_ms: Some(5.0), client_capture_abs_skew_p95_ms: Some(20.0), client_send_abs_skew_p95_ms: Some(20.0), server_receive_abs_skew_p95_ms: Some(20.0), camera_client_queue_age_p95_ms: Some(20.0), microphone_client_queue_age_p95_ms: Some(20.0), sink_handoff_skew_ms: Some(100.0), sink_handoff_abs_skew_p95_ms: Some(110.0), camera_sink_late_ms: Some(0.0), microphone_sink_late_ms: Some(0.0), camera_sink_late_p95_ms: Some(10.0), microphone_sink_late_p95_ms: Some(10.0), client_timing_window_samples: 60, sink_handoff_window_samples: 60, } } #[test] fn blind_healer_nudges_video_opposite_sink_handoff_skew() { let decision = evaluate_blind_heal_snapshot(&snapshot(), config()); assert_eq!( decision, BlindHealDecision::Apply(BlindHealAction { audio_delta_us: 0, video_delta_us: -25_000, observed_sink_handoff_skew_ms: 100.0, observed_client_send_abs_p95_ms: 20.0, note: "runtime blind healer: sink handoff skew +100.0ms, target=Video, applying audio +0.0ms/video -25.0ms".to_string(), }) ); } #[test] fn blind_healer_refuses_when_under_sampled_or_unstable() { let mut low_samples = snapshot(); low_samples.sink_handoff_window_samples = 1; assert_eq!( evaluate_blind_heal_snapshot(&low_samples, config()), BlindHealDecision::Wait("not-enough-sink-samples") ); let mut noisy_network = snapshot(); noisy_network.server_receive_abs_skew_p95_ms = Some(400.0); assert_eq!( evaluate_blind_heal_snapshot(&noisy_network, config()), BlindHealDecision::Wait("server-receive-p95-unstable") ); } #[test] fn blind_healer_can_target_audio_when_requested() { let mut config = config(); config.target = BlindHealTarget::Audio; let decision = evaluate_blind_heal_snapshot(&snapshot(), config); assert!(matches!( decision, BlindHealDecision::Apply(BlindHealAction { audio_delta_us: 25_000, video_delta_us: 0, .. }) )); } }