fix(sync): coalesce stale upstream video backlog

This commit is contained in:
Brad Stein 2026-04-26 14:10:35 -03:00
parent 38ead8c1e9
commit 65e6f84f8d
5 changed files with 77 additions and 5 deletions

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package]
name = "lesavka_client"
version = "0.14.3"
version = "0.14.4"
edition = "2024"
[dependencies]

View File

@ -1,6 +1,6 @@
[package]
name = "lesavka_common"
version = "0.14.3"
version = "0.14.4"
edition = "2024"
build = "build.rs"

View File

@ -10,7 +10,7 @@ bench = false
[package]
name = "lesavka_server"
version = "0.14.3"
version = "0.14.4"
edition = "2024"
autobins = false

View File

@ -8,6 +8,20 @@ fn upstream_stale_drop_budget() -> Duration {
Duration::from_millis(drop_ms)
}
#[cfg(not(coverage))]
fn retain_freshest_video_packet(
pending: &mut std::collections::VecDeque<VideoPacket>,
) -> usize {
if pending.len() <= 1 {
return 0;
}
let newest = pending.pop_back().expect("non-empty pending video queue");
let dropped = pending.len();
pending.clear();
pending.push_back(newest);
dropped
}
#[cfg(not(coverage))]
#[tonic::async_trait]
impl Relay for Handler {
@ -279,7 +293,18 @@ impl Relay for Handler {
};
if let Some(next_packet) = next_packet {
match next_packet.transpose()? {
Some(pkt) => pending.push_back(pkt),
Some(pkt) => {
pending.push_back(pkt);
let coalesced = retain_freshest_video_packet(&mut pending);
if coalesced > 0 {
tracing::debug!(
rpc_id,
session_id,
dropped = coalesced,
"🎥 coalesced stale upstream video backlog down to the freshest frame"
);
}
}
None => inbound_closed = true,
}
}
@ -404,3 +429,33 @@ fn remote_audio_status(message: String) -> Status {
Status::internal(message)
}
}
#[cfg(test)]
mod tests {
use super::retain_freshest_video_packet;
use lesavka_common::lesavka::VideoPacket;
#[test]
fn retain_freshest_video_packet_keeps_only_the_latest_frame() {
let mut pending = std::collections::VecDeque::from(vec![
VideoPacket {
pts: 100,
..Default::default()
},
VideoPacket {
pts: 200,
..Default::default()
},
VideoPacket {
pts: 300,
..Default::default()
},
]);
let dropped = retain_freshest_video_packet(&mut pending);
assert_eq!(dropped, 2);
assert_eq!(pending.len(), 1);
assert_eq!(pending.front().map(|pkt| pkt.pts), Some(300));
}
}

View File

@ -7,6 +7,20 @@ fn upstream_stale_drop_budget() -> Duration {
Duration::from_millis(drop_ms)
}
#[cfg(coverage)]
fn retain_freshest_video_packet(
pending: &mut std::collections::VecDeque<VideoPacket>,
) -> usize {
if pending.len() <= 1 {
return 0;
}
let newest = pending.pop_back().expect("non-empty pending video queue");
let dropped = pending.len();
pending.clear();
pending.push_back(newest);
dropped
}
#[cfg(coverage)]
#[tonic::async_trait]
impl Relay for Handler {
@ -174,7 +188,10 @@ impl Relay for Handler {
};
if let Some(next_packet) = next_packet {
match next_packet.transpose()? {
Some(pkt) => pending.push_back(pkt),
Some(pkt) => {
pending.push_back(pkt);
let _ = retain_freshest_video_packet(&mut pending);
}
None => inbound_closed = true,
}
}