lesavka/client/src/app/uplink_media/webcam_media_loop.rs

493 lines
24 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

impl LesavkaClientApp {
/*──────────────── bundled webcam + mic stream ─────────────────*/
#[cfg(not(coverage))]
#[allow(clippy::too_many_arguments)]
/// Run the bundled webcam + microphone uplink; the loop has no `break` or `return`.
///
/// Each outer-loop iteration:
/// 1. refreshes `media_controls` and idles (100 ms poll) while the camera is disabled;
/// 2. builds the microphone and camera captures inside a `spawn_blocking` task;
/// 3. opens `stream_webcam_media` on a fresh `RelayClient`, fed by an outbound
///    stream that pops from a `FreshPacketQueue`;
/// 4. spawns camera, microphone and bundling threads, then drains the server
///    response stream until it ends;
/// 5. stops and joins the workers, closes the queue, sleeps `delay`, and retries.
///
/// Inputs: `ep` is the relay channel; the `initial_*` values are the fallbacks
/// handed to the live `resolve` calls; `camera_cfg` is the base camera config
/// whose codec may be overridden live; the two telemetry handles receive the
/// connect/disconnect/drop/stream records; `media_controls` supplies live state.
/// Output: none, only side effects.
///
/// Why it stays explicit: it sits on the live uplink path, where stale media
/// must be dropped instead of queued into latency.
async fn webcam_media_loop(
ep: Channel,
initial_camera_source: Option<String>,
initial_camera_profile: Option<String>,
initial_microphone_source: Option<String>,
camera_cfg: Option<crate::input::camera::CameraConfig>,
camera_telemetry: crate::uplink_telemetry::UplinkTelemetryHandle,
microphone_telemetry: crate::uplink_telemetry::UplinkTelemetryHandle,
media_controls: crate::live_media_control::LiveMediaControls,
) {
// Retry back-off: grown with `app_support::next_delay` on every failure and
// reset to one second once a stream has been opened successfully.
let mut delay = Duration::from_secs(1);
// Consumed with `.take()` on the first successful connect, so the auto-heal
// task is spawned at most once per call to this function.
let mut startup_epoch_heal_delay = upstream_epoch_auto_heal_delay();
// `static`, so shared by every call: only the very first connect failure is
// logged at `warn`, later ones at `debug`. It is never reset here.
static FAIL_CNT: AtomicUsize = AtomicUsize::new(0);
loop {
let state = media_controls.refresh();
let camera_requested = state.camera;
if !camera_requested {
// Camera is off: report it and poll the controls again in 100 ms.
camera_telemetry.record_enabled(false);
tokio::time::sleep(Duration::from_millis(100)).await;
continue;
}
// Snapshot of the settings this session is built from. The worker threads
// below compare live state against these values to detect a change.
let active_camera_source = state.camera_source.resolve(initial_camera_source.as_deref());
let active_camera_profile =
state.camera_profile.resolve(initial_camera_profile.as_deref());
let active_camera_cfg = camera_config_with_live_codec(camera_cfg, &state.camera_codec);
let active_camera_codec = active_camera_cfg.map(|cfg| cfg.codec);
let recover_hevc_after_drops = upstream_camera_uses_hevc(active_camera_cfg);
let active_microphone_source = state
.microphone_source
.resolve(initial_microphone_source.as_deref());
let active_audio_codec = state
.audio_codec
.resolve(lesavka_common::audio_transport::UpstreamAudioCodec::Opus);
let active_noise_suppression = state.noise_suppression.resolve(false);
let capture_profile = active_camera_profile
.as_deref()
.and_then(parse_camera_profile_id);
// The default-source constructor is used only when the choice is `Auto`
// and nothing resolved to a concrete source name.
let use_default_microphone = matches!(
state.microphone_source,
crate::live_media_control::MediaDeviceChoice::Auto
) && active_microphone_source.is_none();
// Clones moved into the blocking closure; the originals are still needed
// for logging and for the worker threads.
let setup_camera_source = active_camera_source.clone();
let setup_microphone_source = active_microphone_source.clone();
// Capture construction runs on tokio's blocking pool rather than on the
// async task itself.
let setup = tokio::task::spawn_blocking(move || {
let microphone = if use_default_microphone {
MicrophoneCapture::new_default_source_options(
active_audio_codec,
active_noise_suppression,
)
} else {
MicrophoneCapture::new_with_source_options(
setup_microphone_source.as_deref(),
active_audio_codec,
active_noise_suppression,
)
}?;
// `camera_requested` is always true here because of the early `continue`
// above, so the `None` arm is not taken on this path.
let camera = if camera_requested {
Some(CameraCapture::new_with_capture_profile(
setup_camera_source.as_deref(),
active_camera_cfg,
capture_profile,
)?)
} else {
None
};
Ok::<_, anyhow::Error>((camera.map(Arc::new), Arc::new(microphone)))
})
.await;
// Outer `Result` is the join result of the blocking task; inner `Result` is
// the capture construction itself.
let (camera, microphone) = match setup {
Ok(Ok(captures)) => captures,
Ok(Err(err)) => {
camera_telemetry.record_disconnect(format!(
"bundled webcam media setup failed: {err:#}"
));
microphone_telemetry.record_disconnect(format!(
"bundled webcam media setup failed: {err:#}"
));
warn!(
"📦 bundled webcam media setup failed for camera={:?} mic={:?}: {err:#}",
active_camera_source.as_deref().unwrap_or("auto"),
active_microphone_source.as_deref().unwrap_or("auto")
);
// Both helper calls receive the same error value: setup returns a single
// error, so it is not known here which device produced it.
// NOTE(review): presumably the helper aborts when an explicitly selected
// source failed — confirm against `abort_if_required_media_source_failed`.
if camera_requested {
abort_if_required_media_source_failed(
"camera",
"📸",
active_camera_source.as_deref(),
&err,
);
}
abort_if_required_media_source_failed(
"microphone",
"🎤",
active_microphone_source.as_deref(),
&err,
);
delay = app_support::next_delay(delay);
tokio::time::sleep(delay).await;
continue;
}
Err(err) => {
// The blocking task itself failed to complete (join error).
camera_telemetry.record_disconnect(format!(
"bundled webcam media setup task failed: {err}"
));
microphone_telemetry.record_disconnect(format!(
"bundled webcam media setup task failed: {err}"
));
warn!("📦 bundled webcam media setup task failed: {err}");
delay = app_support::next_delay(delay);
tokio::time::sleep(delay).await;
continue;
}
};
// Captures are ready; everything below is one connection attempt.
camera_telemetry.record_reconnect_attempt();
microphone_telemetry.record_reconnect_attempt();
let mut cli = RelayClient::new(ep.clone());
// A new queue per attempt: the bundling thread pushes into it and the
// outbound stream pops from it.
let queue: crate::uplink_fresh_queue::FreshPacketQueue<UpstreamMediaBundle> =
crate::uplink_fresh_queue::FreshPacketQueue::new(BUNDLED_MEDIA_UPLINK_QUEUE);
let drop_log = Arc::new(std::sync::Mutex::new(UplinkDropLogLimiter::new(
"bundled-webcam-media",
"📦",
)));
// Handles moved into the outbound stream.
let queue_stream = queue.clone();
let camera_telemetry_stream = camera_telemetry.clone();
let microphone_telemetry_stream = microphone_telemetry.clone();
let drop_log_stream = Arc::clone(&drop_log);
let outbound = async_stream::stream! {
// Set after any stale drop on an HEVC session; cleared again once a bundle
// carrying a recovery keyframe passes through.
let mut waiting_for_hevc_keyframe = false;
loop {
let next = queue_stream.pop_fresh().await;
if next.dropped_stale > 0 {
// The same drop count is recorded on both telemetry handles.
camera_telemetry_stream.record_stale_drop(next.dropped_stale);
microphone_telemetry_stream.record_stale_drop(next.dropped_stale);
log_uplink_drop(
&drop_log_stream,
UplinkDropReason::Stale,
next.dropped_stale,
next.queue_depth,
duration_ms(next.delivery_age),
);
if recover_hevc_after_drops {
waiting_for_hevc_keyframe = true;
}
}
if let Some(mut bundle) = next.packet {
// While recovering, a bundle the helper says to hold is discarded and
// accounted as one stale drop on both handles.
if recover_hevc_after_drops
&& should_hold_hevc_bundle_for_keyframe_recovery(
waiting_for_hevc_keyframe,
&bundle,
)
{
camera_telemetry_stream.record_stale_drop(1);
microphone_telemetry_stream.record_stale_drop(1);
log_uplink_drop(
&drop_log_stream,
UplinkDropReason::Stale,
1,
next.queue_depth,
duration_ms(next.delivery_age),
);
continue;
}
if recover_hevc_after_drops && bundle_has_hevc_recovery_keyframe(&bundle) {
waiting_for_hevc_keyframe = false;
}
let queue_depth = queue_depth_u32(next.queue_depth);
let delivery_age_ms = duration_ms(next.delivery_age);
// Per-device "streamed" records only for the media actually present in
// this bundle.
if bundle.video.is_some() {
camera_telemetry_stream.record_streamed(
queue_depth,
delivery_age_ms,
);
}
if !bundle.audio.is_empty() {
microphone_telemetry_stream.record_streamed(
queue_depth,
delivery_age_ms,
);
}
attach_bundle_queue_metadata(&mut bundle, next.queue_depth, next.delivery_age);
yield bundle;
continue;
}
// No packet ends the outbound stream.
// NOTE(review): presumably `pop_fresh` yields no packet once the queue is
// closed — confirm against `FreshPacketQueue`.
break;
}
};
match cli.stream_webcam_media(Request::new(outbound)).await {
Ok(mut resp) => {
// Shared stop flag for all three worker threads.
let stop = Arc::new(AtomicBool::new(false));
// Bounded channel from the two capture threads to the bundling thread.
// Producers only use `try_send`, so a full channel drops the packet
// instead of blocking the capture thread.
let (event_tx, event_rx) = std::sync::mpsc::sync_channel::<BundledCaptureEvent>(
BUNDLED_CAPTURE_EVENT_CHANNEL_CAPACITY,
);
let camera_worker = camera.as_ref().map(|camera| {
let camera = Arc::clone(camera);
let stop = Arc::clone(&stop);
let event_tx = event_tx.clone();
let media_controls = media_controls.clone();
let initial_camera_source = initial_camera_source.clone();
let initial_camera_profile = initial_camera_profile.clone();
let active_camera_source = active_camera_source.clone();
let active_camera_profile = active_camera_profile.clone();
std::thread::spawn(move || {
// Capture-side keyframe gate, separate from the one in the outbound stream.
let mut waiting_for_hevc_keyframe = false;
while !stop.load(Ordering::Relaxed) {
// Re-resolve the live settings on every iteration and compare them
// with the snapshot this session was built from.
let state = media_controls.refresh();
let desired_source =
state.camera_source.resolve(initial_camera_source.as_deref());
let desired_profile =
state.camera_profile.resolve(initial_camera_profile.as_deref());
let desired_camera_cfg =
camera_config_with_live_codec(camera_cfg, &state.camera_codec);
if !state.camera
|| desired_source != active_camera_source
|| desired_profile != active_camera_profile
|| desired_camera_cfg.map(|cfg| cfg.codec)
!= active_camera_codec
{
// Camera disabled or reconfigured: stop every worker and ask for a
// restart. The send is best-effort; its result is ignored.
stop.store(true, Ordering::Relaxed);
let _ = event_tx.try_send(BundledCaptureEvent::Restart);
break;
}
if let Some(mut pkt) = camera.pull() {
// While waiting for a keyframe, skip frames without an IRAP.
if recover_hevc_after_drops
&& waiting_for_hevc_keyframe
&& !contains_hevc_irap(&pkt.data)
{
continue;
}
// Result of the stamping helper is deliberately ignored.
let _ = stamp_video_timing_metadata_at_enqueue(&mut pkt);
// Computed before the send because `pkt` is moved into the event.
let is_recovery_keyframe = recover_hevc_after_drops
&& contains_hevc_irap(&pkt.data);
match event_tx.try_send(BundledCaptureEvent::Video(pkt)) {
Ok(()) => {
if is_recovery_keyframe {
waiting_for_hevc_keyframe = false;
}
}
Err(std::sync::mpsc::TrySendError::Full(_)) => {
// Frame dropped because the channel is full; on HEVC sessions
// hold further frames until the next keyframe.
if recover_hevc_after_drops {
waiting_for_hevc_keyframe = true;
}
continue;
}
// Receiver is gone: the bundling thread has exited.
Err(std::sync::mpsc::TrySendError::Disconnected(_)) => break,
}
} else {
note_hevc_capture_gap(
recover_hevc_after_drops,
&mut waiting_for_hevc_keyframe,
);
}
}
})
});
let microphone_worker = {
let microphone = Arc::clone(&microphone);
let stop = Arc::clone(&stop);
let event_tx = event_tx.clone();
let media_controls = media_controls.clone();
let initial_microphone_source = initial_microphone_source.clone();
let active_microphone_source = active_microphone_source.clone();
let active_camera_requested = camera_requested;
std::thread::spawn(move || {
while !stop.load(Ordering::Relaxed) {
let state = media_controls.refresh();
let desired_source = state
.microphone_source
.resolve(initial_microphone_source.as_deref());
let desired_audio_codec = state
.audio_codec
.resolve(lesavka_common::audio_transport::UpstreamAudioCodec::Opus);
let desired_noise_suppression =
state.noise_suppression.resolve(false);
// Restart when the camera toggle differs from session start, when both
// microphone and camera are off, or when the microphone source, audio
// codec or noise-suppression setting changed.
if state.camera != active_camera_requested
|| !(state.microphone || state.camera)
|| desired_source != active_microphone_source
|| desired_audio_codec != active_audio_codec
|| desired_noise_suppression != active_noise_suppression
{
stop.store(true, Ordering::Relaxed);
let _ = event_tx.try_send(BundledCaptureEvent::Restart);
break;
}
if let Some(mut pkt) = microphone.pull() {
// Result of the stamping helper is deliberately ignored.
let _ = stamp_audio_timing_metadata_at_enqueue(&mut pkt);
match event_tx.try_send(BundledCaptureEvent::Audio(pkt)) {
Ok(()) => {}
// Channel full: this audio packet is dropped.
Err(std::sync::mpsc::TrySendError::Full(_)) => continue,
Err(std::sync::mpsc::TrySendError::Disconnected(_)) => break,
}
}
}
})
};
// Drop the original sender so only the worker clones keep the channel
// open; the receiver then sees a disconnect once those threads exit.
drop(event_tx);
let bundle_worker = {
let stop = Arc::clone(&stop);
let queue = queue.clone();
let camera_telemetry = camera_telemetry.clone();
let microphone_telemetry = microphone_telemetry.clone();
let drop_log = Arc::clone(&drop_log);
std::thread::spawn(move || {
bundle_captured_media(
event_rx,
stop,
queue,
camera.is_some(),
camera_telemetry,
microphone_telemetry,
drop_log,
);
})
};
// The stream is open: reset the back-off and report the connection.
delay = Duration::from_secs(1);
camera_telemetry.record_connected();
microphone_telemetry.record_connected();
if let Some(heal_delay) = startup_epoch_heal_delay.take() {
spawn_upstream_epoch_auto_heal(ep.clone(), heal_delay);
}
// Drain and discard server responses. `transpose()` maps `Err(_)` to
// `Some(Err(_))`, so an error alone does not end this loop; it ends
// when `message()` returns `Ok(None)`.
// NOTE(review): relies on the stream yielding `Ok(None)` after an
// error — confirm against tonic's `Streaming::message`.
while resp.get_mut().message().await.transpose().is_some() {}
camera_telemetry.record_disconnect("bundled webcam media stream ended");
microphone_telemetry.record_disconnect("bundled webcam media stream ended");
// Teardown: raise the stop flag, close the queue, then join each thread.
// Join results are ignored, so a worker panic is not propagated.
stop.store(true, Ordering::Relaxed);
queue.close();
if let Some(worker) = camera_worker {
let _ = worker.join();
}
let _ = microphone_worker.join();
let _ = bundle_worker.join();
}
Err(e) if e.code() == tonic::Code::Unimplemented => {
// The server lacks the bundled RPC; this loop still backs off and retries.
camera_telemetry.record_disconnect("bundled webcam media unavailable on server");
microphone_telemetry
.record_disconnect("bundled webcam media unavailable on server");
warn!("📦 server does not support bundled webcam media retrying");
delay = app_support::next_delay(delay);
}
Err(e) => {
camera_telemetry
.record_disconnect(format!("bundled webcam media connect failed: {e}"));
microphone_telemetry
.record_disconnect(format!("bundled webcam media connect failed: {e}"));
// `fetch_add` returns the previous value, so only the first failure
// ever counted is logged at `warn`.
if FAIL_CNT.fetch_add(1, Ordering::Relaxed) == 0 {
warn!("❌📦 bundled webcam media connect failed: {e}");
} else {
debug!("❌📦 bundled webcam media reconnect failed: {e}");
}
delay = app_support::next_delay(delay);
}
}
// Runs after every branch; on the success path this is a second `close`.
// NOTE(review): assumes `FreshPacketQueue::close` is idempotent — confirm.
queue.close();
tokio::time::sleep(delay).await;
}
}
}
/// Apply the live camera-codec choice on top of the base camera config.
///
/// Returns `None` when no base config was supplied. Otherwise the choice is
/// resolved with the base codec id as fallback and parsed; an unparsable
/// value leaves the config untouched. A parsed codec that differs from the
/// base one is applied only when `live_codec_mismatch_allowed()` is true;
/// otherwise a warning is logged and the base codec is kept.
fn camera_config_with_live_codec(
    cfg: Option<crate::input::camera::CameraConfig>,
    choice: &crate::live_media_control::MediaCameraCodecChoice,
) -> Option<crate::input::camera::CameraConfig> {
    let mut resolved = cfg?;
    let negotiated_id = camera_codec_id(resolved.codec);

    let live_choice = choice
        .resolve(Some(negotiated_id))
        .as_deref()
        .and_then(parse_live_camera_codec);
    let Some(wanted) = live_choice else {
        // Nothing usable was requested: keep the base config as-is.
        return Some(resolved);
    };

    // The environment override is consulted only when the codecs really differ.
    if wanted != resolved.codec && !live_codec_mismatch_allowed() {
        tracing::warn!(
            target: "lesavka_client::camera",
            requested = camera_codec_id(wanted),
            negotiated = negotiated_id,
            "ignoring live camera codec switch that would mismatch the server UVC contract"
        );
        return Some(resolved);
    }

    resolved.codec = wanted;
    Some(resolved)
}
/// Map a camera codec to its lowercase identifier string
/// (`"h264"`, `"hevc"` or `"mjpeg"`).
fn camera_codec_id(codec: crate::input::camera::CameraCodec) -> &'static str {
    type Codec = crate::input::camera::CameraCodec;

    match codec {
        Codec::H264 => "h264",
        Codec::Hevc => "hevc",
        Codec::Mjpeg => "mjpeg",
    }
}
/// Parse a live codec name into a `CameraCodec`.
///
/// Matching ignores surrounding whitespace and ASCII case. Accepted spellings:
/// `mjpeg` / `mjpg` / `jpeg`, `hevc` / `h265` / `h.265`, and `h264` / `h.264`.
/// Any other value returns `None`.
///
/// The dotted `h.264` spelling is accepted to mirror the dotted `h.265` alias;
/// without it the two video codecs were parsed inconsistently.
fn parse_live_camera_codec(raw: &str) -> Option<crate::input::camera::CameraCodec> {
    match raw.trim().to_ascii_lowercase().as_str() {
        "mjpeg" | "mjpg" | "jpeg" => Some(crate::input::camera::CameraCodec::Mjpeg),
        "hevc" | "h265" | "h.265" => Some(crate::input::camera::CameraCodec::Hevc),
        "h264" | "h.264" => Some(crate::input::camera::CameraCodec::H264),
        _ => None,
    }
}
/// Report whether `LESAVKA_CAM_CODEC_FORCE` opts in to a live codec that
/// differs from the configured one.
///
/// True only when the variable is readable and, after trimming and ASCII
/// lowercasing, equals `1`, `true`, `yes` or `on`.
fn live_codec_mismatch_allowed() -> bool {
    match std::env::var("LESAVKA_CAM_CODEC_FORCE") {
        Ok(raw) => {
            let flag = raw.trim().to_ascii_lowercase();
            ["1", "true", "yes", "on"].contains(&flag.as_str())
        }
        // Unset or not valid Unicode: treated as "not forced".
        Err(_) => false,
    }
}
/// Delay in milliseconds before the startup epoch heal request, used when
/// `LESAVKA_UPSTREAM_AUTO_HEAL_AFTER_MS` is unset or does not parse as a `u64`.
const DEFAULT_UPSTREAM_AUTO_HEAL_AFTER_MS: u64 = 3_000;
/// Read the startup epoch-heal settings from the process environment.
///
/// Inputs: `LESAVKA_UPSTREAM_AUTO_HEAL` (opt-out switch) and
/// `LESAVKA_UPSTREAM_AUTO_HEAL_AFTER_MS` (delay override). Output is the delay
/// to wait before the heal request, or `None` when healing is disabled. Why:
/// browser/WebRTC consumers can latch a stale UAC epoch on first join, and the
/// safe fix is to retire the mic epoch once the live bundled stream has
/// settled rather than changing calibration.
#[cfg(not(coverage))]
fn upstream_epoch_auto_heal_delay() -> Option<Duration> {
    // A variable that is unset or unreadable is passed on as `None`.
    let enabled = std::env::var("LESAVKA_UPSTREAM_AUTO_HEAL").ok();
    let delay_ms = std::env::var("LESAVKA_UPSTREAM_AUTO_HEAL_AFTER_MS").ok();
    parse_upstream_epoch_auto_heal_delay(enabled.as_deref(), delay_ms.as_deref())
}
/// Parse the live-uplink auto-heal knobs without reading global environment.
///
/// Inputs are optional raw environment values. Output is a bounded delay, or
/// `None` when the operator opted out. Why: the default needs to be safe enough
/// for real calls while tests can pin the behavior without racing process env.
fn parse_upstream_epoch_auto_heal_delay(
enabled: Option<&str>,
delay_ms: Option<&str>,
) -> Option<Duration> {
if enabled.is_some_and(disables_upstream_epoch_auto_heal) {
return None;
}
let millis = delay_ms
.and_then(|raw| raw.trim().parse::<u64>().ok())
.unwrap_or(DEFAULT_UPSTREAM_AUTO_HEAL_AFTER_MS);
(millis > 0).then(|| Duration::from_millis(millis))
}
/// Tell whether an env value is one of the opt-out spellings for startup A/V
/// epoch healing: `0`, `false`, `off` or `no`, ignoring surrounding whitespace
/// and ASCII case.
fn disables_upstream_epoch_auto_heal(raw: &str) -> bool {
    let candidate = raw.trim();
    ["0", "false", "off", "no"]
        .iter()
        .any(|token| candidate.eq_ignore_ascii_case(token))
}
/// Schedule a one-shot `recover_uac` request against the relay.
///
/// Inputs: the relay channel and how long to wait before asking. The work runs
/// in a detached tokio task, so the only output is log telemetry: `info` when
/// the relay reports `ok`, `warn` when it refuses or the call fails. Why: this
/// mimics the known-good probe side effect that superseded a stale first-join
/// epoch and forced the live client stream to reconnect cleanly without
/// touching USB enumeration or A/V calibration.
#[cfg(not(coverage))]
fn spawn_upstream_epoch_auto_heal(ep: Channel, delay: Duration) {
    let heal = async move {
        tokio::time::sleep(delay).await;

        // The client is built only after the delay has elapsed.
        let mut relay = RelayClient::new(ep);
        let reply = match relay.recover_uac(Request::new(Empty {})).await {
            Ok(reply) => reply,
            Err(err) => {
                warn!(%err, "📦🩺 automatic upstream A/V epoch heal failed");
                return;
            }
        };

        if reply.into_inner().ok {
            info!(
                delay_ms = delay.as_millis(),
                "📦🩺 automatic upstream A/V epoch heal requested"
            );
        } else {
            warn!("📦🩺 automatic upstream A/V epoch heal was refused by relay");
        }
    };

    // The join handle is dropped on purpose: nothing waits for the outcome.
    tokio::spawn(heal);
}