lesavka/client/src/app/uplink_media/webcam_media_loop.rs

374 lines
18 KiB
Rust
Raw Normal View History

impl LesavkaClientApp {
/*──────────────── bundled webcam + mic stream ─────────────────*/
#[cfg(not(coverage))]
#[allow(clippy::too_many_arguments)]
/// Keeps `webcam_media_loop` explicit because it sits on the live uplink path, where stale media must be dropped instead of queued into latency.
/// Inputs are the typed parameters; output is the return value or side effect.
async fn webcam_media_loop(
ep: Channel,
initial_camera_source: Option<String>,
initial_camera_profile: Option<String>,
initial_microphone_source: Option<String>,
camera_cfg: Option<crate::input::camera::CameraConfig>,
camera_telemetry: crate::uplink_telemetry::UplinkTelemetryHandle,
microphone_telemetry: crate::uplink_telemetry::UplinkTelemetryHandle,
media_controls: crate::live_media_control::LiveMediaControls,
) {
let mut delay = Duration::from_secs(1);
let mut startup_epoch_heal_delay = upstream_epoch_auto_heal_delay();
static FAIL_CNT: AtomicUsize = AtomicUsize::new(0);
loop {
let state = media_controls.refresh();
let camera_requested = state.camera;
if !camera_requested {
camera_telemetry.record_enabled(false);
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
}
let active_camera_source = state.camera_source.resolve(initial_camera_source.as_deref());
let active_camera_profile =
state.camera_profile.resolve(initial_camera_profile.as_deref());
let active_microphone_source = state
.microphone_source
.resolve(initial_microphone_source.as_deref());
let capture_profile = active_camera_profile
.as_deref()
.and_then(parse_camera_profile_id);
let use_default_microphone = matches!(
state.microphone_source,
crate::live_media_control::MediaDeviceChoice::Auto
) && active_microphone_source.is_none();
let setup_camera_source = active_camera_source.clone();
let setup_microphone_source = active_microphone_source.clone();
let setup = tokio::task::spawn_blocking(move || {
let microphone = if use_default_microphone {
MicrophoneCapture::new_default_source()
} else {
MicrophoneCapture::new_with_source(setup_microphone_source.as_deref())
}?;
let camera = if camera_requested {
Some(CameraCapture::new_with_capture_profile(
setup_camera_source.as_deref(),
camera_cfg,
capture_profile,
)?)
} else {
None
};
Ok::<_, anyhow::Error>((camera.map(Arc::new), Arc::new(microphone)))
})
.await;
let (camera, microphone) = match setup {
Ok(Ok(captures)) => captures,
Ok(Err(err)) => {
camera_telemetry.record_disconnect(format!(
"bundled webcam media setup failed: {err:#}"
));
microphone_telemetry.record_disconnect(format!(
"bundled webcam media setup failed: {err:#}"
));
warn!(
"📦 bundled webcam media setup failed for camera={:?} mic={:?}: {err:#}",
active_camera_source.as_deref().unwrap_or("auto"),
active_microphone_source.as_deref().unwrap_or("auto")
);
if camera_requested {
abort_if_required_media_source_failed(
"camera",
"📸",
active_camera_source.as_deref(),
&err,
);
}
abort_if_required_media_source_failed(
"microphone",
"🎤",
active_microphone_source.as_deref(),
&err,
);
delay = app_support::next_delay(delay);
tokio::time::sleep(delay).await;
continue;
}
Err(err) => {
camera_telemetry.record_disconnect(format!(
"bundled webcam media setup task failed: {err}"
));
microphone_telemetry.record_disconnect(format!(
"bundled webcam media setup task failed: {err}"
));
warn!("📦 bundled webcam media setup task failed: {err}");
delay = app_support::next_delay(delay);
tokio::time::sleep(delay).await;
continue;
}
};
camera_telemetry.record_reconnect_attempt();
microphone_telemetry.record_reconnect_attempt();
let mut cli = RelayClient::new(ep.clone());
let queue: crate::uplink_fresh_queue::FreshPacketQueue<UpstreamMediaBundle> =
crate::uplink_fresh_queue::FreshPacketQueue::new(BUNDLED_MEDIA_UPLINK_QUEUE);
let drop_log = Arc::new(std::sync::Mutex::new(UplinkDropLogLimiter::new(
"bundled-webcam-media",
"📦",
)));
let queue_stream = queue.clone();
let camera_telemetry_stream = camera_telemetry.clone();
let microphone_telemetry_stream = microphone_telemetry.clone();
let drop_log_stream = Arc::clone(&drop_log);
let outbound = async_stream::stream! {
loop {
let next = queue_stream.pop_fresh().await;
if next.dropped_stale > 0 {
camera_telemetry_stream.record_stale_drop(next.dropped_stale);
microphone_telemetry_stream.record_stale_drop(next.dropped_stale);
log_uplink_drop(
&drop_log_stream,
UplinkDropReason::Stale,
next.dropped_stale,
next.queue_depth,
duration_ms(next.delivery_age),
);
}
if let Some(mut bundle) = next.packet {
let queue_depth = queue_depth_u32(next.queue_depth);
let delivery_age_ms = duration_ms(next.delivery_age);
if bundle.video.is_some() {
camera_telemetry_stream.record_streamed(
queue_depth,
delivery_age_ms,
);
}
if !bundle.audio.is_empty() {
microphone_telemetry_stream.record_streamed(
queue_depth,
delivery_age_ms,
);
}
attach_bundle_queue_metadata(&mut bundle, next.queue_depth, next.delivery_age);
yield bundle;
continue;
}
break;
}
};
match cli.stream_webcam_media(Request::new(outbound)).await {
Ok(mut resp) => {
let stop = Arc::new(AtomicBool::new(false));
let (event_tx, event_rx) = std::sync::mpsc::sync_channel::<BundledCaptureEvent>(
BUNDLED_CAPTURE_EVENT_CHANNEL_CAPACITY,
);
let camera_worker = camera.as_ref().map(|camera| {
let camera = Arc::clone(camera);
let stop = Arc::clone(&stop);
let event_tx = event_tx.clone();
let media_controls = media_controls.clone();
let initial_camera_source = initial_camera_source.clone();
let initial_camera_profile = initial_camera_profile.clone();
let active_camera_source = active_camera_source.clone();
let active_camera_profile = active_camera_profile.clone();
std::thread::spawn(move || {
while !stop.load(Ordering::Relaxed) {
let state = media_controls.refresh();
let desired_source =
state.camera_source.resolve(initial_camera_source.as_deref());
let desired_profile =
state.camera_profile.resolve(initial_camera_profile.as_deref());
if !state.camera
|| desired_source != active_camera_source
|| desired_profile != active_camera_profile
{
stop.store(true, Ordering::Relaxed);
let _ = event_tx.try_send(BundledCaptureEvent::Restart);
break;
}
if let Some(mut pkt) = camera.pull() {
let _ = stamp_video_timing_metadata_at_enqueue(&mut pkt);
match event_tx.try_send(BundledCaptureEvent::Video(pkt)) {
Ok(()) => {}
Err(std::sync::mpsc::TrySendError::Full(_)) => continue,
Err(std::sync::mpsc::TrySendError::Disconnected(_)) => break,
}
}
}
})
});
let microphone_worker = {
let microphone = Arc::clone(&microphone);
let stop = Arc::clone(&stop);
let event_tx = event_tx.clone();
let media_controls = media_controls.clone();
let initial_microphone_source = initial_microphone_source.clone();
let active_microphone_source = active_microphone_source.clone();
let active_camera_requested = camera_requested;
std::thread::spawn(move || {
while !stop.load(Ordering::Relaxed) {
let state = media_controls.refresh();
let desired_source = state
.microphone_source
.resolve(initial_microphone_source.as_deref());
if state.camera != active_camera_requested
|| !(state.microphone || state.camera)
|| desired_source != active_microphone_source
{
stop.store(true, Ordering::Relaxed);
let _ = event_tx.try_send(BundledCaptureEvent::Restart);
break;
}
if let Some(mut pkt) = microphone.pull() {
let _ = stamp_audio_timing_metadata_at_enqueue(&mut pkt);
match event_tx.try_send(BundledCaptureEvent::Audio(pkt)) {
Ok(()) => {}
Err(std::sync::mpsc::TrySendError::Full(_)) => continue,
Err(std::sync::mpsc::TrySendError::Disconnected(_)) => break,
}
}
}
})
};
drop(event_tx);
let bundle_worker = {
let stop = Arc::clone(&stop);
let queue = queue.clone();
let camera_telemetry = camera_telemetry.clone();
let microphone_telemetry = microphone_telemetry.clone();
let drop_log = Arc::clone(&drop_log);
std::thread::spawn(move || {
bundle_captured_media(
event_rx,
stop,
queue,
camera.is_some(),
camera_telemetry,
microphone_telemetry,
drop_log,
);
})
};
delay = Duration::from_secs(1);
camera_telemetry.record_connected();
microphone_telemetry.record_connected();
if let Some(heal_delay) = startup_epoch_heal_delay.take() {
spawn_upstream_epoch_auto_heal(ep.clone(), heal_delay);
}
while resp.get_mut().message().await.transpose().is_some() {}
camera_telemetry.record_disconnect("bundled webcam media stream ended");
microphone_telemetry.record_disconnect("bundled webcam media stream ended");
stop.store(true, Ordering::Relaxed);
queue.close();
if let Some(worker) = camera_worker {
let _ = worker.join();
}
let _ = microphone_worker.join();
let _ = bundle_worker.join();
}
Err(e) if e.code() == tonic::Code::Unimplemented => {
camera_telemetry.record_disconnect("bundled webcam media unavailable on server");
microphone_telemetry
.record_disconnect("bundled webcam media unavailable on server");
warn!("📦 server does not support bundled webcam media retrying");
delay = app_support::next_delay(delay);
}
Err(e) => {
camera_telemetry
.record_disconnect(format!("bundled webcam media connect failed: {e}"));
microphone_telemetry
.record_disconnect(format!("bundled webcam media connect failed: {e}"));
if FAIL_CNT.fetch_add(1, Ordering::Relaxed) == 0 {
warn!("❌📦 bundled webcam media connect failed: {e}");
} else {
debug!("❌📦 bundled webcam media reconnect failed: {e}");
}
delay = app_support::next_delay(delay);
}
}
queue.close();
tokio::time::sleep(delay).await;
}
}
}
const DEFAULT_UPSTREAM_AUTO_HEAL_AFTER_MS: u64 = 3_000;
/// Resolve whether the live bundled uplink should force one startup epoch heal.
///
/// Inputs: `LESAVKA_UPSTREAM_AUTO_HEAL` and
/// `LESAVKA_UPSTREAM_AUTO_HEAL_AFTER_MS`; output is the delay before the heal
/// request, or `None` when disabled. Why: browser/WebRTC consumers can latch a
/// stale UAC epoch on first join, and the safe fix is to retire the mic epoch
/// once the live bundled stream has settled rather than changing calibration.
#[cfg(not(coverage))]
fn upstream_epoch_auto_heal_delay() -> Option<Duration> {
parse_upstream_epoch_auto_heal_delay(
std::env::var("LESAVKA_UPSTREAM_AUTO_HEAL").ok().as_deref(),
std::env::var("LESAVKA_UPSTREAM_AUTO_HEAL_AFTER_MS")
.ok()
.as_deref(),
)
}
/// Parse the live-uplink auto-heal knobs without reading global environment.
///
/// Inputs are optional raw environment values. Output is a bounded delay, or
/// `None` when the operator opted out. Why: the default needs to be safe enough
/// for real calls while tests can pin the behavior without racing process env.
#[cfg(not(coverage))]
fn parse_upstream_epoch_auto_heal_delay(
enabled: Option<&str>,
delay_ms: Option<&str>,
) -> Option<Duration> {
if enabled.is_some_and(disables_upstream_epoch_auto_heal) {
return None;
}
let millis = delay_ms
.and_then(|raw| raw.trim().parse::<u64>().ok())
.unwrap_or(DEFAULT_UPSTREAM_AUTO_HEAL_AFTER_MS);
(millis > 0).then(|| Duration::from_millis(millis))
}
/// Return whether a text env value disables startup A/V epoch healing.
#[cfg(not(coverage))]
fn disables_upstream_epoch_auto_heal(raw: &str) -> bool {
matches!(
raw.trim().to_ascii_lowercase().as_str(),
"0" | "false" | "off" | "no"
)
}
/// Ask the relay to retire the current UAC epoch after the stream is live.
///
/// Inputs: the connected relay channel and a delay. Output is only log
/// telemetry. Why: this mimics the known-good probe side effect that superseded
/// a stale first-join epoch and forced the live client stream to reconnect
/// cleanly without touching USB enumeration or A/V calibration.
#[cfg(not(coverage))]
fn spawn_upstream_epoch_auto_heal(ep: Channel, delay: Duration) {
tokio::spawn(async move {
tokio::time::sleep(delay).await;
let mut client = RelayClient::new(ep);
match client.recover_uac(Request::new(Empty {})).await {
Ok(reply) => {
if reply.into_inner().ok {
info!(
delay_ms = delay.as_millis(),
"📦🩺 automatic upstream A/V epoch heal requested"
);
} else {
warn!("📦🩺 automatic upstream A/V epoch heal was refused by relay");
}
}
Err(err) => warn!(%err, "📦🩺 automatic upstream A/V epoch heal failed"),
}
});
}