use super::{UpstreamMediaRuntime, play, runtime_without_offsets}; use serial_test::serial; use std::time::Duration; fn with_info_tracing(f: impl FnOnce() -> T) -> T { let subscriber = tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) .with_test_writer() .finish(); tracing::subscriber::with_default(subscriber, f) } #[test] #[serial(upstream_media_runtime)] fn shared_playout_epoch_is_reused_across_audio_and_video() { let runtime = runtime_without_offsets(); let _camera = runtime.activate_camera(); let _microphone = runtime.activate_microphone(); assert!(matches!( runtime.plan_video_pts(1_000_000, 16_666), super::UpstreamPlanDecision::AwaitingPair )); let audio_first = play(runtime.plan_audio_pts(1_000_000)); let video_first = play(runtime.plan_video_pts(1_000_000, 16_666)); let audio_next = play(runtime.plan_audio_pts(1_010_000)); assert_eq!(video_first.local_pts_us, 0); assert_eq!(audio_first.local_pts_us, 0); assert_eq!(video_first.due_at, audio_first.due_at); assert_eq!( audio_next .due_at .saturating_duration_since(audio_first.due_at), Duration::from_micros(10_000) ); } #[test] #[serial(upstream_media_runtime)] fn pairing_window_holds_one_sided_playout_by_default() { temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || { let runtime = UpstreamMediaRuntime::new(); let _camera = runtime.activate_camera(); assert!(matches!( runtime.plan_video_pts(1_000_000, 16_666), super::UpstreamPlanDecision::AwaitingPair )); assert!(matches!( runtime.plan_video_pts(1_016_666, 16_666), super::UpstreamPlanDecision::AwaitingPair )); }); } #[test] #[serial(upstream_media_runtime)] fn explicit_override_allows_one_sided_playout_for_compatibility() { temp_env::with_var("LESAVKA_UPSTREAM_REQUIRE_PAIRED_STARTUP", Some("0"), || { temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || { let runtime = UpstreamMediaRuntime::new(); let _camera = runtime.activate_camera(); let first = play(runtime.plan_video_pts(1_000_000, 16_666)); let second = play(runtime.plan_video_pts(1_016_666, 16_666)); assert_eq!(first.local_pts_us, 0); assert_eq!(second.local_pts_us, 16_666); }); }); } #[test] #[serial(upstream_media_runtime)] fn overdue_pairing_refreshes_waiting_anchor_before_late_counterpart_arrives() { temp_env::with_var( "LESAVKA_UPSTREAM_CAMERA_STARTUP_GRACE_MS", Some("0"), || { temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || { let runtime = runtime_without_offsets(); let _camera = runtime.activate_camera(); let _microphone = runtime.activate_microphone(); assert!(matches!( runtime.plan_video_pts(1_000_000, 16_666), super::UpstreamPlanDecision::AwaitingPair )); assert!(matches!( runtime.plan_video_pts(9_000_000, 16_666), super::UpstreamPlanDecision::AwaitingPair )); let audio = play(runtime.plan_audio_pts(9_010_000)); assert!(matches!( runtime.plan_video_pts(9_000_000, 16_666), super::UpstreamPlanDecision::DropBeforeOverlap )); let video = play(runtime.plan_video_pts(9_016_666, 16_666)); assert_eq!(audio.local_pts_us, 0); assert_eq!(video.local_pts_us, 6_666); }); }, ); } #[test] #[serial(upstream_media_runtime)] fn map_wrappers_hide_unpaired_and_pre_overlap_packets() { let runtime = UpstreamMediaRuntime::new(); let _camera = runtime.activate_camera(); let _microphone = runtime.activate_microphone(); assert_eq!(runtime.map_video_pts(1_000_000, 16_666), None); assert_eq!(runtime.map_audio_pts(1_000_000), Some(0)); assert_eq!(runtime.map_audio_pts(999_999), None); } #[test] #[serial(upstream_media_runtime)] fn shared_playout_trace_path_keeps_planned_pts_stable() { temp_env::with_var("LESAVKA_UPSTREAM_TIMING_TRACE", Some("1"), || { let runtime = UpstreamMediaRuntime::new(); let _camera = runtime.activate_camera(); let _microphone = runtime.activate_microphone(); assert!(matches!( runtime.plan_video_pts(1_000_000, 16_666), super::UpstreamPlanDecision::AwaitingPair )); let audio = play(runtime.plan_audio_pts(1_000_000)); let video = play(runtime.plan_video_pts(1_000_000, 16_666)); assert_eq!(video.local_pts_us, 0); assert_eq!(audio.local_pts_us, 0); }); } #[test] #[serial(upstream_media_runtime)] fn catastrophic_lateness_reanchors_the_shared_playout_epoch() { temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("20"), || { temp_env::with_var("LESAVKA_UPSTREAM_REANCHOR_LATE_MS", Some("5"), || { let runtime = UpstreamMediaRuntime::new(); runtime.set_playout_offsets(0, 0); let _camera = runtime.activate_camera(); let _microphone = runtime.activate_microphone(); assert!(matches!( runtime.plan_video_pts(1_000_000, 16_666), super::UpstreamPlanDecision::AwaitingPair )); let _audio_first = play(runtime.plan_audio_pts(1_000_000)); let _video_first = play(runtime.plan_video_pts(1_000_000, 16_666)); std::thread::sleep(Duration::from_millis(30)); let recovered_audio = play(runtime.plan_audio_pts(1_000_000)); assert!( recovered_audio.due_at > tokio::time::Instant::now(), "recovered packet should be scheduled back into the future" ); assert!( recovered_audio.late_by <= Duration::from_millis(1), "recovered packet should no longer be catastrophically late" ); let recovered_video = play(runtime.plan_video_pts(1_016_666, 16_666)); assert!( recovered_video.due_at > tokio::time::Instant::now(), "shared epoch recovery should also move video back into the future" ); }); }); } #[test] #[serial(upstream_media_runtime)] fn overlap_anchor_gets_a_fresh_playout_budget_when_pairing_finishes_late() { temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("20"), || { let runtime = UpstreamMediaRuntime::new(); runtime.set_playout_offsets(0, 0); let _camera = runtime.activate_camera(); let _microphone = runtime.activate_microphone(); assert!(matches!( runtime.plan_video_pts(1_000_000, 16_666), super::UpstreamPlanDecision::AwaitingPair )); std::thread::sleep(Duration::from_millis(15)); let before_pair = tokio::time::Instant::now(); let audio_first = play(runtime.plan_audio_pts(1_000_000)); let video_first = play(runtime.plan_video_pts(1_000_000, 16_666)); assert!( audio_first.due_at.saturating_duration_since(before_pair) >= Duration::from_millis(15), "audio should keep most of the configured playout budget after late pairing" ); assert!( video_first.due_at.saturating_duration_since(before_pair) >= Duration::from_millis(15), "video should keep most of the configured playout budget after late pairing" ); }); } #[test] #[serial(upstream_media_runtime)] fn catastrophic_lateness_reanchors_only_once_per_session() { temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("20"), || { temp_env::with_var("LESAVKA_UPSTREAM_REANCHOR_LATE_MS", Some("5"), || { let runtime = UpstreamMediaRuntime::new(); runtime.set_playout_offsets(0, 0); let _camera = runtime.activate_camera(); let _microphone = runtime.activate_microphone(); assert!(matches!( runtime.plan_video_pts(1_000_000, 16_666), super::UpstreamPlanDecision::AwaitingPair )); let _audio_first = play(runtime.plan_audio_pts(1_000_000)); let _video_first = play(runtime.plan_video_pts(1_000_000, 16_666)); std::thread::sleep(Duration::from_millis(30)); let first_recovered = play(runtime.plan_audio_pts(1_000_000)); assert!(first_recovered.due_at > tokio::time::Instant::now()); std::thread::sleep(Duration::from_millis(30)); let second_late = play(runtime.plan_audio_pts(1_000_001)); assert!( second_late.late_by > Duration::from_millis(5), "session should not keep extending itself with repeated reanchors" ); }); }); } #[test] #[serial(upstream_media_runtime)] fn catastrophic_lateness_does_not_reanchor_once_the_session_is_well_past_startup() { temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("20"), || { temp_env::with_var("LESAVKA_UPSTREAM_REANCHOR_LATE_MS", Some("5"), || { let runtime = UpstreamMediaRuntime::new(); runtime.set_playout_offsets(0, 0); let _camera = runtime.activate_camera(); let _microphone = runtime.activate_microphone(); assert!(matches!( runtime.plan_video_pts(1_000_000, 16_666), super::UpstreamPlanDecision::AwaitingPair )); let _audio_first = play(runtime.plan_audio_pts(1_000_000)); let _video_first = play(runtime.plan_video_pts(1_000_000, 16_666)); std::thread::sleep(Duration::from_millis(130)); let late_audio = play(runtime.plan_audio_pts(1_100_000)); assert_eq!(late_audio.local_pts_us, 100_000); assert!( late_audio.late_by > Duration::from_millis(5), "late packet should remain late instead of reanchoring the shared epoch mid-session" ); assert!( late_audio.due_at <= tokio::time::Instant::now(), "mid-session lateness should no longer push due_at back into the future" ); }); }); } #[test] #[serial(upstream_media_runtime)] fn default_runtime_covers_video_map_play_path() { let runtime = UpstreamMediaRuntime::default(); let _camera = runtime.activate_camera(); let _microphone = runtime.activate_microphone(); assert!(matches!( runtime.plan_video_pts(1_000_000, 16_666), super::UpstreamPlanDecision::AwaitingPair )); let _audio = play(runtime.plan_audio_pts(1_000_000)); assert_eq!(runtime.map_video_pts(1_000_000, 16_666), Some(0)); } #[tokio::test(flavor = "current_thread")] #[serial(upstream_media_runtime)] async fn wait_for_audio_master_returns_false_immediately_once_due_time_has_already_passed() { let runtime = UpstreamMediaRuntime::new(); let _camera = runtime.activate_camera(); let _microphone = runtime.activate_microphone(); assert!( !runtime .wait_for_audio_master( 123_456, tokio::time::Instant::now() .checked_sub(Duration::from_millis(1)) .unwrap_or_else(tokio::time::Instant::now), ) .await ); } #[test] #[serial(upstream_media_runtime)] fn timing_trace_paths_emit_overlap_and_dropbeforeoverlap_details() { temp_env::with_var("LESAVKA_UPSTREAM_TIMING_TRACE", Some("1"), || { with_info_tracing(|| { let runtime = UpstreamMediaRuntime::new(); let _camera = runtime.activate_camera(); let _microphone = runtime.activate_microphone(); assert!(matches!( runtime.plan_video_pts(1_300_000, 16_666), super::UpstreamPlanDecision::AwaitingPair )); assert!(matches!( runtime.plan_audio_pts(1_000_000), super::UpstreamPlanDecision::DropBeforeOverlap )); let _video = play(runtime.plan_video_pts(1_300_000, 16_666)); assert!(matches!( runtime.plan_audio_pts(1_000_000), super::UpstreamPlanDecision::DropBeforeOverlap )); }); }); }