//! Scheduling runtime for upstream (client-to-server) camera and microphone
//! media. Timing choices here directly affect lip sync, so the helpers below
//! stay small and explicit.

#![forbid(unsafe_code)]

use std::collections::VecDeque;
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::time::Instant;

use crate::calibration::{
    FACTORY_MJPEG_AUDIO_MODE_OFFSETS_US, FACTORY_MJPEG_AUDIO_OFFSET_US,
    FACTORY_MJPEG_VIDEO_MODE_OFFSETS_US, FACTORY_MJPEG_VIDEO_OFFSET_US,
};

const TIMING_WINDOW_CAPACITY: usize = 240;

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum UpstreamMediaKind {
    Camera,
    Microphone,
}

#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct UpstreamClientTiming {
    pub capture_pts_us: u64,
    pub send_pts_us: u64,
    pub queue_depth: u32,
    pub queue_age_ms: u32,
}

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct UpstreamStreamLease {
    pub session_id: u64,
    pub generation: u64,
}

#[derive(Clone, Copy, Debug)]
pub struct PlannedUpstreamPacket {
    pub local_pts_us: u64,
    pub due_at: Instant,
    pub late_by: Duration,
    pub source_lag: Duration,
}

#[derive(Clone, Copy, Debug)]
pub enum UpstreamPlanDecision {
    AwaitingPair,
    DropBeforeOverlap,
    DropStale(&'static str),
    StartupFailed(&'static str),
    Play(PlannedUpstreamPacket),
}

#[derive(Clone, Debug)]
pub struct UpstreamPlannerSnapshot {
    pub session_id: u64,
    pub phase: &'static str,
    pub latest_camera_remote_pts_us: Option<u64>,
    pub latest_microphone_remote_pts_us: Option<u64>,
    pub last_video_presented_pts_us: Option<u64>,
    pub last_audio_presented_pts_us: Option<u64>,
    pub live_lag_ms: Option<f64>,
    pub planner_skew_ms: Option<f64>,
    pub stale_audio_drops: u64,
    pub stale_video_drops: u64,
    pub skew_video_drops: u64,
    pub freshness_reanchors: u64,
    pub startup_timeouts: u64,
    pub video_freezes: u64,
    pub last_reason: String,
    pub client_capture_skew_ms: Option<f64>,
    pub client_send_skew_ms: Option<f64>,
    pub server_receive_skew_ms: Option<f64>,
    pub camera_client_queue_age_ms: Option<f64>,
    pub microphone_client_queue_age_ms: Option<f64>,
    pub camera_server_receive_age_ms: Option<f64>,
    pub microphone_server_receive_age_ms: Option<f64>,
    pub client_capture_abs_skew_p95_ms: Option<f64>,
    pub client_send_abs_skew_p95_ms: Option<f64>,
    pub server_receive_abs_skew_p95_ms: Option<f64>,
    pub camera_client_queue_age_p95_ms: Option<f64>,
    pub microphone_client_queue_age_p95_ms: Option<f64>,
    pub sink_handoff_skew_ms: Option<f64>,
    pub sink_handoff_abs_skew_p95_ms: Option<f64>,
    pub camera_sink_late_ms: Option<f64>,
    pub microphone_sink_late_ms: Option<f64>,
    pub camera_sink_late_p95_ms: Option<f64>,
    pub microphone_sink_late_p95_ms: Option<f64>,
    pub client_timing_window_samples: u64,
    pub sink_handoff_window_samples: u64,
}

#[derive(Clone, Copy, Debug)]
struct TimingSample {
    capture_pts_us: u64,
    send_pts_us: u64,
    queue_age_ms: u32,
    received_at: Instant,
}

#[derive(Clone, Copy, Debug)]
struct PresentationSample {
    due_at: Instant,
    handed_at: Instant,
}

/// Fixed-capacity sliding window of recent timing samples, backing the p95
/// statistics reported in [`UpstreamPlannerSnapshot`].
#[derive(Debug, Default)]
struct ScalarWindow {
    values: VecDeque<f64>,
}

impl ScalarWindow {
    /// Appends a sample, evicting the oldest value once the window holds
    /// `TIMING_WINDOW_CAPACITY` entries.
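    ///
    /// A sketch of the eviction behavior (the type is private to this module,
    /// so the snippet is illustrative rather than a runnable doctest):
    ///
    /// ```text
    /// let mut window = ScalarWindow::default();
    /// for sample in 0..(TIMING_WINDOW_CAPACITY + 10) {
    ///     window.push(sample as f64);
    /// }
    /// assert_eq!(window.len(), TIMING_WINDOW_CAPACITY); // oldest 10 evicted
    /// ```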
    fn push(&mut self, value: f64) {
        if self.values.len() >= TIMING_WINDOW_CAPACITY {
            self.values.pop_front();
        }
        self.values.push_back(value);
    }

    fn p95(&self) -> Option<f64> {
        percentile(self.values.iter().copied(), 0.95)
    }

    fn p95_abs(&self) -> Option<f64> {
        percentile(self.values.iter().map(|value| value.abs()), 0.95)
    }

    fn len(&self) -> usize {
        self.values.len()
    }
}

#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
enum UpstreamSyncPhase {
    #[default]
    Acquiring,
    Syncing,
    Live,
    Healing,
}

impl UpstreamSyncPhase {
    /// Reports the phase as the lowercase string exposed through
    /// [`UpstreamPlannerSnapshot::phase`].
    fn as_str(self) -> &'static str {
        match self {
            Self::Acquiring => "acquiring",
            Self::Syncing => "syncing",
            Self::Live => "live",
            Self::Healing => "healing",
        }
    }
}

#[derive(Debug, Default)]
struct RuntimeState {
    session_id: u64,
    active_camera_generation: Option<u64>,
    active_microphone_generation: Option<u64>,
    phase: UpstreamSyncPhase,
    session_started_at: Option<Instant>,
    base_remote_pts_us: Option<u64>,
    playout_epoch: Option<Instant>,
    latest_camera_remote_pts_us: Option<u64>,
    latest_microphone_remote_pts_us: Option<u64>,
    last_video_local_pts_us: Option<u64>,
    last_audio_local_pts_us: Option<u64>,
    last_video_presented_pts_us: Option<u64>,
    last_audio_presented_pts_us: Option<u64>,
    latest_camera_timing: Option<TimingSample>,
    latest_microphone_timing: Option<TimingSample>,
    latest_camera_presentation: Option<PresentationSample>,
    latest_microphone_presentation: Option<PresentationSample>,
    latest_paired_client_capture_skew_ms: Option<f64>,
    latest_paired_client_send_skew_ms: Option<f64>,
    latest_paired_server_receive_skew_ms: Option<f64>,
    client_capture_skew_window_ms: ScalarWindow,
    client_send_skew_window_ms: ScalarWindow,
    server_receive_skew_window_ms: ScalarWindow,
    camera_client_queue_age_window_ms: ScalarWindow,
    microphone_client_queue_age_window_ms: ScalarWindow,
    sink_handoff_skew_window_ms: ScalarWindow,
    camera_sink_late_window_ms: ScalarWindow,
    microphone_sink_late_window_ms: ScalarWindow,
    stale_audio_drops: u64,
    stale_video_drops: u64,
    skew_video_drops: u64,
    freshness_reanchors: u64,
    startup_timeouts: u64,
    video_freezes: u64,
    last_reason: String,
}

#[derive(Debug)]
pub struct UpstreamMediaRuntime {
    next_session_id: AtomicU64,
    next_camera_generation: AtomicU64,
    next_microphone_generation: AtomicU64,
    microphone_sink_gate: Arc<Semaphore>,
    camera_playout_offset_us: AtomicI64,
    microphone_playout_offset_us: AtomicI64,
    state: Mutex<RuntimeState>,
}

include!("upstream_media_runtime/stream_lifecycle_methods.rs");
include!("upstream_media_runtime/planner_snapshot_methods.rs");
include!("upstream_media_runtime/playout_planning_methods.rs");

impl Default for UpstreamMediaRuntime {
    fn default() -> Self {
        Self::new()
    }
}

/// Clears per-session playout anchors, timing samples, and drop counters
/// ahead of a new session.
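/// `session_id`, the active stream generations, `phase`,
/// `session_started_at`, `last_reason`, and the rolling `ScalarWindow`
/// histories are left untouched here; the lifecycle methods in
/// `stream_lifecycle_methods.rs` presumably manage those.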
fn reset_session_state(state: &mut RuntimeState) {
    state.base_remote_pts_us = None;
    state.playout_epoch = None;
    state.latest_camera_remote_pts_us = None;
    state.latest_microphone_remote_pts_us = None;
    state.last_video_local_pts_us = None;
    state.last_audio_local_pts_us = None;
    state.last_video_presented_pts_us = None;
    state.last_audio_presented_pts_us = None;
    state.latest_camera_timing = None;
    state.latest_microphone_timing = None;
    state.latest_camera_presentation = None;
    state.latest_microphone_presentation = None;
    state.latest_paired_client_capture_skew_ms = None;
    state.latest_paired_client_send_skew_ms = None;
    state.latest_paired_server_receive_skew_ms = None;
    state.stale_audio_drops = 0;
    state.stale_video_drops = 0;
    state.skew_video_drops = 0;
    state.freshness_reanchors = 0;
    state.startup_timeouts = 0;
    state.video_freezes = 0;
}

/// Records the microphone-vs-camera skew at capture, send, and server
/// receive time once timing samples exist for both streams.
fn record_timing_pair(state: &mut RuntimeState) {
    let (Some(camera), Some(microphone)) =
        (state.latest_camera_timing, state.latest_microphone_timing)
    else {
        return;
    };
    let capture_skew_ms = delta_ms(microphone.capture_pts_us, camera.capture_pts_us);
    let send_skew_ms = delta_ms(microphone.send_pts_us, camera.send_pts_us);
    let receive_skew_ms = signed_duration_ms(microphone.received_at, camera.received_at);
    state.latest_paired_client_capture_skew_ms = Some(capture_skew_ms);
    state.latest_paired_client_send_skew_ms = Some(send_skew_ms);
    state.latest_paired_server_receive_skew_ms = Some(receive_skew_ms);
    state.client_capture_skew_window_ms.push(capture_skew_ms);
    state.client_send_skew_window_ms.push(send_skew_ms);
    state.server_receive_skew_window_ms.push(receive_skew_ms);
}

/// Records when a packet was handed to its sink relative to its planned due
/// time, feeding the per-stream lateness and handoff-skew windows.
fn record_presentation(state: &mut RuntimeState, kind: UpstreamMediaKind, due_at: Instant) {
    let sample = PresentationSample {
        due_at,
        handed_at: Instant::now(),
    };
    match kind {
        UpstreamMediaKind::Camera => {
            state.latest_camera_presentation = Some(sample);
            state
                .camera_sink_late_window_ms
                .push(presentation_late_ms(sample));
        }
        UpstreamMediaKind::Microphone => {
            state.latest_microphone_presentation = Some(sample);
            state
                .microphone_sink_late_window_ms
                .push(presentation_late_ms(sample));
        }
    }
    if let Some(skew) = latest_sink_handoff_skew_ms(state) {
        state.sink_handoff_skew_window_ms.push(skew);
    }
}

fn live_lag_ms(state: &RuntimeState) -> Option<f64> {
    let latest = state
        .latest_camera_remote_pts_us
        .into_iter()
        .chain(state.latest_microphone_remote_pts_us)
        .max()?;
    let base = state.base_remote_pts_us.unwrap_or(latest);
    Some(latest.saturating_sub(base) as f64 / 1000.0)
}

/// Signed audio-minus-video skew of the last presented packets, in
/// milliseconds.
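/// Positive values mean audio is running ahead of video: for example, audio
/// last presented at pts 1_050_000 us against video at 1_000_000 us yields
/// `Some(50.0)`.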
fn planner_skew_ms(state: &RuntimeState) -> Option<f64> {
    match (
        state.last_audio_presented_pts_us,
        state.last_video_presented_pts_us,
    ) {
        (Some(audio), Some(video)) => Some((audio as i128 - video as i128) as f64 / 1000.0),
        _ => None,
    }
}

fn latest_sink_handoff_skew_ms(state: &RuntimeState) -> Option<f64> {
    let camera = state.latest_camera_presentation?;
    let microphone = state.latest_microphone_presentation?;
    Some(presentation_late_signed_ms(microphone) - presentation_late_signed_ms(camera))
}

fn presentation_late_ms(sample: PresentationSample) -> f64 {
    presentation_late_signed_ms(sample).max(0.0)
}

fn presentation_late_signed_ms(sample: PresentationSample) -> f64 {
    signed_duration_ms(sample.handed_at, sample.due_at)
}

fn age_ms(now: Instant, then: Instant) -> f64 {
    now.saturating_duration_since(then).as_secs_f64() * 1000.0
}

/// Signed difference `left - right` in milliseconds; negative when `left`
/// precedes `right`.
fn signed_duration_ms(left: Instant, right: Instant) -> f64 {
    if left >= right {
        left.duration_since(right).as_secs_f64() * 1000.0
    } else {
        -(right.duration_since(left).as_secs_f64() * 1000.0)
    }
}

fn delta_ms(left_us: u64, right_us: u64) -> f64 {
    (left_us as i128 - right_us as i128) as f64 / 1000.0
}

/// Returns the value at index `ceil((n - 1) * quantile)` of the sorted
/// finite samples, or `None` when no finite samples remain.
fn percentile(values: impl Iterator<Item = f64>, quantile: f64) -> Option<f64> {
    let mut sorted = values.filter(|value| value.is_finite()).collect::<Vec<_>>();
    if sorted.is_empty() {
        return None;
    }
    sorted.sort_by(|left, right| left.total_cmp(right));
    let index = ((sorted.len() - 1) as f64 * quantile.clamp(0.0, 1.0)).ceil() as usize;
    sorted.get(index).copied()
}
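// A minimal check of the rank rule above: with ten samples, p95 reads index
// ceil((10 - 1) * 0.95) = 9, i.e. the largest value. Illustrative only; the
// module and test names are not part of the existing test layout.
#[cfg(test)]
mod percentile_examples {
    use super::percentile;

    #[test]
    fn p95_of_ten_samples_is_the_maximum() {
        let values = (1..=10).map(f64::from);
        assert_eq!(percentile(values, 0.95), Some(10.0));
    }

    #[test]
    fn non_finite_samples_are_skipped() {
        // NaN is filtered out, so the median is taken over [1.0, 2.0].
        let values = [f64::NAN, 1.0, 2.0].into_iter();
        assert_eq!(percentile(values, 0.5), Some(2.0));
    }
}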
/// Fixed playout delay applied before scheduling upstream packets,
/// overridable via `LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS` (default 80 ms).
fn upstream_playout_delay() -> Duration {
    let delay_ms = std::env::var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS")
        .ok()
        .and_then(|value| value.trim().parse::<u64>().ok())
        .unwrap_or(80);
    Duration::from_millis(delay_ms)
}

/// Resolves the playout offset for `kind`, in microseconds. Precedence:
/// per-mode env map, then scalar env override, then the factory per-mode
/// calibration table, and finally the factory scalar default.
fn playout_offset_us(kind: UpstreamMediaKind) -> i64 {
    let (scalar_name, mode_map_name, factory_map, factory_offset_us) = match kind {
        UpstreamMediaKind::Camera => (
            "LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US",
            "LESAVKA_UPSTREAM_VIDEO_PLAYOUT_MODE_OFFSETS_US",
            FACTORY_MJPEG_VIDEO_MODE_OFFSETS_US,
            FACTORY_MJPEG_VIDEO_OFFSET_US,
        ),
        UpstreamMediaKind::Microphone => (
            "LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US",
            "LESAVKA_UPSTREAM_AUDIO_PLAYOUT_MODE_OFFSETS_US",
            FACTORY_MJPEG_AUDIO_MODE_OFFSETS_US,
            FACTORY_MJPEG_AUDIO_OFFSET_US,
        ),
    };
    let mode = current_uvc_mode();
    mode.as_deref()
        .and_then(|mode| env_mode_offset_us(mode_map_name, mode))
        .or_else(|| env_i64(scalar_name))
        .or_else(|| {
            mode.as_deref()
                .and_then(|mode| lookup_mode_offset_us(factory_map, mode))
        })
        .unwrap_or(factory_offset_us)
}

/// Builds the current UVC mode key, e.g. `1280x720@30`, from the
/// `LESAVKA_UVC_*` environment. When only the frame interval is set (in
/// 100 ns units), fps is derived as `10_000_000 / interval`.
fn current_uvc_mode() -> Option<String> {
    let width = env_u32("LESAVKA_UVC_WIDTH")?;
    let height = env_u32("LESAVKA_UVC_HEIGHT")?;
    let fps = env_u32("LESAVKA_UVC_FPS")
        .or_else(|| {
            env_u32("LESAVKA_UVC_INTERVAL")
                .and_then(|interval| (interval > 0).then_some(10_000_000 / interval))
        })?
        .max(1);
    Some(format!("{width}x{height}@{fps}"))
}

fn env_mode_offset_us(name: &str, mode: &str) -> Option<i64> {
    std::env::var(name)
        .ok()
        .and_then(|map| lookup_mode_offset_us(&map, mode))
}

/// Parses a comma-separated `mode=offset_us` map and returns the offset for
/// an exact `mode` match.
fn lookup_mode_offset_us(map: &str, mode: &str) -> Option<i64> {
    map.split(',').find_map(|entry| {
        let (key, value) = entry.trim().split_once('=')?;
        (key.trim() == mode)
            .then(|| value.trim().parse::<i64>().ok())
            .flatten()
    })
}

fn env_i64(name: &str) -> Option<i64> {
    std::env::var(name)
        .ok()
        .and_then(|value| value.trim().parse::<i64>().ok())
}

fn env_u32(name: &str) -> Option<u32> {
    std::env::var(name)
        .ok()
        .and_then(|value| value.trim().parse::<u32>().ok())
}

/// Shifts `instant` by a signed microsecond offset, falling back to the
/// unshifted `instant` when a negative offset would underflow.
fn apply_offset(instant: Instant, offset_us: i64) -> Instant {
    if offset_us >= 0 {
        instant + Duration::from_micros(offset_us as u64)
    } else {
        instant
            .checked_sub(Duration::from_micros(offset_us.unsigned_abs()))
            .unwrap_or(instant)
    }
}

#[cfg(test)]
#[path = "upstream_media_runtime/tests/mod.rs"]
mod tests;
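// Illustrative checks for the offset helpers above, kept separate from the
// main suite in `upstream_media_runtime/tests/`. The mode-map string mirrors
// the format read from the LESAVKA_UPSTREAM_*_PLAYOUT_MODE_OFFSETS_US
// variables; the module and test names are hypothetical.
#[cfg(test)]
mod offset_examples {
    use super::{apply_offset, lookup_mode_offset_us};
    use tokio::time::Instant;

    #[test]
    fn mode_map_lookup_requires_an_exact_mode_match() {
        let map = "1280x720@30=1500, 640x480@30=-2000";
        assert_eq!(lookup_mode_offset_us(map, "640x480@30"), Some(-2000));
        assert_eq!(lookup_mode_offset_us(map, "1920x1080@30"), None);
    }

    #[test]
    fn apply_offset_round_trips_signed_offsets() {
        // A positive shift followed by the matching negative shift returns
        // to the original instant.
        let now = Instant::now();
        assert_eq!(apply_offset(apply_offset(now, 1_000), -1_000), now);
    }
}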