impl LesavkaClientApp { /*──────────────── bundled webcam + mic stream ─────────────────*/ #[cfg(not(coverage))] #[allow(clippy::too_many_arguments)] /// Keeps `webcam_media_loop` explicit because it sits on the live uplink path, where stale media must be dropped instead of queued into latency. /// Inputs are the typed parameters; output is the return value or side effect. async fn webcam_media_loop( ep: Channel, initial_camera_source: Option, initial_camera_profile: Option, initial_microphone_source: Option, camera_cfg: Option, camera_telemetry: crate::uplink_telemetry::UplinkTelemetryHandle, microphone_telemetry: crate::uplink_telemetry::UplinkTelemetryHandle, media_controls: crate::live_media_control::LiveMediaControls, ) { let mut delay = Duration::from_secs(1); let mut startup_epoch_heal_delay = upstream_epoch_auto_heal_delay(); static FAIL_CNT: AtomicUsize = AtomicUsize::new(0); loop { let state = media_controls.refresh(); let camera_requested = state.camera; if !camera_requested { camera_telemetry.record_enabled(false); tokio::time::sleep(Duration::from_millis(100)).await; continue; } let active_camera_source = state.camera_source.resolve(initial_camera_source.as_deref()); let active_camera_profile = state.camera_profile.resolve(initial_camera_profile.as_deref()); let active_camera_cfg = camera_config_with_live_codec(camera_cfg, &state.camera_codec); let active_camera_codec = active_camera_cfg.map(|cfg| cfg.codec); let recover_hevc_after_drops = upstream_camera_uses_hevc(active_camera_cfg); let active_microphone_source = state .microphone_source .resolve(initial_microphone_source.as_deref()); let active_audio_codec = state .audio_codec .resolve(lesavka_common::audio_transport::UpstreamAudioCodec::Opus); let active_noise_suppression = state.noise_suppression.resolve(false); let capture_profile = active_camera_profile .as_deref() .and_then(parse_camera_profile_id); let use_default_microphone = matches!( state.microphone_source, crate::live_media_control::MediaDeviceChoice::Auto ) && active_microphone_source.is_none(); let setup_camera_source = active_camera_source.clone(); let setup_microphone_source = active_microphone_source.clone(); let setup = tokio::task::spawn_blocking(move || { let microphone = if use_default_microphone { MicrophoneCapture::new_default_source_options( active_audio_codec, active_noise_suppression, ) } else { MicrophoneCapture::new_with_source_options( setup_microphone_source.as_deref(), active_audio_codec, active_noise_suppression, ) }?; let camera = if camera_requested { Some(CameraCapture::new_with_capture_profile( setup_camera_source.as_deref(), active_camera_cfg, capture_profile, )?) } else { None }; Ok::<_, anyhow::Error>((camera.map(Arc::new), Arc::new(microphone))) }) .await; let (camera, microphone) = match setup { Ok(Ok(captures)) => captures, Ok(Err(err)) => { camera_telemetry.record_disconnect(format!( "bundled webcam media setup failed: {err:#}" )); microphone_telemetry.record_disconnect(format!( "bundled webcam media setup failed: {err:#}" )); warn!( "📦 bundled webcam media setup failed for camera={:?} mic={:?}: {err:#}", active_camera_source.as_deref().unwrap_or("auto"), active_microphone_source.as_deref().unwrap_or("auto") ); if camera_requested { abort_if_required_media_source_failed( "camera", "📸", active_camera_source.as_deref(), &err, ); } abort_if_required_media_source_failed( "microphone", "🎤", active_microphone_source.as_deref(), &err, ); delay = app_support::next_delay(delay); tokio::time::sleep(delay).await; continue; } Err(err) => { camera_telemetry.record_disconnect(format!( "bundled webcam media setup task failed: {err}" )); microphone_telemetry.record_disconnect(format!( "bundled webcam media setup task failed: {err}" )); warn!("📦 bundled webcam media setup task failed: {err}"); delay = app_support::next_delay(delay); tokio::time::sleep(delay).await; continue; } }; camera_telemetry.record_reconnect_attempt(); microphone_telemetry.record_reconnect_attempt(); let mut cli = RelayClient::new(ep.clone()); let queue: crate::uplink_fresh_queue::FreshPacketQueue = crate::uplink_fresh_queue::FreshPacketQueue::new(BUNDLED_MEDIA_UPLINK_QUEUE); let drop_log = Arc::new(std::sync::Mutex::new(UplinkDropLogLimiter::new( "bundled-webcam-media", "📦", ))); let queue_stream = queue.clone(); let camera_telemetry_stream = camera_telemetry.clone(); let microphone_telemetry_stream = microphone_telemetry.clone(); let drop_log_stream = Arc::clone(&drop_log); let outbound = async_stream::stream! { let mut waiting_for_hevc_keyframe = false; loop { let next = queue_stream.pop_fresh().await; if next.dropped_stale > 0 { camera_telemetry_stream.record_stale_drop(next.dropped_stale); microphone_telemetry_stream.record_stale_drop(next.dropped_stale); log_uplink_drop( &drop_log_stream, UplinkDropReason::Stale, next.dropped_stale, next.queue_depth, duration_ms(next.delivery_age), ); if recover_hevc_after_drops { waiting_for_hevc_keyframe = true; } } if let Some(mut bundle) = next.packet { if recover_hevc_after_drops && should_hold_hevc_bundle_for_keyframe_recovery( waiting_for_hevc_keyframe, &bundle, ) { camera_telemetry_stream.record_stale_drop(1); microphone_telemetry_stream.record_stale_drop(1); log_uplink_drop( &drop_log_stream, UplinkDropReason::Stale, 1, next.queue_depth, duration_ms(next.delivery_age), ); continue; } if recover_hevc_after_drops && bundle_has_hevc_recovery_keyframe(&bundle) { waiting_for_hevc_keyframe = false; } let queue_depth = queue_depth_u32(next.queue_depth); let delivery_age_ms = duration_ms(next.delivery_age); if bundle.video.is_some() { camera_telemetry_stream.record_streamed( queue_depth, delivery_age_ms, ); } if !bundle.audio.is_empty() { microphone_telemetry_stream.record_streamed( queue_depth, delivery_age_ms, ); } attach_bundle_queue_metadata(&mut bundle, next.queue_depth, next.delivery_age); yield bundle; continue; } break; } }; match cli.stream_webcam_media(Request::new(outbound)).await { Ok(mut resp) => { let stop = Arc::new(AtomicBool::new(false)); let (event_tx, event_rx) = std::sync::mpsc::sync_channel::( BUNDLED_CAPTURE_EVENT_CHANNEL_CAPACITY, ); let camera_worker = camera.as_ref().map(|camera| { let camera = Arc::clone(camera); let stop = Arc::clone(&stop); let event_tx = event_tx.clone(); let media_controls = media_controls.clone(); let initial_camera_source = initial_camera_source.clone(); let initial_camera_profile = initial_camera_profile.clone(); let active_camera_source = active_camera_source.clone(); let active_camera_profile = active_camera_profile.clone(); std::thread::spawn(move || { let mut waiting_for_hevc_keyframe = false; while !stop.load(Ordering::Relaxed) { let state = media_controls.refresh(); let desired_source = state.camera_source.resolve(initial_camera_source.as_deref()); let desired_profile = state.camera_profile.resolve(initial_camera_profile.as_deref()); let desired_camera_cfg = camera_config_with_live_codec(camera_cfg, &state.camera_codec); if !state.camera || desired_source != active_camera_source || desired_profile != active_camera_profile || desired_camera_cfg.map(|cfg| cfg.codec) != active_camera_codec { stop.store(true, Ordering::Relaxed); let _ = event_tx.try_send(BundledCaptureEvent::Restart); break; } if let Some(mut pkt) = camera.pull() { if recover_hevc_after_drops && waiting_for_hevc_keyframe && !contains_hevc_irap(&pkt.data) { continue; } let _ = stamp_video_timing_metadata_at_enqueue(&mut pkt); let is_recovery_keyframe = recover_hevc_after_drops && contains_hevc_irap(&pkt.data); match event_tx.try_send(BundledCaptureEvent::Video(pkt)) { Ok(()) => { if is_recovery_keyframe { waiting_for_hevc_keyframe = false; } } Err(std::sync::mpsc::TrySendError::Full(_)) => { if recover_hevc_after_drops { waiting_for_hevc_keyframe = true; } continue; } Err(std::sync::mpsc::TrySendError::Disconnected(_)) => break, } } else { note_hevc_capture_gap( recover_hevc_after_drops, &mut waiting_for_hevc_keyframe, ); } } }) }); let microphone_worker = { let microphone = Arc::clone(µphone); let stop = Arc::clone(&stop); let event_tx = event_tx.clone(); let media_controls = media_controls.clone(); let initial_microphone_source = initial_microphone_source.clone(); let active_microphone_source = active_microphone_source.clone(); let active_camera_requested = camera_requested; std::thread::spawn(move || { while !stop.load(Ordering::Relaxed) { let state = media_controls.refresh(); let desired_source = state .microphone_source .resolve(initial_microphone_source.as_deref()); let desired_audio_codec = state .audio_codec .resolve(lesavka_common::audio_transport::UpstreamAudioCodec::Opus); let desired_noise_suppression = state.noise_suppression.resolve(false); if state.camera != active_camera_requested || !(state.microphone || state.camera) || desired_source != active_microphone_source || desired_audio_codec != active_audio_codec || desired_noise_suppression != active_noise_suppression { stop.store(true, Ordering::Relaxed); let _ = event_tx.try_send(BundledCaptureEvent::Restart); break; } if let Some(mut pkt) = microphone.pull() { let _ = stamp_audio_timing_metadata_at_enqueue(&mut pkt); match event_tx.try_send(BundledCaptureEvent::Audio(pkt)) { Ok(()) => {} Err(std::sync::mpsc::TrySendError::Full(_)) => continue, Err(std::sync::mpsc::TrySendError::Disconnected(_)) => break, } } } }) }; drop(event_tx); let bundle_worker = { let stop = Arc::clone(&stop); let queue = queue.clone(); let camera_telemetry = camera_telemetry.clone(); let microphone_telemetry = microphone_telemetry.clone(); let drop_log = Arc::clone(&drop_log); std::thread::spawn(move || { bundle_captured_media( event_rx, stop, queue, camera.is_some(), camera_telemetry, microphone_telemetry, drop_log, ); }) }; delay = Duration::from_secs(1); camera_telemetry.record_connected(); microphone_telemetry.record_connected(); if let Some(heal_delay) = startup_epoch_heal_delay.take() { spawn_upstream_epoch_auto_heal(ep.clone(), heal_delay); } while resp.get_mut().message().await.transpose().is_some() {} camera_telemetry.record_disconnect("bundled webcam media stream ended"); microphone_telemetry.record_disconnect("bundled webcam media stream ended"); stop.store(true, Ordering::Relaxed); queue.close(); if let Some(worker) = camera_worker { let _ = worker.join(); } let _ = microphone_worker.join(); let _ = bundle_worker.join(); } Err(e) if e.code() == tonic::Code::Unimplemented => { camera_telemetry.record_disconnect("bundled webcam media unavailable on server"); microphone_telemetry .record_disconnect("bundled webcam media unavailable on server"); warn!("📦 server does not support bundled webcam media – retrying"); delay = app_support::next_delay(delay); } Err(e) => { camera_telemetry .record_disconnect(format!("bundled webcam media connect failed: {e}")); microphone_telemetry .record_disconnect(format!("bundled webcam media connect failed: {e}")); if FAIL_CNT.fetch_add(1, Ordering::Relaxed) == 0 { warn!("❌📦 bundled webcam media connect failed: {e}"); } else { debug!("❌📦 bundled webcam media reconnect failed: {e}"); } delay = app_support::next_delay(delay); } } queue.close(); tokio::time::sleep(delay).await; } } } fn camera_config_with_live_codec( cfg: Option, choice: &crate::live_media_control::MediaCameraCodecChoice, ) -> Option { let mut cfg = cfg?; let fallback = camera_codec_id(cfg.codec); if let Some(codec) = choice .resolve(Some(fallback)) .as_deref() .and_then(parse_live_camera_codec) { cfg.codec = codec; } Some(cfg) } fn camera_codec_id(codec: crate::input::camera::CameraCodec) -> &'static str { match codec { crate::input::camera::CameraCodec::Mjpeg => "mjpeg", crate::input::camera::CameraCodec::Hevc => "hevc", crate::input::camera::CameraCodec::H264 => "h264", } } fn parse_live_camera_codec(raw: &str) -> Option { match raw.trim().to_ascii_lowercase().as_str() { "mjpeg" | "mjpg" | "jpeg" => Some(crate::input::camera::CameraCodec::Mjpeg), "hevc" | "h265" | "h.265" => Some(crate::input::camera::CameraCodec::Hevc), "h264" => Some(crate::input::camera::CameraCodec::H264), _ => None, } } const DEFAULT_UPSTREAM_AUTO_HEAL_AFTER_MS: u64 = 3_000; /// Resolve whether the live bundled uplink should force one startup epoch heal. /// /// Inputs: `LESAVKA_UPSTREAM_AUTO_HEAL` and /// `LESAVKA_UPSTREAM_AUTO_HEAL_AFTER_MS`; output is the delay before the heal /// request, or `None` when disabled. Why: browser/WebRTC consumers can latch a /// stale UAC epoch on first join, and the safe fix is to retire the mic epoch /// once the live bundled stream has settled rather than changing calibration. #[cfg(not(coverage))] fn upstream_epoch_auto_heal_delay() -> Option { parse_upstream_epoch_auto_heal_delay( std::env::var("LESAVKA_UPSTREAM_AUTO_HEAL").ok().as_deref(), std::env::var("LESAVKA_UPSTREAM_AUTO_HEAL_AFTER_MS") .ok() .as_deref(), ) } /// Parse the live-uplink auto-heal knobs without reading global environment. /// /// Inputs are optional raw environment values. Output is a bounded delay, or /// `None` when the operator opted out. Why: the default needs to be safe enough /// for real calls while tests can pin the behavior without racing process env. fn parse_upstream_epoch_auto_heal_delay( enabled: Option<&str>, delay_ms: Option<&str>, ) -> Option { if enabled.is_some_and(disables_upstream_epoch_auto_heal) { return None; } let millis = delay_ms .and_then(|raw| raw.trim().parse::().ok()) .unwrap_or(DEFAULT_UPSTREAM_AUTO_HEAL_AFTER_MS); (millis > 0).then(|| Duration::from_millis(millis)) } /// Return whether a text env value disables startup A/V epoch healing. fn disables_upstream_epoch_auto_heal(raw: &str) -> bool { matches!( raw.trim().to_ascii_lowercase().as_str(), "0" | "false" | "off" | "no" ) } /// Ask the relay to retire the current UAC epoch after the stream is live. /// /// Inputs: the connected relay channel and a delay. Output is only log /// telemetry. Why: this mimics the known-good probe side effect that superseded /// a stale first-join epoch and forced the live client stream to reconnect /// cleanly without touching USB enumeration or A/V calibration. #[cfg(not(coverage))] fn spawn_upstream_epoch_auto_heal(ep: Channel, delay: Duration) { tokio::spawn(async move { tokio::time::sleep(delay).await; let mut client = RelayClient::new(ep); match client.recover_uac(Request::new(Empty {})).await { Ok(reply) => { if reply.into_inner().ok { info!( delay_ms = delay.as_millis(), "📦🩺 automatic upstream A/V epoch heal requested" ); } else { warn!("📦🩺 automatic upstream A/V epoch heal was refused by relay"); } } Err(err) => warn!(%err, "📦🩺 automatic upstream A/V epoch heal failed"), } }); }