diff --git a/AGENTS.md b/AGENTS.md index fd1191e..d3b5e2f 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -1,6 +1,6 @@ # Lesavka Agent Notes -## 0.18.2 Bundled Webcam A/V Migration Checklist +## 0.18.3 Bundled Webcam A/V Migration Checklist Context: manual Google Meet and mirrored-probe testing showed the split webcam and microphone uplink design is too fragile under real browser/device pressure. @@ -31,6 +31,12 @@ explicit no-camera path. - [x] An already-attached UVC gadget descriptor is the physical browser contract: if it still advertises an older profile, server handshake/capture sizing follows that live descriptor until a controlled gadget rebuild is allowed. +- [x] Bundled webcam sessions enforce a hard one-second freshness ceiling: + server-side reanchors may improve smoothness, but stale/future waits over + the live budget are dropped instead of preserving lag. +- [x] Bundled webcam sessions do not inherit legacy split-path static A/V + calibration offsets by default; the client-owned capture timeline is the + sync source, with only explicit bundled-offset env overrides allowed. ### Wire Protocol - [x] Add `UpstreamMediaBundle` containing one optional video frame plus zero or @@ -46,6 +52,10 @@ explicit no-camera path. - [x] Spawn one bundled capture/uplink task instead of separate camera and mic tasks for webcam sessions. - [x] Bundle camera frames and microphone packets into one freshness-bounded queue. +- [x] Bound the pre-bundle capture handoff channel so camera/mic workers drop + old events under pressure instead of building unbounded latency. +- [x] Drop lag-clamped camera and microphone source buffers before bundling; + stale source data may not be relabeled as fresh media. - [x] Stamp all packets at capture/uplink enqueue before the async gRPC stream can add misleading delay. - [x] Preserve live UI device/profile changes by restarting the bundled capture @@ -66,6 +76,8 @@ explicit no-camera path. raw packet `pts`, and reset the bundled epoch on client-session changes. - [x] Keep server freshness drops/reanchors active for bundled media. - [x] Drop mixed A/V bundles coherently when one side fails freshness/sync planning. +- [x] Reanchor bundled playout when due times drift too far into the future, and + drop packets whose predicted playout age would exceed the one-second budget. - [x] Activate the camera relay before opening the microphone sink so UVC can become ready even if UAC setup is slow. - [x] Log the first bundled video frame handed to the camera sink. diff --git a/Cargo.lock b/Cargo.lock index 33176af..a77bebe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.18.2" +version = "0.18.3" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.18.2" +version = "0.18.3" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.18.2" +version = "0.18.3" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index 4cc85a0..842fa20 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.18.2" +version = "0.18.3" edition = "2024" [dependencies] diff --git a/client/src/app/uplink_media.rs b/client/src/app/uplink_media.rs index 78b1130..04669b3 100644 --- a/client/src/app/uplink_media.rs +++ b/client/src/app/uplink_media.rs @@ -160,7 +160,9 @@ impl LesavkaClientApp { match cli.stream_webcam_media(Request::new(outbound)).await { Ok(mut resp) => { let stop = Arc::new(AtomicBool::new(false)); - let (event_tx, event_rx) = std::sync::mpsc::channel::(); + let (event_tx, event_rx) = std::sync::mpsc::sync_channel::( + BUNDLED_CAPTURE_EVENT_CHANNEL_CAPACITY, + ); let camera_worker = camera.as_ref().map(|camera| { let camera = Arc::clone(camera); let stop = Arc::clone(&stop); @@ -181,13 +183,16 @@ impl LesavkaClientApp { || desired_source != active_camera_source || desired_profile != active_camera_profile { - let _ = event_tx.send(BundledCaptureEvent::Restart); + stop.store(true, Ordering::Relaxed); + let _ = event_tx.try_send(BundledCaptureEvent::Restart); break; } if let Some(mut pkt) = camera.pull() { let _ = stamp_video_timing_metadata_at_enqueue(&mut pkt); - if event_tx.send(BundledCaptureEvent::Video(pkt)).is_err() { - break; + match event_tx.try_send(BundledCaptureEvent::Video(pkt)) { + Ok(()) => {} + Err(std::sync::mpsc::TrySendError::Full(_)) => continue, + Err(std::sync::mpsc::TrySendError::Disconnected(_)) => break, } } } @@ -211,13 +216,16 @@ impl LesavkaClientApp { || !(state.microphone || state.camera) || desired_source != active_microphone_source { - let _ = event_tx.send(BundledCaptureEvent::Restart); + stop.store(true, Ordering::Relaxed); + let _ = event_tx.try_send(BundledCaptureEvent::Restart); break; } if let Some(mut pkt) = microphone.pull() { let _ = stamp_audio_timing_metadata_at_enqueue(&mut pkt); - if event_tx.send(BundledCaptureEvent::Audio(pkt)).is_err() { - break; + match event_tx.try_send(BundledCaptureEvent::Audio(pkt)) { + Ok(()) => {} + Err(std::sync::mpsc::TrySendError::Full(_)) => continue, + Err(std::sync::mpsc::TrySendError::Disconnected(_)) => break, } } } @@ -783,6 +791,9 @@ const BUNDLED_AUDIO_FLUSH_INTERVAL: Duration = Duration::from_millis(20); #[cfg(not(coverage))] const BUNDLED_AUDIO_MAX_PENDING: usize = 8; +#[cfg(not(coverage))] +const BUNDLED_CAPTURE_EVENT_CHANNEL_CAPACITY: usize = 64; + #[cfg(not(coverage))] #[derive(Debug)] enum BundledCaptureEvent { diff --git a/client/src/input/camera/capture_pipeline.rs b/client/src/input/camera/capture_pipeline.rs index 770f798..b2738c8 100644 --- a/client/src/input/camera/capture_pipeline.rs +++ b/client/src/input/camera/capture_pipeline.rs @@ -257,6 +257,10 @@ impl CameraCapture { packet_duration_us, crate::live_capture_clock::upstream_source_lag_cap(), ); + if timing.lag_clamped { + log_camera_stale_source_drop(timing, map.as_slice().len()); + return None; + } let pts = timing.packet_pts_us; static CAMERA_PACKET_COUNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); @@ -334,14 +338,31 @@ fn log_camera_timing_sample( source_pts_us = timing.source_pts_us.unwrap_or_default(), source_base_us = timing.source_base_us.unwrap_or_default(), capture_base_us = timing.capture_base_us.unwrap_or_default(), - capture_now_us = timing.capture_now_us, - packet_pts_us = timing.packet_pts_us, - pull_path_delay_us = timing.capture_now_us as i128 - timing.packet_pts_us as i128, - used_source_pts = timing.used_source_pts, - lag_clamped = timing.lag_clamped, - lead_clamped = timing.lead_clamped, - bytes, - "📸 upstream webcam timing sample" - ); + capture_now_us = timing.capture_now_us, + packet_pts_us = timing.packet_pts_us, + pull_path_delay_us = timing.capture_now_us as i128 - timing.packet_pts_us as i128, + used_source_pts = timing.used_source_pts, + lag_clamped = timing.lag_clamped, + lead_clamped = timing.lead_clamped, + bytes, + "📸 upstream webcam timing sample" + ); + } } + +fn log_camera_stale_source_drop(timing: crate::live_capture_clock::RebasedSourcePts, bytes: usize) { + static CAMERA_STALE_SOURCE_DROPS: std::sync::atomic::AtomicU64 = + std::sync::atomic::AtomicU64::new(0); + let drop_index = + CAMERA_STALE_SOURCE_DROPS.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + if drop_index < 10 || drop_index.is_multiple_of(300) { + tracing::warn!( + drop_index, + bytes, + source_pts_us = timing.source_pts_us.unwrap_or_default(), + capture_now_us = timing.capture_now_us, + packet_pts_us = timing.packet_pts_us, + "📸 dropping stale webcam source buffer before bundled uplink" + ); + } } diff --git a/client/src/input/microphone.rs b/client/src/input/microphone.rs index e58ee3f..3af4d59 100644 --- a/client/src/input/microphone.rs +++ b/client/src/input/microphone.rs @@ -169,6 +169,10 @@ impl MicrophoneCapture { packet_duration_us, crate::live_capture_clock::upstream_source_lag_cap(), ); + if timing.lag_clamped { + log_microphone_stale_source_drop(timing, map.len()); + return None; + } let pts = timing.packet_pts_us; let target_bytes = mic_packet_target_bytes(); let mut packets = split_audio_sample(pts, map.as_slice(), target_bytes); @@ -377,6 +381,32 @@ fn pcm_payload_duration_us(bytes: usize) -> u64 { ((frames as u128 * 1_000_000u128) / MIC_SAMPLE_RATE as u128).min(u64::MAX as u128) as u64 } +#[cfg(not(coverage))] +fn log_microphone_stale_source_drop( + timing: crate::live_capture_clock::RebasedSourcePts, + bytes: usize, +) { + static MIC_STALE_SOURCE_DROPS: AtomicU64 = AtomicU64::new(0); + let drop_index = MIC_STALE_SOURCE_DROPS.fetch_add(1, Ordering::Relaxed); + if drop_index < 10 || drop_index.is_multiple_of(300) { + warn!( + drop_index, + bytes, + source_pts_us = timing.source_pts_us.unwrap_or_default(), + capture_now_us = timing.capture_now_us, + packet_pts_us = timing.packet_pts_us, + "🎤 dropping stale microphone source buffer before bundled uplink" + ); + } +} + +#[cfg(coverage)] +fn log_microphone_stale_source_drop( + _timing: crate::live_capture_clock::RebasedSourcePts, + _bytes: usize, +) { +} + fn split_audio_sample(base_pts_us: u64, data: &[u8], target_bytes: usize) -> VecDeque { let frame_bytes = (MIC_CHANNELS * MIC_SAMPLE_BYTES).max(1); let target_bytes = frame_aligned_packet_bytes(target_bytes.max(frame_bytes)); diff --git a/common/Cargo.toml b/common/Cargo.toml index 6024b64..4a6fd50 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.18.2" +version = "0.18.3" edition = "2024" build = "build.rs" diff --git a/docs/operational-env.md b/docs/operational-env.md index 3f09295..3205025 100644 --- a/docs/operational-env.md +++ b/docs/operational-env.md @@ -247,15 +247,17 @@ from `LESAVKA_CLIENT_PKI_SSH_SOURCE` over SSH. Runtime clients require the insta | `LESAVKA_TOUCHPAD_SCALE` | input routing/clipboard override | | `LESAVKA_UAC_DEV` | server hardware/device override | | `LESAVKA_UAC_SESSION_CLOCK_ALIGN` | server audio sink clock-alignment override; `0` is the host-validated default | -| `LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US` | server upstream playout override; shifts gadget-audio presentation relative to the shared playout epoch | +| `LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US` | legacy/split server upstream playout override; shifts gadget-audio presentation relative to the shared playout epoch | | `LESAVKA_UPSTREAM_AUDIO_MASTER_WAIT_GRACE_MS` | server upstream sync override; how long video may wait past its nominal due time for UAC audio to reach the matching timestamp, defaults to `350` | -| `LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS` | server upstream planner freshness ceiling; planner-approved audio/video should not exceed this live lag budget, defaults to `1000` | +| `LESAVKA_UPSTREAM_BUNDLED_AUDIO_PLAYOUT_OFFSET_US` | bundled webcam server playout override; defaults to `0` because bundled client capture owns A/V sync | +| `LESAVKA_UPSTREAM_BUNDLED_VIDEO_PLAYOUT_OFFSET_US` | bundled webcam server playout override; defaults to `0` because bundled client capture owns A/V sync | +| `LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS` | server upstream planner freshness ceiling; planner-approved audio/video should not exceed this live lag budget, defaults to `1000` and is capped at `1000` | | `LESAVKA_UPSTREAM_PAIR_SLACK_US` | server upstream pairing override; how far video may diverge from the planned audio-master capture moment before the frame is held or dropped, defaults to `80000` | | `LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS` | server upstream pairing/synchronization target buffer; the server uses this shared buffer to pair webcam frames with matching gadget-mic audio before remote presentation, defaults to `350` | | `LESAVKA_UPSTREAM_STARTUP_TIMEOUT_MS` | server upstream startup guard; paired startup must converge before this timeout or fail visibly, defaults to `60000` | | `LESAVKA_UPSTREAM_STALE_DROP_MS` | server upstream freshness override; late audio/video that miss this budget are dropped instead of silently extending lag, defaults to `80` | | `LESAVKA_UPSTREAM_TIMING_TRACE` | upstream capture/rebase trace override for sync debugging | -| `LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US` | server upstream playout override; shifts webcam-video presentation relative to the shared playout epoch, defaults to `1090000` for measured MJPEG/UVC browser-visible sync compensation | +| `LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US` | legacy/split server upstream playout override; shifts webcam-video presentation relative to the shared playout epoch, defaults to `1090000` for measured MJPEG/UVC browser-visible sync compensation | | `LESAVKA_UPLINK_CAMERA_PREVIEW` | client media capture/playback override | | `LESAVKA_UPLINK_MIC_LEVEL` | client media capture/playback override | | `LESAVKA_INSTALL_UVC_CODEC` | installer override; sets the persisted default UVC webcam codec in `/etc/lesavka/server.env` and `/etc/lesavka/uvc.env` | @@ -312,7 +314,7 @@ from `LESAVKA_CLIENT_PKI_SSH_SOURCE` over SSH. Runtime clients require the insta | `LESAVKA_UPSTREAM_CAMERA_STARTUP_GRACE_MS` | upstream A/V timing override | | `LESAVKA_UPSTREAM_CAMERA_STARTUP_GRACE_US` | upstream A/V timing override used by test contracts and server startup grace handling | | `LESAVKA_UPSTREAM_REANCHOR_LATE_MS` | upstream A/V timing override | -| `LESAVKA_UPSTREAM_SOURCE_LAG_CAP_MS` | upstream A/V timing override | +| `LESAVKA_UPSTREAM_SOURCE_LAG_CAP_MS` | client upstream source freshness cap before camera/mic buffers are treated as stale; defaults to `250` | | `LESAVKA_UVC_CONTROL_READ_ONLY` | UVC helper runtime override | | `LESAVKA_UVC_FRAME_PATH` | UVC helper MJPEG frame spool path | | `LESAVKA_UVC_LOCK_PATH` | UVC helper singleton lock path | diff --git a/server/Cargo.toml b/server/Cargo.toml index 351c7d1..232a289 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.18.2" +version = "0.18.3" edition = "2024" autobins = false diff --git a/server/src/upstream_media_runtime.rs b/server/src/upstream_media_runtime.rs index b967bb5..adae95d 100644 --- a/server/src/upstream_media_runtime.rs +++ b/server/src/upstream_media_runtime.rs @@ -12,10 +12,10 @@ mod state; mod types; use config::{ - apply_playout_offset, upstream_audio_master_wait_grace, upstream_camera_startup_grace_us, - upstream_max_live_lag, upstream_pairing_master_slack, upstream_playout_delay, - upstream_playout_offset_us, upstream_reanchor_late_threshold, upstream_require_paired_startup, - upstream_startup_timeout, upstream_timing_trace_enabled, + apply_playout_offset, upstream_audio_master_wait_grace, upstream_bundled_playout_offset_us, + upstream_camera_startup_grace_us, upstream_max_live_lag, upstream_pairing_master_slack, + upstream_playout_delay, upstream_playout_offset_us, upstream_reanchor_late_threshold, + upstream_require_paired_startup, upstream_startup_timeout, upstream_timing_trace_enabled, }; use state::{UpstreamClockState, UpstreamSyncPhase}; pub use types::{ @@ -382,7 +382,7 @@ impl UpstreamMediaRuntime { } *last_slot = Some(local_pts_us); - let sink_offset_us = self.playout_offset_us(kind); + let sink_offset_us = upstream_bundled_playout_offset_us(kind); let epoch = state.playout_epoch.unwrap_or(bundle_epoch); let mut due_at = apply_playout_offset(epoch + Duration::from_micros(local_pts_us), sink_offset_us); @@ -390,8 +390,11 @@ impl UpstreamMediaRuntime { let mut late_by = now.checked_duration_since(due_at).unwrap_or_default(); let playout_delay = upstream_playout_delay().min(max_live_lag); let reanchor_threshold = upstream_reanchor_late_threshold(playout_delay); - if late_by > reanchor_threshold { - let desired_due_at = now + playout_delay; + let max_future_wait = max_live_lag.saturating_sub(source_lag); + let due_future_wait = due_at.saturating_duration_since(now); + if late_by > reanchor_threshold || due_future_wait > max_future_wait { + let desired_delay = playout_delay.min(max_future_wait); + let desired_due_at = now + desired_delay; let unoffset_due_at = apply_playout_offset(desired_due_at, -sink_offset_us); let recovered_epoch = unoffset_due_at .checked_sub(Duration::from_micros(local_pts_us)) @@ -412,10 +415,35 @@ impl UpstreamMediaRuntime { ?kind, local_pts_us, remote_pts_us, - recovery_buffer_ms = playout_delay.as_millis(), + recovery_buffer_ms = desired_delay.as_millis(), + max_live_lag_ms = max_live_lag.as_millis(), + source_lag_ms = source_lag.as_millis(), "bundled upstream media playhead reanchored to preserve freshness" ); } + let predicted_lag_at_playout = + source_lag.saturating_add(due_at.saturating_duration_since(now)); + if predicted_lag_at_playout > max_live_lag { + match kind { + UpstreamMediaKind::Camera => { + state.stale_video_drops = state.stale_video_drops.saturating_add(1); + state.video_freezes = state.video_freezes.saturating_add(1); + state.last_reason = + "dropped bundled video that would exceed max live lag at playout" + .to_string(); + } + UpstreamMediaKind::Microphone => { + state.stale_audio_drops = state.stale_audio_drops.saturating_add(1); + state.last_reason = + "dropped bundled audio that would exceed max live lag at playout" + .to_string(); + } + } + state.phase = UpstreamSyncPhase::Healing; + return UpstreamPlanDecision::DropStale( + "bundled packet would exceed max live lag at playout", + ); + } if kind == UpstreamMediaKind::Microphone { self.audio_progress_notify.notify_waiters(); diff --git a/server/src/upstream_media_runtime/config.rs b/server/src/upstream_media_runtime/config.rs index 80694e5..d2a3f00 100644 --- a/server/src/upstream_media_runtime/config.rs +++ b/server/src/upstream_media_runtime/config.rs @@ -30,7 +30,7 @@ pub(super) fn upstream_max_live_lag() -> Duration { .ok() .and_then(|value| value.trim().parse::().ok()) .unwrap_or(1_000); - Duration::from_millis(lag_ms.max(1)) + Duration::from_millis(lag_ms.clamp(1, 1_000)) } pub(super) fn upstream_startup_timeout() -> Duration { @@ -71,6 +71,17 @@ pub(super) fn upstream_playout_offset_us(kind: UpstreamMediaKind) -> i64 { .unwrap_or(default_offset_us) } +pub(super) fn upstream_bundled_playout_offset_us(kind: UpstreamMediaKind) -> i64 { + let name = match kind { + UpstreamMediaKind::Camera => "LESAVKA_UPSTREAM_BUNDLED_VIDEO_PLAYOUT_OFFSET_US", + UpstreamMediaKind::Microphone => "LESAVKA_UPSTREAM_BUNDLED_AUDIO_PLAYOUT_OFFSET_US", + }; + std::env::var(name) + .ok() + .and_then(|value| value.trim().parse::().ok()) + .unwrap_or(0) +} + pub(super) fn upstream_pairing_master_slack() -> Duration { let slack_us = std::env::var("LESAVKA_UPSTREAM_PAIR_SLACK_US") .ok() diff --git a/server/src/upstream_media_runtime/tests/config.rs b/server/src/upstream_media_runtime/tests/config.rs index 3e9f515..c3db808 100644 --- a/server/src/upstream_media_runtime/tests/config.rs +++ b/server/src/upstream_media_runtime/tests/config.rs @@ -25,6 +25,10 @@ fn upstream_max_live_lag_defaults_to_one_second_and_accepts_overrides() { temp_env::with_var("LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS", Some("750"), || { assert_eq!(super::upstream_max_live_lag(), Duration::from_millis(750)); }); + + temp_env::with_var("LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS", Some("5000"), || { + assert_eq!(super::upstream_max_live_lag(), Duration::from_secs(1)); + }); } #[test] @@ -102,6 +106,44 @@ fn upstream_playout_offsets_default_to_mjpeg_calibration_and_accept_overrides() ); } +#[test] +#[serial(upstream_media_runtime)] +fn bundled_playout_offsets_default_to_zero_and_accept_explicit_overrides() { + temp_env::with_var_unset("LESAVKA_UPSTREAM_BUNDLED_AUDIO_PLAYOUT_OFFSET_US", || { + temp_env::with_var_unset("LESAVKA_UPSTREAM_BUNDLED_VIDEO_PLAYOUT_OFFSET_US", || { + assert_eq!( + super::upstream_bundled_playout_offset_us(UpstreamMediaKind::Microphone), + 0 + ); + assert_eq!( + super::upstream_bundled_playout_offset_us(UpstreamMediaKind::Camera), + 0 + ); + }); + }); + + temp_env::with_var( + "LESAVKA_UPSTREAM_BUNDLED_AUDIO_PLAYOUT_OFFSET_US", + Some("12000"), + || { + temp_env::with_var( + "LESAVKA_UPSTREAM_BUNDLED_VIDEO_PLAYOUT_OFFSET_US", + Some("-3000"), + || { + assert_eq!( + super::upstream_bundled_playout_offset_us(UpstreamMediaKind::Microphone), + 12_000 + ); + assert_eq!( + super::upstream_bundled_playout_offset_us(UpstreamMediaKind::Camera), + -3_000 + ); + }, + ); + }, + ); +} + #[test] #[serial(upstream_media_runtime)] fn upstream_pairing_master_slack_defaults_to_eighty_ms_and_accepts_overrides() { diff --git a/server/src/upstream_media_runtime/tests/planning.rs b/server/src/upstream_media_runtime/tests/planning.rs index 456a493..2415e01 100644 --- a/server/src/upstream_media_runtime/tests/planning.rs +++ b/server/src/upstream_media_runtime/tests/planning.rs @@ -76,6 +76,71 @@ fn bundled_media_uses_client_epoch_without_pairing_wait() { }); } +#[test] +#[serial(upstream_media_runtime)] +fn bundled_media_ignores_legacy_static_calibration_offsets_by_default() { + temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || { + temp_env::with_var_unset("LESAVKA_UPSTREAM_BUNDLED_AUDIO_PLAYOUT_OFFSET_US", || { + temp_env::with_var_unset("LESAVKA_UPSTREAM_BUNDLED_VIDEO_PLAYOUT_OFFSET_US", || { + let runtime = UpstreamMediaRuntime::new(); + runtime.set_playout_offsets(1_090_000, 0); + let _camera = runtime.activate_camera(); + let _microphone = runtime.activate_microphone(); + let epoch = tokio::time::Instant::now(); + + let audio = play(runtime.plan_bundled_pts( + super::UpstreamMediaKind::Microphone, + 1_000_000, + 1, + 1_000_000, + epoch, + )); + let video = play(runtime.plan_bundled_pts( + super::UpstreamMediaKind::Camera, + 1_000_000, + 16_666, + 1_000_000, + epoch, + )); + + assert_eq!(audio.due_at, video.due_at); + assert_eq!(audio.local_pts_us, video.local_pts_us); + }); + }); + }); +} + +#[test] +#[serial(upstream_media_runtime)] +fn bundled_media_reanchors_future_wait_inside_one_second_freshness_budget() { + temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("350"), || { + temp_env::with_var("LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS", Some("1000"), || { + let runtime = runtime_without_offsets(); + let _camera = runtime.activate_camera(); + let _microphone = runtime.activate_microphone(); + let now = tokio::time::Instant::now(); + let stale_epoch = now + Duration::from_secs(20); + + let video = play(runtime.plan_bundled_pts( + super::UpstreamMediaKind::Camera, + 1_000_000, + 16_666, + 1_000_000, + stale_epoch, + )); + + assert!( + video + .due_at + .saturating_duration_since(tokio::time::Instant::now()) + <= Duration::from_secs(1), + "bundled playout must not preserve a many-second future wait" + ); + assert!(runtime.snapshot().freshness_reanchors >= 1); + }); + }); +} + #[test] #[serial(upstream_media_runtime)] fn pairing_window_holds_one_sided_playout_by_default() {