sync: send newest camera uplink frames

This commit is contained in:
Brad Stein 2026-05-01 15:21:26 -03:00
parent aeb85ca998
commit b948994811
10 changed files with 72 additions and 8 deletions

View File

@ -133,6 +133,7 @@ Context: the mirrored browser probe finally reproduced the real failure class on
- [x] While pairing is overdue, keep replacing the waiting-side anchor with fresh packets instead of preserving stale startup anchors. - [x] While pairing is overdue, keep replacing the waiting-side anchor with fresh packets instead of preserving stale startup anchors.
- [x] While awaiting the peer stream, keep only fresh pending camera packets. - [x] While awaiting the peer stream, keep only fresh pending camera packets.
- [x] While awaiting the peer stream, keep only fresh pending microphone packets. - [x] While awaiting the peer stream, keep only fresh pending microphone packets.
- [x] Send the latest camera packet from the client uplink queue instead of draining old-but-not-yet-stale video backlog.
- [x] Add tests proving the pairing window no longer expires into one-sided playout by default. - [x] Add tests proving the pairing window no longer expires into one-sided playout by default.
- [x] Add tests proving the explicit one-sided override still works for intentional single-stream scenarios. - [x] Add tests proving the explicit one-sided override still works for intentional single-stream scenarios.
@ -171,5 +172,7 @@ Context: the mirrored browser probe finally reproduced the real failure class on
- 0.16.20 installed the `+720ms` offset (`/etc/lesavka/server.env` had `LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=720000`), but the mirrored browser capture contained no recognizable color pulses. Theia server logs showed repeated `upstream video frame dropped because the audio master never caught up inside the pairing window`; UVC was effectively starved by the positive audio delay instead of flowing delayed-but-fresh frames. - 0.16.20 installed the `+720ms` offset (`/etc/lesavka/server.env` had `LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=720000`), but the mirrored browser capture contained no recognizable color pulses. Theia server logs showed repeated `upstream video frame dropped because the audio master never caught up inside the pairing window`; UVC was effectively starved by the positive audio delay instead of flowing delayed-but-fresh frames.
- 0.16.21 makes that wait offset-aware and adds a regression test proving a configured positive audio delay does not freeze UVC video while UAC sleeps before playout. - 0.16.21 makes that wait offset-aware and adds a regression test proving a configured positive audio delay does not freeze UVC video while UAC sleeps before playout.
- Replaying the 0.16.21 artifact after 0.16.22 analyzer hardening changes the verdict from false `catastrophic_failure` to `gross_failure`: p95 `273.8 ms`, median `-188.4 ms`, 7 paired coded pulses. The raw activity-start delta (`-3620.7 ms`) is still printed, but it is ignored for verdict/calibration because it disagrees with coded pairs by `3432.3 ms`; unpaired video/audio onsets are printed for triage. - Replaying the 0.16.21 artifact after 0.16.22 analyzer hardening changes the verdict from false `catastrophic_failure` to `gross_failure`: p95 `273.8 ms`, median `-188.4 ms`, 7 paired coded pulses. The raw activity-start delta (`-3620.7 ms`) is still printed, but it is ignored for verdict/calibration because it disagrees with coded pairs by `3432.3 ms`; unpaired video/audio onsets are printed for triage.
- 0.16.22 live mirrored run still failed with p95 `433.7 ms`, median `-359.4 ms`, and 5 paired coded pulses. Client telemetry showed camera uplink `latest_age_ms` repeatedly around `300-350 ms`, matching the measured skew; patch 0.16.23 to make video queues latest-only instead of draining stale-but-under-budget backlog.
- 0.16.23 local validation passed for fresh-queue behavior, uplink/probe freshness contracts, sync analyzer tests, client/server binary checks, and whitespace checks.
- [ ] Re-run the mirrored browser probe after the pre-start false-positive fix. - [ ] Re-run the mirrored browser probe after the pre-start false-positive fix.
- [ ] Run Google Meet manual validation. - [ ] Run Google Meet manual validation.

6
Cargo.lock generated
View File

@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]] [[package]]
name = "lesavka_client" name = "lesavka_client"
version = "0.16.22" version = "0.16.23"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-stream", "async-stream",
@ -1686,7 +1686,7 @@ dependencies = [
[[package]] [[package]]
name = "lesavka_common" name = "lesavka_common"
version = "0.16.22" version = "0.16.23"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",
@ -1698,7 +1698,7 @@ dependencies = [
[[package]] [[package]]
name = "lesavka_server" name = "lesavka_server"
version = "0.16.22" version = "0.16.23"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package] [package]
name = "lesavka_client" name = "lesavka_client"
version = "0.16.22" version = "0.16.23"
edition = "2024" edition = "2024"
[dependencies] [dependencies]

View File

@ -253,6 +253,7 @@ const VIDEO_UPLINK_QUEUE: crate::uplink_fresh_queue::FreshQueueConfig =
crate::uplink_fresh_queue::FreshQueueConfig { crate::uplink_fresh_queue::FreshQueueConfig {
capacity: 32, capacity: 32,
max_age: Duration::from_millis(350), max_age: Duration::from_millis(350),
policy: crate::uplink_fresh_queue::FreshQueuePolicy::LatestOnly,
}; };
#[cfg(not(coverage))] #[cfg(not(coverage))]
@ -260,6 +261,7 @@ const AUDIO_UPLINK_QUEUE: crate::uplink_fresh_queue::FreshQueueConfig =
crate::uplink_fresh_queue::FreshQueueConfig { crate::uplink_fresh_queue::FreshQueueConfig {
capacity: 16, capacity: 16,
max_age: Duration::from_millis(400), max_age: Duration::from_millis(400),
policy: crate::uplink_fresh_queue::FreshQueuePolicy::DrainOldest,
}; };
#[cfg(not(coverage))] #[cfg(not(coverage))]

View File

@ -45,12 +45,14 @@ pub use runtime::SyncProbeCapture;
const PROBE_VIDEO_QUEUE: FreshQueueConfig = FreshQueueConfig { const PROBE_VIDEO_QUEUE: FreshQueueConfig = FreshQueueConfig {
capacity: 32, capacity: 32,
max_age: Duration::from_millis(350), max_age: Duration::from_millis(350),
policy: crate::uplink_fresh_queue::FreshQueuePolicy::LatestOnly,
}; };
#[cfg(any(not(coverage), test))] #[cfg(any(not(coverage), test))]
const PROBE_AUDIO_QUEUE: FreshQueueConfig = FreshQueueConfig { const PROBE_AUDIO_QUEUE: FreshQueueConfig = FreshQueueConfig {
capacity: 32, capacity: 32,
max_age: Duration::from_millis(400), max_age: Duration::from_millis(400),
policy: crate::uplink_fresh_queue::FreshQueuePolicy::DrainOldest,
}; };
#[cfg(any(not(coverage), test))] #[cfg(any(not(coverage), test))]

View File

@ -17,6 +17,7 @@ impl SyncProbeCapture {
crate::uplink_fresh_queue::FreshQueueConfig { crate::uplink_fresh_queue::FreshQueueConfig {
capacity: 1, capacity: 1,
max_age: std::time::Duration::from_millis(1), max_age: std::time::Duration::from_millis(1),
policy: crate::uplink_fresh_queue::FreshQueuePolicy::LatestOnly,
}, },
) )
} }
@ -28,6 +29,7 @@ impl SyncProbeCapture {
crate::uplink_fresh_queue::FreshQueueConfig { crate::uplink_fresh_queue::FreshQueueConfig {
capacity: 1, capacity: 1,
max_age: std::time::Duration::from_millis(1), max_age: std::time::Duration::from_millis(1),
policy: crate::uplink_fresh_queue::FreshQueuePolicy::DrainOldest,
}, },
) )
} }

View File

@ -18,6 +18,18 @@ pub struct FreshQueueConfig {
pub capacity: usize, pub capacity: usize,
/// Maximum packet age tolerated before the packet is considered stale. /// Maximum packet age tolerated before the packet is considered stale.
pub max_age: Duration, pub max_age: Duration,
/// Whether the consumer should drain old-but-fresh packets or skip straight
/// to the newest packet when backlog appears.
pub policy: FreshQueuePolicy,
}
/// Queue delivery policy for live media.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum FreshQueuePolicy {
/// Preserve packet continuity while still enforcing max age.
DrainOldest,
/// Drop superseded packets and deliver the newest fresh packet.
LatestOnly,
} }
/// Statistics returned after pushing one packet into the bounded queue. /// Statistics returned after pushing one packet into the bounded queue.
@ -34,9 +46,9 @@ pub struct QueuePushStats {
pub struct QueuePop<T> { pub struct QueuePop<T> {
/// Fresh packet ready to send, if any. /// Fresh packet ready to send, if any.
pub packet: Option<T>, pub packet: Option<T>,
/// Queue depth after removing stale packets and the returned packet. /// Queue depth after removing stale/superseded packets and the returned packet.
pub queue_depth: usize, pub queue_depth: usize,
/// Number of stale packets discarded before a fresh packet was found. /// Number of stale or superseded packets discarded before a fresh packet was found.
pub dropped_stale: u64, pub dropped_stale: u64,
/// Fresh packet age at the moment it left the queue. /// Fresh packet age at the moment it left the queue.
pub delivery_age: Duration, pub delivery_age: Duration,
@ -135,6 +147,12 @@ impl<T> FreshPacketQueue<T> {
loop { loop {
let wait_for_more = { let wait_for_more = {
let mut state = self.inner.lock().expect("fresh queue mutex poisoned"); let mut state = self.inner.lock().expect("fresh queue mutex poisoned");
if self.config.policy == FreshQueuePolicy::LatestOnly {
while state.queue.len() > 1 {
let _ = state.queue.pop_front();
dropped_stale = dropped_stale.saturating_add(1);
}
}
while let Some(front) = state.queue.pop_front() { while let Some(front) = state.queue.pop_front() {
let delivery_age = front.age_at_enqueue + front.queued_at.elapsed(); let delivery_age = front.age_at_enqueue + front.queued_at.elapsed();
if delivery_age > self.config.max_age { if delivery_age > self.config.max_age {
@ -174,6 +192,7 @@ mod tests {
let queue = FreshPacketQueue::new(FreshQueueConfig { let queue = FreshPacketQueue::new(FreshQueueConfig {
capacity: 2, capacity: 2,
max_age: Duration::from_secs(1), max_age: Duration::from_secs(1),
policy: FreshQueuePolicy::DrainOldest,
}); });
let first = queue.push(1_u8, Duration::ZERO); let first = queue.push(1_u8, Duration::ZERO);
@ -195,6 +214,7 @@ mod tests {
let queue = FreshPacketQueue::new(FreshQueueConfig { let queue = FreshPacketQueue::new(FreshQueueConfig {
capacity: 3, capacity: 3,
max_age: Duration::from_millis(60), max_age: Duration::from_millis(60),
policy: FreshQueuePolicy::DrainOldest,
}); });
let _ = queue.push(1_u8, Duration::from_millis(40)); let _ = queue.push(1_u8, Duration::from_millis(40));
@ -212,6 +232,7 @@ mod tests {
let queue = FreshPacketQueue::<u8>::new(FreshQueueConfig { let queue = FreshPacketQueue::<u8>::new(FreshQueueConfig {
capacity: 1, capacity: 1,
max_age: Duration::from_secs(1), max_age: Duration::from_secs(1),
policy: FreshQueuePolicy::DrainOldest,
}); });
queue.close(); queue.close();
@ -225,6 +246,7 @@ mod tests {
let queue = FreshPacketQueue::new(FreshQueueConfig { let queue = FreshPacketQueue::new(FreshQueueConfig {
capacity: 2, capacity: 2,
max_age: Duration::from_secs(1), max_age: Duration::from_secs(1),
policy: FreshQueuePolicy::DrainOldest,
}); });
let cloned = queue.clone(); let cloned = queue.clone();
@ -240,6 +262,7 @@ mod tests {
let queue = FreshPacketQueue::new(FreshQueueConfig { let queue = FreshPacketQueue::new(FreshQueueConfig {
capacity: 2, capacity: 2,
max_age: Duration::from_secs(1), max_age: Duration::from_secs(1),
policy: FreshQueuePolicy::DrainOldest,
}); });
queue.close(); queue.close();
@ -256,6 +279,7 @@ mod tests {
let queue = FreshPacketQueue::new(FreshQueueConfig { let queue = FreshPacketQueue::new(FreshQueueConfig {
capacity: 2, capacity: 2,
max_age: Duration::from_secs(1), max_age: Duration::from_secs(1),
policy: FreshQueuePolicy::DrainOldest,
}); });
let waiter = queue.clone(); let waiter = queue.clone();
@ -274,6 +298,7 @@ mod tests {
let queue = FreshPacketQueue::<u8>::new(FreshQueueConfig { let queue = FreshPacketQueue::<u8>::new(FreshQueueConfig {
capacity: 2, capacity: 2,
max_age: Duration::from_secs(1), max_age: Duration::from_secs(1),
policy: FreshQueuePolicy::DrainOldest,
}); });
let waiter = queue.clone(); let waiter = queue.clone();
@ -285,4 +310,23 @@ mod tests {
assert!(popped.closed); assert!(popped.closed);
assert!(popped.packet.is_none()); assert!(popped.packet.is_none());
} }
#[tokio::test]
async fn latest_only_policy_returns_newest_packet_and_drops_superseded_backlog() {
let queue = FreshPacketQueue::new(FreshQueueConfig {
capacity: 4,
max_age: Duration::from_secs(1),
policy: FreshQueuePolicy::LatestOnly,
});
let _ = queue.push(1_u8, Duration::from_millis(250));
let _ = queue.push(2_u8, Duration::from_millis(150));
let _ = queue.push(3_u8, Duration::from_millis(20));
let popped = queue.pop_fresh().await;
assert_eq!(popped.packet, Some(3));
assert_eq!(popped.dropped_stale, 2);
assert_eq!(popped.queue_depth, 0);
assert!(popped.delivery_age < Duration::from_millis(100));
}
} }

View File

@ -1,6 +1,6 @@
[package] [package]
name = "lesavka_common" name = "lesavka_common"
version = "0.16.22" version = "0.16.23"
edition = "2024" edition = "2024"
build = "build.rs" build = "build.rs"

View File

@ -10,7 +10,7 @@ bench = false
[package] [package]
name = "lesavka_server" name = "lesavka_server"
version = "0.16.22" version = "0.16.23"
edition = "2024" edition = "2024"
autobins = false autobins = false

View File

@ -55,6 +55,14 @@ fn parse_queue_max_age_ms(block: &str, queue_const: &str) -> u64 {
panic!("missing max_age for {queue_const}"); panic!("missing max_age for {queue_const}");
} }
fn assert_queue_policy(block: &str, queue_const: &str, policy: &str) {
let expected = format!("policy: crate::uplink_fresh_queue::FreshQueuePolicy::{policy}");
assert!(
block.contains(&expected),
"{queue_const} should use {policy} policy to preserve the intended live-media behavior"
);
}
#[test] #[test]
fn camera_uplink_queue_freshness_budget_stays_within_lipsync_window() { fn camera_uplink_queue_freshness_budget_stays_within_lipsync_window() {
let block = queue_block(UPLINK_MEDIA_SRC, "VIDEO_UPLINK_QUEUE"); let block = queue_block(UPLINK_MEDIA_SRC, "VIDEO_UPLINK_QUEUE");
@ -63,6 +71,7 @@ fn camera_uplink_queue_freshness_budget_stays_within_lipsync_window() {
max_age_ms <= 350, max_age_ms <= 350,
"VIDEO_UPLINK_QUEUE max_age is {max_age_ms}ms; keep it <= 350ms to prevent ~1s video drift" "VIDEO_UPLINK_QUEUE max_age is {max_age_ms}ms; keep it <= 350ms to prevent ~1s video drift"
); );
assert_queue_policy(block, "VIDEO_UPLINK_QUEUE", "LatestOnly");
} }
#[test] #[test]
@ -73,6 +82,7 @@ fn microphone_uplink_queue_freshness_budget_stays_within_live_audio_window() {
max_age_ms <= 400, max_age_ms <= 400,
"AUDIO_UPLINK_QUEUE max_age is {max_age_ms}ms; keep it <= 400ms for live calls" "AUDIO_UPLINK_QUEUE max_age is {max_age_ms}ms; keep it <= 400ms for live calls"
); );
assert_queue_policy(block, "AUDIO_UPLINK_QUEUE", "DrainOldest");
} }
#[test] #[test]
@ -93,4 +103,5 @@ fn sync_probe_video_queue_uses_same_freshness_budget() {
max_age_ms <= 350, max_age_ms <= 350,
"PROBE_VIDEO_QUEUE max_age is {max_age_ms}ms; keep probe and runtime freshness policies aligned" "PROBE_VIDEO_QUEUE max_age is {max_age_ms}ms; keep probe and runtime freshness policies aligned"
); );
assert_queue_policy(block, "PROBE_VIDEO_QUEUE", "LatestOnly");
} }