diff --git a/Cargo.lock b/Cargo.lock index 1c655d6..75c9521 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.17.28" +version = "0.17.29" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.17.28" +version = "0.17.29" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.17.28" +version = "0.17.29" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index 88343fd..d708b2c 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.17.28" +version = "0.17.29" edition = "2024" [dependencies] diff --git a/client/src/app/uplink_media.rs b/client/src/app/uplink_media.rs index 7a283f4..db632ef 100644 --- a/client/src/app/uplink_media.rs +++ b/client/src/app/uplink_media.rs @@ -89,7 +89,7 @@ impl LesavkaClientApp { queue_depth_u32(next.queue_depth), duration_ms(next.delivery_age), ); - attach_audio_timing_metadata( + attach_audio_queue_metadata( &mut packet, next.queue_depth, next.delivery_age, @@ -140,9 +140,9 @@ impl LesavkaClientApp { tracing::info!("🎀 microphone uplink resumed"); paused = false; } - if let Some(pkt) = mic_clone.pull() { + if let Some(mut pkt) = mic_clone.pull() { trace!("πŸŽ€πŸ“€ cli {} bytes β†’ gRPC", pkt.data.len()); - let enqueue_age = crate::live_capture_clock::packet_age(pkt.pts); + let enqueue_age = stamp_audio_timing_metadata_at_enqueue(&mut pkt); let stats = queue_thread.push(pkt, enqueue_age); if stats.dropped_queue_full > 0 { telemetry_thread.record_queue_full_drop(stats.dropped_queue_full); @@ -275,7 +275,7 @@ impl LesavkaClientApp { queue_depth_u32(next.queue_depth), duration_ms(next.delivery_age), ); - attach_video_timing_metadata( + attach_video_queue_metadata( &mut packet, next.queue_depth, next.delivery_age, @@ -361,7 +361,7 @@ impl LesavkaClientApp { telemetry.record_enabled(true); tracing::info!("πŸ“Έ webcam uplink resumed"); } - let Some(pkt) = cam.pull() else { + let Some(mut pkt) = cam.pull() else { std::thread::sleep(Duration::from_millis(5)); continue; }; @@ -372,7 +372,7 @@ impl LesavkaClientApp { tracing::trace!("πŸ“Έ cli frame#{n} {} B", pkt.data.len()); } tracing::trace!("πŸ“Έβ¬†οΈ sent webcam AU pts={} {} B", pkt.pts, pkt.data.len()); - let enqueue_age = crate::live_capture_clock::packet_age(pkt.pts); + let enqueue_age = stamp_video_timing_metadata_at_enqueue(&mut pkt); let stats = queue.push(pkt, enqueue_age); if stats.dropped_queue_full > 0 { telemetry.record_queue_full_drop(stats.dropped_queue_full); @@ -501,38 +501,54 @@ fn duration_ms_u32(duration: Duration) -> u32 { } #[cfg(not(coverage))] -fn shared_capture_window_from_delivery_age(delivery_age: Duration) -> (u64, u64) { - let send_pts_us = crate::live_capture_clock::capture_pts_us(); - let age_us = delivery_age.as_micros().min(u128::from(u64::MAX)) as u64; - (send_pts_us.saturating_sub(age_us), send_pts_us) +fn age_between_capture_and_enqueue(capture_pts_us: u64, enqueue_pts_us: u64) -> Duration { + Duration::from_micros(enqueue_pts_us.saturating_sub(capture_pts_us)) } #[cfg(not(coverage))] -fn attach_audio_timing_metadata( +fn stamp_audio_timing_metadata_at_enqueue(packet: &mut AudioPacket) -> Duration { + static AUDIO_SEQUENCE: AtomicU64 = AtomicU64::new(0); + let enqueue_pts_us = crate::live_capture_clock::capture_pts_us(); + let capture_pts_us = packet.pts.min(enqueue_pts_us); + packet.seq = AUDIO_SEQUENCE.fetch_add(1, Ordering::Relaxed).saturating_add(1); + packet.client_capture_pts_us = capture_pts_us; + packet.client_send_pts_us = enqueue_pts_us; + age_between_capture_and_enqueue(capture_pts_us, enqueue_pts_us) +} + +#[cfg(not(coverage))] +fn stamp_video_timing_metadata_at_enqueue(packet: &mut VideoPacket) -> Duration { + static VIDEO_SEQUENCE: AtomicU64 = AtomicU64::new(0); + let enqueue_pts_us = crate::live_capture_clock::capture_pts_us(); + let capture_pts_us = packet.pts.min(enqueue_pts_us); + packet.seq = VIDEO_SEQUENCE.fetch_add(1, Ordering::Relaxed).saturating_add(1); + packet.client_capture_pts_us = capture_pts_us; + packet.client_send_pts_us = enqueue_pts_us; + age_between_capture_and_enqueue(capture_pts_us, enqueue_pts_us) +} + +#[cfg(not(coverage))] +fn attach_audio_queue_metadata( packet: &mut AudioPacket, queue_depth: usize, delivery_age: Duration, ) { - static AUDIO_SEQUENCE: AtomicU64 = AtomicU64::new(0); - packet.seq = AUDIO_SEQUENCE.fetch_add(1, Ordering::Relaxed).saturating_add(1); - let (capture_pts_us, send_pts_us) = shared_capture_window_from_delivery_age(delivery_age); - packet.client_capture_pts_us = capture_pts_us; - packet.client_send_pts_us = send_pts_us; + if packet.seq == 0 { + let _ = stamp_audio_timing_metadata_at_enqueue(packet); + } packet.client_queue_depth = queue_depth_u32(queue_depth); packet.client_queue_age_ms = duration_ms_u32(delivery_age); } #[cfg(not(coverage))] -fn attach_video_timing_metadata( +fn attach_video_queue_metadata( packet: &mut VideoPacket, queue_depth: usize, delivery_age: Duration, ) { - static VIDEO_SEQUENCE: AtomicU64 = AtomicU64::new(0); - packet.seq = VIDEO_SEQUENCE.fetch_add(1, Ordering::Relaxed).saturating_add(1); - let (capture_pts_us, send_pts_us) = shared_capture_window_from_delivery_age(delivery_age); - packet.client_capture_pts_us = capture_pts_us; - packet.client_send_pts_us = send_pts_us; + if packet.seq == 0 { + let _ = stamp_video_timing_metadata_at_enqueue(packet); + } packet.client_queue_depth = queue_depth_u32(queue_depth); packet.client_queue_age_ms = duration_ms_u32(delivery_age); } @@ -631,48 +647,70 @@ mod uplink_timing_tests { use super::*; #[test] - fn audio_timing_metadata_uses_shared_clock_window_instead_of_packet_pts_domain() { + fn audio_timing_metadata_is_stamped_before_async_queue_pop() { std::thread::sleep(Duration::from_millis(5)); + let packet_pts_us = crate::live_capture_clock::capture_pts_us().saturating_sub(2_000); let mut packet = AudioPacket { - pts: 9_999_999, + pts: packet_pts_us, ..AudioPacket::default() }; - attach_audio_timing_metadata(&mut packet, 3, Duration::from_millis(2)); + let enqueue_age = stamp_audio_timing_metadata_at_enqueue(&mut packet); + let capture_pts_us = packet.client_capture_pts_us; + let send_pts_us = packet.client_send_pts_us; + std::thread::sleep(Duration::from_millis(5)); + attach_audio_queue_metadata( + &mut packet, + 3, + enqueue_age.saturating_add(Duration::from_millis(5)), + ); assert!(packet.seq > 0); assert_eq!(packet.client_queue_depth, 3); - assert_eq!(packet.client_queue_age_ms, 2); + assert!(packet.client_queue_age_ms >= 5); + assert_eq!(packet.client_capture_pts_us, capture_pts_us); + assert_eq!(packet.client_send_pts_us, send_pts_us); assert!( packet.client_send_pts_us >= packet.client_capture_pts_us, - "send must be on or after the shared-clock capture estimate" + "enqueue/send stamp must be on or after the shared-clock capture estimate" ); assert!( - packet.client_send_pts_us - packet.client_capture_pts_us <= 2_000, - "delivery age, not packet PTS domain, should define the timing window" + packet.client_send_pts_us - packet.client_capture_pts_us <= 3_000, + "capture-to-enqueue age, not async pop delay, should define the timing window" ); } #[test] - fn video_timing_metadata_uses_shared_clock_window_instead_of_packet_pts_domain() { + fn video_timing_metadata_is_stamped_before_async_queue_pop() { std::thread::sleep(Duration::from_millis(5)); + let packet_pts_us = crate::live_capture_clock::capture_pts_us().saturating_sub(3_000); let mut packet = VideoPacket { - pts: 9_999_999, + pts: packet_pts_us, ..VideoPacket::default() }; - attach_video_timing_metadata(&mut packet, 4, Duration::from_millis(3)); + let enqueue_age = stamp_video_timing_metadata_at_enqueue(&mut packet); + let capture_pts_us = packet.client_capture_pts_us; + let send_pts_us = packet.client_send_pts_us; + std::thread::sleep(Duration::from_millis(5)); + attach_video_queue_metadata( + &mut packet, + 4, + enqueue_age.saturating_add(Duration::from_millis(5)), + ); assert!(packet.seq > 0); assert_eq!(packet.client_queue_depth, 4); - assert_eq!(packet.client_queue_age_ms, 3); + assert!(packet.client_queue_age_ms >= 5); + assert_eq!(packet.client_capture_pts_us, capture_pts_us); + assert_eq!(packet.client_send_pts_us, send_pts_us); assert!( packet.client_send_pts_us >= packet.client_capture_pts_us, - "send must be on or after the shared-clock capture estimate" + "enqueue/send stamp must be on or after the shared-clock capture estimate" ); assert!( - packet.client_send_pts_us - packet.client_capture_pts_us <= 3_000, - "delivery age, not packet PTS domain, should define the timing window" + packet.client_send_pts_us - packet.client_capture_pts_us <= 4_000, + "capture-to-enqueue age, not async pop delay, should define the timing window" ); } } diff --git a/client/src/live_capture_clock.rs b/client/src/live_capture_clock.rs index a6ad58b..d53dfa8 100644 --- a/client/src/live_capture_clock.rs +++ b/client/src/live_capture_clock.rs @@ -30,6 +30,7 @@ pub fn capture_pts_us() -> u64 { /// Why: upstream freshness telemetry should use the same shared live clock as /// packet timestamps so queue-age calculations stay honest. #[must_use] +#[allow(dead_code)] pub fn packet_age(pts_us: u64) -> Duration { Duration::from_micros(capture_pts_us().saturating_sub(pts_us)) } diff --git a/common/Cargo.toml b/common/Cargo.toml index 7a906fe..88f1973 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.17.28" +version = "0.17.29" edition = "2024" build = "build.rs" diff --git a/common/proto/lesavka.proto b/common/proto/lesavka.proto index fb500e2..107121c 100644 --- a/common/proto/lesavka.proto +++ b/common/proto/lesavka.proto @@ -26,7 +26,9 @@ message VideoPacket { uint32 server_queue_peak = 10; string server_encoder_label = 11; uint32 server_process_cpu_tenths = 12; + // Shared client media-clock timestamp for the packet's capture estimate. uint64 client_capture_pts_us = 13; + // Shared client media-clock timestamp when the packet entered the uplink queue. uint64 client_send_pts_us = 14; uint32 client_queue_depth = 15; uint32 client_queue_age_ms = 16; @@ -36,7 +38,9 @@ message AudioPacket { uint64 pts = 2; bytes data = 3; uint64 seq = 4; + // Shared client media-clock timestamp for the packet's capture estimate. uint64 client_capture_pts_us = 5; + // Shared client media-clock timestamp when the packet entered the uplink queue. uint64 client_send_pts_us = 6; uint32 client_queue_depth = 7; uint32 client_queue_age_ms = 8; diff --git a/server/Cargo.toml b/server/Cargo.toml index 2806381..c2dd892 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.17.28" +version = "0.17.29" edition = "2024" autobins = false diff --git a/server/src/upstream_media_runtime.rs b/server/src/upstream_media_runtime.rs index c17ba20..9b84d3b 100644 --- a/server/src/upstream_media_runtime.rs +++ b/server/src/upstream_media_runtime.rs @@ -137,10 +137,22 @@ impl UpstreamMediaRuntime { received_at: Instant::now(), }; match kind { - UpstreamMediaKind::Camera => state.latest_camera_timing = Some(sample), - UpstreamMediaKind::Microphone => state.latest_microphone_timing = Some(sample), + UpstreamMediaKind::Camera => { + state.latest_camera_timing = Some(sample); + push_timing_sample(&mut state.recent_camera_timing, sample); + state + .camera_client_queue_age_window_ms + .push(f64::from(timing.queue_age_ms)); + } + UpstreamMediaKind::Microphone => { + state.latest_microphone_timing = Some(sample); + push_timing_sample(&mut state.recent_microphone_timing, sample); + state + .microphone_client_queue_age_window_ms + .push(f64::from(timing.queue_age_ms)); + } } - record_client_timing_windows(&mut state); + record_client_timing_windows(&mut state, kind, sample); } /// Mark one video frame as actually handed to the UVC/HDMI sink. @@ -186,23 +198,9 @@ impl UpstreamMediaRuntime { _ => None, }; let now = Instant::now(); - let client_capture_skew_ms = skew_ms_from_samples( - state.latest_camera_timing, - state.latest_microphone_timing, - |sample| sample.capture_pts_us, - ); - let client_send_skew_ms = skew_ms_from_samples( - state.latest_camera_timing, - state.latest_microphone_timing, - |sample| sample.send_pts_us, - ); - let server_receive_skew_ms = - match (state.latest_camera_timing, state.latest_microphone_timing) { - (Some(camera), Some(microphone)) => Some( - instant_delta_us(camera.received_at, microphone.received_at) as f64 / 1000.0, - ), - _ => None, - }; + let client_capture_skew_ms = state.latest_paired_client_capture_skew_ms; + let client_send_skew_ms = state.latest_paired_client_send_skew_ms; + let server_receive_skew_ms = state.latest_paired_server_receive_skew_ms; UpstreamPlannerSnapshot { session_id: state.session_id, phase: state.phase.as_str(), @@ -710,19 +708,6 @@ fn us_to_ms(value: u64) -> f64 { value as f64 / 1000.0 } -fn skew_ms_from_samples( - camera: Option, - microphone: Option, - value: impl Fn(state::UpstreamTimingSample) -> u64, -) -> Option { - match (camera, microphone) { - (Some(camera), Some(microphone)) => { - Some((value(camera) as i128 - value(microphone) as i128) as f64 / 1000.0) - } - _ => None, - } -} - fn instant_delta_us(left: Instant, right: Instant) -> i128 { if left >= right { left.saturating_duration_since(right).as_micros() as i128 @@ -731,27 +716,72 @@ fn instant_delta_us(left: Instant, right: Instant) -> i128 { } } -fn record_client_timing_windows(state: &mut UpstreamClockState) { - let (Some(camera), Some(microphone)) = - (state.latest_camera_timing, state.latest_microphone_timing) - else { +const CLIENT_TIMING_PAIR_MAX_SEND_DELTA_US: u64 = 250_000; + +fn push_timing_sample( + samples: &mut std::collections::VecDeque, + sample: state::UpstreamTimingSample, +) { + if samples.len() >= state::TIMING_WINDOW_CAPACITY { + samples.pop_front(); + } + samples.push_back(sample); +} + +fn abs_delta_us(left: u64, right: u64) -> u64 { + left.max(right) - left.min(right) +} + +fn nearest_timing_sample_by_send( + samples: &std::collections::VecDeque, + send_pts_us: u64, +) -> Option { + samples + .iter() + .copied() + .min_by_key(|sample| abs_delta_us(sample.send_pts_us, send_pts_us)) +} + +fn record_client_timing_windows( + state: &mut UpstreamClockState, + kind: UpstreamMediaKind, + sample: state::UpstreamTimingSample, +) { + let paired = match kind { + UpstreamMediaKind::Camera => { + nearest_timing_sample_by_send(&state.recent_microphone_timing, sample.send_pts_us) + .map(|microphone| (sample, microphone)) + } + UpstreamMediaKind::Microphone => { + nearest_timing_sample_by_send(&state.recent_camera_timing, sample.send_pts_us) + .map(|camera| (camera, sample)) + } + }; + let Some((camera, microphone)) = paired else { return; }; + if abs_delta_us(camera.send_pts_us, microphone.send_pts_us) + > CLIENT_TIMING_PAIR_MAX_SEND_DELTA_US + { + return; + } + let client_capture_skew_ms = + (camera.capture_pts_us as i128 - microphone.capture_pts_us as i128) as f64 / 1000.0; + let client_send_skew_ms = + (camera.send_pts_us as i128 - microphone.send_pts_us as i128) as f64 / 1000.0; + let server_receive_skew_ms = + instant_delta_us(camera.received_at, microphone.received_at) as f64 / 1000.0; + + state.latest_paired_client_capture_skew_ms = Some(client_capture_skew_ms); + state.latest_paired_client_send_skew_ms = Some(client_send_skew_ms); + state.latest_paired_server_receive_skew_ms = Some(server_receive_skew_ms); state .client_capture_skew_window_ms - .push((camera.capture_pts_us as i128 - microphone.capture_pts_us as i128) as f64 / 1000.0); - state - .client_send_skew_window_ms - .push((camera.send_pts_us as i128 - microphone.send_pts_us as i128) as f64 / 1000.0); + .push(client_capture_skew_ms); + state.client_send_skew_window_ms.push(client_send_skew_ms); state .server_receive_skew_window_ms - .push(instant_delta_us(camera.received_at, microphone.received_at) as f64 / 1000.0); - state - .camera_client_queue_age_window_ms - .push(f64::from(camera.queue_age_ms)); - state - .microphone_client_queue_age_window_ms - .push(f64::from(microphone.queue_age_ms)); + .push(server_receive_skew_ms); } fn record_presentation_sample( diff --git a/server/src/upstream_media_runtime/lease_lifecycle.rs b/server/src/upstream_media_runtime/lease_lifecycle.rs index ef5d25b..1222832 100644 --- a/server/src/upstream_media_runtime/lease_lifecycle.rs +++ b/server/src/upstream_media_runtime/lease_lifecycle.rs @@ -168,6 +168,11 @@ fn reset_timing_anchors(state: &mut UpstreamClockState) { state.video_freezes = 0; state.latest_camera_timing = None; state.latest_microphone_timing = None; + state.recent_camera_timing = Default::default(); + state.recent_microphone_timing = Default::default(); + state.latest_paired_client_capture_skew_ms = None; + state.latest_paired_client_send_skew_ms = None; + state.latest_paired_server_receive_skew_ms = None; state.latest_camera_presentation = None; state.latest_microphone_presentation = None; state.client_capture_skew_window_ms = Default::default(); diff --git a/server/src/upstream_media_runtime/state.rs b/server/src/upstream_media_runtime/state.rs index d70a43d..e166116 100644 --- a/server/src/upstream_media_runtime/state.rs +++ b/server/src/upstream_media_runtime/state.rs @@ -1,7 +1,7 @@ use std::collections::VecDeque; use tokio::time::Instant; -const TIMING_WINDOW_CAPACITY: usize = 240; +pub(super) const TIMING_WINDOW_CAPACITY: usize = 240; #[derive(Clone, Copy, Debug)] pub(super) struct UpstreamTimingSample { @@ -105,6 +105,11 @@ pub(super) struct UpstreamClockState { pub last_reason: String, pub latest_camera_timing: Option, pub latest_microphone_timing: Option, + pub recent_camera_timing: VecDeque, + pub recent_microphone_timing: VecDeque, + pub latest_paired_client_capture_skew_ms: Option, + pub latest_paired_client_send_skew_ms: Option, + pub latest_paired_server_receive_skew_ms: Option, pub latest_camera_presentation: Option, pub latest_microphone_presentation: Option, pub client_capture_skew_window_ms: UpstreamScalarWindow, diff --git a/server/src/upstream_media_runtime/tests/planning.rs b/server/src/upstream_media_runtime/tests/planning.rs index 2725687..ef898ea 100644 --- a/server/src/upstream_media_runtime/tests/planning.rs +++ b/server/src/upstream_media_runtime/tests/planning.rs @@ -592,6 +592,47 @@ fn planner_snapshot_tracks_client_timing_sidecar_metrics() { ); } +#[test] +#[serial(upstream_media_runtime)] +fn planner_pairs_client_timing_by_nearby_send_time_not_latest_packet() { + let runtime = runtime_without_offsets(); + + runtime.record_client_timing( + super::UpstreamMediaKind::Camera, + super::UpstreamClientTiming { + capture_pts_us: 1_000_000, + send_pts_us: 1_000_000, + queue_depth: 1, + queue_age_ms: 5, + }, + ); + runtime.record_client_timing( + super::UpstreamMediaKind::Microphone, + super::UpstreamClientTiming { + capture_pts_us: 1_010_000, + send_pts_us: 1_010_000, + queue_depth: 1, + queue_age_ms: 5, + }, + ); + runtime.record_client_timing( + super::UpstreamMediaKind::Microphone, + super::UpstreamClientTiming { + capture_pts_us: 2_500_000, + send_pts_us: 2_500_000, + queue_depth: 1, + queue_age_ms: 5, + }, + ); + + let snapshot = runtime.snapshot(); + + assert_eq!(snapshot.client_capture_skew_ms, Some(-10.0)); + assert_eq!(snapshot.client_send_skew_ms, Some(-10.0)); + assert_eq!(snapshot.client_capture_abs_skew_p95_ms, Some(10.0)); + assert_eq!(snapshot.client_send_abs_skew_p95_ms, Some(10.0)); +} + #[test] #[serial(upstream_media_runtime)] fn default_runtime_covers_video_map_play_path() { diff --git a/server/src/upstream_media_runtime/types.rs b/server/src/upstream_media_runtime/types.rs index 27b4c2e..960004a 100644 --- a/server/src/upstream_media_runtime/types.rs +++ b/server/src/upstream_media_runtime/types.rs @@ -15,11 +15,13 @@ pub enum UpstreamMediaKind { pub struct UpstreamClientTiming { /// Packet capture timestamp on the shared client media clock. pub capture_pts_us: u64, - /// Client media-clock timestamp when the packet left the uplink queue. + /// Client media-clock timestamp when the packet entered the async uplink + /// queue. Stamping at enqueue prevents stream scheduling from posing as + /// capture skew. pub send_pts_us: u64, - /// Uplink queue depth observed as the packet was sent. + /// Uplink queue depth observed as the packet left the async queue. pub queue_depth: u32, - /// Packet age observed as the packet was sent. + /// Packet age observed as the packet left the async queue. pub queue_age_ms: u32, }