diff --git a/Cargo.lock b/Cargo.lock index b6c6498..4627d5f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1642,7 +1642,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.14.6" +version = "0.14.7" dependencies = [ "anyhow", "async-stream", @@ -1676,7 +1676,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.14.6" +version = "0.14.7" dependencies = [ "anyhow", "base64", @@ -1688,7 +1688,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.14.6" +version = "0.14.7" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index 9d4639b..943f95a 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.14.6" +version = "0.14.7" edition = "2024" [dependencies] diff --git a/common/Cargo.toml b/common/Cargo.toml index 8496091..9561371 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.14.6" +version = "0.14.7" edition = "2024" build = "build.rs" diff --git a/server/Cargo.toml b/server/Cargo.toml index 6a253f1..59d8aa1 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.14.6" +version = "0.14.7" edition = "2024" autobins = false diff --git a/server/src/camera_runtime.rs b/server/src/camera_runtime.rs index fcae228..a789abe 100644 --- a/server/src/camera_runtime.rs +++ b/server/src/camera_runtime.rs @@ -51,7 +51,7 @@ impl CameraRuntime { pub async fn activate( &self, cfg: &camera::CameraConfig, - ) -> Result<(u64, Arc), Status> { + ) -> Result<(u64, Arc, bool), Status> { let session_id = self.generation.fetch_add(1, Ordering::SeqCst) + 1; if matches!(cfg.output, camera::CameraOutput::Uvc) && std::env::var("LESAVKA_DISABLE_UVC").is_ok() @@ -60,14 +60,14 @@ impl CameraRuntime { "UVC output disabled (LESAVKA_DISABLE_UVC set)", )); } - Ok((session_id, Arc::new(video::CameraRelay::new_noop(0)))) + Ok((session_id, Arc::new(video::CameraRelay::new_noop(0)), false)) } #[cfg(not(coverage))] pub async fn activate( &self, cfg: &camera::CameraConfig, - ) -> Result<(u64, Arc), Status> { + ) -> Result<(u64, Arc, bool), Status> { let session_id = self.generation.fetch_add(1, Ordering::SeqCst) + 1; let mut slot = self.slot.lock().await; let mut reused = false; @@ -101,7 +101,7 @@ impl CameraRuntime { info!(session_id, "🎥 camera relay reused"); } - Ok((session_id, relay)) + Ok((session_id, relay, reused)) } /// Check whether a previously issued session id is still current. diff --git a/server/src/main/relay_service.rs b/server/src/main/relay_service.rs index 4371c05..61693c7 100644 --- a/server/src/main/relay_service.rs +++ b/server/src/main/relay_service.rs @@ -8,6 +8,15 @@ fn upstream_stale_drop_budget() -> Duration { Duration::from_millis(drop_ms) } +#[cfg(not(coverage))] +fn upstream_camera_startup_grace() -> Duration { + let grace_ms = std::env::var("LESAVKA_UPSTREAM_CAMERA_STARTUP_GRACE_MS") + .ok() + .and_then(|value| value.trim().parse::().ok()) + .unwrap_or(250); + Duration::from_millis(grace_ms) +} + #[cfg(not(coverage))] fn retain_freshest_video_packet( pending: &mut std::collections::VecDeque, @@ -265,7 +274,7 @@ impl Relay for Handler { ); let upstream_lease = self.upstream_media_rt.activate_camera(); - let (session_id, relay) = self.camera_rt.activate(&cfg).await?; + let (session_id, relay, relay_reused) = self.camera_rt.activate(&cfg).await?; let camera_rt = self.camera_rt.clone(); let upstream_media_rt = self.upstream_media_rt.clone(); info!(rpc_id, session_id, "🎥 stream_camera opened"); @@ -280,6 +289,10 @@ impl Relay for Handler { let mut inbound_closed = false; let stale_drop_budget = upstream_stale_drop_budget(); let mut startup_video_settled = false; + let startup_grace_us = upstream_camera_startup_grace() + .as_micros() + .min(u64::MAX as u128) as u64; + let mut cold_startup_grace_pending = !relay_reused && startup_grace_us > 0; loop { if !camera_rt.is_active(session_id) || !upstream_media_rt.is_camera_active(upstream_lease.generation) @@ -316,6 +329,19 @@ impl Relay for Handler { } continue; }; + if cold_startup_grace_pending && pkt.pts < startup_grace_us { + let coalesced = retain_freshest_video_packet(&mut pending); + tracing::debug!( + rpc_id, + session_id, + pts = pkt.pts, + startup_grace_us, + dropped_pending = coalesced, + "🎥 dropping cold-start upstream video until the relay warm-up grace is spent" + ); + continue; + } + cold_startup_grace_pending = false; let plan = match upstream_media_rt.plan_video_pts(pkt.pts, frame_step_us) { lesavka_server::upstream_media_runtime::UpstreamPlanDecision::AwaitingPair => { pending.push_front(pkt); diff --git a/server/src/main/relay_service_coverage.rs b/server/src/main/relay_service_coverage.rs index 229b810..b6b7a8d 100644 --- a/server/src/main/relay_service_coverage.rs +++ b/server/src/main/relay_service_coverage.rs @@ -7,6 +7,15 @@ fn upstream_stale_drop_budget() -> Duration { Duration::from_millis(drop_ms) } +#[cfg(coverage)] +fn upstream_camera_startup_grace() -> Duration { + let grace_ms = std::env::var("LESAVKA_UPSTREAM_CAMERA_STARTUP_GRACE_MS") + .ok() + .and_then(|value| value.trim().parse::().ok()) + .unwrap_or(250); + Duration::from_millis(grace_ms) +} + #[cfg(coverage)] fn retain_freshest_video_packet( pending: &mut std::collections::VecDeque, @@ -164,7 +173,7 @@ impl Relay for Handler { ) -> Result, Status> { let cfg = camera::current_camera_config(); let upstream_lease = self.upstream_media_rt.activate_camera(); - let (session_id, relay) = self.camera_rt.activate(&cfg).await?; + let (session_id, relay, relay_reused) = self.camera_rt.activate(&cfg).await?; let camera_rt = self.camera_rt.clone(); let upstream_media_rt = self.upstream_media_rt.clone(); let (tx, rx) = tokio::sync::mpsc::channel(1); @@ -175,6 +184,10 @@ impl Relay for Handler { let mut pending = std::collections::VecDeque::new(); let mut inbound_closed = false; let stale_drop_budget = upstream_stale_drop_budget(); + let startup_grace_us = upstream_camera_startup_grace() + .as_micros() + .min(u64::MAX as u128) as u64; + let mut cold_startup_grace_pending = !relay_reused && startup_grace_us > 0; loop { if !camera_rt.is_active(session_id) || !upstream_media_rt.is_camera_active(upstream_lease.generation) @@ -202,6 +215,11 @@ impl Relay for Handler { } continue; }; + if cold_startup_grace_pending && pkt.pts < startup_grace_us { + let _ = retain_freshest_video_packet(&mut pending); + continue; + } + cold_startup_grace_pending = false; let plan = match upstream_media_rt.plan_video_pts(pkt.pts, frame_step_us) { lesavka_server::upstream_media_runtime::UpstreamPlanDecision::AwaitingPair => { pending.push_front(pkt);