From e06f14d27e73f3d7e2d9fd40514e46e5f74c5034 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Fri, 24 Apr 2026 18:26:19 -0300 Subject: [PATCH] fix(server): align live sink clocks --- Cargo.lock | 6 +- client/Cargo.toml | 2 +- common/Cargo.toml | 2 +- scripts/ci/quality_gate_baseline.json | 8 ++- server/Cargo.toml | 2 +- server/src/audio/voice_input.rs | 20 ++++-- server/src/lib.rs | 1 + server/src/media_timing.rs | 72 +++++++++++++++++++ server/src/video_sinks/hdmi_sink.rs | 29 ++++---- server/src/video_sinks/webcam_sink.rs | 2 +- .../tests/server_audio_include_contract.rs | 4 ++ .../server_video_sinks_include_contract.rs | 8 ++- 12 files changed, 128 insertions(+), 28 deletions(-) create mode 100644 server/src/media_timing.rs diff --git a/Cargo.lock b/Cargo.lock index ba27e2f..e0d7575 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1642,7 +1642,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.13.2" +version = "0.13.3" dependencies = [ "anyhow", "async-stream", @@ -1676,7 +1676,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.13.2" +version = "0.13.3" dependencies = [ "anyhow", "base64", @@ -1688,7 +1688,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.13.2" +version = "0.13.3" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index 2ae9ef1..ae3f116 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.13.2" +version = "0.13.3" edition = "2024" [dependencies] diff --git a/common/Cargo.toml b/common/Cargo.toml index 85bb87c..ff40dbd 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.13.2" +version = "0.13.3" edition = "2024" build = "build.rs" diff --git a/scripts/ci/quality_gate_baseline.json b/scripts/ci/quality_gate_baseline.json index 5568cbb..03b4862 100644 --- a/scripts/ci/quality_gate_baseline.json +++ b/scripts/ci/quality_gate_baseline.json @@ -278,7 +278,7 @@ }, "server/src/audio/voice_input.rs": { "line_percent": 100.0, - "loc": 360 + "loc": 370 }, "server/src/bin/lesavka_uvc/control_payloads.rs": { "line_percent": 100.0, @@ -372,6 +372,10 @@ "line_percent": 100.0, "loc": 66 }, + "server/src/media_timing.rs": { + "line_percent": 100.0, + "loc": 72 + }, "server/src/paste.rs": { "line_percent": 98.29, "loc": 260 @@ -410,7 +414,7 @@ }, "server/src/video_sinks/hdmi_sink.rs": { "line_percent": 100.0, - "loc": 423 + "loc": 428 }, "server/src/video_sinks/webcam_sink.rs": { "line_percent": 100.0, diff --git a/server/Cargo.toml b/server/Cargo.toml index ff2704c..2fbf7b6 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.13.2" +version = "0.13.3" edition = "2024" autobins = false diff --git a/server/src/audio/voice_input.rs b/server/src/audio/voice_input.rs index b1ccc99..4a52c4d 100644 --- a/server/src/audio/voice_input.rs +++ b/server/src/audio/voice_input.rs @@ -44,6 +44,7 @@ impl Drop for ClipTap { pub struct Voice { appsrc: gst_app::AppSrc, _pipe: gst::Pipeline, // keep pipeline alive + clock_aligned: bool, tap: ClipTap, } @@ -130,6 +131,7 @@ impl Voice { Ok(Self { appsrc, _pipe: pipeline, + clock_aligned: false, tap: ClipTap::new("voice", Duration::from_secs(60)), }) } @@ -184,8 +186,8 @@ impl Voice { let compensation_us = voice_sink_compensation_us(); alsa_sink.set_property("device", alsa_dev); - alsa_sink.set_property("sync", false); - alsa_sink.set_property("async", false); + alsa_sink.set_property("sync", true); + alsa_sink.set_property("async", true); alsa_sink.set_property("enable-last-sample", false); alsa_sink.set_property("provide-clock", false); alsa_sink.set_property("buffer-time", buffer_time_us); @@ -202,6 +204,7 @@ impl Voice { compensation_us, "🎤 UAC sink low-latency timing armed" ); + crate::media_timing::prepare_pipeline_clock_sync(&pipeline); pipeline.add_many([ appsrc.upcast_ref(), @@ -248,17 +251,24 @@ impl Voice { Ok(Self { appsrc, _pipe: pipeline, + clock_aligned: false, tap: ClipTap::new("voice", Duration::from_secs(60)), }) } pub fn push(&mut self, pkt: &AudioPacket) { self.tap.feed(&pkt.data); + if !self.clock_aligned { + crate::media_timing::align_pipeline_to_session_clock(&self._pipe, pkt.pts); + self.clock_aligned = true; + } let mut buf = gst::Buffer::from_slice(pkt.data.clone()); - buf.get_mut() - .unwrap() - .set_pts(Some(gst::ClockTime::from_useconds(pkt.pts))); + if let Some(meta) = buf.get_mut() { + let ts = gst::ClockTime::from_useconds(pkt.pts); + meta.set_pts(Some(ts)); + meta.set_dts(Some(ts)); + } let _ = self.appsrc.push_buffer(buf); } diff --git a/server/src/lib.rs b/server/src/lib.rs index d45b1a7..88bdc68 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -10,6 +10,7 @@ pub mod camera_runtime; pub mod capture_power; pub mod gadget; pub mod handshake; +pub(crate) mod media_timing; pub mod paste; pub mod runtime_support; pub mod upstream_media_runtime; diff --git a/server/src/media_timing.rs b/server/src/media_timing.rs new file mode 100644 index 0000000..36e4990 --- /dev/null +++ b/server/src/media_timing.rs @@ -0,0 +1,72 @@ +//! Shared live-playback timing helpers for server-side sink pipelines. +//! +//! The HDMI and UAC paths live in separate GStreamer pipelines, so they need a +//! common clock source and a shared base-time derived from the same session PTS +//! timeline before sink synchronization can meaningfully align them. + +use gst::prelude::*; +use gstreamer as gst; + +/// Pin one playback pipeline to the shared system clock. +/// +/// Inputs: the pipeline that will present live audio or video. +/// Outputs: none; the pipeline is configured in place. +/// Why: separate playback pipelines must use the same clock before their +/// session-rebased timestamps can line up. +pub(crate) fn prepare_pipeline_clock_sync(pipeline: &gst::Pipeline) { + let clock = gst::SystemClock::obtain(); + pipeline.use_clock(Some(&clock)); + pipeline.set_start_time(None::); +} + +/// Align one playback pipeline so the supplied session PTS lands on "now". +/// +/// Inputs: the pipeline plus the first session-local PTS, in microseconds, +/// that should render immediately. +/// Outputs: none; the pipeline base-time is updated in place. +/// Why: audio and video pipelines start at different wall-clock moments, so +/// each one must translate the shared session timeline back into the same +/// absolute base-time on its first packet. +pub(crate) fn align_pipeline_to_session_clock(pipeline: &gst::Pipeline, session_pts_us: u64) { + let clock = gst::SystemClock::obtain(); + let now_ns = clock + .time() + .map(|value| value.nseconds()) + .unwrap_or_default(); + let base_time_ns = session_base_time_ns(now_ns, session_pts_us); + + pipeline.use_clock(Some(&clock)); + pipeline.set_start_time(None::); + pipeline.set_base_time(gst::ClockTime::from_nseconds(base_time_ns)); +} + +/// Turn on clock-synchronized presentation when a sink exposes the standard +/// `sync` property. +/// +/// Inputs: the sink element from the HDMI or audio playback path. +/// Outputs: none; the sink is configured in place when supported. +/// Why: timestamps only matter when the sink actually honors them. +pub(crate) fn enable_sink_clock_sync(sink: &gst::Element) { + if sink.has_property("sync", None) { + sink.set_property("sync", true); + } +} + +fn session_base_time_ns(clock_time_ns: u64, session_pts_us: u64) -> u64 { + clock_time_ns.saturating_sub(session_pts_us.saturating_mul(1_000)) +} + +#[cfg(test)] +mod tests { + use super::session_base_time_ns; + + #[test] + fn session_base_time_subtracts_pts_from_clock_time() { + assert_eq!(session_base_time_ns(9_000_000, 3_000), 6_000_000); + } + + #[test] + fn session_base_time_saturates_at_zero() { + assert_eq!(session_base_time_ns(2_000_000, 3_000), 0); + } +} diff --git a/server/src/video_sinks/hdmi_sink.rs b/server/src/video_sinks/hdmi_sink.rs index 66b1087..fff22c5 100644 --- a/server/src/video_sinks/hdmi_sink.rs +++ b/server/src/video_sinks/hdmi_sink.rs @@ -8,6 +8,7 @@ pub struct HdmiSink { appsrc: gst_app::AppSrc, pipe: gst::Pipeline, + clock_aligned: AtomicBool, next_pts_us: AtomicU64, frame_step_us: u64, presentation_delay_us: u64, @@ -49,6 +50,7 @@ impl HdmiSink { src.set_property("do-timestamp", &false); let sink = build_hdmi_sink(cfg)?; + crate::media_timing::prepare_pipeline_clock_sync(&pipeline); pipeline.add_many(&[src.upcast_ref(), &sink])?; gst::Element::link_many(&[src.upcast_ref(), &sink])?; pipeline.set_state(gst::State::Playing)?; @@ -57,6 +59,7 @@ impl HdmiSink { Ok(Self { appsrc: src, pipe: pipeline, + clock_aligned: AtomicBool::new(false), next_pts_us: AtomicU64::new(0), frame_step_us, presentation_delay_us: 0, @@ -103,6 +106,7 @@ impl HdmiSink { let rate = gst::ElementFactory::make("videorate").build()?; let scale = gst::ElementFactory::make("videoscale").build()?; let sink = build_hdmi_sink(cfg)?; + crate::media_timing::prepare_pipeline_clock_sync(&pipeline); tracing::info!( target: "lesavka_server::video", queue_depth, @@ -195,6 +199,7 @@ impl HdmiSink { Ok(Self { appsrc: src, pipe: pipeline, + clock_aligned: AtomicBool::new(false), next_pts_us: AtomicU64::new(0), frame_step_us, presentation_delay_us, @@ -222,6 +227,12 @@ impl HdmiSink { pkt.pts.saturating_add(self.presentation_delay_us), self.frame_step_us, ); + if !self + .clock_aligned + .swap(true, std::sync::atomic::Ordering::SeqCst) + { + crate::media_timing::align_pipeline_to_session_clock(&self.pipe, pts_us); + } let ts = gst::ClockTime::from_useconds(pts_us); meta.set_pts(Some(ts)); meta.set_dts(Some(ts)); @@ -245,14 +256,14 @@ fn build_hdmi_sink(_cfg: &CameraConfig) -> anyhow::Result { let sink = gst::ElementFactory::make(&name) .build() .context("building HDMI sink")?; - disable_sink_clock_sync(&sink); + crate::media_timing::enable_sink_clock_sync(&sink); return Ok(sink); } let sink = gst::ElementFactory::make("fakesink") .build() .context("building fallback HDMI sink")?; - disable_sink_clock_sync(&sink); + crate::media_timing::enable_sink_clock_sync(&sink); Ok(sink) } @@ -267,7 +278,7 @@ fn build_hdmi_sink(cfg: &CameraConfig) -> anyhow::Result { let sink = gst::ElementFactory::make(&name) .build() .context("building HDMI sink")?; - disable_sink_clock_sync(&sink); + crate::media_timing::enable_sink_clock_sync(&sink); return Ok(sink); } @@ -300,14 +311,14 @@ fn build_hdmi_sink(cfg: &CameraConfig) -> anyhow::Result { let skip = read_bool_env("LESAVKA_HDMI_SKIP_VSYNC").unwrap_or(false); sink.set_property("skip-vsync", skip); } - disable_sink_clock_sync(&sink); + crate::media_timing::enable_sink_clock_sync(&sink); return Ok(sink); } let sink = gst::ElementFactory::make("autovideosink") .build() .context("building HDMI sink")?; - disable_sink_clock_sync(&sink); + crate::media_timing::enable_sink_clock_sync(&sink); Ok(sink) } @@ -320,7 +331,7 @@ fn build_fbdev_hdmi_sink() -> anyhow::Result { .property("device", &device) .build() .context("building framebuffer HDMI sink")?; - disable_sink_clock_sync(&sink); + crate::media_timing::enable_sink_clock_sync(&sink); tracing::info!( target: "lesavka_server::video", @@ -369,12 +380,6 @@ fn unblank_framebuffer(device: &str) { } } -fn disable_sink_clock_sync(sink: &gst::Element) { - if sink.has_property("sync", None) { - sink.set_property("sync", false); - } -} - fn read_bool_env(name: &str) -> Option { let value = std::env::var(name).ok()?; match value.trim().to_ascii_lowercase().as_str() { diff --git a/server/src/video_sinks/webcam_sink.rs b/server/src/video_sinks/webcam_sink.rs index 98fec19..4fc5365 100644 --- a/server/src/video_sinks/webcam_sink.rs +++ b/server/src/video_sinks/webcam_sink.rs @@ -5,7 +5,7 @@ use gstreamer_app as gst_app; use lesavka_common::lesavka::VideoPacket; use std::fs; use std::path::Path; -use std::sync::atomic::AtomicU64; +use std::sync::atomic::{AtomicBool, AtomicU64}; use tracing::warn; use crate::camera::{CameraCodec, CameraConfig}; diff --git a/testing/tests/server_audio_include_contract.rs b/testing/tests/server_audio_include_contract.rs index 7c4c528..280cabe 100644 --- a/testing/tests/server_audio_include_contract.rs +++ b/testing/tests/server_audio_include_contract.rs @@ -8,6 +8,10 @@ pub use lesavka_server::camera; +#[path = "../../server/src/media_timing.rs"] +#[allow(warnings)] +mod media_timing; + #[path = "../../server/src/audio.rs"] #[allow(warnings)] mod server_audio_contract; diff --git a/testing/tests/server_video_sinks_include_contract.rs b/testing/tests/server_video_sinks_include_contract.rs index 53815a5..f5c9af6 100644 --- a/testing/tests/server_video_sinks_include_contract.rs +++ b/testing/tests/server_video_sinks_include_contract.rs @@ -13,6 +13,10 @@ mod video_support { pub use lesavka_server::video_support::*; } +#[path = "../../server/src/media_timing.rs"] +#[allow(warnings)] +mod media_timing; + #[allow(warnings)] mod video_sinks_include_contract { include!(env!("LESAVKA_SERVER_VIDEO_SINKS_SRC")); @@ -89,8 +93,8 @@ mod video_sinks_include_contract { } if sink.has_property("sync", None) { assert!( - !sink.property::("sync"), - "fbdev HDMI output should not clock-sync WAN camera frames" + sink.property::("sync"), + "fbdev HDMI output should honor the shared session clock" ); } });