quality-gate: emit workspace coverage and LOC gauges
This commit is contained in:
parent
629df65c7b
commit
26b8f23426
@ -12,20 +12,24 @@ import xml.etree.ElementTree as ET
|
||||
|
||||
|
||||
def _escape_label(value: str) -> str:
|
||||
"""Escape a Prometheus label value without changing its content."""
|
||||
return value.replace("\\", "\\\\").replace("\n", "\\n").replace('"', '\\"')
|
||||
|
||||
|
||||
def _label_str(labels: dict[str, str]) -> str:
|
||||
"""Render a stable Prometheus label set from a mapping."""
|
||||
parts = [f'{key}="{_escape_label(val)}"' for key, val in labels.items() if val]
|
||||
return "{" + ",".join(parts) + "}" if parts else ""
|
||||
|
||||
|
||||
def _read_text(url: str) -> str:
|
||||
"""Fetch a plain-text response body from the given URL."""
|
||||
with urllib.request.urlopen(url, timeout=10) as response:
|
||||
return response.read().decode("utf-8")
|
||||
|
||||
|
||||
def _post_text(url: str, payload: str) -> None:
|
||||
"""POST a plain-text payload and fail on any 4xx/5xx response."""
|
||||
request = urllib.request.Request(
|
||||
url,
|
||||
data=payload.encode("utf-8"),
|
||||
@ -38,6 +42,7 @@ def _post_text(url: str, payload: str) -> None:
|
||||
|
||||
|
||||
def _parse_junit(path: str) -> dict[str, int]:
|
||||
"""Parse a JUnit XML file into aggregate test counters."""
|
||||
if not os.path.exists(path):
|
||||
return {"tests": 0, "failures": 0, "errors": 0, "skipped": 0}
|
||||
|
||||
@ -64,6 +69,7 @@ def _parse_junit(path: str) -> dict[str, int]:
|
||||
|
||||
|
||||
def _collect_junit_totals(pattern: str) -> dict[str, int]:
|
||||
"""Sum JUnit counters across every XML file matching the pattern."""
|
||||
totals = {"tests": 0, "failures": 0, "errors": 0, "skipped": 0}
|
||||
for path in sorted(glob(pattern)):
|
||||
parsed = _parse_junit(path)
|
||||
@ -73,6 +79,7 @@ def _collect_junit_totals(pattern: str) -> dict[str, int]:
|
||||
|
||||
|
||||
def _read_exit_code(path: str) -> int:
|
||||
"""Read the quality-gate exit code, defaulting to failure if missing."""
|
||||
try:
|
||||
with open(path, "r", encoding="utf-8") as handle:
|
||||
return int(handle.read().strip())
|
||||
@ -81,6 +88,7 @@ def _read_exit_code(path: str) -> int:
|
||||
|
||||
|
||||
def _load_summary(path: str) -> dict:
|
||||
"""Load the JSON quality-gate summary, returning an empty mapping on error."""
|
||||
try:
|
||||
with open(path, "r", encoding="utf-8") as handle:
|
||||
return json.load(handle)
|
||||
@ -88,7 +96,26 @@ def _load_summary(path: str) -> dict:
|
||||
return {}
|
||||
|
||||
|
||||
def _summary_float(summary: dict, key: str) -> float:
|
||||
"""Extract a float-like value from the summary, defaulting to 0.0."""
|
||||
value = summary.get(key)
|
||||
if isinstance(value, (int, float)):
|
||||
return float(value)
|
||||
return 0.0
|
||||
|
||||
|
||||
def _summary_int(summary: dict, key: str) -> int:
|
||||
"""Extract an int-like value from the summary, defaulting to 0."""
|
||||
value = summary.get(key)
|
||||
if isinstance(value, int):
|
||||
return value
|
||||
if isinstance(value, float):
|
||||
return int(value)
|
||||
return 0
|
||||
|
||||
|
||||
def _fetch_existing_counter(pushgateway_url: str, metric: str, labels: dict[str, str]) -> float:
|
||||
"""Return the current counter value for a labeled metric if present."""
|
||||
text = _read_text(f"{pushgateway_url.rstrip('/')}/metrics")
|
||||
for line in text.splitlines():
|
||||
if not line.startswith(metric + "{"):
|
||||
@ -114,7 +141,10 @@ def _build_payload(
|
||||
branch: str,
|
||||
build_number: str,
|
||||
summary: dict | None = None,
|
||||
workspace_line_coverage_percent: float = 0.0,
|
||||
source_lines_over_500: int = 0,
|
||||
) -> str:
|
||||
"""Build the Pushgateway payload for the current suite run."""
|
||||
passed = max(tests["tests"] - tests["failures"] - tests["errors"] - tests["skipped"], 0)
|
||||
build_labels = _label_str(
|
||||
{
|
||||
@ -137,6 +167,10 @@ def _build_payload(
|
||||
f'titan_iac_quality_gate_run_status{{suite="{suite}",status="failed"}} {1 if status == "failed" else 0}',
|
||||
"# TYPE titan_iac_quality_gate_build_info gauge",
|
||||
f"titan_iac_quality_gate_build_info{build_labels} 1",
|
||||
"# TYPE platform_quality_gate_workspace_line_coverage_percent gauge",
|
||||
f'platform_quality_gate_workspace_line_coverage_percent{{suite="{suite}"}} {workspace_line_coverage_percent:.3f}',
|
||||
"# TYPE platform_quality_gate_source_lines_over_500_total gauge",
|
||||
f'platform_quality_gate_source_lines_over_500_total{{suite="{suite}"}} {source_lines_over_500}',
|
||||
]
|
||||
results = summary.get("results", []) if isinstance(summary, dict) else []
|
||||
if results:
|
||||
@ -153,6 +187,7 @@ def _build_payload(
|
||||
|
||||
|
||||
def main() -> int:
|
||||
"""Publish the quality-gate metrics and print a compact run summary."""
|
||||
suite = os.getenv("SUITE_NAME", "titan-iac")
|
||||
pushgateway_url = os.getenv("PUSHGATEWAY_URL", "http://platform-quality-gateway.monitoring.svc.cluster.local:9091")
|
||||
job_name = os.getenv("QUALITY_GATE_JOB_NAME", "platform-quality-ci")
|
||||
@ -166,6 +201,8 @@ def main() -> int:
|
||||
exit_code = _read_exit_code(exit_code_path)
|
||||
status = "ok" if exit_code == 0 else "failed"
|
||||
summary = _load_summary(summary_path)
|
||||
workspace_line_coverage_percent = _summary_float(summary, "workspace_line_coverage_percent")
|
||||
source_lines_over_500 = _summary_int(summary, "source_lines_over_500")
|
||||
|
||||
ok_count = int(
|
||||
_fetch_existing_counter(
|
||||
@ -195,6 +232,8 @@ def main() -> int:
|
||||
branch=branch,
|
||||
build_number=build_number,
|
||||
summary=summary,
|
||||
workspace_line_coverage_percent=workspace_line_coverage_percent,
|
||||
source_lines_over_500=source_lines_over_500,
|
||||
)
|
||||
push_url = f"{pushgateway_url.rstrip('/')}/metrics/job/{job_name}/suite/{suite}"
|
||||
_post_text(push_url, payload)
|
||||
@ -209,6 +248,8 @@ def main() -> int:
|
||||
"ok_count": ok_count,
|
||||
"failed_count": failed_count,
|
||||
"checks_recorded": len(summary.get("results", [])) if isinstance(summary, dict) else 0,
|
||||
"workspace_line_coverage_percent": workspace_line_coverage_percent,
|
||||
"source_lines_over_500": source_lines_over_500,
|
||||
}
|
||||
print(json.dumps(summary, sort_keys=True))
|
||||
return 0
|
||||
|
||||
@ -8,6 +8,7 @@ from typing import Any
|
||||
|
||||
|
||||
def _load_percentages(xml_path: Path, root: Path) -> dict[str, float]:
|
||||
"""Load per-file line-rate percentages from a Cobertura XML report."""
|
||||
tree = ET.parse(xml_path)
|
||||
xml_root = tree.getroot()
|
||||
source_roots = [
|
||||
@ -36,7 +37,11 @@ def _load_percentages(xml_path: Path, root: Path) -> dict[str, float]:
|
||||
|
||||
|
||||
def run_check(contract: dict[str, Any], root: Path, xml_path: Path) -> list[str]:
|
||||
"""Return human-readable issues for tracked files below the coverage floor."""
|
||||
"""Return human-readable issues for tracked files below the coverage floor.
|
||||
|
||||
The report is intentionally per-file so a single weak module cannot hide
|
||||
behind aggregate suite coverage.
|
||||
"""
|
||||
if not xml_path.exists():
|
||||
return [f"coverage xml missing: {xml_path.relative_to(root)}"]
|
||||
|
||||
@ -56,3 +61,25 @@ def run_check(contract: dict[str, Any], root: Path, xml_path: Path) -> list[str]
|
||||
)
|
||||
|
||||
return issues
|
||||
|
||||
|
||||
def compute_workspace_line_coverage(
|
||||
contract: dict[str, Any],
|
||||
root: Path,
|
||||
xml_path: Path,
|
||||
) -> float:
|
||||
"""Compute mean line coverage across tracked files present in the XML."""
|
||||
|
||||
if not xml_path.exists():
|
||||
return 0.0
|
||||
|
||||
percentages = _load_percentages(xml_path, root)
|
||||
samples: list[float] = []
|
||||
for relative_path in contract.get("coverage", {}).get("tracked_files", []):
|
||||
normalized = relative_path.replace("\\", "/")
|
||||
percent = percentages.get(normalized)
|
||||
if percent is not None:
|
||||
samples.append(percent)
|
||||
if not samples:
|
||||
return 0.0
|
||||
return round(sum(samples) / len(samples), 3)
|
||||
|
||||
@ -11,9 +11,15 @@ from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from testing.quality_contract import load_contract
|
||||
from testing.quality_coverage import run_check as run_coverage_check
|
||||
from testing.quality_coverage import (
|
||||
compute_workspace_line_coverage,
|
||||
run_check as run_coverage_check,
|
||||
)
|
||||
from testing.quality_docs import run_check as run_docs_check
|
||||
from testing.quality_hygiene import run_check as run_hygiene_check
|
||||
from testing.quality_hygiene import (
|
||||
count_files_over_line_limit,
|
||||
run_check as run_hygiene_check,
|
||||
)
|
||||
|
||||
|
||||
RUFF_SELECT = ["F", "B", "SIM", "C4", "UP"]
|
||||
@ -21,14 +27,17 @@ RUFF_IGNORE = ["B017", "UP015", "UP035"]
|
||||
|
||||
|
||||
def _status_from_issues(issues: list[str]) -> str:
|
||||
"""Map an issue list to the gate status string."""
|
||||
return "ok" if not issues else "failed"
|
||||
|
||||
|
||||
def _result(name: str, description: str, status: str, **extra: Any) -> dict[str, Any]:
|
||||
"""Build a JSON-serializable result record for the summary payload."""
|
||||
return {"name": name, "description": description, "status": status, **extra}
|
||||
|
||||
|
||||
def _run_ruff(contract: dict[str, Any], root: Path) -> dict[str, Any]:
|
||||
"""Run the pedantic ruff subset against the contract's lint paths."""
|
||||
command = [
|
||||
sys.executable,
|
||||
"-m",
|
||||
@ -53,6 +62,7 @@ def _run_ruff(contract: dict[str, Any], root: Path) -> dict[str, Any]:
|
||||
|
||||
|
||||
def _run_pytest_suite(root: Path, suite_name: str, suite: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Run a pytest suite and collect junit/coverage artifact locations."""
|
||||
junit_path = root / suite["junit"]
|
||||
junit_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
command = [
|
||||
@ -146,11 +156,23 @@ def run_profile(
|
||||
results.append(_run_pytest_suite(root, check_name, suite))
|
||||
|
||||
status = "ok" if all(item["status"] == "ok" for item in results) else "failed"
|
||||
workspace_line_coverage_percent = 0.0
|
||||
if "coverage" in profiles[profile_name]:
|
||||
unit_suite = contract.get("pytest_suites", {}).get("unit", {})
|
||||
coverage_xml_rel = unit_suite.get("coverage_xml")
|
||||
if coverage_xml_rel:
|
||||
workspace_line_coverage_percent = compute_workspace_line_coverage(
|
||||
contract,
|
||||
root,
|
||||
root / coverage_xml_rel,
|
||||
)
|
||||
return {
|
||||
"profile": profile_name,
|
||||
"status": status,
|
||||
"results": results,
|
||||
"manual_scripts": contract.get("manual_scripts", []),
|
||||
"workspace_line_coverage_percent": workspace_line_coverage_percent,
|
||||
"source_lines_over_500": count_files_over_line_limit(contract, root),
|
||||
}
|
||||
|
||||
|
||||
|
||||
@ -9,6 +9,7 @@ from typing import Any
|
||||
|
||||
|
||||
def _expand_globs(root: Path, patterns: Iterable[str]) -> list[Path]:
|
||||
"""Expand a set of relative glob patterns to unique file paths."""
|
||||
matched: set[Path] = set()
|
||||
for pattern in patterns:
|
||||
matched.update(path for path in root.glob(pattern) if path.is_file())
|
||||
@ -29,9 +30,24 @@ def run_check(contract: dict[str, Any], root: Path) -> list[str]:
|
||||
for rule in config.get("naming_rules", []):
|
||||
pattern = re.compile(rule["pattern"])
|
||||
for path in _expand_globs(root, [rule["glob"]]):
|
||||
if path.name == "conftest.py":
|
||||
continue
|
||||
if not pattern.match(path.name):
|
||||
issues.append(
|
||||
f"naming rule failed ({rule['description']}): {path.relative_to(root)}"
|
||||
)
|
||||
|
||||
return issues
|
||||
|
||||
|
||||
def count_files_over_line_limit(contract: dict[str, Any], root: Path) -> int:
|
||||
"""Return the number of managed files that exceed the configured LOC cap."""
|
||||
|
||||
config = contract.get("hygiene", {})
|
||||
max_lines = int(config.get("max_lines", 500))
|
||||
count = 0
|
||||
for path in _expand_globs(root, config.get("line_limit_globs", [])):
|
||||
line_count = sum(1 for _ in path.open("r", encoding="utf-8"))
|
||||
if line_count > max_lines:
|
||||
count += 1
|
||||
return count
|
||||
|
||||
@ -1,3 +1,5 @@
|
||||
"""Unit tests for the Pushgateway publisher glue code."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
@ -179,11 +181,15 @@ def test_build_payload_includes_summary_metrics():
|
||||
{"name": "unit", "status": "failed"},
|
||||
]
|
||||
},
|
||||
workspace_line_coverage_percent=97.125,
|
||||
source_lines_over_500=3,
|
||||
)
|
||||
|
||||
assert 'platform_quality_gate_runs_total{suite="titan-iac",status="ok"} 7' in payload
|
||||
assert 'titan_iac_quality_gate_checks_total{suite="titan-iac",check="docs",result="ok"} 1' in payload
|
||||
assert 'titan_iac_quality_gate_checks_total{suite="titan-iac",check="unit",result="failed"} 1' in payload
|
||||
assert 'platform_quality_gate_workspace_line_coverage_percent{suite="titan-iac"} 97.125' in payload
|
||||
assert 'platform_quality_gate_source_lines_over_500_total{suite="titan-iac"} 3' in payload
|
||||
|
||||
|
||||
def test_build_payload_skips_incomplete_results():
|
||||
@ -215,7 +221,13 @@ def test_main_uses_quality_gate_summary_and_junit_glob(tmp_path: Path, monkeypat
|
||||
)
|
||||
(build_dir / "quality-gate.rc").write_text("1\n", encoding="utf-8")
|
||||
(build_dir / "quality-gate-summary.json").write_text(
|
||||
json.dumps({"results": [{"name": "docs", "status": "ok"}, {"name": "glue", "status": "failed"}]}),
|
||||
json.dumps(
|
||||
{
|
||||
"results": [{"name": "docs", "status": "ok"}, {"name": "glue", "status": "failed"}],
|
||||
"workspace_line_coverage_percent": 96.4321,
|
||||
"source_lines_over_500": 2,
|
||||
}
|
||||
),
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
@ -239,6 +251,8 @@ def test_main_uses_quality_gate_summary_and_junit_glob(tmp_path: Path, monkeypat
|
||||
assert posted["url"].endswith("/metrics/job/platform-quality-ci/suite/titan-iac")
|
||||
assert 'titan_iac_quality_gate_tests_total{suite="titan-iac",result="failed"} 1' in posted["payload"]
|
||||
assert 'titan_iac_quality_gate_checks_total{suite="titan-iac",check="glue",result="failed"} 1' in posted["payload"]
|
||||
assert 'platform_quality_gate_workspace_line_coverage_percent{suite="titan-iac"} 96.432' in posted["payload"]
|
||||
assert 'platform_quality_gate_source_lines_over_500_total{suite="titan-iac"} 2' in posted["payload"]
|
||||
|
||||
|
||||
def test_main_marks_successful_run(tmp_path: Path, monkeypatch, capsys):
|
||||
@ -262,3 +276,5 @@ def test_main_marks_successful_run(tmp_path: Path, monkeypatch, capsys):
|
||||
assert rc == 0
|
||||
assert summary["status"] == "ok"
|
||||
assert summary["checks_recorded"] == 0
|
||||
assert summary["workspace_line_coverage_percent"] == 0.0
|
||||
assert summary["source_lines_over_500"] == 0
|
||||
|
||||
@ -1,3 +1,5 @@
|
||||
"""Unit tests for the top-level quality-gate runner."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
@ -52,6 +54,8 @@ def test_run_profile_aggregates_internal_and_pytest_results(tmp_path: Path, monk
|
||||
"unit",
|
||||
"coverage",
|
||||
]
|
||||
assert summary["workspace_line_coverage_percent"] == 0.0
|
||||
assert summary["source_lines_over_500"] == 0
|
||||
assert calls[0][0][:3] == [quality_gate.sys.executable, "-m", "ruff"]
|
||||
assert any(result.get("junit") == "build/junit-unit.xml" for result in summary["results"])
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user