//! End-to-end server coverage for upstream media streams. //! //! Scope: run a local gRPC server and push synthetic client webcam/mic packets //! through the public `StreamCamera` and `StreamMicrophone` RPCs. //! Targets: `server/src/main.rs`, `server/src/audio.rs`, `server/src/video_sinks.rs`. //! Why: local webcam/mic uplink should stay testable without physical UVC, //! HDMI, or ALSA hardware in CI. #[cfg(coverage)] #[allow(warnings)] mod server_upstream_media { include!(env!("LESAVKA_SERVER_MAIN_SRC")); use lesavka_common::lesavka::relay_client::RelayClient; use serial_test::serial; use temp_env::with_var; use tempfile::tempdir; use tonic::transport::Channel; async fn connect_with_retry(addr: std::net::SocketAddr) -> Channel { let endpoint = tonic::transport::Endpoint::from_shared(format!("http://{addr}")) .expect("endpoint") .tcp_nodelay(true); for _ in 0..40 { if let Ok(channel) = endpoint.clone().connect().await { return channel; } tokio::time::sleep(std::time::Duration::from_millis(25)).await; } panic!("failed to connect to local tonic server"); } fn build_handler_for_tests() -> (tempfile::TempDir, Handler) { let dir = tempdir().expect("tempdir"); let kb_path = dir.path().join("hidg0.bin"); let ms_path = dir.path().join("hidg1.bin"); std::fs::write(&kb_path, []).expect("create kb file"); std::fs::write(&ms_path, []).expect("create ms file"); let kb = tokio::fs::File::from_std( std::fs::OpenOptions::new() .read(true) .write(true) .open(&kb_path) .expect("open kb"), ); let ms = tokio::fs::File::from_std( std::fs::OpenOptions::new() .read(true) .write(true) .open(&ms_path) .expect("open ms"), ); ( dir, Handler { kb: std::sync::Arc::new(tokio::sync::Mutex::new(Some(kb))), ms: std::sync::Arc::new(tokio::sync::Mutex::new(Some(ms))), gadget: UsbGadget::new("lesavka"), did_cycle: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)), camera_rt: std::sync::Arc::new(CameraRuntime::new()), upstream_media_rt: std::sync::Arc::new(UpstreamMediaRuntime::new()), capture_power: CapturePowerManager::new(), eye_hubs: std::sync::Arc::new(tokio::sync::Mutex::new( std::collections::HashMap::new(), )), }, ) } async fn serve_handler( handler: Handler, ) -> ( tokio::task::JoinHandle<()>, RelayClient, ) { let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("bind"); let addr = listener.local_addr().expect("addr"); drop(listener); let server = tokio::spawn(async move { let _ = tonic::transport::Server::builder() .add_service(RelayServer::new(handler)) .serve(addr) .await; }); let channel = connect_with_retry(addr).await; (server, RelayClient::new(channel)) } #[test] #[serial] fn stream_microphone_accepts_upstream_audio_packets() { let rt = tokio::runtime::Runtime::new().expect("runtime"); with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || { with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || { rt.block_on(async { let (_dir, handler) = build_handler_for_tests(); let (server, mut cli) = serve_handler(handler).await; let (tx, rx) = tokio::sync::mpsc::channel(4); tx.send(AudioPacket { id: 0, pts: 12_345, data: vec![1, 2, 3, 4, 5, 6], }) .await .expect("send synthetic upstream audio"); drop(tx); let outbound = tokio_stream::wrappers::ReceiverStream::new(rx); let mut response = cli .stream_microphone(tonic::Request::new(outbound)) .await .expect("microphone stream should open"); let ack = tokio::time::timeout( std::time::Duration::from_secs(1), response.get_mut().message(), ) .await .expect("microphone ack timeout") .expect("microphone ack grpc") .expect("microphone ack item"); assert_eq!(ack, Empty {}); server.abort(); }); }); }); } #[test] #[serial] fn stream_microphone_supersedes_the_previous_owner_cleanly() { let rt = tokio::runtime::Runtime::new().expect("runtime"); with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || { with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || { rt.block_on(async { let (_dir, handler) = build_handler_for_tests(); let (server, mut cli) = serve_handler(handler).await; let (first_tx, first_rx) = tokio::sync::mpsc::channel(1); let (_second_tx, second_rx) = tokio::sync::mpsc::channel(1); let mut first = cli .stream_microphone(tonic::Request::new( tokio_stream::wrappers::ReceiverStream::new(first_rx), )) .await .expect("first microphone stream") .into_inner(); let _second = cli .stream_microphone(tonic::Request::new( tokio_stream::wrappers::ReceiverStream::new(second_rx), )) .await .expect("second microphone stream supersedes first"); drop(first_tx); let ack = tokio::time::timeout(std::time::Duration::from_secs(1), first.message()) .await .expect("superseded microphone ack timeout") .expect("superseded microphone ack grpc") .expect("superseded microphone ack item"); assert_eq!(ack, Empty {}); server.abort(); }); }); }); } #[test] #[serial] fn stream_microphone_surfaces_internal_error_when_sink_open_fails() { let rt = tokio::runtime::Runtime::new().expect("runtime"); with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || { with_var("LESAVKA_TEST_FORCE_PIPELINE_START_ERROR", Some("1"), || { rt.block_on(async { let (_dir, handler) = build_handler_for_tests(); let (server, mut cli) = serve_handler(handler).await; let (_tx, rx) = tokio::sync::mpsc::channel(1); let err = cli .stream_microphone(tonic::Request::new( tokio_stream::wrappers::ReceiverStream::new(rx), )) .await .expect_err("missing sink should fail the stream"); assert_eq!(err.code(), tonic::Code::Internal); server.abort(); }); }); }); } #[test] #[serial] fn stream_camera_accepts_upstream_video_packets() { let rt = tokio::runtime::Runtime::new().expect("runtime"); with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || { with_var("LESAVKA_DISABLE_UVC", None::<&str>, || { with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || { rt.block_on(async { let (_dir, handler) = build_handler_for_tests(); let (server, mut cli) = serve_handler(handler).await; let (tx, rx) = tokio::sync::mpsc::channel(4); tx.send(VideoPacket { id: 2, pts: 54_321, data: vec![0, 0, 0, 1, 0x65, 0x88], ..Default::default() }) .await .expect("send synthetic upstream video"); drop(tx); let outbound = tokio_stream::wrappers::ReceiverStream::new(rx); let mut response = cli .stream_camera(tonic::Request::new(outbound)) .await .expect("camera stream should open"); let ack = tokio::time::timeout( std::time::Duration::from_secs(1), response.get_mut().message(), ) .await .expect("camera ack timeout") .expect("camera ack grpc") .expect("camera ack item"); assert_eq!(ack, Empty {}); server.abort(); }); }); }); }); } #[test] #[serial] fn stream_camera_waits_for_the_pairing_window_then_plays_with_audio() { let rt = tokio::runtime::Runtime::new().expect("runtime"); with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || { with_var("LESAVKA_DISABLE_UVC", None::<&str>, || { with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("80"), || { rt.block_on(async { let (_dir, handler) = build_handler_for_tests(); let (server, mut cli) = serve_handler(handler).await; let (video_tx, video_rx) = tokio::sync::mpsc::channel(4); let (audio_tx, audio_rx) = tokio::sync::mpsc::channel(4); let mut video_response = cli .stream_camera(tonic::Request::new( tokio_stream::wrappers::ReceiverStream::new(video_rx), )) .await .expect("camera stream should open") .into_inner(); let mut audio_response = cli .stream_microphone(tonic::Request::new( tokio_stream::wrappers::ReceiverStream::new(audio_rx), )) .await .expect("microphone stream should open") .into_inner(); video_tx .send(VideoPacket { id: 2, pts: 1_000_000, data: vec![0, 0, 0, 1, 0x65, 0x99], ..Default::default() }) .await .expect("send video packet"); tokio::time::sleep(std::time::Duration::from_millis(20)).await; audio_tx .send(AudioPacket { id: 0, pts: 1_000_000, data: vec![1, 2, 3, 4], }) .await .expect("send matching audio packet"); drop(video_tx); drop(audio_tx); let video_ack = tokio::time::timeout( std::time::Duration::from_secs(1), video_response.message(), ) .await .expect("camera ack timeout") .expect("camera ack grpc") .expect("camera ack item"); let audio_ack = tokio::time::timeout( std::time::Duration::from_secs(1), audio_response.message(), ) .await .expect("microphone ack timeout") .expect("microphone ack grpc") .expect("microphone ack item"); assert_eq!(video_ack, Empty {}); assert_eq!(audio_ack, Empty {}); server.abort(); }); }); }); }); } #[test] #[serial] fn stream_camera_stops_a_superseded_session_cleanly() { let rt = tokio::runtime::Runtime::new().expect("runtime"); with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || { with_var("LESAVKA_DISABLE_UVC", None::<&str>, || { with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || { rt.block_on(async { let (_dir, handler) = build_handler_for_tests(); let (server, mut cli) = serve_handler(handler).await; let (first_tx, first_rx) = tokio::sync::mpsc::channel(4); let (second_tx, second_rx) = tokio::sync::mpsc::channel(1); let mut first = cli .stream_camera(tonic::Request::new( tokio_stream::wrappers::ReceiverStream::new(first_rx), )) .await .expect("first camera stream") .into_inner(); let _second = cli .stream_camera(tonic::Request::new( tokio_stream::wrappers::ReceiverStream::new(second_rx), )) .await .expect("second camera stream supersedes first"); first_tx .send(VideoPacket { id: 2, pts: 99, data: vec![0, 0, 0, 1, 0x65], ..Default::default() }) .await .expect("send packet to first stream"); drop(first_tx); let ack = tokio::time::timeout( std::time::Duration::from_secs(1), first.message(), ) .await .expect("superseded camera ack timeout") .expect("superseded camera ack grpc") .expect("superseded camera ack item"); assert_eq!(ack, Empty {}); drop(second_tx); server.abort(); }); }); }); }); } #[test] #[serial] fn stream_microphone_drops_pre_overlap_audio_after_video_sets_the_pair_anchor() { let rt = tokio::runtime::Runtime::new().expect("runtime"); with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || { with_var("LESAVKA_DISABLE_UVC", None::<&str>, || { with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("80"), || { rt.block_on(async { let (_dir, handler) = build_handler_for_tests(); let (server, mut cli) = serve_handler(handler).await; let (audio_tx, audio_rx) = tokio::sync::mpsc::channel(4); let (video_tx, video_rx) = tokio::sync::mpsc::channel(4); let mut audio_response = cli .stream_microphone(tonic::Request::new( tokio_stream::wrappers::ReceiverStream::new(audio_rx), )) .await .expect("microphone stream should open") .into_inner(); let mut video_response = cli .stream_camera(tonic::Request::new( tokio_stream::wrappers::ReceiverStream::new(video_rx), )) .await .expect("camera stream should open") .into_inner(); audio_tx .send(AudioPacket { id: 0, pts: 1_000_000, data: vec![1, 2, 3, 4], }) .await .expect("send leading audio packet"); tokio::time::sleep(std::time::Duration::from_millis(20)).await; video_tx .send(VideoPacket { id: 2, pts: 1_300_000, data: vec![0, 0, 0, 1, 0x65, 0x88], ..Default::default() }) .await .expect("send anchor video packet"); audio_tx .send(AudioPacket { id: 0, pts: 1_310_000, data: vec![5, 6, 7, 8], }) .await .expect("send post-anchor audio packet"); drop(audio_tx); drop(video_tx); let audio_ack = tokio::time::timeout( std::time::Duration::from_secs(1), audio_response.message(), ) .await .expect("microphone ack timeout") .expect("microphone ack grpc") .expect("microphone ack item"); let video_ack = tokio::time::timeout( std::time::Duration::from_secs(1), video_response.message(), ) .await .expect("camera ack timeout") .expect("camera ack grpc") .expect("camera ack item"); assert_eq!(audio_ack, Empty {}); assert_eq!(video_ack, Empty {}); server.abort(); }); }); }); }); } #[test] #[serial] fn stream_camera_drops_pre_overlap_video_after_audio_sets_the_pair_anchor() { let rt = tokio::runtime::Runtime::new().expect("runtime"); with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || { with_var("LESAVKA_DISABLE_UVC", None::<&str>, || { with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("80"), || { rt.block_on(async { let (_dir, handler) = build_handler_for_tests(); let (server, mut cli) = serve_handler(handler).await; let (audio_tx, audio_rx) = tokio::sync::mpsc::channel(4); let (video_tx, video_rx) = tokio::sync::mpsc::channel(4); let mut audio_response = cli .stream_microphone(tonic::Request::new( tokio_stream::wrappers::ReceiverStream::new(audio_rx), )) .await .expect("microphone stream should open") .into_inner(); let mut video_response = cli .stream_camera(tonic::Request::new( tokio_stream::wrappers::ReceiverStream::new(video_rx), )) .await .expect("camera stream should open") .into_inner(); video_tx .send(VideoPacket { id: 2, pts: 1_000_000, data: vec![0, 0, 0, 1, 0x65, 0x77], ..Default::default() }) .await .expect("send leading video packet"); tokio::time::sleep(std::time::Duration::from_millis(20)).await; audio_tx .send(AudioPacket { id: 0, pts: 1_300_000, data: vec![1, 2, 3, 4], }) .await .expect("send anchor audio packet"); video_tx .send(VideoPacket { id: 2, pts: 1_310_000, data: vec![0, 0, 0, 1, 0x65, 0x88], ..Default::default() }) .await .expect("send post-anchor video packet"); drop(audio_tx); drop(video_tx); let audio_ack = tokio::time::timeout( std::time::Duration::from_secs(1), audio_response.message(), ) .await .expect("microphone ack timeout") .expect("microphone ack grpc") .expect("microphone ack item"); let video_ack = tokio::time::timeout( std::time::Duration::from_secs(1), video_response.message(), ) .await .expect("camera ack timeout") .expect("camera ack grpc") .expect("camera ack item"); assert_eq!(audio_ack, Empty {}); assert_eq!(video_ack, Empty {}); server.abort(); }); }); }); }); } #[test] #[serial] fn stream_microphone_drops_stale_packets_when_freshness_budget_is_zero() { let rt = tokio::runtime::Runtime::new().expect("runtime"); with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || { with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || { with_var("LESAVKA_UPSTREAM_STALE_DROP_MS", Some("0"), || { rt.block_on(async { let (_dir, handler) = build_handler_for_tests(); let (server, mut cli) = serve_handler(handler).await; let (tx, rx) = tokio::sync::mpsc::channel(4); tx.send(AudioPacket { id: 0, pts: 12_345, data: vec![1, 2, 3, 4, 5, 6], }) .await .expect("send stale synthetic upstream audio"); drop(tx); let outbound = tokio_stream::wrappers::ReceiverStream::new(rx); let mut response = cli .stream_microphone(tonic::Request::new(outbound)) .await .expect("microphone stream should open"); let ack = tokio::time::timeout( std::time::Duration::from_secs(1), response.get_mut().message(), ) .await .expect("microphone ack timeout") .expect("microphone ack grpc") .expect("microphone ack item"); assert_eq!(ack, Empty {}); server.abort(); }); }); }); }); } #[test] #[serial] fn stream_camera_drops_frames_that_never_reach_the_audio_master() { let rt = tokio::runtime::Runtime::new().expect("runtime"); with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || { with_var("LESAVKA_DISABLE_UVC", None::<&str>, || { with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("80"), || { rt.block_on(async { let (_dir, handler) = build_handler_for_tests(); let (server, mut cli) = serve_handler(handler).await; let (audio_tx, audio_rx) = tokio::sync::mpsc::channel(4); let (video_tx, video_rx) = tokio::sync::mpsc::channel(4); let mut audio_response = cli .stream_microphone(tonic::Request::new( tokio_stream::wrappers::ReceiverStream::new(audio_rx), )) .await .expect("microphone stream should open") .into_inner(); let mut video_response = cli .stream_camera(tonic::Request::new( tokio_stream::wrappers::ReceiverStream::new(video_rx), )) .await .expect("camera stream should open") .into_inner(); audio_tx .send(AudioPacket { id: 0, pts: 1_000_000, data: vec![1, 2, 3, 4], }) .await .expect("send first audio packet"); video_tx .send(VideoPacket { id: 2, pts: 1_050_000, data: vec![0, 0, 0, 1, 0x65, 0x55], ..Default::default() }) .await .expect("send unmatched video packet"); drop(audio_tx); drop(video_tx); let audio_ack = tokio::time::timeout( std::time::Duration::from_secs(1), audio_response.message(), ) .await .expect("microphone ack timeout") .expect("microphone ack grpc") .expect("microphone ack item"); let video_ack = tokio::time::timeout( std::time::Duration::from_secs(1), video_response.message(), ) .await .expect("camera ack timeout") .expect("camera ack grpc") .expect("camera ack item"); assert_eq!(audio_ack, Empty {}); assert_eq!(video_ack, Empty {}); server.abort(); }); }); }); }); } }