fix(sync): recover from catastrophic playout lateness
This commit is contained in:
parent
28cbdc8808
commit
538c3e778d
@ -622,3 +622,4 @@ if [[ -n $INSTALLED_VERSION || -n $INSTALLED_SHA ]]; then
|
||||
fi
|
||||
echo "➡️ Status: sudo systemctl status lesavka-server --no-pager"
|
||||
echo "➡️ Logs: sudo journalctl -u lesavka-server -f --no-pager"
|
||||
echo "✅ Installed version: ${INSTALLED_VERSION:-lesavka-server}${INSTALLED_SHA:+ ($INSTALLED_SHA)}"
|
||||
|
||||
@ -12,7 +12,7 @@ mod state;
|
||||
|
||||
use config::{
|
||||
apply_playout_offset, upstream_pairing_master_slack, upstream_playout_delay,
|
||||
upstream_playout_offset_us, upstream_timing_trace_enabled,
|
||||
upstream_playout_offset_us, upstream_reanchor_late_threshold, upstream_timing_trace_enabled,
|
||||
};
|
||||
use state::UpstreamClockState;
|
||||
|
||||
@ -416,13 +416,41 @@ impl UpstreamMediaRuntime {
|
||||
*last_slot = Some(local_pts_us);
|
||||
let epoch = *state.playout_epoch.get_or_insert(pairing_deadline);
|
||||
let sink_offset_us = upstream_playout_offset_us(kind);
|
||||
let due_at =
|
||||
let playout_delay = upstream_playout_delay();
|
||||
let mut due_at =
|
||||
apply_playout_offset(epoch + Duration::from_micros(local_pts_us), sink_offset_us);
|
||||
let late_by = now.checked_duration_since(due_at).unwrap_or_default();
|
||||
let mut late_by = now.checked_duration_since(due_at).unwrap_or_default();
|
||||
let reanchor_threshold = upstream_reanchor_late_threshold(playout_delay);
|
||||
if late_by > reanchor_threshold {
|
||||
let old_late_by = late_by;
|
||||
let desired_due_at = now + playout_delay;
|
||||
let unoffset_due_at = apply_playout_offset(desired_due_at, -sink_offset_us);
|
||||
let recovered_epoch = unoffset_due_at
|
||||
.checked_sub(Duration::from_micros(local_pts_us))
|
||||
.unwrap_or(unoffset_due_at);
|
||||
state.playout_epoch = Some(recovered_epoch);
|
||||
state.pairing_anchor_deadline = Some(desired_due_at);
|
||||
due_at = apply_playout_offset(
|
||||
recovered_epoch + Duration::from_micros(local_pts_us),
|
||||
sink_offset_us,
|
||||
);
|
||||
late_by = now.checked_duration_since(due_at).unwrap_or_default();
|
||||
info!(
|
||||
session_id,
|
||||
?kind,
|
||||
packet_count,
|
||||
local_pts_us,
|
||||
remote_pts_us,
|
||||
old_late_by_ms = old_late_by.as_millis(),
|
||||
recovery_buffer_ms = playout_delay.as_millis(),
|
||||
reanchor_threshold_ms = reanchor_threshold.as_millis(),
|
||||
"upstream media playout epoch reanchored after catastrophic lateness"
|
||||
);
|
||||
}
|
||||
if upstream_timing_trace_enabled()
|
||||
&& (packet_count <= 10 || packet_count.is_multiple_of(300))
|
||||
{
|
||||
let playout_delay_us = epoch.saturating_duration_since(now).as_micros();
|
||||
let playout_delay_us = due_at.saturating_duration_since(now).as_micros();
|
||||
let late_by_us = late_by.as_micros();
|
||||
info!(
|
||||
session_id,
|
||||
|
||||
@ -43,6 +43,20 @@ pub(super) fn upstream_pairing_master_slack() -> Duration {
|
||||
Duration::from_micros(slack_us)
|
||||
}
|
||||
|
||||
pub(super) fn upstream_reanchor_late_threshold(playout_delay: Duration) -> Duration {
|
||||
if let Some(override_ms) = std::env::var("LESAVKA_UPSTREAM_REANCHOR_LATE_MS")
|
||||
.ok()
|
||||
.and_then(|value| value.trim().parse::<u64>().ok())
|
||||
{
|
||||
return Duration::from_millis(override_ms);
|
||||
}
|
||||
|
||||
let default_ms = (playout_delay.as_millis().min(u64::MAX as u128) as u64)
|
||||
.saturating_div(2)
|
||||
.max(250);
|
||||
Duration::from_millis(default_ms)
|
||||
}
|
||||
|
||||
pub(super) fn apply_playout_offset(base: Instant, offset_us: i64) -> Instant {
|
||||
if offset_us >= 0 {
|
||||
base + Duration::from_micros(offset_us as u64)
|
||||
|
||||
@ -184,6 +184,27 @@ fn upstream_pairing_master_slack_defaults_to_twenty_ms_and_accepts_overrides() {
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn upstream_reanchor_late_threshold_defaults_to_half_the_buffer_and_accepts_overrides() {
|
||||
temp_env::with_var_unset("LESAVKA_UPSTREAM_REANCHOR_LATE_MS", || {
|
||||
assert_eq!(
|
||||
super::upstream_reanchor_late_threshold(Duration::from_secs(1)),
|
||||
Duration::from_millis(500)
|
||||
);
|
||||
assert_eq!(
|
||||
super::upstream_reanchor_late_threshold(Duration::from_millis(100)),
|
||||
Duration::from_millis(250)
|
||||
);
|
||||
});
|
||||
|
||||
temp_env::with_var("LESAVKA_UPSTREAM_REANCHOR_LATE_MS", Some("42"), || {
|
||||
assert_eq!(
|
||||
super::upstream_reanchor_late_threshold(Duration::from_secs(1)),
|
||||
Duration::from_millis(42)
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn upstream_timing_trace_flag_accepts_false_values() {
|
||||
temp_env::with_var("LESAVKA_UPSTREAM_TIMING_TRACE", Some("off"), || {
|
||||
@ -282,6 +303,42 @@ fn shared_playout_trace_path_keeps_planned_pts_stable() {
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn catastrophic_lateness_reanchors_the_shared_playout_epoch() {
|
||||
temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("20"), || {
|
||||
temp_env::with_var("LESAVKA_UPSTREAM_REANCHOR_LATE_MS", Some("5"), || {
|
||||
let runtime = UpstreamMediaRuntime::new();
|
||||
let _camera = runtime.activate_camera();
|
||||
let _microphone = runtime.activate_microphone();
|
||||
|
||||
assert!(matches!(
|
||||
runtime.plan_video_pts(1_000_000, 16_666),
|
||||
super::UpstreamPlanDecision::AwaitingPair
|
||||
));
|
||||
let _audio_first = play(runtime.plan_audio_pts(1_000_000));
|
||||
let _video_first = play(runtime.plan_video_pts(1_000_000, 16_666));
|
||||
|
||||
std::thread::sleep(Duration::from_millis(30));
|
||||
|
||||
let recovered_audio = play(runtime.plan_audio_pts(1_000_000));
|
||||
assert!(
|
||||
recovered_audio.due_at > tokio::time::Instant::now(),
|
||||
"recovered packet should be scheduled back into the future"
|
||||
);
|
||||
assert!(
|
||||
recovered_audio.late_by <= Duration::from_millis(1),
|
||||
"recovered packet should no longer be catastrophically late"
|
||||
);
|
||||
|
||||
let recovered_video = play(runtime.plan_video_pts(1_016_666, 16_666));
|
||||
assert!(
|
||||
recovered_video.due_at > tokio::time::Instant::now(),
|
||||
"shared epoch recovery should also move video back into the future"
|
||||
);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn wait_for_audio_master_releases_video_once_audio_catches_up() {
|
||||
let runtime = Arc::new(UpstreamMediaRuntime::new());
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user