226 lines
8.7 KiB
Python
226 lines
8.7 KiB
Python
from __future__ import annotations
|
|
|
|
import types
|
|
|
|
import pytest
|
|
|
|
from ariadne.services import comms as comms_module
|
|
from ariadne.services.comms import CommsService
|
|
|
|
|
|
class _Response:
    """Canned HTTP response handed out by the fake client in these tests.

    Only the surface defined here is provided: ``status_code``, ``text``,
    ``json()`` and ``raise_for_status()``.
    """

    def __init__(self, payload=None, status_code=200, text=""):
        """Store the canned response data.

        Args:
            payload: Value returned by :meth:`json`. ``None`` (the default)
                is normalised to an empty dict.
            status_code: Status code reported by the response.
            text: Raw body text exposed as ``.text``.
        """
        # Test against None instead of truthiness so an explicitly supplied
        # falsy payload (for example ``[]``) is returned as given rather than
        # being silently swapped for ``{}``.
        self._payload = {} if payload is None else payload
        self.status_code = status_code
        self.text = text

    def json(self):
        """Return the payload supplied at construction time."""
        return self._payload

    def raise_for_status(self):
        """Raise for error statuses.

        Raises:
            RuntimeError: With message ``"status <code>"`` when
                ``status_code`` is 400 or greater.
        """
        if self.status_code >= 400:
            raise RuntimeError(f"status {self.status_code}")
|
|
|
|
|
|
class _Client:
    """Fake HTTP client that replays canned responses and records calls.

    ``responses`` maps either ``(method, url)`` or a bare ``method`` string
    to a canned value. A ``(method, url)`` entry wins over a bare-method
    entry. A list value is consumed front-to-back, one item per request.
    """

    def __init__(self, responses=None):
        """Remember the canned responses and start with an empty call log."""
        self.calls = []
        self.responses = responses if responses else {}

    def __enter__(self):
        """Allow use as a context manager; yields the client itself."""
        return self

    def __exit__(self, exc_type, exc, tb):
        """Never suppress exceptions raised inside the ``with`` body."""
        return False

    def _respond(self, method, url, **kwargs):
        """Log the request and return the matching canned response.

        Falls back to a default ``_Response()`` when nothing is configured
        for the request or when a configured list has been used up.
        """
        self.calls.append((method, url, kwargs))

        exact_key = (method, url)
        if exact_key in self.responses:
            canned = self.responses[exact_key]
        else:
            canned = self.responses.get(method)

        if isinstance(canned, list):
            if not canned:
                return _Response()
            return canned.pop(0)
        return _Response() if canned is None else canned

    def get(self, url, **kwargs):
        """Fake ``GET`` request."""
        return self._respond("GET", url, **kwargs)

    def post(self, url, **kwargs):
        """Fake ``POST`` request."""
        return self._respond("POST", url, **kwargs)

    def put(self, url, **kwargs):
        """Fake ``PUT`` request."""
        return self._respond("PUT", url, **kwargs)

    def delete(self, url, **kwargs):
        """Fake ``DELETE`` request."""
        return self._respond("DELETE", url, **kwargs)
|
|
|
|
|
|
def _settings(**overrides):
    """Build a stand-in settings object for the comms module.

    Returns a ``types.SimpleNamespace`` populated with the defaults below.
    Keyword arguments replace a default of the same name or add a new
    attribute.
    """
    defaults = dict(
        comms_auth_base="http://auth",
        comms_synapse_base="http://synapse",
        comms_server_name="live.bstein.dev",
        comms_room_alias="#othrys:live.bstein.dev",
        comms_room_name="Othrys",
        comms_pin_message="invite",
        comms_seeder_user="othrys-seeder",
        comms_seeder_password="pw",
        comms_bot_user="atlasbot",
        comms_bot_password="bot",
        comms_timeout_sec=5.0,
    )
    return types.SimpleNamespace(**{**defaults, **overrides})
|
|
|
|
|
|
def test_pin_invite_requires_password(monkeypatch) -> None:
    """An empty seeder password makes ``run_pin_invite`` raise."""
    passwordless = _settings(comms_seeder_password="")
    monkeypatch.setattr(comms_module, "settings", passwordless)

    with pytest.raises(RuntimeError, match="seeder password missing"):
        CommsService().run_pin_invite()
|
|
|
|
|
|
def test_pin_invite_reports_missing_event_id(monkeypatch) -> None:
    """A send response with no event id yields an error result, not a raise."""
    monkeypatch.setattr(comms_module, "settings", _settings())
    alias = "%23othrys%3Alive.bstein.dev"

    # The message POST answers with an empty body, so no event id is present.
    canned = {
        ("POST", "http://auth/_matrix/client/v3/login"): _Response(
            {"access_token": "tok"}
        ),
        ("GET", f"http://synapse/_matrix/client/v3/directory/room/{alias}"): _Response(
            {"room_id": "room1"}
        ),
        (
            "GET",
            "http://synapse/_matrix/client/v3/rooms/room1/state/m.room.pinned_events",
        ): _Response({"pinned": []}),
        (
            "POST",
            "http://synapse/_matrix/client/v3/rooms/room1/send/m.room.message",
        ): _Response({}),
    }
    fake_client = _Client(canned)

    service = CommsService(client_factory=lambda timeout=None: fake_client)
    outcome = service.run_pin_invite()

    assert outcome == {"status": "error", "detail": "pin event_id missing"}
|
|
|
|
|
|
def test_reset_room_requires_pin_event(monkeypatch) -> None:
    """``run_reset_room`` raises when the stubbed message send returns ""."""
    monkeypatch.setattr(comms_module, "settings", _settings())
    service = CommsService()

    # Helpers that hand back a fixed value.
    monkeypatch.setattr(service, "_client", lambda: _Client())
    monkeypatch.setattr(
        service, "_login_with_retry", lambda client, user, password: "tok"
    )
    monkeypatch.setattr(
        service, "_resolve_alias", lambda client, token, alias: "old-room"
    )
    monkeypatch.setattr(
        service, "_create_room", lambda client, token, name: "new-room"
    )

    # Helpers replaced by do-nothing stubs.
    for helper_name in (
        "_set_room_state",
        "_delete_alias",
        "_put_alias",
        "_set_directory_visibility",
        "_invite_user",
    ):
        monkeypatch.setattr(service, helper_name, lambda *args, **kwargs: None)

    monkeypatch.setattr(service, "_list_joined_members", lambda *args, **kwargs: [])
    # An empty string stands in for a missing event id.
    monkeypatch.setattr(service, "_send_message", lambda *args, **kwargs: "")

    with pytest.raises(RuntimeError, match="pin message event_id missing"):
        service.run_reset_room()
|
|
|
|
|
|
def test_seed_room_skips_server_admin_errors(monkeypatch) -> None:
    """Seeding still reports success when admin user creation is refused."""
    monkeypatch.setattr(comms_module, "settings", _settings())
    service = CommsService()

    def refuse_admin(client, token, user, password, admin):
        # Only requests with a truthy ``admin`` flag fail; others return None.
        if admin:
            raise RuntimeError("You are not a server admin")

    monkeypatch.setattr(service, "_client", lambda: _Client())
    monkeypatch.setattr(service, "_login", lambda client, user, password: "tok")
    monkeypatch.setattr(service, "_ensure_room", lambda client, token: "room1")
    for helper_name in ("_join_user", "_join_all_locals"):
        monkeypatch.setattr(service, helper_name, lambda *args, **kwargs: None)
    monkeypatch.setattr(service, "_ensure_user", refuse_admin)

    outcome = service.run_seed_room()

    assert outcome == {"status": "ok", "detail": "room seeded"}
|
|
|
|
|
|
def test_seed_room_requires_bot_password(monkeypatch) -> None:
    """An empty bot password makes ``run_seed_room`` raise."""
    passwordless = _settings(comms_bot_password="")
    monkeypatch.setattr(comms_module, "settings", passwordless)

    with pytest.raises(RuntimeError, match="seeder/bot password missing"):
        CommsService().run_seed_room()
|
|
|
|
|
|
def test_login_failure_and_missing_token(monkeypatch) -> None:
    """``_login`` raises for a 403 reply and for a reply with no token."""
    monkeypatch.setattr(comms_module, "settings", _settings())
    service = CommsService()

    denied = _Client({"POST": _Response(status_code=403, text="denied")})
    with pytest.raises(RuntimeError, match="login failed"):
        service._login(denied, "alice", "pw")

    tokenless = _Client({"POST": _Response({})})
    with pytest.raises(RuntimeError, match="login missing token"):
        service._login(tokenless, "alice", "pw")
|
|
|
|
|
|
def test_login_retry_exhaustion(monkeypatch) -> None:
    """``_login_with_retry`` surfaces the error when every login fails."""
    monkeypatch.setattr(comms_module, "settings", _settings())
    service = CommsService()

    def always_fail(*args, **kwargs):
        raise RuntimeError("nope")

    monkeypatch.setattr(service, "_login", always_fail)
    # Replace the sleep hook with a no-op so the test does not wait.
    monkeypatch.setattr(service, "_sleep", lambda _seconds: None)

    with pytest.raises(RuntimeError, match="nope"):
        service._login_with_retry(_Client(), "alice", "pw")
|
|
|
|
|
|
def test_not_found_helpers(monkeypatch) -> None:
    """A 404 on GET gives ``[]`` from ``_get_pinned`` and ``None`` from ``_get_event``."""
    monkeypatch.setattr(comms_module, "settings", _settings())
    service = CommsService()
    not_found = _Client({"GET": _Response(status_code=404)})

    pinned = service._get_pinned(not_found, "tok", "room1")
    assert pinned == []

    event = service._get_event(not_found, "tok", "room1", "event1")
    assert event is None
|
|
|
|
|
|
def test_error_helpers_raise(monkeypatch) -> None:
    """Server errors in the alias, invite and user helpers raise RuntimeError."""
    monkeypatch.setattr(comms_module, "settings", _settings())
    service = CommsService()

    failing_delete = _Client({"DELETE": _Response(status_code=500)})
    with pytest.raises(RuntimeError, match="status 500"):
        service._delete_alias(failing_delete, "tok", "#othrys:live.bstein.dev")

    failing_post = _Client({"POST": _Response(status_code=500)})
    with pytest.raises(RuntimeError, match="status 500"):
        service._invite_user(failing_post, "tok", "room1", "@alice:live.bstein.dev")

    # GET answers 404 and the following PUT answers 500.
    failing_create = _Client(
        {"GET": _Response(status_code=404), "PUT": _Response(status_code=500)}
    )
    with pytest.raises(RuntimeError, match="create user"):
        service._ensure_user(failing_create, "tok", "alice", "pw", False)
|
|
|
|
|
|
def test_ensure_user_existing_and_ensure_room_edges(monkeypatch) -> None:
    """Cover a 200 user lookup, an alias that resolves, and a failed create."""
    monkeypatch.setattr(comms_module, "settings", _settings())
    service = CommsService()
    alias = "%23othrys%3Alive.bstein.dev"
    directory_key = ("GET", f"http://synapse/_matrix/client/v3/directory/room/{alias}")

    # GET answers 200: the call is only expected to complete without raising.
    user_present = _Client({"GET": _Response(status_code=200)})
    service._ensure_user(user_present, "tok", "alice", "pw", False)

    # The alias lookup returns a room id, which is handed back.
    room_present = _Client({directory_key: _Response({"room_id": "room1"})})
    assert service._ensure_room(room_present, "tok") == "room1"

    # The alias lookup answers 404 and the createRoom POST answers 500.
    room_create_fails = _Client(
        {
            directory_key: _Response(status_code=404),
            ("POST", "http://synapse/_matrix/client/v3/createRoom"): _Response(
                status_code=500, text="boom"
            ),
        }
    )
    with pytest.raises(RuntimeError, match="create room failed"):
        service._ensure_room(room_create_fails, "tok")
|
|
|
|
|
|
def test_join_all_locals_handles_pagination(monkeypatch) -> None:
    """Named users from both result pages are joined, in page order."""
    monkeypatch.setattr(
        comms_module, "settings", _settings(comms_synapse_admin_token="admintok")
    )
    service = CommsService()

    first = "http://synapse/_synapse/admin/v2/users?local=true&deactivated=false&limit=100"
    second = first + "&from=next"
    # Page one carries a ``next_token`` plus an entry with no "name" key;
    # page two has no token.
    page_one = _Response(
        {
            "users": [{"name": "@a:live.bstein.dev"}, {"bad": True}],
            "next_token": "next",
        }
    )
    page_two = _Response({"users": [{"name": "@b:live.bstein.dev"}]})
    fake_client = _Client({("GET", first): page_one, ("GET", second): page_two})

    joined = []

    def record_join(client, token, room_id, uid):
        joined.append(uid)

    monkeypatch.setattr(service, "_join_user", record_join)

    service._join_all_locals(fake_client, "tok", "room1")

    assert joined == ["@a:live.bstein.dev", "@b:live.bstein.dev"]
|