diagnostics: add UVC spool boundary audit

This commit is contained in:
Brad Stein 2026-05-18 23:31:51 -03:00
parent 518f74b91a
commit df9c17c113
9 changed files with 342 additions and 17 deletions

6
Cargo.lock generated
View File

@ -1658,7 +1658,7 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]]
name = "lesavka_client"
version = "0.23.2"
version = "0.24.0"
dependencies = [
"anyhow",
"async-stream",
@ -1692,7 +1692,7 @@ dependencies = [
[[package]]
name = "lesavka_common"
version = "0.23.2"
version = "0.24.0"
dependencies = [
"anyhow",
"base64",
@ -1704,7 +1704,7 @@ dependencies = [
[[package]]
name = "lesavka_server"
version = "0.23.2"
version = "0.24.0"
dependencies = [
"anyhow",
"base64",

View File

@ -4,7 +4,7 @@ path = "src/main.rs"
[package]
name = "lesavka_client"
version = "0.23.2"
version = "0.24.0"
edition = "2024"
[dependencies]

View File

@ -1,6 +1,6 @@
[package]
name = "lesavka_common"
version = "0.23.2"
version = "0.24.0"
edition = "2024"
build = "build.rs"

View File

@ -336,6 +336,10 @@ from `LESAVKA_CLIENT_PKI_SSH_SOURCE` over SSH. Runtime clients require the insta
| `LESAVKA_UVC_EXTERNAL` | server hardware/device override |
| `LESAVKA_UVC_FALLBACK` | server hardware/device override |
| `LESAVKA_UVC_FPS` | server hardware/device override |
| `LESAVKA_UVC_FRAME_AUDIT_DIR` | UVC helper boundary-audit directory; when set, the server saves exact MJPEG frames published to the UVC helper plus a JSONL index so recordings can be compared against pre-UVC payloads |
| `LESAVKA_UVC_FRAME_AUDIT_EVERY` | UVC helper boundary-audit sampling interval; saves every Nth spooled frame, defaults to `1` for short repros |
| `LESAVKA_UVC_FRAME_AUDIT_LOG_PATH` | UVC helper boundary-audit JSONL path override; defaults to `spool-audit.jsonl` inside `LESAVKA_UVC_FRAME_AUDIT_DIR` |
| `LESAVKA_UVC_FRAME_AUDIT_MAX_FRAMES` | UVC helper boundary-audit frame cap; defaults to `1800`, set `0` for unlimited audit capture during isolated short runs |
| `LESAVKA_UVC_FRAME_META` | UVC helper diagnostic override; when true, the server writes an atomic JSON sidecar for each spooled MJPEG frame so HEVC decode/spool timing can be compared with final RCT capture |
| `LESAVKA_UVC_FRAME_META_LOG_PATH` | UVC helper diagnostic override; when set with `LESAVKA_UVC_FRAME_META=1`, append every MJPEG spool timing record as JSONL for full-probe HEVC/RCT correlation; summarize with `scripts/manual/summarize_uvc_frame_meta_log.py` |
| `LESAVKA_UVC_FRAME_META_PATH` | UVC helper diagnostic override; explicit path for the optional MJPEG spool metadata sidecar |

View File

@ -1328,12 +1328,17 @@
"server/src/video_sinks/mjpeg_spool.rs": {
"clippy_warnings": 0,
"doc_debt": 4,
"loc": 376
"loc": 396
},
"server/src/video_sinks/mjpeg_spool/audit.rs": {
"clippy_warnings": 0,
"doc_debt": 1,
"loc": 192
},
"server/src/video_sinks/mjpeg_spool/tests.rs": {
"clippy_warnings": 0,
"doc_debt": 2,
"loc": 270
"loc": 374
},
"server/src/video_sinks/webcam_sink.rs": {
"clippy_warnings": 0,

View File

@ -16,7 +16,7 @@ bench = false
[package]
name = "lesavka_server"
version = "0.23.2"
version = "0.24.0"
edition = "2024"
autobins = false

View File

@ -7,6 +7,9 @@ use std::time::{SystemTime, UNIX_EPOCH};
use gstreamer as gst;
use gstreamer_app as gst_app;
#[path = "mjpeg_spool/audit.rs"]
mod audit;
static SPOOL_SEQUENCE: AtomicU64 = AtomicU64::new(1);
static SPOOL_TEMP_SEQUENCE: AtomicU64 = AtomicU64::new(1);
const MAX_MJPEG_FRAME_BYTES: usize = 8 * 1024 * 1024;
@ -357,10 +360,15 @@ pub(super) fn spool_mjpeg_frame_with_timing(
fs::write(&tmp, data)?;
fs::rename(&tmp, path)?;
let audit_dir = audit::mjpeg_spool_audit_dir();
let needs_sequence = (mjpeg_spool_metadata_enabled() && timing.is_some()) || audit_dir.is_some();
let sequence =
needs_sequence.then(|| SPOOL_SEQUENCE.fetch_add(1, Ordering::Relaxed));
if mjpeg_spool_metadata_enabled()
&& let Some(timing) = timing
&& let Some(sequence) = sequence
{
let sequence = SPOOL_SEQUENCE.fetch_add(1, Ordering::Relaxed);
let record = format_mjpeg_spool_metadata(sequence, data.len(), timing);
write_atomic_text(&mjpeg_spool_metadata_path(path), &record)?;
if let Some(log_path) = mjpeg_spool_metadata_log_path() {
@ -368,6 +376,18 @@ pub(super) fn spool_mjpeg_frame_with_timing(
}
}
if let (Some(dir), Some(sequence)) = (audit_dir, sequence)
&& audit::claim_spool_audit_frame(sequence)
&& let Err(err) = audit::write_mjpeg_spool_audit_frame(&dir, sequence, data, timing)
{
tracing::warn!(
target: "lesavka_server::video",
%err,
audit_dir = %dir.display(),
"failed to write UVC MJPEG boundary audit frame"
);
}
Ok(())
}

View File

@ -0,0 +1,192 @@
use std::fs::{self, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
use super::MjpegSpoolTiming;
static AUDIT_TEMP_SEQUENCE: AtomicU64 = AtomicU64::new(1);
static AUDIT_SAVED_FRAMES: AtomicU64 = AtomicU64::new(0);
const DEFAULT_UVC_FRAME_AUDIT_EVERY: u32 = 1;
const DEFAULT_UVC_FRAME_AUDIT_MAX_FRAMES: u32 = 1800;
const FNV1A64_OFFSET: u64 = 0xcbf29ce484222325;
const FNV1A64_PRIME: u64 = 0x100000001b3;
/// Parse an optional unsigned tuning value.
///
/// Inputs: environment variable name. Output: parsed `u32` when present.
/// Why: audit controls should ignore malformed operator input instead of
/// preventing frame spooling.
fn env_u32_opt(name: &str) -> Option<u32> {
std::env::var(name)
.ok()
.and_then(|value| value.trim().parse::<u32>().ok())
}
fn unix_now_ns() -> u128 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|duration| duration.as_nanos())
.unwrap_or(0)
}
fn json_number_or_null(value: Option<u64>) -> String {
value
.map(|value| value.to_string())
.unwrap_or_else(|| "null".to_string())
}
/// Append one audit JSONL row, creating parent directories as needed.
///
/// Inputs: destination path and already formatted JSONL row. Output: success
/// or filesystem error. Why: the audit frame and index should be collected
/// together without changing the normal latest-frame spool file.
fn append_record(path: &Path, record: &str) -> anyhow::Result<()> {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)?;
}
OpenOptions::new()
.create(true)
.append(true)
.open(path)?
.write_all(record.as_bytes())?;
Ok(())
}
/// Resolve the optional exact UVC-bound MJPEG audit directory.
///
/// Inputs: `LESAVKA_UVC_FRAME_AUDIT_DIR`. Output: a directory when configured.
/// Why: the final RCT recording can be corrupt even when this boundary frame is
/// clean, so debugging needs the exact payloads published to UVC.
pub(super) fn mjpeg_spool_audit_dir() -> Option<PathBuf> {
std::env::var("LESAVKA_UVC_FRAME_AUDIT_DIR")
.ok()
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty())
.map(PathBuf::from)
}
/// Resolve the UVC-bound audit sampling interval.
///
/// Inputs: `LESAVKA_UVC_FRAME_AUDIT_EVERY`. Output: save every Nth frame. Why:
/// short repros need every frame, while long calls need a disk-safe throttle.
pub(super) fn mjpeg_spool_audit_every() -> u64 {
u64::from(
env_u32_opt("LESAVKA_UVC_FRAME_AUDIT_EVERY")
.unwrap_or(DEFAULT_UVC_FRAME_AUDIT_EVERY)
.max(1),
)
}
/// Resolve how many audited frames may be saved.
///
/// Inputs: `LESAVKA_UVC_FRAME_AUDIT_MAX_FRAMES`. Output: max saved frames; zero
/// means unlimited. Why: full MJPEG dumps are powerful but intentionally heavy.
pub(super) fn mjpeg_spool_audit_max_frames() -> u64 {
u64::from(
env_u32_opt("LESAVKA_UVC_FRAME_AUDIT_MAX_FRAMES")
.unwrap_or(DEFAULT_UVC_FRAME_AUDIT_MAX_FRAMES),
)
}
/// Resolve the audit JSONL path.
///
/// Inputs: audit directory plus `LESAVKA_UVC_FRAME_AUDIT_LOG_PATH`. Output:
/// JSONL index path. Why: the frame dump needs compact timing/hash metadata
/// for alignment with eye-recording frame numbers.
pub(super) fn mjpeg_spool_audit_log_path(dir: &Path) -> PathBuf {
std::env::var("LESAVKA_UVC_FRAME_AUDIT_LOG_PATH")
.ok()
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty())
.map(PathBuf::from)
.unwrap_or_else(|| dir.join("spool-audit.jsonl"))
}
fn fnv1a64_hex(data: &[u8]) -> String {
let mut hash = FNV1A64_OFFSET;
for byte in data {
hash ^= u64::from(*byte);
hash = hash.wrapping_mul(FNV1A64_PRIME);
}
format!("{hash:016x}")
}
fn json_escape(value: &str) -> String {
value
.replace('\\', "\\\\")
.replace('"', "\\\"")
.replace('\n', "\\n")
.replace('\r', "\\r")
}
pub(super) fn should_sample_spool_audit_frame(sequence: u64, every: u64) -> bool {
let offset = sequence.saturating_sub(1);
offset.is_multiple_of(every.max(1))
}
/// Decide whether the current spooled frame should be preserved.
///
/// Inputs: global spool sequence. Output: true if sampling and max-frame budget
/// allow a save. Why: the max cap must count saved audit frames, not every
/// frame the long-running server emitted before the audit started.
pub(super) fn claim_spool_audit_frame(sequence: u64) -> bool {
if !should_sample_spool_audit_frame(sequence, mjpeg_spool_audit_every()) {
return false;
}
let max_frames = mjpeg_spool_audit_max_frames();
max_frames == 0 || AUDIT_SAVED_FRAMES.fetch_add(1, Ordering::Relaxed) < max_frames
}
/// Format one exact-frame audit index record.
///
/// Inputs: sequence, JPEG bytes, optional timing, and saved filename. Output:
/// compact JSONL row. Why: hashes and timestamps let later tooling align a bad
/// RCT frame with the server-boundary payload that preceded it.
fn format_audit_record(
sequence: u64,
data: &[u8],
timing: Option<MjpegSpoolTiming>,
frame_file: &str,
) -> String {
let profile = timing.map(|value| value.profile).unwrap_or("unknown");
let source_pts_us = timing.and_then(|value| value.source_pts_us);
let decoded_pts_us = timing.and_then(|value| value.decoded_pts_us);
format!(
"{{\"schema\":\"lesavka.uvc-mjpeg-spool-audit.v1\",\"sequence\":{},\"profile\":\"{}\",\"bytes\":{},\"source_pts_us\":{},\"decoded_pts_us\":{},\"spool_unix_ns\":{},\"fnv1a64\":\"{}\",\"file\":\"{}\"}}\n",
sequence,
json_escape(profile),
data.len(),
json_number_or_null(source_pts_us),
json_number_or_null(decoded_pts_us),
unix_now_ns(),
fnv1a64_hex(data),
json_escape(frame_file)
)
}
/// Save one exact UVC-bound MJPEG frame and append its JSONL index row.
///
/// Inputs: audit directory, sequence, JPEG bytes, and timing. Output: success
/// or filesystem error. Why: this captures the server-side boundary evidence
/// needed to tell pre-UVC corruption from downstream UVC/browser corruption.
pub(super) fn write_mjpeg_spool_audit_frame(
dir: &Path,
sequence: u64,
data: &[u8],
timing: Option<MjpegSpoolTiming>,
) -> anyhow::Result<()> {
fs::create_dir_all(dir)?;
let frame_file = format!("frame-{sequence:012}.mjpg");
let tmp = dir.join(format!(
".{frame_file}.{}.{}.tmp",
std::process::id(),
AUDIT_TEMP_SEQUENCE.fetch_add(1, Ordering::Relaxed)
));
fs::write(&tmp, data)?;
fs::rename(&tmp, dir.join(&frame_file))?;
let record = format_audit_record(sequence, data, timing, &frame_file);
append_record(&mjpeg_spool_audit_log_path(dir), &record)
}

View File

@ -138,6 +138,48 @@ fn mjpeg_spool_metadata_is_opt_in_and_path_configurable() {
});
}
/// Verifies audit env controls default safely and clamp invalid sampling.
///
/// Input: unset and explicit audit env vars plus sampler examples. Output:
/// disabled-by-default directory, every-frame default, and bounded sampling.
/// Why: boundary audits should be easy for short repros but impossible to
/// enable accidentally during normal calls.
#[test]
fn mjpeg_spool_audit_knobs_are_opt_in_and_bounded() {
temp_env::with_vars(
[
("LESAVKA_UVC_FRAME_AUDIT_DIR", None::<&str>),
("LESAVKA_UVC_FRAME_AUDIT_EVERY", None::<&str>),
("LESAVKA_UVC_FRAME_AUDIT_MAX_FRAMES", None::<&str>),
],
|| {
assert_eq!(super::audit::mjpeg_spool_audit_dir(), None);
assert_eq!(super::audit::mjpeg_spool_audit_every(), 1);
assert_eq!(super::audit::mjpeg_spool_audit_max_frames(), 1800);
},
);
temp_env::with_vars(
[
("LESAVKA_UVC_FRAME_AUDIT_DIR", Some("/tmp/uvc-audit")),
("LESAVKA_UVC_FRAME_AUDIT_EVERY", Some("0")),
("LESAVKA_UVC_FRAME_AUDIT_MAX_FRAMES", Some("0")),
],
|| {
assert_eq!(
super::audit::mjpeg_spool_audit_dir(),
Some(std::path::PathBuf::from("/tmp/uvc-audit"))
);
assert_eq!(super::audit::mjpeg_spool_audit_every(), 1);
assert_eq!(super::audit::mjpeg_spool_audit_max_frames(), 0);
},
);
assert!(super::audit::should_sample_spool_audit_frame(1, 3));
assert!(!super::audit::should_sample_spool_audit_frame(2, 3));
assert!(super::audit::should_sample_spool_audit_frame(4, 3));
}
/// Verifies metadata records carry enough timing evidence for RCT analysis.
///
/// Input: HEVC-decoded spool timing. Output: JSON fields for source and
@ -206,14 +248,20 @@ fn spool_mjpeg_frame_writes_frame_without_default_sidecar() {
let frame = dir.path().join("nested").join("frame.mjpg");
let meta = frame.with_extension("mjpg.meta.json");
temp_env::with_var_unset("LESAVKA_UVC_FRAME_META", || {
super::spool_mjpeg_frame_with_timing(
&frame,
b"jpeg-bytes",
Some(super::MjpegSpoolTiming::mjpeg_passthrough(10)),
)
.expect("spool frame");
});
temp_env::with_vars(
[
("LESAVKA_UVC_FRAME_META", None::<&str>),
("LESAVKA_UVC_FRAME_AUDIT_DIR", None::<&str>),
],
|| {
super::spool_mjpeg_frame_with_timing(
&frame,
b"jpeg-bytes",
Some(super::MjpegSpoolTiming::mjpeg_passthrough(10)),
)
.expect("spool frame");
},
);
assert_eq!(std::fs::read(&frame).expect("read frame"), b"jpeg-bytes");
assert!(!meta.exists());
@ -243,6 +291,7 @@ fn spool_mjpeg_frame_writes_enabled_sidecar_with_timing() {
"LESAVKA_UVC_FRAME_META_LOG_PATH",
Some(log.to_str().expect("utf8 path")),
),
("LESAVKA_UVC_FRAME_AUDIT_DIR", None::<&str>),
],
|| {
super::spool_mjpeg_frame_with_timing(
@ -268,3 +317,58 @@ fn spool_mjpeg_frame_writes_enabled_sidecar_with_timing() {
assert_eq!(log_record.lines().count(), 1);
assert!(log_record.contains("\"profile\":\"hevc-decoded-mjpeg\""));
}
/// Verifies boundary audit saves exact UVC-bound MJPEG payloads and an index.
///
/// Input: enabled audit dir with every-frame capture. Output: frame copy plus
/// JSONL timing/hash record. Why: this is the evidence needed to decide
/// whether corruption exists before or after the server-to-UVC boundary.
#[test]
fn spool_mjpeg_frame_writes_enabled_boundary_audit() {
let dir = tempfile::tempdir().expect("tempdir");
let frame = dir.path().join("frame.mjpg");
let audit = dir.path().join("audit");
temp_env::with_vars(
[
("LESAVKA_UVC_FRAME_META", None::<&str>),
(
"LESAVKA_UVC_FRAME_AUDIT_DIR",
Some(audit.to_str().expect("utf8 path")),
),
("LESAVKA_UVC_FRAME_AUDIT_EVERY", Some("1")),
("LESAVKA_UVC_FRAME_AUDIT_MAX_FRAMES", Some("1")),
],
|| {
super::spool_mjpeg_frame_with_timing(
&frame,
b"audit-jpeg",
Some(super::MjpegSpoolTiming::mjpeg_passthrough(222)),
)
.expect("spool audited frame");
super::spool_mjpeg_frame_with_timing(
&frame,
b"second-jpeg",
Some(super::MjpegSpoolTiming::mjpeg_passthrough(333)),
)
.expect("spool second audited frame");
},
);
let audited_frames = std::fs::read_dir(&audit)
.expect("audit dir")
.filter_map(Result::ok)
.filter(|entry| entry.file_name().to_string_lossy().ends_with(".mjpg"))
.collect::<Vec<_>>();
assert_eq!(audited_frames.len(), 1);
assert_eq!(
std::fs::read(audited_frames[0].path()).expect("audit frame"),
b"audit-jpeg"
);
let index = std::fs::read_to_string(audit.join("spool-audit.jsonl")).expect("audit index");
assert_eq!(index.lines().count(), 1);
assert!(index.contains("\"schema\":\"lesavka.uvc-mjpeg-spool-audit.v1\""));
assert!(index.contains("\"profile\":\"mjpeg-passthrough\""));
assert!(index.contains("\"source_pts_us\":222"));
assert!(index.contains("\"file\":\"frame-"));
}