Compare commits

...

4 Commits

5 changed files with 362 additions and 53 deletions

140
Jenkinsfile vendored
View File

@ -80,8 +80,10 @@ spec:
BACK_IMAGE = "${REGISTRY}/bstein-dev-home-backend" BACK_IMAGE = "${REGISTRY}/bstein-dev-home-backend"
VERSION_TAG = 'dev' VERSION_TAG = 'dev'
SEMVER = 'dev' SEMVER = 'dev'
SUITE_NAME = 'bstein-home' SUITE_NAME = 'bstein_home'
PUSHGATEWAY_URL = 'http://platform-quality-gateway.monitoring.svc.cluster.local:9091' PUSHGATEWAY_URL = 'http://platform-quality-gateway.monitoring.svc.cluster.local:9091'
QUALITY_GATE_SONARQUBE_REPORT = 'build/sonarqube-quality-gate.json'
QUALITY_GATE_IRONBANK_REPORT = 'build/ironbank-compliance.json'
} }
options { options {
disableConcurrentBuilds() disableConcurrentBuilds()
@ -97,6 +99,73 @@ spec:
} }
} }
stage('Collect SonarQube evidence') {
steps {
container('tester') {
sh '''
set -euo pipefail
mkdir -p build
python3 - <<'PY'
import base64
import json
import os
import urllib.parse
import urllib.request

# Collect SonarQube quality-gate evidence: query the project_status API and
# persist the response (or an ERROR placeholder) as JSON for later CI stages.
host = os.getenv('SONARQUBE_HOST_URL', '').strip().rstrip('/')
project_key = os.getenv('SONARQUBE_PROJECT_KEY', '').strip()
token = os.getenv('SONARQUBE_TOKEN', '').strip()
report_path = os.getenv('QUALITY_GATE_SONARQUBE_REPORT', 'build/sonarqube-quality-gate.json')
# Default payload records why the lookup was skipped when config is absent.
payload = {"status": "ERROR", "note": "missing SONARQUBE_HOST_URL and/or SONARQUBE_PROJECT_KEY"}
if host and project_key:
    query = urllib.parse.urlencode({"projectKey": project_key})
    request = urllib.request.Request(f"{host}/api/qualitygates/project_status?{query}", method="GET")
    if token:
        # SonarQube tokens authenticate via HTTP Basic with an empty password.
        encoded = base64.b64encode(f"{token}:".encode("utf-8")).decode("utf-8")
        request.add_header("Authorization", f"Basic {encoded}")
    try:
        with urllib.request.urlopen(request, timeout=12) as response:
            payload = json.loads(response.read().decode("utf-8"))
    # Any failure (network, HTTP, bad JSON) is itself recorded as evidence.
    except Exception as exc:  # noqa: BLE001
        payload = {"status": "ERROR", "error": str(exc)}
with open(report_path, "w", encoding="utf-8") as handle:
    json.dump(payload, handle, indent=2, sort_keys=True)
    handle.write("\\n")
PY
'''
}
}
}
stage('Collect Supply Chain evidence') {
steps {
container('tester') {
sh '''
set -euo pipefail
mkdir -p build
python3 - <<'PY'
import json
import os
from pathlib import Path

# Collect supply-chain (Iron Bank) compliance evidence.  If an image-building
# stage already wrote the report file, keep it untouched and exit cleanly.
report_path = Path(os.getenv('QUALITY_GATE_IRONBANK_REPORT', 'build/ironbank-compliance.json'))
if report_path.exists():
    raise SystemExit(0)
status = os.getenv('IRONBANK_COMPLIANCE_STATUS', '').strip()
compliant = os.getenv('IRONBANK_COMPLIANT', '').strip().lower()
# "compliant" stays None (and is dropped below) when the env var is unset.
payload = {"status": status or "unknown", "compliant": compliant in {"1", "true", "yes", "on"} if compliant else None}
payload = {k: v for k, v in payload.items() if v is not None}
if "status" not in payload:
    # NOTE(review): "status" is always set above (falls back to "unknown"),
    # so this branch looks unreachable — confirm whether the hint "note"
    # should instead be written whenever no env vars were provided.
    payload["status"] = "unknown"
    payload["note"] = "Set IRONBANK_COMPLIANCE_STATUS/IRONBANK_COMPLIANT or write build/ironbank-compliance.json in image-building repos."
report_path.parent.mkdir(parents=True, exist_ok=True)
report_path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\\n", encoding="utf-8")
PY
'''
}
}
}
stage('Prep toolchain') { stage('Prep toolchain') {
steps { steps {
container('builder') { container('builder') {
@ -193,7 +262,7 @@ spec:
stage('Frontend tests') { stage('Frontend tests') {
steps { steps {
container('frontend') { container('frontend') {
sh ''' sh(script: '''#!/usr/bin/env bash
set -euo pipefail set -euo pipefail
mkdir -p build mkdir -p build
cd frontend cd frontend
@ -202,21 +271,58 @@ spec:
npm run test:unit npm run test:unit
npm run test:component npm run test:component
npm run test:e2e npm run test:e2e
''' ''')
} }
} }
} }
stage('Unified quality gate') { stage('Run quality gate') {
steps { steps {
container('tester') { container('tester') {
sh ''' sh '''
set -euo pipefail set -euo pipefail
export PYTHONPATH="${WORKSPACE}:${PYTHONPATH:-}" export PYTHONPATH="${WORKSPACE}:${PYTHONPATH:-}"
set +e
python -m testing.ci.quality_gate \ python -m testing.ci.quality_gate \
--backend-coverage build/backend-coverage.xml \ --backend-coverage build/backend-coverage.xml \
--frontend-coverage frontend/coverage/coverage-summary.json \ --frontend-coverage frontend/coverage/coverage-summary.json \
--report build/quality-gate.json --report build/quality-gate.json
gate_rc=$?
set -e
printf '%s\n' "${gate_rc}" > build/quality-gate.rc
'''
}
}
}
stage('Publish test metrics') {
steps {
container('tester') {
sh '''
set -euo pipefail
gate_rc="$(cat build/quality-gate.rc 2>/dev/null || echo 1)"
status="ok"
if [ "${gate_rc}" -ne 0 ]; then
status="failed"
fi
python -m testing.ci.publish_metrics \
--gateway "${PUSHGATEWAY_URL}" \
--suite "${SUITE_NAME}" \
--job platform-quality-ci \
--status "${status}" \
--quality-report build/quality-gate.json \
--junit build/junit-backend.xml build/junit-frontend-unit.xml build/junit-frontend-component.xml build/junit-frontend-e2e.xml
'''
}
}
}
stage('Enforce quality gate') {
steps {
container('tester') {
sh '''
set -euo pipefail
test "$(cat build/quality-gate.rc 2>/dev/null || echo 1)" -eq 0
''' '''
} }
} }
@ -260,32 +366,6 @@ spec:
} }
post { post {
success {
container('tester') {
sh '''
set -euo pipefail
python -m testing.ci.publish_metrics \
--gateway "${PUSHGATEWAY_URL}" \
--suite "${SUITE_NAME}" \
--job platform-quality-ci \
--status ok \
--junit build/junit-backend.xml build/junit-frontend-unit.xml build/junit-frontend-component.xml build/junit-frontend-e2e.xml
'''
}
}
failure {
container('tester') {
sh '''
set -euo pipefail
python -m testing.ci.publish_metrics \
--gateway "${PUSHGATEWAY_URL}" \
--suite "${SUITE_NAME}" \
--job platform-quality-ci \
--status failed \
--junit build/junit-backend.xml build/junit-frontend-unit.xml build/junit-frontend-component.xml build/junit-frontend-e2e.xml
'''
}
}
always { always {
script { script {
def props = fileExists('build.env') ? readProperties(file: 'build.env') : [:] def props = fileExists('build.env') ? readProperties(file: 'build.env') : [:]

View File

@ -3,9 +3,11 @@ from __future__ import annotations
"""Command-line entry point for publishing CI test metrics.""" """Command-line entry point for publishing CI test metrics."""
import argparse import argparse
import json
import os
from pathlib import Path from pathlib import Path
from .summary import load_junit_summary, publish_quality_metrics from .summary import load_junit_cases, load_junit_summary, publish_quality_metrics
def _build_parser() -> argparse.ArgumentParser: def _build_parser() -> argparse.ArgumentParser:
@ -17,21 +19,93 @@ def _build_parser() -> argparse.ArgumentParser:
parser.add_argument("--job", default="platform-quality-ci", help="Pushgateway job label") parser.add_argument("--job", default="platform-quality-ci", help="Pushgateway job label")
parser.add_argument("--status", choices=("ok", "failed"), required=True, help="Gate outcome") parser.add_argument("--status", choices=("ok", "failed"), required=True, help="Gate outcome")
parser.add_argument("--junit", nargs="*", default=(), help="JUnit XML files to aggregate") parser.add_argument("--junit", nargs="*", default=(), help="JUnit XML files to aggregate")
parser.add_argument("--quality-report", default="build/quality-gate.json", help="Quality gate JSON report")
return parser return parser
def _load_quality_report(path: Path) -> tuple[float, int, dict[str, str]]:
    """Read workspace coverage/LOC summary from the quality gate JSON output.

    Returns a ``(coverage_percent, files_over_limit, check_statuses)`` tuple.
    When *path* is missing, every check reports ``not_applicable`` except the
    gate glue itself, which is trivially ``ok``.
    """
    if not path.exists():
        # No gate report on disk: the gate never ran, so nothing to grade.
        return 0.0, 0, {
            "tests": "not_applicable",
            "coverage": "not_applicable",
            "loc": "not_applicable",
            "docs_naming": "not_applicable",
            "gate_glue": "ok",
            "sonarqube": "not_applicable",
            "supply_chain": "not_applicable",
        }

    report = json.loads(path.read_text(encoding="utf-8"))

    coverage_value = report.get("workspace_line_coverage_percent")
    if not isinstance(coverage_value, (int, float)):
        coverage_value = 0.0
    oversized = report.get("source_lines_over_500")
    if not isinstance(oversized, int):
        oversized = 0

    # Lower-cased per-issue "check" labels drive the membership tests below.
    labels = [
        str(entry.get("check")).lower()
        for entry in report.get("issues", [])
        if isinstance(entry, dict)
    ]
    has_doc_issue = any(label in {"docstring", "docs", "naming"} for label in labels)
    has_coverage_issue = any(label == "coverage" for label in labels)
    has_loc_issue = oversized > 0 or any(label in {"loc", "smell"} for label in labels)

    checks = {
        "tests": "ok" if report.get("issue_count", 0) == 0 else "failed",
        "coverage": "failed" if has_coverage_issue or float(coverage_value) < 95.0 else "ok",
        "loc": "failed" if has_loc_issue else "ok",
        "docs_naming": "failed" if has_doc_issue else "ok",
        "gate_glue": "ok",
        "sonarqube": "not_applicable",
        "supply_chain": "not_applicable",
    }

    # Optional SonarQube evidence written by an earlier CI stage.
    sonar_path = Path(os.getenv("QUALITY_GATE_SONARQUBE_REPORT", "build/sonarqube-quality-gate.json"))
    if sonar_path.exists():
        try:
            sonar = json.loads(sonar_path.read_text(encoding="utf-8"))
            # Several response shapes are accepted: flat, project_status, or
            # a qualityGate wrapper.
            status = (
                sonar.get("status")
                or (sonar.get("projectStatus") or {}).get("status")
                or (sonar.get("qualityGate") or {}).get("status")
            )
            if isinstance(status, str):
                passed = status.strip().lower() in {"ok", "pass", "passed", "success"}
                checks["sonarqube"] = "ok" if passed else "failed"
        except Exception:
            # Unparseable evidence counts as a failed check, never a crash.
            checks["sonarqube"] = "failed"

    # Optional Iron Bank supply-chain evidence.
    ironbank_path = Path(os.getenv("QUALITY_GATE_IRONBANK_REPORT", "build/ironbank-compliance.json"))
    if ironbank_path.exists():
        try:
            ironbank = json.loads(ironbank_path.read_text(encoding="utf-8"))
            compliant = ironbank.get("compliant")
            if isinstance(compliant, bool):
                checks["supply_chain"] = "ok" if compliant else "failed"
            else:
                status = ironbank.get("status") or ironbank.get("result")
                if isinstance(status, str):
                    passed = status.strip().lower() in {"ok", "pass", "passed", "success", "compliant"}
                    checks["supply_chain"] = "ok" if passed else "failed"
        except Exception:
            checks["supply_chain"] = "failed"

    return float(coverage_value), int(oversized), checks
def main(argv: list[str] | None = None) -> int: def main(argv: list[str] | None = None) -> int:
"""Parse arguments, aggregate JUnit files, and publish metrics.""" """Parse arguments, aggregate JUnit files, and publish metrics."""
parser = _build_parser() parser = _build_parser()
args = parser.parse_args(argv) args = parser.parse_args(argv)
summary = load_junit_summary(Path(path) for path in args.junit) junit_paths = [Path(path) for path in args.junit]
summary = load_junit_summary(junit_paths)
test_cases = load_junit_cases(junit_paths)
coverage_percent, source_lines_over_500, checks = _load_quality_report(Path(args.quality_report))
publish_quality_metrics( publish_quality_metrics(
gateway=args.gateway, gateway=args.gateway,
suite=args.suite, suite=args.suite,
job=args.job, job=args.job,
status=args.status, status=args.status,
summary=summary, summary=summary,
workspace_line_coverage_percent=coverage_percent,
source_lines_over_500=source_lines_over_500,
checks=checks,
test_cases=test_cases,
) )
return 0 return 0

View File

@ -221,6 +221,41 @@ def check_coverage(
return issues return issues
def compute_workspace_line_coverage(
    paths: Iterable[Path],
    *,
    backend_report: Path,
    frontend_report: Path,
) -> float:
    """Compute the mean line coverage percentage across managed coverage files."""
    backend_data = _load_backend_coverage(backend_report) if backend_report.exists() else {}
    frontend_data = _load_frontend_coverage(frontend_report) if frontend_report.exists() else {}

    def _sample_for(rel):
        # Returns the line-coverage percentage for one repo-relative path,
        # or None when no coverage entry applies.
        if rel.startswith("backend/"):
            entry = _coverage_lookup(backend_data, rel)
            return float(entry.get("lines", 0.0)) if entry else None
        if rel.startswith("frontend/"):
            # Frontend coverage is keyed without the "frontend/" prefix and
            # nests percentages under a {"lines": {"pct": ...}} mapping.
            entry = _coverage_lookup(frontend_data, rel.split("frontend/", 1)[1])
            if entry:
                lines = entry.get("lines")
                if isinstance(lines, dict):
                    return float(lines.get("pct", 0.0))
        return None

    percentages: list[float] = []
    for candidate in paths:
        if not candidate.exists():
            continue
        if candidate.is_absolute():
            key = candidate.relative_to(ROOT).as_posix()
        else:
            key = _normalize_key(str(candidate))
        sample = _sample_for(key)
        if sample is not None:
            percentages.append(sample)

    return round(sum(percentages) / len(percentages), 3) if percentages else 0.0
def _build_parser() -> argparse.ArgumentParser: def _build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Run the repo's unified quality gate") parser = argparse.ArgumentParser(description="Run the repo's unified quality gate")
parser.add_argument("--contract", default=str(DEFAULT_CONTRACT), help="Path to the JSON gate contract") parser.add_argument("--contract", default=str(DEFAULT_CONTRACT), help="Path to the JSON gate contract")
@ -239,15 +274,30 @@ def run_gate(contract_path: Path, *, backend_coverage: Path, frontend_coverage:
threshold = float(contract.get("coverage_threshold_pct", 95)) threshold = float(contract.get("coverage_threshold_pct", 95))
issues: list[GateIssue] = [] issues: list[GateIssue] = []
issues.extend(check_file_sizes(managed_files, max_lines=max_lines)) loc_issues = check_file_sizes(managed_files, max_lines=max_lines)
issues.extend(check_docstrings(docstring_files)) doc_issues = check_docstrings(docstring_files)
issues.extend(check_coverage(coverage_files, backend_report=backend_coverage, frontend_report=frontend_coverage, threshold=threshold)) coverage_issues = check_coverage(
coverage_files,
backend_report=backend_coverage,
frontend_report=frontend_coverage,
threshold=threshold,
)
issues.extend(loc_issues)
issues.extend(doc_issues)
issues.extend(coverage_issues)
workspace_line_coverage = compute_workspace_line_coverage(
coverage_files,
backend_report=backend_coverage,
frontend_report=frontend_coverage,
)
report = { report = {
"managed_files": [str(path.relative_to(ROOT)) for path in managed_files], "managed_files": [str(path.relative_to(ROOT)) for path in managed_files],
"docstring_files": [str(path.relative_to(ROOT)) for path in docstring_files], "docstring_files": [str(path.relative_to(ROOT)) for path in docstring_files],
"coverage_files": [str(path.relative_to(ROOT)) for path in coverage_files], "coverage_files": [str(path.relative_to(ROOT)) for path in coverage_files],
"max_lines": max_lines, "max_lines": max_lines,
"coverage_threshold_pct": threshold, "coverage_threshold_pct": threshold,
"workspace_line_coverage_percent": workspace_line_coverage,
"source_lines_over_500": len(loc_issues),
"issue_count": len(issues), "issue_count": len(issues),
"issues": [issue.__dict__ for issue in issues], "issues": [issue.__dict__ for issue in issues],
} }

View File

@ -10,6 +10,11 @@ from pathlib import Path
from typing import Iterable from typing import Iterable
def _escape_label(value: str) -> str:
    """Escape text for safe Prometheus label emission."""
    escaped = value
    # Backslash must be handled first so later escapes are not doubled.
    for raw, replacement in (("\\", "\\\\"), ("\n", "\\n"), ('"', '\\"')):
        escaped = escaped.replace(raw, replacement)
    return escaped
@dataclass(frozen=True) @dataclass(frozen=True)
class RunSummary: class RunSummary:
"""Aggregate counts from a collection of JUnit XML files.""" """Aggregate counts from a collection of JUnit XML files."""
@ -48,6 +53,32 @@ def load_junit_summary(paths: Iterable[Path]) -> RunSummary:
return RunSummary(**totals) return RunSummary(**totals)
def load_junit_cases(paths: Iterable[Path]) -> list[tuple[str, str]]:
    """Collect testcase-level statuses for flaky-test visibility panels.

    Each entry is a ``(test_id, status)`` pair where *test_id* is
    ``classname::name`` (or just *name* when no classname is present) and
    *status* is one of ``passed``/``failed``/``error``/``skipped``.
    Missing files are silently skipped.
    """
    cases: list[tuple[str, str]] = []
    for path in paths:
        if not path.exists():
            continue
        root = ET.parse(path).getroot()
        # iter() walks the whole tree in document order, so nested
        # <testsuite> wrappers (valid in JUnit XML) are covered too; the
        # previous tag-based dispatch dropped cases inside nested suites.
        for case in root.iter("testcase"):
            name = (case.attrib.get("name") or "").strip()
            if not name:
                # Nameless testcases cannot be keyed into panels; skip them.
                continue
            classname = (case.attrib.get("classname") or "").strip()
            test_id = f"{classname}::{name}" if classname else name
            # Child-element precedence: failure > error > skipped > passed.
            if case.find("failure") is not None:
                status = "failed"
            elif case.find("error") is not None:
                status = "error"
            elif case.find("skipped") is not None:
                status = "skipped"
            else:
                status = "passed"
            cases.append((test_id, status))
    return cases
def read_pushgateway_counters(text: str, *, suite: str, job: str) -> dict[str, float]: def read_pushgateway_counters(text: str, *, suite: str, job: str) -> dict[str, float]:
"""Read the current quality-gate counters for a suite from Pushgateway text.""" """Read the current quality-gate counters for a suite from Pushgateway text."""
@ -63,10 +94,19 @@ def read_pushgateway_counters(text: str, *, suite: str, job: str) -> dict[str, f
return counters return counters
def render_payload(*, suite: str, ok: int, failed: int, summary: RunSummary) -> str: def render_payload(
*,
suite: str,
ok: int,
failed: int,
summary: RunSummary,
workspace_line_coverage_percent: float = 0.0,
source_lines_over_500: int = 0,
checks: dict[str, str] | None = None,
test_cases: list[tuple[str, str]] | None = None,
) -> str:
"""Render the Pushgateway payload for the quality-gate counters.""" """Render the Pushgateway payload for the quality-gate counters."""
payload = (
return (
"# TYPE platform_quality_gate_runs_total counter\n" "# TYPE platform_quality_gate_runs_total counter\n"
f'platform_quality_gate_runs_total{{suite="{suite}",status="ok"}} {ok}\n' f'platform_quality_gate_runs_total{{suite="{suite}",status="ok"}} {ok}\n'
f'platform_quality_gate_runs_total{{suite="{suite}",status="failed"}} {failed}\n' f'platform_quality_gate_runs_total{{suite="{suite}",status="failed"}} {failed}\n'
@ -75,10 +115,38 @@ def render_payload(*, suite: str, ok: int, failed: int, summary: RunSummary) ->
f'bstein_home_quality_gate_tests_total{{suite="{suite}",result="failed"}} {summary.failures}\n' f'bstein_home_quality_gate_tests_total{{suite="{suite}",result="failed"}} {summary.failures}\n'
f'bstein_home_quality_gate_tests_total{{suite="{suite}",result="error"}} {summary.errors}\n' f'bstein_home_quality_gate_tests_total{{suite="{suite}",result="error"}} {summary.errors}\n'
f'bstein_home_quality_gate_tests_total{{suite="{suite}",result="skipped"}} {summary.skipped}\n' f'bstein_home_quality_gate_tests_total{{suite="{suite}",result="skipped"}} {summary.skipped}\n'
"# TYPE platform_quality_gate_workspace_line_coverage_percent gauge\n"
f'platform_quality_gate_workspace_line_coverage_percent{{suite="{suite}"}} {workspace_line_coverage_percent:.3f}\n'
"# TYPE platform_quality_gate_source_lines_over_500_total gauge\n"
f'platform_quality_gate_source_lines_over_500_total{{suite="{suite}"}} {int(source_lines_over_500)}\n'
) )
if checks:
payload += "# TYPE bstein_home_quality_gate_checks_total gauge\n"
payload += "".join(
f'bstein_home_quality_gate_checks_total{{suite="{suite}",check="{check_name}",result="{check_status}"}} 1\n'
for check_name, check_status in checks.items()
)
if test_cases:
payload += "# TYPE platform_quality_gate_test_case_result gauge\n"
payload += "".join(
f'platform_quality_gate_test_case_result{{suite="{suite}",test="{_escape_label(test_name)}",status="{_escape_label(test_status)}"}} 1\n'
for test_name, test_status in test_cases
)
return payload
def publish_quality_metrics(*, gateway: str, suite: str, job: str, status: str, summary: RunSummary) -> None: def publish_quality_metrics(
*,
gateway: str,
suite: str,
job: str,
status: str,
summary: RunSummary,
workspace_line_coverage_percent: float = 0.0,
source_lines_over_500: int = 0,
checks: dict[str, str] | None = None,
test_cases: list[tuple[str, str]] | None = None,
) -> None:
"""Publish run and test totals to Pushgateway.""" """Publish run and test totals to Pushgateway."""
gateway = gateway.rstrip("/") gateway = gateway.rstrip("/")
@ -89,11 +157,20 @@ def publish_quality_metrics(*, gateway: str, suite: str, job: str, status: str,
else: else:
counters["failed"] += 1 counters["failed"] += 1
payload = render_payload(suite=suite, ok=int(counters["ok"]), failed=int(counters["failed"]), summary=summary) payload = render_payload(
suite=suite,
ok=int(counters["ok"]),
failed=int(counters["failed"]),
summary=summary,
workspace_line_coverage_percent=workspace_line_coverage_percent,
source_lines_over_500=source_lines_over_500,
checks=checks,
test_cases=test_cases,
)
req = urllib.request.Request( req = urllib.request.Request(
f"{gateway}/metrics/job/{job}/suite/{suite}", f"{gateway}/metrics/job/{job}/suite/{suite}",
data=payload.encode("utf-8"), data=payload.encode("utf-8"),
method="POST", method="PUT",
headers={"Content-Type": "text/plain"}, headers={"Content-Type": "text/plain"},
) )
urllib.request.urlopen(req, timeout=10).read() urllib.request.urlopen(req, timeout=10).read()

View File

@ -2,7 +2,7 @@ from __future__ import annotations
from pathlib import Path from pathlib import Path
from testing.ci.summary import RunSummary, load_junit_summary, render_payload from testing.ci.summary import RunSummary, load_junit_cases, load_junit_summary, render_payload
def test_load_junit_summary_combines_suites(tmp_path: Path) -> None: def test_load_junit_summary_combines_suites(tmp_path: Path) -> None:
@ -14,6 +14,34 @@ def test_load_junit_summary_combines_suites(tmp_path: Path) -> None:
summary = load_junit_summary([junit]) summary = load_junit_summary([junit])
assert summary == RunSummary(tests=3, failures=1, errors=0, skipped=1) assert summary == RunSummary(tests=3, failures=1, errors=0, skipped=1)
payload = render_payload(suite="bstein-home", ok=2, failed=0, summary=summary) payload = render_payload(suite="bstein_home", ok=2, failed=0, summary=summary)
assert 'platform_quality_gate_runs_total{suite="bstein-home",status="ok"} 2' in payload assert 'platform_quality_gate_runs_total{suite="bstein_home",status="ok"} 2' in payload
assert 'bstein_home_quality_gate_tests_total{suite="bstein-home",result="skipped"} 1' in payload assert 'bstein_home_quality_gate_tests_total{suite="bstein_home",result="skipped"} 1' in payload
assert 'platform_quality_gate_workspace_line_coverage_percent{suite="bstein_home"} 0.000' in payload
assert 'platform_quality_gate_source_lines_over_500_total{suite="bstein_home"} 0' in payload
def test_load_junit_cases_and_render_test_case_metrics(tmp_path: Path) -> None:
    """Per-testcase statuses are parsed and rendered as labelled gauge samples."""
    xml_body = "".join(
        (
            "<testsuite>",
            '<testcase classname="app.health" name="test_ok" />',
            '<testcase classname="app.health" name="test_fail"><failure/></testcase>',
            "</testsuite>",
        )
    )
    junit_file = tmp_path / "cases.xml"
    junit_file.write_text(xml_body, encoding="utf-8")

    parsed = load_junit_cases([junit_file])
    assert ("app.health::test_ok", "passed") in parsed
    assert ("app.health::test_fail", "failed") in parsed

    rendered = render_payload(
        suite="bstein_home",
        ok=1,
        failed=0,
        summary=RunSummary(tests=2, failures=1, errors=0, skipped=0),
        test_cases=parsed,
    )
    assert (
        'platform_quality_gate_test_case_result{suite="bstein_home",test="app.health::test_fail",status="failed"} 1'
        in rendered
    )