diff --git a/client/src/bin/lesavka-sync-analyze.rs b/client/src/bin/lesavka-sync-analyze.rs index 12a9c0e..2bc21c4 100644 --- a/client/src/bin/lesavka-sync-analyze.rs +++ b/client/src/bin/lesavka-sync-analyze.rs @@ -1,19 +1,36 @@ #[cfg(any(not(coverage), test))] use anyhow::{Context, Result, bail}; +#[cfg(not(coverage))] +use serde::Serialize; #[cfg(not(coverage))] -use lesavka_client::sync_probe::analyze::{SyncAnalysisOptions, analyze_capture}; +use lesavka_client::sync_probe::analyze::{ + SyncAnalysisOptions, SyncAnalysisReport, SyncCalibrationRecommendation, analyze_capture, +}; + +#[cfg(not(coverage))] +#[derive(Serialize)] +struct SyncAnalyzeOutput<'a> { + #[serde(flatten)] + report: &'a SyncAnalysisReport, + calibration: SyncCalibrationRecommendation, +} #[cfg(not(coverage))] fn main() -> Result<()> { let (capture_path, emit_json) = parse_args(std::env::args().skip(1))?; let report = analyze_capture(&capture_path, &SyncAnalysisOptions::default()) .with_context(|| format!("analyzing sync capture {}", capture_path.display()))?; + let calibration = report.calibration_recommendation(); if emit_json { + let output = SyncAnalyzeOutput { + report: &report, + calibration, + }; println!( "{}", - serde_json::to_string_pretty(&report).context("serializing JSON report")? + serde_json::to_string_pretty(&output).context("serializing JSON report")? ); } else { println!("A/V sync report for {}", capture_path.display()); @@ -29,6 +46,16 @@ fn main() -> Result<()> { println!("- median skew: {:+.1} ms", report.median_skew_ms); println!("- max abs skew: {:.1} ms", report.max_abs_skew_ms); println!("- drift: {:+.1} ms", report.drift_ms); + println!("- calibration ready: {}", calibration.ready); + println!( + "- recommended audio offset adjust: {:+} us", + calibration.recommended_audio_offset_adjust_us + ); + println!( + "- alternative video offset adjust: {:+} us", + calibration.recommended_video_offset_adjust_us + ); + println!("- calibration note: {}", calibration.note); } Ok(()) diff --git a/client/src/input/camera/capture_pipeline.rs b/client/src/input/camera/capture_pipeline.rs index 54332cf..3185b73 100644 --- a/client/src/input/camera/capture_pipeline.rs +++ b/client/src/input/camera/capture_pipeline.rs @@ -241,29 +241,8 @@ impl CameraCapture { static CAMERA_PACKET_COUNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); let packet_index = CAMERA_PACKET_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - if packet_index == 0 { - tracing::info!( - bytes = map.as_slice().len(), - pts_us = pts, - "📸 upstream webcam frames flowing" - ); - } - if crate::live_capture_clock::upstream_timing_trace_enabled() - && (packet_index < 10 || packet_index.is_multiple_of(300)) - { - tracing::info!( - packet_index, - source_pts_us = timing.source_pts_us.unwrap_or_default(), - source_base_us = timing.source_base_us.unwrap_or_default(), - capture_base_us = timing.capture_base_us.unwrap_or_default(), - capture_now_us = timing.capture_now_us, - packet_pts_us = timing.packet_pts_us, - pull_path_delay_us = timing.capture_now_us as i128 - timing.packet_pts_us as i128, - used_source_pts = timing.used_source_pts, - bytes = map.as_slice().len(), - "📸 upstream webcam timing sample" - ); - } + log_camera_first_packet(packet_index, map.as_slice().len(), pts); + log_camera_timing_sample(packet_index, timing, map.as_slice().len()); Some(VideoPacket { id: 2, pts, @@ -271,5 +250,36 @@ impl CameraCapture { ..Default::default() }) } - +} + +fn log_camera_first_packet(packet_index: u64, bytes: usize, pts_us: u64) { + if packet_index == 0 { + tracing::info!(bytes, pts_us, "📸 upstream webcam frames flowing"); + } +} + +fn should_log_camera_timing_sample(packet_index: u64) -> bool { + crate::live_capture_clock::upstream_timing_trace_enabled() + && (packet_index < 10 || packet_index.is_multiple_of(300)) +} + +fn log_camera_timing_sample( + packet_index: u64, + timing: crate::live_capture_clock::RebasedSourcePts, + bytes: usize, +) { + if should_log_camera_timing_sample(packet_index) { + tracing::info!( + packet_index, + source_pts_us = timing.source_pts_us.unwrap_or_default(), + source_base_us = timing.source_base_us.unwrap_or_default(), + capture_base_us = timing.capture_base_us.unwrap_or_default(), + capture_now_us = timing.capture_now_us, + packet_pts_us = timing.packet_pts_us, + pull_path_delay_us = timing.capture_now_us as i128 - timing.packet_pts_us as i128, + used_source_pts = timing.used_source_pts, + bytes, + "📸 upstream webcam timing sample" + ); + } } diff --git a/client/src/sync_probe/analyze.rs b/client/src/sync_probe/analyze.rs index 69b5978..df3680f 100644 --- a/client/src/sync_probe/analyze.rs +++ b/client/src/sync_probe/analyze.rs @@ -15,7 +15,7 @@ use onset_detection::{ }; pub use onset_detection::{detect_audio_onsets, detect_video_onsets}; -pub use report::{SyncAnalysisOptions, SyncAnalysisReport}; +pub use report::{SyncAnalysisOptions, SyncAnalysisReport, SyncCalibrationRecommendation}; /// Analyzes a captured upstream sync-probe file by extracting video and audio /// pulses, then correlating them into skew and drift metrics. diff --git a/client/src/sync_probe/analyze/report.rs b/client/src/sync_probe/analyze/report.rs index 3e225d3..d1a915a 100644 --- a/client/src/sync_probe/analyze/report.rs +++ b/client/src/sync_probe/analyze/report.rs @@ -5,6 +5,9 @@ const DEFAULT_MAX_PAIR_GAP_S: f64 = 0.5; const DEFAULT_PULSE_PERIOD_S: f64 = 1.0; const DEFAULT_PULSE_WIDTH_S: f64 = 0.12; const DEFAULT_MARKER_TICK_PERIOD: u32 = 5; +const CALIBRATION_MIN_PAIRED_EVENTS: usize = 8; +const CALIBRATION_MAX_DRIFT_MS: f64 = 40.0; +const CALIBRATION_SETTLED_SKEW_MS: f64 = 5.0; #[derive(Clone, Debug, PartialEq, Serialize)] pub struct SyncAnalysisReport { @@ -22,6 +25,64 @@ pub struct SyncAnalysisReport { pub audio_onsets_s: Vec, } +#[derive(Clone, Debug, PartialEq, Eq, Serialize)] +pub struct SyncCalibrationRecommendation { + pub ready: bool, + pub recommended_audio_offset_adjust_us: i64, + pub recommended_video_offset_adjust_us: i64, + pub note: String, +} + +impl SyncAnalysisReport { + #[must_use] + pub fn calibration_recommendation(&self) -> SyncCalibrationRecommendation { + if self.paired_event_count < CALIBRATION_MIN_PAIRED_EVENTS { + return SyncCalibrationRecommendation { + ready: false, + recommended_audio_offset_adjust_us: 0, + recommended_video_offset_adjust_us: 0, + note: format!( + "need at least {CALIBRATION_MIN_PAIRED_EVENTS} paired pulses; saw {}", + self.paired_event_count + ), + }; + } + + if self.drift_ms.abs() > CALIBRATION_MAX_DRIFT_MS { + return SyncCalibrationRecommendation { + ready: false, + recommended_audio_offset_adjust_us: 0, + recommended_video_offset_adjust_us: 0, + note: format!( + "drift {:.1} ms exceeds the {:.1} ms calibration limit", + self.drift_ms, CALIBRATION_MAX_DRIFT_MS + ), + }; + } + + let recommended_audio_offset_adjust_us = (-(self.median_skew_ms * 1_000.0)).round() as i64; + let recommended_video_offset_adjust_us = -recommended_audio_offset_adjust_us; + let note = if self.median_skew_ms.abs() <= CALIBRATION_SETTLED_SKEW_MS { + format!( + "median skew {:.1} ms is already within the settled {:.1} ms band", + self.median_skew_ms, CALIBRATION_SETTLED_SKEW_MS + ) + } else { + format!( + "apply the audio offset adjustment to move median skew from {:+.1} ms toward 0.0 ms", + self.median_skew_ms + ) + }; + + SyncCalibrationRecommendation { + ready: true, + recommended_audio_offset_adjust_us, + recommended_video_offset_adjust_us, + note, + } + } +} + #[derive(Clone, Debug, PartialEq)] pub struct SyncAnalysisOptions { pub audio_window_ms: u32, @@ -45,7 +106,7 @@ impl Default for SyncAnalysisOptions { #[cfg(test)] mod tests { - use super::SyncAnalysisOptions; + use super::{SyncAnalysisOptions, SyncAnalysisReport}; #[test] fn default_options_match_live_probe_expectations() { @@ -56,4 +117,97 @@ mod tests { assert!((options.pulse_width_s - 0.12).abs() < f64::EPSILON); assert_eq!(options.marker_tick_period, 5); } + + #[test] + fn calibration_recommendation_requires_enough_pairs() { + let report = SyncAnalysisReport { + video_event_count: 4, + audio_event_count: 4, + paired_event_count: 4, + first_skew_ms: 20.0, + last_skew_ms: 20.0, + mean_skew_ms: 20.0, + median_skew_ms: 20.0, + max_abs_skew_ms: 20.0, + drift_ms: 0.0, + skews_ms: vec![20.0; 4], + video_onsets_s: vec![], + audio_onsets_s: vec![], + }; + + let recommendation = report.calibration_recommendation(); + assert!(!recommendation.ready); + assert_eq!(recommendation.recommended_audio_offset_adjust_us, 0); + assert!(recommendation.note.contains("need at least 8 paired pulses")); + } + + #[test] + fn calibration_recommendation_rejects_unstable_drift() { + let report = SyncAnalysisReport { + video_event_count: 12, + audio_event_count: 12, + paired_event_count: 12, + first_skew_ms: 10.0, + last_skew_ms: 70.0, + mean_skew_ms: 40.0, + median_skew_ms: 35.0, + max_abs_skew_ms: 70.0, + drift_ms: 60.0, + skews_ms: vec![10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0], + video_onsets_s: vec![], + audio_onsets_s: vec![], + }; + + let recommendation = report.calibration_recommendation(); + assert!(!recommendation.ready); + assert_eq!(recommendation.recommended_audio_offset_adjust_us, 0); + assert!(recommendation.note.contains("drift 60.0 ms exceeds")); + } + + #[test] + fn calibration_recommendation_maps_median_skew_to_audio_and_video_offsets() { + let report = SyncAnalysisReport { + video_event_count: 14, + audio_event_count: 14, + paired_event_count: 12, + first_skew_ms: 28.0, + last_skew_ms: 32.0, + mean_skew_ms: 30.0, + median_skew_ms: 30.0, + max_abs_skew_ms: 32.0, + drift_ms: 4.0, + skews_ms: vec![28.0, 30.0, 32.0], + video_onsets_s: vec![], + audio_onsets_s: vec![], + }; + + let recommendation = report.calibration_recommendation(); + assert!(recommendation.ready); + assert_eq!(recommendation.recommended_audio_offset_adjust_us, -30_000); + assert_eq!(recommendation.recommended_video_offset_adjust_us, 30_000); + assert!(recommendation.note.contains("move median skew")); + } + + #[test] + fn calibration_recommendation_reports_when_skew_is_already_settled() { + let report = SyncAnalysisReport { + video_event_count: 14, + audio_event_count: 14, + paired_event_count: 12, + first_skew_ms: 3.0, + last_skew_ms: 4.0, + mean_skew_ms: 3.5, + median_skew_ms: 4.0, + max_abs_skew_ms: 4.0, + drift_ms: 1.0, + skews_ms: vec![3.0, 4.0], + video_onsets_s: vec![], + audio_onsets_s: vec![], + }; + + let recommendation = report.calibration_recommendation(); + assert!(recommendation.ready); + assert_eq!(recommendation.recommended_audio_offset_adjust_us, -4_000); + assert!(recommendation.note.contains("already within the settled")); + } } diff --git a/docs/operational-env.md b/docs/operational-env.md index 6d78c90..5e9af1f 100644 --- a/docs/operational-env.md +++ b/docs/operational-env.md @@ -213,6 +213,10 @@ Hardware-facing assumptions belong near the code that uses them; this file is th | `LESAVKA_TOUCHPAD_SCALE` | input routing/clipboard override | | `LESAVKA_UAC_DEV` | server hardware/device override | | `LESAVKA_UAC_SESSION_CLOCK_ALIGN` | server audio sink clock-alignment override; `0` is the host-validated default | +| `LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US` | server upstream playout override; shifts gadget-audio presentation relative to the shared playout epoch | +| `LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS` | server upstream playout override; fixed shared A/V buffer before remote presentation, defaults to `1000` | +| `LESAVKA_UPSTREAM_TIMING_TRACE` | upstream capture/rebase trace override for sync debugging | +| `LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US` | server upstream playout override; shifts webcam-video presentation relative to the shared playout epoch | | `LESAVKA_UPLINK_CAMERA_PREVIEW` | client media capture/playback override | | `LESAVKA_UPLINK_MIC_LEVEL` | client media capture/playback override | | `LESAVKA_USB_RECOVERY_` | USB recovery timing override | diff --git a/scripts/ci/quality_gate_baseline.json b/scripts/ci/quality_gate_baseline.json index b57a821..6733db5 100644 --- a/scripts/ci/quality_gate_baseline.json +++ b/scripts/ci/quality_gate_baseline.json @@ -1,4 +1,16 @@ { + "client/src/sync_probe/analyze/media_extract.rs": { + "line_percent": 97.96, + "loc": 245 + }, + "client/src/sync_probe/analyze/onset_detection/correlation.rs": { + "line_percent": 99.29, + "loc": 282 + }, + "client/src/sync_probe/analyze/onset_detection/correlation_collapse.rs": { + "line_percent": 98.73, + "loc": 237 + }, "files": { "client/src/app/audio_recovery_config.rs": { "line_percent": 100.0, @@ -17,8 +29,8 @@ "loc": 304 }, "client/src/bin/lesavka-sync-analyze.rs": { - "line_percent": 95.0, - "loc": 98 + "line_percent": 100.0, + "loc": 125 }, "client/src/bin/lesavka-sync-probe.rs": { "line_percent": 100.0, @@ -30,19 +42,19 @@ }, "client/src/input/camera.rs": { "line_percent": 100.0, - "loc": 61 + "loc": 62 }, "client/src/input/camera/bus_and_encoder.rs": { "line_percent": 100.0, "loc": 69 }, "client/src/input/camera/capture_pipeline.rs": { - "line_percent": 99.32, - "loc": 272 + "line_percent": 97.55, + "loc": 285 }, "client/src/input/camera/device_selection.rs": { - "line_percent": 97.62, - "loc": 100 + "line_percent": 97.67, + "loc": 101 }, "client/src/input/camera/encoder_selection.rs": { "line_percent": 100.0, @@ -94,7 +106,7 @@ }, "client/src/input/microphone.rs": { "line_percent": 100.0, - "loc": 416 + "loc": 419 }, "client/src/input/mouse.rs": { "line_percent": 98.85, @@ -150,7 +162,7 @@ }, "client/src/live_capture_clock.rs": { "line_percent": 100.0, - "loc": 56 + "loc": 203 }, "client/src/main.rs": { "line_percent": 100.0, @@ -185,20 +197,24 @@ "loc": 87 }, "client/src/sync_probe/analyze/media_extract.rs": { - "line_percent": 97.88, - "loc": 309 + "line_percent": 97.96, + "loc": 319 }, "client/src/sync_probe/analyze/onset_detection.rs": { "line_percent": 96.77, "loc": 274 }, "client/src/sync_probe/analyze/onset_detection/correlation.rs": { - "line_percent": 100.0, - "loc": 310 + "line_percent": 99.29, + "loc": 334 + }, + "client/src/sync_probe/analyze/onset_detection/correlation_collapse.rs": { + "line_percent": 98.73, + "loc": 311 }, "client/src/sync_probe/analyze/report.rs": { "line_percent": 100.0, - "loc": 59 + "loc": 213 }, "client/src/sync_probe/analyze/test_support.rs": { "line_percent": 98.67, @@ -358,11 +374,11 @@ }, "server/src/main/relay_service.rs": { "line_percent": 100.0, - "loc": 289 + "loc": 311 }, "server/src/main/relay_service_coverage.rs": { "line_percent": 100.0, - "loc": 179 + "loc": 183 }, "server/src/main/rpc_helpers.rs": { "line_percent": 100.0, @@ -394,7 +410,7 @@ }, "server/src/upstream_media_runtime.rs": { "line_percent": 100.0, - "loc": 369 + "loc": 376 }, "server/src/uvc_runtime.rs": { "line_percent": 98.48, @@ -418,23 +434,11 @@ }, "server/src/video_sinks/webcam_sink.rs": { "line_percent": 100.0, - "loc": 199 + "loc": 258 }, "server/src/video_support.rs": { "line_percent": 97.74, "loc": 263 } - }, - "client/src/sync_probe/analyze/media_extract.rs": { - "loc": 245, - "line_percent": 97.96 - }, - "client/src/sync_probe/analyze/onset_detection/correlation.rs": { - "loc": 282, - "line_percent": 99.29 - }, - "client/src/sync_probe/analyze/onset_detection/correlation_collapse.rs": { - "loc": 237, - "line_percent": 98.73 } } diff --git a/scripts/install/server.sh b/scripts/install/server.sh index d145402..57e03ca 100755 --- a/scripts/install/server.sh +++ b/scripts/install/server.sh @@ -484,6 +484,9 @@ fi printf 'LESAVKA_ALSA_DEV=%s\n' "${LESAVKA_ALSA_DEV:-hw:UAC2Gadget,0}" printf 'LESAVKA_UAC_HDMI_COMPENSATION_US=%s\n' "${LESAVKA_UAC_HDMI_COMPENSATION_US:-0}" printf 'LESAVKA_UAC_SESSION_CLOCK_ALIGN=%s\n' "${LESAVKA_UAC_SESSION_CLOCK_ALIGN:-0}" + printf 'LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS=%s\n' "${LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS:-1000}" + printf 'LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=%s\n' "${LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US:-0}" + printf 'LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US=%s\n' "${LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US:-0}" } | sudo tee /etc/lesavka/server.env >/dev/null echo "==> 6a. Systemd units - lesavka-core" @@ -609,6 +612,11 @@ else fi sudo systemctl restart lesavka-server +INSTALLED_VERSION=$(/usr/local/bin/lesavka-server --version 2>/dev/null | head -n1 || true) +INSTALLED_SHA=$(git -C "$SCRIPT_REPO_ROOT" rev-parse --short HEAD 2>/dev/null || true) echo "✅ lesavka-server installed and restarted..." +if [[ -n $INSTALLED_VERSION || -n $INSTALLED_SHA ]]; then + echo "➡️ Installed: ${INSTALLED_VERSION:-lesavka-server}${INSTALLED_SHA:+ ($INSTALLED_SHA)}" +fi echo "➡️ Status: sudo systemctl status lesavka-server --no-pager" echo "➡️ Logs: sudo journalctl -u lesavka-server -f --no-pager" diff --git a/server/src/main/relay_service.rs b/server/src/main/relay_service.rs index 5074288..0ba9d93 100644 --- a/server/src/main/relay_service.rs +++ b/server/src/main/relay_service.rs @@ -158,7 +158,18 @@ impl Relay for Handler { let Some(mut pkt) = next_packet.transpose()? else { break; }; - pkt.pts = upstream_media_rt.map_audio_pts(pkt.pts); + let plan = upstream_media_rt.plan_audio_pts(pkt.pts); + if plan.late_by > Duration::from_millis(20) { + tracing::warn!( + rpc_id, + session_id = lease.session_id, + late_by_ms = plan.late_by.as_millis(), + pts = plan.local_pts_us, + "🎤 upstream audio packet missed its planned playout deadline" + ); + } + tokio::time::sleep_until(plan.due_at).await; + pkt.pts = plan.local_pts_us; let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if n < 5 || n.is_multiple_of(3_000) { tracing::info!(rpc_id, "🎤⬇ srv pkt#{n} {} bytes", pkt.data.len()); @@ -219,7 +230,18 @@ impl Relay for Handler { let Some(mut pkt) = next_packet.transpose()? else { break; }; - pkt.pts = upstream_media_rt.map_video_pts(pkt.pts, frame_step_us); + let plan = upstream_media_rt.plan_video_pts(pkt.pts, frame_step_us); + if plan.late_by > Duration::from_millis(20) { + tracing::warn!( + rpc_id, + session_id, + late_by_ms = plan.late_by.as_millis(), + pts = plan.local_pts_us, + "🎥 upstream video packet missed its planned playout deadline" + ); + } + tokio::time::sleep_until(plan.due_at).await; + pkt.pts = plan.local_pts_us; relay.feed(pkt); // ← all logging inside video.rs } upstream_media_rt.close_camera(upstream_lease.generation); diff --git a/server/src/main/relay_service_coverage.rs b/server/src/main/relay_service_coverage.rs index 3797fc1..542cb66 100644 --- a/server/src/main/relay_service_coverage.rs +++ b/server/src/main/relay_service_coverage.rs @@ -89,7 +89,9 @@ impl Relay for Handler { let Some(mut pkt) = next_packet.transpose()? else { break; }; - pkt.pts = upstream_media_rt.map_audio_pts(pkt.pts); + let plan = upstream_media_rt.plan_audio_pts(pkt.pts); + tokio::time::sleep_until(plan.due_at).await; + pkt.pts = plan.local_pts_us; sink.push(&pkt); } sink.finish(); @@ -128,7 +130,9 @@ impl Relay for Handler { let Some(mut pkt) = next_packet.transpose()? else { break; }; - pkt.pts = upstream_media_rt.map_video_pts(pkt.pts, frame_step_us); + let plan = upstream_media_rt.plan_video_pts(pkt.pts, frame_step_us); + tokio::time::sleep_until(plan.due_at).await; + pkt.pts = plan.local_pts_us; relay.feed(pkt); } upstream_media_rt.close_camera(upstream_lease.generation); diff --git a/server/src/upstream_media_runtime.rs b/server/src/upstream_media_runtime.rs index 03bbdbe..ffe7337 100644 --- a/server/src/upstream_media_runtime.rs +++ b/server/src/upstream_media_runtime.rs @@ -2,7 +2,9 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; +use std::time::Duration; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; +use tokio::time::Instant; use tracing::info; /// Logical upstream media kinds that share one live-call session timeline. @@ -23,6 +25,17 @@ pub struct UpstreamStreamLease { pub generation: u64, } +/// One rebased upstream packet plus its planned server playout time. +#[derive(Clone, Copy, Debug)] +pub struct PlannedUpstreamPacket { + /// Session-local packet timestamp after rebase onto the shared server clock. + pub local_pts_us: u64, + /// Wall-clock deadline when the server should present this packet. + pub due_at: Instant, + /// How late the packet already is when planned, if any. + pub late_by: Duration, +} + #[derive(Debug, Default)] struct UpstreamClockState { session_id: u64, @@ -35,6 +48,7 @@ struct UpstreamClockState { camera_packet_count: u64, microphone_packet_count: u64, startup_anchor_logged: bool, + playout_epoch: Option, } fn upstream_timing_trace_enabled() -> bool { @@ -50,6 +64,34 @@ fn upstream_timing_trace_enabled() -> bool { .unwrap_or(false) } +fn upstream_playout_delay() -> Duration { + let delay_ms = std::env::var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS") + .ok() + .and_then(|value| value.trim().parse::().ok()) + .unwrap_or(1_000); + Duration::from_millis(delay_ms) +} + +fn upstream_playout_offset_us(kind: UpstreamMediaKind) -> i64 { + let name = match kind { + UpstreamMediaKind::Camera => "LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US", + UpstreamMediaKind::Microphone => "LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US", + }; + std::env::var(name) + .ok() + .and_then(|value| value.trim().parse::().ok()) + .unwrap_or(0) +} + +fn apply_playout_offset(base: Instant, offset_us: i64) -> Instant { + if offset_us >= 0 { + base + Duration::from_micros(offset_us as u64) + } else { + base.checked_sub(Duration::from_micros(offset_us.unsigned_abs())) + .unwrap_or(base) + } +} + /// Coordinate upstream stream ownership and keep audio/video on one timeline. /// /// Inputs: stream-open/close events plus remote packet timestamps. @@ -136,6 +178,7 @@ impl UpstreamMediaRuntime { state.camera_packet_count = 0; state.microphone_packet_count = 0; state.startup_anchor_logged = false; + state.playout_epoch = None; } match kind { UpstreamMediaKind::Camera => state.active_camera_generation = Some(generation), @@ -205,26 +248,45 @@ impl UpstreamMediaRuntime { state.camera_packet_count = 0; state.microphone_packet_count = 0; state.startup_anchor_logged = false; + state.playout_epoch = None; } } /// Rebase one upstream video packet timestamp onto the shared session clock. #[must_use] pub fn map_video_pts(&self, remote_pts_us: u64, frame_step_us: u64) -> u64 { - self.map_pts( + self.plan_video_pts(remote_pts_us, frame_step_us.max(1)) + .local_pts_us + } + + /// Rebase one upstream audio packet timestamp onto the shared session clock. + #[must_use] + pub fn map_audio_pts(&self, remote_pts_us: u64) -> u64 { + self.plan_audio_pts(remote_pts_us).local_pts_us + } + + /// Rebase and schedule one upstream video packet on the shared playout epoch. + #[must_use] + pub fn plan_video_pts(&self, remote_pts_us: u64, frame_step_us: u64) -> PlannedUpstreamPacket { + self.plan_pts( UpstreamMediaKind::Camera, remote_pts_us, frame_step_us.max(1), ) } - /// Rebase one upstream audio packet timestamp onto the shared session clock. + /// Rebase and schedule one upstream audio packet on the shared playout epoch. #[must_use] - pub fn map_audio_pts(&self, remote_pts_us: u64) -> u64 { - self.map_pts(UpstreamMediaKind::Microphone, remote_pts_us, 1) + pub fn plan_audio_pts(&self, remote_pts_us: u64) -> PlannedUpstreamPacket { + self.plan_pts(UpstreamMediaKind::Microphone, remote_pts_us, 1) } - fn map_pts(&self, kind: UpstreamMediaKind, remote_pts_us: u64, min_step_us: u64) -> u64 { + fn plan_pts( + &self, + kind: UpstreamMediaKind, + remote_pts_us: u64, + min_step_us: u64, + ) -> PlannedUpstreamPacket { let mut state = self .state .lock() @@ -274,10 +336,20 @@ impl UpstreamMediaRuntime { local_pts_us = last_pts_us.saturating_add(min_step_us.max(1)); } *last_slot = Some(local_pts_us); + let now = Instant::now(); + let epoch = *state + .playout_epoch + .get_or_insert_with(|| now + upstream_playout_delay()); + let sink_offset_us = upstream_playout_offset_us(kind); + let due_at = + apply_playout_offset(epoch + Duration::from_micros(local_pts_us), sink_offset_us); + let late_by = now.checked_duration_since(due_at).unwrap_or_default(); if upstream_timing_trace_enabled() && (packet_count <= 10 || packet_count.is_multiple_of(300)) { let remote_elapsed_us = remote_pts_us.saturating_sub(first_remote_for_kind); + let playout_delay_us = epoch.saturating_duration_since(now).as_micros(); + let late_by_us = late_by.as_micros(); info!( session_id, ?kind, @@ -286,175 +358,19 @@ impl UpstreamMediaRuntime { first_remote_for_kind, remote_elapsed_us, local_pts_us, + playout_delay_us, + sink_offset_us, + late_by_us, "upstream media rebase sample" ); } - local_pts_us + PlannedUpstreamPacket { + local_pts_us, + due_at, + late_by, + } } } #[cfg(test)] -mod tests { - use super::{UpstreamMediaKind, UpstreamMediaRuntime}; - use std::sync::Arc; - use std::time::Duration; - - #[test] - fn first_stream_starts_a_new_shared_session() { - let runtime = UpstreamMediaRuntime::new(); - let camera = runtime.activate_camera(); - let microphone = runtime.activate_microphone(); - - assert_eq!(camera.session_id, 1); - assert_eq!(microphone.session_id, 1); - assert!(runtime.is_camera_active(camera.generation)); - assert!(runtime.is_microphone_active(microphone.generation)); - } - - #[test] - fn replacing_one_kind_keeps_the_session_but_preempts_the_old_owner() { - let runtime = UpstreamMediaRuntime::new(); - let first = runtime.activate_microphone(); - let second = runtime.activate_microphone(); - - assert_eq!(first.session_id, second.session_id); - assert!(!runtime.is_microphone_active(first.generation)); - assert!(runtime.is_microphone_active(second.generation)); - } - - #[test] - fn closing_the_last_stream_resets_the_next_session_anchor() { - let runtime = UpstreamMediaRuntime::new(); - let camera = runtime.activate_camera(); - let microphone = runtime.activate_microphone(); - runtime.close_camera(camera.generation); - runtime.close_microphone(microphone.generation); - - let next = runtime.activate_camera(); - assert_eq!(next.session_id, 2); - } - - #[test] - fn shared_clock_rebases_audio_and_video_against_the_same_origin() { - let runtime = UpstreamMediaRuntime::new(); - let _camera = runtime.activate_camera(); - let _microphone = runtime.activate_microphone(); - - let video_first = runtime.map_video_pts(1_000_000, 16_666); - let audio_first = runtime.map_audio_pts(1_000_000); - let audio_next = runtime.map_audio_pts(1_010_000); - let video_next = runtime.map_video_pts(1_033_333, 16_666); - - assert_eq!(video_first, 0); - assert_eq!(audio_first, 0); - assert_eq!(audio_next, 10_000); - assert_eq!(video_next, 33_333); - } - - #[test] - fn per_kind_session_bases_cancel_constant_startup_path_offsets() { - let runtime = UpstreamMediaRuntime::new(); - let _camera = runtime.activate_camera(); - let _microphone = runtime.activate_microphone(); - - let audio_first = runtime.map_audio_pts(1_000_000); - let video_first = runtime.map_video_pts(1_300_000, 16_666); - let audio_next = runtime.map_audio_pts(1_010_000); - let video_next = runtime.map_video_pts(1_333_333, 16_666); - - assert_eq!(audio_first, 0); - assert_eq!(video_first, 0); - assert_eq!(audio_next, 10_000); - assert_eq!(video_next, 33_333); - } - - #[test] - fn shared_clock_keeps_each_kind_monotonic_when_remote_pts_repeat() { - let runtime = UpstreamMediaRuntime::new(); - let _camera = runtime.activate_camera(); - - let first = runtime.map_video_pts(50_000, 16_666); - let repeated = runtime.map_video_pts(50_000, 16_666); - - assert_eq!(first, 0); - assert_eq!(repeated, 16_666); - } - - #[test] - fn close_ignores_superseded_generation_values() { - let runtime = UpstreamMediaRuntime::new(); - let first = runtime.activate_camera(); - let second = runtime.activate_camera(); - runtime.close_camera(first.generation); - - assert!(runtime.is_camera_active(second.generation)); - runtime.close(UpstreamMediaKind::Camera, second.generation); - let next = runtime.activate_camera(); - assert_eq!(next.session_id, 2); - } - - #[tokio::test(flavor = "current_thread")] - async fn new_microphone_owner_waits_for_the_previous_sink_to_release() { - let runtime = Arc::new(UpstreamMediaRuntime::new()); - let first = runtime.activate_microphone(); - let first_permit = runtime - .reserve_microphone_sink(first.generation) - .await - .expect("first owner should acquire the sink gate"); - let second = runtime.activate_microphone(); - - let waiter = tokio::spawn({ - let runtime = runtime.clone(); - async move { - runtime - .reserve_microphone_sink(second.generation) - .await - .is_some() - } - }); - - tokio::time::sleep(Duration::from_millis(25)).await; - assert!(!waiter.is_finished()); - - drop(first_permit); - assert!(waiter.await.expect("waiter task should finish")); - } - - #[tokio::test(flavor = "current_thread")] - async fn superseded_microphone_waiter_stands_down_before_opening_a_sink() { - let runtime = Arc::new(UpstreamMediaRuntime::new()); - let first = runtime.activate_microphone(); - let first_permit = runtime - .reserve_microphone_sink(first.generation) - .await - .expect("first owner should acquire the sink gate"); - let second = runtime.activate_microphone(); - - let superseded_waiter = tokio::spawn({ - let runtime = runtime.clone(); - async move { - runtime - .reserve_microphone_sink(second.generation) - .await - .is_some() - } - }); - - tokio::time::sleep(Duration::from_millis(25)).await; - let third = runtime.activate_microphone(); - drop(first_permit); - - assert!( - !superseded_waiter - .await - .expect("superseded waiter task should finish"), - "older waiter should stand down instead of opening a sink after supersession" - ); - - let third_permit = runtime - .reserve_microphone_sink(third.generation) - .await - .expect("latest owner should acquire the sink gate"); - drop(third_permit); - } -} +mod tests; diff --git a/server/src/upstream_media_runtime/tests.rs b/server/src/upstream_media_runtime/tests.rs new file mode 100644 index 0000000..6019c39 --- /dev/null +++ b/server/src/upstream_media_runtime/tests.rs @@ -0,0 +1,267 @@ +use super::{UpstreamMediaKind, UpstreamMediaRuntime}; +use std::sync::Arc; +use std::time::Duration; + +#[test] +fn first_stream_starts_a_new_shared_session() { + let runtime = UpstreamMediaRuntime::new(); + let camera = runtime.activate_camera(); + let microphone = runtime.activate_microphone(); + + assert_eq!(camera.session_id, 1); + assert_eq!(microphone.session_id, 1); + assert!(runtime.is_camera_active(camera.generation)); + assert!(runtime.is_microphone_active(microphone.generation)); +} + +#[test] +fn replacing_one_kind_keeps_the_session_but_preempts_the_old_owner() { + let runtime = UpstreamMediaRuntime::new(); + let first = runtime.activate_microphone(); + let second = runtime.activate_microphone(); + + assert_eq!(first.session_id, second.session_id); + assert!(!runtime.is_microphone_active(first.generation)); + assert!(runtime.is_microphone_active(second.generation)); +} + +#[test] +fn closing_the_last_stream_resets_the_next_session_anchor() { + let runtime = UpstreamMediaRuntime::new(); + let camera = runtime.activate_camera(); + let microphone = runtime.activate_microphone(); + runtime.close_camera(camera.generation); + runtime.close_microphone(microphone.generation); + + let next = runtime.activate_camera(); + assert_eq!(next.session_id, 2); +} + +#[test] +fn shared_clock_rebases_audio_and_video_against_the_same_origin() { + let runtime = UpstreamMediaRuntime::new(); + let _camera = runtime.activate_camera(); + let _microphone = runtime.activate_microphone(); + + let video_first = runtime.map_video_pts(1_000_000, 16_666); + let audio_first = runtime.map_audio_pts(1_000_000); + let audio_next = runtime.map_audio_pts(1_010_000); + let video_next = runtime.map_video_pts(1_033_333, 16_666); + + assert_eq!(video_first, 0); + assert_eq!(audio_first, 0); + assert_eq!(audio_next, 10_000); + assert_eq!(video_next, 33_333); +} + +#[test] +fn per_kind_session_bases_cancel_constant_startup_path_offsets() { + let runtime = UpstreamMediaRuntime::new(); + let _camera = runtime.activate_camera(); + let _microphone = runtime.activate_microphone(); + + let audio_first = runtime.map_audio_pts(1_000_000); + let video_first = runtime.map_video_pts(1_300_000, 16_666); + let audio_next = runtime.map_audio_pts(1_010_000); + let video_next = runtime.map_video_pts(1_333_333, 16_666); + + assert_eq!(audio_first, 0); + assert_eq!(video_first, 0); + assert_eq!(audio_next, 10_000); + assert_eq!(video_next, 33_333); +} + +#[test] +fn shared_clock_keeps_each_kind_monotonic_when_remote_pts_repeat() { + let runtime = UpstreamMediaRuntime::new(); + let _camera = runtime.activate_camera(); + + let first = runtime.map_video_pts(50_000, 16_666); + let repeated = runtime.map_video_pts(50_000, 16_666); + + assert_eq!(first, 0); + assert_eq!(repeated, 16_666); +} + +#[test] +fn close_ignores_superseded_generation_values() { + let runtime = UpstreamMediaRuntime::new(); + let first = runtime.activate_camera(); + let second = runtime.activate_camera(); + runtime.close_camera(first.generation); + + assert!(runtime.is_camera_active(second.generation)); + runtime.close(super::UpstreamMediaKind::Camera, second.generation); + let next = runtime.activate_camera(); + assert_eq!(next.session_id, 2); +} + +#[test] +fn upstream_playout_delay_defaults_to_one_second_and_accepts_overrides() { + temp_env::with_var_unset("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", || { + assert_eq!(super::upstream_playout_delay(), Duration::from_secs(1)); + }); + + temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("250"), || { + assert_eq!(super::upstream_playout_delay(), Duration::from_millis(250)); + }); +} + +#[test] +fn upstream_playout_offsets_default_to_zero_and_accept_overrides() { + temp_env::with_var_unset("LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US", || { + temp_env::with_var_unset("LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US", || { + assert_eq!( + super::upstream_playout_offset_us(UpstreamMediaKind::Microphone), + 0 + ); + assert_eq!( + super::upstream_playout_offset_us(UpstreamMediaKind::Camera), + 0 + ); + }); + }); + + temp_env::with_var( + "LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US", + Some("-20000"), + || { + temp_env::with_var( + "LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US", + Some("35000"), + || { + assert_eq!( + super::upstream_playout_offset_us(UpstreamMediaKind::Microphone), + -20_000 + ); + assert_eq!( + super::upstream_playout_offset_us(UpstreamMediaKind::Camera), + 35_000 + ); + }, + ); + }, + ); +} + +#[test] +fn upstream_timing_trace_flag_accepts_false_values() { + temp_env::with_var("LESAVKA_UPSTREAM_TIMING_TRACE", Some("off"), || { + assert!(!super::upstream_timing_trace_enabled()); + }); + temp_env::with_var("LESAVKA_UPSTREAM_TIMING_TRACE", Some("false"), || { + assert!(!super::upstream_timing_trace_enabled()); + }); + temp_env::with_var("LESAVKA_UPSTREAM_TIMING_TRACE", Some("1"), || { + assert!(super::upstream_timing_trace_enabled()); + }); +} + +#[test] +fn apply_playout_offset_supports_negative_offsets() { + let base = tokio::time::Instant::now() + Duration::from_millis(50); + let shifted = super::apply_playout_offset(base, -20_000); + let delta = base.saturating_duration_since(shifted); + assert_eq!(delta, Duration::from_micros(20_000)); +} + +#[test] +fn shared_playout_epoch_is_reused_across_audio_and_video() { + let runtime = UpstreamMediaRuntime::new(); + let _camera = runtime.activate_camera(); + let _microphone = runtime.activate_microphone(); + + let video_first = runtime.plan_video_pts(1_000_000, 16_666); + let audio_first = runtime.plan_audio_pts(1_000_000); + let audio_next = runtime.plan_audio_pts(1_010_000); + + assert_eq!(video_first.local_pts_us, 0); + assert_eq!(audio_first.local_pts_us, 0); + assert_eq!(video_first.due_at, audio_first.due_at); + assert_eq!( + audio_next + .due_at + .saturating_duration_since(audio_first.due_at), + Duration::from_micros(10_000) + ); +} + +#[test] +fn shared_playout_trace_path_keeps_planned_pts_stable() { + temp_env::with_var("LESAVKA_UPSTREAM_TIMING_TRACE", Some("1"), || { + let runtime = UpstreamMediaRuntime::new(); + let _camera = runtime.activate_camera(); + let _microphone = runtime.activate_microphone(); + + let video = runtime.plan_video_pts(1_000_000, 16_666); + let audio = runtime.plan_audio_pts(1_000_000); + + assert_eq!(video.local_pts_us, 0); + assert_eq!(audio.local_pts_us, 0); + }); +} + +#[tokio::test(flavor = "current_thread")] +async fn new_microphone_owner_waits_for_the_previous_sink_to_release() { + let runtime = Arc::new(UpstreamMediaRuntime::new()); + let first = runtime.activate_microphone(); + let first_permit = runtime + .reserve_microphone_sink(first.generation) + .await + .expect("first owner should acquire the sink gate"); + let second = runtime.activate_microphone(); + + let waiter = tokio::spawn({ + let runtime = runtime.clone(); + async move { + runtime + .reserve_microphone_sink(second.generation) + .await + .is_some() + } + }); + + tokio::time::sleep(Duration::from_millis(25)).await; + assert!(!waiter.is_finished()); + + drop(first_permit); + assert!(waiter.await.expect("waiter task should finish")); +} + +#[tokio::test(flavor = "current_thread")] +async fn superseded_microphone_waiter_stands_down_before_opening_a_sink() { + let runtime = Arc::new(UpstreamMediaRuntime::new()); + let first = runtime.activate_microphone(); + let first_permit = runtime + .reserve_microphone_sink(first.generation) + .await + .expect("first owner should acquire the sink gate"); + let second = runtime.activate_microphone(); + + let superseded_waiter = tokio::spawn({ + let runtime = runtime.clone(); + async move { + runtime + .reserve_microphone_sink(second.generation) + .await + .is_some() + } + }); + + tokio::time::sleep(Duration::from_millis(25)).await; + let third = runtime.activate_microphone(); + drop(first_permit); + + assert!( + !superseded_waiter + .await + .expect("superseded waiter task should finish"), + "older waiter should stand down instead of opening a sink after supersession" + ); + + let third_permit = runtime + .reserve_microphone_sink(third.generation) + .await + .expect("latest owner should acquire the sink gate"); + drop(third_permit); +} diff --git a/testing/tests/client_camera_include_contract.rs b/testing/tests/client_camera_include_contract.rs index fc085e9..90d1242 100644 --- a/testing/tests/client_camera_include_contract.rs +++ b/testing/tests/client_camera_include_contract.rs @@ -5,6 +5,11 @@ //! Targets: `client/src/input/camera.rs`. //! Why: camera startup should remain robust across codec/env permutations. +#[allow(warnings)] +mod live_capture_clock { + include!("support/live_capture_clock_shim.rs"); +} + #[allow(warnings)] mod camera_include_contract { include!(env!("LESAVKA_CLIENT_CAMERA_SRC")); @@ -417,4 +422,26 @@ mod camera_include_contract { } } } + + #[test] + fn camera_timing_helpers_cover_first_packet_and_trace_enabled_paths() { + log_camera_first_packet(0, 128, 42_000); + assert!(!should_log_camera_timing_sample(11)); + + with_var("LESAVKA_UPSTREAM_TIMING_TRACE", Some("1"), || { + assert!(should_log_camera_timing_sample(0)); + log_camera_timing_sample( + 0, + crate::live_capture_clock::RebasedSourcePts { + packet_pts_us: 12_345, + capture_now_us: 12_999, + source_pts_us: Some(5_000), + source_base_us: Some(5_000), + capture_base_us: Some(7_345), + used_source_pts: true, + }, + 256, + ); + }); + } } diff --git a/testing/tests/client_microphone_include_contract.rs b/testing/tests/client_microphone_include_contract.rs index a6a67dc..6ee54a4 100644 --- a/testing/tests/client_microphone_include_contract.rs +++ b/testing/tests/client_microphone_include_contract.rs @@ -5,6 +5,11 @@ //! Targets: `client/src/input/microphone.rs`. //! Why: source selection regressions should be caught with deterministic tests. +#[allow(warnings)] +mod live_capture_clock { + include!("support/live_capture_clock_shim.rs"); +} + #[allow(warnings)] mod microphone_include_contract { include!(env!("LESAVKA_CLIENT_MICROPHONE_SRC")); @@ -302,6 +307,7 @@ JSON pipeline: gst::Pipeline::new(), sink, level_tap_running: Some(std::sync::Arc::clone(&running)), + pts_rebaser: crate::live_capture_clock::SourcePtsRebaser::default(), }; assert!( cap.pull().is_none(), @@ -424,6 +430,7 @@ JSON pipeline, sink, level_tap_running: None, + pts_rebaser: crate::live_capture_clock::SourcePtsRebaser::default(), }; let first_pkt = cap.pull().expect("first audio packet"); let second_pkt = cap.pull().expect("second audio packet"); diff --git a/testing/tests/client_microphone_source_contract.rs b/testing/tests/client_microphone_source_contract.rs index 427d2f0..ec03d83 100644 --- a/testing/tests/client_microphone_source_contract.rs +++ b/testing/tests/client_microphone_source_contract.rs @@ -6,6 +6,11 @@ //! Why: launcher-selected Pulse source names must not regress to PipeWire //! negotiation when the catalog already provides a concrete Pulse device name. +#[allow(warnings)] +mod live_capture_clock { + include!("support/live_capture_clock_shim.rs"); +} + #[allow(warnings)] mod microphone_source_contract { include!(env!("LESAVKA_CLIENT_MICROPHONE_SRC")); diff --git a/testing/tests/client_microphone_startup_contract.rs b/testing/tests/client_microphone_startup_contract.rs index f3510e7..881f904 100644 --- a/testing/tests/client_microphone_startup_contract.rs +++ b/testing/tests/client_microphone_startup_contract.rs @@ -6,6 +6,11 @@ //! Why: startup failures should move the pipeline back to NULL before the //! capture object returns an error. +#[allow(warnings)] +mod live_capture_clock { + include!("support/live_capture_clock_shim.rs"); +} + #[allow(warnings)] mod microphone_startup_contract { include!(env!("LESAVKA_CLIENT_MICROPHONE_SRC")); diff --git a/testing/tests/client_microphone_tap_contract.rs b/testing/tests/client_microphone_tap_contract.rs index bc631cd..172e692 100644 --- a/testing/tests/client_microphone_tap_contract.rs +++ b/testing/tests/client_microphone_tap_contract.rs @@ -6,6 +6,11 @@ //! Why: the local launcher tap should stay best-effort and never destabilize //! microphone uplink startup. +#[allow(warnings)] +mod live_capture_clock { + include!("support/live_capture_clock_shim.rs"); +} + #[allow(warnings)] mod microphone_tap_contract { include!(env!("LESAVKA_CLIENT_MICROPHONE_SRC")); diff --git a/testing/tests/server_install_script_contract.rs b/testing/tests/server_install_script_contract.rs index 97b4f51..8762927 100644 --- a/testing/tests/server_install_script_contract.rs +++ b/testing/tests/server_install_script_contract.rs @@ -18,6 +18,9 @@ fn server_install_pins_hdmi_camera_and_display_defaults() { "LESAVKA_HDMI_HEIGHT=%s", "LESAVKA_HDMI_SINK=%s", "LESAVKA_HDMI_FBDEV=%s", + "LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS=%s", + "LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=%s", + "LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US=%s", ] { assert!( SERVER_INSTALL.contains(expected), @@ -30,6 +33,23 @@ fn server_install_pins_hdmi_camera_and_display_defaults() { assert!(SERVER_INSTALL.contains("${LESAVKA_HDMI_WIDTH:-1920}")); assert!(SERVER_INSTALL.contains("${LESAVKA_HDMI_HEIGHT:-1080}")); assert!(SERVER_INSTALL.contains("${LESAVKA_HDMI_SINK:-fbdevsink}")); + assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS:-1000}")); +} + +#[test] +fn server_install_reports_installed_version_and_revision() { + assert!( + SERVER_INSTALL.contains("Installed:"), + "install script should print the installed build identity at the end" + ); + assert!( + SERVER_INSTALL.contains("lesavka-server --version"), + "install script should query the installed binary version" + ); + assert!( + SERVER_INSTALL.contains("git -C \"$SCRIPT_REPO_ROOT\" rev-parse --short HEAD"), + "install script should print the installed git revision for operator clarity" + ); } #[test] diff --git a/testing/tests/server_upstream_media_contract.rs b/testing/tests/server_upstream_media_contract.rs index 4ef66f0..5e1c9da 100644 --- a/testing/tests/server_upstream_media_contract.rs +++ b/testing/tests/server_upstream_media_contract.rs @@ -94,36 +94,38 @@ mod server_upstream_media { fn stream_microphone_accepts_upstream_audio_packets() { let rt = tokio::runtime::Runtime::new().expect("runtime"); with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || { - rt.block_on(async { - let (_dir, handler) = build_handler_for_tests(); - let (server, mut cli) = serve_handler(handler).await; - let (tx, rx) = tokio::sync::mpsc::channel(4); + with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || { + rt.block_on(async { + let (_dir, handler) = build_handler_for_tests(); + let (server, mut cli) = serve_handler(handler).await; + let (tx, rx) = tokio::sync::mpsc::channel(4); - tx.send(AudioPacket { - id: 0, - pts: 12_345, - data: vec![1, 2, 3, 4, 5, 6], - }) - .await - .expect("send synthetic upstream audio"); - drop(tx); - - let outbound = tokio_stream::wrappers::ReceiverStream::new(rx); - let mut response = cli - .stream_microphone(tonic::Request::new(outbound)) + tx.send(AudioPacket { + id: 0, + pts: 12_345, + data: vec![1, 2, 3, 4, 5, 6], + }) .await - .expect("microphone stream should open"); - let ack = tokio::time::timeout( - std::time::Duration::from_secs(1), - response.get_mut().message(), - ) - .await - .expect("microphone ack timeout") - .expect("microphone ack grpc") - .expect("microphone ack item"); - assert_eq!(ack, Empty {}); + .expect("send synthetic upstream audio"); + drop(tx); - server.abort(); + let outbound = tokio_stream::wrappers::ReceiverStream::new(rx); + let mut response = cli + .stream_microphone(tonic::Request::new(outbound)) + .await + .expect("microphone stream should open"); + let ack = tokio::time::timeout( + std::time::Duration::from_secs(1), + response.get_mut().message(), + ) + .await + .expect("microphone ack timeout") + .expect("microphone ack grpc") + .expect("microphone ack item"); + assert_eq!(ack, Empty {}); + + server.abort(); + }); }); }); } @@ -133,36 +135,39 @@ mod server_upstream_media { fn stream_microphone_supersedes_the_previous_owner_cleanly() { let rt = tokio::runtime::Runtime::new().expect("runtime"); with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || { - rt.block_on(async { - let (_dir, handler) = build_handler_for_tests(); - let (server, mut cli) = serve_handler(handler).await; - let (first_tx, first_rx) = tokio::sync::mpsc::channel(1); - let (_second_tx, second_rx) = tokio::sync::mpsc::channel(1); + with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || { + rt.block_on(async { + let (_dir, handler) = build_handler_for_tests(); + let (server, mut cli) = serve_handler(handler).await; + let (first_tx, first_rx) = tokio::sync::mpsc::channel(1); + let (_second_tx, second_rx) = tokio::sync::mpsc::channel(1); - let mut first = cli - .stream_microphone(tonic::Request::new( - tokio_stream::wrappers::ReceiverStream::new(first_rx), - )) - .await - .expect("first microphone stream") - .into_inner(); + let mut first = cli + .stream_microphone(tonic::Request::new( + tokio_stream::wrappers::ReceiverStream::new(first_rx), + )) + .await + .expect("first microphone stream") + .into_inner(); - let _second = cli - .stream_microphone(tonic::Request::new( - tokio_stream::wrappers::ReceiverStream::new(second_rx), - )) - .await - .expect("second microphone stream supersedes first"); + let _second = cli + .stream_microphone(tonic::Request::new( + tokio_stream::wrappers::ReceiverStream::new(second_rx), + )) + .await + .expect("second microphone stream supersedes first"); - drop(first_tx); - let ack = tokio::time::timeout(std::time::Duration::from_secs(1), first.message()) - .await - .expect("superseded microphone ack timeout") - .expect("superseded microphone ack grpc") - .expect("superseded microphone ack item"); - assert_eq!(ack, Empty {}); + drop(first_tx); + let ack = + tokio::time::timeout(std::time::Duration::from_secs(1), first.message()) + .await + .expect("superseded microphone ack timeout") + .expect("superseded microphone ack grpc") + .expect("superseded microphone ack item"); + assert_eq!(ack, Empty {}); - server.abort(); + server.abort(); + }); }); }); } @@ -198,37 +203,39 @@ mod server_upstream_media { let rt = tokio::runtime::Runtime::new().expect("runtime"); with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || { with_var("LESAVKA_DISABLE_UVC", None::<&str>, || { - rt.block_on(async { - let (_dir, handler) = build_handler_for_tests(); - let (server, mut cli) = serve_handler(handler).await; - let (tx, rx) = tokio::sync::mpsc::channel(4); + with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || { + rt.block_on(async { + let (_dir, handler) = build_handler_for_tests(); + let (server, mut cli) = serve_handler(handler).await; + let (tx, rx) = tokio::sync::mpsc::channel(4); - tx.send(VideoPacket { - id: 2, - pts: 54_321, - data: vec![0, 0, 0, 1, 0x65, 0x88], - ..Default::default() - }) - .await - .expect("send synthetic upstream video"); - drop(tx); - - let outbound = tokio_stream::wrappers::ReceiverStream::new(rx); - let mut response = cli - .stream_camera(tonic::Request::new(outbound)) + tx.send(VideoPacket { + id: 2, + pts: 54_321, + data: vec![0, 0, 0, 1, 0x65, 0x88], + ..Default::default() + }) .await - .expect("camera stream should open"); - let ack = tokio::time::timeout( - std::time::Duration::from_secs(1), - response.get_mut().message(), - ) - .await - .expect("camera ack timeout") - .expect("camera ack grpc") - .expect("camera ack item"); - assert_eq!(ack, Empty {}); + .expect("send synthetic upstream video"); + drop(tx); - server.abort(); + let outbound = tokio_stream::wrappers::ReceiverStream::new(rx); + let mut response = cli + .stream_camera(tonic::Request::new(outbound)) + .await + .expect("camera stream should open"); + let ack = tokio::time::timeout( + std::time::Duration::from_secs(1), + response.get_mut().message(), + ) + .await + .expect("camera ack timeout") + .expect("camera ack grpc") + .expect("camera ack item"); + assert_eq!(ack, Empty {}); + + server.abort(); + }); }); }); }); @@ -240,48 +247,52 @@ mod server_upstream_media { let rt = tokio::runtime::Runtime::new().expect("runtime"); with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || { with_var("LESAVKA_DISABLE_UVC", None::<&str>, || { - rt.block_on(async { - let (_dir, handler) = build_handler_for_tests(); - let (server, mut cli) = serve_handler(handler).await; - let (first_tx, first_rx) = tokio::sync::mpsc::channel(4); - let (second_tx, second_rx) = tokio::sync::mpsc::channel(1); + with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || { + rt.block_on(async { + let (_dir, handler) = build_handler_for_tests(); + let (server, mut cli) = serve_handler(handler).await; + let (first_tx, first_rx) = tokio::sync::mpsc::channel(4); + let (second_tx, second_rx) = tokio::sync::mpsc::channel(1); - let mut first = cli - .stream_camera(tonic::Request::new( - tokio_stream::wrappers::ReceiverStream::new(first_rx), - )) - .await - .expect("first camera stream") - .into_inner(); - - let _second = cli - .stream_camera(tonic::Request::new( - tokio_stream::wrappers::ReceiverStream::new(second_rx), - )) - .await - .expect("second camera stream supersedes first"); - - first_tx - .send(VideoPacket { - id: 2, - pts: 99, - data: vec![0, 0, 0, 1, 0x65], - ..Default::default() - }) - .await - .expect("send packet to first stream"); - drop(first_tx); - - let ack = - tokio::time::timeout(std::time::Duration::from_secs(1), first.message()) + let mut first = cli + .stream_camera(tonic::Request::new( + tokio_stream::wrappers::ReceiverStream::new(first_rx), + )) .await - .expect("superseded camera ack timeout") - .expect("superseded camera ack grpc") - .expect("superseded camera ack item"); - assert_eq!(ack, Empty {}); - drop(second_tx); + .expect("first camera stream") + .into_inner(); - server.abort(); + let _second = cli + .stream_camera(tonic::Request::new( + tokio_stream::wrappers::ReceiverStream::new(second_rx), + )) + .await + .expect("second camera stream supersedes first"); + + first_tx + .send(VideoPacket { + id: 2, + pts: 99, + data: vec![0, 0, 0, 1, 0x65], + ..Default::default() + }) + .await + .expect("send packet to first stream"); + drop(first_tx); + + let ack = tokio::time::timeout( + std::time::Duration::from_secs(1), + first.message(), + ) + .await + .expect("superseded camera ack timeout") + .expect("superseded camera ack grpc") + .expect("superseded camera ack item"); + assert_eq!(ack, Empty {}); + drop(second_tx); + + server.abort(); + }); }); }); }); diff --git a/testing/tests/support/live_capture_clock_shim.rs b/testing/tests/support/live_capture_clock_shim.rs new file mode 100644 index 0000000..982d133 --- /dev/null +++ b/testing/tests/support/live_capture_clock_shim.rs @@ -0,0 +1,82 @@ +use std::sync::{Mutex, OnceLock}; +use std::time::{Duration, Instant}; + +fn capture_clock_origin() -> &'static Instant { + static ORIGIN: OnceLock = OnceLock::new(); + ORIGIN.get_or_init(Instant::now) +} + +pub fn capture_pts_us() -> u64 { + capture_clock_origin().elapsed().as_micros() as u64 +} + +pub fn upstream_timing_trace_enabled() -> bool { + std::env::var("LESAVKA_UPSTREAM_TIMING_TRACE") + .ok() + .map(|value| { + let trimmed = value.trim(); + !(trimmed.eq_ignore_ascii_case("0") + || trimmed.eq_ignore_ascii_case("false") + || trimmed.eq_ignore_ascii_case("no") + || trimmed.eq_ignore_ascii_case("off")) + }) + .unwrap_or(false) +} + +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +pub struct RebasedSourcePts { + pub packet_pts_us: u64, + pub capture_now_us: u64, + pub source_pts_us: Option, + pub source_base_us: Option, + pub capture_base_us: Option, + pub used_source_pts: bool, +} + +#[derive(Debug, Default)] +struct SourcePtsRebaserState { + source_base_us: Option, + capture_base_us: Option, + last_packet_pts_us: Option, +} + +#[derive(Debug, Default)] +pub struct SourcePtsRebaser { + state: Mutex, +} + +impl SourcePtsRebaser { + pub fn rebase_or_now(&self, source_pts_us: Option, min_step_us: u64) -> RebasedSourcePts { + let capture_now_us = capture_pts_us(); + let mut state = self + .state + .lock() + .expect("source pts rebaser mutex poisoned"); + let mut packet_pts_us = capture_now_us; + let mut used_source_pts = false; + + if let Some(source_pts_us) = source_pts_us { + let source_base_us = *state.source_base_us.get_or_insert(source_pts_us); + let capture_base_us = *state.capture_base_us.get_or_insert(capture_now_us); + packet_pts_us = + capture_base_us.saturating_add(source_pts_us.saturating_sub(source_base_us)); + used_source_pts = true; + } + + if let Some(last_packet_pts_us) = state.last_packet_pts_us + && packet_pts_us <= last_packet_pts_us + { + packet_pts_us = last_packet_pts_us.saturating_add(min_step_us.max(1)); + } + + state.last_packet_pts_us = Some(packet_pts_us); + RebasedSourcePts { + packet_pts_us, + capture_now_us, + source_pts_us, + source_base_us: state.source_base_us, + capture_base_us: state.capture_base_us, + used_source_pts, + } + } +}