#!/usr/bin/env python3 """Inspect a Lesavka UVC MJPEG spool frame for size/profile corruption clues.""" from __future__ import annotations import argparse import json import os from pathlib import Path from typing import Optional MAX_MJPEG_FRAME_BYTES = 8 * 1024 * 1024 DEFAULT_BUDGET_BYTES_PER_SEC = 10_000_000 def read_be16(data: bytes, offset: int) -> Optional[int]: if offset + 2 > len(data): return None return int.from_bytes(data[offset : offset + 2], "big") def marker_has_length(marker: int) -> bool: return marker != 0x01 and not 0xD0 <= marker <= 0xD9 def is_sof(marker: int) -> bool: return marker in {0xC0, 0xC1, 0xC2, 0xC3, 0xC5, 0xC6, 0xC7, 0xC9, 0xCA, 0xCB, 0xCD, 0xCE, 0xCF} def inspect_jpeg(data: bytes) -> dict[str, object]: complete = len(data) >= 4 and data.startswith(b"\xff\xd8") and data.endswith(b"\xff\xd9") and b"\xff\xda" in data width = None height = None entropy_start = None entropy_end = len(data) - 2 if len(data) >= 2 else len(data) idx = 2 while idx + 4 <= len(data): if data[idx] != 0xFF: idx += 1 continue while idx < len(data) and data[idx] == 0xFF: idx += 1 if idx >= len(data): break marker = data[idx] idx += 1 if marker == 0xDA: seg_len = read_be16(data, idx) if seg_len is not None and seg_len >= 2 and idx + seg_len <= len(data): entropy_start = idx + seg_len break if marker == 0xD9: break if not marker_has_length(marker): continue seg_len = read_be16(data, idx) if seg_len is None or seg_len < 2 or idx + seg_len > len(data): break if is_sof(marker) and seg_len >= 7: height = read_be16(data, idx + 3) width = read_be16(data, idx + 5) idx += seg_len payload = data[entropy_start:entropy_end] if entropy_start is not None and entropy_end > entropy_start else b"" counts = [0] * 256 max_run = 0 run = 0 prev = None for byte in payload: counts[byte] += 1 if byte == prev: run += 1 else: prev = byte run = 1 max_run = max(max_run, run) dominant = max(counts) if payload else 0 distinct = sum(1 for count in counts if count) dominant_pct = round(dominant * 100 / len(payload), 2) if payload else 0.0 return { "bytes": len(data), "complete": complete, "width": width, "height": height, "entropy_bytes": len(payload), "entropy_distinct_bytes": distinct, "entropy_dominant_pct": dominant_pct, "entropy_max_run": max_run, } def derived_cap(fps: int, budget: int, explicit: Optional[int], guard: bool) -> int: if not guard: return MAX_MJPEG_FRAME_BYTES if explicit and explicit > 0: return min(explicit, MAX_MJPEG_FRAME_BYTES) per_frame = max(budget // max(fps, 1), 64 * 1024) return min(per_frame, MAX_MJPEG_FRAME_BYTES) def main() -> int: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("frame", nargs="?", default=os.environ.get("LESAVKA_UVC_FRAME_PATH", "/run/lesavka-uvc-frame.mjpg")) parser.add_argument("--fps", type=int, default=int(os.environ.get("LESAVKA_UVC_FPS", "30") or "30")) parser.add_argument("--budget-bytes-per-sec", type=int, default=int(os.environ.get("LESAVKA_UVC_MJPEG_BUDGET_BYTES_PER_SEC", str(DEFAULT_BUDGET_BYTES_PER_SEC)) or str(DEFAULT_BUDGET_BYTES_PER_SEC))) parser.add_argument("--max-bytes", type=int, default=int(os.environ.get("LESAVKA_UVC_FRAME_MAX_BYTES", "0") or "0")) parser.add_argument("--disable-size-guard", action="store_true") args = parser.parse_args() path = Path(args.frame) data = path.read_bytes() cap = derived_cap(args.fps, args.budget_bytes_per_sec, args.max_bytes, not args.disable_size_guard) report = inspect_jpeg(data) report.update( { "path": str(path), "fps": args.fps, "budget_bytes_per_sec": args.budget_bytes_per_sec, "max_bytes": cap, "over_budget": len(data) > cap, } ) print(json.dumps(report, indent=2, sort_keys=True)) return 1 if report["over_budget"] or not report["complete"] else 0 if __name__ == "__main__": raise SystemExit(main())