296 lines
15 KiB
Rust
296 lines
15 KiB
Rust
|
|
impl LesavkaClientApp {
|
|||
|
|
/*──────────────── bundled webcam + mic stream ─────────────────*/
|
|||
|
|
#[cfg(not(coverage))]
|
|||
|
|
#[allow(clippy::too_many_arguments)]
|
|||
|
|
/// Keeps `webcam_media_loop` explicit because it sits on the live uplink path, where stale media must be dropped instead of queued into latency.
|
|||
|
|
/// Inputs are the typed parameters; output is the return value or side effect.
|
|||
|
|
async fn webcam_media_loop(
|
|||
|
|
ep: Channel,
|
|||
|
|
initial_camera_source: Option<String>,
|
|||
|
|
initial_camera_profile: Option<String>,
|
|||
|
|
initial_microphone_source: Option<String>,
|
|||
|
|
camera_cfg: Option<crate::input::camera::CameraConfig>,
|
|||
|
|
camera_telemetry: crate::uplink_telemetry::UplinkTelemetryHandle,
|
|||
|
|
microphone_telemetry: crate::uplink_telemetry::UplinkTelemetryHandle,
|
|||
|
|
media_controls: crate::live_media_control::LiveMediaControls,
|
|||
|
|
) {
|
|||
|
|
let mut delay = Duration::from_secs(1);
|
|||
|
|
static FAIL_CNT: AtomicUsize = AtomicUsize::new(0);
|
|||
|
|
|
|||
|
|
loop {
|
|||
|
|
let state = media_controls.refresh();
|
|||
|
|
let camera_requested = state.camera;
|
|||
|
|
if !camera_requested {
|
|||
|
|
camera_telemetry.record_enabled(false);
|
|||
|
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
|||
|
|
continue;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
let active_camera_source = state.camera_source.resolve(initial_camera_source.as_deref());
|
|||
|
|
let active_camera_profile =
|
|||
|
|
state.camera_profile.resolve(initial_camera_profile.as_deref());
|
|||
|
|
let active_microphone_source = state
|
|||
|
|
.microphone_source
|
|||
|
|
.resolve(initial_microphone_source.as_deref());
|
|||
|
|
let capture_profile = active_camera_profile
|
|||
|
|
.as_deref()
|
|||
|
|
.and_then(parse_camera_profile_id);
|
|||
|
|
let use_default_microphone = matches!(
|
|||
|
|
state.microphone_source,
|
|||
|
|
crate::live_media_control::MediaDeviceChoice::Auto
|
|||
|
|
) && active_microphone_source.is_none();
|
|||
|
|
let setup_camera_source = active_camera_source.clone();
|
|||
|
|
let setup_microphone_source = active_microphone_source.clone();
|
|||
|
|
|
|||
|
|
let setup = tokio::task::spawn_blocking(move || {
|
|||
|
|
let microphone = if use_default_microphone {
|
|||
|
|
MicrophoneCapture::new_default_source()
|
|||
|
|
} else {
|
|||
|
|
MicrophoneCapture::new_with_source(setup_microphone_source.as_deref())
|
|||
|
|
}?;
|
|||
|
|
let camera = if camera_requested {
|
|||
|
|
Some(CameraCapture::new_with_capture_profile(
|
|||
|
|
setup_camera_source.as_deref(),
|
|||
|
|
camera_cfg,
|
|||
|
|
capture_profile,
|
|||
|
|
)?)
|
|||
|
|
} else {
|
|||
|
|
None
|
|||
|
|
};
|
|||
|
|
Ok::<_, anyhow::Error>((camera.map(Arc::new), Arc::new(microphone)))
|
|||
|
|
})
|
|||
|
|
.await;
|
|||
|
|
|
|||
|
|
let (camera, microphone) = match setup {
|
|||
|
|
Ok(Ok(captures)) => captures,
|
|||
|
|
Ok(Err(err)) => {
|
|||
|
|
camera_telemetry.record_disconnect(format!(
|
|||
|
|
"bundled webcam media setup failed: {err:#}"
|
|||
|
|
));
|
|||
|
|
microphone_telemetry.record_disconnect(format!(
|
|||
|
|
"bundled webcam media setup failed: {err:#}"
|
|||
|
|
));
|
|||
|
|
warn!(
|
|||
|
|
"📦 bundled webcam media setup failed for camera={:?} mic={:?}: {err:#}",
|
|||
|
|
active_camera_source.as_deref().unwrap_or("auto"),
|
|||
|
|
active_microphone_source.as_deref().unwrap_or("auto")
|
|||
|
|
);
|
|||
|
|
if camera_requested {
|
|||
|
|
abort_if_required_media_source_failed(
|
|||
|
|
"camera",
|
|||
|
|
"📸",
|
|||
|
|
active_camera_source.as_deref(),
|
|||
|
|
&err,
|
|||
|
|
);
|
|||
|
|
}
|
|||
|
|
abort_if_required_media_source_failed(
|
|||
|
|
"microphone",
|
|||
|
|
"🎤",
|
|||
|
|
active_microphone_source.as_deref(),
|
|||
|
|
&err,
|
|||
|
|
);
|
|||
|
|
delay = app_support::next_delay(delay);
|
|||
|
|
tokio::time::sleep(delay).await;
|
|||
|
|
continue;
|
|||
|
|
}
|
|||
|
|
Err(err) => {
|
|||
|
|
camera_telemetry.record_disconnect(format!(
|
|||
|
|
"bundled webcam media setup task failed: {err}"
|
|||
|
|
));
|
|||
|
|
microphone_telemetry.record_disconnect(format!(
|
|||
|
|
"bundled webcam media setup task failed: {err}"
|
|||
|
|
));
|
|||
|
|
warn!("📦 bundled webcam media setup task failed: {err}");
|
|||
|
|
delay = app_support::next_delay(delay);
|
|||
|
|
tokio::time::sleep(delay).await;
|
|||
|
|
continue;
|
|||
|
|
}
|
|||
|
|
};
|
|||
|
|
|
|||
|
|
camera_telemetry.record_reconnect_attempt();
|
|||
|
|
microphone_telemetry.record_reconnect_attempt();
|
|||
|
|
let mut cli = RelayClient::new(ep.clone());
|
|||
|
|
let queue: crate::uplink_fresh_queue::FreshPacketQueue<UpstreamMediaBundle> =
|
|||
|
|
crate::uplink_fresh_queue::FreshPacketQueue::new(BUNDLED_MEDIA_UPLINK_QUEUE);
|
|||
|
|
let drop_log = Arc::new(std::sync::Mutex::new(UplinkDropLogLimiter::new(
|
|||
|
|
"bundled-webcam-media",
|
|||
|
|
"📦",
|
|||
|
|
)));
|
|||
|
|
|
|||
|
|
let queue_stream = queue.clone();
|
|||
|
|
let camera_telemetry_stream = camera_telemetry.clone();
|
|||
|
|
let microphone_telemetry_stream = microphone_telemetry.clone();
|
|||
|
|
let drop_log_stream = Arc::clone(&drop_log);
|
|||
|
|
let outbound = async_stream::stream! {
|
|||
|
|
loop {
|
|||
|
|
let next = queue_stream.pop_fresh().await;
|
|||
|
|
if next.dropped_stale > 0 {
|
|||
|
|
camera_telemetry_stream.record_stale_drop(next.dropped_stale);
|
|||
|
|
microphone_telemetry_stream.record_stale_drop(next.dropped_stale);
|
|||
|
|
log_uplink_drop(
|
|||
|
|
&drop_log_stream,
|
|||
|
|
UplinkDropReason::Stale,
|
|||
|
|
next.dropped_stale,
|
|||
|
|
next.queue_depth,
|
|||
|
|
duration_ms(next.delivery_age),
|
|||
|
|
);
|
|||
|
|
}
|
|||
|
|
if let Some(mut bundle) = next.packet {
|
|||
|
|
let queue_depth = queue_depth_u32(next.queue_depth);
|
|||
|
|
let delivery_age_ms = duration_ms(next.delivery_age);
|
|||
|
|
if bundle.video.is_some() {
|
|||
|
|
camera_telemetry_stream.record_streamed(
|
|||
|
|
queue_depth,
|
|||
|
|
delivery_age_ms,
|
|||
|
|
);
|
|||
|
|
}
|
|||
|
|
if !bundle.audio.is_empty() {
|
|||
|
|
microphone_telemetry_stream.record_streamed(
|
|||
|
|
queue_depth,
|
|||
|
|
delivery_age_ms,
|
|||
|
|
);
|
|||
|
|
}
|
|||
|
|
attach_bundle_queue_metadata(&mut bundle, next.queue_depth, next.delivery_age);
|
|||
|
|
yield bundle;
|
|||
|
|
continue;
|
|||
|
|
}
|
|||
|
|
break;
|
|||
|
|
}
|
|||
|
|
};
|
|||
|
|
|
|||
|
|
match cli.stream_webcam_media(Request::new(outbound)).await {
|
|||
|
|
Ok(mut resp) => {
|
|||
|
|
let stop = Arc::new(AtomicBool::new(false));
|
|||
|
|
let (event_tx, event_rx) = std::sync::mpsc::sync_channel::<BundledCaptureEvent>(
|
|||
|
|
BUNDLED_CAPTURE_EVENT_CHANNEL_CAPACITY,
|
|||
|
|
);
|
|||
|
|
let camera_worker = camera.as_ref().map(|camera| {
|
|||
|
|
let camera = Arc::clone(camera);
|
|||
|
|
let stop = Arc::clone(&stop);
|
|||
|
|
let event_tx = event_tx.clone();
|
|||
|
|
let media_controls = media_controls.clone();
|
|||
|
|
let initial_camera_source = initial_camera_source.clone();
|
|||
|
|
let initial_camera_profile = initial_camera_profile.clone();
|
|||
|
|
let active_camera_source = active_camera_source.clone();
|
|||
|
|
let active_camera_profile = active_camera_profile.clone();
|
|||
|
|
std::thread::spawn(move || {
|
|||
|
|
while !stop.load(Ordering::Relaxed) {
|
|||
|
|
let state = media_controls.refresh();
|
|||
|
|
let desired_source =
|
|||
|
|
state.camera_source.resolve(initial_camera_source.as_deref());
|
|||
|
|
let desired_profile =
|
|||
|
|
state.camera_profile.resolve(initial_camera_profile.as_deref());
|
|||
|
|
if !state.camera
|
|||
|
|
|| desired_source != active_camera_source
|
|||
|
|
|| desired_profile != active_camera_profile
|
|||
|
|
{
|
|||
|
|
stop.store(true, Ordering::Relaxed);
|
|||
|
|
let _ = event_tx.try_send(BundledCaptureEvent::Restart);
|
|||
|
|
break;
|
|||
|
|
}
|
|||
|
|
if let Some(mut pkt) = camera.pull() {
|
|||
|
|
let _ = stamp_video_timing_metadata_at_enqueue(&mut pkt);
|
|||
|
|
match event_tx.try_send(BundledCaptureEvent::Video(pkt)) {
|
|||
|
|
Ok(()) => {}
|
|||
|
|
Err(std::sync::mpsc::TrySendError::Full(_)) => continue,
|
|||
|
|
Err(std::sync::mpsc::TrySendError::Disconnected(_)) => break,
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
})
|
|||
|
|
});
|
|||
|
|
let microphone_worker = {
|
|||
|
|
let microphone = Arc::clone(µphone);
|
|||
|
|
let stop = Arc::clone(&stop);
|
|||
|
|
let event_tx = event_tx.clone();
|
|||
|
|
let media_controls = media_controls.clone();
|
|||
|
|
let initial_microphone_source = initial_microphone_source.clone();
|
|||
|
|
let active_microphone_source = active_microphone_source.clone();
|
|||
|
|
let active_camera_requested = camera_requested;
|
|||
|
|
std::thread::spawn(move || {
|
|||
|
|
while !stop.load(Ordering::Relaxed) {
|
|||
|
|
let state = media_controls.refresh();
|
|||
|
|
let desired_source = state
|
|||
|
|
.microphone_source
|
|||
|
|
.resolve(initial_microphone_source.as_deref());
|
|||
|
|
if state.camera != active_camera_requested
|
|||
|
|
|| !(state.microphone || state.camera)
|
|||
|
|
|| desired_source != active_microphone_source
|
|||
|
|
{
|
|||
|
|
stop.store(true, Ordering::Relaxed);
|
|||
|
|
let _ = event_tx.try_send(BundledCaptureEvent::Restart);
|
|||
|
|
break;
|
|||
|
|
}
|
|||
|
|
if let Some(mut pkt) = microphone.pull() {
|
|||
|
|
let _ = stamp_audio_timing_metadata_at_enqueue(&mut pkt);
|
|||
|
|
match event_tx.try_send(BundledCaptureEvent::Audio(pkt)) {
|
|||
|
|
Ok(()) => {}
|
|||
|
|
Err(std::sync::mpsc::TrySendError::Full(_)) => continue,
|
|||
|
|
Err(std::sync::mpsc::TrySendError::Disconnected(_)) => break,
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
})
|
|||
|
|
};
|
|||
|
|
drop(event_tx);
|
|||
|
|
|
|||
|
|
let bundle_worker = {
|
|||
|
|
let stop = Arc::clone(&stop);
|
|||
|
|
let queue = queue.clone();
|
|||
|
|
let camera_telemetry = camera_telemetry.clone();
|
|||
|
|
let microphone_telemetry = microphone_telemetry.clone();
|
|||
|
|
let drop_log = Arc::clone(&drop_log);
|
|||
|
|
std::thread::spawn(move || {
|
|||
|
|
bundle_captured_media(
|
|||
|
|
event_rx,
|
|||
|
|
stop,
|
|||
|
|
queue,
|
|||
|
|
camera.is_some(),
|
|||
|
|
camera_telemetry,
|
|||
|
|
microphone_telemetry,
|
|||
|
|
drop_log,
|
|||
|
|
);
|
|||
|
|
})
|
|||
|
|
};
|
|||
|
|
|
|||
|
|
delay = Duration::from_secs(1);
|
|||
|
|
camera_telemetry.record_connected();
|
|||
|
|
microphone_telemetry.record_connected();
|
|||
|
|
while resp.get_mut().message().await.transpose().is_some() {}
|
|||
|
|
camera_telemetry.record_disconnect("bundled webcam media stream ended");
|
|||
|
|
microphone_telemetry.record_disconnect("bundled webcam media stream ended");
|
|||
|
|
stop.store(true, Ordering::Relaxed);
|
|||
|
|
queue.close();
|
|||
|
|
if let Some(worker) = camera_worker {
|
|||
|
|
let _ = worker.join();
|
|||
|
|
}
|
|||
|
|
let _ = microphone_worker.join();
|
|||
|
|
let _ = bundle_worker.join();
|
|||
|
|
}
|
|||
|
|
Err(e) if e.code() == tonic::Code::Unimplemented => {
|
|||
|
|
camera_telemetry.record_disconnect("bundled webcam media unavailable on server");
|
|||
|
|
microphone_telemetry
|
|||
|
|
.record_disconnect("bundled webcam media unavailable on server");
|
|||
|
|
warn!("📦 server does not support bundled webcam media – retrying");
|
|||
|
|
delay = app_support::next_delay(delay);
|
|||
|
|
}
|
|||
|
|
Err(e) => {
|
|||
|
|
camera_telemetry
|
|||
|
|
.record_disconnect(format!("bundled webcam media connect failed: {e}"));
|
|||
|
|
microphone_telemetry
|
|||
|
|
.record_disconnect(format!("bundled webcam media connect failed: {e}"));
|
|||
|
|
if FAIL_CNT.fetch_add(1, Ordering::Relaxed) == 0 {
|
|||
|
|
warn!("❌📦 bundled webcam media connect failed: {e}");
|
|||
|
|
} else {
|
|||
|
|
debug!("❌📦 bundled webcam media reconnect failed: {e}");
|
|||
|
|
}
|
|||
|
|
delay = app_support::next_delay(delay);
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
queue.close();
|
|||
|
|
tokio::time::sleep(delay).await;
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|