#![forbid(unsafe_code)] use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore}; use tokio::time::Instant; use tracing::info; mod config; mod state; use config::{ apply_playout_offset, upstream_pairing_master_slack, upstream_playout_delay, upstream_playout_offset_us, upstream_reanchor_late_threshold, upstream_timing_trace_enabled, }; use state::UpstreamClockState; /// Logical upstream media kinds that share one live-call session timeline. #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub enum UpstreamMediaKind { /// Webcam uplink frames destined for the UVC/HDMI sink path. Camera, /// Microphone uplink packets destined for the UAC sink path. Microphone, } /// Lease returned when one upstream media stream becomes the active owner. #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub struct UpstreamStreamLease { /// Shared session id for the current upstream live-call window. pub session_id: u64, /// Per-kind generation used to supersede older streams of the same kind. pub generation: u64, } /// One rebased upstream packet plus its planned server playout time. #[derive(Clone, Copy, Debug)] pub struct PlannedUpstreamPacket { /// Session-local packet timestamp after rebase onto the shared server clock. pub local_pts_us: u64, /// Wall-clock deadline when the server should present this packet. pub due_at: Instant, /// How late the packet already is when planned, if any. pub late_by: Duration, } /// Result of asking the shared upstream runtime how to handle one packet. #[derive(Clone, Copy, Debug)] pub enum UpstreamPlanDecision { /// Hold the packet inside the local stream queue until the pairing window /// has enough cross-stream context to assign a trustworthy playout time. AwaitingPair, /// Discard the packet because it belongs before the shared overlapping A/V /// session base and would only reintroduce startup skew. DropBeforeOverlap, /// Present the packet at the planned wall-clock deadline. Play(PlannedUpstreamPacket), } /// Coordinate upstream stream ownership and keep audio/video on one timeline. /// /// Inputs: stream-open/close events plus remote packet timestamps. /// Outputs: active-stream leases and rebased local PTS values. /// Why: live calls need one current webcam owner, one current microphone owner, /// and one shared media clock so reconnects do not leave old sinks alive or let /// audio/video drift onto separate timing islands. #[derive(Debug)] pub struct UpstreamMediaRuntime { next_session_id: AtomicU64, next_camera_generation: AtomicU64, next_microphone_generation: AtomicU64, microphone_sink_gate: Arc, pairing_state_notify: Arc, audio_progress_notify: Arc, state: Mutex, } impl UpstreamMediaRuntime { /// Build an empty upstream runtime. #[must_use] pub fn new() -> Self { Self { next_session_id: AtomicU64::new(0), next_camera_generation: AtomicU64::new(0), next_microphone_generation: AtomicU64::new(0), microphone_sink_gate: Arc::new(Semaphore::new(1)), pairing_state_notify: Arc::new(Notify::new()), audio_progress_notify: Arc::new(Notify::new()), state: Mutex::new(UpstreamClockState::default()), } } /// Activate a camera stream as the current owner for the session. #[must_use] pub fn activate_camera(&self) -> UpstreamStreamLease { self.activate(UpstreamMediaKind::Camera) } /// Activate a microphone stream as the current owner for the session. #[must_use] pub fn activate_microphone(&self) -> UpstreamStreamLease { self.activate(UpstreamMediaKind::Microphone) } /// Reserve the single live microphone sink slot for one generation. /// /// Inputs: the microphone lease generation that wants to own the UAC sink. /// Outputs: an owned semaphore permit while that generation still owns the /// microphone slot, or `None` if a newer stream superseded it before the /// previous sink fully stood down. /// Why: ALSA only allows one live owner of the UAC playback device, so a /// replacement stream must wait for the old owner to release the sink /// before opening a new playback pipeline. pub async fn reserve_microphone_sink(&self, generation: u64) -> Option { let permit = self .microphone_sink_gate .clone() .acquire_owned() .await .ok()?; self.is_microphone_active(generation).then_some(permit) } fn activate(&self, kind: UpstreamMediaKind) -> UpstreamStreamLease { let generation = match kind { UpstreamMediaKind::Camera => { self.next_camera_generation.fetch_add(1, Ordering::SeqCst) + 1 } UpstreamMediaKind::Microphone => { self.next_microphone_generation .fetch_add(1, Ordering::SeqCst) + 1 } }; let mut state = self .state .lock() .expect("upstream media state mutex poisoned"); if state.active_camera_generation.is_none() && state.active_microphone_generation.is_none() { state.session_id = self.next_session_id.fetch_add(1, Ordering::SeqCst) + 1; state.first_camera_remote_pts_us = None; state.first_microphone_remote_pts_us = None; state.session_base_remote_pts_us = None; state.last_video_local_pts_us = None; state.last_audio_local_pts_us = None; state.camera_packet_count = 0; state.microphone_packet_count = 0; state.startup_anchor_logged = false; state.playout_epoch = None; state.pairing_anchor_deadline = None; state.catastrophic_reanchor_done = false; } match kind { UpstreamMediaKind::Camera => state.active_camera_generation = Some(generation), UpstreamMediaKind::Microphone => state.active_microphone_generation = Some(generation), } UpstreamStreamLease { session_id: state.session_id, generation, } } /// Return whether the supplied camera lease is still the active owner. #[must_use] pub fn is_camera_active(&self, generation: u64) -> bool { self.is_active(UpstreamMediaKind::Camera, generation) } /// Return whether the supplied microphone lease is still the active owner. #[must_use] pub fn is_microphone_active(&self, generation: u64) -> bool { self.is_active(UpstreamMediaKind::Microphone, generation) } fn is_active(&self, kind: UpstreamMediaKind, generation: u64) -> bool { let state = self .state .lock() .expect("upstream media state mutex poisoned"); match kind { UpstreamMediaKind::Camera => state.active_camera_generation == Some(generation), UpstreamMediaKind::Microphone => state.active_microphone_generation == Some(generation), } } /// Mark a camera stream as closed if it still owns the camera slot. pub fn close_camera(&self, generation: u64) { self.close(UpstreamMediaKind::Camera, generation); } /// Mark a microphone stream as closed if it still owns the microphone slot. pub fn close_microphone(&self, generation: u64) { self.close(UpstreamMediaKind::Microphone, generation); } fn close(&self, kind: UpstreamMediaKind, generation: u64) { let mut state = self .state .lock() .expect("upstream media state mutex poisoned"); match kind { UpstreamMediaKind::Camera if state.active_camera_generation == Some(generation) => { state.active_camera_generation = None; } UpstreamMediaKind::Microphone if state.active_microphone_generation == Some(generation) => { state.active_microphone_generation = None; } _ => return, } if state.active_camera_generation.is_none() && state.active_microphone_generation.is_none() { state.first_camera_remote_pts_us = None; state.first_microphone_remote_pts_us = None; state.session_base_remote_pts_us = None; state.last_video_local_pts_us = None; state.last_audio_local_pts_us = None; state.camera_packet_count = 0; state.microphone_packet_count = 0; state.startup_anchor_logged = false; state.playout_epoch = None; state.pairing_anchor_deadline = None; state.catastrophic_reanchor_done = false; } self.pairing_state_notify.notify_waiters(); self.audio_progress_notify.notify_waiters(); } /// Rebase one upstream video packet timestamp onto the shared session clock. #[must_use] pub fn map_video_pts(&self, remote_pts_us: u64, frame_step_us: u64) -> Option { match self.plan_video_pts(remote_pts_us, frame_step_us.max(1)) { UpstreamPlanDecision::Play(plan) => Some(plan.local_pts_us), _ => None, } } /// Rebase one upstream audio packet timestamp onto the shared session clock. #[must_use] pub fn map_audio_pts(&self, remote_pts_us: u64) -> Option { match self.plan_audio_pts(remote_pts_us) { UpstreamPlanDecision::Play(plan) => Some(plan.local_pts_us), _ => None, } } /// Rebase and schedule one upstream video packet on the shared playout epoch. #[must_use] pub fn plan_video_pts(&self, remote_pts_us: u64, frame_step_us: u64) -> UpstreamPlanDecision { self.plan_pts( UpstreamMediaKind::Camera, remote_pts_us, frame_step_us.max(1), ) } /// Rebase and schedule one upstream audio packet on the shared playout epoch. #[must_use] pub fn plan_audio_pts(&self, remote_pts_us: u64) -> UpstreamPlanDecision { self.plan_pts(UpstreamMediaKind::Microphone, remote_pts_us, 1) } /// Hold video until the audio master has at least reached the same capture /// moment, or give up once the frame can no longer be shown fresh. pub async fn wait_for_audio_master(&self, video_local_pts_us: u64, due_at: Instant) -> bool { let slack_us = upstream_pairing_master_slack() .as_micros() .min(u64::MAX as u128) as u64; loop { let notified = self.audio_progress_notify.notified(); { let state = self .state .lock() .expect("upstream media state mutex poisoned"); if state.active_microphone_generation.is_none() { return true; } if state.last_audio_local_pts_us.is_some_and(|audio_pts_us| { audio_pts_us.saturating_add(slack_us) >= video_local_pts_us }) { return true; } } if Instant::now() >= due_at { return false; } tokio::select! { _ = notified => {} _ = tokio::time::sleep_until(due_at) => return false, } } } fn plan_pts( &self, kind: UpstreamMediaKind, remote_pts_us: u64, min_step_us: u64, ) -> UpstreamPlanDecision { let mut state = self .state .lock() .expect("upstream media state mutex poisoned"); let session_id = state.session_id; let packet_count = match kind { UpstreamMediaKind::Camera => { state.camera_packet_count = state.camera_packet_count.saturating_add(1); state.camera_packet_count } UpstreamMediaKind::Microphone => { state.microphone_packet_count = state.microphone_packet_count.saturating_add(1); state.microphone_packet_count } }; let first_slot = match kind { UpstreamMediaKind::Camera => &mut state.first_camera_remote_pts_us, UpstreamMediaKind::Microphone => &mut state.first_microphone_remote_pts_us, }; let first_remote_for_kind = *first_slot.get_or_insert(remote_pts_us); let now = Instant::now(); let pairing_deadline = *state .pairing_anchor_deadline .get_or_insert_with(|| now + upstream_playout_delay()); if state.session_base_remote_pts_us.is_none() { if state.first_camera_remote_pts_us.is_some() && state.first_microphone_remote_pts_us.is_some() { let first_camera_remote_pts_us = state.first_camera_remote_pts_us.unwrap_or_default(); let first_microphone_remote_pts_us = state.first_microphone_remote_pts_us.unwrap_or_default(); state.session_base_remote_pts_us = Some(first_camera_remote_pts_us.max(first_microphone_remote_pts_us)); state.playout_epoch = Some(pairing_deadline); if !state.startup_anchor_logged { let startup_delta_us = first_camera_remote_pts_us as i128 - first_microphone_remote_pts_us as i128; info!( session_id, first_camera_remote_pts_us, first_microphone_remote_pts_us, overlap_base_remote_pts_us = state.session_base_remote_pts_us.unwrap_or_default(), startup_delta_us, "upstream media overlap anchors established" ); state.startup_anchor_logged = true; } self.pairing_state_notify.notify_waiters(); } else if now < pairing_deadline { if upstream_timing_trace_enabled() && (packet_count <= 10 || packet_count.is_multiple_of(300)) { info!( session_id, ?kind, packet_count, remote_pts_us, wait_ms = pairing_deadline.saturating_duration_since(now).as_millis(), "upstream media packet buffered while awaiting the counterpart stream" ); } return UpstreamPlanDecision::AwaitingPair; } else { let single_stream_base_remote_pts_us = match kind { UpstreamMediaKind::Camera => { state.first_camera_remote_pts_us.unwrap_or(remote_pts_us) } UpstreamMediaKind::Microphone => state .first_microphone_remote_pts_us .unwrap_or(remote_pts_us), }; state.session_base_remote_pts_us = Some(single_stream_base_remote_pts_us); state.playout_epoch = Some(pairing_deadline); info!( session_id, ?kind, single_stream_base_remote_pts_us, "upstream media pairing window expired; continuing with one-sided playout" ); self.pairing_state_notify.notify_waiters(); } } let session_base_remote_pts_us = state.session_base_remote_pts_us.unwrap_or(remote_pts_us); if remote_pts_us < session_base_remote_pts_us { if upstream_timing_trace_enabled() && (packet_count <= 10 || packet_count.is_multiple_of(300)) { info!( session_id, ?kind, packet_count, remote_pts_us, session_base_remote_pts_us, "upstream media packet dropped before the shared overlap base" ); } return UpstreamPlanDecision::DropBeforeOverlap; } let mut local_pts_us = remote_pts_us.saturating_sub(session_base_remote_pts_us); let last_slot = match kind { UpstreamMediaKind::Camera => &mut state.last_video_local_pts_us, UpstreamMediaKind::Microphone => &mut state.last_audio_local_pts_us, }; if let Some(last_pts_us) = *last_slot && local_pts_us <= last_pts_us { local_pts_us = last_pts_us.saturating_add(min_step_us.max(1)); } *last_slot = Some(local_pts_us); let epoch = *state.playout_epoch.get_or_insert(pairing_deadline); let sink_offset_us = upstream_playout_offset_us(kind); let playout_delay = upstream_playout_delay(); let mut due_at = apply_playout_offset(epoch + Duration::from_micros(local_pts_us), sink_offset_us); let mut late_by = now.checked_duration_since(due_at).unwrap_or_default(); let reanchor_threshold = upstream_reanchor_late_threshold(playout_delay); if !state.catastrophic_reanchor_done && late_by > reanchor_threshold { let old_late_by = late_by; let desired_due_at = now + playout_delay; let unoffset_due_at = apply_playout_offset(desired_due_at, -sink_offset_us); let recovered_epoch = unoffset_due_at .checked_sub(Duration::from_micros(local_pts_us)) .unwrap_or(unoffset_due_at); state.playout_epoch = Some(recovered_epoch); state.pairing_anchor_deadline = Some(desired_due_at); state.catastrophic_reanchor_done = true; due_at = apply_playout_offset( recovered_epoch + Duration::from_micros(local_pts_us), sink_offset_us, ); late_by = now.checked_duration_since(due_at).unwrap_or_default(); info!( session_id, ?kind, packet_count, local_pts_us, remote_pts_us, old_late_by_ms = old_late_by.as_millis(), recovery_buffer_ms = playout_delay.as_millis(), reanchor_threshold_ms = reanchor_threshold.as_millis(), "upstream media playout epoch reanchored after catastrophic lateness" ); } if upstream_timing_trace_enabled() && (packet_count <= 10 || packet_count.is_multiple_of(300)) { let playout_delay_us = due_at.saturating_duration_since(now).as_micros(); let late_by_us = late_by.as_micros(); info!( session_id, ?kind, packet_count, remote_pts_us, session_base_remote_pts_us, first_remote_for_kind, remote_elapsed_us = remote_pts_us.saturating_sub(session_base_remote_pts_us), local_pts_us, playout_delay_us, sink_offset_us, late_by_us, "upstream media rebase sample" ); } if kind == UpstreamMediaKind::Microphone { self.audio_progress_notify.notify_waiters(); } UpstreamPlanDecision::Play(PlannedUpstreamPacket { local_pts_us, due_at, late_by, }) } } #[cfg(test)] mod tests;