//! End-to-end server coverage for upstream microphone media streams. //! //! Scope: run a local gRPC server and push synthetic client microphone packets //! through the public `StreamMicrophone` RPC. //! Targets: `server/src/main.rs`, `server/src/audio.rs`. //! Why: upstream audio should surface sink failures, supersession, and normal //! packet delivery without requiring ALSA hardware in CI. #[cfg(coverage)] #[allow(warnings)] mod server_upstream_media_audio { include!(env!("LESAVKA_SERVER_MAIN_SRC")); include!("../support/server_upstream_media_harness.rs"); use serial_test::serial; use temp_env::with_var; #[test] #[serial] fn stream_microphone_accepts_upstream_audio_packets() { let rt = tokio::runtime::Runtime::new().expect("runtime"); with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || { with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || { rt.block_on(async { let (_dir, handler) = build_handler_for_tests(); let (server, mut cli) = serve_handler(handler).await; let (tx, rx) = tokio::sync::mpsc::channel(4); let outbound = tokio_stream::wrappers::ReceiverStream::new(rx); let mut response = cli .stream_microphone(tonic::Request::new(outbound)) .await .expect("microphone stream should open"); tokio::time::sleep(std::time::Duration::from_millis(35)).await; tx.send(AudioPacket { id: 0, pts: 12_345, data: vec![1, 2, 3, 4, 5, 6], ..AudioPacket::default() }) .await .expect("send synthetic upstream audio"); drop(tx); let ack = tokio::time::timeout( std::time::Duration::from_secs(1), response.get_mut().message(), ) .await .expect("microphone ack timeout") .expect("microphone ack grpc") .expect("microphone ack item"); assert_eq!(ack, Empty {}); server.abort(); }); }); }); } #[test] #[serial] fn stream_microphone_supersedes_the_previous_owner_cleanly() { let rt = tokio::runtime::Runtime::new().expect("runtime"); with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || { with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || { rt.block_on(async { let (_dir, handler) = build_handler_for_tests(); let (server, mut cli) = serve_handler(handler).await; let (first_tx, first_rx) = tokio::sync::mpsc::channel(1); let (_second_tx, second_rx) = tokio::sync::mpsc::channel(1); let mut first = cli .stream_microphone(tonic::Request::new( tokio_stream::wrappers::ReceiverStream::new(first_rx), )) .await .expect("first microphone stream") .into_inner(); let _second = cli .stream_microphone(tonic::Request::new( tokio_stream::wrappers::ReceiverStream::new(second_rx), )) .await .expect("second microphone stream supersedes first"); drop(first_tx); let ack = tokio::time::timeout(std::time::Duration::from_secs(1), first.message()) .await .expect("superseded microphone ack timeout") .expect("superseded microphone ack grpc") .expect("superseded microphone ack item"); assert_eq!(ack, Empty {}); server.abort(); }); }); }); } #[test] #[serial] fn stream_microphone_surfaces_internal_error_when_sink_open_fails() { let rt = tokio::runtime::Runtime::new().expect("runtime"); with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || { with_var("LESAVKA_TEST_FORCE_PIPELINE_START_ERROR", Some("1"), || { rt.block_on(async { let (_dir, handler) = build_handler_for_tests(); let (server, mut cli) = serve_handler(handler).await; let (_tx, rx) = tokio::sync::mpsc::channel(1); let err = cli .stream_microphone(tonic::Request::new( tokio_stream::wrappers::ReceiverStream::new(rx), )) .await .expect_err("missing sink should fail the stream"); assert_eq!(err.code(), tonic::Code::Internal); server.abort(); }); }); }); } }