diff --git a/Cargo.lock b/Cargo.lock index 08c75e6..5fb23ac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.22.21" +version = "0.22.22" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.22.21" +version = "0.22.22" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.22.21" +version = "0.22.22" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index b145812..a7df3d9 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.22.21" +version = "0.22.22" edition = "2024" [dependencies] diff --git a/client/src/app/uplink_media/webcam_media_loop.rs b/client/src/app/uplink_media/webcam_media_loop.rs index 146b073..d4dcbcd 100644 --- a/client/src/app/uplink_media/webcam_media_loop.rs +++ b/client/src/app/uplink_media/webcam_media_loop.rs @@ -16,7 +16,6 @@ impl LesavkaClientApp { ) { let mut delay = Duration::from_secs(1); let mut startup_epoch_heal_delay = upstream_epoch_auto_heal_delay(); - let recover_hevc_after_drops = upstream_camera_uses_hevc(camera_cfg); static FAIL_CNT: AtomicUsize = AtomicUsize::new(0); loop { @@ -31,6 +30,9 @@ impl LesavkaClientApp { let active_camera_source = state.camera_source.resolve(initial_camera_source.as_deref()); let active_camera_profile = state.camera_profile.resolve(initial_camera_profile.as_deref()); + let active_camera_cfg = camera_config_with_live_codec(camera_cfg, &state.camera_codec); + let active_camera_codec = active_camera_cfg.map(|cfg| cfg.codec); + let recover_hevc_after_drops = upstream_camera_uses_hevc(active_camera_cfg); let active_microphone_source = state .microphone_source .resolve(initial_microphone_source.as_deref()); @@ -64,7 +66,7 @@ impl LesavkaClientApp { let camera = if camera_requested { Some(CameraCapture::new_with_capture_profile( setup_camera_source.as_deref(), - camera_cfg, + active_camera_cfg, capture_profile, )?) } else { @@ -210,6 +212,7 @@ impl LesavkaClientApp { let initial_camera_profile = initial_camera_profile.clone(); let active_camera_source = active_camera_source.clone(); let active_camera_profile = active_camera_profile.clone(); + let active_camera_codec = active_camera_codec; std::thread::spawn(move || { let mut waiting_for_hevc_keyframe = false; while !stop.load(Ordering::Relaxed) { @@ -218,9 +221,13 @@ impl LesavkaClientApp { state.camera_source.resolve(initial_camera_source.as_deref()); let desired_profile = state.camera_profile.resolve(initial_camera_profile.as_deref()); + let desired_camera_cfg = + camera_config_with_live_codec(camera_cfg, &state.camera_codec); if !state.camera || desired_source != active_camera_source || desired_profile != active_camera_profile + || desired_camera_cfg.map(|cfg| cfg.codec) + != active_camera_codec { stop.store(true, Ordering::Relaxed); let _ = event_tx.try_send(BundledCaptureEvent::Restart); @@ -366,6 +373,39 @@ impl LesavkaClientApp { } } +fn camera_config_with_live_codec( + cfg: Option, + choice: &crate::live_media_control::MediaCameraCodecChoice, +) -> Option { + let mut cfg = cfg?; + let fallback = camera_codec_id(cfg.codec); + if let Some(codec) = choice + .resolve(Some(fallback)) + .as_deref() + .and_then(parse_live_camera_codec) + { + cfg.codec = codec; + } + Some(cfg) +} + +fn camera_codec_id(codec: crate::input::camera::CameraCodec) -> &'static str { + match codec { + crate::input::camera::CameraCodec::Mjpeg => "mjpeg", + crate::input::camera::CameraCodec::Hevc => "hevc", + crate::input::camera::CameraCodec::H264 => "h264", + } +} + +fn parse_live_camera_codec(raw: &str) -> Option { + match raw.trim().to_ascii_lowercase().as_str() { + "mjpeg" | "mjpg" | "jpeg" => Some(crate::input::camera::CameraCodec::Mjpeg), + "hevc" | "h265" | "h.265" => Some(crate::input::camera::CameraCodec::Hevc), + "h264" => Some(crate::input::camera::CameraCodec::H264), + _ => None, + } +} + const DEFAULT_UPSTREAM_AUTO_HEAL_AFTER_MS: u64 = 3_000; /// Resolve whether the live bundled uplink should force one startup epoch heal. diff --git a/client/src/input/audio_codec.rs b/client/src/input/audio_codec.rs index 2b28f09..74c324b 100644 --- a/client/src/input/audio_codec.rs +++ b/client/src/input/audio_codec.rs @@ -14,11 +14,13 @@ use lesavka_common::{ }, lesavka::AudioPacket, }; +use std::collections::VecDeque; use std::time::Duration; const AUDIO_CODEC_ENV: &str = "LESAVKA_UPLINK_AUDIO_CODEC"; const AUDIO_CODEC_LEGACY_ENV: &str = "LESAVKA_AUDIO_CODEC"; const OPUS_PULL_TIMEOUT: Duration = Duration::from_millis(25); +const MAX_PENDING_OPUS_METADATA: usize = 16; /// Resolve the requested upstream audio codec from runtime environment. /// @@ -40,6 +42,7 @@ pub struct OpusPacketEncoder { _pipeline: gst::Pipeline, appsrc: gst_app::AppSrc, appsink: gst_app::AppSink, + pending_packets: VecDeque, } impl OpusPacketEncoder { @@ -79,6 +82,7 @@ impl OpusPacketEncoder { _pipeline: pipeline, appsrc, appsink, + pending_packets: VecDeque::new(), }) } @@ -88,6 +92,7 @@ impl OpusPacketEncoder { return Ok(Some(packet)); } + push_pending_packet(&mut self.pending_packets, packet_metadata(&packet)); let mut buffer = gst::Buffer::from_slice(packet.data.clone()); if let Some(meta) = buffer.get_mut() { let pts = gst::ClockTime::from_useconds(packet.pts); @@ -106,9 +111,14 @@ impl OpusPacketEncoder { )) else { return Ok(None); }; - let encoded = sample + let sample_buffer = sample .buffer() - .context("opus encoder sample missing buffer")? + .context("opus encoder sample missing buffer")?; + let sample_pts_us = sample_buffer.pts().map(|pts| pts.nseconds() / 1_000); + let sample_duration_us = sample_buffer + .duration() + .map(|duration| duration.nseconds() / 1_000); + let encoded = sample_buffer .map_readable() .context("map opus packet")? .to_vec(); @@ -116,10 +126,16 @@ impl OpusPacketEncoder { return Ok(None); } - let mut output = AudioPacket { - data: encoded, - ..packet - }; + let mut output = self + .take_pending_packet(sample_pts_us) + .unwrap_or_else(|| packet_metadata(&packet)); + output.data = encoded; + if let Some(pts_us) = sample_pts_us { + output.pts = pts_us; + } + if let Some(duration_us) = sample_duration_us { + output.frame_duration_us = duration_us.min(u64::from(u32::MAX)) as u32; + } audio_transport::mark_packet_opus(&mut output); Ok(Some(output)) } @@ -129,6 +145,31 @@ impl OpusPacketEncoder { pub const fn profile(&self) -> AudioTransportProfile { AudioTransportProfile::opus_voice() } + + fn take_pending_packet(&mut self, sample_pts_us: Option) -> Option { + if let Some(pts_us) = sample_pts_us + && let Some(index) = self + .pending_packets + .iter() + .position(|packet| packet.pts == pts_us) + { + return self.pending_packets.drain(..=index).last(); + } + self.pending_packets.pop_front() + } +} + +fn packet_metadata(packet: &AudioPacket) -> AudioPacket { + let mut metadata = packet.clone(); + metadata.data.clear(); + metadata +} + +fn push_pending_packet(pending: &mut VecDeque, packet: AudioPacket) { + pending.push_back(packet); + while pending.len() > MAX_PENDING_OPUS_METADATA { + pending.pop_front(); + } } impl Drop for OpusPacketEncoder { diff --git a/client/src/input/microphone.rs b/client/src/input/microphone.rs index a5f1a2d..f4d5c42 100644 --- a/client/src/input/microphone.rs +++ b/client/src/input/microphone.rs @@ -32,6 +32,7 @@ const MIC_PULSE_LATENCY_TIME_ENV: &str = "LESAVKA_MIC_PULSE_LATENCY_TIME_US"; const MIC_PACKET_TARGET_DURATION_ENV: &str = "LESAVKA_MIC_PACKET_TARGET_US"; const REQUIRE_EXPLICIT_MEDIA_SOURCES_ENV: &str = "LESAVKA_REQUIRE_EXPLICIT_MEDIA_SOURCES"; const MIC_NOISE_SUPPRESSION_ENV: &str = "LESAVKA_MIC_NOISE_SUPPRESSION"; +const MIC_NOISE_SUPPRESSION_LEVEL_ENV: &str = "LESAVKA_MIC_NOISE_SUPPRESSION_LEVEL"; const MIC_SAMPLE_RATE: u64 = 48_000; const MIC_CHANNELS: usize = 2; const MIC_SAMPLE_BYTES: usize = std::mem::size_of::(); @@ -99,14 +100,30 @@ fn microphone_pipeline_desc( } } -fn microphone_noise_suppression_stage(enabled: bool) -> &'static str { +fn microphone_noise_suppression_stage(enabled: bool) -> String { if enabled && gst::ElementFactory::find("webrtcdsp").is_some() { - "webrtcdsp echo-cancel=false noise-suppression=true noise-suppression-level=high high-pass-filter=true gain-control=false limiter=true ! " + format!( + "webrtcdsp echo-cancel=false noise-suppression=true noise-suppression-level={} high-pass-filter=true gain-control=false limiter=true ! ", + mic_noise_suppression_level() + ) } else { - "" + String::new() } } +fn mic_noise_suppression_level() -> &'static str { + std::env::var(MIC_NOISE_SUPPRESSION_LEVEL_ENV) + .ok() + .and_then(|raw| match raw.trim().to_ascii_lowercase().as_str() { + "low" => Some("low"), + "moderate" | "medium" => Some("moderate"), + "high" => Some("high"), + "very-high" | "very_high" | "veryhigh" | "aggressive" => Some("very-high"), + _ => None, + }) + .unwrap_or("very-high") +} + fn buffer_duration_us(buf: &gst::BufferRef, bytes: usize) -> u64 { let payload_duration_us = pcm_payload_duration_us(bytes); buf.duration() diff --git a/client/src/input/microphone/capture_runtime.rs b/client/src/input/microphone/capture_runtime.rs index f071322..14fc80c 100644 --- a/client/src/input/microphone/capture_runtime.rs +++ b/client/src/input/microphone/capture_runtime.rs @@ -163,7 +163,7 @@ impl MicrophoneCapture { /// Blocking pull; call from an async wrapper pub fn pull(&self) -> Option { if let Some(packet) = self.pending_packets.lock().ok()?.pop_front() { - return Some(self.encode_for_transport(packet)); + return self.encode_for_transport(packet); } match self.sink.pull_sample() { Ok(sample) => { @@ -224,25 +224,25 @@ impl MicrophoneCapture { { pending.extend(packets); } - first_packet.map(|packet| self.encode_for_transport(packet)) + first_packet.and_then(|packet| self.encode_for_transport(packet)) } Err(_) => None, } } - fn encode_for_transport(&self, packet: AudioPacket) -> AudioPacket { + fn encode_for_transport(&self, packet: AudioPacket) -> Option { let Some(mut guard) = self.audio_encoder.lock().ok() else { - return packet; + return Some(packet); }; let Some(encoder) = guard.as_mut() else { - return packet; + return Some(packet); }; match encoder.encode_packet(packet.clone()) { - Ok(Some(encoded)) => encoded, - Ok(None) => packet, + Ok(Some(encoded)) => Some(encoded), + Ok(None) => None, Err(err) => { warn!("🎤⚠️ Opus encode failed; sending PCM fallback for this packet: {err:#}"); - packet + Some(packet) } } } diff --git a/client/src/launcher/state/launcher_state_impl.rs b/client/src/launcher/state/launcher_state_impl.rs index c5a7931..14892cb 100644 --- a/client/src/launcher/state/launcher_state_impl.rs +++ b/client/src/launcher/state/launcher_state_impl.rs @@ -322,10 +322,7 @@ impl LauncherState { } pub fn effective_webcam_transport(&self) -> WebcamTransport { - self.server_camera_codec - .as_deref() - .and_then(WebcamTransport::from_server_codec) - .unwrap_or(self.webcam_transport) + self.webcam_transport } pub fn select_upstream_audio_transport(&mut self, transport: UpstreamAudioTransport) { diff --git a/client/src/launcher/tests/ui_runtime.rs b/client/src/launcher/tests/ui_runtime.rs index e074028..df34273 100644 --- a/client/src/launcher/tests/ui_runtime.rs +++ b/client/src/launcher/tests/ui_runtime.rs @@ -427,14 +427,14 @@ fn webcam_transport_combo_tracks_selected_upstream_codec() { assert!(view.device_stage.webcam_transport_combo.is_sensitive()); refresh_launcher_ui(&view.widgets, &state, true); - assert!(!view.device_stage.webcam_transport_combo.is_sensitive()); + assert!(view.device_stage.webcam_transport_combo.is_sensitive()); assert!( view.device_stage .webcam_transport_combo .tooltip_text() .as_deref() .unwrap_or_default() - .contains("Reconnect") + .contains("restarts the live camera path") ); } @@ -832,6 +832,7 @@ fn write_media_control_request_formats_soft_pause_state() { assert!(raw.contains("camera_profile=b64:"), "{raw}"); assert!(raw.contains("microphone_source=b64:"), "{raw}"); assert!(raw.contains("audio_sink=b64:"), "{raw}"); + assert!(raw.contains("camera_codec=mjpeg"), "{raw}"); } #[gtk::test] diff --git a/client/src/launcher/ui/stage_device_bindings.rs b/client/src/launcher/ui/stage_device_bindings.rs index 59d914f..6a49b21 100644 --- a/client/src/launcher/ui/stage_device_bindings.rs +++ b/client/src/launcher/ui/stage_device_bindings.rs @@ -66,10 +66,12 @@ state.borrow_mut().select_webcam_transport(selected); let relay_live = child_proc.borrow().is_some(); if relay_live { - widgets.status_label.set_text(&format!( - "Webcam transport changed to {} for the next reconnect; keeping the live decoder path stable.", - selected.label() - )); + apply_live_media_device_change( + &state.borrow(), + &widgets, + &child_proc, + "Webcam transport", + ); } else { widgets.status_label.set_text(&format!( "Webcam transport set to {} for the next relay launch.", diff --git a/client/src/launcher/ui_runtime/control_paths.rs b/client/src/launcher/ui_runtime/control_paths.rs index 4014e5b..35d2210 100644 --- a/client/src/launcher/ui_runtime/control_paths.rs +++ b/client/src/launcher/ui_runtime/control_paths.rs @@ -145,7 +145,7 @@ pub fn write_mic_gain_request(path: &Path, gain_percent: u32) -> Result<()> { pub fn write_media_control_request(path: &Path, state: &LauncherState) -> Result<()> { crate::live_media_control::write_media_control_request( path, - crate::live_media_control::MediaControlState::with_devices_and_audio( + crate::live_media_control::MediaControlState::with_devices_and_codecs( state.channels.camera, state.channels.microphone, state.channels.audio, @@ -153,6 +153,7 @@ pub fn write_media_control_request(path: &Path, state: &LauncherState) -> Result state.camera_quality.map(|mode| mode.id()), state.devices.microphone.clone(), state.devices.speaker.clone(), + Some(state.webcam_transport.as_id().to_string()), state.upstream_audio_transport.as_common_codec(), state.mic_noise_suppression, ), diff --git a/client/src/launcher/ui_runtime/status_refresh.rs b/client/src/launcher/ui_runtime/status_refresh.rs index e41ea54..8aa074e 100644 --- a/client/src/launcher/ui_runtime/status_refresh.rs +++ b/client/src/launcher/ui_runtime/status_refresh.rs @@ -261,11 +261,11 @@ pub fn refresh_launcher_ui(widgets: &LauncherWidgets, state: &LauncherState, chi } widgets .webcam_transport_combo - .set_sensitive(!relay_live && state.channels.camera); + .set_sensitive(state.channels.camera); widgets .webcam_transport_combo .set_tooltip_text(Some(if relay_live { - "Reconnect before changing the upstream webcam transport; the server decoder is calibrated per ingress codec." + "Changing upstream webcam transport restarts the live camera path; the picture may pause briefly." } else { "Use the server-advertised upstream webcam transport for the next relay launch; MJPEG is the safe calibrated default." })); diff --git a/client/src/live_media_control.rs b/client/src/live_media_control.rs index 9580312..b442517 100644 --- a/client/src/live_media_control.rs +++ b/client/src/live_media_control.rs @@ -41,6 +41,30 @@ impl MediaAudioCodecChoice { } } +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) enum MediaCameraCodecChoice { + Inherit, + Selected(String), +} + +impl MediaCameraCodecChoice { + #[must_use] + pub fn selected(codec: Option) -> Self { + codec + .filter(|value| !value.trim().is_empty()) + .map(|value| Self::Selected(value.trim().to_ascii_lowercase())) + .unwrap_or(Self::Inherit) + } + + #[must_use] + pub fn resolve(&self, fallback: Option<&str>) -> Option { + match self { + Self::Inherit => fallback.map(str::to_string), + Self::Selected(codec) => Some(codec.clone()), + } + } +} + #[derive(Clone, Debug, PartialEq, Eq)] pub(crate) enum MediaNoiseSuppressionChoice { Inherit, @@ -98,6 +122,7 @@ pub(crate) struct MediaControlState { pub camera_profile: MediaDeviceChoice, pub microphone_source: MediaDeviceChoice, pub audio_sink: MediaDeviceChoice, + pub camera_codec: MediaCameraCodecChoice, pub audio_codec: MediaAudioCodecChoice, pub noise_suppression: MediaNoiseSuppressionChoice, } @@ -113,6 +138,7 @@ impl MediaControlState { camera_profile: MediaDeviceChoice::Inherit, microphone_source: MediaDeviceChoice::Inherit, audio_sink: MediaDeviceChoice::Inherit, + camera_codec: MediaCameraCodecChoice::Inherit, audio_codec: MediaAudioCodecChoice::Inherit, noise_suppression: MediaNoiseSuppressionChoice::Inherit, } @@ -136,6 +162,7 @@ impl MediaControlState { camera_profile: MediaDeviceChoice::from_selection(camera_profile), microphone_source: MediaDeviceChoice::from_selection(microphone_source), audio_sink: MediaDeviceChoice::from_selection(audio_sink), + camera_codec: MediaCameraCodecChoice::Inherit, audio_codec: MediaAudioCodecChoice::Inherit, noise_suppression: MediaNoiseSuppressionChoice::Inherit, } @@ -168,6 +195,36 @@ impl MediaControlState { ) } } + + #[must_use] + #[allow(clippy::too_many_arguments)] + pub fn with_devices_and_codecs( + camera: bool, + microphone: bool, + audio: bool, + camera_source: Option, + camera_profile: Option, + microphone_source: Option, + audio_sink: Option, + camera_codec: Option, + audio_codec: UpstreamAudioCodec, + noise_suppression: bool, + ) -> Self { + Self { + camera_codec: MediaCameraCodecChoice::selected(camera_codec), + audio_codec: MediaAudioCodecChoice::selected(audio_codec), + noise_suppression: MediaNoiseSuppressionChoice::selected(noise_suppression), + ..Self::with_devices( + camera, + microphone, + audio, + camera_source, + camera_profile, + microphone_source, + audio_sink, + ) + } + } } #[derive(Clone, Debug)] @@ -217,7 +274,7 @@ pub(crate) fn write_media_control_request( fs::write( path, format!( - "camera={} microphone={} audio={} camera_source={} camera_profile={} microphone_source={} audio_sink={} audio_codec={} noise_suppression={} nonce={}\n", + "camera={} microphone={} audio={} camera_source={} camera_profile={} microphone_source={} audio_sink={} camera_codec={} audio_codec={} noise_suppression={} nonce={}\n", bool_flag(state.camera), bool_flag(state.microphone), bool_flag(state.audio), @@ -225,6 +282,7 @@ pub(crate) fn write_media_control_request( encode_choice(&state.camera_profile), encode_choice(&state.microphone_source), encode_choice(&state.audio_sink), + encode_camera_codec_choice(&state.camera_codec), encode_audio_codec_choice(&state.audio_codec), encode_noise_suppression_choice(&state.noise_suppression), control_request_nonce(), @@ -241,6 +299,7 @@ fn parse_media_control_state(raw: &str) -> Option { let mut camera_profile = MediaDeviceChoice::Inherit; let mut microphone_source = MediaDeviceChoice::Inherit; let mut audio_sink = MediaDeviceChoice::Inherit; + let mut camera_codec = MediaCameraCodecChoice::Inherit; let mut audio_codec = MediaAudioCodecChoice::Inherit; let mut noise_suppression = MediaNoiseSuppressionChoice::Inherit; for token in raw.split_ascii_whitespace() { @@ -257,6 +316,9 @@ fn parse_media_control_state(raw: &str) -> Option { microphone_source = parse_choice(value)?; } "audio_sink" | "speaker_sink" | "audio_sink_b64" => audio_sink = parse_choice(value)?, + "camera_codec" | "uplink_camera_codec" | "webcam_transport" => { + camera_codec = parse_camera_codec_choice(value)?; + } "audio_codec" | "uplink_audio_codec" => { audio_codec = parse_audio_codec_choice(value)?; } @@ -274,6 +336,7 @@ fn parse_media_control_state(raw: &str) -> Option { camera_profile, microphone_source, audio_sink, + camera_codec, audio_codec, noise_suppression, }) @@ -307,6 +370,26 @@ fn parse_choice(value: &str) -> Option { Some(MediaDeviceChoice::from_selection(Some(value.to_string()))) } +fn encode_camera_codec_choice(choice: &MediaCameraCodecChoice) -> &str { + match choice { + MediaCameraCodecChoice::Inherit => "inherit", + MediaCameraCodecChoice::Selected(codec) => codec.as_str(), + } +} + +fn parse_camera_codec_choice(value: &str) -> Option { + let value = value.trim(); + if value.eq_ignore_ascii_case("inherit") || value.is_empty() { + return Some(MediaCameraCodecChoice::Inherit); + } + match value.to_ascii_lowercase().as_str() { + "mjpeg" | "mjpg" | "jpeg" => Some(MediaCameraCodecChoice::Selected("mjpeg".to_string())), + "hevc" | "h265" | "h.265" => Some(MediaCameraCodecChoice::Selected("hevc".to_string())), + "h264" => Some(MediaCameraCodecChoice::Selected("h264".to_string())), + _ => None, + } +} + fn encode_audio_codec_choice(choice: &MediaAudioCodecChoice) -> &'static str { match choice { MediaAudioCodecChoice::Inherit => "inherit", diff --git a/common/Cargo.toml b/common/Cargo.toml index 0625cb4..d8d8e3e 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.22.21" +version = "0.22.22" edition = "2024" build = "build.rs" diff --git a/server/Cargo.toml b/server/Cargo.toml index a6c5dc0..10d60a3 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.22.21" +version = "0.22.22" edition = "2024" autobins = false diff --git a/server/src/audio/opus_decode.rs b/server/src/audio/opus_decode.rs index e492aed..a945a0f 100644 --- a/server/src/audio/opus_decode.rs +++ b/server/src/audio/opus_decode.rs @@ -12,14 +12,17 @@ use lesavka_common::{ audio_transport, lesavka::AudioPacket, }; +use std::collections::VecDeque; use std::time::Duration; const OPUS_DECODE_PULL_TIMEOUT: Duration = Duration::from_millis(30); +const MAX_PENDING_OPUS_METADATA: usize = 16; pub(super) struct OpusPacketDecoder { _pipeline: gst::Pipeline, appsrc: gst_app::AppSrc, appsink: gst_app::AppSink, + pending_packets: VecDeque, } impl OpusPacketDecoder { @@ -55,10 +58,12 @@ impl OpusPacketDecoder { _pipeline: pipeline, appsrc, appsink, + pending_packets: VecDeque::new(), }) } pub(super) fn decode_packet(&mut self, packet: &AudioPacket) -> Result> { + push_pending_packet(&mut self.pending_packets, packet_metadata(packet)); let mut buffer = gst::Buffer::from_slice(packet.data.clone()); if let Some(meta) = buffer.get_mut() { let pts = gst::ClockTime::from_useconds(packet.pts); @@ -82,9 +87,12 @@ impl OpusPacketDecoder { else { return Ok(None); }; - let decoded = sample - .buffer() - .context("opus decoder sample missing buffer")? + let sample_buffer = sample.buffer().context("opus decoder sample missing buffer")?; + let sample_pts_us = sample_buffer.pts().map(|pts| pts.nseconds() / 1_000); + let sample_duration_us = sample_buffer + .duration() + .map(|duration| duration.nseconds() / 1_000); + let decoded = sample_buffer .map_readable() .context("map decoded pcm")? .to_vec(); @@ -92,13 +100,44 @@ impl OpusPacketDecoder { return Ok(None); } - let mut output = AudioPacket { - data: decoded, - ..packet.clone() - }; + let mut output = self + .take_pending_packet(sample_pts_us) + .unwrap_or_else(|| packet_metadata(packet)); + output.data = decoded; + if let Some(pts_us) = sample_pts_us { + output.pts = pts_us; + } + if let Some(duration_us) = sample_duration_us { + output.frame_duration_us = duration_us.min(u64::from(u32::MAX)) as u32; + } audio_transport::mark_packet_pcm_s16le(&mut output); Ok(Some(output)) } + + fn take_pending_packet(&mut self, sample_pts_us: Option) -> Option { + if let Some(pts_us) = sample_pts_us + && let Some(index) = self + .pending_packets + .iter() + .position(|packet| packet.pts == pts_us) + { + return self.pending_packets.drain(..=index).last(); + } + self.pending_packets.pop_front() + } +} + +fn packet_metadata(packet: &AudioPacket) -> AudioPacket { + let mut metadata = packet.clone(); + metadata.data.clear(); + metadata +} + +fn push_pending_packet(pending: &mut VecDeque, packet: AudioPacket) { + pending.push_back(packet); + while pending.len() > MAX_PENDING_OPUS_METADATA { + pending.pop_front(); + } } impl Drop for OpusPacketDecoder { diff --git a/server/src/video_sinks/hevc_mjpeg_guard.rs b/server/src/video_sinks/hevc_mjpeg_guard.rs index 4680637..7757555 100644 --- a/server/src/video_sinks/hevc_mjpeg_guard.rs +++ b/server/src/video_sinks/hevc_mjpeg_guard.rs @@ -155,12 +155,12 @@ pub(super) fn should_freeze_decoded_mjpeg(previous_bytes: u64, next_bytes: usize /// mostly black frames, or torn images that conferencing apps may otherwise /// display as if they were valid webcam frames. pub(super) fn should_freeze_decoded_mjpeg_frame(previous_bytes: u64, decoded_mjpeg: &[u8]) -> bool { - if !freeze_on_size_drop_enabled() || previous_bytes < u64::from(min_reference_bytes()) { + if !freeze_on_size_drop_enabled() { return false; } !looks_like_complete_jpeg(decoded_mjpeg) - || should_freeze_decoded_mjpeg(previous_bytes, decoded_mjpeg.len()) || suspiciously_flat_payload(decoded_mjpeg) + || should_freeze_decoded_mjpeg(previous_bytes, decoded_mjpeg.len()) } #[cfg(test)] @@ -258,6 +258,10 @@ mod tests { assert!(super::should_freeze_decoded_mjpeg_frame( 200_000, &truncated, )); + assert!(super::should_freeze_decoded_mjpeg_frame( + 0, + &jpeg_with_payload(&flat_payload), + )); }, ); } diff --git a/server/src/video_sinks/webcam_sink.rs b/server/src/video_sinks/webcam_sink.rs index de7f683..45aaa88 100644 --- a/server/src/video_sinks/webcam_sink.rs +++ b/server/src/video_sinks/webcam_sink.rs @@ -39,7 +39,9 @@ pub struct WebcamSink { next_pts_us: AtomicU64, frame_step_us: u64, mjpeg_spool_path: Option, + hevc_mjpeg_appsrc: Option, decoded_mjpeg_sink: Option, + last_mjpeg_passthrough_bytes: AtomicU64, last_decoded_mjpeg_bytes: AtomicU64, decoded_mjpeg_miss_count: AtomicU64, decode_recovery_needs_irap: AtomicBool, @@ -161,6 +163,90 @@ fn build_hevc_freshness_queue(name: &str) -> anyhow::Result { Ok(queue) } +#[cfg(not(coverage))] +fn add_hevc_mjpeg_spool_branch( + pipeline: &gst::Pipeline, + width: i32, + height: i32, + fps: i32, +) -> anyhow::Result<(gst_app::AppSrc, gst_app::AppSink)> { + let src = gst::ElementFactory::make("appsrc") + .name("dynamic_hevc_mjpeg_src") + .build()? + .downcast::() + .expect("dynamic HEVC appsrc"); + src.set_is_live(true); + src.set_format(gst::Format::Time); + src.set_property("do-timestamp", false); + configure_uvc_appsrc(&src); + let caps_hevc = gst::Caps::builder("video/x-h265") + .field("stream-format", "byte-stream") + .field("alignment", "au") + .build(); + src.set_caps(Some(&caps_hevc)); + + let caps_mjpeg = gst::Caps::builder("image/jpeg") + .field("parsed", true) + .field("width", width) + .field("height", height) + .field("framerate", gst::Fraction::new(fps, 1)) + .field("pixel-aspect-ratio", gst::Fraction::new(1, 1)) + .field("colorimetry", "2:4:7:1") + .build(); + let h265parse = gst::ElementFactory::make("h265parse") + .property("disable-passthrough", true) + .property("config-interval", -1i32) + .build()?; + let decoder_name = require_hevc_decoder()?; + let decoder = gst::ElementFactory::make(decoder_name) + .build() + .with_context(|| format!("building dynamic HEVC decoder element {decoder_name}"))?; + configure_hevc_decoder(&decoder); + let decoded_queue = build_hevc_freshness_queue("dynamic_hevc_mjpeg_decoded_queue")?; + let convert = gst::ElementFactory::make("videoconvert").build()?; + let encoder = gst::ElementFactory::make("jpegenc") + .property("quality", hevc_mjpeg_guard::hevc_jpeg_quality() as i32) + .build()?; + let caps = gst::ElementFactory::make("capsfilter") + .property("caps", &caps_mjpeg) + .build()?; + let encoded_queue = build_hevc_freshness_queue("dynamic_hevc_mjpeg_encoded_queue")?; + let sink = gst::ElementFactory::make("appsink") + .name("dynamic_hevc_mjpeg_spool_sink") + .property("sync", false) + .property("enable-last-sample", false) + .property("emit-signals", false) + .property("max-buffers", 1u32) + .property("drop", true) + .build()? + .downcast::() + .expect("dynamic HEVC appsink"); + + pipeline.add_many([ + src.upcast_ref(), + &h265parse, + &decoder, + &decoded_queue, + &convert, + &encoder, + &caps, + &encoded_queue, + sink.upcast_ref(), + ])?; + gst::Element::link_many([ + src.upcast_ref(), + &h265parse, + &decoder, + &decoded_queue, + &convert, + &encoder, + &caps, + &encoded_queue, + sink.upcast_ref(), + ])?; + Ok((src, sink)) +} + /// Configure conservative recovery knobs on hardware HEVC decoders. /// /// Inputs: a decoder element selected by `require_hevc_decoder`. Output: @@ -288,7 +374,9 @@ impl WebcamSink { next_pts_us: AtomicU64::new(0), frame_step_us, mjpeg_spool_path: None, + hevc_mjpeg_appsrc: None, decoded_mjpeg_sink: None, + last_mjpeg_passthrough_bytes: AtomicU64::new(0), last_decoded_mjpeg_bytes: AtomicU64::new(0), decoded_mjpeg_miss_count: AtomicU64::new(0), decode_recovery_needs_irap: AtomicBool::new(false), @@ -321,6 +409,7 @@ impl WebcamSink { } let mut mjpeg_spool_file = None; + let mut hevc_mjpeg_appsrc = None; let mut decoded_mjpeg_sink = None; if use_mjpeg && mjpeg_spool_enabled() { @@ -345,6 +434,23 @@ impl WebcamSink { pipeline.add_many([src.upcast_ref(), &sink])?; gst::Element::link_many([src.upcast_ref(), &sink])?; mjpeg_spool_file = Some(mjpeg_spool_path()); + match add_hevc_mjpeg_spool_branch(&pipeline, width, height, fps) { + Ok((hevc_src, hevc_sink)) => { + hevc_mjpeg_appsrc = Some(hevc_src); + decoded_mjpeg_sink = Some(hevc_sink); + tracing::info!( + target: "lesavka_server::video", + "📸 MJPEG UVC spool will also accept live HEVC uplink packets" + ); + } + Err(err) => { + tracing::warn!( + target: "lesavka_server::video", + %err, + "📸⚠️ dynamic HEVC->MJPEG branch unavailable; MJPEG UVC spool will accept MJPEG only" + ); + } + } } else if use_mjpeg { let caps_mjpeg = gst::Caps::builder("image/jpeg") .field("parsed", true) @@ -541,7 +647,9 @@ impl WebcamSink { next_pts_us: AtomicU64::new(0), frame_step_us, mjpeg_spool_path: mjpeg_spool_file, + hevc_mjpeg_appsrc, decoded_mjpeg_sink, + last_mjpeg_passthrough_bytes: AtomicU64::new(0), last_decoded_mjpeg_bytes: AtomicU64::new(0), decoded_mjpeg_miss_count: AtomicU64::new(0), decode_recovery_needs_irap: AtomicBool::new(false), @@ -563,6 +671,13 @@ impl WebcamSink { #[cfg(not(coverage))] pub fn push(&self, pkt: VideoPacket) { + if let Some(path) = &self.mjpeg_spool_path + && looks_like_mjpeg_frame(&pkt.data) + { + self.spool_direct_mjpeg_frame(path, &pkt); + return; + } + let hevc_recovery_frame = self.decoded_mjpeg_sink.is_some() && contains_hevc_irap(&pkt.data); if self.decoded_mjpeg_sink.is_some() @@ -584,22 +699,16 @@ impl WebcamSink { ); } - if let Some(path) = &self.mjpeg_spool_path + if self.mjpeg_spool_path.is_some() && self.decoded_mjpeg_sink.is_none() + && !looks_like_mjpeg_frame(&pkt.data) { - if !looks_like_mjpeg_frame(&pkt.data) { - warn!( - target:"lesavka_server::video", - bytes = pkt.data.len(), - hevc_annex_b = looks_like_annex_b_hevc(&pkt.data), - "📸⚠️ dropping non-MJPEG packet before UVC spool; client/server camera codec mismatch would black-screen the browser webcam" - ); - return; - } - let timing = MjpegSpoolTiming::mjpeg_passthrough(pkt.pts); - if let Err(err) = spool_mjpeg_frame_with_timing(path, &pkt.data, Some(timing)) { - warn!(target:"lesavka_server::video", %err, "📸⚠️ failed to spool MJPEG frame for UVC helper"); - } + warn!( + target:"lesavka_server::video", + bytes = pkt.data.len(), + hevc_annex_b = looks_like_annex_b_hevc(&pkt.data), + "📸⚠️ dropping non-MJPEG packet before UVC spool; no dynamic decoder is available" + ); return; } @@ -617,7 +726,8 @@ impl WebcamSink { meta.set_dts(Some(ts)); meta.set_duration(Some(gst::ClockTime::from_useconds(self.frame_step_us))); } - if let Err(err) = self.appsrc.push_buffer(buf) { + let hevc_appsrc = self.hevc_mjpeg_appsrc.as_ref().unwrap_or(&self.appsrc); + if let Err(err) = hevc_appsrc.push_buffer(buf) { tracing::warn!(target:"lesavka_server::video", %err, "📸⚠️ appsrc push failed"); return; } @@ -671,6 +781,29 @@ impl WebcamSink { } } } + + #[cfg(not(coverage))] + fn spool_direct_mjpeg_frame(&self, path: &Path, pkt: &VideoPacket) { + let previous_bytes = self + .last_mjpeg_passthrough_bytes + .load(std::sync::atomic::Ordering::Relaxed); + if hevc_mjpeg_guard::should_freeze_decoded_mjpeg_frame(previous_bytes, &pkt.data) { + warn!( + target:"lesavka_server::video", + previous_bytes, + next_bytes = pkt.data.len(), + "📸⚠️ freezing suspicious direct MJPEG frame before UVC spool" + ); + return; + } + let timing = MjpegSpoolTiming::mjpeg_passthrough(pkt.pts); + if let Err(err) = spool_mjpeg_frame_with_timing(path, &pkt.data, Some(timing)) { + warn!(target:"lesavka_server::video", %err, "📸⚠️ failed to spool MJPEG frame for UVC helper"); + } else { + self.last_mjpeg_passthrough_bytes + .store(pkt.data.len() as u64, std::sync::atomic::Ordering::Relaxed); + } + } } impl Drop for WebcamSink { diff --git a/tests/chaos/server/video_sinks/hevc_mjpeg_guard_chaos_contract.rs b/tests/chaos/server/video_sinks/hevc_mjpeg_guard_chaos_contract.rs index b3780c5..0d23883 100644 --- a/tests/chaos/server/video_sinks/hevc_mjpeg_guard_chaos_contract.rs +++ b/tests/chaos/server/video_sinks/hevc_mjpeg_guard_chaos_contract.rs @@ -109,8 +109,11 @@ fn server_hevc_recovery_and_freshest_spool_paths_remain_wired() { for marker in [ "freshest_mjpeg_sample(sink)", "last_decoded_mjpeg_bytes", + "last_mjpeg_passthrough_bytes", "should_freeze_decoded_mjpeg_frame(previous_bytes, map.as_slice())", + "spool_direct_mjpeg_frame", "freezing suspicious decoded HEVC->MJPEG frame", + "freezing suspicious direct MJPEG frame before UVC spool", ] { assert!( WEBCAM_SINK.contains(marker), @@ -156,6 +159,7 @@ fn grey_and_black_slab_bursts_freeze_instead_of_reaching_uvc() { assert!(guard::should_freeze_frame(prior_good, &grey)); assert!(guard::should_freeze_frame(prior_good, &black)); + assert!(guard::should_freeze_frame(0, &black)); assert!(!guard::should_freeze_frame(prior_good, &healthy)); }, ); diff --git a/tests/compatibility/client/audio/client_opus_transport_contract.rs b/tests/compatibility/client/audio/client_opus_transport_contract.rs index 5cc8b11..a3fb50d 100644 --- a/tests/compatibility/client/audio/client_opus_transport_contract.rs +++ b/tests/compatibility/client/audio/client_opus_transport_contract.rs @@ -12,6 +12,10 @@ const MICROPHONE: &str = include_str!(concat!( env!("CARGO_MANIFEST_DIR"), "/client/src/input/microphone.rs" )); +const MICROPHONE_CAPTURE_RUNTIME: &str = include_str!(concat!( + env!("CARGO_MANIFEST_DIR"), + "/client/src/input/microphone/capture_runtime.rs" +)); const AUDIO_CODEC: &str = include_str!(concat!( env!("CARGO_MANIFEST_DIR"), "/client/src/input/audio_codec.rs" @@ -39,9 +43,12 @@ fn client_microphone_path_preserves_pcm_fallback_and_opus_selection() { "MIC_CHANNELS", "MIC_PACKET_TARGET_DURATION_ENV", "DEFAULT_MIC_PACKET_TARGET_DURATION_US: u64 = 20_000", + "Ok(None) => None", ] { assert!( - MICROPHONE.contains(expected) || AUDIO_CODEC.contains(expected), + MICROPHONE.contains(expected) + || MICROPHONE_CAPTURE_RUNTIME.contains(expected) + || AUDIO_CODEC.contains(expected), "microphone capture should preserve audio transport marker {expected}" ); } @@ -49,6 +56,16 @@ fn client_microphone_path_preserves_pcm_fallback_and_opus_selection() { AUDIO_CODEC.contains("mark_packet_opus") && AUDIO_CODEC.contains("Opus by default"), "client Opus encoder should stamp packets while documenting the default" ); + for expected in [ + "pending_packets: VecDeque", + "take_pending_packet(sample_pts_us)", + "push_pending_packet(&mut self.pending_packets", + ] { + assert!( + AUDIO_CODEC.contains(expected), + "client Opus encoder must preserve timing metadata across codec delay: {expected}" + ); + } } #[test] diff --git a/tests/contract/client/input/microphone/client_microphone_include_contract.rs b/tests/contract/client/input/microphone/client_microphone_include_contract.rs index 65f6261..95ce958 100644 --- a/tests/contract/client/input/microphone/client_microphone_include_contract.rs +++ b/tests/contract/client/input/microphone/client_microphone_include_contract.rs @@ -258,6 +258,7 @@ JSON if gst::ElementFactory::find("webrtcdsp").is_some() { assert!(suppressed.contains("webrtcdsp")); assert!(suppressed.contains("noise-suppression=true")); + assert!(suppressed.contains("noise-suppression-level=very-high")); } else { assert_eq!(raw, suppressed); } diff --git a/tests/contract/server/audio/server_opus_uac_contract.rs b/tests/contract/server/audio/server_opus_uac_contract.rs index 89b69ff..ebd8b14 100644 --- a/tests/contract/server/audio/server_opus_uac_contract.rs +++ b/tests/contract/server/audio/server_opus_uac_contract.rs @@ -40,6 +40,9 @@ fn uac_sink_remains_raw_pcm_and_guards_compressed_packets() { "opusdec", "audio/x-opus", "audio/x-raw", + "pending_packets: VecDeque", + "take_pending_packet(sample_pts_us)", + "push_pending_packet(&mut self.pending_packets", ] { assert!( OPUS_DECODE.contains(expected), diff --git a/tests/integration/client/runtime_controls/client_live_media_control_integration.rs b/tests/integration/client/runtime_controls/client_live_media_control_integration.rs index b8f24b1..ff3225c 100644 --- a/tests/integration/client/runtime_controls/client_live_media_control_integration.rs +++ b/tests/integration/client/runtime_controls/client_live_media_control_integration.rs @@ -20,7 +20,7 @@ use temp_env::with_var; #[test] fn live_device_switch_writes_and_reads_all_selected_media_choices() { - let state = MediaControlState::with_devices_and_audio( + let state = MediaControlState::with_devices_and_codecs( true, true, true, @@ -28,6 +28,7 @@ fn live_device_switch_writes_and_reads_all_selected_media_choices() { Some("1280x720@30".to_string()), Some("alsa_input.usb-focusrite.analog-stereo".to_string()), Some("alsa_output.usb-headphones.analog-stereo".to_string()), + Some("hevc".to_string()), UpstreamAudioCodec::Opus, true, ); @@ -45,6 +46,7 @@ fn live_device_switch_writes_and_reads_all_selected_media_choices() { "camera_profile=b64:", "microphone_source=b64:", "audio_sink=b64:", + "camera_codec=hevc", "audio_codec=opus", "noise_suppression=1", "nonce=", diff --git a/tests/ui/client/launcher/client_codec_transport_ui_contract.rs b/tests/ui/client/launcher/client_codec_transport_ui_contract.rs index 878a43a..672c15e 100644 --- a/tests/ui/client/launcher/client_codec_transport_ui_contract.rs +++ b/tests/ui/client/launcher/client_codec_transport_ui_contract.rs @@ -70,7 +70,8 @@ fn webcam_transport_changes_are_staged_when_relay_is_live() { for marker in [ "state.borrow_mut().select_webcam_transport(selected);", "child_proc.borrow().is_some()", - "Webcam transport changed to {} for the next reconnect; keeping the live decoder path stable.", + "apply_live_media_device_change(", + "\"Webcam transport\"", "Webcam transport set to {} for the next relay launch.", ] { assert!( @@ -80,8 +81,8 @@ fn webcam_transport_changes_are_staged_when_relay_is_live() { } for marker in [ - ".webcam_transport_combo\n .set_sensitive(!relay_live && state.channels.camera);", - "Reconnect before changing the upstream webcam transport; the server decoder is calibrated per ingress codec.", + ".webcam_transport_combo\n .set_sensitive(state.channels.camera);", + "Changing upstream webcam transport restarts the live camera path; the picture may pause briefly.", "Use the server-advertised upstream webcam transport for the next relay launch; MJPEG is the safe calibrated default.", ] { assert!(