Merge remote-tracking branch 'origin/main'

This commit is contained in:
jenkins 2026-04-21 09:23:03 -03:00
commit 7cd40d457d
10 changed files with 206 additions and 47 deletions

View File

@ -6,18 +6,11 @@ from __future__ import annotations
import json import json
import os import os
from glob import glob from glob import glob
from pathlib import Path
import sys
import urllib.error import urllib.error
import urllib.request import urllib.request
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
try: from ci.scripts import publish_test_metrics_quality as _quality_helpers
from ci.scripts import publish_test_metrics_quality as _quality_helpers
except ModuleNotFoundError:
# Jenkins executes this file directly; keep sibling helper import working in that mode.
sys.path.insert(0, str(Path(__file__).resolve().parent))
import publish_test_metrics_quality as _quality_helpers
CANONICAL_CHECKS = _quality_helpers.CANONICAL_CHECKS CANONICAL_CHECKS = _quality_helpers.CANONICAL_CHECKS
_build_check_statuses = _quality_helpers._build_check_statuses _build_check_statuses = _quality_helpers._build_check_statuses

View File

@ -0,0 +1,90 @@
"""Glue checks for Ariadne schedules exported to VictoriaMetrics."""
from __future__ import annotations
import os
from datetime import datetime, timezone
from pathlib import Path
import requests
import yaml
CONFIG_PATH = Path(__file__).with_name("config.yaml")
def _load_config() -> dict:
with CONFIG_PATH.open("r", encoding="utf-8") as handle:
return yaml.safe_load(handle) or {}
def _query(promql: str) -> list[dict]:
vm_url = os.environ.get("VM_URL", "http://victoria-metrics-single-server:8428").rstrip("/")
response = requests.get(f"{vm_url}/api/v1/query", params={"query": promql}, timeout=10)
response.raise_for_status()
payload = response.json()
return payload.get("data", {}).get("result", [])
def _expected_tasks() -> list[dict]:
cfg = _load_config()
tasks = cfg.get("ariadne_schedule_tasks", [])
assert tasks, "No Ariadne schedule tasks configured"
return tasks
def _tracked_tasks(tasks: list[dict]) -> list[dict]:
tracked = [item for item in tasks if item.get("check_last_success")]
assert tracked, "No Ariadne schedule tasks are marked for success tracking"
return tracked
def _task_regex(tasks: list[dict]) -> str:
return "|".join(item["task"] for item in tasks)
def test_ariadne_schedule_series_exist():
tasks = _expected_tasks()
selector = _task_regex(tasks)
series = _query(f'ariadne_schedule_next_run_timestamp_seconds{{task=~"{selector}"}}')
seen = {item.get("metric", {}).get("task") for item in series}
missing = [item["task"] for item in tasks if item["task"] not in seen]
assert not missing, f"Missing next-run metrics for: {', '.join(missing)}"
def test_ariadne_schedule_recent_success():
tasks = _tracked_tasks(_expected_tasks())
selector = _task_regex(tasks)
series = _query(f'ariadne_schedule_last_success_timestamp_seconds{{task=~"{selector}"}}')
seen = {item.get("metric", {}).get("task") for item in series}
missing = [item["task"] for item in tasks if item["task"] not in seen]
assert not missing, f"Missing last-success metrics for: {', '.join(missing)}"
now = datetime.now(timezone.utc)
age_by_task = {
item.get("metric", {}).get("task"): (now - datetime.fromtimestamp(float(item["value"][1]), tz=timezone.utc)).total_seconds() / 3600
for item in series
}
too_old = [
f"{task} ({age_by_task[task]:.1f}h > {item['max_success_age_hours']}h)"
for item in tasks
if (task := item["task"]) in age_by_task and age_by_task[task] > float(item["max_success_age_hours"])
]
assert not too_old, "Ariadne schedules are stale: " + ", ".join(too_old)
def test_ariadne_schedule_last_status_present_and_boolean():
tasks = _tracked_tasks(_expected_tasks())
selector = _task_regex(tasks)
series = _query(f'ariadne_schedule_last_status{{task=~"{selector}"}}')
seen = {item.get("metric", {}).get("task") for item in series}
missing = [item["task"] for item in tasks if item["task"] not in seen]
assert not missing, f"Missing last-status metrics for: {', '.join(missing)}"
invalid = []
for item in series:
task = item.get("metric", {}).get("task")
value = float(item["value"][1])
if value not in (0.0, 1.0):
invalid.append(f"{task}={value}")
assert not invalid, f"Unexpected Ariadne last-status values: {', '.join(invalid)}"

View File

@ -1,3 +1,5 @@
"""Glue checks for the metrics the quality-gate publishes."""
from __future__ import annotations from __future__ import annotations
import os import os
@ -7,10 +9,7 @@ import requests
import yaml import yaml
VM_URL = os.environ.get( VM_URL = os.environ.get("VM_URL", "http://victoria-metrics-single-server:8428").rstrip("/")
"VM_URL",
"http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428",
).rstrip("/")
CONFIG_PATH = Path(__file__).with_name("config.yaml") CONFIG_PATH = Path(__file__).with_name("config.yaml")
@ -26,26 +25,45 @@ def _query(promql: str) -> list[dict]:
return payload.get("data", {}).get("result", []) return payload.get("data", {}).get("result", [])
def test_glue_metrics_present(): def _expected_tasks() -> list[dict]:
series = _query('kube_cronjob_labels{label_atlas_bstein_dev_glue="true"}') cfg = _load_config()
assert series, "No glue cronjob label series found" tasks = cfg.get("ariadne_schedule_tasks", [])
assert tasks, "No Ariadne schedule tasks configured"
return tasks
def test_glue_metrics_success_join(): def _tracked_tasks(tasks: list[dict]) -> list[dict]:
query = ( tracked = [item for item in tasks if item.get("check_last_success")]
"kube_cronjob_status_last_successful_time " assert tracked, "No Ariadne schedule tasks are marked for success tracking"
'and on(namespace,cronjob) kube_cronjob_labels{label_atlas_bstein_dev_glue="true"}' return tracked
)
series = _query(query)
assert series, "No glue cronjob last success series found" def _task_regex(tasks: list[dict]) -> str:
return "|".join(item["task"] for item in tasks)
def test_ariadne_schedule_metrics_present(): def test_ariadne_schedule_metrics_present():
cfg = _load_config() tasks = _expected_tasks()
expected = cfg.get("ariadne_schedule_tasks", []) selector = _task_regex(tasks)
if not expected: series = _query(f'ariadne_schedule_next_run_timestamp_seconds{{task=~"{selector}"}}')
return seen = {item.get("metric", {}).get("task") for item in series}
series = _query("ariadne_schedule_next_run_timestamp_seconds") missing = [item["task"] for item in tasks if item["task"] not in seen]
tasks = {item.get("metric", {}).get("task") for item in series}
missing = [task for task in expected if task not in tasks]
assert not missing, f"Missing Ariadne schedule metrics for: {', '.join(missing)}" assert not missing, f"Missing Ariadne schedule metrics for: {', '.join(missing)}"
def test_ariadne_schedule_success_and_status_metrics_present():
tasks = _tracked_tasks(_expected_tasks())
selector = _task_regex(tasks)
success = _query(f'ariadne_schedule_last_success_timestamp_seconds{{task=~"{selector}"}}')
status = _query(f'ariadne_schedule_last_status{{task=~"{selector}"}}')
success_tasks = {item.get("metric", {}).get("task") for item in success}
status_tasks = {item.get("metric", {}).get("task") for item in status}
expected = {item["task"] for item in tasks}
missing_success = sorted(expected - success_tasks)
missing_status = sorted(expected - status_tasks)
assert not missing_success, f"Missing Ariadne success metrics for: {', '.join(missing_success)}"
assert not missing_status, f"Missing Ariadne status metrics for: {', '.join(missing_status)}"

View File

@ -48,7 +48,7 @@ resources:
- metis-ingress.yaml - metis-ingress.yaml
images: images:
- name: registry.bstein.dev/bstein/ariadne - name: registry.bstein.dev/bstein/ariadne
newTag: 0.1.0-121 # {"$imagepolicy": "maintenance:ariadne:tag"} newTag: 0.1.0-148 # {"$imagepolicy": "maintenance:ariadne:tag"}
- name: registry.bstein.dev/bstein/metis - name: registry.bstein.dev/bstein/metis
newTag: 0.1.0-25-amd64 newTag: 0.1.0-25-amd64
- name: registry.bstein.dev/bstein/soteria - name: registry.bstein.dev/bstein/soteria

View File

@ -1,5 +1,4 @@
{ {
"scope_note": "Quality-gate LOC/naming/coverage checks apply to managed automation and testing modules only, not broad Flux/Kubernetes manifest trees.",
"required_docs": [ "required_docs": [
{ {
"path": "README.md", "path": "README.md",

View File

@ -8,6 +8,7 @@ from typing import Any
def _load_percentages(xml_path: Path, root: Path) -> dict[str, float]: def _load_percentages(xml_path: Path, root: Path) -> dict[str, float]:
"""Load per-file line-rate percentages from a Cobertura XML report."""
tree = ET.parse(xml_path) tree = ET.parse(xml_path)
xml_root = tree.getroot() xml_root = tree.getroot()
source_roots = [ source_roots = [
@ -36,7 +37,11 @@ def _load_percentages(xml_path: Path, root: Path) -> dict[str, float]:
def run_check(contract: dict[str, Any], root: Path, xml_path: Path) -> list[str]: def run_check(contract: dict[str, Any], root: Path, xml_path: Path) -> list[str]:
"""Return human-readable issues for tracked files below the coverage floor.""" """Return human-readable issues for tracked files below the coverage floor.
The report is intentionally per-file so a single weak module cannot hide
behind aggregate suite coverage.
"""
if not xml_path.exists(): if not xml_path.exists():
return [f"coverage xml missing: {xml_path.relative_to(root)}"] return [f"coverage xml missing: {xml_path.relative_to(root)}"]
@ -56,3 +61,25 @@ def run_check(contract: dict[str, Any], root: Path, xml_path: Path) -> list[str]
) )
return issues return issues
def compute_workspace_line_coverage(
contract: dict[str, Any],
root: Path,
xml_path: Path,
) -> float:
"""Compute mean line coverage across tracked files present in the XML."""
if not xml_path.exists():
return 0.0
percentages = _load_percentages(xml_path, root)
samples: list[float] = []
for relative_path in contract.get("coverage", {}).get("tracked_files", []):
normalized = relative_path.replace("\\", "/")
percent = percentages.get(normalized)
if percent is not None:
samples.append(percent)
if not samples:
return 0.0
return round(sum(samples) / len(samples), 3)

View File

@ -16,9 +16,15 @@ from pathlib import Path
from typing import Any from typing import Any
from testing.quality_contract import load_contract from testing.quality_contract import load_contract
from testing.quality_coverage import run_check as run_coverage_check from testing.quality_coverage import (
compute_workspace_line_coverage,
run_check as run_coverage_check,
)
from testing.quality_docs import run_check as run_docs_check from testing.quality_docs import run_check as run_docs_check
from testing.quality_hygiene import run_check as run_hygiene_check from testing.quality_hygiene import (
count_files_over_line_limit,
run_check as run_hygiene_check,
)
RUFF_SELECT = ["F", "B", "SIM", "C4", "UP"] RUFF_SELECT = ["F", "B", "SIM", "C4", "UP"]
@ -209,14 +215,17 @@ def _run_ironbank_check(build_dir: Path) -> dict[str, Any]:
def _status_from_issues(issues: list[str]) -> str: def _status_from_issues(issues: list[str]) -> str:
"""Map an issue list to the gate status string."""
return "ok" if not issues else "failed" return "ok" if not issues else "failed"
def _result(name: str, description: str, status: str, **extra: Any) -> dict[str, Any]: def _result(name: str, description: str, status: str, **extra: Any) -> dict[str, Any]:
"""Build a JSON-serializable result record for the summary payload."""
return {"name": name, "description": description, "status": status, **extra} return {"name": name, "description": description, "status": status, **extra}
def _run_ruff(contract: dict[str, Any], root: Path) -> dict[str, Any]: def _run_ruff(contract: dict[str, Any], root: Path) -> dict[str, Any]:
"""Run the pedantic ruff subset against the contract's lint paths."""
command = [ command = [
sys.executable, sys.executable,
"-m", "-m",
@ -241,6 +250,7 @@ def _run_ruff(contract: dict[str, Any], root: Path) -> dict[str, Any]:
def _run_pytest_suite(root: Path, suite_name: str, suite: dict[str, Any]) -> dict[str, Any]: def _run_pytest_suite(root: Path, suite_name: str, suite: dict[str, Any]) -> dict[str, Any]:
"""Run a pytest suite and collect junit/coverage artifact locations."""
junit_path = root / suite["junit"] junit_path = root / suite["junit"]
junit_path.parent.mkdir(parents=True, exist_ok=True) junit_path.parent.mkdir(parents=True, exist_ok=True)
command = [ command = [
@ -340,11 +350,23 @@ def run_profile(
results.append(_run_pytest_suite(root, check_name, suite)) results.append(_run_pytest_suite(root, check_name, suite))
status = "ok" if all(item["status"] == "ok" for item in results) else "failed" status = "ok" if all(item["status"] == "ok" for item in results) else "failed"
workspace_line_coverage_percent = 0.0
if "coverage" in profiles[profile_name]:
unit_suite = contract.get("pytest_suites", {}).get("unit", {})
coverage_xml_rel = unit_suite.get("coverage_xml")
if coverage_xml_rel:
workspace_line_coverage_percent = compute_workspace_line_coverage(
contract,
root,
root / coverage_xml_rel,
)
return { return {
"profile": profile_name, "profile": profile_name,
"status": status, "status": status,
"results": results, "results": results,
"manual_scripts": contract.get("manual_scripts", []), "manual_scripts": contract.get("manual_scripts", []),
"workspace_line_coverage_percent": workspace_line_coverage_percent,
"source_lines_over_500": count_files_over_line_limit(contract, root),
} }
@ -365,5 +387,5 @@ def main(argv: list[str] | None = None) -> int:
return 0 if summary["status"] == "ok" else 1 return 0 if summary["status"] == "ok" else 1
if __name__ == "__main__": # pragma: no cover - exercised via CLI execution if __name__ == "__main__":
raise SystemExit(main()) raise SystemExit(main())

View File

@ -9,6 +9,7 @@ from typing import Any
def _expand_globs(root: Path, patterns: Iterable[str]) -> list[Path]: def _expand_globs(root: Path, patterns: Iterable[str]) -> list[Path]:
"""Expand a set of relative glob patterns to unique file paths."""
matched: set[Path] = set() matched: set[Path] = set()
for pattern in patterns: for pattern in patterns:
matched.update(path for path in root.glob(pattern) if path.is_file()) matched.update(path for path in root.glob(pattern) if path.is_file())
@ -29,9 +30,24 @@ def run_check(contract: dict[str, Any], root: Path) -> list[str]:
for rule in config.get("naming_rules", []): for rule in config.get("naming_rules", []):
pattern = re.compile(rule["pattern"]) pattern = re.compile(rule["pattern"])
for path in _expand_globs(root, [rule["glob"]]): for path in _expand_globs(root, [rule["glob"]]):
if path.name == "conftest.py":
continue
if not pattern.match(path.name): if not pattern.match(path.name):
issues.append( issues.append(
f"naming rule failed ({rule['description']}): {path.relative_to(root)}" f"naming rule failed ({rule['description']}): {path.relative_to(root)}"
) )
return issues return issues
def count_files_over_line_limit(contract: dict[str, Any], root: Path) -> int:
"""Return the number of managed files that exceed the configured LOC cap."""
config = contract.get("hygiene", {})
max_lines = int(config.get("max_lines", 500))
count = 0
for path in _expand_globs(root, config.get("line_limit_globs", [])):
line_count = sum(1 for _ in path.open("r", encoding="utf-8"))
if line_count > max_lines:
count += 1
return count

View File

@ -1,3 +1,5 @@
"""Unit tests for the repository testing contract helpers."""
from __future__ import annotations from __future__ import annotations
from pathlib import Path from pathlib import Path
@ -16,18 +18,6 @@ def test_bundled_contract_exposes_local_and_jenkins_profiles():
assert contract["pytest_suites"]["unit"]["paths"] assert contract["pytest_suites"]["unit"]["paths"]
def test_bundled_contract_keeps_monorepo_manifest_trees_out_of_hygiene_scope():
contract = load_contract()
required_doc_paths = {item["path"] for item in contract.get("required_docs", [])}
assert "AGENTS.md" not in required_doc_paths
globs = contract.get("hygiene", {}).get("line_limit_globs", [])
assert globs
for entry in globs:
assert entry.startswith(("testing/", "ci/", "scripts/tests/", "services/"))
assert "/scripts/" in entry or not entry.startswith("services/")
def test_docs_check_reports_missing_docstring_and_missing_path(tmp_path: Path): def test_docs_check_reports_missing_docstring_and_missing_path(tmp_path: Path):
module_path = tmp_path / "managed.py" module_path = tmp_path / "managed.py"
module_path.write_text("value = 1\n", encoding="utf-8") module_path.write_text("value = 1\n", encoding="utf-8")

View File

@ -1,3 +1,5 @@
"""Unit tests for the top-level quality-gate runner."""
from __future__ import annotations from __future__ import annotations
from pathlib import Path from pathlib import Path
@ -52,6 +54,8 @@ def test_run_profile_aggregates_internal_and_pytest_results(tmp_path: Path, monk
"unit", "unit",
"coverage", "coverage",
] ]
assert summary["workspace_line_coverage_percent"] == 0.0
assert summary["source_lines_over_500"] == 0
assert calls[0][0][:3] == [quality_gate.sys.executable, "-m", "ruff"] assert calls[0][0][:3] == [quality_gate.sys.executable, "-m", "ruff"]
assert any(result.get("junit") == "build/junit-unit.xml" for result in summary["results"]) assert any(result.get("junit") == "build/junit-unit.xml" for result in summary["results"])