#![cfg_attr(not(test), allow(dead_code))]
//! Synthetic upstream queue harness for live-call latency experiments.
//!
//! This models the relay child's bounded async queue under temporary downstream stalls.
//! The goal is not to emulate GStreamer perfectly; it is to answer the operational
//! question we care about: does the current queue policy preserve stale media instead
//! of keeping the stream fresh?

use std::{collections::VecDeque, time::Duration};

/// Queue policy to simulate under upstream backpressure.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum UplinkQueuePolicy {
    /// Preserve every queued packet, blocking new admission until space returns.
    PreserveBacklog,
    /// Drop the oldest queued packet when full so the newest packet stays live.
    DropOldestWhenFull,
}

/// Deterministic synthetic queue configuration.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct UplinkHarnessConfig {
    /// Packet cadence at the capture side.
    pub capture_interval: Duration,
    /// Packet cadence at the consumer side.
    pub consume_interval: Duration,
    /// Number of packets admitted to the async queue before backpressure kicks in.
    pub queue_capacity: usize,
    /// Optional maximum packet age before delivery. Older queued packets are
    /// discarded to model freshness-first live media delivery.
    pub freshness_max_age: Option<Duration>,
    /// Total packets the synthetic capture source will attempt to produce.
    pub total_packets: usize,
    /// Optional one-shot downstream stall start time.
    pub stall_after: Option<Duration>,
    /// One-shot downstream stall duration.
    pub stall_duration: Duration,
}

/// Summary from one synthetic run.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct UplinkHarnessResult {
    /// Packets accepted into the async queue.
    pub enqueued_packets: usize,
    /// Packets consumed from the async queue.
    pub delivered_packets: usize,
    /// Packets discarded without delivery: evicted at admission time under
    /// [`UplinkQueuePolicy::DropOldestWhenFull`], or expired at consume time
    /// because they exceeded `freshness_max_age`.
    pub dropped_packets: usize,
    /// Highest queue depth observed.
    pub max_queue_depth: usize,
    /// Oldest packet age at queue admission time (only non-zero for packets
    /// that had to wait for space before being admitted).
    pub max_enqueue_age: Duration,
    /// Oldest packet age at playout time.
    pub max_delivery_age: Duration,
    /// Longest time a producer packet spent blocked before queue admission.
    pub max_block_time: Duration,
}

/// A captured packet that is either queued or waiting for queue space.
#[derive(Clone, Copy, Debug)]
struct PendingPacket {
    /// Simulation time at which the packet was captured.
    captured_at: Duration,
    /// Simulation time at which the packet started waiting for queue space, if it did.
    blocked_at: Option<Duration>,
}

/// Runs the synthetic queue harness under the selected policy.
///
/// The simulation is a discrete-event loop with two event sources: the capture
/// side, which produces one packet every `capture_interval`, and the consumer,
/// which removes one packet every `consume_interval` except during the optional
/// one-shot stall window. When both events fall on the same instant the consume
/// event is processed first.
///
/// An idle consumer (empty queue) picks up the next packet as soon as both its
/// own cadence allows it and the packet actually exists; it never consumes a
/// packet before that packet's capture time.
///
/// # Panics
///
/// Panics if `queue_capacity` is zero or if either interval is zero.
#[must_use]
pub fn run_uplink_harness(
    config: UplinkHarnessConfig,
    policy: UplinkQueuePolicy,
) -> UplinkHarnessResult {
    assert!(config.queue_capacity > 0, "queue capacity must be non-zero");
    assert!(
        !config.capture_interval.is_zero(),
        "capture interval must be non-zero"
    );
    assert!(
        !config.consume_interval.is_zero(),
        "consume interval must be non-zero"
    );

    let mut result = UplinkHarnessResult::default();
    let mut queue: VecDeque<PendingPacket> = VecDeque::with_capacity(config.queue_capacity);
    let mut produced = 0usize;
    let mut next_capture_at = Duration::ZERO;
    let mut next_consume_at = config.consume_interval;
    let mut blocked_packet = None::<PendingPacket>;

    loop {
        // The capture side only ticks while it still has packets to produce and
        // is not stuck waiting for queue space.
        let capture_ready = (produced < config.total_packets && blocked_packet.is_none())
            .then_some(next_capture_at);

        // The consumer only ticks while there is something to consume. Its
        // cadence clock is not advanced while it idles on an empty queue, so
        // clamp the event time to the capture time of the oldest waiting packet;
        // otherwise a consumer faster than the producer would be scheduled in
        // the past and packet ages would underflow.
        let consume_ready = queue
            .front()
            .or(blocked_packet.as_ref())
            .map(|oldest| next_allowed_consume_at(next_consume_at.max(oldest.captured_at), config));

        let Some(now) = min_option_duration(capture_ready, consume_ready) else {
            break;
        };

        if consume_ready == Some(now) {
            // Freshness-first delivery: expire everything that is already too old.
            if let Some(max_age) = config.freshness_max_age {
                while let Some(packet) = queue.front() {
                    if now.saturating_sub(packet.captured_at) <= max_age {
                        break;
                    }
                    let _ = queue.pop_front();
                    result.dropped_packets += 1;
                }
            }

            if let Some(packet) = queue.pop_front() {
                result.delivered_packets += 1;
                result.max_delivery_age = result
                    .max_delivery_age
                    .max(now.saturating_sub(packet.captured_at));
            }
            next_consume_at = now + config.consume_interval;

            // Space just opened up: admit the packet the producer was blocked on
            // and let the capture cadence resume from this instant.
            if let Some(packet) = blocked_packet.take() {
                let enqueue_age = now.saturating_sub(packet.captured_at);
                result.max_enqueue_age = result.max_enqueue_age.max(enqueue_age);
                if let Some(blocked_at) = packet.blocked_at {
                    result.max_block_time =
                        result.max_block_time.max(now.saturating_sub(blocked_at));
                }
                admit_packet(
                    &mut queue,
                    &mut result,
                    PendingPacket {
                        blocked_at: None,
                        ..packet
                    },
                );
                next_capture_at = now + config.capture_interval;
            }
            continue;
        }

        if capture_ready == Some(now) {
            let packet = PendingPacket {
                captured_at: now,
                blocked_at: None,
            };
            produced += 1;

            if queue.len() < config.queue_capacity {
                admit_packet(&mut queue, &mut result, packet);
                next_capture_at = now + config.capture_interval;
            } else {
                match policy {
                    UplinkQueuePolicy::PreserveBacklog => {
                        // The producer stalls; capture resumes once this packet is admitted.
                        blocked_packet = Some(PendingPacket {
                            blocked_at: Some(now),
                            ..packet
                        });
                    }
                    UplinkQueuePolicy::DropOldestWhenFull => {
                        let _ = queue.pop_front();
                        result.dropped_packets += 1;
                        admit_packet(&mut queue, &mut result, packet);
                        next_capture_at = now + config.capture_interval;
                    }
                }
            }
        }
    }

    result
}

/// Pushes `packet` onto the queue and updates the admission counters.
fn admit_packet(
    queue: &mut VecDeque<PendingPacket>,
    result: &mut UplinkHarnessResult,
    packet: PendingPacket,
) {
    queue.push_back(packet);
    result.enqueued_packets += 1;
    result.max_queue_depth = result.max_queue_depth.max(queue.len());
}

/// Returns the earliest time at or after `next_consume_at` at which the consumer
/// may run, deferring to the end of the stall window when the requested time
/// falls inside `[stall_after, stall_after + stall_duration)`.
fn next_allowed_consume_at(next_consume_at: Duration, config: UplinkHarnessConfig) -> Duration {
    let Some(stall_after) = config.stall_after else {
        return next_consume_at;
    };
    let stall_end = stall_after.saturating_add(config.stall_duration);
    if (stall_after..stall_end).contains(&next_consume_at) {
        stall_end
    } else {
        next_consume_at
    }
}

/// Returns the smaller of two optional durations, treating `None` as "no event".
fn min_option_duration(left: Option<Duration>, right: Option<Duration>) -> Option<Duration> {
    match (left, right) {
        (Some(left), Some(right)) => Some(left.min(right)),
        (left, right) => left.or(right),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn camera_stall_config() -> UplinkHarnessConfig {
        UplinkHarnessConfig {
            capture_interval: Duration::from_millis(33),
            consume_interval: Duration::from_millis(33),
            queue_capacity: 8,
            freshness_max_age: None,
            total_packets: 160,
            stall_after: Some(Duration::from_millis(800)),
            stall_duration: Duration::from_secs(2),
        }
    }

    fn microphone_stall_config() -> UplinkHarnessConfig {
        UplinkHarnessConfig {
            capture_interval: Duration::from_millis(20),
            consume_interval: Duration::from_millis(20),
            queue_capacity: 16,
            freshness_max_age: None,
            total_packets: 320,
            stall_after: Some(Duration::from_millis(600)),
            stall_duration: Duration::from_secs(2),
        }
    }

    #[test]
    fn preserve_backlog_camera_policy_accumulates_stale_video_after_a_stall() {
        let result = run_uplink_harness(camera_stall_config(), UplinkQueuePolicy::PreserveBacklog);

        assert_eq!(result.dropped_packets, 0);
        assert!(result.max_queue_depth >= 8);
        assert!(
            result.max_enqueue_age >= Duration::from_millis(1500),
            "expected stale video to build, got {:?}",
            result.max_enqueue_age
        );
        assert!(
            result.max_delivery_age >= Duration::from_millis(1700),
            "expected stale playout to build, got {:?}",
            result.max_delivery_age
        );
        assert!(
            result.max_block_time >= Duration::from_millis(1500),
            "expected capture-side blocking, got {:?}",
            result.max_block_time
        );
    }

    #[test]
    fn preserve_backlog_microphone_policy_accumulates_stale_audio_after_a_stall() {
        let result = run_uplink_harness(
            microphone_stall_config(),
            UplinkQueuePolicy::PreserveBacklog,
        );

        assert_eq!(result.dropped_packets, 0);
        assert!(result.max_queue_depth >= 16);
        assert!(
            result.max_enqueue_age >= Duration::from_millis(1500),
            "expected stale audio to build, got {:?}",
            result.max_enqueue_age
        );
        assert!(
            result.max_delivery_age >= Duration::from_millis(1800),
            "expected stale audio playout, got {:?}",
            result.max_delivery_age
        );
    }

    #[test]
    fn drop_oldest_policy_keeps_media_live_by_sacrificing_old_packets() {
        let camera =
            run_uplink_harness(camera_stall_config(), UplinkQueuePolicy::DropOldestWhenFull);
        let microphone = run_uplink_harness(
            microphone_stall_config(),
            UplinkQueuePolicy::DropOldestWhenFull,
        );

        assert!(camera.dropped_packets > 0);
        assert!(microphone.dropped_packets > 0);
        assert_eq!(camera.max_enqueue_age, Duration::ZERO);
        assert_eq!(microphone.max_enqueue_age, Duration::ZERO);
        assert!(
            camera.max_delivery_age <= Duration::from_millis(350),
            "freshness-first video should stay bounded, got {:?}",
            camera.max_delivery_age
        );
        assert!(
            microphone.max_delivery_age <= Duration::from_millis(400),
            "freshness-first audio should stay bounded, got {:?}",
            microphone.max_delivery_age
        );
    }

    #[test]
    fn consumer_faster_than_capture_never_consumes_before_capture() {
        let config = UplinkHarnessConfig {
            capture_interval: Duration::from_millis(100),
            consume_interval: Duration::from_millis(33),
            queue_capacity: 4,
            freshness_max_age: None,
            total_packets: 5,
            stall_after: None,
            stall_duration: Duration::ZERO,
        };

        let result = run_uplink_harness(config, UplinkQueuePolicy::PreserveBacklog);

        assert_eq!(result.enqueued_packets, 5);
        assert_eq!(result.delivered_packets, 5);
        assert_eq!(result.dropped_packets, 0);
        assert_eq!(result.max_queue_depth, 1);
        assert_eq!(result.max_delivery_age, Duration::from_millis(33));
    }
}