381 lines
16 KiB
Rust
381 lines
16 KiB
Rust
//! End-to-end server coverage for upstream media streams.
|
|
//!
|
|
//! Scope: run a local gRPC server and push synthetic client webcam/mic packets
|
|
//! through the public `StreamCamera` and `StreamMicrophone` RPCs.
|
|
//! Targets: `server/src/main.rs`, `server/src/audio.rs`, `server/src/video_sinks.rs`.
|
|
//! Why: local webcam/mic uplink should stay testable without physical UVC,
|
|
//! HDMI, or ALSA hardware in CI.
|
|
|
|
#[cfg(coverage)]
|
|
#[allow(warnings)]
|
|
mod server_upstream_media {
|
|
include!(env!("LESAVKA_SERVER_MAIN_SRC"));
|
|
|
|
use lesavka_common::lesavka::relay_client::RelayClient;
|
|
use serial_test::serial;
|
|
use temp_env::with_var;
|
|
use tempfile::tempdir;
|
|
use tonic::transport::Channel;
|
|
|
|
async fn connect_with_retry(addr: std::net::SocketAddr) -> Channel {
|
|
let endpoint = tonic::transport::Endpoint::from_shared(format!("http://{addr}"))
|
|
.expect("endpoint")
|
|
.tcp_nodelay(true);
|
|
for _ in 0..40 {
|
|
if let Ok(channel) = endpoint.clone().connect().await {
|
|
return channel;
|
|
}
|
|
tokio::time::sleep(std::time::Duration::from_millis(25)).await;
|
|
}
|
|
panic!("failed to connect to local tonic server");
|
|
}
|
|
|
|
fn build_handler_for_tests() -> (tempfile::TempDir, Handler) {
|
|
let dir = tempdir().expect("tempdir");
|
|
let kb_path = dir.path().join("hidg0.bin");
|
|
let ms_path = dir.path().join("hidg1.bin");
|
|
std::fs::write(&kb_path, []).expect("create kb file");
|
|
std::fs::write(&ms_path, []).expect("create ms file");
|
|
|
|
let kb = tokio::fs::File::from_std(
|
|
std::fs::OpenOptions::new()
|
|
.read(true)
|
|
.write(true)
|
|
.open(&kb_path)
|
|
.expect("open kb"),
|
|
);
|
|
let ms = tokio::fs::File::from_std(
|
|
std::fs::OpenOptions::new()
|
|
.read(true)
|
|
.write(true)
|
|
.open(&ms_path)
|
|
.expect("open ms"),
|
|
);
|
|
|
|
(
|
|
dir,
|
|
Handler {
|
|
kb: std::sync::Arc::new(tokio::sync::Mutex::new(Some(kb))),
|
|
ms: std::sync::Arc::new(tokio::sync::Mutex::new(Some(ms))),
|
|
gadget: UsbGadget::new("lesavka"),
|
|
did_cycle: std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
|
|
camera_rt: std::sync::Arc::new(CameraRuntime::new()),
|
|
upstream_media_rt: std::sync::Arc::new(UpstreamMediaRuntime::new()),
|
|
calibration: std::sync::Arc::new(CalibrationStore::load(std::sync::Arc::new(
|
|
UpstreamMediaRuntime::new(),
|
|
))),
|
|
capture_power: CapturePowerManager::new(),
|
|
eye_hubs: std::sync::Arc::new(tokio::sync::Mutex::new(
|
|
std::collections::HashMap::new(),
|
|
)),
|
|
},
|
|
)
|
|
}
|
|
|
|
async fn serve_handler(
|
|
handler: Handler,
|
|
) -> (
|
|
tokio::task::JoinHandle<()>,
|
|
RelayClient<tonic::transport::Channel>,
|
|
) {
|
|
let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("bind");
|
|
let addr = listener.local_addr().expect("addr");
|
|
drop(listener);
|
|
|
|
let server = tokio::spawn(async move {
|
|
let _ = tonic::transport::Server::builder()
|
|
.add_service(RelayServer::new(handler))
|
|
.serve(addr)
|
|
.await;
|
|
});
|
|
let channel = connect_with_retry(addr).await;
|
|
(server, RelayClient::new(channel))
|
|
}
|
|
|
|
/// Pushes one synthetic audio packet through `StreamMicrophone` and expects a
/// single `Empty` acknowledgement within one second.
///
/// The packet is queued and the sender dropped before the RPC is opened, so
/// the request stream handed to the client is already finite.
#[test]
#[serial]
fn stream_microphone_accepts_upstream_audio_packets() {
    use std::time::Duration;
    use tokio_stream::wrappers::ReceiverStream;

    let rt = tokio::runtime::Runtime::new().expect("runtime");
    with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || {
        with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || {
            rt.block_on(async {
                let (_dir, handler) = build_handler_for_tests();
                let (server_task, mut client) = serve_handler(handler).await;

                // Queue the only packet, then close the sending half.
                let (packet_tx, packet_rx) = tokio::sync::mpsc::channel(4);
                let packet = AudioPacket {
                    id: 0,
                    pts: 12_345,
                    data: vec![1, 2, 3, 4, 5, 6],
                    ..AudioPacket::default()
                };
                packet_tx
                    .send(packet)
                    .await
                    .expect("send synthetic upstream audio");
                drop(packet_tx);

                let request = tonic::Request::new(ReceiverStream::new(packet_rx));
                let mut acks = client
                    .stream_microphone(request)
                    .await
                    .expect("microphone stream should open")
                    .into_inner();

                let ack = tokio::time::timeout(Duration::from_secs(1), acks.message())
                    .await
                    .expect("microphone ack timeout")
                    .expect("microphone ack grpc")
                    .expect("microphone ack item");
                assert_eq!(ack, Empty {});

                server_task.abort();
            });
        });
    });
}
|
|
|
|
/// Opens two `StreamMicrophone` calls back to back and checks that the first
/// one still yields an `Empty` acknowledgement once its sender is dropped.
///
/// The second stream's sender and response are kept alive until the end of
/// the test so the second call stays open while the first one is drained.
#[test]
#[serial]
fn stream_microphone_supersedes_the_previous_owner_cleanly() {
    use std::time::Duration;
    use tokio_stream::wrappers::ReceiverStream;

    let rt = tokio::runtime::Runtime::new().expect("runtime");
    with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || {
        with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || {
            rt.block_on(async {
                let (_dir, handler) = build_handler_for_tests();
                let (server_task, mut client) = serve_handler(handler).await;

                let (old_tx, old_rx) = tokio::sync::mpsc::channel(1);
                let (_new_tx, new_rx) = tokio::sync::mpsc::channel(1);

                let old_request = tonic::Request::new(ReceiverStream::new(old_rx));
                let mut old_acks = client
                    .stream_microphone(old_request)
                    .await
                    .expect("first microphone stream")
                    .into_inner();

                let new_request = tonic::Request::new(ReceiverStream::new(new_rx));
                let _new_response = client
                    .stream_microphone(new_request)
                    .await
                    .expect("second microphone stream supersedes first");

                // Ending the first request stream should still produce its ack.
                drop(old_tx);
                let ack = tokio::time::timeout(Duration::from_secs(1), old_acks.message())
                    .await
                    .expect("superseded microphone ack timeout")
                    .expect("superseded microphone ack grpc")
                    .expect("superseded microphone ack item");
                assert_eq!(ack, Empty {});

                server_task.abort();
            });
        });
    });
}
|
|
|
|
/// With `LESAVKA_TEST_FORCE_PIPELINE_START_ERROR=1` set, opening
/// `StreamMicrophone` must fail with gRPC status `Internal`.
///
/// The request sender is kept alive for the whole call so the failure cannot
/// be attributed to the client closing its stream.
#[test]
#[serial]
fn stream_microphone_surfaces_internal_error_when_sink_open_fails() {
    use tokio_stream::wrappers::ReceiverStream;

    let rt = tokio::runtime::Runtime::new().expect("runtime");
    with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || {
        with_var("LESAVKA_TEST_FORCE_PIPELINE_START_ERROR", Some("1"), || {
            rt.block_on(async {
                let (_dir, handler) = build_handler_for_tests();
                let (server_task, mut client) = serve_handler(handler).await;

                let (_packet_tx, packet_rx) = tokio::sync::mpsc::channel(1);
                let request = tonic::Request::new(ReceiverStream::new(packet_rx));

                let status = client
                    .stream_microphone(request)
                    .await
                    .expect_err("missing sink should fail the stream");
                assert_eq!(status.code(), tonic::Code::Internal);

                server_task.abort();
            });
        });
    });
}
|
|
|
|
/// Pushes one synthetic video packet through `StreamCamera` and expects a
/// single `Empty` acknowledgement within one second.
///
/// `LESAVKA_DISABLE_UVC` is explicitly unset for the duration of the test.
/// The packet is queued and the sender dropped before the RPC is opened.
#[test]
#[serial]
fn stream_camera_accepts_upstream_video_packets() {
    use std::time::Duration;
    use tokio_stream::wrappers::ReceiverStream;

    let rt = tokio::runtime::Runtime::new().expect("runtime");
    with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || {
        with_var("LESAVKA_DISABLE_UVC", None::<&str>, || {
            with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || {
                rt.block_on(async {
                    let (_dir, handler) = build_handler_for_tests();
                    let (server_task, mut client) = serve_handler(handler).await;

                    // Payload presumably mimics a minimal Annex-B H.264 NAL
                    // unit (start code + 0x65) — TODO confirm with the sink.
                    let (frame_tx, frame_rx) = tokio::sync::mpsc::channel(4);
                    let frame = VideoPacket {
                        id: 2,
                        pts: 54_321,
                        data: vec![0, 0, 0, 1, 0x65, 0x88],
                        ..Default::default()
                    };
                    frame_tx
                        .send(frame)
                        .await
                        .expect("send synthetic upstream video");
                    drop(frame_tx);

                    let request = tonic::Request::new(ReceiverStream::new(frame_rx));
                    let mut acks = client
                        .stream_camera(request)
                        .await
                        .expect("camera stream should open")
                        .into_inner();

                    let ack = tokio::time::timeout(Duration::from_secs(1), acks.message())
                        .await
                        .expect("camera ack timeout")
                        .expect("camera ack grpc")
                        .expect("camera ack item");
                    assert_eq!(ack, Empty {});

                    server_task.abort();
                });
            });
        });
    });
}
|
|
|
|
/// Runs `StreamCamera` and `StreamMicrophone` side by side with
/// `LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS=80` and expects both streams to be
/// acknowledged with `Empty`.
///
/// A video packet is sent first; an audio packet with the same `pts` follows
/// 20 ms later. Both senders are then dropped and each response stream must
/// produce its ack within one second.
#[test]
#[serial]
fn stream_camera_waits_for_the_pairing_window_then_plays_with_audio() {
    use std::time::Duration;
    use tokio_stream::wrappers::ReceiverStream;

    let rt = tokio::runtime::Runtime::new().expect("runtime");
    with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || {
        with_var("LESAVKA_DISABLE_UVC", None::<&str>, || {
            with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("80"), || {
                rt.block_on(async {
                    let (_dir, handler) = build_handler_for_tests();
                    let (server_task, mut client) = serve_handler(handler).await;

                    let (frame_tx, frame_rx) = tokio::sync::mpsc::channel(4);
                    let (sample_tx, sample_rx) = tokio::sync::mpsc::channel(4);

                    // Open both uplinks before any media is sent.
                    let camera_request = tonic::Request::new(ReceiverStream::new(frame_rx));
                    let mut camera_acks = client
                        .stream_camera(camera_request)
                        .await
                        .expect("camera stream should open")
                        .into_inner();
                    let mic_request = tonic::Request::new(ReceiverStream::new(sample_rx));
                    let mut mic_acks = client
                        .stream_microphone(mic_request)
                        .await
                        .expect("microphone stream should open")
                        .into_inner();

                    // Video goes first; the matching audio trails by 20 ms.
                    let frame = VideoPacket {
                        id: 2,
                        pts: 1_000_000,
                        data: vec![0, 0, 0, 1, 0x65, 0x99],
                        ..Default::default()
                    };
                    frame_tx.send(frame).await.expect("send video packet");
                    tokio::time::sleep(Duration::from_millis(20)).await;

                    let sample = AudioPacket {
                        id: 0,
                        pts: 1_000_000,
                        data: vec![1, 2, 3, 4],
                        ..AudioPacket::default()
                    };
                    sample_tx
                        .send(sample)
                        .await
                        .expect("send matching audio packet");

                    drop(frame_tx);
                    drop(sample_tx);

                    let video_ack =
                        tokio::time::timeout(Duration::from_secs(1), camera_acks.message())
                            .await
                            .expect("camera ack timeout")
                            .expect("camera ack grpc")
                            .expect("camera ack item");
                    let audio_ack =
                        tokio::time::timeout(Duration::from_secs(1), mic_acks.message())
                            .await
                            .expect("microphone ack timeout")
                            .expect("microphone ack grpc")
                            .expect("microphone ack item");
                    assert_eq!(video_ack, Empty {});
                    assert_eq!(audio_ack, Empty {});

                    server_task.abort();
                });
            });
        });
    });
}
|
|
|
|
/// Opens two `StreamCamera` calls back to back, sends a packet on the first
/// one, and checks that the first stream still yields an `Empty`
/// acknowledgement after its sender is dropped.
///
/// The second stream's sender is only dropped after the first stream has
/// been acknowledged, so the second call stays open throughout.
#[test]
#[serial]
fn stream_camera_stops_a_superseded_session_cleanly() {
    use std::time::Duration;
    use tokio_stream::wrappers::ReceiverStream;

    let rt = tokio::runtime::Runtime::new().expect("runtime");
    with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || {
        with_var("LESAVKA_DISABLE_UVC", None::<&str>, || {
            with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || {
                rt.block_on(async {
                    let (_dir, handler) = build_handler_for_tests();
                    let (server_task, mut client) = serve_handler(handler).await;

                    let (old_tx, old_rx) = tokio::sync::mpsc::channel(4);
                    let (new_tx, new_rx) = tokio::sync::mpsc::channel(1);

                    let old_request = tonic::Request::new(ReceiverStream::new(old_rx));
                    let mut old_acks = client
                        .stream_camera(old_request)
                        .await
                        .expect("first camera stream")
                        .into_inner();

                    let new_request = tonic::Request::new(ReceiverStream::new(new_rx));
                    let _new_response = client
                        .stream_camera(new_request)
                        .await
                        .expect("second camera stream supersedes first");

                    // Feed the first session one packet, then end its stream.
                    let stale_frame = VideoPacket {
                        id: 2,
                        pts: 99,
                        data: vec![0, 0, 0, 1, 0x65],
                        ..Default::default()
                    };
                    old_tx
                        .send(stale_frame)
                        .await
                        .expect("send packet to first stream");
                    drop(old_tx);

                    let ack = tokio::time::timeout(Duration::from_secs(1), old_acks.message())
                        .await
                        .expect("superseded camera ack timeout")
                        .expect("superseded camera ack grpc")
                        .expect("superseded camera ack item");
                    assert_eq!(ack, Empty {});
                    drop(new_tx);

                    server_task.abort();
                });
            });
        });
    });
}
|
|
}
|