media: let video wait for audio sync grace

This commit is contained in:
Brad Stein 2026-05-02 11:33:49 -03:00
parent fbf274d21b
commit 4e1b6d781f
15 changed files with 112 additions and 45 deletions

View File

@ -257,3 +257,17 @@ Context: 0.17.8 installed cleanly on both ends (`314c55b`) but the mirrored prob
- [x] Run focused upstream planner tests. - [x] Run focused upstream planner tests.
- [x] Run package checks before push. - [x] Run package checks before push.
- [x] Push clean semver `0.17.9` for installed client/server testing. - [x] Push clean semver `0.17.9` for installed client/server testing.
## 0.17.10 Sync-Only Audio Catch-Up Grace Checklist
Context: 0.17.9 installed cleanly on both ends (`fbf274d`) and improved the mirrored probe to `median=+19.8ms`, `mean=-42.0ms`, and planner phase `live`, but it still failed with `p95=254.1ms`, only 6 paired pulses, `drift=341.9ms`, and 591 video freezes. The Theia server log showed repeated `upstream video frame dropped because the audio master never caught up inside the pairing window`, so the video follower was still giving up at the nominal video due time instead of spending a bounded sync grace to let audio catch up.
- [x] Keep 0.17.10 scoped to establishing sync; defer freshness and smoothness tuning until paired skew is stable.
- [x] Add `LESAVKA_UPSTREAM_AUDIO_MASTER_WAIT_GRACE_MS` with a `350ms` default so video can wait past nominal due time for UAC audio progress.
- [x] Stop dropping video solely because it woke late after a successful audio-master wait.
- [x] Preserve the global `1000ms` live-lag ceiling and existing stale-input planner rules.
- [x] Update installer defaults and operational docs for the sync grace.
- [x] Add/adjust tests proving video can wait through sync grace and still times out after grace expires.
- [x] Run focused upstream planner tests.
- [x] Run package checks before push.
- [ ] Push clean semver `0.17.10` for installed client/server testing.

6
Cargo.lock generated
View File

@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]] [[package]]
name = "lesavka_client" name = "lesavka_client"
version = "0.17.9" version = "0.17.10"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-stream", "async-stream",
@ -1686,7 +1686,7 @@ dependencies = [
[[package]] [[package]]
name = "lesavka_common" name = "lesavka_common"
version = "0.17.9" version = "0.17.10"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",
@ -1698,7 +1698,7 @@ dependencies = [
[[package]] [[package]]
name = "lesavka_server" name = "lesavka_server"
version = "0.17.9" version = "0.17.10"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package] [package]
name = "lesavka_client" name = "lesavka_client"
version = "0.17.9" version = "0.17.10"
edition = "2024" edition = "2024"
[dependencies] [dependencies]

View File

@ -1,6 +1,6 @@
[package] [package]
name = "lesavka_common" name = "lesavka_common"
version = "0.17.9" version = "0.17.10"
edition = "2024" edition = "2024"
build = "build.rs" build = "build.rs"

View File

@ -248,6 +248,7 @@ from `LESAVKA_CLIENT_PKI_SSH_SOURCE` over SSH. Runtime clients require the insta
| `LESAVKA_UAC_DEV` | server hardware/device override | | `LESAVKA_UAC_DEV` | server hardware/device override |
| `LESAVKA_UAC_SESSION_CLOCK_ALIGN` | server audio sink clock-alignment override; `0` is the host-validated default | | `LESAVKA_UAC_SESSION_CLOCK_ALIGN` | server audio sink clock-alignment override; `0` is the host-validated default |
| `LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US` | server upstream playout override; shifts gadget-audio presentation relative to the shared playout epoch | | `LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US` | server upstream playout override; shifts gadget-audio presentation relative to the shared playout epoch |
| `LESAVKA_UPSTREAM_AUDIO_MASTER_WAIT_GRACE_MS` | server upstream sync override; how long video may wait past its nominal due time for UAC audio to reach the matching timestamp, defaults to `350` |
| `LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS` | server upstream planner freshness ceiling; planner-approved audio/video should not exceed this live lag budget, defaults to `1000` | | `LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS` | server upstream planner freshness ceiling; planner-approved audio/video should not exceed this live lag budget, defaults to `1000` |
| `LESAVKA_UPSTREAM_PAIR_SLACK_US` | server upstream pairing override; how far video may diverge from the planned audio-master capture moment before the frame is held or dropped, defaults to `80000` | | `LESAVKA_UPSTREAM_PAIR_SLACK_US` | server upstream pairing override; how far video may diverge from the planned audio-master capture moment before the frame is held or dropped, defaults to `80000` |
| `LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS` | server upstream pairing/synchronization target buffer; the server uses this shared buffer to pair webcam frames with matching gadget-mic audio before remote presentation, defaults to `350` | | `LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS` | server upstream pairing/synchronization target buffer; the server uses this shared buffer to pair webcam frames with matching gadget-mic audio before remote presentation, defaults to `350` |

View File

@ -1005,6 +1005,7 @@ fi
printf 'LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=%s\n' "$(resolve_upstream_audio_playout_offset_us)" printf 'LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=%s\n' "$(resolve_upstream_audio_playout_offset_us)"
printf 'LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US=%s\n' "$(resolve_upstream_video_playout_offset_us)" printf 'LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US=%s\n' "$(resolve_upstream_video_playout_offset_us)"
printf 'LESAVKA_UPSTREAM_PAIR_SLACK_US=%s\n' "${LESAVKA_UPSTREAM_PAIR_SLACK_US:-80000}" printf 'LESAVKA_UPSTREAM_PAIR_SLACK_US=%s\n' "${LESAVKA_UPSTREAM_PAIR_SLACK_US:-80000}"
printf 'LESAVKA_UPSTREAM_AUDIO_MASTER_WAIT_GRACE_MS=%s\n' "${LESAVKA_UPSTREAM_AUDIO_MASTER_WAIT_GRACE_MS:-350}"
printf 'LESAVKA_UPSTREAM_STALE_DROP_MS=%s\n' "${LESAVKA_UPSTREAM_STALE_DROP_MS:-80}" printf 'LESAVKA_UPSTREAM_STALE_DROP_MS=%s\n' "${LESAVKA_UPSTREAM_STALE_DROP_MS:-80}"
printf 'LESAVKA_SERVER_BIND_ADDR=%s\n' "${INSTALL_SERVER_BIND_ADDR}" printf 'LESAVKA_SERVER_BIND_ADDR=%s\n' "${INSTALL_SERVER_BIND_ADDR}"
printf 'LESAVKA_UVC_CODEC=%s\n' "${INSTALL_UVC_CODEC}" printf 'LESAVKA_UVC_CODEC=%s\n' "${INSTALL_UVC_CODEC}"

View File

@ -10,7 +10,7 @@ bench = false
[package] [package]
name = "lesavka_server" name = "lesavka_server"
version = "0.17.9" version = "0.17.10"
edition = "2024" edition = "2024"
autobins = false autobins = false

View File

@ -494,29 +494,14 @@ impl Relay for Handler {
.checked_duration_since(plan.due_at) .checked_duration_since(plan.due_at)
.unwrap_or_default(); .unwrap_or_default();
if actual_late_by > stale_drop_budget { if actual_late_by > stale_drop_budget {
let coalesced = retain_freshest_video_packet(&mut pending); tracing::debug!(
if startup_video_settled { rpc_id,
tracing::warn!( session_id = upstream_lease.session_id,
rpc_id, camera_session_id,
session_id = upstream_lease.session_id, late_by_ms = actual_late_by.as_millis(),
camera_session_id, pts = plan.local_pts_us,
late_by_ms = actual_late_by.as_millis(), "🎥 emitting video after waiting for the audio master to preserve sync"
pts = plan.local_pts_us, );
dropped_pending = coalesced,
"🎥 upstream video frame dropped after waking too late for fresh playout"
);
} else {
tracing::debug!(
rpc_id,
session_id = upstream_lease.session_id,
camera_session_id,
late_by_ms = actual_late_by.as_millis(),
pts = plan.local_pts_us,
dropped_pending = coalesced,
"🎥 dropping startup-stale upstream video after a late wake until the playout window settles"
);
}
continue;
} }
pkt.pts = plan.local_pts_us; pkt.pts = plan.local_pts_us;
startup_video_settled = true; startup_video_settled = true;

View File

@ -261,13 +261,6 @@ impl Relay for Handler {
continue; continue;
} }
tokio::time::sleep_until(plan.due_at).await; tokio::time::sleep_until(plan.due_at).await;
let actual_late_by = tokio::time::Instant::now()
.checked_duration_since(plan.due_at)
.unwrap_or_default();
if actual_late_by > stale_drop_budget {
let _ = retain_freshest_video_packet(&mut pending);
continue;
}
pkt.pts = plan.local_pts_us; pkt.pts = plan.local_pts_us;
let presented_pts = pkt.pts; let presented_pts = pkt.pts;
relay.feed(pkt); relay.feed(pkt);

View File

@ -12,10 +12,10 @@ mod state;
mod types; mod types;
use config::{ use config::{
apply_playout_offset, upstream_camera_startup_grace_us, upstream_max_live_lag, apply_playout_offset, upstream_audio_master_wait_grace, upstream_camera_startup_grace_us,
upstream_pairing_master_slack, upstream_playout_delay, upstream_playout_offset_us, upstream_max_live_lag, upstream_pairing_master_slack, upstream_playout_delay,
upstream_reanchor_late_threshold, upstream_require_paired_startup, upstream_startup_timeout, upstream_playout_offset_us, upstream_reanchor_late_threshold, upstream_require_paired_startup,
upstream_timing_trace_enabled, upstream_startup_timeout, upstream_timing_trace_enabled,
}; };
use state::{UpstreamClockState, UpstreamSyncPhase}; use state::{UpstreamClockState, UpstreamSyncPhase};
pub use types::{ pub use types::{
@ -215,12 +215,13 @@ impl UpstreamMediaRuntime {
} }
/// Hold video until the audio master has at least reached the same capture /// Hold video until the audio master has at least reached the same capture
/// moment, or give up once the frame can no longer be shown fresh. /// moment, or until the bounded sync grace is exhausted.
pub async fn wait_for_audio_master(&self, video_local_pts_us: u64, due_at: Instant) -> bool { pub async fn wait_for_audio_master(&self, video_local_pts_us: u64, due_at: Instant) -> bool {
let slack_us = upstream_pairing_master_slack() let slack_us = upstream_pairing_master_slack()
.as_micros() .as_micros()
.min(u64::MAX as u128) as u64; .min(u64::MAX as u128) as u64;
let audio_delay_allowance_us = self.positive_audio_delay_allowance_us(); let audio_delay_allowance_us = self.positive_audio_delay_allowance_us();
let deadline = due_at + upstream_audio_master_wait_grace();
loop { loop {
let notified = self.audio_progress_notify.notified(); let notified = self.audio_progress_notify.notified();
{ {
@ -240,12 +241,12 @@ impl UpstreamMediaRuntime {
return true; return true;
} }
} }
if Instant::now() >= due_at { if Instant::now() >= deadline {
return false; return false;
} }
tokio::select! { tokio::select! {
_ = notified => {} _ = notified => {}
_ = tokio::time::sleep_until(due_at) => return false, _ = tokio::time::sleep_until(deadline) => return false,
} }
} }
} }

View File

@ -79,6 +79,14 @@ pub(super) fn upstream_pairing_master_slack() -> Duration {
Duration::from_micros(slack_us) Duration::from_micros(slack_us)
} }
pub(super) fn upstream_audio_master_wait_grace() -> Duration {
let grace_ms = std::env::var("LESAVKA_UPSTREAM_AUDIO_MASTER_WAIT_GRACE_MS")
.ok()
.and_then(|value| value.trim().parse::<u64>().ok())
.unwrap_or(350);
Duration::from_millis(grace_ms)
}
pub(super) fn upstream_reanchor_late_threshold(playout_delay: Duration) -> Duration { pub(super) fn upstream_reanchor_late_threshold(playout_delay: Duration) -> Duration {
if let Some(override_ms) = std::env::var("LESAVKA_UPSTREAM_REANCHOR_LATE_MS") if let Some(override_ms) = std::env::var("LESAVKA_UPSTREAM_REANCHOR_LATE_MS")
.ok() .ok()

View File

@ -86,6 +86,46 @@ async fn wait_for_audio_master_times_out_when_audio_never_catches_up() {
); );
} }
#[tokio::test(flavor = "current_thread")]
#[serial(upstream_media_runtime)]
async fn wait_for_audio_master_keeps_video_waiting_through_sync_grace() {
let runtime = Arc::new(runtime_without_offsets());
let _camera = runtime.activate_camera();
let _microphone = runtime.activate_microphone();
assert!(matches!(
runtime.plan_video_pts(1_000_000, 16_666),
super::UpstreamPlanDecision::AwaitingPair
));
let audio_first = play(runtime.plan_audio_pts(1_000_000));
runtime.mark_audio_presented(audio_first.local_pts_us);
let video_first = play(runtime.plan_video_pts(1_000_000, 16_666));
let waiter = tokio::spawn({
let runtime = runtime.clone();
async move {
runtime
.wait_for_audio_master(
video_first.local_pts_us + 120_000,
tokio::time::Instant::now()
.checked_sub(Duration::from_millis(100))
.unwrap_or_else(tokio::time::Instant::now),
)
.await
}
});
tokio::time::sleep(Duration::from_millis(5)).await;
let audio_next = play(runtime.plan_audio_pts(1_120_000));
runtime.mark_audio_presented(audio_next.local_pts_us);
assert!(
waiter
.await
.expect("audio master waiter should finish inside sync grace")
);
}
#[tokio::test(flavor = "current_thread")] #[tokio::test(flavor = "current_thread")]
#[serial(upstream_media_runtime)] #[serial(upstream_media_runtime)]
async fn wait_for_audio_master_returns_true_when_no_microphone_stream_is_active() { async fn wait_for_audio_master_returns_true_when_no_microphone_stream_is_active() {

View File

@ -120,6 +120,28 @@ fn upstream_pairing_master_slack_defaults_to_eighty_ms_and_accepts_overrides() {
}); });
} }
#[test]
#[serial(upstream_media_runtime)]
fn upstream_audio_master_wait_grace_defaults_to_sync_buffer_and_accepts_overrides() {
temp_env::with_var_unset("LESAVKA_UPSTREAM_AUDIO_MASTER_WAIT_GRACE_MS", || {
assert_eq!(
super::upstream_audio_master_wait_grace(),
Duration::from_millis(350)
);
});
temp_env::with_var(
"LESAVKA_UPSTREAM_AUDIO_MASTER_WAIT_GRACE_MS",
Some("125"),
|| {
assert_eq!(
super::upstream_audio_master_wait_grace(),
Duration::from_millis(125)
);
},
);
}
#[test] #[test]
#[serial(upstream_media_runtime)] #[serial(upstream_media_runtime)]
fn upstream_reanchor_late_threshold_defaults_to_half_the_buffer_and_accepts_overrides() { fn upstream_reanchor_late_threshold_defaults_to_half_the_buffer_and_accepts_overrides() {

View File

@ -464,7 +464,7 @@ fn default_runtime_covers_video_map_play_path() {
#[tokio::test(flavor = "current_thread")] #[tokio::test(flavor = "current_thread")]
#[serial(upstream_media_runtime)] #[serial(upstream_media_runtime)]
async fn wait_for_audio_master_returns_false_immediately_once_due_time_has_already_passed() { async fn wait_for_audio_master_returns_false_after_sync_grace_has_already_passed() {
let runtime = UpstreamMediaRuntime::new(); let runtime = UpstreamMediaRuntime::new();
let _camera = runtime.activate_camera(); let _camera = runtime.activate_camera();
let _microphone = runtime.activate_microphone(); let _microphone = runtime.activate_microphone();
@ -474,7 +474,7 @@ async fn wait_for_audio_master_returns_false_immediately_once_due_time_has_alrea
.wait_for_audio_master( .wait_for_audio_master(
123_456, 123_456,
tokio::time::Instant::now() tokio::time::Instant::now()
.checked_sub(Duration::from_millis(1)) .checked_sub(Duration::from_millis(400))
.unwrap_or_else(tokio::time::Instant::now), .unwrap_or_else(tokio::time::Instant::now),
) )
.await .await

View File

@ -24,6 +24,7 @@ fn server_install_pins_hdmi_camera_and_display_defaults() {
"LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=%s", "LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=%s",
"LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US=%s", "LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US=%s",
"LESAVKA_UPSTREAM_PAIR_SLACK_US=%s", "LESAVKA_UPSTREAM_PAIR_SLACK_US=%s",
"LESAVKA_UPSTREAM_AUDIO_MASTER_WAIT_GRACE_MS=%s",
"LESAVKA_UPSTREAM_STALE_DROP_MS=%s", "LESAVKA_UPSTREAM_STALE_DROP_MS=%s",
"LESAVKA_SERVER_BIND_ADDR=%s", "LESAVKA_SERVER_BIND_ADDR=%s",
"/etc/lesavka/uvc.env", "/etc/lesavka/uvc.env",
@ -85,6 +86,7 @@ fn server_install_pins_hdmi_camera_and_display_defaults() {
"installer should not preserve old video delay baselines accidentally" "installer should not preserve old video delay baselines accidentally"
); );
assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_PAIR_SLACK_US:-80000}")); assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_PAIR_SLACK_US:-80000}"));
assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_AUDIO_MASTER_WAIT_GRACE_MS:-350}"));
assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_STALE_DROP_MS:-80}")); assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_STALE_DROP_MS:-80}"));
assert!(SERVER_INSTALL.contains("${LESAVKA_INSTALL_SERVER_BIND_ADDR:-0.0.0.0:50051}")); assert!(SERVER_INSTALL.contains("${LESAVKA_INSTALL_SERVER_BIND_ADDR:-0.0.0.0:50051}"));
assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_MAXPACKET:-1024}")); assert!(SERVER_INSTALL.contains("${LESAVKA_UVC_MAXPACKET:-1024}"));