From ce15a5e79e5c05465a67771be509909f6a8ce68e Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Wed, 13 May 2026 21:40:45 -0300 Subject: [PATCH] media: keep uvc video alive without uac --- Cargo.lock | 6 +- client/Cargo.toml | 2 +- .../src/launcher/state/launcher_state_impl.rs | 7 -- client/src/launcher/tests/mod.rs | 2 +- client/src/launcher/tests/state.rs | 2 + .../ui_components/build_device_controls.rs | 2 +- .../src/launcher/ui_runtime/status_refresh.rs | 2 +- common/Cargo.toml | 2 +- server/Cargo.toml | 2 +- server/src/main/relay_service.rs | 9 ++ .../main/relay_service/upstream_media_rpc.rs | 106 ++++++++++++------ server/src/main/relay_service_tests.rs | 19 +++- .../server_upstream_media_bundle_contract.rs | 8 ++ .../client_codec_transport_ui_contract.rs | 4 +- 14 files changed, 121 insertions(+), 52 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0af3611..7d4b686 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.22.27" +version = "0.22.28" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.22.27" +version = "0.22.28" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.22.27" +version = "0.22.28" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index a4f1ad1..78e702f 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.22.27" +version = "0.22.28" edition = "2024" [dependencies] diff --git a/client/src/launcher/state/launcher_state_impl.rs b/client/src/launcher/state/launcher_state_impl.rs index 14892cb..73bd722 100644 --- a/client/src/launcher/state/launcher_state_impl.rs +++ b/client/src/launcher/state/launcher_state_impl.rs @@ -47,13 +47,6 @@ impl LauncherState { Some(trimmed.to_string()) } }); - if let Some(transport) = self - .server_camera_codec - .as_deref() - .and_then(WebcamTransport::from_server_codec) - { - self.webcam_transport = transport; - } } pub fn set_view_mode(&mut self, view_mode: ViewMode) { diff --git a/client/src/launcher/tests/mod.rs b/client/src/launcher/tests/mod.rs index 4b92b68..e8f9dcd 100644 --- a/client/src/launcher/tests/mod.rs +++ b/client/src/launcher/tests/mod.rs @@ -284,7 +284,7 @@ fn runtime_env_vars_emit_selected_webcam_transport() { ); assert_eq!( runtime_env_vars(&state).get("LESAVKA_CAM_CODEC"), - Some(&"mjpeg".to_string()) + Some(&"hevc".to_string()) ); } diff --git a/client/src/launcher/tests/state.rs b/client/src/launcher/tests/state.rs index 48726a2..f38be75 100644 --- a/client/src/launcher/tests/state.rs +++ b/client/src/launcher/tests/state.rs @@ -491,6 +491,7 @@ fn server_identity_and_media_caps_trim_blank_values() { assert_eq!(state.server_microphone, Some(false)); assert_eq!(state.server_camera_output, None); assert_eq!(state.server_camera_codec.as_deref(), Some("mjpeg")); + state.select_webcam_transport(WebcamTransport::Hevc); state.set_server_media_caps( None, @@ -502,6 +503,7 @@ fn server_identity_and_media_caps_trim_blank_values() { assert_eq!(state.server_microphone, None); assert_eq!(state.server_camera_output.as_deref(), Some("uvc")); assert_eq!(state.server_camera_codec, None); + assert_eq!(state.webcam_transport, WebcamTransport::Hevc); } #[test] diff --git a/client/src/launcher/ui_components/build_device_controls.rs b/client/src/launcher/ui_components/build_device_controls.rs index f14d151..c8e40ed 100644 --- a/client/src/launcher/ui_components/build_device_controls.rs +++ b/client/src/launcher/ui_components/build_device_controls.rs @@ -231,7 +231,7 @@ webcam_transport_combo.set_sensitive(true); webcam_transport_combo.set_size_request(98, -1); webcam_transport_combo.set_tooltip_text(Some( - "Upstream webcam transport for the next relay connection. MJPEG is the safe calibrated default; HEVC is used only when the server advertises it.", + "Upstream webcam transport for the next relay connection. MJPEG is the safe calibrated default; HEVC is selectable for hardware-accelerated testing.", )); let upstream_audio_transport_combo = gtk::ComboBoxText::new(); diff --git a/client/src/launcher/ui_runtime/status_refresh.rs b/client/src/launcher/ui_runtime/status_refresh.rs index a7260e7..38cd301 100644 --- a/client/src/launcher/ui_runtime/status_refresh.rs +++ b/client/src/launcher/ui_runtime/status_refresh.rs @@ -289,7 +289,7 @@ pub fn refresh_launcher_ui(widgets: &LauncherWidgets, state: &LauncherState, chi .set_tooltip_text(Some(if relay_live { "Changing upstream webcam transport restarts the live camera path; the picture may pause briefly." } else { - "Use the server-advertised upstream webcam transport for the next relay launch; MJPEG is the safe calibrated default." + "Choose upstream webcam transport for the next relay launch; MJPEG is the safe calibrated default." })); if widgets .upstream_audio_transport_combo diff --git a/common/Cargo.toml b/common/Cargo.toml index c2b448e..7e73ac1 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.22.27" +version = "0.22.28" edition = "2024" build = "build.rs" diff --git a/server/Cargo.toml b/server/Cargo.toml index 3dbd256..5290e20 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.22.27" +version = "0.22.28" edition = "2024" autobins = false diff --git a/server/src/main/relay_service.rs b/server/src/main/relay_service.rs index b9d3b13..12c0b75 100644 --- a/server/src/main/relay_service.rs +++ b/server/src/main/relay_service.rs @@ -1,5 +1,6 @@ const MEDIA_V2_DEFAULT_PLAYOUT_DELAY_MS: u64 = 20; const MEDIA_V2_DEFAULT_MAX_LIVE_AGE_MS: u64 = 1_000; +const MEDIA_V2_DEFAULT_UAC_START_TIMEOUT_MS: u64 = 750; const MEDIA_V2_MAX_MIXED_CAPTURE_SPAN_US: u64 = 250_000; #[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] @@ -107,6 +108,14 @@ fn media_v2_max_live_age() -> Duration { .unwrap_or_else(|| Duration::from_millis(MEDIA_V2_DEFAULT_MAX_LIVE_AGE_MS)) } +fn media_v2_uac_start_timeout() -> Duration { + std::env::var("LESAVKA_UPSTREAM_V2_UAC_START_TIMEOUT_MS") + .ok() + .and_then(|value| value.trim().parse::().ok()) + .map(Duration::from_millis) + .unwrap_or_else(|| Duration::from_millis(MEDIA_V2_DEFAULT_UAC_START_TIMEOUT_MS)) +} + /// Keeps `media_v2_handoff_schedule` explicit because it sits on relay RPC orchestration, where hardware failures must surface without stopping the server. /// Inputs are the typed parameters; output is the return value or side effect. fn media_v2_handoff_schedule( diff --git a/server/src/main/relay_service/upstream_media_rpc.rs b/server/src/main/relay_service/upstream_media_rpc.rs index fdbc11e..b39b3c7 100644 --- a/server/src/main/relay_service/upstream_media_rpc.rs +++ b/server/src/main/relay_service/upstream_media_rpc.rs @@ -30,45 +30,74 @@ impl Handler { return Err(err); } }; - let Some(microphone_sink_permit) = self - .upstream_media_rt - .reserve_microphone_sink(microphone_lease.generation) - .await - else { - self.upstream_media_rt.close_camera(camera_lease.generation); - self.upstream_media_rt.close_microphone(microphone_lease.generation); - return Err(Status::aborted( - "v2 bundled media stream superseded before microphone sink became available", - )); - }; let uac_dev = std::env::var("LESAVKA_UAC_DEV").unwrap_or_else(|_| "hw:UAC2Gadget,0".into()); - let sink = runtime_support::open_voice_with_retry(&uac_dev) + let microphone_sink = + match tokio::time::timeout( + media_v2_uac_start_timeout(), + self.upstream_media_rt + .reserve_microphone_sink(microphone_lease.generation), + ) .await - .map_err(|e| { - self.upstream_media_rt.close_camera(camera_lease.generation); - self.upstream_media_rt.close_microphone(microphone_lease.generation); - Status::internal(format!("{e:#}")) - })?; + { + Ok(Some(permit)) => match runtime_support::open_voice_with_retry(&uac_dev).await { + Ok(sink) => Some((permit, sink)), + Err(err) => { + warn!( + rpc_id, + session_id = camera_lease.session_id, + "📦⚠️ continuing bundled upstream video without UAC audio; microphone sink failed: {err:#}" + ); + self.upstream_media_rt + .close_microphone(microphone_lease.generation); + None + } + }, + Ok(None) => { + warn!( + rpc_id, + session_id = camera_lease.session_id, + "📦⚠️ continuing bundled upstream video without UAC audio; microphone generation was superseded before sink startup" + ); + None + } + Err(_) => { + warn!( + rpc_id, + session_id = camera_lease.session_id, + timeout_ms = media_v2_uac_start_timeout().as_millis(), + "📦⚠️ continuing bundled upstream video without UAC audio; microphone sink startup timed out" + ); + self.upstream_media_rt + .close_microphone(microphone_lease.generation); + None + } + }; + let audio_enabled = microphone_sink.is_some(); let camera_rt = self.camera_rt.clone(); let upstream_media_rt = self.upstream_media_rt.clone(); let (tx, rx) = tokio::sync::mpsc::channel(1); tokio::spawn(async move { - let _microphone_sink_permit = microphone_sink_permit; let mut inbound = req.into_inner(); let mut last_bundle_session_id = None; let mut last_bundle_seq = None; let mut waiting_for_hevc_keyframe = false; let mut outcome = "aborted"; - let (audio_handoff_tx, audio_handoff_rx) = - tokio::sync::mpsc::channel::(32); + let (mut audio_handoff_tx, audio_worker) = + if let Some((microphone_sink_permit, sink)) = microphone_sink { + let (audio_handoff_tx, audio_handoff_rx) = + tokio::sync::mpsc::channel::(32); + let audio_rt = upstream_media_rt.clone(); + let worker = tokio::spawn(async move { + let _microphone_sink_permit = microphone_sink_permit; + run_media_v2_audio_handoff(audio_handoff_rx, sink, audio_rt).await; + }); + (Some(audio_handoff_tx), Some(worker)) + } else { + (None, None) + }; let (video_handoff_tx, video_handoff_rx) = tokio::sync::mpsc::channel::(32); - let audio_worker = tokio::spawn(run_media_v2_audio_handoff( - audio_handoff_rx, - sink, - upstream_media_rt.clone(), - )); let video_worker = tokio::spawn(run_media_v2_video_handoff( video_handoff_rx, relay.clone(), @@ -93,11 +122,16 @@ impl Handler { let bundle_arrived_at = tokio::time::Instant::now(); if !camera_rt.is_active(camera_session_id) || !upstream_media_rt.is_camera_active(camera_lease.generation) - || !upstream_media_rt.is_microphone_active(microphone_lease.generation) + || (audio_enabled + && !upstream_media_rt.is_microphone_active(microphone_lease.generation)) { outcome = "superseded"; break; } + if !audio_enabled { + // UVC video must stay live even when UAC is wedged or being recovered. + bundle.audio.clear(); + } if last_bundle_session_id.is_some_and(|session_id| session_id != bundle.session_id) { warn!( rpc_id, @@ -187,7 +221,8 @@ impl Handler { let bundle_base_remote_pts_us = facts.capture_start_us; let frame_step_us = media_v2_frame_step_us(camera_cfg.fps); - if schedule.audio_due_at.is_some() + if audio_handoff_tx.is_some() + && schedule.audio_due_at.is_some() && let Some(scheduled_audio) = prepare_media_v2_audio( &mut bundle.audio, &upstream_media_rt, @@ -195,10 +230,13 @@ impl Handler { bundle_epoch, ) && audio_handoff_tx + .as_ref() + .expect("checked audio handoff sender") .send(scheduled_audio) .await .is_err() { + audio_handoff_tx = None; warn!( rpc_id, session_id = camera_lease.session_id, @@ -254,12 +292,14 @@ impl Handler { outcome = if outcome == "aborted" { "closed" } else { outcome }; drop(audio_handoff_tx); drop(video_handoff_tx); - if let Err(err) = audio_worker.await { - warn!( - rpc_id, - session_id = camera_lease.session_id, - "📦 v2 audio handoff worker join failed: {err}" - ); + if let Some(audio_worker) = audio_worker { + if let Err(err) = audio_worker.await { + warn!( + rpc_id, + session_id = camera_lease.session_id, + "📦 v2 audio handoff worker join failed: {err}" + ); + } } if let Err(err) = video_worker.await { warn!( diff --git a/server/src/main/relay_service_tests.rs b/server/src/main/relay_service_tests.rs index 8f717ca..4caf0fe 100644 --- a/server/src/main/relay_service_tests.rs +++ b/server/src/main/relay_service_tests.rs @@ -6,7 +6,7 @@ mod tests { media_v2_handoff_schedule, media_v2_has_hevc_recovery_keyframe, media_v2_should_hold_hevc_video_for_recovery, prepare_media_v2_audio, prepare_media_v2_video, retain_freshest_audio_packet, retain_freshest_video_packet, - summarize_media_v2_bundle, + summarize_media_v2_bundle, media_v2_uac_start_timeout, }; use lesavka_common::lesavka::{AudioPacket, UpstreamMediaBundle, VideoPacket}; use lesavka_server::camera::CameraCodec; @@ -179,6 +179,23 @@ mod tests { assert!(media_v2_handoff_schedule(facts, 0, 0).is_none()); } + #[test] + fn media_v2_uac_start_timeout_is_short_and_operator_configurable() { + temp_env::with_var_unset("LESAVKA_UPSTREAM_V2_UAC_START_TIMEOUT_MS", || { + assert_eq!( + media_v2_uac_start_timeout(), + std::time::Duration::from_millis(750) + ); + }); + + temp_env::with_var("LESAVKA_UPSTREAM_V2_UAC_START_TIMEOUT_MS", Some("125"), || { + assert_eq!( + media_v2_uac_start_timeout(), + std::time::Duration::from_millis(125) + ); + }); + } + #[test] /// Keeps server HEVC drop recovery explicit because late-drop freshness can otherwise corrupt decoded video. fn media_v2_hevc_recovery_holds_delta_until_keyframe() { diff --git a/tests/api/server/upstream_media_runtime/server_upstream_media_bundle_contract.rs b/tests/api/server/upstream_media_runtime/server_upstream_media_bundle_contract.rs index e6bf887..bd8f8ad 100644 --- a/tests/api/server/upstream_media_runtime/server_upstream_media_bundle_contract.rs +++ b/tests/api/server/upstream_media_runtime/server_upstream_media_bundle_contract.rs @@ -245,4 +245,12 @@ mod server_upstream_media_bundle_normal_mode { assert!(UPSTREAM_RUNTIME.contains("pub struct UpstreamBundledLeases")); assert!(UPSTREAM_RUNTIME_LIFECYCLE.contains("pub fn activate_bundled_session")); } + + #[test] + fn bundled_video_handoff_is_not_blocked_indefinitely_by_uac_startup() { + assert!(RELAY_RPC.contains("media_v2_uac_start_timeout()")); + assert!(RELAY_RPC.contains("continuing bundled upstream video without UAC audio")); + assert!(RELAY_RPC.contains("bundle.audio.clear();")); + assert!(RELAY_RPC.contains("tokio::spawn(run_media_v2_video_handoff")); + } } diff --git a/tests/ui/client/launcher/client_codec_transport_ui_contract.rs b/tests/ui/client/launcher/client_codec_transport_ui_contract.rs index 1c10881..f1d1196 100644 --- a/tests/ui/client/launcher/client_codec_transport_ui_contract.rs +++ b/tests/ui/client/launcher/client_codec_transport_ui_contract.rs @@ -56,7 +56,7 @@ fn webcam_transport_selector_exposes_real_hevc_and_mjpeg_choices() { "webcam_transport_combo.append(Some(transport.as_id()), transport.label());", "webcam_transport_combo.set_active_id(Some(state.effective_webcam_transport().as_id()));", "webcam_transport_combo.set_sensitive(true);", - "MJPEG is the safe calibrated default; HEVC is used only when the server advertises it", + "MJPEG is the safe calibrated default; HEVC is selectable for hardware-accelerated testing", ] { assert!( BUILD_DEVICE_CONTROLS_SRC.contains(marker), @@ -87,7 +87,7 @@ fn webcam_transport_changes_are_staged_when_relay_is_live() { "widgets.webcam_transport_syncing.set(false);", ".webcam_transport_combo\n .set_sensitive(state.channels.camera);", "Changing upstream webcam transport restarts the live camera path; the picture may pause briefly.", - "Use the server-advertised upstream webcam transport for the next relay launch; MJPEG is the safe calibrated default.", + "Choose upstream webcam transport for the next relay launch; MJPEG is the safe calibrated default.", ] { assert!( STATUS_REFRESH_SRC.contains(marker),