lesavka/client/src/app/uplink_media/uplink_queue_metadata.rs

#[cfg(not(coverage))]
/// Drops the oldest pending audio packets when the backlog exceeds
/// `BUNDLED_AUDIO_MAX_PENDING`, keeping only the newest entries.
///
/// Returns the number of packets removed. This sits on the live uplink path,
/// where stale media must be dropped rather than queued into latency.
fn retain_newest_pending_audio(pending_audio: &mut Vec<AudioPacket>) -> usize {
    if pending_audio.len() <= BUNDLED_AUDIO_MAX_PENDING {
        return 0;
    }
    let dropped = pending_audio.len() - BUNDLED_AUDIO_MAX_PENDING;
    pending_audio.drain(..dropped);
    dropped
}
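// A minimal sketch of the trim behaviour, assuming `AudioPacket` is a
// prost-style message that implements `Default`. Overflowing packets are
// drained from the front, so the newest audio survives.
#[cfg(all(test, not(coverage)))]
mod retain_newest_pending_audio_tests {
    use super::*;

    #[test]
    fn drops_oldest_packets_beyond_the_cap() {
        let mut pending: Vec<AudioPacket> = (0..BUNDLED_AUDIO_MAX_PENDING as u64 + 2)
            .map(|pts| AudioPacket { pts, ..AudioPacket::default() })
            .collect();
        let dropped = retain_newest_pending_audio(&mut pending);
        assert_eq!(dropped, 2);
        assert_eq!(pending.len(), BUNDLED_AUDIO_MAX_PENDING);
        // The two oldest timestamps (0 and 1) were drained.
        assert_eq!(pending.first().map(|p| p.pts), Some(2));
    }
}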
#[cfg(not(coverage))]
#[allow(clippy::too_many_arguments)]
/// Bundles an optional video packet and the pending audio window into a single
/// `UpstreamMediaBundle` and pushes it onto the fresh-packet queue.
///
/// Audio too far from the video capture timestamp is dropped first; queue-full
/// drops and enqueue depth/age are recorded on the camera and microphone
/// telemetry handles, with drop logging throttled through the `drop_log`
/// limiter. This sits on the live uplink path, where stale media must be
/// dropped rather than queued into latency.
fn emit_bundled_media(
    session_id: u64,
    bundle_seq: &mut u64,
    video: Option<VideoPacket>,
    mut audio: Vec<AudioPacket>,
    queue: &crate::uplink_fresh_queue::FreshPacketQueue<UpstreamMediaBundle>,
    camera_telemetry: &crate::uplink_telemetry::UplinkTelemetryHandle,
    microphone_telemetry: &crate::uplink_telemetry::UplinkTelemetryHandle,
    drop_log: &Arc<std::sync::Mutex<UplinkDropLogLimiter>>,
) {
    if let Some(video) = video.as_ref() {
        let dropped = retain_audio_near_video(video, &mut audio);
        if dropped > 0 {
            microphone_telemetry.record_stale_drop(dropped as u64);
            log_uplink_drop(
                drop_log,
                UplinkDropReason::Stale,
                dropped as u64,
                audio.len(),
                bundled_audio_video_max_span().as_secs_f32() * 1_000.0,
            );
        }
    }
    if video.is_none() && audio.is_empty() {
        return;
    }
    *bundle_seq = bundle_seq.saturating_add(1);
    let (capture_start_us, capture_end_us) = bundled_capture_bounds(video.as_ref(), &audio);
    let enqueue_now_us = crate::live_capture_clock::capture_pts_us();
    let enqueue_age = Duration::from_micros(enqueue_now_us.saturating_sub(capture_start_us));
    let has_video = video.is_some();
    let has_audio = !audio.is_empty();
    let mut bundle = UpstreamMediaBundle {
        session_id,
        seq: *bundle_seq,
        capture_start_us,
        capture_end_us,
        video,
        audio,
        ..UpstreamMediaBundle::default()
    };
    if has_audio {
        let profile = lesavka_common::audio_transport::packet_audio_profile(
            bundle.audio.first().expect("audio was checked as present"),
        );
        lesavka_common::audio_transport::mark_bundle_audio_profile(&mut bundle, profile);
    }
    attach_bundle_queue_metadata(&mut bundle, 0, enqueue_age);
    let stats = queue.push(bundle, enqueue_age);
    if stats.dropped_queue_full > 0 {
        if has_video {
            camera_telemetry.record_queue_full_drop(stats.dropped_queue_full);
        }
        if has_audio {
            microphone_telemetry.record_queue_full_drop(stats.dropped_queue_full);
        }
        log_uplink_drop(
            drop_log,
            UplinkDropReason::QueueFull,
            stats.dropped_queue_full,
            stats.queue_depth,
            duration_ms(enqueue_age),
        );
    }
    let queue_depth = queue_depth_u32(stats.queue_depth);
    let age_ms = duration_ms(enqueue_age);
    if has_video {
        camera_telemetry.record_enqueue(queue_depth, age_ms, 0.0);
    }
    if has_audio {
        microphone_telemetry.record_enqueue(queue_depth, age_ms, 0.0);
    }
}
#[cfg(not(coverage))]
/// Drop microphone packets too far away from the video capture timestamp.
///
/// Inputs: one video packet and the pending audio window. Output: the number of
/// removed audio packets. Why: Opus/GStreamer can occasionally emit delayed
/// mic buffers; bundling those with fresh webcam frames makes the server drop
/// the whole A/V bundle, which hurts video much more than omitting stale mic
/// audio for that frame.
fn retain_audio_near_video(video: &VideoPacket, audio: &mut Vec<AudioPacket>) -> usize {
    if audio.is_empty() {
        return 0;
    }
    let max_span_us = bundled_audio_video_max_span()
        .as_micros()
        .min(u128::from(u64::MAX)) as u64;
    let video_pts = packet_video_capture_pts_us(video);
    let before = audio.len();
    audio.retain(|packet| packet_audio_capture_pts_us(packet).abs_diff(video_pts) <= max_span_us);
    before.saturating_sub(audio.len())
}
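// A minimal sketch of the stale-audio filter, assuming `AudioPacket` and
// `VideoPacket` implement `Default` and that the configured span is nonzero.
// One packet sits inside the allowed window around the video timestamp and
// one sits far outside it.
#[cfg(all(test, not(coverage)))]
mod retain_audio_near_video_tests {
    use super::*;

    #[test]
    fn drops_audio_far_from_the_video_timestamp() {
        let span_us = bundled_audio_video_max_span().as_micros() as u64;
        let video_pts = 10 * span_us + 1_000_000;
        let video = VideoPacket {
            client_capture_pts_us: video_pts,
            ..VideoPacket::default()
        };
        let near = AudioPacket {
            client_capture_pts_us: video_pts + span_us / 2,
            ..AudioPacket::default()
        };
        let far = AudioPacket {
            client_capture_pts_us: video_pts + 2 * span_us,
            ..AudioPacket::default()
        };
        let mut audio = vec![near, far];
        assert_eq!(retain_audio_near_video(&video, &mut audio), 1);
        assert_eq!(audio.len(), 1);
        assert_eq!(audio[0].client_capture_pts_us, video_pts + span_us / 2);
    }
}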
#[cfg(not(coverage))]
/// Computes the earliest and latest capture timestamps across the optional
/// video packet and the audio packets of a bundle.
///
/// Returns `(start_us, end_us)` with `start <= end`; an empty bundle collapses
/// to the current capture-clock instant.
fn bundled_capture_bounds(video: Option<&VideoPacket>, audio: &[AudioPacket]) -> (u64, u64) {
    let mut start = u64::MAX;
    let mut end = 0_u64;
    if let Some(video) = video {
        let pts = packet_video_capture_pts_us(video);
        start = start.min(pts);
        end = end.max(pts);
    }
    for packet in audio {
        let pts = packet_audio_capture_pts_us(packet);
        start = start.min(pts);
        end = end.max(pts);
    }
    if start == u64::MAX {
        let now = crate::live_capture_clock::capture_pts_us();
        return (now, now);
    }
    (start, end.max(start))
}
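// A minimal sketch of the bounds computation, assuming `Default` packets:
// the span covers the earliest and latest capture timestamps in the bundle,
// and an empty bundle collapses to a single instant.
#[cfg(all(test, not(coverage)))]
mod bundled_capture_bounds_tests {
    use super::*;

    #[test]
    fn spans_video_and_audio_timestamps() {
        let video = VideoPacket { client_capture_pts_us: 2_000, ..VideoPacket::default() };
        let audio = vec![
            AudioPacket { client_capture_pts_us: 1_500, ..AudioPacket::default() },
            AudioPacket { client_capture_pts_us: 2_500, ..AudioPacket::default() },
        ];
        assert_eq!(bundled_capture_bounds(Some(&video), &audio), (1_500, 2_500));
    }

    #[test]
    fn empty_bundle_collapses_to_a_single_instant() {
        let (start, end) = bundled_capture_bounds(None, &[]);
        assert_eq!(start, end);
    }
}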
#[cfg(not(coverage))]
/// Returns the client capture timestamp of an audio packet, falling back to
/// the raw `pts` when the packet was never stamped (`client_capture_pts_us`
/// is zero).
fn packet_audio_capture_pts_us(packet: &AudioPacket) -> u64 {
    if packet.client_capture_pts_us == 0 {
        packet.pts
    } else {
        packet.client_capture_pts_us
    }
}
#[cfg(not(coverage))]
/// Returns the client capture timestamp of a video packet, falling back to
/// the raw `pts` when the packet was never stamped (`client_capture_pts_us`
/// is zero).
fn packet_video_capture_pts_us(packet: &VideoPacket) -> u64 {
    if packet.client_capture_pts_us == 0 {
        packet.pts
    } else {
        packet.client_capture_pts_us
    }
}
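// A minimal sketch of the capture-timestamp fallback, assuming `Default`
// packets: a zero `client_capture_pts_us` marks the packet as never stamped,
// so the raw `pts` is used instead.
#[cfg(all(test, not(coverage)))]
mod capture_pts_fallback_tests {
    use super::*;

    #[test]
    fn falls_back_to_raw_pts_when_unstamped() {
        let unstamped = AudioPacket { pts: 42, ..AudioPacket::default() };
        assert_eq!(packet_audio_capture_pts_us(&unstamped), 42);
        let stamped = AudioPacket {
            pts: 42,
            client_capture_pts_us: 99,
            ..AudioPacket::default()
        };
        assert_eq!(packet_audio_capture_pts_us(&stamped), 99);
    }
}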
fn queue_depth_u32(depth: usize) -> u32 {
    depth.try_into().unwrap_or(u32::MAX)
}
#[cfg(not(coverage))]
fn duration_ms(duration: Duration) -> f32 {
    duration.as_secs_f32() * 1_000.0
}
fn duration_ms_u32(duration: Duration) -> u32 {
    duration.as_millis().min(u128::from(u32::MAX)) as u32
}
fn age_between_capture_and_enqueue(capture_pts_us: u64, enqueue_pts_us: u64) -> Duration {
    Duration::from_micros(enqueue_pts_us.saturating_sub(capture_pts_us))
}
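// A minimal sketch of the conversion helpers: depths and ages saturate at
// their integer bounds instead of panicking, and a capture timestamp that
// somehow exceeds the enqueue timestamp yields a zero age.
#[cfg(test)]
mod conversion_helper_tests {
    use super::*;

    #[test]
    fn saturates_instead_of_overflowing() {
        assert_eq!(queue_depth_u32(usize::MAX), u32::MAX);
        assert_eq!(duration_ms_u32(Duration::from_secs(u64::MAX)), u32::MAX);
        assert_eq!(age_between_capture_and_enqueue(10, 5), Duration::ZERO);
        assert_eq!(
            age_between_capture_and_enqueue(5, 10),
            Duration::from_micros(5)
        );
    }
}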
/// Stamps sequence and timing metadata on an audio packet at enqueue time and
/// returns the age between capture and enqueue.
fn stamp_audio_timing_metadata_at_enqueue(packet: &mut AudioPacket) -> Duration {
    static AUDIO_SEQUENCE: AtomicU64 = AtomicU64::new(0);
    let enqueue_pts_us = crate::live_capture_clock::capture_pts_us();
    let capture_pts_us = sanitized_capture_pts_us(packet.pts, enqueue_pts_us);
    packet.pts = capture_pts_us;
    packet.seq = AUDIO_SEQUENCE.fetch_add(1, Ordering::Relaxed).saturating_add(1);
    packet.client_capture_pts_us = capture_pts_us;
    packet.client_send_pts_us = enqueue_pts_us;
    age_between_capture_and_enqueue(capture_pts_us, enqueue_pts_us)
}
/// Stamps sequence and timing metadata on a video packet at enqueue time and
/// returns the age between capture and enqueue.
fn stamp_video_timing_metadata_at_enqueue(packet: &mut VideoPacket) -> Duration {
    static VIDEO_SEQUENCE: AtomicU64 = AtomicU64::new(0);
    let enqueue_pts_us = crate::live_capture_clock::capture_pts_us();
    let capture_pts_us = sanitized_capture_pts_us(packet.pts, enqueue_pts_us);
    packet.pts = capture_pts_us;
    packet.seq = VIDEO_SEQUENCE.fetch_add(1, Ordering::Relaxed).saturating_add(1);
    packet.client_capture_pts_us = capture_pts_us;
    packet.client_send_pts_us = enqueue_pts_us;
    age_between_capture_and_enqueue(capture_pts_us, enqueue_pts_us)
}
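// A minimal sketch of the stamping invariants, assuming `Default` packets:
// every stamped packet gets a nonzero, monotonically increasing sequence
// number, and its capture timestamp never exceeds its send timestamp.
#[cfg(test)]
mod stamp_timing_metadata_tests {
    use super::*;

    #[test]
    fn stamps_sequence_and_ordered_timestamps() {
        let mut first = AudioPacket::default();
        let mut second = AudioPacket::default();
        let _ = stamp_audio_timing_metadata_at_enqueue(&mut first);
        let _ = stamp_audio_timing_metadata_at_enqueue(&mut second);
        assert!(first.seq >= 1);
        assert!(second.seq > first.seq);
        assert!(first.client_capture_pts_us <= first.client_send_pts_us);
    }
}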
/// Clamps a packet capture timestamp into `[enqueue - lag_cap, enqueue]`.
///
/// Timestamps from the future are pulled back to the enqueue instant, and
/// timestamps older than the upstream source lag cap are raised to the floor,
/// so a misbehaving clock cannot make fresh media look arbitrarily stale.
fn sanitized_capture_pts_us(packet_pts_us: u64, enqueue_pts_us: u64) -> u64 {
    let mut capture_pts_us = packet_pts_us.min(enqueue_pts_us);
    let max_lag_us = crate::live_capture_clock::upstream_source_lag_cap()
        .as_micros()
        .min(u64::MAX as u128) as u64;
    let lag_floor_us = enqueue_pts_us.saturating_sub(max_lag_us);
    if capture_pts_us < lag_floor_us {
        capture_pts_us = lag_floor_us;
    }
    capture_pts_us
}
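// A minimal sketch of the clamp, mirroring the lag-cap computation above:
// future timestamps are pulled back to the enqueue instant, and timestamps
// older than the upstream source lag cap are raised to the floor.
#[cfg(test)]
mod sanitized_capture_pts_tests {
    use super::*;

    #[test]
    fn clamps_into_the_allowed_window() {
        let cap_us = crate::live_capture_clock::upstream_source_lag_cap()
            .as_micros()
            .min(u64::MAX as u128) as u64;
        let enqueue_us = 1_000_000_000_u64;
        // A timestamp from the future is never trusted.
        assert_eq!(sanitized_capture_pts_us(enqueue_us + 1, enqueue_us), enqueue_us);
        // A timestamp older than the lag cap is raised to the floor.
        assert_eq!(
            sanitized_capture_pts_us(0, enqueue_us),
            enqueue_us.saturating_sub(cap_us)
        );
    }
}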
/// Normalizes an audio packet as it is enqueued: defaults an unset encoding to
/// PCM S16LE, stamps timing metadata for packets that were never sequenced,
/// and records the current queue depth and delivery age.
fn attach_audio_queue_metadata(
    packet: &mut AudioPacket,
    queue_depth: usize,
    delivery_age: Duration,
) {
    if packet.encoding == 0 {
        lesavka_common::audio_transport::mark_packet_pcm_s16le(packet);
    }
    if packet.seq == 0 {
        let _ = stamp_audio_timing_metadata_at_enqueue(packet);
    }
    packet.client_queue_depth = queue_depth_u32(queue_depth);
    packet.client_queue_age_ms = duration_ms_u32(delivery_age);
}
/// Normalizes a video packet as it is enqueued: stamps timing metadata for
/// packets that were never sequenced and records the current queue depth and
/// delivery age.
fn attach_video_queue_metadata(
    packet: &mut VideoPacket,
    queue_depth: usize,
    delivery_age: Duration,
) {
    if packet.seq == 0 {
        let _ = stamp_video_timing_metadata_at_enqueue(packet);
    }
    packet.client_queue_depth = queue_depth_u32(queue_depth);
    packet.client_queue_age_ms = duration_ms_u32(delivery_age);
}
#[cfg(not(coverage))]
/// Applies the audio and video queue-metadata attachment to every packet in
/// the bundle, so each carries the same queue depth and delivery age.
fn attach_bundle_queue_metadata(
    bundle: &mut UpstreamMediaBundle,
    queue_depth: usize,
    delivery_age: Duration,
) {
    for packet in &mut bundle.audio {
        attach_audio_queue_metadata(packet, queue_depth, delivery_age);
    }
    if let Some(packet) = bundle.video.as_mut() {
        attach_video_queue_metadata(packet, queue_depth, delivery_age);
    }
}
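// A minimal sketch of the metadata attachment, assuming `Default` packets and
// plain integer `encoding`/`seq` fields (prost-style). A packet that already
// carries an encoding and a sequence number keeps its timing fields and only
// gains the queue depth and age.
#[cfg(test)]
mod attach_queue_metadata_tests {
    use super::*;

    #[test]
    fn records_depth_and_age_without_restamping() {
        let mut packet = AudioPacket {
            encoding: 1,
            seq: 7,
            client_capture_pts_us: 123,
            ..AudioPacket::default()
        };
        attach_audio_queue_metadata(&mut packet, 3, Duration::from_millis(12));
        assert_eq!(packet.seq, 7);
        assert_eq!(packet.client_capture_pts_us, 123);
        assert_eq!(packet.client_queue_depth, 3);
        assert_eq!(packet.client_queue_age_ms, 12);
    }
}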