lesavka/scripts/manual/analyze_uvc_mjpeg_frame.py

130 lines
4.3 KiB
Python
Executable File

#!/usr/bin/env python3
"""Inspect a Lesavka UVC MJPEG spool frame for size/profile corruption clues."""
from __future__ import annotations
import argparse
import json
import os
from pathlib import Path
from typing import Optional
MAX_MJPEG_FRAME_BYTES = 8 * 1024 * 1024
DEFAULT_BUDGET_BYTES_PER_SEC = 10_000_000
def read_be16(data: bytes, offset: int) -> Optional[int]:
if offset + 2 > len(data):
return None
return int.from_bytes(data[offset : offset + 2], "big")
def marker_has_length(marker: int) -> bool:
return marker != 0x01 and not 0xD0 <= marker <= 0xD9
def is_sof(marker: int) -> bool:
return marker in {0xC0, 0xC1, 0xC2, 0xC3, 0xC5, 0xC6, 0xC7, 0xC9, 0xCA, 0xCB, 0xCD, 0xCE, 0xCF}
def inspect_jpeg(data: bytes) -> dict[str, object]:
complete = len(data) >= 4 and data.startswith(b"\xff\xd8") and data.endswith(b"\xff\xd9") and b"\xff\xda" in data
width = None
height = None
entropy_start = None
entropy_end = len(data) - 2 if len(data) >= 2 else len(data)
idx = 2
while idx + 4 <= len(data):
if data[idx] != 0xFF:
idx += 1
continue
while idx < len(data) and data[idx] == 0xFF:
idx += 1
if idx >= len(data):
break
marker = data[idx]
idx += 1
if marker == 0xDA:
seg_len = read_be16(data, idx)
if seg_len is not None and seg_len >= 2 and idx + seg_len <= len(data):
entropy_start = idx + seg_len
break
if marker == 0xD9:
break
if not marker_has_length(marker):
continue
seg_len = read_be16(data, idx)
if seg_len is None or seg_len < 2 or idx + seg_len > len(data):
break
if is_sof(marker) and seg_len >= 7:
height = read_be16(data, idx + 3)
width = read_be16(data, idx + 5)
idx += seg_len
payload = data[entropy_start:entropy_end] if entropy_start is not None and entropy_end > entropy_start else b""
counts = [0] * 256
max_run = 0
run = 0
prev = None
for byte in payload:
counts[byte] += 1
if byte == prev:
run += 1
else:
prev = byte
run = 1
max_run = max(max_run, run)
dominant = max(counts) if payload else 0
distinct = sum(1 for count in counts if count)
dominant_pct = round(dominant * 100 / len(payload), 2) if payload else 0.0
return {
"bytes": len(data),
"complete": complete,
"width": width,
"height": height,
"entropy_bytes": len(payload),
"entropy_distinct_bytes": distinct,
"entropy_dominant_pct": dominant_pct,
"entropy_max_run": max_run,
}
def derived_cap(fps: int, budget: int, explicit: Optional[int], guard: bool) -> int:
if not guard:
return MAX_MJPEG_FRAME_BYTES
if explicit and explicit > 0:
return min(explicit, MAX_MJPEG_FRAME_BYTES)
per_frame = max(budget // max(fps, 1), 64 * 1024)
return min(per_frame, MAX_MJPEG_FRAME_BYTES)
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("frame", nargs="?", default=os.environ.get("LESAVKA_UVC_FRAME_PATH", "/run/lesavka-uvc-frame.mjpg"))
parser.add_argument("--fps", type=int, default=int(os.environ.get("LESAVKA_UVC_FPS", "30") or "30"))
parser.add_argument("--budget-bytes-per-sec", type=int, default=int(os.environ.get("LESAVKA_UVC_MJPEG_BUDGET_BYTES_PER_SEC", str(DEFAULT_BUDGET_BYTES_PER_SEC)) or str(DEFAULT_BUDGET_BYTES_PER_SEC)))
parser.add_argument("--max-bytes", type=int, default=int(os.environ.get("LESAVKA_UVC_FRAME_MAX_BYTES", "0") or "0"))
parser.add_argument("--disable-size-guard", action="store_true")
args = parser.parse_args()
path = Path(args.frame)
data = path.read_bytes()
cap = derived_cap(args.fps, args.budget_bytes_per_sec, args.max_bytes, not args.disable_size_guard)
report = inspect_jpeg(data)
report.update(
{
"path": str(path),
"fps": args.fps,
"budget_bytes_per_sec": args.budget_bytes_per_sec,
"max_bytes": cap,
"over_budget": len(data) > cap,
}
)
print(json.dumps(report, indent=2, sort_keys=True))
return 1 if report["over_budget"] or not report["complete"] else 0
if __name__ == "__main__":
raise SystemExit(main())