diff --git a/ariadne/services/comms.py b/ariadne/services/comms.py index 9902737..8235a4b 100644 --- a/ariadne/services/comms.py +++ b/ariadne/services/comms.py @@ -1,94 +1,25 @@ from __future__ import annotations -from dataclasses import dataclass -import base64 import time -import urllib.parse from typing import Any import httpx import psycopg from ..settings import settings -from ..utils.logging import get_logger from ..utils.name_generator import NameGenerator +from .comms_guest_names import _CommsGuestNameMixin +from .comms_protocol import _canon_user +from .comms_room_ops import _CommsRoomOpsMixin -logger = get_logger(__name__) +class CommsService(_CommsGuestNameMixin, _CommsRoomOpsMixin): + """Maintain Matrix/MAS guest naming and room hygiene. -HTTP_OK = 200 -HTTP_CREATED = 201 -HTTP_ACCEPTED = 202 -HTTP_NO_CONTENT = 204 -HTTP_BAD_REQUEST = 400 -HTTP_NOT_FOUND = 404 -HTTP_CONFLICT = 409 - -@dataclass(frozen=True) -class CommsSummary: - processed: int - renamed: int - pruned: int - skipped: int - detail: str = "" - - -@dataclass(frozen=True) -class MasGuestResult: - renamed: int - skipped: int - usernames: set[str] - - -@dataclass(frozen=True) -class SynapseGuestResult: - renamed: int - pruned: int - - -@dataclass(frozen=True) -class DisplayNameTarget: - room_id: str - user_id: str - name: str - in_room: bool - - -@dataclass(frozen=True) -class SynapseUserRef: - entry: dict[str, Any] - user_id: str - localpart: str - - -def _auth(token: str) -> dict[str, str]: - return {"Authorization": f"Bearer {token}"} - - -def _canon_user(user: str, server_name: str) -> str: - user = (user or "").strip() - if user.startswith("@") and ":" in user: - return user - user = user.lstrip("@") - if ":" in user: - return f"@{user}" - return f"@{user}:{server_name}" - - -def _needs_rename_username(username: str) -> bool: - return username.isdigit() or username.startswith("guest-") - - -def _needs_rename_display(display: str | None) -> bool: - if not display: - return True - return display.isdigit() or display.startswith("guest-") - - - - -class CommsService: - """Maintain Matrix/MAS guest naming and pruning hygiene.""" + Inputs: Matrix/MAS endpoints, service credentials, and optional database access + from settings. Outputs: scheduled maintenance actions plus small status dicts + for scheduler logging. + """ def __init__( self, @@ -98,6 +29,10 @@ class CommsService: self._client_factory = client_factory self._name_generator = name_generator or NameGenerator() + @property + def _settings(self) -> Any: + return settings + def _pick_guest_name(self, existing: set[str]) -> str | None: return self._name_generator.unique(existing) @@ -108,838 +43,22 @@ class CommsService: token = getattr(settings, "comms_synapse_admin_token", "") return token if token else fallback - def _mas_admin_token(self, client: httpx.Client) -> str: - if not settings.comms_mas_admin_client_id or not settings.comms_mas_admin_client_secret: - raise RuntimeError("mas admin client credentials missing") - basic = base64.b64encode( - f"{settings.comms_mas_admin_client_id}:{settings.comms_mas_admin_client_secret}".encode() - ).decode() - last_err: Exception | None = None - for attempt in range(5): - try: - resp = client.post( - settings.comms_mas_token_url, - headers={"Authorization": f"Basic {basic}"}, - data={"grant_type": "client_credentials", "scope": "urn:mas:admin"}, - ) - resp.raise_for_status() - payload = resp.json() - token = payload.get("access_token") - if not isinstance(token, str) or not token: - raise RuntimeError("missing mas access token") - return token - except Exception as exc: # noqa: BLE001 - last_err = exc - time.sleep(2**attempt) - raise RuntimeError(str(last_err) if last_err else "mas admin token failed") - - def _mas_user_id(self, client: httpx.Client, token: str, username: str) -> str: - url = f"{settings.comms_mas_admin_api_base}/users/by-username/{urllib.parse.quote(username)}" - resp = client.get(url, headers=_auth(token)) - resp.raise_for_status() - payload = resp.json() - return payload["data"]["id"] - - def _mas_personal_session(self, client: httpx.Client, token: str, user_id: str) -> tuple[str, str]: - resp = client.post( - f"{settings.comms_mas_admin_api_base}/personal-sessions", - headers=_auth(token), - json={ - "actor_user_id": user_id, - "human_name": "guest-name-randomizer", - "scope": "urn:matrix:client:api:*", - "expires_in": 300, - }, - ) - resp.raise_for_status() - payload = resp.json().get("data", {}) - session_id = payload.get("id") - attrs = (payload.get("attributes") or {}) if isinstance(payload, dict) else {} - access_token = attrs.get("access_token") - if not isinstance(access_token, str) or not isinstance(session_id, str): - raise RuntimeError("invalid personal session response") - return access_token, session_id - - def _mas_revoke_session(self, client: httpx.Client, token: str, session_id: str) -> None: - try: - client.post( - f"{settings.comms_mas_admin_api_base}/personal-sessions/{urllib.parse.quote(session_id)}/revoke", - headers=_auth(token), - json={}, - ) - except Exception: - return - - def _resolve_alias(self, client: httpx.Client, token: str, alias: str) -> str: - resp = client.get( - f"{settings.comms_synapse_base}/_matrix/client/v3/directory/room/{urllib.parse.quote(alias)}", - headers=_auth(token), - ) - resp.raise_for_status() - payload = resp.json() - return payload["room_id"] - - def _room_members(self, client: httpx.Client, token: str, room_id: str) -> tuple[set[str], set[str]]: - resp = client.get( - f"{settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/members", - headers=_auth(token), - ) - resp.raise_for_status() - payload = resp.json() - members: set[str] = set() - existing: set[str] = set() - for ev in payload.get("chunk", []) or []: - user_id = ev.get("state_key") - if isinstance(user_id, str) and user_id: - members.add(user_id) - display = (ev.get("content") or {}).get("displayname") - if isinstance(display, str) and display: - existing.add(display) - return members, existing - - def _mas_list_users(self, client: httpx.Client, token: str) -> list[dict[str, Any]]: - users: list[dict[str, Any]] = [] - cursor = None - while True: - url = f"{settings.comms_mas_admin_api_base}/users?page[size]=100" - if cursor: - url += f"&page[after]={urllib.parse.quote(cursor)}" - resp = client.get(url, headers=_auth(token)) - resp.raise_for_status() - payload = resp.json() - data = payload.get("data") or [] - if not isinstance(data, list) or not data: - break - users.extend([item for item in data if isinstance(item, dict)]) - last = data[-1] - cursor = ( - last.get("meta", {}) - if isinstance(last, dict) - else {} - ).get("page", {}).get("cursor") - if not cursor: - break - return users - - def _synapse_list_users(self, client: httpx.Client, token: str) -> list[dict[str, Any]]: - users: list[dict[str, Any]] = [] - from_token = None - admin_token = self._admin_token(token) - while True: - url = "{}/_synapse/admin/v2/users?local=true&deactivated=false&limit=100".format( - settings.comms_synapse_base - ) - if from_token: - url += f"&from={urllib.parse.quote(from_token)}" - resp = client.get(url, headers=_auth(admin_token)) - resp.raise_for_status() - payload = resp.json() - users.extend([item for item in payload.get("users", []) if isinstance(item, dict)]) - from_token = payload.get("next_token") - if not from_token: - break - return users - - def _should_prune_guest(self, entry: dict[str, Any], now_ms: int) -> bool: - if not entry.get("is_guest"): - return False - last_seen = entry.get("last_seen_ts") - if last_seen is None: - return False - try: - last_seen = int(last_seen) - except (TypeError, ValueError): - return False - stale_ms = int(settings.comms_guest_stale_days) * 24 * 60 * 60 * 1000 - return now_ms - last_seen > stale_ms - - def _prune_guest(self, client: httpx.Client, token: str, user_id: str) -> bool: - admin_token = self._admin_token(token) - try: - resp = client.delete( - f"{settings.comms_synapse_base}/_synapse/admin/v2/users/{urllib.parse.quote(user_id)}", - headers=_auth(admin_token), - params={"erase": "true"}, - ) - except Exception as exc: # noqa: BLE001 - logger.info( - "guest prune failed", - extra={"event": "comms_guest_prune", "status": "error", "detail": str(exc)}, - ) - return False - if resp.status_code in (HTTP_OK, HTTP_ACCEPTED, HTTP_NO_CONTENT, HTTP_NOT_FOUND): - return True - logger.info( - "guest prune failed", - extra={ - "event": "comms_guest_prune", - "status": "error", - "detail": f"{resp.status_code} {resp.text}", - }, - ) - return False - - def _get_displayname(self, client: httpx.Client, token: str, user_id: str) -> str | None: - resp = client.get( - f"{settings.comms_synapse_base}/_matrix/client/v3/profile/{urllib.parse.quote(user_id)}", - headers=_auth(token), - ) - resp.raise_for_status() - return resp.json().get("displayname") - - def _get_displayname_admin(self, client: httpx.Client, token: str, user_id: str) -> str | None: - admin_token = self._admin_token(token) - resp = client.get( - f"{settings.comms_synapse_base}/_synapse/admin/v2/users/{urllib.parse.quote(user_id)}", - headers=_auth(admin_token), - ) - if resp.status_code == HTTP_NOT_FOUND: - return None - resp.raise_for_status() - return resp.json().get("displayname") - - def _set_displayname( - self, - client: httpx.Client, - token: str, - target: DisplayNameTarget, - ) -> None: - resp = client.put( - f"{settings.comms_synapse_base}/_matrix/client/v3/profile/{urllib.parse.quote(target.user_id)}/displayname", - headers=_auth(token), - json={"displayname": target.name}, - ) - resp.raise_for_status() - if not target.in_room: - return - state_url = ( - f"{settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(target.room_id)}" - f"/state/m.room.member/{urllib.parse.quote(target.user_id)}" - ) - client.put( - state_url, - headers=_auth(token), - json={"membership": "join", "displayname": target.name}, - ) - - def _set_displayname_admin(self, client: httpx.Client, token: str, user_id: str, name: str) -> bool: - admin_token = self._admin_token(token) - resp = client.put( - f"{settings.comms_synapse_base}/_synapse/admin/v2/users/{urllib.parse.quote(user_id)}", - headers=_auth(admin_token), - json={"displayname": name}, - ) - return resp.status_code in (HTTP_OK, HTTP_CREATED, HTTP_NO_CONTENT) - - def _db_rename_numeric(self, existing: set[str]) -> int: - if not settings.comms_synapse_db_password: - return 0 - renamed = 0 - conn = psycopg.connect( + def _connect_synapse_db(self) -> Any: + return psycopg.connect( host=settings.comms_synapse_db_host, port=settings.comms_synapse_db_port, dbname=settings.comms_synapse_db_name, user=settings.comms_synapse_db_user, password=settings.comms_synapse_db_password, ) - try: - with conn: - with conn.cursor() as cur: - pattern = f"^@\\d+:{settings.comms_server_name}$" - cur.execute( - "SELECT user_id, full_user_id, displayname FROM profiles WHERE full_user_id ~ %s", - (pattern,), - ) - profile_rows = cur.fetchall() - profile_index = {row[1]: row for row in profile_rows} - for _user_id, full_user_id, display in profile_rows: - if display and not _needs_rename_display(display): - continue - new_name = self._pick_guest_name(existing) - if not new_name: - continue - cur.execute( - "UPDATE profiles SET displayname = %s WHERE full_user_id = %s", - (new_name, full_user_id), - ) - renamed += 1 - cur.execute( - "SELECT name FROM users WHERE name ~ %s", - (pattern,), - ) - users = [row[0] for row in cur.fetchall()] - if not users: - return renamed - cur.execute( - "SELECT user_id, full_user_id FROM profiles WHERE full_user_id = ANY(%s)", - (users,), - ) - for existing_full in cur.fetchall(): - profile_index.setdefault(existing_full[1], existing_full) + def _sleep(self, seconds: float) -> None: + time.sleep(seconds) - for full_user_id in users: - if full_user_id in profile_index: - continue - localpart = full_user_id.split(":", 1)[0].lstrip("@") - new_name = self._pick_guest_name(existing) - if not new_name: - continue - cur.execute( - "INSERT INTO profiles (user_id, displayname, full_user_id) VALUES (%s, %s, %s) " - "ON CONFLICT (full_user_id) DO UPDATE SET displayname = EXCLUDED.displayname", - (localpart, new_name, full_user_id), - ) - renamed += 1 - finally: - conn.close() - return renamed - - def _validate_guest_name_settings(self) -> None: - if not settings.comms_mas_admin_client_id or not settings.comms_mas_admin_client_secret: - raise RuntimeError("comms mas admin secret missing") - if not settings.comms_synapse_base: - raise RuntimeError("comms synapse base missing") - - def _room_context(self, client: httpx.Client, token: str) -> tuple[str, set[str], set[str]]: - room_id = self._resolve_alias(client, token, settings.comms_room_alias) - members, existing = self._room_members(client, token, room_id) - return room_id, members, existing - - def _rename_mas_guests( - self, - client: httpx.Client, - admin_token: str, - room_id: str, - members: set[str], - existing: set[str], - ) -> MasGuestResult: - renamed = 0 - skipped = 0 - mas_usernames: set[str] = set() - users = self._mas_list_users(client, admin_token) - for user in users: - attrs = user.get("attributes") or {} - username = attrs.get("username") or "" - if isinstance(username, str) and username: - mas_usernames.add(username) - legacy_guest = attrs.get("legacy_guest") - if not isinstance(username, str) or not username: - skipped += 1 - continue - if not (legacy_guest or _needs_rename_username(username)): - skipped += 1 - continue - user_id = user.get("id") - if not isinstance(user_id, str) or not user_id: - skipped += 1 - continue - full_user = f"@{username}:{settings.comms_server_name}" - access_token, session_id = self._mas_personal_session(client, admin_token, user_id) - try: - display = self._get_displayname(client, access_token, full_user) - if display and not _needs_rename_display(display): - skipped += 1 - continue - new_name = self._pick_guest_name(existing) - if not new_name: - skipped += 1 - continue - self._set_displayname( - client, - access_token, - DisplayNameTarget( - room_id=room_id, - user_id=full_user, - name=new_name, - in_room=full_user in members, - ), - ) - renamed += 1 - finally: - self._mas_revoke_session(client, admin_token, session_id) - return MasGuestResult(renamed=renamed, skipped=skipped, usernames=mas_usernames) - - def _synapse_entries(self, client: httpx.Client, token: str) -> list[dict[str, Any]]: - try: - return self._synapse_list_users(client, token) - except Exception as exc: # noqa: BLE001 - logger.info( - "synapse admin list skipped", - extra={"event": "comms_guest_list", "status": "error", "detail": str(exc)}, - ) - return [] - - def _synapse_user_id(self, entry: dict[str, Any]) -> SynapseUserRef | None: - user_id = entry.get("name") or "" - if not isinstance(user_id, str) or not user_id.startswith("@"): - return None - localpart = user_id.split(":", 1)[0].lstrip("@") - return SynapseUserRef(entry=entry, user_id=user_id, localpart=localpart) - - def _maybe_prune_synapse_guest( - self, - client: httpx.Client, - token: str, - entry: dict[str, Any], - user_id: str, - now_ms: int, - ) -> bool: - if not entry.get("is_guest"): - return False - if not self._should_prune_guest(entry, now_ms): - return False - return self._prune_guest(client, token, user_id) - - def _needs_synapse_rename( - self, - client: httpx.Client, - token: str, - user: SynapseUserRef, - mas_usernames: set[str], - ) -> bool: - if user.localpart in mas_usernames: - return False - is_guest = user.entry.get("is_guest") - if not (is_guest or _needs_rename_username(user.localpart)): - return False - display = self._get_displayname_admin(client, token, user.user_id) - if display and not _needs_rename_display(display): - return False - return True - - def _rename_synapse_user( - self, - client: httpx.Client, - token: str, - existing: set[str], - user_id: str, - ) -> bool: - new_name = self._pick_guest_name(existing) - if not new_name: - return False - return self._set_displayname_admin(client, token, user_id, new_name) - - def _rename_synapse_guests( - self, - client: httpx.Client, - token: str, - existing: set[str], - mas_usernames: set[str], - ) -> SynapseGuestResult: - renamed = 0 - pruned = 0 - entries = self._synapse_entries(client, token) - - now_ms = int(time.time() * 1000) - for entry in entries: - user_ref = self._synapse_user_id(entry) - if not user_ref: - continue - if self._maybe_prune_synapse_guest(client, token, user_ref.entry, user_ref.user_id, now_ms): - pruned += 1 - continue - if not self._needs_synapse_rename(client, token, user_ref, mas_usernames): - continue - if self._rename_synapse_user(client, token, existing, user_ref.user_id): - renamed += 1 - return SynapseGuestResult(renamed=renamed, pruned=pruned) - - def run_guest_name_randomizer(self, wait: bool = True) -> dict[str, Any]: - self._validate_guest_name_settings() - - with self._client() as client: - admin_token = self._mas_admin_token(client) - seeder_id = self._mas_user_id(client, admin_token, settings.comms_seeder_user) - seeder_token, seeder_session = self._mas_personal_session(client, admin_token, seeder_id) - try: - room_id, members, existing = self._room_context(client, seeder_token) - mas_result = self._rename_mas_guests(client, admin_token, room_id, members, existing) - synapse_result = self._rename_synapse_guests( - client, - seeder_token, - existing, - mas_result.usernames, - ) - db_renamed = self._db_rename_numeric(existing) - finally: - self._mas_revoke_session(client, admin_token, seeder_session) - - renamed = mas_result.renamed + synapse_result.renamed + db_renamed - pruned = synapse_result.pruned - skipped = mas_result.skipped - processed = renamed + pruned + skipped - summary = CommsSummary(processed, renamed, pruned, skipped) - logger.info( - "comms guest name sync finished", - extra={ - "event": "comms_guest_name", - "status": "ok", - "processed": summary.processed, - "renamed": summary.renamed, - "pruned": summary.pruned, - "skipped": summary.skipped, - }, - ) - return {"status": "ok", **summary.__dict__} - - def run_pin_invite(self, wait: bool = True) -> dict[str, Any]: - if not settings.comms_seeder_password: - raise RuntimeError("comms seeder password missing") - - with self._client() as client: - token = self._login(client, settings.comms_seeder_user, settings.comms_seeder_password) - room_id = self._resolve_alias(client, token, settings.comms_room_alias) - pinned = self._get_pinned(client, token, room_id) - for event_id in pinned: - event = self._get_event(client, token, room_id, event_id) - if event and (event.get("content") or {}).get("body") == settings.comms_pin_message: - return {"status": "ok", "detail": "already pinned"} - event_id = self._send_message(client, token, room_id, settings.comms_pin_message) - if not event_id: - return {"status": "error", "detail": "pin event_id missing"} - self._pin_message(client, token, room_id, event_id) - return {"status": "ok", "detail": "pinned"} - - def run_reset_room(self, wait: bool = True) -> dict[str, Any]: - if not settings.comms_seeder_password: - raise RuntimeError("comms seeder password missing") - - with self._client() as client: - token = self._login_with_retry(client, settings.comms_seeder_user, settings.comms_seeder_password) - old_room_id = self._resolve_alias(client, token, settings.comms_room_alias) - new_room_id = self._create_room(client, token, settings.comms_room_name) - self._set_room_state(client, token, new_room_id, "m.room.join_rules", {"join_rule": "public"}) - self._set_room_state(client, token, new_room_id, "m.room.guest_access", {"guest_access": "can_join"}) - self._set_room_state( - client, - token, - new_room_id, - "m.room.history_visibility", - {"history_visibility": "shared"}, - ) - self._set_room_state(client, token, new_room_id, "m.room.power_levels", self._power_levels()) - - self._delete_alias(client, token, settings.comms_room_alias) - self._put_alias(client, token, settings.comms_room_alias, new_room_id) - self._set_room_state( - client, - token, - new_room_id, - "m.room.canonical_alias", - {"alias": settings.comms_room_alias}, - ) - self._set_directory_visibility(client, token, new_room_id, "public") - - bot_user_id = _canon_user(settings.comms_bot_user, settings.comms_server_name) - self._invite_user(client, token, new_room_id, bot_user_id) - for uid in self._list_joined_members(client, token, old_room_id): - if uid == _canon_user(settings.comms_seeder_user, settings.comms_server_name): - continue - localpart = uid.split(":", 1)[0].lstrip("@") - if localpart.isdigit(): - continue - self._invite_user(client, token, new_room_id, uid) - - event_id = self._send_message(client, token, new_room_id, settings.comms_pin_message) - if not event_id: - raise RuntimeError("pin message event_id missing") - self._set_room_state(client, token, new_room_id, "m.room.pinned_events", {"pinned": [event_id]}) - - self._set_directory_visibility(client, token, old_room_id, "private") - self._set_room_state(client, token, old_room_id, "m.room.join_rules", {"join_rule": "invite"}) - self._set_room_state(client, token, old_room_id, "m.room.guest_access", {"guest_access": "forbidden"}) - self._set_room_state( - client, - token, - old_room_id, - "m.room.tombstone", - { - "body": "Othrys has been reset. Please join the new room.", - "replacement_room": new_room_id, - }, - ) - self._send_message( - client, - token, - old_room_id, - "Othrys was reset. Join the new room at https://live.bstein.dev/#/room/#othrys:live.bstein.dev?action=join", - ) - - return {"status": "ok", "detail": f"old_room_id={old_room_id} new_room_id={new_room_id}"} - - def run_seed_room(self, wait: bool = True) -> dict[str, Any]: - if not settings.comms_seeder_password or not settings.comms_bot_password: - raise RuntimeError("comms seeder/bot password missing") - - with self._client() as client: - token = self._login(client, settings.comms_seeder_user, settings.comms_seeder_password) - for user, password, admin in ( - (settings.comms_seeder_user, settings.comms_seeder_password, True), - (settings.comms_bot_user, settings.comms_bot_password, False), - ): - try: - self._ensure_user(client, token, user, password, admin) - except RuntimeError as exc: - message = str(exc) - if "You are not a server admin" in message: - logger.warning( - "comms seed room ensure skipped", - extra={"event": "comms_seed_room", "user": user, "detail": message}, - ) - continue - raise - room_id = self._ensure_room(client, token) - self._join_user(client, token, room_id, _canon_user(settings.comms_bot_user, settings.comms_server_name)) - self._join_all_locals(client, token, room_id) - return {"status": "ok", "detail": "room seeded"} - - def _login(self, client: httpx.Client, user: str, password: str) -> str: - resp = client.post( - f"{settings.comms_auth_base}/_matrix/client/v3/login", - json={ - "type": "m.login.password", - "identifier": {"type": "m.id.user", "user": _canon_user(user, settings.comms_server_name)}, - "password": password, - }, - ) - if resp.status_code != HTTP_OK: - raise RuntimeError(f"login failed: {resp.status_code} {resp.text}") - payload = resp.json() - token = payload.get("access_token") - if not isinstance(token, str) or not token: - raise RuntimeError("login missing token") - return token - - def _login_with_retry(self, client: httpx.Client, user: str, password: str) -> str: - last: Exception | None = None - for attempt in range(1, 6): - try: - return self._login(client, user, password) - except Exception as exc: # noqa: BLE001 - last = exc - time.sleep(attempt * 2) - raise RuntimeError(str(last) if last else "login failed") - - def _get_pinned(self, client: httpx.Client, token: str, room_id: str) -> list[str]: - resp = client.get( - f"{settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/state/m.room.pinned_events", - headers=_auth(token), - ) - if resp.status_code == HTTP_NOT_FOUND: - return [] - resp.raise_for_status() - pinned = resp.json().get("pinned", []) - return [item for item in pinned if isinstance(item, str)] - - def _get_event(self, client: httpx.Client, token: str, room_id: str, event_id: str) -> dict[str, Any] | None: - resp = client.get( - f"{settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/event/{urllib.parse.quote(event_id)}", - headers=_auth(token), - ) - if resp.status_code == HTTP_NOT_FOUND: - return None - resp.raise_for_status() - return resp.json() - - def _send_message(self, client: httpx.Client, token: str, room_id: str, body: str) -> str: - resp = client.post( - f"{settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/send/m.room.message", - headers=_auth(token), - json={"msgtype": "m.text", "body": body}, - ) - resp.raise_for_status() - payload = resp.json() - event_id = payload.get("event_id") - return event_id if isinstance(event_id, str) else "" - - def _pin_message(self, client: httpx.Client, token: str, room_id: str, event_id: str) -> None: - resp = client.put( - f"{settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/state/m.room.pinned_events", - headers=_auth(token), - json={"pinned": [event_id]}, - ) - resp.raise_for_status() - - def _create_room(self, client: httpx.Client, token: str, name: str) -> str: - resp = client.post( - f"{settings.comms_synapse_base}/_matrix/client/v3/createRoom", - headers=_auth(token), - json={"preset": "public_chat", "name": name, "room_version": "11"}, - ) - resp.raise_for_status() - return resp.json()["room_id"] - - def _set_room_state(self, client: httpx.Client, token: str, room_id: str, ev_type: str, content: dict[str, Any]) -> None: - resp = client.put( - f"{settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/state/{ev_type}", - headers=_auth(token), - json=content, - ) - resp.raise_for_status() - - def _set_directory_visibility(self, client: httpx.Client, token: str, room_id: str, visibility: str) -> None: - resp = client.put( - f"{settings.comms_synapse_base}/_matrix/client/v3/directory/list/room/{urllib.parse.quote(room_id)}", - headers=_auth(token), - json={"visibility": visibility}, - ) - resp.raise_for_status() - - def _delete_alias(self, client: httpx.Client, token: str, alias: str) -> None: - resp = client.delete( - f"{settings.comms_synapse_base}/_matrix/client/v3/directory/room/{urllib.parse.quote(alias)}", - headers=_auth(token), - ) - if resp.status_code in (HTTP_OK, HTTP_ACCEPTED, HTTP_NOT_FOUND): - return - resp.raise_for_status() - - def _put_alias(self, client: httpx.Client, token: str, alias: str, room_id: str) -> None: - resp = client.put( - f"{settings.comms_synapse_base}/_matrix/client/v3/directory/room/{urllib.parse.quote(alias)}", - headers=_auth(token), - json={"room_id": room_id}, - ) - resp.raise_for_status() - - def _list_joined_members(self, client: httpx.Client, token: str, room_id: str) -> list[str]: - resp = client.get( - f"{settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/members?membership=join", - headers=_auth(token), - ) - resp.raise_for_status() - members = [] - for ev in resp.json().get("chunk", []) or []: - if ev.get("type") != "m.room.member": - continue - uid = ev.get("state_key") - if isinstance(uid, str) and uid.startswith("@"): - members.append(uid) - return members - - def _invite_user(self, client: httpx.Client, token: str, room_id: str, user_id: str) -> None: - resp = client.post( - f"{settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/invite", - headers=_auth(token), - json={"user_id": user_id}, - ) - if resp.status_code in (HTTP_OK, HTTP_ACCEPTED): - return - resp.raise_for_status() - - def _power_levels(self) -> dict[str, Any]: - return { - "ban": 50, - "events": { - "m.room.avatar": 50, - "m.room.canonical_alias": 50, - "m.room.encryption": 100, - "m.room.history_visibility": 100, - "m.room.name": 50, - "m.room.power_levels": 100, - "m.room.server_acl": 100, - "m.room.tombstone": 100, - }, - "events_default": 0, - "historical": 100, - "invite": 50, - "kick": 50, - "m.call.invite": 50, - "redact": 50, - "state_default": 50, - "users": { _canon_user(settings.comms_seeder_user, settings.comms_server_name): 100 }, - "users_default": 0, - } - - def _ensure_user(self, client: httpx.Client, token: str, localpart: str, password: str, admin: bool) -> None: - admin_token = self._admin_token(token) - user_id = _canon_user(localpart, settings.comms_server_name) - url = f"{settings.comms_synapse_base}/_synapse/admin/v2/users/{urllib.parse.quote(user_id)}" - resp = client.get(url, headers=_auth(admin_token)) - if resp.status_code == HTTP_OK: - return - payload = {"password": password, "admin": admin, "deactivated": False} - create = client.put(url, headers=_auth(admin_token), json=payload) - if create.status_code not in (HTTP_OK, HTTP_CREATED): - raise RuntimeError(f"create user {user_id} failed: {create.status_code} {create.text}") - - def _ensure_room(self, client: httpx.Client, token: str) -> str: - alias = settings.comms_room_alias - alias_enc = urllib.parse.quote(alias) - exists = client.get( - f"{settings.comms_synapse_base}/_matrix/client/v3/directory/room/{alias_enc}", - headers=_auth(token), - ) - if exists.status_code == HTTP_OK: - room_id = exists.json()["room_id"] - else: - create = client.post( - f"{settings.comms_synapse_base}/_matrix/client/v3/createRoom", - headers=_auth(token), - json={ - "preset": "public_chat", - "name": settings.comms_room_name, - "room_alias_name": alias.split(":", 1)[0].lstrip("#"), - "initial_state": [], - "power_level_content_override": { - "events_default": 0, - "users_default": 0, - "state_default": 50, - }, - }, - ) - if create.status_code not in (HTTP_OK, HTTP_CONFLICT): - raise RuntimeError(f"create room failed: {create.status_code} {create.text}") - exists = client.get( - f"{settings.comms_synapse_base}/_matrix/client/v3/directory/room/{alias_enc}", - headers=_auth(token), - ) - room_id = exists.json()["room_id"] - - state_events = [ - ("m.room.join_rules", {"join_rule": "public"}), - ("m.room.guest_access", {"guest_access": "can_join"}), - ("m.room.history_visibility", {"history_visibility": "shared"}), - ("m.room.canonical_alias", {"alias": alias}), - ] - for ev_type, content in state_events: - client.put( - f"{settings.comms_synapse_base}/_matrix/client/v3/rooms/{room_id}/state/{ev_type}", - headers=_auth(token), - json=content, - ) - client.put( - f"{settings.comms_synapse_base}/_matrix/client/v3/directory/list/room/{room_id}", - headers=_auth(token), - json={"visibility": "public"}, - ) - return room_id - - def _join_user(self, client: httpx.Client, token: str, room_id: str, user_id: str) -> None: - admin_token = self._admin_token(token) - client.post( - f"{settings.comms_synapse_base}/_synapse/admin/v1/join/{urllib.parse.quote(room_id)}", - headers=_auth(admin_token), - json={"user_id": user_id}, - ) - - def _join_all_locals(self, client: httpx.Client, token: str, room_id: str) -> None: - users: list[str] = [] - from_token = None - admin_token = self._admin_token(token) - while True: - url = f"{settings.comms_synapse_base}/_synapse/admin/v2/users?local=true&deactivated=false&limit=100" - if from_token: - url += f"&from={from_token}" - resp = client.get(url, headers=_auth(admin_token)) - payload = resp.json() - users.extend([u["name"] for u in payload.get("users", []) if isinstance(u, dict) and u.get("name")]) - from_token = payload.get("next_token") - if not from_token: - break - for uid in users: - self._join_user(client, token, room_id, uid) + def _time(self) -> float: + return time.time() comms = CommsService() + +__all__ = ["CommsService", "_canon_user", "comms", "psycopg", "settings"] diff --git a/ariadne/services/comms_guest_names.py b/ariadne/services/comms_guest_names.py new file mode 100644 index 0000000..25bf158 --- /dev/null +++ b/ariadne/services/comms_guest_names.py @@ -0,0 +1,485 @@ +from __future__ import annotations + +import base64 +from typing import Any +import urllib.parse + +import httpx + +from ..utils.logging import get_logger +from .comms_protocol import ( + HTTP_ACCEPTED, + HTTP_CREATED, + HTTP_NO_CONTENT, + HTTP_NOT_FOUND, + HTTP_OK, + CommsSummary, + DisplayNameTarget, + MasGuestResult, + SynapseGuestResult, + SynapseUserRef, + _auth, + _needs_rename_display, + _needs_rename_username, +) + + +logger = get_logger(__name__) + + +class _CommsGuestNameMixin: + def _mas_admin_token(self, client: httpx.Client) -> str: + settings = self._settings + if not settings.comms_mas_admin_client_id or not settings.comms_mas_admin_client_secret: + raise RuntimeError("mas admin client credentials missing") + basic = base64.b64encode( + f"{settings.comms_mas_admin_client_id}:{settings.comms_mas_admin_client_secret}".encode() + ).decode() + last_err: Exception | None = None + for attempt in range(5): + try: + resp = client.post( + settings.comms_mas_token_url, + headers={"Authorization": f"Basic {basic}"}, + data={"grant_type": "client_credentials", "scope": "urn:mas:admin"}, + ) + resp.raise_for_status() + payload = resp.json() + token = payload.get("access_token") + if not isinstance(token, str) or not token: + raise RuntimeError("missing mas access token") + return token + except Exception as exc: # noqa: BLE001 + last_err = exc + self._sleep(2**attempt) + raise RuntimeError(str(last_err) if last_err else "mas admin token failed") + + def _mas_user_id(self, client: httpx.Client, token: str, username: str) -> str: + url = f"{self._settings.comms_mas_admin_api_base}/users/by-username/{urllib.parse.quote(username)}" + resp = client.get(url, headers=_auth(token)) + resp.raise_for_status() + payload = resp.json() + return payload["data"]["id"] + + def _mas_personal_session(self, client: httpx.Client, token: str, user_id: str) -> tuple[str, str]: + resp = client.post( + f"{self._settings.comms_mas_admin_api_base}/personal-sessions", + headers=_auth(token), + json={ + "actor_user_id": user_id, + "human_name": "guest-name-randomizer", + "scope": "urn:matrix:client:api:*", + "expires_in": 300, + }, + ) + resp.raise_for_status() + payload = resp.json().get("data", {}) + session_id = payload.get("id") + attrs = (payload.get("attributes") or {}) if isinstance(payload, dict) else {} + access_token = attrs.get("access_token") + if not isinstance(access_token, str) or not isinstance(session_id, str): + raise RuntimeError("invalid personal session response") + return access_token, session_id + + def _mas_revoke_session(self, client: httpx.Client, token: str, session_id: str) -> None: + try: + client.post( + f"{self._settings.comms_mas_admin_api_base}/personal-sessions/{urllib.parse.quote(session_id)}/revoke", + headers=_auth(token), + json={}, + ) + except Exception: + return + + def _room_members(self, client: httpx.Client, token: str, room_id: str) -> tuple[set[str], set[str]]: + resp = client.get( + f"{self._settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/members", + headers=_auth(token), + ) + resp.raise_for_status() + payload = resp.json() + members: set[str] = set() + existing: set[str] = set() + for ev in payload.get("chunk", []) or []: + user_id = ev.get("state_key") + if isinstance(user_id, str) and user_id: + members.add(user_id) + display = (ev.get("content") or {}).get("displayname") + if isinstance(display, str) and display: + existing.add(display) + return members, existing + + def _mas_list_users(self, client: httpx.Client, token: str) -> list[dict[str, Any]]: + users: list[dict[str, Any]] = [] + cursor = None + while True: + url = f"{self._settings.comms_mas_admin_api_base}/users?page[size]=100" + if cursor: + url += f"&page[after]={urllib.parse.quote(cursor)}" + resp = client.get(url, headers=_auth(token)) + resp.raise_for_status() + payload = resp.json() + data = payload.get("data") or [] + if not isinstance(data, list) or not data: + break + users.extend([item for item in data if isinstance(item, dict)]) + last = data[-1] + cursor = ( + last.get("meta", {}) + if isinstance(last, dict) + else {} + ).get("page", {}).get("cursor") + if not cursor: + break + return users + + def _synapse_list_users(self, client: httpx.Client, token: str) -> list[dict[str, Any]]: + users: list[dict[str, Any]] = [] + from_token = None + admin_token = self._admin_token(token) + while True: + url = "{}/_synapse/admin/v2/users?local=true&deactivated=false&limit=100".format( + self._settings.comms_synapse_base + ) + if from_token: + url += f"&from={urllib.parse.quote(from_token)}" + resp = client.get(url, headers=_auth(admin_token)) + resp.raise_for_status() + payload = resp.json() + users.extend([item for item in payload.get("users", []) if isinstance(item, dict)]) + from_token = payload.get("next_token") + if not from_token: + break + return users + + def _should_prune_guest(self, entry: dict[str, Any], now_ms: int) -> bool: + if not entry.get("is_guest"): + return False + last_seen = entry.get("last_seen_ts") + if last_seen is None: + return False + try: + last_seen = int(last_seen) + except (TypeError, ValueError): + return False + stale_ms = int(self._settings.comms_guest_stale_days) * 24 * 60 * 60 * 1000 + return now_ms - last_seen > stale_ms + + def _prune_guest(self, client: httpx.Client, token: str, user_id: str) -> bool: + admin_token = self._admin_token(token) + try: + resp = client.delete( + f"{self._settings.comms_synapse_base}/_synapse/admin/v2/users/{urllib.parse.quote(user_id)}", + headers=_auth(admin_token), + params={"erase": "true"}, + ) + except Exception as exc: # noqa: BLE001 + logger.info( + "guest prune failed", + extra={"event": "comms_guest_prune", "status": "error", "detail": str(exc)}, + ) + return False + if resp.status_code in (HTTP_OK, HTTP_ACCEPTED, HTTP_NO_CONTENT, HTTP_NOT_FOUND): + return True + logger.info( + "guest prune failed", + extra={ + "event": "comms_guest_prune", + "status": "error", + "detail": f"{resp.status_code} {resp.text}", + }, + ) + return False + + def _get_displayname(self, client: httpx.Client, token: str, user_id: str) -> str | None: + resp = client.get( + f"{self._settings.comms_synapse_base}/_matrix/client/v3/profile/{urllib.parse.quote(user_id)}", + headers=_auth(token), + ) + resp.raise_for_status() + return resp.json().get("displayname") + + def _get_displayname_admin(self, client: httpx.Client, token: str, user_id: str) -> str | None: + admin_token = self._admin_token(token) + resp = client.get( + f"{self._settings.comms_synapse_base}/_synapse/admin/v2/users/{urllib.parse.quote(user_id)}", + headers=_auth(admin_token), + ) + if resp.status_code == HTTP_NOT_FOUND: + return None + resp.raise_for_status() + return resp.json().get("displayname") + + def _set_displayname( + self, + client: httpx.Client, + token: str, + target: DisplayNameTarget, + ) -> None: + resp = client.put( + f"{self._settings.comms_synapse_base}/_matrix/client/v3/profile/{urllib.parse.quote(target.user_id)}/displayname", + headers=_auth(token), + json={"displayname": target.name}, + ) + resp.raise_for_status() + if not target.in_room: + return + state_url = ( + f"{self._settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(target.room_id)}" + f"/state/m.room.member/{urllib.parse.quote(target.user_id)}" + ) + client.put( + state_url, + headers=_auth(token), + json={"membership": "join", "displayname": target.name}, + ) + + def _set_displayname_admin(self, client: httpx.Client, token: str, user_id: str, name: str) -> bool: + admin_token = self._admin_token(token) + resp = client.put( + f"{self._settings.comms_synapse_base}/_synapse/admin/v2/users/{urllib.parse.quote(user_id)}", + headers=_auth(admin_token), + json={"displayname": name}, + ) + return resp.status_code in (HTTP_OK, HTTP_CREATED, HTTP_NO_CONTENT) + + def _db_rename_numeric(self, existing: set[str]) -> int: + settings = self._settings + if not settings.comms_synapse_db_password: + return 0 + renamed = 0 + conn = self._connect_synapse_db() + try: + with conn: + with conn.cursor() as cur: + pattern = f"^@\\d+:{settings.comms_server_name}$" + cur.execute( + "SELECT user_id, full_user_id, displayname FROM profiles WHERE full_user_id ~ %s", + (pattern,), + ) + profile_rows = cur.fetchall() + profile_index = {row[1]: row for row in profile_rows} + for _user_id, full_user_id, display in profile_rows: + if display and not _needs_rename_display(display): + continue + new_name = self._pick_guest_name(existing) + if not new_name: + continue + cur.execute( + "UPDATE profiles SET displayname = %s WHERE full_user_id = %s", + (new_name, full_user_id), + ) + renamed += 1 + + cur.execute( + "SELECT name FROM users WHERE name ~ %s", + (pattern,), + ) + users = [row[0] for row in cur.fetchall()] + if not users: + return renamed + cur.execute( + "SELECT user_id, full_user_id FROM profiles WHERE full_user_id = ANY(%s)", + (users,), + ) + for existing_full in cur.fetchall(): + profile_index.setdefault(existing_full[1], existing_full) + + for full_user_id in users: + if full_user_id in profile_index: + continue + localpart = full_user_id.split(":", 1)[0].lstrip("@") + new_name = self._pick_guest_name(existing) + if not new_name: + continue + cur.execute( + "INSERT INTO profiles (user_id, displayname, full_user_id) VALUES (%s, %s, %s) " + "ON CONFLICT (full_user_id) DO UPDATE SET displayname = EXCLUDED.displayname", + (localpart, new_name, full_user_id), + ) + renamed += 1 + finally: + conn.close() + return renamed + + def _validate_guest_name_settings(self) -> None: + if not self._settings.comms_mas_admin_client_id or not self._settings.comms_mas_admin_client_secret: + raise RuntimeError("comms mas admin secret missing") + if not self._settings.comms_synapse_base: + raise RuntimeError("comms synapse base missing") + + def _room_context(self, client: httpx.Client, token: str) -> tuple[str, set[str], set[str]]: + room_id = self._resolve_alias(client, token, self._settings.comms_room_alias) + members, existing = self._room_members(client, token, room_id) + return room_id, members, existing + + def _rename_mas_guests( + self, + client: httpx.Client, + admin_token: str, + room_id: str, + members: set[str], + existing: set[str], + ) -> MasGuestResult: + renamed = 0 + skipped = 0 + mas_usernames: set[str] = set() + users = self._mas_list_users(client, admin_token) + for user in users: + attrs = user.get("attributes") or {} + username = attrs.get("username") or "" + if isinstance(username, str) and username: + mas_usernames.add(username) + legacy_guest = attrs.get("legacy_guest") + if not isinstance(username, str) or not username: + skipped += 1 + continue + if not (legacy_guest or _needs_rename_username(username)): + skipped += 1 + continue + user_id = user.get("id") + if not isinstance(user_id, str) or not user_id: + skipped += 1 + continue + full_user = f"@{username}:{self._settings.comms_server_name}" + access_token, session_id = self._mas_personal_session(client, admin_token, user_id) + try: + display = self._get_displayname(client, access_token, full_user) + if display and not _needs_rename_display(display): + skipped += 1 + continue + new_name = self._pick_guest_name(existing) + if not new_name: + skipped += 1 + continue + self._set_displayname( + client, + access_token, + DisplayNameTarget( + room_id=room_id, + user_id=full_user, + name=new_name, + in_room=full_user in members, + ), + ) + renamed += 1 + finally: + self._mas_revoke_session(client, admin_token, session_id) + return MasGuestResult(renamed=renamed, skipped=skipped, usernames=mas_usernames) + + def _synapse_entries(self, client: httpx.Client, token: str) -> list[dict[str, Any]]: + try: + return self._synapse_list_users(client, token) + except Exception as exc: # noqa: BLE001 + logger.info( + "synapse admin list skipped", + extra={"event": "comms_guest_list", "status": "error", "detail": str(exc)}, + ) + return [] + + def _synapse_user_id(self, entry: dict[str, Any]) -> SynapseUserRef | None: + user_id = entry.get("name") or "" + if not isinstance(user_id, str) or not user_id.startswith("@"): + return None + localpart = user_id.split(":", 1)[0].lstrip("@") + return SynapseUserRef(entry=entry, user_id=user_id, localpart=localpart) + + def _maybe_prune_synapse_guest( + self, + client: httpx.Client, + token: str, + entry: dict[str, Any], + user_id: str, + now_ms: int, + ) -> bool: + if not entry.get("is_guest"): + return False + if not self._should_prune_guest(entry, now_ms): + return False + return self._prune_guest(client, token, user_id) + + def _needs_synapse_rename( + self, + client: httpx.Client, + token: str, + user: SynapseUserRef, + mas_usernames: set[str], + ) -> bool: + if user.localpart in mas_usernames: + return False + is_guest = user.entry.get("is_guest") + if not (is_guest or _needs_rename_username(user.localpart)): + return False + display = self._get_displayname_admin(client, token, user.user_id) + if display and not _needs_rename_display(display): + return False + return True + + def _rename_synapse_user(self, client: httpx.Client, token: str, existing: set[str], user_id: str) -> bool: + new_name = self._pick_guest_name(existing) + if not new_name: + return False + return self._set_displayname_admin(client, token, user_id, new_name) + + def _rename_synapse_guests( + self, + client: httpx.Client, + token: str, + existing: set[str], + mas_usernames: set[str], + ) -> SynapseGuestResult: + renamed = 0 + pruned = 0 + entries = self._synapse_entries(client, token) + + now_ms = int(self._time() * 1000) + for entry in entries: + user_ref = self._synapse_user_id(entry) + if not user_ref: + continue + if self._maybe_prune_synapse_guest(client, token, user_ref.entry, user_ref.user_id, now_ms): + pruned += 1 + continue + if not self._needs_synapse_rename(client, token, user_ref, mas_usernames): + continue + if self._rename_synapse_user(client, token, existing, user_ref.user_id): + renamed += 1 + return SynapseGuestResult(renamed=renamed, pruned=pruned) + + def run_guest_name_randomizer(self, wait: bool = True) -> dict[str, Any]: + self._validate_guest_name_settings() + + with self._client() as client: + admin_token = self._mas_admin_token(client) + seeder_id = self._mas_user_id(client, admin_token, self._settings.comms_seeder_user) + seeder_token, seeder_session = self._mas_personal_session(client, admin_token, seeder_id) + try: + room_id, members, existing = self._room_context(client, seeder_token) + mas_result = self._rename_mas_guests(client, admin_token, room_id, members, existing) + synapse_result = self._rename_synapse_guests( + client, + seeder_token, + existing, + mas_result.usernames, + ) + db_renamed = self._db_rename_numeric(existing) + finally: + self._mas_revoke_session(client, admin_token, seeder_session) + + renamed = mas_result.renamed + synapse_result.renamed + db_renamed + pruned = synapse_result.pruned + skipped = mas_result.skipped + processed = renamed + pruned + skipped + summary = CommsSummary(processed, renamed, pruned, skipped) + logger.info( + "comms guest name sync finished", + extra={ + "event": "comms_guest_name", + "status": "ok", + "processed": summary.processed, + "renamed": summary.renamed, + "pruned": summary.pruned, + "skipped": summary.skipped, + }, + ) + return {"status": "ok", **summary.__dict__} diff --git a/ariadne/services/comms_protocol.py b/ariadne/services/comms_protocol.py new file mode 100644 index 0000000..379d5cb --- /dev/null +++ b/ariadne/services/comms_protocol.py @@ -0,0 +1,72 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + +HTTP_OK = 200 +HTTP_CREATED = 201 +HTTP_ACCEPTED = 202 +HTTP_NO_CONTENT = 204 +HTTP_NOT_FOUND = 404 +HTTP_CONFLICT = 409 + + +@dataclass(frozen=True) +class CommsSummary: + processed: int + renamed: int + pruned: int + skipped: int + detail: str = "" + + +@dataclass(frozen=True) +class MasGuestResult: + renamed: int + skipped: int + usernames: set[str] + + +@dataclass(frozen=True) +class SynapseGuestResult: + renamed: int + pruned: int + + +@dataclass(frozen=True) +class DisplayNameTarget: + room_id: str + user_id: str + name: str + in_room: bool + + +@dataclass(frozen=True) +class SynapseUserRef: + entry: dict[str, Any] + user_id: str + localpart: str + + +def _auth(token: str) -> dict[str, str]: + return {"Authorization": f"Bearer {token}"} + + +def _canon_user(user: str, server_name: str) -> str: + user = (user or "").strip() + if user.startswith("@") and ":" in user: + return user + user = user.lstrip("@") + if ":" in user: + return f"@{user}" + return f"@{user}:{server_name}" + + +def _needs_rename_username(username: str) -> bool: + return username.isdigit() or username.startswith("guest-") + + +def _needs_rename_display(display: str | None) -> bool: + if not display: + return True + return display.isdigit() or display.startswith("guest-") diff --git a/ariadne/services/comms_room_ops.py b/ariadne/services/comms_room_ops.py new file mode 100644 index 0000000..1d077f1 --- /dev/null +++ b/ariadne/services/comms_room_ops.py @@ -0,0 +1,389 @@ +from __future__ import annotations + +from typing import Any +import urllib.parse + +import httpx + +from ..utils.logging import get_logger +from .comms_protocol import ( + HTTP_ACCEPTED, + HTTP_CONFLICT, + HTTP_CREATED, + HTTP_NOT_FOUND, + HTTP_OK, + _auth, + _canon_user, +) + + +logger = get_logger(__name__) + + +class _CommsRoomOpsMixin: + def run_pin_invite(self, wait: bool = True) -> dict[str, Any]: + if not self._settings.comms_seeder_password: + raise RuntimeError("comms seeder password missing") + + with self._client() as client: + token = self._login(client, self._settings.comms_seeder_user, self._settings.comms_seeder_password) + room_id = self._resolve_alias(client, token, self._settings.comms_room_alias) + pinned = self._get_pinned(client, token, room_id) + for event_id in pinned: + event = self._get_event(client, token, room_id, event_id) + if event and (event.get("content") or {}).get("body") == self._settings.comms_pin_message: + return {"status": "ok", "detail": "already pinned"} + event_id = self._send_message(client, token, room_id, self._settings.comms_pin_message) + if not event_id: + return {"status": "error", "detail": "pin event_id missing"} + self._pin_message(client, token, room_id, event_id) + return {"status": "ok", "detail": "pinned"} + + def run_reset_room(self, wait: bool = True) -> dict[str, Any]: + if not self._settings.comms_seeder_password: + raise RuntimeError("comms seeder password missing") + + with self._client() as client: + token = self._login_with_retry(client, self._settings.comms_seeder_user, self._settings.comms_seeder_password) + old_room_id = self._resolve_alias(client, token, self._settings.comms_room_alias) + new_room_id = self._create_room(client, token, self._settings.comms_room_name) + self._set_room_state(client, token, new_room_id, "m.room.join_rules", {"join_rule": "public"}) + self._set_room_state(client, token, new_room_id, "m.room.guest_access", {"guest_access": "can_join"}) + self._set_room_state( + client, + token, + new_room_id, + "m.room.history_visibility", + {"history_visibility": "shared"}, + ) + self._set_room_state(client, token, new_room_id, "m.room.power_levels", self._power_levels()) + + self._delete_alias(client, token, self._settings.comms_room_alias) + self._put_alias(client, token, self._settings.comms_room_alias, new_room_id) + self._set_room_state( + client, + token, + new_room_id, + "m.room.canonical_alias", + {"alias": self._settings.comms_room_alias}, + ) + self._set_directory_visibility(client, token, new_room_id, "public") + + bot_user_id = _canon_user(self._settings.comms_bot_user, self._settings.comms_server_name) + self._invite_user(client, token, new_room_id, bot_user_id) + for uid in self._list_joined_members(client, token, old_room_id): + if uid == _canon_user(self._settings.comms_seeder_user, self._settings.comms_server_name): + continue + localpart = uid.split(":", 1)[0].lstrip("@") + if localpart.isdigit(): + continue + self._invite_user(client, token, new_room_id, uid) + + event_id = self._send_message(client, token, new_room_id, self._settings.comms_pin_message) + if not event_id: + raise RuntimeError("pin message event_id missing") + self._set_room_state(client, token, new_room_id, "m.room.pinned_events", {"pinned": [event_id]}) + + self._set_directory_visibility(client, token, old_room_id, "private") + self._set_room_state(client, token, old_room_id, "m.room.join_rules", {"join_rule": "invite"}) + self._set_room_state(client, token, old_room_id, "m.room.guest_access", {"guest_access": "forbidden"}) + self._set_room_state( + client, + token, + old_room_id, + "m.room.tombstone", + { + "body": "Othrys has been reset. Please join the new room.", + "replacement_room": new_room_id, + }, + ) + self._send_message( + client, + token, + old_room_id, + "Othrys was reset. Join the new room at https://live.bstein.dev/#/room/#othrys:live.bstein.dev?action=join", + ) + + return {"status": "ok", "detail": f"old_room_id={old_room_id} new_room_id={new_room_id}"} + + def run_seed_room(self, wait: bool = True) -> dict[str, Any]: + if not self._settings.comms_seeder_password or not self._settings.comms_bot_password: + raise RuntimeError("comms seeder/bot password missing") + + with self._client() as client: + token = self._login(client, self._settings.comms_seeder_user, self._settings.comms_seeder_password) + for user, password, admin in ( + (self._settings.comms_seeder_user, self._settings.comms_seeder_password, True), + (self._settings.comms_bot_user, self._settings.comms_bot_password, False), + ): + try: + self._ensure_user(client, token, user, password, admin) + except RuntimeError as exc: + message = str(exc) + if "You are not a server admin" in message: + logger.warning( + "comms seed room ensure skipped", + extra={"event": "comms_seed_room", "user": user, "detail": message}, + ) + continue + raise + room_id = self._ensure_room(client, token) + self._join_user(client, token, room_id, _canon_user(self._settings.comms_bot_user, self._settings.comms_server_name)) + self._join_all_locals(client, token, room_id) + return {"status": "ok", "detail": "room seeded"} + + def _login(self, client: httpx.Client, user: str, password: str) -> str: + resp = client.post( + f"{self._settings.comms_auth_base}/_matrix/client/v3/login", + json={ + "type": "m.login.password", + "identifier": {"type": "m.id.user", "user": _canon_user(user, self._settings.comms_server_name)}, + "password": password, + }, + ) + if resp.status_code != HTTP_OK: + raise RuntimeError(f"login failed: {resp.status_code} {resp.text}") + payload = resp.json() + token = payload.get("access_token") + if not isinstance(token, str) or not token: + raise RuntimeError("login missing token") + return token + + def _login_with_retry(self, client: httpx.Client, user: str, password: str) -> str: + last: Exception | None = None + for attempt in range(1, 6): + try: + return self._login(client, user, password) + except Exception as exc: # noqa: BLE001 + last = exc + self._sleep(attempt * 2) + raise RuntimeError(str(last) if last else "login failed") + + def _resolve_alias(self, client: httpx.Client, token: str, alias: str) -> str: + resp = client.get( + f"{self._settings.comms_synapse_base}/_matrix/client/v3/directory/room/{urllib.parse.quote(alias)}", + headers=_auth(token), + ) + resp.raise_for_status() + payload = resp.json() + return payload["room_id"] + + def _get_pinned(self, client: httpx.Client, token: str, room_id: str) -> list[str]: + resp = client.get( + f"{self._settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/state/m.room.pinned_events", + headers=_auth(token), + ) + if resp.status_code == HTTP_NOT_FOUND: + return [] + resp.raise_for_status() + pinned = resp.json().get("pinned", []) + return [item for item in pinned if isinstance(item, str)] + + def _get_event(self, client: httpx.Client, token: str, room_id: str, event_id: str) -> dict[str, Any] | None: + resp = client.get( + f"{self._settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/event/{urllib.parse.quote(event_id)}", + headers=_auth(token), + ) + if resp.status_code == HTTP_NOT_FOUND: + return None + resp.raise_for_status() + return resp.json() + + def _send_message(self, client: httpx.Client, token: str, room_id: str, body: str) -> str: + resp = client.post( + f"{self._settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/send/m.room.message", + headers=_auth(token), + json={"msgtype": "m.text", "body": body}, + ) + resp.raise_for_status() + payload = resp.json() + event_id = payload.get("event_id") + return event_id if isinstance(event_id, str) else "" + + def _pin_message(self, client: httpx.Client, token: str, room_id: str, event_id: str) -> None: + resp = client.put( + f"{self._settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/state/m.room.pinned_events", + headers=_auth(token), + json={"pinned": [event_id]}, + ) + resp.raise_for_status() + + def _create_room(self, client: httpx.Client, token: str, name: str) -> str: + resp = client.post( + f"{self._settings.comms_synapse_base}/_matrix/client/v3/createRoom", + headers=_auth(token), + json={"preset": "public_chat", "name": name, "room_version": "11"}, + ) + resp.raise_for_status() + return resp.json()["room_id"] + + def _set_room_state(self, client: httpx.Client, token: str, room_id: str, ev_type: str, content: dict[str, Any]) -> None: + resp = client.put( + f"{self._settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/state/{ev_type}", + headers=_auth(token), + json=content, + ) + resp.raise_for_status() + + def _set_directory_visibility(self, client: httpx.Client, token: str, room_id: str, visibility: str) -> None: + resp = client.put( + f"{self._settings.comms_synapse_base}/_matrix/client/v3/directory/list/room/{urllib.parse.quote(room_id)}", + headers=_auth(token), + json={"visibility": visibility}, + ) + resp.raise_for_status() + + def _delete_alias(self, client: httpx.Client, token: str, alias: str) -> None: + resp = client.delete( + f"{self._settings.comms_synapse_base}/_matrix/client/v3/directory/room/{urllib.parse.quote(alias)}", + headers=_auth(token), + ) + if resp.status_code in (HTTP_OK, HTTP_ACCEPTED, HTTP_NOT_FOUND): + return + resp.raise_for_status() + + def _put_alias(self, client: httpx.Client, token: str, alias: str, room_id: str) -> None: + resp = client.put( + f"{self._settings.comms_synapse_base}/_matrix/client/v3/directory/room/{urllib.parse.quote(alias)}", + headers=_auth(token), + json={"room_id": room_id}, + ) + resp.raise_for_status() + + def _list_joined_members(self, client: httpx.Client, token: str, room_id: str) -> list[str]: + resp = client.get( + f"{self._settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/members?membership=join", + headers=_auth(token), + ) + resp.raise_for_status() + members = [] + for ev in resp.json().get("chunk", []) or []: + if ev.get("type") != "m.room.member": + continue + uid = ev.get("state_key") + if isinstance(uid, str) and uid.startswith("@"): + members.append(uid) + return members + + def _invite_user(self, client: httpx.Client, token: str, room_id: str, user_id: str) -> None: + resp = client.post( + f"{self._settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/invite", + headers=_auth(token), + json={"user_id": user_id}, + ) + if resp.status_code in (HTTP_OK, HTTP_ACCEPTED): + return + resp.raise_for_status() + + def _power_levels(self) -> dict[str, Any]: + return { + "ban": 50, + "events": { + "m.room.avatar": 50, + "m.room.canonical_alias": 50, + "m.room.encryption": 100, + "m.room.history_visibility": 100, + "m.room.name": 50, + "m.room.power_levels": 100, + "m.room.server_acl": 100, + "m.room.tombstone": 100, + }, + "events_default": 0, + "historical": 100, + "invite": 50, + "kick": 50, + "m.call.invite": 50, + "redact": 50, + "state_default": 50, + "users": {_canon_user(self._settings.comms_seeder_user, self._settings.comms_server_name): 100}, + "users_default": 0, + } + + def _ensure_user(self, client: httpx.Client, token: str, localpart: str, password: str, admin: bool) -> None: + admin_token = self._admin_token(token) + user_id = _canon_user(localpart, self._settings.comms_server_name) + url = f"{self._settings.comms_synapse_base}/_synapse/admin/v2/users/{urllib.parse.quote(user_id)}" + resp = client.get(url, headers=_auth(admin_token)) + if resp.status_code == HTTP_OK: + return + payload = {"password": password, "admin": admin, "deactivated": False} + create = client.put(url, headers=_auth(admin_token), json=payload) + if create.status_code not in (HTTP_OK, HTTP_CREATED): + raise RuntimeError(f"create user {user_id} failed: {create.status_code} {create.text}") + + def _ensure_room(self, client: httpx.Client, token: str) -> str: + alias = self._settings.comms_room_alias + alias_enc = urllib.parse.quote(alias) + exists = client.get( + f"{self._settings.comms_synapse_base}/_matrix/client/v3/directory/room/{alias_enc}", + headers=_auth(token), + ) + if exists.status_code == HTTP_OK: + room_id = exists.json()["room_id"] + else: + create = client.post( + f"{self._settings.comms_synapse_base}/_matrix/client/v3/createRoom", + headers=_auth(token), + json={ + "preset": "public_chat", + "name": self._settings.comms_room_name, + "room_alias_name": alias.split(":", 1)[0].lstrip("#"), + "initial_state": [], + "power_level_content_override": { + "events_default": 0, + "users_default": 0, + "state_default": 50, + }, + }, + ) + if create.status_code not in (HTTP_OK, HTTP_CONFLICT): + raise RuntimeError(f"create room failed: {create.status_code} {create.text}") + exists = client.get( + f"{self._settings.comms_synapse_base}/_matrix/client/v3/directory/room/{alias_enc}", + headers=_auth(token), + ) + room_id = exists.json()["room_id"] + + state_events = [ + ("m.room.join_rules", {"join_rule": "public"}), + ("m.room.guest_access", {"guest_access": "can_join"}), + ("m.room.history_visibility", {"history_visibility": "shared"}), + ("m.room.canonical_alias", {"alias": alias}), + ] + for ev_type, content in state_events: + client.put( + f"{self._settings.comms_synapse_base}/_matrix/client/v3/rooms/{room_id}/state/{ev_type}", + headers=_auth(token), + json=content, + ) + client.put( + f"{self._settings.comms_synapse_base}/_matrix/client/v3/directory/list/room/{room_id}", + headers=_auth(token), + json={"visibility": "public"}, + ) + return room_id + + def _join_user(self, client: httpx.Client, token: str, room_id: str, user_id: str) -> None: + admin_token = self._admin_token(token) + client.post( + f"{self._settings.comms_synapse_base}/_synapse/admin/v1/join/{urllib.parse.quote(room_id)}", + headers=_auth(admin_token), + json={"user_id": user_id}, + ) + + def _join_all_locals(self, client: httpx.Client, token: str, room_id: str) -> None: + users: list[str] = [] + from_token = None + admin_token = self._admin_token(token) + while True: + url = f"{self._settings.comms_synapse_base}/_synapse/admin/v2/users?local=true&deactivated=false&limit=100" + if from_token: + url += f"&from={from_token}" + resp = client.get(url, headers=_auth(admin_token)) + payload = resp.json() + users.extend([u["name"] for u in payload.get("users", []) if isinstance(u, dict) and u.get("name")]) + from_token = payload.get("next_token") + if not from_token: + break + for uid in users: + self._join_user(client, token, room_id, uid) diff --git a/ci/loc_hygiene_waivers.tsv b/ci/loc_hygiene_waivers.tsv index d690dbc..0b36927 100644 --- a/ci/loc_hygiene_waivers.tsv +++ b/ci/loc_hygiene_waivers.tsv @@ -1,7 +1,6 @@ # path reason ariadne/services/cluster_state.py split planned; service orchestration decomposition tracked in hygiene backlog ariadne/app.py split planned; Flask app bootstrap/routes currently co-located -ariadne/services/comms.py split planned; comms adapters still consolidated ariadne/manager/provisioning.py split planned; provisioning flow modules pending extraction tests/test_provisioning.py test module split planned; broad provisioning coverage retained meanwhile tests/test_services.py test module split planned; broad service contract coverage retained meanwhile