// lesavka/quarantine/upstream-media-v1/server/src/upstream_media_runtime.rs
// Snapshot metadata (viewer chrome, kept as a comment): 468 lines, 18 KiB,
// Rust, captured 2026-05-03 12:22:33 -03:00.
#![forbid(unsafe_code)]
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore};
use tokio::time::Instant;
use tracing::info;
mod config;
mod state;
mod types;
use config::{
apply_playout_offset, upstream_audio_master_wait_grace, upstream_bundled_playout_delay,
upstream_bundled_playout_offset_override_us, upstream_camera_startup_grace_us,
upstream_max_live_lag, upstream_pairing_master_slack, upstream_playout_delay,
upstream_playout_offset_us, upstream_reanchor_late_threshold, upstream_require_paired_startup,
upstream_startup_timeout, upstream_timing_trace_enabled,
};
use state::{UpstreamClockState, UpstreamSyncPhase};
pub use types::{
PlannedUpstreamPacket, UpstreamClientTiming, UpstreamMediaKind, UpstreamPlanDecision,
UpstreamPlannerSnapshot, UpstreamStreamLease,
};
/// Coordinate upstream stream ownership and keep audio/video on one timeline.
///
/// Inputs: stream-open/close events plus remote packet timestamps.
/// Outputs: active-stream leases and rebased local PTS values.
/// Why: live calls need one current webcam owner, one current microphone owner,
/// and one shared media clock so reconnects do not leave old sinks alive or let
/// audio/video drift onto separate timing islands.
#[derive(Debug)]
pub struct UpstreamMediaRuntime {
    // Monotonic counter handing out unique session ids.
    next_session_id: AtomicU64,
    // Generation counters per media kind; presumably bumped on each new
    // stream owner to fence out stale leases — confirm in lease_lifecycle.rs.
    next_camera_generation: AtomicU64,
    next_microphone_generation: AtomicU64,
    // Single-permit gate (see `new`): at most one microphone sink at a time.
    microphone_sink_gate: Arc<Semaphore>,
    // Wakes waiters on pairing-state changes — used by the included
    // lease/planning code, not visible in this chunk.
    pairing_state_notify: Arc<Notify>,
    // Notified from `mark_audio_presented` whenever the audio playhead moves.
    audio_progress_notify: Arc<Notify>,
    // Live-tunable playout offsets in microseconds (see `set_playout_offsets`).
    camera_playout_offset_us: AtomicI64,
    microphone_playout_offset_us: AtomicI64,
    // All mutable clock/sync bookkeeping, guarded by a single mutex.
    state: Mutex<UpstreamClockState>,
}
impl UpstreamMediaRuntime {
    /// Build an empty upstream runtime.
    #[must_use]
    pub fn new() -> Self {
        Self {
            next_session_id: AtomicU64::new(0),
            next_camera_generation: AtomicU64::new(0),
            next_microphone_generation: AtomicU64::new(0),
            // One permit: only a single microphone sink may hold the gate.
            microphone_sink_gate: Arc::new(Semaphore::new(1)),
            pairing_state_notify: Arc::new(Notify::new()),
            audio_progress_notify: Arc::new(Notify::new()),
            // Seed live offsets from static config; both can be retuned later
            // via `set_playout_offsets`.
            camera_playout_offset_us: AtomicI64::new(upstream_playout_offset_us(
                UpstreamMediaKind::Camera,
            )),
            microphone_playout_offset_us: AtomicI64::new(upstream_playout_offset_us(
                UpstreamMediaKind::Microphone,
            )),
            state: Mutex::new(UpstreamClockState::default()),
        }
    }
    /// Apply live upstream playout offsets without restarting the relay.
    pub fn set_playout_offsets(&self, camera_offset_us: i64, microphone_offset_us: i64) {
        self.camera_playout_offset_us
            .store(camera_offset_us, Ordering::Relaxed);
        self.microphone_playout_offset_us
            .store(microphone_offset_us, Ordering::Relaxed);
    }
    /// Return `(camera_offset_us, microphone_offset_us)` currently used for live playout.
    #[must_use]
    pub fn playout_offsets(&self) -> (i64, i64) {
        (
            self.camera_playout_offset_us.load(Ordering::Relaxed),
            self.microphone_playout_offset_us.load(Ordering::Relaxed),
        )
    }
    /// Current live playout offset for `kind`, in microseconds.
    fn playout_offset_us(&self, kind: UpstreamMediaKind) -> i64 {
        match kind {
            UpstreamMediaKind::Camera => self.camera_playout_offset_us.load(Ordering::Relaxed),
            UpstreamMediaKind::Microphone => {
                self.microphone_playout_offset_us.load(Ordering::Relaxed)
            }
        }
    }
    /// Microseconds audio is intentionally delayed relative to video:
    /// `microphone_offset - camera_offset`, clamped to zero when negative.
    fn positive_audio_delay_allowance_us(&self) -> u64 {
        let camera_offset_us = self.camera_playout_offset_us.load(Ordering::Relaxed);
        let microphone_offset_us = self.microphone_playout_offset_us.load(Ordering::Relaxed);
        microphone_offset_us.saturating_sub(camera_offset_us).max(0) as u64
    }
    /// Microseconds audio intentionally runs ahead of video:
    /// `camera_offset - microphone_offset`, clamped to zero when negative.
    fn audio_ahead_video_allowance_us(&self) -> u64 {
        let camera_offset_us = self.camera_playout_offset_us.load(Ordering::Relaxed);
        let microphone_offset_us = self.microphone_playout_offset_us.load(Ordering::Relaxed);
        camera_offset_us.saturating_sub(microphone_offset_us).max(0) as u64
    }
    /// How far into the future a packet of `kind` may deliberately wait,
    /// i.e. the configured lead the *other* kind has over this one.
    fn intentional_future_wait_allowance_us(&self, kind: UpstreamMediaKind) -> u64 {
        match kind {
            UpstreamMediaKind::Camera => self.audio_ahead_video_allowance_us(),
            UpstreamMediaKind::Microphone => self.positive_audio_delay_allowance_us(),
        }
    }
    /// Bundled-mode offset: the config override when present, otherwise the
    /// live playout offset for `kind`.
    fn raw_bundled_playout_offset_us(&self, kind: UpstreamMediaKind) -> i64 {
        upstream_bundled_playout_offset_override_us(kind)
            .unwrap_or_else(|| self.playout_offset_us(kind))
    }
    // NOTE(review): currently a plain alias of `raw_bundled_playout_offset_us`;
    // presumably kept as a seam for future adjustment — confirm before removing.
    fn bundled_playout_offset_us(&self, kind: UpstreamMediaKind) -> i64 {
        self.raw_bundled_playout_offset_us(kind)
    }
    /// Mark one audio chunk as actually handed to the UAC sink.
    pub fn mark_audio_presented(&self, local_pts_us: u64, due_at: Instant) {
        let mut state = self
            .state
            .lock()
            .expect("upstream media state mutex poisoned");
        record_presentation_sample(&mut state, UpstreamMediaKind::Microphone, due_at);
        state.last_audio_presented_pts_us = Some(local_pts_us);
        // A presented chunk proves the timeline is flowing: promote to Live,
        // but never overwrite a terminal Failed phase.
        if state.phase != UpstreamSyncPhase::Failed {
            state.phase = UpstreamSyncPhase::Live;
            state.last_reason = "audio-master playhead flowing".to_string();
        }
        // Wake anyone waiting on audio playhead progress.
        self.audio_progress_notify.notify_waiters();
    }
    /// Record client-side timing facts for one packet as it arrives at the server.
    pub fn record_client_timing(&self, kind: UpstreamMediaKind, timing: UpstreamClientTiming) {
        let mut state = self
            .state
            .lock()
            .expect("upstream media state mutex poisoned");
        let sample = state::UpstreamTimingSample {
            capture_pts_us: timing.capture_pts_us,
            send_pts_us: timing.send_pts_us,
            queue_age_ms: timing.queue_age_ms,
            received_at: Instant::now(),
        };
        // Per-kind bookkeeping: latest sample, rolling sample window, and the
        // client queue-age percentile window.
        match kind {
            UpstreamMediaKind::Camera => {
                state.latest_camera_timing = Some(sample);
                push_timing_sample(&mut state.recent_camera_timing, sample);
                state
                    .camera_client_queue_age_window_ms
                    .push(f64::from(timing.queue_age_ms));
            }
            UpstreamMediaKind::Microphone => {
                state.latest_microphone_timing = Some(sample);
                push_timing_sample(&mut state.recent_microphone_timing, sample);
                state
                    .microphone_client_queue_age_window_ms
                    .push(f64::from(timing.queue_age_ms));
            }
        }
        // Try to pair this sample with the opposite kind to derive skew metrics.
        record_client_timing_windows(&mut state, kind, sample);
    }
    /// Mark one video frame as actually handed to the UVC/HDMI sink.
    pub fn mark_video_presented(&self, local_pts_us: u64, due_at: Instant) {
        let mut state = self
            .state
            .lock()
            .expect("upstream media state mutex poisoned");
        record_presentation_sample(&mut state, UpstreamMediaKind::Camera, due_at);
        state.last_video_presented_pts_us = Some(local_pts_us);
        // Same phase rule as audio: promote to Live unless already Failed.
        if state.phase != UpstreamSyncPhase::Failed {
            state.phase = UpstreamSyncPhase::Live;
            state.last_reason = "video follower emitted a synced frame".to_string();
        }
    }
    /// Record that video intentionally froze instead of showing an out-of-sync frame.
    pub fn record_video_freeze(&self, reason: impl Into<String>) {
        let mut state = self
            .state
            .lock()
            .expect("upstream media state mutex poisoned");
        state.video_freezes = state.video_freezes.saturating_add(1);
        // A freeze is recoverable, so mark Healing — but never clobber Failed.
        if state.phase != UpstreamSyncPhase::Failed {
            state.phase = UpstreamSyncPhase::Healing;
        }
        state.last_reason = reason.into();
    }
    /// Return current planner facts for diagnostics and probe artifacts.
    #[must_use]
    pub fn snapshot(&self) -> UpstreamPlannerSnapshot {
        let state = self
            .state
            .lock()
            .expect("upstream media state mutex poisoned");
        let live_lag_ms = live_lag_us(&state).map(us_to_ms);
        // Signed audio-minus-video playhead skew in milliseconds, only when
        // both playheads have presented at least once.
        let planner_skew_ms = match (
            state.last_audio_presented_pts_us,
            state.last_video_presented_pts_us,
        ) {
            (Some(audio), Some(video)) => Some((audio as i128 - video as i128) as f64 / 1000.0),
            _ => None,
        };
        let now = Instant::now();
        let client_capture_skew_ms = state.latest_paired_client_capture_skew_ms;
        let client_send_skew_ms = state.latest_paired_client_send_skew_ms;
        let server_receive_skew_ms = state.latest_paired_server_receive_skew_ms;
        UpstreamPlannerSnapshot {
            session_id: state.session_id,
            phase: state.phase.as_str(),
            latest_camera_remote_pts_us: state.latest_camera_remote_pts_us,
            latest_microphone_remote_pts_us: state.latest_microphone_remote_pts_us,
            last_video_presented_pts_us: state.last_video_presented_pts_us,
            last_audio_presented_pts_us: state.last_audio_presented_pts_us,
            live_lag_ms,
            planner_skew_ms,
            stale_audio_drops: state.stale_audio_drops,
            stale_video_drops: state.stale_video_drops,
            skew_video_drops: state.skew_video_drops,
            freshness_reanchors: state.freshness_reanchors,
            startup_timeouts: state.startup_timeouts,
            video_freezes: state.video_freezes,
            last_reason: state.last_reason.clone(),
            client_capture_skew_ms,
            client_send_skew_ms,
            server_receive_skew_ms,
            camera_client_queue_age_ms: state
                .latest_camera_timing
                .map(|sample| f64::from(sample.queue_age_ms)),
            microphone_client_queue_age_ms: state
                .latest_microphone_timing
                .map(|sample| f64::from(sample.queue_age_ms)),
            // Age of the newest received packet per kind, measured now.
            camera_server_receive_age_ms: state.latest_camera_timing.map(|sample| {
                now.saturating_duration_since(sample.received_at)
                    .as_secs_f64()
                    * 1000.0
            }),
            microphone_server_receive_age_ms: state.latest_microphone_timing.map(|sample| {
                now.saturating_duration_since(sample.received_at)
                    .as_secs_f64()
                    * 1000.0
            }),
            client_capture_abs_skew_p95_ms: state.client_capture_skew_window_ms.p95_abs(),
            client_send_abs_skew_p95_ms: state.client_send_skew_window_ms.p95_abs(),
            server_receive_abs_skew_p95_ms: state.server_receive_skew_window_ms.p95_abs(),
            camera_client_queue_age_p95_ms: state.camera_client_queue_age_window_ms.p95(),
            microphone_client_queue_age_p95_ms: state.microphone_client_queue_age_window_ms.p95(),
            sink_handoff_skew_ms: latest_sink_handoff_skew_ms(&state),
            sink_handoff_abs_skew_p95_ms: state.sink_handoff_skew_window_ms.p95_abs(),
            camera_sink_late_ms: state.latest_camera_presentation.map(presentation_late_ms),
            microphone_sink_late_ms: state
                .latest_microphone_presentation
                .map(presentation_late_ms),
            camera_sink_late_p95_ms: state.camera_sink_late_window_ms.p95(),
            microphone_sink_late_p95_ms: state.microphone_sink_late_window_ms.p95(),
            client_timing_window_samples: state.client_capture_skew_window_ms.len() as u64,
            sink_handoff_window_samples: state.sink_handoff_skew_window_ms.len() as u64,
        }
    }
}
include!("upstream_media_runtime/lease_lifecycle.rs");
include!("upstream_media_runtime/playout_planning_methods.rs");
include!("upstream_media_runtime/playout_decision_core.rs");
// History marker from the snapshot viewer: 2026-05-03 12:22:33 -03:00.
/// Raise the per-kind high-water mark of remote PTS values.
///
/// The stored value only ever moves forward; the first packet seeds it.
fn update_latest_remote_pts(
    state: &mut UpstreamClockState,
    kind: UpstreamMediaKind,
    remote_pts_us: u64,
) {
    let slot = match kind {
        UpstreamMediaKind::Camera => &mut state.latest_camera_remote_pts_us,
        UpstreamMediaKind::Microphone => &mut state.latest_microphone_remote_pts_us,
    };
    let high_water = slot.map_or(remote_pts_us, |seen| seen.max(remote_pts_us));
    *slot = Some(high_water);
}
/// How far `remote_pts_us` trails the newest remote PTS seen for `kind`.
///
/// Zero when this packet is itself the newest one (or the first ever seen).
fn source_lag_for_kind(
    state: &UpstreamClockState,
    kind: UpstreamMediaKind,
    remote_pts_us: u64,
) -> Duration {
    let newest = match kind {
        UpstreamMediaKind::Camera => state.latest_camera_remote_pts_us,
        UpstreamMediaKind::Microphone => state.latest_microphone_remote_pts_us,
    };
    let lag_us = newest.unwrap_or(remote_pts_us).saturating_sub(remote_pts_us);
    Duration::from_micros(lag_us)
}
/// True when the video PTS trails the audio PTS by more than the allowed slack.
///
/// The slack is the configured pairing-master slack (clamped into `u64`)
/// plus any intentional audio-ahead-of-video allowance; arithmetic saturates.
fn video_is_too_far_behind_audio(
    video_pts_us: u64,
    audio_pts_us: u64,
    audio_ahead_video_allowance_us: u64,
) -> bool {
    let config_slack_us =
        upstream_pairing_master_slack().as_micros().min(u64::MAX as u128) as u64;
    let total_slack_us = config_slack_us.saturating_add(audio_ahead_video_allowance_us);
    audio_pts_us > video_pts_us.saturating_add(total_slack_us)
}
/// Microseconds between the newest remote audio PTS and the local audio
/// playhead rebased by the session base; `None` until all three facts exist.
fn live_lag_us(state: &UpstreamClockState) -> Option<u64> {
    let newest_remote = state.latest_microphone_remote_pts_us?;
    let presented_local = state.last_audio_presented_pts_us?;
    let session_base = state.session_base_remote_pts_us?;
    let playhead_remote = session_base.saturating_add(presented_local);
    Some(newest_remote.saturating_sub(playhead_remote))
}
/// Convert a microsecond count to fractional milliseconds.
fn us_to_ms(value: u64) -> f64 {
    let micros = value as f64;
    micros / 1000.0
}
/// Signed microsecond difference `left - right` between two instants.
///
/// Positive when `left` is later than `right`, negative when earlier.
fn instant_delta_us(left: Instant, right: Instant) -> i128 {
    if left < right {
        -(right.saturating_duration_since(left).as_micros() as i128)
    } else {
        left.saturating_duration_since(right).as_micros() as i128
    }
}
const CLIENT_TIMING_PAIR_MAX_SEND_DELTA_US: u64 = 250_000;
/// Append `sample` to a bounded window, evicting the oldest entry when full.
fn push_timing_sample(
    samples: &mut std::collections::VecDeque<state::UpstreamTimingSample>,
    sample: state::UpstreamTimingSample,
) {
    let window_full = samples.len() >= state::TIMING_WINDOW_CAPACITY;
    if window_full {
        samples.pop_front();
    }
    samples.push_back(sample);
}
/// Absolute difference between two unsigned microsecond timestamps.
fn abs_delta_us(left: u64, right: u64) -> u64 {
    // `abs_diff` states the intent directly and cannot underflow,
    // replacing the manual `max - min` dance.
    left.abs_diff(right)
}
/// Return the sample whose send PTS is closest to `send_pts_us`, if any.
///
/// Ties keep the earliest-iterated sample, matching `min_by_key` semantics.
fn nearest_timing_sample_by_send(
    samples: &std::collections::VecDeque<state::UpstreamTimingSample>,
    send_pts_us: u64,
) -> Option<state::UpstreamTimingSample> {
    let mut nearest: Option<(u64, state::UpstreamTimingSample)> = None;
    for sample in samples.iter().copied() {
        let distance = abs_delta_us(sample.send_pts_us, send_pts_us);
        // Only strictly closer samples replace the current pick, so the
        // first of any equally-near samples wins.
        let closer = match nearest {
            Some((best_distance, _)) => distance < best_distance,
            None => true,
        };
        if closer {
            nearest = Some((distance, sample));
        }
    }
    nearest.map(|(_, sample)| sample)
}
/// Derive paired camera/microphone skew metrics from a fresh timing sample.
///
/// The new sample is paired with the nearest-by-send sample of the opposite
/// kind; pairs whose send PTS values differ by more than
/// `CLIENT_TIMING_PAIR_MAX_SEND_DELTA_US` are discarded as unrelated.
fn record_client_timing_windows(
    state: &mut UpstreamClockState,
    kind: UpstreamMediaKind,
    sample: state::UpstreamTimingSample,
) {
    // Locate the opposite-kind partner and order the pair as (camera, microphone).
    let pair = match kind {
        UpstreamMediaKind::Camera => {
            nearest_timing_sample_by_send(&state.recent_microphone_timing, sample.send_pts_us)
                .map(|microphone| (sample, microphone))
        }
        UpstreamMediaKind::Microphone => {
            nearest_timing_sample_by_send(&state.recent_camera_timing, sample.send_pts_us)
                .map(|camera| (camera, sample))
        }
    };
    let Some((camera, microphone)) = pair else {
        return;
    };
    // Too far apart in send time: not describing the same moment.
    if abs_delta_us(camera.send_pts_us, microphone.send_pts_us)
        > CLIENT_TIMING_PAIR_MAX_SEND_DELTA_US
    {
        return;
    }
    // All skews are signed camera-minus-microphone, in milliseconds.
    let capture_skew_ms =
        (camera.capture_pts_us as i128 - microphone.capture_pts_us as i128) as f64 / 1000.0;
    let send_skew_ms =
        (camera.send_pts_us as i128 - microphone.send_pts_us as i128) as f64 / 1000.0;
    let receive_skew_ms =
        instant_delta_us(camera.received_at, microphone.received_at) as f64 / 1000.0;
    state.latest_paired_client_capture_skew_ms = Some(capture_skew_ms);
    state.latest_paired_client_send_skew_ms = Some(send_skew_ms);
    state.latest_paired_server_receive_skew_ms = Some(receive_skew_ms);
    state.client_capture_skew_window_ms.push(capture_skew_ms);
    state.client_send_skew_window_ms.push(send_skew_ms);
    state.server_receive_skew_window_ms.push(receive_skew_ms);
}
/// Record one sink handoff for `kind` and refresh lateness/skew windows.
fn record_presentation_sample(
    state: &mut UpstreamClockState,
    kind: UpstreamMediaKind,
    due_at: Instant,
) {
    let sample = state::UpstreamPresentationSample {
        due_at,
        handed_at: Instant::now(),
    };
    // Lateness is clamped at zero: an early handoff is not "negative late".
    let late_ms = presentation_late_ms(sample).max(0.0);
    let (latest_slot, late_window) = match kind {
        UpstreamMediaKind::Camera => (
            &mut state.latest_camera_presentation,
            &mut state.camera_sink_late_window_ms,
        ),
        UpstreamMediaKind::Microphone => (
            &mut state.latest_microphone_presentation,
            &mut state.microphone_sink_late_window_ms,
        ),
    };
    *latest_slot = Some(sample);
    late_window.push(late_ms);
    // With the latest presentation updated, fold in the cross-kind skew.
    if let Some(skew_ms) = latest_sink_handoff_skew_ms(state) {
        state.sink_handoff_skew_window_ms.push(skew_ms);
    }
}
/// Camera-minus-microphone handoff skew in milliseconds for the latest
/// presentation pair.
///
/// `None` when either side is missing or the due times are more than
/// 250 ms apart (the presentations then describe unrelated frames).
fn latest_sink_handoff_skew_ms(state: &UpstreamClockState) -> Option<f64> {
    let camera = state.latest_camera_presentation?;
    let microphone = state.latest_microphone_presentation?;
    let due_gap_ms = instant_delta_us(camera.due_at, microphone.due_at).abs() as f64 / 1000.0;
    if due_gap_ms > 250.0 {
        return None;
    }
    let handoff_skew_us = instant_delta_us(camera.handed_at, microphone.handed_at);
    Some(handoff_skew_us as f64 / 1000.0)
}
/// Milliseconds the handoff happened after its due time (negative when early).
fn presentation_late_ms(sample: state::UpstreamPresentationSample) -> f64 {
    let late_us = instant_delta_us(sample.handed_at, sample.due_at);
    late_us as f64 / 1000.0
}
/// Push the pairing deadline forward and, while the opposite kind has not
/// yet been seen, record this packet as the pairing anchor for `kind`.
///
/// Returns `true` when the anchor for `kind` was (re)set.
fn refresh_unpaired_pairing_anchor(
    state: &mut UpstreamClockState,
    kind: UpstreamMediaKind,
    remote_pts_us: u64,
    next_deadline: Instant,
) -> bool {
    // The deadline is refreshed unconditionally, even when already paired.
    state.pairing_anchor_deadline = Some(next_deadline);
    let opposite_seen = match kind {
        UpstreamMediaKind::Camera => state.first_microphone_remote_pts_us.is_some(),
        UpstreamMediaKind::Microphone => state.first_camera_remote_pts_us.is_some(),
    };
    if opposite_seen {
        return false;
    }
    match kind {
        UpstreamMediaKind::Camera => state.first_camera_remote_pts_us = Some(remote_pts_us),
        UpstreamMediaKind::Microphone => {
            state.first_microphone_remote_pts_us = Some(remote_pts_us);
        }
    }
    true
}
impl Default for UpstreamMediaRuntime {
    /// Equivalent to [`UpstreamMediaRuntime::new`].
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod tests;