diff --git a/AGENTS.md b/AGENTS.md index b3528d0..72df1c6 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -1,6 +1,6 @@ # Lesavka Agent Notes -## 0.17.38 Bundled Webcam A/V Migration Checklist +## 0.17.39 Bundled Webcam A/V Migration Checklist Context: manual Google Meet and mirrored-probe testing showed the split webcam and microphone uplink design is too fragile under real browser/device pressure. @@ -21,8 +21,10 @@ explicit no-camera path. independently pairing separate camera and microphone streams. - [x] Mic-only sessions keep the existing microphone stream path. - [x] Legacy split webcam/mic uplink is only an explicit compatibility escape hatch. -- [ ] Manual probes and diagnostics clearly label `bundled-webcam-media` versus +- [x] Manual probes and diagnostics clearly label `bundled-webcam-media` versus `mic-only` so we never confuse the architectures during debugging. +- [x] Sync protection takes precedence over freshness and smoothness: bad mixed + bundle timing is dropped coherently instead of letting one side play alone. ### Wire Protocol - [x] Add `UpstreamMediaBundle` containing one optional video frame plus zero or @@ -42,7 +44,7 @@ explicit no-camera path. can add misleading delay. - [x] Preserve live UI device/profile changes by restarting the bundled capture pipeline when selected camera, camera quality, or microphone changes. -- [ ] Make launcher diagnostics expose the active upstream mode as first-class +- [x] Make launcher diagnostics expose the active upstream mode as first-class text rather than inferring from separate camera/mic telemetry. - [ ] Migrate sync-probe runner to the bundled path explicitly and remove any normal probe dependence on split `StreamCamera` + `StreamMicrophone`. @@ -52,7 +54,12 @@ explicit no-camera path. for one upstream session. - [x] Schedule bundled packets by shared client capture timestamp instead of startup-pairing independent streams. +- [x] Sanitize packet timestamps before bundling so stale/future source PTS values + cannot become the server's A/V sync truth. +- [x] Make server bundled scheduling use the client capture sidecar rather than + raw packet `pts`, and reset the bundled epoch on client-session changes. - [x] Keep server freshness drops/reanchors active for bundled media. +- [x] Drop mixed A/V bundles coherently when one side fails freshness/sync planning. - [x] Continue reporting client timing and sink handoff diagnostics from bundled packets. - [ ] Add bundled-mode counters for first bundle, first audio push, first video feed, dropped stale bundles, and bundle queue age. diff --git a/Cargo.lock b/Cargo.lock index 09ae1ac..735a7fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.17.38" +version = "0.17.39" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.17.38" +version = "0.17.39" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.17.38" +version = "0.17.39" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index 0a66ad5..8f920c6 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.17.38" +version = "0.17.39" edition = "2024" [dependencies] diff --git a/client/src/app/session_lifecycle.rs b/client/src/app/session_lifecycle.rs index 1fa963b..6e51512 100644 --- a/client/src/app/session_lifecycle.rs +++ b/client/src/app/session_lifecycle.rs @@ -82,6 +82,15 @@ impl LesavkaClientApp { media_state.camera, media_state.microphone, ); + uplink_telemetry.record_upstream_mode(if bundled_webcam_media { + "bundled-webcam-media" + } else if camera_enabled && std::env::var("LESAVKA_LEGACY_SPLIT_UPLINK").is_ok() { + "legacy-split-webcam" + } else if microphone_available { + "mic-only" + } else { + "disabled" + }); /*────────── persistent gRPC channels ──────────*/ let hid_ep = relay_transport::endpoint(&self.server_addr)? diff --git a/client/src/app/uplink_media.rs b/client/src/app/uplink_media.rs index 9f4fac0..78b1130 100644 --- a/client/src/app/uplink_media.rs +++ b/client/src/app/uplink_media.rs @@ -987,7 +987,8 @@ fn age_between_capture_and_enqueue(capture_pts_us: u64, enqueue_pts_us: u64) -> fn stamp_audio_timing_metadata_at_enqueue(packet: &mut AudioPacket) -> Duration { static AUDIO_SEQUENCE: AtomicU64 = AtomicU64::new(0); let enqueue_pts_us = crate::live_capture_clock::capture_pts_us(); - let capture_pts_us = packet.pts.min(enqueue_pts_us); + let capture_pts_us = sanitized_capture_pts_us(packet.pts, enqueue_pts_us); + packet.pts = capture_pts_us; packet.seq = AUDIO_SEQUENCE.fetch_add(1, Ordering::Relaxed).saturating_add(1); packet.client_capture_pts_us = capture_pts_us; packet.client_send_pts_us = enqueue_pts_us; @@ -998,13 +999,27 @@ fn stamp_audio_timing_metadata_at_enqueue(packet: &mut AudioPacket) -> Duration fn stamp_video_timing_metadata_at_enqueue(packet: &mut VideoPacket) -> Duration { static VIDEO_SEQUENCE: AtomicU64 = AtomicU64::new(0); let enqueue_pts_us = crate::live_capture_clock::capture_pts_us(); - let capture_pts_us = packet.pts.min(enqueue_pts_us); + let capture_pts_us = sanitized_capture_pts_us(packet.pts, enqueue_pts_us); + packet.pts = capture_pts_us; packet.seq = VIDEO_SEQUENCE.fetch_add(1, Ordering::Relaxed).saturating_add(1); packet.client_capture_pts_us = capture_pts_us; packet.client_send_pts_us = enqueue_pts_us; age_between_capture_and_enqueue(capture_pts_us, enqueue_pts_us) } +#[cfg(not(coverage))] +fn sanitized_capture_pts_us(packet_pts_us: u64, enqueue_pts_us: u64) -> u64 { + let mut capture_pts_us = packet_pts_us.min(enqueue_pts_us); + let max_lag_us = crate::live_capture_clock::upstream_source_lag_cap() + .as_micros() + .min(u64::MAX as u128) as u64; + let lag_floor_us = enqueue_pts_us.saturating_sub(max_lag_us); + if capture_pts_us < lag_floor_us { + capture_pts_us = lag_floor_us; + } + capture_pts_us +} + #[cfg(not(coverage))] fn attach_audio_queue_metadata( packet: &mut AudioPacket, @@ -1205,4 +1220,32 @@ mod uplink_timing_tests { "capture-to-enqueue age, not async pop delay, should define the timing window" ); } + + #[test] + fn stale_source_timestamps_are_clamped_before_bundling() { + let enqueue_pts_us = crate::live_capture_clock::capture_pts_us(); + let stale_pts_us = enqueue_pts_us.saturating_sub(30_000_000); + let mut audio = AudioPacket { + pts: stale_pts_us, + ..AudioPacket::default() + }; + let mut video = VideoPacket { + pts: stale_pts_us, + ..VideoPacket::default() + }; + + let audio_age = stamp_audio_timing_metadata_at_enqueue(&mut audio); + let video_age = stamp_video_timing_metadata_at_enqueue(&mut video); + + assert_eq!(audio.pts, audio.client_capture_pts_us); + assert_eq!(video.pts, video.client_capture_pts_us); + assert!( + audio_age <= crate::live_capture_clock::upstream_source_lag_cap(), + "audio capture timestamp should not resurrect stale source timing" + ); + assert!( + video_age <= crate::live_capture_clock::upstream_source_lag_cap(), + "video capture timestamp should not resurrect stale source timing" + ); + } } diff --git a/client/src/launcher/diagnostics/diagnostics_models.rs b/client/src/launcher/diagnostics/diagnostics_models.rs index d75ebf0..78fb572 100644 --- a/client/src/launcher/diagnostics/diagnostics_models.rs +++ b/client/src/launcher/diagnostics/diagnostics_models.rs @@ -52,6 +52,7 @@ pub struct PerformanceSample { pub right_rendered_caps_label: String, pub upstream_camera: UpstreamStreamTelemetry, pub upstream_microphone: UpstreamStreamTelemetry, + pub upstream_mode: String, pub dropped_frames: u64, pub queue_depth: u32, } @@ -160,6 +161,7 @@ pub struct SnapshotReport { pub mic_gain_label: String, pub upstream_camera: UpstreamStreamTelemetry, pub upstream_microphone: UpstreamStreamTelemetry, + pub upstream_mode: String, pub av_delivery_skew_ms: f32, pub av_enqueue_skew_ms: f32, pub av_sync_health: String, diff --git a/client/src/launcher/diagnostics/snapshot_report.rs b/client/src/launcher/diagnostics/snapshot_report.rs index 42c1b53..b05a9a9 100644 --- a/client/src/launcher/diagnostics/snapshot_report.rs +++ b/client/src/launcher/diagnostics/snapshot_report.rs @@ -21,6 +21,10 @@ impl SnapshotReport { let upstream_microphone = latest .map(|sample| sample.upstream_microphone.clone()) .unwrap_or_default(); + let upstream_mode = latest + .map(|sample| sample.upstream_mode.clone()) + .filter(|mode| !mode.trim().is_empty()) + .unwrap_or_else(|| "unknown".to_string()); let av_delivery_skew_ms = (upstream_camera.latest_delivery_age_ms - upstream_microphone.latest_delivery_age_ms) .abs(); @@ -235,6 +239,7 @@ impl SnapshotReport { mic_gain_label: state.mic_gain_label(), upstream_camera, upstream_microphone, + upstream_mode, av_delivery_skew_ms, av_enqueue_skew_ms, av_sync_health, diff --git a/client/src/launcher/diagnostics/snapshot_report_text.rs b/client/src/launcher/diagnostics/snapshot_report_text.rs index b319ea9..0ac1069 100644 --- a/client/src/launcher/diagnostics/snapshot_report_text.rs +++ b/client/src/launcher/diagnostics/snapshot_report_text.rs @@ -119,6 +119,7 @@ impl SnapshotReport { " uplink microphone: {}", uplink_summary(&self.upstream_microphone) ); + let _ = writeln!(text, " uplink mode: {}", self.upstream_mode); let _ = writeln!(text, "av sync guardrails"); let _ = writeln!( text, @@ -269,7 +270,8 @@ impl SnapshotReport { ); let _ = writeln!( text, - " uplink: cam={} mic={}", + " uplink: mode={} cam={} mic={}", + sample.upstream_mode, uplink_summary(&sample.upstream_camera), uplink_summary(&sample.upstream_microphone) ); diff --git a/client/src/launcher/tests/diagnostics.rs b/client/src/launcher/tests/diagnostics.rs index f4c944e..df68c59 100644 --- a/client/src/launcher/tests/diagnostics.rs +++ b/client/src/launcher/tests/diagnostics.rs @@ -87,6 +87,7 @@ fn sample(n: u64) -> PerformanceSample { delivery_age_peak_ms: 37.0, last_error: String::new(), }, + upstream_mode: "bundled-webcam-media".to_string(), dropped_frames: n, queue_depth: n as u32, } @@ -226,6 +227,7 @@ fn snapshot_text_mentions_versions_profiles_and_recommendations() { assert!(text.contains("media staging")); assert!(text.contains("uplink camera:")); assert!(text.contains("uplink microphone:")); + assert!(text.contains("uplink mode:")); assert!(text.contains("current UI state")); assert!(text.contains("recommendations")); } @@ -271,7 +273,7 @@ fn snapshot_text_renders_recent_samples_and_notes() { assert!(text.contains("server: unknown (reachable)")); assert!(text.contains("rtt=23.0ms")); assert!(text.contains("server=lx264enc:42/48/4")); - assert!(text.contains("uplink: cam=live queue=3/7")); + assert!(text.contains("uplink: mode=bundled-webcam-media cam=live queue=3/7")); assert!(text.contains("notes")); assert!(text.contains("operator changed camera quality during the run")); } diff --git a/client/src/launcher/ui/diagnostic_sampling.rs b/client/src/launcher/ui/diagnostic_sampling.rs index e21351e..64775c1 100644 --- a/client/src/launcher/ui/diagnostic_sampling.rs +++ b/client/src/launcher/ui/diagnostic_sampling.rs @@ -153,6 +153,10 @@ fn record_diagnostics_sample( upstream_microphone: uplink .map(|snapshot| snapshot.microphone.clone()) .unwrap_or_default(), + upstream_mode: uplink + .map(|snapshot| snapshot.upstream_mode.clone()) + .filter(|mode| !mode.trim().is_empty()) + .unwrap_or_else(|| "unknown".to_string()), dropped_frames: left_metrics .dropped_frames .saturating_add(right_metrics.dropped_frames), diff --git a/client/src/uplink_telemetry.rs b/client/src/uplink_telemetry.rs index 510336b..0670005 100644 --- a/client/src/uplink_telemetry.rs +++ b/client/src/uplink_telemetry.rs @@ -21,6 +21,9 @@ const FLUSH_INTERVAL: Duration = Duration::from_millis(250); pub struct UplinkTelemetrySnapshot { /// Last time the relay child wrote this snapshot, in Unix milliseconds. pub updated_at_unix_ms: u128, + /// Active upstream architecture for the current relay child. + #[serde(default)] + pub upstream_mode: String, /// Upstream webcam queue telemetry. pub camera: UpstreamStreamTelemetry, /// Upstream microphone queue telemetry. @@ -107,6 +110,7 @@ impl UplinkTelemetryPublisher { path, snapshot: UplinkTelemetrySnapshot { updated_at_unix_ms: unix_time_ms(), + upstream_mode: "starting".to_string(), camera: UpstreamStreamTelemetry { enabled: camera_enabled, ..UpstreamStreamTelemetry::default() @@ -134,6 +138,15 @@ impl UplinkTelemetryPublisher { } } + /// Publishes the selected upstream transport architecture. + pub fn record_upstream_mode(&self, mode: impl AsRef) { + if let Ok(mut state) = self.inner.lock() { + state.snapshot.updated_at_unix_ms = unix_time_ms(); + state.snapshot.upstream_mode = mode.as_ref().trim().to_string(); + write_snapshot(&mut state, true); + } + } + /// Forces an immediate write of the current snapshot. pub fn flush_now(&self) { if let Ok(mut state) = self.inner.lock() { diff --git a/common/Cargo.toml b/common/Cargo.toml index ce21c74..046b2ad 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.17.38" +version = "0.17.39" edition = "2024" build = "build.rs" diff --git a/server/Cargo.toml b/server/Cargo.toml index 673bb64..82c753e 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.17.38" +version = "0.17.39" edition = "2024" autobins = false diff --git a/server/src/main/relay_service.rs b/server/src/main/relay_service.rs index 5e884f4..18cf840 100644 --- a/server/src/main/relay_service.rs +++ b/server/src/main/relay_service.rs @@ -8,9 +8,13 @@ enum BundledUpstreamEvent { #[cfg(not(coverage))] impl BundledUpstreamEvent { fn remote_pts_us(&self) -> u64 { + self.client_timing().capture_pts_us + } + + fn client_timing(&self) -> UpstreamClientTiming { match self { - Self::Audio(packet) => packet.pts, - Self::Video(packet) => packet.pts, + Self::Audio(packet) => audio_client_timing(packet), + Self::Video(packet) => video_client_timing(packet), } } @@ -33,15 +37,11 @@ struct BundledPlayoutClock { impl BundledPlayoutClock { fn ensure( &mut self, - bundle: &UpstreamMediaBundle, + _bundle: &UpstreamMediaBundle, events: &[BundledUpstreamEvent], ) -> Option<(u64, tokio::time::Instant)> { if self.base_remote_pts_us.is_none() || self.epoch.is_none() { - let base = if bundle.capture_start_us != 0 { - bundle.capture_start_us - } else { - events.iter().map(BundledUpstreamEvent::remote_pts_us).min()? - }; + let base = events.iter().map(BundledUpstreamEvent::remote_pts_us).min()?; self.base_remote_pts_us = Some(base); self.epoch = Some(tokio::time::Instant::now() + bundled_upstream_playout_delay()); } @@ -52,6 +52,63 @@ impl BundledPlayoutClock { } } +#[cfg(not(coverage))] +const BUNDLED_CAPTURE_BOUND_TOLERANCE_US: u64 = 50_000; +#[cfg(not(coverage))] +const BUNDLED_MIXED_CAPTURE_SPAN_DROP_US: u64 = 250_000; + +#[cfg(not(coverage))] +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +struct BundledTimingSummary { + has_audio: bool, + has_video: bool, + min_event_pts_us: u64, + max_event_pts_us: u64, + capture_span_us: u64, + capture_bounds_match: bool, + mixed_span_too_wide: bool, +} + +#[cfg(not(coverage))] +impl BundledTimingSummary { + fn mixed(self) -> bool { + self.has_audio && self.has_video + } +} + +#[cfg(not(coverage))] +fn summarize_bundled_timing( + bundle: &UpstreamMediaBundle, + events: &[BundledUpstreamEvent], +) -> Option { + let min_event_pts_us = events.iter().map(BundledUpstreamEvent::remote_pts_us).min()?; + let max_event_pts_us = events.iter().map(BundledUpstreamEvent::remote_pts_us).max()?; + let has_audio = events.iter().any(|event| matches!(event, BundledUpstreamEvent::Audio(_))); + let has_video = events.iter().any(|event| matches!(event, BundledUpstreamEvent::Video(_))); + let start_matches = bundle.capture_start_us == 0 + || abs_delta_us(bundle.capture_start_us, min_event_pts_us) + <= BUNDLED_CAPTURE_BOUND_TOLERANCE_US; + let end_matches = bundle.capture_end_us == 0 + || abs_delta_us(bundle.capture_end_us, max_event_pts_us) <= BUNDLED_CAPTURE_BOUND_TOLERANCE_US; + let capture_span_us = max_event_pts_us.saturating_sub(min_event_pts_us); + Some(BundledTimingSummary { + has_audio, + has_video, + min_event_pts_us, + max_event_pts_us, + capture_span_us, + capture_bounds_match: start_matches && end_matches, + mixed_span_too_wide: has_audio + && has_video + && capture_span_us > BUNDLED_MIXED_CAPTURE_SPAN_DROP_US, + }) +} + +#[cfg(not(coverage))] +fn abs_delta_us(left: u64, right: u64) -> u64 { + left.max(right) - left.min(right) +} + #[cfg(not(coverage))] fn bundled_upstream_playout_delay() -> Duration { std::env::var("LESAVKA_UPSTREAM_BUNDLED_PLAYOUT_DELAY_MS") @@ -233,6 +290,8 @@ impl Relay for Handler { let _microphone_sink_permit = microphone_sink_permit; let mut inbound = req.into_inner(); let mut clock = BundledPlayoutClock::default(); + let mut last_bundle_session_id = None; + let mut last_bundle_seq = None; let mut outcome = "aborted"; 'bundled_loop: loop { let bundle = match inbound.next().await { @@ -274,9 +333,62 @@ impl Relay for Handler { continue; } events.sort_by_key(BundledUpstreamEvent::remote_pts_us); + if last_bundle_session_id.is_some_and(|session_id| session_id != bundle.session_id) { + warn!( + rpc_id, + previous_session_id = last_bundle_session_id.unwrap_or_default(), + next_session_id = bundle.session_id, + "📦 bundled upstream client session changed inside one gRPC stream; resetting playout epoch" + ); + clock = BundledPlayoutClock::default(); + last_bundle_seq = None; + } + last_bundle_session_id = Some(bundle.session_id); + if last_bundle_seq.is_some_and(|seq| bundle.seq <= seq) { + warn!( + rpc_id, + session_id = camera_lease.session_id, + client_bundle_session_id = bundle.session_id, + bundle_seq = bundle.seq, + previous_bundle_seq = last_bundle_seq.unwrap_or_default(), + "📦 bundled upstream packet sequence moved backwards; dropping duplicate/stale bundle" + ); + continue; + } + last_bundle_seq = Some(bundle.seq); + let Some(timing_summary) = summarize_bundled_timing(&bundle, &events) else { + continue; + }; + if !timing_summary.capture_bounds_match { + warn!( + rpc_id, + session_id = camera_lease.session_id, + client_bundle_session_id = bundle.session_id, + bundle_seq = bundle.seq, + capture_start_us = bundle.capture_start_us, + capture_end_us = bundle.capture_end_us, + event_min_us = timing_summary.min_event_pts_us, + event_max_us = timing_summary.max_event_pts_us, + "📦 bundled upstream capture bounds disagreed with packet timing; using packet sidecar timing" + ); + } + if timing_summary.mixed_span_too_wide { + warn!( + rpc_id, + session_id = camera_lease.session_id, + client_bundle_session_id = bundle.session_id, + bundle_seq = bundle.seq, + span_ms = timing_summary.capture_span_us / 1000, + "📦 bundled mixed A/V capture span is too wide; dropping the bundle to protect sync" + ); + continue; + } let Some((base_remote_pts_us, epoch)) = clock.ensure(&bundle, &events) else { continue; }; + let mixed_bundle = timing_summary.mixed(); + let mut planned_events = Vec::with_capacity(events.len()); + let mut drop_mixed_bundle = false; for event in events { let kind = event.kind(); let min_step_us = match kind { @@ -291,7 +403,12 @@ impl Relay for Handler { epoch, ) { lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan, - lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropBeforeOverlap => continue, + lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropBeforeOverlap => { + if mixed_bundle { + drop_mixed_bundle = true; + } + continue; + } lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropStale(reason) => { tracing::warn!( rpc_id, @@ -300,6 +417,9 @@ impl Relay for Handler { reason, "📦 bundled upstream packet dropped by freshness planner" ); + if mixed_bundle { + drop_mixed_bundle = true; + } continue; } lesavka_server::upstream_media_runtime::UpstreamPlanDecision::AwaitingPair => continue, @@ -322,8 +442,25 @@ impl Relay for Handler { pts = plan.local_pts_us, "📦 bundled upstream packet dropped after missing freshness budget" ); + if mixed_bundle { + drop_mixed_bundle = true; + } continue; } + planned_events.push((event, plan)); + } + if drop_mixed_bundle { + warn!( + rpc_id, + session_id = camera_lease.session_id, + client_bundle_session_id = bundle.session_id, + bundle_seq = bundle.seq, + "📦 dropping mixed A/V bundle coherently because one side failed sync/freshness planning" + ); + continue; + } + for (event, plan) in planned_events { + let kind = event.kind(); tokio::time::sleep_until(plan.due_at).await; let actual_late_by = tokio::time::Instant::now() .checked_duration_since(plan.due_at) @@ -337,6 +474,16 @@ impl Relay for Handler { pts = plan.local_pts_us, "📦 bundled upstream packet dropped after waking too late" ); + if mixed_bundle { + warn!( + rpc_id, + session_id = camera_lease.session_id, + client_bundle_session_id = bundle.session_id, + bundle_seq = bundle.seq, + "📦 stopping the rest of this mixed bundle after a late wake to avoid asymmetric playout" + ); + break; + } continue; } match event { diff --git a/server/src/main/relay_service_tests.rs b/server/src/main/relay_service_tests.rs index 2235403..c401dbb 100644 --- a/server/src/main/relay_service_tests.rs +++ b/server/src/main/relay_service_tests.rs @@ -1,8 +1,11 @@ #[cfg(all(test, not(coverage)))] #[allow(clippy::items_after_test_module)] mod tests { - use super::{UpstreamStreamCleanup, retain_freshest_audio_packet, retain_freshest_video_packet}; - use lesavka_common::lesavka::{AudioPacket, VideoPacket}; + use super::{ + BundledUpstreamEvent, UpstreamStreamCleanup, retain_freshest_audio_packet, + retain_freshest_video_packet, summarize_bundled_timing, + }; + use lesavka_common::lesavka::{AudioPacket, UpstreamMediaBundle, VideoPacket}; use lesavka_server::upstream_media_runtime::{ UpstreamMediaKind, UpstreamMediaRuntime, UpstreamClientTiming, }; @@ -94,6 +97,51 @@ mod tests { ); } + #[test] + fn bundled_event_timing_uses_client_capture_sidecar_not_packet_pts() { + let video = BundledUpstreamEvent::Video(VideoPacket { + pts: 9_999_000, + client_capture_pts_us: 1_000_000, + client_send_pts_us: 1_001_000, + ..Default::default() + }); + let audio = BundledUpstreamEvent::Audio(AudioPacket { + pts: 8_888_000, + client_capture_pts_us: 1_020_000, + client_send_pts_us: 1_021_000, + ..Default::default() + }); + + assert_eq!(video.remote_pts_us(), 1_000_000); + assert_eq!(audio.remote_pts_us(), 1_020_000); + } + + #[test] + fn bundled_timing_summary_flags_bad_bounds_and_wide_mixed_span() { + let events = vec![ + BundledUpstreamEvent::Video(VideoPacket { + client_capture_pts_us: 1_000_000, + ..Default::default() + }), + BundledUpstreamEvent::Audio(AudioPacket { + client_capture_pts_us: 1_400_000, + ..Default::default() + }), + ]; + let bundle = UpstreamMediaBundle { + capture_start_us: 100, + capture_end_us: 200, + ..Default::default() + }; + + let summary = summarize_bundled_timing(&bundle, &events).expect("timing summary"); + + assert!(summary.mixed()); + assert_eq!(summary.capture_span_us, 400_000); + assert!(!summary.capture_bounds_match); + assert!(summary.mixed_span_too_wide); + } + #[test] fn upstream_cleanup_guard_closes_its_microphone_generation() { let runtime = Arc::new(UpstreamMediaRuntime::new());