From ee532ac215a35080f47b16d436a4bcb88941eddd Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Mon, 19 Jan 2026 16:57:18 -0300 Subject: [PATCH] feat: add Ariadne provisioning service --- .gitignore | 12 + Dockerfile | 15 + Jenkinsfile | 162 +++++++++ ariadne/__init__.py | 1 + ariadne/app.py | 361 +++++++++++++++++++ ariadne/auth/keycloak.py | 111 ++++++ ariadne/db/database.py | 46 +++ ariadne/db/schema.py | 48 +++ ariadne/db/storage.py | 273 ++++++++++++++ ariadne/k8s/client.py | 63 ++++ ariadne/k8s/jobs.py | 121 +++++++ ariadne/manager/provisioning.py | 520 +++++++++++++++++++++++++++ ariadne/metrics/metrics.py | 71 ++++ ariadne/scheduler/cron.py | 125 +++++++ ariadne/services/firefly.py | 37 ++ ariadne/services/keycloak_admin.py | 192 ++++++++++ ariadne/services/mailer.py | 102 ++++++ ariadne/services/mailu.py | 63 ++++ ariadne/services/nextcloud.py | 37 ++ ariadne/services/vaultwarden.py | 151 ++++++++ ariadne/services/vaultwarden_sync.py | 197 ++++++++++ ariadne/services/wger.py | 49 +++ ariadne/settings.py | 208 +++++++++++ ariadne/utils/errors.py | 33 ++ ariadne/utils/http.py | 16 + ariadne/utils/passwords.py | 9 + requirements-dev.txt | 2 + requirements.txt | 8 + tests/test_utils.py | 26 ++ 29 files changed, 3059 insertions(+) create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 Jenkinsfile create mode 100644 ariadne/__init__.py create mode 100644 ariadne/app.py create mode 100644 ariadne/auth/keycloak.py create mode 100644 ariadne/db/database.py create mode 100644 ariadne/db/schema.py create mode 100644 ariadne/db/storage.py create mode 100644 ariadne/k8s/client.py create mode 100644 ariadne/k8s/jobs.py create mode 100644 ariadne/manager/provisioning.py create mode 100644 ariadne/metrics/metrics.py create mode 100644 ariadne/scheduler/cron.py create mode 100644 ariadne/services/firefly.py create mode 100644 ariadne/services/keycloak_admin.py create mode 100644 ariadne/services/mailer.py create mode 100644 ariadne/services/mailu.py create mode 100644 ariadne/services/nextcloud.py create mode 100644 ariadne/services/vaultwarden.py create mode 100644 ariadne/services/vaultwarden_sync.py create mode 100644 ariadne/services/wger.py create mode 100644 ariadne/settings.py create mode 100644 ariadne/utils/errors.py create mode 100644 ariadne/utils/http.py create mode 100644 ariadne/utils/passwords.py create mode 100644 requirements-dev.txt create mode 100644 requirements.txt create mode 100644 tests/test_utils.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..343fc58 --- /dev/null +++ b/.gitignore @@ -0,0 +1,12 @@ +__pycache__/ +*.pyc +*.pyo +*.pyd +*.egg-info/ +.venv/ +.venv*/ +.env +.dist/ +build/ +.pytest_cache/ +.coverage diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..98dd53e --- /dev/null +++ b/Dockerfile @@ -0,0 +1,15 @@ +FROM python:3.12-slim + +ENV PYTHONDONTWRITEBYTECODE=1 \ + PYTHONUNBUFFERED=1 + +WORKDIR /app + +COPY requirements.txt /app/requirements.txt +RUN pip install --no-cache-dir -r /app/requirements.txt + +COPY ariadne /app/ariadne + +EXPOSE 8080 + +CMD ["uvicorn", "ariadne.app:app", "--host", "0.0.0.0", "--port", "8080"] diff --git a/Jenkinsfile b/Jenkinsfile new file mode 100644 index 0000000..5c7574b --- /dev/null +++ b/Jenkinsfile @@ -0,0 +1,162 @@ +pipeline { + agent { + kubernetes { + label 'ariadne' + defaultContainer 'builder' + yaml """ +apiVersion: v1 +kind: Pod +metadata: + labels: + app: ariadne +spec: + nodeSelector: + kubernetes.io/arch: arm64 + node-role.kubernetes.io/worker: "true" + containers: + - name: dind + image: docker:27-dind + securityContext: + privileged: true + env: + - name: DOCKER_TLS_CERTDIR + value: "" + args: + - --mtu=1400 + - --host=unix:///var/run/docker.sock + - --host=tcp://0.0.0.0:2375 + volumeMounts: + - name: dind-storage + mountPath: /var/lib/docker + - name: builder + image: docker:27 + command: ["cat"] + tty: true + env: + - name: DOCKER_HOST + value: tcp://localhost:2375 + - name: DOCKER_TLS_CERTDIR + value: "" + volumeMounts: + - name: workspace-volume + mountPath: /home/jenkins/agent + - name: docker-config-writable + mountPath: /root/.docker + - name: harbor-config + mountPath: /docker-config + volumes: + - name: workspace-volume + emptyDir: {} + - name: docker-config-writable + emptyDir: {} + - name: dind-storage + emptyDir: {} + - name: harbor-config + secret: + secretName: harbor-bstein-robot + items: + - key: .dockerconfigjson + path: config.json +""" + } + } + environment { + REGISTRY = 'registry.bstein.dev/bstein' + IMAGE = "${REGISTRY}/ariadne" + VERSION_TAG = 'dev' + SEMVER = 'dev' + } + options { + disableConcurrentBuilds() + } + triggers { + pollSCM('H/2 * * * *') + } + stages { + stage('Checkout') { + steps { + checkout scm + } + } + + stage('Prep toolchain') { + steps { + container('builder') { + sh ''' + set -euo pipefail + apk add --no-cache bash git jq + mkdir -p /root/.docker + cp /docker-config/config.json /root/.docker/config.json + ''' + } + } + } + + stage('Compute version') { + steps { + container('builder') { + script { + sh ''' + set -euo pipefail + if git describe --tags --exact-match >/dev/null 2>&1; then + SEMVER="$(git describe --tags --exact-match)" + else + SEMVER="0.1.0-${BUILD_NUMBER}" + fi + if ! echo "$SEMVER" | grep -Eq '^v?[0-9]+\.[0-9]+\.[0-9]+([-.][0-9A-Za-z]+)?$'; then + SEMVER="0.1.0-${BUILD_NUMBER}" + fi + echo "SEMVER=${SEMVER}" > build.env + ''' + def props = readProperties file: 'build.env' + env.SEMVER = props['SEMVER'] ?: "0.1.0-${env.BUILD_NUMBER}" + env.VERSION_TAG = env.SEMVER + } + } + } + } + + stage('Buildx setup') { + steps { + container('builder') { + sh ''' + set -euo pipefail + for i in $(seq 1 10); do + if docker info >/dev/null 2>&1; then + break + fi + sleep 2 + done + docker buildx create --name bstein-builder --driver docker-container --bootstrap --use || docker buildx use bstein-builder + ''' + } + } + } + + stage('Build & push image') { + steps { + container('builder') { + sh ''' + set -euo pipefail + VERSION_TAG="$(cut -d= -f2 build.env)" + docker buildx build \ + --platform linux/arm64 \ + --tag "${IMAGE}:${VERSION_TAG}" \ + --tag "${IMAGE}:latest" \ + --push \ + . + ''' + } + } + } + } + + post { + always { + script { + def props = fileExists('build.env') ? readProperties(file: 'build.env') : [:] + echo "Build complete for ${props['SEMVER'] ?: env.VERSION_TAG}" + } + } + } +} diff --git a/ariadne/__init__.py b/ariadne/__init__.py new file mode 100644 index 0000000..9d48db4 --- /dev/null +++ b/ariadne/__init__.py @@ -0,0 +1 @@ +from __future__ import annotations diff --git a/ariadne/app.py b/ariadne/app.py new file mode 100644 index 0000000..628513c --- /dev/null +++ b/ariadne/app.py @@ -0,0 +1,361 @@ +from __future__ import annotations + +from datetime import datetime, timezone +import threading +from typing import Any + +from fastapi import Depends, FastAPI, HTTPException, Request +from fastapi.responses import JSONResponse, Response +from prometheus_client import CONTENT_TYPE_LATEST, generate_latest + +from .auth.keycloak import AuthContext, authenticator +from .db.database import Database +from .db.storage import Storage +from .manager.provisioning import ProvisioningManager +from .scheduler.cron import CronScheduler +from .services.firefly import firefly +from .services.keycloak_admin import keycloak_admin +from .services.mailu import mailu +from .services.nextcloud import nextcloud +from .services.vaultwarden_sync import run_vaultwarden_sync +from .services.wger import wger +from .settings import settings +from .utils.errors import safe_error_detail +from .utils.http import extract_bearer_token +from .utils.passwords import random_password + + +db = Database(settings.portal_database_url) +storage = Storage(db) +provisioning = ProvisioningManager(db, storage) +scheduler = CronScheduler(storage, settings.schedule_tick_sec) + + +app = FastAPI(title=settings.app_name) + + +def _require_auth(request: Request) -> AuthContext: + token = extract_bearer_token(request) + if not token: + raise HTTPException(status_code=401, detail="missing bearer token") + try: + return authenticator.authenticate(token) + except Exception: + raise HTTPException(status_code=401, detail="invalid token") + + +def _require_admin(ctx: AuthContext) -> None: + if ctx.username and ctx.username in settings.portal_admin_users: + return + if settings.portal_admin_groups and set(ctx.groups).intersection(settings.portal_admin_groups): + return + raise HTTPException(status_code=403, detail="forbidden") + + +def _require_account_access(ctx: AuthContext) -> None: + if not settings.account_allowed_groups: + return + if set(ctx.groups).intersection(settings.account_allowed_groups): + return + raise HTTPException(status_code=403, detail="forbidden") + + +@app.on_event("startup") +def _startup() -> None: + db.ensure_schema() + provisioning.start() + + scheduler.add_task("schedule.mailu_sync", settings.mailu_sync_cron, lambda: mailu.sync("ariadne_schedule")) + scheduler.add_task( + "schedule.nextcloud_sync", + settings.nextcloud_sync_cron, + lambda: nextcloud.sync_mail(wait=False), + ) + scheduler.add_task("schedule.vaultwarden_sync", settings.vaultwarden_sync_cron, run_vaultwarden_sync) + scheduler.add_task("schedule.wger_admin", settings.wger_admin_cron, lambda: wger.ensure_admin(wait=False)) + scheduler.start() + + +@app.on_event("shutdown") +def _shutdown() -> None: + scheduler.stop() + provisioning.stop() + db.close() + + +@app.get("/health") +def health() -> dict[str, Any]: + return {"ok": True} + + +@app.get(settings.metrics_path) +def metrics() -> Response: + payload = generate_latest() + return Response(payload, media_type=CONTENT_TYPE_LATEST) + + +@app.get("/api/admin/access/requests") +def list_access_requests(ctx: AuthContext = Depends(_require_auth)) -> JSONResponse: + _require_admin(ctx) + try: + rows = storage.list_pending_requests() + except Exception: + raise HTTPException(status_code=502, detail="failed to load requests") + + output: list[dict[str, Any]] = [] + for row in rows: + created_at = row.get("created_at") + output.append( + { + "id": row.get("request_code"), + "username": row.get("username"), + "email": row.get("contact_email") or "", + "request_code": row.get("request_code"), + "created_at": created_at.isoformat() if isinstance(created_at, datetime) else "", + "note": row.get("note") or "", + } + ) + return JSONResponse({"requests": output}) + + +@app.post("/api/admin/access/requests/{username}/approve") +async def approve_access_request( + username: str, + request: Request, + ctx: AuthContext = Depends(_require_auth), +) -> JSONResponse: + _require_admin(ctx) + try: + payload = await request.json() + except Exception: + payload = {} + + flags_raw = payload.get("flags") if isinstance(payload, dict) else None + flags = [f for f in flags_raw if isinstance(f, str)] if isinstance(flags_raw, list) else [] + flags = [f for f in flags if f in settings.allowed_flag_groups] + note = payload.get("note") if isinstance(payload, dict) else None + note = str(note).strip() if isinstance(note, str) else None + + decided_by = ctx.username or "" + try: + row = db.fetchone( + """ + UPDATE access_requests + SET status = 'approved', + decided_at = NOW(), + decided_by = %s, + approval_flags = %s, + approval_note = %s + WHERE username = %s + AND status = 'pending' + AND email_verified_at IS NOT NULL + RETURNING request_code + """, + (decided_by or None, flags or None, note, username), + ) + except Exception: + raise HTTPException(status_code=502, detail="failed to approve request") + + if not row: + return JSONResponse({"ok": True, "request_code": ""}) + + request_code = row.get("request_code") or "" + if request_code: + threading.Thread( + target=provisioning.provision_access_request, + args=(request_code,), + daemon=True, + ).start() + return JSONResponse({"ok": True, "request_code": request_code}) + + +@app.post("/api/admin/access/requests/{username}/deny") +async def deny_access_request( + username: str, + request: Request, + ctx: AuthContext = Depends(_require_auth), +) -> JSONResponse: + _require_admin(ctx) + try: + payload = await request.json() + except Exception: + payload = {} + note = payload.get("note") if isinstance(payload, dict) else None + note = str(note).strip() if isinstance(note, str) else None + decided_by = ctx.username or "" + + try: + row = db.fetchone( + """ + UPDATE access_requests + SET status = 'denied', + decided_at = NOW(), + decided_by = %s, + denial_note = %s + WHERE username = %s AND status = 'pending' + RETURNING request_code + """, + (decided_by or None, note, username), + ) + except Exception: + raise HTTPException(status_code=502, detail="failed to deny request") + + if not row: + return JSONResponse({"ok": True, "request_code": ""}) + return JSONResponse({"ok": True, "request_code": row.get("request_code")}) + + +@app.post("/api/account/mailu/rotate") +def rotate_mailu_password(ctx: AuthContext = Depends(_require_auth)) -> JSONResponse: + _require_account_access(ctx) + if not keycloak_admin.ready(): + raise HTTPException(status_code=503, detail="server not configured") + + username = ctx.username or "" + if not username: + raise HTTPException(status_code=400, detail="missing username") + + password = random_password() + try: + keycloak_admin.set_user_attribute(username, "mailu_app_password", password) + except Exception: + raise HTTPException(status_code=502, detail="failed to update mail password") + + sync_enabled = bool(settings.mailu_sync_url) + sync_ok = False + sync_error = "" + if sync_enabled: + try: + mailu.sync("ariadne_mailu_rotate") + sync_ok = True + except Exception as exc: + sync_error = safe_error_detail(exc, "sync request failed") + + nextcloud_sync: dict[str, Any] = {"status": "skipped"} + try: + nextcloud_sync = nextcloud.sync_mail(username, wait=True) + except Exception as exc: + nextcloud_sync = {"status": "error", "detail": safe_error_detail(exc, "failed to sync nextcloud")} + + return JSONResponse( + { + "password": password, + "sync_enabled": sync_enabled, + "sync_ok": sync_ok, + "sync_error": sync_error, + "nextcloud_sync": nextcloud_sync, + } + ) + + +@app.post("/api/account/wger/reset") +def reset_wger_password(ctx: AuthContext = Depends(_require_auth)) -> JSONResponse: + _require_account_access(ctx) + if not keycloak_admin.ready(): + raise HTTPException(status_code=503, detail="server not configured") + + username = ctx.username or "" + if not username: + raise HTTPException(status_code=400, detail="missing username") + + mailu_email = f"{username}@{settings.mailu_domain}" + try: + user = keycloak_admin.find_user(username) or {} + attrs = user.get("attributes") if isinstance(user, dict) else None + if isinstance(attrs, dict): + raw_mailu = attrs.get("mailu_email") + if isinstance(raw_mailu, list) and raw_mailu: + mailu_email = str(raw_mailu[0]) + elif isinstance(raw_mailu, str) and raw_mailu: + mailu_email = raw_mailu + except Exception: + pass + + password = random_password() + try: + result = wger.sync_user(username, mailu_email, password, wait=True) + status_val = result.get("status") if isinstance(result, dict) else "error" + if status_val != "ok": + raise RuntimeError(f"wger sync {status_val}") + except Exception as exc: + raise HTTPException(status_code=502, detail=safe_error_detail(exc, "wger sync failed")) + + try: + keycloak_admin.set_user_attribute(username, "wger_password", password) + keycloak_admin.set_user_attribute( + username, + "wger_password_updated_at", + datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), + ) + except Exception: + raise HTTPException(status_code=502, detail="failed to store wger password") + + return JSONResponse({"status": "ok", "password": password}) + + +@app.post("/api/account/firefly/reset") +def reset_firefly_password(ctx: AuthContext = Depends(_require_auth)) -> JSONResponse: + _require_account_access(ctx) + if not keycloak_admin.ready(): + raise HTTPException(status_code=503, detail="server not configured") + + username = ctx.username or "" + if not username: + raise HTTPException(status_code=400, detail="missing username") + + mailu_email = f"{username}@{settings.mailu_domain}" + try: + user = keycloak_admin.find_user(username) or {} + attrs = user.get("attributes") if isinstance(user, dict) else None + if isinstance(attrs, dict): + raw_mailu = attrs.get("mailu_email") + if isinstance(raw_mailu, list) and raw_mailu: + mailu_email = str(raw_mailu[0]) + elif isinstance(raw_mailu, str) and raw_mailu: + mailu_email = raw_mailu + except Exception: + pass + + password = random_password(24) + try: + result = firefly.sync_user(mailu_email, password, wait=True) + status_val = result.get("status") if isinstance(result, dict) else "error" + if status_val != "ok": + raise RuntimeError(f"firefly sync {status_val}") + except Exception as exc: + raise HTTPException(status_code=502, detail=safe_error_detail(exc, "firefly sync failed")) + + try: + keycloak_admin.set_user_attribute(username, "firefly_password", password) + keycloak_admin.set_user_attribute( + username, + "firefly_password_updated_at", + datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), + ) + except Exception: + raise HTTPException(status_code=502, detail="failed to store firefly password") + + return JSONResponse({"status": "ok", "password": password}) + + +@app.post("/api/account/nextcloud/mail/sync") +async def nextcloud_mail_sync(request: Request, ctx: AuthContext = Depends(_require_auth)) -> JSONResponse: + _require_account_access(ctx) + if not keycloak_admin.ready(): + raise HTTPException(status_code=503, detail="server not configured") + + username = ctx.username or "" + if not username: + raise HTTPException(status_code=400, detail="missing username") + + try: + payload = await request.json() + except Exception: + payload = {} + wait = bool(payload.get("wait", True)) if isinstance(payload, dict) else True + + try: + result = nextcloud.sync_mail(username, wait=wait) + return JSONResponse(result) + except Exception as exc: + raise HTTPException(status_code=502, detail=safe_error_detail(exc, "failed to sync nextcloud mail")) diff --git a/ariadne/auth/keycloak.py b/ariadne/auth/keycloak.py new file mode 100644 index 0000000..2b3b39d --- /dev/null +++ b/ariadne/auth/keycloak.py @@ -0,0 +1,111 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any +import time + +import httpx +import jwt + +from ..settings import settings + + +@dataclass(frozen=True) +class AuthContext: + username: str + email: str + groups: list[str] + claims: dict[str, Any] + + +class KeycloakOIDC: + def __init__(self, jwks_url: str, issuer: str, client_id: str) -> None: + self._jwks_url = jwks_url + self._issuer = issuer + self._client_id = client_id + self._jwks: dict[str, Any] | None = None + self._jwks_fetched_at: float = 0.0 + self._jwks_ttl_sec = 300.0 + + def verify(self, token: str) -> dict[str, Any]: + if not token: + raise ValueError("missing token") + jwks = self._get_jwks() + header = jwt.get_unverified_header(token) + kid = header.get("kid") + if not isinstance(kid, str): + raise ValueError("token missing kid") + key = None + for candidate in jwks.get("keys", []) if isinstance(jwks, dict) else []: + if isinstance(candidate, dict) and candidate.get("kid") == kid: + key = candidate + break + if not key: + self._jwks = None + jwks = self._get_jwks(force=True) + for candidate in jwks.get("keys", []) if isinstance(jwks, dict) else []: + if isinstance(candidate, dict) and candidate.get("kid") == kid: + key = candidate + break + if not key: + raise ValueError("token kid not found") + + claims = jwt.decode( + token, + key=jwt.algorithms.RSAAlgorithm.from_jwk(key), + algorithms=["RS256"], + options={"verify_aud": False}, + issuer=self._issuer, + ) + + azp = claims.get("azp") + aud = claims.get("aud") + aud_list: list[str] = [] + if isinstance(aud, str): + aud_list = [aud] + elif isinstance(aud, list): + aud_list = [a for a in aud if isinstance(a, str)] + if azp != self._client_id and self._client_id not in aud_list: + raise ValueError("token not issued for expected client") + + return claims + + def _get_jwks(self, force: bool = False) -> dict[str, Any]: + now = time.time() + if not force and self._jwks and now - self._jwks_fetched_at < self._jwks_ttl_sec: + return self._jwks + with httpx.Client(timeout=5.0) as client: + resp = client.get(self._jwks_url) + resp.raise_for_status() + payload = resp.json() + if not isinstance(payload, dict): + raise ValueError("jwks payload invalid") + self._jwks = payload + self._jwks_fetched_at = now + return payload + + +class Authenticator: + def __init__(self) -> None: + self._oidc = KeycloakOIDC(settings.keycloak_jwks_url, settings.keycloak_issuer, settings.keycloak_client_id) + + @staticmethod + def _normalize_groups(groups: Any) -> list[str]: + if not isinstance(groups, list): + return [] + cleaned: list[str] = [] + for name in groups: + if not isinstance(name, str): + continue + cleaned.append(name.lstrip("/")) + return [name for name in cleaned if name] + + def authenticate(self, token: str) -> AuthContext: + claims = self._oidc.verify(token) + username = claims.get("preferred_username") or "" + email = claims.get("email") or "" + groups = self._normalize_groups(claims.get("groups")) + return AuthContext(username=username, email=email, groups=groups, claims=claims) + + +authenticator = Authenticator() diff --git a/ariadne/db/database.py b/ariadne/db/database.py new file mode 100644 index 0000000..ee9e9ba --- /dev/null +++ b/ariadne/db/database.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +from contextlib import contextmanager +from typing import Any, Iterable + +import psycopg +from psycopg_pool import ConnectionPool + +from .schema import ARIADNE_ACCESS_REQUEST_ALTER, ARIADNE_TABLES_SQL + + +class Database: + def __init__(self, dsn: str, pool_size: int = 5) -> None: + if not dsn: + raise RuntimeError("PORTAL_DATABASE_URL is required") + self._pool = ConnectionPool(conninfo=dsn, max_size=pool_size) + + @contextmanager + def connection(self): + with self._pool.connection() as conn: + conn.row_factory = psycopg.rows.dict_row + yield conn + + def ensure_schema(self) -> None: + with self.connection() as conn: + for stmt in ARIADNE_TABLES_SQL: + conn.execute(stmt) + for stmt in ARIADNE_ACCESS_REQUEST_ALTER: + conn.execute(stmt) + + def fetchone(self, query: str, params: Iterable[Any] | None = None) -> dict[str, Any] | None: + with self.connection() as conn: + row = conn.execute(query, params or ()).fetchone() + return dict(row) if row else None + + def fetchall(self, query: str, params: Iterable[Any] | None = None) -> list[dict[str, Any]]: + with self.connection() as conn: + rows = conn.execute(query, params or ()).fetchall() + return [dict(row) for row in rows] + + def execute(self, query: str, params: Iterable[Any] | None = None) -> None: + with self.connection() as conn: + conn.execute(query, params or ()) + + def close(self) -> None: + self._pool.close() diff --git a/ariadne/db/schema.py b/ariadne/db/schema.py new file mode 100644 index 0000000..e62a201 --- /dev/null +++ b/ariadne/db/schema.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +ARIADNE_TABLES_SQL = [ + """ + CREATE TABLE IF NOT EXISTS ariadne_task_runs ( + id BIGSERIAL PRIMARY KEY, + request_code TEXT, + task TEXT NOT NULL, + status TEXT NOT NULL, + detail TEXT, + started_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + finished_at TIMESTAMPTZ, + duration_ms INTEGER + ) + """, + """ + CREATE INDEX IF NOT EXISTS ariadne_task_runs_request_code_idx + ON ariadne_task_runs (request_code) + """, + """ + CREATE TABLE IF NOT EXISTS ariadne_schedule_state ( + task_name TEXT PRIMARY KEY, + cron_expr TEXT NOT NULL, + last_started_at TIMESTAMPTZ, + last_finished_at TIMESTAMPTZ, + last_status TEXT, + last_error TEXT, + last_duration_ms INTEGER, + next_run_at TIMESTAMPTZ, + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ) + """, + """ + CREATE TABLE IF NOT EXISTS ariadne_events ( + id BIGSERIAL PRIMARY KEY, + event_type TEXT NOT NULL, + detail TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ) + """, +] + +ARIADNE_ACCESS_REQUEST_ALTER = [ + "ALTER TABLE access_requests ADD COLUMN IF NOT EXISTS welcome_email_sent_at TIMESTAMPTZ", + "ALTER TABLE access_requests ADD COLUMN IF NOT EXISTS approval_flags TEXT[]", + "ALTER TABLE access_requests ADD COLUMN IF NOT EXISTS approval_note TEXT", + "ALTER TABLE access_requests ADD COLUMN IF NOT EXISTS denial_note TEXT", +] diff --git a/ariadne/db/storage.py b/ariadne/db/storage.py new file mode 100644 index 0000000..637d91f --- /dev/null +++ b/ariadne/db/storage.py @@ -0,0 +1,273 @@ +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime +from typing import Any, Iterable + +from .database import Database + + +REQUIRED_TASKS = ( + "keycloak_user", + "keycloak_password", + "keycloak_groups", + "mailu_app_password", + "mailu_sync", + "nextcloud_mail_sync", + "wger_account", + "firefly_account", + "vaultwarden_invite", +) + + +@dataclass(frozen=True) +class AccessRequest: + request_code: str + username: str + contact_email: str + status: str + email_verified_at: datetime | None + initial_password: str | None + initial_password_revealed_at: datetime | None + provision_attempted_at: datetime | None + approval_flags: list[str] + approval_note: str | None + denial_note: str | None + + +class Storage: + def __init__(self, db: Database) -> None: + self._db = db + + def ensure_task_rows(self, request_code: str, tasks: Iterable[str]) -> None: + tasks_list = list(tasks) + if not tasks_list: + return + self._db.execute( + """ + INSERT INTO access_request_tasks (request_code, task, status, detail, updated_at) + SELECT %s, task, 'pending', NULL, NOW() + FROM UNNEST(%s::text[]) AS task + ON CONFLICT (request_code, task) DO NOTHING + """, + (request_code, tasks_list), + ) + + def update_task(self, request_code: str, task: str, status: str, detail: str | None) -> None: + self._db.execute( + """ + INSERT INTO access_request_tasks (request_code, task, status, detail, updated_at) + VALUES (%s, %s, %s, %s, NOW()) + ON CONFLICT (request_code, task) + DO UPDATE SET status = EXCLUDED.status, detail = EXCLUDED.detail, updated_at = NOW() + """, + (request_code, task, status, detail), + ) + + def task_statuses(self, request_code: str) -> dict[str, str]: + rows = self._db.fetchall( + "SELECT task, status FROM access_request_tasks WHERE request_code = %s", + (request_code,), + ) + output: dict[str, str] = {} + for row in rows: + task = row.get("task") + status = row.get("status") + if isinstance(task, str) and isinstance(status, str): + output[task] = status + return output + + def tasks_complete(self, request_code: str, tasks: Iterable[str]) -> bool: + statuses = self.task_statuses(request_code) + for task in tasks: + if statuses.get(task) != "ok": + return False + return True + + def fetch_access_request(self, request_code: str) -> AccessRequest | None: + row = self._db.fetchone( + """ + SELECT request_code, username, contact_email, status, email_verified_at, + initial_password, initial_password_revealed_at, provision_attempted_at, + approval_flags, approval_note, denial_note + FROM access_requests + WHERE request_code = %s + """, + (request_code,), + ) + if not row: + return None + return self._row_to_request(row) + + def find_access_request_by_username(self, username: str) -> AccessRequest | None: + row = self._db.fetchone( + """ + SELECT request_code, username, contact_email, status, email_verified_at, + initial_password, initial_password_revealed_at, provision_attempted_at, + approval_flags, approval_note, denial_note + FROM access_requests + WHERE username = %s + ORDER BY created_at DESC + LIMIT 1 + """, + (username,), + ) + if not row: + return None + return self._row_to_request(row) + + def list_pending_requests(self) -> list[dict[str, Any]]: + return self._db.fetchall( + """ + SELECT request_code, username, contact_email, note, status, created_at + FROM access_requests + WHERE status = 'pending' + ORDER BY created_at ASC + LIMIT 200 + """ + ) + + def list_provision_candidates(self) -> list[AccessRequest]: + rows = self._db.fetchall( + """ + SELECT request_code, username, contact_email, status, email_verified_at, + initial_password, initial_password_revealed_at, provision_attempted_at, + approval_flags, approval_note, denial_note + FROM access_requests + WHERE status IN ('approved', 'accounts_building') + ORDER BY created_at ASC + LIMIT 200 + """ + ) + return [self._row_to_request(row) for row in rows] + + def update_status(self, request_code: str, status: str) -> None: + self._db.execute( + "UPDATE access_requests SET status = %s WHERE request_code = %s", + (status, request_code), + ) + + def mark_provision_attempted(self, request_code: str) -> None: + self._db.execute( + "UPDATE access_requests SET provision_attempted_at = NOW() WHERE request_code = %s", + (request_code,), + ) + + def set_initial_password(self, request_code: str, password: str) -> None: + self._db.execute( + """ + UPDATE access_requests + SET initial_password = %s + WHERE request_code = %s AND initial_password IS NULL + """, + (password, request_code), + ) + + def mark_welcome_sent(self, request_code: str) -> None: + self._db.execute( + """ + UPDATE access_requests + SET welcome_email_sent_at = NOW() + WHERE request_code = %s AND welcome_email_sent_at IS NULL + """, + (request_code,), + ) + + def update_approval(self, request_code: str, status: str, decided_by: str, flags: list[str], note: str | None) -> None: + self._db.execute( + """ + UPDATE access_requests + SET status = %s, + decided_at = NOW(), + decided_by = %s, + approval_flags = %s, + approval_note = %s, + denial_note = CASE WHEN %s = 'denied' THEN %s ELSE denial_note END + WHERE request_code = %s + """, + (status, decided_by or None, flags or None, note, status, note, request_code), + ) + + def record_task_run( + self, + request_code: str | None, + task: str, + status: str, + detail: str | None, + started_at: datetime, + finished_at: datetime | None, + duration_ms: int | None, + ) -> None: + self._db.execute( + """ + INSERT INTO ariadne_task_runs + (request_code, task, status, detail, started_at, finished_at, duration_ms) + VALUES (%s, %s, %s, %s, %s, %s, %s) + """, + (request_code, task, status, detail, started_at, finished_at, duration_ms), + ) + + def update_schedule_state( + self, + task_name: str, + cron_expr: str, + last_started_at: datetime | None, + last_finished_at: datetime | None, + last_status: str | None, + last_error: str | None, + last_duration_ms: int | None, + next_run_at: datetime | None, + ) -> None: + self._db.execute( + """ + INSERT INTO ariadne_schedule_state + (task_name, cron_expr, last_started_at, last_finished_at, last_status, + last_error, last_duration_ms, next_run_at, updated_at) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, NOW()) + ON CONFLICT (task_name) DO UPDATE + SET cron_expr = EXCLUDED.cron_expr, + last_started_at = EXCLUDED.last_started_at, + last_finished_at = EXCLUDED.last_finished_at, + last_status = EXCLUDED.last_status, + last_error = EXCLUDED.last_error, + last_duration_ms = EXCLUDED.last_duration_ms, + next_run_at = EXCLUDED.next_run_at, + updated_at = NOW() + """, + ( + task_name, + cron_expr, + last_started_at, + last_finished_at, + last_status, + last_error, + last_duration_ms, + next_run_at, + ), + ) + + def record_event(self, event_type: str, detail: str | None) -> None: + self._db.execute( + "INSERT INTO ariadne_events (event_type, detail) VALUES (%s, %s)", + (event_type, detail), + ) + + @staticmethod + def _row_to_request(row: dict[str, Any]) -> AccessRequest: + flags = row.get("approval_flags") + flags_list: list[str] = [] + if isinstance(flags, list): + flags_list = [str(item) for item in flags if item] + return AccessRequest( + request_code=str(row.get("request_code") or ""), + username=str(row.get("username") or ""), + contact_email=str(row.get("contact_email") or ""), + status=str(row.get("status") or ""), + email_verified_at=row.get("email_verified_at"), + initial_password=row.get("initial_password"), + initial_password_revealed_at=row.get("initial_password_revealed_at"), + provision_attempted_at=row.get("provision_attempted_at"), + approval_flags=flags_list, + approval_note=row.get("approval_note"), + denial_note=row.get("denial_note"), + ) diff --git a/ariadne/k8s/client.py b/ariadne/k8s/client.py new file mode 100644 index 0000000..25fe7f6 --- /dev/null +++ b/ariadne/k8s/client.py @@ -0,0 +1,63 @@ +from __future__ import annotations + +import base64 +from pathlib import Path +from typing import Any + +import httpx + +from ..settings import settings + + +_K8S_BASE_URL = "https://kubernetes.default.svc" +_SA_PATH = Path("/var/run/secrets/kubernetes.io/serviceaccount") + + +def _read_service_account() -> tuple[str, str]: + token_path = _SA_PATH / "token" + ca_path = _SA_PATH / "ca.crt" + if not token_path.exists() or not ca_path.exists(): + raise RuntimeError("kubernetes service account token missing") + token = token_path.read_text().strip() + if not token: + raise RuntimeError("kubernetes service account token empty") + return token, str(ca_path) + + +def _k8s_request(method: str, path: str, payload: dict[str, Any] | None = None) -> Any: + token, ca_path = _read_service_account() + url = f"{_K8S_BASE_URL}{path}" + headers = {"Authorization": f"Bearer {token}"} + with httpx.Client(verify=ca_path, timeout=settings.k8s_api_timeout_sec, headers=headers) as client: + resp = client.request(method, url, json=payload) + resp.raise_for_status() + return resp.json() + + +def get_json(path: str) -> dict[str, Any]: + payload = _k8s_request("GET", path) + if not isinstance(payload, dict): + raise RuntimeError("unexpected kubernetes response") + return payload + + +def post_json(path: str, payload: dict[str, Any]) -> dict[str, Any]: + data = _k8s_request("POST", path, payload) + if not isinstance(data, dict): + raise RuntimeError("unexpected kubernetes response") + return data + + +def get_secret_value(namespace: str, name: str, key: str) -> str: + data = get_json(f"/api/v1/namespaces/{namespace}/secrets/{name}") + blob = data.get("data") if isinstance(data.get("data"), dict) else {} + raw = blob.get(key) + if not isinstance(raw, str) or not raw: + raise RuntimeError("secret key missing") + try: + decoded = base64.b64decode(raw).decode("utf-8").strip() + except Exception as exc: + raise RuntimeError("failed to decode secret") from exc + if not decoded: + raise RuntimeError("secret value empty") + return decoded diff --git a/ariadne/k8s/jobs.py b/ariadne/k8s/jobs.py new file mode 100644 index 0000000..da99e51 --- /dev/null +++ b/ariadne/k8s/jobs.py @@ -0,0 +1,121 @@ +from __future__ import annotations + +import re +import time +from typing import Any + +from .client import get_json, post_json + + +class JobSpawner: + def __init__(self, namespace: str, cronjob_name: str) -> None: + self._namespace = namespace + self._cronjob_name = cronjob_name + + @staticmethod + def _safe_name_fragment(value: str, max_len: int = 24) -> str: + cleaned = re.sub(r"[^a-z0-9-]+", "-", (value or "").lower()).strip("-") + if not cleaned: + cleaned = "job" + return cleaned[:max_len].rstrip("-") or "job" + + def _job_from_cronjob( + self, + cronjob: dict[str, Any], + label_suffix: str, + env_overrides: list[dict[str, str]] | None = None, + job_ttl_seconds: int | None = None, + ) -> dict[str, Any]: + spec = cronjob.get("spec") if isinstance(cronjob.get("spec"), dict) else {} + jt = spec.get("jobTemplate") if isinstance(spec.get("jobTemplate"), dict) else {} + job_spec = jt.get("spec") if isinstance(jt.get("spec"), dict) else {} + + now = int(time.time()) + safe_label = self._safe_name_fragment(label_suffix) + job_name = f"{self._cronjob_name}-{safe_label}-{now}" + + job: dict[str, Any] = { + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": { + "name": job_name, + "namespace": self._namespace, + "labels": { + "app": self._cronjob_name, + "atlas.bstein.dev/trigger": "ariadne", + "atlas.bstein.dev/label": safe_label, + }, + }, + "spec": job_spec, + } + + if isinstance(job_ttl_seconds, int) and job_ttl_seconds > 0: + job.setdefault("spec", {}) + job["spec"]["ttlSecondsAfterFinished"] = job_ttl_seconds + + tpl = job.get("spec", {}).get("template", {}) + pod_spec = tpl.get("spec") if isinstance(tpl.get("spec"), dict) else {} + containers = pod_spec.get("containers") if isinstance(pod_spec.get("containers"), list) else [] + if containers and isinstance(containers[0], dict) and env_overrides: + env = containers[0].get("env") + if not isinstance(env, list): + env = [] + env = [e for e in env if not (isinstance(e, dict) and e.get("name") in {item["name"] for item in env_overrides})] + env.extend(env_overrides) + containers[0]["env"] = env + pod_spec["containers"] = containers + tpl["spec"] = pod_spec + job["spec"]["template"] = tpl + + return job + + def trigger( + self, + label_suffix: str, + env_overrides: list[dict[str, str]] | None = None, + job_ttl_seconds: int | None = None, + ) -> dict[str, Any]: + cronjob = get_json(f"/apis/batch/v1/namespaces/{self._namespace}/cronjobs/{self._cronjob_name}") + job_payload = self._job_from_cronjob(cronjob, label_suffix, env_overrides, job_ttl_seconds) + created = post_json(f"/apis/batch/v1/namespaces/{self._namespace}/jobs", job_payload) + job_name = ( + created.get("metadata", {}).get("name") + if isinstance(created.get("metadata"), dict) + else job_payload.get("metadata", {}).get("name") + ) + if not isinstance(job_name, str) or not job_name: + raise RuntimeError("job name missing") + return {"job": job_name, "status": "queued"} + + def wait_for_completion(self, job_name: str, timeout_sec: float) -> dict[str, Any]: + deadline = time.time() + timeout_sec + while time.time() < deadline: + job = get_json(f"/apis/batch/v1/namespaces/{self._namespace}/jobs/{job_name}") + status = job.get("status") if isinstance(job.get("status"), dict) else {} + if int(status.get("succeeded") or 0) > 0: + return {"job": job_name, "status": "ok"} + if int(status.get("failed") or 0) > 0: + return {"job": job_name, "status": "error"} + conditions = status.get("conditions") if isinstance(status.get("conditions"), list) else [] + for cond in conditions: + if not isinstance(cond, dict): + continue + if cond.get("type") == "Complete" and cond.get("status") == "True": + return {"job": job_name, "status": "ok"} + if cond.get("type") == "Failed" and cond.get("status") == "True": + return {"job": job_name, "status": "error"} + time.sleep(2) + return {"job": job_name, "status": "running"} + + def trigger_and_wait( + self, + label_suffix: str, + env_overrides: list[dict[str, str]] | None, + timeout_sec: float, + job_ttl_seconds: int | None = None, + ) -> dict[str, Any]: + created = self.trigger(label_suffix, env_overrides, job_ttl_seconds) + job_name = created.get("job") + if not isinstance(job_name, str) or not job_name: + raise RuntimeError("job name missing") + return self.wait_for_completion(job_name, timeout_sec) diff --git a/ariadne/manager/provisioning.py b/ariadne/manager/provisioning.py new file mode 100644 index 0000000..38ef388 --- /dev/null +++ b/ariadne/manager/provisioning.py @@ -0,0 +1,520 @@ +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime, timezone +import hashlib +import threading +import time +from typing import Any + +from ..db.database import Database +from ..db.storage import REQUIRED_TASKS, Storage +from ..metrics.metrics import record_task_run, set_access_request_counts +from ..services.firefly import firefly +from ..services.keycloak_admin import keycloak_admin +from ..services.mailer import MailerError, mailer +from ..services.mailu import mailu +from ..services.nextcloud import nextcloud +from ..services.vaultwarden import vaultwarden +from ..services.wger import wger +from ..settings import settings +from ..utils.errors import safe_error_detail +from ..utils.passwords import random_password + + +MAILU_EMAIL_ATTR = "mailu_email" +MAILU_APP_PASSWORD_ATTR = "mailu_app_password" +MAILU_ENABLED_ATTR = "mailu_enabled" +WGER_PASSWORD_ATTR = "wger_password" +WGER_PASSWORD_UPDATED_ATTR = "wger_password_updated_at" +FIREFLY_PASSWORD_ATTR = "firefly_password" +FIREFLY_PASSWORD_UPDATED_ATTR = "firefly_password_updated_at" + + +@dataclass(frozen=True) +class ProvisionOutcome: + ok: bool + status: str + + +def _advisory_lock_id(request_code: str) -> int: + digest = hashlib.sha256(request_code.encode("utf-8")).digest() + return int.from_bytes(digest[:8], "big", signed=True) + + +def _extract_attr(attrs: Any, key: str) -> str: + if not isinstance(attrs, dict): + return "" + raw = attrs.get(key) + if isinstance(raw, list): + for item in raw: + if isinstance(item, str) and item.strip(): + return item.strip() + return "" + if isinstance(raw, str) and raw.strip(): + return raw.strip() + return "" + + +class ProvisioningManager: + def __init__(self, db: Database, storage: Storage) -> None: + self._db = db + self._storage = storage + self._thread: threading.Thread | None = None + self._stop_event = threading.Event() + + def start(self) -> None: + if self._thread and self._thread.is_alive(): + return + self._stop_event.clear() + self._thread = threading.Thread(target=self._run_loop, name="ariadne-provision", daemon=True) + self._thread.start() + + def stop(self) -> None: + self._stop_event.set() + if self._thread: + self._thread.join(timeout=5) + + def _run_loop(self) -> None: + while not self._stop_event.is_set(): + try: + self._sync_status_metrics() + except Exception: + pass + if not keycloak_admin.ready(): + time.sleep(settings.provision_poll_interval_sec) + continue + candidates = self._storage.list_provision_candidates() + for request in candidates: + self.provision_access_request(request.request_code) + time.sleep(settings.provision_poll_interval_sec) + + def _sync_status_metrics(self) -> None: + counts = self._db.fetchall( + "SELECT status, COUNT(*) AS count FROM access_requests GROUP BY status" + ) + payload: dict[str, int] = {} + for row in counts: + status = row.get("status") + count = row.get("count") + if isinstance(status, str) and isinstance(count, int): + payload[status] = count + set_access_request_counts(payload) + + def provision_access_request(self, request_code: str) -> ProvisionOutcome: + if not request_code: + return ProvisionOutcome(ok=False, status="unknown") + if not keycloak_admin.ready(): + return ProvisionOutcome(ok=False, status="accounts_building") + + required_tasks = list(REQUIRED_TASKS) + + with self._db.connection() as conn: + lock_id = _advisory_lock_id(request_code) + locked_row = conn.execute("SELECT pg_try_advisory_lock(%s) AS locked", (lock_id,)).fetchone() + if not locked_row or not locked_row.get("locked"): + return ProvisionOutcome(ok=False, status="accounts_building") + + try: + row = conn.execute( + """ + SELECT username, + contact_email, + email_verified_at, + status, + initial_password, + initial_password_revealed_at, + provision_attempted_at, + approval_flags + FROM access_requests + WHERE request_code = %s + """, + (request_code,), + ).fetchone() + if not row: + return ProvisionOutcome(ok=False, status="unknown") + + username = str(row.get("username") or "") + contact_email = str(row.get("contact_email") or "") + email_verified_at = row.get("email_verified_at") + status = str(row.get("status") or "") + initial_password = row.get("initial_password") + revealed_at = row.get("initial_password_revealed_at") + attempted_at = row.get("provision_attempted_at") + approval_flags = row.get("approval_flags") if isinstance(row.get("approval_flags"), list) else [] + + if status == "approved": + conn.execute( + """ + UPDATE access_requests + SET status = 'accounts_building' + WHERE request_code = %s AND status = 'approved' + """, + (request_code,), + ) + status = "accounts_building" + + if status not in {"accounts_building", "awaiting_onboarding", "ready"}: + return ProvisionOutcome(ok=False, status=status or "unknown") + + self._ensure_task_rows(conn, request_code, required_tasks) + + if status == "accounts_building": + now = datetime.now(timezone.utc) + if isinstance(attempted_at, datetime): + if attempted_at.tzinfo is None: + attempted_at = attempted_at.replace(tzinfo=timezone.utc) + age_sec = (now - attempted_at).total_seconds() + if age_sec < settings.provision_retry_cooldown_sec: + return ProvisionOutcome(ok=False, status="accounts_building") + conn.execute( + "UPDATE access_requests SET provision_attempted_at = NOW() WHERE request_code = %s", + (request_code,), + ) + + user_id = "" + mailu_email = f"{username}@{settings.mailu_domain}" + + # Task: ensure Keycloak user exists + start = datetime.now(timezone.utc) + try: + user = keycloak_admin.find_user(username) + if not user: + if not isinstance(email_verified_at, datetime): + raise RuntimeError("missing verified email address") + email = contact_email.strip() + if not email: + raise RuntimeError("missing verified email address") + existing_email_user = keycloak_admin.find_user_by_email(email) + if existing_email_user and (existing_email_user.get("username") or "") != username: + raise RuntimeError("email is already associated with an existing Atlas account") + payload = { + "username": username, + "enabled": True, + "email": email, + "emailVerified": True, + "requiredActions": [], + "attributes": { + MAILU_EMAIL_ATTR: [mailu_email], + MAILU_ENABLED_ATTR: ["true"], + }, + } + created_id = keycloak_admin.create_user(payload) + user = keycloak_admin.get_user(created_id) + user_id = str((user or {}).get("id") or "") + if not user_id: + raise RuntimeError("user id missing") + + try: + full = keycloak_admin.get_user(user_id) + attrs = full.get("attributes") or {} + actions = full.get("requiredActions") + if isinstance(actions, list) and "CONFIGURE_TOTP" in actions: + new_actions = [a for a in actions if a != "CONFIGURE_TOTP"] + keycloak_admin.update_user(user_id, {"requiredActions": new_actions}) + if isinstance(attrs, dict): + existing = _extract_attr(attrs, MAILU_EMAIL_ATTR) + if existing: + mailu_email = existing + else: + mailu_email = f"{username}@{settings.mailu_domain}" + keycloak_admin.set_user_attribute(username, MAILU_EMAIL_ATTR, mailu_email) + enabled_value = _extract_attr(attrs, MAILU_ENABLED_ATTR) + if enabled_value.lower() not in {"1", "true", "yes", "y", "on"}: + keycloak_admin.set_user_attribute(username, MAILU_ENABLED_ATTR, "true") + except Exception: + mailu_email = f"{username}@{settings.mailu_domain}" + + self._upsert_task(conn, request_code, "keycloak_user", "ok", None) + self._record_task(request_code, "keycloak_user", "ok", None, start) + except Exception as exc: + detail = safe_error_detail(exc, "failed to ensure user") + self._upsert_task(conn, request_code, "keycloak_user", "error", detail) + self._record_task(request_code, "keycloak_user", "error", detail, start) + + if not user_id: + return ProvisionOutcome(ok=False, status="accounts_building") + + # Task: set initial password for Keycloak + start = datetime.now(timezone.utc) + try: + should_reset = status == "accounts_building" and revealed_at is None + password_value: str | None = None + + if should_reset: + if isinstance(initial_password, str) and initial_password: + password_value = initial_password + elif initial_password is None: + password_value = random_password(20) + conn.execute( + """ + UPDATE access_requests + SET initial_password = %s + WHERE request_code = %s AND initial_password IS NULL + """, + (password_value, request_code), + ) + initial_password = password_value + + if password_value: + keycloak_admin.reset_password(user_id, password_value, temporary=False) + + if isinstance(initial_password, str) and initial_password: + self._upsert_task(conn, request_code, "keycloak_password", "ok", None) + self._record_task(request_code, "keycloak_password", "ok", None, start) + elif revealed_at is not None: + detail = "initial password already revealed" + self._upsert_task(conn, request_code, "keycloak_password", "ok", detail) + self._record_task(request_code, "keycloak_password", "ok", detail, start) + else: + raise RuntimeError("initial password missing") + except Exception as exc: + detail = safe_error_detail(exc, "failed to set password") + self._upsert_task(conn, request_code, "keycloak_password", "error", detail) + self._record_task(request_code, "keycloak_password", "error", detail, start) + + # Task: group membership + start = datetime.now(timezone.utc) + try: + approved_flags = [flag for flag in approval_flags if flag in settings.allowed_flag_groups] + groups = list(dict.fromkeys(settings.default_user_groups + approved_flags)) + for group_name in groups: + gid = keycloak_admin.get_group_id(group_name) + if not gid: + raise RuntimeError(f"group missing: {group_name}") + keycloak_admin.add_user_to_group(user_id, gid) + self._upsert_task(conn, request_code, "keycloak_groups", "ok", None) + self._record_task(request_code, "keycloak_groups", "ok", None, start) + except Exception as exc: + detail = safe_error_detail(exc, "failed to add groups") + self._upsert_task(conn, request_code, "keycloak_groups", "error", detail) + self._record_task(request_code, "keycloak_groups", "error", detail, start) + + # Task: ensure mailu app password exists + start = datetime.now(timezone.utc) + try: + full = keycloak_admin.get_user(user_id) + attrs = full.get("attributes") or {} + existing = _extract_attr(attrs, MAILU_APP_PASSWORD_ATTR) + if not existing: + keycloak_admin.set_user_attribute(username, MAILU_APP_PASSWORD_ATTR, random_password()) + self._upsert_task(conn, request_code, "mailu_app_password", "ok", None) + self._record_task(request_code, "mailu_app_password", "ok", None, start) + except Exception as exc: + detail = safe_error_detail(exc, "failed to set mail password") + self._upsert_task(conn, request_code, "mailu_app_password", "error", detail) + self._record_task(request_code, "mailu_app_password", "error", detail, start) + + # Task: trigger Mailu sync + start = datetime.now(timezone.utc) + try: + if not settings.mailu_sync_url: + detail = "sync disabled" + self._upsert_task(conn, request_code, "mailu_sync", "ok", detail) + self._record_task(request_code, "mailu_sync", "ok", detail, start) + else: + mailu.sync(reason="ariadne_access_approve") + self._upsert_task(conn, request_code, "mailu_sync", "ok", None) + self._record_task(request_code, "mailu_sync", "ok", None, start) + except Exception as exc: + detail = safe_error_detail(exc, "failed to sync mailu") + self._upsert_task(conn, request_code, "mailu_sync", "error", detail) + self._record_task(request_code, "mailu_sync", "error", detail, start) + + # Task: trigger Nextcloud mail sync + start = datetime.now(timezone.utc) + try: + if not settings.nextcloud_namespace or not settings.nextcloud_mail_sync_cronjob: + detail = "sync disabled" + self._upsert_task(conn, request_code, "nextcloud_mail_sync", "ok", detail) + self._record_task(request_code, "nextcloud_mail_sync", "ok", detail, start) + else: + result = nextcloud.sync_mail(username, wait=True) + if isinstance(result, dict) and result.get("status") == "ok": + self._upsert_task(conn, request_code, "nextcloud_mail_sync", "ok", None) + self._record_task(request_code, "nextcloud_mail_sync", "ok", None, start) + else: + status_val = result.get("status") if isinstance(result, dict) else "error" + detail = str(status_val) + self._upsert_task(conn, request_code, "nextcloud_mail_sync", "error", detail) + self._record_task(request_code, "nextcloud_mail_sync", "error", detail, start) + except Exception as exc: + detail = safe_error_detail(exc, "failed to sync nextcloud") + self._upsert_task(conn, request_code, "nextcloud_mail_sync", "error", detail) + self._record_task(request_code, "nextcloud_mail_sync", "error", detail, start) + + # Task: ensure wger account exists + start = datetime.now(timezone.utc) + try: + full = keycloak_admin.get_user(user_id) + attrs = full.get("attributes") or {} + wger_password = _extract_attr(attrs, WGER_PASSWORD_ATTR) + wger_password_updated_at = _extract_attr(attrs, WGER_PASSWORD_UPDATED_ATTR) + + if not wger_password: + wger_password = random_password(20) + keycloak_admin.set_user_attribute(username, WGER_PASSWORD_ATTR, wger_password) + + if not wger_password_updated_at: + result = wger.sync_user(username, mailu_email, wger_password, wait=True) + status_val = result.get("status") if isinstance(result, dict) else "error" + if status_val != "ok": + raise RuntimeError(f"wger sync {status_val}") + now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + keycloak_admin.set_user_attribute(username, WGER_PASSWORD_UPDATED_ATTR, now_iso) + + self._upsert_task(conn, request_code, "wger_account", "ok", None) + self._record_task(request_code, "wger_account", "ok", None, start) + except Exception as exc: + detail = safe_error_detail(exc, "failed to provision wger") + self._upsert_task(conn, request_code, "wger_account", "error", detail) + self._record_task(request_code, "wger_account", "error", detail, start) + + # Task: ensure firefly account exists + start = datetime.now(timezone.utc) + try: + full = keycloak_admin.get_user(user_id) + attrs = full.get("attributes") or {} + firefly_password = _extract_attr(attrs, FIREFLY_PASSWORD_ATTR) + firefly_password_updated_at = _extract_attr(attrs, FIREFLY_PASSWORD_UPDATED_ATTR) + + if not firefly_password: + firefly_password = random_password(24) + keycloak_admin.set_user_attribute(username, FIREFLY_PASSWORD_ATTR, firefly_password) + + if not firefly_password_updated_at: + result = firefly.sync_user(mailu_email, firefly_password, wait=True) + status_val = result.get("status") if isinstance(result, dict) else "error" + if status_val != "ok": + raise RuntimeError(f"firefly sync {status_val}") + now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + keycloak_admin.set_user_attribute(username, FIREFLY_PASSWORD_UPDATED_ATTR, now_iso) + + self._upsert_task(conn, request_code, "firefly_account", "ok", None) + self._record_task(request_code, "firefly_account", "ok", None, start) + except Exception as exc: + detail = safe_error_detail(exc, "failed to provision firefly") + self._upsert_task(conn, request_code, "firefly_account", "error", detail) + self._record_task(request_code, "firefly_account", "error", detail, start) + + # Task: ensure Vaultwarden account exists (invite flow) + start = datetime.now(timezone.utc) + try: + if not mailu.wait_for_mailbox(mailu_email, settings.mailu_mailbox_wait_timeout_sec): + raise RuntimeError("mailbox not ready") + + result = vaultwarden.invite_user(mailu_email) + if result.ok: + self._upsert_task(conn, request_code, "vaultwarden_invite", "ok", result.status) + self._record_task(request_code, "vaultwarden_invite", "ok", result.status, start) + else: + detail = result.detail or result.status + self._upsert_task(conn, request_code, "vaultwarden_invite", "error", detail) + self._record_task(request_code, "vaultwarden_invite", "error", detail, start) + + try: + now_iso = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + keycloak_admin.set_user_attribute(username, "vaultwarden_email", mailu_email) + keycloak_admin.set_user_attribute(username, "vaultwarden_status", result.status) + keycloak_admin.set_user_attribute(username, "vaultwarden_synced_at", now_iso) + except Exception: + pass + except Exception as exc: + detail = safe_error_detail(exc, "failed to provision vaultwarden") + self._upsert_task(conn, request_code, "vaultwarden_invite", "error", detail) + self._record_task(request_code, "vaultwarden_invite", "error", detail, start) + + if self._all_tasks_ok(conn, request_code, required_tasks): + conn.execute( + """ + UPDATE access_requests + SET status = 'awaiting_onboarding' + WHERE request_code = %s AND status = 'accounts_building' + """, + (request_code,), + ) + self._send_welcome_email(request_code, username, contact_email) + return ProvisionOutcome(ok=True, status="awaiting_onboarding") + + return ProvisionOutcome(ok=False, status="accounts_building") + finally: + conn.execute("SELECT pg_advisory_unlock(%s)", (lock_id,)) + + def _ensure_task_rows(self, conn, request_code: str, tasks: list[str]) -> None: + if not tasks: + return + conn.execute( + """ + INSERT INTO access_request_tasks (request_code, task, status, detail, updated_at) + SELECT %s, task, 'pending', NULL, NOW() + FROM UNNEST(%s::text[]) AS task + ON CONFLICT (request_code, task) DO NOTHING + """, + (request_code, tasks), + ) + + def _upsert_task(self, conn, request_code: str, task: str, status: str, detail: str | None = None) -> None: + conn.execute( + """ + INSERT INTO access_request_tasks (request_code, task, status, detail, updated_at) + VALUES (%s, %s, %s, %s, NOW()) + ON CONFLICT (request_code, task) + DO UPDATE SET status = EXCLUDED.status, detail = EXCLUDED.detail, updated_at = NOW() + """, + (request_code, task, status, detail), + ) + + def _task_statuses(self, conn, request_code: str) -> dict[str, str]: + rows = conn.execute( + "SELECT task, status FROM access_request_tasks WHERE request_code = %s", + (request_code,), + ).fetchall() + output: dict[str, str] = {} + for row in rows: + task = row.get("task") if isinstance(row, dict) else None + status = row.get("status") if isinstance(row, dict) else None + if isinstance(task, str) and isinstance(status, str): + output[task] = status + return output + + def _all_tasks_ok(self, conn, request_code: str, tasks: list[str]) -> bool: + statuses = self._task_statuses(conn, request_code) + for task in tasks: + if statuses.get(task) != "ok": + return False + return True + + def _record_task(self, request_code: str, task: str, status: str, detail: str | None, started: datetime) -> None: + finished = datetime.now(timezone.utc) + duration_sec = (finished - started).total_seconds() + record_task_run(task, status, duration_sec) + try: + self._storage.record_task_run( + request_code, + task, + status, + detail, + started, + finished, + int(duration_sec * 1000), + ) + except Exception: + pass + + def _send_welcome_email(self, request_code: str, username: str, contact_email: str) -> None: + if not settings.welcome_email_enabled: + return + if not contact_email: + return + try: + row = self._db.fetchone( + "SELECT welcome_email_sent_at FROM access_requests WHERE request_code = %s", + (request_code,), + ) + if row and row.get("welcome_email_sent_at"): + return + onboarding_url = f"{settings.portal_public_base_url}/onboarding?code={request_code}" + mailer.send_welcome(contact_email, request_code, onboarding_url, username=username) + self._storage.mark_welcome_sent(request_code) + except MailerError: + return diff --git a/ariadne/metrics/metrics.py b/ariadne/metrics/metrics.py new file mode 100644 index 0000000..789c433 --- /dev/null +++ b/ariadne/metrics/metrics.py @@ -0,0 +1,71 @@ +from __future__ import annotations + +from prometheus_client import Counter, Gauge, Histogram + + +TASK_RUNS_TOTAL = Counter( + "ariadne_task_runs_total", + "Ariadne task runs by status", + ["task", "status"], +) +TASK_DURATION_SECONDS = Histogram( + "ariadne_task_duration_seconds", + "Ariadne task durations in seconds", + ["task", "status"], + buckets=(0.5, 1, 2, 5, 10, 30, 60, 120, 300), +) + +SCHEDULE_LAST_RUN_TS = Gauge( + "ariadne_schedule_last_run_timestamp_seconds", + "Last schedule run timestamp", + ["task"], +) +SCHEDULE_LAST_SUCCESS_TS = Gauge( + "ariadne_schedule_last_success_timestamp_seconds", + "Last successful schedule run timestamp", + ["task"], +) +SCHEDULE_NEXT_RUN_TS = Gauge( + "ariadne_schedule_next_run_timestamp_seconds", + "Next scheduled run timestamp", + ["task"], +) +SCHEDULE_STATUS = Gauge( + "ariadne_schedule_last_status", + "Last schedule status (1=ok,0=error)", + ["task"], +) + +ACCESS_REQUESTS = Gauge( + "ariadne_access_requests_total", + "Access requests by status", + ["status"], +) + + +def record_task_run(task: str, status: str, duration_sec: float | None) -> None: + TASK_RUNS_TOTAL.labels(task=task, status=status).inc() + if duration_sec is not None: + TASK_DURATION_SECONDS.labels(task=task, status=status).observe(duration_sec) + + +def record_schedule_state( + task: str, + last_run_ts: float | None, + last_success_ts: float | None, + next_run_ts: float | None, + ok: bool | None, +) -> None: + if last_run_ts: + SCHEDULE_LAST_RUN_TS.labels(task=task).set(last_run_ts) + if last_success_ts: + SCHEDULE_LAST_SUCCESS_TS.labels(task=task).set(last_success_ts) + if next_run_ts: + SCHEDULE_NEXT_RUN_TS.labels(task=task).set(next_run_ts) + if ok is not None: + SCHEDULE_STATUS.labels(task=task).set(1 if ok else 0) + + +def set_access_request_counts(counts: dict[str, int]) -> None: + for status, count in counts.items(): + ACCESS_REQUESTS.labels(status=status).set(count) diff --git a/ariadne/scheduler/cron.py b/ariadne/scheduler/cron.py new file mode 100644 index 0000000..7f9cb1a --- /dev/null +++ b/ariadne/scheduler/cron.py @@ -0,0 +1,125 @@ +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime, timezone +import threading +import time +from typing import Callable + +from croniter import croniter + +from ..db.storage import Storage +from ..metrics.metrics import record_schedule_state, record_task_run + + +@dataclass(frozen=True) +class CronTask: + name: str + cron_expr: str + runner: Callable[[], None] + + +class CronScheduler: + def __init__(self, storage: Storage, tick_sec: float = 5.0) -> None: + self._storage = storage + self._tick_sec = tick_sec + self._tasks: dict[str, CronTask] = {} + self._next_run: dict[str, datetime] = {} + self._running: set[str] = set() + self._lock = threading.Lock() + self._stop_event = threading.Event() + self._thread: threading.Thread | None = None + + def add_task(self, name: str, cron_expr: str, runner: Callable[[], None]) -> None: + task = CronTask(name=name, cron_expr=cron_expr, runner=runner) + self._tasks[name] = task + self._next_run[name] = self._compute_next(cron_expr, datetime.now(timezone.utc)) + + def start(self) -> None: + if self._thread and self._thread.is_alive(): + return + self._stop_event.clear() + self._thread = threading.Thread(target=self._run_loop, name="ariadne-scheduler", daemon=True) + self._thread.start() + + def stop(self) -> None: + self._stop_event.set() + if self._thread: + self._thread.join(timeout=5) + + def _compute_next(self, cron_expr: str, base: datetime) -> datetime: + itr = croniter(cron_expr, base) + next_time = itr.get_next(datetime) + if next_time.tzinfo is None: + return next_time.replace(tzinfo=timezone.utc) + return next_time + + def _run_loop(self) -> None: + while not self._stop_event.is_set(): + now = datetime.now(timezone.utc) + for name, task in list(self._tasks.items()): + next_run = self._next_run.get(name) + if next_run and now >= next_run: + with self._lock: + if name in self._running: + continue + self._running.add(name) + self._next_run[name] = self._compute_next(task.cron_expr, now) + threading.Thread( + target=self._execute_task, + args=(task,), + name=f"ariadne-scheduler-{name}", + daemon=True, + ).start() + record_schedule_state( + name, + None, + None, + self._next_run.get(name).timestamp() if self._next_run.get(name) else None, + None, + ) + time.sleep(self._tick_sec) + + def _execute_task(self, task: CronTask) -> None: + started = datetime.now(timezone.utc) + status = "ok" + detail = None + try: + task.runner() + except Exception as exc: + status = "error" + detail = str(exc).strip() or "task failed" + finished = datetime.now(timezone.utc) + duration_sec = (finished - started).total_seconds() + record_task_run(task.name, status, duration_sec) + record_schedule_state( + task.name, + started.timestamp(), + started.timestamp() if status == "ok" else None, + self._next_run.get(task.name).timestamp() if self._next_run.get(task.name) else None, + status == "ok", + ) + try: + self._storage.record_task_run( + None, + task.name, + status, + detail, + started, + finished, + int(duration_sec * 1000), + ) + self._storage.update_schedule_state( + task.name, + task.cron_expr, + started, + finished, + status, + detail, + int(duration_sec * 1000), + self._next_run.get(task.name), + ) + except Exception: + pass + with self._lock: + self._running.discard(task.name) diff --git a/ariadne/services/firefly.py b/ariadne/services/firefly.py new file mode 100644 index 0000000..507bbbf --- /dev/null +++ b/ariadne/services/firefly.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +from typing import Any + +from ..k8s.jobs import JobSpawner +from ..settings import settings + + +class FireflyService: + def __init__(self) -> None: + self._spawner = JobSpawner(settings.firefly_namespace, settings.firefly_user_sync_cronjob) + + def sync_user(self, email: str, password: str, wait: bool = True) -> dict[str, Any]: + email = (email or "").strip() + if not email: + raise RuntimeError("missing email") + if not password: + raise RuntimeError("missing password") + if not settings.firefly_namespace or not settings.firefly_user_sync_cronjob: + raise RuntimeError("firefly sync not configured") + + env_overrides = [ + {"name": "FIREFLY_USER_EMAIL", "value": email}, + {"name": "FIREFLY_USER_PASSWORD", "value": password}, + ] + + label_suffix = email.split("@", 1)[0] if "@" in email else email + if wait: + return self._spawner.trigger_and_wait( + label_suffix, + env_overrides, + settings.firefly_user_sync_wait_timeout_sec, + ) + return self._spawner.trigger(label_suffix, env_overrides) + + +firefly = FireflyService() diff --git a/ariadne/services/keycloak_admin.py b/ariadne/services/keycloak_admin.py new file mode 100644 index 0000000..f60f397 --- /dev/null +++ b/ariadne/services/keycloak_admin.py @@ -0,0 +1,192 @@ +from __future__ import annotations + +from typing import Any +import time + +import httpx + +from ..settings import settings + + +class KeycloakAdminClient: + def __init__(self) -> None: + self._token: str = "" + self._expires_at: float = 0.0 + self._group_id_cache: dict[str, str] = {} + + def ready(self) -> bool: + return bool(settings.keycloak_admin_client_id and settings.keycloak_admin_client_secret) + + def _get_token(self) -> str: + if not self.ready(): + raise RuntimeError("keycloak admin client not configured") + + now = time.time() + if self._token and now < self._expires_at - 30: + return self._token + + token_url = ( + f"{settings.keycloak_admin_url}/realms/{settings.keycloak_admin_realm}/protocol/openid-connect/token" + ) + data = { + "grant_type": "client_credentials", + "client_id": settings.keycloak_admin_client_id, + "client_secret": settings.keycloak_admin_client_secret, + } + with httpx.Client(timeout=10.0) as client: + resp = client.post(token_url, data=data) + resp.raise_for_status() + payload = resp.json() + token = payload.get("access_token") or "" + if not token: + raise RuntimeError("no access_token in response") + expires_in = int(payload.get("expires_in") or 60) + self._token = token + self._expires_at = now + expires_in + return token + + def _headers(self) -> dict[str, str]: + return {"Authorization": f"Bearer {self._get_token()}"} + + def headers(self) -> dict[str, str]: + return self._headers() + + def find_user(self, username: str) -> dict[str, Any] | None: + url = f"{settings.keycloak_admin_url}/admin/realms/{settings.keycloak_realm}/users" + params = {"username": username, "exact": "true", "max": "1"} + with httpx.Client(timeout=10.0) as client: + resp = client.get(url, params=params, headers=self._headers()) + resp.raise_for_status() + users = resp.json() + if not isinstance(users, list) or not users: + return None + user = users[0] + return user if isinstance(user, dict) else None + + def find_user_by_email(self, email: str) -> dict[str, Any] | None: + email = (email or "").strip() + if not email: + return None + + url = f"{settings.keycloak_admin_url}/admin/realms/{settings.keycloak_realm}/users" + params = {"email": email, "exact": "true", "max": "2"} + email_norm = email.lower() + with httpx.Client(timeout=10.0) as client: + resp = client.get(url, params=params, headers=self._headers()) + resp.raise_for_status() + users = resp.json() + if not isinstance(users, list) or not users: + return None + for user in users: + if not isinstance(user, dict): + continue + candidate = user.get("email") + if isinstance(candidate, str) and candidate.strip().lower() == email_norm: + return user + return None + + def get_user(self, user_id: str) -> dict[str, Any]: + url = f"{settings.keycloak_admin_url}/admin/realms/{settings.keycloak_realm}/users/{user_id}" + with httpx.Client(timeout=10.0) as client: + resp = client.get(url, headers=self._headers()) + resp.raise_for_status() + data = resp.json() + if not isinstance(data, dict): + raise RuntimeError("unexpected user payload") + return data + + def update_user(self, user_id: str, payload: dict[str, Any]) -> None: + url = f"{settings.keycloak_admin_url}/admin/realms/{settings.keycloak_realm}/users/{user_id}" + with httpx.Client(timeout=10.0) as client: + resp = client.put(url, headers={**self._headers(), "Content-Type": "application/json"}, json=payload) + resp.raise_for_status() + + def create_user(self, payload: dict[str, Any]) -> str: + url = f"{settings.keycloak_admin_url}/admin/realms/{settings.keycloak_realm}/users" + with httpx.Client(timeout=10.0) as client: + resp = client.post(url, headers={**self._headers(), "Content-Type": "application/json"}, json=payload) + resp.raise_for_status() + location = resp.headers.get("Location") or "" + if location: + return location.rstrip("/").split("/")[-1] + raise RuntimeError("failed to determine created user id") + + def reset_password(self, user_id: str, password: str, temporary: bool = False) -> None: + url = ( + f"{settings.keycloak_admin_url}/admin/realms/{settings.keycloak_realm}" + f"/users/{user_id}/reset-password" + ) + payload = {"type": "password", "value": password, "temporary": bool(temporary)} + with httpx.Client(timeout=10.0) as client: + resp = client.put(url, headers={**self._headers(), "Content-Type": "application/json"}, json=payload) + resp.raise_for_status() + + def set_user_attribute(self, username: str, key: str, value: str) -> None: + user = self.find_user(username) + if not user: + raise RuntimeError("user not found") + user_id = user.get("id") or "" + if not user_id: + raise RuntimeError("user id missing") + + full = self.get_user(user_id) + attrs = full.get("attributes") or {} + if not isinstance(attrs, dict): + attrs = {} + attrs[key] = [value] + self.update_user(user_id, {"attributes": attrs}) + + def get_group_id(self, group_name: str) -> str | None: + cached = self._group_id_cache.get(group_name) + if cached: + return cached + + url = f"{settings.keycloak_admin_url}/admin/realms/{settings.keycloak_realm}/groups" + params = {"search": group_name} + with httpx.Client(timeout=10.0) as client: + resp = client.get(url, params=params, headers=self._headers()) + resp.raise_for_status() + items = resp.json() + if not isinstance(items, list): + return None + for item in items: + if not isinstance(item, dict): + continue + if item.get("name") == group_name and item.get("id"): + gid = str(item["id"]) + self._group_id_cache[group_name] = gid + return gid + return None + + def add_user_to_group(self, user_id: str, group_id: str) -> None: + url = ( + f"{settings.keycloak_admin_url}/admin/realms/{settings.keycloak_realm}" + f"/users/{user_id}/groups/{group_id}" + ) + with httpx.Client(timeout=10.0) as client: + resp = client.put(url, headers=self._headers()) + resp.raise_for_status() + + def iter_users(self, page_size: int = 200, brief: bool = False) -> list[dict[str, Any]]: + url = f"{settings.keycloak_admin_url}/admin/realms/{settings.keycloak_realm}/users" + users: list[dict[str, Any]] = [] + first = 0 + while True: + params = {"first": str(first), "max": str(page_size)} + if not brief: + params["briefRepresentation"] = "false" + with httpx.Client(timeout=10.0) as client: + resp = client.get(url, params=params, headers=self._headers()) + resp.raise_for_status() + payload = resp.json() + if not isinstance(payload, list) or not payload: + return users + for item in payload: + if isinstance(item, dict): + users.append(item) + if len(payload) < page_size: + return users + first += page_size + + +keycloak_admin = KeycloakAdminClient() diff --git a/ariadne/services/mailer.py b/ariadne/services/mailer.py new file mode 100644 index 0000000..f7a753f --- /dev/null +++ b/ariadne/services/mailer.py @@ -0,0 +1,102 @@ +from __future__ import annotations + +from dataclasses import dataclass +from email.message import EmailMessage +import smtplib +from typing import Iterable + +from ..settings import settings + + +class MailerError(RuntimeError): + pass + + +@dataclass(frozen=True) +class SentEmail: + ok: bool + detail: str = "" + + +class Mailer: + def __init__(self) -> None: + self._host = settings.smtp_host + self._port = settings.smtp_port + self._username = settings.smtp_username + self._password = settings.smtp_password + self._from_addr = settings.smtp_from + self._starttls = settings.smtp_starttls + self._use_tls = settings.smtp_use_tls + self._timeout = settings.smtp_timeout_sec + + def send(self, subject: str, to_addrs: Iterable[str], text_body: str, html_body: str | None = None) -> SentEmail: + if not self._host: + raise MailerError("smtp host not configured") + + message = EmailMessage() + message["Subject"] = subject + message["From"] = self._from_addr + message["To"] = ", ".join(to_addrs) + message.set_content(text_body) + if html_body: + message.add_alternative(html_body, subtype="html") + + try: + if self._use_tls: + server: smtplib.SMTP = smtplib.SMTP_SSL(self._host, self._port, timeout=self._timeout) + else: + server = smtplib.SMTP(self._host, self._port, timeout=self._timeout) + with server: + server.ehlo() + if self._starttls: + server.starttls() + server.ehlo() + if self._username: + server.login(self._username, self._password) + server.send_message(message) + return SentEmail(ok=True, detail="sent") + except Exception as exc: + raise MailerError(str(exc)) from exc + + def send_welcome(self, to_addr: str, request_code: str, onboarding_url: str, username: str | None = None) -> SentEmail: + display = username or "there" + subject = "Welcome to Titan Lab" + text_body = "\n".join( + [ + f"Hi {display},", + "", + "Your Titan Lab access is approved.", + f"Complete onboarding here: {onboarding_url}", + "", + f"Request code: {request_code}", + "", + "If you did not request access, ignore this email.", + "", + "— Titan Lab", + ] + ) + + html_body = f""" + + + + +
+

Welcome to Titan Lab

+

Hi {display}, your access has been approved.

+

+ + Start onboarding + +

+

Request code: {request_code}

+

If you did not request access, ignore this email.

+
+ + + """.strip() + + return self.send(subject, [to_addr], text_body, html_body) + + +mailer = Mailer() diff --git a/ariadne/services/mailu.py b/ariadne/services/mailu.py new file mode 100644 index 0000000..0600741 --- /dev/null +++ b/ariadne/services/mailu.py @@ -0,0 +1,63 @@ +from __future__ import annotations + +import time +from typing import Any + +import httpx +import psycopg + +from ..settings import settings + + +class MailuService: + def __init__(self) -> None: + self._db_config = { + "host": settings.mailu_db_host, + "port": settings.mailu_db_port, + "dbname": settings.mailu_db_name, + "user": settings.mailu_db_user, + "password": settings.mailu_db_password, + } + + def mailbox_exists(self, email: str) -> bool: + email = (email or "").strip() + if not email: + return False + try: + with psycopg.connect(**self._db_config) as conn: + with conn.cursor() as cur: + cur.execute('SELECT 1 FROM "user" WHERE email = %s LIMIT 1', (email,)) + return cur.fetchone() is not None + except Exception: + return False + + def wait_for_mailbox(self, email: str, timeout_sec: float = 60.0) -> bool: + deadline = time.time() + timeout_sec + while time.time() < deadline: + if self.mailbox_exists(email): + return True + time.sleep(2) + return False + + def sync(self, reason: str) -> None: + if not settings.mailu_sync_url: + return + with httpx.Client(timeout=settings.mailu_sync_wait_timeout_sec) as client: + resp = client.post( + settings.mailu_sync_url, + json={"ts": int(time.time()), "wait": True, "reason": reason}, + ) + if resp.status_code != 200: + raise RuntimeError(f"mailu sync failed status={resp.status_code}") + + @staticmethod + def resolve_mailu_email(username: str, attributes: dict[str, Any] | None) -> str: + attrs = attributes or {} + raw = attrs.get("mailu_email") if isinstance(attrs, dict) else None + if isinstance(raw, list): + for item in raw: + if isinstance(item, str) and item.strip(): + return item.strip() + if isinstance(raw, str) and raw.strip(): + return raw.strip() + return f"{username}@{settings.mailu_domain}" diff --git a/ariadne/services/nextcloud.py b/ariadne/services/nextcloud.py new file mode 100644 index 0000000..eb4b395 --- /dev/null +++ b/ariadne/services/nextcloud.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +from typing import Any + +from ..k8s.jobs import JobSpawner +from ..settings import settings + + +class NextcloudService: + def __init__(self) -> None: + self._spawner = JobSpawner(settings.nextcloud_namespace, settings.nextcloud_mail_sync_cronjob) + + def sync_mail(self, username: str | None = None, wait: bool = True) -> dict[str, Any]: + if not settings.nextcloud_namespace or not settings.nextcloud_mail_sync_cronjob: + raise RuntimeError("nextcloud mail sync not configured") + + env_overrides = None + label_suffix = "all" + if username: + cleaned = (username or "").strip() + if not cleaned: + raise RuntimeError("missing username") + env_overrides = [{"name": "ONLY_USERNAME", "value": cleaned}] + label_suffix = cleaned + + ttl = settings.nextcloud_mail_sync_job_ttl_sec + if wait: + return self._spawner.trigger_and_wait( + label_suffix, + env_overrides, + settings.nextcloud_mail_sync_wait_timeout_sec, + job_ttl_seconds=ttl, + ) + return self._spawner.trigger(label_suffix, env_overrides, job_ttl_seconds=ttl) + + +nextcloud = NextcloudService() diff --git a/ariadne/services/vaultwarden.py b/ariadne/services/vaultwarden.py new file mode 100644 index 0000000..263d4ef --- /dev/null +++ b/ariadne/services/vaultwarden.py @@ -0,0 +1,151 @@ +from __future__ import annotations + +import threading +import time +from dataclasses import dataclass + +import httpx + +from ..k8s.client import get_secret_value, get_json +from ..settings import settings + + +@dataclass(frozen=True) +class VaultwardenInvite: + ok: bool + status: str + detail: str = "" + + +class VaultwardenService: + def __init__(self) -> None: + self._admin_lock = threading.Lock() + self._admin_session: httpx.Client | None = None + self._admin_session_expires_at: float = 0.0 + self._admin_session_base_url: str = "" + self._rate_limited_until: float = 0.0 + + def invite_user(self, email: str) -> VaultwardenInvite: + email = (email or "").strip() + if not email or "@" not in email: + return VaultwardenInvite(ok=False, status="invalid_email", detail="email invalid") + if self._rate_limited_until and time.time() < self._rate_limited_until: + return VaultwardenInvite(ok=False, status="rate_limited", detail="vaultwarden rate limited") + + base_url = f"http://{settings.vaultwarden_service_host}" + fallback_url = "" + try: + pod_ip = self._find_pod_ip(settings.vaultwarden_namespace, settings.vaultwarden_pod_label) + fallback_url = f"http://{pod_ip}:{settings.vaultwarden_pod_port}" + except Exception: + fallback_url = "" + + last_error = "" + for candidate in [base_url, fallback_url]: + if not candidate: + continue + try: + session = self._admin_session(candidate) + resp = session.post("/admin/invite", json={"email": email}) + if resp.status_code == 429: + self._rate_limited_until = time.time() + float(settings.vaultwarden_admin_rate_limit_backoff_sec) + return VaultwardenInvite(ok=False, status="rate_limited", detail="vaultwarden rate limited") + + if resp.status_code in {200, 201, 204}: + return VaultwardenInvite(ok=True, status="invited", detail="invite created") + + body = "" + try: + body = resp.text or "" + except Exception: + body = "" + if resp.status_code in {400, 409} and any( + marker in body.lower() + for marker in ( + "already invited", + "already exists", + "already registered", + "user already exists", + ) + ): + return VaultwardenInvite(ok=True, status="already_present", detail="user already present") + + last_error = f"status {resp.status_code}" + except Exception as exc: + last_error = str(exc) + if "rate limited" in last_error.lower(): + return VaultwardenInvite(ok=False, status="rate_limited", detail="vaultwarden rate limited") + continue + + return VaultwardenInvite(ok=False, status="error", detail=last_error or "failed to invite") + + def _admin_session(self, base_url: str) -> httpx.Client: + now = time.time() + with self._admin_lock: + if self._rate_limited_until and now < self._rate_limited_until: + raise RuntimeError("vaultwarden rate limited") + if self._admin_session and now < self._admin_session_expires_at and self._admin_session_base_url == base_url: + return self._admin_session + + if self._admin_session: + try: + self._admin_session.close() + except Exception: + pass + self._admin_session = None + + token = get_secret_value( + settings.vaultwarden_namespace, + settings.vaultwarden_admin_secret_name, + settings.vaultwarden_admin_secret_key, + ) + + client = httpx.Client( + base_url=base_url, + timeout=10.0, + follow_redirects=True, + headers={"User-Agent": "ariadne/1"}, + ) + resp = client.post("/admin", data={"token": token}) + if resp.status_code == 429: + self._rate_limited_until = now + float(settings.vaultwarden_admin_rate_limit_backoff_sec) + raise RuntimeError("vaultwarden rate limited") + resp.raise_for_status() + + self._admin_session = client + self._admin_session_base_url = base_url + self._admin_session_expires_at = now + float(settings.vaultwarden_admin_session_ttl_sec) + return client + + @staticmethod + def _find_pod_ip(namespace: str, label_selector: str) -> str: + data = get_json(f"/api/v1/namespaces/{namespace}/pods?labelSelector={label_selector}") + items = data.get("items") or [] + if not isinstance(items, list) or not items: + raise RuntimeError("no vaultwarden pods found") + + def _pod_ready(pod: dict) -> bool: + status = pod.get("status") if isinstance(pod.get("status"), dict) else {} + if status.get("phase") != "Running": + return False + ip = status.get("podIP") + if not isinstance(ip, str) or not ip: + return False + conditions = status.get("conditions") if isinstance(status.get("conditions"), list) else [] + for cond in conditions: + if not isinstance(cond, dict): + continue + if cond.get("type") == "Ready": + return cond.get("status") == "True" + return True + + ready = [p for p in items if isinstance(p, dict) and _pod_ready(p)] + candidates = ready or [p for p in items if isinstance(p, dict)] + status = candidates[0].get("status") or {} + ip = status.get("podIP") if isinstance(status, dict) else None + if not isinstance(ip, str) or not ip: + raise RuntimeError("vaultwarden pod has no IP") + return ip + + +vaultwarden = VaultwardenService() diff --git a/ariadne/services/vaultwarden_sync.py b/ariadne/services/vaultwarden_sync.py new file mode 100644 index 0000000..5671a6c --- /dev/null +++ b/ariadne/services/vaultwarden_sync.py @@ -0,0 +1,197 @@ +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime, timezone +import time +from typing import Any + +from ..settings import settings +from .keycloak_admin import keycloak_admin +from .mailu import mailu +from .vaultwarden import vaultwarden + + +VAULTWARDEN_EMAIL_ATTR = "vaultwarden_email" +VAULTWARDEN_STATUS_ATTR = "vaultwarden_status" +VAULTWARDEN_SYNCED_AT_ATTR = "vaultwarden_synced_at" + + +@dataclass(frozen=True) +class VaultwardenSyncSummary: + processed: int + created_or_present: int + skipped: int + failures: int + detail: str = "" + + +def _extract_attr(attrs: Any, key: str) -> str: + if not isinstance(attrs, dict): + return "" + raw = attrs.get(key) + if isinstance(raw, list): + for item in raw: + if isinstance(item, str) and item.strip(): + return item.strip() + return "" + if isinstance(raw, str) and raw.strip(): + return raw.strip() + return "" + + +def _parse_synced_at(value: str) -> float | None: + value = (value or "").strip() + if not value: + return None + for fmt in ("%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S%z"): + try: + parsed = datetime.strptime(value, fmt) + if parsed.tzinfo is None: + parsed = parsed.replace(tzinfo=timezone.utc) + return parsed.timestamp() + except ValueError: + continue + return None + + +def _vaultwarden_email_for_user(user: dict[str, Any]) -> str: + username = (user.get("username") if isinstance(user.get("username"), str) else "") or "" + username = username.strip() + if not username: + return "" + + attrs = user.get("attributes") + vaultwarden_email = _extract_attr(attrs, VAULTWARDEN_EMAIL_ATTR) + if vaultwarden_email: + return vaultwarden_email + + mailu_email = _extract_attr(attrs, "mailu_email") + if mailu_email: + return mailu_email + + email = (user.get("email") if isinstance(user.get("email"), str) else "") or "" + email = email.strip() + if email and email.lower().endswith(f"@{settings.mailu_domain.lower()}"): + return email + + return "" + + +def _set_user_attribute_if_missing(username: str, user: dict[str, Any], key: str, value: str) -> None: + value = (value or "").strip() + if not value: + return + existing = _extract_attr(user.get("attributes"), key) + if existing: + return + keycloak_admin.set_user_attribute(username, key, value) + + +def _set_user_attribute(username: str, key: str, value: str) -> None: + value = (value or "").strip() + if not value: + return + keycloak_admin.set_user_attribute(username, key, value) + + +def run_vaultwarden_sync() -> VaultwardenSyncSummary: + processed = 0 + created = 0 + skipped = 0 + failures = 0 + consecutive_failures = 0 + + if not keycloak_admin.ready(): + return VaultwardenSyncSummary(0, 0, 0, 1, detail="keycloak admin not configured") + + users = keycloak_admin.iter_users(page_size=200, brief=False) + for user in users: + username = (user.get("username") if isinstance(user.get("username"), str) else "") or "" + username = username.strip() + if not username: + skipped += 1 + continue + + enabled = user.get("enabled") + if enabled is False: + skipped += 1 + continue + + if user.get("serviceAccountClientId") or username.startswith("service-account-"): + skipped += 1 + continue + + user_id = (user.get("id") if isinstance(user.get("id"), str) else "") or "" + full_user = user + if user_id: + try: + full_user = keycloak_admin.get_user(user_id) + except Exception: + full_user = user + + current_status = _extract_attr(full_user.get("attributes"), VAULTWARDEN_STATUS_ATTR) + current_synced_at = _extract_attr(full_user.get("attributes"), VAULTWARDEN_SYNCED_AT_ATTR) + current_synced_ts = _parse_synced_at(current_synced_at) + if current_status in {"rate_limited", "error"} and current_synced_ts: + if time.time() - current_synced_ts < settings.vaultwarden_retry_cooldown_sec: + skipped += 1 + continue + + email = _vaultwarden_email_for_user(full_user) + if not email: + skipped += 1 + continue + + if not mailu.mailbox_exists(email): + skipped += 1 + continue + + try: + _set_user_attribute_if_missing(username, full_user, VAULTWARDEN_EMAIL_ATTR, email) + except Exception: + pass + + if current_status in {"invited", "already_present"}: + if not current_synced_at: + try: + _set_user_attribute( + username, + VAULTWARDEN_SYNCED_AT_ATTR, + time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), + ) + except Exception: + pass + skipped += 1 + continue + + processed += 1 + result = vaultwarden.invite_user(email) + if result.ok: + created += 1 + consecutive_failures = 0 + try: + _set_user_attribute(username, VAULTWARDEN_STATUS_ATTR, result.status) + _set_user_attribute( + username, + VAULTWARDEN_SYNCED_AT_ATTR, + time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), + ) + except Exception: + pass + else: + failures += 1 + if result.status in {"rate_limited", "error"}: + consecutive_failures += 1 + try: + _set_user_attribute(username, VAULTWARDEN_STATUS_ATTR, result.status) + _set_user_attribute( + username, + VAULTWARDEN_SYNCED_AT_ATTR, + time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), + ) + except Exception: + pass + if consecutive_failures >= settings.vaultwarden_failure_bailout: + break + + return VaultwardenSyncSummary(processed, created, skipped, failures) diff --git a/ariadne/services/wger.py b/ariadne/services/wger.py new file mode 100644 index 0000000..b5b848f --- /dev/null +++ b/ariadne/services/wger.py @@ -0,0 +1,49 @@ +from __future__ import annotations + +from typing import Any + +from ..k8s.jobs import JobSpawner +from ..settings import settings + + +class WgerService: + def __init__(self) -> None: + self._user_spawner = JobSpawner(settings.wger_namespace, settings.wger_user_sync_cronjob) + self._admin_spawner = JobSpawner(settings.wger_namespace, settings.wger_admin_cronjob) + + def sync_user(self, username: str, email: str, password: str, wait: bool = True) -> dict[str, Any]: + username = (username or "").strip() + if not username: + raise RuntimeError("missing username") + if not password: + raise RuntimeError("missing password") + if not settings.wger_namespace or not settings.wger_user_sync_cronjob: + raise RuntimeError("wger sync not configured") + + env_overrides = [ + {"name": "WGER_USERNAME", "value": username}, + {"name": "WGER_EMAIL", "value": email}, + {"name": "WGER_PASSWORD", "value": password}, + ] + + if wait: + return self._user_spawner.trigger_and_wait( + username, + env_overrides, + settings.wger_user_sync_wait_timeout_sec, + ) + return self._user_spawner.trigger(username, env_overrides) + + def ensure_admin(self, wait: bool = False) -> dict[str, Any]: + if not settings.wger_namespace or not settings.wger_admin_cronjob: + raise RuntimeError("wger admin sync not configured") + if wait: + return self._admin_spawner.trigger_and_wait( + "admin", + None, + settings.wger_user_sync_wait_timeout_sec, + ) + return self._admin_spawner.trigger("admin", None) + + +wger = WgerService() diff --git a/ariadne/settings.py b/ariadne/settings.py new file mode 100644 index 0000000..a603813 --- /dev/null +++ b/ariadne/settings.py @@ -0,0 +1,208 @@ +from __future__ import annotations + +from dataclasses import dataclass +import os + + +def _env(name: str, default: str = "") -> str: + value = os.getenv(name, default) + return value.strip() if isinstance(value, str) else default + + +def _env_bool(name: str, default: str = "false") -> bool: + return _env(name, default).lower() in {"1", "true", "yes", "y", "on"} + + +def _env_int(name: str, default: int) -> int: + raw = _env(name, str(default)) + try: + return int(raw) + except ValueError: + return default + + +def _env_float(name: str, default: float) -> float: + raw = _env(name, str(default)) + try: + return float(raw) + except ValueError: + return default + + +@dataclass(frozen=True) +class Settings: + app_name: str + bind_host: str + bind_port: int + portal_database_url: str + portal_public_base_url: str + + keycloak_url: str + keycloak_realm: str + keycloak_client_id: str + keycloak_issuer: str + keycloak_jwks_url: str + + keycloak_admin_url: str + keycloak_admin_realm: str + keycloak_admin_client_id: str + keycloak_admin_client_secret: str + + portal_admin_users: list[str] + portal_admin_groups: list[str] + account_allowed_groups: list[str] + allowed_flag_groups: list[str] + default_user_groups: list[str] + + mailu_domain: str + mailu_sync_url: str + mailu_sync_wait_timeout_sec: float + mailu_mailbox_wait_timeout_sec: float + mailu_db_host: str + mailu_db_port: int + mailu_db_name: str + mailu_db_user: str + mailu_db_password: str + + nextcloud_namespace: str + nextcloud_mail_sync_cronjob: str + nextcloud_mail_sync_wait_timeout_sec: float + nextcloud_mail_sync_job_ttl_sec: int + + wger_namespace: str + wger_user_sync_cronjob: str + wger_user_sync_wait_timeout_sec: float + wger_admin_cronjob: str + + firefly_namespace: str + firefly_user_sync_cronjob: str + firefly_user_sync_wait_timeout_sec: float + + vaultwarden_namespace: str + vaultwarden_pod_label: str + vaultwarden_pod_port: int + vaultwarden_service_host: str + vaultwarden_admin_secret_name: str + vaultwarden_admin_secret_key: str + vaultwarden_admin_session_ttl_sec: float + vaultwarden_admin_rate_limit_backoff_sec: float + vaultwarden_retry_cooldown_sec: float + vaultwarden_failure_bailout: int + + smtp_host: str + smtp_port: int + smtp_username: str + smtp_password: str + smtp_starttls: bool + smtp_use_tls: bool + smtp_from: str + smtp_timeout_sec: float + welcome_email_enabled: bool + + provision_poll_interval_sec: float + provision_retry_cooldown_sec: float + schedule_tick_sec: float + k8s_api_timeout_sec: float + + mailu_sync_cron: str + nextcloud_sync_cron: str + vaultwarden_sync_cron: str + wger_admin_cron: str + + metrics_path: str + + @classmethod + def from_env(cls) -> "Settings": + keycloak_url = _env("KEYCLOAK_URL", "https://sso.bstein.dev").rstrip("/") + keycloak_realm = _env("KEYCLOAK_REALM", "atlas") + keycloak_client_id = _env("KEYCLOAK_CLIENT_ID", "bstein-dev-home") + keycloak_issuer = _env("KEYCLOAK_ISSUER", f"{keycloak_url}/realms/{keycloak_realm}").rstrip("/") + keycloak_jwks_url = _env("KEYCLOAK_JWKS_URL", f"{keycloak_issuer}/protocol/openid-connect/certs").rstrip("/") + + admin_users = [u for u in (_env("PORTAL_ADMIN_USERS", "bstein")).split(",") if u.strip()] + admin_groups = [g for g in (_env("PORTAL_ADMIN_GROUPS", "admin")).split(",") if g.strip()] + allowed_groups = [g for g in (_env("ACCOUNT_ALLOWED_GROUPS", "dev,admin")).split(",") if g.strip()] + flag_groups = [g for g in (_env("ALLOWED_FLAG_GROUPS", "demo,test")).split(",") if g.strip()] + default_groups = [g for g in (_env("DEFAULT_USER_GROUPS", "dev")).split(",") if g.strip()] + + mailu_db_port = _env_int("MAILU_DB_PORT", 5432) + smtp_port = _env_int("SMTP_PORT", 25) + + return cls( + app_name=_env("ARIADNE_APP_NAME", "ariadne"), + bind_host=_env("ARIADNE_BIND_HOST", "0.0.0.0"), + bind_port=_env_int("ARIADNE_BIND_PORT", 8080), + portal_database_url=_env("PORTAL_DATABASE_URL", ""), + portal_public_base_url=_env("PORTAL_PUBLIC_BASE_URL", "https://bstein.dev").rstrip("/"), + keycloak_url=keycloak_url, + keycloak_realm=keycloak_realm, + keycloak_client_id=keycloak_client_id, + keycloak_issuer=keycloak_issuer, + keycloak_jwks_url=keycloak_jwks_url, + keycloak_admin_url=_env("KEYCLOAK_ADMIN_URL", keycloak_url).rstrip("/"), + keycloak_admin_realm=_env("KEYCLOAK_ADMIN_REALM", keycloak_realm), + keycloak_admin_client_id=_env("KEYCLOAK_ADMIN_CLIENT_ID", ""), + keycloak_admin_client_secret=_env("KEYCLOAK_ADMIN_CLIENT_SECRET", ""), + portal_admin_users=admin_users, + portal_admin_groups=admin_groups, + account_allowed_groups=allowed_groups, + allowed_flag_groups=flag_groups, + default_user_groups=default_groups, + mailu_domain=_env("MAILU_DOMAIN", "bstein.dev"), + mailu_sync_url=_env( + "MAILU_SYNC_URL", + "http://mailu-sync-listener.mailu-mailserver.svc.cluster.local:8080/events", + ).rstrip("/"), + mailu_sync_wait_timeout_sec=_env_float("MAILU_SYNC_WAIT_TIMEOUT_SEC", 60.0), + mailu_mailbox_wait_timeout_sec=_env_float("MAILU_MAILBOX_WAIT_TIMEOUT_SEC", 60.0), + mailu_db_host=_env("MAILU_DB_HOST", "postgres-service.postgres.svc.cluster.local"), + mailu_db_port=mailu_db_port, + mailu_db_name=_env("MAILU_DB_NAME", "mailu"), + mailu_db_user=_env("MAILU_DB_USER", "mailu"), + mailu_db_password=_env("MAILU_DB_PASSWORD", ""), + nextcloud_namespace=_env("NEXTCLOUD_NAMESPACE", "nextcloud"), + nextcloud_mail_sync_cronjob=_env("NEXTCLOUD_MAIL_SYNC_CRONJOB", "nextcloud-mail-sync"), + nextcloud_mail_sync_wait_timeout_sec=_env_float("NEXTCLOUD_MAIL_SYNC_WAIT_TIMEOUT_SEC", 90.0), + nextcloud_mail_sync_job_ttl_sec=_env_int("NEXTCLOUD_MAIL_SYNC_JOB_TTL_SEC", 3600), + wger_namespace=_env("WGER_NAMESPACE", "health"), + wger_user_sync_cronjob=_env("WGER_USER_SYNC_CRONJOB", "wger-user-sync"), + wger_user_sync_wait_timeout_sec=_env_float("WGER_USER_SYNC_WAIT_TIMEOUT_SEC", 60.0), + wger_admin_cronjob=_env("WGER_ADMIN_CRONJOB", "wger-admin-ensure"), + firefly_namespace=_env("FIREFLY_NAMESPACE", "finance"), + firefly_user_sync_cronjob=_env("FIREFLY_USER_SYNC_CRONJOB", "firefly-user-sync"), + firefly_user_sync_wait_timeout_sec=_env_float("FIREFLY_USER_SYNC_WAIT_TIMEOUT_SEC", 90.0), + vaultwarden_namespace=_env("VAULTWARDEN_NAMESPACE", "vaultwarden"), + vaultwarden_pod_label=_env("VAULTWARDEN_POD_LABEL", "app=vaultwarden"), + vaultwarden_pod_port=_env_int("VAULTWARDEN_POD_PORT", 80), + vaultwarden_service_host=_env( + "VAULTWARDEN_SERVICE_HOST", + "vaultwarden-service.vaultwarden.svc.cluster.local", + ), + vaultwarden_admin_secret_name=_env("VAULTWARDEN_ADMIN_SECRET_NAME", "vaultwarden-admin"), + vaultwarden_admin_secret_key=_env("VAULTWARDEN_ADMIN_SECRET_KEY", "ADMIN_TOKEN"), + vaultwarden_admin_session_ttl_sec=_env_float("VAULTWARDEN_ADMIN_SESSION_TTL_SEC", 300.0), + vaultwarden_admin_rate_limit_backoff_sec=_env_float("VAULTWARDEN_ADMIN_RATE_LIMIT_BACKOFF_SEC", 600.0), + vaultwarden_retry_cooldown_sec=_env_float("VAULTWARDEN_RETRY_COOLDOWN_SEC", 1800.0), + vaultwarden_failure_bailout=_env_int("VAULTWARDEN_FAILURE_BAILOUT", 2), + smtp_host=_env("SMTP_HOST", ""), + smtp_port=smtp_port, + smtp_username=_env("SMTP_USERNAME", ""), + smtp_password=_env("SMTP_PASSWORD", ""), + smtp_starttls=_env_bool("SMTP_STARTTLS", "false"), + smtp_use_tls=_env_bool("SMTP_USE_TLS", "false"), + smtp_from=_env("SMTP_FROM", f"postmaster@{mailu_domain}"), + smtp_timeout_sec=_env_float("SMTP_TIMEOUT_SEC", 10.0), + welcome_email_enabled=_env_bool("WELCOME_EMAIL_ENABLED", "true"), + provision_poll_interval_sec=_env_float("ARIADNE_PROVISION_POLL_INTERVAL_SEC", 5.0), + provision_retry_cooldown_sec=_env_float("ARIADNE_PROVISION_RETRY_COOLDOWN_SEC", 30.0), + schedule_tick_sec=_env_float("ARIADNE_SCHEDULE_TICK_SEC", 5.0), + k8s_api_timeout_sec=_env_float("K8S_API_TIMEOUT_SEC", 5.0), + mailu_sync_cron=_env("ARIADNE_SCHEDULE_MAILU_SYNC", "30 4 * * *"), + nextcloud_sync_cron=_env("ARIADNE_SCHEDULE_NEXTCLOUD_SYNC", "0 5 * * *"), + vaultwarden_sync_cron=_env("ARIADNE_SCHEDULE_VAULTWARDEN_SYNC", "*/15 * * * *"), + wger_admin_cron=_env("ARIADNE_SCHEDULE_WGER_ADMIN", "15 3 * * *"), + metrics_path=_env("METRICS_PATH", "/metrics"), + ) + + +settings = Settings.from_env() diff --git a/ariadne/utils/errors.py b/ariadne/utils/errors.py new file mode 100644 index 0000000..9138dd7 --- /dev/null +++ b/ariadne/utils/errors.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +import httpx + + +def safe_error_detail(exc: Exception, fallback: str) -> str: + if isinstance(exc, RuntimeError): + msg = str(exc).strip() + if msg: + return msg + if isinstance(exc, httpx.HTTPStatusError): + detail = f"http {exc.response.status_code}" + try: + payload = exc.response.json() + msg: str | None = None + if isinstance(payload, dict): + raw = payload.get("errorMessage") or payload.get("error") or payload.get("message") + if isinstance(raw, str) and raw.strip(): + msg = raw.strip() + elif isinstance(payload, str) and payload.strip(): + msg = payload.strip() + if msg: + msg = " ".join(msg.split()) + detail = f"{detail}: {msg[:200]}" + except Exception: + text = (exc.response.text or "").strip() + if text: + text = " ".join(text.split()) + detail = f"{detail}: {text[:200]}" + return detail + if isinstance(exc, httpx.TimeoutException): + return "timeout" + return fallback diff --git a/ariadne/utils/http.py b/ariadne/utils/http.py new file mode 100644 index 0000000..05449cf --- /dev/null +++ b/ariadne/utils/http.py @@ -0,0 +1,16 @@ +from __future__ import annotations + +from fastapi import Request + + +def extract_bearer_token(request: Request) -> str | None: + header = request.headers.get("Authorization", "") + if not header: + return None + parts = header.split(None, 1) + if len(parts) != 2: + return None + scheme, token = parts[0].lower(), parts[1].strip() + if scheme != "bearer" or not token: + return None + return token diff --git a/ariadne/utils/passwords.py b/ariadne/utils/passwords.py new file mode 100644 index 0000000..be7bd32 --- /dev/null +++ b/ariadne/utils/passwords.py @@ -0,0 +1,9 @@ +from __future__ import annotations + +import secrets +import string + + +def random_password(length: int = 32) -> str: + alphabet = string.ascii_letters + string.digits + return "".join(secrets.choice(alphabet) for _ in range(length)) diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..6dc4c8e --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1,2 @@ +pytest==8.3.5 +pytest-mock==3.14.0 diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..9fd66ff --- /dev/null +++ b/requirements.txt @@ -0,0 +1,8 @@ +fastapi==0.115.11 +uvicorn[standard]==0.30.6 +httpx==0.27.2 +PyJWT==2.10.1 +psycopg[binary]==3.2.6 +psycopg-pool==3.2.6 +croniter==2.0.7 +prometheus-client==0.21.1 diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..f0ecd1c --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,26 @@ +from __future__ import annotations + +import re + +from ariadne.services.mailu import MailuService +from ariadne.utils.errors import safe_error_detail +from ariadne.utils.passwords import random_password + + +def test_random_password_length() -> None: + password = random_password(24) + assert len(password) == 24 + assert re.match(r"^[A-Za-z0-9]+$", password) + + +def test_mailu_resolve_email_attribute() -> None: + attrs = {"mailu_email": ["custom@bstein.dev"]} + assert MailuService.resolve_mailu_email("alice", attrs) == "custom@bstein.dev" + + +def test_mailu_resolve_email_default() -> None: + assert MailuService.resolve_mailu_email("alice", {}) == "alice@bstein.dev" + + +def test_safe_error_detail_runtime() -> None: + assert safe_error_detail(RuntimeError("boom"), "fallback") == "boom"