media: enforce upstream lip-sync planner

This commit is contained in:
Brad Stein 2026-05-01 19:16:40 -03:00
parent c960df7400
commit 3920e0a72a
41 changed files with 1098 additions and 101 deletions

View File

@ -179,3 +179,44 @@ Context: the mirrored browser probe finally reproduced the real failure class on
- 0.16.25 removed the client mic backlog but exposed a stable hardware/browser path delta: p95 `557.3 ms`, median `-540.5 ms`, drift `+9.0 ms`, and fresh mic delivery ages around `2-10 ms`. Patch 0.16.26 raises the MJPEG/UVC factory audio delay to `+1260 ms` and expands the calibration clamp so this stable offset can actually be corrected instead of rejected. - 0.16.25 removed the client mic backlog but exposed a stable hardware/browser path delta: p95 `557.3 ms`, median `-540.5 ms`, drift `+9.0 ms`, and fresh mic delivery ages around `2-10 ms`. Patch 0.16.26 raises the MJPEG/UVC factory audio delay to `+1260 ms` and expands the calibration clamp so this stable offset can actually be corrected instead of rejected.
- [ ] Re-run the mirrored browser probe after the pre-start false-positive fix. - [ ] Re-run the mirrored browser probe after the pre-start false-positive fix.
- [ ] Run Google Meet manual validation. - [ ] Run Google Meet manual validation.
## 0.17.0 Tyrannical Upstream Playout Checklist
Context: 0.16.x proved that queue tweaks and static calibration cannot guarantee lip sync. 0.17.0 changes the upstream contract: the server planner is authoritative, audio is the master, video follows by timestamp or freezes, and freshness wins over smooth-but-wrong playback.
### Hard Product Invariants
- [x] No normal live upstream playout may be more than 1000 ms behind the freshest known client capture frontier.
- [x] Video may not advance outside the audio playhead sync budget.
- [x] Audio should be continuous when possible, but stale audio must be dropped/skipped rather than drained.
- [x] Missing or late video should freeze/stutter instead of pulling audio backward.
- [x] Startup may be ugly, but must either enter fresh synced live mode or declare failure within 60 seconds.
- [x] Healing may be visible, but must prevent persistent seconds-scale skew.
- [x] Calibration may fine-tune sub-frame offsets only; it must not be required to rescue seconds-scale desync.
- [x] Bump Lesavka to 0.17.0 because this is a media-contract change, not a patch tune.
- [x] Add planner policy config: max live lag, max skew, startup timeout, target playout delay, and healing cooldown.
- [x] Reset 0.17 defaults so shipped audio/video offsets do not intentionally exceed the freshness budget.
- [x] Track latest camera/audio input timestamps in the server planner.
- [x] Track actual planned/emitted audio and video playheads.
- [x] Enforce audio as master: stale audio is dropped/skipped; it does not drain backlog.
- [x] Enforce video follower behavior: frames ahead of audio wait; frames too far behind audio are dropped so the previous frame freezes.
- [x] Re-anchor continuously when the live playhead falls outside the freshness budget, not only once at startup.
- [x] Keep startup paired-only by default and fail visibly after the startup timeout.
- [x] Add planner phases and counters to diagnostics/logs: acquiring, syncing, live, healing, failed; stale drops, skew drops, freshness heals, video freezes.
- [x] Keep UVC/UAC sinks as dumb consumers of planner-approved packets.
- [x] Update tests to prove stale media cannot be emitted and video cannot outrun/lag audio beyond policy.
- [x] Update manual/probe diagnostics so 0.17 reports the planner state being tested.
### Validation Targets Before Human Test
- [x] Unit tests for startup pairing, stale audio drop, stale video drop, reanchor, startup timeout, audio-master/video-follower rules.
- [x] Contract tests for installer defaults and version reporting.
- [x] `cargo check -p lesavka_client -p lesavka_server --bins`.
- [x] Focused `lesavka_testing` media/runtime contracts.
- [ ] Only after all of the above, run the mirrored browser probe.
### Progress Log
- 2026-05-01: Added 0.17 planner defaults (`350ms` target playout, `1000ms` max live lag, `60000ms` startup timeout, `80000us` pair slack), reset MJPEG audio factory offset to `0`, and migrated old `-45ms`, `+720ms`, and `+1260ms` untouched baselines.
- 2026-05-01: Server planner now tracks latest input frontier, presented audio/video playheads, phase, stale drops, skew drops, reanchors, startup timeouts, and freezes.
- 2026-05-01: Runtime tests green for stale audio drop, stale video drop, audio-master/video-follower freeze, repeated reanchor, paired startup timeout, and planner snapshot basics: `cargo test -p lesavka_server upstream_media_runtime::tests -- --nocapture`.
- 2026-05-01: Added `GetUpstreamSync` RPC, `lesavka-relayctl upstream-sync`, launcher diagnostics text, and mirrored-probe before/after planner snapshots so 0.17 probe runs report the exact planner state under test.
- 2026-05-01: Validation green: `cargo test -p lesavka_server --lib --bins`, `cargo test -p lesavka_testing`, `cargo test -p lesavka_client --bins --lib`, and targeted installer/RPC/layout contracts.

6
Cargo.lock generated
View File

@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]] [[package]]
name = "lesavka_client" name = "lesavka_client"
version = "0.16.26" version = "0.17.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-stream", "async-stream",
@ -1686,7 +1686,7 @@ dependencies = [
[[package]] [[package]]
name = "lesavka_common" name = "lesavka_common"
version = "0.16.26" version = "0.17.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",
@ -1698,7 +1698,7 @@ dependencies = [
[[package]] [[package]]
name = "lesavka_server" name = "lesavka_server"
version = "0.16.26" version = "0.17.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package] [package]
name = "lesavka_client" name = "lesavka_client"
version = "0.16.26" version = "0.17.0"
edition = "2024" edition = "2024"
[dependencies] [dependencies]

View File

@ -21,6 +21,7 @@ enum CommandKind {
RecoverUac, RecoverUac,
RecoverUvc, RecoverUvc,
ResetUsb, ResetUsb,
UpstreamSync,
} }
impl CommandKind { impl CommandKind {
@ -36,6 +37,7 @@ impl CommandKind {
"recover-uac" => Some(Self::RecoverUac), "recover-uac" => Some(Self::RecoverUac),
"recover-uvc" => Some(Self::RecoverUvc), "recover-uvc" => Some(Self::RecoverUvc),
"reset-usb" | "hard-reset-usb" => Some(Self::ResetUsb), "reset-usb" | "hard-reset-usb" => Some(Self::ResetUsb),
"upstream-sync" | "sync" => Some(Self::UpstreamSync),
_ => None, _ => None,
} }
} }
@ -54,7 +56,7 @@ enum ParseOutcome {
} }
fn usage() -> &'static str { fn usage() -> &'static str {
"Usage: lesavka-relayctl [--server http://HOST:50051] <status|version|auto|on|off|recover-usb|recover-uac|recover-uvc|reset-usb>" "Usage: lesavka-relayctl [--server http://HOST:50051] <status|version|upstream-sync|auto|on|off|recover-usb|recover-uac|recover-uvc|reset-usb>"
} }
fn parse_args_outcome_from<I, S>(args: I) -> Result<ParseOutcome> fn parse_args_outcome_from<I, S>(args: I) -> Result<ParseOutcome>
@ -117,7 +119,8 @@ fn capture_power_request(command: CommandKind) -> Option<SetCapturePowerRequest>
| CommandKind::RecoverUsb | CommandKind::RecoverUsb
| CommandKind::RecoverUac | CommandKind::RecoverUac
| CommandKind::RecoverUvc | CommandKind::RecoverUvc
| CommandKind::ResetUsb => return None, | CommandKind::ResetUsb
| CommandKind::UpstreamSync => return None,
CommandKind::Auto => (false, CapturePowerCommand::Auto), CommandKind::Auto => (false, CapturePowerCommand::Auto),
CommandKind::On => (true, CapturePowerCommand::ForceOn), CommandKind::On => (true, CapturePowerCommand::ForceOn),
CommandKind::Off => (false, CapturePowerCommand::ForceOff), CommandKind::Off => (false, CapturePowerCommand::ForceOff),
@ -186,6 +189,32 @@ fn print_versions(server_addr: &str, caps: &HandshakeSet) {
println!("server_camera_codec={}", caps.camera_codec); println!("server_camera_codec={}", caps.camera_codec);
} }
fn print_upstream_sync(state: lesavka_common::lesavka::UpstreamSyncState) {
println!("planner_session_id={}", state.session_id);
println!("planner_phase={}", state.phase);
println!(
"planner_live_lag_ms={}",
state
.live_lag_ms
.map(|value| format!("{value:.1}"))
.unwrap_or_else(|| "pending".to_string())
);
println!(
"planner_skew_ms={}",
state
.planner_skew_ms
.map(|value| format!("{value:+.1}"))
.unwrap_or_else(|| "pending".to_string())
);
println!("planner_stale_audio_drops={}", state.stale_audio_drops);
println!("planner_stale_video_drops={}", state.stale_video_drops);
println!("planner_skew_video_drops={}", state.skew_video_drops);
println!("planner_freshness_reanchors={}", state.freshness_reanchors);
println!("planner_startup_timeouts={}", state.startup_timeouts);
println!("planner_video_freezes={}", state.video_freezes);
println!("planner_detail={}", state.last_reason);
}
#[cfg(not(coverage))] #[cfg(not(coverage))]
#[tokio::main(flavor = "current_thread")] #[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> { async fn main() -> Result<()> {
@ -215,7 +244,8 @@ async fn main() -> Result<()> {
| CommandKind::RecoverUsb | CommandKind::RecoverUsb
| CommandKind::RecoverUac | CommandKind::RecoverUac
| CommandKind::RecoverUvc | CommandKind::RecoverUvc
| CommandKind::ResetUsb => unreachable!(), | CommandKind::ResetUsb
| CommandKind::UpstreamSync => unreachable!(),
}; };
let reply = client let reply = client
.set_capture_power(Request::new(request)) .set_capture_power(Request::new(request))
@ -268,6 +298,15 @@ async fn main() -> Result<()> {
println!("ok={}", reply.ok); println!("ok={}", reply.ok);
return Ok(()); return Ok(());
} }
CommandKind::UpstreamSync => {
let reply = client
.get_upstream_sync(Request::new(Empty {}))
.await
.context("querying upstream sync planner state")?
.into_inner();
print_upstream_sync(reply);
return Ok(());
}
CommandKind::Version | CommandKind::Auto | CommandKind::On | CommandKind::Off => { CommandKind::Version | CommandKind::Auto | CommandKind::On | CommandKind::Off => {
unreachable!() unreachable!()
} }
@ -297,6 +336,11 @@ mod tests {
assert_eq!(CommandKind::parse("get"), Some(CommandKind::Status)); assert_eq!(CommandKind::parse("get"), Some(CommandKind::Status));
assert_eq!(CommandKind::parse("version"), Some(CommandKind::Version)); assert_eq!(CommandKind::parse("version"), Some(CommandKind::Version));
assert_eq!(CommandKind::parse("versions"), Some(CommandKind::Version)); assert_eq!(CommandKind::parse("versions"), Some(CommandKind::Version));
assert_eq!(
CommandKind::parse("upstream-sync"),
Some(CommandKind::UpstreamSync)
);
assert_eq!(CommandKind::parse("sync"), Some(CommandKind::UpstreamSync));
assert_eq!(CommandKind::parse("force-on"), Some(CommandKind::On)); assert_eq!(CommandKind::parse("force-on"), Some(CommandKind::On));
assert_eq!(CommandKind::parse("force-off"), Some(CommandKind::Off)); assert_eq!(CommandKind::parse("force-off"), Some(CommandKind::Off));
assert_eq!( assert_eq!(
@ -328,9 +372,9 @@ mod tests {
#[test] #[test]
fn parse_args_accepts_server_and_command() { fn parse_args_accepts_server_and_command() {
let config = let config =
parse_args_from(["--server", " http://lab:50051 ", "reset-usb"]).expect("config"); parse_args_from(["--server", " http://lab:50051 ", "upstream-sync"]).expect("config");
assert_eq!(config.server, "http://lab:50051"); assert_eq!(config.server, "http://lab:50051");
assert_eq!(config.command, CommandKind::ResetUsb); assert_eq!(config.command, CommandKind::UpstreamSync);
} }
#[test] #[test]
@ -391,6 +435,7 @@ mod tests {
assert!(capture_power_request(CommandKind::RecoverUac).is_none()); assert!(capture_power_request(CommandKind::RecoverUac).is_none());
assert!(capture_power_request(CommandKind::RecoverUvc).is_none()); assert!(capture_power_request(CommandKind::RecoverUvc).is_none());
assert!(capture_power_request(CommandKind::ResetUsb).is_none()); assert!(capture_power_request(CommandKind::ResetUsb).is_none());
assert!(capture_power_request(CommandKind::UpstreamSync).is_none());
} }
#[test] #[test]

View File

@ -4,7 +4,7 @@ use lesavka_common::lesavka::{
}; };
use tonic::{Request, transport::Channel}; use tonic::{Request, transport::Channel};
use super::state::CalibrationStatus; use super::state::{CalibrationStatus, UpstreamSyncStatus};
use crate::relay_transport; use crate::relay_transport;
pub fn fetch_calibration(server_addr: &str) -> Result<CalibrationStatus> { pub fn fetch_calibration(server_addr: &str) -> Result<CalibrationStatus> {
@ -19,6 +19,18 @@ pub fn fetch_calibration(server_addr: &str) -> Result<CalibrationStatus> {
}) })
} }
pub fn fetch_upstream_sync(server_addr: &str) -> Result<UpstreamSyncStatus> {
with_runtime(async move {
let mut client = connect(server_addr).await?;
let reply = client
.get_upstream_sync(Request::new(Empty {}))
.await
.context("querying upstream A/V planner state")?
.into_inner();
Ok(UpstreamSyncStatus::from_proto(reply))
})
}
pub fn restore_default_calibration(server_addr: &str) -> Result<CalibrationStatus> { pub fn restore_default_calibration(server_addr: &str) -> Result<CalibrationStatus> {
send_calibration_request( send_calibration_request(
server_addr, server_addr,

View File

@ -163,6 +163,18 @@ pub struct SnapshotReport {
pub av_delivery_skew_ms: f32, pub av_delivery_skew_ms: f32,
pub av_enqueue_skew_ms: f32, pub av_enqueue_skew_ms: f32,
pub av_sync_health: String, pub av_sync_health: String,
pub planner_available: bool,
pub planner_phase: String,
pub planner_session_id: u64,
pub planner_live_lag_ms: Option<f32>,
pub planner_skew_ms: Option<f32>,
pub planner_stale_audio_drops: u64,
pub planner_stale_video_drops: u64,
pub planner_skew_video_drops: u64,
pub planner_freshness_reanchors: u64,
pub planner_startup_timeouts: u64,
pub planner_video_freezes: u64,
pub planner_detail: String,
pub calibration_available: bool, pub calibration_available: bool,
pub calibration_profile: String, pub calibration_profile: String,
pub calibration_source: String, pub calibration_source: String,

View File

@ -238,6 +238,18 @@ impl SnapshotReport {
av_delivery_skew_ms, av_delivery_skew_ms,
av_enqueue_skew_ms, av_enqueue_skew_ms,
av_sync_health, av_sync_health,
planner_available: state.upstream_sync.available,
planner_phase: state.upstream_sync.phase.clone(),
planner_session_id: state.upstream_sync.session_id,
planner_live_lag_ms: state.upstream_sync.live_lag_ms,
planner_skew_ms: state.upstream_sync.planner_skew_ms,
planner_stale_audio_drops: state.upstream_sync.stale_audio_drops,
planner_stale_video_drops: state.upstream_sync.stale_video_drops,
planner_skew_video_drops: state.upstream_sync.skew_video_drops,
planner_freshness_reanchors: state.upstream_sync.freshness_reanchors,
planner_startup_timeouts: state.upstream_sync.startup_timeouts,
planner_video_freezes: state.upstream_sync.video_freezes,
planner_detail: state.upstream_sync.detail.clone(),
calibration_available: state.calibration.available, calibration_available: state.calibration.available,
calibration_profile: state.calibration.profile.clone(), calibration_profile: state.calibration.profile.clone(),
calibration_source: state.calibration.source.clone(), calibration_source: state.calibration.source.clone(),

View File

@ -141,6 +141,41 @@ impl SnapshotReport {
self.upstream_microphone.latest_enqueue_age_ms, self.upstream_microphone.latest_enqueue_age_ms,
self.upstream_microphone.latest_delivery_age_ms self.upstream_microphone.latest_delivery_age_ms
); );
let _ = writeln!(text, "server upstream planner");
let _ = writeln!(
text,
" status: {} | phase={} | session={}",
if self.planner_available {
"available"
} else {
"unavailable"
},
self.planner_phase,
self.planner_session_id
);
let live_lag = self
.planner_live_lag_ms
.map(|value| format!("{value:.1}ms"))
.unwrap_or_else(|| "pending".to_string());
let planner_skew = self
.planner_skew_ms
.map(|value| format!("{value:+.1}ms audio-video"))
.unwrap_or_else(|| "pending".to_string());
let _ = writeln!(
text,
" live lag: {live_lag} | planner skew: {planner_skew}"
);
let _ = writeln!(
text,
" drops/heals: stale_audio={} stale_video={} skew_video={} reanchors={} freezes={} startup_timeouts={}",
self.planner_stale_audio_drops,
self.planner_stale_video_drops,
self.planner_skew_video_drops,
self.planner_freshness_reanchors,
self.planner_video_freezes,
self.planner_startup_timeouts
);
let _ = writeln!(text, " detail: {}", self.planner_detail);
let _ = writeln!(text, "calibration"); let _ = writeln!(text, "calibration");
let _ = writeln!( let _ = writeln!(
text, text,

View File

@ -453,4 +453,7 @@ impl LauncherState {
self.calibration = calibration; self.calibration = calibration;
} }
pub fn set_upstream_sync(&mut self, upstream_sync: UpstreamSyncStatus) {
self.upstream_sync = upstream_sync;
}
} }

View File

@ -346,11 +346,11 @@ impl Default for CalibrationStatus {
Self { Self {
available: false, available: false,
profile: "mjpeg".to_string(), profile: "mjpeg".to_string(),
factory_audio_offset_us: 1_260_000, factory_audio_offset_us: 0,
factory_video_offset_us: 0, factory_video_offset_us: 0,
default_audio_offset_us: 1_260_000, default_audio_offset_us: 0,
default_video_offset_us: 0, default_video_offset_us: 0,
active_audio_offset_us: 1_260_000, active_audio_offset_us: 0,
active_video_offset_us: 0, active_video_offset_us: 0,
source: "unknown".to_string(), source: "unknown".to_string(),
confidence: "unknown".to_string(), confidence: "unknown".to_string(),
@ -360,6 +360,81 @@ impl Default for CalibrationStatus {
} }
} }
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct UpstreamSyncStatus {
pub available: bool,
pub session_id: u64,
pub phase: String,
pub latest_camera_remote_pts_us: Option<u64>,
pub latest_microphone_remote_pts_us: Option<u64>,
pub last_video_presented_pts_us: Option<u64>,
pub last_audio_presented_pts_us: Option<u64>,
pub live_lag_ms: Option<f32>,
pub planner_skew_ms: Option<f32>,
pub stale_audio_drops: u64,
pub stale_video_drops: u64,
pub skew_video_drops: u64,
pub freshness_reanchors: u64,
pub startup_timeouts: u64,
pub video_freezes: u64,
pub detail: String,
}
impl UpstreamSyncStatus {
#[must_use]
pub fn from_proto(reply: lesavka_common::lesavka::UpstreamSyncState) -> Self {
Self {
available: true,
session_id: reply.session_id,
phase: reply.phase,
latest_camera_remote_pts_us: reply.latest_camera_remote_pts_us,
latest_microphone_remote_pts_us: reply.latest_microphone_remote_pts_us,
last_video_presented_pts_us: reply.last_video_presented_pts_us,
last_audio_presented_pts_us: reply.last_audio_presented_pts_us,
live_lag_ms: reply.live_lag_ms,
planner_skew_ms: reply.planner_skew_ms,
stale_audio_drops: reply.stale_audio_drops,
stale_video_drops: reply.stale_video_drops,
skew_video_drops: reply.skew_video_drops,
freshness_reanchors: reply.freshness_reanchors,
startup_timeouts: reply.startup_timeouts,
video_freezes: reply.video_freezes,
detail: reply.last_reason,
}
}
#[must_use]
pub fn unavailable(detail: impl Into<String>) -> Self {
Self {
detail: detail.into(),
..Self::default()
}
}
}
impl Default for UpstreamSyncStatus {
fn default() -> Self {
Self {
available: false,
session_id: 0,
phase: "unknown".to_string(),
latest_camera_remote_pts_us: None,
latest_microphone_remote_pts_us: None,
last_video_presented_pts_us: None,
last_audio_presented_pts_us: None,
live_lag_ms: None,
planner_skew_ms: None,
stale_audio_drops: 0,
stale_video_drops: 0,
skew_video_drops: 0,
freshness_reanchors: 0,
startup_timeouts: 0,
video_freezes: 0,
detail: "upstream sync planner unavailable".to_string(),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct DeviceSelection { pub struct DeviceSelection {
pub camera: Option<String>, pub camera: Option<String>,
@ -415,6 +490,7 @@ pub struct LauncherState {
pub swap_key_binding_token: u64, pub swap_key_binding_token: u64,
pub capture_power: CapturePowerStatus, pub capture_power: CapturePowerStatus,
pub calibration: CalibrationStatus, pub calibration: CalibrationStatus,
pub upstream_sync: UpstreamSyncStatus,
pub remote_active: bool, pub remote_active: bool,
pub notes: Vec<String>, pub notes: Vec<String>,
} }
@ -449,6 +525,7 @@ impl Default for LauncherState {
swap_key_binding_token: 0, swap_key_binding_token: 0,
capture_power: CapturePowerStatus::default(), capture_power: CapturePowerStatus::default(),
calibration: CalibrationStatus::default(), calibration: CalibrationStatus::default(),
upstream_sync: UpstreamSyncStatus::default(),
remote_active: false, remote_active: false,
notes: Vec::new(), notes: Vec::new(),
} }

View File

@ -175,6 +175,15 @@ impl Relay for ProbeRelay {
self.get_calibration(Request::new(lesavka_common::lesavka::Empty {})) self.get_calibration(Request::new(lesavka_common::lesavka::Empty {}))
.await .await
} }
async fn get_upstream_sync(
&self,
_request: Request<lesavka_common::lesavka::Empty>,
) -> Result<Response<lesavka_common::lesavka::UpstreamSyncState>, Status> {
Ok(Response::new(
lesavka_common::lesavka::UpstreamSyncState::default(),
))
}
} }
#[test] #[test]

View File

@ -405,7 +405,7 @@ fn capture_power_status_updates_snapshot_state() {
fn calibration_status_tracks_proto_unavailable_and_status_line() { fn calibration_status_tracks_proto_unavailable_and_status_line() {
let mut state = LauncherState::new(); let mut state = LauncherState::new();
assert!(!state.calibration.available); assert!(!state.calibration.available);
assert_eq!(state.calibration.active_audio_offset_us, 1_260_000); assert_eq!(state.calibration.active_audio_offset_us, 0);
let unavailable = CalibrationStatus::unavailable("server unreachable"); let unavailable = CalibrationStatus::unavailable("server unreachable");
assert!(!unavailable.available); assert!(!unavailable.available);
@ -414,7 +414,7 @@ fn calibration_status_tracks_proto_unavailable_and_status_line() {
state.set_calibration(CalibrationStatus::from_proto( state.set_calibration(CalibrationStatus::from_proto(
lesavka_common::lesavka::CalibrationState { lesavka_common::lesavka::CalibrationState {
profile: "mjpeg".to_string(), profile: "mjpeg".to_string(),
factory_audio_offset_us: 1_260_000, factory_audio_offset_us: 0,
factory_video_offset_us: 0, factory_video_offset_us: 0,
default_audio_offset_us: -40_000, default_audio_offset_us: -40_000,
default_video_offset_us: 1_000, default_video_offset_us: 1_000,
@ -429,7 +429,7 @@ fn calibration_status_tracks_proto_unavailable_and_status_line() {
assert!(state.calibration.available); assert!(state.calibration.available);
assert_eq!(state.calibration.profile, "mjpeg"); assert_eq!(state.calibration.profile, "mjpeg");
assert_eq!(state.calibration.factory_audio_offset_us, 1_260_000); assert_eq!(state.calibration.factory_audio_offset_us, 0);
assert_eq!(state.calibration.factory_video_offset_us, 0); assert_eq!(state.calibration.factory_video_offset_us, 0);
assert_eq!(state.calibration.default_audio_offset_us, -40_000); assert_eq!(state.calibration.default_audio_offset_us, -40_000);
assert_eq!(state.calibration.default_video_offset_us, 1_000); assert_eq!(state.calibration.default_video_offset_us, 1_000);

View File

@ -6,7 +6,7 @@ use futures::stream;
use lesavka_common::lesavka::{ use lesavka_common::lesavka::{
AudioPacket, CalibrationRequest, CalibrationState, CapturePowerState, Empty, KeyboardReport, AudioPacket, CalibrationRequest, CalibrationState, CapturePowerState, Empty, KeyboardReport,
MonitorRequest, MouseReport, PasteReply, PasteRequest, ResetUsbReply, SetCapturePowerRequest, MonitorRequest, MouseReport, PasteReply, PasteRequest, ResetUsbReply, SetCapturePowerRequest,
VideoPacket, UpstreamSyncState, VideoPacket,
relay_server::{Relay, RelayServer}, relay_server::{Relay, RelayServer},
}; };
use serial_test::serial; use serial_test::serial;
@ -158,6 +158,13 @@ impl Relay for UtilityRelay {
) -> Result<Response<CalibrationState>, Status> { ) -> Result<Response<CalibrationState>, Status> {
Ok(Response::new(CalibrationState::default())) Ok(Response::new(CalibrationState::default()))
} }
async fn get_upstream_sync(
&self,
_request: Request<Empty>,
) -> Result<Response<UpstreamSyncState>, Status> {
Ok(Response::new(UpstreamSyncState::default()))
}
} }
fn serve(relay: UtilityRelay) -> (tokio::runtime::Runtime, String) { fn serve(relay: UtilityRelay) -> (tokio::runtime::Runtime, String) {

View File

@ -3,8 +3,8 @@ use anyhow::Result;
#[cfg(not(coverage))] #[cfg(not(coverage))]
use { use {
super::calibration::{ super::calibration::{
blind_calibration_estimate, fetch_calibration, nudge_audio_calibration, blind_calibration_estimate, fetch_calibration, fetch_upstream_sync,
restore_default_calibration, restore_factory_calibration, nudge_audio_calibration, restore_default_calibration, restore_factory_calibration,
}, },
super::clipboard::send_clipboard_text_to_remote, super::clipboard::send_clipboard_text_to_remote,
super::device_test::{DeviceTestController, DeviceTestKind}, super::device_test::{DeviceTestController, DeviceTestKind},
@ -20,7 +20,7 @@ use {
super::state::{ super::state::{
BreakoutSizePreset, CalibrationStatus, CapturePowerStatus, CaptureSizePreset, BreakoutSizePreset, CalibrationStatus, CapturePowerStatus, CaptureSizePreset,
DisplaySurface, FeedSourcePreset, InputRouting, LauncherState, MAX_AUDIO_GAIN_PERCENT, DisplaySurface, FeedSourcePreset, InputRouting, LauncherState, MAX_AUDIO_GAIN_PERCENT,
MAX_MIC_GAIN_PERCENT, MAX_MIC_GAIN_PERCENT, UpstreamSyncStatus,
}, },
super::ui_components::{ super::ui_components::{
ConsoleLogLevel, build_launcher_view, sync_camera_quality_combo, sync_input_device_combo, ConsoleLogLevel, build_launcher_view, sync_camera_quality_combo, sync_input_device_combo,
@ -144,6 +144,9 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> {
calibration_tx, calibration_tx,
calibration_rx, calibration_rx,
calibration_request_in_flight, calibration_request_in_flight,
upstream_sync_tx,
upstream_sync_rx,
upstream_sync_request_in_flight,
relay_tx, relay_tx,
relay_rx, relay_rx,
relay_request_in_flight, relay_request_in_flight,
@ -154,6 +157,7 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> {
diagnostics_process, diagnostics_process,
next_power_probe, next_power_probe,
next_calibration_probe, next_calibration_probe,
next_upstream_sync_probe,
next_diagnostics_probe, next_diagnostics_probe,
next_diagnostics_sample, next_diagnostics_sample,
preview_session_active, preview_session_active,

View File

@ -21,6 +21,9 @@ struct ActivationContext {
calibration_tx: std::sync::mpsc::Sender<CalibrationMessage>, calibration_tx: std::sync::mpsc::Sender<CalibrationMessage>,
calibration_rx: std::sync::mpsc::Receiver<CalibrationMessage>, calibration_rx: std::sync::mpsc::Receiver<CalibrationMessage>,
calibration_request_in_flight: Rc<Cell<bool>>, calibration_request_in_flight: Rc<Cell<bool>>,
upstream_sync_tx: std::sync::mpsc::Sender<UpstreamSyncMessage>,
upstream_sync_rx: std::sync::mpsc::Receiver<UpstreamSyncMessage>,
upstream_sync_request_in_flight: Rc<Cell<bool>>,
relay_tx: std::sync::mpsc::Sender<RelayMessage>, relay_tx: std::sync::mpsc::Sender<RelayMessage>,
relay_rx: std::sync::mpsc::Receiver<RelayMessage>, relay_rx: std::sync::mpsc::Receiver<RelayMessage>,
relay_request_in_flight: Rc<Cell<bool>>, relay_request_in_flight: Rc<Cell<bool>>,
@ -31,6 +34,7 @@ struct ActivationContext {
diagnostics_process: Rc<RefCell<ProcessCpuSampler>>, diagnostics_process: Rc<RefCell<ProcessCpuSampler>>,
next_power_probe: Rc<Cell<Instant>>, next_power_probe: Rc<Cell<Instant>>,
next_calibration_probe: Rc<Cell<Instant>>, next_calibration_probe: Rc<Cell<Instant>>,
next_upstream_sync_probe: Rc<Cell<Instant>>,
next_diagnostics_probe: Rc<Cell<Instant>>, next_diagnostics_probe: Rc<Cell<Instant>>,
next_diagnostics_sample: Rc<Cell<Instant>>, next_diagnostics_sample: Rc<Cell<Instant>>,
preview_session_active: Rc<Cell<bool>>, preview_session_active: Rc<Cell<bool>>,

View File

@ -112,6 +112,9 @@
let (calibration_tx, calibration_rx) = let (calibration_tx, calibration_rx) =
std::sync::mpsc::channel::<CalibrationMessage>(); std::sync::mpsc::channel::<CalibrationMessage>();
let calibration_request_in_flight = Rc::new(Cell::new(false)); let calibration_request_in_flight = Rc::new(Cell::new(false));
let (upstream_sync_tx, upstream_sync_rx) =
std::sync::mpsc::channel::<UpstreamSyncMessage>();
let upstream_sync_request_in_flight = Rc::new(Cell::new(false));
let (relay_tx, relay_rx) = std::sync::mpsc::channel::<RelayMessage>(); let (relay_tx, relay_rx) = std::sync::mpsc::channel::<RelayMessage>();
let relay_request_in_flight = Rc::new(Cell::new(false)); let relay_request_in_flight = Rc::new(Cell::new(false));
let (caps_tx, caps_rx) = std::sync::mpsc::channel::<CapsMessage>(); let (caps_tx, caps_rx) = std::sync::mpsc::channel::<CapsMessage>();
@ -122,6 +125,8 @@
Rc::new(Cell::new(Instant::now() + Duration::from_millis(500))); Rc::new(Cell::new(Instant::now() + Duration::from_millis(500)));
let next_calibration_probe = let next_calibration_probe =
Rc::new(Cell::new(Instant::now() + Duration::from_millis(650))); Rc::new(Cell::new(Instant::now() + Duration::from_millis(650)));
let next_upstream_sync_probe =
Rc::new(Cell::new(Instant::now() + Duration::from_millis(750)));
let next_diagnostics_probe = let next_diagnostics_probe =
Rc::new(Cell::new(Instant::now() + Duration::from_millis(250))); Rc::new(Cell::new(Instant::now() + Duration::from_millis(250)));
let next_diagnostics_sample = let next_diagnostics_sample =
@ -157,6 +162,9 @@
calibration_tx, calibration_tx,
calibration_rx, calibration_rx,
calibration_request_in_flight, calibration_request_in_flight,
upstream_sync_tx,
upstream_sync_rx,
upstream_sync_request_in_flight,
relay_tx, relay_tx,
relay_rx, relay_rx,
relay_request_in_flight, relay_request_in_flight,
@ -167,6 +175,7 @@
diagnostics_process, diagnostics_process,
next_power_probe, next_power_probe,
next_calibration_probe, next_calibration_probe,
next_upstream_sync_probe,
next_diagnostics_probe, next_diagnostics_probe,
next_diagnostics_sample, next_diagnostics_sample,
preview_session_active, preview_session_active,

View File

@ -179,6 +179,22 @@ fn request_calibration_refresh(
}); });
} }
#[cfg(not(coverage))]
/// Refresh authoritative upstream sync planner state in the background.
fn request_upstream_sync_refresh(
upstream_sync_tx: std::sync::mpsc::Sender<UpstreamSyncMessage>,
server_addr: String,
delay: Duration,
) {
std::thread::spawn(move || {
if !delay.is_zero() {
std::thread::sleep(delay);
}
let result = fetch_upstream_sync(&server_addr).map_err(|err| err.to_string());
let _ = upstream_sync_tx.send(UpstreamSyncMessage::Refresh(result));
});
}
#[cfg(not(coverage))] #[cfg(not(coverage))]
fn request_calibration_command<F>( fn request_calibration_command<F>(
calibration_tx: std::sync::mpsc::Sender<CalibrationMessage>, calibration_tx: std::sync::mpsc::Sender<CalibrationMessage>,
@ -233,6 +249,11 @@ fn unavailable_calibration(detail: String) -> CalibrationStatus {
CalibrationStatus::unavailable(detail) CalibrationStatus::unavailable(detail)
} }
#[cfg(not(coverage))]
fn unavailable_upstream_sync(detail: String) -> UpstreamSyncStatus {
UpstreamSyncStatus::unavailable(detail)
}
#[cfg(not(coverage))] #[cfg(not(coverage))]
fn calibration_summary(calibration: &CalibrationStatus) -> String { fn calibration_summary(calibration: &CalibrationStatus) -> String {
format!( format!(

View File

@ -10,6 +10,11 @@ enum CalibrationMessage {
Command(std::result::Result<CalibrationStatus, String>), Command(std::result::Result<CalibrationStatus, String>),
} }
#[cfg(not(coverage))]
enum UpstreamSyncMessage {
Refresh(std::result::Result<UpstreamSyncStatus, String>),
}
#[cfg(not(coverage))] #[cfg(not(coverage))]
enum RelayMessage { enum RelayMessage {
Spawned(std::result::Result<RelayChild, String>), Spawned(std::result::Result<RelayChild, String>),

View File

@ -13,14 +13,17 @@
Rc::new(RefCell::new(path_marker(focus_signal_path.as_path()))); Rc::new(RefCell::new(path_marker(focus_signal_path.as_path())));
let power_request_in_flight = Rc::clone(&power_request_in_flight); let power_request_in_flight = Rc::clone(&power_request_in_flight);
let calibration_request_in_flight = Rc::clone(&calibration_request_in_flight); let calibration_request_in_flight = Rc::clone(&calibration_request_in_flight);
let upstream_sync_request_in_flight = Rc::clone(&upstream_sync_request_in_flight);
let relay_request_in_flight = Rc::clone(&relay_request_in_flight); let relay_request_in_flight = Rc::clone(&relay_request_in_flight);
let preview = preview.clone(); let preview = preview.clone();
let power_tx = power_tx.clone(); let power_tx = power_tx.clone();
let calibration_tx = calibration_tx.clone(); let calibration_tx = calibration_tx.clone();
let upstream_sync_tx = upstream_sync_tx.clone();
let caps_tx = caps_tx.clone(); let caps_tx = caps_tx.clone();
let caps_request_in_flight = Rc::clone(&caps_request_in_flight); let caps_request_in_flight = Rc::clone(&caps_request_in_flight);
let diagnostics_network = Rc::clone(&diagnostics_network); let diagnostics_network = Rc::clone(&diagnostics_network);
let diagnostics_process = Rc::clone(&diagnostics_process); let diagnostics_process = Rc::clone(&diagnostics_process);
let next_upstream_sync_probe = Rc::clone(&next_upstream_sync_probe);
let next_diagnostics_probe = Rc::clone(&next_diagnostics_probe); let next_diagnostics_probe = Rc::clone(&next_diagnostics_probe);
let next_diagnostics_sample = Rc::clone(&next_diagnostics_sample); let next_diagnostics_sample = Rc::clone(&next_diagnostics_sample);
let preview_session_active = Rc::clone(&preview_session_active); let preview_session_active = Rc::clone(&preview_session_active);
@ -77,6 +80,11 @@
server_addr.clone(), server_addr.clone(),
Duration::from_millis(350), Duration::from_millis(350),
); );
request_upstream_sync_refresh(
upstream_sync_tx.clone(),
server_addr.clone(),
Duration::from_millis(450),
);
request_capture_power_refresh( request_capture_power_refresh(
power_tx.clone(), power_tx.clone(),
server_addr, server_addr,
@ -156,6 +164,11 @@
server_addr.clone(), server_addr.clone(),
Duration::from_millis(300), Duration::from_millis(300),
); );
request_upstream_sync_refresh(
upstream_sync_tx.clone(),
server_addr.clone(),
Duration::from_millis(400),
);
request_capture_power_refresh( request_capture_power_refresh(
power_tx.clone(), power_tx.clone(),
server_addr, server_addr,
@ -302,6 +315,26 @@
} }
} }
while let Ok(message) = upstream_sync_rx.try_recv() {
upstream_sync_request_in_flight.set(false);
match message {
UpstreamSyncMessage::Refresh(Ok(upstream_sync)) => {
let mut state = state.borrow_mut();
state.set_server_available(true);
state.set_upstream_sync(upstream_sync);
}
UpstreamSyncMessage::Refresh(Err(err)) => {
let relay_live = child_proc.borrow().is_some()
|| state.borrow().remote_active;
let mut state = state.borrow_mut();
if relay_live {
state.set_server_available(true);
}
state.set_upstream_sync(unavailable_upstream_sync(err));
}
}
}
while let Ok(message) = caps_rx.try_recv() { while let Ok(message) = caps_rx.try_recv() {
caps_request_in_flight.set(false); caps_request_in_flight.set(false);
match message { match message {
@ -393,6 +426,21 @@
next_calibration_probe.set(now + Duration::from_secs(2)); next_calibration_probe.set(now + Duration::from_secs(2));
} }
if now >= next_upstream_sync_probe.get()
&& !upstream_sync_request_in_flight.get()
&& (child_running || state.borrow().server_available)
{
upstream_sync_request_in_flight.set(true);
let server_addr =
selected_server_addr(&server_entry, server_addr_fallback.as_ref());
request_upstream_sync_refresh(
upstream_sync_tx.clone(),
server_addr,
Duration::ZERO,
);
next_upstream_sync_probe.set(now + Duration::from_secs(2));
}
if now >= next_diagnostics_probe.get() && !caps_request_in_flight.get() { if now >= next_diagnostics_probe.get() && !caps_request_in_flight.get() {
caps_request_in_flight.set(true); caps_request_in_flight.set(true);
let server_addr = let server_addr =

View File

@ -406,7 +406,7 @@
let widgets = widgets.clone(); let widgets = widgets.clone();
widgets.calibration_rig_button.connect_clicked(move |_| { widgets.calibration_rig_button.connect_clicked(move |_| {
widgets.status_label.set_text( widgets.status_label.set_text(
"Rig calibration wizard is queued for the 0.16.0 test-equipment phase; for now the manual Tethys sync battery remains the measured-default path.", "Rig calibration wizard is queued for lab tooling; for now the mirrored Tethys sync probe remains the measured-default path.",
); );
}); });
} }

View File

@ -1,6 +1,6 @@
[package] [package]
name = "lesavka_common" name = "lesavka_common"
version = "0.16.26" version = "0.17.0"
edition = "2024" edition = "2024"
build = "build.rs" build = "build.rs"

View File

@ -89,6 +89,24 @@ message CalibrationRequest {
string note = 6; string note = 6;
} }
message UpstreamSyncState {
uint64 session_id = 1;
string phase = 2;
optional uint64 latest_camera_remote_pts_us = 3;
optional uint64 latest_microphone_remote_pts_us = 4;
optional uint64 last_video_presented_pts_us = 5;
optional uint64 last_audio_presented_pts_us = 6;
optional float live_lag_ms = 7;
optional float planner_skew_ms = 8;
uint64 stale_audio_drops = 9;
uint64 stale_video_drops = 10;
uint64 skew_video_drops = 11;
uint64 freshness_reanchors = 12;
uint64 startup_timeouts = 13;
uint64 video_freezes = 14;
string last_reason = 15;
}
message HandshakeSet { message HandshakeSet {
bool camera = 1; bool camera = 1;
bool microphone = 2; bool microphone = 2;
@ -122,6 +140,7 @@ service Relay {
rpc SetCapturePower (SetCapturePowerRequest) returns (CapturePowerState); rpc SetCapturePower (SetCapturePowerRequest) returns (CapturePowerState);
rpc GetCalibration (Empty) returns (CalibrationState); rpc GetCalibration (Empty) returns (CalibrationState);
rpc Calibrate (CalibrationRequest) returns (CalibrationState); rpc Calibrate (CalibrationRequest) returns (CalibrationState);
rpc GetUpstreamSync (Empty) returns (UpstreamSyncState);
} }
service Handshake { service Handshake {

View File

@ -248,8 +248,10 @@ from `LESAVKA_CLIENT_PKI_SSH_SOURCE` over SSH. Runtime clients require the insta
| `LESAVKA_UAC_DEV` | server hardware/device override | | `LESAVKA_UAC_DEV` | server hardware/device override |
| `LESAVKA_UAC_SESSION_CLOCK_ALIGN` | server audio sink clock-alignment override; `0` is the host-validated default | | `LESAVKA_UAC_SESSION_CLOCK_ALIGN` | server audio sink clock-alignment override; `0` is the host-validated default |
| `LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US` | server upstream playout override; shifts gadget-audio presentation relative to the shared playout epoch | | `LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US` | server upstream playout override; shifts gadget-audio presentation relative to the shared playout epoch |
| `LESAVKA_UPSTREAM_PAIR_SLACK_US` | server upstream pairing override; how far video may lead the planned audio-master capture moment before the frame is held or dropped, defaults to `20000` | | `LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS` | server upstream planner freshness ceiling; planner-approved audio/video should not exceed this live lag budget, defaults to `1000` |
| `LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS` | server upstream pairing/synchronization window; the server uses this shared buffer to pair webcam frames with their matching gadget-mic audio before remote presentation, defaults to `1000` | | `LESAVKA_UPSTREAM_PAIR_SLACK_US` | server upstream pairing override; how far video may diverge from the planned audio-master capture moment before the frame is held or dropped, defaults to `80000` |
| `LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS` | server upstream pairing/synchronization target buffer; the server uses this shared buffer to pair webcam frames with matching gadget-mic audio before remote presentation, defaults to `350` |
| `LESAVKA_UPSTREAM_STARTUP_TIMEOUT_MS` | server upstream startup guard; paired startup must converge before this timeout or fail visibly, defaults to `60000` |
| `LESAVKA_UPSTREAM_STALE_DROP_MS` | server upstream freshness override; late audio/video that miss this budget are dropped instead of silently extending lag, defaults to `80` | | `LESAVKA_UPSTREAM_STALE_DROP_MS` | server upstream freshness override; late audio/video that miss this budget are dropped instead of silently extending lag, defaults to `80` |
| `LESAVKA_UPSTREAM_TIMING_TRACE` | upstream capture/rebase trace override for sync debugging | | `LESAVKA_UPSTREAM_TIMING_TRACE` | upstream capture/rebase trace override for sync debugging |
| `LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US` | server upstream playout override; shifts webcam-video presentation relative to the shared playout epoch | | `LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US` | server upstream playout override; shifts webcam-video presentation relative to the shared playout epoch |

View File

@ -14,9 +14,10 @@ INSTALL_UVC_CODEC=${LESAVKA_INSTALL_UVC_CODEC:-mjpeg}
INSTALL_SERVER_BIND_ADDR=${LESAVKA_INSTALL_SERVER_BIND_ADDR:-0.0.0.0:50051} INSTALL_SERVER_BIND_ADDR=${LESAVKA_INSTALL_SERVER_BIND_ADDR:-0.0.0.0:50051}
LESAVKA_TLS_DIR=${LESAVKA_TLS_DIR:-/etc/lesavka/pki} LESAVKA_TLS_DIR=${LESAVKA_TLS_DIR:-/etc/lesavka/pki}
LESAVKA_CLIENT_BUNDLE=${LESAVKA_CLIENT_BUNDLE:-/etc/lesavka/lesavka-client-pki.tar.gz} LESAVKA_CLIENT_BUNDLE=${LESAVKA_CLIENT_BUNDLE:-/etc/lesavka/lesavka-client-pki.tar.gz}
DEFAULT_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=1260000 DEFAULT_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=0
LEGACY_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=-45000 LEGACY_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=-45000
PREVIOUS_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=720000 PREVIOUS_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=720000
PREVIOUS_TUNED_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=1260000
resolve_upstream_audio_playout_offset_us() { resolve_upstream_audio_playout_offset_us() {
if [[ -n ${LESAVKA_INSTALL_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US:-} ]]; then if [[ -n ${LESAVKA_INSTALL_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US:-} ]]; then
@ -24,8 +25,8 @@ resolve_upstream_audio_playout_offset_us() {
return 0 return 0
fi fi
if [[ ${LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US:-} == "$LEGACY_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US" || ${LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US:-} == "$PREVIOUS_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US" ]]; then if [[ ${LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US:-} == "$LEGACY_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US" || ${LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US:-} == "$PREVIOUS_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US" || ${LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US:-} == "$PREVIOUS_TUNED_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US" ]]; then
echo "⚠️ migrating stale upstream audio playout offset to +1260ms for MJPEG/UVC." >&2 echo "⚠️ migrating stale upstream audio playout offset to the 0.17 freshness-first planner default." >&2
echo " Use LESAVKA_INSTALL_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US to intentionally keep an older value." >&2 echo " Use LESAVKA_INSTALL_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US to intentionally keep an older value." >&2
printf '%s\n' "$DEFAULT_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US" printf '%s\n' "$DEFAULT_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US"
return 0 return 0
@ -974,7 +975,9 @@ fi
printf 'LESAVKA_ALSA_DEV=%s\n' "${LESAVKA_ALSA_DEV:-hw:UAC2Gadget,0}" printf 'LESAVKA_ALSA_DEV=%s\n' "${LESAVKA_ALSA_DEV:-hw:UAC2Gadget,0}"
printf 'LESAVKA_UAC_HDMI_COMPENSATION_US=%s\n' "${LESAVKA_UAC_HDMI_COMPENSATION_US:-205000}" printf 'LESAVKA_UAC_HDMI_COMPENSATION_US=%s\n' "${LESAVKA_UAC_HDMI_COMPENSATION_US:-205000}"
printf 'LESAVKA_UAC_SESSION_CLOCK_ALIGN=%s\n' "${LESAVKA_UAC_SESSION_CLOCK_ALIGN:-0}" printf 'LESAVKA_UAC_SESSION_CLOCK_ALIGN=%s\n' "${LESAVKA_UAC_SESSION_CLOCK_ALIGN:-0}"
printf 'LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS=%s\n' "${LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS:-1000}" printf 'LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS=%s\n' "${LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS:-350}"
printf 'LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS=%s\n' "${LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS:-1000}"
printf 'LESAVKA_UPSTREAM_STARTUP_TIMEOUT_MS=%s\n' "${LESAVKA_UPSTREAM_STARTUP_TIMEOUT_MS:-60000}"
printf 'LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=%s\n' "$(resolve_upstream_audio_playout_offset_us)" printf 'LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=%s\n' "$(resolve_upstream_audio_playout_offset_us)"
printf 'LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US=%s\n' "${LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US:-0}" printf 'LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US=%s\n' "${LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US:-0}"
printf 'LESAVKA_UPSTREAM_PAIR_SLACK_US=%s\n' "${LESAVKA_UPSTREAM_PAIR_SLACK_US:-80000}" printf 'LESAVKA_UPSTREAM_PAIR_SLACK_US=%s\n' "${LESAVKA_UPSTREAM_PAIR_SLACK_US:-80000}"

View File

@ -185,6 +185,27 @@ print_lesavka_versions() {
done <<<"${version_output}" done <<<"${version_output}"
} }
print_upstream_sync_state() {
local label="$1"
echo "==> upstream sync planner state (${label})"
if [[ ! -x "${REPO_ROOT}/target/debug/lesavka-relayctl" ]]; then
(cd "${REPO_ROOT}" && cargo build -p lesavka_client --bin lesavka-relayctl >/dev/null)
fi
local sync_output
if ! sync_output="$(
LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \
"${REPO_ROOT}/target/debug/lesavka-relayctl" \
--server "${RESOLVED_LESAVKA_SERVER_ADDR}" \
upstream-sync 2>&1
)"; then
echo " ↪ planner query failed: ${sync_output}"
return 0
fi
while IFS= read -r line; do
[[ -n "${line}" ]] && echo "${line}"
done <<<"${sync_output}"
}
start_local_stimulus() { start_local_stimulus() {
echo "==> starting local A/V stimulus server" echo "==> starting local A/V stimulus server"
python3 "${REPO_ROOT}/scripts/manual/local_av_stimulus.py" \ python3 "${REPO_ROOT}/scripts/manual/local_av_stimulus.py" \
@ -266,9 +287,11 @@ echo "==> prebuilding real client and analyzer"
start_server_tunnel_if_needed start_server_tunnel_if_needed
print_lesavka_versions print_lesavka_versions
print_upstream_sync_state "before mirrored run"
start_local_stimulus start_local_stimulus
start_real_lesavka_client start_real_lesavka_client
run_browser_capture_with_real_driver run_browser_capture_with_real_driver
print_upstream_sync_state "after mirrored run"
echo "==> mirrored probe complete" echo "==> mirrored probe complete"
echo "artifact_dir: ${ARTIFACT_DIR}" echo "artifact_dir: ${ARTIFACT_DIR}"

View File

@ -10,7 +10,7 @@ bench = false
[package] [package]
name = "lesavka_server" name = "lesavka_server"
version = "0.16.26" version = "0.17.0"
edition = "2024" edition = "2024"
autobins = false autobins = false

View File

@ -9,13 +9,14 @@ use lesavka_common::lesavka::{
use crate::upstream_media_runtime::UpstreamMediaRuntime; use crate::upstream_media_runtime::UpstreamMediaRuntime;
pub const FACTORY_MJPEG_AUDIO_OFFSET_US: i64 = 1_260_000; pub const FACTORY_MJPEG_AUDIO_OFFSET_US: i64 = 0;
pub const FACTORY_MJPEG_VIDEO_OFFSET_US: i64 = 0; pub const FACTORY_MJPEG_VIDEO_OFFSET_US: i64 = 0;
const LEGACY_FACTORY_MJPEG_AUDIO_OFFSET_US: i64 = -45_000; const LEGACY_FACTORY_MJPEG_AUDIO_OFFSET_US: i64 = -45_000;
const PREVIOUS_FACTORY_MJPEG_AUDIO_OFFSET_US: i64 = 720_000; const PREVIOUS_FACTORY_MJPEG_AUDIO_OFFSET_US: i64 = 720_000;
const PREVIOUS_TUNED_MJPEG_AUDIO_OFFSET_US: i64 = 1_260_000;
const PROFILE: &str = "mjpeg"; const PROFILE: &str = "mjpeg";
const FACTORY_CONFIDENCE: &str = "factory"; const FACTORY_CONFIDENCE: &str = "factory";
const OFFSET_LIMIT_US: i64 = 1_500_000; const OFFSET_LIMIT_US: i64 = 500_000;
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
struct CalibrationSnapshot { struct CalibrationSnapshot {
@ -237,10 +238,17 @@ fn parse_snapshot(raw: &str) -> CalibrationSnapshot {
fn migrate_legacy_snapshot(mut state: CalibrationSnapshot) -> CalibrationSnapshot { fn migrate_legacy_snapshot(mut state: CalibrationSnapshot) -> CalibrationSnapshot {
let source_allows_migration = matches!(state.source.as_str(), "factory" | "env"); let source_allows_migration = matches!(state.source.as_str(), "factory" | "env");
let confidence_allows_migration = matches!(state.confidence.as_str(), "factory" | "configured"); let confidence_allows_migration = matches!(state.confidence.as_str(), "factory" | "configured");
let untouched_legacy_audio = matches!( let clamped_previous_baseline = state.default_audio_offset_us == OFFSET_LIMIT_US
&& state
.detail
.contains("loaded upstream A/V calibration defaults");
let untouched_legacy_audio = (matches!(
state.default_audio_offset_us, state.default_audio_offset_us,
LEGACY_FACTORY_MJPEG_AUDIO_OFFSET_US | PREVIOUS_FACTORY_MJPEG_AUDIO_OFFSET_US LEGACY_FACTORY_MJPEG_AUDIO_OFFSET_US
) && state.active_audio_offset_us == state.default_audio_offset_us; | PREVIOUS_FACTORY_MJPEG_AUDIO_OFFSET_US
| PREVIOUS_TUNED_MJPEG_AUDIO_OFFSET_US
) || clamped_previous_baseline)
&& state.active_audio_offset_us == state.default_audio_offset_us;
let untouched_legacy_video = state.default_video_offset_us == FACTORY_MJPEG_VIDEO_OFFSET_US let untouched_legacy_video = state.default_video_offset_us == FACTORY_MJPEG_VIDEO_OFFSET_US
&& state.active_video_offset_us == FACTORY_MJPEG_VIDEO_OFFSET_US; && state.active_video_offset_us == FACTORY_MJPEG_VIDEO_OFFSET_US;
if state.profile == PROFILE if state.profile == PROFILE
@ -321,7 +329,7 @@ mod tests {
], ],
|| { || {
let state = snapshot_from_env(); let state = snapshot_from_env();
assert_eq!(state.default_audio_offset_us, 1_260_000); assert_eq!(state.default_audio_offset_us, 0);
assert_eq!(state.active_video_offset_us, 0); assert_eq!(state.active_video_offset_us, 0);
assert_eq!(state.source, "factory"); assert_eq!(state.source, "factory");
}, },
@ -345,10 +353,10 @@ mod tests {
note: String::new(), note: String::new(),
}) })
.expect("manual adjust applies"); .expect("manual adjust applies");
assert_eq!(state.active_audio_offset_us, 1_255_000); assert_eq!(state.active_audio_offset_us, -5_000);
assert_eq!(runtime.playout_offsets(), (0, 1_255_000)); assert_eq!(runtime.playout_offsets(), (0, -5_000));
let raw = std::fs::read_to_string(file.path()).expect("persisted calibration"); let raw = std::fs::read_to_string(file.path()).expect("persisted calibration");
assert!(raw.contains("active_audio_offset_us=1255000")); assert!(raw.contains("active_audio_offset_us=-5000"));
}); });
} }
@ -371,7 +379,7 @@ mod tests {
], ],
|| { || {
let state = snapshot_from_env(); let state = snapshot_from_env();
assert_eq!(state.default_audio_offset_us, -1_500_000); assert_eq!(state.default_audio_offset_us, -500_000);
assert_eq!(state.default_video_offset_us, 12_345); assert_eq!(state.default_video_offset_us, 12_345);
assert_eq!(state.source, "env"); assert_eq!(state.source, "env");
assert_eq!(state.confidence, "configured"); assert_eq!(state.confidence, "configured");
@ -399,7 +407,7 @@ mod tests {
); );
assert_eq!(state.default_audio_offset_us, FACTORY_MJPEG_AUDIO_OFFSET_US); assert_eq!(state.default_audio_offset_us, FACTORY_MJPEG_AUDIO_OFFSET_US);
assert_eq!(state.default_video_offset_us, 2_500); assert_eq!(state.default_video_offset_us, 2_500);
assert_eq!(state.active_audio_offset_us, -1_500_000); assert_eq!(state.active_audio_offset_us, -500_000);
assert_eq!(state.active_video_offset_us, FACTORY_MJPEG_VIDEO_OFFSET_US); assert_eq!(state.active_video_offset_us, FACTORY_MJPEG_VIDEO_OFFSET_US);
assert_eq!(state.source, "saved"); assert_eq!(state.source, "saved");
assert_eq!(state.confidence, FACTORY_CONFIDENCE); assert_eq!(state.confidence, FACTORY_CONFIDENCE);
@ -429,10 +437,10 @@ mod tests {
let runtime = Arc::new(UpstreamMediaRuntime::new()); let runtime = Arc::new(UpstreamMediaRuntime::new());
let store = CalibrationStore::load(runtime.clone()); let store = CalibrationStore::load(runtime.clone());
let state = store.current(); let state = store.current();
assert_eq!(state.active_audio_offset_us, 1_260_000); assert_eq!(state.active_audio_offset_us, 0);
assert_eq!(state.default_audio_offset_us, 1_260_000); assert_eq!(state.default_audio_offset_us, 0);
assert_eq!(state.source, "factory"); assert_eq!(state.source, "factory");
assert_eq!(runtime.playout_offsets(), (0, 1_260_000)); assert_eq!(runtime.playout_offsets(), (0, 0));
assert!(state.detail.contains("migrated legacy MJPEG")); assert!(state.detail.contains("migrated legacy MJPEG"));
}); });
} }
@ -459,11 +467,11 @@ mod tests {
let runtime = Arc::new(UpstreamMediaRuntime::new()); let runtime = Arc::new(UpstreamMediaRuntime::new());
let store = CalibrationStore::load(runtime.clone()); let store = CalibrationStore::load(runtime.clone());
let state = store.current(); let state = store.current();
assert_eq!(state.active_audio_offset_us, 1_260_000); assert_eq!(state.active_audio_offset_us, 0);
assert_eq!(state.default_audio_offset_us, 1_260_000); assert_eq!(state.default_audio_offset_us, 0);
assert_eq!(state.source, "factory"); assert_eq!(state.source, "factory");
assert_eq!(runtime.playout_offsets(), (0, 1_260_000)); assert_eq!(runtime.playout_offsets(), (0, 0));
assert!(state.detail.contains("from +720.0ms to +1260.0ms")); assert!(state.detail.contains("to +0.0ms"));
}); });
} }
@ -519,7 +527,7 @@ mod tests {
.expect("blind estimate"); .expect("blind estimate");
assert_eq!(blind.source, "blind"); assert_eq!(blind.source, "blind");
assert!(blind.detail.contains("delivery skew 44.0ms")); assert!(blind.detail.contains("delivery skew 44.0ms"));
assert_eq!(runtime.playout_offsets(), (-2_000, 1_265_000)); assert_eq!(runtime.playout_offsets(), (-2_000, 5_000));
let manual = store let manual = store
.apply(CalibrationRequest { .apply(CalibrationRequest {
@ -531,7 +539,7 @@ mod tests {
note: String::new(), note: String::new(),
}) })
.expect("manual clamp"); .expect("manual clamp");
assert_eq!(manual.active_audio_offset_us, 1_500_000); assert_eq!(manual.active_audio_offset_us, 500_000);
let saved = store let saved = store
.apply(CalibrationRequest { .apply(CalibrationRequest {

View File

@ -20,7 +20,7 @@ use tracing::{debug, error, info, warn};
use lesavka_common::lesavka::{ use lesavka_common::lesavka::{
AudioPacket, CalibrationRequest, CalibrationState, CapturePowerCommand, CapturePowerState, AudioPacket, CalibrationRequest, CalibrationState, CapturePowerCommand, CapturePowerState,
Empty, KeyboardReport, MonitorRequest, MouseReport, PasteReply, PasteRequest, ResetUsbReply, Empty, KeyboardReport, MonitorRequest, MouseReport, PasteReply, PasteRequest, ResetUsbReply,
SetCapturePowerRequest, VideoPacket, SetCapturePowerRequest, UpstreamSyncState, VideoPacket,
relay_server::{Relay, RelayServer}, relay_server::{Relay, RelayServer},
}; };

View File

@ -198,12 +198,41 @@ impl Relay for Handler {
}; };
let plan = match upstream_media_rt.plan_audio_pts(pkt.pts) { let plan = match upstream_media_rt.plan_audio_pts(pkt.pts) {
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::AwaitingPair => { lesavka_server::upstream_media_runtime::UpstreamPlanDecision::AwaitingPair => {
if inbound_closed {
tracing::debug!(
rpc_id,
session_id = lease.session_id,
pts = pkt.pts,
"🎤 dropping trailing upstream audio because no paired video arrived before stream close"
);
continue;
}
pending.push_front(pkt); pending.push_front(pkt);
continue; continue;
} }
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropBeforeOverlap => { lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropBeforeOverlap => {
continue; continue;
} }
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropStale(reason) => {
tracing::warn!(
rpc_id,
session_id = lease.session_id,
pts = pkt.pts,
reason,
"🎤 upstream audio packet dropped by authoritative freshness planner"
);
continue;
}
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::StartupFailed(reason) => {
tracing::error!(
rpc_id,
session_id = lease.session_id,
reason,
"🎤 upstream audio startup failed"
);
cleanup.mark_aborted();
break;
}
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan, lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan,
}; };
if plan.late_by > stale_drop_budget { if plan.late_by > stale_drop_budget {
@ -236,6 +265,7 @@ impl Relay for Handler {
tracing::info!(rpc_id, "🎤⬇ srv pkt#{n} {} bytes", pkt.data.len()); tracing::info!(rpc_id, "🎤⬇ srv pkt#{n} {} bytes", pkt.data.len());
} }
sink.push(&pkt); sink.push(&pkt);
upstream_media_rt.mark_audio_presented(pkt.pts);
} }
sink.finish(); // flush on EOS sink.finish(); // flush on EOS
let _ = tx.send(Ok(Empty {})).await; let _ = tx.send(Ok(Empty {})).await;
@ -362,12 +392,37 @@ impl Relay for Handler {
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropBeforeOverlap => { lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropBeforeOverlap => {
continue; continue;
} }
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropStale(reason) => {
tracing::warn!(
rpc_id,
session_id = upstream_lease.session_id,
camera_session_id,
pts = pkt.pts,
reason,
"🎥 upstream video frame dropped by authoritative freshness planner"
);
continue;
}
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::StartupFailed(reason) => {
tracing::error!(
rpc_id,
session_id = upstream_lease.session_id,
camera_session_id,
reason,
"🎥 upstream video startup failed"
);
cleanup.mark_aborted();
break;
}
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan, lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan,
}; };
if !upstream_media_rt if !upstream_media_rt
.wait_for_audio_master(plan.local_pts_us, plan.due_at) .wait_for_audio_master(plan.local_pts_us, plan.due_at)
.await .await
{ {
upstream_media_rt.record_video_freeze(
"video froze because audio master did not reach the frame timestamp",
);
tracing::warn!( tracing::warn!(
rpc_id, rpc_id,
session_id = upstream_lease.session_id, session_id = upstream_lease.session_id,
@ -433,7 +488,9 @@ impl Relay for Handler {
} }
pkt.pts = plan.local_pts_us; pkt.pts = plan.local_pts_us;
startup_video_settled = true; startup_video_settled = true;
let presented_pts = pkt.pts;
relay.feed(pkt); // ← all logging inside video.rs relay.feed(pkt); // ← all logging inside video.rs
upstream_media_rt.mark_video_presented(presented_pts);
} }
tx.send(Ok(Empty {})).await.ok(); tx.send(Ok(Empty {})).await.ok();
Ok::<(), Status>(()) Ok::<(), Status>(())
@ -503,6 +560,13 @@ impl Relay for Handler {
) -> Result<Response<CalibrationState>, Status> { ) -> Result<Response<CalibrationState>, Status> {
self.calibrate_reply(req).await self.calibrate_reply(req).await
} }
async fn get_upstream_sync(
&self,
_req: Request<Empty>,
) -> Result<Response<UpstreamSyncState>, Status> {
self.get_upstream_sync_reply().await
}
} }
#[cfg(test)] #[cfg(test)]

View File

@ -154,6 +154,12 @@ impl Relay for Handler {
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropBeforeOverlap => { lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropBeforeOverlap => {
continue; continue;
} }
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropStale(_) => {
continue;
}
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::StartupFailed(_) => {
break;
}
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan, lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan,
}; };
if plan.late_by > stale_drop_budget { if plan.late_by > stale_drop_budget {
@ -168,6 +174,7 @@ impl Relay for Handler {
} }
pkt.pts = plan.local_pts_us; pkt.pts = plan.local_pts_us;
sink.push(&pkt); sink.push(&pkt);
upstream_media_rt.mark_audio_presented(pkt.pts);
} }
sink.finish(); sink.finish();
upstream_media_rt.close_microphone(lease.generation); upstream_media_rt.close_microphone(lease.generation);
@ -233,12 +240,19 @@ impl Relay for Handler {
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropBeforeOverlap => { lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropBeforeOverlap => {
continue; continue;
} }
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropStale(_) => {
continue;
}
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::StartupFailed(_) => {
break;
}
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan, lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan,
}; };
if !upstream_media_rt if !upstream_media_rt
.wait_for_audio_master(plan.local_pts_us, plan.due_at) .wait_for_audio_master(plan.local_pts_us, plan.due_at)
.await .await
{ {
upstream_media_rt.record_video_freeze("coverage video froze awaiting audio master");
continue; continue;
} }
if plan.late_by > stale_drop_budget { if plan.late_by > stale_drop_budget {
@ -254,7 +268,9 @@ impl Relay for Handler {
continue; continue;
} }
pkt.pts = plan.local_pts_us; pkt.pts = plan.local_pts_us;
let presented_pts = pkt.pts;
relay.feed(pkt); relay.feed(pkt);
upstream_media_rt.mark_video_presented(presented_pts);
} }
upstream_media_rt.close_camera(upstream_lease.generation); upstream_media_rt.close_camera(upstream_lease.generation);
tx.send(Ok(Empty {})).await.ok(); tx.send(Ok(Empty {})).await.ok();
@ -336,4 +352,11 @@ impl Relay for Handler {
) -> Result<Response<CalibrationState>, Status> { ) -> Result<Response<CalibrationState>, Status> {
self.calibrate_reply(req).await self.calibrate_reply(req).await
} }
async fn get_upstream_sync(
&self,
_req: Request<Empty>,
) -> Result<Response<UpstreamSyncState>, Status> {
self.get_upstream_sync_reply().await
}
} }

View File

@ -164,4 +164,25 @@ impl Handler {
.map(Response::new) .map(Response::new)
.map_err(|e| Status::internal(format!("{e:#}"))) .map_err(|e| Status::internal(format!("{e:#}")))
} }
async fn get_upstream_sync_reply(&self) -> Result<Response<UpstreamSyncState>, Status> {
let snapshot = self.upstream_media_rt.snapshot();
Ok(Response::new(UpstreamSyncState {
session_id: snapshot.session_id,
phase: snapshot.phase.to_string(),
latest_camera_remote_pts_us: snapshot.latest_camera_remote_pts_us,
latest_microphone_remote_pts_us: snapshot.latest_microphone_remote_pts_us,
last_video_presented_pts_us: snapshot.last_video_presented_pts_us,
last_audio_presented_pts_us: snapshot.last_audio_presented_pts_us,
live_lag_ms: snapshot.live_lag_ms.map(|value| value as f32),
planner_skew_ms: snapshot.planner_skew_ms.map(|value| value as f32),
stale_audio_drops: snapshot.stale_audio_drops,
stale_video_drops: snapshot.stale_video_drops,
skew_video_drops: snapshot.skew_video_drops,
freshness_reanchors: snapshot.freshness_reanchors,
startup_timeouts: snapshot.startup_timeouts,
video_freezes: snapshot.video_freezes,
last_reason: snapshot.last_reason,
}))
}
} }

View File

@ -12,13 +12,15 @@ mod state;
mod types; mod types;
use config::{ use config::{
apply_playout_offset, upstream_camera_startup_grace_us, upstream_pairing_master_slack, apply_playout_offset, upstream_camera_startup_grace_us, upstream_max_live_lag,
upstream_playout_delay, upstream_playout_offset_us, upstream_reanchor_late_threshold, upstream_pairing_master_slack, upstream_playout_delay, upstream_playout_offset_us,
upstream_reanchor_window_us, upstream_require_paired_startup, upstream_timing_trace_enabled, upstream_reanchor_late_threshold, upstream_require_paired_startup, upstream_startup_timeout,
upstream_timing_trace_enabled,
}; };
use state::UpstreamClockState; use state::{UpstreamClockState, UpstreamSyncPhase};
pub use types::{ pub use types::{
PlannedUpstreamPacket, UpstreamMediaKind, UpstreamPlanDecision, UpstreamStreamLease, PlannedUpstreamPacket, UpstreamMediaKind, UpstreamPlanDecision, UpstreamPlannerSnapshot,
UpstreamStreamLease,
}; };
/// Coordinate upstream stream ownership and keep audio/video on one timeline. /// Coordinate upstream stream ownership and keep audio/video on one timeline.
@ -93,6 +95,79 @@ impl UpstreamMediaRuntime {
let microphone_offset_us = self.microphone_playout_offset_us.load(Ordering::Relaxed); let microphone_offset_us = self.microphone_playout_offset_us.load(Ordering::Relaxed);
microphone_offset_us.saturating_sub(camera_offset_us).max(0) as u64 microphone_offset_us.saturating_sub(camera_offset_us).max(0) as u64
} }
/// Mark one audio chunk as actually handed to the UAC sink.
pub fn mark_audio_presented(&self, local_pts_us: u64) {
let mut state = self
.state
.lock()
.expect("upstream media state mutex poisoned");
state.last_audio_presented_pts_us = Some(local_pts_us);
if state.phase != UpstreamSyncPhase::Failed {
state.phase = UpstreamSyncPhase::Live;
state.last_reason = "audio-master playhead flowing".to_string();
}
}
/// Mark one video frame as actually handed to the UVC/HDMI sink.
pub fn mark_video_presented(&self, local_pts_us: u64) {
let mut state = self
.state
.lock()
.expect("upstream media state mutex poisoned");
state.last_video_presented_pts_us = Some(local_pts_us);
if state.phase != UpstreamSyncPhase::Failed {
state.phase = UpstreamSyncPhase::Live;
state.last_reason = "video follower emitted a synced frame".to_string();
}
}
/// Record that video intentionally froze instead of showing an out-of-sync frame.
pub fn record_video_freeze(&self, reason: impl Into<String>) {
let mut state = self
.state
.lock()
.expect("upstream media state mutex poisoned");
state.video_freezes = state.video_freezes.saturating_add(1);
if state.phase != UpstreamSyncPhase::Failed {
state.phase = UpstreamSyncPhase::Healing;
}
state.last_reason = reason.into();
}
/// Return current planner facts for diagnostics and probe artifacts.
#[must_use]
pub fn snapshot(&self) -> UpstreamPlannerSnapshot {
let state = self
.state
.lock()
.expect("upstream media state mutex poisoned");
let live_lag_ms = live_lag_us(&state).map(us_to_ms);
let planner_skew_ms = match (
state.last_audio_presented_pts_us,
state.last_video_presented_pts_us,
) {
(Some(audio), Some(video)) => Some((audio as i128 - video as i128) as f64 / 1000.0),
_ => None,
};
UpstreamPlannerSnapshot {
session_id: state.session_id,
phase: state.phase.as_str(),
latest_camera_remote_pts_us: state.latest_camera_remote_pts_us,
latest_microphone_remote_pts_us: state.latest_microphone_remote_pts_us,
last_video_presented_pts_us: state.last_video_presented_pts_us,
last_audio_presented_pts_us: state.last_audio_presented_pts_us,
live_lag_ms,
planner_skew_ms,
stale_audio_drops: state.stale_audio_drops,
stale_video_drops: state.stale_video_drops,
skew_video_drops: state.skew_video_drops,
freshness_reanchors: state.freshness_reanchors,
startup_timeouts: state.startup_timeouts,
video_freezes: state.video_freezes,
last_reason: state.last_reason.clone(),
}
}
} }
include!("upstream_media_runtime/lease_lifecycle.rs"); include!("upstream_media_runtime/lease_lifecycle.rs");
@ -189,6 +264,7 @@ impl UpstreamMediaRuntime {
state.microphone_packet_count state.microphone_packet_count
} }
}; };
update_latest_remote_pts(&mut state, kind, remote_pts_us);
let mut first_remote_for_kind = match kind { let mut first_remote_for_kind = match kind {
UpstreamMediaKind::Camera => { UpstreamMediaKind::Camera => {
let first_slot = &mut state.first_camera_remote_pts_us; let first_slot = &mut state.first_camera_remote_pts_us;
@ -215,8 +291,20 @@ impl UpstreamMediaRuntime {
.pairing_anchor_deadline .pairing_anchor_deadline
.get_or_insert_with(|| now + upstream_playout_delay()); .get_or_insert_with(|| now + upstream_playout_delay());
let playout_delay = upstream_playout_delay(); let playout_delay = upstream_playout_delay();
let max_live_lag = upstream_max_live_lag();
if state.session_base_remote_pts_us.is_none() { if state.session_base_remote_pts_us.is_none() {
if state.session_started_at.is_some_and(|started_at| {
now.saturating_duration_since(started_at) > upstream_startup_timeout()
}) {
state.phase = UpstreamSyncPhase::Failed;
state.startup_timeouts = state.startup_timeouts.saturating_add(1);
state.last_reason =
"paired upstream startup did not converge before timeout".to_string();
return UpstreamPlanDecision::StartupFailed(
"paired upstream startup did not converge before timeout",
);
}
if state.first_camera_remote_pts_us.is_some() if state.first_camera_remote_pts_us.is_some()
&& state.first_microphone_remote_pts_us.is_some() && state.first_microphone_remote_pts_us.is_some()
&& state.camera_startup_ready && state.camera_startup_ready
@ -230,6 +318,8 @@ impl UpstreamMediaRuntime {
let overlap_epoch = now + playout_delay; let overlap_epoch = now + playout_delay;
state.playout_epoch = Some(overlap_epoch); state.playout_epoch = Some(overlap_epoch);
state.pairing_anchor_deadline = Some(overlap_epoch); state.pairing_anchor_deadline = Some(overlap_epoch);
state.phase = UpstreamSyncPhase::Syncing;
state.last_reason = "fresh audio/video overlap anchor established".to_string();
if !state.startup_anchor_logged { if !state.startup_anchor_logged {
let startup_delta_us = let startup_delta_us =
first_camera_remote_pts_us as i128 - first_microphone_remote_pts_us as i128; first_camera_remote_pts_us as i128 - first_microphone_remote_pts_us as i128;
@ -246,6 +336,8 @@ impl UpstreamMediaRuntime {
} }
self.pairing_state_notify.notify_waiters(); self.pairing_state_notify.notify_waiters();
} else if now < pairing_deadline { } else if now < pairing_deadline {
state.phase = UpstreamSyncPhase::Acquiring;
state.last_reason = "awaiting both upstream media streams".to_string();
if upstream_timing_trace_enabled() if upstream_timing_trace_enabled()
&& (packet_count <= 10 || packet_count.is_multiple_of(300)) && (packet_count <= 10 || packet_count.is_multiple_of(300))
{ {
@ -260,6 +352,8 @@ impl UpstreamMediaRuntime {
} }
return UpstreamPlanDecision::AwaitingPair; return UpstreamPlanDecision::AwaitingPair;
} else if state.first_camera_remote_pts_us.is_some() && !state.camera_startup_ready { } else if state.first_camera_remote_pts_us.is_some() && !state.camera_startup_ready {
state.phase = UpstreamSyncPhase::Syncing;
state.last_reason = "camera startup warm-up is still in progress".to_string();
if upstream_timing_trace_enabled() if upstream_timing_trace_enabled()
&& (packet_count <= 10 || packet_count.is_multiple_of(300)) && (packet_count <= 10 || packet_count.is_multiple_of(300))
{ {
@ -290,6 +384,8 @@ impl UpstreamMediaRuntime {
"upstream media pairing window expired; holding one-sided stream for synced startup" "upstream media pairing window expired; holding one-sided stream for synced startup"
); );
} }
state.phase = UpstreamSyncPhase::Syncing;
state.last_reason = "holding one-sided stream for synced startup".to_string();
return UpstreamPlanDecision::AwaitingPair; return UpstreamPlanDecision::AwaitingPair;
} else { } else {
let single_stream_base_remote_pts_us = match kind { let single_stream_base_remote_pts_us = match kind {
@ -331,6 +427,23 @@ impl UpstreamMediaRuntime {
return UpstreamPlanDecision::DropBeforeOverlap; return UpstreamPlanDecision::DropBeforeOverlap;
} }
let source_lag = source_lag_for_kind(&state, kind, remote_pts_us);
if source_lag > max_live_lag {
match kind {
UpstreamMediaKind::Camera => {
state.stale_video_drops = state.stale_video_drops.saturating_add(1);
state.video_freezes = state.video_freezes.saturating_add(1);
state.last_reason = "dropped stale video beyond max live lag".to_string();
}
UpstreamMediaKind::Microphone => {
state.stale_audio_drops = state.stale_audio_drops.saturating_add(1);
state.last_reason = "dropped stale audio beyond max live lag".to_string();
}
}
state.phase = UpstreamSyncPhase::Healing;
return UpstreamPlanDecision::DropStale("packet exceeded max live lag");
}
let mut local_pts_us = remote_pts_us.saturating_sub(session_base_remote_pts_us); let mut local_pts_us = remote_pts_us.saturating_sub(session_base_remote_pts_us);
let last_slot = match kind { let last_slot = match kind {
UpstreamMediaKind::Camera => &mut state.last_video_local_pts_us, UpstreamMediaKind::Camera => &mut state.last_video_local_pts_us,
@ -342,27 +455,41 @@ impl UpstreamMediaRuntime {
local_pts_us = last_pts_us.saturating_add(min_step_us.max(1)); local_pts_us = last_pts_us.saturating_add(min_step_us.max(1));
} }
*last_slot = Some(local_pts_us); *last_slot = Some(local_pts_us);
if kind == UpstreamMediaKind::Camera
&& state.last_audio_local_pts_us.is_some_and(|audio_pts_us| {
video_is_too_far_behind_audio(local_pts_us, audio_pts_us)
})
{
state.skew_video_drops = state.skew_video_drops.saturating_add(1);
state.video_freezes = state.video_freezes.saturating_add(1);
state.phase = UpstreamSyncPhase::Healing;
state.last_reason =
"dropped video frame that was too far behind the audio master".to_string();
return UpstreamPlanDecision::DropStale("video frame was too far behind audio master");
}
let epoch = *state.playout_epoch.get_or_insert(pairing_deadline); let epoch = *state.playout_epoch.get_or_insert(pairing_deadline);
let sink_offset_us = self.playout_offset_us(kind); let sink_offset_us = self.playout_offset_us(kind);
let playout_delay = upstream_playout_delay(); let playout_delay = upstream_playout_delay().min(max_live_lag);
let mut due_at = let mut due_at =
apply_playout_offset(epoch + Duration::from_micros(local_pts_us), sink_offset_us); apply_playout_offset(epoch + Duration::from_micros(local_pts_us), sink_offset_us);
let mut late_by = now.checked_duration_since(due_at).unwrap_or_default(); let mut late_by = now.checked_duration_since(due_at).unwrap_or_default();
let reanchor_threshold = upstream_reanchor_late_threshold(playout_delay); let reanchor_threshold = upstream_reanchor_late_threshold(playout_delay);
let reanchor_window_us = upstream_reanchor_window_us(playout_delay); let max_future_wait = max_live_lag.saturating_sub(source_lag);
if !state.catastrophic_reanchor_done let due_future_wait = due_at.saturating_duration_since(now);
&& local_pts_us <= reanchor_window_us if late_by > reanchor_threshold || due_future_wait > max_future_wait {
&& late_by > reanchor_threshold
{
let old_late_by = late_by; let old_late_by = late_by;
let desired_due_at = now + playout_delay; let old_future_wait = due_future_wait;
let desired_delay = playout_delay.min(max_future_wait);
let desired_due_at = now + desired_delay;
let unoffset_due_at = apply_playout_offset(desired_due_at, -sink_offset_us); let unoffset_due_at = apply_playout_offset(desired_due_at, -sink_offset_us);
let recovered_epoch = unoffset_due_at let recovered_epoch = unoffset_due_at
.checked_sub(Duration::from_micros(local_pts_us)) .checked_sub(Duration::from_micros(local_pts_us))
.unwrap_or(unoffset_due_at); .unwrap_or(unoffset_due_at);
state.playout_epoch = Some(recovered_epoch); state.playout_epoch = Some(recovered_epoch);
state.pairing_anchor_deadline = Some(desired_due_at); state.pairing_anchor_deadline = Some(desired_due_at);
state.catastrophic_reanchor_done = true; state.freshness_reanchors = state.freshness_reanchors.saturating_add(1);
state.phase = UpstreamSyncPhase::Healing;
state.last_reason = "reanchored upstream playhead to preserve freshness".to_string();
due_at = apply_playout_offset( due_at = apply_playout_offset(
recovered_epoch + Duration::from_micros(local_pts_us), recovered_epoch + Duration::from_micros(local_pts_us),
sink_offset_us, sink_offset_us,
@ -375,11 +502,33 @@ impl UpstreamMediaRuntime {
local_pts_us, local_pts_us,
remote_pts_us, remote_pts_us,
old_late_by_ms = old_late_by.as_millis(), old_late_by_ms = old_late_by.as_millis(),
old_future_wait_ms = old_future_wait.as_millis(),
recovery_buffer_ms = playout_delay.as_millis(), recovery_buffer_ms = playout_delay.as_millis(),
reanchor_threshold_ms = reanchor_threshold.as_millis(), reanchor_threshold_ms = reanchor_threshold.as_millis(),
"upstream media playout epoch reanchored after catastrophic lateness" max_live_lag_ms = max_live_lag.as_millis(),
source_lag_ms = source_lag.as_millis(),
"upstream media playhead reanchored to preserve freshness"
); );
} }
let predicted_lag_at_playout =
source_lag.saturating_add(due_at.saturating_duration_since(now));
if predicted_lag_at_playout > max_live_lag {
match kind {
UpstreamMediaKind::Camera => {
state.stale_video_drops = state.stale_video_drops.saturating_add(1);
state.video_freezes = state.video_freezes.saturating_add(1);
state.last_reason =
"dropped video that would exceed max live lag at playout".to_string();
}
UpstreamMediaKind::Microphone => {
state.stale_audio_drops = state.stale_audio_drops.saturating_add(1);
state.last_reason =
"dropped audio that would exceed max live lag at playout".to_string();
}
}
state.phase = UpstreamSyncPhase::Healing;
return UpstreamPlanDecision::DropStale("packet would exceed max live lag at playout");
}
if upstream_timing_trace_enabled() if upstream_timing_trace_enabled()
&& (packet_count <= 10 || packet_count.is_multiple_of(300)) && (packet_count <= 10 || packet_count.is_multiple_of(300))
{ {
@ -397,6 +546,7 @@ impl UpstreamMediaRuntime {
playout_delay_us, playout_delay_us,
sink_offset_us, sink_offset_us,
late_by_us, late_by_us,
source_lag_us = source_lag.as_micros(),
"upstream media rebase sample" "upstream media rebase sample"
); );
} }
@ -407,10 +557,54 @@ impl UpstreamMediaRuntime {
local_pts_us, local_pts_us,
due_at, due_at,
late_by, late_by,
source_lag,
}) })
} }
} }
fn update_latest_remote_pts(
state: &mut UpstreamClockState,
kind: UpstreamMediaKind,
remote_pts_us: u64,
) {
let slot = match kind {
UpstreamMediaKind::Camera => &mut state.latest_camera_remote_pts_us,
UpstreamMediaKind::Microphone => &mut state.latest_microphone_remote_pts_us,
};
*slot = Some((*slot).unwrap_or(remote_pts_us).max(remote_pts_us));
}
fn source_lag_for_kind(
state: &UpstreamClockState,
kind: UpstreamMediaKind,
remote_pts_us: u64,
) -> Duration {
let latest = match kind {
UpstreamMediaKind::Camera => state.latest_camera_remote_pts_us,
UpstreamMediaKind::Microphone => state.latest_microphone_remote_pts_us,
}
.unwrap_or(remote_pts_us);
Duration::from_micros(latest.saturating_sub(remote_pts_us))
}
fn video_is_too_far_behind_audio(video_pts_us: u64, audio_pts_us: u64) -> bool {
let slack_us = upstream_pairing_master_slack()
.as_micros()
.min(u64::MAX as u128) as u64;
video_pts_us.saturating_add(slack_us) < audio_pts_us
}
fn live_lag_us(state: &UpstreamClockState) -> Option<u64> {
let latest_audio = state.latest_microphone_remote_pts_us?;
let audio_playhead = state.last_audio_presented_pts_us?;
let base = state.session_base_remote_pts_us?;
Some(latest_audio.saturating_sub(base.saturating_add(audio_playhead)))
}
fn us_to_ms(value: u64) -> f64 {
value as f64 / 1000.0
}
fn refresh_unpaired_pairing_anchor( fn refresh_unpaired_pairing_anchor(
state: &mut UpstreamClockState, state: &mut UpstreamClockState,
kind: UpstreamMediaKind, kind: UpstreamMediaKind,

View File

@ -21,10 +21,26 @@ pub(super) fn upstream_playout_delay() -> Duration {
let delay_ms = std::env::var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS") let delay_ms = std::env::var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS")
.ok() .ok()
.and_then(|value| value.trim().parse::<u64>().ok()) .and_then(|value| value.trim().parse::<u64>().ok())
.unwrap_or(1_000); .unwrap_or(350);
Duration::from_millis(delay_ms) Duration::from_millis(delay_ms)
} }
pub(super) fn upstream_max_live_lag() -> Duration {
let lag_ms = std::env::var("LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS")
.ok()
.and_then(|value| value.trim().parse::<u64>().ok())
.unwrap_or(1_000);
Duration::from_millis(lag_ms.max(1))
}
pub(super) fn upstream_startup_timeout() -> Duration {
let timeout_ms = std::env::var("LESAVKA_UPSTREAM_STARTUP_TIMEOUT_MS")
.ok()
.and_then(|value| value.trim().parse::<u64>().ok())
.unwrap_or(60_000);
Duration::from_millis(timeout_ms.max(1))
}
pub(super) fn upstream_require_paired_startup() -> bool { pub(super) fn upstream_require_paired_startup() -> bool {
std::env::var("LESAVKA_UPSTREAM_REQUIRE_PAIRED_STARTUP") std::env::var("LESAVKA_UPSTREAM_REQUIRE_PAIRED_STARTUP")
.ok() .ok()
@ -45,10 +61,8 @@ pub(super) fn upstream_playout_offset_us(kind: UpstreamMediaKind) -> i64 {
}; };
let default_offset_us = match kind { let default_offset_us = match kind {
UpstreamMediaKind::Camera => FACTORY_MJPEG_VIDEO_OFFSET_US, UpstreamMediaKind::Camera => FACTORY_MJPEG_VIDEO_OFFSET_US,
// Hardware sync probes on the MJPEG UVC path show the UAC leg arriving // 0.17 keeps shipped offsets small: the planner owns freshness and
// about 80ms after video when using the older +35ms default. Bias the // pairing, while calibration only handles sub-frame trim.
// server playout earlier so the shipped default lands in the preferred
// lip-sync band instead of hovering at the guardrail.
UpstreamMediaKind::Microphone => FACTORY_MJPEG_AUDIO_OFFSET_US, UpstreamMediaKind::Microphone => FACTORY_MJPEG_AUDIO_OFFSET_US,
}; };
std::env::var(name) std::env::var(name)
@ -61,7 +75,7 @@ pub(super) fn upstream_pairing_master_slack() -> Duration {
let slack_us = std::env::var("LESAVKA_UPSTREAM_PAIR_SLACK_US") let slack_us = std::env::var("LESAVKA_UPSTREAM_PAIR_SLACK_US")
.ok() .ok()
.and_then(|value| value.trim().parse::<u64>().ok()) .and_then(|value| value.trim().parse::<u64>().ok())
.unwrap_or(20_000); .unwrap_or(80_000);
Duration::from_micros(slack_us) Duration::from_micros(slack_us)
} }
@ -75,7 +89,7 @@ pub(super) fn upstream_reanchor_late_threshold(playout_delay: Duration) -> Durat
let default_ms = (playout_delay.as_millis().min(u64::MAX as u128) as u64) let default_ms = (playout_delay.as_millis().min(u64::MAX as u128) as u64)
.saturating_div(2) .saturating_div(2)
.max(250); .max(100);
Duration::from_millis(default_ms) Duration::from_millis(default_ms)
} }
@ -87,10 +101,6 @@ pub(super) fn upstream_camera_startup_grace_us() -> u64 {
.saturating_mul(1_000) .saturating_mul(1_000)
} }
pub(super) fn upstream_reanchor_window_us(playout_delay: Duration) -> u64 {
playout_delay.as_micros().min(u64::MAX as u128) as u64
}
pub(super) fn apply_playout_offset(base: Instant, offset_us: i64) -> Instant { pub(super) fn apply_playout_offset(base: Instant, offset_us: i64) -> Instant {
if offset_us >= 0 { if offset_us >= 0 {
base + Duration::from_micros(offset_us as u64) base + Duration::from_micros(offset_us as u64)

View File

@ -55,8 +55,14 @@ impl UpstreamMediaRuntime {
if starting_new_session { if starting_new_session {
state.session_id = self.next_session_id.fetch_add(1, Ordering::SeqCst) + 1; state.session_id = self.next_session_id.fetch_add(1, Ordering::SeqCst) + 1;
reset_timing_anchors(&mut state); reset_timing_anchors(&mut state);
state.session_started_at = Some(tokio::time::Instant::now());
state.phase = UpstreamSyncPhase::Acquiring;
state.last_reason = "new upstream session acquiring media".to_string();
} else if reset_on_replace && replacing_existing_owner { } else if reset_on_replace && replacing_existing_owner {
reset_timing_anchors(&mut state); reset_timing_anchors(&mut state);
state.session_started_at = Some(tokio::time::Instant::now());
state.phase = UpstreamSyncPhase::Acquiring;
state.last_reason = "upstream stream replacement reset media anchors".to_string();
} }
match kind { match kind {
UpstreamMediaKind::Camera => state.active_camera_generation = Some(generation), UpstreamMediaKind::Camera => state.active_camera_generation = Some(generation),
@ -138,16 +144,28 @@ impl UpstreamMediaRuntime {
} }
fn reset_timing_anchors(state: &mut UpstreamClockState) { fn reset_timing_anchors(state: &mut UpstreamClockState) {
state.session_started_at = None;
state.first_camera_remote_pts_us = None; state.first_camera_remote_pts_us = None;
state.first_microphone_remote_pts_us = None; state.first_microphone_remote_pts_us = None;
state.latest_camera_remote_pts_us = None;
state.latest_microphone_remote_pts_us = None;
state.camera_startup_ready = false; state.camera_startup_ready = false;
state.session_base_remote_pts_us = None; state.session_base_remote_pts_us = None;
state.last_video_local_pts_us = None; state.last_video_local_pts_us = None;
state.last_audio_local_pts_us = None; state.last_audio_local_pts_us = None;
state.last_video_presented_pts_us = None;
state.last_audio_presented_pts_us = None;
state.camera_packet_count = 0; state.camera_packet_count = 0;
state.microphone_packet_count = 0; state.microphone_packet_count = 0;
state.startup_anchor_logged = false; state.startup_anchor_logged = false;
state.playout_epoch = None; state.playout_epoch = None;
state.pairing_anchor_deadline = None; state.pairing_anchor_deadline = None;
state.catastrophic_reanchor_done = false; state.freshness_reanchors = 0;
state.stale_audio_drops = 0;
state.stale_video_drops = 0;
state.skew_video_drops = 0;
state.startup_timeouts = 0;
state.video_freezes = 0;
state.phase = UpstreamSyncPhase::Acquiring;
state.last_reason = "timing anchors reset".to_string();
} }

View File

@ -1,20 +1,59 @@
use tokio::time::Instant; use tokio::time::Instant;
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum UpstreamSyncPhase {
Acquiring,
Syncing,
Live,
Healing,
Failed,
}
impl UpstreamSyncPhase {
pub fn as_str(self) -> &'static str {
match self {
Self::Acquiring => "acquiring",
Self::Syncing => "syncing",
Self::Live => "live",
Self::Healing => "healing",
Self::Failed => "failed",
}
}
}
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub(super) struct UpstreamClockState { pub(super) struct UpstreamClockState {
pub session_id: u64, pub session_id: u64,
pub active_camera_generation: Option<u64>, pub active_camera_generation: Option<u64>,
pub active_microphone_generation: Option<u64>, pub active_microphone_generation: Option<u64>,
pub session_started_at: Option<Instant>,
pub phase: UpstreamSyncPhase,
pub first_camera_remote_pts_us: Option<u64>, pub first_camera_remote_pts_us: Option<u64>,
pub first_microphone_remote_pts_us: Option<u64>, pub first_microphone_remote_pts_us: Option<u64>,
pub latest_camera_remote_pts_us: Option<u64>,
pub latest_microphone_remote_pts_us: Option<u64>,
pub camera_startup_ready: bool, pub camera_startup_ready: bool,
pub session_base_remote_pts_us: Option<u64>, pub session_base_remote_pts_us: Option<u64>,
pub last_video_local_pts_us: Option<u64>, pub last_video_local_pts_us: Option<u64>,
pub last_audio_local_pts_us: Option<u64>, pub last_audio_local_pts_us: Option<u64>,
pub last_video_presented_pts_us: Option<u64>,
pub last_audio_presented_pts_us: Option<u64>,
pub camera_packet_count: u64, pub camera_packet_count: u64,
pub microphone_packet_count: u64, pub microphone_packet_count: u64,
pub startup_anchor_logged: bool, pub startup_anchor_logged: bool,
pub playout_epoch: Option<Instant>, pub playout_epoch: Option<Instant>,
pub pairing_anchor_deadline: Option<Instant>, pub pairing_anchor_deadline: Option<Instant>,
pub catastrophic_reanchor_done: bool, pub freshness_reanchors: u64,
pub stale_audio_drops: u64,
pub stale_video_drops: u64,
pub skew_video_drops: u64,
pub startup_timeouts: u64,
pub video_freezes: u64,
pub last_reason: String,
}
impl Default for UpstreamSyncPhase {
fn default() -> Self {
Self::Acquiring
}
} }

View File

@ -4,9 +4,9 @@ use std::time::Duration;
#[test] #[test]
#[serial(upstream_media_runtime)] #[serial(upstream_media_runtime)]
fn upstream_playout_delay_defaults_to_one_second_and_accepts_overrides() { fn upstream_playout_delay_defaults_to_freshness_budget_and_accepts_overrides() {
temp_env::with_var_unset("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", || { temp_env::with_var_unset("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", || {
assert_eq!(super::upstream_playout_delay(), Duration::from_secs(1)); assert_eq!(super::upstream_playout_delay(), Duration::from_millis(350));
}); });
temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("250"), || { temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("250"), || {
@ -14,6 +14,33 @@ fn upstream_playout_delay_defaults_to_one_second_and_accepts_overrides() {
}); });
} }
#[test]
#[serial(upstream_media_runtime)]
fn upstream_max_live_lag_defaults_to_one_second_and_accepts_overrides() {
temp_env::with_var_unset("LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS", || {
assert_eq!(super::upstream_max_live_lag(), Duration::from_secs(1));
});
temp_env::with_var("LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS", Some("750"), || {
assert_eq!(super::upstream_max_live_lag(), Duration::from_millis(750));
});
}
#[test]
#[serial(upstream_media_runtime)]
fn upstream_startup_timeout_defaults_to_one_minute_and_accepts_overrides() {
temp_env::with_var_unset("LESAVKA_UPSTREAM_STARTUP_TIMEOUT_MS", || {
assert_eq!(super::upstream_startup_timeout(), Duration::from_secs(60));
});
temp_env::with_var("LESAVKA_UPSTREAM_STARTUP_TIMEOUT_MS", Some("2500"), || {
assert_eq!(
super::upstream_startup_timeout(),
Duration::from_millis(2500)
);
});
}
#[test] #[test]
#[serial(upstream_media_runtime)] #[serial(upstream_media_runtime)]
fn upstream_requires_paired_startup_by_default_with_compatibility_override() { fn upstream_requires_paired_startup_by_default_with_compatibility_override() {
@ -43,7 +70,7 @@ fn upstream_playout_offsets_default_to_mjpeg_calibration_and_accept_overrides()
temp_env::with_var_unset("LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US", || { temp_env::with_var_unset("LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US", || {
assert_eq!( assert_eq!(
super::upstream_playout_offset_us(UpstreamMediaKind::Microphone), super::upstream_playout_offset_us(UpstreamMediaKind::Microphone),
1_260_000 0
); );
assert_eq!( assert_eq!(
super::upstream_playout_offset_us(UpstreamMediaKind::Camera), super::upstream_playout_offset_us(UpstreamMediaKind::Camera),
@ -76,11 +103,11 @@ fn upstream_playout_offsets_default_to_mjpeg_calibration_and_accept_overrides()
#[test] #[test]
#[serial(upstream_media_runtime)] #[serial(upstream_media_runtime)]
fn upstream_pairing_master_slack_defaults_to_twenty_ms_and_accepts_overrides() { fn upstream_pairing_master_slack_defaults_to_eighty_ms_and_accepts_overrides() {
temp_env::with_var_unset("LESAVKA_UPSTREAM_PAIR_SLACK_US", || { temp_env::with_var_unset("LESAVKA_UPSTREAM_PAIR_SLACK_US", || {
assert_eq!( assert_eq!(
super::upstream_pairing_master_slack(), super::upstream_pairing_master_slack(),
Duration::from_micros(20_000) Duration::from_micros(80_000)
); );
}); });
@ -102,7 +129,7 @@ fn upstream_reanchor_late_threshold_defaults_to_half_the_buffer_and_accepts_over
); );
assert_eq!( assert_eq!(
super::upstream_reanchor_late_threshold(Duration::from_millis(100)), super::upstream_reanchor_late_threshold(Duration::from_millis(100)),
Duration::from_millis(250) Duration::from_millis(100)
); );
}); });

View File

@ -208,7 +208,7 @@ fn overlap_anchor_gets_a_fresh_playout_budget_when_pairing_finishes_late() {
#[test] #[test]
#[serial(upstream_media_runtime)] #[serial(upstream_media_runtime)]
fn catastrophic_lateness_reanchors_only_once_per_session() { fn catastrophic_lateness_reanchors_repeatedly_to_preserve_freshness() {
temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("20"), || { temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("20"), || {
temp_env::with_var("LESAVKA_UPSTREAM_REANCHOR_LATE_MS", Some("5"), || { temp_env::with_var("LESAVKA_UPSTREAM_REANCHOR_LATE_MS", Some("5"), || {
let runtime = UpstreamMediaRuntime::new(); let runtime = UpstreamMediaRuntime::new();
@ -226,12 +226,18 @@ fn catastrophic_lateness_reanchors_only_once_per_session() {
std::thread::sleep(Duration::from_millis(30)); std::thread::sleep(Duration::from_millis(30));
let first_recovered = play(runtime.plan_audio_pts(1_000_000)); let first_recovered = play(runtime.plan_audio_pts(1_000_000));
assert!(first_recovered.due_at > tokio::time::Instant::now()); assert!(first_recovered.due_at > tokio::time::Instant::now());
assert!(first_recovered.late_by <= Duration::from_millis(1));
std::thread::sleep(Duration::from_millis(30)); std::thread::sleep(Duration::from_millis(30));
let second_late = play(runtime.plan_audio_pts(1_000_001)); let second_recovered = play(runtime.plan_audio_pts(1_000_001));
assert!(second_recovered.due_at > tokio::time::Instant::now());
assert!( assert!(
second_late.late_by > Duration::from_millis(5), second_recovered.late_by <= Duration::from_millis(1),
"session should not keep extending itself with repeated reanchors" "0.17 planner must keep healing instead of preserving stale timing"
);
assert!(
runtime.snapshot().freshness_reanchors >= 2,
"repeated freshness reanchors should be counted for diagnostics"
); );
}); });
}); });
@ -239,7 +245,7 @@ fn catastrophic_lateness_reanchors_only_once_per_session() {
#[test] #[test]
#[serial(upstream_media_runtime)] #[serial(upstream_media_runtime)]
fn catastrophic_lateness_does_not_reanchor_once_the_session_is_well_past_startup() { fn catastrophic_lateness_reanchors_even_after_startup_window() {
temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("20"), || { temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("20"), || {
temp_env::with_var("LESAVKA_UPSTREAM_REANCHOR_LATE_MS", Some("5"), || { temp_env::with_var("LESAVKA_UPSTREAM_REANCHOR_LATE_MS", Some("5"), || {
let runtime = UpstreamMediaRuntime::new(); let runtime = UpstreamMediaRuntime::new();
@ -258,17 +264,152 @@ fn catastrophic_lateness_does_not_reanchor_once_the_session_is_well_past_startup
let late_audio = play(runtime.plan_audio_pts(1_100_000)); let late_audio = play(runtime.plan_audio_pts(1_100_000));
assert_eq!(late_audio.local_pts_us, 100_000); assert_eq!(late_audio.local_pts_us, 100_000);
assert!( assert!(
late_audio.late_by > Duration::from_millis(5), late_audio.late_by <= Duration::from_millis(1),
"late packet should remain late instead of reanchoring the shared epoch mid-session" "0.17 planner should heal mid-session lateness instead of preserving drift"
); );
assert!( assert!(
late_audio.due_at <= tokio::time::Instant::now(), late_audio.due_at > tokio::time::Instant::now(),
"mid-session lateness should no longer push due_at back into the future" "mid-session freshness healing should push due_at back into the live budget"
); );
assert!(runtime.snapshot().freshness_reanchors >= 1);
}); });
}); });
} }
#[test]
#[serial(upstream_media_runtime)]
fn stale_audio_behind_the_freshest_audio_frontier_is_dropped() {
temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || {
temp_env::with_var("LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS", Some("50"), || {
let runtime = runtime_without_offsets();
let _camera = runtime.activate_camera();
let _microphone = runtime.activate_microphone();
assert!(matches!(
runtime.plan_video_pts(1_000_000, 16_666),
super::UpstreamPlanDecision::AwaitingPair
));
let _audio = play(runtime.plan_audio_pts(1_000_000));
let _video = play(runtime.plan_video_pts(1_000_000, 16_666));
let _fresh_audio = play(runtime.plan_audio_pts(2_000_000));
assert!(matches!(
runtime.plan_audio_pts(1_900_000),
super::UpstreamPlanDecision::DropStale("packet exceeded max live lag")
));
assert_eq!(runtime.snapshot().stale_audio_drops, 1);
});
});
}
#[test]
#[serial(upstream_media_runtime)]
fn stale_video_behind_the_freshest_video_frontier_is_dropped() {
temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || {
temp_env::with_var("LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS", Some("50"), || {
let runtime = runtime_without_offsets();
let _camera = runtime.activate_camera();
let _microphone = runtime.activate_microphone();
assert!(matches!(
runtime.plan_video_pts(1_000_000, 16_666),
super::UpstreamPlanDecision::AwaitingPair
));
let _audio = play(runtime.plan_audio_pts(1_000_000));
let _video = play(runtime.plan_video_pts(1_000_000, 16_666));
let _fresh_video = play(runtime.plan_video_pts(2_000_000, 16_666));
assert!(matches!(
runtime.plan_video_pts(1_900_000, 16_666),
super::UpstreamPlanDecision::DropStale("packet exceeded max live lag")
));
let snapshot = runtime.snapshot();
assert_eq!(snapshot.stale_video_drops, 1);
assert_eq!(snapshot.video_freezes, 1);
});
});
}
#[test]
#[serial(upstream_media_runtime)]
fn video_too_far_behind_audio_master_is_dropped_and_counted_as_freeze() {
temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || {
temp_env::with_var("LESAVKA_UPSTREAM_PAIR_SLACK_US", Some("50000"), || {
let runtime = runtime_without_offsets();
let _camera = runtime.activate_camera();
let _microphone = runtime.activate_microphone();
assert!(matches!(
runtime.plan_video_pts(1_000_000, 16_666),
super::UpstreamPlanDecision::AwaitingPair
));
let _audio = play(runtime.plan_audio_pts(1_000_000));
let _video = play(runtime.plan_video_pts(1_000_000, 16_666));
let _audio_master = play(runtime.plan_audio_pts(1_200_000));
assert!(matches!(
runtime.plan_video_pts(1_100_000, 16_666),
super::UpstreamPlanDecision::DropStale(
"video frame was too far behind audio master"
)
));
let snapshot = runtime.snapshot();
assert_eq!(snapshot.skew_video_drops, 1);
assert_eq!(snapshot.video_freezes, 1);
});
});
}
#[test]
#[serial(upstream_media_runtime)]
fn paired_startup_times_out_instead_of_waiting_forever() {
temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || {
temp_env::with_var("LESAVKA_UPSTREAM_STARTUP_TIMEOUT_MS", Some("1"), || {
let runtime = UpstreamMediaRuntime::new();
runtime.set_playout_offsets(0, 0);
let _camera = runtime.activate_camera();
assert!(matches!(
runtime.plan_video_pts(1_000_000, 16_666),
super::UpstreamPlanDecision::AwaitingPair
));
std::thread::sleep(Duration::from_millis(3));
assert!(matches!(
runtime.plan_video_pts(1_016_666, 16_666),
super::UpstreamPlanDecision::StartupFailed(
"paired upstream startup did not converge before timeout"
)
));
let snapshot = runtime.snapshot();
assert_eq!(snapshot.phase, "failed");
assert_eq!(snapshot.startup_timeouts, 1);
});
});
}
#[test]
#[serial(upstream_media_runtime)]
fn planner_snapshot_tracks_presented_playheads_and_skew() {
let runtime = runtime_without_offsets();
let _camera = runtime.activate_camera();
let _microphone = runtime.activate_microphone();
assert!(matches!(
runtime.plan_video_pts(1_000_000, 16_666),
super::UpstreamPlanDecision::AwaitingPair
));
let audio = play(runtime.plan_audio_pts(1_000_000));
let video = play(runtime.plan_video_pts(1_000_000, 16_666));
runtime.mark_audio_presented(audio.local_pts_us);
runtime.mark_video_presented(video.local_pts_us);
let snapshot = runtime.snapshot();
assert_eq!(snapshot.phase, "live");
assert_eq!(snapshot.last_audio_presented_pts_us, Some(0));
assert_eq!(snapshot.last_video_presented_pts_us, Some(0));
assert_eq!(snapshot.planner_skew_ms, Some(0.0));
}
#[test] #[test]
#[serial(upstream_media_runtime)] #[serial(upstream_media_runtime)]
fn default_runtime_covers_video_map_play_path() { fn default_runtime_covers_video_map_play_path() {

View File

@ -28,6 +28,9 @@ pub struct PlannedUpstreamPacket {
pub due_at: Instant, pub due_at: Instant,
/// How late the packet already is when planned, if any. /// How late the packet already is when planned, if any.
pub late_by: Duration, pub late_by: Duration,
/// How far this packet's capture time trails the freshest relevant input
/// frontier known to the planner.
pub source_lag: Duration,
} }
/// Result of asking the shared upstream runtime how to handle one packet. /// Result of asking the shared upstream runtime how to handle one packet.
@ -39,6 +42,31 @@ pub enum UpstreamPlanDecision {
/// Discard the packet because it belongs before the shared overlapping A/V /// Discard the packet because it belongs before the shared overlapping A/V
/// session base and would only reintroduce startup skew. /// session base and would only reintroduce startup skew.
DropBeforeOverlap, DropBeforeOverlap,
/// Discard a packet that would violate the live freshness or A/V skew
/// contract.
DropStale(&'static str),
/// Stop the stream because paired startup never converged.
StartupFailed(&'static str),
/// Present the packet at the planned wall-clock deadline. /// Present the packet at the planned wall-clock deadline.
Play(PlannedUpstreamPacket), Play(PlannedUpstreamPacket),
} }
/// Snapshot of the authoritative upstream sync planner.
#[derive(Clone, Debug)]
pub struct UpstreamPlannerSnapshot {
pub session_id: u64,
pub phase: &'static str,
pub latest_camera_remote_pts_us: Option<u64>,
pub latest_microphone_remote_pts_us: Option<u64>,
pub last_video_presented_pts_us: Option<u64>,
pub last_audio_presented_pts_us: Option<u64>,
pub live_lag_ms: Option<f64>,
pub planner_skew_ms: Option<f64>,
pub stale_audio_drops: u64,
pub stale_video_drops: u64,
pub skew_video_drops: u64,
pub freshness_reanchors: u64,
pub startup_timeouts: u64,
pub video_freezes: u64,
pub last_reason: String,
}

View File

@ -333,9 +333,12 @@ fn relay_controls_keep_connect_inline_with_server_entry() {
assert!(UI_LAYOUT_SRC.contains("tools_buttons.set_homogeneous(true);")); assert!(UI_LAYOUT_SRC.contains("tools_buttons.set_homogeneous(true);"));
assert!(UI_LAYOUT_SRC.contains("tools_heading.set_width_chars(RELAY_SUBGROUP_LABEL_WIDTH);")); assert!(UI_LAYOUT_SRC.contains("tools_heading.set_width_chars(RELAY_SUBGROUP_LABEL_WIDTH);"));
assert!(UI_LAYOUT_SRC.contains("let clipboard_button = rail_button(\"Clipboard\"")); assert!(UI_LAYOUT_SRC.contains("let clipboard_button = rail_button(\"Clipboard\""));
assert!(UI_LAYOUT_SRC.contains("let usb_recover_button = rail_button(\"USB\"")); assert!(UI_LAYOUT_SRC.contains("let usb_recover_button = rail_button("));
assert!(UI_LAYOUT_SRC.contains("let uac_recover_button = rail_button(\"UAC\"")); assert!(UI_LAYOUT_SRC.contains("\"USB\","));
assert!(UI_LAYOUT_SRC.contains("let uvc_recover_button = rail_button(\"UVC\"")); assert!(UI_LAYOUT_SRC.contains("let uac_recover_button = rail_button("));
assert!(UI_LAYOUT_SRC.contains("\"UAC\","));
assert!(UI_LAYOUT_SRC.contains("let uvc_recover_button = rail_button("));
assert!(UI_LAYOUT_SRC.contains("\"UVC\","));
assert!(UI_LAYOUT_SRC.contains("recovery_buttons.append(&usb_recover_button);")); assert!(UI_LAYOUT_SRC.contains("recovery_buttons.append(&usb_recover_button);"));
assert!(UI_LAYOUT_SRC.contains("recovery_buttons.append(&uac_recover_button);")); assert!(UI_LAYOUT_SRC.contains("recovery_buttons.append(&uac_recover_button);"));
assert!(UI_LAYOUT_SRC.contains("recovery_buttons.append(&uvc_recover_button);")); assert!(UI_LAYOUT_SRC.contains("recovery_buttons.append(&uvc_recover_button);"));

View File

@ -19,6 +19,8 @@ fn server_install_pins_hdmi_camera_and_display_defaults() {
"LESAVKA_HDMI_SINK=%s", "LESAVKA_HDMI_SINK=%s",
"LESAVKA_HDMI_FBDEV=%s", "LESAVKA_HDMI_FBDEV=%s",
"LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS=%s", "LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS=%s",
"LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS=%s",
"LESAVKA_UPSTREAM_STARTUP_TIMEOUT_MS=%s",
"LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=%s", "LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=%s",
"LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US=%s", "LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US=%s",
"LESAVKA_UPSTREAM_PAIR_SLACK_US=%s", "LESAVKA_UPSTREAM_PAIR_SLACK_US=%s",
@ -49,16 +51,21 @@ fn server_install_pins_hdmi_camera_and_display_defaults() {
assert!(SERVER_INSTALL.contains("${LESAVKA_HDMI_WIDTH:-1920}")); assert!(SERVER_INSTALL.contains("${LESAVKA_HDMI_WIDTH:-1920}"));
assert!(SERVER_INSTALL.contains("${LESAVKA_HDMI_HEIGHT:-1080}")); assert!(SERVER_INSTALL.contains("${LESAVKA_HDMI_HEIGHT:-1080}"));
assert!(SERVER_INSTALL.contains("${LESAVKA_HDMI_SINK:-fbdevsink}")); assert!(SERVER_INSTALL.contains("${LESAVKA_HDMI_SINK:-fbdevsink}"));
assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS:-1000}")); assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS:-350}"));
assert!(SERVER_INSTALL.contains("DEFAULT_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=1260000")); assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS:-1000}"));
assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_STARTUP_TIMEOUT_MS:-60000}"));
assert!(SERVER_INSTALL.contains("DEFAULT_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=0"));
assert!(SERVER_INSTALL.contains("LEGACY_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=-45000")); assert!(SERVER_INSTALL.contains("LEGACY_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=-45000"));
assert!(
SERVER_INSTALL.contains("PREVIOUS_TUNED_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=1260000")
);
assert!( assert!(
SERVER_INSTALL.contains("LESAVKA_INSTALL_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US"), SERVER_INSTALL.contains("LESAVKA_INSTALL_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US"),
"install-specific offset override should bypass stale ambient runtime env" "install-specific offset override should bypass stale ambient runtime env"
); );
assert!( assert!(
SERVER_INSTALL.contains("migrating stale upstream audio playout offset to +1260ms"), SERVER_INSTALL.contains("migrating stale upstream audio playout offset to the 0.17 freshness-first planner default"),
"installer should not preserve the old MJPEG/UVC sync baseline accidentally" "installer should not preserve old MJPEG/UVC sync baselines accidentally"
); );
assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_PAIR_SLACK_US:-80000}")); assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_PAIR_SLACK_US:-80000}"));
assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_STALE_DROP_MS:-80}")); assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_STALE_DROP_MS:-80}"));

View File

@ -467,7 +467,7 @@ mod server_main_rpc {
.expect("initial calibration") .expect("initial calibration")
.into_inner(); .into_inner();
assert_eq!(initial.profile, "mjpeg"); assert_eq!(initial.profile, "mjpeg");
assert_eq!(initial.active_audio_offset_us, 1_260_000); assert_eq!(initial.active_audio_offset_us, 0);
let adjusted = rt let adjusted = rt
.block_on(async { .block_on(async {
@ -486,7 +486,7 @@ mod server_main_rpc {
.expect("calibrate") .expect("calibrate")
.into_inner(); .into_inner();
assert_eq!(adjusted.source, "blind"); assert_eq!(adjusted.source, "blind");
assert_eq!(adjusted.active_audio_offset_us, -35_000); assert_eq!(adjusted.active_audio_offset_us, 10_000);
assert_eq!(adjusted.active_video_offset_us, 2_000); assert_eq!(adjusted.active_video_offset_us, 2_000);
assert!( assert!(
std::fs::read_to_string(calibration_path) std::fs::read_to_string(calibration_path)
@ -496,4 +496,27 @@ mod server_main_rpc {
}, },
); );
} }
#[test]
#[cfg(coverage)]
#[serial]
fn upstream_sync_rpc_surfaces_planner_snapshot() {
let (_dir, handler) = build_handler_for_tests();
let rt = tokio::runtime::Runtime::new().expect("runtime");
let lease_camera = handler.upstream_media_rt.activate_camera();
let lease_microphone = handler.upstream_media_rt.activate_microphone();
assert_eq!(lease_camera.session_id, lease_microphone.session_id);
let initial = rt
.block_on(async {
handler
.get_upstream_sync(tonic::Request::new(Empty {}))
.await
})
.expect("planner sync state")
.into_inner();
assert_eq!(initial.phase, "acquiring");
assert_eq!(initial.session_id, lease_camera.session_id);
}
} }