from __future__ import annotations import types from ariadne.services import comms as comms_module from ariadne.services.comms import CommsService class DummyResponse: def __init__(self, payload=None, status_code=200, text=""): self._payload = payload or {} self.status_code = status_code self.text = text def json(self): return self._payload def raise_for_status(self): if self.status_code >= 400: raise RuntimeError("status error") class DummyClient: def __init__(self, handler, timeout=None): self._handler = handler def __enter__(self): return self def __exit__(self, exc_type, exc, tb): return False def get(self, url, **kwargs): return self._handler("GET", url, **kwargs) def post(self, url, **kwargs): return self._handler("POST", url, **kwargs) def put(self, url, **kwargs): return self._handler("PUT", url, **kwargs) def delete(self, url, **kwargs): return self._handler("DELETE", url, **kwargs) def _make_handler(responses): def handler(method, url, **_kwargs): key = (method, url) value = responses.get(key) if isinstance(value, list): if not value: return DummyResponse() return value.pop(0) if callable(value): return value(method, url) if value is None: return DummyResponse() return value return handler def test_comms_pin_invite_pins(monkeypatch) -> None: dummy_settings = types.SimpleNamespace( comms_auth_base="http://auth", comms_synapse_base="http://synapse", comms_server_name="live.bstein.dev", comms_room_alias="#othrys:live.bstein.dev", comms_pin_message="invite", comms_seeder_user="othrys-seeder", comms_seeder_password="pw", comms_timeout_sec=5.0, ) monkeypatch.setattr(comms_module, "settings", dummy_settings) alias_enc = "%23othrys%3Alive.bstein.dev" responses = { ("POST", "http://auth/_matrix/client/v3/login"): DummyResponse({"access_token": "tok"}), ("GET", f"http://synapse/_matrix/client/v3/directory/room/{alias_enc}"): DummyResponse( {"room_id": "room1"} ), ("GET", "http://synapse/_matrix/client/v3/rooms/room1/state/m.room.pinned_events"): DummyResponse( {"pinned": []} ), ("POST", "http://synapse/_matrix/client/v3/rooms/room1/send/m.room.message"): DummyResponse( {"event_id": "event1"} ), ("PUT", "http://synapse/_matrix/client/v3/rooms/room1/state/m.room.pinned_events"): DummyResponse({}), } handler = _make_handler(responses) svc = CommsService(client_factory=lambda timeout=None: DummyClient(handler, timeout=timeout)) result = svc.run_pin_invite() assert result["status"] == "ok" assert result["detail"] == "pinned" def test_comms_pin_invite_skips_existing(monkeypatch) -> None: dummy_settings = types.SimpleNamespace( comms_auth_base="http://auth", comms_synapse_base="http://synapse", comms_server_name="live.bstein.dev", comms_room_alias="#othrys:live.bstein.dev", comms_pin_message="invite", comms_seeder_user="othrys-seeder", comms_seeder_password="pw", comms_timeout_sec=5.0, ) monkeypatch.setattr(comms_module, "settings", dummy_settings) alias_enc = "%23othrys%3Alive.bstein.dev" responses = { ("POST", "http://auth/_matrix/client/v3/login"): DummyResponse({"access_token": "tok"}), ("GET", f"http://synapse/_matrix/client/v3/directory/room/{alias_enc}"): DummyResponse( {"room_id": "room1"} ), ("GET", "http://synapse/_matrix/client/v3/rooms/room1/state/m.room.pinned_events"): DummyResponse( {"pinned": ["event1"]} ), ("GET", "http://synapse/_matrix/client/v3/rooms/room1/event/event1"): DummyResponse( {"content": {"body": "invite"}} ), } handler = _make_handler(responses) svc = CommsService(client_factory=lambda timeout=None: DummyClient(handler, timeout=timeout)) result = svc.run_pin_invite() assert result["status"] == "ok" assert result["detail"] == "already pinned" def test_comms_seed_room(monkeypatch) -> None: dummy_settings = types.SimpleNamespace( comms_auth_base="http://auth", comms_synapse_base="http://synapse", comms_server_name="live.bstein.dev", comms_room_alias="#othrys:live.bstein.dev", comms_room_name="Othrys", comms_seeder_user="othrys-seeder", comms_seeder_password="pw", comms_bot_user="atlasbot", comms_bot_password="bot", comms_timeout_sec=5.0, ) monkeypatch.setattr(comms_module, "settings", dummy_settings) alias_enc = "%23othrys%3Alive.bstein.dev" responses = { ("POST", "http://auth/_matrix/client/v3/login"): DummyResponse({"access_token": "tok"}), ("GET", "http://synapse/_synapse/admin/v2/users/%40othrys-seeder%3Alive.bstein.dev"): DummyResponse( status_code=404 ), ("PUT", "http://synapse/_synapse/admin/v2/users/%40othrys-seeder%3Alive.bstein.dev"): DummyResponse( status_code=201 ), ("GET", "http://synapse/_synapse/admin/v2/users/%40atlasbot%3Alive.bstein.dev"): DummyResponse( status_code=404 ), ("PUT", "http://synapse/_synapse/admin/v2/users/%40atlasbot%3Alive.bstein.dev"): DummyResponse( status_code=201 ), ("GET", f"http://synapse/_matrix/client/v3/directory/room/{alias_enc}"): [ DummyResponse(status_code=404), DummyResponse({"room_id": "room1"}), ], ("POST", "http://synapse/_matrix/client/v3/createRoom"): DummyResponse({"room_id": "room1"}), ("GET", "http://synapse/_synapse/admin/v2/users?local=true&deactivated=false&limit=100"): DummyResponse( {"users": [{"name": "@a:live.bstein.dev"}]} ), ("POST", "http://synapse/_synapse/admin/v1/join/room1"): DummyResponse({}), } handler = _make_handler(responses) svc = CommsService(client_factory=lambda timeout=None: DummyClient(handler, timeout=timeout)) result = svc.run_seed_room() assert result["status"] == "ok" def test_comms_reset_room(monkeypatch) -> None: dummy_settings = types.SimpleNamespace( comms_auth_base="http://auth", comms_synapse_base="http://synapse", comms_server_name="live.bstein.dev", comms_room_alias="#othrys:live.bstein.dev", comms_room_name="Othrys", comms_pin_message="invite", comms_seeder_user="othrys-seeder", comms_seeder_password="pw", comms_bot_user="atlasbot", comms_timeout_sec=5.0, ) monkeypatch.setattr(comms_module, "settings", dummy_settings) alias_enc = "%23othrys%3Alive.bstein.dev" responses = { ("POST", "http://auth/_matrix/client/v3/login"): DummyResponse({"access_token": "tok"}), ("GET", f"http://synapse/_matrix/client/v3/directory/room/{alias_enc}"): DummyResponse( {"room_id": "old-room"} ), ("POST", "http://synapse/_matrix/client/v3/createRoom"): DummyResponse({"room_id": "new-room"}), ("GET", "http://synapse/_matrix/client/v3/rooms/old-room/members?membership=join"): DummyResponse( { "chunk": [ {"type": "m.room.member", "state_key": "@othrys-seeder:live.bstein.dev"}, {"type": "m.room.member", "state_key": "@bob:live.bstein.dev"}, {"type": "m.room.member", "state_key": "@123:live.bstein.dev"}, ] } ), ("POST", "http://synapse/_matrix/client/v3/rooms/new-room/send/m.room.message"): DummyResponse( {"event_id": "event1"} ), } def handler(method, url, **_kwargs): resp = responses.get((method, url)) if resp is None: return DummyResponse({}) if isinstance(resp, list): return resp.pop(0) return resp svc = CommsService(client_factory=lambda timeout=None: DummyClient(handler, timeout=timeout)) result = svc.run_reset_room() assert result["status"] == "ok" def test_comms_guest_name_randomizer(monkeypatch) -> None: dummy_settings = types.SimpleNamespace( comms_mas_admin_client_id="client", comms_mas_admin_client_secret="secret", comms_mas_token_url="http://mas/token", comms_mas_admin_api_base="http://mas/api/admin/v1", comms_synapse_base="http://synapse", comms_room_alias="#othrys:live.bstein.dev", comms_server_name="live.bstein.dev", comms_seeder_user="othrys-seeder", comms_timeout_sec=5.0, comms_guest_stale_days=1, ) monkeypatch.setattr(comms_module, "settings", dummy_settings) responses = { ("POST", "http://mas/token"): DummyResponse({"access_token": "admintoken"}), ("GET", "http://mas/api/admin/v1/users/by-username/othrys-seeder"): DummyResponse( {"data": {"id": "seed"}} ), ("POST", "http://mas/api/admin/v1/personal-sessions"): DummyResponse( {"data": {"id": "session-1", "attributes": {"access_token": "seedtoken"}}} ), ("POST", "http://mas/api/admin/v1/personal-sessions/session-1/revoke"): DummyResponse({}), ("GET", "http://synapse/_matrix/client/v3/directory/room/%23othrys%3Alive.bstein.dev"): DummyResponse( {"room_id": "room1"} ), ("GET", "http://synapse/_matrix/client/v3/rooms/room1/members"): DummyResponse( {"chunk": []} ), ("GET", "http://mas/api/admin/v1/users?page[size]=100"): DummyResponse( { "data": [ {"id": "user-1", "attributes": {"username": "guest-1", "legacy_guest": True}}, ] } ), ("POST", "http://mas/api/admin/v1/personal-sessions"): DummyResponse( {"data": {"id": "session-2", "attributes": {"access_token": "usertoken"}}} ), ("GET", "http://synapse/_matrix/client/v3/profile/%40guest-1%3Alive.bstein.dev"): DummyResponse( {"displayname": None} ), ("PUT", "http://synapse/_matrix/client/v3/profile/%40guest-1%3Alive.bstein.dev/displayname"): DummyResponse({}), ("GET", "http://synapse/_synapse/admin/v2/users?local=true&deactivated=false&limit=100"): DummyResponse( { "users": [ {"name": "@guest-99:live.bstein.dev", "is_guest": True, "last_seen_ts": 0}, ] } ), ("DELETE", "http://synapse/_synapse/admin/v2/users/%40guest-99%3Alive.bstein.dev"): DummyResponse({}), ("GET", "http://synapse/_synapse/admin/v2/users/%40guest-99%3Alive.bstein.dev"): DummyResponse( {"displayname": None} ), ("PUT", "http://synapse/_synapse/admin/v2/users/%40guest-99%3Alive.bstein.dev"): DummyResponse({}), ("POST", "http://mas/api/admin/v1/personal-sessions/session-2/revoke"): DummyResponse({}), } def handler(method, url, **_kwargs): resp = responses.get((method, url)) if resp is None: return DummyResponse({}) return resp svc = CommsService(client_factory=lambda timeout=None: DummyClient(handler, timeout=timeout)) monkeypatch.setattr(svc, "_db_rename_numeric", lambda *_args, **_kwargs: 0) result = svc.run_guest_name_randomizer() assert result["status"] == "ok" assert result["renamed"] >= 1 def test_comms_db_rename_numeric(monkeypatch) -> None: dummy_settings = types.SimpleNamespace( comms_synapse_db_host="db", comms_synapse_db_port=5432, comms_synapse_db_name="synapse", comms_synapse_db_user="synapse", comms_synapse_db_password="pw", comms_server_name="live.bstein.dev", ) monkeypatch.setattr(comms_module, "settings", dummy_settings) class FakeCursor: def __init__(self): self._queue = [] def execute(self, query, params=None): if "FROM profiles WHERE full_user_id" in query: self._queue.append([("1", "@123:live.bstein.dev", "guest-1")]) elif "FROM users WHERE name" in query: self._queue.append([("@123:live.bstein.dev",)]) elif "FROM profiles WHERE full_user_id = ANY" in query: self._queue.append([]) else: self._queue.append([]) def fetchall(self): return self._queue.pop(0) if self._queue else [] def __enter__(self): return self def __exit__(self, exc_type, exc, tb): return False class FakeConn: def cursor(self): return FakeCursor() def __enter__(self): return self def __exit__(self, exc_type, exc, tb): return False def close(self): return None monkeypatch.setattr(comms_module.psycopg, "connect", lambda **_kwargs: FakeConn()) svc = CommsService() renamed = svc._db_rename_numeric(set()) assert renamed >= 1