From 850a9fcd429c655e02e4c1bd82930d191ae8073a Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Fri, 15 May 2026 19:29:52 -0300 Subject: [PATCH] media: recover sustained upstream av skew --- Cargo.lock | 6 +- client/Cargo.toml | 2 +- common/Cargo.toml | 2 +- docs/operational-env.md | 1 + server/Cargo.toml | 2 +- server/src/main/relay_service.rs | 35 +++++++++++ .../main/relay_service/upstream_media_rpc.rs | 25 ++++++++ server/src/main/relay_service_tests.rs | 63 ++++++++++++++++++- 8 files changed, 129 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1779d6a..46c476e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.22.43" +version = "0.22.44" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.22.43" +version = "0.22.44" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.22.43" +version = "0.22.44" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index 529c451..cdab257 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.22.43" +version = "0.22.44" edition = "2024" [dependencies] diff --git a/common/Cargo.toml b/common/Cargo.toml index b62d1f0..b3ef4b0 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.22.43" +version = "0.22.44" edition = "2024" build = "build.rs" diff --git a/docs/operational-env.md b/docs/operational-env.md index 03f7c44..2128cc2 100644 --- a/docs/operational-env.md +++ b/docs/operational-env.md @@ -298,6 +298,7 @@ from `LESAVKA_CLIENT_PKI_SSH_SOURCE` over SSH. Runtime clients require the insta | `LESAVKA_UPSTREAM_STALE_DROP_MS` | server upstream freshness override; late audio/video that miss this budget are dropped instead of silently extending lag, defaults to `80` | | `LESAVKA_UPSTREAM_TIMING_TRACE` | upstream capture/rebase trace override for sync debugging | | `LESAVKA_UPSTREAM_V2_AUDIO_STARVATION_HEAL_MS` | v2 bundled webcam watchdog; if video stays live while audio disappears for this long, the server retires the mic epoch and forces a clean bundled reconnect, defaults to `1500`, set `0` to disable | +| `LESAVKA_UPSTREAM_V2_AV_SKEW_HEAL_MS` | v2 bundled webcam watchdog; if audio and video packets keep arriving with an impossible capture span for this long, the server retires the mic epoch and forces a clean bundled reconnect, defaults to `1500`, set `0` to disable | | `LESAVKA_UPSTREAM_V2_MAX_LIVE_AGE_MS` | v2 bundled webcam freshness ceiling; bundles already older than this are dropped as one unit, defaults to `1000` | | `LESAVKA_UPSTREAM_V2_PLAYOUT_DELAY_MS` | v2 optional common playout slack after sync offsets; defaults to `20` and is reduced when needed to protect the live-age budget | | `LESAVKA_UPSTREAM_VIDEO_PLAYOUT_MODE_OFFSETS_US` | server upstream per-UVC-mode output-path map; shipped MJPEG defaults are `1280x720@20=162659,1280x720@30=135090,1920x1080@20=160045,1920x1080@30=127952`; shipped HEVC decode-to-MJPEG defaults are profile-specific under `LESAVKA_UPSTREAM_HEVC_VIDEO_PLAYOUT_MODE_OFFSETS_US` | diff --git a/server/Cargo.toml b/server/Cargo.toml index 290fce6..cae0b2e 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.22.43" +version = "0.22.44" edition = "2024" autobins = false diff --git a/server/src/main/relay_service.rs b/server/src/main/relay_service.rs index 2c1fdab..c7ce889 100644 --- a/server/src/main/relay_service.rs +++ b/server/src/main/relay_service.rs @@ -3,6 +3,7 @@ const MEDIA_V2_DEFAULT_MAX_LIVE_AGE_MS: u64 = 1_000; const MEDIA_V2_DEFAULT_UAC_START_TIMEOUT_MS: u64 = 750; const MEDIA_V2_DEFAULT_STREAM_IDLE_TIMEOUT_MS: u64 = 5_000; const MEDIA_V2_DEFAULT_AUDIO_STARVATION_HEAL_MS: u64 = 1_500; +const MEDIA_V2_DEFAULT_AV_SKEW_HEAL_MS: u64 = 1_500; const MEDIA_V2_MAX_MIXED_CAPTURE_SPAN_US: u64 = 250_000; #[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] @@ -135,6 +136,14 @@ fn media_v2_audio_starvation_heal_after() -> Option { (millis > 0).then(|| Duration::from_millis(millis)) } +fn media_v2_av_skew_heal_after() -> Option { + let millis = std::env::var("LESAVKA_UPSTREAM_V2_AV_SKEW_HEAL_MS") + .ok() + .and_then(|value| value.trim().parse::().ok()) + .unwrap_or(MEDIA_V2_DEFAULT_AV_SKEW_HEAL_MS); + (millis > 0).then(|| Duration::from_millis(millis)) +} + /// Track whether a bundled stream has lost audio while video is still live. /// /// Inputs are the latest bundle facts plus the current starvation marker. @@ -160,6 +169,32 @@ fn media_v2_audio_starvation_elapsed( now.saturating_duration_since(*since) >= heal_after } +/// Track sustained impossible A/V capture skew inside otherwise live bundles. +/// +/// Inputs are the latest bundle facts plus the current skew marker. Output is +/// true once skewed bundles persist beyond the heal window. Why: the mic epoch +/// can keep producing packets while its timestamp base has drifted far away +/// from video; soft UAC recovery is the known-safe repair for that state. +fn media_v2_av_skew_elapsed( + facts: MediaV2BundleFacts, + now: tokio::time::Instant, + skewed_since: &mut Option, + heal_after: Option, +) -> bool { + if !facts.has_audio + || !facts.has_video + || facts.capture_span_us <= MEDIA_V2_MAX_MIXED_CAPTURE_SPAN_US + { + *skewed_since = None; + return false; + } + let Some(heal_after) = heal_after else { + return false; + }; + let since = skewed_since.get_or_insert(now); + now.saturating_duration_since(*since) >= heal_after +} + /// Keeps `media_v2_handoff_schedule` explicit because it sits on relay RPC orchestration, where hardware failures must surface without stopping the server. /// Inputs are the typed parameters; output is the return value or side effect. fn media_v2_handoff_schedule( diff --git a/server/src/main/relay_service/upstream_media_rpc.rs b/server/src/main/relay_service/upstream_media_rpc.rs index 2d5ed60..c385dd6 100644 --- a/server/src/main/relay_service/upstream_media_rpc.rs +++ b/server/src/main/relay_service/upstream_media_rpc.rs @@ -86,6 +86,8 @@ impl Handler { let idle_timeout = media_v2_stream_idle_timeout(); let audio_starvation_heal_after = media_v2_audio_starvation_heal_after(); let mut audio_starved_since = None; + let av_skew_heal_after = media_v2_av_skew_heal_after(); + let mut av_skewed_since = None; let (mut audio_handoff_tx, audio_worker) = if let Some((microphone_sink_permit, sink)) = microphone_sink { let (audio_handoff_tx, audio_handoff_rx) = @@ -199,6 +201,29 @@ impl Handler { outcome = "audio-starved"; break; } + if media_v2_av_skew_elapsed( + facts, + bundle_arrived_at, + &mut av_skewed_since, + av_skew_heal_after, + ) { + let av_skewed_ms = av_skewed_since + .map(|since| bundle_arrived_at.saturating_duration_since(since)) + .unwrap_or_default() + .as_millis(); + warn!( + rpc_id, + session_id = camera_lease.session_id, + client_bundle_session_id = bundle.session_id, + bundle_seq = bundle.seq, + capture_span_ms = facts.capture_span_us / 1000, + av_skewed_ms, + "📦🩺 v2 bundled A/V epoch skew stayed impossible; retiring upstream audio epoch" + ); + upstream_media_rt.soft_recover_microphone(); + outcome = "av-skewed"; + break; + } if facts.has_audio && facts.has_video && facts.capture_span_us > MEDIA_V2_MAX_MIXED_CAPTURE_SPAN_US { if matches!(camera_cfg.codec, camera::CameraCodec::Hevc) && facts.has_video { waiting_for_hevc_keyframe = true; diff --git a/server/src/main/relay_service_tests.rs b/server/src/main/relay_service_tests.rs index 0eb22cf..cd41cbd 100644 --- a/server/src/main/relay_service_tests.rs +++ b/server/src/main/relay_service_tests.rs @@ -3,7 +3,8 @@ mod tests { use super::{ MediaV2BundleFacts, UpstreamStreamCleanup, media_v2_audio_starvation_elapsed, - media_v2_audio_starvation_heal_after, media_v2_frame_step_us, media_v2_handoff_schedule, + media_v2_audio_starvation_heal_after, media_v2_av_skew_elapsed, + media_v2_av_skew_heal_after, media_v2_frame_step_us, media_v2_handoff_schedule, media_v2_has_hevc_recovery_keyframe, media_v2_should_hold_hevc_video_for_recovery, media_v2_uac_start_timeout, prepare_media_v2_audio, prepare_media_v2_video, retain_freshest_audio_packet, retain_freshest_video_packet, summarize_media_v2_bundle, @@ -214,6 +215,20 @@ mod tests { ); } + #[test] + fn media_v2_av_skew_watchdog_is_short_and_disableable() { + temp_env::with_var_unset("LESAVKA_UPSTREAM_V2_AV_SKEW_HEAL_MS", || { + assert_eq!( + media_v2_av_skew_heal_after(), + Some(std::time::Duration::from_millis(1_500)) + ); + }); + + temp_env::with_var("LESAVKA_UPSTREAM_V2_AV_SKEW_HEAL_MS", Some("0"), || { + assert_eq!(media_v2_av_skew_heal_after(), None); + }); + } + #[test] fn media_v2_audio_starvation_elapsed_requires_live_video_without_audio() { let mut starved_since = None; @@ -264,6 +279,52 @@ mod tests { assert!(starved_since.is_none()); } + #[test] + fn media_v2_av_skew_elapsed_requires_sustained_impossible_capture_span() { + let mut skewed_since = None; + let now = tokio::time::Instant::now(); + let skewed = MediaV2BundleFacts { + has_audio: true, + has_video: true, + capture_start_us: 1_000_000, + capture_end_us: 1_400_001, + capture_span_us: 400_001, + max_queue_age_ms: 0, + }; + let normal = MediaV2BundleFacts { + capture_end_us: 1_020_000, + capture_span_us: 20_000, + ..skewed + }; + + assert!(!media_v2_av_skew_elapsed( + skewed, + now, + &mut skewed_since, + Some(std::time::Duration::from_millis(1_500)) + )); + assert!(skewed_since.is_some()); + assert!(!media_v2_av_skew_elapsed( + skewed, + now + std::time::Duration::from_millis(1_499), + &mut skewed_since, + Some(std::time::Duration::from_millis(1_500)) + )); + assert!(media_v2_av_skew_elapsed( + skewed, + now + std::time::Duration::from_millis(1_500), + &mut skewed_since, + Some(std::time::Duration::from_millis(1_500)) + )); + assert!(!media_v2_av_skew_elapsed( + normal, + now + std::time::Duration::from_millis(1_600), + &mut skewed_since, + Some(std::time::Duration::from_millis(1_500)) + )); + assert!(skewed_since.is_none()); + } + #[test] /// Keeps server HEVC drop recovery explicit because late-drop freshness can otherwise corrupt decoded video. fn media_v2_hevc_recovery_holds_delta_until_keyframe() {