// Tests for the upstream media runtime: shared audio/video session lifecycle,
// pairing of the first packets from each kind, the shared playout clock
// (including re-anchoring after catastrophic lateness), env-var-driven tuning
// knobs, and the microphone-sink ownership gate.
//
// NOTE(review): several tests rely on real `std::thread::sleep` /
// `tokio::time::sleep` calls and wall-clock comparisons, so they are
// timing-sensitive by design.

use super::{PlannedUpstreamPacket, UpstreamMediaKind, UpstreamMediaRuntime};
use std::sync::Arc;
use std::time::Duration;

// Unwraps a `Play` decision into its planned packet; panics on any other
// variant so test failures point at the unexpected decision.
fn play(decision: super::UpstreamPlanDecision) -> PlannedUpstreamPacket {
    match decision {
        super::UpstreamPlanDecision::Play(plan) => plan,
        other => panic!("expected playable packet, got {other:?}"),
    }
}

// Activating camera and microphone for the first time shares a single session
// (id 1) and leaves both generations active.
#[test]
fn first_stream_starts_a_new_shared_session() {
    let runtime = UpstreamMediaRuntime::new();
    let camera = runtime.activate_camera();
    let microphone = runtime.activate_microphone();
    assert_eq!(camera.session_id, 1);
    assert_eq!(microphone.session_id, 1);
    assert!(runtime.is_camera_active(camera.generation));
    assert!(runtime.is_microphone_active(microphone.generation));
}

// Re-activating the same kind keeps the session id but deactivates the older
// generation in favor of the new owner.
#[test]
fn replacing_one_kind_keeps_the_session_but_preempts_the_old_owner() {
    let runtime = UpstreamMediaRuntime::new();
    let first = runtime.activate_microphone();
    let second = runtime.activate_microphone();
    assert_eq!(first.session_id, second.session_id);
    assert!(!runtime.is_microphone_active(first.generation));
    assert!(runtime.is_microphone_active(second.generation));
}

// Once every stream of the session is closed, the next activation starts a
// fresh session (id advances to 2).
#[test]
fn closing_the_last_stream_resets_the_next_session_anchor() {
    let runtime = UpstreamMediaRuntime::new();
    let camera = runtime.activate_camera();
    let microphone = runtime.activate_microphone();
    runtime.close_camera(camera.generation);
    runtime.close_microphone(microphone.generation);
    let next = runtime.activate_camera();
    assert_eq!(next.session_id, 2);
}

// The very first video packet is held (`AwaitingPair`) until audio shows up;
// once both kinds have offered a packet, they pair at local pts 0 and share
// the same due-at instant.
#[test]
fn first_packets_wait_for_the_counterpart_before_pairing() {
    let runtime = UpstreamMediaRuntime::new();
    let _camera = runtime.activate_camera();
    let _microphone = runtime.activate_microphone();
    assert!(matches!(
        runtime.plan_video_pts(1_000_000, 16_666),
        super::UpstreamPlanDecision::AwaitingPair
    ));
    let audio_first = play(runtime.plan_audio_pts(1_000_000));
    let video_first = play(runtime.plan_video_pts(1_000_000, 16_666));
    assert_eq!(audio_first.local_pts_us, 0);
    assert_eq!(video_first.local_pts_us, 0);
    assert_eq!(audio_first.due_at, video_first.due_at);
}

// With a 250 ms camera startup grace configured, pairing is deferred until a
// video packet arrives at least 250 ms (of remote pts) into the stream; the
// packet that finally establishes the base becomes local pts 0.
#[test]
fn overlap_waits_for_camera_startup_grace_before_establishing_the_shared_base() {
    temp_env::with_var("LESAVKA_UPSTREAM_CAMERA_STARTUP_GRACE_MS", Some("250"), || {
        let runtime = UpstreamMediaRuntime::new();
        let _camera = runtime.activate_camera();
        let _microphone = runtime.activate_microphone();
        assert!(matches!(
            runtime.plan_video_pts(1_000_000, 16_666),
            super::UpstreamPlanDecision::AwaitingPair
        ));
        assert!(matches!(
            runtime.plan_audio_pts(1_000_000),
            super::UpstreamPlanDecision::AwaitingPair
        ));
        // Still inside the grace window (only 200 ms of remote pts elapsed).
        assert!(matches!(
            runtime.plan_video_pts(1_200_000, 16_666),
            super::UpstreamPlanDecision::AwaitingPair
        ));
        let video_ready = play(runtime.plan_video_pts(1_250_000, 16_666));
        let audio_ready = play(runtime.plan_audio_pts(1_260_000));
        assert_eq!(video_ready.local_pts_us, 0);
        assert_eq!(audio_ready.local_pts_us, 10_000);
    });
}

// Even with a short (20 ms) playout delay already elapsed in wall-clock time,
// the pairing window must not expire into one-sided playout while the camera
// startup grace (250 ms) is still pending.
#[test]
fn pairing_window_does_not_expire_into_one_sided_playout_while_camera_warms_up() {
    temp_env::with_var("LESAVKA_UPSTREAM_CAMERA_STARTUP_GRACE_MS", Some("250"), || {
        temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("20"), || {
            let runtime = UpstreamMediaRuntime::new();
            let _camera = runtime.activate_camera();
            let _microphone = runtime.activate_microphone();
            assert!(matches!(
                runtime.plan_video_pts(1_000_000, 16_666),
                super::UpstreamPlanDecision::AwaitingPair
            ));
            assert!(matches!(
                runtime.plan_audio_pts(1_000_000),
                super::UpstreamPlanDecision::AwaitingPair
            ));
            // Sleep past the 20 ms playout delay; audio must still wait.
            std::thread::sleep(Duration::from_millis(30));
            assert!(matches!(
                runtime.plan_audio_pts(1_010_000),
                super::UpstreamPlanDecision::AwaitingPair
            ));
            let video_ready = play(runtime.plan_video_pts(1_250_000, 16_666));
            let audio_ready = play(runtime.plan_audio_pts(1_260_000));
            assert_eq!(video_ready.local_pts_us, 0);
            assert_eq!(audio_ready.local_pts_us, 10_000);
        });
    });
}

// When video establishes the shared base later (at remote pts 1_300_000) than
// the earliest audio packet, audio packets from before the base are dropped
// (`DropBeforeOverlap`), and subsequent packets map relative to the base.
#[test]
fn overlap_pairing_drops_leading_packets_before_the_shared_base() {
    let runtime = UpstreamMediaRuntime::new();
    let _camera = runtime.activate_camera();
    let _microphone = runtime.activate_microphone();
    assert!(matches!(
        runtime.plan_audio_pts(1_000_000),
        super::UpstreamPlanDecision::AwaitingPair
    ));
    let video_first = play(runtime.plan_video_pts(1_300_000, 16_666));
    assert_eq!(video_first.local_pts_us, 0);
    assert!(matches!(
        runtime.plan_audio_pts(1_000_000),
        super::UpstreamPlanDecision::DropBeforeOverlap
    ));
    let audio_next = play(runtime.plan_audio_pts(1_310_000));
    let video_next = play(runtime.plan_video_pts(1_333_333, 16_666));
    assert_eq!(audio_next.local_pts_us, 10_000);
    assert_eq!(video_next.local_pts_us, 33_333);
}

// A repeated remote video pts must not stall the local clock: the second
// packet with the same remote pts advances by one frame duration (16_666 us).
#[test]
fn shared_clock_keeps_each_kind_monotonic_when_remote_pts_repeat() {
    let runtime = UpstreamMediaRuntime::new();
    let _camera = runtime.activate_camera();
    let _microphone = runtime.activate_microphone();
    assert!(matches!(
        runtime.plan_video_pts(50_000, 16_666),
        super::UpstreamPlanDecision::AwaitingPair
    ));
    let _audio = play(runtime.plan_audio_pts(50_000));
    let first = play(runtime.plan_video_pts(50_000, 16_666));
    let repeated = play(runtime.plan_video_pts(50_000, 16_666));
    assert_eq!(first.local_pts_us, 0);
    assert_eq!(repeated.local_pts_us, 16_666);
}

// Closing with a superseded (stale) generation is a no-op; closing with the
// current generation via the generic `close` ends the session, so the next
// activation starts session 2.
#[test]
fn close_ignores_superseded_generation_values() {
    let runtime = UpstreamMediaRuntime::new();
    let first = runtime.activate_camera();
    let second = runtime.activate_camera();
    runtime.close_camera(first.generation);
    assert!(runtime.is_camera_active(second.generation));
    runtime.close(super::UpstreamMediaKind::Camera, second.generation);
    let next = runtime.activate_camera();
    assert_eq!(next.session_id, 2);
}

// `upstream_playout_delay`: 1 s default, overridable via
// LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS.
#[test]
fn upstream_playout_delay_defaults_to_one_second_and_accepts_overrides() {
    temp_env::with_var_unset("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", || {
        assert_eq!(super::upstream_playout_delay(), Duration::from_secs(1));
    });
    temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("250"), || {
        assert_eq!(super::upstream_playout_delay(), Duration::from_millis(250));
    });
}

// `upstream_playout_offset_us`: 0 by default for both kinds; the audio and
// video offsets are independently overridable (signed microseconds).
#[test]
fn upstream_playout_offsets_default_to_zero_and_accept_overrides() {
    temp_env::with_var_unset("LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US", || {
        temp_env::with_var_unset("LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US", || {
            assert_eq!(
                super::upstream_playout_offset_us(UpstreamMediaKind::Microphone),
                0
            );
            assert_eq!(
                super::upstream_playout_offset_us(UpstreamMediaKind::Camera),
                0
            );
        });
    });
    temp_env::with_var(
        "LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US",
        Some("-20000"),
        || {
            temp_env::with_var(
                "LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US",
                Some("35000"),
                || {
                    assert_eq!(
                        super::upstream_playout_offset_us(UpstreamMediaKind::Microphone),
                        -20_000
                    );
                    assert_eq!(
                        super::upstream_playout_offset_us(UpstreamMediaKind::Camera),
                        35_000
                    );
                },
            );
        },
    );
}

// `upstream_pairing_master_slack`: 20 ms default, overridable in microseconds
// via LESAVKA_UPSTREAM_PAIR_SLACK_US.
#[test]
fn upstream_pairing_master_slack_defaults_to_twenty_ms_and_accepts_overrides() {
    temp_env::with_var_unset("LESAVKA_UPSTREAM_PAIR_SLACK_US", || {
        assert_eq!(
            super::upstream_pairing_master_slack(),
            Duration::from_micros(20_000)
        );
    });
    temp_env::with_var("LESAVKA_UPSTREAM_PAIR_SLACK_US", Some("5000"), || {
        assert_eq!(
            super::upstream_pairing_master_slack(),
            Duration::from_micros(5_000)
        );
    });
}

// `upstream_reanchor_late_threshold`: defaults to half the playout buffer.
// NOTE(review): half of 100 ms would be 50 ms, yet 250 ms is asserted — the
// default apparently clamps to a 250 ms floor; confirm against the
// implementation before relying on the "half the buffer" name alone.
#[test]
fn upstream_reanchor_late_threshold_defaults_to_half_the_buffer_and_accepts_overrides() {
    temp_env::with_var_unset("LESAVKA_UPSTREAM_REANCHOR_LATE_MS", || {
        assert_eq!(
            super::upstream_reanchor_late_threshold(Duration::from_secs(1)),
            Duration::from_millis(500)
        );
        assert_eq!(
            super::upstream_reanchor_late_threshold(Duration::from_millis(100)),
            Duration::from_millis(250)
        );
    });
    temp_env::with_var("LESAVKA_UPSTREAM_REANCHOR_LATE_MS", Some("42"), || {
        assert_eq!(
            super::upstream_reanchor_late_threshold(Duration::from_secs(1)),
            Duration::from_millis(42)
        );
    });
}

// The timing-trace flag treats "off" and "false" as disabled and "1" as
// enabled.
#[test]
fn upstream_timing_trace_flag_accepts_false_values() {
    temp_env::with_var("LESAVKA_UPSTREAM_TIMING_TRACE", Some("off"), || {
        assert!(!super::upstream_timing_trace_enabled());
    });
    temp_env::with_var("LESAVKA_UPSTREAM_TIMING_TRACE", Some("false"), || {
        assert!(!super::upstream_timing_trace_enabled());
    });
    temp_env::with_var("LESAVKA_UPSTREAM_TIMING_TRACE", Some("1"), || {
        assert!(super::upstream_timing_trace_enabled());
    });
}

// A negative offset shifts the due-at instant earlier by exactly that many
// microseconds.
#[test]
fn apply_playout_offset_supports_negative_offsets() {
    let base = tokio::time::Instant::now() + Duration::from_millis(50);
    let shifted = super::apply_playout_offset(base, -20_000);
    let delta = base.saturating_duration_since(shifted);
    assert_eq!(delta, Duration::from_micros(20_000));
}

// A positive offset shifts the due-at instant later by exactly that many
// microseconds.
#[test]
fn apply_playout_offset_supports_positive_offsets() {
    let base = tokio::time::Instant::now();
    let shifted = super::apply_playout_offset(base, 30_000);
    let delta = shifted.saturating_duration_since(base);
    assert_eq!(delta, Duration::from_micros(30_000));
}

// Audio and video share one playout epoch: the paired first packets are due
// at the same instant, and a later audio packet is due exactly its pts delta
// (10 ms) after the first.
#[test]
fn shared_playout_epoch_is_reused_across_audio_and_video() {
    let runtime = UpstreamMediaRuntime::new();
    let _camera = runtime.activate_camera();
    let _microphone = runtime.activate_microphone();
    assert!(matches!(
        runtime.plan_video_pts(1_000_000, 16_666),
        super::UpstreamPlanDecision::AwaitingPair
    ));
    let audio_first = play(runtime.plan_audio_pts(1_000_000));
    let video_first = play(runtime.plan_video_pts(1_000_000, 16_666));
    let audio_next = play(runtime.plan_audio_pts(1_010_000));
    assert_eq!(video_first.local_pts_us, 0);
    assert_eq!(audio_first.local_pts_us, 0);
    assert_eq!(video_first.due_at, audio_first.due_at);
    assert_eq!(
        audio_next
            .due_at
            .saturating_duration_since(audio_first.due_at),
        Duration::from_micros(10_000)
    );
}

// With a zero playout delay and no microphone packets, the pairing window can
// expire so the camera plays one-sided, with a monotonic local clock.
#[test]
fn pairing_window_can_expire_into_one_sided_playout() {
    temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || {
        let runtime = UpstreamMediaRuntime::new();
        let _camera = runtime.activate_camera();
        let first = play(runtime.plan_video_pts(1_000_000, 16_666));
        let second = play(runtime.plan_video_pts(1_016_666, 16_666));
        assert_eq!(first.local_pts_us, 0);
        assert_eq!(second.local_pts_us, 16_666);
    });
}

// The `map_*_pts` wrappers return `None` for packets that are unpaired or
// precede the shared base, and `Some(local_pts)` once pairing is established.
#[test]
fn map_wrappers_hide_unpaired_and_pre_overlap_packets() {
    let runtime = UpstreamMediaRuntime::new();
    let _camera = runtime.activate_camera();
    let _microphone = runtime.activate_microphone();
    assert_eq!(runtime.map_video_pts(1_000_000, 16_666), None);
    assert_eq!(runtime.map_audio_pts(1_000_000), Some(0));
    assert_eq!(runtime.map_audio_pts(999_999), None);
}

// Enabling the timing trace must not perturb the planned local pts values.
#[test]
fn shared_playout_trace_path_keeps_planned_pts_stable() {
    temp_env::with_var("LESAVKA_UPSTREAM_TIMING_TRACE", Some("1"), || {
        let runtime = UpstreamMediaRuntime::new();
        let _camera = runtime.activate_camera();
        let _microphone = runtime.activate_microphone();
        assert!(matches!(
            runtime.plan_video_pts(1_000_000, 16_666),
            super::UpstreamPlanDecision::AwaitingPair
        ));
        let audio = play(runtime.plan_audio_pts(1_000_000));
        let video = play(runtime.plan_video_pts(1_000_000, 16_666));
        assert_eq!(video.local_pts_us, 0);
        assert_eq!(audio.local_pts_us, 0);
    });
}

// Sleeping past the 5 ms reanchor threshold (with a 20 ms playout delay)
// re-anchors the shared epoch: the recovered packets are scheduled back into
// the future, for both kinds.
#[test]
fn catastrophic_lateness_reanchors_the_shared_playout_epoch() {
    temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("20"), || {
        temp_env::with_var("LESAVKA_UPSTREAM_REANCHOR_LATE_MS", Some("5"), || {
            let runtime = UpstreamMediaRuntime::new();
            let _camera = runtime.activate_camera();
            let _microphone = runtime.activate_microphone();
            assert!(matches!(
                runtime.plan_video_pts(1_000_000, 16_666),
                super::UpstreamPlanDecision::AwaitingPair
            ));
            let _audio_first = play(runtime.plan_audio_pts(1_000_000));
            let _video_first = play(runtime.plan_video_pts(1_000_000, 16_666));
            std::thread::sleep(Duration::from_millis(30));
            let recovered_audio = play(runtime.plan_audio_pts(1_000_000));
            assert!(
                recovered_audio.due_at > tokio::time::Instant::now(),
                "recovered packet should be scheduled back into the future"
            );
            assert!(
                recovered_audio.late_by <= Duration::from_millis(1),
                "recovered packet should no longer be catastrophically late"
            );
            let recovered_video = play(runtime.plan_video_pts(1_016_666, 16_666));
            assert!(
                recovered_video.due_at > tokio::time::Instant::now(),
                "shared epoch recovery should also move video back into the future"
            );
        });
    });
}

// If pairing itself finishes late (15 ms into a 20 ms budget), the overlap
// anchor is given a fresh playout budget rather than inheriting the elapsed
// wait.
#[test]
fn overlap_anchor_gets_a_fresh_playout_budget_when_pairing_finishes_late() {
    temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("20"), || {
        let runtime = UpstreamMediaRuntime::new();
        let _camera = runtime.activate_camera();
        let _microphone = runtime.activate_microphone();
        assert!(matches!(
            runtime.plan_video_pts(1_000_000, 16_666),
            super::UpstreamPlanDecision::AwaitingPair
        ));
        std::thread::sleep(Duration::from_millis(15));
        let before_pair = tokio::time::Instant::now();
        let audio_first = play(runtime.plan_audio_pts(1_000_000));
        let video_first = play(runtime.plan_video_pts(1_000_000, 16_666));
        assert!(
            audio_first.due_at.saturating_duration_since(before_pair)
                >= Duration::from_millis(15),
            "audio should keep most of the configured playout budget after late pairing"
        );
        assert!(
            video_first.due_at.saturating_duration_since(before_pair)
                >= Duration::from_millis(15),
            "video should keep most of the configured playout budget after late pairing"
        );
    });
}

// A session may re-anchor only once: after the first recovery, a second bout
// of lateness stays late instead of extending the epoch again.
#[test]
fn catastrophic_lateness_reanchors_only_once_per_session() {
    temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("20"), || {
        temp_env::with_var("LESAVKA_UPSTREAM_REANCHOR_LATE_MS", Some("5"), || {
            let runtime = UpstreamMediaRuntime::new();
            let _camera = runtime.activate_camera();
            let _microphone = runtime.activate_microphone();
            assert!(matches!(
                runtime.plan_video_pts(1_000_000, 16_666),
                super::UpstreamPlanDecision::AwaitingPair
            ));
            let _audio_first = play(runtime.plan_audio_pts(1_000_000));
            let _video_first = play(runtime.plan_video_pts(1_000_000, 16_666));
            std::thread::sleep(Duration::from_millis(30));
            let first_recovered = play(runtime.plan_audio_pts(1_000_000));
            assert!(first_recovered.due_at > tokio::time::Instant::now());
            std::thread::sleep(Duration::from_millis(30));
            let second_late = play(runtime.plan_audio_pts(1_000_001));
            assert!(
                second_late.late_by > Duration::from_millis(5),
                "session should not keep extending itself with repeated reanchors"
            );
        });
    });
}

// Well past startup (130 ms into the session), catastrophic lateness no
// longer triggers a re-anchor: the packet keeps its mapped pts, stays late,
// and its due-at is not pushed back into the future.
#[test]
fn catastrophic_lateness_does_not_reanchor_once_the_session_is_well_past_startup() {
    temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("20"), || {
        temp_env::with_var("LESAVKA_UPSTREAM_REANCHOR_LATE_MS", Some("5"), || {
            let runtime = UpstreamMediaRuntime::new();
            let _camera = runtime.activate_camera();
            let _microphone = runtime.activate_microphone();
            assert!(matches!(
                runtime.plan_video_pts(1_000_000, 16_666),
                super::UpstreamPlanDecision::AwaitingPair
            ));
            let _audio_first = play(runtime.plan_audio_pts(1_000_000));
            let _video_first = play(runtime.plan_video_pts(1_000_000, 16_666));
            std::thread::sleep(Duration::from_millis(130));
            let late_audio = play(runtime.plan_audio_pts(1_100_000));
            assert_eq!(late_audio.local_pts_us, 100_000);
            assert!(
                late_audio.late_by > Duration::from_millis(5),
                "late packet should remain late instead of reanchoring the shared epoch mid-session"
            );
            assert!(
                late_audio.due_at <= tokio::time::Instant::now(),
                "mid-session lateness should no longer push due_at back into the future"
            );
        });
    });
}

// A video packet waiting on the audio master clock is released (returns true)
// once audio planning advances past the awaited pts.
#[tokio::test(flavor = "current_thread")]
async fn wait_for_audio_master_releases_video_once_audio_catches_up() {
    let runtime = Arc::new(UpstreamMediaRuntime::new());
    let _camera = runtime.activate_camera();
    let _microphone = runtime.activate_microphone();
    assert!(matches!(
        runtime.plan_video_pts(1_000_000, 16_666),
        super::UpstreamPlanDecision::AwaitingPair
    ));
    let _audio_first = play(runtime.plan_audio_pts(1_000_000));
    let video_first = play(runtime.plan_video_pts(1_000_000, 16_666));
    let waiter = tokio::spawn({
        let runtime = runtime.clone();
        async move {
            runtime
                .wait_for_audio_master(video_first.local_pts_us + 10_000, video_first.due_at)
                .await
        }
    });
    tokio::time::sleep(Duration::from_millis(5)).await;
    let _audio_next = play(runtime.plan_audio_pts(1_010_000));
    assert!(waiter.await.expect("audio master waiter should finish"));
}

// If audio never reaches the awaited pts before the deadline, the wait
// resolves to false at the due-at instant.
#[tokio::test(flavor = "current_thread")]
async fn wait_for_audio_master_times_out_when_audio_never_catches_up() {
    let runtime = Arc::new(UpstreamMediaRuntime::new());
    let _camera = runtime.activate_camera();
    let _microphone = runtime.activate_microphone();
    assert!(matches!(
        runtime.plan_video_pts(1_000_000, 16_666),
        super::UpstreamPlanDecision::AwaitingPair
    ));
    let _audio_first = play(runtime.plan_audio_pts(1_000_000));
    let video_first = play(runtime.plan_video_pts(1_000_000, 16_666));
    let due_at = tokio::time::Instant::now() + Duration::from_millis(20);
    assert!(
        !runtime
            .wait_for_audio_master(video_first.local_pts_us + 100_000, due_at)
            .await
    );
}

// With no active microphone there is no audio master to wait on, so the wait
// returns true immediately.
#[tokio::test(flavor = "current_thread")]
async fn wait_for_audio_master_returns_true_when_no_microphone_stream_is_active() {
    let runtime = Arc::new(UpstreamMediaRuntime::new());
    let camera = runtime.activate_camera();
    let microphone = runtime.activate_microphone();
    runtime.close_microphone(microphone.generation);
    assert!(runtime.is_camera_active(camera.generation));
    assert!(
        runtime
            .wait_for_audio_master(
                123_456,
                tokio::time::Instant::now() + Duration::from_millis(10)
            )
            .await
    );
}

// The microphone sink gate is exclusive: a new owner's reservation blocks
// until the previous owner's permit is dropped.
#[tokio::test(flavor = "current_thread")]
async fn new_microphone_owner_waits_for_the_previous_sink_to_release() {
    let runtime = Arc::new(UpstreamMediaRuntime::new());
    let first = runtime.activate_microphone();
    let first_permit = runtime
        .reserve_microphone_sink(first.generation)
        .await
        .expect("first owner should acquire the sink gate");
    let second = runtime.activate_microphone();
    let waiter = tokio::spawn({
        let runtime = runtime.clone();
        async move {
            runtime
                .reserve_microphone_sink(second.generation)
                .await
                .is_some()
        }
    });
    tokio::time::sleep(Duration::from_millis(25)).await;
    assert!(!waiter.is_finished());
    drop(first_permit);
    assert!(waiter.await.expect("waiter task should finish"));
}

// If a third activation supersedes the waiting second owner while it is still
// blocked on the gate, the second owner's reservation resolves to `None`
// instead of opening a sink.
#[tokio::test(flavor = "current_thread")]
async fn superseded_microphone_waiter_stands_down_before_opening_a_sink() {
    let runtime = Arc::new(UpstreamMediaRuntime::new());
    let first = runtime.activate_microphone();
    let first_permit = runtime
        .reserve_microphone_sink(first.generation)
        .await
        .expect("first owner should acquire the sink gate");
    let second = runtime.activate_microphone();
    let superseded_waiter = tokio::spawn({
        let runtime = runtime.clone();
        async move {
            runtime
                .reserve_microphone_sink(second.generation)
                .await
                .is_some()
        }
    });
    tokio::time::sleep(Duration::from_millis(25)).await;
    let _third = runtime.activate_microphone();
    drop(first_permit);
    assert!(
        !superseded_waiter
            .await
            .expect("superseded waiter task should finish"),
        "older waiter should stand down instead of opening a sink after supersession"
    );
}