#![cfg_attr(not(test), allow(dead_code))] //! Freshness-first bounded queue for live-call upstream media. //! //! The queue keeps the newest useful packets and drops stale or superseded ones //! instead of preserving backlog indefinitely. use std::{ collections::VecDeque, sync::{Arc, Mutex}, time::{Duration, Instant}, }; use tokio::sync::Notify; /// Queue admission/configuration for one live media stream. #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub struct FreshQueueConfig { /// Maximum number of packets retained while the transport catches up. pub capacity: usize, /// Maximum packet age tolerated before the packet is considered stale. pub max_age: Duration, } /// Statistics returned after pushing one packet into the bounded queue. #[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] pub struct QueuePushStats { /// Queue depth after the packet was admitted. pub queue_depth: usize, /// Number of older packets dropped because the queue was already full. pub dropped_queue_full: u64, } /// Result of taking the next fresh packet from the queue. #[derive(Clone, Debug)] pub struct QueuePop { /// Fresh packet ready to send, if any. pub packet: Option, /// Queue depth after removing stale packets and the returned packet. pub queue_depth: usize, /// Number of stale packets discarded before a fresh packet was found. pub dropped_stale: u64, /// Fresh packet age at the moment it left the queue. pub delivery_age: Duration, /// Whether the queue was closed and drained. pub closed: bool, } #[derive(Debug)] struct QueuedPacket { packet: T, queued_at: Instant, age_at_enqueue: Duration, } #[derive(Debug)] struct QueueState { queue: VecDeque>, closed: bool, } /// Shared bounded queue for the blocking capture thread and async gRPC stream. #[derive(Debug)] pub struct FreshPacketQueue { config: FreshQueueConfig, inner: Arc>>, notify: Arc, } impl Clone for FreshPacketQueue { fn clone(&self) -> Self { Self { config: self.config, inner: Arc::clone(&self.inner), notify: Arc::clone(&self.notify), } } } impl FreshPacketQueue { /// Creates a new bounded queue that favors fresh packets over backlog. #[must_use] pub fn new(config: FreshQueueConfig) -> Self { assert!(config.capacity > 0, "fresh queue capacity must be non-zero"); assert!( !config.max_age.is_zero(), "fresh queue max age must be non-zero" ); Self { config, inner: Arc::new(Mutex::new(QueueState { queue: VecDeque::with_capacity(config.capacity), closed: false, })), notify: Arc::new(Notify::new()), } } /// Pushes a new packet into the queue, dropping the oldest retained packet if full. #[must_use] pub fn push(&self, packet: T, age_at_enqueue: Duration) -> QueuePushStats { let mut state = self.inner.lock().expect("fresh queue mutex poisoned"); if state.closed { return QueuePushStats::default(); } let mut dropped_queue_full = 0_u64; if state.queue.len() == self.config.capacity { let _ = state.queue.pop_front(); dropped_queue_full = 1; } state.queue.push_back(QueuedPacket { packet, queued_at: Instant::now(), age_at_enqueue, }); let queue_depth = state.queue.len(); drop(state); self.notify.notify_one(); QueuePushStats { queue_depth, dropped_queue_full, } } /// Closes the queue and wakes any waiting consumer. pub fn close(&self) { if let Ok(mut state) = self.inner.lock() { state.closed = true; } self.notify.notify_waiters(); } /// Pops the next fresh packet, discarding any packets that have aged past the live budget. pub async fn pop_fresh(&self) -> QueuePop { let mut dropped_stale = 0_u64; loop { let wait_for_more = { let mut state = self.inner.lock().expect("fresh queue mutex poisoned"); while let Some(front) = state.queue.pop_front() { let delivery_age = front.age_at_enqueue + front.queued_at.elapsed(); if delivery_age > self.config.max_age { dropped_stale = dropped_stale.saturating_add(1); continue; } return QueuePop { packet: Some(front.packet), queue_depth: state.queue.len(), dropped_stale, delivery_age, closed: false, }; } if state.closed { return QueuePop { packet: None, queue_depth: 0, dropped_stale, delivery_age: Duration::ZERO, closed: true, }; } self.notify.notified() }; wait_for_more.await; } } } #[cfg(test)] mod tests { use super::*; #[tokio::test] async fn push_drops_oldest_when_queue_is_full() { let queue = FreshPacketQueue::new(FreshQueueConfig { capacity: 2, max_age: Duration::from_secs(1), }); let first = queue.push(1_u8, Duration::ZERO); let second = queue.push(2_u8, Duration::ZERO); let third = queue.push(3_u8, Duration::ZERO); assert_eq!(first.dropped_queue_full, 0); assert_eq!(second.queue_depth, 2); assert_eq!(third.dropped_queue_full, 1); let popped = queue.pop_fresh().await; assert_eq!(popped.packet, Some(2)); let popped = queue.pop_fresh().await; assert_eq!(popped.packet, Some(3)); } #[tokio::test] async fn pop_fresh_discards_stale_packets_before_returning_live_media() { let queue = FreshPacketQueue::new(FreshQueueConfig { capacity: 3, max_age: Duration::from_millis(60), }); let _ = queue.push(1_u8, Duration::from_millis(40)); let _ = queue.push(2_u8, Duration::ZERO); tokio::time::sleep(Duration::from_millis(30)).await; let popped = queue.pop_fresh().await; assert_eq!(popped.packet, Some(2)); assert_eq!(popped.dropped_stale, 1); assert!(popped.delivery_age <= Duration::from_millis(40)); } #[tokio::test] async fn pop_fresh_returns_closed_when_queue_is_drained() { let queue = FreshPacketQueue::::new(FreshQueueConfig { capacity: 1, max_age: Duration::from_secs(1), }); queue.close(); let popped = queue.pop_fresh().await; assert!(popped.closed); assert!(popped.packet.is_none()); } #[tokio::test] async fn clone_shares_the_same_underlying_queue() { let queue = FreshPacketQueue::new(FreshQueueConfig { capacity: 2, max_age: Duration::from_secs(1), }); let cloned = queue.clone(); let _ = cloned.push(7_u8, Duration::ZERO); let popped = queue.pop_fresh().await; assert_eq!(popped.packet, Some(7)); assert_eq!(popped.queue_depth, 0); } #[tokio::test] async fn push_returns_default_after_close() { let queue = FreshPacketQueue::new(FreshQueueConfig { capacity: 2, max_age: Duration::from_secs(1), }); queue.close(); let stats = queue.push(9_u8, Duration::ZERO); let popped = queue.pop_fresh().await; assert_eq!(stats, QueuePushStats::default()); assert!(popped.closed); assert!(popped.packet.is_none()); } #[tokio::test] async fn pop_fresh_waits_for_a_future_packet() { let queue = FreshPacketQueue::new(FreshQueueConfig { capacity: 2, max_age: Duration::from_secs(1), }); let waiter = queue.clone(); let task = tokio::spawn(async move { waiter.pop_fresh().await }); tokio::time::sleep(Duration::from_millis(20)).await; let _ = queue.push(5_u8, Duration::from_millis(10)); let popped = task.await.expect("waiter task"); assert_eq!(popped.packet, Some(5)); assert!(!popped.closed); assert!(popped.delivery_age >= Duration::from_millis(10)); } #[tokio::test] async fn pop_fresh_waiter_wakes_when_the_queue_closes() { let queue = FreshPacketQueue::::new(FreshQueueConfig { capacity: 2, max_age: Duration::from_secs(1), }); let waiter = queue.clone(); let task = tokio::spawn(async move { waiter.pop_fresh().await }); tokio::time::sleep(Duration::from_millis(20)).await; queue.close(); let popped = task.await.expect("waiter task"); assert!(popped.closed); assert!(popped.packet.is_none()); } }