Compare commits

...

4 Commits

2 changed files with 173 additions and 5 deletions

77
Jenkinsfile vendored
View File

@ -42,6 +42,8 @@ spec:
environment {
SUITE_NAME = 'pegasus'
PUSHGATEWAY_URL = 'http://platform-quality-gateway.monitoring.svc.cluster.local:9091'
QUALITY_GATE_SONARQUBE_REPORT = 'build/sonarqube-quality-gate.json'
QUALITY_GATE_IRONBANK_REPORT = 'build/ironbank-compliance.json'
}
options {
@ -59,6 +61,73 @@ spec:
}
}
stage('Collect SonarQube evidence') {
steps {
container('publisher') {
sh '''
set -eu
mkdir -p build
python3 - <<'PY'
import base64
import json
import os
import urllib.parse
import urllib.request
host = os.getenv('SONARQUBE_HOST_URL', '').strip().rstrip('/')
project_key = os.getenv('SONARQUBE_PROJECT_KEY', '').strip()
token = os.getenv('SONARQUBE_TOKEN', '').strip()
report_path = os.getenv('QUALITY_GATE_SONARQUBE_REPORT', 'build/sonarqube-quality-gate.json')
payload = {"status": "ERROR", "note": "missing SONARQUBE_HOST_URL and/or SONARQUBE_PROJECT_KEY"}
if host and project_key:
query = urllib.parse.urlencode({"projectKey": project_key})
request = urllib.request.Request(f"{host}/api/qualitygates/project_status?{query}", method="GET")
if token:
encoded = base64.b64encode(f"{token}:".encode("utf-8")).decode("utf-8")
request.add_header("Authorization", f"Basic {encoded}")
try:
with urllib.request.urlopen(request, timeout=12) as response:
payload = json.loads(response.read().decode("utf-8"))
except Exception as exc: # noqa: BLE001
payload = {"status": "ERROR", "error": str(exc)}
with open(report_path, "w", encoding="utf-8") as handle:
json.dump(payload, handle, indent=2, sort_keys=True)
handle.write("\\n")
PY
'''
}
}
}
stage('Collect Supply Chain evidence') {
steps {
container('publisher') {
sh '''
set -eu
mkdir -p build
python3 - <<'PY'
import json
import os
from pathlib import Path
report_path = Path(os.getenv('QUALITY_GATE_IRONBANK_REPORT', 'build/ironbank-compliance.json'))
if report_path.exists():
raise SystemExit(0)
status = os.getenv('IRONBANK_COMPLIANCE_STATUS', '').strip()
compliant = os.getenv('IRONBANK_COMPLIANT', '').strip().lower()
payload = {"status": status or "unknown", "compliant": compliant in {"1", "true", "yes", "on"} if compliant else None}
payload = {k: v for k, v in payload.items() if v is not None}
if "status" not in payload:
payload["status"] = "unknown"
payload["note"] = "Set IRONBANK_COMPLIANCE_STATUS/IRONBANK_COMPLIANT or write build/ironbank-compliance.json in image-building repos."
report_path.parent.mkdir(parents=True, exist_ok=True)
report_path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\\n", encoding="utf-8")
PY
'''
}
}
}
stage('Backend unit tests') {
steps {
container('go-tester') {
@ -110,11 +179,13 @@ spec:
}
}
stage('Quality gate report') {
stage('Run quality gate') {
steps {
container('publisher') {
sh '''
set -eu
apt-get update
apt-get install -y --no-install-recommends golang-go nodejs npm
python -m testing.pegasus_gate report
'''
}
@ -132,11 +203,13 @@ spec:
}
}
stage('Quality gate') {
stage('Enforce quality gate') {
steps {
container('publisher') {
sh '''
set -eu
apt-get update
apt-get install -y --no-install-recommends golang-go nodejs npm
python -m testing.pegasus_gate enforce
'''
}

View File

@ -8,6 +8,7 @@ Inputs:
Outputs pushed:
- platform_quality_gate_runs_total{suite="pegasus",status="ok|failed"}
- pegasus_quality_gate_tests_total{suite="pegasus",result=*}
- platform_quality_gate_test_case_result{suite="pegasus",test=*,status=*}
- pegasus_quality_gate_coverage_percent{suite="pegasus"}
- platform_quality_gate_workspace_line_coverage_percent{suite="pegasus"}
- platform_quality_gate_source_lines_over_500_total{suite="pegasus"}
@ -23,6 +24,7 @@ from pathlib import Path
SOURCE_SCAN_ROOTS = ("backend", "frontend/src", "scripts", "testing")
SOURCE_EXTENSIONS = {".go", ".py", ".ts", ".tsx", ".sh"}
QUALITY_SUCCESS_STATES = {"ok", "pass", "passed", "success", "compliant"}
def _escape_label(value: str) -> str:
@ -71,6 +73,39 @@ def _load_junit(path: Path) -> dict[str, int]:
return totals
def _load_junit_cases(path: Path) -> list[tuple[str, str]]:
if not path.exists():
return []
tree = ET.parse(path)
root = tree.getroot()
suites: list[ET.Element]
if root.tag == "testsuite":
suites = [root]
elif root.tag == "testsuites":
suites = list(root.findall("testsuite"))
else:
suites = []
cases: list[tuple[str, str]] = []
for suite in suites:
for case in suite.findall("testcase"):
name = (case.attrib.get("name") or "").strip()
classname = (case.attrib.get("classname") or "").strip()
if not name:
continue
test_id = f"{classname}::{name}" if classname else name
status = "passed"
if case.find("failure") is not None:
status = "failed"
elif case.find("error") is not None:
status = "error"
elif case.find("skipped") is not None:
status = "skipped"
cases.append((test_id, status))
return cases
def _load_backend_coverage_percent(path: Path) -> float:
if not path.exists():
return 0.0
@ -143,7 +178,7 @@ def _post_text(url: str, payload: str) -> None:
req = urllib.request.Request(
url,
data=payload.encode("utf-8"),
method="POST",
method="PUT",
headers={"Content-Type": "text/plain"},
)
with urllib.request.urlopen(req, timeout=10) as resp:
@ -168,8 +203,48 @@ def _count_source_files_over_limit(repo_root: Path, max_lines: int = 500) -> int
return count
def _load_json(path: Path) -> dict | None:
if not path.exists():
return None
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except Exception:
return None
return payload if isinstance(payload, dict) else None
def _sonarqube_check_status(build_dir: Path) -> str:
report = _load_json(Path(os.getenv("QUALITY_GATE_SONARQUBE_REPORT", str(build_dir / "sonarqube-quality-gate.json"))))
if not report:
return "not_applicable"
status_candidates = [
report.get("status"),
((report.get("projectStatus") or {}).get("status") if isinstance(report.get("projectStatus"), dict) else None),
((report.get("qualityGate") or {}).get("status") if isinstance(report.get("qualityGate"), dict) else None),
]
for value in status_candidates:
if isinstance(value, str):
return "ok" if value.strip().lower() in QUALITY_SUCCESS_STATES else "failed"
return "failed"
def _supply_chain_check_status(build_dir: Path) -> str:
report = _load_json(Path(os.getenv("QUALITY_GATE_IRONBANK_REPORT", str(build_dir / "ironbank-compliance.json"))))
if not report:
return "not_applicable"
compliant = report.get("compliant")
if isinstance(compliant, bool):
return "ok" if compliant else "failed"
status_candidates = [report.get("status"), report.get("result"), report.get("compliance")]
for value in status_candidates:
if isinstance(value, str):
return "ok" if value.strip().lower() in QUALITY_SUCCESS_STATES else "failed"
return "failed"
def main() -> int:
repo_root = Path(__file__).resolve().parents[1]
build_dir = repo_root / "build"
suite = os.getenv("SUITE_NAME", "pegasus")
pushgateway_url = os.getenv(
"PUSHGATEWAY_URL", "http://platform-quality-gateway.monitoring.svc.cluster.local:9091"
@ -187,6 +262,7 @@ def main() -> int:
b = _load_junit(backend_junit)
f = _load_junit(frontend_junit)
test_cases = _load_junit_cases(backend_junit) + _load_junit_cases(frontend_junit)
totals = {
"tests": b["tests"] + f["tests"],
"failures": b["failures"] + f["failures"],
@ -226,6 +302,15 @@ def main() -> int:
and totals["errors"] == 0
else "failed"
)
checks = {
"tests": "ok" if outcome == "ok" else "failed",
"coverage": "ok" if coverage_pct >= 95.0 else "failed",
"loc": "ok" if source_lines_over_500 == 0 else "failed",
"docs_naming": "ok" if not gate_issues else "failed",
"gate_glue": "ok",
"sonarqube": _sonarqube_check_status(build_dir),
"supply_chain": _supply_chain_check_status(build_dir),
}
job_name = "platform-quality-ci"
ok_count = _fetch_existing_counter(
@ -249,8 +334,8 @@ def main() -> int:
f'platform_quality_gate_runs_total{{suite="{suite}",status="ok"}} {ok_count:.0f}',
f'platform_quality_gate_runs_total{{suite="{suite}",status="failed"}} {failed_count:.0f}',
"# TYPE pegasus_test_suite_result gauge",
f'pegasus_test_suite_result{{suite="backend",status="{backend_suite_result}"}} 1',
f'pegasus_test_suite_result{{suite="frontend",status="{frontend_suite_result}"}} 1',
f'pegasus_test_suite_result{{test_suite="backend",status="{backend_suite_result}"}} 1',
f'pegasus_test_suite_result{{test_suite="frontend",status="{frontend_suite_result}"}} 1',
"# TYPE pegasus_quality_gate_tests_total gauge",
f'pegasus_quality_gate_tests_total{{suite="{suite}",result="passed"}} {passed}',
f'pegasus_quality_gate_tests_total{{suite="{suite}",result="failed"}} {totals["failures"]}',
@ -266,9 +351,19 @@ def main() -> int:
f'pegasus_quality_gate_status{{suite="{suite}",result="{"ok" if gate_ok else "failed"}"}} 1',
"# TYPE pegasus_quality_gate_issues_total gauge",
f'pegasus_quality_gate_issues_total{{suite="{suite}"}} {len(gate_issues)}',
"# TYPE pegasus_quality_gate_checks_total gauge",
"# TYPE platform_quality_gate_test_case_result gauge",
"# TYPE pegasus_quality_gate_build_info gauge",
f"pegasus_quality_gate_build_info{_label_str(labels)} 1",
]
payload_lines.extend(
f'platform_quality_gate_test_case_result{{suite="{suite}",test="{_escape_label(test_name)}",status="{_escape_label(test_status)}"}} 1'
for test_name, test_status in test_cases
)
payload_lines.extend(
f'pegasus_quality_gate_checks_total{{suite="{suite}",check="{check_name}",result="{check_status}"}} 1'
for check_name, check_status in checks.items()
)
payload = "\n".join(payload_lines) + "\n"
push_url = f"{pushgateway_url.rstrip('/')}/metrics/job/{job_name}/suite/{suite}"