media: keep repaired uplink packets live

This commit is contained in:
Brad Stein 2026-05-13 17:43:00 -03:00
parent 5a5990d593
commit ed3ed1a165
15 changed files with 242 additions and 38 deletions

6
Cargo.lock generated
View File

@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]]
name = "lesavka_client"
version = "0.22.24"
version = "0.22.25"
dependencies = [
"anyhow",
"async-stream",
@ -1686,7 +1686,7 @@ dependencies = [
[[package]]
name = "lesavka_common"
version = "0.22.24"
version = "0.22.25"
dependencies = [
"anyhow",
"base64",
@ -1698,7 +1698,7 @@ dependencies = [
[[package]]
name = "lesavka_server"
version = "0.22.24"
version = "0.22.25"
dependencies = [
"anyhow",
"base64",

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package]
name = "lesavka_client"
version = "0.22.24"
version = "0.22.25"
edition = "2024"
[dependencies]

View File

@ -502,8 +502,7 @@ impl CameraCapture {
crate::live_capture_clock::upstream_source_lag_cap(),
);
if timing.lag_clamped {
log_camera_stale_source_drop(timing, map.as_slice().len());
return None;
log_camera_lag_clamped_source(timing, map.as_slice().len());
}
let pts = timing.packet_pts_us;
static CAMERA_PACKET_COUNT: std::sync::atomic::AtomicU64 =
@ -610,21 +609,24 @@ fn log_camera_timing_sample(
}
}
/// Keeps `log_camera_stale_source_drop` explicit because it sits on camera selection, where negotiated profiles must match the server output contract.
/// Keeps `log_camera_lag_clamped_source` explicit because it sits on camera selection, where negotiated profiles must match the server output contract.
/// Inputs are the typed parameters; output is the return value or side effect.
fn log_camera_stale_source_drop(timing: crate::live_capture_clock::RebasedSourcePts, bytes: usize) {
static CAMERA_STALE_SOURCE_DROPS: std::sync::atomic::AtomicU64 =
fn log_camera_lag_clamped_source(
timing: crate::live_capture_clock::RebasedSourcePts,
bytes: usize,
) {
static CAMERA_LAG_CLAMPED_PACKETS: std::sync::atomic::AtomicU64 =
std::sync::atomic::AtomicU64::new(0);
let drop_index =
CAMERA_STALE_SOURCE_DROPS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if drop_index < 10 || drop_index.is_multiple_of(300) {
let packet_index =
CAMERA_LAG_CLAMPED_PACKETS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if packet_index < 10 || packet_index.is_multiple_of(300) {
tracing::warn!(
drop_index,
packet_index,
bytes,
source_pts_us = timing.source_pts_us.unwrap_or_default(),
capture_now_us = timing.capture_now_us,
packet_pts_us = timing.packet_pts_us,
"📸 dropping stale webcam source buffer before bundled uplink"
"📸 clamped laggy webcam source timestamp before bundled uplink"
);
}
}

View File

@ -140,28 +140,28 @@ fn pcm_payload_duration_us(bytes: usize) -> u64 {
}
#[cfg(not(coverage))]
/// Keeps `log_microphone_stale_source_drop` explicit because it sits on microphone capture setup, where host audio stacks expose different source names and latency controls.
/// Keeps `log_microphone_lag_clamped_source` explicit because it sits on microphone capture setup, where host audio stacks expose different source names and latency controls.
/// Inputs are the typed parameters; output is the return value or side effect.
fn log_microphone_stale_source_drop(
fn log_microphone_lag_clamped_source(
timing: crate::live_capture_clock::RebasedSourcePts,
bytes: usize,
) {
static MIC_STALE_SOURCE_DROPS: AtomicU64 = AtomicU64::new(0);
let drop_index = MIC_STALE_SOURCE_DROPS.fetch_add(1, Ordering::Relaxed);
if drop_index < 10 || drop_index.is_multiple_of(300) {
static MIC_LAG_CLAMPED_PACKETS: AtomicU64 = AtomicU64::new(0);
let packet_index = MIC_LAG_CLAMPED_PACKETS.fetch_add(1, Ordering::Relaxed);
if packet_index < 10 || packet_index.is_multiple_of(300) {
warn!(
drop_index,
packet_index,
bytes,
source_pts_us = timing.source_pts_us.unwrap_or_default(),
capture_now_us = timing.capture_now_us,
packet_pts_us = timing.packet_pts_us,
"🎤 dropping stale microphone source buffer before bundled uplink"
"🎤 clamped laggy microphone source timestamp before bundled uplink"
);
}
}
#[cfg(coverage)]
fn log_microphone_stale_source_drop(
fn log_microphone_lag_clamped_source(
_timing: crate::live_capture_clock::RebasedSourcePts,
_bytes: usize,
) {

View File

@ -177,8 +177,7 @@ impl MicrophoneCapture {
crate::live_capture_clock::upstream_source_lag_cap(),
);
if timing.lag_clamped {
log_microphone_stale_source_drop(timing, map.len());
return None;
log_microphone_lag_clamped_source(timing, map.len());
}
let pts = timing.packet_pts_us;
let target_bytes = mic_packet_target_bytes();

View File

@ -411,6 +411,13 @@
let state = state.borrow();
best_effort_recording_profile(&state, preview.as_deref(), monitor_id)
};
if let Err(err) = current_eye_texture(&pane.picture) {
widgets.status_label.set_text(&format!(
"{} recording needs a live frame first: {err}",
pane.title
));
return;
}
let root = {
let borrowed = save_state.borrow();
match ensure_eye_capture_dir(borrowed.save_dir_override.as_deref(), "recordings") {
@ -462,6 +469,7 @@
let pane_for_tick = pane.clone();
let widgets_for_tick = widgets.clone();
let save_state_for_tick = Rc::clone(&save_state);
let button_for_tick = button.clone();
let timer = glib::timeout_add_local(
Duration::from_millis(recording_interval_ms(record_fps)),
move || {
@ -473,6 +481,13 @@
if let Some(frame_writer_tx) = state.frame_writer_tx.take() {
let _ = frame_writer_tx.send(RecordFrameTask::Finish);
}
state.timer = None;
state.next_frame_index = 0;
state.frame_dir = None;
state.finalize_rx = None;
button_for_tick.remove_css_class("recording-active");
button_for_tick.set_sensitive(true);
button_for_tick.set_label("Record");
widgets_for_tick.status_label.set_text(&format!(
"{} recording frame skipped: {err}",
pane_for_tick.title

View File

@ -1,6 +1,6 @@
[package]
name = "lesavka_common"
version = "0.22.24"
version = "0.22.25"
edition = "2024"
build = "build.rs"

View File

@ -10,7 +10,7 @@ bench = false
[package]
name = "lesavka_server"
version = "0.22.24"
version = "0.22.25"
edition = "2024"
autobins = false

View File

@ -8,7 +8,7 @@ use gstreamer as gst;
use gstreamer_app as gst_app;
use std::fs;
use std::sync::{
Arc, Mutex,
Arc, Mutex, OnceLock,
atomic::{AtomicBool, AtomicU64, Ordering},
};
use std::time::{Duration, Instant};
@ -23,6 +23,7 @@ use lesavka_common::lesavka::AudioPacket;
pub struct AudioStream {
_pipeline: gst::Pipeline,
inner: ReceiverStream<Result<AudioPacket, Status>>,
active_generation: Option<u64>,
}
impl Stream for AudioStream {
@ -38,9 +39,74 @@ impl Stream for AudioStream {
impl Drop for AudioStream {
fn drop(&mut self) {
let _ = self._pipeline.set_state(gst::State::Null);
clear_active_audio_capture(self.active_generation);
}
}
#[cfg(not(coverage))]
static NEXT_AUDIO_CAPTURE_GENERATION: AtomicU64 = AtomicU64::new(1);
#[cfg(not(coverage))]
static ACTIVE_AUDIO_CAPTURE: OnceLock<Mutex<Option<(u64, gst::Pipeline)>>> = OnceLock::new();
#[cfg(not(coverage))]
fn active_audio_capture() -> &'static Mutex<Option<(u64, gst::Pipeline)>> {
ACTIVE_AUDIO_CAPTURE.get_or_init(|| Mutex::new(None))
}
#[cfg(not(coverage))]
fn next_audio_capture_generation() -> u64 {
NEXT_AUDIO_CAPTURE_GENERATION.fetch_add(1, Ordering::Relaxed)
}
#[cfg(not(coverage))]
fn retire_active_audio_capture(reason: &'static str) {
let Ok(mut active) = active_audio_capture().lock() else {
return;
};
if let Some((generation, pipeline)) = active.take() {
warn!(generation, reason, "🔊 retiring previous downstream audio capture");
let _ = pipeline.set_state(gst::State::Null);
}
}
#[cfg(not(coverage))]
fn remember_active_audio_capture(generation: u64, pipeline: &gst::Pipeline) {
let Ok(mut active) = active_audio_capture().lock() else {
return;
};
if let Some((previous_generation, previous_pipeline)) =
active.replace((generation, pipeline.clone()))
{
if previous_generation != generation {
warn!(
previous_generation,
generation, "🔊 replacing overlapping downstream audio capture"
);
let _ = previous_pipeline.set_state(gst::State::Null);
}
}
}
#[cfg(not(coverage))]
fn clear_active_audio_capture(generation: Option<u64>) {
let Some(generation) = generation else {
return;
};
let Ok(mut active) = active_audio_capture().lock() else {
return;
};
if active
.as_ref()
.is_some_and(|(active_generation, _)| *active_generation == generation)
{
active.take();
}
}
#[cfg(coverage)]
fn clear_active_audio_capture(_generation: Option<u64>) {}
/// Start a GStreamer pipeline and reset it to NULL if startup fails.
#[cfg(not(coverage))]
pub(crate) fn start_pipeline_or_reset(
@ -50,12 +116,47 @@ pub(crate) fn start_pipeline_or_reset(
match pipeline.set_state(gst::State::Playing) {
Ok(_) => Ok(()),
Err(error) => {
let details = pipeline_start_failure_details(pipeline);
let _ = pipeline.set_state(gst::State::Null);
Err(error).context(context)
Err(anyhow!("{error}{details}")).context(context)
}
}
}
#[cfg(not(coverage))]
fn pipeline_start_failure_details(pipeline: &gst::Pipeline) -> String {
let Some(bus) = pipeline.bus() else {
return String::new();
};
let mut details = Vec::new();
let deadline = Instant::now() + Duration::from_millis(200);
while Instant::now() < deadline && details.len() < 3 {
let Some(msg) = bus.timed_pop(gst::ClockTime::from_mseconds(20)) else {
continue;
};
match msg.view() {
Error(error) => details.push(format!(
"error from {:?}: {} ({})",
msg.src().map(gst::prelude::GstObjectExt::path_string),
error.error(),
error.debug().unwrap_or_default()
)),
Warning(warning) => details.push(format!(
"warning from {:?}: {} ({})",
msg.src().map(gst::prelude::GstObjectExt::path_string),
warning.error(),
warning.debug().unwrap_or_default()
)),
_ => {}
}
}
if details.is_empty() {
String::new()
} else {
format!("; {}", details.join("; "))
}
}
/// Start a coverage pipeline with a deterministic forced-failure hook.
#[cfg(coverage)]
pub(crate) fn start_pipeline_or_reset(
@ -156,6 +257,7 @@ pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result<AudioStream> {
Ok(AudioStream {
_pipeline: pipeline,
inner: ReceiverStream::new(rx),
active_generation: None,
})
}
@ -166,6 +268,8 @@ pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result<AudioStream> {
// added later (for multichannel) without changing the client.
gst::init().context("gst init")?;
ensure_remote_usb_audio_ready(alsa_dev)?;
let active_generation = next_audio_capture_generation();
retire_active_audio_capture("new downstream audio stream requested");
/*──────────── pipeline description ────────────
*
@ -249,6 +353,7 @@ pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result<AudioStream> {
);
start_pipeline_or_reset(&pipeline, "starting audio pipeline")?;
remember_active_audio_capture(active_generation, &pipeline);
spawn_pipeline_bus_logger(bus, "audio", "🎶 audio pipeline PLAYING");
spawn_audio_source_watchdog(
@ -261,6 +366,7 @@ pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result<AudioStream> {
Ok(AudioStream {
_pipeline: pipeline,
inner: ReceiverStream::new(rx),
active_generation: Some(active_generation),
})
}

View File

@ -163,6 +163,17 @@ pub(super) fn should_freeze_decoded_mjpeg_frame(previous_bytes: u64, decoded_mjp
|| should_freeze_decoded_mjpeg(previous_bytes, decoded_mjpeg.len())
}
/// Decide whether a direct MJPEG camera frame is unsafe to publish.
///
/// Inputs: MJPEG bytes from the client webcam capture path. Output: true only
/// for incomplete JPEG payloads. Why: the aggressive decoded-HEVC visual guard
/// intentionally freezes flat/size-collapsed frames, but applying that same
/// heuristic to direct MJPEG can freeze a legitimate live camera path after one
/// good frame; direct MJPEG should pass through unless the JPEG is incomplete.
pub(super) fn should_reject_direct_mjpeg_frame(mjpeg: &[u8]) -> bool {
!looks_like_complete_jpeg(mjpeg)
}
#[cfg(test)]
mod tests {
#[test]
@ -265,4 +276,23 @@ mod tests {
},
);
}
#[test]
fn direct_mjpeg_guard_only_rejects_incomplete_jpegs() {
fn jpeg_with_payload(payload: &[u8]) -> Vec<u8> {
let mut bytes = vec![0xff, 0xd8, 0xff, 0xda];
bytes.extend_from_slice(payload);
bytes.extend_from_slice(&[0xff, 0xd9]);
bytes
}
let flat = jpeg_with_payload(&vec![0x80; 120_000]);
let varied = jpeg_with_payload(&(0..120_000).map(|idx| (idx % 251) as u8).collect::<Vec<_>>());
let mut truncated = varied.clone();
truncated.pop();
assert!(!super::should_reject_direct_mjpeg_frame(&flat));
assert!(!super::should_reject_direct_mjpeg_frame(&varied));
assert!(super::should_reject_direct_mjpeg_frame(&truncated));
}
}

View File

@ -784,15 +784,11 @@ impl WebcamSink {
#[cfg(not(coverage))]
fn spool_direct_mjpeg_frame(&self, path: &Path, pkt: &VideoPacket) {
let previous_bytes = self
.last_mjpeg_passthrough_bytes
.load(std::sync::atomic::Ordering::Relaxed);
if hevc_mjpeg_guard::should_freeze_decoded_mjpeg_frame(previous_bytes, &pkt.data) {
if hevc_mjpeg_guard::should_reject_direct_mjpeg_frame(&pkt.data) {
warn!(
target:"lesavka_server::video",
previous_bytes,
next_bytes = pkt.data.len(),
"📸⚠️ freezing suspicious direct MJPEG frame before UVC spool"
"📸⚠️ dropping incomplete direct MJPEG frame before UVC spool"
);
return;
}

View File

@ -29,6 +29,10 @@ mod guard {
pub fn should_freeze_frame(previous_bytes: u64, decoded_mjpeg: &[u8]) -> bool {
should_freeze_decoded_mjpeg_frame(previous_bytes, decoded_mjpeg)
}
pub fn should_reject_direct_frame(mjpeg: &[u8]) -> bool {
should_reject_direct_mjpeg_frame(mjpeg)
}
}
const WEBCAM_SINK: &str = include_str!(concat!(
@ -111,9 +115,10 @@ fn server_hevc_recovery_and_freshest_spool_paths_remain_wired() {
"last_decoded_mjpeg_bytes",
"last_mjpeg_passthrough_bytes",
"should_freeze_decoded_mjpeg_frame(previous_bytes, map.as_slice())",
"should_reject_direct_mjpeg_frame(&pkt.data)",
"spool_direct_mjpeg_frame",
"freezing suspicious decoded HEVC->MJPEG frame",
"freezing suspicious direct MJPEG frame before UVC spool",
"dropping incomplete direct MJPEG frame before UVC spool",
] {
assert!(
WEBCAM_SINK.contains(marker),
@ -133,6 +138,19 @@ fn server_hevc_recovery_and_freshest_spool_paths_remain_wired() {
}
}
#[test]
fn direct_mjpeg_passthrough_does_not_use_decoded_hevc_visual_freeze_rules() {
let healthy_payload: Vec<u8> = (0..140_000).map(|idx| (idx % 251) as u8).collect();
let flat = jpeg_with_payload(&vec![0x80; 140_000]);
let healthy = jpeg_with_payload(&healthy_payload);
let mut truncated = healthy.clone();
truncated.pop();
assert!(!guard::should_reject_direct_frame(&flat));
assert!(!guard::should_reject_direct_frame(&healthy));
assert!(guard::should_reject_direct_frame(&truncated));
}
fn jpeg_with_payload(payload: &[u8]) -> Vec<u8> {
let mut bytes = vec![0xff, 0xd8, 0xff, 0xda];
bytes.extend_from_slice(payload);

View File

@ -51,7 +51,7 @@ mod app_support {
use std::time::Duration;
#[allow(dead_code)]
#[derive(Clone, Copy, Debug)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CameraCodec {
H264,
Hevc,

View File

@ -19,6 +19,10 @@ mod camera_timing_contract {
include!(env!("LESAVKA_CLIENT_CAMERA_SRC"));
use temp_env::with_var;
const CAMERA_CAPTURE_SRC: &str = include_str!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/client/src/input/camera/capture_pipeline.rs"
));
#[test]
fn camera_timing_helpers_cover_first_packet_and_trace_enabled_paths() {
@ -43,7 +47,7 @@ mod camera_timing_contract {
);
});
log_camera_stale_source_drop(
log_camera_lag_clamped_source(
crate::live_capture_clock::RebasedSourcePts {
packet_pts_us: 1,
capture_now_us: 1_000_000,
@ -57,4 +61,18 @@ mod camera_timing_contract {
512,
);
}
#[test]
fn lag_clamped_camera_timestamp_still_sends_the_frame() {
assert!(CAMERA_CAPTURE_SRC.contains("log_camera_lag_clamped_source("));
assert!(CAMERA_CAPTURE_SRC.contains("let pts = timing.packet_pts_us;"));
assert!(
!CAMERA_CAPTURE_SRC.contains(
"log_camera_lag_clamped_source(timing, map.as_slice().len());\n return None;"
) && !CAMERA_CAPTURE_SRC.contains(
"log_camera_lag_clamped_source(\n timing,\n map.as_slice().len(),\n );\n return None;"
),
"lag-clamped webcam packets should use the repaired live timestamp, not freeze UVC by being dropped"
);
}
}

View File

@ -31,6 +31,11 @@ mod microphone_include_contract {
use temp_env::with_var;
use tempfile::tempdir;
const MICROPHONE_CAPTURE_SRC: &str = include_str!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/client/src/input/microphone/capture_runtime.rs"
));
fn write_executable(dir: &Path, name: &str, body: &str) {
let path = dir.join(name);
fs::write(&path, body).expect("write script");
@ -138,7 +143,7 @@ exit 0
);
assert!(duration_matches_pcm_payload(1, 0));
assert!(!bool_env_enabled("LESAVKA_TEST_BOOL_ENV_NEVER_SET"));
log_microphone_stale_source_drop(
log_microphone_lag_clamped_source(
crate::live_capture_clock::RebasedSourcePts {
packet_pts_us: 1,
capture_now_us: 10_000,
@ -153,6 +158,21 @@ exit 0
);
}
#[test]
fn lag_clamped_microphone_timestamp_still_sends_audio() {
assert!(
MICROPHONE_CAPTURE_SRC
.contains("log_microphone_lag_clamped_source(timing, map.len());")
);
assert!(MICROPHONE_CAPTURE_SRC.contains("let pts = timing.packet_pts_us;"));
assert!(
!MICROPHONE_CAPTURE_SRC.contains(
"log_microphone_lag_clamped_source(timing, map.len());\n return None;"
),
"lag-clamped mic packets should use the repaired live timestamp instead of starving UAC"
);
}
#[test]
fn pipewire_source_desc_formats_selected_and_default_sources() {
let selected = MicrophoneCapture::pipewire_source_desc(Some("alsa input/Desk Mic"));