feat: rebuild upstream media v2

This commit is contained in:
Brad Stein 2026-05-03 12:22:33 -03:00
parent 248f1b7a47
commit 3011dabc92
22 changed files with 2114 additions and 1235 deletions

View File

@ -1,42 +1,41 @@
# Lesavka Agent Notes # Lesavka Agent Notes
## 0.18.5 Bundled Webcam A/V Migration Checklist ## 0.19.0 Upstream Media v2 Rebuild Checklist
Context: manual Google Meet and mirrored-probe testing showed the split webcam Context: manual Google Meet testing showed the 0.18.x upstream media stack was
and microphone uplink design is too fragile under real browser/device pressure. still capable of seconds-scale lag and A/V skew. Treat that implementation as
The new product contract is: when webcam video is present, microphone audio quarantined v1, not as something to tune. The v2 contract is deliberately small:
travels with it on one client-owned upstream media stream. The server manages when webcam video is active, microphone audio and camera frames travel through
freshness and smoothness after arrival; it no longer tries to make two racing one client-owned bundle path; the server maps that client capture clock onto one
upstream channels look synchronized. Microphone-only remains supported as the local epoch, applies only explicit UVC/UAC output-path offsets, and drops stale
explicit no-camera path. bundles as a unit. Microphone-only remains supported as the explicit no-camera
path.
### Product Invariants ### Product Invariants
- [x] Webcam-enabled sessions use one bundled upstream media RPC by default. - [x] Webcam-enabled sessions use one bundled upstream media RPC by default.
- [x] Webcam-enabled sessions imply microphone capture when the server supports UAC. - [x] Webcam-enabled sessions imply microphone capture when the server supports UAC.
- [x] The previous upstream media runtime/planner is quarantined under
`quarantine/upstream-media-v1/` with retained-idea notes.
- [x] The UI-selected camera, camera quality, microphone, speaker, gain, and - [x] The UI-selected camera, camera quality, microphone, speaker, gain, and
enable switches remain authoritative; defaults may not override visible UI state. enable switches remain authoritative; defaults may not override visible UI state.
- [x] Client capture timestamps are the source of A/V sync truth for webcam sessions. - [x] Client capture timestamps are the source of A/V sync truth for webcam sessions.
- [x] Server bundled playout rebases that client timeline onto a fresh local epoch. - [x] Server v2 playout rebases that client timeline onto a fresh local epoch.
- [x] Server bundled playout may drop stale packets, but must not rebuild sync by - [x] Server v2 may drop stale bundles, but must not rebuild sync by independently
independently pairing separate camera and microphone streams. pairing separate camera and microphone streams.
- [x] Mic-only sessions keep the existing microphone stream path. - [x] Mic-only sessions keep an explicit no-camera audio path.
- [x] Legacy split webcam/mic uplink is only an explicit compatibility escape hatch. - [x] Legacy split webcam/mic uplink is only an explicit compatibility escape hatch.
- [x] Manual probes and diagnostics clearly label `bundled-webcam-media` versus - [x] Manual probes and diagnostics clearly label `bundled-webcam-media` versus
`mic-only` so we never confuse the architectures during debugging. `mic-only` so we never confuse the architectures during debugging.
- [x] Sync protection takes precedence over freshness and smoothness: bad mixed - [x] Sync protection takes precedence over freshness and smoothness: bad mixed
bundle timing is dropped coherently instead of letting one side play alone. bundle timing is dropped coherently instead of letting one side play alone.
- [x] Startup video is allowed to prime UVC if a first mixed bundle has bad
audio/video timing; the mismatched audio is dropped, preserving sync while
avoiding browser `Camera is starting` starvation.
- [x] An already-attached UVC gadget descriptor is the physical browser contract: - [x] An already-attached UVC gadget descriptor is the physical browser contract:
if it still advertises an older profile, server handshake/capture sizing if it still advertises an older profile, server handshake/capture sizing
follows that live descriptor until a controlled gadget rebuild is allowed. follows that live descriptor until a controlled gadget rebuild is allowed.
- [x] Bundled webcam sessions enforce freshness before output-path compensation; - [x] Bundled webcam sessions use the shared client capture timeline for transit
sync-critical measured UVC/UAC offsets are allowed to add presentation sync, then apply runtime output-path calibration as explicit per-device
delay because audio/video sync is higher priority than freshness. handoff delays.
- [x] Bundled webcam sessions use the shared client capture timeline for - [x] Optional common playout delay is only smoothness slack; it cannot clip or
transit sync, then apply runtime output-path calibration when splitting replace sync-critical UVC/UAC offsets.
into UVC/UAC so Meet sees synchronized presentation.
### Wire Protocol ### Wire Protocol
- [x] Add `UpstreamMediaBundle` containing one optional video frame plus zero or - [x] Add `UpstreamMediaBundle` containing one optional video frame plus zero or
@ -52,6 +51,12 @@ explicit no-camera path.
- [x] Spawn one bundled capture/uplink task instead of separate camera and mic - [x] Spawn one bundled capture/uplink task instead of separate camera and mic
tasks for webcam sessions. tasks for webcam sessions.
- [x] Bundle camera frames and microphone packets into one freshness-bounded queue. - [x] Bundle camera frames and microphone packets into one freshness-bounded queue.
- [x] With a camera active, do not flush microphone packets as standalone upstream
bundles; trim old pending audio and emit with a video frame instead.
- [x] Add a short video/audio grace window so audio captured beside a frame has
a chance to join that frame's bundle before uplink.
- [x] Keep the microphone-only RPC running as the no-camera path even when the
server supports bundled webcam media; it yields while camera is active.
- [x] Bound the pre-bundle capture handoff channel so camera/mic workers drop - [x] Bound the pre-bundle capture handoff channel so camera/mic workers drop
old events under pressure instead of building unbounded latency. old events under pressure instead of building unbounded latency.
- [x] Drop lag-clamped camera and microphone source buffers before bundling; - [x] Drop lag-clamped camera and microphone source buffers before bundling;
@ -70,15 +75,15 @@ explicit no-camera path.
for one upstream session. for one upstream session.
- [x] Schedule bundled packets by shared client capture timestamp instead of - [x] Schedule bundled packets by shared client capture timestamp instead of
startup-pairing independent streams. startup-pairing independent streams.
- [x] Replace the old bundled event sorter/reanchor planner with one v2 bundle
clock and explicit per-device handoff scheduling.
- [x] Sanitize packet timestamps before bundling so stale/future source PTS values - [x] Sanitize packet timestamps before bundling so stale/future source PTS values
cannot become the server's A/V sync truth. cannot become the server's A/V sync truth.
- [x] Make server bundled scheduling use the client capture sidecar rather than - [x] Make server bundled scheduling use the client capture sidecar rather than
raw packet `pts`, and reset the bundled epoch on client-session changes. raw packet `pts`, and reset the bundled epoch on client-session changes.
- [x] Keep server freshness drops/reanchors active for bundled media. - [x] Drop bundles coherently when they are already outside the live age budget.
- [x] Drop mixed A/V bundles coherently when one side fails freshness/sync planning. - [x] Drop mixed A/V bundles coherently when capture timestamps are too far apart
- [x] Reanchor bundled playout when due times drift too far before output-path to represent one real capture moment.
compensation, and drop packets whose predicted pre-compensation playout
age would exceed the one-second freshness budget.
- [x] Keep bundled UVC/UAC output-path compensation authoritative; do not clip - [x] Keep bundled UVC/UAC output-path compensation authoritative; do not clip
measured offsets just to improve freshness when that would break sync. measured offsets just to improve freshness when that would break sync.
- [x] Activate the camera relay before opening the microphone sink so UVC can - [x] Activate the camera relay before opening the microphone sink so UVC can

6
Cargo.lock generated
View File

@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]] [[package]]
name = "lesavka_client" name = "lesavka_client"
version = "0.18.5" version = "0.19.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-stream", "async-stream",
@ -1686,7 +1686,7 @@ dependencies = [
[[package]] [[package]]
name = "lesavka_common" name = "lesavka_common"
version = "0.18.5" version = "0.19.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",
@ -1698,7 +1698,7 @@ dependencies = [
[[package]] [[package]]
name = "lesavka_server" name = "lesavka_server"
version = "0.18.5" version = "0.19.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package] [package]
name = "lesavka_client" name = "lesavka_client"
version = "0.18.5" version = "0.19.0"
edition = "2024" edition = "2024"
[dependencies] [dependencies]

View File

@ -297,7 +297,7 @@ impl LesavkaClientApp {
)); ));
} }
} }
if microphone_available && !bundled_webcam_media { if microphone_available {
let ep = vid_ep.clone(); let ep = vid_ep.clone();
let mic_telemetry = let mic_telemetry =
uplink_telemetry.handle(crate::uplink_telemetry::UpstreamStreamKind::Microphone); uplink_telemetry.handle(crate::uplink_telemetry::UpstreamStreamKind::Microphone);
@ -307,6 +307,7 @@ impl LesavkaClientApp {
initial_mic_source.clone(), initial_mic_source.clone(),
mic_telemetry, mic_telemetry,
media_controls, media_controls,
bundled_webcam_media,
)); ));
} }

View File

@ -17,10 +17,8 @@ impl LesavkaClientApp {
loop { loop {
let state = media_controls.refresh(); let state = media_controls.refresh();
let camera_requested = state.camera; let camera_requested = state.camera;
let microphone_requested = state.microphone || state.camera; if !camera_requested {
if !camera_requested && !microphone_requested {
camera_telemetry.record_enabled(false); camera_telemetry.record_enabled(false);
microphone_telemetry.record_enabled(false);
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;
continue; continue;
} }
@ -244,6 +242,7 @@ impl LesavkaClientApp {
event_rx, event_rx,
stop, stop,
queue, queue,
camera.is_some(),
camera_telemetry, camera_telemetry,
microphone_telemetry, microphone_telemetry,
drop_log, drop_log,
@ -298,12 +297,17 @@ impl LesavkaClientApp {
initial_source: Option<String>, initial_source: Option<String>,
telemetry: crate::uplink_telemetry::UplinkTelemetryHandle, telemetry: crate::uplink_telemetry::UplinkTelemetryHandle,
media_controls: crate::live_media_control::LiveMediaControls, media_controls: crate::live_media_control::LiveMediaControls,
pause_when_camera_active: bool,
) { ) {
let mut delay = Duration::from_secs(1); let mut delay = Duration::from_secs(1);
static FAIL_CNT: AtomicUsize = AtomicUsize::new(0); static FAIL_CNT: AtomicUsize = AtomicUsize::new(0);
loop { loop {
let state = media_controls.refresh(); let state = media_controls.refresh();
if pause_when_camera_active && state.camera {
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
}
if !state.microphone { if !state.microphone {
telemetry.record_enabled(false); telemetry.record_enabled(false);
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;
@ -410,6 +414,12 @@ impl LesavkaClientApp {
let desired_source = state let desired_source = state
.microphone_source .microphone_source
.resolve(initial_source_thread.as_deref()); .resolve(initial_source_thread.as_deref());
if pause_when_camera_active && state.camera {
tracing::info!(
"🎤 microphone-only uplink yielding to bundled webcam A/V"
);
break;
}
if desired_source != active_source_thread { if desired_source != active_source_thread {
tracing::info!( tracing::info!(
from = active_source_thread.as_deref().unwrap_or("auto"), from = active_source_thread.as_deref().unwrap_or("auto"),
@ -780,8 +790,8 @@ const AUDIO_UPLINK_QUEUE: crate::uplink_fresh_queue::FreshQueueConfig =
#[cfg(not(coverage))] #[cfg(not(coverage))]
const BUNDLED_MEDIA_UPLINK_QUEUE: crate::uplink_fresh_queue::FreshQueueConfig = const BUNDLED_MEDIA_UPLINK_QUEUE: crate::uplink_fresh_queue::FreshQueueConfig =
crate::uplink_fresh_queue::FreshQueueConfig { crate::uplink_fresh_queue::FreshQueueConfig {
capacity: 16, capacity: 4,
max_age: Duration::from_millis(350), max_age: Duration::from_secs(1),
policy: crate::uplink_fresh_queue::FreshQueuePolicy::LatestOnly, policy: crate::uplink_fresh_queue::FreshQueuePolicy::LatestOnly,
}; };
@ -791,6 +801,9 @@ const BUNDLED_AUDIO_FLUSH_INTERVAL: Duration = Duration::from_millis(20);
#[cfg(not(coverage))] #[cfg(not(coverage))]
const BUNDLED_AUDIO_MAX_PENDING: usize = 8; const BUNDLED_AUDIO_MAX_PENDING: usize = 8;
#[cfg(not(coverage))]
const BUNDLED_VIDEO_AUDIO_GRACE: Duration = Duration::from_millis(30);
#[cfg(not(coverage))] #[cfg(not(coverage))]
const BUNDLED_CAPTURE_EVENT_CHANNEL_CAPACITY: usize = 64; const BUNDLED_CAPTURE_EVENT_CHANNEL_CAPACITY: usize = 64;
@ -807,6 +820,7 @@ fn bundle_captured_media(
event_rx: std::sync::mpsc::Receiver<BundledCaptureEvent>, event_rx: std::sync::mpsc::Receiver<BundledCaptureEvent>,
stop: Arc<AtomicBool>, stop: Arc<AtomicBool>,
queue: crate::uplink_fresh_queue::FreshPacketQueue<UpstreamMediaBundle>, queue: crate::uplink_fresh_queue::FreshPacketQueue<UpstreamMediaBundle>,
video_required: bool,
camera_telemetry: crate::uplink_telemetry::UplinkTelemetryHandle, camera_telemetry: crate::uplink_telemetry::UplinkTelemetryHandle,
microphone_telemetry: crate::uplink_telemetry::UplinkTelemetryHandle, microphone_telemetry: crate::uplink_telemetry::UplinkTelemetryHandle,
drop_log: Arc<std::sync::Mutex<UplinkDropLogLimiter>>, drop_log: Arc<std::sync::Mutex<UplinkDropLogLimiter>>,
@ -816,18 +830,39 @@ fn bundle_captured_media(
.fetch_add(1, Ordering::Relaxed) .fetch_add(1, Ordering::Relaxed)
.saturating_add(1); .saturating_add(1);
let mut bundle_seq = 0_u64; let mut bundle_seq = 0_u64;
let mut pending_audio = Vec::new(); let mut pending_audio = Vec::<AudioPacket>::new();
let mut pending_video = None::<VideoPacket>;
let mut pending_video_deadline = None::<Instant>;
let mut next_audio_flush = Instant::now() + BUNDLED_AUDIO_FLUSH_INTERVAL; let mut next_audio_flush = Instant::now() + BUNDLED_AUDIO_FLUSH_INTERVAL;
loop { loop {
if stop.load(Ordering::Relaxed) { if stop.load(Ordering::Relaxed) {
break; break;
} }
let timeout = next_audio_flush.saturating_duration_since(Instant::now()); let now = Instant::now();
let timeout = if video_required {
pending_video_deadline
.unwrap_or(now + BUNDLED_AUDIO_FLUSH_INTERVAL)
.saturating_duration_since(now)
} else {
next_audio_flush.saturating_duration_since(now)
};
match event_rx.recv_timeout(timeout) { match event_rx.recv_timeout(timeout) {
Ok(BundledCaptureEvent::Audio(packet)) => { Ok(BundledCaptureEvent::Audio(packet)) => {
pending_audio.push(packet); pending_audio.push(packet);
if pending_audio.len() >= BUNDLED_AUDIO_MAX_PENDING { if video_required {
let dropped = retain_newest_pending_audio(&mut pending_audio);
if dropped > 0 {
microphone_telemetry.record_stale_drop(dropped as u64);
log_uplink_drop(
&drop_log,
UplinkDropReason::Stale,
dropped as u64,
pending_audio.len(),
0.0,
);
}
} else if pending_audio.len() >= BUNDLED_AUDIO_MAX_PENDING {
emit_bundled_media( emit_bundled_media(
session_id, session_id,
&mut bundle_seq, &mut bundle_seq,
@ -842,24 +877,72 @@ fn bundle_captured_media(
} }
} }
Ok(BundledCaptureEvent::Video(packet)) => { Ok(BundledCaptureEvent::Video(packet)) => {
if let Some(video) = pending_video.take() {
emit_bundled_media( emit_bundled_media(
session_id, session_id,
&mut bundle_seq, &mut bundle_seq,
Some(packet), Some(video),
std::mem::take(&mut pending_audio), std::mem::take(&mut pending_audio),
&queue, &queue,
&camera_telemetry, &camera_telemetry,
&microphone_telemetry, &microphone_telemetry,
&drop_log, &drop_log,
); );
}
pending_video = Some(packet);
pending_video_deadline = Some(Instant::now() + BUNDLED_VIDEO_AUDIO_GRACE);
if !video_required {
emit_bundled_media(
session_id,
&mut bundle_seq,
pending_video.take(),
std::mem::take(&mut pending_audio),
&queue,
&camera_telemetry,
&microphone_telemetry,
&drop_log,
);
pending_video_deadline = None;
next_audio_flush = Instant::now() + BUNDLED_AUDIO_FLUSH_INTERVAL; next_audio_flush = Instant::now() + BUNDLED_AUDIO_FLUSH_INTERVAL;
} }
}
Ok(BundledCaptureEvent::Restart) => { Ok(BundledCaptureEvent::Restart) => {
if pending_video.is_some() || (!video_required && !pending_audio.is_empty()) {
emit_bundled_media(
session_id,
&mut bundle_seq,
pending_video.take(),
std::mem::take(&mut pending_audio),
&queue,
&camera_telemetry,
&microphone_telemetry,
&drop_log,
);
}
stop.store(true, Ordering::Relaxed); stop.store(true, Ordering::Relaxed);
break; break;
} }
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
if !pending_audio.is_empty() { if video_required {
if pending_video_deadline.is_some_and(|deadline| Instant::now() >= deadline) {
emit_bundled_media(
session_id,
&mut bundle_seq,
pending_video.take(),
std::mem::take(&mut pending_audio),
&queue,
&camera_telemetry,
&microphone_telemetry,
&drop_log,
);
pending_video_deadline = None;
} else {
let dropped = retain_newest_pending_audio(&mut pending_audio);
if dropped > 0 {
microphone_telemetry.record_stale_drop(dropped as u64);
}
}
} else if !pending_audio.is_empty() {
emit_bundled_media( emit_bundled_media(
session_id, session_id,
&mut bundle_seq, &mut bundle_seq,
@ -876,9 +959,31 @@ fn bundle_captured_media(
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break, Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
} }
} }
if pending_video.is_some() || (!video_required && !pending_audio.is_empty()) {
emit_bundled_media(
session_id,
&mut bundle_seq,
pending_video.take(),
std::mem::take(&mut pending_audio),
&queue,
&camera_telemetry,
&microphone_telemetry,
&drop_log,
);
}
queue.close(); queue.close();
} }
#[cfg(not(coverage))]
fn retain_newest_pending_audio(pending_audio: &mut Vec<AudioPacket>) -> usize {
if pending_audio.len() <= BUNDLED_AUDIO_MAX_PENDING {
return 0;
}
let dropped = pending_audio.len() - BUNDLED_AUDIO_MAX_PENDING;
pending_audio.drain(..dropped);
dropped
}
#[cfg(not(coverage))] #[cfg(not(coverage))]
fn emit_bundled_media( fn emit_bundled_media(
session_id: u64, session_id: u64,

View File

@ -1,6 +1,6 @@
[package] [package]
name = "lesavka_common" name = "lesavka_common"
version = "0.18.5" version = "0.19.0"
edition = "2024" edition = "2024"
build = "build.rs" build = "build.rs"

View File

@ -247,18 +247,18 @@ from `LESAVKA_CLIENT_PKI_SSH_SOURCE` over SSH. Runtime clients require the insta
| `LESAVKA_TOUCHPAD_SCALE` | input routing/clipboard override | | `LESAVKA_TOUCHPAD_SCALE` | input routing/clipboard override |
| `LESAVKA_UAC_DEV` | server hardware/device override | | `LESAVKA_UAC_DEV` | server hardware/device override |
| `LESAVKA_UAC_SESSION_CLOCK_ALIGN` | server audio sink clock-alignment override; `0` is the host-validated default | | `LESAVKA_UAC_SESSION_CLOCK_ALIGN` | server audio sink clock-alignment override; `0` is the host-validated default |
| `LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US` | legacy/split server upstream playout override; shifts gadget-audio presentation relative to the shared playout epoch | | `LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US` | server upstream output-path override; v2 uses it as the explicit UAC handoff delay relative to the shared client capture clock |
| `LESAVKA_UPSTREAM_AUDIO_MASTER_WAIT_GRACE_MS` | server upstream sync override; how long video may wait past its nominal due time for UAC audio to reach the matching timestamp, defaults to `350` | | `LESAVKA_UPSTREAM_AUDIO_MASTER_WAIT_GRACE_MS` | server upstream sync override; how long video may wait past its nominal due time for UAC audio to reach the matching timestamp, defaults to `350` |
| `LESAVKA_UPSTREAM_BUNDLED_AUDIO_PLAYOUT_OFFSET_US` | bundled webcam server playout override; defaults to the active runtime audio output-path calibration when unset; sync-critical measured offsets are not clipped for freshness | | `LESAVKA_UPSTREAM_BUNDLED_PLAYOUT_DELAY_MS` | compatibility alias for `LESAVKA_UPSTREAM_V2_PLAYOUT_DELAY_MS` |
| `LESAVKA_UPSTREAM_BUNDLED_PLAYOUT_DELAY_MS` | bundled webcam jitter buffer before output-path compensation; defaults to `350` to protect smooth synced playout | | `LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS` | compatibility alias for `LESAVKA_UPSTREAM_V2_MAX_LIVE_AGE_MS` |
| `LESAVKA_UPSTREAM_BUNDLED_VIDEO_PLAYOUT_OFFSET_US` | bundled webcam server playout override; defaults to the active runtime video output-path calibration when unset; sync-critical measured offsets are not clipped for freshness |
| `LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS` | server upstream planner freshness ceiling; planner-approved audio/video should not exceed this live lag budget, defaults to `1000` and is capped at `1000` |
| `LESAVKA_UPSTREAM_PAIR_SLACK_US` | server upstream pairing override; how far video may diverge from the planned audio-master capture moment before the frame is held or dropped, defaults to `80000` | | `LESAVKA_UPSTREAM_PAIR_SLACK_US` | server upstream pairing override; how far video may diverge from the planned audio-master capture moment before the frame is held or dropped, defaults to `80000` |
| `LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS` | server upstream pairing/synchronization target buffer; the server uses this shared buffer to pair webcam frames with matching gadget-mic audio before remote presentation, defaults to `350` | | `LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS` | server upstream pairing/synchronization target buffer; the server uses this shared buffer to pair webcam frames with matching gadget-mic audio before remote presentation, defaults to `350` |
| `LESAVKA_UPSTREAM_STARTUP_TIMEOUT_MS` | server upstream startup guard; paired startup must converge before this timeout or fail visibly, defaults to `60000` | | `LESAVKA_UPSTREAM_STARTUP_TIMEOUT_MS` | server upstream startup guard; paired startup must converge before this timeout or fail visibly, defaults to `60000` |
| `LESAVKA_UPSTREAM_STALE_DROP_MS` | server upstream freshness override; late audio/video that miss this budget are dropped instead of silently extending lag, defaults to `80` | | `LESAVKA_UPSTREAM_STALE_DROP_MS` | server upstream freshness override; late audio/video that miss this budget are dropped instead of silently extending lag, defaults to `80` |
| `LESAVKA_UPSTREAM_TIMING_TRACE` | upstream capture/rebase trace override for sync debugging | | `LESAVKA_UPSTREAM_TIMING_TRACE` | upstream capture/rebase trace override for sync debugging |
| `LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US` | legacy/split server upstream playout override; shifts webcam-video presentation relative to the shared playout epoch, defaults to `1090000` for measured MJPEG/UVC browser-visible sync compensation | | `LESAVKA_UPSTREAM_V2_MAX_LIVE_AGE_MS` | v2 bundled webcam freshness ceiling; bundles already older than this are dropped as one unit, defaults to `1000` |
| `LESAVKA_UPSTREAM_V2_PLAYOUT_DELAY_MS` | v2 optional common playout slack after sync offsets; defaults to `20` and is reduced when needed to protect the live-age budget |
| `LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US` | server upstream output-path override; v2 uses it as the explicit UVC handoff delay relative to the shared client capture clock, defaults to the calibrated MJPEG/UVC offset |
| `LESAVKA_UPLINK_CAMERA_PREVIEW` | client media capture/playback override | | `LESAVKA_UPLINK_CAMERA_PREVIEW` | client media capture/playback override |
| `LESAVKA_UPLINK_MIC_LEVEL` | client media capture/playback override | | `LESAVKA_UPLINK_MIC_LEVEL` | client media capture/playback override |
| `LESAVKA_INSTALL_UVC_CODEC` | installer override; sets the persisted default UVC webcam codec in `/etc/lesavka/server.env` and `/etc/lesavka/uvc.env` | | `LESAVKA_INSTALL_UVC_CODEC` | installer override; sets the persisted default UVC webcam codec in `/etc/lesavka/server.env` and `/etc/lesavka/uvc.env` |

View File

@ -0,0 +1,16 @@
# Upstream Media v1 Quarantine
This folder preserves the old upstream media planner for reference only. It is intentionally outside the active Cargo module tree.
Good ideas worth reusing later, after v2 is stable:
- Explicit client timing sidecars for capture/send/queue age.
- Diagnostics windows for capture skew, send skew, receive skew, and sink handoff skew.
- Single-owner leases for camera and microphone streams.
- A single UAC sink permit so reconnects do not double-open ALSA.
Reasons this was quarantined:
- Bundled A/V was unpacked into independent events and scheduled one by one.
- Freshness recovery, output compensation, startup pairing, and per-kind drops interacted in ways that could create delay or asymmetry.
- The active bundled path was too complex to reason about while debugging real Google Meet lip sync.
The v2 active path must stay small: when video is present, audio and video travel in one bundle and the server releases that bundle coherently. Sync first, freshness second, smoothness third.

File diff suppressed because it is too large Load Diff

View File

@ -10,7 +10,7 @@ bench = false
[package] [package]
name = "lesavka_server" name = "lesavka_server"
version = "0.18.5" version = "0.19.0"
edition = "2024" edition = "2024"
autobins = false autobins = false

View File

@ -1,133 +1,209 @@
#[cfg(not(coverage))] #[cfg(not(coverage))]
#[derive(Debug)] const MEDIA_V2_DEFAULT_PLAYOUT_DELAY_MS: u64 = 20;
enum BundledUpstreamEvent {
Audio(AudioPacket),
Video(VideoPacket),
}
#[cfg(not(coverage))] #[cfg(not(coverage))]
impl BundledUpstreamEvent { const MEDIA_V2_DEFAULT_MAX_LIVE_AGE_MS: u64 = 1_000;
fn remote_pts_us(&self) -> u64 { #[cfg(not(coverage))]
self.client_timing().capture_pts_us const MEDIA_V2_MAX_MIXED_CAPTURE_SPAN_US: u64 = 250_000;
}
fn client_timing(&self) -> UpstreamClientTiming {
match self {
Self::Audio(packet) => audio_client_timing(packet),
Self::Video(packet) => video_client_timing(packet),
}
}
fn kind(&self) -> UpstreamMediaKind {
match self {
Self::Audio(_) => UpstreamMediaKind::Microphone,
Self::Video(_) => UpstreamMediaKind::Camera,
}
}
fn playout_order(&self) -> u8 {
match self {
Self::Audio(_) => 0,
Self::Video(_) => 1,
}
}
}
#[cfg(not(coverage))] #[cfg(not(coverage))]
#[derive(Clone, Copy, Debug, Default)] #[derive(Clone, Copy, Debug, Default)]
struct BundledPlayoutClock { struct MediaV2Clock {
base_remote_pts_us: Option<u64>, base_capture_pts_us: Option<u64>,
epoch: Option<tokio::time::Instant>,
} }
#[cfg(not(coverage))] #[cfg(not(coverage))]
impl BundledPlayoutClock { impl MediaV2Clock {
fn ensure(&mut self, events: &[BundledUpstreamEvent]) -> Option<(u64, tokio::time::Instant)> { fn local_pts_us(&mut self, capture_pts_us: u64) -> u64 {
if self.base_remote_pts_us.is_none() || self.epoch.is_none() { let base = *self.base_capture_pts_us.get_or_insert(capture_pts_us);
let base = events.iter().map(BundledUpstreamEvent::remote_pts_us).min()?; capture_pts_us.saturating_sub(base)
self.base_remote_pts_us = Some(base);
self.epoch = Some(tokio::time::Instant::now() + bundled_upstream_playout_delay());
}
let base_remote_pts_us = self.base_remote_pts_us?;
let epoch = self.epoch?;
Some((base_remote_pts_us, epoch))
} }
} }
#[cfg(not(coverage))] #[cfg(not(coverage))]
const BUNDLED_CAPTURE_BOUND_TOLERANCE_US: u64 = 50_000; #[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[cfg(not(coverage))] struct MediaV2BundleFacts {
const BUNDLED_MIXED_CAPTURE_SPAN_DROP_US: u64 = 250_000;
#[cfg(not(coverage))]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct BundledTimingSummary {
has_audio: bool, has_audio: bool,
has_video: bool, has_video: bool,
min_event_pts_us: u64,
max_event_pts_us: u64,
capture_span_us: u64, capture_span_us: u64,
capture_bounds_match: bool, max_queue_age_ms: u32,
mixed_span_too_wide: bool,
} }
#[cfg(not(coverage))] #[cfg(not(coverage))]
fn bundled_events_are_mixed(events: &[BundledUpstreamEvent]) -> bool { #[derive(Clone, Copy, Debug)]
let has_audio = events.iter().any(|event| matches!(event, BundledUpstreamEvent::Audio(_))); struct MediaV2HandoffSchedule {
let has_video = events.iter().any(|event| matches!(event, BundledUpstreamEvent::Video(_))); audio_due_at: Option<tokio::time::Instant>,
has_audio && has_video video_due_at: Option<tokio::time::Instant>,
common_delay: Duration,
relative_audio_delay: Duration,
relative_video_delay: Duration,
} }
#[cfg(not(coverage))] #[cfg(not(coverage))]
fn retain_startup_video_only(events: &mut Vec<BundledUpstreamEvent>) -> bool { fn summarize_media_v2_bundle(bundle: &UpstreamMediaBundle) -> Option<MediaV2BundleFacts> {
if !bundled_events_are_mixed(events) { let mut capture_start_us = u64::MAX;
return false; let mut capture_end_us = 0_u64;
let mut max_queue_age_ms = 0_u32;
let has_video = bundle.video.is_some();
let has_audio = !bundle.audio.is_empty();
if let Some(video) = bundle.video.as_ref() {
let pts = packet_video_capture_pts_us(video);
capture_start_us = capture_start_us.min(pts);
capture_end_us = capture_end_us.max(pts);
max_queue_age_ms = max_queue_age_ms.max(video.client_queue_age_ms);
} }
events.retain(|event| matches!(event, BundledUpstreamEvent::Video(_))); for audio in &bundle.audio {
!events.is_empty() let pts = packet_audio_capture_pts_us(audio);
capture_start_us = capture_start_us.min(pts);
capture_end_us = capture_end_us.max(pts);
max_queue_age_ms = max_queue_age_ms.max(audio.client_queue_age_ms);
} }
if !has_audio && !has_video {
#[cfg(not(coverage))] return None;
fn summarize_bundled_timing( }
bundle: &UpstreamMediaBundle, if capture_start_us == u64::MAX {
events: &[BundledUpstreamEvent], capture_start_us = 0;
) -> Option<BundledTimingSummary> { }
let min_event_pts_us = events.iter().map(BundledUpstreamEvent::remote_pts_us).min()?; Some(MediaV2BundleFacts {
let max_event_pts_us = events.iter().map(BundledUpstreamEvent::remote_pts_us).max()?;
let has_audio = events.iter().any(|event| matches!(event, BundledUpstreamEvent::Audio(_)));
let has_video = events.iter().any(|event| matches!(event, BundledUpstreamEvent::Video(_)));
let start_matches = bundle.capture_start_us == 0
|| abs_delta_us(bundle.capture_start_us, min_event_pts_us)
<= BUNDLED_CAPTURE_BOUND_TOLERANCE_US;
let end_matches = bundle.capture_end_us == 0
|| abs_delta_us(bundle.capture_end_us, max_event_pts_us) <= BUNDLED_CAPTURE_BOUND_TOLERANCE_US;
let capture_span_us = max_event_pts_us.saturating_sub(min_event_pts_us);
Some(BundledTimingSummary {
has_audio, has_audio,
has_video, has_video,
min_event_pts_us, capture_span_us: capture_end_us.saturating_sub(capture_start_us),
max_event_pts_us, max_queue_age_ms,
capture_span_us,
capture_bounds_match: start_matches && end_matches,
mixed_span_too_wide: has_audio
&& has_video
&& capture_span_us > BUNDLED_MIXED_CAPTURE_SPAN_DROP_US,
}) })
} }
#[cfg(not(coverage))] #[cfg(not(coverage))]
fn abs_delta_us(left: u64, right: u64) -> u64 { fn packet_audio_capture_pts_us(packet: &AudioPacket) -> u64 {
left.max(right) - left.min(right) if packet.client_capture_pts_us == 0 {
packet.pts
} else {
packet.client_capture_pts_us
}
} }
#[cfg(not(coverage))] #[cfg(not(coverage))]
fn bundled_upstream_playout_delay() -> Duration { fn packet_video_capture_pts_us(packet: &VideoPacket) -> u64 {
std::env::var("LESAVKA_UPSTREAM_BUNDLED_PLAYOUT_DELAY_MS") if packet.client_capture_pts_us == 0 {
.or_else(|_| std::env::var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS")) packet.pts
} else {
packet.client_capture_pts_us
}
}
#[cfg(not(coverage))]
fn media_v2_playout_delay() -> Duration {
std::env::var("LESAVKA_UPSTREAM_V2_PLAYOUT_DELAY_MS")
.or_else(|_| std::env::var("LESAVKA_UPSTREAM_BUNDLED_PLAYOUT_DELAY_MS"))
.ok() .ok()
.and_then(|value| value.trim().parse::<u64>().ok()) .and_then(|value| value.trim().parse::<u64>().ok())
.map(Duration::from_millis) .map(Duration::from_millis)
.unwrap_or_else(|| Duration::from_millis(350)) .unwrap_or_else(|| Duration::from_millis(MEDIA_V2_DEFAULT_PLAYOUT_DELAY_MS))
}
#[cfg(not(coverage))]
fn media_v2_max_live_age() -> Duration {
std::env::var("LESAVKA_UPSTREAM_V2_MAX_LIVE_AGE_MS")
.or_else(|_| std::env::var("LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS"))
.ok()
.and_then(|value| value.trim().parse::<u64>().ok())
.map(Duration::from_millis)
.unwrap_or_else(|| Duration::from_millis(MEDIA_V2_DEFAULT_MAX_LIVE_AGE_MS))
}
#[cfg(not(coverage))]
fn media_v2_handoff_schedule(
facts: MediaV2BundleFacts,
audio_offset_us: i64,
video_offset_us: i64,
) -> Option<MediaV2HandoffSchedule> {
let max_live_age = media_v2_max_live_age();
let queue_age = Duration::from_millis(u64::from(facts.max_queue_age_ms));
if queue_age >= max_live_age {
return None;
}
let mut relative_audio_delay = Duration::ZERO;
let mut relative_video_delay = Duration::ZERO;
if facts.has_audio && facts.has_video {
let base_offset_us = audio_offset_us.min(video_offset_us);
relative_audio_delay =
Duration::from_micros(audio_offset_us.saturating_sub(base_offset_us) as u64);
relative_video_delay =
Duration::from_micros(video_offset_us.saturating_sub(base_offset_us) as u64);
}
let relative_span = relative_audio_delay.max(relative_video_delay);
let remaining_after_offset = max_live_age
.saturating_sub(queue_age)
.saturating_sub(relative_span);
let common_delay = media_v2_playout_delay().min(remaining_after_offset);
let now = tokio::time::Instant::now();
Some(MediaV2HandoffSchedule {
audio_due_at: facts
.has_audio
.then_some(now + common_delay + relative_audio_delay),
video_due_at: facts
.has_video
.then_some(now + common_delay + relative_video_delay),
common_delay,
relative_audio_delay,
relative_video_delay,
})
}
#[cfg(not(coverage))]
async fn sleep_until_media_v2(due_at: tokio::time::Instant) {
if due_at > tokio::time::Instant::now() {
tokio::time::sleep_until(due_at).await;
}
}
#[cfg(not(coverage))]
async fn push_media_v2_audio(
audio_packets: &mut Vec<AudioPacket>,
clock: &mut MediaV2Clock,
sink: &mut lesavka_server::audio::Voice,
upstream_media_rt: &UpstreamMediaRuntime,
due_at: tokio::time::Instant,
) {
sleep_until_media_v2(due_at).await;
for mut audio in audio_packets.drain(..) {
let capture_pts_us = packet_audio_capture_pts_us(&audio);
audio.pts = clock.local_pts_us(capture_pts_us);
sink.push(&audio);
upstream_media_rt.mark_audio_presented(audio.pts, due_at);
}
}
#[cfg(not(coverage))]
async fn feed_media_v2_video(
video: Option<VideoPacket>,
clock: &mut MediaV2Clock,
relay: &Arc<lesavka_server::video::CameraRelay>,
upstream_media_rt: &UpstreamMediaRuntime,
due_at: tokio::time::Instant,
video_presented_once: &mut bool,
rpc_id: u64,
session_id: u64,
camera_session_id: u64,
) {
let Some(mut video) = video else {
return;
};
sleep_until_media_v2(due_at).await;
let capture_pts_us = packet_video_capture_pts_us(&video);
video.pts = clock.local_pts_us(capture_pts_us);
let presented_pts = video.pts;
relay.feed(video);
if !*video_presented_once {
info!(
rpc_id,
session_id,
camera_session_id,
pts = presented_pts,
"📦 first v2 bundled video frame fed to camera sink"
);
*video_presented_once = true;
}
upstream_media_rt.mark_video_presented(presented_pts, due_at);
} }
#[cfg(not(coverage))] #[cfg(not(coverage))]
@ -235,7 +311,7 @@ impl Relay for Handler {
Ok(Response::new(ReceiverStream::new(rx))) Ok(Response::new(ReceiverStream::new(rx)))
} }
/// Accept client-bundled webcam and microphone packets on one upstream clock. /// Accept client-bundled webcam and microphone packets on the v2 upstream path.
async fn stream_webcam_media( async fn stream_webcam_media(
&self, &self,
req: Request<tonic::Streaming<UpstreamMediaBundle>>, req: Request<tonic::Streaming<UpstreamMediaBundle>>,
@ -254,16 +330,14 @@ impl Relay for Handler {
width = camera_cfg.width, width = camera_cfg.width,
height = camera_cfg.height, height = camera_cfg.height,
fps = camera_cfg.fps, fps = camera_cfg.fps,
"📦 stream_webcam_media opened" "📦 stream_webcam_media v2 opened"
); );
let (camera_session_id, relay, _relay_reused) = let (camera_session_id, relay, _relay_reused) =
match self.camera_rt.activate(&camera_cfg).await { match self.camera_rt.activate(&camera_cfg).await {
Ok(active) => active, Ok(active) => active,
Err(err) => { Err(err) => {
self.upstream_media_rt self.upstream_media_rt.close_camera(camera_lease.generation);
.close_camera(camera_lease.generation); self.upstream_media_rt.close_microphone(microphone_lease.generation);
self.upstream_media_rt
.close_microphone(microphone_lease.generation);
return Err(err); return Err(err);
} }
}; };
@ -272,53 +346,44 @@ impl Relay for Handler {
.reserve_microphone_sink(microphone_lease.generation) .reserve_microphone_sink(microphone_lease.generation)
.await .await
else { else {
self.upstream_media_rt self.upstream_media_rt.close_camera(camera_lease.generation);
.close_camera(camera_lease.generation); self.upstream_media_rt.close_microphone(microphone_lease.generation);
self.upstream_media_rt
.close_microphone(microphone_lease.generation);
return Err(Status::aborted( return Err(Status::aborted(
"bundled webcam media stream superseded before microphone sink became available", "v2 bundled media stream superseded before microphone sink became available",
)); ));
}; };
let uac_dev = std::env::var("LESAVKA_UAC_DEV").unwrap_or_else(|_| "hw:UAC2Gadget,0".into()); let uac_dev = std::env::var("LESAVKA_UAC_DEV").unwrap_or_else(|_| "hw:UAC2Gadget,0".into());
let mut sink = runtime_support::open_voice_with_retry(&uac_dev) let mut sink = runtime_support::open_voice_with_retry(&uac_dev)
.await .await
.map_err(|e| { .map_err(|e| {
self.upstream_media_rt self.upstream_media_rt.close_camera(camera_lease.generation);
.close_camera(camera_lease.generation); self.upstream_media_rt.close_microphone(microphone_lease.generation);
self.upstream_media_rt
.close_microphone(microphone_lease.generation);
Status::internal(format!("{e:#}")) Status::internal(format!("{e:#}"))
})?; })?;
let camera_rt = self.camera_rt.clone(); let camera_rt = self.camera_rt.clone();
let upstream_media_rt = self.upstream_media_rt.clone(); let upstream_media_rt = self.upstream_media_rt.clone();
let frame_step_us = (1_000_000u64 / u64::from(camera_cfg.fps.max(1))).max(1);
let stale_drop_budget = upstream_stale_drop_budget();
let (tx, rx) = tokio::sync::mpsc::channel(1); let (tx, rx) = tokio::sync::mpsc::channel(1);
tokio::spawn(async move { tokio::spawn(async move {
let _microphone_sink_permit = microphone_sink_permit; let _microphone_sink_permit = microphone_sink_permit;
let mut inbound = req.into_inner(); let mut inbound = req.into_inner();
let mut clock = BundledPlayoutClock::default(); let mut clock = MediaV2Clock::default();
let mut last_bundle_session_id = None; let mut last_bundle_session_id = None;
let mut last_bundle_seq = None; let mut last_bundle_seq = None;
let mut video_presented_once = false; let mut video_presented_once = false;
let mut outcome = "aborted"; let mut outcome = "aborted";
'bundled_loop: loop {
let bundle = match inbound.next().await { while let Some(bundle_result) = inbound.next().await {
Some(Ok(bundle)) => bundle, let mut bundle = match bundle_result {
Some(Err(err)) => { Ok(bundle) => bundle,
Err(err) => {
warn!( warn!(
rpc_id, rpc_id,
session_id = camera_lease.session_id, session_id = camera_lease.session_id,
"📦 stream_webcam_media inbound error before clean EOF: {err}" "📦 stream_webcam_media v2 inbound error before clean EOF: {err}"
); );
break; break;
} }
None => {
outcome = "closed";
break;
}
}; };
if !camera_rt.is_active(camera_session_id) if !camera_rt.is_active(camera_session_id)
|| !upstream_media_rt.is_camera_active(camera_lease.generation) || !upstream_media_rt.is_camera_active(camera_lease.generation)
@ -327,31 +392,14 @@ impl Relay for Handler {
outcome = "superseded"; outcome = "superseded";
break; break;
} }
let mut events = Vec::with_capacity(bundle.audio.len() + 1);
if let Some(video) = bundle.video.clone() {
upstream_media_rt
.record_client_timing(UpstreamMediaKind::Camera, video_client_timing(&video));
events.push(BundledUpstreamEvent::Video(video));
}
for audio in &bundle.audio {
upstream_media_rt.record_client_timing(
UpstreamMediaKind::Microphone,
audio_client_timing(audio),
);
events.push(BundledUpstreamEvent::Audio(audio.clone()));
}
if events.is_empty() {
continue;
}
events.sort_by_key(|event| (event.remote_pts_us(), event.playout_order()));
if last_bundle_session_id.is_some_and(|session_id| session_id != bundle.session_id) { if last_bundle_session_id.is_some_and(|session_id| session_id != bundle.session_id) {
warn!( warn!(
rpc_id, rpc_id,
previous_session_id = last_bundle_session_id.unwrap_or_default(), previous_session_id = last_bundle_session_id.unwrap_or_default(),
next_session_id = bundle.session_id, next_session_id = bundle.session_id,
"📦 bundled upstream client session changed inside one gRPC stream; resetting playout epoch" "📦 v2 bundled client session changed; resetting local media clock"
); );
clock = BundledPlayoutClock::default(); clock = MediaV2Clock::default();
last_bundle_seq = None; last_bundle_seq = None;
} }
last_bundle_session_id = Some(bundle.session_id); last_bundle_session_id = Some(bundle.session_id);
@ -362,204 +410,143 @@ impl Relay for Handler {
client_bundle_session_id = bundle.session_id, client_bundle_session_id = bundle.session_id,
bundle_seq = bundle.seq, bundle_seq = bundle.seq,
previous_bundle_seq = last_bundle_seq.unwrap_or_default(), previous_bundle_seq = last_bundle_seq.unwrap_or_default(),
"📦 bundled upstream packet sequence moved backwards; dropping duplicate/stale bundle" "📦 v2 dropping duplicate/stale bundled packet"
); );
continue; continue;
} }
last_bundle_seq = Some(bundle.seq); last_bundle_seq = Some(bundle.seq);
let Some(timing_summary) = summarize_bundled_timing(&bundle, &events) else {
let Some(facts) = summarize_media_v2_bundle(&bundle) else {
continue; continue;
}; };
if !timing_summary.capture_bounds_match { if facts.has_audio && facts.has_video && facts.capture_span_us > MEDIA_V2_MAX_MIXED_CAPTURE_SPAN_US {
warn!( warn!(
rpc_id, rpc_id,
session_id = camera_lease.session_id, session_id = camera_lease.session_id,
client_bundle_session_id = bundle.session_id, client_bundle_session_id = bundle.session_id,
bundle_seq = bundle.seq, bundle_seq = bundle.seq,
capture_start_us = bundle.capture_start_us, span_ms = facts.capture_span_us / 1000,
capture_end_us = bundle.capture_end_us, "📦 v2 dropping mixed bundle with impossible A/V capture span"
event_min_us = timing_summary.min_event_pts_us,
event_max_us = timing_summary.max_event_pts_us,
"📦 bundled upstream capture bounds disagreed with packet timing; using packet sidecar timing"
);
}
if timing_summary.mixed_span_too_wide {
if !video_presented_once && retain_startup_video_only(&mut events) {
warn!(
rpc_id,
session_id = camera_lease.session_id,
client_bundle_session_id = bundle.session_id,
bundle_seq = bundle.seq,
span_ms = timing_summary.capture_span_us / 1000,
"📦 bundled startup A/V span is too wide; dropping audio but feeding video to prime the camera device"
);
} else {
warn!(
rpc_id,
session_id = camera_lease.session_id,
client_bundle_session_id = bundle.session_id,
bundle_seq = bundle.seq,
span_ms = timing_summary.capture_span_us / 1000,
"📦 bundled mixed A/V capture span is too wide; dropping the bundle to protect sync"
); );
continue; continue;
} }
for audio in &bundle.audio {
upstream_media_rt.record_client_timing(
UpstreamMediaKind::Microphone,
audio_client_timing(audio),
);
} }
let Some((base_remote_pts_us, epoch)) = clock.ensure(&events) else { if let Some(video) = bundle.video.as_ref() {
upstream_media_rt.record_client_timing(
UpstreamMediaKind::Camera,
video_client_timing(video),
);
}
let (video_offset_us, audio_offset_us) = upstream_media_rt.playout_offsets();
let Some(schedule) = media_v2_handoff_schedule(facts, audio_offset_us, video_offset_us) else {
if facts.has_video {
upstream_media_rt.record_video_freeze(
"v2 dropped stale bundled A/V before UVC/UAC handoff",
);
}
warn!(
rpc_id,
session_id = camera_lease.session_id,
client_bundle_session_id = bundle.session_id,
bundle_seq = bundle.seq,
max_queue_age_ms = facts.max_queue_age_ms,
"📦 v2 dropping whole bundle because it is already outside the freshness budget"
);
continue; continue;
}; };
let mut mixed_bundle = bundled_events_are_mixed(&events); debug!(
let startup_video_priming = !video_presented_once
&& events
.iter()
.any(|event| matches!(event, BundledUpstreamEvent::Video(_)));
let mut planned_events = Vec::with_capacity(events.len());
let mut drop_mixed_bundle = false;
let mut dropped_audio_for_startup_video = false;
for event in events {
let kind = event.kind();
let min_step_us = match kind {
UpstreamMediaKind::Camera => frame_step_us,
UpstreamMediaKind::Microphone => 1,
};
let plan = match upstream_media_rt.plan_bundled_pts(
kind,
event.remote_pts_us(),
min_step_us,
base_remote_pts_us,
epoch,
) {
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan,
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropBeforeOverlap => {
if startup_video_priming && kind == UpstreamMediaKind::Microphone {
dropped_audio_for_startup_video = true;
continue;
}
if mixed_bundle {
drop_mixed_bundle = true;
}
continue;
}
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropStale(reason) => {
tracing::warn!(
rpc_id,
session_id = camera_lease.session_id,
?kind,
reason,
"📦 bundled upstream packet dropped by freshness planner"
);
if startup_video_priming && kind == UpstreamMediaKind::Microphone {
dropped_audio_for_startup_video = true;
continue;
}
if mixed_bundle {
drop_mixed_bundle = true;
}
continue;
}
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::AwaitingPair => continue,
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::StartupFailed(reason) => {
tracing::error!(
rpc_id,
session_id = camera_lease.session_id,
reason,
"📦 bundled upstream startup failed"
);
break 'bundled_loop;
}
};
if plan.late_by > stale_drop_budget {
tracing::warn!(
rpc_id,
session_id = camera_lease.session_id,
?kind,
late_by_ms = plan.late_by.as_millis(),
pts = plan.local_pts_us,
"📦 bundled upstream packet dropped after missing freshness budget"
);
if startup_video_priming && kind == UpstreamMediaKind::Microphone {
dropped_audio_for_startup_video = true;
continue;
}
if mixed_bundle {
drop_mixed_bundle = true;
}
continue;
}
planned_events.push((event, plan));
}
if dropped_audio_for_startup_video {
mixed_bundle = false;
warn!(
rpc_id, rpc_id,
session_id = camera_lease.session_id, session_id = camera_lease.session_id,
client_bundle_session_id = bundle.session_id, client_bundle_session_id = bundle.session_id,
bundle_seq = bundle.seq, bundle_seq = bundle.seq,
"📦 dropped startup audio from a bad bundled packet but kept video-only playout so the camera can start" max_queue_age_ms = facts.max_queue_age_ms,
common_delay_ms = schedule.common_delay.as_millis(),
relative_audio_delay_ms = schedule.relative_audio_delay.as_millis(),
relative_video_delay_ms = schedule.relative_video_delay.as_millis(),
audio_offset_us,
video_offset_us,
"📦 v2 scheduled bundled UAC/UVC handoff from one capture clock"
); );
}
if drop_mixed_bundle { match (schedule.audio_due_at, schedule.video_due_at) {
warn!( (Some(audio_due_at), Some(video_due_at)) if audio_due_at <= video_due_at => {
push_media_v2_audio(
&mut bundle.audio,
&mut clock,
&mut sink,
&upstream_media_rt,
audio_due_at,
)
.await;
feed_media_v2_video(
bundle.video.take(),
&mut clock,
&relay,
&upstream_media_rt,
video_due_at,
&mut video_presented_once,
rpc_id, rpc_id,
session_id = camera_lease.session_id, camera_lease.session_id,
client_bundle_session_id = bundle.session_id,
bundle_seq = bundle.seq,
"📦 dropping mixed A/V bundle coherently because one side failed sync/freshness planning"
);
continue;
}
for (event, plan) in planned_events {
let kind = event.kind();
tokio::time::sleep_until(plan.due_at).await;
let actual_late_by = tokio::time::Instant::now()
.checked_duration_since(plan.due_at)
.unwrap_or_default();
if actual_late_by > stale_drop_budget {
tracing::warn!(
rpc_id,
session_id = camera_lease.session_id,
?kind,
late_by_ms = actual_late_by.as_millis(),
pts = plan.local_pts_us,
"📦 bundled upstream packet dropped after waking too late"
);
if mixed_bundle {
warn!(
rpc_id,
session_id = camera_lease.session_id,
client_bundle_session_id = bundle.session_id,
bundle_seq = bundle.seq,
"📦 stopping the rest of this mixed bundle after a late wake to avoid asymmetric playout"
);
break;
}
continue;
}
match event {
BundledUpstreamEvent::Audio(mut packet) => {
packet.pts = plan.local_pts_us;
sink.push(&packet);
upstream_media_rt.mark_audio_presented(packet.pts, plan.due_at);
}
BundledUpstreamEvent::Video(mut packet) => {
packet.pts = plan.local_pts_us;
let presented_pts = packet.pts;
relay.feed(packet);
if !video_presented_once {
info!(
rpc_id,
session_id = camera_lease.session_id,
camera_session_id, camera_session_id,
pts = presented_pts, )
"📦 first bundled video frame fed to camera sink" .await;
);
video_presented_once = true;
}
upstream_media_rt.mark_video_presented(presented_pts, plan.due_at);
}
} }
(Some(audio_due_at), Some(video_due_at)) => {
feed_media_v2_video(
bundle.video.take(),
&mut clock,
&relay,
&upstream_media_rt,
video_due_at,
&mut video_presented_once,
rpc_id,
camera_lease.session_id,
camera_session_id,
)
.await;
push_media_v2_audio(
&mut bundle.audio,
&mut clock,
&mut sink,
&upstream_media_rt,
audio_due_at,
)
.await;
}
(Some(audio_due_at), None) => {
push_media_v2_audio(
&mut bundle.audio,
&mut clock,
&mut sink,
&upstream_media_rt,
audio_due_at,
)
.await;
}
(None, Some(video_due_at)) => {
feed_media_v2_video(
bundle.video.take(),
&mut clock,
&relay,
&upstream_media_rt,
video_due_at,
&mut video_presented_once,
rpc_id,
camera_lease.session_id,
camera_session_id,
)
.await;
}
(None, None) => {}
} }
} }
outcome = if outcome == "aborted" { "closed" } else { outcome };
sink.finish(); sink.finish();
upstream_media_rt.close_camera(camera_lease.generation); upstream_media_rt.close_camera(camera_lease.generation);
upstream_media_rt.close_microphone(microphone_lease.generation); upstream_media_rt.close_microphone(microphone_lease.generation);
@ -568,7 +555,7 @@ impl Relay for Handler {
session_id = camera_lease.session_id, session_id = camera_lease.session_id,
camera_session_id, camera_session_id,
outcome, outcome,
"📦 stream_webcam_media lifecycle ended" "📦 stream_webcam_media v2 lifecycle ended"
); );
tx.send(Ok(Empty {})).await.ok(); tx.send(Ok(Empty {})).await.ok();
Ok::<(), Status>(()) Ok::<(), Status>(())

View File

@ -2,13 +2,12 @@
#[allow(clippy::items_after_test_module)] #[allow(clippy::items_after_test_module)]
mod tests { mod tests {
use super::{ use super::{
BundledUpstreamEvent, UpstreamStreamCleanup, bundled_events_are_mixed, MediaV2BundleFacts, UpstreamStreamCleanup, media_v2_handoff_schedule,
retain_freshest_audio_packet, retain_freshest_video_packet, retain_startup_video_only, retain_freshest_audio_packet, retain_freshest_video_packet, summarize_media_v2_bundle,
summarize_bundled_timing,
}; };
use lesavka_common::lesavka::{AudioPacket, UpstreamMediaBundle, VideoPacket}; use lesavka_common::lesavka::{AudioPacket, UpstreamMediaBundle, VideoPacket};
use lesavka_server::upstream_media_runtime::{ use lesavka_server::upstream_media_runtime::{
UpstreamMediaKind, UpstreamMediaRuntime, UpstreamClientTiming, UpstreamClientTiming, UpstreamMediaKind, UpstreamMediaRuntime,
}; };
use std::sync::Arc; use std::sync::Arc;
@ -99,69 +98,84 @@ mod tests {
} }
#[test] #[test]
fn bundled_event_timing_uses_client_capture_sidecar_not_packet_pts() { fn media_v2_bundle_summary_uses_client_capture_sidecar_not_packet_pts() {
let video = BundledUpstreamEvent::Video(VideoPacket { let bundle = UpstreamMediaBundle {
capture_start_us: 1,
capture_end_us: 2,
video: Some(VideoPacket {
pts: 9_999_000,
client_capture_pts_us: 1_000_000,
client_send_pts_us: 1_001_000,
client_queue_age_ms: 12,
..Default::default()
}),
audio: vec![AudioPacket {
pts: 8_888_000,
client_capture_pts_us: 1_020_000,
client_send_pts_us: 1_021_000,
client_queue_age_ms: 34,
..Default::default()
}],
..Default::default()
};
let summary = summarize_media_v2_bundle(&bundle).expect("summary");
assert!(summary.has_video);
assert!(summary.has_audio);
assert_eq!(summary.capture_span_us, 20_000);
assert_eq!(summary.max_queue_age_ms, 34);
}
#[test]
fn media_v2_schedule_offsets_outputs_without_creating_split_planner() {
let facts = MediaV2BundleFacts {
has_audio: true,
has_video: true,
capture_span_us: 20_000,
max_queue_age_ms: 0,
};
let schedule =
media_v2_handoff_schedule(facts, 0, 979_000).expect("fresh schedule");
let audio_due = schedule.audio_due_at.expect("audio due");
let video_due = schedule.video_due_at.expect("video due");
assert_eq!(schedule.relative_audio_delay.as_millis(), 0);
assert_eq!(schedule.relative_video_delay.as_millis(), 979);
assert_eq!(video_due.duration_since(audio_due).as_millis(), 979);
assert!(schedule.common_delay.as_millis() <= 21);
}
#[test]
fn media_v2_schedule_refuses_bundles_already_past_freshness_budget() {
let facts = MediaV2BundleFacts {
has_audio: true,
has_video: true,
capture_span_us: 20_000,
max_queue_age_ms: 1_000,
};
assert!(media_v2_handoff_schedule(facts, 0, 0).is_none());
}
#[test]
fn legacy_bundled_event_timing_example_documents_quarantined_v1_behavior() {
let video = VideoPacket {
pts: 9_999_000, pts: 9_999_000,
client_capture_pts_us: 1_000_000, client_capture_pts_us: 1_000_000,
client_send_pts_us: 1_001_000, client_send_pts_us: 1_001_000,
..Default::default() ..Default::default()
}); };
let audio = BundledUpstreamEvent::Audio(AudioPacket { let audio = AudioPacket {
pts: 8_888_000, pts: 8_888_000,
client_capture_pts_us: 1_020_000, client_capture_pts_us: 1_020_000,
client_send_pts_us: 1_021_000, client_send_pts_us: 1_021_000,
..Default::default() ..Default::default()
});
assert_eq!(video.remote_pts_us(), 1_000_000);
assert_eq!(audio.remote_pts_us(), 1_020_000);
}
#[test]
fn bundled_timing_summary_flags_bad_bounds_and_wide_mixed_span() {
let events = vec![
BundledUpstreamEvent::Video(VideoPacket {
client_capture_pts_us: 1_000_000,
..Default::default()
}),
BundledUpstreamEvent::Audio(AudioPacket {
client_capture_pts_us: 1_400_000,
..Default::default()
}),
];
let bundle = UpstreamMediaBundle {
capture_start_us: 100,
capture_end_us: 200,
..Default::default()
}; };
let summary = summarize_bundled_timing(&bundle, &events).expect("timing summary"); assert_eq!(video.client_capture_pts_us, 1_000_000);
assert_eq!(audio.client_capture_pts_us, 1_020_000);
assert!(bundled_events_are_mixed(&events));
assert_eq!(summary.capture_span_us, 400_000);
assert!(!summary.capture_bounds_match);
assert!(summary.mixed_span_too_wide);
}
#[test]
fn startup_video_retention_drops_audio_from_bad_mixed_bundle() {
let mut events = vec![
BundledUpstreamEvent::Audio(AudioPacket {
client_capture_pts_us: 1_000_000,
..Default::default()
}),
BundledUpstreamEvent::Video(VideoPacket {
client_capture_pts_us: 1_500_000,
..Default::default()
}),
];
assert!(bundled_events_are_mixed(&events));
assert!(retain_startup_video_only(&mut events));
assert!(!bundled_events_are_mixed(&events));
assert_eq!(events.len(), 1);
assert!(matches!(events[0], BundledUpstreamEvent::Video(_)));
} }
#[test] #[test]

File diff suppressed because it is too large Load Diff