from __future__ import annotations import types import pytest from ariadne.services import comms as comms_module from ariadne.services.comms import CommsService from ariadne.services.comms_protocol import DisplayNameTarget, SynapseUserRef class _Response: def __init__(self, payload=None, status_code: int = 200, text: str = "") -> None: self._payload = payload or {} self.status_code = status_code self.text = text def json(self): return self._payload def raise_for_status(self) -> None: if self.status_code >= 400: raise RuntimeError(f"status {self.status_code}") class _Client: def __init__(self, responses=None, *, fail_delete: bool = False, fail_post: bool = False) -> None: self.responses = responses or {} self.calls = [] self.fail_delete = fail_delete self.fail_post = fail_post def _next(self, method: str, url: str, **kwargs): self.calls.append((method, url, kwargs)) value = self.responses.get((method, url), _Response()) if isinstance(value, list): return value.pop(0) if value else _Response() return value def get(self, url: str, **kwargs): return self._next("GET", url, **kwargs) def post(self, url: str, **kwargs): if self.fail_post: raise RuntimeError("post failed") return self._next("POST", url, **kwargs) def put(self, url: str, **kwargs): return self._next("PUT", url, **kwargs) def delete(self, url: str, **kwargs): if self.fail_delete: raise RuntimeError("delete failed") return self._next("DELETE", url, **kwargs) def _settings(**overrides): defaults = { "comms_mas_admin_client_id": "client", "comms_mas_admin_client_secret": "secret", "comms_mas_token_url": "http://mas/token", "comms_mas_admin_api_base": "http://mas/api/admin/v1", "comms_synapse_base": "http://synapse", "comms_synapse_admin_token": "admintoken", "comms_synapse_db_password": "pw", "comms_room_alias": "#othrys:live.bstein.dev", "comms_server_name": "live.bstein.dev", "comms_seeder_user": "othrys-seeder", "comms_timeout_sec": 5.0, "comms_guest_stale_days": 1, } defaults.update(overrides) return types.SimpleNamespace(**defaults) def _service(monkeypatch, **overrides) -> CommsService: monkeypatch.setattr(comms_module, "settings", _settings(**overrides)) return CommsService() def test_mas_admin_token_retries_and_validates(monkeypatch) -> None: svc = _service(monkeypatch, comms_mas_admin_client_id="") with pytest.raises(RuntimeError, match="credentials missing"): svc._mas_admin_token(_Client()) svc = _service(monkeypatch) sleeps = [] monkeypatch.setattr(svc, "_sleep", sleeps.append) client = _Client( { ("POST", "http://mas/token"): [ _Response({}), _Response({"access_token": "admintoken"}), ], } ) assert svc._mas_admin_token(client) == "admintoken" assert sleeps == [1] assert client.calls[0][2]["headers"]["Authorization"].startswith("Basic ") def test_mas_admin_token_exhaustion_reports_last_error(monkeypatch) -> None: svc = _service(monkeypatch) sleeps = [] monkeypatch.setattr(svc, "_sleep", sleeps.append) client = _Client({("POST", "http://mas/token"): _Response({})}) with pytest.raises(RuntimeError, match="missing mas access token"): svc._mas_admin_token(client) assert sleeps == [1, 2, 4, 8, 16] def test_personal_session_and_revoke_failure_paths(monkeypatch) -> None: svc = _service(monkeypatch) invalid = _Client( { ("POST", "http://mas/api/admin/v1/personal-sessions"): _Response( {"data": {"id": "session-without-token"}} ), } ) with pytest.raises(RuntimeError, match="invalid personal session"): svc._mas_personal_session(invalid, "admintoken", "user-1") svc._mas_revoke_session(_Client(fail_post=True), "admintoken", "session-1") def test_guest_listing_and_room_context_helpers(monkeypatch) -> None: svc = _service(monkeypatch) client = _Client( { ("GET", "http://synapse/_matrix/client/v3/rooms/room%201/members"): _Response( { "chunk": [ {"state_key": "@one:live.bstein.dev", "content": {"displayname": "One"}}, {"state_key": "", "content": {"displayname": ""}}, {"content": {"displayname": "Two"}}, ] } ), ("GET", "http://mas/api/admin/v1/users?page[size]=100"): _Response( { "data": [ "ignored", {"id": "one", "meta": {"page": {"cursor": "cursor 1"}}}, ] } ), ("GET", "http://mas/api/admin/v1/users?page[size]=100&page[after]=cursor%201"): _Response( {"data": [{"id": "two"}]} ), ("GET", "http://synapse/_synapse/admin/v2/users?local=true&deactivated=false&limit=100"): _Response( {"users": [{"name": "@one:live.bstein.dev"}, "ignored"], "next_token": "next 1"} ), ( "GET", "http://synapse/_synapse/admin/v2/users?local=true&deactivated=false&limit=100&from=next%201", ): _Response({"users": [{"name": "@two:live.bstein.dev"}]}), } ) members, existing = svc._room_members(client, "token", "room 1") assert members == {"@one:live.bstein.dev"} assert existing == {"One", "Two"} assert [item["id"] for item in svc._mas_list_users(client, "admintoken")] == ["one", "two"] assert [item["name"] for item in svc._synapse_list_users(client, "fallback")] == [ "@one:live.bstein.dev", "@two:live.bstein.dev", ] invalid_page = _Client({("GET", "http://mas/api/admin/v1/users?page[size]=100"): _Response({"data": "bad"})}) assert svc._mas_list_users(invalid_page, "admintoken") == [] non_dict_cursor = _Client( {("GET", "http://mas/api/admin/v1/users?page[size]=100"): _Response({"data": [{"id": "one"}, "bad"]})} ) assert [item["id"] for item in svc._mas_list_users(non_dict_cursor, "admintoken")] == ["one"] def test_prune_and_displayname_helpers(monkeypatch) -> None: svc = _service(monkeypatch) now_ms = 2 * 24 * 60 * 60 * 1000 assert not svc._should_prune_guest({"is_guest": False, "last_seen_ts": 0}, now_ms) assert not svc._should_prune_guest({"is_guest": True}, now_ms) assert not svc._should_prune_guest({"is_guest": True, "last_seen_ts": "bad"}, now_ms) assert svc._should_prune_guest({"is_guest": True, "last_seen_ts": 0}, now_ms) assert not svc._prune_guest(_Client(fail_delete=True), "token", "@old:live.bstein.dev") bad_delete = _Client( { ("DELETE", "http://synapse/_synapse/admin/v2/users/%40old%3Alive.bstein.dev"): _Response( status_code=500, text="boom", ) } ) assert not svc._prune_guest(bad_delete, "token", "@old:live.bstein.dev") admin_404 = _Client( { ("GET", "http://synapse/_synapse/admin/v2/users/%40missing%3Alive.bstein.dev"): _Response( status_code=404 ) } ) assert svc._get_displayname_admin(admin_404, "token", "@missing:live.bstein.dev") is None admin_ok = _Client( { ("GET", "http://synapse/_synapse/admin/v2/users/%40guest%3Alive.bstein.dev"): _Response( {"displayname": "Guest"} ) } ) assert svc._get_displayname_admin(admin_ok, "token", "@guest:live.bstein.dev") == "Guest" client = _Client() svc._set_displayname( client, "token", DisplayNameTarget( room_id="room 1", user_id="@guest:live.bstein.dev", name="Atlas Guest", in_room=True, ), ) assert [call[0] for call in client.calls] == ["PUT", "PUT"] ok_admin = _Client( { ("PUT", "http://synapse/_synapse/admin/v2/users/%40guest%3Alive.bstein.dev"): _Response( status_code=201 ) } ) assert svc._set_displayname_admin(ok_admin, "token", "@guest:live.bstein.dev", "Guest") fail_admin = _Client( { ("PUT", "http://synapse/_synapse/admin/v2/users/%40guest%3Alive.bstein.dev"): _Response( status_code=500 ) } ) assert not svc._set_displayname_admin(fail_admin, "token", "@guest:live.bstein.dev", "Guest") def test_db_rename_numeric_empty_and_skip_paths(monkeypatch) -> None: assert _service(monkeypatch, comms_synapse_db_password="")._db_rename_numeric(set()) == 0 svc = _service(monkeypatch) queries = [] monkeypatch.setattr(svc, "_pick_guest_name", lambda _existing: None) class Cursor: def execute(self, query, params=None): queries.append(query) def fetchall(self): if len(queries) == 1: return [("1", "@123:live.bstein.dev", "guest-1")] if len(queries) == 2: return [] return [] def __enter__(self): return self def __exit__(self, exc_type, exc, tb): return False class Conn: closed = False def cursor(self): return Cursor() def close(self): self.closed = True def __enter__(self): return self def __exit__(self, exc_type, exc, tb): return False conn = Conn() monkeypatch.setattr(svc, "_connect_synapse_db", lambda: conn) assert svc._db_rename_numeric(set()) == 0 assert conn.closed def test_db_rename_numeric_inserts_missing_profiles(monkeypatch) -> None: svc = _service(monkeypatch) executed = [] picks = iter([None, "Rhea"]) monkeypatch.setattr(svc, "_pick_guest_name", lambda _existing: next(picks)) class Cursor: def execute(self, query, params=None): executed.append((query, params)) def fetchall(self): if len(executed) == 1: return [] if len(executed) == 2: return [ ("@123:live.bstein.dev",), ("@456:live.bstein.dev",), ("@789:live.bstein.dev",), ] if len(executed) == 3: return [("123", "@123:live.bstein.dev")] return [] def __enter__(self): return self def __exit__(self, exc_type, exc, tb): return False class Conn: def cursor(self): return Cursor() def close(self): return None def __enter__(self): return self def __exit__(self, exc_type, exc, tb): return False monkeypatch.setattr(svc, "_connect_synapse_db", Conn) assert svc._db_rename_numeric(set()) == 1 assert any("INSERT INTO profiles" in query for query, _params in executed) def test_guest_name_setting_validation(monkeypatch) -> None: with pytest.raises(RuntimeError, match="mas admin secret"): _service(monkeypatch, comms_mas_admin_client_secret="")._validate_guest_name_settings() with pytest.raises(RuntimeError, match="synapse base"): _service(monkeypatch, comms_synapse_base="")._validate_guest_name_settings() def test_rename_mas_guests_exercises_skip_and_success_paths(monkeypatch) -> None: svc = _service(monkeypatch) users = [ {"id": "ignored", "attributes": {}}, {"id": "human", "attributes": {"username": "alice"}}, {"attributes": {"username": "guest-no-id"}}, {"id": "named", "attributes": {"username": "guest-named"}}, {"id": "no-name", "attributes": {"username": "guest-empty"}}, {"id": "rename", "attributes": {"username": "guest-rename", "legacy_guest": True}}, ] monkeypatch.setattr(svc, "_mas_list_users", lambda *_args: users) monkeypatch.setattr(svc, "_mas_personal_session", lambda _client, _token, user_id: (f"token-{user_id}", f"sess-{user_id}")) displays = { "@guest-named:live.bstein.dev": "Already Human", "@guest-empty:live.bstein.dev": None, "@guest-rename:live.bstein.dev": None, } monkeypatch.setattr(svc, "_get_displayname", lambda _client, _token, user_id: displays[user_id]) picked = iter([None, "Rhea"]) monkeypatch.setattr(svc, "_pick_guest_name", lambda _existing: next(picked)) set_targets = [] revoked = [] monkeypatch.setattr(svc, "_set_displayname", lambda _client, _token, target: set_targets.append(target)) monkeypatch.setattr(svc, "_mas_revoke_session", lambda _client, _token, session_id: revoked.append(session_id)) result = svc._rename_mas_guests(_Client(), "admintoken", "room", {"@guest-rename:live.bstein.dev"}, set()) assert result.renamed == 1 assert result.skipped == 5 assert "guest-rename" in result.usernames assert set_targets[0].in_room assert revoked == ["sess-named", "sess-no-name", "sess-rename"] def test_synapse_rename_helpers_cover_prune_and_skip_paths(monkeypatch) -> None: svc = _service(monkeypatch) monkeypatch.setattr(svc, "_synapse_list_users", lambda *_args: (_ for _ in ()).throw(RuntimeError("boom"))) assert svc._synapse_entries(_Client(), "token") == [] monkeypatch.undo() svc = _service(monkeypatch) assert svc._synapse_user_id({"name": "not-matrix"}) is None assert svc._synapse_user_id({"name": "@guest:live.bstein.dev"}).localpart == "guest" assert not svc._maybe_prune_synapse_guest(_Client(), "token", {"is_guest": False}, "@one:live.bstein.dev", 0) monkeypatch.setattr(svc, "_should_prune_guest", lambda _entry, _now: False) assert not svc._maybe_prune_synapse_guest( _Client(), "token", {"is_guest": True}, "@one:live.bstein.dev", 0, ) monkeypatch.setattr(svc, "_should_prune_guest", lambda _entry, _now: True) monkeypatch.setattr(svc, "_prune_guest", lambda *_args: True) assert svc._maybe_prune_synapse_guest(_Client(), "token", {"is_guest": True}, "@one:live.bstein.dev", 0) client = _Client() assert not svc._needs_synapse_rename( client, "token", SynapseUserRef({"is_guest": True}, "@skip:live.bstein.dev", "skip"), {"skip"}, ) assert not svc._needs_synapse_rename( client, "token", SynapseUserRef({"is_guest": False}, "@alice:live.bstein.dev", "alice"), set(), ) monkeypatch.setattr(svc, "_get_displayname_admin", lambda *_args: "Human") assert not svc._needs_synapse_rename( client, "token", SynapseUserRef({"is_guest": True}, "@guest:live.bstein.dev", "guest"), set(), ) monkeypatch.setattr(svc, "_get_displayname_admin", lambda *_args: None) assert svc._needs_synapse_rename( client, "token", SynapseUserRef({"is_guest": True}, "@guest:live.bstein.dev", "guest"), set(), ) monkeypatch.setattr(svc, "_pick_guest_name", lambda _existing: None) assert not svc._rename_synapse_user(client, "token", set(), "@guest:live.bstein.dev") monkeypatch.setattr(svc, "_pick_guest_name", lambda _existing: "Guest") monkeypatch.setattr(svc, "_set_displayname_admin", lambda *_args: True) assert svc._rename_synapse_user(client, "token", set(), "@guest:live.bstein.dev") entries = [ {"name": "invalid"}, {"name": "@prune:live.bstein.dev"}, {"name": "@skip:live.bstein.dev"}, {"name": "@rename:live.bstein.dev"}, ] monkeypatch.setattr(svc, "_synapse_entries", lambda *_args: entries) monkeypatch.setattr(svc, "_time", lambda: 10.0) monkeypatch.setattr( svc, "_maybe_prune_synapse_guest", lambda _client, _token, _entry, user_id, _now: user_id.startswith("@prune"), ) monkeypatch.setattr( svc, "_needs_synapse_rename", lambda _client, _token, user, _mas_names: user.localpart == "rename", ) monkeypatch.setattr(svc, "_rename_synapse_user", lambda *_args: True) result = svc._rename_synapse_guests(client, "token", set(), set()) assert result.pruned == 1 assert result.renamed == 1