titan-iac/testing/quality_gate.py

370 lines
13 KiB
Python

"""Source-of-truth quality-gate runner for titan-iac."""
from __future__ import annotations
import argparse
import base64
import json
import os
import subprocess
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any
from testing.quality_contract import load_contract
from testing.quality_coverage import run_check as run_coverage_check
from testing.quality_docs import run_check as run_docs_check
from testing.quality_hygiene import run_check as run_hygiene_check
RUFF_SELECT = ["F", "B", "SIM", "C4", "UP"]
RUFF_IGNORE = ["B017", "UP015", "UP035"]
def _env_flag(name: str, default: bool) -> bool:
"""Parse a boolean-like environment variable."""
raw = os.getenv(name)
if raw is None:
return default
return raw.strip().lower() in {"1", "true", "yes", "on"}
def _load_json_report(path: Path) -> tuple[dict[str, Any] | None, str | None]:
"""Return parsed JSON report contents or a descriptive error."""
if not path.exists():
return None, f"report missing: {path}"
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except json.JSONDecodeError as exc:
return None, f"report invalid JSON: {path} ({exc})"
if not isinstance(payload, dict):
return None, f"report payload must be an object: {path}"
return payload, None
def _sonarqube_gate_status_from_report(payload: dict[str, Any]) -> str:
"""Extract a SonarQube quality-gate status from a report payload."""
project_status = payload.get("projectStatus")
if isinstance(project_status, dict):
status = project_status.get("status")
if isinstance(status, str):
return status
status = payload.get("status")
if isinstance(status, str):
return status
return ""
def _fetch_sonarqube_gate_status(
host_url: str,
project_key: str,
token: str,
timeout_seconds: float,
) -> tuple[str, str | None]:
"""Query SonarQube for the project's current quality-gate status."""
query = urllib.parse.urlencode({"projectKey": project_key})
request = urllib.request.Request(
f"{host_url.rstrip('/')}/api/qualitygates/project_status?{query}",
method="GET",
)
if token:
encoded = base64.b64encode(f"{token}:".encode()).decode()
request.add_header("Authorization", f"Basic {encoded}")
try:
with urllib.request.urlopen(request, timeout=timeout_seconds) as response:
payload = json.loads(response.read().decode("utf-8"))
except (urllib.error.HTTPError, urllib.error.URLError, TimeoutError, json.JSONDecodeError) as exc:
return "", f"sonarqube query failed: {exc}"
if not isinstance(payload, dict):
return "", "sonarqube query returned non-object payload"
status = _sonarqube_gate_status_from_report(payload)
if status:
return status, None
return "", "sonarqube response missing projectStatus.status"
def _run_sonarqube_check(build_dir: Path) -> dict[str, Any]:
"""Enforce SonarQube quality gate using report or API evidence."""
enforce = _env_flag("QUALITY_GATE_SONARQUBE_ENFORCE", default=True)
report_rel = os.getenv(
"QUALITY_GATE_SONARQUBE_REPORT",
str(build_dir / "sonarqube-quality-gate.json"),
)
report_path = Path(report_rel)
if not report_path.is_absolute():
report_path = Path.cwd() / report_path
host_url = os.getenv("SONARQUBE_HOST_URL", "").strip()
project_key = os.getenv("SONARQUBE_PROJECT_KEY", "").strip()
token = os.getenv("SONARQUBE_TOKEN", "").strip()
timeout_seconds = float(os.getenv("QUALITY_GATE_SONARQUBE_TIMEOUT_SECONDS", "12"))
gate_status = ""
source = ""
issues: list[str] = []
report_payload, report_error = _load_json_report(report_path)
if report_payload is not None:
gate_status = _sonarqube_gate_status_from_report(report_payload).strip()
source = "report"
if not gate_status:
issues.append("sonarqube report missing quality gate status")
elif report_error:
if host_url and project_key:
gate_status, query_error = _fetch_sonarqube_gate_status(host_url, project_key, token, timeout_seconds)
source = "api"
if query_error:
issues.append(query_error)
else:
issues.append(report_error)
if not source and host_url and project_key:
gate_status, query_error = _fetch_sonarqube_gate_status(host_url, project_key, token, timeout_seconds)
source = "api"
if query_error:
issues.append(query_error)
normalized = gate_status.upper()
passed = normalized in {"OK", "PASS", "PASSED"}
if enforce and not passed:
if gate_status:
issues.append(f"sonarqube gate is {gate_status}, expected OK")
else:
issues.append("sonarqube gate status unavailable")
status = "ok" if (passed or not enforce) and not issues else "failed"
return _result(
"sonarqube",
"SonarQube quality gate must pass for the current project.",
status,
enforce=enforce,
source=source or "none",
gate_status=gate_status or "unknown",
report_path=str(report_path),
issues=issues,
)
def _ironbank_status_from_report(payload: dict[str, Any]) -> tuple[str, bool | None]:
"""Extract a compliance status and explicit compliance flag from report payload."""
for key in ("status", "result", "compliance", "compliance_status"):
value = payload.get(key)
if isinstance(value, str) and value.strip():
return value.strip(), None
compliant = payload.get("compliant")
if isinstance(compliant, bool):
return "compliant" if compliant else "noncompliant", compliant
return "", None
def _run_ironbank_check(build_dir: Path) -> dict[str, Any]:
"""Enforce Iron Bank image-hardening compliance from build evidence."""
enforce = _env_flag("QUALITY_GATE_IRONBANK_ENFORCE", default=True)
required = _env_flag("QUALITY_GATE_IRONBANK_REQUIRED", default=True)
report_rel = os.getenv(
"QUALITY_GATE_IRONBANK_REPORT",
str(build_dir / "ironbank-compliance.json"),
)
report_path = Path(report_rel)
if not report_path.is_absolute():
report_path = Path.cwd() / report_path
issues: list[str] = []
status_value = ""
compliant: bool | None = None
source = "none"
report_payload, report_error = _load_json_report(report_path)
if report_payload is not None:
status_value, compliant = _ironbank_status_from_report(report_payload)
source = "report"
elif required:
issues.append(report_error or f"report missing: {report_path}")
normalized = status_value.strip().lower()
passed_status = normalized in {"ok", "pass", "passed", "compliant", "true"}
passed = compliant is True or passed_status
if enforce and required and not passed:
if status_value:
issues.append(f"ironbank compliance is {status_value}, expected compliant")
elif not issues:
issues.append("ironbank compliance status unavailable")
status = "ok" if (passed or not enforce or not required) and not issues else "failed"
return _result(
"ironbank",
"Iron Bank image-hardening compliance must pass for build artifacts.",
status,
enforce=enforce,
required=required,
source=source,
compliance=status_value or "unknown",
report_path=str(report_path),
issues=issues,
)
def _status_from_issues(issues: list[str]) -> str:
return "ok" if not issues else "failed"
def _result(name: str, description: str, status: str, **extra: Any) -> dict[str, Any]:
return {"name": name, "description": description, "status": status, **extra}
def _run_ruff(contract: dict[str, Any], root: Path) -> dict[str, Any]:
command = [
sys.executable,
"-m",
"ruff",
"check",
"--select",
",".join(RUFF_SELECT),
"--ignore",
",".join(RUFF_IGNORE),
*contract.get("lint_paths", []),
]
started_at = time.monotonic()
completed = subprocess.run(command, cwd=root, check=False)
return _result(
"smell",
"Code-smell lint for managed Python automation.",
"ok" if completed.returncode == 0 else "failed",
returncode=completed.returncode,
command=command,
duration_seconds=round(time.monotonic() - started_at, 3),
)
def _run_pytest_suite(root: Path, suite_name: str, suite: dict[str, Any]) -> dict[str, Any]:
junit_path = root / suite["junit"]
junit_path.parent.mkdir(parents=True, exist_ok=True)
command = [
sys.executable,
"-m",
"pytest",
"-q",
*suite.get("paths", []),
f"--junitxml={junit_path}",
]
coverage_xml = suite.get("coverage_xml")
if coverage_xml:
for source in suite.get("coverage_sources", []):
command.append(f"--cov={source}")
command.extend(
[
"--cov-branch",
f"--cov-report=xml:{root / coverage_xml}",
]
)
started_at = time.monotonic()
completed = subprocess.run(command, cwd=root, check=False)
return _result(
suite_name,
suite["description"],
"ok" if completed.returncode == 0 else "failed",
returncode=completed.returncode,
command=command,
junit=str(junit_path.relative_to(root)),
coverage_xml=coverage_xml,
duration_seconds=round(time.monotonic() - started_at, 3),
)
def run_profile(
contract: dict[str, Any],
root: Path,
profile_name: str,
build_dir: Path,
) -> dict[str, Any]:
"""Execute the configured profile and return a JSON-serializable summary."""
build_dir.mkdir(parents=True, exist_ok=True)
results: list[dict[str, Any]] = []
profiles = contract.get("profiles", {})
if profile_name not in profiles:
raise SystemExit(f"unknown profile: {profile_name}")
for check_name in profiles[profile_name]:
if check_name == "docs":
issues = run_docs_check(contract, root)
results.append(
_result(
"docs",
"Required docs, contract descriptions, and module docstrings.",
_status_from_issues(issues),
issues=issues,
)
)
continue
if check_name == "smell":
results.append(_run_ruff(contract, root))
continue
if check_name == "hygiene":
issues = run_hygiene_check(contract, root)
results.append(
_result(
"hygiene",
"500 LOC hygiene and naming rules for managed test automation.",
_status_from_issues(issues),
issues=issues,
)
)
continue
if check_name == "coverage":
unit_suite = contract.get("pytest_suites", {}).get("unit", {})
coverage_xml = root / unit_suite.get("coverage_xml", "build/coverage-unit.xml")
issues = run_coverage_check(contract, root, coverage_xml)
results.append(
_result(
"coverage",
"Per-file 95% coverage floor for tracked quality-managed modules.",
_status_from_issues(issues),
issues=issues,
coverage_xml=str(coverage_xml.relative_to(root)),
)
)
continue
if check_name == "sonarqube":
results.append(_run_sonarqube_check(build_dir))
continue
if check_name == "ironbank":
results.append(_run_ironbank_check(build_dir))
continue
suite = contract.get("pytest_suites", {}).get(check_name)
if suite is None:
raise SystemExit(f"profile {profile_name} references unknown check: {check_name}")
results.append(_run_pytest_suite(root, check_name, suite))
status = "ok" if all(item["status"] == "ok" for item in results) else "failed"
return {
"profile": profile_name,
"status": status,
"results": results,
"manual_scripts": contract.get("manual_scripts", []),
}
def main(argv: list[str] | None = None) -> int:
"""CLI entrypoint for the quality gate."""
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--profile", default="local")
parser.add_argument("--build-dir", default="build")
args = parser.parse_args(argv)
root = Path.cwd()
build_dir = root / args.build_dir
build_dir.mkdir(parents=True, exist_ok=True)
contract = load_contract()
summary = run_profile(contract, root, args.profile, build_dir)
summary_path = build_dir / "quality-gate-summary.json"
summary_path.write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n", encoding="utf-8")
return 0 if summary["status"] == "ok" else 1
if __name__ == "__main__": # pragma: no cover - exercised via CLI execution
raise SystemExit(main())