media: keep uvc video alive without uac

This commit is contained in:
Brad Stein 2026-05-13 21:40:45 -03:00
parent 692c3a6545
commit ce15a5e79e
14 changed files with 121 additions and 52 deletions

6
Cargo.lock generated
View File

@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]]
name = "lesavka_client"
version = "0.22.27"
version = "0.22.28"
dependencies = [
"anyhow",
"async-stream",
@ -1686,7 +1686,7 @@ dependencies = [
[[package]]
name = "lesavka_common"
version = "0.22.27"
version = "0.22.28"
dependencies = [
"anyhow",
"base64",
@ -1698,7 +1698,7 @@ dependencies = [
[[package]]
name = "lesavka_server"
version = "0.22.27"
version = "0.22.28"
dependencies = [
"anyhow",
"base64",

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package]
name = "lesavka_client"
version = "0.22.27"
version = "0.22.28"
edition = "2024"
[dependencies]

View File

@ -47,13 +47,6 @@ impl LauncherState {
Some(trimmed.to_string())
}
});
if let Some(transport) = self
.server_camera_codec
.as_deref()
.and_then(WebcamTransport::from_server_codec)
{
self.webcam_transport = transport;
}
}
pub fn set_view_mode(&mut self, view_mode: ViewMode) {

View File

@ -284,7 +284,7 @@ fn runtime_env_vars_emit_selected_webcam_transport() {
);
assert_eq!(
runtime_env_vars(&state).get("LESAVKA_CAM_CODEC"),
Some(&"mjpeg".to_string())
Some(&"hevc".to_string())
);
}

View File

@ -491,6 +491,7 @@ fn server_identity_and_media_caps_trim_blank_values() {
assert_eq!(state.server_microphone, Some(false));
assert_eq!(state.server_camera_output, None);
assert_eq!(state.server_camera_codec.as_deref(), Some("mjpeg"));
state.select_webcam_transport(WebcamTransport::Hevc);
state.set_server_media_caps(
None,
@ -502,6 +503,7 @@ fn server_identity_and_media_caps_trim_blank_values() {
assert_eq!(state.server_microphone, None);
assert_eq!(state.server_camera_output.as_deref(), Some("uvc"));
assert_eq!(state.server_camera_codec, None);
assert_eq!(state.webcam_transport, WebcamTransport::Hevc);
}
#[test]

View File

@ -231,7 +231,7 @@
webcam_transport_combo.set_sensitive(true);
webcam_transport_combo.set_size_request(98, -1);
webcam_transport_combo.set_tooltip_text(Some(
"Upstream webcam transport for the next relay connection. MJPEG is the safe calibrated default; HEVC is used only when the server advertises it.",
"Upstream webcam transport for the next relay connection. MJPEG is the safe calibrated default; HEVC is selectable for hardware-accelerated testing.",
));
let upstream_audio_transport_combo = gtk::ComboBoxText::new();

View File

@ -289,7 +289,7 @@ pub fn refresh_launcher_ui(widgets: &LauncherWidgets, state: &LauncherState, chi
.set_tooltip_text(Some(if relay_live {
"Changing upstream webcam transport restarts the live camera path; the picture may pause briefly."
} else {
"Use the server-advertised upstream webcam transport for the next relay launch; MJPEG is the safe calibrated default."
"Choose upstream webcam transport for the next relay launch; MJPEG is the safe calibrated default."
}));
if widgets
.upstream_audio_transport_combo

View File

@ -1,6 +1,6 @@
[package]
name = "lesavka_common"
version = "0.22.27"
version = "0.22.28"
edition = "2024"
build = "build.rs"

View File

@ -10,7 +10,7 @@ bench = false
[package]
name = "lesavka_server"
version = "0.22.27"
version = "0.22.28"
edition = "2024"
autobins = false

View File

@ -1,5 +1,6 @@
const MEDIA_V2_DEFAULT_PLAYOUT_DELAY_MS: u64 = 20;
const MEDIA_V2_DEFAULT_MAX_LIVE_AGE_MS: u64 = 1_000;
const MEDIA_V2_DEFAULT_UAC_START_TIMEOUT_MS: u64 = 750;
const MEDIA_V2_MAX_MIXED_CAPTURE_SPAN_US: u64 = 250_000;
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
@ -107,6 +108,14 @@ fn media_v2_max_live_age() -> Duration {
.unwrap_or_else(|| Duration::from_millis(MEDIA_V2_DEFAULT_MAX_LIVE_AGE_MS))
}
fn media_v2_uac_start_timeout() -> Duration {
std::env::var("LESAVKA_UPSTREAM_V2_UAC_START_TIMEOUT_MS")
.ok()
.and_then(|value| value.trim().parse::<u64>().ok())
.map(Duration::from_millis)
.unwrap_or_else(|| Duration::from_millis(MEDIA_V2_DEFAULT_UAC_START_TIMEOUT_MS))
}
/// Keeps `media_v2_handoff_schedule` explicit because it sits on relay RPC orchestration, where hardware failures must surface without stopping the server.
/// Inputs are the typed parameters; output is the return value or side effect.
fn media_v2_handoff_schedule(

View File

@ -30,45 +30,74 @@ impl Handler {
return Err(err);
}
};
let Some(microphone_sink_permit) = self
.upstream_media_rt
.reserve_microphone_sink(microphone_lease.generation)
.await
else {
self.upstream_media_rt.close_camera(camera_lease.generation);
self.upstream_media_rt.close_microphone(microphone_lease.generation);
return Err(Status::aborted(
"v2 bundled media stream superseded before microphone sink became available",
));
};
let uac_dev = std::env::var("LESAVKA_UAC_DEV").unwrap_or_else(|_| "hw:UAC2Gadget,0".into());
let sink = runtime_support::open_voice_with_retry(&uac_dev)
let microphone_sink =
match tokio::time::timeout(
media_v2_uac_start_timeout(),
self.upstream_media_rt
.reserve_microphone_sink(microphone_lease.generation),
)
.await
.map_err(|e| {
self.upstream_media_rt.close_camera(camera_lease.generation);
self.upstream_media_rt.close_microphone(microphone_lease.generation);
Status::internal(format!("{e:#}"))
})?;
{
Ok(Some(permit)) => match runtime_support::open_voice_with_retry(&uac_dev).await {
Ok(sink) => Some((permit, sink)),
Err(err) => {
warn!(
rpc_id,
session_id = camera_lease.session_id,
"📦⚠️ continuing bundled upstream video without UAC audio; microphone sink failed: {err:#}"
);
self.upstream_media_rt
.close_microphone(microphone_lease.generation);
None
}
},
Ok(None) => {
warn!(
rpc_id,
session_id = camera_lease.session_id,
"📦⚠️ continuing bundled upstream video without UAC audio; microphone generation was superseded before sink startup"
);
None
}
Err(_) => {
warn!(
rpc_id,
session_id = camera_lease.session_id,
timeout_ms = media_v2_uac_start_timeout().as_millis(),
"📦⚠️ continuing bundled upstream video without UAC audio; microphone sink startup timed out"
);
self.upstream_media_rt
.close_microphone(microphone_lease.generation);
None
}
};
let audio_enabled = microphone_sink.is_some();
let camera_rt = self.camera_rt.clone();
let upstream_media_rt = self.upstream_media_rt.clone();
let (tx, rx) = tokio::sync::mpsc::channel(1);
tokio::spawn(async move {
let _microphone_sink_permit = microphone_sink_permit;
let mut inbound = req.into_inner();
let mut last_bundle_session_id = None;
let mut last_bundle_seq = None;
let mut waiting_for_hevc_keyframe = false;
let mut outcome = "aborted";
let (audio_handoff_tx, audio_handoff_rx) =
tokio::sync::mpsc::channel::<MediaV2ScheduledAudio>(32);
let (mut audio_handoff_tx, audio_worker) =
if let Some((microphone_sink_permit, sink)) = microphone_sink {
let (audio_handoff_tx, audio_handoff_rx) =
tokio::sync::mpsc::channel::<MediaV2ScheduledAudio>(32);
let audio_rt = upstream_media_rt.clone();
let worker = tokio::spawn(async move {
let _microphone_sink_permit = microphone_sink_permit;
run_media_v2_audio_handoff(audio_handoff_rx, sink, audio_rt).await;
});
(Some(audio_handoff_tx), Some(worker))
} else {
(None, None)
};
let (video_handoff_tx, video_handoff_rx) =
tokio::sync::mpsc::channel::<MediaV2ScheduledVideo>(32);
let audio_worker = tokio::spawn(run_media_v2_audio_handoff(
audio_handoff_rx,
sink,
upstream_media_rt.clone(),
));
let video_worker = tokio::spawn(run_media_v2_video_handoff(
video_handoff_rx,
relay.clone(),
@ -93,11 +122,16 @@ impl Handler {
let bundle_arrived_at = tokio::time::Instant::now();
if !camera_rt.is_active(camera_session_id)
|| !upstream_media_rt.is_camera_active(camera_lease.generation)
|| !upstream_media_rt.is_microphone_active(microphone_lease.generation)
|| (audio_enabled
&& !upstream_media_rt.is_microphone_active(microphone_lease.generation))
{
outcome = "superseded";
break;
}
if !audio_enabled {
// UVC video must stay live even when UAC is wedged or being recovered.
bundle.audio.clear();
}
if last_bundle_session_id.is_some_and(|session_id| session_id != bundle.session_id) {
warn!(
rpc_id,
@ -187,7 +221,8 @@ impl Handler {
let bundle_base_remote_pts_us = facts.capture_start_us;
let frame_step_us = media_v2_frame_step_us(camera_cfg.fps);
if schedule.audio_due_at.is_some()
if audio_handoff_tx.is_some()
&& schedule.audio_due_at.is_some()
&& let Some(scheduled_audio) = prepare_media_v2_audio(
&mut bundle.audio,
&upstream_media_rt,
@ -195,10 +230,13 @@ impl Handler {
bundle_epoch,
)
&& audio_handoff_tx
.as_ref()
.expect("checked audio handoff sender")
.send(scheduled_audio)
.await
.is_err()
{
audio_handoff_tx = None;
warn!(
rpc_id,
session_id = camera_lease.session_id,
@ -254,12 +292,14 @@ impl Handler {
outcome = if outcome == "aborted" { "closed" } else { outcome };
drop(audio_handoff_tx);
drop(video_handoff_tx);
if let Err(err) = audio_worker.await {
warn!(
rpc_id,
session_id = camera_lease.session_id,
"📦 v2 audio handoff worker join failed: {err}"
);
if let Some(audio_worker) = audio_worker {
if let Err(err) = audio_worker.await {
warn!(
rpc_id,
session_id = camera_lease.session_id,
"📦 v2 audio handoff worker join failed: {err}"
);
}
}
if let Err(err) = video_worker.await {
warn!(

View File

@ -6,7 +6,7 @@ mod tests {
media_v2_handoff_schedule, media_v2_has_hevc_recovery_keyframe,
media_v2_should_hold_hevc_video_for_recovery, prepare_media_v2_audio,
prepare_media_v2_video, retain_freshest_audio_packet, retain_freshest_video_packet,
summarize_media_v2_bundle,
summarize_media_v2_bundle, media_v2_uac_start_timeout,
};
use lesavka_common::lesavka::{AudioPacket, UpstreamMediaBundle, VideoPacket};
use lesavka_server::camera::CameraCodec;
@ -179,6 +179,23 @@ mod tests {
assert!(media_v2_handoff_schedule(facts, 0, 0).is_none());
}
#[test]
fn media_v2_uac_start_timeout_is_short_and_operator_configurable() {
temp_env::with_var_unset("LESAVKA_UPSTREAM_V2_UAC_START_TIMEOUT_MS", || {
assert_eq!(
media_v2_uac_start_timeout(),
std::time::Duration::from_millis(750)
);
});
temp_env::with_var("LESAVKA_UPSTREAM_V2_UAC_START_TIMEOUT_MS", Some("125"), || {
assert_eq!(
media_v2_uac_start_timeout(),
std::time::Duration::from_millis(125)
);
});
}
#[test]
/// Keeps server HEVC drop recovery explicit because late-drop freshness can otherwise corrupt decoded video.
fn media_v2_hevc_recovery_holds_delta_until_keyframe() {

View File

@ -245,4 +245,12 @@ mod server_upstream_media_bundle_normal_mode {
assert!(UPSTREAM_RUNTIME.contains("pub struct UpstreamBundledLeases"));
assert!(UPSTREAM_RUNTIME_LIFECYCLE.contains("pub fn activate_bundled_session"));
}
#[test]
fn bundled_video_handoff_is_not_blocked_indefinitely_by_uac_startup() {
assert!(RELAY_RPC.contains("media_v2_uac_start_timeout()"));
assert!(RELAY_RPC.contains("continuing bundled upstream video without UAC audio"));
assert!(RELAY_RPC.contains("bundle.audio.clear();"));
assert!(RELAY_RPC.contains("tokio::spawn(run_media_v2_video_handoff"));
}
}

View File

@ -56,7 +56,7 @@ fn webcam_transport_selector_exposes_real_hevc_and_mjpeg_choices() {
"webcam_transport_combo.append(Some(transport.as_id()), transport.label());",
"webcam_transport_combo.set_active_id(Some(state.effective_webcam_transport().as_id()));",
"webcam_transport_combo.set_sensitive(true);",
"MJPEG is the safe calibrated default; HEVC is used only when the server advertises it",
"MJPEG is the safe calibrated default; HEVC is selectable for hardware-accelerated testing",
] {
assert!(
BUILD_DEVICE_CONTROLS_SRC.contains(marker),
@ -87,7 +87,7 @@ fn webcam_transport_changes_are_staged_when_relay_is_live() {
"widgets.webcam_transport_syncing.set(false);",
".webcam_transport_combo\n .set_sensitive(state.channels.camera);",
"Changing upstream webcam transport restarts the live camera path; the picture may pause briefly.",
"Use the server-advertised upstream webcam transport for the next relay launch; MJPEG is the safe calibrated default.",
"Choose upstream webcam transport for the next relay launch; MJPEG is the safe calibrated default.",
] {
assert!(
STATUS_REFRESH_SRC.contains(marker),