atlasbot/scripts/publish_test_metrics.py

293 lines
11 KiB
Python
Executable File

#!/usr/bin/env python3
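"""Publish Atlasbot CI test metrics to Pushgateway.

Inputs:
- JUnit XML file and coverage JSON file.
- Optional quality-gate and docs-naming return-code files.
- Optional SonarQube and Iron Bank (supply-chain) JSON reports.

Outputs:
- platform_quality_gate_runs_total{suite="atlasbot",status="ok|failed"}
- atlasbot_quality_gate_tests_total{suite="atlasbot",result=*}
- atlasbot_quality_gate_coverage_percent{suite="atlasbot"}
- platform_quality_gate_workspace_line_coverage_percent{suite="atlasbot"}
- platform_quality_gate_source_lines_over_500_total{suite="atlasbot"}
- atlasbot_quality_gate_checks_total{suite="atlasbot",check=*,result=*}
- platform_quality_gate_test_case_result{suite="atlasbot",test=*,status=*}
"""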
from __future__ import annotations
import json
import os
import urllib.request
import xml.etree.ElementTree as ET
from pathlib import Path
# Lower-case status strings that the SonarQube and supply-chain report
# checks treat as a pass; any other string is reported as "failed".
QUALITY_SUCCESS_STATES = {
    "ok",
    "pass",
    "passed",
    "success",
    "compliant",
}
def _escape_label(value: str) -> str:
    """Return *value* escaped for use inside a Prometheus label value.

    Backslashes are doubled first so that the backslashes introduced for
    newlines and double quotes are not escaped a second time.
    """
    escaped = value
    for raw, replacement in (("\\", "\\\\"), ("\n", "\\n"), ('"', '\\"')):
        escaped = escaped.replace(raw, replacement)
    return escaped
def _as_int(node: ET.Element, name: str) -> int:
    """Read attribute *name* of *node* as an integer count.

    Args:
        node: XML element whose attributes are inspected.
        name: Attribute to read.

    Returns:
        The attribute value truncated to an int, or ``0`` when the attribute
        is missing, empty, or not a finite number.
    """
    raw = node.attrib.get(name) or "0"
    try:
        # Going through float() accepts values such as "3.0".
        return int(float(raw))
    except (ValueError, OverflowError):
        # ValueError: non-numeric text or NaN.
        # OverflowError: "inf" / "-inf", which float() accepts but int() rejects.
        return 0
def _load_junit(path: Path) -> dict[str, int]:
    """Sum the count attributes of the test suites in a JUnit XML report.

    Args:
        path: Location of the JUnit XML file.

    Returns:
        A dict with the keys ``tests``, ``failures``, ``errors`` and
        ``skipped``. All values are ``0`` when the file does not exist or the
        root element is neither ``testsuite`` nor ``testsuites``.
    """
    counters = ("tests", "failures", "errors", "skipped")
    if not path.exists():
        return dict.fromkeys(counters, 0)

    root = ET.parse(path).getroot()
    if root.tag == "testsuite":
        suites = [root]
    elif root.tag == "testsuites":
        # Only suites that are direct children of the root are counted.
        suites = root.findall("testsuite")
    else:
        suites = []

    return {key: sum(_as_int(suite, key) for suite in suites) for key in counters}
def _load_junit_cases(path: Path) -> list[tuple[str, str]]:
    """List every named test case of a JUnit XML report with its status.

    Args:
        path: Location of the JUnit XML file.

    Returns:
        ``(test_id, status)`` pairs in document order. ``test_id`` is
        ``classname::name`` when a classname is present, otherwise ``name``.
        ``status`` is one of ``passed``, ``failed``, ``error`` or ``skipped``.
        The list is empty when the file does not exist or the root element is
        neither ``testsuite`` nor ``testsuites``.
    """
    if not path.exists():
        return []

    root = ET.parse(path).getroot()
    if root.tag == "testsuite":
        suites = [root]
    elif root.tag == "testsuites":
        suites = root.findall("testsuite")
    else:
        suites = []

    # Checked in this order; the first child element present decides the status.
    status_by_child = (("failure", "failed"), ("error", "error"), ("skipped", "skipped"))

    results: list[tuple[str, str]] = []
    for suite in suites:
        for case in suite.findall("testcase"):
            name = (case.attrib.get("name") or "").strip()
            if not name:
                # Cases without a usable name cannot be labelled; skip them.
                continue
            classname = (case.attrib.get("classname") or "").strip()
            status = next(
                (label for tag, label in status_by_child if case.find(tag) is not None),
                "passed",
            )
            results.append((f"{classname}::{name}" if classname else name, status))
    return results
def _load_coverage_percent(path: Path) -> float:
    """Read the overall line-coverage percentage from a coverage JSON file.

    The value is taken from ``summary.percent_covered``. When that is absent,
    ``totals.percent_covered`` is used instead, which is the layout of
    coverage.py's own ``coverage json`` report.

    Args:
        path: Location of the coverage JSON file.

    Returns:
        The percentage as a float, or ``0.0`` when the file is missing,
        unreadable, not valid JSON, or holds no numeric percentage.
    """
    try:
        payload = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # Missing or unreadable file, bad encoding, or malformed JSON. Report
        # "no coverage" rather than aborting the whole metrics publish.
        return 0.0
    if not isinstance(payload, dict):
        return 0.0

    for section_name in ("summary", "totals"):
        section = payload.get(section_name)
        if not isinstance(section, dict):
            continue
        percent = section.get("percent_covered")
        # bool is a subclass of int; a JSON true/false is not a percentage.
        if isinstance(percent, (int, float)) and not isinstance(percent, bool):
            return float(percent)
    return 0.0
def _load_gate_rc(path: Path) -> int | None:
if not path.exists():
return None
raw = path.read_text(encoding="utf-8").strip()
if not raw:
return None
try:
return int(raw)
except ValueError:
return None
def _count_source_lines_over_500(root: Path) -> int:
    """Count the ``*.py`` files under *root* that have more than 500 lines.

    Args:
        root: Directory that is searched recursively.

    Returns:
        Number of Python files longer than 500 lines; ``0`` when *root* does
        not exist.
    """
    if not root.exists():
        return 0
    over = 0
    for path in root.rglob("*.py"):
        if not path.is_file():
            continue
        # Close each file deterministically instead of leaving the handle to
        # the garbage collector. errors="replace" keeps one file with stray
        # non-UTF-8 bytes from aborting the scan; only the line count matters.
        with path.open("r", encoding="utf-8", errors="replace") as handle:
            line_count = sum(1 for _ in handle)
        if line_count > 500:
            over += 1
    return over
def _load_json(path: Path) -> dict | None:
if not path.exists():
return None
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except Exception:
return None
return payload if isinstance(payload, dict) else None
def _sonarqube_check_status(build_dir: Path) -> str:
    """Derive the SonarQube check result from its quality-gate report.

    The report path comes from ``QUALITY_GATE_SONARQUBE_REPORT`` and defaults
    to ``sonarqube-quality-gate.json`` inside *build_dir*.

    Args:
        build_dir: Directory holding the default report file.

    Returns:
        ``"not_applicable"`` when no usable report is found, ``"ok"`` when the
        first string status is one of ``QUALITY_SUCCESS_STATES``, otherwise
        ``"failed"``.
    """
    default_report = build_dir / "sonarqube-quality-gate.json"
    report_path = Path(os.getenv("QUALITY_GATE_SONARQUBE_REPORT", str(default_report)))
    report = _load_json(report_path)
    if not report:
        return "not_applicable"

    def nested_status(key: str):
        # Only look inside the section when it really is an object.
        section = report.get(key)
        return section.get("status") if isinstance(section, dict) else None

    # Top-level status first, then the two nested layouts.
    candidates = (
        report.get("status"),
        nested_status("projectStatus"),
        nested_status("qualityGate"),
    )
    for candidate in candidates:
        if isinstance(candidate, str):
            if candidate.strip().lower() in QUALITY_SUCCESS_STATES:
                return "ok"
            return "failed"
    return "failed"
def _supply_chain_check_status(build_dir: Path) -> str:
    """Derive the supply-chain check result from the Iron Bank report.

    The report path comes from ``QUALITY_GATE_IRONBANK_REPORT`` and defaults
    to ``ironbank-compliance.json`` inside *build_dir*.

    Args:
        build_dir: Directory holding the default report file.

    Returns:
        ``"not_applicable"`` when no usable report is found. Otherwise a
        boolean ``compliant`` field decides between ``"ok"`` and ``"failed"``;
        failing that, the first string among ``status``, ``result`` and
        ``compliance`` is compared against ``QUALITY_SUCCESS_STATES``.
        ``"failed"`` is returned when none of these fields is usable.
    """
    fallback = build_dir / "ironbank-compliance.json"
    report = _load_json(Path(os.getenv("QUALITY_GATE_IRONBANK_REPORT", str(fallback))))
    if not report:
        return "not_applicable"

    verdict = report.get("compliant")
    if isinstance(verdict, bool):
        # An explicit boolean verdict takes precedence over textual fields.
        return "ok" if verdict else "failed"

    for key in ("status", "result", "compliance"):
        text = report.get(key)
        if isinstance(text, str):
            normalized = text.strip().lower()
            return "ok" if normalized in QUALITY_SUCCESS_STATES else "failed"
    return "failed"
def _read_text(url: str) -> str:
    """Fetch *url* and return its body as text, best effort.

    Args:
        url: Address to fetch; the request times out after 10 seconds.

    Returns:
        The body decoded as UTF-8 with undecodable bytes replaced, or an
        empty string when anything goes wrong.
    """
    try:
        response = urllib.request.urlopen(url, timeout=10)
        with response:
            body = response.read()
        return body.decode("utf-8", errors="replace")
    except Exception:
        # Deliberately lenient: the caller treats "" as "no metrics yet".
        return ""
def _counter(metrics: str, suite: str, status: str) -> float:
    """Find the current ``platform_quality_gate_runs_total`` value for a suite.

    Only samples labelled ``job="platform-quality-ci"`` with the given
    *suite* and *status* are considered; the first match wins.

    Args:
        metrics: Metrics text to scan, one sample per line.
        suite: Value the ``suite`` label must have.
        status: Value the ``status`` label must have.

    Returns:
        The sample value, or ``0.0`` when no matching sample exists or the
        first matching sample has no parsable value.
    """
    required = (
        'job="platform-quality-ci"',
        f'suite="{suite}"',
        f'status="{status}"',
    )
    for line in metrics.splitlines():
        if not line.startswith("platform_quality_gate_runs_total{"):
            continue
        if not all(fragment in line for fragment in required):
            continue
        # Label values may contain whitespace, so splitting the whole line on
        # spaces can land inside the label set. The value is the first token
        # after the closing brace of the label set.
        _, _, tail = line.rpartition("}")
        fields = tail.split()
        if not fields:
            continue
        try:
            return float(fields[0])
        except ValueError:
            return 0.0
    return 0.0
def _post_text(url: str, payload: str) -> None:
    """Send *payload* to *url* as a ``text/plain`` HTTP PUT request.

    Args:
        url: Destination address; the request times out after 10 seconds.
        payload: Text body, sent UTF-8 encoded.

    Raises:
        RuntimeError: If the response status is 400 or higher.
    """
    body = payload.encode("utf-8")
    request = urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "text/plain"},
        method="PUT",
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        status = response.status
        if status >= 400:
            raise RuntimeError(f"push failed status={status}")
def main() -> int:
    """Collect the CI results and push the quality-gate metrics to Pushgateway.

    Configuration is read from environment variables, each with a default:
    ``SUITE_NAME``, ``PUSHGATEWAY_URL``, ``JUNIT_PATH``, ``COVERAGE_PATH``,
    ``QUALITY_GATE_RC_PATH``, ``QUALITY_GATE_DOCS_RC_PATH``, ``SOURCE_ROOT``
    and ``BUILD_DIR``.

    Returns:
        ``0`` once the payload has been pushed.

    Raises:
        Nothing is caught here: errors from the loaders and from
        ``_post_text`` propagate to the caller.
    """
    suite = os.getenv("SUITE_NAME", "atlasbot")
    # The suite name ends up inside label values, so escape it like every
    # other dynamic label. The URL path below keeps the raw name.
    suite_label = _escape_label(suite)
    pushgateway_url = os.getenv(
        "PUSHGATEWAY_URL", "http://platform-quality-gateway.monitoring.svc.cluster.local:9091"
    ).rstrip("/")
    junit_path = Path(os.getenv("JUNIT_PATH", "build/junit.xml"))
    coverage_path = Path(os.getenv("COVERAGE_PATH", "build/coverage.json"))
    gate_rc_path = Path(os.getenv("QUALITY_GATE_RC_PATH", "build/quality-gate.rc"))
    docs_rc_path = Path(os.getenv("QUALITY_GATE_DOCS_RC_PATH", "build/docs-naming.rc"))
    source_root = Path(os.getenv("SOURCE_ROOT", "atlasbot"))
    build_dir = Path(os.getenv("BUILD_DIR", "build"))

    totals = _load_junit(junit_path)
    test_cases = _load_junit_cases(junit_path)
    coverage_pct = _load_coverage_percent(coverage_path)
    gate_rc = _load_gate_rc(gate_rc_path)
    docs_rc = _load_gate_rc(docs_rc_path)
    source_lines_over_500 = _count_source_lines_over_500(source_root)

    passed = max(totals["tests"] - totals["failures"] - totals["errors"] - totals["skipped"], 0)
    # A run is "ok" only if at least one test ran, none failed or errored,
    # and the quality gate (when it left a return code) exited with 0.
    outcome = "ok" if totals["tests"] > 0 and totals["failures"] == 0 and totals["errors"] == 0 else "failed"
    if gate_rc is not None and gate_rc != 0:
        outcome = "failed"

    checks = {
        "tests": "ok" if outcome == "ok" else "failed",
        "coverage": "ok" if coverage_pct >= 95.0 else "failed",
        "loc": "ok" if source_lines_over_500 == 0 else "failed",
        # A missing docs return-code file (None) counts as failed.
        "docs_naming": "ok" if docs_rc == 0 else "failed",
        "gate_glue": "ok",
        "sonarqube": _sonarqube_check_status(build_dir),
        "supply_chain": _supply_chain_check_status(build_dir),
    }

    # The push below replaces this suite's metrics, so the run counter is
    # continued by reading the current values back and adding one. When the
    # read fails the values come back as 0 and counting restarts.
    metrics = _read_text(f"{pushgateway_url}/metrics")
    ok_count = _counter(metrics, suite_label, "ok")
    failed_count = _counter(metrics, suite_label, "failed")
    if outcome == "ok":
        ok_count += 1
    else:
        failed_count += 1

    # Each "# TYPE" line is followed directly by all samples of that metric;
    # the exposition format wants a metric family kept as one group.
    lines = [
        "# TYPE platform_quality_gate_runs_total counter",
        f'platform_quality_gate_runs_total{{suite="{suite_label}",status="ok"}} {ok_count:.0f}',
        f'platform_quality_gate_runs_total{{suite="{suite_label}",status="failed"}} {failed_count:.0f}',
        "# TYPE atlasbot_quality_gate_tests_total gauge",
        f'atlasbot_quality_gate_tests_total{{suite="{suite_label}",result="passed"}} {passed}',
        f'atlasbot_quality_gate_tests_total{{suite="{suite_label}",result="failed"}} {totals["failures"]}',
        f'atlasbot_quality_gate_tests_total{{suite="{suite_label}",result="error"}} {totals["errors"]}',
        f'atlasbot_quality_gate_tests_total{{suite="{suite_label}",result="skipped"}} {totals["skipped"]}',
        "# TYPE atlasbot_quality_gate_coverage_percent gauge",
        f'atlasbot_quality_gate_coverage_percent{{suite="{suite_label}"}} {coverage_pct:.3f}',
        "# TYPE platform_quality_gate_workspace_line_coverage_percent gauge",
        f'platform_quality_gate_workspace_line_coverage_percent{{suite="{suite_label}"}} {coverage_pct:.3f}',
        "# TYPE platform_quality_gate_source_lines_over_500_total gauge",
        f'platform_quality_gate_source_lines_over_500_total{{suite="{suite_label}"}} {source_lines_over_500}',
        "# TYPE platform_quality_gate_test_case_result gauge",
    ]
    if test_cases:
        lines.extend(
            f'platform_quality_gate_test_case_result{{suite="{suite_label}",test="{_escape_label(test_name)}",status="{_escape_label(test_status)}"}} 1'
            for test_name, test_status in test_cases
        )
    else:
        # Placeholder sample so the metric exists even without test cases.
        lines.append(
            f'platform_quality_gate_test_case_result{{suite="{suite_label}",test="__no_test_cases__",status="skipped"}} 1'
        )
    lines.append("# TYPE atlasbot_quality_gate_checks_total gauge")
    lines.extend(
        f'atlasbot_quality_gate_checks_total{{suite="{suite_label}",check="{check_name}",result="{check_status}"}} 1'
        for check_name, check_status in checks.items()
    )
    payload = "\n".join(lines) + "\n"

    _post_text(f"{pushgateway_url}/metrics/job/platform-quality-ci/suite/{suite}", payload)
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    exit_code = main()
    raise SystemExit(exit_code)