#!/usr/bin/env python3
|
|
"""Quality/status helpers for publish_test_metrics."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from pathlib import Path
|
|
import xml.etree.ElementTree as ET
|
|
|
|
# Lower-cased raw status spellings that _normalize_result_status maps to "ok".
SUCCESS_STATUSES = {
    "compliant",
    "ok",
    "pass",
    "passed",
    "success",
}

# Lower-cased raw status spellings that are mapped to "not_applicable".
NOT_APPLICABLE_STATUSES = {
    "n/a",
    "na",
    "none",
    "not_applicable",
    "skipped",
}

# Lower-cased raw status spellings that are mapped to "failed".
FAILED_STATUSES = {
    "error",
    "errors",
    "fail",
    "failed",
    "red",
    "warn",
    "warning",
}

# Check names, in the same order as the keys returned by _build_check_statuses.
CANONICAL_CHECKS = [
    "tests",
    "coverage",
    "loc",
    "docs_naming",
    "gate_glue",
    "sonarqube",
    "supply_chain",
]
|
|
|
|
|
|
def _infer_workspace_coverage_percent(summary: dict, default_xml: str) -> float:
|
|
"""Infer workspace line coverage from quality summary coverage XML metadata."""
|
|
results = summary.get("results", []) if isinstance(summary, dict) else []
|
|
coverage_xml = default_xml
|
|
for result in results:
|
|
if not isinstance(result, dict):
|
|
continue
|
|
if str(result.get("name") or "").strip().lower() != "coverage":
|
|
continue
|
|
candidate = str(result.get("coverage_xml") or "").strip()
|
|
if candidate:
|
|
coverage_xml = candidate
|
|
break
|
|
xml_path = Path(coverage_xml)
|
|
if not xml_path.exists():
|
|
return 0.0
|
|
try:
|
|
root = ET.parse(xml_path).getroot()
|
|
line_rate = root.attrib.get("line-rate")
|
|
if line_rate is None:
|
|
return 0.0
|
|
return float(line_rate) * 100.0
|
|
except (ET.ParseError, OSError, ValueError):
|
|
return 0.0
|
|
|
|
|
|
def _infer_source_lines_over_500(summary: dict) -> int:
|
|
"""Infer over-limit source file count from hygiene issue payloads."""
|
|
results = summary.get("results", []) if isinstance(summary, dict) else []
|
|
for result in results:
|
|
if not isinstance(result, dict):
|
|
continue
|
|
if str(result.get("name") or "").strip().lower() not in {"hygiene", "loc", "smell"}:
|
|
continue
|
|
issues = result.get("issues")
|
|
if not isinstance(issues, list):
|
|
continue
|
|
return sum(1 for item in issues if isinstance(item, str) and item.startswith("file exceeds"))
|
|
return 0
|
|
|
|
|
|
def _normalize_result_status(value: str | None, default: str = "failed") -> str:
    """Map arbitrary check status text into canonical check result buckets.

    Args:
        value: Raw status as reported by a check. Matching ignores case and
            surrounding whitespace. Non-string values (for example JSON
            booleans or numbers) are stringified instead of raising.
        default: Bucket returned for empty or unrecognised values.

    Returns:
        ``"ok"``, ``"not_applicable"`` or ``"failed"`` when the value is a
        member of the corresponding status set, otherwise ``default``.
    """
    if not value:
        return default
    # Payloads come from JSON, so a status is not guaranteed to be a str.
    normalized = str(value).strip().lower()
    if normalized in SUCCESS_STATUSES:
        return "ok"
    if normalized in NOT_APPLICABLE_STATUSES:
        return "not_applicable"
    if normalized in FAILED_STATUSES:
        return "failed"
    return default
|
|
|
|
|
|
def _load_optional_json(path: str | None) -> dict:
|
|
"""Load an optional JSON report file, returning an empty object when absent."""
|
|
if not path:
|
|
return {}
|
|
candidate = Path(path)
|
|
if not candidate.exists():
|
|
return {}
|
|
try:
|
|
return json.loads(candidate.read_text(encoding="utf-8"))
|
|
except json.JSONDecodeError:
|
|
return {}
|
|
|
|
|
|
def _combine_statuses(statuses: list[str]) -> str:
|
|
"""Roll up many check statuses into one canonical result."""
|
|
if not statuses:
|
|
return "not_applicable"
|
|
if any(status == "failed" for status in statuses):
|
|
return "failed"
|
|
if all(status == "not_applicable" for status in statuses):
|
|
return "not_applicable"
|
|
if all(status in {"ok", "not_applicable"} for status in statuses):
|
|
return "ok"
|
|
return "failed"
|
|
|
|
|
|
def _infer_sonarqube_status(report: dict) -> str:
    """Infer canonical SonarQube check status from its JSON report payload.

    The status is looked up, in order, under ``projectStatus.status``,
    ``qualityGate.status`` and the top-level ``status`` key; the first truthy
    value wins. Sections that are missing, ``null`` or not objects are skipped
    instead of raising.

    Args:
        report: Parsed SonarQube report; an empty or missing report means the
            check did not run.

    Returns:
        ``"not_applicable"`` for an empty report, ``"failed"`` for a non-empty
        payload that is not a JSON object, otherwise the normalised status
        (``"failed"`` when no recognisable status is present).
    """
    if not report:
        return "not_applicable"
    if not isinstance(report, dict):
        # A report exists but has an unexpected shape: do not pass it silently.
        return "failed"

    for section_name in ("projectStatus", "qualityGate"):
        section = report.get(section_name)
        if isinstance(section, dict) and section.get("status"):
            status = section["status"]
            break
    else:
        status = report.get("status")

    return _normalize_result_status(str(status) if status is not None else None, default="failed")
|
|
|
|
|
|
def _infer_supply_chain_status(report: dict, required: bool) -> str:
    """Derive the canonical supply-chain verdict from a report payload.

    A boolean ``compliant`` field decides the verdict outright. Otherwise the
    ``status`` field is normalised. When the report is empty or carries no
    status, or the normalised status is ``"not_applicable"``, the verdict is
    ``"failed"`` if the check is ``required``; without ``required`` it stays
    ``"not_applicable"``.
    """
    # Verdict used whenever the report gives no usable answer.
    when_unknown = "failed" if required else "not_applicable"

    if not report:
        return when_unknown

    compliance_flag = report.get("compliant")
    if isinstance(compliance_flag, bool):
        if compliance_flag:
            return "ok"
        return "failed"

    raw_status = report.get("status")
    if raw_status is None:
        return when_unknown

    bucket = _normalize_result_status(str(raw_status), default="failed")
    if required and bucket == "not_applicable":
        # A required check may not opt out.
        return "failed"
    return bucket
|
|
|
|
|
|
def _build_check_statuses(
    summary: dict | None,
    tests: dict[str, int],
    workspace_line_coverage_percent: float,
    source_lines_over_500: int,
    sonarqube_report: dict,
    supply_chain_report: dict,
    supply_chain_required: bool,
    coverage_threshold_percent: float = 95.0,
) -> dict[str, str]:
    """Generate the canonical quality-check status map for dashboarding.

    Statuses reported by name in ``summary["results"]`` take precedence. For a
    check with no direct entry the status is inferred from related entries or
    from the supplied metrics.

    Args:
        summary: Quality summary payload; results are read from its
            ``results`` list. Non-dict summaries and non-list results are
            treated as empty.
        tests: Test counters; the ``tests``, ``failures`` and ``errors`` keys
            are read, and missing keys count as zero.
        workspace_line_coverage_percent: Line coverage in percent; values
            ``<= 0`` mean coverage is not applicable.
        source_lines_over_500: Number of over-limit source files.
        sonarqube_report: Parsed SonarQube report used when the summary has no
            ``sonarqube`` entry.
        supply_chain_report: Parsed supply-chain report used when the summary
            has no ``supply_chain`` entry.
        supply_chain_required: Whether a missing supply-chain verdict fails.
        coverage_threshold_percent: Minimum coverage that counts as ``"ok"``
            when coverage is inferred from the percentage.

    Returns:
        A dict with the keys ``tests``, ``coverage``, ``loc``, ``docs_naming``,
        ``gate_glue``, ``sonarqube`` and ``supply_chain``. The inferred values
        are ``"ok"``, ``"failed"`` or ``"not_applicable"``.
    """
    raw_results = summary.get("results") if isinstance(summary, dict) else None
    # ``"results": null`` (or any non-sequence) means no per-check results.
    if not isinstance(raw_results, (list, tuple)):
        raw_results = []

    status_by_name: dict[str, str] = {}
    for result in raw_results:
        if not isinstance(result, dict):
            continue
        check_name = str(result.get("name") or "").strip().lower()
        if not check_name:
            continue
        raw_status = result.get("status")
        # Stringify first: JSON payloads may carry booleans or numbers here.
        status_by_name[check_name] = _normalize_result_status(
            str(raw_status) if raw_status is not None else None,
            default="failed",
        )

    tests_status = status_by_name.get("tests") or _rollup_named_statuses(
        status_by_name, ["unit", "integration", "e2e", "pytest", "test"]
    )
    if not tests_status:
        # No reported status at all: fall back to the raw test counters.
        counts = tests if isinstance(tests, dict) else {}
        if (counts.get("tests") or 0) > 0:
            broken = (counts.get("failures") or 0) + (counts.get("errors") or 0)
            tests_status = "ok" if broken == 0 else "failed"
        else:
            tests_status = "not_applicable"

    coverage_status = status_by_name.get("coverage")
    if not coverage_status:
        if workspace_line_coverage_percent > 0:
            meets_threshold = workspace_line_coverage_percent >= coverage_threshold_percent
            coverage_status = "ok" if meets_threshold else "failed"
        else:
            coverage_status = "not_applicable"

    loc_status = status_by_name.get("loc")
    if not loc_status:
        loc_status = "ok" if source_lines_over_500 == 0 else "failed"

    docs_naming_status = (
        status_by_name.get("docs_naming")
        or _rollup_named_statuses(status_by_name, ["docs", "hygiene", "smell", "lint", "naming"])
        or "not_applicable"
    )

    gate_glue_status = (
        status_by_name.get("gate_glue")
        or _rollup_named_statuses(status_by_name, ["glue", "gate"])
        or "not_applicable"
    )

    sonarqube_status = status_by_name.get("sonarqube") or _infer_sonarqube_status(sonarqube_report)
    supply_chain_status = status_by_name.get("supply_chain") or _infer_supply_chain_status(
        supply_chain_report,
        required=supply_chain_required,
    )

    return {
        "tests": tests_status,
        "coverage": coverage_status,
        "loc": loc_status,
        "docs_naming": docs_naming_status,
        "gate_glue": gate_glue_status,
        "sonarqube": sonarqube_status,
        "supply_chain": supply_chain_status,
    }


def _rollup_named_statuses(status_by_name: dict[str, str], names: list[str]) -> str | None:
    """Combine the statuses reported under ``names``; ``None`` if none were reported."""
    found = [status_by_name[name] for name in names if name in status_by_name]
    return _combine_statuses(found) if found else None
|