fix(sync): warm cold camera relays before pairing

This commit is contained in:
Brad Stein 2026-04-26 16:45:25 -03:00
parent fb323cb5cc
commit a87577a042
7 changed files with 56 additions and 12 deletions

6
Cargo.lock generated
View File

@ -1642,7 +1642,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]] [[package]]
name = "lesavka_client" name = "lesavka_client"
version = "0.14.6" version = "0.14.7"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-stream", "async-stream",
@ -1676,7 +1676,7 @@ dependencies = [
[[package]] [[package]]
name = "lesavka_common" name = "lesavka_common"
version = "0.14.6" version = "0.14.7"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",
@ -1688,7 +1688,7 @@ dependencies = [
[[package]] [[package]]
name = "lesavka_server" name = "lesavka_server"
version = "0.14.6" version = "0.14.7"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package] [package]
name = "lesavka_client" name = "lesavka_client"
version = "0.14.6" version = "0.14.7"
edition = "2024" edition = "2024"
[dependencies] [dependencies]

View File

@ -1,6 +1,6 @@
[package] [package]
name = "lesavka_common" name = "lesavka_common"
version = "0.14.6" version = "0.14.7"
edition = "2024" edition = "2024"
build = "build.rs" build = "build.rs"

View File

@ -10,7 +10,7 @@ bench = false
[package] [package]
name = "lesavka_server" name = "lesavka_server"
version = "0.14.6" version = "0.14.7"
edition = "2024" edition = "2024"
autobins = false autobins = false

View File

@ -51,7 +51,7 @@ impl CameraRuntime {
pub async fn activate( pub async fn activate(
&self, &self,
cfg: &camera::CameraConfig, cfg: &camera::CameraConfig,
) -> Result<(u64, Arc<video::CameraRelay>), Status> { ) -> Result<(u64, Arc<video::CameraRelay>, bool), Status> {
let session_id = self.generation.fetch_add(1, Ordering::SeqCst) + 1; let session_id = self.generation.fetch_add(1, Ordering::SeqCst) + 1;
if matches!(cfg.output, camera::CameraOutput::Uvc) if matches!(cfg.output, camera::CameraOutput::Uvc)
&& std::env::var("LESAVKA_DISABLE_UVC").is_ok() && std::env::var("LESAVKA_DISABLE_UVC").is_ok()
@ -60,14 +60,14 @@ impl CameraRuntime {
"UVC output disabled (LESAVKA_DISABLE_UVC set)", "UVC output disabled (LESAVKA_DISABLE_UVC set)",
)); ));
} }
Ok((session_id, Arc::new(video::CameraRelay::new_noop(0)))) Ok((session_id, Arc::new(video::CameraRelay::new_noop(0)), false))
} }
#[cfg(not(coverage))] #[cfg(not(coverage))]
pub async fn activate( pub async fn activate(
&self, &self,
cfg: &camera::CameraConfig, cfg: &camera::CameraConfig,
) -> Result<(u64, Arc<video::CameraRelay>), Status> { ) -> Result<(u64, Arc<video::CameraRelay>, bool), Status> {
let session_id = self.generation.fetch_add(1, Ordering::SeqCst) + 1; let session_id = self.generation.fetch_add(1, Ordering::SeqCst) + 1;
let mut slot = self.slot.lock().await; let mut slot = self.slot.lock().await;
let mut reused = false; let mut reused = false;
@ -101,7 +101,7 @@ impl CameraRuntime {
info!(session_id, "🎥 camera relay reused"); info!(session_id, "🎥 camera relay reused");
} }
Ok((session_id, relay)) Ok((session_id, relay, reused))
} }
/// Check whether a previously issued session id is still current. /// Check whether a previously issued session id is still current.

View File

@ -8,6 +8,15 @@ fn upstream_stale_drop_budget() -> Duration {
Duration::from_millis(drop_ms) Duration::from_millis(drop_ms)
} }
#[cfg(not(coverage))]
fn upstream_camera_startup_grace() -> Duration {
let grace_ms = std::env::var("LESAVKA_UPSTREAM_CAMERA_STARTUP_GRACE_MS")
.ok()
.and_then(|value| value.trim().parse::<u64>().ok())
.unwrap_or(250);
Duration::from_millis(grace_ms)
}
#[cfg(not(coverage))] #[cfg(not(coverage))]
fn retain_freshest_video_packet( fn retain_freshest_video_packet(
pending: &mut std::collections::VecDeque<VideoPacket>, pending: &mut std::collections::VecDeque<VideoPacket>,
@ -265,7 +274,7 @@ impl Relay for Handler {
); );
let upstream_lease = self.upstream_media_rt.activate_camera(); let upstream_lease = self.upstream_media_rt.activate_camera();
let (session_id, relay) = self.camera_rt.activate(&cfg).await?; let (session_id, relay, relay_reused) = self.camera_rt.activate(&cfg).await?;
let camera_rt = self.camera_rt.clone(); let camera_rt = self.camera_rt.clone();
let upstream_media_rt = self.upstream_media_rt.clone(); let upstream_media_rt = self.upstream_media_rt.clone();
info!(rpc_id, session_id, "🎥 stream_camera opened"); info!(rpc_id, session_id, "🎥 stream_camera opened");
@ -280,6 +289,10 @@ impl Relay for Handler {
let mut inbound_closed = false; let mut inbound_closed = false;
let stale_drop_budget = upstream_stale_drop_budget(); let stale_drop_budget = upstream_stale_drop_budget();
let mut startup_video_settled = false; let mut startup_video_settled = false;
let startup_grace_us = upstream_camera_startup_grace()
.as_micros()
.min(u64::MAX as u128) as u64;
let mut cold_startup_grace_pending = !relay_reused && startup_grace_us > 0;
loop { loop {
if !camera_rt.is_active(session_id) if !camera_rt.is_active(session_id)
|| !upstream_media_rt.is_camera_active(upstream_lease.generation) || !upstream_media_rt.is_camera_active(upstream_lease.generation)
@ -316,6 +329,19 @@ impl Relay for Handler {
} }
continue; continue;
}; };
if cold_startup_grace_pending && pkt.pts < startup_grace_us {
let coalesced = retain_freshest_video_packet(&mut pending);
tracing::debug!(
rpc_id,
session_id,
pts = pkt.pts,
startup_grace_us,
dropped_pending = coalesced,
"🎥 dropping cold-start upstream video until the relay warm-up grace is spent"
);
continue;
}
cold_startup_grace_pending = false;
let plan = match upstream_media_rt.plan_video_pts(pkt.pts, frame_step_us) { let plan = match upstream_media_rt.plan_video_pts(pkt.pts, frame_step_us) {
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::AwaitingPair => { lesavka_server::upstream_media_runtime::UpstreamPlanDecision::AwaitingPair => {
pending.push_front(pkt); pending.push_front(pkt);

View File

@ -7,6 +7,15 @@ fn upstream_stale_drop_budget() -> Duration {
Duration::from_millis(drop_ms) Duration::from_millis(drop_ms)
} }
#[cfg(coverage)]
fn upstream_camera_startup_grace() -> Duration {
let grace_ms = std::env::var("LESAVKA_UPSTREAM_CAMERA_STARTUP_GRACE_MS")
.ok()
.and_then(|value| value.trim().parse::<u64>().ok())
.unwrap_or(250);
Duration::from_millis(grace_ms)
}
#[cfg(coverage)] #[cfg(coverage)]
fn retain_freshest_video_packet( fn retain_freshest_video_packet(
pending: &mut std::collections::VecDeque<VideoPacket>, pending: &mut std::collections::VecDeque<VideoPacket>,
@ -164,7 +173,7 @@ impl Relay for Handler {
) -> Result<Response<Self::StreamCameraStream>, Status> { ) -> Result<Response<Self::StreamCameraStream>, Status> {
let cfg = camera::current_camera_config(); let cfg = camera::current_camera_config();
let upstream_lease = self.upstream_media_rt.activate_camera(); let upstream_lease = self.upstream_media_rt.activate_camera();
let (session_id, relay) = self.camera_rt.activate(&cfg).await?; let (session_id, relay, relay_reused) = self.camera_rt.activate(&cfg).await?;
let camera_rt = self.camera_rt.clone(); let camera_rt = self.camera_rt.clone();
let upstream_media_rt = self.upstream_media_rt.clone(); let upstream_media_rt = self.upstream_media_rt.clone();
let (tx, rx) = tokio::sync::mpsc::channel(1); let (tx, rx) = tokio::sync::mpsc::channel(1);
@ -175,6 +184,10 @@ impl Relay for Handler {
let mut pending = std::collections::VecDeque::new(); let mut pending = std::collections::VecDeque::new();
let mut inbound_closed = false; let mut inbound_closed = false;
let stale_drop_budget = upstream_stale_drop_budget(); let stale_drop_budget = upstream_stale_drop_budget();
let startup_grace_us = upstream_camera_startup_grace()
.as_micros()
.min(u64::MAX as u128) as u64;
let mut cold_startup_grace_pending = !relay_reused && startup_grace_us > 0;
loop { loop {
if !camera_rt.is_active(session_id) if !camera_rt.is_active(session_id)
|| !upstream_media_rt.is_camera_active(upstream_lease.generation) || !upstream_media_rt.is_camera_active(upstream_lease.generation)
@ -202,6 +215,11 @@ impl Relay for Handler {
} }
continue; continue;
}; };
if cold_startup_grace_pending && pkt.pts < startup_grace_us {
let _ = retain_freshest_video_packet(&mut pending);
continue;
}
cold_startup_grace_pending = false;
let plan = match upstream_media_rt.plan_video_pts(pkt.pts, frame_step_us) { let plan = match upstream_media_rt.plan_video_pts(pkt.pts, frame_step_us) {
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::AwaitingPair => { lesavka_server::upstream_media_runtime::UpstreamPlanDecision::AwaitingPair => {
pending.push_front(pkt); pending.push_front(pkt);