diff --git a/Cargo.lock b/Cargo.lock index 8f10e4a..4c53144 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.22.24" +version = "0.22.25" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.22.24" +version = "0.22.25" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.22.24" +version = "0.22.25" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index a6c768b..b409c26 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.22.24" +version = "0.22.25" edition = "2024" [dependencies] diff --git a/client/src/input/camera/capture_pipeline.rs b/client/src/input/camera/capture_pipeline.rs index 9b1ccdd..1b958dd 100644 --- a/client/src/input/camera/capture_pipeline.rs +++ b/client/src/input/camera/capture_pipeline.rs @@ -502,8 +502,7 @@ impl CameraCapture { crate::live_capture_clock::upstream_source_lag_cap(), ); if timing.lag_clamped { - log_camera_stale_source_drop(timing, map.as_slice().len()); - return None; + log_camera_lag_clamped_source(timing, map.as_slice().len()); } let pts = timing.packet_pts_us; static CAMERA_PACKET_COUNT: std::sync::atomic::AtomicU64 = @@ -610,21 +609,24 @@ fn log_camera_timing_sample( } } -/// Keeps `log_camera_stale_source_drop` explicit because it sits on camera selection, where negotiated profiles must match the server output contract. +/// Keeps `log_camera_lag_clamped_source` explicit because it sits on camera selection, where negotiated profiles must match the server output contract. /// Inputs are the typed parameters; output is the return value or side effect. -fn log_camera_stale_source_drop(timing: crate::live_capture_clock::RebasedSourcePts, bytes: usize) { - static CAMERA_STALE_SOURCE_DROPS: std::sync::atomic::AtomicU64 = +fn log_camera_lag_clamped_source( + timing: crate::live_capture_clock::RebasedSourcePts, + bytes: usize, +) { + static CAMERA_LAG_CLAMPED_PACKETS: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0); - let drop_index = - CAMERA_STALE_SOURCE_DROPS.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - if drop_index < 10 || drop_index.is_multiple_of(300) { + let packet_index = + CAMERA_LAG_CLAMPED_PACKETS.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + if packet_index < 10 || packet_index.is_multiple_of(300) { tracing::warn!( - drop_index, + packet_index, bytes, source_pts_us = timing.source_pts_us.unwrap_or_default(), capture_now_us = timing.capture_now_us, packet_pts_us = timing.packet_pts_us, - "πŸ“Έ dropping stale webcam source buffer before bundled uplink" + "πŸ“Έ clamped laggy webcam source timestamp before bundled uplink" ); } } diff --git a/client/src/input/microphone.rs b/client/src/input/microphone.rs index f4d5c42..d774242 100644 --- a/client/src/input/microphone.rs +++ b/client/src/input/microphone.rs @@ -140,28 +140,28 @@ fn pcm_payload_duration_us(bytes: usize) -> u64 { } #[cfg(not(coverage))] -/// Keeps `log_microphone_stale_source_drop` explicit because it sits on microphone capture setup, where host audio stacks expose different source names and latency controls. +/// Keeps `log_microphone_lag_clamped_source` explicit because it sits on microphone capture setup, where host audio stacks expose different source names and latency controls. /// Inputs are the typed parameters; output is the return value or side effect. -fn log_microphone_stale_source_drop( +fn log_microphone_lag_clamped_source( timing: crate::live_capture_clock::RebasedSourcePts, bytes: usize, ) { - static MIC_STALE_SOURCE_DROPS: AtomicU64 = AtomicU64::new(0); - let drop_index = MIC_STALE_SOURCE_DROPS.fetch_add(1, Ordering::Relaxed); - if drop_index < 10 || drop_index.is_multiple_of(300) { + static MIC_LAG_CLAMPED_PACKETS: AtomicU64 = AtomicU64::new(0); + let packet_index = MIC_LAG_CLAMPED_PACKETS.fetch_add(1, Ordering::Relaxed); + if packet_index < 10 || packet_index.is_multiple_of(300) { warn!( - drop_index, + packet_index, bytes, source_pts_us = timing.source_pts_us.unwrap_or_default(), capture_now_us = timing.capture_now_us, packet_pts_us = timing.packet_pts_us, - "🎀 dropping stale microphone source buffer before bundled uplink" + "🎀 clamped laggy microphone source timestamp before bundled uplink" ); } } #[cfg(coverage)] -fn log_microphone_stale_source_drop( +fn log_microphone_lag_clamped_source( _timing: crate::live_capture_clock::RebasedSourcePts, _bytes: usize, ) { diff --git a/client/src/input/microphone/capture_runtime.rs b/client/src/input/microphone/capture_runtime.rs index 14fc80c..4e26943 100644 --- a/client/src/input/microphone/capture_runtime.rs +++ b/client/src/input/microphone/capture_runtime.rs @@ -177,8 +177,7 @@ impl MicrophoneCapture { crate::live_capture_clock::upstream_source_lag_cap(), ); if timing.lag_clamped { - log_microphone_stale_source_drop(timing, map.len()); - return None; + log_microphone_lag_clamped_source(timing, map.len()); } let pts = timing.packet_pts_us; let target_bytes = mic_packet_target_bytes(); diff --git a/client/src/launcher/ui/eye_capture_bindings.rs b/client/src/launcher/ui/eye_capture_bindings.rs index 502c95d..fbb866e 100644 --- a/client/src/launcher/ui/eye_capture_bindings.rs +++ b/client/src/launcher/ui/eye_capture_bindings.rs @@ -411,6 +411,13 @@ let state = state.borrow(); best_effort_recording_profile(&state, preview.as_deref(), monitor_id) }; + if let Err(err) = current_eye_texture(&pane.picture) { + widgets.status_label.set_text(&format!( + "{} recording needs a live frame first: {err}", + pane.title + )); + return; + } let root = { let borrowed = save_state.borrow(); match ensure_eye_capture_dir(borrowed.save_dir_override.as_deref(), "recordings") { @@ -462,6 +469,7 @@ let pane_for_tick = pane.clone(); let widgets_for_tick = widgets.clone(); let save_state_for_tick = Rc::clone(&save_state); + let button_for_tick = button.clone(); let timer = glib::timeout_add_local( Duration::from_millis(recording_interval_ms(record_fps)), move || { @@ -473,6 +481,13 @@ if let Some(frame_writer_tx) = state.frame_writer_tx.take() { let _ = frame_writer_tx.send(RecordFrameTask::Finish); } + state.timer = None; + state.next_frame_index = 0; + state.frame_dir = None; + state.finalize_rx = None; + button_for_tick.remove_css_class("recording-active"); + button_for_tick.set_sensitive(true); + button_for_tick.set_label("Record"); widgets_for_tick.status_label.set_text(&format!( "{} recording frame skipped: {err}", pane_for_tick.title diff --git a/common/Cargo.toml b/common/Cargo.toml index 0e271e3..5400362 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.22.24" +version = "0.22.25" edition = "2024" build = "build.rs" diff --git a/server/Cargo.toml b/server/Cargo.toml index dcff47a..850fa89 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.22.24" +version = "0.22.25" edition = "2024" autobins = false diff --git a/server/src/audio/ear_capture.rs b/server/src/audio/ear_capture.rs index ed979e8..0aca853 100644 --- a/server/src/audio/ear_capture.rs +++ b/server/src/audio/ear_capture.rs @@ -8,7 +8,7 @@ use gstreamer as gst; use gstreamer_app as gst_app; use std::fs; use std::sync::{ - Arc, Mutex, + Arc, Mutex, OnceLock, atomic::{AtomicBool, AtomicU64, Ordering}, }; use std::time::{Duration, Instant}; @@ -23,6 +23,7 @@ use lesavka_common::lesavka::AudioPacket; pub struct AudioStream { _pipeline: gst::Pipeline, inner: ReceiverStream>, + active_generation: Option, } impl Stream for AudioStream { @@ -38,9 +39,74 @@ impl Stream for AudioStream { impl Drop for AudioStream { fn drop(&mut self) { let _ = self._pipeline.set_state(gst::State::Null); + clear_active_audio_capture(self.active_generation); } } +#[cfg(not(coverage))] +static NEXT_AUDIO_CAPTURE_GENERATION: AtomicU64 = AtomicU64::new(1); + +#[cfg(not(coverage))] +static ACTIVE_AUDIO_CAPTURE: OnceLock>> = OnceLock::new(); + +#[cfg(not(coverage))] +fn active_audio_capture() -> &'static Mutex> { + ACTIVE_AUDIO_CAPTURE.get_or_init(|| Mutex::new(None)) +} + +#[cfg(not(coverage))] +fn next_audio_capture_generation() -> u64 { + NEXT_AUDIO_CAPTURE_GENERATION.fetch_add(1, Ordering::Relaxed) +} + +#[cfg(not(coverage))] +fn retire_active_audio_capture(reason: &'static str) { + let Ok(mut active) = active_audio_capture().lock() else { + return; + }; + if let Some((generation, pipeline)) = active.take() { + warn!(generation, reason, "πŸ”Š retiring previous downstream audio capture"); + let _ = pipeline.set_state(gst::State::Null); + } +} + +#[cfg(not(coverage))] +fn remember_active_audio_capture(generation: u64, pipeline: &gst::Pipeline) { + let Ok(mut active) = active_audio_capture().lock() else { + return; + }; + if let Some((previous_generation, previous_pipeline)) = + active.replace((generation, pipeline.clone())) + { + if previous_generation != generation { + warn!( + previous_generation, + generation, "πŸ”Š replacing overlapping downstream audio capture" + ); + let _ = previous_pipeline.set_state(gst::State::Null); + } + } +} + +#[cfg(not(coverage))] +fn clear_active_audio_capture(generation: Option) { + let Some(generation) = generation else { + return; + }; + let Ok(mut active) = active_audio_capture().lock() else { + return; + }; + if active + .as_ref() + .is_some_and(|(active_generation, _)| *active_generation == generation) + { + active.take(); + } +} + +#[cfg(coverage)] +fn clear_active_audio_capture(_generation: Option) {} + /// Start a GStreamer pipeline and reset it to NULL if startup fails. #[cfg(not(coverage))] pub(crate) fn start_pipeline_or_reset( @@ -50,12 +116,47 @@ pub(crate) fn start_pipeline_or_reset( match pipeline.set_state(gst::State::Playing) { Ok(_) => Ok(()), Err(error) => { + let details = pipeline_start_failure_details(pipeline); let _ = pipeline.set_state(gst::State::Null); - Err(error).context(context) + Err(anyhow!("{error}{details}")).context(context) } } } +#[cfg(not(coverage))] +fn pipeline_start_failure_details(pipeline: &gst::Pipeline) -> String { + let Some(bus) = pipeline.bus() else { + return String::new(); + }; + let mut details = Vec::new(); + let deadline = Instant::now() + Duration::from_millis(200); + while Instant::now() < deadline && details.len() < 3 { + let Some(msg) = bus.timed_pop(gst::ClockTime::from_mseconds(20)) else { + continue; + }; + match msg.view() { + Error(error) => details.push(format!( + "error from {:?}: {} ({})", + msg.src().map(gst::prelude::GstObjectExt::path_string), + error.error(), + error.debug().unwrap_or_default() + )), + Warning(warning) => details.push(format!( + "warning from {:?}: {} ({})", + msg.src().map(gst::prelude::GstObjectExt::path_string), + warning.error(), + warning.debug().unwrap_or_default() + )), + _ => {} + } + } + if details.is_empty() { + String::new() + } else { + format!("; {}", details.join("; ")) + } +} + /// Start a coverage pipeline with a deterministic forced-failure hook. #[cfg(coverage)] pub(crate) fn start_pipeline_or_reset( @@ -156,6 +257,7 @@ pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result { Ok(AudioStream { _pipeline: pipeline, inner: ReceiverStream::new(rx), + active_generation: None, }) } @@ -166,6 +268,8 @@ pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result { // added later (for multi‑channel) without changing the client. gst::init().context("gst init")?; ensure_remote_usb_audio_ready(alsa_dev)?; + let active_generation = next_audio_capture_generation(); + retire_active_audio_capture("new downstream audio stream requested"); /*──────────── pipeline description ──────────── * @@ -249,6 +353,7 @@ pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result { ); start_pipeline_or_reset(&pipeline, "starting audio pipeline")?; + remember_active_audio_capture(active_generation, &pipeline); spawn_pipeline_bus_logger(bus, "audio", "🎢 audio pipeline PLAYING"); spawn_audio_source_watchdog( @@ -261,6 +366,7 @@ pub async fn ear(alsa_dev: &str, id: u32) -> anyhow::Result { Ok(AudioStream { _pipeline: pipeline, inner: ReceiverStream::new(rx), + active_generation: Some(active_generation), }) } diff --git a/server/src/video_sinks/hevc_mjpeg_guard.rs b/server/src/video_sinks/hevc_mjpeg_guard.rs index 7757555..71efdbc 100644 --- a/server/src/video_sinks/hevc_mjpeg_guard.rs +++ b/server/src/video_sinks/hevc_mjpeg_guard.rs @@ -163,6 +163,17 @@ pub(super) fn should_freeze_decoded_mjpeg_frame(previous_bytes: u64, decoded_mjp || should_freeze_decoded_mjpeg(previous_bytes, decoded_mjpeg.len()) } +/// Decide whether a direct MJPEG camera frame is unsafe to publish. +/// +/// Inputs: MJPEG bytes from the client webcam capture path. Output: true only +/// for incomplete JPEG payloads. Why: the aggressive decoded-HEVC visual guard +/// intentionally freezes flat/size-collapsed frames, but applying that same +/// heuristic to direct MJPEG can freeze a legitimate live camera path after one +/// good frame; direct MJPEG should pass through unless the JPEG is incomplete. +pub(super) fn should_reject_direct_mjpeg_frame(mjpeg: &[u8]) -> bool { + !looks_like_complete_jpeg(mjpeg) +} + #[cfg(test)] mod tests { #[test] @@ -265,4 +276,23 @@ mod tests { }, ); } + + #[test] + fn direct_mjpeg_guard_only_rejects_incomplete_jpegs() { + fn jpeg_with_payload(payload: &[u8]) -> Vec { + let mut bytes = vec![0xff, 0xd8, 0xff, 0xda]; + bytes.extend_from_slice(payload); + bytes.extend_from_slice(&[0xff, 0xd9]); + bytes + } + + let flat = jpeg_with_payload(&vec![0x80; 120_000]); + let varied = jpeg_with_payload(&(0..120_000).map(|idx| (idx % 251) as u8).collect::>()); + let mut truncated = varied.clone(); + truncated.pop(); + + assert!(!super::should_reject_direct_mjpeg_frame(&flat)); + assert!(!super::should_reject_direct_mjpeg_frame(&varied)); + assert!(super::should_reject_direct_mjpeg_frame(&truncated)); + } } diff --git a/server/src/video_sinks/webcam_sink.rs b/server/src/video_sinks/webcam_sink.rs index 45aaa88..fb33bd4 100644 --- a/server/src/video_sinks/webcam_sink.rs +++ b/server/src/video_sinks/webcam_sink.rs @@ -784,15 +784,11 @@ impl WebcamSink { #[cfg(not(coverage))] fn spool_direct_mjpeg_frame(&self, path: &Path, pkt: &VideoPacket) { - let previous_bytes = self - .last_mjpeg_passthrough_bytes - .load(std::sync::atomic::Ordering::Relaxed); - if hevc_mjpeg_guard::should_freeze_decoded_mjpeg_frame(previous_bytes, &pkt.data) { + if hevc_mjpeg_guard::should_reject_direct_mjpeg_frame(&pkt.data) { warn!( target:"lesavka_server::video", - previous_bytes, next_bytes = pkt.data.len(), - "πŸ“Έβš οΈ freezing suspicious direct MJPEG frame before UVC spool" + "πŸ“Έβš οΈ dropping incomplete direct MJPEG frame before UVC spool" ); return; } diff --git a/tests/chaos/server/video_sinks/hevc_mjpeg_guard_chaos_contract.rs b/tests/chaos/server/video_sinks/hevc_mjpeg_guard_chaos_contract.rs index 0d23883..4b96be4 100644 --- a/tests/chaos/server/video_sinks/hevc_mjpeg_guard_chaos_contract.rs +++ b/tests/chaos/server/video_sinks/hevc_mjpeg_guard_chaos_contract.rs @@ -29,6 +29,10 @@ mod guard { pub fn should_freeze_frame(previous_bytes: u64, decoded_mjpeg: &[u8]) -> bool { should_freeze_decoded_mjpeg_frame(previous_bytes, decoded_mjpeg) } + + pub fn should_reject_direct_frame(mjpeg: &[u8]) -> bool { + should_reject_direct_mjpeg_frame(mjpeg) + } } const WEBCAM_SINK: &str = include_str!(concat!( @@ -111,9 +115,10 @@ fn server_hevc_recovery_and_freshest_spool_paths_remain_wired() { "last_decoded_mjpeg_bytes", "last_mjpeg_passthrough_bytes", "should_freeze_decoded_mjpeg_frame(previous_bytes, map.as_slice())", + "should_reject_direct_mjpeg_frame(&pkt.data)", "spool_direct_mjpeg_frame", "freezing suspicious decoded HEVC->MJPEG frame", - "freezing suspicious direct MJPEG frame before UVC spool", + "dropping incomplete direct MJPEG frame before UVC spool", ] { assert!( WEBCAM_SINK.contains(marker), @@ -133,6 +138,19 @@ fn server_hevc_recovery_and_freshest_spool_paths_remain_wired() { } } +#[test] +fn direct_mjpeg_passthrough_does_not_use_decoded_hevc_visual_freeze_rules() { + let healthy_payload: Vec = (0..140_000).map(|idx| (idx % 251) as u8).collect(); + let flat = jpeg_with_payload(&vec![0x80; 140_000]); + let healthy = jpeg_with_payload(&healthy_payload); + let mut truncated = healthy.clone(); + truncated.pop(); + + assert!(!guard::should_reject_direct_frame(&flat)); + assert!(!guard::should_reject_direct_frame(&healthy)); + assert!(guard::should_reject_direct_frame(&truncated)); +} + fn jpeg_with_payload(payload: &[u8]) -> Vec { let mut bytes = vec![0xff, 0xd8, 0xff, 0xda]; bytes.extend_from_slice(payload); diff --git a/tests/contract/client/app/client_app_include_contract.rs b/tests/contract/client/app/client_app_include_contract.rs index 5997813..761da51 100644 --- a/tests/contract/client/app/client_app_include_contract.rs +++ b/tests/contract/client/app/client_app_include_contract.rs @@ -51,7 +51,7 @@ mod app_support { use std::time::Duration; #[allow(dead_code)] - #[derive(Clone, Copy, Debug)] + #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub enum CameraCodec { H264, Hevc, diff --git a/tests/contract/client/input/camera/client_camera_timing_contract.rs b/tests/contract/client/input/camera/client_camera_timing_contract.rs index 5216b1c..0eea3b8 100644 --- a/tests/contract/client/input/camera/client_camera_timing_contract.rs +++ b/tests/contract/client/input/camera/client_camera_timing_contract.rs @@ -19,6 +19,10 @@ mod camera_timing_contract { include!(env!("LESAVKA_CLIENT_CAMERA_SRC")); use temp_env::with_var; + const CAMERA_CAPTURE_SRC: &str = include_str!(concat!( + env!("CARGO_MANIFEST_DIR"), + "/client/src/input/camera/capture_pipeline.rs" + )); #[test] fn camera_timing_helpers_cover_first_packet_and_trace_enabled_paths() { @@ -43,7 +47,7 @@ mod camera_timing_contract { ); }); - log_camera_stale_source_drop( + log_camera_lag_clamped_source( crate::live_capture_clock::RebasedSourcePts { packet_pts_us: 1, capture_now_us: 1_000_000, @@ -57,4 +61,18 @@ mod camera_timing_contract { 512, ); } + + #[test] + fn lag_clamped_camera_timestamp_still_sends_the_frame() { + assert!(CAMERA_CAPTURE_SRC.contains("log_camera_lag_clamped_source(")); + assert!(CAMERA_CAPTURE_SRC.contains("let pts = timing.packet_pts_us;")); + assert!( + !CAMERA_CAPTURE_SRC.contains( + "log_camera_lag_clamped_source(timing, map.as_slice().len());\n return None;" + ) && !CAMERA_CAPTURE_SRC.contains( + "log_camera_lag_clamped_source(\n timing,\n map.as_slice().len(),\n );\n return None;" + ), + "lag-clamped webcam packets should use the repaired live timestamp, not freeze UVC by being dropped" + ); + } } diff --git a/tests/contract/client/input/microphone/client_microphone_include_contract.rs b/tests/contract/client/input/microphone/client_microphone_include_contract.rs index 95ce958..411b7cf 100644 --- a/tests/contract/client/input/microphone/client_microphone_include_contract.rs +++ b/tests/contract/client/input/microphone/client_microphone_include_contract.rs @@ -31,6 +31,11 @@ mod microphone_include_contract { use temp_env::with_var; use tempfile::tempdir; + const MICROPHONE_CAPTURE_SRC: &str = include_str!(concat!( + env!("CARGO_MANIFEST_DIR"), + "/client/src/input/microphone/capture_runtime.rs" + )); + fn write_executable(dir: &Path, name: &str, body: &str) { let path = dir.join(name); fs::write(&path, body).expect("write script"); @@ -138,7 +143,7 @@ exit 0 ); assert!(duration_matches_pcm_payload(1, 0)); assert!(!bool_env_enabled("LESAVKA_TEST_BOOL_ENV_NEVER_SET")); - log_microphone_stale_source_drop( + log_microphone_lag_clamped_source( crate::live_capture_clock::RebasedSourcePts { packet_pts_us: 1, capture_now_us: 10_000, @@ -153,6 +158,21 @@ exit 0 ); } + #[test] + fn lag_clamped_microphone_timestamp_still_sends_audio() { + assert!( + MICROPHONE_CAPTURE_SRC + .contains("log_microphone_lag_clamped_source(timing, map.len());") + ); + assert!(MICROPHONE_CAPTURE_SRC.contains("let pts = timing.packet_pts_us;")); + assert!( + !MICROPHONE_CAPTURE_SRC.contains( + "log_microphone_lag_clamped_source(timing, map.len());\n return None;" + ), + "lag-clamped mic packets should use the repaired live timestamp instead of starving UAC" + ); + } + #[test] fn pipewire_source_desc_formats_selected_and_default_sources() { let selected = MicrophoneCapture::pipewire_source_desc(Some("alsa input/Desk Mic"));