From e155a9e1408d3c6574ad6751a8912b2cdab2c605 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Sat, 25 Apr 2026 14:00:53 -0300 Subject: [PATCH] fix(sync): stabilize upstream startup anchoring --- scripts/manual/run_upstream_av_sync.sh | 4 +- server/src/upstream_media_runtime.rs | 57 +++++++++++++++++++++++--- 2 files changed, 55 insertions(+), 6 deletions(-) diff --git a/scripts/manual/run_upstream_av_sync.sh b/scripts/manual/run_upstream_av_sync.sh index ef331de..0391c49 100755 --- a/scripts/manual/run_upstream_av_sync.sh +++ b/scripts/manual/run_upstream_av_sync.sh @@ -26,7 +26,7 @@ REMOTE_AUDIO_SOURCE=${REMOTE_AUDIO_SOURCE:-auto} REMOTE_AUDIO_QUIESCE_USER_AUDIO=${REMOTE_AUDIO_QUIESCE_USER_AUDIO:-auto} ANALYSIS_NORMALIZE=${ANALYSIS_NORMALIZE:-1} ANALYSIS_SCALE_WIDTH=${ANALYSIS_SCALE_WIDTH:-1280} -SSH_OPTS=${SSH_OPTS:-"-o BatchMode=yes -o ConnectTimeout=5"} +SSH_OPTS=${SSH_OPTS:-"-o BatchMode=yes -o ConnectTimeout=30"} LOCAL_AUDIO_SANITY=${LOCAL_AUDIO_SANITY:-1} PROBE_PREBUILD=${PROBE_PREBUILD:-1} PROBE_BIN=${PROBE_BIN:-"${REPO_ROOT}/target/debug/lesavka-sync-probe"} @@ -362,6 +362,8 @@ fi if [[ "${capture_status}" -ne 0 ]]; then if [[ "${capture_status}" -eq 141 && -f "${LOCAL_CAPTURE}" ]]; then echo "Tethys capture ended with PipeWire SIGPIPE after ffmpeg closed; accepting preserved capture ${LOCAL_CAPTURE}" >&2 + elif [[ "${capture_status}" -eq 124 && -f "${LOCAL_CAPTURE}" ]]; then + echo "Tethys capture timed out after preserving ${LOCAL_CAPTURE}; accepting partial capture for analysis" >&2 else echo "Tethys capture failed with status ${capture_status}" >&2 [[ -f "${LOCAL_CAPTURE}" ]] && echo "partial capture preserved at ${LOCAL_CAPTURE}" >&2 diff --git a/server/src/upstream_media_runtime.rs b/server/src/upstream_media_runtime.rs index be752f8..91aed48 100644 --- a/server/src/upstream_media_runtime.rs +++ b/server/src/upstream_media_runtime.rs @@ -3,6 +3,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; +use tracing::info; /// Logical upstream media kinds that share one live-call session timeline. #[derive(Clone, Copy, Debug, Eq, PartialEq)] @@ -27,9 +28,11 @@ struct UpstreamClockState { session_id: u64, active_camera_generation: Option, active_microphone_generation: Option, - base_remote_pts_us: Option, + camera_base_remote_pts_us: Option, + microphone_base_remote_pts_us: Option, last_video_local_pts_us: Option, last_audio_local_pts_us: Option, + startup_anchor_logged: bool, } /// Coordinate upstream stream ownership and keep audio/video on one timeline. @@ -111,9 +114,11 @@ impl UpstreamMediaRuntime { if state.active_camera_generation.is_none() && state.active_microphone_generation.is_none() { state.session_id = self.next_session_id.fetch_add(1, Ordering::SeqCst) + 1; - state.base_remote_pts_us = None; + state.camera_base_remote_pts_us = None; + state.microphone_base_remote_pts_us = None; state.last_video_local_pts_us = None; state.last_audio_local_pts_us = None; + state.startup_anchor_logged = false; } match kind { UpstreamMediaKind::Camera => state.active_camera_generation = Some(generation), @@ -176,9 +181,11 @@ impl UpstreamMediaRuntime { } if state.active_camera_generation.is_none() && state.active_microphone_generation.is_none() { - state.base_remote_pts_us = None; + state.camera_base_remote_pts_us = None; + state.microphone_base_remote_pts_us = None; state.last_video_local_pts_us = None; state.last_audio_local_pts_us = None; + state.startup_anchor_logged = false; } } @@ -203,8 +210,31 @@ impl UpstreamMediaRuntime { .state .lock() .expect("upstream media state mutex poisoned"); - let base_remote = *state.base_remote_pts_us.get_or_insert(remote_pts_us); - let mut local_pts_us = remote_pts_us.saturating_sub(base_remote); + let session_id = state.session_id; + let base_slot = match kind { + UpstreamMediaKind::Camera => &mut state.camera_base_remote_pts_us, + UpstreamMediaKind::Microphone => &mut state.microphone_base_remote_pts_us, + }; + let first_remote_for_kind = *base_slot.get_or_insert(remote_pts_us); + if !state.startup_anchor_logged + && state.camera_base_remote_pts_us.is_some() + && state.microphone_base_remote_pts_us.is_some() + { + let camera_base_remote_pts_us = state.camera_base_remote_pts_us.unwrap_or_default(); + let microphone_base_remote_pts_us = + state.microphone_base_remote_pts_us.unwrap_or_default(); + let startup_delta_us = + camera_base_remote_pts_us as i128 - microphone_base_remote_pts_us as i128; + info!( + session_id, + camera_base_remote_pts_us, + microphone_base_remote_pts_us, + startup_delta_us, + "upstream media session anchors observed" + ); + state.startup_anchor_logged = true; + } + let mut local_pts_us = remote_pts_us.saturating_sub(first_remote_for_kind); let last_slot = match kind { UpstreamMediaKind::Camera => &mut state.last_video_local_pts_us, UpstreamMediaKind::Microphone => &mut state.last_audio_local_pts_us, @@ -277,6 +307,23 @@ mod tests { assert_eq!(video_next, 33_333); } + #[test] + fn per_kind_session_bases_cancel_constant_startup_path_offsets() { + let runtime = UpstreamMediaRuntime::new(); + let _camera = runtime.activate_camera(); + let _microphone = runtime.activate_microphone(); + + let audio_first = runtime.map_audio_pts(1_000_000); + let video_first = runtime.map_video_pts(1_300_000, 16_666); + let audio_next = runtime.map_audio_pts(1_010_000); + let video_next = runtime.map_video_pts(1_333_333, 16_666); + + assert_eq!(audio_first, 0); + assert_eq!(video_first, 0); + assert_eq!(audio_next, 10_000); + assert_eq!(video_next, 33_333); + } + #[test] fn shared_clock_keeps_each_kind_monotonic_when_remote_pts_repeat() { let runtime = UpstreamMediaRuntime::new();