lesavka/server/src/camera_runtime.rs

212 lines
6.8 KiB
Rust
Raw Normal View History

#![cfg_attr(coverage, allow(dead_code, unused_imports, unused_variables))]
#![forbid(unsafe_code)]
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::Mutex;
use tonic::Status;
use tracing::info;
use crate::{camera, uvc_runtime, video};
struct CameraRelaySlot {
cfg: camera::CameraConfig,
relay: Arc<video::CameraRelay>,
}
/// Manage the currently active camera relay instance.
///
/// Inputs: camera configurations requested by incoming RPC streams.
/// Outputs: a reusable relay handle plus a monotonically increasing session id.
/// Why: only one camera output should own the physical sink at a time, but we
/// still want identical stream requests to reuse the existing pipeline.
pub struct CameraRuntime {
generation: AtomicU64,
slot: Mutex<Option<CameraRelaySlot>>,
}
impl CameraRuntime {
/// Create an empty runtime with no active relay.
///
/// Inputs: none.
/// Outputs: a fresh runtime with generation zero.
/// Why: keeping construction trivial lets the main server handler create
/// camera state early and share it across RPCs.
#[must_use]
pub fn new() -> Self {
Self {
generation: AtomicU64::new(0),
slot: Mutex::new(None),
}
}
/// Activate the relay matching the current configuration.
///
/// Inputs: the desired camera configuration selected from the environment.
/// Outputs: a session id plus a relay that is either reused or recreated.
/// Why: UVC/HDMI sinks are expensive to churn, so identical requests should
/// reuse the active pipeline instead of rebuilding it every time.
#[cfg(coverage)]
pub async fn activate(
&self,
cfg: &camera::CameraConfig,
) -> Result<(u64, Arc<video::CameraRelay>), Status> {
let session_id = self.generation.fetch_add(1, Ordering::SeqCst) + 1;
if matches!(cfg.output, camera::CameraOutput::Uvc)
&& std::env::var("LESAVKA_DISABLE_UVC").is_ok()
{
return Err(Status::failed_precondition(
"UVC output disabled (LESAVKA_DISABLE_UVC set)",
));
}
Ok((session_id, Arc::new(video::CameraRelay::new_noop(0))))
}
#[cfg(not(coverage))]
pub async fn activate(
&self,
cfg: &camera::CameraConfig,
) -> Result<(u64, Arc<video::CameraRelay>), Status> {
let session_id = self.generation.fetch_add(1, Ordering::SeqCst) + 1;
let mut slot = self.slot.lock().await;
let mut reused = false;
let relay = if let Some(existing) = slot.as_ref() {
if camera_cfg_eq(&existing.cfg, cfg) {
reused = true;
existing.relay.clone()
} else {
self.make_relay(cfg)?
}
} else {
self.make_relay(cfg)?
};
if !reused {
*slot = Some(CameraRelaySlot {
cfg: cfg.clone(),
relay: relay.clone(),
});
info!(
session_id,
output = cfg.output.as_str(),
codec = cfg.codec.as_str(),
width = cfg.width,
height = cfg.height,
fps = cfg.fps,
"🎥 camera relay (re)created"
);
} else {
info!(session_id, "🎥 camera relay reused");
}
Ok((session_id, relay))
}
/// Check whether a previously issued session id is still current.
///
/// Inputs: a session id returned by `activate`.
/// Outputs: `true` only when the session is still the most recent owner of
/// the active camera relay.
/// Why: superseded streams must stop writing frames into a sink that has
/// already been reconfigured for a newer client session.
#[must_use]
pub fn is_active(&self, session_id: u64) -> bool {
self.generation.load(Ordering::Relaxed) == session_id
}
#[allow(clippy::result_large_err)]
#[cfg(not(coverage))]
fn make_relay(&self, cfg: &camera::CameraConfig) -> Result<Arc<video::CameraRelay>, Status> {
let relay = match cfg.output {
camera::CameraOutput::Uvc => {
if std::env::var("LESAVKA_DISABLE_UVC").is_ok() {
return Err(Status::failed_precondition(
"UVC output disabled (LESAVKA_DISABLE_UVC set)",
));
}
let uvc = uvc_runtime::pick_uvc_device()
.map_err(|e| Status::internal(format!("{e:#}")))?;
info!(%uvc, "🎥 stream_camera using UVC sink");
video::CameraRelay::new_uvc(0, &uvc, cfg)
.map_err(|e| Status::internal(format!("{e:#}")))?
}
camera::CameraOutput::Hdmi => video::CameraRelay::new_hdmi(0, cfg)
.map_err(|e| Status::internal(format!("{e:#}")))?,
};
Ok(Arc::new(relay))
}
}
impl Default for CameraRuntime {
fn default() -> Self {
Self::new()
}
}
/// Compare two camera configurations for sink reuse.
///
/// Inputs: the currently active camera config and the requested config.
/// Outputs: `true` when both configs target the same sink and stream profile.
/// Why: reusing a pipeline is only safe when both the transport parameters and
/// the HDMI connector identity still match.
#[must_use]
pub fn camera_cfg_eq(a: &camera::CameraConfig, b: &camera::CameraConfig) -> bool {
if a.output != b.output
|| a.codec != b.codec
|| a.width != b.width
|| a.height != b.height
|| a.fps != b.fps
{
return false;
}
if a.output == camera::CameraOutput::Hdmi && a.hdmi_display_size() != b.hdmi_display_size() {
return false;
}
match (&a.hdmi, &b.hdmi) {
(Some(left), Some(right)) => left.name == right.name && left.id == right.id,
(None, None) => true,
_ => false,
}
}
#[cfg(test)]
mod tests {
use super::camera_cfg_eq;
use crate::camera::{CameraCodec, CameraConfig, CameraOutput, HdmiConnector};
#[test]
fn camera_cfg_eq_requires_matching_sink_profile() {
let base = CameraConfig {
output: CameraOutput::Hdmi,
codec: CameraCodec::H264,
width: 1920,
height: 1080,
fps: 30,
hdmi: Some(HdmiConnector {
name: String::from("HDMI-A-1"),
id: Some(42),
modes: Vec::new(),
}),
};
let same = base.clone();
assert!(camera_cfg_eq(&base, &same));
let mut changed = base.clone();
changed.fps = 25;
assert!(!camera_cfg_eq(&base, &changed));
changed = base.clone();
changed.hdmi = Some(HdmiConnector {
name: String::from("HDMI-A-2"),
id: Some(42),
modes: Vec::new(),
});
assert!(!camera_cfg_eq(&base, &changed));
}
}