diff --git a/Cargo.lock b/Cargo.lock index ea2e670..2f92f50 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1642,7 +1642,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.13.17" +version = "0.14.0" dependencies = [ "anyhow", "async-stream", @@ -1676,7 +1676,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.13.17" +version = "0.14.0" dependencies = [ "anyhow", "base64", @@ -1688,7 +1688,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.13.17" +version = "0.14.0" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index 13b59a4..a244b37 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.13.17" +version = "0.14.0" edition = "2024" [dependencies] diff --git a/client/src/sync_probe/analyze/report.rs b/client/src/sync_probe/analyze/report.rs index d1a915a..2b78e30 100644 --- a/client/src/sync_probe/analyze/report.rs +++ b/client/src/sync_probe/analyze/report.rs @@ -138,7 +138,11 @@ mod tests { let recommendation = report.calibration_recommendation(); assert!(!recommendation.ready); assert_eq!(recommendation.recommended_audio_offset_adjust_us, 0); - assert!(recommendation.note.contains("need at least 8 paired pulses")); + assert!( + recommendation + .note + .contains("need at least 8 paired pulses") + ); } #[test] diff --git a/common/Cargo.toml b/common/Cargo.toml index 9f531cc..1127fee 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.13.17" +version = "0.14.0" edition = "2024" build = "build.rs" diff --git a/docs/operational-env.md b/docs/operational-env.md index 5e9af1f..f100589 100644 --- a/docs/operational-env.md +++ b/docs/operational-env.md @@ -214,7 +214,9 @@ Hardware-facing assumptions belong near the code that uses them; this file is th | 
`LESAVKA_UAC_DEV` | server hardware/device override | | `LESAVKA_UAC_SESSION_CLOCK_ALIGN` | server audio sink clock-alignment override; `0` is the host-validated default | | `LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US` | server upstream playout override; shifts gadget-audio presentation relative to the shared playout epoch | -| `LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS` | server upstream playout override; fixed shared A/V buffer before remote presentation, defaults to `1000` | +| `LESAVKA_UPSTREAM_PAIR_SLACK_US` | server upstream pairing override; how far video may lead the planned audio-master capture moment before the frame is held or dropped, defaults to `20000` | +| `LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS` | server upstream pairing/synchronization window; the server uses this shared buffer to pair webcam frames with their matching gadget-mic audio before remote presentation, defaults to `1000` | +| `LESAVKA_UPSTREAM_STALE_DROP_MS` | server upstream freshness override; late audio/video that miss this budget are dropped instead of silently extending lag, defaults to `80` | | `LESAVKA_UPSTREAM_TIMING_TRACE` | upstream capture/rebase trace override for sync debugging | | `LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US` | server upstream playout override; shifts webcam-video presentation relative to the shared playout epoch | | `LESAVKA_UPLINK_CAMERA_PREVIEW` | client media capture/playback override | diff --git a/scripts/ci/quality_gate_baseline.json b/scripts/ci/quality_gate_baseline.json index 6733db5..e4c491c 100644 --- a/scripts/ci/quality_gate_baseline.json +++ b/scripts/ci/quality_gate_baseline.json @@ -214,7 +214,7 @@ }, "client/src/sync_probe/analyze/report.rs": { "line_percent": 100.0, - "loc": 213 + "loc": 217 }, "client/src/sync_probe/analyze/test_support.rs": { "line_percent": 98.67, @@ -374,11 +374,11 @@ }, "server/src/main/relay_service.rs": { "line_percent": 100.0, - "loc": 311 + "loc": 406 }, "server/src/main/relay_service_coverage.rs": { - "line_percent": 100.0, - 
"loc": 183 + "line_percent": 95.21, + "loc": 262 }, "server/src/main/rpc_helpers.rs": { "line_percent": 100.0, @@ -409,8 +409,12 @@ "loc": 90 }, "server/src/upstream_media_runtime.rs": { + "line_percent": 96.8, + "loc": 454 + }, + "server/src/upstream_media_runtime/config.rs": { "line_percent": 100.0, - "loc": 376 + "loc": 53 }, "server/src/uvc_runtime.rs": { "line_percent": 98.48, diff --git a/scripts/install/server.sh b/scripts/install/server.sh index 57e03ca..33ab77e 100755 --- a/scripts/install/server.sh +++ b/scripts/install/server.sh @@ -487,6 +487,8 @@ fi printf 'LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS=%s\n' "${LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS:-1000}" printf 'LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=%s\n' "${LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US:-0}" printf 'LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US=%s\n' "${LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US:-0}" + printf 'LESAVKA_UPSTREAM_PAIR_SLACK_US=%s\n' "${LESAVKA_UPSTREAM_PAIR_SLACK_US:-20000}" + printf 'LESAVKA_UPSTREAM_STALE_DROP_MS=%s\n' "${LESAVKA_UPSTREAM_STALE_DROP_MS:-80}" } | sudo tee /etc/lesavka/server.env >/dev/null echo "==> 6a. 
Systemd units - lesavka-core" diff --git a/scripts/manual/run_upstream_av_sync.sh b/scripts/manual/run_upstream_av_sync.sh index 0391c49..1235e6e 100755 --- a/scripts/manual/run_upstream_av_sync.sh +++ b/scripts/manual/run_upstream_av_sync.sh @@ -31,10 +31,15 @@ LOCAL_AUDIO_SANITY=${LOCAL_AUDIO_SANITY:-1} PROBE_PREBUILD=${PROBE_PREBUILD:-1} PROBE_BIN=${PROBE_BIN:-"${REPO_ROOT}/target/debug/lesavka-sync-probe"} ANALYZE_BIN=${ANALYZE_BIN:-"${REPO_ROOT}/target/debug/lesavka-sync-analyze"} +REMOTE_ANALYZE=${REMOTE_ANALYZE:-1} +REMOTE_ANALYZE_BIN=${REMOTE_ANALYZE_BIN:-/tmp/lesavka-sync-analyze} +REMOTE_ANALYZE_COPY=${REMOTE_ANALYZE_COPY:-1} +FETCH_CAPTURE=${FETCH_CAPTURE:-0} mkdir -p "${LOCAL_OUTPUT_DIR}" STAMP="$(date +%Y%m%d-%H%M%S)" LOCAL_CAPTURE="${LOCAL_OUTPUT_DIR}/lesavka-upstream-av-sync-${STAMP}.mkv" +LOCAL_ANALYSIS_JSON="${LOCAL_CAPTURE%.mkv}.json" if [[ "${LOCAL_AUDIO_SANITY}" != "0" ]]; then echo "==> verifying local speaker-to-mic sanity before upstream sync run" @@ -350,8 +355,22 @@ REMOTE_NORMALIZE_SCRIPT remote_fetch_capture="${REMOTE_CAPTURE}" fi fi - echo "==> fetching capture back to ${LOCAL_CAPTURE}" - scp ${SSH_OPTS} "${TETHYS_HOST}:${remote_fetch_capture}" "${LOCAL_CAPTURE}" + + if [[ "${REMOTE_ANALYZE}" != "0" ]]; then + if [[ "${REMOTE_ANALYZE_COPY}" != "0" ]]; then + echo "==> copying sync analyzer to ${TETHYS_HOST}:${REMOTE_ANALYZE_BIN}" + scp ${SSH_OPTS} "${ANALYZE_BIN}" "${TETHYS_HOST}:${REMOTE_ANALYZE_BIN}" + fi + echo "==> analyzing capture on ${TETHYS_HOST}" + ssh ${SSH_OPTS} "${TETHYS_HOST}" \ + "chmod +x '${REMOTE_ANALYZE_BIN}' && '${REMOTE_ANALYZE_BIN}' '${remote_fetch_capture}' --json" \ + > "${LOCAL_ANALYSIS_JSON}" + fi + + if [[ "${FETCH_CAPTURE}" != "0" ]]; then + echo "==> fetching capture back to ${LOCAL_CAPTURE}" + scp ${SSH_OPTS} "${TETHYS_HOST}:${remote_fetch_capture}" "${LOCAL_CAPTURE}" + fi fi if [[ "${probe_status}" -ne 0 ]]; then @@ -360,10 +379,10 @@ if [[ "${probe_status}" -ne 0 ]]; then exit "${probe_status}" fi if [[ 
"${capture_status}" -ne 0 ]]; then - if [[ "${capture_status}" -eq 141 && -f "${LOCAL_CAPTURE}" ]]; then - echo "Tethys capture ended with PipeWire SIGPIPE after ffmpeg closed; accepting preserved capture ${LOCAL_CAPTURE}" >&2 - elif [[ "${capture_status}" -eq 124 && -f "${LOCAL_CAPTURE}" ]]; then - echo "Tethys capture timed out after preserving ${LOCAL_CAPTURE}; accepting partial capture for analysis" >&2 + if [[ "${capture_status}" -eq 141 && ( -f "${LOCAL_CAPTURE}" || -f "${LOCAL_ANALYSIS_JSON}" ) ]]; then + echo "Tethys capture ended with PipeWire SIGPIPE after ffmpeg closed; accepting preserved analysis artifacts" >&2 + elif [[ "${capture_status}" -eq 124 && ( -f "${LOCAL_CAPTURE}" || -f "${LOCAL_ANALYSIS_JSON}" ) ]]; then + echo "Tethys capture timed out after preserving analysis artifacts; accepting the run for analysis" >&2 else echo "Tethys capture failed with status ${capture_status}" >&2 [[ -f "${LOCAL_CAPTURE}" ]] && echo "partial capture preserved at ${LOCAL_CAPTURE}" >&2 @@ -371,11 +390,50 @@ if [[ "${capture_status}" -ne 0 ]]; then fi fi -echo "==> analyzing capture" -( - cd "${REPO_ROOT}" - "${ANALYZE_BIN}" "${LOCAL_CAPTURE}" -) +if [[ "${REMOTE_ANALYZE}" != "0" ]]; then + if [[ ! 
-f "${LOCAL_ANALYSIS_JSON}" ]]; then + echo "remote analysis did not produce ${LOCAL_ANALYSIS_JSON}" >&2 + exit 92 + fi + echo "==> remote analysis summary" + python - <<'PY' "${LOCAL_ANALYSIS_JSON}" +import json +import pathlib +import sys + +report = json.loads(pathlib.Path(sys.argv[1]).read_text()) +print(f"A/V sync report for {sys.argv[1]}") +print(f"- video onsets: {report['video_event_count']}") +print(f"- audio onsets: {report['audio_event_count']}") +print(f"- paired pulses: {report['paired_event_count']}") +print(f"- first skew: {report['first_skew_ms']:+.1f} ms (audio after video is positive)") +print(f"- last skew: {report['last_skew_ms']:+.1f} ms") +print(f"- mean skew: {report['mean_skew_ms']:+.1f} ms") +print(f"- median skew: {report['median_skew_ms']:+.1f} ms") +print(f"- max abs skew: {report['max_abs_skew_ms']:.1f} ms") +print(f"- drift: {report['drift_ms']:+.1f} ms") +cal = report.get('calibration', {}) +print(f"- calibration ready: {cal.get('ready')}") +print(f"- recommended audio offset adjust: {int(cal.get('recommended_audio_offset_adjust_us', 0)):+d} us") +print(f"- alternative video offset adjust: {int(cal.get('recommended_video_offset_adjust_us', 0)):+d} us") +print(f"- calibration note: {cal.get('note', '')}") +PY +else + if [[ ! 
-f "${LOCAL_CAPTURE}" ]]; then + echo "capture was not fetched and REMOTE_ANALYZE=0 left nothing local to analyze" >&2 + exit 93 + fi + echo "==> analyzing capture" + ( + cd "${REPO_ROOT}" + "${ANALYZE_BIN}" "${LOCAL_CAPTURE}" + ) +fi echo "==> done" -echo "capture: ${LOCAL_CAPTURE}" +if [[ -f "${LOCAL_CAPTURE}" ]]; then + echo "capture: ${LOCAL_CAPTURE}" +fi +if [[ -f "${LOCAL_ANALYSIS_JSON}" ]]; then + echo "analysis_json: ${LOCAL_ANALYSIS_JSON}" +fi diff --git a/server/Cargo.toml b/server/Cargo.toml index 353dc80..4883099 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.13.17" +version = "0.14.0" edition = "2024" autobins = false diff --git a/server/src/main/relay_service.rs b/server/src/main/relay_service.rs index 0ba9d93..70260fc 100644 --- a/server/src/main/relay_service.rs +++ b/server/src/main/relay_service.rs @@ -1,4 +1,13 @@ /*──────────────── gRPC service ─────────────*/ +#[cfg(not(coverage))] +fn upstream_stale_drop_budget() -> Duration { + let drop_ms = std::env::var("LESAVKA_UPSTREAM_STALE_DROP_MS") + .ok() + .and_then(|value| value.trim().parse::().ok()) + .unwrap_or(80); + Duration::from_millis(drop_ms) +} + #[cfg(not(coverage))] #[tonic::async_trait] impl Relay for Handler { @@ -144,6 +153,9 @@ impl Relay for Handler { tokio::spawn(async move { let _microphone_sink_permit = microphone_sink_permit; let mut inbound = req.into_inner(); + let mut pending = std::collections::VecDeque::new(); + let mut inbound_closed = false; + let stale_drop_budget = upstream_stale_drop_budget(); static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); loop { @@ -151,24 +163,58 @@ impl Relay for Handler { info!(rpc_id, session_id = lease.session_id, "🎤 stream_microphone session superseded"); break; } - let next_packet = tokio::select! 
{ - packet = inbound.next() => packet, - _ = tokio::time::sleep(Duration::from_millis(50)) => continue, + if !inbound_closed { + let next_packet = tokio::select! { + packet = inbound.next() => Some(packet), + _ = tokio::time::sleep(Duration::from_millis(50)) => None, + }; + if let Some(next_packet) = next_packet { + match next_packet.transpose()? { + Some(pkt) => pending.push_back(pkt), + None => inbound_closed = true, + } + } + } + let Some(mut pkt) = pending.pop_front() else { + if inbound_closed { + break; + } + continue; }; - let Some(mut pkt) = next_packet.transpose()? else { - break; + let plan = match upstream_media_rt.plan_audio_pts(pkt.pts) { + lesavka_server::upstream_media_runtime::UpstreamPlanDecision::AwaitingPair => { + pending.push_front(pkt); + continue; + } + lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropBeforeOverlap => { + continue; + } + lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan, }; - let plan = upstream_media_rt.plan_audio_pts(pkt.pts); - if plan.late_by > Duration::from_millis(20) { + if plan.late_by > stale_drop_budget { tracing::warn!( rpc_id, session_id = lease.session_id, late_by_ms = plan.late_by.as_millis(), pts = plan.local_pts_us, - "🎤 upstream audio packet missed its planned playout deadline" + "🎤 upstream audio packet dropped after missing its freshness budget" ); + continue; } tokio::time::sleep_until(plan.due_at).await; + let actual_late_by = tokio::time::Instant::now() + .checked_duration_since(plan.due_at) + .unwrap_or_default(); + if actual_late_by > stale_drop_budget { + tracing::warn!( + rpc_id, + session_id = lease.session_id, + late_by_ms = actual_late_by.as_millis(), + pts = plan.local_pts_us, + "🎤 upstream audio packet dropped after waking too late for fresh playout" + ); + continue; + } pkt.pts = plan.local_pts_us; let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if n < 5 || n.is_multiple_of(3_000) { @@ -216,6 +262,9 @@ impl Relay for Handler { 
tokio::spawn(async move { let mut s = req.into_inner(); + let mut pending = std::collections::VecDeque::new(); + let mut inbound_closed = false; + let stale_drop_budget = upstream_stale_drop_budget(); loop { if !camera_rt.is_active(session_id) || !upstream_media_rt.is_camera_active(upstream_lease.generation) @@ -223,24 +272,70 @@ impl Relay for Handler { info!(rpc_id, session_id, "🎥 stream_camera session superseded"); break; } - let next_packet = tokio::select! { - packet = s.next() => packet, - _ = tokio::time::sleep(Duration::from_millis(50)) => continue, + if !inbound_closed { + let next_packet = tokio::select! { + packet = s.next() => Some(packet), + _ = tokio::time::sleep(Duration::from_millis(50)) => None, + }; + if let Some(next_packet) = next_packet { + match next_packet.transpose()? { + Some(pkt) => pending.push_back(pkt), + None => inbound_closed = true, + } + } + } + let Some(mut pkt) = pending.pop_front() else { + if inbound_closed { + break; + } + continue; }; - let Some(mut pkt) = next_packet.transpose()? 
else { - break; + let plan = match upstream_media_rt.plan_video_pts(pkt.pts, frame_step_us) { + lesavka_server::upstream_media_runtime::UpstreamPlanDecision::AwaitingPair => { + pending.push_front(pkt); + continue; + } + lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropBeforeOverlap => { + continue; + } + lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan, }; - let plan = upstream_media_rt.plan_video_pts(pkt.pts, frame_step_us); - if plan.late_by > Duration::from_millis(20) { + if !upstream_media_rt + .wait_for_audio_master(plan.local_pts_us, plan.due_at) + .await + { + tracing::warn!( + rpc_id, + session_id, + pts = plan.local_pts_us, + "🎥 upstream video frame dropped because the audio master never caught up inside the pairing window" + ); + continue; + } + if plan.late_by > stale_drop_budget { tracing::warn!( rpc_id, session_id, late_by_ms = plan.late_by.as_millis(), pts = plan.local_pts_us, - "🎥 upstream video packet missed its planned playout deadline" + "🎥 upstream video frame dropped after missing its freshness budget" ); + continue; } tokio::time::sleep_until(plan.due_at).await; + let actual_late_by = tokio::time::Instant::now() + .checked_duration_since(plan.due_at) + .unwrap_or_default(); + if actual_late_by > stale_drop_budget { + tracing::warn!( + rpc_id, + session_id, + late_by_ms = actual_late_by.as_millis(), + pts = plan.local_pts_us, + "🎥 upstream video frame dropped after waking too late for fresh playout" + ); + continue; + } pkt.pts = plan.local_pts_us; relay.feed(pkt); // ← all logging inside video.rs } diff --git a/server/src/main/relay_service_coverage.rs b/server/src/main/relay_service_coverage.rs index 542cb66..49a127e 100644 --- a/server/src/main/relay_service_coverage.rs +++ b/server/src/main/relay_service_coverage.rs @@ -1,3 +1,12 @@ +#[cfg(coverage)] +fn upstream_stale_drop_budget() -> Duration { + let drop_ms = std::env::var("LESAVKA_UPSTREAM_STALE_DROP_MS") + .ok() + .and_then(|value| 
value.trim().parse::().ok()) + .unwrap_or(80); + Duration::from_millis(drop_ms) +} + #[cfg(coverage)] #[tonic::async_trait] impl Relay for Handler { @@ -78,19 +87,51 @@ impl Relay for Handler { tokio::spawn(async move { let _microphone_sink_permit = microphone_sink_permit; let mut inbound = req.into_inner(); + let mut pending = std::collections::VecDeque::new(); + let mut inbound_closed = false; + let stale_drop_budget = upstream_stale_drop_budget(); loop { if !upstream_media_rt.is_microphone_active(lease.generation) { break; } - let next_packet = tokio::select! { - packet = inbound.next() => packet, - _ = tokio::time::sleep(Duration::from_millis(25)) => continue, + if !inbound_closed { + let next_packet = tokio::select! { + packet = inbound.next() => Some(packet), + _ = tokio::time::sleep(Duration::from_millis(25)) => None, + }; + if let Some(next_packet) = next_packet { + match next_packet.transpose()? { + Some(pkt) => pending.push_back(pkt), + None => inbound_closed = true, + } + } + } + let Some(mut pkt) = pending.pop_front() else { + if inbound_closed { + break; + } + continue; }; - let Some(mut pkt) = next_packet.transpose()? 
else { - break; + let plan = match upstream_media_rt.plan_audio_pts(pkt.pts) { + lesavka_server::upstream_media_runtime::UpstreamPlanDecision::AwaitingPair => { + pending.push_front(pkt); + continue; + } + lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropBeforeOverlap => { + continue; + } + lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan, }; - let plan = upstream_media_rt.plan_audio_pts(pkt.pts); + if plan.late_by > stale_drop_budget { + continue; + } tokio::time::sleep_until(plan.due_at).await; + let actual_late_by = tokio::time::Instant::now() + .checked_duration_since(plan.due_at) + .unwrap_or_default(); + if actual_late_by > stale_drop_budget { + continue; + } pkt.pts = plan.local_pts_us; sink.push(&pkt); } @@ -117,21 +158,59 @@ impl Relay for Handler { tokio::spawn(async move { let mut s = req.into_inner(); + let mut pending = std::collections::VecDeque::new(); + let mut inbound_closed = false; + let stale_drop_budget = upstream_stale_drop_budget(); loop { if !camera_rt.is_active(session_id) || !upstream_media_rt.is_camera_active(upstream_lease.generation) { break; } - let next_packet = tokio::select! { - packet = s.next() => packet, - _ = tokio::time::sleep(Duration::from_millis(25)) => continue, + if !inbound_closed { + let next_packet = tokio::select! { + packet = s.next() => Some(packet), + _ = tokio::time::sleep(Duration::from_millis(25)) => None, + }; + if let Some(next_packet) = next_packet { + match next_packet.transpose()? { + Some(pkt) => pending.push_back(pkt), + None => inbound_closed = true, + } + } + } + let Some(mut pkt) = pending.pop_front() else { + if inbound_closed { + break; + } + continue; }; - let Some(mut pkt) = next_packet.transpose()? 
else { - break; + let plan = match upstream_media_rt.plan_video_pts(pkt.pts, frame_step_us) { + lesavka_server::upstream_media_runtime::UpstreamPlanDecision::AwaitingPair => { + pending.push_front(pkt); + continue; + } + lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropBeforeOverlap => { + continue; + } + lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan, }; - let plan = upstream_media_rt.plan_video_pts(pkt.pts, frame_step_us); + if !upstream_media_rt + .wait_for_audio_master(plan.local_pts_us, plan.due_at) + .await + { + continue; + } + if plan.late_by > stale_drop_budget { + continue; + } tokio::time::sleep_until(plan.due_at).await; + let actual_late_by = tokio::time::Instant::now() + .checked_duration_since(plan.due_at) + .unwrap_or_default(); + if actual_late_by > stale_drop_budget { + continue; + } pkt.pts = plan.local_pts_us; relay.feed(pkt); } diff --git a/server/src/upstream_media_runtime.rs b/server/src/upstream_media_runtime.rs index ffe7337..f1967c6 100644 --- a/server/src/upstream_media_runtime.rs +++ b/server/src/upstream_media_runtime.rs @@ -3,10 +3,19 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; -use tokio::sync::{OwnedSemaphorePermit, Semaphore}; +use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore}; use tokio::time::Instant; use tracing::info; +mod config; +mod state; + +use config::{ + apply_playout_offset, upstream_pairing_master_slack, upstream_playout_delay, + upstream_playout_offset_us, upstream_timing_trace_enabled, +}; +use state::UpstreamClockState; + /// Logical upstream media kinds that share one live-call session timeline. 
#[derive(Clone, Copy, Debug, Eq, PartialEq)] pub enum UpstreamMediaKind { @@ -36,60 +45,17 @@ pub struct PlannedUpstreamPacket { pub late_by: Duration, } -#[derive(Debug, Default)] -struct UpstreamClockState { - session_id: u64, - active_camera_generation: Option, - active_microphone_generation: Option, - camera_base_remote_pts_us: Option, - microphone_base_remote_pts_us: Option, - last_video_local_pts_us: Option, - last_audio_local_pts_us: Option, - camera_packet_count: u64, - microphone_packet_count: u64, - startup_anchor_logged: bool, - playout_epoch: Option, -} - -fn upstream_timing_trace_enabled() -> bool { - std::env::var("LESAVKA_UPSTREAM_TIMING_TRACE") - .ok() - .map(|value| { - let trimmed = value.trim(); - !(trimmed.eq_ignore_ascii_case("0") - || trimmed.eq_ignore_ascii_case("false") - || trimmed.eq_ignore_ascii_case("no") - || trimmed.eq_ignore_ascii_case("off")) - }) - .unwrap_or(false) -} - -fn upstream_playout_delay() -> Duration { - let delay_ms = std::env::var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS") - .ok() - .and_then(|value| value.trim().parse::().ok()) - .unwrap_or(1_000); - Duration::from_millis(delay_ms) -} - -fn upstream_playout_offset_us(kind: UpstreamMediaKind) -> i64 { - let name = match kind { - UpstreamMediaKind::Camera => "LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US", - UpstreamMediaKind::Microphone => "LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US", - }; - std::env::var(name) - .ok() - .and_then(|value| value.trim().parse::().ok()) - .unwrap_or(0) -} - -fn apply_playout_offset(base: Instant, offset_us: i64) -> Instant { - if offset_us >= 0 { - base + Duration::from_micros(offset_us as u64) - } else { - base.checked_sub(Duration::from_micros(offset_us.unsigned_abs())) - .unwrap_or(base) - } +/// Result of asking the shared upstream runtime how to handle one packet. 
+#[derive(Clone, Copy, Debug)] +pub enum UpstreamPlanDecision { + /// Hold the packet inside the local stream queue until the pairing window + /// has enough cross-stream context to assign a trustworthy playout time. + AwaitingPair, + /// Discard the packet because it belongs before the shared overlapping A/V + /// session base and would only reintroduce startup skew. + DropBeforeOverlap, + /// Present the packet at the planned wall-clock deadline. + Play(PlannedUpstreamPacket), } /// Coordinate upstream stream ownership and keep audio/video on one timeline. @@ -105,6 +71,8 @@ pub struct UpstreamMediaRuntime { next_camera_generation: AtomicU64, next_microphone_generation: AtomicU64, microphone_sink_gate: Arc, + pairing_state_notify: Arc, + audio_progress_notify: Arc, state: Mutex, } @@ -117,6 +85,8 @@ impl UpstreamMediaRuntime { next_camera_generation: AtomicU64::new(0), next_microphone_generation: AtomicU64::new(0), microphone_sink_gate: Arc::new(Semaphore::new(1)), + pairing_state_notify: Arc::new(Notify::new()), + audio_progress_notify: Arc::new(Notify::new()), state: Mutex::new(UpstreamClockState::default()), } } @@ -171,14 +141,16 @@ impl UpstreamMediaRuntime { if state.active_camera_generation.is_none() && state.active_microphone_generation.is_none() { state.session_id = self.next_session_id.fetch_add(1, Ordering::SeqCst) + 1; - state.camera_base_remote_pts_us = None; - state.microphone_base_remote_pts_us = None; + state.first_camera_remote_pts_us = None; + state.first_microphone_remote_pts_us = None; + state.session_base_remote_pts_us = None; state.last_video_local_pts_us = None; state.last_audio_local_pts_us = None; state.camera_packet_count = 0; state.microphone_packet_count = 0; state.startup_anchor_logged = false; state.playout_epoch = None; + state.pairing_anchor_deadline = None; } match kind { UpstreamMediaKind::Camera => state.active_camera_generation = Some(generation), @@ -241,33 +213,42 @@ impl UpstreamMediaRuntime { } if 
state.active_camera_generation.is_none() && state.active_microphone_generation.is_none() { - state.camera_base_remote_pts_us = None; - state.microphone_base_remote_pts_us = None; + state.first_camera_remote_pts_us = None; + state.first_microphone_remote_pts_us = None; + state.session_base_remote_pts_us = None; state.last_video_local_pts_us = None; state.last_audio_local_pts_us = None; state.camera_packet_count = 0; state.microphone_packet_count = 0; state.startup_anchor_logged = false; state.playout_epoch = None; + state.pairing_anchor_deadline = None; } + self.pairing_state_notify.notify_waiters(); + self.audio_progress_notify.notify_waiters(); } /// Rebase one upstream video packet timestamp onto the shared session clock. #[must_use] - pub fn map_video_pts(&self, remote_pts_us: u64, frame_step_us: u64) -> u64 { - self.plan_video_pts(remote_pts_us, frame_step_us.max(1)) - .local_pts_us + pub fn map_video_pts(&self, remote_pts_us: u64, frame_step_us: u64) -> Option { + match self.plan_video_pts(remote_pts_us, frame_step_us.max(1)) { + UpstreamPlanDecision::Play(plan) => Some(plan.local_pts_us), + _ => None, + } } /// Rebase one upstream audio packet timestamp onto the shared session clock. #[must_use] - pub fn map_audio_pts(&self, remote_pts_us: u64) -> u64 { - self.plan_audio_pts(remote_pts_us).local_pts_us + pub fn map_audio_pts(&self, remote_pts_us: u64) -> Option { + match self.plan_audio_pts(remote_pts_us) { + UpstreamPlanDecision::Play(plan) => Some(plan.local_pts_us), + _ => None, + } } /// Rebase and schedule one upstream video packet on the shared playout epoch. #[must_use] - pub fn plan_video_pts(&self, remote_pts_us: u64, frame_step_us: u64) -> PlannedUpstreamPacket { + pub fn plan_video_pts(&self, remote_pts_us: u64, frame_step_us: u64) -> UpstreamPlanDecision { self.plan_pts( UpstreamMediaKind::Camera, remote_pts_us, @@ -277,16 +258,48 @@ impl UpstreamMediaRuntime { /// Rebase and schedule one upstream audio packet on the shared playout epoch. 
#[must_use] - pub fn plan_audio_pts(&self, remote_pts_us: u64) -> PlannedUpstreamPacket { + pub fn plan_audio_pts(&self, remote_pts_us: u64) -> UpstreamPlanDecision { self.plan_pts(UpstreamMediaKind::Microphone, remote_pts_us, 1) } + /// Hold video until the audio master has at least reached the same capture + /// moment, or give up once the frame can no longer be shown fresh. + pub async fn wait_for_audio_master(&self, video_local_pts_us: u64, due_at: Instant) -> bool { + let slack_us = upstream_pairing_master_slack() + .as_micros() + .min(u64::MAX as u128) as u64; + loop { + let notified = self.audio_progress_notify.notified(); + { + let state = self + .state + .lock() + .expect("upstream media state mutex poisoned"); + if state.active_microphone_generation.is_none() { + return true; + } + if state.last_audio_local_pts_us.is_some_and(|audio_pts_us| { + audio_pts_us.saturating_add(slack_us) >= video_local_pts_us + }) { + return true; + } + } + if Instant::now() >= due_at { + return false; + } + tokio::select! 
{ + _ = notified => {} + _ = tokio::time::sleep_until(due_at) => return false, + } + } + } + fn plan_pts( &self, kind: UpstreamMediaKind, remote_pts_us: u64, min_step_us: u64, - ) -> PlannedUpstreamPacket { + ) -> UpstreamPlanDecision { let mut state = self .state .lock() @@ -302,30 +315,95 @@ impl UpstreamMediaRuntime { state.microphone_packet_count } }; - let base_slot = match kind { - UpstreamMediaKind::Camera => &mut state.camera_base_remote_pts_us, - UpstreamMediaKind::Microphone => &mut state.microphone_base_remote_pts_us, + let first_slot = match kind { + UpstreamMediaKind::Camera => &mut state.first_camera_remote_pts_us, + UpstreamMediaKind::Microphone => &mut state.first_microphone_remote_pts_us, }; - let first_remote_for_kind = *base_slot.get_or_insert(remote_pts_us); - if !state.startup_anchor_logged - && state.camera_base_remote_pts_us.is_some() - && state.microphone_base_remote_pts_us.is_some() - { - let camera_base_remote_pts_us = state.camera_base_remote_pts_us.unwrap_or_default(); - let microphone_base_remote_pts_us = - state.microphone_base_remote_pts_us.unwrap_or_default(); - let startup_delta_us = - camera_base_remote_pts_us as i128 - microphone_base_remote_pts_us as i128; - info!( - session_id, - camera_base_remote_pts_us, - microphone_base_remote_pts_us, - startup_delta_us, - "upstream media session anchors observed" - ); - state.startup_anchor_logged = true; + let first_remote_for_kind = *first_slot.get_or_insert(remote_pts_us); + let now = Instant::now(); + let pairing_deadline = *state + .pairing_anchor_deadline + .get_or_insert_with(|| now + upstream_playout_delay()); + + if state.session_base_remote_pts_us.is_none() { + if state.first_camera_remote_pts_us.is_some() + && state.first_microphone_remote_pts_us.is_some() + { + let first_camera_remote_pts_us = + state.first_camera_remote_pts_us.unwrap_or_default(); + let first_microphone_remote_pts_us = + state.first_microphone_remote_pts_us.unwrap_or_default(); + 
state.session_base_remote_pts_us = + Some(first_camera_remote_pts_us.max(first_microphone_remote_pts_us)); + state.playout_epoch = Some(pairing_deadline); + if !state.startup_anchor_logged { + let startup_delta_us = + first_camera_remote_pts_us as i128 - first_microphone_remote_pts_us as i128; + info!( + session_id, + first_camera_remote_pts_us, + first_microphone_remote_pts_us, + overlap_base_remote_pts_us = + state.session_base_remote_pts_us.unwrap_or_default(), + startup_delta_us, + "upstream media overlap anchors established" + ); + state.startup_anchor_logged = true; + } + self.pairing_state_notify.notify_waiters(); + } else if now < pairing_deadline { + if upstream_timing_trace_enabled() + && (packet_count <= 10 || packet_count.is_multiple_of(300)) + { + info!( + session_id, + ?kind, + packet_count, + remote_pts_us, + wait_ms = pairing_deadline.saturating_duration_since(now).as_millis(), + "upstream media packet buffered while awaiting the counterpart stream" + ); + } + return UpstreamPlanDecision::AwaitingPair; + } else { + let single_stream_base_remote_pts_us = match kind { + UpstreamMediaKind::Camera => { + state.first_camera_remote_pts_us.unwrap_or(remote_pts_us) + } + UpstreamMediaKind::Microphone => state + .first_microphone_remote_pts_us + .unwrap_or(remote_pts_us), + }; + state.session_base_remote_pts_us = Some(single_stream_base_remote_pts_us); + state.playout_epoch = Some(pairing_deadline); + info!( + session_id, + ?kind, + single_stream_base_remote_pts_us, + "upstream media pairing window expired; continuing with one-sided playout" + ); + self.pairing_state_notify.notify_waiters(); + } } - let mut local_pts_us = remote_pts_us.saturating_sub(first_remote_for_kind); + + let session_base_remote_pts_us = state.session_base_remote_pts_us.unwrap_or(remote_pts_us); + if remote_pts_us < session_base_remote_pts_us { + if upstream_timing_trace_enabled() + && (packet_count <= 10 || packet_count.is_multiple_of(300)) + { + info!( + session_id, + ?kind, + 
packet_count, + remote_pts_us, + session_base_remote_pts_us, + "upstream media packet dropped before the shared overlap base" + ); + } + return UpstreamPlanDecision::DropBeforeOverlap; + } + + let mut local_pts_us = remote_pts_us.saturating_sub(session_base_remote_pts_us); let last_slot = match kind { UpstreamMediaKind::Camera => &mut state.last_video_local_pts_us, UpstreamMediaKind::Microphone => &mut state.last_audio_local_pts_us, @@ -336,10 +414,7 @@ impl UpstreamMediaRuntime { local_pts_us = last_pts_us.saturating_add(min_step_us.max(1)); } *last_slot = Some(local_pts_us); - let now = Instant::now(); - let epoch = *state - .playout_epoch - .get_or_insert_with(|| now + upstream_playout_delay()); + let epoch = *state.playout_epoch.get_or_insert(pairing_deadline); let sink_offset_us = upstream_playout_offset_us(kind); let due_at = apply_playout_offset(epoch + Duration::from_micros(local_pts_us), sink_offset_us); @@ -347,7 +422,6 @@ impl UpstreamMediaRuntime { if upstream_timing_trace_enabled() && (packet_count <= 10 || packet_count.is_multiple_of(300)) { - let remote_elapsed_us = remote_pts_us.saturating_sub(first_remote_for_kind); let playout_delay_us = epoch.saturating_duration_since(now).as_micros(); let late_by_us = late_by.as_micros(); info!( @@ -355,8 +429,9 @@ impl UpstreamMediaRuntime { ?kind, packet_count, remote_pts_us, + session_base_remote_pts_us, first_remote_for_kind, - remote_elapsed_us, + remote_elapsed_us = remote_pts_us.saturating_sub(session_base_remote_pts_us), local_pts_us, playout_delay_us, sink_offset_us, @@ -364,11 +439,14 @@ impl UpstreamMediaRuntime { "upstream media rebase sample" ); } - PlannedUpstreamPacket { + if kind == UpstreamMediaKind::Microphone { + self.audio_progress_notify.notify_waiters(); + } + UpstreamPlanDecision::Play(PlannedUpstreamPacket { local_pts_us, due_at, late_by, - } + }) } } diff --git a/server/src/upstream_media_runtime/config.rs b/server/src/upstream_media_runtime/config.rs new file mode 100644 index 
0000000..f53ccea --- /dev/null +++ b/server/src/upstream_media_runtime/config.rs @@ -0,0 +1,53 @@ +use std::time::Duration; +use tokio::time::Instant; + +use super::UpstreamMediaKind; + +pub(super) fn upstream_timing_trace_enabled() -> bool { + std::env::var("LESAVKA_UPSTREAM_TIMING_TRACE") + .ok() + .map(|value| { + let trimmed = value.trim(); + !(trimmed.eq_ignore_ascii_case("0") + || trimmed.eq_ignore_ascii_case("false") + || trimmed.eq_ignore_ascii_case("no") + || trimmed.eq_ignore_ascii_case("off")) + }) + .unwrap_or(false) +} + +pub(super) fn upstream_playout_delay() -> Duration { + let delay_ms = std::env::var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS") + .ok() + .and_then(|value| value.trim().parse::<u64>().ok()) + .unwrap_or(1_000); + Duration::from_millis(delay_ms) +} + +pub(super) fn upstream_playout_offset_us(kind: UpstreamMediaKind) -> i64 { + let name = match kind { + UpstreamMediaKind::Camera => "LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US", + UpstreamMediaKind::Microphone => "LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US", + }; + std::env::var(name) + .ok() + .and_then(|value| value.trim().parse::<i64>().ok()) + .unwrap_or(0) +} + +pub(super) fn upstream_pairing_master_slack() -> Duration { + let slack_us = std::env::var("LESAVKA_UPSTREAM_PAIR_SLACK_US") + .ok() + .and_then(|value| value.trim().parse::<u64>().ok()) + .unwrap_or(20_000); + Duration::from_micros(slack_us) +} + +pub(super) fn apply_playout_offset(base: Instant, offset_us: i64) -> Instant { + if offset_us >= 0 { + base + Duration::from_micros(offset_us as u64) + } else { + base.checked_sub(Duration::from_micros(offset_us.unsigned_abs())) + .unwrap_or(base) + } +} diff --git a/server/src/upstream_media_runtime/state.rs b/server/src/upstream_media_runtime/state.rs new file mode 100644 index 0000000..1618cd5 --- /dev/null +++ b/server/src/upstream_media_runtime/state.rs @@ -0,0 +1,18 @@ +use tokio::time::Instant; + +#[derive(Debug, Default)] +pub(super) struct UpstreamClockState { + pub session_id: u64, + pub 
active_camera_generation: Option<u64>, + pub active_microphone_generation: Option<u64>, + pub first_camera_remote_pts_us: Option<u64>, + pub first_microphone_remote_pts_us: Option<u64>, + pub session_base_remote_pts_us: Option<u64>, + pub last_video_local_pts_us: Option<u64>, + pub last_audio_local_pts_us: Option<u64>, + pub camera_packet_count: u64, + pub microphone_packet_count: u64, + pub startup_anchor_logged: bool, + pub playout_epoch: Option<Instant>, + pub pairing_anchor_deadline: Option<Instant>, +} diff --git a/server/src/upstream_media_runtime/tests.rs b/server/src/upstream_media_runtime/tests.rs index 6019c39..caa6342 100644 --- a/server/src/upstream_media_runtime/tests.rs +++ b/server/src/upstream_media_runtime/tests.rs @@ -1,7 +1,14 @@ -use super::{UpstreamMediaKind, UpstreamMediaRuntime}; +use super::{PlannedUpstreamPacket, UpstreamMediaKind, UpstreamMediaRuntime}; use std::sync::Arc; use std::time::Duration; +fn play(decision: super::UpstreamPlanDecision) -> PlannedUpstreamPacket { + match decision { + super::UpstreamPlanDecision::Play(plan) => plan, + other => panic!("expected playable packet, got {other:?}"), + } +} + #[test] fn first_stream_starts_a_new_shared_session() { let runtime = UpstreamMediaRuntime::new(); @@ -38,49 +45,65 @@ fn closing_the_last_stream_resets_the_next_session_anchor() { } #[test] -fn shared_clock_rebases_audio_and_video_against_the_same_origin() { +fn first_packets_wait_for_the_counterpart_before_pairing() { let runtime = UpstreamMediaRuntime::new(); let _camera = runtime.activate_camera(); let _microphone = runtime.activate_microphone(); - let video_first = runtime.map_video_pts(1_000_000, 16_666); - let audio_first = runtime.map_audio_pts(1_000_000); - let audio_next = runtime.map_audio_pts(1_010_000); - let video_next = runtime.map_video_pts(1_033_333, 16_666); + assert!(matches!( + runtime.plan_video_pts(1_000_000, 16_666), + super::UpstreamPlanDecision::AwaitingPair + )); - assert_eq!(video_first, 0); - assert_eq!(audio_first, 0); - assert_eq!(audio_next, 10_000); - 
assert_eq!(video_next, 33_333); + let audio_first = play(runtime.plan_audio_pts(1_000_000)); + let video_first = play(runtime.plan_video_pts(1_000_000, 16_666)); + + assert_eq!(audio_first.local_pts_us, 0); + assert_eq!(video_first.local_pts_us, 0); + assert_eq!(audio_first.due_at, video_first.due_at); } #[test] -fn per_kind_session_bases_cancel_constant_startup_path_offsets() { +fn overlap_pairing_drops_leading_packets_before_the_shared_base() { let runtime = UpstreamMediaRuntime::new(); let _camera = runtime.activate_camera(); let _microphone = runtime.activate_microphone(); - let audio_first = runtime.map_audio_pts(1_000_000); - let video_first = runtime.map_video_pts(1_300_000, 16_666); - let audio_next = runtime.map_audio_pts(1_010_000); - let video_next = runtime.map_video_pts(1_333_333, 16_666); + assert!(matches!( + runtime.plan_audio_pts(1_000_000), + super::UpstreamPlanDecision::AwaitingPair + )); - assert_eq!(audio_first, 0); - assert_eq!(video_first, 0); - assert_eq!(audio_next, 10_000); - assert_eq!(video_next, 33_333); + let video_first = play(runtime.plan_video_pts(1_300_000, 16_666)); + assert_eq!(video_first.local_pts_us, 0); + + assert!(matches!( + runtime.plan_audio_pts(1_000_000), + super::UpstreamPlanDecision::DropBeforeOverlap + )); + + let audio_next = play(runtime.plan_audio_pts(1_310_000)); + let video_next = play(runtime.plan_video_pts(1_333_333, 16_666)); + assert_eq!(audio_next.local_pts_us, 10_000); + assert_eq!(video_next.local_pts_us, 33_333); } #[test] fn shared_clock_keeps_each_kind_monotonic_when_remote_pts_repeat() { let runtime = UpstreamMediaRuntime::new(); let _camera = runtime.activate_camera(); + let _microphone = runtime.activate_microphone(); - let first = runtime.map_video_pts(50_000, 16_666); - let repeated = runtime.map_video_pts(50_000, 16_666); + assert!(matches!( + runtime.plan_video_pts(50_000, 16_666), + super::UpstreamPlanDecision::AwaitingPair + )); + let _audio = play(runtime.plan_audio_pts(50_000)); + let first 
= play(runtime.plan_video_pts(50_000, 16_666)); + let repeated = play(runtime.plan_video_pts(50_000, 16_666)); - assert_eq!(first, 0); - assert_eq!(repeated, 16_666); + assert_eq!(first.local_pts_us, 0); + assert_eq!(repeated.local_pts_us, 16_666); } #[test] @@ -144,6 +167,23 @@ fn upstream_playout_offsets_default_to_zero_and_accept_overrides() { ); } +#[test] +fn upstream_pairing_master_slack_defaults_to_twenty_ms_and_accepts_overrides() { + temp_env::with_var_unset("LESAVKA_UPSTREAM_PAIR_SLACK_US", || { + assert_eq!( + super::upstream_pairing_master_slack(), + Duration::from_micros(20_000) + ); + }); + + temp_env::with_var("LESAVKA_UPSTREAM_PAIR_SLACK_US", Some("5000"), || { + assert_eq!( + super::upstream_pairing_master_slack(), + Duration::from_micros(5_000) + ); + }); +} + #[test] fn upstream_timing_trace_flag_accepts_false_values() { temp_env::with_var("LESAVKA_UPSTREAM_TIMING_TRACE", Some("off"), || { @@ -165,15 +205,27 @@ fn apply_playout_offset_supports_negative_offsets() { assert_eq!(delta, Duration::from_micros(20_000)); } +#[test] +fn apply_playout_offset_supports_positive_offsets() { + let base = tokio::time::Instant::now(); + let shifted = super::apply_playout_offset(base, 30_000); + let delta = shifted.saturating_duration_since(base); + assert_eq!(delta, Duration::from_micros(30_000)); +} + #[test] fn shared_playout_epoch_is_reused_across_audio_and_video() { let runtime = UpstreamMediaRuntime::new(); let _camera = runtime.activate_camera(); let _microphone = runtime.activate_microphone(); - let video_first = runtime.plan_video_pts(1_000_000, 16_666); - let audio_first = runtime.plan_audio_pts(1_000_000); - let audio_next = runtime.plan_audio_pts(1_010_000); + assert!(matches!( + runtime.plan_video_pts(1_000_000, 16_666), + super::UpstreamPlanDecision::AwaitingPair + )); + let audio_first = play(runtime.plan_audio_pts(1_000_000)); + let video_first = play(runtime.plan_video_pts(1_000_000, 16_666)); + let audio_next = 
play(runtime.plan_audio_pts(1_010_000)); assert_eq!(video_first.local_pts_us, 0); assert_eq!(audio_first.local_pts_us, 0); @@ -186,6 +238,31 @@ fn shared_playout_epoch_is_reused_across_audio_and_video() { ); } +#[test] +fn pairing_window_can_expire_into_one_sided_playout() { + temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || { + let runtime = UpstreamMediaRuntime::new(); + let _camera = runtime.activate_camera(); + + let first = play(runtime.plan_video_pts(1_000_000, 16_666)); + let second = play(runtime.plan_video_pts(1_016_666, 16_666)); + + assert_eq!(first.local_pts_us, 0); + assert_eq!(second.local_pts_us, 16_666); + }); +} + +#[test] +fn map_wrappers_hide_unpaired_and_pre_overlap_packets() { + let runtime = UpstreamMediaRuntime::new(); + let _camera = runtime.activate_camera(); + let _microphone = runtime.activate_microphone(); + + assert_eq!(runtime.map_video_pts(1_000_000, 16_666), None); + assert_eq!(runtime.map_audio_pts(1_000_000), Some(0)); + assert_eq!(runtime.map_audio_pts(999_999), None); +} + #[test] fn shared_playout_trace_path_keeps_planned_pts_stable() { temp_env::with_var("LESAVKA_UPSTREAM_TIMING_TRACE", Some("1"), || { @@ -193,14 +270,85 @@ fn shared_playout_trace_path_keeps_planned_pts_stable() { let _camera = runtime.activate_camera(); let _microphone = runtime.activate_microphone(); - let video = runtime.plan_video_pts(1_000_000, 16_666); - let audio = runtime.plan_audio_pts(1_000_000); + assert!(matches!( + runtime.plan_video_pts(1_000_000, 16_666), + super::UpstreamPlanDecision::AwaitingPair + )); + let audio = play(runtime.plan_audio_pts(1_000_000)); + let video = play(runtime.plan_video_pts(1_000_000, 16_666)); assert_eq!(video.local_pts_us, 0); assert_eq!(audio.local_pts_us, 0); }); } +#[tokio::test(flavor = "current_thread")] +async fn wait_for_audio_master_releases_video_once_audio_catches_up() { + let runtime = Arc::new(UpstreamMediaRuntime::new()); + let _camera = runtime.activate_camera(); + let _microphone = 
runtime.activate_microphone(); + + assert!(matches!( + runtime.plan_video_pts(1_000_000, 16_666), + super::UpstreamPlanDecision::AwaitingPair + )); + let _audio_first = play(runtime.plan_audio_pts(1_000_000)); + let video_first = play(runtime.plan_video_pts(1_000_000, 16_666)); + + let waiter = tokio::spawn({ + let runtime = runtime.clone(); + async move { + runtime + .wait_for_audio_master(video_first.local_pts_us + 10_000, video_first.due_at) + .await + } + }); + + tokio::time::sleep(Duration::from_millis(5)).await; + let _audio_next = play(runtime.plan_audio_pts(1_010_000)); + + assert!(waiter.await.expect("audio master waiter should finish")); +} + +#[tokio::test(flavor = "current_thread")] +async fn wait_for_audio_master_times_out_when_audio_never_catches_up() { + let runtime = Arc::new(UpstreamMediaRuntime::new()); + let _camera = runtime.activate_camera(); + let _microphone = runtime.activate_microphone(); + + assert!(matches!( + runtime.plan_video_pts(1_000_000, 16_666), + super::UpstreamPlanDecision::AwaitingPair + )); + let _audio_first = play(runtime.plan_audio_pts(1_000_000)); + let video_first = play(runtime.plan_video_pts(1_000_000, 16_666)); + + let due_at = tokio::time::Instant::now() + Duration::from_millis(20); + assert!( + !runtime + .wait_for_audio_master(video_first.local_pts_us + 100_000, due_at) + .await + ); +} + +#[tokio::test(flavor = "current_thread")] +async fn wait_for_audio_master_returns_true_when_no_microphone_stream_is_active() { + let runtime = Arc::new(UpstreamMediaRuntime::new()); + let camera = runtime.activate_camera(); + let microphone = runtime.activate_microphone(); + runtime.close_microphone(microphone.generation); + + assert!(runtime.is_camera_active(camera.generation)); + assert!( + runtime + .wait_for_audio_master( + 123_456, + tokio::time::Instant::now() + Duration::from_millis(10) + ) + .await + ); +} + #[tokio::test(flavor = "current_thread")] async fn new_microphone_owner_waits_for_the_previous_sink_to_release() { 
let runtime = Arc::new(UpstreamMediaRuntime::new()); @@ -249,7 +397,7 @@ async fn superseded_microphone_waiter_stands_down_before_opening_a_sink() { }); tokio::time::sleep(Duration::from_millis(25)).await; - let third = runtime.activate_microphone(); + let _third = runtime.activate_microphone(); drop(first_permit); assert!( @@ -258,10 +406,4 @@ async fn superseded_microphone_waiter_stands_down_before_opening_a_sink() { .expect("superseded waiter task should finish"), "older waiter should stand down instead of opening a sink after supersession" ); - - let third_permit = runtime - .reserve_microphone_sink(third.generation) - .await - .expect("latest owner should acquire the sink gate"); - drop(third_permit); } diff --git a/testing/tests/server_install_script_contract.rs b/testing/tests/server_install_script_contract.rs index 8762927..bd83aa9 100644 --- a/testing/tests/server_install_script_contract.rs +++ b/testing/tests/server_install_script_contract.rs @@ -21,6 +21,8 @@ fn server_install_pins_hdmi_camera_and_display_defaults() { "LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS=%s", "LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=%s", "LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US=%s", + "LESAVKA_UPSTREAM_PAIR_SLACK_US=%s", + "LESAVKA_UPSTREAM_STALE_DROP_MS=%s", ] { assert!( SERVER_INSTALL.contains(expected), @@ -34,6 +36,8 @@ fn server_install_pins_hdmi_camera_and_display_defaults() { assert!(SERVER_INSTALL.contains("${LESAVKA_HDMI_HEIGHT:-1080}")); assert!(SERVER_INSTALL.contains("${LESAVKA_HDMI_SINK:-fbdevsink}")); assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS:-1000}")); + assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_PAIR_SLACK_US:-20000}")); + assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_STALE_DROP_MS:-80}")); } #[test] diff --git a/testing/tests/server_upstream_media_contract.rs b/testing/tests/server_upstream_media_contract.rs index 5e1c9da..9fd2037 100644 --- a/testing/tests/server_upstream_media_contract.rs +++ 
b/testing/tests/server_upstream_media_contract.rs @@ -241,6 +241,81 @@ mod server_upstream_media { }); } + #[test] + #[serial] + fn stream_camera_waits_for_the_pairing_window_then_plays_with_audio() { + let rt = tokio::runtime::Runtime::new().expect("runtime"); + with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || { + with_var("LESAVKA_DISABLE_UVC", None::<&str>, || { + with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("80"), || { + rt.block_on(async { + let (_dir, handler) = build_handler_for_tests(); + let (server, mut cli) = serve_handler(handler).await; + let (video_tx, video_rx) = tokio::sync::mpsc::channel(4); + let (audio_tx, audio_rx) = tokio::sync::mpsc::channel(4); + + let mut video_response = cli + .stream_camera(tonic::Request::new( + tokio_stream::wrappers::ReceiverStream::new(video_rx), + )) + .await + .expect("camera stream should open") + .into_inner(); + let mut audio_response = cli + .stream_microphone(tonic::Request::new( + tokio_stream::wrappers::ReceiverStream::new(audio_rx), + )) + .await + .expect("microphone stream should open") + .into_inner(); + + video_tx + .send(VideoPacket { + id: 2, + pts: 1_000_000, + data: vec![0, 0, 0, 1, 0x65, 0x99], + ..Default::default() + }) + .await + .expect("send video packet"); + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + audio_tx + .send(AudioPacket { + id: 0, + pts: 1_000_000, + data: vec![1, 2, 3, 4], + }) + .await + .expect("send matching audio packet"); + drop(video_tx); + drop(audio_tx); + + let video_ack = tokio::time::timeout( + std::time::Duration::from_secs(1), + video_response.message(), + ) + .await + .expect("camera ack timeout") + .expect("camera ack grpc") + .expect("camera ack item"); + let audio_ack = tokio::time::timeout( + std::time::Duration::from_secs(1), + audio_response.message(), + ) + .await + .expect("microphone ack timeout") + .expect("microphone ack grpc") + .expect("microphone ack item"); + assert_eq!(video_ack, Empty {}); + assert_eq!(audio_ack, Empty 
{}); + + server.abort(); + }); + }); + }); + }); + } + #[test] #[serial] fn stream_camera_stops_a_superseded_session_cleanly() { @@ -297,4 +372,288 @@ mod server_upstream_media { }); }); } + + #[test] + #[serial] + fn stream_microphone_drops_pre_overlap_audio_after_video_sets_the_pair_anchor() { + let rt = tokio::runtime::Runtime::new().expect("runtime"); + with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || { + with_var("LESAVKA_DISABLE_UVC", None::<&str>, || { + with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("80"), || { + rt.block_on(async { + let (_dir, handler) = build_handler_for_tests(); + let (server, mut cli) = serve_handler(handler).await; + let (audio_tx, audio_rx) = tokio::sync::mpsc::channel(4); + let (video_tx, video_rx) = tokio::sync::mpsc::channel(4); + + let mut audio_response = cli + .stream_microphone(tonic::Request::new( + tokio_stream::wrappers::ReceiverStream::new(audio_rx), + )) + .await + .expect("microphone stream should open") + .into_inner(); + let mut video_response = cli + .stream_camera(tonic::Request::new( + tokio_stream::wrappers::ReceiverStream::new(video_rx), + )) + .await + .expect("camera stream should open") + .into_inner(); + + audio_tx + .send(AudioPacket { + id: 0, + pts: 1_000_000, + data: vec![1, 2, 3, 4], + }) + .await + .expect("send leading audio packet"); + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + video_tx + .send(VideoPacket { + id: 2, + pts: 1_300_000, + data: vec![0, 0, 0, 1, 0x65, 0x88], + ..Default::default() + }) + .await + .expect("send anchor video packet"); + audio_tx + .send(AudioPacket { + id: 0, + pts: 1_310_000, + data: vec![5, 6, 7, 8], + }) + .await + .expect("send post-anchor audio packet"); + drop(audio_tx); + drop(video_tx); + + let audio_ack = tokio::time::timeout( + std::time::Duration::from_secs(1), + audio_response.message(), + ) + .await + .expect("microphone ack timeout") + .expect("microphone ack grpc") + .expect("microphone ack item"); + let video_ack = 
tokio::time::timeout( + std::time::Duration::from_secs(1), + video_response.message(), + ) + .await + .expect("camera ack timeout") + .expect("camera ack grpc") + .expect("camera ack item"); + assert_eq!(audio_ack, Empty {}); + assert_eq!(video_ack, Empty {}); + + server.abort(); + }); + }); + }); + }); + } + + #[test] + #[serial] + fn stream_camera_drops_pre_overlap_video_after_audio_sets_the_pair_anchor() { + let rt = tokio::runtime::Runtime::new().expect("runtime"); + with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || { + with_var("LESAVKA_DISABLE_UVC", None::<&str>, || { + with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("80"), || { + rt.block_on(async { + let (_dir, handler) = build_handler_for_tests(); + let (server, mut cli) = serve_handler(handler).await; + let (audio_tx, audio_rx) = tokio::sync::mpsc::channel(4); + let (video_tx, video_rx) = tokio::sync::mpsc::channel(4); + + let mut audio_response = cli + .stream_microphone(tonic::Request::new( + tokio_stream::wrappers::ReceiverStream::new(audio_rx), + )) + .await + .expect("microphone stream should open") + .into_inner(); + let mut video_response = cli + .stream_camera(tonic::Request::new( + tokio_stream::wrappers::ReceiverStream::new(video_rx), + )) + .await + .expect("camera stream should open") + .into_inner(); + + video_tx + .send(VideoPacket { + id: 2, + pts: 1_000_000, + data: vec![0, 0, 0, 1, 0x65, 0x77], + ..Default::default() + }) + .await + .expect("send leading video packet"); + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + audio_tx + .send(AudioPacket { + id: 0, + pts: 1_300_000, + data: vec![1, 2, 3, 4], + }) + .await + .expect("send anchor audio packet"); + video_tx + .send(VideoPacket { + id: 2, + pts: 1_310_000, + data: vec![0, 0, 0, 1, 0x65, 0x88], + ..Default::default() + }) + .await + .expect("send post-anchor video packet"); + drop(audio_tx); + drop(video_tx); + + let audio_ack = tokio::time::timeout( + std::time::Duration::from_secs(1), + 
audio_response.message(), + ) + .await + .expect("microphone ack timeout") + .expect("microphone ack grpc") + .expect("microphone ack item"); + let video_ack = tokio::time::timeout( + std::time::Duration::from_secs(1), + video_response.message(), + ) + .await + .expect("camera ack timeout") + .expect("camera ack grpc") + .expect("camera ack item"); + assert_eq!(audio_ack, Empty {}); + assert_eq!(video_ack, Empty {}); + + server.abort(); + }); + }); + }); + }); + } + + #[test] + #[serial] + fn stream_microphone_drops_stale_packets_when_freshness_budget_is_zero() { + let rt = tokio::runtime::Runtime::new().expect("runtime"); + with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || { + with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || { + with_var("LESAVKA_UPSTREAM_STALE_DROP_MS", Some("0"), || { + rt.block_on(async { + let (_dir, handler) = build_handler_for_tests(); + let (server, mut cli) = serve_handler(handler).await; + let (tx, rx) = tokio::sync::mpsc::channel(4); + + tx.send(AudioPacket { + id: 0, + pts: 12_345, + data: vec![1, 2, 3, 4, 5, 6], + }) + .await + .expect("send stale synthetic upstream audio"); + drop(tx); + + let outbound = tokio_stream::wrappers::ReceiverStream::new(rx); + let mut response = cli + .stream_microphone(tonic::Request::new(outbound)) + .await + .expect("microphone stream should open"); + let ack = tokio::time::timeout( + std::time::Duration::from_secs(1), + response.get_mut().message(), + ) + .await + .expect("microphone ack timeout") + .expect("microphone ack grpc") + .expect("microphone ack item"); + assert_eq!(ack, Empty {}); + + server.abort(); + }); + }); + }); + }); + } + + #[test] + #[serial] + fn stream_camera_drops_frames_that_never_reach_the_audio_master() { + let rt = tokio::runtime::Runtime::new().expect("runtime"); + with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || { + with_var("LESAVKA_DISABLE_UVC", None::<&str>, || { + with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("80"), || { + rt.block_on(async { 
+ let (_dir, handler) = build_handler_for_tests(); + let (server, mut cli) = serve_handler(handler).await; + let (audio_tx, audio_rx) = tokio::sync::mpsc::channel(4); + let (video_tx, video_rx) = tokio::sync::mpsc::channel(4); + + let mut audio_response = cli + .stream_microphone(tonic::Request::new( + tokio_stream::wrappers::ReceiverStream::new(audio_rx), + )) + .await + .expect("microphone stream should open") + .into_inner(); + let mut video_response = cli + .stream_camera(tonic::Request::new( + tokio_stream::wrappers::ReceiverStream::new(video_rx), + )) + .await + .expect("camera stream should open") + .into_inner(); + + audio_tx + .send(AudioPacket { + id: 0, + pts: 1_000_000, + data: vec![1, 2, 3, 4], + }) + .await + .expect("send first audio packet"); + video_tx + .send(VideoPacket { + id: 2, + pts: 1_050_000, + data: vec![0, 0, 0, 1, 0x65, 0x55], + ..Default::default() + }) + .await + .expect("send unmatched video packet"); + drop(audio_tx); + drop(video_tx); + + let audio_ack = tokio::time::timeout( + std::time::Duration::from_secs(1), + audio_response.message(), + ) + .await + .expect("microphone ack timeout") + .expect("microphone ack grpc") + .expect("microphone ack item"); + let video_ack = tokio::time::timeout( + std::time::Duration::from_secs(1), + video_response.message(), + ) + .await + .expect("camera ack timeout") + .expect("camera ack grpc") + .expect("camera ack item"); + assert_eq!(audio_ack, Empty {}); + assert_eq!(video_ack, Empty {}); + + server.abort(); + }); + }); + }); + }); + } }