# services/communication/bstein-force-leave-job.yaml apiVersion: batch/v1 kind: Job metadata: name: bstein-force-leave-6 namespace: comms spec: backoffLimit: 0 template: spec: restartPolicy: Never containers: - name: force-leave image: python:3.11-slim env: - name: POSTGRES_HOST value: postgres-service.postgres.svc.cluster.local - name: POSTGRES_PORT value: "5432" - name: POSTGRES_DB value: synapse - name: POSTGRES_USER valueFrom: secretKeyRef: name: synapse-db key: POSTGRES_USER - name: POSTGRES_PASSWORD valueFrom: secretKeyRef: name: synapse-db key: POSTGRES_PASSWORD - name: SYNAPSE_BASE value: http://othrys-synapse-matrix-synapse:8008 - name: AUTH_BASE value: http://matrix-authentication-service:8080 - name: SERVER_NAME value: live.bstein.dev - name: SEEDER_USER value: othrys-seeder - name: SEEDER_PASS valueFrom: secretKeyRef: name: atlasbot-credentials-runtime key: seeder-password - name: TARGET_USER_ID value: "@bstein:live.bstein.dev" - name: TARGET_ROOMS value: "!OkltaJguODUnZrbcUp:live.bstein.dev,!pMKAVvSRheIOCPIjDM:live.bstein.dev" command: - /bin/sh - -c - | set -euo pipefail pip install --no-cache-dir requests psycopg2-binary >/dev/null python - <<'PY' import json, os, sys, urllib.parse import requests import psycopg2 DB = dict( host=os.environ["POSTGRES_HOST"], port=int(os.environ["POSTGRES_PORT"]), dbname=os.environ["POSTGRES_DB"], user=os.environ["POSTGRES_USER"], password=os.environ["POSTGRES_PASSWORD"], ) SYNAPSE_BASE = os.environ["SYNAPSE_BASE"] AUTH_BASE = os.environ["AUTH_BASE"] SEEDER_USER = os.environ["SEEDER_USER"] SEEDER_PASS = os.environ["SEEDER_PASS"] TARGET_USER_ID = os.environ["TARGET_USER_ID"] TARGET_ROOMS = [r.strip() for r in os.environ["TARGET_ROOMS"].split(",") if r.strip()] def db_connect(): return psycopg2.connect(**DB) def db_get_admin(conn, user_id): with conn.cursor() as cur: cur.execute("SELECT admin FROM users WHERE name = %s", (user_id,)) row = cur.fetchone() if not row: raise RuntimeError(f"user not found in synapse db: {user_id}") # Synapse stores admin as an int (0/1) return int(row[0]) def db_set_admin(conn, user_id, is_admin): with conn.cursor() as cur: cur.execute("UPDATE users SET admin = %s WHERE name = %s", (1 if is_admin else 0, user_id)) def login(user, password): r = requests.post( f"{AUTH_BASE}/_matrix/client/v3/login", json={ "type": "m.login.password", "identifier": {"type": "m.id.user", "user": user}, "password": password, }, timeout=20, ) if r.status_code != 200: raise RuntimeError(f"login failed: {r.status_code} {r.text}") return r.json()["access_token"] def whoami(token): r = requests.get( f"{SYNAPSE_BASE}/_matrix/client/v3/account/whoami", headers={"Authorization": f"Bearer {token}"}, timeout=20, ) r.raise_for_status() return r.json()["user_id"] def admin_delete_room(token, room_id): url = f"{SYNAPSE_BASE}/_synapse/admin/v1/rooms/{urllib.parse.quote(room_id)}" r = requests.delete( url, headers={"Authorization": f"Bearer {token}"}, json={"purge": False, "block": False}, timeout=60, ) if r.status_code != 200: raise RuntimeError(f"admin delete room failed: {room_id}: {r.status_code} {r.text}") return r.json() def admin_joined_rooms(token, user_id): url = f"{SYNAPSE_BASE}/_synapse/admin/v1/users/{urllib.parse.quote(user_id)}/joined_rooms" r = requests.get(url, headers={"Authorization": f"Bearer {token}"}, timeout=20) if r.status_code != 200: raise RuntimeError(f"admin joined_rooms failed: {r.status_code} {r.text}") return r.json().get("joined_rooms", []) results = {"target_user_id": TARGET_USER_ID, "rooms": {}} conn = db_connect() conn.autocommit = False try: try: token = login(SEEDER_USER, SEEDER_PASS) results["seeder_login"] = "ok" except Exception as e: results["seeder_login"] = "error" results["seeder_login_error"] = str(e) print(json.dumps(results, indent=2, sort_keys=True)) raise try: seeder_user_id = whoami(token) results["seeder_user_id"] = seeder_user_id except Exception as e: results["seeder_user_id_error"] = str(e) seeder_user_id = None if seeder_user_id: try: results["seeder_admin_db"] = db_get_admin(conn, seeder_user_id) except Exception as e: results["seeder_admin_db_error"] = str(e) if results.get("seeder_admin_db") == 0: try: db_set_admin(conn, seeder_user_id, True) conn.commit() results["seeder_admin_db_promoted"] = True except Exception as e: results["seeder_admin_db_promote_error"] = str(e) else: results["seeder_admin_db_promoted"] = False for room_id in TARGET_ROOMS: room_res = {} results["rooms"][room_id] = room_res try: room_res["delete"] = admin_delete_room(token, room_id) room_res["deleted"] = True except Exception as e: room_res["deleted"] = False room_res["delete_error"] = str(e) try: results["target_joined_rooms_after"] = admin_joined_rooms(token, TARGET_USER_ID) except Exception as e: results["target_joined_rooms_after_error"] = str(e) print(json.dumps(results, indent=2, sort_keys=True)) finally: conn.close() PY