impl Handler { async fn capture_video_reply( &self, req: MonitorRequest, ) -> Result, Status> { let id = req.id; if id > 1 { return Err(Status::invalid_argument("monitor id must be 0 or 1")); } let source_id = req.source_id.unwrap_or(id); if source_id > 1 { return Err(Status::invalid_argument("source id must be 0 or 1")); } let dev = if source_id == 0 { "/dev/lesavka_l_eye" } else { "/dev/lesavka_r_eye" }; #[cfg(not(coverage))] { let rpc_id = runtime_support::next_stream_id(); info!( rpc_id, id, source_id, max_bitrate = req.max_bitrate, requested_width = req.requested_width, requested_height = req.requested_height, requested_fps = req.requested_fps, "🎥 capture_video opened" ); debug!(rpc_id, "🎥 streaming {dev}"); } let hub = self .eye_hub( dev, EyeHubKey { source_id, requested_width: req.requested_width, requested_height: req.requested_height, requested_fps: req.requested_fps, }, req.max_bitrate, ) .await?; let hub_rx = hub.tx.subscribe(); hub.subscribers.fetch_add(1, Ordering::AcqRel); let subscribers = Arc::clone(&hub.subscribers); let hub_for_task = Arc::clone(&hub); let (tx, rx) = tokio::sync::mpsc::channel(32); tokio::spawn(forward_eye_hub_packets( id, hub_rx, tx, subscribers, hub_for_task, )); Ok(Response::new(Box::pin(ReceiverStream::new(rx)))) } async fn eye_hub( &self, dev: &str, key: EyeHubKey, max_bitrate_kbit: u32, ) -> Result, Status> { let stale_hubs = { let mut hubs = self.eye_hubs.lock().await; if let Some(hub) = hubs.get(&key) && hub.running.load(Ordering::Relaxed) { return Ok(Arc::clone(hub)); } take_conflicting_eye_hubs(&mut hubs, key) }; #[cfg(not(coverage))] if !stale_hubs.is_empty() { info!( source_id = key.source_id, requested_width = key.requested_width, requested_height = key.requested_height, requested_fps = key.requested_fps, stale_hubs = stale_hubs.len(), "🎥 replacing stale/conflicting eye hubs before opening the source" ); } for hub in stale_hubs { hub.shutdown(); } let lease = self.capture_power.acquire().await; let stream = video::eye_ball_with_request( dev, key.source_id, max_bitrate_kbit, key.requested_width, key.requested_height, key.requested_fps, ) .await .map_err(|e| Status::internal(format!("{e:#}")))?; let hub = EyeHub::spawn(stream, lease); let mut hubs = self.eye_hubs.lock().await; #[cfg(not(coverage))] if let Some(existing) = hubs.get(&key) && existing.running.load(Ordering::Relaxed) { hub.shutdown(); return Ok(Arc::clone(existing)); } hubs.insert(key, Arc::clone(&hub)); Ok(hub) } #[cfg(test)] #[allow(dead_code)] async fn eye_hub_count(&self) -> usize { self.eye_hubs.lock().await.len() } } /// Fan out one shared eye hub into one RPC stream without letting stale frames /// build latency when a receiver falls behind or disconnects. async fn forward_eye_hub_packets( id: u32, mut hub_rx: broadcast::Receiver, tx: tokio::sync::mpsc::Sender>, subscribers: Arc, hub_for_task: Arc, ) { loop { match hub_rx.recv().await { Ok(mut pkt) => { pkt.id = id; if tx.send(Ok(pkt)).await.is_err() { break; } } Err(broadcast::error::RecvError::Lagged(_)) => continue, Err(broadcast::error::RecvError::Closed) => break, } } if subscribers.fetch_sub(1, Ordering::AcqRel) == 1 { hub_for_task.shutdown(); } }