titan-iac/services/communication/guest-register-configmap.yaml

250 lines
10 KiB
YAML

# services/communication/guest-register-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: matrix-guest-register
data:
server.py: |
import base64
import json
import os
import random
import secrets
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib import error, parse, request
MATRIX_BASE = os.environ.get("MATRIX_BASE", "http://othrys-synapse-matrix-synapse:8008").rstrip("/")
AUTH_BASE = os.environ.get("AUTH_BASE", "http://matrix-authentication-service:8080").rstrip("/")
MAS_ADMIN_BASE = os.environ.get("MAS_ADMIN_BASE", "http://matrix-authentication-service:8081").rstrip("/")
SERVER_NAME = os.environ.get("MATRIX_SERVER_NAME", "live.bstein.dev")
MAS_ADMIN_CLIENT_ID = os.environ["MAS_ADMIN_CLIENT_ID"]
MAS_ADMIN_CLIENT_SECRET_FILE = os.environ.get("MAS_ADMIN_CLIENT_SECRET_FILE", "/etc/mas/admin-client/client_secret")
# Basic rate limiting (best-effort) to avoid accidental abuse.
# Count requests per client IP over a short window.
RATE_WINDOW_SEC = int(os.environ.get("RATE_WINDOW_SEC", "60"))
RATE_MAX = int(os.environ.get("RATE_MAX", "30"))
_rate = {} # ip -> [window_start, count]
ADJ = ["brisk", "calm", "eager", "gentle", "merry", "nifty", "rapid", "sunny", "witty", "zesty"]
NOUN = ["otter", "falcon", "comet", "ember", "grove", "harbor", "meadow", "raven", "river", "summit"]
def _json(method, url, *, token=None, body=None, timeout=20):
headers = {"Content-Type": "application/json"}
if token:
headers["Authorization"] = f"Bearer {token}"
data = None
if body is not None:
data = json.dumps(body).encode()
req = request.Request(url, data=data, headers=headers, method=method)
try:
with request.urlopen(req, timeout=timeout) as resp:
raw = resp.read()
payload = json.loads(raw.decode()) if raw else {}
return resp.status, payload
except error.HTTPError as e:
raw = e.read()
try:
payload = json.loads(raw.decode()) if raw else {}
except Exception:
payload = {}
return e.code, payload
def _form(method, url, *, headers=None, fields=None, timeout=20):
hdrs = {"Content-Type": "application/x-www-form-urlencoded"}
if headers:
hdrs.update(headers)
data = parse.urlencode(fields or {}).encode()
req = request.Request(url, data=data, headers=hdrs, method=method)
try:
with request.urlopen(req, timeout=timeout) as resp:
raw = resp.read()
payload = json.loads(raw.decode()) if raw else {}
return resp.status, payload
except error.HTTPError as e:
raw = e.read()
try:
payload = json.loads(raw.decode()) if raw else {}
except Exception:
payload = {}
return e.code, payload
def _login(user, password):
status, payload = _json(
"POST",
f"{AUTH_BASE}/_matrix/client/v3/login",
body={
"type": "m.login.password",
"identifier": {"type": "m.id.user", "user": user},
"password": password,
},
timeout=20,
)
if status != 200 or "access_token" not in payload:
raise RuntimeError("login_failed")
return payload
_mas_admin_token = None
_mas_admin_token_at = 0.0
def _mas_admin_access_token(now):
global _mas_admin_token, _mas_admin_token_at
if _mas_admin_token and (now - _mas_admin_token_at) < 300:
return _mas_admin_token
with open(MAS_ADMIN_CLIENT_SECRET_FILE, encoding="utf-8") as fh:
client_secret = fh.read().strip()
creds = f"{MAS_ADMIN_CLIENT_ID}:{client_secret}".encode()
basic = base64.b64encode(creds).decode()
status, payload = _form(
"POST",
f"{AUTH_BASE}/oauth2/token",
headers={"Authorization": f"Basic {basic}"},
fields={"grant_type": "client_credentials"},
timeout=20,
)
if status != 200 or "access_token" not in payload:
raise RuntimeError("mas_admin_token_failed")
_mas_admin_token = payload["access_token"]
_mas_admin_token_at = now
return _mas_admin_token
def _mas_create_user(admin_token, username, password):
status, payload = _json(
"POST",
f"{MAS_ADMIN_BASE}/api/admin/v1/users",
token=admin_token,
body={"username": username, "password": password},
timeout=25,
)
if status in (200, 201):
return
if status == 409 or payload.get("errcode") == "M_ALREADY_EXISTS":
raise RuntimeError("username_taken")
raise RuntimeError("user_create_failed")
def _generate_localpart():
return "guest-" + secrets.token_hex(6)
def _generate_displayname():
return f"{random.choice(ADJ)}-{random.choice(NOUN)}"
def _set_displayname(access_token, user_id, displayname):
try:
_json(
"PUT",
f"{MATRIX_BASE}/_matrix/client/v3/profile/{parse.quote(user_id)}/displayname",
token=access_token,
body={"displayname": displayname},
timeout=15,
)
except Exception:
return
def _rate_check(ip, now):
win, cnt = _rate.get(ip, (now, 0))
if now - win > RATE_WINDOW_SEC:
_rate[ip] = (now, 1)
return True
if cnt >= RATE_MAX:
return False
_rate[ip] = (win, cnt + 1)
return True
class Handler(BaseHTTPRequestHandler):
server_version = "matrix-guest-register"
def _send_json(self, code, payload):
body = json.dumps(payload).encode()
self.send_response(code)
self.send_header("Content-Type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
self.send_header("Access-Control-Allow-Headers", "Content-Type, Authorization, X-Requested-With")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def do_OPTIONS(self): # noqa: N802
self.send_response(204)
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
self.send_header("Access-Control-Allow-Headers", "Content-Type, Authorization, X-Requested-With")
self.end_headers()
def do_GET(self): # noqa: N802
if self.path in ("/healthz", "/"):
return self._send_json(200, {"ok": True})
return self._send_json(404, {"errcode": "M_NOT_FOUND", "error": "not_found"})
def do_POST(self): # noqa: N802
# We only implement guest registration (used by Element Web "Join as guest").
parsed = parse.urlparse(self.path)
if parsed.path not in ("/_matrix/client/v3/register", "/_matrix/client/r0/register"):
return self._send_json(404, {"errcode": "M_NOT_FOUND", "error": "not_found"})
qs = parse.parse_qs(parsed.query)
kind = (qs.get("kind") or ["user"])[0]
if kind != "guest":
return self._send_json(
403,
{
"errcode": "M_FORBIDDEN",
"error": "Registration is disabled; use https://bstein.dev/request-access for accounts.",
},
)
# Best-effort client IP from X-Forwarded-For (Traefik).
xfwd = self.headers.get("x-forwarded-for", "")
ip = (xfwd.split(",")[0].strip() if xfwd else "") or self.client_address[0]
now = __import__("time").time()
if not _rate_check(ip, now):
return self._send_json(429, {"errcode": "M_LIMIT_EXCEEDED", "error": "rate_limited"})
# Consume request body (Element may send fields; we ignore).
length = int(self.headers.get("content-length", "0") or "0")
_ = self.rfile.read(length) if length else b"{}"
# Create a short-lived "guest" account by provisioning a normal user with a random password.
# This keeps MAS/OIDC intact while restoring a no-signup guest UX.
try:
displayname = _generate_displayname()
password = base64.urlsafe_b64encode(secrets.token_bytes(24)).decode().rstrip("=")
admin_token = _mas_admin_access_token(now)
last = None
for _ in range(3):
localpart = _generate_localpart()
try:
_mas_create_user(admin_token, localpart, password)
break
except RuntimeError as e:
last = str(e)
if last != "username_taken":
raise
else:
raise RuntimeError(last or "user_create_failed")
login_payload = _login(localpart, password)
_set_displayname(login_payload.get("access_token"), login_payload.get("user_id"), displayname)
except Exception:
return self._send_json(502, {"errcode": "M_UNKNOWN", "error": "guest_provision_failed"})
resp = {
"user_id": login_payload.get("user_id") or f"@{localpart}:{SERVER_NAME}",
"access_token": login_payload.get("access_token"),
"device_id": login_payload.get("device_id"),
"home_server": SERVER_NAME,
}
# Do not expose refresh tokens for guests.
return self._send_json(200, resp)
def main():
port = int(os.environ.get("PORT", "8080"))
HTTPServer(("0.0.0.0", port), Handler).serve_forever()
if __name__ == "__main__":
main()