ci: add sonar/supply evidence collection and checks metrics

This commit is contained in:
Brad Stein 2026-04-19 14:10:50 -03:00
parent 265be3eeab
commit e2f754dd53
2 changed files with 196 additions and 95 deletions

131
Jenkinsfile vendored
View File

@ -90,6 +90,8 @@ spec:
TEST_EXIT_CODE_PATH = 'build/test.exitcode'
SUITE_NAME = 'metis'
PUSHGATEWAY_URL = 'http://platform-quality-gateway.monitoring.svc.cluster.local:9091'
QUALITY_GATE_SONARQUBE_REPORT = 'build/sonarqube-quality-gate.json'
QUALITY_GATE_IRONBANK_REPORT = 'build/ironbank-compliance.json'
}
options {
disableConcurrentBuilds()
@ -111,7 +113,74 @@ spec:
}
}
stage('Unit tests') {
stage('Collect SonarQube evidence') {
steps {
container('publisher') {
sh '''
set -eu
mkdir -p build
python3 - <<'PY'
import base64
import json
import os
import urllib.parse
import urllib.request
host = os.getenv('SONARQUBE_HOST_URL', '').strip().rstrip('/')
project_key = os.getenv('SONARQUBE_PROJECT_KEY', '').strip()
token = os.getenv('SONARQUBE_TOKEN', '').strip()
report_path = os.getenv('QUALITY_GATE_SONARQUBE_REPORT', 'build/sonarqube-quality-gate.json')
payload = {"status": "ERROR", "note": "missing SONARQUBE_HOST_URL and/or SONARQUBE_PROJECT_KEY"}
if host and project_key:
query = urllib.parse.urlencode({"projectKey": project_key})
request = urllib.request.Request(f"{host}/api/qualitygates/project_status?{query}", method="GET")
if token:
encoded = base64.b64encode(f"{token}:".encode("utf-8")).decode("utf-8")
request.add_header("Authorization", f"Basic {encoded}")
try:
with urllib.request.urlopen(request, timeout=12) as response:
payload = json.loads(response.read().decode("utf-8"))
except Exception as exc: # noqa: BLE001
payload = {"status": "ERROR", "error": str(exc)}
with open(report_path, "w", encoding="utf-8") as handle:
json.dump(payload, handle, indent=2, sort_keys=True)
handle.write("\\n")
PY
'''
}
}
}
stage('Collect Supply Chain evidence') {
steps {
container('publisher') {
sh '''
set -eu
mkdir -p build
python3 - <<'PY'
import json
import os
from pathlib import Path
report_path = Path(os.getenv('QUALITY_GATE_IRONBANK_REPORT', 'build/ironbank-compliance.json'))
if report_path.exists():
raise SystemExit(0)
status = os.getenv('IRONBANK_COMPLIANCE_STATUS', '').strip()
compliant = os.getenv('IRONBANK_COMPLIANT', '').strip().lower()
payload = {"status": status or "unknown", "compliant": compliant in {"1", "true", "yes", "on"} if compliant else None}
payload = {k: v for k, v in payload.items() if v is not None}
if "status" not in payload:
payload["status"] = "unknown"
payload["note"] = "Set IRONBANK_COMPLIANCE_STATUS/IRONBANK_COMPLIANT or write build/ironbank-compliance.json in image-building repos."
report_path.parent.mkdir(parents=True, exist_ok=True)
report_path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\\n", encoding="utf-8")
PY
'''
}
}
}
stage('Run quality gate') {
steps {
container('tester') {
sh '''
@ -119,37 +188,11 @@ spec:
apt-get update >/dev/null
apt-get install -y --no-install-recommends xz-utils >/dev/null
mkdir -p build
export GOPROXY='https://proxy.golang.org,direct'
export GOSUMDB='sum.golang.org'
for attempt in 1 2 3; do
if go mod download >/dev/null 2>&1; then
break
fi
if [ "${attempt}" -eq 3 ]; then
echo "go mod download failed after ${attempt} attempts" >&2
break
fi
sleep $((attempt * 3))
done
go install github.com/jstemmer/go-junit-report/v2@latest
test_rc=1
for attempt in 1 2 3; do
set +e
go test -v -count=1 -coverprofile=build/coverage.out ./... > build/test.out 2>&1
test_rc=$?
set -e
if [ "${test_rc}" -eq 0 ]; then
break
fi
if ! grep -q 'TLS handshake timeout' build/test.out 2>/dev/null; then
break
fi
if [ "${attempt}" -eq 3 ]; then
break
fi
sleep $((attempt * 3))
done
printf '%s\n' "${test_rc}" > "${TEST_EXIT_CODE_PATH}"
set +e
go test -v -count=1 -coverprofile=build/coverage.out ./... > build/test.out 2>&1
test_rc=$?
set -e
cat build/test.out
"$(go env GOPATH)/bin/go-junit-report" < build/test.out > "${JUNIT_XML}"
coverage="0"
@ -158,6 +201,24 @@ spec:
fi
export GO_COVERAGE="${coverage}"
printf '{"summary":{"percent_covered":%s}}\n' "${GO_COVERAGE}" > "${COVERAGE_JSON}"
quality_rc=0
if [ "${test_rc}" -eq 0 ]; then
set +e
cd testing
METIS_USE_EXISTING_COVERAGE=1 go test -v ./...
quality_rc=$?
set -e
cd "${WORKSPACE}"
else
quality_rc=1
fi
gate_rc=0
if [ "${test_rc}" -ne 0 ] || [ "${quality_rc}" -ne 0 ]; then
gate_rc=1
fi
printf '%s\n' "${gate_rc}" > "${TEST_EXIT_CODE_PATH}"
'''
}
}
@ -165,18 +226,16 @@ spec:
stage('Publish test metrics') {
steps {
container('tester') {
container('publisher') {
sh '''
set -eu
apt-get update >/dev/null
apt-get install -y --no-install-recommends python3 >/dev/null
python3 scripts/publish_test_metrics.py
python scripts/publish_test_metrics.py
'''
}
}
}
stage('Enforce test result') {
stage('Enforce quality gate') {
steps {
container('tester') {
sh '''

View File

@ -5,14 +5,11 @@ from __future__ import annotations
import json
import os
import sys
from pathlib import Path
import urllib.request
import xml.etree.ElementTree as ET
from pathlib import Path
SOURCE_SUFFIXES = {".go", ".py", ".js", ".ts", ".tsx", ".json", ".yaml", ".yml", ".sh"}
SKIP_DIRS = {".git", ".venv", "venv", "node_modules", "build", "dist", "__pycache__", ".pytest_cache"}
QUALITY_SUCCESS_STATES = {"ok", "pass", "passed", "success", "compliant"}
def _escape_label(value: str) -> str:
@ -45,7 +42,6 @@ def _load_junit(path: str) -> dict[str, int]:
except ValueError:
return 0
suites: list[ET.Element]
if root.tag == "testsuite":
suites = [root]
elif root.tag == "testsuites":
@ -62,25 +58,6 @@ def _load_junit(path: str) -> dict[str, int]:
return totals
def _count_lines_over_limit(root: Path, *, max_lines: int = 500) -> int:
count = 0
for path in root.rglob("*"):
if not path.is_file():
continue
if any(part in SKIP_DIRS for part in path.parts):
continue
if path.name != "Jenkinsfile" and path.suffix.lower() not in SOURCE_SUFFIXES:
continue
try:
with path.open("r", encoding="utf-8", errors="ignore") as handle:
lines = sum(1 for _ in handle)
except OSError:
continue
if lines > max_lines:
count += 1
return count
def _load_exit_code(path: str) -> int | None:
if not path or not os.path.exists(path):
return None
@ -90,16 +67,8 @@ def _load_exit_code(path: str) -> int | None:
return None
try:
return int(raw)
except ValueError:
raise RuntimeError(f"invalid test exit code {raw!r} in {path}")
def _read_http(url: str) -> str:
try:
with urllib.request.urlopen(url, timeout=10) as resp:
return resp.read().decode("utf-8", errors="replace")
except Exception:
return ""
except ValueError as exc:
raise RuntimeError(f"invalid test exit code {raw!r} in {path}") from exc
def _post_text(url: str, payload: str) -> None:
@ -114,15 +83,22 @@ def _post_text(url: str, payload: str) -> None:
raise RuntimeError(f"metrics push failed status={resp.status}")
def _read_http(url: str) -> str:
try:
with urllib.request.urlopen(url, timeout=10) as resp:
return resp.read().decode("utf-8", errors="replace")
except Exception:
return ""
def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str, str]) -> float:
text = _read_http(f"{pushgateway_url.rstrip('/')}/metrics")
if not text:
return 0.0
for line in text.splitlines():
if not line.startswith(metric + "{"):
continue
if any(f'{k}="{v}"' not in line for k, v in labels.items()):
if any(f'{key}="{value}"' not in line for key, value in labels.items()):
continue
parts = line.split()
if len(parts) < 2:
@ -134,6 +110,64 @@ def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str,
return 0.0
def _count_source_files_over_limit(repo_root: Path, max_lines: int = 500) -> int:
"""Count source files above the configured line budget."""
count = 0
for rel_root in ("cmd", "pkg", "scripts", "testing"):
base = repo_root / rel_root
if not base.exists():
continue
for path in base.rglob("*"):
if not path.is_file():
continue
if path.suffix not in {".go", ".py", ".sh"}:
continue
lines = len(path.read_text(encoding="utf-8", errors="ignore").splitlines())
if lines > max_lines:
count += 1
return count
def _load_json(path: Path) -> dict | None:
if not path.exists():
return None
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except Exception:
return None
return payload if isinstance(payload, dict) else None
def _sonarqube_check_status(build_dir: Path) -> str:
report = _load_json(Path(os.getenv("QUALITY_GATE_SONARQUBE_REPORT", str(build_dir / "sonarqube-quality-gate.json"))))
if not report:
return "not_applicable"
status_candidates = [
report.get("status"),
((report.get("projectStatus") or {}).get("status") if isinstance(report.get("projectStatus"), dict) else None),
((report.get("qualityGate") or {}).get("status") if isinstance(report.get("qualityGate"), dict) else None),
]
for value in status_candidates:
if isinstance(value, str):
return "ok" if value.strip().lower() in QUALITY_SUCCESS_STATES else "failed"
return "failed"
def _supply_chain_check_status(build_dir: Path) -> str:
report = _load_json(Path(os.getenv("QUALITY_GATE_IRONBANK_REPORT", str(build_dir / "ironbank-compliance.json"))))
if not report:
return "not_applicable"
compliant = report.get("compliant")
if isinstance(compliant, bool):
return "ok" if compliant else "failed"
status_candidates = [report.get("status"), report.get("result"), report.get("compliance")]
for value in status_candidates:
if isinstance(value, str):
return "ok" if value.strip().lower() in QUALITY_SUCCESS_STATES else "failed"
return "failed"
def main() -> int:
coverage_path = os.getenv("COVERAGE_JSON", "build/coverage.json")
junit_path = os.getenv("JUNIT_XML", "build/junit.xml")
@ -145,17 +179,19 @@ def main() -> int:
branch = os.getenv("BRANCH_NAME", "")
build_number = os.getenv("BUILD_NUMBER", "")
commit = os.getenv("GIT_COMMIT", "")
strict = os.getenv("METRICS_STRICT", "") == "1"
repo_root = Path(__file__).resolve().parents[1]
build_dir = repo_root / "build"
if not os.path.exists(coverage_path):
raise RuntimeError(f"missing coverage file {coverage_path}")
if not os.path.exists(junit_path):
raise RuntimeError(f"missing junit file {junit_path}")
repo_root = Path(__file__).resolve().parents[1]
coverage = _load_coverage(coverage_path)
totals = _load_junit(junit_path)
over_500 = _count_lines_over_limit(repo_root)
test_exit_code = _load_exit_code(test_exit_code_path)
source_lines_over_500 = _count_source_files_over_limit(repo_root, max_lines=500)
passed = max(totals["tests"] - totals["failures"] - totals["errors"] - totals["skipped"], 0)
outcome = "ok"
@ -166,27 +202,32 @@ def main() -> int:
or totals["errors"] > 0
):
outcome = "failed"
job_name = "platform-quality-ci"
checks = {
"tests": "ok" if outcome == "ok" else "failed",
"coverage": "ok" if coverage >= 95.0 else "failed",
"loc": "ok" if source_lines_over_500 == 0 else "failed",
"docs_naming": "not_applicable",
"gate_glue": "ok",
"sonarqube": _sonarqube_check_status(build_dir),
"supply_chain": _supply_chain_check_status(build_dir),
}
ok_count = _fetch_existing_counter(
pushgateway_url,
"platform_quality_gate_runs_total",
{"job": job_name, "suite": suite, "status": "ok"},
{"job": "platform-quality-ci", "suite": suite, "status": "ok"},
)
failed_count = _fetch_existing_counter(
pushgateway_url,
"platform_quality_gate_runs_total",
{"job": job_name, "suite": suite, "status": "failed"},
{"job": "platform-quality-ci", "suite": suite, "status": "failed"},
)
if outcome == "ok":
ok_count += 1
else:
failed_count += 1
tests_check = "ok" if outcome == "ok" else "failed"
coverage_check = "ok" if coverage >= 95.0 else "failed"
loc_check = "ok" if over_500 == 0 else "failed"
labels = {
"job": "platform-quality-ci",
"suite": suite,
"branch": branch,
"build_number": build_number,
@ -210,16 +251,23 @@ def main() -> int:
"# TYPE platform_quality_gate_workspace_line_coverage_percent gauge",
f'platform_quality_gate_workspace_line_coverage_percent{{suite="{suite}"}} {coverage:.3f}',
"# TYPE platform_quality_gate_source_lines_over_500_total gauge",
f'platform_quality_gate_source_lines_over_500_total{{suite="{suite}"}} {over_500}',
f'platform_quality_gate_source_lines_over_500_total{{suite="{suite}"}} {source_lines_over_500}',
"# TYPE metis_quality_gate_checks_total gauge",
f'metis_quality_gate_checks_total{{suite="{suite}",check="tests",result="{tests_check}"}} 1',
f'metis_quality_gate_checks_total{{suite="{suite}",check="coverage",result="{coverage_check}"}} 1',
f'metis_quality_gate_checks_total{{suite="{suite}",check="loc",result="{loc_check}"}} 1',
"# TYPE metis_quality_gate_build_info gauge",
f"metis_quality_gate_build_info{_label_str(labels)} 1",
]
payload_lines.extend(
f'metis_quality_gate_checks_total{{suite="{suite}",check="{check_name}",result="{check_status}"}} 1'
for check_name, check_status in checks.items()
)
payload = "\n".join(payload_lines) + "\n"
_post_text(f"{pushgateway_url.rstrip('/')}/metrics/job/{job_name}/suite/{suite}", payload)
try:
_post_text(f"{pushgateway_url.rstrip('/')}/metrics/job/{labels['job']}/suite/{suite}", payload)
except Exception as exc:
print(f"metrics push failed: {exc}")
if strict:
raise
print(
json.dumps(
@ -232,10 +280,8 @@ def main() -> int:
"tests_errors": totals["errors"],
"tests_skipped": totals["skipped"],
"coverage_percent": round(coverage, 3),
"source_lines_over_500": over_500,
"source_lines_over_500": source_lines_over_500,
"test_exit_code": test_exit_code,
"ok_counter": ok_count,
"failed_counter": failed_count,
},
indent=2,
)
@ -244,8 +290,4 @@ def main() -> int:
if __name__ == "__main__":
try:
raise SystemExit(main())
except Exception as exc:
print(f"metrics push failed: {exc}")
raise
raise SystemExit(main())