#!/usr/bin/env python3 """Summarize client-origin transport timing from RCT capture artifacts.""" from __future__ import annotations import json import math import pathlib import statistics import subprocess import sys from client_rct_transport_layers import client_send_summary, freshness_bottleneck def capture_start_ns(path: pathlib.Path) -> int | None: """Return the RCT recorder Unix start timestamp when the capture log has it. Inputs: a capture log path written by the remote recorder. Outputs: the nanosecond Unix timestamp or `None`. Why: client-origin event timestamps need to be translated into the capture file's timebase before end-to-end media age can be measured. """ for line in path.read_text(errors="replace").splitlines(): if line.startswith("capture_start_unix_ns="): return int(line.split("=", 1)[1].strip()) return None def percentile(values: list[float], q: float) -> float | None: """Return a simple nearest-rank percentile for finite values. Inputs: numeric samples and a quantile in `[0, 1]`. Outputs: the percentile or `None` for an empty set. Why: manual transport reports should match the conservative p95 style used by the server-to-RCT gate without pulling in extra dependencies. """ finite = sorted(value for value in values if math.isfinite(value)) if not finite: return None index = min(len(finite) - 1, max(0, math.ceil(len(finite) * q) - 1)) return finite[index] def fmt_ms(value: float | None) -> str: """Format optional millisecond evidence for compact text reports. Inputs: a numeric millisecond value or `None`. Output: display text. Why: missing layer evidence should remain explicit when optional samplers are disabled, rather than becoming a confusing `null` or Python exception. """ return f"{value:.1f}ms" if value is not None else "unavailable" def ffprobe_times(capture_path: pathlib.Path, kind: str) -> list[float]: """Read video frame or audio packet timestamps from a capture. Inputs: the Matroska capture path and `video` or `audio`. Outputs: timestamp seconds from ffprobe. Why: smoothness warnings need cadence evidence even when the sync analyzer correctly focuses only on flash/tone onsets. """ selector = "v:0" if kind == "video" else "a:0" section = "frame=pts_time" if kind == "video" else "packet=pts_time" show = "-show_frames" if kind == "video" else "-show_packets" try: output = subprocess.check_output( [ "ffprobe", "-v", "error", "-select_streams", selector, show, "-show_entries", section, "-of", "json", str(capture_path), ], text=True, stderr=subprocess.DEVNULL, ) data = json.loads(output) except Exception: return [] rows = data.get("frames" if kind == "video" else "packets", []) times: list[float] = [] for row in rows: try: times.append(float(row["pts_time"])) except (KeyError, TypeError, ValueError): pass return times def smoothness_summary( capture_path: pathlib.Path, timeline: dict, require_smoothness: bool, ) -> dict: """Compute coarse cadence warnings for the final RCT capture. Inputs: the final capture, client timeline media profile, and whether smoothness should be hard-gated. Outputs: a JSON-serializable smoothness summary. Why: we are not tuning smoothness yet, but the circuit test should preserve enough evidence to notice if transport improvements regress cadence. """ fps = float(timeline.get("camera_fps") or 0.0) video_times = ffprobe_times(capture_path, "video") audio_times = ffprobe_times(capture_path, "audio") expected_video_ms = 1000.0 / fps if fps > 0 else None video_intervals = [(b - a) * 1000.0 for a, b in zip(video_times, video_times[1:])] audio_intervals = [(b - a) * 1000.0 for a, b in zip(audio_times, audio_times[1:])] video_jitter = ( [abs(value - expected_video_ms) for value in video_intervals] if expected_video_ms else [] ) audio_median = statistics.median(audio_intervals) if audio_intervals else None audio_jitter = ( [abs(value - audio_median) for value in audio_intervals] if audio_median else [] ) video_hiccups = sum( 1 for value in video_intervals if expected_video_ms and value > expected_video_ms * 1.75 ) audio_hiccups = sum( 1 for value in audio_intervals if audio_median and value > audio_median * 2.5 ) return { "passed": video_hiccups == 0 and audio_hiccups == 0, "required": require_smoothness, "video_frames": len(video_times), "video_expected_interval_ms": expected_video_ms, "video_p95_jitter_ms": percentile(video_jitter, 0.95), "video_max_interval_ms": max(video_intervals) if video_intervals else None, "video_hiccups": video_hiccups, "audio_packets": len(audio_times), "audio_median_interval_ms": audio_median, "audio_p95_jitter_ms": percentile(audio_jitter, 0.95), "audio_max_interval_ms": max(audio_intervals) if audio_intervals else None, "audio_hiccups": audio_hiccups, } def parse_float_field(fields: dict, name: str) -> float | None: """Read a numeric upstream-sync field when relayctl reported one. Inputs: parsed relayctl fields and a key name. Outputs: a finite float or `None` for `pending`/missing values. Why: failed black-box runs need lightweight ingress diagnosis without requiring a second log-scraping tool. """ raw = fields.get(name) if raw is None or raw == "pending": return None try: value = float(raw) except (TypeError, ValueError): return None return value if math.isfinite(value) else None def upstream_sync_summary(report_path: pathlib.Path, timeline: dict) -> dict | None: """Summarize client-to-server timing from optional sampler artifacts. Inputs: the report path, used to find sibling `upstream-sync-samples.jsonl`, and the client-origin timeline. Outputs: transport-lag and queue-age percentiles, or `None`. Why: when final RCT freshness fails, the sampler shows whether media was already late at server ingress or only after server handoff. """ samples_path = report_path.parent / "upstream-sync-samples.jsonl" if not samples_path.exists(): return None client_start_unix_ns = int(timeline.get("client_start_unix_ns") or 0) if client_start_unix_ns <= 0: return None media_lags: list[float] = [] camera_lags: list[float] = [] microphone_lags: list[float] = [] camera_queue_ages: list[float] = [] microphone_queue_ages: list[float] = [] server_receive_ages: list[float] = [] sink_late_values: list[float] = [] live_samples = 0 for line in samples_path.read_text(errors="replace").splitlines(): try: record = json.loads(line) except json.JSONDecodeError: continue fields = record.get("fields", {}) sample_unix_ns = int(record.get("sample_unix_ns") or 0) if sample_unix_ns <= client_start_unix_ns: continue sample_rel_ms = (sample_unix_ns - client_start_unix_ns) / 1_000_000.0 camera_pts_ms = parse_float_field(fields, "planner_latest_camera_remote_pts_us") microphone_pts_ms = parse_float_field( fields, "planner_latest_microphone_remote_pts_us" ) if camera_pts_ms is None and microphone_pts_ms is None: continue live_samples += 1 if camera_pts_ms is not None: camera_pts_ms /= 1000.0 camera_lags.append(sample_rel_ms - camera_pts_ms) media_lags.append(sample_rel_ms - camera_pts_ms) if microphone_pts_ms is not None: microphone_pts_ms /= 1000.0 microphone_lags.append(sample_rel_ms - microphone_pts_ms) media_lags.append(sample_rel_ms - microphone_pts_ms) for key, bucket in [ ("planner_camera_client_queue_age_ms", camera_queue_ages), ("planner_microphone_client_queue_age_ms", microphone_queue_ages), ("planner_camera_server_receive_age_ms", server_receive_ages), ("planner_microphone_server_receive_age_ms", server_receive_ages), ("planner_camera_sink_late_ms", sink_late_values), ("planner_microphone_sink_late_ms", sink_late_values), ]: value = parse_float_field(fields, key) if value is not None: bucket.append(value) if live_samples == 0: return None return { "sample_count": live_samples, "media_transport_lag_p50_ms": percentile(media_lags, 0.50), "media_transport_lag_p95_ms": percentile(media_lags, 0.95), "camera_transport_lag_p95_ms": percentile(camera_lags, 0.95), "microphone_transport_lag_p95_ms": percentile(microphone_lags, 0.95), "camera_client_queue_age_p95_ms": percentile(camera_queue_ages, 0.95), "microphone_client_queue_age_p95_ms": percentile(microphone_queue_ages, 0.95), "server_receive_age_p95_ms": percentile(server_receive_ages, 0.95), "sink_late_p95_ms": percentile(sink_late_values, 0.95), } def uvc_spool_summary(report_path: pathlib.Path) -> dict | None: """Load optional server UVC spool-boundary timing next to the RCT report. Inputs: the analyzer report path. Output: parsed spool summary or `None`. Why: blind HEVC runs need one compact report that shows whether synthetic coded frames reached the server's decoded-MJPEG spool before final RCT capture, without making the normal non-mutating probe require this artifact. """ summary_path = report_path.parent / "uvc-frame-meta-summary.json" if not summary_path.exists(): return None try: summary = json.loads(summary_path.read_text()) except (OSError, json.JSONDecodeError): return None if summary.get("schema") != "lesavka.uvc-mjpeg-spool-summary.v1": return None return summary def build_summary(args: list[str]) -> tuple[dict, str]: """Build the transport summary JSON and human text. Inputs: command-line paths and thresholds from the Bash harness. Outputs: structured summary plus text lines. Why: keeping this in Python makes the shell runner small and leaves the timing math easy to test or extend if black-box results fail. """ ( report_path, timeline_path, capture_log_path, clock_path, capture_path, _json_out, _txt_out, max_age_raw, min_pairs_raw, require_smoothness_raw, ) = args report_file = pathlib.Path(report_path) report = json.loads(report_file.read_text()) timeline = json.loads(pathlib.Path(timeline_path).read_text()) clock = json.loads(pathlib.Path(clock_path).read_text()) max_age_ms = float(max_age_raw) min_pairs = int(min_pairs_raw) require_smoothness = require_smoothness_raw not in {"0", "false", "False", "no", "off"} capture_start = capture_start_ns(pathlib.Path(capture_log_path)) offset_ns = int(clock.get("capture_clock_offset_from_client_ns") or 0) uncertainty_ms = float(clock.get("clock_uncertainty_ms") or 0.0) timeline_events = {int(event["event_id"]): event for event in timeline.get("events", [])} joined: list[dict] = [] video_ages: list[float] = [] audio_ages: list[float] = [] for pair in report.get("paired_events", []): paired_server_event_id = pair.get("server_event_id") event_id = int( paired_server_event_id if paired_server_event_id is not None else pair.get("event_id", -1) ) event = timeline_events.get(event_id) if not event or capture_start is None: continue expected_capture_s = ( int(event["client_capture_unix_ns"]) + offset_ns - capture_start ) / 1_000_000_000.0 video_age_ms = (float(pair["video_time_s"]) - expected_capture_s) * 1000.0 audio_age_ms = (float(pair["audio_time_s"]) - expected_capture_s) * 1000.0 video_ages.append(video_age_ms) audio_ages.append(audio_age_ms) joined.append( { "event_id": event_id, "event_code": event.get("code"), "client_planned_start_us": event.get("planned_start_us"), "client_expected_capture_s": expected_capture_s, "tethys_video_time_s": pair.get("video_time_s"), "tethys_audio_time_s": pair.get("audio_time_s"), "video_age_ms": video_age_ms, "audio_age_ms": audio_age_ms, "skew_ms": pair.get("skew_ms"), "confidence": pair.get("confidence"), } ) worst_p95 = max( value for value in [percentile(video_ages, 0.95), percentile(audio_ages, 0.95)] if value is not None ) if video_ages or audio_ages else None freshness_budget_ms = worst_p95 + uncertainty_ms if worst_p95 is not None else None sync = report.get("verdict", {}) smoothness = smoothness_summary(pathlib.Path(capture_path), timeline, require_smoothness) upstream_sync = upstream_sync_summary(report_file, timeline) client_send = client_send_summary(report_file, joined) uvc_spool = uvc_spool_summary(report_file) freshness_passed = ( freshness_budget_ms is not None and freshness_budget_ms <= max_age_ms and len(joined) >= min_pairs ) passed = ( bool(sync.get("passed")) and freshness_passed and (smoothness["passed"] or not require_smoothness) ) summary = { "schema": "lesavka.client-rct-transport-summary.v1", "passed": passed, "sync_passed": bool(sync.get("passed")), "sync_status": sync.get("status"), "paired_event_count": len(joined), "min_paired_events": min_pairs, "freshness_passed": freshness_passed, "freshness_worst_p95_ms": worst_p95, "freshness_budget_ms": freshness_budget_ms, "freshness_limit_ms": max_age_ms, "clock_uncertainty_ms": uncertainty_ms, "video_age_p95_ms": percentile(video_ages, 0.95), "audio_age_p95_ms": percentile(audio_ages, 0.95), "smoothness": smoothness, "upstream_sync": upstream_sync, "client_send": client_send, "uvc_spool": uvc_spool, "expected_event_count": len(timeline_events), "freshness_bottleneck": None, "events": joined, } summary["freshness_bottleneck"] = freshness_bottleneck(summary) text = "\n".join(human_lines(report_path, summary, sync, smoothness)) + "\n" return summary, text def human_lines(report_path: str, summary: dict, sync: dict, smoothness: dict) -> list[str]: """Render a compact operator summary. Inputs: structured timing summaries. Outputs: readable report lines. Why: the user should be able to paste a short tail and still preserve the three dimensions we care about: sync, freshness, and smoothness. """ lines = [ f"Client-to-RCT transport summary for {report_path}", f"- verdict: {'pass' if summary['passed'] else 'fail'}", f"- sync: {sync.get('status', 'unknown')} ({'pass' if sync.get('passed') else 'fail'}), p95={float(sync.get('p95_abs_skew_ms', 0.0)):.1f}ms", f"- paired events: {summary['paired_event_count']}/{summary['min_paired_events']}", f"- synthetic evidence: paired={summary['paired_event_count']}/{summary['expected_event_count']} expected coded events", ] if summary["freshness_budget_ms"] is None: lines.append("- freshness: unavailable") else: lines.append( f"- freshness: {'pass' if summary['freshness_passed'] else 'fail'} " f"budget={summary['freshness_budget_ms']:.1f}ms " f"limit={summary['freshness_limit_ms']:.1f}ms" ) for label, key in [("video", "video_age_p95_ms"), ("audio", "audio_age_p95_ms")]: value = summary[key] lines.append( f"- {label} age p95: {value:.1f}ms" if value is not None else f"- {label} age p95: unavailable" ) lines.append( f"- smoothness: {'pass' if smoothness['passed'] else 'warn'} " f"video_hiccups={smoothness['video_hiccups']} " f"audio_hiccups={smoothness['audio_hiccups']} " f"video_p95_jitter={smoothness['video_p95_jitter_ms']}" ) client_send = summary.get("client_send") if client_send: lines.append( "- client send: " f"bundles={client_send['bundle_count']} " f"joined={client_send['joined_event_count']} " f"local_age_p95={fmt_ms(client_send.get('local_bundle_age_p95_ms'))} " f"post_send_to_rct_worst_p95={fmt_ms(client_send.get('post_client_send_worst_p95_ms'))}" ) lines.append(f"- freshness bottleneck: {summary['freshness_bottleneck']}") upstream = summary.get("upstream_sync") if upstream: lag = fmt_ms(upstream.get("media_transport_lag_p95_ms")) camera_queue = fmt_ms(upstream.get("camera_client_queue_age_p95_ms")) microphone_queue = fmt_ms(upstream.get("microphone_client_queue_age_p95_ms")) server_age = fmt_ms(upstream.get("server_receive_age_p95_ms")) sink_late = fmt_ms(upstream.get("sink_late_p95_ms")) lines.append( "- upstream sampler: " f"samples={upstream['sample_count']} " f"transport_lag_p95={lag} " f"client_queue_p95=video {camera_queue}/audio {microphone_queue} " f"server_receive_age_p95={server_age} " f"sink_late_p95={sink_late}" ) spool = summary.get("uvc_spool") if spool: coverage = spool.get("event_coverage") or {} expected = coverage.get("expected_events", 0) covered = coverage.get("covered_events", 0) missing = coverage.get("missing_codes", []) source_hiccups = spool.get("source_cadence_hiccup_count") spool_p95 = spool.get("spool_interval_p95_ms") decoded_p95 = spool.get("decoded_pts_delta_p95_ms") lines.append( "- UVC spool boundary: " f"records={spool.get('record_count')} " f"events={covered}/{expected} " f"missing_codes={missing} " f"sequence_gaps={spool.get('sequence_gap_count')} " f"source_hiccups={source_hiccups} " f"spool_interval_p95={fmt_ms(spool_p95)} " f"decoded_delta_p95={fmt_ms(decoded_p95)}" ) return lines def main() -> int: """CLI entrypoint for the manual transport summary helper.""" if len(sys.argv) != 11: print( "usage: client_rct_transport_summary.py REPORT TIMELINE CAPTURE_LOG CLOCK CAPTURE JSON_OUT TXT_OUT MAX_AGE_MS MIN_PAIRS REQUIRE_SMOOTHNESS", file=sys.stderr, ) return 2 summary, text = build_summary(sys.argv[1:]) pathlib.Path(sys.argv[6]).write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n") pathlib.Path(sys.argv[7]).write_text(text) print(text, end="") return 0 if summary["passed"] else 1 if __name__ == "__main__": raise SystemExit(main())