lesavka/testing/tests/server_upstream_media_video_contract.rs

197 lines
8.4 KiB
Rust

//! End-to-end server coverage for upstream webcam video streams.
//!
//! Scope: run a local gRPC server and push synthetic client video packets
//! through the public `StreamCamera` RPC.
//! Targets: `server/src/main.rs`, `server/src/video_sinks.rs`.
//! Why: upstream video needs freshness-first delivery while still waiting for
//! an audio timing master before playout.
#[cfg(coverage)]
#[allow(warnings)]
mod server_upstream_media_video {
include!(env!("LESAVKA_SERVER_MAIN_SRC"));
include!("../support/server_upstream_media_harness.rs");
use serial_test::serial;
use temp_env::with_var;
/// Smoke test for the `StreamCamera` RPC: a single synthetic video packet is
/// pushed upstream and the server must answer with an `Empty` ack.
///
/// For the duration of the test `LESAVKA_CAPTURE_POWER_UNIT` is set to `none`,
/// `LESAVKA_DISABLE_UVC` is unset and `LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS` is
/// set to `0`. Marked `#[serial]`, presumably because these variables are
/// process-wide.
///
/// # Panics
/// Panics (fails the test) if the stream cannot be opened, the packet cannot
/// be queued, or no ack arrives within one second.
#[test]
#[serial]
fn stream_camera_accepts_upstream_video_packets() {
    use std::time::Duration;

    let runtime = tokio::runtime::Runtime::new().expect("runtime");
    with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || {
        with_var("LESAVKA_DISABLE_UVC", None::<&str>, || {
            with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || {
                runtime.block_on(async {
                    // `_dir` stays bound so whatever the harness returned
                    // there lives until the end of the scenario.
                    let (_dir, handler) = build_handler_for_tests();
                    let (server, mut client) = serve_handler(handler).await;

                    // Client-side request stream backed by a small channel.
                    let (packet_tx, packet_rx) = tokio::sync::mpsc::channel(4);
                    let request = tonic::Request::new(
                        tokio_stream::wrappers::ReceiverStream::new(packet_rx),
                    );
                    let mut acks = client
                        .stream_camera(request)
                        .await
                        .expect("camera stream should open")
                        .into_inner();

                    // NOTE(review): presumably gives the server time to set up
                    // the session before the first packet arrives; confirm.
                    let settle = Duration::from_millis(35);
                    tokio::time::sleep(settle).await;

                    // NOTE(review): payload looks like an Annex-B start code
                    // followed by an IDR NAL header byte; confirm intent.
                    let packet = VideoPacket {
                        id: 2,
                        pts: 54_321,
                        data: vec![0, 0, 0, 1, 0x65, 0x88],
                        ..Default::default()
                    };
                    packet_tx
                        .send(packet)
                        .await
                        .expect("send synthetic upstream video");
                    // Dropping the sender ends the request stream.
                    drop(packet_tx);

                    let ack_deadline = Duration::from_secs(1);
                    let ack = tokio::time::timeout(ack_deadline, acks.message())
                        .await
                        .expect("camera ack timeout")
                        .expect("camera ack grpc")
                        .expect("camera ack item");
                    assert_eq!(ack, Empty {});

                    server.abort();
                });
            });
        });
    });
}
/// Exercises `StreamCamera` together with `StreamMicrophone` while
/// `LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS` is set to `80`.
///
/// One video packet is sent first; 20 ms later an audio packet carrying the
/// same `pts` value follows. Both request streams are then closed and each
/// RPC must answer with an `Empty` ack within one second.
///
/// `LESAVKA_CAPTURE_POWER_UNIT` is set to `none` and `LESAVKA_DISABLE_UVC` is
/// unset for the duration of the test. Marked `#[serial]`, presumably because
/// these variables are process-wide.
///
/// # Panics
/// Panics (fails the test) if either stream cannot be opened, a packet cannot
/// be queued, or an ack is missing or late.
#[test]
#[serial]
fn stream_camera_waits_for_the_pairing_window_then_plays_with_audio() {
    use std::time::Duration;

    let runtime = tokio::runtime::Runtime::new().expect("runtime");
    with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || {
        with_var("LESAVKA_DISABLE_UVC", None::<&str>, || {
            with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("80"), || {
                runtime.block_on(async {
                    // `_dir` stays bound so whatever the harness returned
                    // there lives until the end of the scenario.
                    let (_dir, handler) = build_handler_for_tests();
                    let (server, mut client) = serve_handler(handler).await;
                    let ack_deadline = Duration::from_secs(1);

                    // One channel-backed request stream per media kind.
                    let (camera_tx, camera_rx) = tokio::sync::mpsc::channel(4);
                    let (mic_tx, mic_rx) = tokio::sync::mpsc::channel(4);

                    let camera_request = tonic::Request::new(
                        tokio_stream::wrappers::ReceiverStream::new(camera_rx),
                    );
                    let mut camera_acks = client
                        .stream_camera(camera_request)
                        .await
                        .expect("camera stream should open")
                        .into_inner();

                    let mic_request = tonic::Request::new(
                        tokio_stream::wrappers::ReceiverStream::new(mic_rx),
                    );
                    let mut mic_acks = client
                        .stream_microphone(mic_request)
                        .await
                        .expect("microphone stream should open")
                        .into_inner();

                    // Video goes first, so it is in flight before any audio
                    // has been sent.
                    let video_packet = VideoPacket {
                        id: 2,
                        pts: 1_000_000,
                        data: vec![0, 0, 0, 1, 0x65, 0x99],
                        ..Default::default()
                    };
                    camera_tx
                        .send(video_packet)
                        .await
                        .expect("send video packet");

                    // The audio packet with the matching `pts` follows after
                    // a short gap.
                    let audio_lag = Duration::from_millis(20);
                    tokio::time::sleep(audio_lag).await;
                    let audio_packet = AudioPacket {
                        id: 0,
                        pts: 1_000_000,
                        data: vec![1, 2, 3, 4],
                        ..AudioPacket::default()
                    };
                    mic_tx
                        .send(audio_packet)
                        .await
                        .expect("send matching audio packet");

                    // Dropping the senders ends both request streams.
                    drop(camera_tx);
                    drop(mic_tx);

                    let video_ack = tokio::time::timeout(ack_deadline, camera_acks.message())
                        .await
                        .expect("camera ack timeout")
                        .expect("camera ack grpc")
                        .expect("camera ack item");
                    let audio_ack = tokio::time::timeout(ack_deadline, mic_acks.message())
                        .await
                        .expect("microphone ack timeout")
                        .expect("microphone ack grpc")
                        .expect("microphone ack item");

                    assert_eq!(video_ack, Empty {});
                    assert_eq!(audio_ack, Empty {});

                    server.abort();
                });
            });
        });
    });
}
/// Opens two `StreamCamera` calls back to back on the same client, then sends
/// one packet on the first stream and closes it.
///
/// The first call must still answer with an `Empty` ack within one second,
/// even though a second call was opened after it (the test's own messages
/// describe the second call as superseding the first). The second stream is
/// kept open, with no packets sent, until the assertion has run.
///
/// `LESAVKA_CAPTURE_POWER_UNIT` is set to `none`, `LESAVKA_DISABLE_UVC` is
/// unset and `LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS` is set to `0` for the
/// duration of the test. Marked `#[serial]`, presumably because these
/// variables are process-wide.
///
/// # Panics
/// Panics (fails the test) if either stream cannot be opened, the packet
/// cannot be queued, or the first stream's ack is missing or late.
#[test]
#[serial]
fn stream_camera_stops_a_superseded_session_cleanly() {
    use std::time::Duration;

    let runtime = tokio::runtime::Runtime::new().expect("runtime");
    with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || {
        with_var("LESAVKA_DISABLE_UVC", None::<&str>, || {
            with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || {
                runtime.block_on(async {
                    // `_dir` stays bound so whatever the harness returned
                    // there lives until the end of the scenario.
                    let (_dir, handler) = build_handler_for_tests();
                    let (server, mut client) = serve_handler(handler).await;

                    let (old_tx, old_rx) = tokio::sync::mpsc::channel(4);
                    let (new_tx, new_rx) = tokio::sync::mpsc::channel(1);

                    let old_request = tonic::Request::new(
                        tokio_stream::wrappers::ReceiverStream::new(old_rx),
                    );
                    let mut old_acks = client
                        .stream_camera(old_request)
                        .await
                        .expect("first camera stream")
                        .into_inner();

                    // The named binding keeps the second response alive until
                    // the end of the scenario.
                    let new_request = tonic::Request::new(
                        tokio_stream::wrappers::ReceiverStream::new(new_rx),
                    );
                    let _replacement = client
                        .stream_camera(new_request)
                        .await
                        .expect("second camera stream supersedes first");

                    // Send on the first stream only after the second is open.
                    let stale_packet = VideoPacket {
                        id: 2,
                        pts: 99,
                        data: vec![0, 0, 0, 1, 0x65],
                        ..Default::default()
                    };
                    old_tx
                        .send(stale_packet)
                        .await
                        .expect("send packet to first stream");
                    // Dropping the sender ends the first request stream.
                    drop(old_tx);

                    let ack_deadline = Duration::from_secs(1);
                    let ack = tokio::time::timeout(ack_deadline, old_acks.message())
                        .await
                        .expect("superseded camera ack timeout")
                        .expect("superseded camera ack grpc")
                        .expect("superseded camera ack item");
                    assert_eq!(ack, Empty {});

                    // Close the second request stream only after the first
                    // stream's ack has been checked.
                    drop(new_tx);
                    server.abort();
                });
            });
        });
    });
}
}