ariadne/tests/test_metis.py

201 lines
6.9 KiB
Python
Raw Normal View History

2026-03-31 14:07:02 -03:00
from __future__ import annotations
2026-03-31 14:18:31 -03:00
from types import SimpleNamespace
2026-03-31 14:07:02 -03:00
2026-03-31 14:18:31 -03:00
import httpx
2026-03-31 14:07:02 -03:00
2026-03-31 14:18:31 -03:00
from ariadne.services import metis as metis_module
class DummyResponse:
def __init__(self, status_code: int = 200, payload: object | None = None) -> None:
self.status_code = status_code
self._payload = payload
def raise_for_status(self) -> None:
if self.status_code >= 400:
request = httpx.Request("POST", "http://example.test")
raise httpx.HTTPStatusError("boom", request=request, response=self)
def json(self):
if isinstance(self._payload, Exception):
raise self._payload
return self._payload
class DummyClient:
def __init__(self, response: DummyResponse) -> None:
self.response = response
self.calls: list[str] = []
self.kwargs = None
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def post(self, url: str):
self.calls.append(url)
return self.response
2026-03-31 14:07:02 -03:00
2026-03-31 14:18:31 -03:00
def test_watch_sentinel_posts_to_derived_url(monkeypatch) -> None:
dummy = SimpleNamespace(
metis_base_url="http://metis.maintenance.svc.cluster.local",
metis_watch_url="",
metis_timeout_sec=12.5,
2026-03-31 14:07:02 -03:00
)
2026-03-31 14:18:31 -03:00
monkeypatch.setattr("ariadne.services.metis.settings", dummy)
client = DummyClient(DummyResponse(payload={"status": "ok", "detail": "watched", "nodes": 21}))
captured: dict[str, object] = {}
def factory(**kwargs):
captured.update(kwargs)
return client
monkeypatch.setattr(metis_module.httpx, "Client", factory)
summary = metis_module.MetisService().watch_sentinel()
assert summary.status == "ok"
assert summary.watch_url == "http://metis.maintenance.svc.cluster.local/internal/sentinel/watch"
assert summary.detail == "watched"
assert summary.result["nodes"] == 21
assert client.calls == [summary.watch_url]
assert captured["timeout"] == 12.5
def test_watch_sentinel_uses_explicit_url(monkeypatch) -> None:
dummy = SimpleNamespace(
metis_base_url="http://metis.maintenance.svc.cluster.local",
metis_watch_url="http://metis.example/internal/sentinel/watch",
metis_timeout_sec=10.0,
2026-03-31 14:07:02 -03:00
)
2026-03-31 14:18:31 -03:00
monkeypatch.setattr("ariadne.services.metis.settings", dummy)
client = DummyClient(DummyResponse(payload={"status": "ok"}))
monkeypatch.setattr(metis_module.httpx, "Client", lambda **kwargs: client)
2026-03-31 14:07:02 -03:00
2026-03-31 14:18:31 -03:00
summary = metis_module.MetisService().watch_sentinel()
2026-03-31 14:07:02 -03:00
assert summary.status == "ok"
2026-03-31 14:18:31 -03:00
assert summary.watch_url == "http://metis.example/internal/sentinel/watch"
assert client.calls == [summary.watch_url]
2026-03-31 14:07:02 -03:00
def test_watch_sentinel_skips_when_unconfigured(monkeypatch) -> None:
monkeypatch.setattr(
"ariadne.services.metis.settings",
2026-03-31 14:18:31 -03:00
SimpleNamespace(metis_base_url="", metis_watch_url="", metis_timeout_sec=10.0),
2026-03-31 14:07:02 -03:00
)
2026-03-31 14:18:31 -03:00
summary = metis_module.MetisService().watch_sentinel()
2026-03-31 14:07:02 -03:00
assert summary.status == "skipped"
2026-03-31 14:18:31 -03:00
assert summary.watch_url == ""
assert summary.detail == "metis watch url not configured"
def test_watch_sentinel_handles_http_error(monkeypatch) -> None:
dummy = SimpleNamespace(
metis_base_url="http://metis.maintenance.svc.cluster.local",
metis_watch_url="",
metis_timeout_sec=10.0,
)
monkeypatch.setattr("ariadne.services.metis.settings", dummy)
client = DummyClient(DummyResponse(status_code=502, payload={"detail": "upstream fail"}))
monkeypatch.setattr(metis_module.httpx, "Client", lambda **kwargs: client)
summary = metis_module.MetisService().watch_sentinel()
assert summary.status == "error"
assert summary.detail == "upstream fail"
assert summary.result["detail"] == "upstream fail"
2026-04-21 03:20:56 -03:00
def test_normalize_payload_and_ready(monkeypatch) -> None:
monkeypatch.setattr(
"ariadne.services.metis.settings",
SimpleNamespace(metis_base_url="http://metis", metis_watch_url="", metis_timeout_sec=10.0),
)
service = metis_module.MetisService()
assert service.ready() is True
assert metis_module._normalize_payload(None) == {}
assert metis_module._normalize_payload(["watched"]) == {"result": ["watched"]}
monkeypatch.setattr(
"ariadne.services.metis.settings",
SimpleNamespace(metis_base_url="", metis_watch_url="", metis_timeout_sec=10.0),
)
assert service.ready() is False
def test_watch_sentinel_handles_non_json_success(monkeypatch) -> None:
monkeypatch.setattr(
"ariadne.services.metis.settings",
SimpleNamespace(metis_base_url="http://metis", metis_watch_url="", metis_timeout_sec=10.0),
)
client = DummyClient(DummyResponse(payload=ValueError("not json")))
monkeypatch.setattr(metis_module.httpx, "Client", lambda **kwargs: client)
summary = metis_module.MetisService().watch_sentinel()
assert summary.status == "ok"
assert summary.result == {}
def test_watch_sentinel_handles_http_error_without_json(monkeypatch) -> None:
monkeypatch.setattr(
"ariadne.services.metis.settings",
SimpleNamespace(metis_base_url="http://metis", metis_watch_url="", metis_timeout_sec=10.0),
)
client = DummyClient(DummyResponse(status_code=503, payload=ValueError("not json")))
monkeypatch.setattr(metis_module.httpx, "Client", lambda **kwargs: client)
summary = metis_module.MetisService().watch_sentinel()
assert summary.status == "error"
assert summary.detail == "metis watch failed with HTTP 503"
def test_watch_sentinel_handles_client_exception(monkeypatch) -> None:
monkeypatch.setattr(
"ariadne.services.metis.settings",
SimpleNamespace(metis_base_url="http://metis", metis_watch_url="", metis_timeout_sec=10.0),
)
class FailingClient:
def __enter__(self):
raise RuntimeError("network down")
def __exit__(self, exc_type, exc, tb):
return False
monkeypatch.setattr(metis_module.httpx, "Client", lambda **kwargs: FailingClient())
summary = metis_module.MetisService().watch_sentinel()
assert summary.status == "error"
assert summary.detail == "network down"
def test_watch_sentinel_normalizes_message_and_unknown_status(monkeypatch) -> None:
monkeypatch.setattr(
"ariadne.services.metis.settings",
SimpleNamespace(metis_base_url="http://metis", metis_watch_url="", metis_timeout_sec=10.0),
)
client = DummyClient(DummyResponse(payload={"status": "warning", "message": " watched with warnings "}))
monkeypatch.setattr(metis_module.httpx, "Client", lambda **kwargs: client)
summary = metis_module.MetisService().watch_sentinel()
assert summary.status == "ok"
assert summary.detail == "watched with warnings"
client.response = DummyResponse(payload={"status": "paused"})
summary = metis_module.MetisService().watch_sentinel()
assert summary.status == "ok"
assert summary.detail == "metis watch returned paused"