lesavka: release source hub switches cleanly
This commit is contained in:
parent
ddbbf6b036
commit
e0a472b714
@ -4,7 +4,7 @@ path = "src/main.rs"
|
|||||||
|
|
||||||
[package]
|
[package]
|
||||||
name = "lesavka_client"
|
name = "lesavka_client"
|
||||||
version = "0.11.14"
|
version = "0.11.15"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
|||||||
@ -565,6 +565,17 @@ impl LauncherState {
|
|||||||
.map(|source_id| self.capture_size_choice(source_id))
|
.map(|source_id| self.capture_size_choice(source_id))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn effective_preview_source_size(&self, monitor_id: usize) -> PreviewSourceSize {
|
||||||
|
let capture = self
|
||||||
|
.display_capture_size_choice(monitor_id)
|
||||||
|
.unwrap_or_else(|| self.capture_size_choice(monitor_id));
|
||||||
|
PreviewSourceSize {
|
||||||
|
width: capture.width.max(1) as u32,
|
||||||
|
height: capture.height.max(1) as u32,
|
||||||
|
fps: capture.fps.max(1),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn capture_size_options(&self) -> Vec<CaptureSizeChoice> {
|
pub fn capture_size_options(&self) -> Vec<CaptureSizeChoice> {
|
||||||
capture_size_options(self.preview_source)
|
capture_size_options(self.preview_source)
|
||||||
}
|
}
|
||||||
@ -594,16 +605,16 @@ impl LauncherState {
|
|||||||
breakout_size_choice(
|
breakout_size_choice(
|
||||||
self.breakout_limit,
|
self.breakout_limit,
|
||||||
self.breakout_display,
|
self.breakout_display,
|
||||||
self.preview_source,
|
self.effective_preview_source_size(monitor_id),
|
||||||
self.breakout_size_preset(monitor_id),
|
self.breakout_size_preset(monitor_id),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn breakout_size_options(&self) -> Vec<BreakoutSizeChoice> {
|
pub fn breakout_size_options(&self, monitor_id: usize) -> Vec<BreakoutSizeChoice> {
|
||||||
breakout_size_options(
|
breakout_size_options(
|
||||||
self.breakout_limit,
|
self.breakout_limit,
|
||||||
self.breakout_display,
|
self.breakout_display,
|
||||||
self.preview_source,
|
self.effective_preview_source_size(monitor_id),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1008,6 +1019,23 @@ mod tests {
|
|||||||
assert!(state.display_capture_size_choice(0).is_none());
|
assert!(state.display_capture_size_choice(0).is_none());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn mirrored_panes_use_their_effective_source_size_for_breakout_source_labels() {
|
||||||
|
let mut state = LauncherState::new();
|
||||||
|
state.set_capture_size_preset(1, CaptureSizePreset::P720);
|
||||||
|
state.set_feed_source_preset(0, FeedSourcePreset::OtherEye);
|
||||||
|
|
||||||
|
let mirrored_source = state.effective_preview_source_size(0);
|
||||||
|
assert_eq!(mirrored_source.width, 1280);
|
||||||
|
assert_eq!(mirrored_source.height, 720);
|
||||||
|
assert_eq!(mirrored_source.fps, 60);
|
||||||
|
|
||||||
|
let mirrored_breakout = state.breakout_size_choice(0);
|
||||||
|
assert_eq!(mirrored_breakout.preset, BreakoutSizePreset::Source);
|
||||||
|
assert_eq!(mirrored_breakout.width, 1280);
|
||||||
|
assert_eq!(mirrored_breakout.height, 720);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn selecting_auto_or_blank_clears_explicit_device() {
|
fn selecting_auto_or_blank_clears_explicit_device() {
|
||||||
let mut state = LauncherState::new();
|
let mut state = LauncherState::new();
|
||||||
@ -1127,9 +1155,14 @@ mod tests {
|
|||||||
assert_eq!(compact_capture.fps, 60);
|
assert_eq!(compact_capture.fps, 60);
|
||||||
assert_eq!(compact_capture.max_bitrate_kbit, 2_500);
|
assert_eq!(compact_capture.max_bitrate_kbit, 2_500);
|
||||||
|
|
||||||
|
let effective_source = state.effective_preview_source_size(0);
|
||||||
|
assert_eq!(effective_source.width, 720);
|
||||||
|
assert_eq!(effective_source.height, 480);
|
||||||
|
assert_eq!(effective_source.fps, 60);
|
||||||
|
|
||||||
let display = state.breakout_size_choice(0);
|
let display = state.breakout_size_choice(0);
|
||||||
assert_eq!(display.width, 1920);
|
assert_eq!(display.width, 852);
|
||||||
assert_eq!(display.height, 1080);
|
assert_eq!(display.height, 480);
|
||||||
|
|
||||||
state.set_breakout_size_preset(0, BreakoutSizePreset::P360);
|
state.set_breakout_size_preset(0, BreakoutSizePreset::P360);
|
||||||
let smaller = state.breakout_size_choice(0);
|
let smaller = state.breakout_size_choice(0);
|
||||||
@ -1149,12 +1182,12 @@ mod tests {
|
|||||||
assert_eq!(capture_options[0].fps, 60);
|
assert_eq!(capture_options[0].fps, 60);
|
||||||
assert_eq!(capture_options[0].max_bitrate_kbit, 18_000);
|
assert_eq!(capture_options[0].max_bitrate_kbit, 18_000);
|
||||||
|
|
||||||
let breakout_options = state.breakout_size_options();
|
let breakout_options = state.breakout_size_options(0);
|
||||||
assert!(breakout_options.len() >= 5);
|
assert!(breakout_options.len() >= 5);
|
||||||
assert!(breakout_options.iter().any(|choice| {
|
assert!(breakout_options.iter().any(|choice| {
|
||||||
choice.preset == BreakoutSizePreset::Source
|
choice.preset == BreakoutSizePreset::Source
|
||||||
&& choice.width == 1920
|
&& choice.width == 852
|
||||||
&& choice.height == 1080
|
&& choice.height == 480
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -256,7 +256,7 @@ fn refresh_eye_feed_controls(
|
|||||||
}
|
}
|
||||||
super::ui_components::sync_breakout_size_combo(
|
super::ui_components::sync_breakout_size_combo(
|
||||||
&widgets.display_panes[monitor_id].breakout_combo,
|
&widgets.display_panes[monitor_id].breakout_combo,
|
||||||
state.breakout_size_options(),
|
state.breakout_size_options(monitor_id),
|
||||||
state.breakout_size_preset(monitor_id),
|
state.breakout_size_preset(monitor_id),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -639,12 +639,12 @@ pub fn build_launcher_view(
|
|||||||
}
|
}
|
||||||
sync_breakout_size_combo(
|
sync_breakout_size_combo(
|
||||||
&left_pane.breakout_combo,
|
&left_pane.breakout_combo,
|
||||||
state.breakout_size_options(),
|
state.breakout_size_options(0),
|
||||||
state.breakout_size_preset(0),
|
state.breakout_size_preset(0),
|
||||||
);
|
);
|
||||||
sync_breakout_size_combo(
|
sync_breakout_size_combo(
|
||||||
&right_pane.breakout_combo,
|
&right_pane.breakout_combo,
|
||||||
state.breakout_size_options(),
|
state.breakout_size_options(1),
|
||||||
state.breakout_size_preset(1),
|
state.breakout_size_preset(1),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|||||||
@ -1,6 +1,6 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "lesavka_common"
|
name = "lesavka_common"
|
||||||
version = "0.11.14"
|
version = "0.11.15"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
build = "build.rs"
|
build = "build.rs"
|
||||||
|
|
||||||
|
|||||||
@ -17,6 +17,6 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn banner_includes_version() {
|
fn banner_includes_version() {
|
||||||
assert_eq!(banner("0.11.14"), "lesavka-common CLI (v0.11.14)");
|
assert_eq!(banner("0.11.15"), "lesavka-common CLI (v0.11.15)");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -10,7 +10,7 @@ bench = false
|
|||||||
|
|
||||||
[package]
|
[package]
|
||||||
name = "lesavka_server"
|
name = "lesavka_server"
|
||||||
version = "0.11.14"
|
version = "0.11.15"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
autobins = false
|
autobins = false
|
||||||
|
|
||||||
|
|||||||
@ -68,6 +68,7 @@ struct EyeHub {
|
|||||||
tx: broadcast::Sender<VideoPacket>,
|
tx: broadcast::Sender<VideoPacket>,
|
||||||
running: Arc<AtomicBool>,
|
running: Arc<AtomicBool>,
|
||||||
subscribers: Arc<AtomicUsize>,
|
subscribers: Arc<AtomicUsize>,
|
||||||
|
abort: tokio::task::AbortHandle,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Handler {
|
impl Handler {
|
||||||
@ -160,6 +161,7 @@ impl Handler {
|
|||||||
let mut hub_rx = hub.tx.subscribe();
|
let mut hub_rx = hub.tx.subscribe();
|
||||||
hub.subscribers.fetch_add(1, Ordering::AcqRel);
|
hub.subscribers.fetch_add(1, Ordering::AcqRel);
|
||||||
let subscribers = Arc::clone(&hub.subscribers);
|
let subscribers = Arc::clone(&hub.subscribers);
|
||||||
|
let hub_for_task = Arc::clone(&hub);
|
||||||
let (tx, rx) = tokio::sync::mpsc::channel(32);
|
let (tx, rx) = tokio::sync::mpsc::channel(32);
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
@ -174,7 +176,9 @@ impl Handler {
|
|||||||
Err(broadcast::error::RecvError::Closed) => break,
|
Err(broadcast::error::RecvError::Closed) => break,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
subscribers.fetch_sub(1, Ordering::AcqRel);
|
if subscribers.fetch_sub(1, Ordering::AcqRel) == 1 {
|
||||||
|
hub_for_task.shutdown();
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
|
Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
|
||||||
@ -186,11 +190,27 @@ impl Handler {
|
|||||||
key: EyeHubKey,
|
key: EyeHubKey,
|
||||||
max_bitrate_kbit: u32,
|
max_bitrate_kbit: u32,
|
||||||
) -> Result<Arc<EyeHub>, Status> {
|
) -> Result<Arc<EyeHub>, Status> {
|
||||||
let mut hubs = self.eye_hubs.lock().await;
|
let stale_hubs = {
|
||||||
if let Some(hub) = hubs.get(&key)
|
let mut hubs = self.eye_hubs.lock().await;
|
||||||
&& hub.running.load(Ordering::Relaxed)
|
if let Some(hub) = hubs.get(&key)
|
||||||
{
|
&& hub.running.load(Ordering::Relaxed)
|
||||||
return Ok(Arc::clone(hub));
|
{
|
||||||
|
return Ok(Arc::clone(hub));
|
||||||
|
}
|
||||||
|
take_conflicting_eye_hubs(&mut hubs, key)
|
||||||
|
};
|
||||||
|
if !stale_hubs.is_empty() {
|
||||||
|
info!(
|
||||||
|
source_id = key.source_id,
|
||||||
|
requested_width = key.requested_width,
|
||||||
|
requested_height = key.requested_height,
|
||||||
|
requested_fps = key.requested_fps,
|
||||||
|
stale_hubs = stale_hubs.len(),
|
||||||
|
"🎥 replacing stale/conflicting eye hubs before opening the source"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
for hub in stale_hubs {
|
||||||
|
hub.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
let lease = self.capture_power.acquire().await;
|
let lease = self.capture_power.acquire().await;
|
||||||
@ -206,6 +226,13 @@ impl Handler {
|
|||||||
.map_err(|e| Status::internal(format!("{e:#}")))?;
|
.map_err(|e| Status::internal(format!("{e:#}")))?;
|
||||||
|
|
||||||
let hub = EyeHub::spawn(stream, lease);
|
let hub = EyeHub::spawn(stream, lease);
|
||||||
|
let mut hubs = self.eye_hubs.lock().await;
|
||||||
|
if let Some(existing) = hubs.get(&key)
|
||||||
|
&& existing.running.load(Ordering::Relaxed)
|
||||||
|
{
|
||||||
|
hub.shutdown();
|
||||||
|
return Ok(Arc::clone(existing));
|
||||||
|
}
|
||||||
hubs.insert(key, Arc::clone(&hub));
|
hubs.insert(key, Arc::clone(&hub));
|
||||||
Ok(hub)
|
Ok(hub)
|
||||||
}
|
}
|
||||||
@ -289,20 +316,17 @@ impl EyeHub {
|
|||||||
let (tx, _) = broadcast::channel(32);
|
let (tx, _) = broadcast::channel(32);
|
||||||
let running = Arc::new(AtomicBool::new(true));
|
let running = Arc::new(AtomicBool::new(true));
|
||||||
let subscribers = Arc::new(AtomicUsize::new(0));
|
let subscribers = Arc::new(AtomicUsize::new(0));
|
||||||
let hub = Arc::new(Self {
|
let tx_for_task = tx.clone();
|
||||||
tx: tx.clone(),
|
let running_for_task = Arc::clone(&running);
|
||||||
running: Arc::clone(&running),
|
let subscribers_for_task = Arc::clone(&subscribers);
|
||||||
subscribers: Arc::clone(&subscribers),
|
let task = tokio::spawn(async move {
|
||||||
});
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let _lease = lease;
|
let _lease = lease;
|
||||||
let mut idle_ticks = 0_u32;
|
let mut idle_ticks = 0_u32;
|
||||||
while running.load(Ordering::Relaxed) {
|
while running_for_task.load(Ordering::Relaxed) {
|
||||||
match stream.next().await {
|
match stream.next().await {
|
||||||
Some(Ok(pkt)) => {
|
Some(Ok(pkt)) => {
|
||||||
let _ = tx.send(pkt);
|
let _ = tx_for_task.send(pkt);
|
||||||
if subscribers.load(Ordering::Relaxed) == 0 {
|
if subscribers_for_task.load(Ordering::Relaxed) == 0 {
|
||||||
idle_ticks = idle_ticks.saturating_add(1);
|
idle_ticks = idle_ticks.saturating_add(1);
|
||||||
if idle_ticks >= 60 {
|
if idle_ticks >= 60 {
|
||||||
break;
|
break;
|
||||||
@ -318,11 +342,47 @@ impl EyeHub {
|
|||||||
None => break,
|
None => break,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
running.store(false, Ordering::Relaxed);
|
running_for_task.store(false, Ordering::Relaxed);
|
||||||
|
});
|
||||||
|
let hub = Arc::new(Self {
|
||||||
|
tx: tx.clone(),
|
||||||
|
running: Arc::clone(&running),
|
||||||
|
subscribers: Arc::clone(&subscribers),
|
||||||
|
abort: task.abort_handle(),
|
||||||
});
|
});
|
||||||
|
|
||||||
hub
|
hub
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn shutdown(&self) {
|
||||||
|
if self.running.swap(false, Ordering::AcqRel) {
|
||||||
|
self.abort.abort();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn take_conflicting_eye_hubs(
|
||||||
|
hubs: &mut HashMap<EyeHubKey, Arc<EyeHub>>,
|
||||||
|
key: EyeHubKey,
|
||||||
|
) -> Vec<Arc<EyeHub>> {
|
||||||
|
let stale_keys: Vec<_> = hubs
|
||||||
|
.iter()
|
||||||
|
.filter_map(|(existing_key, hub)| {
|
||||||
|
let running = hub.running.load(Ordering::Relaxed);
|
||||||
|
let conflicting_source =
|
||||||
|
existing_key.source_id == key.source_id && *existing_key != key;
|
||||||
|
if !running || conflicting_source {
|
||||||
|
Some(*existing_key)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
stale_keys
|
||||||
|
.into_iter()
|
||||||
|
.filter_map(|existing_key| hubs.remove(&existing_key))
|
||||||
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
/*──────────────── gRPC service ─────────────*/
|
/*──────────────── gRPC service ─────────────*/
|
||||||
|
|||||||
@ -497,4 +497,75 @@ mod server_main_binary_extra {
|
|||||||
assert_eq!(observed.data, packet.data);
|
assert_eq!(observed.data, packet.data);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn conflicting_eye_hubs_for_the_same_source_are_pruned_before_reopen() {
|
||||||
|
let rt = tokio::runtime::Runtime::new().expect("runtime");
|
||||||
|
rt.block_on(async {
|
||||||
|
let requested_key = EyeHubKey {
|
||||||
|
source_id: 1,
|
||||||
|
requested_width: 1280,
|
||||||
|
requested_height: 720,
|
||||||
|
requested_fps: 60,
|
||||||
|
};
|
||||||
|
let stale_same_source_key = EyeHubKey {
|
||||||
|
source_id: 1,
|
||||||
|
requested_width: 1920,
|
||||||
|
requested_height: 1080,
|
||||||
|
requested_fps: 60,
|
||||||
|
};
|
||||||
|
let keep_other_source_key = EyeHubKey {
|
||||||
|
source_id: 0,
|
||||||
|
requested_width: 1920,
|
||||||
|
requested_height: 1080,
|
||||||
|
requested_fps: 60,
|
||||||
|
};
|
||||||
|
let stale_same_source = EyeHub::spawn(
|
||||||
|
stream::pending::<Result<VideoPacket, tonic::Status>>(),
|
||||||
|
CapturePowerManager::new().acquire().await,
|
||||||
|
);
|
||||||
|
let stopped_other_source = EyeHub::spawn(
|
||||||
|
stream::pending::<Result<VideoPacket, tonic::Status>>(),
|
||||||
|
CapturePowerManager::new().acquire().await,
|
||||||
|
);
|
||||||
|
stopped_other_source.shutdown();
|
||||||
|
let keep_other_source = EyeHub::spawn(
|
||||||
|
stream::pending::<Result<VideoPacket, tonic::Status>>(),
|
||||||
|
CapturePowerManager::new().acquire().await,
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut hubs = std::collections::HashMap::new();
|
||||||
|
hubs.insert(stale_same_source_key, stale_same_source.clone());
|
||||||
|
hubs.insert(
|
||||||
|
EyeHubKey {
|
||||||
|
source_id: 0,
|
||||||
|
requested_width: 1280,
|
||||||
|
requested_height: 720,
|
||||||
|
requested_fps: 60,
|
||||||
|
},
|
||||||
|
stopped_other_source,
|
||||||
|
);
|
||||||
|
hubs.insert(keep_other_source_key, keep_other_source.clone());
|
||||||
|
|
||||||
|
let removed = take_conflicting_eye_hubs(&mut hubs, requested_key);
|
||||||
|
|
||||||
|
assert_eq!(removed.len(), 2);
|
||||||
|
assert!(!hubs.contains_key(&stale_same_source_key));
|
||||||
|
assert!(hubs.contains_key(&keep_other_source_key));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn eye_hub_shutdown_marks_the_hub_as_not_running() {
|
||||||
|
let rt = tokio::runtime::Runtime::new().expect("runtime");
|
||||||
|
rt.block_on(async {
|
||||||
|
let hub = EyeHub::spawn(
|
||||||
|
stream::pending::<Result<VideoPacket, tonic::Status>>(),
|
||||||
|
CapturePowerManager::new().acquire().await,
|
||||||
|
);
|
||||||
|
assert!(hub.running.load(std::sync::atomic::Ordering::Relaxed));
|
||||||
|
hub.shutdown();
|
||||||
|
assert!(!hub.running.load(std::sync::atomic::Ordering::Relaxed));
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user