feat(sync): pair upstream av in shared playout window

This commit is contained in:
Brad Stein 2026-04-25 22:25:24 -03:00
parent 8dbd7497f0
commit 1153d6843d
17 changed files with 1083 additions and 185 deletions

6
Cargo.lock generated
View File

@ -1642,7 +1642,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]]
name = "lesavka_client"
version = "0.13.17"
version = "0.14.0"
dependencies = [
"anyhow",
"async-stream",
@ -1676,7 +1676,7 @@ dependencies = [
[[package]]
name = "lesavka_common"
version = "0.13.17"
version = "0.14.0"
dependencies = [
"anyhow",
"base64",
@ -1688,7 +1688,7 @@ dependencies = [
[[package]]
name = "lesavka_server"
version = "0.13.17"
version = "0.14.0"
dependencies = [
"anyhow",
"base64",

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package]
name = "lesavka_client"
version = "0.13.17"
version = "0.14.0"
edition = "2024"
[dependencies]

View File

@ -138,7 +138,11 @@ mod tests {
let recommendation = report.calibration_recommendation();
assert!(!recommendation.ready);
assert_eq!(recommendation.recommended_audio_offset_adjust_us, 0);
assert!(recommendation.note.contains("need at least 8 paired pulses"));
assert!(
recommendation
.note
.contains("need at least 8 paired pulses")
);
}
#[test]

View File

@ -1,6 +1,6 @@
[package]
name = "lesavka_common"
version = "0.13.17"
version = "0.14.0"
edition = "2024"
build = "build.rs"

View File

@ -214,7 +214,9 @@ Hardware-facing assumptions belong near the code that uses them; this file is th
| `LESAVKA_UAC_DEV` | server hardware/device override |
| `LESAVKA_UAC_SESSION_CLOCK_ALIGN` | server audio sink clock-alignment override; `0` is the host-validated default |
| `LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US` | server upstream playout override; shifts gadget-audio presentation relative to the shared playout epoch |
| `LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS` | server upstream playout override; fixed shared A/V buffer before remote presentation, defaults to `1000` |
| `LESAVKA_UPSTREAM_PAIR_SLACK_US` | server upstream pairing override; how far video may lead the planned audio-master capture moment before the frame is held or dropped, defaults to `20000` |
| `LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS` | server upstream pairing/synchronization window; the server uses this shared buffer to pair webcam frames with their matching gadget-mic audio before remote presentation, defaults to `1000` |
| `LESAVKA_UPSTREAM_STALE_DROP_MS` | server upstream freshness override; late audio/video that miss this budget are dropped instead of silently extending lag, defaults to `80` |
| `LESAVKA_UPSTREAM_TIMING_TRACE` | upstream capture/rebase trace override for sync debugging |
| `LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US` | server upstream playout override; shifts webcam-video presentation relative to the shared playout epoch |
| `LESAVKA_UPLINK_CAMERA_PREVIEW` | client media capture/playback override |

View File

@ -214,7 +214,7 @@
},
"client/src/sync_probe/analyze/report.rs": {
"line_percent": 100.0,
"loc": 213
"loc": 217
},
"client/src/sync_probe/analyze/test_support.rs": {
"line_percent": 98.67,
@ -374,11 +374,11 @@
},
"server/src/main/relay_service.rs": {
"line_percent": 100.0,
"loc": 311
"loc": 406
},
"server/src/main/relay_service_coverage.rs": {
"line_percent": 100.0,
"loc": 183
"line_percent": 95.21,
"loc": 262
},
"server/src/main/rpc_helpers.rs": {
"line_percent": 100.0,
@ -409,8 +409,12 @@
"loc": 90
},
"server/src/upstream_media_runtime.rs": {
"line_percent": 96.8,
"loc": 454
},
"server/src/upstream_media_runtime/config.rs": {
"line_percent": 100.0,
"loc": 376
"loc": 53
},
"server/src/uvc_runtime.rs": {
"line_percent": 98.48,

View File

@ -487,6 +487,8 @@ fi
printf 'LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS=%s\n' "${LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS:-1000}"
printf 'LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=%s\n' "${LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US:-0}"
printf 'LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US=%s\n' "${LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US:-0}"
printf 'LESAVKA_UPSTREAM_PAIR_SLACK_US=%s\n' "${LESAVKA_UPSTREAM_PAIR_SLACK_US:-20000}"
printf 'LESAVKA_UPSTREAM_STALE_DROP_MS=%s\n' "${LESAVKA_UPSTREAM_STALE_DROP_MS:-80}"
} | sudo tee /etc/lesavka/server.env >/dev/null
echo "==> 6a. Systemd units - lesavka-core"

View File

@ -31,10 +31,15 @@ LOCAL_AUDIO_SANITY=${LOCAL_AUDIO_SANITY:-1}
PROBE_PREBUILD=${PROBE_PREBUILD:-1}
PROBE_BIN=${PROBE_BIN:-"${REPO_ROOT}/target/debug/lesavka-sync-probe"}
ANALYZE_BIN=${ANALYZE_BIN:-"${REPO_ROOT}/target/debug/lesavka-sync-analyze"}
REMOTE_ANALYZE=${REMOTE_ANALYZE:-1}
REMOTE_ANALYZE_BIN=${REMOTE_ANALYZE_BIN:-/tmp/lesavka-sync-analyze}
REMOTE_ANALYZE_COPY=${REMOTE_ANALYZE_COPY:-1}
FETCH_CAPTURE=${FETCH_CAPTURE:-0}
mkdir -p "${LOCAL_OUTPUT_DIR}"
STAMP="$(date +%Y%m%d-%H%M%S)"
LOCAL_CAPTURE="${LOCAL_OUTPUT_DIR}/lesavka-upstream-av-sync-${STAMP}.mkv"
LOCAL_ANALYSIS_JSON="${LOCAL_CAPTURE%.mkv}.json"
if [[ "${LOCAL_AUDIO_SANITY}" != "0" ]]; then
echo "==> verifying local speaker-to-mic sanity before upstream sync run"
@ -350,8 +355,22 @@ REMOTE_NORMALIZE_SCRIPT
remote_fetch_capture="${REMOTE_CAPTURE}"
fi
fi
echo "==> fetching capture back to ${LOCAL_CAPTURE}"
scp ${SSH_OPTS} "${TETHYS_HOST}:${remote_fetch_capture}" "${LOCAL_CAPTURE}"
if [[ "${REMOTE_ANALYZE}" != "0" ]]; then
if [[ "${REMOTE_ANALYZE_COPY}" != "0" ]]; then
echo "==> copying sync analyzer to ${TETHYS_HOST}:${REMOTE_ANALYZE_BIN}"
scp ${SSH_OPTS} "${ANALYZE_BIN}" "${TETHYS_HOST}:${REMOTE_ANALYZE_BIN}"
fi
echo "==> analyzing capture on ${TETHYS_HOST}"
ssh ${SSH_OPTS} "${TETHYS_HOST}" \
"chmod +x '${REMOTE_ANALYZE_BIN}' && '${REMOTE_ANALYZE_BIN}' '${remote_fetch_capture}' --json" \
> "${LOCAL_ANALYSIS_JSON}"
fi
if [[ "${FETCH_CAPTURE}" != "0" ]]; then
echo "==> fetching capture back to ${LOCAL_CAPTURE}"
scp ${SSH_OPTS} "${TETHYS_HOST}:${remote_fetch_capture}" "${LOCAL_CAPTURE}"
fi
fi
if [[ "${probe_status}" -ne 0 ]]; then
@ -360,10 +379,10 @@ if [[ "${probe_status}" -ne 0 ]]; then
exit "${probe_status}"
fi
if [[ "${capture_status}" -ne 0 ]]; then
if [[ "${capture_status}" -eq 141 && -f "${LOCAL_CAPTURE}" ]]; then
echo "Tethys capture ended with PipeWire SIGPIPE after ffmpeg closed; accepting preserved capture ${LOCAL_CAPTURE}" >&2
elif [[ "${capture_status}" -eq 124 && -f "${LOCAL_CAPTURE}" ]]; then
echo "Tethys capture timed out after preserving ${LOCAL_CAPTURE}; accepting partial capture for analysis" >&2
if [[ "${capture_status}" -eq 141 && ( -f "${LOCAL_CAPTURE}" || -f "${LOCAL_ANALYSIS_JSON}" ) ]]; then
echo "Tethys capture ended with PipeWire SIGPIPE after ffmpeg closed; accepting preserved analysis artifacts" >&2
elif [[ "${capture_status}" -eq 124 && ( -f "${LOCAL_CAPTURE}" || -f "${LOCAL_ANALYSIS_JSON}" ) ]]; then
echo "Tethys capture timed out after preserving analysis artifacts; accepting the run for analysis" >&2
else
echo "Tethys capture failed with status ${capture_status}" >&2
[[ -f "${LOCAL_CAPTURE}" ]] && echo "partial capture preserved at ${LOCAL_CAPTURE}" >&2
@ -371,11 +390,50 @@ if [[ "${capture_status}" -ne 0 ]]; then
fi
fi
echo "==> analyzing capture"
(
cd "${REPO_ROOT}"
"${ANALYZE_BIN}" "${LOCAL_CAPTURE}"
)
if [[ "${REMOTE_ANALYZE}" != "0" ]]; then
if [[ ! -f "${LOCAL_ANALYSIS_JSON}" ]]; then
echo "remote analysis did not produce ${LOCAL_ANALYSIS_JSON}" >&2
exit 92
fi
echo "==> remote analysis summary"
python - <<'PY' "${LOCAL_ANALYSIS_JSON}"
import json
import pathlib
import sys
report = json.loads(pathlib.Path(sys.argv[1]).read_text())
print(f"A/V sync report for {sys.argv[1]}")
print(f"- video onsets: {report['video_event_count']}")
print(f"- audio onsets: {report['audio_event_count']}")
print(f"- paired pulses: {report['paired_event_count']}")
print(f"- first skew: {report['first_skew_ms']:+.1f} ms (audio after video is positive)")
print(f"- last skew: {report['last_skew_ms']:+.1f} ms")
print(f"- mean skew: {report['mean_skew_ms']:+.1f} ms")
print(f"- median skew: {report['median_skew_ms']:+.1f} ms")
print(f"- max abs skew: {report['max_abs_skew_ms']:.1f} ms")
print(f"- drift: {report['drift_ms']:+.1f} ms")
cal = report.get('calibration', {})
print(f"- calibration ready: {cal.get('ready')}")
print(f"- recommended audio offset adjust: {int(cal.get('recommended_audio_offset_adjust_us', 0)):+d} us")
print(f"- alternative video offset adjust: {int(cal.get('recommended_video_offset_adjust_us', 0)):+d} us")
print(f"- calibration note: {cal.get('note', '')}")
PY
else
if [[ ! -f "${LOCAL_CAPTURE}" ]]; then
echo "capture was not fetched and REMOTE_ANALYZE=0 left nothing local to analyze" >&2
exit 93
fi
echo "==> analyzing capture"
(
cd "${REPO_ROOT}"
"${ANALYZE_BIN}" "${LOCAL_CAPTURE}"
)
fi
echo "==> done"
echo "capture: ${LOCAL_CAPTURE}"
if [[ -f "${LOCAL_CAPTURE}" ]]; then
echo "capture: ${LOCAL_CAPTURE}"
fi
if [[ -f "${LOCAL_ANALYSIS_JSON}" ]]; then
echo "analysis_json: ${LOCAL_ANALYSIS_JSON}"
fi

View File

@ -10,7 +10,7 @@ bench = false
[package]
name = "lesavka_server"
version = "0.13.17"
version = "0.14.0"
edition = "2024"
autobins = false

View File

@ -1,4 +1,13 @@
/*──────────────── gRPC service ─────────────*/
#[cfg(not(coverage))]
fn upstream_stale_drop_budget() -> Duration {
let drop_ms = std::env::var("LESAVKA_UPSTREAM_STALE_DROP_MS")
.ok()
.and_then(|value| value.trim().parse::<u64>().ok())
.unwrap_or(80);
Duration::from_millis(drop_ms)
}
#[cfg(not(coverage))]
#[tonic::async_trait]
impl Relay for Handler {
@ -144,6 +153,9 @@ impl Relay for Handler {
tokio::spawn(async move {
let _microphone_sink_permit = microphone_sink_permit;
let mut inbound = req.into_inner();
let mut pending = std::collections::VecDeque::new();
let mut inbound_closed = false;
let stale_drop_budget = upstream_stale_drop_budget();
static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
loop {
@ -151,24 +163,58 @@ impl Relay for Handler {
info!(rpc_id, session_id = lease.session_id, "🎤 stream_microphone session superseded");
break;
}
let next_packet = tokio::select! {
packet = inbound.next() => packet,
_ = tokio::time::sleep(Duration::from_millis(50)) => continue,
if !inbound_closed {
let next_packet = tokio::select! {
packet = inbound.next() => Some(packet),
_ = tokio::time::sleep(Duration::from_millis(50)) => None,
};
if let Some(next_packet) = next_packet {
match next_packet.transpose()? {
Some(pkt) => pending.push_back(pkt),
None => inbound_closed = true,
}
}
}
let Some(mut pkt) = pending.pop_front() else {
if inbound_closed {
break;
}
continue;
};
let Some(mut pkt) = next_packet.transpose()? else {
break;
let plan = match upstream_media_rt.plan_audio_pts(pkt.pts) {
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::AwaitingPair => {
pending.push_front(pkt);
continue;
}
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropBeforeOverlap => {
continue;
}
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan,
};
let plan = upstream_media_rt.plan_audio_pts(pkt.pts);
if plan.late_by > Duration::from_millis(20) {
if plan.late_by > stale_drop_budget {
tracing::warn!(
rpc_id,
session_id = lease.session_id,
late_by_ms = plan.late_by.as_millis(),
pts = plan.local_pts_us,
"🎤 upstream audio packet missed its planned playout deadline"
"🎤 upstream audio packet dropped after missing its freshness budget"
);
continue;
}
tokio::time::sleep_until(plan.due_at).await;
let actual_late_by = tokio::time::Instant::now()
.checked_duration_since(plan.due_at)
.unwrap_or_default();
if actual_late_by > stale_drop_budget {
tracing::warn!(
rpc_id,
session_id = lease.session_id,
late_by_ms = actual_late_by.as_millis(),
pts = plan.local_pts_us,
"🎤 upstream audio packet dropped after waking too late for fresh playout"
);
continue;
}
pkt.pts = plan.local_pts_us;
let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if n < 5 || n.is_multiple_of(3_000) {
@ -216,6 +262,9 @@ impl Relay for Handler {
tokio::spawn(async move {
let mut s = req.into_inner();
let mut pending = std::collections::VecDeque::new();
let mut inbound_closed = false;
let stale_drop_budget = upstream_stale_drop_budget();
loop {
if !camera_rt.is_active(session_id)
|| !upstream_media_rt.is_camera_active(upstream_lease.generation)
@ -223,24 +272,70 @@ impl Relay for Handler {
info!(rpc_id, session_id, "🎥 stream_camera session superseded");
break;
}
let next_packet = tokio::select! {
packet = s.next() => packet,
_ = tokio::time::sleep(Duration::from_millis(50)) => continue,
if !inbound_closed {
let next_packet = tokio::select! {
packet = s.next() => Some(packet),
_ = tokio::time::sleep(Duration::from_millis(50)) => None,
};
if let Some(next_packet) = next_packet {
match next_packet.transpose()? {
Some(pkt) => pending.push_back(pkt),
None => inbound_closed = true,
}
}
}
let Some(mut pkt) = pending.pop_front() else {
if inbound_closed {
break;
}
continue;
};
let Some(mut pkt) = next_packet.transpose()? else {
break;
let plan = match upstream_media_rt.plan_video_pts(pkt.pts, frame_step_us) {
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::AwaitingPair => {
pending.push_front(pkt);
continue;
}
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropBeforeOverlap => {
continue;
}
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan,
};
let plan = upstream_media_rt.plan_video_pts(pkt.pts, frame_step_us);
if plan.late_by > Duration::from_millis(20) {
if !upstream_media_rt
.wait_for_audio_master(plan.local_pts_us, plan.due_at)
.await
{
tracing::warn!(
rpc_id,
session_id,
pts = plan.local_pts_us,
"🎥 upstream video frame dropped because the audio master never caught up inside the pairing window"
);
continue;
}
if plan.late_by > stale_drop_budget {
tracing::warn!(
rpc_id,
session_id,
late_by_ms = plan.late_by.as_millis(),
pts = plan.local_pts_us,
"🎥 upstream video packet missed its planned playout deadline"
"🎥 upstream video frame dropped after missing its freshness budget"
);
continue;
}
tokio::time::sleep_until(plan.due_at).await;
let actual_late_by = tokio::time::Instant::now()
.checked_duration_since(plan.due_at)
.unwrap_or_default();
if actual_late_by > stale_drop_budget {
tracing::warn!(
rpc_id,
session_id,
late_by_ms = actual_late_by.as_millis(),
pts = plan.local_pts_us,
"🎥 upstream video frame dropped after waking too late for fresh playout"
);
continue;
}
pkt.pts = plan.local_pts_us;
relay.feed(pkt); // ← all logging inside video.rs
}

View File

@ -1,3 +1,12 @@
#[cfg(coverage)]
fn upstream_stale_drop_budget() -> Duration {
let drop_ms = std::env::var("LESAVKA_UPSTREAM_STALE_DROP_MS")
.ok()
.and_then(|value| value.trim().parse::<u64>().ok())
.unwrap_or(80);
Duration::from_millis(drop_ms)
}
#[cfg(coverage)]
#[tonic::async_trait]
impl Relay for Handler {
@ -78,19 +87,51 @@ impl Relay for Handler {
tokio::spawn(async move {
let _microphone_sink_permit = microphone_sink_permit;
let mut inbound = req.into_inner();
let mut pending = std::collections::VecDeque::new();
let mut inbound_closed = false;
let stale_drop_budget = upstream_stale_drop_budget();
loop {
if !upstream_media_rt.is_microphone_active(lease.generation) {
break;
}
let next_packet = tokio::select! {
packet = inbound.next() => packet,
_ = tokio::time::sleep(Duration::from_millis(25)) => continue,
if !inbound_closed {
let next_packet = tokio::select! {
packet = inbound.next() => Some(packet),
_ = tokio::time::sleep(Duration::from_millis(25)) => None,
};
if let Some(next_packet) = next_packet {
match next_packet.transpose()? {
Some(pkt) => pending.push_back(pkt),
None => inbound_closed = true,
}
}
}
let Some(mut pkt) = pending.pop_front() else {
if inbound_closed {
break;
}
continue;
};
let Some(mut pkt) = next_packet.transpose()? else {
break;
let plan = match upstream_media_rt.plan_audio_pts(pkt.pts) {
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::AwaitingPair => {
pending.push_front(pkt);
continue;
}
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropBeforeOverlap => {
continue;
}
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan,
};
let plan = upstream_media_rt.plan_audio_pts(pkt.pts);
if plan.late_by > stale_drop_budget {
continue;
}
tokio::time::sleep_until(plan.due_at).await;
let actual_late_by = tokio::time::Instant::now()
.checked_duration_since(plan.due_at)
.unwrap_or_default();
if actual_late_by > stale_drop_budget {
continue;
}
pkt.pts = plan.local_pts_us;
sink.push(&pkt);
}
@ -117,21 +158,59 @@ impl Relay for Handler {
tokio::spawn(async move {
let mut s = req.into_inner();
let mut pending = std::collections::VecDeque::new();
let mut inbound_closed = false;
let stale_drop_budget = upstream_stale_drop_budget();
loop {
if !camera_rt.is_active(session_id)
|| !upstream_media_rt.is_camera_active(upstream_lease.generation)
{
break;
}
let next_packet = tokio::select! {
packet = s.next() => packet,
_ = tokio::time::sleep(Duration::from_millis(25)) => continue,
if !inbound_closed {
let next_packet = tokio::select! {
packet = s.next() => Some(packet),
_ = tokio::time::sleep(Duration::from_millis(25)) => None,
};
if let Some(next_packet) = next_packet {
match next_packet.transpose()? {
Some(pkt) => pending.push_back(pkt),
None => inbound_closed = true,
}
}
}
let Some(mut pkt) = pending.pop_front() else {
if inbound_closed {
break;
}
continue;
};
let Some(mut pkt) = next_packet.transpose()? else {
break;
let plan = match upstream_media_rt.plan_video_pts(pkt.pts, frame_step_us) {
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::AwaitingPair => {
pending.push_front(pkt);
continue;
}
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropBeforeOverlap => {
continue;
}
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan,
};
let plan = upstream_media_rt.plan_video_pts(pkt.pts, frame_step_us);
if !upstream_media_rt
.wait_for_audio_master(plan.local_pts_us, plan.due_at)
.await
{
continue;
}
if plan.late_by > stale_drop_budget {
continue;
}
tokio::time::sleep_until(plan.due_at).await;
let actual_late_by = tokio::time::Instant::now()
.checked_duration_since(plan.due_at)
.unwrap_or_default();
if actual_late_by > stale_drop_budget {
continue;
}
pkt.pts = plan.local_pts_us;
relay.feed(pkt);
}

View File

@ -3,10 +3,19 @@
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::sync::{Notify, OwnedSemaphorePermit, Semaphore};
use tokio::time::Instant;
use tracing::info;
mod config;
mod state;
use config::{
apply_playout_offset, upstream_pairing_master_slack, upstream_playout_delay,
upstream_playout_offset_us, upstream_timing_trace_enabled,
};
use state::UpstreamClockState;
/// Logical upstream media kinds that share one live-call session timeline.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum UpstreamMediaKind {
@ -36,60 +45,17 @@ pub struct PlannedUpstreamPacket {
pub late_by: Duration,
}
#[derive(Debug, Default)]
struct UpstreamClockState {
session_id: u64,
active_camera_generation: Option<u64>,
active_microphone_generation: Option<u64>,
camera_base_remote_pts_us: Option<u64>,
microphone_base_remote_pts_us: Option<u64>,
last_video_local_pts_us: Option<u64>,
last_audio_local_pts_us: Option<u64>,
camera_packet_count: u64,
microphone_packet_count: u64,
startup_anchor_logged: bool,
playout_epoch: Option<Instant>,
}
fn upstream_timing_trace_enabled() -> bool {
std::env::var("LESAVKA_UPSTREAM_TIMING_TRACE")
.ok()
.map(|value| {
let trimmed = value.trim();
!(trimmed.eq_ignore_ascii_case("0")
|| trimmed.eq_ignore_ascii_case("false")
|| trimmed.eq_ignore_ascii_case("no")
|| trimmed.eq_ignore_ascii_case("off"))
})
.unwrap_or(false)
}
fn upstream_playout_delay() -> Duration {
let delay_ms = std::env::var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS")
.ok()
.and_then(|value| value.trim().parse::<u64>().ok())
.unwrap_or(1_000);
Duration::from_millis(delay_ms)
}
fn upstream_playout_offset_us(kind: UpstreamMediaKind) -> i64 {
let name = match kind {
UpstreamMediaKind::Camera => "LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US",
UpstreamMediaKind::Microphone => "LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US",
};
std::env::var(name)
.ok()
.and_then(|value| value.trim().parse::<i64>().ok())
.unwrap_or(0)
}
fn apply_playout_offset(base: Instant, offset_us: i64) -> Instant {
if offset_us >= 0 {
base + Duration::from_micros(offset_us as u64)
} else {
base.checked_sub(Duration::from_micros(offset_us.unsigned_abs()))
.unwrap_or(base)
}
/// Result of asking the shared upstream runtime how to handle one packet.
#[derive(Clone, Copy, Debug)]
pub enum UpstreamPlanDecision {
/// Hold the packet inside the local stream queue until the pairing window
/// has enough cross-stream context to assign a trustworthy playout time.
AwaitingPair,
/// Discard the packet because it belongs before the shared overlapping A/V
/// session base and would only reintroduce startup skew.
DropBeforeOverlap,
/// Present the packet at the planned wall-clock deadline.
Play(PlannedUpstreamPacket),
}
/// Coordinate upstream stream ownership and keep audio/video on one timeline.
@ -105,6 +71,8 @@ pub struct UpstreamMediaRuntime {
next_camera_generation: AtomicU64,
next_microphone_generation: AtomicU64,
microphone_sink_gate: Arc<Semaphore>,
pairing_state_notify: Arc<Notify>,
audio_progress_notify: Arc<Notify>,
state: Mutex<UpstreamClockState>,
}
@ -117,6 +85,8 @@ impl UpstreamMediaRuntime {
next_camera_generation: AtomicU64::new(0),
next_microphone_generation: AtomicU64::new(0),
microphone_sink_gate: Arc::new(Semaphore::new(1)),
pairing_state_notify: Arc::new(Notify::new()),
audio_progress_notify: Arc::new(Notify::new()),
state: Mutex::new(UpstreamClockState::default()),
}
}
@ -171,14 +141,16 @@ impl UpstreamMediaRuntime {
if state.active_camera_generation.is_none() && state.active_microphone_generation.is_none()
{
state.session_id = self.next_session_id.fetch_add(1, Ordering::SeqCst) + 1;
state.camera_base_remote_pts_us = None;
state.microphone_base_remote_pts_us = None;
state.first_camera_remote_pts_us = None;
state.first_microphone_remote_pts_us = None;
state.session_base_remote_pts_us = None;
state.last_video_local_pts_us = None;
state.last_audio_local_pts_us = None;
state.camera_packet_count = 0;
state.microphone_packet_count = 0;
state.startup_anchor_logged = false;
state.playout_epoch = None;
state.pairing_anchor_deadline = None;
}
match kind {
UpstreamMediaKind::Camera => state.active_camera_generation = Some(generation),
@ -241,33 +213,42 @@ impl UpstreamMediaRuntime {
}
if state.active_camera_generation.is_none() && state.active_microphone_generation.is_none()
{
state.camera_base_remote_pts_us = None;
state.microphone_base_remote_pts_us = None;
state.first_camera_remote_pts_us = None;
state.first_microphone_remote_pts_us = None;
state.session_base_remote_pts_us = None;
state.last_video_local_pts_us = None;
state.last_audio_local_pts_us = None;
state.camera_packet_count = 0;
state.microphone_packet_count = 0;
state.startup_anchor_logged = false;
state.playout_epoch = None;
state.pairing_anchor_deadline = None;
}
self.pairing_state_notify.notify_waiters();
self.audio_progress_notify.notify_waiters();
}
/// Rebase one upstream video packet timestamp onto the shared session clock.
#[must_use]
pub fn map_video_pts(&self, remote_pts_us: u64, frame_step_us: u64) -> u64 {
self.plan_video_pts(remote_pts_us, frame_step_us.max(1))
.local_pts_us
pub fn map_video_pts(&self, remote_pts_us: u64, frame_step_us: u64) -> Option<u64> {
match self.plan_video_pts(remote_pts_us, frame_step_us.max(1)) {
UpstreamPlanDecision::Play(plan) => Some(plan.local_pts_us),
_ => None,
}
}
/// Rebase one upstream audio packet timestamp onto the shared session clock.
#[must_use]
pub fn map_audio_pts(&self, remote_pts_us: u64) -> u64 {
self.plan_audio_pts(remote_pts_us).local_pts_us
pub fn map_audio_pts(&self, remote_pts_us: u64) -> Option<u64> {
match self.plan_audio_pts(remote_pts_us) {
UpstreamPlanDecision::Play(plan) => Some(plan.local_pts_us),
_ => None,
}
}
/// Rebase and schedule one upstream video packet on the shared playout epoch.
#[must_use]
pub fn plan_video_pts(&self, remote_pts_us: u64, frame_step_us: u64) -> PlannedUpstreamPacket {
pub fn plan_video_pts(&self, remote_pts_us: u64, frame_step_us: u64) -> UpstreamPlanDecision {
self.plan_pts(
UpstreamMediaKind::Camera,
remote_pts_us,
@ -277,16 +258,48 @@ impl UpstreamMediaRuntime {
/// Rebase and schedule one upstream audio packet on the shared playout epoch.
#[must_use]
pub fn plan_audio_pts(&self, remote_pts_us: u64) -> PlannedUpstreamPacket {
pub fn plan_audio_pts(&self, remote_pts_us: u64) -> UpstreamPlanDecision {
self.plan_pts(UpstreamMediaKind::Microphone, remote_pts_us, 1)
}
/// Hold video until the audio master has at least reached the same capture
/// moment, or give up once the frame can no longer be shown fresh.
pub async fn wait_for_audio_master(&self, video_local_pts_us: u64, due_at: Instant) -> bool {
let slack_us = upstream_pairing_master_slack()
.as_micros()
.min(u64::MAX as u128) as u64;
loop {
let notified = self.audio_progress_notify.notified();
{
let state = self
.state
.lock()
.expect("upstream media state mutex poisoned");
if state.active_microphone_generation.is_none() {
return true;
}
if state.last_audio_local_pts_us.is_some_and(|audio_pts_us| {
audio_pts_us.saturating_add(slack_us) >= video_local_pts_us
}) {
return true;
}
}
if Instant::now() >= due_at {
return false;
}
tokio::select! {
_ = notified => {}
_ = tokio::time::sleep_until(due_at) => return false,
}
}
}
fn plan_pts(
&self,
kind: UpstreamMediaKind,
remote_pts_us: u64,
min_step_us: u64,
) -> PlannedUpstreamPacket {
) -> UpstreamPlanDecision {
let mut state = self
.state
.lock()
@ -302,30 +315,95 @@ impl UpstreamMediaRuntime {
state.microphone_packet_count
}
};
let base_slot = match kind {
UpstreamMediaKind::Camera => &mut state.camera_base_remote_pts_us,
UpstreamMediaKind::Microphone => &mut state.microphone_base_remote_pts_us,
let first_slot = match kind {
UpstreamMediaKind::Camera => &mut state.first_camera_remote_pts_us,
UpstreamMediaKind::Microphone => &mut state.first_microphone_remote_pts_us,
};
let first_remote_for_kind = *base_slot.get_or_insert(remote_pts_us);
if !state.startup_anchor_logged
&& state.camera_base_remote_pts_us.is_some()
&& state.microphone_base_remote_pts_us.is_some()
{
let camera_base_remote_pts_us = state.camera_base_remote_pts_us.unwrap_or_default();
let microphone_base_remote_pts_us =
state.microphone_base_remote_pts_us.unwrap_or_default();
let startup_delta_us =
camera_base_remote_pts_us as i128 - microphone_base_remote_pts_us as i128;
info!(
session_id,
camera_base_remote_pts_us,
microphone_base_remote_pts_us,
startup_delta_us,
"upstream media session anchors observed"
);
state.startup_anchor_logged = true;
let first_remote_for_kind = *first_slot.get_or_insert(remote_pts_us);
let now = Instant::now();
let pairing_deadline = *state
.pairing_anchor_deadline
.get_or_insert_with(|| now + upstream_playout_delay());
if state.session_base_remote_pts_us.is_none() {
if state.first_camera_remote_pts_us.is_some()
&& state.first_microphone_remote_pts_us.is_some()
{
let first_camera_remote_pts_us =
state.first_camera_remote_pts_us.unwrap_or_default();
let first_microphone_remote_pts_us =
state.first_microphone_remote_pts_us.unwrap_or_default();
state.session_base_remote_pts_us =
Some(first_camera_remote_pts_us.max(first_microphone_remote_pts_us));
state.playout_epoch = Some(pairing_deadline);
if !state.startup_anchor_logged {
let startup_delta_us =
first_camera_remote_pts_us as i128 - first_microphone_remote_pts_us as i128;
info!(
session_id,
first_camera_remote_pts_us,
first_microphone_remote_pts_us,
overlap_base_remote_pts_us =
state.session_base_remote_pts_us.unwrap_or_default(),
startup_delta_us,
"upstream media overlap anchors established"
);
state.startup_anchor_logged = true;
}
self.pairing_state_notify.notify_waiters();
} else if now < pairing_deadline {
if upstream_timing_trace_enabled()
&& (packet_count <= 10 || packet_count.is_multiple_of(300))
{
info!(
session_id,
?kind,
packet_count,
remote_pts_us,
wait_ms = pairing_deadline.saturating_duration_since(now).as_millis(),
"upstream media packet buffered while awaiting the counterpart stream"
);
}
return UpstreamPlanDecision::AwaitingPair;
} else {
let single_stream_base_remote_pts_us = match kind {
UpstreamMediaKind::Camera => {
state.first_camera_remote_pts_us.unwrap_or(remote_pts_us)
}
UpstreamMediaKind::Microphone => state
.first_microphone_remote_pts_us
.unwrap_or(remote_pts_us),
};
state.session_base_remote_pts_us = Some(single_stream_base_remote_pts_us);
state.playout_epoch = Some(pairing_deadline);
info!(
session_id,
?kind,
single_stream_base_remote_pts_us,
"upstream media pairing window expired; continuing with one-sided playout"
);
self.pairing_state_notify.notify_waiters();
}
}
let mut local_pts_us = remote_pts_us.saturating_sub(first_remote_for_kind);
let session_base_remote_pts_us = state.session_base_remote_pts_us.unwrap_or(remote_pts_us);
if remote_pts_us < session_base_remote_pts_us {
if upstream_timing_trace_enabled()
&& (packet_count <= 10 || packet_count.is_multiple_of(300))
{
info!(
session_id,
?kind,
packet_count,
remote_pts_us,
session_base_remote_pts_us,
"upstream media packet dropped before the shared overlap base"
);
}
return UpstreamPlanDecision::DropBeforeOverlap;
}
let mut local_pts_us = remote_pts_us.saturating_sub(session_base_remote_pts_us);
let last_slot = match kind {
UpstreamMediaKind::Camera => &mut state.last_video_local_pts_us,
UpstreamMediaKind::Microphone => &mut state.last_audio_local_pts_us,
@ -336,10 +414,7 @@ impl UpstreamMediaRuntime {
local_pts_us = last_pts_us.saturating_add(min_step_us.max(1));
}
*last_slot = Some(local_pts_us);
let now = Instant::now();
let epoch = *state
.playout_epoch
.get_or_insert_with(|| now + upstream_playout_delay());
let epoch = *state.playout_epoch.get_or_insert(pairing_deadline);
let sink_offset_us = upstream_playout_offset_us(kind);
let due_at =
apply_playout_offset(epoch + Duration::from_micros(local_pts_us), sink_offset_us);
@ -347,7 +422,6 @@ impl UpstreamMediaRuntime {
if upstream_timing_trace_enabled()
&& (packet_count <= 10 || packet_count.is_multiple_of(300))
{
let remote_elapsed_us = remote_pts_us.saturating_sub(first_remote_for_kind);
let playout_delay_us = epoch.saturating_duration_since(now).as_micros();
let late_by_us = late_by.as_micros();
info!(
@ -355,8 +429,9 @@ impl UpstreamMediaRuntime {
?kind,
packet_count,
remote_pts_us,
session_base_remote_pts_us,
first_remote_for_kind,
remote_elapsed_us,
remote_elapsed_us = remote_pts_us.saturating_sub(session_base_remote_pts_us),
local_pts_us,
playout_delay_us,
sink_offset_us,
@ -364,11 +439,14 @@ impl UpstreamMediaRuntime {
"upstream media rebase sample"
);
}
PlannedUpstreamPacket {
if kind == UpstreamMediaKind::Microphone {
self.audio_progress_notify.notify_waiters();
}
UpstreamPlanDecision::Play(PlannedUpstreamPacket {
local_pts_us,
due_at,
late_by,
}
})
}
}

View File

@ -0,0 +1,53 @@
use std::time::Duration;
use tokio::time::Instant;
use super::UpstreamMediaKind;
pub(super) fn upstream_timing_trace_enabled() -> bool {
std::env::var("LESAVKA_UPSTREAM_TIMING_TRACE")
.ok()
.map(|value| {
let trimmed = value.trim();
!(trimmed.eq_ignore_ascii_case("0")
|| trimmed.eq_ignore_ascii_case("false")
|| trimmed.eq_ignore_ascii_case("no")
|| trimmed.eq_ignore_ascii_case("off"))
})
.unwrap_or(false)
}
pub(super) fn upstream_playout_delay() -> Duration {
let delay_ms = std::env::var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS")
.ok()
.and_then(|value| value.trim().parse::<u64>().ok())
.unwrap_or(1_000);
Duration::from_millis(delay_ms)
}
pub(super) fn upstream_playout_offset_us(kind: UpstreamMediaKind) -> i64 {
let name = match kind {
UpstreamMediaKind::Camera => "LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US",
UpstreamMediaKind::Microphone => "LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US",
};
std::env::var(name)
.ok()
.and_then(|value| value.trim().parse::<i64>().ok())
.unwrap_or(0)
}
/// How far video may lead the planned audio-master capture moment before a
/// frame is held or dropped.
///
/// Read from `LESAVKA_UPSTREAM_PAIR_SLACK_US` (microseconds); unset or
/// unparsable values fall back to the 20 ms default.
pub(super) fn upstream_pairing_master_slack() -> Duration {
    const DEFAULT_US: u64 = 20_000;
    let slack_us = match std::env::var("LESAVKA_UPSTREAM_PAIR_SLACK_US") {
        Ok(raw) => raw.trim().parse::<u64>().unwrap_or(DEFAULT_US),
        Err(_) => DEFAULT_US,
    };
    Duration::from_micros(slack_us)
}
/// Shifts `base` by a signed microsecond offset.
///
/// Both directions now use checked arithmetic and fall back to `base` on
/// overflow, so an absurdly large configured offset degrades to "no shift"
/// instead of panicking — previously only the negative branch was checked
/// while the positive branch used a plain (panicking) `+`.
pub(super) fn apply_playout_offset(base: Instant, offset_us: i64) -> Instant {
    let magnitude = Duration::from_micros(offset_us.unsigned_abs());
    if offset_us >= 0 {
        base.checked_add(magnitude).unwrap_or(base)
    } else {
        base.checked_sub(magnitude).unwrap_or(base)
    }
}

View File

@ -0,0 +1,18 @@
use tokio::time::Instant;
/// Mutable bookkeeping for the shared upstream A/V playout clock.
///
/// `Default` yields the zeroed/empty state used when a fresh session starts.
#[derive(Debug, Default)]
pub(super) struct UpstreamClockState {
    // Identifier of the current shared A/V session.
    pub session_id: u64,
    // Generation token of the stream currently owning each kind, if any.
    pub active_camera_generation: Option<u64>,
    pub active_microphone_generation: Option<u64>,
    // First remote PTS observed per kind since the session started.
    pub first_camera_remote_pts_us: Option<u64>,
    pub first_microphone_remote_pts_us: Option<u64>,
    // Remote PTS chosen as the shared zero once audio and video are paired;
    // packets before this base are pre-overlap and get dropped.
    pub session_base_remote_pts_us: Option<u64>,
    // Most recent rebased (session-local) PTS handed out per kind.
    pub last_video_local_pts_us: Option<u64>,
    pub last_audio_local_pts_us: Option<u64>,
    // Packets planned so far per kind (also used to sample periodic traces).
    pub camera_packet_count: u64,
    pub microphone_packet_count: u64,
    // Tracks whether the startup-anchor log line was already emitted.
    pub startup_anchor_logged: bool,
    // Shared local instant that all playout deadlines are computed against.
    pub playout_epoch: Option<Instant>,
    // Deadline after which a lone stream may fall back to one-sided playout.
    pub pairing_anchor_deadline: Option<Instant>,
}

View File

@ -1,7 +1,14 @@
use super::{UpstreamMediaKind, UpstreamMediaRuntime};
use super::{PlannedUpstreamPacket, UpstreamMediaKind, UpstreamMediaRuntime};
use std::sync::Arc;
use std::time::Duration;
// Test helper: unwrap the planner's decision, failing the test unless the
// packet was actually scheduled for playout.
fn play(decision: super::UpstreamPlanDecision) -> PlannedUpstreamPacket {
    match decision {
        super::UpstreamPlanDecision::Play(plan) => plan,
        other => panic!("expected playable packet, got {other:?}"),
    }
}
#[test]
fn first_stream_starts_a_new_shared_session() {
let runtime = UpstreamMediaRuntime::new();
@ -38,49 +45,65 @@ fn closing_the_last_stream_resets_the_next_session_anchor() {
}
#[test]
fn shared_clock_rebases_audio_and_video_against_the_same_origin() {
fn first_packets_wait_for_the_counterpart_before_pairing() {
let runtime = UpstreamMediaRuntime::new();
let _camera = runtime.activate_camera();
let _microphone = runtime.activate_microphone();
let video_first = runtime.map_video_pts(1_000_000, 16_666);
let audio_first = runtime.map_audio_pts(1_000_000);
let audio_next = runtime.map_audio_pts(1_010_000);
let video_next = runtime.map_video_pts(1_033_333, 16_666);
assert!(matches!(
runtime.plan_video_pts(1_000_000, 16_666),
super::UpstreamPlanDecision::AwaitingPair
));
assert_eq!(video_first, 0);
assert_eq!(audio_first, 0);
assert_eq!(audio_next, 10_000);
assert_eq!(video_next, 33_333);
let audio_first = play(runtime.plan_audio_pts(1_000_000));
let video_first = play(runtime.plan_video_pts(1_000_000, 16_666));
assert_eq!(audio_first.local_pts_us, 0);
assert_eq!(video_first.local_pts_us, 0);
assert_eq!(audio_first.due_at, video_first.due_at);
}
#[test]
fn per_kind_session_bases_cancel_constant_startup_path_offsets() {
fn overlap_pairing_drops_leading_packets_before_the_shared_base() {
let runtime = UpstreamMediaRuntime::new();
let _camera = runtime.activate_camera();
let _microphone = runtime.activate_microphone();
let audio_first = runtime.map_audio_pts(1_000_000);
let video_first = runtime.map_video_pts(1_300_000, 16_666);
let audio_next = runtime.map_audio_pts(1_010_000);
let video_next = runtime.map_video_pts(1_333_333, 16_666);
assert!(matches!(
runtime.plan_audio_pts(1_000_000),
super::UpstreamPlanDecision::AwaitingPair
));
assert_eq!(audio_first, 0);
assert_eq!(video_first, 0);
assert_eq!(audio_next, 10_000);
assert_eq!(video_next, 33_333);
let video_first = play(runtime.plan_video_pts(1_300_000, 16_666));
assert_eq!(video_first.local_pts_us, 0);
assert!(matches!(
runtime.plan_audio_pts(1_000_000),
super::UpstreamPlanDecision::DropBeforeOverlap
));
let audio_next = play(runtime.plan_audio_pts(1_310_000));
let video_next = play(runtime.plan_video_pts(1_333_333, 16_666));
assert_eq!(audio_next.local_pts_us, 10_000);
assert_eq!(video_next.local_pts_us, 33_333);
}
#[test]
fn shared_clock_keeps_each_kind_monotonic_when_remote_pts_repeat() {
let runtime = UpstreamMediaRuntime::new();
let _camera = runtime.activate_camera();
let _microphone = runtime.activate_microphone();
let first = runtime.map_video_pts(50_000, 16_666);
let repeated = runtime.map_video_pts(50_000, 16_666);
assert!(matches!(
runtime.plan_video_pts(50_000, 16_666),
super::UpstreamPlanDecision::AwaitingPair
));
let _audio = play(runtime.plan_audio_pts(50_000));
let first = play(runtime.plan_video_pts(50_000, 16_666));
let repeated = play(runtime.plan_video_pts(50_000, 16_666));
assert_eq!(first, 0);
assert_eq!(repeated, 16_666);
assert_eq!(first.local_pts_us, 0);
assert_eq!(repeated.local_pts_us, 16_666);
}
#[test]
@ -144,6 +167,23 @@ fn upstream_playout_offsets_default_to_zero_and_accept_overrides() {
);
}
#[test]
fn upstream_pairing_master_slack_defaults_to_twenty_ms_and_accepts_overrides() {
    // Unset variable -> the documented 20 ms default.
    temp_env::with_var_unset("LESAVKA_UPSTREAM_PAIR_SLACK_US", || {
        assert_eq!(
            super::upstream_pairing_master_slack(),
            Duration::from_micros(20_000)
        );
    });
    // Explicit override is interpreted as microseconds.
    temp_env::with_var("LESAVKA_UPSTREAM_PAIR_SLACK_US", Some("5000"), || {
        assert_eq!(
            super::upstream_pairing_master_slack(),
            Duration::from_micros(5_000)
        );
    });
}
#[test]
fn upstream_timing_trace_flag_accepts_false_values() {
temp_env::with_var("LESAVKA_UPSTREAM_TIMING_TRACE", Some("off"), || {
@ -165,15 +205,27 @@ fn apply_playout_offset_supports_negative_offsets() {
assert_eq!(delta, Duration::from_micros(20_000));
}
#[test]
fn apply_playout_offset_supports_positive_offsets() {
    // A +30 ms offset must push the playout instant forward by exactly 30 ms.
    let base = tokio::time::Instant::now();
    let shifted = super::apply_playout_offset(base, 30_000);
    assert_eq!(
        shifted.saturating_duration_since(base),
        Duration::from_micros(30_000)
    );
}
#[test]
fn shared_playout_epoch_is_reused_across_audio_and_video() {
let runtime = UpstreamMediaRuntime::new();
let _camera = runtime.activate_camera();
let _microphone = runtime.activate_microphone();
let video_first = runtime.plan_video_pts(1_000_000, 16_666);
let audio_first = runtime.plan_audio_pts(1_000_000);
let audio_next = runtime.plan_audio_pts(1_010_000);
assert!(matches!(
runtime.plan_video_pts(1_000_000, 16_666),
super::UpstreamPlanDecision::AwaitingPair
));
let audio_first = play(runtime.plan_audio_pts(1_000_000));
let video_first = play(runtime.plan_video_pts(1_000_000, 16_666));
let audio_next = play(runtime.plan_audio_pts(1_010_000));
assert_eq!(video_first.local_pts_us, 0);
assert_eq!(audio_first.local_pts_us, 0);
@ -186,6 +238,31 @@ fn shared_playout_epoch_is_reused_across_audio_and_video() {
);
}
#[test]
fn pairing_window_can_expire_into_one_sided_playout() {
    // With a zero playout delay the pairing window expires immediately, so a
    // lone video stream is allowed to play without ever seeing audio.
    temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || {
        let runtime = UpstreamMediaRuntime::new();
        let _camera = runtime.activate_camera();
        let first = play(runtime.plan_video_pts(1_000_000, 16_666));
        let second = play(runtime.plan_video_pts(1_016_666, 16_666));
        assert_eq!(first.local_pts_us, 0);
        assert_eq!(second.local_pts_us, 16_666);
    });
}
#[test]
fn map_wrappers_hide_unpaired_and_pre_overlap_packets() {
    let runtime = UpstreamMediaRuntime::new();
    let _camera = runtime.activate_camera();
    let _microphone = runtime.activate_microphone();
    // Video arriving first is held back until its audio counterpart shows up.
    assert_eq!(runtime.map_video_pts(1_000_000, 16_666), None);
    // Audio at the same remote PTS anchors the pair and plays at local zero.
    assert_eq!(runtime.map_audio_pts(1_000_000), Some(0));
    // Audio from before the shared base is hidden instead of rebased.
    assert_eq!(runtime.map_audio_pts(999_999), None);
}
#[test]
fn shared_playout_trace_path_keeps_planned_pts_stable() {
temp_env::with_var("LESAVKA_UPSTREAM_TIMING_TRACE", Some("1"), || {
@ -193,14 +270,85 @@ fn shared_playout_trace_path_keeps_planned_pts_stable() {
let _camera = runtime.activate_camera();
let _microphone = runtime.activate_microphone();
let video = runtime.plan_video_pts(1_000_000, 16_666);
let audio = runtime.plan_audio_pts(1_000_000);
assert!(matches!(
runtime.plan_video_pts(1_000_000, 16_666),
super::UpstreamPlanDecision::AwaitingPair
));
let audio = play(runtime.plan_audio_pts(1_000_000));
let video = play(runtime.plan_video_pts(1_000_000, 16_666));
assert_eq!(video.local_pts_us, 0);
assert_eq!(audio.local_pts_us, 0);
});
}
#[tokio::test(flavor = "current_thread")]
async fn wait_for_audio_master_releases_video_once_audio_catches_up() {
    // A waiter targeting an audio position ahead of what has been planned
    // must block until audio progress passes that target, then resolve true.
    let runtime = Arc::new(UpstreamMediaRuntime::new());
    let _camera = runtime.activate_camera();
    let _microphone = runtime.activate_microphone();
    // First video packet only arms the pairing window.
    assert!(matches!(
        runtime.plan_video_pts(1_000_000, 16_666),
        super::UpstreamPlanDecision::AwaitingPair
    ));
    let _audio_first = play(runtime.plan_audio_pts(1_000_000));
    let video_first = play(runtime.plan_video_pts(1_000_000, 16_666));
    // Ask for an audio position 10 ms beyond the current plan.
    let waiter = tokio::spawn({
        let runtime = runtime.clone();
        async move {
            runtime
                .wait_for_audio_master(video_first.local_pts_us + 10_000, video_first.due_at)
                .await
        }
    });
    tokio::time::sleep(Duration::from_millis(5)).await;
    // Advancing audio by 10 ms should release the waiter.
    let _audio_next = play(runtime.plan_audio_pts(1_010_000));
    assert!(waiter.await.expect("audio master waiter should finish"));
}
#[tokio::test(flavor = "current_thread")]
async fn wait_for_audio_master_times_out_when_audio_never_catches_up() {
    // When audio never reaches the requested position, the waiter gives up at
    // the frame's due time and reports false (frame should be dropped).
    let runtime = Arc::new(UpstreamMediaRuntime::new());
    let _camera = runtime.activate_camera();
    let _microphone = runtime.activate_microphone();
    assert!(matches!(
        runtime.plan_video_pts(1_000_000, 16_666),
        super::UpstreamPlanDecision::AwaitingPair
    ));
    let _audio_first = play(runtime.plan_audio_pts(1_000_000));
    let video_first = play(runtime.plan_video_pts(1_000_000, 16_666));
    // A short deadline so the test fails fast instead of hanging.
    let due_at = tokio::time::Instant::now() + Duration::from_millis(20);
    assert!(
        !runtime
            .wait_for_audio_master(video_first.local_pts_us + 100_000, due_at)
            .await
    );
}
#[tokio::test(flavor = "current_thread")]
async fn wait_for_audio_master_returns_true_when_no_microphone_stream_is_active() {
    // Without an active microphone there is no audio master to wait on, so
    // video must be released immediately.
    let runtime = Arc::new(UpstreamMediaRuntime::new());
    let camera = runtime.activate_camera();
    let microphone = runtime.activate_microphone();
    runtime.close_microphone(microphone.generation);
    // Closing the microphone must not disturb the camera session.
    assert!(runtime.is_camera_active(camera.generation));
    assert!(
        runtime
            .wait_for_audio_master(
                123_456,
                tokio::time::Instant::now() + Duration::from_millis(10)
            )
            .await
    );
}
#[tokio::test(flavor = "current_thread")]
async fn new_microphone_owner_waits_for_the_previous_sink_to_release() {
let runtime = Arc::new(UpstreamMediaRuntime::new());
@ -249,7 +397,7 @@ async fn superseded_microphone_waiter_stands_down_before_opening_a_sink() {
});
tokio::time::sleep(Duration::from_millis(25)).await;
let third = runtime.activate_microphone();
let _third = runtime.activate_microphone();
drop(first_permit);
assert!(
@ -258,10 +406,4 @@ async fn superseded_microphone_waiter_stands_down_before_opening_a_sink() {
.expect("superseded waiter task should finish"),
"older waiter should stand down instead of opening a sink after supersession"
);
let third_permit = runtime
.reserve_microphone_sink(third.generation)
.await
.expect("latest owner should acquire the sink gate");
drop(third_permit);
}

View File

@ -21,6 +21,8 @@ fn server_install_pins_hdmi_camera_and_display_defaults() {
"LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS=%s",
"LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=%s",
"LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US=%s",
"LESAVKA_UPSTREAM_PAIR_SLACK_US=%s",
"LESAVKA_UPSTREAM_STALE_DROP_MS=%s",
] {
assert!(
SERVER_INSTALL.contains(expected),
@ -34,6 +36,8 @@ fn server_install_pins_hdmi_camera_and_display_defaults() {
assert!(SERVER_INSTALL.contains("${LESAVKA_HDMI_HEIGHT:-1080}"));
assert!(SERVER_INSTALL.contains("${LESAVKA_HDMI_SINK:-fbdevsink}"));
assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS:-1000}"));
assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_PAIR_SLACK_US:-20000}"));
assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_STALE_DROP_MS:-80}"));
}
#[test]

View File

@ -241,6 +241,81 @@ mod server_upstream_media {
});
}
#[test]
#[serial]
fn stream_camera_waits_for_the_pairing_window_then_plays_with_audio() {
    // Video arrives first; with an 80 ms playout delay the frame sits in the
    // pairing window until the matching audio packet lands, after which both
    // streams ack cleanly.
    let rt = tokio::runtime::Runtime::new().expect("runtime");
    with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || {
        with_var("LESAVKA_DISABLE_UVC", None::<&str>, || {
            with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("80"), || {
                rt.block_on(async {
                    let (_dir, handler) = build_handler_for_tests();
                    let (server, mut cli) = serve_handler(handler).await;
                    let (video_tx, video_rx) = tokio::sync::mpsc::channel(4);
                    let (audio_tx, audio_rx) = tokio::sync::mpsc::channel(4);
                    let mut video_response = cli
                        .stream_camera(tonic::Request::new(
                            tokio_stream::wrappers::ReceiverStream::new(video_rx),
                        ))
                        .await
                        .expect("camera stream should open")
                        .into_inner();
                    let mut audio_response = cli
                        .stream_microphone(tonic::Request::new(
                            tokio_stream::wrappers::ReceiverStream::new(audio_rx),
                        ))
                        .await
                        .expect("microphone stream should open")
                        .into_inner();
                    // Synthetic video payload: Annex B start code followed by
                    // an IDR-like NAL byte so the server treats it as a frame.
                    video_tx
                        .send(VideoPacket {
                            id: 2,
                            pts: 1_000_000,
                            data: vec![0, 0, 0, 1, 0x65, 0x99],
                            ..Default::default()
                        })
                        .await
                        .expect("send video packet");
                    // Give the server time to park the frame in the window.
                    tokio::time::sleep(std::time::Duration::from_millis(20)).await;
                    audio_tx
                        .send(AudioPacket {
                            id: 0,
                            pts: 1_000_000,
                            data: vec![1, 2, 3, 4],
                        })
                        .await
                        .expect("send matching audio packet");
                    // Closing both senders ends the streams so acks flush.
                    drop(video_tx);
                    drop(audio_tx);
                    let video_ack = tokio::time::timeout(
                        std::time::Duration::from_secs(1),
                        video_response.message(),
                    )
                    .await
                    .expect("camera ack timeout")
                    .expect("camera ack grpc")
                    .expect("camera ack item");
                    let audio_ack = tokio::time::timeout(
                        std::time::Duration::from_secs(1),
                        audio_response.message(),
                    )
                    .await
                    .expect("microphone ack timeout")
                    .expect("microphone ack grpc")
                    .expect("microphone ack item");
                    assert_eq!(video_ack, Empty {});
                    assert_eq!(audio_ack, Empty {});
                    server.abort();
                });
            });
        });
    });
}
#[test]
#[serial]
fn stream_camera_stops_a_superseded_session_cleanly() {
@ -297,4 +372,288 @@ mod server_upstream_media {
});
});
}
#[test]
#[serial]
fn stream_microphone_drops_pre_overlap_audio_after_video_sets_the_pair_anchor() {
    // Audio leads by 300 ms of remote PTS; once video anchors the pair at the
    // later PTS, the leading audio is pre-overlap and dropped, while the
    // post-anchor audio plays. Both streams must still ack cleanly.
    let rt = tokio::runtime::Runtime::new().expect("runtime");
    with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || {
        with_var("LESAVKA_DISABLE_UVC", None::<&str>, || {
            with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("80"), || {
                rt.block_on(async {
                    let (_dir, handler) = build_handler_for_tests();
                    let (server, mut cli) = serve_handler(handler).await;
                    let (audio_tx, audio_rx) = tokio::sync::mpsc::channel(4);
                    let (video_tx, video_rx) = tokio::sync::mpsc::channel(4);
                    let mut audio_response = cli
                        .stream_microphone(tonic::Request::new(
                            tokio_stream::wrappers::ReceiverStream::new(audio_rx),
                        ))
                        .await
                        .expect("microphone stream should open")
                        .into_inner();
                    let mut video_response = cli
                        .stream_camera(tonic::Request::new(
                            tokio_stream::wrappers::ReceiverStream::new(video_rx),
                        ))
                        .await
                        .expect("camera stream should open")
                        .into_inner();
                    // Audio that precedes the eventual shared base.
                    audio_tx
                        .send(AudioPacket {
                            id: 0,
                            pts: 1_000_000,
                            data: vec![1, 2, 3, 4],
                        })
                        .await
                        .expect("send leading audio packet");
                    tokio::time::sleep(std::time::Duration::from_millis(20)).await;
                    // Video at a later PTS sets the pairing anchor.
                    video_tx
                        .send(VideoPacket {
                            id: 2,
                            pts: 1_300_000,
                            data: vec![0, 0, 0, 1, 0x65, 0x88],
                            ..Default::default()
                        })
                        .await
                        .expect("send anchor video packet");
                    // Audio after the anchor should pair and play.
                    audio_tx
                        .send(AudioPacket {
                            id: 0,
                            pts: 1_310_000,
                            data: vec![5, 6, 7, 8],
                        })
                        .await
                        .expect("send post-anchor audio packet");
                    drop(audio_tx);
                    drop(video_tx);
                    let audio_ack = tokio::time::timeout(
                        std::time::Duration::from_secs(1),
                        audio_response.message(),
                    )
                    .await
                    .expect("microphone ack timeout")
                    .expect("microphone ack grpc")
                    .expect("microphone ack item");
                    let video_ack = tokio::time::timeout(
                        std::time::Duration::from_secs(1),
                        video_response.message(),
                    )
                    .await
                    .expect("camera ack timeout")
                    .expect("camera ack grpc")
                    .expect("camera ack item");
                    assert_eq!(audio_ack, Empty {});
                    assert_eq!(video_ack, Empty {});
                    server.abort();
                });
            });
        });
    });
}
#[test]
#[serial]
fn stream_camera_drops_pre_overlap_video_after_audio_sets_the_pair_anchor() {
    // Mirror of the audio case: video leads, audio at a later PTS sets the
    // anchor, so the leading video frame is pre-overlap and dropped while the
    // post-anchor frame plays. Both streams must still ack cleanly.
    let rt = tokio::runtime::Runtime::new().expect("runtime");
    with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || {
        with_var("LESAVKA_DISABLE_UVC", None::<&str>, || {
            with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("80"), || {
                rt.block_on(async {
                    let (_dir, handler) = build_handler_for_tests();
                    let (server, mut cli) = serve_handler(handler).await;
                    let (audio_tx, audio_rx) = tokio::sync::mpsc::channel(4);
                    let (video_tx, video_rx) = tokio::sync::mpsc::channel(4);
                    let mut audio_response = cli
                        .stream_microphone(tonic::Request::new(
                            tokio_stream::wrappers::ReceiverStream::new(audio_rx),
                        ))
                        .await
                        .expect("microphone stream should open")
                        .into_inner();
                    let mut video_response = cli
                        .stream_camera(tonic::Request::new(
                            tokio_stream::wrappers::ReceiverStream::new(video_rx),
                        ))
                        .await
                        .expect("camera stream should open")
                        .into_inner();
                    // Video that precedes the eventual shared base.
                    video_tx
                        .send(VideoPacket {
                            id: 2,
                            pts: 1_000_000,
                            data: vec![0, 0, 0, 1, 0x65, 0x77],
                            ..Default::default()
                        })
                        .await
                        .expect("send leading video packet");
                    tokio::time::sleep(std::time::Duration::from_millis(20)).await;
                    // Audio at a later PTS sets the pairing anchor.
                    audio_tx
                        .send(AudioPacket {
                            id: 0,
                            pts: 1_300_000,
                            data: vec![1, 2, 3, 4],
                        })
                        .await
                        .expect("send anchor audio packet");
                    // Video after the anchor should pair and play.
                    video_tx
                        .send(VideoPacket {
                            id: 2,
                            pts: 1_310_000,
                            data: vec![0, 0, 0, 1, 0x65, 0x88],
                            ..Default::default()
                        })
                        .await
                        .expect("send post-anchor video packet");
                    drop(audio_tx);
                    drop(video_tx);
                    let audio_ack = tokio::time::timeout(
                        std::time::Duration::from_secs(1),
                        audio_response.message(),
                    )
                    .await
                    .expect("microphone ack timeout")
                    .expect("microphone ack grpc")
                    .expect("microphone ack item");
                    let video_ack = tokio::time::timeout(
                        std::time::Duration::from_secs(1),
                        video_response.message(),
                    )
                    .await
                    .expect("camera ack timeout")
                    .expect("camera ack grpc")
                    .expect("camera ack item");
                    assert_eq!(audio_ack, Empty {});
                    assert_eq!(video_ack, Empty {});
                    server.abort();
                });
            });
        });
    });
}
#[test]
#[serial]
fn stream_microphone_drops_stale_packets_when_freshness_budget_is_zero() {
    // With both the playout delay and the stale-drop budget forced to zero,
    // a synthetic audio packet is discarded as stale, yet the stream must
    // still terminate with a clean ack.
    let rt = tokio::runtime::Runtime::new().expect("runtime");
    with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || {
        with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || {
            with_var("LESAVKA_UPSTREAM_STALE_DROP_MS", Some("0"), || {
                rt.block_on(async {
                    let (_dir, handler) = build_handler_for_tests();
                    let (server, mut cli) = serve_handler(handler).await;
                    let (tx, rx) = tokio::sync::mpsc::channel(4);
                    tx.send(AudioPacket {
                        id: 0,
                        pts: 12_345,
                        data: vec![1, 2, 3, 4, 5, 6],
                    })
                    .await
                    .expect("send stale synthetic upstream audio");
                    // Close the sender up front so the stream ends after the
                    // single (dropped) packet.
                    drop(tx);
                    let outbound = tokio_stream::wrappers::ReceiverStream::new(rx);
                    let mut response = cli
                        .stream_microphone(tonic::Request::new(outbound))
                        .await
                        .expect("microphone stream should open");
                    let ack = tokio::time::timeout(
                        std::time::Duration::from_secs(1),
                        response.get_mut().message(),
                    )
                    .await
                    .expect("microphone ack timeout")
                    .expect("microphone ack grpc")
                    .expect("microphone ack item");
                    assert_eq!(ack, Empty {});
                    server.abort();
                });
            });
        });
    });
}
#[test]
#[serial]
fn stream_camera_drops_frames_that_never_reach_the_audio_master() {
    // Audio establishes the master clock but never advances to the video
    // frame's target position; the unmatched frame is dropped and both
    // streams still ack cleanly on shutdown.
    let rt = tokio::runtime::Runtime::new().expect("runtime");
    with_var("LESAVKA_CAPTURE_POWER_UNIT", Some("none"), || {
        with_var("LESAVKA_DISABLE_UVC", None::<&str>, || {
            with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("80"), || {
                rt.block_on(async {
                    let (_dir, handler) = build_handler_for_tests();
                    let (server, mut cli) = serve_handler(handler).await;
                    let (audio_tx, audio_rx) = tokio::sync::mpsc::channel(4);
                    let (video_tx, video_rx) = tokio::sync::mpsc::channel(4);
                    let mut audio_response = cli
                        .stream_microphone(tonic::Request::new(
                            tokio_stream::wrappers::ReceiverStream::new(audio_rx),
                        ))
                        .await
                        .expect("microphone stream should open")
                        .into_inner();
                    let mut video_response = cli
                        .stream_camera(tonic::Request::new(
                            tokio_stream::wrappers::ReceiverStream::new(video_rx),
                        ))
                        .await
                        .expect("camera stream should open")
                        .into_inner();
                    audio_tx
                        .send(AudioPacket {
                            id: 0,
                            pts: 1_000_000,
                            data: vec![1, 2, 3, 4],
                        })
                        .await
                        .expect("send first audio packet");
                    // Video 50 ms ahead of the only audio packet; audio never
                    // catches up, so this frame cannot reach the master.
                    video_tx
                        .send(VideoPacket {
                            id: 2,
                            pts: 1_050_000,
                            data: vec![0, 0, 0, 1, 0x65, 0x55],
                            ..Default::default()
                        })
                        .await
                        .expect("send unmatched video packet");
                    drop(audio_tx);
                    drop(video_tx);
                    let audio_ack = tokio::time::timeout(
                        std::time::Duration::from_secs(1),
                        audio_response.message(),
                    )
                    .await
                    .expect("microphone ack timeout")
                    .expect("microphone ack grpc")
                    .expect("microphone ack item");
                    let video_ack = tokio::time::timeout(
                        std::time::Duration::from_secs(1),
                        video_response.message(),
                    )
                    .await
                    .expect("camera ack timeout")
                    .expect("camera ack grpc")
                    .expect("camera ack item");
                    assert_eq!(audio_ack, Empty {});
                    assert_eq!(video_ack, Empty {});
                    server.abort();
                });
            });
        });
    });
}
}