lesavka/testing/tests/server_main_eye_hub_contract.rs

138 lines
5.4 KiB
Rust

//! Eye-hub coverage for shared server video fan-out.
//!
//! Scope: include `server/src/main.rs` and exercise hub fan-out, conflict
//! pruning, and shutdown behavior.
//! Targets: `server/src/main.rs`.
//! Why: eye-feed hubs are latency-sensitive; stale hubs must stop and be
//! replaced without freezing downstream previews.
#[allow(warnings)]
mod server_main_eye_hub {
include!(env!("LESAVKA_SERVER_MAIN_SRC"));
use futures_util::stream;
use temp_env::with_var;
fn with_capture_power_disabled(f: impl FnOnce()) {
with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), f);
}
#[test]
fn shared_eye_hub_forwards_inner_packets() {
let rt = tokio::runtime::Runtime::new().expect("runtime");
with_capture_power_disabled(|| {
rt.block_on(async {
let lease = CapturePowerManager::new().acquire().await;
let packet = VideoPacket {
id: 2,
pts: 42,
data: vec![9, 8, 7],
..Default::default()
};
let (packet_tx, packet_rx) = tokio::sync::mpsc::channel(1);
let hub = EyeHub::spawn(ReceiverStream::new(packet_rx), lease);
hub.subscribers
.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
let mut rx = hub.tx.subscribe();
packet_tx
.send(Ok(packet.clone()))
.await
.expect("send synthetic packet");
let observed = tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv())
.await
.expect("hub packet timeout")
.expect("hub packet");
assert_eq!(observed.id, packet.id);
assert_eq!(observed.pts, packet.pts);
assert_eq!(observed.data, packet.data);
drop(packet_tx);
for _ in 0..20 {
if !hub.running.load(std::sync::atomic::Ordering::Relaxed) {
return;
}
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
assert!(
!hub.running.load(std::sync::atomic::Ordering::Relaxed),
"hub should stop after the synthetic packet stream closes"
);
});
});
}
#[test]
fn conflicting_eye_hubs_for_the_same_source_are_pruned_before_reopen() {
let rt = tokio::runtime::Runtime::new().expect("runtime");
with_capture_power_disabled(|| {
rt.block_on(async {
let requested_key = EyeHubKey {
source_id: 1,
requested_width: 1280,
requested_height: 720,
requested_fps: 60,
};
let stale_same_source_key = EyeHubKey {
source_id: 1,
requested_width: 1920,
requested_height: 1080,
requested_fps: 60,
};
let keep_other_source_key = EyeHubKey {
source_id: 0,
requested_width: 1920,
requested_height: 1080,
requested_fps: 60,
};
let stale_same_source = EyeHub::spawn(
stream::pending::<Result<VideoPacket, tonic::Status>>(),
CapturePowerManager::new().acquire().await,
);
let stopped_other_source = EyeHub::spawn(
stream::pending::<Result<VideoPacket, tonic::Status>>(),
CapturePowerManager::new().acquire().await,
);
stopped_other_source.shutdown();
let keep_other_source = EyeHub::spawn(
stream::pending::<Result<VideoPacket, tonic::Status>>(),
CapturePowerManager::new().acquire().await,
);
let mut hubs = std::collections::HashMap::new();
hubs.insert(stale_same_source_key, stale_same_source.clone());
hubs.insert(
EyeHubKey {
source_id: 0,
requested_width: 1280,
requested_height: 720,
requested_fps: 60,
},
stopped_other_source,
);
hubs.insert(keep_other_source_key, keep_other_source.clone());
let removed = take_conflicting_eye_hubs(&mut hubs, requested_key);
assert_eq!(removed.len(), 2);
assert!(!hubs.contains_key(&stale_same_source_key));
assert!(hubs.contains_key(&keep_other_source_key));
});
});
}
#[test]
fn eye_hub_shutdown_marks_the_hub_as_not_running() {
let rt = tokio::runtime::Runtime::new().expect("runtime");
with_capture_power_disabled(|| {
rt.block_on(async {
let hub = EyeHub::spawn(
stream::pending::<Result<VideoPacket, tonic::Status>>(),
CapturePowerManager::new().acquire().await,
);
assert!(hub.running.load(std::sync::atomic::Ordering::Relaxed));
hub.shutdown();
assert!(!hub.running.load(std::sync::atomic::Ordering::Relaxed));
});
});
}
}