130 lines
4.3 KiB
Python
Executable File
130 lines
4.3 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Inspect a Lesavka UVC MJPEG spool frame for size/profile corruption clues."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
MAX_MJPEG_FRAME_BYTES = 8 * 1024 * 1024
|
|
DEFAULT_BUDGET_BYTES_PER_SEC = 10_000_000
|
|
|
|
|
|
def read_be16(data: bytes, offset: int) -> Optional[int]:
|
|
if offset + 2 > len(data):
|
|
return None
|
|
return int.from_bytes(data[offset : offset + 2], "big")
|
|
|
|
|
|
def marker_has_length(marker: int) -> bool:
|
|
return marker != 0x01 and not 0xD0 <= marker <= 0xD9
|
|
|
|
|
|
def is_sof(marker: int) -> bool:
|
|
return marker in {0xC0, 0xC1, 0xC2, 0xC3, 0xC5, 0xC6, 0xC7, 0xC9, 0xCA, 0xCB, 0xCD, 0xCE, 0xCF}
|
|
|
|
|
|
def inspect_jpeg(data: bytes) -> dict[str, object]:
|
|
complete = len(data) >= 4 and data.startswith(b"\xff\xd8") and data.endswith(b"\xff\xd9") and b"\xff\xda" in data
|
|
width = None
|
|
height = None
|
|
entropy_start = None
|
|
entropy_end = len(data) - 2 if len(data) >= 2 else len(data)
|
|
|
|
idx = 2
|
|
while idx + 4 <= len(data):
|
|
if data[idx] != 0xFF:
|
|
idx += 1
|
|
continue
|
|
while idx < len(data) and data[idx] == 0xFF:
|
|
idx += 1
|
|
if idx >= len(data):
|
|
break
|
|
marker = data[idx]
|
|
idx += 1
|
|
if marker == 0xDA:
|
|
seg_len = read_be16(data, idx)
|
|
if seg_len is not None and seg_len >= 2 and idx + seg_len <= len(data):
|
|
entropy_start = idx + seg_len
|
|
break
|
|
if marker == 0xD9:
|
|
break
|
|
if not marker_has_length(marker):
|
|
continue
|
|
seg_len = read_be16(data, idx)
|
|
if seg_len is None or seg_len < 2 or idx + seg_len > len(data):
|
|
break
|
|
if is_sof(marker) and seg_len >= 7:
|
|
height = read_be16(data, idx + 3)
|
|
width = read_be16(data, idx + 5)
|
|
idx += seg_len
|
|
|
|
payload = data[entropy_start:entropy_end] if entropy_start is not None and entropy_end > entropy_start else b""
|
|
counts = [0] * 256
|
|
max_run = 0
|
|
run = 0
|
|
prev = None
|
|
for byte in payload:
|
|
counts[byte] += 1
|
|
if byte == prev:
|
|
run += 1
|
|
else:
|
|
prev = byte
|
|
run = 1
|
|
max_run = max(max_run, run)
|
|
dominant = max(counts) if payload else 0
|
|
distinct = sum(1 for count in counts if count)
|
|
dominant_pct = round(dominant * 100 / len(payload), 2) if payload else 0.0
|
|
return {
|
|
"bytes": len(data),
|
|
"complete": complete,
|
|
"width": width,
|
|
"height": height,
|
|
"entropy_bytes": len(payload),
|
|
"entropy_distinct_bytes": distinct,
|
|
"entropy_dominant_pct": dominant_pct,
|
|
"entropy_max_run": max_run,
|
|
}
|
|
|
|
|
|
def derived_cap(fps: int, budget: int, explicit: Optional[int], guard: bool) -> int:
|
|
if not guard:
|
|
return MAX_MJPEG_FRAME_BYTES
|
|
if explicit and explicit > 0:
|
|
return min(explicit, MAX_MJPEG_FRAME_BYTES)
|
|
per_frame = max(budget // max(fps, 1), 64 * 1024)
|
|
return min(per_frame, MAX_MJPEG_FRAME_BYTES)
|
|
|
|
|
|
def main() -> int:
|
|
parser = argparse.ArgumentParser(description=__doc__)
|
|
parser.add_argument("frame", nargs="?", default=os.environ.get("LESAVKA_UVC_FRAME_PATH", "/run/lesavka-uvc-frame.mjpg"))
|
|
parser.add_argument("--fps", type=int, default=int(os.environ.get("LESAVKA_UVC_FPS", "30") or "30"))
|
|
parser.add_argument("--budget-bytes-per-sec", type=int, default=int(os.environ.get("LESAVKA_UVC_MJPEG_BUDGET_BYTES_PER_SEC", str(DEFAULT_BUDGET_BYTES_PER_SEC)) or str(DEFAULT_BUDGET_BYTES_PER_SEC)))
|
|
parser.add_argument("--max-bytes", type=int, default=int(os.environ.get("LESAVKA_UVC_FRAME_MAX_BYTES", "0") or "0"))
|
|
parser.add_argument("--disable-size-guard", action="store_true")
|
|
args = parser.parse_args()
|
|
|
|
path = Path(args.frame)
|
|
data = path.read_bytes()
|
|
cap = derived_cap(args.fps, args.budget_bytes_per_sec, args.max_bytes, not args.disable_size_guard)
|
|
report = inspect_jpeg(data)
|
|
report.update(
|
|
{
|
|
"path": str(path),
|
|
"fps": args.fps,
|
|
"budget_bytes_per_sec": args.budget_bytes_per_sec,
|
|
"max_bytes": cap,
|
|
"over_budget": len(data) > cap,
|
|
}
|
|
)
|
|
print(json.dumps(report, indent=2, sort_keys=True))
|
|
return 1 if report["over_budget"] or not report["complete"] else 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|