media: harden hevc upstream smoothness

This commit is contained in:
Brad Stein 2026-05-09 18:19:48 -03:00
parent e7a6d8f288
commit aeae7e7e07
13 changed files with 273 additions and 18 deletions

6
Cargo.lock generated
View File

@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]]
name = "lesavka_client"
version = "0.21.11"
version = "0.21.12"
dependencies = [
"anyhow",
"async-stream",
@ -1686,7 +1686,7 @@ dependencies = [
[[package]]
name = "lesavka_common"
version = "0.21.11"
version = "0.21.12"
dependencies = [
"anyhow",
"base64",
@ -1698,7 +1698,7 @@ dependencies = [
[[package]]
name = "lesavka_server"
version = "0.21.11"
version = "0.21.12"
dependencies = [
"anyhow",
"base64",

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package]
name = "lesavka_client"
version = "0.21.11"
version = "0.21.12"
edition = "2024"
[dependencies]

View File

@ -6,6 +6,8 @@ include!("uplink_media/camera_loop.rs");
include!("uplink_media/media_source_requirements.rs");
include!("uplink_media/video_keyframes.rs");
include!("uplink_media/bundled_media_queue.rs");
include!("uplink_media/uplink_queue_metadata.rs");

View File

@ -148,6 +148,43 @@ use super::*;
assert!(!disables_upstream_epoch_auto_heal("yes"));
}
#[test]
/// Keeps HEVC recovery explicit because freshness drops can otherwise resume from predictive frames.
fn hevc_keyframe_detection_recognizes_irap_access_units() {
assert!(contains_hevc_irap(&[0, 0, 0, 1, 0x26, 0x01, 0xaa]));
assert!(contains_hevc_irap(&[0, 0, 1, 0x28, 0x01, 0xaa]));
assert!(!contains_hevc_irap(&[0, 0, 0, 1, 0x02, 0x01, 0xaa]));
}
#[test]
/// Keeps queue-drop recovery explicit because stale HEVC bundles should not corrupt the decoder.
fn hevc_recovery_holds_delta_bundle_until_next_keyframe() {
let delta_bundle = UpstreamMediaBundle {
video: Some(VideoPacket {
data: vec![0, 0, 0, 1, 0x02, 0x01, 0xaa],
..Default::default()
}),
..Default::default()
};
let keyframe_bundle = UpstreamMediaBundle {
video: Some(VideoPacket {
data: vec![0, 0, 0, 1, 0x26, 0x01, 0xaa],
..Default::default()
}),
..Default::default()
};
assert!(should_hold_hevc_bundle_for_keyframe_recovery(
true,
&delta_bundle
));
assert!(!should_hold_hevc_bundle_for_keyframe_recovery(
true,
&keyframe_bundle
));
assert!(bundle_has_hevc_recovery_keyframe(&keyframe_bundle));
}
/// Verifies the live uplink queue emits one physically bundled HEVC frame and PCM span.
///
/// Inputs: pre-stamped HEVC video plus two nearby audio packets, exactly as

View File

@ -0,0 +1,90 @@
#[cfg(not(coverage))]
/// Detect whether an Annex-B HEVC access unit contains an intra recovery point.
///
/// Inputs: one encoded HEVC packet in byte-stream form. Output: `true` when the
/// packet carries an IRAP NAL unit. Why: the freshness-first uplink may drop
/// predictive frames, and the server decoder should only resume from a frame
/// that can rebuild a clean picture.
fn contains_hevc_irap(data: &[u8]) -> bool {
let mut offset = 0;
while let Some((start, prefix_len)) = find_annex_b_start_code(data, offset) {
let nal_index = start + prefix_len;
if nal_index + 1 < data.len() {
let nal_type = (data[nal_index] >> 1) & 0x3f;
if (16..=23).contains(&nal_type) {
return true;
}
}
offset = nal_index.saturating_add(2);
}
false
}
#[cfg(not(coverage))]
/// Locate the next Annex-B start code in an encoded video packet.
///
/// Inputs: encoded bytes plus the search offset. Output: the start index and
/// prefix length. Why: both three-byte and four-byte start codes appear in
/// HEVC streams, and keyframe recovery must handle both without allocating.
fn find_annex_b_start_code(data: &[u8], from: usize) -> Option<(usize, usize)> {
let mut index = from;
while index + 3 < data.len() {
if data[index] == 0 && data[index + 1] == 0 {
if data[index + 2] == 1 {
return Some((index, 3));
}
if index + 4 < data.len() && data[index + 2] == 0 && data[index + 3] == 1 {
return Some((index, 4));
}
}
index += 1;
}
None
}
#[cfg(not(coverage))]
/// Decide whether a bundled HEVC packet is safe after a freshness drop.
///
/// Inputs: the recovery state and the next outbound bundle. Output: `true`
/// when the bundle should be held back. Why: sending a non-keyframe immediately
/// after dropping older HEVC packets can make the server decode from missing
/// references, which shows up as tearing or grey corrupted frames.
fn should_hold_hevc_bundle_for_keyframe_recovery(
waiting_for_keyframe: bool,
bundle: &UpstreamMediaBundle,
) -> bool {
waiting_for_keyframe
&& bundle
.video
.as_ref()
.is_some_and(|video| !contains_hevc_irap(&video.data))
}
#[cfg(not(coverage))]
/// Report whether a bundle carries the HEVC recovery frame we were waiting for.
///
/// Inputs: an outbound media bundle. Output: true when its video packet can
/// restart HEVC prediction. Why: the freshness policy should clear recovery
/// mode as soon as the next clean picture reaches the server.
fn bundle_has_hevc_recovery_keyframe(bundle: &UpstreamMediaBundle) -> bool {
bundle
.video
.as_ref()
.is_some_and(|video| contains_hevc_irap(&video.data))
}
#[cfg(not(coverage))]
/// Resolve whether the active upstream camera codec needs HEVC recovery.
///
/// Inputs: the negotiated camera config plus optional env fallback. Output:
/// `true` only for HEVC/H.265. Why: MJPEG frames are independent, so the extra
/// keyframe wait should only apply to predictive video transport.
fn upstream_camera_uses_hevc(camera_cfg: Option<crate::input::camera::CameraConfig>) -> bool {
if camera_cfg.is_some_and(|cfg| matches!(cfg.codec, crate::input::camera::CameraCodec::Hevc)) {
return true;
}
std::env::var("LESAVKA_CAM_CODEC")
.ok()
.map(|value| value.trim().to_ascii_lowercase())
.is_some_and(|value| matches!(value.as_str(), "hevc" | "h265" | "h.265"))
}

View File

@ -16,6 +16,7 @@ impl LesavkaClientApp {
) {
let mut delay = Duration::from_secs(1);
let mut startup_epoch_heal_delay = upstream_epoch_auto_heal_delay();
let recover_hevc_after_drops = upstream_camera_uses_hevc(camera_cfg);
static FAIL_CNT: AtomicUsize = AtomicUsize::new(0);
loop {
@ -123,6 +124,7 @@ impl LesavkaClientApp {
let microphone_telemetry_stream = microphone_telemetry.clone();
let drop_log_stream = Arc::clone(&drop_log);
let outbound = async_stream::stream! {
let mut waiting_for_hevc_keyframe = false;
loop {
let next = queue_stream.pop_fresh().await;
if next.dropped_stale > 0 {
@ -135,8 +137,31 @@ impl LesavkaClientApp {
next.queue_depth,
duration_ms(next.delivery_age),
);
if recover_hevc_after_drops {
waiting_for_hevc_keyframe = true;
}
}
if let Some(mut bundle) = next.packet {
if recover_hevc_after_drops
&& should_hold_hevc_bundle_for_keyframe_recovery(
waiting_for_hevc_keyframe,
&bundle,
)
{
camera_telemetry_stream.record_stale_drop(1);
microphone_telemetry_stream.record_stale_drop(1);
log_uplink_drop(
&drop_log_stream,
UplinkDropReason::Stale,
1,
next.queue_depth,
duration_ms(next.delivery_age),
);
continue;
}
if recover_hevc_after_drops && bundle_has_hevc_recovery_keyframe(&bundle) {
waiting_for_hevc_keyframe = false;
}
let queue_depth = queue_depth_u32(next.queue_depth);
let delivery_age_ms = duration_ms(next.delivery_age);
if bundle.video.is_some() {
@ -175,6 +200,7 @@ impl LesavkaClientApp {
let active_camera_source = active_camera_source.clone();
let active_camera_profile = active_camera_profile.clone();
std::thread::spawn(move || {
let mut waiting_for_hevc_keyframe = false;
while !stop.load(Ordering::Relaxed) {
let state = media_controls.refresh();
let desired_source =
@ -190,10 +216,27 @@ impl LesavkaClientApp {
break;
}
if let Some(mut pkt) = camera.pull() {
if recover_hevc_after_drops
&& waiting_for_hevc_keyframe
&& !contains_hevc_irap(&pkt.data)
{
continue;
}
let _ = stamp_video_timing_metadata_at_enqueue(&mut pkt);
let is_recovery_keyframe = recover_hevc_after_drops
&& contains_hevc_irap(&pkt.data);
match event_tx.try_send(BundledCaptureEvent::Video(pkt)) {
Ok(()) => {}
Err(std::sync::mpsc::TrySendError::Full(_)) => continue,
Ok(()) => {
if is_recovery_keyframe {
waiting_for_hevc_keyframe = false;
}
}
Err(std::sync::mpsc::TrySendError::Full(_)) => {
if recover_hevc_after_drops {
waiting_for_hevc_keyframe = true;
}
continue;
}
Err(std::sync::mpsc::TrySendError::Disconnected(_)) => break,
}
}

View File

@ -66,7 +66,8 @@ include!("camera/bus_and_encoder.rs");
#[cfg(test)]
mod tests {
use super::{
CameraCapture, CameraCodec, CameraConfig, resolved_capture_profile, resolved_output_profile,
CameraCapture, CameraCodec, CameraConfig, hevc_keyframe_interval, resolved_capture_profile,
resolved_output_profile,
};
use serial_test::serial;
@ -194,6 +195,31 @@ mod tests {
});
}
#[test]
#[serial]
/// HEVC should recover quickly after freshness drops without changing H.264 knobs.
fn hevc_keyframe_interval_defaults_short_and_honors_overrides() {
temp_env::with_vars(
[
("LESAVKA_CAM_KEYFRAME_INTERVAL", None::<&str>),
("LESAVKA_CAM_HEVC_KEYFRAME_INTERVAL", None::<&str>),
],
|| {
assert_eq!(hevc_keyframe_interval(30), 3);
assert_eq!(hevc_keyframe_interval(2), 2);
},
);
temp_env::with_vars(
[
("LESAVKA_CAM_KEYFRAME_INTERVAL", Some("5")),
("LESAVKA_CAM_HEVC_KEYFRAME_INTERVAL", Some("1")),
],
|| {
assert_eq!(hevc_keyframe_interval(30), 1);
},
);
}
#[cfg(coverage)]
#[test]
/// Coverage builds use a deterministic HEVC encoder choice.

View File

@ -62,7 +62,11 @@ impl CameraCapture {
let capture_profile = capture_profile_override.unwrap_or_else(|| resolved_capture_profile(cfg));
let (capture_width, capture_height, capture_fps) = capture_profile;
let (width, height, fps) = resolved_output_profile(cfg, capture_profile);
let keyframe_interval = env_u32("LESAVKA_CAM_KEYFRAME_INTERVAL", fps.min(5)).clamp(1, fps);
let keyframe_interval = if output_hevc {
hevc_keyframe_interval(fps)
} else {
env_u32("LESAVKA_CAM_KEYFRAME_INTERVAL", fps.min(5)).clamp(1, fps)
};
let source_profile = camera_source_profile(allow_mjpg_source);
let use_mjpg_source = source_profile == CameraSourceProfile::Mjpeg;
let passthrough_mjpg_source =
@ -346,6 +350,18 @@ fn env_flag_enabled(name: &str) -> bool {
})
}
/// Choose the live HEVC keyframe cadence.
///
/// Inputs: target FPS plus optional `LESAVKA_CAM_KEYFRAME_INTERVAL` and
/// `LESAVKA_CAM_HEVC_KEYFRAME_INTERVAL` overrides. Output: GOP length in
/// frames. Why: the uplink intentionally drops stale HEVC bundles for
/// freshness, so short GOPs keep decoder recovery below a human-visible blink.
fn hevc_keyframe_interval(fps: u32) -> u32 {
let fps = fps.max(1);
let generic_default = env_u32("LESAVKA_CAM_KEYFRAME_INTERVAL", fps.min(3));
env_u32("LESAVKA_CAM_HEVC_KEYFRAME_INTERVAL", generic_default).clamp(1, fps)
}
/// Keeps `log_camera_first_packet` explicit because it sits on camera selection, where negotiated profiles must match the server output contract.
/// Inputs are the typed parameters; output is the return value or side effect.
fn log_camera_first_packet(packet_index: u64, bytes: usize, pts_us: u64) {

View File

@ -264,7 +264,7 @@ fn pick_hevc_encoder(fps: u32) -> Result<String> {
/// real webcam uplink; a one-second GOP made coded flashes less representative
/// than Lesavka's default live-call HEVC pipeline.
fn low_latency_hevc_keyframe_interval(fps: u32) -> u32 {
fps.clamp(1, 5)
fps.clamp(1, 3)
}
/// Select the visual signature for a video timestamp.
@ -406,9 +406,9 @@ mod tests {
fn low_latency_hevc_keyframe_interval_matches_live_camera_default() {
assert_eq!(super::low_latency_hevc_keyframe_interval(0), 1);
assert_eq!(super::low_latency_hevc_keyframe_interval(1), 1);
assert_eq!(super::low_latency_hevc_keyframe_interval(5), 5);
assert_eq!(super::low_latency_hevc_keyframe_interval(20), 5);
assert_eq!(super::low_latency_hevc_keyframe_interval(30), 5);
assert_eq!(super::low_latency_hevc_keyframe_interval(5), 3);
assert_eq!(super::low_latency_hevc_keyframe_interval(20), 3);
assert_eq!(super::low_latency_hevc_keyframe_interval(30), 3);
}
/// Verifies encoded packet timestamps come from the encoder output sample.

View File

@ -1,6 +1,6 @@
[package]
name = "lesavka_common"
version = "0.21.11"
version = "0.21.12"
edition = "2024"
build = "build.rs"

View File

@ -10,7 +10,7 @@ bench = false
[package]
name = "lesavka_server"
version = "0.21.11"
version = "0.21.12"
edition = "2024"
autobins = false

View File

@ -45,7 +45,7 @@ const V4L2_BUF_TYPE_VIDEO_OUTPUT: u32 = 2;
const V4L2_MEMORY_MMAP: u32 = 1;
const V4L2_FIELD_NONE: u32 = 1;
const V4L2_PIX_FMT_MJPEG: u32 = u32::from_le_bytes(*b"MJPG");
const MAX_MJPEG_FRAME_BYTES: usize = 1024 * 1024;
const MAX_MJPEG_FRAME_BYTES: usize = 8 * 1024 * 1024;
const EMPTY_MJPEG_FRAME: &[u8] = &[0xff, 0xd8, 0xff, 0xd9];
const DEFAULT_UVC_BUFFER_COUNT: u32 = 2;
const DEFAULT_UVC_IDLE_PUMP_MS: u64 = 2;
@ -400,10 +400,11 @@ impl UvcVideoStream {
let Some(buffer) = self.buffers.get(index as usize) else {
return Ok(());
};
let bytes = self.latest_frame.len().min(buffer.len);
let frame = self.frame_for_buffer(buffer.len);
let bytes = frame.len();
if bytes > 0 {
unsafe {
std::ptr::copy_nonoverlapping(self.latest_frame.as_ptr(), buffer.ptr, bytes);
std::ptr::copy_nonoverlapping(frame.as_ptr(), buffer.ptr, bytes);
}
} else {
unsafe {
@ -428,8 +429,9 @@ impl UvcVideoStream {
if stale && looks_like_mjpeg_frame(&self.latest_frame) {
return;
}
let max_frame_bytes = self.frame_payload_limit();
if let Ok(frame) = std::fs::read(&self.frame_path)
&& frame.len() <= MAX_MJPEG_FRAME_BYTES
&& frame.len() <= max_frame_bytes
&& looks_like_mjpeg_frame(&frame)
{
self.latest_frame = frame;
@ -437,6 +439,22 @@ impl UvcVideoStream {
self.latest_frame = EMPTY_MJPEG_FRAME.to_vec();
}
}
fn frame_payload_limit(&self) -> usize {
self.buffers
.iter()
.map(|buffer| buffer.len)
.min()
.unwrap_or(MAX_MJPEG_FRAME_BYTES)
}
fn frame_for_buffer(&self, buffer_len: usize) -> &[u8] {
if self.latest_frame.len() <= buffer_len && looks_like_mjpeg_frame(&self.latest_frame) {
&self.latest_frame
} else {
EMPTY_MJPEG_FRAME
}
}
}
impl Drop for UvcVideoStream {

View File

@ -400,6 +400,29 @@ mod uvc_binary_extra {
));
}
#[test]
fn uvc_frame_refresh_rejects_frames_that_would_be_truncated() {
let frame = NamedTempFile::new().expect("tmp frame");
let mut stream = UvcVideoStream::new(-1);
stream.frame_path = frame.path().to_path_buf();
stream.latest_frame = vec![0xff, 0xd8, 0x11, 0xff, 0xd9];
stream.buffers.push(MmapBuffer {
ptr: std::ptr::null_mut(),
len: 8,
});
fs::write(
frame.path(),
[0xff, 0xd8, 1, 2, 3, 4, 5, 6, 7, 8, 0xff, 0xd9],
)
.expect("write oversize frame");
stream.refresh_latest_frame();
assert_eq!(stream.latest_frame, vec![0xff, 0xd8, 0x11, 0xff, 0xd9]);
assert_eq!(stream.frame_payload_limit(), 8);
assert_eq!(stream.frame_for_buffer(4), EMPTY_MJPEG_FRAME);
}
#[test]
fn compute_payload_cap_clamps_limit_pct_bounds() {
with_var("LESAVKA_UVC_MAXPAYLOAD_LIMIT", None::<&str>, || {