fix(sync): preserve startup timing across client uplinks

This commit is contained in:
Brad Stein 2026-04-27 07:16:08 -03:00
parent 51f6934318
commit 7793f1eb41
8 changed files with 126 additions and 74 deletions

6
Cargo.lock generated
View File

@ -1642,7 +1642,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]]
name = "lesavka_client"
version = "0.14.16"
version = "0.14.17"
dependencies = [
"anyhow",
"async-stream",
@ -1676,7 +1676,7 @@ dependencies = [
[[package]]
name = "lesavka_common"
version = "0.14.16"
version = "0.14.17"
dependencies = [
"anyhow",
"base64",
@ -1688,7 +1688,7 @@ dependencies = [
[[package]]
name = "lesavka_server"
version = "0.14.16"
version = "0.14.17"
dependencies = [
"anyhow",
"base64",

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package]
name = "lesavka_client"
version = "0.14.16"
version = "0.14.17"
edition = "2024"
[dependencies]

View File

@ -13,28 +13,6 @@ impl LesavkaClientApp {
telemetry.record_reconnect_attempt();
let mut cli = RelayClient::new(ep.clone());
let queue = crate::uplink_fresh_queue::FreshPacketQueue::new(AUDIO_UPLINK_QUEUE);
let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>();
let mic_clone = mic.clone();
let telemetry_thread = telemetry.clone();
let queue_thread = queue.clone();
std::thread::spawn(move || {
while stop_rx.try_recv().is_err() {
if let Some(pkt) = mic_clone.pull() {
trace!("🎤📤 cli {} bytes → gRPC", pkt.data.len());
let enqueue_age = crate::live_capture_clock::packet_age(pkt.pts);
let stats = queue_thread.push(pkt, enqueue_age);
if stats.dropped_queue_full > 0 {
telemetry_thread.record_queue_full_drop(stats.dropped_queue_full);
}
telemetry_thread.record_enqueue(
queue_depth_u32(stats.queue_depth),
duration_ms(enqueue_age),
0.0,
);
}
}
});
let queue_stream = queue.clone();
let telemetry_stream = telemetry.clone();
@ -58,10 +36,34 @@ impl LesavkaClientApp {
match cli.stream_microphone(Request::new(outbound)).await {
Ok(mut resp) => {
let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>();
let mic_clone = mic.clone();
let telemetry_thread = telemetry.clone();
let queue_thread = queue.clone();
let mic_worker = std::thread::spawn(move || {
while stop_rx.try_recv().is_err() {
if let Some(pkt) = mic_clone.pull() {
trace!("🎤📤 cli {} bytes → gRPC", pkt.data.len());
let enqueue_age = crate::live_capture_clock::packet_age(pkt.pts);
let stats = queue_thread.push(pkt, enqueue_age);
if stats.dropped_queue_full > 0 {
telemetry_thread.record_queue_full_drop(stats.dropped_queue_full);
}
telemetry_thread.record_enqueue(
queue_depth_u32(stats.queue_depth),
duration_ms(enqueue_age),
0.0,
);
}
}
});
delay = Duration::from_secs(1);
telemetry.record_connected();
while resp.get_mut().message().await.transpose().is_some() {}
telemetry.record_disconnect("microphone uplink stream ended");
queue.close();
let _ = stop_tx.send(());
let _ = mic_worker.join();
}
Err(e) => {
telemetry.record_disconnect(format!("microphone uplink connect failed: {e}"));
@ -76,7 +78,6 @@ impl LesavkaClientApp {
}
queue.close();
let _ = stop_tx.send(());
tokio::time::sleep(delay).await;
}
}
@ -94,41 +95,6 @@ impl LesavkaClientApp {
telemetry.record_reconnect_attempt();
let mut cli = RelayClient::new(ep.clone());
let queue = crate::uplink_fresh_queue::FreshPacketQueue::new(VIDEO_UPLINK_QUEUE);
let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>();
let cam_worker = std::thread::spawn({
let cam = cam.clone();
let telemetry = telemetry.clone();
let queue = queue.clone();
move || {
loop {
if stop_rx.try_recv().is_ok() {
break;
}
let Some(pkt) = cam.pull() else {
std::thread::sleep(Duration::from_millis(5));
continue;
};
static CNT: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n < 10 || n.is_multiple_of(120) {
tracing::trace!("📸 cli frame#{n} {} B", pkt.data.len());
}
tracing::trace!("📸⬆️ sent webcam AU pts={} {} B", pkt.pts, pkt.data.len());
let enqueue_age = crate::live_capture_clock::packet_age(pkt.pts);
let stats = queue.push(pkt, enqueue_age);
if stats.dropped_queue_full > 0 {
telemetry.record_queue_full_drop(stats.dropped_queue_full);
}
telemetry.record_enqueue(
queue_depth_u32(stats.queue_depth),
duration_ms(enqueue_age),
0.0,
);
}
}
});
let queue_stream = queue.clone();
let telemetry_stream = telemetry.clone();
@ -152,17 +118,50 @@ impl LesavkaClientApp {
match cli.stream_camera(Request::new(outbound)).await {
Ok(mut resp) => {
let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>();
let cam_worker = std::thread::spawn({
let cam = cam.clone();
let telemetry = telemetry.clone();
let queue = queue.clone();
move || loop {
if stop_rx.try_recv().is_ok() {
break;
}
let Some(pkt) = cam.pull() else {
std::thread::sleep(Duration::from_millis(5));
continue;
};
static CNT: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n < 10 || n.is_multiple_of(120) {
tracing::trace!("📸 cli frame#{n} {} B", pkt.data.len());
}
tracing::trace!("📸⬆️ sent webcam AU pts={} {} B", pkt.pts, pkt.data.len());
let enqueue_age = crate::live_capture_clock::packet_age(pkt.pts);
let stats = queue.push(pkt, enqueue_age);
if stats.dropped_queue_full > 0 {
telemetry.record_queue_full_drop(stats.dropped_queue_full);
}
telemetry.record_enqueue(
queue_depth_u32(stats.queue_depth),
duration_ms(enqueue_age),
0.0,
);
}
});
delay = Duration::from_secs(1);
telemetry.record_connected();
while resp.get_mut().message().await.transpose().is_some() {}
telemetry.record_disconnect("camera uplink stream ended");
queue.close();
let _ = stop_tx.send(());
let _ = cam_worker.join();
}
Err(e) if e.code() == tonic::Code::Unimplemented => {
tracing::warn!("📸 server does not support StreamCamera giving up");
telemetry.record_disconnect("camera uplink unavailable on server");
queue.close();
let _ = stop_tx.send(());
let _ = cam_worker.join();
return;
}
Err(e) => {
@ -173,8 +172,6 @@ impl LesavkaClientApp {
}
queue.close();
let _ = stop_tx.send(());
let _ = cam_worker.join();
tokio::time::sleep(delay).await;
}
}
@ -183,8 +180,8 @@ impl LesavkaClientApp {
#[cfg(not(coverage))]
const VIDEO_UPLINK_QUEUE: crate::uplink_fresh_queue::FreshQueueConfig =
crate::uplink_fresh_queue::FreshQueueConfig {
capacity: 8,
max_age: Duration::from_millis(350),
capacity: 32,
max_age: Duration::from_secs(1),
};
#[cfg(not(coverage))]

View File

@ -4,12 +4,17 @@ use std::sync::{Mutex, OnceLock};
use std::time::{Duration, Instant};
static CAPTURE_ORIGIN: OnceLock<Instant> = OnceLock::new();
static SHARED_SOURCE_CAPTURE_BASE_US: OnceLock<Mutex<Option<u64>>> = OnceLock::new();
const DEFAULT_SOURCE_LAG_CAP_MS: u64 = 250;
fn origin() -> Instant {
*CAPTURE_ORIGIN.get_or_init(Instant::now)
}
fn shared_source_capture_base_slot() -> &'static Mutex<Option<u64>> {
SHARED_SOURCE_CAPTURE_BASE_US.get_or_init(|| Mutex::new(None))
}
/// Return the shared live-capture timestamp for upstream camera/mic packets.
///
/// Inputs: none.
@ -164,7 +169,13 @@ impl SourcePtsRebaser {
if let Some(source_pts_us) = source_pts_us {
let source_base_us = *state.source_base_us.get_or_insert(source_pts_us);
let capture_base_us = *state.capture_base_us.get_or_insert(capture_now_us);
let capture_base_us = {
let mut shared_capture_base_us = shared_source_capture_base_slot()
.lock()
.expect("shared source capture base mutex poisoned");
*shared_capture_base_us.get_or_insert(capture_now_us)
};
state.capture_base_us = Some(capture_base_us);
packet_pts_us =
capture_base_us.saturating_add(source_pts_us.saturating_sub(source_base_us));
used_source_pts = true;
@ -242,10 +253,19 @@ mod tests {
DurationPacedSourcePtsRebaser, SourcePtsRebaser, capture_pts_us, packet_age,
upstream_source_lag_cap, upstream_timing_trace_enabled,
};
use serial_test::serial;
use std::time::Duration;
fn reset_shared_source_capture_base_for_tests() {
*super::shared_source_capture_base_slot()
.lock()
.expect("shared source capture base mutex poisoned") = None;
}
#[test]
#[serial]
fn capture_pts_us_monotonically_advances() {
reset_shared_source_capture_base_for_tests();
let first = capture_pts_us();
std::thread::sleep(Duration::from_millis(2));
let second = capture_pts_us();
@ -253,7 +273,9 @@ mod tests {
}
#[test]
#[serial]
fn packet_age_is_small_for_recent_packets() {
reset_shared_source_capture_base_for_tests();
let pts = capture_pts_us();
std::thread::sleep(Duration::from_millis(2));
let age = packet_age(pts);
@ -262,7 +284,9 @@ mod tests {
}
#[test]
#[serial]
fn source_pts_rebaser_preserves_source_delta_on_shared_capture_clock() {
reset_shared_source_capture_base_for_tests();
let rebased = SourcePtsRebaser::default();
let first = rebased.rebase_or_now(Some(1_000_000), 1);
let second = rebased.rebase_or_now(Some(1_033_333), 1);
@ -278,7 +302,9 @@ mod tests {
}
#[test]
#[serial]
fn source_pts_rebaser_stays_monotonic_when_source_pts_repeat() {
reset_shared_source_capture_base_for_tests();
let rebased = SourcePtsRebaser::default();
let first = rebased.rebase_or_now(Some(50_000), 1);
let second = rebased.rebase_or_now(Some(50_000), 1);
@ -287,7 +313,9 @@ mod tests {
}
#[test]
#[serial]
fn source_pts_rebaser_falls_back_to_capture_clock_without_source_pts() {
reset_shared_source_capture_base_for_tests();
let rebased = SourcePtsRebaser::default();
let first = rebased.rebase_or_now(None, 1);
std::thread::sleep(Duration::from_millis(2));
@ -301,7 +329,9 @@ mod tests {
}
#[test]
#[serial]
fn source_pts_rebaser_clamps_source_lag_when_it_falls_too_far_behind_now() {
reset_shared_source_capture_base_for_tests();
let rebased = SourcePtsRebaser::default();
let _first = rebased.rebase_with_lag_cap(Some(1_000_000), 1, None);
std::thread::sleep(Duration::from_millis(8));
@ -315,7 +345,26 @@ mod tests {
}
#[test]
#[serial]
fn source_pts_rebasers_share_one_capture_base_across_streams() {
reset_shared_source_capture_base_for_tests();
let microphone = SourcePtsRebaser::default();
let camera = SourcePtsRebaser::default();
let first_microphone = microphone.rebase_or_now(Some(80_000), 1);
std::thread::sleep(Duration::from_millis(5));
let first_camera = camera.rebase_or_now(Some(435_000), 1);
assert_eq!(first_microphone.capture_base_us, first_camera.capture_base_us);
assert_eq!(first_microphone.packet_pts_us, first_camera.packet_pts_us);
assert_eq!(first_microphone.source_base_us, Some(80_000));
assert_eq!(first_camera.source_base_us, Some(435_000));
}
#[test]
#[serial]
fn upstream_timing_trace_flag_defaults_off_and_accepts_true_values() {
reset_shared_source_capture_base_for_tests();
temp_env::with_var_unset("LESAVKA_UPSTREAM_TIMING_TRACE", || {
assert!(!upstream_timing_trace_enabled());
});
@ -330,7 +379,9 @@ mod tests {
}
#[test]
#[serial]
fn upstream_source_lag_cap_defaults_and_accepts_override() {
reset_shared_source_capture_base_for_tests();
temp_env::with_var_unset("LESAVKA_UPSTREAM_SOURCE_LAG_CAP_MS", || {
assert_eq!(upstream_source_lag_cap(), Duration::from_millis(250));
});
@ -341,7 +392,9 @@ mod tests {
}
#[test]
#[serial]
fn duration_paced_rebaser_advances_by_packet_duration_when_source_pts_stretch() {
reset_shared_source_capture_base_for_tests();
let rebased = DurationPacedSourcePtsRebaser::default();
let first =
rebased.rebase_with_packet_duration(Some(0), 21_333, Duration::from_millis(250));
@ -355,7 +408,9 @@ mod tests {
}
#[test]
#[serial]
fn duration_paced_rebaser_clamps_when_duration_pacing_falls_stale() {
reset_shared_source_capture_base_for_tests();
let rebased = DurationPacedSourcePtsRebaser::default();
let _first = rebased.rebase_with_packet_duration(Some(0), 10_000, Duration::from_millis(2));
std::thread::sleep(Duration::from_millis(8));

View File

@ -43,8 +43,8 @@ pub use runtime::SyncProbeCapture;
#[cfg(any(not(coverage), test))]
const PROBE_VIDEO_QUEUE: FreshQueueConfig = FreshQueueConfig {
capacity: 8,
max_age: Duration::from_millis(350),
capacity: 32,
max_age: Duration::from_secs(1),
};
#[cfg(any(not(coverage), test))]

View File

@ -48,7 +48,6 @@ async fn run_sync_probe(config: ProbeConfig) -> Result<()> {
config.pulse_width,
config.marker_tick_period,
);
let capture = SyncProbeCapture::new(camera, schedule, config.duration)?;
tracing::info!(
server = %config.server,
@ -62,6 +61,7 @@ async fn run_sync_probe(config: ProbeConfig) -> Result<()> {
let video_channel = connect(config.server.as_str()).await?;
let audio_channel = connect(config.server.as_str()).await?;
let capture = SyncProbeCapture::new(camera, schedule, config.duration)?;
let video_queue = capture.video_queue();
let audio_queue = capture.audio_queue();

View File

@ -1,6 +1,6 @@
[package]
name = "lesavka_common"
version = "0.14.16"
version = "0.14.17"
edition = "2024"
build = "build.rs"

View File

@ -10,7 +10,7 @@ bench = false
[package]
name = "lesavka_server"
version = "0.14.16"
version = "0.14.17"
edition = "2024"
autobins = false