titan-iac/ci/scripts/publish_test_metrics.py

511 lines
19 KiB
Python

#!/usr/bin/env python3
"""Publish titan-iac quality-gate results to Pushgateway."""
from __future__ import annotations
import json
import os
from glob import glob
from pathlib import Path
import urllib.error
import urllib.request
import xml.etree.ElementTree as ET
# Raw status spellings (already lower-cased) that count as a passing check.
SUCCESS_STATUSES = {
    "ok",
    "pass",
    "passed",
    "success",
    "compliant",
}

# Raw status spellings meaning the check did not apply to this run.
NOT_APPLICABLE_STATUSES = {
    "not_applicable",
    "n/a",
    "na",
    "none",
    "skipped",
}

# Raw status spellings that count as a failing check; warnings are failures.
FAILED_STATUSES = {
    "failed",
    "fail",
    "error",
    "errors",
    "warn",
    "warning",
    "red",
}

# Checks always reported to the dashboard, in emission order.
CANONICAL_CHECKS = [
    "tests",
    "coverage",
    "loc",
    "docs_naming",
    "gate_glue",
    "sonarqube",
    "supply_chain",
]
def _escape_label(value: str) -> str:
    """Return ``value`` made safe for use inside a quoted Prometheus label.

    Backslashes, line feeds and double quotes are each replaced by their
    backslash-escaped form; every other character is passed through as-is.
    """
    # One pass over the input, so an escape produced for one character is
    # never re-escaped by a later rule.
    escapes = {
        ord("\\"): "\\\\",
        ord("\n"): "\\n",
        ord('"'): '\\"',
    }
    return value.translate(escapes)
def _label_str(labels: dict[str, str]) -> str:
    """Format a mapping as a Prometheus label block such as ``{a="1",b="2"}``.

    Labels are emitted in the mapping's iteration order. Entries whose value
    is falsy (for example an empty string) are left out, and an empty string
    is returned when no label remains.
    """
    rendered = []
    for name, raw in labels.items():
        if not raw:
            continue
        rendered.append(f'{name}="{_escape_label(raw)}"')
    if not rendered:
        return ""
    return "{" + ",".join(rendered) + "}"
def _read_text(url: str) -> str:
    """Download ``url`` and return its body decoded as UTF-8.

    The request uses a 10 second timeout; errors raised by ``urllib`` or by
    the decoding step propagate to the caller.
    """
    with urllib.request.urlopen(url, timeout=10) as reply:
        raw_body = reply.read()
    return raw_body.decode("utf-8")
def _post_text(url: str, payload: str) -> None:
    """Send ``payload`` to ``url`` as a UTF-8 ``text/plain`` HTTP PUT.

    Despite the function name the request method is PUT. The call uses a
    10 second timeout.

    Raises:
        RuntimeError: if the response that comes back has a status of 400 or
            above. Errors raised by ``urllib`` itself propagate unchanged.
    """
    body = payload.encode("utf-8")
    put_request = urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "text/plain"},
        method="PUT",
    )
    with urllib.request.urlopen(put_request, timeout=10) as reply:
        status_code = reply.status
        if status_code >= 400:
            raise RuntimeError(f"push failed with status={status_code}")
def _parse_junit(path: str) -> dict[str, int]:
    """Read one JUnit XML report and return its summed suite counters.

    The result always has the keys ``tests``, ``failures``, ``errors`` and
    ``skipped``. A missing file, or a root element that is neither
    ``testsuite`` nor ``testsuites``, yields all zeros. Only suites that are
    the root or direct children of a ``testsuites`` root are counted.
    XML parse errors propagate to the caller.
    """
    counters = dict.fromkeys(("tests", "failures", "errors", "skipped"), 0)
    if not os.path.exists(path):
        return counters

    document_root = ET.parse(path).getroot()
    if document_root.tag == "testsuites":
        suite_nodes = document_root.findall("testsuite")
    elif document_root.tag == "testsuite":
        suite_nodes = [document_root]
    else:
        suite_nodes = []

    for suite_node in suite_nodes:
        for counter_name in counters:
            attribute_text = suite_node.get(counter_name, "0")
            try:
                # Go through float so values such as "3.0" are accepted.
                increment = int(float(attribute_text))
            except ValueError:
                # Non-numeric attributes contribute nothing.
                increment = 0
            counters[counter_name] += increment
    return counters
def _collect_junit_totals(pattern: str) -> dict[str, int]:
    """Add up the JUnit counters of every file matching the glob ``pattern``.

    Files are parsed in sorted path order. When nothing matches, all four
    counters (``tests``, ``failures``, ``errors``, ``skipped``) are zero.
    """
    grand_totals = {"tests": 0, "failures": 0, "errors": 0, "skipped": 0}
    reports = [_parse_junit(report_path) for report_path in sorted(glob(pattern))]
    for counter_name in grand_totals:
        grand_totals[counter_name] = sum(report[counter_name] for report in reports)
    return grand_totals
def _collect_junit_cases(pattern: str) -> list[tuple[str, str]]:
    """List ``(test name, status)`` pairs for every test case in matching reports.

    Reports matching the glob ``pattern`` are read in sorted path order. The
    name is ``classname.name`` when a class name is present, otherwise just
    ``name``; cases with a blank name are skipped. Status is ``"failed"``
    when the case has a ``failure`` or ``error`` child, ``"skipped"`` when it
    has a ``skipped`` child, and ``"passed"`` otherwise.
    """
    collected: list[tuple[str, str]] = []
    for report_path in sorted(glob(pattern)):
        if not os.path.exists(report_path):
            continue

        top = ET.parse(report_path).getroot()
        if top.tag == "testsuites":
            suite_nodes = top.findall("testsuite")
        elif top.tag == "testsuite":
            suite_nodes = [top]
        else:
            suite_nodes = []

        for suite_node in suite_nodes:
            for case_node in suite_node.findall("testcase"):
                short_name = case_node.get("name", "").strip()
                if not short_name:
                    continue
                owner = case_node.get("classname", "").strip()
                qualified = f"{owner}.{short_name}" if owner else short_name

                broke = case_node.find("failure") is not None or case_node.find("error") is not None
                if broke:
                    outcome = "failed"
                elif case_node.find("skipped") is not None:
                    outcome = "skipped"
                else:
                    outcome = "passed"
                collected.append((qualified, outcome))
    return collected
def _read_exit_code(path: str) -> int:
    """Return the integer stored in the exit-code file at ``path``.

    Surrounding whitespace is ignored. A missing file or content that is not
    an integer is reported as ``1`` so the run counts as failed.
    """
    try:
        with open(path, encoding="utf-8") as stream:
            recorded = stream.read()
        return int(recorded.strip())
    except (FileNotFoundError, ValueError):
        return 1
def _load_summary(path: str) -> dict:
    """Load the JSON quality-gate summary as a mapping.

    Args:
        path: Location of the summary JSON file.

    Returns:
        The decoded JSON object. An empty dict is returned when the file is
        missing or unreadable, is not valid UTF-8 JSON, or decodes to
        anything other than a JSON object (list, string, number, null).
    """
    try:
        with open(path, "r", encoding="utf-8") as handle:
            loaded = json.load(handle)
    except (OSError, ValueError):
        # OSError covers missing/unreadable paths; ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return {}
    # Callers use ``summary.get(...)``, so never hand back a non-mapping.
    return loaded if isinstance(loaded, dict) else {}
def _summary_float(summary: dict, key: str) -> float:
    """Return ``summary[key]`` as a float when it is an int or float, else 0.0.

    Missing keys and values of any other type (strings, None, lists, ...)
    yield ``0.0``.
    """
    candidate = summary.get(key)
    return float(candidate) if isinstance(candidate, (int, float)) else 0.0
def _summary_int(summary: dict, key: str) -> int:
    """Return ``summary[key]`` as an int when it is numeric, else 0.

    Int values are returned unchanged, float values are truncated with
    ``int()``, and missing keys or any other type yield ``0``.
    """
    candidate = summary.get(key)
    if isinstance(candidate, float):
        return int(candidate)
    if isinstance(candidate, int):
        return candidate
    return 0
def _infer_workspace_coverage_percent(summary: dict, default_xml: str) -> float:
    """Derive the line-coverage percentage from a coverage XML report.

    The report path comes from the ``coverage_xml`` field of the first
    ``results`` entry named ``coverage`` (case-insensitive); ``default_xml``
    is used when there is no such entry or its field is blank. The root
    element's ``line-rate`` attribute is multiplied by 100.

    Returns ``0.0`` when the file does not exist, cannot be read or parsed,
    has no ``line-rate`` attribute, or the attribute is not a number.
    """
    entries = summary.get("results", []) if isinstance(summary, dict) else []

    # Only the first coverage entry is consulted, even if its path is blank.
    coverage_entry = next(
        (
            entry
            for entry in entries
            if isinstance(entry, dict)
            and str(entry.get("name") or "").strip().lower() == "coverage"
        ),
        None,
    )
    report_location = default_xml
    if coverage_entry is not None:
        override = str(coverage_entry.get("coverage_xml") or "").strip()
        if override:
            report_location = override

    report_file = Path(report_location)
    if not report_file.exists():
        return 0.0
    try:
        rate_text = ET.parse(report_file).getroot().get("line-rate")
        return 0.0 if rate_text is None else float(rate_text) * 100.0
    except (ET.ParseError, OSError, ValueError):
        return 0.0
def _infer_source_lines_over_500(summary: dict) -> int:
    """Count "file exceeds ..." issues reported by a hygiene-style check.

    The first ``results`` entry named ``hygiene``, ``loc`` or ``smell``
    (case-insensitive) whose ``issues`` field is a list decides the answer:
    the number of string issues starting with ``"file exceeds"``. Returns 0
    when no such entry exists or ``summary`` is not a dict.
    """
    entries = summary.get("results", []) if isinstance(summary, dict) else []
    relevant_names = {"hygiene", "loc", "smell"}
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        label = str(entry.get("name") or "").strip().lower()
        findings = entry.get("issues")
        if label in relevant_names and isinstance(findings, list):
            oversized = [
                finding
                for finding in findings
                if isinstance(finding, str) and finding.startswith("file exceeds")
            ]
            return len(oversized)
    return 0
def _normalize_result_status(value: str | None, default: str = "failed") -> str:
    """Translate free-form status text into ``ok``/``not_applicable``/``failed``.

    The text is stripped and lower-cased, then looked up in the success,
    not-applicable and failed status sets, in that order. ``default`` is
    returned for a falsy ``value`` and for text found in none of the sets.
    """
    if not value:
        return default
    token = value.strip().lower()
    buckets = (
        (SUCCESS_STATUSES, "ok"),
        (NOT_APPLICABLE_STATUSES, "not_applicable"),
        (FAILED_STATUSES, "failed"),
    )
    for members, canonical in buckets:
        if token in members:
            return canonical
    return default
def _load_optional_json(path: str | None) -> dict:
    """Load an optional JSON report file as a mapping.

    Args:
        path: Location of the report, or a falsy value when no report is
            configured.

    Returns:
        The decoded JSON object. An empty dict is returned when ``path`` is
        falsy, the file is missing or unreadable, the content is not valid
        UTF-8 JSON, or the document is not a JSON object.
    """
    if not path:
        return {}
    try:
        # Read directly instead of checking exists() first: this avoids a
        # race with the file disappearing and also covers unreadable paths.
        loaded = json.loads(Path(path).read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return {}
    # Consumers call ``report.get(...)``, so never hand back a non-mapping.
    return loaded if isinstance(loaded, dict) else {}
def _combine_statuses(statuses: list[str]) -> str:
    """Collapse several canonical statuses into a single one.

    An empty list, or a list of only ``not_applicable``, gives
    ``not_applicable``. Any ``failed`` entry, or any value other than ``ok``
    and ``not_applicable``, gives ``failed``. Otherwise the result is ``ok``.
    """
    if not statuses:
        return "not_applicable"
    if "failed" in statuses:
        return "failed"
    recognised = [entry for entry in statuses if entry in ("ok", "not_applicable")]
    if len(recognised) != len(statuses):
        # Unknown status values are treated as failures.
        return "failed"
    return "ok" if "ok" in statuses else "not_applicable"
def _infer_sonarqube_status(report: dict) -> str:
    """Derive the canonical SonarQube check result from a report payload.

    An empty report means ``not_applicable``. Otherwise the first truthy
    value among ``projectStatus.status``, ``qualityGate.status`` and the
    top-level ``status`` is normalised; unknown or absent text gives
    ``failed``.
    """
    if not report:
        return "not_applicable"

    gate_status = report.get("projectStatus", {}).get("status")
    if not gate_status:
        gate_status = report.get("qualityGate", {}).get("status")
    if not gate_status:
        gate_status = report.get("status")

    raw_text = None if gate_status is None else str(gate_status)
    return _normalize_result_status(raw_text, default="failed")
def _infer_supply_chain_status(report: dict, required: bool) -> str:
    """Derive the canonical supply-chain check result from a report payload.

    A boolean ``compliant`` field decides the result directly. Otherwise the
    ``status`` field is normalised. When the report is empty or has no
    status, or the status normalises to ``not_applicable``, the result is
    ``failed`` if the check is ``required`` and ``not_applicable`` if not.
    """
    missing_verdict = "failed" if required else "not_applicable"
    if not report:
        return missing_verdict

    compliant_flag = report.get("compliant")
    if isinstance(compliant_flag, bool):
        return "ok" if compliant_flag else "failed"

    raw_status = report.get("status")
    if raw_status is None:
        return missing_verdict

    verdict = _normalize_result_status(str(raw_status), default="failed")
    if required and verdict == "not_applicable":
        # A required check cannot opt out by reporting "not applicable".
        return "failed"
    return verdict
def _build_check_statuses(
    summary: dict | None,
    tests: dict[str, int],
    workspace_line_coverage_percent: float,
    source_lines_over_500: int,
    sonarqube_report: dict,
    supply_chain_report: dict,
    supply_chain_required: bool,
) -> dict[str, str]:
    """Compute the result of every canonical quality check for this run.

    Statuses recorded in ``summary["results"]`` (keyed by lower-cased check
    name) take precedence. When a canonical check has no direct entry, a
    fallback is used:

    * ``tests``: roll-up of unit/integration/e2e/pytest/test entries, else
      the JUnit counters in ``tests``, else ``not_applicable``.
    * ``coverage``: ``ok`` at 95 percent or more, ``failed`` below that,
      ``not_applicable`` when the percentage is not positive.
    * ``loc``: ``ok`` only when ``source_lines_over_500`` is zero.
    * ``docs_naming`` / ``gate_glue``: roll-up of related entries, else
      ``not_applicable``.
    * ``sonarqube`` / ``supply_chain``: inferred from the report payloads.

    Returns a dict with exactly the seven canonical check names as keys.
    """
    entries = summary.get("results", []) if isinstance(summary, dict) else []
    recorded: dict[str, str] = {}
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        label = str(entry.get("name") or "").strip().lower()
        if label:
            recorded[label] = _normalize_result_status(entry.get("status"), default="failed")

    def rollup(aliases: list[str]) -> str | None:
        """Combine the recorded statuses of ``aliases``; None if none exist."""
        found = [recorded[alias] for alias in aliases if alias in recorded]
        return _combine_statuses(found) if found else None

    # tests
    tests_verdict = recorded.get("tests") or rollup(
        ["unit", "integration", "e2e", "pytest", "test", "tests"]
    )
    if tests_verdict is None:
        if tests["tests"] > 0:
            broken = tests["failures"] + tests["errors"]
            tests_verdict = "ok" if broken == 0 else "failed"
        else:
            tests_verdict = "not_applicable"

    # coverage
    coverage_verdict = recorded.get("coverage")
    if not coverage_verdict:
        if workspace_line_coverage_percent > 0:
            coverage_verdict = "ok" if workspace_line_coverage_percent >= 95.0 else "failed"
        else:
            coverage_verdict = "not_applicable"

    # loc
    loc_verdict = recorded.get("loc")
    if not loc_verdict:
        loc_verdict = "ok" if source_lines_over_500 == 0 else "failed"

    # docs + naming + lint hygiene
    docs_naming_verdict = recorded.get("docs_naming")
    if not docs_naming_verdict:
        docs_naming_verdict = (
            rollup(["docs", "hygiene", "smell", "lint", "naming"]) or "not_applicable"
        )

    # gate glue
    gate_glue_verdict = recorded.get("gate_glue")
    if not gate_glue_verdict:
        gate_glue_verdict = rollup(["gate_glue", "glue", "gate"]) or "not_applicable"

    sonarqube_verdict = recorded.get("sonarqube")
    if not sonarqube_verdict:
        sonarqube_verdict = _infer_sonarqube_status(sonarqube_report)

    supply_chain_verdict = recorded.get("supply_chain")
    if not supply_chain_verdict:
        supply_chain_verdict = _infer_supply_chain_status(
            supply_chain_report,
            required=supply_chain_required,
        )

    return {
        "tests": tests_verdict,
        "coverage": coverage_verdict,
        "loc": loc_verdict,
        "docs_naming": docs_naming_verdict,
        "gate_glue": gate_glue_verdict,
        "sonarqube": sonarqube_verdict,
        "supply_chain": supply_chain_verdict,
    }
def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str, str]) -> float:
    """Return the current value of a labeled metric exposed by the Pushgateway.

    The gateway's ``/metrics`` page is downloaded and scanned for the first
    sample of ``metric`` whose label set contains every pair in ``labels``.

    Args:
        pushgateway_url: Base URL of the Pushgateway; a trailing slash is
            ignored.
        metric: Exact metric name to look for.
        labels: Label names and values the sample must carry. Extra labels on
            the sample are allowed.

    Returns:
        The sample value, or ``0.0`` when no sample matches or the matching
        sample's value is not a number.

    Raises:
        Whatever ``_read_text`` raises when the gateway cannot be reached.
    """
    text = _read_text(f"{pushgateway_url.rstrip('/')}/metrics")
    series_prefix = metric + "{"
    # Each wanted pair is anchored on a leading comma so that a label name
    # only matches at a label boundary (``suite`` must not match
    # ``testsuite``). Values are escaped the same way the exposition format
    # escapes them.
    wanted_pairs = [f',{key}="{_escape_label(value)}"' for key, value in labels.items()]
    for line in text.splitlines():
        if not line.startswith(series_prefix):
            continue
        # Split at the LAST closing brace: label values may contain spaces or
        # braces, the sample value that follows cannot.
        label_blob, closing_brace, sample = line[len(series_prefix):].rpartition("}")
        if not closing_brace:
            continue
        anchored_labels = "," + label_blob
        if any(pair not in anchored_labels for pair in wanted_pairs):
            continue
        fields = sample.split()
        if not fields:
            continue
        try:
            return float(fields[0])
        except ValueError:
            return 0.0
    return 0.0
def _build_payload(
    suite: str,
    status: str,
    tests: dict[str, int],
    test_cases: list[tuple[str, str]],
    ok_count: int,
    failed_count: int,
    branch: str,
    build_number: str,
    summary: dict | None = None,
    workspace_line_coverage_percent: float = 0.0,
    source_lines_over_500: int = 0,
    check_statuses: dict[str, str] | None = None,
) -> str:
    """Build the Prometheus text payload pushed for the current suite run.

    Args:
        suite: Suite name, used as the ``suite`` label on every sample.
        status: Overall run status, ``"ok"`` or ``"failed"``.
        tests: JUnit counters with keys ``tests``, ``failures``, ``errors``
            and ``skipped``.
        test_cases: ``(test name, status)`` pairs; each distinct pair becomes
            one ``platform_quality_gate_test_case_result`` sample.
        ok_count: Value for the ``status="ok"`` run counter.
        failed_count: Value for the ``status="failed"`` run counter.
        branch: Branch name; ``"unknown"`` is used when empty.
        build_number: Build number; ``"unknown"`` is used when empty.
        summary: Accepted for interface compatibility; not used here.
        workspace_line_coverage_percent: Coverage gauge value, rendered with
            three decimals.
        source_lines_over_500: Over-limit source file gauge value.
        check_statuses: Result per canonical check; checks missing from the
            mapping are reported as ``not_applicable``. When falsy, no
            per-check samples are emitted.

    Returns:
        The payload text, one sample or ``# TYPE`` line per line, ending with
        a newline.
    """
    total = tests["tests"]
    failures = tests["failures"]
    errors = tests["errors"]
    skipped = tests["skipped"]
    passed = max(total - failures - errors - skipped, 0)

    # The suite name comes from the environment, so escape it like every
    # other label value instead of interpolating it raw.
    suite_label = _escape_label(suite)
    build_labels = _label_str(
        {
            "suite": suite,
            "branch": branch or "unknown",
            "build_number": build_number or "unknown",
        }
    )
    lines = [
        "# TYPE platform_quality_gate_runs_total counter",
        f'platform_quality_gate_runs_total{{suite="{suite_label}",status="ok"}} {ok_count}',
        f'platform_quality_gate_runs_total{{suite="{suite_label}",status="failed"}} {failed_count}',
        "# TYPE titan_iac_quality_gate_tests_total gauge",
        f'titan_iac_quality_gate_tests_total{{suite="{suite_label}",result="passed"}} {passed}',
        f'titan_iac_quality_gate_tests_total{{suite="{suite_label}",result="failed"}} {failures}',
        f'titan_iac_quality_gate_tests_total{{suite="{suite_label}",result="error"}} {errors}',
        f'titan_iac_quality_gate_tests_total{{suite="{suite_label}",result="skipped"}} {skipped}',
        "# TYPE titan_iac_quality_gate_run_status gauge",
        f'titan_iac_quality_gate_run_status{{suite="{suite_label}",status="ok"}} {1 if status == "ok" else 0}',
        f'titan_iac_quality_gate_run_status{{suite="{suite_label}",status="failed"}} {1 if status == "failed" else 0}',
        "# TYPE titan_iac_quality_gate_build_info gauge",
        f"titan_iac_quality_gate_build_info{build_labels} 1",
        "# TYPE platform_quality_gate_workspace_line_coverage_percent gauge",
        f'platform_quality_gate_workspace_line_coverage_percent{{suite="{suite_label}"}} {workspace_line_coverage_percent:.3f}',
        "# TYPE platform_quality_gate_source_lines_over_500_total gauge",
        f'platform_quality_gate_source_lines_over_500_total{{suite="{suite_label}"}} {source_lines_over_500}',
    ]

    if check_statuses:
        lines.append("# TYPE titan_iac_quality_gate_checks_total gauge")
        for check_name in CANONICAL_CHECKS:
            check_status = check_statuses.get(check_name, "not_applicable")
            lines.append(
                f'titan_iac_quality_gate_checks_total{{suite="{suite_label}",check="{_escape_label(check_name)}",result="{_escape_label(check_status)}"}} 1'
            )

    if test_cases:
        lines.append("# TYPE platform_quality_gate_test_case_result gauge")
        # The same (test, status) pair can occur more than once, e.g. when a
        # test name repeats across report files. Emitting it twice would put
        # two samples with identical labels in one payload, so keep only the
        # first occurrence.
        emitted: set[tuple[str, str]] = set()
        for test_name, test_status in test_cases:
            series_key = (test_name, test_status)
            if series_key in emitted:
                continue
            emitted.add(series_key)
            lines.append(
                f'platform_quality_gate_test_case_result{{suite="{suite_label}",test="{_escape_label(test_name)}",status="{_escape_label(test_status)}"}} 1'
            )

    return "\n".join(lines) + "\n"
def main() -> int:
    """Publish the quality-gate metrics and print a compact run summary.

    Configuration is read from environment variables (defaults in
    parentheses): ``SUITE_NAME`` (``titan_iac``), ``PUSHGATEWAY_URL``,
    ``QUALITY_GATE_JOB_NAME`` (``platform-quality-ci``), ``JUNIT_GLOB`` or
    ``JUNIT_PATH`` (``build/junit-*.xml``), ``QUALITY_GATE_EXIT_CODE_PATH``
    or ``GLUE_EXIT_CODE_PATH`` (``build/quality-gate.rc``),
    ``QUALITY_GATE_SUMMARY_PATH``, ``BRANCH_NAME`` or ``GIT_BRANCH``,
    ``BUILD_NUMBER``, ``QUALITY_GATE_SONARQUBE_REPORT``,
    ``QUALITY_GATE_IRONBANK_REPORT`` and ``QUALITY_GATE_IRONBANK_REQUIRED``.

    The run counters already stored in the Pushgateway are read back, the
    one matching this run's status is incremented, and the full payload is
    PUT to ``/metrics/job/<job>/suite/<suite>``. A JSON summary of the run is
    printed to stdout.

    Returns:
        ``0`` once the push succeeded; network and push errors propagate as
        exceptions.
    """
    # Local import: only needed here to percent-encode the push URL path.
    from urllib.parse import quote

    suite = os.getenv("SUITE_NAME", "titan_iac")
    pushgateway_url = os.getenv("PUSHGATEWAY_URL", "http://platform-quality-gateway.monitoring.svc.cluster.local:9091")
    job_name = os.getenv("QUALITY_GATE_JOB_NAME", "platform-quality-ci")
    junit_glob = os.getenv("JUNIT_GLOB", os.getenv("JUNIT_PATH", "build/junit-*.xml"))
    exit_code_path = os.getenv("QUALITY_GATE_EXIT_CODE_PATH", os.getenv("GLUE_EXIT_CODE_PATH", "build/quality-gate.rc"))
    summary_path = os.getenv("QUALITY_GATE_SUMMARY_PATH", "build/quality-gate-summary.json")
    branch = os.getenv("BRANCH_NAME", os.getenv("GIT_BRANCH", ""))
    build_number = os.getenv("BUILD_NUMBER", "")

    tests = _collect_junit_totals(junit_glob)
    test_cases = _collect_junit_cases(junit_glob)
    exit_code = _read_exit_code(exit_code_path)
    status = "ok" if exit_code == 0 else "failed"

    summary = _load_summary(summary_path)
    # Prefer values recorded in the summary; fall back to inferring them.
    workspace_line_coverage_percent = _summary_float(summary, "workspace_line_coverage_percent")
    if workspace_line_coverage_percent <= 0:
        workspace_line_coverage_percent = _infer_workspace_coverage_percent(summary, "build/coverage-unit.xml")
    source_lines_over_500 = _summary_int(summary, "source_lines_over_500")
    if source_lines_over_500 <= 0:
        source_lines_over_500 = _infer_source_lines_over_500(summary)

    sonarqube_report = _load_optional_json(os.getenv("QUALITY_GATE_SONARQUBE_REPORT", "build/sonarqube-quality-gate.json"))
    supply_chain_report = _load_optional_json(os.getenv("QUALITY_GATE_IRONBANK_REPORT", "build/ironbank-compliance.json"))
    supply_chain_required = os.getenv("QUALITY_GATE_IRONBANK_REQUIRED", "0").strip().lower() in {"1", "true", "yes", "on"}
    check_statuses = _build_check_statuses(
        summary=summary,
        tests=tests,
        workspace_line_coverage_percent=workspace_line_coverage_percent,
        source_lines_over_500=source_lines_over_500,
        sonarqube_report=sonarqube_report,
        supply_chain_report=supply_chain_report,
        supply_chain_required=supply_chain_required,
    )

    # The PUT below replaces the whole metric group, so the run counters are
    # carried forward by reading the stored values and adding this run.
    ok_count = int(
        _fetch_existing_counter(
            pushgateway_url,
            "platform_quality_gate_runs_total",
            {"job": job_name, "suite": suite, "status": "ok"},
        )
    )
    failed_count = int(
        _fetch_existing_counter(
            pushgateway_url,
            "platform_quality_gate_runs_total",
            {"job": job_name, "suite": suite, "status": "failed"},
        )
    )
    if status == "ok":
        ok_count += 1
    else:
        failed_count += 1

    payload = _build_payload(
        suite=suite,
        status=status,
        tests=tests,
        test_cases=test_cases,
        ok_count=ok_count,
        failed_count=failed_count,
        branch=branch,
        build_number=build_number,
        summary=summary,
        workspace_line_coverage_percent=workspace_line_coverage_percent,
        source_lines_over_500=source_lines_over_500,
        check_statuses=check_statuses,
    )

    # Percent-encode the path segments so names containing spaces, "?", "#"
    # or non-ASCII characters still form a valid URL. "/" is left untouched
    # (quote's default), which keeps existing behaviour for such values.
    gateway_base = pushgateway_url.rstrip("/")
    push_url = f"{gateway_base}/metrics/job/{quote(job_name)}/suite/{quote(suite)}"
    _post_text(push_url, payload)

    run_summary = {
        "suite": suite,
        "status": status,
        "tests_total": tests["tests"],
        "tests_failed": tests["failures"],
        "tests_error": tests["errors"],
        "tests_skipped": tests["skipped"],
        "ok_count": ok_count,
        "failed_count": failed_count,
        "checks_recorded": len(check_statuses),
        "workspace_line_coverage_percent": workspace_line_coverage_percent,
        "source_lines_over_500": source_lines_over_500,
    }
    print(json.dumps(run_summary, sort_keys=True))
    return 0
# Script entry point: run main() and use its return value as the process
# exit status.
if __name__ == "__main__": # pragma: no cover
    raise SystemExit(main())