"""Check that labelled glue CronJobs have completed successfully recently."""
from __future__ import annotations
|
|
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
import yaml
|
|
from kubernetes import client, config
|
|
|
|
|
|
# The settings file is looked up in the same directory as this module.
CONFIG_PATH = Path(__file__).parent / "config.yaml"
|
|
|
|
|
|
def _load_config() -> dict:
    """Parse the YAML file at ``CONFIG_PATH``.

    Returns:
        Whatever ``yaml.safe_load`` produces for the file, or an empty dict
        when that result is falsy (for example, an empty document).
    """
    with CONFIG_PATH.open("r", encoding="utf-8") as stream:
        parsed = yaml.safe_load(stream)
    # Normalise a falsy parse result so callers can always use ``.get``.
    return parsed if parsed else {}
|
|
|
|
|
|
def _load_kube():
    """Initialise the Kubernetes client configuration.

    The in-cluster loader is tried first. If it raises
    ``config.ConfigException``, the kubeconfig loader is used instead.
    """
    try:
        config.load_incluster_config()
    except config.ConfigException:
        # In-cluster configuration was rejected; fall back to kubeconfig.
        config.load_kube_config()
|
|
|
|
|
|
def test_glue_cronjobs_recent_success():
    """Verify each glue CronJob is allow-listed as suspended or succeeded recently.

    CronJobs are selected across all namespaces by the label
    ``atlas.bstein.dev/glue=true``. Settings come from ``_load_config()``:

    * ``max_success_age_hours`` -- maximum allowed age of the last successful
      run, in hours (48 when missing or null).
    * ``allow_suspended`` -- ``namespace/name`` entries that are allowed to be
      suspended (empty when missing or null).

    The test fails when no CronJob matches the label, when a suspended CronJob
    is not allow-listed, or when an active CronJob has no recorded success or
    its last success is older than the limit.
    """
    cfg = _load_config()
    # ``dict.get(key, default)`` only applies the default when the key is
    # absent; a key present with a null YAML value yields None, so handle
    # that explicitly instead of letting ``int(None)`` / ``set(None)`` raise.
    configured_max_age = cfg.get("max_success_age_hours")
    max_age_hours = 48 if configured_max_age is None else int(configured_max_age)
    allow_suspended = set(cfg.get("allow_suspended") or [])

    _load_kube()
    batch = client.BatchV1Api()
    cronjobs = batch.list_cron_job_for_all_namespaces(label_selector="atlas.bstein.dev/glue=true").items

    assert cronjobs, "No glue cronjobs found with atlas.bstein.dev/glue=true"

    now = datetime.now(timezone.utc)
    for cronjob in cronjobs:
        name = f"{cronjob.metadata.namespace}/{cronjob.metadata.name}"
        if cronjob.spec.suspend:
            assert name in allow_suspended, f"{name} is suspended but not in allow_suspended"
            continue

        # ``status`` is optional on the client model; a CronJob without one
        # should fail with the descriptive assertion below, not AttributeError.
        last_success = getattr(cronjob.status, "last_successful_time", None)
        assert last_success is not None, f"{name} has no lastSuccessfulTime"
        age_hours = (now - last_success).total_seconds() / 3600
        assert age_hours <= max_age_hours, f"{name} last success {age_hours:.1f}h ago"
|