From f1e3faa404d95fa6132d33c40148f4bedef27f61 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Sun, 3 May 2026 03:18:50 -0300 Subject: [PATCH] fix: prime bundled webcam startup --- AGENTS.md | 8 +- Cargo.lock | 6 +- client/Cargo.toml | 2 +- common/Cargo.toml | 2 +- server/Cargo.toml | 2 +- server/src/main/relay_service.rs | 107 +++++++++++++++++++------ server/src/main/relay_service_tests.rs | 28 ++++++- 7 files changed, 120 insertions(+), 35 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index 56e7eb4..1f6892f 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -1,6 +1,6 @@ # Lesavka Agent Notes -## 0.18.0 Bundled Webcam A/V Migration Checklist +## 0.18.1 Bundled Webcam A/V Migration Checklist Context: manual Google Meet and mirrored-probe testing showed the split webcam and microphone uplink design is too fragile under real browser/device pressure. @@ -25,6 +25,9 @@ explicit no-camera path. `mic-only` so we never confuse the architectures during debugging. - [x] Sync protection takes precedence over freshness and smoothness: bad mixed bundle timing is dropped coherently instead of letting one side play alone. +- [x] Startup video is allowed to prime UVC if a first mixed bundle has bad + audio/video timing; the mismatched audio is dropped, preserving sync while + avoiding browser `Camera is starting` starvation. ### Wire Protocol - [x] Add `UpstreamMediaBundle` containing one optional video frame plus zero or @@ -60,6 +63,9 @@ explicit no-camera path. raw packet `pts`, and reset the bundled epoch on client-session changes. - [x] Keep server freshness drops/reanchors active for bundled media. - [x] Drop mixed A/V bundles coherently when one side fails freshness/sync planning. +- [x] Activate the camera relay before opening the microphone sink so UVC can + become ready even if UAC setup is slow. +- [x] Log the first bundled video frame handed to the camera sink. - [x] Continue reporting client timing and sink handoff diagnostics from bundled packets. 
- [ ] Add bundled-mode counters for first bundle, first audio push, first video feed, dropped stale bundles, and bundle queue age. diff --git a/Cargo.lock b/Cargo.lock index ca60a9f..d95369d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.18.0" +version = "0.18.1" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.18.0" +version = "0.18.1" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.18.0" +version = "0.18.1" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index 1a539ae..371e87a 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.18.0" +version = "0.18.1" edition = "2024" [dependencies] diff --git a/common/Cargo.toml b/common/Cargo.toml index a1b11f2..7dcdace 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.18.0" +version = "0.18.1" edition = "2024" build = "build.rs" diff --git a/server/Cargo.toml b/server/Cargo.toml index 2ae0fe3..f39050c 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.18.0" +version = "0.18.1" edition = "2024" autobins = false diff --git a/server/src/main/relay_service.rs b/server/src/main/relay_service.rs index 2f57adc..d8a38af 100644 --- a/server/src/main/relay_service.rs +++ b/server/src/main/relay_service.rs @@ -65,10 +65,19 @@ struct BundledTimingSummary { } #[cfg(not(coverage))] -impl BundledTimingSummary { - fn mixed(self) -> bool { - self.has_audio && self.has_video +fn bundled_events_are_mixed(events: &[BundledUpstreamEvent]) -> bool { + let has_audio = 
events.iter().any(|event| matches!(event, BundledUpstreamEvent::Audio(_))); + let has_video = events.iter().any(|event| matches!(event, BundledUpstreamEvent::Video(_))); + has_audio && has_video +} + +#[cfg(not(coverage))] +fn retain_startup_video_only(events: &mut Vec<BundledUpstreamEvent>) -> bool { + if !bundled_events_are_mixed(events) { + return false; } + events.retain(|event| matches!(event, BundledUpstreamEvent::Video(_))); + !events.is_empty() } #[cfg(not(coverage))] @@ -240,6 +249,17 @@ impl Relay for Handler { fps = camera_cfg.fps, "📦 stream_webcam_media opened" ); + let (camera_session_id, relay, _relay_reused) = + match self.camera_rt.activate(&camera_cfg).await { + Ok(active) => active, + Err(err) => { + self.upstream_media_rt + .close_camera(camera_lease.generation); + self.upstream_media_rt + .close_microphone(microphone_lease.generation); + return Err(err); + } + }; let Some(microphone_sink_permit) = self .upstream_media_rt .reserve_microphone_sink(microphone_lease.generation) @@ -263,18 +283,6 @@ impl Relay for Handler { .close_microphone(microphone_lease.generation); Status::internal(format!("{e:#}")) })?; - let (camera_session_id, relay, _relay_reused) = - match self.camera_rt.activate(&camera_cfg).await { - Ok(active) => active, - Err(err) => { - sink.finish(); - self.upstream_media_rt - .close_camera(camera_lease.generation); - self.upstream_media_rt - .close_microphone(microphone_lease.generation); - return Err(err); - } - }; let camera_rt = self.camera_rt.clone(); let upstream_media_rt = self.upstream_media_rt.clone(); let frame_step_us = (1_000_000u64 / u64::from(camera_cfg.fps.max(1))).max(1); @@ -287,6 +295,7 @@ impl Relay for Handler { let mut clock = BundledPlayoutClock::default(); let mut last_bundle_session_id = None; let mut last_bundle_seq = None; + let mut video_presented_once = false; let mut outcome = "aborted"; 'bundled_loop: loop { let bundle = match inbound.next().await { @@ -368,22 +377,38 @@ impl Relay for Handler { ); } if 
timing_summary.mixed_span_too_wide { - warn!( - rpc_id, - session_id = camera_lease.session_id, - client_bundle_session_id = bundle.session_id, - bundle_seq = bundle.seq, - span_ms = timing_summary.capture_span_us / 1000, - "📦 bundled mixed A/V capture span is too wide; dropping the bundle to protect sync" - ); - continue; + if !video_presented_once && retain_startup_video_only(&mut events) { + warn!( + rpc_id, + session_id = camera_lease.session_id, + client_bundle_session_id = bundle.session_id, + bundle_seq = bundle.seq, + span_ms = timing_summary.capture_span_us / 1000, + "📦 bundled startup A/V span is too wide; dropping audio but feeding video to prime the camera device" + ); + } else { + warn!( + rpc_id, + session_id = camera_lease.session_id, + client_bundle_session_id = bundle.session_id, + bundle_seq = bundle.seq, + span_ms = timing_summary.capture_span_us / 1000, + "📦 bundled mixed A/V capture span is too wide; dropping the bundle to protect sync" + ); + continue; + } } let Some((base_remote_pts_us, epoch)) = clock.ensure(&events) else { continue; }; - let mixed_bundle = timing_summary.mixed(); + let mut mixed_bundle = bundled_events_are_mixed(&events); + let startup_video_priming = !video_presented_once + && events + .iter() + .any(|event| matches!(event, BundledUpstreamEvent::Video(_))); let mut planned_events = Vec::with_capacity(events.len()); let mut drop_mixed_bundle = false; + let mut dropped_audio_for_startup_video = false; for event in events { let kind = event.kind(); let min_step_us = match kind { @@ -399,6 +424,10 @@ impl Relay for Handler { ) { lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan, lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropBeforeOverlap => { + if startup_video_priming && kind == UpstreamMediaKind::Microphone { + dropped_audio_for_startup_video = true; + continue; + } if mixed_bundle { drop_mixed_bundle = true; } @@ -412,6 +441,10 @@ impl Relay for Handler { reason, "📦 bundled 
upstream packet dropped by freshness planner" ); + if startup_video_priming && kind == UpstreamMediaKind::Microphone { + dropped_audio_for_startup_video = true; + continue; + } if mixed_bundle { drop_mixed_bundle = true; } @@ -437,6 +470,10 @@ impl Relay for Handler { pts = plan.local_pts_us, "📦 bundled upstream packet dropped after missing freshness budget" ); + if startup_video_priming && kind == UpstreamMediaKind::Microphone { + dropped_audio_for_startup_video = true; + continue; + } if mixed_bundle { drop_mixed_bundle = true; } @@ -444,6 +481,16 @@ impl Relay for Handler { } planned_events.push((event, plan)); } + if dropped_audio_for_startup_video { + mixed_bundle = false; + warn!( + rpc_id, + session_id = camera_lease.session_id, + client_bundle_session_id = bundle.session_id, + bundle_seq = bundle.seq, + "📦 dropped startup audio from a bad bundled packet but kept video-only playout so the camera can start" + ); + } if drop_mixed_bundle { warn!( rpc_id, @@ -491,6 +538,16 @@ impl Relay for Handler { packet.pts = plan.local_pts_us; let presented_pts = packet.pts; relay.feed(packet); + if !video_presented_once { + info!( + rpc_id, + session_id = camera_lease.session_id, + camera_session_id, + pts = presented_pts, + "📦 first bundled video frame fed to camera sink" + ); + video_presented_once = true; + } upstream_media_rt.mark_video_presented(presented_pts, plan.due_at); } } diff --git a/server/src/main/relay_service_tests.rs b/server/src/main/relay_service_tests.rs index c401dbb..ed8032b 100644 --- a/server/src/main/relay_service_tests.rs +++ b/server/src/main/relay_service_tests.rs @@ -2,8 +2,9 @@ #[allow(clippy::items_after_test_module)] mod tests { use super::{ - BundledUpstreamEvent, UpstreamStreamCleanup, retain_freshest_audio_packet, - retain_freshest_video_packet, summarize_bundled_timing, + BundledUpstreamEvent, UpstreamStreamCleanup, bundled_events_are_mixed, + retain_freshest_audio_packet, retain_freshest_video_packet, retain_startup_video_only, + 
summarize_bundled_timing, }; use lesavka_common::lesavka::{AudioPacket, UpstreamMediaBundle, VideoPacket}; use lesavka_server::upstream_media_runtime::{ @@ -136,12 +137,33 @@ mod tests { let summary = summarize_bundled_timing(&bundle, &events).expect("timing summary"); - assert!(summary.mixed()); + assert!(bundled_events_are_mixed(&events)); assert_eq!(summary.capture_span_us, 400_000); assert!(!summary.capture_bounds_match); assert!(summary.mixed_span_too_wide); } + #[test] + fn startup_video_retention_drops_audio_from_bad_mixed_bundle() { + let mut events = vec![ + BundledUpstreamEvent::Audio(AudioPacket { + client_capture_pts_us: 1_000_000, + ..Default::default() + }), + BundledUpstreamEvent::Video(VideoPacket { + client_capture_pts_us: 1_500_000, + ..Default::default() + }), + ]; + + assert!(bundled_events_are_mixed(&events)); + assert!(retain_startup_video_only(&mut events)); + + assert!(!bundled_events_are_mixed(&events)); + assert_eq!(events.len(), 1); + assert!(matches!(events[0], BundledUpstreamEvent::Video(_))); + } + #[test] fn upstream_cleanup_guard_closes_its_microphone_generation() { let runtime = Arc::new(UpstreamMediaRuntime::new());