From add6683e3c36075568f6b89ce80ce267dfa00571 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Sun, 19 Apr 2026 14:11:17 -0300 Subject: [PATCH] ci: add sonar/supply evidence collection and checks metrics --- Jenkinsfile | 69 ++++++++++++++++ scripts/publish_quality_metrics.py | 128 +++++++++++++++++++++++++++-- 2 files changed, 188 insertions(+), 9 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index e45e128..3041cae 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -35,6 +35,8 @@ spec: environment { SUITE_NAME = 'ananke' PUSHGATEWAY_URL = 'http://platform-quality-gateway.monitoring.svc.cluster.local:9091' + QUALITY_GATE_SONARQUBE_REPORT = 'build/sonarqube-quality-gate.json' + QUALITY_GATE_IRONBANK_REPORT = 'build/ironbank-compliance.json' } options { @@ -52,6 +54,73 @@ spec: } } + stage('Collect SonarQube evidence') { + steps { + container('publisher') { + sh ''' + set -eu + mkdir -p build + python3 - <<'PY' +import base64 +import json +import os +import urllib.parse +import urllib.request + +host = os.getenv('SONARQUBE_HOST_URL', '').strip().rstrip('/') +project_key = os.getenv('SONARQUBE_PROJECT_KEY', '').strip() +token = os.getenv('SONARQUBE_TOKEN', '').strip() +report_path = os.getenv('QUALITY_GATE_SONARQUBE_REPORT', 'build/sonarqube-quality-gate.json') +payload = {"status": "ERROR", "note": "missing SONARQUBE_HOST_URL and/or SONARQUBE_PROJECT_KEY"} +if host and project_key: + query = urllib.parse.urlencode({"projectKey": project_key}) + request = urllib.request.Request(f"{host}/api/qualitygates/project_status?{query}", method="GET") + if token: + encoded = base64.b64encode(f"{token}:".encode("utf-8")).decode("utf-8") + request.add_header("Authorization", f"Basic {encoded}") + try: + with urllib.request.urlopen(request, timeout=12) as response: + payload = json.loads(response.read().decode("utf-8")) + except Exception as exc: # noqa: BLE001 + payload = {"status": "ERROR", "error": str(exc)} +with open(report_path, "w", encoding="utf-8") as handle: + json.dump(payload, handle, indent=2, sort_keys=True) + handle.write("\\n") +PY + ''' + } + } + } + + stage('Collect Supply Chain evidence') { + steps { + container('publisher') { + sh ''' + set -eu + mkdir -p build + python3 - <<'PY' +import json +import os +from pathlib import Path + +report_path = Path(os.getenv('QUALITY_GATE_IRONBANK_REPORT', 'build/ironbank-compliance.json')) +if report_path.exists(): + raise SystemExit(0) +status = os.getenv('IRONBANK_COMPLIANCE_STATUS', '').strip() +compliant = os.getenv('IRONBANK_COMPLIANT', '').strip().lower() +payload = {"status": status or "unknown", "compliant": compliant in {"1", "true", "yes", "on"} if compliant else None} +payload = {k: v for k, v in payload.items() if v is not None} +if "status" not in payload: + payload["status"] = "unknown" +payload["note"] = "Set IRONBANK_COMPLIANCE_STATUS/IRONBANK_COMPLIANT or write build/ironbank-compliance.json in image-building repos." +report_path.parent.mkdir(parents=True, exist_ok=True) +report_path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\\n", encoding="utf-8") +PY + ''' + } + } + } + stage('Run quality gate') { steps { container('go-tester') { diff --git a/scripts/publish_quality_metrics.py b/scripts/publish_quality_metrics.py index afe5e71..fed09cd 100755 --- a/scripts/publish_quality_metrics.py +++ b/scripts/publish_quality_metrics.py @@ -4,8 +4,10 @@ from __future__ import annotations import argparse +import json import os from pathlib import Path +import re import sys import time import urllib.error @@ -15,6 +17,7 @@ import urllib.request DEFAULT_PUSHGATEWAY_URL = "http://platform-quality-gateway.monitoring.svc.cluster.local:9091" SOURCE_SCAN_ROOTS = ("cmd", "internal", "scripts", "testing") SOURCE_EXTENSIONS = {".go", ".py", ".sh"} +QUALITY_SUCCESS_STATES = {"ok", "pass", "passed", "success", "compliant"} def _escape_label(value: str) -> str: @@ -74,14 +77,43 @@ def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str, return 0.0 -def _build_payload(suite: str, trigger: str, ok_count: int, failed_count: int) -> str: +def _build_payload( + suite: str, + trigger: str, + ok_count: int, + failed_count: int, + *, + tests_passed: int, + tests_failed: int, + tests_errors: int, + tests_skipped: int, + coverage_percent: float, + source_lines_over_500: int, + checks: dict[str, str], +) -> str: lines = [ "# TYPE platform_quality_gate_runs_total counter", f'platform_quality_gate_runs_total{{suite="{suite}",status="ok"}} {ok_count}', f'platform_quality_gate_runs_total{{suite="{suite}",status="failed"}} {failed_count}', + "# TYPE ananke_quality_gate_tests_total gauge", + f'ananke_quality_gate_tests_total{{suite="{suite}",result="passed"}} {tests_passed}', + f'ananke_quality_gate_tests_total{{suite="{suite}",result="failed"}} {tests_failed}', + f'ananke_quality_gate_tests_total{{suite="{suite}",result="error"}} {tests_errors}', + f'ananke_quality_gate_tests_total{{suite="{suite}",result="skipped"}} {tests_skipped}', + "# TYPE ananke_quality_gate_coverage_percent gauge", + f'ananke_quality_gate_coverage_percent{{suite="{suite}"}} {coverage_percent:.3f}', + "# TYPE platform_quality_gate_workspace_line_coverage_percent gauge", + f'platform_quality_gate_workspace_line_coverage_percent{{suite="{suite}"}} {coverage_percent:.3f}', + "# TYPE platform_quality_gate_source_lines_over_500_total gauge", + f'platform_quality_gate_source_lines_over_500_total{{suite="{suite}"}} {source_lines_over_500}', + "# TYPE ananke_quality_gate_checks_total gauge", "# TYPE ananke_quality_gate_publish_info gauge", f'ananke_quality_gate_publish_info{_label_str({"suite": suite, "trigger": trigger})} 1', ] + lines.extend( + f'ananke_quality_gate_checks_total{{suite="{suite}",check="{check_name}",result="{check_status}"}} 1' + for check_name, check_status in checks.items() + ) return "\n".join(lines) + "\n" @@ -115,6 +147,67 @@ def _count_source_files_over_limit(repo_root: Path, max_lines: int = 500) -> int return count +def _parse_go_test_counts(output_path: Path) -> dict[str, int]: + if not output_path.exists(): + return {"passed": 0, "failed": 0, "errors": 0, "skipped": 0} + text = output_path.read_text(encoding="utf-8", errors="ignore") + return { + "passed": len(re.findall(r"^--- PASS:", text, flags=re.M)), + "failed": len(re.findall(r"^--- FAIL:", text, flags=re.M)), + "errors": 0, + "skipped": len(re.findall(r"^--- SKIP:", text, flags=re.M)), + } + + +def _read_exit_code(path: Path) -> int: + if not path.exists(): + return 1 + raw = path.read_text(encoding="utf-8").strip() + try: + return int(raw) + except ValueError: + return 1 + + +def _load_json(path: Path) -> dict | None: + if not path.exists(): + return None + try: + payload = json.loads(path.read_text(encoding="utf-8")) + except Exception: + return None + return payload if isinstance(payload, dict) else None + + +def _sonarqube_check_status(build_dir: Path) -> str: + report = _load_json(Path(os.getenv("QUALITY_GATE_SONARQUBE_REPORT", str(build_dir / "sonarqube-quality-gate.json")))) + if not report: + return "not_applicable" + status_candidates = [ + report.get("status"), + ((report.get("projectStatus") or {}).get("status") if isinstance(report.get("projectStatus"), dict) else None), + ((report.get("qualityGate") or {}).get("status") if isinstance(report.get("qualityGate"), dict) else None), + ] + for value in status_candidates: + if isinstance(value, str): + return "ok" if value.strip().lower() in QUALITY_SUCCESS_STATES else "failed" + return "failed" + + +def _supply_chain_check_status(build_dir: Path) -> str: + report = _load_json(Path(os.getenv("QUALITY_GATE_IRONBANK_REPORT", str(build_dir / "ironbank-compliance.json")))) + if not report: + return "not_applicable" + compliant = report.get("compliant") + if isinstance(compliant, bool): + return "ok" if compliant else "failed" + status_candidates = [report.get("status"), report.get("result"), report.get("compliance")] + for value in status_candidates: + if isinstance(value, str): + return "ok" if value.strip().lower() in QUALITY_SUCCESS_STATES else "failed" + return "failed" + + def parse_args(argv: list[str]) -> argparse.Namespace: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument( @@ -155,6 +248,7 @@ def parse_args(argv: list[str]) -> argparse.Namespace: def main(argv: list[str] | None = None) -> int: args = parse_args(argv or sys.argv[1:]) repo_root = Path(__file__).resolve().parents[1] + build_dir = repo_root / "build" remote_ok = 0 remote_failed = 0 @@ -183,14 +277,30 @@ def main(argv: list[str] | None = None) -> int: resolved_failed = max(args.local_failed, remote_failed) coverage_percent = _read_coverage_percent(args.coverage_percent_file) source_lines_over_500 = _count_source_files_over_limit(repo_root, max_lines=500) - payload = _build_payload(args.suite, args.trigger, resolved_ok, resolved_failed).rstrip("\n") - payload += ( - "\n# TYPE ananke_quality_gate_coverage_percent gauge\n" - f'ananke_quality_gate_coverage_percent{{suite="{args.suite}"}} {coverage_percent:.3f}\n' - "# TYPE platform_quality_gate_workspace_line_coverage_percent gauge\n" - f'platform_quality_gate_workspace_line_coverage_percent{{suite="{args.suite}"}} {coverage_percent:.3f}\n' - "# TYPE platform_quality_gate_source_lines_over_500_total gauge\n" - f'platform_quality_gate_source_lines_over_500_total{{suite="{args.suite}"}} {source_lines_over_500}\n' + tests = _parse_go_test_counts(Path(os.getenv("ANANKE_QUALITY_OUTPUT_FILE", str(build_dir / "quality-gate.out")))) + gate_rc = _read_exit_code(Path(os.getenv("ANANKE_QUALITY_EXIT_CODE_PATH", str(build_dir / "quality-gate.rc")))) + gate_failed = gate_rc != 0 + checks = { + "tests": "failed" if gate_failed or tests["failed"] > 0 else "ok", + "coverage": "ok" if coverage_percent >= 95.0 else "failed", + "loc": "ok" if source_lines_over_500 == 0 else "failed", + "docs_naming": "not_applicable", + "gate_glue": "ok", + "sonarqube": _sonarqube_check_status(build_dir), + "supply_chain": _supply_chain_check_status(build_dir), + } + payload = _build_payload( + args.suite, + args.trigger, + resolved_ok, + resolved_failed, + tests_passed=tests["passed"], + tests_failed=tests["failed"], + tests_errors=tests["errors"], + tests_skipped=tests["skipped"], + coverage_percent=coverage_percent, + source_lines_over_500=source_lines_over_500, + checks=checks, ) if args.dry_run: