impl UpstreamMediaRuntime { /// Keeps `activate` explicit because it sits on server upstream media scheduling, where timing choices directly affect lip sync. fn activate(&self, kind: UpstreamMediaKind) -> UpstreamStreamLease { let generation = match kind { UpstreamMediaKind::Camera => { self.next_camera_generation.fetch_add(1, Ordering::SeqCst) + 1 } UpstreamMediaKind::Microphone => { self.next_microphone_generation .fetch_add(1, Ordering::SeqCst) + 1 } }; let mut state = self .state .lock() .expect("upstream media state mutex poisoned"); if state.active_camera_generation.is_none() && state.active_microphone_generation.is_none() { state.session_id = self.next_session_id.fetch_add(1, Ordering::SeqCst) + 1; reset_session_state(&mut state); state.session_started_at = Some(Instant::now()); state.phase = UpstreamSyncPhase::Acquiring; state.last_reason = "v2 upstream session acquiring media".to_string(); } match kind { UpstreamMediaKind::Camera => state.active_camera_generation = Some(generation), UpstreamMediaKind::Microphone => state.active_microphone_generation = Some(generation), } UpstreamStreamLease { session_id: state.session_id, generation, } } /// Starts a bundled camera+microphone generation with a fresh shared playout epoch. fn activate_bundled(&self) -> UpstreamBundledLeases { let camera_generation = self.next_camera_generation.fetch_add(1, Ordering::SeqCst) + 1; let microphone_generation = self.next_microphone_generation .fetch_add(1, Ordering::SeqCst) + 1; let mut state = self .state .lock() .expect("upstream media state mutex poisoned"); state.session_id = self.next_session_id.fetch_add(1, Ordering::SeqCst) + 1; reset_session_state(&mut state); state.session_started_at = Some(Instant::now()); state.phase = UpstreamSyncPhase::Acquiring; state.last_reason = "v2 bundled upstream session acquiring media".to_string(); state.active_camera_generation = Some(camera_generation); state.active_microphone_generation = Some(microphone_generation); UpstreamBundledLeases { camera: UpstreamStreamLease { session_id: state.session_id, generation: camera_generation, }, microphone: UpstreamStreamLease { session_id: state.session_id, generation: microphone_generation, }, } } /// Keeps `is_active` explicit because it sits on server upstream media scheduling, where timing choices directly affect lip sync. /// Inputs are the typed parameters; output is the return value or side effect. fn is_active(&self, kind: UpstreamMediaKind, generation: u64) -> bool { let state = self .state .lock() .expect("upstream media state mutex poisoned"); match kind { UpstreamMediaKind::Camera => state.active_camera_generation == Some(generation), UpstreamMediaKind::Microphone => state.active_microphone_generation == Some(generation), } } /// Keeps `close` explicit because it sits on server upstream media scheduling, where timing choices directly affect lip sync. /// Inputs are the typed parameters; output is the return value or side effect. fn close(&self, kind: UpstreamMediaKind, generation: u64) { let mut state = self .state .lock() .expect("upstream media state mutex poisoned"); match kind { UpstreamMediaKind::Camera if state.active_camera_generation == Some(generation) => { state.active_camera_generation = None } UpstreamMediaKind::Microphone if state.active_microphone_generation == Some(generation) => { state.active_microphone_generation = None } _ => return, } if state.active_camera_generation.is_none() && state.active_microphone_generation.is_none() { reset_session_state(&mut state); } } fn plan_legacy_pts( &self, kind: UpstreamMediaKind, remote_pts_us: u64, min_step_us: u64, ) -> UpstreamPlanDecision { self.plan_rebased_pts(kind, remote_pts_us, min_step_us.max(1), None, None) } /// Keeps `plan_rebased_pts` explicit because it sits on server upstream media scheduling, where timing choices directly affect lip sync. /// Inputs are the typed parameters; output is the return value or side effect. fn plan_rebased_pts( &self, kind: UpstreamMediaKind, remote_pts_us: u64, min_step_us: u64, explicit_base: Option, explicit_epoch: Option, ) -> UpstreamPlanDecision { let mut state = self .state .lock() .expect("upstream media state mutex poisoned"); match kind { UpstreamMediaKind::Camera => state.latest_camera_remote_pts_us = Some(remote_pts_us), UpstreamMediaKind::Microphone => { state.latest_microphone_remote_pts_us = Some(remote_pts_us) } } let base = match explicit_base { Some(base) => *state.base_remote_pts_us.get_or_insert(base), None => *state.base_remote_pts_us.get_or_insert(remote_pts_us), }; let epoch = match explicit_epoch { Some(epoch) => *state.playout_epoch.get_or_insert(epoch), None => *state .playout_epoch .get_or_insert(Instant::now() + upstream_playout_delay()), }; let mut local_pts_us = remote_pts_us.saturating_sub(base); let last_slot = match kind { UpstreamMediaKind::Camera => &mut state.last_video_local_pts_us, UpstreamMediaKind::Microphone => &mut state.last_audio_local_pts_us, }; if let Some(last_pts_us) = *last_slot && local_pts_us <= last_pts_us { local_pts_us = last_pts_us.saturating_add(min_step_us.max(1)); } *last_slot = Some(local_pts_us); state.phase = UpstreamSyncPhase::Syncing; state.last_reason = "v2 legacy packet mapped without cross-stream planner".to_string(); let due_at = apply_offset( epoch + Duration::from_micros(local_pts_us), self.playout_offset_us(kind), ); let late_by = Instant::now() .checked_duration_since(due_at) .unwrap_or_default(); UpstreamPlanDecision::Play(PlannedUpstreamPacket { local_pts_us, due_at, late_by, source_lag: Duration::ZERO, }) } /// Keeps `playout_offset_us` explicit because it sits on server upstream media scheduling, where timing choices directly affect lip sync. /// Inputs are the typed parameters; output is the return value or side effect. fn playout_offset_us(&self, kind: UpstreamMediaKind) -> i64 { match kind { UpstreamMediaKind::Camera => self.camera_playout_offset_us.load(Ordering::Relaxed), UpstreamMediaKind::Microphone => { self.microphone_playout_offset_us.load(Ordering::Relaxed) } } } }