#![forbid(unsafe_code)] use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore}; use tokio::time::Instant; use tracing::info; mod config; mod state; mod types; use config::{ apply_playout_offset, upstream_audio_master_wait_grace, upstream_camera_startup_grace_us, upstream_max_live_lag, upstream_pairing_master_slack, upstream_playout_delay, upstream_playout_offset_us, upstream_reanchor_late_threshold, upstream_require_paired_startup, upstream_startup_timeout, upstream_timing_trace_enabled, }; use state::{UpstreamClockState, UpstreamSyncPhase}; pub use types::{ PlannedUpstreamPacket, UpstreamMediaKind, UpstreamPlanDecision, UpstreamPlannerSnapshot, UpstreamStreamLease, }; /// Coordinate upstream stream ownership and keep audio/video on one timeline. /// /// Inputs: stream-open/close events plus remote packet timestamps. /// Outputs: active-stream leases and rebased local PTS values. /// Why: live calls need one current webcam owner, one current microphone owner, /// and one shared media clock so reconnects do not leave old sinks alive or let /// audio/video drift onto separate timing islands. #[derive(Debug)] pub struct UpstreamMediaRuntime { next_session_id: AtomicU64, next_camera_generation: AtomicU64, next_microphone_generation: AtomicU64, microphone_sink_gate: Arc, pairing_state_notify: Arc, audio_progress_notify: Arc, camera_playout_offset_us: AtomicI64, microphone_playout_offset_us: AtomicI64, state: Mutex, } impl UpstreamMediaRuntime { /// Build an empty upstream runtime. 
#[must_use] pub fn new() -> Self { Self { next_session_id: AtomicU64::new(0), next_camera_generation: AtomicU64::new(0), next_microphone_generation: AtomicU64::new(0), microphone_sink_gate: Arc::new(Semaphore::new(1)), pairing_state_notify: Arc::new(Notify::new()), audio_progress_notify: Arc::new(Notify::new()), camera_playout_offset_us: AtomicI64::new(upstream_playout_offset_us( UpstreamMediaKind::Camera, )), microphone_playout_offset_us: AtomicI64::new(upstream_playout_offset_us( UpstreamMediaKind::Microphone, )), state: Mutex::new(UpstreamClockState::default()), } } /// Apply live upstream playout offsets without restarting the relay. pub fn set_playout_offsets(&self, camera_offset_us: i64, microphone_offset_us: i64) { self.camera_playout_offset_us .store(camera_offset_us, Ordering::Relaxed); self.microphone_playout_offset_us .store(microphone_offset_us, Ordering::Relaxed); } /// Return `(camera_offset_us, microphone_offset_us)` currently used for live playout. #[must_use] pub fn playout_offsets(&self) -> (i64, i64) { ( self.camera_playout_offset_us.load(Ordering::Relaxed), self.microphone_playout_offset_us.load(Ordering::Relaxed), ) } fn playout_offset_us(&self, kind: UpstreamMediaKind) -> i64 { match kind { UpstreamMediaKind::Camera => self.camera_playout_offset_us.load(Ordering::Relaxed), UpstreamMediaKind::Microphone => { self.microphone_playout_offset_us.load(Ordering::Relaxed) } } } fn positive_audio_delay_allowance_us(&self) -> u64 { let camera_offset_us = self.camera_playout_offset_us.load(Ordering::Relaxed); let microphone_offset_us = self.microphone_playout_offset_us.load(Ordering::Relaxed); microphone_offset_us.saturating_sub(camera_offset_us).max(0) as u64 } fn audio_ahead_video_allowance_us(&self) -> u64 { let camera_offset_us = self.camera_playout_offset_us.load(Ordering::Relaxed); let microphone_offset_us = self.microphone_playout_offset_us.load(Ordering::Relaxed); camera_offset_us.saturating_sub(microphone_offset_us).max(0) as u64 } fn 
intentional_future_wait_allowance_us(&self, kind: UpstreamMediaKind) -> u64 { match kind { UpstreamMediaKind::Camera => self.audio_ahead_video_allowance_us(), UpstreamMediaKind::Microphone => self.positive_audio_delay_allowance_us(), } } /// Mark one audio chunk as actually handed to the UAC sink. pub fn mark_audio_presented(&self, local_pts_us: u64) { let mut state = self .state .lock() .expect("upstream media state mutex poisoned"); state.last_audio_presented_pts_us = Some(local_pts_us); if state.phase != UpstreamSyncPhase::Failed { state.phase = UpstreamSyncPhase::Live; state.last_reason = "audio-master playhead flowing".to_string(); } self.audio_progress_notify.notify_waiters(); } /// Mark one video frame as actually handed to the UVC/HDMI sink. pub fn mark_video_presented(&self, local_pts_us: u64) { let mut state = self .state .lock() .expect("upstream media state mutex poisoned"); state.last_video_presented_pts_us = Some(local_pts_us); if state.phase != UpstreamSyncPhase::Failed { state.phase = UpstreamSyncPhase::Live; state.last_reason = "video follower emitted a synced frame".to_string(); } } /// Record that video intentionally froze instead of showing an out-of-sync frame. pub fn record_video_freeze(&self, reason: impl Into) { let mut state = self .state .lock() .expect("upstream media state mutex poisoned"); state.video_freezes = state.video_freezes.saturating_add(1); if state.phase != UpstreamSyncPhase::Failed { state.phase = UpstreamSyncPhase::Healing; } state.last_reason = reason.into(); } /// Return current planner facts for diagnostics and probe artifacts. 
#[must_use] pub fn snapshot(&self) -> UpstreamPlannerSnapshot { let state = self .state .lock() .expect("upstream media state mutex poisoned"); let live_lag_ms = live_lag_us(&state).map(us_to_ms); let planner_skew_ms = match ( state.last_audio_presented_pts_us, state.last_video_presented_pts_us, ) { (Some(audio), Some(video)) => Some((audio as i128 - video as i128) as f64 / 1000.0), _ => None, }; UpstreamPlannerSnapshot { session_id: state.session_id, phase: state.phase.as_str(), latest_camera_remote_pts_us: state.latest_camera_remote_pts_us, latest_microphone_remote_pts_us: state.latest_microphone_remote_pts_us, last_video_presented_pts_us: state.last_video_presented_pts_us, last_audio_presented_pts_us: state.last_audio_presented_pts_us, live_lag_ms, planner_skew_ms, stale_audio_drops: state.stale_audio_drops, stale_video_drops: state.stale_video_drops, skew_video_drops: state.skew_video_drops, freshness_reanchors: state.freshness_reanchors, startup_timeouts: state.startup_timeouts, video_freezes: state.video_freezes, last_reason: state.last_reason.clone(), } } } include!("upstream_media_runtime/lease_lifecycle.rs"); impl UpstreamMediaRuntime { /// Rebase one upstream video packet timestamp onto the shared session clock. #[must_use] pub fn map_video_pts(&self, remote_pts_us: u64, frame_step_us: u64) -> Option { match self.plan_video_pts(remote_pts_us, frame_step_us.max(1)) { UpstreamPlanDecision::Play(plan) => Some(plan.local_pts_us), _ => None, } } /// Rebase one upstream audio packet timestamp onto the shared session clock. #[must_use] pub fn map_audio_pts(&self, remote_pts_us: u64) -> Option { match self.plan_audio_pts(remote_pts_us) { UpstreamPlanDecision::Play(plan) => Some(plan.local_pts_us), _ => None, } } /// Rebase and schedule one upstream video packet on the shared playout epoch. 
    #[must_use]
    pub fn plan_video_pts(&self, remote_pts_us: u64, frame_step_us: u64) -> UpstreamPlanDecision {
        self.plan_pts(
            UpstreamMediaKind::Camera,
            remote_pts_us,
            // A zero step would defeat the monotonic-PTS nudge in plan_pts.
            frame_step_us.max(1),
        )
    }

    /// Rebase and schedule one upstream audio packet on the shared playout epoch.
    #[must_use]
    pub fn plan_audio_pts(&self, remote_pts_us: u64) -> UpstreamPlanDecision {
        self.plan_pts(UpstreamMediaKind::Microphone, remote_pts_us, 1)
    }

    /// Hold video until the audio master has at least reached the same capture
    /// moment, or until the bounded sync grace is exhausted.
    ///
    /// Returns `true` when video may present (audio has caught up, or no
    /// microphone stream is active at all), `false` when the grace deadline
    /// expired first.
    pub async fn wait_for_audio_master(&self, video_local_pts_us: u64, due_at: Instant) -> bool {
        let slack_us = upstream_pairing_master_slack()
            .as_micros()
            .min(u64::MAX as u128) as u64;
        let audio_delay_allowance_us = self.positive_audio_delay_allowance_us();
        let deadline = due_at + upstream_audio_master_wait_grace();
        loop {
            // Arm the notification BEFORE inspecting state so a notify fired
            // between the check and the await cannot be lost.
            let notified = self.audio_progress_notify.notified();
            {
                // Scope the lock so it is released before any await point.
                let state = self
                    .state
                    .lock()
                    .expect("upstream media state mutex poisoned");
                if state.active_microphone_generation.is_none() {
                    // No audio master exists: video runs freely.
                    return true;
                }
                let audio_presented_pts_us = state.last_audio_presented_pts_us.unwrap_or(0);
                // Audio is "caught up" once its presented playhead, plus the
                // pairing slack and the intentional audio delay, reaches the
                // video frame's capture moment.
                if audio_presented_pts_us
                    .saturating_add(slack_us)
                    .saturating_add(audio_delay_allowance_us)
                    >= video_local_pts_us
                {
                    return true;
                }
            }
            if Instant::now() >= deadline {
                return false;
            }
            tokio::select! {
                _ = notified => {}
                _ = tokio::time::sleep_until(deadline) => return false,
            }
        }
    }

    /// Core planner: rebase one packet's remote PTS onto the shared session
    /// clock and decide whether to play, keep buffering, or drop it.
    ///
    /// `min_step_us` is the smallest PTS advance enforced when two packets of
    /// the same kind would otherwise collide or go backwards.
    fn plan_pts(
        &self,
        kind: UpstreamMediaKind,
        remote_pts_us: u64,
        min_step_us: u64,
    ) -> UpstreamPlanDecision {
        let mut state = self
            .state
            .lock()
            .expect("upstream media state mutex poisoned");
        let session_id = state.session_id;
        // Per-kind packet counter, used only to rate-limit trace logging.
        let packet_count = match kind {
            UpstreamMediaKind::Camera => {
                state.camera_packet_count = state.camera_packet_count.saturating_add(1);
                state.camera_packet_count
            }
            UpstreamMediaKind::Microphone => {
                state.microphone_packet_count = state.microphone_packet_count.saturating_add(1);
                state.microphone_packet_count
            }
        };
        update_latest_remote_pts(&mut state, kind, remote_pts_us);
        // First-seen remote PTS for this kind; the pairing/startup anchor.
        let mut first_remote_for_kind = match kind {
            UpstreamMediaKind::Camera => {
                let first_slot = &mut state.first_camera_remote_pts_us;
                *first_slot.get_or_insert(remote_pts_us)
            }
            UpstreamMediaKind::Microphone => {
                let first_slot = &mut state.first_microphone_remote_pts_us;
                *first_slot.get_or_insert(remote_pts_us)
            }
        };
        if kind == UpstreamMediaKind::Camera {
            // Camera warm-up: once the startup grace has elapsed (or is
            // disabled), re-anchor the camera's "first" PTS to the current
            // packet so the shared base uses post-warm-up timestamps.
            let startup_grace_us = upstream_camera_startup_grace_us();
            if !state.camera_startup_ready
                && (startup_grace_us == 0
                    || remote_pts_us.saturating_sub(first_remote_for_kind) >= startup_grace_us)
            {
                state.camera_startup_ready = true;
                state.first_camera_remote_pts_us = Some(remote_pts_us);
                first_remote_for_kind = remote_pts_us;
            }
        }
        let now = Instant::now();
        let pairing_deadline = *state
            .pairing_anchor_deadline
            .get_or_insert_with(|| now + upstream_playout_delay());
        let playout_delay = upstream_playout_delay();
        let max_live_lag = upstream_max_live_lag();
        if state.session_base_remote_pts_us.is_none() {
            // No shared base yet: still in the startup/pairing phase.
            if state.session_started_at.is_some_and(|started_at| {
                now.saturating_duration_since(started_at) > upstream_startup_timeout()
            }) {
                state.phase = UpstreamSyncPhase::Failed;
                state.startup_timeouts = state.startup_timeouts.saturating_add(1);
                state.last_reason =
                    "paired upstream startup did not converge before timeout".to_string();
                return UpstreamPlanDecision::StartupFailed(
                    "paired upstream startup did not converge before timeout",
                );
            }
            if state.first_camera_remote_pts_us.is_some()
                && state.first_microphone_remote_pts_us.is_some()
                && state.camera_startup_ready
            {
                // Both streams seen and the camera is warmed up: anchor the
                // shared base at the LATER of the two first PTS values so both
                // streams genuinely overlap from the base onward.
                let first_camera_remote_pts_us =
                    state.first_camera_remote_pts_us.unwrap_or_default();
                let first_microphone_remote_pts_us =
                    state.first_microphone_remote_pts_us.unwrap_or_default();
                state.session_base_remote_pts_us =
                    Some(first_camera_remote_pts_us.max(first_microphone_remote_pts_us));
                let overlap_epoch = now + playout_delay;
                state.playout_epoch = Some(overlap_epoch);
                state.pairing_anchor_deadline = Some(overlap_epoch);
                state.phase = UpstreamSyncPhase::Syncing;
                state.last_reason = "fresh audio/video overlap anchor established".to_string();
                if !state.startup_anchor_logged {
                    let startup_delta_us =
                        first_camera_remote_pts_us as i128 - first_microphone_remote_pts_us as i128;
                    info!(
                        session_id,
                        first_camera_remote_pts_us,
                        first_microphone_remote_pts_us,
                        overlap_base_remote_pts_us =
                            state.session_base_remote_pts_us.unwrap_or_default(),
                        startup_delta_us,
                        "upstream media overlap anchors established"
                    );
                    state.startup_anchor_logged = true;
                }
                self.pairing_state_notify.notify_waiters();
            } else if now < pairing_deadline {
                // Within the pairing window: buffer and keep waiting for the
                // counterpart stream.
                state.phase = UpstreamSyncPhase::Acquiring;
                state.last_reason = "awaiting both upstream media streams".to_string();
                if upstream_timing_trace_enabled()
                    && (packet_count <= 10 || packet_count.is_multiple_of(300))
                {
                    info!(
                        session_id,
                        ?kind,
                        packet_count,
                        remote_pts_us,
                        wait_ms = pairing_deadline.saturating_duration_since(now).as_millis(),
                        "upstream media packet buffered while awaiting the counterpart stream"
                    );
                }
                return UpstreamPlanDecision::AwaitingPair;
            } else if state.first_camera_remote_pts_us.is_some() && !state.camera_startup_ready {
                // Pairing window expired but camera warm-up is unfinished:
                // keep buffering rather than anchoring on warm-up frames.
                state.phase = UpstreamSyncPhase::Syncing;
                state.last_reason = "camera startup warm-up is still in progress".to_string();
                if upstream_timing_trace_enabled()
                    && (packet_count <= 10 || packet_count.is_multiple_of(300))
                {
                    info!(
                        session_id,
                        ?kind,
                        packet_count,
                        remote_pts_us,
                        "upstream media packet buffered while camera startup warm-up is still in progress"
                    );
                }
                return UpstreamPlanDecision::AwaitingPair;
            } else if upstream_require_paired_startup() {
                // Policy demands a paired start: refresh the one-sided anchor
                // and extend the window instead of starting alone.
                let refreshed = refresh_unpaired_pairing_anchor(
                    &mut state,
                    kind,
                    remote_pts_us,
                    now + playout_delay,
                );
                if refreshed || upstream_timing_trace_enabled() {
                    info!(
                        session_id,
                        ?kind,
                        packet_count,
                        remote_pts_us,
                        refreshed_anchor = refreshed,
                        healing_window_ms = playout_delay.as_millis(),
                        "upstream media pairing window expired; holding one-sided stream for synced startup"
                    );
                }
                state.phase = UpstreamSyncPhase::Syncing;
                state.last_reason = "holding one-sided stream for synced startup".to_string();
                return UpstreamPlanDecision::AwaitingPair;
            } else {
                // Paired startup not required: fall back to a one-sided base
                // anchored on whichever stream showed up.
                let single_stream_base_remote_pts_us = match kind {
                    UpstreamMediaKind::Camera => {
                        state.first_camera_remote_pts_us.unwrap_or(remote_pts_us)
                    }
                    UpstreamMediaKind::Microphone => state
                        .first_microphone_remote_pts_us
                        .unwrap_or(remote_pts_us),
                };
                state.session_base_remote_pts_us = Some(single_stream_base_remote_pts_us);
                let one_sided_epoch = now + playout_delay;
                state.playout_epoch = Some(one_sided_epoch);
                state.pairing_anchor_deadline = Some(one_sided_epoch);
                info!(
                    session_id,
                    ?kind,
                    single_stream_base_remote_pts_us,
                    "upstream media pairing window expired; continuing with one-sided playout"
                );
                self.pairing_state_notify.notify_waiters();
            }
        }
        let session_base_remote_pts_us = state.session_base_remote_pts_us.unwrap_or(remote_pts_us);
        if remote_pts_us < session_base_remote_pts_us {
            // Packet captured before the shared overlap base: discard.
            if upstream_timing_trace_enabled()
                && (packet_count <= 10 || packet_count.is_multiple_of(300))
            {
                info!(
                    session_id,
                    ?kind,
                    packet_count,
                    remote_pts_us,
                    session_base_remote_pts_us,
                    "upstream media packet dropped before the shared overlap base"
                );
            }
            return UpstreamPlanDecision::DropBeforeOverlap;
        }
        // How far this packet trails the newest remote PTS seen for its kind.
        let source_lag = source_lag_for_kind(&state, kind, remote_pts_us);
        if source_lag > max_live_lag {
            match kind {
                UpstreamMediaKind::Camera => {
                    state.stale_video_drops = state.stale_video_drops.saturating_add(1);
                    state.video_freezes = state.video_freezes.saturating_add(1);
                    state.last_reason = "dropped stale video beyond max live lag".to_string();
                }
                UpstreamMediaKind::Microphone => {
                    state.stale_audio_drops = state.stale_audio_drops.saturating_add(1);
                    state.last_reason = "dropped stale audio beyond max live lag".to_string();
                }
            }
            state.phase = UpstreamSyncPhase::Healing;
            return UpstreamPlanDecision::DropStale("packet exceeded max live lag");
        }
        // Rebase onto the session clock, then enforce strictly-increasing
        // local PTS per kind by nudging forward at least `min_step_us`.
        let mut local_pts_us = remote_pts_us.saturating_sub(session_base_remote_pts_us);
        let last_slot = match kind {
            UpstreamMediaKind::Camera => &mut state.last_video_local_pts_us,
            UpstreamMediaKind::Microphone => &mut state.last_audio_local_pts_us,
        };
        if let Some(last_pts_us) = *last_slot
            && local_pts_us <= last_pts_us
        {
            local_pts_us = last_pts_us.saturating_add(min_step_us.max(1));
        }
        *last_slot = Some(local_pts_us);
        let audio_ahead_video_allowance_us = self.audio_ahead_video_allowance_us();
        // Video that trails the audio master's presented playhead beyond the
        // allowed slack is dropped (freeze instead of showing stale frames).
        if kind == UpstreamMediaKind::Camera
            && state
                .last_audio_presented_pts_us
                .is_some_and(|audio_pts_us| {
                    video_is_too_far_behind_audio(
                        local_pts_us,
                        audio_pts_us,
                        audio_ahead_video_allowance_us,
                    )
                })
        {
            state.skew_video_drops = state.skew_video_drops.saturating_add(1);
            state.video_freezes = state.video_freezes.saturating_add(1);
            state.phase = UpstreamSyncPhase::Healing;
            state.last_reason =
                "dropped video frame that was too far behind the audio master".to_string();
            return UpstreamPlanDecision::DropStale("video frame was too far behind audio master");
        }
        let epoch = *state.playout_epoch.get_or_insert(pairing_deadline);
        let sink_offset_us = self.playout_offset_us(kind);
        // Shadow: effective delay is capped by the max live lag budget.
        let playout_delay = upstream_playout_delay().min(max_live_lag);
        let mut due_at =
            apply_playout_offset(epoch + Duration::from_micros(local_pts_us), sink_offset_us);
        let mut late_by = now.checked_duration_since(due_at).unwrap_or_default();
        let reanchor_threshold = upstream_reanchor_late_threshold(playout_delay);
        let intentional_future_wait_allowance =
            Duration::from_micros(self.intentional_future_wait_allowance_us(kind));
        let max_future_wait = max_live_lag
            .saturating_sub(source_lag)
            .saturating_add(intentional_future_wait_allowance);
        let due_future_wait = due_at.saturating_duration_since(now);
        if late_by > reanchor_threshold || due_future_wait > max_future_wait {
            // The playhead drifted too far (late or too far in the future):
            // pick a fresh epoch so this packet lands `desired_delay` from now.
            let old_late_by = late_by;
            let old_future_wait = due_future_wait;
            let desired_delay = playout_delay.min(max_future_wait);
            let desired_due_at = now + desired_delay;
            // Remove the sink offset before deriving the epoch, since due_at
            // computations re-apply it.
            let unoffset_due_at = apply_playout_offset(desired_due_at, -sink_offset_us);
            let recovered_epoch = unoffset_due_at
                .checked_sub(Duration::from_micros(local_pts_us))
                .unwrap_or(unoffset_due_at);
            state.playout_epoch = Some(recovered_epoch);
            state.pairing_anchor_deadline = Some(desired_due_at);
            state.freshness_reanchors = state.freshness_reanchors.saturating_add(1);
            state.phase = UpstreamSyncPhase::Healing;
            state.last_reason = "reanchored upstream playhead to preserve freshness".to_string();
            due_at = apply_playout_offset(
                recovered_epoch + Duration::from_micros(local_pts_us),
                sink_offset_us,
            );
            late_by = now.checked_duration_since(due_at).unwrap_or_default();
            info!(
                session_id,
                ?kind,
                packet_count,
                local_pts_us,
                remote_pts_us,
                old_late_by_ms = old_late_by.as_millis(),
                old_future_wait_ms = old_future_wait.as_millis(),
                recovery_buffer_ms = playout_delay.as_millis(),
                reanchor_threshold_ms = reanchor_threshold.as_millis(),
                max_live_lag_ms = max_live_lag.as_millis(),
                source_lag_ms = source_lag.as_millis(),
                "upstream media playhead reanchored to preserve freshness"
            );
        }
        // Even after reanchoring, drop packets whose total lag at their actual
        // playout moment would still exceed the live-lag budget.
        let predicted_lag_at_playout =
            source_lag.saturating_add(due_at.saturating_duration_since(now));
        if predicted_lag_at_playout > max_live_lag.saturating_add(intentional_future_wait_allowance)
        {
            match kind {
                UpstreamMediaKind::Camera => {
                    state.stale_video_drops = state.stale_video_drops.saturating_add(1);
                    state.video_freezes = state.video_freezes.saturating_add(1);
                    state.last_reason =
                        "dropped video that would exceed max live lag at playout".to_string();
                }
                UpstreamMediaKind::Microphone => {
                    state.stale_audio_drops = state.stale_audio_drops.saturating_add(1);
                    state.last_reason =
                        "dropped audio that would exceed max live lag at playout".to_string();
                }
            }
            state.phase = UpstreamSyncPhase::Healing;
            return UpstreamPlanDecision::DropStale("packet would exceed max live lag at playout");
        }
        if upstream_timing_trace_enabled()
            && (packet_count <= 10 || packet_count.is_multiple_of(300))
        {
            let playout_delay_us = due_at.saturating_duration_since(now).as_micros();
            let late_by_us = late_by.as_micros();
            info!(
                session_id,
                ?kind,
                packet_count,
                remote_pts_us,
                session_base_remote_pts_us,
                first_remote_for_kind,
                remote_elapsed_us = remote_pts_us.saturating_sub(session_base_remote_pts_us),
                local_pts_us,
                playout_delay_us,
                sink_offset_us,
                late_by_us,
                source_lag_us = source_lag.as_micros(),
                "upstream media rebase sample"
            );
        }
        if kind == UpstreamMediaKind::Microphone {
            // Audio advanced: wake video followers waiting on the master.
            self.audio_progress_notify.notify_waiters();
        }
        UpstreamPlanDecision::Play(PlannedUpstreamPacket {
            local_pts_us,
            due_at,
            late_by,
            source_lag,
        })
    }
}

/// Track the newest remote PTS ever seen for one media kind.
fn update_latest_remote_pts(
    state: &mut UpstreamClockState,
    kind: UpstreamMediaKind,
    remote_pts_us: u64,
) {
    let slot = match kind {
        UpstreamMediaKind::Camera => &mut state.latest_camera_remote_pts_us,
        UpstreamMediaKind::Microphone => &mut state.latest_microphone_remote_pts_us,
    };
    // Monotonic max: never moves backwards even if packets arrive out of order.
    *slot = Some((*slot).unwrap_or(remote_pts_us).max(remote_pts_us));
}

/// How far `remote_pts_us` trails the newest remote PTS seen for `kind`.
fn source_lag_for_kind(
    state: &UpstreamClockState,
    kind: UpstreamMediaKind,
    remote_pts_us: u64,
) -> Duration {
    let latest = match kind {
        UpstreamMediaKind::Camera => state.latest_camera_remote_pts_us,
        UpstreamMediaKind::Microphone => state.latest_microphone_remote_pts_us,
    }
    .unwrap_or(remote_pts_us);
    Duration::from_micros(latest.saturating_sub(remote_pts_us))
}

/// True when a video frame trails the audio playhead beyond the allowed slack.
fn video_is_too_far_behind_audio(
    video_pts_us: u64,
    audio_pts_us: u64,
audio_ahead_video_allowance_us: u64, ) -> bool { let slack_us = (upstream_pairing_master_slack() .as_micros() .min(u64::MAX as u128) as u64) .saturating_add(audio_ahead_video_allowance_us); video_pts_us.saturating_add(slack_us) < audio_pts_us } fn live_lag_us(state: &UpstreamClockState) -> Option { let latest_audio = state.latest_microphone_remote_pts_us?; let audio_playhead = state.last_audio_presented_pts_us?; let base = state.session_base_remote_pts_us?; Some(latest_audio.saturating_sub(base.saturating_add(audio_playhead))) } fn us_to_ms(value: u64) -> f64 { value as f64 / 1000.0 } fn refresh_unpaired_pairing_anchor( state: &mut UpstreamClockState, kind: UpstreamMediaKind, remote_pts_us: u64, next_deadline: Instant, ) -> bool { state.pairing_anchor_deadline = Some(next_deadline); match kind { UpstreamMediaKind::Camera if state.first_microphone_remote_pts_us.is_none() => { state.first_camera_remote_pts_us = Some(remote_pts_us); true } UpstreamMediaKind::Microphone if state.first_camera_remote_pts_us.is_none() => { state.first_microphone_remote_pts_us = Some(remote_pts_us); true } _ => false, } } impl Default for UpstreamMediaRuntime { fn default() -> Self { Self::new() } } #[cfg(test)] mod tests;