#!/usr/bin/env python3 """Quality/status helpers for publish_test_metrics.""" from __future__ import annotations import json from pathlib import Path import xml.etree.ElementTree as ET SUCCESS_STATUSES = {"ok", "pass", "passed", "success", "compliant"} NOT_APPLICABLE_STATUSES = {"not_applicable", "n/a", "na", "none", "skipped"} FAILED_STATUSES = {"failed", "fail", "error", "errors", "warn", "warning", "red"} CANONICAL_CHECKS = [ "tests", "coverage", "loc", "docs_naming", "gate_glue", "sonarqube", "supply_chain", ] def _infer_workspace_coverage_percent(summary: dict, default_xml: str) -> float: """Infer workspace line coverage from quality summary coverage XML metadata.""" results = summary.get("results", []) if isinstance(summary, dict) else [] coverage_xml = default_xml for result in results: if not isinstance(result, dict): continue if str(result.get("name") or "").strip().lower() != "coverage": continue candidate = str(result.get("coverage_xml") or "").strip() if candidate: coverage_xml = candidate break xml_path = Path(coverage_xml) if not xml_path.exists(): return 0.0 try: root = ET.parse(xml_path).getroot() line_rate = root.attrib.get("line-rate") if line_rate is None: return 0.0 return float(line_rate) * 100.0 except (ET.ParseError, OSError, ValueError): return 0.0 def _infer_source_lines_over_500(summary: dict) -> int: """Infer over-limit source file count from hygiene issue payloads.""" results = summary.get("results", []) if isinstance(summary, dict) else [] for result in results: if not isinstance(result, dict): continue if str(result.get("name") or "").strip().lower() not in {"hygiene", "loc", "smell"}: continue issues = result.get("issues") if not isinstance(issues, list): continue return sum(1 for item in issues if isinstance(item, str) and item.startswith("file exceeds")) return 0 def _normalize_result_status(value: str | None, default: str = "failed") -> str: """Map arbitrary check status text into canonical check result buckets.""" if not value: return default normalized = value.strip().lower() if normalized in SUCCESS_STATUSES: return "ok" if normalized in NOT_APPLICABLE_STATUSES: return "not_applicable" if normalized in FAILED_STATUSES: return "failed" return default def _load_optional_json(path: str | None) -> dict: """Load an optional JSON report file, returning an empty object when absent.""" if not path: return {} candidate = Path(path) if not candidate.exists(): return {} try: return json.loads(candidate.read_text(encoding="utf-8")) except json.JSONDecodeError: return {} def _combine_statuses(statuses: list[str]) -> str: """Roll up many check statuses into one canonical result.""" if not statuses: return "not_applicable" if any(status == "failed" for status in statuses): return "failed" if all(status == "not_applicable" for status in statuses): return "not_applicable" if all(status in {"ok", "not_applicable"} for status in statuses): return "ok" return "failed" def _infer_sonarqube_status(report: dict) -> str: """Infer canonical SonarQube check status from its JSON report payload.""" if not report: return "not_applicable" status = ( report.get("projectStatus", {}).get("status") or report.get("qualityGate", {}).get("status") or report.get("status") ) return _normalize_result_status(str(status) if status is not None else None, default="failed") def _infer_supply_chain_status(report: dict, required: bool) -> str: """Infer canonical supply-chain status from IronBank/artifact report payload.""" if not report: return "failed" if required else "not_applicable" compliant = report.get("compliant") if isinstance(compliant, bool): return "ok" if compliant else "failed" status = report.get("status") if status is None: return "failed" if required else "not_applicable" normalized = _normalize_result_status(str(status), default="failed") if normalized == "not_applicable" and required: return "failed" return normalized def _build_check_statuses( summary: dict | None, tests: dict[str, int], workspace_line_coverage_percent: float, source_lines_over_500: int, sonarqube_report: dict, supply_chain_report: dict, supply_chain_required: bool, ) -> dict[str, str]: """Generate the canonical quality-check status map for dashboarding.""" raw_results = summary.get("results", []) if isinstance(summary, dict) else [] status_by_name: dict[str, str] = {} for result in raw_results: if not isinstance(result, dict): continue check_name = str(result.get("name") or "").strip().lower() if not check_name: continue status_by_name[check_name] = _normalize_result_status(result.get("status"), default="failed") tests_status = status_by_name.get("tests") if not tests_status: candidate_keys = ["unit", "integration", "e2e", "pytest", "test", "tests"] candidates = [status_by_name[key] for key in candidate_keys if key in status_by_name] if candidates: tests_status = _combine_statuses(candidates) elif tests["tests"] > 0: tests_status = "ok" if (tests["failures"] + tests["errors"]) == 0 else "failed" else: tests_status = "not_applicable" coverage_status = status_by_name.get("coverage") if not coverage_status: if workspace_line_coverage_percent > 0: coverage_status = "ok" if workspace_line_coverage_percent >= 95.0 else "failed" else: coverage_status = "not_applicable" loc_status = status_by_name.get("loc") if not loc_status: loc_status = "ok" if source_lines_over_500 == 0 else "failed" docs_naming_status = status_by_name.get("docs_naming") if not docs_naming_status: candidates = [status_by_name[key] for key in ["docs", "hygiene", "smell", "lint", "naming"] if key in status_by_name] docs_naming_status = _combine_statuses(candidates) if candidates else "not_applicable" gate_glue_status = status_by_name.get("gate_glue") if not gate_glue_status: candidates = [status_by_name[key] for key in ["gate_glue", "glue", "gate"] if key in status_by_name] gate_glue_status = _combine_statuses(candidates) if candidates else "not_applicable" sonarqube_status = status_by_name.get("sonarqube") or _infer_sonarqube_status(sonarqube_report) supply_chain_status = status_by_name.get("supply_chain") or _infer_supply_chain_status( supply_chain_report, required=supply_chain_required, ) return { "tests": tests_status, "coverage": coverage_status, "loc": loc_status, "docs_naming": docs_naming_status, "gate_glue": gate_glue_status, "sonarqube": sonarqube_status, "supply_chain": supply_chain_status, }