impl LesavkaClientApp { /*──────────────── bundled webcam + mic stream ─────────────────*/ #[cfg(not(coverage))] async fn webcam_media_loop( ep: Channel, initial_camera_source: Option, initial_camera_profile: Option, initial_microphone_source: Option, camera_cfg: Option, camera_telemetry: crate::uplink_telemetry::UplinkTelemetryHandle, microphone_telemetry: crate::uplink_telemetry::UplinkTelemetryHandle, media_controls: crate::live_media_control::LiveMediaControls, ) { let mut delay = Duration::from_secs(1); static FAIL_CNT: AtomicUsize = AtomicUsize::new(0); loop { let state = media_controls.refresh(); let camera_requested = state.camera; let microphone_requested = state.microphone || state.camera; if !camera_requested && !microphone_requested { camera_telemetry.record_enabled(false); microphone_telemetry.record_enabled(false); tokio::time::sleep(Duration::from_millis(100)).await; continue; } let active_camera_source = state.camera_source.resolve(initial_camera_source.as_deref()); let active_camera_profile = state.camera_profile.resolve(initial_camera_profile.as_deref()); let active_microphone_source = state .microphone_source .resolve(initial_microphone_source.as_deref()); let capture_profile = active_camera_profile .as_deref() .and_then(parse_camera_profile_id); let use_default_microphone = matches!( state.microphone_source, crate::live_media_control::MediaDeviceChoice::Auto ) && active_microphone_source.is_none(); let setup_camera_source = active_camera_source.clone(); let setup_microphone_source = active_microphone_source.clone(); let setup = tokio::task::spawn_blocking(move || { let microphone = if use_default_microphone { MicrophoneCapture::new_default_source() } else { MicrophoneCapture::new_with_source(setup_microphone_source.as_deref()) }?; let camera = if camera_requested { Some(CameraCapture::new_with_capture_profile( setup_camera_source.as_deref(), camera_cfg, capture_profile, )?) } else { None }; Ok::<_, anyhow::Error>((camera.map(Arc::new), Arc::new(microphone))) }) .await; let (camera, microphone) = match setup { Ok(Ok(captures)) => captures, Ok(Err(err)) => { camera_telemetry.record_disconnect(format!( "bundled webcam media setup failed: {err:#}" )); microphone_telemetry.record_disconnect(format!( "bundled webcam media setup failed: {err:#}" )); warn!( "📦 bundled webcam media setup failed for camera={:?} mic={:?}: {err:#}", active_camera_source.as_deref().unwrap_or("auto"), active_microphone_source.as_deref().unwrap_or("auto") ); if camera_requested { abort_if_required_media_source_failed( "camera", "📸", active_camera_source.as_deref(), &err, ); } abort_if_required_media_source_failed( "microphone", "🎤", active_microphone_source.as_deref(), &err, ); delay = app_support::next_delay(delay); tokio::time::sleep(delay).await; continue; } Err(err) => { camera_telemetry.record_disconnect(format!( "bundled webcam media setup task failed: {err}" )); microphone_telemetry.record_disconnect(format!( "bundled webcam media setup task failed: {err}" )); warn!("📦 bundled webcam media setup task failed: {err}"); delay = app_support::next_delay(delay); tokio::time::sleep(delay).await; continue; } }; camera_telemetry.record_reconnect_attempt(); microphone_telemetry.record_reconnect_attempt(); let mut cli = RelayClient::new(ep.clone()); let queue: crate::uplink_fresh_queue::FreshPacketQueue = crate::uplink_fresh_queue::FreshPacketQueue::new(BUNDLED_MEDIA_UPLINK_QUEUE); let drop_log = Arc::new(std::sync::Mutex::new(UplinkDropLogLimiter::new( "bundled-webcam-media", "📦", ))); let queue_stream = queue.clone(); let camera_telemetry_stream = camera_telemetry.clone(); let microphone_telemetry_stream = microphone_telemetry.clone(); let drop_log_stream = Arc::clone(&drop_log); let outbound = async_stream::stream! { loop { let next = queue_stream.pop_fresh().await; if next.dropped_stale > 0 { camera_telemetry_stream.record_stale_drop(next.dropped_stale); microphone_telemetry_stream.record_stale_drop(next.dropped_stale); log_uplink_drop( &drop_log_stream, UplinkDropReason::Stale, next.dropped_stale, next.queue_depth, duration_ms(next.delivery_age), ); } if let Some(mut bundle) = next.packet { let queue_depth = queue_depth_u32(next.queue_depth); let delivery_age_ms = duration_ms(next.delivery_age); if bundle.video.is_some() { camera_telemetry_stream.record_streamed( queue_depth, delivery_age_ms, ); } if !bundle.audio.is_empty() { microphone_telemetry_stream.record_streamed( queue_depth, delivery_age_ms, ); } attach_bundle_queue_metadata(&mut bundle, next.queue_depth, next.delivery_age); yield bundle; continue; } break; } }; match cli.stream_webcam_media(Request::new(outbound)).await { Ok(mut resp) => { let stop = Arc::new(AtomicBool::new(false)); let (event_tx, event_rx) = std::sync::mpsc::channel::(); let camera_worker = camera.as_ref().map(|camera| { let camera = Arc::clone(camera); let stop = Arc::clone(&stop); let event_tx = event_tx.clone(); let media_controls = media_controls.clone(); let initial_camera_source = initial_camera_source.clone(); let initial_camera_profile = initial_camera_profile.clone(); let active_camera_source = active_camera_source.clone(); let active_camera_profile = active_camera_profile.clone(); std::thread::spawn(move || { while !stop.load(Ordering::Relaxed) { let state = media_controls.refresh(); let desired_source = state.camera_source.resolve(initial_camera_source.as_deref()); let desired_profile = state.camera_profile.resolve(initial_camera_profile.as_deref()); if !state.camera || desired_source != active_camera_source || desired_profile != active_camera_profile { let _ = event_tx.send(BundledCaptureEvent::Restart); break; } if let Some(mut pkt) = camera.pull() { let _ = stamp_video_timing_metadata_at_enqueue(&mut pkt); if event_tx.send(BundledCaptureEvent::Video(pkt)).is_err() { break; } } } }) }); let microphone_worker = { let microphone = Arc::clone(µphone); let stop = Arc::clone(&stop); let event_tx = event_tx.clone(); let media_controls = media_controls.clone(); let initial_microphone_source = initial_microphone_source.clone(); let active_microphone_source = active_microphone_source.clone(); let active_camera_requested = camera_requested; std::thread::spawn(move || { while !stop.load(Ordering::Relaxed) { let state = media_controls.refresh(); let desired_source = state .microphone_source .resolve(initial_microphone_source.as_deref()); if state.camera != active_camera_requested || !(state.microphone || state.camera) || desired_source != active_microphone_source { let _ = event_tx.send(BundledCaptureEvent::Restart); break; } if let Some(mut pkt) = microphone.pull() { let _ = stamp_audio_timing_metadata_at_enqueue(&mut pkt); if event_tx.send(BundledCaptureEvent::Audio(pkt)).is_err() { break; } } } }) }; drop(event_tx); let bundle_worker = { let stop = Arc::clone(&stop); let queue = queue.clone(); let camera_telemetry = camera_telemetry.clone(); let microphone_telemetry = microphone_telemetry.clone(); let drop_log = Arc::clone(&drop_log); std::thread::spawn(move || { bundle_captured_media( event_rx, stop, queue, camera_telemetry, microphone_telemetry, drop_log, ); }) }; delay = Duration::from_secs(1); camera_telemetry.record_connected(); microphone_telemetry.record_connected(); while resp.get_mut().message().await.transpose().is_some() {} camera_telemetry.record_disconnect("bundled webcam media stream ended"); microphone_telemetry.record_disconnect("bundled webcam media stream ended"); stop.store(true, Ordering::Relaxed); queue.close(); if let Some(worker) = camera_worker { let _ = worker.join(); } let _ = microphone_worker.join(); let _ = bundle_worker.join(); } Err(e) if e.code() == tonic::Code::Unimplemented => { camera_telemetry.record_disconnect("bundled webcam media unavailable on server"); microphone_telemetry .record_disconnect("bundled webcam media unavailable on server"); warn!("📦 server does not support bundled webcam media – retrying"); delay = app_support::next_delay(delay); } Err(e) => { camera_telemetry .record_disconnect(format!("bundled webcam media connect failed: {e}")); microphone_telemetry .record_disconnect(format!("bundled webcam media connect failed: {e}")); if FAIL_CNT.fetch_add(1, Ordering::Relaxed) == 0 { warn!("❌📦 bundled webcam media connect failed: {e}"); } else { debug!("❌📦 bundled webcam media reconnect failed: {e}"); } delay = app_support::next_delay(delay); } } queue.close(); tokio::time::sleep(delay).await; } } /*──────────────── mic stream ─────────────────*/ #[cfg(not(coverage))] async fn voice_loop( ep: Channel, initial_source: Option, telemetry: crate::uplink_telemetry::UplinkTelemetryHandle, media_controls: crate::live_media_control::LiveMediaControls, ) { let mut delay = Duration::from_secs(1); static FAIL_CNT: AtomicUsize = AtomicUsize::new(0); loop { let state = media_controls.refresh(); if !state.microphone { telemetry.record_enabled(false); tokio::time::sleep(Duration::from_millis(100)).await; continue; } let microphone_source_choice = state.microphone_source.clone(); let active_source = microphone_source_choice.resolve(initial_source.as_deref()); let use_default_source = matches!( microphone_source_choice, crate::live_media_control::MediaDeviceChoice::Auto ) && active_source.is_none(); let setup_source = active_source.clone(); let result = tokio::task::spawn_blocking(move || { if use_default_source { MicrophoneCapture::new_default_source() } else { MicrophoneCapture::new_with_source(setup_source.as_deref()) } }) .await; let mic = match result { Ok(Ok(mic)) => Arc::new(mic), Ok(Err(err)) => { telemetry.record_disconnect(format!("microphone uplink setup failed: {err:#}")); warn!( "🎤 microphone uplink setup failed for {:?}: {err:#}", active_source.as_deref().unwrap_or("auto") ); abort_if_required_media_source_failed("microphone", "🎤", active_source.as_deref(), &err); delay = app_support::next_delay(delay); tokio::time::sleep(delay).await; continue; } Err(err) => { telemetry.record_disconnect(format!("microphone uplink setup task failed: {err}")); warn!("🎤 microphone uplink setup task failed before StreamMicrophone could start: {err}"); abort_if_required_media_source_failed( "microphone", "🎤", active_source.as_deref(), &err, ); delay = app_support::next_delay(delay); tokio::time::sleep(delay).await; continue; } }; telemetry.record_reconnect_attempt(); let mut cli = RelayClient::new(ep.clone()); let queue = crate::uplink_fresh_queue::FreshPacketQueue::new(AUDIO_UPLINK_QUEUE); let drop_log = Arc::new(std::sync::Mutex::new(UplinkDropLogLimiter::new( "microphone", "🎤", ))); let queue_stream = queue.clone(); let telemetry_stream = telemetry.clone(); let drop_log_stream = Arc::clone(&drop_log); let outbound = async_stream::stream! { loop { let next = queue_stream.pop_fresh().await; if next.dropped_stale > 0 { telemetry_stream.record_stale_drop(next.dropped_stale); log_uplink_drop( &drop_log_stream, UplinkDropReason::Stale, next.dropped_stale, next.queue_depth, duration_ms(next.delivery_age), ); } if let Some(mut packet) = next.packet { telemetry_stream.record_streamed( queue_depth_u32(next.queue_depth), duration_ms(next.delivery_age), ); attach_audio_queue_metadata( &mut packet, next.queue_depth, next.delivery_age, ); yield packet; continue; } break; } }; match cli.stream_microphone(Request::new(outbound)).await { Ok(mut resp) => { let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>(); let mic_clone = mic.clone(); let telemetry_thread = telemetry.clone(); let queue_thread = queue.clone(); let drop_log_thread = Arc::clone(&drop_log); let media_controls_thread = media_controls.clone(); let initial_source_thread = initial_source.clone(); let active_source_thread = active_source.clone(); let mic_worker = std::thread::spawn(move || { let mut paused = false; while stop_rx.try_recv().is_err() { let state = media_controls_thread.refresh(); let desired_source = state .microphone_source .resolve(initial_source_thread.as_deref()); if desired_source != active_source_thread { tracing::info!( from = active_source_thread.as_deref().unwrap_or("auto"), to = desired_source.as_deref().unwrap_or("auto"), "🎤 microphone source changed; restarting live uplink pipeline" ); break; } if !state.microphone { if !paused { telemetry_thread.record_enabled(false); tracing::info!("🎤 microphone uplink soft-paused"); paused = true; } std::thread::sleep(Duration::from_millis(20)); continue; } if paused { telemetry_thread.record_enabled(true); tracing::info!("🎤 microphone uplink resumed"); paused = false; } if let Some(mut pkt) = mic_clone.pull() { trace!("🎤📤 cli {} bytes → gRPC", pkt.data.len()); let enqueue_age = stamp_audio_timing_metadata_at_enqueue(&mut pkt); let stats = queue_thread.push(pkt, enqueue_age); if stats.dropped_queue_full > 0 { telemetry_thread.record_queue_full_drop(stats.dropped_queue_full); log_uplink_drop( &drop_log_thread, UplinkDropReason::QueueFull, stats.dropped_queue_full, stats.queue_depth, duration_ms(enqueue_age), ); } telemetry_thread.record_enqueue( queue_depth_u32(stats.queue_depth), duration_ms(enqueue_age), 0.0, ); } } }); delay = Duration::from_secs(1); telemetry.record_connected(); while resp.get_mut().message().await.transpose().is_some() {} telemetry.record_disconnect("microphone uplink stream ended"); queue.close(); let _ = stop_tx.send(()); let _ = mic_worker.join(); } Err(e) => { telemetry.record_disconnect(format!("microphone uplink connect failed: {e}")); if FAIL_CNT.fetch_add(1, Ordering::Relaxed) == 0 { warn!("❌🎤 connect failed: {e}"); warn!("⚠️🎤 further microphone‑stream failures will be logged at DEBUG"); } else { debug!("❌🎤 reconnect failed: {e}"); } delay = app_support::next_delay(delay); } } queue.close(); tokio::time::sleep(delay).await; } } /*──────────────── cam stream ───────────────────*/ #[cfg(not(coverage))] async fn cam_loop( ep: Channel, initial_source: Option, initial_profile: Option, camera_cfg: Option, telemetry: crate::uplink_telemetry::UplinkTelemetryHandle, media_controls: crate::live_media_control::LiveMediaControls, ) { let mut delay = Duration::from_secs(1); loop { let state = media_controls.refresh(); if !state.camera { telemetry.record_enabled(false); tokio::time::sleep(Duration::from_millis(100)).await; continue; } let active_source = state.camera_source.resolve(initial_source.as_deref()); let active_profile = state.camera_profile.resolve(initial_profile.as_deref()); let capture_profile = active_profile .as_deref() .and_then(parse_camera_profile_id); let setup_source = active_source.clone(); let result = tokio::task::spawn_blocking(move || { CameraCapture::new_with_capture_profile( setup_source.as_deref(), camera_cfg, capture_profile, ) }) .await; let cam = match result { Ok(Ok(cam)) => Arc::new(cam), Ok(Err(err)) => { telemetry.record_disconnect(format!("webcam uplink setup failed: {err:#}")); warn!( "📸 webcam uplink setup failed for {:?}: {err:#}", active_source.as_deref().unwrap_or("auto") ); abort_if_required_media_source_failed("camera", "📸", active_source.as_deref(), &err); delay = app_support::next_delay(delay); tokio::time::sleep(delay).await; continue; } Err(err) => { telemetry.record_disconnect(format!("webcam uplink setup task failed: {err}")); warn!("📸 webcam uplink setup task failed before StreamCamera could start: {err}"); abort_if_required_media_source_failed( "camera", "📸", active_source.as_deref(), &err, ); delay = app_support::next_delay(delay); tokio::time::sleep(delay).await; continue; } }; telemetry.record_reconnect_attempt(); let mut cli = RelayClient::new(ep.clone()); let queue = crate::uplink_fresh_queue::FreshPacketQueue::new(VIDEO_UPLINK_QUEUE); let drop_log = Arc::new(std::sync::Mutex::new(UplinkDropLogLimiter::new("camera", "📸"))); let queue_stream = queue.clone(); let telemetry_stream = telemetry.clone(); let drop_log_stream = Arc::clone(&drop_log); let outbound = async_stream::stream! { loop { let next = queue_stream.pop_fresh().await; if next.dropped_stale > 0 { telemetry_stream.record_stale_drop(next.dropped_stale); log_uplink_drop( &drop_log_stream, UplinkDropReason::Stale, next.dropped_stale, next.queue_depth, duration_ms(next.delivery_age), ); } if let Some(mut packet) = next.packet { telemetry_stream.record_streamed( queue_depth_u32(next.queue_depth), duration_ms(next.delivery_age), ); attach_video_queue_metadata( &mut packet, next.queue_depth, next.delivery_age, ); yield packet; continue; } break; } }; match cli.stream_camera(Request::new(outbound)).await { Ok(mut resp) => { let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>(); let cam_worker = std::thread::spawn({ let cam = cam.clone(); let telemetry = telemetry.clone(); let queue = queue.clone(); let drop_log = Arc::clone(&drop_log); let media_controls = media_controls.clone(); let initial_source_thread = initial_source.clone(); let active_source_thread = active_source.clone(); let initial_profile_thread = initial_profile.clone(); let active_profile_thread = active_profile.clone(); move || loop { if stop_rx.try_recv().is_ok() { break; } let state = media_controls.refresh(); let desired_source = state.camera_source.resolve(initial_source_thread.as_deref()); let desired_profile = state.camera_profile.resolve(initial_profile_thread.as_deref()); if desired_source != active_source_thread || desired_profile != active_profile_thread { tracing::info!( from = active_source_thread.as_deref().unwrap_or("auto"), to = desired_source.as_deref().unwrap_or("auto"), "📸 webcam source changed; restarting live uplink pipeline" ); break; } if !state.camera { telemetry.record_enabled(false); tracing::info!("📸 webcam uplink soft-paused"); while stop_rx.try_recv().is_err() { let state = media_controls.refresh(); let desired_source = state.camera_source.resolve(initial_source_thread.as_deref()); let desired_profile = state .camera_profile .resolve(initial_profile_thread.as_deref()); if desired_source != active_source_thread || desired_profile != active_profile_thread { break; } if state.camera { break; } std::thread::sleep(Duration::from_millis(25)); } if stop_rx.try_recv().is_ok() { break; } let state = media_controls.refresh(); let desired_source = state.camera_source.resolve(initial_source_thread.as_deref()); let desired_profile = state .camera_profile .resolve(initial_profile_thread.as_deref()); if desired_source != active_source_thread || desired_profile != active_profile_thread { tracing::info!( from = active_source_thread.as_deref().unwrap_or("auto"), to = desired_source.as_deref().unwrap_or("auto"), "📸 webcam source changed while paused; restarting live uplink pipeline" ); break; } telemetry.record_enabled(true); tracing::info!("📸 webcam uplink resumed"); } let Some(mut pkt) = cam.pull() else { std::thread::sleep(Duration::from_millis(5)); continue; }; static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if n < 10 || n.is_multiple_of(120) { tracing::trace!("📸 cli frame#{n} {} B", pkt.data.len()); } tracing::trace!("📸⬆️ sent webcam AU pts={} {} B", pkt.pts, pkt.data.len()); let enqueue_age = stamp_video_timing_metadata_at_enqueue(&mut pkt); let stats = queue.push(pkt, enqueue_age); if stats.dropped_queue_full > 0 { telemetry.record_queue_full_drop(stats.dropped_queue_full); log_uplink_drop( &drop_log, UplinkDropReason::QueueFull, stats.dropped_queue_full, stats.queue_depth, duration_ms(enqueue_age), ); } telemetry.record_enqueue( queue_depth_u32(stats.queue_depth), duration_ms(enqueue_age), 0.0, ); } }); delay = Duration::from_secs(1); telemetry.record_connected(); while resp.get_mut().message().await.transpose().is_some() {} telemetry.record_disconnect("camera uplink stream ended"); queue.close(); let _ = stop_tx.send(()); let _ = cam_worker.join(); } Err(e) if e.code() == tonic::Code::Unimplemented => { tracing::warn!("📸 server does not support StreamCamera – giving up"); telemetry.record_disconnect("camera uplink unavailable on server"); queue.close(); return; } Err(e) => { telemetry.record_disconnect(format!("camera uplink connect failed: {e}")); tracing::warn!("❌📸 connect failed: {e:?}"); delay = app_support::next_delay(delay); } } queue.close(); tokio::time::sleep(delay).await; } } } #[cfg(not(coverage))] fn initial_camera_profile_id_from_env() -> Option { let width = std::env::var("LESAVKA_CAM_WIDTH").ok()?; let height = std::env::var("LESAVKA_CAM_HEIGHT").ok()?; let fps = std::env::var("LESAVKA_CAM_FPS").ok()?; Some(format!("{width}x{height}@{fps}")) } #[cfg(not(coverage))] fn parse_camera_profile_id(raw: &str) -> Option<(u32, u32, u32)> { let (size, fps) = raw.split_once('@')?; let (width, height) = size.split_once('x')?; let width = width.parse().ok()?; let height = height.parse().ok()?; let fps = fps.parse().ok()?; (width > 0 && height > 0 && fps > 0).then_some((width, height, fps)) } #[cfg(not(coverage))] fn abort_if_required_media_source_failed( kind: &str, icon: &str, source: Option<&str>, err: &dyn std::fmt::Display, ) { if !explicit_media_sources_required() || source.is_none_or(|source| source.trim().is_empty()) { return; } let source = source.expect("checked source presence"); error!( "{icon} required {kind} source '{source}' failed to start; aborting client because LESAVKA_REQUIRE_EXPLICIT_MEDIA_SOURCES=1: {err}" ); eprintln!( "{icon} required {kind} source '{source}' failed to start; aborting client because LESAVKA_REQUIRE_EXPLICIT_MEDIA_SOURCES=1: {err}" ); std::process::exit(2); } #[cfg(not(coverage))] fn explicit_media_sources_required() -> bool { std::env::var("LESAVKA_REQUIRE_EXPLICIT_MEDIA_SOURCES") .ok() .is_some_and(|value| { let value = value.trim(); value == "1" || value.eq_ignore_ascii_case("true") || value.eq_ignore_ascii_case("yes") || value.eq_ignore_ascii_case("on") }) } #[cfg(not(coverage))] const VIDEO_UPLINK_QUEUE: crate::uplink_fresh_queue::FreshQueueConfig = crate::uplink_fresh_queue::FreshQueueConfig { capacity: 32, max_age: Duration::from_millis(350), policy: crate::uplink_fresh_queue::FreshQueuePolicy::LatestOnly, }; #[cfg(not(coverage))] const AUDIO_UPLINK_QUEUE: crate::uplink_fresh_queue::FreshQueueConfig = crate::uplink_fresh_queue::FreshQueueConfig { capacity: 64, max_age: Duration::from_millis(400), policy: crate::uplink_fresh_queue::FreshQueuePolicy::DrainOldest, }; #[cfg(not(coverage))] const BUNDLED_MEDIA_UPLINK_QUEUE: crate::uplink_fresh_queue::FreshQueueConfig = crate::uplink_fresh_queue::FreshQueueConfig { capacity: 16, max_age: Duration::from_millis(350), policy: crate::uplink_fresh_queue::FreshQueuePolicy::LatestOnly, }; #[cfg(not(coverage))] const BUNDLED_AUDIO_FLUSH_INTERVAL: Duration = Duration::from_millis(20); #[cfg(not(coverage))] const BUNDLED_AUDIO_MAX_PENDING: usize = 8; #[cfg(not(coverage))] #[derive(Debug)] enum BundledCaptureEvent { Audio(AudioPacket), Video(VideoPacket), Restart, } #[cfg(not(coverage))] fn bundle_captured_media( event_rx: std::sync::mpsc::Receiver, stop: Arc, queue: crate::uplink_fresh_queue::FreshPacketQueue, camera_telemetry: crate::uplink_telemetry::UplinkTelemetryHandle, microphone_telemetry: crate::uplink_telemetry::UplinkTelemetryHandle, drop_log: Arc>, ) { static BUNDLED_SESSION: AtomicU64 = AtomicU64::new(0); let session_id = BUNDLED_SESSION .fetch_add(1, Ordering::Relaxed) .saturating_add(1); let mut bundle_seq = 0_u64; let mut pending_audio = Vec::new(); let mut next_audio_flush = Instant::now() + BUNDLED_AUDIO_FLUSH_INTERVAL; loop { if stop.load(Ordering::Relaxed) { break; } let timeout = next_audio_flush.saturating_duration_since(Instant::now()); match event_rx.recv_timeout(timeout) { Ok(BundledCaptureEvent::Audio(packet)) => { pending_audio.push(packet); if pending_audio.len() >= BUNDLED_AUDIO_MAX_PENDING { emit_bundled_media( session_id, &mut bundle_seq, None, std::mem::take(&mut pending_audio), &queue, &camera_telemetry, µphone_telemetry, &drop_log, ); next_audio_flush = Instant::now() + BUNDLED_AUDIO_FLUSH_INTERVAL; } } Ok(BundledCaptureEvent::Video(packet)) => { emit_bundled_media( session_id, &mut bundle_seq, Some(packet), std::mem::take(&mut pending_audio), &queue, &camera_telemetry, µphone_telemetry, &drop_log, ); next_audio_flush = Instant::now() + BUNDLED_AUDIO_FLUSH_INTERVAL; } Ok(BundledCaptureEvent::Restart) => { stop.store(true, Ordering::Relaxed); break; } Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { if !pending_audio.is_empty() { emit_bundled_media( session_id, &mut bundle_seq, None, std::mem::take(&mut pending_audio), &queue, &camera_telemetry, µphone_telemetry, &drop_log, ); } next_audio_flush = Instant::now() + BUNDLED_AUDIO_FLUSH_INTERVAL; } Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break, } } queue.close(); } #[cfg(not(coverage))] fn emit_bundled_media( session_id: u64, bundle_seq: &mut u64, video: Option, audio: Vec, queue: &crate::uplink_fresh_queue::FreshPacketQueue, camera_telemetry: &crate::uplink_telemetry::UplinkTelemetryHandle, microphone_telemetry: &crate::uplink_telemetry::UplinkTelemetryHandle, drop_log: &Arc>, ) { if video.is_none() && audio.is_empty() { return; } *bundle_seq = bundle_seq.saturating_add(1); let (capture_start_us, capture_end_us) = bundled_capture_bounds(video.as_ref(), &audio); let enqueue_now_us = crate::live_capture_clock::capture_pts_us(); let enqueue_age = Duration::from_micros(enqueue_now_us.saturating_sub(capture_start_us)); let has_video = video.is_some(); let has_audio = !audio.is_empty(); let mut bundle = UpstreamMediaBundle { session_id, seq: *bundle_seq, capture_start_us, capture_end_us, video, audio, ..UpstreamMediaBundle::default() }; attach_bundle_queue_metadata(&mut bundle, 0, enqueue_age); let stats = queue.push(bundle, enqueue_age); if stats.dropped_queue_full > 0 { if has_video { camera_telemetry.record_queue_full_drop(stats.dropped_queue_full); } if has_audio { microphone_telemetry.record_queue_full_drop(stats.dropped_queue_full); } log_uplink_drop( drop_log, UplinkDropReason::QueueFull, stats.dropped_queue_full, stats.queue_depth, duration_ms(enqueue_age), ); } let queue_depth = queue_depth_u32(stats.queue_depth); let age_ms = duration_ms(enqueue_age); if has_video { camera_telemetry.record_enqueue(queue_depth, age_ms, 0.0); } if has_audio { microphone_telemetry.record_enqueue(queue_depth, age_ms, 0.0); } } #[cfg(not(coverage))] fn bundled_capture_bounds(video: Option<&VideoPacket>, audio: &[AudioPacket]) -> (u64, u64) { let mut start = u64::MAX; let mut end = 0_u64; if let Some(video) = video { let pts = packet_video_capture_pts_us(video); start = start.min(pts); end = end.max(pts); } for packet in audio { let pts = packet_audio_capture_pts_us(packet); start = start.min(pts); end = end.max(pts); } if start == u64::MAX { let now = crate::live_capture_clock::capture_pts_us(); return (now, now); } (start, end.max(start)) } #[cfg(not(coverage))] fn packet_audio_capture_pts_us(packet: &AudioPacket) -> u64 { if packet.client_capture_pts_us == 0 { packet.pts } else { packet.client_capture_pts_us } } #[cfg(not(coverage))] fn packet_video_capture_pts_us(packet: &VideoPacket) -> u64 { if packet.client_capture_pts_us == 0 { packet.pts } else { packet.client_capture_pts_us } } #[cfg(not(coverage))] fn queue_depth_u32(depth: usize) -> u32 { depth.try_into().unwrap_or(u32::MAX) } #[cfg(not(coverage))] fn duration_ms(duration: Duration) -> f32 { duration.as_secs_f32() * 1_000.0 } #[cfg(not(coverage))] fn duration_ms_u32(duration: Duration) -> u32 { duration.as_millis().min(u128::from(u32::MAX)) as u32 } #[cfg(not(coverage))] fn age_between_capture_and_enqueue(capture_pts_us: u64, enqueue_pts_us: u64) -> Duration { Duration::from_micros(enqueue_pts_us.saturating_sub(capture_pts_us)) } #[cfg(not(coverage))] fn stamp_audio_timing_metadata_at_enqueue(packet: &mut AudioPacket) -> Duration { static AUDIO_SEQUENCE: AtomicU64 = AtomicU64::new(0); let enqueue_pts_us = crate::live_capture_clock::capture_pts_us(); let capture_pts_us = sanitized_capture_pts_us(packet.pts, enqueue_pts_us); packet.pts = capture_pts_us; packet.seq = AUDIO_SEQUENCE.fetch_add(1, Ordering::Relaxed).saturating_add(1); packet.client_capture_pts_us = capture_pts_us; packet.client_send_pts_us = enqueue_pts_us; age_between_capture_and_enqueue(capture_pts_us, enqueue_pts_us) } #[cfg(not(coverage))] fn stamp_video_timing_metadata_at_enqueue(packet: &mut VideoPacket) -> Duration { static VIDEO_SEQUENCE: AtomicU64 = AtomicU64::new(0); let enqueue_pts_us = crate::live_capture_clock::capture_pts_us(); let capture_pts_us = sanitized_capture_pts_us(packet.pts, enqueue_pts_us); packet.pts = capture_pts_us; packet.seq = VIDEO_SEQUENCE.fetch_add(1, Ordering::Relaxed).saturating_add(1); packet.client_capture_pts_us = capture_pts_us; packet.client_send_pts_us = enqueue_pts_us; age_between_capture_and_enqueue(capture_pts_us, enqueue_pts_us) } #[cfg(not(coverage))] fn sanitized_capture_pts_us(packet_pts_us: u64, enqueue_pts_us: u64) -> u64 { let mut capture_pts_us = packet_pts_us.min(enqueue_pts_us); let max_lag_us = crate::live_capture_clock::upstream_source_lag_cap() .as_micros() .min(u64::MAX as u128) as u64; let lag_floor_us = enqueue_pts_us.saturating_sub(max_lag_us); if capture_pts_us < lag_floor_us { capture_pts_us = lag_floor_us; } capture_pts_us } #[cfg(not(coverage))] fn attach_audio_queue_metadata( packet: &mut AudioPacket, queue_depth: usize, delivery_age: Duration, ) { if packet.seq == 0 { let _ = stamp_audio_timing_metadata_at_enqueue(packet); } packet.client_queue_depth = queue_depth_u32(queue_depth); packet.client_queue_age_ms = duration_ms_u32(delivery_age); } #[cfg(not(coverage))] fn attach_video_queue_metadata( packet: &mut VideoPacket, queue_depth: usize, delivery_age: Duration, ) { if packet.seq == 0 { let _ = stamp_video_timing_metadata_at_enqueue(packet); } packet.client_queue_depth = queue_depth_u32(queue_depth); packet.client_queue_age_ms = duration_ms_u32(delivery_age); } #[cfg(not(coverage))] fn attach_bundle_queue_metadata( bundle: &mut UpstreamMediaBundle, queue_depth: usize, delivery_age: Duration, ) { for packet in &mut bundle.audio { attach_audio_queue_metadata(packet, queue_depth, delivery_age); } if let Some(packet) = bundle.video.as_mut() { attach_video_queue_metadata(packet, queue_depth, delivery_age); } } #[cfg(not(coverage))] #[derive(Clone, Copy, Debug)] enum UplinkDropReason { QueueFull, Stale, } #[cfg(not(coverage))] #[derive(Debug)] struct UplinkDropLogLimiter { stream: &'static str, icon: &'static str, last_warn_at: Option, suppressed_full: u64, suppressed_stale: u64, } #[cfg(not(coverage))] /// Aggregate freshness-first upstream drops into periodic warnings per stream. impl UplinkDropLogLimiter { fn new(stream: &'static str, icon: &'static str) -> Self { Self { stream, icon, last_warn_at: None, suppressed_full: 0, suppressed_stale: 0, } } /// Fold full-queue and stale-packet drops into one periodic warning. fn record(&mut self, reason: UplinkDropReason, count: u64, queue_depth: usize, age_ms: f32) { match reason { UplinkDropReason::QueueFull => { self.suppressed_full = self.suppressed_full.saturating_add(count) } UplinkDropReason::Stale => { self.suppressed_stale = self.suppressed_stale.saturating_add(count) } } let should_warn = self .last_warn_at .map(|last| last.elapsed() >= UPLINK_DROP_WARN_INTERVAL) .unwrap_or(true); if should_warn { warn!( stream = self.stream, dropped_queue_full = self.suppressed_full, dropped_stale = self.suppressed_stale, queue_depth, latest_age_ms = age_ms, "{} upstream {} queue is dropping stale/superseded packets to preserve live A/V sync", self.icon, self.stream ); self.suppressed_full = 0; self.suppressed_stale = 0; self.last_warn_at = Some(Instant::now()); } else { debug!( stream = self.stream, ?reason, count, queue_depth, latest_age_ms = age_ms, "upstream media queue drop suppressed from WARN noise" ); } } } #[cfg(not(coverage))] const UPLINK_DROP_WARN_INTERVAL: Duration = Duration::from_secs(5); #[cfg(not(coverage))] /// Report an upstream queue drop through the shared rate limiter. fn log_uplink_drop( limiter: &Arc>, reason: UplinkDropReason, count: u64, queue_depth: usize, age_ms: f32, ) { if let Ok(mut limiter) = limiter.lock() { limiter.record(reason, count, queue_depth, age_ms); } } #[cfg(test)] mod uplink_timing_tests { use super::*; #[test] fn audio_timing_metadata_is_stamped_before_async_queue_pop() { std::thread::sleep(Duration::from_millis(5)); let packet_pts_us = crate::live_capture_clock::capture_pts_us().saturating_sub(2_000); let mut packet = AudioPacket { pts: packet_pts_us, ..AudioPacket::default() }; let enqueue_age = stamp_audio_timing_metadata_at_enqueue(&mut packet); let capture_pts_us = packet.client_capture_pts_us; let send_pts_us = packet.client_send_pts_us; std::thread::sleep(Duration::from_millis(5)); attach_audio_queue_metadata( &mut packet, 3, enqueue_age.saturating_add(Duration::from_millis(5)), ); assert!(packet.seq > 0); assert_eq!(packet.client_queue_depth, 3); assert!(packet.client_queue_age_ms >= 5); assert_eq!(packet.client_capture_pts_us, capture_pts_us); assert_eq!(packet.client_send_pts_us, send_pts_us); assert!( packet.client_send_pts_us >= packet.client_capture_pts_us, "enqueue/send stamp must be on or after the shared-clock capture estimate" ); assert!( packet.client_send_pts_us - packet.client_capture_pts_us <= 3_000, "capture-to-enqueue age, not async pop delay, should define the timing window" ); } #[test] fn video_timing_metadata_is_stamped_before_async_queue_pop() { std::thread::sleep(Duration::from_millis(5)); let packet_pts_us = crate::live_capture_clock::capture_pts_us().saturating_sub(3_000); let mut packet = VideoPacket { pts: packet_pts_us, ..VideoPacket::default() }; let enqueue_age = stamp_video_timing_metadata_at_enqueue(&mut packet); let capture_pts_us = packet.client_capture_pts_us; let send_pts_us = packet.client_send_pts_us; std::thread::sleep(Duration::from_millis(5)); attach_video_queue_metadata( &mut packet, 4, enqueue_age.saturating_add(Duration::from_millis(5)), ); assert!(packet.seq > 0); assert_eq!(packet.client_queue_depth, 4); assert!(packet.client_queue_age_ms >= 5); assert_eq!(packet.client_capture_pts_us, capture_pts_us); assert_eq!(packet.client_send_pts_us, send_pts_us); assert!( packet.client_send_pts_us >= packet.client_capture_pts_us, "enqueue/send stamp must be on or after the shared-clock capture estimate" ); assert!( packet.client_send_pts_us - packet.client_capture_pts_us <= 4_000, "capture-to-enqueue age, not async pop delay, should define the timing window" ); } #[test] fn stale_source_timestamps_are_clamped_before_bundling() { let enqueue_pts_us = crate::live_capture_clock::capture_pts_us(); let stale_pts_us = enqueue_pts_us.saturating_sub(30_000_000); let mut audio = AudioPacket { pts: stale_pts_us, ..AudioPacket::default() }; let mut video = VideoPacket { pts: stale_pts_us, ..VideoPacket::default() }; let audio_age = stamp_audio_timing_metadata_at_enqueue(&mut audio); let video_age = stamp_video_timing_metadata_at_enqueue(&mut video); assert_eq!(audio.pts, audio.client_capture_pts_us); assert_eq!(video.pts, video.client_capture_pts_us); assert!( audio_age <= crate::live_capture_clock::upstream_source_lag_cap(), "audio capture timestamp should not resurrect stale source timing" ); assert!( video_age <= crate::live_capture_clock::upstream_source_lag_cap(), "video capture timestamp should not resurrect stale source timing" ); } }