ci(metrics): publish canonical titan-iac gate checks

This commit is contained in:
Brad Stein 2026-04-19 16:28:55 -03:00
parent aede5aa899
commit 29138b8a51
2 changed files with 242 additions and 23 deletions

View File

@ -6,10 +6,25 @@ from __future__ import annotations
import json
import os
from glob import glob
from pathlib import Path
import urllib.error
import urllib.request
import xml.etree.ElementTree as ET
SUCCESS_STATUSES = {"ok", "pass", "passed", "success", "compliant"}
NOT_APPLICABLE_STATUSES = {"not_applicable", "n/a", "na", "none", "skipped"}
FAILED_STATUSES = {"failed", "fail", "error", "errors", "warn", "warning", "red"}
CANONICAL_CHECKS = [
"tests",
"coverage",
"loc",
"docs_naming",
"gate_glue",
"sonarqube",
"supply_chain",
]
def _escape_label(value: str) -> str:
"""Escape a Prometheus label value without changing its content."""
@ -114,6 +129,148 @@ def _summary_int(summary: dict, key: str) -> int:
return 0
def _normalize_result_status(value: str | None, default: str = "failed") -> str:
"""Map arbitrary check status text into canonical check result buckets."""
if not value:
return default
normalized = value.strip().lower()
if normalized in SUCCESS_STATUSES:
return "ok"
if normalized in NOT_APPLICABLE_STATUSES:
return "not_applicable"
if normalized in FAILED_STATUSES:
return "failed"
return default
def _load_optional_json(path: str | None) -> dict:
"""Load an optional JSON report file, returning an empty object when absent."""
if not path:
return {}
candidate = Path(path)
if not candidate.exists():
return {}
try:
return json.loads(candidate.read_text(encoding="utf-8"))
except json.JSONDecodeError:
return {}
def _combine_statuses(statuses: list[str]) -> str:
"""Roll up many check statuses into one canonical result."""
if not statuses:
return "not_applicable"
if any(status == "failed" for status in statuses):
return "failed"
if all(status == "not_applicable" for status in statuses):
return "not_applicable"
if all(status in {"ok", "not_applicable"} for status in statuses):
return "ok"
return "failed"
def _infer_sonarqube_status(report: dict) -> str:
"""Infer canonical SonarQube check status from its JSON report payload."""
if not report:
return "not_applicable"
status = (
report.get("projectStatus", {}).get("status")
or report.get("qualityGate", {}).get("status")
or report.get("status")
)
return _normalize_result_status(str(status) if status is not None else None, default="failed")
def _infer_supply_chain_status(report: dict, required: bool) -> str:
"""Infer canonical supply-chain status from IronBank/artifact report payload."""
if not report:
return "failed" if required else "not_applicable"
compliant = report.get("compliant")
if isinstance(compliant, bool):
return "ok" if compliant else "failed"
status = report.get("status")
if status is None:
return "failed" if required else "not_applicable"
normalized = _normalize_result_status(str(status), default="failed")
if normalized == "not_applicable" and required:
return "failed"
return normalized
def _build_check_statuses(
summary: dict | None,
tests: dict[str, int],
workspace_line_coverage_percent: float,
source_lines_over_500: int,
sonarqube_report: dict,
supply_chain_report: dict,
supply_chain_required: bool,
) -> dict[str, str]:
"""Generate the canonical quality-check status map for dashboarding."""
raw_results = summary.get("results", []) if isinstance(summary, dict) else []
status_by_name: dict[str, str] = {}
for result in raw_results:
if not isinstance(result, dict):
continue
check_name = str(result.get("name") or "").strip().lower()
if not check_name:
continue
status_by_name[check_name] = _normalize_result_status(result.get("status"), default="failed")
# tests
tests_status = status_by_name.get("tests")
if not tests_status:
candidate_keys = ["unit", "integration", "e2e", "pytest", "test", "tests"]
candidates = [status_by_name[key] for key in candidate_keys if key in status_by_name]
if candidates:
tests_status = _combine_statuses(candidates)
elif tests["tests"] > 0:
tests_status = "ok" if (tests["failures"] + tests["errors"]) == 0 else "failed"
else:
tests_status = "not_applicable"
# coverage
coverage_status = status_by_name.get("coverage")
if not coverage_status:
if workspace_line_coverage_percent > 0:
coverage_status = "ok" if workspace_line_coverage_percent >= 95.0 else "failed"
else:
coverage_status = "not_applicable"
# loc
loc_status = status_by_name.get("loc")
if not loc_status:
loc_status = "ok" if source_lines_over_500 == 0 else "failed"
# docs + naming + lint hygiene
docs_naming_status = status_by_name.get("docs_naming")
if not docs_naming_status:
candidates = [status_by_name[key] for key in ["docs", "hygiene", "smell", "lint", "naming"] if key in status_by_name]
docs_naming_status = _combine_statuses(candidates) if candidates else "not_applicable"
# gate glue
gate_glue_status = status_by_name.get("gate_glue")
if not gate_glue_status:
candidates = [status_by_name[key] for key in ["gate_glue", "glue", "gate"] if key in status_by_name]
gate_glue_status = _combine_statuses(candidates) if candidates else "not_applicable"
sonarqube_status = status_by_name.get("sonarqube") or _infer_sonarqube_status(sonarqube_report)
supply_chain_status = status_by_name.get("supply_chain") or _infer_supply_chain_status(
supply_chain_report,
required=supply_chain_required,
)
return {
"tests": tests_status,
"coverage": coverage_status,
"loc": loc_status,
"docs_naming": docs_naming_status,
"gate_glue": gate_glue_status,
"sonarqube": sonarqube_status,
"supply_chain": supply_chain_status,
}
def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str, str]) -> float:
"""Return the current counter value for a labeled metric if present."""
text = _read_text(f"{pushgateway_url.rstrip('/')}/metrics")
@ -143,6 +300,7 @@ def _build_payload(
summary: dict | None = None,
workspace_line_coverage_percent: float = 0.0,
source_lines_over_500: int = 0,
check_statuses: dict[str, str] | None = None,
) -> str:
"""Build the Pushgateway payload for the current suite run."""
passed = max(tests["tests"] - tests["failures"] - tests["errors"] - tests["skipped"], 0)
@ -172,16 +330,12 @@ def _build_payload(
"# TYPE platform_quality_gate_source_lines_over_500_total gauge",
f'platform_quality_gate_source_lines_over_500_total{{suite="{suite}"}} {source_lines_over_500}',
]
results = summary.get("results", []) if isinstance(summary, dict) else []
if results:
if check_statuses:
lines.append("# TYPE titan_iac_quality_gate_checks_total gauge")
for result in results:
check_name = result.get("name")
check_status = result.get("status")
if not check_name or not check_status:
continue
for check_name in CANONICAL_CHECKS:
check_status = check_statuses.get(check_name, "not_applicable")
lines.append(
f'titan_iac_quality_gate_checks_total{{suite="{suite}",check="{_escape_label(str(check_name))}",result="{_escape_label(str(check_status))}"}} 1'
f'titan_iac_quality_gate_checks_total{{suite="{suite}",check="{_escape_label(check_name)}",result="{_escape_label(check_status)}"}} 1'
)
return "\n".join(lines) + "\n"
@ -203,6 +357,18 @@ def main() -> int:
summary = _load_summary(summary_path)
workspace_line_coverage_percent = _summary_float(summary, "workspace_line_coverage_percent")
source_lines_over_500 = _summary_int(summary, "source_lines_over_500")
sonarqube_report = _load_optional_json(os.getenv("QUALITY_GATE_SONARQUBE_REPORT", "build/sonarqube-quality-gate.json"))
supply_chain_report = _load_optional_json(os.getenv("QUALITY_GATE_IRONBANK_REPORT", "build/ironbank-compliance.json"))
supply_chain_required = os.getenv("QUALITY_GATE_IRONBANK_REQUIRED", "0").strip().lower() in {"1", "true", "yes", "on"}
check_statuses = _build_check_statuses(
summary=summary,
tests=tests,
workspace_line_coverage_percent=workspace_line_coverage_percent,
source_lines_over_500=source_lines_over_500,
sonarqube_report=sonarqube_report,
supply_chain_report=supply_chain_report,
supply_chain_required=supply_chain_required,
)
ok_count = int(
_fetch_existing_counter(
@ -234,6 +400,7 @@ def main() -> int:
summary=summary,
workspace_line_coverage_percent=workspace_line_coverage_percent,
source_lines_over_500=source_lines_over_500,
check_statuses=check_statuses,
)
push_url = f"{pushgateway_url.rstrip('/')}/metrics/job/{job_name}/suite/{suite}"
_post_text(push_url, payload)
@ -247,7 +414,7 @@ def main() -> int:
"tests_skipped": tests["skipped"],
"ok_count": ok_count,
"failed_count": failed_count,
"checks_recorded": len(summary.get("results", [])) if isinstance(summary, dict) else 0,
"checks_recorded": len(check_statuses),
"workspace_line_coverage_percent": workspace_line_coverage_percent,
"source_lines_over_500": source_lines_over_500,
}

View File

@ -94,6 +94,57 @@ def test_summary_extractors_handle_invalid_shapes_and_values():
assert publish_test_metrics._summary_int({"source_lines_over_500": 2.9}, "source_lines_over_500") == 2
def test_build_check_statuses_maps_legacy_names_and_reports():
check_statuses = publish_test_metrics._build_check_statuses(
summary={
"results": [
{"name": "unit", "status": "ok"},
{"name": "coverage", "status": "failed"},
{"name": "hygiene", "status": "ok"},
{"name": "smell", "status": "ok"},
{"name": "docs", "status": "ok"},
{"name": "glue", "status": "failed"},
]
},
tests={"tests": 4, "failures": 0, "errors": 0, "skipped": 0},
workspace_line_coverage_percent=97.0,
source_lines_over_500=0,
sonarqube_report={"projectStatus": {"status": "OK"}},
supply_chain_report={"status": "compliant"},
supply_chain_required=True,
)
assert check_statuses == {
"tests": "ok",
"coverage": "failed",
"loc": "ok",
"docs_naming": "ok",
"gate_glue": "failed",
"sonarqube": "ok",
"supply_chain": "ok",
}
def test_build_check_statuses_handles_missing_reports():
check_statuses = publish_test_metrics._build_check_statuses(
summary={"results": []},
tests={"tests": 0, "failures": 0, "errors": 0, "skipped": 0},
workspace_line_coverage_percent=0.0,
source_lines_over_500=2,
sonarqube_report={},
supply_chain_report={},
supply_chain_required=False,
)
assert check_statuses["tests"] == "not_applicable"
assert check_statuses["coverage"] == "not_applicable"
assert check_statuses["loc"] == "failed"
assert check_statuses["docs_naming"] == "not_applicable"
assert check_statuses["gate_glue"] == "not_applicable"
assert check_statuses["sonarqube"] == "not_applicable"
assert check_statuses["supply_chain"] == "not_applicable"
def test_read_text_post_text_and_fetch_existing_counter(monkeypatch):
class _FakeResponse:
def __init__(self, payload: str, status: int = 200):
@ -208,7 +259,7 @@ def test_post_text_raises_and_counter_handles_bad_metric_lines(monkeypatch):
)
def test_build_payload_includes_summary_metrics():
def test_build_payload_includes_canonical_checks():
payload = publish_test_metrics._build_payload(
suite="titan-iac",
status="ok",
@ -219,20 +270,23 @@ def test_build_payload_includes_summary_metrics():
build_number="42",
workspace_line_coverage_percent=95.0,
source_lines_over_500=0,
summary={
"results": [
{"name": "docs", "status": "ok"},
{"name": "unit", "status": "failed"},
]
check_statuses={
"tests": "failed",
"coverage": "ok",
"loc": "ok",
"docs_naming": "ok",
"gate_glue": "ok",
"sonarqube": "failed",
"supply_chain": "failed",
},
)
assert 'platform_quality_gate_runs_total{suite="titan-iac",status="ok"} 7' in payload
assert 'titan_iac_quality_gate_checks_total{suite="titan-iac",check="docs",result="ok"} 1' in payload
assert 'titan_iac_quality_gate_checks_total{suite="titan-iac",check="unit",result="failed"} 1' in payload
assert 'titan_iac_quality_gate_checks_total{suite="titan-iac",check="docs_naming",result="ok"} 1' in payload
assert 'titan_iac_quality_gate_checks_total{suite="titan-iac",check="tests",result="failed"} 1' in payload
def test_build_payload_skips_incomplete_results():
def test_build_payload_omits_checks_block_without_check_statuses():
payload = publish_test_metrics._build_payload(
suite="titan-iac",
status="failed",
@ -243,11 +297,9 @@ def test_build_payload_skips_incomplete_results():
build_number="",
workspace_line_coverage_percent=0.0,
source_lines_over_500=1,
summary={"results": [{"name": "docs"}, {"status": "ok"}]},
)
assert "titan_iac_quality_gate_checks_total" in payload
assert 'check="docs"' not in payload
assert "titan_iac_quality_gate_checks_total" not in payload
def test_main_uses_quality_gate_summary_and_junit_glob(tmp_path: Path, monkeypatch):
@ -286,7 +338,7 @@ def test_main_uses_quality_gate_summary_and_junit_glob(tmp_path: Path, monkeypat
assert rc == 0
assert posted["url"].endswith("/metrics/job/platform-quality-ci/suite/titan-iac")
assert 'titan_iac_quality_gate_tests_total{suite="titan-iac",result="failed"} 1' in posted["payload"]
assert 'titan_iac_quality_gate_checks_total{suite="titan-iac",check="glue",result="failed"} 1' in posted["payload"]
assert 'titan_iac_quality_gate_checks_total{suite="titan-iac",check="gate_glue",result="failed"} 1' in posted["payload"]
def test_main_marks_successful_run(tmp_path: Path, monkeypatch, capsys):
@ -309,4 +361,4 @@ def test_main_marks_successful_run(tmp_path: Path, monkeypatch, capsys):
summary = json.loads(capsys.readouterr().out)
assert rc == 0
assert summary["status"] == "ok"
assert summary["checks_recorded"] == 0
assert summary["checks_recorded"] == 7