30 lines
869 B
Python
30 lines
869 B
Python
from __future__ import annotations
|
|
|
|
import os
|
|
|
|
import requests
|
|
|
|
|
|
# Base URL of the VictoriaMetrics HTTP API; overridable via the VM_URL environment
# variable. Trailing slashes are stripped so endpoint paths can be appended directly.
VM_URL = os.getenv(
    "VM_URL",
    "http://victoria-metrics-single-server:8428",
).rstrip("/")
|
|
|
|
|
|
def _query(promql: str) -> list[dict]:
    """Evaluate a PromQL expression via the ``/api/v1/query`` endpoint.

    Sends a GET request to ``{VM_URL}/api/v1/query`` with the expression as the
    ``query`` parameter and a 10-second timeout.

    Args:
        promql: The PromQL expression to evaluate.

    Returns:
        The ``data.result`` list from the JSON response body, or an empty
        list when either key is missing.

    Raises:
        requests.HTTPError: If the server responds with an error status code.
    """
    endpoint = f"{VM_URL}/api/v1/query"
    resp = requests.get(endpoint, params={"query": promql}, timeout=10)
    resp.raise_for_status()

    body = resp.json()
    data = body.get("data", {})
    return data.get("result", [])
|
|
|
|
|
|
def test_glue_metrics_present():
    """Check that at least one glue-labelled CronJob label series is returned."""
    selector = 'kube_cronjob_labels{label_atlas_bstein_dev_glue="true"}'
    matches = _query(selector)
    assert matches, "No glue cronjob label series found"
|
|
|
|
|
|
def test_glue_metrics_success_join():
    """Check that last-success series exist for glue-labelled CronJobs."""
    # `and on(namespace,cronjob)` keeps only the last-successful-time series whose
    # (namespace, cronjob) pair also appears among the glue-labelled label series.
    left = "kube_cronjob_status_last_successful_time "
    right = 'and on(namespace,cronjob) kube_cronjob_labels{label_atlas_bstein_dev_glue="true"}'
    joined = _query(left + right)
    assert joined, "No glue cronjob last success series found"
|