diff --git a/ci/scripts/publish_test_metrics.py b/ci/scripts/publish_test_metrics.py
index 3fc76054..4dc6ced7 100644
--- a/ci/scripts/publish_test_metrics.py
+++ b/ci/scripts/publish_test_metrics.py
@@ -6,24 +6,21 @@ from __future__ import annotations
import json
import os
from glob import glob
-from pathlib import Path
import urllib.error
import urllib.request
import xml.etree.ElementTree as ET
-SUCCESS_STATUSES = {"ok", "pass", "passed", "success", "compliant"}
-NOT_APPLICABLE_STATUSES = {"not_applicable", "n/a", "na", "none", "skipped"}
-FAILED_STATUSES = {"failed", "fail", "error", "errors", "warn", "warning", "red"}
+from ci.scripts import publish_test_metrics_quality as _quality_helpers
-CANONICAL_CHECKS = [
- "tests",
- "coverage",
- "loc",
- "docs_naming",
- "gate_glue",
- "sonarqube",
- "supply_chain",
-]
+CANONICAL_CHECKS = _quality_helpers.CANONICAL_CHECKS
+_build_check_statuses = _quality_helpers._build_check_statuses
+_combine_statuses = _quality_helpers._combine_statuses
+_infer_sonarqube_status = _quality_helpers._infer_sonarqube_status
+_infer_source_lines_over_500 = _quality_helpers._infer_source_lines_over_500
+_infer_supply_chain_status = _quality_helpers._infer_supply_chain_status
+_infer_workspace_coverage_percent = _quality_helpers._infer_workspace_coverage_percent
+_load_optional_json = _quality_helpers._load_optional_json
+_normalize_result_status = _quality_helpers._normalize_result_status
def _escape_label(value: str) -> str:
@@ -159,189 +156,6 @@ def _summary_int(summary: dict, key: str) -> int:
return 0
-def _infer_workspace_coverage_percent(summary: dict, default_xml: str) -> float:
- """Infer workspace line coverage from quality summary coverage XML metadata."""
- results = summary.get("results", []) if isinstance(summary, dict) else []
- coverage_xml = default_xml
- for result in results:
- if not isinstance(result, dict):
- continue
- if str(result.get("name") or "").strip().lower() != "coverage":
- continue
- candidate = str(result.get("coverage_xml") or "").strip()
- if candidate:
- coverage_xml = candidate
- break
- xml_path = Path(coverage_xml)
- if not xml_path.exists():
- return 0.0
- try:
- root = ET.parse(xml_path).getroot()
- line_rate = root.attrib.get("line-rate")
- if line_rate is None:
- return 0.0
- return float(line_rate) * 100.0
- except (ET.ParseError, OSError, ValueError):
- return 0.0
-
-
-def _infer_source_lines_over_500(summary: dict) -> int:
- """Infer over-limit source file count from hygiene issue payloads."""
- results = summary.get("results", []) if isinstance(summary, dict) else []
- for result in results:
- if not isinstance(result, dict):
- continue
- if str(result.get("name") or "").strip().lower() not in {"hygiene", "loc", "smell"}:
- continue
- issues = result.get("issues")
- if not isinstance(issues, list):
- continue
- return sum(1 for item in issues if isinstance(item, str) and item.startswith("file exceeds"))
- return 0
-
-
-def _normalize_result_status(value: str | None, default: str = "failed") -> str:
- """Map arbitrary check status text into canonical check result buckets."""
- if not value:
- return default
- normalized = value.strip().lower()
- if normalized in SUCCESS_STATUSES:
- return "ok"
- if normalized in NOT_APPLICABLE_STATUSES:
- return "not_applicable"
- if normalized in FAILED_STATUSES:
- return "failed"
- return default
-
-
-def _load_optional_json(path: str | None) -> dict:
- """Load an optional JSON report file, returning an empty object when absent."""
- if not path:
- return {}
- candidate = Path(path)
- if not candidate.exists():
- return {}
- try:
- return json.loads(candidate.read_text(encoding="utf-8"))
- except json.JSONDecodeError:
- return {}
-
-
-def _combine_statuses(statuses: list[str]) -> str:
- """Roll up many check statuses into one canonical result."""
- if not statuses:
- return "not_applicable"
- if any(status == "failed" for status in statuses):
- return "failed"
- if all(status == "not_applicable" for status in statuses):
- return "not_applicable"
- if all(status in {"ok", "not_applicable"} for status in statuses):
- return "ok"
- return "failed"
-
-
-def _infer_sonarqube_status(report: dict) -> str:
- """Infer canonical SonarQube check status from its JSON report payload."""
- if not report:
- return "not_applicable"
- status = (
- report.get("projectStatus", {}).get("status")
- or report.get("qualityGate", {}).get("status")
- or report.get("status")
- )
- return _normalize_result_status(str(status) if status is not None else None, default="failed")
-
-
-def _infer_supply_chain_status(report: dict, required: bool) -> str:
- """Infer canonical supply-chain status from IronBank/artifact report payload."""
- if not report:
- return "failed" if required else "not_applicable"
- compliant = report.get("compliant")
- if isinstance(compliant, bool):
- return "ok" if compliant else "failed"
- status = report.get("status")
- if status is None:
- return "failed" if required else "not_applicable"
- normalized = _normalize_result_status(str(status), default="failed")
- if normalized == "not_applicable" and required:
- return "failed"
- return normalized
-
-
-def _build_check_statuses(
- summary: dict | None,
- tests: dict[str, int],
- workspace_line_coverage_percent: float,
- source_lines_over_500: int,
- sonarqube_report: dict,
- supply_chain_report: dict,
- supply_chain_required: bool,
-) -> dict[str, str]:
- """Generate the canonical quality-check status map for dashboarding."""
- raw_results = summary.get("results", []) if isinstance(summary, dict) else []
- status_by_name: dict[str, str] = {}
- for result in raw_results:
- if not isinstance(result, dict):
- continue
- check_name = str(result.get("name") or "").strip().lower()
- if not check_name:
- continue
- status_by_name[check_name] = _normalize_result_status(result.get("status"), default="failed")
-
- # tests
- tests_status = status_by_name.get("tests")
- if not tests_status:
- candidate_keys = ["unit", "integration", "e2e", "pytest", "test", "tests"]
- candidates = [status_by_name[key] for key in candidate_keys if key in status_by_name]
- if candidates:
- tests_status = _combine_statuses(candidates)
- elif tests["tests"] > 0:
- tests_status = "ok" if (tests["failures"] + tests["errors"]) == 0 else "failed"
- else:
- tests_status = "not_applicable"
-
- # coverage
- coverage_status = status_by_name.get("coverage")
- if not coverage_status:
- if workspace_line_coverage_percent > 0:
- coverage_status = "ok" if workspace_line_coverage_percent >= 95.0 else "failed"
- else:
- coverage_status = "not_applicable"
-
- # loc
- loc_status = status_by_name.get("loc")
- if not loc_status:
- loc_status = "ok" if source_lines_over_500 == 0 else "failed"
-
- # docs + naming + lint hygiene
- docs_naming_status = status_by_name.get("docs_naming")
- if not docs_naming_status:
- candidates = [status_by_name[key] for key in ["docs", "hygiene", "smell", "lint", "naming"] if key in status_by_name]
- docs_naming_status = _combine_statuses(candidates) if candidates else "not_applicable"
-
- # gate glue
- gate_glue_status = status_by_name.get("gate_glue")
- if not gate_glue_status:
- candidates = [status_by_name[key] for key in ["gate_glue", "glue", "gate"] if key in status_by_name]
- gate_glue_status = _combine_statuses(candidates) if candidates else "not_applicable"
-
- sonarqube_status = status_by_name.get("sonarqube") or _infer_sonarqube_status(sonarqube_report)
- supply_chain_status = status_by_name.get("supply_chain") or _infer_supply_chain_status(
- supply_chain_report,
- required=supply_chain_required,
- )
-
- return {
- "tests": tests_status,
- "coverage": coverage_status,
- "loc": loc_status,
- "docs_naming": docs_naming_status,
- "gate_glue": gate_glue_status,
- "sonarqube": sonarqube_status,
- "supply_chain": supply_chain_status,
- }
-
-
def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str, str]) -> float:
"""Return the current counter value for a labeled metric if present."""
text = _read_text(f"{pushgateway_url.rstrip('/')}/metrics")
diff --git a/ci/scripts/publish_test_metrics_quality.py b/ci/scripts/publish_test_metrics_quality.py
new file mode 100644
index 00000000..09fe64e6
--- /dev/null
+++ b/ci/scripts/publish_test_metrics_quality.py
@@ -0,0 +1,200 @@
+#!/usr/bin/env python3
+"""Quality/status helpers for publish_test_metrics."""
+
+from __future__ import annotations
+
+import json
+from pathlib import Path
+import xml.etree.ElementTree as ET
+
+SUCCESS_STATUSES = {"ok", "pass", "passed", "success", "compliant"}
+NOT_APPLICABLE_STATUSES = {"not_applicable", "n/a", "na", "none", "skipped"}
+FAILED_STATUSES = {"failed", "fail", "error", "errors", "warn", "warning", "red"}
+
+CANONICAL_CHECKS = [
+ "tests",
+ "coverage",
+ "loc",
+ "docs_naming",
+ "gate_glue",
+ "sonarqube",
+ "supply_chain",
+]
+
+
+def _infer_workspace_coverage_percent(summary: dict, default_xml: str) -> float:
+ """Infer workspace line coverage from quality summary coverage XML metadata."""
+ results = summary.get("results", []) if isinstance(summary, dict) else []
+ coverage_xml = default_xml
+ for result in results:
+ if not isinstance(result, dict):
+ continue
+ if str(result.get("name") or "").strip().lower() != "coverage":
+ continue
+ candidate = str(result.get("coverage_xml") or "").strip()
+ if candidate:
+ coverage_xml = candidate
+ break
+ xml_path = Path(coverage_xml)
+ if not xml_path.exists():
+ return 0.0
+ try:
+ root = ET.parse(xml_path).getroot()
+ line_rate = root.attrib.get("line-rate")
+ if line_rate is None:
+ return 0.0
+ return float(line_rate) * 100.0
+ except (ET.ParseError, OSError, ValueError):
+ return 0.0
+
+
+def _infer_source_lines_over_500(summary: dict) -> int:
+ """Infer over-limit source file count from hygiene issue payloads."""
+ results = summary.get("results", []) if isinstance(summary, dict) else []
+ for result in results:
+ if not isinstance(result, dict):
+ continue
+ if str(result.get("name") or "").strip().lower() not in {"hygiene", "loc", "smell"}:
+ continue
+ issues = result.get("issues")
+ if not isinstance(issues, list):
+ continue
+ return sum(1 for item in issues if isinstance(item, str) and item.startswith("file exceeds"))
+ return 0
+
+
+def _normalize_result_status(value: str | None, default: str = "failed") -> str:
+ """Map arbitrary check status text into canonical check result buckets."""
+ if not value:
+ return default
+ normalized = value.strip().lower()
+ if normalized in SUCCESS_STATUSES:
+ return "ok"
+ if normalized in NOT_APPLICABLE_STATUSES:
+ return "not_applicable"
+ if normalized in FAILED_STATUSES:
+ return "failed"
+ return default
+
+
+def _load_optional_json(path: str | None) -> dict:
+ """Load an optional JSON report file, returning an empty object when absent."""
+ if not path:
+ return {}
+ candidate = Path(path)
+ if not candidate.exists():
+ return {}
+ try:
+ return json.loads(candidate.read_text(encoding="utf-8"))
+ except json.JSONDecodeError:
+ return {}
+
+
+def _combine_statuses(statuses: list[str]) -> str:
+ """Roll up many check statuses into one canonical result."""
+ if not statuses:
+ return "not_applicable"
+ if any(status == "failed" for status in statuses):
+ return "failed"
+ if all(status == "not_applicable" for status in statuses):
+ return "not_applicable"
+ if all(status in {"ok", "not_applicable"} for status in statuses):
+ return "ok"
+ return "failed"
+
+
+def _infer_sonarqube_status(report: dict) -> str:
+ """Infer canonical SonarQube check status from its JSON report payload."""
+ if not report:
+ return "not_applicable"
+ status = (
+ report.get("projectStatus", {}).get("status")
+ or report.get("qualityGate", {}).get("status")
+ or report.get("status")
+ )
+ return _normalize_result_status(str(status) if status is not None else None, default="failed")
+
+
+def _infer_supply_chain_status(report: dict, required: bool) -> str:
+ """Infer canonical supply-chain status from IronBank/artifact report payload."""
+ if not report:
+ return "failed" if required else "not_applicable"
+ compliant = report.get("compliant")
+ if isinstance(compliant, bool):
+ return "ok" if compliant else "failed"
+ status = report.get("status")
+ if status is None:
+ return "failed" if required else "not_applicable"
+ normalized = _normalize_result_status(str(status), default="failed")
+ if normalized == "not_applicable" and required:
+ return "failed"
+ return normalized
+
+
+def _build_check_statuses(
+ summary: dict | None,
+ tests: dict[str, int],
+ workspace_line_coverage_percent: float,
+ source_lines_over_500: int,
+ sonarqube_report: dict,
+ supply_chain_report: dict,
+ supply_chain_required: bool,
+) -> dict[str, str]:
+ """Generate the canonical quality-check status map for dashboarding."""
+ raw_results = summary.get("results", []) if isinstance(summary, dict) else []
+ status_by_name: dict[str, str] = {}
+ for result in raw_results:
+ if not isinstance(result, dict):
+ continue
+ check_name = str(result.get("name") or "").strip().lower()
+ if not check_name:
+ continue
+ status_by_name[check_name] = _normalize_result_status(result.get("status"), default="failed")
+
+ tests_status = status_by_name.get("tests")
+ if not tests_status:
+ candidate_keys = ["unit", "integration", "e2e", "pytest", "test", "tests"]
+ candidates = [status_by_name[key] for key in candidate_keys if key in status_by_name]
+ if candidates:
+ tests_status = _combine_statuses(candidates)
+ elif tests["tests"] > 0:
+ tests_status = "ok" if (tests["failures"] + tests["errors"]) == 0 else "failed"
+ else:
+ tests_status = "not_applicable"
+
+ coverage_status = status_by_name.get("coverage")
+ if not coverage_status:
+ if workspace_line_coverage_percent > 0:
+ coverage_status = "ok" if workspace_line_coverage_percent >= 95.0 else "failed"
+ else:
+ coverage_status = "not_applicable"
+
+ loc_status = status_by_name.get("loc")
+ if not loc_status:
+ loc_status = "ok" if source_lines_over_500 == 0 else "failed"
+
+ docs_naming_status = status_by_name.get("docs_naming")
+ if not docs_naming_status:
+ candidates = [status_by_name[key] for key in ["docs", "hygiene", "smell", "lint", "naming"] if key in status_by_name]
+ docs_naming_status = _combine_statuses(candidates) if candidates else "not_applicable"
+
+ gate_glue_status = status_by_name.get("gate_glue")
+ if not gate_glue_status:
+ candidates = [status_by_name[key] for key in ["gate_glue", "glue", "gate"] if key in status_by_name]
+ gate_glue_status = _combine_statuses(candidates) if candidates else "not_applicable"
+
+ sonarqube_status = status_by_name.get("sonarqube") or _infer_sonarqube_status(sonarqube_report)
+ supply_chain_status = status_by_name.get("supply_chain") or _infer_supply_chain_status(
+ supply_chain_report,
+ required=supply_chain_required,
+ )
+
+ return {
+ "tests": tests_status,
+ "coverage": coverage_status,
+ "loc": loc_status,
+ "docs_naming": docs_naming_status,
+ "gate_glue": gate_glue_status,
+ "sonarqube": sonarqube_status,
+ "supply_chain": supply_chain_status,
+ }
diff --git a/testing/quality_gate.py b/testing/quality_gate.py
index cff6a659..ed85ac8e 100644
--- a/testing/quality_gate.py
+++ b/testing/quality_gate.py
@@ -3,10 +3,15 @@
from __future__ import annotations
import argparse
+import base64
import json
+import os
import subprocess
import sys
import time
+import urllib.error
+import urllib.parse
+import urllib.request
from pathlib import Path
from typing import Any
@@ -20,6 +25,189 @@ RUFF_SELECT = ["F", "B", "SIM", "C4", "UP"]
RUFF_IGNORE = ["B017", "UP015", "UP035"]
+def _env_flag(name: str, default: bool) -> bool:
+ """Parse a boolean-like environment variable."""
+ raw = os.getenv(name)
+ if raw is None:
+ return default
+ return raw.strip().lower() in {"1", "true", "yes", "on"}
+
+
+def _load_json_report(path: Path) -> tuple[dict[str, Any] | None, str | None]:
+ """Return parsed JSON report contents or a descriptive error."""
+ if not path.exists():
+ return None, f"report missing: {path}"
+ try:
+ payload = json.loads(path.read_text(encoding="utf-8"))
+ except json.JSONDecodeError as exc:
+ return None, f"report invalid JSON: {path} ({exc})"
+ if not isinstance(payload, dict):
+ return None, f"report payload must be an object: {path}"
+ return payload, None
+
+
+def _sonarqube_gate_status_from_report(payload: dict[str, Any]) -> str:
+ """Extract a SonarQube quality-gate status from a report payload."""
+ project_status = payload.get("projectStatus")
+ if isinstance(project_status, dict):
+ status = project_status.get("status")
+ if isinstance(status, str):
+ return status
+ status = payload.get("status")
+ if isinstance(status, str):
+ return status
+ return ""
+
+
+def _fetch_sonarqube_gate_status(
+ host_url: str,
+ project_key: str,
+ token: str,
+ timeout_seconds: float,
+) -> tuple[str, str | None]:
+ """Query SonarQube for the project's current quality-gate status."""
+ query = urllib.parse.urlencode({"projectKey": project_key})
+ request = urllib.request.Request(
+ f"{host_url.rstrip('/')}/api/qualitygates/project_status?{query}",
+ method="GET",
+ )
+ if token:
+ encoded = base64.b64encode(f"{token}:".encode()).decode()
+ request.add_header("Authorization", f"Basic {encoded}")
+ try:
+ with urllib.request.urlopen(request, timeout=timeout_seconds) as response:
+ payload = json.loads(response.read().decode("utf-8"))
+ except (urllib.error.HTTPError, urllib.error.URLError, TimeoutError, json.JSONDecodeError) as exc:
+ return "", f"sonarqube query failed: {exc}"
+ if not isinstance(payload, dict):
+ return "", "sonarqube query returned non-object payload"
+ status = _sonarqube_gate_status_from_report(payload)
+ if status:
+ return status, None
+ return "", "sonarqube response missing projectStatus.status"
+
+
+def _run_sonarqube_check(build_dir: Path) -> dict[str, Any]:
+ """Enforce SonarQube quality gate using report or API evidence."""
+ enforce = _env_flag("QUALITY_GATE_SONARQUBE_ENFORCE", default=True)
+ report_rel = os.getenv(
+ "QUALITY_GATE_SONARQUBE_REPORT",
+ str(build_dir / "sonarqube-quality-gate.json"),
+ )
+ report_path = Path(report_rel)
+ if not report_path.is_absolute():
+ report_path = Path.cwd() / report_path
+ host_url = os.getenv("SONARQUBE_HOST_URL", "").strip()
+ project_key = os.getenv("SONARQUBE_PROJECT_KEY", "").strip()
+ token = os.getenv("SONARQUBE_TOKEN", "").strip()
+ timeout_seconds = float(os.getenv("QUALITY_GATE_SONARQUBE_TIMEOUT_SECONDS", "12"))
+
+ gate_status = ""
+ source = ""
+ issues: list[str] = []
+
+ report_payload, report_error = _load_json_report(report_path)
+ if report_payload is not None:
+ gate_status = _sonarqube_gate_status_from_report(report_payload).strip()
+ source = "report"
+ if not gate_status:
+ issues.append("sonarqube report missing quality gate status")
+ elif report_error:
+ if host_url and project_key:
+ gate_status, query_error = _fetch_sonarqube_gate_status(host_url, project_key, token, timeout_seconds)
+ source = "api"
+ if query_error:
+ issues.append(query_error)
+ else:
+ issues.append(report_error)
+
+ if not source and host_url and project_key:
+ gate_status, query_error = _fetch_sonarqube_gate_status(host_url, project_key, token, timeout_seconds)
+ source = "api"
+ if query_error:
+ issues.append(query_error)
+
+ normalized = gate_status.upper()
+ passed = normalized in {"OK", "PASS", "PASSED"}
+ if enforce and not passed:
+ if gate_status:
+ issues.append(f"sonarqube gate is {gate_status}, expected OK")
+ else:
+ issues.append("sonarqube gate status unavailable")
+
+ status = "ok" if (passed or not enforce) and not issues else "failed"
+ return _result(
+ "sonarqube",
+ "SonarQube quality gate must pass for the current project.",
+ status,
+ enforce=enforce,
+ source=source or "none",
+ gate_status=gate_status or "unknown",
+ report_path=str(report_path),
+ issues=issues,
+ )
+
+
+def _ironbank_status_from_report(payload: dict[str, Any]) -> tuple[str, bool | None]:
+ """Extract a compliance status and explicit compliance flag from report payload."""
+ for key in ("status", "result", "compliance", "compliance_status"):
+ value = payload.get(key)
+ if isinstance(value, str) and value.strip():
+ return value.strip(), None
+ compliant = payload.get("compliant")
+ if isinstance(compliant, bool):
+ return "compliant" if compliant else "noncompliant", compliant
+ return "", None
+
+
+def _run_ironbank_check(build_dir: Path) -> dict[str, Any]:
+ """Enforce Iron Bank image-hardening compliance from build evidence."""
+ enforce = _env_flag("QUALITY_GATE_IRONBANK_ENFORCE", default=True)
+ required = _env_flag("QUALITY_GATE_IRONBANK_REQUIRED", default=True)
+ report_rel = os.getenv(
+ "QUALITY_GATE_IRONBANK_REPORT",
+ str(build_dir / "ironbank-compliance.json"),
+ )
+ report_path = Path(report_rel)
+ if not report_path.is_absolute():
+ report_path = Path.cwd() / report_path
+
+ issues: list[str] = []
+ status_value = ""
+ compliant: bool | None = None
+ source = "none"
+
+ report_payload, report_error = _load_json_report(report_path)
+ if report_payload is not None:
+ status_value, compliant = _ironbank_status_from_report(report_payload)
+ source = "report"
+ elif required:
+ issues.append(report_error or f"report missing: {report_path}")
+
+ normalized = status_value.strip().lower()
+ passed_status = normalized in {"ok", "pass", "passed", "compliant", "true"}
+ passed = compliant is True or passed_status
+
+ if enforce and required and not passed:
+ if status_value:
+ issues.append(f"ironbank compliance is {status_value}, expected compliant")
+ elif not issues:
+ issues.append("ironbank compliance status unavailable")
+
+ status = "ok" if (passed or not enforce or not required) and not issues else "failed"
+ return _result(
+ "ironbank",
+ "Iron Bank image-hardening compliance must pass for build artifacts.",
+ status,
+ enforce=enforce,
+ required=required,
+ source=source,
+ compliance=status_value or "unknown",
+ report_path=str(report_path),
+ issues=issues,
+ )
+
+
def _status_from_issues(issues: list[str]) -> str:
return "ok" if not issues else "failed"
@@ -140,6 +328,12 @@ def run_profile(
)
)
continue
+ if check_name == "sonarqube":
+ results.append(_run_sonarqube_check(build_dir))
+ continue
+ if check_name == "ironbank":
+ results.append(_run_ironbank_check(build_dir))
+ continue
suite = contract.get("pytest_suites", {}).get(check_name)
if suite is None:
raise SystemExit(f"profile {profile_name} references unknown check: {check_name}")
diff --git a/testing/tests/test_publish_test_metrics.py b/testing/tests/test_publish_test_metrics.py
index 74eaab72..b8b170a5 100644
--- a/testing/tests/test_publish_test_metrics.py
+++ b/testing/tests/test_publish_test_metrics.py
@@ -1,3 +1,5 @@
+"""Unit tests for core test-metrics parsing and quality signal helpers."""
+
from __future__ import annotations
import json
@@ -70,6 +72,8 @@ def test_parse_junit_handles_unknown_root(tmp_path: Path):
}
+
+
def test_read_exit_code_and_summary_fallbacks(tmp_path: Path):
rc_path = tmp_path / "rc.txt"
rc_path.write_text("0\n", encoding="utf-8")
@@ -361,6 +365,8 @@ def test_build_payload_omits_checks_block_without_check_statuses():
assert "titan_iac_quality_gate_checks_total" not in payload
+
+
def test_main_uses_quality_gate_summary_and_junit_glob(tmp_path: Path, monkeypatch):
build_dir = tmp_path / "build"
build_dir.mkdir()
diff --git a/testing/tests/test_publish_test_metrics_paths.py b/testing/tests/test_publish_test_metrics_paths.py
new file mode 100644
index 00000000..39acd796
--- /dev/null
+++ b/testing/tests/test_publish_test_metrics_paths.py
@@ -0,0 +1,134 @@
+"""Unit tests for test-case path parsing and fallback metric labeling behavior."""
+
+from __future__ import annotations
+
+import json
+from pathlib import Path
+
+from ci.scripts import publish_test_metrics
+
+
+def test_collect_junit_cases_handles_missing_files_and_multiple_root_shapes(tmp_path: Path, monkeypatch):
+ missing = tmp_path / "missing.xml"
+ testsuites_path = tmp_path / "suite-cases.xml"
+ testsuites_path.write_text(
+ (
+ ""
+ ''
+ ''
+ ''
+ ''
+ ""
+ ""
+ ),
+ encoding="utf-8",
+ )
+ unknown_root_path = tmp_path / "unknown-root.xml"
+    unknown_root_path.write_text("<unknown/>", encoding="utf-8")
+
+ monkeypatch.setattr(
+ publish_test_metrics,
+ "glob",
+ lambda _pattern: [str(missing), str(testsuites_path), str(unknown_root_path)],
+ )
+
+ cases = publish_test_metrics._collect_junit_cases("ignored-glob")
+
+ assert ("alpha.ok_case", "passed") in cases
+ assert ("alpha.skip_case", "skipped") in cases
+ assert not any(name.endswith("ignored") for name, _ in cases)
+
+
+def test_build_payload_includes_explicit_test_case_series():
+ payload = publish_test_metrics._build_payload(
+ suite="titan-iac",
+ status="ok",
+ tests={"tests": 2, "failures": 1, "errors": 0, "skipped": 0},
+ test_cases=[("alpha::case_one", "failed"), ("beta::case_two", "passed")],
+ ok_count=3,
+ failed_count=1,
+ branch="main",
+ build_number="5",
+ workspace_line_coverage_percent=95.0,
+ source_lines_over_500=0,
+ check_statuses={"tests": "failed"},
+ )
+
+ assert 'platform_quality_gate_test_case_result{suite="titan-iac",test="alpha::case_one",status="failed"} 1' in payload
+ assert 'platform_quality_gate_test_case_result{suite="titan-iac",test="beta::case_two",status="passed"} 1' in payload
+
+
+def test_main_uses_reported_coverage_and_loc_without_fallback(tmp_path: Path, monkeypatch, capsys):
+ build_dir = tmp_path / "build"
+ build_dir.mkdir()
+ (build_dir / "junit.xml").write_text(
+ '',
+ encoding="utf-8",
+ )
+ (build_dir / "quality-gate.rc").write_text("0\n", encoding="utf-8")
+ (build_dir / "quality-gate-summary.json").write_text(
+ json.dumps(
+ {
+ "workspace_line_coverage_percent": 99.5,
+ "source_lines_over_500": 2,
+ "results": [],
+ }
+ ),
+ encoding="utf-8",
+ )
+
+ monkeypatch.setenv("JUNIT_GLOB", str(build_dir / "*.xml"))
+ monkeypatch.setenv("QUALITY_GATE_EXIT_CODE_PATH", str(build_dir / "quality-gate.rc"))
+ monkeypatch.setenv("QUALITY_GATE_SUMMARY_PATH", str(build_dir / "quality-gate-summary.json"))
+ monkeypatch.setattr(publish_test_metrics, "_fetch_existing_counter", lambda *args, **kwargs: 0)
+ monkeypatch.setattr(publish_test_metrics, "_post_text", lambda *args, **kwargs: None)
+ monkeypatch.setattr(
+ publish_test_metrics,
+ "_infer_workspace_coverage_percent",
+ lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("should not infer coverage")),
+ )
+ monkeypatch.setattr(
+ publish_test_metrics,
+ "_infer_source_lines_over_500",
+ lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("should not infer loc")),
+ )
+
+ rc = publish_test_metrics.main()
+
+ summary = json.loads(capsys.readouterr().out)
+ assert rc == 0
+ assert summary["workspace_line_coverage_percent"] == 99.5
+ assert summary["source_lines_over_500"] == 2
+
+
+def test_main_falls_back_to_inferred_coverage_and_loc(tmp_path: Path, monkeypatch, capsys):
+ build_dir = tmp_path / "build"
+ build_dir.mkdir()
+ (build_dir / "junit.xml").write_text(
+        '<testsuite tests="1" failures="1" errors="0" skipped="0"/>',
+ encoding="utf-8",
+ )
+ (build_dir / "quality-gate.rc").write_text("1\n", encoding="utf-8")
+ (build_dir / "quality-gate-summary.json").write_text(
+ json.dumps({"workspace_line_coverage_percent": 0.0, "source_lines_over_500": 0, "results": []}),
+ encoding="utf-8",
+ )
+
+ posted = {}
+ monkeypatch.setenv("JUNIT_GLOB", str(build_dir / "*.xml"))
+ monkeypatch.setenv("QUALITY_GATE_EXIT_CODE_PATH", str(build_dir / "quality-gate.rc"))
+ monkeypatch.setenv("QUALITY_GATE_SUMMARY_PATH", str(build_dir / "quality-gate-summary.json"))
+ monkeypatch.setattr(publish_test_metrics, "_fetch_existing_counter", lambda *args, **kwargs: 0)
+ monkeypatch.setattr(publish_test_metrics, "_post_text", lambda url, payload: posted.update({"url": url, "payload": payload}))
+ monkeypatch.setattr(publish_test_metrics, "_infer_workspace_coverage_percent", lambda *args, **kwargs: 96.4)
+ monkeypatch.setattr(publish_test_metrics, "_infer_source_lines_over_500", lambda *args, **kwargs: 7)
+
+ rc = publish_test_metrics.main()
+
+ summary = json.loads(capsys.readouterr().out)
+ assert rc == 0
+ assert summary["status"] == "failed"
+ assert summary["workspace_line_coverage_percent"] == 96.4
+ assert summary["source_lines_over_500"] == 7
+ assert 'platform_quality_gate_workspace_line_coverage_percent{suite="titan_iac"} 96.400' in posted["payload"]
+ assert 'platform_quality_gate_source_lines_over_500_total{suite="titan_iac"} 7' in posted["payload"]