from __future__ import annotations import os import requests VM_URL = os.environ.get("VM_URL", "http://victoria-metrics-single-server:8428").rstrip("/") def _query(promql: str) -> list[dict]: response = requests.get(f"{VM_URL}/api/v1/query", params={"query": promql}, timeout=10) response.raise_for_status() payload = response.json() return payload.get("data", {}).get("result", []) def test_glue_metrics_present(): series = _query('kube_cronjob_labels{label_atlas_bstein_dev_glue="true"}') assert series, "No glue cronjob label series found" def test_glue_metrics_success_join(): query = ( "kube_cronjob_status_last_successful_time " 'and on(namespace,cronjob) kube_cronjob_labels{label_atlas_bstein_dev_glue="true"}' ) series = _query(query) assert series, "No glue cronjob last success series found"