lesavka/server/src/video_sinks/mjpeg_spool.rs

469 lines
17 KiB
Rust

use std::fs::{self, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
use gstreamer as gst;
use gstreamer_app as gst_app;
#[path = "mjpeg_spool/audit.rs"]
mod audit;
/// Process-wide counter that numbers metadata records and audit frames.
static SPOOL_SEQUENCE: AtomicU64 = AtomicU64::new(1);
/// Process-wide counter that keeps temporary frame file names unique per write.
static SPOOL_TEMP_SEQUENCE: AtomicU64 = AtomicU64::new(1);
/// Hard ceiling (8 MiB) on any accepted MJPEG frame, used even when the size guard is off.
const MAX_MJPEG_FRAME_BYTES: usize = 8 * 1024 * 1024;
/// Bytes-per-second MJPEG budget used when `LESAVKA_UVC_MJPEG_BUDGET_BYTES_PER_SEC` is unset or unparsable.
const DEFAULT_UVC_MJPEG_BUDGET_BYTES_PER_SEC: u32 = 4_500_000;
/// Percentage of the isochronous capacity used when `LESAVKA_UVC_ISOCHRONOUS_LIMIT_PCT` is unset or unparsable.
const DEFAULT_UVC_ISOCHRONOUS_LIMIT_PCT: u32 = 85;
/// Transfers per second multiplied by the max packet size to estimate isochronous capacity.
const HIGH_SPEED_ISOCHRONOUS_MICROFRAMES_PER_SEC: u32 = 8_000;
/// Configfs directory probed for the `streaming_bulk` and `streaming_maxpacket` entries.
const CONFIGFS_UVC_BASE: &str = "/sys/kernel/config/usb_gadget/lesavka/functions/uvc.usb0";
/// Timing labels attached to one spooled MJPEG frame.
///
/// The values are rendered into the metadata sidecar/log record; every
/// `None` field is emitted there as JSON `null`.
#[derive(Clone, Copy)]
pub(super) struct MjpegSpoolTiming {
/// Short label naming the path that produced the frame (e.g. `"mjpeg-passthrough"`).
pub profile: &'static str,
/// Upstream packet PTS in microseconds, when known.
pub source_pts_us: Option<u64>,
/// PTS of the decoded appsink buffer in microseconds, when the frame came from a decode step.
pub decoded_pts_us: Option<u64>,
/// UVC frame width; filled in by `with_uvc_mode`.
pub uvc_width: Option<u16>,
/// UVC frame height; filled in by `with_uvc_mode`.
pub uvc_height: Option<u16>,
/// UVC frame rate; filled in by `with_uvc_mode`.
pub uvc_fps: Option<u32>,
}
impl MjpegSpoolTiming {
    /// Shared constructor behind every profile-specific builder.
    ///
    /// Inputs: the profile label, the upstream PTS, and an optional decoded
    /// PTS. Output: timing metadata with no UVC mode attached yet. Why: all
    /// profiles differ only in their label and decoded timestamp.
    fn labeled(
        profile: &'static str,
        source_pts_us: u64,
        decoded_pts_us: Option<u64>,
    ) -> Self {
        Self {
            profile,
            source_pts_us: Some(source_pts_us),
            decoded_pts_us,
            uvc_width: None,
            uvc_height: None,
            uvc_fps: None,
        }
    }

    /// Describe an MJPEG frame that is forwarded without re-encoding.
    ///
    /// Inputs: the upstream packet PTS in microseconds. Output: metadata
    /// tagged `mjpeg-passthrough`. Why: passthrough MJPEG and decoded HEVC
    /// land in the same spool file, so diagnostics must tell them apart.
    pub(super) fn mjpeg_passthrough(source_pts_us: u64) -> Self {
        Self::labeled("mjpeg-passthrough", source_pts_us, None)
    }

    /// Describe an MJPEG frame that was decoded and re-encoded locally.
    ///
    /// Inputs: the upstream packet PTS in microseconds. Output: metadata
    /// tagged `mjpeg-normalized`. Why: a syntactically valid camera JPEG can
    /// still show up corrupted on the UVC side, so probes need to know when
    /// the server deliberately emitted a clean re-encoded frame.
    pub(super) fn mjpeg_normalized(source_pts_us: u64) -> Self {
        Self::labeled("mjpeg-normalized", source_pts_us, None)
    }

    /// Describe a decoded HEVC frame handed to the MJPEG UVC helper.
    ///
    /// Inputs: the upstream packet PTS and the decoded appsink buffer PTS.
    /// Output: metadata tagged `hevc-decoded-mjpeg`. Why: residual HEVC sync
    /// jitter shows up after transport, so a cheap marker is needed right at
    /// the decode-to-UVC handoff.
    pub(super) fn hevc_decoded_mjpeg(source_pts_us: u64, decoded_pts_us: Option<u64>) -> Self {
        Self::labeled("hevc-decoded-mjpeg", source_pts_us, decoded_pts_us)
    }

    /// Describe a decoded HEVC frame that the safety guard refused.
    ///
    /// Inputs: the upstream packet PTS and the decoded appsink buffer PTS.
    /// Output: metadata tagged `hevc-decoded-mjpeg-rejected`. Why: rejected
    /// frames must line up with accepted ones in audits without suggesting
    /// that the rejected payload ever reached the gadget.
    pub(super) fn rejected_hevc_decoded_mjpeg(
        source_pts_us: u64,
        decoded_pts_us: Option<u64>,
    ) -> Self {
        Self::labeled("hevc-decoded-mjpeg-rejected", source_pts_us, decoded_pts_us)
    }

    /// Attach the active UVC mode to existing timing metadata.
    ///
    /// Inputs: frame width, height, and frame rate. Output: the same
    /// metadata with all three UVC fields populated.
    pub(super) fn with_uvc_mode(self, width: u16, height: u16, fps: u32) -> Self {
        Self {
            uvc_width: Some(width),
            uvc_height: Some(height),
            uvc_fps: Some(fps),
            ..self
        }
    }
}
/// Report whether MJPEG emission goes through the UVC helper file spool.
///
/// Inputs: `LESAVKA_UVC_MJPEG_SPOOL`. Output: `true` unless the variable is
/// set to `0`, `false`, `no`, or `off` (case-insensitive, surrounding
/// whitespace ignored). Why: routing frames through the helper keeps two
/// processes from competing for the UVC gadget node, while the direct
/// `v4l2sink` route stays available for diagnostics.
pub(super) fn mjpeg_spool_enabled() -> bool {
    const DISABLE_WORDS: [&str; 4] = ["0", "false", "no", "off"];
    match std::env::var("LESAVKA_UVC_MJPEG_SPOOL") {
        Ok(raw) => {
            let flag = raw.trim();
            !DISABLE_WORDS
                .iter()
                .any(|word| flag.eq_ignore_ascii_case(word))
        }
        // Unset (or non-UTF-8) keeps the spool path active.
        Err(_) => true,
    }
}
/// Resolve the frame path consumed by the UVC helper.
///
/// Inputs: `LESAVKA_UVC_FRAME_PATH`. Output: filesystem path for the newest
/// MJPEG frame; falls back to `/run/lesavka-uvc-frame.mjpg` when the variable
/// is unset or blank. Why: the helper polls a single atomic frame file, so
/// both direct MJPEG and decoded HEVC output need to agree on the handoff
/// location. A blank value is treated as unset because an empty path would
/// make every frame write fail.
pub(super) fn mjpeg_spool_path() -> PathBuf {
    std::env::var("LESAVKA_UVC_FRAME_PATH")
        .ok()
        // An empty/whitespace-only override cannot name a file; use the default.
        .filter(|value| !value.trim().is_empty())
        .map(PathBuf::from)
        .unwrap_or_else(|| PathBuf::from("/run/lesavka-uvc-frame.mjpg"))
}
/// Read an environment variable as a `u32`.
///
/// Inputs: the variable name. Output: the parsed value, or `None` when the
/// variable is unset, not valid Unicode, or not a `u32` after trimming.
fn env_u32_opt(name: &str) -> Option<u32> {
    let raw = std::env::var(name).ok()?;
    raw.trim().parse().ok()
}
/// Interpret an environment variable as an on/off switch.
///
/// Inputs: the variable name and the value to use when it is unset or not
/// recognized. Output: `false` for `0`/`false`/`no`/`off`, `true` for
/// `1`/`true`/`yes`/`on` (case-insensitive, surrounding whitespace ignored),
/// otherwise `default`.
fn env_flag_enabled(name: &str, default: bool) -> bool {
    let raw = match std::env::var(name) {
        Ok(raw) => raw,
        Err(_) => return default,
    };
    match raw.trim().to_ascii_lowercase().as_str() {
        "0" | "false" | "no" | "off" => false,
        "1" | "true" | "yes" | "on" => true,
        // Anything else is treated as "no opinion".
        _ => default,
    }
}
/// Read a file whose whole content is one unsigned integer.
///
/// Inputs: the file path. Output: the parsed `u32`, or `None` when the file
/// cannot be read as text or does not hold a `u32` after trimming.
fn read_u32_file(path: impl AsRef<Path>) -> Option<u32> {
    let contents = fs::read_to_string(path).ok()?;
    contents.trim().parse().ok()
}
/// Decide whether the UVC function should be treated as using bulk transfers.
///
/// Inputs: `LESAVKA_UVC_BULK` and the configfs UVC function directory.
/// Output: `false` when the variable explicitly disables bulk; otherwise
/// `true` when the configfs directory is absent or contains a
/// `streaming_bulk` entry.
fn uvc_bulk_transfer_enabled() -> bool {
    if env_flag_enabled("LESAVKA_UVC_BULK", true) {
        let gadget = Path::new(CONFIGFS_UVC_BASE);
        // With no gadget directory to inspect, the requested mode is trusted.
        return !gadget.exists() || gadget.join("streaming_bulk").exists();
    }
    false
}
/// Resolve the streaming max packet size in bytes.
///
/// Inputs: whether bulk transfers are in use, `LESAVKA_UVC_MAXPACKET`
/// (default 1024), and the configfs `streaming_maxpacket` value when it is
/// readable. Output: the smaller of the requested and live values, capped at
/// 512 for bulk and 1024 otherwise.
fn uvc_streaming_maxpacket(bulk: bool) -> u32 {
    let requested = env_u32_opt("LESAVKA_UVC_MAXPACKET").unwrap_or(1024);
    let live = read_u32_file(Path::new(CONFIGFS_UVC_BASE).join("streaming_maxpacket"));
    // The live gadget value can only lower the request, never raise it.
    let negotiated = live.map_or(requested, |live| requested.min(live));
    let ceiling = if bulk { 512 } else { 1024 };
    negotiated.min(ceiling)
}
/// Estimate the usable isochronous byte rate for a given packet size.
///
/// Inputs: max packet size plus `LESAVKA_UVC_ISOCHRONOUS_LIMIT_PCT`
/// (clamped to 1..=100, default `DEFAULT_UVC_ISOCHRONOUS_LIMIT_PCT`).
/// Output: `maxpacket * HIGH_SPEED_ISOCHRONOUS_MICROFRAMES_PER_SEC * pct / 100`,
/// saturated to `u32::MAX`.
fn uvc_isochronous_budget_bytes_per_sec(maxpacket: u32) -> u32 {
    let limit_pct = env_u32_opt("LESAVKA_UVC_ISOCHRONOUS_LIMIT_PCT")
        .unwrap_or(DEFAULT_UVC_ISOCHRONOUS_LIMIT_PCT)
        .clamp(1, 100);
    // Widen to u64 so the intermediate product cannot wrap.
    let raw_capacity = u64::from(maxpacket)
        .saturating_mul(u64::from(HIGH_SPEED_ISOCHRONOUS_MICROFRAMES_PER_SEC));
    let budget = raw_capacity.saturating_mul(u64::from(limit_pct)) / 100;
    if budget > u64::from(u32::MAX) {
        u32::MAX
    } else {
        budget as u32
    }
}
/// Resolve the MJPEG bytes-per-second budget that applies right now.
///
/// Inputs: `LESAVKA_UVC_MJPEG_BUDGET_BYTES_PER_SEC` (default
/// `DEFAULT_UVC_MJPEG_BUDGET_BYTES_PER_SEC`) and the transfer mode. Output:
/// the configured budget for bulk transfers, otherwise the configured budget
/// capped by the isochronous estimate; never less than 1.
fn effective_mjpeg_budget_bytes_per_sec() -> u32 {
    let requested = env_u32_opt("LESAVKA_UVC_MJPEG_BUDGET_BYTES_PER_SEC")
        .unwrap_or(DEFAULT_UVC_MJPEG_BUDGET_BYTES_PER_SEC)
        .max(1);
    if uvc_bulk_transfer_enabled() {
        return requested;
    }
    let isochronous_cap = uvc_isochronous_budget_bytes_per_sec(uvc_streaming_maxpacket(false));
    requested.min(isochronous_cap).max(1)
}
/// Compute the largest MJPEG frame that may be published to the helper.
///
/// Inputs: the active FPS plus `LESAVKA_UVC_FRAME_SIZE_GUARD`,
/// `LESAVKA_UVC_FRAME_MAX_BYTES`, and
/// `LESAVKA_UVC_MJPEG_BUDGET_BYTES_PER_SEC`. Output: the per-frame byte
/// ceiling, never above `MAX_MJPEG_FRAME_BYTES`. Why: oversized MJPEG frames
/// commonly surface as host-visible UVC tearing, and briefly freezing the
/// picture is preferable to letting the gadget emit partial frames.
pub(super) fn mjpeg_spool_frame_max_bytes(fps: u32) -> usize {
    // Guard switched off: only the hard ceiling applies.
    if !env_flag_enabled("LESAVKA_UVC_FRAME_SIZE_GUARD", true) {
        return MAX_MJPEG_FRAME_BYTES;
    }
    match env_u32_opt("LESAVKA_UVC_FRAME_MAX_BYTES") {
        // A positive explicit limit wins over the derived budget.
        Some(explicit) if explicit > 0 => (explicit as usize).min(MAX_MJPEG_FRAME_BYTES),
        _ => {
            // Spread the per-second budget over the frame rate, with a 64 KiB floor.
            let per_frame = (effective_mjpeg_budget_bytes_per_sec() / fps.max(1)).max(64 * 1024);
            per_frame.min(MAX_MJPEG_FRAME_BYTES as u32) as usize
        }
    }
}
/// Report whether per-frame spool metadata should be written.
///
/// Inputs: `LESAVKA_UVC_FRAME_META`. Output: `true` only when the variable is
/// set to `1`, `true`, `yes`, or `on` (case-insensitive, surrounding
/// whitespace ignored). Why: the metadata helps HEVC boundary diagnostics but
/// costs one extra atomic sidecar write per frame, so it stays opt-in.
pub(super) fn mjpeg_spool_metadata_enabled() -> bool {
    const ENABLE_WORDS: [&str; 4] = ["1", "true", "yes", "on"];
    match std::env::var("LESAVKA_UVC_FRAME_META") {
        Ok(raw) => {
            let flag = raw.trim();
            ENABLE_WORDS
                .iter()
                .any(|word| flag.eq_ignore_ascii_case(word))
        }
        Err(_) => false,
    }
}
/// Resolve the metadata sidecar path for the UVC helper spool.
///
/// Inputs: frame path plus `LESAVKA_UVC_FRAME_META_PATH`. Output: the
/// override when it is set and non-blank, otherwise the frame path with its
/// extension replaced by `mjpg.meta.json`. Why: keeping this path explicit
/// lets capture scripts fetch timing evidence without guessing where the
/// virtual webcam helper found the frame. A blank override is treated as
/// unset because an empty path would make every sidecar write fail.
pub(super) fn mjpeg_spool_metadata_path(frame_path: &Path) -> PathBuf {
    std::env::var("LESAVKA_UVC_FRAME_META_PATH")
        .ok()
        // An empty/whitespace-only override cannot name a file; derive the default.
        .filter(|value| !value.trim().is_empty())
        .map(PathBuf::from)
        .unwrap_or_else(|| frame_path.with_extension("mjpg.meta.json"))
}
/// Look up the optional append-only JSONL metadata log.
///
/// Inputs: `LESAVKA_UVC_FRAME_META_LOG_PATH`. Output: the trimmed path when
/// the variable holds something non-blank, otherwise `None`. Why: the
/// latest-frame sidecar covers spot checks, but client-to-RCT HEVC probes
/// need the complete decode/spool timing sequence.
pub(super) fn mjpeg_spool_metadata_log_path() -> Option<PathBuf> {
    let raw = std::env::var("LESAVKA_UVC_FRAME_META_LOG_PATH").ok()?;
    let trimmed = raw.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(PathBuf::from(trimmed))
    }
}
/// Cap the wait for decoded MJPEG output during one HEVC handoff.
///
/// Inputs: `LESAVKA_UVC_HEVC_SPOOL_PULL_TIMEOUT_MS` (default 20, capped at
/// 50). Output: the timeout handed to appsink polling. Why: decoded frames
/// should go out when due, yet the video handoff worker must not pile up a
/// WAN-sized backlog while it waits on the decoder.
pub(super) fn decoded_mjpeg_pull_timeout() -> gst::ClockTime {
    const DEFAULT_MS: u64 = 20;
    const CEILING_MS: u64 = 50;
    let requested_ms = match std::env::var("LESAVKA_UVC_HEVC_SPOOL_PULL_TIMEOUT_MS") {
        Ok(raw) => raw.trim().parse::<u64>().unwrap_or(DEFAULT_MS),
        Err(_) => DEFAULT_MS,
    };
    gst::ClockTime::from_mseconds(requested_ms.min(CEILING_MS))
}
/// Empty the decoded-MJPEG appsink and keep only its most recent sample.
///
/// Inputs: the appsink owned by the HEVC-to-MJPEG branch. Output: the newest
/// sample available, if any. Why: the UVC helper should be fed the latest
/// decoded frame instead of letting stale output queue up during CPU spikes.
#[cfg(not(coverage))]
pub(super) fn freshest_mjpeg_sample(sink: &gst_app::AppSink) -> Option<gst::Sample> {
    // The first pull may wait up to the configured timeout.
    let first = sink.try_pull_sample(decoded_mjpeg_pull_timeout());
    // Every further pull is non-blocking; the last one that succeeds wins.
    std::iter::from_fn(|| sink.try_pull_sample(gst::ClockTime::ZERO))
        .last()
        .or(first)
}
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// Output: the elapsed nanoseconds, or 0 when the system clock reads earlier
/// than the epoch.
fn unix_now_ns() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos(),
        Err(_) => 0,
    }
}
/// Render an optional integer as a JSON value.
///
/// Inputs: an optional `u64`. Output: its decimal text, or the literal
/// `null` when absent.
fn json_number_or_null(value: Option<u64>) -> String {
    match value {
        Some(number) => number.to_string(),
        None => String::from("null"),
    }
}
/// Atomically write a text sidecar beside the current frame.
///
/// Inputs: a destination path and complete text payload. Output: success or
/// filesystem error. Why: the latest-frame metadata sidecar should never be
/// observed half-written while RCT probe scripts are collecting artifacts.
fn write_atomic_text(path: &Path, data: &str) -> anyhow::Result<()> {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)?;
}
let tmp = path.with_extension(format!("json.{}.tmp", std::process::id()));
fs::write(&tmp, data)?;
fs::rename(&tmp, path)?;
Ok(())
}
/// Add one timing record to the end of the optional metadata log.
///
/// Inputs: the JSONL log path and an already formatted record. Output:
/// success or filesystem error. Why: HEVC/RCT debugging wants every
/// decoded-MJPEG handoff timestamp, whereas the sidecar keeps only the
/// newest frame.
fn append_metadata_log(path: &Path, record: &str) -> anyhow::Result<()> {
    if let Some(dir) = path.parent() {
        fs::create_dir_all(dir)?;
    }
    // Create the log on first use; always write at the end.
    let mut log = OpenOptions::new().create(true).append(true).open(path)?;
    log.write_all(record.as_bytes())?;
    Ok(())
}
/// Render one metadata record for a spooled MJPEG frame.
///
/// Inputs: a sequence number, frame size, and timing labels. Output: compact
/// JSON suitable for sidecar artifacts. Why: keeping the format deterministic
/// makes later client-to-RCT scripts able to compare server decode/spool timing
/// against final RCT observations without parsing log prose.
pub(super) fn format_mjpeg_spool_metadata(
sequence: u64,
bytes: usize,
timing: MjpegSpoolTiming,
) -> String {
format!(
"{{\"schema\":\"lesavka.uvc-mjpeg-spool-meta.v1\",\"sequence\":{},\"profile\":\"{}\",\"bytes\":{},\"source_pts_us\":{},\"decoded_pts_us\":{},\"uvc_width\":{},\"uvc_height\":{},\"uvc_fps\":{},\"spool_unix_ns\":{}}}\n",
sequence,
timing.profile,
bytes,
json_number_or_null(timing.source_pts_us),
json_number_or_null(timing.decoded_pts_us),
json_number_or_null(timing.uvc_width.map(u64::from)),
json_number_or_null(timing.uvc_height.map(u64::from)),
json_number_or_null(timing.uvc_fps.map(u64::from)),
unix_now_ns()
)
}
/// Atomically publish one MJPEG frame plus optional timing metadata.
///
/// Inputs: destination path, JPEG bytes, and optional timing metadata. Output:
/// success or filesystem error. Why: HEVC transport debugging needs to know
/// whether residual jitter happens before or after the decoded-MJPEG handoff,
/// while the default runtime path should remain identical when metadata is off.
///
/// The frame is written to a uniquely named temporary sibling and renamed
/// over `path`. If the write or rename fails, the temporary file is removed
/// before the error is returned, so repeated failures cannot accumulate
/// per-frame temp files next to the spool. Metadata and audit output happen
/// only after the frame has been published; audit write failures are logged
/// rather than returned.
pub(super) fn spool_mjpeg_frame_with_timing(
    path: &Path,
    data: &[u8],
    timing: Option<MjpegSpoolTiming>,
) -> anyhow::Result<()> {
    if let Some(parent) = path.parent() {
        fs::create_dir_all(parent)?;
    }
    let tmp = path.with_extension(format!(
        "mjpg.{}.{}.tmp",
        std::process::id(),
        SPOOL_TEMP_SEQUENCE.fetch_add(1, Ordering::Relaxed)
    ));
    if let Err(err) = fs::write(&tmp, data).and_then(|()| fs::rename(&tmp, path)) {
        // Best effort: every attempt uses a fresh temp name, so a failed one
        // would otherwise stay behind forever.
        let _ = fs::remove_file(&tmp);
        return Err(err.into());
    }

    let audit_dir = audit::mjpeg_spool_audit_dir();
    // Read the flag once so the sequence decision and the write agree.
    let metadata_enabled = mjpeg_spool_metadata_enabled();
    let needs_sequence = (metadata_enabled && timing.is_some()) || audit_dir.is_some();
    let sequence = needs_sequence.then(|| SPOOL_SEQUENCE.fetch_add(1, Ordering::Relaxed));

    if metadata_enabled {
        if let (Some(timing), Some(sequence)) = (timing, sequence) {
            let record = format_mjpeg_spool_metadata(sequence, data.len(), timing);
            write_atomic_text(&mjpeg_spool_metadata_path(path), &record)?;
            if let Some(log_path) = mjpeg_spool_metadata_log_path() {
                append_metadata_log(&log_path, &record)?;
            }
        }
    }

    if let (Some(dir), Some(sequence)) = (audit_dir, sequence) {
        if let Some(slot) = audit::claim_spool_audit_frame(sequence) {
            if let Err(err) =
                audit::write_mjpeg_spool_audit_frame(&dir, sequence, slot, data, timing)
            {
                tracing::warn!(
                    target: "lesavka_server::video",
                    %err,
                    audit_dir = %dir.display(),
                    "failed to write UVC MJPEG boundary audit frame"
                );
            }
        }
    }
    Ok(())
}
/// Record a guard-rejected MJPEG frame in the optional boundary audit.
///
/// Inputs: the rejected JPEG bytes, its timing metadata, and a
/// human-readable reason. Output: nothing; the write is best effort and a
/// failure is only logged. Why: when the live guard freezes a frame, the
/// exact payload that never reached UVC is needed so detector tuning can
/// tell false positives from real distortion.
pub(super) fn audit_rejected_mjpeg_frame(
    data: &[u8],
    timing: MjpegSpoolTiming,
    reason: &str,
) {
    if let Some(dir) = audit::mjpeg_spool_audit_dir() {
        let sequence = SPOOL_SEQUENCE.fetch_add(1, Ordering::Relaxed);
        if let Some(slot) = audit::claim_spool_audit_frame(sequence) {
            let outcome = audit::write_mjpeg_spool_rejected_audit_frame(
                &dir, sequence, slot, data, timing, reason,
            );
            if let Err(err) = outcome {
                tracing::warn!(
                    target: "lesavka_server::video",
                    %err,
                    audit_dir = %dir.display(),
                    "failed to write rejected UVC MJPEG boundary audit frame"
                );
            }
        }
    }
}
#[cfg(test)]
#[path = "mjpeg_spool/tests.rs"]
mod tests;