From 655c26c5896e22ea76ebe5e3bd94cf13fa7ce49e Mon Sep 17 00:00:00 2001 From: jenkins Date: Mon, 20 Apr 2026 15:20:56 -0300 Subject: [PATCH] quality(titan-iac): split metrics publisher and harden gate lint --- ci/scripts/publish_test_metrics.py | 206 +----------------- ci/scripts/publish_test_metrics_quality.py | 200 +++++++++++++++++ testing/quality_gate.py | 194 +++++++++++++++++ testing/tests/test_publish_test_metrics.py | 6 + .../tests/test_publish_test_metrics_paths.py | 134 ++++++++++++ 5 files changed, 544 insertions(+), 196 deletions(-) create mode 100644 ci/scripts/publish_test_metrics_quality.py create mode 100644 testing/tests/test_publish_test_metrics_paths.py diff --git a/ci/scripts/publish_test_metrics.py b/ci/scripts/publish_test_metrics.py index 3fc76054..4dc6ced7 100644 --- a/ci/scripts/publish_test_metrics.py +++ b/ci/scripts/publish_test_metrics.py @@ -6,24 +6,21 @@ from __future__ import annotations import json import os from glob import glob -from pathlib import Path import urllib.error import urllib.request import xml.etree.ElementTree as ET -SUCCESS_STATUSES = {"ok", "pass", "passed", "success", "compliant"} -NOT_APPLICABLE_STATUSES = {"not_applicable", "n/a", "na", "none", "skipped"} -FAILED_STATUSES = {"failed", "fail", "error", "errors", "warn", "warning", "red"} +from ci.scripts import publish_test_metrics_quality as _quality_helpers -CANONICAL_CHECKS = [ - "tests", - "coverage", - "loc", - "docs_naming", - "gate_glue", - "sonarqube", - "supply_chain", -] +CANONICAL_CHECKS = _quality_helpers.CANONICAL_CHECKS +_build_check_statuses = _quality_helpers._build_check_statuses +_combine_statuses = _quality_helpers._combine_statuses +_infer_sonarqube_status = _quality_helpers._infer_sonarqube_status +_infer_source_lines_over_500 = _quality_helpers._infer_source_lines_over_500 +_infer_supply_chain_status = _quality_helpers._infer_supply_chain_status +_infer_workspace_coverage_percent = _quality_helpers._infer_workspace_coverage_percent +_load_optional_json = _quality_helpers._load_optional_json +_normalize_result_status = _quality_helpers._normalize_result_status def _escape_label(value: str) -> str: @@ -159,189 +156,6 @@ def _summary_int(summary: dict, key: str) -> int: return 0 -def _infer_workspace_coverage_percent(summary: dict, default_xml: str) -> float: - """Infer workspace line coverage from quality summary coverage XML metadata.""" - results = summary.get("results", []) if isinstance(summary, dict) else [] - coverage_xml = default_xml - for result in results: - if not isinstance(result, dict): - continue - if str(result.get("name") or "").strip().lower() != "coverage": - continue - candidate = str(result.get("coverage_xml") or "").strip() - if candidate: - coverage_xml = candidate - break - xml_path = Path(coverage_xml) - if not xml_path.exists(): - return 0.0 - try: - root = ET.parse(xml_path).getroot() - line_rate = root.attrib.get("line-rate") - if line_rate is None: - return 0.0 - return float(line_rate) * 100.0 - except (ET.ParseError, OSError, ValueError): - return 0.0 - - -def _infer_source_lines_over_500(summary: dict) -> int: - """Infer over-limit source file count from hygiene issue payloads.""" - results = summary.get("results", []) if isinstance(summary, dict) else [] - for result in results: - if not isinstance(result, dict): - continue - if str(result.get("name") or "").strip().lower() not in {"hygiene", "loc", "smell"}: - continue - issues = result.get("issues") - if not isinstance(issues, list): - continue - return sum(1 for item in issues if isinstance(item, str) and item.startswith("file exceeds")) - return 0 - - -def _normalize_result_status(value: str | None, default: str = "failed") -> str: - """Map arbitrary check status text into canonical check result buckets.""" - if not value: - return default - normalized = value.strip().lower() - if normalized in SUCCESS_STATUSES: - return "ok" - if normalized in NOT_APPLICABLE_STATUSES: - return "not_applicable" - if normalized in FAILED_STATUSES: - return "failed" - return default - - -def _load_optional_json(path: str | None) -> dict: - """Load an optional JSON report file, returning an empty object when absent.""" - if not path: - return {} - candidate = Path(path) - if not candidate.exists(): - return {} - try: - return json.loads(candidate.read_text(encoding="utf-8")) - except json.JSONDecodeError: - return {} - - -def _combine_statuses(statuses: list[str]) -> str: - """Roll up many check statuses into one canonical result.""" - if not statuses: - return "not_applicable" - if any(status == "failed" for status in statuses): - return "failed" - if all(status == "not_applicable" for status in statuses): - return "not_applicable" - if all(status in {"ok", "not_applicable"} for status in statuses): - return "ok" - return "failed" - - -def _infer_sonarqube_status(report: dict) -> str: - """Infer canonical SonarQube check status from its JSON report payload.""" - if not report: - return "not_applicable" - status = ( - report.get("projectStatus", {}).get("status") - or report.get("qualityGate", {}).get("status") - or report.get("status") - ) - return _normalize_result_status(str(status) if status is not None else None, default="failed") - - -def _infer_supply_chain_status(report: dict, required: bool) -> str: - """Infer canonical supply-chain status from IronBank/artifact report payload.""" - if not report: - return "failed" if required else "not_applicable" - compliant = report.get("compliant") - if isinstance(compliant, bool): - return "ok" if compliant else "failed" - status = report.get("status") - if status is None: - return "failed" if required else "not_applicable" - normalized = _normalize_result_status(str(status), default="failed") - if normalized == "not_applicable" and required: - return "failed" - return normalized - - -def _build_check_statuses( - summary: dict | None, - tests: dict[str, int], - workspace_line_coverage_percent: float, - source_lines_over_500: int, - sonarqube_report: dict, - supply_chain_report: dict, - supply_chain_required: bool, -) -> dict[str, str]: - """Generate the canonical quality-check status map for dashboarding.""" - raw_results = summary.get("results", []) if isinstance(summary, dict) else [] - status_by_name: dict[str, str] = {} - for result in raw_results: - if not isinstance(result, dict): - continue - check_name = str(result.get("name") or "").strip().lower() - if not check_name: - continue - status_by_name[check_name] = _normalize_result_status(result.get("status"), default="failed") - - # tests - tests_status = status_by_name.get("tests") - if not tests_status: - candidate_keys = ["unit", "integration", "e2e", "pytest", "test", "tests"] - candidates = [status_by_name[key] for key in candidate_keys if key in status_by_name] - if candidates: - tests_status = _combine_statuses(candidates) - elif tests["tests"] > 0: - tests_status = "ok" if (tests["failures"] + tests["errors"]) == 0 else "failed" - else: - tests_status = "not_applicable" - - # coverage - coverage_status = status_by_name.get("coverage") - if not coverage_status: - if workspace_line_coverage_percent > 0: - coverage_status = "ok" if workspace_line_coverage_percent >= 95.0 else "failed" - else: - coverage_status = "not_applicable" - - # loc - loc_status = status_by_name.get("loc") - if not loc_status: - loc_status = "ok" if source_lines_over_500 == 0 else "failed" - - # docs + naming + lint hygiene - docs_naming_status = status_by_name.get("docs_naming") - if not docs_naming_status: - candidates = [status_by_name[key] for key in ["docs", "hygiene", "smell", "lint", "naming"] if key in status_by_name] - docs_naming_status = _combine_statuses(candidates) if candidates else "not_applicable" - - # gate glue - gate_glue_status = status_by_name.get("gate_glue") - if not gate_glue_status: - candidates = [status_by_name[key] for key in ["gate_glue", "glue", "gate"] if key in status_by_name] - gate_glue_status = _combine_statuses(candidates) if candidates else "not_applicable" - - sonarqube_status = status_by_name.get("sonarqube") or _infer_sonarqube_status(sonarqube_report) - supply_chain_status = status_by_name.get("supply_chain") or _infer_supply_chain_status( - supply_chain_report, - required=supply_chain_required, - ) - - return { - "tests": tests_status, - "coverage": coverage_status, - "loc": loc_status, - "docs_naming": docs_naming_status, - "gate_glue": gate_glue_status, - "sonarqube": sonarqube_status, - "supply_chain": supply_chain_status, - } - - def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str, str]) -> float: """Return the current counter value for a labeled metric if present.""" text = _read_text(f"{pushgateway_url.rstrip('/')}/metrics") diff --git a/ci/scripts/publish_test_metrics_quality.py b/ci/scripts/publish_test_metrics_quality.py new file mode 100644 index 00000000..09fe64e6 --- /dev/null +++ b/ci/scripts/publish_test_metrics_quality.py @@ -0,0 +1,200 @@ +#!/usr/bin/env python3 +"""Quality/status helpers for publish_test_metrics.""" + +from __future__ import annotations + +import json +from pathlib import Path +import xml.etree.ElementTree as ET + +SUCCESS_STATUSES = {"ok", "pass", "passed", "success", "compliant"} +NOT_APPLICABLE_STATUSES = {"not_applicable", "n/a", "na", "none", "skipped"} +FAILED_STATUSES = {"failed", "fail", "error", "errors", "warn", "warning", "red"} + +CANONICAL_CHECKS = [ + "tests", + "coverage", + "loc", + "docs_naming", + "gate_glue", + "sonarqube", + "supply_chain", +] + + +def _infer_workspace_coverage_percent(summary: dict, default_xml: str) -> float: + """Infer workspace line coverage from quality summary coverage XML metadata.""" + results = summary.get("results", []) if isinstance(summary, dict) else [] + coverage_xml = default_xml + for result in results: + if not isinstance(result, dict): + continue + if str(result.get("name") or "").strip().lower() != "coverage": + continue + candidate = str(result.get("coverage_xml") or "").strip() + if candidate: + coverage_xml = candidate + break + xml_path = Path(coverage_xml) + if not xml_path.exists(): + return 0.0 + try: + root = ET.parse(xml_path).getroot() + line_rate = root.attrib.get("line-rate") + if line_rate is None: + return 0.0 + return float(line_rate) * 100.0 + except (ET.ParseError, OSError, ValueError): + return 0.0 + + +def _infer_source_lines_over_500(summary: dict) -> int: + """Infer over-limit source file count from hygiene issue payloads.""" + results = summary.get("results", []) if isinstance(summary, dict) else [] + for result in results: + if not isinstance(result, dict): + continue + if str(result.get("name") or "").strip().lower() not in {"hygiene", "loc", "smell"}: + continue + issues = result.get("issues") + if not isinstance(issues, list): + continue + return sum(1 for item in issues if isinstance(item, str) and item.startswith("file exceeds")) + return 0 + + +def _normalize_result_status(value: str | None, default: str = "failed") -> str: + """Map arbitrary check status text into canonical check result buckets.""" + if not value: + return default + normalized = value.strip().lower() + if normalized in SUCCESS_STATUSES: + return "ok" + if normalized in NOT_APPLICABLE_STATUSES: + return "not_applicable" + if normalized in FAILED_STATUSES: + return "failed" + return default + + +def _load_optional_json(path: str | None) -> dict: + """Load an optional JSON report file, returning an empty object when absent.""" + if not path: + return {} + candidate = Path(path) + if not candidate.exists(): + return {} + try: + return json.loads(candidate.read_text(encoding="utf-8")) + except json.JSONDecodeError: + return {} + + +def _combine_statuses(statuses: list[str]) -> str: + """Roll up many check statuses into one canonical result.""" + if not statuses: + return "not_applicable" + if any(status == "failed" for status in statuses): + return "failed" + if all(status == "not_applicable" for status in statuses): + return "not_applicable" + if all(status in {"ok", "not_applicable"} for status in statuses): + return "ok" + return "failed" + + +def _infer_sonarqube_status(report: dict) -> str: + """Infer canonical SonarQube check status from its JSON report payload.""" + if not report: + return "not_applicable" + status = ( + report.get("projectStatus", {}).get("status") + or report.get("qualityGate", {}).get("status") + or report.get("status") + ) + return _normalize_result_status(str(status) if status is not None else None, default="failed") + + +def _infer_supply_chain_status(report: dict, required: bool) -> str: + """Infer canonical supply-chain status from IronBank/artifact report payload.""" + if not report: + return "failed" if required else "not_applicable" + compliant = report.get("compliant") + if isinstance(compliant, bool): + return "ok" if compliant else "failed" + status = report.get("status") + if status is None: + return "failed" if required else "not_applicable" + normalized = _normalize_result_status(str(status), default="failed") + if normalized == "not_applicable" and required: + return "failed" + return normalized + + +def _build_check_statuses( + summary: dict | None, + tests: dict[str, int], + workspace_line_coverage_percent: float, + source_lines_over_500: int, + sonarqube_report: dict, + supply_chain_report: dict, + supply_chain_required: bool, +) -> dict[str, str]: + """Generate the canonical quality-check status map for dashboarding.""" + raw_results = summary.get("results", []) if isinstance(summary, dict) else [] + status_by_name: dict[str, str] = {} + for result in raw_results: + if not isinstance(result, dict): + continue + check_name = str(result.get("name") or "").strip().lower() + if not check_name: + continue + status_by_name[check_name] = _normalize_result_status(result.get("status"), default="failed") + + tests_status = status_by_name.get("tests") + if not tests_status: + candidate_keys = ["unit", "integration", "e2e", "pytest", "test", "tests"] + candidates = [status_by_name[key] for key in candidate_keys if key in status_by_name] + if candidates: + tests_status = _combine_statuses(candidates) + elif tests["tests"] > 0: + tests_status = "ok" if (tests["failures"] + tests["errors"]) == 0 else "failed" + else: + tests_status = "not_applicable" + + coverage_status = status_by_name.get("coverage") + if not coverage_status: + if workspace_line_coverage_percent > 0: + coverage_status = "ok" if workspace_line_coverage_percent >= 95.0 else "failed" + else: + coverage_status = "not_applicable" + + loc_status = status_by_name.get("loc") + if not loc_status: + loc_status = "ok" if source_lines_over_500 == 0 else "failed" + + docs_naming_status = status_by_name.get("docs_naming") + if not docs_naming_status: + candidates = [status_by_name[key] for key in ["docs", "hygiene", "smell", "lint", "naming"] if key in status_by_name] + docs_naming_status = _combine_statuses(candidates) if candidates else "not_applicable" + + gate_glue_status = status_by_name.get("gate_glue") + if not gate_glue_status: + candidates = [status_by_name[key] for key in ["gate_glue", "glue", "gate"] if key in status_by_name] + gate_glue_status = _combine_statuses(candidates) if candidates else "not_applicable" + + sonarqube_status = status_by_name.get("sonarqube") or _infer_sonarqube_status(sonarqube_report) + supply_chain_status = status_by_name.get("supply_chain") or _infer_supply_chain_status( + supply_chain_report, + required=supply_chain_required, + ) + + return { + "tests": tests_status, + "coverage": coverage_status, + "loc": loc_status, + "docs_naming": docs_naming_status, + "gate_glue": gate_glue_status, + "sonarqube": sonarqube_status, + "supply_chain": supply_chain_status, + } diff --git a/testing/quality_gate.py b/testing/quality_gate.py index cff6a659..ed85ac8e 100644 --- a/testing/quality_gate.py +++ b/testing/quality_gate.py @@ -3,10 +3,15 @@ from __future__ import annotations import argparse +import base64 import json +import os import subprocess import sys import time +import urllib.error +import urllib.parse +import urllib.request from pathlib import Path from typing import Any @@ -20,6 +25,189 @@ RUFF_SELECT = ["F", "B", "SIM", "C4", "UP"] RUFF_IGNORE = ["B017", "UP015", "UP035"] +def _env_flag(name: str, default: bool) -> bool: + """Parse a boolean-like environment variable.""" + raw = os.getenv(name) + if raw is None: + return default + return raw.strip().lower() in {"1", "true", "yes", "on"} + + +def _load_json_report(path: Path) -> tuple[dict[str, Any] | None, str | None]: + """Return parsed JSON report contents or a descriptive error.""" + if not path.exists(): + return None, f"report missing: {path}" + try: + payload = json.loads(path.read_text(encoding="utf-8")) + except json.JSONDecodeError as exc: + return None, f"report invalid JSON: {path} ({exc})" + if not isinstance(payload, dict): + return None, f"report payload must be an object: {path}" + return payload, None + + +def _sonarqube_gate_status_from_report(payload: dict[str, Any]) -> str: + """Extract a SonarQube quality-gate status from a report payload.""" + project_status = payload.get("projectStatus") + if isinstance(project_status, dict): + status = project_status.get("status") + if isinstance(status, str): + return status + status = payload.get("status") + if isinstance(status, str): + return status + return "" + + +def _fetch_sonarqube_gate_status( + host_url: str, + project_key: str, + token: str, + timeout_seconds: float, +) -> tuple[str, str | None]: + """Query SonarQube for the project's current quality-gate status.""" + query = urllib.parse.urlencode({"projectKey": project_key}) + request = urllib.request.Request( + f"{host_url.rstrip('/')}/api/qualitygates/project_status?{query}", + method="GET", + ) + if token: + encoded = base64.b64encode(f"{token}:".encode()).decode() + request.add_header("Authorization", f"Basic {encoded}") + try: + with urllib.request.urlopen(request, timeout=timeout_seconds) as response: + payload = json.loads(response.read().decode("utf-8")) + except (urllib.error.HTTPError, urllib.error.URLError, TimeoutError, json.JSONDecodeError) as exc: + return "", f"sonarqube query failed: {exc}" + if not isinstance(payload, dict): + return "", "sonarqube query returned non-object payload" + status = _sonarqube_gate_status_from_report(payload) + if status: + return status, None + return "", "sonarqube response missing projectStatus.status" + + +def _run_sonarqube_check(build_dir: Path) -> dict[str, Any]: + """Enforce SonarQube quality gate using report or API evidence.""" + enforce = _env_flag("QUALITY_GATE_SONARQUBE_ENFORCE", default=True) + report_rel = os.getenv( + "QUALITY_GATE_SONARQUBE_REPORT", + str(build_dir / "sonarqube-quality-gate.json"), + ) + report_path = Path(report_rel) + if not report_path.is_absolute(): + report_path = Path.cwd() / report_path + host_url = os.getenv("SONARQUBE_HOST_URL", "").strip() + project_key = os.getenv("SONARQUBE_PROJECT_KEY", "").strip() + token = os.getenv("SONARQUBE_TOKEN", "").strip() + timeout_seconds = float(os.getenv("QUALITY_GATE_SONARQUBE_TIMEOUT_SECONDS", "12")) + + gate_status = "" + source = "" + issues: list[str] = [] + + report_payload, report_error = _load_json_report(report_path) + if report_payload is not None: + gate_status = _sonarqube_gate_status_from_report(report_payload).strip() + source = "report" + if not gate_status: + issues.append("sonarqube report missing quality gate status") + elif report_error: + if host_url and project_key: + gate_status, query_error = _fetch_sonarqube_gate_status(host_url, project_key, token, timeout_seconds) + source = "api" + if query_error: + issues.append(query_error) + else: + issues.append(report_error) + + if not source and host_url and project_key: + gate_status, query_error = _fetch_sonarqube_gate_status(host_url, project_key, token, timeout_seconds) + source = "api" + if query_error: + issues.append(query_error) + + normalized = gate_status.upper() + passed = normalized in {"OK", "PASS", "PASSED"} + if enforce and not passed: + if gate_status: + issues.append(f"sonarqube gate is {gate_status}, expected OK") + else: + issues.append("sonarqube gate status unavailable") + + status = "ok" if (passed or not enforce) and not issues else "failed" + return _result( + "sonarqube", + "SonarQube quality gate must pass for the current project.", + status, + enforce=enforce, + source=source or "none", + gate_status=gate_status or "unknown", + report_path=str(report_path), + issues=issues, + ) + + +def _ironbank_status_from_report(payload: dict[str, Any]) -> tuple[str, bool | None]: + """Extract a compliance status and explicit compliance flag from report payload.""" + for key in ("status", "result", "compliance", "compliance_status"): + value = payload.get(key) + if isinstance(value, str) and value.strip(): + return value.strip(), None + compliant = payload.get("compliant") + if isinstance(compliant, bool): + return "compliant" if compliant else "noncompliant", compliant + return "", None + + +def _run_ironbank_check(build_dir: Path) -> dict[str, Any]: + """Enforce Iron Bank image-hardening compliance from build evidence.""" + enforce = _env_flag("QUALITY_GATE_IRONBANK_ENFORCE", default=True) + required = _env_flag("QUALITY_GATE_IRONBANK_REQUIRED", default=True) + report_rel = os.getenv( + "QUALITY_GATE_IRONBANK_REPORT", + str(build_dir / "ironbank-compliance.json"), + ) + report_path = Path(report_rel) + if not report_path.is_absolute(): + report_path = Path.cwd() / report_path + + issues: list[str] = [] + status_value = "" + compliant: bool | None = None + source = "none" + + report_payload, report_error = _load_json_report(report_path) + if report_payload is not None: + status_value, compliant = _ironbank_status_from_report(report_payload) + source = "report" + elif required: + issues.append(report_error or f"report missing: {report_path}") + + normalized = status_value.strip().lower() + passed_status = normalized in {"ok", "pass", "passed", "compliant", "true"} + passed = compliant is True or passed_status + + if enforce and required and not passed: + if status_value: + issues.append(f"ironbank compliance is {status_value}, expected compliant") + elif not issues: + issues.append("ironbank compliance status unavailable") + + status = "ok" if (passed or not enforce or not required) and not issues else "failed" + return _result( + "ironbank", + "Iron Bank image-hardening compliance must pass for build artifacts.", + status, + enforce=enforce, + required=required, + source=source, + compliance=status_value or "unknown", + report_path=str(report_path), + issues=issues, + ) + + def _status_from_issues(issues: list[str]) -> str: return "ok" if not issues else "failed" @@ -140,6 +328,12 @@ def run_profile( ) ) continue + if check_name == "sonarqube": + results.append(_run_sonarqube_check(build_dir)) + continue + if check_name == "ironbank": + results.append(_run_ironbank_check(build_dir)) + continue suite = contract.get("pytest_suites", {}).get(check_name) if suite is None: raise SystemExit(f"profile {profile_name} references unknown check: {check_name}") diff --git a/testing/tests/test_publish_test_metrics.py b/testing/tests/test_publish_test_metrics.py index 74eaab72..b8b170a5 100644 --- a/testing/tests/test_publish_test_metrics.py +++ b/testing/tests/test_publish_test_metrics.py @@ -1,3 +1,5 @@ +"""Unit tests for core test-metrics parsing and quality signal helpers.""" + from __future__ import annotations import json @@ -70,6 +72,8 @@ def test_parse_junit_handles_unknown_root(tmp_path: Path): } + + def test_read_exit_code_and_summary_fallbacks(tmp_path: Path): rc_path = tmp_path / "rc.txt" rc_path.write_text("0\n", encoding="utf-8") @@ -361,6 +365,8 @@ def test_build_payload_omits_checks_block_without_check_statuses(): assert "titan_iac_quality_gate_checks_total" not in payload + + def test_main_uses_quality_gate_summary_and_junit_glob(tmp_path: Path, monkeypatch): build_dir = tmp_path / "build" build_dir.mkdir() diff --git a/testing/tests/test_publish_test_metrics_paths.py b/testing/tests/test_publish_test_metrics_paths.py new file mode 100644 index 00000000..39acd796 --- /dev/null +++ b/testing/tests/test_publish_test_metrics_paths.py @@ -0,0 +1,134 @@ +"""Unit tests for test-case path parsing and fallback metric labeling behavior.""" + +from __future__ import annotations + +import json +from pathlib import Path + +from ci.scripts import publish_test_metrics + + +def test_collect_junit_cases_handles_missing_files_and_multiple_root_shapes(tmp_path: Path, monkeypatch): + missing = tmp_path / "missing.xml" + testsuites_path = tmp_path / "suite-cases.xml" + testsuites_path.write_text( + ( + "" + '' + '' + '' + '' + "" + "" + ), + encoding="utf-8", + ) + unknown_root_path = tmp_path / "unknown-root.xml" + unknown_root_path.write_text("", encoding="utf-8") + + monkeypatch.setattr( + publish_test_metrics, + "glob", + lambda _pattern: [str(missing), str(testsuites_path), str(unknown_root_path)], + ) + + cases = publish_test_metrics._collect_junit_cases("ignored-glob") + + assert ("alpha.ok_case", "passed") in cases + assert ("alpha.skip_case", "skipped") in cases + assert not any(name.endswith("ignored") for name, _ in cases) + + +def test_build_payload_includes_explicit_test_case_series(): + payload = publish_test_metrics._build_payload( + suite="titan-iac", + status="ok", + tests={"tests": 2, "failures": 1, "errors": 0, "skipped": 0}, + test_cases=[("alpha::case_one", "failed"), ("beta::case_two", "passed")], + ok_count=3, + failed_count=1, + branch="main", + build_number="5", + workspace_line_coverage_percent=95.0, + source_lines_over_500=0, + check_statuses={"tests": "failed"}, + ) + + assert 'platform_quality_gate_test_case_result{suite="titan-iac",test="alpha::case_one",status="failed"} 1' in payload + assert 'platform_quality_gate_test_case_result{suite="titan-iac",test="beta::case_two",status="passed"} 1' in payload + + +def test_main_uses_reported_coverage_and_loc_without_fallback(tmp_path: Path, monkeypatch, capsys): + build_dir = tmp_path / "build" + build_dir.mkdir() + (build_dir / "junit.xml").write_text( + '', + encoding="utf-8", + ) + (build_dir / "quality-gate.rc").write_text("0\n", encoding="utf-8") + (build_dir / "quality-gate-summary.json").write_text( + json.dumps( + { + "workspace_line_coverage_percent": 99.5, + "source_lines_over_500": 2, + "results": [], + } + ), + encoding="utf-8", + ) + + monkeypatch.setenv("JUNIT_GLOB", str(build_dir / "*.xml")) + monkeypatch.setenv("QUALITY_GATE_EXIT_CODE_PATH", str(build_dir / "quality-gate.rc")) + monkeypatch.setenv("QUALITY_GATE_SUMMARY_PATH", str(build_dir / "quality-gate-summary.json")) + monkeypatch.setattr(publish_test_metrics, "_fetch_existing_counter", lambda *args, **kwargs: 0) + monkeypatch.setattr(publish_test_metrics, "_post_text", lambda *args, **kwargs: None) + monkeypatch.setattr( + publish_test_metrics, + "_infer_workspace_coverage_percent", + lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("should not infer coverage")), + ) + monkeypatch.setattr( + publish_test_metrics, + "_infer_source_lines_over_500", + lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("should not infer loc")), + ) + + rc = publish_test_metrics.main() + + summary = json.loads(capsys.readouterr().out) + assert rc == 0 + assert summary["workspace_line_coverage_percent"] == 99.5 + assert summary["source_lines_over_500"] == 2 + + +def test_main_falls_back_to_inferred_coverage_and_loc(tmp_path: Path, monkeypatch, capsys): + build_dir = tmp_path / "build" + build_dir.mkdir() + (build_dir / "junit.xml").write_text( + '', + encoding="utf-8", + ) + (build_dir / "quality-gate.rc").write_text("1\n", encoding="utf-8") + (build_dir / "quality-gate-summary.json").write_text( + json.dumps({"workspace_line_coverage_percent": 0.0, "source_lines_over_500": 0, "results": []}), + encoding="utf-8", + ) + + posted = {} + monkeypatch.setenv("JUNIT_GLOB", str(build_dir / "*.xml")) + monkeypatch.setenv("QUALITY_GATE_EXIT_CODE_PATH", str(build_dir / "quality-gate.rc")) + monkeypatch.setenv("QUALITY_GATE_SUMMARY_PATH", str(build_dir / "quality-gate-summary.json")) + monkeypatch.setattr(publish_test_metrics, "_fetch_existing_counter", lambda *args, **kwargs: 0) + monkeypatch.setattr(publish_test_metrics, "_post_text", lambda url, payload: posted.update({"url": url, "payload": payload})) + monkeypatch.setattr(publish_test_metrics, "_infer_workspace_coverage_percent", lambda *args, **kwargs: 96.4) + monkeypatch.setattr(publish_test_metrics, "_infer_source_lines_over_500", lambda *args, **kwargs: 7) + + rc = publish_test_metrics.main() + + summary = json.loads(capsys.readouterr().out) + assert rc == 0 + assert summary["status"] == "failed" + assert summary["workspace_line_coverage_percent"] == 96.4 + assert summary["source_lines_over_500"] == 7 + assert 'platform_quality_gate_workspace_line_coverage_percent{suite="titan_iac"} 96.400' in posted["payload"] + assert 'platform_quality_gate_source_lines_over_500_total{suite="titan_iac"} 7' in posted["payload"]