752 lines
26 KiB
Rust
752 lines
26 KiB
Rust
#![forbid(unsafe_code)]
|
|
|
|
use std::collections::VecDeque;
|
|
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
|
|
use std::sync::{Arc, Mutex};
|
|
use std::time::Duration;
|
|
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
|
|
use tokio::time::Instant;
|
|
|
|
const TIMING_WINDOW_CAPACITY: usize = 240;
|
|
const FACTORY_MJPEG_AUDIO_OFFSET_US: i64 = 0;
|
|
const FACTORY_MJPEG_VIDEO_OFFSET_US: i64 = 1_090_000;
|
|
|
|
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
|
pub enum UpstreamMediaKind {
|
|
Camera,
|
|
Microphone,
|
|
}
|
|
|
|
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
|
|
pub struct UpstreamClientTiming {
|
|
pub capture_pts_us: u64,
|
|
pub send_pts_us: u64,
|
|
pub queue_depth: u32,
|
|
pub queue_age_ms: u32,
|
|
}
|
|
|
|
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
|
pub struct UpstreamStreamLease {
|
|
pub session_id: u64,
|
|
pub generation: u64,
|
|
}
|
|
|
|
#[derive(Clone, Copy, Debug)]
|
|
pub struct PlannedUpstreamPacket {
|
|
pub local_pts_us: u64,
|
|
pub due_at: Instant,
|
|
pub late_by: Duration,
|
|
pub source_lag: Duration,
|
|
}
|
|
|
|
#[derive(Clone, Copy, Debug)]
|
|
pub enum UpstreamPlanDecision {
|
|
AwaitingPair,
|
|
DropBeforeOverlap,
|
|
DropStale(&'static str),
|
|
StartupFailed(&'static str),
|
|
Play(PlannedUpstreamPacket),
|
|
}
|
|
|
|
#[derive(Clone, Debug)]
|
|
pub struct UpstreamPlannerSnapshot {
|
|
pub session_id: u64,
|
|
pub phase: &'static str,
|
|
pub latest_camera_remote_pts_us: Option<u64>,
|
|
pub latest_microphone_remote_pts_us: Option<u64>,
|
|
pub last_video_presented_pts_us: Option<u64>,
|
|
pub last_audio_presented_pts_us: Option<u64>,
|
|
pub live_lag_ms: Option<f64>,
|
|
pub planner_skew_ms: Option<f64>,
|
|
pub stale_audio_drops: u64,
|
|
pub stale_video_drops: u64,
|
|
pub skew_video_drops: u64,
|
|
pub freshness_reanchors: u64,
|
|
pub startup_timeouts: u64,
|
|
pub video_freezes: u64,
|
|
pub last_reason: String,
|
|
pub client_capture_skew_ms: Option<f64>,
|
|
pub client_send_skew_ms: Option<f64>,
|
|
pub server_receive_skew_ms: Option<f64>,
|
|
pub camera_client_queue_age_ms: Option<f64>,
|
|
pub microphone_client_queue_age_ms: Option<f64>,
|
|
pub camera_server_receive_age_ms: Option<f64>,
|
|
pub microphone_server_receive_age_ms: Option<f64>,
|
|
pub client_capture_abs_skew_p95_ms: Option<f64>,
|
|
pub client_send_abs_skew_p95_ms: Option<f64>,
|
|
pub server_receive_abs_skew_p95_ms: Option<f64>,
|
|
pub camera_client_queue_age_p95_ms: Option<f64>,
|
|
pub microphone_client_queue_age_p95_ms: Option<f64>,
|
|
pub sink_handoff_skew_ms: Option<f64>,
|
|
pub sink_handoff_abs_skew_p95_ms: Option<f64>,
|
|
pub camera_sink_late_ms: Option<f64>,
|
|
pub microphone_sink_late_ms: Option<f64>,
|
|
pub camera_sink_late_p95_ms: Option<f64>,
|
|
pub microphone_sink_late_p95_ms: Option<f64>,
|
|
pub client_timing_window_samples: u64,
|
|
pub sink_handoff_window_samples: u64,
|
|
}
|
|
|
|
#[derive(Clone, Copy, Debug)]
|
|
struct TimingSample {
|
|
capture_pts_us: u64,
|
|
send_pts_us: u64,
|
|
queue_age_ms: u32,
|
|
received_at: Instant,
|
|
}
|
|
|
|
#[derive(Clone, Copy, Debug)]
|
|
struct PresentationSample {
|
|
due_at: Instant,
|
|
handed_at: Instant,
|
|
}
|
|
|
|
#[derive(Debug, Default)]
|
|
struct ScalarWindow {
|
|
values: VecDeque<f64>,
|
|
}
|
|
|
|
impl ScalarWindow {
|
|
fn push(&mut self, value: f64) {
|
|
if self.values.len() >= TIMING_WINDOW_CAPACITY {
|
|
self.values.pop_front();
|
|
}
|
|
self.values.push_back(value);
|
|
}
|
|
|
|
fn p95(&self) -> Option<f64> {
|
|
percentile(self.values.iter().copied(), 0.95)
|
|
}
|
|
|
|
fn p95_abs(&self) -> Option<f64> {
|
|
percentile(self.values.iter().map(|value| value.abs()), 0.95)
|
|
}
|
|
|
|
fn len(&self) -> usize {
|
|
self.values.len()
|
|
}
|
|
}
|
|
|
|
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
|
enum UpstreamSyncPhase {
|
|
Acquiring,
|
|
Syncing,
|
|
Live,
|
|
Healing,
|
|
}
|
|
|
|
impl Default for UpstreamSyncPhase {
|
|
fn default() -> Self {
|
|
Self::Acquiring
|
|
}
|
|
}
|
|
|
|
impl UpstreamSyncPhase {
|
|
fn as_str(self) -> &'static str {
|
|
match self {
|
|
Self::Acquiring => "acquiring",
|
|
Self::Syncing => "syncing",
|
|
Self::Live => "live",
|
|
Self::Healing => "healing",
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Default)]
|
|
struct RuntimeState {
|
|
session_id: u64,
|
|
active_camera_generation: Option<u64>,
|
|
active_microphone_generation: Option<u64>,
|
|
phase: UpstreamSyncPhase,
|
|
session_started_at: Option<Instant>,
|
|
base_remote_pts_us: Option<u64>,
|
|
playout_epoch: Option<Instant>,
|
|
latest_camera_remote_pts_us: Option<u64>,
|
|
latest_microphone_remote_pts_us: Option<u64>,
|
|
last_video_local_pts_us: Option<u64>,
|
|
last_audio_local_pts_us: Option<u64>,
|
|
last_video_presented_pts_us: Option<u64>,
|
|
last_audio_presented_pts_us: Option<u64>,
|
|
latest_camera_timing: Option<TimingSample>,
|
|
latest_microphone_timing: Option<TimingSample>,
|
|
latest_camera_presentation: Option<PresentationSample>,
|
|
latest_microphone_presentation: Option<PresentationSample>,
|
|
latest_paired_client_capture_skew_ms: Option<f64>,
|
|
latest_paired_client_send_skew_ms: Option<f64>,
|
|
latest_paired_server_receive_skew_ms: Option<f64>,
|
|
client_capture_skew_window_ms: ScalarWindow,
|
|
client_send_skew_window_ms: ScalarWindow,
|
|
server_receive_skew_window_ms: ScalarWindow,
|
|
camera_client_queue_age_window_ms: ScalarWindow,
|
|
microphone_client_queue_age_window_ms: ScalarWindow,
|
|
sink_handoff_skew_window_ms: ScalarWindow,
|
|
camera_sink_late_window_ms: ScalarWindow,
|
|
microphone_sink_late_window_ms: ScalarWindow,
|
|
stale_audio_drops: u64,
|
|
stale_video_drops: u64,
|
|
skew_video_drops: u64,
|
|
freshness_reanchors: u64,
|
|
startup_timeouts: u64,
|
|
video_freezes: u64,
|
|
last_reason: String,
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub struct UpstreamMediaRuntime {
|
|
next_session_id: AtomicU64,
|
|
next_camera_generation: AtomicU64,
|
|
next_microphone_generation: AtomicU64,
|
|
microphone_sink_gate: Arc<Semaphore>,
|
|
camera_playout_offset_us: AtomicI64,
|
|
microphone_playout_offset_us: AtomicI64,
|
|
state: Mutex<RuntimeState>,
|
|
}
|
|
|
|
impl UpstreamMediaRuntime {
|
|
#[must_use]
|
|
pub fn new() -> Self {
|
|
Self {
|
|
next_session_id: AtomicU64::new(0),
|
|
next_camera_generation: AtomicU64::new(0),
|
|
next_microphone_generation: AtomicU64::new(0),
|
|
microphone_sink_gate: Arc::new(Semaphore::new(1)),
|
|
camera_playout_offset_us: AtomicI64::new(playout_offset_us(UpstreamMediaKind::Camera)),
|
|
microphone_playout_offset_us: AtomicI64::new(playout_offset_us(
|
|
UpstreamMediaKind::Microphone,
|
|
)),
|
|
state: Mutex::new(RuntimeState::default()),
|
|
}
|
|
}
|
|
|
|
pub fn set_playout_offsets(&self, camera_offset_us: i64, microphone_offset_us: i64) {
|
|
self.camera_playout_offset_us
|
|
.store(camera_offset_us, Ordering::Relaxed);
|
|
self.microphone_playout_offset_us
|
|
.store(microphone_offset_us, Ordering::Relaxed);
|
|
}
|
|
|
|
#[must_use]
|
|
pub fn playout_offsets(&self) -> (i64, i64) {
|
|
(
|
|
self.camera_playout_offset_us.load(Ordering::Relaxed),
|
|
self.microphone_playout_offset_us.load(Ordering::Relaxed),
|
|
)
|
|
}
|
|
|
|
#[must_use]
|
|
pub fn activate_camera(&self) -> UpstreamStreamLease {
|
|
self.activate(UpstreamMediaKind::Camera)
|
|
}
|
|
|
|
#[must_use]
|
|
pub fn activate_microphone(&self) -> UpstreamStreamLease {
|
|
self.activate(UpstreamMediaKind::Microphone)
|
|
}
|
|
|
|
pub async fn reserve_microphone_sink(&self, generation: u64) -> Option<OwnedSemaphorePermit> {
|
|
let permit = self
|
|
.microphone_sink_gate
|
|
.clone()
|
|
.acquire_owned()
|
|
.await
|
|
.ok()?;
|
|
self.is_microphone_active(generation).then_some(permit)
|
|
}
|
|
|
|
#[must_use]
|
|
pub fn is_camera_active(&self, generation: u64) -> bool {
|
|
self.is_active(UpstreamMediaKind::Camera, generation)
|
|
}
|
|
|
|
#[must_use]
|
|
pub fn is_microphone_active(&self, generation: u64) -> bool {
|
|
self.is_active(UpstreamMediaKind::Microphone, generation)
|
|
}
|
|
|
|
pub fn close_camera(&self, generation: u64) {
|
|
self.close(UpstreamMediaKind::Camera, generation);
|
|
}
|
|
|
|
pub fn close_microphone(&self, generation: u64) {
|
|
self.close(UpstreamMediaKind::Microphone, generation);
|
|
}
|
|
|
|
pub fn soft_recover_microphone(&self) {
|
|
let lease = self.activate_microphone();
|
|
self.close_microphone(lease.generation);
|
|
}
|
|
|
|
pub fn record_client_timing(&self, kind: UpstreamMediaKind, timing: UpstreamClientTiming) {
|
|
let mut state = self
|
|
.state
|
|
.lock()
|
|
.expect("upstream media state mutex poisoned");
|
|
let sample = TimingSample {
|
|
capture_pts_us: timing.capture_pts_us,
|
|
send_pts_us: timing.send_pts_us,
|
|
queue_age_ms: timing.queue_age_ms,
|
|
received_at: Instant::now(),
|
|
};
|
|
match kind {
|
|
UpstreamMediaKind::Camera => {
|
|
state.latest_camera_timing = Some(sample);
|
|
state.latest_camera_remote_pts_us = Some(timing.capture_pts_us);
|
|
state
|
|
.camera_client_queue_age_window_ms
|
|
.push(f64::from(timing.queue_age_ms));
|
|
}
|
|
UpstreamMediaKind::Microphone => {
|
|
state.latest_microphone_timing = Some(sample);
|
|
state.latest_microphone_remote_pts_us = Some(timing.capture_pts_us);
|
|
state
|
|
.microphone_client_queue_age_window_ms
|
|
.push(f64::from(timing.queue_age_ms));
|
|
}
|
|
}
|
|
record_timing_pair(&mut state);
|
|
}
|
|
|
|
pub fn mark_audio_presented(&self, local_pts_us: u64, due_at: Instant) {
|
|
let mut state = self
|
|
.state
|
|
.lock()
|
|
.expect("upstream media state mutex poisoned");
|
|
state.last_audio_presented_pts_us = Some(local_pts_us);
|
|
record_presentation(&mut state, UpstreamMediaKind::Microphone, due_at);
|
|
state.phase = UpstreamSyncPhase::Live;
|
|
state.last_reason = "v2 audio handed to UAC".to_string();
|
|
}
|
|
|
|
pub fn mark_video_presented(&self, local_pts_us: u64, due_at: Instant) {
|
|
let mut state = self
|
|
.state
|
|
.lock()
|
|
.expect("upstream media state mutex poisoned");
|
|
state.last_video_presented_pts_us = Some(local_pts_us);
|
|
record_presentation(&mut state, UpstreamMediaKind::Camera, due_at);
|
|
state.phase = UpstreamSyncPhase::Live;
|
|
state.last_reason = "v2 video handed to UVC".to_string();
|
|
}
|
|
|
|
pub fn record_video_freeze(&self, reason: impl Into<String>) {
|
|
let mut state = self
|
|
.state
|
|
.lock()
|
|
.expect("upstream media state mutex poisoned");
|
|
state.video_freezes = state.video_freezes.saturating_add(1);
|
|
state.phase = UpstreamSyncPhase::Healing;
|
|
state.last_reason = reason.into();
|
|
}
|
|
|
|
#[must_use]
|
|
pub fn snapshot(&self) -> UpstreamPlannerSnapshot {
|
|
let state = self
|
|
.state
|
|
.lock()
|
|
.expect("upstream media state mutex poisoned");
|
|
let now = Instant::now();
|
|
UpstreamPlannerSnapshot {
|
|
session_id: state.session_id,
|
|
phase: state.phase.as_str(),
|
|
latest_camera_remote_pts_us: state.latest_camera_remote_pts_us,
|
|
latest_microphone_remote_pts_us: state.latest_microphone_remote_pts_us,
|
|
last_video_presented_pts_us: state.last_video_presented_pts_us,
|
|
last_audio_presented_pts_us: state.last_audio_presented_pts_us,
|
|
live_lag_ms: live_lag_ms(&state),
|
|
planner_skew_ms: planner_skew_ms(&state),
|
|
stale_audio_drops: state.stale_audio_drops,
|
|
stale_video_drops: state.stale_video_drops,
|
|
skew_video_drops: state.skew_video_drops,
|
|
freshness_reanchors: state.freshness_reanchors,
|
|
startup_timeouts: state.startup_timeouts,
|
|
video_freezes: state.video_freezes,
|
|
last_reason: state.last_reason.clone(),
|
|
client_capture_skew_ms: state.latest_paired_client_capture_skew_ms,
|
|
client_send_skew_ms: state.latest_paired_client_send_skew_ms,
|
|
server_receive_skew_ms: state.latest_paired_server_receive_skew_ms,
|
|
camera_client_queue_age_ms: state
|
|
.latest_camera_timing
|
|
.map(|sample| f64::from(sample.queue_age_ms)),
|
|
microphone_client_queue_age_ms: state
|
|
.latest_microphone_timing
|
|
.map(|sample| f64::from(sample.queue_age_ms)),
|
|
camera_server_receive_age_ms: state
|
|
.latest_camera_timing
|
|
.map(|sample| age_ms(now, sample.received_at)),
|
|
microphone_server_receive_age_ms: state
|
|
.latest_microphone_timing
|
|
.map(|sample| age_ms(now, sample.received_at)),
|
|
client_capture_abs_skew_p95_ms: state.client_capture_skew_window_ms.p95_abs(),
|
|
client_send_abs_skew_p95_ms: state.client_send_skew_window_ms.p95_abs(),
|
|
server_receive_abs_skew_p95_ms: state.server_receive_skew_window_ms.p95_abs(),
|
|
camera_client_queue_age_p95_ms: state.camera_client_queue_age_window_ms.p95(),
|
|
microphone_client_queue_age_p95_ms: state.microphone_client_queue_age_window_ms.p95(),
|
|
sink_handoff_skew_ms: latest_sink_handoff_skew_ms(&state),
|
|
sink_handoff_abs_skew_p95_ms: state.sink_handoff_skew_window_ms.p95_abs(),
|
|
camera_sink_late_ms: state.latest_camera_presentation.map(presentation_late_ms),
|
|
microphone_sink_late_ms: state
|
|
.latest_microphone_presentation
|
|
.map(presentation_late_ms),
|
|
camera_sink_late_p95_ms: state.camera_sink_late_window_ms.p95(),
|
|
microphone_sink_late_p95_ms: state.microphone_sink_late_window_ms.p95(),
|
|
client_timing_window_samples: state.client_capture_skew_window_ms.len() as u64,
|
|
sink_handoff_window_samples: state.sink_handoff_skew_window_ms.len() as u64,
|
|
}
|
|
}
|
|
|
|
#[must_use]
|
|
pub fn map_video_pts(&self, remote_pts_us: u64, frame_step_us: u64) -> Option<u64> {
|
|
match self.plan_video_pts(remote_pts_us, frame_step_us) {
|
|
UpstreamPlanDecision::Play(plan) => Some(plan.local_pts_us),
|
|
_ => None,
|
|
}
|
|
}
|
|
|
|
#[must_use]
|
|
pub fn map_audio_pts(&self, remote_pts_us: u64) -> Option<u64> {
|
|
match self.plan_audio_pts(remote_pts_us) {
|
|
UpstreamPlanDecision::Play(plan) => Some(plan.local_pts_us),
|
|
_ => None,
|
|
}
|
|
}
|
|
|
|
#[must_use]
|
|
pub fn plan_video_pts(&self, remote_pts_us: u64, frame_step_us: u64) -> UpstreamPlanDecision {
|
|
self.plan_legacy_pts(
|
|
UpstreamMediaKind::Camera,
|
|
remote_pts_us,
|
|
frame_step_us.max(1),
|
|
)
|
|
}
|
|
|
|
#[must_use]
|
|
pub fn plan_audio_pts(&self, remote_pts_us: u64) -> UpstreamPlanDecision {
|
|
self.plan_legacy_pts(UpstreamMediaKind::Microphone, remote_pts_us, 1)
|
|
}
|
|
|
|
#[must_use]
|
|
pub fn plan_bundled_pts(
|
|
&self,
|
|
kind: UpstreamMediaKind,
|
|
remote_pts_us: u64,
|
|
min_step_us: u64,
|
|
bundle_base_remote_pts_us: u64,
|
|
bundle_epoch: Instant,
|
|
) -> UpstreamPlanDecision {
|
|
self.plan_rebased_pts(
|
|
kind,
|
|
remote_pts_us,
|
|
min_step_us.max(1),
|
|
Some(bundle_base_remote_pts_us),
|
|
Some(bundle_epoch),
|
|
)
|
|
}
|
|
|
|
pub async fn wait_for_audio_master(&self, _video_local_pts_us: u64, _due_at: Instant) -> bool {
|
|
true
|
|
}
|
|
|
|
fn activate(&self, kind: UpstreamMediaKind) -> UpstreamStreamLease {
|
|
let generation = match kind {
|
|
UpstreamMediaKind::Camera => {
|
|
self.next_camera_generation.fetch_add(1, Ordering::SeqCst) + 1
|
|
}
|
|
UpstreamMediaKind::Microphone => {
|
|
self.next_microphone_generation
|
|
.fetch_add(1, Ordering::SeqCst)
|
|
+ 1
|
|
}
|
|
};
|
|
let mut state = self
|
|
.state
|
|
.lock()
|
|
.expect("upstream media state mutex poisoned");
|
|
if state.active_camera_generation.is_none() && state.active_microphone_generation.is_none()
|
|
{
|
|
state.session_id = self.next_session_id.fetch_add(1, Ordering::SeqCst) + 1;
|
|
reset_session_state(&mut state);
|
|
state.session_started_at = Some(Instant::now());
|
|
state.phase = UpstreamSyncPhase::Acquiring;
|
|
state.last_reason = "v2 upstream session acquiring media".to_string();
|
|
}
|
|
match kind {
|
|
UpstreamMediaKind::Camera => state.active_camera_generation = Some(generation),
|
|
UpstreamMediaKind::Microphone => state.active_microphone_generation = Some(generation),
|
|
}
|
|
UpstreamStreamLease {
|
|
session_id: state.session_id,
|
|
generation,
|
|
}
|
|
}
|
|
|
|
fn is_active(&self, kind: UpstreamMediaKind, generation: u64) -> bool {
|
|
let state = self
|
|
.state
|
|
.lock()
|
|
.expect("upstream media state mutex poisoned");
|
|
match kind {
|
|
UpstreamMediaKind::Camera => state.active_camera_generation == Some(generation),
|
|
UpstreamMediaKind::Microphone => state.active_microphone_generation == Some(generation),
|
|
}
|
|
}
|
|
|
|
fn close(&self, kind: UpstreamMediaKind, generation: u64) {
|
|
let mut state = self
|
|
.state
|
|
.lock()
|
|
.expect("upstream media state mutex poisoned");
|
|
match kind {
|
|
UpstreamMediaKind::Camera if state.active_camera_generation == Some(generation) => {
|
|
state.active_camera_generation = None
|
|
}
|
|
UpstreamMediaKind::Microphone
|
|
if state.active_microphone_generation == Some(generation) =>
|
|
{
|
|
state.active_microphone_generation = None
|
|
}
|
|
_ => return,
|
|
}
|
|
if state.active_camera_generation.is_none() && state.active_microphone_generation.is_none()
|
|
{
|
|
reset_session_state(&mut state);
|
|
}
|
|
}
|
|
|
|
fn plan_legacy_pts(
|
|
&self,
|
|
kind: UpstreamMediaKind,
|
|
remote_pts_us: u64,
|
|
min_step_us: u64,
|
|
) -> UpstreamPlanDecision {
|
|
self.plan_rebased_pts(kind, remote_pts_us, min_step_us.max(1), None, None)
|
|
}
|
|
|
|
fn plan_rebased_pts(
|
|
&self,
|
|
kind: UpstreamMediaKind,
|
|
remote_pts_us: u64,
|
|
min_step_us: u64,
|
|
explicit_base: Option<u64>,
|
|
explicit_epoch: Option<Instant>,
|
|
) -> UpstreamPlanDecision {
|
|
let mut state = self
|
|
.state
|
|
.lock()
|
|
.expect("upstream media state mutex poisoned");
|
|
match kind {
|
|
UpstreamMediaKind::Camera => state.latest_camera_remote_pts_us = Some(remote_pts_us),
|
|
UpstreamMediaKind::Microphone => {
|
|
state.latest_microphone_remote_pts_us = Some(remote_pts_us)
|
|
}
|
|
}
|
|
let base = match explicit_base {
|
|
Some(base) => *state.base_remote_pts_us.get_or_insert(base),
|
|
None => *state.base_remote_pts_us.get_or_insert(remote_pts_us),
|
|
};
|
|
let epoch = match explicit_epoch {
|
|
Some(epoch) => *state.playout_epoch.get_or_insert(epoch),
|
|
None => *state
|
|
.playout_epoch
|
|
.get_or_insert(Instant::now() + upstream_playout_delay()),
|
|
};
|
|
let mut local_pts_us = remote_pts_us.saturating_sub(base);
|
|
let last_slot = match kind {
|
|
UpstreamMediaKind::Camera => &mut state.last_video_local_pts_us,
|
|
UpstreamMediaKind::Microphone => &mut state.last_audio_local_pts_us,
|
|
};
|
|
if let Some(last_pts_us) = *last_slot
|
|
&& local_pts_us <= last_pts_us
|
|
{
|
|
local_pts_us = last_pts_us.saturating_add(min_step_us.max(1));
|
|
}
|
|
*last_slot = Some(local_pts_us);
|
|
state.phase = UpstreamSyncPhase::Syncing;
|
|
state.last_reason = "v2 legacy packet mapped without cross-stream planner".to_string();
|
|
let due_at = apply_offset(
|
|
epoch + Duration::from_micros(local_pts_us),
|
|
self.playout_offset_us(kind),
|
|
);
|
|
let late_by = Instant::now()
|
|
.checked_duration_since(due_at)
|
|
.unwrap_or_default();
|
|
UpstreamPlanDecision::Play(PlannedUpstreamPacket {
|
|
local_pts_us,
|
|
due_at,
|
|
late_by,
|
|
source_lag: Duration::ZERO,
|
|
})
|
|
}
|
|
|
|
fn playout_offset_us(&self, kind: UpstreamMediaKind) -> i64 {
|
|
match kind {
|
|
UpstreamMediaKind::Camera => self.camera_playout_offset_us.load(Ordering::Relaxed),
|
|
UpstreamMediaKind::Microphone => {
|
|
self.microphone_playout_offset_us.load(Ordering::Relaxed)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Default for UpstreamMediaRuntime {
|
|
fn default() -> Self {
|
|
Self::new()
|
|
}
|
|
}
|
|
|
|
fn reset_session_state(state: &mut RuntimeState) {
|
|
state.base_remote_pts_us = None;
|
|
state.playout_epoch = None;
|
|
state.latest_camera_remote_pts_us = None;
|
|
state.latest_microphone_remote_pts_us = None;
|
|
state.last_video_local_pts_us = None;
|
|
state.last_audio_local_pts_us = None;
|
|
state.last_video_presented_pts_us = None;
|
|
state.last_audio_presented_pts_us = None;
|
|
state.latest_camera_timing = None;
|
|
state.latest_microphone_timing = None;
|
|
state.latest_camera_presentation = None;
|
|
state.latest_microphone_presentation = None;
|
|
state.latest_paired_client_capture_skew_ms = None;
|
|
state.latest_paired_client_send_skew_ms = None;
|
|
state.latest_paired_server_receive_skew_ms = None;
|
|
state.stale_audio_drops = 0;
|
|
state.stale_video_drops = 0;
|
|
state.skew_video_drops = 0;
|
|
state.freshness_reanchors = 0;
|
|
state.startup_timeouts = 0;
|
|
state.video_freezes = 0;
|
|
}
|
|
|
|
fn record_timing_pair(state: &mut RuntimeState) {
|
|
let (Some(camera), Some(microphone)) =
|
|
(state.latest_camera_timing, state.latest_microphone_timing)
|
|
else {
|
|
return;
|
|
};
|
|
let capture_skew_ms = delta_ms(microphone.capture_pts_us, camera.capture_pts_us);
|
|
let send_skew_ms = delta_ms(microphone.send_pts_us, camera.send_pts_us);
|
|
let receive_skew_ms = signed_duration_ms(microphone.received_at, camera.received_at);
|
|
state.latest_paired_client_capture_skew_ms = Some(capture_skew_ms);
|
|
state.latest_paired_client_send_skew_ms = Some(send_skew_ms);
|
|
state.latest_paired_server_receive_skew_ms = Some(receive_skew_ms);
|
|
state.client_capture_skew_window_ms.push(capture_skew_ms);
|
|
state.client_send_skew_window_ms.push(send_skew_ms);
|
|
state.server_receive_skew_window_ms.push(receive_skew_ms);
|
|
}
|
|
|
|
fn record_presentation(state: &mut RuntimeState, kind: UpstreamMediaKind, due_at: Instant) {
|
|
let sample = PresentationSample {
|
|
due_at,
|
|
handed_at: Instant::now(),
|
|
};
|
|
match kind {
|
|
UpstreamMediaKind::Camera => {
|
|
state.latest_camera_presentation = Some(sample);
|
|
state
|
|
.camera_sink_late_window_ms
|
|
.push(presentation_late_ms(sample));
|
|
}
|
|
UpstreamMediaKind::Microphone => {
|
|
state.latest_microphone_presentation = Some(sample);
|
|
state
|
|
.microphone_sink_late_window_ms
|
|
.push(presentation_late_ms(sample));
|
|
}
|
|
}
|
|
if let Some(skew) = latest_sink_handoff_skew_ms(state) {
|
|
state.sink_handoff_skew_window_ms.push(skew);
|
|
}
|
|
}
|
|
|
|
fn live_lag_ms(state: &RuntimeState) -> Option<f64> {
|
|
let latest = state
|
|
.latest_camera_remote_pts_us
|
|
.into_iter()
|
|
.chain(state.latest_microphone_remote_pts_us)
|
|
.max()?;
|
|
let base = state.base_remote_pts_us.unwrap_or(latest);
|
|
Some(latest.saturating_sub(base) as f64 / 1000.0)
|
|
}
|
|
|
|
fn planner_skew_ms(state: &RuntimeState) -> Option<f64> {
|
|
match (
|
|
state.last_audio_presented_pts_us,
|
|
state.last_video_presented_pts_us,
|
|
) {
|
|
(Some(audio), Some(video)) => Some((audio as i128 - video as i128) as f64 / 1000.0),
|
|
_ => None,
|
|
}
|
|
}
|
|
|
|
fn latest_sink_handoff_skew_ms(state: &RuntimeState) -> Option<f64> {
|
|
let camera = state.latest_camera_presentation?;
|
|
let microphone = state.latest_microphone_presentation?;
|
|
Some(presentation_late_signed_ms(microphone) - presentation_late_signed_ms(camera))
|
|
}
|
|
|
|
fn presentation_late_ms(sample: PresentationSample) -> f64 {
|
|
presentation_late_signed_ms(sample).max(0.0)
|
|
}
|
|
|
|
fn presentation_late_signed_ms(sample: PresentationSample) -> f64 {
|
|
signed_duration_ms(sample.handed_at, sample.due_at)
|
|
}
|
|
|
|
fn age_ms(now: Instant, then: Instant) -> f64 {
|
|
now.saturating_duration_since(then).as_secs_f64() * 1000.0
|
|
}
|
|
|
|
fn signed_duration_ms(left: Instant, right: Instant) -> f64 {
|
|
if left >= right {
|
|
left.duration_since(right).as_secs_f64() * 1000.0
|
|
} else {
|
|
-(right.duration_since(left).as_secs_f64() * 1000.0)
|
|
}
|
|
}
|
|
|
|
fn delta_ms(left_us: u64, right_us: u64) -> f64 {
|
|
(left_us as i128 - right_us as i128) as f64 / 1000.0
|
|
}
|
|
|
|
fn percentile(values: impl Iterator<Item = f64>, quantile: f64) -> Option<f64> {
|
|
let mut sorted = values.filter(|value| value.is_finite()).collect::<Vec<_>>();
|
|
if sorted.is_empty() {
|
|
return None;
|
|
}
|
|
sorted.sort_by(|left, right| left.total_cmp(right));
|
|
let index = ((sorted.len() - 1) as f64 * quantile.clamp(0.0, 1.0)).ceil() as usize;
|
|
sorted.get(index).copied()
|
|
}
|
|
|
|
fn upstream_playout_delay() -> Duration {
|
|
let delay_ms = std::env::var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS")
|
|
.ok()
|
|
.and_then(|value| value.trim().parse::<u64>().ok())
|
|
.unwrap_or(80);
|
|
Duration::from_millis(delay_ms)
|
|
}
|
|
|
|
fn playout_offset_us(kind: UpstreamMediaKind) -> i64 {
|
|
let name = match kind {
|
|
UpstreamMediaKind::Camera => "LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US",
|
|
UpstreamMediaKind::Microphone => "LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US",
|
|
};
|
|
std::env::var(name)
|
|
.ok()
|
|
.and_then(|value| value.trim().parse::<i64>().ok())
|
|
.unwrap_or(match kind {
|
|
UpstreamMediaKind::Camera => FACTORY_MJPEG_VIDEO_OFFSET_US,
|
|
UpstreamMediaKind::Microphone => FACTORY_MJPEG_AUDIO_OFFSET_US,
|
|
})
|
|
}
|
|
|
|
fn apply_offset(instant: Instant, offset_us: i64) -> Instant {
|
|
if offset_us >= 0 {
|
|
instant + Duration::from_micros(offset_us as u64)
|
|
} else {
|
|
instant
|
|
.checked_sub(Duration::from_micros(offset_us.unsigned_abs()))
|
|
.unwrap_or(instant)
|
|
}
|
|
}
|