diff --git a/ci/scripts/publish_test_metrics.py b/ci/scripts/publish_test_metrics.py index c31f4add..b4ceea0a 100644 --- a/ci/scripts/publish_test_metrics.py +++ b/ci/scripts/publish_test_metrics.py @@ -12,20 +12,24 @@ import xml.etree.ElementTree as ET def _escape_label(value: str) -> str: + """Escape a Prometheus label value without changing its content.""" return value.replace("\\", "\\\\").replace("\n", "\\n").replace('"', '\\"') def _label_str(labels: dict[str, str]) -> str: + """Render a stable Prometheus label set from a mapping.""" parts = [f'{key}="{_escape_label(val)}"' for key, val in labels.items() if val] return "{" + ",".join(parts) + "}" if parts else "" def _read_text(url: str) -> str: + """Fetch a plain-text response body from the given URL.""" with urllib.request.urlopen(url, timeout=10) as response: return response.read().decode("utf-8") def _post_text(url: str, payload: str) -> None: + """POST a plain-text payload and fail on any 4xx/5xx response.""" request = urllib.request.Request( url, data=payload.encode("utf-8"), @@ -38,6 +42,7 @@ def _post_text(url: str, payload: str) -> None: def _parse_junit(path: str) -> dict[str, int]: + """Parse a JUnit XML file into aggregate test counters.""" if not os.path.exists(path): return {"tests": 0, "failures": 0, "errors": 0, "skipped": 0} @@ -64,6 +69,7 @@ def _parse_junit(path: str) -> dict[str, int]: def _collect_junit_totals(pattern: str) -> dict[str, int]: + """Sum JUnit counters across every XML file matching the pattern.""" totals = {"tests": 0, "failures": 0, "errors": 0, "skipped": 0} for path in sorted(glob(pattern)): parsed = _parse_junit(path) @@ -73,6 +79,7 @@ def _collect_junit_totals(pattern: str) -> dict[str, int]: def _read_exit_code(path: str) -> int: + """Read the quality-gate exit code, defaulting to failure if missing.""" try: with open(path, "r", encoding="utf-8") as handle: return int(handle.read().strip()) @@ -81,6 +88,7 @@ def _read_exit_code(path: str) -> int: def _load_summary(path: str) -> dict: + """Load the JSON quality-gate summary, returning an empty mapping on error.""" try: with open(path, "r", encoding="utf-8") as handle: return json.load(handle) @@ -88,7 +96,26 @@ def _load_summary(path: str) -> dict: return {} +def _summary_float(summary: dict, key: str) -> float: + """Extract a float-like value from the summary, defaulting to 0.0.""" + value = summary.get(key) + if isinstance(value, (int, float)): + return float(value) + return 0.0 + + +def _summary_int(summary: dict, key: str) -> int: + """Extract an int-like value from the summary, defaulting to 0.""" + value = summary.get(key) + if isinstance(value, int): + return value + if isinstance(value, float): + return int(value) + return 0 + + def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str, str]) -> float: + """Return the current counter value for a labeled metric if present.""" text = _read_text(f"{pushgateway_url.rstrip('/')}/metrics") for line in text.splitlines(): if not line.startswith(metric + "{"): @@ -114,7 +141,10 @@ def _build_payload( branch: str, build_number: str, summary: dict | None = None, + workspace_line_coverage_percent: float = 0.0, + source_lines_over_500: int = 0, ) -> str: + """Build the Pushgateway payload for the current suite run.""" passed = max(tests["tests"] - tests["failures"] - tests["errors"] - tests["skipped"], 0) build_labels = _label_str( { @@ -137,6 +167,10 @@ def _build_payload( f'titan_iac_quality_gate_run_status{{suite="{suite}",status="failed"}} {1 if status == "failed" else 0}', "# TYPE titan_iac_quality_gate_build_info gauge", f"titan_iac_quality_gate_build_info{build_labels} 1", + "# TYPE platform_quality_gate_workspace_line_coverage_percent gauge", + f'platform_quality_gate_workspace_line_coverage_percent{{suite="{suite}"}} {workspace_line_coverage_percent:.3f}', + "# TYPE platform_quality_gate_source_lines_over_500_total gauge", + f'platform_quality_gate_source_lines_over_500_total{{suite="{suite}"}} {source_lines_over_500}', ] results = summary.get("results", []) if isinstance(summary, dict) else [] if results: @@ -153,6 +187,7 @@ def _build_payload( def main() -> int: + """Publish the quality-gate metrics and print a compact run summary.""" suite = os.getenv("SUITE_NAME", "titan-iac") pushgateway_url = os.getenv("PUSHGATEWAY_URL", "http://platform-quality-gateway.monitoring.svc.cluster.local:9091") job_name = os.getenv("QUALITY_GATE_JOB_NAME", "platform-quality-ci") @@ -166,6 +201,8 @@ def main() -> int: exit_code = _read_exit_code(exit_code_path) status = "ok" if exit_code == 0 else "failed" summary = _load_summary(summary_path) + workspace_line_coverage_percent = _summary_float(summary, "workspace_line_coverage_percent") + source_lines_over_500 = _summary_int(summary, "source_lines_over_500") ok_count = int( _fetch_existing_counter( @@ -195,6 +232,8 @@ def main() -> int: branch=branch, build_number=build_number, summary=summary, + workspace_line_coverage_percent=workspace_line_coverage_percent, + source_lines_over_500=source_lines_over_500, ) push_url = f"{pushgateway_url.rstrip('/')}/metrics/job/{job_name}/suite/{suite}" _post_text(push_url, payload) @@ -209,6 +248,8 @@ def main() -> int: "ok_count": ok_count, "failed_count": failed_count, "checks_recorded": len(summary.get("results", [])) if isinstance(summary, dict) else 0, + "workspace_line_coverage_percent": workspace_line_coverage_percent, + "source_lines_over_500": source_lines_over_500, } print(json.dumps(summary, sort_keys=True)) return 0 diff --git a/testing/quality_coverage.py b/testing/quality_coverage.py index a778bf56..78d6649f 100644 --- a/testing/quality_coverage.py +++ b/testing/quality_coverage.py @@ -8,6 +8,7 @@ from typing import Any def _load_percentages(xml_path: Path, root: Path) -> dict[str, float]: + """Load per-file line-rate percentages from a Cobertura XML report.""" tree = ET.parse(xml_path) xml_root = tree.getroot() source_roots = [ @@ -36,7 +37,11 @@ def _load_percentages(xml_path: Path, root: Path) -> dict[str, float]: def run_check(contract: dict[str, Any], root: Path, xml_path: Path) -> list[str]: - """Return human-readable issues for tracked files below the coverage floor.""" + """Return human-readable issues for tracked files below the coverage floor. + + The report is intentionally per-file so a single weak module cannot hide + behind aggregate suite coverage. + """ if not xml_path.exists(): return [f"coverage xml missing: {xml_path.relative_to(root)}"] @@ -56,3 +61,25 @@ def run_check(contract: dict[str, Any], root: Path, xml_path: Path) -> list[str] ) return issues + + +def compute_workspace_line_coverage( + contract: dict[str, Any], + root: Path, + xml_path: Path, +) -> float: + """Compute mean line coverage across tracked files present in the XML.""" + + if not xml_path.exists(): + return 0.0 + + percentages = _load_percentages(xml_path, root) + samples: list[float] = [] + for relative_path in contract.get("coverage", {}).get("tracked_files", []): + normalized = relative_path.replace("\\", "/") + percent = percentages.get(normalized) + if percent is not None: + samples.append(percent) + if not samples: + return 0.0 + return round(sum(samples) / len(samples), 3) diff --git a/testing/quality_gate.py b/testing/quality_gate.py index cff6a659..1c864381 100644 --- a/testing/quality_gate.py +++ b/testing/quality_gate.py @@ -11,9 +11,15 @@ from pathlib import Path from typing import Any from testing.quality_contract import load_contract -from testing.quality_coverage import run_check as run_coverage_check +from testing.quality_coverage import ( + compute_workspace_line_coverage, + run_check as run_coverage_check, +) from testing.quality_docs import run_check as run_docs_check -from testing.quality_hygiene import run_check as run_hygiene_check +from testing.quality_hygiene import ( + count_files_over_line_limit, + run_check as run_hygiene_check, +) RUFF_SELECT = ["F", "B", "SIM", "C4", "UP"] @@ -21,14 +27,17 @@ RUFF_IGNORE = ["B017", "UP015", "UP035"] def _status_from_issues(issues: list[str]) -> str: + """Map an issue list to the gate status string.""" return "ok" if not issues else "failed" def _result(name: str, description: str, status: str, **extra: Any) -> dict[str, Any]: + """Build a JSON-serializable result record for the summary payload.""" return {"name": name, "description": description, "status": status, **extra} def _run_ruff(contract: dict[str, Any], root: Path) -> dict[str, Any]: + """Run the pedantic ruff subset against the contract's lint paths.""" command = [ sys.executable, "-m", @@ -53,6 +62,7 @@ def _run_ruff(contract: dict[str, Any], root: Path) -> dict[str, Any]: def _run_pytest_suite(root: Path, suite_name: str, suite: dict[str, Any]) -> dict[str, Any]: + """Run a pytest suite and collect junit/coverage artifact locations.""" junit_path = root / suite["junit"] junit_path.parent.mkdir(parents=True, exist_ok=True) command = [ @@ -146,11 +156,23 @@ def run_profile( results.append(_run_pytest_suite(root, check_name, suite)) status = "ok" if all(item["status"] == "ok" for item in results) else "failed" + workspace_line_coverage_percent = 0.0 + if "coverage" in profiles[profile_name]: + unit_suite = contract.get("pytest_suites", {}).get("unit", {}) + coverage_xml_rel = unit_suite.get("coverage_xml") + if coverage_xml_rel: + workspace_line_coverage_percent = compute_workspace_line_coverage( + contract, + root, + root / coverage_xml_rel, + ) return { "profile": profile_name, "status": status, "results": results, "manual_scripts": contract.get("manual_scripts", []), + "workspace_line_coverage_percent": workspace_line_coverage_percent, + "source_lines_over_500": count_files_over_line_limit(contract, root), } diff --git a/testing/quality_hygiene.py b/testing/quality_hygiene.py index de93fc6d..629f2096 100644 --- a/testing/quality_hygiene.py +++ b/testing/quality_hygiene.py @@ -9,6 +9,7 @@ from typing import Any def _expand_globs(root: Path, patterns: Iterable[str]) -> list[Path]: + """Expand a set of relative glob patterns to unique file paths.""" matched: set[Path] = set() for pattern in patterns: matched.update(path for path in root.glob(pattern) if path.is_file()) @@ -29,9 +30,24 @@ def run_check(contract: dict[str, Any], root: Path) -> list[str]: for rule in config.get("naming_rules", []): pattern = re.compile(rule["pattern"]) for path in _expand_globs(root, [rule["glob"]]): + if path.name == "conftest.py": + continue if not pattern.match(path.name): issues.append( f"naming rule failed ({rule['description']}): {path.relative_to(root)}" ) return issues + + +def count_files_over_line_limit(contract: dict[str, Any], root: Path) -> int: + """Return the number of managed files that exceed the configured LOC cap.""" + + config = contract.get("hygiene", {}) + max_lines = int(config.get("max_lines", 500)) + count = 0 + for path in _expand_globs(root, config.get("line_limit_globs", [])): + line_count = sum(1 for _ in path.open("r", encoding="utf-8")) + if line_count > max_lines: + count += 1 + return count diff --git a/testing/tests/test_publish_test_metrics.py b/testing/tests/test_publish_test_metrics.py index e6a993da..4263a018 100644 --- a/testing/tests/test_publish_test_metrics.py +++ b/testing/tests/test_publish_test_metrics.py @@ -1,3 +1,5 @@ +"""Unit tests for the Pushgateway publisher glue code.""" + from __future__ import annotations import json @@ -179,11 +181,15 @@ def test_build_payload_includes_summary_metrics(): {"name": "unit", "status": "failed"}, ] }, + workspace_line_coverage_percent=97.125, + source_lines_over_500=3, ) assert 'platform_quality_gate_runs_total{suite="titan-iac",status="ok"} 7' in payload assert 'titan_iac_quality_gate_checks_total{suite="titan-iac",check="docs",result="ok"} 1' in payload assert 'titan_iac_quality_gate_checks_total{suite="titan-iac",check="unit",result="failed"} 1' in payload + assert 'platform_quality_gate_workspace_line_coverage_percent{suite="titan-iac"} 97.125' in payload + assert 'platform_quality_gate_source_lines_over_500_total{suite="titan-iac"} 3' in payload def test_build_payload_skips_incomplete_results(): @@ -215,7 +221,13 @@ def test_main_uses_quality_gate_summary_and_junit_glob(tmp_path: Path, monkeypat ) (build_dir / "quality-gate.rc").write_text("1\n", encoding="utf-8") (build_dir / "quality-gate-summary.json").write_text( - json.dumps({"results": [{"name": "docs", "status": "ok"}, {"name": "glue", "status": "failed"}]}), + json.dumps( + { + "results": [{"name": "docs", "status": "ok"}, {"name": "glue", "status": "failed"}], + "workspace_line_coverage_percent": 96.4321, + "source_lines_over_500": 2, + } + ), encoding="utf-8", ) @@ -239,6 +251,8 @@ def test_main_uses_quality_gate_summary_and_junit_glob(tmp_path: Path, monkeypat assert posted["url"].endswith("/metrics/job/platform-quality-ci/suite/titan-iac") assert 'titan_iac_quality_gate_tests_total{suite="titan-iac",result="failed"} 1' in posted["payload"] assert 'titan_iac_quality_gate_checks_total{suite="titan-iac",check="glue",result="failed"} 1' in posted["payload"] + assert 'platform_quality_gate_workspace_line_coverage_percent{suite="titan-iac"} 96.432' in posted["payload"] + assert 'platform_quality_gate_source_lines_over_500_total{suite="titan-iac"} 2' in posted["payload"] def test_main_marks_successful_run(tmp_path: Path, monkeypatch, capsys): @@ -262,3 +276,5 @@ def test_main_marks_successful_run(tmp_path: Path, monkeypatch, capsys): assert rc == 0 assert summary["status"] == "ok" assert summary["checks_recorded"] == 0 + assert summary["workspace_line_coverage_percent"] == 0.0 + assert summary["source_lines_over_500"] == 0 diff --git a/testing/tests/test_quality_gate.py b/testing/tests/test_quality_gate.py index a1a1b918..256a78e6 100644 --- a/testing/tests/test_quality_gate.py +++ b/testing/tests/test_quality_gate.py @@ -1,3 +1,5 @@ +"""Unit tests for the top-level quality-gate runner.""" + from __future__ import annotations from pathlib import Path @@ -52,6 +54,8 @@ def test_run_profile_aggregates_internal_and_pytest_results(tmp_path: Path, monk "unit", "coverage", ] + assert summary["workspace_line_coverage_percent"] == 0.0 + assert summary["source_lines_over_500"] == 0 assert calls[0][0][:3] == [quality_gate.sys.executable, "-m", "ruff"] assert any(result.get("junit") == "build/junit-unit.xml" for result in summary["results"])