ci: add sonar/supply evidence collection and checks metrics

This commit is contained in:
Brad Stein 2026-04-19 14:12:32 -03:00
parent 02c7d5b799
commit 250edcdad4
4 changed files with 202 additions and 166 deletions

154
Jenkinsfile vendored
View File

@ -52,7 +52,7 @@ spec:
- name: workspace-volume
mountPath: /home/jenkins/agent
- name: frontend
image: mcr.microsoft.com/playwright:v1.59.1-jammy
image: mcr.microsoft.com/playwright:v1.51.0-jammy
command: ["cat"]
tty: true
volumeMounts:
@ -80,8 +80,10 @@ spec:
BACK_IMAGE = "${REGISTRY}/bstein-dev-home-backend"
VERSION_TAG = 'dev'
SEMVER = 'dev'
SUITE_NAME = 'bstein-home'
SUITE_NAME = 'bstein_home'
PUSHGATEWAY_URL = 'http://platform-quality-gateway.monitoring.svc.cluster.local:9091'
QUALITY_GATE_SONARQUBE_REPORT = 'build/sonarqube-quality-gate.json'
QUALITY_GATE_IRONBANK_REPORT = 'build/ironbank-compliance.json'
}
options {
disableConcurrentBuilds()
@ -97,12 +99,79 @@ spec:
}
}
stage('Collect SonarQube evidence') {
steps {
container('tester') {
sh '''
set -euo pipefail
mkdir -p build
python3 - <<'PY'
import base64
import json
import os
import urllib.parse
import urllib.request
host = os.getenv('SONARQUBE_HOST_URL', '').strip().rstrip('/')
project_key = os.getenv('SONARQUBE_PROJECT_KEY', '').strip()
token = os.getenv('SONARQUBE_TOKEN', '').strip()
report_path = os.getenv('QUALITY_GATE_SONARQUBE_REPORT', 'build/sonarqube-quality-gate.json')
payload = {"status": "ERROR", "note": "missing SONARQUBE_HOST_URL and/or SONARQUBE_PROJECT_KEY"}
if host and project_key:
query = urllib.parse.urlencode({"projectKey": project_key})
request = urllib.request.Request(f"{host}/api/qualitygates/project_status?{query}", method="GET")
if token:
encoded = base64.b64encode(f"{token}:".encode("utf-8")).decode("utf-8")
request.add_header("Authorization", f"Basic {encoded}")
try:
with urllib.request.urlopen(request, timeout=12) as response:
payload = json.loads(response.read().decode("utf-8"))
except Exception as exc: # noqa: BLE001
payload = {"status": "ERROR", "error": str(exc)}
with open(report_path, "w", encoding="utf-8") as handle:
json.dump(payload, handle, indent=2, sort_keys=True)
handle.write("\\n")
PY
'''
}
}
}
stage('Collect Supply Chain evidence') {
steps {
container('tester') {
sh '''
set -euo pipefail
mkdir -p build
python3 - <<'PY'
import json
import os
from pathlib import Path
report_path = Path(os.getenv('QUALITY_GATE_IRONBANK_REPORT', 'build/ironbank-compliance.json'))
if report_path.exists():
raise SystemExit(0)
status = os.getenv('IRONBANK_COMPLIANCE_STATUS', '').strip()
compliant = os.getenv('IRONBANK_COMPLIANT', '').strip().lower()
payload = {"status": status or "unknown", "compliant": compliant in {"1", "true", "yes", "on"} if compliant else None}
payload = {k: v for k, v in payload.items() if v is not None}
if "status" not in payload:
payload["status"] = "unknown"
payload["note"] = "Set IRONBANK_COMPLIANCE_STATUS/IRONBANK_COMPLIANT or write build/ironbank-compliance.json in image-building repos."
report_path.parent.mkdir(parents=True, exist_ok=True)
report_path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\\n", encoding="utf-8")
PY
'''
}
}
}
stage('Prep toolchain') {
steps {
container('builder') {
withCredentials([usernamePassword(credentialsId: 'harbor-robot', usernameVariable: 'HARBOR_USERNAME', passwordVariable: 'HARBOR_PASSWORD')]) {
sh '''
set -eu
set -euo pipefail
for attempt in 1 2 3 4 5; do
if apk add --no-cache bash git jq curl; then
break
@ -126,7 +195,7 @@ spec:
container('builder') {
script {
sh '''
set -eu
set -euo pipefail
if git describe --tags --exact-match >/dev/null 2>&1; then
SEMVER="$(git describe --tags --exact-match)"
else
@ -151,7 +220,7 @@ spec:
steps {
container('builder') {
sh '''
set -eu
set -euo pipefail
ready=0
for i in $(seq 1 10); do
if docker info >/dev/null 2>&1; then
@ -180,7 +249,7 @@ spec:
steps {
container('tester') {
sh '''
set -eu
set -euo pipefail
mkdir -p build
export PYTHONPATH="${WORKSPACE}/backend:${PYTHONPATH:-}"
python -m pip install --no-cache-dir -r backend/requirements.txt -r backend/requirements-dev.txt
@ -194,7 +263,7 @@ spec:
steps {
container('frontend') {
sh '''
set -eu
set -euo pipefail
mkdir -p build
cd frontend
npm ci
@ -207,16 +276,53 @@ spec:
}
}
stage('Unified quality gate') {
stage('Run quality gate') {
steps {
container('tester') {
sh '''
set -eu
set -euo pipefail
export PYTHONPATH="${WORKSPACE}:${PYTHONPATH:-}"
set +e
python -m testing.ci.quality_gate \
--backend-coverage build/backend-coverage.xml \
--frontend-coverage frontend/coverage/coverage-summary.json \
--report build/quality-gate.json
gate_rc=$?
set -e
printf '%s\n' "${gate_rc}" > build/quality-gate.rc
'''
}
}
}
stage('Publish test metrics') {
steps {
container('tester') {
sh '''
set -euo pipefail
gate_rc="$(cat build/quality-gate.rc 2>/dev/null || echo 1)"
status="ok"
if [ "${gate_rc}" -ne 0 ]; then
status="failed"
fi
python -m testing.ci.publish_metrics \
--gateway "${PUSHGATEWAY_URL}" \
--suite "${SUITE_NAME}" \
--job platform-quality-ci \
--status "${status}" \
--quality-report build/quality-gate.json \
--junit build/junit-backend.xml build/junit-frontend-unit.xml build/junit-frontend-component.xml build/junit-frontend-e2e.xml
'''
}
}
}
stage('Enforce quality gate') {
steps {
container('tester') {
sh '''
set -euo pipefail
test "$(cat build/quality-gate.rc 2>/dev/null || echo 1)" -eq 0
'''
}
}
@ -226,7 +332,7 @@ spec:
steps {
container('builder') {
sh '''
set -eu
set -euo pipefail
VERSION_TAG="$(cut -d= -f2 build.env)"
docker buildx build \
--platform linux/arm64 \
@ -244,7 +350,7 @@ spec:
steps {
container('builder') {
sh '''
set -eu
set -euo pipefail
VERSION_TAG="$(cut -d= -f2 build.env)"
docker buildx build \
--platform linux/arm64 \
@ -260,32 +366,6 @@ spec:
}
post {
success {
container('tester') {
sh '''
set -eu
python -m testing.ci.publish_metrics \
--gateway "${PUSHGATEWAY_URL}" \
--suite "${SUITE_NAME}" \
--job platform-quality-ci \
--status ok \
--junit build/junit-backend.xml build/junit-frontend-unit.xml build/junit-frontend-component.xml build/junit-frontend-e2e.xml
'''
}
}
failure {
container('tester') {
sh '''
set -eu
python -m testing.ci.publish_metrics \
--gateway "${PUSHGATEWAY_URL}" \
--suite "${SUITE_NAME}" \
--job platform-quality-ci \
--status failed \
--junit build/junit-backend.xml build/junit-frontend-unit.xml build/junit-frontend-component.xml build/junit-frontend-e2e.xml
'''
}
}
always {
script {
def props = fileExists('build.env') ? readProperties(file: 'build.env') : [:]

View File

@ -4,97 +4,12 @@ from __future__ import annotations
import argparse
import json
import os
from pathlib import Path
import re
import xml.etree.ElementTree as ET
from .summary import load_junit_summary, publish_quality_metrics
def _parse_check_args(raw_checks: list[str]) -> dict[str, str]:
checks: dict[str, str] = {}
for item in raw_checks:
if ":" not in item:
continue
name, status = item.split(":", 1)
normalized_name = name.strip()
normalized_status = status.strip().lower()
if not normalized_name or normalized_status not in {"ok", "failed"}:
continue
checks[normalized_name] = normalized_status
return checks
def _derive_quality_report(path: Path) -> tuple[int, dict[str, str]]:
if not path.exists():
return 0, {}
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except json.JSONDecodeError:
return 0, {}
issues = payload.get("issues") if isinstance(payload, dict) else []
if not isinstance(issues, list):
return 0, {}
over_500 = 0
checks = {"docs": "ok", "loc": "ok", "coverage": "ok"}
for issue in issues:
if not isinstance(issue, dict):
continue
check_name = str(issue.get("check", ""))
if check_name == "docstring":
checks["docs"] = "failed"
elif check_name == "loc":
checks["loc"] = "failed"
over_500 += 1
elif check_name == "coverage":
checks["coverage"] = "failed"
return over_500, checks
def _load_backend_coverage_percent(path: Path) -> float:
if not path.exists():
return 0.0
try:
root = ET.parse(path).getroot()
except ET.ParseError:
return 0.0
raw = root.attrib.get("line-rate")
if raw is None:
return 0.0
try:
return float(raw) * 100.0
except ValueError:
return 0.0
def _load_frontend_coverage_percent(path: Path) -> float:
if not path.exists():
return 0.0
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except json.JSONDecodeError:
return 0.0
total = payload.get("total") if isinstance(payload, dict) else {}
lines = total.get("lines") if isinstance(total, dict) else {}
pct = lines.get("pct") if isinstance(lines, dict) else None
if isinstance(pct, (int, float)):
return float(pct)
if isinstance(pct, str):
match = re.search(r"[0-9]+(?:\.[0-9]+)?", pct)
if match:
return float(match.group(0))
return 0.0
def _workspace_coverage_percent(backend_path: Path, frontend_path: Path) -> float:
backend = _load_backend_coverage_percent(backend_path)
frontend = _load_frontend_coverage_percent(frontend_path)
values = [value for value in (backend, frontend) if value > 0]
if not values:
return 0.0
return sum(values) / len(values)
def _build_parser() -> argparse.ArgumentParser:
"""Build the CLI parser for the metrics publisher."""
@ -104,46 +19,88 @@ def _build_parser() -> argparse.ArgumentParser:
parser.add_argument("--job", default="platform-quality-ci", help="Pushgateway job label")
parser.add_argument("--status", choices=("ok", "failed"), required=True, help="Gate outcome")
parser.add_argument("--junit", nargs="*", default=(), help="JUnit XML files to aggregate")
parser.add_argument("--quality-report", default="build/quality-gate.json", help="Quality-gate JSON report")
parser.add_argument("--backend-coverage", default="build/backend-coverage.xml", help="Backend coverage XML")
parser.add_argument(
"--frontend-coverage",
default="frontend/coverage/coverage-summary.json",
help="Frontend coverage summary JSON",
)
parser.add_argument("--coverage-percent", type=float, help="Override workspace coverage percent")
parser.add_argument("--source-lines-over-500", type=int, help="Override count of source files over 500 LOC")
parser.add_argument("--check", action="append", default=[], help="check_name:ok|failed")
parser.add_argument("--quality-report", default="build/quality-gate.json", help="Quality gate JSON report")
return parser
def _load_quality_report(path: Path) -> tuple[float, int, dict[str, str]]:
"""Read workspace coverage/LOC summary from the quality gate JSON output."""
if not path.exists():
return 0.0, 0, {
"tests": "not_applicable",
"coverage": "not_applicable",
"loc": "not_applicable",
"docs_naming": "not_applicable",
"gate_glue": "ok",
"sonarqube": "not_applicable",
"supply_chain": "not_applicable",
}
payload = json.loads(path.read_text(encoding="utf-8"))
coverage = payload.get("workspace_line_coverage_percent")
if not isinstance(coverage, (int, float)):
coverage = 0.0
source_lines = payload.get("source_lines_over_500")
if not isinstance(source_lines, int):
source_lines = 0
issue_checks = [item.get("check") for item in payload.get("issues", []) if isinstance(item, dict)]
docs_failed = any(str(check).lower() in {"docstring", "docs", "naming"} for check in issue_checks)
coverage_failed = any(str(check).lower() == "coverage" for check in issue_checks)
loc_failed = any(str(check).lower() in {"loc", "smell"} for check in issue_checks) or source_lines > 0
checks = {
"tests": "ok" if payload.get("issue_count", 0) == 0 else "failed",
"coverage": "failed" if coverage_failed or float(coverage) < 95.0 else "ok",
"loc": "failed" if loc_failed else "ok",
"docs_naming": "failed" if docs_failed else "ok",
"gate_glue": "ok",
"sonarqube": "not_applicable",
"supply_chain": "not_applicable",
}
sonarqube_report = Path(os.getenv("QUALITY_GATE_SONARQUBE_REPORT", "build/sonarqube-quality-gate.json"))
if sonarqube_report.exists():
try:
sonarqube_payload = json.loads(sonarqube_report.read_text(encoding="utf-8"))
status = (
sonarqube_payload.get("status")
or (sonarqube_payload.get("projectStatus") or {}).get("status")
or (sonarqube_payload.get("qualityGate") or {}).get("status")
)
if isinstance(status, str):
checks["sonarqube"] = "ok" if status.strip().lower() in {"ok", "pass", "passed", "success"} else "failed"
except Exception:
checks["sonarqube"] = "failed"
ironbank_report = Path(os.getenv("QUALITY_GATE_IRONBANK_REPORT", "build/ironbank-compliance.json"))
if ironbank_report.exists():
try:
ironbank_payload = json.loads(ironbank_report.read_text(encoding="utf-8"))
compliant = ironbank_payload.get("compliant")
if isinstance(compliant, bool):
checks["supply_chain"] = "ok" if compliant else "failed"
else:
status = ironbank_payload.get("status") or ironbank_payload.get("result")
if isinstance(status, str):
checks["supply_chain"] = (
"ok" if status.strip().lower() in {"ok", "pass", "passed", "success", "compliant"} else "failed"
)
except Exception:
checks["supply_chain"] = "failed"
return float(coverage), int(source_lines), checks
def main(argv: list[str] | None = None) -> int:
"""Parse arguments, aggregate JUnit files, and publish metrics."""
parser = _build_parser()
args = parser.parse_args(argv)
summary = load_junit_summary(Path(path) for path in args.junit)
report_over_500, report_checks = _derive_quality_report(Path(args.quality_report))
source_lines_over_500 = args.source_lines_over_500 if args.source_lines_over_500 is not None else report_over_500
coverage_percent = (
args.coverage_percent
if args.coverage_percent is not None
else _workspace_coverage_percent(Path(args.backend_coverage), Path(args.frontend_coverage))
)
checks = {
"tests": "ok" if summary.tests > 0 and summary.failures == 0 and summary.errors == 0 else "failed",
"coverage": "ok" if coverage_percent >= 95.0 else "failed",
"loc": "ok" if source_lines_over_500 == 0 else "failed",
}
checks.update(report_checks)
checks.update(_parse_check_args(args.check))
coverage_percent, source_lines_over_500, checks = _load_quality_report(Path(args.quality_report))
publish_quality_metrics(
gateway=args.gateway,
suite=args.suite,
job=args.job,
status=args.status,
summary=summary,
coverage_percent=coverage_percent,
workspace_line_coverage_percent=coverage_percent,
source_lines_over_500=source_lines_over_500,
checks=checks,
)

View File

@ -69,13 +69,12 @@ def render_payload(
ok: int,
failed: int,
summary: RunSummary,
coverage_percent: float = 0.0,
workspace_line_coverage_percent: float = 0.0,
source_lines_over_500: int = 0,
checks: dict[str, str] | None = None,
) -> str:
"""Render the Pushgateway payload for the quality-gate counters."""
lines = [
payload = (
"# TYPE platform_quality_gate_runs_total counter\n"
f'platform_quality_gate_runs_total{{suite="{suite}",status="ok"}} {ok}\n'
f'platform_quality_gate_runs_total{{suite="{suite}",status="failed"}} {failed}\n'
@ -85,19 +84,17 @@ def render_payload(
f'bstein_home_quality_gate_tests_total{{suite="{suite}",result="error"}} {summary.errors}\n'
f'bstein_home_quality_gate_tests_total{{suite="{suite}",result="skipped"}} {summary.skipped}\n'
"# TYPE platform_quality_gate_workspace_line_coverage_percent gauge\n"
f'platform_quality_gate_workspace_line_coverage_percent{{suite="{suite}"}} {coverage_percent:.3f}\n'
f'platform_quality_gate_workspace_line_coverage_percent{{suite="{suite}"}} {workspace_line_coverage_percent:.3f}\n'
"# TYPE platform_quality_gate_source_lines_over_500_total gauge\n"
f'platform_quality_gate_source_lines_over_500_total{{suite="{suite}"}} {max(source_lines_over_500, 0)}\n'
"# TYPE bstein_home_quality_gate_checks_total gauge\n"
]
merged_checks = checks or {}
for check_name, check_status in merged_checks.items():
if check_status not in {"ok", "failed"}:
continue
lines.append(
f'platform_quality_gate_source_lines_over_500_total{{suite="{suite}"}} {int(source_lines_over_500)}\n'
)
if checks:
payload += "# TYPE bstein_home_quality_gate_checks_total gauge\n"
payload += "".join(
f'bstein_home_quality_gate_checks_total{{suite="{suite}",check="{check_name}",result="{check_status}"}} 1\n'
for check_name, check_status in checks.items()
)
return "".join(lines)
return payload
def publish_quality_metrics(
@ -107,7 +104,7 @@ def publish_quality_metrics(
job: str,
status: str,
summary: RunSummary,
coverage_percent: float = 0.0,
workspace_line_coverage_percent: float = 0.0,
source_lines_over_500: int = 0,
checks: dict[str, str] | None = None,
) -> None:
@ -126,7 +123,7 @@ def publish_quality_metrics(
ok=int(counters["ok"]),
failed=int(counters["failed"]),
summary=summary,
coverage_percent=coverage_percent,
workspace_line_coverage_percent=workspace_line_coverage_percent,
source_lines_over_500=source_lines_over_500,
checks=checks,
)

View File

@ -14,6 +14,8 @@ def test_load_junit_summary_combines_suites(tmp_path: Path) -> None:
summary = load_junit_summary([junit])
assert summary == RunSummary(tests=3, failures=1, errors=0, skipped=1)
payload = render_payload(suite="bstein-home", ok=2, failed=0, summary=summary)
assert 'platform_quality_gate_runs_total{suite="bstein-home",status="ok"} 2' in payload
assert 'bstein_home_quality_gate_tests_total{suite="bstein-home",result="skipped"} 1' in payload
payload = render_payload(suite="bstein_home", ok=2, failed=0, summary=summary)
assert 'platform_quality_gate_runs_total{suite="bstein_home",status="ok"} 2' in payload
assert 'bstein_home_quality_gate_tests_total{suite="bstein_home",result="skipped"} 1' in payload
assert 'platform_quality_gate_workspace_line_coverage_percent{suite="bstein_home"} 0.000' in payload
assert 'platform_quality_gate_source_lines_over_500_total{suite="bstein_home"} 0' in payload