ci: add sonar/supply evidence collection and checks metrics

This commit is contained in:
Brad Stein 2026-04-19 14:11:17 -03:00
parent 8a59825a9c
commit add6683e3c
2 changed files with 188 additions and 9 deletions

69
Jenkinsfile vendored
View File

@ -35,6 +35,8 @@ spec:
environment {
SUITE_NAME = 'ananke'
PUSHGATEWAY_URL = 'http://platform-quality-gateway.monitoring.svc.cluster.local:9091'
QUALITY_GATE_SONARQUBE_REPORT = 'build/sonarqube-quality-gate.json'
QUALITY_GATE_IRONBANK_REPORT = 'build/ironbank-compliance.json'
}
options {
@ -52,6 +54,73 @@ spec:
}
}
stage('Collect SonarQube evidence') {
steps {
container('publisher') {
sh '''
set -eu
mkdir -p build
python3 - <<'PY'
import base64
import json
import os
import urllib.parse
import urllib.request
host = os.getenv('SONARQUBE_HOST_URL', '').strip().rstrip('/')
project_key = os.getenv('SONARQUBE_PROJECT_KEY', '').strip()
token = os.getenv('SONARQUBE_TOKEN', '').strip()
report_path = os.getenv('QUALITY_GATE_SONARQUBE_REPORT', 'build/sonarqube-quality-gate.json')
payload = {"status": "ERROR", "note": "missing SONARQUBE_HOST_URL and/or SONARQUBE_PROJECT_KEY"}
if host and project_key:
query = urllib.parse.urlencode({"projectKey": project_key})
request = urllib.request.Request(f"{host}/api/qualitygates/project_status?{query}", method="GET")
if token:
encoded = base64.b64encode(f"{token}:".encode("utf-8")).decode("utf-8")
request.add_header("Authorization", f"Basic {encoded}")
try:
with urllib.request.urlopen(request, timeout=12) as response:
payload = json.loads(response.read().decode("utf-8"))
except Exception as exc: # noqa: BLE001
payload = {"status": "ERROR", "error": str(exc)}
with open(report_path, "w", encoding="utf-8") as handle:
json.dump(payload, handle, indent=2, sort_keys=True)
handle.write("\\n")
PY
'''
}
}
}
stage('Collect Supply Chain evidence') {
steps {
container('publisher') {
sh '''
set -eu
mkdir -p build
python3 - <<'PY'
import json
import os
from pathlib import Path
report_path = Path(os.getenv('QUALITY_GATE_IRONBANK_REPORT', 'build/ironbank-compliance.json'))
if report_path.exists():
raise SystemExit(0)
status = os.getenv('IRONBANK_COMPLIANCE_STATUS', '').strip()
compliant = os.getenv('IRONBANK_COMPLIANT', '').strip().lower()
payload = {"status": status or "unknown", "compliant": compliant in {"1", "true", "yes", "on"} if compliant else None}
payload = {k: v for k, v in payload.items() if v is not None}
if "status" not in payload:
payload["status"] = "unknown"
payload["note"] = "Set IRONBANK_COMPLIANCE_STATUS/IRONBANK_COMPLIANT or write build/ironbank-compliance.json in image-building repos."
report_path.parent.mkdir(parents=True, exist_ok=True)
report_path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\\n", encoding="utf-8")
PY
'''
}
}
}
stage('Run quality gate') {
steps {
container('go-tester') {

View File

@ -4,8 +4,10 @@
from __future__ import annotations
import argparse
import json
import os
from pathlib import Path
import re
import sys
import time
import urllib.error
@ -15,6 +17,7 @@ import urllib.request
DEFAULT_PUSHGATEWAY_URL = "http://platform-quality-gateway.monitoring.svc.cluster.local:9091"
SOURCE_SCAN_ROOTS = ("cmd", "internal", "scripts", "testing")
SOURCE_EXTENSIONS = {".go", ".py", ".sh"}
QUALITY_SUCCESS_STATES = {"ok", "pass", "passed", "success", "compliant"}
def _escape_label(value: str) -> str:
@ -74,14 +77,43 @@ def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str,
return 0.0
def _build_payload(suite: str, trigger: str, ok_count: int, failed_count: int) -> str:
def _build_payload(
suite: str,
trigger: str,
ok_count: int,
failed_count: int,
*,
tests_passed: int,
tests_failed: int,
tests_errors: int,
tests_skipped: int,
coverage_percent: float,
source_lines_over_500: int,
checks: dict[str, str],
) -> str:
lines = [
"# TYPE platform_quality_gate_runs_total counter",
f'platform_quality_gate_runs_total{{suite="{suite}",status="ok"}} {ok_count}',
f'platform_quality_gate_runs_total{{suite="{suite}",status="failed"}} {failed_count}',
"# TYPE ananke_quality_gate_tests_total gauge",
f'ananke_quality_gate_tests_total{{suite="{suite}",result="passed"}} {tests_passed}',
f'ananke_quality_gate_tests_total{{suite="{suite}",result="failed"}} {tests_failed}',
f'ananke_quality_gate_tests_total{{suite="{suite}",result="error"}} {tests_errors}',
f'ananke_quality_gate_tests_total{{suite="{suite}",result="skipped"}} {tests_skipped}',
"# TYPE ananke_quality_gate_coverage_percent gauge",
f'ananke_quality_gate_coverage_percent{{suite="{suite}"}} {coverage_percent:.3f}',
"# TYPE platform_quality_gate_workspace_line_coverage_percent gauge",
f'platform_quality_gate_workspace_line_coverage_percent{{suite="{suite}"}} {coverage_percent:.3f}',
"# TYPE platform_quality_gate_source_lines_over_500_total gauge",
f'platform_quality_gate_source_lines_over_500_total{{suite="{suite}"}} {source_lines_over_500}',
"# TYPE ananke_quality_gate_checks_total gauge",
"# TYPE ananke_quality_gate_publish_info gauge",
f'ananke_quality_gate_publish_info{_label_str({"suite": suite, "trigger": trigger})} 1',
]
lines.extend(
f'ananke_quality_gate_checks_total{{suite="{suite}",check="{check_name}",result="{check_status}"}} 1'
for check_name, check_status in checks.items()
)
return "\n".join(lines) + "\n"
@ -115,6 +147,67 @@ def _count_source_files_over_limit(repo_root: Path, max_lines: int = 500) -> int
return count
def _parse_go_test_counts(output_path: Path) -> dict[str, int]:
if not output_path.exists():
return {"passed": 0, "failed": 0, "errors": 0, "skipped": 0}
text = output_path.read_text(encoding="utf-8", errors="ignore")
return {
"passed": len(re.findall(r"^--- PASS:", text, flags=re.M)),
"failed": len(re.findall(r"^--- FAIL:", text, flags=re.M)),
"errors": 0,
"skipped": len(re.findall(r"^--- SKIP:", text, flags=re.M)),
}
def _read_exit_code(path: Path) -> int:
if not path.exists():
return 1
raw = path.read_text(encoding="utf-8").strip()
try:
return int(raw)
except ValueError:
return 1
def _load_json(path: Path) -> dict | None:
if not path.exists():
return None
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except Exception:
return None
return payload if isinstance(payload, dict) else None
def _sonarqube_check_status(build_dir: Path) -> str:
report = _load_json(Path(os.getenv("QUALITY_GATE_SONARQUBE_REPORT", str(build_dir / "sonarqube-quality-gate.json"))))
if not report:
return "not_applicable"
status_candidates = [
report.get("status"),
((report.get("projectStatus") or {}).get("status") if isinstance(report.get("projectStatus"), dict) else None),
((report.get("qualityGate") or {}).get("status") if isinstance(report.get("qualityGate"), dict) else None),
]
for value in status_candidates:
if isinstance(value, str):
return "ok" if value.strip().lower() in QUALITY_SUCCESS_STATES else "failed"
return "failed"
def _supply_chain_check_status(build_dir: Path) -> str:
report = _load_json(Path(os.getenv("QUALITY_GATE_IRONBANK_REPORT", str(build_dir / "ironbank-compliance.json"))))
if not report:
return "not_applicable"
compliant = report.get("compliant")
if isinstance(compliant, bool):
return "ok" if compliant else "failed"
status_candidates = [report.get("status"), report.get("result"), report.get("compliance")]
for value in status_candidates:
if isinstance(value, str):
return "ok" if value.strip().lower() in QUALITY_SUCCESS_STATES else "failed"
return "failed"
def parse_args(argv: list[str]) -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
@ -155,6 +248,7 @@ def parse_args(argv: list[str]) -> argparse.Namespace:
def main(argv: list[str] | None = None) -> int:
args = parse_args(argv or sys.argv[1:])
repo_root = Path(__file__).resolve().parents[1]
build_dir = repo_root / "build"
remote_ok = 0
remote_failed = 0
@ -183,14 +277,30 @@ def main(argv: list[str] | None = None) -> int:
resolved_failed = max(args.local_failed, remote_failed)
coverage_percent = _read_coverage_percent(args.coverage_percent_file)
source_lines_over_500 = _count_source_files_over_limit(repo_root, max_lines=500)
payload = _build_payload(args.suite, args.trigger, resolved_ok, resolved_failed).rstrip("\n")
payload += (
"\n# TYPE ananke_quality_gate_coverage_percent gauge\n"
f'ananke_quality_gate_coverage_percent{{suite="{args.suite}"}} {coverage_percent:.3f}\n'
"# TYPE platform_quality_gate_workspace_line_coverage_percent gauge\n"
f'platform_quality_gate_workspace_line_coverage_percent{{suite="{args.suite}"}} {coverage_percent:.3f}\n'
"# TYPE platform_quality_gate_source_lines_over_500_total gauge\n"
f'platform_quality_gate_source_lines_over_500_total{{suite="{args.suite}"}} {source_lines_over_500}\n'
tests = _parse_go_test_counts(Path(os.getenv("ANANKE_QUALITY_OUTPUT_FILE", str(build_dir / "quality-gate.out"))))
gate_rc = _read_exit_code(Path(os.getenv("ANANKE_QUALITY_EXIT_CODE_PATH", str(build_dir / "quality-gate.rc"))))
gate_failed = gate_rc != 0
checks = {
"tests": "failed" if gate_failed or tests["failed"] > 0 else "ok",
"coverage": "ok" if coverage_percent >= 95.0 else "failed",
"loc": "ok" if source_lines_over_500 == 0 else "failed",
"docs_naming": "not_applicable",
"gate_glue": "ok",
"sonarqube": _sonarqube_check_status(build_dir),
"supply_chain": _supply_chain_check_status(build_dir),
}
payload = _build_payload(
args.suite,
args.trigger,
resolved_ok,
resolved_failed,
tests_passed=tests["passed"],
tests_failed=tests["failed"],
tests_errors=tests["errors"],
tests_skipped=tests["skipped"],
coverage_percent=coverage_percent,
source_lines_over_500=source_lines_over_500,
checks=checks,
)
if args.dry_run: