diff --git a/client/Cargo.toml b/client/Cargo.toml index d179765..73a6376 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.14.3" +version = "0.14.4" edition = "2024" [dependencies] diff --git a/common/Cargo.toml b/common/Cargo.toml index 5a8e6eb..74208e6 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.14.3" +version = "0.14.4" edition = "2024" build = "build.rs" diff --git a/server/Cargo.toml b/server/Cargo.toml index 5adff37..25c1172 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.14.3" +version = "0.14.4" edition = "2024" autobins = false diff --git a/server/src/main/relay_service.rs b/server/src/main/relay_service.rs index 70260fc..de39195 100644 --- a/server/src/main/relay_service.rs +++ b/server/src/main/relay_service.rs @@ -8,6 +8,20 @@ fn upstream_stale_drop_budget() -> Duration { Duration::from_millis(drop_ms) } +#[cfg(not(coverage))] +fn retain_freshest_video_packet( + pending: &mut std::collections::VecDeque, +) -> usize { + if pending.len() <= 1 { + return 0; + } + let newest = pending.pop_back().expect("non-empty pending video queue"); + let dropped = pending.len(); + pending.clear(); + pending.push_back(newest); + dropped +} + #[cfg(not(coverage))] #[tonic::async_trait] impl Relay for Handler { @@ -279,7 +293,18 @@ impl Relay for Handler { }; if let Some(next_packet) = next_packet { match next_packet.transpose()? { - Some(pkt) => pending.push_back(pkt), + Some(pkt) => { + pending.push_back(pkt); + let coalesced = retain_freshest_video_packet(&mut pending); + if coalesced > 0 { + tracing::debug!( + rpc_id, + session_id, + dropped = coalesced, + "🎥 coalesced stale upstream video backlog down to the freshest frame" + ); + } + } None => inbound_closed = true, } } @@ -404,3 +429,33 @@ fn remote_audio_status(message: String) -> Status { Status::internal(message) } } + +#[cfg(test)] +mod tests { + use super::retain_freshest_video_packet; + use lesavka_common::lesavka::VideoPacket; + + #[test] + fn retain_freshest_video_packet_keeps_only_the_latest_frame() { + let mut pending = std::collections::VecDeque::from(vec![ + VideoPacket { + pts: 100, + ..Default::default() + }, + VideoPacket { + pts: 200, + ..Default::default() + }, + VideoPacket { + pts: 300, + ..Default::default() + }, + ]); + + let dropped = retain_freshest_video_packet(&mut pending); + + assert_eq!(dropped, 2); + assert_eq!(pending.len(), 1); + assert_eq!(pending.front().map(|pkt| pkt.pts), Some(300)); + } +} diff --git a/server/src/main/relay_service_coverage.rs b/server/src/main/relay_service_coverage.rs index 49a127e..15a6fea 100644 --- a/server/src/main/relay_service_coverage.rs +++ b/server/src/main/relay_service_coverage.rs @@ -7,6 +7,20 @@ fn upstream_stale_drop_budget() -> Duration { Duration::from_millis(drop_ms) } +#[cfg(coverage)] +fn retain_freshest_video_packet( + pending: &mut std::collections::VecDeque, +) -> usize { + if pending.len() <= 1 { + return 0; + } + let newest = pending.pop_back().expect("non-empty pending video queue"); + let dropped = pending.len(); + pending.clear(); + pending.push_back(newest); + dropped +} + #[cfg(coverage)] #[tonic::async_trait] impl Relay for Handler { @@ -174,7 +188,10 @@ impl Relay for Handler { }; if let Some(next_packet) = next_packet { match next_packet.transpose()? { - Some(pkt) => pending.push_back(pkt), + Some(pkt) => { + pending.push_back(pkt); + let _ = retain_freshest_video_packet(&mut pending); + } None => inbound_closed = true, } }