//! End-to-end server coverage for bundled upstream webcam/microphone media. //! //! Scope: run a local gRPC server and send a synthetic HEVC video packet with //! the audio that should play beside it through `StreamWebcamMedia`. //! Targets: `server/src/main.rs`, `server/src/upstream_media_runtime.rs`. //! Why: client transport should reach the server as one timestamped media unit //! so the relay can preserve sync before final RCT playout. #[cfg(coverage)] #[allow(warnings)] mod server_upstream_media_bundle { include!(env!("LESAVKA_SERVER_MAIN_SRC")); include!("../support/server_upstream_media_harness.rs"); use serial_test::serial; use temp_env::with_var; #[test] #[serial] fn stream_webcam_media_records_bundled_hevc_audio_video_timing() { let rt = tokio::runtime::Runtime::new().expect("runtime"); with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || { rt.block_on(async { let (_dir, handler) = build_handler_for_tests(); let runtime = handler.upstream_media_rt.clone(); let (server, mut cli) = serve_handler(handler).await; let (tx, rx) = tokio::sync::mpsc::channel(4); let mut response = cli .stream_webcam_media(tonic::Request::new( tokio_stream::wrappers::ReceiverStream::new(rx), )) .await .expect("bundled stream should open") .into_inner(); tx.send(UpstreamMediaBundle { session_id: 77, seq: 1, capture_start_us: 980_000, capture_end_us: 1_020_000, video: Some(VideoPacket { id: 2, pts: 1_000_000, data: vec![0, 0, 0, 1, 0x26, 0x01], client_capture_pts_us: 1_000_000, client_send_pts_us: 1_005_000, client_queue_age_ms: 5, ..Default::default() }), audio: vec![AudioPacket { id: 0, pts: 990_000, data: vec![1, 2, 3, 4], client_capture_pts_us: 990_000, client_send_pts_us: 1_006_000, client_queue_age_ms: 16, ..Default::default() }], audio_sample_rate: 48_000, audio_channels: 2, video_width: 1920, video_height: 1080, video_fps: 30, }) .await .expect("send bundled upstream media"); let mut live_snapshot = None; for _ in 0..40 { let snapshot = runtime.snapshot(); if snapshot.latest_camera_remote_pts_us == Some(1_000_000) && snapshot.latest_microphone_remote_pts_us == Some(990_000) { live_snapshot = Some(snapshot); break; } tokio::time::sleep(std::time::Duration::from_millis(25)).await; } let snapshot = live_snapshot.expect("bundled timing should reach runtime"); assert!(snapshot.client_timing_window_samples >= 1); assert_eq!(snapshot.latest_camera_remote_pts_us, Some(1_000_000)); assert_eq!(snapshot.latest_microphone_remote_pts_us, Some(990_000)); drop(tx); let ack = tokio::time::timeout(std::time::Duration::from_secs(1), response.message()) .await .expect("bundled ack timeout") .expect("bundled ack grpc") .expect("bundled ack item"); assert_eq!(ack, Empty {}); server.abort(); }); }); } #[test] #[serial] fn stream_webcam_media_loopback_preserves_full_hevc_probe_train_timing() { let rt = tokio::runtime::Runtime::new().expect("runtime"); with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || { rt.block_on(async { let (_dir, handler) = build_handler_for_tests(); let runtime = handler.upstream_media_rt.clone(); let (server, mut cli) = serve_handler(handler).await; let (tx, rx) = tokio::sync::mpsc::channel(20); let mut response = cli .stream_webcam_media(tonic::Request::new( tokio_stream::wrappers::ReceiverStream::new(rx), )) .await .expect("bundled loopback stream should open") .into_inner(); for event in 0..16_u64 { let video_pts = 4_000_000 + event * 1_000_000; tx.send(UpstreamMediaBundle { session_id: 88, seq: event + 1, capture_start_us: video_pts.saturating_sub(20_000), capture_end_us: video_pts.saturating_add(10_000), video: Some(VideoPacket { id: 2, pts: video_pts, data: vec![0, 0, 0, 1, 0x26, (event + 1) as u8], client_capture_pts_us: video_pts, client_send_pts_us: video_pts.saturating_add(5_000), client_queue_age_ms: 5, effective_fps: 30, ..Default::default() }), audio: vec![ AudioPacket { id: 0, pts: video_pts.saturating_sub(20_000), data: vec![0x11; 1_920], client_capture_pts_us: video_pts.saturating_sub(20_000), client_send_pts_us: video_pts.saturating_add(5_000), client_queue_age_ms: 25, ..Default::default() }, AudioPacket { id: 0, pts: video_pts.saturating_add(10_000), data: vec![0x22; 1_920], client_capture_pts_us: video_pts.saturating_add(10_000), client_send_pts_us: video_pts.saturating_add(15_000), client_queue_age_ms: 5, ..Default::default() }, ], audio_sample_rate: 48_000, audio_channels: 2, video_width: 1920, video_height: 1080, video_fps: 30, }) .await .expect("send HEVC loopback bundle"); } let expected_video_pts = 19_000_000; let expected_audio_pts = 19_010_000; let mut live_snapshot = None; for _ in 0..80 { let snapshot = runtime.snapshot(); if snapshot.latest_camera_remote_pts_us == Some(expected_video_pts) && snapshot.latest_microphone_remote_pts_us == Some(expected_audio_pts) { live_snapshot = Some(snapshot); break; } tokio::time::sleep(std::time::Duration::from_millis(25)).await; } let snapshot = live_snapshot.expect("full HEVC bundle train should reach runtime"); assert!( snapshot.client_timing_window_samples >= 16, "server should record one timing sample per bundled event, got {}", snapshot.client_timing_window_samples ); assert_eq!( snapshot.latest_camera_remote_pts_us, Some(expected_video_pts) ); assert_eq!( snapshot.latest_microphone_remote_pts_us, Some(expected_audio_pts) ); drop(tx); let ack = tokio::time::timeout(std::time::Duration::from_secs(1), response.message()) .await .expect("bundled loopback ack timeout") .expect("bundled loopback ack grpc") .expect("bundled loopback ack item"); assert_eq!(ack, Empty {}); server.abort(); }); }); } }