#![forbid(unsafe_code)] use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore}; use tokio::time::Instant; use tracing::info; mod config; mod state; mod types; use config::{ apply_playout_offset, upstream_camera_startup_grace_us, upstream_pairing_master_slack, upstream_playout_delay, upstream_playout_offset_us, upstream_reanchor_late_threshold, upstream_reanchor_window_us, upstream_require_paired_startup, upstream_timing_trace_enabled, }; use state::UpstreamClockState; pub use types::{ PlannedUpstreamPacket, UpstreamMediaKind, UpstreamPlanDecision, UpstreamStreamLease, }; /// Coordinate upstream stream ownership and keep audio/video on one timeline. /// /// Inputs: stream-open/close events plus remote packet timestamps. /// Outputs: active-stream leases and rebased local PTS values. /// Why: live calls need one current webcam owner, one current microphone owner, /// and one shared media clock so reconnects do not leave old sinks alive or let /// audio/video drift onto separate timing islands. #[derive(Debug)] pub struct UpstreamMediaRuntime { next_session_id: AtomicU64, next_camera_generation: AtomicU64, next_microphone_generation: AtomicU64, microphone_sink_gate: Arc, pairing_state_notify: Arc, audio_progress_notify: Arc, camera_playout_offset_us: AtomicI64, microphone_playout_offset_us: AtomicI64, state: Mutex, } impl UpstreamMediaRuntime { /// Build an empty upstream runtime. 
#[must_use] pub fn new() -> Self { Self { next_session_id: AtomicU64::new(0), next_camera_generation: AtomicU64::new(0), next_microphone_generation: AtomicU64::new(0), microphone_sink_gate: Arc::new(Semaphore::new(1)), pairing_state_notify: Arc::new(Notify::new()), audio_progress_notify: Arc::new(Notify::new()), camera_playout_offset_us: AtomicI64::new(upstream_playout_offset_us( UpstreamMediaKind::Camera, )), microphone_playout_offset_us: AtomicI64::new(upstream_playout_offset_us( UpstreamMediaKind::Microphone, )), state: Mutex::new(UpstreamClockState::default()), } } /// Apply live upstream playout offsets without restarting the relay. pub fn set_playout_offsets(&self, camera_offset_us: i64, microphone_offset_us: i64) { self.camera_playout_offset_us .store(camera_offset_us, Ordering::Relaxed); self.microphone_playout_offset_us .store(microphone_offset_us, Ordering::Relaxed); } /// Return `(camera_offset_us, microphone_offset_us)` currently used for live playout. #[must_use] pub fn playout_offsets(&self) -> (i64, i64) { ( self.camera_playout_offset_us.load(Ordering::Relaxed), self.microphone_playout_offset_us.load(Ordering::Relaxed), ) } fn playout_offset_us(&self, kind: UpstreamMediaKind) -> i64 { match kind { UpstreamMediaKind::Camera => self.camera_playout_offset_us.load(Ordering::Relaxed), UpstreamMediaKind::Microphone => { self.microphone_playout_offset_us.load(Ordering::Relaxed) } } } } include!("upstream_media_runtime/lease_lifecycle.rs"); impl UpstreamMediaRuntime { /// Rebase one upstream video packet timestamp onto the shared session clock. #[must_use] pub fn map_video_pts(&self, remote_pts_us: u64, frame_step_us: u64) -> Option { match self.plan_video_pts(remote_pts_us, frame_step_us.max(1)) { UpstreamPlanDecision::Play(plan) => Some(plan.local_pts_us), _ => None, } } /// Rebase one upstream audio packet timestamp onto the shared session clock. 
#[must_use] pub fn map_audio_pts(&self, remote_pts_us: u64) -> Option { match self.plan_audio_pts(remote_pts_us) { UpstreamPlanDecision::Play(plan) => Some(plan.local_pts_us), _ => None, } } /// Rebase and schedule one upstream video packet on the shared playout epoch. #[must_use] pub fn plan_video_pts(&self, remote_pts_us: u64, frame_step_us: u64) -> UpstreamPlanDecision { self.plan_pts( UpstreamMediaKind::Camera, remote_pts_us, frame_step_us.max(1), ) } /// Rebase and schedule one upstream audio packet on the shared playout epoch. #[must_use] pub fn plan_audio_pts(&self, remote_pts_us: u64) -> UpstreamPlanDecision { self.plan_pts(UpstreamMediaKind::Microphone, remote_pts_us, 1) } /// Hold video until the audio master has at least reached the same capture /// moment, or give up once the frame can no longer be shown fresh. pub async fn wait_for_audio_master(&self, video_local_pts_us: u64, due_at: Instant) -> bool { let slack_us = upstream_pairing_master_slack() .as_micros() .min(u64::MAX as u128) as u64; loop { let notified = self.audio_progress_notify.notified(); { let state = self .state .lock() .expect("upstream media state mutex poisoned"); if state.active_microphone_generation.is_none() { return true; } if state.last_audio_local_pts_us.is_some_and(|audio_pts_us| { audio_pts_us.saturating_add(slack_us) >= video_local_pts_us }) { return true; } } if Instant::now() >= due_at { return false; } tokio::select! 
{ _ = notified => {} _ = tokio::time::sleep_until(due_at) => return false, } } } fn plan_pts( &self, kind: UpstreamMediaKind, remote_pts_us: u64, min_step_us: u64, ) -> UpstreamPlanDecision { let mut state = self .state .lock() .expect("upstream media state mutex poisoned"); let session_id = state.session_id; let packet_count = match kind { UpstreamMediaKind::Camera => { state.camera_packet_count = state.camera_packet_count.saturating_add(1); state.camera_packet_count } UpstreamMediaKind::Microphone => { state.microphone_packet_count = state.microphone_packet_count.saturating_add(1); state.microphone_packet_count } }; let mut first_remote_for_kind = match kind { UpstreamMediaKind::Camera => { let first_slot = &mut state.first_camera_remote_pts_us; *first_slot.get_or_insert(remote_pts_us) } UpstreamMediaKind::Microphone => { let first_slot = &mut state.first_microphone_remote_pts_us; *first_slot.get_or_insert(remote_pts_us) } }; if kind == UpstreamMediaKind::Camera { let startup_grace_us = upstream_camera_startup_grace_us(); if !state.camera_startup_ready && (startup_grace_us == 0 || remote_pts_us.saturating_sub(first_remote_for_kind) >= startup_grace_us) { state.camera_startup_ready = true; state.first_camera_remote_pts_us = Some(remote_pts_us); first_remote_for_kind = remote_pts_us; } } let now = Instant::now(); let pairing_deadline = *state .pairing_anchor_deadline .get_or_insert_with(|| now + upstream_playout_delay()); let playout_delay = upstream_playout_delay(); if state.session_base_remote_pts_us.is_none() { if state.first_camera_remote_pts_us.is_some() && state.first_microphone_remote_pts_us.is_some() && state.camera_startup_ready { let first_camera_remote_pts_us = state.first_camera_remote_pts_us.unwrap_or_default(); let first_microphone_remote_pts_us = state.first_microphone_remote_pts_us.unwrap_or_default(); state.session_base_remote_pts_us = Some(first_camera_remote_pts_us.max(first_microphone_remote_pts_us)); let overlap_epoch = now + playout_delay; 
state.playout_epoch = Some(overlap_epoch); state.pairing_anchor_deadline = Some(overlap_epoch); if !state.startup_anchor_logged { let startup_delta_us = first_camera_remote_pts_us as i128 - first_microphone_remote_pts_us as i128; info!( session_id, first_camera_remote_pts_us, first_microphone_remote_pts_us, overlap_base_remote_pts_us = state.session_base_remote_pts_us.unwrap_or_default(), startup_delta_us, "upstream media overlap anchors established" ); state.startup_anchor_logged = true; } self.pairing_state_notify.notify_waiters(); } else if now < pairing_deadline { if upstream_timing_trace_enabled() && (packet_count <= 10 || packet_count.is_multiple_of(300)) { info!( session_id, ?kind, packet_count, remote_pts_us, wait_ms = pairing_deadline.saturating_duration_since(now).as_millis(), "upstream media packet buffered while awaiting the counterpart stream" ); } return UpstreamPlanDecision::AwaitingPair; } else if state.first_camera_remote_pts_us.is_some() && !state.camera_startup_ready { if upstream_timing_trace_enabled() && (packet_count <= 10 || packet_count.is_multiple_of(300)) { info!( session_id, ?kind, packet_count, remote_pts_us, "upstream media packet buffered while camera startup warm-up is still in progress" ); } return UpstreamPlanDecision::AwaitingPair; } else if upstream_require_paired_startup() { let refreshed = refresh_unpaired_pairing_anchor( &mut state, kind, remote_pts_us, now + playout_delay, ); if refreshed || upstream_timing_trace_enabled() { info!( session_id, ?kind, packet_count, remote_pts_us, refreshed_anchor = refreshed, healing_window_ms = playout_delay.as_millis(), "upstream media pairing window expired; holding one-sided stream for synced startup" ); } return UpstreamPlanDecision::AwaitingPair; } else { let single_stream_base_remote_pts_us = match kind { UpstreamMediaKind::Camera => { state.first_camera_remote_pts_us.unwrap_or(remote_pts_us) } UpstreamMediaKind::Microphone => state .first_microphone_remote_pts_us 
.unwrap_or(remote_pts_us), }; state.session_base_remote_pts_us = Some(single_stream_base_remote_pts_us); let one_sided_epoch = now + playout_delay; state.playout_epoch = Some(one_sided_epoch); state.pairing_anchor_deadline = Some(one_sided_epoch); info!( session_id, ?kind, single_stream_base_remote_pts_us, "upstream media pairing window expired; continuing with one-sided playout" ); self.pairing_state_notify.notify_waiters(); } } let session_base_remote_pts_us = state.session_base_remote_pts_us.unwrap_or(remote_pts_us); if remote_pts_us < session_base_remote_pts_us { if upstream_timing_trace_enabled() && (packet_count <= 10 || packet_count.is_multiple_of(300)) { info!( session_id, ?kind, packet_count, remote_pts_us, session_base_remote_pts_us, "upstream media packet dropped before the shared overlap base" ); } return UpstreamPlanDecision::DropBeforeOverlap; } let mut local_pts_us = remote_pts_us.saturating_sub(session_base_remote_pts_us); let last_slot = match kind { UpstreamMediaKind::Camera => &mut state.last_video_local_pts_us, UpstreamMediaKind::Microphone => &mut state.last_audio_local_pts_us, }; if let Some(last_pts_us) = *last_slot && local_pts_us <= last_pts_us { local_pts_us = last_pts_us.saturating_add(min_step_us.max(1)); } *last_slot = Some(local_pts_us); let epoch = *state.playout_epoch.get_or_insert(pairing_deadline); let sink_offset_us = self.playout_offset_us(kind); let playout_delay = upstream_playout_delay(); let mut due_at = apply_playout_offset(epoch + Duration::from_micros(local_pts_us), sink_offset_us); let mut late_by = now.checked_duration_since(due_at).unwrap_or_default(); let reanchor_threshold = upstream_reanchor_late_threshold(playout_delay); let reanchor_window_us = upstream_reanchor_window_us(playout_delay); if !state.catastrophic_reanchor_done && local_pts_us <= reanchor_window_us && late_by > reanchor_threshold { let old_late_by = late_by; let desired_due_at = now + playout_delay; let unoffset_due_at = 
apply_playout_offset(desired_due_at, -sink_offset_us); let recovered_epoch = unoffset_due_at .checked_sub(Duration::from_micros(local_pts_us)) .unwrap_or(unoffset_due_at); state.playout_epoch = Some(recovered_epoch); state.pairing_anchor_deadline = Some(desired_due_at); state.catastrophic_reanchor_done = true; due_at = apply_playout_offset( recovered_epoch + Duration::from_micros(local_pts_us), sink_offset_us, ); late_by = now.checked_duration_since(due_at).unwrap_or_default(); info!( session_id, ?kind, packet_count, local_pts_us, remote_pts_us, old_late_by_ms = old_late_by.as_millis(), recovery_buffer_ms = playout_delay.as_millis(), reanchor_threshold_ms = reanchor_threshold.as_millis(), "upstream media playout epoch reanchored after catastrophic lateness" ); } if upstream_timing_trace_enabled() && (packet_count <= 10 || packet_count.is_multiple_of(300)) { let playout_delay_us = due_at.saturating_duration_since(now).as_micros(); let late_by_us = late_by.as_micros(); info!( session_id, ?kind, packet_count, remote_pts_us, session_base_remote_pts_us, first_remote_for_kind, remote_elapsed_us = remote_pts_us.saturating_sub(session_base_remote_pts_us), local_pts_us, playout_delay_us, sink_offset_us, late_by_us, "upstream media rebase sample" ); } if kind == UpstreamMediaKind::Microphone { self.audio_progress_notify.notify_waiters(); } UpstreamPlanDecision::Play(PlannedUpstreamPacket { local_pts_us, due_at, late_by, }) } } fn refresh_unpaired_pairing_anchor( state: &mut UpstreamClockState, kind: UpstreamMediaKind, remote_pts_us: u64, next_deadline: Instant, ) -> bool { state.pairing_anchor_deadline = Some(next_deadline); match kind { UpstreamMediaKind::Camera if state.first_microphone_remote_pts_us.is_none() => { state.first_camera_remote_pts_us = Some(remote_pts_us); true } UpstreamMediaKind::Microphone if state.first_camera_remote_pts_us.is_none() => { state.first_microphone_remote_pts_us = Some(remote_pts_us); true } _ => false, } } impl Default for 
UpstreamMediaRuntime {
    /// Delegate to `UpstreamMediaRuntime::new` so `Default` and `new` stay in sync.
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests;