fix: stamp upstream timing before async send

This commit is contained in:
Brad Stein 2026-05-02 19:38:05 -03:00
parent d2f312b14d
commit 0188c8661b
12 changed files with 220 additions and 94 deletions

6
Cargo.lock generated
View File

@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]]
name = "lesavka_client"
version = "0.17.28"
version = "0.17.29"
dependencies = [
"anyhow",
"async-stream",
@ -1686,7 +1686,7 @@ dependencies = [
[[package]]
name = "lesavka_common"
version = "0.17.28"
version = "0.17.29"
dependencies = [
"anyhow",
"base64",
@ -1698,7 +1698,7 @@ dependencies = [
[[package]]
name = "lesavka_server"
version = "0.17.28"
version = "0.17.29"
dependencies = [
"anyhow",
"base64",

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package]
name = "lesavka_client"
version = "0.17.28"
version = "0.17.29"
edition = "2024"
[dependencies]

View File

@ -89,7 +89,7 @@ impl LesavkaClientApp {
queue_depth_u32(next.queue_depth),
duration_ms(next.delivery_age),
);
attach_audio_timing_metadata(
attach_audio_queue_metadata(
&mut packet,
next.queue_depth,
next.delivery_age,
@ -140,9 +140,9 @@ impl LesavkaClientApp {
tracing::info!("🎤 microphone uplink resumed");
paused = false;
}
if let Some(pkt) = mic_clone.pull() {
if let Some(mut pkt) = mic_clone.pull() {
trace!("🎤📤 cli {} bytes → gRPC", pkt.data.len());
let enqueue_age = crate::live_capture_clock::packet_age(pkt.pts);
let enqueue_age = stamp_audio_timing_metadata_at_enqueue(&mut pkt);
let stats = queue_thread.push(pkt, enqueue_age);
if stats.dropped_queue_full > 0 {
telemetry_thread.record_queue_full_drop(stats.dropped_queue_full);
@ -275,7 +275,7 @@ impl LesavkaClientApp {
queue_depth_u32(next.queue_depth),
duration_ms(next.delivery_age),
);
attach_video_timing_metadata(
attach_video_queue_metadata(
&mut packet,
next.queue_depth,
next.delivery_age,
@ -361,7 +361,7 @@ impl LesavkaClientApp {
telemetry.record_enabled(true);
tracing::info!("📸 webcam uplink resumed");
}
let Some(pkt) = cam.pull() else {
let Some(mut pkt) = cam.pull() else {
std::thread::sleep(Duration::from_millis(5));
continue;
};
@ -372,7 +372,7 @@ impl LesavkaClientApp {
tracing::trace!("📸 cli frame#{n} {} B", pkt.data.len());
}
tracing::trace!("📸⬆️ sent webcam AU pts={} {} B", pkt.pts, pkt.data.len());
let enqueue_age = crate::live_capture_clock::packet_age(pkt.pts);
let enqueue_age = stamp_video_timing_metadata_at_enqueue(&mut pkt);
let stats = queue.push(pkt, enqueue_age);
if stats.dropped_queue_full > 0 {
telemetry.record_queue_full_drop(stats.dropped_queue_full);
@ -501,38 +501,54 @@ fn duration_ms_u32(duration: Duration) -> u32 {
}
#[cfg(not(coverage))]
fn shared_capture_window_from_delivery_age(delivery_age: Duration) -> (u64, u64) {
let send_pts_us = crate::live_capture_clock::capture_pts_us();
let age_us = delivery_age.as_micros().min(u128::from(u64::MAX)) as u64;
(send_pts_us.saturating_sub(age_us), send_pts_us)
/// Elapsed time between the shared-clock capture estimate and the enqueue
/// stamp. A capture stamp that sits after the enqueue stamp collapses to
/// zero rather than underflowing.
fn age_between_capture_and_enqueue(capture_pts_us: u64, enqueue_pts_us: u64) -> Duration {
    match enqueue_pts_us.checked_sub(capture_pts_us) {
        Some(delta_us) => Duration::from_micros(delta_us),
        None => Duration::ZERO,
    }
}
#[cfg(not(coverage))]
fn attach_audio_timing_metadata(
/// Stamp sequence number and shared-clock timing onto an audio packet at the
/// moment it enters the uplink queue, returning the capture-to-enqueue age.
fn stamp_audio_timing_metadata_at_enqueue(packet: &mut AudioPacket) -> Duration {
    // fetch_add returns the pre-increment value, so the first stamped packet
    // carries seq 1 — leaving seq 0 available to mean "never stamped".
    static AUDIO_SEQUENCE: AtomicU64 = AtomicU64::new(0);
    let now_us = crate::live_capture_clock::capture_pts_us();
    // Never let the capture estimate sit after the enqueue stamp.
    let captured_us = if packet.pts < now_us { packet.pts } else { now_us };
    packet.seq = AUDIO_SEQUENCE
        .fetch_add(1, Ordering::Relaxed)
        .saturating_add(1);
    packet.client_send_pts_us = now_us;
    packet.client_capture_pts_us = captured_us;
    age_between_capture_and_enqueue(captured_us, now_us)
}
#[cfg(not(coverage))]
/// Stamp sequence number and shared-clock timing onto a video packet at the
/// moment it enters the uplink queue, returning the capture-to-enqueue age.
fn stamp_video_timing_metadata_at_enqueue(packet: &mut VideoPacket) -> Duration {
    // fetch_add returns the pre-increment value, so the first stamped packet
    // carries seq 1 — leaving seq 0 available to mean "never stamped".
    static VIDEO_SEQUENCE: AtomicU64 = AtomicU64::new(0);
    let now_us = crate::live_capture_clock::capture_pts_us();
    // Never let the capture estimate sit after the enqueue stamp.
    let captured_us = if packet.pts < now_us { packet.pts } else { now_us };
    packet.seq = VIDEO_SEQUENCE
        .fetch_add(1, Ordering::Relaxed)
        .saturating_add(1);
    packet.client_send_pts_us = now_us;
    packet.client_capture_pts_us = captured_us;
    age_between_capture_and_enqueue(captured_us, now_us)
}
#[cfg(not(coverage))]
fn attach_audio_queue_metadata(
packet: &mut AudioPacket,
queue_depth: usize,
delivery_age: Duration,
) {
static AUDIO_SEQUENCE: AtomicU64 = AtomicU64::new(0);
packet.seq = AUDIO_SEQUENCE.fetch_add(1, Ordering::Relaxed).saturating_add(1);
let (capture_pts_us, send_pts_us) = shared_capture_window_from_delivery_age(delivery_age);
packet.client_capture_pts_us = capture_pts_us;
packet.client_send_pts_us = send_pts_us;
if packet.seq == 0 {
let _ = stamp_audio_timing_metadata_at_enqueue(packet);
}
packet.client_queue_depth = queue_depth_u32(queue_depth);
packet.client_queue_age_ms = duration_ms_u32(delivery_age);
}
#[cfg(not(coverage))]
fn attach_video_timing_metadata(
fn attach_video_queue_metadata(
packet: &mut VideoPacket,
queue_depth: usize,
delivery_age: Duration,
) {
static VIDEO_SEQUENCE: AtomicU64 = AtomicU64::new(0);
packet.seq = VIDEO_SEQUENCE.fetch_add(1, Ordering::Relaxed).saturating_add(1);
let (capture_pts_us, send_pts_us) = shared_capture_window_from_delivery_age(delivery_age);
packet.client_capture_pts_us = capture_pts_us;
packet.client_send_pts_us = send_pts_us;
if packet.seq == 0 {
let _ = stamp_video_timing_metadata_at_enqueue(packet);
}
packet.client_queue_depth = queue_depth_u32(queue_depth);
packet.client_queue_age_ms = duration_ms_u32(delivery_age);
}
@ -631,48 +647,70 @@ mod uplink_timing_tests {
use super::*;
#[test]
fn audio_timing_metadata_uses_shared_clock_window_instead_of_packet_pts_domain() {
fn audio_timing_metadata_is_stamped_before_async_queue_pop() {
std::thread::sleep(Duration::from_millis(5));
let packet_pts_us = crate::live_capture_clock::capture_pts_us().saturating_sub(2_000);
let mut packet = AudioPacket {
pts: 9_999_999,
pts: packet_pts_us,
..AudioPacket::default()
};
attach_audio_timing_metadata(&mut packet, 3, Duration::from_millis(2));
let enqueue_age = stamp_audio_timing_metadata_at_enqueue(&mut packet);
let capture_pts_us = packet.client_capture_pts_us;
let send_pts_us = packet.client_send_pts_us;
std::thread::sleep(Duration::from_millis(5));
attach_audio_queue_metadata(
&mut packet,
3,
enqueue_age.saturating_add(Duration::from_millis(5)),
);
assert!(packet.seq > 0);
assert_eq!(packet.client_queue_depth, 3);
assert_eq!(packet.client_queue_age_ms, 2);
assert!(packet.client_queue_age_ms >= 5);
assert_eq!(packet.client_capture_pts_us, capture_pts_us);
assert_eq!(packet.client_send_pts_us, send_pts_us);
assert!(
packet.client_send_pts_us >= packet.client_capture_pts_us,
"send must be on or after the shared-clock capture estimate"
"enqueue/send stamp must be on or after the shared-clock capture estimate"
);
assert!(
packet.client_send_pts_us - packet.client_capture_pts_us <= 2_000,
"delivery age, not packet PTS domain, should define the timing window"
packet.client_send_pts_us - packet.client_capture_pts_us <= 3_000,
"capture-to-enqueue age, not async pop delay, should define the timing window"
);
}
#[test]
fn video_timing_metadata_uses_shared_clock_window_instead_of_packet_pts_domain() {
fn video_timing_metadata_is_stamped_before_async_queue_pop() {
std::thread::sleep(Duration::from_millis(5));
let packet_pts_us = crate::live_capture_clock::capture_pts_us().saturating_sub(3_000);
let mut packet = VideoPacket {
pts: 9_999_999,
pts: packet_pts_us,
..VideoPacket::default()
};
attach_video_timing_metadata(&mut packet, 4, Duration::from_millis(3));
let enqueue_age = stamp_video_timing_metadata_at_enqueue(&mut packet);
let capture_pts_us = packet.client_capture_pts_us;
let send_pts_us = packet.client_send_pts_us;
std::thread::sleep(Duration::from_millis(5));
attach_video_queue_metadata(
&mut packet,
4,
enqueue_age.saturating_add(Duration::from_millis(5)),
);
assert!(packet.seq > 0);
assert_eq!(packet.client_queue_depth, 4);
assert_eq!(packet.client_queue_age_ms, 3);
assert!(packet.client_queue_age_ms >= 5);
assert_eq!(packet.client_capture_pts_us, capture_pts_us);
assert_eq!(packet.client_send_pts_us, send_pts_us);
assert!(
packet.client_send_pts_us >= packet.client_capture_pts_us,
"send must be on or after the shared-clock capture estimate"
"enqueue/send stamp must be on or after the shared-clock capture estimate"
);
assert!(
packet.client_send_pts_us - packet.client_capture_pts_us <= 3_000,
"delivery age, not packet PTS domain, should define the timing window"
packet.client_send_pts_us - packet.client_capture_pts_us <= 4_000,
"capture-to-enqueue age, not async pop delay, should define the timing window"
);
}
}

View File

@ -30,6 +30,7 @@ pub fn capture_pts_us() -> u64 {
/// Why: upstream freshness telemetry should use the same shared live clock as
/// packet timestamps so queue-age calculations stay honest.
#[must_use]
#[allow(dead_code)]
pub fn packet_age(pts_us: u64) -> Duration {
    // saturating_sub clamps to zero when the timestamp is ahead of the clock.
    let now_us = capture_pts_us();
    Duration::from_micros(now_us.saturating_sub(pts_us))
}

View File

@ -1,6 +1,6 @@
[package]
name = "lesavka_common"
version = "0.17.28"
version = "0.17.29"
edition = "2024"
build = "build.rs"

View File

@ -26,7 +26,9 @@ message VideoPacket {
uint32 server_queue_peak = 10;
string server_encoder_label = 11;
uint32 server_process_cpu_tenths = 12;
// Shared client media-clock timestamp for the packet's capture estimate.
uint64 client_capture_pts_us = 13;
// Shared client media-clock timestamp when the packet entered the uplink queue.
uint64 client_send_pts_us = 14;
uint32 client_queue_depth = 15;
uint32 client_queue_age_ms = 16;
@ -36,7 +38,9 @@ message AudioPacket {
uint64 pts = 2;
bytes data = 3;
uint64 seq = 4;
// Shared client media-clock timestamp for the packet's capture estimate.
uint64 client_capture_pts_us = 5;
// Shared client media-clock timestamp when the packet entered the uplink queue.
uint64 client_send_pts_us = 6;
uint32 client_queue_depth = 7;
uint32 client_queue_age_ms = 8;

View File

@ -10,7 +10,7 @@ bench = false
[package]
name = "lesavka_server"
version = "0.17.28"
version = "0.17.29"
edition = "2024"
autobins = false

View File

@ -137,10 +137,22 @@ impl UpstreamMediaRuntime {
received_at: Instant::now(),
};
match kind {
UpstreamMediaKind::Camera => state.latest_camera_timing = Some(sample),
UpstreamMediaKind::Microphone => state.latest_microphone_timing = Some(sample),
UpstreamMediaKind::Camera => {
state.latest_camera_timing = Some(sample);
push_timing_sample(&mut state.recent_camera_timing, sample);
state
.camera_client_queue_age_window_ms
.push(f64::from(timing.queue_age_ms));
}
UpstreamMediaKind::Microphone => {
state.latest_microphone_timing = Some(sample);
push_timing_sample(&mut state.recent_microphone_timing, sample);
state
.microphone_client_queue_age_window_ms
.push(f64::from(timing.queue_age_ms));
}
}
record_client_timing_windows(&mut state);
record_client_timing_windows(&mut state, kind, sample);
}
/// Mark one video frame as actually handed to the UVC/HDMI sink.
@ -186,23 +198,9 @@ impl UpstreamMediaRuntime {
_ => None,
};
let now = Instant::now();
let client_capture_skew_ms = skew_ms_from_samples(
state.latest_camera_timing,
state.latest_microphone_timing,
|sample| sample.capture_pts_us,
);
let client_send_skew_ms = skew_ms_from_samples(
state.latest_camera_timing,
state.latest_microphone_timing,
|sample| sample.send_pts_us,
);
let server_receive_skew_ms =
match (state.latest_camera_timing, state.latest_microphone_timing) {
(Some(camera), Some(microphone)) => Some(
instant_delta_us(camera.received_at, microphone.received_at) as f64 / 1000.0,
),
_ => None,
};
let client_capture_skew_ms = state.latest_paired_client_capture_skew_ms;
let client_send_skew_ms = state.latest_paired_client_send_skew_ms;
let server_receive_skew_ms = state.latest_paired_server_receive_skew_ms;
UpstreamPlannerSnapshot {
session_id: state.session_id,
phase: state.phase.as_str(),
@ -710,19 +708,6 @@ fn us_to_ms(value: u64) -> f64 {
value as f64 / 1000.0
}
fn skew_ms_from_samples(
camera: Option<state::UpstreamTimingSample>,
microphone: Option<state::UpstreamTimingSample>,
value: impl Fn(state::UpstreamTimingSample) -> u64,
) -> Option<f64> {
match (camera, microphone) {
(Some(camera), Some(microphone)) => {
Some((value(camera) as i128 - value(microphone) as i128) as f64 / 1000.0)
}
_ => None,
}
}
fn instant_delta_us(left: Instant, right: Instant) -> i128 {
if left >= right {
left.saturating_duration_since(right).as_micros() as i128
@ -731,27 +716,72 @@ fn instant_delta_us(left: Instant, right: Instant) -> i128 {
}
}
fn record_client_timing_windows(state: &mut UpstreamClockState) {
let (Some(camera), Some(microphone)) =
(state.latest_camera_timing, state.latest_microphone_timing)
else {
const CLIENT_TIMING_PAIR_MAX_SEND_DELTA_US: u64 = 250_000;
/// Append a timing sample to a bounded sliding window, evicting the oldest
/// entry when the window is already at capacity.
fn push_timing_sample(
    samples: &mut std::collections::VecDeque<state::UpstreamTimingSample>,
    sample: state::UpstreamTimingSample,
) {
    let window_full = samples.len() >= state::TIMING_WINDOW_CAPACITY;
    if window_full {
        let _ = samples.pop_front();
    }
    samples.push_back(sample);
}
/// Absolute difference between two microsecond timestamps.
///
/// Uses `u64::abs_diff`, which cannot underflow regardless of argument
/// order, instead of the hand-rolled `max - min` dance.
fn abs_delta_us(left: u64, right: u64) -> u64 {
    left.abs_diff(right)
}
/// Find the sample whose send stamp is closest to `send_pts_us`.
///
/// On a tie the earliest sample in the window wins, matching
/// `Iterator::min_by_key` semantics (strict `<` never replaces an equal best).
fn nearest_timing_sample_by_send(
    samples: &std::collections::VecDeque<state::UpstreamTimingSample>,
    send_pts_us: u64,
) -> Option<state::UpstreamTimingSample> {
    let mut best: Option<(state::UpstreamTimingSample, u64)> = None;
    for candidate in samples.iter().copied() {
        let delta = abs_delta_us(candidate.send_pts_us, send_pts_us);
        let closer = match best {
            None => true,
            Some((_, best_delta)) => delta < best_delta,
        };
        if closer {
            best = Some((candidate, delta));
        }
    }
    best.map(|(sample, _)| sample)
}
fn record_client_timing_windows(
state: &mut UpstreamClockState,
kind: UpstreamMediaKind,
sample: state::UpstreamTimingSample,
) {
let paired = match kind {
UpstreamMediaKind::Camera => {
nearest_timing_sample_by_send(&state.recent_microphone_timing, sample.send_pts_us)
.map(|microphone| (sample, microphone))
}
UpstreamMediaKind::Microphone => {
nearest_timing_sample_by_send(&state.recent_camera_timing, sample.send_pts_us)
.map(|camera| (camera, sample))
}
};
let Some((camera, microphone)) = paired else {
return;
};
if abs_delta_us(camera.send_pts_us, microphone.send_pts_us)
> CLIENT_TIMING_PAIR_MAX_SEND_DELTA_US
{
return;
}
let client_capture_skew_ms =
(camera.capture_pts_us as i128 - microphone.capture_pts_us as i128) as f64 / 1000.0;
let client_send_skew_ms =
(camera.send_pts_us as i128 - microphone.send_pts_us as i128) as f64 / 1000.0;
let server_receive_skew_ms =
instant_delta_us(camera.received_at, microphone.received_at) as f64 / 1000.0;
state.latest_paired_client_capture_skew_ms = Some(client_capture_skew_ms);
state.latest_paired_client_send_skew_ms = Some(client_send_skew_ms);
state.latest_paired_server_receive_skew_ms = Some(server_receive_skew_ms);
state
.client_capture_skew_window_ms
.push((camera.capture_pts_us as i128 - microphone.capture_pts_us as i128) as f64 / 1000.0);
state
.client_send_skew_window_ms
.push((camera.send_pts_us as i128 - microphone.send_pts_us as i128) as f64 / 1000.0);
.push(client_capture_skew_ms);
state.client_send_skew_window_ms.push(client_send_skew_ms);
state
.server_receive_skew_window_ms
.push(instant_delta_us(camera.received_at, microphone.received_at) as f64 / 1000.0);
state
.camera_client_queue_age_window_ms
.push(f64::from(camera.queue_age_ms));
state
.microphone_client_queue_age_window_ms
.push(f64::from(microphone.queue_age_ms));
.push(server_receive_skew_ms);
}
fn record_presentation_sample(

View File

@ -168,6 +168,11 @@ fn reset_timing_anchors(state: &mut UpstreamClockState) {
state.video_freezes = 0;
state.latest_camera_timing = None;
state.latest_microphone_timing = None;
state.recent_camera_timing = Default::default();
state.recent_microphone_timing = Default::default();
state.latest_paired_client_capture_skew_ms = None;
state.latest_paired_client_send_skew_ms = None;
state.latest_paired_server_receive_skew_ms = None;
state.latest_camera_presentation = None;
state.latest_microphone_presentation = None;
state.client_capture_skew_window_ms = Default::default();

View File

@ -1,7 +1,7 @@
use std::collections::VecDeque;
use tokio::time::Instant;
const TIMING_WINDOW_CAPACITY: usize = 240;
pub(super) const TIMING_WINDOW_CAPACITY: usize = 240;
#[derive(Clone, Copy, Debug)]
pub(super) struct UpstreamTimingSample {
@ -105,6 +105,11 @@ pub(super) struct UpstreamClockState {
pub last_reason: String,
pub latest_camera_timing: Option<UpstreamTimingSample>,
pub latest_microphone_timing: Option<UpstreamTimingSample>,
pub recent_camera_timing: VecDeque<UpstreamTimingSample>,
pub recent_microphone_timing: VecDeque<UpstreamTimingSample>,
pub latest_paired_client_capture_skew_ms: Option<f64>,
pub latest_paired_client_send_skew_ms: Option<f64>,
pub latest_paired_server_receive_skew_ms: Option<f64>,
pub latest_camera_presentation: Option<UpstreamPresentationSample>,
pub latest_microphone_presentation: Option<UpstreamPresentationSample>,
pub client_capture_skew_window_ms: UpstreamScalarWindow,

View File

@ -592,6 +592,47 @@ fn planner_snapshot_tracks_client_timing_sidecar_metrics() {
);
}
#[test]
#[serial(upstream_media_runtime)]
// Regression test: skew pairing must match a camera packet with the
// microphone packet whose send stamp is nearest in time, not simply the
// most recently received microphone packet.
fn planner_pairs_client_timing_by_nearby_send_time_not_latest_packet() {
    let runtime = runtime_without_offsets();
    // Camera packet stamped at t = 1.000 s on the shared client clock.
    runtime.record_client_timing(
        super::UpstreamMediaKind::Camera,
        super::UpstreamClientTiming {
            capture_pts_us: 1_000_000,
            send_pts_us: 1_000_000,
            queue_depth: 1,
            queue_age_ms: 5,
        },
    );
    // Microphone packet only 10 ms later — the expected pairing partner.
    runtime.record_client_timing(
        super::UpstreamMediaKind::Microphone,
        super::UpstreamClientTiming {
            capture_pts_us: 1_010_000,
            send_pts_us: 1_010_000,
            queue_depth: 1,
            queue_age_ms: 5,
        },
    );
    // A much later microphone packet (t = 2.5 s). Its distance from the
    // camera packet exceeds the 250 ms pairing window, so it must not
    // displace the earlier, nearby pair.
    runtime.record_client_timing(
        super::UpstreamMediaKind::Microphone,
        super::UpstreamClientTiming {
            capture_pts_us: 2_500_000,
            send_pts_us: 2_500_000,
            queue_depth: 1,
            queue_age_ms: 5,
        },
    );
    let snapshot = runtime.snapshot();
    // Skew is camera minus microphone: 1.000 s - 1.010 s = -10 ms. Pairing
    // against the latest (2.5 s) packet would instead yield -1500 ms.
    assert_eq!(snapshot.client_capture_skew_ms, Some(-10.0));
    assert_eq!(snapshot.client_send_skew_ms, Some(-10.0));
    assert_eq!(snapshot.client_capture_abs_skew_p95_ms, Some(10.0));
    assert_eq!(snapshot.client_send_abs_skew_p95_ms, Some(10.0));
}
#[test]
#[serial(upstream_media_runtime)]
fn default_runtime_covers_video_map_play_path() {

View File

@ -15,11 +15,13 @@ pub enum UpstreamMediaKind {
pub struct UpstreamClientTiming {
/// Packet capture timestamp on the shared client media clock.
pub capture_pts_us: u64,
/// Client media-clock timestamp when the packet left the uplink queue.
/// Client media-clock timestamp when the packet entered the async uplink
/// queue. Stamping at enqueue prevents stream scheduling from posing as
/// capture skew.
pub send_pts_us: u64,
/// Uplink queue depth observed as the packet was sent.
/// Uplink queue depth observed as the packet left the async queue.
pub queue_depth: u32,
/// Packet age observed as the packet was sent.
/// Packet age observed as the packet left the async queue.
pub queue_age_ms: u32,
}