diff --git a/ci/scripts/publish_test_metrics.py b/ci/scripts/publish_test_metrics.py index cdd7d262..702e4414 100644 --- a/ci/scripts/publish_test_metrics.py +++ b/ci/scripts/publish_test_metrics.py @@ -6,10 +6,25 @@ from __future__ import annotations import json import os from glob import glob +from pathlib import Path import urllib.error import urllib.request import xml.etree.ElementTree as ET +SUCCESS_STATUSES = {"ok", "pass", "passed", "success", "compliant"} +NOT_APPLICABLE_STATUSES = {"not_applicable", "n/a", "na", "none", "skipped"} +FAILED_STATUSES = {"failed", "fail", "error", "errors", "warn", "warning", "red"} + +CANONICAL_CHECKS = [ + "tests", + "coverage", + "loc", + "docs_naming", + "gate_glue", + "sonarqube", + "supply_chain", +] + def _escape_label(value: str) -> str: """Escape a Prometheus label value without changing its content.""" @@ -114,6 +129,148 @@ def _summary_int(summary: dict, key: str) -> int: return 0 +def _normalize_result_status(value: str | None, default: str = "failed") -> str: + """Map arbitrary check status text into canonical check result buckets.""" + if not value: + return default + normalized = value.strip().lower() + if normalized in SUCCESS_STATUSES: + return "ok" + if normalized in NOT_APPLICABLE_STATUSES: + return "not_applicable" + if normalized in FAILED_STATUSES: + return "failed" + return default + + +def _load_optional_json(path: str | None) -> dict: + """Load an optional JSON report file, returning an empty object when absent.""" + if not path: + return {} + candidate = Path(path) + if not candidate.exists(): + return {} + try: + return json.loads(candidate.read_text(encoding="utf-8")) + except json.JSONDecodeError: + return {} + + +def _combine_statuses(statuses: list[str]) -> str: + """Roll up many check statuses into one canonical result.""" + if not statuses: + return "not_applicable" + if any(status == "failed" for status in statuses): + return "failed" + if all(status == "not_applicable" for status in statuses): + return "not_applicable" + if all(status in {"ok", "not_applicable"} for status in statuses): + return "ok" + return "failed" + + +def _infer_sonarqube_status(report: dict) -> str: + """Infer canonical SonarQube check status from its JSON report payload.""" + if not report: + return "not_applicable" + status = ( + report.get("projectStatus", {}).get("status") + or report.get("qualityGate", {}).get("status") + or report.get("status") + ) + return _normalize_result_status(str(status) if status is not None else None, default="failed") + + +def _infer_supply_chain_status(report: dict, required: bool) -> str: + """Infer canonical supply-chain status from IronBank/artifact report payload.""" + if not report: + return "failed" if required else "not_applicable" + compliant = report.get("compliant") + if isinstance(compliant, bool): + return "ok" if compliant else "failed" + status = report.get("status") + if status is None: + return "failed" if required else "not_applicable" + normalized = _normalize_result_status(str(status), default="failed") + if normalized == "not_applicable" and required: + return "failed" + return normalized + + +def _build_check_statuses( + summary: dict | None, + tests: dict[str, int], + workspace_line_coverage_percent: float, + source_lines_over_500: int, + sonarqube_report: dict, + supply_chain_report: dict, + supply_chain_required: bool, +) -> dict[str, str]: + """Generate the canonical quality-check status map for dashboarding.""" + raw_results = summary.get("results", []) if isinstance(summary, dict) else [] + status_by_name: dict[str, str] = {} + for result in raw_results: + if not isinstance(result, dict): + continue + check_name = str(result.get("name") or "").strip().lower() + if not check_name: + continue + status_by_name[check_name] = _normalize_result_status(result.get("status"), default="failed") + + # tests + tests_status = status_by_name.get("tests") + if not tests_status: + candidate_keys = ["unit", "integration", "e2e", "pytest", "test", "tests"] + candidates = [status_by_name[key] for key in candidate_keys if key in status_by_name] + if candidates: + tests_status = _combine_statuses(candidates) + elif tests["tests"] > 0: + tests_status = "ok" if (tests["failures"] + tests["errors"]) == 0 else "failed" + else: + tests_status = "not_applicable" + + # coverage + coverage_status = status_by_name.get("coverage") + if not coverage_status: + if workspace_line_coverage_percent > 0: + coverage_status = "ok" if workspace_line_coverage_percent >= 95.0 else "failed" + else: + coverage_status = "not_applicable" + + # loc + loc_status = status_by_name.get("loc") + if not loc_status: + loc_status = "ok" if source_lines_over_500 == 0 else "failed" + + # docs + naming + lint hygiene + docs_naming_status = status_by_name.get("docs_naming") + if not docs_naming_status: + candidates = [status_by_name[key] for key in ["docs", "hygiene", "smell", "lint", "naming"] if key in status_by_name] + docs_naming_status = _combine_statuses(candidates) if candidates else "not_applicable" + + # gate glue + gate_glue_status = status_by_name.get("gate_glue") + if not gate_glue_status: + candidates = [status_by_name[key] for key in ["gate_glue", "glue", "gate"] if key in status_by_name] + gate_glue_status = _combine_statuses(candidates) if candidates else "not_applicable" + + sonarqube_status = status_by_name.get("sonarqube") or _infer_sonarqube_status(sonarqube_report) + supply_chain_status = status_by_name.get("supply_chain") or _infer_supply_chain_status( + supply_chain_report, + required=supply_chain_required, + ) + + return { + "tests": tests_status, + "coverage": coverage_status, + "loc": loc_status, + "docs_naming": docs_naming_status, + "gate_glue": gate_glue_status, + "sonarqube": sonarqube_status, + "supply_chain": supply_chain_status, + } + + def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str, str]) -> float: """Return the current counter value for a labeled metric if present.""" text = _read_text(f"{pushgateway_url.rstrip('/')}/metrics") @@ -143,6 +300,7 @@ def _build_payload( summary: dict | None = None, workspace_line_coverage_percent: float = 0.0, source_lines_over_500: int = 0, + check_statuses: dict[str, str] | None = None, ) -> str: """Build the Pushgateway payload for the current suite run.""" passed = max(tests["tests"] - tests["failures"] - tests["errors"] - tests["skipped"], 0) @@ -172,16 +330,12 @@ def _build_payload( "# TYPE platform_quality_gate_source_lines_over_500_total gauge", f'platform_quality_gate_source_lines_over_500_total{{suite="{suite}"}} {source_lines_over_500}', ] - results = summary.get("results", []) if isinstance(summary, dict) else [] - if results: + if check_statuses: lines.append("# TYPE titan_iac_quality_gate_checks_total gauge") - for result in results: - check_name = result.get("name") - check_status = result.get("status") - if not check_name or not check_status: - continue + for check_name in CANONICAL_CHECKS: + check_status = check_statuses.get(check_name, "not_applicable") lines.append( - f'titan_iac_quality_gate_checks_total{{suite="{suite}",check="{_escape_label(str(check_name))}",result="{_escape_label(str(check_status))}"}} 1' + f'titan_iac_quality_gate_checks_total{{suite="{suite}",check="{_escape_label(check_name)}",result="{_escape_label(check_status)}"}} 1' ) return "\n".join(lines) + "\n" @@ -203,6 +357,18 @@ def main() -> int: summary = _load_summary(summary_path) workspace_line_coverage_percent = _summary_float(summary, "workspace_line_coverage_percent") source_lines_over_500 = _summary_int(summary, "source_lines_over_500") + sonarqube_report = _load_optional_json(os.getenv("QUALITY_GATE_SONARQUBE_REPORT", "build/sonarqube-quality-gate.json")) + supply_chain_report = _load_optional_json(os.getenv("QUALITY_GATE_IRONBANK_REPORT", "build/ironbank-compliance.json")) + supply_chain_required = os.getenv("QUALITY_GATE_IRONBANK_REQUIRED", "0").strip().lower() in {"1", "true", "yes", "on"} + check_statuses = _build_check_statuses( + summary=summary, + tests=tests, + workspace_line_coverage_percent=workspace_line_coverage_percent, + source_lines_over_500=source_lines_over_500, + sonarqube_report=sonarqube_report, + supply_chain_report=supply_chain_report, + supply_chain_required=supply_chain_required, + ) ok_count = int( _fetch_existing_counter( @@ -234,6 +400,7 @@ def main() -> int: summary=summary, workspace_line_coverage_percent=workspace_line_coverage_percent, source_lines_over_500=source_lines_over_500, + check_statuses=check_statuses, ) push_url = f"{pushgateway_url.rstrip('/')}/metrics/job/{job_name}/suite/{suite}" _post_text(push_url, payload) @@ -247,7 +414,7 @@ def main() -> int: "tests_skipped": tests["skipped"], "ok_count": ok_count, "failed_count": failed_count, - "checks_recorded": len(summary.get("results", [])) if isinstance(summary, dict) else 0, + "checks_recorded": len(check_statuses), "workspace_line_coverage_percent": workspace_line_coverage_percent, "source_lines_over_500": source_lines_over_500, } diff --git a/testing/tests/test_publish_test_metrics.py b/testing/tests/test_publish_test_metrics.py index f637d245..f317eb46 100644 --- a/testing/tests/test_publish_test_metrics.py +++ b/testing/tests/test_publish_test_metrics.py @@ -94,6 +94,57 @@ def test_summary_extractors_handle_invalid_shapes_and_values(): assert publish_test_metrics._summary_int({"source_lines_over_500": 2.9}, "source_lines_over_500") == 2 +def test_build_check_statuses_maps_legacy_names_and_reports(): + check_statuses = publish_test_metrics._build_check_statuses( + summary={ + "results": [ + {"name": "unit", "status": "ok"}, + {"name": "coverage", "status": "failed"}, + {"name": "hygiene", "status": "ok"}, + {"name": "smell", "status": "ok"}, + {"name": "docs", "status": "ok"}, + {"name": "glue", "status": "failed"}, + ] + }, + tests={"tests": 4, "failures": 0, "errors": 0, "skipped": 0}, + workspace_line_coverage_percent=97.0, + source_lines_over_500=0, + sonarqube_report={"projectStatus": {"status": "OK"}}, + supply_chain_report={"status": "compliant"}, + supply_chain_required=True, + ) + + assert check_statuses == { + "tests": "ok", + "coverage": "failed", + "loc": "ok", + "docs_naming": "ok", + "gate_glue": "failed", + "sonarqube": "ok", + "supply_chain": "ok", + } + + +def test_build_check_statuses_handles_missing_reports(): + check_statuses = publish_test_metrics._build_check_statuses( + summary={"results": []}, + tests={"tests": 0, "failures": 0, "errors": 0, "skipped": 0}, + workspace_line_coverage_percent=0.0, + source_lines_over_500=2, + sonarqube_report={}, + supply_chain_report={}, + supply_chain_required=False, + ) + + assert check_statuses["tests"] == "not_applicable" + assert check_statuses["coverage"] == "not_applicable" + assert check_statuses["loc"] == "failed" + assert check_statuses["docs_naming"] == "not_applicable" + assert check_statuses["gate_glue"] == "not_applicable" + assert check_statuses["sonarqube"] == "not_applicable" + assert check_statuses["supply_chain"] == "not_applicable" + + def test_read_text_post_text_and_fetch_existing_counter(monkeypatch): class _FakeResponse: def __init__(self, payload: str, status: int = 200): @@ -208,7 +259,7 @@ def test_post_text_raises_and_counter_handles_bad_metric_lines(monkeypatch): ) -def test_build_payload_includes_summary_metrics(): +def test_build_payload_includes_canonical_checks(): payload = publish_test_metrics._build_payload( suite="titan-iac", status="ok", @@ -219,20 +270,23 @@ def test_build_payload_includes_summary_metrics(): build_number="42", workspace_line_coverage_percent=95.0, source_lines_over_500=0, - summary={ - "results": [ - {"name": "docs", "status": "ok"}, - {"name": "unit", "status": "failed"}, - ] + check_statuses={ + "tests": "failed", + "coverage": "ok", + "loc": "ok", + "docs_naming": "ok", + "gate_glue": "ok", + "sonarqube": "failed", + "supply_chain": "failed", }, ) assert 'platform_quality_gate_runs_total{suite="titan-iac",status="ok"} 7' in payload - assert 'titan_iac_quality_gate_checks_total{suite="titan-iac",check="docs",result="ok"} 1' in payload - assert 'titan_iac_quality_gate_checks_total{suite="titan-iac",check="unit",result="failed"} 1' in payload + assert 'titan_iac_quality_gate_checks_total{suite="titan-iac",check="docs_naming",result="ok"} 1' in payload + assert 'titan_iac_quality_gate_checks_total{suite="titan-iac",check="tests",result="failed"} 1' in payload -def test_build_payload_skips_incomplete_results(): +def test_build_payload_omits_checks_block_without_check_statuses(): payload = publish_test_metrics._build_payload( suite="titan-iac", status="failed", @@ -243,11 +297,9 @@ def test_build_payload_skips_incomplete_results(): build_number="", workspace_line_coverage_percent=0.0, source_lines_over_500=1, - summary={"results": [{"name": "docs"}, {"status": "ok"}]}, ) - assert "titan_iac_quality_gate_checks_total" in payload - assert 'check="docs"' not in payload + assert "titan_iac_quality_gate_checks_total" not in payload def test_main_uses_quality_gate_summary_and_junit_glob(tmp_path: Path, monkeypatch): @@ -286,7 +338,7 @@ def test_main_uses_quality_gate_summary_and_junit_glob(tmp_path: Path, monkeypat assert rc == 0 assert posted["url"].endswith("/metrics/job/platform-quality-ci/suite/titan-iac") assert 'titan_iac_quality_gate_tests_total{suite="titan-iac",result="failed"} 1' in posted["payload"] - assert 'titan_iac_quality_gate_checks_total{suite="titan-iac",check="glue",result="failed"} 1' in posted["payload"] + assert 'titan_iac_quality_gate_checks_total{suite="titan-iac",check="gate_glue",result="failed"} 1' in posted["payload"] def test_main_marks_successful_run(tmp_path: Path, monkeypatch, capsys): @@ -309,4 +361,4 @@ def test_main_marks_successful_run(tmp_path: Path, monkeypatch, capsys): summary = json.loads(capsys.readouterr().out) assert rc == 0 assert summary["status"] == "ok" - assert summary["checks_recorded"] == 0 + assert summary["checks_recorded"] == 7