metis: add sentinel watch task
This commit is contained in:
parent
44336f1272
commit
cf0271a8ea
@ -25,6 +25,7 @@ from .services.mailu import mailu
|
|||||||
from .services.mailu_events import mailu_events
|
from .services.mailu_events import mailu_events
|
||||||
from .services.nextcloud import nextcloud
|
from .services.nextcloud import nextcloud
|
||||||
from .services.image_sweeper import image_sweeper
|
from .services.image_sweeper import image_sweeper
|
||||||
|
from .services.metis import metis
|
||||||
from .services.opensearch_prune import prune_indices
|
from .services.opensearch_prune import prune_indices
|
||||||
from .services.pod_cleaner import clean_finished_pods
|
from .services.pod_cleaner import clean_finished_pods
|
||||||
from .services.vaultwarden_sync import run_vaultwarden_sync
|
from .services.vaultwarden_sync import run_vaultwarden_sync
|
||||||
@ -309,6 +310,11 @@ def _startup() -> None:
|
|||||||
settings.image_sweeper_cron,
|
settings.image_sweeper_cron,
|
||||||
lambda: image_sweeper.run(wait=True),
|
lambda: image_sweeper.run(wait=True),
|
||||||
)
|
)
|
||||||
|
scheduler.add_task(
|
||||||
|
"schedule.metis_sentinel_watch",
|
||||||
|
settings.metis_sentinel_watch_cron,
|
||||||
|
lambda: metis.watch_sentinel(),
|
||||||
|
)
|
||||||
scheduler.add_task(
|
scheduler.add_task(
|
||||||
"schedule.vault_k8s_auth",
|
"schedule.vault_k8s_auth",
|
||||||
settings.vault_k8s_auth_cron,
|
settings.vault_k8s_auth_cron,
|
||||||
@ -361,6 +367,7 @@ def _startup() -> None:
|
|||||||
"pod_cleaner_cron": settings.pod_cleaner_cron,
|
"pod_cleaner_cron": settings.pod_cleaner_cron,
|
||||||
"opensearch_prune_cron": settings.opensearch_prune_cron,
|
"opensearch_prune_cron": settings.opensearch_prune_cron,
|
||||||
"image_sweeper_cron": settings.image_sweeper_cron,
|
"image_sweeper_cron": settings.image_sweeper_cron,
|
||||||
|
"metis_sentinel_watch_cron": settings.metis_sentinel_watch_cron,
|
||||||
"vault_k8s_auth_cron": settings.vault_k8s_auth_cron,
|
"vault_k8s_auth_cron": settings.vault_k8s_auth_cron,
|
||||||
"vault_oidc_cron": settings.vault_oidc_cron,
|
"vault_oidc_cron": settings.vault_oidc_cron,
|
||||||
"comms_guest_name_cron": settings.comms_guest_name_cron,
|
"comms_guest_name_cron": settings.comms_guest_name_cron,
|
||||||
|
|||||||
189
ariadne/services/metis.py
Normal file
189
ariadne/services/metis.py
Normal file
@ -0,0 +1,189 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
import json
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from ..settings import settings
|
||||||
|
from ..utils.logging import get_logger
|
||||||
|
|
||||||
|
|
||||||
|
logger = get_logger(__name__)

# Keys whose presence marks a bare top-level dict as a single sentinel
# snapshot (checked by _normalize_snapshots).
_SNAPSHOT_KEYS = ("hostname", "kernel", "os_image", "k3s_version", "containerd", "package_sample")
# Candidate keys, tried in this order, that may hold a snapshot's own
# ISO-8601 collection timestamp (checked by _snapshot_timestamp).
_TIMESTAMP_KEYS = ("collected_at", "timestamp", "generated_at", "created_at")
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
class MetisSentinelWatchSummary:
    """Immutable result of one sentinel-watch run.

    Produced (and logged) by MetisService._finish; callers read it to decide
    whether the sentinel snapshot directory is healthy.
    """

    status: str  # "ok", "error", or "skipped" (see MetisService.watch_sentinel)
    source: str  # sentinel directory that was scanned; "" when unconfigured
    snapshots: int  # total snapshot dicts loaded across all files
    hosts: int  # number of distinct hostnames observed
    hostnames: list[str] = field(default_factory=list)  # sorted distinct hostnames
    latest_snapshot_at: str = ""  # ISO timestamp of newest snapshot, "" if none
    latest_snapshot_age_sec: float | None = None  # age of newest snapshot; None if unknown
    detail: str = ""  # human-readable summary or error detail
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_timestamp(raw: Any) -> datetime | None:
|
||||||
|
if not isinstance(raw, str):
|
||||||
|
return None
|
||||||
|
text = raw.strip()
|
||||||
|
if not text:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
return datetime.fromisoformat(text.replace("Z", "+00:00"))
|
||||||
|
except ValueError:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _normalize_snapshots(payload: Any) -> list[dict[str, Any]]:
|
||||||
|
if isinstance(payload, list):
|
||||||
|
return [item for item in payload if isinstance(item, dict)]
|
||||||
|
if not isinstance(payload, dict):
|
||||||
|
return []
|
||||||
|
for key in ("snapshots", "items", "data"):
|
||||||
|
value = payload.get(key)
|
||||||
|
if isinstance(value, list):
|
||||||
|
return [item for item in value if isinstance(item, dict)]
|
||||||
|
if any(key in payload for key in _SNAPSHOT_KEYS):
|
||||||
|
return [payload]
|
||||||
|
return []
|
||||||
|
|
||||||
|
|
||||||
|
def _snapshot_hostname(snapshot: dict[str, Any], fallback: str) -> str:
|
||||||
|
for key in ("hostname", "host", "name"):
|
||||||
|
value = snapshot.get(key)
|
||||||
|
if isinstance(value, str) and value.strip():
|
||||||
|
return value.strip()
|
||||||
|
return fallback
|
||||||
|
|
||||||
|
|
||||||
|
def _snapshot_timestamp(snapshot: dict[str, Any], fallback_mtime: float) -> datetime:
    """Return the snapshot's own timestamp in UTC, or the file mtime.

    Tries each key in _TIMESTAMP_KEYS via _parse_timestamp; the first hit
    is normalized to UTC. When no key parses, *fallback_mtime* (a Unix
    epoch float, typically the source file's st_mtime) is used instead.
    """
    for key in _TIMESTAMP_KEYS:
        ts = _parse_timestamp(snapshot.get(key))
        if ts is not None:
            # NOTE(review): a naive datetime from _parse_timestamp is
            # interpreted in the server's local timezone by astimezone() —
            # confirm sentinel timestamps always carry an offset/Z suffix.
            return ts.astimezone(timezone.utc)
    return datetime.fromtimestamp(fallback_mtime, tz=timezone.utc)
|
||||||
|
|
||||||
|
|
||||||
|
class MetisService:
    """Reads Metis sentinel snapshot files and summarizes their health.

    The service scans settings.metis_sentinel_dir (recursively) for *.json
    snapshot files and reports status/host/staleness information as a
    MetisSentinelWatchSummary.
    """

    def ready(self) -> bool:
        """True when a sentinel directory is configured in settings."""
        return bool(settings.metis_sentinel_dir)

    def _finish(
        self,
        status: str,
        source: str,
        snapshots: list[dict[str, Any]],
        detail: str,
        latest_ts: datetime | None = None,
    ) -> MetisSentinelWatchSummary:
        """Build the summary from the collected snapshots, log it, return it.

        Every snapshot contributes a hostname: the f-string fallback
        ("snapshot-N") is always non-empty, so the walrus filter never
        drops an entry — it only deduplicates via the set.
        """
        hostnames = sorted(
            {
                hostname
                for idx, snapshot in enumerate(snapshots)
                if (hostname := _snapshot_hostname(snapshot, f"snapshot-{idx + 1}"))
            }
        )
        summary = MetisSentinelWatchSummary(
            status=status,
            source=source,
            snapshots=len(snapshots),
            hosts=len(hostnames),
            hostnames=hostnames,
            latest_snapshot_at=latest_ts.isoformat() if latest_ts is not None else "",
            latest_snapshot_age_sec=(
                # Clamp to zero in case a snapshot timestamp is in the future.
                max(0.0, (datetime.now(timezone.utc) - latest_ts).total_seconds()) if latest_ts is not None else None
            ),
            detail=detail,
        )
        # Structured log record; "extra" fields feed the logging pipeline.
        logger.info(
            "metis sentinel watch finished",
            extra={
                "event": "metis_sentinel_watch",
                "status": summary.status,
                "source": summary.source,
                "snapshots": summary.snapshots,
                "hosts": summary.hosts,
                "detail": summary.detail,
            },
        )
        return summary

    def watch_sentinel(self) -> MetisSentinelWatchSummary:
        """Scan the sentinel directory and summarize snapshot health.

        Returns status "skipped" when no directory is configured, "error"
        when the directory is missing/empty, when any file is unreadable
        or yields no snapshots, or when the newest snapshot is stale, and
        "ok" otherwise.
        """
        if not settings.metis_sentinel_dir:
            return self._finish("skipped", "", [], "metis sentinel dir not configured")

        source = Path(settings.metis_sentinel_dir)
        if not source.exists():
            return self._finish("error", str(source), [], "metis sentinel dir does not exist")
        if not source.is_dir():
            return self._finish("error", str(source), [], "metis sentinel path is not a directory")

        snapshots: list[dict[str, Any]] = []
        latest_ts: datetime | None = None
        detail_parts: list[str] = []  # per-file problems, folded into `detail`
        newest_mtime = 0.0

        # Recursive scan; sorted for deterministic ordering across runs.
        files = sorted(path for path in source.rglob("*.json") if path.is_file())
        if not files:
            return self._finish("error", str(source), [], "metis sentinel dir does not contain snapshots")

        for file_path in files:
            try:
                payload = json.loads(file_path.read_text())
            # Broad catch is deliberate: a single corrupt/unreadable file
            # must not abort the whole scan, only be reported in detail.
            except Exception as exc:  # noqa: BLE001
                detail_parts.append(f"{file_path.name}: {exc}")
                continue
            normalized = _normalize_snapshots(payload)
            if not normalized:
                detail_parts.append(f"{file_path.name}: empty snapshot payload")
                continue
            snapshots.extend(normalized)
            try:
                mtime = file_path.stat().st_mtime
            except OSError:
                # stat can fail (e.g. file removed mid-scan); fall back to now.
                mtime = time.time()
            newest_mtime = max(newest_mtime, mtime)
            # Track the newest per-snapshot timestamp (file mtime as fallback).
            for snapshot in normalized:
                ts = _snapshot_timestamp(snapshot, mtime)
                if latest_ts is None or ts > latest_ts:
                    latest_ts = ts

        if not snapshots:
            detail = "; ".join(detail_parts) if detail_parts else "metis sentinel snapshots were empty"
            return self._finish("error", str(source), [], detail)

        # A file newer than any embedded timestamp also counts as activity.
        if newest_mtime > 0.0:
            latest_file_ts = datetime.fromtimestamp(newest_mtime, tz=timezone.utc)
            if latest_ts is None or latest_file_ts > latest_ts:
                latest_ts = latest_file_ts

        detail = f"loaded {len(files)} file(s) and {len(snapshots)} snapshot(s)"
        if detail_parts:
            detail = f"{detail}; {'; '.join(detail_parts)}"

        status = "ok"
        # Staleness check is disabled when the threshold is <= 0.
        if settings.metis_sentinel_stale_after_sec > 0 and latest_ts is not None:
            age_sec = max(0.0, (datetime.now(timezone.utc) - latest_ts).total_seconds())
            if age_sec > settings.metis_sentinel_stale_after_sec:
                status = "error"
                detail = (
                    f"latest sentinel snapshot is stale by {round(age_sec, 1)}s "
                    f"(limit {settings.metis_sentinel_stale_after_sec:.0f}s)"
                )
                if detail_parts:
                    detail = f"{detail}; {'; '.join(detail_parts)}"

        # Any per-file problem downgrades an otherwise-ok run to error.
        if detail_parts and status == "ok":
            status = "error"

        return self._finish(status, str(source), snapshots, detail, latest_ts=latest_ts)


# Module-level singleton used by the scheduler task registration.
metis = MetisService()
|
||||||
@ -2,6 +2,7 @@ from __future__ import annotations
|
|||||||
|
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
import os
|
import os
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
|
||||||
def _env(name: str, default: str = "") -> str:
|
def _env(name: str, default: str = "") -> str:
|
||||||
@ -212,6 +213,9 @@ class Settings:
|
|||||||
keycloak_profile_cron: str
|
keycloak_profile_cron: str
|
||||||
cluster_state_cron: str
|
cluster_state_cron: str
|
||||||
cluster_state_keep: int
|
cluster_state_keep: int
|
||||||
|
metis_sentinel_dir: str
|
||||||
|
metis_sentinel_stale_after_sec: float
|
||||||
|
metis_sentinel_watch_cron: str
|
||||||
|
|
||||||
opensearch_url: str
|
opensearch_url: str
|
||||||
opensearch_limit_bytes: int
|
opensearch_limit_bytes: int
|
||||||
@ -475,6 +479,14 @@ class Settings:
|
|||||||
"cluster_state_keep": _env_int("ARIADNE_CLUSTER_STATE_KEEP", 168),
|
"cluster_state_keep": _env_int("ARIADNE_CLUSTER_STATE_KEEP", 168),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@classmethod
def _metis_config(cls) -> dict[str, Any]:
    """Metis sentinel settings read from the environment."""
    return {
        # Directory holding sentinel snapshot JSON files; "" disables the watch.
        "metis_sentinel_dir": _env("METIS_SENTINEL_DIR", ""),
        # Snapshots older than this many seconds are reported as stale (default 1h).
        "metis_sentinel_stale_after_sec": _env_float("METIS_SENTINEL_STALE_AFTER_SEC", 3600.0),
        # Cron expression for the scheduled watch task (default: every 15 minutes).
        "metis_sentinel_watch_cron": _env("ARIADNE_SCHEDULE_METIS_SENTINEL_WATCH", "*/15 * * * *"),
    }
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _opensearch_config(cls) -> dict[str, Any]:
|
def _opensearch_config(cls) -> dict[str, Any]:
|
||||||
return {
|
return {
|
||||||
@ -502,6 +514,7 @@ class Settings:
|
|||||||
vaultwarden_cfg = cls._vaultwarden_config()
|
vaultwarden_cfg = cls._vaultwarden_config()
|
||||||
schedule_cfg = cls._schedule_config()
|
schedule_cfg = cls._schedule_config()
|
||||||
cluster_cfg = cls._cluster_state_config()
|
cluster_cfg = cls._cluster_state_config()
|
||||||
|
metis_cfg = cls._metis_config()
|
||||||
opensearch_cfg = cls._opensearch_config()
|
opensearch_cfg = cls._opensearch_config()
|
||||||
|
|
||||||
portal_db = _env("PORTAL_DATABASE_URL", "")
|
portal_db = _env("PORTAL_DATABASE_URL", "")
|
||||||
@ -540,6 +553,7 @@ class Settings:
|
|||||||
**vaultwarden_cfg,
|
**vaultwarden_cfg,
|
||||||
**schedule_cfg,
|
**schedule_cfg,
|
||||||
**cluster_cfg,
|
**cluster_cfg,
|
||||||
|
**metis_cfg,
|
||||||
**opensearch_cfg,
|
**opensearch_cfg,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
6
tests/conftest.py
Normal file
6
tests/conftest.py
Normal file
@ -0,0 +1,6 @@
|
|||||||
|
from __future__ import annotations

import os


# Set before any test module imports the app so that settings construction
# has a database URL available; the value itself is a dummy — no connection
# is made during collection.
os.environ["PORTAL_DATABASE_URL"] = "postgresql://user:pass@localhost/db"
|
||||||
@ -2,13 +2,10 @@ from __future__ import annotations
|
|||||||
|
|
||||||
import dataclasses
|
import dataclasses
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
import os
|
|
||||||
|
|
||||||
from fastapi import HTTPException
|
from fastapi import HTTPException
|
||||||
from fastapi.testclient import TestClient
|
from fastapi.testclient import TestClient
|
||||||
|
|
||||||
os.environ.setdefault("PORTAL_DATABASE_URL", "postgresql://user:pass@localhost/db")
|
|
||||||
|
|
||||||
from ariadne.auth.keycloak import AuthContext
|
from ariadne.auth.keycloak import AuthContext
|
||||||
import ariadne.app as app_module
|
import ariadne.app as app_module
|
||||||
|
|
||||||
@ -47,6 +44,26 @@ def test_startup_and_shutdown(monkeypatch) -> None:
|
|||||||
app_module._shutdown()
|
app_module._shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
def test_startup_registers_metis_watch(monkeypatch) -> None:
    """_startup must register the metis sentinel watch task with the scheduler."""
    tasks = []

    # Stub out every lifecycle side effect so _startup runs without real services.
    monkeypatch.setattr(app_module.provisioning, "start", lambda: None)
    monkeypatch.setattr(app_module.scheduler, "start", lambda: None)
    monkeypatch.setattr(app_module.scheduler, "stop", lambda: None)
    monkeypatch.setattr(app_module.provisioning, "stop", lambda: None)
    monkeypatch.setattr(app_module.portal_db, "close", lambda: None)
    monkeypatch.setattr(app_module.ariadne_db, "close", lambda: None)
    # Capture every task registration instead of scheduling anything.
    monkeypatch.setattr(
        app_module.scheduler,
        "add_task",
        lambda name, cron_expr, runner: tasks.append((name, cron_expr)),
    )

    app_module._startup()

    assert any(name == "schedule.metis_sentinel_watch" for name, _cron in tasks)
|
||||||
|
|
||||||
|
|
||||||
def test_record_event_handles_exception(monkeypatch) -> None:
    """_record_event must swallow storage failures instead of raising."""
    # Lambda bodies cannot contain `raise`; throwing via a generator is the idiom.
    monkeypatch.setattr(app_module.storage, "record_event", lambda *args, **kwargs: (_ for _ in ()).throw(RuntimeError("fail")))
    app_module._record_event("event", {"ok": True})
|
||||||
|
|||||||
62
tests/test_metis.py
Normal file
62
tests/test_metis.py
Normal file
@ -0,0 +1,62 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
import types
|
||||||
|
|
||||||
|
from ariadne.services.metis import MetisService
|
||||||
|
|
||||||
|
|
||||||
|
def test_watch_sentinel_reads_snapshot_dir(monkeypatch, tmp_path) -> None:
    """watch_sentinel loads every JSON file and aggregates hosts/snapshots."""
    # Replace the module-level settings with a minimal stand-in that points
    # the service at the temporary directory.
    monkeypatch.setattr(
        "ariadne.services.metis.settings",
        types.SimpleNamespace(
            metis_sentinel_dir=str(tmp_path),
            metis_sentinel_stale_after_sec=3600.0,
        ),
    )

    Path(tmp_path, "node-a.json").write_text(
        json.dumps(
            {
                "hostname": "titan-13",
                "kernel": "6.6.63",
                "containerd": "1.7.23",
            }
        ),
        encoding="utf-8",
    )
    Path(tmp_path, "node-b.json").write_text(
        json.dumps(
            {
                "hostname": "titan-19",
                "kernel": "6.6.63",
                "containerd": "1.7.23",
            }
        ),
        encoding="utf-8",
    )

    summary = MetisService().watch_sentinel()

    assert summary.status == "ok"
    assert summary.snapshots == 2
    assert summary.hosts == 2
    # Hostnames are deduplicated and sorted by the service.
    assert summary.hostnames == ["titan-13", "titan-19"]
    assert summary.source == str(tmp_path)
|
||||||
|
|
||||||
|
|
||||||
|
def test_watch_sentinel_skips_when_unconfigured(monkeypatch) -> None:
    """An empty sentinel-dir setting yields a 'skipped' summary without any I/O."""
    monkeypatch.setattr(
        "ariadne.services.metis.settings",
        types.SimpleNamespace(
            metis_sentinel_dir="",
            metis_sentinel_stale_after_sec=3600.0,
        ),
    )

    summary = MetisService().watch_sentinel()

    assert summary.status == "skipped"
    assert summary.snapshots == 0
    assert summary.hosts == 0
|
||||||
@ -1,6 +1,7 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from ariadne import settings as settings_module
|
from ariadne import settings as settings_module
|
||||||
|
from ariadne.settings import Settings
|
||||||
|
|
||||||
|
|
||||||
def test_env_int_invalid(monkeypatch) -> None:
|
def test_env_int_invalid(monkeypatch) -> None:
|
||||||
@ -11,3 +12,14 @@ def test_env_int_invalid(monkeypatch) -> None:
|
|||||||
def test_env_float_invalid(monkeypatch) -> None:
|
def test_env_float_invalid(monkeypatch) -> None:
|
||||||
monkeypatch.setenv("ARIADNE_FLOAT_TEST", "bad")
|
monkeypatch.setenv("ARIADNE_FLOAT_TEST", "bad")
|
||||||
assert settings_module._env_float("ARIADNE_FLOAT_TEST", 1.5) == 1.5
|
assert settings_module._env_float("ARIADNE_FLOAT_TEST", 1.5) == 1.5
|
||||||
|
|
||||||
|
|
||||||
|
def test_from_env_includes_metis_settings(monkeypatch) -> None:
    """Settings.from_env must pick up all three metis sentinel env vars."""
    monkeypatch.setenv("METIS_SENTINEL_DIR", "/var/lib/metis/sentinel")
    monkeypatch.setenv("METIS_SENTINEL_STALE_AFTER_SEC", "900")
    monkeypatch.setenv("ARIADNE_SCHEDULE_METIS_SENTINEL_WATCH", "*/7 * * * *")

    cfg = Settings.from_env()
    assert cfg.metis_sentinel_dir == "/var/lib/metis/sentinel"
    # The stale threshold is parsed as a float.
    assert cfg.metis_sentinel_stale_after_sec == 900.0
    assert cfg.metis_sentinel_watch_cron == "*/7 * * * *"
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user