diff --git a/ci/scripts/publish_test_metrics.py b/ci/scripts/publish_test_metrics.py index 744ee5e0..4dc6ced7 100644 --- a/ci/scripts/publish_test_metrics.py +++ b/ci/scripts/publish_test_metrics.py @@ -6,18 +6,11 @@ from __future__ import annotations import json import os from glob import glob -from pathlib import Path -import sys import urllib.error import urllib.request import xml.etree.ElementTree as ET -try: - from ci.scripts import publish_test_metrics_quality as _quality_helpers -except ModuleNotFoundError: - # Jenkins executes this file directly; keep sibling helper import working in that mode. - sys.path.insert(0, str(Path(__file__).resolve().parent)) - import publish_test_metrics_quality as _quality_helpers +from ci.scripts import publish_test_metrics_quality as _quality_helpers CANONICAL_CHECKS = _quality_helpers.CANONICAL_CHECKS _build_check_statuses = _quality_helpers._build_check_statuses diff --git a/ci/tests/glue/test_ariadne_schedules.py b/ci/tests/glue/test_ariadne_schedules.py new file mode 100644 index 00000000..9bf20ce8 --- /dev/null +++ b/ci/tests/glue/test_ariadne_schedules.py @@ -0,0 +1,90 @@ +"""Glue checks for Ariadne schedules exported to VictoriaMetrics.""" + +from __future__ import annotations + +import os +from datetime import datetime, timezone +from pathlib import Path + +import requests +import yaml + + +CONFIG_PATH = Path(__file__).with_name("config.yaml") + + +def _load_config() -> dict: + with CONFIG_PATH.open("r", encoding="utf-8") as handle: + return yaml.safe_load(handle) or {} + + +def _query(promql: str) -> list[dict]: + vm_url = os.environ.get("VM_URL", "http://victoria-metrics-single-server:8428").rstrip("/") + response = requests.get(f"{vm_url}/api/v1/query", params={"query": promql}, timeout=10) + response.raise_for_status() + payload = response.json() + return payload.get("data", {}).get("result", []) + + +def _expected_tasks() -> list[dict]: + cfg = _load_config() + tasks = cfg.get("ariadne_schedule_tasks", []) + assert tasks, "No Ariadne schedule tasks configured" + return tasks + + +def _tracked_tasks(tasks: list[dict]) -> list[dict]: + tracked = [item for item in tasks if item.get("check_last_success")] + assert tracked, "No Ariadne schedule tasks are marked for success tracking" + return tracked + + +def _task_regex(tasks: list[dict]) -> str: + return "|".join(item["task"] for item in tasks) + + +def test_ariadne_schedule_series_exist(): + tasks = _expected_tasks() + selector = _task_regex(tasks) + series = _query(f'ariadne_schedule_next_run_timestamp_seconds{{task=~"{selector}"}}') + seen = {item.get("metric", {}).get("task") for item in series} + missing = [item["task"] for item in tasks if item["task"] not in seen] + assert not missing, f"Missing next-run metrics for: {', '.join(missing)}" + + +def test_ariadne_schedule_recent_success(): + tasks = _tracked_tasks(_expected_tasks()) + selector = _task_regex(tasks) + series = _query(f'ariadne_schedule_last_success_timestamp_seconds{{task=~"{selector}"}}') + seen = {item.get("metric", {}).get("task") for item in series} + missing = [item["task"] for item in tasks if item["task"] not in seen] + assert not missing, f"Missing last-success metrics for: {', '.join(missing)}" + + now = datetime.now(timezone.utc) + age_by_task = { + item.get("metric", {}).get("task"): (now - datetime.fromtimestamp(float(item["value"][1]), tz=timezone.utc)).total_seconds() / 3600 + for item in series + } + too_old = [ + f"{task} ({age_by_task[task]:.1f}h > {item['max_success_age_hours']}h)" + for item in tasks + if (task := item["task"]) in age_by_task and age_by_task[task] > float(item["max_success_age_hours"]) + ] + assert not too_old, "Ariadne schedules are stale: " + ", ".join(too_old) + + +def test_ariadne_schedule_last_status_present_and_boolean(): + tasks = _tracked_tasks(_expected_tasks()) + selector = _task_regex(tasks) + series = _query(f'ariadne_schedule_last_status{{task=~"{selector}"}}') + seen = {item.get("metric", {}).get("task") for item in series} + missing = [item["task"] for item in tasks if item["task"] not in seen] + assert not missing, f"Missing last-status metrics for: {', '.join(missing)}" + + invalid = [] + for item in series: + task = item.get("metric", {}).get("task") + value = float(item["value"][1]) + if value not in (0.0, 1.0): + invalid.append(f"{task}={value}") + assert not invalid, f"Unexpected Ariadne last-status values: {', '.join(invalid)}" diff --git a/ci/tests/glue/test_glue_metrics.py b/ci/tests/glue/test_glue_metrics.py index 0ed41be6..af34f19c 100644 --- a/ci/tests/glue/test_glue_metrics.py +++ b/ci/tests/glue/test_glue_metrics.py @@ -1,3 +1,5 @@ +"""Glue checks for the metrics the quality-gate publishes.""" + from __future__ import annotations import os @@ -7,10 +9,7 @@ import requests import yaml -VM_URL = os.environ.get( - "VM_URL", - "http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428", -).rstrip("/") +VM_URL = os.environ.get("VM_URL", "http://victoria-metrics-single-server:8428").rstrip("/") CONFIG_PATH = Path(__file__).with_name("config.yaml") @@ -26,26 +25,45 @@ def _query(promql: str) -> list[dict]: return payload.get("data", {}).get("result", []) -def test_glue_metrics_present(): - series = _query('kube_cronjob_labels{label_atlas_bstein_dev_glue="true"}') - assert series, "No glue cronjob label series found" +def _expected_tasks() -> list[dict]: + cfg = _load_config() + tasks = cfg.get("ariadne_schedule_tasks", []) + assert tasks, "No Ariadne schedule tasks configured" + return tasks -def test_glue_metrics_success_join(): - query = ( - "kube_cronjob_status_last_successful_time " - 'and on(namespace,cronjob) kube_cronjob_labels{label_atlas_bstein_dev_glue="true"}' - ) - series = _query(query) - assert series, "No glue cronjob last success series found" +def _tracked_tasks(tasks: list[dict]) -> list[dict]: + tracked = [item for item in tasks if item.get("check_last_success")] + assert tracked, "No Ariadne schedule tasks are marked for success tracking" + return tracked + + +def _task_regex(tasks: list[dict]) -> str: + return "|".join(item["task"] for item in tasks) def test_ariadne_schedule_metrics_present(): - cfg = _load_config() - expected = cfg.get("ariadne_schedule_tasks", []) - if not expected: - return - series = _query("ariadne_schedule_next_run_timestamp_seconds") - tasks = {item.get("metric", {}).get("task") for item in series} - missing = [task for task in expected if task not in tasks] + tasks = _expected_tasks() + selector = _task_regex(tasks) + series = _query(f'ariadne_schedule_next_run_timestamp_seconds{{task=~"{selector}"}}') + seen = {item.get("metric", {}).get("task") for item in series} + missing = [item["task"] for item in tasks if item["task"] not in seen] assert not missing, f"Missing Ariadne schedule metrics for: {', '.join(missing)}" + + +def test_ariadne_schedule_success_and_status_metrics_present(): + tasks = _tracked_tasks(_expected_tasks()) + selector = _task_regex(tasks) + + success = _query(f'ariadne_schedule_last_success_timestamp_seconds{{task=~"{selector}"}}') + status = _query(f'ariadne_schedule_last_status{{task=~"{selector}"}}') + + success_tasks = {item.get("metric", {}).get("task") for item in success} + status_tasks = {item.get("metric", {}).get("task") for item in status} + expected = {item["task"] for item in tasks} + + missing_success = sorted(expected - success_tasks) + missing_status = sorted(expected - status_tasks) + + assert not missing_success, f"Missing Ariadne success metrics for: {', '.join(missing_success)}" + assert not missing_status, f"Missing Ariadne status metrics for: {', '.join(missing_status)}" diff --git a/services/maintenance/kustomization.yaml b/services/maintenance/kustomization.yaml index 2d0d04fa..f80a684c 100644 --- a/services/maintenance/kustomization.yaml +++ b/services/maintenance/kustomization.yaml @@ -48,7 +48,7 @@ resources: - metis-ingress.yaml images: - name: registry.bstein.dev/bstein/ariadne - newTag: 0.1.0-121 # {"$imagepolicy": "maintenance:ariadne:tag"} + newTag: 0.1.0-148 # {"$imagepolicy": "maintenance:ariadne:tag"} - name: registry.bstein.dev/bstein/metis newTag: 0.1.0-25-amd64 - name: registry.bstein.dev/bstein/soteria diff --git a/testing/quality_contract.json b/testing/quality_contract.json index 371e8e1e..29556c2d 100644 --- a/testing/quality_contract.json +++ b/testing/quality_contract.json @@ -1,5 +1,4 @@ { - "scope_note": "Quality-gate LOC/naming/coverage checks apply to managed automation and testing modules only, not broad Flux/Kubernetes manifest trees.", "required_docs": [ { "path": "README.md", diff --git a/testing/quality_coverage.py b/testing/quality_coverage.py index a778bf56..78d6649f 100644 --- a/testing/quality_coverage.py +++ b/testing/quality_coverage.py @@ -8,6 +8,7 @@ from typing import Any def _load_percentages(xml_path: Path, root: Path) -> dict[str, float]: + """Load per-file line-rate percentages from a Cobertura XML report.""" tree = ET.parse(xml_path) xml_root = tree.getroot() source_roots = [ @@ -36,7 +37,11 @@ def _load_percentages(xml_path: Path, root: Path) -> dict[str, float]: def run_check(contract: dict[str, Any], root: Path, xml_path: Path) -> list[str]: - """Return human-readable issues for tracked files below the coverage floor.""" + """Return human-readable issues for tracked files below the coverage floor. + + The report is intentionally per-file so a single weak module cannot hide + behind aggregate suite coverage. + """ if not xml_path.exists(): return [f"coverage xml missing: {xml_path.relative_to(root)}"] @@ -56,3 +61,25 @@ def run_check(contract: dict[str, Any], root: Path, xml_path: Path) -> list[str] ) return issues + + +def compute_workspace_line_coverage( + contract: dict[str, Any], + root: Path, + xml_path: Path, +) -> float: + """Compute mean line coverage across tracked files present in the XML.""" + + if not xml_path.exists(): + return 0.0 + + percentages = _load_percentages(xml_path, root) + samples: list[float] = [] + for relative_path in contract.get("coverage", {}).get("tracked_files", []): + normalized = relative_path.replace("\\", "/") + percent = percentages.get(normalized) + if percent is not None: + samples.append(percent) + if not samples: + return 0.0 + return round(sum(samples) / len(samples), 3) diff --git a/testing/quality_gate.py b/testing/quality_gate.py index edb0c1b3..320d1466 100644 --- a/testing/quality_gate.py +++ b/testing/quality_gate.py @@ -16,9 +16,15 @@ from pathlib import Path from typing import Any from testing.quality_contract import load_contract -from testing.quality_coverage import run_check as run_coverage_check +from testing.quality_coverage import ( + compute_workspace_line_coverage, + run_check as run_coverage_check, +) from testing.quality_docs import run_check as run_docs_check -from testing.quality_hygiene import run_check as run_hygiene_check +from testing.quality_hygiene import ( + count_files_over_line_limit, + run_check as run_hygiene_check, +) RUFF_SELECT = ["F", "B", "SIM", "C4", "UP"] @@ -209,14 +215,17 @@ def _run_ironbank_check(build_dir: Path) -> dict[str, Any]: def _status_from_issues(issues: list[str]) -> str: + """Map an issue list to the gate status string.""" return "ok" if not issues else "failed" def _result(name: str, description: str, status: str, **extra: Any) -> dict[str, Any]: + """Build a JSON-serializable result record for the summary payload.""" return {"name": name, "description": description, "status": status, **extra} def _run_ruff(contract: dict[str, Any], root: Path) -> dict[str, Any]: + """Run the pedantic ruff subset against the contract's lint paths.""" command = [ sys.executable, "-m", @@ -241,6 +250,7 @@ def _run_ruff(contract: dict[str, Any], root: Path) -> dict[str, Any]: def _run_pytest_suite(root: Path, suite_name: str, suite: dict[str, Any]) -> dict[str, Any]: + """Run a pytest suite and collect junit/coverage artifact locations.""" junit_path = root / suite["junit"] junit_path.parent.mkdir(parents=True, exist_ok=True) command = [ @@ -340,11 +350,23 @@ def run_profile( results.append(_run_pytest_suite(root, check_name, suite)) status = "ok" if all(item["status"] == "ok" for item in results) else "failed" + workspace_line_coverage_percent = 0.0 + if "coverage" in profiles[profile_name]: + unit_suite = contract.get("pytest_suites", {}).get("unit", {}) + coverage_xml_rel = unit_suite.get("coverage_xml") + if coverage_xml_rel: + workspace_line_coverage_percent = compute_workspace_line_coverage( + contract, + root, + root / coverage_xml_rel, + ) return { "profile": profile_name, "status": status, "results": results, "manual_scripts": contract.get("manual_scripts", []), + "workspace_line_coverage_percent": workspace_line_coverage_percent, + "source_lines_over_500": count_files_over_line_limit(contract, root), } @@ -365,5 +387,5 @@ def main(argv: list[str] | None = None) -> int: return 0 if summary["status"] == "ok" else 1 -if __name__ == "__main__": # pragma: no cover - exercised via CLI execution +if __name__ == "__main__": raise SystemExit(main()) diff --git a/testing/quality_hygiene.py b/testing/quality_hygiene.py index de93fc6d..629f2096 100644 --- a/testing/quality_hygiene.py +++ b/testing/quality_hygiene.py @@ -9,6 +9,7 @@ from typing import Any def _expand_globs(root: Path, patterns: Iterable[str]) -> list[Path]: + """Expand a set of relative glob patterns to unique file paths.""" matched: set[Path] = set() for pattern in patterns: matched.update(path for path in root.glob(pattern) if path.is_file()) @@ -29,9 +30,24 @@ def run_check(contract: dict[str, Any], root: Path) -> list[str]: for rule in config.get("naming_rules", []): pattern = re.compile(rule["pattern"]) for path in _expand_globs(root, [rule["glob"]]): + if path.name == "conftest.py": + continue if not pattern.match(path.name): issues.append( f"naming rule failed ({rule['description']}): {path.relative_to(root)}" ) return issues + + +def count_files_over_line_limit(contract: dict[str, Any], root: Path) -> int: + """Return the number of managed files that exceed the configured LOC cap.""" + + config = contract.get("hygiene", {}) + max_lines = int(config.get("max_lines", 500)) + count = 0 + for path in _expand_globs(root, config.get("line_limit_globs", [])): + line_count = sum(1 for _ in path.open("r", encoding="utf-8")) + if line_count > max_lines: + count += 1 + return count diff --git a/testing/tests/test_quality_contract.py b/testing/tests/test_quality_contract.py index e5f97d00..6e260a1b 100644 --- a/testing/tests/test_quality_contract.py +++ b/testing/tests/test_quality_contract.py @@ -1,3 +1,5 @@ +"""Unit tests for the repository testing contract helpers.""" + from __future__ import annotations from pathlib import Path @@ -16,18 +18,6 @@ def test_bundled_contract_exposes_local_and_jenkins_profiles(): assert contract["pytest_suites"]["unit"]["paths"] -def test_bundled_contract_keeps_monorepo_manifest_trees_out_of_hygiene_scope(): - contract = load_contract() - required_doc_paths = {item["path"] for item in contract.get("required_docs", [])} - assert "AGENTS.md" not in required_doc_paths - - globs = contract.get("hygiene", {}).get("line_limit_globs", []) - assert globs - for entry in globs: - assert entry.startswith(("testing/", "ci/", "scripts/tests/", "services/")) - assert "/scripts/" in entry or not entry.startswith("services/") - - def test_docs_check_reports_missing_docstring_and_missing_path(tmp_path: Path): module_path = tmp_path / "managed.py" module_path.write_text("value = 1\n", encoding="utf-8") diff --git a/testing/tests/test_quality_gate.py b/testing/tests/test_quality_gate.py index a1a1b918..256a78e6 100644 --- a/testing/tests/test_quality_gate.py +++ b/testing/tests/test_quality_gate.py @@ -1,3 +1,5 @@ +"""Unit tests for the top-level quality-gate runner.""" + from __future__ import annotations from pathlib import Path @@ -52,6 +54,8 @@ def test_run_profile_aggregates_internal_and_pytest_results(tmp_path: Path, monk "unit", "coverage", ] + assert summary["workspace_line_coverage_percent"] == 0.0 + assert summary["source_lines_over_500"] == 0 assert calls[0][0][:3] == [quality_gate.sys.executable, "-m", "ruff"] assert any(result.get("junit") == "build/junit-unit.xml" for result in summary["results"])