impl UpstreamMediaRuntime { #[must_use] /// Keeps `snapshot` explicit because it sits on server upstream media scheduling, where timing choices directly affect lip sync. pub fn snapshot(&self) -> UpstreamPlannerSnapshot { let state = self .state .lock() .expect("upstream media state mutex poisoned"); let now = Instant::now(); UpstreamPlannerSnapshot { session_id: state.session_id, phase: state.phase.as_str(), latest_camera_remote_pts_us: state.latest_camera_remote_pts_us, latest_microphone_remote_pts_us: state.latest_microphone_remote_pts_us, last_video_presented_pts_us: state.last_video_presented_pts_us, last_audio_presented_pts_us: state.last_audio_presented_pts_us, live_lag_ms: live_lag_ms(&state), planner_skew_ms: planner_skew_ms(&state), stale_audio_drops: state.stale_audio_drops, stale_video_drops: state.stale_video_drops, skew_video_drops: state.skew_video_drops, freshness_reanchors: state.freshness_reanchors, startup_timeouts: state.startup_timeouts, video_freezes: state.video_freezes, last_reason: state.last_reason.clone(), client_capture_skew_ms: state.latest_paired_client_capture_skew_ms, client_send_skew_ms: state.latest_paired_client_send_skew_ms, server_receive_skew_ms: state.latest_paired_server_receive_skew_ms, camera_client_queue_age_ms: state .latest_camera_timing .map(|sample| f64::from(sample.queue_age_ms)), microphone_client_queue_age_ms: state .latest_microphone_timing .map(|sample| f64::from(sample.queue_age_ms)), camera_server_receive_age_ms: state .latest_camera_timing .map(|sample| age_ms(now, sample.received_at)), microphone_server_receive_age_ms: state .latest_microphone_timing .map(|sample| age_ms(now, sample.received_at)), client_capture_abs_skew_p95_ms: state.client_capture_skew_window_ms.p95_abs(), client_send_abs_skew_p95_ms: state.client_send_skew_window_ms.p95_abs(), server_receive_abs_skew_p95_ms: state.server_receive_skew_window_ms.p95_abs(), camera_client_queue_age_p95_ms: state.camera_client_queue_age_window_ms.p95(), microphone_client_queue_age_p95_ms: state.microphone_client_queue_age_window_ms.p95(), sink_handoff_skew_ms: latest_sink_handoff_skew_ms(&state), sink_handoff_abs_skew_p95_ms: state.sink_handoff_skew_window_ms.p95_abs(), camera_sink_late_ms: state.latest_camera_presentation.map(presentation_late_ms), microphone_sink_late_ms: state .latest_microphone_presentation .map(presentation_late_ms), camera_sink_late_p95_ms: state.camera_sink_late_window_ms.p95(), microphone_sink_late_p95_ms: state.microphone_sink_late_window_ms.p95(), client_timing_window_samples: state.client_capture_skew_window_ms.len() as u64, sink_handoff_window_samples: state.sink_handoff_skew_window_ms.len() as u64, } } #[must_use] /// Keeps `map_video_pts` explicit because it sits on server upstream media scheduling, where timing choices directly affect lip sync. /// Inputs are the typed parameters; output is the return value or side effect. pub fn map_video_pts(&self, remote_pts_us: u64, frame_step_us: u64) -> Option { match self.plan_video_pts(remote_pts_us, frame_step_us) { UpstreamPlanDecision::Play(plan) => Some(plan.local_pts_us), _ => None, } } #[must_use] /// Keeps `map_audio_pts` explicit because it sits on server upstream media scheduling, where timing choices directly affect lip sync. /// Inputs are the typed parameters; output is the return value or side effect. pub fn map_audio_pts(&self, remote_pts_us: u64) -> Option { match self.plan_audio_pts(remote_pts_us) { UpstreamPlanDecision::Play(plan) => Some(plan.local_pts_us), _ => None, } } #[must_use] pub fn plan_video_pts(&self, remote_pts_us: u64, frame_step_us: u64) -> UpstreamPlanDecision { self.plan_legacy_pts( UpstreamMediaKind::Camera, remote_pts_us, frame_step_us.max(1), ) } #[must_use] pub fn plan_audio_pts(&self, remote_pts_us: u64) -> UpstreamPlanDecision { self.plan_legacy_pts(UpstreamMediaKind::Microphone, remote_pts_us, 1) } #[must_use] pub fn plan_bundled_pts( &self, kind: UpstreamMediaKind, remote_pts_us: u64, min_step_us: u64, bundle_base_remote_pts_us: u64, bundle_epoch: Instant, ) -> UpstreamPlanDecision { self.plan_rebased_pts( kind, remote_pts_us, min_step_us.max(1), Some(bundle_base_remote_pts_us), Some(bundle_epoch), ) } pub async fn wait_for_audio_master(&self, _video_local_pts_us: u64, _due_at: Instant) -> bool { true } }