335 lines
12 KiB
Python
335 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
"""Publish Metis quality-gate test metrics to Pushgateway (Prometheus ingest)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
from pathlib import Path
|
|
import urllib.request
|
|
import xml.etree.ElementTree as ET
|
|
|
|
# Lowercased status strings that external quality reports use to signal success.
QUALITY_SUCCESS_STATES = {"ok", "pass", "passed", "success", "compliant"}
|
|
|
|
|
|
def _escape_label(value: str) -> str:
|
|
return value.replace("\\", "\\\\").replace("\n", "\\n").replace('"', '\\"')
|
|
|
|
|
|
def _label_str(labels: dict[str, str]) -> str:
    """Render a Prometheus label block like {a="x",b="y"}; empty values are dropped.

    Returns "" when no label has a non-empty value.
    """
    rendered = []
    for key, val in labels.items():
        if not val:
            continue
        rendered.append(f'{key}="{_escape_label(val)}"')
    if not rendered:
        return ""
    return "{" + ",".join(rendered) + "}"
|
|
|
|
|
|
def _load_coverage(path: str) -> float:
|
|
with open(path, "r", encoding="utf-8") as handle:
|
|
payload = json.load(handle)
|
|
summary = payload.get("summary") or {}
|
|
percent = summary.get("percent_covered")
|
|
if isinstance(percent, (int, float)):
|
|
return float(percent)
|
|
raise RuntimeError("coverage summary missing percent_covered")
|
|
|
|
|
|
def _load_junit(path: str) -> dict[str, int]:
|
|
tree = ET.parse(path)
|
|
root = tree.getroot()
|
|
|
|
def _as_int(node: ET.Element, name: str) -> int:
|
|
raw = node.attrib.get(name) or "0"
|
|
try:
|
|
return int(float(raw))
|
|
except ValueError:
|
|
return 0
|
|
|
|
if root.tag == "testsuite":
|
|
suites = [root]
|
|
elif root.tag == "testsuites":
|
|
suites = list(root.findall("testsuite"))
|
|
else:
|
|
suites = []
|
|
|
|
totals = {"tests": 0, "failures": 0, "errors": 0, "skipped": 0}
|
|
for suite in suites:
|
|
totals["tests"] += _as_int(suite, "tests")
|
|
totals["failures"] += _as_int(suite, "failures")
|
|
totals["errors"] += _as_int(suite, "errors")
|
|
totals["skipped"] += _as_int(suite, "skipped")
|
|
return totals
|
|
|
|
|
|
def _load_junit_cases(path: str) -> list[tuple[str, str]]:
|
|
tree = ET.parse(path)
|
|
root = tree.getroot()
|
|
if root.tag == "testsuite":
|
|
suites = [root]
|
|
elif root.tag == "testsuites":
|
|
suites = list(root.findall("testsuite"))
|
|
else:
|
|
suites = []
|
|
|
|
cases: list[tuple[str, str]] = []
|
|
for suite in suites:
|
|
for case in suite.findall("testcase"):
|
|
name = (case.attrib.get("name") or "").strip()
|
|
classname = (case.attrib.get("classname") or "").strip()
|
|
if not name:
|
|
continue
|
|
test_id = f"{classname}::{name}" if classname else name
|
|
status = "passed"
|
|
if case.find("failure") is not None:
|
|
status = "failed"
|
|
elif case.find("error") is not None:
|
|
status = "error"
|
|
elif case.find("skipped") is not None:
|
|
status = "skipped"
|
|
cases.append((test_id, status))
|
|
return cases
|
|
|
|
|
|
def _load_exit_code(path: str) -> int | None:
|
|
if not path or not os.path.exists(path):
|
|
return None
|
|
with open(path, "r", encoding="utf-8") as handle:
|
|
raw = handle.read().strip()
|
|
if not raw:
|
|
return None
|
|
try:
|
|
return int(raw)
|
|
except ValueError as exc:
|
|
raise RuntimeError(f"invalid test exit code {raw!r} in {path}") from exc
|
|
|
|
|
|
def _post_text(url: str, payload: str) -> None:
    """PUT a text/plain body to *url*; raise RuntimeError on an HTTP error status."""
    request = urllib.request.Request(
        url,
        data=payload.encode("utf-8"),
        method="PUT",
        headers={"Content-Type": "text/plain"},
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        # urlopen raises HTTPError for most 4xx/5xx already; this guards handlers
        # that hand back an error status object without raising.
        if response.status >= 400:
            raise RuntimeError(f"metrics push failed status={response.status}")
|
|
|
|
|
|
def _read_http(url: str) -> str:
    """Best-effort GET: return the decoded response body, or "" on any failure."""
    try:
        with urllib.request.urlopen(url, timeout=10) as response:
            body = response.read()
    except Exception:
        # Deliberately swallow network errors; callers treat "" as "no data".
        return ""
    return body.decode("utf-8", errors="replace")
|
|
|
|
|
|
def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str, str]) -> float:
    """Return the current value of *metric* on the Pushgateway, or 0.0 if absent.

    Scans the /metrics exposition text for a sample line of *metric* containing
    every key="value" pair from *labels* (substring match on the raw line).
    """
    body = _read_http(f"{pushgateway_url.rstrip('/')}/metrics")
    if not body:
        return 0.0
    wanted_pairs = [f'{key}="{value}"' for key, value in labels.items()]
    sample_prefix = metric + "{"
    for line in body.splitlines():
        if not line.startswith(sample_prefix):
            continue
        if not all(pair in line for pair in wanted_pairs):
            continue
        fields = line.split()
        if len(fields) < 2:
            continue
        try:
            return float(fields[1])
        except ValueError:
            # Malformed sample value: fall back to zero rather than crash.
            return 0.0
    return 0.0
|
|
|
|
|
|
def _count_source_files_over_limit(repo_root: Path, max_lines: int = 500) -> int:
|
|
"""Count source files above the configured line budget."""
|
|
|
|
count = 0
|
|
for rel_root in ("cmd", "pkg", "scripts", "testing"):
|
|
base = repo_root / rel_root
|
|
if not base.exists():
|
|
continue
|
|
for path in base.rglob("*"):
|
|
if not path.is_file():
|
|
continue
|
|
if path.suffix not in {".go", ".py", ".sh"}:
|
|
continue
|
|
lines = len(path.read_text(encoding="utf-8", errors="ignore").splitlines())
|
|
if lines > max_lines:
|
|
count += 1
|
|
return count
|
|
|
|
|
|
def _load_json(path: Path) -> dict | None:
|
|
if not path.exists():
|
|
return None
|
|
try:
|
|
payload = json.loads(path.read_text(encoding="utf-8"))
|
|
except Exception:
|
|
return None
|
|
return payload if isinstance(payload, dict) else None
|
|
|
|
|
|
def _sonarqube_check_status(build_dir: Path) -> str:
    """Classify the SonarQube quality-gate report: "ok", "failed", or "not_applicable"."""
    default_path = str(build_dir / "sonarqube-quality-gate.json")
    report = _load_json(Path(os.getenv("QUALITY_GATE_SONARQUBE_REPORT", default_path)))
    if not report:
        # A missing/unreadable report means SonarQube did not run for this build.
        return "not_applicable"
    project_status = report.get("projectStatus")
    quality_gate = report.get("qualityGate")
    candidates = [
        report.get("status"),
        project_status.get("status") if isinstance(project_status, dict) else None,
        quality_gate.get("status") if isinstance(quality_gate, dict) else None,
    ]
    # First string-typed status wins; anything unrecognized counts as a failure.
    for candidate in candidates:
        if isinstance(candidate, str):
            if candidate.strip().lower() in QUALITY_SUCCESS_STATES:
                return "ok"
            return "failed"
    return "failed"
|
|
|
|
|
|
def _supply_chain_check_status(build_dir: Path) -> str:
    """Classify the Iron Bank compliance report: "ok", "failed", or "not_applicable"."""
    default_path = str(build_dir / "ironbank-compliance.json")
    report = _load_json(Path(os.getenv("QUALITY_GATE_IRONBANK_REPORT", default_path)))
    if not report:
        return "not_applicable"
    compliant = report.get("compliant")
    if isinstance(compliant, bool):
        # An explicit boolean verdict takes precedence over string statuses.
        return "ok" if compliant else "failed"
    for field in ("status", "result", "compliance"):
        value = report.get(field)
        if isinstance(value, str):
            return "ok" if value.strip().lower() in QUALITY_SUCCESS_STATES else "failed"
    return "failed"
|
|
|
|
|
|
def main() -> int:
    """Collect quality-gate artifacts, push metrics to the Pushgateway, print a summary.

    Reads the coverage JSON, JUnit XML, and exit-code artifacts (paths
    overridable via environment variables), derives per-check statuses, builds
    a Prometheus text-exposition payload, and pushes it grouped by job+suite.
    Returns 0; raises RuntimeError when a required artifact is missing.
    """
    # Artifact locations — all overridable for local runs / alternate CI layouts.
    coverage_path = os.getenv("COVERAGE_JSON", "build/coverage.json")
    junit_path = os.getenv("JUNIT_XML", "build/junit.xml")
    test_exit_code_path = os.getenv("TEST_EXIT_CODE_PATH", "build/test.exitcode")
    docs_exit_code_path = os.getenv("DOCS_EXIT_CODE_PATH", "build/docs-naming.rc")
    pushgateway_url = os.getenv(
        "PUSHGATEWAY_URL", "http://platform-quality-gateway.monitoring.svc.cluster.local:9091"
    ).strip()
    suite = os.getenv("SUITE_NAME", "metis")
    branch = os.getenv("BRANCH_NAME") or os.getenv("GIT_BRANCH") or "unknown"
    if branch.startswith("origin/"):
        # CI often reports GIT_BRANCH as "origin/<name>"; strip the remote prefix.
        branch = branch[len("origin/") :]
    build_number = os.getenv("BUILD_NUMBER", "")
    commit = os.getenv("GIT_COMMIT", "")
    # METRICS_STRICT=1 turns a failed metrics push into a hard error.
    strict = os.getenv("METRICS_STRICT", "") == "1"
    # Script is assumed to live one directory below the repository root.
    repo_root = Path(__file__).resolve().parents[1]
    build_dir = repo_root / "build"

    if not os.path.exists(coverage_path):
        raise RuntimeError(f"missing coverage file {coverage_path}")
    if not os.path.exists(junit_path):
        raise RuntimeError(f"missing junit file {junit_path}")

    coverage = _load_coverage(coverage_path)
    totals = _load_junit(junit_path)
    test_cases = _load_junit_cases(junit_path)
    test_exit_code = _load_exit_code(test_exit_code_path)
    docs_exit_code = _load_exit_code(docs_exit_code_path)
    source_lines_over_500 = _count_source_files_over_limit(repo_root, max_lines=500)
    # Passed = total minus every non-passing bucket; clamped at 0 for safety.
    passed = max(totals["tests"] - totals["failures"] - totals["errors"] - totals["skipped"], 0)

    # The run fails on a non-zero test exit code, an empty suite, or any failure/error.
    outcome = "ok"
    if (
        (test_exit_code is not None and test_exit_code != 0)
        or totals["tests"] <= 0
        or totals["failures"] > 0
        or totals["errors"] > 0
    ):
        outcome = "failed"
    # Per-check verdicts published as metis_quality_gate_checks_total series.
    # NOTE(review): docs_naming reports "failed" when the exit-code file is
    # absent (docs_exit_code is None) — confirm that is intended.
    checks = {
        "tests": "ok" if outcome == "ok" else "failed",
        "coverage": "ok" if coverage >= 95.0 else "failed",  # 95% line-coverage gate
        "loc": "ok" if source_lines_over_500 == 0 else "failed",
        "docs_naming": "ok" if docs_exit_code == 0 else "failed",
        "gate_glue": "ok",
        "sonarqube": _sonarqube_check_status(build_dir),
        "supply_chain": _supply_chain_check_status(build_dir),
    }
    # Pushgateway stores last-pushed values, so the run counters are read back
    # and incremented client-side before re-pushing. This read-modify-write is
    # not atomic across concurrent runs of the same suite.
    ok_count = _fetch_existing_counter(
        pushgateway_url,
        "platform_quality_gate_runs_total",
        {"job": "platform-quality-ci", "suite": suite, "status": "ok"},
    )
    failed_count = _fetch_existing_counter(
        pushgateway_url,
        "platform_quality_gate_runs_total",
        {"job": "platform-quality-ci", "suite": suite, "status": "failed"},
    )
    if outcome == "ok":
        ok_count += 1
    else:
        failed_count += 1

    labels = {
        "job": "platform-quality-ci",
        "suite": suite,
        "branch": branch,
        "build_number": build_number,
        "commit": commit,
    }
    # Prometheus text exposition format: one "# TYPE" header per metric family,
    # then one sample per line.
    payload_lines = [
        "# TYPE platform_quality_gate_runs_total counter",
        f'platform_quality_gate_runs_total{{suite="{suite}",status="ok"}} {ok_count:.0f}',
        f'platform_quality_gate_runs_total{{suite="{suite}",status="failed"}} {failed_count:.0f}',
        "# TYPE metis_quality_gate_tests_total gauge",
        f'metis_quality_gate_tests_total{{suite="{suite}",result="total"}} {totals["tests"]}',
        f'metis_quality_gate_tests_total{{suite="{suite}",result="passed"}} {passed}',
        f'metis_quality_gate_tests_total{{suite="{suite}",result="failed"}} {totals["failures"]}',
        f'metis_quality_gate_tests_total{{suite="{suite}",result="error"}} {totals["errors"]}',
        f'metis_quality_gate_tests_total{{suite="{suite}",result="skipped"}} {totals["skipped"]}',
        "# TYPE metis_quality_gate_run_status gauge",
        f'metis_quality_gate_run_status{{suite="{suite}",status="ok"}} {1 if outcome == "ok" else 0}',
        f'metis_quality_gate_run_status{{suite="{suite}",status="failed"}} {1 if outcome == "failed" else 0}',
        "# TYPE metis_quality_gate_coverage_percent gauge",
        f'metis_quality_gate_coverage_percent{{suite="{suite}"}} {coverage:.3f}',
        "# TYPE platform_quality_gate_workspace_line_coverage_percent gauge",
        f'platform_quality_gate_workspace_line_coverage_percent{{suite="{suite}"}} {coverage:.3f}',
        "# TYPE platform_quality_gate_source_lines_over_500_total gauge",
        f'platform_quality_gate_source_lines_over_500_total{{suite="{suite}"}} {source_lines_over_500}',
        "# TYPE platform_quality_gate_build_info gauge",
        f"platform_quality_gate_build_info{_label_str(labels)} 1",
        "# TYPE metis_quality_gate_checks_total gauge",
        "# TYPE platform_quality_gate_test_case_result gauge",
        "# TYPE metis_quality_gate_build_info gauge",
        f"metis_quality_gate_build_info{_label_str(labels)} 1",
    ]
    # One series per testcase; label values are escaped to keep the exposition valid.
    payload_lines.extend(
        f'platform_quality_gate_test_case_result{{suite="{suite}",test="{_escape_label(test_name)}",status="{_escape_label(test_status)}"}} 1'
        for test_name, test_status in test_cases
    )
    payload_lines.extend(
        f'metis_quality_gate_checks_total{{suite="{suite}",check="{check_name}",result="{check_status}"}} 1'
        for check_name, check_status in checks.items()
    )
    payload = "\n".join(payload_lines) + "\n"

    try:
        # Grouping key is job + suite, so each suite owns its own metric group.
        _post_text(f"{pushgateway_url.rstrip('/')}/metrics/job/{labels['job']}/suite/{suite}", payload)
    except Exception as exc:
        # Push failures are non-fatal by default so CI still gets the summary.
        print(f"metrics push failed: {exc}")
        if strict:
            raise

    # Machine-readable run summary on stdout for CI log consumers.
    print(
        json.dumps(
            {
                "suite": suite,
                "outcome": outcome,
                "tests_total": totals["tests"],
                "tests_passed": passed,
                "tests_failed": totals["failures"],
                "tests_errors": totals["errors"],
                "tests_skipped": totals["skipped"],
                "coverage_percent": round(coverage, 3),
                "source_lines_over_500": source_lines_over_500,
                "test_exit_code": test_exit_code,
            },
            indent=2,
        )
    )
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|