2026-04-23 07:00:06 -03:00
|
|
|
|
impl LesavkaClientApp {
|
|
|
|
|
|
/*──────────────── mic stream ─────────────────*/
|
|
|
|
|
|
#[cfg(not(coverage))]
|
2026-04-24 00:30:07 -03:00
|
|
|
|
async fn voice_loop(
|
|
|
|
|
|
ep: Channel,
|
|
|
|
|
|
mic: Arc<MicrophoneCapture>,
|
|
|
|
|
|
telemetry: crate::uplink_telemetry::UplinkTelemetryHandle,
|
|
|
|
|
|
) {
|
2026-04-23 07:00:06 -03:00
|
|
|
|
let mut delay = Duration::from_secs(1);
|
|
|
|
|
|
static FAIL_CNT: AtomicUsize = AtomicUsize::new(0);
|
2026-04-24 00:30:07 -03:00
|
|
|
|
|
2026-04-23 07:00:06 -03:00
|
|
|
|
loop {
|
2026-04-24 00:30:07 -03:00
|
|
|
|
telemetry.record_reconnect_attempt();
|
2026-04-23 07:00:06 -03:00
|
|
|
|
let mut cli = RelayClient::new(ep.clone());
|
2026-04-24 00:30:07 -03:00
|
|
|
|
let queue = crate::uplink_fresh_queue::FreshPacketQueue::new(AUDIO_UPLINK_QUEUE);
|
2026-04-23 07:00:06 -03:00
|
|
|
|
|
2026-04-24 00:30:07 -03:00
|
|
|
|
let queue_stream = queue.clone();
|
|
|
|
|
|
let telemetry_stream = telemetry.clone();
|
|
|
|
|
|
let outbound = async_stream::stream! {
|
|
|
|
|
|
loop {
|
|
|
|
|
|
let next = queue_stream.pop_fresh().await;
|
|
|
|
|
|
if next.dropped_stale > 0 {
|
|
|
|
|
|
telemetry_stream.record_stale_drop(next.dropped_stale);
|
2026-04-27 10:24:53 -03:00
|
|
|
|
warn!(
|
|
|
|
|
|
dropped_stale = next.dropped_stale,
|
|
|
|
|
|
queue_depth = next.queue_depth,
|
|
|
|
|
|
"🎤 upstream microphone queue dropped stale packets"
|
|
|
|
|
|
);
|
2026-04-24 00:30:07 -03:00
|
|
|
|
}
|
|
|
|
|
|
if let Some(packet) = next.packet {
|
|
|
|
|
|
telemetry_stream.record_streamed(
|
|
|
|
|
|
queue_depth_u32(next.queue_depth),
|
|
|
|
|
|
duration_ms(next.delivery_age),
|
|
|
|
|
|
);
|
|
|
|
|
|
yield packet;
|
|
|
|
|
|
continue;
|
|
|
|
|
|
}
|
|
|
|
|
|
break;
|
|
|
|
|
|
}
|
|
|
|
|
|
};
|
|
|
|
|
|
|
2026-04-23 07:00:06 -03:00
|
|
|
|
match cli.stream_microphone(Request::new(outbound)).await {
|
2026-04-24 00:30:07 -03:00
|
|
|
|
Ok(mut resp) => {
|
2026-04-27 07:16:08 -03:00
|
|
|
|
let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>();
|
|
|
|
|
|
let mic_clone = mic.clone();
|
|
|
|
|
|
let telemetry_thread = telemetry.clone();
|
|
|
|
|
|
let queue_thread = queue.clone();
|
|
|
|
|
|
let mic_worker = std::thread::spawn(move || {
|
|
|
|
|
|
while stop_rx.try_recv().is_err() {
|
|
|
|
|
|
if let Some(pkt) = mic_clone.pull() {
|
|
|
|
|
|
trace!("🎤📤 cli {} bytes → gRPC", pkt.data.len());
|
|
|
|
|
|
let enqueue_age = crate::live_capture_clock::packet_age(pkt.pts);
|
|
|
|
|
|
let stats = queue_thread.push(pkt, enqueue_age);
|
|
|
|
|
|
if stats.dropped_queue_full > 0 {
|
|
|
|
|
|
telemetry_thread.record_queue_full_drop(stats.dropped_queue_full);
|
2026-04-27 10:24:53 -03:00
|
|
|
|
warn!(
|
|
|
|
|
|
dropped_queue_full = stats.dropped_queue_full,
|
|
|
|
|
|
queue_depth = stats.queue_depth,
|
|
|
|
|
|
enqueue_age_ms = duration_ms(enqueue_age),
|
|
|
|
|
|
"🎤 upstream microphone queue dropped the oldest packet because it was full"
|
|
|
|
|
|
);
|
2026-04-27 07:16:08 -03:00
|
|
|
|
}
|
|
|
|
|
|
telemetry_thread.record_enqueue(
|
|
|
|
|
|
queue_depth_u32(stats.queue_depth),
|
|
|
|
|
|
duration_ms(enqueue_age),
|
|
|
|
|
|
0.0,
|
|
|
|
|
|
);
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
});
|
2026-04-24 00:30:07 -03:00
|
|
|
|
delay = Duration::from_secs(1);
|
|
|
|
|
|
telemetry.record_connected();
|
|
|
|
|
|
while resp.get_mut().message().await.transpose().is_some() {}
|
|
|
|
|
|
telemetry.record_disconnect("microphone uplink stream ended");
|
2026-04-27 07:16:08 -03:00
|
|
|
|
queue.close();
|
|
|
|
|
|
let _ = stop_tx.send(());
|
|
|
|
|
|
let _ = mic_worker.join();
|
2026-04-24 00:30:07 -03:00
|
|
|
|
}
|
2026-04-23 07:00:06 -03:00
|
|
|
|
Err(e) => {
|
2026-04-24 00:30:07 -03:00
|
|
|
|
telemetry.record_disconnect(format!("microphone uplink connect failed: {e}"));
|
2026-04-23 07:00:06 -03:00
|
|
|
|
if FAIL_CNT.fetch_add(1, Ordering::Relaxed) == 0 {
|
|
|
|
|
|
warn!("❌🎤 connect failed: {e}");
|
|
|
|
|
|
warn!("⚠️🎤 further microphone‑stream failures will be logged at DEBUG");
|
|
|
|
|
|
} else {
|
|
|
|
|
|
debug!("❌🎤 reconnect failed: {e}");
|
|
|
|
|
|
}
|
|
|
|
|
|
delay = app_support::next_delay(delay);
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
2026-04-24 00:30:07 -03:00
|
|
|
|
|
|
|
|
|
|
queue.close();
|
2026-04-23 07:00:06 -03:00
|
|
|
|
tokio::time::sleep(delay).await;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/*──────────────── cam stream ───────────────────*/
|
|
|
|
|
|
#[cfg(not(coverage))]
|
2026-04-24 00:30:07 -03:00
|
|
|
|
async fn cam_loop(
|
|
|
|
|
|
ep: Channel,
|
|
|
|
|
|
cam: Arc<CameraCapture>,
|
|
|
|
|
|
telemetry: crate::uplink_telemetry::UplinkTelemetryHandle,
|
|
|
|
|
|
) {
|
2026-04-23 07:00:06 -03:00
|
|
|
|
let mut delay = Duration::from_secs(1);
|
2026-04-24 00:30:07 -03:00
|
|
|
|
|
2026-04-23 07:00:06 -03:00
|
|
|
|
loop {
|
2026-04-24 00:30:07 -03:00
|
|
|
|
telemetry.record_reconnect_attempt();
|
2026-04-23 07:00:06 -03:00
|
|
|
|
let mut cli = RelayClient::new(ep.clone());
|
2026-04-24 00:30:07 -03:00
|
|
|
|
let queue = crate::uplink_fresh_queue::FreshPacketQueue::new(VIDEO_UPLINK_QUEUE);
|
|
|
|
|
|
|
|
|
|
|
|
let queue_stream = queue.clone();
|
|
|
|
|
|
let telemetry_stream = telemetry.clone();
|
|
|
|
|
|
let outbound = async_stream::stream! {
|
|
|
|
|
|
loop {
|
|
|
|
|
|
let next = queue_stream.pop_fresh().await;
|
|
|
|
|
|
if next.dropped_stale > 0 {
|
|
|
|
|
|
telemetry_stream.record_stale_drop(next.dropped_stale);
|
2026-04-27 10:24:53 -03:00
|
|
|
|
warn!(
|
|
|
|
|
|
dropped_stale = next.dropped_stale,
|
|
|
|
|
|
queue_depth = next.queue_depth,
|
|
|
|
|
|
"📸 upstream camera queue dropped stale packets"
|
|
|
|
|
|
);
|
2026-04-23 07:00:06 -03:00
|
|
|
|
}
|
2026-04-24 00:30:07 -03:00
|
|
|
|
if let Some(packet) = next.packet {
|
|
|
|
|
|
telemetry_stream.record_streamed(
|
|
|
|
|
|
queue_depth_u32(next.queue_depth),
|
|
|
|
|
|
duration_ms(next.delivery_age),
|
|
|
|
|
|
);
|
|
|
|
|
|
yield packet;
|
|
|
|
|
|
continue;
|
2026-04-23 07:00:06 -03:00
|
|
|
|
}
|
2026-04-24 00:30:07 -03:00
|
|
|
|
break;
|
2026-04-23 07:00:06 -03:00
|
|
|
|
}
|
2026-04-24 00:30:07 -03:00
|
|
|
|
};
|
2026-04-23 07:00:06 -03:00
|
|
|
|
|
|
|
|
|
|
match cli.stream_camera(Request::new(outbound)).await {
|
|
|
|
|
|
Ok(mut resp) => {
|
2026-04-27 07:16:08 -03:00
|
|
|
|
let (stop_tx, stop_rx) = std::sync::mpsc::channel::<()>();
|
|
|
|
|
|
let cam_worker = std::thread::spawn({
|
|
|
|
|
|
let cam = cam.clone();
|
|
|
|
|
|
let telemetry = telemetry.clone();
|
|
|
|
|
|
let queue = queue.clone();
|
|
|
|
|
|
move || loop {
|
|
|
|
|
|
if stop_rx.try_recv().is_ok() {
|
|
|
|
|
|
break;
|
|
|
|
|
|
}
|
|
|
|
|
|
let Some(pkt) = cam.pull() else {
|
|
|
|
|
|
std::thread::sleep(Duration::from_millis(5));
|
|
|
|
|
|
continue;
|
|
|
|
|
|
};
|
|
|
|
|
|
static CNT: std::sync::atomic::AtomicU64 =
|
|
|
|
|
|
std::sync::atomic::AtomicU64::new(0);
|
|
|
|
|
|
let n = CNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
|
|
|
|
|
if n < 10 || n.is_multiple_of(120) {
|
|
|
|
|
|
tracing::trace!("📸 cli frame#{n} {} B", pkt.data.len());
|
|
|
|
|
|
}
|
|
|
|
|
|
tracing::trace!("📸⬆️ sent webcam AU pts={} {} B", pkt.pts, pkt.data.len());
|
|
|
|
|
|
let enqueue_age = crate::live_capture_clock::packet_age(pkt.pts);
|
|
|
|
|
|
let stats = queue.push(pkt, enqueue_age);
|
|
|
|
|
|
if stats.dropped_queue_full > 0 {
|
|
|
|
|
|
telemetry.record_queue_full_drop(stats.dropped_queue_full);
|
2026-04-27 10:24:53 -03:00
|
|
|
|
warn!(
|
|
|
|
|
|
dropped_queue_full = stats.dropped_queue_full,
|
|
|
|
|
|
queue_depth = stats.queue_depth,
|
|
|
|
|
|
enqueue_age_ms = duration_ms(enqueue_age),
|
|
|
|
|
|
"📸 upstream camera queue dropped the oldest frame because it was full"
|
|
|
|
|
|
);
|
2026-04-27 07:16:08 -03:00
|
|
|
|
}
|
|
|
|
|
|
telemetry.record_enqueue(
|
|
|
|
|
|
queue_depth_u32(stats.queue_depth),
|
|
|
|
|
|
duration_ms(enqueue_age),
|
|
|
|
|
|
0.0,
|
|
|
|
|
|
);
|
|
|
|
|
|
}
|
|
|
|
|
|
});
|
2026-04-24 00:30:07 -03:00
|
|
|
|
delay = Duration::from_secs(1);
|
|
|
|
|
|
telemetry.record_connected();
|
2026-04-23 07:00:06 -03:00
|
|
|
|
while resp.get_mut().message().await.transpose().is_some() {}
|
2026-04-24 00:30:07 -03:00
|
|
|
|
telemetry.record_disconnect("camera uplink stream ended");
|
2026-04-27 07:16:08 -03:00
|
|
|
|
queue.close();
|
|
|
|
|
|
let _ = stop_tx.send(());
|
|
|
|
|
|
let _ = cam_worker.join();
|
2026-04-23 07:00:06 -03:00
|
|
|
|
}
|
|
|
|
|
|
Err(e) if e.code() == tonic::Code::Unimplemented => {
|
|
|
|
|
|
tracing::warn!("📸 server does not support StreamCamera – giving up");
|
2026-04-24 00:30:07 -03:00
|
|
|
|
telemetry.record_disconnect("camera uplink unavailable on server");
|
|
|
|
|
|
queue.close();
|
|
|
|
|
|
return;
|
2026-04-23 07:00:06 -03:00
|
|
|
|
}
|
|
|
|
|
|
Err(e) => {
|
2026-04-24 00:30:07 -03:00
|
|
|
|
telemetry.record_disconnect(format!("camera uplink connect failed: {e}"));
|
2026-04-23 07:00:06 -03:00
|
|
|
|
tracing::warn!("❌📸 connect failed: {e:?}");
|
2026-04-24 00:30:07 -03:00
|
|
|
|
delay = app_support::next_delay(delay);
|
2026-04-23 07:00:06 -03:00
|
|
|
|
}
|
|
|
|
|
|
}
|
2026-04-24 00:30:07 -03:00
|
|
|
|
|
|
|
|
|
|
queue.close();
|
2026-04-23 07:00:06 -03:00
|
|
|
|
tokio::time::sleep(delay).await;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
2026-04-24 00:30:07 -03:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
#[cfg(not(coverage))]
|
|
|
|
|
|
const VIDEO_UPLINK_QUEUE: crate::uplink_fresh_queue::FreshQueueConfig =
|
|
|
|
|
|
crate::uplink_fresh_queue::FreshQueueConfig {
|
2026-04-27 07:16:08 -03:00
|
|
|
|
capacity: 32,
|
|
|
|
|
|
max_age: Duration::from_secs(1),
|
2026-04-24 00:30:07 -03:00
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
#[cfg(not(coverage))]
|
|
|
|
|
|
const AUDIO_UPLINK_QUEUE: crate::uplink_fresh_queue::FreshQueueConfig =
|
|
|
|
|
|
crate::uplink_fresh_queue::FreshQueueConfig {
|
|
|
|
|
|
capacity: 16,
|
|
|
|
|
|
max_age: Duration::from_millis(400),
|
|
|
|
|
|
};
|
2026-04-23 07:00:06 -03:00
|
|
|
|
|
2026-04-24 00:30:07 -03:00
|
|
|
|
#[cfg(not(coverage))]
|
|
|
|
|
|
fn queue_depth_u32(depth: usize) -> u32 {
|
|
|
|
|
|
depth.try_into().unwrap_or(u32::MAX)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
#[cfg(not(coverage))]
|
|
|
|
|
|
fn duration_ms(duration: Duration) -> f32 {
|
|
|
|
|
|
duration.as_secs_f32() * 1_000.0
|
|
|
|
|
|
}
|