quality(titan-iac): split metrics publisher and harden gate lint

jenkins 2026-04-20 15:20:56 -03:00
parent d342053196
commit 9a86c350dd
5 changed files with 776 additions and 104 deletions
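The payload contract changes in two ways: per-check series are now emitted for a fixed set of seven canonical checks (a missing check surfaces as "not_applicable" instead of silently disappearing), and the per-test-case series always exists, with a sentinel entry when a run produced no cases. A representative exposition fragment (values illustrative):

    # TYPE titan_iac_quality_gate_checks_total gauge
    titan_iac_quality_gate_checks_total{suite="titan-iac",check="tests",result="ok"} 1
    titan_iac_quality_gate_checks_total{suite="titan-iac",check="supply_chain",result="not_applicable"} 1
    # TYPE platform_quality_gate_test_case_result gauge
    platform_quality_gate_test_case_result{suite="titan-iac",test="__no_test_cases__",status="skipped"} 1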

View File: ci/scripts/publish_test_metrics.py

@ -10,6 +10,18 @@ import urllib.error
import urllib.request
import xml.etree.ElementTree as ET
from ci.scripts import publish_test_metrics_quality as _quality_helpers
CANONICAL_CHECKS = _quality_helpers.CANONICAL_CHECKS
_build_check_statuses = _quality_helpers._build_check_statuses
_combine_statuses = _quality_helpers._combine_statuses
_infer_sonarqube_status = _quality_helpers._infer_sonarqube_status
_infer_source_lines_over_500 = _quality_helpers._infer_source_lines_over_500
_infer_supply_chain_status = _quality_helpers._infer_supply_chain_status
_infer_workspace_coverage_percent = _quality_helpers._infer_workspace_coverage_percent
_load_optional_json = _quality_helpers._load_optional_json
_normalize_result_status = _quality_helpers._normalize_result_status
def _escape_label(value: str) -> str:
"""Escape a Prometheus label value without changing its content."""
@ -78,13 +90,13 @@ def _collect_junit_totals(pattern: str) -> dict[str, int]:
     return totals


-def _load_junit_cases(path: str) -> list[tuple[str, str]]:
-    """Parse individual JUnit test case outcomes for flakiness panels."""
-    if not os.path.exists(path):
-        return []
-    tree = ET.parse(path)
-    root = tree.getroot()
+def _collect_junit_cases(pattern: str) -> list[tuple[str, str]]:
+    """Collect individual JUnit test-case statuses for flaky-test trend panels."""
+    cases: list[tuple[str, str]] = []
+    for path in sorted(glob(pattern)):
+        if not os.path.exists(path):
+            continue
+        root = ET.parse(path).getroot()
         suites: list[ET.Element]
         if root.tag == "testsuite":
             suites = [root]
@ -92,31 +104,19 @@ def _load_junit_cases(path: str) -> list[tuple[str, str]]:
             suites = [elem for elem in root if elem.tag == "testsuite"]
         else:
             suites = []
-    cases: list[tuple[str, str]] = []
-    for suite in suites:
-        for case in suite.findall("testcase"):
-            name = (case.attrib.get("name") or "").strip()
-            classname = (case.attrib.get("classname") or "").strip()
-            if not name:
-                continue
-            test_id = f"{classname}::{name}" if classname else name
-            status = "passed"
-            if case.find("failure") is not None:
-                status = "failed"
-            elif case.find("error") is not None:
-                status = "error"
-            elif case.find("skipped") is not None:
-                status = "skipped"
-            cases.append((test_id, status))
-    return cases
-
-
-def _collect_junit_cases(pattern: str) -> list[tuple[str, str]]:
-    """Collect test-case statuses across all matching JUnit XML files."""
-    cases: list[tuple[str, str]] = []
-    for path in sorted(glob(pattern)):
-        cases.extend(_load_junit_cases(path))
-    return cases
+        for suite in suites:
+            for test_case in suite.findall("testcase"):
+                case_name = test_case.attrib.get("name", "").strip()
+                class_name = test_case.attrib.get("classname", "").strip()
+                if not case_name:
+                    continue
+                full_name = f"{class_name}.{case_name}" if class_name else case_name
+                status = "passed"
+                if test_case.find("failure") is not None or test_case.find("error") is not None:
+                    status = "failed"
+                elif test_case.find("skipped") is not None:
+                    status = "skipped"
+                cases.append((full_name, status))
+    return cases
@ -186,6 +186,7 @@ def _build_payload(
summary: dict | None = None,
workspace_line_coverage_percent: float = 0.0,
source_lines_over_500: int = 0,
check_statuses: dict[str, str] | None = None,
) -> str:
"""Build the Pushgateway payload for the current suite run."""
passed = max(tests["tests"] - tests["failures"] - tests["errors"] - tests["skipped"], 0)
@ -214,22 +215,23 @@ def _build_payload(
         f'platform_quality_gate_workspace_line_coverage_percent{{suite="{suite}"}} {workspace_line_coverage_percent:.3f}',
         "# TYPE platform_quality_gate_source_lines_over_500_total gauge",
         f'platform_quality_gate_source_lines_over_500_total{{suite="{suite}"}} {source_lines_over_500}',
-        "# TYPE platform_quality_gate_test_case_result gauge",
     ]
-    lines.extend(
-        f'platform_quality_gate_test_case_result{{suite="{suite}",test="{_escape_label(test_name)}",status="{_escape_label(test_status)}"}} 1'
-        for test_name, test_status in test_cases
-    )
-    results = summary.get("results", []) if isinstance(summary, dict) else []
-    if results:
+    if check_statuses:
         lines.append("# TYPE titan_iac_quality_gate_checks_total gauge")
-        for result in results:
-            check_name = result.get("name")
-            check_status = result.get("status")
-            if not check_name or not check_status:
-                continue
+        for check_name in CANONICAL_CHECKS:
+            check_status = check_statuses.get(check_name, "not_applicable")
             lines.append(
-                f'titan_iac_quality_gate_checks_total{{suite="{suite}",check="{_escape_label(str(check_name))}",result="{_escape_label(str(check_status))}"}} 1'
+                f'titan_iac_quality_gate_checks_total{{suite="{suite}",check="{_escape_label(check_name)}",result="{_escape_label(check_status)}"}} 1'
             )
+    lines.append("# TYPE platform_quality_gate_test_case_result gauge")
+    if test_cases:
+        for test_name, test_status in test_cases:
+            lines.append(
+                f'platform_quality_gate_test_case_result{{suite="{suite}",test="{_escape_label(test_name)}",status="{_escape_label(test_status)}"}} 1'
+            )
+    else:
+        lines.append(
+            f'platform_quality_gate_test_case_result{{suite="{suite}",test="__no_test_cases__",status="skipped"}} 1'
+        )
     return "\n".join(lines) + "\n"
@ -251,7 +253,23 @@ def main() -> int:
status = "ok" if exit_code == 0 else "failed"
summary = _load_summary(summary_path)
workspace_line_coverage_percent = _summary_float(summary, "workspace_line_coverage_percent")
if workspace_line_coverage_percent <= 0:
workspace_line_coverage_percent = _infer_workspace_coverage_percent(summary, "build/coverage-unit.xml")
source_lines_over_500 = _summary_int(summary, "source_lines_over_500")
if source_lines_over_500 <= 0:
source_lines_over_500 = _infer_source_lines_over_500(summary)
sonarqube_report = _load_optional_json(os.getenv("QUALITY_GATE_SONARQUBE_REPORT", "build/sonarqube-quality-gate.json"))
supply_chain_report = _load_optional_json(os.getenv("QUALITY_GATE_IRONBANK_REPORT", "build/ironbank-compliance.json"))
supply_chain_required = os.getenv("QUALITY_GATE_IRONBANK_REQUIRED", "0").strip().lower() in {"1", "true", "yes", "on"}
check_statuses = _build_check_statuses(
summary=summary,
tests=tests,
workspace_line_coverage_percent=workspace_line_coverage_percent,
source_lines_over_500=source_lines_over_500,
sonarqube_report=sonarqube_report,
supply_chain_report=supply_chain_report,
supply_chain_required=supply_chain_required,
)
ok_count = int(
_fetch_existing_counter(
@ -284,6 +302,7 @@ def main() -> int:
summary=summary,
workspace_line_coverage_percent=workspace_line_coverage_percent,
source_lines_over_500=source_lines_over_500,
check_statuses=check_statuses,
)
push_url = f"{pushgateway_url.rstrip('/')}/metrics/job/{job_name}/suite/{suite}"
_post_text(push_url, payload)
@ -297,7 +316,7 @@ def main() -> int:
"tests_skipped": tests["skipped"],
"ok_count": ok_count,
"failed_count": failed_count,
"checks_recorded": len(summary.get("results", [])) if isinstance(summary, dict) else 0,
"checks_recorded": len(check_statuses),
"workspace_line_coverage_percent": workspace_line_coverage_percent,
"source_lines_over_500": source_lines_over_500,
}
@ -305,5 +324,5 @@ def main() -> int:
return 0
-if __name__ == "__main__":
+if __name__ == "__main__":  # pragma: no cover
raise SystemExit(main())
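A minimal sketch of the reworked _build_payload contract, using the keyword arguments the unit tests below exercise (values illustrative):

    # Checks absent from check_statuses are emitted as "not_applicable",
    # so every payload carries all seven canonical check series.
    payload = _build_payload(
        suite="titan-iac",
        status="ok",
        tests={"tests": 3, "failures": 0, "errors": 0, "skipped": 1},
        test_cases=[("pkg.mod.test_ok", "passed")],
        ok_count=1,
        failed_count=0,
        branch="main",
        build_number="7",
        workspace_line_coverage_percent=96.5,
        source_lines_over_500=0,
        check_statuses={"tests": "ok", "coverage": "ok"},
    )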

View File: ci/scripts/publish_test_metrics_quality.py

@ -0,0 +1,200 @@
#!/usr/bin/env python3
"""Quality/status helpers for publish_test_metrics."""
from __future__ import annotations
import json
from pathlib import Path
import xml.etree.ElementTree as ET
SUCCESS_STATUSES = {"ok", "pass", "passed", "success", "compliant"}
NOT_APPLICABLE_STATUSES = {"not_applicable", "n/a", "na", "none", "skipped"}
FAILED_STATUSES = {"failed", "fail", "error", "errors", "warn", "warning", "red"}
CANONICAL_CHECKS = [
"tests",
"coverage",
"loc",
"docs_naming",
"gate_glue",
"sonarqube",
"supply_chain",
]
def _infer_workspace_coverage_percent(summary: dict, default_xml: str) -> float:
"""Infer workspace line coverage from quality summary coverage XML metadata."""
results = summary.get("results", []) if isinstance(summary, dict) else []
coverage_xml = default_xml
for result in results:
if not isinstance(result, dict):
continue
if str(result.get("name") or "").strip().lower() != "coverage":
continue
candidate = str(result.get("coverage_xml") or "").strip()
if candidate:
coverage_xml = candidate
break
xml_path = Path(coverage_xml)
if not xml_path.exists():
return 0.0
try:
root = ET.parse(xml_path).getroot()
line_rate = root.attrib.get("line-rate")
if line_rate is None:
return 0.0
return float(line_rate) * 100.0
except (ET.ParseError, OSError, ValueError):
return 0.0
def _infer_source_lines_over_500(summary: dict) -> int:
"""Infer over-limit source file count from hygiene issue payloads."""
results = summary.get("results", []) if isinstance(summary, dict) else []
for result in results:
if not isinstance(result, dict):
continue
if str(result.get("name") or "").strip().lower() not in {"hygiene", "loc", "smell"}:
continue
issues = result.get("issues")
if not isinstance(issues, list):
continue
return sum(1 for item in issues if isinstance(item, str) and item.startswith("file exceeds"))
return 0
def _normalize_result_status(value: str | None, default: str = "failed") -> str:
"""Map arbitrary check status text into canonical check result buckets."""
if not value:
return default
normalized = value.strip().lower()
if normalized in SUCCESS_STATUSES:
return "ok"
if normalized in NOT_APPLICABLE_STATUSES:
return "not_applicable"
if normalized in FAILED_STATUSES:
return "failed"
return default
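# Illustrative mappings (consistent with the unit tests further below):
#   _normalize_result_status("PASS")       -> "ok"              (success synonyms, case-insensitive)
#   _normalize_result_status("n/a")        -> "not_applicable"
#   _normalize_result_status("warning")    -> "failed"
#   _normalize_result_status("unexpected") -> the supplied default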
def _load_optional_json(path: str | None) -> dict:
"""Load an optional JSON report file, returning an empty object when absent."""
if not path:
return {}
candidate = Path(path)
if not candidate.exists():
return {}
try:
return json.loads(candidate.read_text(encoding="utf-8"))
except json.JSONDecodeError:
return {}
def _combine_statuses(statuses: list[str]) -> str:
"""Roll up many check statuses into one canonical result."""
if not statuses:
return "not_applicable"
if any(status == "failed" for status in statuses):
return "failed"
if all(status == "not_applicable" for status in statuses):
return "not_applicable"
if all(status in {"ok", "not_applicable"} for status in statuses):
return "ok"
return "failed"
def _infer_sonarqube_status(report: dict) -> str:
"""Infer canonical SonarQube check status from its JSON report payload."""
if not report:
return "not_applicable"
status = (
report.get("projectStatus", {}).get("status")
or report.get("qualityGate", {}).get("status")
or report.get("status")
)
return _normalize_result_status(str(status) if status is not None else None, default="failed")
def _infer_supply_chain_status(report: dict, required: bool) -> str:
"""Infer canonical supply-chain status from IronBank/artifact report payload."""
if not report:
return "failed" if required else "not_applicable"
compliant = report.get("compliant")
if isinstance(compliant, bool):
return "ok" if compliant else "failed"
status = report.get("status")
if status is None:
return "failed" if required else "not_applicable"
normalized = _normalize_result_status(str(status), default="failed")
if normalized == "not_applicable" and required:
return "failed"
return normalized
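# Decision table, by example:
#   report={},                 required=False -> "not_applicable"  (no evidence, check optional)
#   report={},                 required=True  -> "failed"          (evidence is mandatory)
#   report={"compliant": True}                -> "ok"              (explicit flag wins over status text)
#   report={"compliant": False}               -> "failed"
#   report={"status": "n/a"},  required=True  -> "failed"          (n/a is unacceptable when required)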
def _build_check_statuses(
summary: dict | None,
tests: dict[str, int],
workspace_line_coverage_percent: float,
source_lines_over_500: int,
sonarqube_report: dict,
supply_chain_report: dict,
supply_chain_required: bool,
) -> dict[str, str]:
"""Generate the canonical quality-check status map for dashboarding."""
raw_results = summary.get("results", []) if isinstance(summary, dict) else []
status_by_name: dict[str, str] = {}
for result in raw_results:
if not isinstance(result, dict):
continue
check_name = str(result.get("name") or "").strip().lower()
if not check_name:
continue
status_by_name[check_name] = _normalize_result_status(result.get("status"), default="failed")
tests_status = status_by_name.get("tests")
if not tests_status:
candidate_keys = ["unit", "integration", "e2e", "pytest", "test", "tests"]
candidates = [status_by_name[key] for key in candidate_keys if key in status_by_name]
if candidates:
tests_status = _combine_statuses(candidates)
elif tests["tests"] > 0:
tests_status = "ok" if (tests["failures"] + tests["errors"]) == 0 else "failed"
else:
tests_status = "not_applicable"
coverage_status = status_by_name.get("coverage")
if not coverage_status:
if workspace_line_coverage_percent > 0:
coverage_status = "ok" if workspace_line_coverage_percent >= 95.0 else "failed"
else:
coverage_status = "not_applicable"
loc_status = status_by_name.get("loc")
if not loc_status:
loc_status = "ok" if source_lines_over_500 == 0 else "failed"
docs_naming_status = status_by_name.get("docs_naming")
if not docs_naming_status:
candidates = [status_by_name[key] for key in ["docs", "hygiene", "smell", "lint", "naming"] if key in status_by_name]
docs_naming_status = _combine_statuses(candidates) if candidates else "not_applicable"
gate_glue_status = status_by_name.get("gate_glue")
if not gate_glue_status:
candidates = [status_by_name[key] for key in ["gate_glue", "glue", "gate"] if key in status_by_name]
gate_glue_status = _combine_statuses(candidates) if candidates else "not_applicable"
sonarqube_status = status_by_name.get("sonarqube") or _infer_sonarqube_status(sonarqube_report)
supply_chain_status = status_by_name.get("supply_chain") or _infer_supply_chain_status(
supply_chain_report,
required=supply_chain_required,
)
return {
"tests": tests_status,
"coverage": coverage_status,
"loc": loc_status,
"docs_naming": docs_naming_status,
"gate_glue": gate_glue_status,
"sonarqube": sonarqube_status,
"supply_chain": supply_chain_status,
}
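Composed, the helpers yield a deterministic seven-key status map even from a sparse summary. A usage sketch (inputs illustrative; the real call site is publish_test_metrics.main):

    statuses = _build_check_statuses(
        summary={"results": [{"name": "unit", "status": "ok"}]},
        tests={"tests": 12, "failures": 0, "errors": 0, "skipped": 1},
        workspace_line_coverage_percent=96.2,
        source_lines_over_500=0,
        sonarqube_report=_load_optional_json("build/sonarqube-quality-gate.json"),
        supply_chain_report={},
        supply_chain_required=False,
    )
    # tests -> "ok" (via the legacy "unit" name), coverage -> "ok" (>= 95.0),
    # loc -> "ok", docs_naming and gate_glue -> "not_applicable",
    # supply_chain -> "not_applicable" (missing report, not required);
    # sonarqube depends on the report file's contents.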

View File

@ -3,10 +3,15 @@
from __future__ import annotations
import argparse
import base64
import json
import os
import subprocess
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any
@ -26,6 +31,189 @@ RUFF_SELECT = ["F", "B", "SIM", "C4", "UP"]
RUFF_IGNORE = ["B017", "UP015", "UP035"]
def _env_flag(name: str, default: bool) -> bool:
"""Parse a boolean-like environment variable."""
raw = os.getenv(name)
if raw is None:
return default
return raw.strip().lower() in {"1", "true", "yes", "on"}
def _load_json_report(path: Path) -> tuple[dict[str, Any] | None, str | None]:
"""Return parsed JSON report contents or a descriptive error."""
if not path.exists():
return None, f"report missing: {path}"
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except json.JSONDecodeError as exc:
return None, f"report invalid JSON: {path} ({exc})"
if not isinstance(payload, dict):
return None, f"report payload must be an object: {path}"
return payload, None
def _sonarqube_gate_status_from_report(payload: dict[str, Any]) -> str:
"""Extract a SonarQube quality-gate status from a report payload."""
project_status = payload.get("projectStatus")
if isinstance(project_status, dict):
status = project_status.get("status")
if isinstance(status, str):
return status
status = payload.get("status")
if isinstance(status, str):
return status
return ""
def _fetch_sonarqube_gate_status(
host_url: str,
project_key: str,
token: str,
timeout_seconds: float,
) -> tuple[str, str | None]:
"""Query SonarQube for the project's current quality-gate status."""
query = urllib.parse.urlencode({"projectKey": project_key})
request = urllib.request.Request(
f"{host_url.rstrip('/')}/api/qualitygates/project_status?{query}",
method="GET",
)
if token:
encoded = base64.b64encode(f"{token}:".encode()).decode()
request.add_header("Authorization", f"Basic {encoded}")
try:
with urllib.request.urlopen(request, timeout=timeout_seconds) as response:
payload = json.loads(response.read().decode("utf-8"))
except (urllib.error.HTTPError, urllib.error.URLError, TimeoutError, json.JSONDecodeError) as exc:
return "", f"sonarqube query failed: {exc}"
if not isinstance(payload, dict):
return "", "sonarqube query returned non-object payload"
status = _sonarqube_gate_status_from_report(payload)
if status:
return status, None
return "", "sonarqube response missing projectStatus.status"
def _run_sonarqube_check(build_dir: Path) -> dict[str, Any]:
"""Enforce SonarQube quality gate using report or API evidence."""
enforce = _env_flag("QUALITY_GATE_SONARQUBE_ENFORCE", default=True)
report_rel = os.getenv(
"QUALITY_GATE_SONARQUBE_REPORT",
str(build_dir / "sonarqube-quality-gate.json"),
)
report_path = Path(report_rel)
if not report_path.is_absolute():
report_path = Path.cwd() / report_path
host_url = os.getenv("SONARQUBE_HOST_URL", "").strip()
project_key = os.getenv("SONARQUBE_PROJECT_KEY", "").strip()
token = os.getenv("SONARQUBE_TOKEN", "").strip()
timeout_seconds = float(os.getenv("QUALITY_GATE_SONARQUBE_TIMEOUT_SECONDS", "12"))
gate_status = ""
source = ""
issues: list[str] = []
report_payload, report_error = _load_json_report(report_path)
if report_payload is not None:
gate_status = _sonarqube_gate_status_from_report(report_payload).strip()
source = "report"
if not gate_status:
issues.append("sonarqube report missing quality gate status")
elif report_error:
if host_url and project_key:
gate_status, query_error = _fetch_sonarqube_gate_status(host_url, project_key, token, timeout_seconds)
source = "api"
if query_error:
issues.append(query_error)
else:
issues.append(report_error)
if not source and host_url and project_key:
gate_status, query_error = _fetch_sonarqube_gate_status(host_url, project_key, token, timeout_seconds)
source = "api"
if query_error:
issues.append(query_error)
normalized = gate_status.upper()
passed = normalized in {"OK", "PASS", "PASSED"}
if enforce and not passed:
if gate_status:
issues.append(f"sonarqube gate is {gate_status}, expected OK")
else:
issues.append("sonarqube gate status unavailable")
status = "ok" if (passed or not enforce) and not issues else "failed"
return _result(
"sonarqube",
"SonarQube quality gate must pass for the current project.",
status,
enforce=enforce,
source=source or "none",
gate_status=gate_status or "unknown",
report_path=str(report_path),
issues=issues,
)
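# Evidence precedence for the gate decision above:
#   1. the on-disk report (QUALITY_GATE_SONARQUBE_REPORT), when it parses;
#   2. a live API query, when SONARQUBE_HOST_URL and SONARQUBE_PROJECT_KEY are set;
#   3. otherwise the check fails closed while QUALITY_GATE_SONARQUBE_ENFORCE is truthy.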
def _ironbank_status_from_report(payload: dict[str, Any]) -> tuple[str, bool | None]:
"""Extract a compliance status and explicit compliance flag from report payload."""
for key in ("status", "result", "compliance", "compliance_status"):
value = payload.get(key)
if isinstance(value, str) and value.strip():
return value.strip(), None
compliant = payload.get("compliant")
if isinstance(compliant, bool):
return "compliant" if compliant else "noncompliant", compliant
return "", None
def _run_ironbank_check(build_dir: Path) -> dict[str, Any]:
"""Enforce Iron Bank image-hardening compliance from build evidence."""
enforce = _env_flag("QUALITY_GATE_IRONBANK_ENFORCE", default=True)
required = _env_flag("QUALITY_GATE_IRONBANK_REQUIRED", default=True)
report_rel = os.getenv(
"QUALITY_GATE_IRONBANK_REPORT",
str(build_dir / "ironbank-compliance.json"),
)
report_path = Path(report_rel)
if not report_path.is_absolute():
report_path = Path.cwd() / report_path
issues: list[str] = []
status_value = ""
compliant: bool | None = None
source = "none"
report_payload, report_error = _load_json_report(report_path)
if report_payload is not None:
status_value, compliant = _ironbank_status_from_report(report_payload)
source = "report"
elif required:
issues.append(report_error or f"report missing: {report_path}")
normalized = status_value.strip().lower()
passed_status = normalized in {"ok", "pass", "passed", "compliant", "true"}
passed = compliant is True or passed_status
if enforce and required and not passed:
if status_value:
issues.append(f"ironbank compliance is {status_value}, expected compliant")
elif not issues:
issues.append("ironbank compliance status unavailable")
status = "ok" if (passed or not enforce or not required) and not issues else "failed"
return _result(
"ironbank",
"Iron Bank image-hardening compliance must pass for build artifacts.",
status,
enforce=enforce,
required=required,
source=source,
compliance=status_value or "unknown",
report_path=str(report_path),
issues=issues,
)
def _status_from_issues(issues: list[str]) -> str:
"""Map an issue list to the gate status string."""
return "ok" if not issues else "failed"
@ -150,6 +338,12 @@ def run_profile(
)
)
continue
if check_name == "sonarqube":
results.append(_run_sonarqube_check(build_dir))
continue
if check_name == "ironbank":
results.append(_run_ironbank_check(build_dir))
continue
suite = contract.get("pytest_suites", {}).get(check_name)
if suite is None:
raise SystemExit(f"profile {profile_name} references unknown check: {check_name}")

View File

@ -1,4 +1,4 @@
"""Unit tests for the Pushgateway publisher glue code."""
"""Unit tests for core test-metrics parsing and quality signal helpers."""
from __future__ import annotations
@ -40,27 +40,6 @@ def test_collect_junit_totals_sums_multiple_files(tmp_path: Path):
assert totals == {"tests": 5, "failures": 1, "errors": 1, "skipped": 1}
-def test_collect_junit_cases_tracks_individual_statuses(tmp_path: Path):
-    junit = tmp_path / "junit.xml"
-    junit.write_text(
-        (
-            "<testsuite>"
-            '<testcase classname="pkg.mod" name="test_ok" />'
-            '<testcase classname="pkg.mod" name="test_fail"><failure /></testcase>'
-            '<testcase classname="pkg.mod" name="test_error"><error /></testcase>'
-            '<testcase classname="pkg.mod" name="test_skip"><skipped /></testcase>'
-            "</testsuite>"
-        ),
-        encoding="utf-8",
-    )
-    cases = publish_test_metrics._collect_junit_cases(str(tmp_path / "junit*.xml"))
-    assert ("pkg.mod::test_ok", "passed") in cases
-    assert ("pkg.mod::test_fail", "failed") in cases
-    assert ("pkg.mod::test_error", "error") in cases
-    assert ("pkg.mod::test_skip", "skipped") in cases
def test_parse_junit_handles_testsuites_and_invalid_counts(tmp_path: Path):
junit_path = tmp_path / "suite.xml"
junit_path.write_text(
@ -81,6 +60,20 @@ def test_parse_junit_handles_testsuites_and_invalid_counts(tmp_path: Path):
}
def test_parse_junit_handles_unknown_root(tmp_path: Path):
junit_path = tmp_path / "suite.xml"
junit_path.write_text("<root><item /></root>", encoding="utf-8")
assert publish_test_metrics._parse_junit(str(junit_path)) == {
"tests": 0,
"failures": 0,
"errors": 0,
"skipped": 0,
}
def test_read_exit_code_and_summary_fallbacks(tmp_path: Path):
rc_path = tmp_path / "rc.txt"
rc_path.write_text("0\n", encoding="utf-8")
@ -93,6 +86,118 @@ def test_read_exit_code_and_summary_fallbacks(tmp_path: Path):
assert publish_test_metrics._load_summary(str(tmp_path / "missing.json")) == {}
def test_summary_extractors_handle_invalid_shapes_and_values():
assert publish_test_metrics._summary_float({}, "workspace_line_coverage_percent") == 0.0
assert publish_test_metrics._summary_float({"workspace_line_coverage_percent": "bad"}, "workspace_line_coverage_percent") == 0.0
assert publish_test_metrics._summary_float({"workspace_line_coverage_percent": 95}, "workspace_line_coverage_percent") == 95.0
assert publish_test_metrics._summary_float({"workspace_line_coverage_percent": 97.5}, "workspace_line_coverage_percent") == 97.5
assert publish_test_metrics._summary_int({}, "source_lines_over_500") == 0
assert publish_test_metrics._summary_int({"source_lines_over_500": "bad"}, "source_lines_over_500") == 0
assert publish_test_metrics._summary_int({"source_lines_over_500": 2}, "source_lines_over_500") == 2
assert publish_test_metrics._summary_int({"source_lines_over_500": 2.9}, "source_lines_over_500") == 2
def test_build_check_statuses_maps_legacy_names_and_reports():
check_statuses = publish_test_metrics._build_check_statuses(
summary={
"results": [
{"name": "unit", "status": "ok"},
{"name": "coverage", "status": "failed"},
{"name": "hygiene", "status": "ok"},
{"name": "smell", "status": "ok"},
{"name": "docs", "status": "ok"},
{"name": "glue", "status": "failed"},
]
},
tests={"tests": 4, "failures": 0, "errors": 0, "skipped": 0},
workspace_line_coverage_percent=97.0,
source_lines_over_500=0,
sonarqube_report={"projectStatus": {"status": "OK"}},
supply_chain_report={"status": "compliant"},
supply_chain_required=True,
)
assert check_statuses == {
"tests": "ok",
"coverage": "failed",
"loc": "ok",
"docs_naming": "ok",
"gate_glue": "failed",
"sonarqube": "ok",
"supply_chain": "ok",
}
def test_build_check_statuses_handles_missing_reports():
check_statuses = publish_test_metrics._build_check_statuses(
summary={"results": []},
tests={"tests": 0, "failures": 0, "errors": 0, "skipped": 0},
workspace_line_coverage_percent=0.0,
source_lines_over_500=2,
sonarqube_report={},
supply_chain_report={},
supply_chain_required=False,
)
assert check_statuses["tests"] == "not_applicable"
assert check_statuses["coverage"] == "not_applicable"
assert check_statuses["loc"] == "failed"
assert check_statuses["docs_naming"] == "not_applicable"
assert check_statuses["gate_glue"] == "not_applicable"
assert check_statuses["sonarqube"] == "not_applicable"
assert check_statuses["supply_chain"] == "not_applicable"
def test_status_normalization_and_optional_reports(tmp_path: Path):
report_path = tmp_path / "report.json"
report_path.write_text("{bad json", encoding="utf-8")
assert publish_test_metrics._normalize_result_status(None, default="failed") == "failed"
assert publish_test_metrics._normalize_result_status("n/a") == "not_applicable"
assert publish_test_metrics._normalize_result_status("unexpected", default="ok") == "ok"
assert publish_test_metrics._load_optional_json(None) == {}
assert publish_test_metrics._load_optional_json(str(tmp_path / "missing.json")) == {}
assert publish_test_metrics._load_optional_json(str(report_path)) == {}
assert publish_test_metrics._combine_statuses([]) == "not_applicable"
assert publish_test_metrics._combine_statuses(["not_applicable", "not_applicable"]) == "not_applicable"
assert publish_test_metrics._combine_statuses(["unknown"]) == "failed"
assert publish_test_metrics._infer_supply_chain_status({"compliant": False}, required=True) == "failed"
assert publish_test_metrics._infer_supply_chain_status({"status": None}, required=False) == "not_applicable"
assert publish_test_metrics._infer_supply_chain_status({"status": "not_applicable"}, required=True) == "failed"
def test_build_check_statuses_handles_non_dict_results_and_fallbacks():
check_statuses = publish_test_metrics._build_check_statuses(
summary={
"results": [
None,
{"name": "", "status": "ok"},
{"name": "unit", "status": "warning"},
{"name": "hygiene", "status": "ok"},
{"name": "gate", "status": "ok"},
]
},
tests={"tests": 3, "failures": 1, "errors": 0, "skipped": 0},
workspace_line_coverage_percent=94.0,
source_lines_over_500=0,
sonarqube_report={"status": "ERROR"},
supply_chain_report={"status": "not_applicable"},
supply_chain_required=True,
)
assert check_statuses["tests"] == "failed"
assert check_statuses["coverage"] == "failed"
assert check_statuses["loc"] == "ok"
assert check_statuses["docs_naming"] == "ok"
assert check_statuses["gate_glue"] == "ok"
assert check_statuses["sonarqube"] == "failed"
assert check_statuses["supply_chain"] == "failed"
def test_read_text_post_text_and_fetch_existing_counter(monkeypatch):
class _FakeResponse:
def __init__(self, payload: str, status: int = 200):
@ -186,36 +291,64 @@ def test_post_text_raises_and_counter_handles_bad_metric_lines(monkeypatch):
== 0.0
)
monkeypatch.setattr(
publish_test_metrics,
"_read_text",
lambda url: "\n".join(
[
'different_metric{job="platform-quality-ci",suite="titan-iac",status="ok"} 8',
'platform_quality_gate_runs_total{job="platform-quality-ci",suite="other",status="ok"} 8',
'platform_quality_gate_runs_total{job="platform-quality-ci",suite="titan-iac",status="ok"} 9',
]
),
)
assert (
publish_test_metrics._fetch_existing_counter(
"http://push.invalid",
"platform_quality_gate_runs_total",
{"job": "platform-quality-ci", "suite": "titan-iac", "status": "ok"},
)
== 9.0
)
assert (
publish_test_metrics._fetch_existing_counter(
"http://push.invalid",
"platform_quality_gate_runs_total",
{"job": "platform-quality-ci", "suite": "missing-suite", "status": "ok"},
)
== 0.0
)
-def test_build_payload_includes_summary_metrics():
+def test_build_payload_includes_canonical_checks():
     payload = publish_test_metrics._build_payload(
         suite="titan-iac",
         status="ok",
         tests={"tests": 4, "failures": 1, "errors": 0, "skipped": 1},
-        test_cases=[("pkg.mod::test_ok", "passed"), ("pkg.mod::test_fail", "failed")],
+        test_cases=[],
         ok_count=7,
         failed_count=2,
         branch="main",
         build_number="42",
-        summary={
-            "results": [
-                {"name": "docs", "status": "ok"},
-                {"name": "unit", "status": "failed"},
-            ]
-        },
-        workspace_line_coverage_percent=97.125,
-        source_lines_over_500=3,
+        workspace_line_coverage_percent=95.0,
+        source_lines_over_500=0,
+        check_statuses={
+            "tests": "failed",
+            "coverage": "ok",
+            "loc": "ok",
+            "docs_naming": "ok",
+            "gate_glue": "ok",
+            "sonarqube": "failed",
+            "supply_chain": "failed",
+        },
     )
     assert 'platform_quality_gate_runs_total{suite="titan-iac",status="ok"} 7' in payload
-    assert 'titan_iac_quality_gate_checks_total{suite="titan-iac",check="docs",result="ok"} 1' in payload
-    assert 'titan_iac_quality_gate_checks_total{suite="titan-iac",check="unit",result="failed"} 1' in payload
-    assert 'platform_quality_gate_workspace_line_coverage_percent{suite="titan-iac"} 97.125' in payload
-    assert 'platform_quality_gate_source_lines_over_500_total{suite="titan-iac"} 3' in payload
-    assert 'platform_quality_gate_test_case_result{suite="titan-iac",test="pkg.mod::test_fail",status="failed"} 1' in payload
+    assert 'titan_iac_quality_gate_checks_total{suite="titan-iac",check="docs_naming",result="ok"} 1' in payload
+    assert 'titan_iac_quality_gate_checks_total{suite="titan-iac",check="tests",result="failed"} 1' in payload
-def test_build_payload_skips_incomplete_results():
+def test_build_payload_omits_checks_block_without_check_statuses():
     payload = publish_test_metrics._build_payload(
         suite="titan-iac",
         status="failed",
@ -225,11 +358,13 @@ def test_build_payload_skips_incomplete_results():
         failed_count=2,
         branch="",
         build_number="",
-        summary={"results": [{"name": "docs"}, {"status": "ok"}]},
         workspace_line_coverage_percent=0.0,
         source_lines_over_500=1,
     )
-    assert "titan_iac_quality_gate_checks_total" in payload
-    assert 'check="docs"' not in payload
+    assert "titan_iac_quality_gate_checks_total" not in payload
def test_main_uses_quality_gate_summary_and_junit_glob(tmp_path: Path, monkeypatch):
@ -245,13 +380,7 @@ def test_main_uses_quality_gate_summary_and_junit_glob(tmp_path: Path, monkeypat
)
(build_dir / "quality-gate.rc").write_text("1\n", encoding="utf-8")
(build_dir / "quality-gate-summary.json").write_text(
json.dumps(
{
"results": [{"name": "docs", "status": "ok"}, {"name": "glue", "status": "failed"}],
"workspace_line_coverage_percent": 96.4321,
"source_lines_over_500": 2,
}
),
json.dumps({"results": [{"name": "docs", "status": "ok"}, {"name": "glue", "status": "failed"}]}),
encoding="utf-8",
)
@ -274,9 +403,7 @@ def test_main_uses_quality_gate_summary_and_junit_glob(tmp_path: Path, monkeypat
assert rc == 0
assert posted["url"].endswith("/metrics/job/platform-quality-ci/suite/titan-iac")
     assert 'titan_iac_quality_gate_tests_total{suite="titan-iac",result="failed"} 1' in posted["payload"]
-    assert 'titan_iac_quality_gate_checks_total{suite="titan-iac",check="glue",result="failed"} 1' in posted["payload"]
-    assert 'platform_quality_gate_workspace_line_coverage_percent{suite="titan-iac"} 96.432' in posted["payload"]
-    assert 'platform_quality_gate_source_lines_over_500_total{suite="titan-iac"} 2' in posted["payload"]
+    assert 'titan_iac_quality_gate_checks_total{suite="titan-iac",check="gate_glue",result="failed"} 1' in posted["payload"]
def test_main_marks_successful_run(tmp_path: Path, monkeypatch, capsys):
@ -299,6 +426,4 @@ def test_main_marks_successful_run(tmp_path: Path, monkeypatch, capsys):
summary = json.loads(capsys.readouterr().out)
assert rc == 0
assert summary["status"] == "ok"
assert summary["checks_recorded"] == 0
assert summary["workspace_line_coverage_percent"] == 0.0
assert summary["source_lines_over_500"] == 0
assert summary["checks_recorded"] == 7

View File

@ -0,0 +1,134 @@
"""Unit tests for test-case path parsing and fallback metric labeling behavior."""
from __future__ import annotations
import json
from pathlib import Path
from ci.scripts import publish_test_metrics
def test_collect_junit_cases_handles_missing_files_and_multiple_root_shapes(tmp_path: Path, monkeypatch):
missing = tmp_path / "missing.xml"
testsuites_path = tmp_path / "suite-cases.xml"
testsuites_path.write_text(
(
"<testsuites>"
'<testsuite tests="3">'
'<testcase classname="alpha" name="ok_case" />'
'<testcase classname="alpha" name="skip_case"><skipped /></testcase>'
'<testcase classname="alpha"><failure /></testcase>'
"</testsuite>"
"</testsuites>"
),
encoding="utf-8",
)
unknown_root_path = tmp_path / "unknown-root.xml"
unknown_root_path.write_text("<root><testcase classname='x' name='ignored' /></root>", encoding="utf-8")
monkeypatch.setattr(
publish_test_metrics,
"glob",
lambda _pattern: [str(missing), str(testsuites_path), str(unknown_root_path)],
)
cases = publish_test_metrics._collect_junit_cases("ignored-glob")
assert ("alpha.ok_case", "passed") in cases
assert ("alpha.skip_case", "skipped") in cases
assert not any(name.endswith("ignored") for name, _ in cases)
def test_build_payload_includes_explicit_test_case_series():
payload = publish_test_metrics._build_payload(
suite="titan-iac",
status="ok",
tests={"tests": 2, "failures": 1, "errors": 0, "skipped": 0},
test_cases=[("alpha::case_one", "failed"), ("beta::case_two", "passed")],
ok_count=3,
failed_count=1,
branch="main",
build_number="5",
workspace_line_coverage_percent=95.0,
source_lines_over_500=0,
check_statuses={"tests": "failed"},
)
assert 'platform_quality_gate_test_case_result{suite="titan-iac",test="alpha::case_one",status="failed"} 1' in payload
assert 'platform_quality_gate_test_case_result{suite="titan-iac",test="beta::case_two",status="passed"} 1' in payload
def test_main_uses_reported_coverage_and_loc_without_fallback(tmp_path: Path, monkeypatch, capsys):
build_dir = tmp_path / "build"
build_dir.mkdir()
(build_dir / "junit.xml").write_text(
'<testsuite tests="1" failures="0" errors="0" skipped="0" />',
encoding="utf-8",
)
(build_dir / "quality-gate.rc").write_text("0\n", encoding="utf-8")
(build_dir / "quality-gate-summary.json").write_text(
json.dumps(
{
"workspace_line_coverage_percent": 99.5,
"source_lines_over_500": 2,
"results": [],
}
),
encoding="utf-8",
)
monkeypatch.setenv("JUNIT_GLOB", str(build_dir / "*.xml"))
monkeypatch.setenv("QUALITY_GATE_EXIT_CODE_PATH", str(build_dir / "quality-gate.rc"))
monkeypatch.setenv("QUALITY_GATE_SUMMARY_PATH", str(build_dir / "quality-gate-summary.json"))
monkeypatch.setattr(publish_test_metrics, "_fetch_existing_counter", lambda *args, **kwargs: 0)
monkeypatch.setattr(publish_test_metrics, "_post_text", lambda *args, **kwargs: None)
monkeypatch.setattr(
publish_test_metrics,
"_infer_workspace_coverage_percent",
lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("should not infer coverage")),
)
monkeypatch.setattr(
publish_test_metrics,
"_infer_source_lines_over_500",
lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("should not infer loc")),
)
rc = publish_test_metrics.main()
summary = json.loads(capsys.readouterr().out)
assert rc == 0
assert summary["workspace_line_coverage_percent"] == 99.5
assert summary["source_lines_over_500"] == 2
def test_main_falls_back_to_inferred_coverage_and_loc(tmp_path: Path, monkeypatch, capsys):
build_dir = tmp_path / "build"
build_dir.mkdir()
(build_dir / "junit.xml").write_text(
'<testsuite tests="1" failures="0" errors="0" skipped="0" />',
encoding="utf-8",
)
(build_dir / "quality-gate.rc").write_text("1\n", encoding="utf-8")
(build_dir / "quality-gate-summary.json").write_text(
json.dumps({"workspace_line_coverage_percent": 0.0, "source_lines_over_500": 0, "results": []}),
encoding="utf-8",
)
posted = {}
monkeypatch.setenv("JUNIT_GLOB", str(build_dir / "*.xml"))
monkeypatch.setenv("QUALITY_GATE_EXIT_CODE_PATH", str(build_dir / "quality-gate.rc"))
monkeypatch.setenv("QUALITY_GATE_SUMMARY_PATH", str(build_dir / "quality-gate-summary.json"))
monkeypatch.setattr(publish_test_metrics, "_fetch_existing_counter", lambda *args, **kwargs: 0)
monkeypatch.setattr(publish_test_metrics, "_post_text", lambda url, payload: posted.update({"url": url, "payload": payload}))
monkeypatch.setattr(publish_test_metrics, "_infer_workspace_coverage_percent", lambda *args, **kwargs: 96.4)
monkeypatch.setattr(publish_test_metrics, "_infer_source_lines_over_500", lambda *args, **kwargs: 7)
rc = publish_test_metrics.main()
summary = json.loads(capsys.readouterr().out)
assert rc == 0
assert summary["status"] == "failed"
assert summary["workspace_line_coverage_percent"] == 96.4
assert summary["source_lines_over_500"] == 7
assert 'platform_quality_gate_workspace_line_coverage_percent{suite="titan-iac"} 96.400' in posted["payload"]
assert 'platform_quality_gate_source_lines_over_500_total{suite="titan-iac"} 7' in posted["payload"]
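End to end, the publisher is driven by environment variables; a minimal local invocation sketch (paths illustrative; variable names match the os.getenv calls above, and the Pushgateway target is whatever the publisher already reads from its environment, unchanged by this commit):

    import os
    from ci.scripts import publish_test_metrics

    os.environ.update({
        "JUNIT_GLOB": "build/*.xml",
        "QUALITY_GATE_EXIT_CODE_PATH": "build/quality-gate.rc",
        "QUALITY_GATE_SUMMARY_PATH": "build/quality-gate-summary.json",
        "QUALITY_GATE_SONARQUBE_REPORT": "build/sonarqube-quality-gate.json",
        "QUALITY_GATE_IRONBANK_REPORT": "build/ironbank-compliance.json",
        "QUALITY_GATE_IRONBANK_REQUIRED": "0",
    })
    raise SystemExit(publish_test_metrics.main())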