titan-iac/services/comms/othrys-kick-numeric-job.yaml

116 lines
4.3 KiB
YAML
Raw Normal View History

# services/comms/othrys-kick-numeric-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
name: othrys-kick-numeric-1
namespace: comms
spec:
backoffLimit: 0
template:
spec:
restartPolicy: Never
containers:
- name: kick
image: python:3.11-slim
env:
- name: SYNAPSE_BASE
value: http://othrys-synapse-matrix-synapse:8008
- name: AUTH_BASE
value: http://matrix-authentication-service:8080
- name: SERVER_NAME
value: live.bstein.dev
- name: ROOM_ALIAS
value: "#othrys:live.bstein.dev"
- name: SEEDER_USER
value: othrys-seeder
- name: SEEDER_PASS
valueFrom:
secretKeyRef:
name: atlasbot-credentials-runtime
key: seeder-password
command:
- /bin/sh
- -c
- |
set -euo pipefail
pip install --no-cache-dir requests >/dev/null
python - <<'PY'
import os
import urllib.parse
import requests
BASE = os.environ["SYNAPSE_BASE"]
AUTH_BASE = os.environ.get("AUTH_BASE", BASE)
SERVER_NAME = os.environ.get("SERVER_NAME", "live.bstein.dev")
ROOM_ALIAS = os.environ.get("ROOM_ALIAS", "#othrys:live.bstein.dev")
SEEDER_USER = os.environ["SEEDER_USER"]
SEEDER_PASS = os.environ["SEEDER_PASS"]
def canon_user(user):
u = (user or "").strip()
if u.startswith("@") and ":" in u:
return u
u = u.lstrip("@")
if ":" in u:
return f"@{u}"
return f"@{u}:{SERVER_NAME}"
def auth(token):
return {"Authorization": f"Bearer {token}"}
def login(user, password):
r = requests.post(
f"{AUTH_BASE}/_matrix/client/v3/login",
json={
"type": "m.login.password",
"identifier": {"type": "m.id.user", "user": canon_user(user)},
"password": password,
},
timeout=30,
)
r.raise_for_status()
return r.json()["access_token"]
def resolve_alias(token, alias):
enc = urllib.parse.quote(alias)
r = requests.get(f"{BASE}/_matrix/client/v3/directory/room/{enc}", headers=auth(token), timeout=30)
r.raise_for_status()
return r.json()["room_id"]
def list_members(token, room_id):
r = requests.get(
f"{BASE}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/members?membership=join",
headers=auth(token),
timeout=30,
)
r.raise_for_status()
members = []
for ev in r.json().get("chunk", []):
uid = ev.get("state_key")
if isinstance(uid, str) and uid.startswith("@"):
members.append(uid)
return members
def is_numeric(user_id):
localpart = user_id.split(":", 1)[0].lstrip("@")
return localpart.isdigit()
def kick(token, room_id, user_id):
r = requests.post(
f"{BASE}/_matrix/client/v3/rooms/{urllib.parse.quote(room_id)}/kick",
headers=auth(token),
json={"user_id": user_id, "reason": "cleanup numeric guest"},
timeout=30,
)
if r.status_code not in (200, 202):
raise SystemExit(f"kick {user_id} failed: {r.status_code} {r.text}")
token = login(SEEDER_USER, SEEDER_PASS)
room_id = resolve_alias(token, ROOM_ALIAS)
for user_id in list_members(token, room_id):
if user_id == canon_user(SEEDER_USER):
continue
if is_numeric(user_id):
kick(token, room_id, user_id)
PY