#![cfg_attr(coverage, allow(dead_code, unused_imports, unused_variables))]
#![forbid(unsafe_code)]

use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

use tokio::sync::Mutex;
use tonic::Status;
use tracing::info;

use crate::{camera, uvc_runtime, video};

/// The relay currently installed in the runtime, together with the
/// configuration it was built from.
///
/// The stored configuration is what `activate` compares against to decide
/// whether an incoming request can reuse `relay`.
struct CameraRelaySlot {
    cfg: camera::CameraConfig,
    relay: Arc<video::CameraRelay>,
}

/// Manage the currently active camera relay instance.
///
/// Inputs: camera configurations requested by incoming RPC streams.
/// Outputs: a reusable relay handle plus a monotonically increasing session id.
/// Why: only one camera output should own the physical sink at a time, but we
/// still want identical stream requests to reuse the existing pipeline.
pub struct CameraRuntime {
    /// Id of the most recently issued session; `0` until the first activation.
    generation: AtomicU64,
    /// The installed relay, or `None` when nothing is active.
    slot: Mutex<Option<CameraRelaySlot>>,
}

impl CameraRuntime {
    /// Create an empty runtime with no active relay.
    ///
    /// Inputs: none.
    /// Outputs: a fresh runtime with generation zero.
    /// Why: keeping construction trivial lets the main server handler create
    /// camera state early and share it across RPCs.
    #[must_use]
    pub fn new() -> Self {
        Self {
            generation: AtomicU64::new(0),
            slot: Mutex::new(None),
        }
    }

    /// Activate the relay matching the current configuration.
    ///
    /// Inputs: the desired camera configuration selected from the environment.
    /// Outputs: a session id plus a relay that is either reused or recreated.
    /// Why: UVC/HDMI sinks are expensive to churn, so identical requests should
    /// reuse the active pipeline instead of rebuilding it every time.
    ///
    /// This is the coverage-build variant: it never touches the slot and always
    /// hands back a fresh no-op relay with `reused == false`.
    ///
    /// # Errors
    ///
    /// Returns `failed_precondition` when UVC output is requested while
    /// `LESAVKA_DISABLE_UVC` is set. The generation has already advanced by
    /// then, so earlier session ids are superseded even on failure.
    #[cfg(coverage)]
    pub async fn activate(
        &self,
        cfg: &camera::CameraConfig,
    ) -> Result<(u64, Arc<video::CameraRelay>, bool), Status> {
        let session_id = self.generation.fetch_add(1, Ordering::SeqCst) + 1;
        if matches!(cfg.output, camera::CameraOutput::Uvc)
            && std::env::var("LESAVKA_DISABLE_UVC").is_ok()
        {
            return Err(Status::failed_precondition(
                "UVC output disabled (LESAVKA_DISABLE_UVC set)",
            ));
        }
        Ok((session_id, Arc::new(video::CameraRelay::new_noop(0)), false))
    }

    /// Activate the relay matching the current configuration.
    ///
    /// Inputs: the desired camera configuration selected from the environment.
    /// Outputs: `(session_id, relay, reused)` where `reused` is `true` when the
    /// installed relay already matched `cfg` and was handed back as-is.
    /// Why: UVC/HDMI sinks are expensive to churn, so identical requests should
    /// reuse the active pipeline instead of rebuilding it every time.
    ///
    /// # Errors
    ///
    /// Propagates the `Status` from `make_relay` when a new relay has to be
    /// built and construction fails. In that case the slot is left untouched,
    /// but the generation has already advanced, so earlier session ids are
    /// superseded even on failure.
    #[cfg(not(coverage))]
    pub async fn activate(
        &self,
        cfg: &camera::CameraConfig,
    ) -> Result<(u64, Arc<video::CameraRelay>, bool), Status> {
        // Issue the session id while holding the slot lock so that ids are
        // handed out in the same order in which callers update the slot.
        // Bumping before locking would let an older session install its relay
        // after a newer one, leaving the newest id paired with a replaced relay.
        let mut slot = self.slot.lock().await;
        let session_id = self.generation.fetch_add(1, Ordering::SeqCst) + 1;

        if let Some(existing) = slot
            .as_ref()
            .filter(|installed| camera_cfg_eq(&installed.cfg, cfg))
        {
            let relay = Arc::clone(&existing.relay);
            info!(session_id, "🎥 camera relay reused");
            return Ok((session_id, relay, true));
        }

        // Either nothing is installed or the profile changed: build a new relay
        // first and only replace the slot once construction succeeded.
        let relay = self.make_relay(cfg)?;
        *slot = Some(CameraRelaySlot {
            cfg: cfg.clone(),
            relay: Arc::clone(&relay),
        });
        info!(
            session_id,
            output = cfg.output.as_str(),
            codec = cfg.codec.as_str(),
            width = cfg.width,
            height = cfg.height,
            fps = cfg.fps,
            "🎥 camera relay (re)created"
        );
        Ok((session_id, relay, false))
    }

    /// Check whether a previously issued session id is still current.
    ///
    /// Inputs: a session id returned by `activate`.
    /// Outputs: `true` only when the session is still the most recent owner of
    /// the active camera relay.
    /// Why: superseded streams must stop writing frames into a sink that has
    /// already been reconfigured for a newer client session.
    #[must_use]
    pub fn is_active(&self, session_id: u64) -> bool {
        // Same ordering as the `fetch_add` calls that advance the generation.
        self.generation.load(Ordering::SeqCst) == session_id
    }

    /// Supersede the active camera stream and drop the userspace relay sink.
    ///
    /// Inputs: none.
    /// Outputs: none.
    /// Why: UVC recovery should make the client reconnect and recreate the
    /// spool/appsrc pipeline without cycling the USB controller while a browser
    /// may still own the gadget.
    ///
    /// This is the coverage-build variant: it only advances the generation.
    #[cfg(coverage)]
    pub async fn soft_recover(&self) {
        self.generation.fetch_add(1, Ordering::SeqCst);
    }

    /// Supersede the active camera stream and drop the userspace relay sink.
    ///
    /// Inputs: none.
    /// Outputs: none.
    /// Why: UVC recovery should make the client reconnect and recreate the
    /// spool/appsrc pipeline without cycling the USB controller while a browser
    /// may still own the gadget.
    ///
    /// Only the runtime's own `Arc` handle is released here; handles previously
    /// returned by `activate` are separate clones and are not affected.
    #[cfg(not(coverage))]
    pub async fn soft_recover(&self) {
        // Advance the generation under the slot lock, for the same ordering
        // reason as in `activate`.
        let mut slot = self.slot.lock().await;
        self.generation.fetch_add(1, Ordering::SeqCst);
        drop(slot.take());
    }

    /// Build a fresh relay for the requested output.
    ///
    /// Inputs: the camera configuration to build a sink for.
    /// Outputs: a shared handle to the newly constructed relay.
    ///
    /// # Errors
    ///
    /// - `failed_precondition` when UVC output is requested while
    ///   `LESAVKA_DISABLE_UVC` is set.
    /// - `internal` when picking the UVC device or constructing the relay
    ///   fails; the message is the underlying error formatted with `{:#}`.
    // `Status` is returned unboxed so `activate` can propagate it with `?`.
    #[allow(clippy::result_large_err)]
    #[cfg(not(coverage))]
    fn make_relay(&self, cfg: &camera::CameraConfig) -> Result<Arc<video::CameraRelay>, Status> {
        let relay = match cfg.output {
            camera::CameraOutput::Uvc => {
                if std::env::var("LESAVKA_DISABLE_UVC").is_ok() {
                    return Err(Status::failed_precondition(
                        "UVC output disabled (LESAVKA_DISABLE_UVC set)",
                    ));
                }
                let uvc = uvc_runtime::pick_uvc_device()
                    .map_err(|e| Status::internal(format!("{e:#}")))?;
                info!(%uvc, "🎥 stream_camera using UVC sink");
                video::CameraRelay::new_uvc(0, &uvc, cfg)
                    .map_err(|e| Status::internal(format!("{e:#}")))?
            }
            camera::CameraOutput::Hdmi => video::CameraRelay::new_hdmi(0, cfg)
                .map_err(|e| Status::internal(format!("{e:#}")))?,
        };
        Ok(Arc::new(relay))
    }
}

impl Default for CameraRuntime {
    /// Equivalent to [`CameraRuntime::new`].
    fn default() -> Self {
        Self::new()
    }
}

/// Compare two camera configurations for sink reuse.
///
/// Inputs: the currently active camera config and the requested config.
/// Outputs: `true` when both configs target the same sink and stream profile.
/// Why: reusing a pipeline is only safe when both the transport parameters and
/// the HDMI connector identity still match.
#[must_use]
pub fn camera_cfg_eq(a: &camera::CameraConfig, b: &camera::CameraConfig) -> bool {
    // Stage 1: the stream profile (sink kind, codec, geometry, frame rate).
    let same_profile = a.output == b.output
        && a.codec == b.codec
        && a.width == b.width
        && a.height == b.height
        && a.fps == b.fps;
    if !same_profile {
        return false;
    }

    // Stage 2: the display size is only compared for HDMI output.
    let targets_hdmi = matches!(a.output, camera::CameraOutput::Hdmi);
    if targets_hdmi && a.hdmi_display_size() != b.hdmi_display_size() {
        return false;
    }

    // Stage 3: connector identity. Both sides must agree on whether a connector
    // is present, and if so on its name and id.
    match (a.hdmi.as_ref(), b.hdmi.as_ref()) {
        (None, None) => true,
        (Some(current), Some(requested)) => {
            current.name == requested.name && current.id == requested.id
        }
        _ => false,
    }
}

#[cfg(test)]
mod tests {
    use super::camera_cfg_eq;
    use crate::camera::{CameraCodec, CameraConfig, CameraOutput, HdmiConnector};

    /// An identical config compares equal; a different frame rate or a
    /// different connector name does not.
    #[test]
    fn camera_cfg_eq_requires_matching_sink_profile() {
        let base = CameraConfig {
            output: CameraOutput::Hdmi,
            codec: CameraCodec::H264,
            width: 1920,
            height: 1080,
            fps: 30,
            hdmi: Some(HdmiConnector {
                name: String::from("HDMI-A-1"),
                id: Some(42),
                modes: Vec::new(),
            }),
        };

        // A clone is the same profile on the same connector.
        let identical = base.clone();
        assert!(camera_cfg_eq(&base, &identical));

        // A different frame rate changes the stream profile.
        let slower = CameraConfig {
            fps: 25,
            ..base.clone()
        };
        assert!(!camera_cfg_eq(&base, &slower));

        // Same id but a different connector name is a different sink.
        let other_connector = CameraConfig {
            hdmi: Some(HdmiConnector {
                name: String::from("HDMI-A-2"),
                id: Some(42),
                modes: Vec::new(),
            }),
            ..base.clone()
        };
        assert!(!camera_cfg_eq(&base, &other_connector));
    }
}