ariadne: add schedule-owned metis token sync and platform probe

This commit is contained in:
Brad Stein 2026-04-10 22:35:33 -03:00
parent 06759399fb
commit aa7098efad
7 changed files with 526 additions and 0 deletions

View File

@ -26,7 +26,9 @@ from .services.mailu_events import mailu_events
from .services.nextcloud import nextcloud
from .services.image_sweeper import image_sweeper
from .services.metis import metis
from .services.metis_token_sync import metis_token_sync
from .services.opensearch_prune import prune_indices
from .services.platform_quality_probe import platform_quality_probe
from .services.pod_cleaner import clean_finished_pods
from .services.vaultwarden_sync import run_vaultwarden_sync
from .services.vault import vault
@ -315,6 +317,16 @@ def _startup() -> None:
settings.metis_sentinel_watch_cron,
lambda: metis.watch_sentinel(),
)
scheduler.add_task(
"schedule.metis_k3s_token_sync",
settings.metis_k3s_token_sync_cron,
lambda: metis_token_sync.run(wait=True),
)
scheduler.add_task(
"schedule.platform_quality_suite_probe",
settings.platform_quality_suite_probe_cron,
lambda: platform_quality_probe.run(wait=True),
)
scheduler.add_task(
"schedule.vault_k8s_auth",
settings.vault_k8s_auth_cron,
@ -368,6 +380,8 @@ def _startup() -> None:
"opensearch_prune_cron": settings.opensearch_prune_cron,
"image_sweeper_cron": settings.image_sweeper_cron,
"metis_sentinel_watch_cron": settings.metis_sentinel_watch_cron,
"metis_k3s_token_sync_cron": settings.metis_k3s_token_sync_cron,
"platform_quality_suite_probe_cron": settings.platform_quality_suite_probe_cron,
"vault_k8s_auth_cron": settings.vault_k8s_auth_cron,
"vault_oidc_cron": settings.vault_oidc_cron,
"comms_guest_name_cron": settings.comms_guest_name_cron,

View File

@ -0,0 +1,159 @@
from __future__ import annotations
from dataclasses import dataclass
import time
from typing import Any
from ..k8s.client import get_json, post_json
from ..settings import settings
from ..utils.logging import get_logger
# Module-scoped structured logger; records are tagged with this module's dotted path.
logger = get_logger(__name__)
_SYNC_SCRIPT = """
set -eu
token="$(tr -d '\n' < /host/var/lib/rancher/k3s/server/token)"
jwt="$(cat /var/run/secrets/kubernetes.io/serviceaccount/token)"
VAULT_TOKEN="$(vault write -field=token auth/kubernetes/login role="${VAULT_K8S_ROLE}" jwt="${jwt}")"
export VAULT_TOKEN
vault kv put kv/atlas/maintenance/metis-runtime k3s_token="${token}"
""".strip()
@dataclass(frozen=True)
class MetisTokenSyncResult:
    """Immutable outcome record for one metis token-sync Job run.

    Inputs: the Job name plus the terminal state derived from the Kubernetes
    Job status counters.
    Outputs: a stable value object consumed by scheduler logs/metrics so
    operators can confirm whether token sync completed ("ok"), failed
    ("error"), or was still in flight when polling timed out ("running").
    """

    # Name of the Kubernetes Job that was created and polled.
    job: str
    # One of "ok", "error", or "running".
    status: str
class MetisTokenSyncService:
    """Synchronize the metis k3s server token into Vault via one-shot Jobs.

    Inputs: scheduler invocations and runtime settings for namespace, role,
    and node placement.
    Outputs: per-run status confirming whether Ariadne successfully synced
    the k3s server token into Vault.
    """

    def _job_payload(self, job_name: str) -> dict[str, Any]:
        """Build the batch/v1 Job manifest for a single token-sync run."""
        # The sync container runs the Vault CLI as root so it can read the
        # host-mounted k3s server token.
        sync_container: dict[str, Any] = {
            "name": "sync",
            "image": settings.metis_token_sync_image,
            "imagePullPolicy": "IfNotPresent",
            "command": ["/bin/sh", "-c"],
            "args": [_SYNC_SCRIPT],
            "env": [
                {"name": "VAULT_ADDR", "value": settings.metis_token_sync_vault_addr},
                {
                    "name": "VAULT_K8S_ROLE",
                    "value": settings.metis_token_sync_vault_k8s_role,
                },
            ],
            "securityContext": {"runAsUser": 0},
            "volumeMounts": [
                {
                    "name": "k3s-server",
                    "mountPath": "/host/var/lib/rancher/k3s/server",
                    "readOnly": True,
                }
            ],
        }
        # Pinned to the control-plane node that owns the k3s server state, so
        # the pod tolerates both control-plane taint spellings.
        pod_spec: dict[str, Any] = {
            "serviceAccountName": settings.metis_token_sync_service_account,
            "restartPolicy": "OnFailure",
            "nodeName": settings.metis_token_sync_node_name,
            "tolerations": [
                {
                    "key": "node-role.kubernetes.io/control-plane",
                    "operator": "Exists",
                    "effect": "NoSchedule",
                },
                {
                    "key": "node-role.kubernetes.io/master",
                    "operator": "Exists",
                    "effect": "NoSchedule",
                },
            ],
            "containers": [sync_container],
            "volumes": [
                {
                    "name": "k3s-server",
                    "hostPath": {"path": "/var/lib/rancher/k3s/server"},
                }
            ],
        }
        return {
            "apiVersion": "batch/v1",
            "kind": "Job",
            "metadata": {
                "name": job_name,
                "namespace": settings.metis_token_sync_namespace,
                "labels": {
                    "app": "metis-k3s-token-sync",
                    "atlas.bstein.dev/trigger": "ariadne",
                },
            },
            "spec": {
                "backoffLimit": 1,
                "ttlSecondsAfterFinished": settings.metis_token_sync_job_ttl_sec,
                "template": {"spec": pod_spec},
            },
        }

    def _wait_for_completion(self, job_name: str, timeout_sec: float) -> MetisTokenSyncResult:
        """Poll the Job until it succeeds, fails, or the timeout elapses.

        Returns status "running" when the deadline passes without a terminal
        state, leaving the Job itself untouched.
        """
        job_path = (
            f"/apis/batch/v1/namespaces/{settings.metis_token_sync_namespace}/jobs/{job_name}"
        )
        deadline = time.time() + timeout_sec
        while time.time() < deadline:
            raw_status = get_json(job_path).get("status")
            counters = raw_status if isinstance(raw_status, dict) else {}
            if int(counters.get("succeeded") or 0) > 0:
                return MetisTokenSyncResult(job=job_name, status="ok")
            if int(counters.get("failed") or 0) > 0:
                return MetisTokenSyncResult(job=job_name, status="error")
            time.sleep(2)
        return MetisTokenSyncResult(job=job_name, status="running")

    def run(self, wait: bool = True) -> dict[str, Any]:
        """Launch and optionally wait on a metis token-sync job.

        Inputs: `wait` to control synchronous verification.
        Outputs: a JSON-serializable status payload that the scheduler records
        in metrics/event history for operator visibility.
        Raises: RuntimeError when waiting and the Job did not finish "ok".
        """
        # Timestamp suffix keeps repeated triggers from colliding on Job name.
        requested_name = f"metis-k3s-token-sync-{int(time.time())}"
        response = post_json(
            f"/apis/batch/v1/namespaces/{settings.metis_token_sync_namespace}/jobs",
            self._job_payload(requested_name),
        )
        # Prefer the server-assigned name, falling back to what we requested.
        job_name = response.get("metadata", {}).get("name", requested_name)
        logger.info(
            "metis token sync job triggered",
            extra={"event": "metis_token_sync_trigger", "job": job_name},
        )
        if not wait:
            return {"job": job_name, "status": "queued"}
        outcome = self._wait_for_completion(job_name, settings.metis_token_sync_wait_timeout_sec)
        if outcome.status == "ok":
            return {"job": outcome.job, "status": outcome.status}
        logger.error(
            "metis token sync job incomplete",
            extra={"event": "metis_token_sync_incomplete", "job": job_name, "status": outcome.status},
        )
        raise RuntimeError(f"metis token sync job {job_name} {outcome.status}")
# Module-level singleton; the app startup imports this instance to register its cron task.
metis_token_sync = MetisTokenSyncService()

View File

@ -0,0 +1,136 @@
from __future__ import annotations
from dataclasses import dataclass
import time
from typing import Any
from ..k8s.client import get_json, post_json
from ..settings import settings
from ..utils.logging import get_logger
# Module-scoped structured logger; records are tagged with this module's dotted path.
logger = get_logger(__name__)
@dataclass(frozen=True)
class PlatformQualityProbeResult:
    """Immutable outcome record for one platform-quality probe Job.

    Inputs: the Job name and the terminal state derived from Kubernetes Job
    status counters while polling.
    Outputs: a stable value object for scheduler logs and metrics — status is
    "ok" on success, "error" on failure, or "running" if polling timed out.
    """

    # Name of the Kubernetes Job that was created and polled.
    job: str
    # One of "ok", "error", or "running".
    status: str
class PlatformQualityProbeService:
    """Own the platform quality-suite probe as an Ariadne-triggered one-shot Job.

    Inputs: scheduler invocations plus settings that define namespace, image,
    probe script ConfigMap, and Pushgateway endpoint.
    Outputs: structured run status so operators can verify probe freshness
    without relying on standalone CronJob ownership.
    """

    def _job_payload(self, job_name: str) -> dict[str, Any]:
        """Build the batch/v1 Job manifest for one probe execution."""
        # The probe container runs the mounted shell script and pushes results
        # to the configured Pushgateway.
        probe_container: dict[str, Any] = {
            "name": "probe",
            "image": settings.platform_quality_probe_image,
            "imagePullPolicy": "IfNotPresent",
            "command": ["/bin/sh", "/scripts/platform_quality_suite_probe.sh"],
            "env": [
                {
                    "name": "PUSHGATEWAY_URL",
                    "value": settings.platform_quality_probe_pushgateway_url,
                },
                {
                    "name": "HTTP_TIMEOUT_SECONDS",
                    "value": str(settings.platform_quality_probe_http_timeout_sec),
                },
            ],
            "volumeMounts": [
                {"name": "probe-script", "mountPath": "/scripts", "readOnly": True},
            ],
        }
        pod_template: dict[str, Any] = {
            "metadata": {"labels": {"app": "platform-quality-suite-probe"}},
            "spec": {
                "restartPolicy": "Never",
                "containers": [probe_container],
                "volumes": [
                    {
                        "name": "probe-script",
                        "configMap": {
                            "name": settings.platform_quality_probe_script_configmap,
                            # 365 decimal == 0555 octal: script mounted executable.
                            "defaultMode": 365,
                        },
                    }
                ],
            },
        }
        return {
            "apiVersion": "batch/v1",
            "kind": "Job",
            "metadata": {
                "name": job_name,
                "namespace": settings.platform_quality_probe_namespace,
                "labels": {
                    "app": "platform-quality-suite-probe",
                    "atlas.bstein.dev/trigger": "ariadne",
                },
            },
            "spec": {
                # backoffLimit 0: a failed probe run is reported, not retried.
                "backoffLimit": 0,
                "ttlSecondsAfterFinished": settings.platform_quality_probe_job_ttl_sec,
                "template": pod_template,
            },
        }

    def _wait_for_completion(self, job_name: str, timeout_sec: float) -> PlatformQualityProbeResult:
        """Poll the Job until it succeeds, fails, or the timeout elapses.

        Returns status "running" when the deadline passes without a terminal
        state, leaving the Job itself untouched.
        """
        job_path = (
            f"/apis/batch/v1/namespaces/{settings.platform_quality_probe_namespace}/jobs/{job_name}"
        )
        deadline = time.time() + timeout_sec
        while time.time() < deadline:
            raw_status = get_json(job_path).get("status")
            counters = raw_status if isinstance(raw_status, dict) else {}
            if int(counters.get("succeeded") or 0) > 0:
                return PlatformQualityProbeResult(job=job_name, status="ok")
            if int(counters.get("failed") or 0) > 0:
                return PlatformQualityProbeResult(job=job_name, status="error")
            time.sleep(2)
        return PlatformQualityProbeResult(job=job_name, status="running")

    def run(self, wait: bool = True) -> dict[str, Any]:
        """Launch and optionally wait on the quality-suite probe job.

        Inputs: `wait` controls whether the scheduler blocks until completion.
        Outputs: job identity and status for metrics/events so Grafana can
        report the latest probe outcome.
        Raises: RuntimeError when waiting and the Job did not finish "ok".
        """
        # Timestamp suffix keeps repeated triggers from colliding on Job name.
        requested_name = f"platform-quality-suite-probe-{int(time.time())}"
        response = post_json(
            f"/apis/batch/v1/namespaces/{settings.platform_quality_probe_namespace}/jobs",
            self._job_payload(requested_name),
        )
        # Prefer the server-assigned name, falling back to what we requested.
        job_name = response.get("metadata", {}).get("name", requested_name)
        logger.info(
            "platform quality probe job triggered",
            extra={"event": "platform_quality_probe_trigger", "job": job_name},
        )
        if not wait:
            return {"job": job_name, "status": "queued"}
        outcome = self._wait_for_completion(job_name, settings.platform_quality_probe_wait_timeout_sec)
        if outcome.status == "ok":
            return {"job": outcome.job, "status": outcome.status}
        logger.error(
            "platform quality probe incomplete",
            extra={"event": "platform_quality_probe_incomplete", "job": job_name, "status": outcome.status},
        )
        raise RuntimeError(f"platform quality probe job {job_name} {outcome.status}")
# Module-level singleton; the app startup imports this instance to register its cron task.
platform_quality_probe = PlatformQualityProbeService()

View File

@ -161,6 +161,13 @@ class Settings:
image_sweeper_service_account: str
image_sweeper_job_ttl_sec: int
image_sweeper_wait_timeout_sec: float
platform_quality_probe_namespace: str
platform_quality_probe_script_configmap: str
platform_quality_probe_image: str
platform_quality_probe_job_ttl_sec: int
platform_quality_probe_wait_timeout_sec: float
platform_quality_probe_pushgateway_url: str
platform_quality_probe_http_timeout_sec: int
vaultwarden_namespace: str
vaultwarden_pod_label: str
@ -217,6 +224,16 @@ class Settings:
metis_watch_url: str
metis_timeout_sec: float
metis_sentinel_watch_cron: str
metis_token_sync_namespace: str
metis_token_sync_service_account: str
metis_token_sync_node_name: str
metis_token_sync_image: str
metis_token_sync_job_ttl_sec: int
metis_token_sync_wait_timeout_sec: float
metis_token_sync_vault_addr: str
metis_token_sync_vault_k8s_role: str
metis_k3s_token_sync_cron: str
platform_quality_suite_probe_cron: str
opensearch_url: str
opensearch_limit_bytes: int
@ -424,6 +441,24 @@ class Settings:
"image_sweeper_wait_timeout_sec": _env_float("IMAGE_SWEEPER_WAIT_TIMEOUT_SEC", 1200.0),
}
@classmethod
def _platform_quality_probe_config(cls) -> dict[str, Any]:
return {
"platform_quality_probe_namespace": _env("PLATFORM_QUALITY_PROBE_NAMESPACE", "monitoring"),
"platform_quality_probe_script_configmap": _env(
"PLATFORM_QUALITY_PROBE_SCRIPT_CONFIGMAP",
"platform-quality-suite-probe-script",
),
"platform_quality_probe_image": _env("PLATFORM_QUALITY_PROBE_IMAGE", "curlimages/curl:8.12.1"),
"platform_quality_probe_job_ttl_sec": _env_int("PLATFORM_QUALITY_PROBE_JOB_TTL_SEC", 1800),
"platform_quality_probe_wait_timeout_sec": _env_float("PLATFORM_QUALITY_PROBE_WAIT_TIMEOUT_SEC", 180.0),
"platform_quality_probe_pushgateway_url": _env(
"PLATFORM_QUALITY_PROBE_PUSHGATEWAY_URL",
"http://platform-quality-gateway.monitoring.svc.cluster.local:9091",
).rstrip("/"),
"platform_quality_probe_http_timeout_sec": _env_int("PLATFORM_QUALITY_PROBE_HTTP_TIMEOUT_SECONDS", 12),
}
@classmethod
def _vaultwarden_config(cls) -> dict[str, Any]:
return {
@ -465,6 +500,11 @@ class Settings:
"comms_reset_room_cron": _env("ARIADNE_SCHEDULE_COMMS_RESET_ROOM", "0 0 1 1 *"),
"comms_seed_room_cron": _env("ARIADNE_SCHEDULE_COMMS_SEED_ROOM", "*/10 * * * *"),
"keycloak_profile_cron": _env("ARIADNE_SCHEDULE_KEYCLOAK_PROFILE", "0 */6 * * *"),
"metis_k3s_token_sync_cron": _env("ARIADNE_SCHEDULE_METIS_K3S_TOKEN_SYNC", "11 */6 * * *"),
"platform_quality_suite_probe_cron": _env(
"ARIADNE_SCHEDULE_PLATFORM_QUALITY_SUITE_PROBE",
"*/15 * * * *",
),
}
@classmethod
@ -487,6 +527,17 @@ class Settings:
"metis_watch_url": _env("METIS_WATCH_URL", "").rstrip("/"),
"metis_timeout_sec": _env_float("METIS_TIMEOUT_SEC", 10.0),
"metis_sentinel_watch_cron": _env("ARIADNE_SCHEDULE_METIS_SENTINEL_WATCH", "*/15 * * * *"),
"metis_token_sync_namespace": _env("METIS_TOKEN_SYNC_NAMESPACE", "maintenance"),
"metis_token_sync_service_account": _env("METIS_TOKEN_SYNC_SERVICE_ACCOUNT", "metis-token-sync"),
"metis_token_sync_node_name": _env("METIS_TOKEN_SYNC_NODE_NAME", "titan-0a"),
"metis_token_sync_image": _env("METIS_TOKEN_SYNC_IMAGE", "hashicorp/vault:1.17.6"),
"metis_token_sync_job_ttl_sec": _env_int("METIS_TOKEN_SYNC_JOB_TTL_SEC", 1800),
"metis_token_sync_wait_timeout_sec": _env_float("METIS_TOKEN_SYNC_WAIT_TIMEOUT_SEC", 180.0),
"metis_token_sync_vault_addr": _env(
"METIS_TOKEN_SYNC_VAULT_ADDR",
"http://vault.vault.svc.cluster.local:8200",
).rstrip("/"),
"metis_token_sync_vault_k8s_role": _env("METIS_TOKEN_SYNC_VAULT_K8S_ROLE", "maintenance-metis-token-sync"),
}
@classmethod
@ -513,6 +564,7 @@ class Settings:
vault_cfg = cls._vault_config()
comms_cfg = cls._comms_config()
image_cfg = cls._image_sweeper_config()
platform_quality_probe_cfg = cls._platform_quality_probe_config()
vaultwarden_cfg = cls._vaultwarden_config()
schedule_cfg = cls._schedule_config()
cluster_cfg = cls._cluster_state_config()
@ -552,6 +604,7 @@ class Settings:
**vault_cfg,
**comms_cfg,
**image_cfg,
**platform_quality_probe_cfg,
**vaultwarden_cfg,
**schedule_cfg,
**cluster_cfg,

View File

@ -62,6 +62,8 @@ def test_startup_registers_metis_watch(monkeypatch) -> None:
app_module._startup()
assert any(name == "schedule.metis_sentinel_watch" for name, _cron in tasks)
assert any(name == "schedule.metis_k3s_token_sync" for name, _cron in tasks)
assert any(name == "schedule.platform_quality_suite_probe" for name, _cron in tasks)
def test_record_event_handles_exception(monkeypatch) -> None:

View File

@ -0,0 +1,82 @@
from __future__ import annotations
from types import SimpleNamespace
import pytest
from ariadne.services import metis_token_sync as module
def _settings() -> SimpleNamespace:
return SimpleNamespace(
metis_token_sync_namespace="maintenance",
metis_token_sync_service_account="metis-token-sync",
metis_token_sync_node_name="titan-0a",
metis_token_sync_image="hashicorp/vault:1.17.6",
metis_token_sync_job_ttl_sec=1800,
metis_token_sync_wait_timeout_sec=10.0,
metis_token_sync_vault_addr="http://vault.vault.svc.cluster.local:8200",
metis_token_sync_vault_k8s_role="maintenance-metis-token-sync",
)
def test_payload_matches_expected_contract(monkeypatch) -> None:
    """The Job manifest pins namespace, trigger label, placement, and mount."""
    monkeypatch.setattr(module, "settings", _settings())
    manifest = module.MetisTokenSyncService()._job_payload("sync-job")
    metadata = manifest["metadata"]
    assert metadata["namespace"] == "maintenance"
    assert metadata["labels"]["atlas.bstein.dev/trigger"] == "ariadne"
    pod_spec = manifest["spec"]["template"]["spec"]
    sync_container = pod_spec["containers"][0]
    assert pod_spec["serviceAccountName"] == "metis-token-sync"
    assert pod_spec["nodeName"] == "titan-0a"
    assert sync_container["image"] == "hashicorp/vault:1.17.6"
    assert sync_container["volumeMounts"][0]["mountPath"] == "/host/var/lib/rancher/k3s/server"
def test_run_wait_success(monkeypatch) -> None:
    """Waiting through one active poll ends with the final ok status."""
    monkeypatch.setattr(module, "settings", _settings())
    monkeypatch.setattr(module.time, "time", lambda: 1710000000)
    posted: dict[str, object] = {}

    def record_post(path: str, payload: dict[str, object]) -> dict[str, object]:
        posted["path"] = path
        posted["payload"] = payload
        return {"metadata": {"name": "sync-job-1"}}

    poll_responses = iter(
        [
            {"status": {"active": 1}},
            {"status": {"succeeded": 1}},
        ]
    )
    monkeypatch.setattr(module, "post_json", record_post)
    monkeypatch.setattr(module, "get_json", lambda _path: next(poll_responses))
    monkeypatch.setattr(module.time, "sleep", lambda _seconds: None)

    outcome = module.MetisTokenSyncService().run(wait=True)

    assert posted["path"] == "/apis/batch/v1/namespaces/maintenance/jobs"
    assert outcome == {"job": "sync-job-1", "status": "ok"}
def test_run_wait_failure_raises(monkeypatch) -> None:
    """A failed Job surfaces as a RuntimeError naming the job and status."""
    monkeypatch.setattr(module, "settings", _settings())
    monkeypatch.setattr(module.time, "time", lambda: 1710000000)
    monkeypatch.setattr(module.time, "sleep", lambda _seconds: None)
    monkeypatch.setattr(module, "post_json", lambda _path, _payload: {"metadata": {"name": "sync-job-2"}})
    monkeypatch.setattr(module, "get_json", lambda _path: {"status": {"failed": 1}})
    with pytest.raises(RuntimeError, match="metis token sync job sync-job-2 error"):
        module.MetisTokenSyncService().run(wait=True)
def test_run_without_wait_queues(monkeypatch) -> None:
    """wait=False returns immediately with a queued status and no polling."""
    monkeypatch.setattr(module, "settings", _settings())
    monkeypatch.setattr(module.time, "time", lambda: 1710000000)
    monkeypatch.setattr(module, "post_json", lambda _path, _payload: {"metadata": {"name": "sync-job-3"}})
    queued = module.MetisTokenSyncService().run(wait=False)
    assert queued == {"job": "sync-job-3", "status": "queued"}

View File

@ -0,0 +1,80 @@
from __future__ import annotations
from types import SimpleNamespace
import pytest
from ariadne.services import platform_quality_probe as module
def _settings() -> SimpleNamespace:
return SimpleNamespace(
platform_quality_probe_namespace="monitoring",
platform_quality_probe_script_configmap="platform-quality-suite-probe-script",
platform_quality_probe_image="curlimages/curl:8.12.1",
platform_quality_probe_job_ttl_sec=1800,
platform_quality_probe_wait_timeout_sec=20.0,
platform_quality_probe_pushgateway_url="http://platform-quality-gateway.monitoring.svc.cluster.local:9091",
platform_quality_probe_http_timeout_sec=12,
)
def test_payload_matches_expected_contract(monkeypatch) -> None:
    """The probe manifest pins namespace, trigger label, command, and script volume."""
    monkeypatch.setattr(module, "settings", _settings())
    manifest = module.PlatformQualityProbeService()._job_payload("probe-job")
    assert manifest["metadata"]["namespace"] == "monitoring"
    assert manifest["metadata"]["labels"]["atlas.bstein.dev/trigger"] == "ariadne"
    pod_spec = manifest["spec"]["template"]["spec"]
    probe_container = pod_spec["containers"][0]
    assert probe_container["image"] == "curlimages/curl:8.12.1"
    assert probe_container["command"] == ["/bin/sh", "/scripts/platform_quality_suite_probe.sh"]
    assert pod_spec["volumes"][0]["configMap"]["name"] == "platform-quality-suite-probe-script"
def test_run_wait_success(monkeypatch) -> None:
    """Waiting through one active poll ends with the final ok status."""
    monkeypatch.setattr(module, "settings", _settings())
    monkeypatch.setattr(module.time, "time", lambda: 1720000000)
    posted: dict[str, object] = {}

    def record_post(path: str, payload: dict[str, object]) -> dict[str, object]:
        posted["path"] = path
        posted["payload"] = payload
        return {"metadata": {"name": "probe-job-1"}}

    poll_responses = iter(
        [
            {"status": {"active": 1}},
            {"status": {"succeeded": 1}},
        ]
    )
    monkeypatch.setattr(module, "post_json", record_post)
    monkeypatch.setattr(module, "get_json", lambda _path: next(poll_responses))
    monkeypatch.setattr(module.time, "sleep", lambda _seconds: None)

    outcome = module.PlatformQualityProbeService().run(wait=True)

    assert posted["path"] == "/apis/batch/v1/namespaces/monitoring/jobs"
    assert outcome == {"job": "probe-job-1", "status": "ok"}
def test_run_wait_failure_raises(monkeypatch) -> None:
    """A failed Job surfaces as a RuntimeError naming the job and status."""
    monkeypatch.setattr(module, "settings", _settings())
    monkeypatch.setattr(module.time, "time", lambda: 1720000000)
    monkeypatch.setattr(module.time, "sleep", lambda _seconds: None)
    monkeypatch.setattr(module, "post_json", lambda _path, _payload: {"metadata": {"name": "probe-job-2"}})
    monkeypatch.setattr(module, "get_json", lambda _path: {"status": {"failed": 1}})
    with pytest.raises(RuntimeError, match="platform quality probe job probe-job-2 error"):
        module.PlatformQualityProbeService().run(wait=True)
def test_run_without_wait_queues(monkeypatch) -> None:
    """wait=False returns immediately with a queued status and no polling."""
    monkeypatch.setattr(module, "settings", _settings())
    monkeypatch.setattr(module.time, "time", lambda: 1720000000)
    monkeypatch.setattr(module, "post_json", lambda _path, _payload: {"metadata": {"name": "probe-job-3"}})
    queued = module.PlatformQualityProbeService().run(wait=False)
    assert queued == {"job": "probe-job-3", "status": "queued"}