#!/usr/bin/env python3
"""Summarize optional UVC MJPEG frame metadata JSONL logs. The server can append one compact JSON record for every MJPEG frame it spools into the UVC helper. This script turns that raw per-frame stream into cadence, profile, and synthetic-event coverage metrics. Why: when an HEVC client-to-RCT run fails at the final capture, we need to know whether the decoded MJPEG handoff was already incomplete before adding heavier server-side introspection. """

from __future__ import annotations

import argparse
import json
import math
import pathlib
import sys
from collections import Counter
from typing import Any

# Per-frame record schema this summarizer accepts; anything else is ignored.
SCHEMA = "lesavka.uvc-mjpeg-spool-meta.v1"


def percentile(values: list[float], q: float) -> float | None:
    """Return a nearest-rank percentile for finite numeric samples.

    Inputs: sample values and a quantile from `0.0` to `1.0`.
    Output: the selected percentile or `None` when no finite samples exist.
    Why: all Lesavka probe summaries use p95-style nearest-rank percentiles,
    so this keeps the spool boundary diagnostics comparable with
    sync/freshness reports.
    """
    # NaN/inf samples are dropped rather than propagated into the rank math.
    finite = sorted(value for value in values if math.isfinite(value))
    if not finite:
        return None
    # Nearest-rank: ceil(n*q) is 1-based, clamp into [0, n-1] after the -1.
    index = min(len(finite) - 1, max(0, math.ceil(len(finite) * q) - 1))
    return finite[index]


def optional_int(value: Any) -> int | None:
    """Parse optional integer JSON fields without treating null as an error.

    Inputs: a raw JSON field. Output: an integer or `None`.
    Why: MJPEG ingress has no decoded PTS, while HEVC-decoded MJPEG should
    provide one when the decoder reports it, and both profiles share the same
    log schema.
    """
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def load_records(path: pathlib.Path) -> tuple[list[dict[str, Any]], int]:
    """Load valid metadata records from a JSONL file.

    Inputs: a JSONL path. Output: valid records plus ignored-line count.
    Why: probe logs are operational artifacts; the summarizer should tolerate
    blank, truncated, or unrelated lines while still refusing to summarize an
    empty usable stream.
    """
    records: list[dict[str, Any]] = []
    ignored = 0
    # Explicit UTF-8: the default encoding is locale-dependent (PEP 597) and
    # would mis-decode these logs on some platforms; errors="replace" keeps a
    # single truncated line from aborting the whole summary.
    for line in path.read_text(encoding="utf-8", errors="replace").splitlines():
        if not line.strip():
            continue  # blank lines are expected, not counted as ignored
        try:
            raw = json.loads(line)
        except json.JSONDecodeError:
            ignored += 1
            continue
        if raw.get("schema") != SCHEMA:
            ignored += 1
            continue
        sequence = optional_int(raw.get("sequence"))
        byte_count = optional_int(raw.get("bytes"))
        spool_unix_ns = optional_int(raw.get("spool_unix_ns"))
        # These three fields are mandatory for every profile; PTS fields are not.
        if sequence is None or byte_count is None or spool_unix_ns is None:
            ignored += 1
            continue
        records.append(
            {
                "sequence": sequence,
                "profile": str(raw.get("profile") or "unknown"),
                "bytes": byte_count,
                "source_pts_us": optional_int(raw.get("source_pts_us")),
                "decoded_pts_us": optional_int(raw.get("decoded_pts_us")),
                "spool_unix_ns": spool_unix_ns,
            }
        )
    return records, ignored


def diffs(values: list[int]) -> list[float]:
    """Return adjacent differences in milliseconds for sorted integer samples.

    Inputs: timestamps in microseconds or nanoseconds after the caller has
    selected the unit. Output: millisecond deltas.
    Why: cadence problems show up as gaps between adjacent frame records, not
    as absolute timestamps.
    """
    if len(values) < 2:
        return []
    # NOTE: the /1000.0 assumes microsecond inputs; nanosecond callers scale
    # themselves (see summarize()).
    return [(b - a) / 1000.0 for a, b in zip(values, values[1:])]


def sequence_gap_count(records: list[dict[str, Any]]) -> int:
    """Count missing sequence numbers in the append-only frame log.

    Inputs: parsed frame metadata. Output: total missing sequence IDs.
    Why: a source PTS gap can be legitimate after freshness drops, but a
    sequence gap points at incomplete logging or skipped spool writes.
    """
    ordered = sorted(record["sequence"] for record in records)
    # max(0, ...) makes duplicate sequence numbers count as zero, not negative.
    return sum(max(0, b - a - 1) for a, b in zip(ordered, ordered[1:]))


def event_coverage(records: list[dict[str, Any]], timeline_path: pathlib.Path | None) -> dict | None:
    """Compare spooled frame PTS values with synthetic event windows.

    Inputs: frame records and an optional client/server probe timeline JSON.
    Output: coverage counts or `None`.
    Why: the top-level RCT analyzer can miss flashes after transport
    turbulence; this boundary check tells us whether the event-coded video
    frames reached the UVC spool before blaming final capture.
    """
    if timeline_path is None:
        return None
    try:
        # Coverage is best-effort: a missing or corrupt timeline disables the
        # check instead of failing the whole summary.
        timeline = json.loads(timeline_path.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError):
        return None
    events = timeline.get("events")
    if not isinstance(events, list):
        return None
    source_pts = [
        record["source_pts_us"]
        for record in records
        if isinstance(record.get("source_pts_us"), int)
    ]
    covered = 0
    missing_codes: list[int] = []
    per_event: list[dict[str, Any]] = []
    for event in events:
        try:
            start = int(event["planned_start_us"])
            end = int(event["planned_end_us"])
        except (KeyError, TypeError, ValueError):
            continue  # malformed event entries are skipped, not fatal
        code = optional_int(event.get("code"))
        # Half-open window [start, end) so adjacent events never double-count.
        matching = sum(1 for pts in source_pts if start <= pts < end)
        if matching:
            covered += 1
        elif code is not None:
            missing_codes.append(code)
        per_event.append(
            {
                "event_id": optional_int(event.get("event_id")),
                "code": code,
                "frame_count": matching,
            }
        )
    return {
        "expected_events": len(per_event),
        "covered_events": covered,
        "missing_codes": missing_codes,
        "per_event": per_event,
    }


def summarize(records: list[dict[str, Any]], ignored: int, fps: float | None, timeline: pathlib.Path | None) -> dict:
    """Build the structured UVC spool metadata summary.

    Inputs: parsed records (must be non-empty; main() guards this),
    ignored-line count, optional expected FPS, and an optional synthetic
    timeline. Output: JSON-serializable metrics.
    Why: both humans and follow-up automation need the same artifact to decide
    whether a failing end-to-end HEVC run needs transport, decode, UVC, or RCT
    attention.
    """
    profiles = Counter(record["profile"] for record in records)
    byte_counts = [float(record["bytes"]) for record in records]
    source_pts = sorted(
        record["source_pts_us"]
        for record in records
        if isinstance(record.get("source_pts_us"), int)
    )
    spool_ns = sorted(record["spool_unix_ns"] for record in records)
    source_intervals = diffs(source_pts)  # microseconds -> ms
    # spool timestamps are nanoseconds, so scale to ms here instead of diffs().
    spool_intervals = [(b - a) / 1_000_000.0 for a, b in zip(spool_ns, spool_ns[1:])]
    # Decoder latency proxy: only frames carrying both PTS values contribute.
    decoded_deltas = [
        (record["decoded_pts_us"] - record["source_pts_us"]) / 1000.0
        for record in records
        if isinstance(record.get("decoded_pts_us"), int)
        and isinstance(record.get("source_pts_us"), int)
    ]
    expected_interval_ms = 1000.0 / fps if fps and fps > 0 else None
    # A "hiccup" is a source-PTS gap more than 1.5x the nominal frame period.
    cadence_hiccups = (
        sum(1 for value in source_intervals if value > expected_interval_ms * 1.5)
        if expected_interval_ms is not None
        else None
    )
    return {
        "schema": "lesavka.uvc-mjpeg-spool-summary.v1",
        "record_count": len(records),
        "ignored_line_count": ignored,
        "profiles": dict(sorted(profiles.items())),
        "sequence_first": min(record["sequence"] for record in records),
        "sequence_last": max(record["sequence"] for record in records),
        "sequence_gap_count": sequence_gap_count(records),
        "bytes_median": percentile(byte_counts, 0.50),
        "bytes_p95": percentile(byte_counts, 0.95),
        "bytes_max": max(byte_counts) if byte_counts else None,
        "source_pts_span_ms": ((source_pts[-1] - source_pts[0]) / 1000.0) if len(source_pts) >= 2 else None,
        "source_interval_p95_ms": percentile(source_intervals, 0.95),
        "source_interval_max_ms": max(source_intervals) if source_intervals else None,
        "spool_interval_p95_ms": percentile(spool_intervals, 0.95),
        "spool_interval_max_ms": max(spool_intervals) if spool_intervals else None,
        "expected_interval_ms": expected_interval_ms,
        "source_cadence_hiccup_count": cadence_hiccups,
        "decoded_pts_delta_median_ms": percentile(decoded_deltas, 0.50),
        "decoded_pts_delta_p95_ms": percentile(decoded_deltas, 0.95),
        "event_coverage": event_coverage(records, timeline),
    }


def format_ms(value: float | None) -> str:
    """Format optional millisecond values for concise text output.

    Inputs: a numeric value or `None`. Output: display string.
    Why: report text should make absent evidence explicit instead of quietly
    rendering `null`.
    """
    return "n/a" if value is None else f"{value:.1f} ms"


def write_text_report(path: pathlib.Path, log_path: pathlib.Path, summary: dict) -> None:
    """Write a human-readable spool metadata report.

    Inputs: output path, source log path, and structured summary.
    Output: report file on disk.
    Why: the run matrix logs are easiest to scan when key timing evidence is
    available as text next to the JSON artifact.
    """
    coverage = summary.get("event_coverage") or {}
    coverage_line = "n/a"
    if coverage:
        coverage_line = (
            f"{coverage.get('covered_events', 0)}/{coverage.get('expected_events', 0)}"
            f" missing_codes={coverage.get('missing_codes', [])}"
        )
    lines = [
        f"UVC frame metadata summary for {log_path}",
        f"- records: {summary['record_count']} ignored_lines={summary['ignored_line_count']}",
        f"- profiles: {summary['profiles']}",
        f"- sequence: {summary['sequence_first']}..{summary['sequence_last']} gaps={summary['sequence_gap_count']}",
        f"- source cadence: p95={format_ms(summary['source_interval_p95_ms'])} max={format_ms(summary['source_interval_max_ms'])} hiccups={summary['source_cadence_hiccup_count']}",
        f"- spool cadence: p95={format_ms(summary['spool_interval_p95_ms'])} max={format_ms(summary['spool_interval_max_ms'])}",
        f"- decoded PTS delta: median={format_ms(summary['decoded_pts_delta_median_ms'])} p95={format_ms(summary['decoded_pts_delta_p95_ms'])}",
        f"- event coverage: {coverage_line}",
    ]
    # Explicit UTF-8 so the artifact encoding does not depend on the host locale.
    path.write_text("\n".join(lines) + "\n", encoding="utf-8")


def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse command-line options for artifact summarization.

    Inputs: CLI argv. Output: argparse namespace.
    Why: the script is intended for both manual postmortems and automated
    probe wrappers, so all outputs are explicit file paths rather than
    implicit terminal scraping.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("log_jsonl", type=pathlib.Path)
    parser.add_argument("json_out", type=pathlib.Path)
    parser.add_argument("txt_out", type=pathlib.Path)
    parser.add_argument("--fps", type=float, default=None)
    parser.add_argument("--timeline", type=pathlib.Path, default=None)
    return parser.parse_args(argv)


def main(argv: list[str]) -> int:
    """Run the UVC frame metadata summarizer.

    Inputs: command-line arguments. Output: process exit code.
    Why: returning explicit non-zero statuses makes probe wrappers fail fast
    when metadata was enabled but no valid frame records were captured.
    """
    args = parse_args(argv)
    records, ignored = load_records(args.log_jsonl)
    if not records:
        print(f"no valid {SCHEMA} records found in {args.log_jsonl}", file=sys.stderr)
        return 1
    summary = summarize(records, ignored, args.fps, args.timeline)
    args.json_out.write_text(
        json.dumps(summary, indent=2, sort_keys=True) + "\n", encoding="utf-8"
    )
    write_text_report(args.txt_out, args.log_jsonl, summary)
    print(f"summary_json: {args.json_out}")
    print(f"summary_txt: {args.txt_out}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))