#!/usr/bin/env python3
"""Enforce Ariadne's ratcheting test-quality gate.

Inputs: repository Python files, optional coverage JSON, and a TOML config
that captures the current legacy exceptions. Outputs: a JSON report plus a
non-zero exit code when file-size, docstring, or coverage requirements
regress, so CI can block quality drift while allowing incremental cleanup.
"""

from __future__ import annotations

import argparse
import ast
import json
import tomllib
from dataclasses import dataclass
from pathlib import Path
from typing import Any


@dataclass(frozen=True)
class DefinitionFinding:
    """Describe a public definition missing a required docstring.

    Inputs: the symbol kind, name, source line, and logical size. Outputs: a
    compact record that makes docstring failures actionable in CLI output and
    in the JSON quality report.
    """

    kind: str
    name: str
    lineno: int
    length: int


@dataclass(frozen=True)
class Violation:
    """Represent a single quality-gate violation.

    Inputs: the violated check name, file path, and a human-readable message.
    Outputs: a normalized record for console rendering and JSON serialization
    so Jenkins and local runs report the same facts.
    """

    check: str
    path: str
    message: str


@dataclass(frozen=True)
class QualityConfig:
    """Typed quality-gate settings loaded from TOML.

    Inputs: parsed configuration sections for file-size, docstring, and
    coverage enforcement. Outputs: one immutable object so the gate logic
    stays deterministic and easy to test.
    """

    line_roots: tuple[str, ...]
    max_lines: int
    legacy_max_lines: dict[str, int]
    docstring_roots: tuple[str, ...]
    non_trivial_min_lines: int
    legacy_missing_docstrings: dict[str, int]
    coverage_roots: tuple[str, ...]
    coverage_targets: tuple[str, ...]
    coverage_threshold: float


def _load_config(path: Path) -> QualityConfig:
    """Load the quality-gate config from TOML.

    Inputs: the path to the repository-local TOML config file. Outputs:
    validated `QualityConfig` values used by every gate check so local runs
    and Jenkins share the same policy.
    """
    payload = tomllib.loads(path.read_text(encoding="utf-8"))
    files = payload.get("files") or {}
    docstrings = payload.get("docstrings") or {}
    coverage = payload.get("coverage") or {}
    legacy = payload.get("legacy") or {}
    return QualityConfig(
        line_roots=tuple(str(item) for item in files.get("roots") or ("ariadne", "tests", "scripts")),
        max_lines=int(files.get("max_lines", 500)),
        legacy_max_lines={str(key): int(value) for key, value in (legacy.get("line_count") or {}).items()},
        docstring_roots=tuple(str(item) for item in docstrings.get("roots") or ("ariadne", "scripts")),
        non_trivial_min_lines=int(docstrings.get("non_trivial_min_lines", 6)),
        legacy_missing_docstrings={
            str(key): int(value) for key, value in (legacy.get("docstrings") or {}).items()
        },
        coverage_roots=tuple(str(item) for item in coverage.get("roots") or ("ariadne",)),
        coverage_targets=tuple(str(item) for item in coverage.get("targets") or ()),
        coverage_threshold=float(coverage.get("threshold", 95.0)),
    )
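
# Illustrative sketch of the TOML layout `_load_config` expects. The section
# and key names come from the lookups above; the concrete paths and numbers
# are hypothetical placeholders, not real repository policy.
#
#   [files]
#   roots = ["ariadne", "tests", "scripts"]
#   max_lines = 500
#
#   [docstrings]
#   roots = ["ariadne", "scripts"]
#   non_trivial_min_lines = 6
#
#   [coverage]
#   roots = ["ariadne"]
#   targets = ["ariadne/core.py"]        # hypothetical enforced module
#   threshold = 95.0
#
#   [legacy.line_count]
#   "ariadne/legacy_big.py" = 750        # hypothetical ratchet entry
#
#   [legacy.docstrings]
#   "ariadne/legacy_api.py" = 3          # hypothetical ratchet entry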
""" files: list[Path] = [] for root in roots: base = repo_root / root if not base.exists(): continue files.extend(sorted(base.rglob("*.py"))) return sorted({path for path in files}) def _relative(path: Path, repo_root: Path) -> str: return path.relative_to(repo_root).as_posix() def _line_count(path: Path) -> int: return len(path.read_text(encoding="utf-8").splitlines()) def _definition_length(node: ast.AST) -> int: end_lineno = getattr(node, "end_lineno", None) or getattr(node, "lineno", 0) return max(end_lineno - getattr(node, "lineno", 0) + 1, 1) def _missing_docstrings(path: Path, min_lines: int) -> list[DefinitionFinding]: """Find public top-level definitions missing required docstrings. Inputs: a Python file path and the minimum logical size considered non-trivial. Outputs: missing-docstring findings so the gate can ratchet legacy files while blocking new undocumented public APIs. """ module = ast.parse(path.read_text(encoding="utf-8"), filename=str(path)) findings: list[DefinitionFinding] = [] for node in module.body: if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)): continue if node.name.startswith("_"): continue length = _definition_length(node) if length < min_lines: continue if ast.get_docstring(node) is not None: continue findings.append( DefinitionFinding( kind=type(node).__name__, name=node.name, lineno=getattr(node, "lineno", 1), length=length, ) ) return findings def _excluded_coverage_lines(path: Path) -> set[int]: """Collect non-executable lines that Slipcover still reports as missing. Inputs: a Python source file path. Outputs: line numbers for multiline definition headers and docstring blocks so adjusted per-file coverage tracks executable logic rather than syntax scaffolding required for readability. """ module = ast.parse(path.read_text(encoding="utf-8"), filename=str(path)) excluded: set[int] = set() def visit(node: ast.AST) -> None: if isinstance(node, ast.Module): for child in node.body: visit(child) return if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)): return if node.body: body_start = node.body[0].lineno excluded.update(range(node.lineno + 1, body_start)) first = node.body[0] if ( isinstance(first, ast.Expr) and isinstance(first.value, ast.Constant) and isinstance(first.value.value, str) ): excluded.update(range(first.lineno, (getattr(first, "end_lineno", first.lineno) or first.lineno) + 1)) for child in node.body: visit(child) visit(module) return excluded def _load_coverage(path: Path | None, repo_root: Path) -> dict[str, dict[str, Any]]: """Read per-file coverage details from Slipcover JSON output. Inputs: the optional coverage artifact path produced by the test run. Outputs: raw and adjusted per-file coverage details so the gate can enforce realistic thresholds even when Slipcover counts docstrings and wrapped signatures as missing lines. 
""" if path is None or not path.exists(): return {} payload = json.loads(path.read_text(encoding="utf-8")) files = payload.get("files") or {} coverage: dict[str, dict[str, Any]] = {} for name, data in files.items(): if not isinstance(data, dict): continue summary = data.get("summary") or {} percent = summary.get("percent_covered") executed_lines = {int(line) for line in data.get("executed_lines") or [] if isinstance(line, int)} missing_lines = {int(line) for line in data.get("missing_lines") or [] if isinstance(line, int)} relative = str(name) source_path = repo_root / relative excluded_lines = _excluded_coverage_lines(source_path) if source_path.exists() else set() adjusted_missing = missing_lines - excluded_lines adjusted_total = len(executed_lines) + len(adjusted_missing) adjusted_percent = 100.0 if adjusted_total == 0 else (len(executed_lines) / adjusted_total) * 100.0 coverage[relative] = { "raw_percent": float(percent) if isinstance(percent, (int, float)) else None, "adjusted_percent": adjusted_percent, "excluded_lines": sorted(excluded_lines), } return coverage def _serialize_violations(violations: list[Violation]) -> list[dict[str, str]]: return [{"check": item.check, "path": item.path, "message": item.message} for item in violations] def _build_report( repo_root: Path, config: QualityConfig, coverage: dict[str, dict[str, Any]], coverage_artifact_present: bool, ) -> dict[str, Any]: """Run all configured checks and build a JSON-serializable report. Inputs: repository paths, quality-gate config, and per-file coverage data. Outputs: a complete report for CI artifacts, metrics publication, and local debugging when the gate fails. """ violations: list[Violation] = [] files_report: dict[str, dict[str, Any]] = {} for path in _iter_python_files(repo_root, config.line_roots): relative = _relative(path, repo_root) lines = _line_count(path) entry = files_report.setdefault(relative, {}) entry["lines"] = lines entry["line_limit"] = config.legacy_max_lines.get(relative, config.max_lines) entry["line_limit_legacy"] = relative in config.legacy_max_lines if lines > entry["line_limit"]: violations.append( Violation( "line_count", relative, f"{relative} has {lines} lines; allowed maximum is {entry['line_limit']}", ) ) for path in _iter_python_files(repo_root, config.docstring_roots): relative = _relative(path, repo_root) findings = _missing_docstrings(path, config.non_trivial_min_lines) entry = files_report.setdefault(relative, {}) entry["missing_docstrings"] = len(findings) entry["missing_docstrings_allowed"] = config.legacy_missing_docstrings.get(relative, 0) entry["missing_docstrings_legacy"] = relative in config.legacy_missing_docstrings entry["missing_docstring_symbols"] = [ { "kind": finding.kind, "name": finding.name, "lineno": finding.lineno, "length": finding.length, } for finding in findings ] if len(findings) > entry["missing_docstrings_allowed"]: excess = findings[entry["missing_docstrings_allowed"] :] for finding in excess: violations.append( Violation( "docstrings", relative, f"missing docstring for {finding.kind} {finding.name} at line {finding.lineno}", ) ) coverage_target_set = set(config.coverage_targets) coverage_root_files = { _relative(path, repo_root) for path in _iter_python_files(repo_root, config.coverage_roots) } for relative in sorted(coverage_root_files): entry = files_report.setdefault(relative, {}) details = coverage.get(relative) or {} value = details.get("adjusted_percent") entry["coverage_percent"] = value entry["coverage_raw_percent"] = details.get("raw_percent") 
entry["coverage_excluded_lines"] = details.get("excluded_lines") or [] entry["coverage_enforced"] = relative in coverage_target_set if relative not in coverage_target_set: continue entry["coverage_target"] = config.coverage_threshold if value is None: violations.append( Violation("coverage", relative, f"missing coverage data for {relative}") ) continue if value < config.coverage_threshold: violations.append( Violation( "coverage", relative, f"{relative} coverage {value:.2f}% is below {config.coverage_threshold:.2f}%", ) ) if coverage_target_set and not coverage_artifact_present: violations.append( Violation("coverage", "build/coverage.json", "coverage artifact missing for enforced coverage targets") ) summary = { "violations_total": len(violations), "line_count_violations": sum(item.check == "line_count" for item in violations), "docstring_violations": sum(item.check == "docstrings" for item in violations), "coverage_violations": sum(item.check == "coverage" for item in violations), "legacy_line_count_files": len(config.legacy_max_lines), "legacy_docstring_files": len(config.legacy_missing_docstrings), "coverage_targets": len(coverage_target_set), "coverage_exemptions": max(len(coverage_root_files) - len(coverage_target_set), 0), } return { "status": "ok" if not violations else "failed", "rules": { "max_lines": config.max_lines, "docstring_non_trivial_min_lines": config.non_trivial_min_lines, "coverage_threshold": config.coverage_threshold, }, "summary": summary, "violations": _serialize_violations(violations), "files": dict(sorted(files_report.items())), } def _print_report(report: dict[str, Any]) -> None: """Render a concise CLI summary for local and Jenkins logs. Inputs: the JSON-ready report produced by the gate. Outputs: human-readable lines that point directly at each violation so a failing build is easy to fix. """ print(json.dumps(report.get("summary") or {}, indent=2, sort_keys=True)) for violation in report.get("violations") or []: print(f"[{violation['check']}] {violation['path']}: {violation['message']}") def parse_args() -> argparse.Namespace: """Parse CLI arguments for the quality gate. Inputs: command-line flags supplied by Jenkins or a local developer. Outputs: normalized paths and options so the gate stays scriptable and predictable in every environment. """ parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--config", default="quality_gate.toml", help="path to the quality-gate TOML config") parser.add_argument("--coverage-json", default="build/coverage.json", help="path to Slipcover JSON output") parser.add_argument("--output", default="build/quality-gate.json", help="path to write the JSON report") return parser.parse_args() def main() -> int: """Run the Ariadne quality gate and write its JSON report. Inputs: CLI arguments naming the config and optional coverage artifact. Outputs: a persisted JSON report and a process exit code that Jenkins can use to enforce quality rules. 
""" args = parse_args() repo_root = Path.cwd() config_path = repo_root / args.config output_path = repo_root / args.output coverage_path = repo_root / args.coverage_json config = _load_config(config_path) coverage_present = coverage_path.exists() coverage = _load_coverage(coverage_path if coverage_present else None, repo_root) report = _build_report(repo_root, config, coverage, coverage_present) output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_text(json.dumps(report, indent=2, sort_keys=True) + "\n", encoding="utf-8") _print_report(report) return 0 if report["status"] == "ok" else 1 if __name__ == "__main__": raise SystemExit(main())