138 lines
5.4 KiB
Rust
138 lines
5.4 KiB
Rust
//! Eye-hub coverage for shared server video fan-out.
|
|
//!
|
|
//! Scope: include `server/src/main.rs` and exercise hub fan-out, conflict
|
|
//! pruning, and shutdown behavior.
|
|
//! Targets: `server/src/main.rs`.
|
|
//! Why: eye-feed hubs are latency-sensitive; stale hubs must stop and be
|
|
//! replaced without freezing downstream previews.
|
|
|
|
#[allow(warnings)]
|
|
mod server_main_eye_hub {
|
|
include!(env!("LESAVKA_SERVER_MAIN_SRC"));
|
|
|
|
use futures_util::stream;
|
|
use temp_env::with_var;
|
|
|
|
fn with_capture_power_disabled(f: impl FnOnce()) {
|
|
with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), f);
|
|
}
|
|
|
|
#[test]
|
|
fn shared_eye_hub_forwards_inner_packets() {
|
|
let rt = tokio::runtime::Runtime::new().expect("runtime");
|
|
with_capture_power_disabled(|| {
|
|
rt.block_on(async {
|
|
let lease = CapturePowerManager::new().acquire().await;
|
|
let packet = VideoPacket {
|
|
id: 2,
|
|
pts: 42,
|
|
data: vec![9, 8, 7],
|
|
..Default::default()
|
|
};
|
|
let (packet_tx, packet_rx) = tokio::sync::mpsc::channel(1);
|
|
let hub = EyeHub::spawn(ReceiverStream::new(packet_rx), lease);
|
|
hub.subscribers
|
|
.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
|
|
let mut rx = hub.tx.subscribe();
|
|
packet_tx
|
|
.send(Ok(packet.clone()))
|
|
.await
|
|
.expect("send synthetic packet");
|
|
let observed = tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv())
|
|
.await
|
|
.expect("hub packet timeout")
|
|
.expect("hub packet");
|
|
assert_eq!(observed.id, packet.id);
|
|
assert_eq!(observed.pts, packet.pts);
|
|
assert_eq!(observed.data, packet.data);
|
|
drop(packet_tx);
|
|
for _ in 0..20 {
|
|
if !hub.running.load(std::sync::atomic::Ordering::Relaxed) {
|
|
return;
|
|
}
|
|
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
|
|
}
|
|
assert!(
|
|
!hub.running.load(std::sync::atomic::Ordering::Relaxed),
|
|
"hub should stop after the synthetic packet stream closes"
|
|
);
|
|
});
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn conflicting_eye_hubs_for_the_same_source_are_pruned_before_reopen() {
|
|
let rt = tokio::runtime::Runtime::new().expect("runtime");
|
|
with_capture_power_disabled(|| {
|
|
rt.block_on(async {
|
|
let requested_key = EyeHubKey {
|
|
source_id: 1,
|
|
requested_width: 1280,
|
|
requested_height: 720,
|
|
requested_fps: 60,
|
|
};
|
|
let stale_same_source_key = EyeHubKey {
|
|
source_id: 1,
|
|
requested_width: 1920,
|
|
requested_height: 1080,
|
|
requested_fps: 60,
|
|
};
|
|
let keep_other_source_key = EyeHubKey {
|
|
source_id: 0,
|
|
requested_width: 1920,
|
|
requested_height: 1080,
|
|
requested_fps: 60,
|
|
};
|
|
let stale_same_source = EyeHub::spawn(
|
|
stream::pending::<Result<VideoPacket, tonic::Status>>(),
|
|
CapturePowerManager::new().acquire().await,
|
|
);
|
|
let stopped_other_source = EyeHub::spawn(
|
|
stream::pending::<Result<VideoPacket, tonic::Status>>(),
|
|
CapturePowerManager::new().acquire().await,
|
|
);
|
|
stopped_other_source.shutdown();
|
|
let keep_other_source = EyeHub::spawn(
|
|
stream::pending::<Result<VideoPacket, tonic::Status>>(),
|
|
CapturePowerManager::new().acquire().await,
|
|
);
|
|
|
|
let mut hubs = std::collections::HashMap::new();
|
|
hubs.insert(stale_same_source_key, stale_same_source.clone());
|
|
hubs.insert(
|
|
EyeHubKey {
|
|
source_id: 0,
|
|
requested_width: 1280,
|
|
requested_height: 720,
|
|
requested_fps: 60,
|
|
},
|
|
stopped_other_source,
|
|
);
|
|
hubs.insert(keep_other_source_key, keep_other_source.clone());
|
|
|
|
let removed = take_conflicting_eye_hubs(&mut hubs, requested_key);
|
|
|
|
assert_eq!(removed.len(), 2);
|
|
assert!(!hubs.contains_key(&stale_same_source_key));
|
|
assert!(hubs.contains_key(&keep_other_source_key));
|
|
});
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn eye_hub_shutdown_marks_the_hub_as_not_running() {
|
|
let rt = tokio::runtime::Runtime::new().expect("runtime");
|
|
with_capture_power_disabled(|| {
|
|
rt.block_on(async {
|
|
let hub = EyeHub::spawn(
|
|
stream::pending::<Result<VideoPacket, tonic::Status>>(),
|
|
CapturePowerManager::new().acquire().await,
|
|
);
|
|
assert!(hub.running.load(std::sync::atomic::Ordering::Relaxed));
|
|
hub.shutdown();
|
|
assert!(!hub.running.load(std::sync::atomic::Ordering::Relaxed));
|
|
});
|
|
});
|
|
}
|
|
}
|