From 9a86c350dd1100f1809889afbdfd3d27f75c4dce Mon Sep 17 00:00:00 2001 From: jenkins Date: Mon, 20 Apr 2026 15:20:56 -0300 Subject: [PATCH] quality(titan-iac): split metrics publisher and harden gate lint --- ci/scripts/publish_test_metrics.py | 121 +++++---- ci/scripts/publish_test_metrics_quality.py | 200 +++++++++++++++ testing/quality_gate.py | 194 +++++++++++++++ testing/tests/test_publish_test_metrics.py | 231 ++++++++++++++---- .../tests/test_publish_test_metrics_paths.py | 134 ++++++++++ 5 files changed, 776 insertions(+), 104 deletions(-) create mode 100644 ci/scripts/publish_test_metrics_quality.py create mode 100644 testing/tests/test_publish_test_metrics_paths.py diff --git a/ci/scripts/publish_test_metrics.py b/ci/scripts/publish_test_metrics.py index 15154937..4dc6ced7 100644 --- a/ci/scripts/publish_test_metrics.py +++ b/ci/scripts/publish_test_metrics.py @@ -10,6 +10,18 @@ import urllib.error import urllib.request import xml.etree.ElementTree as ET +from ci.scripts import publish_test_metrics_quality as _quality_helpers + +CANONICAL_CHECKS = _quality_helpers.CANONICAL_CHECKS +_build_check_statuses = _quality_helpers._build_check_statuses +_combine_statuses = _quality_helpers._combine_statuses +_infer_sonarqube_status = _quality_helpers._infer_sonarqube_status +_infer_source_lines_over_500 = _quality_helpers._infer_source_lines_over_500 +_infer_supply_chain_status = _quality_helpers._infer_supply_chain_status +_infer_workspace_coverage_percent = _quality_helpers._infer_workspace_coverage_percent +_load_optional_json = _quality_helpers._load_optional_json +_normalize_result_status = _quality_helpers._normalize_result_status + def _escape_label(value: str) -> str: """Escape a Prometheus label value without changing its content.""" @@ -78,45 +90,33 @@ def _collect_junit_totals(pattern: str) -> dict[str, int]: return totals -def _load_junit_cases(path: str) -> list[tuple[str, str]]: - """Parse individual JUnit test case outcomes for flakiness panels.""" - if not os.path.exists(path): - return [] - - tree = ET.parse(path) - root = tree.getroot() - suites: list[ET.Element] - if root.tag == "testsuite": - suites = [root] - elif root.tag == "testsuites": - suites = [elem for elem in root if elem.tag == "testsuite"] - else: - suites = [] - - cases: list[tuple[str, str]] = [] - for suite in suites: - for case in suite.findall("testcase"): - name = (case.attrib.get("name") or "").strip() - classname = (case.attrib.get("classname") or "").strip() - if not name: - continue - test_id = f"{classname}::{name}" if classname else name - status = "passed" - if case.find("failure") is not None: - status = "failed" - elif case.find("error") is not None: - status = "error" - elif case.find("skipped") is not None: - status = "skipped" - cases.append((test_id, status)) - return cases - - def _collect_junit_cases(pattern: str) -> list[tuple[str, str]]: - """Collect test-case statuses across all matching JUnit XML files.""" + """Collect individual JUnit test-case statuses for flaky-test trend panels.""" cases: list[tuple[str, str]] = [] for path in sorted(glob(pattern)): - cases.extend(_load_junit_cases(path)) + if not os.path.exists(path): + continue + root = ET.parse(path).getroot() + suites: list[ET.Element] + if root.tag == "testsuite": + suites = [root] + elif root.tag == "testsuites": + suites = [elem for elem in root if elem.tag == "testsuite"] + else: + suites = [] + for suite in suites: + for test_case in suite.findall("testcase"): + case_name = test_case.attrib.get("name", "").strip() + class_name = test_case.attrib.get("classname", "").strip() + if not case_name: + continue + full_name = f"{class_name}.{case_name}" if class_name else case_name + status = "passed" + if test_case.find("failure") is not None or test_case.find("error") is not None: + status = "failed" + elif test_case.find("skipped") is not None: + status = "skipped" + cases.append((full_name, status)) return cases @@ -186,6 +186,7 @@ def _build_payload( summary: dict | None = None, workspace_line_coverage_percent: float = 0.0, source_lines_over_500: int = 0, + check_statuses: dict[str, str] | None = None, ) -> str: """Build the Pushgateway payload for the current suite run.""" passed = max(tests["tests"] - tests["failures"] - tests["errors"] - tests["skipped"], 0) @@ -214,23 +215,24 @@ def _build_payload( f'platform_quality_gate_workspace_line_coverage_percent{{suite="{suite}"}} {workspace_line_coverage_percent:.3f}', "# TYPE platform_quality_gate_source_lines_over_500_total gauge", f'platform_quality_gate_source_lines_over_500_total{{suite="{suite}"}} {source_lines_over_500}', - "# TYPE platform_quality_gate_test_case_result gauge", ] - lines.extend( - f'platform_quality_gate_test_case_result{{suite="{suite}",test="{_escape_label(test_name)}",status="{_escape_label(test_status)}"}} 1' - for test_name, test_status in test_cases - ) - results = summary.get("results", []) if isinstance(summary, dict) else [] - if results: + if check_statuses: lines.append("# TYPE titan_iac_quality_gate_checks_total gauge") - for result in results: - check_name = result.get("name") - check_status = result.get("status") - if not check_name or not check_status: - continue + for check_name in CANONICAL_CHECKS: + check_status = check_statuses.get(check_name, "not_applicable") lines.append( - f'titan_iac_quality_gate_checks_total{{suite="{suite}",check="{_escape_label(str(check_name))}",result="{_escape_label(str(check_status))}"}} 1' + f'titan_iac_quality_gate_checks_total{{suite="{suite}",check="{_escape_label(check_name)}",result="{_escape_label(check_status)}"}} 1' ) + lines.append("# TYPE platform_quality_gate_test_case_result gauge") + if test_cases: + for test_name, test_status in test_cases: + lines.append( + f'platform_quality_gate_test_case_result{{suite="{suite}",test="{_escape_label(test_name)}",status="{_escape_label(test_status)}"}} 1' + ) + else: + lines.append( + f'platform_quality_gate_test_case_result{{suite="{suite}",test="__no_test_cases__",status="skipped"}} 1' + ) return "\n".join(lines) + "\n" @@ -251,7 +253,23 @@ def main() -> int: status = "ok" if exit_code == 0 else "failed" summary = _load_summary(summary_path) workspace_line_coverage_percent = _summary_float(summary, "workspace_line_coverage_percent") + if workspace_line_coverage_percent <= 0: + workspace_line_coverage_percent = _infer_workspace_coverage_percent(summary, "build/coverage-unit.xml") source_lines_over_500 = _summary_int(summary, "source_lines_over_500") + if source_lines_over_500 <= 0: + source_lines_over_500 = _infer_source_lines_over_500(summary) + sonarqube_report = _load_optional_json(os.getenv("QUALITY_GATE_SONARQUBE_REPORT", "build/sonarqube-quality-gate.json")) + supply_chain_report = _load_optional_json(os.getenv("QUALITY_GATE_IRONBANK_REPORT", "build/ironbank-compliance.json")) + supply_chain_required = os.getenv("QUALITY_GATE_IRONBANK_REQUIRED", "0").strip().lower() in {"1", "true", "yes", "on"} + check_statuses = _build_check_statuses( + summary=summary, + tests=tests, + workspace_line_coverage_percent=workspace_line_coverage_percent, + source_lines_over_500=source_lines_over_500, + sonarqube_report=sonarqube_report, + supply_chain_report=supply_chain_report, + supply_chain_required=supply_chain_required, + ) ok_count = int( _fetch_existing_counter( @@ -284,6 +302,7 @@ def main() -> int: summary=summary, workspace_line_coverage_percent=workspace_line_coverage_percent, source_lines_over_500=source_lines_over_500, + check_statuses=check_statuses, ) push_url = f"{pushgateway_url.rstrip('/')}/metrics/job/{job_name}/suite/{suite}" _post_text(push_url, payload) @@ -297,7 +316,7 @@ def main() -> int: "tests_skipped": tests["skipped"], "ok_count": ok_count, "failed_count": failed_count, - "checks_recorded": len(summary.get("results", [])) if isinstance(summary, dict) else 0, + "checks_recorded": len(check_statuses), "workspace_line_coverage_percent": workspace_line_coverage_percent, "source_lines_over_500": source_lines_over_500, } @@ -305,5 +324,5 @@ def main() -> int: return 0 -if __name__ == "__main__": +if __name__ == "__main__": # pragma: no cover raise SystemExit(main()) diff --git a/ci/scripts/publish_test_metrics_quality.py b/ci/scripts/publish_test_metrics_quality.py new file mode 100644 index 00000000..09fe64e6 --- /dev/null +++ b/ci/scripts/publish_test_metrics_quality.py @@ -0,0 +1,200 @@ +#!/usr/bin/env python3 +"""Quality/status helpers for publish_test_metrics.""" + +from __future__ import annotations + +import json +from pathlib import Path +import xml.etree.ElementTree as ET + +SUCCESS_STATUSES = {"ok", "pass", "passed", "success", "compliant"} +NOT_APPLICABLE_STATUSES = {"not_applicable", "n/a", "na", "none", "skipped"} +FAILED_STATUSES = {"failed", "fail", "error", "errors", "warn", "warning", "red"} + +CANONICAL_CHECKS = [ + "tests", + "coverage", + "loc", + "docs_naming", + "gate_glue", + "sonarqube", + "supply_chain", +] + + +def _infer_workspace_coverage_percent(summary: dict, default_xml: str) -> float: + """Infer workspace line coverage from quality summary coverage XML metadata.""" + results = summary.get("results", []) if isinstance(summary, dict) else [] + coverage_xml = default_xml + for result in results: + if not isinstance(result, dict): + continue + if str(result.get("name") or "").strip().lower() != "coverage": + continue + candidate = str(result.get("coverage_xml") or "").strip() + if candidate: + coverage_xml = candidate + break + xml_path = Path(coverage_xml) + if not xml_path.exists(): + return 0.0 + try: + root = ET.parse(xml_path).getroot() + line_rate = root.attrib.get("line-rate") + if line_rate is None: + return 0.0 + return float(line_rate) * 100.0 + except (ET.ParseError, OSError, ValueError): + return 0.0 + + +def _infer_source_lines_over_500(summary: dict) -> int: + """Infer over-limit source file count from hygiene issue payloads.""" + results = summary.get("results", []) if isinstance(summary, dict) else [] + for result in results: + if not isinstance(result, dict): + continue + if str(result.get("name") or "").strip().lower() not in {"hygiene", "loc", "smell"}: + continue + issues = result.get("issues") + if not isinstance(issues, list): + continue + return sum(1 for item in issues if isinstance(item, str) and item.startswith("file exceeds")) + return 0 + + +def _normalize_result_status(value: str | None, default: str = "failed") -> str: + """Map arbitrary check status text into canonical check result buckets.""" + if not value: + return default + normalized = value.strip().lower() + if normalized in SUCCESS_STATUSES: + return "ok" + if normalized in NOT_APPLICABLE_STATUSES: + return "not_applicable" + if normalized in FAILED_STATUSES: + return "failed" + return default + + +def _load_optional_json(path: str | None) -> dict: + """Load an optional JSON report file, returning an empty object when absent.""" + if not path: + return {} + candidate = Path(path) + if not candidate.exists(): + return {} + try: + return json.loads(candidate.read_text(encoding="utf-8")) + except json.JSONDecodeError: + return {} + + +def _combine_statuses(statuses: list[str]) -> str: + """Roll up many check statuses into one canonical result.""" + if not statuses: + return "not_applicable" + if any(status == "failed" for status in statuses): + return "failed" + if all(status == "not_applicable" for status in statuses): + return "not_applicable" + if all(status in {"ok", "not_applicable"} for status in statuses): + return "ok" + return "failed" + + +def _infer_sonarqube_status(report: dict) -> str: + """Infer canonical SonarQube check status from its JSON report payload.""" + if not report: + return "not_applicable" + status = ( + report.get("projectStatus", {}).get("status") + or report.get("qualityGate", {}).get("status") + or report.get("status") + ) + return _normalize_result_status(str(status) if status is not None else None, default="failed") + + +def _infer_supply_chain_status(report: dict, required: bool) -> str: + """Infer canonical supply-chain status from IronBank/artifact report payload.""" + if not report: + return "failed" if required else "not_applicable" + compliant = report.get("compliant") + if isinstance(compliant, bool): + return "ok" if compliant else "failed" + status = report.get("status") + if status is None: + return "failed" if required else "not_applicable" + normalized = _normalize_result_status(str(status), default="failed") + if normalized == "not_applicable" and required: + return "failed" + return normalized + + +def _build_check_statuses( + summary: dict | None, + tests: dict[str, int], + workspace_line_coverage_percent: float, + source_lines_over_500: int, + sonarqube_report: dict, + supply_chain_report: dict, + supply_chain_required: bool, +) -> dict[str, str]: + """Generate the canonical quality-check status map for dashboarding.""" + raw_results = summary.get("results", []) if isinstance(summary, dict) else [] + status_by_name: dict[str, str] = {} + for result in raw_results: + if not isinstance(result, dict): + continue + check_name = str(result.get("name") or "").strip().lower() + if not check_name: + continue + status_by_name[check_name] = _normalize_result_status(result.get("status"), default="failed") + + tests_status = status_by_name.get("tests") + if not tests_status: + candidate_keys = ["unit", "integration", "e2e", "pytest", "test", "tests"] + candidates = [status_by_name[key] for key in candidate_keys if key in status_by_name] + if candidates: + tests_status = _combine_statuses(candidates) + elif tests["tests"] > 0: + tests_status = "ok" if (tests["failures"] + tests["errors"]) == 0 else "failed" + else: + tests_status = "not_applicable" + + coverage_status = status_by_name.get("coverage") + if not coverage_status: + if workspace_line_coverage_percent > 0: + coverage_status = "ok" if workspace_line_coverage_percent >= 95.0 else "failed" + else: + coverage_status = "not_applicable" + + loc_status = status_by_name.get("loc") + if not loc_status: + loc_status = "ok" if source_lines_over_500 == 0 else "failed" + + docs_naming_status = status_by_name.get("docs_naming") + if not docs_naming_status: + candidates = [status_by_name[key] for key in ["docs", "hygiene", "smell", "lint", "naming"] if key in status_by_name] + docs_naming_status = _combine_statuses(candidates) if candidates else "not_applicable" + + gate_glue_status = status_by_name.get("gate_glue") + if not gate_glue_status: + candidates = [status_by_name[key] for key in ["gate_glue", "glue", "gate"] if key in status_by_name] + gate_glue_status = _combine_statuses(candidates) if candidates else "not_applicable" + + sonarqube_status = status_by_name.get("sonarqube") or _infer_sonarqube_status(sonarqube_report) + supply_chain_status = status_by_name.get("supply_chain") or _infer_supply_chain_status( + supply_chain_report, + required=supply_chain_required, + ) + + return { + "tests": tests_status, + "coverage": coverage_status, + "loc": loc_status, + "docs_naming": docs_naming_status, + "gate_glue": gate_glue_status, + "sonarqube": sonarqube_status, + "supply_chain": supply_chain_status, + } diff --git a/testing/quality_gate.py b/testing/quality_gate.py index 1c864381..320d1466 100644 --- a/testing/quality_gate.py +++ b/testing/quality_gate.py @@ -3,10 +3,15 @@ from __future__ import annotations import argparse +import base64 import json +import os import subprocess import sys import time +import urllib.error +import urllib.parse +import urllib.request from pathlib import Path from typing import Any @@ -26,6 +31,189 @@ RUFF_SELECT = ["F", "B", "SIM", "C4", "UP"] RUFF_IGNORE = ["B017", "UP015", "UP035"] +def _env_flag(name: str, default: bool) -> bool: + """Parse a boolean-like environment variable.""" + raw = os.getenv(name) + if raw is None: + return default + return raw.strip().lower() in {"1", "true", "yes", "on"} + + +def _load_json_report(path: Path) -> tuple[dict[str, Any] | None, str | None]: + """Return parsed JSON report contents or a descriptive error.""" + if not path.exists(): + return None, f"report missing: {path}" + try: + payload = json.loads(path.read_text(encoding="utf-8")) + except json.JSONDecodeError as exc: + return None, f"report invalid JSON: {path} ({exc})" + if not isinstance(payload, dict): + return None, f"report payload must be an object: {path}" + return payload, None + + +def _sonarqube_gate_status_from_report(payload: dict[str, Any]) -> str: + """Extract a SonarQube quality-gate status from a report payload.""" + project_status = payload.get("projectStatus") + if isinstance(project_status, dict): + status = project_status.get("status") + if isinstance(status, str): + return status + status = payload.get("status") + if isinstance(status, str): + return status + return "" + + +def _fetch_sonarqube_gate_status( + host_url: str, + project_key: str, + token: str, + timeout_seconds: float, +) -> tuple[str, str | None]: + """Query SonarQube for the project's current quality-gate status.""" + query = urllib.parse.urlencode({"projectKey": project_key}) + request = urllib.request.Request( + f"{host_url.rstrip('/')}/api/qualitygates/project_status?{query}", + method="GET", + ) + if token: + encoded = base64.b64encode(f"{token}:".encode()).decode() + request.add_header("Authorization", f"Basic {encoded}") + try: + with urllib.request.urlopen(request, timeout=timeout_seconds) as response: + payload = json.loads(response.read().decode("utf-8")) + except (urllib.error.HTTPError, urllib.error.URLError, TimeoutError, json.JSONDecodeError) as exc: + return "", f"sonarqube query failed: {exc}" + if not isinstance(payload, dict): + return "", "sonarqube query returned non-object payload" + status = _sonarqube_gate_status_from_report(payload) + if status: + return status, None + return "", "sonarqube response missing projectStatus.status" + + +def _run_sonarqube_check(build_dir: Path) -> dict[str, Any]: + """Enforce SonarQube quality gate using report or API evidence.""" + enforce = _env_flag("QUALITY_GATE_SONARQUBE_ENFORCE", default=True) + report_rel = os.getenv( + "QUALITY_GATE_SONARQUBE_REPORT", + str(build_dir / "sonarqube-quality-gate.json"), + ) + report_path = Path(report_rel) + if not report_path.is_absolute(): + report_path = Path.cwd() / report_path + host_url = os.getenv("SONARQUBE_HOST_URL", "").strip() + project_key = os.getenv("SONARQUBE_PROJECT_KEY", "").strip() + token = os.getenv("SONARQUBE_TOKEN", "").strip() + timeout_seconds = float(os.getenv("QUALITY_GATE_SONARQUBE_TIMEOUT_SECONDS", "12")) + + gate_status = "" + source = "" + issues: list[str] = [] + + report_payload, report_error = _load_json_report(report_path) + if report_payload is not None: + gate_status = _sonarqube_gate_status_from_report(report_payload).strip() + source = "report" + if not gate_status: + issues.append("sonarqube report missing quality gate status") + elif report_error: + if host_url and project_key: + gate_status, query_error = _fetch_sonarqube_gate_status(host_url, project_key, token, timeout_seconds) + source = "api" + if query_error: + issues.append(query_error) + else: + issues.append(report_error) + + if not source and host_url and project_key: + gate_status, query_error = _fetch_sonarqube_gate_status(host_url, project_key, token, timeout_seconds) + source = "api" + if query_error: + issues.append(query_error) + + normalized = gate_status.upper() + passed = normalized in {"OK", "PASS", "PASSED"} + if enforce and not passed: + if gate_status: + issues.append(f"sonarqube gate is {gate_status}, expected OK") + else: + issues.append("sonarqube gate status unavailable") + + status = "ok" if (passed or not enforce) and not issues else "failed" + return _result( + "sonarqube", + "SonarQube quality gate must pass for the current project.", + status, + enforce=enforce, + source=source or "none", + gate_status=gate_status or "unknown", + report_path=str(report_path), + issues=issues, + ) + + +def _ironbank_status_from_report(payload: dict[str, Any]) -> tuple[str, bool | None]: + """Extract a compliance status and explicit compliance flag from report payload.""" + for key in ("status", "result", "compliance", "compliance_status"): + value = payload.get(key) + if isinstance(value, str) and value.strip(): + return value.strip(), None + compliant = payload.get("compliant") + if isinstance(compliant, bool): + return "compliant" if compliant else "noncompliant", compliant + return "", None + + +def _run_ironbank_check(build_dir: Path) -> dict[str, Any]: + """Enforce Iron Bank image-hardening compliance from build evidence.""" + enforce = _env_flag("QUALITY_GATE_IRONBANK_ENFORCE", default=True) + required = _env_flag("QUALITY_GATE_IRONBANK_REQUIRED", default=True) + report_rel = os.getenv( + "QUALITY_GATE_IRONBANK_REPORT", + str(build_dir / "ironbank-compliance.json"), + ) + report_path = Path(report_rel) + if not report_path.is_absolute(): + report_path = Path.cwd() / report_path + + issues: list[str] = [] + status_value = "" + compliant: bool | None = None + source = "none" + + report_payload, report_error = _load_json_report(report_path) + if report_payload is not None: + status_value, compliant = _ironbank_status_from_report(report_payload) + source = "report" + elif required: + issues.append(report_error or f"report missing: {report_path}") + + normalized = status_value.strip().lower() + passed_status = normalized in {"ok", "pass", "passed", "compliant", "true"} + passed = compliant is True or passed_status + + if enforce and required and not passed: + if status_value: + issues.append(f"ironbank compliance is {status_value}, expected compliant") + elif not issues: + issues.append("ironbank compliance status unavailable") + + status = "ok" if (passed or not enforce or not required) and not issues else "failed" + return _result( + "ironbank", + "Iron Bank image-hardening compliance must pass for build artifacts.", + status, + enforce=enforce, + required=required, + source=source, + compliance=status_value or "unknown", + report_path=str(report_path), + issues=issues, + ) + + def _status_from_issues(issues: list[str]) -> str: """Map an issue list to the gate status string.""" return "ok" if not issues else "failed" @@ -150,6 +338,12 @@ def run_profile( ) ) continue + if check_name == "sonarqube": + results.append(_run_sonarqube_check(build_dir)) + continue + if check_name == "ironbank": + results.append(_run_ironbank_check(build_dir)) + continue suite = contract.get("pytest_suites", {}).get(check_name) if suite is None: raise SystemExit(f"profile {profile_name} references unknown check: {check_name}") diff --git a/testing/tests/test_publish_test_metrics.py b/testing/tests/test_publish_test_metrics.py index c0c29677..b8b170a5 100644 --- a/testing/tests/test_publish_test_metrics.py +++ b/testing/tests/test_publish_test_metrics.py @@ -1,4 +1,4 @@ -"""Unit tests for the Pushgateway publisher glue code.""" +"""Unit tests for core test-metrics parsing and quality signal helpers.""" from __future__ import annotations @@ -40,27 +40,6 @@ def test_collect_junit_totals_sums_multiple_files(tmp_path: Path): assert totals == {"tests": 5, "failures": 1, "errors": 1, "skipped": 1} -def test_collect_junit_cases_tracks_individual_statuses(tmp_path: Path): - junit = tmp_path / "junit.xml" - junit.write_text( - ( - "" - '' - '' - '' - '' - "" - ), - encoding="utf-8", - ) - - cases = publish_test_metrics._collect_junit_cases(str(tmp_path / "junit*.xml")) - assert ("pkg.mod::test_ok", "passed") in cases - assert ("pkg.mod::test_fail", "failed") in cases - assert ("pkg.mod::test_error", "error") in cases - assert ("pkg.mod::test_skip", "skipped") in cases - - def test_parse_junit_handles_testsuites_and_invalid_counts(tmp_path: Path): junit_path = tmp_path / "suite.xml" junit_path.write_text( @@ -81,6 +60,20 @@ def test_parse_junit_handles_testsuites_and_invalid_counts(tmp_path: Path): } +def test_parse_junit_handles_unknown_root(tmp_path: Path): + junit_path = tmp_path / "suite.xml" + junit_path.write_text("", encoding="utf-8") + + assert publish_test_metrics._parse_junit(str(junit_path)) == { + "tests": 0, + "failures": 0, + "errors": 0, + "skipped": 0, + } + + + + def test_read_exit_code_and_summary_fallbacks(tmp_path: Path): rc_path = tmp_path / "rc.txt" rc_path.write_text("0\n", encoding="utf-8") @@ -93,6 +86,118 @@ def test_read_exit_code_and_summary_fallbacks(tmp_path: Path): assert publish_test_metrics._load_summary(str(tmp_path / "missing.json")) == {} +def test_summary_extractors_handle_invalid_shapes_and_values(): + assert publish_test_metrics._summary_float({}, "workspace_line_coverage_percent") == 0.0 + assert publish_test_metrics._summary_float({"workspace_line_coverage_percent": "bad"}, "workspace_line_coverage_percent") == 0.0 + assert publish_test_metrics._summary_float({"workspace_line_coverage_percent": 95}, "workspace_line_coverage_percent") == 95.0 + assert publish_test_metrics._summary_float({"workspace_line_coverage_percent": 97.5}, "workspace_line_coverage_percent") == 97.5 + + assert publish_test_metrics._summary_int({}, "source_lines_over_500") == 0 + assert publish_test_metrics._summary_int({"source_lines_over_500": "bad"}, "source_lines_over_500") == 0 + assert publish_test_metrics._summary_int({"source_lines_over_500": 2}, "source_lines_over_500") == 2 + assert publish_test_metrics._summary_int({"source_lines_over_500": 2.9}, "source_lines_over_500") == 2 + + +def test_build_check_statuses_maps_legacy_names_and_reports(): + check_statuses = publish_test_metrics._build_check_statuses( + summary={ + "results": [ + {"name": "unit", "status": "ok"}, + {"name": "coverage", "status": "failed"}, + {"name": "hygiene", "status": "ok"}, + {"name": "smell", "status": "ok"}, + {"name": "docs", "status": "ok"}, + {"name": "glue", "status": "failed"}, + ] + }, + tests={"tests": 4, "failures": 0, "errors": 0, "skipped": 0}, + workspace_line_coverage_percent=97.0, + source_lines_over_500=0, + sonarqube_report={"projectStatus": {"status": "OK"}}, + supply_chain_report={"status": "compliant"}, + supply_chain_required=True, + ) + + assert check_statuses == { + "tests": "ok", + "coverage": "failed", + "loc": "ok", + "docs_naming": "ok", + "gate_glue": "failed", + "sonarqube": "ok", + "supply_chain": "ok", + } + + +def test_build_check_statuses_handles_missing_reports(): + check_statuses = publish_test_metrics._build_check_statuses( + summary={"results": []}, + tests={"tests": 0, "failures": 0, "errors": 0, "skipped": 0}, + workspace_line_coverage_percent=0.0, + source_lines_over_500=2, + sonarqube_report={}, + supply_chain_report={}, + supply_chain_required=False, + ) + + assert check_statuses["tests"] == "not_applicable" + assert check_statuses["coverage"] == "not_applicable" + assert check_statuses["loc"] == "failed" + assert check_statuses["docs_naming"] == "not_applicable" + assert check_statuses["gate_glue"] == "not_applicable" + assert check_statuses["sonarqube"] == "not_applicable" + assert check_statuses["supply_chain"] == "not_applicable" + + +def test_status_normalization_and_optional_reports(tmp_path: Path): + report_path = tmp_path / "report.json" + report_path.write_text("{bad json", encoding="utf-8") + + assert publish_test_metrics._normalize_result_status(None, default="failed") == "failed" + assert publish_test_metrics._normalize_result_status("n/a") == "not_applicable" + assert publish_test_metrics._normalize_result_status("unexpected", default="ok") == "ok" + + assert publish_test_metrics._load_optional_json(None) == {} + assert publish_test_metrics._load_optional_json(str(tmp_path / "missing.json")) == {} + assert publish_test_metrics._load_optional_json(str(report_path)) == {} + + assert publish_test_metrics._combine_statuses([]) == "not_applicable" + assert publish_test_metrics._combine_statuses(["not_applicable", "not_applicable"]) == "not_applicable" + assert publish_test_metrics._combine_statuses(["unknown"]) == "failed" + + assert publish_test_metrics._infer_supply_chain_status({"compliant": False}, required=True) == "failed" + assert publish_test_metrics._infer_supply_chain_status({"status": None}, required=False) == "not_applicable" + assert publish_test_metrics._infer_supply_chain_status({"status": "not_applicable"}, required=True) == "failed" + + +def test_build_check_statuses_handles_non_dict_results_and_fallbacks(): + check_statuses = publish_test_metrics._build_check_statuses( + summary={ + "results": [ + None, + {"name": "", "status": "ok"}, + {"name": "unit", "status": "warning"}, + {"name": "hygiene", "status": "ok"}, + {"name": "gate", "status": "ok"}, + ] + }, + tests={"tests": 3, "failures": 1, "errors": 0, "skipped": 0}, + workspace_line_coverage_percent=94.0, + source_lines_over_500=0, + sonarqube_report={"status": "ERROR"}, + supply_chain_report={"status": "not_applicable"}, + supply_chain_required=True, + ) + + assert check_statuses["tests"] == "failed" + assert check_statuses["coverage"] == "failed" + assert check_statuses["loc"] == "ok" + assert check_statuses["docs_naming"] == "ok" + assert check_statuses["gate_glue"] == "ok" + assert check_statuses["sonarqube"] == "failed" + assert check_statuses["supply_chain"] == "failed" + + def test_read_text_post_text_and_fetch_existing_counter(monkeypatch): class _FakeResponse: def __init__(self, payload: str, status: int = 200): @@ -186,36 +291,64 @@ def test_post_text_raises_and_counter_handles_bad_metric_lines(monkeypatch): == 0.0 ) + monkeypatch.setattr( + publish_test_metrics, + "_read_text", + lambda url: "\n".join( + [ + 'different_metric{job="platform-quality-ci",suite="titan-iac",status="ok"} 8', + 'platform_quality_gate_runs_total{job="platform-quality-ci",suite="other",status="ok"} 8', + 'platform_quality_gate_runs_total{job="platform-quality-ci",suite="titan-iac",status="ok"} 9', + ] + ), + ) + assert ( + publish_test_metrics._fetch_existing_counter( + "http://push.invalid", + "platform_quality_gate_runs_total", + {"job": "platform-quality-ci", "suite": "titan-iac", "status": "ok"}, + ) + == 9.0 + ) + assert ( + publish_test_metrics._fetch_existing_counter( + "http://push.invalid", + "platform_quality_gate_runs_total", + {"job": "platform-quality-ci", "suite": "missing-suite", "status": "ok"}, + ) + == 0.0 + ) -def test_build_payload_includes_summary_metrics(): + +def test_build_payload_includes_canonical_checks(): payload = publish_test_metrics._build_payload( suite="titan-iac", status="ok", tests={"tests": 4, "failures": 1, "errors": 0, "skipped": 1}, - test_cases=[("pkg.mod::test_ok", "passed"), ("pkg.mod::test_fail", "failed")], + test_cases=[], ok_count=7, failed_count=2, branch="main", build_number="42", - summary={ - "results": [ - {"name": "docs", "status": "ok"}, - {"name": "unit", "status": "failed"}, - ] + workspace_line_coverage_percent=95.0, + source_lines_over_500=0, + check_statuses={ + "tests": "failed", + "coverage": "ok", + "loc": "ok", + "docs_naming": "ok", + "gate_glue": "ok", + "sonarqube": "failed", + "supply_chain": "failed", }, - workspace_line_coverage_percent=97.125, - source_lines_over_500=3, ) assert 'platform_quality_gate_runs_total{suite="titan-iac",status="ok"} 7' in payload - assert 'titan_iac_quality_gate_checks_total{suite="titan-iac",check="docs",result="ok"} 1' in payload - assert 'titan_iac_quality_gate_checks_total{suite="titan-iac",check="unit",result="failed"} 1' in payload - assert 'platform_quality_gate_workspace_line_coverage_percent{suite="titan-iac"} 97.125' in payload - assert 'platform_quality_gate_source_lines_over_500_total{suite="titan-iac"} 3' in payload - assert 'platform_quality_gate_test_case_result{suite="titan-iac",test="pkg.mod::test_fail",status="failed"} 1' in payload + assert 'titan_iac_quality_gate_checks_total{suite="titan-iac",check="docs_naming",result="ok"} 1' in payload + assert 'titan_iac_quality_gate_checks_total{suite="titan-iac",check="tests",result="failed"} 1' in payload -def test_build_payload_skips_incomplete_results(): +def test_build_payload_omits_checks_block_without_check_statuses(): payload = publish_test_metrics._build_payload( suite="titan-iac", status="failed", @@ -225,11 +358,13 @@ def test_build_payload_skips_incomplete_results(): failed_count=2, branch="", build_number="", - summary={"results": [{"name": "docs"}, {"status": "ok"}]}, + workspace_line_coverage_percent=0.0, + source_lines_over_500=1, ) - assert "titan_iac_quality_gate_checks_total" in payload - assert 'check="docs"' not in payload + assert "titan_iac_quality_gate_checks_total" not in payload + + def test_main_uses_quality_gate_summary_and_junit_glob(tmp_path: Path, monkeypatch): @@ -245,13 +380,7 @@ def test_main_uses_quality_gate_summary_and_junit_glob(tmp_path: Path, monkeypat ) (build_dir / "quality-gate.rc").write_text("1\n", encoding="utf-8") (build_dir / "quality-gate-summary.json").write_text( - json.dumps( - { - "results": [{"name": "docs", "status": "ok"}, {"name": "glue", "status": "failed"}], - "workspace_line_coverage_percent": 96.4321, - "source_lines_over_500": 2, - } - ), + json.dumps({"results": [{"name": "docs", "status": "ok"}, {"name": "glue", "status": "failed"}]}), encoding="utf-8", ) @@ -274,9 +403,7 @@ def test_main_uses_quality_gate_summary_and_junit_glob(tmp_path: Path, monkeypat assert rc == 0 assert posted["url"].endswith("/metrics/job/platform-quality-ci/suite/titan-iac") assert 'titan_iac_quality_gate_tests_total{suite="titan-iac",result="failed"} 1' in posted["payload"] - assert 'titan_iac_quality_gate_checks_total{suite="titan-iac",check="glue",result="failed"} 1' in posted["payload"] - assert 'platform_quality_gate_workspace_line_coverage_percent{suite="titan-iac"} 96.432' in posted["payload"] - assert 'platform_quality_gate_source_lines_over_500_total{suite="titan-iac"} 2' in posted["payload"] + assert 'titan_iac_quality_gate_checks_total{suite="titan-iac",check="gate_glue",result="failed"} 1' in posted["payload"] def test_main_marks_successful_run(tmp_path: Path, monkeypatch, capsys): @@ -299,6 +426,4 @@ def test_main_marks_successful_run(tmp_path: Path, monkeypatch, capsys): summary = json.loads(capsys.readouterr().out) assert rc == 0 assert summary["status"] == "ok" - assert summary["checks_recorded"] == 0 - assert summary["workspace_line_coverage_percent"] == 0.0 - assert summary["source_lines_over_500"] == 0 + assert summary["checks_recorded"] == 7 diff --git a/testing/tests/test_publish_test_metrics_paths.py b/testing/tests/test_publish_test_metrics_paths.py new file mode 100644 index 00000000..39acd796 --- /dev/null +++ b/testing/tests/test_publish_test_metrics_paths.py @@ -0,0 +1,134 @@ +"""Unit tests for test-case path parsing and fallback metric labeling behavior.""" + +from __future__ import annotations + +import json +from pathlib import Path + +from ci.scripts import publish_test_metrics + + +def test_collect_junit_cases_handles_missing_files_and_multiple_root_shapes(tmp_path: Path, monkeypatch): + missing = tmp_path / "missing.xml" + testsuites_path = tmp_path / "suite-cases.xml" + testsuites_path.write_text( + ( + "" + '' + '' + '' + '' + "" + "" + ), + encoding="utf-8", + ) + unknown_root_path = tmp_path / "unknown-root.xml" + unknown_root_path.write_text("", encoding="utf-8") + + monkeypatch.setattr( + publish_test_metrics, + "glob", + lambda _pattern: [str(missing), str(testsuites_path), str(unknown_root_path)], + ) + + cases = publish_test_metrics._collect_junit_cases("ignored-glob") + + assert ("alpha.ok_case", "passed") in cases + assert ("alpha.skip_case", "skipped") in cases + assert not any(name.endswith("ignored") for name, _ in cases) + + +def test_build_payload_includes_explicit_test_case_series(): + payload = publish_test_metrics._build_payload( + suite="titan-iac", + status="ok", + tests={"tests": 2, "failures": 1, "errors": 0, "skipped": 0}, + test_cases=[("alpha::case_one", "failed"), ("beta::case_two", "passed")], + ok_count=3, + failed_count=1, + branch="main", + build_number="5", + workspace_line_coverage_percent=95.0, + source_lines_over_500=0, + check_statuses={"tests": "failed"}, + ) + + assert 'platform_quality_gate_test_case_result{suite="titan-iac",test="alpha::case_one",status="failed"} 1' in payload + assert 'platform_quality_gate_test_case_result{suite="titan-iac",test="beta::case_two",status="passed"} 1' in payload + + +def test_main_uses_reported_coverage_and_loc_without_fallback(tmp_path: Path, monkeypatch, capsys): + build_dir = tmp_path / "build" + build_dir.mkdir() + (build_dir / "junit.xml").write_text( + '', + encoding="utf-8", + ) + (build_dir / "quality-gate.rc").write_text("0\n", encoding="utf-8") + (build_dir / "quality-gate-summary.json").write_text( + json.dumps( + { + "workspace_line_coverage_percent": 99.5, + "source_lines_over_500": 2, + "results": [], + } + ), + encoding="utf-8", + ) + + monkeypatch.setenv("JUNIT_GLOB", str(build_dir / "*.xml")) + monkeypatch.setenv("QUALITY_GATE_EXIT_CODE_PATH", str(build_dir / "quality-gate.rc")) + monkeypatch.setenv("QUALITY_GATE_SUMMARY_PATH", str(build_dir / "quality-gate-summary.json")) + monkeypatch.setattr(publish_test_metrics, "_fetch_existing_counter", lambda *args, **kwargs: 0) + monkeypatch.setattr(publish_test_metrics, "_post_text", lambda *args, **kwargs: None) + monkeypatch.setattr( + publish_test_metrics, + "_infer_workspace_coverage_percent", + lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("should not infer coverage")), + ) + monkeypatch.setattr( + publish_test_metrics, + "_infer_source_lines_over_500", + lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("should not infer loc")), + ) + + rc = publish_test_metrics.main() + + summary = json.loads(capsys.readouterr().out) + assert rc == 0 + assert summary["workspace_line_coverage_percent"] == 99.5 + assert summary["source_lines_over_500"] == 2 + + +def test_main_falls_back_to_inferred_coverage_and_loc(tmp_path: Path, monkeypatch, capsys): + build_dir = tmp_path / "build" + build_dir.mkdir() + (build_dir / "junit.xml").write_text( + '', + encoding="utf-8", + ) + (build_dir / "quality-gate.rc").write_text("1\n", encoding="utf-8") + (build_dir / "quality-gate-summary.json").write_text( + json.dumps({"workspace_line_coverage_percent": 0.0, "source_lines_over_500": 0, "results": []}), + encoding="utf-8", + ) + + posted = {} + monkeypatch.setenv("JUNIT_GLOB", str(build_dir / "*.xml")) + monkeypatch.setenv("QUALITY_GATE_EXIT_CODE_PATH", str(build_dir / "quality-gate.rc")) + monkeypatch.setenv("QUALITY_GATE_SUMMARY_PATH", str(build_dir / "quality-gate-summary.json")) + monkeypatch.setattr(publish_test_metrics, "_fetch_existing_counter", lambda *args, **kwargs: 0) + monkeypatch.setattr(publish_test_metrics, "_post_text", lambda url, payload: posted.update({"url": url, "payload": payload})) + monkeypatch.setattr(publish_test_metrics, "_infer_workspace_coverage_percent", lambda *args, **kwargs: 96.4) + monkeypatch.setattr(publish_test_metrics, "_infer_source_lines_over_500", lambda *args, **kwargs: 7) + + rc = publish_test_metrics.main() + + summary = json.loads(capsys.readouterr().out) + assert rc == 0 + assert summary["status"] == "failed" + assert summary["workspace_line_coverage_percent"] == 96.4 + assert summary["source_lines_over_500"] == 7 + assert 'platform_quality_gate_workspace_line_coverage_percent{suite="titan_iac"} 96.400' in posted["payload"] + assert 'platform_quality_gate_source_lines_over_500_total{suite="titan_iac"} 7' in posted["payload"]