titan-iac/ci/scripts/publish_test_metrics_quality.py

201 lines
7.2 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""Quality/status helpers for publish_test_metrics."""
from __future__ import annotations
import json
from pathlib import Path
import xml.etree.ElementTree as ET
# Raw status spellings, matched after strip/lower-casing, that count as a pass.
SUCCESS_STATUSES = {
    "compliant",
    "ok",
    "pass",
    "passed",
    "success",
}

# Raw status spellings meaning the check did not apply or was not run.
NOT_APPLICABLE_STATUSES = {
    "n/a",
    "na",
    "none",
    "not_applicable",
    "skipped",
}

# Raw status spellings treated as a failure; note that warnings are included.
FAILED_STATUSES = {
    "error",
    "errors",
    "fail",
    "failed",
    "red",
    "warn",
    "warning",
}

# Canonical check names, in the order they are reported.
CANONICAL_CHECKS = [
    "tests", "coverage", "loc", "docs_naming",
    "gate_glue", "sonarqube", "supply_chain",
]
def _infer_workspace_coverage_percent(summary: dict, default_xml: str) -> float:
"""Infer workspace line coverage from quality summary coverage XML metadata."""
results = summary.get("results", []) if isinstance(summary, dict) else []
coverage_xml = default_xml
for result in results:
if not isinstance(result, dict):
continue
if str(result.get("name") or "").strip().lower() != "coverage":
continue
candidate = str(result.get("coverage_xml") or "").strip()
if candidate:
coverage_xml = candidate
break
xml_path = Path(coverage_xml)
if not xml_path.exists():
return 0.0
try:
root = ET.parse(xml_path).getroot()
line_rate = root.attrib.get("line-rate")
if line_rate is None:
return 0.0
return float(line_rate) * 100.0
except (ET.ParseError, OSError, ValueError):
return 0.0
def _infer_source_lines_over_500(summary: dict) -> int:
"""Infer over-limit source file count from hygiene issue payloads."""
results = summary.get("results", []) if isinstance(summary, dict) else []
for result in results:
if not isinstance(result, dict):
continue
if str(result.get("name") or "").strip().lower() not in {"hygiene", "loc", "smell"}:
continue
issues = result.get("issues")
if not isinstance(issues, list):
continue
return sum(1 for item in issues if isinstance(item, str) and item.startswith("file exceeds"))
return 0
def _normalize_result_status(value: str | None, default: str = "failed") -> str:
"""Map arbitrary check status text into canonical check result buckets."""
if not value:
return default
normalized = value.strip().lower()
if normalized in SUCCESS_STATUSES:
return "ok"
if normalized in NOT_APPLICABLE_STATUSES:
return "not_applicable"
if normalized in FAILED_STATUSES:
return "failed"
return default
def _load_optional_json(path: str | None) -> dict:
"""Load an optional JSON report file, returning an empty object when absent."""
if not path:
return {}
candidate = Path(path)
if not candidate.exists():
return {}
try:
return json.loads(candidate.read_text(encoding="utf-8"))
except json.JSONDecodeError:
return {}
def _combine_statuses(statuses: list[str]) -> str:
"""Roll up many check statuses into one canonical result."""
if not statuses:
return "not_applicable"
if any(status == "failed" for status in statuses):
return "failed"
if all(status == "not_applicable" for status in statuses):
return "not_applicable"
if all(status in {"ok", "not_applicable"} for status in statuses):
return "ok"
return "failed"
def _infer_sonarqube_status(report: dict) -> str:
"""Infer canonical SonarQube check status from its JSON report payload."""
if not report:
return "not_applicable"
status = (
report.get("projectStatus", {}).get("status")
or report.get("qualityGate", {}).get("status")
or report.get("status")
)
return _normalize_result_status(str(status) if status is not None else None, default="failed")
def _infer_supply_chain_status(report: dict, required: bool) -> str:
"""Infer canonical supply-chain status from IronBank/artifact report payload."""
if not report:
return "failed" if required else "not_applicable"
compliant = report.get("compliant")
if isinstance(compliant, bool):
return "ok" if compliant else "failed"
status = report.get("status")
if status is None:
return "failed" if required else "not_applicable"
normalized = _normalize_result_status(str(status), default="failed")
if normalized == "not_applicable" and required:
return "failed"
return normalized
def _build_check_statuses(
    summary: dict | None,
    tests: dict[str, int],
    workspace_line_coverage_percent: float,
    source_lines_over_500: int,
    sonarqube_report: dict,
    supply_chain_report: dict,
    supply_chain_required: bool,
    *,
    coverage_threshold_percent: float = 95.0,
) -> dict[str, str]:
    """Generate the canonical quality-check status map for dashboarding.

    Statuses explicitly reported in ``summary["results"]`` take precedence.
    Each canonical check that is not reported under its own name is inferred
    from related result names or from the supplied metrics and reports.

    Args:
        summary: Quality summary payload; a missing, null, or malformed
            ``results`` value is treated as no results.
        tests: Test counters read by key ``tests``, ``failures`` and
            ``errors``; missing or null counters count as zero.
        workspace_line_coverage_percent: Line coverage percentage. A value of
            zero or less is treated as "no coverage data".
        source_lines_over_500: Count of over-limit source files.
        sonarqube_report: Decoded SonarQube report payload.
        supply_chain_report: Decoded supply-chain report payload.
        supply_chain_required: Whether a missing supply-chain result fails.
        coverage_threshold_percent: Minimum coverage percentage that counts
            as ``"ok"`` when coverage status is inferred from the metric.

    Returns:
        A mapping from each canonical check name to ``"ok"``, ``"failed"`` or
        ``"not_applicable"``.
    """
    raw_results = summary.get("results") if isinstance(summary, dict) else None
    if not isinstance(raw_results, (list, tuple)):
        # A null or malformed "results" value must not abort metric publishing.
        raw_results = []

    status_by_name: dict[str, str] = {}
    for result in raw_results:
        if not isinstance(result, dict):
            continue
        check_name = str(result.get("name") or "").strip().lower()
        if not check_name:
            continue
        raw_status = result.get("status")
        # Stringify so non-text statuses (bool, number) cannot crash normalisation.
        status_by_name[check_name] = _normalize_result_status(
            str(raw_status) if raw_status is not None else None,
            default="failed",
        )

    def _rollup(keys: list[str]) -> str | None:
        """Combine the statuses reported under any of ``keys``, if present."""
        found = [status_by_name[key] for key in keys if key in status_by_name]
        return _combine_statuses(found) if found else None

    counters = tests if isinstance(tests, dict) else {}

    tests_status = status_by_name.get("tests")
    if not tests_status:
        tests_status = _rollup(["unit", "integration", "e2e", "pytest", "test", "tests"])
    if not tests_status:
        if (counters.get("tests") or 0) > 0:
            broken = (counters.get("failures") or 0) + (counters.get("errors") or 0)
            tests_status = "ok" if broken == 0 else "failed"
        else:
            tests_status = "not_applicable"

    coverage_status = status_by_name.get("coverage")
    if not coverage_status:
        if workspace_line_coverage_percent > 0:
            meets_threshold = workspace_line_coverage_percent >= coverage_threshold_percent
            coverage_status = "ok" if meets_threshold else "failed"
        else:
            coverage_status = "not_applicable"

    loc_status = status_by_name.get("loc")
    if not loc_status:
        loc_status = "ok" if source_lines_over_500 == 0 else "failed"

    docs_naming_status = status_by_name.get("docs_naming")
    if not docs_naming_status:
        docs_naming_status = (
            _rollup(["docs", "hygiene", "smell", "lint", "naming"]) or "not_applicable"
        )

    gate_glue_status = status_by_name.get("gate_glue")
    if not gate_glue_status:
        gate_glue_status = _rollup(["gate_glue", "glue", "gate"]) or "not_applicable"

    sonarqube_status = status_by_name.get("sonarqube") or _infer_sonarqube_status(
        sonarqube_report
    )
    supply_chain_status = status_by_name.get("supply_chain") or _infer_supply_chain_status(
        supply_chain_report,
        required=supply_chain_required,
    )

    return {
        "tests": tests_status,
        "coverage": coverage_status,
        "loc": loc_status,
        "docs_naming": docs_naming_status,
        "gate_glue": gate_glue_status,
        "sonarqube": sonarqube_status,
        "supply_chain": supply_chain_status,
    }