2026-04-24 00:30:07 -03:00
#![ cfg_attr(not(test), allow(dead_code)) ]
//! Freshness-first bounded queue for live-call upstream media.
//!
//! The queue keeps the newest useful packets and drops stale or superseded ones
//! instead of preserving backlog indefinitely.
use std ::{
collections ::VecDeque ,
sync ::{ Arc , Mutex } ,
time ::{ Duration , Instant } ,
} ;
use tokio ::sync ::Notify ;
/// Queue admission/configuration for one live media stream.
#[ derive(Clone, Copy, Debug, Eq, PartialEq) ]
pub struct FreshQueueConfig {
/// Maximum number of packets retained while the transport catches up.
pub capacity : usize ,
/// Maximum packet age tolerated before the packet is considered stale.
pub max_age : Duration ,
2026-05-01 15:21:26 -03:00
/// Whether the consumer should drain old-but-fresh packets or skip straight
/// to the newest packet when backlog appears.
pub policy : FreshQueuePolicy ,
}
/// Queue delivery policy for live media.
#[ derive(Clone, Copy, Debug, Eq, PartialEq) ]
pub enum FreshQueuePolicy {
/// Preserve packet continuity while still enforcing max age.
DrainOldest ,
/// Drop superseded packets and deliver the newest fresh packet.
LatestOnly ,
2026-04-24 00:30:07 -03:00
}
/// Statistics returned after pushing one packet into the bounded queue.
#[ derive(Clone, Copy, Debug, Default, Eq, PartialEq) ]
pub struct QueuePushStats {
/// Queue depth after the packet was admitted.
pub queue_depth : usize ,
/// Number of older packets dropped because the queue was already full.
pub dropped_queue_full : u64 ,
}
/// Result of taking the next fresh packet from the queue.
#[ derive(Clone, Debug) ]
pub struct QueuePop < T > {
/// Fresh packet ready to send, if any.
pub packet : Option < T > ,
2026-05-01 15:21:26 -03:00
/// Queue depth after removing stale/superseded packets and the returned packet.
2026-04-24 00:30:07 -03:00
pub queue_depth : usize ,
2026-05-01 15:21:26 -03:00
/// Number of stale or superseded packets discarded before a fresh packet was found.
2026-04-24 00:30:07 -03:00
pub dropped_stale : u64 ,
/// Fresh packet age at the moment it left the queue.
pub delivery_age : Duration ,
/// Whether the queue was closed and drained.
pub closed : bool ,
}
#[ derive(Debug) ]
struct QueuedPacket < T > {
packet : T ,
queued_at : Instant ,
age_at_enqueue : Duration ,
}
#[ derive(Debug) ]
struct QueueState < T > {
queue : VecDeque < QueuedPacket < T > > ,
closed : bool ,
}
/// Shared bounded queue for the blocking capture thread and async gRPC stream.
#[ derive(Debug) ]
pub struct FreshPacketQueue < T > {
config : FreshQueueConfig ,
inner : Arc < Mutex < QueueState < T > > > ,
notify : Arc < Notify > ,
}
impl < T > Clone for FreshPacketQueue < T > {
fn clone ( & self ) -> Self {
Self {
config : self . config ,
inner : Arc ::clone ( & self . inner ) ,
notify : Arc ::clone ( & self . notify ) ,
}
}
}
impl < T > FreshPacketQueue < T > {
/// Creates a new bounded queue that favors fresh packets over backlog.
#[ must_use ]
pub fn new ( config : FreshQueueConfig ) -> Self {
assert! ( config . capacity > 0 , " fresh queue capacity must be non-zero " ) ;
assert! (
! config . max_age . is_zero ( ) ,
" fresh queue max age must be non-zero "
) ;
Self {
config ,
inner : Arc ::new ( Mutex ::new ( QueueState {
queue : VecDeque ::with_capacity ( config . capacity ) ,
closed : false ,
} ) ) ,
notify : Arc ::new ( Notify ::new ( ) ) ,
}
}
/// Pushes a new packet into the queue, dropping the oldest retained packet if full.
#[ must_use ]
pub fn push ( & self , packet : T , age_at_enqueue : Duration ) -> QueuePushStats {
let mut state = self . inner . lock ( ) . expect ( " fresh queue mutex poisoned " ) ;
if state . closed {
return QueuePushStats ::default ( ) ;
}
let mut dropped_queue_full = 0_ u64 ;
if state . queue . len ( ) = = self . config . capacity {
let _ = state . queue . pop_front ( ) ;
dropped_queue_full = 1 ;
}
state . queue . push_back ( QueuedPacket {
packet ,
queued_at : Instant ::now ( ) ,
age_at_enqueue ,
} ) ;
let queue_depth = state . queue . len ( ) ;
drop ( state ) ;
self . notify . notify_one ( ) ;
QueuePushStats {
queue_depth ,
dropped_queue_full ,
}
}
/// Closes the queue and wakes any waiting consumer.
pub fn close ( & self ) {
if let Ok ( mut state ) = self . inner . lock ( ) {
state . closed = true ;
}
self . notify . notify_waiters ( ) ;
}
/// Pops the next fresh packet, discarding any packets that have aged past the live budget.
pub async fn pop_fresh ( & self ) -> QueuePop < T > {
let mut dropped_stale = 0_ u64 ;
loop {
let wait_for_more = {
let mut state = self . inner . lock ( ) . expect ( " fresh queue mutex poisoned " ) ;
2026-05-01 15:21:26 -03:00
if self . config . policy = = FreshQueuePolicy ::LatestOnly {
while state . queue . len ( ) > 1 {
let _ = state . queue . pop_front ( ) ;
dropped_stale = dropped_stale . saturating_add ( 1 ) ;
}
}
2026-04-24 00:30:07 -03:00
while let Some ( front ) = state . queue . pop_front ( ) {
let delivery_age = front . age_at_enqueue + front . queued_at . elapsed ( ) ;
if delivery_age > self . config . max_age {
dropped_stale = dropped_stale . saturating_add ( 1 ) ;
continue ;
}
return QueuePop {
packet : Some ( front . packet ) ,
queue_depth : state . queue . len ( ) ,
dropped_stale ,
delivery_age ,
closed : false ,
} ;
}
if state . closed {
return QueuePop {
packet : None ,
queue_depth : 0 ,
dropped_stale ,
delivery_age : Duration ::ZERO ,
closed : true ,
} ;
}
self . notify . notified ( )
} ;
wait_for_more . await ;
}
}
}
#[ cfg(test) ]
mod tests {
use super ::* ;
#[ tokio::test ]
2026-05-06 05:50:59 -03:00
/// Keeps `push_drops_oldest_when_queue_is_full` explicit because it sits on this module contract, where hidden behavior would make regressions difficult to diagnose.
/// Inputs are the typed parameters; output is the return value or side effect.
2026-04-24 00:30:07 -03:00
async fn push_drops_oldest_when_queue_is_full ( ) {
let queue = FreshPacketQueue ::new ( FreshQueueConfig {
capacity : 2 ,
max_age : Duration ::from_secs ( 1 ) ,
2026-05-01 15:21:26 -03:00
policy : FreshQueuePolicy ::DrainOldest ,
2026-04-24 00:30:07 -03:00
} ) ;
let first = queue . push ( 1_ u8 , Duration ::ZERO ) ;
let second = queue . push ( 2_ u8 , Duration ::ZERO ) ;
let third = queue . push ( 3_ u8 , Duration ::ZERO ) ;
assert_eq! ( first . dropped_queue_full , 0 ) ;
assert_eq! ( second . queue_depth , 2 ) ;
assert_eq! ( third . dropped_queue_full , 1 ) ;
let popped = queue . pop_fresh ( ) . await ;
assert_eq! ( popped . packet , Some ( 2 ) ) ;
let popped = queue . pop_fresh ( ) . await ;
assert_eq! ( popped . packet , Some ( 3 ) ) ;
}
#[ tokio::test ]
2026-05-06 05:50:59 -03:00
/// Keeps `pop_fresh_discards_stale_packets_before_returning_live_media` explicit because it sits on this module contract, where hidden behavior would make regressions difficult to diagnose.
/// Inputs are the typed parameters; output is the return value or side effect.
2026-04-24 00:30:07 -03:00
async fn pop_fresh_discards_stale_packets_before_returning_live_media ( ) {
let queue = FreshPacketQueue ::new ( FreshQueueConfig {
capacity : 3 ,
max_age : Duration ::from_millis ( 60 ) ,
2026-05-01 15:21:26 -03:00
policy : FreshQueuePolicy ::DrainOldest ,
2026-04-24 00:30:07 -03:00
} ) ;
let _ = queue . push ( 1_ u8 , Duration ::from_millis ( 40 ) ) ;
let _ = queue . push ( 2_ u8 , Duration ::ZERO ) ;
tokio ::time ::sleep ( Duration ::from_millis ( 30 ) ) . await ;
let popped = queue . pop_fresh ( ) . await ;
assert_eq! ( popped . packet , Some ( 2 ) ) ;
assert_eq! ( popped . dropped_stale , 1 ) ;
assert! ( popped . delivery_age < = Duration ::from_millis ( 40 ) ) ;
}
#[ tokio::test ]
async fn pop_fresh_returns_closed_when_queue_is_drained ( ) {
let queue = FreshPacketQueue ::< u8 > ::new ( FreshQueueConfig {
capacity : 1 ,
max_age : Duration ::from_secs ( 1 ) ,
2026-05-01 15:21:26 -03:00
policy : FreshQueuePolicy ::DrainOldest ,
2026-04-24 00:30:07 -03:00
} ) ;
queue . close ( ) ;
let popped = queue . pop_fresh ( ) . await ;
assert! ( popped . closed ) ;
assert! ( popped . packet . is_none ( ) ) ;
}
#[ tokio::test ]
2026-05-06 05:50:59 -03:00
/// Keeps `clone_shares_the_same_underlying_queue` explicit because it sits on this module contract, where hidden behavior would make regressions difficult to diagnose.
/// Inputs are the typed parameters; output is the return value or side effect.
2026-04-24 00:30:07 -03:00
async fn clone_shares_the_same_underlying_queue ( ) {
let queue = FreshPacketQueue ::new ( FreshQueueConfig {
capacity : 2 ,
max_age : Duration ::from_secs ( 1 ) ,
2026-05-01 15:21:26 -03:00
policy : FreshQueuePolicy ::DrainOldest ,
2026-04-24 00:30:07 -03:00
} ) ;
let cloned = queue . clone ( ) ;
let _ = cloned . push ( 7_ u8 , Duration ::ZERO ) ;
let popped = queue . pop_fresh ( ) . await ;
assert_eq! ( popped . packet , Some ( 7 ) ) ;
assert_eq! ( popped . queue_depth , 0 ) ;
}
#[ tokio::test ]
2026-05-06 05:50:59 -03:00
/// Keeps `push_returns_default_after_close` explicit because it sits on this module contract, where hidden behavior would make regressions difficult to diagnose.
/// Inputs are the typed parameters; output is the return value or side effect.
2026-04-24 00:30:07 -03:00
async fn push_returns_default_after_close ( ) {
let queue = FreshPacketQueue ::new ( FreshQueueConfig {
capacity : 2 ,
max_age : Duration ::from_secs ( 1 ) ,
2026-05-01 15:21:26 -03:00
policy : FreshQueuePolicy ::DrainOldest ,
2026-04-24 00:30:07 -03:00
} ) ;
queue . close ( ) ;
let stats = queue . push ( 9_ u8 , Duration ::ZERO ) ;
let popped = queue . pop_fresh ( ) . await ;
assert_eq! ( stats , QueuePushStats ::default ( ) ) ;
assert! ( popped . closed ) ;
assert! ( popped . packet . is_none ( ) ) ;
}
#[ tokio::test ]
2026-05-06 05:50:59 -03:00
/// Keeps `pop_fresh_waits_for_a_future_packet` explicit because it sits on this module contract, where hidden behavior would make regressions difficult to diagnose.
/// Inputs are the typed parameters; output is the return value or side effect.
2026-04-24 00:30:07 -03:00
async fn pop_fresh_waits_for_a_future_packet ( ) {
let queue = FreshPacketQueue ::new ( FreshQueueConfig {
capacity : 2 ,
max_age : Duration ::from_secs ( 1 ) ,
2026-05-01 15:21:26 -03:00
policy : FreshQueuePolicy ::DrainOldest ,
2026-04-24 00:30:07 -03:00
} ) ;
let waiter = queue . clone ( ) ;
let task = tokio ::spawn ( async move { waiter . pop_fresh ( ) . await } ) ;
tokio ::time ::sleep ( Duration ::from_millis ( 20 ) ) . await ;
let _ = queue . push ( 5_ u8 , Duration ::from_millis ( 10 ) ) ;
let popped = task . await . expect ( " waiter task " ) ;
assert_eq! ( popped . packet , Some ( 5 ) ) ;
assert! ( ! popped . closed ) ;
assert! ( popped . delivery_age > = Duration ::from_millis ( 10 ) ) ;
}
#[ tokio::test ]
2026-05-06 05:50:59 -03:00
/// Keeps `pop_fresh_waiter_wakes_when_the_queue_closes` explicit because it sits on this module contract, where hidden behavior would make regressions difficult to diagnose.
/// Inputs are the typed parameters; output is the return value or side effect.
2026-04-24 00:30:07 -03:00
async fn pop_fresh_waiter_wakes_when_the_queue_closes ( ) {
let queue = FreshPacketQueue ::< u8 > ::new ( FreshQueueConfig {
capacity : 2 ,
max_age : Duration ::from_secs ( 1 ) ,
2026-05-01 15:21:26 -03:00
policy : FreshQueuePolicy ::DrainOldest ,
2026-04-24 00:30:07 -03:00
} ) ;
let waiter = queue . clone ( ) ;
let task = tokio ::spawn ( async move { waiter . pop_fresh ( ) . await } ) ;
tokio ::time ::sleep ( Duration ::from_millis ( 20 ) ) . await ;
queue . close ( ) ;
let popped = task . await . expect ( " waiter task " ) ;
assert! ( popped . closed ) ;
assert! ( popped . packet . is_none ( ) ) ;
}
2026-05-01 15:21:26 -03:00
#[ tokio::test ]
2026-05-06 05:50:59 -03:00
/// Keeps `latest_only_policy_returns_newest_packet_and_drops_superseded_backlog` explicit because it sits on this module contract, where hidden behavior would make regressions difficult to diagnose.
/// Inputs are the typed parameters; output is the return value or side effect.
2026-05-01 15:21:26 -03:00
async fn latest_only_policy_returns_newest_packet_and_drops_superseded_backlog ( ) {
let queue = FreshPacketQueue ::new ( FreshQueueConfig {
capacity : 4 ,
max_age : Duration ::from_secs ( 1 ) ,
policy : FreshQueuePolicy ::LatestOnly ,
} ) ;
let _ = queue . push ( 1_ u8 , Duration ::from_millis ( 250 ) ) ;
let _ = queue . push ( 2_ u8 , Duration ::from_millis ( 150 ) ) ;
let _ = queue . push ( 3_ u8 , Duration ::from_millis ( 20 ) ) ;
let popped = queue . pop_fresh ( ) . await ;
assert_eq! ( popped . packet , Some ( 3 ) ) ;
assert_eq! ( popped . dropped_stale , 2 ) ;
assert_eq! ( popped . queue_depth , 0 ) ;
assert! ( popped . delivery_age < Duration ::from_millis ( 100 ) ) ;
}
2026-04-24 00:30:07 -03:00
}