titan-iac/ci/tests/glue/test_glue_metrics.py

49 lines
1.5 KiB
Python
Raw Normal View History

2026-01-18 21:23:11 -03:00
from __future__ import annotations
import os
from pathlib import Path
2026-01-18 21:23:11 -03:00
import requests
import yaml
2026-01-18 21:23:11 -03:00
VM_URL = os.environ.get("VM_URL", "http://victoria-metrics-single-server:8428").rstrip("/")
CONFIG_PATH = Path(__file__).with_name("config.yaml")
def _load_config() -> dict:
with CONFIG_PATH.open("r", encoding="utf-8") as handle:
return yaml.safe_load(handle) or {}
2026-01-18 21:23:11 -03:00
def _query(promql: str) -> list[dict]:
response = requests.get(f"{VM_URL}/api/v1/query", params={"query": promql}, timeout=10)
response.raise_for_status()
payload = response.json()
return payload.get("data", {}).get("result", [])
def test_glue_metrics_present():
series = _query('kube_cronjob_labels{label_atlas_bstein_dev_glue="true"}')
assert series, "No glue cronjob label series found"
def test_glue_metrics_success_join():
query = (
"kube_cronjob_status_last_successful_time "
'and on(namespace,cronjob) kube_cronjob_labels{label_atlas_bstein_dev_glue="true"}'
)
series = _query(query)
assert series, "No glue cronjob last success series found"
def test_ariadne_schedule_metrics_present():
cfg = _load_config()
expected = cfg.get("ariadne_schedule_tasks", [])
if not expected:
return
series = _query("ariadne_schedule_next_run_timestamp_seconds")
tasks = {item.get("metric", {}).get("task") for item in series}
missing = [task for task in expected if task not in tasks]
assert not missing, f"Missing Ariadne schedule metrics for: {', '.join(missing)}"