impl LesavkaClientApp {
    /*──────────────── mic stream ─────────────────*/
    /// Run the microphone uplink forever, reconnecting after every stream end or failure.
    ///
    /// Each pass builds a fresh `RelayClient` and `FreshPacketQueue`, opens
    /// `stream_microphone` with a stream that drains the queue and, once connected, starts an
    /// OS thread that pulls packets from `mic` and pushes them into the queue. When the
    /// response stream ends, the queue is closed and the thread is stopped and joined. The
    /// loop then sleeps for `delay` before the next attempt; `delay` resets to one second after
    /// a successful connect and is advanced by `app_support::next_delay` after a failed one.
    ///
    /// NOTE(review): the pointee type of the `mic` handle is not visible in this view of the
    /// file — confirm the parameter type against the capture module.
    #[cfg(not(coverage))]
    async fn voice_loop(
        ep: Channel,
        mic: Arc,
        telemetry: crate::uplink_telemetry::UplinkTelemetryHandle,
        media_controls: crate::live_media_control::LiveMediaControls,
    ) {
        let mut delay = Duration::from_secs(1);
        // Shared across calls: only the very first connect failure is logged at WARN.
        static FAIL_CNT: AtomicUsize = AtomicUsize::new(0);

        loop {
            telemetry.record_reconnect_attempt();
            let mut cli = RelayClient::new(ep.clone());
            let queue = crate::uplink_fresh_queue::FreshPacketQueue::new(AUDIO_UPLINK_QUEUE);
            let drop_log = Arc::new(std::sync::Mutex::new(UplinkDropLogLimiter::new(
                "microphone",
                "🎤",
            )));

            let queue_stream = queue.clone();
            let telemetry_stream = telemetry.clone();
            let drop_log_stream = Arc::clone(&drop_log);
            // Outbound request stream: forwards queued packets until the queue yields no packet.
            let outbound = async_stream::stream! {
                loop {
                    let next = queue_stream.pop_fresh().await;
                    if next.dropped_stale > 0 {
                        telemetry_stream.record_stale_drop(next.dropped_stale);
                        log_uplink_drop(
                            &drop_log_stream,
                            UplinkDropReason::Stale,
                            next.dropped_stale,
                            next.queue_depth,
                            duration_ms(next.delivery_age),
                        );
                    }
                    if let Some(packet) = next.packet {
                        telemetry_stream.record_streamed(
                            queue_depth_u32(next.queue_depth),
                            duration_ms(next.delivery_age),
                        );
                        yield packet;
                        continue;
                    }
                    break;
                }
            };

            match cli.stream_microphone(Request::new(outbound)).await {
                Ok(mut resp) => {
                    let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>();
                    let mic_clone = mic.clone();
                    let telemetry_thread = telemetry.clone();
                    let queue_thread = queue.clone();
                    let drop_log_thread = Arc::clone(&drop_log);
                    let media_controls_thread = media_controls.clone();

                    let mic_worker = std::thread::spawn(move || {
                        let mut paused = false;
                        // Only an empty channel means "keep running". A received message or a
                        // dropped sender both end the worker, so the thread cannot outlive an
                        // uplink task whose future was dropped.
                        while matches!(
                            stop_rx.try_recv(),
                            Err(std::sync::mpsc::TryRecvError::Empty)
                        ) {
                            if !media_controls_thread.refresh().microphone {
                                // Report the pause once, then poll until re-enabled.
                                if !paused {
                                    telemetry_thread.record_enabled(false);
                                    tracing::info!("🎤 microphone uplink soft-paused");
                                    paused = true;
                                }
                                std::thread::sleep(Duration::from_millis(20));
                                continue;
                            }
                            if paused {
                                telemetry_thread.record_enabled(true);
                                tracing::info!("🎤 microphone uplink resumed");
                                paused = false;
                            }
                            if let Some(pkt) = mic_clone.pull() {
                                trace!("🎤📤 cli {} bytes → gRPC", pkt.data.len());
                                let enqueue_age = crate::live_capture_clock::packet_age(pkt.pts);
                                let stats = queue_thread.push(pkt, enqueue_age);
                                if stats.dropped_queue_full > 0 {
                                    telemetry_thread
                                        .record_queue_full_drop(stats.dropped_queue_full);
                                    log_uplink_drop(
                                        &drop_log_thread,
                                        UplinkDropReason::QueueFull,
                                        stats.dropped_queue_full,
                                        stats.queue_depth,
                                        duration_ms(enqueue_age),
                                    );
                                }
                                telemetry_thread.record_enqueue(
                                    queue_depth_u32(stats.queue_depth),
                                    duration_ms(enqueue_age),
                                    0.0,
                                );
                            }
                        }
                    });

                    delay = Duration::from_secs(1);
                    telemetry.record_connected();
                    // Drain server responses until the stream reports its end.
                    while resp.get_mut().message().await.transpose().is_some() {}
                    telemetry.record_disconnect("microphone uplink stream ended");
                    queue.close();
                    let _ = stop_tx.send(());
                    let _ = mic_worker.join();
                }
                Err(e) => {
                    telemetry.record_disconnect(format!("microphone uplink connect failed: {e}"));
                    if FAIL_CNT.fetch_add(1, Ordering::Relaxed) == 0 {
                        warn!("❌🎤 connect failed: {e}");
                        warn!("⚠️🎤 further microphone‑stream failures will be logged at DEBUG");
                    } else {
                        debug!("❌🎤 reconnect failed: {e}");
                    }
                    delay = app_support::next_delay(delay);
                }
            }
            // Also covers the failure path, where the queue was never closed above.
            queue.close();
            tokio::time::sleep(delay).await;
        }
    }

    /*──────────────── cam stream ───────────────────*/
    /// Run the camera uplink, reconnecting after every stream end or failure.
    ///
    /// Mirrors `voice_loop`: a fresh `RelayClient` and `FreshPacketQueue` per pass, a worker
    /// thread that pulls frames from `cam` into the queue, and a sleep of `delay` between
    /// passes. Unlike the microphone loop it returns for good when the server answers
    /// `StreamCamera` with `tonic::Code::Unimplemented`.
    ///
    /// NOTE(review): the pointee type of the `cam` handle is not visible in this view of the
    /// file — confirm the parameter type against the capture module.
    #[cfg(not(coverage))]
    async fn cam_loop(
        ep: Channel,
        cam: Arc,
        telemetry: crate::uplink_telemetry::UplinkTelemetryHandle,
        media_controls: crate::live_media_control::LiveMediaControls,
    ) {
        let mut delay = Duration::from_secs(1);

        loop {
            telemetry.record_reconnect_attempt();
            let mut cli = RelayClient::new(ep.clone());
            let queue = crate::uplink_fresh_queue::FreshPacketQueue::new(VIDEO_UPLINK_QUEUE);
            let drop_log =
                Arc::new(std::sync::Mutex::new(UplinkDropLogLimiter::new("camera", "📸")));

            let queue_stream = queue.clone();
            let telemetry_stream = telemetry.clone();
            let drop_log_stream = Arc::clone(&drop_log);
            // Outbound request stream: forwards queued frames until the queue yields no packet.
            let outbound = async_stream::stream! {
                loop {
                    let next = queue_stream.pop_fresh().await;
                    if next.dropped_stale > 0 {
                        telemetry_stream.record_stale_drop(next.dropped_stale);
                        log_uplink_drop(
                            &drop_log_stream,
                            UplinkDropReason::Stale,
                            next.dropped_stale,
                            next.queue_depth,
                            duration_ms(next.delivery_age),
                        );
                    }
                    if let Some(packet) = next.packet {
                        telemetry_stream.record_streamed(
                            queue_depth_u32(next.queue_depth),
                            duration_ms(next.delivery_age),
                        );
                        yield packet;
                        continue;
                    }
                    break;
                }
            };

            match cli.stream_camera(Request::new(outbound)).await {
                Ok(mut resp) => {
                    let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>();
                    let cam_worker = std::thread::spawn({
                        let cam = cam.clone();
                        let telemetry = telemetry.clone();
                        let queue = queue.clone();
                        let drop_log = Arc::clone(&drop_log);
                        let media_controls = media_controls.clone();
                        move || {
                            // `try_recv` consumes the single stop message, so every probe must
                            // be acted on immediately. Previously the pause-wait loop swallowed
                            // the message and the follow-up check missed it, leaving the worker
                            // running and the joiner blocked. A dropped sender also counts as
                            // a stop request.
                            let stop_requested = || {
                                !matches!(
                                    stop_rx.try_recv(),
                                    Err(std::sync::mpsc::TryRecvError::Empty)
                                )
                            };

                            'worker: loop {
                                if stop_requested() {
                                    break;
                                }
                                if !media_controls.refresh().camera {
                                    telemetry.record_enabled(false);
                                    tracing::info!("📸 webcam uplink soft-paused");
                                    // Poll until the camera is re-enabled or a stop arrives.
                                    loop {
                                        if stop_requested() {
                                            break 'worker;
                                        }
                                        if media_controls.refresh().camera {
                                            break;
                                        }
                                        std::thread::sleep(Duration::from_millis(25));
                                    }
                                    telemetry.record_enabled(true);
                                    tracing::info!("📸 webcam uplink resumed");
                                }

                                let Some(pkt) = cam.pull() else {
                                    std::thread::sleep(Duration::from_millis(5));
                                    continue;
                                };

                                // Frame counter used only to thin out the per-frame trace line.
                                static CNT: std::sync::atomic::AtomicU64 =
                                    std::sync::atomic::AtomicU64::new(0);
                                let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                                if n < 10 || n.is_multiple_of(120) {
                                    tracing::trace!("📸 cli frame#{n} {} B", pkt.data.len());
                                }
                                tracing::trace!(
                                    "📸⬆️ sent webcam AU pts={} {} B",
                                    pkt.pts,
                                    pkt.data.len()
                                );

                                let enqueue_age = crate::live_capture_clock::packet_age(pkt.pts);
                                let stats = queue.push(pkt, enqueue_age);
                                if stats.dropped_queue_full > 0 {
                                    telemetry.record_queue_full_drop(stats.dropped_queue_full);
                                    log_uplink_drop(
                                        &drop_log,
                                        UplinkDropReason::QueueFull,
                                        stats.dropped_queue_full,
                                        stats.queue_depth,
                                        duration_ms(enqueue_age),
                                    );
                                }
                                telemetry.record_enqueue(
                                    queue_depth_u32(stats.queue_depth),
                                    duration_ms(enqueue_age),
                                    0.0,
                                );
                            }
                        }
                    });

                    delay = Duration::from_secs(1);
                    telemetry.record_connected();
                    // Drain server responses until the stream reports its end.
                    while resp.get_mut().message().await.transpose().is_some() {}
                    telemetry.record_disconnect("camera uplink stream ended");
                    queue.close();
                    let _ = stop_tx.send(());
                    let _ = cam_worker.join();
                }
                Err(e) if e.code() == tonic::Code::Unimplemented => {
                    tracing::warn!("📸 server does not support StreamCamera – giving up");
                    telemetry.record_disconnect("camera uplink unavailable on server");
                    queue.close();
                    return;
                }
                Err(e) => {
                    telemetry.record_disconnect(format!("camera uplink connect failed: {e}"));
                    tracing::warn!("❌📸 connect failed: {e:?}");
                    delay = app_support::next_delay(delay);
                }
            }
            // Also covers the failure path, where the queue was never closed above.
            queue.close();
            tokio::time::sleep(delay).await;
        }
    }
}

/// Queue settings used by `cam_loop` for the camera uplink.
#[cfg(not(coverage))]
const VIDEO_UPLINK_QUEUE: crate::uplink_fresh_queue::FreshQueueConfig =
    crate::uplink_fresh_queue::FreshQueueConfig {
        capacity: 32,
        max_age: Duration::from_millis(350),
        policy: crate::uplink_fresh_queue::FreshQueuePolicy::LatestOnly,
    };

/// Queue settings used by `voice_loop` for the microphone uplink.
#[cfg(not(coverage))]
const AUDIO_UPLINK_QUEUE: crate::uplink_fresh_queue::FreshQueueConfig =
    crate::uplink_fresh_queue::FreshQueueConfig {
        capacity: 16,
        max_age: Duration::from_millis(400),
        policy: crate::uplink_fresh_queue::FreshQueuePolicy::LatestOnly,
    };

/// Convert a queue depth to `u32`, saturating at `u32::MAX` when it does not fit.
#[cfg(not(coverage))]
fn queue_depth_u32(depth: usize) -> u32 {
    u32::try_from(depth).unwrap_or(u32::MAX)
}

/// Express a duration as fractional milliseconds.
#[cfg(not(coverage))]
fn duration_ms(duration: Duration) -> f32 {
    duration.as_secs_f32() * 1_000.0
}

/// Why an uplink packet was discarded before being streamed.
#[cfg(not(coverage))]
#[derive(Clone, Copy, Debug)]
enum UplinkDropReason {
    /// Reported by the capture workers when `push` returns a non-zero `dropped_queue_full`.
    QueueFull,
    /// Reported by the outbound stream when `pop_fresh` returns a non-zero `dropped_stale`.
    Stale,
}

/// Per-stream state for rate-limiting drop warnings.
#[cfg(not(coverage))]
#[derive(Debug)]
struct UplinkDropLogLimiter {
    /// Stream name used in log fields and messages (e.g. "microphone", "camera").
    stream: &'static str,
    /// Emoji prefix for the warning message.
    icon: &'static str,
    /// When the last WARN was emitted; `None` until the first one.
    last_warn_at: Option<Instant>,
    /// Queue-full drops accumulated since the last WARN.
    suppressed_full: u64,
    /// Stale drops accumulated since the last WARN.
    suppressed_stale: u64,
}

#[cfg(not(coverage))]
/// Aggregate freshness-first upstream drops into periodic warnings per stream.
impl UplinkDropLogLimiter { fn new(stream: &'static str, icon: &'static str) -> Self { Self { stream, icon, last_warn_at: None, suppressed_full: 0, suppressed_stale: 0, } } /// Fold full-queue and stale-packet drops into one periodic warning. fn record(&mut self, reason: UplinkDropReason, count: u64, queue_depth: usize, age_ms: f32) { match reason { UplinkDropReason::QueueFull => { self.suppressed_full = self.suppressed_full.saturating_add(count) } UplinkDropReason::Stale => { self.suppressed_stale = self.suppressed_stale.saturating_add(count) } } let should_warn = self .last_warn_at .map(|last| last.elapsed() >= UPLINK_DROP_WARN_INTERVAL) .unwrap_or(true); if should_warn { warn!( stream = self.stream, dropped_queue_full = self.suppressed_full, dropped_stale = self.suppressed_stale, queue_depth, latest_age_ms = age_ms, "{} upstream {} queue is dropping stale/superseded packets to preserve live A/V sync", self.icon, self.stream ); self.suppressed_full = 0; self.suppressed_stale = 0; self.last_warn_at = Some(Instant::now()); } else { debug!( stream = self.stream, ?reason, count, queue_depth, latest_age_ms = age_ms, "upstream media queue drop suppressed from WARN noise" ); } } } #[cfg(not(coverage))] const UPLINK_DROP_WARN_INTERVAL: Duration = Duration::from_secs(5); #[cfg(not(coverage))] /// Report an upstream queue drop through the shared rate limiter. fn log_uplink_drop( limiter: &Arc>, reason: UplinkDropReason, count: u64, queue_depth: usize, age_ms: f32, ) { if let Ok(mut limiter) = limiter.lock() { limiter.record(reason, count, queue_depth, age_ms); } }