titan-iac/services/comms/guest-name-job.yaml

402 lines
20 KiB
YAML

# services/comms/guest-name-job.yaml
# CronJob that periodically runs an inline Python script (see the container
# command) against Synapse, the Matrix Authentication Service (MAS) and Postgres.
apiVersion: batch/v1
kind: CronJob
metadata:
name: guest-name-randomizer
namespace: comms
spec:
# Scheduled every minute; Forbid means a tick is skipped while the previous
# Job is still running, so runs never overlap.
schedule: "*/1 * * * *"
suspend: false
concurrencyPolicy: Forbid
# Retain only the latest successful and the latest failed Job.
successfulJobsHistoryLimit: 1
failedJobsHistoryLimit: 1
jobTemplate:
spec:
# No Job-level retries and (restartPolicy below) no container restarts:
# a failed run simply ends, and the next scheduled tick starts fresh.
backoffLimit: 0
template:
spec:
restartPolicy: Never
volumes:
# Exposes only the client_secret key of the Secret as a file in the pod.
- name: mas-admin-client
secret:
secretName: mas-admin-client-runtime
items:
- key: client_secret
path: client_secret
containers:
- name: rename
# Stock interpreter image; the script's third-party packages are installed
# by the container command each time the Job starts.
image: python:3.11-slim
volumeMounts:
- name: mas-admin-client
mountPath: /etc/mas-admin-client
readOnly: true
env:
# Base URL the script uses for /_matrix and /_synapse requests.
- name: SYNAPSE_BASE
value: http://othrys-synapse-matrix-synapse:8008
# OAuth2 client used for the MAS client-credentials grant. The secret is
# read from the mounted file below rather than passed as an env value.
- name: MAS_ADMIN_CLIENT_ID
value: 01KDXMVQBQ5JNY6SEJPZW6Z8BM
- name: MAS_ADMIN_CLIENT_SECRET_FILE
value: /etc/mas-admin-client/client_secret
# NOTE(review): the admin API and the token endpoint use different ports
# (8081 vs 8080) — confirm this matches the MAS listener configuration.
- name: MAS_ADMIN_API_BASE
value: http://matrix-authentication-service:8081/api/admin/v1
- name: MAS_TOKEN_URL
value: http://matrix-authentication-service:8080/oauth2/token
# MAS username whose personal session the script uses for the room lookup
# and the Synapse admin calls.
- name: SEEDER_USER
value: othrys-seeder
# Postgres connection settings read by the script for its direct database
# access. PGPORT is quoted so the env value stays a string.
- name: PGHOST
value: postgres-service.postgres.svc.cluster.local
- name: PGPORT
value: "5432"
- name: PGDATABASE
value: synapse
- name: PGUSER
value: synapse
- name: PGPASSWORD
valueFrom:
secretKeyRef:
name: synapse-db
key: POSTGRES_PASSWORD
command:
- /bin/sh
- -c
- |
# Only -e and -u are enabled. `pipefail` is not supported by every /bin/sh
# (on Debian-based images sh is typically dash, which may reject the option
# and abort the script immediately). Nothing below uses a pipeline, so the
# option would have no effect here even where it is accepted.
set -eu
# Third-party packages are not part of the stock image; install them quietly.
pip install --no-cache-dir requests psycopg2-binary >/dev/null
# The Python program is supplied on stdin by the quoted heredoc that follows.
python - <<'PY'
import base64
import os
import random
import requests
import time
import urllib.parse
import psycopg2
# Word pools for generated display names. A name is built elsewhere in this
# script as "<adjective>-<noun>", so 30 x 30 = 900 combinations are possible.
ADJ = [
    "brisk", "calm", "eager", "gentle", "merry",
    "nifty", "rapid", "sunny", "witty", "zesty",
    "amber", "bold", "bright", "crisp", "daring",
    "frosty", "glad", "jolly", "lively", "mellow",
    "quiet", "ripe", "serene", "spry", "tidy",
    "vivid", "warm", "wild", "clever", "kind",
]
NOUN = [
    "otter", "falcon", "comet", "ember", "grove",
    "harbor", "meadow", "raven", "river", "summit",
    "breeze", "cedar", "cinder", "cove", "delta",
    "forest", "glade", "lark", "marsh", "peak",
    "pine", "quartz", "reef", "ridge", "sable",
    "sage", "shore", "thunder", "vale", "zephyr",
]
# Fixed identity of the homeserver and of the room whose members are inspected.
SERVER_NAME = "live.bstein.dev"
ROOM_ALIAS = "#othrys:live.bstein.dev"

# Required settings from the container environment. Plain indexing is used on
# purpose: a missing variable raises KeyError as soon as the script starts.
_env = os.environ
BASE = _env["SYNAPSE_BASE"]
MAS_ADMIN_CLIENT_ID = _env["MAS_ADMIN_CLIENT_ID"]
MAS_ADMIN_CLIENT_SECRET_FILE = _env["MAS_ADMIN_CLIENT_SECRET_FILE"]
# Trailing slashes are trimmed so paths can be appended with a single "/".
MAS_ADMIN_API_BASE = _env["MAS_ADMIN_API_BASE"].rstrip("/")
MAS_TOKEN_URL = _env["MAS_TOKEN_URL"]
SEEDER_USER = _env["SEEDER_USER"]
def mas_admin_token():
    """Obtain a MAS admin access token via the OAuth2 client-credentials grant.

    The client secret is read from MAS_ADMIN_CLIENT_SECRET_FILE and sent,
    together with MAS_ADMIN_CLIENT_ID, as HTTP Basic credentials to
    MAS_TOKEN_URL with scope ``urn:mas:admin``. Up to five attempts are made,
    pausing 1, 2, 4 and 8 seconds between consecutive attempts.

    Returns:
        The ``access_token`` value from the token response.

    Raises:
        Exception: whatever error the final attempt produced, once all five
            attempts have failed.
    """
    max_attempts = 5
    with open(MAS_ADMIN_CLIENT_SECRET_FILE, "r", encoding="utf-8") as f:
        secret = f.read().strip()
    basic = base64.b64encode(f"{MAS_ADMIN_CLIENT_ID}:{secret}".encode()).decode()
    last_err = None
    for attempt in range(max_attempts):
        try:
            r = requests.post(
                MAS_TOKEN_URL,
                headers={"Authorization": f"Basic {basic}"},
                data={"grant_type": "client_credentials", "scope": "urn:mas:admin"},
                timeout=30,
            )
            r.raise_for_status()
            return r.json()["access_token"]
        except Exception as exc:  # noqa: BLE001
            last_err = exc
            # Back off only when another attempt follows; sleeping after the
            # last failure would merely delay reporting the error.
            if attempt < max_attempts - 1:
                time.sleep(2 ** attempt)
    raise last_err
def mas_user_id(token, username):
    """Look up a MAS user by username and return its ``data.id``.

    Args:
        token: MAS admin bearer token.
        username: MAS username; percent-encoded before being put in the path.

    Raises:
        Whatever ``raise_for_status`` raises for a non-success response.
    """
    lookup_url = f"{MAS_ADMIN_API_BASE}/users/by-username/{urllib.parse.quote(username)}"
    auth = {"Authorization": f"Bearer {token}"}
    response = requests.get(lookup_url, headers=auth, timeout=30)
    response.raise_for_status()
    record = response.json()["data"]
    return record["id"]
def mas_personal_session(token, user_id):
    """Create a short-lived MAS personal session acting as ``user_id``.

    The session is requested with the Matrix client API scope and an
    ``expires_in`` of 300.

    Args:
        token: MAS admin bearer token.
        user_id: MAS id of the user the session acts as.

    Returns:
        ``(access_token, session_id)`` taken from the response's
        ``data.attributes.access_token`` and ``data.id``.

    Raises:
        KeyError: if the response carries no access token.
        Whatever ``raise_for_status`` raises for a non-success response.
    """
    session_request = {
        "actor_user_id": user_id,
        "human_name": "guest-name-randomizer",
        "scope": "urn:matrix:client:api:*",
        "expires_in": 300,
    }
    resp = requests.post(
        f"{MAS_ADMIN_API_BASE}/personal-sessions",
        headers={"Authorization": f"Bearer {token}"},
        json=session_request,
        timeout=30,
    )
    resp.raise_for_status()
    body = resp.json()
    # A null "attributes" value is treated like an empty mapping.
    attributes = body.get("data", {}).get("attributes", {}) or {}
    access_token = attributes["access_token"]
    return access_token, body["data"]["id"]
def mas_revoke_session(token, session_id):
    """Revoke a MAS personal session, on a best-effort basis.

    This function is called from ``finally`` blocks, so it never raises for
    transport or HTTP errors: an exception here would replace whatever error
    was already propagating. Failures are printed instead. Sessions created
    by this script are requested with a 300-second expiry, so a missed
    revocation is short-lived.

    Args:
        token: MAS admin bearer token.
        session_id: id of the personal session to revoke.

    Returns:
        None.
    """
    try:
        r = requests.post(
            f"{MAS_ADMIN_API_BASE}/personal-sessions/{urllib.parse.quote(session_id)}/revoke",
            headers={"Authorization": f"Bearer {token}"},
            json={},
            timeout=30,
        )
    except requests.RequestException as exc:
        print(f"revoke of session {session_id} failed: {exc}")
        return
    if r.status_code >= 400:
        print(f"revoke of session {session_id} returned HTTP {r.status_code}")
def resolve_alias(token, alias):
    """Resolve a room alias to its room id via the client directory API.

    Args:
        token: Matrix access token sent as a bearer token.
        alias: room alias such as ``#name:server``; percent-encoded for the path.

    Returns:
        The ``room_id`` field of the response.

    Raises:
        Whatever ``raise_for_status`` raises for a non-success response.
    """
    headers = {"Authorization": f"Bearer {token}"}
    enc = urllib.parse.quote(alias)
    # Bounded like the other calls in this script; without a timeout a stalled
    # server could hang the run indefinitely.
    r = requests.get(
        f"{BASE}/_matrix/client/v3/directory/room/{enc}",
        headers=headers,
        timeout=30,
    )
    r.raise_for_status()
    return r.json()["room_id"]
def room_members(token, room_id):
    """Collect the joined members and the known display names of a room.

    Args:
        token: Matrix access token sent as a bearer token.
        room_id: room id; percent-encoded for the path.

    Returns:
        ``(members, existing_names)`` where ``members`` is the set of user ids
        whose membership event says ``join`` and ``existing_names`` is the set
        of non-empty display names found on any membership event.

    Raises:
        Whatever ``raise_for_status`` raises for a non-success response.
    """
    headers = {"Authorization": f"Bearer {token}"}
    r = requests.get(
        f"{BASE}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/members",
        headers=headers,
        timeout=30,
    )
    r.raise_for_status()
    members = set()
    existing_names = set()
    for ev in r.json().get("chunk", []):
        content = ev.get("content") or {}
        user_id = ev.get("state_key")
        # The response also contains leave/ban/invite events. Only joined
        # users count as members, because the caller writes a
        # `membership: join` state event for every user in this set.
        if user_id and content.get("membership") == "join":
            members.add(user_id)
        # Names from every event are kept so that generated names do not
        # collide with one that has already been seen in the room.
        disp = content.get("displayname")
        if disp:
            existing_names.add(disp)
    return members, existing_names
def mas_list_users(token):
    """Fetch every user record from the MAS admin API, 100 per page.

    Paging continues from the ``meta.page.cursor`` of the last record on each
    page and stops at the first empty page or when that cursor is missing.

    Args:
        token: MAS admin bearer token.

    Returns:
        List of the raw ``data`` entries from all pages, in response order.

    Raises:
        Whatever ``raise_for_status`` raises for a non-success response.
    """
    auth = {"Authorization": f"Bearer {token}"}
    collected = []
    after = None
    while True:
        page_url = f"{MAS_ADMIN_API_BASE}/users?page[size]=100"
        if after:
            page_url += f"&page[after]={urllib.parse.quote(after)}"
        resp = requests.get(page_url, headers=auth, timeout=30)
        resp.raise_for_status()
        page = resp.json().get("data", [])
        if not page:
            return collected
        collected.extend(page)
        after = page[-1].get("meta", {}).get("page", {}).get("cursor")
        if not after:
            return collected
def synapse_list_users(token):
    """List local, non-deactivated users from the Synapse admin API.

    Pages of up to 100 users are requested until a response carries no
    ``next_token``.

    Args:
        token: access token sent as a bearer token.

    Returns:
        List of the raw ``users`` entries from all pages, in response order.

    Raises:
        Whatever ``raise_for_status`` raises for a non-success response.
    """
    auth = {"Authorization": f"Bearer {token}"}
    collected = []
    resume_from = None
    more = True
    while more:
        page_url = f"{BASE}/_synapse/admin/v2/users?local=true&deactivated=false&limit=100"
        if resume_from:
            page_url += f"&from={urllib.parse.quote(resume_from)}"
        resp = requests.get(page_url, headers=auth, timeout=30)
        resp.raise_for_status()
        body = resp.json()
        collected.extend(body.get("users", []))
        resume_from = body.get("next_token")
        more = bool(resume_from)
    return collected
def user_id_for_username(username):
    """Build the full Matrix user id ``@<username>:<SERVER_NAME>``.

    The server part comes from the module-level SERVER_NAME constant instead
    of a second hard-coded copy, so the two cannot drift apart.
    """
    return f"@{username}:{SERVER_NAME}"
def get_displayname(token, user_id):
    """Read a user's display name through the client profile API.

    Args:
        token: Matrix access token sent as a bearer token.
        user_id: full Matrix user id; percent-encoded for the path.

    Returns:
        The ``displayname`` field, or None when it is absent or the server
        answers 404 (no profile information for the user).

    Raises:
        Whatever ``raise_for_status`` raises for other non-success responses.
    """
    headers = {"Authorization": f"Bearer {token}"}
    r = requests.get(
        f"{BASE}/_matrix/client/v3/profile/{urllib.parse.quote(user_id)}",
        headers=headers,
        timeout=30,
    )
    # A 404 means there is nothing to read, not a failure. Returning None
    # matches get_displayname_admin and lets the caller treat the account as
    # one that still needs a name.
    if r.status_code == 404:
        return None
    r.raise_for_status()
    return r.json().get("displayname")
def get_displayname_admin(token, user_id):
    """Read a user's display name through the Synapse admin user API.

    Args:
        token: access token sent as a bearer token.
        user_id: full Matrix user id; percent-encoded for the path.

    Returns:
        The ``displayname`` field of the user record, or None when the field
        is absent or the server answers 404.

    Raises:
        Whatever ``raise_for_status`` raises for other non-success responses.
    """
    resp = requests.get(
        f"{BASE}/_synapse/admin/v2/users/{urllib.parse.quote(user_id)}",
        headers={"Authorization": f"Bearer {token}"},
        timeout=30,
    )
    if resp.status_code != 404:
        resp.raise_for_status()
        return resp.json().get("displayname")
    return None
def set_displayname(token, room_id, user_id, name, in_room):
    """Set a user's profile display name and, optionally, their room-level name.

    Args:
        token: access token of the user being renamed, sent as a bearer token.
        room_id: room whose membership event is refreshed when ``in_room``.
        user_id: full Matrix user id; percent-encoded for the path.
        name: new display name.
        in_room: when true, also write the user's ``m.room.member`` state in
            ``room_id`` with ``membership: join`` and the new name.

    Returns:
        None.

    Raises:
        Whatever ``raise_for_status`` raises when the profile update fails.
        The room-state update is best effort: its HTTP status is not checked.
    """
    headers = {"Authorization": f"Bearer {token}"}
    payload = {"displayname": name}
    # Bounded like the other calls in this script; without a timeout a stalled
    # server could hang the run indefinitely.
    r = requests.put(
        f"{BASE}/_matrix/client/v3/profile/{urllib.parse.quote(user_id)}/displayname",
        headers=headers,
        json=payload,
        timeout=30,
    )
    r.raise_for_status()
    if not in_room:
        return
    state_url = f"{BASE}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/state/m.room.member/{urllib.parse.quote(user_id)}"
    content = {"membership": "join", "displayname": name}
    requests.put(state_url, headers=headers, json=content, timeout=30)
def set_displayname_admin(token, user_id, name):
    """Set a user's display name through the Synapse admin user API.

    Args:
        token: access token sent as a bearer token.
        user_id: full Matrix user id; percent-encoded for the path.
        name: new display name.

    Returns:
        True when the server answers 200, 201 or 204; False for any other
        status. HTTP error statuses are reported via the return value, not
        raised.
    """
    accepted = (200, 201, 204)
    resp = requests.put(
        f"{BASE}/_synapse/admin/v2/users/{urllib.parse.quote(user_id)}",
        headers={"Authorization": f"Bearer {token}"},
        json={"displayname": name},
        timeout=30,
    )
    return resp.status_code in accepted
def needs_rename_username(username):
    """Return True for usernames that are all digits or start with ``guest-``."""
    if username.isdigit():
        return True
    return username.startswith("guest-")
def needs_rename_display(display):
    """Return True when a display name is missing or looks auto-assigned.

    A name needs replacing when it is empty or None, consists only of digits,
    or starts with ``guest-``.
    """
    if not display:
        return True
    if display.isdigit():
        return True
    return display.startswith("guest-")
def db_rename_numeric(existing_names):
    """Name numeric-localpart accounts by writing to the ``profiles`` table.

    Connects to Postgres with the PG* environment variables and, inside one
    transaction:

    1. Updates every ``profiles`` row whose ``full_user_id`` is
       ``@<digits>:<SERVER_NAME>`` and whose display name still needs
       replacing.
    2. Inserts a profile row for every ``users`` entry of the same shape that
       has no profile row yet.

    Args:
        existing_names: set of display names already taken. Each generated
            name is added to it, so the caller's set is mutated.

    Returns:
        None. Accounts for which no free name is found within 30 random draws
        are left unchanged.
    """

    def _fresh_name():
        # Draw up to 30 random "<adjective>-<noun>" names; reserve and return
        # the first unused one, or None when every draw collided.
        for _ in range(30):
            candidate = f"{random.choice(ADJ)}-{random.choice(NOUN)}"
            if candidate not in existing_names:
                existing_names.add(candidate)
                return candidate
        return None

    conn = psycopg2.connect(
        host=os.environ["PGHOST"],
        port=int(os.environ["PGPORT"]),
        dbname=os.environ["PGDATABASE"],
        user=os.environ["PGUSER"],
        password=os.environ["PGPASSWORD"],
    )
    try:
        # `with conn` scopes the transaction; leaving the block normally
        # (including the early return below) commits the work done so far.
        with conn, conn.cursor() as cur:
            # Pass 1: profile rows that already exist for numeric user ids.
            cur.execute(
                "SELECT user_id, full_user_id, displayname FROM profiles WHERE full_user_id ~ %s",
                (f"^@\\d+:{SERVER_NAME}$",),
            )
            profile_rows = cur.fetchall()
            profile_index = {row[1]: row for row in profile_rows}
            for _localpart, full_user_id, display in profile_rows:
                if display and not needs_rename_display(display):
                    continue
                new = _fresh_name()
                if not new:
                    continue
                cur.execute(
                    "UPDATE profiles SET displayname = %s WHERE full_user_id = %s",
                    (new, full_user_id),
                )

            # Pass 2: numeric user ids that have no profile row at all.
            cur.execute(
                "SELECT name FROM users WHERE name ~ %s",
                (f"^@\\d+:{SERVER_NAME}$",),
            )
            users = [row[0] for row in cur.fetchall()]
            if not users:
                return
            cur.execute(
                "SELECT user_id, full_user_id FROM profiles WHERE full_user_id = ANY(%s)",
                (users,),
            )
            for found in cur.fetchall():
                profile_index.setdefault(found[1], found)
            for full_user_id in users:
                if full_user_id in profile_index:
                    continue
                # "@123:server" -> "123"
                localpart = full_user_id.split(":", 1)[0].lstrip("@")
                new = _fresh_name()
                if not new:
                    continue
                cur.execute(
                    "INSERT INTO profiles (user_id, displayname, full_user_id) VALUES (%s, %s, %s) "
                    "ON CONFLICT (full_user_id) DO UPDATE SET displayname = EXCLUDED.displayname",
                    (localpart, new, full_user_id),
                )
    finally:
        conn.close()
def _pick_unused_name(taken):
    """Return a random "<adjective>-<noun>" name not in ``taken``, reserving it.

    Up to 30 random draws are made. Returns None when every draw collided
    with a name that is already taken.
    """
    for _ in range(30):
        candidate = f"{random.choice(ADJ)}-{random.choice(NOUN)}"
        if candidate not in taken:
            taken.add(candidate)
            return candidate
    return None


# Driver: rename guest-like accounts in three passes (MAS users, Synapse-only
# users, then numeric accounts directly in the database).
#
# A failure on one account is logged and counted rather than aborting the run.
# Otherwise a single broken account would stop every run at the same point and
# the accounts after it would never be processed. The job still exits non-zero
# when anything failed, so the failure stays visible.
admin_token = mas_admin_token()
seeder_id = mas_user_id(admin_token, SEEDER_USER)
seeder_token, seeder_session = mas_personal_session(admin_token, seeder_id)
failures = 0
try:
    room_id = resolve_alias(seeder_token, ROOM_ALIAS)
    members, existing = room_members(seeder_token, room_id)

    # Pass 1: accounts known to MAS, renamed with a personal session of the
    # account itself.
    mas_usernames = set()
    for user in mas_list_users(admin_token):
        attrs = user.get("attributes") or {}
        username = attrs.get("username") or ""
        if not username:
            continue
        mas_usernames.add(username)
        if not (attrs.get("legacy_guest") or needs_rename_username(username)):
            continue
        user_id = user_id_for_username(username)
        try:
            access_token, session_id = mas_personal_session(admin_token, user["id"])
            try:
                display = get_displayname(access_token, user_id)
                if needs_rename_display(display):
                    new = _pick_unused_name(existing)
                    if new:
                        set_displayname(access_token, room_id, user_id, new, user_id in members)
            finally:
                mas_revoke_session(admin_token, session_id)
        except Exception as exc:  # noqa: BLE001
            failures += 1
            print(f"rename of {user_id} failed: {exc}")

    # Pass 2: accounts Synapse knows about that MAS did not list. Listing is
    # best effort; when it fails this pass is skipped.
    try:
        entries = synapse_list_users(seeder_token)
    except Exception as exc:  # noqa: BLE001
        print(f"synapse admin list skipped: {exc}")
        entries = []
    for entry in entries:
        user_id = entry.get("name") or ""
        if not user_id.startswith("@"):
            continue
        localpart = user_id.split(":", 1)[0].lstrip("@")
        if localpart in mas_usernames:
            continue
        if not (entry.get("is_guest") or needs_rename_username(localpart)):
            continue
        try:
            display = get_displayname_admin(seeder_token, user_id)
            if not needs_rename_display(display):
                continue
            new = _pick_unused_name(existing)
            if not new:
                continue
            if not set_displayname_admin(seeder_token, user_id, new):
                print(f"synapse admin rename of {user_id} was rejected")
        except Exception as exc:  # noqa: BLE001
            failures += 1
            print(f"rename of {user_id} failed: {exc}")

    # Pass 3: numeric accounts handled directly in the database.
    db_rename_numeric(existing)
finally:
    mas_revoke_session(admin_token, seeder_session)

if failures:
    raise SystemExit(f"{failures} account(s) could not be renamed")
PY