diff --git a/ariadne/app.py b/ariadne/app.py index d5665d7..0805c60 100644 --- a/ariadne/app.py +++ b/ariadne/app.py @@ -26,7 +26,9 @@ from .services.mailu_events import mailu_events from .services.nextcloud import nextcloud from .services.image_sweeper import image_sweeper from .services.metis import metis +from .services.metis_token_sync import metis_token_sync from .services.opensearch_prune import prune_indices +from .services.platform_quality_probe import platform_quality_probe from .services.pod_cleaner import clean_finished_pods from .services.vaultwarden_sync import run_vaultwarden_sync from .services.vault import vault @@ -315,6 +317,16 @@ def _startup() -> None: settings.metis_sentinel_watch_cron, lambda: metis.watch_sentinel(), ) + scheduler.add_task( + "schedule.metis_k3s_token_sync", + settings.metis_k3s_token_sync_cron, + lambda: metis_token_sync.run(wait=True), + ) + scheduler.add_task( + "schedule.platform_quality_suite_probe", + settings.platform_quality_suite_probe_cron, + lambda: platform_quality_probe.run(wait=True), + ) scheduler.add_task( "schedule.vault_k8s_auth", settings.vault_k8s_auth_cron, @@ -368,6 +380,8 @@ def _startup() -> None: "opensearch_prune_cron": settings.opensearch_prune_cron, "image_sweeper_cron": settings.image_sweeper_cron, "metis_sentinel_watch_cron": settings.metis_sentinel_watch_cron, + "metis_k3s_token_sync_cron": settings.metis_k3s_token_sync_cron, + "platform_quality_suite_probe_cron": settings.platform_quality_suite_probe_cron, "vault_k8s_auth_cron": settings.vault_k8s_auth_cron, "vault_oidc_cron": settings.vault_oidc_cron, "comms_guest_name_cron": settings.comms_guest_name_cron, diff --git a/ariadne/services/metis_token_sync.py b/ariadne/services/metis_token_sync.py new file mode 100644 index 0000000..aefd1c4 --- /dev/null +++ b/ariadne/services/metis_token_sync.py @@ -0,0 +1,159 @@ +from __future__ import annotations + +from dataclasses import dataclass +import time +from typing import Any + +from ..k8s.client 
# Shell script run inside the sync container.
# NOTE: this is a regular (non-raw) Python string, so the `\n` below becomes a
# literal newline before the shell sees it; inside single quotes `tr -d` still
# receives a newline character and strips newlines from the token file.
_SYNC_SCRIPT = """
set -eu
token="$(tr -d '\n' < /host/var/lib/rancher/k3s/server/token)"
jwt="$(cat /var/run/secrets/kubernetes.io/serviceaccount/token)"
VAULT_TOKEN="$(vault write -field=token auth/kubernetes/login role="${VAULT_K8S_ROLE}" jwt="${jwt}")"
export VAULT_TOKEN
vault kv put kv/atlas/maintenance/metis-runtime k3s_token="${token}"
""".strip()


@dataclass(frozen=True)
class MetisTokenSyncResult:
    """Outcome of one metis token-sync Job as observed by polling.

    Attributes:
        job: Name of the Kubernetes Job that was polled.
        status: ``"ok"`` (Job succeeded), ``"error"`` (Job failed) or
            ``"running"`` (no terminal state seen before the timeout).
    """

    job: str
    status: str


class MetisTokenSyncService:
    """Create one-shot Kubernetes Jobs that run ``_SYNC_SCRIPT``.

    Namespace, service account, node, image, TTL, Vault address/role and the
    wait timeout are all read from ``settings.metis_token_sync_*``.
    """

    def _job_payload(self, job_name: str) -> dict[str, Any]:
        """Build the ``batch/v1`` Job manifest for a single sync run.

        Args:
            job_name: Value placed in ``metadata.name``.

        Returns:
            The manifest as a plain dict, ready to POST to the Jobs endpoint.
        """
        payload: dict[str, Any] = {
            "apiVersion": "batch/v1",
            "kind": "Job",
            "metadata": {
                "name": job_name,
                "namespace": settings.metis_token_sync_namespace,
                "labels": {
                    "app": "metis-k3s-token-sync",
                    "atlas.bstein.dev/trigger": "ariadne",
                },
            },
            "spec": {
                "backoffLimit": 1,
                "ttlSecondsAfterFinished": settings.metis_token_sync_job_ttl_sec,
                "template": {
                    "spec": {
                        "serviceAccountName": settings.metis_token_sync_service_account,
                        "restartPolicy": "OnFailure",
                        # Pinned to one node because the script reads a file
                        # from that node's filesystem through the hostPath mount.
                        "nodeName": settings.metis_token_sync_node_name,
                        # Tolerate both spellings of the control-plane taint.
                        "tolerations": [
                            {
                                "key": "node-role.kubernetes.io/control-plane",
                                "operator": "Exists",
                                "effect": "NoSchedule",
                            },
                            {
                                "key": "node-role.kubernetes.io/master",
                                "operator": "Exists",
                                "effect": "NoSchedule",
                            },
                        ],
                        "containers": [
                            {
                                "name": "sync",
                                "image": settings.metis_token_sync_image,
                                "imagePullPolicy": "IfNotPresent",
                                "command": ["/bin/sh", "-c"],
                                "args": [_SYNC_SCRIPT],
                                "env": [
                                    {"name": "VAULT_ADDR", "value": settings.metis_token_sync_vault_addr},
                                    {
                                        "name": "VAULT_K8S_ROLE",
                                        "value": settings.metis_token_sync_vault_k8s_role,
                                    },
                                ],
                                # NOTE(review): presumably root is required to read
                                # the host token file - confirm file ownership.
                                "securityContext": {"runAsUser": 0},
                                "volumeMounts": [
                                    {
                                        "name": "k3s-server",
                                        "mountPath": "/host/var/lib/rancher/k3s/server",
                                        "readOnly": True,
                                    }
                                ],
                            }
                        ],
                        "volumes": [
                            {
                                "name": "k3s-server",
                                "hostPath": {"path": "/var/lib/rancher/k3s/server"},
                            }
                        ],
                    }
                },
            },
        }
        return payload

    @staticmethod
    def _terminal_status(job: Any) -> str | None:
        """Classify a Job API object as finished or still in progress.

        The Job's ``Complete`` / ``Failed`` conditions are authoritative when
        present. Otherwise pod counters are used, and failed pods only count as
        a terminal error once they exceed the ``backoffLimit`` reported in the
        Job's own spec (treated as 0 when the spec is absent), so a first pod
        failure with a retry remaining is not reported as an error.

        Args:
            job: Decoded Job object; anything that is not a dict with a dict
                ``status`` is treated as "not finished yet".

        Returns:
            ``"ok"``, ``"error"``, or ``None`` when no terminal state is visible.

        Raises:
            ValueError: If a counter field holds a non-numeric value.
        """
        if not isinstance(job, dict):
            return None
        status = job.get("status")
        if not isinstance(status, dict):
            return None

        conditions = status.get("conditions")
        for condition in conditions if isinstance(conditions, list) else []:
            if not isinstance(condition, dict) or condition.get("status") != "True":
                continue
            if condition.get("type") == "Complete":
                return "ok"
            if condition.get("type") == "Failed":
                return "error"

        if int(status.get("succeeded") or 0) > 0:
            return "ok"

        spec = job.get("spec")
        backoff_limit = spec.get("backoffLimit") if isinstance(spec, dict) else None
        if int(status.get("failed") or 0) > int(backoff_limit or 0):
            return "error"
        return None

    def _wait_for_completion(self, job_name: str, timeout_sec: float) -> MetisTokenSyncResult:
        """Poll the Job every two seconds until it finishes or time runs out.

        Args:
            job_name: Name of the Job to poll in the configured namespace.
            timeout_sec: Maximum time to keep polling; a value <= 0 performs no
                poll at all.

        Returns:
            A result whose status is ``"ok"``, ``"error"``, or ``"running"``
            when the deadline passed without a terminal state.
        """
        # Monotonic clock: a wall-clock step must not stretch or cut the wait.
        deadline = time.monotonic() + timeout_sec
        while time.monotonic() < deadline:
            job = get_json(
                f"/apis/batch/v1/namespaces/{settings.metis_token_sync_namespace}/jobs/{job_name}"
            )
            outcome = self._terminal_status(job)
            if outcome is not None:
                return MetisTokenSyncResult(job=job_name, status=outcome)
            time.sleep(2)
        return MetisTokenSyncResult(job=job_name, status="running")

    def run(self, wait: bool = True) -> dict[str, Any]:
        """Create a token-sync Job and optionally wait for its result.

        Args:
            wait: When false, return right after the Job is created.

        Returns:
            ``{"job": <name>, "status": "queued"}`` when not waiting, otherwise
            ``{"job": <name>, "status": "ok"}``.

        Raises:
            RuntimeError: If waiting ends with any status other than ``"ok"``
                (Job failed, or still running at the timeout).
        """
        job_name = f"metis-k3s-token-sync-{int(time.time())}"
        created = post_json(
            f"/apis/batch/v1/namespaces/{settings.metis_token_sync_namespace}/jobs",
            self._job_payload(job_name),
        )
        # Prefer the name echoed by the API; fall back to the requested name if
        # the response is not shaped as expected (e.g. ``metadata`` is null).
        metadata = created.get("metadata") if isinstance(created, dict) else None
        name = (metadata.get("name") if isinstance(metadata, dict) else None) or job_name
        logger.info(
            "metis token sync job triggered",
            extra={"event": "metis_token_sync_trigger", "job": name},
        )
        if not wait:
            return {"job": name, "status": "queued"}

        result = self._wait_for_completion(name, settings.metis_token_sync_wait_timeout_sec)
        if result.status != "ok":
            logger.error(
                "metis token sync job incomplete",
                extra={"event": "metis_token_sync_incomplete", "job": name, "status": result.status},
            )
            raise RuntimeError(f"metis token sync job {name} {result.status}")
        return {"job": result.job, "status": result.status}


metis_token_sync = MetisTokenSyncService()
class PlatformQualityProbeService:
    """Create one-shot Kubernetes Jobs that run the quality-suite probe script.

    Namespace, image, script ConfigMap, Pushgateway URL, HTTP timeout, TTL and
    the wait timeout are all read from ``settings.platform_quality_probe_*``.
    """

    def _job_payload(self, job_name: str) -> dict[str, Any]:
        """Build the ``batch/v1`` Job manifest for a single probe run.

        Args:
            job_name: Value placed in ``metadata.name``.

        Returns:
            The manifest as a plain dict, ready to POST to the Jobs endpoint.
        """
        payload: dict[str, Any] = {
            "apiVersion": "batch/v1",
            "kind": "Job",
            "metadata": {
                "name": job_name,
                "namespace": settings.platform_quality_probe_namespace,
                "labels": {
                    "app": "platform-quality-suite-probe",
                    "atlas.bstein.dev/trigger": "ariadne",
                },
            },
            "spec": {
                "backoffLimit": 0,
                "ttlSecondsAfterFinished": settings.platform_quality_probe_job_ttl_sec,
                "template": {
                    "metadata": {"labels": {"app": "platform-quality-suite-probe"}},
                    "spec": {
                        "restartPolicy": "Never",
                        "containers": [
                            {
                                "name": "probe",
                                "image": settings.platform_quality_probe_image,
                                "imagePullPolicy": "IfNotPresent",
                                # The script comes from the ConfigMap mounted at /scripts.
                                "command": ["/bin/sh", "/scripts/platform_quality_suite_probe.sh"],
                                "env": [
                                    {
                                        "name": "PUSHGATEWAY_URL",
                                        "value": settings.platform_quality_probe_pushgateway_url,
                                    },
                                    {
                                        # Env values must be strings, hence str().
                                        "name": "HTTP_TIMEOUT_SECONDS",
                                        "value": str(settings.platform_quality_probe_http_timeout_sec),
                                    },
                                ],
                                "volumeMounts": [
                                    {"name": "probe-script", "mountPath": "/scripts", "readOnly": True},
                                ],
                            }
                        ],
                        "volumes": [
                            {
                                "name": "probe-script",
                                "configMap": {
                                    "name": settings.platform_quality_probe_script_configmap,
                                    # 365 decimal == 0o555 (r-xr-xr-x).
                                    "defaultMode": 365,
                                },
                            }
                        ],
                    },
                },
            },
        }
        return payload

    @staticmethod
    def _terminal_status(job: Any) -> str | None:
        """Classify a Job API object as finished or still in progress.

        The Job's ``Complete`` / ``Failed`` conditions are authoritative when
        present. Otherwise pod counters are used, and failed pods count as a
        terminal error once they exceed the ``backoffLimit`` reported in the
        Job's own spec (treated as 0 when the spec is absent).

        Args:
            job: Decoded Job object; anything that is not a dict with a dict
                ``status`` is treated as "not finished yet".

        Returns:
            ``"ok"``, ``"error"``, or ``None`` when no terminal state is visible.

        Raises:
            ValueError: If a counter field holds a non-numeric value.
        """
        if not isinstance(job, dict):
            return None
        status = job.get("status")
        if not isinstance(status, dict):
            return None

        conditions = status.get("conditions")
        for condition in conditions if isinstance(conditions, list) else []:
            if not isinstance(condition, dict) or condition.get("status") != "True":
                continue
            if condition.get("type") == "Complete":
                return "ok"
            if condition.get("type") == "Failed":
                return "error"

        if int(status.get("succeeded") or 0) > 0:
            return "ok"

        spec = job.get("spec")
        backoff_limit = spec.get("backoffLimit") if isinstance(spec, dict) else None
        if int(status.get("failed") or 0) > int(backoff_limit or 0):
            return "error"
        return None

    def _wait_for_completion(self, job_name: str, timeout_sec: float) -> PlatformQualityProbeResult:
        """Poll the Job every two seconds until it finishes or time runs out.

        Args:
            job_name: Name of the Job to poll in the configured namespace.
            timeout_sec: Maximum time to keep polling; a value <= 0 performs no
                poll at all.

        Returns:
            A result whose status is ``"ok"``, ``"error"``, or ``"running"``
            when the deadline passed without a terminal state.
        """
        # Monotonic clock: a wall-clock step must not stretch or cut the wait.
        deadline = time.monotonic() + timeout_sec
        while time.monotonic() < deadline:
            job = get_json(
                f"/apis/batch/v1/namespaces/{settings.platform_quality_probe_namespace}/jobs/{job_name}"
            )
            outcome = self._terminal_status(job)
            if outcome is not None:
                return PlatformQualityProbeResult(job=job_name, status=outcome)
            time.sleep(2)
        return PlatformQualityProbeResult(job=job_name, status="running")

    def run(self, wait: bool = True) -> dict[str, Any]:
        """Create a probe Job and optionally wait for its result.

        Args:
            wait: When false, return right after the Job is created.

        Returns:
            ``{"job": <name>, "status": "queued"}`` when not waiting, otherwise
            ``{"job": <name>, "status": "ok"}``.

        Raises:
            RuntimeError: If waiting ends with any status other than ``"ok"``
                (Job failed, or still running at the timeout).
        """
        job_name = f"platform-quality-suite-probe-{int(time.time())}"
        created = post_json(
            f"/apis/batch/v1/namespaces/{settings.platform_quality_probe_namespace}/jobs",
            self._job_payload(job_name),
        )
        # Prefer the name echoed by the API; fall back to the requested name if
        # the response is not shaped as expected (e.g. ``metadata`` is null).
        metadata = created.get("metadata") if isinstance(created, dict) else None
        name = (metadata.get("name") if isinstance(metadata, dict) else None) or job_name
        logger.info(
            "platform quality probe job triggered",
            extra={"event": "platform_quality_probe_trigger", "job": name},
        )
        if not wait:
            return {"job": name, "status": "queued"}

        result = self._wait_for_completion(name, settings.platform_quality_probe_wait_timeout_sec)
        if result.status != "ok":
            logger.error(
                "platform quality probe incomplete",
                extra={"event": "platform_quality_probe_incomplete", "job": name, "status": result.status},
            )
            raise RuntimeError(f"platform quality probe job {name} {result.status}")
        return {"job": result.job, "status": result.status}


platform_quality_probe = PlatformQualityProbeService()
@classmethod
def _platform_quality_probe_config(cls) -> dict[str, Any]:
    """Collect the platform-quality probe settings from the environment.

    Returns:
        A mapping of ``platform_quality_probe_*`` setting names to the values
        produced by the ``_env`` / ``_env_int`` / ``_env_float`` helpers, each
        called with the fallback shown below. Trailing slashes are removed
        from the Pushgateway URL.
    """
    config: dict[str, Any] = {}
    config["platform_quality_probe_namespace"] = _env("PLATFORM_QUALITY_PROBE_NAMESPACE", "monitoring")
    config["platform_quality_probe_script_configmap"] = _env(
        "PLATFORM_QUALITY_PROBE_SCRIPT_CONFIGMAP",
        "platform-quality-suite-probe-script",
    )
    config["platform_quality_probe_image"] = _env("PLATFORM_QUALITY_PROBE_IMAGE", "curlimages/curl:8.12.1")
    config["platform_quality_probe_job_ttl_sec"] = _env_int("PLATFORM_QUALITY_PROBE_JOB_TTL_SEC", 1800)
    config["platform_quality_probe_wait_timeout_sec"] = _env_float(
        "PLATFORM_QUALITY_PROBE_WAIT_TIMEOUT_SEC", 180.0
    )
    pushgateway_url = _env(
        "PLATFORM_QUALITY_PROBE_PUSHGATEWAY_URL",
        "http://platform-quality-gateway.monitoring.svc.cluster.local:9091",
    )
    config["platform_quality_probe_pushgateway_url"] = pushgateway_url.rstrip("/")
    config["platform_quality_probe_http_timeout_sec"] = _env_int(
        "PLATFORM_QUALITY_PROBE_HTTP_TIMEOUT_SECONDS", 12
    )
    return config
Settings: "comms_reset_room_cron": _env("ARIADNE_SCHEDULE_COMMS_RESET_ROOM", "0 0 1 1 *"), "comms_seed_room_cron": _env("ARIADNE_SCHEDULE_COMMS_SEED_ROOM", "*/10 * * * *"), "keycloak_profile_cron": _env("ARIADNE_SCHEDULE_KEYCLOAK_PROFILE", "0 */6 * * *"), + "metis_k3s_token_sync_cron": _env("ARIADNE_SCHEDULE_METIS_K3S_TOKEN_SYNC", "11 */6 * * *"), + "platform_quality_suite_probe_cron": _env( + "ARIADNE_SCHEDULE_PLATFORM_QUALITY_SUITE_PROBE", + "*/15 * * * *", + ), } @classmethod @@ -487,6 +527,17 @@ class Settings: "metis_watch_url": _env("METIS_WATCH_URL", "").rstrip("/"), "metis_timeout_sec": _env_float("METIS_TIMEOUT_SEC", 10.0), "metis_sentinel_watch_cron": _env("ARIADNE_SCHEDULE_METIS_SENTINEL_WATCH", "*/15 * * * *"), + "metis_token_sync_namespace": _env("METIS_TOKEN_SYNC_NAMESPACE", "maintenance"), + "metis_token_sync_service_account": _env("METIS_TOKEN_SYNC_SERVICE_ACCOUNT", "metis-token-sync"), + "metis_token_sync_node_name": _env("METIS_TOKEN_SYNC_NODE_NAME", "titan-0a"), + "metis_token_sync_image": _env("METIS_TOKEN_SYNC_IMAGE", "hashicorp/vault:1.17.6"), + "metis_token_sync_job_ttl_sec": _env_int("METIS_TOKEN_SYNC_JOB_TTL_SEC", 1800), + "metis_token_sync_wait_timeout_sec": _env_float("METIS_TOKEN_SYNC_WAIT_TIMEOUT_SEC", 180.0), + "metis_token_sync_vault_addr": _env( + "METIS_TOKEN_SYNC_VAULT_ADDR", + "http://vault.vault.svc.cluster.local:8200", + ).rstrip("/"), + "metis_token_sync_vault_k8s_role": _env("METIS_TOKEN_SYNC_VAULT_K8S_ROLE", "maintenance-metis-token-sync"), } @classmethod @@ -513,6 +564,7 @@ class Settings: vault_cfg = cls._vault_config() comms_cfg = cls._comms_config() image_cfg = cls._image_sweeper_config() + platform_quality_probe_cfg = cls._platform_quality_probe_config() vaultwarden_cfg = cls._vaultwarden_config() schedule_cfg = cls._schedule_config() cluster_cfg = cls._cluster_state_config() @@ -552,6 +604,7 @@ class Settings: **vault_cfg, **comms_cfg, **image_cfg, + **platform_quality_probe_cfg, **vaultwarden_cfg, **schedule_cfg, 
def _settings() -> SimpleNamespace:
    """Build a stand-in settings object holding only the metis token-sync fields."""
    fields = {
        "metis_token_sync_namespace": "maintenance",
        "metis_token_sync_service_account": "metis-token-sync",
        "metis_token_sync_node_name": "titan-0a",
        "metis_token_sync_image": "hashicorp/vault:1.17.6",
        "metis_token_sync_job_ttl_sec": 1800,
        "metis_token_sync_wait_timeout_sec": 10.0,
        "metis_token_sync_vault_addr": "http://vault.vault.svc.cluster.local:8200",
        "metis_token_sync_vault_k8s_role": "maintenance-metis-token-sync",
    }
    return SimpleNamespace(**fields)


def test_payload_matches_expected_contract(monkeypatch) -> None:
    """The Job manifest carries the configured namespace, identity, node, image and mount."""
    monkeypatch.setattr(module, "settings", _settings())
    manifest = module.MetisTokenSyncService()._job_payload("sync-job")

    metadata = manifest["metadata"]
    assert metadata["namespace"] == "maintenance"
    assert metadata["labels"]["atlas.bstein.dev/trigger"] == "ariadne"

    pod_spec = manifest["spec"]["template"]["spec"]
    assert pod_spec["serviceAccountName"] == "metis-token-sync"
    assert pod_spec["nodeName"] == "titan-0a"

    container = pod_spec["containers"][0]
    assert container["image"] == "hashicorp/vault:1.17.6"
    assert container["volumeMounts"][0]["mountPath"] == "/host/var/lib/rancher/k3s/server"


def test_run_wait_success(monkeypatch) -> None:
    """run(wait=True) posts to the Jobs endpoint and reports ok once the Job succeeds."""
    monkeypatch.setattr(module, "settings", _settings())
    monkeypatch.setattr(module.time, "time", lambda: 1710000000)

    captured: dict[str, object] = {}

    def record_post(path: str, payload: dict[str, object]) -> dict[str, object]:
        captured.update(path=path, payload=payload)
        return {"metadata": {"name": "sync-job-1"}}

    # First poll sees the Job still active, the second sees it succeeded.
    responses = iter(({"status": {"active": 1}}, {"status": {"succeeded": 1}}))

    def next_job(_path: str) -> dict[str, object]:
        return next(responses)

    monkeypatch.setattr(module, "post_json", record_post)
    monkeypatch.setattr(module, "get_json", next_job)
    monkeypatch.setattr(module.time, "sleep", lambda _seconds: None)

    outcome = module.MetisTokenSyncService().run(wait=True)

    assert captured["path"] == "/apis/batch/v1/namespaces/maintenance/jobs"
    assert outcome == {"job": "sync-job-1", "status": "ok"}


def test_run_wait_failure_raises(monkeypatch) -> None:
    """A Job reporting a failed pod makes run(wait=True) raise RuntimeError."""
    created = {"metadata": {"name": "sync-job-2"}}
    failed_job = {"status": {"failed": 1}}

    monkeypatch.setattr(module, "settings", _settings())
    monkeypatch.setattr(module.time, "time", lambda: 1710000000)
    monkeypatch.setattr(module, "post_json", lambda _path, _payload: created)
    monkeypatch.setattr(module, "get_json", lambda _path: failed_job)
    monkeypatch.setattr(module.time, "sleep", lambda _seconds: None)

    service = module.MetisTokenSyncService()
    with pytest.raises(RuntimeError, match="metis token sync job sync-job-2 error"):
        service.run(wait=True)


def test_run_without_wait_queues(monkeypatch) -> None:
    """run(wait=False) returns a queued status without polling the Job."""
    created = {"metadata": {"name": "sync-job-3"}}

    monkeypatch.setattr(module, "settings", _settings())
    monkeypatch.setattr(module.time, "time", lambda: 1710000000)
    monkeypatch.setattr(module, "post_json", lambda _path, _payload: created)

    outcome = module.MetisTokenSyncService().run(wait=False)

    assert outcome == {"job": "sync-job-3", "status": "queued"}
def _settings() -> SimpleNamespace:
    """Build a stand-in settings object holding only the platform-quality probe fields."""
    fields = {
        "platform_quality_probe_namespace": "monitoring",
        "platform_quality_probe_script_configmap": "platform-quality-suite-probe-script",
        "platform_quality_probe_image": "curlimages/curl:8.12.1",
        "platform_quality_probe_job_ttl_sec": 1800,
        "platform_quality_probe_wait_timeout_sec": 20.0,
        "platform_quality_probe_pushgateway_url": "http://platform-quality-gateway.monitoring.svc.cluster.local:9091",
        "platform_quality_probe_http_timeout_sec": 12,
    }
    return SimpleNamespace(**fields)


def test_payload_matches_expected_contract(monkeypatch) -> None:
    """The Job manifest carries the configured namespace, image, command and script ConfigMap."""
    monkeypatch.setattr(module, "settings", _settings())
    manifest = module.PlatformQualityProbeService()._job_payload("probe-job")

    metadata = manifest["metadata"]
    assert metadata["namespace"] == "monitoring"
    assert metadata["labels"]["atlas.bstein.dev/trigger"] == "ariadne"

    pod_spec = manifest["spec"]["template"]["spec"]
    probe = pod_spec["containers"][0]
    assert probe["image"] == "curlimages/curl:8.12.1"
    assert probe["command"] == ["/bin/sh", "/scripts/platform_quality_suite_probe.sh"]
    assert pod_spec["volumes"][0]["configMap"]["name"] == "platform-quality-suite-probe-script"


def test_run_wait_success(monkeypatch) -> None:
    """run(wait=True) posts to the Jobs endpoint and reports ok once the Job succeeds."""
    monkeypatch.setattr(module, "settings", _settings())
    monkeypatch.setattr(module.time, "time", lambda: 1720000000)

    captured: dict[str, object] = {}

    def record_post(path: str, payload: dict[str, object]) -> dict[str, object]:
        captured.update(path=path, payload=payload)
        return {"metadata": {"name": "probe-job-1"}}

    # First poll sees the Job still active, the second sees it succeeded.
    responses = iter(({"status": {"active": 1}}, {"status": {"succeeded": 1}}))

    def next_job(_path: str) -> dict[str, object]:
        return next(responses)

    monkeypatch.setattr(module, "post_json", record_post)
    monkeypatch.setattr(module, "get_json", next_job)
    monkeypatch.setattr(module.time, "sleep", lambda _seconds: None)

    outcome = module.PlatformQualityProbeService().run(wait=True)

    assert captured["path"] == "/apis/batch/v1/namespaces/monitoring/jobs"
    assert outcome == {"job": "probe-job-1", "status": "ok"}


def test_run_wait_failure_raises(monkeypatch) -> None:
    """A Job reporting a failed pod makes run(wait=True) raise RuntimeError."""
    created = {"metadata": {"name": "probe-job-2"}}
    failed_job = {"status": {"failed": 1}}

    monkeypatch.setattr(module, "settings", _settings())
    monkeypatch.setattr(module.time, "time", lambda: 1720000000)
    monkeypatch.setattr(module, "post_json", lambda _path, _payload: created)
    monkeypatch.setattr(module, "get_json", lambda _path: failed_job)
    monkeypatch.setattr(module.time, "sleep", lambda _seconds: None)

    service = module.PlatformQualityProbeService()
    with pytest.raises(RuntimeError, match="platform quality probe job probe-job-2 error"):
        service.run(wait=True)


def test_run_without_wait_queues(monkeypatch) -> None:
    """run(wait=False) returns a queued status without polling the Job."""
    created = {"metadata": {"name": "probe-job-3"}}

    monkeypatch.setattr(module, "settings", _settings())
    monkeypatch.setattr(module.time, "time", lambda: 1720000000)
    monkeypatch.setattr(module, "post_json", lambda _path, _payload: created)

    outcome = module.PlatformQualityProbeService().run(wait=False)

    assert outcome == {"job": "probe-job-3", "status": "queued"}