feat: add Ariadne provisioning service

This commit is contained in:
Brad Stein 2026-01-19 16:57:18 -03:00
commit ee532ac215
29 changed files with 3059 additions and 0 deletions

12
.gitignore vendored Normal file
View File

@ -0,0 +1,12 @@
__pycache__/
*.pyc
*.pyo
*.pyd
*.egg-info/
.venv/
.venv*/
.env
.dist/
build/
.pytest_cache/
.coverage

15
Dockerfile Normal file
View File

@ -0,0 +1,15 @@
FROM python:3.12-slim
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1
WORKDIR /app
COPY requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir -r /app/requirements.txt
COPY ariadne /app/ariadne
EXPOSE 8080
CMD ["uvicorn", "ariadne.app:app", "--host", "0.0.0.0", "--port", "8080"]

162
Jenkinsfile vendored Normal file
View File

@ -0,0 +1,162 @@
pipeline {
agent {
kubernetes {
label 'ariadne'
defaultContainer 'builder'
yaml """
apiVersion: v1
kind: Pod
metadata:
labels:
app: ariadne
spec:
nodeSelector:
kubernetes.io/arch: arm64
node-role.kubernetes.io/worker: "true"
containers:
- name: dind
image: docker:27-dind
securityContext:
privileged: true
env:
- name: DOCKER_TLS_CERTDIR
value: ""
args:
- --mtu=1400
- --host=unix:///var/run/docker.sock
- --host=tcp://0.0.0.0:2375
volumeMounts:
- name: dind-storage
mountPath: /var/lib/docker
- name: builder
image: docker:27
command: ["cat"]
tty: true
env:
- name: DOCKER_HOST
value: tcp://localhost:2375
- name: DOCKER_TLS_CERTDIR
value: ""
volumeMounts:
- name: workspace-volume
mountPath: /home/jenkins/agent
- name: docker-config-writable
mountPath: /root/.docker
- name: harbor-config
mountPath: /docker-config
volumes:
- name: workspace-volume
emptyDir: {}
- name: docker-config-writable
emptyDir: {}
- name: dind-storage
emptyDir: {}
- name: harbor-config
secret:
secretName: harbor-bstein-robot
items:
- key: .dockerconfigjson
path: config.json
"""
}
}
environment {
REGISTRY = 'registry.bstein.dev/bstein'
IMAGE = "${REGISTRY}/ariadne"
VERSION_TAG = 'dev'
SEMVER = 'dev'
}
options {
disableConcurrentBuilds()
}
triggers {
pollSCM('H/2 * * * *')
}
stages {
stage('Checkout') {
steps {
checkout scm
}
}
stage('Prep toolchain') {
steps {
container('builder') {
sh '''
set -euo pipefail
apk add --no-cache bash git jq
mkdir -p /root/.docker
cp /docker-config/config.json /root/.docker/config.json
'''
}
}
}
stage('Compute version') {
steps {
container('builder') {
script {
sh '''
set -euo pipefail
if git describe --tags --exact-match >/dev/null 2>&1; then
SEMVER="$(git describe --tags --exact-match)"
else
SEMVER="0.1.0-${BUILD_NUMBER}"
fi
if ! echo "$SEMVER" | grep -Eq '^v?[0-9]+\.[0-9]+\.[0-9]+([-.][0-9A-Za-z]+)?$'; then
SEMVER="0.1.0-${BUILD_NUMBER}"
fi
echo "SEMVER=${SEMVER}" > build.env
'''
def props = readProperties file: 'build.env'
env.SEMVER = props['SEMVER'] ?: "0.1.0-${env.BUILD_NUMBER}"
env.VERSION_TAG = env.SEMVER
}
}
}
}
stage('Buildx setup') {
steps {
container('builder') {
sh '''
set -euo pipefail
for i in $(seq 1 10); do
if docker info >/dev/null 2>&1; then
break
fi
sleep 2
done
docker buildx create --name bstein-builder --driver docker-container --bootstrap --use || docker buildx use bstein-builder
'''
}
}
}
stage('Build & push image') {
steps {
container('builder') {
sh '''
set -euo pipefail
VERSION_TAG="$(cut -d= -f2 build.env)"
docker buildx build \
--platform linux/arm64 \
--tag "${IMAGE}:${VERSION_TAG}" \
--tag "${IMAGE}:latest" \
--push \
.
'''
}
}
}
}
post {
always {
script {
def props = fileExists('build.env') ? readProperties(file: 'build.env') : [:]
echo "Build complete for ${props['SEMVER'] ?: env.VERSION_TAG}"
}
}
}
}

1
ariadne/__init__.py Normal file
View File

@ -0,0 +1 @@
from __future__ import annotations

361
ariadne/app.py Normal file
View File

@ -0,0 +1,361 @@
from __future__ import annotations
from datetime import datetime, timezone
import threading
from typing import Any
from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse, Response
from prometheus_client import CONTENT_TYPE_LATEST, generate_latest
from .auth.keycloak import AuthContext, authenticator
from .db.database import Database
from .db.storage import Storage
from .manager.provisioning import ProvisioningManager
from .scheduler.cron import CronScheduler
from .services.firefly import firefly
from .services.keycloak_admin import keycloak_admin
from .services.mailu import mailu
from .services.nextcloud import nextcloud
from .services.vaultwarden_sync import run_vaultwarden_sync
from .services.wger import wger
from .settings import settings
from .utils.errors import safe_error_detail
from .utils.http import extract_bearer_token
from .utils.passwords import random_password
db = Database(settings.portal_database_url)
storage = Storage(db)
provisioning = ProvisioningManager(db, storage)
scheduler = CronScheduler(storage, settings.schedule_tick_sec)
app = FastAPI(title=settings.app_name)
def _require_auth(request: Request) -> AuthContext:
token = extract_bearer_token(request)
if not token:
raise HTTPException(status_code=401, detail="missing bearer token")
try:
return authenticator.authenticate(token)
except Exception:
raise HTTPException(status_code=401, detail="invalid token")
def _require_admin(ctx: AuthContext) -> None:
if ctx.username and ctx.username in settings.portal_admin_users:
return
if settings.portal_admin_groups and set(ctx.groups).intersection(settings.portal_admin_groups):
return
raise HTTPException(status_code=403, detail="forbidden")
def _require_account_access(ctx: AuthContext) -> None:
if not settings.account_allowed_groups:
return
if set(ctx.groups).intersection(settings.account_allowed_groups):
return
raise HTTPException(status_code=403, detail="forbidden")
@app.on_event("startup")
def _startup() -> None:
db.ensure_schema()
provisioning.start()
scheduler.add_task("schedule.mailu_sync", settings.mailu_sync_cron, lambda: mailu.sync("ariadne_schedule"))
scheduler.add_task(
"schedule.nextcloud_sync",
settings.nextcloud_sync_cron,
lambda: nextcloud.sync_mail(wait=False),
)
scheduler.add_task("schedule.vaultwarden_sync", settings.vaultwarden_sync_cron, run_vaultwarden_sync)
scheduler.add_task("schedule.wger_admin", settings.wger_admin_cron, lambda: wger.ensure_admin(wait=False))
scheduler.start()
@app.on_event("shutdown")
def _shutdown() -> None:
scheduler.stop()
provisioning.stop()
db.close()
@app.get("/health")
def health() -> dict[str, Any]:
return {"ok": True}
@app.get(settings.metrics_path)
def metrics() -> Response:
payload = generate_latest()
return Response(payload, media_type=CONTENT_TYPE_LATEST)
@app.get("/api/admin/access/requests")
def list_access_requests(ctx: AuthContext = Depends(_require_auth)) -> JSONResponse:
_require_admin(ctx)
try:
rows = storage.list_pending_requests()
except Exception:
raise HTTPException(status_code=502, detail="failed to load requests")
output: list[dict[str, Any]] = []
for row in rows:
created_at = row.get("created_at")
output.append(
{
"id": row.get("request_code"),
"username": row.get("username"),
"email": row.get("contact_email") or "",
"request_code": row.get("request_code"),
"created_at": created_at.isoformat() if isinstance(created_at, datetime) else "",
"note": row.get("note") or "",
}
)
return JSONResponse({"requests": output})
@app.post("/api/admin/access/requests/{username}/approve")
async def approve_access_request(
username: str,
request: Request,
ctx: AuthContext = Depends(_require_auth),
) -> JSONResponse:
_require_admin(ctx)
try:
payload = await request.json()
except Exception:
payload = {}
flags_raw = payload.get("flags") if isinstance(payload, dict) else None
flags = [f for f in flags_raw if isinstance(f, str)] if isinstance(flags_raw, list) else []
flags = [f for f in flags if f in settings.allowed_flag_groups]
note = payload.get("note") if isinstance(payload, dict) else None
note = str(note).strip() if isinstance(note, str) else None
decided_by = ctx.username or ""
try:
row = db.fetchone(
"""
UPDATE access_requests
SET status = 'approved',
decided_at = NOW(),
decided_by = %s,
approval_flags = %s,
approval_note = %s
WHERE username = %s
AND status = 'pending'
AND email_verified_at IS NOT NULL
RETURNING request_code
""",
(decided_by or None, flags or None, note, username),
)
except Exception:
raise HTTPException(status_code=502, detail="failed to approve request")
if not row:
return JSONResponse({"ok": True, "request_code": ""})
request_code = row.get("request_code") or ""
if request_code:
threading.Thread(
target=provisioning.provision_access_request,
args=(request_code,),
daemon=True,
).start()
return JSONResponse({"ok": True, "request_code": request_code})
@app.post("/api/admin/access/requests/{username}/deny")
async def deny_access_request(
username: str,
request: Request,
ctx: AuthContext = Depends(_require_auth),
) -> JSONResponse:
_require_admin(ctx)
try:
payload = await request.json()
except Exception:
payload = {}
note = payload.get("note") if isinstance(payload, dict) else None
note = str(note).strip() if isinstance(note, str) else None
decided_by = ctx.username or ""
try:
row = db.fetchone(
"""
UPDATE access_requests
SET status = 'denied',
decided_at = NOW(),
decided_by = %s,
denial_note = %s
WHERE username = %s AND status = 'pending'
RETURNING request_code
""",
(decided_by or None, note, username),
)
except Exception:
raise HTTPException(status_code=502, detail="failed to deny request")
if not row:
return JSONResponse({"ok": True, "request_code": ""})
return JSONResponse({"ok": True, "request_code": row.get("request_code")})
@app.post("/api/account/mailu/rotate")
def rotate_mailu_password(ctx: AuthContext = Depends(_require_auth)) -> JSONResponse:
_require_account_access(ctx)
if not keycloak_admin.ready():
raise HTTPException(status_code=503, detail="server not configured")
username = ctx.username or ""
if not username:
raise HTTPException(status_code=400, detail="missing username")
password = random_password()
try:
keycloak_admin.set_user_attribute(username, "mailu_app_password", password)
except Exception:
raise HTTPException(status_code=502, detail="failed to update mail password")
sync_enabled = bool(settings.mailu_sync_url)
sync_ok = False
sync_error = ""
if sync_enabled:
try:
mailu.sync("ariadne_mailu_rotate")
sync_ok = True
except Exception as exc:
sync_error = safe_error_detail(exc, "sync request failed")
nextcloud_sync: dict[str, Any] = {"status": "skipped"}
try:
nextcloud_sync = nextcloud.sync_mail(username, wait=True)
except Exception as exc:
nextcloud_sync = {"status": "error", "detail": safe_error_detail(exc, "failed to sync nextcloud")}
return JSONResponse(
{
"password": password,
"sync_enabled": sync_enabled,
"sync_ok": sync_ok,
"sync_error": sync_error,
"nextcloud_sync": nextcloud_sync,
}
)
@app.post("/api/account/wger/reset")
def reset_wger_password(ctx: AuthContext = Depends(_require_auth)) -> JSONResponse:
_require_account_access(ctx)
if not keycloak_admin.ready():
raise HTTPException(status_code=503, detail="server not configured")
username = ctx.username or ""
if not username:
raise HTTPException(status_code=400, detail="missing username")
mailu_email = f"{username}@{settings.mailu_domain}"
try:
user = keycloak_admin.find_user(username) or {}
attrs = user.get("attributes") if isinstance(user, dict) else None
if isinstance(attrs, dict):
raw_mailu = attrs.get("mailu_email")
if isinstance(raw_mailu, list) and raw_mailu:
mailu_email = str(raw_mailu[0])
elif isinstance(raw_mailu, str) and raw_mailu:
mailu_email = raw_mailu
except Exception:
pass
password = random_password()
try:
result = wger.sync_user(username, mailu_email, password, wait=True)
status_val = result.get("status") if isinstance(result, dict) else "error"
if status_val != "ok":
raise RuntimeError(f"wger sync {status_val}")
except Exception as exc:
raise HTTPException(status_code=502, detail=safe_error_detail(exc, "wger sync failed"))
try:
keycloak_admin.set_user_attribute(username, "wger_password", password)
keycloak_admin.set_user_attribute(
username,
"wger_password_updated_at",
datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
)
except Exception:
raise HTTPException(status_code=502, detail="failed to store wger password")
return JSONResponse({"status": "ok", "password": password})
@app.post("/api/account/firefly/reset")
def reset_firefly_password(ctx: AuthContext = Depends(_require_auth)) -> JSONResponse:
_require_account_access(ctx)
if not keycloak_admin.ready():
raise HTTPException(status_code=503, detail="server not configured")
username = ctx.username or ""
if not username:
raise HTTPException(status_code=400, detail="missing username")
mailu_email = f"{username}@{settings.mailu_domain}"
try:
user = keycloak_admin.find_user(username) or {}
attrs = user.get("attributes") if isinstance(user, dict) else None
if isinstance(attrs, dict):
raw_mailu = attrs.get("mailu_email")
if isinstance(raw_mailu, list) and raw_mailu:
mailu_email = str(raw_mailu[0])
elif isinstance(raw_mailu, str) and raw_mailu:
mailu_email = raw_mailu
except Exception:
pass
password = random_password(24)
try:
result = firefly.sync_user(mailu_email, password, wait=True)
status_val = result.get("status") if isinstance(result, dict) else "error"
if status_val != "ok":
raise RuntimeError(f"firefly sync {status_val}")
except Exception as exc:
raise HTTPException(status_code=502, detail=safe_error_detail(exc, "firefly sync failed"))
try:
keycloak_admin.set_user_attribute(username, "firefly_password", password)
keycloak_admin.set_user_attribute(
username,
"firefly_password_updated_at",
datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
)
except Exception:
raise HTTPException(status_code=502, detail="failed to store firefly password")
return JSONResponse({"status": "ok", "password": password})
@app.post("/api/account/nextcloud/mail/sync")
async def nextcloud_mail_sync(request: Request, ctx: AuthContext = Depends(_require_auth)) -> JSONResponse:
_require_account_access(ctx)
if not keycloak_admin.ready():
raise HTTPException(status_code=503, detail="server not configured")
username = ctx.username or ""
if not username:
raise HTTPException(status_code=400, detail="missing username")
try:
payload = await request.json()
except Exception:
payload = {}
wait = bool(payload.get("wait", True)) if isinstance(payload, dict) else True
try:
result = nextcloud.sync_mail(username, wait=wait)
return JSONResponse(result)
except Exception as exc:
raise HTTPException(status_code=502, detail=safe_error_detail(exc, "failed to sync nextcloud mail"))

111
ariadne/auth/keycloak.py Normal file
View File

@ -0,0 +1,111 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
import time
import httpx
import jwt
from ..settings import settings
@dataclass(frozen=True)
class AuthContext:
username: str
email: str
groups: list[str]
claims: dict[str, Any]
class KeycloakOIDC:
def __init__(self, jwks_url: str, issuer: str, client_id: str) -> None:
self._jwks_url = jwks_url
self._issuer = issuer
self._client_id = client_id
self._jwks: dict[str, Any] | None = None
self._jwks_fetched_at: float = 0.0
self._jwks_ttl_sec = 300.0
def verify(self, token: str) -> dict[str, Any]:
if not token:
raise ValueError("missing token")
jwks = self._get_jwks()
header = jwt.get_unverified_header(token)
kid = header.get("kid")
if not isinstance(kid, str):
raise ValueError("token missing kid")
key = None
for candidate in jwks.get("keys", []) if isinstance(jwks, dict) else []:
if isinstance(candidate, dict) and candidate.get("kid") == kid:
key = candidate
break
if not key:
self._jwks = None
jwks = self._get_jwks(force=True)
for candidate in jwks.get("keys", []) if isinstance(jwks, dict) else []:
if isinstance(candidate, dict) and candidate.get("kid") == kid:
key = candidate
break
if not key:
raise ValueError("token kid not found")
claims = jwt.decode(
token,
key=jwt.algorithms.RSAAlgorithm.from_jwk(key),
algorithms=["RS256"],
options={"verify_aud": False},
issuer=self._issuer,
)
azp = claims.get("azp")
aud = claims.get("aud")
aud_list: list[str] = []
if isinstance(aud, str):
aud_list = [aud]
elif isinstance(aud, list):
aud_list = [a for a in aud if isinstance(a, str)]
if azp != self._client_id and self._client_id not in aud_list:
raise ValueError("token not issued for expected client")
return claims
def _get_jwks(self, force: bool = False) -> dict[str, Any]:
now = time.time()
if not force and self._jwks and now - self._jwks_fetched_at < self._jwks_ttl_sec:
return self._jwks
with httpx.Client(timeout=5.0) as client:
resp = client.get(self._jwks_url)
resp.raise_for_status()
payload = resp.json()
if not isinstance(payload, dict):
raise ValueError("jwks payload invalid")
self._jwks = payload
self._jwks_fetched_at = now
return payload
class Authenticator:
def __init__(self) -> None:
self._oidc = KeycloakOIDC(settings.keycloak_jwks_url, settings.keycloak_issuer, settings.keycloak_client_id)
@staticmethod
def _normalize_groups(groups: Any) -> list[str]:
if not isinstance(groups, list):
return []
cleaned: list[str] = []
for name in groups:
if not isinstance(name, str):
continue
cleaned.append(name.lstrip("/"))
return [name for name in cleaned if name]
def authenticate(self, token: str) -> AuthContext:
claims = self._oidc.verify(token)
username = claims.get("preferred_username") or ""
email = claims.get("email") or ""
groups = self._normalize_groups(claims.get("groups"))
return AuthContext(username=username, email=email, groups=groups, claims=claims)
authenticator = Authenticator()

46
ariadne/db/database.py Normal file
View File

@ -0,0 +1,46 @@
from __future__ import annotations
from contextlib import contextmanager
from typing import Any, Iterable
import psycopg
from psycopg_pool import ConnectionPool
from .schema import ARIADNE_ACCESS_REQUEST_ALTER, ARIADNE_TABLES_SQL
class Database:
def __init__(self, dsn: str, pool_size: int = 5) -> None:
if not dsn:
raise RuntimeError("PORTAL_DATABASE_URL is required")
self._pool = ConnectionPool(conninfo=dsn, max_size=pool_size)
@contextmanager
def connection(self):
with self._pool.connection() as conn:
conn.row_factory = psycopg.rows.dict_row
yield conn
def ensure_schema(self) -> None:
with self.connection() as conn:
for stmt in ARIADNE_TABLES_SQL:
conn.execute(stmt)
for stmt in ARIADNE_ACCESS_REQUEST_ALTER:
conn.execute(stmt)
def fetchone(self, query: str, params: Iterable[Any] | None = None) -> dict[str, Any] | None:
with self.connection() as conn:
row = conn.execute(query, params or ()).fetchone()
return dict(row) if row else None
def fetchall(self, query: str, params: Iterable[Any] | None = None) -> list[dict[str, Any]]:
with self.connection() as conn:
rows = conn.execute(query, params or ()).fetchall()
return [dict(row) for row in rows]
def execute(self, query: str, params: Iterable[Any] | None = None) -> None:
with self.connection() as conn:
conn.execute(query, params or ())
def close(self) -> None:
self._pool.close()

48
ariadne/db/schema.py Normal file
View File

@ -0,0 +1,48 @@
from __future__ import annotations
ARIADNE_TABLES_SQL = [
"""
CREATE TABLE IF NOT EXISTS ariadne_task_runs (
id BIGSERIAL PRIMARY KEY,
request_code TEXT,
task TEXT NOT NULL,
status TEXT NOT NULL,
detail TEXT,
started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
finished_at TIMESTAMPTZ,
duration_ms INTEGER
)
""",
"""
CREATE INDEX IF NOT EXISTS ariadne_task_runs_request_code_idx
ON ariadne_task_runs (request_code)
""",
"""
CREATE TABLE IF NOT EXISTS ariadne_schedule_state (
task_name TEXT PRIMARY KEY,
cron_expr TEXT NOT NULL,
last_started_at TIMESTAMPTZ,
last_finished_at TIMESTAMPTZ,
last_status TEXT,
last_error TEXT,
last_duration_ms INTEGER,
next_run_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
""",
"""
CREATE TABLE IF NOT EXISTS ariadne_events (
id BIGSERIAL PRIMARY KEY,
event_type TEXT NOT NULL,
detail TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
""",
]
ARIADNE_ACCESS_REQUEST_ALTER = [
"ALTER TABLE access_requests ADD COLUMN IF NOT EXISTS welcome_email_sent_at TIMESTAMPTZ",
"ALTER TABLE access_requests ADD COLUMN IF NOT EXISTS approval_flags TEXT[]",
"ALTER TABLE access_requests ADD COLUMN IF NOT EXISTS approval_note TEXT",
"ALTER TABLE access_requests ADD COLUMN IF NOT EXISTS denial_note TEXT",
]

273
ariadne/db/storage.py Normal file
View File

@ -0,0 +1,273 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Iterable
from .database import Database
REQUIRED_TASKS = (
"keycloak_user",
"keycloak_password",
"keycloak_groups",
"mailu_app_password",
"mailu_sync",
"nextcloud_mail_sync",
"wger_account",
"firefly_account",
"vaultwarden_invite",
)
@dataclass(frozen=True)
class AccessRequest:
request_code: str
username: str
contact_email: str
status: str
email_verified_at: datetime | None
initial_password: str | None
initial_password_revealed_at: datetime | None
provision_attempted_at: datetime | None
approval_flags: list[str]
approval_note: str | None
denial_note: str | None
class Storage:
def __init__(self, db: Database) -> None:
self._db = db
def ensure_task_rows(self, request_code: str, tasks: Iterable[str]) -> None:
tasks_list = list(tasks)
if not tasks_list:
return
self._db.execute(
"""
INSERT INTO access_request_tasks (request_code, task, status, detail, updated_at)
SELECT %s, task, 'pending', NULL, NOW()
FROM UNNEST(%s::text[]) AS task
ON CONFLICT (request_code, task) DO NOTHING
""",
(request_code, tasks_list),
)
def update_task(self, request_code: str, task: str, status: str, detail: str | None) -> None:
self._db.execute(
"""
INSERT INTO access_request_tasks (request_code, task, status, detail, updated_at)
VALUES (%s, %s, %s, %s, NOW())
ON CONFLICT (request_code, task)
DO UPDATE SET status = EXCLUDED.status, detail = EXCLUDED.detail, updated_at = NOW()
""",
(request_code, task, status, detail),
)
def task_statuses(self, request_code: str) -> dict[str, str]:
rows = self._db.fetchall(
"SELECT task, status FROM access_request_tasks WHERE request_code = %s",
(request_code,),
)
output: dict[str, str] = {}
for row in rows:
task = row.get("task")
status = row.get("status")
if isinstance(task, str) and isinstance(status, str):
output[task] = status
return output
def tasks_complete(self, request_code: str, tasks: Iterable[str]) -> bool:
statuses = self.task_statuses(request_code)
for task in tasks:
if statuses.get(task) != "ok":
return False
return True
def fetch_access_request(self, request_code: str) -> AccessRequest | None:
row = self._db.fetchone(
"""
SELECT request_code, username, contact_email, status, email_verified_at,
initial_password, initial_password_revealed_at, provision_attempted_at,
approval_flags, approval_note, denial_note
FROM access_requests
WHERE request_code = %s
""",
(request_code,),
)
if not row:
return None
return self._row_to_request(row)
def find_access_request_by_username(self, username: str) -> AccessRequest | None:
row = self._db.fetchone(
"""
SELECT request_code, username, contact_email, status, email_verified_at,
initial_password, initial_password_revealed_at, provision_attempted_at,
approval_flags, approval_note, denial_note
FROM access_requests
WHERE username = %s
ORDER BY created_at DESC
LIMIT 1
""",
(username,),
)
if not row:
return None
return self._row_to_request(row)
def list_pending_requests(self) -> list[dict[str, Any]]:
return self._db.fetchall(
"""
SELECT request_code, username, contact_email, note, status, created_at
FROM access_requests
WHERE status = 'pending'
ORDER BY created_at ASC
LIMIT 200
"""
)
def list_provision_candidates(self) -> list[AccessRequest]:
rows = self._db.fetchall(
"""
SELECT request_code, username, contact_email, status, email_verified_at,
initial_password, initial_password_revealed_at, provision_attempted_at,
approval_flags, approval_note, denial_note
FROM access_requests
WHERE status IN ('approved', 'accounts_building')
ORDER BY created_at ASC
LIMIT 200
"""
)
return [self._row_to_request(row) for row in rows]
def update_status(self, request_code: str, status: str) -> None:
self._db.execute(
"UPDATE access_requests SET status = %s WHERE request_code = %s",
(status, request_code),
)
def mark_provision_attempted(self, request_code: str) -> None:
self._db.execute(
"UPDATE access_requests SET provision_attempted_at = NOW() WHERE request_code = %s",
(request_code,),
)
def set_initial_password(self, request_code: str, password: str) -> None:
self._db.execute(
"""
UPDATE access_requests
SET initial_password = %s
WHERE request_code = %s AND initial_password IS NULL
""",
(password, request_code),
)
def mark_welcome_sent(self, request_code: str) -> None:
self._db.execute(
"""
UPDATE access_requests
SET welcome_email_sent_at = NOW()
WHERE request_code = %s AND welcome_email_sent_at IS NULL
""",
(request_code,),
)
def update_approval(self, request_code: str, status: str, decided_by: str, flags: list[str], note: str | None) -> None:
self._db.execute(
"""
UPDATE access_requests
SET status = %s,
decided_at = NOW(),
decided_by = %s,
approval_flags = %s,
approval_note = %s,
denial_note = CASE WHEN %s = 'denied' THEN %s ELSE denial_note END
WHERE request_code = %s
""",
(status, decided_by or None, flags or None, note, status, note, request_code),
)
def record_task_run(
self,
request_code: str | None,
task: str,
status: str,
detail: str | None,
started_at: datetime,
finished_at: datetime | None,
duration_ms: int | None,
) -> None:
self._db.execute(
"""
INSERT INTO ariadne_task_runs
(request_code, task, status, detail, started_at, finished_at, duration_ms)
VALUES (%s, %s, %s, %s, %s, %s, %s)
""",
(request_code, task, status, detail, started_at, finished_at, duration_ms),
)
def update_schedule_state(
self,
task_name: str,
cron_expr: str,
last_started_at: datetime | None,
last_finished_at: datetime | None,
last_status: str | None,
last_error: str | None,
last_duration_ms: int | None,
next_run_at: datetime | None,
) -> None:
self._db.execute(
"""
INSERT INTO ariadne_schedule_state
(task_name, cron_expr, last_started_at, last_finished_at, last_status,
last_error, last_duration_ms, next_run_at, updated_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, NOW())
ON CONFLICT (task_name) DO UPDATE
SET cron_expr = EXCLUDED.cron_expr,
last_started_at = EXCLUDED.last_started_at,
last_finished_at = EXCLUDED.last_finished_at,
last_status = EXCLUDED.last_status,
last_error = EXCLUDED.last_error,
last_duration_ms = EXCLUDED.last_duration_ms,
next_run_at = EXCLUDED.next_run_at,
updated_at = NOW()
""",
(
task_name,
cron_expr,
last_started_at,
last_finished_at,
last_status,
last_error,
last_duration_ms,
next_run_at,
),
)
def record_event(self, event_type: str, detail: str | None) -> None:
self._db.execute(
"INSERT INTO ariadne_events (event_type, detail) VALUES (%s, %s)",
(event_type, detail),
)
@staticmethod
def _row_to_request(row: dict[str, Any]) -> AccessRequest:
flags = row.get("approval_flags")
flags_list: list[str] = []
if isinstance(flags, list):
flags_list = [str(item) for item in flags if item]
return AccessRequest(
request_code=str(row.get("request_code") or ""),
username=str(row.get("username") or ""),
contact_email=str(row.get("contact_email") or ""),
status=str(row.get("status") or ""),
email_verified_at=row.get("email_verified_at"),
initial_password=row.get("initial_password"),
initial_password_revealed_at=row.get("initial_password_revealed_at"),
provision_attempted_at=row.get("provision_attempted_at"),
approval_flags=flags_list,
approval_note=row.get("approval_note"),
denial_note=row.get("denial_note"),
)

63
ariadne/k8s/client.py Normal file
View File

@ -0,0 +1,63 @@
from __future__ import annotations
import base64
from pathlib import Path
from typing import Any
import httpx
from ..settings import settings
_K8S_BASE_URL = "https://kubernetes.default.svc"
_SA_PATH = Path("/var/run/secrets/kubernetes.io/serviceaccount")
def _read_service_account() -> tuple[str, str]:
token_path = _SA_PATH / "token"
ca_path = _SA_PATH / "ca.crt"
if not token_path.exists() or not ca_path.exists():
raise RuntimeError("kubernetes service account token missing")
token = token_path.read_text().strip()
if not token:
raise RuntimeError("kubernetes service account token empty")
return token, str(ca_path)
def _k8s_request(method: str, path: str, payload: dict[str, Any] | None = None) -> Any:
token, ca_path = _read_service_account()
url = f"{_K8S_BASE_URL}{path}"
headers = {"Authorization": f"Bearer {token}"}
with httpx.Client(verify=ca_path, timeout=settings.k8s_api_timeout_sec, headers=headers) as client:
resp = client.request(method, url, json=payload)
resp.raise_for_status()
return resp.json()
def get_json(path: str) -> dict[str, Any]:
payload = _k8s_request("GET", path)
if not isinstance(payload, dict):
raise RuntimeError("unexpected kubernetes response")
return payload
def post_json(path: str, payload: dict[str, Any]) -> dict[str, Any]:
data = _k8s_request("POST", path, payload)
if not isinstance(data, dict):
raise RuntimeError("unexpected kubernetes response")
return data
def get_secret_value(namespace: str, name: str, key: str) -> str:
data = get_json(f"/api/v1/namespaces/{namespace}/secrets/{name}")
blob = data.get("data") if isinstance(data.get("data"), dict) else {}
raw = blob.get(key)
if not isinstance(raw, str) or not raw:
raise RuntimeError("secret key missing")
try:
decoded = base64.b64decode(raw).decode("utf-8").strip()
except Exception as exc:
raise RuntimeError("failed to decode secret") from exc
if not decoded:
raise RuntimeError("secret value empty")
return decoded

121
ariadne/k8s/jobs.py Normal file
View File

@ -0,0 +1,121 @@
from __future__ import annotations
import re
import time
from typing import Any
from .client import get_json, post_json
class JobSpawner:
def __init__(self, namespace: str, cronjob_name: str) -> None:
self._namespace = namespace
self._cronjob_name = cronjob_name
@staticmethod
def _safe_name_fragment(value: str, max_len: int = 24) -> str:
cleaned = re.sub(r"[^a-z0-9-]+", "-", (value or "").lower()).strip("-")
if not cleaned:
cleaned = "job"
return cleaned[:max_len].rstrip("-") or "job"
def _job_from_cronjob(
self,
cronjob: dict[str, Any],
label_suffix: str,
env_overrides: list[dict[str, str]] | None = None,
job_ttl_seconds: int | None = None,
) -> dict[str, Any]:
spec = cronjob.get("spec") if isinstance(cronjob.get("spec"), dict) else {}
jt = spec.get("jobTemplate") if isinstance(spec.get("jobTemplate"), dict) else {}
job_spec = jt.get("spec") if isinstance(jt.get("spec"), dict) else {}
now = int(time.time())
safe_label = self._safe_name_fragment(label_suffix)
job_name = f"{self._cronjob_name}-{safe_label}-{now}"
job: dict[str, Any] = {
"apiVersion": "batch/v1",
"kind": "Job",
"metadata": {
"name": job_name,
"namespace": self._namespace,
"labels": {
"app": self._cronjob_name,
"atlas.bstein.dev/trigger": "ariadne",
"atlas.bstein.dev/label": safe_label,
},
},
"spec": job_spec,
}
if isinstance(job_ttl_seconds, int) and job_ttl_seconds > 0:
job.setdefault("spec", {})
job["spec"]["ttlSecondsAfterFinished"] = job_ttl_seconds
tpl = job.get("spec", {}).get("template", {})
pod_spec = tpl.get("spec") if isinstance(tpl.get("spec"), dict) else {}
containers = pod_spec.get("containers") if isinstance(pod_spec.get("containers"), list) else []
if containers and isinstance(containers[0], dict) and env_overrides:
env = containers[0].get("env")
if not isinstance(env, list):
env = []
env = [e for e in env if not (isinstance(e, dict) and e.get("name") in {item["name"] for item in env_overrides})]
env.extend(env_overrides)
containers[0]["env"] = env
pod_spec["containers"] = containers
tpl["spec"] = pod_spec
job["spec"]["template"] = tpl
return job
def trigger(
self,
label_suffix: str,
env_overrides: list[dict[str, str]] | None = None,
job_ttl_seconds: int | None = None,
) -> dict[str, Any]:
cronjob = get_json(f"/apis/batch/v1/namespaces/{self._namespace}/cronjobs/{self._cronjob_name}")
job_payload = self._job_from_cronjob(cronjob, label_suffix, env_overrides, job_ttl_seconds)
created = post_json(f"/apis/batch/v1/namespaces/{self._namespace}/jobs", job_payload)
job_name = (
created.get("metadata", {}).get("name")
if isinstance(created.get("metadata"), dict)
else job_payload.get("metadata", {}).get("name")
)
if not isinstance(job_name, str) or not job_name:
raise RuntimeError("job name missing")
return {"job": job_name, "status": "queued"}
def wait_for_completion(self, job_name: str, timeout_sec: float) -> dict[str, Any]:
deadline = time.time() + timeout_sec
while time.time() < deadline:
job = get_json(f"/apis/batch/v1/namespaces/{self._namespace}/jobs/{job_name}")
status = job.get("status") if isinstance(job.get("status"), dict) else {}
if int(status.get("succeeded") or 0) > 0:
return {"job": job_name, "status": "ok"}
if int(status.get("failed") or 0) > 0:
return {"job": job_name, "status": "error"}
conditions = status.get("conditions") if isinstance(status.get("conditions"), list) else []
for cond in conditions:
if not isinstance(cond, dict):
continue
if cond.get("type") == "Complete" and cond.get("status") == "True":
return {"job": job_name, "status": "ok"}
if cond.get("type") == "Failed" and cond.get("status") == "True":
return {"job": job_name, "status": "error"}
time.sleep(2)
return {"job": job_name, "status": "running"}
def trigger_and_wait(
self,
label_suffix: str,
env_overrides: list[dict[str, str]] | None,
timeout_sec: float,
job_ttl_seconds: int | None = None,
) -> dict[str, Any]:
created = self.trigger(label_suffix, env_overrides, job_ttl_seconds)
job_name = created.get("job")
if not isinstance(job_name, str) or not job_name:
raise RuntimeError("job name missing")
return self.wait_for_completion(job_name, timeout_sec)

View File

@ -0,0 +1,520 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
import hashlib
import threading
import time
from typing import Any
from ..db.database import Database
from ..db.storage import REQUIRED_TASKS, Storage
from ..metrics.metrics import record_task_run, set_access_request_counts
from ..services.firefly import firefly
from ..services.keycloak_admin import keycloak_admin
from ..services.mailer import MailerError, mailer
from ..services.mailu import mailu
from ..services.nextcloud import nextcloud
from ..services.vaultwarden import vaultwarden
from ..services.wger import wger
from ..settings import settings
from ..utils.errors import safe_error_detail
from ..utils.passwords import random_password
MAILU_EMAIL_ATTR = "mailu_email"
MAILU_APP_PASSWORD_ATTR = "mailu_app_password"
MAILU_ENABLED_ATTR = "mailu_enabled"
WGER_PASSWORD_ATTR = "wger_password"
WGER_PASSWORD_UPDATED_ATTR = "wger_password_updated_at"
FIREFLY_PASSWORD_ATTR = "firefly_password"
FIREFLY_PASSWORD_UPDATED_ATTR = "firefly_password_updated_at"
@dataclass(frozen=True)
class ProvisionOutcome:
ok: bool
status: str
def _advisory_lock_id(request_code: str) -> int:
digest = hashlib.sha256(request_code.encode("utf-8")).digest()
return int.from_bytes(digest[:8], "big", signed=True)
def _extract_attr(attrs: Any, key: str) -> str:
if not isinstance(attrs, dict):
return ""
raw = attrs.get(key)
if isinstance(raw, list):
for item in raw:
if isinstance(item, str) and item.strip():
return item.strip()
return ""
if isinstance(raw, str) and raw.strip():
return raw.strip()
return ""
class ProvisioningManager:
def __init__(self, db: Database, storage: Storage) -> None:
self._db = db
self._storage = storage
self._thread: threading.Thread | None = None
self._stop_event = threading.Event()
def start(self) -> None:
if self._thread and self._thread.is_alive():
return
self._stop_event.clear()
self._thread = threading.Thread(target=self._run_loop, name="ariadne-provision", daemon=True)
self._thread.start()
def stop(self) -> None:
self._stop_event.set()
if self._thread:
self._thread.join(timeout=5)
def _run_loop(self) -> None:
while not self._stop_event.is_set():
try:
self._sync_status_metrics()
except Exception:
pass
if not keycloak_admin.ready():
time.sleep(settings.provision_poll_interval_sec)
continue
candidates = self._storage.list_provision_candidates()
for request in candidates:
self.provision_access_request(request.request_code)
time.sleep(settings.provision_poll_interval_sec)
def _sync_status_metrics(self) -> None:
counts = self._db.fetchall(
"SELECT status, COUNT(*) AS count FROM access_requests GROUP BY status"
)
payload: dict[str, int] = {}
for row in counts:
status = row.get("status")
count = row.get("count")
if isinstance(status, str) and isinstance(count, int):
payload[status] = count
set_access_request_counts(payload)
def provision_access_request(self, request_code: str) -> ProvisionOutcome:
if not request_code:
return ProvisionOutcome(ok=False, status="unknown")
if not keycloak_admin.ready():
return ProvisionOutcome(ok=False, status="accounts_building")
required_tasks = list(REQUIRED_TASKS)
with self._db.connection() as conn:
lock_id = _advisory_lock_id(request_code)
locked_row = conn.execute("SELECT pg_try_advisory_lock(%s) AS locked", (lock_id,)).fetchone()
if not locked_row or not locked_row.get("locked"):
return ProvisionOutcome(ok=False, status="accounts_building")
try:
row = conn.execute(
"""
SELECT username,
contact_email,
email_verified_at,
status,
initial_password,
initial_password_revealed_at,
provision_attempted_at,
approval_flags
FROM access_requests
WHERE request_code = %s
""",
(request_code,),
).fetchone()
if not row:
return ProvisionOutcome(ok=False, status="unknown")
username = str(row.get("username") or "")
contact_email = str(row.get("contact_email") or "")
email_verified_at = row.get("email_verified_at")
status = str(row.get("status") or "")
initial_password = row.get("initial_password")
revealed_at = row.get("initial_password_revealed_at")
attempted_at = row.get("provision_attempted_at")
approval_flags = row.get("approval_flags") if isinstance(row.get("approval_flags"), list) else []
if status == "approved":
conn.execute(
"""
UPDATE access_requests
SET status = 'accounts_building'
WHERE request_code = %s AND status = 'approved'
""",
(request_code,),
)
status = "accounts_building"
if status not in {"accounts_building", "awaiting_onboarding", "ready"}:
return ProvisionOutcome(ok=False, status=status or "unknown")
self._ensure_task_rows(conn, request_code, required_tasks)
if status == "accounts_building":
now = datetime.now(timezone.utc)
if isinstance(attempted_at, datetime):
if attempted_at.tzinfo is None:
attempted_at = attempted_at.replace(tzinfo=timezone.utc)
age_sec = (now - attempted_at).total_seconds()
if age_sec < settings.provision_retry_cooldown_sec:
return ProvisionOutcome(ok=False, status="accounts_building")
conn.execute(
"UPDATE access_requests SET provision_attempted_at = NOW() WHERE request_code = %s",
(request_code,),
)
user_id = ""
mailu_email = f"{username}@{settings.mailu_domain}"
# Task: ensure Keycloak user exists
start = datetime.now(timezone.utc)
try:
user = keycloak_admin.find_user(username)
if not user:
if not isinstance(email_verified_at, datetime):
raise RuntimeError("missing verified email address")
email = contact_email.strip()
if not email:
raise RuntimeError("missing verified email address")
existing_email_user = keycloak_admin.find_user_by_email(email)
if existing_email_user and (existing_email_user.get("username") or "") != username:
raise RuntimeError("email is already associated with an existing Atlas account")
payload = {
"username": username,
"enabled": True,
"email": email,
"emailVerified": True,
"requiredActions": [],
"attributes": {
MAILU_EMAIL_ATTR: [mailu_email],
MAILU_ENABLED_ATTR: ["true"],
},
}
created_id = keycloak_admin.create_user(payload)
user = keycloak_admin.get_user(created_id)
user_id = str((user or {}).get("id") or "")
if not user_id:
raise RuntimeError("user id missing")
try:
full = keycloak_admin.get_user(user_id)
attrs = full.get("attributes") or {}
actions = full.get("requiredActions")
if isinstance(actions, list) and "CONFIGURE_TOTP" in actions:
new_actions = [a for a in actions if a != "CONFIGURE_TOTP"]
keycloak_admin.update_user(user_id, {"requiredActions": new_actions})
if isinstance(attrs, dict):
existing = _extract_attr(attrs, MAILU_EMAIL_ATTR)
if existing:
mailu_email = existing
else:
mailu_email = f"{username}@{settings.mailu_domain}"
keycloak_admin.set_user_attribute(username, MAILU_EMAIL_ATTR, mailu_email)
enabled_value = _extract_attr(attrs, MAILU_ENABLED_ATTR)
if enabled_value.lower() not in {"1", "true", "yes", "y", "on"}:
keycloak_admin.set_user_attribute(username, MAILU_ENABLED_ATTR, "true")
except Exception:
mailu_email = f"{username}@{settings.mailu_domain}"
self._upsert_task(conn, request_code, "keycloak_user", "ok", None)
self._record_task(request_code, "keycloak_user", "ok", None, start)
except Exception as exc:
detail = safe_error_detail(exc, "failed to ensure user")
self._upsert_task(conn, request_code, "keycloak_user", "error", detail)
self._record_task(request_code, "keycloak_user", "error", detail, start)
if not user_id:
return ProvisionOutcome(ok=False, status="accounts_building")
# Task: set initial password for Keycloak
start = datetime.now(timezone.utc)
try:
should_reset = status == "accounts_building" and revealed_at is None
password_value: str | None = None
if should_reset:
if isinstance(initial_password, str) and initial_password:
password_value = initial_password
elif initial_password is None:
password_value = random_password(20)
conn.execute(
"""
UPDATE access_requests
SET initial_password = %s
WHERE request_code = %s AND initial_password IS NULL
""",
(password_value, request_code),
)
initial_password = password_value
if password_value:
keycloak_admin.reset_password(user_id, password_value, temporary=False)
if isinstance(initial_password, str) and initial_password:
self._upsert_task(conn, request_code, "keycloak_password", "ok", None)
self._record_task(request_code, "keycloak_password", "ok", None, start)
elif revealed_at is not None:
detail = "initial password already revealed"
self._upsert_task(conn, request_code, "keycloak_password", "ok", detail)
self._record_task(request_code, "keycloak_password", "ok", detail, start)
else:
raise RuntimeError("initial password missing")
except Exception as exc:
detail = safe_error_detail(exc, "failed to set password")
self._upsert_task(conn, request_code, "keycloak_password", "error", detail)
self._record_task(request_code, "keycloak_password", "error", detail, start)
# Task: group membership
start = datetime.now(timezone.utc)
try:
approved_flags = [flag for flag in approval_flags if flag in settings.allowed_flag_groups]
groups = list(dict.fromkeys(settings.default_user_groups + approved_flags))
for group_name in groups:
gid = keycloak_admin.get_group_id(group_name)
if not gid:
raise RuntimeError(f"group missing: {group_name}")
keycloak_admin.add_user_to_group(user_id, gid)
self._upsert_task(conn, request_code, "keycloak_groups", "ok", None)
self._record_task(request_code, "keycloak_groups", "ok", None, start)
except Exception as exc:
detail = safe_error_detail(exc, "failed to add groups")
self._upsert_task(conn, request_code, "keycloak_groups", "error", detail)
self._record_task(request_code, "keycloak_groups", "error", detail, start)
# Task: ensure mailu app password exists
start = datetime.now(timezone.utc)
try:
full = keycloak_admin.get_user(user_id)
attrs = full.get("attributes") or {}
existing = _extract_attr(attrs, MAILU_APP_PASSWORD_ATTR)
if not existing:
keycloak_admin.set_user_attribute(username, MAILU_APP_PASSWORD_ATTR, random_password())
self._upsert_task(conn, request_code, "mailu_app_password", "ok", None)
self._record_task(request_code, "mailu_app_password", "ok", None, start)
except Exception as exc:
detail = safe_error_detail(exc, "failed to set mail password")
self._upsert_task(conn, request_code, "mailu_app_password", "error", detail)
self._record_task(request_code, "mailu_app_password", "error", detail, start)
# Task: trigger Mailu sync
start = datetime.now(timezone.utc)
try:
if not settings.mailu_sync_url:
detail = "sync disabled"
self._upsert_task(conn, request_code, "mailu_sync", "ok", detail)
self._record_task(request_code, "mailu_sync", "ok", detail, start)
else:
mailu.sync(reason="ariadne_access_approve")
self._upsert_task(conn, request_code, "mailu_sync", "ok", None)
self._record_task(request_code, "mailu_sync", "ok", None, start)
except Exception as exc:
detail = safe_error_detail(exc, "failed to sync mailu")
self._upsert_task(conn, request_code, "mailu_sync", "error", detail)
self._record_task(request_code, "mailu_sync", "error", detail, start)
# Task: trigger Nextcloud mail sync
start = datetime.now(timezone.utc)
try:
if not settings.nextcloud_namespace or not settings.nextcloud_mail_sync_cronjob:
detail = "sync disabled"
self._upsert_task(conn, request_code, "nextcloud_mail_sync", "ok", detail)
self._record_task(request_code, "nextcloud_mail_sync", "ok", detail, start)
else:
result = nextcloud.sync_mail(username, wait=True)
if isinstance(result, dict) and result.get("status") == "ok":
self._upsert_task(conn, request_code, "nextcloud_mail_sync", "ok", None)
self._record_task(request_code, "nextcloud_mail_sync", "ok", None, start)
else:
status_val = result.get("status") if isinstance(result, dict) else "error"
detail = str(status_val)
self._upsert_task(conn, request_code, "nextcloud_mail_sync", "error", detail)
self._record_task(request_code, "nextcloud_mail_sync", "error", detail, start)
except Exception as exc:
detail = safe_error_detail(exc, "failed to sync nextcloud")
self._upsert_task(conn, request_code, "nextcloud_mail_sync", "error", detail)
self._record_task(request_code, "nextcloud_mail_sync", "error", detail, start)
# Task: ensure wger account exists
start = datetime.now(timezone.utc)
try:
full = keycloak_admin.get_user(user_id)
attrs = full.get("attributes") or {}
wger_password = _extract_attr(attrs, WGER_PASSWORD_ATTR)
wger_password_updated_at = _extract_attr(attrs, WGER_PASSWORD_UPDATED_ATTR)
if not wger_password:
wger_password = random_password(20)
keycloak_admin.set_user_attribute(username, WGER_PASSWORD_ATTR, wger_password)
if not wger_password_updated_at:
result = wger.sync_user(username, mailu_email, wger_password, wait=True)
status_val = result.get("status") if isinstance(result, dict) else "error"
if status_val != "ok":
raise RuntimeError(f"wger sync {status_val}")
now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
keycloak_admin.set_user_attribute(username, WGER_PASSWORD_UPDATED_ATTR, now_iso)
self._upsert_task(conn, request_code, "wger_account", "ok", None)
self._record_task(request_code, "wger_account", "ok", None, start)
except Exception as exc:
detail = safe_error_detail(exc, "failed to provision wger")
self._upsert_task(conn, request_code, "wger_account", "error", detail)
self._record_task(request_code, "wger_account", "error", detail, start)
# Task: ensure firefly account exists
start = datetime.now(timezone.utc)
try:
full = keycloak_admin.get_user(user_id)
attrs = full.get("attributes") or {}
firefly_password = _extract_attr(attrs, FIREFLY_PASSWORD_ATTR)
firefly_password_updated_at = _extract_attr(attrs, FIREFLY_PASSWORD_UPDATED_ATTR)
if not firefly_password:
firefly_password = random_password(24)
keycloak_admin.set_user_attribute(username, FIREFLY_PASSWORD_ATTR, firefly_password)
if not firefly_password_updated_at:
result = firefly.sync_user(mailu_email, firefly_password, wait=True)
status_val = result.get("status") if isinstance(result, dict) else "error"
if status_val != "ok":
raise RuntimeError(f"firefly sync {status_val}")
now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
keycloak_admin.set_user_attribute(username, FIREFLY_PASSWORD_UPDATED_ATTR, now_iso)
self._upsert_task(conn, request_code, "firefly_account", "ok", None)
self._record_task(request_code, "firefly_account", "ok", None, start)
except Exception as exc:
detail = safe_error_detail(exc, "failed to provision firefly")
self._upsert_task(conn, request_code, "firefly_account", "error", detail)
self._record_task(request_code, "firefly_account", "error", detail, start)
# Task: ensure Vaultwarden account exists (invite flow)
start = datetime.now(timezone.utc)
try:
if not mailu.wait_for_mailbox(mailu_email, settings.mailu_mailbox_wait_timeout_sec):
raise RuntimeError("mailbox not ready")
result = vaultwarden.invite_user(mailu_email)
if result.ok:
self._upsert_task(conn, request_code, "vaultwarden_invite", "ok", result.status)
self._record_task(request_code, "vaultwarden_invite", "ok", result.status, start)
else:
detail = result.detail or result.status
self._upsert_task(conn, request_code, "vaultwarden_invite", "error", detail)
self._record_task(request_code, "vaultwarden_invite", "error", detail, start)
try:
now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
keycloak_admin.set_user_attribute(username, "vaultwarden_email", mailu_email)
keycloak_admin.set_user_attribute(username, "vaultwarden_status", result.status)
keycloak_admin.set_user_attribute(username, "vaultwarden_synced_at", now_iso)
except Exception:
pass
except Exception as exc:
detail = safe_error_detail(exc, "failed to provision vaultwarden")
self._upsert_task(conn, request_code, "vaultwarden_invite", "error", detail)
self._record_task(request_code, "vaultwarden_invite", "error", detail, start)
if self._all_tasks_ok(conn, request_code, required_tasks):
conn.execute(
"""
UPDATE access_requests
SET status = 'awaiting_onboarding'
WHERE request_code = %s AND status = 'accounts_building'
""",
(request_code,),
)
self._send_welcome_email(request_code, username, contact_email)
return ProvisionOutcome(ok=True, status="awaiting_onboarding")
return ProvisionOutcome(ok=False, status="accounts_building")
finally:
conn.execute("SELECT pg_advisory_unlock(%s)", (lock_id,))
def _ensure_task_rows(self, conn, request_code: str, tasks: list[str]) -> None:
if not tasks:
return
conn.execute(
"""
INSERT INTO access_request_tasks (request_code, task, status, detail, updated_at)
SELECT %s, task, 'pending', NULL, NOW()
FROM UNNEST(%s::text[]) AS task
ON CONFLICT (request_code, task) DO NOTHING
""",
(request_code, tasks),
)
def _upsert_task(self, conn, request_code: str, task: str, status: str, detail: str | None = None) -> None:
conn.execute(
"""
INSERT INTO access_request_tasks (request_code, task, status, detail, updated_at)
VALUES (%s, %s, %s, %s, NOW())
ON CONFLICT (request_code, task)
DO UPDATE SET status = EXCLUDED.status, detail = EXCLUDED.detail, updated_at = NOW()
""",
(request_code, task, status, detail),
)
def _task_statuses(self, conn, request_code: str) -> dict[str, str]:
rows = conn.execute(
"SELECT task, status FROM access_request_tasks WHERE request_code = %s",
(request_code,),
).fetchall()
output: dict[str, str] = {}
for row in rows:
task = row.get("task") if isinstance(row, dict) else None
status = row.get("status") if isinstance(row, dict) else None
if isinstance(task, str) and isinstance(status, str):
output[task] = status
return output
def _all_tasks_ok(self, conn, request_code: str, tasks: list[str]) -> bool:
statuses = self._task_statuses(conn, request_code)
for task in tasks:
if statuses.get(task) != "ok":
return False
return True
def _record_task(self, request_code: str, task: str, status: str, detail: str | None, started: datetime) -> None:
finished = datetime.now(timezone.utc)
duration_sec = (finished - started).total_seconds()
record_task_run(task, status, duration_sec)
try:
self._storage.record_task_run(
request_code,
task,
status,
detail,
started,
finished,
int(duration_sec * 1000),
)
except Exception:
pass
def _send_welcome_email(self, request_code: str, username: str, contact_email: str) -> None:
if not settings.welcome_email_enabled:
return
if not contact_email:
return
try:
row = self._db.fetchone(
"SELECT welcome_email_sent_at FROM access_requests WHERE request_code = %s",
(request_code,),
)
if row and row.get("welcome_email_sent_at"):
return
onboarding_url = f"{settings.portal_public_base_url}/onboarding?code={request_code}"
mailer.send_welcome(contact_email, request_code, onboarding_url, username=username)
self._storage.mark_welcome_sent(request_code)
except MailerError:
return

View File

@ -0,0 +1,71 @@
from __future__ import annotations
from prometheus_client import Counter, Gauge, Histogram
TASK_RUNS_TOTAL = Counter(
"ariadne_task_runs_total",
"Ariadne task runs by status",
["task", "status"],
)
TASK_DURATION_SECONDS = Histogram(
"ariadne_task_duration_seconds",
"Ariadne task durations in seconds",
["task", "status"],
buckets=(0.5, 1, 2, 5, 10, 30, 60, 120, 300),
)
SCHEDULE_LAST_RUN_TS = Gauge(
"ariadne_schedule_last_run_timestamp_seconds",
"Last schedule run timestamp",
["task"],
)
SCHEDULE_LAST_SUCCESS_TS = Gauge(
"ariadne_schedule_last_success_timestamp_seconds",
"Last successful schedule run timestamp",
["task"],
)
SCHEDULE_NEXT_RUN_TS = Gauge(
"ariadne_schedule_next_run_timestamp_seconds",
"Next scheduled run timestamp",
["task"],
)
SCHEDULE_STATUS = Gauge(
"ariadne_schedule_last_status",
"Last schedule status (1=ok,0=error)",
["task"],
)
ACCESS_REQUESTS = Gauge(
"ariadne_access_requests_total",
"Access requests by status",
["status"],
)
def record_task_run(task: str, status: str, duration_sec: float | None) -> None:
TASK_RUNS_TOTAL.labels(task=task, status=status).inc()
if duration_sec is not None:
TASK_DURATION_SECONDS.labels(task=task, status=status).observe(duration_sec)
def record_schedule_state(
task: str,
last_run_ts: float | None,
last_success_ts: float | None,
next_run_ts: float | None,
ok: bool | None,
) -> None:
if last_run_ts:
SCHEDULE_LAST_RUN_TS.labels(task=task).set(last_run_ts)
if last_success_ts:
SCHEDULE_LAST_SUCCESS_TS.labels(task=task).set(last_success_ts)
if next_run_ts:
SCHEDULE_NEXT_RUN_TS.labels(task=task).set(next_run_ts)
if ok is not None:
SCHEDULE_STATUS.labels(task=task).set(1 if ok else 0)
def set_access_request_counts(counts: dict[str, int]) -> None:
for status, count in counts.items():
ACCESS_REQUESTS.labels(status=status).set(count)

125
ariadne/scheduler/cron.py Normal file
View File

@ -0,0 +1,125 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
import threading
import time
from typing import Callable
from croniter import croniter
from ..db.storage import Storage
from ..metrics.metrics import record_schedule_state, record_task_run
@dataclass(frozen=True)
class CronTask:
name: str
cron_expr: str
runner: Callable[[], None]
class CronScheduler:
def __init__(self, storage: Storage, tick_sec: float = 5.0) -> None:
self._storage = storage
self._tick_sec = tick_sec
self._tasks: dict[str, CronTask] = {}
self._next_run: dict[str, datetime] = {}
self._running: set[str] = set()
self._lock = threading.Lock()
self._stop_event = threading.Event()
self._thread: threading.Thread | None = None
def add_task(self, name: str, cron_expr: str, runner: Callable[[], None]) -> None:
task = CronTask(name=name, cron_expr=cron_expr, runner=runner)
self._tasks[name] = task
self._next_run[name] = self._compute_next(cron_expr, datetime.now(timezone.utc))
def start(self) -> None:
if self._thread and self._thread.is_alive():
return
self._stop_event.clear()
self._thread = threading.Thread(target=self._run_loop, name="ariadne-scheduler", daemon=True)
self._thread.start()
def stop(self) -> None:
self._stop_event.set()
if self._thread:
self._thread.join(timeout=5)
def _compute_next(self, cron_expr: str, base: datetime) -> datetime:
itr = croniter(cron_expr, base)
next_time = itr.get_next(datetime)
if next_time.tzinfo is None:
return next_time.replace(tzinfo=timezone.utc)
return next_time
def _run_loop(self) -> None:
while not self._stop_event.is_set():
now = datetime.now(timezone.utc)
for name, task in list(self._tasks.items()):
next_run = self._next_run.get(name)
if next_run and now >= next_run:
with self._lock:
if name in self._running:
continue
self._running.add(name)
self._next_run[name] = self._compute_next(task.cron_expr, now)
threading.Thread(
target=self._execute_task,
args=(task,),
name=f"ariadne-scheduler-{name}",
daemon=True,
).start()
record_schedule_state(
name,
None,
None,
self._next_run.get(name).timestamp() if self._next_run.get(name) else None,
None,
)
time.sleep(self._tick_sec)
def _execute_task(self, task: CronTask) -> None:
started = datetime.now(timezone.utc)
status = "ok"
detail = None
try:
task.runner()
except Exception as exc:
status = "error"
detail = str(exc).strip() or "task failed"
finished = datetime.now(timezone.utc)
duration_sec = (finished - started).total_seconds()
record_task_run(task.name, status, duration_sec)
record_schedule_state(
task.name,
started.timestamp(),
started.timestamp() if status == "ok" else None,
self._next_run.get(task.name).timestamp() if self._next_run.get(task.name) else None,
status == "ok",
)
try:
self._storage.record_task_run(
None,
task.name,
status,
detail,
started,
finished,
int(duration_sec * 1000),
)
self._storage.update_schedule_state(
task.name,
task.cron_expr,
started,
finished,
status,
detail,
int(duration_sec * 1000),
self._next_run.get(task.name),
)
except Exception:
pass
with self._lock:
self._running.discard(task.name)

View File

@ -0,0 +1,37 @@
from __future__ import annotations
from typing import Any
from ..k8s.jobs import JobSpawner
from ..settings import settings
class FireflyService:
def __init__(self) -> None:
self._spawner = JobSpawner(settings.firefly_namespace, settings.firefly_user_sync_cronjob)
def sync_user(self, email: str, password: str, wait: bool = True) -> dict[str, Any]:
email = (email or "").strip()
if not email:
raise RuntimeError("missing email")
if not password:
raise RuntimeError("missing password")
if not settings.firefly_namespace or not settings.firefly_user_sync_cronjob:
raise RuntimeError("firefly sync not configured")
env_overrides = [
{"name": "FIREFLY_USER_EMAIL", "value": email},
{"name": "FIREFLY_USER_PASSWORD", "value": password},
]
label_suffix = email.split("@", 1)[0] if "@" in email else email
if wait:
return self._spawner.trigger_and_wait(
label_suffix,
env_overrides,
settings.firefly_user_sync_wait_timeout_sec,
)
return self._spawner.trigger(label_suffix, env_overrides)
firefly = FireflyService()

View File

@ -0,0 +1,192 @@
from __future__ import annotations
from typing import Any
import time
import httpx
from ..settings import settings
class KeycloakAdminClient:
def __init__(self) -> None:
self._token: str = ""
self._expires_at: float = 0.0
self._group_id_cache: dict[str, str] = {}
def ready(self) -> bool:
return bool(settings.keycloak_admin_client_id and settings.keycloak_admin_client_secret)
def _get_token(self) -> str:
if not self.ready():
raise RuntimeError("keycloak admin client not configured")
now = time.time()
if self._token and now < self._expires_at - 30:
return self._token
token_url = (
f"{settings.keycloak_admin_url}/realms/{settings.keycloak_admin_realm}/protocol/openid-connect/token"
)
data = {
"grant_type": "client_credentials",
"client_id": settings.keycloak_admin_client_id,
"client_secret": settings.keycloak_admin_client_secret,
}
with httpx.Client(timeout=10.0) as client:
resp = client.post(token_url, data=data)
resp.raise_for_status()
payload = resp.json()
token = payload.get("access_token") or ""
if not token:
raise RuntimeError("no access_token in response")
expires_in = int(payload.get("expires_in") or 60)
self._token = token
self._expires_at = now + expires_in
return token
def _headers(self) -> dict[str, str]:
return {"Authorization": f"Bearer {self._get_token()}"}
def headers(self) -> dict[str, str]:
return self._headers()
def find_user(self, username: str) -> dict[str, Any] | None:
url = f"{settings.keycloak_admin_url}/admin/realms/{settings.keycloak_realm}/users"
params = {"username": username, "exact": "true", "max": "1"}
with httpx.Client(timeout=10.0) as client:
resp = client.get(url, params=params, headers=self._headers())
resp.raise_for_status()
users = resp.json()
if not isinstance(users, list) or not users:
return None
user = users[0]
return user if isinstance(user, dict) else None
def find_user_by_email(self, email: str) -> dict[str, Any] | None:
email = (email or "").strip()
if not email:
return None
url = f"{settings.keycloak_admin_url}/admin/realms/{settings.keycloak_realm}/users"
params = {"email": email, "exact": "true", "max": "2"}
email_norm = email.lower()
with httpx.Client(timeout=10.0) as client:
resp = client.get(url, params=params, headers=self._headers())
resp.raise_for_status()
users = resp.json()
if not isinstance(users, list) or not users:
return None
for user in users:
if not isinstance(user, dict):
continue
candidate = user.get("email")
if isinstance(candidate, str) and candidate.strip().lower() == email_norm:
return user
return None
def get_user(self, user_id: str) -> dict[str, Any]:
url = f"{settings.keycloak_admin_url}/admin/realms/{settings.keycloak_realm}/users/{user_id}"
with httpx.Client(timeout=10.0) as client:
resp = client.get(url, headers=self._headers())
resp.raise_for_status()
data = resp.json()
if not isinstance(data, dict):
raise RuntimeError("unexpected user payload")
return data
def update_user(self, user_id: str, payload: dict[str, Any]) -> None:
url = f"{settings.keycloak_admin_url}/admin/realms/{settings.keycloak_realm}/users/{user_id}"
with httpx.Client(timeout=10.0) as client:
resp = client.put(url, headers={**self._headers(), "Content-Type": "application/json"}, json=payload)
resp.raise_for_status()
def create_user(self, payload: dict[str, Any]) -> str:
url = f"{settings.keycloak_admin_url}/admin/realms/{settings.keycloak_realm}/users"
with httpx.Client(timeout=10.0) as client:
resp = client.post(url, headers={**self._headers(), "Content-Type": "application/json"}, json=payload)
resp.raise_for_status()
location = resp.headers.get("Location") or ""
if location:
return location.rstrip("/").split("/")[-1]
raise RuntimeError("failed to determine created user id")
def reset_password(self, user_id: str, password: str, temporary: bool = False) -> None:
url = (
f"{settings.keycloak_admin_url}/admin/realms/{settings.keycloak_realm}"
f"/users/{user_id}/reset-password"
)
payload = {"type": "password", "value": password, "temporary": bool(temporary)}
with httpx.Client(timeout=10.0) as client:
resp = client.put(url, headers={**self._headers(), "Content-Type": "application/json"}, json=payload)
resp.raise_for_status()
def set_user_attribute(self, username: str, key: str, value: str) -> None:
user = self.find_user(username)
if not user:
raise RuntimeError("user not found")
user_id = user.get("id") or ""
if not user_id:
raise RuntimeError("user id missing")
full = self.get_user(user_id)
attrs = full.get("attributes") or {}
if not isinstance(attrs, dict):
attrs = {}
attrs[key] = [value]
self.update_user(user_id, {"attributes": attrs})
def get_group_id(self, group_name: str) -> str | None:
cached = self._group_id_cache.get(group_name)
if cached:
return cached
url = f"{settings.keycloak_admin_url}/admin/realms/{settings.keycloak_realm}/groups"
params = {"search": group_name}
with httpx.Client(timeout=10.0) as client:
resp = client.get(url, params=params, headers=self._headers())
resp.raise_for_status()
items = resp.json()
if not isinstance(items, list):
return None
for item in items:
if not isinstance(item, dict):
continue
if item.get("name") == group_name and item.get("id"):
gid = str(item["id"])
self._group_id_cache[group_name] = gid
return gid
return None
def add_user_to_group(self, user_id: str, group_id: str) -> None:
url = (
f"{settings.keycloak_admin_url}/admin/realms/{settings.keycloak_realm}"
f"/users/{user_id}/groups/{group_id}"
)
with httpx.Client(timeout=10.0) as client:
resp = client.put(url, headers=self._headers())
resp.raise_for_status()
def iter_users(self, page_size: int = 200, brief: bool = False) -> list[dict[str, Any]]:
url = f"{settings.keycloak_admin_url}/admin/realms/{settings.keycloak_realm}/users"
users: list[dict[str, Any]] = []
first = 0
while True:
params = {"first": str(first), "max": str(page_size)}
if not brief:
params["briefRepresentation"] = "false"
with httpx.Client(timeout=10.0) as client:
resp = client.get(url, params=params, headers=self._headers())
resp.raise_for_status()
payload = resp.json()
if not isinstance(payload, list) or not payload:
return users
for item in payload:
if isinstance(item, dict):
users.append(item)
if len(payload) < page_size:
return users
first += page_size
keycloak_admin = KeycloakAdminClient()

102
ariadne/services/mailer.py Normal file
View File

@ -0,0 +1,102 @@
from __future__ import annotations
from dataclasses import dataclass
from email.message import EmailMessage
import smtplib
from typing import Iterable
from ..settings import settings
class MailerError(RuntimeError):
pass
@dataclass(frozen=True)
class SentEmail:
ok: bool
detail: str = ""
class Mailer:
def __init__(self) -> None:
self._host = settings.smtp_host
self._port = settings.smtp_port
self._username = settings.smtp_username
self._password = settings.smtp_password
self._from_addr = settings.smtp_from
self._starttls = settings.smtp_starttls
self._use_tls = settings.smtp_use_tls
self._timeout = settings.smtp_timeout_sec
def send(self, subject: str, to_addrs: Iterable[str], text_body: str, html_body: str | None = None) -> SentEmail:
if not self._host:
raise MailerError("smtp host not configured")
message = EmailMessage()
message["Subject"] = subject
message["From"] = self._from_addr
message["To"] = ", ".join(to_addrs)
message.set_content(text_body)
if html_body:
message.add_alternative(html_body, subtype="html")
try:
if self._use_tls:
server: smtplib.SMTP = smtplib.SMTP_SSL(self._host, self._port, timeout=self._timeout)
else:
server = smtplib.SMTP(self._host, self._port, timeout=self._timeout)
with server:
server.ehlo()
if self._starttls:
server.starttls()
server.ehlo()
if self._username:
server.login(self._username, self._password)
server.send_message(message)
return SentEmail(ok=True, detail="sent")
except Exception as exc:
raise MailerError(str(exc)) from exc
def send_welcome(self, to_addr: str, request_code: str, onboarding_url: str, username: str | None = None) -> SentEmail:
display = username or "there"
subject = "Welcome to Titan Lab"
text_body = "\n".join(
[
f"Hi {display},",
"",
"Your Titan Lab access is approved.",
f"Complete onboarding here: {onboarding_url}",
"",
f"Request code: {request_code}",
"",
"If you did not request access, ignore this email.",
"",
"— Titan Lab",
]
)
html_body = f"""
<html>
<body style="font-family: Arial, sans-serif; background:#f6f6f4; padding:24px;">
<table style="max-width:600px; background:#ffffff; padding:24px; border-radius:12px; margin:0 auto; box-shadow:0 6px 24px rgba(0,0,0,0.08);">
<tr><td>
<h1 style="margin:0 0 8px; font-size:24px; color:#111827;">Welcome to Titan Lab</h1>
<p style="margin:0 0 16px; color:#4b5563;">Hi {display}, your access has been approved.</p>
<p style="margin:0 0 24px; color:#111827;">
<a href="{onboarding_url}" style="display:inline-block; background:#111827; color:#ffffff; padding:10px 16px; border-radius:8px; text-decoration:none;">
Start onboarding
</a>
</p>
<p style="margin:0 0 8px; color:#6b7280; font-size:14px;">Request code: <strong>{request_code}</strong></p>
<p style="margin:0; color:#9ca3af; font-size:12px;">If you did not request access, ignore this email.</p>
</td></tr>
</table>
</body>
</html>
""".strip()
return self.send(subject, [to_addr], text_body, html_body)
mailer = Mailer()

63
ariadne/services/mailu.py Normal file
View File

@ -0,0 +1,63 @@
from __future__ import annotations
import time
from typing import Any
import httpx
import psycopg
from ..settings import settings
class MailuService:
def __init__(self) -> None:
self._db_config = {
"host": settings.mailu_db_host,
"port": settings.mailu_db_port,
"dbname": settings.mailu_db_name,
"user": settings.mailu_db_user,
"password": settings.mailu_db_password,
}
def mailbox_exists(self, email: str) -> bool:
email = (email or "").strip()
if not email:
return False
try:
with psycopg.connect(**self._db_config) as conn:
with conn.cursor() as cur:
cur.execute('SELECT 1 FROM "user" WHERE email = %s LIMIT 1', (email,))
return cur.fetchone() is not None
except Exception:
return False
def wait_for_mailbox(self, email: str, timeout_sec: float = 60.0) -> bool:
deadline = time.time() + timeout_sec
while time.time() < deadline:
if self.mailbox_exists(email):
return True
time.sleep(2)
return False
def sync(self, reason: str) -> None:
if not settings.mailu_sync_url:
return
with httpx.Client(timeout=settings.mailu_sync_wait_timeout_sec) as client:
resp = client.post(
settings.mailu_sync_url,
json={"ts": int(time.time()), "wait": True, "reason": reason},
)
if resp.status_code != 200:
raise RuntimeError(f"mailu sync failed status={resp.status_code}")
@staticmethod
def resolve_mailu_email(username: str, attributes: dict[str, Any] | None) -> str:
attrs = attributes or {}
raw = attrs.get("mailu_email") if isinstance(attrs, dict) else None
if isinstance(raw, list):
for item in raw:
if isinstance(item, str) and item.strip():
return item.strip()
if isinstance(raw, str) and raw.strip():
return raw.strip()
return f"{username}@{settings.mailu_domain}"

View File

@ -0,0 +1,37 @@
from __future__ import annotations
from typing import Any
from ..k8s.jobs import JobSpawner
from ..settings import settings
class NextcloudService:
def __init__(self) -> None:
self._spawner = JobSpawner(settings.nextcloud_namespace, settings.nextcloud_mail_sync_cronjob)
def sync_mail(self, username: str | None = None, wait: bool = True) -> dict[str, Any]:
if not settings.nextcloud_namespace or not settings.nextcloud_mail_sync_cronjob:
raise RuntimeError("nextcloud mail sync not configured")
env_overrides = None
label_suffix = "all"
if username:
cleaned = (username or "").strip()
if not cleaned:
raise RuntimeError("missing username")
env_overrides = [{"name": "ONLY_USERNAME", "value": cleaned}]
label_suffix = cleaned
ttl = settings.nextcloud_mail_sync_job_ttl_sec
if wait:
return self._spawner.trigger_and_wait(
label_suffix,
env_overrides,
settings.nextcloud_mail_sync_wait_timeout_sec,
job_ttl_seconds=ttl,
)
return self._spawner.trigger(label_suffix, env_overrides, job_ttl_seconds=ttl)
nextcloud = NextcloudService()

View File

@ -0,0 +1,151 @@
from __future__ import annotations
import threading
import time
from dataclasses import dataclass
import httpx
from ..k8s.client import get_secret_value, get_json
from ..settings import settings
@dataclass(frozen=True)
class VaultwardenInvite:
ok: bool
status: str
detail: str = ""
class VaultwardenService:
def __init__(self) -> None:
self._admin_lock = threading.Lock()
self._admin_session: httpx.Client | None = None
self._admin_session_expires_at: float = 0.0
self._admin_session_base_url: str = ""
self._rate_limited_until: float = 0.0
def invite_user(self, email: str) -> VaultwardenInvite:
email = (email or "").strip()
if not email or "@" not in email:
return VaultwardenInvite(ok=False, status="invalid_email", detail="email invalid")
if self._rate_limited_until and time.time() < self._rate_limited_until:
return VaultwardenInvite(ok=False, status="rate_limited", detail="vaultwarden rate limited")
base_url = f"http://{settings.vaultwarden_service_host}"
fallback_url = ""
try:
pod_ip = self._find_pod_ip(settings.vaultwarden_namespace, settings.vaultwarden_pod_label)
fallback_url = f"http://{pod_ip}:{settings.vaultwarden_pod_port}"
except Exception:
fallback_url = ""
last_error = ""
for candidate in [base_url, fallback_url]:
if not candidate:
continue
try:
session = self._admin_session(candidate)
resp = session.post("/admin/invite", json={"email": email})
if resp.status_code == 429:
self._rate_limited_until = time.time() + float(settings.vaultwarden_admin_rate_limit_backoff_sec)
return VaultwardenInvite(ok=False, status="rate_limited", detail="vaultwarden rate limited")
if resp.status_code in {200, 201, 204}:
return VaultwardenInvite(ok=True, status="invited", detail="invite created")
body = ""
try:
body = resp.text or ""
except Exception:
body = ""
if resp.status_code in {400, 409} and any(
marker in body.lower()
for marker in (
"already invited",
"already exists",
"already registered",
"user already exists",
)
):
return VaultwardenInvite(ok=True, status="already_present", detail="user already present")
last_error = f"status {resp.status_code}"
except Exception as exc:
last_error = str(exc)
if "rate limited" in last_error.lower():
return VaultwardenInvite(ok=False, status="rate_limited", detail="vaultwarden rate limited")
continue
return VaultwardenInvite(ok=False, status="error", detail=last_error or "failed to invite")
def _admin_session(self, base_url: str) -> httpx.Client:
now = time.time()
with self._admin_lock:
if self._rate_limited_until and now < self._rate_limited_until:
raise RuntimeError("vaultwarden rate limited")
if self._admin_session and now < self._admin_session_expires_at and self._admin_session_base_url == base_url:
return self._admin_session
if self._admin_session:
try:
self._admin_session.close()
except Exception:
pass
self._admin_session = None
token = get_secret_value(
settings.vaultwarden_namespace,
settings.vaultwarden_admin_secret_name,
settings.vaultwarden_admin_secret_key,
)
client = httpx.Client(
base_url=base_url,
timeout=10.0,
follow_redirects=True,
headers={"User-Agent": "ariadne/1"},
)
resp = client.post("/admin", data={"token": token})
if resp.status_code == 429:
self._rate_limited_until = now + float(settings.vaultwarden_admin_rate_limit_backoff_sec)
raise RuntimeError("vaultwarden rate limited")
resp.raise_for_status()
self._admin_session = client
self._admin_session_base_url = base_url
self._admin_session_expires_at = now + float(settings.vaultwarden_admin_session_ttl_sec)
return client
@staticmethod
def _find_pod_ip(namespace: str, label_selector: str) -> str:
data = get_json(f"/api/v1/namespaces/{namespace}/pods?labelSelector={label_selector}")
items = data.get("items") or []
if not isinstance(items, list) or not items:
raise RuntimeError("no vaultwarden pods found")
def _pod_ready(pod: dict) -> bool:
status = pod.get("status") if isinstance(pod.get("status"), dict) else {}
if status.get("phase") != "Running":
return False
ip = status.get("podIP")
if not isinstance(ip, str) or not ip:
return False
conditions = status.get("conditions") if isinstance(status.get("conditions"), list) else []
for cond in conditions:
if not isinstance(cond, dict):
continue
if cond.get("type") == "Ready":
return cond.get("status") == "True"
return True
ready = [p for p in items if isinstance(p, dict) and _pod_ready(p)]
candidates = ready or [p for p in items if isinstance(p, dict)]
status = candidates[0].get("status") or {}
ip = status.get("podIP") if isinstance(status, dict) else None
if not isinstance(ip, str) or not ip:
raise RuntimeError("vaultwarden pod has no IP")
return ip
vaultwarden = VaultwardenService()

View File

@ -0,0 +1,197 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
import time
from typing import Any
from ..settings import settings
from .keycloak_admin import keycloak_admin
from .mailu import mailu
from .vaultwarden import vaultwarden
VAULTWARDEN_EMAIL_ATTR = "vaultwarden_email"
VAULTWARDEN_STATUS_ATTR = "vaultwarden_status"
VAULTWARDEN_SYNCED_AT_ATTR = "vaultwarden_synced_at"
@dataclass(frozen=True)
class VaultwardenSyncSummary:
processed: int
created_or_present: int
skipped: int
failures: int
detail: str = ""
def _extract_attr(attrs: Any, key: str) -> str:
if not isinstance(attrs, dict):
return ""
raw = attrs.get(key)
if isinstance(raw, list):
for item in raw:
if isinstance(item, str) and item.strip():
return item.strip()
return ""
if isinstance(raw, str) and raw.strip():
return raw.strip()
return ""
def _parse_synced_at(value: str) -> float | None:
value = (value or "").strip()
if not value:
return None
for fmt in ("%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z"):
try:
parsed = datetime.strptime(value, fmt)
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
return parsed.timestamp()
except ValueError:
continue
return None
def _vaultwarden_email_for_user(user: dict[str, Any]) -> str:
username = (user.get("username") if isinstance(user.get("username"), str) else "") or ""
username = username.strip()
if not username:
return ""
attrs = user.get("attributes")
vaultwarden_email = _extract_attr(attrs, VAULTWARDEN_EMAIL_ATTR)
if vaultwarden_email:
return vaultwarden_email
mailu_email = _extract_attr(attrs, "mailu_email")
if mailu_email:
return mailu_email
email = (user.get("email") if isinstance(user.get("email"), str) else "") or ""
email = email.strip()
if email and email.lower().endswith(f"@{settings.mailu_domain.lower()}"):
return email
return ""
def _set_user_attribute_if_missing(username: str, user: dict[str, Any], key: str, value: str) -> None:
value = (value or "").strip()
if not value:
return
existing = _extract_attr(user.get("attributes"), key)
if existing:
return
keycloak_admin.set_user_attribute(username, key, value)
def _set_user_attribute(username: str, key: str, value: str) -> None:
value = (value or "").strip()
if not value:
return
keycloak_admin.set_user_attribute(username, key, value)
def run_vaultwarden_sync() -> VaultwardenSyncSummary:
processed = 0
created = 0
skipped = 0
failures = 0
consecutive_failures = 0
if not keycloak_admin.ready():
return VaultwardenSyncSummary(0, 0, 0, 1, detail="keycloak admin not configured")
users = keycloak_admin.iter_users(page_size=200, brief=False)
for user in users:
username = (user.get("username") if isinstance(user.get("username"), str) else "") or ""
username = username.strip()
if not username:
skipped += 1
continue
enabled = user.get("enabled")
if enabled is False:
skipped += 1
continue
if user.get("serviceAccountClientId") or username.startswith("service-account-"):
skipped += 1
continue
user_id = (user.get("id") if isinstance(user.get("id"), str) else "") or ""
full_user = user
if user_id:
try:
full_user = keycloak_admin.get_user(user_id)
except Exception:
full_user = user
current_status = _extract_attr(full_user.get("attributes"), VAULTWARDEN_STATUS_ATTR)
current_synced_at = _extract_attr(full_user.get("attributes"), VAULTWARDEN_SYNCED_AT_ATTR)
current_synced_ts = _parse_synced_at(current_synced_at)
if current_status in {"rate_limited", "error"} and current_synced_ts:
if time.time() - current_synced_ts < settings.vaultwarden_retry_cooldown_sec:
skipped += 1
continue
email = _vaultwarden_email_for_user(full_user)
if not email:
skipped += 1
continue
if not mailu.mailbox_exists(email):
skipped += 1
continue
try:
_set_user_attribute_if_missing(username, full_user, VAULTWARDEN_EMAIL_ATTR, email)
except Exception:
pass
if current_status in {"invited", "already_present"}:
if not current_synced_at:
try:
_set_user_attribute(
username,
VAULTWARDEN_SYNCED_AT_ATTR,
time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
)
except Exception:
pass
skipped += 1
continue
processed += 1
result = vaultwarden.invite_user(email)
if result.ok:
created += 1
consecutive_failures = 0
try:
_set_user_attribute(username, VAULTWARDEN_STATUS_ATTR, result.status)
_set_user_attribute(
username,
VAULTWARDEN_SYNCED_AT_ATTR,
time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
)
except Exception:
pass
else:
failures += 1
if result.status in {"rate_limited", "error"}:
consecutive_failures += 1
try:
_set_user_attribute(username, VAULTWARDEN_STATUS_ATTR, result.status)
_set_user_attribute(
username,
VAULTWARDEN_SYNCED_AT_ATTR,
time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
)
except Exception:
pass
if consecutive_failures >= settings.vaultwarden_failure_bailout:
break
return VaultwardenSyncSummary(processed, created, skipped, failures)

49
ariadne/services/wger.py Normal file
View File

@ -0,0 +1,49 @@
from __future__ import annotations
from typing import Any
from ..k8s.jobs import JobSpawner
from ..settings import settings
class WgerService:
def __init__(self) -> None:
self._user_spawner = JobSpawner(settings.wger_namespace, settings.wger_user_sync_cronjob)
self._admin_spawner = JobSpawner(settings.wger_namespace, settings.wger_admin_cronjob)
def sync_user(self, username: str, email: str, password: str, wait: bool = True) -> dict[str, Any]:
username = (username or "").strip()
if not username:
raise RuntimeError("missing username")
if not password:
raise RuntimeError("missing password")
if not settings.wger_namespace or not settings.wger_user_sync_cronjob:
raise RuntimeError("wger sync not configured")
env_overrides = [
{"name": "WGER_USERNAME", "value": username},
{"name": "WGER_EMAIL", "value": email},
{"name": "WGER_PASSWORD", "value": password},
]
if wait:
return self._user_spawner.trigger_and_wait(
username,
env_overrides,
settings.wger_user_sync_wait_timeout_sec,
)
return self._user_spawner.trigger(username, env_overrides)
def ensure_admin(self, wait: bool = False) -> dict[str, Any]:
if not settings.wger_namespace or not settings.wger_admin_cronjob:
raise RuntimeError("wger admin sync not configured")
if wait:
return self._admin_spawner.trigger_and_wait(
"admin",
None,
settings.wger_user_sync_wait_timeout_sec,
)
return self._admin_spawner.trigger("admin", None)
wger = WgerService()

208
ariadne/settings.py Normal file
View File

@ -0,0 +1,208 @@
from __future__ import annotations
from dataclasses import dataclass
import os
def _env(name: str, default: str = "") -> str:
value = os.getenv(name, default)
return value.strip() if isinstance(value, str) else default
def _env_bool(name: str, default: str = "false") -> bool:
return _env(name, default).lower() in {"1", "true", "yes", "y", "on"}
def _env_int(name: str, default: int) -> int:
raw = _env(name, str(default))
try:
return int(raw)
except ValueError:
return default
def _env_float(name: str, default: float) -> float:
raw = _env(name, str(default))
try:
return float(raw)
except ValueError:
return default
@dataclass(frozen=True)
class Settings:
app_name: str
bind_host: str
bind_port: int
portal_database_url: str
portal_public_base_url: str
keycloak_url: str
keycloak_realm: str
keycloak_client_id: str
keycloak_issuer: str
keycloak_jwks_url: str
keycloak_admin_url: str
keycloak_admin_realm: str
keycloak_admin_client_id: str
keycloak_admin_client_secret: str
portal_admin_users: list[str]
portal_admin_groups: list[str]
account_allowed_groups: list[str]
allowed_flag_groups: list[str]
default_user_groups: list[str]
mailu_domain: str
mailu_sync_url: str
mailu_sync_wait_timeout_sec: float
mailu_mailbox_wait_timeout_sec: float
mailu_db_host: str
mailu_db_port: int
mailu_db_name: str
mailu_db_user: str
mailu_db_password: str
nextcloud_namespace: str
nextcloud_mail_sync_cronjob: str
nextcloud_mail_sync_wait_timeout_sec: float
nextcloud_mail_sync_job_ttl_sec: int
wger_namespace: str
wger_user_sync_cronjob: str
wger_user_sync_wait_timeout_sec: float
wger_admin_cronjob: str
firefly_namespace: str
firefly_user_sync_cronjob: str
firefly_user_sync_wait_timeout_sec: float
vaultwarden_namespace: str
vaultwarden_pod_label: str
vaultwarden_pod_port: int
vaultwarden_service_host: str
vaultwarden_admin_secret_name: str
vaultwarden_admin_secret_key: str
vaultwarden_admin_session_ttl_sec: float
vaultwarden_admin_rate_limit_backoff_sec: float
vaultwarden_retry_cooldown_sec: float
vaultwarden_failure_bailout: int
smtp_host: str
smtp_port: int
smtp_username: str
smtp_password: str
smtp_starttls: bool
smtp_use_tls: bool
smtp_from: str
smtp_timeout_sec: float
welcome_email_enabled: bool
provision_poll_interval_sec: float
provision_retry_cooldown_sec: float
schedule_tick_sec: float
k8s_api_timeout_sec: float
mailu_sync_cron: str
nextcloud_sync_cron: str
vaultwarden_sync_cron: str
wger_admin_cron: str
metrics_path: str
@classmethod
def from_env(cls) -> "Settings":
keycloak_url = _env("KEYCLOAK_URL", "https://sso.bstein.dev").rstrip("/")
keycloak_realm = _env("KEYCLOAK_REALM", "atlas")
keycloak_client_id = _env("KEYCLOAK_CLIENT_ID", "bstein-dev-home")
keycloak_issuer = _env("KEYCLOAK_ISSUER", f"{keycloak_url}/realms/{keycloak_realm}").rstrip("/")
keycloak_jwks_url = _env("KEYCLOAK_JWKS_URL", f"{keycloak_issuer}/protocol/openid-connect/certs").rstrip("/")
admin_users = [u for u in (_env("PORTAL_ADMIN_USERS", "bstein")).split(",") if u.strip()]
admin_groups = [g for g in (_env("PORTAL_ADMIN_GROUPS", "admin")).split(",") if g.strip()]
allowed_groups = [g for g in (_env("ACCOUNT_ALLOWED_GROUPS", "dev,admin")).split(",") if g.strip()]
flag_groups = [g for g in (_env("ALLOWED_FLAG_GROUPS", "demo,test")).split(",") if g.strip()]
default_groups = [g for g in (_env("DEFAULT_USER_GROUPS", "dev")).split(",") if g.strip()]
mailu_db_port = _env_int("MAILU_DB_PORT", 5432)
smtp_port = _env_int("SMTP_PORT", 25)
return cls(
app_name=_env("ARIADNE_APP_NAME", "ariadne"),
bind_host=_env("ARIADNE_BIND_HOST", "0.0.0.0"),
bind_port=_env_int("ARIADNE_BIND_PORT", 8080),
portal_database_url=_env("PORTAL_DATABASE_URL", ""),
portal_public_base_url=_env("PORTAL_PUBLIC_BASE_URL", "https://bstein.dev").rstrip("/"),
keycloak_url=keycloak_url,
keycloak_realm=keycloak_realm,
keycloak_client_id=keycloak_client_id,
keycloak_issuer=keycloak_issuer,
keycloak_jwks_url=keycloak_jwks_url,
keycloak_admin_url=_env("KEYCLOAK_ADMIN_URL", keycloak_url).rstrip("/"),
keycloak_admin_realm=_env("KEYCLOAK_ADMIN_REALM", keycloak_realm),
keycloak_admin_client_id=_env("KEYCLOAK_ADMIN_CLIENT_ID", ""),
keycloak_admin_client_secret=_env("KEYCLOAK_ADMIN_CLIENT_SECRET", ""),
portal_admin_users=admin_users,
portal_admin_groups=admin_groups,
account_allowed_groups=allowed_groups,
allowed_flag_groups=flag_groups,
default_user_groups=default_groups,
mailu_domain=_env("MAILU_DOMAIN", "bstein.dev"),
mailu_sync_url=_env(
"MAILU_SYNC_URL",
"http://mailu-sync-listener.mailu-mailserver.svc.cluster.local:8080/events",
).rstrip("/"),
mailu_sync_wait_timeout_sec=_env_float("MAILU_SYNC_WAIT_TIMEOUT_SEC", 60.0),
mailu_mailbox_wait_timeout_sec=_env_float("MAILU_MAILBOX_WAIT_TIMEOUT_SEC", 60.0),
mailu_db_host=_env("MAILU_DB_HOST", "postgres-service.postgres.svc.cluster.local"),
mailu_db_port=mailu_db_port,
mailu_db_name=_env("MAILU_DB_NAME", "mailu"),
mailu_db_user=_env("MAILU_DB_USER", "mailu"),
mailu_db_password=_env("MAILU_DB_PASSWORD", ""),
nextcloud_namespace=_env("NEXTCLOUD_NAMESPACE", "nextcloud"),
nextcloud_mail_sync_cronjob=_env("NEXTCLOUD_MAIL_SYNC_CRONJOB", "nextcloud-mail-sync"),
nextcloud_mail_sync_wait_timeout_sec=_env_float("NEXTCLOUD_MAIL_SYNC_WAIT_TIMEOUT_SEC", 90.0),
nextcloud_mail_sync_job_ttl_sec=_env_int("NEXTCLOUD_MAIL_SYNC_JOB_TTL_SEC", 3600),
wger_namespace=_env("WGER_NAMESPACE", "health"),
wger_user_sync_cronjob=_env("WGER_USER_SYNC_CRONJOB", "wger-user-sync"),
wger_user_sync_wait_timeout_sec=_env_float("WGER_USER_SYNC_WAIT_TIMEOUT_SEC", 60.0),
wger_admin_cronjob=_env("WGER_ADMIN_CRONJOB", "wger-admin-ensure"),
firefly_namespace=_env("FIREFLY_NAMESPACE", "finance"),
firefly_user_sync_cronjob=_env("FIREFLY_USER_SYNC_CRONJOB", "firefly-user-sync"),
firefly_user_sync_wait_timeout_sec=_env_float("FIREFLY_USER_SYNC_WAIT_TIMEOUT_SEC", 90.0),
vaultwarden_namespace=_env("VAULTWARDEN_NAMESPACE", "vaultwarden"),
vaultwarden_pod_label=_env("VAULTWARDEN_POD_LABEL", "app=vaultwarden"),
vaultwarden_pod_port=_env_int("VAULTWARDEN_POD_PORT", 80),
vaultwarden_service_host=_env(
"VAULTWARDEN_SERVICE_HOST",
"vaultwarden-service.vaultwarden.svc.cluster.local",
),
vaultwarden_admin_secret_name=_env("VAULTWARDEN_ADMIN_SECRET_NAME", "vaultwarden-admin"),
vaultwarden_admin_secret_key=_env("VAULTWARDEN_ADMIN_SECRET_KEY", "ADMIN_TOKEN"),
vaultwarden_admin_session_ttl_sec=_env_float("VAULTWARDEN_ADMIN_SESSION_TTL_SEC", 300.0),
vaultwarden_admin_rate_limit_backoff_sec=_env_float("VAULTWARDEN_ADMIN_RATE_LIMIT_BACKOFF_SEC", 600.0),
vaultwarden_retry_cooldown_sec=_env_float("VAULTWARDEN_RETRY_COOLDOWN_SEC", 1800.0),
vaultwarden_failure_bailout=_env_int("VAULTWARDEN_FAILURE_BAILOUT", 2),
smtp_host=_env("SMTP_HOST", ""),
smtp_port=smtp_port,
smtp_username=_env("SMTP_USERNAME", ""),
smtp_password=_env("SMTP_PASSWORD", ""),
smtp_starttls=_env_bool("SMTP_STARTTLS", "false"),
smtp_use_tls=_env_bool("SMTP_USE_TLS", "false"),
smtp_from=_env("SMTP_FROM", f"postmaster@{mailu_domain}"),
smtp_timeout_sec=_env_float("SMTP_TIMEOUT_SEC", 10.0),
welcome_email_enabled=_env_bool("WELCOME_EMAIL_ENABLED", "true"),
provision_poll_interval_sec=_env_float("ARIADNE_PROVISION_POLL_INTERVAL_SEC", 5.0),
provision_retry_cooldown_sec=_env_float("ARIADNE_PROVISION_RETRY_COOLDOWN_SEC", 30.0),
schedule_tick_sec=_env_float("ARIADNE_SCHEDULE_TICK_SEC", 5.0),
k8s_api_timeout_sec=_env_float("K8S_API_TIMEOUT_SEC", 5.0),
mailu_sync_cron=_env("ARIADNE_SCHEDULE_MAILU_SYNC", "30 4 * * *"),
nextcloud_sync_cron=_env("ARIADNE_SCHEDULE_NEXTCLOUD_SYNC", "0 5 * * *"),
vaultwarden_sync_cron=_env("ARIADNE_SCHEDULE_VAULTWARDEN_SYNC", "*/15 * * * *"),
wger_admin_cron=_env("ARIADNE_SCHEDULE_WGER_ADMIN", "15 3 * * *"),
metrics_path=_env("METRICS_PATH", "/metrics"),
)
settings = Settings.from_env()

33
ariadne/utils/errors.py Normal file
View File

@ -0,0 +1,33 @@
from __future__ import annotations
import httpx
def safe_error_detail(exc: Exception, fallback: str) -> str:
if isinstance(exc, RuntimeError):
msg = str(exc).strip()
if msg:
return msg
if isinstance(exc, httpx.HTTPStatusError):
detail = f"http {exc.response.status_code}"
try:
payload = exc.response.json()
msg: str | None = None
if isinstance(payload, dict):
raw = payload.get("errorMessage") or payload.get("error") or payload.get("message")
if isinstance(raw, str) and raw.strip():
msg = raw.strip()
elif isinstance(payload, str) and payload.strip():
msg = payload.strip()
if msg:
msg = " ".join(msg.split())
detail = f"{detail}: {msg[:200]}"
except Exception:
text = (exc.response.text or "").strip()
if text:
text = " ".join(text.split())
detail = f"{detail}: {text[:200]}"
return detail
if isinstance(exc, httpx.TimeoutException):
return "timeout"
return fallback

16
ariadne/utils/http.py Normal file
View File

@ -0,0 +1,16 @@
from __future__ import annotations
from fastapi import Request
def extract_bearer_token(request: Request) -> str | None:
header = request.headers.get("Authorization", "")
if not header:
return None
parts = header.split(None, 1)
if len(parts) != 2:
return None
scheme, token = parts[0].lower(), parts[1].strip()
if scheme != "bearer" or not token:
return None
return token

View File

@ -0,0 +1,9 @@
from __future__ import annotations
import secrets
import string
def random_password(length: int = 32) -> str:
alphabet = string.ascii_letters + string.digits
return "".join(secrets.choice(alphabet) for _ in range(length))

2
requirements-dev.txt Normal file
View File

@ -0,0 +1,2 @@
pytest==8.3.5
pytest-mock==3.14.0

8
requirements.txt Normal file
View File

@ -0,0 +1,8 @@
fastapi==0.115.11
uvicorn[standard]==0.30.6
httpx==0.27.2
PyJWT==2.10.1
psycopg[binary]==3.2.6
psycopg-pool==3.2.6
croniter==2.0.7
prometheus-client==0.21.1

26
tests/test_utils.py Normal file
View File

@ -0,0 +1,26 @@
from __future__ import annotations
import re
from ariadne.services.mailu import MailuService
from ariadne.utils.errors import safe_error_detail
from ariadne.utils.passwords import random_password
def test_random_password_length() -> None:
password = random_password(24)
assert len(password) == 24
assert re.match(r"^[A-Za-z0-9]+$", password)
def test_mailu_resolve_email_attribute() -> None:
attrs = {"mailu_email": ["custom@bstein.dev"]}
assert MailuService.resolve_mailu_email("alice", attrs) == "custom@bstein.dev"
def test_mailu_resolve_email_default() -> None:
assert MailuService.resolve_mailu_email("alice", {}) == "alice@bstein.dev"
def test_safe_error_detail_runtime() -> None:
assert safe_error_detail(RuntimeError("boom"), "fallback") == "boom"