# ariadne/tests/test_comms.py
#
# 353 lines
# 13 KiB
# Python

from __future__ import annotations
import types
from ariadne.services import comms as comms_module
from ariadne.services.comms import CommsService
class DummyResponse:
    """Canned HTTP response object handed back by the test handlers.

    Exposes ``json()``, ``raise_for_status()``, ``status_code`` and ``text``.

    Args:
        payload: Value returned by ``json()``; defaults to an empty dict.
        status_code: Status code checked by ``raise_for_status()``.
        text: Raw body text exposed as ``text``.
    """

    def __init__(self, payload=None, status_code=200, text=""):
        # Substitute the default only when no payload was supplied, so an
        # explicitly empty body such as ``[]`` comes back unchanged from json().
        self._payload = {} if payload is None else payload
        self.status_code = status_code
        self.text = text

    def json(self):
        """Return the payload supplied at construction time."""
        return self._payload

    def raise_for_status(self):
        """Raise ``RuntimeError`` when ``status_code`` is 400 or above."""
        if self.status_code >= 400:
            raise RuntimeError("status error")
class DummyClient:
    """Context-manager client double that forwards every HTTP verb to one handler.

    The handler is called as ``handler(VERB, url, **kwargs)`` and its return
    value is passed straight back to the caller.
    """

    def __init__(self, handler, timeout=None):
        # ``timeout`` is accepted but not stored or used.
        self._handler = handler

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # False means exceptions raised inside the ``with`` body propagate.
        return False

    def _dispatch(self, verb, url, options):
        """Call the handler with the verb name, the URL and the keyword options."""
        return self._handler(verb, url, **options)

    def get(self, url, **kwargs):
        return self._dispatch("GET", url, kwargs)

    def post(self, url, **kwargs):
        return self._dispatch("POST", url, kwargs)

    def put(self, url, **kwargs):
        return self._dispatch("PUT", url, kwargs)

    def delete(self, url, **kwargs):
        return self._dispatch("DELETE", url, kwargs)
def _make_handler(responses):
def handler(method, url, **_kwargs):
key = (method, url)
value = responses.get(key)
if isinstance(value, list):
if not value:
return DummyResponse()
return value.pop(0)
if callable(value):
return value(method, url)
if value is None:
return DummyResponse()
return value
return handler
def test_comms_pin_invite_pins(monkeypatch) -> None:
    """Expect ``run_pin_invite`` to report "pinned" when the canned pinned list is empty."""
    monkeypatch.setattr(
        comms_module,
        "settings",
        types.SimpleNamespace(
            comms_auth_base="http://auth",
            comms_synapse_base="http://synapse",
            comms_server_name="live.bstein.dev",
            comms_room_alias="#othrys:live.bstein.dev",
            comms_pin_message="invite",
            comms_seeder_user="othrys-seeder",
            comms_seeder_password="pw",
            comms_timeout_sec=5.0,
        ),
    )

    alias_enc = "%23othrys%3Alive.bstein.dev"
    login_url = "http://auth/_matrix/client/v3/login"
    directory_url = f"http://synapse/_matrix/client/v3/directory/room/{alias_enc}"
    pinned_url = "http://synapse/_matrix/client/v3/rooms/room1/state/m.room.pinned_events"
    send_url = "http://synapse/_matrix/client/v3/rooms/room1/send/m.room.message"

    canned = {
        ("POST", login_url): DummyResponse({"access_token": "tok"}),
        ("GET", directory_url): DummyResponse({"room_id": "room1"}),
        ("GET", pinned_url): DummyResponse({"pinned": []}),
        ("POST", send_url): DummyResponse({"event_id": "event1"}),
        ("PUT", pinned_url): DummyResponse({}),
    }
    handler = _make_handler(canned)

    def client_factory(timeout=None):
        return DummyClient(handler, timeout=timeout)

    service = CommsService(client_factory=client_factory)
    outcome = service.run_pin_invite()

    assert outcome["status"] == "ok"
    assert outcome["detail"] == "pinned"
def test_comms_pin_invite_skips_existing(monkeypatch) -> None:
    """Expect "already pinned" when a pinned event already carries the pin message body."""
    monkeypatch.setattr(
        comms_module,
        "settings",
        types.SimpleNamespace(
            comms_auth_base="http://auth",
            comms_synapse_base="http://synapse",
            comms_server_name="live.bstein.dev",
            comms_room_alias="#othrys:live.bstein.dev",
            comms_pin_message="invite",
            comms_seeder_user="othrys-seeder",
            comms_seeder_password="pw",
            comms_timeout_sec=5.0,
        ),
    )

    alias_enc = "%23othrys%3Alive.bstein.dev"
    login_url = "http://auth/_matrix/client/v3/login"
    directory_url = f"http://synapse/_matrix/client/v3/directory/room/{alias_enc}"
    pinned_url = "http://synapse/_matrix/client/v3/rooms/room1/state/m.room.pinned_events"
    event_url = "http://synapse/_matrix/client/v3/rooms/room1/event/event1"

    canned = {
        ("POST", login_url): DummyResponse({"access_token": "tok"}),
        ("GET", directory_url): DummyResponse({"room_id": "room1"}),
        # The single pinned event's body equals comms_pin_message ("invite").
        ("GET", pinned_url): DummyResponse({"pinned": ["event1"]}),
        ("GET", event_url): DummyResponse({"content": {"body": "invite"}}),
    }
    handler = _make_handler(canned)

    def client_factory(timeout=None):
        return DummyClient(handler, timeout=timeout)

    service = CommsService(client_factory=client_factory)
    outcome = service.run_pin_invite()

    assert outcome["status"] == "ok"
    assert outcome["detail"] == "already pinned"
def test_comms_seed_room(monkeypatch) -> None:
    """Expect ``run_seed_room`` to return status "ok" against the canned replies below."""
    monkeypatch.setattr(
        comms_module,
        "settings",
        types.SimpleNamespace(
            comms_auth_base="http://auth",
            comms_synapse_base="http://synapse",
            comms_server_name="live.bstein.dev",
            comms_room_alias="#othrys:live.bstein.dev",
            comms_room_name="Othrys",
            comms_seeder_user="othrys-seeder",
            comms_seeder_password="pw",
            comms_bot_user="atlasbot",
            comms_bot_password="bot",
            comms_timeout_sec=5.0,
        ),
    )

    alias_enc = "%23othrys%3Alive.bstein.dev"
    login_url = "http://auth/_matrix/client/v3/login"
    seeder_admin_url = "http://synapse/_synapse/admin/v2/users/%40othrys-seeder%3Alive.bstein.dev"
    bot_admin_url = "http://synapse/_synapse/admin/v2/users/%40atlasbot%3Alive.bstein.dev"
    directory_url = f"http://synapse/_matrix/client/v3/directory/room/{alias_enc}"
    create_room_url = "http://synapse/_matrix/client/v3/createRoom"
    list_users_url = "http://synapse/_synapse/admin/v2/users?local=true&deactivated=false&limit=100"
    join_url = "http://synapse/_synapse/admin/v1/join/room1"

    canned = {
        ("POST", login_url): DummyResponse({"access_token": "tok"}),
        # Both admin user lookups answer 404 and the matching PUTs answer 201.
        ("GET", seeder_admin_url): DummyResponse(status_code=404),
        ("PUT", seeder_admin_url): DummyResponse(status_code=201),
        ("GET", bot_admin_url): DummyResponse(status_code=404),
        ("PUT", bot_admin_url): DummyResponse(status_code=201),
        # Served in order: the first alias lookup is a 404, the second resolves.
        ("GET", directory_url): [
            DummyResponse(status_code=404),
            DummyResponse({"room_id": "room1"}),
        ],
        ("POST", create_room_url): DummyResponse({"room_id": "room1"}),
        ("GET", list_users_url): DummyResponse({"users": [{"name": "@a:live.bstein.dev"}]}),
        ("POST", join_url): DummyResponse({}),
    }
    handler = _make_handler(canned)

    def client_factory(timeout=None):
        return DummyClient(handler, timeout=timeout)

    service = CommsService(client_factory=client_factory)
    outcome = service.run_seed_room()

    assert outcome["status"] == "ok"
def test_comms_reset_room(monkeypatch) -> None:
    """Expect ``run_reset_room`` to return status "ok".

    The canned replies resolve the room alias to ``old-room``, answer
    ``createRoom`` with ``new-room`` and list three joined members of the old
    room. Any request without a canned reply gets an empty ``DummyResponse``.
    """
    dummy_settings = types.SimpleNamespace(
        comms_auth_base="http://auth",
        comms_synapse_base="http://synapse",
        comms_server_name="live.bstein.dev",
        comms_room_alias="#othrys:live.bstein.dev",
        comms_room_name="Othrys",
        comms_pin_message="invite",
        comms_seeder_user="othrys-seeder",
        comms_seeder_password="pw",
        comms_bot_user="atlasbot",
        comms_timeout_sec=5.0,
    )
    monkeypatch.setattr(comms_module, "settings", dummy_settings)

    alias_enc = "%23othrys%3Alive.bstein.dev"
    responses = {
        ("POST", "http://auth/_matrix/client/v3/login"): DummyResponse({"access_token": "tok"}),
        ("GET", f"http://synapse/_matrix/client/v3/directory/room/{alias_enc}"): DummyResponse(
            {"room_id": "old-room"}
        ),
        ("POST", "http://synapse/_matrix/client/v3/createRoom"): DummyResponse({"room_id": "new-room"}),
        ("GET", "http://synapse/_matrix/client/v3/rooms/old-room/members?membership=join"): DummyResponse(
            {
                "chunk": [
                    {"type": "m.room.member", "state_key": "@othrys-seeder:live.bstein.dev"},
                    {"type": "m.room.member", "state_key": "@bob:live.bstein.dev"},
                    {"type": "m.room.member", "state_key": "@123:live.bstein.dev"},
                ]
            }
        ),
        ("POST", "http://synapse/_matrix/client/v3/rooms/new-room/send/m.room.message"): DummyResponse(
            {"event_id": "event1"}
        ),
    }

    # Use the shared helper rather than a hand-rolled copy of the same lookup;
    # it also tolerates exhausted list entries instead of raising IndexError.
    handler = _make_handler(responses)
    svc = CommsService(client_factory=lambda timeout=None: DummyClient(handler, timeout=timeout))

    result = svc.run_reset_room()

    assert result["status"] == "ok"
def test_comms_guest_name_randomizer(monkeypatch) -> None:
    """Expect ``run_guest_name_randomizer`` to return "ok" with at least one rename.

    ``_db_rename_numeric`` is stubbed to return 0, so the rename count asserted
    here cannot come from that database step.
    """
    dummy_settings = types.SimpleNamespace(
        comms_mas_admin_client_id="client",
        comms_mas_admin_client_secret="secret",
        comms_mas_token_url="http://mas/token",
        comms_mas_admin_api_base="http://mas/api/admin/v1",
        comms_synapse_base="http://synapse",
        comms_room_alias="#othrys:live.bstein.dev",
        comms_server_name="live.bstein.dev",
        comms_seeder_user="othrys-seeder",
        comms_timeout_sec=5.0,
        comms_guest_stale_days=1,
    )
    monkeypatch.setattr(comms_module, "settings", dummy_settings)

    # Two different replies are wanted for the personal-sessions endpoint, but a
    # dict literal keeps only the last value for a repeated key, so the first
    # reply would silently vanish. Sequence them through a callable instead:
    # the first call gets session-1, every later call gets session-2.
    # NOTE(review): presumably session-1 is the seeder's and session-2 the
    # guest's -- confirm against the call order in CommsService.
    session_replies = [
        DummyResponse({"data": {"id": "session-1", "attributes": {"access_token": "seedtoken"}}}),
        DummyResponse({"data": {"id": "session-2", "attributes": {"access_token": "usertoken"}}}),
    ]

    def next_session(_method, _url):
        if len(session_replies) > 1:
            return session_replies.pop(0)
        return session_replies[0]

    responses = {
        ("POST", "http://mas/token"): DummyResponse({"access_token": "admintoken"}),
        ("GET", "http://mas/api/admin/v1/users/by-username/othrys-seeder"): DummyResponse(
            {"data": {"id": "seed"}}
        ),
        ("POST", "http://mas/api/admin/v1/personal-sessions"): next_session,
        ("POST", "http://mas/api/admin/v1/personal-sessions/session-1/revoke"): DummyResponse({}),
        ("GET", "http://synapse/_matrix/client/v3/directory/room/%23othrys%3Alive.bstein.dev"): DummyResponse(
            {"room_id": "room1"}
        ),
        ("GET", "http://synapse/_matrix/client/v3/rooms/room1/members"): DummyResponse(
            {"chunk": []}
        ),
        ("GET", "http://mas/api/admin/v1/users?page[size]=100"): DummyResponse(
            {
                "data": [
                    {"id": "user-1", "attributes": {"username": "guest-1", "legacy_guest": True}},
                ]
            }
        ),
        ("GET", "http://synapse/_matrix/client/v3/profile/%40guest-1%3Alive.bstein.dev"): DummyResponse(
            {"displayname": None}
        ),
        ("PUT", "http://synapse/_matrix/client/v3/profile/%40guest-1%3Alive.bstein.dev/displayname"): DummyResponse({}),
        ("GET", "http://synapse/_synapse/admin/v2/users?local=true&deactivated=false&limit=100"): DummyResponse(
            {
                "users": [
                    {"name": "@guest-99:live.bstein.dev", "is_guest": True, "last_seen_ts": 0},
                ]
            }
        ),
        ("DELETE", "http://synapse/_synapse/admin/v2/users/%40guest-99%3Alive.bstein.dev"): DummyResponse({}),
        ("GET", "http://synapse/_synapse/admin/v2/users/%40guest-99%3Alive.bstein.dev"): DummyResponse(
            {"displayname": None}
        ),
        ("PUT", "http://synapse/_synapse/admin/v2/users/%40guest-99%3Alive.bstein.dev"): DummyResponse({}),
        ("POST", "http://mas/api/admin/v1/personal-sessions/session-2/revoke"): DummyResponse({}),
    }

    # The shared helper invokes callable entries with (method, url) and answers
    # unknown requests with an empty DummyResponse.
    handler = _make_handler(responses)
    svc = CommsService(client_factory=lambda timeout=None: DummyClient(handler, timeout=timeout))
    monkeypatch.setattr(svc, "_db_rename_numeric", lambda *_args, **_kwargs: 0)

    result = svc.run_guest_name_randomizer()

    assert result["status"] == "ok"
    assert result["renamed"] >= 1
def test_comms_db_rename_numeric(monkeypatch) -> None:
    """Expect ``_db_rename_numeric`` to report at least one rename against a fake DB.

    ``psycopg.connect`` is replaced with a factory returning ``FakeConn``, whose
    cursors answer queries by matching substrings of the SQL text.
    """
    dummy_settings = types.SimpleNamespace(
        comms_synapse_db_host="db",
        comms_synapse_db_port=5432,
        comms_synapse_db_name="synapse",
        comms_synapse_db_user="synapse",
        comms_synapse_db_password="pw",
        comms_server_name="live.bstein.dev",
    )
    monkeypatch.setattr(comms_module, "settings", dummy_settings)

    class FakeCursor:
        """Cursor double that queues one canned result set per ``execute`` call."""

        def __init__(self):
            self._queue = []

        def execute(self, query, params=None):
            # The "= ANY" check must come first: its text also contains the
            # shorter "FROM profiles WHERE full_user_id" fragment, which would
            # otherwise shadow it and leave this branch unreachable.
            if "FROM profiles WHERE full_user_id = ANY" in query:
                rows = []
            elif "FROM profiles WHERE full_user_id" in query:
                rows = [("1", "@123:live.bstein.dev", "guest-1")]
            elif "FROM users WHERE name" in query:
                rows = [("@123:live.bstein.dev",)]
            else:
                # Any other statement yields an empty result set.
                rows = []
            self._queue.append(rows)

        def fetchall(self):
            # Results come back in the order their queries were executed.
            return self._queue.pop(0) if self._queue else []

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            return False

    class FakeConn:
        """Connection double; every ``cursor()`` call returns a fresh ``FakeCursor``."""

        def cursor(self):
            return FakeCursor()

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            return False

        def close(self):
            return None

    monkeypatch.setattr(comms_module.psycopg, "connect", lambda **_kwargs: FakeConn())

    svc = CommsService()
    renamed = svc._db_rename_numeric(set())

    assert renamed >= 1