diff --git a/AGENTS.md b/AGENTS.md index 9c83a14..de18338 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -179,3 +179,44 @@ Context: the mirrored browser probe finally reproduced the real failure class on - 0.16.25 removed the client mic backlog but exposed a stable hardware/browser path delta: p95 `557.3 ms`, median `-540.5 ms`, drift `+9.0 ms`, and fresh mic delivery ages around `2-10 ms`. Patch 0.16.26 raises the MJPEG/UVC factory audio delay to `+1260 ms` and expands the calibration clamp so this stable offset can actually be corrected instead of rejected. - [ ] Re-run the mirrored browser probe after the pre-start false-positive fix. - [ ] Run Google Meet manual validation. + +## 0.17.0 Tyrannical Upstream Playout Checklist + +Context: 0.16.x proved that queue tweaks and static calibration cannot guarantee lip sync. 0.17.0 changes the upstream contract: the server planner is authoritative, audio is the master, video follows by timestamp or freezes, and freshness wins over smooth-but-wrong playback. + +### Hard Product Invariants +- [x] No normal live upstream playout may be more than 1000 ms behind the freshest known client capture frontier. +- [x] Video may not advance outside the audio playhead sync budget. +- [x] Audio should be continuous when possible, but stale audio must be dropped/skipped rather than drained. +- [x] Missing or late video should freeze/stutter instead of pulling audio backward. +- [x] Startup may be ugly, but must either enter fresh synced live mode or declare failure within 60 seconds. +- [x] Healing may be visible, but must prevent persistent seconds-scale skew. +- [x] Calibration may fine-tune sub-frame offsets only; it must not be required to rescue seconds-scale desync. + +- [x] Bump Lesavka to 0.17.0 because this is a media-contract change, not a patch tune. +- [x] Add planner policy config: max live lag, max skew, startup timeout, target playout delay, and healing cooldown. +- [x] Reset 0.17 defaults so shipped audio/video offsets do not intentionally exceed the freshness budget. +- [x] Track latest camera/audio input timestamps in the server planner. +- [x] Track actual planned/emitted audio and video playheads. +- [x] Enforce audio as master: stale audio is dropped/skipped; it does not drain backlog. +- [x] Enforce video follower behavior: frames ahead of audio wait; frames too far behind audio are dropped so the previous frame freezes. +- [x] Re-anchor continuously when the live playhead falls outside the freshness budget, not only once at startup. +- [x] Keep startup paired-only by default and fail visibly after the startup timeout. +- [x] Add planner phases and counters to diagnostics/logs: acquiring, syncing, live, healing, failed; stale drops, skew drops, freshness heals, video freezes. +- [x] Keep UVC/UAC sinks as dumb consumers of planner-approved packets. +- [x] Update tests to prove stale media cannot be emitted and video cannot outrun/lag audio beyond policy. +- [x] Update manual/probe diagnostics so 0.17 reports the planner state being tested. + +### Validation Targets Before Human Test +- [x] Unit tests for startup pairing, stale audio drop, stale video drop, reanchor, startup timeout, audio-master/video-follower rules. +- [x] Contract tests for installer defaults and version reporting. +- [x] `cargo check -p lesavka_client -p lesavka_server --bins`. +- [x] Focused `lesavka_testing` media/runtime contracts. +- [ ] Only after all of the above, run the mirrored browser probe. + +### Progress Log +- 2026-05-01: Added 0.17 planner defaults (`350ms` target playout, `1000ms` max live lag, `60000ms` startup timeout, `80000us` pair slack), reset MJPEG audio factory offset to `0`, and migrated old `-45ms`, `+720ms`, and `+1260ms` untouched baselines. +- 2026-05-01: Server planner now tracks latest input frontier, presented audio/video playheads, phase, stale drops, skew drops, reanchors, startup timeouts, and freezes. +- 2026-05-01: Runtime tests green for stale audio drop, stale video drop, audio-master/video-follower freeze, repeated reanchor, paired startup timeout, and planner snapshot basics: `cargo test -p lesavka_server upstream_media_runtime::tests -- --nocapture`. +- 2026-05-01: Added `GetUpstreamSync` RPC, `lesavka-relayctl upstream-sync`, launcher diagnostics text, and mirrored-probe before/after planner snapshots so 0.17 probe runs report the exact planner state under test. +- 2026-05-01: Validation green: `cargo test -p lesavka_server --lib --bins`, `cargo test -p lesavka_testing`, `cargo test -p lesavka_client --bins --lib`, and targeted installer/RPC/layout contracts. diff --git a/Cargo.lock b/Cargo.lock index 744b9b1..4d946b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.16.26" +version = "0.17.0" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.16.26" +version = "0.17.0" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.16.26" +version = "0.17.0" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index 55e48ea..3a9f145 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.16.26" +version = "0.17.0" edition = "2024" [dependencies] diff --git a/client/src/bin/lesavka-relayctl.rs b/client/src/bin/lesavka-relayctl.rs index f4fbcd9..bd0978f 100644 --- a/client/src/bin/lesavka-relayctl.rs +++ b/client/src/bin/lesavka-relayctl.rs @@ -21,6 +21,7 @@ enum CommandKind { RecoverUac, RecoverUvc, ResetUsb, + UpstreamSync, } impl CommandKind { @@ -36,6 +37,7 @@ impl CommandKind { "recover-uac" => Some(Self::RecoverUac), "recover-uvc" => Some(Self::RecoverUvc), "reset-usb" | "hard-reset-usb" => Some(Self::ResetUsb), + "upstream-sync" | "sync" => Some(Self::UpstreamSync), _ => None, } } @@ -54,7 +56,7 @@ enum ParseOutcome { } fn usage() -> &'static str { - "Usage: lesavka-relayctl [--server http://HOST:50051] " + "Usage: lesavka-relayctl [--server http://HOST:50051] " } fn parse_args_outcome_from(args: I) -> Result @@ -117,7 +119,8 @@ fn capture_power_request(command: CommandKind) -> Option | CommandKind::RecoverUsb | CommandKind::RecoverUac | CommandKind::RecoverUvc - | CommandKind::ResetUsb => return None, + | CommandKind::ResetUsb + | CommandKind::UpstreamSync => return None, CommandKind::Auto => (false, CapturePowerCommand::Auto), CommandKind::On => (true, CapturePowerCommand::ForceOn), CommandKind::Off => (false, CapturePowerCommand::ForceOff), @@ -186,6 +189,32 @@ fn print_versions(server_addr: &str, caps: &HandshakeSet) { println!("server_camera_codec={}", caps.camera_codec); } +fn print_upstream_sync(state: lesavka_common::lesavka::UpstreamSyncState) { + println!("planner_session_id={}", state.session_id); + println!("planner_phase={}", state.phase); + println!( + "planner_live_lag_ms={}", + state + .live_lag_ms + .map(|value| format!("{value:.1}")) + .unwrap_or_else(|| "pending".to_string()) + ); + println!( + "planner_skew_ms={}", + state + .planner_skew_ms + .map(|value| format!("{value:+.1}")) + .unwrap_or_else(|| "pending".to_string()) + ); + println!("planner_stale_audio_drops={}", state.stale_audio_drops); + println!("planner_stale_video_drops={}", state.stale_video_drops); + println!("planner_skew_video_drops={}", state.skew_video_drops); + println!("planner_freshness_reanchors={}", state.freshness_reanchors); + println!("planner_startup_timeouts={}", state.startup_timeouts); + println!("planner_video_freezes={}", state.video_freezes); + println!("planner_detail={}", state.last_reason); +} + #[cfg(not(coverage))] #[tokio::main(flavor = "current_thread")] async fn main() -> Result<()> { @@ -215,7 +244,8 @@ async fn main() -> Result<()> { | CommandKind::RecoverUsb | CommandKind::RecoverUac | CommandKind::RecoverUvc - | CommandKind::ResetUsb => unreachable!(), + | CommandKind::ResetUsb + | CommandKind::UpstreamSync => unreachable!(), }; let reply = client .set_capture_power(Request::new(request)) @@ -268,6 +298,15 @@ async fn main() -> Result<()> { println!("ok={}", reply.ok); return Ok(()); } + CommandKind::UpstreamSync => { + let reply = client + .get_upstream_sync(Request::new(Empty {})) + .await + .context("querying upstream sync planner state")? + .into_inner(); + print_upstream_sync(reply); + return Ok(()); + } CommandKind::Version | CommandKind::Auto | CommandKind::On | CommandKind::Off => { unreachable!() } @@ -297,6 +336,11 @@ mod tests { assert_eq!(CommandKind::parse("get"), Some(CommandKind::Status)); assert_eq!(CommandKind::parse("version"), Some(CommandKind::Version)); assert_eq!(CommandKind::parse("versions"), Some(CommandKind::Version)); + assert_eq!( + CommandKind::parse("upstream-sync"), + Some(CommandKind::UpstreamSync) + ); + assert_eq!(CommandKind::parse("sync"), Some(CommandKind::UpstreamSync)); assert_eq!(CommandKind::parse("force-on"), Some(CommandKind::On)); assert_eq!(CommandKind::parse("force-off"), Some(CommandKind::Off)); assert_eq!( @@ -328,9 +372,9 @@ mod tests { #[test] fn parse_args_accepts_server_and_command() { let config = - parse_args_from(["--server", " http://lab:50051 ", "reset-usb"]).expect("config"); + parse_args_from(["--server", " http://lab:50051 ", "upstream-sync"]).expect("config"); assert_eq!(config.server, "http://lab:50051"); - assert_eq!(config.command, CommandKind::ResetUsb); + assert_eq!(config.command, CommandKind::UpstreamSync); } #[test] @@ -391,6 +435,7 @@ mod tests { assert!(capture_power_request(CommandKind::RecoverUac).is_none()); assert!(capture_power_request(CommandKind::RecoverUvc).is_none()); assert!(capture_power_request(CommandKind::ResetUsb).is_none()); + assert!(capture_power_request(CommandKind::UpstreamSync).is_none()); } #[test] diff --git a/client/src/launcher/calibration.rs b/client/src/launcher/calibration.rs index 5928885..7862ccb 100644 --- a/client/src/launcher/calibration.rs +++ b/client/src/launcher/calibration.rs @@ -4,7 +4,7 @@ use lesavka_common::lesavka::{ }; use tonic::{Request, transport::Channel}; -use super::state::CalibrationStatus; +use super::state::{CalibrationStatus, UpstreamSyncStatus}; use crate::relay_transport; pub fn fetch_calibration(server_addr: &str) -> Result { @@ -19,6 +19,18 @@ pub fn fetch_calibration(server_addr: &str) -> Result { }) } +pub fn fetch_upstream_sync(server_addr: &str) -> Result { + with_runtime(async move { + let mut client = connect(server_addr).await?; + let reply = client + .get_upstream_sync(Request::new(Empty {})) + .await + .context("querying upstream A/V planner state")? + .into_inner(); + Ok(UpstreamSyncStatus::from_proto(reply)) + }) +} + pub fn restore_default_calibration(server_addr: &str) -> Result { send_calibration_request( server_addr, diff --git a/client/src/launcher/diagnostics/diagnostics_models.rs b/client/src/launcher/diagnostics/diagnostics_models.rs index b424e88..d75ebf0 100644 --- a/client/src/launcher/diagnostics/diagnostics_models.rs +++ b/client/src/launcher/diagnostics/diagnostics_models.rs @@ -163,6 +163,18 @@ pub struct SnapshotReport { pub av_delivery_skew_ms: f32, pub av_enqueue_skew_ms: f32, pub av_sync_health: String, + pub planner_available: bool, + pub planner_phase: String, + pub planner_session_id: u64, + pub planner_live_lag_ms: Option, + pub planner_skew_ms: Option, + pub planner_stale_audio_drops: u64, + pub planner_stale_video_drops: u64, + pub planner_skew_video_drops: u64, + pub planner_freshness_reanchors: u64, + pub planner_startup_timeouts: u64, + pub planner_video_freezes: u64, + pub planner_detail: String, pub calibration_available: bool, pub calibration_profile: String, pub calibration_source: String, diff --git a/client/src/launcher/diagnostics/snapshot_report.rs b/client/src/launcher/diagnostics/snapshot_report.rs index 60a732f..42c1b53 100644 --- a/client/src/launcher/diagnostics/snapshot_report.rs +++ b/client/src/launcher/diagnostics/snapshot_report.rs @@ -238,6 +238,18 @@ impl SnapshotReport { av_delivery_skew_ms, av_enqueue_skew_ms, av_sync_health, + planner_available: state.upstream_sync.available, + planner_phase: state.upstream_sync.phase.clone(), + planner_session_id: state.upstream_sync.session_id, + planner_live_lag_ms: state.upstream_sync.live_lag_ms, + planner_skew_ms: state.upstream_sync.planner_skew_ms, + planner_stale_audio_drops: state.upstream_sync.stale_audio_drops, + planner_stale_video_drops: state.upstream_sync.stale_video_drops, + planner_skew_video_drops: state.upstream_sync.skew_video_drops, + planner_freshness_reanchors: state.upstream_sync.freshness_reanchors, + planner_startup_timeouts: state.upstream_sync.startup_timeouts, + planner_video_freezes: state.upstream_sync.video_freezes, + planner_detail: state.upstream_sync.detail.clone(), calibration_available: state.calibration.available, calibration_profile: state.calibration.profile.clone(), calibration_source: state.calibration.source.clone(), diff --git a/client/src/launcher/diagnostics/snapshot_report_text.rs b/client/src/launcher/diagnostics/snapshot_report_text.rs index 37700ed..b319ea9 100644 --- a/client/src/launcher/diagnostics/snapshot_report_text.rs +++ b/client/src/launcher/diagnostics/snapshot_report_text.rs @@ -141,6 +141,41 @@ impl SnapshotReport { self.upstream_microphone.latest_enqueue_age_ms, self.upstream_microphone.latest_delivery_age_ms ); + let _ = writeln!(text, "server upstream planner"); + let _ = writeln!( + text, + " status: {} | phase={} | session={}", + if self.planner_available { + "available" + } else { + "unavailable" + }, + self.planner_phase, + self.planner_session_id + ); + let live_lag = self + .planner_live_lag_ms + .map(|value| format!("{value:.1}ms")) + .unwrap_or_else(|| "pending".to_string()); + let planner_skew = self + .planner_skew_ms + .map(|value| format!("{value:+.1}ms audio-video")) + .unwrap_or_else(|| "pending".to_string()); + let _ = writeln!( + text, + " live lag: {live_lag} | planner skew: {planner_skew}" + ); + let _ = writeln!( + text, + " drops/heals: stale_audio={} stale_video={} skew_video={} reanchors={} freezes={} startup_timeouts={}", + self.planner_stale_audio_drops, + self.planner_stale_video_drops, + self.planner_skew_video_drops, + self.planner_freshness_reanchors, + self.planner_video_freezes, + self.planner_startup_timeouts + ); + let _ = writeln!(text, " detail: {}", self.planner_detail); let _ = writeln!(text, "calibration"); let _ = writeln!( text, diff --git a/client/src/launcher/state/launcher_state_impl.rs b/client/src/launcher/state/launcher_state_impl.rs index 9159b65..8fd4c52 100644 --- a/client/src/launcher/state/launcher_state_impl.rs +++ b/client/src/launcher/state/launcher_state_impl.rs @@ -453,4 +453,7 @@ impl LauncherState { self.calibration = calibration; } + pub fn set_upstream_sync(&mut self, upstream_sync: UpstreamSyncStatus) { + self.upstream_sync = upstream_sync; + } } diff --git a/client/src/launcher/state/selection_models.rs b/client/src/launcher/state/selection_models.rs index 9ddc083..2d97324 100644 --- a/client/src/launcher/state/selection_models.rs +++ b/client/src/launcher/state/selection_models.rs @@ -346,11 +346,11 @@ impl Default for CalibrationStatus { Self { available: false, profile: "mjpeg".to_string(), - factory_audio_offset_us: 1_260_000, + factory_audio_offset_us: 0, factory_video_offset_us: 0, - default_audio_offset_us: 1_260_000, + default_audio_offset_us: 0, default_video_offset_us: 0, - active_audio_offset_us: 1_260_000, + active_audio_offset_us: 0, active_video_offset_us: 0, source: "unknown".to_string(), confidence: "unknown".to_string(), @@ -360,6 +360,81 @@ impl Default for CalibrationStatus { } } +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct UpstreamSyncStatus { + pub available: bool, + pub session_id: u64, + pub phase: String, + pub latest_camera_remote_pts_us: Option, + pub latest_microphone_remote_pts_us: Option, + pub last_video_presented_pts_us: Option, + pub last_audio_presented_pts_us: Option, + pub live_lag_ms: Option, + pub planner_skew_ms: Option, + pub stale_audio_drops: u64, + pub stale_video_drops: u64, + pub skew_video_drops: u64, + pub freshness_reanchors: u64, + pub startup_timeouts: u64, + pub video_freezes: u64, + pub detail: String, +} + +impl UpstreamSyncStatus { + #[must_use] + pub fn from_proto(reply: lesavka_common::lesavka::UpstreamSyncState) -> Self { + Self { + available: true, + session_id: reply.session_id, + phase: reply.phase, + latest_camera_remote_pts_us: reply.latest_camera_remote_pts_us, + latest_microphone_remote_pts_us: reply.latest_microphone_remote_pts_us, + last_video_presented_pts_us: reply.last_video_presented_pts_us, + last_audio_presented_pts_us: reply.last_audio_presented_pts_us, + live_lag_ms: reply.live_lag_ms, + planner_skew_ms: reply.planner_skew_ms, + stale_audio_drops: reply.stale_audio_drops, + stale_video_drops: reply.stale_video_drops, + skew_video_drops: reply.skew_video_drops, + freshness_reanchors: reply.freshness_reanchors, + startup_timeouts: reply.startup_timeouts, + video_freezes: reply.video_freezes, + detail: reply.last_reason, + } + } + + #[must_use] + pub fn unavailable(detail: impl Into) -> Self { + Self { + detail: detail.into(), + ..Self::default() + } + } +} + +impl Default for UpstreamSyncStatus { + fn default() -> Self { + Self { + available: false, + session_id: 0, + phase: "unknown".to_string(), + latest_camera_remote_pts_us: None, + latest_microphone_remote_pts_us: None, + last_video_presented_pts_us: None, + last_audio_presented_pts_us: None, + live_lag_ms: None, + planner_skew_ms: None, + stale_audio_drops: 0, + stale_video_drops: 0, + skew_video_drops: 0, + freshness_reanchors: 0, + startup_timeouts: 0, + video_freezes: 0, + detail: "upstream sync planner unavailable".to_string(), + } + } +} + #[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)] pub struct DeviceSelection { pub camera: Option, @@ -415,6 +490,7 @@ pub struct LauncherState { pub swap_key_binding_token: u64, pub capture_power: CapturePowerStatus, pub calibration: CalibrationStatus, + pub upstream_sync: UpstreamSyncStatus, pub remote_active: bool, pub notes: Vec, } @@ -449,6 +525,7 @@ impl Default for LauncherState { swap_key_binding_token: 0, capture_power: CapturePowerStatus::default(), calibration: CalibrationStatus::default(), + upstream_sync: UpstreamSyncStatus::default(), remote_active: false, notes: Vec::new(), } diff --git a/client/src/launcher/tests/preview.rs b/client/src/launcher/tests/preview.rs index 2a4bc6e..a63e333 100644 --- a/client/src/launcher/tests/preview.rs +++ b/client/src/launcher/tests/preview.rs @@ -175,6 +175,15 @@ impl Relay for ProbeRelay { self.get_calibration(Request::new(lesavka_common::lesavka::Empty {})) .await } + + async fn get_upstream_sync( + &self, + _request: Request, + ) -> Result, Status> { + Ok(Response::new( + lesavka_common::lesavka::UpstreamSyncState::default(), + )) + } } #[test] diff --git a/client/src/launcher/tests/state.rs b/client/src/launcher/tests/state.rs index af82c6a..9e4f08a 100644 --- a/client/src/launcher/tests/state.rs +++ b/client/src/launcher/tests/state.rs @@ -405,7 +405,7 @@ fn capture_power_status_updates_snapshot_state() { fn calibration_status_tracks_proto_unavailable_and_status_line() { let mut state = LauncherState::new(); assert!(!state.calibration.available); - assert_eq!(state.calibration.active_audio_offset_us, 1_260_000); + assert_eq!(state.calibration.active_audio_offset_us, 0); let unavailable = CalibrationStatus::unavailable("server unreachable"); assert!(!unavailable.available); @@ -414,7 +414,7 @@ fn calibration_status_tracks_proto_unavailable_and_status_line() { state.set_calibration(CalibrationStatus::from_proto( lesavka_common::lesavka::CalibrationState { profile: "mjpeg".to_string(), - factory_audio_offset_us: 1_260_000, + factory_audio_offset_us: 0, factory_video_offset_us: 0, default_audio_offset_us: -40_000, default_video_offset_us: 1_000, @@ -429,7 +429,7 @@ fn calibration_status_tracks_proto_unavailable_and_status_line() { assert!(state.calibration.available); assert_eq!(state.calibration.profile, "mjpeg"); - assert_eq!(state.calibration.factory_audio_offset_us, 1_260_000); + assert_eq!(state.calibration.factory_audio_offset_us, 0); assert_eq!(state.calibration.factory_video_offset_us, 0); assert_eq!(state.calibration.default_audio_offset_us, -40_000); assert_eq!(state.calibration.default_video_offset_us, 1_000); diff --git a/client/src/launcher/tests/utility_actions.rs b/client/src/launcher/tests/utility_actions.rs index 149f1df..423e585 100644 --- a/client/src/launcher/tests/utility_actions.rs +++ b/client/src/launcher/tests/utility_actions.rs @@ -6,7 +6,7 @@ use futures::stream; use lesavka_common::lesavka::{ AudioPacket, CalibrationRequest, CalibrationState, CapturePowerState, Empty, KeyboardReport, MonitorRequest, MouseReport, PasteReply, PasteRequest, ResetUsbReply, SetCapturePowerRequest, - VideoPacket, + UpstreamSyncState, VideoPacket, relay_server::{Relay, RelayServer}, }; use serial_test::serial; @@ -158,6 +158,13 @@ impl Relay for UtilityRelay { ) -> Result, Status> { Ok(Response::new(CalibrationState::default())) } + + async fn get_upstream_sync( + &self, + _request: Request, + ) -> Result, Status> { + Ok(Response::new(UpstreamSyncState::default())) + } } fn serve(relay: UtilityRelay) -> (tokio::runtime::Runtime, String) { diff --git a/client/src/launcher/ui.rs b/client/src/launcher/ui.rs index 440e501..958531a 100644 --- a/client/src/launcher/ui.rs +++ b/client/src/launcher/ui.rs @@ -3,8 +3,8 @@ use anyhow::Result; #[cfg(not(coverage))] use { super::calibration::{ - blind_calibration_estimate, fetch_calibration, nudge_audio_calibration, - restore_default_calibration, restore_factory_calibration, + blind_calibration_estimate, fetch_calibration, fetch_upstream_sync, + nudge_audio_calibration, restore_default_calibration, restore_factory_calibration, }, super::clipboard::send_clipboard_text_to_remote, super::device_test::{DeviceTestController, DeviceTestKind}, @@ -20,7 +20,7 @@ use { super::state::{ BreakoutSizePreset, CalibrationStatus, CapturePowerStatus, CaptureSizePreset, DisplaySurface, FeedSourcePreset, InputRouting, LauncherState, MAX_AUDIO_GAIN_PERCENT, - MAX_MIC_GAIN_PERCENT, + MAX_MIC_GAIN_PERCENT, UpstreamSyncStatus, }, super::ui_components::{ ConsoleLogLevel, build_launcher_view, sync_camera_quality_combo, sync_input_device_combo, @@ -144,6 +144,9 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> { calibration_tx, calibration_rx, calibration_request_in_flight, + upstream_sync_tx, + upstream_sync_rx, + upstream_sync_request_in_flight, relay_tx, relay_rx, relay_request_in_flight, @@ -154,6 +157,7 @@ pub fn run_gui_launcher(server_addr: String) -> Result<()> { diagnostics_process, next_power_probe, next_calibration_probe, + next_upstream_sync_probe, next_diagnostics_probe, next_diagnostics_sample, preview_session_active, diff --git a/client/src/launcher/ui/activation_context.rs b/client/src/launcher/ui/activation_context.rs index 114ece5..23af8e3 100644 --- a/client/src/launcher/ui/activation_context.rs +++ b/client/src/launcher/ui/activation_context.rs @@ -21,6 +21,9 @@ struct ActivationContext { calibration_tx: std::sync::mpsc::Sender, calibration_rx: std::sync::mpsc::Receiver, calibration_request_in_flight: Rc>, + upstream_sync_tx: std::sync::mpsc::Sender, + upstream_sync_rx: std::sync::mpsc::Receiver, + upstream_sync_request_in_flight: Rc>, relay_tx: std::sync::mpsc::Sender, relay_rx: std::sync::mpsc::Receiver, relay_request_in_flight: Rc>, @@ -31,6 +34,7 @@ struct ActivationContext { diagnostics_process: Rc>, next_power_probe: Rc>, next_calibration_probe: Rc>, + next_upstream_sync_probe: Rc>, next_diagnostics_probe: Rc>, next_diagnostics_sample: Rc>, preview_session_active: Rc>, diff --git a/client/src/launcher/ui/activation_setup.rs b/client/src/launcher/ui/activation_setup.rs index 1622324..f85149e 100644 --- a/client/src/launcher/ui/activation_setup.rs +++ b/client/src/launcher/ui/activation_setup.rs @@ -112,6 +112,9 @@ let (calibration_tx, calibration_rx) = std::sync::mpsc::channel::(); let calibration_request_in_flight = Rc::new(Cell::new(false)); + let (upstream_sync_tx, upstream_sync_rx) = + std::sync::mpsc::channel::(); + let upstream_sync_request_in_flight = Rc::new(Cell::new(false)); let (relay_tx, relay_rx) = std::sync::mpsc::channel::(); let relay_request_in_flight = Rc::new(Cell::new(false)); let (caps_tx, caps_rx) = std::sync::mpsc::channel::(); @@ -122,6 +125,8 @@ Rc::new(Cell::new(Instant::now() + Duration::from_millis(500))); let next_calibration_probe = Rc::new(Cell::new(Instant::now() + Duration::from_millis(650))); + let next_upstream_sync_probe = + Rc::new(Cell::new(Instant::now() + Duration::from_millis(750))); let next_diagnostics_probe = Rc::new(Cell::new(Instant::now() + Duration::from_millis(250))); let next_diagnostics_sample = @@ -157,6 +162,9 @@ calibration_tx, calibration_rx, calibration_request_in_flight, + upstream_sync_tx, + upstream_sync_rx, + upstream_sync_request_in_flight, relay_tx, relay_rx, relay_request_in_flight, @@ -167,6 +175,7 @@ diagnostics_process, next_power_probe, next_calibration_probe, + next_upstream_sync_probe, next_diagnostics_probe, next_diagnostics_sample, preview_session_active, diff --git a/client/src/launcher/ui/control_requests.rs b/client/src/launcher/ui/control_requests.rs index a07a8aa..84f2913 100644 --- a/client/src/launcher/ui/control_requests.rs +++ b/client/src/launcher/ui/control_requests.rs @@ -179,6 +179,22 @@ fn request_calibration_refresh( }); } +#[cfg(not(coverage))] +/// Refresh authoritative upstream sync planner state in the background. +fn request_upstream_sync_refresh( + upstream_sync_tx: std::sync::mpsc::Sender, + server_addr: String, + delay: Duration, +) { + std::thread::spawn(move || { + if !delay.is_zero() { + std::thread::sleep(delay); + } + let result = fetch_upstream_sync(&server_addr).map_err(|err| err.to_string()); + let _ = upstream_sync_tx.send(UpstreamSyncMessage::Refresh(result)); + }); +} + #[cfg(not(coverage))] fn request_calibration_command( calibration_tx: std::sync::mpsc::Sender, @@ -233,6 +249,11 @@ fn unavailable_calibration(detail: String) -> CalibrationStatus { CalibrationStatus::unavailable(detail) } +#[cfg(not(coverage))] +fn unavailable_upstream_sync(detail: String) -> UpstreamSyncStatus { + UpstreamSyncStatus::unavailable(detail) +} + #[cfg(not(coverage))] fn calibration_summary(calibration: &CalibrationStatus) -> String { format!( diff --git a/client/src/launcher/ui/message_and_network_state.rs b/client/src/launcher/ui/message_and_network_state.rs index c5a2aea..4729241 100644 --- a/client/src/launcher/ui/message_and_network_state.rs +++ b/client/src/launcher/ui/message_and_network_state.rs @@ -10,6 +10,11 @@ enum CalibrationMessage { Command(std::result::Result), } +#[cfg(not(coverage))] +enum UpstreamSyncMessage { + Refresh(std::result::Result), +} + #[cfg(not(coverage))] enum RelayMessage { Spawned(std::result::Result), diff --git a/client/src/launcher/ui/runtime_poll.rs b/client/src/launcher/ui/runtime_poll.rs index 1508518..c3d5166 100644 --- a/client/src/launcher/ui/runtime_poll.rs +++ b/client/src/launcher/ui/runtime_poll.rs @@ -13,14 +13,17 @@ Rc::new(RefCell::new(path_marker(focus_signal_path.as_path()))); let power_request_in_flight = Rc::clone(&power_request_in_flight); let calibration_request_in_flight = Rc::clone(&calibration_request_in_flight); + let upstream_sync_request_in_flight = Rc::clone(&upstream_sync_request_in_flight); let relay_request_in_flight = Rc::clone(&relay_request_in_flight); let preview = preview.clone(); let power_tx = power_tx.clone(); let calibration_tx = calibration_tx.clone(); + let upstream_sync_tx = upstream_sync_tx.clone(); let caps_tx = caps_tx.clone(); let caps_request_in_flight = Rc::clone(&caps_request_in_flight); let diagnostics_network = Rc::clone(&diagnostics_network); let diagnostics_process = Rc::clone(&diagnostics_process); + let next_upstream_sync_probe = Rc::clone(&next_upstream_sync_probe); let next_diagnostics_probe = Rc::clone(&next_diagnostics_probe); let next_diagnostics_sample = Rc::clone(&next_diagnostics_sample); let preview_session_active = Rc::clone(&preview_session_active); @@ -77,6 +80,11 @@ server_addr.clone(), Duration::from_millis(350), ); + request_upstream_sync_refresh( + upstream_sync_tx.clone(), + server_addr.clone(), + Duration::from_millis(450), + ); request_capture_power_refresh( power_tx.clone(), server_addr, @@ -156,6 +164,11 @@ server_addr.clone(), Duration::from_millis(300), ); + request_upstream_sync_refresh( + upstream_sync_tx.clone(), + server_addr.clone(), + Duration::from_millis(400), + ); request_capture_power_refresh( power_tx.clone(), server_addr, @@ -302,6 +315,26 @@ } } + while let Ok(message) = upstream_sync_rx.try_recv() { + upstream_sync_request_in_flight.set(false); + match message { + UpstreamSyncMessage::Refresh(Ok(upstream_sync)) => { + let mut state = state.borrow_mut(); + state.set_server_available(true); + state.set_upstream_sync(upstream_sync); + } + UpstreamSyncMessage::Refresh(Err(err)) => { + let relay_live = child_proc.borrow().is_some() + || state.borrow().remote_active; + let mut state = state.borrow_mut(); + if relay_live { + state.set_server_available(true); + } + state.set_upstream_sync(unavailable_upstream_sync(err)); + } + } + } + while let Ok(message) = caps_rx.try_recv() { caps_request_in_flight.set(false); match message { @@ -393,6 +426,21 @@ next_calibration_probe.set(now + Duration::from_secs(2)); } + if now >= next_upstream_sync_probe.get() + && !upstream_sync_request_in_flight.get() + && (child_running || state.borrow().server_available) + { + upstream_sync_request_in_flight.set(true); + let server_addr = + selected_server_addr(&server_entry, server_addr_fallback.as_ref()); + request_upstream_sync_refresh( + upstream_sync_tx.clone(), + server_addr, + Duration::ZERO, + ); + next_upstream_sync_probe.set(now + Duration::from_secs(2)); + } + if now >= next_diagnostics_probe.get() && !caps_request_in_flight.get() { caps_request_in_flight.set(true); let server_addr = diff --git a/client/src/launcher/ui/utility_button_bindings.rs b/client/src/launcher/ui/utility_button_bindings.rs index ea8de9e..d9fa58b 100644 --- a/client/src/launcher/ui/utility_button_bindings.rs +++ b/client/src/launcher/ui/utility_button_bindings.rs @@ -406,7 +406,7 @@ let widgets = widgets.clone(); widgets.calibration_rig_button.connect_clicked(move |_| { widgets.status_label.set_text( - "Rig calibration wizard is queued for the 0.16.0 test-equipment phase; for now the manual Tethys sync battery remains the measured-default path.", + "Rig calibration wizard is queued for lab tooling; for now the mirrored Tethys sync probe remains the measured-default path.", ); }); } diff --git a/common/Cargo.toml b/common/Cargo.toml index d578499..12769b0 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.16.26" +version = "0.17.0" edition = "2024" build = "build.rs" diff --git a/common/proto/lesavka.proto b/common/proto/lesavka.proto index f8f8bda..9af1a83 100644 --- a/common/proto/lesavka.proto +++ b/common/proto/lesavka.proto @@ -89,6 +89,24 @@ message CalibrationRequest { string note = 6; } +message UpstreamSyncState { + uint64 session_id = 1; + string phase = 2; + optional uint64 latest_camera_remote_pts_us = 3; + optional uint64 latest_microphone_remote_pts_us = 4; + optional uint64 last_video_presented_pts_us = 5; + optional uint64 last_audio_presented_pts_us = 6; + optional float live_lag_ms = 7; + optional float planner_skew_ms = 8; + uint64 stale_audio_drops = 9; + uint64 stale_video_drops = 10; + uint64 skew_video_drops = 11; + uint64 freshness_reanchors = 12; + uint64 startup_timeouts = 13; + uint64 video_freezes = 14; + string last_reason = 15; +} + message HandshakeSet { bool camera = 1; bool microphone = 2; @@ -122,6 +140,7 @@ service Relay { rpc SetCapturePower (SetCapturePowerRequest) returns (CapturePowerState); rpc GetCalibration (Empty) returns (CalibrationState); rpc Calibrate (CalibrationRequest) returns (CalibrationState); + rpc GetUpstreamSync (Empty) returns (UpstreamSyncState); } service Handshake { diff --git a/docs/operational-env.md b/docs/operational-env.md index 9271b95..bc0adf1 100644 --- a/docs/operational-env.md +++ b/docs/operational-env.md @@ -248,8 +248,10 @@ from `LESAVKA_CLIENT_PKI_SSH_SOURCE` over SSH. Runtime clients require the insta | `LESAVKA_UAC_DEV` | server hardware/device override | | `LESAVKA_UAC_SESSION_CLOCK_ALIGN` | server audio sink clock-alignment override; `0` is the host-validated default | | `LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US` | server upstream playout override; shifts gadget-audio presentation relative to the shared playout epoch | -| `LESAVKA_UPSTREAM_PAIR_SLACK_US` | server upstream pairing override; how far video may lead the planned audio-master capture moment before the frame is held or dropped, defaults to `20000` | -| `LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS` | server upstream pairing/synchronization window; the server uses this shared buffer to pair webcam frames with their matching gadget-mic audio before remote presentation, defaults to `1000` | +| `LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS` | server upstream planner freshness ceiling; planner-approved audio/video should not exceed this live lag budget, defaults to `1000` | +| `LESAVKA_UPSTREAM_PAIR_SLACK_US` | server upstream pairing override; how far video may diverge from the planned audio-master capture moment before the frame is held or dropped, defaults to `80000` | +| `LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS` | server upstream pairing/synchronization target buffer; the server uses this shared buffer to pair webcam frames with matching gadget-mic audio before remote presentation, defaults to `350` | +| `LESAVKA_UPSTREAM_STARTUP_TIMEOUT_MS` | server upstream startup guard; paired startup must converge before this timeout or fail visibly, defaults to `60000` | | `LESAVKA_UPSTREAM_STALE_DROP_MS` | server upstream freshness override; late audio/video that miss this budget are dropped instead of silently extending lag, defaults to `80` | | `LESAVKA_UPSTREAM_TIMING_TRACE` | upstream capture/rebase trace override for sync debugging | | `LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US` | server upstream playout override; shifts webcam-video presentation relative to the shared playout epoch | diff --git a/scripts/install/server.sh b/scripts/install/server.sh index c75cb03..6cfbc98 100755 --- a/scripts/install/server.sh +++ b/scripts/install/server.sh @@ -14,9 +14,10 @@ INSTALL_UVC_CODEC=${LESAVKA_INSTALL_UVC_CODEC:-mjpeg} INSTALL_SERVER_BIND_ADDR=${LESAVKA_INSTALL_SERVER_BIND_ADDR:-0.0.0.0:50051} LESAVKA_TLS_DIR=${LESAVKA_TLS_DIR:-/etc/lesavka/pki} LESAVKA_CLIENT_BUNDLE=${LESAVKA_CLIENT_BUNDLE:-/etc/lesavka/lesavka-client-pki.tar.gz} -DEFAULT_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=1260000 +DEFAULT_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=0 LEGACY_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=-45000 PREVIOUS_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=720000 +PREVIOUS_TUNED_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=1260000 resolve_upstream_audio_playout_offset_us() { if [[ -n ${LESAVKA_INSTALL_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US:-} ]]; then @@ -24,8 +25,8 @@ resolve_upstream_audio_playout_offset_us() { return 0 fi - if [[ ${LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US:-} == "$LEGACY_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US" || ${LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US:-} == "$PREVIOUS_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US" ]]; then - echo "⚠️ migrating stale upstream audio playout offset to +1260ms for MJPEG/UVC." >&2 + if [[ ${LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US:-} == "$LEGACY_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US" || ${LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US:-} == "$PREVIOUS_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US" || ${LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US:-} == "$PREVIOUS_TUNED_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US" ]]; then + echo "⚠️ migrating stale upstream audio playout offset to the 0.17 freshness-first planner default." >&2 echo " Use LESAVKA_INSTALL_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US to intentionally keep an older value." >&2 printf '%s\n' "$DEFAULT_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US" return 0 @@ -974,7 +975,9 @@ fi printf 'LESAVKA_ALSA_DEV=%s\n' "${LESAVKA_ALSA_DEV:-hw:UAC2Gadget,0}" printf 'LESAVKA_UAC_HDMI_COMPENSATION_US=%s\n' "${LESAVKA_UAC_HDMI_COMPENSATION_US:-205000}" printf 'LESAVKA_UAC_SESSION_CLOCK_ALIGN=%s\n' "${LESAVKA_UAC_SESSION_CLOCK_ALIGN:-0}" - printf 'LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS=%s\n' "${LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS:-1000}" + printf 'LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS=%s\n' "${LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS:-350}" + printf 'LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS=%s\n' "${LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS:-1000}" + printf 'LESAVKA_UPSTREAM_STARTUP_TIMEOUT_MS=%s\n' "${LESAVKA_UPSTREAM_STARTUP_TIMEOUT_MS:-60000}" printf 'LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=%s\n' "$(resolve_upstream_audio_playout_offset_us)" printf 'LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US=%s\n' "${LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US:-0}" printf 'LESAVKA_UPSTREAM_PAIR_SLACK_US=%s\n' "${LESAVKA_UPSTREAM_PAIR_SLACK_US:-80000}" diff --git a/scripts/manual/run_upstream_mirrored_av_sync.sh b/scripts/manual/run_upstream_mirrored_av_sync.sh index b315983..7fb0a7f 100755 --- a/scripts/manual/run_upstream_mirrored_av_sync.sh +++ b/scripts/manual/run_upstream_mirrored_av_sync.sh @@ -185,6 +185,27 @@ print_lesavka_versions() { done <<<"${version_output}" } +print_upstream_sync_state() { + local label="$1" + echo "==> upstream sync planner state (${label})" + if [[ ! -x "${REPO_ROOT}/target/debug/lesavka-relayctl" ]]; then + (cd "${REPO_ROOT}" && cargo build -p lesavka_client --bin lesavka-relayctl >/dev/null) + fi + local sync_output + if ! sync_output="$( + LESAVKA_TLS_DOMAIN="${LESAVKA_TLS_DOMAIN}" \ + "${REPO_ROOT}/target/debug/lesavka-relayctl" \ + --server "${RESOLVED_LESAVKA_SERVER_ADDR}" \ + upstream-sync 2>&1 + )"; then + echo " ↪ planner query failed: ${sync_output}" + return 0 + fi + while IFS= read -r line; do + [[ -n "${line}" ]] && echo " ↪ ${line}" + done <<<"${sync_output}" +} + start_local_stimulus() { echo "==> starting local A/V stimulus server" python3 "${REPO_ROOT}/scripts/manual/local_av_stimulus.py" \ @@ -266,9 +287,11 @@ echo "==> prebuilding real client and analyzer" start_server_tunnel_if_needed print_lesavka_versions +print_upstream_sync_state "before mirrored run" start_local_stimulus start_real_lesavka_client run_browser_capture_with_real_driver +print_upstream_sync_state "after mirrored run" echo "==> mirrored probe complete" echo "artifact_dir: ${ARTIFACT_DIR}" diff --git a/server/Cargo.toml b/server/Cargo.toml index 7450d06..cea602a 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.16.26" +version = "0.17.0" edition = "2024" autobins = false diff --git a/server/src/calibration.rs b/server/src/calibration.rs index 78a9588..fe1f2c6 100644 --- a/server/src/calibration.rs +++ b/server/src/calibration.rs @@ -9,13 +9,14 @@ use lesavka_common::lesavka::{ use crate::upstream_media_runtime::UpstreamMediaRuntime; -pub const FACTORY_MJPEG_AUDIO_OFFSET_US: i64 = 1_260_000; +pub const FACTORY_MJPEG_AUDIO_OFFSET_US: i64 = 0; pub const FACTORY_MJPEG_VIDEO_OFFSET_US: i64 = 0; const LEGACY_FACTORY_MJPEG_AUDIO_OFFSET_US: i64 = -45_000; const PREVIOUS_FACTORY_MJPEG_AUDIO_OFFSET_US: i64 = 720_000; +const PREVIOUS_TUNED_MJPEG_AUDIO_OFFSET_US: i64 = 1_260_000; const PROFILE: &str = "mjpeg"; const FACTORY_CONFIDENCE: &str = "factory"; -const OFFSET_LIMIT_US: i64 = 1_500_000; +const OFFSET_LIMIT_US: i64 = 500_000; #[derive(Debug, Clone, PartialEq, Eq)] struct CalibrationSnapshot { @@ -237,10 +238,17 @@ fn parse_snapshot(raw: &str) -> CalibrationSnapshot { fn migrate_legacy_snapshot(mut state: CalibrationSnapshot) -> CalibrationSnapshot { let source_allows_migration = matches!(state.source.as_str(), "factory" | "env"); let confidence_allows_migration = matches!(state.confidence.as_str(), "factory" | "configured"); - let untouched_legacy_audio = matches!( + let clamped_previous_baseline = state.default_audio_offset_us == OFFSET_LIMIT_US + && state + .detail + .contains("loaded upstream A/V calibration defaults"); + let untouched_legacy_audio = (matches!( state.default_audio_offset_us, - LEGACY_FACTORY_MJPEG_AUDIO_OFFSET_US | PREVIOUS_FACTORY_MJPEG_AUDIO_OFFSET_US - ) && state.active_audio_offset_us == state.default_audio_offset_us; + LEGACY_FACTORY_MJPEG_AUDIO_OFFSET_US + | PREVIOUS_FACTORY_MJPEG_AUDIO_OFFSET_US + | PREVIOUS_TUNED_MJPEG_AUDIO_OFFSET_US + ) || clamped_previous_baseline) + && state.active_audio_offset_us == state.default_audio_offset_us; let untouched_legacy_video = state.default_video_offset_us == FACTORY_MJPEG_VIDEO_OFFSET_US && state.active_video_offset_us == FACTORY_MJPEG_VIDEO_OFFSET_US; if state.profile == PROFILE @@ -321,7 +329,7 @@ mod tests { ], || { let state = snapshot_from_env(); - assert_eq!(state.default_audio_offset_us, 1_260_000); + assert_eq!(state.default_audio_offset_us, 0); assert_eq!(state.active_video_offset_us, 0); assert_eq!(state.source, "factory"); }, @@ -345,10 +353,10 @@ mod tests { note: String::new(), }) .expect("manual adjust applies"); - assert_eq!(state.active_audio_offset_us, 1_255_000); - assert_eq!(runtime.playout_offsets(), (0, 1_255_000)); + assert_eq!(state.active_audio_offset_us, -5_000); + assert_eq!(runtime.playout_offsets(), (0, -5_000)); let raw = std::fs::read_to_string(file.path()).expect("persisted calibration"); - assert!(raw.contains("active_audio_offset_us=1255000")); + assert!(raw.contains("active_audio_offset_us=-5000")); }); } @@ -371,7 +379,7 @@ mod tests { ], || { let state = snapshot_from_env(); - assert_eq!(state.default_audio_offset_us, -1_500_000); + assert_eq!(state.default_audio_offset_us, -500_000); assert_eq!(state.default_video_offset_us, 12_345); assert_eq!(state.source, "env"); assert_eq!(state.confidence, "configured"); @@ -399,7 +407,7 @@ mod tests { ); assert_eq!(state.default_audio_offset_us, FACTORY_MJPEG_AUDIO_OFFSET_US); assert_eq!(state.default_video_offset_us, 2_500); - assert_eq!(state.active_audio_offset_us, -1_500_000); + assert_eq!(state.active_audio_offset_us, -500_000); assert_eq!(state.active_video_offset_us, FACTORY_MJPEG_VIDEO_OFFSET_US); assert_eq!(state.source, "saved"); assert_eq!(state.confidence, FACTORY_CONFIDENCE); @@ -429,10 +437,10 @@ mod tests { let runtime = Arc::new(UpstreamMediaRuntime::new()); let store = CalibrationStore::load(runtime.clone()); let state = store.current(); - assert_eq!(state.active_audio_offset_us, 1_260_000); - assert_eq!(state.default_audio_offset_us, 1_260_000); + assert_eq!(state.active_audio_offset_us, 0); + assert_eq!(state.default_audio_offset_us, 0); assert_eq!(state.source, "factory"); - assert_eq!(runtime.playout_offsets(), (0, 1_260_000)); + assert_eq!(runtime.playout_offsets(), (0, 0)); assert!(state.detail.contains("migrated legacy MJPEG")); }); } @@ -459,11 +467,11 @@ mod tests { let runtime = Arc::new(UpstreamMediaRuntime::new()); let store = CalibrationStore::load(runtime.clone()); let state = store.current(); - assert_eq!(state.active_audio_offset_us, 1_260_000); - assert_eq!(state.default_audio_offset_us, 1_260_000); + assert_eq!(state.active_audio_offset_us, 0); + assert_eq!(state.default_audio_offset_us, 0); assert_eq!(state.source, "factory"); - assert_eq!(runtime.playout_offsets(), (0, 1_260_000)); - assert!(state.detail.contains("from +720.0ms to +1260.0ms")); + assert_eq!(runtime.playout_offsets(), (0, 0)); + assert!(state.detail.contains("to +0.0ms")); }); } @@ -519,7 +527,7 @@ mod tests { .expect("blind estimate"); assert_eq!(blind.source, "blind"); assert!(blind.detail.contains("delivery skew 44.0ms")); - assert_eq!(runtime.playout_offsets(), (-2_000, 1_265_000)); + assert_eq!(runtime.playout_offsets(), (-2_000, 5_000)); let manual = store .apply(CalibrationRequest { @@ -531,7 +539,7 @@ mod tests { note: String::new(), }) .expect("manual clamp"); - assert_eq!(manual.active_audio_offset_us, 1_500_000); + assert_eq!(manual.active_audio_offset_us, 500_000); let saved = store .apply(CalibrationRequest { diff --git a/server/src/main.rs b/server/src/main.rs index 6c3f558..bbaf67d 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -20,7 +20,7 @@ use tracing::{debug, error, info, warn}; use lesavka_common::lesavka::{ AudioPacket, CalibrationRequest, CalibrationState, CapturePowerCommand, CapturePowerState, Empty, KeyboardReport, MonitorRequest, MouseReport, PasteReply, PasteRequest, ResetUsbReply, - SetCapturePowerRequest, VideoPacket, + SetCapturePowerRequest, UpstreamSyncState, VideoPacket, relay_server::{Relay, RelayServer}, }; diff --git a/server/src/main/relay_service.rs b/server/src/main/relay_service.rs index 1bd3751..dc052d0 100644 --- a/server/src/main/relay_service.rs +++ b/server/src/main/relay_service.rs @@ -198,12 +198,41 @@ impl Relay for Handler { }; let plan = match upstream_media_rt.plan_audio_pts(pkt.pts) { lesavka_server::upstream_media_runtime::UpstreamPlanDecision::AwaitingPair => { + if inbound_closed { + tracing::debug!( + rpc_id, + session_id = lease.session_id, + pts = pkt.pts, + "🎤 dropping trailing upstream audio because no paired video arrived before stream close" + ); + continue; + } pending.push_front(pkt); continue; } lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropBeforeOverlap => { continue; } + lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropStale(reason) => { + tracing::warn!( + rpc_id, + session_id = lease.session_id, + pts = pkt.pts, + reason, + "🎤 upstream audio packet dropped by authoritative freshness planner" + ); + continue; + } + lesavka_server::upstream_media_runtime::UpstreamPlanDecision::StartupFailed(reason) => { + tracing::error!( + rpc_id, + session_id = lease.session_id, + reason, + "🎤 upstream audio startup failed" + ); + cleanup.mark_aborted(); + break; + } lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan, }; if plan.late_by > stale_drop_budget { @@ -236,6 +265,7 @@ impl Relay for Handler { tracing::info!(rpc_id, "🎤⬇ srv pkt#{n} {} bytes", pkt.data.len()); } sink.push(&pkt); + upstream_media_rt.mark_audio_presented(pkt.pts); } sink.finish(); // flush on EOS let _ = tx.send(Ok(Empty {})).await; @@ -362,12 +392,37 @@ impl Relay for Handler { lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropBeforeOverlap => { continue; } + lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropStale(reason) => { + tracing::warn!( + rpc_id, + session_id = upstream_lease.session_id, + camera_session_id, + pts = pkt.pts, + reason, + "🎥 upstream video frame dropped by authoritative freshness planner" + ); + continue; + } + lesavka_server::upstream_media_runtime::UpstreamPlanDecision::StartupFailed(reason) => { + tracing::error!( + rpc_id, + session_id = upstream_lease.session_id, + camera_session_id, + reason, + "🎥 upstream video startup failed" + ); + cleanup.mark_aborted(); + break; + } lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan, }; if !upstream_media_rt .wait_for_audio_master(plan.local_pts_us, plan.due_at) .await { + upstream_media_rt.record_video_freeze( + "video froze because audio master did not reach the frame timestamp", + ); tracing::warn!( rpc_id, session_id = upstream_lease.session_id, @@ -433,7 +488,9 @@ impl Relay for Handler { } pkt.pts = plan.local_pts_us; startup_video_settled = true; + let presented_pts = pkt.pts; relay.feed(pkt); // ← all logging inside video.rs + upstream_media_rt.mark_video_presented(presented_pts); } tx.send(Ok(Empty {})).await.ok(); Ok::<(), Status>(()) @@ -503,6 +560,13 @@ impl Relay for Handler { ) -> Result, Status> { self.calibrate_reply(req).await } + + async fn get_upstream_sync( + &self, + _req: Request, + ) -> Result, Status> { + self.get_upstream_sync_reply().await + } } #[cfg(test)] diff --git a/server/src/main/relay_service_coverage.rs b/server/src/main/relay_service_coverage.rs index 52b9421..cf51ccf 100644 --- a/server/src/main/relay_service_coverage.rs +++ b/server/src/main/relay_service_coverage.rs @@ -154,6 +154,12 @@ impl Relay for Handler { lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropBeforeOverlap => { continue; } + lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropStale(_) => { + continue; + } + lesavka_server::upstream_media_runtime::UpstreamPlanDecision::StartupFailed(_) => { + break; + } lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan, }; if plan.late_by > stale_drop_budget { @@ -168,6 +174,7 @@ impl Relay for Handler { } pkt.pts = plan.local_pts_us; sink.push(&pkt); + upstream_media_rt.mark_audio_presented(pkt.pts); } sink.finish(); upstream_media_rt.close_microphone(lease.generation); @@ -233,12 +240,19 @@ impl Relay for Handler { lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropBeforeOverlap => { continue; } + lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropStale(_) => { + continue; + } + lesavka_server::upstream_media_runtime::UpstreamPlanDecision::StartupFailed(_) => { + break; + } lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan, }; if !upstream_media_rt .wait_for_audio_master(plan.local_pts_us, plan.due_at) .await { + upstream_media_rt.record_video_freeze("coverage video froze awaiting audio master"); continue; } if plan.late_by > stale_drop_budget { @@ -254,7 +268,9 @@ impl Relay for Handler { continue; } pkt.pts = plan.local_pts_us; + let presented_pts = pkt.pts; relay.feed(pkt); + upstream_media_rt.mark_video_presented(presented_pts); } upstream_media_rt.close_camera(upstream_lease.generation); tx.send(Ok(Empty {})).await.ok(); @@ -336,4 +352,11 @@ impl Relay for Handler { ) -> Result, Status> { self.calibrate_reply(req).await } + + async fn get_upstream_sync( + &self, + _req: Request, + ) -> Result, Status> { + self.get_upstream_sync_reply().await + } } diff --git a/server/src/main/rpc_helpers.rs b/server/src/main/rpc_helpers.rs index 5a50f45..d2c6cb1 100644 --- a/server/src/main/rpc_helpers.rs +++ b/server/src/main/rpc_helpers.rs @@ -164,4 +164,25 @@ impl Handler { .map(Response::new) .map_err(|e| Status::internal(format!("{e:#}"))) } + + async fn get_upstream_sync_reply(&self) -> Result, Status> { + let snapshot = self.upstream_media_rt.snapshot(); + Ok(Response::new(UpstreamSyncState { + session_id: snapshot.session_id, + phase: snapshot.phase.to_string(), + latest_camera_remote_pts_us: snapshot.latest_camera_remote_pts_us, + latest_microphone_remote_pts_us: snapshot.latest_microphone_remote_pts_us, + last_video_presented_pts_us: snapshot.last_video_presented_pts_us, + last_audio_presented_pts_us: snapshot.last_audio_presented_pts_us, + live_lag_ms: snapshot.live_lag_ms.map(|value| value as f32), + planner_skew_ms: snapshot.planner_skew_ms.map(|value| value as f32), + stale_audio_drops: snapshot.stale_audio_drops, + stale_video_drops: snapshot.stale_video_drops, + skew_video_drops: snapshot.skew_video_drops, + freshness_reanchors: snapshot.freshness_reanchors, + startup_timeouts: snapshot.startup_timeouts, + video_freezes: snapshot.video_freezes, + last_reason: snapshot.last_reason, + })) + } } diff --git a/server/src/upstream_media_runtime.rs b/server/src/upstream_media_runtime.rs index 05396a6..c32b8be 100644 --- a/server/src/upstream_media_runtime.rs +++ b/server/src/upstream_media_runtime.rs @@ -12,13 +12,15 @@ mod state; mod types; use config::{ - apply_playout_offset, upstream_camera_startup_grace_us, upstream_pairing_master_slack, - upstream_playout_delay, upstream_playout_offset_us, upstream_reanchor_late_threshold, - upstream_reanchor_window_us, upstream_require_paired_startup, upstream_timing_trace_enabled, + apply_playout_offset, upstream_camera_startup_grace_us, upstream_max_live_lag, + upstream_pairing_master_slack, upstream_playout_delay, upstream_playout_offset_us, + upstream_reanchor_late_threshold, upstream_require_paired_startup, upstream_startup_timeout, + upstream_timing_trace_enabled, }; -use state::UpstreamClockState; +use state::{UpstreamClockState, UpstreamSyncPhase}; pub use types::{ - PlannedUpstreamPacket, UpstreamMediaKind, UpstreamPlanDecision, UpstreamStreamLease, + PlannedUpstreamPacket, UpstreamMediaKind, UpstreamPlanDecision, UpstreamPlannerSnapshot, + UpstreamStreamLease, }; /// Coordinate upstream stream ownership and keep audio/video on one timeline. @@ -93,6 +95,79 @@ impl UpstreamMediaRuntime { let microphone_offset_us = self.microphone_playout_offset_us.load(Ordering::Relaxed); microphone_offset_us.saturating_sub(camera_offset_us).max(0) as u64 } + + /// Mark one audio chunk as actually handed to the UAC sink. + pub fn mark_audio_presented(&self, local_pts_us: u64) { + let mut state = self + .state + .lock() + .expect("upstream media state mutex poisoned"); + state.last_audio_presented_pts_us = Some(local_pts_us); + if state.phase != UpstreamSyncPhase::Failed { + state.phase = UpstreamSyncPhase::Live; + state.last_reason = "audio-master playhead flowing".to_string(); + } + } + + /// Mark one video frame as actually handed to the UVC/HDMI sink. + pub fn mark_video_presented(&self, local_pts_us: u64) { + let mut state = self + .state + .lock() + .expect("upstream media state mutex poisoned"); + state.last_video_presented_pts_us = Some(local_pts_us); + if state.phase != UpstreamSyncPhase::Failed { + state.phase = UpstreamSyncPhase::Live; + state.last_reason = "video follower emitted a synced frame".to_string(); + } + } + + /// Record that video intentionally froze instead of showing an out-of-sync frame. + pub fn record_video_freeze(&self, reason: impl Into) { + let mut state = self + .state + .lock() + .expect("upstream media state mutex poisoned"); + state.video_freezes = state.video_freezes.saturating_add(1); + if state.phase != UpstreamSyncPhase::Failed { + state.phase = UpstreamSyncPhase::Healing; + } + state.last_reason = reason.into(); + } + + /// Return current planner facts for diagnostics and probe artifacts. + #[must_use] + pub fn snapshot(&self) -> UpstreamPlannerSnapshot { + let state = self + .state + .lock() + .expect("upstream media state mutex poisoned"); + let live_lag_ms = live_lag_us(&state).map(us_to_ms); + let planner_skew_ms = match ( + state.last_audio_presented_pts_us, + state.last_video_presented_pts_us, + ) { + (Some(audio), Some(video)) => Some((audio as i128 - video as i128) as f64 / 1000.0), + _ => None, + }; + UpstreamPlannerSnapshot { + session_id: state.session_id, + phase: state.phase.as_str(), + latest_camera_remote_pts_us: state.latest_camera_remote_pts_us, + latest_microphone_remote_pts_us: state.latest_microphone_remote_pts_us, + last_video_presented_pts_us: state.last_video_presented_pts_us, + last_audio_presented_pts_us: state.last_audio_presented_pts_us, + live_lag_ms, + planner_skew_ms, + stale_audio_drops: state.stale_audio_drops, + stale_video_drops: state.stale_video_drops, + skew_video_drops: state.skew_video_drops, + freshness_reanchors: state.freshness_reanchors, + startup_timeouts: state.startup_timeouts, + video_freezes: state.video_freezes, + last_reason: state.last_reason.clone(), + } + } } include!("upstream_media_runtime/lease_lifecycle.rs"); @@ -189,6 +264,7 @@ impl UpstreamMediaRuntime { state.microphone_packet_count } }; + update_latest_remote_pts(&mut state, kind, remote_pts_us); let mut first_remote_for_kind = match kind { UpstreamMediaKind::Camera => { let first_slot = &mut state.first_camera_remote_pts_us; @@ -215,8 +291,20 @@ impl UpstreamMediaRuntime { .pairing_anchor_deadline .get_or_insert_with(|| now + upstream_playout_delay()); let playout_delay = upstream_playout_delay(); + let max_live_lag = upstream_max_live_lag(); if state.session_base_remote_pts_us.is_none() { + if state.session_started_at.is_some_and(|started_at| { + now.saturating_duration_since(started_at) > upstream_startup_timeout() + }) { + state.phase = UpstreamSyncPhase::Failed; + state.startup_timeouts = state.startup_timeouts.saturating_add(1); + state.last_reason = + "paired upstream startup did not converge before timeout".to_string(); + return UpstreamPlanDecision::StartupFailed( + "paired upstream startup did not converge before timeout", + ); + } if state.first_camera_remote_pts_us.is_some() && state.first_microphone_remote_pts_us.is_some() && state.camera_startup_ready @@ -230,6 +318,8 @@ impl UpstreamMediaRuntime { let overlap_epoch = now + playout_delay; state.playout_epoch = Some(overlap_epoch); state.pairing_anchor_deadline = Some(overlap_epoch); + state.phase = UpstreamSyncPhase::Syncing; + state.last_reason = "fresh audio/video overlap anchor established".to_string(); if !state.startup_anchor_logged { let startup_delta_us = first_camera_remote_pts_us as i128 - first_microphone_remote_pts_us as i128; @@ -246,6 +336,8 @@ impl UpstreamMediaRuntime { } self.pairing_state_notify.notify_waiters(); } else if now < pairing_deadline { + state.phase = UpstreamSyncPhase::Acquiring; + state.last_reason = "awaiting both upstream media streams".to_string(); if upstream_timing_trace_enabled() && (packet_count <= 10 || packet_count.is_multiple_of(300)) { @@ -260,6 +352,8 @@ impl UpstreamMediaRuntime { } return UpstreamPlanDecision::AwaitingPair; } else if state.first_camera_remote_pts_us.is_some() && !state.camera_startup_ready { + state.phase = UpstreamSyncPhase::Syncing; + state.last_reason = "camera startup warm-up is still in progress".to_string(); if upstream_timing_trace_enabled() && (packet_count <= 10 || packet_count.is_multiple_of(300)) { @@ -290,6 +384,8 @@ impl UpstreamMediaRuntime { "upstream media pairing window expired; holding one-sided stream for synced startup" ); } + state.phase = UpstreamSyncPhase::Syncing; + state.last_reason = "holding one-sided stream for synced startup".to_string(); return UpstreamPlanDecision::AwaitingPair; } else { let single_stream_base_remote_pts_us = match kind { @@ -331,6 +427,23 @@ impl UpstreamMediaRuntime { return UpstreamPlanDecision::DropBeforeOverlap; } + let source_lag = source_lag_for_kind(&state, kind, remote_pts_us); + if source_lag > max_live_lag { + match kind { + UpstreamMediaKind::Camera => { + state.stale_video_drops = state.stale_video_drops.saturating_add(1); + state.video_freezes = state.video_freezes.saturating_add(1); + state.last_reason = "dropped stale video beyond max live lag".to_string(); + } + UpstreamMediaKind::Microphone => { + state.stale_audio_drops = state.stale_audio_drops.saturating_add(1); + state.last_reason = "dropped stale audio beyond max live lag".to_string(); + } + } + state.phase = UpstreamSyncPhase::Healing; + return UpstreamPlanDecision::DropStale("packet exceeded max live lag"); + } + let mut local_pts_us = remote_pts_us.saturating_sub(session_base_remote_pts_us); let last_slot = match kind { UpstreamMediaKind::Camera => &mut state.last_video_local_pts_us, @@ -342,27 +455,41 @@ impl UpstreamMediaRuntime { local_pts_us = last_pts_us.saturating_add(min_step_us.max(1)); } *last_slot = Some(local_pts_us); + if kind == UpstreamMediaKind::Camera + && state.last_audio_local_pts_us.is_some_and(|audio_pts_us| { + video_is_too_far_behind_audio(local_pts_us, audio_pts_us) + }) + { + state.skew_video_drops = state.skew_video_drops.saturating_add(1); + state.video_freezes = state.video_freezes.saturating_add(1); + state.phase = UpstreamSyncPhase::Healing; + state.last_reason = + "dropped video frame that was too far behind the audio master".to_string(); + return UpstreamPlanDecision::DropStale("video frame was too far behind audio master"); + } let epoch = *state.playout_epoch.get_or_insert(pairing_deadline); let sink_offset_us = self.playout_offset_us(kind); - let playout_delay = upstream_playout_delay(); + let playout_delay = upstream_playout_delay().min(max_live_lag); let mut due_at = apply_playout_offset(epoch + Duration::from_micros(local_pts_us), sink_offset_us); let mut late_by = now.checked_duration_since(due_at).unwrap_or_default(); let reanchor_threshold = upstream_reanchor_late_threshold(playout_delay); - let reanchor_window_us = upstream_reanchor_window_us(playout_delay); - if !state.catastrophic_reanchor_done - && local_pts_us <= reanchor_window_us - && late_by > reanchor_threshold - { + let max_future_wait = max_live_lag.saturating_sub(source_lag); + let due_future_wait = due_at.saturating_duration_since(now); + if late_by > reanchor_threshold || due_future_wait > max_future_wait { let old_late_by = late_by; - let desired_due_at = now + playout_delay; + let old_future_wait = due_future_wait; + let desired_delay = playout_delay.min(max_future_wait); + let desired_due_at = now + desired_delay; let unoffset_due_at = apply_playout_offset(desired_due_at, -sink_offset_us); let recovered_epoch = unoffset_due_at .checked_sub(Duration::from_micros(local_pts_us)) .unwrap_or(unoffset_due_at); state.playout_epoch = Some(recovered_epoch); state.pairing_anchor_deadline = Some(desired_due_at); - state.catastrophic_reanchor_done = true; + state.freshness_reanchors = state.freshness_reanchors.saturating_add(1); + state.phase = UpstreamSyncPhase::Healing; + state.last_reason = "reanchored upstream playhead to preserve freshness".to_string(); due_at = apply_playout_offset( recovered_epoch + Duration::from_micros(local_pts_us), sink_offset_us, @@ -375,11 +502,33 @@ impl UpstreamMediaRuntime { local_pts_us, remote_pts_us, old_late_by_ms = old_late_by.as_millis(), + old_future_wait_ms = old_future_wait.as_millis(), recovery_buffer_ms = playout_delay.as_millis(), reanchor_threshold_ms = reanchor_threshold.as_millis(), - "upstream media playout epoch reanchored after catastrophic lateness" + max_live_lag_ms = max_live_lag.as_millis(), + source_lag_ms = source_lag.as_millis(), + "upstream media playhead reanchored to preserve freshness" ); } + let predicted_lag_at_playout = + source_lag.saturating_add(due_at.saturating_duration_since(now)); + if predicted_lag_at_playout > max_live_lag { + match kind { + UpstreamMediaKind::Camera => { + state.stale_video_drops = state.stale_video_drops.saturating_add(1); + state.video_freezes = state.video_freezes.saturating_add(1); + state.last_reason = + "dropped video that would exceed max live lag at playout".to_string(); + } + UpstreamMediaKind::Microphone => { + state.stale_audio_drops = state.stale_audio_drops.saturating_add(1); + state.last_reason = + "dropped audio that would exceed max live lag at playout".to_string(); + } + } + state.phase = UpstreamSyncPhase::Healing; + return UpstreamPlanDecision::DropStale("packet would exceed max live lag at playout"); + } if upstream_timing_trace_enabled() && (packet_count <= 10 || packet_count.is_multiple_of(300)) { @@ -397,6 +546,7 @@ impl UpstreamMediaRuntime { playout_delay_us, sink_offset_us, late_by_us, + source_lag_us = source_lag.as_micros(), "upstream media rebase sample" ); } @@ -407,10 +557,54 @@ impl UpstreamMediaRuntime { local_pts_us, due_at, late_by, + source_lag, }) } } +fn update_latest_remote_pts( + state: &mut UpstreamClockState, + kind: UpstreamMediaKind, + remote_pts_us: u64, +) { + let slot = match kind { + UpstreamMediaKind::Camera => &mut state.latest_camera_remote_pts_us, + UpstreamMediaKind::Microphone => &mut state.latest_microphone_remote_pts_us, + }; + *slot = Some((*slot).unwrap_or(remote_pts_us).max(remote_pts_us)); +} + +fn source_lag_for_kind( + state: &UpstreamClockState, + kind: UpstreamMediaKind, + remote_pts_us: u64, +) -> Duration { + let latest = match kind { + UpstreamMediaKind::Camera => state.latest_camera_remote_pts_us, + UpstreamMediaKind::Microphone => state.latest_microphone_remote_pts_us, + } + .unwrap_or(remote_pts_us); + Duration::from_micros(latest.saturating_sub(remote_pts_us)) +} + +fn video_is_too_far_behind_audio(video_pts_us: u64, audio_pts_us: u64) -> bool { + let slack_us = upstream_pairing_master_slack() + .as_micros() + .min(u64::MAX as u128) as u64; + video_pts_us.saturating_add(slack_us) < audio_pts_us +} + +fn live_lag_us(state: &UpstreamClockState) -> Option { + let latest_audio = state.latest_microphone_remote_pts_us?; + let audio_playhead = state.last_audio_presented_pts_us?; + let base = state.session_base_remote_pts_us?; + Some(latest_audio.saturating_sub(base.saturating_add(audio_playhead))) +} + +fn us_to_ms(value: u64) -> f64 { + value as f64 / 1000.0 +} + fn refresh_unpaired_pairing_anchor( state: &mut UpstreamClockState, kind: UpstreamMediaKind, diff --git a/server/src/upstream_media_runtime/config.rs b/server/src/upstream_media_runtime/config.rs index 615eee6..7e4fac7 100644 --- a/server/src/upstream_media_runtime/config.rs +++ b/server/src/upstream_media_runtime/config.rs @@ -21,10 +21,26 @@ pub(super) fn upstream_playout_delay() -> Duration { let delay_ms = std::env::var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS") .ok() .and_then(|value| value.trim().parse::().ok()) - .unwrap_or(1_000); + .unwrap_or(350); Duration::from_millis(delay_ms) } +pub(super) fn upstream_max_live_lag() -> Duration { + let lag_ms = std::env::var("LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS") + .ok() + .and_then(|value| value.trim().parse::().ok()) + .unwrap_or(1_000); + Duration::from_millis(lag_ms.max(1)) +} + +pub(super) fn upstream_startup_timeout() -> Duration { + let timeout_ms = std::env::var("LESAVKA_UPSTREAM_STARTUP_TIMEOUT_MS") + .ok() + .and_then(|value| value.trim().parse::().ok()) + .unwrap_or(60_000); + Duration::from_millis(timeout_ms.max(1)) +} + pub(super) fn upstream_require_paired_startup() -> bool { std::env::var("LESAVKA_UPSTREAM_REQUIRE_PAIRED_STARTUP") .ok() @@ -45,10 +61,8 @@ pub(super) fn upstream_playout_offset_us(kind: UpstreamMediaKind) -> i64 { }; let default_offset_us = match kind { UpstreamMediaKind::Camera => FACTORY_MJPEG_VIDEO_OFFSET_US, - // Hardware sync probes on the MJPEG UVC path show the UAC leg arriving - // about 80ms after video when using the older +35ms default. Bias the - // server playout earlier so the shipped default lands in the preferred - // lip-sync band instead of hovering at the guardrail. + // 0.17 keeps shipped offsets small: the planner owns freshness and + // pairing, while calibration only handles sub-frame trim. UpstreamMediaKind::Microphone => FACTORY_MJPEG_AUDIO_OFFSET_US, }; std::env::var(name) @@ -61,7 +75,7 @@ pub(super) fn upstream_pairing_master_slack() -> Duration { let slack_us = std::env::var("LESAVKA_UPSTREAM_PAIR_SLACK_US") .ok() .and_then(|value| value.trim().parse::().ok()) - .unwrap_or(20_000); + .unwrap_or(80_000); Duration::from_micros(slack_us) } @@ -75,7 +89,7 @@ pub(super) fn upstream_reanchor_late_threshold(playout_delay: Duration) -> Durat let default_ms = (playout_delay.as_millis().min(u64::MAX as u128) as u64) .saturating_div(2) - .max(250); + .max(100); Duration::from_millis(default_ms) } @@ -87,10 +101,6 @@ pub(super) fn upstream_camera_startup_grace_us() -> u64 { .saturating_mul(1_000) } -pub(super) fn upstream_reanchor_window_us(playout_delay: Duration) -> u64 { - playout_delay.as_micros().min(u64::MAX as u128) as u64 -} - pub(super) fn apply_playout_offset(base: Instant, offset_us: i64) -> Instant { if offset_us >= 0 { base + Duration::from_micros(offset_us as u64) diff --git a/server/src/upstream_media_runtime/lease_lifecycle.rs b/server/src/upstream_media_runtime/lease_lifecycle.rs index b5b9eaa..1e732f9 100644 --- a/server/src/upstream_media_runtime/lease_lifecycle.rs +++ b/server/src/upstream_media_runtime/lease_lifecycle.rs @@ -55,8 +55,14 @@ impl UpstreamMediaRuntime { if starting_new_session { state.session_id = self.next_session_id.fetch_add(1, Ordering::SeqCst) + 1; reset_timing_anchors(&mut state); + state.session_started_at = Some(tokio::time::Instant::now()); + state.phase = UpstreamSyncPhase::Acquiring; + state.last_reason = "new upstream session acquiring media".to_string(); } else if reset_on_replace && replacing_existing_owner { reset_timing_anchors(&mut state); + state.session_started_at = Some(tokio::time::Instant::now()); + state.phase = UpstreamSyncPhase::Acquiring; + state.last_reason = "upstream stream replacement reset media anchors".to_string(); } match kind { UpstreamMediaKind::Camera => state.active_camera_generation = Some(generation), @@ -138,16 +144,28 @@ impl UpstreamMediaRuntime { } fn reset_timing_anchors(state: &mut UpstreamClockState) { + state.session_started_at = None; state.first_camera_remote_pts_us = None; state.first_microphone_remote_pts_us = None; + state.latest_camera_remote_pts_us = None; + state.latest_microphone_remote_pts_us = None; state.camera_startup_ready = false; state.session_base_remote_pts_us = None; state.last_video_local_pts_us = None; state.last_audio_local_pts_us = None; + state.last_video_presented_pts_us = None; + state.last_audio_presented_pts_us = None; state.camera_packet_count = 0; state.microphone_packet_count = 0; state.startup_anchor_logged = false; state.playout_epoch = None; state.pairing_anchor_deadline = None; - state.catastrophic_reanchor_done = false; + state.freshness_reanchors = 0; + state.stale_audio_drops = 0; + state.stale_video_drops = 0; + state.skew_video_drops = 0; + state.startup_timeouts = 0; + state.video_freezes = 0; + state.phase = UpstreamSyncPhase::Acquiring; + state.last_reason = "timing anchors reset".to_string(); } diff --git a/server/src/upstream_media_runtime/state.rs b/server/src/upstream_media_runtime/state.rs index ac57526..ce6cbc0 100644 --- a/server/src/upstream_media_runtime/state.rs +++ b/server/src/upstream_media_runtime/state.rs @@ -1,20 +1,59 @@ use tokio::time::Instant; +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum UpstreamSyncPhase { + Acquiring, + Syncing, + Live, + Healing, + Failed, +} + +impl UpstreamSyncPhase { + pub fn as_str(self) -> &'static str { + match self { + Self::Acquiring => "acquiring", + Self::Syncing => "syncing", + Self::Live => "live", + Self::Healing => "healing", + Self::Failed => "failed", + } + } +} + #[derive(Debug, Default)] pub(super) struct UpstreamClockState { pub session_id: u64, pub active_camera_generation: Option, pub active_microphone_generation: Option, + pub session_started_at: Option, + pub phase: UpstreamSyncPhase, pub first_camera_remote_pts_us: Option, pub first_microphone_remote_pts_us: Option, + pub latest_camera_remote_pts_us: Option, + pub latest_microphone_remote_pts_us: Option, pub camera_startup_ready: bool, pub session_base_remote_pts_us: Option, pub last_video_local_pts_us: Option, pub last_audio_local_pts_us: Option, + pub last_video_presented_pts_us: Option, + pub last_audio_presented_pts_us: Option, pub camera_packet_count: u64, pub microphone_packet_count: u64, pub startup_anchor_logged: bool, pub playout_epoch: Option, pub pairing_anchor_deadline: Option, - pub catastrophic_reanchor_done: bool, + pub freshness_reanchors: u64, + pub stale_audio_drops: u64, + pub stale_video_drops: u64, + pub skew_video_drops: u64, + pub startup_timeouts: u64, + pub video_freezes: u64, + pub last_reason: String, +} + +impl Default for UpstreamSyncPhase { + fn default() -> Self { + Self::Acquiring + } } diff --git a/server/src/upstream_media_runtime/tests/config.rs b/server/src/upstream_media_runtime/tests/config.rs index f660004..eb2388e 100644 --- a/server/src/upstream_media_runtime/tests/config.rs +++ b/server/src/upstream_media_runtime/tests/config.rs @@ -4,9 +4,9 @@ use std::time::Duration; #[test] #[serial(upstream_media_runtime)] -fn upstream_playout_delay_defaults_to_one_second_and_accepts_overrides() { +fn upstream_playout_delay_defaults_to_freshness_budget_and_accepts_overrides() { temp_env::with_var_unset("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", || { - assert_eq!(super::upstream_playout_delay(), Duration::from_secs(1)); + assert_eq!(super::upstream_playout_delay(), Duration::from_millis(350)); }); temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("250"), || { @@ -14,6 +14,33 @@ fn upstream_playout_delay_defaults_to_one_second_and_accepts_overrides() { }); } +#[test] +#[serial(upstream_media_runtime)] +fn upstream_max_live_lag_defaults_to_one_second_and_accepts_overrides() { + temp_env::with_var_unset("LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS", || { + assert_eq!(super::upstream_max_live_lag(), Duration::from_secs(1)); + }); + + temp_env::with_var("LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS", Some("750"), || { + assert_eq!(super::upstream_max_live_lag(), Duration::from_millis(750)); + }); +} + +#[test] +#[serial(upstream_media_runtime)] +fn upstream_startup_timeout_defaults_to_one_minute_and_accepts_overrides() { + temp_env::with_var_unset("LESAVKA_UPSTREAM_STARTUP_TIMEOUT_MS", || { + assert_eq!(super::upstream_startup_timeout(), Duration::from_secs(60)); + }); + + temp_env::with_var("LESAVKA_UPSTREAM_STARTUP_TIMEOUT_MS", Some("2500"), || { + assert_eq!( + super::upstream_startup_timeout(), + Duration::from_millis(2500) + ); + }); +} + #[test] #[serial(upstream_media_runtime)] fn upstream_requires_paired_startup_by_default_with_compatibility_override() { @@ -43,7 +70,7 @@ fn upstream_playout_offsets_default_to_mjpeg_calibration_and_accept_overrides() temp_env::with_var_unset("LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US", || { assert_eq!( super::upstream_playout_offset_us(UpstreamMediaKind::Microphone), - 1_260_000 + 0 ); assert_eq!( super::upstream_playout_offset_us(UpstreamMediaKind::Camera), @@ -76,11 +103,11 @@ fn upstream_playout_offsets_default_to_mjpeg_calibration_and_accept_overrides() #[test] #[serial(upstream_media_runtime)] -fn upstream_pairing_master_slack_defaults_to_twenty_ms_and_accepts_overrides() { +fn upstream_pairing_master_slack_defaults_to_eighty_ms_and_accepts_overrides() { temp_env::with_var_unset("LESAVKA_UPSTREAM_PAIR_SLACK_US", || { assert_eq!( super::upstream_pairing_master_slack(), - Duration::from_micros(20_000) + Duration::from_micros(80_000) ); }); @@ -102,7 +129,7 @@ fn upstream_reanchor_late_threshold_defaults_to_half_the_buffer_and_accepts_over ); assert_eq!( super::upstream_reanchor_late_threshold(Duration::from_millis(100)), - Duration::from_millis(250) + Duration::from_millis(100) ); }); diff --git a/server/src/upstream_media_runtime/tests/planning.rs b/server/src/upstream_media_runtime/tests/planning.rs index d22e896..242a736 100644 --- a/server/src/upstream_media_runtime/tests/planning.rs +++ b/server/src/upstream_media_runtime/tests/planning.rs @@ -208,7 +208,7 @@ fn overlap_anchor_gets_a_fresh_playout_budget_when_pairing_finishes_late() { #[test] #[serial(upstream_media_runtime)] -fn catastrophic_lateness_reanchors_only_once_per_session() { +fn catastrophic_lateness_reanchors_repeatedly_to_preserve_freshness() { temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("20"), || { temp_env::with_var("LESAVKA_UPSTREAM_REANCHOR_LATE_MS", Some("5"), || { let runtime = UpstreamMediaRuntime::new(); @@ -226,12 +226,18 @@ fn catastrophic_lateness_reanchors_only_once_per_session() { std::thread::sleep(Duration::from_millis(30)); let first_recovered = play(runtime.plan_audio_pts(1_000_000)); assert!(first_recovered.due_at > tokio::time::Instant::now()); + assert!(first_recovered.late_by <= Duration::from_millis(1)); std::thread::sleep(Duration::from_millis(30)); - let second_late = play(runtime.plan_audio_pts(1_000_001)); + let second_recovered = play(runtime.plan_audio_pts(1_000_001)); + assert!(second_recovered.due_at > tokio::time::Instant::now()); assert!( - second_late.late_by > Duration::from_millis(5), - "session should not keep extending itself with repeated reanchors" + second_recovered.late_by <= Duration::from_millis(1), + "0.17 planner must keep healing instead of preserving stale timing" + ); + assert!( + runtime.snapshot().freshness_reanchors >= 2, + "repeated freshness reanchors should be counted for diagnostics" ); }); }); @@ -239,7 +245,7 @@ fn catastrophic_lateness_reanchors_only_once_per_session() { #[test] #[serial(upstream_media_runtime)] -fn catastrophic_lateness_does_not_reanchor_once_the_session_is_well_past_startup() { +fn catastrophic_lateness_reanchors_even_after_startup_window() { temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("20"), || { temp_env::with_var("LESAVKA_UPSTREAM_REANCHOR_LATE_MS", Some("5"), || { let runtime = UpstreamMediaRuntime::new(); @@ -258,17 +264,152 @@ fn catastrophic_lateness_does_not_reanchor_once_the_session_is_well_past_startup let late_audio = play(runtime.plan_audio_pts(1_100_000)); assert_eq!(late_audio.local_pts_us, 100_000); assert!( - late_audio.late_by > Duration::from_millis(5), - "late packet should remain late instead of reanchoring the shared epoch mid-session" + late_audio.late_by <= Duration::from_millis(1), + "0.17 planner should heal mid-session lateness instead of preserving drift" ); assert!( - late_audio.due_at <= tokio::time::Instant::now(), - "mid-session lateness should no longer push due_at back into the future" + late_audio.due_at > tokio::time::Instant::now(), + "mid-session freshness healing should push due_at back into the live budget" ); + assert!(runtime.snapshot().freshness_reanchors >= 1); }); }); } +#[test] +#[serial(upstream_media_runtime)] +fn stale_audio_behind_the_freshest_audio_frontier_is_dropped() { + temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || { + temp_env::with_var("LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS", Some("50"), || { + let runtime = runtime_without_offsets(); + let _camera = runtime.activate_camera(); + let _microphone = runtime.activate_microphone(); + + assert!(matches!( + runtime.plan_video_pts(1_000_000, 16_666), + super::UpstreamPlanDecision::AwaitingPair + )); + let _audio = play(runtime.plan_audio_pts(1_000_000)); + let _video = play(runtime.plan_video_pts(1_000_000, 16_666)); + let _fresh_audio = play(runtime.plan_audio_pts(2_000_000)); + + assert!(matches!( + runtime.plan_audio_pts(1_900_000), + super::UpstreamPlanDecision::DropStale("packet exceeded max live lag") + )); + assert_eq!(runtime.snapshot().stale_audio_drops, 1); + }); + }); +} + +#[test] +#[serial(upstream_media_runtime)] +fn stale_video_behind_the_freshest_video_frontier_is_dropped() { + temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || { + temp_env::with_var("LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS", Some("50"), || { + let runtime = runtime_without_offsets(); + let _camera = runtime.activate_camera(); + let _microphone = runtime.activate_microphone(); + + assert!(matches!( + runtime.plan_video_pts(1_000_000, 16_666), + super::UpstreamPlanDecision::AwaitingPair + )); + let _audio = play(runtime.plan_audio_pts(1_000_000)); + let _video = play(runtime.plan_video_pts(1_000_000, 16_666)); + let _fresh_video = play(runtime.plan_video_pts(2_000_000, 16_666)); + + assert!(matches!( + runtime.plan_video_pts(1_900_000, 16_666), + super::UpstreamPlanDecision::DropStale("packet exceeded max live lag") + )); + let snapshot = runtime.snapshot(); + assert_eq!(snapshot.stale_video_drops, 1); + assert_eq!(snapshot.video_freezes, 1); + }); + }); +} + +#[test] +#[serial(upstream_media_runtime)] +fn video_too_far_behind_audio_master_is_dropped_and_counted_as_freeze() { + temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || { + temp_env::with_var("LESAVKA_UPSTREAM_PAIR_SLACK_US", Some("50000"), || { + let runtime = runtime_without_offsets(); + let _camera = runtime.activate_camera(); + let _microphone = runtime.activate_microphone(); + + assert!(matches!( + runtime.plan_video_pts(1_000_000, 16_666), + super::UpstreamPlanDecision::AwaitingPair + )); + let _audio = play(runtime.plan_audio_pts(1_000_000)); + let _video = play(runtime.plan_video_pts(1_000_000, 16_666)); + let _audio_master = play(runtime.plan_audio_pts(1_200_000)); + + assert!(matches!( + runtime.plan_video_pts(1_100_000, 16_666), + super::UpstreamPlanDecision::DropStale( + "video frame was too far behind audio master" + ) + )); + let snapshot = runtime.snapshot(); + assert_eq!(snapshot.skew_video_drops, 1); + assert_eq!(snapshot.video_freezes, 1); + }); + }); +} + +#[test] +#[serial(upstream_media_runtime)] +fn paired_startup_times_out_instead_of_waiting_forever() { + temp_env::with_var("LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS", Some("0"), || { + temp_env::with_var("LESAVKA_UPSTREAM_STARTUP_TIMEOUT_MS", Some("1"), || { + let runtime = UpstreamMediaRuntime::new(); + runtime.set_playout_offsets(0, 0); + let _camera = runtime.activate_camera(); + + assert!(matches!( + runtime.plan_video_pts(1_000_000, 16_666), + super::UpstreamPlanDecision::AwaitingPair + )); + std::thread::sleep(Duration::from_millis(3)); + assert!(matches!( + runtime.plan_video_pts(1_016_666, 16_666), + super::UpstreamPlanDecision::StartupFailed( + "paired upstream startup did not converge before timeout" + ) + )); + let snapshot = runtime.snapshot(); + assert_eq!(snapshot.phase, "failed"); + assert_eq!(snapshot.startup_timeouts, 1); + }); + }); +} + +#[test] +#[serial(upstream_media_runtime)] +fn planner_snapshot_tracks_presented_playheads_and_skew() { + let runtime = runtime_without_offsets(); + let _camera = runtime.activate_camera(); + let _microphone = runtime.activate_microphone(); + + assert!(matches!( + runtime.plan_video_pts(1_000_000, 16_666), + super::UpstreamPlanDecision::AwaitingPair + )); + let audio = play(runtime.plan_audio_pts(1_000_000)); + let video = play(runtime.plan_video_pts(1_000_000, 16_666)); + runtime.mark_audio_presented(audio.local_pts_us); + runtime.mark_video_presented(video.local_pts_us); + + let snapshot = runtime.snapshot(); + assert_eq!(snapshot.phase, "live"); + assert_eq!(snapshot.last_audio_presented_pts_us, Some(0)); + assert_eq!(snapshot.last_video_presented_pts_us, Some(0)); + assert_eq!(snapshot.planner_skew_ms, Some(0.0)); +} + #[test] #[serial(upstream_media_runtime)] fn default_runtime_covers_video_map_play_path() { diff --git a/server/src/upstream_media_runtime/types.rs b/server/src/upstream_media_runtime/types.rs index 4b3fd10..47d6f38 100644 --- a/server/src/upstream_media_runtime/types.rs +++ b/server/src/upstream_media_runtime/types.rs @@ -28,6 +28,9 @@ pub struct PlannedUpstreamPacket { pub due_at: Instant, /// How late the packet already is when planned, if any. pub late_by: Duration, + /// How far this packet's capture time trails the freshest relevant input + /// frontier known to the planner. + pub source_lag: Duration, } /// Result of asking the shared upstream runtime how to handle one packet. @@ -39,6 +42,31 @@ pub enum UpstreamPlanDecision { /// Discard the packet because it belongs before the shared overlapping A/V /// session base and would only reintroduce startup skew. DropBeforeOverlap, + /// Discard a packet that would violate the live freshness or A/V skew + /// contract. + DropStale(&'static str), + /// Stop the stream because paired startup never converged. + StartupFailed(&'static str), /// Present the packet at the planned wall-clock deadline. Play(PlannedUpstreamPacket), } + +/// Snapshot of the authoritative upstream sync planner. +#[derive(Clone, Debug)] +pub struct UpstreamPlannerSnapshot { + pub session_id: u64, + pub phase: &'static str, + pub latest_camera_remote_pts_us: Option, + pub latest_microphone_remote_pts_us: Option, + pub last_video_presented_pts_us: Option, + pub last_audio_presented_pts_us: Option, + pub live_lag_ms: Option, + pub planner_skew_ms: Option, + pub stale_audio_drops: u64, + pub stale_video_drops: u64, + pub skew_video_drops: u64, + pub freshness_reanchors: u64, + pub startup_timeouts: u64, + pub video_freezes: u64, + pub last_reason: String, +} diff --git a/testing/tests/client_launcher_layout_contract.rs b/testing/tests/client_launcher_layout_contract.rs index 9fff913..0d69156 100644 --- a/testing/tests/client_launcher_layout_contract.rs +++ b/testing/tests/client_launcher_layout_contract.rs @@ -333,9 +333,12 @@ fn relay_controls_keep_connect_inline_with_server_entry() { assert!(UI_LAYOUT_SRC.contains("tools_buttons.set_homogeneous(true);")); assert!(UI_LAYOUT_SRC.contains("tools_heading.set_width_chars(RELAY_SUBGROUP_LABEL_WIDTH);")); assert!(UI_LAYOUT_SRC.contains("let clipboard_button = rail_button(\"Clipboard\"")); - assert!(UI_LAYOUT_SRC.contains("let usb_recover_button = rail_button(\"USB\"")); - assert!(UI_LAYOUT_SRC.contains("let uac_recover_button = rail_button(\"UAC\"")); - assert!(UI_LAYOUT_SRC.contains("let uvc_recover_button = rail_button(\"UVC\"")); + assert!(UI_LAYOUT_SRC.contains("let usb_recover_button = rail_button(")); + assert!(UI_LAYOUT_SRC.contains("\"USB\",")); + assert!(UI_LAYOUT_SRC.contains("let uac_recover_button = rail_button(")); + assert!(UI_LAYOUT_SRC.contains("\"UAC\",")); + assert!(UI_LAYOUT_SRC.contains("let uvc_recover_button = rail_button(")); + assert!(UI_LAYOUT_SRC.contains("\"UVC\",")); assert!(UI_LAYOUT_SRC.contains("recovery_buttons.append(&usb_recover_button);")); assert!(UI_LAYOUT_SRC.contains("recovery_buttons.append(&uac_recover_button);")); assert!(UI_LAYOUT_SRC.contains("recovery_buttons.append(&uvc_recover_button);")); diff --git a/testing/tests/server_install_script_contract.rs b/testing/tests/server_install_script_contract.rs index da634d7..1855776 100644 --- a/testing/tests/server_install_script_contract.rs +++ b/testing/tests/server_install_script_contract.rs @@ -19,6 +19,8 @@ fn server_install_pins_hdmi_camera_and_display_defaults() { "LESAVKA_HDMI_SINK=%s", "LESAVKA_HDMI_FBDEV=%s", "LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS=%s", + "LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS=%s", + "LESAVKA_UPSTREAM_STARTUP_TIMEOUT_MS=%s", "LESAVKA_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=%s", "LESAVKA_UPSTREAM_VIDEO_PLAYOUT_OFFSET_US=%s", "LESAVKA_UPSTREAM_PAIR_SLACK_US=%s", @@ -49,16 +51,21 @@ fn server_install_pins_hdmi_camera_and_display_defaults() { assert!(SERVER_INSTALL.contains("${LESAVKA_HDMI_WIDTH:-1920}")); assert!(SERVER_INSTALL.contains("${LESAVKA_HDMI_HEIGHT:-1080}")); assert!(SERVER_INSTALL.contains("${LESAVKA_HDMI_SINK:-fbdevsink}")); - assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS:-1000}")); - assert!(SERVER_INSTALL.contains("DEFAULT_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=1260000")); + assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_PLAYOUT_DELAY_MS:-350}")); + assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_MAX_LIVE_LAG_MS:-1000}")); + assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_STARTUP_TIMEOUT_MS:-60000}")); + assert!(SERVER_INSTALL.contains("DEFAULT_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=0")); assert!(SERVER_INSTALL.contains("LEGACY_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=-45000")); + assert!( + SERVER_INSTALL.contains("PREVIOUS_TUNED_MJPEG_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US=1260000") + ); assert!( SERVER_INSTALL.contains("LESAVKA_INSTALL_UPSTREAM_AUDIO_PLAYOUT_OFFSET_US"), "install-specific offset override should bypass stale ambient runtime env" ); assert!( - SERVER_INSTALL.contains("migrating stale upstream audio playout offset to +1260ms"), - "installer should not preserve the old MJPEG/UVC sync baseline accidentally" + SERVER_INSTALL.contains("migrating stale upstream audio playout offset to the 0.17 freshness-first planner default"), + "installer should not preserve old MJPEG/UVC sync baselines accidentally" ); assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_PAIR_SLACK_US:-80000}")); assert!(SERVER_INSTALL.contains("${LESAVKA_UPSTREAM_STALE_DROP_MS:-80}")); diff --git a/testing/tests/server_main_rpc_contract.rs b/testing/tests/server_main_rpc_contract.rs index 4f35771..fcdab23 100644 --- a/testing/tests/server_main_rpc_contract.rs +++ b/testing/tests/server_main_rpc_contract.rs @@ -467,7 +467,7 @@ mod server_main_rpc { .expect("initial calibration") .into_inner(); assert_eq!(initial.profile, "mjpeg"); - assert_eq!(initial.active_audio_offset_us, 1_260_000); + assert_eq!(initial.active_audio_offset_us, 0); let adjusted = rt .block_on(async { @@ -486,7 +486,7 @@ mod server_main_rpc { .expect("calibrate") .into_inner(); assert_eq!(adjusted.source, "blind"); - assert_eq!(adjusted.active_audio_offset_us, -35_000); + assert_eq!(adjusted.active_audio_offset_us, 10_000); assert_eq!(adjusted.active_video_offset_us, 2_000); assert!( std::fs::read_to_string(calibration_path) @@ -496,4 +496,27 @@ mod server_main_rpc { }, ); } + + #[test] + #[cfg(coverage)] + #[serial] + fn upstream_sync_rpc_surfaces_planner_snapshot() { + let (_dir, handler) = build_handler_for_tests(); + let rt = tokio::runtime::Runtime::new().expect("runtime"); + + let lease_camera = handler.upstream_media_rt.activate_camera(); + let lease_microphone = handler.upstream_media_rt.activate_microphone(); + assert_eq!(lease_camera.session_id, lease_microphone.session_id); + + let initial = rt + .block_on(async { + handler + .get_upstream_sync(tonic::Request::new(Empty {})) + .await + }) + .expect("planner sync state") + .into_inner(); + assert_eq!(initial.phase, "acquiring"); + assert_eq!(initial.session_id, lease_camera.session_id); + } }