#![forbid(unsafe_code)] use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio::time::Instant; use tracing::info; /// Logical upstream media kinds that share one live-call session timeline. #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub enum UpstreamMediaKind { /// Webcam uplink frames destined for the UVC/HDMI sink path. Camera, /// Microphone uplink packets destined for the UAC sink path. Microphone, } /// Lease returned when one upstream media stream becomes the active owner. #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub struct UpstreamStreamLease { /// Shared session id for the current upstream live-call window. pub session_id: u64, /// Per-kind generation used to supersede older streams of the same kind. pub generation: u64, } /// One rebased upstream packet plus its planned server playout time. #[derive(Clone, Copy, Debug)] pub struct PlannedUpstreamPacket { /// Session-local packet timestamp after rebase onto the shared server clock. pub local_pts_us: u64, /// Wall-clock deadline when the server should present this packet. pub due_at: Instant, /// How late the packet already is when planned, if any. 
pub late_by: Duration, } #[derive(Debug, Default)] struct UpstreamClockState { session_id: u64, active_camera_generation: Option, active_microphone_generation: Option, camera_base_remote_pts_us: Option, microphone_base_remote_pts_us: Option, last_video_local_pts_us: Option, last_audio_local_pts_us: Option, camera_packet_count: u64, microphone_packet_count: u64, startup_anchor_logged: bool, playout_epoch: Option, } fn upstream_timing_trace_enabled() -> bool { std::env::var("LESAVKA_UPSTREAM_TIMING_TRACE") .ok() .map(|value| { let trimmed = value.trim(); !(trimmed.eq_ignore_ascii_case("0") || trimmed.eq_ignore_ascii_case("false") || trimmed.eq_ignore_ascii_case("no") || trimmed.eq_ignore_ascii_case("off")) }) .unwrap_or(false) } fn upstream_playout_delay() -> Duration { let delay_ms = std::env::var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS") .ok() .and_then(|value| value.trim().parse::().ok()) .unwrap_or(1_000); Duration::from_millis(delay_ms) } fn upstream_playout_offset_us(kind: UpstreamMediaKind) -> i64 { let name = match kind { UpstreamMediaKind::Camera => "LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US", UpstreamMediaKind::Microphone => "LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US", }; std::env::var(name) .ok() .and_then(|value| value.trim().parse::().ok()) .unwrap_or(0) } fn apply_playout_offset(base: Instant, offset_us: i64) -> Instant { if offset_us >= 0 { base + Duration::from_micros(offset_us as u64) } else { base.checked_sub(Duration::from_micros(offset_us.unsigned_abs())) .unwrap_or(base) } } /// Coordinate upstream stream ownership and keep audio/video on one timeline. /// /// Inputs: stream-open/close events plus remote packet timestamps. /// Outputs: active-stream leases and rebased local PTS values. /// Why: live calls need one current webcam owner, one current microphone owner, /// and one shared media clock so reconnects do not leave old sinks alive or let /// audio/video drift onto separate timing islands. 
#[derive(Debug)]
pub struct UpstreamMediaRuntime {
    // Monotonic id source for shared upstream sessions.
    next_session_id: AtomicU64,
    // Monotonic generation sources; a larger generation supersedes a smaller.
    next_camera_generation: AtomicU64,
    next_microphone_generation: AtomicU64,
    // Single-permit gate: ALSA allows only one live UAC playback owner.
    microphone_sink_gate: Arc<Semaphore>,
    // Shared clock/ownership state for the current session.
    state: Mutex<UpstreamClockState>,
}

impl Default for UpstreamMediaRuntime {
    fn default() -> Self {
        Self::new()
    }
}

impl UpstreamMediaRuntime {
    /// Build an empty upstream runtime.
    #[must_use]
    pub fn new() -> Self {
        Self {
            next_session_id: AtomicU64::new(0),
            next_camera_generation: AtomicU64::new(0),
            next_microphone_generation: AtomicU64::new(0),
            microphone_sink_gate: Arc::new(Semaphore::new(1)),
            state: Mutex::new(UpstreamClockState::default()),
        }
    }

    /// Activate a camera stream as the current owner for the session.
    #[must_use]
    pub fn activate_camera(&self) -> UpstreamStreamLease {
        self.activate(UpstreamMediaKind::Camera)
    }

    /// Activate a microphone stream as the current owner for the session.
    #[must_use]
    pub fn activate_microphone(&self) -> UpstreamStreamLease {
        self.activate(UpstreamMediaKind::Microphone)
    }

    /// Reserve the single live microphone sink slot for one generation.
    ///
    /// Inputs: the microphone lease generation that wants to own the UAC sink.
    /// Outputs: an owned semaphore permit while that generation still owns the
    /// microphone slot, or `None` if a newer stream superseded it before the
    /// previous sink fully stood down.
    /// Why: ALSA only allows one live owner of the UAC playback device, so a
    /// replacement stream must wait for the old owner to release the sink
    /// before opening a new playback pipeline.
    pub async fn reserve_microphone_sink(&self, generation: u64) -> Option<OwnedSemaphorePermit> {
        let permit = self
            .microphone_sink_gate
            .clone()
            .acquire_owned()
            .await
            .ok()?;
        // A newer stream may have superseded this generation while we waited
        // for the previous owner to release the sink; drop the permit then.
        self.is_microphone_active(generation).then_some(permit)
    }

    /// Allocate a fresh generation for `kind` and record it as the active owner.
    fn activate(&self, kind: UpstreamMediaKind) -> UpstreamStreamLease {
        // fetch_add returns the pre-increment value, so +1 yields the new
        // generation and keeps 0 reserved as "never issued".
        let generation = match kind {
            UpstreamMediaKind::Camera => {
                self.next_camera_generation.fetch_add(1, Ordering::SeqCst) + 1
            }
            UpstreamMediaKind::Microphone => {
                self.next_microphone_generation
                    .fetch_add(1, Ordering::SeqCst)
                    + 1
            }
        };
        let mut state = self
            .state
            .lock()
            .expect("upstream media state mutex poisoned");
        // First activation after full idle starts a fresh shared session:
        // new session id and a clean clock so old anchors cannot leak in.
        if state.active_camera_generation.is_none()
            && state.active_microphone_generation.is_none()
        {
            state.session_id = self.next_session_id.fetch_add(1, Ordering::SeqCst) + 1;
            state.camera_base_remote_pts_us = None;
            state.microphone_base_remote_pts_us = None;
            state.last_video_local_pts_us = None;
            state.last_audio_local_pts_us = None;
            state.camera_packet_count = 0;
            state.microphone_packet_count = 0;
            state.startup_anchor_logged = false;
            state.playout_epoch = None;
        }
        match kind {
            UpstreamMediaKind::Camera => state.active_camera_generation = Some(generation),
            UpstreamMediaKind::Microphone => state.active_microphone_generation = Some(generation),
        }
        UpstreamStreamLease {
            session_id: state.session_id,
            generation,
        }
    }

    /// Return whether the supplied camera lease is still the active owner.
    #[must_use]
    pub fn is_camera_active(&self, generation: u64) -> bool {
        self.is_active(UpstreamMediaKind::Camera, generation)
    }

    /// Return whether the supplied microphone lease is still the active owner.
#[must_use] pub fn is_microphone_active(&self, generation: u64) -> bool { self.is_active(UpstreamMediaKind::Microphone, generation) } fn is_active(&self, kind: UpstreamMediaKind, generation: u64) -> bool { let state = self .state .lock() .expect("upstream media state mutex poisoned"); match kind { UpstreamMediaKind::Camera => state.active_camera_generation == Some(generation), UpstreamMediaKind::Microphone => state.active_microphone_generation == Some(generation), } } /// Mark a camera stream as closed if it still owns the camera slot. pub fn close_camera(&self, generation: u64) { self.close(UpstreamMediaKind::Camera, generation); } /// Mark a microphone stream as closed if it still owns the microphone slot. pub fn close_microphone(&self, generation: u64) { self.close(UpstreamMediaKind::Microphone, generation); } fn close(&self, kind: UpstreamMediaKind, generation: u64) { let mut state = self .state .lock() .expect("upstream media state mutex poisoned"); match kind { UpstreamMediaKind::Camera if state.active_camera_generation == Some(generation) => { state.active_camera_generation = None; } UpstreamMediaKind::Microphone if state.active_microphone_generation == Some(generation) => { state.active_microphone_generation = None; } _ => return, } if state.active_camera_generation.is_none() && state.active_microphone_generation.is_none() { state.camera_base_remote_pts_us = None; state.microphone_base_remote_pts_us = None; state.last_video_local_pts_us = None; state.last_audio_local_pts_us = None; state.camera_packet_count = 0; state.microphone_packet_count = 0; state.startup_anchor_logged = false; state.playout_epoch = None; } } /// Rebase one upstream video packet timestamp onto the shared session clock. #[must_use] pub fn map_video_pts(&self, remote_pts_us: u64, frame_step_us: u64) -> u64 { self.plan_video_pts(remote_pts_us, frame_step_us.max(1)) .local_pts_us } /// Rebase one upstream audio packet timestamp onto the shared session clock. 
    #[must_use]
    pub fn map_audio_pts(&self, remote_pts_us: u64) -> u64 {
        self.plan_audio_pts(remote_pts_us).local_pts_us
    }

    /// Rebase and schedule one upstream video packet on the shared playout epoch.
    #[must_use]
    pub fn plan_video_pts(&self, remote_pts_us: u64, frame_step_us: u64) -> PlannedUpstreamPacket {
        self.plan_pts(
            UpstreamMediaKind::Camera,
            remote_pts_us,
            // Video steps by at least one frame duration when clamping.
            frame_step_us.max(1),
        )
    }

    /// Rebase and schedule one upstream audio packet on the shared playout epoch.
    #[must_use]
    pub fn plan_audio_pts(&self, remote_pts_us: u64) -> PlannedUpstreamPacket {
        // Audio uses a 1 us minimum step: just enough to stay monotonic.
        self.plan_pts(UpstreamMediaKind::Microphone, remote_pts_us, 1)
    }

    /// Rebase one packet of `kind` onto the session clock and schedule it.
    ///
    /// Inputs: the raw remote PTS and the minimum local step (in us) used to
    /// keep per-kind timestamps strictly monotonic.
    /// Outputs: the rebased local PTS, its wall-clock due time, and how late
    /// the packet already is relative to that due time.
    fn plan_pts(
        &self,
        kind: UpstreamMediaKind,
        remote_pts_us: u64,
        min_step_us: u64,
    ) -> PlannedUpstreamPacket {
        let mut state = self
            .state
            .lock()
            .expect("upstream media state mutex poisoned");
        let session_id = state.session_id;
        // Count this packet; the count also drives the trace-sample cadence.
        let packet_count = match kind {
            UpstreamMediaKind::Camera => {
                state.camera_packet_count = state.camera_packet_count.saturating_add(1);
                state.camera_packet_count
            }
            UpstreamMediaKind::Microphone => {
                state.microphone_packet_count = state.microphone_packet_count.saturating_add(1);
                state.microphone_packet_count
            }
        };
        // The first remote PTS seen for this kind becomes its rebase anchor.
        let base_slot = match kind {
            UpstreamMediaKind::Camera => &mut state.camera_base_remote_pts_us,
            UpstreamMediaKind::Microphone => &mut state.microphone_base_remote_pts_us,
        };
        let first_remote_for_kind = *base_slot.get_or_insert(remote_pts_us);
        // Once both kinds have anchors, log their delta exactly once per session.
        if !state.startup_anchor_logged
            && state.camera_base_remote_pts_us.is_some()
            && state.microphone_base_remote_pts_us.is_some()
        {
            let camera_base_remote_pts_us = state.camera_base_remote_pts_us.unwrap_or_default();
            let microphone_base_remote_pts_us =
                state.microphone_base_remote_pts_us.unwrap_or_default();
            // i128 keeps the subtraction exact for any pair of u64 anchors.
            let startup_delta_us =
                camera_base_remote_pts_us as i128 - microphone_base_remote_pts_us as i128;
            info!(
                session_id,
                camera_base_remote_pts_us,
                microphone_base_remote_pts_us,
                startup_delta_us,
                "upstream media session anchors observed"
            );
            state.startup_anchor_logged = true;
        }
        // Rebase onto the session-local timeline (anchor maps to 0).
        let mut local_pts_us = remote_pts_us.saturating_sub(first_remote_for_kind);
        let last_slot = match kind {
            UpstreamMediaKind::Camera => &mut state.last_video_local_pts_us,
            UpstreamMediaKind::Microphone => &mut state.last_audio_local_pts_us,
        };
        // Enforce strict monotonicity: never re-emit or go backwards; bump by
        // at least `min_step_us` past the previous local PTS.
        if let Some(last_pts_us) = *last_slot
            && local_pts_us <= last_pts_us
        {
            local_pts_us = last_pts_us.saturating_add(min_step_us.max(1));
        }
        *last_slot = Some(local_pts_us);
        let now = Instant::now();
        // The shared playout epoch is fixed lazily on the first planned packet:
        // now plus the configured startup delay.
        let epoch = *state
            .playout_epoch
            .get_or_insert_with(|| now + upstream_playout_delay());
        let sink_offset_us = upstream_playout_offset_us(kind);
        // Due time = epoch + local PTS, shifted by the per-kind sink offset.
        let due_at =
            apply_playout_offset(epoch + Duration::from_micros(local_pts_us), sink_offset_us);
        // Zero when the due time is still in the future.
        let late_by = now.checked_duration_since(due_at).unwrap_or_default();
        // Trace the first 10 packets of each kind, then every 300th.
        if upstream_timing_trace_enabled()
            && (packet_count <= 10 || packet_count.is_multiple_of(300))
        {
            let remote_elapsed_us = remote_pts_us.saturating_sub(first_remote_for_kind);
            let playout_delay_us = epoch.saturating_duration_since(now).as_micros();
            let late_by_us = late_by.as_micros();
            info!(
                session_id,
                ?kind,
                packet_count,
                remote_pts_us,
                first_remote_for_kind,
                remote_elapsed_us,
                local_pts_us,
                playout_delay_us,
                sink_offset_us,
                late_by_us,
                "upstream media rebase sample"
            );
        }
        PlannedUpstreamPacket {
            local_pts_us,
            due_at,
            late_by,
        }
    }
}

#[cfg(test)]
mod tests;