#![forbid(unsafe_code)] use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; /// Logical upstream media kinds that share one live-call session timeline. #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub enum UpstreamMediaKind { /// Webcam uplink frames destined for the UVC/HDMI sink path. Camera, /// Microphone uplink packets destined for the UAC sink path. Microphone, } /// Lease returned when one upstream media stream becomes the active owner. #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub struct UpstreamStreamLease { /// Shared session id for the current upstream live-call window. pub session_id: u64, /// Per-kind generation used to supersede older streams of the same kind. pub generation: u64, } #[derive(Debug, Default)] struct UpstreamClockState { session_id: u64, active_camera_generation: Option, active_microphone_generation: Option, base_remote_pts_us: Option, last_video_local_pts_us: Option, last_audio_local_pts_us: Option, } /// Coordinate upstream stream ownership and keep audio/video on one timeline. /// /// Inputs: stream-open/close events plus remote packet timestamps. /// Outputs: active-stream leases and rebased local PTS values. /// Why: live calls need one current webcam owner, one current microphone owner, /// and one shared media clock so reconnects do not leave old sinks alive or let /// audio/video drift onto separate timing islands. #[derive(Debug)] pub struct UpstreamMediaRuntime { next_session_id: AtomicU64, next_camera_generation: AtomicU64, next_microphone_generation: AtomicU64, microphone_sink_gate: Arc, state: Mutex, } impl UpstreamMediaRuntime { /// Build an empty upstream runtime. #[must_use] pub fn new() -> Self { Self { next_session_id: AtomicU64::new(0), next_camera_generation: AtomicU64::new(0), next_microphone_generation: AtomicU64::new(0), microphone_sink_gate: Arc::new(Semaphore::new(1)), state: Mutex::new(UpstreamClockState::default()), } } /// Activate a camera stream as the current owner for the session. #[must_use] pub fn activate_camera(&self) -> UpstreamStreamLease { self.activate(UpstreamMediaKind::Camera) } /// Activate a microphone stream as the current owner for the session. #[must_use] pub fn activate_microphone(&self) -> UpstreamStreamLease { self.activate(UpstreamMediaKind::Microphone) } /// Reserve the single live microphone sink slot for one generation. /// /// Inputs: the microphone lease generation that wants to own the UAC sink. /// Outputs: an owned semaphore permit while that generation still owns the /// microphone slot, or `None` if a newer stream superseded it before the /// previous sink fully stood down. /// Why: ALSA only allows one live owner of the UAC playback device, so a /// replacement stream must wait for the old owner to release the sink /// before opening a new playback pipeline. pub async fn reserve_microphone_sink(&self, generation: u64) -> Option { let permit = self .microphone_sink_gate .clone() .acquire_owned() .await .ok()?; self.is_microphone_active(generation).then_some(permit) } fn activate(&self, kind: UpstreamMediaKind) -> UpstreamStreamLease { let generation = match kind { UpstreamMediaKind::Camera => { self.next_camera_generation.fetch_add(1, Ordering::SeqCst) + 1 } UpstreamMediaKind::Microphone => { self.next_microphone_generation .fetch_add(1, Ordering::SeqCst) + 1 } }; let mut state = self .state .lock() .expect("upstream media state mutex poisoned"); if state.active_camera_generation.is_none() && state.active_microphone_generation.is_none() { state.session_id = self.next_session_id.fetch_add(1, Ordering::SeqCst) + 1; state.base_remote_pts_us = None; state.last_video_local_pts_us = None; state.last_audio_local_pts_us = None; } match kind { UpstreamMediaKind::Camera => state.active_camera_generation = Some(generation), UpstreamMediaKind::Microphone => state.active_microphone_generation = Some(generation), } UpstreamStreamLease { session_id: state.session_id, generation, } } /// Return whether the supplied camera lease is still the active owner. #[must_use] pub fn is_camera_active(&self, generation: u64) -> bool { self.is_active(UpstreamMediaKind::Camera, generation) } /// Return whether the supplied microphone lease is still the active owner. #[must_use] pub fn is_microphone_active(&self, generation: u64) -> bool { self.is_active(UpstreamMediaKind::Microphone, generation) } fn is_active(&self, kind: UpstreamMediaKind, generation: u64) -> bool { let state = self .state .lock() .expect("upstream media state mutex poisoned"); match kind { UpstreamMediaKind::Camera => state.active_camera_generation == Some(generation), UpstreamMediaKind::Microphone => state.active_microphone_generation == Some(generation), } } /// Mark a camera stream as closed if it still owns the camera slot. pub fn close_camera(&self, generation: u64) { self.close(UpstreamMediaKind::Camera, generation); } /// Mark a microphone stream as closed if it still owns the microphone slot. pub fn close_microphone(&self, generation: u64) { self.close(UpstreamMediaKind::Microphone, generation); } fn close(&self, kind: UpstreamMediaKind, generation: u64) { let mut state = self .state .lock() .expect("upstream media state mutex poisoned"); match kind { UpstreamMediaKind::Camera if state.active_camera_generation == Some(generation) => { state.active_camera_generation = None; } UpstreamMediaKind::Microphone if state.active_microphone_generation == Some(generation) => { state.active_microphone_generation = None; } _ => return, } if state.active_camera_generation.is_none() && state.active_microphone_generation.is_none() { state.base_remote_pts_us = None; state.last_video_local_pts_us = None; state.last_audio_local_pts_us = None; } } /// Rebase one upstream video packet timestamp onto the shared session clock. #[must_use] pub fn map_video_pts(&self, remote_pts_us: u64, frame_step_us: u64) -> u64 { self.map_pts( UpstreamMediaKind::Camera, remote_pts_us, frame_step_us.max(1), ) } /// Rebase one upstream audio packet timestamp onto the shared session clock. #[must_use] pub fn map_audio_pts(&self, remote_pts_us: u64) -> u64 { self.map_pts(UpstreamMediaKind::Microphone, remote_pts_us, 1) } fn map_pts(&self, kind: UpstreamMediaKind, remote_pts_us: u64, min_step_us: u64) -> u64 { let mut state = self .state .lock() .expect("upstream media state mutex poisoned"); let base_remote = *state.base_remote_pts_us.get_or_insert(remote_pts_us); let mut local_pts_us = remote_pts_us.saturating_sub(base_remote); let last_slot = match kind { UpstreamMediaKind::Camera => &mut state.last_video_local_pts_us, UpstreamMediaKind::Microphone => &mut state.last_audio_local_pts_us, }; if let Some(last_pts_us) = *last_slot && local_pts_us <= last_pts_us { local_pts_us = last_pts_us.saturating_add(min_step_us.max(1)); } *last_slot = Some(local_pts_us); local_pts_us } } #[cfg(test)] mod tests { use super::{UpstreamMediaKind, UpstreamMediaRuntime}; use std::sync::Arc; use std::time::Duration; #[test] fn first_stream_starts_a_new_shared_session() { let runtime = UpstreamMediaRuntime::new(); let camera = runtime.activate_camera(); let microphone = runtime.activate_microphone(); assert_eq!(camera.session_id, 1); assert_eq!(microphone.session_id, 1); assert!(runtime.is_camera_active(camera.generation)); assert!(runtime.is_microphone_active(microphone.generation)); } #[test] fn replacing_one_kind_keeps_the_session_but_preempts_the_old_owner() { let runtime = UpstreamMediaRuntime::new(); let first = runtime.activate_microphone(); let second = runtime.activate_microphone(); assert_eq!(first.session_id, second.session_id); assert!(!runtime.is_microphone_active(first.generation)); assert!(runtime.is_microphone_active(second.generation)); } #[test] fn closing_the_last_stream_resets_the_next_session_anchor() { let runtime = UpstreamMediaRuntime::new(); let camera = runtime.activate_camera(); let microphone = runtime.activate_microphone(); runtime.close_camera(camera.generation); runtime.close_microphone(microphone.generation); let next = runtime.activate_camera(); assert_eq!(next.session_id, 2); } #[test] fn shared_clock_rebases_audio_and_video_against_the_same_origin() { let runtime = UpstreamMediaRuntime::new(); let _camera = runtime.activate_camera(); let _microphone = runtime.activate_microphone(); let video_first = runtime.map_video_pts(1_000_000, 16_666); let audio_first = runtime.map_audio_pts(1_000_000); let audio_next = runtime.map_audio_pts(1_010_000); let video_next = runtime.map_video_pts(1_033_333, 16_666); assert_eq!(video_first, 0); assert_eq!(audio_first, 0); assert_eq!(audio_next, 10_000); assert_eq!(video_next, 33_333); } #[test] fn shared_clock_keeps_each_kind_monotonic_when_remote_pts_repeat() { let runtime = UpstreamMediaRuntime::new(); let _camera = runtime.activate_camera(); let first = runtime.map_video_pts(50_000, 16_666); let repeated = runtime.map_video_pts(50_000, 16_666); assert_eq!(first, 0); assert_eq!(repeated, 16_666); } #[test] fn close_ignores_superseded_generation_values() { let runtime = UpstreamMediaRuntime::new(); let first = runtime.activate_camera(); let second = runtime.activate_camera(); runtime.close_camera(first.generation); assert!(runtime.is_camera_active(second.generation)); runtime.close(UpstreamMediaKind::Camera, second.generation); let next = runtime.activate_camera(); assert_eq!(next.session_id, 2); } #[tokio::test(flavor = "current_thread")] async fn new_microphone_owner_waits_for_the_previous_sink_to_release() { let runtime = Arc::new(UpstreamMediaRuntime::new()); let first = runtime.activate_microphone(); let first_permit = runtime .reserve_microphone_sink(first.generation) .await .expect("first owner should acquire the sink gate"); let second = runtime.activate_microphone(); let waiter = tokio::spawn({ let runtime = runtime.clone(); async move { runtime .reserve_microphone_sink(second.generation) .await .is_some() } }); tokio::time::sleep(Duration::from_millis(25)).await; assert!(!waiter.is_finished()); drop(first_permit); assert!(waiter.await.expect("waiter task should finish")); } #[tokio::test(flavor = "current_thread")] async fn superseded_microphone_waiter_stands_down_before_opening_a_sink() { let runtime = Arc::new(UpstreamMediaRuntime::new()); let first = runtime.activate_microphone(); let first_permit = runtime .reserve_microphone_sink(first.generation) .await .expect("first owner should acquire the sink gate"); let second = runtime.activate_microphone(); let superseded_waiter = tokio::spawn({ let runtime = runtime.clone(); async move { runtime .reserve_microphone_sink(second.generation) .await .is_some() } }); tokio::time::sleep(Duration::from_millis(25)).await; let third = runtime.activate_microphone(); drop(first_permit); assert!( !superseded_waiter .await .expect("superseded waiter task should finish"), "older waiter should stand down instead of opening a sink after supersession" ); let third_permit = runtime .reserve_microphone_sink(third.generation) .await .expect("latest owner should acquire the sink gate"); drop(third_permit); } }