From 736c4e3bac0297eb783489b1b1815cdb4d92c113 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Tue, 21 Apr 2026 01:52:39 -0300 Subject: [PATCH] fix(audio): rebase remote packet pts onto local playback timeline --- client/src/output/audio.rs | 39 ++++++++++++++++++++++++++++++++++---- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/client/src/output/audio.rs b/client/src/output/audio.rs index 93c5c7a..146e477 100644 --- a/client/src/output/audio.rs +++ b/client/src/output/audio.rs @@ -5,6 +5,7 @@ use gst::MessageView::*; use gst::prelude::*; use gstreamer as gst; use gstreamer_app as gst_app; +use std::sync::Mutex; use tracing::{debug, error, info, warn}; use lesavka_common::lesavka::AudioPacket; @@ -12,6 +13,14 @@ use lesavka_common::lesavka::AudioPacket; pub struct AudioOut { pipeline: gst::Pipeline, src: gst_app::AppSrc, + timeline: Mutex, +} + +#[derive(Default)] +struct AudioTimeline { + first_remote_pts_us: Option, + last_local_pts_us: u64, + packets: u64, } impl AudioOut { @@ -106,14 +115,36 @@ impl AudioOut { pipeline .set_state(gst::State::Playing) .context("starting audio pipeline")?; - Ok(Self { pipeline, src }) + Ok(Self { + pipeline, + src, + timeline: Mutex::new(AudioTimeline::default()), + }) } pub fn push(&self, pkt: AudioPacket) { let mut buf = gst::Buffer::from_slice(pkt.data); - buf.get_mut() - .unwrap() - .set_pts(Some(gst::ClockTime::from_useconds(pkt.pts))); + if let Ok(mut timeline) = self.timeline.lock() { + let base = timeline.first_remote_pts_us.get_or_insert(pkt.pts); + let mut local_pts_us = pkt.pts.saturating_sub(*base); + if local_pts_us < timeline.last_local_pts_us { + local_pts_us = timeline.last_local_pts_us.saturating_add(1); + } + timeline.last_local_pts_us = local_pts_us; + timeline.packets = timeline.packets.saturating_add(1); + if timeline.packets <= 8 || timeline.packets % 600 == 0 { + debug!( + packet = timeline.packets, + remote_pts_us = pkt.pts, + local_pts_us, + bytes = buf.size(), + "🔊 audio packet queued" + ); + } + buf.get_mut() + .unwrap() + .set_pts(Some(gst::ClockTime::from_useconds(local_pts_us))); + } #[cfg(not(coverage))] if let Err(e) = self.src.push_buffer(buf) { warn!("📉 AppSrc push failed: {e:?}");