ci: add sonar/supply evidence collection and checks metrics

This commit is contained in:
Brad Stein 2026-04-19 14:11:17 -03:00
parent cb1ca94f86
commit 6dbb2ceead
2 changed files with 208 additions and 44 deletions

69
Jenkinsfile vendored
View File

@ -35,6 +35,8 @@ spec:
environment { environment {
SUITE_NAME = 'ananke' SUITE_NAME = 'ananke'
PUSHGATEWAY_URL = 'http://platform-quality-gateway.monitoring.svc.cluster.local:9091' PUSHGATEWAY_URL = 'http://platform-quality-gateway.monitoring.svc.cluster.local:9091'
QUALITY_GATE_SONARQUBE_REPORT = 'build/sonarqube-quality-gate.json'
QUALITY_GATE_IRONBANK_REPORT = 'build/ironbank-compliance.json'
} }
options { options {
@ -52,6 +54,73 @@ spec:
} }
} }
stage('Collect SonarQube evidence') {
steps {
container('publisher') {
sh '''
set -eu
mkdir -p build
python3 - <<'PY'
import base64
import json
import os
import urllib.parse
import urllib.request
host = os.getenv('SONARQUBE_HOST_URL', '').strip().rstrip('/')
project_key = os.getenv('SONARQUBE_PROJECT_KEY', '').strip()
token = os.getenv('SONARQUBE_TOKEN', '').strip()
report_path = os.getenv('QUALITY_GATE_SONARQUBE_REPORT', 'build/sonarqube-quality-gate.json')
payload = {"status": "ERROR", "note": "missing SONARQUBE_HOST_URL and/or SONARQUBE_PROJECT_KEY"}
if host and project_key:
query = urllib.parse.urlencode({"projectKey": project_key})
request = urllib.request.Request(f"{host}/api/qualitygates/project_status?{query}", method="GET")
if token:
encoded = base64.b64encode(f"{token}:".encode("utf-8")).decode("utf-8")
request.add_header("Authorization", f"Basic {encoded}")
try:
with urllib.request.urlopen(request, timeout=12) as response:
payload = json.loads(response.read().decode("utf-8"))
except Exception as exc: # noqa: BLE001
payload = {"status": "ERROR", "error": str(exc)}
with open(report_path, "w", encoding="utf-8") as handle:
json.dump(payload, handle, indent=2, sort_keys=True)
handle.write("\\n")
PY
'''
}
}
}
stage('Collect Supply Chain evidence') {
steps {
container('publisher') {
sh '''
set -eu
mkdir -p build
python3 - <<'PY'
import json
import os
from pathlib import Path
report_path = Path(os.getenv('QUALITY_GATE_IRONBANK_REPORT', 'build/ironbank-compliance.json'))
if report_path.exists():
raise SystemExit(0)
status = os.getenv('IRONBANK_COMPLIANCE_STATUS', '').strip()
compliant = os.getenv('IRONBANK_COMPLIANT', '').strip().lower()
payload = {"status": status or "unknown", "compliant": compliant in {"1", "true", "yes", "on"} if compliant else None}
payload = {k: v for k, v in payload.items() if v is not None}
if "status" not in payload:
payload["status"] = "unknown"
payload["note"] = "Set IRONBANK_COMPLIANCE_STATUS/IRONBANK_COMPLIANT or write build/ironbank-compliance.json in image-building repos."
report_path.parent.mkdir(parents=True, exist_ok=True)
report_path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\\n", encoding="utf-8")
PY
'''
}
}
}
stage('Run quality gate') { stage('Run quality gate') {
steps { steps {
container('go-tester') { container('go-tester') {

View File

@ -4,7 +4,10 @@
from __future__ import annotations from __future__ import annotations
import argparse import argparse
import json
import os import os
from pathlib import Path
import re
import sys import sys
import time import time
import urllib.error import urllib.error
@ -12,6 +15,9 @@ import urllib.request
DEFAULT_PUSHGATEWAY_URL = "http://platform-quality-gateway.monitoring.svc.cluster.local:9091" DEFAULT_PUSHGATEWAY_URL = "http://platform-quality-gateway.monitoring.svc.cluster.local:9091"
SOURCE_SCAN_ROOTS = ("cmd", "internal", "scripts", "testing")
SOURCE_EXTENSIONS = {".go", ".py", ".sh"}
QUALITY_SUCCESS_STATES = {"ok", "pass", "passed", "success", "compliant"}
def _escape_label(value: str) -> str: def _escape_label(value: str) -> str:
@ -71,57 +77,137 @@ def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str,
return 0.0 return 0.0
def _parse_checks(raw_checks: list[str]) -> list[tuple[str, str]]:
parsed: list[tuple[str, str]] = []
for item in raw_checks:
if ":" not in item:
continue
name, status = item.split(":", 1)
normalized_name = name.strip()
normalized_status = status.strip().lower()
if not normalized_name or normalized_status not in {"ok", "failed"}:
continue
parsed.append((normalized_name, normalized_status))
return parsed
def _build_payload( def _build_payload(
suite: str, suite: str,
trigger: str, trigger: str,
ok_count: int, ok_count: int,
failed_count: int, failed_count: int,
*,
tests_passed: int, tests_passed: int,
tests_failed: int, tests_failed: int,
tests_error: int, tests_errors: int,
tests_skipped: int, tests_skipped: int,
coverage_percent: float, coverage_percent: float,
source_lines_over_500: int, source_lines_over_500: int,
checks: list[tuple[str, str]], checks: dict[str, str],
) -> str: ) -> str:
lines = [ lines = [
"# TYPE platform_quality_gate_runs_total counter", "# TYPE platform_quality_gate_runs_total counter",
f'platform_quality_gate_runs_total{{suite="{suite}",status="ok"}} {ok_count}', f'platform_quality_gate_runs_total{{suite="{suite}",status="ok"}} {ok_count}',
f'platform_quality_gate_runs_total{{suite="{suite}",status="failed"}} {failed_count}', f'platform_quality_gate_runs_total{{suite="{suite}",status="failed"}} {failed_count}',
"# TYPE ananke_quality_gate_tests_total gauge", "# TYPE ananke_quality_gate_tests_total gauge",
f'ananke_quality_gate_tests_total{{suite="{suite}",result="passed"}} {max(tests_passed, 0)}', f'ananke_quality_gate_tests_total{{suite="{suite}",result="passed"}} {tests_passed}',
f'ananke_quality_gate_tests_total{{suite="{suite}",result="failed"}} {max(tests_failed, 0)}', f'ananke_quality_gate_tests_total{{suite="{suite}",result="failed"}} {tests_failed}',
f'ananke_quality_gate_tests_total{{suite="{suite}",result="error"}} {max(tests_error, 0)}', f'ananke_quality_gate_tests_total{{suite="{suite}",result="error"}} {tests_errors}',
f'ananke_quality_gate_tests_total{{suite="{suite}",result="skipped"}} {max(tests_skipped, 0)}', f'ananke_quality_gate_tests_total{{suite="{suite}",result="skipped"}} {tests_skipped}',
"# TYPE ananke_quality_gate_coverage_percent gauge",
f'ananke_quality_gate_coverage_percent{{suite="{suite}"}} {coverage_percent:.3f}',
"# TYPE platform_quality_gate_workspace_line_coverage_percent gauge", "# TYPE platform_quality_gate_workspace_line_coverage_percent gauge",
f'platform_quality_gate_workspace_line_coverage_percent{{suite="{suite}"}} {coverage_percent:.3f}', f'platform_quality_gate_workspace_line_coverage_percent{{suite="{suite}"}} {coverage_percent:.3f}',
"# TYPE platform_quality_gate_source_lines_over_500_total gauge", "# TYPE platform_quality_gate_source_lines_over_500_total gauge",
f'platform_quality_gate_source_lines_over_500_total{{suite="{suite}"}} {max(source_lines_over_500, 0)}', f'platform_quality_gate_source_lines_over_500_total{{suite="{suite}"}} {source_lines_over_500}',
"# TYPE ananke_quality_gate_checks_total gauge", "# TYPE ananke_quality_gate_checks_total gauge",
"# TYPE ananke_quality_gate_publish_info gauge", "# TYPE ananke_quality_gate_publish_info gauge",
f'ananke_quality_gate_publish_info{_label_str({"suite": suite, "trigger": trigger})} 1', f'ananke_quality_gate_publish_info{_label_str({"suite": suite, "trigger": trigger})} 1',
] ]
for check_name, check_status in checks: lines.extend(
lines.append( f'ananke_quality_gate_checks_total{{suite="{suite}",check="{check_name}",result="{check_status}"}} 1'
f'ananke_quality_gate_checks_total{{suite="{suite}",check="{_escape_label(check_name)}",result="{check_status}"}} 1' for check_name, check_status in checks.items()
) )
return "\n".join(lines) + "\n" return "\n".join(lines) + "\n"
def _read_coverage_percent(path: str) -> float:
if not path:
return 0.0
try:
raw = Path(path).read_text(encoding="utf-8").strip()
except OSError:
return 0.0
try:
return float(raw)
except ValueError:
return 0.0
def _count_source_files_over_limit(repo_root: Path, max_lines: int = 500) -> int:
count = 0
for rel_root in SOURCE_SCAN_ROOTS:
base = repo_root / rel_root
if not base.exists():
continue
for path in base.rglob("*"):
if not path.is_file():
continue
if path.suffix not in SOURCE_EXTENSIONS:
continue
lines = len(path.read_text(encoding="utf-8", errors="ignore").splitlines())
if lines > max_lines:
count += 1
return count
def _parse_go_test_counts(output_path: Path) -> dict[str, int]:
if not output_path.exists():
return {"passed": 0, "failed": 0, "errors": 0, "skipped": 0}
text = output_path.read_text(encoding="utf-8", errors="ignore")
return {
"passed": len(re.findall(r"^--- PASS:", text, flags=re.M)),
"failed": len(re.findall(r"^--- FAIL:", text, flags=re.M)),
"errors": 0,
"skipped": len(re.findall(r"^--- SKIP:", text, flags=re.M)),
}
def _read_exit_code(path: Path) -> int:
if not path.exists():
return 1
raw = path.read_text(encoding="utf-8").strip()
try:
return int(raw)
except ValueError:
return 1
def _load_json(path: Path) -> dict | None:
if not path.exists():
return None
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except Exception:
return None
return payload if isinstance(payload, dict) else None
def _sonarqube_check_status(build_dir: Path) -> str:
report = _load_json(Path(os.getenv("QUALITY_GATE_SONARQUBE_REPORT", str(build_dir / "sonarqube-quality-gate.json"))))
if not report:
return "not_applicable"
status_candidates = [
report.get("status"),
((report.get("projectStatus") or {}).get("status") if isinstance(report.get("projectStatus"), dict) else None),
((report.get("qualityGate") or {}).get("status") if isinstance(report.get("qualityGate"), dict) else None),
]
for value in status_candidates:
if isinstance(value, str):
return "ok" if value.strip().lower() in QUALITY_SUCCESS_STATES else "failed"
return "failed"
def _supply_chain_check_status(build_dir: Path) -> str:
report = _load_json(Path(os.getenv("QUALITY_GATE_IRONBANK_REPORT", str(build_dir / "ironbank-compliance.json"))))
if not report:
return "not_applicable"
compliant = report.get("compliant")
if isinstance(compliant, bool):
return "ok" if compliant else "failed"
status_candidates = [report.get("status"), report.get("result"), report.get("compliance")]
for value in status_candidates:
if isinstance(value, str):
return "ok" if value.strip().lower() in QUALITY_SUCCESS_STATES else "failed"
return "failed"
def parse_args(argv: list[str]) -> argparse.Namespace: def parse_args(argv: list[str]) -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__) parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument( parser.add_argument(
@ -136,13 +222,10 @@ def parse_args(argv: list[str]) -> argparse.Namespace:
parser.add_argument("--trigger", default=os.getenv("ANANKE_QUALITY_PUSHGATEWAY_TRIGGER", "host")) parser.add_argument("--trigger", default=os.getenv("ANANKE_QUALITY_PUSHGATEWAY_TRIGGER", "host"))
parser.add_argument("--local-ok", type=int, required=True) parser.add_argument("--local-ok", type=int, required=True)
parser.add_argument("--local-failed", type=int, required=True) parser.add_argument("--local-failed", type=int, required=True)
parser.add_argument("--tests-passed", type=int, default=0) parser.add_argument(
parser.add_argument("--tests-failed", type=int, default=0) "--coverage-percent-file",
parser.add_argument("--tests-error", type=int, default=0) default=os.getenv("ANANKE_QUALITY_COVERAGE_PERCENT_FILE", "build/coverage-percent.txt"),
parser.add_argument("--tests-skipped", type=int, default=0) )
parser.add_argument("--coverage-percent", type=float, default=0.0)
parser.add_argument("--source-lines-over-500", type=int, default=0)
parser.add_argument("--check", action="append", default=[], help="check_name:ok|failed")
parser.add_argument( parser.add_argument(
"--timeout-seconds", "--timeout-seconds",
type=float, type=float,
@ -164,6 +247,8 @@ def parse_args(argv: list[str]) -> argparse.Namespace:
def main(argv: list[str] | None = None) -> int: def main(argv: list[str] | None = None) -> int:
args = parse_args(argv or sys.argv[1:]) args = parse_args(argv or sys.argv[1:])
repo_root = Path(__file__).resolve().parents[1]
build_dir = repo_root / "build"
remote_ok = 0 remote_ok = 0
remote_failed = 0 remote_failed = 0
@ -190,21 +275,32 @@ def main(argv: list[str] | None = None) -> int:
resolved_ok = max(args.local_ok, remote_ok) resolved_ok = max(args.local_ok, remote_ok)
resolved_failed = max(args.local_failed, remote_failed) resolved_failed = max(args.local_failed, remote_failed)
checks = _parse_checks(args.check) coverage_percent = _read_coverage_percent(args.coverage_percent_file)
if not checks: source_lines_over_500 = _count_source_files_over_limit(repo_root, max_lines=500)
checks = [("gate", "ok" if args.local_failed <= 0 else "failed")] tests = _parse_go_test_counts(Path(os.getenv("ANANKE_QUALITY_OUTPUT_FILE", str(build_dir / "quality-gate.out"))))
gate_rc = _read_exit_code(Path(os.getenv("ANANKE_QUALITY_EXIT_CODE_PATH", str(build_dir / "quality-gate.rc"))))
gate_failed = gate_rc != 0
checks = {
"tests": "failed" if gate_failed or tests["failed"] > 0 else "ok",
"coverage": "ok" if coverage_percent >= 95.0 else "failed",
"loc": "ok" if source_lines_over_500 == 0 else "failed",
"docs_naming": "not_applicable",
"gate_glue": "ok",
"sonarqube": _sonarqube_check_status(build_dir),
"supply_chain": _supply_chain_check_status(build_dir),
}
payload = _build_payload( payload = _build_payload(
args.suite, args.suite,
args.trigger, args.trigger,
resolved_ok, resolved_ok,
resolved_failed, resolved_failed,
args.tests_passed, tests_passed=tests["passed"],
args.tests_failed, tests_failed=tests["failed"],
args.tests_error, tests_errors=tests["errors"],
args.tests_skipped, tests_skipped=tests["skipped"],
args.coverage_percent, coverage_percent=coverage_percent,
args.source_lines_over_500, source_lines_over_500=source_lines_over_500,
checks, checks=checks,
) )
if args.dry_run: if args.dry_run:
@ -216,8 +312,7 @@ def main(argv: list[str] | None = None) -> int:
summary = ( summary = (
f"[quality] published Pushgateway metrics suite={args.suite} job={args.job_name} ok={resolved_ok} " f"[quality] published Pushgateway metrics suite={args.suite} job={args.job_name} ok={resolved_ok} "
f"failed={resolved_failed} checks={len(checks)} coverage={args.coverage_percent:.2f} " f"failed={resolved_failed} coverage={coverage_percent:.3f} source_lines_over_500={source_lines_over_500}"
f"over_500={max(args.source_lines_over_500, 0)}"
) )
if remote_error: if remote_error:
summary += f" remote_read_error={remote_error}" summary += f" remote_read_error={remote_error}"