media: recover sustained upstream av skew

This commit is contained in:
Brad Stein 2026-05-15 19:29:52 -03:00
parent 3aea8b3c16
commit 850a9fcd42
8 changed files with 129 additions and 7 deletions

6
Cargo.lock generated
View File

@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]]
name = "lesavka_client"
version = "0.22.43"
version = "0.22.44"
dependencies = [
"anyhow",
"async-stream",
@ -1686,7 +1686,7 @@ dependencies = [
[[package]]
name = "lesavka_common"
version = "0.22.43"
version = "0.22.44"
dependencies = [
"anyhow",
"base64",
@ -1698,7 +1698,7 @@ dependencies = [
[[package]]
name = "lesavka_server"
version = "0.22.43"
version = "0.22.44"
dependencies = [
"anyhow",
"base64",

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package]
name = "lesavka_client"
version = "0.22.43"
version = "0.22.44"
edition = "2024"
[dependencies]

View File

@ -1,6 +1,6 @@
[package]
name = "lesavka_common"
version = "0.22.43"
version = "0.22.44"
edition = "2024"
build = "build.rs"

View File

@ -298,6 +298,7 @@ from `LESAVKA_CLIENT_PKI_SSH_SOURCE` over SSH. Runtime clients require the insta
| `LESAVKA_UPSTREAM_STALE_DROP_MS` | server upstream freshness override; late audio/video that miss this budget are dropped instead of silently extending lag, defaults to `80` |
| `LESAVKA_UPSTREAM_TIMING_TRACE` | upstream capture/rebase trace override for sync debugging |
| `LESAVKA_UPSTREAM_V2_AUDIO_STARVATION_HEAL_MS` | v2 bundled webcam watchdog; if video stays live while audio disappears for this long, the server retires the mic epoch and forces a clean bundled reconnect, defaults to `1500`, set `0` to disable |
| `LESAVKA_UPSTREAM_V2_AV_SKEW_HEAL_MS` | v2 bundled webcam watchdog; if audio and video packets keep arriving with an impossible capture span for this long, the server retires the mic epoch and forces a clean bundled reconnect, defaults to `1500`, set `0` to disable |
| `LESAVKA_UPSTREAM_V2_MAX_LIVE_AGE_MS` | v2 bundled webcam freshness ceiling; bundles already older than this are dropped as one unit, defaults to `1000` |
| `LESAVKA_UPSTREAM_V2_PLAYOUT_DELAY_MS` | v2 optional common playout slack after sync offsets; defaults to `20` and is reduced when needed to protect the live-age budget |
| `LESAVKA_UPSTREAM_VIDEO_PLAYOUT_MODE_OFFSETS_US` | server upstream per-UVC-mode output-path map; shipped MJPEG defaults are `1280x720@20=162659,1280x720@30=135090,1920x1080@20=160045,1920x1080@30=127952`; shipped HEVC decode-to-MJPEG defaults are profile-specific under `LESAVKA_UPSTREAM_HEVC_VIDEO_PLAYOUT_MODE_OFFSETS_US` |

View File

@ -10,7 +10,7 @@ bench = false
[package]
name = "lesavka_server"
version = "0.22.43"
version = "0.22.44"
edition = "2024"
autobins = false

View File

@ -3,6 +3,7 @@ const MEDIA_V2_DEFAULT_MAX_LIVE_AGE_MS: u64 = 1_000;
const MEDIA_V2_DEFAULT_UAC_START_TIMEOUT_MS: u64 = 750;
const MEDIA_V2_DEFAULT_STREAM_IDLE_TIMEOUT_MS: u64 = 5_000;
const MEDIA_V2_DEFAULT_AUDIO_STARVATION_HEAL_MS: u64 = 1_500;
const MEDIA_V2_DEFAULT_AV_SKEW_HEAL_MS: u64 = 1_500;
const MEDIA_V2_MAX_MIXED_CAPTURE_SPAN_US: u64 = 250_000;
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
@ -135,6 +136,14 @@ fn media_v2_audio_starvation_heal_after() -> Option<Duration> {
(millis > 0).then(|| Duration::from_millis(millis))
}
fn media_v2_av_skew_heal_after() -> Option<Duration> {
let millis = std::env::var("LESAVKA_UPSTREAM_V2_AV_SKEW_HEAL_MS")
.ok()
.and_then(|value| value.trim().parse::<u64>().ok())
.unwrap_or(MEDIA_V2_DEFAULT_AV_SKEW_HEAL_MS);
(millis > 0).then(|| Duration::from_millis(millis))
}
/// Track whether a bundled stream has lost audio while video is still live.
///
/// Inputs are the latest bundle facts plus the current starvation marker.
@ -160,6 +169,32 @@ fn media_v2_audio_starvation_elapsed(
now.saturating_duration_since(*since) >= heal_after
}
/// Track sustained impossible A/V capture skew inside otherwise live bundles.
///
/// Inputs are the latest bundle facts plus the current skew marker. Output is
/// true once skewed bundles persist beyond the heal window. Why: the mic epoch
/// can keep producing packets while its timestamp base has drifted far away
/// from video; soft UAC recovery is the known-safe repair for that state.
fn media_v2_av_skew_elapsed(
facts: MediaV2BundleFacts,
now: tokio::time::Instant,
skewed_since: &mut Option<tokio::time::Instant>,
heal_after: Option<Duration>,
) -> bool {
if !facts.has_audio
|| !facts.has_video
|| facts.capture_span_us <= MEDIA_V2_MAX_MIXED_CAPTURE_SPAN_US
{
*skewed_since = None;
return false;
}
let Some(heal_after) = heal_after else {
return false;
};
let since = skewed_since.get_or_insert(now);
now.saturating_duration_since(*since) >= heal_after
}
/// Keeps `media_v2_handoff_schedule` explicit because it sits on relay RPC orchestration, where hardware failures must surface without stopping the server.
/// Inputs are the typed parameters; output is the return value or side effect.
fn media_v2_handoff_schedule(

View File

@ -86,6 +86,8 @@ impl Handler {
let idle_timeout = media_v2_stream_idle_timeout();
let audio_starvation_heal_after = media_v2_audio_starvation_heal_after();
let mut audio_starved_since = None;
let av_skew_heal_after = media_v2_av_skew_heal_after();
let mut av_skewed_since = None;
let (mut audio_handoff_tx, audio_worker) =
if let Some((microphone_sink_permit, sink)) = microphone_sink {
let (audio_handoff_tx, audio_handoff_rx) =
@ -199,6 +201,29 @@ impl Handler {
outcome = "audio-starved";
break;
}
if media_v2_av_skew_elapsed(
facts,
bundle_arrived_at,
&mut av_skewed_since,
av_skew_heal_after,
) {
let av_skewed_ms = av_skewed_since
.map(|since| bundle_arrived_at.saturating_duration_since(since))
.unwrap_or_default()
.as_millis();
warn!(
rpc_id,
session_id = camera_lease.session_id,
client_bundle_session_id = bundle.session_id,
bundle_seq = bundle.seq,
capture_span_ms = facts.capture_span_us / 1000,
av_skewed_ms,
"📦🩺 v2 bundled A/V epoch skew stayed impossible; retiring upstream audio epoch"
);
upstream_media_rt.soft_recover_microphone();
outcome = "av-skewed";
break;
}
if facts.has_audio && facts.has_video && facts.capture_span_us > MEDIA_V2_MAX_MIXED_CAPTURE_SPAN_US {
if matches!(camera_cfg.codec, camera::CameraCodec::Hevc) && facts.has_video {
waiting_for_hevc_keyframe = true;

View File

@ -3,7 +3,8 @@
mod tests {
use super::{
MediaV2BundleFacts, UpstreamStreamCleanup, media_v2_audio_starvation_elapsed,
media_v2_audio_starvation_heal_after, media_v2_frame_step_us, media_v2_handoff_schedule,
media_v2_audio_starvation_heal_after, media_v2_av_skew_elapsed,
media_v2_av_skew_heal_after, media_v2_frame_step_us, media_v2_handoff_schedule,
media_v2_has_hevc_recovery_keyframe, media_v2_should_hold_hevc_video_for_recovery,
media_v2_uac_start_timeout, prepare_media_v2_audio, prepare_media_v2_video,
retain_freshest_audio_packet, retain_freshest_video_packet, summarize_media_v2_bundle,
@ -214,6 +215,20 @@ mod tests {
);
}
#[test]
fn media_v2_av_skew_watchdog_is_short_and_disableable() {
temp_env::with_var_unset("LESAVKA_UPSTREAM_V2_AV_SKEW_HEAL_MS", || {
assert_eq!(
media_v2_av_skew_heal_after(),
Some(std::time::Duration::from_millis(1_500))
);
});
temp_env::with_var("LESAVKA_UPSTREAM_V2_AV_SKEW_HEAL_MS", Some("0"), || {
assert_eq!(media_v2_av_skew_heal_after(), None);
});
}
#[test]
fn media_v2_audio_starvation_elapsed_requires_live_video_without_audio() {
let mut starved_since = None;
@ -264,6 +279,52 @@ mod tests {
assert!(starved_since.is_none());
}
#[test]
fn media_v2_av_skew_elapsed_requires_sustained_impossible_capture_span() {
let mut skewed_since = None;
let now = tokio::time::Instant::now();
let skewed = MediaV2BundleFacts {
has_audio: true,
has_video: true,
capture_start_us: 1_000_000,
capture_end_us: 1_400_001,
capture_span_us: 400_001,
max_queue_age_ms: 0,
};
let normal = MediaV2BundleFacts {
capture_end_us: 1_020_000,
capture_span_us: 20_000,
..skewed
};
assert!(!media_v2_av_skew_elapsed(
skewed,
now,
&mut skewed_since,
Some(std::time::Duration::from_millis(1_500))
));
assert!(skewed_since.is_some());
assert!(!media_v2_av_skew_elapsed(
skewed,
now + std::time::Duration::from_millis(1_499),
&mut skewed_since,
Some(std::time::Duration::from_millis(1_500))
));
assert!(media_v2_av_skew_elapsed(
skewed,
now + std::time::Duration::from_millis(1_500),
&mut skewed_since,
Some(std::time::Duration::from_millis(1_500))
));
assert!(!media_v2_av_skew_elapsed(
normal,
now + std::time::Duration::from_millis(1_600),
&mut skewed_since,
Some(std::time::Duration::from_millis(1_500))
));
assert!(skewed_since.is_none());
}
#[test]
/// Keeps server HEVC drop recovery explicit because late-drop freshness can otherwise corrupt decoded video.
fn media_v2_hevc_recovery_holds_delta_until_keyframe() {