media: auto-heal starved upstream audio

This commit is contained in:
Brad Stein 2026-05-15 17:21:50 -03:00
parent f9058329bc
commit 3aea8b3c16
14 changed files with 167 additions and 24 deletions

6
Cargo.lock generated
View File

@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]]
name = "lesavka_client"
version = "0.22.42"
version = "0.22.43"
dependencies = [
"anyhow",
"async-stream",
@ -1686,7 +1686,7 @@ dependencies = [
[[package]]
name = "lesavka_common"
version = "0.22.42"
version = "0.22.43"
dependencies = [
"anyhow",
"base64",
@ -1698,7 +1698,7 @@ dependencies = [
[[package]]
name = "lesavka_server"
version = "0.22.42"
version = "0.22.43"
dependencies = [
"anyhow",
"base64",

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package]
name = "lesavka_client"
version = "0.22.42"
version = "0.22.43"
edition = "2024"
[dependencies]

View File

@ -1,6 +1,6 @@
[package]
name = "lesavka_common"
version = "0.22.42"
version = "0.22.43"
edition = "2024"
build = "build.rs"

View File

@ -297,6 +297,7 @@ from `LESAVKA_CLIENT_PKI_SSH_SOURCE` over SSH. Runtime clients require the insta
| `LESAVKA_UPSTREAM_STARTUP_TIMEOUT_MS` | server upstream startup guard; paired startup must converge before this timeout or fail visibly, defaults to `60000` |
| `LESAVKA_UPSTREAM_STALE_DROP_MS` | server upstream freshness override; late audio/video that miss this budget are dropped instead of silently extending lag, defaults to `80` |
| `LESAVKA_UPSTREAM_TIMING_TRACE` | upstream capture/rebase trace override for sync debugging |
| `LESAVKA_UPSTREAM_V2_AUDIO_STARVATION_HEAL_MS` | v2 bundled webcam watchdog; if video stays live while audio disappears for this long, the server retires the mic epoch and forces a clean bundled reconnect, defaults to `1500`, set `0` to disable |
| `LESAVKA_UPSTREAM_V2_MAX_LIVE_AGE_MS` | v2 bundled webcam freshness ceiling; bundles already older than this are dropped as one unit, defaults to `1000` |
| `LESAVKA_UPSTREAM_V2_PLAYOUT_DELAY_MS` | v2 optional common playout slack after sync offsets; defaults to `20` and is reduced when needed to protect the live-age budget |
| `LESAVKA_UPSTREAM_VIDEO_PLAYOUT_MODE_OFFSETS_US` | server upstream per-UVC-mode output-path map; shipped MJPEG defaults are `1280x720@20=162659,1280x720@30=135090,1920x1080@20=160045,1920x1080@30=127952`; shipped HEVC decode-to-MJPEG defaults are profile-specific under `LESAVKA_UPSTREAM_HEVC_VIDEO_PLAYOUT_MODE_OFFSETS_US` |
@ -314,7 +315,7 @@ from `LESAVKA_CLIENT_PKI_SSH_SOURCE` over SSH. Runtime clients require the insta
| `LESAVKA_UVC_APP_MAX_TIME_NS` | server UVC appsrc memory guard; defaults to `200000000` ns of queued media |
| `LESAVKA_UVC_BLOCKING` | server hardware/device override |
| `LESAVKA_UVC_BULK` | UVC transfer-mode override; defaults to `1` so patched kernels prefer reliable bulk transfer over lossy isochronous MJPEG. If the live kernel does not expose `streaming_bulk`, Lesavka falls back to isochronous descriptors without the 512-byte bulk clamp. Set `0` to force classic isochronous descriptors |
| `LESAVKA_UVC_BUFFER_COUNT` | UVC helper freshness override; number of queued gadget output buffers, defaults to `2` for live-call freshness |
| `LESAVKA_UVC_BUFFER_COUNT` | UVC helper freshness override; number of queued gadget output buffers, defaults to `4` to keep the gadget fed through host/browser scheduling jitter |
| `LESAVKA_UVC_BY_PATH_ROOT` | server hardware/device override |
| `LESAVKA_UVC_CODEC` | server hardware/device override |
| `LESAVKA_UVC_CTRL_BIN` | server hardware/device override |
@ -345,6 +346,7 @@ from `LESAVKA_CLIENT_PKI_SSH_SOURCE` over SSH. Runtime clients require the insta
| `LESAVKA_UVC_MAXPAYLOAD_LIMIT` | server hardware/device override |
| `LESAVKA_UVC_MJPEG` | server hardware/device override |
| `LESAVKA_UVC_MJPEG_BUDGET_BYTES_PER_SEC` | UVC helper MJPEG budget guard; derives a per-frame byte cap from target FPS when `LESAVKA_UVC_FRAME_MAX_BYTES` is unset; non-bulk UVC is additionally capped by `LESAVKA_UVC_ISOCHRONOUS_LIMIT_PCT` |
| `LESAVKA_UVC_QUEUE_PACING` | UVC helper queue pacing override; defaults to `0` because the RCT host already paces UVC consumption, and delaying returned buffer requeueing can starve isochronous gadget transfers |
| `LESAVKA_UVC_SKIP_UDEV` | server hardware/device override |
| `LESAVKA_UVC_STATS_INTERVAL_MS` | UVC helper telemetry interval for queued/reloaded/rejected MJPEG frame counters; defaults to `5000`, `0` disables |
| `LESAVKA_UVC_STATS_PATH` | UVC helper JSON stats snapshot path for queued/reloaded/rejected MJPEG frame counters; defaults to `/run/lesavka-uvc-video-stats.json`, set `0` or empty to disable file snapshots |

View File

@ -355,7 +355,7 @@ LESAVKA_UVC_HEIGHT=$(uvc_env_value LESAVKA_UVC_HEIGHT 720)
LESAVKA_UVC_CODEC=${INSTALL_UVC_CODEC}
LESAVKA_UVC_BLOCKING=$(uvc_env_value LESAVKA_UVC_BLOCKING 1)
LESAVKA_UVC_CONTROL_READ_ONLY=$(uvc_env_value LESAVKA_UVC_CONTROL_READ_ONLY 0)
LESAVKA_UVC_QUEUE_PACING=$(uvc_env_value LESAVKA_UVC_QUEUE_PACING 1)
LESAVKA_UVC_QUEUE_PACING=$(uvc_env_value LESAVKA_UVC_QUEUE_PACING 0)
LESAVKA_UVC_MAXBURST=$(uvc_env_value LESAVKA_UVC_MAXBURST 0)
LESAVKA_UVC_BULK=$(uvc_env_value LESAVKA_UVC_BULK 1)
LESAVKA_UVC_FRAME_SIZE_GUARD=$(uvc_env_value LESAVKA_UVC_FRAME_SIZE_GUARD 1)

View File

@ -10,7 +10,7 @@ bench = false
[package]
name = "lesavka_server"
version = "0.22.42"
version = "0.22.43"
edition = "2024"
autobins = false

View File

@ -48,7 +48,7 @@ const V4L2_PIX_FMT_MJPEG: u32 = u32::from_le_bytes(*b"MJPG");
const MAX_MJPEG_FRAME_BYTES: usize = 8 * 1024 * 1024;
const MINIMAL_MJPEG_FRAME: &[u8] = &[0xff, 0xd8, 0xff, 0xd9];
const IDLE_MJPEG_FRAME: &[u8] = include_bytes!("lesavka_uvc/idle_1280x720_black.jpg");
const DEFAULT_UVC_BUFFER_COUNT: u32 = 2;
const DEFAULT_UVC_BUFFER_COUNT: u32 = 4;
const DEFAULT_UVC_IDLE_PUMP_MS: u64 = 2;
const DEFAULT_UVC_FRAME_MAX_AGE_MS: u64 = 1_000;
const DEFAULT_UVC_MJPEG_BUDGET_BYTES_PER_SEC: u32 = 10_000_000;
@ -657,7 +657,7 @@ fn uvc_idle_pump_sleep() -> Duration {
}
fn uvc_queue_period(fps: u32) -> Option<Duration> {
if !env_flag_enabled("LESAVKA_UVC_QUEUE_PACING", true) {
if !env_flag_enabled("LESAVKA_UVC_QUEUE_PACING", false) {
return None;
}
let fps = fps.max(1);

View File

@ -50,7 +50,7 @@ const UVC_VS_COMMIT_CONTROL: u8 = 0x02;
const UVC_VC_REQUEST_ERROR_CODE_CONTROL: u8 = 0x02;
#[cfg(coverage)]
const DEFAULT_UVC_BUFFER_COUNT: u32 = 2;
const DEFAULT_UVC_BUFFER_COUNT: u32 = 4;
#[cfg(coverage)]
const DEFAULT_UVC_IDLE_PUMP_MS: u64 = 2;
#[cfg(coverage)]

View File

@ -159,7 +159,7 @@ fn uvc_idle_pump_sleep() -> std::time::Duration {
/// `None` when pacing is explicitly disabled. Why: the RCT-facing host must
/// not be overfed faster than the advertised descriptor cadence.
fn uvc_queue_period(fps: u32) -> Option<std::time::Duration> {
if !env_flag_enabled("LESAVKA_UVC_QUEUE_PACING", true) {
if !env_flag_enabled("LESAVKA_UVC_QUEUE_PACING", false) {
return None;
}
let fps = fps.max(1);

View File

@ -135,6 +135,20 @@ fn uvc_frame_budget_caps_isochronous_transport() {
);
}
#[test]
fn uvc_queue_pacing_defaults_off_but_remains_opt_in() {
with_var("LESAVKA_UVC_QUEUE_PACING", None::<&str>, || {
assert_eq!(uvc_queue_period(30), None);
});
with_var("LESAVKA_UVC_QUEUE_PACING", Some("1"), || {
assert_eq!(
uvc_queue_period(30),
Some(std::time::Duration::from_nanos(33_333_333))
);
});
}
#[test]
#[serial]
fn main_coverage_mode_returns_error_for_non_uvc_node() {

View File

@ -2,6 +2,7 @@ const MEDIA_V2_DEFAULT_PLAYOUT_DELAY_MS: u64 = 20;
const MEDIA_V2_DEFAULT_MAX_LIVE_AGE_MS: u64 = 1_000;
const MEDIA_V2_DEFAULT_UAC_START_TIMEOUT_MS: u64 = 750;
const MEDIA_V2_DEFAULT_STREAM_IDLE_TIMEOUT_MS: u64 = 5_000;
const MEDIA_V2_DEFAULT_AUDIO_STARVATION_HEAL_MS: u64 = 1_500;
const MEDIA_V2_MAX_MIXED_CAPTURE_SPAN_US: u64 = 250_000;
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
@ -126,6 +127,39 @@ fn media_v2_stream_idle_timeout() -> Duration {
.unwrap_or_else(|| Duration::from_millis(MEDIA_V2_DEFAULT_STREAM_IDLE_TIMEOUT_MS))
}
fn media_v2_audio_starvation_heal_after() -> Option<Duration> {
let millis = std::env::var("LESAVKA_UPSTREAM_V2_AUDIO_STARVATION_HEAL_MS")
.ok()
.and_then(|value| value.trim().parse::<u64>().ok())
.unwrap_or(MEDIA_V2_DEFAULT_AUDIO_STARVATION_HEAL_MS);
(millis > 0).then(|| Duration::from_millis(millis))
}
/// Track whether a bundled stream has lost audio while video is still live.
///
/// Inputs are the latest bundle facts plus the current starvation marker.
/// Output is true once the audio-free video run exceeds the heal window. Why:
/// a stuck mic capture path can leave the RCT hearing a stale UAC epoch while
/// fresh video keeps flowing; retiring the mic generation forces the client to
/// rebuild the bundled stream without touching USB descriptors.
fn media_v2_audio_starvation_elapsed(
audio_enabled: bool,
facts: MediaV2BundleFacts,
now: tokio::time::Instant,
starved_since: &mut Option<tokio::time::Instant>,
heal_after: Option<Duration>,
) -> bool {
if !audio_enabled || !facts.has_video || facts.has_audio {
*starved_since = None;
return false;
}
let Some(heal_after) = heal_after else {
return false;
};
let since = starved_since.get_or_insert(now);
now.saturating_duration_since(*since) >= heal_after
}
/// Keeps `media_v2_handoff_schedule` explicit because it sits on relay RPC orchestration, where hardware failures must surface without stopping the server.
/// Inputs are the typed parameters; output is the return value or side effect.
fn media_v2_handoff_schedule(

View File

@ -84,6 +84,8 @@ impl Handler {
let mut waiting_for_hevc_keyframe = false;
let mut outcome = "aborted";
let idle_timeout = media_v2_stream_idle_timeout();
let audio_starvation_heal_after = media_v2_audio_starvation_heal_after();
let mut audio_starved_since = None;
let (mut audio_handoff_tx, audio_worker) =
if let Some((microphone_sink_permit, sink)) = microphone_sink {
let (audio_handoff_tx, audio_handoff_rx) =
@ -174,6 +176,29 @@ impl Handler {
let Some(facts) = summarize_media_v2_bundle(&bundle) else {
continue;
};
if media_v2_audio_starvation_elapsed(
audio_enabled,
facts,
bundle_arrived_at,
&mut audio_starved_since,
audio_starvation_heal_after,
) {
let audio_starved_ms = audio_starved_since
.map(|since| bundle_arrived_at.saturating_duration_since(since))
.unwrap_or_default()
.as_millis();
warn!(
rpc_id,
session_id = camera_lease.session_id,
client_bundle_session_id = bundle.session_id,
bundle_seq = bundle.seq,
audio_starved_ms,
"📦🩺 v2 bundled microphone starved while video stayed live; retiring upstream audio epoch"
);
upstream_media_rt.soft_recover_microphone();
outcome = "audio-starved";
break;
}
if facts.has_audio && facts.has_video && facts.capture_span_us > MEDIA_V2_MAX_MIXED_CAPTURE_SPAN_US {
if matches!(camera_cfg.codec, camera::CameraCodec::Hevc) && facts.has_video {
waiting_for_hevc_keyframe = true;

View File

@ -2,11 +2,11 @@
#[allow(clippy::items_after_test_module)]
mod tests {
use super::{
MediaV2BundleFacts, UpstreamStreamCleanup, media_v2_frame_step_us,
media_v2_handoff_schedule, media_v2_has_hevc_recovery_keyframe,
media_v2_should_hold_hevc_video_for_recovery, prepare_media_v2_audio,
prepare_media_v2_video, retain_freshest_audio_packet, retain_freshest_video_packet,
summarize_media_v2_bundle, media_v2_uac_start_timeout,
MediaV2BundleFacts, UpstreamStreamCleanup, media_v2_audio_starvation_elapsed,
media_v2_audio_starvation_heal_after, media_v2_frame_step_us, media_v2_handoff_schedule,
media_v2_has_hevc_recovery_keyframe, media_v2_should_hold_hevc_video_for_recovery,
media_v2_uac_start_timeout, prepare_media_v2_audio, prepare_media_v2_video,
retain_freshest_audio_packet, retain_freshest_video_packet, summarize_media_v2_bundle,
};
use lesavka_common::lesavka::{AudioPacket, UpstreamMediaBundle, VideoPacket};
use lesavka_server::camera::CameraCodec;
@ -196,6 +196,74 @@ mod tests {
});
}
#[test]
fn media_v2_audio_starvation_watchdog_is_short_and_disableable() {
temp_env::with_var_unset("LESAVKA_UPSTREAM_V2_AUDIO_STARVATION_HEAL_MS", || {
assert_eq!(
media_v2_audio_starvation_heal_after(),
Some(std::time::Duration::from_millis(1_500))
);
});
temp_env::with_var(
"LESAVKA_UPSTREAM_V2_AUDIO_STARVATION_HEAL_MS",
Some("0"),
|| {
assert_eq!(media_v2_audio_starvation_heal_after(), None);
},
);
}
#[test]
fn media_v2_audio_starvation_elapsed_requires_live_video_without_audio() {
let mut starved_since = None;
let now = tokio::time::Instant::now();
let video_only = MediaV2BundleFacts {
has_audio: false,
has_video: true,
capture_start_us: 1_000,
capture_end_us: 1_000,
capture_span_us: 0,
max_queue_age_ms: 0,
};
let audio_video = MediaV2BundleFacts {
has_audio: true,
has_video: true,
..video_only
};
assert!(!media_v2_audio_starvation_elapsed(
true,
video_only,
now,
&mut starved_since,
Some(std::time::Duration::from_millis(1_500))
));
assert!(starved_since.is_some());
assert!(!media_v2_audio_starvation_elapsed(
true,
video_only,
now + std::time::Duration::from_millis(1_499),
&mut starved_since,
Some(std::time::Duration::from_millis(1_500))
));
assert!(media_v2_audio_starvation_elapsed(
true,
video_only,
now + std::time::Duration::from_millis(1_500),
&mut starved_since,
Some(std::time::Duration::from_millis(1_500))
));
assert!(!media_v2_audio_starvation_elapsed(
true,
audio_video,
now + std::time::Duration::from_millis(1_600),
&mut starved_since,
Some(std::time::Duration::from_millis(1_500))
));
assert!(starved_since.is_none());
}
#[test]
/// Keeps server HEVC drop recovery explicit because late-drop freshness can otherwise corrupt decoded video.
fn media_v2_hevc_recovery_holds_delta_until_keyframe() {

View File

@ -359,12 +359,9 @@ mod uvc_binary_extra {
with_var("LESAVKA_UVC_BUFFER_COUNT", None::<&str>, || {
with_var("LESAVKA_UVC_IDLE_PUMP_MS", None::<&str>, || {
with_var("LESAVKA_UVC_FRAME_MAX_AGE_MS", None::<&str>, || {
assert_eq!(uvc_buffer_count(), 2);
assert_eq!(uvc_buffer_count(), 4);
assert_eq!(uvc_idle_pump_sleep(), std::time::Duration::from_millis(2));
assert_eq!(
uvc_queue_period(30),
Some(std::time::Duration::from_nanos(33_333_333))
);
assert_eq!(uvc_queue_period(30), None);
assert_eq!(
frame_spool_max_age(),
Some(std::time::Duration::from_millis(1_000))
@ -380,10 +377,13 @@ mod uvc_binary_extra {
with_var("LESAVKA_UVC_BUFFER_COUNT", Some("99"), || {
with_var("LESAVKA_UVC_IDLE_PUMP_MS", Some("11"), || {
with_var("LESAVKA_UVC_FRAME_MAX_AGE_MS", Some("0"), || {
with_var("LESAVKA_UVC_QUEUE_PACING", Some("0"), || {
with_var("LESAVKA_UVC_QUEUE_PACING", Some("1"), || {
assert_eq!(uvc_buffer_count(), 8);
assert_eq!(uvc_idle_pump_sleep(), std::time::Duration::from_millis(11));
assert_eq!(uvc_queue_period(30), None);
assert_eq!(
uvc_queue_period(30),
Some(std::time::Duration::from_nanos(33_333_333))
);
assert_eq!(frame_spool_max_age(), None);
});
});