"""Source-of-truth quality-gate runner for titan-iac.""" from __future__ import annotations import argparse import json import subprocess import sys import time from pathlib import Path from typing import Any from testing.quality_contract import load_contract from testing.quality_coverage import ( compute_workspace_line_coverage, run_check as run_coverage_check, ) from testing.quality_docs import run_check as run_docs_check from testing.quality_hygiene import ( count_files_over_line_limit, run_check as run_hygiene_check, ) RUFF_SELECT = ["F", "B", "SIM", "C4", "UP"] RUFF_IGNORE = ["B017", "UP015", "UP035"] def _status_from_issues(issues: list[str]) -> str: """Map an issue list to the gate status string.""" return "ok" if not issues else "failed" def _result(name: str, description: str, status: str, **extra: Any) -> dict[str, Any]: """Build a JSON-serializable result record for the summary payload.""" return {"name": name, "description": description, "status": status, **extra} def _run_ruff(contract: dict[str, Any], root: Path) -> dict[str, Any]: """Run the pedantic ruff subset against the contract's lint paths.""" command = [ sys.executable, "-m", "ruff", "check", "--select", ",".join(RUFF_SELECT), "--ignore", ",".join(RUFF_IGNORE), *contract.get("lint_paths", []), ] started_at = time.monotonic() completed = subprocess.run(command, cwd=root, check=False) return _result( "smell", "Code-smell lint for managed Python automation.", "ok" if completed.returncode == 0 else "failed", returncode=completed.returncode, command=command, duration_seconds=round(time.monotonic() - started_at, 3), ) def _run_pytest_suite(root: Path, suite_name: str, suite: dict[str, Any]) -> dict[str, Any]: """Run a pytest suite and collect junit/coverage artifact locations.""" junit_path = root / suite["junit"] junit_path.parent.mkdir(parents=True, exist_ok=True) command = [ sys.executable, "-m", "pytest", "-q", *suite.get("paths", []), f"--junitxml={junit_path}", ] coverage_xml = suite.get("coverage_xml") if coverage_xml: for source in suite.get("coverage_sources", []): command.append(f"--cov={source}") command.extend( [ "--cov-branch", f"--cov-report=xml:{root / coverage_xml}", ] ) started_at = time.monotonic() completed = subprocess.run(command, cwd=root, check=False) return _result( suite_name, suite["description"], "ok" if completed.returncode == 0 else "failed", returncode=completed.returncode, command=command, junit=str(junit_path.relative_to(root)), coverage_xml=coverage_xml, duration_seconds=round(time.monotonic() - started_at, 3), ) def run_profile( contract: dict[str, Any], root: Path, profile_name: str, build_dir: Path, ) -> dict[str, Any]: """Execute the configured profile and return a JSON-serializable summary.""" build_dir.mkdir(parents=True, exist_ok=True) results: list[dict[str, Any]] = [] profiles = contract.get("profiles", {}) if profile_name not in profiles: raise SystemExit(f"unknown profile: {profile_name}") for check_name in profiles[profile_name]: if check_name == "docs": issues = run_docs_check(contract, root) results.append( _result( "docs", "Required docs, contract descriptions, and module docstrings.", _status_from_issues(issues), issues=issues, ) ) continue if check_name == "smell": results.append(_run_ruff(contract, root)) continue if check_name == "hygiene": issues = run_hygiene_check(contract, root) results.append( _result( "hygiene", "500 LOC hygiene and naming rules for managed test automation.", _status_from_issues(issues), issues=issues, ) ) continue if check_name == "coverage": unit_suite = contract.get("pytest_suites", {}).get("unit", {}) coverage_xml = root / unit_suite.get("coverage_xml", "build/coverage-unit.xml") issues = run_coverage_check(contract, root, coverage_xml) results.append( _result( "coverage", "Per-file 95% coverage floor for tracked quality-managed modules.", _status_from_issues(issues), issues=issues, coverage_xml=str(coverage_xml.relative_to(root)), ) ) continue suite = contract.get("pytest_suites", {}).get(check_name) if suite is None: raise SystemExit(f"profile {profile_name} references unknown check: {check_name}") results.append(_run_pytest_suite(root, check_name, suite)) status = "ok" if all(item["status"] == "ok" for item in results) else "failed" workspace_line_coverage_percent = 0.0 if "coverage" in profiles[profile_name]: unit_suite = contract.get("pytest_suites", {}).get("unit", {}) coverage_xml_rel = unit_suite.get("coverage_xml") if coverage_xml_rel: workspace_line_coverage_percent = compute_workspace_line_coverage( contract, root, root / coverage_xml_rel, ) return { "profile": profile_name, "status": status, "results": results, "manual_scripts": contract.get("manual_scripts", []), "workspace_line_coverage_percent": workspace_line_coverage_percent, "source_lines_over_500": count_files_over_line_limit(contract, root), } def main(argv: list[str] | None = None) -> int: """CLI entrypoint for the quality gate.""" parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--profile", default="local") parser.add_argument("--build-dir", default="build") args = parser.parse_args(argv) root = Path.cwd() build_dir = root / args.build_dir build_dir.mkdir(parents=True, exist_ok=True) contract = load_contract() summary = run_profile(contract, root, args.profile, build_dir) summary_path = build_dir / "quality-gate-summary.json" summary_path.write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n", encoding="utf-8") return 0 if summary["status"] == "ok" else 1 if __name__ == "__main__": raise SystemExit(main())