//! End-to-end server coverage for upstream media streams.
//!
//! Scope: run a local gRPC server and push synthetic client webcam/mic packets
//! through the public `StreamCamera` and `StreamMicrophone` RPCs.
//! Targets: `server/src/main.rs`, `server/src/audio.rs`, `server/src/video_sinks.rs`.
//! Why: local webcam/mic uplink should stay testable without physical UVC,
//! HDMI, or ALSA hardware in CI.

#[cfg(coverage)]
#[allow(warnings)]
mod server_upstream_media {
    // Pulls the server binary's source into this module so the tests can use
    // its items (`Handler`, `RelayServer`, packet types, ...) directly.
    include!(env!("LESAVKA_SERVER_MAIN_SRC"));

    use lesavka_common::lesavka::relay_client::RelayClient;
    use serial_test::serial;
    use temp_env::with_var;
    use tempfile::tempdir;
    use tonic::transport::Channel;

    /// Connects to the local tonic server at `addr`, retrying while it starts.
    ///
    /// Makes up to 40 attempts with a 25 ms pause after each failure.
    ///
    /// # Panics
    /// Panics if no attempt succeeds.
    async fn connect_with_retry(addr: std::net::SocketAddr) -> Channel {
        let endpoint = tonic::transport::Endpoint::from_shared(format!("http://{addr}"))
            .expect("endpoint")
            .tcp_nodelay(true);
        for _ in 0..40 {
            if let Ok(channel) = endpoint.clone().connect().await {
                return channel;
            }
            tokio::time::sleep(std::time::Duration::from_millis(25)).await;
        }
        panic!("failed to connect to local tonic server");
    }

    /// Builds a `Handler` whose keyboard/mouse files are plain files in a
    /// fresh temporary directory.
    ///
    /// The `TempDir` is returned alongside the handler so the caller keeps the
    /// backing files alive for the duration of the test.
    fn build_handler_for_tests() -> (tempfile::TempDir, Handler) {
        let dir = tempdir().expect("tempdir");
        let kb_path = dir.path().join("hidg0.bin");
        let ms_path = dir.path().join("hidg1.bin");
        // Create the files empty first; they are then reopened read/write.
        std::fs::write(&kb_path, []).expect("create kb file");
        std::fs::write(&ms_path, []).expect("create ms file");
        let kb = tokio::fs::File::from_std(
            std::fs::OpenOptions::new()
                .read(true)
                .write(true)
                .open(&kb_path)
                .expect("open kb"),
        );
        let ms = tokio::fs::File::from_std(
            std::fs::OpenOptions::new()
                .read(true)
                .write(true)
                .open(&ms_path)
                .expect("open ms"),
        );
        (
            dir,
            Handler {
                kb: std::sync::Arc::new(tokio::sync::Mutex::new(Some(kb))),
                ms: std::sync::Arc::new(tokio::sync::Mutex::new(Some(ms))),
                gadget: UsbGadget::new("lesavka"),
                did_cycle: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
                camera_rt: std::sync::Arc::new(CameraRuntime::new()),
                upstream_media_rt: std::sync::Arc::new(UpstreamMediaRuntime::new()),
                // NOTE(review): the calibration store gets its own
                // `UpstreamMediaRuntime`, not the `upstream_media_rt` above;
                // confirm this is intentional.
                calibration: std::sync::Arc::new(CalibrationStore::load(std::sync::Arc::new(
                    UpstreamMediaRuntime::new(),
                ))),
                capture_power: CapturePowerManager::new(),
                eye_hubs: std::sync::Arc::new(tokio::sync::Mutex::new(
                    std::collections::HashMap::new(),
                )),
            },
        )
    }

    /// Serves `handler` on an ephemeral loopback port and returns the server
    /// task together with a connected client.
    ///
    /// The caller is expected to `abort()` the returned task when done.
    async fn serve_handler(
        handler: Handler,
    ) -> (tokio::task::JoinHandle<()>, RelayClient<Channel>) {
        // Bind to port 0 to learn a free port, then release it so tonic can
        // bind the same address.
        // NOTE(review): another process could claim the port between the
        // `drop` and `serve`; `connect_with_retry` would then panic.
        let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("bind");
        let addr = listener.local_addr().expect("addr");
        drop(listener);
        let server = tokio::spawn(async move {
            let _ = tonic::transport::Server::builder()
                .add_service(RelayServer::new(handler))
                .serve(addr)
                .await;
        });
        let channel = connect_with_retry(addr).await;
        (server, RelayClient::new(channel))
    }

    /// Sends one synthetic audio packet over `StreamMicrophone`, closes the
    /// request stream, and expects an `Empty` ack within one second.
    #[test]
    #[serial]
    fn stream_microphone_accepts_upstream_audio_packets() {
        let rt = tokio::runtime::Runtime::new().expect("runtime");
        with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || {
            with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || {
                rt.block_on(async {
                    let (_dir, handler) = build_handler_for_tests();
                    let (server, mut cli) = serve_handler(handler).await;
                    let (tx, rx) = tokio::sync::mpsc::channel(4);
                    tx.send(AudioPacket {
                        id: 0,
                        pts: 12_345,
                        data: vec![1, 2, 3, 4, 5, 6],
                        ..AudioPacket::default()
                    })
                    .await
                    .expect("send synthetic upstream audio");
                    // Dropping the sender ends the outbound request stream.
                    drop(tx);
                    let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
                    let mut response = cli
                        .stream_microphone(tonic::Request::new(outbound))
                        .await
                        .expect("microphone stream should open");
                    let ack = tokio::time::timeout(
                        std::time::Duration::from_secs(1),
                        response.get_mut().message(),
                    )
                    .await
                    .expect("microphone ack timeout")
                    .expect("microphone ack grpc")
                    .expect("microphone ack item");
                    assert_eq!(ack, Empty {});
                    server.abort();
                });
            });
        });
    }

    /// Opens two microphone streams back to back and expects the first to
    /// still yield an `Empty` ack once its sender is dropped.
    #[test]
    #[serial]
    fn stream_microphone_supersedes_the_previous_owner_cleanly() {
        let rt = tokio::runtime::Runtime::new().expect("runtime");
        with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || {
            with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || {
                rt.block_on(async {
                    let (_dir, handler) = build_handler_for_tests();
                    let (server, mut cli) = serve_handler(handler).await;
                    let (first_tx, first_rx) = tokio::sync::mpsc::channel(1);
                    // `_second_tx` is a named binding, so the second request
                    // stream stays open until the end of this scope.
                    let (_second_tx, second_rx) = tokio::sync::mpsc::channel(1);
                    let mut first = cli
                        .stream_microphone(tonic::Request::new(
                            tokio_stream::wrappers::ReceiverStream::new(first_rx),
                        ))
                        .await
                        .expect("first microphone stream")
                        .into_inner();
                    let _second = cli
                        .stream_microphone(tonic::Request::new(
                            tokio_stream::wrappers::ReceiverStream::new(second_rx),
                        ))
                        .await
                        .expect("second microphone stream supersedes first");
                    drop(first_tx);
                    let ack =
                        tokio::time::timeout(std::time::Duration::from_secs(1), first.message())
                            .await
                            .expect("superseded microphone ack timeout")
                            .expect("superseded microphone ack grpc")
                            .expect("superseded microphone ack item");
                    assert_eq!(ack, Empty {});
                    server.abort();
                });
            });
        });
    }

    /// With `LESAVKA_TEST_FORCE_PIPELINE_START_ERROR=1`, expects opening the
    /// microphone stream to fail with gRPC code `Internal`.
    #[test]
    #[serial]
    fn stream_microphone_surfaces_internal_error_when_sink_open_fails() {
        let rt = tokio::runtime::Runtime::new().expect("runtime");
        with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || {
            with_var("LESAVKA_TEST_FORCE_PIPELINE_START_ERROR", Some("1"), || {
                rt.block_on(async {
                    let (_dir, handler) = build_handler_for_tests();
                    let (server, mut cli) = serve_handler(handler).await;
                    let (_tx, rx) = tokio::sync::mpsc::channel(1);
                    let err = cli
                        .stream_microphone(tonic::Request::new(
                            tokio_stream::wrappers::ReceiverStream::new(rx),
                        ))
                        .await
                        .expect_err("missing sink should fail the stream");
                    assert_eq!(err.code(), tonic::Code::Internal);
                    server.abort();
                });
            });
        });
    }

    /// Sends one synthetic video packet over `StreamCamera`, closes the
    /// request stream, and expects an `Empty` ack within one second.
    ///
    /// `LESAVKA_DISABLE_UVC` is explicitly unset for the duration of the test.
    #[test]
    #[serial]
    fn stream_camera_accepts_upstream_video_packets() {
        let rt = tokio::runtime::Runtime::new().expect("runtime");
        with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || {
            with_var("LESAVKA_DISABLE_UVC", None::<&str>, || {
                with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || {
                    rt.block_on(async {
                        let (_dir, handler) = build_handler_for_tests();
                        let (server, mut cli) = serve_handler(handler).await;
                        let (tx, rx) = tokio::sync::mpsc::channel(4);
                        tx.send(VideoPacket {
                            id: 2,
                            pts: 54_321,
                            data: vec![0, 0, 0, 1, 0x65, 0x88],
                            ..Default::default()
                        })
                        .await
                        .expect("send synthetic upstream video");
                        // Dropping the sender ends the outbound request stream.
                        drop(tx);
                        let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
                        let mut response = cli
                            .stream_camera(tonic::Request::new(outbound))
                            .await
                            .expect("camera stream should open");
                        let ack = tokio::time::timeout(
                            std::time::Duration::from_secs(1),
                            response.get_mut().message(),
                        )
                        .await
                        .expect("camera ack timeout")
                        .expect("camera ack grpc")
                        .expect("camera ack item");
                        assert_eq!(ack, Empty {});
                        server.abort();
                    });
                });
            });
        });
    }

    /// Runs camera and microphone streams side by side with an 80 ms playout
    /// delay configured, sends one packet on each with matching `pts`, and
    /// expects both streams to ack within one second of being closed.
    #[test]
    #[serial]
    fn stream_camera_waits_for_the_pairing_window_then_plays_with_audio() {
        let rt = tokio::runtime::Runtime::new().expect("runtime");
        with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || {
            with_var("LESAVKA_DISABLE_UVC", None::<&str>, || {
                with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("80"), || {
                    rt.block_on(async {
                        let (_dir, handler) = build_handler_for_tests();
                        let (server, mut cli) = serve_handler(handler).await;
                        let (video_tx, video_rx) = tokio::sync::mpsc::channel(4);
                        let (audio_tx, audio_rx) = tokio::sync::mpsc::channel(4);
                        let mut video_response = cli
                            .stream_camera(tonic::Request::new(
                                tokio_stream::wrappers::ReceiverStream::new(video_rx),
                            ))
                            .await
                            .expect("camera stream should open")
                            .into_inner();
                        let mut audio_response = cli
                            .stream_microphone(tonic::Request::new(
                                tokio_stream::wrappers::ReceiverStream::new(audio_rx),
                            ))
                            .await
                            .expect("microphone stream should open")
                            .into_inner();
                        video_tx
                            .send(VideoPacket {
                                id: 2,
                                pts: 1_000_000,
                                data: vec![0, 0, 0, 1, 0x65, 0x99],
                                ..Default::default()
                            })
                            .await
                            .expect("send video packet");
                        // The audio packet is sent 20 ms after the video one,
                        // well inside the configured 80 ms delay.
                        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
                        audio_tx
                            .send(AudioPacket {
                                id: 0,
                                pts: 1_000_000,
                                data: vec![1, 2, 3, 4],
                                ..AudioPacket::default()
                            })
                            .await
                            .expect("send matching audio packet");
                        drop(video_tx);
                        drop(audio_tx);
                        let video_ack = tokio::time::timeout(
                            std::time::Duration::from_secs(1),
                            video_response.message(),
                        )
                        .await
                        .expect("camera ack timeout")
                        .expect("camera ack grpc")
                        .expect("camera ack item");
                        let audio_ack = tokio::time::timeout(
                            std::time::Duration::from_secs(1),
                            audio_response.message(),
                        )
                        .await
                        .expect("microphone ack timeout")
                        .expect("microphone ack grpc")
                        .expect("microphone ack item");
                        assert_eq!(video_ack, Empty {});
                        assert_eq!(audio_ack, Empty {});
                        server.abort();
                    });
                });
            });
        });
    }

    /// Opens two camera streams back to back, sends a packet on the first
    /// after the second is open, and expects the first to still yield an
    /// `Empty` ack once its sender is dropped.
    #[test]
    #[serial]
    fn stream_camera_stops_a_superseded_session_cleanly() {
        let rt = tokio::runtime::Runtime::new().expect("runtime");
        with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || {
            with_var("LESAVKA_DISABLE_UVC", None::<&str>, || {
                with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || {
                    rt.block_on(async {
                        let (_dir, handler) = build_handler_for_tests();
                        let (server, mut cli) = serve_handler(handler).await;
                        let (first_tx, first_rx) = tokio::sync::mpsc::channel(4);
                        let (second_tx, second_rx) = tokio::sync::mpsc::channel(1);
                        let mut first = cli
                            .stream_camera(tonic::Request::new(
                                tokio_stream::wrappers::ReceiverStream::new(first_rx),
                            ))
                            .await
                            .expect("first camera stream")
                            .into_inner();
                        let _second = cli
                            .stream_camera(tonic::Request::new(
                                tokio_stream::wrappers::ReceiverStream::new(second_rx),
                            ))
                            .await
                            .expect("second camera stream supersedes first");
                        first_tx
                            .send(VideoPacket {
                                id: 2,
                                pts: 99,
                                data: vec![0, 0, 0, 1, 0x65],
                                ..Default::default()
                            })
                            .await
                            .expect("send packet to first stream");
                        drop(first_tx);
                        let ack = tokio::time::timeout(
                            std::time::Duration::from_secs(1),
                            first.message(),
                        )
                        .await
                        .expect("superseded camera ack timeout")
                        .expect("superseded camera ack grpc")
                        .expect("superseded camera ack item");
                        assert_eq!(ack, Empty {});
                        // The second request stream is kept open until the
                        // first stream's ack has been checked.
                        drop(second_tx);
                        server.abort();
                    });
                });
            });
        });
    }
}