diff --git a/AGENTS.md b/AGENTS.md index 9de8374..7c1cb4c 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -94,3 +94,58 @@ Context: Google Meet testing on 2026-04-30 showed audio roughly 8 seconds behind - [x] `LESAVKA_REQUIRE_SYNC_PROBE=1 ./scripts/ci/media_reliability_gate.sh` - Used a synthetic passing report at `target/media-reliability-gate/sync-probe/report.json` to verify gate parsing/enforcement. - This validates CI glue only; a real Theia/Tethys probe is still required for product judgment. + +## Real Upstream Lip-Sync Fix Checklist + +Context: the mirrored browser probe finally reproduced the real failure class on 2026-05-01: +`activity_start_delta_ms=+9591.1`. This means the end-to-end browser-visible path can still start video far ahead of audio. The fix target is not silence in the logs; it is a freshness-first A/V uplink whose startup can heal briefly but cannot drift into seconds of skew. + +### Acceptance Criteria +- [ ] Mirrored browser probe passes with `activity_start_delta_ms <= 1000`. +- [ ] Steady-state preferred sync: median skew within `35 ms`. +- [ ] Steady-state acceptable sync: p95 absolute skew within `80 ms`. +- [ ] Any sustained or startup A/V split near `1000 ms` remains a hard failure. +- [ ] No stale audio backlog is ever drained into UAC to catch up. +- [ ] No stale video backlog is ever drained into UVC to catch up. +- [ ] Google Meet manual testing agrees with the mirrored probe instead of revealing hidden seconds-scale skew. + +### Phase 0: Keep The Probe Honest +- [x] Split raw activity-start fields from filtered/coded paired-pulse fields in probe reports. +- [x] Print explicit raw first-video and first-audio timestamps in `report.txt`. +- [ ] Keep the mirrored browser probe as the release/blocking upstream A/V gate. +- [ ] Keep the old raw-device probe as a lower-level diagnostic only. + +### Phase 1: Stop One-Sided Startup Drift +- [x] Default upstream planning must require both camera and microphone before live playout. +- [x] One-sided playout may only happen through an explicit compatibility override. +- [x] While pairing is overdue, keep replacing the waiting-side anchor with fresh packets instead of preserving stale startup anchors. +- [x] While awaiting the peer stream, keep only fresh pending camera packets. +- [x] While awaiting the peer stream, keep only fresh pending microphone packets. +- [x] Add tests proving the pairing window no longer expires into one-sided playout by default. +- [x] Add tests proving the explicit one-sided override still works for intentional single-stream scenarios. + +### Phase 2: Bound UAC Freshness +- [x] Configure UAC `appsrc` as non-blocking and bounded. +- [x] Log and drop UAC appsrc push failures instead of treating enqueue as guaranteed playback. +- [ ] Flush/stop UAC cleanly on session close, replacement, and recovery. +- [x] Add tests or contract coverage for bounded UAC settings where practical. + +### Phase 3: Add Real Timing Evidence +- [ ] Add server timing counters for first camera packet, first mic packet, first UVC write, and first UAC push per session. +- [ ] Add dropped-stale audio/video counters to diagnostics. +- [ ] Add a concise health explanation when startup pairing exceeds the healing window. +- [ ] Surface `Starting`, `Healing`, `Flowing`, `Lagging`, `Dropping`, and `Stale` states in chips/diagnostics from real path evidence. + +### Phase 4: Recovery And Mid-Session Changes +- [ ] Make device changes trigger soft-pause, stream replacement, queue flush, and re-pairing. +- [ ] Keep recovery soft-first; reserve hard UVC/UAC gadget rebuilds for explicit guarded recoveries. +- [ ] Add cooldown/state guards so recovery buttons cannot wedge Theia. +- [ ] Ensure disconnect closes all client/server media tasks for the session. + +### Phase 5: Verification Loop +- [x] Run focused upstream runtime tests. +- [x] Run server/client media contract tests. +- [x] Run `cargo check` for touched packages. +- [x] Bump version for the fix release. +- [ ] Run the mirrored browser probe on installed client/server. +- [ ] Run Google Meet manual validation. diff --git a/Cargo.lock b/Cargo.lock index 5603b67..4a28908 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.16.16" +version = "0.16.17" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.16.16" +version = "0.16.17" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.16.16" +version = "0.16.17" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index 1e3dcc8..b400872 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.16.16" +version = "0.16.17" edition = "2024" [dependencies] diff --git a/client/src/bin/lesavka-sync-analyze.rs b/client/src/bin/lesavka-sync-analyze.rs index 46b3b15..96c3c56 100644 --- a/client/src/bin/lesavka-sync-analyze.rs +++ b/client/src/bin/lesavka-sync-analyze.rs @@ -167,6 +167,9 @@ A/V sync report for {capture} - audio onsets: {audio_events} - paired pulses: {paired_events} - activity start delta: {activity_start_delta:+.1} ms (audio after video is positive) +- raw first video activity: {raw_video:.3} s +- raw first audio activity: {raw_audio:.3} s +- paired window first video/audio: {paired_video:.3} s / {paired_audio:.3} s - first skew: {first_skew:+.1} ms (audio after video is positive) - last skew: {last_skew:+.1} ms - mean skew: {mean_skew:+.1} ms @@ -187,6 +190,10 @@ A/V sync report for {capture} audio_events = report.audio_event_count, paired_events = report.paired_event_count, activity_start_delta = report.activity_start_delta_ms, + raw_video = report.raw_first_video_activity_s, + raw_audio = report.raw_first_audio_activity_s, + paired_video = report.video_onsets_s.first().copied().unwrap_or(0.0), + paired_audio = report.audio_onsets_s.first().copied().unwrap_or(0.0), first_skew = report.first_skew_ms, last_skew = report.last_skew_ms, mean_skew = report.mean_skew_ms, diff --git a/client/src/sync_probe/analyze/onset_detection/correlation.rs b/client/src/sync_probe/analyze/onset_detection/correlation.rs index a20115d..757ad1f 100644 --- a/client/src/sync_probe/analyze/onset_detection/correlation.rs +++ b/client/src/sync_probe/analyze/onset_detection/correlation.rs @@ -34,7 +34,10 @@ pub(super) fn correlate_onsets( bail!("pulse period must stay positive"); } - let activity_start_delta_ms = (audio_onsets_s[0] - video_onsets_s[0]) * 1000.0; + let raw_first_video_activity_s = video_onsets_s[0]; + let raw_first_audio_activity_s = audio_onsets_s[0]; + let activity_start_delta_ms = + (raw_first_audio_activity_s - raw_first_video_activity_s) * 1000.0; let (video_onsets_s, audio_onsets_s, common_window) = trim_onsets_to_common_activity_window(video_onsets_s, audio_onsets_s, max_pair_gap_s); let expected_start_skew_ms = (audio_onsets_s[0] - video_onsets_s[0]) * 1000.0; @@ -72,6 +75,8 @@ pub(super) fn correlate_onsets( common_window.filter_onsets(video_onsets_s), common_window.filter_onsets(audio_onsets_s), activity_start_delta_ms, + raw_first_video_activity_s, + raw_first_audio_activity_s, pairs, )) } @@ -124,7 +129,10 @@ pub(crate) fn correlate_segments( bail!("audio onset list is empty"); } - let activity_start_delta_ms = (audio_onsets_s[0] - video_onsets_s[0]) * 1000.0; + let raw_first_video_activity_s = video_onsets_s[0]; + let raw_first_audio_activity_s = audio_onsets_s[0]; + let activity_start_delta_ms = + (raw_first_audio_activity_s - raw_first_video_activity_s) * 1000.0; let (video_onsets_s, audio_onsets_s, common_window) = trim_onsets_to_common_activity_window(&video_onsets_s, &audio_onsets_s, max_pair_gap_s); let expected_start_skew_ms = (audio_onsets_s[0] - video_onsets_s[0]) * 1000.0; @@ -171,6 +179,8 @@ pub(crate) fn correlate_segments( video_onsets_s, audio_onsets_s, activity_start_delta_ms, + raw_first_video_activity_s, + raw_first_audio_activity_s, pairs, )) } @@ -219,7 +229,10 @@ pub(crate) fn correlate_coded_segments( .iter() .map(|segment| segment.start_s) .collect::>(); - let activity_start_delta_ms = (audio_onsets_s[0] - video_onsets_s[0]) * 1000.0; + let raw_first_video_activity_s = video_onsets_s[0]; + let raw_first_audio_activity_s = audio_onsets_s[0]; + let activity_start_delta_ms = + (raw_first_audio_activity_s - raw_first_video_activity_s) * 1000.0; let (_, _, common_window) = trim_onsets_to_common_activity_window(&video_onsets_s, &audio_onsets_s, max_pair_gap_s); let filtered_video_segments = filter_segments_to_window(&video_segments, common_window); @@ -287,6 +300,8 @@ pub(crate) fn correlate_coded_segments( &video_onsets_s, &audio_onsets_s, activity_start_delta_ms, + raw_first_video_activity_s, + raw_first_audio_activity_s, pairs, )) } @@ -724,6 +739,8 @@ fn sync_report_from_pairs( video_onsets_s: &[f64], audio_onsets_s: &[f64], activity_start_delta_ms: f64, + raw_first_video_activity_s: f64, + raw_first_audio_activity_s: f64, pairs: Vec, ) -> SyncAnalysisReport { let paired_events = pairs @@ -758,6 +775,8 @@ fn sync_report_from_pairs( audio_event_count: audio_onsets_s.len(), paired_event_count: skews_ms.len(), activity_start_delta_ms, + raw_first_video_activity_s, + raw_first_audio_activity_s, first_skew_ms, last_skew_ms, mean_skew_ms, diff --git a/client/src/sync_probe/analyze/report.rs b/client/src/sync_probe/analyze/report.rs index bee3edd..f024635 100644 --- a/client/src/sync_probe/analyze/report.rs +++ b/client/src/sync_probe/analyze/report.rs @@ -20,6 +20,8 @@ pub struct SyncAnalysisReport { pub audio_event_count: usize, pub paired_event_count: usize, pub activity_start_delta_ms: f64, + pub raw_first_video_activity_s: f64, + pub raw_first_audio_activity_s: f64, pub first_skew_ms: f64, pub last_skew_ms: f64, pub mean_skew_ms: f64, @@ -275,6 +277,8 @@ mod tests { audio_event_count: 4, paired_event_count: 4, activity_start_delta_ms: 0.0, + raw_first_video_activity_s: 0.0, + raw_first_audio_activity_s: 0.0, first_skew_ms: 20.0, last_skew_ms: 20.0, mean_skew_ms: 20.0, @@ -304,6 +308,8 @@ mod tests { audio_event_count: 12, paired_event_count: 12, activity_start_delta_ms: 0.0, + raw_first_video_activity_s: 0.0, + raw_first_audio_activity_s: 0.0, first_skew_ms: 10.0, last_skew_ms: 70.0, mean_skew_ms: 40.0, @@ -329,6 +335,8 @@ mod tests { audio_event_count: 14, paired_event_count: 12, activity_start_delta_ms: 0.0, + raw_first_video_activity_s: 0.0, + raw_first_audio_activity_s: 0.0, first_skew_ms: 28.0, last_skew_ms: 32.0, mean_skew_ms: 30.0, @@ -355,6 +363,8 @@ mod tests { audio_event_count: 14, paired_event_count: 12, activity_start_delta_ms: 0.0, + raw_first_video_activity_s: 0.0, + raw_first_audio_activity_s: 0.0, first_skew_ms: 3.0, last_skew_ms: 4.0, mean_skew_ms: 3.5, @@ -380,6 +390,8 @@ mod tests { audio_event_count: 5, paired_event_count: 5, activity_start_delta_ms: 0.0, + raw_first_video_activity_s: 0.0, + raw_first_audio_activity_s: 0.0, first_skew_ms: 10.0, last_skew_ms: 20.0, mean_skew_ms: 15.0, @@ -404,6 +416,8 @@ mod tests { audio_event_count: 5, paired_event_count: 5, activity_start_delta_ms: 0.0, + raw_first_video_activity_s: 0.0, + raw_first_audio_activity_s: 0.0, first_skew_ms: 8_000.0, last_skew_ms: 8_000.0, mean_skew_ms: 8_000.0, @@ -428,6 +442,8 @@ mod tests { audio_event_count: 20, paired_event_count: 20, activity_start_delta_ms: 20_000.0, + raw_first_video_activity_s: 0.0, + raw_first_audio_activity_s: 0.0, first_skew_ms: 0.0, last_skew_ms: 0.0, mean_skew_ms: 0.0, diff --git a/common/Cargo.toml b/common/Cargo.toml index 5ac381d..f84b75e 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.16.16" +version = "0.16.17" edition = "2024" build = "build.rs" diff --git a/server/Cargo.toml b/server/Cargo.toml index b322450..8a49cb6 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.16.16" +version = "0.16.17" edition = "2024" autobins = false diff --git a/server/src/audio/voice_input.rs b/server/src/audio/voice_input.rs index 62e3392..8ac4267 100644 --- a/server/src/audio/voice_input.rs +++ b/server/src/audio/voice_input.rs @@ -128,6 +128,45 @@ fn voice_sink_delay_queue_enabled(compensation_us: i64) -> bool { compensation_us > 0 } +fn voice_appsrc_max_buffers() -> u64 { + positive_voice_appsrc_limit_env("LESAVKA_UAC_APP_MAX_BUFFERS", 8) +} + +fn voice_appsrc_max_bytes() -> u64 { + positive_voice_appsrc_limit_env("LESAVKA_UAC_APP_MAX_BYTES", 32_768) +} + +fn voice_appsrc_max_time_ns() -> u64 { + positive_voice_appsrc_limit_env("LESAVKA_UAC_APP_MAX_TIME_NS", 80_000_000) +} + +fn positive_voice_appsrc_limit_env(name: &str, default: u64) -> u64 { + std::env::var(name) + .ok() + .and_then(|value| value.trim().parse::().ok()) + .filter(|value| *value > 0) + .unwrap_or(default) +} + +#[cfg(not(coverage))] +fn configure_voice_appsrc(appsrc: &gst_app::AppSrc) { + use gst::prelude::*; + + appsrc.set_property("block", false); + if appsrc.has_property("max-buffers", None) { + appsrc.set_property("max-buffers", voice_appsrc_max_buffers()); + } + if appsrc.has_property("max-bytes", None) { + appsrc.set_property("max-bytes", voice_appsrc_max_bytes()); + } + if appsrc.has_property("max-time", None) { + appsrc.set_property("max-time", voice_appsrc_max_time_ns()); + } + if appsrc.has_property("leaky-type", None) { + appsrc.set_property_from_str("leaky-type", "downstream"); + } +} + impl Voice { #[cfg(coverage)] pub async fn new(_alsa_dev: &str) -> anyhow::Result { @@ -178,6 +217,7 @@ impl Voice { appsrc.set_caps(Some(&voice_input_caps())); appsrc.set_format(gst::Format::Time); appsrc.set_is_live(true); + configure_voice_appsrc(&appsrc); let convert = gst::ElementFactory::make("audioconvert") .build() @@ -328,7 +368,15 @@ impl Voice { ))); } - let _ = self.appsrc.push_buffer(buf); + if let Err(err) = self.appsrc.push_buffer(buf) { + tracing::warn!( + target: "lesavka_server::audio", + %err, + pts = pkt.pts, + bytes = pkt.data.len(), + "🎤⚠️ UAC appsrc rejected upstream microphone packet" + ); + } } pub fn finish(&mut self) { self.tap.flush(); @@ -349,6 +397,7 @@ mod voice_sink_timing_tests { use crate::camera::update_camera_config; use super::{voice_sink_buffer_time_us, voice_sink_latency_time_us}; use super::{default_voice_sink_compensation_us, voice_sink_compensation_us}; + use super::{voice_appsrc_max_buffers, voice_appsrc_max_bytes, voice_appsrc_max_time_ns}; #[test] fn voice_sink_timing_defaults_stay_live_call_friendly() { @@ -368,6 +417,42 @@ mod voice_sink_timing_tests { }); } + #[test] + fn voice_appsrc_limits_default_to_a_short_freshness_window() { + temp_env::with_var_unset("LESAVKA_UAC_APP_MAX_BUFFERS", || { + temp_env::with_var_unset("LESAVKA_UAC_APP_MAX_BYTES", || { + temp_env::with_var_unset("LESAVKA_UAC_APP_MAX_TIME_NS", || { + assert_eq!(voice_appsrc_max_buffers(), 8); + assert_eq!(voice_appsrc_max_bytes(), 32_768); + assert_eq!(voice_appsrc_max_time_ns(), 80_000_000); + }); + }); + }); + } + + #[test] + fn voice_appsrc_limits_accept_positive_overrides_only() { + temp_env::with_var("LESAVKA_UAC_APP_MAX_BUFFERS", Some("12"), || { + temp_env::with_var("LESAVKA_UAC_APP_MAX_BYTES", Some("65536"), || { + temp_env::with_var("LESAVKA_UAC_APP_MAX_TIME_NS", Some("10000000"), || { + assert_eq!(voice_appsrc_max_buffers(), 12); + assert_eq!(voice_appsrc_max_bytes(), 65_536); + assert_eq!(voice_appsrc_max_time_ns(), 10_000_000); + }); + }); + }); + + temp_env::with_var("LESAVKA_UAC_APP_MAX_BUFFERS", Some("0"), || { + temp_env::with_var("LESAVKA_UAC_APP_MAX_BYTES", Some("nope"), || { + temp_env::with_var("LESAVKA_UAC_APP_MAX_TIME_NS", Some("0"), || { + assert_eq!(voice_appsrc_max_buffers(), 8); + assert_eq!(voice_appsrc_max_bytes(), 32_768); + assert_eq!(voice_appsrc_max_time_ns(), 80_000_000); + }); + }); + }); + } + #[test] fn voice_sink_timing_env_accepts_positive_overrides_only() { temp_env::with_var("LESAVKA_CAM_OUTPUT", Some("uvc"), || { diff --git a/server/src/main/relay_service.rs b/server/src/main/relay_service.rs index 8873069..1bd3751 100644 --- a/server/src/main/relay_service.rs +++ b/server/src/main/relay_service.rs @@ -164,7 +164,18 @@ impl Relay for Handler { }; if let Some(next_packet) = next_packet { match next_packet.transpose() { - Ok(Some(pkt)) => pending.push_back(pkt), + Ok(Some(pkt)) => { + pending.push_back(pkt); + let coalesced = retain_freshest_audio_packet(&mut pending); + if coalesced > 0 { + tracing::debug!( + rpc_id, + session_id = lease.session_id, + dropped = coalesced, + "🎤 coalesced stale upstream audio backlog down to the freshest chunk" + ); + } + } Ok(None) => inbound_closed = true, Err(err) => { cleanup.mark_aborted(); diff --git a/server/src/main/relay_service_coverage.rs b/server/src/main/relay_service_coverage.rs index 28d059a..52b9421 100644 --- a/server/src/main/relay_service_coverage.rs +++ b/server/src/main/relay_service_coverage.rs @@ -21,6 +21,20 @@ fn retain_freshest_video_packet( dropped } +#[cfg(coverage)] +fn retain_freshest_audio_packet( + pending: &mut std::collections::VecDeque, +) -> usize { + if pending.len() <= 1 { + return 0; + } + let newest = pending.pop_back().expect("non-empty pending audio queue"); + let dropped = pending.len(); + pending.clear(); + pending.push_back(newest); + dropped +} + #[cfg(coverage)] #[tonic::async_trait] impl Relay for Handler { @@ -115,7 +129,10 @@ impl Relay for Handler { }; if let Some(next_packet) = next_packet { match next_packet.transpose()? { - Some(pkt) => pending.push_back(pkt), + Some(pkt) => { + pending.push_back(pkt); + let _ = retain_freshest_audio_packet(&mut pending); + } None => inbound_closed = true, } } diff --git a/server/src/main/relay_service_tests.rs b/server/src/main/relay_service_tests.rs index 95986ed..f18bfa3 100644 --- a/server/src/main/relay_service_tests.rs +++ b/server/src/main/relay_service_tests.rs @@ -1,8 +1,8 @@ #[cfg(all(test, not(coverage)))] #[allow(clippy::items_after_test_module)] mod tests { - use super::{UpstreamStreamCleanup, retain_freshest_video_packet}; - use lesavka_common::lesavka::VideoPacket; + use super::{UpstreamStreamCleanup, retain_freshest_audio_packet, retain_freshest_video_packet}; + use lesavka_common::lesavka::{AudioPacket, VideoPacket}; use lesavka_server::upstream_media_runtime::UpstreamMediaRuntime; use std::sync::Arc; @@ -30,6 +30,30 @@ mod tests { assert_eq!(pending.front().map(|pkt| pkt.pts), Some(300)); } + #[test] + fn retain_freshest_audio_packet_keeps_only_the_latest_chunk() { + let mut pending = std::collections::VecDeque::from(vec![ + AudioPacket { + pts: 100, + ..Default::default() + }, + AudioPacket { + pts: 200, + ..Default::default() + }, + AudioPacket { + pts: 300, + ..Default::default() + }, + ]); + + let dropped = retain_freshest_audio_packet(&mut pending); + + assert_eq!(dropped, 2); + assert_eq!(pending.len(), 1); + assert_eq!(pending.front().map(|pkt| pkt.pts), Some(300)); + } + #[test] fn upstream_cleanup_guard_closes_its_microphone_generation() { let runtime = Arc::new(UpstreamMediaRuntime::new()); diff --git a/server/src/main/relay_stream_lifecycle.rs b/server/src/main/relay_stream_lifecycle.rs index 5ce20d8..9293ad7 100644 --- a/server/src/main/relay_stream_lifecycle.rs +++ b/server/src/main/relay_stream_lifecycle.rs @@ -22,6 +22,21 @@ fn retain_freshest_video_packet( dropped } +#[cfg(not(coverage))] +/// Keeps only the newest microphone packet while startup pairing is healing. +fn retain_freshest_audio_packet( + pending: &mut std::collections::VecDeque, +) -> usize { + if pending.len() <= 1 { + return 0; + } + let newest = pending.pop_back().expect("non-empty pending audio queue"); + let dropped = pending.len(); + pending.clear(); + pending.push_back(newest); + dropped +} + #[cfg(not(coverage))] #[derive(Clone, Copy, Debug)] enum UpstreamStreamCleanupKind { diff --git a/server/src/upstream_media_runtime.rs b/server/src/upstream_media_runtime.rs index 5fbce2f..1c8e168 100644 --- a/server/src/upstream_media_runtime.rs +++ b/server/src/upstream_media_runtime.rs @@ -14,7 +14,7 @@ mod types; use config::{ apply_playout_offset, upstream_camera_startup_grace_us, upstream_pairing_master_slack, upstream_playout_delay, upstream_playout_offset_us, upstream_reanchor_late_threshold, - upstream_reanchor_window_us, upstream_timing_trace_enabled, + upstream_reanchor_window_us, upstream_require_paired_startup, upstream_timing_trace_enabled, }; use state::UpstreamClockState; pub use types::{ @@ -262,6 +262,25 @@ impl UpstreamMediaRuntime { ); } return UpstreamPlanDecision::AwaitingPair; + } else if upstream_require_paired_startup() { + let refreshed = refresh_unpaired_pairing_anchor( + &mut state, + kind, + remote_pts_us, + now + playout_delay, + ); + if refreshed || upstream_timing_trace_enabled() { + info!( + session_id, + ?kind, + packet_count, + remote_pts_us, + refreshed_anchor = refreshed, + healing_window_ms = playout_delay.as_millis(), + "upstream media pairing window expired; holding one-sided stream for synced startup" + ); + } + return UpstreamPlanDecision::AwaitingPair; } else { let single_stream_base_remote_pts_us = match kind { UpstreamMediaKind::Camera => { @@ -382,6 +401,26 @@ impl UpstreamMediaRuntime { } } +fn refresh_unpaired_pairing_anchor( + state: &mut UpstreamClockState, + kind: UpstreamMediaKind, + remote_pts_us: u64, + next_deadline: Instant, +) -> bool { + state.pairing_anchor_deadline = Some(next_deadline); + match kind { + UpstreamMediaKind::Camera if state.first_microphone_remote_pts_us.is_none() => { + state.first_camera_remote_pts_us = Some(remote_pts_us); + true + } + UpstreamMediaKind::Microphone if state.first_camera_remote_pts_us.is_none() => { + state.first_microphone_remote_pts_us = Some(remote_pts_us); + true + } + _ => false, + } +} + impl Default for UpstreamMediaRuntime { fn default() -> Self { Self::new() diff --git a/server/src/upstream_media_runtime/config.rs b/server/src/upstream_media_runtime/config.rs index 011f22e..615eee6 100644 --- a/server/src/upstream_media_runtime/config.rs +++ b/server/src/upstream_media_runtime/config.rs @@ -25,6 +25,19 @@ pub(super) fn upstream_playout_delay() -> Duration { Duration::from_millis(delay_ms) } +pub(super) fn upstream_require_paired_startup() -> bool { + std::env::var("LESAVKA_UPSTREAM_REQUIRE_PAIRED_STARTUP") + .ok() + .map(|value| { + let trimmed = value.trim(); + !(trimmed.eq_ignore_ascii_case("0") + || trimmed.eq_ignore_ascii_case("false") + || trimmed.eq_ignore_ascii_case("no") + || trimmed.eq_ignore_ascii_case("off")) + }) + .unwrap_or(true) +} + pub(super) fn upstream_playout_offset_us(kind: UpstreamMediaKind) -> i64 { let name = match kind { UpstreamMediaKind::Camera => "LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US", diff --git a/server/src/upstream_media_runtime/tests/config.rs b/server/src/upstream_media_runtime/tests/config.rs index 0ebb544..9b6fddd 100644 --- a/server/src/upstream_media_runtime/tests/config.rs +++ b/server/src/upstream_media_runtime/tests/config.rs @@ -14,6 +14,28 @@ fn upstream_playout_delay_defaults_to_one_second_and_accepts_overrides() { }); } +#[test] +#[serial(upstream_media_runtime)] +fn upstream_requires_paired_startup_by_default_with_compatibility_override() { + temp_env::with_var_unset("LESAVKA_UPSTREAM_REQUIRE_PAIRED_STARTUP", || { + assert!(super::upstream_require_paired_startup()); + }); + + for disabled in ["0", "false", "no", "off"] { + temp_env::with_var( + "LESAVKA_UPSTREAM_REQUIRE_PAIRED_STARTUP", + Some(disabled), + || { + assert!(!super::upstream_require_paired_startup()); + }, + ); + } + + temp_env::with_var("LESAVKA_UPSTREAM_REQUIRE_PAIRED_STARTUP", Some("1"), || { + assert!(super::upstream_require_paired_startup()); + }); +} + #[test] #[serial(upstream_media_runtime)] fn upstream_playout_offsets_default_to_mjpeg_calibration_and_accept_overrides() { diff --git a/server/src/upstream_media_runtime/tests/planning.rs b/server/src/upstream_media_runtime/tests/planning.rs index ea45170..d22e896 100644 --- a/server/src/upstream_media_runtime/tests/planning.rs +++ b/server/src/upstream_media_runtime/tests/planning.rs @@ -38,19 +38,74 @@ fn shared_playout_epoch_is_reused_across_audio_and_video() { #[test] #[serial(upstream_media_runtime)] -fn pairing_window_can_expire_into_one_sided_playout() { +fn pairing_window_holds_one_sided_playout_by_default() { temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || { let runtime = UpstreamMediaRuntime::new(); let _camera = runtime.activate_camera(); - let first = play(runtime.plan_video_pts(1_000_000, 16_666)); - let second = play(runtime.plan_video_pts(1_016_666, 16_666)); - - assert_eq!(first.local_pts_us, 0); - assert_eq!(second.local_pts_us, 16_666); + assert!(matches!( + runtime.plan_video_pts(1_000_000, 16_666), + super::UpstreamPlanDecision::AwaitingPair + )); + assert!(matches!( + runtime.plan_video_pts(1_016_666, 16_666), + super::UpstreamPlanDecision::AwaitingPair + )); }); } +#[test] +#[serial(upstream_media_runtime)] +fn explicit_override_allows_one_sided_playout_for_compatibility() { + temp_env::with_var("LESAVKA_UPSTREAM_REQUIRE_PAIRED_STARTUP", Some("0"), || { + temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || { + let runtime = UpstreamMediaRuntime::new(); + let _camera = runtime.activate_camera(); + + let first = play(runtime.plan_video_pts(1_000_000, 16_666)); + let second = play(runtime.plan_video_pts(1_016_666, 16_666)); + + assert_eq!(first.local_pts_us, 0); + assert_eq!(second.local_pts_us, 16_666); + }); + }); +} + +#[test] +#[serial(upstream_media_runtime)] +fn overdue_pairing_refreshes_waiting_anchor_before_late_counterpart_arrives() { + temp_env::with_var( + "LESAVKA_UPSTREAM_CAMERA_STARTUP_GRACE_MS", + Some("0"), + || { + temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || { + let runtime = runtime_without_offsets(); + let _camera = runtime.activate_camera(); + let _microphone = runtime.activate_microphone(); + + assert!(matches!( + runtime.plan_video_pts(1_000_000, 16_666), + super::UpstreamPlanDecision::AwaitingPair + )); + assert!(matches!( + runtime.plan_video_pts(9_000_000, 16_666), + super::UpstreamPlanDecision::AwaitingPair + )); + + let audio = play(runtime.plan_audio_pts(9_010_000)); + assert!(matches!( + runtime.plan_video_pts(9_000_000, 16_666), + super::UpstreamPlanDecision::DropBeforeOverlap + )); + let video = play(runtime.plan_video_pts(9_016_666, 16_666)); + + assert_eq!(audio.local_pts_us, 0); + assert_eq!(video.local_pts_us, 6_666); + }); + }, + ); +} + #[test] #[serial(upstream_media_runtime)] fn map_wrappers_hide_unpaired_and_pre_overlap_packets() {