//! End-to-end server coverage for upstream media pairing and freshness. //! //! Scope: run a local gRPC server and verify webcam/mic packet pairing behavior. //! Targets: `server/src/main.rs`, `server/src/upstream_media_runtime.rs`. //! Why: MJPEG lip sync depends on keeping late/early packet decisions stable //! while streams start, stop, or temporarily lose their pair. #[cfg(coverage)] #[allow(warnings)] mod server_upstream_media_pairing { include!(env!("LESAVKA_SERVER_MAIN_SRC")); use lesavka_common::lesavka::relay_client::RelayClient; use serial_test::serial; use temp_env::with_var; use tempfile::tempdir; use tonic::transport::Channel; async fn connect_with_retry(addr: std::net::SocketAddr) -> Channel { let endpoint = tonic::transport::Endpoint::from_shared(format!("http://{addr}")) .expect("endpoint") .tcp_nodelay(true); for _ in 0..40 { if let Ok(channel) = endpoint.clone().connect().await { return channel; } tokio::time::sleep(std::time::Duration::from_millis(25)).await; } panic!("failed to connect to local tonic server"); } fn build_handler_for_tests() -> (tempfile::TempDir, Handler) { let dir = tempdir().expect("tempdir"); let kb_path = dir.path().join("hidg0.bin"); let ms_path = dir.path().join("hidg1.bin"); std::fs::write(&kb_path, []).expect("create kb file"); std::fs::write(&ms_path, []).expect("create ms file"); let kb = tokio::fs::File::from_std( std::fs::OpenOptions::new() .read(true) .write(true) .open(&kb_path) .expect("open kb"), ); let ms = tokio::fs::File::from_std( std::fs::OpenOptions::new() .read(true) .write(true) .open(&ms_path) .expect("open ms"), ); ( dir, Handler { kb: std::sync::Arc::new(tokio::sync::Mutex::new(Some(kb))), ms: std::sync::Arc::new(tokio::sync::Mutex::new(Some(ms))), gadget: UsbGadget::new("lesavka"), did_cycle: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)), camera_rt: std::sync::Arc::new(CameraRuntime::new()), upstream_media_rt: std::sync::Arc::new(UpstreamMediaRuntime::new()), calibration: std::sync::Arc::new(CalibrationStore::load(std::sync::Arc::new( UpstreamMediaRuntime::new(), ))), capture_power: CapturePowerManager::new(), eye_hubs: std::sync::Arc::new(tokio::sync::Mutex::new( std::collections::HashMap::new(), )), }, ) } async fn serve_handler( handler: Handler, ) -> ( tokio::task::JoinHandle<()>, RelayClient, ) { let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("bind"); let addr = listener.local_addr().expect("addr"); drop(listener); let server = tokio::spawn(async move { let _ = tonic::transport::Server::builder() .add_service(RelayServer::new(handler)) .serve(addr) .await; }); let channel = connect_with_retry(addr).await; (server, RelayClient::new(channel)) } #[test] #[serial] fn stream_microphone_drops_stale_packets_when_freshness_budget_is_zero() { let rt = tokio::runtime::Runtime::new().expect("runtime"); with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || { with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || { with_var("LESAVKA_UPSTREAM_STALE_DROP_MS", Some("0"), || { rt.block_on(async { let (_dir, handler) = build_handler_for_tests(); let (server, mut cli) = serve_handler(handler).await; let (tx, rx) = tokio::sync::mpsc::channel(4); tx.send(AudioPacket { id: 0, pts: 12_345, data: vec![1, 2, 3, 4, 5, 6], ..AudioPacket::default() }) .await .expect("send stale synthetic upstream audio"); drop(tx); let outbound = tokio_stream::wrappers::ReceiverStream::new(rx); let mut response = cli .stream_microphone(tonic::Request::new(outbound)) .await .expect("microphone stream should open"); let ack = tokio::time::timeout( std::time::Duration::from_secs(1), response.get_mut().message(), ) .await .expect("microphone ack timeout") .expect("microphone ack grpc") .expect("microphone ack item"); assert_eq!(ack, Empty {}); server.abort(); }); }); }); }); } #[test] #[serial] fn stream_camera_drops_frames_that_never_reach_the_audio_master() { let rt = tokio::runtime::Runtime::new().expect("runtime"); with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || { with_var("LESAVKA_DISABLE_UVC", None::<&str>, || { with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("80"), || { with_var( "LESAVKA_UPSTREAM_CAMERA_STARTUP_GRACE_US", Some("0"), || { rt.block_on(async { let (_dir, handler) = build_handler_for_tests(); let (server, mut cli) = serve_handler(handler).await; let (audio_tx, audio_rx) = tokio::sync::mpsc::channel(4); let (video_tx, video_rx) = tokio::sync::mpsc::channel(4); let mut audio_response = cli .stream_microphone(tonic::Request::new( tokio_stream::wrappers::ReceiverStream::new(audio_rx), )) .await .expect("microphone stream should open") .into_inner(); let mut video_response = cli .stream_camera(tonic::Request::new( tokio_stream::wrappers::ReceiverStream::new(video_rx), )) .await .expect("camera stream should open") .into_inner(); audio_tx .send(AudioPacket { id: 0, pts: 1_000_000, data: vec![1, 2, 3, 4], ..AudioPacket::default() }) .await .expect("send first audio packet"); video_tx .send(VideoPacket { id: 2, pts: 1_050_000, data: vec![0, 0, 0, 1, 0x65, 0x55], ..Default::default() }) .await .expect("send unmatched video packet"); drop(video_tx); let video_ack = tokio::time::timeout( std::time::Duration::from_secs(1), video_response.message(), ) .await .expect("camera ack timeout") .expect("camera ack grpc") .expect("camera ack item"); assert_eq!(video_ack, Empty {}); drop(audio_tx); let audio_ack = tokio::time::timeout( std::time::Duration::from_secs(1), audio_response.message(), ) .await .expect("microphone ack timeout") .expect("microphone ack grpc") .expect("microphone ack item"); assert_eq!(audio_ack, Empty {}); server.abort(); }); }, ); }); }); }); } }