fix(sync): stabilize upstream startup anchoring

This commit is contained in:
Brad Stein 2026-04-25 14:00:53 -03:00
parent c6308749fe
commit e155a9e140
2 changed files with 55 additions and 6 deletions

View File

@ -26,7 +26,7 @@ REMOTE_AUDIO_SOURCE=${REMOTE_AUDIO_SOURCE:-auto}
REMOTE_AUDIO_QUIESCE_USER_AUDIO=${REMOTE_AUDIO_QUIESCE_USER_AUDIO:-auto} REMOTE_AUDIO_QUIESCE_USER_AUDIO=${REMOTE_AUDIO_QUIESCE_USER_AUDIO:-auto}
ANALYSIS_NORMALIZE=${ANALYSIS_NORMALIZE:-1} ANALYSIS_NORMALIZE=${ANALYSIS_NORMALIZE:-1}
ANALYSIS_SCALE_WIDTH=${ANALYSIS_SCALE_WIDTH:-1280} ANALYSIS_SCALE_WIDTH=${ANALYSIS_SCALE_WIDTH:-1280}
SSH_OPTS=${SSH_OPTS:-"-o BatchMode=yes -o ConnectTimeout=5"} SSH_OPTS=${SSH_OPTS:-"-o BatchMode=yes -o ConnectTimeout=30"}
LOCAL_AUDIO_SANITY=${LOCAL_AUDIO_SANITY:-1} LOCAL_AUDIO_SANITY=${LOCAL_AUDIO_SANITY:-1}
PROBE_PREBUILD=${PROBE_PREBUILD:-1} PROBE_PREBUILD=${PROBE_PREBUILD:-1}
PROBE_BIN=${PROBE_BIN:-"${REPO_ROOT}/target/debug/lesavka-sync-probe"} PROBE_BIN=${PROBE_BIN:-"${REPO_ROOT}/target/debug/lesavka-sync-probe"}
@ -362,6 +362,8 @@ fi
if [[ "${capture_status}" -ne 0 ]]; then if [[ "${capture_status}" -ne 0 ]]; then
if [[ "${capture_status}" -eq 141 && -f "${LOCAL_CAPTURE}" ]]; then if [[ "${capture_status}" -eq 141 && -f "${LOCAL_CAPTURE}" ]]; then
echo "Tethys capture ended with PipeWire SIGPIPE after ffmpeg closed; accepting preserved capture ${LOCAL_CAPTURE}" >&2 echo "Tethys capture ended with PipeWire SIGPIPE after ffmpeg closed; accepting preserved capture ${LOCAL_CAPTURE}" >&2
elif [[ "${capture_status}" -eq 124 && -f "${LOCAL_CAPTURE}" ]]; then
echo "Tethys capture timed out after preserving ${LOCAL_CAPTURE}; accepting partial capture for analysis" >&2
else else
echo "Tethys capture failed with status ${capture_status}" >&2 echo "Tethys capture failed with status ${capture_status}" >&2
[[ -f "${LOCAL_CAPTURE}" ]] && echo "partial capture preserved at ${LOCAL_CAPTURE}" >&2 [[ -f "${LOCAL_CAPTURE}" ]] && echo "partial capture preserved at ${LOCAL_CAPTURE}" >&2

View File

@ -3,6 +3,7 @@
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tracing::info;
/// Logical upstream media kinds that share one live-call session timeline. /// Logical upstream media kinds that share one live-call session timeline.
#[derive(Clone, Copy, Debug, Eq, PartialEq)] #[derive(Clone, Copy, Debug, Eq, PartialEq)]
@ -27,9 +28,11 @@ struct UpstreamClockState {
session_id: u64, session_id: u64,
active_camera_generation: Option<u64>, active_camera_generation: Option<u64>,
active_microphone_generation: Option<u64>, active_microphone_generation: Option<u64>,
base_remote_pts_us: Option<u64>, camera_base_remote_pts_us: Option<u64>,
microphone_base_remote_pts_us: Option<u64>,
last_video_local_pts_us: Option<u64>, last_video_local_pts_us: Option<u64>,
last_audio_local_pts_us: Option<u64>, last_audio_local_pts_us: Option<u64>,
startup_anchor_logged: bool,
} }
/// Coordinate upstream stream ownership and keep audio/video on one timeline. /// Coordinate upstream stream ownership and keep audio/video on one timeline.
@ -111,9 +114,11 @@ impl UpstreamMediaRuntime {
if state.active_camera_generation.is_none() && state.active_microphone_generation.is_none() if state.active_camera_generation.is_none() && state.active_microphone_generation.is_none()
{ {
state.session_id = self.next_session_id.fetch_add(1, Ordering::SeqCst) + 1; state.session_id = self.next_session_id.fetch_add(1, Ordering::SeqCst) + 1;
state.base_remote_pts_us = None; state.camera_base_remote_pts_us = None;
state.microphone_base_remote_pts_us = None;
state.last_video_local_pts_us = None; state.last_video_local_pts_us = None;
state.last_audio_local_pts_us = None; state.last_audio_local_pts_us = None;
state.startup_anchor_logged = false;
} }
match kind { match kind {
UpstreamMediaKind::Camera => state.active_camera_generation = Some(generation), UpstreamMediaKind::Camera => state.active_camera_generation = Some(generation),
@ -176,9 +181,11 @@ impl UpstreamMediaRuntime {
} }
if state.active_camera_generation.is_none() && state.active_microphone_generation.is_none() if state.active_camera_generation.is_none() && state.active_microphone_generation.is_none()
{ {
state.base_remote_pts_us = None; state.camera_base_remote_pts_us = None;
state.microphone_base_remote_pts_us = None;
state.last_video_local_pts_us = None; state.last_video_local_pts_us = None;
state.last_audio_local_pts_us = None; state.last_audio_local_pts_us = None;
state.startup_anchor_logged = false;
} }
} }
@ -203,8 +210,31 @@ impl UpstreamMediaRuntime {
.state .state
.lock() .lock()
.expect("upstream media state mutex poisoned"); .expect("upstream media state mutex poisoned");
let base_remote = *state.base_remote_pts_us.get_or_insert(remote_pts_us); let session_id = state.session_id;
let mut local_pts_us = remote_pts_us.saturating_sub(base_remote); let base_slot = match kind {
UpstreamMediaKind::Camera => &mut state.camera_base_remote_pts_us,
UpstreamMediaKind::Microphone => &mut state.microphone_base_remote_pts_us,
};
let first_remote_for_kind = *base_slot.get_or_insert(remote_pts_us);
if !state.startup_anchor_logged
&& state.camera_base_remote_pts_us.is_some()
&& state.microphone_base_remote_pts_us.is_some()
{
let camera_base_remote_pts_us = state.camera_base_remote_pts_us.unwrap_or_default();
let microphone_base_remote_pts_us =
state.microphone_base_remote_pts_us.unwrap_or_default();
let startup_delta_us =
camera_base_remote_pts_us as i128 - microphone_base_remote_pts_us as i128;
info!(
session_id,
camera_base_remote_pts_us,
microphone_base_remote_pts_us,
startup_delta_us,
"upstream media session anchors observed"
);
state.startup_anchor_logged = true;
}
let mut local_pts_us = remote_pts_us.saturating_sub(first_remote_for_kind);
let last_slot = match kind { let last_slot = match kind {
UpstreamMediaKind::Camera => &mut state.last_video_local_pts_us, UpstreamMediaKind::Camera => &mut state.last_video_local_pts_us,
UpstreamMediaKind::Microphone => &mut state.last_audio_local_pts_us, UpstreamMediaKind::Microphone => &mut state.last_audio_local_pts_us,
@ -277,6 +307,23 @@ mod tests {
assert_eq!(video_next, 33_333); assert_eq!(video_next, 33_333);
} }
#[test]
fn per_kind_session_bases_cancel_constant_startup_path_offsets() {
    // Camera and microphone each anchor to their own first remote PTS, so a
    // constant startup-path offset between the two kinds (here 300 ms) must
    // not leak into the mapped local timeline: both streams start at 0 and
    // advance by their own remote deltas.
    let runtime = UpstreamMediaRuntime::new();
    let _camera_guard = runtime.activate_camera();
    let _microphone_guard = runtime.activate_microphone();

    // First sample of each kind establishes that kind's base -> local PTS 0.
    assert_eq!(runtime.map_audio_pts(1_000_000), 0);
    assert_eq!(runtime.map_video_pts(1_300_000, 16_666), 0);
    // Subsequent samples advance relative to the per-kind base only.
    assert_eq!(runtime.map_audio_pts(1_010_000), 10_000);
    assert_eq!(runtime.map_video_pts(1_333_333, 16_666), 33_333);
}
#[test] #[test]
fn shared_clock_keeps_each_kind_monotonic_when_remote_pts_repeat() { fn shared_clock_keeps_each_kind_monotonic_when_remote_pts_repeat() {
let runtime = UpstreamMediaRuntime::new(); let runtime = UpstreamMediaRuntime::new();