lesavka/server/src/calibration.rs

501 lines
22 KiB
Rust

use crate::upstream_media_runtime::UpstreamMediaRuntime;
use anyhow::{Context, Result};
use chrono::Utc;
use lesavka_common::lesavka::{
CalibrationAction, CalibrationRequest, CalibrationState as ProtoCalibrationState,
};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
mod mode_env;
mod profile_offsets;
use mode_env::{current_uvc_mode, lookup_mode_offset_us};
pub use profile_offsets::{
FACTORY_HEVC_AUDIO_MODE_OFFSETS_US, FACTORY_HEVC_AUDIO_OFFSET_US,
FACTORY_HEVC_OPUS_AUDIO_MODE_OFFSETS_US, FACTORY_HEVC_OPUS_AUDIO_OFFSET_US,
FACTORY_HEVC_OPUS_VIDEO_MODE_OFFSETS_US, FACTORY_HEVC_OPUS_VIDEO_OFFSET_US,
FACTORY_HEVC_PCM_AUDIO_MODE_OFFSETS_US, FACTORY_HEVC_PCM_AUDIO_OFFSET_US,
FACTORY_HEVC_PCM_VIDEO_MODE_OFFSETS_US, FACTORY_HEVC_PCM_VIDEO_OFFSET_US,
FACTORY_HEVC_VIDEO_MODE_OFFSETS_US, FACTORY_HEVC_VIDEO_OFFSET_1280X720_20_US,
FACTORY_HEVC_VIDEO_OFFSET_1280X720_30_US, FACTORY_HEVC_VIDEO_OFFSET_1920X1080_20_US,
FACTORY_HEVC_VIDEO_OFFSET_1920X1080_30_US, FACTORY_HEVC_VIDEO_OFFSET_US,
FACTORY_MJPEG_AUDIO_MODE_OFFSETS_US, FACTORY_MJPEG_AUDIO_OFFSET_US,
FACTORY_MJPEG_OPUS_AUDIO_MODE_OFFSETS_US, FACTORY_MJPEG_OPUS_AUDIO_OFFSET_US,
FACTORY_MJPEG_OPUS_VIDEO_MODE_OFFSETS_US, FACTORY_MJPEG_OPUS_VIDEO_OFFSET_1280X720_20_US,
FACTORY_MJPEG_OPUS_VIDEO_OFFSET_1280X720_30_US,
FACTORY_MJPEG_OPUS_VIDEO_OFFSET_1920X1080_20_US,
FACTORY_MJPEG_OPUS_VIDEO_OFFSET_1920X1080_30_US, FACTORY_MJPEG_OPUS_VIDEO_OFFSET_US,
FACTORY_MJPEG_PCM_AUDIO_MODE_OFFSETS_US, FACTORY_MJPEG_PCM_AUDIO_OFFSET_US,
FACTORY_MJPEG_PCM_VIDEO_MODE_OFFSETS_US, FACTORY_MJPEG_PCM_VIDEO_OFFSET_US,
FACTORY_MJPEG_VIDEO_MODE_OFFSETS_US, FACTORY_MJPEG_VIDEO_OFFSET_1280X720_20_US,
FACTORY_MJPEG_VIDEO_OFFSET_1280X720_30_US, FACTORY_MJPEG_VIDEO_OFFSET_1920X1080_20_US,
FACTORY_MJPEG_VIDEO_OFFSET_1920X1080_30_US, FACTORY_MJPEG_VIDEO_OFFSET_US,
};
use profile_offsets::{
configured_profile_offset_us, current_profile, factory_audio_mode_offsets_us,
factory_audio_scalar_offset_us, factory_video_mode_offsets_us, factory_video_scalar_offset_us,
normalize_calibration_profile,
};
const LEGACY_FACTORY_MJPEG_AUDIO_OFFSET_US: i64 = -45_000;
const PREVIOUS_FACTORY_MJPEG_AUDIO_OFFSET_US: i64 = 720_000;
const PREVIOUS_TUNED_MJPEG_AUDIO_OFFSET_US: i64 = 1_260_000;
const PREVIOUS_FACTORY_MJPEG_VIDEO_OFFSET_US: i64 = 0;
const PREVIOUS_DELAYED_FACTORY_MJPEG_VIDEO_OFFSET_US: i64 = 350_000;
const PREVIOUS_BROWSER_FACTORY_MJPEG_VIDEO_OFFSET_US: i64 = 130_000;
const PREVIOUS_SCALAR_FACTORY_MJPEG_VIDEO_OFFSET_US: i64 = 170_000;
const PREVIOUS_OVERSHOT_FACTORY_MJPEG_VIDEO_OFFSET_US: i64 = 1_090_000;
const FACTORY_CONFIDENCE: &str = "factory";
const PREVIOUS_OFFSET_LIMIT_US: i64 = 500_000;
const OFFSET_LIMIT_US: i64 = 1_500_000;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CalibrationMedia {
Audio,
Video,
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct CalibrationSnapshot {
profile: String,
factory_audio_offset_us: i64,
factory_video_offset_us: i64,
default_audio_offset_us: i64,
default_video_offset_us: i64,
active_audio_offset_us: i64,
active_video_offset_us: i64,
source: String,
confidence: String,
updated_at: String,
detail: String,
}
#[derive(Debug)]
pub struct CalibrationStore {
path: PathBuf,
runtime: Arc<UpstreamMediaRuntime>,
state: Mutex<CalibrationSnapshot>,
}
impl CalibrationStore {
/// Keeps `load` explicit because it sits on calibration state, where persisted and factory offsets must stay auditable.
/// Inputs are the typed parameters; output is the return value or side effect.
pub fn load(runtime: Arc<UpstreamMediaRuntime>) -> Self {
let path = calibration_path();
let state = std::fs::read_to_string(&path)
.ok()
.map(|raw| migrate_legacy_snapshot(parse_snapshot(&raw)))
.unwrap_or_else(snapshot_from_env);
runtime.set_playout_offsets(state.active_video_offset_us, state.active_audio_offset_us);
Self {
path,
runtime,
state: Mutex::new(state),
}
}
pub fn current(&self) -> ProtoCalibrationState {
self.state
.lock()
.expect("calibration mutex poisoned")
.to_proto()
}
/// Keeps `apply` explicit because it sits on calibration state, where persisted and factory offsets must stay auditable.
/// Inputs are the typed parameters; output is the return value or side effect.
pub fn apply(&self, request: CalibrationRequest) -> Result<ProtoCalibrationState> {
let mut state = self.state.lock().expect("calibration mutex poisoned");
let action =
CalibrationAction::try_from(request.action).unwrap_or(CalibrationAction::Unspecified);
match action {
CalibrationAction::Unspecified => {}
CalibrationAction::RestoreDefault => {
state.active_audio_offset_us = state.default_audio_offset_us;
state.active_video_offset_us = state.default_video_offset_us;
state.source = "default".to_string();
state.confidence = "saved-default".to_string();
state.detail = "restored saved upstream A/V calibration".to_string();
touch(&mut state);
}
CalibrationAction::RestoreFactory => {
*state = factory_snapshot_from_env(format!(
"restored release-shipped {} upstream calibration",
current_profile()
));
}
CalibrationAction::AdjustActive => {
state.active_audio_offset_us = clamp_offset(
state
.active_audio_offset_us
.saturating_add(request.audio_delta_us),
);
state.active_video_offset_us = clamp_offset(
state
.active_video_offset_us
.saturating_add(request.video_delta_us),
);
state.source = "manual".to_string();
state.confidence = "manual".to_string();
state.detail = format!(
"manual upstream A/V calibration nudge: audio {:+.1}ms, video {:+.1}ms",
request.audio_delta_us as f64 / 1000.0,
request.video_delta_us as f64 / 1000.0
);
touch(&mut state);
}
CalibrationAction::BlindEstimate => {
state.active_audio_offset_us = clamp_offset(
state
.active_audio_offset_us
.saturating_add(request.audio_delta_us),
);
state.active_video_offset_us = clamp_offset(
state
.active_video_offset_us
.saturating_add(request.video_delta_us),
);
state.source = "blind".to_string();
state.confidence = "estimated".to_string();
state.detail = if request.note.trim().is_empty() {
format!(
"blind estimate applied from relay telemetry: delivery skew {:.1}ms, enqueue skew {:.1}ms",
request.observed_delivery_skew_ms, request.observed_enqueue_skew_ms
)
} else {
request.note
};
touch(&mut state);
}
CalibrationAction::SaveActiveAsDefault => {
state.default_audio_offset_us = state.active_audio_offset_us;
state.default_video_offset_us = state.active_video_offset_us;
state.source = "default".to_string();
state.confidence = "measured".to_string();
state.detail = "saved current upstream A/V calibration as site default".to_string();
touch(&mut state);
}
}
self.runtime
.set_playout_offsets(state.active_video_offset_us, state.active_audio_offset_us);
persist_snapshot(&self.path, &state)?;
Ok(state.to_proto())
}
/// Keeps `apply_transient_blind_estimate` explicit because it sits on calibration state, where persisted and factory offsets must stay auditable.
/// Inputs are the typed parameters; output is the return value or side effect.
pub fn apply_transient_blind_estimate(
&self,
audio_delta_us: i64,
video_delta_us: i64,
observed_delivery_skew_ms: f32,
observed_enqueue_skew_ms: f32,
note: impl Into<String>,
) -> ProtoCalibrationState {
let mut state = self.state.lock().expect("calibration mutex poisoned");
state.active_audio_offset_us =
clamp_offset(state.active_audio_offset_us.saturating_add(audio_delta_us));
state.active_video_offset_us =
clamp_offset(state.active_video_offset_us.saturating_add(video_delta_us));
state.source = "blind".to_string();
state.confidence = "runtime-estimated".to_string();
let note = note.into();
state.detail = if note.trim().is_empty() {
format!(
"transient blind estimate from relay telemetry: delivery skew {:.1}ms, enqueue skew {:.1}ms",
observed_delivery_skew_ms, observed_enqueue_skew_ms
)
} else {
note
};
touch(&mut state);
self.runtime
.set_playout_offsets(state.active_video_offset_us, state.active_audio_offset_us);
state.to_proto()
}
}
impl CalibrationSnapshot {
/// Keeps `to_proto` explicit because it sits on calibration state, where persisted and factory offsets must stay auditable.
/// Inputs are the typed parameters; output is the return value or side effect.
fn to_proto(&self) -> ProtoCalibrationState {
ProtoCalibrationState {
profile: self.profile.clone(),
factory_audio_offset_us: self.factory_audio_offset_us,
factory_video_offset_us: self.factory_video_offset_us,
default_audio_offset_us: self.default_audio_offset_us,
default_video_offset_us: self.default_video_offset_us,
active_audio_offset_us: self.active_audio_offset_us,
active_video_offset_us: self.active_video_offset_us,
source: self.source.clone(),
confidence: self.confidence.clone(),
updated_at: self.updated_at.clone(),
detail: self.detail.clone(),
}
}
}
pub fn calibration_path() -> PathBuf {
std::env::var("LESAVKA_CALIBRATION_PATH")
.ok()
.filter(|path| !path.trim().is_empty())
.map(PathBuf::from)
.unwrap_or_else(|| PathBuf::from("/var/lib/lesavka/calibration.toml"))
}
/// Resolve the startup playout offset for the current calibration profile.
///
/// Inputs: media kind plus process env. Output: microsecond offset selected
/// from profile-specific env or factory maps. Why: the upstream runtime starts
/// before the persisted calibration store is loaded, so it still needs the same
/// camera/audio profile fork used by the durable calibration state.
pub fn configured_playout_offset_us(media: CalibrationMedia) -> i64 {
let mode = current_uvc_mode();
let profile = current_profile();
let (media_name, factory_map, factory_scalar, stale) = match media {
CalibrationMedia::Audio => (
"AUDIO",
factory_audio_mode_offsets_us(&profile),
factory_audio_scalar_offset_us(&profile),
is_stale_audio_offset_us as fn(i64) -> bool,
),
CalibrationMedia::Video => (
"VIDEO",
factory_video_mode_offsets_us(&profile),
factory_video_scalar_offset_us(&profile),
is_stale_video_offset_us as fn(i64) -> bool,
),
};
configured_profile_offset_us(&profile, media_name, mode.as_deref(), stale)
.or_else(|| {
mode.as_deref()
.and_then(|mode| lookup_mode_offset_us(factory_map, mode))
})
.unwrap_or(factory_scalar)
}
/// Keeps `snapshot_from_env` explicit because it sits on calibration state, where persisted and factory offsets must stay auditable.
/// Inputs are the typed parameters; output is the return value or side effect.
fn snapshot_from_env() -> CalibrationSnapshot {
let mode = current_uvc_mode();
let profile = current_profile();
let factory_audio_mode_offsets_us = factory_audio_mode_offsets_us(&profile);
let factory_video_mode_offsets_us = factory_video_mode_offsets_us(&profile);
let factory_audio_scalar_offset_us = factory_audio_scalar_offset_us(&profile);
let factory_video_scalar_offset_us = factory_video_scalar_offset_us(&profile);
let factory_audio_offset_us = mode
.as_deref()
.and_then(|mode| lookup_mode_offset_us(factory_audio_mode_offsets_us, mode))
.unwrap_or(factory_audio_scalar_offset_us);
let factory_video_offset_us = mode
.as_deref()
.and_then(|mode| lookup_mode_offset_us(factory_video_mode_offsets_us, mode))
.unwrap_or(factory_video_scalar_offset_us);
let env_audio =
configured_profile_offset_us(&profile, "AUDIO", mode.as_deref(), is_stale_audio_offset_us);
let env_video =
configured_profile_offset_us(&profile, "VIDEO", mode.as_deref(), is_stale_video_offset_us);
let default_audio_offset_us = env_audio.unwrap_or(factory_audio_offset_us);
let default_video_offset_us = env_video.unwrap_or(factory_video_offset_us);
let source = if env_audio.is_some() || env_video.is_some() {
"env".to_string()
} else {
"factory".to_string()
};
let confidence = if source == "factory" {
FACTORY_CONFIDENCE.to_string()
} else {
"configured".to_string()
};
CalibrationSnapshot {
profile,
factory_audio_offset_us,
factory_video_offset_us,
default_audio_offset_us,
default_video_offset_us,
active_audio_offset_us: default_audio_offset_us,
active_video_offset_us: default_video_offset_us,
source,
confidence,
updated_at: Utc::now().to_rfc3339(),
detail: "loaded upstream A/V calibration defaults".to_string(),
}
}
/// Keeps `parse_snapshot` explicit because it sits on calibration state, where persisted and factory offsets must stay auditable.
/// Inputs are the typed parameters; output is the return value or side effect.
fn parse_snapshot(raw: &str) -> CalibrationSnapshot {
let fallback = snapshot_from_env();
let value = |key: &str| -> Option<String> {
raw.lines().find_map(|line| {
let trimmed = line.trim();
let (left, right) = trimmed.split_once('=')?;
(left.trim() == key).then(|| right.trim().trim_matches('"').to_string())
})
};
let number = |key: &str, default: i64| -> i64 {
value(key)
.and_then(|raw| raw.parse::<i64>().ok())
.map(clamp_offset)
.unwrap_or(default)
};
CalibrationSnapshot {
profile: value("profile").unwrap_or(fallback.profile),
factory_audio_offset_us: fallback.factory_audio_offset_us,
factory_video_offset_us: fallback.factory_video_offset_us,
default_audio_offset_us: number(
"default_audio_offset_us",
fallback.default_audio_offset_us,
),
default_video_offset_us: number(
"default_video_offset_us",
fallback.default_video_offset_us,
),
active_audio_offset_us: number("active_audio_offset_us", fallback.active_audio_offset_us),
active_video_offset_us: number("active_video_offset_us", fallback.active_video_offset_us),
source: value("source").unwrap_or(fallback.source),
confidence: value("confidence").unwrap_or(fallback.confidence),
updated_at: value("updated_at").unwrap_or(fallback.updated_at),
detail: value("detail").unwrap_or(fallback.detail),
}
}
/// Keeps `migrate_legacy_snapshot` explicit because it sits on calibration state, where persisted and factory offsets must stay auditable.
/// Inputs are the typed parameters; output is the return value or side effect.
fn migrate_legacy_snapshot(mut state: CalibrationSnapshot) -> CalibrationSnapshot {
let current_profile = current_profile();
let stored_profile = normalize_calibration_profile(&state.profile);
let source_allows_migration = matches!(state.source.as_str(), "factory" | "env");
let confidence_allows_migration = matches!(state.confidence.as_str(), "factory" | "configured");
let detail_allows_profile_migration = state
.detail
.contains("loaded upstream A/V calibration defaults")
|| state.detail.contains("restored release-shipped");
if stored_profile != current_profile
&& source_allows_migration
&& confidence_allows_migration
&& detail_allows_profile_migration
{
let mut replacement = factory_snapshot_from_env(format!(
"migrated factory upstream A/V calibration profile from {} to {}",
stored_profile, current_profile
));
replacement.detail = format!(
"migrated factory upstream A/V calibration profile from {} to {}",
stored_profile, replacement.profile
);
return replacement;
}
let clamped_previous_baseline = matches!(
state.default_audio_offset_us,
PREVIOUS_OFFSET_LIMIT_US | OFFSET_LIMIT_US
) && state
.detail
.contains("loaded upstream A/V calibration defaults");
let untouched_legacy_audio = (is_stale_audio_offset_us(state.default_audio_offset_us)
|| state.default_audio_offset_us == state.factory_audio_offset_us
|| clamped_previous_baseline)
&& state.active_audio_offset_us == state.default_audio_offset_us;
let untouched_legacy_video = is_stale_video_offset_us(state.default_video_offset_us)
&& state.active_video_offset_us == state.default_video_offset_us;
if stored_profile == current_profile
&& source_allows_migration
&& confidence_allows_migration
&& untouched_legacy_audio
&& untouched_legacy_video
{
let old_audio_offset_us = state.default_audio_offset_us;
let old_video_offset_us = state.default_video_offset_us;
state.default_audio_offset_us = state.factory_audio_offset_us;
state.active_audio_offset_us = state.factory_audio_offset_us;
state.default_video_offset_us = state.factory_video_offset_us;
state.active_video_offset_us = state.factory_video_offset_us;
state.profile = current_profile;
state.source = "factory".to_string();
state.confidence = FACTORY_CONFIDENCE.to_string();
state.detail = format!(
"migrated legacy MJPEG upstream A/V baseline from audio {:+.1}ms/video {:+.1}ms to audio {:+.1}ms/video {:+.1}ms",
old_audio_offset_us as f64 / 1000.0,
old_video_offset_us as f64 / 1000.0,
state.factory_audio_offset_us as f64 / 1000.0,
state.factory_video_offset_us as f64 / 1000.0
);
touch(&mut state);
}
state
}
/// Builds a persisted calibration snapshot from the current profile factory.
///
/// Inputs: human-readable audit detail. Output: calibration state whose active
/// and default offsets are reset to the profile-specific factory values.
/// Why: restore/migration paths must not accidentally revive stale MJPEG
/// offsets after the server is running an HEVC decode-to-MJPEG profile.
fn factory_snapshot_from_env(detail: impl Into<String>) -> CalibrationSnapshot {
let mut state = snapshot_from_env();
state.default_audio_offset_us = state.factory_audio_offset_us;
state.default_video_offset_us = state.factory_video_offset_us;
state.active_audio_offset_us = state.factory_audio_offset_us;
state.active_video_offset_us = state.factory_video_offset_us;
state.source = "factory".to_string();
state.confidence = FACTORY_CONFIDENCE.to_string();
state.detail = detail.into();
touch(&mut state);
state
}
/// Keeps `persist_snapshot` explicit because it sits on calibration state, where persisted and factory offsets must stay auditable.
/// Inputs are the typed parameters; output is the return value or side effect.
fn persist_snapshot(path: &PathBuf, state: &CalibrationSnapshot) -> Result<()> {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)
.with_context(|| format!("creating calibration directory {}", parent.display()))?;
}
std::fs::write(path, serialize_snapshot(state))
.with_context(|| format!("writing calibration state {}", path.display()))
}
/// Keeps `serialize_snapshot` explicit because it sits on calibration state, where persisted and factory offsets must stay auditable.
/// Inputs are the typed parameters; output is the return value or side effect.
fn serialize_snapshot(state: &CalibrationSnapshot) -> String {
format!(
"profile=\"{}\"\ndefault_audio_offset_us={}\ndefault_video_offset_us={}\nactive_audio_offset_us={}\nactive_video_offset_us={}\nsource=\"{}\"\nconfidence=\"{}\"\nupdated_at=\"{}\"\ndetail=\"{}\"\n",
escape_value(&state.profile),
state.default_audio_offset_us,
state.default_video_offset_us,
state.active_audio_offset_us,
state.active_video_offset_us,
escape_value(&state.source),
escape_value(&state.confidence),
escape_value(&state.updated_at),
escape_value(&state.detail),
)
}
fn touch(state: &mut CalibrationSnapshot) {
state.updated_at = Utc::now().to_rfc3339();
}
fn is_stale_audio_offset_us(offset: i64) -> bool {
matches!(
offset,
LEGACY_FACTORY_MJPEG_AUDIO_OFFSET_US
| PREVIOUS_FACTORY_MJPEG_AUDIO_OFFSET_US
| PREVIOUS_TUNED_MJPEG_AUDIO_OFFSET_US
)
}
fn is_stale_video_offset_us(offset: i64) -> bool {
matches!(
offset,
PREVIOUS_FACTORY_MJPEG_VIDEO_OFFSET_US
| PREVIOUS_DELAYED_FACTORY_MJPEG_VIDEO_OFFSET_US
| PREVIOUS_BROWSER_FACTORY_MJPEG_VIDEO_OFFSET_US
| PREVIOUS_SCALAR_FACTORY_MJPEG_VIDEO_OFFSET_US
| PREVIOUS_OVERSHOT_FACTORY_MJPEG_VIDEO_OFFSET_US
)
}
fn clamp_offset(value: i64) -> i64 {
value.clamp(-OFFSET_LIMIT_US, OFFSET_LIMIT_US)
}
fn escape_value(value: &str) -> String {
value.replace('\\', "\\\\").replace('"', "\\\"")
}
#[cfg(test)]
#[path = "calibration/tests/mod.rs"]
mod tests;