From b948994811b05c9f643663a9e2dd43421ab8319c Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Fri, 1 May 2026 15:21:26 -0300 Subject: [PATCH] sync: send newest camera uplink frames --- AGENTS.md | 3 ++ Cargo.lock | 6 +-- client/Cargo.toml | 2 +- client/src/app/uplink_media.rs | 2 + client/src/sync_probe/capture.rs | 2 + .../src/sync_probe/capture/coverage_stub.rs | 2 + client/src/uplink_fresh_queue.rs | 48 ++++++++++++++++++- common/Cargo.toml | 2 +- server/Cargo.toml | 2 +- .../tests/client_uplink_freshness_contract.rs | 11 +++++ 10 files changed, 72 insertions(+), 8 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index c651cfb..ca0e17e 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -133,6 +133,7 @@ Context: the mirrored browser probe finally reproduced the real failure class on - [x] While pairing is overdue, keep replacing the waiting-side anchor with fresh packets instead of preserving stale startup anchors. - [x] While awaiting the peer stream, keep only fresh pending camera packets. - [x] While awaiting the peer stream, keep only fresh pending microphone packets. +- [x] Send the latest camera packet from the client uplink queue instead of draining old-but-not-yet-stale video backlog. - [x] Add tests proving the pairing window no longer expires into one-sided playout by default. - [x] Add tests proving the explicit one-sided override still works for intentional single-stream scenarios. @@ -171,5 +172,7 @@ Context: the mirrored browser probe finally reproduced the real failure class on - 0.16.20 installed the `+720ms` offset (`/etc/lesavka/server.env` had `LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=720000`), but the mirrored browser capture contained no recognizable color pulses. Theia server logs showed repeated `upstream video frame dropped because the audio master never caught up inside the pairing window`; UVC was effectively starved by the positive audio delay instead of flowing delayed-but-fresh frames. - 0.16.21 makes that wait offset-aware and adds a regression test proving a configured positive audio delay does not freeze UVC video while UAC sleeps before playout. - Replaying the 0.16.21 artifact after 0.16.22 analyzer hardening changes the verdict from false `catastrophic_failure` to `gross_failure`: p95 `273.8 ms`, median `-188.4 ms`, 7 paired coded pulses. The raw activity-start delta (`-3620.7 ms`) is still printed, but it is ignored for verdict/calibration because it disagrees with coded pairs by `3432.3 ms`; unpaired video/audio onsets are printed for triage. + - 0.16.22 live mirrored run still failed with p95 `433.7 ms`, median `-359.4 ms`, and 5 paired coded pulses. Client telemetry showed camera uplink `latest_age_ms` repeatedly around `300-350 ms`, matching the measured skew; patch 0.16.23 to make video queues latest-only instead of draining stale-but-under-budget backlog. + - 0.16.23 local validation passed for fresh-queue behavior, uplink/probe freshness contracts, sync analyzer tests, client/server binary checks, and whitespace checks. - [ ] Re-run the mirrored browser probe after the pre-start false-positive fix. - [ ] Run Google Meet manual validation. diff --git a/Cargo.lock b/Cargo.lock index aace271..f25a2a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.16.22" +version = "0.16.23" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.16.22" +version = "0.16.23" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.16.22" +version = "0.16.23" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index 21bd81d..be68e60 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.16.22" +version = "0.16.23" edition = "2024" [dependencies] diff --git a/client/src/app/uplink_media.rs b/client/src/app/uplink_media.rs index 6f75c21..2a9b979 100644 --- a/client/src/app/uplink_media.rs +++ b/client/src/app/uplink_media.rs @@ -253,6 +253,7 @@ const VIDEO_UPLINK_QUEUE: crate::uplink_fresh_queue::FreshQueueConfig = crate::uplink_fresh_queue::FreshQueueConfig { capacity: 32, max_age: Duration::from_millis(350), + policy: crate::uplink_fresh_queue::FreshQueuePolicy::LatestOnly, }; #[cfg(not(coverage))] @@ -260,6 +261,7 @@ const AUDIO_UPLINK_QUEUE: crate::uplink_fresh_queue::FreshQueueConfig = crate::uplink_fresh_queue::FreshQueueConfig { capacity: 16, max_age: Duration::from_millis(400), + policy: crate::uplink_fresh_queue::FreshQueuePolicy::DrainOldest, }; #[cfg(not(coverage))] diff --git a/client/src/sync_probe/capture.rs b/client/src/sync_probe/capture.rs index fd909cf..f2d1dc5 100644 --- a/client/src/sync_probe/capture.rs +++ b/client/src/sync_probe/capture.rs @@ -45,12 +45,14 @@ pub use runtime::SyncProbeCapture; const PROBE_VIDEO_QUEUE: FreshQueueConfig = FreshQueueConfig { capacity: 32, max_age: Duration::from_millis(350), + policy: crate::uplink_fresh_queue::FreshQueuePolicy::LatestOnly, }; #[cfg(any(not(coverage), test))] const PROBE_AUDIO_QUEUE: FreshQueueConfig = FreshQueueConfig { capacity: 32, max_age: Duration::from_millis(400), + policy: crate::uplink_fresh_queue::FreshQueuePolicy::DrainOldest, }; #[cfg(any(not(coverage), test))] diff --git a/client/src/sync_probe/capture/coverage_stub.rs b/client/src/sync_probe/capture/coverage_stub.rs index 482ebdc..7fcc5ad 100644 --- a/client/src/sync_probe/capture/coverage_stub.rs +++ b/client/src/sync_probe/capture/coverage_stub.rs @@ -17,6 +17,7 @@ impl SyncProbeCapture { crate::uplink_fresh_queue::FreshQueueConfig { capacity: 1, max_age: std::time::Duration::from_millis(1), + policy: crate::uplink_fresh_queue::FreshQueuePolicy::LatestOnly, }, ) } @@ -28,6 +29,7 @@ impl SyncProbeCapture { crate::uplink_fresh_queue::FreshQueueConfig { capacity: 1, max_age: std::time::Duration::from_millis(1), + policy: crate::uplink_fresh_queue::FreshQueuePolicy::DrainOldest, }, ) } diff --git a/client/src/uplink_fresh_queue.rs b/client/src/uplink_fresh_queue.rs index 30c1bb8..6c4c42e 100644 --- a/client/src/uplink_fresh_queue.rs +++ b/client/src/uplink_fresh_queue.rs @@ -18,6 +18,18 @@ pub struct FreshQueueConfig { pub capacity: usize, /// Maximum packet age tolerated before the packet is considered stale. pub max_age: Duration, + /// Whether the consumer should drain old-but-fresh packets or skip straight + /// to the newest packet when backlog appears. + pub policy: FreshQueuePolicy, +} + +/// Queue delivery policy for live media. +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum FreshQueuePolicy { + /// Preserve packet continuity while still enforcing max age. + DrainOldest, + /// Drop superseded packets and deliver the newest fresh packet. + LatestOnly, } /// Statistics returned after pushing one packet into the bounded queue. @@ -34,9 +46,9 @@ pub struct QueuePushStats { pub struct QueuePop { /// Fresh packet ready to send, if any. pub packet: Option, - /// Queue depth after removing stale packets and the returned packet. + /// Queue depth after removing stale/superseded packets and the returned packet. pub queue_depth: usize, - /// Number of stale packets discarded before a fresh packet was found. + /// Number of stale or superseded packets discarded before a fresh packet was found. pub dropped_stale: u64, /// Fresh packet age at the moment it left the queue. pub delivery_age: Duration, @@ -135,6 +147,12 @@ impl FreshPacketQueue { loop { let wait_for_more = { let mut state = self.inner.lock().expect("fresh queue mutex poisoned"); + if self.config.policy == FreshQueuePolicy::LatestOnly { + while state.queue.len() > 1 { + let _ = state.queue.pop_front(); + dropped_stale = dropped_stale.saturating_add(1); + } + } while let Some(front) = state.queue.pop_front() { let delivery_age = front.age_at_enqueue + front.queued_at.elapsed(); if delivery_age > self.config.max_age { @@ -174,6 +192,7 @@ mod tests { let queue = FreshPacketQueue::new(FreshQueueConfig { capacity: 2, max_age: Duration::from_secs(1), + policy: FreshQueuePolicy::DrainOldest, }); let first = queue.push(1_u8, Duration::ZERO); @@ -195,6 +214,7 @@ mod tests { let queue = FreshPacketQueue::new(FreshQueueConfig { capacity: 3, max_age: Duration::from_millis(60), + policy: FreshQueuePolicy::DrainOldest, }); let _ = queue.push(1_u8, Duration::from_millis(40)); @@ -212,6 +232,7 @@ mod tests { let queue = FreshPacketQueue::::new(FreshQueueConfig { capacity: 1, max_age: Duration::from_secs(1), + policy: FreshQueuePolicy::DrainOldest, }); queue.close(); @@ -225,6 +246,7 @@ mod tests { let queue = FreshPacketQueue::new(FreshQueueConfig { capacity: 2, max_age: Duration::from_secs(1), + policy: FreshQueuePolicy::DrainOldest, }); let cloned = queue.clone(); @@ -240,6 +262,7 @@ mod tests { let queue = FreshPacketQueue::new(FreshQueueConfig { capacity: 2, max_age: Duration::from_secs(1), + policy: FreshQueuePolicy::DrainOldest, }); queue.close(); @@ -256,6 +279,7 @@ mod tests { let queue = FreshPacketQueue::new(FreshQueueConfig { capacity: 2, max_age: Duration::from_secs(1), + policy: FreshQueuePolicy::DrainOldest, }); let waiter = queue.clone(); @@ -274,6 +298,7 @@ mod tests { let queue = FreshPacketQueue::::new(FreshQueueConfig { capacity: 2, max_age: Duration::from_secs(1), + policy: FreshQueuePolicy::DrainOldest, }); let waiter = queue.clone(); @@ -285,4 +310,23 @@ mod tests { assert!(popped.closed); assert!(popped.packet.is_none()); } + + #[tokio::test] + async fn latest_only_policy_returns_newest_packet_and_drops_superseded_backlog() { + let queue = FreshPacketQueue::new(FreshQueueConfig { + capacity: 4, + max_age: Duration::from_secs(1), + policy: FreshQueuePolicy::LatestOnly, + }); + + let _ = queue.push(1_u8, Duration::from_millis(250)); + let _ = queue.push(2_u8, Duration::from_millis(150)); + let _ = queue.push(3_u8, Duration::from_millis(20)); + + let popped = queue.pop_fresh().await; + assert_eq!(popped.packet, Some(3)); + assert_eq!(popped.dropped_stale, 2); + assert_eq!(popped.queue_depth, 0); + assert!(popped.delivery_age < Duration::from_millis(100)); + } } diff --git a/common/Cargo.toml b/common/Cargo.toml index c3f2ada..4e2fcea 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.16.22" +version = "0.16.23" edition = "2024" build = "build.rs" diff --git a/server/Cargo.toml b/server/Cargo.toml index 95ca642..2a1bc20 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.16.22" +version = "0.16.23" edition = "2024" autobins = false diff --git a/testing/tests/client_uplink_freshness_contract.rs b/testing/tests/client_uplink_freshness_contract.rs index c48271f..a192eb4 100644 --- a/testing/tests/client_uplink_freshness_contract.rs +++ b/testing/tests/client_uplink_freshness_contract.rs @@ -55,6 +55,14 @@ fn parse_queue_max_age_ms(block: &str, queue_const: &str) -> u64 { panic!("missing max_age for {queue_const}"); } +fn assert_queue_policy(block: &str, queue_const: &str, policy: &str) { + let expected = format!("policy: crate::uplink_fresh_queue::FreshQueuePolicy::{policy}"); + assert!( + block.contains(&expected), + "{queue_const} should use {policy} policy to preserve the intended live-media behavior" + ); +} + #[test] fn camera_uplink_queue_freshness_budget_stays_within_lipsync_window() { let block = queue_block(UPLINK_MEDIA_SRC, "VIDEO_UPLINK_QUEUE"); @@ -63,6 +71,7 @@ fn camera_uplink_queue_freshness_budget_stays_within_lipsync_window() { max_age_ms <= 350, "VIDEO_UPLINK_QUEUE max_age is {max_age_ms}ms; keep it <= 350ms to prevent ~1s video drift" ); + assert_queue_policy(block, "VIDEO_UPLINK_QUEUE", "LatestOnly"); } #[test] @@ -73,6 +82,7 @@ fn microphone_uplink_queue_freshness_budget_stays_within_live_audio_window() { max_age_ms <= 400, "AUDIO_UPLINK_QUEUE max_age is {max_age_ms}ms; keep it <= 400ms for live calls" ); + assert_queue_policy(block, "AUDIO_UPLINK_QUEUE", "DrainOldest"); } #[test] @@ -93,4 +103,5 @@ fn sync_probe_video_queue_uses_same_freshness_budget() { max_age_ms <= 350, "PROBE_VIDEO_QUEUE max_age is {max_age_ms}ms; keep probe and runtime freshness policies aligned" ); + assert_queue_policy(block, "PROBE_VIDEO_QUEUE", "LatestOnly"); }