refactor(ariadne): split comms service helpers

This commit is contained in:
codex 2026-04-21 01:36:40 -03:00
parent 7d9b649a43
commit c11996d860
5 changed files with 967 additions and 903 deletions

View File

@ -1,94 +1,25 @@
from __future__ import annotations
from dataclasses import dataclass
import base64
import time
import urllib.parse
from typing import Any
import httpx
import psycopg
from ..settings import settings
from ..utils.logging import get_logger
from ..utils.name_generator import NameGenerator
from .comms_guest_names import _CommsGuestNameMixin
from .comms_protocol import _canon_user
from .comms_room_ops import _CommsRoomOpsMixin
logger = get_logger(__name__)
class CommsService(_CommsGuestNameMixin, _CommsRoomOpsMixin):
"""Maintain Matrix/MAS guest naming and room hygiene.
HTTP_OK = 200
HTTP_CREATED = 201
HTTP_ACCEPTED = 202
HTTP_NO_CONTENT = 204
HTTP_BAD_REQUEST = 400
HTTP_NOT_FOUND = 404
HTTP_CONFLICT = 409
@dataclass(frozen=True)
class CommsSummary:
processed: int
renamed: int
pruned: int
skipped: int
detail: str = ""
@dataclass(frozen=True)
class MasGuestResult:
renamed: int
skipped: int
usernames: set[str]
@dataclass(frozen=True)
class SynapseGuestResult:
renamed: int
pruned: int
@dataclass(frozen=True)
class DisplayNameTarget:
room_id: str
user_id: str
name: str
in_room: bool
@dataclass(frozen=True)
class SynapseUserRef:
entry: dict[str, Any]
user_id: str
localpart: str
def _auth(token: str) -> dict[str, str]:
return {"Authorization": f"Bearer {token}"}
def _canon_user(user: str, server_name: str) -> str:
user = (user or "").strip()
if user.startswith("@") and ":" in user:
return user
user = user.lstrip("@")
if ":" in user:
return f"@{user}"
return f"@{user}:{server_name}"
def _needs_rename_username(username: str) -> bool:
return username.isdigit() or username.startswith("guest-")
def _needs_rename_display(display: str | None) -> bool:
if not display:
return True
return display.isdigit() or display.startswith("guest-")
class CommsService:
"""Maintain Matrix/MAS guest naming and pruning hygiene."""
Inputs: Matrix/MAS endpoints, service credentials, and optional database access
from settings. Outputs: scheduled maintenance actions plus small status dicts
for scheduler logging.
"""
def __init__(
self,
@ -98,6 +29,10 @@ class CommsService:
self._client_factory = client_factory
self._name_generator = name_generator or NameGenerator()
@property
def _settings(self) -> Any:
return settings
def _pick_guest_name(self, existing: set[str]) -> str | None:
return self._name_generator.unique(existing)
@ -108,838 +43,22 @@ class CommsService:
token = getattr(settings, "comms_synapse_admin_token", "")
return token if token else fallback
def _mas_admin_token(self, client: httpx.Client) -> str:
if not settings.comms_mas_admin_client_id or not settings.comms_mas_admin_client_secret:
raise RuntimeError("mas admin client credentials missing")
basic = base64.b64encode(
f"{settings.comms_mas_admin_client_id}:{settings.comms_mas_admin_client_secret}".encode()
).decode()
last_err: Exception | None = None
for attempt in range(5):
try:
resp = client.post(
settings.comms_mas_token_url,
headers={"Authorization": f"Basic {basic}"},
data={"grant_type": "client_credentials", "scope": "urn:mas:admin"},
)
resp.raise_for_status()
payload = resp.json()
token = payload.get("access_token")
if not isinstance(token, str) or not token:
raise RuntimeError("missing mas access token")
return token
except Exception as exc: # noqa: BLE001
last_err = exc
time.sleep(2**attempt)
raise RuntimeError(str(last_err) if last_err else "mas admin token failed")
def _mas_user_id(self, client: httpx.Client, token: str, username: str) -> str:
url = f"{settings.comms_mas_admin_api_base}/users/by-username/{urllib.parse.quote(username)}"
resp = client.get(url, headers=_auth(token))
resp.raise_for_status()
payload = resp.json()
return payload["data"]["id"]
def _mas_personal_session(self, client: httpx.Client, token: str, user_id: str) -> tuple[str, str]:
resp = client.post(
f"{settings.comms_mas_admin_api_base}/personal-sessions",
headers=_auth(token),
json={
"actor_user_id": user_id,
"human_name": "guest-name-randomizer",
"scope": "urn:matrix:client:api:*",
"expires_in": 300,
},
)
resp.raise_for_status()
payload = resp.json().get("data", {})
session_id = payload.get("id")
attrs = (payload.get("attributes") or {}) if isinstance(payload, dict) else {}
access_token = attrs.get("access_token")
if not isinstance(access_token, str) or not isinstance(session_id, str):
raise RuntimeError("invalid personal session response")
return access_token, session_id
def _mas_revoke_session(self, client: httpx.Client, token: str, session_id: str) -> None:
try:
client.post(
f"{settings.comms_mas_admin_api_base}/personal-sessions/{urllib.parse.quote(session_id)}/revoke",
headers=_auth(token),
json={},
)
except Exception:
return
def _resolve_alias(self, client: httpx.Client, token: str, alias: str) -> str:
resp = client.get(
f"{settings.comms_synapse_base}/_matrix/client/v3/directory/room/{urllib.parse.quote(alias)}",
headers=_auth(token),
)
resp.raise_for_status()
payload = resp.json()
return payload["room_id"]
def _room_members(self, client: httpx.Client, token: str, room_id: str) -> tuple[set[str], set[str]]:
resp = client.get(
f"{settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/members",
headers=_auth(token),
)
resp.raise_for_status()
payload = resp.json()
members: set[str] = set()
existing: set[str] = set()
for ev in payload.get("chunk", []) or []:
user_id = ev.get("state_key")
if isinstance(user_id, str) and user_id:
members.add(user_id)
display = (ev.get("content") or {}).get("displayname")
if isinstance(display, str) and display:
existing.add(display)
return members, existing
def _mas_list_users(self, client: httpx.Client, token: str) -> list[dict[str, Any]]:
users: list[dict[str, Any]] = []
cursor = None
while True:
url = f"{settings.comms_mas_admin_api_base}/users?page[size]=100"
if cursor:
url += f"&page[after]={urllib.parse.quote(cursor)}"
resp = client.get(url, headers=_auth(token))
resp.raise_for_status()
payload = resp.json()
data = payload.get("data") or []
if not isinstance(data, list) or not data:
break
users.extend([item for item in data if isinstance(item, dict)])
last = data[-1]
cursor = (
last.get("meta", {})
if isinstance(last, dict)
else {}
).get("page", {}).get("cursor")
if not cursor:
break
return users
def _synapse_list_users(self, client: httpx.Client, token: str) -> list[dict[str, Any]]:
users: list[dict[str, Any]] = []
from_token = None
admin_token = self._admin_token(token)
while True:
url = "{}/_synapse/admin/v2/users?local=true&deactivated=false&limit=100".format(
settings.comms_synapse_base
)
if from_token:
url += f"&from={urllib.parse.quote(from_token)}"
resp = client.get(url, headers=_auth(admin_token))
resp.raise_for_status()
payload = resp.json()
users.extend([item for item in payload.get("users", []) if isinstance(item, dict)])
from_token = payload.get("next_token")
if not from_token:
break
return users
def _should_prune_guest(self, entry: dict[str, Any], now_ms: int) -> bool:
if not entry.get("is_guest"):
return False
last_seen = entry.get("last_seen_ts")
if last_seen is None:
return False
try:
last_seen = int(last_seen)
except (TypeError, ValueError):
return False
stale_ms = int(settings.comms_guest_stale_days) * 24 * 60 * 60 * 1000
return now_ms - last_seen > stale_ms
def _prune_guest(self, client: httpx.Client, token: str, user_id: str) -> bool:
admin_token = self._admin_token(token)
try:
resp = client.delete(
f"{settings.comms_synapse_base}/_synapse/admin/v2/users/{urllib.parse.quote(user_id)}",
headers=_auth(admin_token),
params={"erase": "true"},
)
except Exception as exc: # noqa: BLE001
logger.info(
"guest prune failed",
extra={"event": "comms_guest_prune", "status": "error", "detail": str(exc)},
)
return False
if resp.status_code in (HTTP_OK, HTTP_ACCEPTED, HTTP_NO_CONTENT, HTTP_NOT_FOUND):
return True
logger.info(
"guest prune failed",
extra={
"event": "comms_guest_prune",
"status": "error",
"detail": f"{resp.status_code} {resp.text}",
},
)
return False
def _get_displayname(self, client: httpx.Client, token: str, user_id: str) -> str | None:
resp = client.get(
f"{settings.comms_synapse_base}/_matrix/client/v3/profile/{urllib.parse.quote(user_id)}",
headers=_auth(token),
)
resp.raise_for_status()
return resp.json().get("displayname")
def _get_displayname_admin(self, client: httpx.Client, token: str, user_id: str) -> str | None:
admin_token = self._admin_token(token)
resp = client.get(
f"{settings.comms_synapse_base}/_synapse/admin/v2/users/{urllib.parse.quote(user_id)}",
headers=_auth(admin_token),
)
if resp.status_code == HTTP_NOT_FOUND:
return None
resp.raise_for_status()
return resp.json().get("displayname")
def _set_displayname(
self,
client: httpx.Client,
token: str,
target: DisplayNameTarget,
) -> None:
resp = client.put(
f"{settings.comms_synapse_base}/_matrix/client/v3/profile/{urllib.parse.quote(target.user_id)}/displayname",
headers=_auth(token),
json={"displayname": target.name},
)
resp.raise_for_status()
if not target.in_room:
return
state_url = (
f"{settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(target.room_id)}"
f"/state/m.room.member/{urllib.parse.quote(target.user_id)}"
)
client.put(
state_url,
headers=_auth(token),
json={"membership": "join", "displayname": target.name},
)
def _set_displayname_admin(self, client: httpx.Client, token: str, user_id: str, name: str) -> bool:
admin_token = self._admin_token(token)
resp = client.put(
f"{settings.comms_synapse_base}/_synapse/admin/v2/users/{urllib.parse.quote(user_id)}",
headers=_auth(admin_token),
json={"displayname": name},
)
return resp.status_code in (HTTP_OK, HTTP_CREATED, HTTP_NO_CONTENT)
def _db_rename_numeric(self, existing: set[str]) -> int:
if not settings.comms_synapse_db_password:
return 0
renamed = 0
conn = psycopg.connect(
def _connect_synapse_db(self) -> Any:
return psycopg.connect(
host=settings.comms_synapse_db_host,
port=settings.comms_synapse_db_port,
dbname=settings.comms_synapse_db_name,
user=settings.comms_synapse_db_user,
password=settings.comms_synapse_db_password,
)
try:
with conn:
with conn.cursor() as cur:
pattern = f"^@\\d+:{settings.comms_server_name}$"
cur.execute(
"SELECT user_id, full_user_id, displayname FROM profiles WHERE full_user_id ~ %s",
(pattern,),
)
profile_rows = cur.fetchall()
profile_index = {row[1]: row for row in profile_rows}
for _user_id, full_user_id, display in profile_rows:
if display and not _needs_rename_display(display):
continue
new_name = self._pick_guest_name(existing)
if not new_name:
continue
cur.execute(
"UPDATE profiles SET displayname = %s WHERE full_user_id = %s",
(new_name, full_user_id),
)
renamed += 1
cur.execute(
"SELECT name FROM users WHERE name ~ %s",
(pattern,),
)
users = [row[0] for row in cur.fetchall()]
if not users:
return renamed
cur.execute(
"SELECT user_id, full_user_id FROM profiles WHERE full_user_id = ANY(%s)",
(users,),
)
for existing_full in cur.fetchall():
profile_index.setdefault(existing_full[1], existing_full)
def _sleep(self, seconds: float) -> None:
time.sleep(seconds)
for full_user_id in users:
if full_user_id in profile_index:
continue
localpart = full_user_id.split(":", 1)[0].lstrip("@")
new_name = self._pick_guest_name(existing)
if not new_name:
continue
cur.execute(
"INSERT INTO profiles (user_id, displayname, full_user_id) VALUES (%s, %s, %s) "
"ON CONFLICT (full_user_id) DO UPDATE SET displayname = EXCLUDED.displayname",
(localpart, new_name, full_user_id),
)
renamed += 1
finally:
conn.close()
return renamed
def _validate_guest_name_settings(self) -> None:
if not settings.comms_mas_admin_client_id or not settings.comms_mas_admin_client_secret:
raise RuntimeError("comms mas admin secret missing")
if not settings.comms_synapse_base:
raise RuntimeError("comms synapse base missing")
def _room_context(self, client: httpx.Client, token: str) -> tuple[str, set[str], set[str]]:
room_id = self._resolve_alias(client, token, settings.comms_room_alias)
members, existing = self._room_members(client, token, room_id)
return room_id, members, existing
def _rename_mas_guests(
self,
client: httpx.Client,
admin_token: str,
room_id: str,
members: set[str],
existing: set[str],
) -> MasGuestResult:
renamed = 0
skipped = 0
mas_usernames: set[str] = set()
users = self._mas_list_users(client, admin_token)
for user in users:
attrs = user.get("attributes") or {}
username = attrs.get("username") or ""
if isinstance(username, str) and username:
mas_usernames.add(username)
legacy_guest = attrs.get("legacy_guest")
if not isinstance(username, str) or not username:
skipped += 1
continue
if not (legacy_guest or _needs_rename_username(username)):
skipped += 1
continue
user_id = user.get("id")
if not isinstance(user_id, str) or not user_id:
skipped += 1
continue
full_user = f"@{username}:{settings.comms_server_name}"
access_token, session_id = self._mas_personal_session(client, admin_token, user_id)
try:
display = self._get_displayname(client, access_token, full_user)
if display and not _needs_rename_display(display):
skipped += 1
continue
new_name = self._pick_guest_name(existing)
if not new_name:
skipped += 1
continue
self._set_displayname(
client,
access_token,
DisplayNameTarget(
room_id=room_id,
user_id=full_user,
name=new_name,
in_room=full_user in members,
),
)
renamed += 1
finally:
self._mas_revoke_session(client, admin_token, session_id)
return MasGuestResult(renamed=renamed, skipped=skipped, usernames=mas_usernames)
def _synapse_entries(self, client: httpx.Client, token: str) -> list[dict[str, Any]]:
try:
return self._synapse_list_users(client, token)
except Exception as exc: # noqa: BLE001
logger.info(
"synapse admin list skipped",
extra={"event": "comms_guest_list", "status": "error", "detail": str(exc)},
)
return []
def _synapse_user_id(self, entry: dict[str, Any]) -> SynapseUserRef | None:
user_id = entry.get("name") or ""
if not isinstance(user_id, str) or not user_id.startswith("@"):
return None
localpart = user_id.split(":", 1)[0].lstrip("@")
return SynapseUserRef(entry=entry, user_id=user_id, localpart=localpart)
def _maybe_prune_synapse_guest(
self,
client: httpx.Client,
token: str,
entry: dict[str, Any],
user_id: str,
now_ms: int,
) -> bool:
if not entry.get("is_guest"):
return False
if not self._should_prune_guest(entry, now_ms):
return False
return self._prune_guest(client, token, user_id)
def _needs_synapse_rename(
self,
client: httpx.Client,
token: str,
user: SynapseUserRef,
mas_usernames: set[str],
) -> bool:
if user.localpart in mas_usernames:
return False
is_guest = user.entry.get("is_guest")
if not (is_guest or _needs_rename_username(user.localpart)):
return False
display = self._get_displayname_admin(client, token, user.user_id)
if display and not _needs_rename_display(display):
return False
return True
def _rename_synapse_user(
self,
client: httpx.Client,
token: str,
existing: set[str],
user_id: str,
) -> bool:
new_name = self._pick_guest_name(existing)
if not new_name:
return False
return self._set_displayname_admin(client, token, user_id, new_name)
def _rename_synapse_guests(
self,
client: httpx.Client,
token: str,
existing: set[str],
mas_usernames: set[str],
) -> SynapseGuestResult:
renamed = 0
pruned = 0
entries = self._synapse_entries(client, token)
now_ms = int(time.time() * 1000)
for entry in entries:
user_ref = self._synapse_user_id(entry)
if not user_ref:
continue
if self._maybe_prune_synapse_guest(client, token, user_ref.entry, user_ref.user_id, now_ms):
pruned += 1
continue
if not self._needs_synapse_rename(client, token, user_ref, mas_usernames):
continue
if self._rename_synapse_user(client, token, existing, user_ref.user_id):
renamed += 1
return SynapseGuestResult(renamed=renamed, pruned=pruned)
def run_guest_name_randomizer(self, wait: bool = True) -> dict[str, Any]:
self._validate_guest_name_settings()
with self._client() as client:
admin_token = self._mas_admin_token(client)
seeder_id = self._mas_user_id(client, admin_token, settings.comms_seeder_user)
seeder_token, seeder_session = self._mas_personal_session(client, admin_token, seeder_id)
try:
room_id, members, existing = self._room_context(client, seeder_token)
mas_result = self._rename_mas_guests(client, admin_token, room_id, members, existing)
synapse_result = self._rename_synapse_guests(
client,
seeder_token,
existing,
mas_result.usernames,
)
db_renamed = self._db_rename_numeric(existing)
finally:
self._mas_revoke_session(client, admin_token, seeder_session)
renamed = mas_result.renamed + synapse_result.renamed + db_renamed
pruned = synapse_result.pruned
skipped = mas_result.skipped
processed = renamed + pruned + skipped
summary = CommsSummary(processed, renamed, pruned, skipped)
logger.info(
"comms guest name sync finished",
extra={
"event": "comms_guest_name",
"status": "ok",
"processed": summary.processed,
"renamed": summary.renamed,
"pruned": summary.pruned,
"skipped": summary.skipped,
},
)
return {"status": "ok", **summary.__dict__}
def run_pin_invite(self, wait: bool = True) -> dict[str, Any]:
if not settings.comms_seeder_password:
raise RuntimeError("comms seeder password missing")
with self._client() as client:
token = self._login(client, settings.comms_seeder_user, settings.comms_seeder_password)
room_id = self._resolve_alias(client, token, settings.comms_room_alias)
pinned = self._get_pinned(client, token, room_id)
for event_id in pinned:
event = self._get_event(client, token, room_id, event_id)
if event and (event.get("content") or {}).get("body") == settings.comms_pin_message:
return {"status": "ok", "detail": "already pinned"}
event_id = self._send_message(client, token, room_id, settings.comms_pin_message)
if not event_id:
return {"status": "error", "detail": "pin event_id missing"}
self._pin_message(client, token, room_id, event_id)
return {"status": "ok", "detail": "pinned"}
def run_reset_room(self, wait: bool = True) -> dict[str, Any]:
if not settings.comms_seeder_password:
raise RuntimeError("comms seeder password missing")
with self._client() as client:
token = self._login_with_retry(client, settings.comms_seeder_user, settings.comms_seeder_password)
old_room_id = self._resolve_alias(client, token, settings.comms_room_alias)
new_room_id = self._create_room(client, token, settings.comms_room_name)
self._set_room_state(client, token, new_room_id, "m.room.join_rules", {"join_rule": "public"})
self._set_room_state(client, token, new_room_id, "m.room.guest_access", {"guest_access": "can_join"})
self._set_room_state(
client,
token,
new_room_id,
"m.room.history_visibility",
{"history_visibility": "shared"},
)
self._set_room_state(client, token, new_room_id, "m.room.power_levels", self._power_levels())
self._delete_alias(client, token, settings.comms_room_alias)
self._put_alias(client, token, settings.comms_room_alias, new_room_id)
self._set_room_state(
client,
token,
new_room_id,
"m.room.canonical_alias",
{"alias": settings.comms_room_alias},
)
self._set_directory_visibility(client, token, new_room_id, "public")
bot_user_id = _canon_user(settings.comms_bot_user, settings.comms_server_name)
self._invite_user(client, token, new_room_id, bot_user_id)
for uid in self._list_joined_members(client, token, old_room_id):
if uid == _canon_user(settings.comms_seeder_user, settings.comms_server_name):
continue
localpart = uid.split(":", 1)[0].lstrip("@")
if localpart.isdigit():
continue
self._invite_user(client, token, new_room_id, uid)
event_id = self._send_message(client, token, new_room_id, settings.comms_pin_message)
if not event_id:
raise RuntimeError("pin message event_id missing")
self._set_room_state(client, token, new_room_id, "m.room.pinned_events", {"pinned": [event_id]})
self._set_directory_visibility(client, token, old_room_id, "private")
self._set_room_state(client, token, old_room_id, "m.room.join_rules", {"join_rule": "invite"})
self._set_room_state(client, token, old_room_id, "m.room.guest_access", {"guest_access": "forbidden"})
self._set_room_state(
client,
token,
old_room_id,
"m.room.tombstone",
{
"body": "Othrys has been reset. Please join the new room.",
"replacement_room": new_room_id,
},
)
self._send_message(
client,
token,
old_room_id,
"Othrys was reset. Join the new room at https://live.bstein.dev/#/room/#othrys:live.bstein.dev?action=join",
)
return {"status": "ok", "detail": f"old_room_id={old_room_id} new_room_id={new_room_id}"}
def run_seed_room(self, wait: bool = True) -> dict[str, Any]:
if not settings.comms_seeder_password or not settings.comms_bot_password:
raise RuntimeError("comms seeder/bot password missing")
with self._client() as client:
token = self._login(client, settings.comms_seeder_user, settings.comms_seeder_password)
for user, password, admin in (
(settings.comms_seeder_user, settings.comms_seeder_password, True),
(settings.comms_bot_user, settings.comms_bot_password, False),
):
try:
self._ensure_user(client, token, user, password, admin)
except RuntimeError as exc:
message = str(exc)
if "You are not a server admin" in message:
logger.warning(
"comms seed room ensure skipped",
extra={"event": "comms_seed_room", "user": user, "detail": message},
)
continue
raise
room_id = self._ensure_room(client, token)
self._join_user(client, token, room_id, _canon_user(settings.comms_bot_user, settings.comms_server_name))
self._join_all_locals(client, token, room_id)
return {"status": "ok", "detail": "room seeded"}
def _login(self, client: httpx.Client, user: str, password: str) -> str:
resp = client.post(
f"{settings.comms_auth_base}/_matrix/client/v3/login",
json={
"type": "m.login.password",
"identifier": {"type": "m.id.user", "user": _canon_user(user, settings.comms_server_name)},
"password": password,
},
)
if resp.status_code != HTTP_OK:
raise RuntimeError(f"login failed: {resp.status_code} {resp.text}")
payload = resp.json()
token = payload.get("access_token")
if not isinstance(token, str) or not token:
raise RuntimeError("login missing token")
return token
def _login_with_retry(self, client: httpx.Client, user: str, password: str) -> str:
last: Exception | None = None
for attempt in range(1, 6):
try:
return self._login(client, user, password)
except Exception as exc: # noqa: BLE001
last = exc
time.sleep(attempt * 2)
raise RuntimeError(str(last) if last else "login failed")
def _get_pinned(self, client: httpx.Client, token: str, room_id: str) -> list[str]:
resp = client.get(
f"{settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/state/m.room.pinned_events",
headers=_auth(token),
)
if resp.status_code == HTTP_NOT_FOUND:
return []
resp.raise_for_status()
pinned = resp.json().get("pinned", [])
return [item for item in pinned if isinstance(item, str)]
def _get_event(self, client: httpx.Client, token: str, room_id: str, event_id: str) -> dict[str, Any] | None:
resp = client.get(
f"{settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/event/{urllib.parse.quote(event_id)}",
headers=_auth(token),
)
if resp.status_code == HTTP_NOT_FOUND:
return None
resp.raise_for_status()
return resp.json()
def _send_message(self, client: httpx.Client, token: str, room_id: str, body: str) -> str:
resp = client.post(
f"{settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/send/m.room.message",
headers=_auth(token),
json={"msgtype": "m.text", "body": body},
)
resp.raise_for_status()
payload = resp.json()
event_id = payload.get("event_id")
return event_id if isinstance(event_id, str) else ""
def _pin_message(self, client: httpx.Client, token: str, room_id: str, event_id: str) -> None:
resp = client.put(
f"{settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/state/m.room.pinned_events",
headers=_auth(token),
json={"pinned": [event_id]},
)
resp.raise_for_status()
def _create_room(self, client: httpx.Client, token: str, name: str) -> str:
resp = client.post(
f"{settings.comms_synapse_base}/_matrix/client/v3/createRoom",
headers=_auth(token),
json={"preset": "public_chat", "name": name, "room_version": "11"},
)
resp.raise_for_status()
return resp.json()["room_id"]
def _set_room_state(self, client: httpx.Client, token: str, room_id: str, ev_type: str, content: dict[str, Any]) -> None:
resp = client.put(
f"{settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/state/{ev_type}",
headers=_auth(token),
json=content,
)
resp.raise_for_status()
def _set_directory_visibility(self, client: httpx.Client, token: str, room_id: str, visibility: str) -> None:
resp = client.put(
f"{settings.comms_synapse_base}/_matrix/client/v3/directory/list/room/{urllib.parse.quote(room_id)}",
headers=_auth(token),
json={"visibility": visibility},
)
resp.raise_for_status()
def _delete_alias(self, client: httpx.Client, token: str, alias: str) -> None:
resp = client.delete(
f"{settings.comms_synapse_base}/_matrix/client/v3/directory/room/{urllib.parse.quote(alias)}",
headers=_auth(token),
)
if resp.status_code in (HTTP_OK, HTTP_ACCEPTED, HTTP_NOT_FOUND):
return
resp.raise_for_status()
def _put_alias(self, client: httpx.Client, token: str, alias: str, room_id: str) -> None:
resp = client.put(
f"{settings.comms_synapse_base}/_matrix/client/v3/directory/room/{urllib.parse.quote(alias)}",
headers=_auth(token),
json={"room_id": room_id},
)
resp.raise_for_status()
def _list_joined_members(self, client: httpx.Client, token: str, room_id: str) -> list[str]:
resp = client.get(
f"{settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/members?membership=join",
headers=_auth(token),
)
resp.raise_for_status()
members = []
for ev in resp.json().get("chunk", []) or []:
if ev.get("type") != "m.room.member":
continue
uid = ev.get("state_key")
if isinstance(uid, str) and uid.startswith("@"):
members.append(uid)
return members
def _invite_user(self, client: httpx.Client, token: str, room_id: str, user_id: str) -> None:
resp = client.post(
f"{settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/invite",
headers=_auth(token),
json={"user_id": user_id},
)
if resp.status_code in (HTTP_OK, HTTP_ACCEPTED):
return
resp.raise_for_status()
def _power_levels(self) -> dict[str, Any]:
return {
"ban": 50,
"events": {
"m.room.avatar": 50,
"m.room.canonical_alias": 50,
"m.room.encryption": 100,
"m.room.history_visibility": 100,
"m.room.name": 50,
"m.room.power_levels": 100,
"m.room.server_acl": 100,
"m.room.tombstone": 100,
},
"events_default": 0,
"historical": 100,
"invite": 50,
"kick": 50,
"m.call.invite": 50,
"redact": 50,
"state_default": 50,
"users": { _canon_user(settings.comms_seeder_user, settings.comms_server_name): 100 },
"users_default": 0,
}
def _ensure_user(self, client: httpx.Client, token: str, localpart: str, password: str, admin: bool) -> None:
admin_token = self._admin_token(token)
user_id = _canon_user(localpart, settings.comms_server_name)
url = f"{settings.comms_synapse_base}/_synapse/admin/v2/users/{urllib.parse.quote(user_id)}"
resp = client.get(url, headers=_auth(admin_token))
if resp.status_code == HTTP_OK:
return
payload = {"password": password, "admin": admin, "deactivated": False}
create = client.put(url, headers=_auth(admin_token), json=payload)
if create.status_code not in (HTTP_OK, HTTP_CREATED):
raise RuntimeError(f"create user {user_id} failed: {create.status_code} {create.text}")
def _ensure_room(self, client: httpx.Client, token: str) -> str:
alias = settings.comms_room_alias
alias_enc = urllib.parse.quote(alias)
exists = client.get(
f"{settings.comms_synapse_base}/_matrix/client/v3/directory/room/{alias_enc}",
headers=_auth(token),
)
if exists.status_code == HTTP_OK:
room_id = exists.json()["room_id"]
else:
create = client.post(
f"{settings.comms_synapse_base}/_matrix/client/v3/createRoom",
headers=_auth(token),
json={
"preset": "public_chat",
"name": settings.comms_room_name,
"room_alias_name": alias.split(":", 1)[0].lstrip("#"),
"initial_state": [],
"power_level_content_override": {
"events_default": 0,
"users_default": 0,
"state_default": 50,
},
},
)
if create.status_code not in (HTTP_OK, HTTP_CONFLICT):
raise RuntimeError(f"create room failed: {create.status_code} {create.text}")
exists = client.get(
f"{settings.comms_synapse_base}/_matrix/client/v3/directory/room/{alias_enc}",
headers=_auth(token),
)
room_id = exists.json()["room_id"]
state_events = [
("m.room.join_rules", {"join_rule": "public"}),
("m.room.guest_access", {"guest_access": "can_join"}),
("m.room.history_visibility", {"history_visibility": "shared"}),
("m.room.canonical_alias", {"alias": alias}),
]
for ev_type, content in state_events:
client.put(
f"{settings.comms_synapse_base}/_matrix/client/v3/rooms/{room_id}/state/{ev_type}",
headers=_auth(token),
json=content,
)
client.put(
f"{settings.comms_synapse_base}/_matrix/client/v3/directory/list/room/{room_id}",
headers=_auth(token),
json={"visibility": "public"},
)
return room_id
def _join_user(self, client: httpx.Client, token: str, room_id: str, user_id: str) -> None:
admin_token = self._admin_token(token)
client.post(
f"{settings.comms_synapse_base}/_synapse/admin/v1/join/{urllib.parse.quote(room_id)}",
headers=_auth(admin_token),
json={"user_id": user_id},
)
def _join_all_locals(self, client: httpx.Client, token: str, room_id: str) -> None:
users: list[str] = []
from_token = None
admin_token = self._admin_token(token)
while True:
url = f"{settings.comms_synapse_base}/_synapse/admin/v2/users?local=true&deactivated=false&limit=100"
if from_token:
url += f"&from={from_token}"
resp = client.get(url, headers=_auth(admin_token))
payload = resp.json()
users.extend([u["name"] for u in payload.get("users", []) if isinstance(u, dict) and u.get("name")])
from_token = payload.get("next_token")
if not from_token:
break
for uid in users:
self._join_user(client, token, room_id, uid)
def _time(self) -> float:
return time.time()
comms = CommsService()
__all__ = ["CommsService", "_canon_user", "comms", "psycopg", "settings"]

View File

@ -0,0 +1,485 @@
from __future__ import annotations
import base64
from typing import Any
import urllib.parse
import httpx
from ..utils.logging import get_logger
from .comms_protocol import (
HTTP_ACCEPTED,
HTTP_CREATED,
HTTP_NO_CONTENT,
HTTP_NOT_FOUND,
HTTP_OK,
CommsSummary,
DisplayNameTarget,
MasGuestResult,
SynapseGuestResult,
SynapseUserRef,
_auth,
_needs_rename_display,
_needs_rename_username,
)
logger = get_logger(__name__)
class _CommsGuestNameMixin:
def _mas_admin_token(self, client: httpx.Client) -> str:
settings = self._settings
if not settings.comms_mas_admin_client_id or not settings.comms_mas_admin_client_secret:
raise RuntimeError("mas admin client credentials missing")
basic = base64.b64encode(
f"{settings.comms_mas_admin_client_id}:{settings.comms_mas_admin_client_secret}".encode()
).decode()
last_err: Exception | None = None
for attempt in range(5):
try:
resp = client.post(
settings.comms_mas_token_url,
headers={"Authorization": f"Basic {basic}"},
data={"grant_type": "client_credentials", "scope": "urn:mas:admin"},
)
resp.raise_for_status()
payload = resp.json()
token = payload.get("access_token")
if not isinstance(token, str) or not token:
raise RuntimeError("missing mas access token")
return token
except Exception as exc: # noqa: BLE001
last_err = exc
self._sleep(2**attempt)
raise RuntimeError(str(last_err) if last_err else "mas admin token failed")
def _mas_user_id(self, client: httpx.Client, token: str, username: str) -> str:
url = f"{self._settings.comms_mas_admin_api_base}/users/by-username/{urllib.parse.quote(username)}"
resp = client.get(url, headers=_auth(token))
resp.raise_for_status()
payload = resp.json()
return payload["data"]["id"]
def _mas_personal_session(self, client: httpx.Client, token: str, user_id: str) -> tuple[str, str]:
resp = client.post(
f"{self._settings.comms_mas_admin_api_base}/personal-sessions",
headers=_auth(token),
json={
"actor_user_id": user_id,
"human_name": "guest-name-randomizer",
"scope": "urn:matrix:client:api:*",
"expires_in": 300,
},
)
resp.raise_for_status()
payload = resp.json().get("data", {})
session_id = payload.get("id")
attrs = (payload.get("attributes") or {}) if isinstance(payload, dict) else {}
access_token = attrs.get("access_token")
if not isinstance(access_token, str) or not isinstance(session_id, str):
raise RuntimeError("invalid personal session response")
return access_token, session_id
def _mas_revoke_session(self, client: httpx.Client, token: str, session_id: str) -> None:
try:
client.post(
f"{self._settings.comms_mas_admin_api_base}/personal-sessions/{urllib.parse.quote(session_id)}/revoke",
headers=_auth(token),
json={},
)
except Exception:
return
def _room_members(self, client: httpx.Client, token: str, room_id: str) -> tuple[set[str], set[str]]:
resp = client.get(
f"{self._settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/members",
headers=_auth(token),
)
resp.raise_for_status()
payload = resp.json()
members: set[str] = set()
existing: set[str] = set()
for ev in payload.get("chunk", []) or []:
user_id = ev.get("state_key")
if isinstance(user_id, str) and user_id:
members.add(user_id)
display = (ev.get("content") or {}).get("displayname")
if isinstance(display, str) and display:
existing.add(display)
return members, existing
def _mas_list_users(self, client: httpx.Client, token: str) -> list[dict[str, Any]]:
users: list[dict[str, Any]] = []
cursor = None
while True:
url = f"{self._settings.comms_mas_admin_api_base}/users?page[size]=100"
if cursor:
url += f"&page[after]={urllib.parse.quote(cursor)}"
resp = client.get(url, headers=_auth(token))
resp.raise_for_status()
payload = resp.json()
data = payload.get("data") or []
if not isinstance(data, list) or not data:
break
users.extend([item for item in data if isinstance(item, dict)])
last = data[-1]
cursor = (
last.get("meta", {})
if isinstance(last, dict)
else {}
).get("page", {}).get("cursor")
if not cursor:
break
return users
def _synapse_list_users(self, client: httpx.Client, token: str) -> list[dict[str, Any]]:
users: list[dict[str, Any]] = []
from_token = None
admin_token = self._admin_token(token)
while True:
url = "{}/_synapse/admin/v2/users?local=true&deactivated=false&limit=100".format(
self._settings.comms_synapse_base
)
if from_token:
url += f"&from={urllib.parse.quote(from_token)}"
resp = client.get(url, headers=_auth(admin_token))
resp.raise_for_status()
payload = resp.json()
users.extend([item for item in payload.get("users", []) if isinstance(item, dict)])
from_token = payload.get("next_token")
if not from_token:
break
return users
def _should_prune_guest(self, entry: dict[str, Any], now_ms: int) -> bool:
if not entry.get("is_guest"):
return False
last_seen = entry.get("last_seen_ts")
if last_seen is None:
return False
try:
last_seen = int(last_seen)
except (TypeError, ValueError):
return False
stale_ms = int(self._settings.comms_guest_stale_days) * 24 * 60 * 60 * 1000
return now_ms - last_seen > stale_ms
def _prune_guest(self, client: httpx.Client, token: str, user_id: str) -> bool:
admin_token = self._admin_token(token)
try:
resp = client.delete(
f"{self._settings.comms_synapse_base}/_synapse/admin/v2/users/{urllib.parse.quote(user_id)}",
headers=_auth(admin_token),
params={"erase": "true"},
)
except Exception as exc: # noqa: BLE001
logger.info(
"guest prune failed",
extra={"event": "comms_guest_prune", "status": "error", "detail": str(exc)},
)
return False
if resp.status_code in (HTTP_OK, HTTP_ACCEPTED, HTTP_NO_CONTENT, HTTP_NOT_FOUND):
return True
logger.info(
"guest prune failed",
extra={
"event": "comms_guest_prune",
"status": "error",
"detail": f"{resp.status_code} {resp.text}",
},
)
return False
def _get_displayname(self, client: httpx.Client, token: str, user_id: str) -> str | None:
resp = client.get(
f"{self._settings.comms_synapse_base}/_matrix/client/v3/profile/{urllib.parse.quote(user_id)}",
headers=_auth(token),
)
resp.raise_for_status()
return resp.json().get("displayname")
def _get_displayname_admin(self, client: httpx.Client, token: str, user_id: str) -> str | None:
admin_token = self._admin_token(token)
resp = client.get(
f"{self._settings.comms_synapse_base}/_synapse/admin/v2/users/{urllib.parse.quote(user_id)}",
headers=_auth(admin_token),
)
if resp.status_code == HTTP_NOT_FOUND:
return None
resp.raise_for_status()
return resp.json().get("displayname")
def _set_displayname(
self,
client: httpx.Client,
token: str,
target: DisplayNameTarget,
) -> None:
resp = client.put(
f"{self._settings.comms_synapse_base}/_matrix/client/v3/profile/{urllib.parse.quote(target.user_id)}/displayname",
headers=_auth(token),
json={"displayname": target.name},
)
resp.raise_for_status()
if not target.in_room:
return
state_url = (
f"{self._settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(target.room_id)}"
f"/state/m.room.member/{urllib.parse.quote(target.user_id)}"
)
client.put(
state_url,
headers=_auth(token),
json={"membership": "join", "displayname": target.name},
)
def _set_displayname_admin(self, client: httpx.Client, token: str, user_id: str, name: str) -> bool:
admin_token = self._admin_token(token)
resp = client.put(
f"{self._settings.comms_synapse_base}/_synapse/admin/v2/users/{urllib.parse.quote(user_id)}",
headers=_auth(admin_token),
json={"displayname": name},
)
return resp.status_code in (HTTP_OK, HTTP_CREATED, HTTP_NO_CONTENT)
def _db_rename_numeric(self, existing: set[str]) -> int:
settings = self._settings
if not settings.comms_synapse_db_password:
return 0
renamed = 0
conn = self._connect_synapse_db()
try:
with conn:
with conn.cursor() as cur:
pattern = f"^@\\d+:{settings.comms_server_name}$"
cur.execute(
"SELECT user_id, full_user_id, displayname FROM profiles WHERE full_user_id ~ %s",
(pattern,),
)
profile_rows = cur.fetchall()
profile_index = {row[1]: row for row in profile_rows}
for _user_id, full_user_id, display in profile_rows:
if display and not _needs_rename_display(display):
continue
new_name = self._pick_guest_name(existing)
if not new_name:
continue
cur.execute(
"UPDATE profiles SET displayname = %s WHERE full_user_id = %s",
(new_name, full_user_id),
)
renamed += 1
cur.execute(
"SELECT name FROM users WHERE name ~ %s",
(pattern,),
)
users = [row[0] for row in cur.fetchall()]
if not users:
return renamed
cur.execute(
"SELECT user_id, full_user_id FROM profiles WHERE full_user_id = ANY(%s)",
(users,),
)
for existing_full in cur.fetchall():
profile_index.setdefault(existing_full[1], existing_full)
for full_user_id in users:
if full_user_id in profile_index:
continue
localpart = full_user_id.split(":", 1)[0].lstrip("@")
new_name = self._pick_guest_name(existing)
if not new_name:
continue
cur.execute(
"INSERT INTO profiles (user_id, displayname, full_user_id) VALUES (%s, %s, %s) "
"ON CONFLICT (full_user_id) DO UPDATE SET displayname = EXCLUDED.displayname",
(localpart, new_name, full_user_id),
)
renamed += 1
finally:
conn.close()
return renamed
def _validate_guest_name_settings(self) -> None:
if not self._settings.comms_mas_admin_client_id or not self._settings.comms_mas_admin_client_secret:
raise RuntimeError("comms mas admin secret missing")
if not self._settings.comms_synapse_base:
raise RuntimeError("comms synapse base missing")
def _room_context(self, client: httpx.Client, token: str) -> tuple[str, set[str], set[str]]:
room_id = self._resolve_alias(client, token, self._settings.comms_room_alias)
members, existing = self._room_members(client, token, room_id)
return room_id, members, existing
def _rename_mas_guests(
self,
client: httpx.Client,
admin_token: str,
room_id: str,
members: set[str],
existing: set[str],
) -> MasGuestResult:
renamed = 0
skipped = 0
mas_usernames: set[str] = set()
users = self._mas_list_users(client, admin_token)
for user in users:
attrs = user.get("attributes") or {}
username = attrs.get("username") or ""
if isinstance(username, str) and username:
mas_usernames.add(username)
legacy_guest = attrs.get("legacy_guest")
if not isinstance(username, str) or not username:
skipped += 1
continue
if not (legacy_guest or _needs_rename_username(username)):
skipped += 1
continue
user_id = user.get("id")
if not isinstance(user_id, str) or not user_id:
skipped += 1
continue
full_user = f"@{username}:{self._settings.comms_server_name}"
access_token, session_id = self._mas_personal_session(client, admin_token, user_id)
try:
display = self._get_displayname(client, access_token, full_user)
if display and not _needs_rename_display(display):
skipped += 1
continue
new_name = self._pick_guest_name(existing)
if not new_name:
skipped += 1
continue
self._set_displayname(
client,
access_token,
DisplayNameTarget(
room_id=room_id,
user_id=full_user,
name=new_name,
in_room=full_user in members,
),
)
renamed += 1
finally:
self._mas_revoke_session(client, admin_token, session_id)
return MasGuestResult(renamed=renamed, skipped=skipped, usernames=mas_usernames)
def _synapse_entries(self, client: httpx.Client, token: str) -> list[dict[str, Any]]:
try:
return self._synapse_list_users(client, token)
except Exception as exc: # noqa: BLE001
logger.info(
"synapse admin list skipped",
extra={"event": "comms_guest_list", "status": "error", "detail": str(exc)},
)
return []
def _synapse_user_id(self, entry: dict[str, Any]) -> SynapseUserRef | None:
user_id = entry.get("name") or ""
if not isinstance(user_id, str) or not user_id.startswith("@"):
return None
localpart = user_id.split(":", 1)[0].lstrip("@")
return SynapseUserRef(entry=entry, user_id=user_id, localpart=localpart)
def _maybe_prune_synapse_guest(
self,
client: httpx.Client,
token: str,
entry: dict[str, Any],
user_id: str,
now_ms: int,
) -> bool:
if not entry.get("is_guest"):
return False
if not self._should_prune_guest(entry, now_ms):
return False
return self._prune_guest(client, token, user_id)
def _needs_synapse_rename(
self,
client: httpx.Client,
token: str,
user: SynapseUserRef,
mas_usernames: set[str],
) -> bool:
if user.localpart in mas_usernames:
return False
is_guest = user.entry.get("is_guest")
if not (is_guest or _needs_rename_username(user.localpart)):
return False
display = self._get_displayname_admin(client, token, user.user_id)
if display and not _needs_rename_display(display):
return False
return True
def _rename_synapse_user(self, client: httpx.Client, token: str, existing: set[str], user_id: str) -> bool:
new_name = self._pick_guest_name(existing)
if not new_name:
return False
return self._set_displayname_admin(client, token, user_id, new_name)
def _rename_synapse_guests(
self,
client: httpx.Client,
token: str,
existing: set[str],
mas_usernames: set[str],
) -> SynapseGuestResult:
renamed = 0
pruned = 0
entries = self._synapse_entries(client, token)
now_ms = int(self._time() * 1000)
for entry in entries:
user_ref = self._synapse_user_id(entry)
if not user_ref:
continue
if self._maybe_prune_synapse_guest(client, token, user_ref.entry, user_ref.user_id, now_ms):
pruned += 1
continue
if not self._needs_synapse_rename(client, token, user_ref, mas_usernames):
continue
if self._rename_synapse_user(client, token, existing, user_ref.user_id):
renamed += 1
return SynapseGuestResult(renamed=renamed, pruned=pruned)
def run_guest_name_randomizer(self, wait: bool = True) -> dict[str, Any]:
self._validate_guest_name_settings()
with self._client() as client:
admin_token = self._mas_admin_token(client)
seeder_id = self._mas_user_id(client, admin_token, self._settings.comms_seeder_user)
seeder_token, seeder_session = self._mas_personal_session(client, admin_token, seeder_id)
try:
room_id, members, existing = self._room_context(client, seeder_token)
mas_result = self._rename_mas_guests(client, admin_token, room_id, members, existing)
synapse_result = self._rename_synapse_guests(
client,
seeder_token,
existing,
mas_result.usernames,
)
db_renamed = self._db_rename_numeric(existing)
finally:
self._mas_revoke_session(client, admin_token, seeder_session)
renamed = mas_result.renamed + synapse_result.renamed + db_renamed
pruned = synapse_result.pruned
skipped = mas_result.skipped
processed = renamed + pruned + skipped
summary = CommsSummary(processed, renamed, pruned, skipped)
logger.info(
"comms guest name sync finished",
extra={
"event": "comms_guest_name",
"status": "ok",
"processed": summary.processed,
"renamed": summary.renamed,
"pruned": summary.pruned,
"skipped": summary.skipped,
},
)
return {"status": "ok", **summary.__dict__}

View File

@ -0,0 +1,72 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
HTTP_OK = 200
HTTP_CREATED = 201
HTTP_ACCEPTED = 202
HTTP_NO_CONTENT = 204
HTTP_NOT_FOUND = 404
HTTP_CONFLICT = 409
@dataclass(frozen=True)
class CommsSummary:
processed: int
renamed: int
pruned: int
skipped: int
detail: str = ""
@dataclass(frozen=True)
class MasGuestResult:
renamed: int
skipped: int
usernames: set[str]
@dataclass(frozen=True)
class SynapseGuestResult:
renamed: int
pruned: int
@dataclass(frozen=True)
class DisplayNameTarget:
room_id: str
user_id: str
name: str
in_room: bool
@dataclass(frozen=True)
class SynapseUserRef:
entry: dict[str, Any]
user_id: str
localpart: str
def _auth(token: str) -> dict[str, str]:
return {"Authorization": f"Bearer {token}"}
def _canon_user(user: str, server_name: str) -> str:
user = (user or "").strip()
if user.startswith("@") and ":" in user:
return user
user = user.lstrip("@")
if ":" in user:
return f"@{user}"
return f"@{user}:{server_name}"
def _needs_rename_username(username: str) -> bool:
return username.isdigit() or username.startswith("guest-")
def _needs_rename_display(display: str | None) -> bool:
if not display:
return True
return display.isdigit() or display.startswith("guest-")

View File

@ -0,0 +1,389 @@
from __future__ import annotations
from typing import Any
import urllib.parse
import httpx
from ..utils.logging import get_logger
from .comms_protocol import (
HTTP_ACCEPTED,
HTTP_CONFLICT,
HTTP_CREATED,
HTTP_NOT_FOUND,
HTTP_OK,
_auth,
_canon_user,
)
logger = get_logger(__name__)
class _CommsRoomOpsMixin:
def run_pin_invite(self, wait: bool = True) -> dict[str, Any]:
if not self._settings.comms_seeder_password:
raise RuntimeError("comms seeder password missing")
with self._client() as client:
token = self._login(client, self._settings.comms_seeder_user, self._settings.comms_seeder_password)
room_id = self._resolve_alias(client, token, self._settings.comms_room_alias)
pinned = self._get_pinned(client, token, room_id)
for event_id in pinned:
event = self._get_event(client, token, room_id, event_id)
if event and (event.get("content") or {}).get("body") == self._settings.comms_pin_message:
return {"status": "ok", "detail": "already pinned"}
event_id = self._send_message(client, token, room_id, self._settings.comms_pin_message)
if not event_id:
return {"status": "error", "detail": "pin event_id missing"}
self._pin_message(client, token, room_id, event_id)
return {"status": "ok", "detail": "pinned"}
def run_reset_room(self, wait: bool = True) -> dict[str, Any]:
if not self._settings.comms_seeder_password:
raise RuntimeError("comms seeder password missing")
with self._client() as client:
token = self._login_with_retry(client, self._settings.comms_seeder_user, self._settings.comms_seeder_password)
old_room_id = self._resolve_alias(client, token, self._settings.comms_room_alias)
new_room_id = self._create_room(client, token, self._settings.comms_room_name)
self._set_room_state(client, token, new_room_id, "m.room.join_rules", {"join_rule": "public"})
self._set_room_state(client, token, new_room_id, "m.room.guest_access", {"guest_access": "can_join"})
self._set_room_state(
client,
token,
new_room_id,
"m.room.history_visibility",
{"history_visibility": "shared"},
)
self._set_room_state(client, token, new_room_id, "m.room.power_levels", self._power_levels())
self._delete_alias(client, token, self._settings.comms_room_alias)
self._put_alias(client, token, self._settings.comms_room_alias, new_room_id)
self._set_room_state(
client,
token,
new_room_id,
"m.room.canonical_alias",
{"alias": self._settings.comms_room_alias},
)
self._set_directory_visibility(client, token, new_room_id, "public")
bot_user_id = _canon_user(self._settings.comms_bot_user, self._settings.comms_server_name)
self._invite_user(client, token, new_room_id, bot_user_id)
for uid in self._list_joined_members(client, token, old_room_id):
if uid == _canon_user(self._settings.comms_seeder_user, self._settings.comms_server_name):
continue
localpart = uid.split(":", 1)[0].lstrip("@")
if localpart.isdigit():
continue
self._invite_user(client, token, new_room_id, uid)
event_id = self._send_message(client, token, new_room_id, self._settings.comms_pin_message)
if not event_id:
raise RuntimeError("pin message event_id missing")
self._set_room_state(client, token, new_room_id, "m.room.pinned_events", {"pinned": [event_id]})
self._set_directory_visibility(client, token, old_room_id, "private")
self._set_room_state(client, token, old_room_id, "m.room.join_rules", {"join_rule": "invite"})
self._set_room_state(client, token, old_room_id, "m.room.guest_access", {"guest_access": "forbidden"})
self._set_room_state(
client,
token,
old_room_id,
"m.room.tombstone",
{
"body": "Othrys has been reset. Please join the new room.",
"replacement_room": new_room_id,
},
)
self._send_message(
client,
token,
old_room_id,
"Othrys was reset. Join the new room at https://live.bstein.dev/#/room/#othrys:live.bstein.dev?action=join",
)
return {"status": "ok", "detail": f"old_room_id={old_room_id} new_room_id={new_room_id}"}
def run_seed_room(self, wait: bool = True) -> dict[str, Any]:
if not self._settings.comms_seeder_password or not self._settings.comms_bot_password:
raise RuntimeError("comms seeder/bot password missing")
with self._client() as client:
token = self._login(client, self._settings.comms_seeder_user, self._settings.comms_seeder_password)
for user, password, admin in (
(self._settings.comms_seeder_user, self._settings.comms_seeder_password, True),
(self._settings.comms_bot_user, self._settings.comms_bot_password, False),
):
try:
self._ensure_user(client, token, user, password, admin)
except RuntimeError as exc:
message = str(exc)
if "You are not a server admin" in message:
logger.warning(
"comms seed room ensure skipped",
extra={"event": "comms_seed_room", "user": user, "detail": message},
)
continue
raise
room_id = self._ensure_room(client, token)
self._join_user(client, token, room_id, _canon_user(self._settings.comms_bot_user, self._settings.comms_server_name))
self._join_all_locals(client, token, room_id)
return {"status": "ok", "detail": "room seeded"}
def _login(self, client: httpx.Client, user: str, password: str) -> str:
resp = client.post(
f"{self._settings.comms_auth_base}/_matrix/client/v3/login",
json={
"type": "m.login.password",
"identifier": {"type": "m.id.user", "user": _canon_user(user, self._settings.comms_server_name)},
"password": password,
},
)
if resp.status_code != HTTP_OK:
raise RuntimeError(f"login failed: {resp.status_code} {resp.text}")
payload = resp.json()
token = payload.get("access_token")
if not isinstance(token, str) or not token:
raise RuntimeError("login missing token")
return token
def _login_with_retry(self, client: httpx.Client, user: str, password: str) -> str:
last: Exception | None = None
for attempt in range(1, 6):
try:
return self._login(client, user, password)
except Exception as exc: # noqa: BLE001
last = exc
self._sleep(attempt * 2)
raise RuntimeError(str(last) if last else "login failed")
def _resolve_alias(self, client: httpx.Client, token: str, alias: str) -> str:
resp = client.get(
f"{self._settings.comms_synapse_base}/_matrix/client/v3/directory/room/{urllib.parse.quote(alias)}",
headers=_auth(token),
)
resp.raise_for_status()
payload = resp.json()
return payload["room_id"]
def _get_pinned(self, client: httpx.Client, token: str, room_id: str) -> list[str]:
resp = client.get(
f"{self._settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/state/m.room.pinned_events",
headers=_auth(token),
)
if resp.status_code == HTTP_NOT_FOUND:
return []
resp.raise_for_status()
pinned = resp.json().get("pinned", [])
return [item for item in pinned if isinstance(item, str)]
def _get_event(self, client: httpx.Client, token: str, room_id: str, event_id: str) -> dict[str, Any] | None:
resp = client.get(
f"{self._settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/event/{urllib.parse.quote(event_id)}",
headers=_auth(token),
)
if resp.status_code == HTTP_NOT_FOUND:
return None
resp.raise_for_status()
return resp.json()
def _send_message(self, client: httpx.Client, token: str, room_id: str, body: str) -> str:
resp = client.post(
f"{self._settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/send/m.room.message",
headers=_auth(token),
json={"msgtype": "m.text", "body": body},
)
resp.raise_for_status()
payload = resp.json()
event_id = payload.get("event_id")
return event_id if isinstance(event_id, str) else ""
def _pin_message(self, client: httpx.Client, token: str, room_id: str, event_id: str) -> None:
resp = client.put(
f"{self._settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/state/m.room.pinned_events",
headers=_auth(token),
json={"pinned": [event_id]},
)
resp.raise_for_status()
def _create_room(self, client: httpx.Client, token: str, name: str) -> str:
resp = client.post(
f"{self._settings.comms_synapse_base}/_matrix/client/v3/createRoom",
headers=_auth(token),
json={"preset": "public_chat", "name": name, "room_version": "11"},
)
resp.raise_for_status()
return resp.json()["room_id"]
def _set_room_state(self, client: httpx.Client, token: str, room_id: str, ev_type: str, content: dict[str, Any]) -> None:
resp = client.put(
f"{self._settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/state/{ev_type}",
headers=_auth(token),
json=content,
)
resp.raise_for_status()
def _set_directory_visibility(self, client: httpx.Client, token: str, room_id: str, visibility: str) -> None:
resp = client.put(
f"{self._settings.comms_synapse_base}/_matrix/client/v3/directory/list/room/{urllib.parse.quote(room_id)}",
headers=_auth(token),
json={"visibility": visibility},
)
resp.raise_for_status()
def _delete_alias(self, client: httpx.Client, token: str, alias: str) -> None:
resp = client.delete(
f"{self._settings.comms_synapse_base}/_matrix/client/v3/directory/room/{urllib.parse.quote(alias)}",
headers=_auth(token),
)
if resp.status_code in (HTTP_OK, HTTP_ACCEPTED, HTTP_NOT_FOUND):
return
resp.raise_for_status()
def _put_alias(self, client: httpx.Client, token: str, alias: str, room_id: str) -> None:
resp = client.put(
f"{self._settings.comms_synapse_base}/_matrix/client/v3/directory/room/{urllib.parse.quote(alias)}",
headers=_auth(token),
json={"room_id": room_id},
)
resp.raise_for_status()
def _list_joined_members(self, client: httpx.Client, token: str, room_id: str) -> list[str]:
resp = client.get(
f"{self._settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/members?membership=join",
headers=_auth(token),
)
resp.raise_for_status()
members = []
for ev in resp.json().get("chunk", []) or []:
if ev.get("type") != "m.room.member":
continue
uid = ev.get("state_key")
if isinstance(uid, str) and uid.startswith("@"):
members.append(uid)
return members
def _invite_user(self, client: httpx.Client, token: str, room_id: str, user_id: str) -> None:
resp = client.post(
f"{self._settings.comms_synapse_base}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/invite",
headers=_auth(token),
json={"user_id": user_id},
)
if resp.status_code in (HTTP_OK, HTTP_ACCEPTED):
return
resp.raise_for_status()
def _power_levels(self) -> dict[str, Any]:
return {
"ban": 50,
"events": {
"m.room.avatar": 50,
"m.room.canonical_alias": 50,
"m.room.encryption": 100,
"m.room.history_visibility": 100,
"m.room.name": 50,
"m.room.power_levels": 100,
"m.room.server_acl": 100,
"m.room.tombstone": 100,
},
"events_default": 0,
"historical": 100,
"invite": 50,
"kick": 50,
"m.call.invite": 50,
"redact": 50,
"state_default": 50,
"users": {_canon_user(self._settings.comms_seeder_user, self._settings.comms_server_name): 100},
"users_default": 0,
}
def _ensure_user(self, client: httpx.Client, token: str, localpart: str, password: str, admin: bool) -> None:
admin_token = self._admin_token(token)
user_id = _canon_user(localpart, self._settings.comms_server_name)
url = f"{self._settings.comms_synapse_base}/_synapse/admin/v2/users/{urllib.parse.quote(user_id)}"
resp = client.get(url, headers=_auth(admin_token))
if resp.status_code == HTTP_OK:
return
payload = {"password": password, "admin": admin, "deactivated": False}
create = client.put(url, headers=_auth(admin_token), json=payload)
if create.status_code not in (HTTP_OK, HTTP_CREATED):
raise RuntimeError(f"create user {user_id} failed: {create.status_code} {create.text}")
def _ensure_room(self, client: httpx.Client, token: str) -> str:
alias = self._settings.comms_room_alias
alias_enc = urllib.parse.quote(alias)
exists = client.get(
f"{self._settings.comms_synapse_base}/_matrix/client/v3/directory/room/{alias_enc}",
headers=_auth(token),
)
if exists.status_code == HTTP_OK:
room_id = exists.json()["room_id"]
else:
create = client.post(
f"{self._settings.comms_synapse_base}/_matrix/client/v3/createRoom",
headers=_auth(token),
json={
"preset": "public_chat",
"name": self._settings.comms_room_name,
"room_alias_name": alias.split(":", 1)[0].lstrip("#"),
"initial_state": [],
"power_level_content_override": {
"events_default": 0,
"users_default": 0,
"state_default": 50,
},
},
)
if create.status_code not in (HTTP_OK, HTTP_CONFLICT):
raise RuntimeError(f"create room failed: {create.status_code} {create.text}")
exists = client.get(
f"{self._settings.comms_synapse_base}/_matrix/client/v3/directory/room/{alias_enc}",
headers=_auth(token),
)
room_id = exists.json()["room_id"]
state_events = [
("m.room.join_rules", {"join_rule": "public"}),
("m.room.guest_access", {"guest_access": "can_join"}),
("m.room.history_visibility", {"history_visibility": "shared"}),
("m.room.canonical_alias", {"alias": alias}),
]
for ev_type, content in state_events:
client.put(
f"{self._settings.comms_synapse_base}/_matrix/client/v3/rooms/{room_id}/state/{ev_type}",
headers=_auth(token),
json=content,
)
client.put(
f"{self._settings.comms_synapse_base}/_matrix/client/v3/directory/list/room/{room_id}",
headers=_auth(token),
json={"visibility": "public"},
)
return room_id
def _join_user(self, client: httpx.Client, token: str, room_id: str, user_id: str) -> None:
admin_token = self._admin_token(token)
client.post(
f"{self._settings.comms_synapse_base}/_synapse/admin/v1/join/{urllib.parse.quote(room_id)}",
headers=_auth(admin_token),
json={"user_id": user_id},
)
def _join_all_locals(self, client: httpx.Client, token: str, room_id: str) -> None:
users: list[str] = []
from_token = None
admin_token = self._admin_token(token)
while True:
url = f"{self._settings.comms_synapse_base}/_synapse/admin/v2/users?local=true&deactivated=false&limit=100"
if from_token:
url += f"&from={from_token}"
resp = client.get(url, headers=_auth(admin_token))
payload = resp.json()
users.extend([u["name"] for u in payload.get("users", []) if isinstance(u, dict) and u.get("name")])
from_token = payload.get("next_token")
if not from_token:
break
for uid in users:
self._join_user(client, token, room_id, uid)

View File

@ -1,7 +1,6 @@
# path reason
ariadne/services/cluster_state.py split planned; service orchestration decomposition tracked in hygiene backlog
ariadne/app.py split planned; Flask app bootstrap/routes currently co-located
ariadne/services/comms.py split planned; comms adapters still consolidated
ariadne/manager/provisioning.py split planned; provisioning flow modules pending extraction
tests/test_provisioning.py test module split planned; broad provisioning coverage retained meanwhile
tests/test_services.py test module split planned; broad service contract coverage retained meanwhile

1 # path reason
2 ariadne/services/cluster_state.py split planned; service orchestration decomposition tracked in hygiene backlog
3 ariadne/app.py split planned; Flask app bootstrap/routes currently co-located
ariadne/services/comms.py split planned; comms adapters still consolidated
4 ariadne/manager/provisioning.py split planned; provisioning flow modules pending extraction
5 tests/test_provisioning.py test module split planned; broad provisioning coverage retained meanwhile
6 tests/test_services.py test module split planned; broad service contract coverage retained meanwhile