metis: switch watcher to http

This commit is contained in:
Brad Stein 2026-03-31 14:18:31 -03:00
parent cf0271a8ea
commit 2eecc3d88d
4 changed files with 163 additions and 191 deletions

View File

@ -1,189 +1,106 @@
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime, timezone
import json
import time
from pathlib import Path
from typing import Any
import httpx
from ..settings import settings
from ..utils.logging import get_logger
logger = get_logger(__name__)
_SNAPSHOT_KEYS = ("hostname", "kernel", "os_image", "k3s_version", "containerd", "package_sample")
_TIMESTAMP_KEYS = ("collected_at", "timestamp", "generated_at", "created_at")
_WATCH_PATH = "/internal/sentinel/watch"
@dataclass(frozen=True)
class MetisSentinelWatchSummary:
status: str
source: str
snapshots: int
hosts: int
hostnames: list[str] = field(default_factory=list)
latest_snapshot_at: str = ""
latest_snapshot_age_sec: float | None = None
watch_url: str
detail: str = ""
result: dict[str, Any] = field(default_factory=dict)
def _parse_timestamp(raw: Any) -> datetime | None:
if not isinstance(raw, str):
return None
text = raw.strip()
if not text:
return None
try:
return datetime.fromisoformat(text.replace("Z", "+00:00"))
except ValueError:
return None
def _watch_url() -> str:
    """Resolve the URL used for the sentinel watch request.

    Returns:
        ``settings.metis_watch_url`` when it is set; otherwise
        ``settings.metis_base_url`` with ``_WATCH_PATH`` appended; otherwise
        an empty string when neither setting is populated.
    """
    explicit = settings.metis_watch_url
    if explicit:
        return explicit
    base = settings.metis_base_url
    return f"{base}{_WATCH_PATH}" if base else ""
def _normalize_snapshots(payload: Any) -> list[dict[str, Any]]:
if isinstance(payload, list):
return [item for item in payload if isinstance(item, dict)]
if not isinstance(payload, dict):
return []
for key in ("snapshots", "items", "data"):
value = payload.get(key)
if isinstance(value, list):
return [item for item in value if isinstance(item, dict)]
if any(key in payload for key in _SNAPSHOT_KEYS):
return [payload]
return []
def _snapshot_hostname(snapshot: dict[str, Any], fallback: str) -> str:
for key in ("hostname", "host", "name"):
value = snapshot.get(key)
if isinstance(value, str) and value.strip():
return value.strip()
return fallback
def _snapshot_timestamp(snapshot: dict[str, Any], fallback_mtime: float) -> datetime:
    """Return the snapshot's timestamp as a UTC-aware ``datetime``.

    Args:
        snapshot: Snapshot mapping; the keys in ``_TIMESTAMP_KEYS`` are tried
            in order via ``_parse_timestamp``.
        fallback_mtime: POSIX timestamp used when no key parses.

    Returns:
        The first parseable timestamp converted with ``astimezone`` to UTC,
        or ``fallback_mtime`` as a UTC ``datetime``.
    """
    parsed_in_order = (_parse_timestamp(snapshot.get(name)) for name in _TIMESTAMP_KEYS)
    first_valid = next((moment for moment in parsed_in_order if moment is not None), None)
    if first_valid is None:
        return datetime.fromtimestamp(fallback_mtime, tz=timezone.utc)
    # NOTE: astimezone() treats a naive datetime as system-local time.
    return first_valid.astimezone(timezone.utc)
def _normalize_payload(payload: Any) -> dict[str, Any]:
if isinstance(payload, dict):
return payload
if payload is None:
return {}
return {"result": payload}
class MetisService:
def ready(self) -> bool:
return bool(settings.metis_sentinel_dir)
return bool(_watch_url())
def _finish(
self,
status: str,
source: str,
snapshots: list[dict[str, Any]],
detail: str,
latest_ts: datetime | None = None,
) -> MetisSentinelWatchSummary:
hostnames = sorted(
{
hostname
for idx, snapshot in enumerate(snapshots)
if (hostname := _snapshot_hostname(snapshot, f"snapshot-{idx + 1}"))
}
)
def _finish(self, status: str, watch_url: str, detail: str = "", result: dict[str, Any] | None = None) -> MetisSentinelWatchSummary:
summary = MetisSentinelWatchSummary(
status=status,
source=source,
snapshots=len(snapshots),
hosts=len(hostnames),
hostnames=hostnames,
latest_snapshot_at=latest_ts.isoformat() if latest_ts is not None else "",
latest_snapshot_age_sec=(
max(0.0, (datetime.now(timezone.utc) - latest_ts).total_seconds()) if latest_ts is not None else None
),
watch_url=watch_url,
detail=detail,
result=result or {},
)
logger.info(
"metis sentinel watch finished",
extra={
"event": "metis_sentinel_watch",
"status": summary.status,
"source": summary.source,
"snapshots": summary.snapshots,
"hosts": summary.hosts,
"watch_url": summary.watch_url,
"detail": summary.detail,
},
)
return summary
def watch_sentinel(self) -> MetisSentinelWatchSummary:
if not settings.metis_sentinel_dir:
return self._finish("skipped", "", [], "metis sentinel dir not configured")
watch_url = _watch_url()
if not watch_url:
return self._finish("skipped", "", "metis watch url not configured")
source = Path(settings.metis_sentinel_dir)
if not source.exists():
return self._finish("error", str(source), [], "metis sentinel dir does not exist")
if not source.is_dir():
return self._finish("error", str(source), [], "metis sentinel path is not a directory")
snapshots: list[dict[str, Any]] = []
latest_ts: datetime | None = None
detail_parts: list[str] = []
newest_mtime = 0.0
files = sorted(path for path in source.rglob("*.json") if path.is_file())
if not files:
return self._finish("error", str(source), [], "metis sentinel dir does not contain snapshots")
for file_path in files:
try:
with httpx.Client(timeout=settings.metis_timeout_sec, follow_redirects=True) as client:
response = client.post(watch_url)
response.raise_for_status()
try:
payload = response.json()
except Exception:
payload = {}
except httpx.HTTPStatusError as exc:
response = exc.response
detail = f"metis watch failed with HTTP {response.status_code}"
try:
payload = json.loads(file_path.read_text())
except Exception as exc: # noqa: BLE001
detail_parts.append(f"{file_path.name}: {exc}")
continue
normalized = _normalize_snapshots(payload)
if not normalized:
detail_parts.append(f"{file_path.name}: empty snapshot payload")
continue
snapshots.extend(normalized)
try:
mtime = file_path.stat().st_mtime
except OSError:
mtime = time.time()
newest_mtime = max(newest_mtime, mtime)
for snapshot in normalized:
ts = _snapshot_timestamp(snapshot, mtime)
if latest_ts is None or ts > latest_ts:
latest_ts = ts
payload = response.json()
except Exception:
payload = {}
payload = _normalize_payload(payload)
if isinstance(payload.get("detail"), str) and payload["detail"].strip():
detail = payload["detail"].strip()
return self._finish("error", watch_url, detail, payload)
except Exception as exc: # noqa: BLE001
return self._finish("error", watch_url, str(exc).strip() or "metis watch failed")
if not snapshots:
detail = "; ".join(detail_parts) if detail_parts else "metis sentinel snapshots were empty"
return self._finish("error", str(source), [], detail)
payload = _normalize_payload(payload)
status = payload.get("status") if isinstance(payload.get("status"), str) else "ok"
detail = ""
if isinstance(payload.get("detail"), str):
detail = payload["detail"].strip()
elif isinstance(payload.get("message"), str):
detail = payload["message"].strip()
elif status != "ok":
detail = f"metis watch returned {status}"
if newest_mtime > 0.0:
latest_file_ts = datetime.fromtimestamp(newest_mtime, tz=timezone.utc)
if latest_ts is None or latest_file_ts > latest_ts:
latest_ts = latest_file_ts
if status not in {"ok", "skipped", "error"}:
status = "ok"
detail = f"loaded {len(files)} file(s) and {len(snapshots)} snapshot(s)"
if detail_parts:
detail = f"{detail}; {'; '.join(detail_parts)}"
status = "ok"
if settings.metis_sentinel_stale_after_sec > 0 and latest_ts is not None:
age_sec = max(0.0, (datetime.now(timezone.utc) - latest_ts).total_seconds())
if age_sec > settings.metis_sentinel_stale_after_sec:
status = "error"
detail = (
f"latest sentinel snapshot is stale by {round(age_sec, 1)}s "
f"(limit {settings.metis_sentinel_stale_after_sec:.0f}s)"
)
if detail_parts:
detail = f"{detail}; {'; '.join(detail_parts)}"
if detail_parts and status == "ok":
status = "error"
return self._finish(status, str(source), snapshots, detail, latest_ts=latest_ts)
return self._finish(status, watch_url, detail, payload)
metis = MetisService()

View File

@ -213,8 +213,9 @@ class Settings:
keycloak_profile_cron: str
cluster_state_cron: str
cluster_state_keep: int
metis_sentinel_dir: str
metis_sentinel_stale_after_sec: float
metis_base_url: str
metis_watch_url: str
metis_timeout_sec: float
metis_sentinel_watch_cron: str
opensearch_url: str
@ -482,8 +483,9 @@ class Settings:
@classmethod
def _metis_config(cls) -> dict[str, Any]:
return {
"metis_sentinel_dir": _env("METIS_SENTINEL_DIR", ""),
"metis_sentinel_stale_after_sec": _env_float("METIS_SENTINEL_STALE_AFTER_SEC", 3600.0),
"metis_base_url": _env("METIS_BASE_URL", "http://metis.maintenance.svc.cluster.local").rstrip("/"),
"metis_watch_url": _env("METIS_WATCH_URL", "").rstrip("/"),
"metis_timeout_sec": _env_float("METIS_TIMEOUT_SEC", 10.0),
"metis_sentinel_watch_cron": _env("ARIADNE_SCHEDULE_METIS_SENTINEL_WATCH", "*/15 * * * *"),
}

View File

@ -1,62 +1,113 @@
from __future__ import annotations
import json
from pathlib import Path
import types
from types import SimpleNamespace
from ariadne.services.metis import MetisService
import httpx
from ariadne.services import metis as metis_module
def test_watch_sentinel_reads_snapshot_dir(monkeypatch, tmp_path) -> None:
monkeypatch.setattr(
"ariadne.services.metis.settings",
types.SimpleNamespace(
metis_sentinel_dir=str(tmp_path),
metis_sentinel_stale_after_sec=3600.0,
),
class DummyResponse:
    """Minimal stand-in for an HTTP response used by the watcher tests."""

    def __init__(self, status_code: int = 200, payload: object | None = None) -> None:
        self.status_code = status_code
        # An Exception instance stored here is raised by json() instead of returned.
        self._payload = payload

    def raise_for_status(self) -> None:
        """Raise ``httpx.HTTPStatusError`` when the status code is 400 or above."""
        if self.status_code < 400:
            return
        request = httpx.Request("POST", "http://example.test")
        raise httpx.HTTPStatusError("boom", request=request, response=self)

    def json(self):
        """Return the stored payload, or raise it when it is an exception."""
        stored = self._payload
        if isinstance(stored, Exception):
            raise stored
        return stored
class DummyClient:
    """Context-manager test double that records the URLs passed to ``post``."""

    def __init__(self, response: DummyResponse) -> None:
        self.kwargs = None
        self.calls: list[str] = []
        self.response = response

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Returning False lets exceptions raised inside the ``with`` body propagate.
        return False

    def post(self, url: str):
        """Record ``url`` and hand back the canned response."""
        self.calls.append(url)
        return self.response
def test_watch_sentinel_posts_to_derived_url(monkeypatch) -> None:
dummy = SimpleNamespace(
metis_base_url="http://metis.maintenance.svc.cluster.local",
metis_watch_url="",
metis_timeout_sec=12.5,
)
monkeypatch.setattr("ariadne.services.metis.settings", dummy)
client = DummyClient(DummyResponse(payload={"status": "ok", "detail": "watched", "nodes": 21}))
captured: dict[str, object] = {}
Path(tmp_path, "node-a.json").write_text(
json.dumps(
{
"hostname": "titan-13",
"kernel": "6.6.63",
"containerd": "1.7.23",
}
),
encoding="utf-8",
)
Path(tmp_path, "node-b.json").write_text(
json.dumps(
{
"hostname": "titan-19",
"kernel": "6.6.63",
"containerd": "1.7.23",
}
),
encoding="utf-8",
)
def factory(**kwargs):
captured.update(kwargs)
return client
summary = MetisService().watch_sentinel()
monkeypatch.setattr(metis_module.httpx, "Client", factory)
summary = metis_module.MetisService().watch_sentinel()
assert summary.status == "ok"
assert summary.snapshots == 2
assert summary.hosts == 2
assert summary.hostnames == ["titan-13", "titan-19"]
assert summary.source == str(tmp_path)
assert summary.watch_url == "http://metis.maintenance.svc.cluster.local/internal/sentinel/watch"
assert summary.detail == "watched"
assert summary.result["nodes"] == 21
assert client.calls == [summary.watch_url]
assert captured["timeout"] == 12.5
def test_watch_sentinel_uses_explicit_url(monkeypatch) -> None:
    """An explicit ``metis_watch_url`` is used instead of the base-URL-derived one."""
    fake_settings = SimpleNamespace(
        metis_base_url="http://metis.maintenance.svc.cluster.local",
        metis_watch_url="http://metis.example/internal/sentinel/watch",
        metis_timeout_sec=10.0,
    )
    monkeypatch.setattr("ariadne.services.metis.settings", fake_settings)

    fake_client = DummyClient(DummyResponse(payload={"status": "ok"}))
    monkeypatch.setattr(metis_module.httpx, "Client", lambda **kwargs: fake_client)

    service = metis_module.MetisService()
    summary = service.watch_sentinel()

    assert summary.status == "ok"
    assert summary.watch_url == "http://metis.example/internal/sentinel/watch"
    assert fake_client.calls == [summary.watch_url]
def test_watch_sentinel_skips_when_unconfigured(monkeypatch) -> None:
monkeypatch.setattr(
"ariadne.services.metis.settings",
types.SimpleNamespace(
metis_sentinel_dir="",
metis_sentinel_stale_after_sec=3600.0,
),
SimpleNamespace(metis_base_url="", metis_watch_url="", metis_timeout_sec=10.0),
)
summary = MetisService().watch_sentinel()
summary = metis_module.MetisService().watch_sentinel()
assert summary.status == "skipped"
assert summary.snapshots == 0
assert summary.hosts == 0
assert summary.watch_url == ""
assert summary.detail == "metis watch url not configured"
def test_watch_sentinel_handles_http_error(monkeypatch) -> None:
    """A 502 response yields an error summary carrying the response's detail."""
    fake_settings = SimpleNamespace(
        metis_base_url="http://metis.maintenance.svc.cluster.local",
        metis_watch_url="",
        metis_timeout_sec=10.0,
    )
    monkeypatch.setattr("ariadne.services.metis.settings", fake_settings)

    failing_response = DummyResponse(status_code=502, payload={"detail": "upstream fail"})
    fake_client = DummyClient(failing_response)
    monkeypatch.setattr(metis_module.httpx, "Client", lambda **kwargs: fake_client)

    service = metis_module.MetisService()
    summary = service.watch_sentinel()

    assert summary.status == "error"
    assert summary.detail == "upstream fail"
    assert summary.result["detail"] == "upstream fail"

View File

@ -15,11 +15,13 @@ def test_env_float_invalid(monkeypatch) -> None:
def test_from_env_includes_metis_settings(monkeypatch) -> None:
monkeypatch.setenv("METIS_SENTINEL_DIR", "/var/lib/metis/sentinel")
monkeypatch.setenv("METIS_SENTINEL_STALE_AFTER_SEC", "900")
monkeypatch.setenv("METIS_BASE_URL", "http://metis.maintenance.svc.cluster.local/")
monkeypatch.setenv("METIS_WATCH_URL", "http://metis.example/internal/sentinel/watch")
monkeypatch.setenv("METIS_TIMEOUT_SEC", "9.5")
monkeypatch.setenv("ARIADNE_SCHEDULE_METIS_SENTINEL_WATCH", "*/7 * * * *")
cfg = Settings.from_env()
assert cfg.metis_sentinel_dir == "/var/lib/metis/sentinel"
assert cfg.metis_sentinel_stale_after_sec == 900.0
assert cfg.metis_base_url == "http://metis.maintenance.svc.cluster.local"
assert cfg.metis_watch_url == "http://metis.example/internal/sentinel/watch"
assert cfg.metis_timeout_sec == 9.5
assert cfg.metis_sentinel_watch_cron == "*/7 * * * *"