//! lesavka/server/src/upstream_media_runtime.rs
//!
//! (Original listing metadata: 1067 lines, 45 KiB, Rust.)

#![forbid(unsafe_code)]
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore};
use tokio::time::Instant;
use tracing::info;
mod config;
mod state;
mod types;
use config::{
apply_playout_offset, upstream_audio_master_wait_grace, upstream_bundled_playout_delay,
upstream_bundled_playout_offset_override_us, upstream_camera_startup_grace_us,
upstream_max_live_lag, upstream_pairing_master_slack, upstream_playout_delay,
upstream_playout_offset_us, upstream_reanchor_late_threshold, upstream_require_paired_startup,
upstream_startup_timeout, upstream_timing_trace_enabled,
};
use state::{UpstreamClockState, UpstreamSyncPhase};
pub use types::{
PlannedUpstreamPacket, UpstreamClientTiming, UpstreamMediaKind, UpstreamPlanDecision,
UpstreamPlannerSnapshot, UpstreamStreamLease,
};
/// Coordinate upstream stream ownership and keep audio/video on one timeline.
///
/// Inputs: stream-open/close events plus remote packet timestamps.
/// Outputs: active-stream leases and rebased local PTS values.
/// Why: live calls need one current webcam owner, one current microphone owner,
/// and one shared media clock so reconnects do not leave old sinks alive or let
/// audio/video drift onto separate timing islands.
#[derive(Debug)]
pub struct UpstreamMediaRuntime {
// Monotonic id handed to each new session (atomic: bumped lock-free).
next_session_id: AtomicU64,
// Generation counters used to fence out stale camera/microphone owners.
next_camera_generation: AtomicU64,
next_microphone_generation: AtomicU64,
// Single-permit gate: at most one live microphone sink at a time.
microphone_sink_gate: Arc<Semaphore>,
// Woken when pairing/anchor state changes so waiters can re-check.
pairing_state_notify: Arc<Notify>,
// Woken whenever the audio master playhead advances.
audio_progress_notify: Arc<Notify>,
// Live playout offsets (µs); mutable at runtime via `set_playout_offsets`.
camera_playout_offset_us: AtomicI64,
microphone_playout_offset_us: AtomicI64,
// All cross-stream clock/pairing state lives behind one mutex.
state: Mutex<UpstreamClockState>,
}
impl UpstreamMediaRuntime {
/// Construct a runtime with no active session, a single-permit microphone
/// gate, and playout offsets seeded from configuration.
#[must_use]
pub fn new() -> Self {
    // Read the configured per-kind offsets up front (camera first, as before).
    let camera_offset = upstream_playout_offset_us(UpstreamMediaKind::Camera);
    let microphone_offset = upstream_playout_offset_us(UpstreamMediaKind::Microphone);
    Self {
        next_session_id: AtomicU64::new(0),
        next_camera_generation: AtomicU64::new(0),
        next_microphone_generation: AtomicU64::new(0),
        microphone_sink_gate: Arc::new(Semaphore::new(1)),
        pairing_state_notify: Arc::new(Notify::new()),
        audio_progress_notify: Arc::new(Notify::new()),
        camera_playout_offset_us: AtomicI64::new(camera_offset),
        microphone_playout_offset_us: AtomicI64::new(microphone_offset),
        state: Mutex::new(UpstreamClockState::default()),
    }
}
/// Apply live upstream playout offsets without restarting the relay.
pub fn set_playout_offsets(&self, camera_offset_us: i64, microphone_offset_us: i64) {
    // Independent tuning knobs read per packet; Relaxed ordering suffices.
    self.camera_playout_offset_us.store(camera_offset_us, Ordering::Relaxed);
    self.microphone_playout_offset_us.store(microphone_offset_us, Ordering::Relaxed);
}
/// Return `(camera_offset_us, microphone_offset_us)` currently used for live playout.
#[must_use]
pub fn playout_offsets(&self) -> (i64, i64) {
    let camera = self.camera_playout_offset_us.load(Ordering::Relaxed);
    let microphone = self.microphone_playout_offset_us.load(Ordering::Relaxed);
    (camera, microphone)
}
/// Live playout offset (µs) for one media kind.
fn playout_offset_us(&self, kind: UpstreamMediaKind) -> i64 {
    let slot = match kind {
        UpstreamMediaKind::Camera => &self.camera_playout_offset_us,
        UpstreamMediaKind::Microphone => &self.microphone_playout_offset_us,
    };
    slot.load(Ordering::Relaxed)
}
/// µs by which audio is intentionally scheduled later than video
/// (zero when audio is not behind video).
fn positive_audio_delay_allowance_us(&self) -> u64 {
    let camera_offset_us = self.camera_playout_offset_us.load(Ordering::Relaxed);
    let microphone_offset_us = self.microphone_playout_offset_us.load(Ordering::Relaxed);
    let delay_us = microphone_offset_us.saturating_sub(camera_offset_us);
    delay_us.max(0) as u64
}
/// µs by which video is intentionally scheduled later than audio
/// (zero when video is not behind audio).
fn audio_ahead_video_allowance_us(&self) -> u64 {
    let camera_offset_us = self.camera_playout_offset_us.load(Ordering::Relaxed);
    let microphone_offset_us = self.microphone_playout_offset_us.load(Ordering::Relaxed);
    let lead_us = camera_offset_us.saturating_sub(microphone_offset_us);
    lead_us.max(0) as u64
}
/// Extra future-wait budget (µs) this kind earns because the *other* kind is
/// deliberately delayed relative to it.
fn intentional_future_wait_allowance_us(&self, kind: UpstreamMediaKind) -> u64 {
    if kind == UpstreamMediaKind::Camera {
        self.audio_ahead_video_allowance_us()
    } else {
        self.positive_audio_delay_allowance_us()
    }
}
/// Bundled-mode playout offset for `kind`: a config override when present,
/// otherwise the live per-kind offset.
fn raw_bundled_playout_offset_us(&self, kind: UpstreamMediaKind) -> i64 {
    match upstream_bundled_playout_offset_override_us(kind) {
        Some(override_us) => override_us,
        None => self.playout_offset_us(kind),
    }
}
/// Bundled playout offsets `(camera_us, microphone_us)`, clamped so the gap
/// between them never exceeds the remaining live-lag budget.
fn bundled_playout_offsets_us(&self) -> (i64, i64) {
let mut camera_offset_us = self.raw_bundled_playout_offset_us(UpstreamMediaKind::Camera);
let mut microphone_offset_us =
self.raw_bundled_playout_offset_us(UpstreamMediaKind::Microphone);
// Budget left after the bundled delay is reserved; the u128 value is clamped
// to i64::MAX before the narrowing cast so the `as` cannot wrap.
let max_span_us = upstream_max_live_lag()
.saturating_sub(upstream_bundled_playout_delay())
.as_micros()
.min(i64::MAX as u128) as i64;
// Positive span: camera plays later than microphone; negative: the reverse.
let span_us = camera_offset_us.saturating_sub(microphone_offset_us);
if span_us > max_span_us {
// Camera is too far behind microphone — pull it back within budget.
camera_offset_us = microphone_offset_us.saturating_add(max_span_us);
} else if span_us < -max_span_us {
// Microphone is too far behind camera — pull it back within budget.
microphone_offset_us = camera_offset_us.saturating_add(max_span_us);
}
(camera_offset_us, microphone_offset_us)
}
/// Clamped bundled playout offset for one media kind.
fn bundled_playout_offset_us(&self, kind: UpstreamMediaKind) -> i64 {
    let (camera_us, microphone_us) = self.bundled_playout_offsets_us();
    if kind == UpstreamMediaKind::Camera {
        camera_us
    } else {
        microphone_us
    }
}
/// µs this kind must reserve so it never overtakes the slower (more-delayed)
/// bundled sibling stream.
fn bundled_later_offset_reserve_us(&self, kind: UpstreamMediaKind) -> u64 {
    let (camera_us, microphone_us) = self.bundled_playout_offsets_us();
    let own_us = if kind == UpstreamMediaKind::Camera {
        camera_us
    } else {
        microphone_us
    };
    let slowest_us = camera_us.max(microphone_us);
    slowest_us.saturating_sub(own_us).max(0) as u64
}
/// Mark one audio chunk as actually handed to the UAC sink.
pub fn mark_audio_presented(&self, local_pts_us: u64, due_at: Instant) {
    let mut state = self
        .state
        .lock()
        .expect("upstream media state mutex poisoned");
    // Record the handoff sample before advancing the audio playhead.
    record_presentation_sample(&mut state, UpstreamMediaKind::Microphone, due_at);
    state.last_audio_presented_pts_us = Some(local_pts_us);
    // A Failed session stays Failed; anything else is now demonstrably Live.
    if !matches!(state.phase, UpstreamSyncPhase::Failed) {
        state.phase = UpstreamSyncPhase::Live;
        state.last_reason = "audio-master playhead flowing".to_string();
    }
    // Wake video followers blocked on audio progress.
    self.audio_progress_notify.notify_waiters();
}
/// Record client-side timing facts for one packet as it arrives at the server.
pub fn record_client_timing(&self, kind: UpstreamMediaKind, timing: UpstreamClientTiming) {
let mut state = self
.state
.lock()
.expect("upstream media state mutex poisoned");
// Stamp server receive time now, while still close to the wire.
let sample = state::UpstreamTimingSample {
capture_pts_us: timing.capture_pts_us,
send_pts_us: timing.send_pts_us,
queue_age_ms: timing.queue_age_ms,
received_at: Instant::now(),
};
// Keep both the latest sample and a bounded history per kind; the history
// feeds cross-stream pairing in `record_client_timing_windows`.
match kind {
UpstreamMediaKind::Camera => {
state.latest_camera_timing = Some(sample);
push_timing_sample(&mut state.recent_camera_timing, sample);
state
.camera_client_queue_age_window_ms
.push(f64::from(timing.queue_age_ms));
}
UpstreamMediaKind::Microphone => {
state.latest_microphone_timing = Some(sample);
push_timing_sample(&mut state.recent_microphone_timing, sample);
state
.microphone_client_queue_age_window_ms
.push(f64::from(timing.queue_age_ms));
}
}
// Try to pair this sample with the other stream's nearest-by-send sample.
record_client_timing_windows(&mut state, kind, sample);
}
/// Mark one video frame as actually handed to the UVC/HDMI sink.
pub fn mark_video_presented(&self, local_pts_us: u64, due_at: Instant) {
    let mut state = self
        .state
        .lock()
        .expect("upstream media state mutex poisoned");
    // Record the handoff sample before advancing the video playhead.
    record_presentation_sample(&mut state, UpstreamMediaKind::Camera, due_at);
    state.last_video_presented_pts_us = Some(local_pts_us);
    // A Failed session stays Failed; anything else is now demonstrably Live.
    if !matches!(state.phase, UpstreamSyncPhase::Failed) {
        state.phase = UpstreamSyncPhase::Live;
        state.last_reason = "video follower emitted a synced frame".to_string();
    }
}
/// Record that video intentionally froze instead of showing an out-of-sync frame.
pub fn record_video_freeze(&self, reason: impl Into<String>) {
    let mut state = self
        .state
        .lock()
        .expect("upstream media state mutex poisoned");
    state.video_freezes = state.video_freezes.saturating_add(1);
    // Failed is terminal; any other phase downgrades to Healing.
    if !matches!(state.phase, UpstreamSyncPhase::Failed) {
        state.phase = UpstreamSyncPhase::Healing;
    }
    state.last_reason = reason.into();
}
/// Return current planner facts for diagnostics and probe artifacts.
#[must_use]
pub fn snapshot(&self) -> UpstreamPlannerSnapshot {
let state = self
.state
.lock()
.expect("upstream media state mutex poisoned");
let live_lag_ms = live_lag_us(&state).map(us_to_ms);
// Skew between the two presented playheads; positive means audio is ahead.
let planner_skew_ms = match (
state.last_audio_presented_pts_us,
state.last_video_presented_pts_us,
) {
(Some(audio), Some(video)) => Some((audio as i128 - video as i128) as f64 / 1000.0),
_ => None,
};
let now = Instant::now();
let client_capture_skew_ms = state.latest_paired_client_capture_skew_ms;
let client_send_skew_ms = state.latest_paired_client_send_skew_ms;
let server_receive_skew_ms = state.latest_paired_server_receive_skew_ms;
UpstreamPlannerSnapshot {
session_id: state.session_id,
phase: state.phase.as_str(),
latest_camera_remote_pts_us: state.latest_camera_remote_pts_us,
latest_microphone_remote_pts_us: state.latest_microphone_remote_pts_us,
last_video_presented_pts_us: state.last_video_presented_pts_us,
last_audio_presented_pts_us: state.last_audio_presented_pts_us,
live_lag_ms,
planner_skew_ms,
stale_audio_drops: state.stale_audio_drops,
stale_video_drops: state.stale_video_drops,
skew_video_drops: state.skew_video_drops,
freshness_reanchors: state.freshness_reanchors,
startup_timeouts: state.startup_timeouts,
video_freezes: state.video_freezes,
last_reason: state.last_reason.clone(),
client_capture_skew_ms,
client_send_skew_ms,
server_receive_skew_ms,
camera_client_queue_age_ms: state
.latest_camera_timing
.map(|sample| f64::from(sample.queue_age_ms)),
microphone_client_queue_age_ms: state
.latest_microphone_timing
.map(|sample| f64::from(sample.queue_age_ms)),
// Age of the most recent packet per kind, measured from server receipt.
camera_server_receive_age_ms: state.latest_camera_timing.map(|sample| {
now.saturating_duration_since(sample.received_at)
.as_secs_f64()
* 1000.0
}),
microphone_server_receive_age_ms: state.latest_microphone_timing.map(|sample| {
now.saturating_duration_since(sample.received_at)
.as_secs_f64()
* 1000.0
}),
// p95 aggregates over the bounded skew/queue-age windows.
client_capture_abs_skew_p95_ms: state.client_capture_skew_window_ms.p95_abs(),
client_send_abs_skew_p95_ms: state.client_send_skew_window_ms.p95_abs(),
server_receive_abs_skew_p95_ms: state.server_receive_skew_window_ms.p95_abs(),
camera_client_queue_age_p95_ms: state.camera_client_queue_age_window_ms.p95(),
microphone_client_queue_age_p95_ms: state.microphone_client_queue_age_window_ms.p95(),
sink_handoff_skew_ms: latest_sink_handoff_skew_ms(&state),
sink_handoff_abs_skew_p95_ms: state.sink_handoff_skew_window_ms.p95_abs(),
camera_sink_late_ms: state.latest_camera_presentation.map(presentation_late_ms),
microphone_sink_late_ms: state
.latest_microphone_presentation
.map(presentation_late_ms),
camera_sink_late_p95_ms: state.camera_sink_late_window_ms.p95(),
microphone_sink_late_p95_ms: state.microphone_sink_late_window_ms.p95(),
client_timing_window_samples: state.client_capture_skew_window_ms.len() as u64,
sink_handoff_window_samples: state.sink_handoff_skew_window_ms.len() as u64,
}
}
}
// Lease lifecycle methods (stream ownership open/close) are textually included
// from a sibling file so this file stays focused on clock planning.
include!("upstream_media_runtime/lease_lifecycle.rs");
impl UpstreamMediaRuntime {
/// Rebase one upstream video packet timestamp onto the shared session clock.
///
/// Returns `None` for any non-`Play` planning decision.
#[must_use]
pub fn map_video_pts(&self, remote_pts_us: u64, frame_step_us: u64) -> Option<u64> {
    let decision = self.plan_video_pts(remote_pts_us, frame_step_us.max(1));
    if let UpstreamPlanDecision::Play(plan) = decision {
        Some(plan.local_pts_us)
    } else {
        None
    }
}
/// Rebase one upstream audio packet timestamp onto the shared session clock.
///
/// Returns `None` for any non-`Play` planning decision.
#[must_use]
pub fn map_audio_pts(&self, remote_pts_us: u64) -> Option<u64> {
    let decision = self.plan_audio_pts(remote_pts_us);
    if let UpstreamPlanDecision::Play(plan) = decision {
        Some(plan.local_pts_us)
    } else {
        None
    }
}
/// Rebase and schedule one upstream video packet on the shared playout epoch.
#[must_use]
pub fn plan_video_pts(&self, remote_pts_us: u64, frame_step_us: u64) -> UpstreamPlanDecision {
    // Enforce a minimum 1 µs step so local PTS stays strictly monotonic.
    let step_us = frame_step_us.max(1);
    self.plan_pts(UpstreamMediaKind::Camera, remote_pts_us, step_us)
}
/// Rebase and schedule one upstream audio packet on the shared playout epoch.
#[must_use]
pub fn plan_audio_pts(&self, remote_pts_us: u64) -> UpstreamPlanDecision {
    // Audio uses the 1 µs monotonicity floor as its minimum step.
    self.plan_pts(UpstreamMediaKind::Microphone, remote_pts_us, 1)
}
/// Schedule a packet from the bundled webcam/microphone transport.
///
/// Inputs: the media kind, client capture timestamp, packet cadence floor,
/// and the client-owned bundle epoch chosen for this gRPC stream.
/// Outputs: the server playout decision for that packet.
/// Why: bundled webcam media has already been synchronized on the client,
/// so the server should not re-solve cross-stream startup pairing. It only
/// rebases the shared client clock onto a fresh local playout epoch.
#[must_use]
pub fn plan_bundled_pts(
&self,
kind: UpstreamMediaKind,
remote_pts_us: u64,
min_step_us: u64,
bundle_base_remote_pts_us: u64,
bundle_epoch: Instant,
) -> UpstreamPlanDecision {
let mut state = self
.state
.lock()
.expect("upstream media state mutex poisoned");
let session_id = state.session_id;
// Per-kind bookkeeping: packet counts, first-seen remote PTS, and (for
// camera) immediate startup readiness — bundled mode skips warm-up.
match kind {
UpstreamMediaKind::Camera => {
state.camera_packet_count = state.camera_packet_count.saturating_add(1);
state
.first_camera_remote_pts_us
.get_or_insert(remote_pts_us);
state.camera_startup_ready = true;
}
UpstreamMediaKind::Microphone => {
state.microphone_packet_count = state.microphone_packet_count.saturating_add(1);
state
.first_microphone_remote_pts_us
.get_or_insert(remote_pts_us);
}
}
update_latest_remote_pts(&mut state, kind, remote_pts_us);
// First bundled packet of the session: adopt the client's base PTS and
// epoch as the shared timeline; no server-side pairing solve is needed.
if state.session_base_remote_pts_us.is_none() {
state.session_base_remote_pts_us = Some(bundle_base_remote_pts_us);
state.playout_epoch = Some(bundle_epoch);
state.pairing_anchor_deadline = Some(bundle_epoch);
state.phase = UpstreamSyncPhase::Syncing;
state.last_reason = "client-bundled upstream media epoch established".to_string();
self.pairing_state_notify.notify_waiters();
info!(
session_id,
bundle_base_remote_pts_us, "client-bundled upstream media epoch established"
);
}
let session_base_remote_pts_us = state
.session_base_remote_pts_us
.unwrap_or(bundle_base_remote_pts_us);
// Packets captured before the shared base cannot be placed on the timeline.
if remote_pts_us < session_base_remote_pts_us {
return UpstreamPlanDecision::DropBeforeOverlap;
}
let max_live_lag = upstream_max_live_lag();
// How far this packet trails the newest remote PTS seen for its kind.
let source_lag = source_lag_for_kind(&state, kind, remote_pts_us);
if source_lag > max_live_lag {
// Already too stale on arrival: drop rather than play out-of-date media.
match kind {
UpstreamMediaKind::Camera => {
state.stale_video_drops = state.stale_video_drops.saturating_add(1);
state.video_freezes = state.video_freezes.saturating_add(1);
state.last_reason =
"dropped stale bundled video beyond max live lag".to_string();
}
UpstreamMediaKind::Microphone => {
state.stale_audio_drops = state.stale_audio_drops.saturating_add(1);
state.last_reason =
"dropped stale bundled audio beyond max live lag".to_string();
}
}
state.phase = UpstreamSyncPhase::Healing;
return UpstreamPlanDecision::DropStale("bundled packet exceeded max live lag");
}
let mut local_pts_us = remote_pts_us.saturating_sub(session_base_remote_pts_us);
// Enforce strictly increasing local PTS per kind by nudging duplicates
// forward by at least the packet cadence floor.
let last_slot = match kind {
UpstreamMediaKind::Camera => &mut state.last_video_local_pts_us,
UpstreamMediaKind::Microphone => &mut state.last_audio_local_pts_us,
};
if let Some(last_pts_us) = *last_slot
&& local_pts_us <= last_pts_us
{
local_pts_us = last_pts_us.saturating_add(min_step_us.max(1));
}
*last_slot = Some(local_pts_us);
let sink_offset_us = self.bundled_playout_offset_us(kind);
let epoch = state.playout_epoch.unwrap_or(bundle_epoch);
let mut due_at =
apply_playout_offset(epoch + Duration::from_micros(local_pts_us), sink_offset_us);
let now = Instant::now();
let mut late_by = now.checked_duration_since(due_at).unwrap_or_default();
let playout_delay = upstream_bundled_playout_delay().min(max_live_lag);
let reanchor_threshold = upstream_reanchor_late_threshold(playout_delay);
// Budget for scheduling into the future, minus the reserve that keeps this
// kind from overtaking the more-delayed sibling stream.
let max_future_wait = max_live_lag.saturating_sub(source_lag);
let later_offset_reserve =
Duration::from_micros(self.bundled_later_offset_reserve_us(kind));
let max_kind_future_wait = max_future_wait.saturating_sub(later_offset_reserve);
let due_future_wait = due_at.saturating_duration_since(now);
// Too late or too far in the future: rebuild the epoch around "now" so the
// playhead regains freshness instead of chasing an unreachable schedule.
if late_by > reanchor_threshold || due_future_wait > max_kind_future_wait {
let desired_delay = playout_delay.min(max_kind_future_wait);
let desired_due_at = now + desired_delay;
// Undo the sink offset so the recovered epoch is offset-neutral.
let unoffset_due_at = apply_playout_offset(desired_due_at, -sink_offset_us);
let recovered_epoch = unoffset_due_at
.checked_sub(Duration::from_micros(local_pts_us))
.unwrap_or(unoffset_due_at);
state.playout_epoch = Some(recovered_epoch);
state.pairing_anchor_deadline = Some(desired_due_at);
state.freshness_reanchors = state.freshness_reanchors.saturating_add(1);
state.phase = UpstreamSyncPhase::Healing;
state.last_reason =
"reanchored bundled upstream playhead to preserve freshness".to_string();
due_at = apply_playout_offset(
recovered_epoch + Duration::from_micros(local_pts_us),
sink_offset_us,
);
late_by = now.checked_duration_since(due_at).unwrap_or_default();
info!(
session_id,
?kind,
local_pts_us,
remote_pts_us,
recovery_buffer_ms = desired_delay.as_millis(),
max_live_lag_ms = max_live_lag.as_millis(),
source_lag_ms = source_lag.as_millis(),
later_offset_reserve_ms = later_offset_reserve.as_millis(),
"bundled upstream media playhead reanchored to preserve freshness"
);
}
// Final guard: even after reanchoring, refuse packets whose total lag at
// the scheduled playout instant would exceed the live-lag budget.
let predicted_lag_at_playout =
source_lag.saturating_add(due_at.saturating_duration_since(now));
if predicted_lag_at_playout > max_live_lag {
match kind {
UpstreamMediaKind::Camera => {
state.stale_video_drops = state.stale_video_drops.saturating_add(1);
state.video_freezes = state.video_freezes.saturating_add(1);
state.last_reason =
"dropped bundled video that would exceed max live lag at playout"
.to_string();
}
UpstreamMediaKind::Microphone => {
state.stale_audio_drops = state.stale_audio_drops.saturating_add(1);
state.last_reason =
"dropped bundled audio that would exceed max live lag at playout"
.to_string();
}
}
state.phase = UpstreamSyncPhase::Healing;
return UpstreamPlanDecision::DropStale(
"bundled packet would exceed max live lag at playout",
);
}
// Audio progress may unblock video followers waiting on the master clock.
if kind == UpstreamMediaKind::Microphone {
self.audio_progress_notify.notify_waiters();
}
UpstreamPlanDecision::Play(PlannedUpstreamPacket {
local_pts_us,
due_at,
late_by,
source_lag,
})
}
/// Hold video until the audio master has at least reached the same capture
/// moment, or until the bounded sync grace is exhausted.
///
/// Returns `true` when video may proceed (audio caught up, or no microphone
/// stream is active), `false` when the grace deadline expired first.
pub async fn wait_for_audio_master(&self, video_local_pts_us: u64, due_at: Instant) -> bool {
let slack_us = upstream_pairing_master_slack()
.as_micros()
.min(u64::MAX as u128) as u64;
let audio_delay_allowance_us = self.positive_audio_delay_allowance_us();
let deadline = due_at + upstream_audio_master_wait_grace();
loop {
// Create the Notified future *before* checking state so a notify that
// fires between the check and the await is not lost.
let notified = self.audio_progress_notify.notified();
{
// Inner block scope: the mutex guard is dropped before any await.
let state = self
.state
.lock()
.expect("upstream media state mutex poisoned");
// No active microphone: nothing to synchronize against.
if state.active_microphone_generation.is_none() {
return true;
}
let audio_presented_pts_us = state.last_audio_presented_pts_us.unwrap_or(0);
// Audio (plus slack and any intentional audio delay) has reached
// this frame's capture moment — video may play.
if audio_presented_pts_us
.saturating_add(slack_us)
.saturating_add(audio_delay_allowance_us)
>= video_local_pts_us
{
return true;
}
}
if Instant::now() >= deadline {
return false;
}
// Wait for the next audio progress notification, bounded by the grace
// deadline so a stalled audio stream cannot freeze video forever.
tokio::select! {
_ = notified => {}
_ = tokio::time::sleep_until(deadline) => return false,
}
}
}
/// Core planner for independently-streamed (non-bundled) upstream media.
///
/// Establishes the paired startup anchor, rebases `remote_pts_us` onto the
/// local session clock, enforces freshness/skew limits, and schedules the
/// packet on the shared playout epoch.
fn plan_pts(
&self,
kind: UpstreamMediaKind,
remote_pts_us: u64,
min_step_us: u64,
) -> UpstreamPlanDecision {
let mut state = self
.state
.lock()
.expect("upstream media state mutex poisoned");
let session_id = state.session_id;
// Count packets per kind; the count also throttles trace logging below.
let packet_count = match kind {
UpstreamMediaKind::Camera => {
state.camera_packet_count = state.camera_packet_count.saturating_add(1);
state.camera_packet_count
}
UpstreamMediaKind::Microphone => {
state.microphone_packet_count = state.microphone_packet_count.saturating_add(1);
state.microphone_packet_count
}
};
update_latest_remote_pts(&mut state, kind, remote_pts_us);
let mut first_remote_for_kind = match kind {
UpstreamMediaKind::Camera => {
let first_slot = &mut state.first_camera_remote_pts_us;
*first_slot.get_or_insert(remote_pts_us)
}
UpstreamMediaKind::Microphone => {
let first_slot = &mut state.first_microphone_remote_pts_us;
*first_slot.get_or_insert(remote_pts_us)
}
};
// Camera warm-up: ignore the first `startup_grace_us` of camera capture
// before declaring the camera ready, then re-base its first PTS.
if kind == UpstreamMediaKind::Camera {
let startup_grace_us = upstream_camera_startup_grace_us();
if !state.camera_startup_ready
&& (startup_grace_us == 0
|| remote_pts_us.saturating_sub(first_remote_for_kind) >= startup_grace_us)
{
state.camera_startup_ready = true;
state.first_camera_remote_pts_us = Some(remote_pts_us);
first_remote_for_kind = remote_pts_us;
}
}
let now = Instant::now();
let pairing_deadline = *state
.pairing_anchor_deadline
.get_or_insert_with(|| now + upstream_playout_delay());
let playout_delay = upstream_playout_delay();
let max_live_lag = upstream_max_live_lag();
// No shared base yet: run the startup state machine.
if state.session_base_remote_pts_us.is_none() {
// Hard failure if startup never converged within the timeout.
if state.session_started_at.is_some_and(|started_at| {
now.saturating_duration_since(started_at) > upstream_startup_timeout()
}) {
state.phase = UpstreamSyncPhase::Failed;
state.startup_timeouts = state.startup_timeouts.saturating_add(1);
state.last_reason =
"paired upstream startup did not converge before timeout".to_string();
return UpstreamPlanDecision::StartupFailed(
"paired upstream startup did not converge before timeout",
);
}
// Both streams seen and camera warmed up: anchor the overlap base at
// the later of the two first PTS values so both have real media there.
if state.first_camera_remote_pts_us.is_some()
&& state.first_microphone_remote_pts_us.is_some()
&& state.camera_startup_ready
{
let first_camera_remote_pts_us =
state.first_camera_remote_pts_us.unwrap_or_default();
let first_microphone_remote_pts_us =
state.first_microphone_remote_pts_us.unwrap_or_default();
state.session_base_remote_pts_us =
Some(first_camera_remote_pts_us.max(first_microphone_remote_pts_us));
let overlap_epoch = now + playout_delay;
state.playout_epoch = Some(overlap_epoch);
state.pairing_anchor_deadline = Some(overlap_epoch);
state.phase = UpstreamSyncPhase::Syncing;
state.last_reason = "fresh audio/video overlap anchor established".to_string();
if !state.startup_anchor_logged {
let startup_delta_us =
first_camera_remote_pts_us as i128 - first_microphone_remote_pts_us as i128;
info!(
session_id,
first_camera_remote_pts_us,
first_microphone_remote_pts_us,
overlap_base_remote_pts_us =
state.session_base_remote_pts_us.unwrap_or_default(),
startup_delta_us,
"upstream media overlap anchors established"
);
state.startup_anchor_logged = true;
}
self.pairing_state_notify.notify_waiters();
} else if now < pairing_deadline {
// Still inside the pairing window: buffer until the counterpart shows up.
state.phase = UpstreamSyncPhase::Acquiring;
state.last_reason = "awaiting both upstream media streams".to_string();
if upstream_timing_trace_enabled()
&& (packet_count <= 10 || packet_count.is_multiple_of(300))
{
info!(
session_id,
?kind,
packet_count,
remote_pts_us,
wait_ms = pairing_deadline.saturating_duration_since(now).as_millis(),
"upstream media packet buffered while awaiting the counterpart stream"
);
}
return UpstreamPlanDecision::AwaitingPair;
} else if state.first_camera_remote_pts_us.is_some() && !state.camera_startup_ready {
// Camera exists but is still warming up: keep holding both streams.
state.phase = UpstreamSyncPhase::Syncing;
state.last_reason = "camera startup warm-up is still in progress".to_string();
if upstream_timing_trace_enabled()
&& (packet_count <= 10 || packet_count.is_multiple_of(300))
{
info!(
session_id,
?kind,
packet_count,
remote_pts_us,
"upstream media packet buffered while camera startup warm-up is still in progress"
);
}
return UpstreamPlanDecision::AwaitingPair;
} else if upstream_require_paired_startup() {
// Strict pairing: slide the anchor forward and keep holding the
// one-sided stream rather than start unsynced playout.
let refreshed = refresh_unpaired_pairing_anchor(
&mut state,
kind,
remote_pts_us,
now + playout_delay,
);
if refreshed || upstream_timing_trace_enabled() {
info!(
session_id,
?kind,
packet_count,
remote_pts_us,
refreshed_anchor = refreshed,
healing_window_ms = playout_delay.as_millis(),
"upstream media pairing window expired; holding one-sided stream for synced startup"
);
}
state.phase = UpstreamSyncPhase::Syncing;
state.last_reason = "holding one-sided stream for synced startup".to_string();
return UpstreamPlanDecision::AwaitingPair;
} else {
// Lenient pairing: give up on the counterpart and play one-sided.
let single_stream_base_remote_pts_us = match kind {
UpstreamMediaKind::Camera => {
state.first_camera_remote_pts_us.unwrap_or(remote_pts_us)
}
UpstreamMediaKind::Microphone => state
.first_microphone_remote_pts_us
.unwrap_or(remote_pts_us),
};
state.session_base_remote_pts_us = Some(single_stream_base_remote_pts_us);
let one_sided_epoch = now + playout_delay;
state.playout_epoch = Some(one_sided_epoch);
state.pairing_anchor_deadline = Some(one_sided_epoch);
info!(
session_id,
?kind,
single_stream_base_remote_pts_us,
"upstream media pairing window expired; continuing with one-sided playout"
);
self.pairing_state_notify.notify_waiters();
}
}
let session_base_remote_pts_us = state.session_base_remote_pts_us.unwrap_or(remote_pts_us);
// Packets captured before the shared overlap base cannot be placed.
if remote_pts_us < session_base_remote_pts_us {
if upstream_timing_trace_enabled()
&& (packet_count <= 10 || packet_count.is_multiple_of(300))
{
info!(
session_id,
?kind,
packet_count,
remote_pts_us,
session_base_remote_pts_us,
"upstream media packet dropped before the shared overlap base"
);
}
return UpstreamPlanDecision::DropBeforeOverlap;
}
// Stale-on-arrival guard: how far this packet trails the newest seen PTS.
let source_lag = source_lag_for_kind(&state, kind, remote_pts_us);
if source_lag > max_live_lag {
match kind {
UpstreamMediaKind::Camera => {
state.stale_video_drops = state.stale_video_drops.saturating_add(1);
state.video_freezes = state.video_freezes.saturating_add(1);
state.last_reason = "dropped stale video beyond max live lag".to_string();
}
UpstreamMediaKind::Microphone => {
state.stale_audio_drops = state.stale_audio_drops.saturating_add(1);
state.last_reason = "dropped stale audio beyond max live lag".to_string();
}
}
state.phase = UpstreamSyncPhase::Healing;
return UpstreamPlanDecision::DropStale("packet exceeded max live lag");
}
let mut local_pts_us = remote_pts_us.saturating_sub(session_base_remote_pts_us);
// Enforce strictly increasing local PTS per kind.
let last_slot = match kind {
UpstreamMediaKind::Camera => &mut state.last_video_local_pts_us,
UpstreamMediaKind::Microphone => &mut state.last_audio_local_pts_us,
};
if let Some(last_pts_us) = *last_slot
&& local_pts_us <= last_pts_us
{
local_pts_us = last_pts_us.saturating_add(min_step_us.max(1));
}
*last_slot = Some(local_pts_us);
// Audio is the master clock: drop video frames that trail it beyond slack.
let audio_ahead_video_allowance_us = self.audio_ahead_video_allowance_us();
if kind == UpstreamMediaKind::Camera
&& state
.last_audio_presented_pts_us
.is_some_and(|audio_pts_us| {
video_is_too_far_behind_audio(
local_pts_us,
audio_pts_us,
audio_ahead_video_allowance_us,
)
})
{
state.skew_video_drops = state.skew_video_drops.saturating_add(1);
state.video_freezes = state.video_freezes.saturating_add(1);
state.phase = UpstreamSyncPhase::Healing;
state.last_reason =
"dropped video frame that was too far behind the audio master".to_string();
return UpstreamPlanDecision::DropStale("video frame was too far behind audio master");
}
let epoch = *state.playout_epoch.get_or_insert(pairing_deadline);
let sink_offset_us = self.playout_offset_us(kind);
let playout_delay = upstream_playout_delay().min(max_live_lag);
let mut due_at =
apply_playout_offset(epoch + Duration::from_micros(local_pts_us), sink_offset_us);
let mut late_by = now.checked_duration_since(due_at).unwrap_or_default();
let reanchor_threshold = upstream_reanchor_late_threshold(playout_delay);
// Intentional cross-kind delay earns extra future-wait budget for this kind.
let intentional_future_wait_allowance =
Duration::from_micros(self.intentional_future_wait_allowance_us(kind));
let max_future_wait = max_live_lag
.saturating_sub(source_lag)
.saturating_add(intentional_future_wait_allowance);
let due_future_wait = due_at.saturating_duration_since(now);
// Too late or too far ahead: rebuild the epoch around "now" to regain freshness.
if late_by > reanchor_threshold || due_future_wait > max_future_wait {
let old_late_by = late_by;
let old_future_wait = due_future_wait;
let desired_delay = playout_delay.min(max_future_wait);
let desired_due_at = now + desired_delay;
// Undo the sink offset so the recovered epoch is offset-neutral.
let unoffset_due_at = apply_playout_offset(desired_due_at, -sink_offset_us);
let recovered_epoch = unoffset_due_at
.checked_sub(Duration::from_micros(local_pts_us))
.unwrap_or(unoffset_due_at);
state.playout_epoch = Some(recovered_epoch);
state.pairing_anchor_deadline = Some(desired_due_at);
state.freshness_reanchors = state.freshness_reanchors.saturating_add(1);
state.phase = UpstreamSyncPhase::Healing;
state.last_reason = "reanchored upstream playhead to preserve freshness".to_string();
due_at = apply_playout_offset(
recovered_epoch + Duration::from_micros(local_pts_us),
sink_offset_us,
);
late_by = now.checked_duration_since(due_at).unwrap_or_default();
info!(
session_id,
?kind,
packet_count,
local_pts_us,
remote_pts_us,
old_late_by_ms = old_late_by.as_millis(),
old_future_wait_ms = old_future_wait.as_millis(),
recovery_buffer_ms = playout_delay.as_millis(),
reanchor_threshold_ms = reanchor_threshold.as_millis(),
max_live_lag_ms = max_live_lag.as_millis(),
source_lag_ms = source_lag.as_millis(),
"upstream media playhead reanchored to preserve freshness"
);
}
// Final guard: total lag at the scheduled playout instant must stay within
// the live-lag budget (plus any intentional cross-kind delay).
let predicted_lag_at_playout =
source_lag.saturating_add(due_at.saturating_duration_since(now));
if predicted_lag_at_playout > max_live_lag.saturating_add(intentional_future_wait_allowance)
{
match kind {
UpstreamMediaKind::Camera => {
state.stale_video_drops = state.stale_video_drops.saturating_add(1);
state.video_freezes = state.video_freezes.saturating_add(1);
state.last_reason =
"dropped video that would exceed max live lag at playout".to_string();
}
UpstreamMediaKind::Microphone => {
state.stale_audio_drops = state.stale_audio_drops.saturating_add(1);
state.last_reason =
"dropped audio that would exceed max live lag at playout".to_string();
}
}
state.phase = UpstreamSyncPhase::Healing;
return UpstreamPlanDecision::DropStale("packet would exceed max live lag at playout");
}
// Throttled timing trace for diagnostics.
if upstream_timing_trace_enabled()
&& (packet_count <= 10 || packet_count.is_multiple_of(300))
{
let playout_delay_us = due_at.saturating_duration_since(now).as_micros();
let late_by_us = late_by.as_micros();
info!(
session_id,
?kind,
packet_count,
remote_pts_us,
session_base_remote_pts_us,
first_remote_for_kind,
remote_elapsed_us = remote_pts_us.saturating_sub(session_base_remote_pts_us),
local_pts_us,
playout_delay_us,
sink_offset_us,
late_by_us,
source_lag_us = source_lag.as_micros(),
"upstream media rebase sample"
);
}
// Audio progress may unblock video followers waiting on the master clock.
if kind == UpstreamMediaKind::Microphone {
self.audio_progress_notify.notify_waiters();
}
UpstreamPlanDecision::Play(PlannedUpstreamPacket {
local_pts_us,
due_at,
late_by,
source_lag,
})
}
}
/// Track the highest remote PTS seen so far for one media kind.
fn update_latest_remote_pts(
    state: &mut UpstreamClockState,
    kind: UpstreamMediaKind,
    remote_pts_us: u64,
) {
    let slot = match kind {
        UpstreamMediaKind::Camera => &mut state.latest_camera_remote_pts_us,
        UpstreamMediaKind::Microphone => &mut state.latest_microphone_remote_pts_us,
    };
    // Never move backwards: keep the maximum of the stored and incoming PTS.
    let highest = slot.map_or(remote_pts_us, |seen| seen.max(remote_pts_us));
    *slot = Some(highest);
}
/// How far `remote_pts_us` trails the newest remote PTS seen for `kind`.
fn source_lag_for_kind(
    state: &UpstreamClockState,
    kind: UpstreamMediaKind,
    remote_pts_us: u64,
) -> Duration {
    let latest_seen = match kind {
        UpstreamMediaKind::Camera => state.latest_camera_remote_pts_us,
        UpstreamMediaKind::Microphone => state.latest_microphone_remote_pts_us,
    };
    // With nothing seen yet the packet is by definition current (zero lag).
    let latest_us = latest_seen.unwrap_or(remote_pts_us);
    Duration::from_micros(latest_us.saturating_sub(remote_pts_us))
}
/// True when a video frame trails the audio master by more than the pairing
/// slack plus any intentional audio-ahead allowance.
fn video_is_too_far_behind_audio(
    video_pts_us: u64,
    audio_pts_us: u64,
    audio_ahead_video_allowance_us: u64,
) -> bool {
    let master_slack_us =
        upstream_pairing_master_slack().as_micros().min(u64::MAX as u128) as u64;
    let total_slack_us = master_slack_us.saturating_add(audio_ahead_video_allowance_us);
    video_pts_us.saturating_add(total_slack_us) < audio_pts_us
}
/// Live lag in µs: how far the presented audio playhead trails the newest
/// audio capture, measured on the remote timeline. `None` until all three
/// inputs exist.
fn live_lag_us(state: &UpstreamClockState) -> Option<u64> {
    let base = state.session_base_remote_pts_us?;
    let playhead = state.last_audio_presented_pts_us?;
    let latest_audio = state.latest_microphone_remote_pts_us?;
    Some(latest_audio.saturating_sub(base.saturating_add(playhead)))
}
/// Convert a microsecond count to fractional milliseconds.
fn us_to_ms(value: u64) -> f64 {
    let us = value as f64;
    us / 1000.0
}
/// Signed microsecond delta `left - right` between two instants.
fn instant_delta_us(left: Instant, right: Instant) -> i128 {
    if left < right {
        // `left` is earlier: the delta is negative.
        -(right.saturating_duration_since(left).as_micros() as i128)
    } else {
        left.saturating_duration_since(right).as_micros() as i128
    }
}
/// Reject camera/microphone timing pairs whose client send stamps are more
/// than this far apart (µs).
const CLIENT_TIMING_PAIR_MAX_SEND_DELTA_US: u64 = 250_000;
/// Append `sample` to a bounded FIFO window, evicting the oldest entry when full.
fn push_timing_sample(
    samples: &mut std::collections::VecDeque<state::UpstreamTimingSample>,
    sample: state::UpstreamTimingSample,
) {
    let at_capacity = samples.len() >= state::TIMING_WINDOW_CAPACITY;
    if at_capacity {
        samples.pop_front();
    }
    samples.push_back(sample);
}
/// Absolute difference of two unsigned microsecond stamps.
fn abs_delta_us(left: u64, right: u64) -> u64 {
    // `u64::abs_diff` replaces the hand-rolled max-min dance and cannot underflow.
    left.abs_diff(right)
}
/// Pick the stored sample whose send PTS is closest to `send_pts_us`.
/// On ties the most recent (last) sample wins, matching `min_by_key`.
fn nearest_timing_sample_by_send(
    samples: &std::collections::VecDeque<state::UpstreamTimingSample>,
    send_pts_us: u64,
) -> Option<state::UpstreamTimingSample> {
    let mut best: Option<state::UpstreamTimingSample> = None;
    let mut best_delta = u64::MAX;
    for sample in samples.iter().copied() {
        let delta = abs_delta_us(sample.send_pts_us, send_pts_us);
        // `<=` keeps the last of equally-close samples, like `min_by_key`.
        if delta <= best_delta {
            best_delta = delta;
            best = Some(sample);
        }
    }
    best
}
/// Pair the newly arrived sample with the other stream's nearest-by-send
/// sample and, when the pair is close enough in send time, record the
/// capture/send/receive skews into the rolling windows.
fn record_client_timing_windows(
state: &mut UpstreamClockState,
kind: UpstreamMediaKind,
sample: state::UpstreamTimingSample,
) {
// Build a (camera, microphone) pair with the incoming sample on its side.
let paired = match kind {
UpstreamMediaKind::Camera => {
nearest_timing_sample_by_send(&state.recent_microphone_timing, sample.send_pts_us)
.map(|microphone| (sample, microphone))
}
UpstreamMediaKind::Microphone => {
nearest_timing_sample_by_send(&state.recent_camera_timing, sample.send_pts_us)
.map(|camera| (camera, sample))
}
};
let Some((camera, microphone)) = paired else {
return;
};
// Reject pairs whose send stamps are too far apart to be the same moment.
if abs_delta_us(camera.send_pts_us, microphone.send_pts_us)
> CLIENT_TIMING_PAIR_MAX_SEND_DELTA_US
{
return;
}
// Positive skew means the camera-side value is larger (later).
let client_capture_skew_ms =
(camera.capture_pts_us as i128 - microphone.capture_pts_us as i128) as f64 / 1000.0;
let client_send_skew_ms =
(camera.send_pts_us as i128 - microphone.send_pts_us as i128) as f64 / 1000.0;
let server_receive_skew_ms =
instant_delta_us(camera.received_at, microphone.received_at) as f64 / 1000.0;
state.latest_paired_client_capture_skew_ms = Some(client_capture_skew_ms);
state.latest_paired_client_send_skew_ms = Some(client_send_skew_ms);
state.latest_paired_server_receive_skew_ms = Some(server_receive_skew_ms);
state
.client_capture_skew_window_ms
.push(client_capture_skew_ms);
state.client_send_skew_window_ms.push(client_send_skew_ms);
state
.server_receive_skew_window_ms
.push(server_receive_skew_ms);
}
/// Record one sink handoff (due vs. actual) for `kind` and update the
/// lateness and cross-sink skew windows.
fn record_presentation_sample(
state: &mut UpstreamClockState,
kind: UpstreamMediaKind,
due_at: Instant,
) {
let sample = state::UpstreamPresentationSample {
due_at,
handed_at: Instant::now(),
};
// Early handoffs count as zero late, not negative.
let late_ms = presentation_late_ms(sample).max(0.0);
match kind {
UpstreamMediaKind::Camera => {
state.latest_camera_presentation = Some(sample);
state.camera_sink_late_window_ms.push(late_ms);
}
UpstreamMediaKind::Microphone => {
state.latest_microphone_presentation = Some(sample);
state.microphone_sink_late_window_ms.push(late_ms);
}
}
// The latest-presentation slot must be updated first: the skew reader
// below compares the two stored `latest_*_presentation` samples.
if let Some(skew_ms) = latest_sink_handoff_skew_ms(state) {
state.sink_handoff_skew_window_ms.push(skew_ms);
}
}
/// Handoff skew (camera minus microphone, ms) between the latest sink
/// presentations, or `None` when either is missing or their due times are
/// more than 250 ms apart (not the same media moment).
fn latest_sink_handoff_skew_ms(state: &UpstreamClockState) -> Option<f64> {
    let camera = state.latest_camera_presentation?;
    let microphone = state.latest_microphone_presentation?;
    let due_gap_ms = instant_delta_us(camera.due_at, microphone.due_at).abs() as f64 / 1000.0;
    if due_gap_ms > 250.0 {
        None
    } else {
        Some(instant_delta_us(camera.handed_at, microphone.handed_at) as f64 / 1000.0)
    }
}
/// Signed lateness (ms) of a sink handoff; negative when handed early.
fn presentation_late_ms(sample: state::UpstreamPresentationSample) -> f64 {
    let delta_us = instant_delta_us(sample.handed_at, sample.due_at);
    delta_us as f64 / 1000.0
}
/// Slide the pairing deadline forward and, when the counterpart stream has
/// not appeared yet, refresh this kind's first-seen PTS so a later pairing
/// anchors on fresh media. Returns `true` when the first PTS was refreshed.
fn refresh_unpaired_pairing_anchor(
    state: &mut UpstreamClockState,
    kind: UpstreamMediaKind,
    remote_pts_us: u64,
    next_deadline: Instant,
) -> bool {
    state.pairing_anchor_deadline = Some(next_deadline);
    if kind == UpstreamMediaKind::Camera {
        if state.first_microphone_remote_pts_us.is_none() {
            state.first_camera_remote_pts_us = Some(remote_pts_us);
            return true;
        }
    } else if state.first_camera_remote_pts_us.is_none() {
        state.first_microphone_remote_pts_us = Some(remote_pts_us);
        return true;
    }
    false
}
impl Default for UpstreamMediaRuntime {
/// Delegate to [`UpstreamMediaRuntime::new`] so `Default` and `new` agree.
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests;