408 lines
15 KiB
Python
408 lines
15 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""Enforce Ariadne's ratcheting test-quality gate.
|
||
|
|
|
||
|
|
Inputs: repository Python files, optional coverage JSON, and a TOML config that
|
||
|
|
captures the current legacy exceptions.
|
||
|
|
Outputs: a JSON report plus a non-zero exit code when file-size, docstring, or
|
||
|
|
coverage requirements regress, so CI can block quality drift while allowing
|
||
|
|
incremental cleanup.
|
||
|
|
"""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import argparse
|
||
|
|
import ast
|
||
|
|
import json
|
||
|
|
from dataclasses import dataclass
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import Any
|
||
|
|
import tomllib
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass(frozen=True)
class DefinitionFinding:
    """Describe a public definition missing a required docstring.

    Inputs: the symbol kind, name, source line, and logical size.
    Outputs: a compact record that makes docstring failures actionable in CLI
    output and in the JSON quality report.
    """

    kind: str  # AST node class name, e.g. "FunctionDef" or "ClassDef".
    name: str  # Public symbol name as written in the scanned module.
    lineno: int  # 1-based source line where the definition starts.
    length: int  # Inclusive line span of the definition (always >= 1).
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass(frozen=True)
class Violation:
    """Represent a single quality-gate violation.

    Inputs: the violated check name, file path, and a human-readable message.
    Outputs: a normalized record for console rendering and JSON serialization so
    Jenkins and local runs report the same facts.
    """

    check: str  # Which gate failed: "line_count", "docstrings", or "coverage".
    path: str  # Repository-relative POSIX path (or artifact path) at fault.
    message: str  # Human-readable explanation rendered in CLI and JSON output.
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass(frozen=True)
class QualityConfig:
    """Typed quality-gate settings loaded from TOML.

    Inputs: parsed configuration sections for file-size, docstrings, and
    coverage enforcement.
    Outputs: one immutable object so the gate logic stays deterministic and easy
    to test.
    """

    line_roots: tuple[str, ...]  # Directories scanned for the file-size check.
    max_lines: int  # Default per-file line ceiling.
    legacy_max_lines: dict[str, int]  # Per-file legacy line-count allowances.
    docstring_roots: tuple[str, ...]  # Directories scanned for docstring checks.
    non_trivial_min_lines: int  # Minimum span before a docstring is required.
    legacy_missing_docstrings: dict[str, int]  # Per-file legacy docstring debt.
    coverage_roots: tuple[str, ...]  # Directories reported on for coverage.
    coverage_targets: tuple[str, ...]  # Files where the threshold is enforced.
    coverage_threshold: float  # Minimum adjusted coverage percentage.
|
||
|
|
|
||
|
|
|
||
|
|
def _load_config(path: Path) -> QualityConfig:
    """Load the quality-gate config from TOML.

    Inputs: the path to the repository-local TOML config file.
    Outputs: validated `QualityConfig` values used by every gate check so local
    runs and Jenkins share the same policy.
    """

    document = tomllib.loads(path.read_text(encoding="utf-8"))
    files_section = document.get("files") or {}
    docstrings_section = document.get("docstrings") or {}
    coverage_section = document.get("coverage") or {}
    legacy_section = document.get("legacy") or {}

    def as_roots(raw: Any, fallback: tuple[str, ...]) -> tuple[str, ...]:
        # Normalize a configured root list, falling back to built-in defaults.
        return tuple(str(item) for item in (raw or fallback))

    def as_allowances(raw: Any) -> dict[str, int]:
        # Normalize a legacy-exception table into {relative path: allowance}.
        return {str(key): int(value) for key, value in (raw or {}).items()}

    return QualityConfig(
        line_roots=as_roots(files_section.get("roots"), ("ariadne", "tests", "scripts")),
        max_lines=int(files_section.get("max_lines", 500)),
        legacy_max_lines=as_allowances(legacy_section.get("line_count")),
        docstring_roots=as_roots(docstrings_section.get("roots"), ("ariadne", "scripts")),
        non_trivial_min_lines=int(docstrings_section.get("non_trivial_min_lines", 6)),
        legacy_missing_docstrings=as_allowances(legacy_section.get("docstrings")),
        coverage_roots=as_roots(coverage_section.get("roots"), ("ariadne",)),
        coverage_targets=as_roots(coverage_section.get("targets"), ()),
        coverage_threshold=float(coverage_section.get("threshold", 95.0)),
    )
|
||
|
|
|
||
|
|
|
||
|
|
def _iter_python_files(repo_root: Path, roots: tuple[str, ...]) -> list[Path]:
|
||
|
|
"""Collect Python files under the configured roots.
|
||
|
|
|
||
|
|
Inputs: the repository root plus the roots to scan.
|
||
|
|
Outputs: sorted Python paths so the gate produces stable results and diffs.
|
||
|
|
"""
|
||
|
|
|
||
|
|
files: list[Path] = []
|
||
|
|
for root in roots:
|
||
|
|
base = repo_root / root
|
||
|
|
if not base.exists():
|
||
|
|
continue
|
||
|
|
files.extend(sorted(base.rglob("*.py")))
|
||
|
|
return sorted({path for path in files})
|
||
|
|
|
||
|
|
|
||
|
|
def _relative(path: Path, repo_root: Path) -> str:
|
||
|
|
return path.relative_to(repo_root).as_posix()
|
||
|
|
|
||
|
|
|
||
|
|
def _line_count(path: Path) -> int:
|
||
|
|
return len(path.read_text(encoding="utf-8").splitlines())
|
||
|
|
|
||
|
|
|
||
|
|
def _definition_length(node: ast.AST) -> int:
|
||
|
|
end_lineno = getattr(node, "end_lineno", None) or getattr(node, "lineno", 0)
|
||
|
|
return max(end_lineno - getattr(node, "lineno", 0) + 1, 1)
|
||
|
|
|
||
|
|
|
||
|
|
def _missing_docstrings(path: Path, min_lines: int) -> list[DefinitionFinding]:
    """Find public top-level definitions missing required docstrings.

    Inputs: a Python file path and the minimum logical size considered
    non-trivial.
    Outputs: missing-docstring findings so the gate can ratchet legacy files
    while blocking new undocumented public APIs.
    """

    tree = ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
    definition_types = (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)
    findings: list[DefinitionFinding] = []
    for node in tree.body:
        if not isinstance(node, definition_types):
            continue
        if node.name.startswith("_"):
            # Private symbols are exempt from the docstring requirement.
            continue
        size = _definition_length(node)
        # Trivial definitions and already-documented ones both pass.
        if size < min_lines or ast.get_docstring(node) is not None:
            continue
        findings.append(
            DefinitionFinding(
                kind=type(node).__name__,
                name=node.name,
                lineno=getattr(node, "lineno", 1),
                length=size,
            )
        )
    return findings
|
||
|
|
|
||
|
|
|
||
|
|
def _excluded_coverage_lines(path: Path) -> set[int]:
|
||
|
|
"""Collect non-executable lines that Slipcover still reports as missing.
|
||
|
|
|
||
|
|
Inputs: a Python source file path.
|
||
|
|
Outputs: line numbers for multiline definition headers and docstring blocks
|
||
|
|
so adjusted per-file coverage tracks executable logic rather than syntax
|
||
|
|
scaffolding required for readability.
|
||
|
|
"""
|
||
|
|
|
||
|
|
module = ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
|
||
|
|
excluded: set[int] = set()
|
||
|
|
|
||
|
|
def visit(node: ast.AST) -> None:
|
||
|
|
if isinstance(node, ast.Module):
|
||
|
|
for child in node.body:
|
||
|
|
visit(child)
|
||
|
|
return
|
||
|
|
if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
|
||
|
|
return
|
||
|
|
if node.body:
|
||
|
|
body_start = node.body[0].lineno
|
||
|
|
excluded.update(range(node.lineno + 1, body_start))
|
||
|
|
first = node.body[0]
|
||
|
|
if (
|
||
|
|
isinstance(first, ast.Expr)
|
||
|
|
and isinstance(first.value, ast.Constant)
|
||
|
|
and isinstance(first.value.value, str)
|
||
|
|
):
|
||
|
|
excluded.update(range(first.lineno, (getattr(first, "end_lineno", first.lineno) or first.lineno) + 1))
|
||
|
|
for child in node.body:
|
||
|
|
visit(child)
|
||
|
|
|
||
|
|
visit(module)
|
||
|
|
return excluded
|
||
|
|
|
||
|
|
|
||
|
|
def _load_coverage(path: Path | None, repo_root: Path) -> dict[str, dict[str, Any]]:
|
||
|
|
"""Read per-file coverage details from Slipcover JSON output.
|
||
|
|
|
||
|
|
Inputs: the optional coverage artifact path produced by the test run.
|
||
|
|
Outputs: raw and adjusted per-file coverage details so the gate can enforce
|
||
|
|
realistic thresholds even when Slipcover counts docstrings and wrapped
|
||
|
|
signatures as missing lines.
|
||
|
|
"""
|
||
|
|
|
||
|
|
if path is None or not path.exists():
|
||
|
|
return {}
|
||
|
|
payload = json.loads(path.read_text(encoding="utf-8"))
|
||
|
|
files = payload.get("files") or {}
|
||
|
|
coverage: dict[str, dict[str, Any]] = {}
|
||
|
|
for name, data in files.items():
|
||
|
|
if not isinstance(data, dict):
|
||
|
|
continue
|
||
|
|
summary = data.get("summary") or {}
|
||
|
|
percent = summary.get("percent_covered")
|
||
|
|
executed_lines = {int(line) for line in data.get("executed_lines") or [] if isinstance(line, int)}
|
||
|
|
missing_lines = {int(line) for line in data.get("missing_lines") or [] if isinstance(line, int)}
|
||
|
|
relative = str(name)
|
||
|
|
source_path = repo_root / relative
|
||
|
|
excluded_lines = _excluded_coverage_lines(source_path) if source_path.exists() else set()
|
||
|
|
adjusted_missing = missing_lines - excluded_lines
|
||
|
|
adjusted_total = len(executed_lines) + len(adjusted_missing)
|
||
|
|
adjusted_percent = 100.0 if adjusted_total == 0 else (len(executed_lines) / adjusted_total) * 100.0
|
||
|
|
coverage[relative] = {
|
||
|
|
"raw_percent": float(percent) if isinstance(percent, (int, float)) else None,
|
||
|
|
"adjusted_percent": adjusted_percent,
|
||
|
|
"excluded_lines": sorted(excluded_lines),
|
||
|
|
}
|
||
|
|
return coverage
|
||
|
|
|
||
|
|
|
||
|
|
def _serialize_violations(violations: list[Violation]) -> list[dict[str, str]]:
|
||
|
|
return [{"check": item.check, "path": item.path, "message": item.message} for item in violations]
|
||
|
|
|
||
|
|
|
||
|
|
def _build_report(
    repo_root: Path,
    config: QualityConfig,
    coverage: dict[str, dict[str, Any]],
    coverage_artifact_present: bool,
) -> dict[str, Any]:
    """Run all configured checks and build a JSON-serializable report.

    Inputs: repository paths, quality-gate config, and per-file coverage data.
    Outputs: a complete report for CI artifacts, metrics publication, and local
    debugging when the gate fails.
    """

    violations: list[Violation] = []
    # Per-file details keyed by repository-relative path; each check below
    # merges its fields into the same entry via setdefault.
    files_report: dict[str, dict[str, Any]] = {}

    # Check 1: file-size ratchet. Legacy files get their recorded allowance,
    # everything else the global ceiling.
    for path in _iter_python_files(repo_root, config.line_roots):
        relative = _relative(path, repo_root)
        lines = _line_count(path)
        entry = files_report.setdefault(relative, {})
        entry["lines"] = lines
        entry["line_limit"] = config.legacy_max_lines.get(relative, config.max_lines)
        entry["line_limit_legacy"] = relative in config.legacy_max_lines
        if lines > entry["line_limit"]:
            violations.append(
                Violation(
                    "line_count",
                    relative,
                    f"{relative} has {lines} lines; allowed maximum is {entry['line_limit']}",
                )
            )

    # Check 2: docstring ratchet. Each file may carry a legacy allowance of
    # missing docstrings; only findings beyond that allowance fail the gate.
    for path in _iter_python_files(repo_root, config.docstring_roots):
        relative = _relative(path, repo_root)
        findings = _missing_docstrings(path, config.non_trivial_min_lines)
        entry = files_report.setdefault(relative, {})
        entry["missing_docstrings"] = len(findings)
        entry["missing_docstrings_allowed"] = config.legacy_missing_docstrings.get(relative, 0)
        entry["missing_docstrings_legacy"] = relative in config.legacy_missing_docstrings
        entry["missing_docstring_symbols"] = [
            {
                "kind": finding.kind,
                "name": finding.name,
                "lineno": finding.lineno,
                "length": finding.length,
            }
            for finding in findings
        ]
        if len(findings) > entry["missing_docstrings_allowed"]:
            # Only the findings past the allowance become violations, so
            # cleanup can proceed incrementally without churn.
            excess = findings[entry["missing_docstrings_allowed"] :]
            for finding in excess:
                violations.append(
                    Violation(
                        "docstrings",
                        relative,
                        f"missing docstring for {finding.kind} {finding.name} at line {finding.lineno}",
                    )
                )

    # Check 3: coverage. All files under the coverage roots are reported on,
    # but the threshold is enforced only for explicitly-listed targets.
    coverage_target_set = set(config.coverage_targets)
    coverage_root_files = {
        _relative(path, repo_root)
        for path in _iter_python_files(repo_root, config.coverage_roots)
    }
    for relative in sorted(coverage_root_files):
        entry = files_report.setdefault(relative, {})
        details = coverage.get(relative) or {}
        value = details.get("adjusted_percent")
        entry["coverage_percent"] = value
        entry["coverage_raw_percent"] = details.get("raw_percent")
        entry["coverage_excluded_lines"] = details.get("excluded_lines") or []
        entry["coverage_enforced"] = relative in coverage_target_set
        if relative not in coverage_target_set:
            continue
        entry["coverage_target"] = config.coverage_threshold
        if value is None:
            # An enforced target without coverage data is itself a failure.
            violations.append(
                Violation("coverage", relative, f"missing coverage data for {relative}")
            )
            continue
        if value < config.coverage_threshold:
            violations.append(
                Violation(
                    "coverage",
                    relative,
                    f"{relative} coverage {value:.2f}% is below {config.coverage_threshold:.2f}%",
                )
            )

    # A missing artifact would otherwise let every target silently fail open.
    if coverage_target_set and not coverage_artifact_present:
        violations.append(
            Violation("coverage", "build/coverage.json", "coverage artifact missing for enforced coverage targets")
        )

    summary = {
        "violations_total": len(violations),
        "line_count_violations": sum(item.check == "line_count" for item in violations),
        "docstring_violations": sum(item.check == "docstrings" for item in violations),
        "coverage_violations": sum(item.check == "coverage" for item in violations),
        "legacy_line_count_files": len(config.legacy_max_lines),
        "legacy_docstring_files": len(config.legacy_missing_docstrings),
        "coverage_targets": len(coverage_target_set),
        "coverage_exemptions": max(len(coverage_root_files) - len(coverage_target_set), 0),
    }
    return {
        "status": "ok" if not violations else "failed",
        "rules": {
            "max_lines": config.max_lines,
            "docstring_non_trivial_min_lines": config.non_trivial_min_lines,
            "coverage_threshold": config.coverage_threshold,
        },
        "summary": summary,
        "violations": _serialize_violations(violations),
        # Sorted so the JSON report diffs deterministically between runs.
        "files": dict(sorted(files_report.items())),
    }
|
||
|
|
|
||
|
|
|
||
|
|
def _print_report(report: dict[str, Any]) -> None:
|
||
|
|
"""Render a concise CLI summary for local and Jenkins logs.
|
||
|
|
|
||
|
|
Inputs: the JSON-ready report produced by the gate.
|
||
|
|
Outputs: human-readable lines that point directly at each violation so a
|
||
|
|
failing build is easy to fix.
|
||
|
|
"""
|
||
|
|
|
||
|
|
print(json.dumps(report.get("summary") or {}, indent=2, sort_keys=True))
|
||
|
|
for violation in report.get("violations") or []:
|
||
|
|
print(f"[{violation['check']}] {violation['path']}: {violation['message']}")
|
||
|
|
|
||
|
|
|
||
|
|
def parse_args() -> argparse.Namespace:
    """Parse CLI arguments for the quality gate.

    Inputs: command-line flags supplied by Jenkins or a local developer.
    Outputs: normalized paths and options so the gate stays scriptable and
    predictable in every environment.
    """

    parser = argparse.ArgumentParser(description=__doc__)
    # (flag, default, help) triples keep the option table easy to scan.
    options = (
        ("--config", "quality_gate.toml", "path to the quality-gate TOML config"),
        ("--coverage-json", "build/coverage.json", "path to Slipcover JSON output"),
        ("--output", "build/quality-gate.json", "path to write the JSON report"),
    )
    for flag, default, help_text in options:
        parser.add_argument(flag, default=default, help=help_text)
    return parser.parse_args()
|
||
|
|
|
||
|
|
|
||
|
|
def main() -> int:
    """Run the Ariadne quality gate and write its JSON report.

    Inputs: CLI arguments naming the config and optional coverage artifact.
    Outputs: a persisted JSON report and a process exit code that Jenkins can
    use to enforce quality rules.
    """

    args = parse_args()
    repo_root = Path.cwd()

    config = _load_config(repo_root / args.config)
    coverage_path = repo_root / args.coverage_json
    has_coverage = coverage_path.exists()
    coverage = _load_coverage(coverage_path if has_coverage else None, repo_root)
    report = _build_report(repo_root, config, coverage, has_coverage)

    destination = repo_root / args.output
    destination.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(report, indent=2, sort_keys=True) + "\n"
    destination.write_text(serialized, encoding="utf-8")
    _print_report(report)
    return 0 if report["status"] == "ok" else 1
|
||
|
|
|
||
|
|
|
||
|
|
if __name__ == "__main__":
    # Propagate the gate's exit code to CI via SystemExit.
    raise SystemExit(main())
|