from __future__ import annotations
|
|
|
|
"""Unified quality gate for the repo's managed production scope."""
|
|
|
|
import argparse
|
|
import ast
|
|
import json
|
|
import re
|
|
import xml.etree.ElementTree as ET
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Iterable
|
|
|
|
ROOT = Path(__file__).resolve().parents[2]
|
|
DEFAULT_CONTRACT = ROOT / "testing" / "quality_contract.json"
|
|
DEFAULT_BACKEND_COVERAGE = ROOT / "build" / "backend-coverage.xml"
|
|
DEFAULT_FRONTEND_COVERAGE = ROOT / "frontend" / "coverage" / "coverage-summary.json"
|
|
|
|
TEXT_EXTENSIONS = {".py", ".js", ".mjs", ".ts", ".vue", ".json", ".yaml", ".yml"}
|
|
DOCSTRING_MIN_LINES = 10
|
|
|
|
|
|
@dataclass(frozen=True)
class GateIssue:
    """Describe one violated gate condition."""

    # Gate category; the code in this module emits "loc", "docstring",
    # or "coverage".
    check: str
    # File path (or repo-relative key) the issue applies to.
    path: str
    # Human-readable explanation of the violation.
    message: str
|
|
|
|
|
|
def load_contract(path: Path) -> dict:
    """Load and parse the JSON gate contract stored at *path*."""
    raw = path.read_text()
    return json.loads(raw)
|
|
|
|
|
|
def _resolve(path_str: str) -> Path:
    """Interpret *path_str* relative to the repo root unless it is absolute."""
    candidate = Path(path_str)
    if candidate.is_absolute():
        return candidate
    return ROOT / candidate
|
|
|
|
|
|
def _count_lines(path: Path) -> int:
|
|
return len(path.read_text().splitlines())
|
|
|
|
|
|
def check_file_sizes(paths: Iterable[Path], *, max_lines: int = 500) -> list[GateIssue]:
    """Flag text files that exceed the maximum line budget."""
    issues: list[GateIssue] = []
    for candidate in paths:
        # Only gate files that exist and look like managed text sources.
        if not candidate.exists():
            continue
        if candidate.suffix.lower() not in TEXT_EXTENSIONS:
            continue
        total = _count_lines(candidate)
        if total > max_lines:
            issues.append(GateIssue("loc", str(candidate), f"{total} lines exceeds {max_lines}"))
    return issues
|
|
|
|
|
|
def _node_span(node: ast.AST) -> int:
|
|
"""Return the physical source span for a parsed Python definition."""
|
|
|
|
start = getattr(node, "lineno", 0)
|
|
end = getattr(node, "end_lineno", start)
|
|
return max(end - start + 1, 1)
|
|
|
|
|
|
def _is_nontrivial_python_node(node: ast.AST) -> bool:
    """Decide whether a Python definition needs an explicit contract.

    WHY: the gate should document public APIs and meaningful logic without
    forcing noisy docstrings on tiny private glue helpers.
    """
    name = getattr(node, "name", "")
    big_enough = _node_span(node) >= DOCSTRING_MIN_LINES
    if isinstance(node, ast.ClassDef):
        # Public classes always need docs; private ones only when sizable.
        return big_enough if name.startswith("_") else True
    if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
        return False
    if name.startswith("__") and name.endswith("__"):
        # Dunder methods have well-known contracts; require docs only when long.
        return big_enough
    if name.startswith("_"):
        # Private helpers: only document the sizable ones.
        return big_enough
    return True
|
|
|
|
|
|
def _python_node_issues(path: Path) -> list[GateIssue]:
    """Require docstrings on non-trivial Python functions and classes."""
    tree = ast.parse(path.read_text())
    found: list[GateIssue] = []
    definitions = (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)
    for node in ast.walk(tree):
        if not isinstance(node, definitions):
            continue
        if _is_nontrivial_python_node(node) and not ast.get_docstring(node):
            detail = f"missing docstring on {node.__class__.__name__} {node.name}"
            found.append(GateIssue("docstring", str(path), detail))
    return found
|
|
|
|
|
|
# Match a top-level (optionally exported) JS function declaration, capturing its name.
_FUNCTION_RE = re.compile(r"^\s*(?:export\s+)?function\s+([A-Za-z_$][\w$]*)\s*\(")
# Match a JS class declaration, capturing the class name.
_CLASS_RE = re.compile(r"^\s*class\s+([A-Za-z_$][\w$]*)\s*")
|
|
|
|
|
|
def _has_js_contract(lines: list[str], index: int) -> bool:
|
|
"""Check whether the nearest leading comment block documents a JS function."""
|
|
|
|
seen_comment = False
|
|
for pos in range(index - 1, -1, -1):
|
|
raw = lines[pos].rstrip()
|
|
stripped = raw.strip()
|
|
if not stripped:
|
|
if seen_comment:
|
|
continue
|
|
continue
|
|
if stripped.startswith("//"):
|
|
seen_comment = True
|
|
if "WHY:" in stripped or "@param" in stripped or "@returns" in stripped:
|
|
return True
|
|
continue
|
|
if stripped.startswith("*"):
|
|
seen_comment = True
|
|
if "WHY:" in stripped or "@param" in stripped or "@returns" in stripped:
|
|
return True
|
|
continue
|
|
if stripped.endswith("*/"):
|
|
seen_comment = True
|
|
if "WHY:" in stripped or "@param" in stripped or "@returns" in stripped:
|
|
return True
|
|
continue
|
|
if stripped.startswith("/**"):
|
|
seen_comment = True
|
|
if "WHY:" in stripped or "@param" in stripped or "@returns" in stripped:
|
|
return True
|
|
continue
|
|
break
|
|
return seen_comment and any(
|
|
marker in line for line in lines[max(0, index - 6):index] for marker in ("WHY:", "@param", "@returns")
|
|
)
|
|
|
|
|
|
def _is_nontrivial_js_definition(lines: list[str], index: int) -> bool:
    """Decide whether a JavaScript definition needs a leading contract comment.

    Exported definitions always qualify; otherwise only bodies that span at
    least DOCSTRING_MIN_LINES physical lines do.
    """
    header = lines[index]
    # Look for "export" only before the function/class keyword itself.
    prefix = header.split("function", 1)[0].split("class", 1)[0]
    if "export" in prefix:
        return True
    balance = 0
    for offset, text in enumerate(lines[index:], start=1):
        balance += text.count("{") - text.count("}")
        if offset >= DOCSTRING_MIN_LINES:
            return True
        # Body closed before reaching the size threshold: trivial.
        if offset > 1 and balance <= 0:
            return False
    return False
|
|
|
|
|
|
def _js_node_issues(path: Path) -> list[GateIssue]:
    """Require leading contract comments for non-trivial JS functions/classes."""
    source_lines = path.read_text().splitlines()
    found: list[GateIssue] = []
    for row, text in enumerate(source_lines):
        hit = _FUNCTION_RE.match(text) or _CLASS_RE.match(text)
        if hit is None:
            continue
        if not _is_nontrivial_js_definition(source_lines, row):
            continue
        if not _has_js_contract(source_lines, row):
            found.append(
                GateIssue("docstring", str(path), f"missing contract comment on {hit.group(1)}")
            )
    return found
|
|
|
|
|
|
def check_docstrings(paths: Iterable[Path]) -> list[GateIssue]:
    """Check that managed production files document non-trivial definitions."""
    issues: list[GateIssue] = []
    js_like = {".js", ".mjs", ".ts", ".vue"}
    for candidate in paths:
        if not candidate.exists():
            continue
        ext = candidate.suffix.lower()
        if ext == ".py":
            issues.extend(_python_node_issues(candidate))
        elif ext in js_like:
            issues.extend(_js_node_issues(candidate))
    return issues
|
|
|
|
|
|
def _normalize_key(value: str) -> str:
|
|
return value.replace("\\", "/").lstrip("./")
|
|
|
|
|
|
def _path_suffixes(value: str) -> set[str]:
    """Return every path-tail of *value* (e.g. "a/b/c" -> {"a/b/c", "b/c", "c"})."""
    segments = _normalize_key(value).split("/")
    tails: set[str] = set()
    for start in range(len(segments)):
        tails.add("/".join(segments[start:]))
    return tails
|
|
|
|
|
|
def _coverage_lookup(report: dict, wanted: str) -> dict | None:
    """Find the coverage entry in *report* whose key matches *wanted*.

    Keys are compared after normalization; a match allows either side to carry
    extra leading path segments. Returns the first matching entry in report
    order, or None when nothing matches.
    """
    target = _normalize_key(wanted)
    target_tails = _path_suffixes(target)
    for key, entry in report.items():
        # Only consider well-formed coverage entries.
        if not isinstance(entry, dict) or "lines" not in entry:
            continue
        normalized = _normalize_key(key)
        if normalized == target or normalized in target_tails:
            return entry
        if any(normalized.endswith(f"/{tail}") for tail in target_tails):
            return entry
    return None
|
|
|
|
|
|
def _load_frontend_coverage(path: Path) -> dict:
|
|
data = json.loads(path.read_text())
|
|
return {key: value for key, value in data.items() if isinstance(value, dict)}
|
|
|
|
|
|
def _load_backend_coverage(path: Path) -> dict[str, dict[str, float]]:
    """Parse a Cobertura-style XML report into per-file rate percentages."""
    document_root = ET.parse(path).getroot()
    coverage: dict[str, dict[str, float]] = {}
    for node in document_root.findall(".//class"):
        filename = node.attrib.get("filename")
        if not filename:
            continue
        # Cobertura stores rates as 0..1 fractions; scale to percentages.
        line_rate = float(node.attrib.get("line-rate", "0"))
        branch_rate = float(node.attrib.get("branch-rate", "0"))
        coverage[_normalize_key(filename)] = {
            "lines": line_rate * 100,
            "branches": branch_rate * 100,
        }
    return coverage
|
|
|
|
|
|
def check_coverage(
    paths: Iterable[Path],
    *,
    backend_report: Path,
    frontend_report: Path,
    threshold: float = 95.0,
) -> list[GateIssue]:
    """Check the per-file coverage floor for the managed production scope."""
    backend_cov: dict = {}
    if backend_report.exists():
        backend_cov = _load_backend_coverage(backend_report)
    frontend_cov: dict = {}
    if frontend_report.exists():
        frontend_cov = _load_frontend_coverage(frontend_report)

    issues: list[GateIssue] = []
    for candidate in paths:
        if not candidate.exists():
            continue
        if candidate.is_absolute():
            rel = candidate.relative_to(ROOT).as_posix()
        else:
            rel = _normalize_key(str(candidate))
        if rel.startswith("backend/"):
            metrics = _coverage_lookup(backend_cov, rel)
            if metrics is None:
                issues.append(GateIssue("coverage", rel, "missing from backend coverage report"))
            elif metrics["lines"] < threshold:
                issues.append(
                    GateIssue("coverage", rel, f"line coverage {metrics['lines']:.2f}% below {threshold}%")
                )
        elif rel.startswith("frontend/"):
            # Frontend summaries key files relative to the frontend directory.
            lookup = rel.split("frontend/", 1)[1]
            metrics = _coverage_lookup(frontend_cov, lookup)
            if metrics is None:
                issues.append(GateIssue("coverage", rel, "missing from frontend coverage report"))
                continue
            pct = metrics.get("lines", {}).get("pct", 0.0)
            if pct < threshold:
                issues.append(
                    GateIssue("coverage", rel, f"line coverage {pct:.2f}% below {threshold}%")
                )
    return issues
|
|
|
|
|
|
def _coverage_values_for_paths(
    paths: Iterable[Path],
    *,
    backend_report: Path,
    frontend_report: Path,
) -> list[float]:
    """Return per-file line coverage values for tracked backend/frontend files."""
    backend_cov = _load_backend_coverage(backend_report) if backend_report.exists() else {}
    frontend_cov = _load_frontend_coverage(frontend_report) if frontend_report.exists() else {}
    collected: list[float] = []

    for candidate in paths:
        if not candidate.exists():
            continue
        rel = (
            candidate.relative_to(ROOT).as_posix()
            if candidate.is_absolute()
            else _normalize_key(str(candidate))
        )
        if rel.startswith("backend/"):
            entry = _coverage_lookup(backend_cov, rel)
            if entry is not None:
                # Backend entries store percentages directly under "lines".
                collected.append(float(entry.get("lines", 0.0)))
        elif rel.startswith("frontend/"):
            entry = _coverage_lookup(frontend_cov, rel.split("frontend/", 1)[1])
            if entry is not None:
                # Frontend entries nest the percentage under lines.pct.
                collected.append(float(entry.get("lines", {}).get("pct", 0.0)))
    return collected
|
|
|
|
|
|
def _build_parser() -> argparse.ArgumentParser:
    """Construct the CLI argument parser for the quality gate."""
    parser = argparse.ArgumentParser(description="Run the repo's unified quality gate")
    options = (
        ("--contract", str(DEFAULT_CONTRACT), "Path to the JSON gate contract"),
        ("--backend-coverage", str(DEFAULT_BACKEND_COVERAGE), "Backend coverage XML"),
        ("--frontend-coverage", str(DEFAULT_FRONTEND_COVERAGE), "Frontend coverage summary JSON"),
        ("--report", str(ROOT / "build" / "quality-gate.json"), "Write a JSON report here"),
    )
    for flag, default, help_text in options:
        parser.add_argument(flag, default=default, help=help_text)
    return parser
|
|
|
|
|
|
def run_gate(contract_path: Path, *, backend_coverage: Path, frontend_coverage: Path) -> tuple[list[GateIssue], dict]:
    """Run every gate check named in the contract and build the JSON report.

    Returns the list of violations plus a serializable report dict.
    """
    contract = load_contract(contract_path)
    managed_files = [_resolve(entry) for entry in contract["managed_files"]]
    docstring_files = [_resolve(entry) for entry in contract["docstring_files"]]
    coverage_files = [_resolve(entry) for entry in contract["coverage_files"]]
    max_lines = int(contract.get("max_lines", 500))
    threshold = float(contract.get("coverage_threshold_pct", 95))

    issues = [
        *check_file_sizes(managed_files, max_lines=max_lines),
        *check_docstrings(docstring_files),
        *check_coverage(
            coverage_files,
            backend_report=backend_coverage,
            frontend_report=frontend_coverage,
            threshold=threshold,
        ),
    ]

    coverage_values = _coverage_values_for_paths(
        coverage_files,
        backend_report=backend_coverage,
        frontend_report=frontend_coverage,
    )
    # Workspace-wide figure is the mean of the tracked per-file percentages.
    if coverage_values:
        workspace_pct = round(sum(coverage_values) / len(coverage_values), 3)
    else:
        workspace_pct = 0.0
    oversized = sum(1 for issue in issues if issue.check == "loc")

    report = {
        "managed_files": [str(entry.relative_to(ROOT)) for entry in managed_files],
        "docstring_files": [str(entry.relative_to(ROOT)) for entry in docstring_files],
        "coverage_files": [str(entry.relative_to(ROOT)) for entry in coverage_files],
        "max_lines": max_lines,
        "coverage_threshold_pct": threshold,
        "workspace_line_coverage_percent": workspace_pct,
        "source_lines_over_500": oversized,
        "issue_count": len(issues),
        "issues": [issue.__dict__ for issue in issues],
    }
    return issues, report
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """CLI entry point: run the gate, write the report, and return an exit code."""
    args = _build_parser().parse_args(argv)
    backend_coverage = _resolve(args.backend_coverage)
    frontend_coverage = _resolve(args.frontend_coverage)
    report_path = _resolve(args.report)
    issues, report = run_gate(
        _resolve(args.contract),
        backend_coverage=backend_coverage,
        frontend_coverage=frontend_coverage,
    )
    report_path.parent.mkdir(parents=True, exist_ok=True)
    report_path.write_text(json.dumps(report, indent=2, sort_keys=True) + "\n")

    for issue in issues:
        print(f"{issue.check}: {issue.path}: {issue.message}")

    if issues:
        print(f"quality gate failed: {len(issues)} issue(s)")
        return 1

    print(f"quality gate passed: {len(report['managed_files'])} managed files checked")
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point; SystemExit carries the gate's exit code to the shell.
    raise SystemExit(main())
|