lesavka/server/src/upstream_media_runtime/playout_planning_methods.rs

151 lines
6.2 KiB
Rust
Raw Normal View History

impl UpstreamMediaRuntime {
/// Keeps `activate` explicit because it sits on server upstream media scheduling, where timing choices directly affect lip sync.
fn activate(&self, kind: UpstreamMediaKind) -> UpstreamStreamLease {
let generation = match kind {
UpstreamMediaKind::Camera => {
self.next_camera_generation.fetch_add(1, Ordering::SeqCst) + 1
}
UpstreamMediaKind::Microphone => {
self.next_microphone_generation
.fetch_add(1, Ordering::SeqCst)
+ 1
}
};
let mut state = self
.state
.lock()
.expect("upstream media state mutex poisoned");
if state.active_camera_generation.is_none() && state.active_microphone_generation.is_none()
{
state.session_id = self.next_session_id.fetch_add(1, Ordering::SeqCst) + 1;
reset_session_state(&mut state);
state.session_started_at = Some(Instant::now());
state.phase = UpstreamSyncPhase::Acquiring;
state.last_reason = "v2 upstream session acquiring media".to_string();
}
match kind {
UpstreamMediaKind::Camera => state.active_camera_generation = Some(generation),
UpstreamMediaKind::Microphone => state.active_microphone_generation = Some(generation),
}
UpstreamStreamLease {
session_id: state.session_id,
generation,
}
}
/// Keeps `is_active` explicit because it sits on server upstream media scheduling, where timing choices directly affect lip sync.
/// Inputs are the typed parameters; output is the return value or side effect.
fn is_active(&self, kind: UpstreamMediaKind, generation: u64) -> bool {
let state = self
.state
.lock()
.expect("upstream media state mutex poisoned");
match kind {
UpstreamMediaKind::Camera => state.active_camera_generation == Some(generation),
UpstreamMediaKind::Microphone => state.active_microphone_generation == Some(generation),
}
}
/// Keeps `close` explicit because it sits on server upstream media scheduling, where timing choices directly affect lip sync.
/// Inputs are the typed parameters; output is the return value or side effect.
fn close(&self, kind: UpstreamMediaKind, generation: u64) {
let mut state = self
.state
.lock()
.expect("upstream media state mutex poisoned");
match kind {
UpstreamMediaKind::Camera if state.active_camera_generation == Some(generation) => {
state.active_camera_generation = None
}
UpstreamMediaKind::Microphone
if state.active_microphone_generation == Some(generation) =>
{
state.active_microphone_generation = None
}
_ => return,
}
if state.active_camera_generation.is_none() && state.active_microphone_generation.is_none()
{
reset_session_state(&mut state);
}
}
fn plan_legacy_pts(
&self,
kind: UpstreamMediaKind,
remote_pts_us: u64,
min_step_us: u64,
) -> UpstreamPlanDecision {
self.plan_rebased_pts(kind, remote_pts_us, min_step_us.max(1), None, None)
}
/// Keeps `plan_rebased_pts` explicit because it sits on server upstream media scheduling, where timing choices directly affect lip sync.
/// Inputs are the typed parameters; output is the return value or side effect.
fn plan_rebased_pts(
&self,
kind: UpstreamMediaKind,
remote_pts_us: u64,
min_step_us: u64,
explicit_base: Option<u64>,
explicit_epoch: Option<Instant>,
) -> UpstreamPlanDecision {
let mut state = self
.state
.lock()
.expect("upstream media state mutex poisoned");
match kind {
UpstreamMediaKind::Camera => state.latest_camera_remote_pts_us = Some(remote_pts_us),
UpstreamMediaKind::Microphone => {
state.latest_microphone_remote_pts_us = Some(remote_pts_us)
}
}
let base = match explicit_base {
Some(base) => *state.base_remote_pts_us.get_or_insert(base),
None => *state.base_remote_pts_us.get_or_insert(remote_pts_us),
};
let epoch = match explicit_epoch {
Some(epoch) => *state.playout_epoch.get_or_insert(epoch),
None => *state
.playout_epoch
.get_or_insert(Instant::now() + upstream_playout_delay()),
};
let mut local_pts_us = remote_pts_us.saturating_sub(base);
let last_slot = match kind {
UpstreamMediaKind::Camera => &mut state.last_video_local_pts_us,
UpstreamMediaKind::Microphone => &mut state.last_audio_local_pts_us,
};
if let Some(last_pts_us) = *last_slot
&& local_pts_us <= last_pts_us
{
local_pts_us = last_pts_us.saturating_add(min_step_us.max(1));
}
*last_slot = Some(local_pts_us);
state.phase = UpstreamSyncPhase::Syncing;
state.last_reason = "v2 legacy packet mapped without cross-stream planner".to_string();
let due_at = apply_offset(
epoch + Duration::from_micros(local_pts_us),
self.playout_offset_us(kind),
);
let late_by = Instant::now()
.checked_duration_since(due_at)
.unwrap_or_default();
UpstreamPlanDecision::Play(PlannedUpstreamPacket {
local_pts_us,
due_at,
late_by,
source_lag: Duration::ZERO,
})
}
/// Keeps `playout_offset_us` explicit because it sits on server upstream media scheduling, where timing choices directly affect lip sync.
/// Inputs are the typed parameters; output is the return value or side effect.
fn playout_offset_us(&self, kind: UpstreamMediaKind) -> i64 {
match kind {
UpstreamMediaKind::Camera => self.camera_playout_offset_us.load(Ordering::Relaxed),
UpstreamMediaKind::Microphone => {
self.microphone_playout_offset_us.load(Ordering::Relaxed)
}
}
}
}