From dec332ea400983687d320c92ac526d751fdbb5b8 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Thu, 14 May 2026 01:42:14 -0300 Subject: [PATCH] media: stabilize mjpeg upstream telemetry --- Cargo.lock | 6 +- client/Cargo.toml | 2 +- client/src/launcher/ui_components.rs | 2 + .../launcher/ui_components/assemble_view.rs | 2 + .../launcher/ui_components/build_contexts.rs | 2 + .../src/launcher/ui_components/build_shell.rs | 5 + client/src/launcher/ui_components/types.rs | 2 + .../src/launcher/ui_runtime/status_details.rs | 50 ++++++++ .../src/launcher/ui_runtime/status_refresh.rs | 8 ++ common/Cargo.toml | 2 +- server/Cargo.toml | 2 +- server/src/main/relay_service.rs | 23 +++- .../main/relay_service/upstream_media_rpc.rs | 19 +++- server/src/main/relay_service_tests.rs | 1 + server/src/upstream_media_runtime.rs | 10 +- .../stream_lifecycle_methods.rs | 21 ++++ .../src/upstream_media_runtime/tests/mod.rs | 4 + server/src/video_sinks/hevc_mjpeg_guard.rs | 107 ++++++++++++++++-- server/src/video_sinks/webcam_sink.rs | 8 +- .../main/server_main_state_rpc_contract.rs | 2 +- ...rver_upstream_media_v2_handoff_contract.rs | 7 +- .../hevc_mjpeg_guard_chaos_contract.rs | 31 +++-- .../client_launcher_layout_contract.rs | 3 + 23 files changed, 280 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7d4b686..fa2cbf1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1652,7 +1652,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.22.28" +version = "0.22.29" dependencies = [ "anyhow", "async-stream", @@ -1686,7 +1686,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.22.28" +version = "0.22.29" dependencies = [ "anyhow", "base64", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.22.28" +version = "0.22.29" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index 78e702f..c5a6f7c 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.22.28" +version = "0.22.29" edition = "2024" [dependencies] diff --git a/client/src/launcher/ui_components.rs b/client/src/launcher/ui_components.rs index f6e0a2c..e100616 100644 --- a/client/src/launcher/ui_components.rs +++ b/client/src/launcher/ui_components.rs @@ -54,6 +54,8 @@ pub fn build_launcher_view( uac_value, uvc_light, uvc_value, + upstream_lag_light, + upstream_lag_value, shortcut_value, } = include!("ui_components/build_shell.rs"); diff --git a/client/src/launcher/ui_components/assemble_view.rs b/client/src/launcher/ui_components/assemble_view.rs index a340008..4b3bac3 100644 --- a/client/src/launcher/ui_components/assemble_view.rs +++ b/client/src/launcher/ui_components/assemble_view.rs @@ -120,6 +120,8 @@ uac_value, uvc_light, uvc_value, + upstream_lag_light, + upstream_lag_value, shortcut_value, }, power_detail, diff --git a/client/src/launcher/ui_components/build_contexts.rs b/client/src/launcher/ui_components/build_contexts.rs index 0f038ff..f799a21 100644 --- a/client/src/launcher/ui_components/build_contexts.rs +++ b/client/src/launcher/ui_components/build_contexts.rs @@ -21,6 +21,8 @@ struct LauncherShellContext { uac_value: gtk::Label, uvc_light: gtk::Box, uvc_value: gtk::Label, + upstream_lag_light: gtk::Box, + upstream_lag_value: gtk::Label, shortcut_value: gtk::Label, } diff --git a/client/src/launcher/ui_components/build_shell.rs b/client/src/launcher/ui_components/build_shell.rs index f0ecba4..5b82e9d 100644 --- a/client/src/launcher/ui_components/build_shell.rs +++ b/client/src/launcher/ui_components/build_shell.rs @@ -57,6 +57,8 @@ let (usb_chip, usb_light, usb_value) = build_status_chip_with_light("HID", "Unknown"); let (uac_chip, uac_light, uac_value) = build_status_chip_with_light("UAC", "Unknown"); let (uvc_chip, uvc_light, uvc_value) = build_status_chip_with_light("UVC", "Unknown"); + let (upstream_lag_chip, upstream_lag_light, upstream_lag_value) = + build_status_chip_with_light("Lag", "???"); let (shortcut_chip, shortcut_value) = build_status_chip("Key", "Pause"); chips.append(&relay_chip); chips.append(&routing_chip); @@ -66,6 +68,7 @@ chips.append(&usb_chip); chips.append(&uac_chip); chips.append(&uvc_chip); + chips.append(&upstream_lag_chip); chips.append(&shortcut_chip); let chips_shell = gtk::ScrolledWindow::builder() .hexpand(true) @@ -138,6 +141,8 @@ uac_value, uvc_light, uvc_value, + upstream_lag_light, + upstream_lag_value, shortcut_value, } } diff --git a/client/src/launcher/ui_components/types.rs b/client/src/launcher/ui_components/types.rs index 81c9123..298d892 100644 --- a/client/src/launcher/ui_components/types.rs +++ b/client/src/launcher/ui_components/types.rs @@ -16,6 +16,8 @@ pub struct SummaryWidgets { pub uac_value: gtk::Label, pub uvc_light: gtk::Box, pub uvc_value: gtk::Label, + pub upstream_lag_light: gtk::Box, + pub upstream_lag_value: gtk::Label, pub shortcut_value: gtk::Label, } diff --git a/client/src/launcher/ui_runtime/status_details.rs b/client/src/launcher/ui_runtime/status_details.rs index 0baca59..1ee8f59 100644 --- a/client/src/launcher/ui_runtime/status_details.rs +++ b/client/src/launcher/ui_runtime/status_details.rs @@ -279,6 +279,56 @@ fn media_stream_health( (StatusLightState::Live, healthy_label.to_string()) } +/// Summarize the server-side client-capture-to-UVC handoff lag. +fn upstream_lag_health( + status: &crate::launcher::state::UpstreamSyncStatus, + relay_live: bool, +) -> (StatusLightState, String, String) { + if !relay_live { + return ( + StatusLightState::Idle, + "Off".to_string(), + "Relay is not connected.".to_string(), + ); + } + if !status.available { + return ( + StatusLightState::Caution, + "???".to_string(), + format!("Upstream lag unavailable: {}", status.detail), + ); + } + let Some(lag_ms) = status.live_lag_ms else { + return ( + StatusLightState::Caution, + "???".to_string(), + "No upstream UVC handoff timing sample is available yet.".to_string(), + ); + }; + let label = compact_lag_label(lag_ms); + let state = if lag_ms <= 500.0 { + StatusLightState::Connected + } else if lag_ms <= 1_000.0 { + StatusLightState::Live + } else if lag_ms <= 1_500.0 { + StatusLightState::Caution + } else { + StatusLightState::Warning + }; + let tooltip = format!( + "Lesavka upstream lower-bound lag to UVC handoff: {lag_ms:.0} ms. This excludes Google Meet/browser buffering after the RCT receives the virtual webcam." + ); + (state, label, tooltip) +} + +fn compact_lag_label(lag_ms: f32) -> String { + if lag_ms >= 950.0 { + format!("{:.1}s", lag_ms / 1_000.0) + } else { + format!("{:.0}ms", lag_ms.max(0.0)) + } +} + fn gpio_light_state(power: &CapturePowerStatus) -> StatusLightState { StatusLightState::from_active(power.available && power.enabled) } diff --git a/client/src/launcher/ui_runtime/status_refresh.rs b/client/src/launcher/ui_runtime/status_refresh.rs index 38cd301..9c77079 100644 --- a/client/src/launcher/ui_runtime/status_refresh.rs +++ b/client/src/launcher/ui_runtime/status_refresh.rs @@ -140,6 +140,14 @@ pub fn refresh_launcher_ui(widgets: &LauncherWidgets, state: &LauncherState, chi "Upstream webcam transport: {}. Server calibration is profile-specific.", state.effective_webcam_transport().label() ))); + let (lag_state, lag_value, lag_tooltip) = + upstream_lag_health(&state.upstream_sync, relay_live); + set_status_light(&widgets.summary.upstream_lag_light, lag_state); + widgets.summary.upstream_lag_value.set_text(&lag_value); + widgets + .summary + .upstream_lag_value + .set_tooltip_text(Some(&lag_tooltip)); let power_detail = if state.server_available { capture_power_detail(&state.capture_power) diff --git a/common/Cargo.toml b/common/Cargo.toml index 7e73ac1..c683d46 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.22.28" +version = "0.22.29" edition = "2024" build = "build.rs" diff --git a/server/Cargo.toml b/server/Cargo.toml index 5290e20..993f36e 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -10,7 +10,7 @@ bench = false [package] name = "lesavka_server" -version = "0.22.28" +version = "0.22.29" edition = "2024" autobins = false diff --git a/server/src/main/relay_service.rs b/server/src/main/relay_service.rs index 12c0b75..449f8cd 100644 --- a/server/src/main/relay_service.rs +++ b/server/src/main/relay_service.rs @@ -1,6 +1,7 @@ const MEDIA_V2_DEFAULT_PLAYOUT_DELAY_MS: u64 = 20; const MEDIA_V2_DEFAULT_MAX_LIVE_AGE_MS: u64 = 1_000; const MEDIA_V2_DEFAULT_UAC_START_TIMEOUT_MS: u64 = 750; +const MEDIA_V2_DEFAULT_STREAM_IDLE_TIMEOUT_MS: u64 = 5_000; const MEDIA_V2_MAX_MIXED_CAPTURE_SPAN_US: u64 = 250_000; #[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] @@ -32,6 +33,7 @@ struct MediaV2ScheduledAudio { struct MediaV2ScheduledVideo { packet: VideoPacket, due_at: tokio::time::Instant, + received_at: tokio::time::Instant, } /// Keeps `summarize_media_v2_bundle` explicit because it sits on relay RPC orchestration, where hardware failures must surface without stopping the server. @@ -116,6 +118,14 @@ fn media_v2_uac_start_timeout() -> Duration { .unwrap_or_else(|| Duration::from_millis(MEDIA_V2_DEFAULT_UAC_START_TIMEOUT_MS)) } +fn media_v2_stream_idle_timeout() -> Duration { + std::env::var("LESAVKA_UPSTREAM_V2_STREAM_IDLE_TIMEOUT_MS") + .ok() + .and_then(|value| value.trim().parse::().ok()) + .map(Duration::from_millis) + .unwrap_or_else(|| Duration::from_millis(MEDIA_V2_DEFAULT_STREAM_IDLE_TIMEOUT_MS)) +} + /// Keeps `media_v2_handoff_schedule` explicit because it sits on relay RPC orchestration, where hardware failures must surface without stopping the server. /// Inputs are the typed parameters; output is the return value or side effect. fn media_v2_handoff_schedule( @@ -269,6 +279,7 @@ fn prepare_media_v2_video( upstream_media_rt: &UpstreamMediaRuntime, bundle_base_remote_pts_us: u64, bundle_epoch: tokio::time::Instant, + received_at: tokio::time::Instant, frame_step_us: u64, ) -> Option { let mut video = video?; @@ -285,6 +296,7 @@ fn prepare_media_v2_video( Some(MediaV2ScheduledVideo { packet: video, due_at: plan.due_at, + received_at, }) } _ => None, @@ -333,6 +345,11 @@ async fn run_media_v2_video_handoff( while let Some(item) = rx.recv().await { sleep_until_media_v2(item.due_at).await; let presented_pts = item.packet.pts; + let live_lag_ms = f64::from(item.packet.client_queue_age_ms) + + tokio::time::Instant::now() + .saturating_duration_since(item.received_at) + .as_secs_f64() + * 1_000.0; relay.feed(item.packet); if !video_presented_once { info!( @@ -344,7 +361,11 @@ async fn run_media_v2_video_handoff( ); video_presented_once = true; } - upstream_media_rt.mark_video_presented(presented_pts, item.due_at); + upstream_media_rt.mark_video_presented_with_live_lag( + presented_pts, + item.due_at, + live_lag_ms, + ); } } diff --git a/server/src/main/relay_service/upstream_media_rpc.rs b/server/src/main/relay_service/upstream_media_rpc.rs index b39b3c7..0d49537 100644 --- a/server/src/main/relay_service/upstream_media_rpc.rs +++ b/server/src/main/relay_service/upstream_media_rpc.rs @@ -83,6 +83,7 @@ impl Handler { let mut last_bundle_seq = None; let mut waiting_for_hevc_keyframe = false; let mut outcome = "aborted"; + let idle_timeout = media_v2_stream_idle_timeout(); let (mut audio_handoff_tx, audio_worker) = if let Some((microphone_sink_permit, sink)) = microphone_sink { let (audio_handoff_tx, audio_handoff_rx) = @@ -107,7 +108,22 @@ impl Handler { camera_session_id, )); - while let Some(bundle_result) = inbound.next().await { + loop { + let Some(bundle_result) = (match tokio::time::timeout(idle_timeout, inbound.next()).await { + Ok(next) => next, + Err(_) => { + outcome = "idle-timeout"; + warn!( + rpc_id, + session_id = camera_lease.session_id, + idle_timeout_ms = idle_timeout.as_millis(), + "📦 stream_webcam_media v2 idle timeout; closing stale upstream leases" + ); + break; + } + }) else { + break; + }; let mut bundle = match bundle_result { Ok(bundle) => bundle, Err(err) => { @@ -264,6 +280,7 @@ impl Handler { &upstream_media_rt, bundle_base_remote_pts_us, bundle_epoch, + bundle_arrived_at, frame_step_us, ) } else { diff --git a/server/src/main/relay_service_tests.rs b/server/src/main/relay_service_tests.rs index 4caf0fe..eacc3e9 100644 --- a/server/src/main/relay_service_tests.rs +++ b/server/src/main/relay_service_tests.rs @@ -260,6 +260,7 @@ mod tests { &runtime, base, epoch, + tokio::time::Instant::now(), media_v2_frame_step_us(30), ) .expect("video plan"); diff --git a/server/src/upstream_media_runtime.rs b/server/src/upstream_media_runtime.rs index 69c5010..72e8aa5 100644 --- a/server/src/upstream_media_runtime.rs +++ b/server/src/upstream_media_runtime.rs @@ -176,6 +176,7 @@ struct RuntimeState { latest_microphone_timing: Option, latest_camera_presentation: Option, latest_microphone_presentation: Option, + latest_camera_live_lag_ms: Option, latest_paired_client_capture_skew_ms: Option, latest_paired_client_send_skew_ms: Option, latest_paired_server_receive_skew_ms: Option, @@ -232,6 +233,7 @@ fn reset_session_state(state: &mut RuntimeState) { state.latest_microphone_timing = None; state.latest_camera_presentation = None; state.latest_microphone_presentation = None; + state.latest_camera_live_lag_ms = None; state.latest_paired_client_capture_skew_ms = None; state.latest_paired_client_send_skew_ms = None; state.latest_paired_server_receive_skew_ms = None; @@ -289,13 +291,7 @@ fn record_presentation(state: &mut RuntimeState, kind: UpstreamMediaKind, due_at } fn live_lag_ms(state: &RuntimeState) -> Option { - let latest = state - .latest_camera_remote_pts_us - .into_iter() - .chain(state.latest_microphone_remote_pts_us) - .max()?; - let base = state.base_remote_pts_us.unwrap_or(latest); - Some(latest.saturating_sub(base) as f64 / 1000.0) + state.latest_camera_live_lag_ms } /// Keeps `planner_skew_ms` explicit because it sits on server upstream media scheduling, where timing choices directly affect lip sync. diff --git a/server/src/upstream_media_runtime/stream_lifecycle_methods.rs b/server/src/upstream_media_runtime/stream_lifecycle_methods.rs index aae418f..cdfe186 100644 --- a/server/src/upstream_media_runtime/stream_lifecycle_methods.rs +++ b/server/src/upstream_media_runtime/stream_lifecycle_methods.rs @@ -128,6 +128,27 @@ pub fn new() -> Self { .lock() .expect("upstream media state mutex poisoned"); state.last_video_presented_pts_us = Some(local_pts_us); + let now = Instant::now(); + state.latest_camera_live_lag_ms = state + .latest_camera_timing + .map(|sample| f64::from(sample.queue_age_ms) + age_ms(now, sample.received_at)); + record_presentation(&mut state, UpstreamMediaKind::Camera, due_at); + state.phase = UpstreamSyncPhase::Live; + state.last_reason = "v2 video handed to UVC".to_string(); + } + + pub fn mark_video_presented_with_live_lag( + &self, + local_pts_us: u64, + due_at: Instant, + live_lag_ms: f64, + ) { + let mut state = self + .state + .lock() + .expect("upstream media state mutex poisoned"); + state.last_video_presented_pts_us = Some(local_pts_us); + state.latest_camera_live_lag_ms = Some(live_lag_ms.max(0.0)); record_presentation(&mut state, UpstreamMediaKind::Camera, due_at); state.phase = UpstreamSyncPhase::Live; state.last_reason = "v2 video handed to UVC".to_string(); diff --git a/server/src/upstream_media_runtime/tests/mod.rs b/server/src/upstream_media_runtime/tests/mod.rs index 1ca8be7..2e15694 100644 --- a/server/src/upstream_media_runtime/tests/mod.rs +++ b/server/src/upstream_media_runtime/tests/mod.rs @@ -184,6 +184,10 @@ fn runtime_records_client_and_sink_timing_for_upstream_snapshots() { assert_eq!(snapshot.client_capture_skew_ms, Some(6.0)); assert_eq!(snapshot.client_send_skew_ms, Some(10.0)); assert_eq!(snapshot.camera_client_queue_age_ms, Some(20.0)); + assert!( + snapshot.live_lag_ms.is_some_and(|lag| lag >= 20.0), + "upstream live lag should include the client queue age lower bound" + ); assert_eq!(snapshot.microphone_client_queue_age_ms, Some(35.0)); assert_eq!(snapshot.last_video_presented_pts_us, Some(10_000)); assert_eq!(snapshot.last_audio_presented_pts_us, Some(11_500)); diff --git a/server/src/video_sinks/hevc_mjpeg_guard.rs b/server/src/video_sinks/hevc_mjpeg_guard.rs index 71efdbc..7f68271 100644 --- a/server/src/video_sinks/hevc_mjpeg_guard.rs +++ b/server/src/video_sinks/hevc_mjpeg_guard.rs @@ -5,6 +5,8 @@ const DEFAULT_HEVC_SIZE_DROP_PCT: u32 = 45; const DEFAULT_HEVC_MIN_REFERENCE_BYTES: u32 = 64 * 1024; const DEFAULT_HEVC_MIN_PAYLOAD_DISTINCT_BYTES: u32 = 12; const DEFAULT_HEVC_DOMINANT_BYTE_PCT: u32 = 92; +const DEFAULT_DIRECT_MJPEG_SIZE_DROP_PCT: u32 = 18; +const DEFAULT_DIRECT_MJPEG_MIN_REFERENCE_BYTES: u32 = 48 * 1024; /// Resolve the JPEG quality used after HEVC decode. /// @@ -85,6 +87,52 @@ pub(super) fn dominant_byte_pct() -> u32 { .clamp(50, 99) } +/// Decide whether direct MJPEG visual filtering is enabled. +/// +/// Inputs: optional `LESAVKA_UVC_DIRECT_MJPEG_VISUAL_GUARD`. Output: true +/// unless explicitly disabled. Why: the direct MJPEG path can still receive +/// complete but visually useless black/collapsed frames, and repeating the last +/// good conference frame is safer than exposing those frames to Google Meet. +pub(super) fn direct_mjpeg_visual_guard_enabled() -> bool { + std::env::var("LESAVKA_UVC_DIRECT_MJPEG_VISUAL_GUARD") + .ok() + .map(|value| { + let trimmed = value.trim(); + !(trimmed.eq_ignore_ascii_case("0") + || trimmed.eq_ignore_ascii_case("false") + || trimmed.eq_ignore_ascii_case("no") + || trimmed.eq_ignore_ascii_case("off")) + }) + .unwrap_or(true) +} + +/// Resolve the direct-MJPEG size-collapse threshold. +/// +/// Inputs: optional `LESAVKA_UVC_DIRECT_MJPEG_SIZE_DROP_PCT`, clamped to +/// 1..=60. Output: next-frame size percentage of the last good direct MJPEG. +/// Why: direct camera MJPEG naturally varies more than decoded HEVC output, so +/// this guard is deliberately conservative and only catches dramatic collapses. +pub(super) fn direct_mjpeg_size_drop_pct() -> u32 { + env_u32( + "LESAVKA_UVC_DIRECT_MJPEG_SIZE_DROP_PCT", + DEFAULT_DIRECT_MJPEG_SIZE_DROP_PCT, + ) + .clamp(1, 60) +} + +/// Resolve the direct-MJPEG baseline required before visual freezing. +/// +/// Inputs: optional `LESAVKA_UVC_DIRECT_MJPEG_MIN_REFERENCE_BYTES`. Output: +/// byte count. Why: tiny startup frames should not become the last-good +/// baseline that causes healthy frames to be classified as suspicious. +pub(super) fn direct_mjpeg_min_reference_bytes() -> u32 { + env_u32( + "LESAVKA_UVC_DIRECT_MJPEG_MIN_REFERENCE_BYTES", + DEFAULT_DIRECT_MJPEG_MIN_REFERENCE_BYTES, + ) + .max(1) +} + /// Return whether a decoded buffer looks like one complete JPEG image. /// /// Inputs: decoded MJPEG bytes. Output: true when SOI, SOS, and EOI markers @@ -165,13 +213,24 @@ pub(super) fn should_freeze_decoded_mjpeg_frame(previous_bytes: u64, decoded_mjp /// Decide whether a direct MJPEG camera frame is unsafe to publish. /// -/// Inputs: MJPEG bytes from the client webcam capture path. Output: true only -/// for incomplete JPEG payloads. Why: the aggressive decoded-HEVC visual guard -/// intentionally freezes flat/size-collapsed frames, but applying that same -/// heuristic to direct MJPEG can freeze a legitimate live camera path after one -/// good frame; direct MJPEG should pass through unless the JPEG is incomplete. -pub(super) fn should_reject_direct_mjpeg_frame(mjpeg: &[u8]) -> bool { - !looks_like_complete_jpeg(mjpeg) +/// Inputs: the byte length of the last successfully spooled direct MJPEG and +/// the next MJPEG bytes. Output: true when the next frame is incomplete, +/// implausibly flat, or a dramatic size collapse. Why: direct MJPEG should be +/// less aggressive than decoded HEVC filtering, but complete black/collapsed +/// frames are still worse than a short last-good-frame freeze. +pub(super) fn should_reject_direct_mjpeg_frame(previous_bytes: u64, mjpeg: &[u8]) -> bool { + if !looks_like_complete_jpeg(mjpeg) { + return true; + } + if !direct_mjpeg_visual_guard_enabled() + || previous_bytes < u64::from(direct_mjpeg_min_reference_bytes()) + { + return false; + } + + let threshold_bytes = previous_bytes.saturating_mul(u64::from(direct_mjpeg_size_drop_pct())) + / 100; + suspiciously_flat_payload(mjpeg) || (mjpeg.len() as u64) < threshold_bytes } #[cfg(test)] @@ -278,7 +337,7 @@ mod tests { } #[test] - fn direct_mjpeg_guard_only_rejects_incomplete_jpegs() { + fn direct_mjpeg_guard_rejects_incomplete_and_obvious_bad_frames_after_baseline() { fn jpeg_with_payload(payload: &[u8]) -> Vec { let mut bytes = vec![0xff, 0xd8, 0xff, 0xda]; bytes.extend_from_slice(payload); @@ -288,11 +347,37 @@ mod tests { let flat = jpeg_with_payload(&vec![0x80; 120_000]); let varied = jpeg_with_payload(&(0..120_000).map(|idx| (idx % 251) as u8).collect::>()); + let tiny = jpeg_with_payload(&vec![0x42; 4_000]); let mut truncated = varied.clone(); truncated.pop(); - assert!(!super::should_reject_direct_mjpeg_frame(&flat)); - assert!(!super::should_reject_direct_mjpeg_frame(&varied)); - assert!(super::should_reject_direct_mjpeg_frame(&truncated)); + temp_env::with_vars( + [ + ("LESAVKA_UVC_DIRECT_MJPEG_VISUAL_GUARD", Some("1")), + ("LESAVKA_UVC_DIRECT_MJPEG_SIZE_DROP_PCT", Some("18")), + ("LESAVKA_UVC_DIRECT_MJPEG_MIN_REFERENCE_BYTES", Some("49152")), + ], + || { + assert!(!super::should_reject_direct_mjpeg_frame(0, &flat)); + assert!(!super::should_reject_direct_mjpeg_frame(180_000, &varied)); + assert!(super::should_reject_direct_mjpeg_frame(180_000, &flat)); + assert!(super::should_reject_direct_mjpeg_frame(180_000, &tiny)); + assert!(super::should_reject_direct_mjpeg_frame(180_000, &truncated)); + }, + ); + } + + #[test] + fn direct_mjpeg_visual_guard_can_be_disabled_without_allowing_truncation() { + let mut complete = vec![0xff, 0xd8, 0xff, 0xda]; + complete.extend_from_slice(&vec![0x80; 120_000]); + complete.extend_from_slice(&[0xff, 0xd9]); + let mut truncated = complete.clone(); + truncated.pop(); + + temp_env::with_var("LESAVKA_UVC_DIRECT_MJPEG_VISUAL_GUARD", Some("0"), || { + assert!(!super::should_reject_direct_mjpeg_frame(180_000, &complete)); + assert!(super::should_reject_direct_mjpeg_frame(180_000, &truncated)); + }); } } diff --git a/server/src/video_sinks/webcam_sink.rs b/server/src/video_sinks/webcam_sink.rs index fb33bd4..b6fe092 100644 --- a/server/src/video_sinks/webcam_sink.rs +++ b/server/src/video_sinks/webcam_sink.rs @@ -784,11 +784,15 @@ impl WebcamSink { #[cfg(not(coverage))] fn spool_direct_mjpeg_frame(&self, path: &Path, pkt: &VideoPacket) { - if hevc_mjpeg_guard::should_reject_direct_mjpeg_frame(&pkt.data) { + let previous_bytes = self + .last_mjpeg_passthrough_bytes + .load(std::sync::atomic::Ordering::Relaxed); + if hevc_mjpeg_guard::should_reject_direct_mjpeg_frame(previous_bytes, &pkt.data) { warn!( target:"lesavka_server::video", + previous_bytes, next_bytes = pkt.data.len(), - "📸⚠️ dropping incomplete direct MJPEG frame before UVC spool" + "📸⚠️ freezing suspicious direct MJPEG frame before UVC spool" ); return; } diff --git a/tests/api/server/main/server_main_state_rpc_contract.rs b/tests/api/server/main/server_main_state_rpc_contract.rs index 2a8e7e1..0d72926 100644 --- a/tests/api/server/main/server_main_state_rpc_contract.rs +++ b/tests/api/server/main/server_main_state_rpc_contract.rs @@ -267,7 +267,7 @@ mod server_main_state_rpc { assert_eq!(live.latest_microphone_remote_pts_us, Some(1_001_500)); assert_eq!(live.last_video_presented_pts_us, Some(10_000)); assert_eq!(live.last_audio_presented_pts_us, Some(11_500)); - assert!(live.live_lag_ms.is_some()); + assert!(live.live_lag_ms.is_some_and(|lag| lag >= 20.0)); assert_eq!(live.planner_skew_ms, Some(1.5)); assert_eq!(live.client_capture_skew_ms, Some(1.5)); assert_eq!(live.client_send_skew_ms, Some(2.0)); diff --git a/tests/api/server/upstream_media_runtime/server_upstream_media_v2_handoff_contract.rs b/tests/api/server/upstream_media_runtime/server_upstream_media_v2_handoff_contract.rs index f39fb7c..db3c510 100644 --- a/tests/api/server/upstream_media_runtime/server_upstream_media_v2_handoff_contract.rs +++ b/tests/api/server/upstream_media_runtime/server_upstream_media_v2_handoff_contract.rs @@ -41,7 +41,7 @@ fn bundled_receive_loop_enqueues_instead_of_sleeping_for_handoff() { for expected in [ "tokio::sync::mpsc::channel::(32)", "tokio::sync::mpsc::channel::(32)", - "tokio::spawn(run_media_v2_audio_handoff", + "run_media_v2_audio_handoff(audio_handoff_rx", "tokio::spawn(run_media_v2_video_handoff", "let bundle_epoch = bundle_arrived_at + schedule.common_delay;", "let bundle_base_remote_pts_us = facts.capture_start_us;", @@ -51,6 +51,9 @@ fn bundled_receive_loop_enqueues_instead_of_sleeping_for_handoff() { "bundle_epoch", ".send(scheduled_audio)", ".send(scheduled_video)", + "media_v2_stream_idle_timeout()", + "stream_webcam_media v2 idle timeout", + "closing stale upstream leases", ] { assert!( WEBCAM_RPC.contains(expected), @@ -81,7 +84,7 @@ fn handoff_workers_own_timing_and_presentation_telemetry() { "sink.finish();", "relay.feed(item.packet);", "mark_audio_presented(pts, item.due_at)", - "mark_video_presented(presented_pts, item.due_at)", + "mark_video_presented_with_live_lag(", "Why: sleeping in the receive loop created HTTP/2 backlog", ] { assert!( diff --git a/tests/chaos/server/video_sinks/hevc_mjpeg_guard_chaos_contract.rs b/tests/chaos/server/video_sinks/hevc_mjpeg_guard_chaos_contract.rs index 4b96be4..7b830aa 100644 --- a/tests/chaos/server/video_sinks/hevc_mjpeg_guard_chaos_contract.rs +++ b/tests/chaos/server/video_sinks/hevc_mjpeg_guard_chaos_contract.rs @@ -30,8 +30,8 @@ mod guard { should_freeze_decoded_mjpeg_frame(previous_bytes, decoded_mjpeg) } - pub fn should_reject_direct_frame(mjpeg: &[u8]) -> bool { - should_reject_direct_mjpeg_frame(mjpeg) + pub fn should_reject_direct_frame(previous_bytes: u64, mjpeg: &[u8]) -> bool { + should_reject_direct_mjpeg_frame(previous_bytes, mjpeg) } } @@ -115,10 +115,10 @@ fn server_hevc_recovery_and_freshest_spool_paths_remain_wired() { "last_decoded_mjpeg_bytes", "last_mjpeg_passthrough_bytes", "should_freeze_decoded_mjpeg_frame(previous_bytes, map.as_slice())", - "should_reject_direct_mjpeg_frame(&pkt.data)", + "should_reject_direct_mjpeg_frame(previous_bytes, &pkt.data)", "spool_direct_mjpeg_frame", "freezing suspicious decoded HEVC->MJPEG frame", - "dropping incomplete direct MJPEG frame before UVC spool", + "freezing suspicious direct MJPEG frame before UVC spool", ] { assert!( WEBCAM_SINK.contains(marker), @@ -139,16 +139,31 @@ fn server_hevc_recovery_and_freshest_spool_paths_remain_wired() { } #[test] -fn direct_mjpeg_passthrough_does_not_use_decoded_hevc_visual_freeze_rules() { +fn direct_mjpeg_guard_is_conservative_but_filters_obvious_black_or_truncated_frames() { let healthy_payload: Vec = (0..140_000).map(|idx| (idx % 251) as u8).collect(); let flat = jpeg_with_payload(&vec![0x80; 140_000]); let healthy = jpeg_with_payload(&healthy_payload); + let tiny = jpeg_with_payload(&vec![0x42; 8_000]); let mut truncated = healthy.clone(); truncated.pop(); - assert!(!guard::should_reject_direct_frame(&flat)); - assert!(!guard::should_reject_direct_frame(&healthy)); - assert!(guard::should_reject_direct_frame(&truncated)); + temp_env::with_vars( + [ + ("LESAVKA_UVC_DIRECT_MJPEG_VISUAL_GUARD", Some("1")), + ("LESAVKA_UVC_DIRECT_MJPEG_SIZE_DROP_PCT", Some("18")), + ( + "LESAVKA_UVC_DIRECT_MJPEG_MIN_REFERENCE_BYTES", + Some("49152"), + ), + ], + || { + assert!(!guard::should_reject_direct_frame(0, &flat)); + assert!(!guard::should_reject_direct_frame(220_000, &healthy)); + assert!(guard::should_reject_direct_frame(220_000, &flat)); + assert!(guard::should_reject_direct_frame(220_000, &tiny)); + assert!(guard::should_reject_direct_frame(220_000, &truncated)); + }, + ); } fn jpeg_with_payload(payload: &[u8]) -> Vec { diff --git a/tests/ui/client/launcher/client_launcher_layout_contract.rs b/tests/ui/client/launcher/client_launcher_layout_contract.rs index 29d9541..ebad2f1 100644 --- a/tests/ui/client/launcher/client_launcher_layout_contract.rs +++ b/tests/ui/client/launcher/client_launcher_layout_contract.rs @@ -333,6 +333,9 @@ fn status_chip_text_is_centered_inside_each_pill() { ); assert!(UI_LAYOUT_SRC.contains("build_status_chip_with_light(\"Left\", \"Off\")")); assert!(UI_LAYOUT_SRC.contains("build_status_chip_with_light(\"Right\", \"Off\")")); + assert!(UI_LAYOUT_SRC.contains("build_status_chip_with_light(\"Lag\", \"???\")")); + assert!(UI_LAYOUT_SRC.contains("chips.append(&upstream_lag_chip);")); + assert!(UI_LAYOUT_SRC.contains("upstream_lag_value")); } #[test]