fix: prime bundled webcam startup

This commit is contained in:
Brad Stein 2026-05-03 03:18:50 -03:00
parent 82acfacc1f
commit f1e3faa404
7 changed files with 120 additions and 35 deletions

View File

@ -1,6 +1,6 @@
# Lesavka Agent Notes # Lesavka Agent Notes
## 0.18.0 Bundled Webcam A/V Migration Checklist ## 0.18.1 Bundled Webcam A/V Migration Checklist
Context: manual Google Meet and mirrored-probe testing showed the split webcam Context: manual Google Meet and mirrored-probe testing showed the split webcam
and microphone uplink design is too fragile under real browser/device pressure. and microphone uplink design is too fragile under real browser/device pressure.
@ -25,6 +25,9 @@ explicit no-camera path.
`mic-only` so we never confuse the architectures during debugging. `mic-only` so we never confuse the architectures during debugging.
- [x] Sync protection takes precedence over freshness and smoothness: bad mixed - [x] Sync protection takes precedence over freshness and smoothness: bad mixed
bundle timing is dropped coherently instead of letting one side play alone. bundle timing is dropped coherently instead of letting one side play alone.
- [x] Startup video is allowed to prime UVC if a first mixed bundle has bad
audio/video timing; the mismatched audio is dropped, preserving sync while
avoiding browser `Camera is starting` starvation.
### Wire Protocol ### Wire Protocol
- [x] Add `UpstreamMediaBundle` containing one optional video frame plus zero or - [x] Add `UpstreamMediaBundle` containing one optional video frame plus zero or
@ -60,6 +63,9 @@ explicit no-camera path.
raw packet `pts`, and reset the bundled epoch on client-session changes. raw packet `pts`, and reset the bundled epoch on client-session changes.
- [x] Keep server freshness drops/reanchors active for bundled media. - [x] Keep server freshness drops/reanchors active for bundled media.
- [x] Drop mixed A/V bundles coherently when one side fails freshness/sync planning. - [x] Drop mixed A/V bundles coherently when one side fails freshness/sync planning.
- [x] Activate the camera relay before opening the microphone sink so UVC can
become ready even if UAC setup is slow.
- [x] Log the first bundled video frame handed to the camera sink.
- [x] Continue reporting client timing and sink handoff diagnostics from bundled packets. - [x] Continue reporting client timing and sink handoff diagnostics from bundled packets.
- [ ] Add bundled-mode counters for first bundle, first audio push, first video feed, - [ ] Add bundled-mode counters for first bundle, first audio push, first video feed,
dropped stale bundles, and bundle queue age. dropped stale bundles, and bundle queue age.

6
Cargo.lock generated
View File

@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]] [[package]]
name = "lesavka_client" name = "lesavka_client"
version = "0.18.0" version = "0.18.1"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-stream", "async-stream",
@ -1686,7 +1686,7 @@ dependencies = [
[[package]] [[package]]
name = "lesavka_common" name = "lesavka_common"
version = "0.18.0" version = "0.18.1"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",
@ -1698,7 +1698,7 @@ dependencies = [
[[package]] [[package]]
name = "lesavka_server" name = "lesavka_server"
version = "0.18.0" version = "0.18.1"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"base64", "base64",

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package] [package]
name = "lesavka_client" name = "lesavka_client"
version = "0.18.0" version = "0.18.1"
edition = "2024" edition = "2024"
[dependencies] [dependencies]

View File

@ -1,6 +1,6 @@
[package] [package]
name = "lesavka_common" name = "lesavka_common"
version = "0.18.0" version = "0.18.1"
edition = "2024" edition = "2024"
build = "build.rs" build = "build.rs"

View File

@ -10,7 +10,7 @@ bench = false
[package] [package]
name = "lesavka_server" name = "lesavka_server"
version = "0.18.0" version = "0.18.1"
edition = "2024" edition = "2024"
autobins = false autobins = false

View File

@ -65,10 +65,19 @@ struct BundledTimingSummary {
} }
#[cfg(not(coverage))] #[cfg(not(coverage))]
impl BundledTimingSummary { fn bundled_events_are_mixed(events: &[BundledUpstreamEvent]) -> bool {
fn mixed(self) -> bool { let has_audio = events.iter().any(|event| matches!(event, BundledUpstreamEvent::Audio(_)));
self.has_audio && self.has_video let has_video = events.iter().any(|event| matches!(event, BundledUpstreamEvent::Video(_)));
has_audio && has_video
}
#[cfg(not(coverage))]
fn retain_startup_video_only(events: &mut Vec<BundledUpstreamEvent>) -> bool {
if !bundled_events_are_mixed(events) {
return false;
} }
events.retain(|event| matches!(event, BundledUpstreamEvent::Video(_)));
!events.is_empty()
} }
#[cfg(not(coverage))] #[cfg(not(coverage))]
@ -240,6 +249,17 @@ impl Relay for Handler {
fps = camera_cfg.fps, fps = camera_cfg.fps,
"📦 stream_webcam_media opened" "📦 stream_webcam_media opened"
); );
let (camera_session_id, relay, _relay_reused) =
match self.camera_rt.activate(&camera_cfg).await {
Ok(active) => active,
Err(err) => {
self.upstream_media_rt
.close_camera(camera_lease.generation);
self.upstream_media_rt
.close_microphone(microphone_lease.generation);
return Err(err);
}
};
let Some(microphone_sink_permit) = self let Some(microphone_sink_permit) = self
.upstream_media_rt .upstream_media_rt
.reserve_microphone_sink(microphone_lease.generation) .reserve_microphone_sink(microphone_lease.generation)
@ -263,18 +283,6 @@ impl Relay for Handler {
.close_microphone(microphone_lease.generation); .close_microphone(microphone_lease.generation);
Status::internal(format!("{e:#}")) Status::internal(format!("{e:#}"))
})?; })?;
let (camera_session_id, relay, _relay_reused) =
match self.camera_rt.activate(&camera_cfg).await {
Ok(active) => active,
Err(err) => {
sink.finish();
self.upstream_media_rt
.close_camera(camera_lease.generation);
self.upstream_media_rt
.close_microphone(microphone_lease.generation);
return Err(err);
}
};
let camera_rt = self.camera_rt.clone(); let camera_rt = self.camera_rt.clone();
let upstream_media_rt = self.upstream_media_rt.clone(); let upstream_media_rt = self.upstream_media_rt.clone();
let frame_step_us = (1_000_000u64 / u64::from(camera_cfg.fps.max(1))).max(1); let frame_step_us = (1_000_000u64 / u64::from(camera_cfg.fps.max(1))).max(1);
@ -287,6 +295,7 @@ impl Relay for Handler {
let mut clock = BundledPlayoutClock::default(); let mut clock = BundledPlayoutClock::default();
let mut last_bundle_session_id = None; let mut last_bundle_session_id = None;
let mut last_bundle_seq = None; let mut last_bundle_seq = None;
let mut video_presented_once = false;
let mut outcome = "aborted"; let mut outcome = "aborted";
'bundled_loop: loop { 'bundled_loop: loop {
let bundle = match inbound.next().await { let bundle = match inbound.next().await {
@ -368,22 +377,38 @@ impl Relay for Handler {
); );
} }
if timing_summary.mixed_span_too_wide { if timing_summary.mixed_span_too_wide {
warn!( if !video_presented_once && retain_startup_video_only(&mut events) {
rpc_id, warn!(
session_id = camera_lease.session_id, rpc_id,
client_bundle_session_id = bundle.session_id, session_id = camera_lease.session_id,
bundle_seq = bundle.seq, client_bundle_session_id = bundle.session_id,
span_ms = timing_summary.capture_span_us / 1000, bundle_seq = bundle.seq,
"📦 bundled mixed A/V capture span is too wide; dropping the bundle to protect sync" span_ms = timing_summary.capture_span_us / 1000,
); "📦 bundled startup A/V span is too wide; dropping audio but feeding video to prime the camera device"
continue; );
} else {
warn!(
rpc_id,
session_id = camera_lease.session_id,
client_bundle_session_id = bundle.session_id,
bundle_seq = bundle.seq,
span_ms = timing_summary.capture_span_us / 1000,
"📦 bundled mixed A/V capture span is too wide; dropping the bundle to protect sync"
);
continue;
}
} }
let Some((base_remote_pts_us, epoch)) = clock.ensure(&events) else { let Some((base_remote_pts_us, epoch)) = clock.ensure(&events) else {
continue; continue;
}; };
let mixed_bundle = timing_summary.mixed(); let mut mixed_bundle = bundled_events_are_mixed(&events);
let startup_video_priming = !video_presented_once
&& events
.iter()
.any(|event| matches!(event, BundledUpstreamEvent::Video(_)));
let mut planned_events = Vec::with_capacity(events.len()); let mut planned_events = Vec::with_capacity(events.len());
let mut drop_mixed_bundle = false; let mut drop_mixed_bundle = false;
let mut dropped_audio_for_startup_video = false;
for event in events { for event in events {
let kind = event.kind(); let kind = event.kind();
let min_step_us = match kind { let min_step_us = match kind {
@ -399,6 +424,10 @@ impl Relay for Handler {
) { ) {
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan, lesavka_server::upstream_media_runtime::UpstreamPlanDecision::Play(plan) => plan,
lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropBeforeOverlap => { lesavka_server::upstream_media_runtime::UpstreamPlanDecision::DropBeforeOverlap => {
if startup_video_priming && kind == UpstreamMediaKind::Microphone {
dropped_audio_for_startup_video = true;
continue;
}
if mixed_bundle { if mixed_bundle {
drop_mixed_bundle = true; drop_mixed_bundle = true;
} }
@ -412,6 +441,10 @@ impl Relay for Handler {
reason, reason,
"📦 bundled upstream packet dropped by freshness planner" "📦 bundled upstream packet dropped by freshness planner"
); );
if startup_video_priming && kind == UpstreamMediaKind::Microphone {
dropped_audio_for_startup_video = true;
continue;
}
if mixed_bundle { if mixed_bundle {
drop_mixed_bundle = true; drop_mixed_bundle = true;
} }
@ -437,6 +470,10 @@ impl Relay for Handler {
pts = plan.local_pts_us, pts = plan.local_pts_us,
"📦 bundled upstream packet dropped after missing freshness budget" "📦 bundled upstream packet dropped after missing freshness budget"
); );
if startup_video_priming && kind == UpstreamMediaKind::Microphone {
dropped_audio_for_startup_video = true;
continue;
}
if mixed_bundle { if mixed_bundle {
drop_mixed_bundle = true; drop_mixed_bundle = true;
} }
@ -444,6 +481,16 @@ impl Relay for Handler {
} }
planned_events.push((event, plan)); planned_events.push((event, plan));
} }
if dropped_audio_for_startup_video {
mixed_bundle = false;
warn!(
rpc_id,
session_id = camera_lease.session_id,
client_bundle_session_id = bundle.session_id,
bundle_seq = bundle.seq,
"📦 dropped startup audio from a bad bundled packet but kept video-only playout so the camera can start"
);
}
if drop_mixed_bundle { if drop_mixed_bundle {
warn!( warn!(
rpc_id, rpc_id,
@ -491,6 +538,16 @@ impl Relay for Handler {
packet.pts = plan.local_pts_us; packet.pts = plan.local_pts_us;
let presented_pts = packet.pts; let presented_pts = packet.pts;
relay.feed(packet); relay.feed(packet);
if !video_presented_once {
info!(
rpc_id,
session_id = camera_lease.session_id,
camera_session_id,
pts = presented_pts,
"📦 first bundled video frame fed to camera sink"
);
video_presented_once = true;
}
upstream_media_rt.mark_video_presented(presented_pts, plan.due_at); upstream_media_rt.mark_video_presented(presented_pts, plan.due_at);
} }
} }

View File

@ -2,8 +2,9 @@
#[allow(clippy::items_after_test_module)] #[allow(clippy::items_after_test_module)]
mod tests { mod tests {
use super::{ use super::{
BundledUpstreamEvent, UpstreamStreamCleanup, retain_freshest_audio_packet, BundledUpstreamEvent, UpstreamStreamCleanup, bundled_events_are_mixed,
retain_freshest_video_packet, summarize_bundled_timing, retain_freshest_audio_packet, retain_freshest_video_packet, retain_startup_video_only,
summarize_bundled_timing,
}; };
use lesavka_common::lesavka::{AudioPacket, UpstreamMediaBundle, VideoPacket}; use lesavka_common::lesavka::{AudioPacket, UpstreamMediaBundle, VideoPacket};
use lesavka_server::upstream_media_runtime::{ use lesavka_server::upstream_media_runtime::{
@ -136,12 +137,33 @@ mod tests {
let summary = summarize_bundled_timing(&bundle, &events).expect("timing summary"); let summary = summarize_bundled_timing(&bundle, &events).expect("timing summary");
assert!(summary.mixed()); assert!(bundled_events_are_mixed(&events));
assert_eq!(summary.capture_span_us, 400_000); assert_eq!(summary.capture_span_us, 400_000);
assert!(!summary.capture_bounds_match); assert!(!summary.capture_bounds_match);
assert!(summary.mixed_span_too_wide); assert!(summary.mixed_span_too_wide);
} }
// Checks the startup-priming helper: given a bundle that carries both audio and
// video, `retain_startup_video_only` strips the audio and keeps the video event.
#[test]
fn startup_video_retention_drops_audio_from_bad_mixed_bundle() {
// One audio and one video event whose client capture timestamps are 500_000 apart.
let mut events = vec![
BundledUpstreamEvent::Audio(AudioPacket {
client_capture_pts_us: 1_000_000,
..Default::default()
}),
BundledUpstreamEvent::Video(VideoPacket {
client_capture_pts_us: 1_500_000,
..Default::default()
}),
];
// Precondition: the bundle counts as mixed because it holds audio and video.
assert!(bundled_events_are_mixed(&events));
// The helper returns true when something is left after filtering to video only.
assert!(retain_startup_video_only(&mut events));
// After retention only the single video event remains, so it is no longer mixed.
assert!(!bundled_events_are_mixed(&events));
assert_eq!(events.len(), 1);
assert!(matches!(events[0], BundledUpstreamEvent::Video(_)));
}
#[test] #[test]
fn upstream_cleanup_guard_closes_its_microphone_generation() { fn upstream_cleanup_guard_closes_its_microphone_generation() {
let runtime = Arc::new(UpstreamMediaRuntime::new()); let runtime = Arc::new(UpstreamMediaRuntime::new());