# ariadne/scripts/check_quality_gate.py
#!/usr/bin/env python3
"""Enforce Ariadne's ratcheting test-quality gate.
Inputs: repository Python files, optional coverage JSON, and a TOML config that
captures the current legacy exceptions.
Outputs: a JSON report plus a non-zero exit code when file-size, docstring, or
coverage requirements regress, so CI can block quality drift while allowing
incremental cleanup.
"""
from __future__ import annotations
import argparse
import ast
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import tomllib
@dataclass(frozen=True)
class DefinitionFinding:
    """Describe a public definition missing a required docstring.

    Inputs: the symbol kind, name, source line, and logical size.
    Outputs: a compact record that makes docstring failures actionable in CLI
    output and in the JSON quality report.
    """

    # AST node class name, e.g. "FunctionDef", "AsyncFunctionDef", "ClassDef".
    kind: str
    # Public symbol name as written in the source file.
    name: str
    # 1-based line where the definition starts.
    lineno: int
    # Logical size in lines (used against the non-trivial minimum).
    length: int
@dataclass(frozen=True)
class Violation:
    """Represent a single quality-gate violation.

    Inputs: the violated check name, file path, and a human-readable message.
    Outputs: a normalized record for console rendering and JSON serialization so
    Jenkins and local runs report the same facts.
    """

    # Check identifier: "line_count", "docstrings", or "coverage".
    check: str
    # Repository-relative POSIX path of the offending file.
    path: str
    # Human-readable explanation rendered in CLI output and the JSON report.
    message: str
@dataclass(frozen=True)
class QualityConfig:
    """Typed quality-gate settings loaded from TOML.

    Inputs: parsed configuration sections for file-size, docstrings, and
    coverage enforcement.
    Outputs: one immutable object so the gate logic stays deterministic and easy
    to test.
    """

    # Repository-relative directories scanned by the file-size check.
    line_roots: tuple[str, ...]
    # Default maximum line count per file.
    max_lines: int
    # Per-file ratcheted overrides for legacy files that exceed max_lines.
    legacy_max_lines: dict[str, int]
    # Repository-relative directories scanned by the docstring check.
    docstring_roots: tuple[str, ...]
    # Minimum definition length (in lines) that requires a docstring.
    non_trivial_min_lines: int
    # Per-file allowance of missing docstrings for legacy files.
    legacy_missing_docstrings: dict[str, int]
    # Repository-relative directories reported on by the coverage check.
    coverage_roots: tuple[str, ...]
    # Repository-relative files whose coverage is actually enforced.
    coverage_targets: tuple[str, ...]
    # Minimum adjusted coverage percentage for enforced targets.
    coverage_threshold: float
def _load_config(path: Path) -> QualityConfig:
    """Load the quality-gate config from TOML.

    Inputs: the path to the repository-local TOML config file.
    Outputs: validated `QualityConfig` values used by every gate check so local
    runs and Jenkins share the same policy.
    """
    raw = tomllib.loads(path.read_text(encoding="utf-8"))
    files_section = raw.get("files") or {}
    docstrings_section = raw.get("docstrings") or {}
    coverage_section = raw.get("coverage") or {}
    legacy_section = raw.get("legacy") or {}

    def _names(section: dict, key: str, fallback: tuple[str, ...]) -> tuple[str, ...]:
        # A missing or empty list falls back to the built-in defaults.
        return tuple(str(item) for item in section.get(key) or fallback)

    def _int_map(mapping: Any) -> dict[str, int]:
        # Normalize legacy-exception tables into {path: allowance} form.
        return {str(key): int(value) for key, value in (mapping or {}).items()}

    return QualityConfig(
        line_roots=_names(files_section, "roots", ("ariadne", "tests", "scripts")),
        max_lines=int(files_section.get("max_lines", 500)),
        legacy_max_lines=_int_map(legacy_section.get("line_count")),
        docstring_roots=_names(docstrings_section, "roots", ("ariadne", "scripts")),
        non_trivial_min_lines=int(docstrings_section.get("non_trivial_min_lines", 6)),
        legacy_missing_docstrings=_int_map(legacy_section.get("docstrings")),
        coverage_roots=_names(coverage_section, "roots", ("ariadne",)),
        coverage_targets=_names(coverage_section, "targets", ()),
        coverage_threshold=float(coverage_section.get("threshold", 95.0)),
    )
def _iter_python_files(repo_root: Path, roots: tuple[str, ...]) -> list[Path]:
"""Collect Python files under the configured roots.
Inputs: the repository root plus the roots to scan.
Outputs: sorted Python paths so the gate produces stable results and diffs.
"""
files: list[Path] = []
for root in roots:
base = repo_root / root
if not base.exists():
continue
files.extend(sorted(base.rglob("*.py")))
return sorted({path for path in files})
def _relative(path: Path, repo_root: Path) -> str:
return path.relative_to(repo_root).as_posix()
def _line_count(path: Path) -> int:
return len(path.read_text(encoding="utf-8").splitlines())
def _definition_length(node: ast.AST) -> int:
end_lineno = getattr(node, "end_lineno", None) or getattr(node, "lineno", 0)
return max(end_lineno - getattr(node, "lineno", 0) + 1, 1)
def _missing_docstrings(path: Path, min_lines: int) -> list[DefinitionFinding]:
    """Find public top-level definitions missing required docstrings.

    Inputs: a Python file path and the minimum logical size considered
    non-trivial.
    Outputs: missing-docstring findings so the gate can ratchet legacy files
    while blocking new undocumented public APIs.
    """
    tree = ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
    definition_types = (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)
    findings: list[DefinitionFinding] = []
    for statement in tree.body:
        # Only top-level defs/classes are checked; private names are exempt.
        if not isinstance(statement, definition_types):
            continue
        if statement.name.startswith("_"):
            continue
        size = _definition_length(statement)
        # Trivial definitions and already-documented ones produce no finding.
        if size >= min_lines and ast.get_docstring(statement) is None:
            findings.append(
                DefinitionFinding(
                    kind=type(statement).__name__,
                    name=statement.name,
                    lineno=getattr(statement, "lineno", 1),
                    length=size,
                )
            )
    return findings
def _excluded_coverage_lines(path: Path) -> set[int]:
"""Collect non-executable lines that Slipcover still reports as missing.
Inputs: a Python source file path.
Outputs: line numbers for multiline definition headers and docstring blocks
so adjusted per-file coverage tracks executable logic rather than syntax
scaffolding required for readability.
"""
module = ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
excluded: set[int] = set()
def visit(node: ast.AST) -> None:
if isinstance(node, ast.Module):
for child in node.body:
visit(child)
return
if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
return
if node.body:
body_start = node.body[0].lineno
excluded.update(range(node.lineno + 1, body_start))
first = node.body[0]
if (
isinstance(first, ast.Expr)
and isinstance(first.value, ast.Constant)
and isinstance(first.value.value, str)
):
excluded.update(range(first.lineno, (getattr(first, "end_lineno", first.lineno) or first.lineno) + 1))
for child in node.body:
visit(child)
visit(module)
return excluded
def _load_coverage(path: Path | None, repo_root: Path) -> dict[str, dict[str, Any]]:
    """Read per-file coverage details from Slipcover JSON output.

    Inputs: the optional coverage artifact path produced by the test run.
    Outputs: raw and adjusted per-file coverage details so the gate can enforce
    realistic thresholds even when Slipcover counts docstrings and wrapped
    signatures as missing lines.
    """
    if path is None or not path.exists():
        return {}
    payload = json.loads(path.read_text(encoding="utf-8"))
    results: dict[str, dict[str, Any]] = {}
    for name, data in (payload.get("files") or {}).items():
        if not isinstance(data, dict):
            # Tolerate malformed entries rather than failing the whole gate.
            continue
        raw_percent = (data.get("summary") or {}).get("percent_covered")
        executed = {int(line) for line in data.get("executed_lines") or [] if isinstance(line, int)}
        missing = {int(line) for line in data.get("missing_lines") or [] if isinstance(line, int)}
        relative = str(name)
        source = repo_root / relative
        # Deleted files get no exclusions; their raw numbers pass through.
        excluded = _excluded_coverage_lines(source) if source.exists() else set()
        remaining_missing = missing - excluded
        total = len(executed) + len(remaining_missing)
        # A file with nothing executable counts as fully covered.
        adjusted = 100.0 if total == 0 else (len(executed) / total) * 100.0
        results[relative] = {
            "raw_percent": float(raw_percent) if isinstance(raw_percent, (int, float)) else None,
            "adjusted_percent": adjusted,
            "excluded_lines": sorted(excluded),
        }
    return results
def _serialize_violations(violations: list[Violation]) -> list[dict[str, str]]:
return [{"check": item.check, "path": item.path, "message": item.message} for item in violations]
def _build_report(
    repo_root: Path,
    config: QualityConfig,
    coverage: dict[str, dict[str, Any]],
    coverage_artifact_present: bool,
) -> dict[str, Any]:
    """Run all configured checks and build a JSON-serializable report.

    Inputs: repository paths, quality-gate config, and per-file coverage data.
    Outputs: a complete report for CI artifacts, metrics publication, and local
    debugging when the gate fails.
    """
    violations: list[Violation] = []
    files_report: dict[str, dict[str, Any]] = {}
    # --- File-size check: each file must stay within its (possibly legacy) limit.
    for path in _iter_python_files(repo_root, config.line_roots):
        relative = _relative(path, repo_root)
        lines = _line_count(path)
        entry = files_report.setdefault(relative, {})
        entry["lines"] = lines
        # Legacy files carry a ratcheted per-file limit above the default.
        entry["line_limit"] = config.legacy_max_lines.get(relative, config.max_lines)
        entry["line_limit_legacy"] = relative in config.legacy_max_lines
        if lines > entry["line_limit"]:
            violations.append(
                Violation(
                    "line_count",
                    relative,
                    f"{relative} has {lines} lines; allowed maximum is {entry['line_limit']}",
                )
            )
    # --- Docstring check: public defs must be documented beyond the legacy allowance.
    for path in _iter_python_files(repo_root, config.docstring_roots):
        relative = _relative(path, repo_root)
        findings = _missing_docstrings(path, config.non_trivial_min_lines)
        entry = files_report.setdefault(relative, {})
        entry["missing_docstrings"] = len(findings)
        entry["missing_docstrings_allowed"] = config.legacy_missing_docstrings.get(relative, 0)
        entry["missing_docstrings_legacy"] = relative in config.legacy_missing_docstrings
        entry["missing_docstring_symbols"] = [
            {
                "kind": finding.kind,
                "name": finding.name,
                "lineno": finding.lineno,
                "length": finding.length,
            }
            for finding in findings
        ]
        if len(findings) > entry["missing_docstrings_allowed"]:
            # Only findings beyond the legacy allowance become violations, so
            # existing debt is tolerated but cannot grow.
            excess = findings[entry["missing_docstrings_allowed"] :]
            for finding in excess:
                violations.append(
                    Violation(
                        "docstrings",
                        relative,
                        f"missing docstring for {finding.kind} {finding.name} at line {finding.lineno}",
                    )
                )
    # --- Coverage check: report on every root file, enforce only named targets.
    coverage_target_set = set(config.coverage_targets)
    coverage_root_files = {
        _relative(path, repo_root)
        for path in _iter_python_files(repo_root, config.coverage_roots)
    }
    for relative in sorted(coverage_root_files):
        entry = files_report.setdefault(relative, {})
        details = coverage.get(relative) or {}
        value = details.get("adjusted_percent")
        entry["coverage_percent"] = value
        entry["coverage_raw_percent"] = details.get("raw_percent")
        entry["coverage_excluded_lines"] = details.get("excluded_lines") or []
        entry["coverage_enforced"] = relative in coverage_target_set
        if relative not in coverage_target_set:
            # Non-target files are reported for visibility but never fail.
            continue
        entry["coverage_target"] = config.coverage_threshold
        if value is None:
            # Enforced target with no coverage data is itself a violation.
            violations.append(
                Violation("coverage", relative, f"missing coverage data for {relative}")
            )
            continue
        if value < config.coverage_threshold:
            violations.append(
                Violation(
                    "coverage",
                    relative,
                    f"{relative} coverage {value:.2f}% is below {config.coverage_threshold:.2f}%",
                )
            )
    # A missing artifact cannot satisfy any enforced target; flag it once.
    if coverage_target_set and not coverage_artifact_present:
        violations.append(
            Violation("coverage", "build/coverage.json", "coverage artifact missing for enforced coverage targets")
        )
    summary = {
        "violations_total": len(violations),
        "line_count_violations": sum(item.check == "line_count" for item in violations),
        "docstring_violations": sum(item.check == "docstrings" for item in violations),
        "coverage_violations": sum(item.check == "coverage" for item in violations),
        "legacy_line_count_files": len(config.legacy_max_lines),
        "legacy_docstring_files": len(config.legacy_missing_docstrings),
        "coverage_targets": len(coverage_target_set),
        "coverage_exemptions": max(len(coverage_root_files) - len(coverage_target_set), 0),
    }
    return {
        "status": "ok" if not violations else "failed",
        "rules": {
            "max_lines": config.max_lines,
            "docstring_non_trivial_min_lines": config.non_trivial_min_lines,
            "coverage_threshold": config.coverage_threshold,
        },
        "summary": summary,
        "violations": _serialize_violations(violations),
        # Sorted keys keep the JSON artifact diff-stable across runs.
        "files": dict(sorted(files_report.items())),
    }
def _print_report(report: dict[str, Any]) -> None:
"""Render a concise CLI summary for local and Jenkins logs.
Inputs: the JSON-ready report produced by the gate.
Outputs: human-readable lines that point directly at each violation so a
failing build is easy to fix.
"""
print(json.dumps(report.get("summary") or {}, indent=2, sort_keys=True))
for violation in report.get("violations") or []:
print(f"[{violation['check']}] {violation['path']}: {violation['message']}")
def parse_args() -> argparse.Namespace:
"""Parse CLI arguments for the quality gate.
Inputs: command-line flags supplied by Jenkins or a local developer.
Outputs: normalized paths and options so the gate stays scriptable and
predictable in every environment.
"""
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--config", default="quality_gate.toml", help="path to the quality-gate TOML config")
parser.add_argument("--coverage-json", default="build/coverage.json", help="path to Slipcover JSON output")
parser.add_argument("--output", default="build/quality-gate.json", help="path to write the JSON report")
return parser.parse_args()
def main() -> int:
    """Run the Ariadne quality gate and write its JSON report.

    Inputs: CLI arguments naming the config and optional coverage artifact.
    Outputs: a persisted JSON report and a process exit code that Jenkins can
    use to enforce quality rules.
    """
    args = parse_args()
    repo_root = Path.cwd()
    config = _load_config(repo_root / args.config)
    coverage_path = repo_root / args.coverage_json
    artifact_present = coverage_path.exists()
    coverage = _load_coverage(coverage_path if artifact_present else None, repo_root)
    report = _build_report(repo_root, config, coverage, artifact_present)
    destination = repo_root / args.output
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(json.dumps(report, indent=2, sort_keys=True) + "\n", encoding="utf-8")
    _print_report(report)
    return 0 if report["status"] == "ok" else 1
# Script entry point: propagate main()'s exit code to the shell/CI runner.
if __name__ == "__main__":
    raise SystemExit(main())