quality(titan-iac): split metrics publisher and harden gate lint

This commit is contained in:
jenkins 2026-04-20 15:20:56 -03:00
parent 607d8c21fa
commit 655c26c589
5 changed files with 544 additions and 196 deletions

View File

@ -6,24 +6,21 @@ from __future__ import annotations
import json
import os
from glob import glob
from pathlib import Path
import urllib.error
import urllib.request
import xml.etree.ElementTree as ET
SUCCESS_STATUSES = {"ok", "pass", "passed", "success", "compliant"}
NOT_APPLICABLE_STATUSES = {"not_applicable", "n/a", "na", "none", "skipped"}
FAILED_STATUSES = {"failed", "fail", "error", "errors", "warn", "warning", "red"}
from ci.scripts import publish_test_metrics_quality as _quality_helpers
CANONICAL_CHECKS = [
"tests",
"coverage",
"loc",
"docs_naming",
"gate_glue",
"sonarqube",
"supply_chain",
]
CANONICAL_CHECKS = _quality_helpers.CANONICAL_CHECKS
_build_check_statuses = _quality_helpers._build_check_statuses
_combine_statuses = _quality_helpers._combine_statuses
_infer_sonarqube_status = _quality_helpers._infer_sonarqube_status
_infer_source_lines_over_500 = _quality_helpers._infer_source_lines_over_500
_infer_supply_chain_status = _quality_helpers._infer_supply_chain_status
_infer_workspace_coverage_percent = _quality_helpers._infer_workspace_coverage_percent
_load_optional_json = _quality_helpers._load_optional_json
_normalize_result_status = _quality_helpers._normalize_result_status
def _escape_label(value: str) -> str:
@ -159,189 +156,6 @@ def _summary_int(summary: dict, key: str) -> int:
return 0
def _infer_workspace_coverage_percent(summary: dict, default_xml: str) -> float:
"""Infer workspace line coverage from quality summary coverage XML metadata."""
results = summary.get("results", []) if isinstance(summary, dict) else []
coverage_xml = default_xml
for result in results:
if not isinstance(result, dict):
continue
if str(result.get("name") or "").strip().lower() != "coverage":
continue
candidate = str(result.get("coverage_xml") or "").strip()
if candidate:
coverage_xml = candidate
break
xml_path = Path(coverage_xml)
if not xml_path.exists():
return 0.0
try:
root = ET.parse(xml_path).getroot()
line_rate = root.attrib.get("line-rate")
if line_rate is None:
return 0.0
return float(line_rate) * 100.0
except (ET.ParseError, OSError, ValueError):
return 0.0
def _infer_source_lines_over_500(summary: dict) -> int:
"""Infer over-limit source file count from hygiene issue payloads."""
results = summary.get("results", []) if isinstance(summary, dict) else []
for result in results:
if not isinstance(result, dict):
continue
if str(result.get("name") or "").strip().lower() not in {"hygiene", "loc", "smell"}:
continue
issues = result.get("issues")
if not isinstance(issues, list):
continue
return sum(1 for item in issues if isinstance(item, str) and item.startswith("file exceeds"))
return 0
def _normalize_result_status(value: str | None, default: str = "failed") -> str:
"""Map arbitrary check status text into canonical check result buckets."""
if not value:
return default
normalized = value.strip().lower()
if normalized in SUCCESS_STATUSES:
return "ok"
if normalized in NOT_APPLICABLE_STATUSES:
return "not_applicable"
if normalized in FAILED_STATUSES:
return "failed"
return default
def _load_optional_json(path: str | None) -> dict:
"""Load an optional JSON report file, returning an empty object when absent."""
if not path:
return {}
candidate = Path(path)
if not candidate.exists():
return {}
try:
return json.loads(candidate.read_text(encoding="utf-8"))
except json.JSONDecodeError:
return {}
def _combine_statuses(statuses: list[str]) -> str:
"""Roll up many check statuses into one canonical result."""
if not statuses:
return "not_applicable"
if any(status == "failed" for status in statuses):
return "failed"
if all(status == "not_applicable" for status in statuses):
return "not_applicable"
if all(status in {"ok", "not_applicable"} for status in statuses):
return "ok"
return "failed"
def _infer_sonarqube_status(report: dict) -> str:
"""Infer canonical SonarQube check status from its JSON report payload."""
if not report:
return "not_applicable"
status = (
report.get("projectStatus", {}).get("status")
or report.get("qualityGate", {}).get("status")
or report.get("status")
)
return _normalize_result_status(str(status) if status is not None else None, default="failed")
def _infer_supply_chain_status(report: dict, required: bool) -> str:
"""Infer canonical supply-chain status from IronBank/artifact report payload."""
if not report:
return "failed" if required else "not_applicable"
compliant = report.get("compliant")
if isinstance(compliant, bool):
return "ok" if compliant else "failed"
status = report.get("status")
if status is None:
return "failed" if required else "not_applicable"
normalized = _normalize_result_status(str(status), default="failed")
if normalized == "not_applicable" and required:
return "failed"
return normalized
def _build_check_statuses(
summary: dict | None,
tests: dict[str, int],
workspace_line_coverage_percent: float,
source_lines_over_500: int,
sonarqube_report: dict,
supply_chain_report: dict,
supply_chain_required: bool,
) -> dict[str, str]:
"""Generate the canonical quality-check status map for dashboarding."""
raw_results = summary.get("results", []) if isinstance(summary, dict) else []
status_by_name: dict[str, str] = {}
for result in raw_results:
if not isinstance(result, dict):
continue
check_name = str(result.get("name") or "").strip().lower()
if not check_name:
continue
status_by_name[check_name] = _normalize_result_status(result.get("status"), default="failed")
# tests
tests_status = status_by_name.get("tests")
if not tests_status:
candidate_keys = ["unit", "integration", "e2e", "pytest", "test", "tests"]
candidates = [status_by_name[key] for key in candidate_keys if key in status_by_name]
if candidates:
tests_status = _combine_statuses(candidates)
elif tests["tests"] > 0:
tests_status = "ok" if (tests["failures"] + tests["errors"]) == 0 else "failed"
else:
tests_status = "not_applicable"
# coverage
coverage_status = status_by_name.get("coverage")
if not coverage_status:
if workspace_line_coverage_percent > 0:
coverage_status = "ok" if workspace_line_coverage_percent >= 95.0 else "failed"
else:
coverage_status = "not_applicable"
# loc
loc_status = status_by_name.get("loc")
if not loc_status:
loc_status = "ok" if source_lines_over_500 == 0 else "failed"
# docs + naming + lint hygiene
docs_naming_status = status_by_name.get("docs_naming")
if not docs_naming_status:
candidates = [status_by_name[key] for key in ["docs", "hygiene", "smell", "lint", "naming"] if key in status_by_name]
docs_naming_status = _combine_statuses(candidates) if candidates else "not_applicable"
# gate glue
gate_glue_status = status_by_name.get("gate_glue")
if not gate_glue_status:
candidates = [status_by_name[key] for key in ["gate_glue", "glue", "gate"] if key in status_by_name]
gate_glue_status = _combine_statuses(candidates) if candidates else "not_applicable"
sonarqube_status = status_by_name.get("sonarqube") or _infer_sonarqube_status(sonarqube_report)
supply_chain_status = status_by_name.get("supply_chain") or _infer_supply_chain_status(
supply_chain_report,
required=supply_chain_required,
)
return {
"tests": tests_status,
"coverage": coverage_status,
"loc": loc_status,
"docs_naming": docs_naming_status,
"gate_glue": gate_glue_status,
"sonarqube": sonarqube_status,
"supply_chain": supply_chain_status,
}
def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str, str]) -> float:
"""Return the current counter value for a labeled metric if present."""
text = _read_text(f"{pushgateway_url.rstrip('/')}/metrics")

View File

@ -0,0 +1,200 @@
#!/usr/bin/env python3
"""Quality/status helpers for publish_test_metrics."""
from __future__ import annotations
import json
from pathlib import Path
import xml.etree.ElementTree as ET
SUCCESS_STATUSES = {"ok", "pass", "passed", "success", "compliant"}
NOT_APPLICABLE_STATUSES = {"not_applicable", "n/a", "na", "none", "skipped"}
FAILED_STATUSES = {"failed", "fail", "error", "errors", "warn", "warning", "red"}
CANONICAL_CHECKS = [
"tests",
"coverage",
"loc",
"docs_naming",
"gate_glue",
"sonarqube",
"supply_chain",
]
def _infer_workspace_coverage_percent(summary: dict, default_xml: str) -> float:
"""Infer workspace line coverage from quality summary coverage XML metadata."""
results = summary.get("results", []) if isinstance(summary, dict) else []
coverage_xml = default_xml
for result in results:
if not isinstance(result, dict):
continue
if str(result.get("name") or "").strip().lower() != "coverage":
continue
candidate = str(result.get("coverage_xml") or "").strip()
if candidate:
coverage_xml = candidate
break
xml_path = Path(coverage_xml)
if not xml_path.exists():
return 0.0
try:
root = ET.parse(xml_path).getroot()
line_rate = root.attrib.get("line-rate")
if line_rate is None:
return 0.0
return float(line_rate) * 100.0
except (ET.ParseError, OSError, ValueError):
return 0.0
def _infer_source_lines_over_500(summary: dict) -> int:
"""Infer over-limit source file count from hygiene issue payloads."""
results = summary.get("results", []) if isinstance(summary, dict) else []
for result in results:
if not isinstance(result, dict):
continue
if str(result.get("name") or "").strip().lower() not in {"hygiene", "loc", "smell"}:
continue
issues = result.get("issues")
if not isinstance(issues, list):
continue
return sum(1 for item in issues if isinstance(item, str) and item.startswith("file exceeds"))
return 0
def _normalize_result_status(value: str | None, default: str = "failed") -> str:
"""Map arbitrary check status text into canonical check result buckets."""
if not value:
return default
normalized = value.strip().lower()
if normalized in SUCCESS_STATUSES:
return "ok"
if normalized in NOT_APPLICABLE_STATUSES:
return "not_applicable"
if normalized in FAILED_STATUSES:
return "failed"
return default
def _load_optional_json(path: str | None) -> dict:
"""Load an optional JSON report file, returning an empty object when absent."""
if not path:
return {}
candidate = Path(path)
if not candidate.exists():
return {}
try:
return json.loads(candidate.read_text(encoding="utf-8"))
except json.JSONDecodeError:
return {}
def _combine_statuses(statuses: list[str]) -> str:
"""Roll up many check statuses into one canonical result."""
if not statuses:
return "not_applicable"
if any(status == "failed" for status in statuses):
return "failed"
if all(status == "not_applicable" for status in statuses):
return "not_applicable"
if all(status in {"ok", "not_applicable"} for status in statuses):
return "ok"
return "failed"
def _infer_sonarqube_status(report: dict) -> str:
"""Infer canonical SonarQube check status from its JSON report payload."""
if not report:
return "not_applicable"
status = (
report.get("projectStatus", {}).get("status")
or report.get("qualityGate", {}).get("status")
or report.get("status")
)
return _normalize_result_status(str(status) if status is not None else None, default="failed")
def _infer_supply_chain_status(report: dict, required: bool) -> str:
"""Infer canonical supply-chain status from IronBank/artifact report payload."""
if not report:
return "failed" if required else "not_applicable"
compliant = report.get("compliant")
if isinstance(compliant, bool):
return "ok" if compliant else "failed"
status = report.get("status")
if status is None:
return "failed" if required else "not_applicable"
normalized = _normalize_result_status(str(status), default="failed")
if normalized == "not_applicable" and required:
return "failed"
return normalized
def _build_check_statuses(
summary: dict | None,
tests: dict[str, int],
workspace_line_coverage_percent: float,
source_lines_over_500: int,
sonarqube_report: dict,
supply_chain_report: dict,
supply_chain_required: bool,
) -> dict[str, str]:
"""Generate the canonical quality-check status map for dashboarding."""
raw_results = summary.get("results", []) if isinstance(summary, dict) else []
status_by_name: dict[str, str] = {}
for result in raw_results:
if not isinstance(result, dict):
continue
check_name = str(result.get("name") or "").strip().lower()
if not check_name:
continue
status_by_name[check_name] = _normalize_result_status(result.get("status"), default="failed")
tests_status = status_by_name.get("tests")
if not tests_status:
candidate_keys = ["unit", "integration", "e2e", "pytest", "test", "tests"]
candidates = [status_by_name[key] for key in candidate_keys if key in status_by_name]
if candidates:
tests_status = _combine_statuses(candidates)
elif tests["tests"] > 0:
tests_status = "ok" if (tests["failures"] + tests["errors"]) == 0 else "failed"
else:
tests_status = "not_applicable"
coverage_status = status_by_name.get("coverage")
if not coverage_status:
if workspace_line_coverage_percent > 0:
coverage_status = "ok" if workspace_line_coverage_percent >= 95.0 else "failed"
else:
coverage_status = "not_applicable"
loc_status = status_by_name.get("loc")
if not loc_status:
loc_status = "ok" if source_lines_over_500 == 0 else "failed"
docs_naming_status = status_by_name.get("docs_naming")
if not docs_naming_status:
candidates = [status_by_name[key] for key in ["docs", "hygiene", "smell", "lint", "naming"] if key in status_by_name]
docs_naming_status = _combine_statuses(candidates) if candidates else "not_applicable"
gate_glue_status = status_by_name.get("gate_glue")
if not gate_glue_status:
candidates = [status_by_name[key] for key in ["gate_glue", "glue", "gate"] if key in status_by_name]
gate_glue_status = _combine_statuses(candidates) if candidates else "not_applicable"
sonarqube_status = status_by_name.get("sonarqube") or _infer_sonarqube_status(sonarqube_report)
supply_chain_status = status_by_name.get("supply_chain") or _infer_supply_chain_status(
supply_chain_report,
required=supply_chain_required,
)
return {
"tests": tests_status,
"coverage": coverage_status,
"loc": loc_status,
"docs_naming": docs_naming_status,
"gate_glue": gate_glue_status,
"sonarqube": sonarqube_status,
"supply_chain": supply_chain_status,
}

View File

@ -3,10 +3,15 @@
from __future__ import annotations
import argparse
import base64
import json
import os
import subprocess
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any
@ -20,6 +25,189 @@ RUFF_SELECT = ["F", "B", "SIM", "C4", "UP"]
RUFF_IGNORE = ["B017", "UP015", "UP035"]
def _env_flag(name: str, default: bool) -> bool:
"""Parse a boolean-like environment variable."""
raw = os.getenv(name)
if raw is None:
return default
return raw.strip().lower() in {"1", "true", "yes", "on"}
def _load_json_report(path: Path) -> tuple[dict[str, Any] | None, str | None]:
"""Return parsed JSON report contents or a descriptive error."""
if not path.exists():
return None, f"report missing: {path}"
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except json.JSONDecodeError as exc:
return None, f"report invalid JSON: {path} ({exc})"
if not isinstance(payload, dict):
return None, f"report payload must be an object: {path}"
return payload, None
def _sonarqube_gate_status_from_report(payload: dict[str, Any]) -> str:
"""Extract a SonarQube quality-gate status from a report payload."""
project_status = payload.get("projectStatus")
if isinstance(project_status, dict):
status = project_status.get("status")
if isinstance(status, str):
return status
status = payload.get("status")
if isinstance(status, str):
return status
return ""
def _fetch_sonarqube_gate_status(
host_url: str,
project_key: str,
token: str,
timeout_seconds: float,
) -> tuple[str, str | None]:
"""Query SonarQube for the project's current quality-gate status."""
query = urllib.parse.urlencode({"projectKey": project_key})
request = urllib.request.Request(
f"{host_url.rstrip('/')}/api/qualitygates/project_status?{query}",
method="GET",
)
if token:
encoded = base64.b64encode(f"{token}:".encode()).decode()
request.add_header("Authorization", f"Basic {encoded}")
try:
with urllib.request.urlopen(request, timeout=timeout_seconds) as response:
payload = json.loads(response.read().decode("utf-8"))
except (urllib.error.HTTPError, urllib.error.URLError, TimeoutError, json.JSONDecodeError) as exc:
return "", f"sonarqube query failed: {exc}"
if not isinstance(payload, dict):
return "", "sonarqube query returned non-object payload"
status = _sonarqube_gate_status_from_report(payload)
if status:
return status, None
return "", "sonarqube response missing projectStatus.status"
def _run_sonarqube_check(build_dir: Path) -> dict[str, Any]:
"""Enforce SonarQube quality gate using report or API evidence."""
enforce = _env_flag("QUALITY_GATE_SONARQUBE_ENFORCE", default=True)
report_rel = os.getenv(
"QUALITY_GATE_SONARQUBE_REPORT",
str(build_dir / "sonarqube-quality-gate.json"),
)
report_path = Path(report_rel)
if not report_path.is_absolute():
report_path = Path.cwd() / report_path
host_url = os.getenv("SONARQUBE_HOST_URL", "").strip()
project_key = os.getenv("SONARQUBE_PROJECT_KEY", "").strip()
token = os.getenv("SONARQUBE_TOKEN", "").strip()
timeout_seconds = float(os.getenv("QUALITY_GATE_SONARQUBE_TIMEOUT_SECONDS", "12"))
gate_status = ""
source = ""
issues: list[str] = []
report_payload, report_error = _load_json_report(report_path)
if report_payload is not None:
gate_status = _sonarqube_gate_status_from_report(report_payload).strip()
source = "report"
if not gate_status:
issues.append("sonarqube report missing quality gate status")
elif report_error:
if host_url and project_key:
gate_status, query_error = _fetch_sonarqube_gate_status(host_url, project_key, token, timeout_seconds)
source = "api"
if query_error:
issues.append(query_error)
else:
issues.append(report_error)
if not source and host_url and project_key:
gate_status, query_error = _fetch_sonarqube_gate_status(host_url, project_key, token, timeout_seconds)
source = "api"
if query_error:
issues.append(query_error)
normalized = gate_status.upper()
passed = normalized in {"OK", "PASS", "PASSED"}
if enforce and not passed:
if gate_status:
issues.append(f"sonarqube gate is {gate_status}, expected OK")
else:
issues.append("sonarqube gate status unavailable")
status = "ok" if (passed or not enforce) and not issues else "failed"
return _result(
"sonarqube",
"SonarQube quality gate must pass for the current project.",
status,
enforce=enforce,
source=source or "none",
gate_status=gate_status or "unknown",
report_path=str(report_path),
issues=issues,
)
def _ironbank_status_from_report(payload: dict[str, Any]) -> tuple[str, bool | None]:
"""Extract a compliance status and explicit compliance flag from report payload."""
for key in ("status", "result", "compliance", "compliance_status"):
value = payload.get(key)
if isinstance(value, str) and value.strip():
return value.strip(), None
compliant = payload.get("compliant")
if isinstance(compliant, bool):
return "compliant" if compliant else "noncompliant", compliant
return "", None
def _run_ironbank_check(build_dir: Path) -> dict[str, Any]:
"""Enforce Iron Bank image-hardening compliance from build evidence."""
enforce = _env_flag("QUALITY_GATE_IRONBANK_ENFORCE", default=True)
required = _env_flag("QUALITY_GATE_IRONBANK_REQUIRED", default=True)
report_rel = os.getenv(
"QUALITY_GATE_IRONBANK_REPORT",
str(build_dir / "ironbank-compliance.json"),
)
report_path = Path(report_rel)
if not report_path.is_absolute():
report_path = Path.cwd() / report_path
issues: list[str] = []
status_value = ""
compliant: bool | None = None
source = "none"
report_payload, report_error = _load_json_report(report_path)
if report_payload is not None:
status_value, compliant = _ironbank_status_from_report(report_payload)
source = "report"
elif required:
issues.append(report_error or f"report missing: {report_path}")
normalized = status_value.strip().lower()
passed_status = normalized in {"ok", "pass", "passed", "compliant", "true"}
passed = compliant is True or passed_status
if enforce and required and not passed:
if status_value:
issues.append(f"ironbank compliance is {status_value}, expected compliant")
elif not issues:
issues.append("ironbank compliance status unavailable")
status = "ok" if (passed or not enforce or not required) and not issues else "failed"
return _result(
"ironbank",
"Iron Bank image-hardening compliance must pass for build artifacts.",
status,
enforce=enforce,
required=required,
source=source,
compliance=status_value or "unknown",
report_path=str(report_path),
issues=issues,
)
def _status_from_issues(issues: list[str]) -> str:
return "ok" if not issues else "failed"
@ -140,6 +328,12 @@ def run_profile(
)
)
continue
if check_name == "sonarqube":
results.append(_run_sonarqube_check(build_dir))
continue
if check_name == "ironbank":
results.append(_run_ironbank_check(build_dir))
continue
suite = contract.get("pytest_suites", {}).get(check_name)
if suite is None:
raise SystemExit(f"profile {profile_name} references unknown check: {check_name}")

View File

@ -1,3 +1,5 @@
"""Unit tests for core test-metrics parsing and quality signal helpers."""
from __future__ import annotations
import json
@ -70,6 +72,8 @@ def test_parse_junit_handles_unknown_root(tmp_path: Path):
}
def test_read_exit_code_and_summary_fallbacks(tmp_path: Path):
rc_path = tmp_path / "rc.txt"
rc_path.write_text("0\n", encoding="utf-8")
@ -361,6 +365,8 @@ def test_build_payload_omits_checks_block_without_check_statuses():
assert "titan_iac_quality_gate_checks_total" not in payload
def test_main_uses_quality_gate_summary_and_junit_glob(tmp_path: Path, monkeypatch):
build_dir = tmp_path / "build"
build_dir.mkdir()

View File

@ -0,0 +1,134 @@
"""Unit tests for test-case path parsing and fallback metric labeling behavior."""
from __future__ import annotations
import json
from pathlib import Path
from ci.scripts import publish_test_metrics
def test_collect_junit_cases_handles_missing_files_and_multiple_root_shapes(tmp_path: Path, monkeypatch):
missing = tmp_path / "missing.xml"
testsuites_path = tmp_path / "suite-cases.xml"
testsuites_path.write_text(
(
"<testsuites>"
'<testsuite tests="3">'
'<testcase classname="alpha" name="ok_case" />'
'<testcase classname="alpha" name="skip_case"><skipped /></testcase>'
'<testcase classname="alpha"><failure /></testcase>'
"</testsuite>"
"</testsuites>"
),
encoding="utf-8",
)
unknown_root_path = tmp_path / "unknown-root.xml"
unknown_root_path.write_text("<root><testcase classname='x' name='ignored' /></root>", encoding="utf-8")
monkeypatch.setattr(
publish_test_metrics,
"glob",
lambda _pattern: [str(missing), str(testsuites_path), str(unknown_root_path)],
)
cases = publish_test_metrics._collect_junit_cases("ignored-glob")
assert ("alpha.ok_case", "passed") in cases
assert ("alpha.skip_case", "skipped") in cases
assert not any(name.endswith("ignored") for name, _ in cases)
def test_build_payload_includes_explicit_test_case_series():
payload = publish_test_metrics._build_payload(
suite="titan-iac",
status="ok",
tests={"tests": 2, "failures": 1, "errors": 0, "skipped": 0},
test_cases=[("alpha::case_one", "failed"), ("beta::case_two", "passed")],
ok_count=3,
failed_count=1,
branch="main",
build_number="5",
workspace_line_coverage_percent=95.0,
source_lines_over_500=0,
check_statuses={"tests": "failed"},
)
assert 'platform_quality_gate_test_case_result{suite="titan-iac",test="alpha::case_one",status="failed"} 1' in payload
assert 'platform_quality_gate_test_case_result{suite="titan-iac",test="beta::case_two",status="passed"} 1' in payload
def test_main_uses_reported_coverage_and_loc_without_fallback(tmp_path: Path, monkeypatch, capsys):
build_dir = tmp_path / "build"
build_dir.mkdir()
(build_dir / "junit.xml").write_text(
'<testsuite tests="1" failures="0" errors="0" skipped="0" />',
encoding="utf-8",
)
(build_dir / "quality-gate.rc").write_text("0\n", encoding="utf-8")
(build_dir / "quality-gate-summary.json").write_text(
json.dumps(
{
"workspace_line_coverage_percent": 99.5,
"source_lines_over_500": 2,
"results": [],
}
),
encoding="utf-8",
)
monkeypatch.setenv("JUNIT_GLOB", str(build_dir / "*.xml"))
monkeypatch.setenv("QUALITY_GATE_EXIT_CODE_PATH", str(build_dir / "quality-gate.rc"))
monkeypatch.setenv("QUALITY_GATE_SUMMARY_PATH", str(build_dir / "quality-gate-summary.json"))
monkeypatch.setattr(publish_test_metrics, "_fetch_existing_counter", lambda *args, **kwargs: 0)
monkeypatch.setattr(publish_test_metrics, "_post_text", lambda *args, **kwargs: None)
monkeypatch.setattr(
publish_test_metrics,
"_infer_workspace_coverage_percent",
lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("should not infer coverage")),
)
monkeypatch.setattr(
publish_test_metrics,
"_infer_source_lines_over_500",
lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("should not infer loc")),
)
rc = publish_test_metrics.main()
summary = json.loads(capsys.readouterr().out)
assert rc == 0
assert summary["workspace_line_coverage_percent"] == 99.5
assert summary["source_lines_over_500"] == 2
def test_main_falls_back_to_inferred_coverage_and_loc(tmp_path: Path, monkeypatch, capsys):
build_dir = tmp_path / "build"
build_dir.mkdir()
(build_dir / "junit.xml").write_text(
'<testsuite tests="1" failures="0" errors="0" skipped="0" />',
encoding="utf-8",
)
(build_dir / "quality-gate.rc").write_text("1\n", encoding="utf-8")
(build_dir / "quality-gate-summary.json").write_text(
json.dumps({"workspace_line_coverage_percent": 0.0, "source_lines_over_500": 0, "results": []}),
encoding="utf-8",
)
posted = {}
monkeypatch.setenv("JUNIT_GLOB", str(build_dir / "*.xml"))
monkeypatch.setenv("QUALITY_GATE_EXIT_CODE_PATH", str(build_dir / "quality-gate.rc"))
monkeypatch.setenv("QUALITY_GATE_SUMMARY_PATH", str(build_dir / "quality-gate-summary.json"))
monkeypatch.setattr(publish_test_metrics, "_fetch_existing_counter", lambda *args, **kwargs: 0)
monkeypatch.setattr(publish_test_metrics, "_post_text", lambda url, payload: posted.update({"url": url, "payload": payload}))
monkeypatch.setattr(publish_test_metrics, "_infer_workspace_coverage_percent", lambda *args, **kwargs: 96.4)
monkeypatch.setattr(publish_test_metrics, "_infer_source_lines_over_500", lambda *args, **kwargs: 7)
rc = publish_test_metrics.main()
summary = json.loads(capsys.readouterr().out)
assert rc == 0
assert summary["status"] == "failed"
assert summary["workspace_line_coverage_percent"] == 96.4
assert summary["source_lines_over_500"] == 7
assert 'platform_quality_gate_workspace_line_coverage_percent{suite="titan_iac"} 96.400' in posted["payload"]
assert 'platform_quality_gate_source_lines_over_500_total{suite="titan_iac"} 7' in posted["payload"]