impl LesavkaClientApp { /*──────────────── mic stream ─────────────────*/ #[cfg(not(coverage))] async fn voice_loop(ep: Channel, mic: Arc) { let mut delay = Duration::from_secs(1); static FAIL_CNT: AtomicUsize = AtomicUsize::new(0); loop { let mut cli = RelayClient::new(ep.clone()); // 1. create a Tokio MPSC channel let (tx, rx) = tokio::sync::mpsc::channel::(256); let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>(); // 2. spawn a real thread that does the blocking `pull()` let mic_clone = mic.clone(); std::thread::spawn(move || { while stop_rx.try_recv().is_err() { if let Some(pkt) = mic_clone.pull() { trace!("🎤📤 cli {} bytes → gRPC", pkt.data.len()); let _ = tx.blocking_send(pkt); } } }); // 3. turn `rx` into an async stream for gRPC let outbound = tokio_stream::wrappers::ReceiverStream::new(rx); match cli.stream_microphone(Request::new(outbound)).await { Ok(mut resp) => while resp.get_mut().message().await.transpose().is_some() {}, Err(e) => { // first failure → warn, subsequent ones → debug if FAIL_CNT.fetch_add(1, Ordering::Relaxed) == 0 { warn!("❌🎤 connect failed: {e}"); warn!("⚠️🎤 further microphone‑stream failures will be logged at DEBUG"); } else { debug!("❌🎤 reconnect failed: {e}"); } delay = app_support::next_delay(delay); } } let _ = stop_tx.send(()); tokio::time::sleep(delay).await; } } /*──────────────── cam stream ───────────────────*/ #[cfg(not(coverage))] async fn cam_loop(ep: Channel, cam: Arc) { let mut delay = Duration::from_secs(1); loop { let mut cli = RelayClient::new(ep.clone()); let (tx, rx) = tokio::sync::mpsc::channel::(256); let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>(); let cam_worker = std::thread::spawn({ let cam = cam.clone(); move || loop { if stop_rx.try_recv().is_ok() { break; } let Some(pkt) = cam.pull() else { std::thread::sleep(Duration::from_millis(5)); continue; }; // TRACE every 120 frames only static CNT: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); if n < 10 || n.is_multiple_of(120) { tracing::trace!("📸 cli frame#{n} {} B", pkt.data.len()); } tracing::trace!("📸⬆️ sent webcam AU pts={} {} B", pkt.pts, pkt.data.len()); if tx.blocking_send(pkt).is_err() { break; } } }); let outbound = tokio_stream::wrappers::ReceiverStream::new(rx); match cli.stream_camera(Request::new(outbound)).await { Ok(mut resp) => { delay = Duration::from_secs(1); // got a stream → reset while resp.get_mut().message().await.transpose().is_some() {} } Err(e) if e.code() == tonic::Code::Unimplemented => { tracing::warn!("📸 server does not support StreamCamera – giving up"); let _ = stop_tx.send(()); let _ = cam_worker.join(); return; // stop the task completely (#3) } Err(e) => { tracing::warn!("❌📸 connect failed: {e:?}"); delay = app_support::next_delay(delay); // back-off (#2) } } let _ = stop_tx.send(()); let _ = cam_worker.join(); tokio::time::sleep(delay).await; } } }