fix: enforce bundled media freshness

This commit is contained in:
Brad Stein 2026-05-03 06:20:27 -03:00
parent 1673bb8b1b
commit 7637f7005a
13 changed files with 258 additions and 36 deletions

View File

@ -1,6 +1,6 @@
# Lesavka Agent Notes # Lesavka Agent Notes
## 0.18.2 Bundled Webcam A/V Migration Checklist ## 0.18.3 Bundled Webcam A/V Migration Checklist
Context: manual Google Meet and mirrored-probe testing showed the split webcam Context: manual Google Meet and mirrored-probe testing showed the split webcam
and microphone uplink design is too fragile under real browser/device pressure. and microphone uplink design is too fragile under real browser/device pressure.
@ -31,6 +31,12 @@ explicit no-camera path.
- [x] An already-attached UVC gadget descriptor is the physical browser contract: - [x] An already-attached UVC gadget descriptor is the physical browser contract:
if it still advertises an older profile, server handshake/capture sizing if it still advertises an older profile, server handshake/capture sizing
follows that live descriptor until a controlled gadget rebuild is allowed. follows that live descriptor until a controlled gadget rebuild is allowed.
- [x] Bundled webcam sessions enforce a hard one-second freshness ceiling:
server-side reanchors may improve smoothness, but stale/future waits over
the live budget are dropped instead of preserving lag.
- [x] Bundled webcam sessions do not inherit legacy split-path static A/V
calibration offsets by default; the client-owned capture timeline is the
sync source, with only explicit bundled-offset env overrides allowed.
### Wire Protocol ### Wire Protocol
- [x] Add `UpstreamMediaBundle` containing one optional video frame plus zero or - [x] Add `UpstreamMediaBundle` containing one optional video frame plus zero or
@ -46,6 +52,10 @@ explicit no-camera path.
- [x] Spawn one bundled capture/uplink task instead of separate camera and mic - [x] Spawn one bundled capture/uplink task instead of separate camera and mic
tasks for webcam sessions. tasks for webcam sessions.
- [x] Bundle camera frames and microphone packets into one freshness-bounded queue. - [x] Bundle camera frames and microphone packets into one freshness-bounded queue.
- [x] Bound the pre-bundle capture handoff channel so camera/mic workers drop
old events under pressure instead of building unbounded latency.
- [x] Drop lag-clamped camera and microphone source buffers before bundling;
stale source data may not be relabeled as fresh media.
- [x] Stamp all packets at capture/uplink enqueue before the async gRPC stream - [x] Stamp all packets at capture/uplink enqueue before the async gRPC stream
can add misleading delay. can add misleading delay.
- [x] Preserve live UI device/profile changes by restarting the bundled capture - [x] Preserve live UI device/profile changes by restarting the bundled capture
@ -66,6 +76,8 @@ explicit no-camera path.
raw packet `pts`, and reset the bundled epoch on client-session changes. raw packet `pts`, and reset the bundled epoch on client-session changes.
- [x] Keep server freshness drops/reanchors active for bundled media. - [x] Keep server freshness drops/reanchors active for bundled media.
- [x] Drop mixed A/V bundles coherently when one side fails freshness/sync planning. - [x] Drop mixed A/V bundles coherently when one side fails freshness/sync planning.
- [x] Reanchor bundled playout when due times drift too far into the future, and
drop packets whose predicted playout age would exceed the one-second budget.
- [x] Activate the camera relay before opening the microphone sink so UVC can - [x] Activate the camera relay before opening the microphone sink so UVC can
become ready even if UAC setup is slow. become ready even if UAC setup is slow.
- [x] Log the first bundled video frame handed to the camera sink. - [x] Log the first bundled video frame handed to the camera sink.

6
Cargo.lock generated
View File

@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]] [[package]]
name = "lesavka_client" name = "lesavka_client"
version = "0.18.2" version = "0.18.3"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-stream", "async-stream",
@ -1686,7 +1686,7 @@ dependencies = [
[[package]] [[package]]
name = "lesavka_common" name = "lesavka_common"
version = "0.18.2" version = "0.18.3"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",
@ -1698,7 +1698,7 @@ dependencies = [
[[package]] [[package]]
name = "lesavka_server" name = "lesavka_server"
version = "0.18.2" version = "0.18.3"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package] [package]
name = "lesavka_client" name = "lesavka_client"
version = "0.18.2" version = "0.18.3"
edition = "2024" edition = "2024"
[dependencies] [dependencies]

View File

@ -160,7 +160,9 @@ impl LesavkaClientApp {
match cli.stream_webcam_media(Request::new(outbound)).await { match cli.stream_webcam_media(Request::new(outbound)).await {
Ok(mut resp) => { Ok(mut resp) => {
let stop = Arc::new(AtomicBool::new(false)); let stop = Arc::new(AtomicBool::new(false));
let (event_tx, event_rx) = std::sync::mpsc::channel::<BundledCaptureEvent>(); let (event_tx, event_rx) = std::sync::mpsc::sync_channel::<BundledCaptureEvent>(
BUNDLED_CAPTURE_EVENT_CHANNEL_CAPACITY,
);
let camera_worker = camera.as_ref().map(|camera| { let camera_worker = camera.as_ref().map(|camera| {
let camera = Arc::clone(camera); let camera = Arc::clone(camera);
let stop = Arc::clone(&stop); let stop = Arc::clone(&stop);
@ -181,13 +183,16 @@ impl LesavkaClientApp {
|| desired_source != active_camera_source || desired_source != active_camera_source
|| desired_profile != active_camera_profile || desired_profile != active_camera_profile
{ {
let _ = event_tx.send(BundledCaptureEvent::Restart); stop.store(true, Ordering::Relaxed);
let _ = event_tx.try_send(BundledCaptureEvent::Restart);
break; break;
} }
if let Some(mut pkt) = camera.pull() { if let Some(mut pkt) = camera.pull() {
let _ = stamp_video_timing_metadata_at_enqueue(&mut pkt); let _ = stamp_video_timing_metadata_at_enqueue(&mut pkt);
if event_tx.send(BundledCaptureEvent::Video(pkt)).is_err() { match event_tx.try_send(BundledCaptureEvent::Video(pkt)) {
break; Ok(()) => {}
Err(std::sync::mpsc::TrySendError::Full(_)) => continue,
Err(std::sync::mpsc::TrySendError::Disconnected(_)) => break,
} }
} }
} }
@ -211,13 +216,16 @@ impl LesavkaClientApp {
|| !(state.microphone || state.camera) || !(state.microphone || state.camera)
|| desired_source != active_microphone_source || desired_source != active_microphone_source
{ {
let _ = event_tx.send(BundledCaptureEvent::Restart); stop.store(true, Ordering::Relaxed);
let _ = event_tx.try_send(BundledCaptureEvent::Restart);
break; break;
} }
if let Some(mut pkt) = microphone.pull() { if let Some(mut pkt) = microphone.pull() {
let _ = stamp_audio_timing_metadata_at_enqueue(&mut pkt); let _ = stamp_audio_timing_metadata_at_enqueue(&mut pkt);
if event_tx.send(BundledCaptureEvent::Audio(pkt)).is_err() { match event_tx.try_send(BundledCaptureEvent::Audio(pkt)) {
break; Ok(()) => {}
Err(std::sync::mpsc::TrySendError::Full(_)) => continue,
Err(std::sync::mpsc::TrySendError::Disconnected(_)) => break,
} }
} }
} }
@ -783,6 +791,9 @@ const BUNDLED_AUDIO_FLUSH_INTERVAL: Duration = Duration::from_millis(20);
#[cfg(not(coverage))] #[cfg(not(coverage))]
const BUNDLED_AUDIO_MAX_PENDING: usize = 8; const BUNDLED_AUDIO_MAX_PENDING: usize = 8;
#[cfg(not(coverage))]
const BUNDLED_CAPTURE_EVENT_CHANNEL_CAPACITY: usize = 64;
#[cfg(not(coverage))] #[cfg(not(coverage))]
#[derive(Debug)] #[derive(Debug)]
enum BundledCaptureEvent { enum BundledCaptureEvent {

View File

@ -257,6 +257,10 @@ impl CameraCapture {
packet_duration_us, packet_duration_us,
crate::live_capture_clock::upstream_source_lag_cap(), crate::live_capture_clock::upstream_source_lag_cap(),
); );
if timing.lag_clamped {
log_camera_stale_source_drop(timing, map.as_slice().len());
return None;
}
let pts = timing.packet_pts_us; let pts = timing.packet_pts_us;
static CAMERA_PACKET_COUNT: std::sync::atomic::AtomicU64 = static CAMERA_PACKET_COUNT: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0); std::sync::atomic::AtomicU64::new(0);
@ -334,14 +338,31 @@ fn log_camera_timing_sample(
source_pts_us = timing.source_pts_us.unwrap_or_default(), source_pts_us = timing.source_pts_us.unwrap_or_default(),
source_base_us = timing.source_base_us.unwrap_or_default(), source_base_us = timing.source_base_us.unwrap_or_default(),
capture_base_us = timing.capture_base_us.unwrap_or_default(), capture_base_us = timing.capture_base_us.unwrap_or_default(),
capture_now_us = timing.capture_now_us, capture_now_us = timing.capture_now_us,
packet_pts_us = timing.packet_pts_us, packet_pts_us = timing.packet_pts_us,
pull_path_delay_us = timing.capture_now_us as i128 - timing.packet_pts_us as i128, pull_path_delay_us = timing.capture_now_us as i128 - timing.packet_pts_us as i128,
used_source_pts = timing.used_source_pts, used_source_pts = timing.used_source_pts,
lag_clamped = timing.lag_clamped, lag_clamped = timing.lag_clamped,
lead_clamped = timing.lead_clamped, lead_clamped = timing.lead_clamped,
bytes, bytes,
"📸 upstream webcam timing sample" "📸 upstream webcam timing sample"
); );
}
} }
fn log_camera_stale_source_drop(timing: crate::live_capture_clock::RebasedSourcePts, bytes: usize) {
static CAMERA_STALE_SOURCE_DROPS: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
let drop_index =
CAMERA_STALE_SOURCE_DROPS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if drop_index < 10 || drop_index.is_multiple_of(300) {
tracing::warn!(
drop_index,
bytes,
source_pts_us = timing.source_pts_us.unwrap_or_default(),
capture_now_us = timing.capture_now_us,
packet_pts_us = timing.packet_pts_us,
"📸 dropping stale webcam source buffer before bundled uplink"
);
}
} }

View File

@ -169,6 +169,10 @@ impl MicrophoneCapture {
packet_duration_us, packet_duration_us,
crate::live_capture_clock::upstream_source_lag_cap(), crate::live_capture_clock::upstream_source_lag_cap(),
); );
if timing.lag_clamped {
log_microphone_stale_source_drop(timing, map.len());
return None;
}
let pts = timing.packet_pts_us; let pts = timing.packet_pts_us;
let target_bytes = mic_packet_target_bytes(); let target_bytes = mic_packet_target_bytes();
let mut packets = split_audio_sample(pts, map.as_slice(), target_bytes); let mut packets = split_audio_sample(pts, map.as_slice(), target_bytes);
@ -377,6 +381,32 @@ fn pcm_payload_duration_us(bytes: usize) -> u64 {
((frames as u128 * 1_000_000u128) / MIC_SAMPLE_RATE as u128).min(u64::MAX as u128) as u64 ((frames as u128 * 1_000_000u128) / MIC_SAMPLE_RATE as u128).min(u64::MAX as u128) as u64
} }
#[cfg(not(coverage))]
fn log_microphone_stale_source_drop(
timing: crate::live_capture_clock::RebasedSourcePts,
bytes: usize,
) {
static MIC_STALE_SOURCE_DROPS: AtomicU64 = AtomicU64::new(0);
let drop_index = MIC_STALE_SOURCE_DROPS.fetch_add(1, Ordering::Relaxed);
if drop_index < 10 || drop_index.is_multiple_of(300) {
warn!(
drop_index,
bytes,
source_pts_us = timing.source_pts_us.unwrap_or_default(),
capture_now_us = timing.capture_now_us,
packet_pts_us = timing.packet_pts_us,
"🎤 dropping stale microphone source buffer before bundled uplink"
);
}
}
#[cfg(coverage)]
fn log_microphone_stale_source_drop(
_timing: crate::live_capture_clock::RebasedSourcePts,
_bytes: usize,
) {
}
fn split_audio_sample(base_pts_us: u64, data: &[u8], target_bytes: usize) -> VecDeque<AudioPacket> { fn split_audio_sample(base_pts_us: u64, data: &[u8], target_bytes: usize) -> VecDeque<AudioPacket> {
let frame_bytes = (MIC_CHANNELS * MIC_SAMPLE_BYTES).max(1); let frame_bytes = (MIC_CHANNELS * MIC_SAMPLE_BYTES).max(1);
let target_bytes = frame_aligned_packet_bytes(target_bytes.max(frame_bytes)); let target_bytes = frame_aligned_packet_bytes(target_bytes.max(frame_bytes));

View File

@ -1,6 +1,6 @@
[package] [package]
name = "lesavka_common" name = "lesavka_common"
version = "0.18.2" version = "0.18.3"
edition = "2024" edition = "2024"
build = "build.rs" build = "build.rs"

View File

@ -247,15 +247,17 @@ from `LESAVKA_CLIENT_PKI_SSH_SOURCE` over SSH. Runtime clients require the insta
| `LESAVKA_TOUCHPAD_SCALE` | input routing/clipboard override | | `LESAVKA_TOUCHPAD_SCALE` | input routing/clipboard override |
| `LESAVKA_UAC_DEV` | server hardware/device override | | `LESAVKA_UAC_DEV` | server hardware/device override |
| `LESAVKA_UAC_SESSION_CLOCK_ALIGN` | server audio sink clock-alignment override; `0` is the host-validated default | | `LESAVKA_UAC_SESSION_CLOCK_ALIGN` | server audio sink clock-alignment override; `0` is the host-validated default |
| `LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US` | server upstream playout override; shifts gadget-audio presentation relative to the shared playout epoch | | `LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US` | legacy/split server upstream playout override; shifts gadget-audio presentation relative to the shared playout epoch |
| `LESAVKA_UPSTREAM_AUDIO_MASTER_WAIT_GRACE_MS` | server upstream sync override; how long video may wait past its nominal due time for UAC audio to reach the matching timestamp, defaults to `350` | | `LESAVKA_UPSTREAM_AUDIO_MASTER_WAIT_GRACE_MS` | server upstream sync override; how long video may wait past its nominal due time for UAC audio to reach the matching timestamp, defaults to `350` |
| `LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS` | server upstream planner freshness ceiling; planner-approved audio/video should not exceed this live lag budget, defaults to `1000` | | `LESAVKA_UPSTREAM_BUNDLED_AUDIO_PLAYOUT_OFFSET_US` | bundled webcam server playout override; defaults to `0` because bundled client capture owns A/V sync |
| `LESAVKA_UPSTREAM_BUNDLED_VIDEO_PLAYOUT_OFFSET_US` | bundled webcam server playout override; defaults to `0` because bundled client capture owns A/V sync |
| `LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS` | server upstream planner freshness ceiling; planner-approved audio/video should not exceed this live lag budget, defaults to `1000` and is capped at `1000` |
| `LESAVKA_UPSTREAM_PAIR_SLACK_US` | server upstream pairing override; how far video may diverge from the planned audio-master capture moment before the frame is held or dropped, defaults to `80000` | | `LESAVKA_UPSTREAM_PAIR_SLACK_US` | server upstream pairing override; how far video may diverge from the planned audio-master capture moment before the frame is held or dropped, defaults to `80000` |
| `LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS` | server upstream pairing/synchronization target buffer; the server uses this shared buffer to pair webcam frames with matching gadget-mic audio before remote presentation, defaults to `350` | | `LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS` | server upstream pairing/synchronization target buffer; the server uses this shared buffer to pair webcam frames with matching gadget-mic audio before remote presentation, defaults to `350` |
| `LESAVKA_UPSTREAM_STARTUP_TIMEOUT_MS` | server upstream startup guard; paired startup must converge before this timeout or fail visibly, defaults to `60000` | | `LESAVKA_UPSTREAM_STARTUP_TIMEOUT_MS` | server upstream startup guard; paired startup must converge before this timeout or fail visibly, defaults to `60000` |
| `LESAVKA_UPSTREAM_STALE_DROP_MS` | server upstream freshness override; late audio/video that miss this budget are dropped instead of silently extending lag, defaults to `80` | | `LESAVKA_UPSTREAM_STALE_DROP_MS` | server upstream freshness override; late audio/video that miss this budget are dropped instead of silently extending lag, defaults to `80` |
| `LESAVKA_UPSTREAM_TIMING_TRACE` | upstream capture/rebase trace override for sync debugging | | `LESAVKA_UPSTREAM_TIMING_TRACE` | upstream capture/rebase trace override for sync debugging |
| `LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US` | server upstream playout override; shifts webcam-video presentation relative to the shared playout epoch, defaults to `1090000` for measured MJPEG/UVC browser-visible sync compensation | | `LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US` | legacy/split server upstream playout override; shifts webcam-video presentation relative to the shared playout epoch, defaults to `1090000` for measured MJPEG/UVC browser-visible sync compensation |
| `LESAVKA_UPLINK_CAMERA_PREVIEW` | client media capture/playback override | | `LESAVKA_UPLINK_CAMERA_PREVIEW` | client media capture/playback override |
| `LESAVKA_UPLINK_MIC_LEVEL` | client media capture/playback override | | `LESAVKA_UPLINK_MIC_LEVEL` | client media capture/playback override |
| `LESAVKA_INSTALL_UVC_CODEC` | installer override; sets the persisted default UVC webcam codec in `/etc/lesavka/server.env` and `/etc/lesavka/uvc.env` | | `LESAVKA_INSTALL_UVC_CODEC` | installer override; sets the persisted default UVC webcam codec in `/etc/lesavka/server.env` and `/etc/lesavka/uvc.env` |
@ -312,7 +314,7 @@ from `LESAVKA_CLIENT_PKI_SSH_SOURCE` over SSH. Runtime clients require the insta
| `LESAVKA_UPSTREAM_CAMERA_STARTUP_GRACE_MS` | upstream A/V timing override | | `LESAVKA_UPSTREAM_CAMERA_STARTUP_GRACE_MS` | upstream A/V timing override |
| `LESAVKA_UPSTREAM_CAMERA_STARTUP_GRACE_US` | upstream A/V timing override used by test contracts and server startup grace handling | | `LESAVKA_UPSTREAM_CAMERA_STARTUP_GRACE_US` | upstream A/V timing override used by test contracts and server startup grace handling |
| `LESAVKA_UPSTREAM_REANCHOR_LATE_MS` | upstream A/V timing override | | `LESAVKA_UPSTREAM_REANCHOR_LATE_MS` | upstream A/V timing override |
| `LESAVKA_UPSTREAM_SOURCE_LAG_CAP_MS` | upstream A/V timing override | | `LESAVKA_UPSTREAM_SOURCE_LAG_CAP_MS` | client upstream source freshness cap before camera/mic buffers are treated as stale; defaults to `250` |
| `LESAVKA_UVC_CONTROL_READ_ONLY` | UVC helper runtime override | | `LESAVKA_UVC_CONTROL_READ_ONLY` | UVC helper runtime override |
| `LESAVKA_UVC_FRAME_PATH` | UVC helper MJPEG frame spool path | | `LESAVKA_UVC_FRAME_PATH` | UVC helper MJPEG frame spool path |
| `LESAVKA_UVC_LOCK_PATH` | UVC helper singleton lock path | | `LESAVKA_UVC_LOCK_PATH` | UVC helper singleton lock path |

View File

@ -10,7 +10,7 @@ bench = false
[package] [package]
name = "lesavka_server" name = "lesavka_server"
version = "0.18.2" version = "0.18.3"
edition = "2024" edition = "2024"
autobins = false autobins = false

View File

@ -12,10 +12,10 @@ mod state;
mod types; mod types;
use config::{ use config::{
apply_playout_offset, upstream_audio_master_wait_grace, upstream_camera_startup_grace_us, apply_playout_offset, upstream_audio_master_wait_grace, upstream_bundled_playout_offset_us,
upstream_max_live_lag, upstream_pairing_master_slack, upstream_playout_delay, upstream_camera_startup_grace_us, upstream_max_live_lag, upstream_pairing_master_slack,
upstream_playout_offset_us, upstream_reanchor_late_threshold, upstream_require_paired_startup, upstream_playout_delay, upstream_playout_offset_us, upstream_reanchor_late_threshold,
upstream_startup_timeout, upstream_timing_trace_enabled, upstream_require_paired_startup, upstream_startup_timeout, upstream_timing_trace_enabled,
}; };
use state::{UpstreamClockState, UpstreamSyncPhase}; use state::{UpstreamClockState, UpstreamSyncPhase};
pub use types::{ pub use types::{
@ -382,7 +382,7 @@ impl UpstreamMediaRuntime {
} }
*last_slot = Some(local_pts_us); *last_slot = Some(local_pts_us);
let sink_offset_us = self.playout_offset_us(kind); let sink_offset_us = upstream_bundled_playout_offset_us(kind);
let epoch = state.playout_epoch.unwrap_or(bundle_epoch); let epoch = state.playout_epoch.unwrap_or(bundle_epoch);
let mut due_at = let mut due_at =
apply_playout_offset(epoch + Duration::from_micros(local_pts_us), sink_offset_us); apply_playout_offset(epoch + Duration::from_micros(local_pts_us), sink_offset_us);
@ -390,8 +390,11 @@ impl UpstreamMediaRuntime {
let mut late_by = now.checked_duration_since(due_at).unwrap_or_default(); let mut late_by = now.checked_duration_since(due_at).unwrap_or_default();
let playout_delay = upstream_playout_delay().min(max_live_lag); let playout_delay = upstream_playout_delay().min(max_live_lag);
let reanchor_threshold = upstream_reanchor_late_threshold(playout_delay); let reanchor_threshold = upstream_reanchor_late_threshold(playout_delay);
if late_by > reanchor_threshold { let max_future_wait = max_live_lag.saturating_sub(source_lag);
let desired_due_at = now + playout_delay; let due_future_wait = due_at.saturating_duration_since(now);
if late_by > reanchor_threshold || due_future_wait > max_future_wait {
let desired_delay = playout_delay.min(max_future_wait);
let desired_due_at = now + desired_delay;
let unoffset_due_at = apply_playout_offset(desired_due_at, -sink_offset_us); let unoffset_due_at = apply_playout_offset(desired_due_at, -sink_offset_us);
let recovered_epoch = unoffset_due_at let recovered_epoch = unoffset_due_at
.checked_sub(Duration::from_micros(local_pts_us)) .checked_sub(Duration::from_micros(local_pts_us))
@ -412,10 +415,35 @@ impl UpstreamMediaRuntime {
?kind, ?kind,
local_pts_us, local_pts_us,
remote_pts_us, remote_pts_us,
recovery_buffer_ms = playout_delay.as_millis(), recovery_buffer_ms = desired_delay.as_millis(),
max_live_lag_ms = max_live_lag.as_millis(),
source_lag_ms = source_lag.as_millis(),
"bundled upstream media playhead reanchored to preserve freshness" "bundled upstream media playhead reanchored to preserve freshness"
); );
} }
let predicted_lag_at_playout =
source_lag.saturating_add(due_at.saturating_duration_since(now));
if predicted_lag_at_playout > max_live_lag {
match kind {
UpstreamMediaKind::Camera => {
state.stale_video_drops = state.stale_video_drops.saturating_add(1);
state.video_freezes = state.video_freezes.saturating_add(1);
state.last_reason =
"dropped bundled video that would exceed max live lag at playout"
.to_string();
}
UpstreamMediaKind::Microphone => {
state.stale_audio_drops = state.stale_audio_drops.saturating_add(1);
state.last_reason =
"dropped bundled audio that would exceed max live lag at playout"
.to_string();
}
}
state.phase = UpstreamSyncPhase::Healing;
return UpstreamPlanDecision::DropStale(
"bundled packet would exceed max live lag at playout",
);
}
if kind == UpstreamMediaKind::Microphone { if kind == UpstreamMediaKind::Microphone {
self.audio_progress_notify.notify_waiters(); self.audio_progress_notify.notify_waiters();

View File

@ -30,7 +30,7 @@ pub(super) fn upstream_max_live_lag() -> Duration {
.ok() .ok()
.and_then(|value| value.trim().parse::<u64>().ok()) .and_then(|value| value.trim().parse::<u64>().ok())
.unwrap_or(1_000); .unwrap_or(1_000);
Duration::from_millis(lag_ms.max(1)) Duration::from_millis(lag_ms.clamp(1, 1_000))
} }
pub(super) fn upstream_startup_timeout() -> Duration { pub(super) fn upstream_startup_timeout() -> Duration {
@ -71,6 +71,17 @@ pub(super) fn upstream_playout_offset_us(kind: UpstreamMediaKind) -> i64 {
.unwrap_or(default_offset_us) .unwrap_or(default_offset_us)
} }
pub(super) fn upstream_bundled_playout_offset_us(kind: UpstreamMediaKind) -> i64 {
let name = match kind {
UpstreamMediaKind::Camera => "LESAVKA_UPSTREAM_BUNDLED_VIDEO_PLAYOUT_OFFSET_US",
UpstreamMediaKind::Microphone => "LESAVKA_UPSTREAM_BUNDLED_AUDIO_PLAYOUT_OFFSET_US",
};
std::env::var(name)
.ok()
.and_then(|value| value.trim().parse::<i64>().ok())
.unwrap_or(0)
}
pub(super) fn upstream_pairing_master_slack() -> Duration { pub(super) fn upstream_pairing_master_slack() -> Duration {
let slack_us = std::env::var("LESAVKA_UPSTREAM_PAIR_SLACK_US") let slack_us = std::env::var("LESAVKA_UPSTREAM_PAIR_SLACK_US")
.ok() .ok()

View File

@ -25,6 +25,10 @@ fn upstream_max_live_lag_defaults_to_one_second_and_accepts_overrides() {
temp_env::with_var("LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS", Some("750"), || { temp_env::with_var("LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS", Some("750"), || {
assert_eq!(super::upstream_max_live_lag(), Duration::from_millis(750)); assert_eq!(super::upstream_max_live_lag(), Duration::from_millis(750));
}); });
temp_env::with_var("LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS", Some("5000"), || {
assert_eq!(super::upstream_max_live_lag(), Duration::from_secs(1));
});
} }
#[test] #[test]
@ -102,6 +106,44 @@ fn upstream_playout_offsets_default_to_mjpeg_calibration_and_accept_overrides()
); );
} }
#[test]
#[serial(upstream_media_runtime)]
fn bundled_playout_offsets_default_to_zero_and_accept_explicit_overrides() {
temp_env::with_var_unset("LESAVKA_UPSTREAM_BUNDLED_AUDIO_PLAYOUT_OFFSET_US", || {
temp_env::with_var_unset("LESAVKA_UPSTREAM_BUNDLED_VIDEO_PLAYOUT_OFFSET_US", || {
assert_eq!(
super::upstream_bundled_playout_offset_us(UpstreamMediaKind::Microphone),
0
);
assert_eq!(
super::upstream_bundled_playout_offset_us(UpstreamMediaKind::Camera),
0
);
});
});
temp_env::with_var(
"LESAVKA_UPSTREAM_BUNDLED_AUDIO_PLAYOUT_OFFSET_US",
Some("12000"),
|| {
temp_env::with_var(
"LESAVKA_UPSTREAM_BUNDLED_VIDEO_PLAYOUT_OFFSET_US",
Some("-3000"),
|| {
assert_eq!(
super::upstream_bundled_playout_offset_us(UpstreamMediaKind::Microphone),
12_000
);
assert_eq!(
super::upstream_bundled_playout_offset_us(UpstreamMediaKind::Camera),
-3_000
);
},
);
},
);
}
#[test] #[test]
#[serial(upstream_media_runtime)] #[serial(upstream_media_runtime)]
fn upstream_pairing_master_slack_defaults_to_eighty_ms_and_accepts_overrides() { fn upstream_pairing_master_slack_defaults_to_eighty_ms_and_accepts_overrides() {

View File

@ -76,6 +76,71 @@ fn bundled_media_uses_client_epoch_without_pairing_wait() {
}); });
} }
#[test]
#[serial(upstream_media_runtime)]
fn bundled_media_ignores_legacy_static_calibration_offsets_by_default() {
temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || {
temp_env::with_var_unset("LESAVKA_UPSTREAM_BUNDLED_AUDIO_PLAYOUT_OFFSET_US", || {
temp_env::with_var_unset("LESAVKA_UPSTREAM_BUNDLED_VIDEO_PLAYOUT_OFFSET_US", || {
let runtime = UpstreamMediaRuntime::new();
runtime.set_playout_offsets(1_090_000, 0);
let _camera = runtime.activate_camera();
let _microphone = runtime.activate_microphone();
let epoch = tokio::time::Instant::now();
let audio = play(runtime.plan_bundled_pts(
super::UpstreamMediaKind::Microphone,
1_000_000,
1,
1_000_000,
epoch,
));
let video = play(runtime.plan_bundled_pts(
super::UpstreamMediaKind::Camera,
1_000_000,
16_666,
1_000_000,
epoch,
));
assert_eq!(audio.due_at, video.due_at);
assert_eq!(audio.local_pts_us, video.local_pts_us);
});
});
});
}
#[test]
#[serial(upstream_media_runtime)]
fn bundled_media_reanchors_future_wait_inside_one_second_freshness_budget() {
temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("350"), || {
temp_env::with_var("LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS", Some("1000"), || {
let runtime = runtime_without_offsets();
let _camera = runtime.activate_camera();
let _microphone = runtime.activate_microphone();
let now = tokio::time::Instant::now();
let stale_epoch = now + Duration::from_secs(20);
let video = play(runtime.plan_bundled_pts(
super::UpstreamMediaKind::Camera,
1_000_000,
16_666,
1_000_000,
stale_epoch,
));
assert!(
video
.due_at
.saturating_duration_since(tokio::time::Instant::now())
<= Duration::from_secs(1),
"bundled playout must not preserve a many-second future wait"
);
assert!(runtime.snapshot().freshness_reanchors >= 1);
});
});
}
#[test] #[test]
#[serial(upstream_media_runtime)] #[serial(upstream_media_runtime)]
fn pairing_window_holds_one_sided_playout_by_default() { fn pairing_window_holds_one_sided_playout_by_default() {