From 3011dabc9209429c558cd223dfcc6f7961341370 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Sun, 3 May 2026 12:22:33 -0300 Subject: [PATCH] feat: rebuild upstream media v2 --- AGENTS.md | 57 +- Cargo.lock | 6 +- client/Cargo.toml | 2 +- client/src/app/session_lifecycle.rs | 3 +- client/src/app/uplink_media.rs | 145 +- common/Cargo.toml | 2 +- docs/operational-env.md | 12 +- quarantine/upstream-media-v1/NOTES.md | 16 + .../server/src/upstream_media_runtime.rs | 1040 +++++++++++++ .../src/upstream_media_runtime/config.rs | 0 .../upstream_media_runtime/lease_lifecycle.rs | 0 .../src/upstream_media_runtime/state.rs | 0 .../src/upstream_media_runtime/tests.rs | 0 .../tests/async_wait.rs | 0 .../upstream_media_runtime/tests/config.rs | 0 .../upstream_media_runtime/tests/lifecycle.rs | 0 .../upstream_media_runtime/tests/planning.rs | 0 .../src/upstream_media_runtime/types.rs | 0 server/Cargo.toml | 2 +- server/src/main/relay_service.rs | 623 ++++---- server/src/main/relay_service_tests.rs | 128 +- server/src/upstream_media_runtime.rs | 1313 +++++++---------- 22 files changed, 2114 insertions(+), 1235 deletions(-) create mode 100644 quarantine/upstream-media-v1/NOTES.md create mode 100644 quarantine/upstream-media-v1/server/src/upstream_media_runtime.rs rename {server => quarantine/upstream-media-v1/server}/src/upstream_media_runtime/config.rs (100%) rename {server => quarantine/upstream-media-v1/server}/src/upstream_media_runtime/lease_lifecycle.rs (100%) rename {server => quarantine/upstream-media-v1/server}/src/upstream_media_runtime/state.rs (100%) rename {server => quarantine/upstream-media-v1/server}/src/upstream_media_runtime/tests.rs (100%) rename {server => quarantine/upstream-media-v1/server}/src/upstream_media_runtime/tests/async_wait.rs (100%) rename {server => quarantine/upstream-media-v1/server}/src/upstream_media_runtime/tests/config.rs (100%) rename {server => quarantine/upstream-media-v1/server}/src/upstream_media_runtime/tests/lifecycle.rs (100%) rename {server => quarantine/upstream-media-v1/server}/src/upstream_media_runtime/tests/planning.rs (100%) rename {server => quarantine/upstream-media-v1/server}/src/upstream_media_runtime/types.rs (100%) diff --git a/AGENTS.md b/AGENTS.md index b95f12d..6ea7b6d 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -1,42 +1,41 @@ # Lesavka Agent Notes -## 0.18.5 Bundled Webcam A/V Migration Checklist +## 0.19.0 Upstream Media v2 Rebuild Checklist -Context: manual Google Meet and mirrored-probe testing showed the split webcam -and microphone uplink design is too fragile under real browser/device pressure. -The new product contract is: when webcam video is present, microphone audio -travels with it on one client-owned upstream media stream. The server manages -freshness and smoothness after arrival; it no longer tries to make two racing -upstream channels look synchronized. Microphone-only remains supported as the -explicit no-camera path. +Context: manual Google Meet testing showed the 0.18.x upstream media stack was +still capable of seconds-scale lag and A/V skew. Treat that implementation as +quarantined v1, not as something to tune. The v2 contract is deliberately small: +when webcam video is active, microphone audio and camera frames travel through +one client-owned bundle path; the server maps that client capture clock onto one +local epoch, applies only explicit UVC/UAC output-path offsets, and drops stale +bundles as a unit. Microphone-only remains supported as the explicit no-camera +path. 
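+
+A minimal sketch of that server-side contract (illustrative names only; the
+actual types this patch adds are `MediaV2Clock` and `MediaV2HandoffSchedule`
+in `server/src/main/relay_service.rs`, and they differ in detail):
+
+```rust
+use std::time::{Duration, Instant};
+
+/// One local epoch per client session: pin the first client capture timestamp
+/// seen on the bundle stream to a local wall-clock anchor, then play every
+/// later timestamp relative to that anchor.
+struct V2Epoch {
+    base_capture_pts_us: Option<u64>,
+    epoch: Option<Instant>,
+}
+
+impl V2Epoch {
+    fn due_at(&mut self, capture_pts_us: u64, output_offset: Duration) -> Instant {
+        let base = *self.base_capture_pts_us.get_or_insert(capture_pts_us);
+        let epoch = *self.epoch.get_or_insert_with(Instant::now);
+        // Shared client capture timeline rebased onto the local epoch, plus only
+        // the explicit per-device (UVC or UAC) output-path handoff delay.
+        epoch + Duration::from_micros(capture_pts_us.saturating_sub(base)) + output_offset
+    }
+}
+
+/// Staleness is a whole-bundle decision: either the frame and its audio both
+/// play, or the bundle is dropped as one unit.
+fn bundle_is_stale(bundle_age: Duration, max_live_age: Duration) -> bool {
+    bundle_age > max_live_age
+}
+```
+
+The optional common playout delay (default `20` ms) is smoothness slack layered
+on top of that schedule; sync-critical UVC/UAC offsets are never clipped to win
+back freshness.
+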
### Product Invariants - [x] Webcam-enabled sessions use one bundled upstream media RPC by default. - [x] Webcam-enabled sessions imply microphone capture when the server supports UAC. +- [x] The previous upstream media runtime/planner is quarantined under + `quarantine/upstream-media-v1/` with retained-idea notes. - [x] The UI-selected camera, camera quality, microphone, speaker, gain, and enable switches remain authoritative; defaults may not override visible UI state. - [x] Client capture timestamps are the source of A/V sync truth for webcam sessions. -- [x] Server bundled playout rebases that client timeline onto a fresh local epoch. -- [x] Server bundled playout may drop stale packets, but must not rebuild sync by - independently pairing separate camera and microphone streams. -- [x] Mic-only sessions keep the existing microphone stream path. +- [x] Server v2 playout rebases that client timeline onto a fresh local epoch. +- [x] Server v2 may drop stale bundles, but must not rebuild sync by independently + pairing separate camera and microphone streams. +- [x] Mic-only sessions keep an explicit no-camera audio path. - [x] Legacy split webcam/mic uplink is only an explicit compatibility escape hatch. - [x] Manual probes and diagnostics clearly label `bundled-webcam-media` versus `mic-only` so we never confuse the architectures during debugging. - [x] Sync protection takes precedence over freshness and smoothness: bad mixed bundle timing is dropped coherently instead of letting one side play alone. -- [x] Startup video is allowed to prime UVC if a first mixed bundle has bad - audio/video timing; the mismatched audio is dropped, preserving sync while - avoiding browser `Camera is starting` starvation. - [x] An already-attached UVC gadget descriptor is the physical browser contract: if it still advertises an older profile, server handshake/capture sizing follows that live descriptor until a controlled gadget rebuild is allowed. -- [x] Bundled webcam sessions enforce freshness before output-path compensation; - sync-critical measured UVC/UAC offsets are allowed to add presentation - delay because audio/video sync is higher priority than freshness. -- [x] Bundled webcam sessions use the shared client capture timeline for - transit sync, then apply runtime output-path calibration when splitting - into UVC/UAC so Meet sees synchronized presentation. +- [x] Bundled webcam sessions use the shared client capture timeline for transit + sync, then apply runtime output-path calibration as explicit per-device + handoff delays. +- [x] Optional common playout delay is only smoothness slack; it cannot clip or + replace sync-critical UVC/UAC offsets. ### Wire Protocol - [x] Add `UpstreamMediaBundle` containing one optional video frame plus zero or @@ -52,6 +51,12 @@ explicit no-camera path. - [x] Spawn one bundled capture/uplink task instead of separate camera and mic tasks for webcam sessions. - [x] Bundle camera frames and microphone packets into one freshness-bounded queue. +- [x] With a camera active, do not flush microphone packets as standalone upstream + bundles; trim old pending audio and emit with a video frame instead. +- [x] Add a short video/audio grace window so audio captured beside a frame has + a chance to join that frame's bundle before uplink. +- [x] Keep the microphone-only RPC running as the no-camera path even when the + server supports bundled webcam media; it yields while camera is active. 
- [x] Bound the pre-bundle capture handoff channel so camera/mic workers drop old events under pressure instead of building unbounded latency. - [x] Drop lag-clamped camera and microphone source buffers before bundling; @@ -70,15 +75,15 @@ explicit no-camera path. for one upstream session. - [x] Schedule bundled packets by shared client capture timestamp instead of startup-pairing independent streams. +- [x] Replace the old bundled event sorter/reanchor planner with one v2 bundle + clock and explicit per-device handoff scheduling. - [x] Sanitize packet timestamps before bundling so stale/future source PTS values cannot become the server's A/V sync truth. - [x] Make server bundled scheduling use the client capture sidecar rather than raw packet `pts`, and reset the bundled epoch on client-session changes. -- [x] Keep server freshness drops/reanchors active for bundled media. -- [x] Drop mixed A/V bundles coherently when one side fails freshness/sync planning. -- [x] Reanchor bundled playout when due times drift too far before output-path - compensation, and drop packets whose predicted pre-compensation playout - age would exceed the one-second freshness budget. +- [x] Drop bundles coherently when they are already outside the live age budget. +- [x] Drop mixed A/V bundles coherently when capture timestamps are too far apart + to represent one real capture moment. - [x] Keep bundled UVC/UAC output-path compensation authoritative; do not clip measured offsets just to improve freshness when that would break sync. - [x] Activate the camera relay before opening the microphone sink so UVC can diff --git a/Cargo.lock b/Cargo.lock index 6e6264b..035ace6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.18.5" +version = "0.19.0" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.18.5" +version = "0.19.0" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.18.5" +version = "0.19.0" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index 29c7143..cd2b353 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.18.5" +version = "0.19.0" edition = "2024" [dependencies] diff --git a/client/src/app/session_lifecycle.rs b/client/src/app/session_lifecycle.rs index 6e51512..4906aa5 100644 --- a/client/src/app/session_lifecycle.rs +++ b/client/src/app/session_lifecycle.rs @@ -297,7 +297,7 @@ impl LesavkaClientApp { )); } } - if microphone_available && !bundled_webcam_media { + if microphone_available { let ep = vid_ep.clone(); let mic_telemetry = uplink_telemetry.handle(crate::uplink_telemetry::UpstreamStreamKind::Microphone); @@ -307,6 +307,7 @@ impl LesavkaClientApp { initial_mic_source.clone(), mic_telemetry, media_controls, + bundled_webcam_media, )); } diff --git a/client/src/app/uplink_media.rs b/client/src/app/uplink_media.rs index 04669b3..b875430 100644 --- a/client/src/app/uplink_media.rs +++ b/client/src/app/uplink_media.rs @@ -17,10 +17,8 @@ impl LesavkaClientApp { loop { let state = media_controls.refresh(); let camera_requested = state.camera; - let microphone_requested = state.microphone || state.camera; - if !camera_requested && !microphone_requested { + if !camera_requested { 
camera_telemetry.record_enabled(false); - microphone_telemetry.record_enabled(false); tokio::time::sleep(Duration::from_millis(100)).await; continue; } @@ -244,6 +242,7 @@ impl LesavkaClientApp { event_rx, stop, queue, + camera.is_some(), camera_telemetry, microphone_telemetry, drop_log, @@ -298,12 +297,17 @@ impl LesavkaClientApp { initial_source: Option, telemetry: crate::uplink_telemetry::UplinkTelemetryHandle, media_controls: crate::live_media_control::LiveMediaControls, + pause_when_camera_active: bool, ) { let mut delay = Duration::from_secs(1); static FAIL_CNT: AtomicUsize = AtomicUsize::new(0); loop { let state = media_controls.refresh(); + if pause_when_camera_active && state.camera { + tokio::time::sleep(Duration::from_millis(100)).await; + continue; + } if !state.microphone { telemetry.record_enabled(false); tokio::time::sleep(Duration::from_millis(100)).await; @@ -410,6 +414,12 @@ impl LesavkaClientApp { let desired_source = state .microphone_source .resolve(initial_source_thread.as_deref()); + if pause_when_camera_active && state.camera { + tracing::info!( + "🎤 microphone-only uplink yielding to bundled webcam A/V" + ); + break; + } if desired_source != active_source_thread { tracing::info!( from = active_source_thread.as_deref().unwrap_or("auto"), @@ -780,8 +790,8 @@ const AUDIO_UPLINK_QUEUE: crate::uplink_fresh_queue::FreshQueueConfig = #[cfg(not(coverage))] const BUNDLED_MEDIA_UPLINK_QUEUE: crate::uplink_fresh_queue::FreshQueueConfig = crate::uplink_fresh_queue::FreshQueueConfig { - capacity: 16, - max_age: Duration::from_millis(350), + capacity: 4, + max_age: Duration::from_secs(1), policy: crate::uplink_fresh_queue::FreshQueuePolicy::LatestOnly, }; @@ -791,6 +801,9 @@ const BUNDLED_AUDIO_FLUSH_INTERVAL: Duration = Duration::from_millis(20); #[cfg(not(coverage))] const BUNDLED_AUDIO_MAX_PENDING: usize = 8; +#[cfg(not(coverage))] +const BUNDLED_VIDEO_AUDIO_GRACE: Duration = Duration::from_millis(30); + #[cfg(not(coverage))] const BUNDLED_CAPTURE_EVENT_CHANNEL_CAPACITY: usize = 64; @@ -807,6 +820,7 @@ fn bundle_captured_media( event_rx: std::sync::mpsc::Receiver, stop: Arc, queue: crate::uplink_fresh_queue::FreshPacketQueue, + video_required: bool, camera_telemetry: crate::uplink_telemetry::UplinkTelemetryHandle, microphone_telemetry: crate::uplink_telemetry::UplinkTelemetryHandle, drop_log: Arc>, @@ -816,18 +830,39 @@ fn bundle_captured_media( .fetch_add(1, Ordering::Relaxed) .saturating_add(1); let mut bundle_seq = 0_u64; - let mut pending_audio = Vec::new(); + let mut pending_audio = Vec::::new(); + let mut pending_video = None::; + let mut pending_video_deadline = None::; let mut next_audio_flush = Instant::now() + BUNDLED_AUDIO_FLUSH_INTERVAL; loop { if stop.load(Ordering::Relaxed) { break; } - let timeout = next_audio_flush.saturating_duration_since(Instant::now()); + let now = Instant::now(); + let timeout = if video_required { + pending_video_deadline + .unwrap_or(now + BUNDLED_AUDIO_FLUSH_INTERVAL) + .saturating_duration_since(now) + } else { + next_audio_flush.saturating_duration_since(now) + }; match event_rx.recv_timeout(timeout) { Ok(BundledCaptureEvent::Audio(packet)) => { pending_audio.push(packet); - if pending_audio.len() >= BUNDLED_AUDIO_MAX_PENDING { + if video_required { + let dropped = retain_newest_pending_audio(&mut pending_audio); + if dropped > 0 { + microphone_telemetry.record_stale_drop(dropped as u64); + log_uplink_drop( + &drop_log, + UplinkDropReason::Stale, + dropped as u64, + pending_audio.len(), + 0.0, + ); + } + } else if pending_audio.len() 
>= BUNDLED_AUDIO_MAX_PENDING { emit_bundled_media( session_id, &mut bundle_seq, @@ -842,24 +877,72 @@ fn bundle_captured_media( } } Ok(BundledCaptureEvent::Video(packet)) => { - emit_bundled_media( - session_id, - &mut bundle_seq, - Some(packet), - std::mem::take(&mut pending_audio), - &queue, - &camera_telemetry, - µphone_telemetry, - &drop_log, - ); - next_audio_flush = Instant::now() + BUNDLED_AUDIO_FLUSH_INTERVAL; + if let Some(video) = pending_video.take() { + emit_bundled_media( + session_id, + &mut bundle_seq, + Some(video), + std::mem::take(&mut pending_audio), + &queue, + &camera_telemetry, + µphone_telemetry, + &drop_log, + ); + } + pending_video = Some(packet); + pending_video_deadline = Some(Instant::now() + BUNDLED_VIDEO_AUDIO_GRACE); + if !video_required { + emit_bundled_media( + session_id, + &mut bundle_seq, + pending_video.take(), + std::mem::take(&mut pending_audio), + &queue, + &camera_telemetry, + µphone_telemetry, + &drop_log, + ); + pending_video_deadline = None; + next_audio_flush = Instant::now() + BUNDLED_AUDIO_FLUSH_INTERVAL; + } } Ok(BundledCaptureEvent::Restart) => { + if pending_video.is_some() || (!video_required && !pending_audio.is_empty()) { + emit_bundled_media( + session_id, + &mut bundle_seq, + pending_video.take(), + std::mem::take(&mut pending_audio), + &queue, + &camera_telemetry, + µphone_telemetry, + &drop_log, + ); + } stop.store(true, Ordering::Relaxed); break; } Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { - if !pending_audio.is_empty() { + if video_required { + if pending_video_deadline.is_some_and(|deadline| Instant::now() >= deadline) { + emit_bundled_media( + session_id, + &mut bundle_seq, + pending_video.take(), + std::mem::take(&mut pending_audio), + &queue, + &camera_telemetry, + µphone_telemetry, + &drop_log, + ); + pending_video_deadline = None; + } else { + let dropped = retain_newest_pending_audio(&mut pending_audio); + if dropped > 0 { + microphone_telemetry.record_stale_drop(dropped as u64); + } + } + } else if !pending_audio.is_empty() { emit_bundled_media( session_id, &mut bundle_seq, @@ -876,9 +959,31 @@ fn bundle_captured_media( Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break, } } + if pending_video.is_some() || (!video_required && !pending_audio.is_empty()) { + emit_bundled_media( + session_id, + &mut bundle_seq, + pending_video.take(), + std::mem::take(&mut pending_audio), + &queue, + &camera_telemetry, + µphone_telemetry, + &drop_log, + ); + } queue.close(); } +#[cfg(not(coverage))] +fn retain_newest_pending_audio(pending_audio: &mut Vec) -> usize { + if pending_audio.len() <= BUNDLED_AUDIO_MAX_PENDING { + return 0; + } + let dropped = pending_audio.len() - BUNDLED_AUDIO_MAX_PENDING; + pending_audio.drain(..dropped); + dropped +} + #[cfg(not(coverage))] fn emit_bundled_media( session_id: u64, diff --git a/common/Cargo.toml b/common/Cargo.toml index 2f3bac6..79ba30a 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.18.5" +version = "0.19.0" edition = "2024" build = "build.rs" diff --git a/docs/operational-env.md b/docs/operational-env.md index 803a017..a520ba9 100644 --- a/docs/operational-env.md +++ b/docs/operational-env.md @@ -247,18 +247,18 @@ from `LESAVKA_CLIENT_PKI_SSH_SOURCE` over SSH. 
Runtime clients require the insta | `LESAVKA_TOUCHPAD_SCALE` | input routing/clipboard override | | `LESAVKA_UAC_DEV` | server hardware/device override | | `LESAVKA_UAC_SESSION_CLOCK_ALIGN` | server audio sink clock-alignment override; `0` is the host-validated default | -| `LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US` | legacy/split server upstream playout override; shifts gadget-audio presentation relative to the shared playout epoch | +| `LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US` | server upstream output-path override; v2 uses it as the explicit UAC handoff delay relative to the shared client capture clock | | `LESAVKA_UPSTREAM_AUDIO_MASTER_WAIT_GRACE_MS` | server upstream sync override; how long video may wait past its nominal due time for UAC audio to reach the matching timestamp, defaults to `350` | -| `LESAVKA_UPSTREAM_BUNDLED_AUDIO_PLAYOUT_OFFSET_US` | bundled webcam server playout override; defaults to the active runtime audio output-path calibration when unset; sync-critical measured offsets are not clipped for freshness | -| `LESAVKA_UPSTREAM_BUNDLED_PLAYOUT_DELAY_MS` | bundled webcam jitter buffer before output-path compensation; defaults to `350` to protect smooth synced playout | -| `LESAVKA_UPSTREAM_BUNDLED_VIDEO_PLAYOUT_OFFSET_US` | bundled webcam server playout override; defaults to the active runtime video output-path calibration when unset; sync-critical measured offsets are not clipped for freshness | -| `LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS` | server upstream planner freshness ceiling; planner-approved audio/video should not exceed this live lag budget, defaults to `1000` and is capped at `1000` | +| `LESAVKA_UPSTREAM_BUNDLED_PLAYOUT_DELAY_MS` | compatibility alias for `LESAVKA_UPSTREAM_V2_PLAYOUT_DELAY_MS` | +| `LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS` | compatibility alias for `LESAVKA_UPSTREAM_V2_MAX_LIVE_AGE_MS` | | `LESAVKA_UPSTREAM_PAIR_SLACK_US` | server upstream pairing override; how far video may diverge from the planned audio-master capture moment before the frame is held or dropped, defaults to `80000` | | `LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS` | server upstream pairing/synchronization target buffer; the server uses this shared buffer to pair webcam frames with matching gadget-mic audio before remote presentation, defaults to `350` | | `LESAVKA_UPSTREAM_STARTUP_TIMEOUT_MS` | server upstream startup guard; paired startup must converge before this timeout or fail visibly, defaults to `60000` | | `LESAVKA_UPSTREAM_STALE_DROP_MS` | server upstream freshness override; late audio/video that miss this budget are dropped instead of silently extending lag, defaults to `80` | | `LESAVKA_UPSTREAM_TIMING_TRACE` | upstream capture/rebase trace override for sync debugging | -| `LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US` | legacy/split server upstream playout override; shifts webcam-video presentation relative to the shared playout epoch, defaults to `1090000` for measured MJPEG/UVC browser-visible sync compensation | +| `LESAVKA_UPSTREAM_V2_MAX_LIVE_AGE_MS` | v2 bundled webcam freshness ceiling; bundles already older than this are dropped as one unit, defaults to `1000` | +| `LESAVKA_UPSTREAM_V2_PLAYOUT_DELAY_MS` | v2 optional common playout slack after sync offsets; defaults to `20` and is reduced when needed to protect the live-age budget | +| `LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US` | server upstream output-path override; v2 uses it as the explicit UVC handoff delay relative to the shared client capture clock, defaults to the calibrated MJPEG/UVC offset | | 
`LESAVKA_UPLINK_CAMERA_PREVIEW` | client media capture/playback override | | `LESAVKA_UPLINK_MIC_LEVEL` | client media capture/playback override | | `LESAVKA_INSTALL_UVC_CODEC` | installer override; sets the persisted default UVC webcam codec in `/etc/lesavka/server.env` and `/etc/lesavka/uvc.env` | diff --git a/quarantine/upstream-media-v1/NOTES.md b/quarantine/upstream-media-v1/NOTES.md new file mode 100644 index 0000000..f121165 --- /dev/null +++ b/quarantine/upstream-media-v1/NOTES.md @@ -0,0 +1,16 @@ +# Upstream Media v1 Quarantine + +This folder preserves the old upstream media planner for reference only. It is intentionally outside the active Cargo module tree. + +Good ideas worth reusing later, after v2 is stable: +- Explicit client timing sidecars for capture/send/queue age. +- Diagnostics windows for capture skew, send skew, receive skew, and sink handoff skew. +- Single-owner leases for camera and microphone streams. +- A single UAC sink permit so reconnects do not double-open ALSA. + +Reasons this was quarantined: +- Bundled A/V was unpacked into independent events and scheduled one by one. +- Freshness recovery, output compensation, startup pairing, and per-kind drops interacted in ways that could create delay or asymmetry. +- The active bundled path was too complex to reason about while debugging real Google Meet lip sync. + +The v2 active path must stay small: when video is present, audio and video travel in one bundle and the server releases that bundle coherently. Sync first, freshness second, smoothness third. diff --git a/quarantine/upstream-media-v1/server/src/upstream_media_runtime.rs b/quarantine/upstream-media-v1/server/src/upstream_media_runtime.rs new file mode 100644 index 0000000..4e3b918 --- /dev/null +++ b/quarantine/upstream-media-v1/server/src/upstream_media_runtime.rs @@ -0,0 +1,1040 @@ +#![forbid(unsafe_code)] + +use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::Duration; +use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore}; +use tokio::time::Instant; +use tracing::info; + +mod config; +mod state; +mod types; + +use config::{ + apply_playout_offset, upstream_audio_master_wait_grace, upstream_bundled_playout_delay, + upstream_bundled_playout_offset_override_us, upstream_camera_startup_grace_us, + upstream_max_live_lag, upstream_pairing_master_slack, upstream_playout_delay, + upstream_playout_offset_us, upstream_reanchor_late_threshold, upstream_require_paired_startup, + upstream_startup_timeout, upstream_timing_trace_enabled, +}; +use state::{UpstreamClockState, UpstreamSyncPhase}; +pub use types::{ + PlannedUpstreamPacket, UpstreamClientTiming, UpstreamMediaKind, UpstreamPlanDecision, + UpstreamPlannerSnapshot, UpstreamStreamLease, +}; + +/// Coordinate upstream stream ownership and keep audio/video on one timeline. +/// +/// Inputs: stream-open/close events plus remote packet timestamps. +/// Outputs: active-stream leases and rebased local PTS values. +/// Why: live calls need one current webcam owner, one current microphone owner, +/// and one shared media clock so reconnects do not leave old sinks alive or let +/// audio/video drift onto separate timing islands. 
+#[derive(Debug)]
+pub struct UpstreamMediaRuntime {
+    next_session_id: AtomicU64,
+    next_camera_generation: AtomicU64,
+    next_microphone_generation: AtomicU64,
+    microphone_sink_gate: Arc<Semaphore>,
+    pairing_state_notify: Arc<Notify>,
+    audio_progress_notify: Arc<Notify>,
+    camera_playout_offset_us: AtomicI64,
+    microphone_playout_offset_us: AtomicI64,
+    state: Mutex<UpstreamClockState>,
+}
+
+impl UpstreamMediaRuntime {
+    /// Build an empty upstream runtime.
+    #[must_use]
+    pub fn new() -> Self {
+        Self {
+            next_session_id: AtomicU64::new(0),
+            next_camera_generation: AtomicU64::new(0),
+            next_microphone_generation: AtomicU64::new(0),
+            microphone_sink_gate: Arc::new(Semaphore::new(1)),
+            pairing_state_notify: Arc::new(Notify::new()),
+            audio_progress_notify: Arc::new(Notify::new()),
+            camera_playout_offset_us: AtomicI64::new(upstream_playout_offset_us(
+                UpstreamMediaKind::Camera,
+            )),
+            microphone_playout_offset_us: AtomicI64::new(upstream_playout_offset_us(
+                UpstreamMediaKind::Microphone,
+            )),
+            state: Mutex::new(UpstreamClockState::default()),
+        }
+    }
+
+    /// Apply live upstream playout offsets without restarting the relay.
+    pub fn set_playout_offsets(&self, camera_offset_us: i64, microphone_offset_us: i64) {
+        self.camera_playout_offset_us
+            .store(camera_offset_us, Ordering::Relaxed);
+        self.microphone_playout_offset_us
+            .store(microphone_offset_us, Ordering::Relaxed);
+    }
+
+    /// Return `(camera_offset_us, microphone_offset_us)` currently used for live playout.
+    #[must_use]
+    pub fn playout_offsets(&self) -> (i64, i64) {
+        (
+            self.camera_playout_offset_us.load(Ordering::Relaxed),
+            self.microphone_playout_offset_us.load(Ordering::Relaxed),
+        )
+    }
+
+    fn playout_offset_us(&self, kind: UpstreamMediaKind) -> i64 {
+        match kind {
+            UpstreamMediaKind::Camera => self.camera_playout_offset_us.load(Ordering::Relaxed),
+            UpstreamMediaKind::Microphone => {
+                self.microphone_playout_offset_us.load(Ordering::Relaxed)
+            }
+        }
+    }
+
+    fn positive_audio_delay_allowance_us(&self) -> u64 {
+        let camera_offset_us = self.camera_playout_offset_us.load(Ordering::Relaxed);
+        let microphone_offset_us = self.microphone_playout_offset_us.load(Ordering::Relaxed);
+        microphone_offset_us.saturating_sub(camera_offset_us).max(0) as u64
+    }
+
+    fn audio_ahead_video_allowance_us(&self) -> u64 {
+        let camera_offset_us = self.camera_playout_offset_us.load(Ordering::Relaxed);
+        let microphone_offset_us = self.microphone_playout_offset_us.load(Ordering::Relaxed);
+        camera_offset_us.saturating_sub(microphone_offset_us).max(0) as u64
+    }
+
+    fn intentional_future_wait_allowance_us(&self, kind: UpstreamMediaKind) -> u64 {
+        match kind {
+            UpstreamMediaKind::Camera => self.audio_ahead_video_allowance_us(),
+            UpstreamMediaKind::Microphone => self.positive_audio_delay_allowance_us(),
+        }
+    }
+
+    fn raw_bundled_playout_offset_us(&self, kind: UpstreamMediaKind) -> i64 {
+        upstream_bundled_playout_offset_override_us(kind)
+            .unwrap_or_else(|| self.playout_offset_us(kind))
+    }
+
+    fn bundled_playout_offset_us(&self, kind: UpstreamMediaKind) -> i64 {
+        self.raw_bundled_playout_offset_us(kind)
+    }
+
+    /// Mark one audio chunk as actually handed to the UAC sink.
+    pub fn mark_audio_presented(&self, local_pts_us: u64, due_at: Instant) {
+        let mut state = self
+            .state
+            .lock()
+            .expect("upstream media state mutex poisoned");
+        record_presentation_sample(&mut state, UpstreamMediaKind::Microphone, due_at);
+        state.last_audio_presented_pts_us = Some(local_pts_us);
+        if state.phase != UpstreamSyncPhase::Failed {
+            state.phase = UpstreamSyncPhase::Live;
+            state.last_reason = "audio-master playhead flowing".to_string();
+        }
+        self.audio_progress_notify.notify_waiters();
+    }
+
+    /// Record client-side timing facts for one packet as it arrives at the server.
+    pub fn record_client_timing(&self, kind: UpstreamMediaKind, timing: UpstreamClientTiming) {
+        let mut state = self
+            .state
+            .lock()
+            .expect("upstream media state mutex poisoned");
+        let sample = state::UpstreamTimingSample {
+            capture_pts_us: timing.capture_pts_us,
+            send_pts_us: timing.send_pts_us,
+            queue_age_ms: timing.queue_age_ms,
+            received_at: Instant::now(),
+        };
+        match kind {
+            UpstreamMediaKind::Camera => {
+                state.latest_camera_timing = Some(sample);
+                push_timing_sample(&mut state.recent_camera_timing, sample);
+                state
+                    .camera_client_queue_age_window_ms
+                    .push(f64::from(timing.queue_age_ms));
+            }
+            UpstreamMediaKind::Microphone => {
+                state.latest_microphone_timing = Some(sample);
+                push_timing_sample(&mut state.recent_microphone_timing, sample);
+                state
+                    .microphone_client_queue_age_window_ms
+                    .push(f64::from(timing.queue_age_ms));
+            }
+        }
+        record_client_timing_windows(&mut state, kind, sample);
+    }
+
+    /// Mark one video frame as actually handed to the UVC/HDMI sink.
+    pub fn mark_video_presented(&self, local_pts_us: u64, due_at: Instant) {
+        let mut state = self
+            .state
+            .lock()
+            .expect("upstream media state mutex poisoned");
+        record_presentation_sample(&mut state, UpstreamMediaKind::Camera, due_at);
+        state.last_video_presented_pts_us = Some(local_pts_us);
+        if state.phase != UpstreamSyncPhase::Failed {
+            state.phase = UpstreamSyncPhase::Live;
+            state.last_reason = "video follower emitted a synced frame".to_string();
+        }
+    }
+
+    /// Record that video intentionally froze instead of showing an out-of-sync frame.
+    pub fn record_video_freeze(&self, reason: impl Into<String>) {
+        let mut state = self
+            .state
+            .lock()
+            .expect("upstream media state mutex poisoned");
+        state.video_freezes = state.video_freezes.saturating_add(1);
+        if state.phase != UpstreamSyncPhase::Failed {
+            state.phase = UpstreamSyncPhase::Healing;
+        }
+        state.last_reason = reason.into();
+    }
+
+    /// Return current planner facts for diagnostics and probe artifacts.
+ #[must_use] + pub fn snapshot(&self) -> UpstreamPlannerSnapshot { + let state = self + .state + .lock() + .expect("upstream media state mutex poisoned"); + let live_lag_ms = live_lag_us(&state).map(us_to_ms); + let planner_skew_ms = match ( + state.last_audio_presented_pts_us, + state.last_video_presented_pts_us, + ) { + (Some(audio), Some(video)) => Some((audio as i128 - video as i128) as f64 / 1000.0), + _ => None, + }; + let now = Instant::now(); + let client_capture_skew_ms = state.latest_paired_client_capture_skew_ms; + let client_send_skew_ms = state.latest_paired_client_send_skew_ms; + let server_receive_skew_ms = state.latest_paired_server_receive_skew_ms; + UpstreamPlannerSnapshot { + session_id: state.session_id, + phase: state.phase.as_str(), + latest_camera_remote_pts_us: state.latest_camera_remote_pts_us, + latest_microphone_remote_pts_us: state.latest_microphone_remote_pts_us, + last_video_presented_pts_us: state.last_video_presented_pts_us, + last_audio_presented_pts_us: state.last_audio_presented_pts_us, + live_lag_ms, + planner_skew_ms, + stale_audio_drops: state.stale_audio_drops, + stale_video_drops: state.stale_video_drops, + skew_video_drops: state.skew_video_drops, + freshness_reanchors: state.freshness_reanchors, + startup_timeouts: state.startup_timeouts, + video_freezes: state.video_freezes, + last_reason: state.last_reason.clone(), + client_capture_skew_ms, + client_send_skew_ms, + server_receive_skew_ms, + camera_client_queue_age_ms: state + .latest_camera_timing + .map(|sample| f64::from(sample.queue_age_ms)), + microphone_client_queue_age_ms: state + .latest_microphone_timing + .map(|sample| f64::from(sample.queue_age_ms)), + camera_server_receive_age_ms: state.latest_camera_timing.map(|sample| { + now.saturating_duration_since(sample.received_at) + .as_secs_f64() + * 1000.0 + }), + microphone_server_receive_age_ms: state.latest_microphone_timing.map(|sample| { + now.saturating_duration_since(sample.received_at) + .as_secs_f64() + * 1000.0 + }), + client_capture_abs_skew_p95_ms: state.client_capture_skew_window_ms.p95_abs(), + client_send_abs_skew_p95_ms: state.client_send_skew_window_ms.p95_abs(), + server_receive_abs_skew_p95_ms: state.server_receive_skew_window_ms.p95_abs(), + camera_client_queue_age_p95_ms: state.camera_client_queue_age_window_ms.p95(), + microphone_client_queue_age_p95_ms: state.microphone_client_queue_age_window_ms.p95(), + sink_handoff_skew_ms: latest_sink_handoff_skew_ms(&state), + sink_handoff_abs_skew_p95_ms: state.sink_handoff_skew_window_ms.p95_abs(), + camera_sink_late_ms: state.latest_camera_presentation.map(presentation_late_ms), + microphone_sink_late_ms: state + .latest_microphone_presentation + .map(presentation_late_ms), + camera_sink_late_p95_ms: state.camera_sink_late_window_ms.p95(), + microphone_sink_late_p95_ms: state.microphone_sink_late_window_ms.p95(), + client_timing_window_samples: state.client_capture_skew_window_ms.len() as u64, + sink_handoff_window_samples: state.sink_handoff_skew_window_ms.len() as u64, + } + } +} + +include!("upstream_media_runtime/lease_lifecycle.rs"); + +impl UpstreamMediaRuntime { + /// Rebase one upstream video packet timestamp onto the shared session clock. + #[must_use] + pub fn map_video_pts(&self, remote_pts_us: u64, frame_step_us: u64) -> Option { + match self.plan_video_pts(remote_pts_us, frame_step_us.max(1)) { + UpstreamPlanDecision::Play(plan) => Some(plan.local_pts_us), + _ => None, + } + } + + /// Rebase one upstream audio packet timestamp onto the shared session clock. 
+ #[must_use] + pub fn map_audio_pts(&self, remote_pts_us: u64) -> Option { + match self.plan_audio_pts(remote_pts_us) { + UpstreamPlanDecision::Play(plan) => Some(plan.local_pts_us), + _ => None, + } + } + + /// Rebase and schedule one upstream video packet on the shared playout epoch. + #[must_use] + pub fn plan_video_pts(&self, remote_pts_us: u64, frame_step_us: u64) -> UpstreamPlanDecision { + self.plan_pts( + UpstreamMediaKind::Camera, + remote_pts_us, + frame_step_us.max(1), + ) + } + + /// Rebase and schedule one upstream audio packet on the shared playout epoch. + #[must_use] + pub fn plan_audio_pts(&self, remote_pts_us: u64) -> UpstreamPlanDecision { + self.plan_pts(UpstreamMediaKind::Microphone, remote_pts_us, 1) + } + + /// Schedule a packet from the bundled webcam/microphone transport. + /// + /// Inputs: the media kind, client capture timestamp, packet cadence floor, + /// and the client-owned bundle epoch chosen for this gRPC stream. + /// Outputs: the server playout decision for that packet. + /// Why: bundled webcam media has already been synchronized on the client, + /// so the server should not re-solve cross-stream startup pairing. It only + /// rebases the shared client clock onto a fresh local playout epoch. + #[must_use] + pub fn plan_bundled_pts( + &self, + kind: UpstreamMediaKind, + remote_pts_us: u64, + min_step_us: u64, + bundle_base_remote_pts_us: u64, + bundle_epoch: Instant, + ) -> UpstreamPlanDecision { + let mut state = self + .state + .lock() + .expect("upstream media state mutex poisoned"); + let session_id = state.session_id; + match kind { + UpstreamMediaKind::Camera => { + state.camera_packet_count = state.camera_packet_count.saturating_add(1); + state + .first_camera_remote_pts_us + .get_or_insert(remote_pts_us); + state.camera_startup_ready = true; + } + UpstreamMediaKind::Microphone => { + state.microphone_packet_count = state.microphone_packet_count.saturating_add(1); + state + .first_microphone_remote_pts_us + .get_or_insert(remote_pts_us); + } + } + update_latest_remote_pts(&mut state, kind, remote_pts_us); + if state.session_base_remote_pts_us.is_none() { + state.session_base_remote_pts_us = Some(bundle_base_remote_pts_us); + state.playout_epoch = Some(bundle_epoch); + state.pairing_anchor_deadline = Some(bundle_epoch); + state.phase = UpstreamSyncPhase::Syncing; + state.last_reason = "client-bundled upstream media epoch established".to_string(); + self.pairing_state_notify.notify_waiters(); + info!( + session_id, + bundle_base_remote_pts_us, "client-bundled upstream media epoch established" + ); + } + + let session_base_remote_pts_us = state + .session_base_remote_pts_us + .unwrap_or(bundle_base_remote_pts_us); + if remote_pts_us < session_base_remote_pts_us { + return UpstreamPlanDecision::DropBeforeOverlap; + } + + let max_live_lag = upstream_max_live_lag(); + let source_lag = source_lag_for_kind(&state, kind, remote_pts_us); + if source_lag > max_live_lag { + match kind { + UpstreamMediaKind::Camera => { + state.stale_video_drops = state.stale_video_drops.saturating_add(1); + state.video_freezes = state.video_freezes.saturating_add(1); + state.last_reason = + "dropped stale bundled video beyond max live lag".to_string(); + } + UpstreamMediaKind::Microphone => { + state.stale_audio_drops = state.stale_audio_drops.saturating_add(1); + state.last_reason = + "dropped stale bundled audio beyond max live lag".to_string(); + } + } + state.phase = UpstreamSyncPhase::Healing; + return UpstreamPlanDecision::DropStale("bundled packet exceeded max live 
lag"); + } + + let mut local_pts_us = remote_pts_us.saturating_sub(session_base_remote_pts_us); + let last_slot = match kind { + UpstreamMediaKind::Camera => &mut state.last_video_local_pts_us, + UpstreamMediaKind::Microphone => &mut state.last_audio_local_pts_us, + }; + if let Some(last_pts_us) = *last_slot + && local_pts_us <= last_pts_us + { + local_pts_us = last_pts_us.saturating_add(min_step_us.max(1)); + } + *last_slot = Some(local_pts_us); + + let sink_offset_us = self.bundled_playout_offset_us(kind); + let epoch = state.playout_epoch.unwrap_or(bundle_epoch); + let mut due_at = + apply_playout_offset(epoch + Duration::from_micros(local_pts_us), sink_offset_us); + let now = Instant::now(); + let mut late_by = now.checked_duration_since(due_at).unwrap_or_default(); + let playout_delay = upstream_bundled_playout_delay().min(max_live_lag); + let reanchor_threshold = upstream_reanchor_late_threshold(playout_delay); + let max_future_wait = max_live_lag.saturating_sub(source_lag); + let output_offset = if sink_offset_us >= 0 { + Duration::from_micros(sink_offset_us as u64) + } else { + Duration::ZERO + }; + let due_future_wait = due_at.saturating_duration_since(now); + let due_future_wait_before_output_compensation = + due_future_wait.saturating_sub(output_offset); + if late_by > reanchor_threshold + || due_future_wait_before_output_compensation > max_future_wait + { + let desired_delay = playout_delay.min(max_future_wait); + let desired_due_at = now + desired_delay; + let unoffset_due_at = apply_playout_offset(desired_due_at, -sink_offset_us); + let recovered_epoch = unoffset_due_at + .checked_sub(Duration::from_micros(local_pts_us)) + .unwrap_or(unoffset_due_at); + state.playout_epoch = Some(recovered_epoch); + state.pairing_anchor_deadline = Some(desired_due_at); + state.freshness_reanchors = state.freshness_reanchors.saturating_add(1); + state.phase = UpstreamSyncPhase::Healing; + state.last_reason = + "reanchored bundled upstream playhead to preserve freshness".to_string(); + due_at = apply_playout_offset( + recovered_epoch + Duration::from_micros(local_pts_us), + sink_offset_us, + ); + late_by = now.checked_duration_since(due_at).unwrap_or_default(); + info!( + session_id, + ?kind, + local_pts_us, + remote_pts_us, + recovery_buffer_ms = desired_delay.as_millis(), + max_live_lag_ms = max_live_lag.as_millis(), + source_lag_ms = source_lag.as_millis(), + output_offset_ms = output_offset.as_millis(), + "bundled upstream media playhead reanchored to preserve freshness" + ); + } + let predicted_lag_before_output_compensation = source_lag.saturating_add( + due_at + .saturating_duration_since(now) + .saturating_sub(output_offset), + ); + if predicted_lag_before_output_compensation > max_live_lag { + match kind { + UpstreamMediaKind::Camera => { + state.stale_video_drops = state.stale_video_drops.saturating_add(1); + state.video_freezes = state.video_freezes.saturating_add(1); + state.last_reason = "dropped bundled video that would exceed max live lag before output compensation".to_string(); + } + UpstreamMediaKind::Microphone => { + state.stale_audio_drops = state.stale_audio_drops.saturating_add(1); + state.last_reason = "dropped bundled audio that would exceed max live lag before output compensation".to_string(); + } + } + state.phase = UpstreamSyncPhase::Healing; + return UpstreamPlanDecision::DropStale( + "bundled packet would exceed max live lag before output compensation", + ); + } + + if kind == UpstreamMediaKind::Microphone { + self.audio_progress_notify.notify_waiters(); + } + 
UpstreamPlanDecision::Play(PlannedUpstreamPacket { + local_pts_us, + due_at, + late_by, + source_lag, + }) + } + + /// Hold video until the audio master has at least reached the same capture + /// moment, or until the bounded sync grace is exhausted. + pub async fn wait_for_audio_master(&self, video_local_pts_us: u64, due_at: Instant) -> bool { + let slack_us = upstream_pairing_master_slack() + .as_micros() + .min(u64::MAX as u128) as u64; + let audio_delay_allowance_us = self.positive_audio_delay_allowance_us(); + let deadline = due_at + upstream_audio_master_wait_grace(); + loop { + let notified = self.audio_progress_notify.notified(); + { + let state = self + .state + .lock() + .expect("upstream media state mutex poisoned"); + if state.active_microphone_generation.is_none() { + return true; + } + let audio_presented_pts_us = state.last_audio_presented_pts_us.unwrap_or(0); + if audio_presented_pts_us + .saturating_add(slack_us) + .saturating_add(audio_delay_allowance_us) + >= video_local_pts_us + { + return true; + } + } + if Instant::now() >= deadline { + return false; + } + tokio::select! { + _ = notified => {} + _ = tokio::time::sleep_until(deadline) => return false, + } + } + } + + fn plan_pts( + &self, + kind: UpstreamMediaKind, + remote_pts_us: u64, + min_step_us: u64, + ) -> UpstreamPlanDecision { + let mut state = self + .state + .lock() + .expect("upstream media state mutex poisoned"); + let session_id = state.session_id; + let packet_count = match kind { + UpstreamMediaKind::Camera => { + state.camera_packet_count = state.camera_packet_count.saturating_add(1); + state.camera_packet_count + } + UpstreamMediaKind::Microphone => { + state.microphone_packet_count = state.microphone_packet_count.saturating_add(1); + state.microphone_packet_count + } + }; + update_latest_remote_pts(&mut state, kind, remote_pts_us); + let mut first_remote_for_kind = match kind { + UpstreamMediaKind::Camera => { + let first_slot = &mut state.first_camera_remote_pts_us; + *first_slot.get_or_insert(remote_pts_us) + } + UpstreamMediaKind::Microphone => { + let first_slot = &mut state.first_microphone_remote_pts_us; + *first_slot.get_or_insert(remote_pts_us) + } + }; + if kind == UpstreamMediaKind::Camera { + let startup_grace_us = upstream_camera_startup_grace_us(); + if !state.camera_startup_ready + && (startup_grace_us == 0 + || remote_pts_us.saturating_sub(first_remote_for_kind) >= startup_grace_us) + { + state.camera_startup_ready = true; + state.first_camera_remote_pts_us = Some(remote_pts_us); + first_remote_for_kind = remote_pts_us; + } + } + let now = Instant::now(); + let pairing_deadline = *state + .pairing_anchor_deadline + .get_or_insert_with(|| now + upstream_playout_delay()); + let playout_delay = upstream_playout_delay(); + let max_live_lag = upstream_max_live_lag(); + + if state.session_base_remote_pts_us.is_none() { + if state.session_started_at.is_some_and(|started_at| { + now.saturating_duration_since(started_at) > upstream_startup_timeout() + }) { + state.phase = UpstreamSyncPhase::Failed; + state.startup_timeouts = state.startup_timeouts.saturating_add(1); + state.last_reason = + "paired upstream startup did not converge before timeout".to_string(); + return UpstreamPlanDecision::StartupFailed( + "paired upstream startup did not converge before timeout", + ); + } + if state.first_camera_remote_pts_us.is_some() + && state.first_microphone_remote_pts_us.is_some() + && state.camera_startup_ready + { + let first_camera_remote_pts_us = + 
state.first_camera_remote_pts_us.unwrap_or_default(); + let first_microphone_remote_pts_us = + state.first_microphone_remote_pts_us.unwrap_or_default(); + state.session_base_remote_pts_us = + Some(first_camera_remote_pts_us.max(first_microphone_remote_pts_us)); + let overlap_epoch = now + playout_delay; + state.playout_epoch = Some(overlap_epoch); + state.pairing_anchor_deadline = Some(overlap_epoch); + state.phase = UpstreamSyncPhase::Syncing; + state.last_reason = "fresh audio/video overlap anchor established".to_string(); + if !state.startup_anchor_logged { + let startup_delta_us = + first_camera_remote_pts_us as i128 - first_microphone_remote_pts_us as i128; + info!( + session_id, + first_camera_remote_pts_us, + first_microphone_remote_pts_us, + overlap_base_remote_pts_us = + state.session_base_remote_pts_us.unwrap_or_default(), + startup_delta_us, + "upstream media overlap anchors established" + ); + state.startup_anchor_logged = true; + } + self.pairing_state_notify.notify_waiters(); + } else if now < pairing_deadline { + state.phase = UpstreamSyncPhase::Acquiring; + state.last_reason = "awaiting both upstream media streams".to_string(); + if upstream_timing_trace_enabled() + && (packet_count <= 10 || packet_count.is_multiple_of(300)) + { + info!( + session_id, + ?kind, + packet_count, + remote_pts_us, + wait_ms = pairing_deadline.saturating_duration_since(now).as_millis(), + "upstream media packet buffered while awaiting the counterpart stream" + ); + } + return UpstreamPlanDecision::AwaitingPair; + } else if state.first_camera_remote_pts_us.is_some() && !state.camera_startup_ready { + state.phase = UpstreamSyncPhase::Syncing; + state.last_reason = "camera startup warm-up is still in progress".to_string(); + if upstream_timing_trace_enabled() + && (packet_count <= 10 || packet_count.is_multiple_of(300)) + { + info!( + session_id, + ?kind, + packet_count, + remote_pts_us, + "upstream media packet buffered while camera startup warm-up is still in progress" + ); + } + return UpstreamPlanDecision::AwaitingPair; + } else if upstream_require_paired_startup() { + let refreshed = refresh_unpaired_pairing_anchor( + &mut state, + kind, + remote_pts_us, + now + playout_delay, + ); + if refreshed || upstream_timing_trace_enabled() { + info!( + session_id, + ?kind, + packet_count, + remote_pts_us, + refreshed_anchor = refreshed, + healing_window_ms = playout_delay.as_millis(), + "upstream media pairing window expired; holding one-sided stream for synced startup" + ); + } + state.phase = UpstreamSyncPhase::Syncing; + state.last_reason = "holding one-sided stream for synced startup".to_string(); + return UpstreamPlanDecision::AwaitingPair; + } else { + let single_stream_base_remote_pts_us = match kind { + UpstreamMediaKind::Camera => { + state.first_camera_remote_pts_us.unwrap_or(remote_pts_us) + } + UpstreamMediaKind::Microphone => state + .first_microphone_remote_pts_us + .unwrap_or(remote_pts_us), + }; + state.session_base_remote_pts_us = Some(single_stream_base_remote_pts_us); + let one_sided_epoch = now + playout_delay; + state.playout_epoch = Some(one_sided_epoch); + state.pairing_anchor_deadline = Some(one_sided_epoch); + info!( + session_id, + ?kind, + single_stream_base_remote_pts_us, + "upstream media pairing window expired; continuing with one-sided playout" + ); + self.pairing_state_notify.notify_waiters(); + } + } + + let session_base_remote_pts_us = state.session_base_remote_pts_us.unwrap_or(remote_pts_us); + if remote_pts_us < session_base_remote_pts_us { + if 
upstream_timing_trace_enabled() + && (packet_count <= 10 || packet_count.is_multiple_of(300)) + { + info!( + session_id, + ?kind, + packet_count, + remote_pts_us, + session_base_remote_pts_us, + "upstream media packet dropped before the shared overlap base" + ); + } + return UpstreamPlanDecision::DropBeforeOverlap; + } + + let source_lag = source_lag_for_kind(&state, kind, remote_pts_us); + if source_lag > max_live_lag { + match kind { + UpstreamMediaKind::Camera => { + state.stale_video_drops = state.stale_video_drops.saturating_add(1); + state.video_freezes = state.video_freezes.saturating_add(1); + state.last_reason = "dropped stale video beyond max live lag".to_string(); + } + UpstreamMediaKind::Microphone => { + state.stale_audio_drops = state.stale_audio_drops.saturating_add(1); + state.last_reason = "dropped stale audio beyond max live lag".to_string(); + } + } + state.phase = UpstreamSyncPhase::Healing; + return UpstreamPlanDecision::DropStale("packet exceeded max live lag"); + } + + let mut local_pts_us = remote_pts_us.saturating_sub(session_base_remote_pts_us); + let last_slot = match kind { + UpstreamMediaKind::Camera => &mut state.last_video_local_pts_us, + UpstreamMediaKind::Microphone => &mut state.last_audio_local_pts_us, + }; + if let Some(last_pts_us) = *last_slot + && local_pts_us <= last_pts_us + { + local_pts_us = last_pts_us.saturating_add(min_step_us.max(1)); + } + *last_slot = Some(local_pts_us); + let audio_ahead_video_allowance_us = self.audio_ahead_video_allowance_us(); + if kind == UpstreamMediaKind::Camera + && state + .last_audio_presented_pts_us + .is_some_and(|audio_pts_us| { + video_is_too_far_behind_audio( + local_pts_us, + audio_pts_us, + audio_ahead_video_allowance_us, + ) + }) + { + state.skew_video_drops = state.skew_video_drops.saturating_add(1); + state.video_freezes = state.video_freezes.saturating_add(1); + state.phase = UpstreamSyncPhase::Healing; + state.last_reason = + "dropped video frame that was too far behind the audio master".to_string(); + return UpstreamPlanDecision::DropStale("video frame was too far behind audio master"); + } + let epoch = *state.playout_epoch.get_or_insert(pairing_deadline); + let sink_offset_us = self.playout_offset_us(kind); + let playout_delay = upstream_playout_delay().min(max_live_lag); + let mut due_at = + apply_playout_offset(epoch + Duration::from_micros(local_pts_us), sink_offset_us); + let mut late_by = now.checked_duration_since(due_at).unwrap_or_default(); + let reanchor_threshold = upstream_reanchor_late_threshold(playout_delay); + let intentional_future_wait_allowance = + Duration::from_micros(self.intentional_future_wait_allowance_us(kind)); + let max_future_wait = max_live_lag + .saturating_sub(source_lag) + .saturating_add(intentional_future_wait_allowance); + let due_future_wait = due_at.saturating_duration_since(now); + if late_by > reanchor_threshold || due_future_wait > max_future_wait { + let old_late_by = late_by; + let old_future_wait = due_future_wait; + let desired_delay = playout_delay.min(max_future_wait); + let desired_due_at = now + desired_delay; + let unoffset_due_at = apply_playout_offset(desired_due_at, -sink_offset_us); + let recovered_epoch = unoffset_due_at + .checked_sub(Duration::from_micros(local_pts_us)) + .unwrap_or(unoffset_due_at); + state.playout_epoch = Some(recovered_epoch); + state.pairing_anchor_deadline = Some(desired_due_at); + state.freshness_reanchors = state.freshness_reanchors.saturating_add(1); + state.phase = UpstreamSyncPhase::Healing; + state.last_reason = 
"reanchored upstream playhead to preserve freshness".to_string(); + due_at = apply_playout_offset( + recovered_epoch + Duration::from_micros(local_pts_us), + sink_offset_us, + ); + late_by = now.checked_duration_since(due_at).unwrap_or_default(); + info!( + session_id, + ?kind, + packet_count, + local_pts_us, + remote_pts_us, + old_late_by_ms = old_late_by.as_millis(), + old_future_wait_ms = old_future_wait.as_millis(), + recovery_buffer_ms = playout_delay.as_millis(), + reanchor_threshold_ms = reanchor_threshold.as_millis(), + max_live_lag_ms = max_live_lag.as_millis(), + source_lag_ms = source_lag.as_millis(), + "upstream media playhead reanchored to preserve freshness" + ); + } + let predicted_lag_at_playout = + source_lag.saturating_add(due_at.saturating_duration_since(now)); + if predicted_lag_at_playout > max_live_lag.saturating_add(intentional_future_wait_allowance) + { + match kind { + UpstreamMediaKind::Camera => { + state.stale_video_drops = state.stale_video_drops.saturating_add(1); + state.video_freezes = state.video_freezes.saturating_add(1); + state.last_reason = + "dropped video that would exceed max live lag at playout".to_string(); + } + UpstreamMediaKind::Microphone => { + state.stale_audio_drops = state.stale_audio_drops.saturating_add(1); + state.last_reason = + "dropped audio that would exceed max live lag at playout".to_string(); + } + } + state.phase = UpstreamSyncPhase::Healing; + return UpstreamPlanDecision::DropStale("packet would exceed max live lag at playout"); + } + if upstream_timing_trace_enabled() + && (packet_count <= 10 || packet_count.is_multiple_of(300)) + { + let playout_delay_us = due_at.saturating_duration_since(now).as_micros(); + let late_by_us = late_by.as_micros(); + info!( + session_id, + ?kind, + packet_count, + remote_pts_us, + session_base_remote_pts_us, + first_remote_for_kind, + remote_elapsed_us = remote_pts_us.saturating_sub(session_base_remote_pts_us), + local_pts_us, + playout_delay_us, + sink_offset_us, + late_by_us, + source_lag_us = source_lag.as_micros(), + "upstream media rebase sample" + ); + } + if kind == UpstreamMediaKind::Microphone { + self.audio_progress_notify.notify_waiters(); + } + UpstreamPlanDecision::Play(PlannedUpstreamPacket { + local_pts_us, + due_at, + late_by, + source_lag, + }) + } +} + +fn update_latest_remote_pts( + state: &mut UpstreamClockState, + kind: UpstreamMediaKind, + remote_pts_us: u64, +) { + let slot = match kind { + UpstreamMediaKind::Camera => &mut state.latest_camera_remote_pts_us, + UpstreamMediaKind::Microphone => &mut state.latest_microphone_remote_pts_us, + }; + *slot = Some((*slot).unwrap_or(remote_pts_us).max(remote_pts_us)); +} + +fn source_lag_for_kind( + state: &UpstreamClockState, + kind: UpstreamMediaKind, + remote_pts_us: u64, +) -> Duration { + let latest = match kind { + UpstreamMediaKind::Camera => state.latest_camera_remote_pts_us, + UpstreamMediaKind::Microphone => state.latest_microphone_remote_pts_us, + } + .unwrap_or(remote_pts_us); + Duration::from_micros(latest.saturating_sub(remote_pts_us)) +} + +fn video_is_too_far_behind_audio( + video_pts_us: u64, + audio_pts_us: u64, + audio_ahead_video_allowance_us: u64, +) -> bool { + let slack_us = (upstream_pairing_master_slack() + .as_micros() + .min(u64::MAX as u128) as u64) + .saturating_add(audio_ahead_video_allowance_us); + video_pts_us.saturating_add(slack_us) < audio_pts_us +} + +fn live_lag_us(state: &UpstreamClockState) -> Option { + let latest_audio = state.latest_microphone_remote_pts_us?; + let audio_playhead = 
state.last_audio_presented_pts_us?; + let base = state.session_base_remote_pts_us?; + Some(latest_audio.saturating_sub(base.saturating_add(audio_playhead))) +} + +fn us_to_ms(value: u64) -> f64 { + value as f64 / 1000.0 +} + +fn instant_delta_us(left: Instant, right: Instant) -> i128 { + if left >= right { + left.saturating_duration_since(right).as_micros() as i128 + } else { + -(right.saturating_duration_since(left).as_micros() as i128) + } +} + +const CLIENT_TIMING_PAIR_MAX_SEND_DELTA_US: u64 = 250_000; + +fn push_timing_sample( + samples: &mut std::collections::VecDeque, + sample: state::UpstreamTimingSample, +) { + if samples.len() >= state::TIMING_WINDOW_CAPACITY { + samples.pop_front(); + } + samples.push_back(sample); +} + +fn abs_delta_us(left: u64, right: u64) -> u64 { + left.max(right) - left.min(right) +} + +fn nearest_timing_sample_by_send( + samples: &std::collections::VecDeque, + send_pts_us: u64, +) -> Option { + samples + .iter() + .copied() + .min_by_key(|sample| abs_delta_us(sample.send_pts_us, send_pts_us)) +} + +fn record_client_timing_windows( + state: &mut UpstreamClockState, + kind: UpstreamMediaKind, + sample: state::UpstreamTimingSample, +) { + let paired = match kind { + UpstreamMediaKind::Camera => { + nearest_timing_sample_by_send(&state.recent_microphone_timing, sample.send_pts_us) + .map(|microphone| (sample, microphone)) + } + UpstreamMediaKind::Microphone => { + nearest_timing_sample_by_send(&state.recent_camera_timing, sample.send_pts_us) + .map(|camera| (camera, sample)) + } + }; + let Some((camera, microphone)) = paired else { + return; + }; + if abs_delta_us(camera.send_pts_us, microphone.send_pts_us) + > CLIENT_TIMING_PAIR_MAX_SEND_DELTA_US + { + return; + } + let client_capture_skew_ms = + (camera.capture_pts_us as i128 - microphone.capture_pts_us as i128) as f64 / 1000.0; + let client_send_skew_ms = + (camera.send_pts_us as i128 - microphone.send_pts_us as i128) as f64 / 1000.0; + let server_receive_skew_ms = + instant_delta_us(camera.received_at, microphone.received_at) as f64 / 1000.0; + + state.latest_paired_client_capture_skew_ms = Some(client_capture_skew_ms); + state.latest_paired_client_send_skew_ms = Some(client_send_skew_ms); + state.latest_paired_server_receive_skew_ms = Some(server_receive_skew_ms); + state + .client_capture_skew_window_ms + .push(client_capture_skew_ms); + state.client_send_skew_window_ms.push(client_send_skew_ms); + state + .server_receive_skew_window_ms + .push(server_receive_skew_ms); +} + +fn record_presentation_sample( + state: &mut UpstreamClockState, + kind: UpstreamMediaKind, + due_at: Instant, +) { + let sample = state::UpstreamPresentationSample { + due_at, + handed_at: Instant::now(), + }; + let late_ms = presentation_late_ms(sample).max(0.0); + match kind { + UpstreamMediaKind::Camera => { + state.latest_camera_presentation = Some(sample); + state.camera_sink_late_window_ms.push(late_ms); + } + UpstreamMediaKind::Microphone => { + state.latest_microphone_presentation = Some(sample); + state.microphone_sink_late_window_ms.push(late_ms); + } + } + if let Some(skew_ms) = latest_sink_handoff_skew_ms(state) { + state.sink_handoff_skew_window_ms.push(skew_ms); + } +} + +fn latest_sink_handoff_skew_ms(state: &UpstreamClockState) -> Option { + let (Some(camera), Some(microphone)) = ( + state.latest_camera_presentation, + state.latest_microphone_presentation, + ) else { + return None; + }; + let due_at_delta_ms = instant_delta_us(camera.due_at, microphone.due_at).abs() as f64 / 1000.0; + if due_at_delta_ms > 250.0 { + 
return None; + } + Some(instant_delta_us(camera.handed_at, microphone.handed_at) as f64 / 1000.0) +} + +fn presentation_late_ms(sample: state::UpstreamPresentationSample) -> f64 { + instant_delta_us(sample.handed_at, sample.due_at) as f64 / 1000.0 +} + +fn refresh_unpaired_pairing_anchor( + state: &mut UpstreamClockState, + kind: UpstreamMediaKind, + remote_pts_us: u64, + next_deadline: Instant, +) -> bool { + state.pairing_anchor_deadline = Some(next_deadline); + match kind { + UpstreamMediaKind::Camera if state.first_microphone_remote_pts_us.is_none() => { + state.first_camera_remote_pts_us = Some(remote_pts_us); + true + } + UpstreamMediaKind::Microphone if state.first_camera_remote_pts_us.is_none() => { + state.first_microphone_remote_pts_us = Some(remote_pts_us); + true + } + _ => false, + } +} + +impl Default for UpstreamMediaRuntime { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests; diff --git a/server/src/upstream_media_runtime/config.rs b/quarantine/upstream-media-v1/server/src/upstream_media_runtime/config.rs similarity index 100% rename from server/src/upstream_media_runtime/config.rs rename to quarantine/upstream-media-v1/server/src/upstream_media_runtime/config.rs diff --git a/server/src/upstream_media_runtime/lease_lifecycle.rs b/quarantine/upstream-media-v1/server/src/upstream_media_runtime/lease_lifecycle.rs similarity index 100% rename from server/src/upstream_media_runtime/lease_lifecycle.rs rename to quarantine/upstream-media-v1/server/src/upstream_media_runtime/lease_lifecycle.rs diff --git a/server/src/upstream_media_runtime/state.rs b/quarantine/upstream-media-v1/server/src/upstream_media_runtime/state.rs similarity index 100% rename from server/src/upstream_media_runtime/state.rs rename to quarantine/upstream-media-v1/server/src/upstream_media_runtime/state.rs diff --git a/server/src/upstream_media_runtime/tests.rs b/quarantine/upstream-media-v1/server/src/upstream_media_runtime/tests.rs similarity index 100% rename from server/src/upstream_media_runtime/tests.rs rename to quarantine/upstream-media-v1/server/src/upstream_media_runtime/tests.rs diff --git a/server/src/upstream_media_runtime/tests/async_wait.rs b/quarantine/upstream-media-v1/server/src/upstream_media_runtime/tests/async_wait.rs similarity index 100% rename from server/src/upstream_media_runtime/tests/async_wait.rs rename to quarantine/upstream-media-v1/server/src/upstream_media_runtime/tests/async_wait.rs diff --git a/server/src/upstream_media_runtime/tests/config.rs b/quarantine/upstream-media-v1/server/src/upstream_media_runtime/tests/config.rs similarity index 100% rename from server/src/upstream_media_runtime/tests/config.rs rename to quarantine/upstream-media-v1/server/src/upstream_media_runtime/tests/config.rs diff --git a/server/src/upstream_media_runtime/tests/lifecycle.rs b/quarantine/upstream-media-v1/server/src/upstream_media_runtime/tests/lifecycle.rs similarity index 100% rename from server/src/upstream_media_runtime/tests/lifecycle.rs rename to quarantine/upstream-media-v1/server/src/upstream_media_runtime/tests/lifecycle.rs diff --git a/server/src/upstream_media_runtime/tests/planning.rs b/quarantine/upstream-media-v1/server/src/upstream_media_runtime/tests/planning.rs similarity index 100% rename from server/src/upstream_media_runtime/tests/planning.rs rename to quarantine/upstream-media-v1/server/src/upstream_media_runtime/tests/planning.rs diff --git a/server/src/upstream_media_runtime/types.rs 
b/quarantine/upstream-media-v1/server/src/upstream_media_runtime/types.rs similarity index 100% rename from server/src/upstream_media_runtime/types.rs rename to quarantine/upstream-media-v1/server/src/upstream_media_runtime/types.rs diff --git a/server/Cargo.toml b/server/Cargo.toml index e13362d..72e7190 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.18.5" +version = "0.19.0" edition = "2024" autobins = false diff --git a/server/src/main/relay_service.rs b/server/src/main/relay_service.rs index 77bf54d..ccb99ea 100644 --- a/server/src/main/relay_service.rs +++ b/server/src/main/relay_service.rs @@ -1,133 +1,209 @@ #[cfg(not(coverage))] -#[derive(Debug)] -enum BundledUpstreamEvent { - Audio(AudioPacket), - Video(VideoPacket), -} - +const MEDIA_V2_DEFAULT_PLAYOUT_DELAY_MS: u64 = 20; #[cfg(not(coverage))] -impl BundledUpstreamEvent { - fn remote_pts_us(&self) -> u64 { - self.client_timing().capture_pts_us - } - - fn client_timing(&self) -> UpstreamClientTiming { - match self { - Self::Audio(packet) => audio_client_timing(packet), - Self::Video(packet) => video_client_timing(packet), - } - } - - fn kind(&self) -> UpstreamMediaKind { - match self { - Self::Audio(_) => UpstreamMediaKind::Microphone, - Self::Video(_) => UpstreamMediaKind::Camera, - } - } - - fn playout_order(&self) -> u8 { - match self { - Self::Audio(_) => 0, - Self::Video(_) => 1, - } - } -} +const MEDIA_V2_DEFAULT_MAX_LIVE_AGE_MS: u64 = 1_000; +#[cfg(not(coverage))] +const MEDIA_V2_MAX_MIXED_CAPTURE_SPAN_US: u64 = 250_000; #[cfg(not(coverage))] #[derive(Clone, Copy, Debug, Default)] -struct BundledPlayoutClock { - base_remote_pts_us: Option<u64>, - epoch: Option<tokio::time::Instant>, +struct MediaV2Clock { + base_capture_pts_us: Option<u64>, } #[cfg(not(coverage))] -impl BundledPlayoutClock { - fn ensure(&mut self, events: &[BundledUpstreamEvent]) -> Option<(u64, tokio::time::Instant)> { - if self.base_remote_pts_us.is_none() || self.epoch.is_none() { - let base = events.iter().map(BundledUpstreamEvent::remote_pts_us).min()?; - self.base_remote_pts_us = Some(base); - self.epoch = Some(tokio::time::Instant::now() + bundled_upstream_playout_delay()); - } - let base_remote_pts_us = self.base_remote_pts_us?; - let epoch = self.epoch?; - Some((base_remote_pts_us, epoch)) +impl MediaV2Clock { + fn local_pts_us(&mut self, capture_pts_us: u64) -> u64 { + let base = *self.base_capture_pts_us.get_or_insert(capture_pts_us); + capture_pts_us.saturating_sub(base) + } } } #[cfg(not(coverage))] -const BUNDLED_CAPTURE_BOUND_TOLERANCE_US: u64 = 50_000; -#[cfg(not(coverage))] -const BUNDLED_MIXED_CAPTURE_SPAN_DROP_US: u64 = 250_000; - -#[cfg(not(coverage))] -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -struct BundledTimingSummary { +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +struct MediaV2BundleFacts { has_audio: bool, has_video: bool, - min_event_pts_us: u64, - max_event_pts_us: u64, capture_span_us: u64, - capture_bounds_match: bool, - mixed_span_too_wide: bool, + max_queue_age_ms: u32, } #[cfg(not(coverage))] -fn bundled_events_are_mixed(events: &[BundledUpstreamEvent]) -> bool { - let has_audio = events.iter().any(|event| matches!(event, BundledUpstreamEvent::Audio(_))); - let has_video = events.iter().any(|event| matches!(event, BundledUpstreamEvent::Video(_))); - has_audio && has_video +#[derive(Clone, Copy, Debug)] +struct MediaV2HandoffSchedule { + audio_due_at: Option<tokio::time::Instant>, + video_due_at: Option<tokio::time::Instant>, + common_delay: Duration, + relative_audio_delay: Duration, +
relative_video_delay: Duration, } #[cfg(not(coverage))] -fn retain_startup_video_only(events: &mut Vec<BundledUpstreamEvent>) -> bool { - if !bundled_events_are_mixed(events) { - return false; +fn summarize_media_v2_bundle(bundle: &UpstreamMediaBundle) -> Option<MediaV2BundleFacts> { + let mut capture_start_us = u64::MAX; + let mut capture_end_us = 0_u64; + let mut max_queue_age_ms = 0_u32; + let has_video = bundle.video.is_some(); + let has_audio = !bundle.audio.is_empty(); + if let Some(video) = bundle.video.as_ref() { + let pts = packet_video_capture_pts_us(video); + capture_start_us = capture_start_us.min(pts); + capture_end_us = capture_end_us.max(pts); + max_queue_age_ms = max_queue_age_ms.max(video.client_queue_age_ms); } - events.retain(|event| matches!(event, BundledUpstreamEvent::Video(_))); - !events.is_empty() -} - -#[cfg(not(coverage))] -fn summarize_bundled_timing( - bundle: &UpstreamMediaBundle, - events: &[BundledUpstreamEvent], -) -> Option<BundledTimingSummary> { - let min_event_pts_us = events.iter().map(BundledUpstreamEvent::remote_pts_us).min()?; - let max_event_pts_us = events.iter().map(BundledUpstreamEvent::remote_pts_us).max()?; - let has_audio = events.iter().any(|event| matches!(event, BundledUpstreamEvent::Audio(_))); - let has_video = events.iter().any(|event| matches!(event, BundledUpstreamEvent::Video(_))); - let start_matches = bundle.capture_start_us == 0 - || abs_delta_us(bundle.capture_start_us, min_event_pts_us) - <= BUNDLED_CAPTURE_BOUND_TOLERANCE_US; - let end_matches = bundle.capture_end_us == 0 - || abs_delta_us(bundle.capture_end_us, max_event_pts_us) <= BUNDLED_CAPTURE_BOUND_TOLERANCE_US; - let capture_span_us = max_event_pts_us.saturating_sub(min_event_pts_us); - Some(BundledTimingSummary { + for audio in &bundle.audio { + let pts = packet_audio_capture_pts_us(audio); + capture_start_us = capture_start_us.min(pts); + capture_end_us = capture_end_us.max(pts); + max_queue_age_ms = max_queue_age_ms.max(audio.client_queue_age_ms); + } + if !has_audio && !has_video { + return None; + } + if capture_start_us == u64::MAX { + capture_start_us = 0; + } + Some(MediaV2BundleFacts { has_audio, has_video, - min_event_pts_us, - max_event_pts_us, - capture_span_us, - capture_bounds_match: start_matches && end_matches, - mixed_span_too_wide: has_audio - && has_video - && capture_span_us > BUNDLED_MIXED_CAPTURE_SPAN_DROP_US, + capture_span_us: capture_end_us.saturating_sub(capture_start_us), + max_queue_age_ms, }) } #[cfg(not(coverage))] -fn abs_delta_us(left: u64, right: u64) -> u64 { - left.max(right) - left.min(right) +fn packet_audio_capture_pts_us(packet: &AudioPacket) -> u64 { + if packet.client_capture_pts_us == 0 { + packet.pts + } else { + packet.client_capture_pts_us + } } #[cfg(not(coverage))] -fn bundled_upstream_playout_delay() -> Duration { - std::env::var("LESAVKA_UPSTREAM_BUNDLED_PLAYOUT_DELAY_MS") - .or_else(|_| std::env::var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS")) +fn packet_video_capture_pts_us(packet: &VideoPacket) -> u64 { + if packet.client_capture_pts_us == 0 { + packet.pts + } else { + packet.client_capture_pts_us + } +} + +#[cfg(not(coverage))] +fn media_v2_playout_delay() -> Duration { + std::env::var("LESAVKA_UPSTREAM_V2_PLAYOUT_DELAY_MS") + .or_else(|_| std::env::var("LESAVKA_UPSTREAM_BUNDLED_PLAYOUT_DELAY_MS")) .ok() .and_then(|value| value.trim().parse::<u64>().ok()) .map(Duration::from_millis) - .unwrap_or_else(|| Duration::from_millis(350)) + .unwrap_or_else(|| Duration::from_millis(MEDIA_V2_DEFAULT_PLAYOUT_DELAY_MS)) +} + +#[cfg(not(coverage))] +fn media_v2_max_live_age() -> Duration { +
std::env::var("LESAVKA_UPSTREAM_V2_MAX_LIVE_AGE_MS") + .or_else(|_| std::env::var("LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS")) + .ok() + .and_then(|value| value.trim().parse::<u64>().ok()) + .map(Duration::from_millis) + .unwrap_or_else(|| Duration::from_millis(MEDIA_V2_DEFAULT_MAX_LIVE_AGE_MS)) +} + +#[cfg(not(coverage))] +fn media_v2_handoff_schedule( + facts: MediaV2BundleFacts, + audio_offset_us: i64, + video_offset_us: i64, +) -> Option<MediaV2HandoffSchedule> { + let max_live_age = media_v2_max_live_age(); + let queue_age = Duration::from_millis(u64::from(facts.max_queue_age_ms)); + if queue_age >= max_live_age { + return None; + } + + let mut relative_audio_delay = Duration::ZERO; + let mut relative_video_delay = Duration::ZERO; + if facts.has_audio && facts.has_video { + let base_offset_us = audio_offset_us.min(video_offset_us); + relative_audio_delay = + Duration::from_micros(audio_offset_us.saturating_sub(base_offset_us) as u64); + relative_video_delay = + Duration::from_micros(video_offset_us.saturating_sub(base_offset_us) as u64); + } + let relative_span = relative_audio_delay.max(relative_video_delay); + let remaining_after_offset = max_live_age + .saturating_sub(queue_age) + .saturating_sub(relative_span); + let common_delay = media_v2_playout_delay().min(remaining_after_offset); + let now = tokio::time::Instant::now(); + Some(MediaV2HandoffSchedule { + audio_due_at: facts + .has_audio + .then_some(now + common_delay + relative_audio_delay), + video_due_at: facts + .has_video + .then_some(now + common_delay + relative_video_delay), + common_delay, + relative_audio_delay, + relative_video_delay, + }) +} + +#[cfg(not(coverage))] +async fn sleep_until_media_v2(due_at: tokio::time::Instant) { + if due_at > tokio::time::Instant::now() { + tokio::time::sleep_until(due_at).await; + } +} + +#[cfg(not(coverage))] +async fn push_media_v2_audio( + audio_packets: &mut Vec<AudioPacket>, + clock: &mut MediaV2Clock, + sink: &mut lesavka_server::audio::Voice, + upstream_media_rt: &UpstreamMediaRuntime, + due_at: tokio::time::Instant, +) { + sleep_until_media_v2(due_at).await; + for mut audio in audio_packets.drain(..) { + let capture_pts_us = packet_audio_capture_pts_us(&audio); + audio.pts = clock.local_pts_us(capture_pts_us); + sink.push(&audio); + upstream_media_rt.mark_audio_presented(audio.pts, due_at); + } +} + +#[cfg(not(coverage))] +async fn feed_media_v2_video( + video: Option<VideoPacket>, + clock: &mut MediaV2Clock, + relay: &Arc, + upstream_media_rt: &UpstreamMediaRuntime, + due_at: tokio::time::Instant, + video_presented_once: &mut bool, + rpc_id: u64, + session_id: u64, + camera_session_id: u64, +) { + let Some(mut video) = video else { + return; + }; + sleep_until_media_v2(due_at).await; + let capture_pts_us = packet_video_capture_pts_us(&video); + video.pts = clock.local_pts_us(capture_pts_us); + let presented_pts = video.pts; + relay.feed(video); + if !*video_presented_once { + info!( + rpc_id, + session_id, + camera_session_id, + pts = presented_pts, + "📦 first v2 bundled video frame fed to camera sink" + ); + *video_presented_once = true; + } + upstream_media_rt.mark_video_presented(presented_pts, due_at); } #[cfg(not(coverage))] @@ -235,7 +311,7 @@ impl Relay for Handler { Ok(Response::new(ReceiverStream::new(rx))) } - /// Accept client-bundled webcam and microphone packets on one upstream clock. + /// Accept client-bundled webcam and microphone packets on the v2 upstream path. 
async fn stream_webcam_media( &self, req: Request>, @@ -254,16 +330,14 @@ impl Relay for Handler { width = camera_cfg.width, height = camera_cfg.height, fps = camera_cfg.fps, - "📦 stream_webcam_media opened" + "📦 stream_webcam_media v2 opened" ); let (camera_session_id, relay, _relay_reused) = match self.camera_rt.activate(&camera_cfg).await { Ok(active) => active, Err(err) => { - self.upstream_media_rt - .close_camera(camera_lease.generation); - self.upstream_media_rt - .close_microphone(microphone_lease.generation); + self.upstream_media_rt.close_camera(camera_lease.generation); + self.upstream_media_rt.close_microphone(microphone_lease.generation); return Err(err); } }; @@ -272,53 +346,44 @@ impl Relay for Handler { .reserve_microphone_sink(microphone_lease.generation) .await else { - self.upstream_media_rt - .close_camera(camera_lease.generation); - self.upstream_media_rt - .close_microphone(microphone_lease.generation); + self.upstream_media_rt.close_camera(camera_lease.generation); + self.upstream_media_rt.close_microphone(microphone_lease.generation); return Err(Status::aborted( - "bundled webcam media stream superseded before microphone sink became available", + "v2 bundled media stream superseded before microphone sink became available", )); }; let uac_dev = std::env::var("LESAVKA_UAC_DEV").unwrap_or_else(|_| "hw:UAC2Gadget,0".into()); let mut sink = runtime_support::open_voice_with_retry(&uac_dev) .await .map_err(|e| { - self.upstream_media_rt - .close_camera(camera_lease.generation); - self.upstream_media_rt - .close_microphone(microphone_lease.generation); + self.upstream_media_rt.close_camera(camera_lease.generation); + self.upstream_media_rt.close_microphone(microphone_lease.generation); Status::internal(format!("{e:#}")) })?; let camera_rt = self.camera_rt.clone(); let upstream_media_rt = self.upstream_media_rt.clone(); - let frame_step_us = (1_000_000u64 / u64::from(camera_cfg.fps.max(1))).max(1); - let stale_drop_budget = upstream_stale_drop_budget(); let (tx, rx) = tokio::sync::mpsc::channel(1); tokio::spawn(async move { let _microphone_sink_permit = microphone_sink_permit; let mut inbound = req.into_inner(); - let mut clock = BundledPlayoutClock::default(); + let mut clock = MediaV2Clock::default(); let mut last_bundle_session_id = None; let mut last_bundle_seq = None; let mut video_presented_once = false; let mut outcome = "aborted"; - 'bundled_loop: loop { - let bundle = match inbound.next().await { - Some(Ok(bundle)) => bundle, - Some(Err(err)) => { + + while let Some(bundle_result) = inbound.next().await { + let mut bundle = match bundle_result { + Ok(bundle) => bundle, + Err(err) => { warn!( rpc_id, session_id = camera_lease.session_id, - "📦 stream_webcam_media inbound error before clean EOF: {err}" + "📦 stream_webcam_media v2 inbound error before clean EOF: {err}" ); break; } - None => { - outcome = "closed"; - break; - } }; if !camera_rt.is_active(camera_session_id) || !upstream_media_rt.is_camera_active(camera_lease.generation) @@ -327,31 +392,14 @@ impl Relay for Handler { outcome = "superseded"; break; } - let mut events = Vec::with_capacity(bundle.audio.len() + 1); - if let Some(video) = bundle.video.clone() { - upstream_media_rt - .record_client_timing(UpstreamMediaKind::Camera, video_client_timing(&video)); - events.push(BundledUpstreamEvent::Video(video)); - } - for audio in &bundle.audio { - upstream_media_rt.record_client_timing( - UpstreamMediaKind::Microphone, - audio_client_timing(audio), - ); - events.push(BundledUpstreamEvent::Audio(audio.clone())); 
- } - if events.is_empty() { - continue; - } - events.sort_by_key(|event| (event.remote_pts_us(), event.playout_order())); if last_bundle_session_id.is_some_and(|session_id| session_id != bundle.session_id) { warn!( rpc_id, previous_session_id = last_bundle_session_id.unwrap_or_default(), next_session_id = bundle.session_id, - "📦 bundled upstream client session changed inside one gRPC stream; resetting playout epoch" + "📦 v2 bundled client session changed; resetting local media clock" ); - clock = BundledPlayoutClock::default(); + clock = MediaV2Clock::default(); last_bundle_seq = None; } last_bundle_session_id = Some(bundle.session_id); @@ -362,204 +410,143 @@ impl Relay for Handler { client_bundle_session_id = bundle.session_id, bundle_seq = bundle.seq, previous_bundle_seq = last_bundle_seq.unwrap_or_default(), - "📦 bundled upstream packet sequence moved backwards; dropping duplicate/stale bundle" + "📦 v2 dropping duplicate/stale bundled packet" ); continue; } last_bundle_seq = Some(bundle.seq); - let Some(timing_summary) = summarize_bundled_timing(&bundle, &events) else { + + let Some(facts) = summarize_media_v2_bundle(&bundle) else { continue; }; - if !timing_summary.capture_bounds_match { + if facts.has_audio && facts.has_video && facts.capture_span_us > MEDIA_V2_MAX_MIXED_CAPTURE_SPAN_US { warn!( rpc_id, session_id = camera_lease.session_id, client_bundle_session_id = bundle.session_id, bundle_seq = bundle.seq, - capture_start_us = bundle.capture_start_us, - capture_end_us = bundle.capture_end_us, - event_min_us = timing_summary.min_event_pts_us, - event_max_us = timing_summary.max_event_pts_us, - "📦 bundled upstream capture bounds disagreed with packet timing; using packet sidecar timing" + span_ms = facts.capture_span_us / 1000, + "📦 v2 dropping mixed bundle with impossible A/V capture span" + ); + continue; + } + for audio in &bundle.audio { + upstream_media_rt.record_client_timing( + UpstreamMediaKind::Microphone, + audio_client_timing(audio), ); } - if timing_summary.mixed_span_too_wide { - if !video_presented_once && retain_startup_video_only(&mut events) { - warn!( - rpc_id, - session_id = camera_lease.session_id, - client_bundle_session_id = bundle.session_id, - bundle_seq = bundle.seq, - span_ms = timing_summary.capture_span_us / 1000, - "📦 bundled startup A/V span is too wide; dropping audio but feeding video to prime the camera device" - ); - } else { - warn!( - rpc_id, - session_id = camera_lease.session_id, - client_bundle_session_id = bundle.session_id, - bundle_seq = bundle.seq, - span_ms = timing_summary.capture_span_us / 1000, - "📦 bundled mixed A/V capture span is too wide; dropping the bundle to protect sync" - ); - continue; - } + if let Some(video) = bundle.video.as_ref() { + upstream_media_rt.record_client_timing( + UpstreamMediaKind::Camera, + video_client_timing(video), + ); } - let Some((base_remote_pts_us, epoch)) = clock.ensure(&events) else { + let (video_offset_us, audio_offset_us) = upstream_media_rt.playout_offsets(); + let Some(schedule) = media_v2_handoff_schedule(facts, audio_offset_us, video_offset_us) else { + if facts.has_video { + upstream_media_rt.record_video_freeze( + "v2 dropped stale bundled A/V before UVC/UAC handoff", + ); + } + warn!( + rpc_id, + session_id = camera_lease.session_id, + client_bundle_session_id = bundle.session_id, + bundle_seq = bundle.seq, + max_queue_age_ms = facts.max_queue_age_ms, + "📦 v2 dropping whole bundle because it is already outside the freshness budget" + ); continue; }; - let mut mixed_bundle = 
bundled_events_are_mixed(&events); - let startup_video_priming = !video_presented_once - && events - .iter() - .any(|event| matches!(event, BundledUpstreamEvent::Video(_))); - let mut planned_events = Vec::with_capacity(events.len()); - let mut drop_mixed_bundle = false; - let mut dropped_audio_for_startup_video = false; - for event in events { - let kind = event.kind(); - let min_step_us = match kind { - UpstreamMediaKind::Camera => frame_step_us, - UpstreamMediaKind::Microphone => 1, - }; - let plan = match upstream_media_rt.plan_bundled_pts( - kind, - event.remote_pts_us(), - min_step_us, - base_remote_pts_us, - epoch, - ) { - lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan, - lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropBeforeOverlap => { - if startup_video_priming && kind == UpstreamMediaKind::Microphone { - dropped_audio_for_startup_video = true; - continue; - } - if mixed_bundle { - drop_mixed_bundle = true; - } - continue; - } - lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropStale(reason) => { - tracing::warn!( - rpc_id, - session_id = camera_lease.session_id, - ?kind, - reason, - "📦 bundled upstream packet dropped by freshness planner" - ); - if startup_video_priming && kind == UpstreamMediaKind::Microphone { - dropped_audio_for_startup_video = true; - continue; - } - if mixed_bundle { - drop_mixed_bundle = true; - } - continue; - } - lesavka_server::upstream_media_runtime::UpstreamPlanDecision::AwaitingPair => continue, - lesavka_server::upstream_media_runtime::UpstreamPlanDecision::StartupFailed(reason) => { - tracing::error!( - rpc_id, - session_id = camera_lease.session_id, - reason, - "📦 bundled upstream startup failed" - ); - break 'bundled_loop; - } - }; - if plan.late_by > stale_drop_budget { - tracing::warn!( + debug!( + rpc_id, + session_id = camera_lease.session_id, + client_bundle_session_id = bundle.session_id, + bundle_seq = bundle.seq, + max_queue_age_ms = facts.max_queue_age_ms, + common_delay_ms = schedule.common_delay.as_millis(), + relative_audio_delay_ms = schedule.relative_audio_delay.as_millis(), + relative_video_delay_ms = schedule.relative_video_delay.as_millis(), + audio_offset_us, + video_offset_us, + "📦 v2 scheduled bundled UAC/UVC handoff from one capture clock" + ); + + match (schedule.audio_due_at, schedule.video_due_at) { + (Some(audio_due_at), Some(video_due_at)) if audio_due_at <= video_due_at => { + push_media_v2_audio( + &mut bundle.audio, + &mut clock, + &mut sink, + &upstream_media_rt, + audio_due_at, + ) + .await; + feed_media_v2_video( + bundle.video.take(), + &mut clock, + &relay, + &upstream_media_rt, + video_due_at, + &mut video_presented_once, rpc_id, - session_id = camera_lease.session_id, - ?kind, - late_by_ms = plan.late_by.as_millis(), - pts = plan.local_pts_us, - "📦 bundled upstream packet dropped after missing freshness budget" - ); - if startup_video_priming && kind == UpstreamMediaKind::Microphone { - dropped_audio_for_startup_video = true; - continue; - } - if mixed_bundle { - drop_mixed_bundle = true; - } - continue; + camera_lease.session_id, + camera_session_id, + ) + .await; } - planned_events.push((event, plan)); - } - if dropped_audio_for_startup_video { - mixed_bundle = false; - warn!( - rpc_id, - session_id = camera_lease.session_id, - client_bundle_session_id = bundle.session_id, - bundle_seq = bundle.seq, - "📦 dropped startup audio from a bad bundled packet but kept video-only playout so the camera can start" - ); - } - if drop_mixed_bundle { - 
warn!( - rpc_id, - session_id = camera_lease.session_id, - client_bundle_session_id = bundle.session_id, - bundle_seq = bundle.seq, - "📦 dropping mixed A/V bundle coherently because one side failed sync/freshness planning" - ); - continue; - } - for (event, plan) in planned_events { - let kind = event.kind(); - tokio::time::sleep_until(plan.due_at).await; - let actual_late_by = tokio::time::Instant::now() - .checked_duration_since(plan.due_at) - .unwrap_or_default(); - if actual_late_by > stale_drop_budget { - tracing::warn!( + (Some(audio_due_at), Some(video_due_at)) => { + feed_media_v2_video( + bundle.video.take(), + &mut clock, + &relay, + &upstream_media_rt, + video_due_at, + &mut video_presented_once, rpc_id, - session_id = camera_lease.session_id, - ?kind, - late_by_ms = actual_late_by.as_millis(), - pts = plan.local_pts_us, - "📦 bundled upstream packet dropped after waking too late" - ); - if mixed_bundle { - warn!( - rpc_id, - session_id = camera_lease.session_id, - client_bundle_session_id = bundle.session_id, - bundle_seq = bundle.seq, - "📦 stopping the rest of this mixed bundle after a late wake to avoid asymmetric playout" - ); - break; - } - continue; + camera_lease.session_id, + camera_session_id, + ) + .await; + push_media_v2_audio( + &mut bundle.audio, + &mut clock, + &mut sink, + &upstream_media_rt, + audio_due_at, + ) + .await; } - match event { - BundledUpstreamEvent::Audio(mut packet) => { - packet.pts = plan.local_pts_us; - sink.push(&packet); - upstream_media_rt.mark_audio_presented(packet.pts, plan.due_at); - } - BundledUpstreamEvent::Video(mut packet) => { - packet.pts = plan.local_pts_us; - let presented_pts = packet.pts; - relay.feed(packet); - if !video_presented_once { - info!( - rpc_id, - session_id = camera_lease.session_id, - camera_session_id, - pts = presented_pts, - "📦 first bundled video frame fed to camera sink" - ); - video_presented_once = true; - } - upstream_media_rt.mark_video_presented(presented_pts, plan.due_at); - } + (Some(audio_due_at), None) => { + push_media_v2_audio( + &mut bundle.audio, + &mut clock, + &mut sink, + &upstream_media_rt, + audio_due_at, + ) + .await; } + (None, Some(video_due_at)) => { + feed_media_v2_video( + bundle.video.take(), + &mut clock, + &relay, + &upstream_media_rt, + video_due_at, + &mut video_presented_once, + rpc_id, + camera_lease.session_id, + camera_session_id, + ) + .await; + } + (None, None) => {} } } + + outcome = if outcome == "aborted" { "closed" } else { outcome }; sink.finish(); upstream_media_rt.close_camera(camera_lease.generation); upstream_media_rt.close_microphone(microphone_lease.generation); @@ -568,7 +555,7 @@ impl Relay for Handler { session_id = camera_lease.session_id, camera_session_id, outcome, - "📦 stream_webcam_media lifecycle ended" + "📦 stream_webcam_media v2 lifecycle ended" ); tx.send(Ok(Empty {})).await.ok(); Ok::<(), Status>(()) diff --git a/server/src/main/relay_service_tests.rs b/server/src/main/relay_service_tests.rs index ed8032b..01642f6 100644 --- a/server/src/main/relay_service_tests.rs +++ b/server/src/main/relay_service_tests.rs @@ -2,13 +2,12 @@ #[allow(clippy::items_after_test_module)] mod tests { use super::{ - BundledUpstreamEvent, UpstreamStreamCleanup, bundled_events_are_mixed, - retain_freshest_audio_packet, retain_freshest_video_packet, retain_startup_video_only, - summarize_bundled_timing, + MediaV2BundleFacts, UpstreamStreamCleanup, media_v2_handoff_schedule, + retain_freshest_audio_packet, retain_freshest_video_packet, summarize_media_v2_bundle, }; use 
lesavka_common::lesavka::{AudioPacket, UpstreamMediaBundle, VideoPacket}; use lesavka_server::upstream_media_runtime::{ - UpstreamMediaKind, UpstreamMediaRuntime, UpstreamClientTiming, + UpstreamClientTiming, UpstreamMediaKind, UpstreamMediaRuntime, }; use std::sync::Arc; @@ -99,69 +98,84 @@ mod tests { } #[test] - fn bundled_event_timing_uses_client_capture_sidecar_not_packet_pts() { - let video = BundledUpstreamEvent::Video(VideoPacket { + fn media_v2_bundle_summary_uses_client_capture_sidecar_not_packet_pts() { + let bundle = UpstreamMediaBundle { + capture_start_us: 1, + capture_end_us: 2, + video: Some(VideoPacket { + pts: 9_999_000, + client_capture_pts_us: 1_000_000, + client_send_pts_us: 1_001_000, + client_queue_age_ms: 12, + ..Default::default() + }), + audio: vec![AudioPacket { + pts: 8_888_000, + client_capture_pts_us: 1_020_000, + client_send_pts_us: 1_021_000, + client_queue_age_ms: 34, + ..Default::default() + }], + ..Default::default() + }; + + let summary = summarize_media_v2_bundle(&bundle).expect("summary"); + + assert!(summary.has_video); + assert!(summary.has_audio); + assert_eq!(summary.capture_span_us, 20_000); + assert_eq!(summary.max_queue_age_ms, 34); + } + + #[test] + fn media_v2_schedule_offsets_outputs_without_creating_split_planner() { + let facts = MediaV2BundleFacts { + has_audio: true, + has_video: true, + capture_span_us: 20_000, + max_queue_age_ms: 0, + }; + + let schedule = + media_v2_handoff_schedule(facts, 0, 979_000).expect("fresh schedule"); + let audio_due = schedule.audio_due_at.expect("audio due"); + let video_due = schedule.video_due_at.expect("video due"); + + assert_eq!(schedule.relative_audio_delay.as_millis(), 0); + assert_eq!(schedule.relative_video_delay.as_millis(), 979); + assert_eq!(video_due.duration_since(audio_due).as_millis(), 979); + assert!(schedule.common_delay.as_millis() <= 21); + } + + #[test] + fn media_v2_schedule_refuses_bundles_already_past_freshness_budget() { + let facts = MediaV2BundleFacts { + has_audio: true, + has_video: true, + capture_span_us: 20_000, + max_queue_age_ms: 1_000, + }; + + assert!(media_v2_handoff_schedule(facts, 0, 0).is_none()); + } + + #[test] + fn legacy_bundled_event_timing_example_documents_quarantined_v1_behavior() { + let video = VideoPacket { pts: 9_999_000, client_capture_pts_us: 1_000_000, client_send_pts_us: 1_001_000, ..Default::default() - }); - let audio = BundledUpstreamEvent::Audio(AudioPacket { + }; + let audio = AudioPacket { pts: 8_888_000, client_capture_pts_us: 1_020_000, client_send_pts_us: 1_021_000, ..Default::default() - }); - - assert_eq!(video.remote_pts_us(), 1_000_000); - assert_eq!(audio.remote_pts_us(), 1_020_000); - } - - #[test] - fn bundled_timing_summary_flags_bad_bounds_and_wide_mixed_span() { - let events = vec![ - BundledUpstreamEvent::Video(VideoPacket { - client_capture_pts_us: 1_000_000, - ..Default::default() - }), - BundledUpstreamEvent::Audio(AudioPacket { - client_capture_pts_us: 1_400_000, - ..Default::default() - }), - ]; - let bundle = UpstreamMediaBundle { - capture_start_us: 100, - capture_end_us: 200, - ..Default::default() }; - let summary = summarize_bundled_timing(&bundle, &events).expect("timing summary"); - - assert!(bundled_events_are_mixed(&events)); - assert_eq!(summary.capture_span_us, 400_000); - assert!(!summary.capture_bounds_match); - assert!(summary.mixed_span_too_wide); - } - - #[test] - fn startup_video_retention_drops_audio_from_bad_mixed_bundle() { - let mut events = vec![ - BundledUpstreamEvent::Audio(AudioPacket { - 
client_capture_pts_us: 1_000_000, - ..Default::default() - }), - BundledUpstreamEvent::Video(VideoPacket { - client_capture_pts_us: 1_500_000, - ..Default::default() - }), - ]; - - assert!(bundled_events_are_mixed(&events)); - assert!(retain_startup_video_only(&mut events)); - - assert!(!bundled_events_are_mixed(&events)); - assert_eq!(events.len(), 1); - assert!(matches!(events[0], BundledUpstreamEvent::Video(_))); + assert_eq!(video.client_capture_pts_us, 1_000_000); + assert_eq!(audio.client_capture_pts_us, 1_020_000); } #[test] diff --git a/server/src/upstream_media_runtime.rs b/server/src/upstream_media_runtime.rs index 4e3b918..89dbeac 100644 --- a/server/src/upstream_media_runtime.rs +++ b/server/src/upstream_media_runtime.rs @@ -1,51 +1,208 @@ #![forbid(unsafe_code)] +use std::collections::VecDeque; use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; -use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore}; +use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio::time::Instant; -use tracing::info; -mod config; -mod state; -mod types; +const TIMING_WINDOW_CAPACITY: usize = 240; +const FACTORY_MJPEG_AUDIO_OFFSET_US: i64 = 0; +const FACTORY_MJPEG_VIDEO_OFFSET_US: i64 = 1_090_000; -use config::{ - apply_playout_offset, upstream_audio_master_wait_grace, upstream_bundled_playout_delay, - upstream_bundled_playout_offset_override_us, upstream_camera_startup_grace_us, - upstream_max_live_lag, upstream_pairing_master_slack, upstream_playout_delay, - upstream_playout_offset_us, upstream_reanchor_late_threshold, upstream_require_paired_startup, - upstream_startup_timeout, upstream_timing_trace_enabled, -}; -use state::{UpstreamClockState, UpstreamSyncPhase}; -pub use types::{ - PlannedUpstreamPacket, UpstreamClientTiming, UpstreamMediaKind, UpstreamPlanDecision, - UpstreamPlannerSnapshot, UpstreamStreamLease, -}; +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum UpstreamMediaKind { + Camera, + Microphone, +} + +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +pub struct UpstreamClientTiming { + pub capture_pts_us: u64, + pub send_pts_us: u64, + pub queue_depth: u32, + pub queue_age_ms: u32, +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct UpstreamStreamLease { + pub session_id: u64, + pub generation: u64, +} + +#[derive(Clone, Copy, Debug)] +pub struct PlannedUpstreamPacket { + pub local_pts_us: u64, + pub due_at: Instant, + pub late_by: Duration, + pub source_lag: Duration, +} + +#[derive(Clone, Copy, Debug)] +pub enum UpstreamPlanDecision { + AwaitingPair, + DropBeforeOverlap, + DropStale(&'static str), + StartupFailed(&'static str), + Play(PlannedUpstreamPacket), +} + +#[derive(Clone, Debug)] +pub struct UpstreamPlannerSnapshot { + pub session_id: u64, + pub phase: &'static str, + pub latest_camera_remote_pts_us: Option<u64>, + pub latest_microphone_remote_pts_us: Option<u64>, + pub last_video_presented_pts_us: Option<u64>, + pub last_audio_presented_pts_us: Option<u64>, + pub live_lag_ms: Option<f64>, + pub planner_skew_ms: Option<f64>, + pub stale_audio_drops: u64, + pub stale_video_drops: u64, + pub skew_video_drops: u64, + pub freshness_reanchors: u64, + pub startup_timeouts: u64, + pub video_freezes: u64, + pub last_reason: String, + pub client_capture_skew_ms: Option<f64>, + pub client_send_skew_ms: Option<f64>, + pub server_receive_skew_ms: Option<f64>, + pub camera_client_queue_age_ms: Option<f64>, + pub microphone_client_queue_age_ms: Option<f64>, + pub camera_server_receive_age_ms: Option<f64>, + pub microphone_server_receive_age_ms: Option<f64>, + 
pub client_capture_abs_skew_p95_ms: Option<f64>, + pub client_send_abs_skew_p95_ms: Option<f64>, + pub server_receive_abs_skew_p95_ms: Option<f64>, + pub camera_client_queue_age_p95_ms: Option<f64>, + pub microphone_client_queue_age_p95_ms: Option<f64>, + pub sink_handoff_skew_ms: Option<f64>, + pub sink_handoff_abs_skew_p95_ms: Option<f64>, + pub camera_sink_late_ms: Option<f64>, + pub microphone_sink_late_ms: Option<f64>, + pub camera_sink_late_p95_ms: Option<f64>, + pub microphone_sink_late_p95_ms: Option<f64>, + pub client_timing_window_samples: u64, + pub sink_handoff_window_samples: u64, +} + +#[derive(Clone, Copy, Debug)] +struct TimingSample { + capture_pts_us: u64, + send_pts_us: u64, + queue_age_ms: u32, + received_at: Instant, +} + +#[derive(Clone, Copy, Debug)] +struct PresentationSample { + due_at: Instant, + handed_at: Instant, +} + +#[derive(Debug, Default)] +struct ScalarWindow { + values: VecDeque<f64>, +} + +impl ScalarWindow { + fn push(&mut self, value: f64) { + if self.values.len() >= TIMING_WINDOW_CAPACITY { + self.values.pop_front(); + } + self.values.push_back(value); + } + + fn p95(&self) -> Option<f64> { + percentile(self.values.iter().copied(), 0.95) + } + + fn p95_abs(&self) -> Option<f64> { + percentile(self.values.iter().map(|value| value.abs()), 0.95) + } + + fn len(&self) -> usize { + self.values.len() + } +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +enum UpstreamSyncPhase { + Acquiring, + Syncing, + Live, + Healing, +} + +impl Default for UpstreamSyncPhase { + fn default() -> Self { + Self::Acquiring + } +} + +impl UpstreamSyncPhase { + fn as_str(self) -> &'static str { + match self { + Self::Acquiring => "acquiring", + Self::Syncing => "syncing", + Self::Live => "live", + Self::Healing => "healing", + } + } +} + +#[derive(Debug, Default)] +struct RuntimeState { + session_id: u64, + active_camera_generation: Option<u64>, + active_microphone_generation: Option<u64>, + phase: UpstreamSyncPhase, + session_started_at: Option<Instant>, + base_remote_pts_us: Option<u64>, + playout_epoch: Option<Instant>, + latest_camera_remote_pts_us: Option<u64>, + latest_microphone_remote_pts_us: Option<u64>, + last_video_local_pts_us: Option<u64>, + last_audio_local_pts_us: Option<u64>, + last_video_presented_pts_us: Option<u64>, + last_audio_presented_pts_us: Option<u64>, + latest_camera_timing: Option<TimingSample>, + latest_microphone_timing: Option<TimingSample>, + latest_camera_presentation: Option<PresentationSample>, + latest_microphone_presentation: Option<PresentationSample>, + latest_paired_client_capture_skew_ms: Option<f64>, + latest_paired_client_send_skew_ms: Option<f64>, + latest_paired_server_receive_skew_ms: Option<f64>, + client_capture_skew_window_ms: ScalarWindow, + client_send_skew_window_ms: ScalarWindow, + server_receive_skew_window_ms: ScalarWindow, + camera_client_queue_age_window_ms: ScalarWindow, + microphone_client_queue_age_window_ms: ScalarWindow, + sink_handoff_skew_window_ms: ScalarWindow, + camera_sink_late_window_ms: ScalarWindow, + microphone_sink_late_window_ms: ScalarWindow, + stale_audio_drops: u64, + stale_video_drops: u64, + skew_video_drops: u64, + freshness_reanchors: u64, + startup_timeouts: u64, + video_freezes: u64, + last_reason: String, +} -/// Coordinate upstream stream ownership and keep audio/video on one timeline. -/// -/// Inputs: stream-open/close events plus remote packet timestamps. -/// Outputs: active-stream leases and rebased local PTS values. -/// Why: live calls need one current webcam owner, one current microphone owner, -/// and one shared media clock so reconnects do not leave old sinks alive or let -/// audio/video drift onto separate timing islands. 
#[derive(Debug)] pub struct UpstreamMediaRuntime { next_session_id: AtomicU64, next_camera_generation: AtomicU64, next_microphone_generation: AtomicU64, microphone_sink_gate: Arc<Semaphore>, - pairing_state_notify: Arc<Notify>, - audio_progress_notify: Arc<Notify>, camera_playout_offset_us: AtomicI64, microphone_playout_offset_us: AtomicI64, - state: Mutex<UpstreamClockState>, + state: Mutex<RuntimeState>, } impl UpstreamMediaRuntime { - /// Build an empty upstream runtime. #[must_use] pub fn new() -> Self { Self { @@ -53,19 +210,14 @@ impl UpstreamMediaRuntime { next_camera_generation: AtomicU64::new(0), next_microphone_generation: AtomicU64::new(0), microphone_sink_gate: Arc::new(Semaphore::new(1)), - pairing_state_notify: Arc::new(Notify::new()), - audio_progress_notify: Arc::new(Notify::new()), - camera_playout_offset_us: AtomicI64::new(upstream_playout_offset_us( - UpstreamMediaKind::Camera, - )), - microphone_playout_offset_us: AtomicI64::new(upstream_playout_offset_us( + camera_playout_offset_us: AtomicI64::new(playout_offset_us(UpstreamMediaKind::Camera)), + microphone_playout_offset_us: AtomicI64::new(playout_offset_us( UpstreamMediaKind::Microphone, )), - state: Mutex::new(UpstreamClockState::default()), + state: Mutex::new(RuntimeState::default()), } } - /// Apply live upstream playout offsets without restarting the relay. pub fn set_playout_offsets(&self, camera_offset_us: i64, microphone_offset_us: i64) { self.camera_playout_offset_us .store(camera_offset_us, Ordering::Relaxed); @@ -73,7 +225,6 @@ impl UpstreamMediaRuntime { .store(microphone_offset_us, Ordering::Relaxed); } - /// Return `(camera_offset_us, microphone_offset_us)` currently used for live playout. #[must_use] pub fn playout_offsets(&self) -> (i64, i64) { ( @@ -82,65 +233,55 @@ impl UpstreamMediaRuntime { ) } - fn playout_offset_us(&self, kind: UpstreamMediaKind) -> i64 { - match kind { - UpstreamMediaKind::Camera => self.camera_playout_offset_us.load(Ordering::Relaxed), - UpstreamMediaKind::Microphone => { - self.microphone_playout_offset_us.load(Ordering::Relaxed) - } - } + #[must_use] + pub fn activate_camera(&self) -> UpstreamStreamLease { + self.activate(UpstreamMediaKind::Camera) } - fn positive_audio_delay_allowance_us(&self) -> u64 { - let camera_offset_us = self.camera_playout_offset_us.load(Ordering::Relaxed); - let microphone_offset_us = self.microphone_playout_offset_us.load(Ordering::Relaxed); - microphone_offset_us.saturating_sub(camera_offset_us).max(0) as u64 + #[must_use] + pub fn activate_microphone(&self) -> UpstreamStreamLease { + self.activate(UpstreamMediaKind::Microphone) } - fn audio_ahead_video_allowance_us(&self) -> u64 { - let camera_offset_us = self.camera_playout_offset_us.load(Ordering::Relaxed); - let microphone_offset_us = self.microphone_playout_offset_us.load(Ordering::Relaxed); - camera_offset_us.saturating_sub(microphone_offset_us).max(0) as u64 + pub async fn reserve_microphone_sink(&self, generation: u64) -> Option<OwnedSemaphorePermit> { + let permit = self + .microphone_sink_gate + .clone() + .acquire_owned() + .await + .ok()?; + self.is_microphone_active(generation).then_some(permit) } - fn intentional_future_wait_allowance_us(&self, kind: UpstreamMediaKind) -> u64 { - match kind { - UpstreamMediaKind::Camera => self.audio_ahead_video_allowance_us(), - UpstreamMediaKind::Microphone => self.positive_audio_delay_allowance_us(), - } + #[must_use] + pub fn is_camera_active(&self, generation: u64) -> bool { + self.is_active(UpstreamMediaKind::Camera, generation) } - fn raw_bundled_playout_offset_us(&self, kind: UpstreamMediaKind) -> i64 { -
upstream_bundled_playout_offset_override_us(kind) - .unwrap_or_else(|| self.playout_offset_us(kind)) + #[must_use] + pub fn is_microphone_active(&self, generation: u64) -> bool { + self.is_active(UpstreamMediaKind::Microphone, generation) } - fn bundled_playout_offset_us(&self, kind: UpstreamMediaKind) -> i64 { - self.raw_bundled_playout_offset_us(kind) + pub fn close_camera(&self, generation: u64) { + self.close(UpstreamMediaKind::Camera, generation); } - /// Mark one audio chunk as actually handed to the UAC sink. - pub fn mark_audio_presented(&self, local_pts_us: u64, due_at: Instant) { - let mut state = self - .state - .lock() - .expect("upstream media state mutex poisoned"); - record_presentation_sample(&mut state, UpstreamMediaKind::Microphone, due_at); - state.last_audio_presented_pts_us = Some(local_pts_us); - if state.phase != UpstreamSyncPhase::Failed { - state.phase = UpstreamSyncPhase::Live; - state.last_reason = "audio-master playhead flowing".to_string(); - } - self.audio_progress_notify.notify_waiters(); + pub fn close_microphone(&self, generation: u64) { + self.close(UpstreamMediaKind::Microphone, generation); + } + + pub fn soft_recover_microphone(&self) { + let lease = self.activate_microphone(); + self.close_microphone(lease.generation); } - /// Record client-side timing facts for one packet as it arrives at the server. pub fn record_client_timing(&self, kind: UpstreamMediaKind, timing: UpstreamClientTiming) { let mut state = self .state .lock() .expect("upstream media state mutex poisoned"); - let sample = state::UpstreamTimingSample { + let sample = TimingSample { capture_pts_us: timing.capture_pts_us, send_pts_us: timing.send_pts_us, queue_age_ms: timing.queue_age_ms, @@ -149,68 +290,61 @@ impl UpstreamMediaRuntime { match kind { UpstreamMediaKind::Camera => { state.latest_camera_timing = Some(sample); - push_timing_sample(&mut state.recent_camera_timing, sample); + state.latest_camera_remote_pts_us = Some(timing.capture_pts_us); state .camera_client_queue_age_window_ms .push(f64::from(timing.queue_age_ms)); } UpstreamMediaKind::Microphone => { state.latest_microphone_timing = Some(sample); - push_timing_sample(&mut state.recent_microphone_timing, sample); + state.latest_microphone_remote_pts_us = Some(timing.capture_pts_us); state .microphone_client_queue_age_window_ms .push(f64::from(timing.queue_age_ms)); } } - record_client_timing_windows(&mut state, kind, sample); + record_timing_pair(&mut state); + } + + pub fn mark_audio_presented(&self, local_pts_us: u64, due_at: Instant) { + let mut state = self + .state + .lock() + .expect("upstream media state mutex poisoned"); + state.last_audio_presented_pts_us = Some(local_pts_us); + record_presentation(&mut state, UpstreamMediaKind::Microphone, due_at); + state.phase = UpstreamSyncPhase::Live; + state.last_reason = "v2 audio handed to UAC".to_string(); } - /// Mark one video frame as actually handed to the UVC/HDMI sink. 
pub fn mark_video_presented(&self, local_pts_us: u64, due_at: Instant) { let mut state = self .state .lock() .expect("upstream media state mutex poisoned"); - record_presentation_sample(&mut state, UpstreamMediaKind::Camera, due_at); state.last_video_presented_pts_us = Some(local_pts_us); - if state.phase != UpstreamSyncPhase::Failed { - state.phase = UpstreamSyncPhase::Live; - state.last_reason = "video follower emitted a synced frame".to_string(); - } + record_presentation(&mut state, UpstreamMediaKind::Camera, due_at); + state.phase = UpstreamSyncPhase::Live; + state.last_reason = "v2 video handed to UVC".to_string(); } - /// Record that video intentionally froze instead of showing an out-of-sync frame. pub fn record_video_freeze(&self, reason: impl Into) { let mut state = self .state .lock() .expect("upstream media state mutex poisoned"); state.video_freezes = state.video_freezes.saturating_add(1); - if state.phase != UpstreamSyncPhase::Failed { - state.phase = UpstreamSyncPhase::Healing; - } + state.phase = UpstreamSyncPhase::Healing; state.last_reason = reason.into(); } - /// Return current planner facts for diagnostics and probe artifacts. #[must_use] pub fn snapshot(&self) -> UpstreamPlannerSnapshot { let state = self .state .lock() .expect("upstream media state mutex poisoned"); - let live_lag_ms = live_lag_us(&state).map(us_to_ms); - let planner_skew_ms = match ( - state.last_audio_presented_pts_us, - state.last_video_presented_pts_us, - ) { - (Some(audio), Some(video)) => Some((audio as i128 - video as i128) as f64 / 1000.0), - _ => None, - }; let now = Instant::now(); - let client_capture_skew_ms = state.latest_paired_client_capture_skew_ms; - let client_send_skew_ms = state.latest_paired_client_send_skew_ms; - let server_receive_skew_ms = state.latest_paired_server_receive_skew_ms; UpstreamPlannerSnapshot { session_id: state.session_id, phase: state.phase.as_str(), @@ -218,8 +352,8 @@ impl UpstreamMediaRuntime { latest_microphone_remote_pts_us: state.latest_microphone_remote_pts_us, last_video_presented_pts_us: state.last_video_presented_pts_us, last_audio_presented_pts_us: state.last_audio_presented_pts_us, - live_lag_ms, - planner_skew_ms, + live_lag_ms: live_lag_ms(&state), + planner_skew_ms: planner_skew_ms(&state), stale_audio_drops: state.stale_audio_drops, stale_video_drops: state.stale_video_drops, skew_video_drops: state.skew_video_drops, @@ -227,25 +361,21 @@ impl UpstreamMediaRuntime { startup_timeouts: state.startup_timeouts, video_freezes: state.video_freezes, last_reason: state.last_reason.clone(), - client_capture_skew_ms, - client_send_skew_ms, - server_receive_skew_ms, + client_capture_skew_ms: state.latest_paired_client_capture_skew_ms, + client_send_skew_ms: state.latest_paired_client_send_skew_ms, + server_receive_skew_ms: state.latest_paired_server_receive_skew_ms, camera_client_queue_age_ms: state .latest_camera_timing .map(|sample| f64::from(sample.queue_age_ms)), microphone_client_queue_age_ms: state .latest_microphone_timing .map(|sample| f64::from(sample.queue_age_ms)), - camera_server_receive_age_ms: state.latest_camera_timing.map(|sample| { - now.saturating_duration_since(sample.received_at) - .as_secs_f64() - * 1000.0 - }), - microphone_server_receive_age_ms: state.latest_microphone_timing.map(|sample| { - now.saturating_duration_since(sample.received_at) - .as_secs_f64() - * 1000.0 - }), + camera_server_receive_age_ms: state + .latest_camera_timing + .map(|sample| age_ms(now, sample.received_at)), + microphone_server_receive_age_ms: state + 
.latest_microphone_timing + .map(|sample| age_ms(now, sample.received_at)), client_capture_abs_skew_p95_ms: state.client_capture_skew_window_ms.p95_abs(), client_send_abs_skew_p95_ms: state.client_send_skew_window_ms.p95_abs(), server_receive_abs_skew_p95_ms: state.server_receive_skew_window_ms.p95_abs(), @@ -263,21 +393,15 @@ impl UpstreamMediaRuntime { sink_handoff_window_samples: state.sink_handoff_skew_window_ms.len() as u64, } } -} -include!("upstream_media_runtime/lease_lifecycle.rs"); - -impl UpstreamMediaRuntime { - /// Rebase one upstream video packet timestamp onto the shared session clock. #[must_use] pub fn map_video_pts(&self, remote_pts_us: u64, frame_step_us: u64) -> Option<u64> { - match self.plan_video_pts(remote_pts_us, frame_step_us.max(1)) { + match self.plan_video_pts(remote_pts_us, frame_step_us) { UpstreamPlanDecision::Play(plan) => Some(plan.local_pts_us), _ => None, } } - /// Rebase one upstream audio packet timestamp onto the shared session clock. #[must_use] pub fn map_audio_pts(&self, remote_pts_us: u64) -> Option<u64> { match self.plan_audio_pts(remote_pts_us) { @@ -286,30 +410,20 @@ impl UpstreamMediaRuntime { } } - /// Rebase and schedule one upstream video packet on the shared playout epoch. #[must_use] pub fn plan_video_pts(&self, remote_pts_us: u64, frame_step_us: u64) -> UpstreamPlanDecision { - self.plan_pts( + self.plan_legacy_pts( UpstreamMediaKind::Camera, remote_pts_us, frame_step_us.max(1), ) } - /// Rebase and schedule one upstream audio packet on the shared playout epoch. #[must_use] pub fn plan_audio_pts(&self, remote_pts_us: u64) -> UpstreamPlanDecision { - self.plan_pts(UpstreamMediaKind::Microphone, remote_pts_us, 1) + self.plan_legacy_pts(UpstreamMediaKind::Microphone, remote_pts_us, 1) } - /// Schedule a packet from the bundled webcam/microphone transport. - /// - /// Inputs: the media kind, client capture timestamp, packet cadence floor, - /// and the client-owned bundle epoch chosen for this gRPC stream. - /// Outputs: the server playout decision for that packet. - /// Why: bundled webcam media has already been synchronized on the client, - /// so the server should not re-solve cross-stream startup pairing. It only - /// rebases the shared client clock onto a fresh local playout epoch. 
#[must_use] pub fn plan_bundled_pts( &self, @@ -319,401 +433,123 @@ impl UpstreamMediaRuntime { bundle_base_remote_pts_us: u64, bundle_epoch: Instant, ) -> UpstreamPlanDecision { + self.plan_rebased_pts( + kind, + remote_pts_us, + min_step_us.max(1), + Some(bundle_base_remote_pts_us), + Some(bundle_epoch), + ) + } + + pub async fn wait_for_audio_master(&self, _video_local_pts_us: u64, _due_at: Instant) -> bool { + true + } + + fn activate(&self, kind: UpstreamMediaKind) -> UpstreamStreamLease { + let generation = match kind { + UpstreamMediaKind::Camera => { + self.next_camera_generation.fetch_add(1, Ordering::SeqCst) + 1 + } + UpstreamMediaKind::Microphone => { + self.next_microphone_generation + .fetch_add(1, Ordering::SeqCst) + + 1 + } + }; let mut state = self .state .lock() .expect("upstream media state mutex poisoned"); - let session_id = state.session_id; + if state.active_camera_generation.is_none() && state.active_microphone_generation.is_none() + { + state.session_id = self.next_session_id.fetch_add(1, Ordering::SeqCst) + 1; + reset_session_state(&mut state); + state.session_started_at = Some(Instant::now()); + state.phase = UpstreamSyncPhase::Acquiring; + state.last_reason = "v2 upstream session acquiring media".to_string(); + } match kind { - UpstreamMediaKind::Camera => { - state.camera_packet_count = state.camera_packet_count.saturating_add(1); - state - .first_camera_remote_pts_us - .get_or_insert(remote_pts_us); - state.camera_startup_ready = true; - } - UpstreamMediaKind::Microphone => { - state.microphone_packet_count = state.microphone_packet_count.saturating_add(1); - state - .first_microphone_remote_pts_us - .get_or_insert(remote_pts_us); - } + UpstreamMediaKind::Camera => state.active_camera_generation = Some(generation), + UpstreamMediaKind::Microphone => state.active_microphone_generation = Some(generation), } - update_latest_remote_pts(&mut state, kind, remote_pts_us); - if state.session_base_remote_pts_us.is_none() { - state.session_base_remote_pts_us = Some(bundle_base_remote_pts_us); - state.playout_epoch = Some(bundle_epoch); - state.pairing_anchor_deadline = Some(bundle_epoch); - state.phase = UpstreamSyncPhase::Syncing; - state.last_reason = "client-bundled upstream media epoch established".to_string(); - self.pairing_state_notify.notify_waiters(); - info!( - session_id, - bundle_base_remote_pts_us, "client-bundled upstream media epoch established" - ); + UpstreamStreamLease { + session_id: state.session_id, + generation, } - - let session_base_remote_pts_us = state - .session_base_remote_pts_us - .unwrap_or(bundle_base_remote_pts_us); - if remote_pts_us < session_base_remote_pts_us { - return UpstreamPlanDecision::DropBeforeOverlap; - } - - let max_live_lag = upstream_max_live_lag(); - let source_lag = source_lag_for_kind(&state, kind, remote_pts_us); - if source_lag > max_live_lag { - match kind { - UpstreamMediaKind::Camera => { - state.stale_video_drops = state.stale_video_drops.saturating_add(1); - state.video_freezes = state.video_freezes.saturating_add(1); - state.last_reason = - "dropped stale bundled video beyond max live lag".to_string(); - } - UpstreamMediaKind::Microphone => { - state.stale_audio_drops = state.stale_audio_drops.saturating_add(1); - state.last_reason = - "dropped stale bundled audio beyond max live lag".to_string(); - } - } - state.phase = UpstreamSyncPhase::Healing; - return UpstreamPlanDecision::DropStale("bundled packet exceeded max live lag"); - } - - let mut local_pts_us = 
remote_pts_us.saturating_sub(session_base_remote_pts_us); - let last_slot = match kind { - UpstreamMediaKind::Camera => &mut state.last_video_local_pts_us, - UpstreamMediaKind::Microphone => &mut state.last_audio_local_pts_us, - }; - if let Some(last_pts_us) = *last_slot - && local_pts_us <= last_pts_us - { - local_pts_us = last_pts_us.saturating_add(min_step_us.max(1)); - } - *last_slot = Some(local_pts_us); - - let sink_offset_us = self.bundled_playout_offset_us(kind); - let epoch = state.playout_epoch.unwrap_or(bundle_epoch); - let mut due_at = - apply_playout_offset(epoch + Duration::from_micros(local_pts_us), sink_offset_us); - let now = Instant::now(); - let mut late_by = now.checked_duration_since(due_at).unwrap_or_default(); - let playout_delay = upstream_bundled_playout_delay().min(max_live_lag); - let reanchor_threshold = upstream_reanchor_late_threshold(playout_delay); - let max_future_wait = max_live_lag.saturating_sub(source_lag); - let output_offset = if sink_offset_us >= 0 { - Duration::from_micros(sink_offset_us as u64) - } else { - Duration::ZERO - }; - let due_future_wait = due_at.saturating_duration_since(now); - let due_future_wait_before_output_compensation = - due_future_wait.saturating_sub(output_offset); - if late_by > reanchor_threshold - || due_future_wait_before_output_compensation > max_future_wait - { - let desired_delay = playout_delay.min(max_future_wait); - let desired_due_at = now + desired_delay; - let unoffset_due_at = apply_playout_offset(desired_due_at, -sink_offset_us); - let recovered_epoch = unoffset_due_at - .checked_sub(Duration::from_micros(local_pts_us)) - .unwrap_or(unoffset_due_at); - state.playout_epoch = Some(recovered_epoch); - state.pairing_anchor_deadline = Some(desired_due_at); - state.freshness_reanchors = state.freshness_reanchors.saturating_add(1); - state.phase = UpstreamSyncPhase::Healing; - state.last_reason = - "reanchored bundled upstream playhead to preserve freshness".to_string(); - due_at = apply_playout_offset( - recovered_epoch + Duration::from_micros(local_pts_us), - sink_offset_us, - ); - late_by = now.checked_duration_since(due_at).unwrap_or_default(); - info!( - session_id, - ?kind, - local_pts_us, - remote_pts_us, - recovery_buffer_ms = desired_delay.as_millis(), - max_live_lag_ms = max_live_lag.as_millis(), - source_lag_ms = source_lag.as_millis(), - output_offset_ms = output_offset.as_millis(), - "bundled upstream media playhead reanchored to preserve freshness" - ); - } - let predicted_lag_before_output_compensation = source_lag.saturating_add( - due_at - .saturating_duration_since(now) - .saturating_sub(output_offset), - ); - if predicted_lag_before_output_compensation > max_live_lag { - match kind { - UpstreamMediaKind::Camera => { - state.stale_video_drops = state.stale_video_drops.saturating_add(1); - state.video_freezes = state.video_freezes.saturating_add(1); - state.last_reason = "dropped bundled video that would exceed max live lag before output compensation".to_string(); - } - UpstreamMediaKind::Microphone => { - state.stale_audio_drops = state.stale_audio_drops.saturating_add(1); - state.last_reason = "dropped bundled audio that would exceed max live lag before output compensation".to_string(); - } - } - state.phase = UpstreamSyncPhase::Healing; - return UpstreamPlanDecision::DropStale( - "bundled packet would exceed max live lag before output compensation", - ); - } - - if kind == UpstreamMediaKind::Microphone { - self.audio_progress_notify.notify_waiters(); - } - 
UpstreamPlanDecision::Play(PlannedUpstreamPacket { - local_pts_us, - due_at, - late_by, - source_lag, - }) } - /// Hold video until the audio master has at least reached the same capture - /// moment, or until the bounded sync grace is exhausted. - pub async fn wait_for_audio_master(&self, video_local_pts_us: u64, due_at: Instant) -> bool { - let slack_us = upstream_pairing_master_slack() - .as_micros() - .min(u64::MAX as u128) as u64; - let audio_delay_allowance_us = self.positive_audio_delay_allowance_us(); - let deadline = due_at + upstream_audio_master_wait_grace(); - loop { - let notified = self.audio_progress_notify.notified(); + fn is_active(&self, kind: UpstreamMediaKind, generation: u64) -> bool { + let state = self + .state + .lock() + .expect("upstream media state mutex poisoned"); + match kind { + UpstreamMediaKind::Camera => state.active_camera_generation == Some(generation), + UpstreamMediaKind::Microphone => state.active_microphone_generation == Some(generation), + } + } + + fn close(&self, kind: UpstreamMediaKind, generation: u64) { + let mut state = self + .state + .lock() + .expect("upstream media state mutex poisoned"); + match kind { + UpstreamMediaKind::Camera if state.active_camera_generation == Some(generation) => { + state.active_camera_generation = None + } + UpstreamMediaKind::Microphone + if state.active_microphone_generation == Some(generation) => { - let state = self - .state - .lock() - .expect("upstream media state mutex poisoned"); - if state.active_microphone_generation.is_none() { - return true; - } - let audio_presented_pts_us = state.last_audio_presented_pts_us.unwrap_or(0); - if audio_presented_pts_us - .saturating_add(slack_us) - .saturating_add(audio_delay_allowance_us) - >= video_local_pts_us - { - return true; - } - } - if Instant::now() >= deadline { - return false; - } - tokio::select! 
{
-                _ = notified => {}
-                _ = tokio::time::sleep_until(deadline) => return false,
+                state.active_microphone_generation = None
             }
+            _ => return,
+        }
+        if state.active_camera_generation.is_none() && state.active_microphone_generation.is_none()
+        {
+            reset_session_state(&mut state);
         }
     }
 
-    fn plan_pts(
+    fn plan_legacy_pts(
         &self,
         kind: UpstreamMediaKind,
         remote_pts_us: u64,
         min_step_us: u64,
+    ) -> UpstreamPlanDecision {
+        self.plan_rebased_pts(kind, remote_pts_us, min_step_us.max(1), None, None)
+    }
+
+    fn plan_rebased_pts(
+        &self,
+        kind: UpstreamMediaKind,
+        remote_pts_us: u64,
+        min_step_us: u64,
+        explicit_base: Option<u64>,
+        explicit_epoch: Option<Instant>,
     ) -> UpstreamPlanDecision {
         let mut state = self
             .state
             .lock()
             .expect("upstream media state mutex poisoned");
-        let session_id = state.session_id;
-        let packet_count = match kind {
-            UpstreamMediaKind::Camera => {
-                state.camera_packet_count = state.camera_packet_count.saturating_add(1);
-                state.camera_packet_count
-            }
+        match kind {
+            UpstreamMediaKind::Camera => state.latest_camera_remote_pts_us = Some(remote_pts_us),
             UpstreamMediaKind::Microphone => {
-                state.microphone_packet_count = state.microphone_packet_count.saturating_add(1);
-                state.microphone_packet_count
+                state.latest_microphone_remote_pts_us = Some(remote_pts_us)
             }
+        }
+        let base = match explicit_base {
+            Some(base) => *state.base_remote_pts_us.get_or_insert(base),
+            None => *state.base_remote_pts_us.get_or_insert(remote_pts_us),
         };
-        update_latest_remote_pts(&mut state, kind, remote_pts_us);
-        let mut first_remote_for_kind = match kind {
-            UpstreamMediaKind::Camera => {
-                let first_slot = &mut state.first_camera_remote_pts_us;
-                *first_slot.get_or_insert(remote_pts_us)
-            }
-            UpstreamMediaKind::Microphone => {
-                let first_slot = &mut state.first_microphone_remote_pts_us;
-                *first_slot.get_or_insert(remote_pts_us)
-            }
+        let epoch = match explicit_epoch {
+            Some(epoch) => *state.playout_epoch.get_or_insert(epoch),
+            None => *state
+                .playout_epoch
+                .get_or_insert(Instant::now() + upstream_playout_delay()),
         };
-        if kind == UpstreamMediaKind::Camera {
-            let startup_grace_us = upstream_camera_startup_grace_us();
-            if !state.camera_startup_ready
-                && (startup_grace_us == 0
-                    || remote_pts_us.saturating_sub(first_remote_for_kind) >= startup_grace_us)
-            {
-                state.camera_startup_ready = true;
-                state.first_camera_remote_pts_us = Some(remote_pts_us);
-                first_remote_for_kind = remote_pts_us;
-            }
-        }
-        let now = Instant::now();
-        let pairing_deadline = *state
-            .pairing_anchor_deadline
-            .get_or_insert_with(|| now + upstream_playout_delay());
-        let playout_delay = upstream_playout_delay();
-        let max_live_lag = upstream_max_live_lag();
-
-        if state.session_base_remote_pts_us.is_none() {
-            if state.session_started_at.is_some_and(|started_at| {
-                now.saturating_duration_since(started_at) > upstream_startup_timeout()
-            }) {
-                state.phase = UpstreamSyncPhase::Failed;
-                state.startup_timeouts = state.startup_timeouts.saturating_add(1);
-                state.last_reason =
-                    "paired upstream startup did not converge before timeout".to_string();
-                return UpstreamPlanDecision::StartupFailed(
-                    "paired upstream startup did not converge before timeout",
-                );
-            }
-            if state.first_camera_remote_pts_us.is_some()
-                && state.first_microphone_remote_pts_us.is_some()
-                && state.camera_startup_ready
-            {
-                let first_camera_remote_pts_us =
-                    state.first_camera_remote_pts_us.unwrap_or_default();
-                let first_microphone_remote_pts_us =
-                    state.first_microphone_remote_pts_us.unwrap_or_default();
-                state.session_base_remote_pts_us
= - Some(first_camera_remote_pts_us.max(first_microphone_remote_pts_us)); - let overlap_epoch = now + playout_delay; - state.playout_epoch = Some(overlap_epoch); - state.pairing_anchor_deadline = Some(overlap_epoch); - state.phase = UpstreamSyncPhase::Syncing; - state.last_reason = "fresh audio/video overlap anchor established".to_string(); - if !state.startup_anchor_logged { - let startup_delta_us = - first_camera_remote_pts_us as i128 - first_microphone_remote_pts_us as i128; - info!( - session_id, - first_camera_remote_pts_us, - first_microphone_remote_pts_us, - overlap_base_remote_pts_us = - state.session_base_remote_pts_us.unwrap_or_default(), - startup_delta_us, - "upstream media overlap anchors established" - ); - state.startup_anchor_logged = true; - } - self.pairing_state_notify.notify_waiters(); - } else if now < pairing_deadline { - state.phase = UpstreamSyncPhase::Acquiring; - state.last_reason = "awaiting both upstream media streams".to_string(); - if upstream_timing_trace_enabled() - && (packet_count <= 10 || packet_count.is_multiple_of(300)) - { - info!( - session_id, - ?kind, - packet_count, - remote_pts_us, - wait_ms = pairing_deadline.saturating_duration_since(now).as_millis(), - "upstream media packet buffered while awaiting the counterpart stream" - ); - } - return UpstreamPlanDecision::AwaitingPair; - } else if state.first_camera_remote_pts_us.is_some() && !state.camera_startup_ready { - state.phase = UpstreamSyncPhase::Syncing; - state.last_reason = "camera startup warm-up is still in progress".to_string(); - if upstream_timing_trace_enabled() - && (packet_count <= 10 || packet_count.is_multiple_of(300)) - { - info!( - session_id, - ?kind, - packet_count, - remote_pts_us, - "upstream media packet buffered while camera startup warm-up is still in progress" - ); - } - return UpstreamPlanDecision::AwaitingPair; - } else if upstream_require_paired_startup() { - let refreshed = refresh_unpaired_pairing_anchor( - &mut state, - kind, - remote_pts_us, - now + playout_delay, - ); - if refreshed || upstream_timing_trace_enabled() { - info!( - session_id, - ?kind, - packet_count, - remote_pts_us, - refreshed_anchor = refreshed, - healing_window_ms = playout_delay.as_millis(), - "upstream media pairing window expired; holding one-sided stream for synced startup" - ); - } - state.phase = UpstreamSyncPhase::Syncing; - state.last_reason = "holding one-sided stream for synced startup".to_string(); - return UpstreamPlanDecision::AwaitingPair; - } else { - let single_stream_base_remote_pts_us = match kind { - UpstreamMediaKind::Camera => { - state.first_camera_remote_pts_us.unwrap_or(remote_pts_us) - } - UpstreamMediaKind::Microphone => state - .first_microphone_remote_pts_us - .unwrap_or(remote_pts_us), - }; - state.session_base_remote_pts_us = Some(single_stream_base_remote_pts_us); - let one_sided_epoch = now + playout_delay; - state.playout_epoch = Some(one_sided_epoch); - state.pairing_anchor_deadline = Some(one_sided_epoch); - info!( - session_id, - ?kind, - single_stream_base_remote_pts_us, - "upstream media pairing window expired; continuing with one-sided playout" - ); - self.pairing_state_notify.notify_waiters(); - } - } - - let session_base_remote_pts_us = state.session_base_remote_pts_us.unwrap_or(remote_pts_us); - if remote_pts_us < session_base_remote_pts_us { - if upstream_timing_trace_enabled() - && (packet_count <= 10 || packet_count.is_multiple_of(300)) - { - info!( - session_id, - ?kind, - packet_count, - remote_pts_us, - session_base_remote_pts_us, - "upstream media 
packet dropped before the shared overlap base" - ); - } - return UpstreamPlanDecision::DropBeforeOverlap; - } - - let source_lag = source_lag_for_kind(&state, kind, remote_pts_us); - if source_lag > max_live_lag { - match kind { - UpstreamMediaKind::Camera => { - state.stale_video_drops = state.stale_video_drops.saturating_add(1); - state.video_freezes = state.video_freezes.saturating_add(1); - state.last_reason = "dropped stale video beyond max live lag".to_string(); - } - UpstreamMediaKind::Microphone => { - state.stale_audio_drops = state.stale_audio_drops.saturating_add(1); - state.last_reason = "dropped stale audio beyond max live lag".to_string(); - } - } - state.phase = UpstreamSyncPhase::Healing; - return UpstreamPlanDecision::DropStale("packet exceeded max live lag"); - } - - let mut local_pts_us = remote_pts_us.saturating_sub(session_base_remote_pts_us); + let mut local_pts_us = remote_pts_us.saturating_sub(base); let last_slot = match kind { UpstreamMediaKind::Camera => &mut state.last_video_local_pts_us, UpstreamMediaKind::Microphone => &mut state.last_audio_local_pts_us, @@ -724,309 +560,30 @@ impl UpstreamMediaRuntime { local_pts_us = last_pts_us.saturating_add(min_step_us.max(1)); } *last_slot = Some(local_pts_us); - let audio_ahead_video_allowance_us = self.audio_ahead_video_allowance_us(); - if kind == UpstreamMediaKind::Camera - && state - .last_audio_presented_pts_us - .is_some_and(|audio_pts_us| { - video_is_too_far_behind_audio( - local_pts_us, - audio_pts_us, - audio_ahead_video_allowance_us, - ) - }) - { - state.skew_video_drops = state.skew_video_drops.saturating_add(1); - state.video_freezes = state.video_freezes.saturating_add(1); - state.phase = UpstreamSyncPhase::Healing; - state.last_reason = - "dropped video frame that was too far behind the audio master".to_string(); - return UpstreamPlanDecision::DropStale("video frame was too far behind audio master"); - } - let epoch = *state.playout_epoch.get_or_insert(pairing_deadline); - let sink_offset_us = self.playout_offset_us(kind); - let playout_delay = upstream_playout_delay().min(max_live_lag); - let mut due_at = - apply_playout_offset(epoch + Duration::from_micros(local_pts_us), sink_offset_us); - let mut late_by = now.checked_duration_since(due_at).unwrap_or_default(); - let reanchor_threshold = upstream_reanchor_late_threshold(playout_delay); - let intentional_future_wait_allowance = - Duration::from_micros(self.intentional_future_wait_allowance_us(kind)); - let max_future_wait = max_live_lag - .saturating_sub(source_lag) - .saturating_add(intentional_future_wait_allowance); - let due_future_wait = due_at.saturating_duration_since(now); - if late_by > reanchor_threshold || due_future_wait > max_future_wait { - let old_late_by = late_by; - let old_future_wait = due_future_wait; - let desired_delay = playout_delay.min(max_future_wait); - let desired_due_at = now + desired_delay; - let unoffset_due_at = apply_playout_offset(desired_due_at, -sink_offset_us); - let recovered_epoch = unoffset_due_at - .checked_sub(Duration::from_micros(local_pts_us)) - .unwrap_or(unoffset_due_at); - state.playout_epoch = Some(recovered_epoch); - state.pairing_anchor_deadline = Some(desired_due_at); - state.freshness_reanchors = state.freshness_reanchors.saturating_add(1); - state.phase = UpstreamSyncPhase::Healing; - state.last_reason = "reanchored upstream playhead to preserve freshness".to_string(); - due_at = apply_playout_offset( - recovered_epoch + Duration::from_micros(local_pts_us), - sink_offset_us, - ); - late_by = 
now.checked_duration_since(due_at).unwrap_or_default(); - info!( - session_id, - ?kind, - packet_count, - local_pts_us, - remote_pts_us, - old_late_by_ms = old_late_by.as_millis(), - old_future_wait_ms = old_future_wait.as_millis(), - recovery_buffer_ms = playout_delay.as_millis(), - reanchor_threshold_ms = reanchor_threshold.as_millis(), - max_live_lag_ms = max_live_lag.as_millis(), - source_lag_ms = source_lag.as_millis(), - "upstream media playhead reanchored to preserve freshness" - ); - } - let predicted_lag_at_playout = - source_lag.saturating_add(due_at.saturating_duration_since(now)); - if predicted_lag_at_playout > max_live_lag.saturating_add(intentional_future_wait_allowance) - { - match kind { - UpstreamMediaKind::Camera => { - state.stale_video_drops = state.stale_video_drops.saturating_add(1); - state.video_freezes = state.video_freezes.saturating_add(1); - state.last_reason = - "dropped video that would exceed max live lag at playout".to_string(); - } - UpstreamMediaKind::Microphone => { - state.stale_audio_drops = state.stale_audio_drops.saturating_add(1); - state.last_reason = - "dropped audio that would exceed max live lag at playout".to_string(); - } - } - state.phase = UpstreamSyncPhase::Healing; - return UpstreamPlanDecision::DropStale("packet would exceed max live lag at playout"); - } - if upstream_timing_trace_enabled() - && (packet_count <= 10 || packet_count.is_multiple_of(300)) - { - let playout_delay_us = due_at.saturating_duration_since(now).as_micros(); - let late_by_us = late_by.as_micros(); - info!( - session_id, - ?kind, - packet_count, - remote_pts_us, - session_base_remote_pts_us, - first_remote_for_kind, - remote_elapsed_us = remote_pts_us.saturating_sub(session_base_remote_pts_us), - local_pts_us, - playout_delay_us, - sink_offset_us, - late_by_us, - source_lag_us = source_lag.as_micros(), - "upstream media rebase sample" - ); - } - if kind == UpstreamMediaKind::Microphone { - self.audio_progress_notify.notify_waiters(); - } + state.phase = UpstreamSyncPhase::Syncing; + state.last_reason = "v2 legacy packet mapped without cross-stream planner".to_string(); + let due_at = apply_offset( + epoch + Duration::from_micros(local_pts_us), + self.playout_offset_us(kind), + ); + let late_by = Instant::now() + .checked_duration_since(due_at) + .unwrap_or_default(); UpstreamPlanDecision::Play(PlannedUpstreamPacket { local_pts_us, due_at, late_by, - source_lag, + source_lag: Duration::ZERO, }) } -} -fn update_latest_remote_pts( - state: &mut UpstreamClockState, - kind: UpstreamMediaKind, - remote_pts_us: u64, -) { - let slot = match kind { - UpstreamMediaKind::Camera => &mut state.latest_camera_remote_pts_us, - UpstreamMediaKind::Microphone => &mut state.latest_microphone_remote_pts_us, - }; - *slot = Some((*slot).unwrap_or(remote_pts_us).max(remote_pts_us)); -} - -fn source_lag_for_kind( - state: &UpstreamClockState, - kind: UpstreamMediaKind, - remote_pts_us: u64, -) -> Duration { - let latest = match kind { - UpstreamMediaKind::Camera => state.latest_camera_remote_pts_us, - UpstreamMediaKind::Microphone => state.latest_microphone_remote_pts_us, - } - .unwrap_or(remote_pts_us); - Duration::from_micros(latest.saturating_sub(remote_pts_us)) -} - -fn video_is_too_far_behind_audio( - video_pts_us: u64, - audio_pts_us: u64, - audio_ahead_video_allowance_us: u64, -) -> bool { - let slack_us = (upstream_pairing_master_slack() - .as_micros() - .min(u64::MAX as u128) as u64) - .saturating_add(audio_ahead_video_allowance_us); - video_pts_us.saturating_add(slack_us) < 
audio_pts_us -} - -fn live_lag_us(state: &UpstreamClockState) -> Option { - let latest_audio = state.latest_microphone_remote_pts_us?; - let audio_playhead = state.last_audio_presented_pts_us?; - let base = state.session_base_remote_pts_us?; - Some(latest_audio.saturating_sub(base.saturating_add(audio_playhead))) -} - -fn us_to_ms(value: u64) -> f64 { - value as f64 / 1000.0 -} - -fn instant_delta_us(left: Instant, right: Instant) -> i128 { - if left >= right { - left.saturating_duration_since(right).as_micros() as i128 - } else { - -(right.saturating_duration_since(left).as_micros() as i128) - } -} - -const CLIENT_TIMING_PAIR_MAX_SEND_DELTA_US: u64 = 250_000; - -fn push_timing_sample( - samples: &mut std::collections::VecDeque, - sample: state::UpstreamTimingSample, -) { - if samples.len() >= state::TIMING_WINDOW_CAPACITY { - samples.pop_front(); - } - samples.push_back(sample); -} - -fn abs_delta_us(left: u64, right: u64) -> u64 { - left.max(right) - left.min(right) -} - -fn nearest_timing_sample_by_send( - samples: &std::collections::VecDeque, - send_pts_us: u64, -) -> Option { - samples - .iter() - .copied() - .min_by_key(|sample| abs_delta_us(sample.send_pts_us, send_pts_us)) -} - -fn record_client_timing_windows( - state: &mut UpstreamClockState, - kind: UpstreamMediaKind, - sample: state::UpstreamTimingSample, -) { - let paired = match kind { - UpstreamMediaKind::Camera => { - nearest_timing_sample_by_send(&state.recent_microphone_timing, sample.send_pts_us) - .map(|microphone| (sample, microphone)) + fn playout_offset_us(&self, kind: UpstreamMediaKind) -> i64 { + match kind { + UpstreamMediaKind::Camera => self.camera_playout_offset_us.load(Ordering::Relaxed), + UpstreamMediaKind::Microphone => { + self.microphone_playout_offset_us.load(Ordering::Relaxed) + } } - UpstreamMediaKind::Microphone => { - nearest_timing_sample_by_send(&state.recent_camera_timing, sample.send_pts_us) - .map(|camera| (camera, sample)) - } - }; - let Some((camera, microphone)) = paired else { - return; - }; - if abs_delta_us(camera.send_pts_us, microphone.send_pts_us) - > CLIENT_TIMING_PAIR_MAX_SEND_DELTA_US - { - return; - } - let client_capture_skew_ms = - (camera.capture_pts_us as i128 - microphone.capture_pts_us as i128) as f64 / 1000.0; - let client_send_skew_ms = - (camera.send_pts_us as i128 - microphone.send_pts_us as i128) as f64 / 1000.0; - let server_receive_skew_ms = - instant_delta_us(camera.received_at, microphone.received_at) as f64 / 1000.0; - - state.latest_paired_client_capture_skew_ms = Some(client_capture_skew_ms); - state.latest_paired_client_send_skew_ms = Some(client_send_skew_ms); - state.latest_paired_server_receive_skew_ms = Some(server_receive_skew_ms); - state - .client_capture_skew_window_ms - .push(client_capture_skew_ms); - state.client_send_skew_window_ms.push(client_send_skew_ms); - state - .server_receive_skew_window_ms - .push(server_receive_skew_ms); -} - -fn record_presentation_sample( - state: &mut UpstreamClockState, - kind: UpstreamMediaKind, - due_at: Instant, -) { - let sample = state::UpstreamPresentationSample { - due_at, - handed_at: Instant::now(), - }; - let late_ms = presentation_late_ms(sample).max(0.0); - match kind { - UpstreamMediaKind::Camera => { - state.latest_camera_presentation = Some(sample); - state.camera_sink_late_window_ms.push(late_ms); - } - UpstreamMediaKind::Microphone => { - state.latest_microphone_presentation = Some(sample); - state.microphone_sink_late_window_ms.push(late_ms); - } - } - if let Some(skew_ms) = 
latest_sink_handoff_skew_ms(state) { - state.sink_handoff_skew_window_ms.push(skew_ms); - } -} - -fn latest_sink_handoff_skew_ms(state: &UpstreamClockState) -> Option { - let (Some(camera), Some(microphone)) = ( - state.latest_camera_presentation, - state.latest_microphone_presentation, - ) else { - return None; - }; - let due_at_delta_ms = instant_delta_us(camera.due_at, microphone.due_at).abs() as f64 / 1000.0; - if due_at_delta_ms > 250.0 { - return None; - } - Some(instant_delta_us(camera.handed_at, microphone.handed_at) as f64 / 1000.0) -} - -fn presentation_late_ms(sample: state::UpstreamPresentationSample) -> f64 { - instant_delta_us(sample.handed_at, sample.due_at) as f64 / 1000.0 -} - -fn refresh_unpaired_pairing_anchor( - state: &mut UpstreamClockState, - kind: UpstreamMediaKind, - remote_pts_us: u64, - next_deadline: Instant, -) -> bool { - state.pairing_anchor_deadline = Some(next_deadline); - match kind { - UpstreamMediaKind::Camera if state.first_microphone_remote_pts_us.is_none() => { - state.first_camera_remote_pts_us = Some(remote_pts_us); - true - } - UpstreamMediaKind::Microphone if state.first_camera_remote_pts_us.is_none() => { - state.first_microphone_remote_pts_us = Some(remote_pts_us); - true - } - _ => false, } } @@ -1036,5 +593,159 @@ impl Default for UpstreamMediaRuntime { } } -#[cfg(test)] -mod tests; +fn reset_session_state(state: &mut RuntimeState) { + state.base_remote_pts_us = None; + state.playout_epoch = None; + state.latest_camera_remote_pts_us = None; + state.latest_microphone_remote_pts_us = None; + state.last_video_local_pts_us = None; + state.last_audio_local_pts_us = None; + state.last_video_presented_pts_us = None; + state.last_audio_presented_pts_us = None; + state.latest_camera_timing = None; + state.latest_microphone_timing = None; + state.latest_camera_presentation = None; + state.latest_microphone_presentation = None; + state.latest_paired_client_capture_skew_ms = None; + state.latest_paired_client_send_skew_ms = None; + state.latest_paired_server_receive_skew_ms = None; + state.stale_audio_drops = 0; + state.stale_video_drops = 0; + state.skew_video_drops = 0; + state.freshness_reanchors = 0; + state.startup_timeouts = 0; + state.video_freezes = 0; +} + +fn record_timing_pair(state: &mut RuntimeState) { + let (Some(camera), Some(microphone)) = + (state.latest_camera_timing, state.latest_microphone_timing) + else { + return; + }; + let capture_skew_ms = delta_ms(microphone.capture_pts_us, camera.capture_pts_us); + let send_skew_ms = delta_ms(microphone.send_pts_us, camera.send_pts_us); + let receive_skew_ms = signed_duration_ms(microphone.received_at, camera.received_at); + state.latest_paired_client_capture_skew_ms = Some(capture_skew_ms); + state.latest_paired_client_send_skew_ms = Some(send_skew_ms); + state.latest_paired_server_receive_skew_ms = Some(receive_skew_ms); + state.client_capture_skew_window_ms.push(capture_skew_ms); + state.client_send_skew_window_ms.push(send_skew_ms); + state.server_receive_skew_window_ms.push(receive_skew_ms); +} + +fn record_presentation(state: &mut RuntimeState, kind: UpstreamMediaKind, due_at: Instant) { + let sample = PresentationSample { + due_at, + handed_at: Instant::now(), + }; + match kind { + UpstreamMediaKind::Camera => { + state.latest_camera_presentation = Some(sample); + state + .camera_sink_late_window_ms + .push(presentation_late_ms(sample)); + } + UpstreamMediaKind::Microphone => { + state.latest_microphone_presentation = Some(sample); + state + .microphone_sink_late_window_ms + 
+                    .push(presentation_late_ms(sample));
+        }
+    }
+    if let Some(skew) = latest_sink_handoff_skew_ms(state) {
+        state.sink_handoff_skew_window_ms.push(skew);
+    }
+}
+
+fn live_lag_ms(state: &RuntimeState) -> Option<f64> {
+    let latest = state
+        .latest_camera_remote_pts_us
+        .into_iter()
+        .chain(state.latest_microphone_remote_pts_us)
+        .max()?;
+    let base = state.base_remote_pts_us.unwrap_or(latest);
+    Some(latest.saturating_sub(base) as f64 / 1000.0)
+}
+
+fn planner_skew_ms(state: &RuntimeState) -> Option<f64> {
+    match (
+        state.last_audio_presented_pts_us,
+        state.last_video_presented_pts_us,
+    ) {
+        (Some(audio), Some(video)) => Some((audio as i128 - video as i128) as f64 / 1000.0),
+        _ => None,
+    }
+}
+
+fn latest_sink_handoff_skew_ms(state: &RuntimeState) -> Option<f64> {
+    let camera = state.latest_camera_presentation?;
+    let microphone = state.latest_microphone_presentation?;
+    Some(presentation_late_signed_ms(microphone) - presentation_late_signed_ms(camera))
+}
+
+fn presentation_late_ms(sample: PresentationSample) -> f64 {
+    presentation_late_signed_ms(sample).max(0.0)
+}
+
+fn presentation_late_signed_ms(sample: PresentationSample) -> f64 {
+    signed_duration_ms(sample.handed_at, sample.due_at)
+}
+
+fn age_ms(now: Instant, then: Instant) -> f64 {
+    now.saturating_duration_since(then).as_secs_f64() * 1000.0
+}
+
+fn signed_duration_ms(left: Instant, right: Instant) -> f64 {
+    if left >= right {
+        left.duration_since(right).as_secs_f64() * 1000.0
+    } else {
+        -(right.duration_since(left).as_secs_f64() * 1000.0)
+    }
+}
+
+fn delta_ms(left_us: u64, right_us: u64) -> f64 {
+    (left_us as i128 - right_us as i128) as f64 / 1000.0
+}
+
+fn percentile(values: impl Iterator<Item = f64>, quantile: f64) -> Option<f64> {
+    let mut sorted = values.filter(|value| value.is_finite()).collect::<Vec<_>>();
+    if sorted.is_empty() {
+        return None;
+    }
+    sorted.sort_by(|left, right| left.total_cmp(right));
+    let index = ((sorted.len() - 1) as f64 * quantile.clamp(0.0, 1.0)).ceil() as usize;
+    sorted.get(index).copied()
+}
+
+fn upstream_playout_delay() -> Duration {
+    let delay_ms = std::env::var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS")
+        .ok()
+        .and_then(|value| value.trim().parse::<u64>().ok())
+        .unwrap_or(80);
+    Duration::from_millis(delay_ms)
+}
+
+fn playout_offset_us(kind: UpstreamMediaKind) -> i64 {
+    let name = match kind {
+        UpstreamMediaKind::Camera => "LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US",
+        UpstreamMediaKind::Microphone => "LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US",
+    };
+    std::env::var(name)
+        .ok()
+        .and_then(|value| value.trim().parse::<i64>().ok())
+        .unwrap_or(match kind {
+            UpstreamMediaKind::Camera => FACTORY_MJPEG_VIDEO_OFFSET_US,
+            UpstreamMediaKind::Microphone => FACTORY_MJPEG_AUDIO_OFFSET_US,
+        })
+}
+
+fn apply_offset(instant: Instant, offset_us: i64) -> Instant {
+    if offset_us >= 0 {
+        instant + Duration::from_micros(offset_us as u64)
+    } else {
+        instant
+            .checked_sub(Duration::from_micros(offset_us.unsigned_abs()))
+            .unwrap_or(instant)
+    }
+}
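
For reviewers, a minimal standalone sketch of the arithmetic that `plan_rebased_pts` and `apply_offset` perform: the bundle base (or the first observed remote PTS) becomes the session base, local PTS is the saturating delta from that base kept strictly increasing, and the due instant is the playout epoch plus local PTS shifted by the signed per-device output offset. The helper names, the `main` harness, and the 80 ms / -5 ms figures below are illustrative assumptions, not part of the patch.

```rust
use std::time::{Duration, Instant};

// Hypothetical mirror of the rebase step: map a remote capture PTS onto the
// local timeline and keep the per-kind timeline strictly monotonic.
fn rebase_pts_us(
    remote_pts_us: u64,
    base_remote_pts_us: u64,
    last_local_pts_us: Option<u64>,
    min_step_us: u64,
) -> u64 {
    let mut local = remote_pts_us.saturating_sub(base_remote_pts_us);
    if let Some(last) = last_local_pts_us {
        if local <= last {
            local = last.saturating_add(min_step_us.max(1));
        }
    }
    local
}

// Hypothetical mirror of apply_offset: shift a due instant by a signed
// output-path offset in microseconds, clamping negative offsets at the epoch.
fn shift_by_offset(due_at: Instant, offset_us: i64) -> Instant {
    if offset_us >= 0 {
        due_at + Duration::from_micros(offset_us as u64)
    } else {
        due_at
            .checked_sub(Duration::from_micros(offset_us.unsigned_abs()))
            .unwrap_or(due_at)
    }
}

fn main() {
    let epoch = Instant::now() + Duration::from_millis(80); // assumed smoothness slack
    let base_remote_pts_us = 1_000_000; // capture PTS of the first bundle

    // A frame captured 33 ms after the base, with an illustrative -5 ms UVC offset.
    let local = rebase_pts_us(1_033_000, base_remote_pts_us, None, 1);
    let due_at = shift_by_offset(epoch + Duration::from_micros(local), -5_000);
    println!(
        "local_pts_us={local}, plays in ~{:?}",
        due_at.saturating_duration_since(Instant::now())
    );
}
```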
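
The `percentile` helper reports the sample at the ceiling index rather than interpolating, so small skew windows bias slightly high. A quick worked example of that behaviour, using a standalone copy of the helper purely for illustration:

```rust
// Standalone copy of the percentile helper above, kept only so this sketch compiles.
fn percentile(values: impl Iterator<Item = f64>, quantile: f64) -> Option<f64> {
    let mut sorted = values.filter(|value| value.is_finite()).collect::<Vec<_>>();
    if sorted.is_empty() {
        return None;
    }
    sorted.sort_by(|left, right| left.total_cmp(right));
    let index = ((sorted.len() - 1) as f64 * quantile.clamp(0.0, 1.0)).ceil() as usize;
    sorted.get(index).copied()
}

fn main() {
    // (len - 1) * 0.5 = 1.5 rounds up to index 2, so the reported median is 5.0,
    // not the interpolated 4.0.
    let window = vec![1.0, 3.0, 5.0, 7.0];
    assert_eq!(percentile(window.into_iter(), 0.5), Some(5.0));
}
```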