From df9c17c1135bd2ec130ebb177559654a11b3ebb3 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Mon, 18 May 2026 23:31:51 -0300 Subject: [PATCH] diagnostics: add UVC spool boundary audit --- Cargo.lock | 6 +- client/Cargo.toml | 2 +- common/Cargo.toml | 2 +- docs/operational-env.md | 4 + scripts/ci/hygiene_gate_baseline.json | 9 +- server/Cargo.toml | 2 +- server/src/video_sinks/mjpeg_spool.rs | 22 ++- server/src/video_sinks/mjpeg_spool/audit.rs | 192 ++++++++++++++++++++ server/src/video_sinks/mjpeg_spool/tests.rs | 120 +++++++++++- 9 files changed, 342 insertions(+), 17 deletions(-) create mode 100644 server/src/video_sinks/mjpeg_spool/audit.rs diff --git a/Cargo.lock b/Cargo.lock index de8cd60..d85f2fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1658,7 +1658,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "lesavka_client" -version = "0.23.2" +version = "0.24.0" dependencies = [ "anyhow", "async-stream", @@ -1692,7 +1692,7 @@ dependencies = [ [[package]] name = "lesavka_common" -version = "0.23.2" +version = "0.24.0" dependencies = [ "anyhow", "base64", @@ -1704,7 +1704,7 @@ dependencies = [ [[package]] name = "lesavka_server" -version = "0.23.2" +version = "0.24.0" dependencies = [ "anyhow", "base64", diff --git a/client/Cargo.toml b/client/Cargo.toml index b7c7de5..fceb3e2 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -4,7 +4,7 @@ path = "src/main.rs" [package] name = "lesavka_client" -version = "0.23.2" +version = "0.24.0" edition = "2024" [dependencies] diff --git a/common/Cargo.toml b/common/Cargo.toml index b207aa8..6481ac3 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lesavka_common" -version = "0.23.2" +version = "0.24.0" edition = "2024" build = "build.rs" diff --git a/docs/operational-env.md b/docs/operational-env.md index aefc0d6..fc6e21d 100644 --- a/docs/operational-env.md +++ b/docs/operational-env.md @@ -336,6 +336,10 @@ from 
`LESAVKA_CLIENT_PKI_SSH_SOURCE` over SSH. Runtime clients require the insta | `LESAVKA_UVC_EXTERNAL` | server hardware/device override | | `LESAVKA_UVC_FALLBACK` | server hardware/device override | | `LESAVKA_UVC_FPS` | server hardware/device override | +| `LESAVKA_UVC_FRAME_AUDIT_DIR` | UVC helper boundary-audit directory; when set, the server saves exact MJPEG frames published to the UVC helper plus a JSONL index so recordings can be compared against pre-UVC payloads | +| `LESAVKA_UVC_FRAME_AUDIT_EVERY` | UVC helper boundary-audit sampling interval; saves every Nth spooled frame, defaults to `1` for short repros | +| `LESAVKA_UVC_FRAME_AUDIT_LOG_PATH` | UVC helper boundary-audit JSONL path override; defaults to `spool-audit.jsonl` inside `LESAVKA_UVC_FRAME_AUDIT_DIR` | +| `LESAVKA_UVC_FRAME_AUDIT_MAX_FRAMES` | UVC helper boundary-audit frame cap; defaults to `1800`, set `0` for unlimited audit capture during isolated short runs | | `LESAVKA_UVC_FRAME_META` | UVC helper diagnostic override; when true, the server writes an atomic JSON sidecar for each spooled MJPEG frame so HEVC decode/spool timing can be compared with final RCT capture | | `LESAVKA_UVC_FRAME_META_LOG_PATH` | UVC helper diagnostic override; when set with `LESAVKA_UVC_FRAME_META=1`, append every MJPEG spool timing record as JSONL for full-probe HEVC/RCT correlation; summarize with `scripts/manual/summarize_uvc_frame_meta_log.py` | | `LESAVKA_UVC_FRAME_META_PATH` | UVC helper diagnostic override; explicit path for the optional MJPEG spool metadata sidecar | diff --git a/scripts/ci/hygiene_gate_baseline.json b/scripts/ci/hygiene_gate_baseline.json index 376fd37..bc71f06 100644 --- a/scripts/ci/hygiene_gate_baseline.json +++ b/scripts/ci/hygiene_gate_baseline.json @@ -1328,12 +1328,17 @@ "server/src/video_sinks/mjpeg_spool.rs": { "clippy_warnings": 0, "doc_debt": 4, - "loc": 376 + "loc": 396 + }, + "server/src/video_sinks/mjpeg_spool/audit.rs": { + "clippy_warnings": 0, + "doc_debt": 1, + "loc": 192 }, 
"server/src/video_sinks/mjpeg_spool/tests.rs": { "clippy_warnings": 0, "doc_debt": 2, - "loc": 270 + "loc": 374 }, "server/src/video_sinks/webcam_sink.rs": { "clippy_warnings": 0, diff --git a/server/Cargo.toml b/server/Cargo.toml index 2261a2e..d68439e 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -16,7 +16,7 @@ bench = false [package] name = "lesavka_server" -version = "0.23.2" +version = "0.24.0" edition = "2024" autobins = false diff --git a/server/src/video_sinks/mjpeg_spool.rs b/server/src/video_sinks/mjpeg_spool.rs index a0240f5..02ee027 100644 --- a/server/src/video_sinks/mjpeg_spool.rs +++ b/server/src/video_sinks/mjpeg_spool.rs @@ -7,6 +7,9 @@ use std::time::{SystemTime, UNIX_EPOCH}; use gstreamer as gst; use gstreamer_app as gst_app; +#[path = "mjpeg_spool/audit.rs"] +mod audit; + static SPOOL_SEQUENCE: AtomicU64 = AtomicU64::new(1); static SPOOL_TEMP_SEQUENCE: AtomicU64 = AtomicU64::new(1); const MAX_MJPEG_FRAME_BYTES: usize = 8 * 1024 * 1024; @@ -357,10 +360,15 @@ pub(super) fn spool_mjpeg_frame_with_timing( fs::write(&tmp, data)?; fs::rename(&tmp, path)?; + let audit_dir = audit::mjpeg_spool_audit_dir(); + let needs_sequence = (mjpeg_spool_metadata_enabled() && timing.is_some()) || audit_dir.is_some(); + let sequence = + needs_sequence.then(|| SPOOL_SEQUENCE.fetch_add(1, Ordering::Relaxed)); + if mjpeg_spool_metadata_enabled() && let Some(timing) = timing + && let Some(sequence) = sequence { - let sequence = SPOOL_SEQUENCE.fetch_add(1, Ordering::Relaxed); let record = format_mjpeg_spool_metadata(sequence, data.len(), timing); write_atomic_text(&mjpeg_spool_metadata_path(path), &record)?; if let Some(log_path) = mjpeg_spool_metadata_log_path() { @@ -368,6 +376,18 @@ pub(super) fn spool_mjpeg_frame_with_timing( } } + if let (Some(dir), Some(sequence)) = (audit_dir, sequence) + && audit::claim_spool_audit_frame(sequence) + && let Err(err) = audit::write_mjpeg_spool_audit_frame(&dir, sequence, data, timing) + { + tracing::warn!( + target: 
"lesavka_server::video", + %err, + audit_dir = %dir.display(), + "failed to write UVC MJPEG boundary audit frame" + ); + } + Ok(()) } diff --git a/server/src/video_sinks/mjpeg_spool/audit.rs b/server/src/video_sinks/mjpeg_spool/audit.rs new file mode 100644 index 0000000..8f1397c --- /dev/null +++ b/server/src/video_sinks/mjpeg_spool/audit.rs @@ -0,0 +1,192 @@ +use std::fs::{self, OpenOptions}; +use std::io::Write; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::{SystemTime, UNIX_EPOCH}; + +use super::MjpegSpoolTiming; + +static AUDIT_TEMP_SEQUENCE: AtomicU64 = AtomicU64::new(1); +static AUDIT_SAVED_FRAMES: AtomicU64 = AtomicU64::new(0); +const DEFAULT_UVC_FRAME_AUDIT_EVERY: u32 = 1; +const DEFAULT_UVC_FRAME_AUDIT_MAX_FRAMES: u32 = 1800; +const FNV1A64_OFFSET: u64 = 0xcbf29ce484222325; +const FNV1A64_PRIME: u64 = 0x100000001b3; + +/// Parse an optional unsigned tuning value. +/// +/// Inputs: environment variable name. Output: parsed `u32` when present. +/// Why: audit controls should ignore malformed operator input instead of +/// preventing frame spooling. +fn env_u32_opt(name: &str) -> Option { + std::env::var(name) + .ok() + .and_then(|value| value.trim().parse::().ok()) +} + +fn unix_now_ns() -> u128 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|duration| duration.as_nanos()) + .unwrap_or(0) +} + +fn json_number_or_null(value: Option) -> String { + value + .map(|value| value.to_string()) + .unwrap_or_else(|| "null".to_string()) +} + +/// Append one audit JSONL row, creating parent directories as needed. +/// +/// Inputs: destination path and already formatted JSONL row. Output: success +/// or filesystem error. Why: the audit frame and index should be collected +/// together without changing the normal latest-frame spool file. 
+fn append_record(path: &Path, record: &str) -> anyhow::Result<()> { + if let Some(parent) = path.parent() { + fs::create_dir_all(parent)?; + } + OpenOptions::new() + .create(true) + .append(true) + .open(path)? + .write_all(record.as_bytes())?; + Ok(()) +} + +/// Resolve the optional exact UVC-bound MJPEG audit directory. +/// +/// Inputs: `LESAVKA_UVC_FRAME_AUDIT_DIR`. Output: a directory when configured. +/// Why: the final RCT recording can be corrupt even when this boundary frame is +/// clean, so debugging needs the exact payloads published to UVC. +pub(super) fn mjpeg_spool_audit_dir() -> Option<PathBuf> { + std::env::var("LESAVKA_UVC_FRAME_AUDIT_DIR") + .ok() + .map(|value| value.trim().to_string()) + .filter(|value| !value.is_empty()) + .map(PathBuf::from) +} + +/// Resolve the UVC-bound audit sampling interval. +/// +/// Inputs: `LESAVKA_UVC_FRAME_AUDIT_EVERY`. Output: save every Nth frame. Why: +/// short repros need every frame, while long calls need a disk-safe throttle. +pub(super) fn mjpeg_spool_audit_every() -> u64 { + u64::from( + env_u32_opt("LESAVKA_UVC_FRAME_AUDIT_EVERY") + .unwrap_or(DEFAULT_UVC_FRAME_AUDIT_EVERY) + .max(1), + ) +} + +/// Resolve how many audited frames may be saved. +/// +/// Inputs: `LESAVKA_UVC_FRAME_AUDIT_MAX_FRAMES`. Output: max saved frames; zero +/// means unlimited. Why: full MJPEG dumps are powerful but intentionally heavy. +pub(super) fn mjpeg_spool_audit_max_frames() -> u64 { + u64::from( + env_u32_opt("LESAVKA_UVC_FRAME_AUDIT_MAX_FRAMES") + .unwrap_or(DEFAULT_UVC_FRAME_AUDIT_MAX_FRAMES), + ) +} + +/// Resolve the audit JSONL path. +/// +/// Inputs: audit directory plus `LESAVKA_UVC_FRAME_AUDIT_LOG_PATH`. Output: +/// JSONL index path. Why: the frame dump needs compact timing/hash metadata +/// for alignment with eye-recording frame numbers. 
+pub(super) fn mjpeg_spool_audit_log_path(dir: &Path) -> PathBuf { + std::env::var("LESAVKA_UVC_FRAME_AUDIT_LOG_PATH") + .ok() + .map(|value| value.trim().to_string()) + .filter(|value| !value.is_empty()) + .map(PathBuf::from) + .unwrap_or_else(|| dir.join("spool-audit.jsonl")) +} + +fn fnv1a64_hex(data: &[u8]) -> String { + let mut hash = FNV1A64_OFFSET; + for byte in data { + hash ^= u64::from(*byte); + hash = hash.wrapping_mul(FNV1A64_PRIME); + } + format!("{hash:016x}") +} + +fn json_escape(value: &str) -> String { + value + .replace('\\', "\\\\") + .replace('"', "\\\"") + .replace('\n', "\\n") + .replace('\r', "\\r") +} + +pub(super) fn should_sample_spool_audit_frame(sequence: u64, every: u64) -> bool { + let offset = sequence.saturating_sub(1); + offset.is_multiple_of(every.max(1)) +} + +/// Decide whether the current spooled frame should be preserved. +/// +/// Inputs: global spool sequence. Output: true if sampling and max-frame budget +/// allow a save. Why: the max cap must count saved audit frames, not every +/// frame the long-running server emitted before the audit started. +pub(super) fn claim_spool_audit_frame(sequence: u64) -> bool { + if !should_sample_spool_audit_frame(sequence, mjpeg_spool_audit_every()) { + return false; + } + let max_frames = mjpeg_spool_audit_max_frames(); + max_frames == 0 || AUDIT_SAVED_FRAMES.fetch_add(1, Ordering::Relaxed) < max_frames +} + +/// Format one exact-frame audit index record. +/// +/// Inputs: sequence, JPEG bytes, optional timing, and saved filename. Output: +/// compact JSONL row. Why: hashes and timestamps let later tooling align a bad +/// RCT frame with the server-boundary payload that preceded it. 
+fn format_audit_record( + sequence: u64, + data: &[u8], + timing: Option<MjpegSpoolTiming>, + frame_file: &str, +) -> String { + let profile = timing.map(|value| value.profile).unwrap_or("unknown"); + let source_pts_us = timing.and_then(|value| value.source_pts_us); + let decoded_pts_us = timing.and_then(|value| value.decoded_pts_us); + format!( + "{{\"schema\":\"lesavka.uvc-mjpeg-spool-audit.v1\",\"sequence\":{},\"profile\":\"{}\",\"bytes\":{},\"source_pts_us\":{},\"decoded_pts_us\":{},\"spool_unix_ns\":{},\"fnv1a64\":\"{}\",\"file\":\"{}\"}}\n", + sequence, + json_escape(profile), + data.len(), + json_number_or_null(source_pts_us), + json_number_or_null(decoded_pts_us), + unix_now_ns(), + fnv1a64_hex(data), + json_escape(frame_file) + ) +} + +/// Save one exact UVC-bound MJPEG frame and append its JSONL index row. +/// +/// Inputs: audit directory, sequence, JPEG bytes, and timing. Output: success +/// or filesystem error. Why: this captures the server-side boundary evidence +/// needed to tell pre-UVC corruption from downstream UVC/browser corruption. 
+pub(super) fn write_mjpeg_spool_audit_frame( + dir: &Path, + sequence: u64, + data: &[u8], + timing: Option<MjpegSpoolTiming>, +) -> anyhow::Result<()> { + fs::create_dir_all(dir)?; + let frame_file = format!("frame-{sequence:012}.mjpg"); + let tmp = dir.join(format!( + ".{frame_file}.{}.{}.tmp", + std::process::id(), + AUDIT_TEMP_SEQUENCE.fetch_add(1, Ordering::Relaxed) + )); + fs::write(&tmp, data)?; + fs::rename(&tmp, dir.join(&frame_file))?; + + let record = format_audit_record(sequence, data, timing, &frame_file); + append_record(&mjpeg_spool_audit_log_path(dir), &record) +} diff --git a/server/src/video_sinks/mjpeg_spool/tests.rs b/server/src/video_sinks/mjpeg_spool/tests.rs index 415746c..b04e521 100644 --- a/server/src/video_sinks/mjpeg_spool/tests.rs +++ b/server/src/video_sinks/mjpeg_spool/tests.rs @@ -138,6 +138,48 @@ fn mjpeg_spool_metadata_is_opt_in_and_path_configurable() { }); } +/// Verifies audit env controls default safely and clamp invalid sampling. +/// +/// Input: unset and explicit audit env vars plus sampler examples. Output: +/// disabled-by-default directory, every-frame default, and bounded sampling. +/// Why: boundary audits should be easy for short repros but impossible to +/// enable accidentally during normal calls. 
+#[test] +fn mjpeg_spool_audit_knobs_are_opt_in_and_bounded() { + temp_env::with_vars( + [ + ("LESAVKA_UVC_FRAME_AUDIT_DIR", None::<&str>), + ("LESAVKA_UVC_FRAME_AUDIT_EVERY", None::<&str>), + ("LESAVKA_UVC_FRAME_AUDIT_MAX_FRAMES", None::<&str>), + ], + || { + assert_eq!(super::audit::mjpeg_spool_audit_dir(), None); + assert_eq!(super::audit::mjpeg_spool_audit_every(), 1); + assert_eq!(super::audit::mjpeg_spool_audit_max_frames(), 1800); + }, + ); + + temp_env::with_vars( + [ + ("LESAVKA_UVC_FRAME_AUDIT_DIR", Some("/tmp/uvc-audit")), + ("LESAVKA_UVC_FRAME_AUDIT_EVERY", Some("0")), + ("LESAVKA_UVC_FRAME_AUDIT_MAX_FRAMES", Some("0")), + ], + || { + assert_eq!( + super::audit::mjpeg_spool_audit_dir(), + Some(std::path::PathBuf::from("/tmp/uvc-audit")) + ); + assert_eq!(super::audit::mjpeg_spool_audit_every(), 1); + assert_eq!(super::audit::mjpeg_spool_audit_max_frames(), 0); + }, + ); + + assert!(super::audit::should_sample_spool_audit_frame(1, 3)); + assert!(!super::audit::should_sample_spool_audit_frame(2, 3)); + assert!(super::audit::should_sample_spool_audit_frame(4, 3)); +} + /// Verifies metadata records carry enough timing evidence for RCT analysis. /// /// Input: HEVC-decoded spool timing. 
Output: JSON fields for source and @@ -206,14 +248,20 @@ fn spool_mjpeg_frame_writes_frame_without_default_sidecar() { let frame = dir.path().join("nested").join("frame.mjpg"); let meta = frame.with_extension("mjpg.meta.json"); - temp_env::with_var_unset("LESAVKA_UVC_FRAME_META", || { - super::spool_mjpeg_frame_with_timing( - &frame, - b"jpeg-bytes", - Some(super::MjpegSpoolTiming::mjpeg_passthrough(10)), - ) - .expect("spool frame"); - }); + temp_env::with_vars( + [ + ("LESAVKA_UVC_FRAME_META", None::<&str>), + ("LESAVKA_UVC_FRAME_AUDIT_DIR", None::<&str>), + ], + || { + super::spool_mjpeg_frame_with_timing( + &frame, + b"jpeg-bytes", + Some(super::MjpegSpoolTiming::mjpeg_passthrough(10)), + ) + .expect("spool frame"); + }, + ); assert_eq!(std::fs::read(&frame).expect("read frame"), b"jpeg-bytes"); assert!(!meta.exists()); @@ -243,6 +291,7 @@ fn spool_mjpeg_frame_writes_enabled_sidecar_with_timing() { "LESAVKA_UVC_FRAME_META_LOG_PATH", Some(log.to_str().expect("utf8 path")), ), + ("LESAVKA_UVC_FRAME_AUDIT_DIR", None::<&str>), ], || { super::spool_mjpeg_frame_with_timing( @@ -268,3 +317,58 @@ fn spool_mjpeg_frame_writes_enabled_sidecar_with_timing() { assert_eq!(log_record.lines().count(), 1); assert!(log_record.contains("\"profile\":\"hevc-decoded-mjpeg\"")); } + +/// Verifies boundary audit saves exact UVC-bound MJPEG payloads and an index. +/// +/// Input: enabled audit dir with every-frame capture. Output: frame copy plus +/// JSONL timing/hash record. Why: this is the evidence needed to decide +/// whether corruption exists before or after the server-to-UVC boundary. 
+#[test] +fn spool_mjpeg_frame_writes_enabled_boundary_audit() { + let dir = tempfile::tempdir().expect("tempdir"); + let frame = dir.path().join("frame.mjpg"); + let audit = dir.path().join("audit"); + + temp_env::with_vars( + [ + ("LESAVKA_UVC_FRAME_META", None::<&str>), + ( + "LESAVKA_UVC_FRAME_AUDIT_DIR", + Some(audit.to_str().expect("utf8 path")), + ), + ("LESAVKA_UVC_FRAME_AUDIT_EVERY", Some("1")), + ("LESAVKA_UVC_FRAME_AUDIT_MAX_FRAMES", Some("1")), + ], + || { + super::spool_mjpeg_frame_with_timing( + &frame, + b"audit-jpeg", + Some(super::MjpegSpoolTiming::mjpeg_passthrough(222)), + ) + .expect("spool audited frame"); + super::spool_mjpeg_frame_with_timing( + &frame, + b"second-jpeg", + Some(super::MjpegSpoolTiming::mjpeg_passthrough(333)), + ) + .expect("spool second audited frame"); + }, + ); + + let audited_frames = std::fs::read_dir(&audit) + .expect("audit dir") + .filter_map(Result::ok) + .filter(|entry| entry.file_name().to_string_lossy().ends_with(".mjpg")) + .collect::<Vec<_>>(); + assert_eq!(audited_frames.len(), 1); + assert_eq!( + std::fs::read(audited_frames[0].path()).expect("audit frame"), + b"audit-jpeg" + ); + let index = std::fs::read_to_string(audit.join("spool-audit.jsonl")).expect("audit index"); + assert_eq!(index.lines().count(), 1); + assert!(index.contains("\"schema\":\"lesavka.uvc-mjpeg-spool-audit.v1\"")); + assert!(index.contains("\"profile\":\"mjpeg-passthrough\"")); + assert!(index.contains("\"source_pts_us\":222")); + assert!(index.contains("\"file\":\"frame-")); +}