180 lines
5.8 KiB
Rust
180 lines
5.8 KiB
Rust
#![forbid(unsafe_code)]
|
|
|
|
use std::sync::Arc;
|
|
use std::sync::atomic::{AtomicU64, Ordering};
|
|
|
|
use tokio::sync::Mutex;
|
|
use tonic::Status;
|
|
use tracing::info;
|
|
|
|
use crate::{camera, uvc_runtime, video};
|
|
|
|
struct CameraRelaySlot {
|
|
cfg: camera::CameraConfig,
|
|
relay: Arc<video::CameraRelay>,
|
|
}
|
|
|
|
/// Manage the currently active camera relay instance.
|
|
///
|
|
/// Inputs: camera configurations requested by incoming RPC streams.
|
|
/// Outputs: a reusable relay handle plus a monotonically increasing session id.
|
|
/// Why: only one camera output should own the physical sink at a time, but we
|
|
/// still want identical stream requests to reuse the existing pipeline.
|
|
pub struct CameraRuntime {
|
|
generation: AtomicU64,
|
|
slot: Mutex<Option<CameraRelaySlot>>,
|
|
}
|
|
|
|
impl CameraRuntime {
|
|
/// Create an empty runtime with no active relay.
|
|
///
|
|
/// Inputs: none.
|
|
/// Outputs: a fresh runtime with generation zero.
|
|
/// Why: keeping construction trivial lets the main server handler create
|
|
/// camera state early and share it across RPCs.
|
|
#[must_use]
|
|
pub fn new() -> Self {
|
|
Self {
|
|
generation: AtomicU64::new(0),
|
|
slot: Mutex::new(None),
|
|
}
|
|
}
|
|
|
|
/// Activate the relay matching the current configuration.
|
|
///
|
|
/// Inputs: the desired camera configuration selected from the environment.
|
|
/// Outputs: a session id plus a relay that is either reused or recreated.
|
|
/// Why: UVC/HDMI sinks are expensive to churn, so identical requests should
|
|
/// reuse the active pipeline instead of rebuilding it every time.
|
|
pub async fn activate(
|
|
&self,
|
|
cfg: &camera::CameraConfig,
|
|
) -> Result<(u64, Arc<video::CameraRelay>), Status> {
|
|
let session_id = self.generation.fetch_add(1, Ordering::SeqCst) + 1;
|
|
let mut slot = self.slot.lock().await;
|
|
let mut reused = false;
|
|
|
|
let relay = if let Some(existing) = slot.as_ref() {
|
|
if camera_cfg_eq(&existing.cfg, cfg) {
|
|
reused = true;
|
|
existing.relay.clone()
|
|
} else {
|
|
self.make_relay(cfg)?
|
|
}
|
|
} else {
|
|
self.make_relay(cfg)?
|
|
};
|
|
|
|
if !reused {
|
|
*slot = Some(CameraRelaySlot {
|
|
cfg: cfg.clone(),
|
|
relay: relay.clone(),
|
|
});
|
|
info!(
|
|
session_id,
|
|
output = cfg.output.as_str(),
|
|
codec = cfg.codec.as_str(),
|
|
width = cfg.width,
|
|
height = cfg.height,
|
|
fps = cfg.fps,
|
|
"🎥 camera relay (re)created"
|
|
);
|
|
} else {
|
|
info!(session_id, "🎥 camera relay reused");
|
|
}
|
|
|
|
Ok((session_id, relay))
|
|
}
|
|
|
|
/// Check whether a previously issued session id is still current.
|
|
///
|
|
/// Inputs: a session id returned by `activate`.
|
|
/// Outputs: `true` only when the session is still the most recent owner of
|
|
/// the active camera relay.
|
|
/// Why: superseded streams must stop writing frames into a sink that has
|
|
/// already been reconfigured for a newer client session.
|
|
#[must_use]
|
|
pub fn is_active(&self, session_id: u64) -> bool {
|
|
self.generation.load(Ordering::Relaxed) == session_id
|
|
}
|
|
|
|
fn make_relay(&self, cfg: &camera::CameraConfig) -> Result<Arc<video::CameraRelay>, Status> {
|
|
let relay = match cfg.output {
|
|
camera::CameraOutput::Uvc => {
|
|
if std::env::var("LESAVKA_DISABLE_UVC").is_ok() {
|
|
return Err(Status::failed_precondition(
|
|
"UVC output disabled (LESAVKA_DISABLE_UVC set)",
|
|
));
|
|
}
|
|
let uvc = uvc_runtime::pick_uvc_device()
|
|
.map_err(|e| Status::internal(format!("{e:#}")))?;
|
|
info!(%uvc, "🎥 stream_camera using UVC sink");
|
|
video::CameraRelay::new_uvc(0, &uvc, cfg)
|
|
.map_err(|e| Status::internal(format!("{e:#}")))?
|
|
}
|
|
camera::CameraOutput::Hdmi => video::CameraRelay::new_hdmi(0, cfg)
|
|
.map_err(|e| Status::internal(format!("{e:#}")))?,
|
|
};
|
|
Ok(Arc::new(relay))
|
|
}
|
|
}
|
|
|
|
/// Compare two camera configurations for sink reuse.
|
|
///
|
|
/// Inputs: the currently active camera config and the requested config.
|
|
/// Outputs: `true` when both configs target the same sink and stream profile.
|
|
/// Why: reusing a pipeline is only safe when both the transport parameters and
|
|
/// the HDMI connector identity still match.
|
|
#[must_use]
|
|
pub fn camera_cfg_eq(a: &camera::CameraConfig, b: &camera::CameraConfig) -> bool {
|
|
if a.output != b.output
|
|
|| a.codec != b.codec
|
|
|| a.width != b.width
|
|
|| a.height != b.height
|
|
|| a.fps != b.fps
|
|
{
|
|
return false;
|
|
}
|
|
|
|
match (&a.hdmi, &b.hdmi) {
|
|
(Some(left), Some(right)) => left.name == right.name && left.id == right.id,
|
|
(None, None) => true,
|
|
_ => false,
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::camera_cfg_eq;
|
|
use crate::camera::{CameraCodec, CameraConfig, CameraOutput, HdmiConnector};
|
|
|
|
#[test]
|
|
fn camera_cfg_eq_requires_matching_sink_profile() {
|
|
let base = CameraConfig {
|
|
output: CameraOutput::Hdmi,
|
|
codec: CameraCodec::H264,
|
|
width: 1920,
|
|
height: 1080,
|
|
fps: 30,
|
|
hdmi: Some(HdmiConnector {
|
|
name: String::from("HDMI-A-1"),
|
|
id: Some(42),
|
|
}),
|
|
};
|
|
|
|
let same = base.clone();
|
|
assert!(camera_cfg_eq(&base, &same));
|
|
|
|
let mut changed = base.clone();
|
|
changed.fps = 25;
|
|
assert!(!camera_cfg_eq(&base, &changed));
|
|
|
|
changed = base.clone();
|
|
changed.hdmi = Some(HdmiConnector {
|
|
name: String::from("HDMI-A-2"),
|
|
id: Some(42),
|
|
});
|
|
assert!(!camera_cfg_eq(&base, &changed));
|
|
}
|
|
}
|