media: anchor live capture timestamps per stream
This commit is contained in:
parent
03ad78829b
commit
43ff0477ee
@ -224,3 +224,4 @@ Context: 0.16.x proved that queue tweaks and static calibration cannot guarantee
|
||||
- 2026-05-01: First installed 0.17.0 mirrored browser probe on client/server commit `3920e0a` failed honestly: planner reported fresh live state (`live_lag_ms=10`, `skew_ms=+20.7`) but browser-observed paired pulses showed audio late by median `+349.1ms`, p95 `429.1ms`, with 6 video freezes/skew drops. Replayed artifact after analyzer hardening now reports `gross_failure` instead of false raw-start `catastrophic_failure`.
|
||||
- 2026-05-01: Patch follow-up models the observed MJPEG/UVC browser egress delta by defaulting video playout offset to `+350ms` and preserving the 1s freshness ceiling. Raw activity-start evidence is now ignored for verdict/calibration when it disagrees with paired pulses that are already failing directly. Existing early-0.17 `audio=0/video=0` factory/env calibration files migrate to the new `video=+350ms` default on load.
|
||||
- 2026-05-01: Release identity cleanup: bumped the patched build to clean semver `0.17.1`; probe attribution now prints `client_version`/`server_version` separately from `client_revision`/`server_revision` and refuses old `client_full_version` output.
|
||||
- 2026-05-01: 0.17.1 mirrored probe failed with video about `1.18-1.31s` behind audio and 761 planner video freezes. Root cause candidate: the client rebaser forced independent camera/mic pipelines onto one first-packet capture base, so a later-starting camera path was timestamped too early and looked permanently behind audio. Patch 0.17.2 anchors each stream to the shared monotonic clock at its own first packet time.
|
||||
|
||||
6
Cargo.lock
generated
6
Cargo.lock
generated
@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
|
||||
|
||||
[[package]]
|
||||
name = "lesavka_client"
|
||||
version = "0.17.1"
|
||||
version = "0.17.2"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-stream",
|
||||
@ -1686,7 +1686,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lesavka_common"
|
||||
version = "0.17.1"
|
||||
version = "0.17.2"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"base64",
|
||||
@ -1698,7 +1698,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "lesavka_server"
|
||||
version = "0.17.1"
|
||||
version = "0.17.2"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"base64",
|
||||
|
||||
@ -4,7 +4,7 @@ path = "src/main.rs"
|
||||
|
||||
[package]
|
||||
name = "lesavka_client"
|
||||
version = "0.17.1"
|
||||
version = "0.17.2"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
|
||||
@ -4,17 +4,12 @@ use std::sync::{Mutex, OnceLock};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
static CAPTURE_ORIGIN: OnceLock<Instant> = OnceLock::new();
|
||||
static SHARED_SOURCE_CAPTURE_BASE_US: OnceLock<Mutex<Option<u64>>> = OnceLock::new();
|
||||
const DEFAULT_SOURCE_LAG_CAP_MS: u64 = 250;
|
||||
|
||||
fn origin() -> Instant {
|
||||
*CAPTURE_ORIGIN.get_or_init(Instant::now)
|
||||
}
|
||||
|
||||
fn shared_source_capture_base_slot() -> &'static Mutex<Option<u64>> {
|
||||
SHARED_SOURCE_CAPTURE_BASE_US.get_or_init(|| Mutex::new(None))
|
||||
}
|
||||
|
||||
/// Return the shared live-capture timestamp for upstream camera/mic packets.
|
||||
///
|
||||
/// Inputs: none.
|
||||
@ -170,12 +165,7 @@ impl SourcePtsRebaser {
|
||||
|
||||
if let Some(source_pts_us) = source_pts_us {
|
||||
let source_base_us = *state.source_base_us.get_or_insert(source_pts_us);
|
||||
let capture_base_us = {
|
||||
let mut shared_capture_base_us = shared_source_capture_base_slot()
|
||||
.lock()
|
||||
.expect("shared source capture base mutex poisoned");
|
||||
*shared_capture_base_us.get_or_insert(capture_now_us)
|
||||
};
|
||||
let capture_base_us = *state.capture_base_us.get_or_insert(capture_now_us);
|
||||
state.capture_base_us = Some(capture_base_us);
|
||||
packet_pts_us =
|
||||
capture_base_us.saturating_add(source_pts_us.saturating_sub(source_base_us));
|
||||
@ -257,16 +247,9 @@ mod tests {
|
||||
use serial_test::serial;
|
||||
use std::time::Duration;
|
||||
|
||||
fn reset_shared_source_capture_base_for_tests() {
|
||||
*super::shared_source_capture_base_slot()
|
||||
.lock()
|
||||
.expect("shared source capture base mutex poisoned") = None;
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn capture_pts_us_monotonically_advances() {
|
||||
reset_shared_source_capture_base_for_tests();
|
||||
let first = capture_pts_us();
|
||||
std::thread::sleep(Duration::from_millis(2));
|
||||
let second = capture_pts_us();
|
||||
@ -276,7 +259,6 @@ mod tests {
|
||||
#[test]
|
||||
#[serial]
|
||||
fn packet_age_is_small_for_recent_packets() {
|
||||
reset_shared_source_capture_base_for_tests();
|
||||
let pts = capture_pts_us();
|
||||
std::thread::sleep(Duration::from_millis(2));
|
||||
let age = packet_age(pts);
|
||||
@ -287,7 +269,6 @@ mod tests {
|
||||
#[test]
|
||||
#[serial]
|
||||
fn source_pts_rebaser_preserves_source_delta_on_shared_capture_clock() {
|
||||
reset_shared_source_capture_base_for_tests();
|
||||
let rebased = SourcePtsRebaser::default();
|
||||
let first = rebased.rebase_or_now(Some(1_000_000), 1);
|
||||
let second = rebased.rebase_or_now(Some(1_033_333), 1);
|
||||
@ -305,7 +286,6 @@ mod tests {
|
||||
#[test]
|
||||
#[serial]
|
||||
fn source_pts_rebaser_stays_monotonic_when_source_pts_repeat() {
|
||||
reset_shared_source_capture_base_for_tests();
|
||||
let rebased = SourcePtsRebaser::default();
|
||||
let first = rebased.rebase_or_now(Some(50_000), 1);
|
||||
let second = rebased.rebase_or_now(Some(50_000), 1);
|
||||
@ -316,7 +296,6 @@ mod tests {
|
||||
#[test]
|
||||
#[serial]
|
||||
fn source_pts_rebaser_falls_back_to_capture_clock_without_source_pts() {
|
||||
reset_shared_source_capture_base_for_tests();
|
||||
let rebased = SourcePtsRebaser::default();
|
||||
let first = rebased.rebase_or_now(None, 1);
|
||||
std::thread::sleep(Duration::from_millis(2));
|
||||
@ -332,7 +311,6 @@ mod tests {
|
||||
#[test]
|
||||
#[serial]
|
||||
fn source_pts_rebaser_clamps_source_lag_when_it_falls_too_far_behind_now() {
|
||||
reset_shared_source_capture_base_for_tests();
|
||||
let rebased = SourcePtsRebaser::default();
|
||||
let _first = rebased.rebase_with_lag_cap(Some(1_000_000), 1, None);
|
||||
std::thread::sleep(Duration::from_millis(8));
|
||||
@ -347,8 +325,7 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn source_pts_rebasers_share_one_capture_base_across_streams() {
|
||||
reset_shared_source_capture_base_for_tests();
|
||||
fn source_pts_rebasers_anchor_each_stream_to_its_own_first_packet_time() {
|
||||
let microphone = SourcePtsRebaser::default();
|
||||
let camera = SourcePtsRebaser::default();
|
||||
|
||||
@ -356,11 +333,14 @@ mod tests {
|
||||
std::thread::sleep(Duration::from_millis(5));
|
||||
let first_camera = camera.rebase_or_now(Some(435_000), 1);
|
||||
|
||||
assert_eq!(
|
||||
first_microphone.capture_base_us,
|
||||
first_camera.capture_base_us
|
||||
assert_ne!(
|
||||
first_microphone.capture_base_us, first_camera.capture_base_us,
|
||||
"independent camera/mic pipelines must not be forced onto the same first-packet timestamp"
|
||||
);
|
||||
assert!(
|
||||
first_camera.packet_pts_us > first_microphone.packet_pts_us,
|
||||
"a later-starting camera pipeline should keep that real wall-clock delay"
|
||||
);
|
||||
assert_eq!(first_microphone.packet_pts_us, first_camera.packet_pts_us);
|
||||
assert_eq!(first_microphone.source_base_us, Some(80_000));
|
||||
assert_eq!(first_camera.source_base_us, Some(435_000));
|
||||
}
|
||||
@ -368,7 +348,6 @@ mod tests {
|
||||
#[test]
|
||||
#[serial]
|
||||
fn upstream_timing_trace_flag_defaults_off_and_accepts_true_values() {
|
||||
reset_shared_source_capture_base_for_tests();
|
||||
temp_env::with_var_unset("LESAVKA_UPSTREAM_TIMING_TRACE", || {
|
||||
assert!(!upstream_timing_trace_enabled());
|
||||
});
|
||||
@ -385,7 +364,6 @@ mod tests {
|
||||
#[test]
|
||||
#[serial]
|
||||
fn upstream_source_lag_cap_defaults_and_accepts_override() {
|
||||
reset_shared_source_capture_base_for_tests();
|
||||
temp_env::with_var_unset("LESAVKA_UPSTREAM_SOURCE_LAG_CAP_MS", || {
|
||||
assert_eq!(upstream_source_lag_cap(), Duration::from_millis(250));
|
||||
});
|
||||
@ -398,7 +376,6 @@ mod tests {
|
||||
#[test]
|
||||
#[serial]
|
||||
fn duration_paced_rebaser_advances_by_packet_duration_when_source_pts_stretch() {
|
||||
reset_shared_source_capture_base_for_tests();
|
||||
let rebased = DurationPacedSourcePtsRebaser::default();
|
||||
let first =
|
||||
rebased.rebase_with_packet_duration(Some(0), 21_333, Duration::from_millis(250));
|
||||
@ -414,7 +391,6 @@ mod tests {
|
||||
#[test]
|
||||
#[serial]
|
||||
fn duration_paced_rebaser_clamps_when_duration_pacing_falls_stale() {
|
||||
reset_shared_source_capture_base_for_tests();
|
||||
let rebased = DurationPacedSourcePtsRebaser::default();
|
||||
let _first = rebased.rebase_with_packet_duration(Some(0), 10_000, Duration::from_millis(2));
|
||||
std::thread::sleep(Duration::from_millis(8));
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "lesavka_common"
|
||||
version = "0.17.1"
|
||||
version = "0.17.2"
|
||||
edition = "2024"
|
||||
build = "build.rs"
|
||||
|
||||
|
||||
@ -10,7 +10,7 @@ bench = false
|
||||
|
||||
[package]
|
||||
name = "lesavka_server"
|
||||
version = "0.17.1"
|
||||
version = "0.17.2"
|
||||
edition = "2024"
|
||||
autobins = false
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user