diff --git a/scripts/install/server.sh b/scripts/install/server.sh index 33ab77e..f552955 100755 --- a/scripts/install/server.sh +++ b/scripts/install/server.sh @@ -622,3 +622,4 @@ if [[ -n $INSTALLED_VERSION || -n $INSTALLED_SHA ]]; then fi echo "➡️ Status: sudo systemctl status lesavka-server --no-pager" echo "➡️ Logs: sudo journalctl -u lesavka-server -f --no-pager" +echo "✅ Installed version: ${INSTALLED_VERSION:-lesavka-server}${INSTALLED_SHA:+ ($INSTALLED_SHA)}" diff --git a/server/src/upstream_media_runtime.rs b/server/src/upstream_media_runtime.rs index f1967c6..32f4e67 100644 --- a/server/src/upstream_media_runtime.rs +++ b/server/src/upstream_media_runtime.rs @@ -12,7 +12,7 @@ mod state; use config::{ apply_playout_offset, upstream_pairing_master_slack, upstream_playout_delay, - upstream_playout_offset_us, upstream_timing_trace_enabled, + upstream_playout_offset_us, upstream_reanchor_late_threshold, upstream_timing_trace_enabled, }; use state::UpstreamClockState; @@ -416,13 +416,41 @@ impl UpstreamMediaRuntime { *last_slot = Some(local_pts_us); let epoch = *state.playout_epoch.get_or_insert(pairing_deadline); let sink_offset_us = upstream_playout_offset_us(kind); - let due_at = + let playout_delay = upstream_playout_delay(); + let mut due_at = apply_playout_offset(epoch + Duration::from_micros(local_pts_us), sink_offset_us); - let late_by = now.checked_duration_since(due_at).unwrap_or_default(); + let mut late_by = now.checked_duration_since(due_at).unwrap_or_default(); + let reanchor_threshold = upstream_reanchor_late_threshold(playout_delay); + if late_by > reanchor_threshold { + let old_late_by = late_by; + let desired_due_at = now + playout_delay; + let unoffset_due_at = apply_playout_offset(desired_due_at, -sink_offset_us); + let recovered_epoch = unoffset_due_at + .checked_sub(Duration::from_micros(local_pts_us)) + .unwrap_or(unoffset_due_at); + state.playout_epoch = Some(recovered_epoch); + state.pairing_anchor_deadline = Some(desired_due_at); + due_at = apply_playout_offset( + recovered_epoch + Duration::from_micros(local_pts_us), + sink_offset_us, + ); + late_by = now.checked_duration_since(due_at).unwrap_or_default(); + info!( + session_id, + ?kind, + packet_count, + local_pts_us, + remote_pts_us, + old_late_by_ms = old_late_by.as_millis(), + recovery_buffer_ms = playout_delay.as_millis(), + reanchor_threshold_ms = reanchor_threshold.as_millis(), + "upstream media playout epoch reanchored after catastrophic lateness" + ); + } if upstream_timing_trace_enabled() && (packet_count <= 10 || packet_count.is_multiple_of(300)) { - let playout_delay_us = epoch.saturating_duration_since(now).as_micros(); + let playout_delay_us = due_at.saturating_duration_since(now).as_micros(); let late_by_us = late_by.as_micros(); info!( session_id, diff --git a/server/src/upstream_media_runtime/config.rs b/server/src/upstream_media_runtime/config.rs index f53ccea..dda29c1 100644 --- a/server/src/upstream_media_runtime/config.rs +++ b/server/src/upstream_media_runtime/config.rs @@ -43,6 +43,20 @@ pub(super) fn upstream_pairing_master_slack() -> Duration { Duration::from_micros(slack_us) } +pub(super) fn upstream_reanchor_late_threshold(playout_delay: Duration) -> Duration { + if let Some(override_ms) = std::env::var("LESAVKA_UPSTREAM_REANCHOR_LATE_MS") + .ok() + .and_then(|value| value.trim().parse::().ok()) + { + return Duration::from_millis(override_ms); + } + + let default_ms = (playout_delay.as_millis().min(u64::MAX as u128) as u64) + .saturating_div(2) + .max(250); + Duration::from_millis(default_ms) +} + pub(super) fn apply_playout_offset(base: Instant, offset_us: i64) -> Instant { if offset_us >= 0 { base + Duration::from_micros(offset_us as u64) diff --git a/server/src/upstream_media_runtime/tests.rs b/server/src/upstream_media_runtime/tests.rs index caa6342..0e8cfa0 100644 --- a/server/src/upstream_media_runtime/tests.rs +++ b/server/src/upstream_media_runtime/tests.rs @@ -184,6 +184,27 @@ fn upstream_pairing_master_slack_defaults_to_twenty_ms_and_accepts_overrides() { }); } +#[test] +fn upstream_reanchor_late_threshold_defaults_to_half_the_buffer_and_accepts_overrides() { + temp_env::with_var_unset("LESAVKA_UPSTREAM_REANCHOR_LATE_MS", || { + assert_eq!( + super::upstream_reanchor_late_threshold(Duration::from_secs(1)), + Duration::from_millis(500) + ); + assert_eq!( + super::upstream_reanchor_late_threshold(Duration::from_millis(100)), + Duration::from_millis(250) + ); + }); + + temp_env::with_var("LESAVKA_UPSTREAM_REANCHOR_LATE_MS", Some("42"), || { + assert_eq!( + super::upstream_reanchor_late_threshold(Duration::from_secs(1)), + Duration::from_millis(42) + ); + }); +} + #[test] fn upstream_timing_trace_flag_accepts_false_values() { temp_env::with_var("LESAVKA_UPSTREAM_TIMING_TRACE", Some("off"), || { @@ -282,6 +303,42 @@ fn shared_playout_trace_path_keeps_planned_pts_stable() { }); } +#[test] +fn catastrophic_lateness_reanchors_the_shared_playout_epoch() { + temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("20"), || { + temp_env::with_var("LESAVKA_UPSTREAM_REANCHOR_LATE_MS", Some("5"), || { + let runtime = UpstreamMediaRuntime::new(); + let _camera = runtime.activate_camera(); + let _microphone = runtime.activate_microphone(); + + assert!(matches!( + runtime.plan_video_pts(1_000_000, 16_666), + super::UpstreamPlanDecision::AwaitingPair + )); + let _audio_first = play(runtime.plan_audio_pts(1_000_000)); + let _video_first = play(runtime.plan_video_pts(1_000_000, 16_666)); + + std::thread::sleep(Duration::from_millis(30)); + + let recovered_audio = play(runtime.plan_audio_pts(1_000_000)); + assert!( + recovered_audio.due_at > tokio::time::Instant::now(), + "recovered packet should be scheduled back into the future" + ); + assert!( + recovered_audio.late_by <= Duration::from_millis(1), + "recovered packet should no longer be catastrophically late" + ); + + let recovered_video = play(runtime.plan_video_pts(1_016_666, 16_666)); + assert!( + recovered_video.due_at > tokio::time::Instant::now(), + "shared epoch recovery should also move video back into the future" + ); + }); + }); +} + #[tokio::test(flavor = "current_thread")] async fn wait_for_audio_master_releases_video_once_audio_catches_up() { let runtime = Arc::new(UpstreamMediaRuntime::new());