"""Flask backend for the bstein.dev home page.

Serves the built frontend from ``../frontend/dist`` and exposes ``/api/*``
endpoints for health checks, Keycloak-authenticated account actions
(Mailu / Jellyfin app-password rotation), Monero and lab status probes, and
a chat proxy in front of an Ollama-compatible API.

All tunables are read from environment variables at import time.
"""
from __future__ import annotations

import json
import os
import secrets
import string
import threading
import time
from functools import wraps
from pathlib import Path
from typing import Any
from urllib.error import URLError
from urllib.parse import quote
from urllib.parse import urlencode
from urllib.request import urlopen

from flask import Flask, g, jsonify, request, send_from_directory
from flask_cors import CORS
import httpx
import jwt
from jwt import PyJWKClient
from werkzeug.middleware.proxy_fix import ProxyFix

app = Flask(__name__, static_folder="../frontend/dist", static_url_path="")
# Trust exactly one proxy hop for the forwarded client/proto/host/port headers.
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_port=1)
CORS(app, resources={r"/api/*": {"origins": "*"}})

# --- Upstream probes -------------------------------------------------------
MONERO_GET_INFO_URL = os.getenv(
    "MONERO_GET_INFO_URL",
    "http://monerod.crypto.svc.cluster.local:18081/get_info",
)
VM_BASE_URL = os.getenv(
    "VM_BASE_URL",
    "http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428",
).rstrip("/")
VM_QUERY_TIMEOUT_SEC = float(os.getenv("VM_QUERY_TIMEOUT_SEC", "2"))
HTTP_CHECK_TIMEOUT_SEC = float(os.getenv("HTTP_CHECK_TIMEOUT_SEC", "2"))
LAB_STATUS_CACHE_SEC = float(os.getenv("LAB_STATUS_CACHE_SEC", "30"))
GRAFANA_HEALTH_URL = os.getenv("GRAFANA_HEALTH_URL", "https://metrics.bstein.dev/api/health")
OCEANUS_NODE_EXPORTER_URL = os.getenv("OCEANUS_NODE_EXPORTER_URL", "http://192.168.22.24:9100/metrics")

# --- AI chat ---------------------------------------------------------------
AI_CHAT_API = os.getenv("AI_CHAT_API", "http://ollama.ai.svc.cluster.local:11434").rstrip("/")
AI_CHAT_MODEL = os.getenv("AI_CHAT_MODEL", "qwen2.5-coder:7b-instruct-q4_0")
AI_CHAT_SYSTEM_PROMPT = os.getenv(
    "AI_CHAT_SYSTEM_PROMPT",
    "You are the Titan Lab assistant for bstein.dev. Be concise and helpful.",
)
AI_CHAT_TIMEOUT_SEC = float(os.getenv("AI_CHAT_TIMEOUT_SEC", "20"))
AI_NODE_NAME = os.getenv("AI_CHAT_NODE_NAME") or os.getenv("AI_NODE_NAME") or "ai-cluster"
AI_GPU_DESC = os.getenv("AI_CHAT_GPU_DESC") or "local GPU (dynamic)"
AI_PUBLIC_ENDPOINT = os.getenv("AI_PUBLIC_CHAT_ENDPOINT", "https://chat.ai.bstein.dev/api/chat")
AI_K8S_LABEL = os.getenv("AI_K8S_LABEL", "app=ollama")
AI_K8S_NAMESPACE = os.getenv("AI_K8S_NAMESPACE", "ai")
AI_MODEL_ANNOTATION = os.getenv("AI_MODEL_ANNOTATION", "ai.bstein.dev/model")
AI_GPU_ANNOTATION = os.getenv("AI_GPU_ANNOTATION", "ai.bstein.dev/gpu")
AI_WARM_INTERVAL_SEC = float(os.getenv("AI_WARM_INTERVAL_SEC", "300"))
AI_WARM_ENABLED = os.getenv("AI_WARM_ENABLED", "true").lower() in ("1", "true", "yes")

# --- Keycloak --------------------------------------------------------------
KEYCLOAK_ENABLED = os.getenv("KEYCLOAK_ENABLED", "false").lower() in ("1", "true", "yes")
KEYCLOAK_URL = os.getenv("KEYCLOAK_URL", "https://sso.bstein.dev").rstrip("/")
KEYCLOAK_REALM = os.getenv("KEYCLOAK_REALM", "atlas")
KEYCLOAK_CLIENT_ID = os.getenv("KEYCLOAK_CLIENT_ID", "bstein-dev-home")
KEYCLOAK_ISSUER = os.getenv("KEYCLOAK_ISSUER", f"{KEYCLOAK_URL}/realms/{KEYCLOAK_REALM}").rstrip("/")
KEYCLOAK_JWKS_URL = os.getenv("KEYCLOAK_JWKS_URL", f"{KEYCLOAK_ISSUER}/protocol/openid-connect/certs").rstrip("/")

KEYCLOAK_ADMIN_URL = os.getenv("KEYCLOAK_ADMIN_URL", KEYCLOAK_URL).rstrip("/")
KEYCLOAK_ADMIN_CLIENT_ID = os.getenv("KEYCLOAK_ADMIN_CLIENT_ID", "")
KEYCLOAK_ADMIN_CLIENT_SECRET = os.getenv("KEYCLOAK_ADMIN_CLIENT_SECRET", "")
KEYCLOAK_ADMIN_REALM = os.getenv("KEYCLOAK_ADMIN_REALM", KEYCLOAK_REALM)

# Comma-separated group names; an empty list means "any authenticated user".
ACCOUNT_ALLOWED_GROUPS = [
    group.strip()
    for group in os.getenv("ACCOUNT_ALLOWED_GROUPS", "dev,admin").split(",")
    if group.strip()
]

# --- Downstream sync listeners --------------------------------------------
MAILU_DOMAIN = os.getenv("MAILU_DOMAIN", "bstein.dev")
MAILU_SYNC_URL = os.getenv(
    "MAILU_SYNC_URL",
    "http://mailu-sync-listener.mailu-mailserver.svc.cluster.local:8080/events",
).rstrip("/")
JELLYFIN_SYNC_URL = os.getenv("JELLYFIN_SYNC_URL", "").rstrip("/")

# --- Process-local caches --------------------------------------------------
_KEYCLOAK_JWK_CLIENT: PyJWKClient | None = None
_KEYCLOAK_ADMIN_TOKEN: dict[str, Any] = {"token": "", "expires_at": 0.0}
_LAB_STATUS_CACHE: dict[str, Any] = {"ts": 0.0, "value": None}

# Errors treated as "upstream unreachable / unparsable" for urllib probes.
# OSError already subsumes URLError and TimeoutError; it additionally covers
# connection resets raised while reading the body and ``socket.timeout`` on
# interpreters where that is not yet an alias of TimeoutError.
_FETCH_ERRORS = (URLError, OSError, ValueError)


@app.route("/api/healthz")
def healthz() -> Any:
    """Liveness probe; always answers ``{"ok": true}``."""
    return jsonify({"ok": True})


def _normalize_groups(groups: Any) -> list[str]:
    """Return the string entries of *groups* with leading ``/`` removed.

    Anything that is not a list yields ``[]``; non-string entries and entries
    that are empty after stripping are dropped.
    """
    if not isinstance(groups, list):
        return []
    stripped = (name.lstrip("/") for name in groups if isinstance(name, str))
    return [name for name in stripped if name]


def _extract_bearer_token() -> str | None:
    """Return the token from an ``Authorization: Bearer <token>`` header.

    Returns ``None`` when the header is absent, malformed, uses another
    scheme, or carries an empty token. The scheme match is case-insensitive.
    """
    header = request.headers.get("Authorization", "")
    if not header:
        return None
    parts = header.split(None, 1)
    if len(parts) != 2:
        return None
    scheme, token = parts[0].lower(), parts[1].strip()
    if scheme != "bearer" or not token:
        return None
    return token


def _get_jwk_client() -> PyJWKClient:
    """Return the lazily created, process-wide JWKS client."""
    global _KEYCLOAK_JWK_CLIENT
    if _KEYCLOAK_JWK_CLIENT is None:
        _KEYCLOAK_JWK_CLIENT = PyJWKClient(KEYCLOAK_JWKS_URL)
    return _KEYCLOAK_JWK_CLIENT


def _verify_keycloak_token(token: str) -> dict[str, Any]:
    """Validate *token* against Keycloak and return its claims.

    Raises:
        ValueError: Keycloak is disabled, or the token's ``azp``/``aud`` does
            not name ``KEYCLOAK_CLIENT_ID``.
        Exception: whatever the JWT library raises for a bad signature,
            issuer, or expiry (callers treat any exception as "invalid").
    """
    if not KEYCLOAK_ENABLED:
        raise ValueError("keycloak not enabled")

    signing_key = _get_jwk_client().get_signing_key_from_jwt(token).key
    # Audience verification is disabled here and replaced by the azp/aud
    # check below.
    claims = jwt.decode(
        token,
        signing_key,
        algorithms=["RS256"],
        options={"verify_aud": False},
        issuer=KEYCLOAK_ISSUER,
    )

    # Ensure this token was minted for our frontend client (defense-in-depth).
    azp = claims.get("azp")
    aud = claims.get("aud")
    aud_list: list[str] = []
    if isinstance(aud, str):
        aud_list = [aud]
    elif isinstance(aud, list):
        aud_list = [a for a in aud if isinstance(a, str)]
    if azp != KEYCLOAK_CLIENT_ID and KEYCLOAK_CLIENT_ID not in aud_list:
        raise ValueError("token not issued for this client")

    return claims


def require_auth(fn):
    """Decorator: require a valid Keycloak bearer token.

    On success the claims, username, email and normalized groups are stored
    on ``flask.g`` (``keycloak_claims``, ``keycloak_username``,
    ``keycloak_email``, ``keycloak_groups``). On failure a 401 JSON response
    is returned and *fn* is not called.
    """

    @wraps(fn)
    def wrapper(*args, **kwargs):
        token = _extract_bearer_token()
        if not token:
            return jsonify({"error": "missing bearer token"}), 401
        try:
            claims = _verify_keycloak_token(token)
        except Exception:
            # Any verification failure maps to the same opaque 401 so the
            # response does not reveal why the token was rejected.
            return jsonify({"error": "invalid token"}), 401

        g.keycloak_claims = claims
        g.keycloak_username = claims.get("preferred_username") or ""
        g.keycloak_email = claims.get("email") or ""
        g.keycloak_groups = _normalize_groups(claims.get("groups"))
        return fn(*args, **kwargs)

    return wrapper


@app.route("/api/auth/config", methods=["GET"])
def auth_config() -> Any:
    """Describe the Keycloak setup so the frontend can start a login."""
    if not KEYCLOAK_ENABLED:
        return jsonify({"enabled": False})

    issuer = KEYCLOAK_ISSUER
    public_origin = request.host_url.rstrip("/")
    redirect_uri = quote(f"{public_origin}/", safe="")
    login_url = (
        f"{issuer}/protocol/openid-connect/auth"
        f"?client_id={quote(KEYCLOAK_CLIENT_ID, safe='')}"
        f"&redirect_uri={redirect_uri}"
        f"&response_type=code"
        f"&scope=openid"
    )

    return jsonify(
        {
            "enabled": True,
            "url": KEYCLOAK_URL,
            "realm": KEYCLOAK_REALM,
            "client_id": KEYCLOAK_CLIENT_ID,
            "login_url": login_url,
            "reset_url": login_url,
        }
    )


def _require_account_access() -> tuple[bool, Any]:
    """Check that the authenticated user may use the account endpoints.

    Returns:
        ``(True, None)`` when access is granted, otherwise ``(False, resp)``
        where *resp* is a ready-to-return Flask response (503 when Keycloak
        is disabled, 403 when the user is in none of the allowed groups).
    """
    if not KEYCLOAK_ENABLED:
        return False, (jsonify({"error": "keycloak not enabled"}), 503)
    if not ACCOUNT_ALLOWED_GROUPS:
        return True, None
    groups = set(getattr(g, "keycloak_groups", []) or [])
    if groups.intersection(ACCOUNT_ALLOWED_GROUPS):
        return True, None
    return False, (jsonify({"error": "forbidden"}), 403)


def _kc_admin_ready() -> bool:
    """Return True when admin client credentials are configured."""
    return bool(KEYCLOAK_ADMIN_CLIENT_ID and KEYCLOAK_ADMIN_CLIENT_SECRET)


def _kc_admin_get_token() -> str:
    """Return an admin access token, reusing the cached one while valid.

    The cached token is reused until 30 seconds before its recorded expiry.

    Raises:
        RuntimeError: admin credentials are missing or the token response
            has no ``access_token``.
        httpx.HTTPError: the token request failed.
    """
    if not _kc_admin_ready():
        raise RuntimeError("keycloak admin client not configured")

    now = time.time()
    cached = _KEYCLOAK_ADMIN_TOKEN.get("token")
    expires_at = float(_KEYCLOAK_ADMIN_TOKEN.get("expires_at") or 0.0)
    if cached and now < expires_at - 30:
        return str(cached)

    token_url = f"{KEYCLOAK_ADMIN_URL}/realms/{KEYCLOAK_ADMIN_REALM}/protocol/openid-connect/token"
    data = {
        "grant_type": "client_credentials",
        "client_id": KEYCLOAK_ADMIN_CLIENT_ID,
        "client_secret": KEYCLOAK_ADMIN_CLIENT_SECRET,
    }
    with httpx.Client(timeout=HTTP_CHECK_TIMEOUT_SEC) as client:
        resp = client.post(token_url, data=data)
        resp.raise_for_status()
        payload = resp.json()

    token = payload.get("access_token") or ""
    if not token:
        raise RuntimeError("no access_token in response")
    expires_in = int(payload.get("expires_in") or 60)
    _KEYCLOAK_ADMIN_TOKEN["token"] = token
    _KEYCLOAK_ADMIN_TOKEN["expires_at"] = now + expires_in
    return token


def _kc_admin_headers() -> dict[str, str]:
    """Return the Authorization header for Keycloak admin API calls."""
    return {"Authorization": f"Bearer {_kc_admin_get_token()}"}


def _kc_admin_find_user(username: str) -> dict[str, Any] | None:
    """Look up *username* (exact match) and return its user record, if any."""
    url = f"{KEYCLOAK_ADMIN_URL}/admin/realms/{KEYCLOAK_REALM}/users"
    params = {"username": username, "exact": "true"}
    with httpx.Client(timeout=HTTP_CHECK_TIMEOUT_SEC) as client:
        resp = client.get(url, params=params, headers=_kc_admin_headers())
        resp.raise_for_status()
        users = resp.json()
    if not isinstance(users, list) or not users:
        return None
    # Keycloak returns a list even with exact=true; use first match.
    user = users[0]
    return user if isinstance(user, dict) else None


def _kc_admin_get_user(user_id: str) -> dict[str, Any]:
    """Fetch the full user representation for *user_id*.

    Raises:
        RuntimeError: the response body is not a JSON object.
        httpx.HTTPError: the request failed.
    """
    url = f"{KEYCLOAK_ADMIN_URL}/admin/realms/{KEYCLOAK_REALM}/users/{quote(user_id, safe='')}"
    with httpx.Client(timeout=HTTP_CHECK_TIMEOUT_SEC) as client:
        resp = client.get(url, headers=_kc_admin_headers())
        resp.raise_for_status()
        data = resp.json()
    if not isinstance(data, dict):
        raise RuntimeError("unexpected user payload")
    return data


def _kc_admin_update_user(user_id: str, payload: dict[str, Any]) -> None:
    """PUT *payload* as the new representation of *user_id*."""
    url = f"{KEYCLOAK_ADMIN_URL}/admin/realms/{KEYCLOAK_REALM}/users/{quote(user_id, safe='')}"
    with httpx.Client(timeout=HTTP_CHECK_TIMEOUT_SEC) as client:
        resp = client.put(
            url,
            headers={**_kc_admin_headers(), "Content-Type": "application/json"},
            json=payload,
        )
        resp.raise_for_status()


def _kc_set_user_attribute(username: str, key: str, value: str) -> None:
    """Set attribute *key* to ``[value]`` on the Keycloak user *username*.

    The full user record is fetched, modified and written back.

    Raises:
        RuntimeError: the user does not exist or has no ``id``.
        httpx.HTTPError: any of the admin API calls failed.
    """
    user = _kc_admin_find_user(username)
    if not user:
        raise RuntimeError("user not found")
    user_id = user.get("id") or ""
    if not user_id:
        raise RuntimeError("user id missing")

    full = _kc_admin_get_user(user_id)
    attrs = full.get("attributes") or {}
    if not isinstance(attrs, dict):
        attrs = {}
    attrs[key] = [value]
    full["attributes"] = attrs
    _kc_admin_update_user(user_id, full)


def _random_password(length: int = 32) -> str:
    """Return a random password of *length* ASCII letters and digits."""
    alphabet = string.ascii_letters + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(length))


def _best_effort_post(url: str) -> None:
    """POST a ``{"ts": <epoch seconds>}`` nudge to *url*, ignoring failures.

    Does nothing when *url* is empty. Errors are deliberately swallowed: the
    caller's own operation has already succeeded and must not fail because a
    listener is down.
    """
    if not url:
        return
    try:
        with httpx.Client(timeout=HTTP_CHECK_TIMEOUT_SEC) as client:
            client.post(url, json={"ts": int(time.time())})
    except Exception:
        return


@app.route("/api/account/overview", methods=["GET"])
@require_auth
def account_overview() -> Any:
    """Return the caller's identity plus Mailu/Jellyfin readiness."""
    ok, resp = _require_account_access()
    if not ok:
        return resp

    username = g.keycloak_username
    mailu_username = f"{username}@{MAILU_DOMAIN}" if username else ""

    mailu_status = "ready"
    jellyfin_status = "ready"
    if not _kc_admin_ready():
        mailu_status = "server not configured"
        jellyfin_status = "server not configured"

    return jsonify(
        {
            "user": {"username": username, "email": g.keycloak_email, "groups": g.keycloak_groups},
            "mailu": {"status": mailu_status, "username": mailu_username},
            "jellyfin": {"status": jellyfin_status, "username": username},
        }
    )


def _rotate_app_password(attribute: str, sync_url: str, failure_message: str) -> Any:
    """Generate a new app password and store it in a Keycloak user attribute.

    Shared implementation of the rotate endpoints. Must run inside a request
    that has passed ``require_auth``.

    Args:
        attribute: Keycloak user attribute that receives the password.
        sync_url: listener to nudge afterwards (best-effort; may be empty).
        failure_message: ``error`` text returned with the 502 on failure.

    Returns:
        A Flask response: the new password on success, otherwise a JSON
        error with status 403/503 (access or configuration), 400 (token has
        no username) or 502 (Keycloak update failed).
    """
    ok, resp = _require_account_access()
    if not ok:
        return resp
    if not _kc_admin_ready():
        return jsonify({"error": "server not configured"}), 503

    username = g.keycloak_username
    if not username:
        return jsonify({"error": "missing username"}), 400

    password = _random_password()
    try:
        _kc_set_user_attribute(username, attribute, password)
    except Exception:
        # The client only gets a generic message, so keep the cause in logs.
        app.logger.exception("failed to set %s for user %s", attribute, username)
        return jsonify({"error": failure_message}), 502

    _best_effort_post(sync_url)
    return jsonify({"password": password})


@app.route("/api/account/mailu/rotate", methods=["POST"])
@require_auth
def account_mailu_rotate() -> Any:
    """Rotate the caller's Mailu app password and return the new value."""
    return _rotate_app_password(
        "mailu_app_password",
        MAILU_SYNC_URL,
        "failed to update mail password",
    )


@app.route("/api/account/jellyfin/rotate", methods=["POST"])
@require_auth
def account_jellyfin_rotate() -> Any:
    """Rotate the caller's Jellyfin app password and return the new value."""
    return _rotate_app_password(
        "jellyfin_app_password",
        JELLYFIN_SYNC_URL,
        "failed to update jellyfin password",
    )


@app.route("/api/monero/get_info")
def monero_get_info() -> Any:
    """Proxy the JSON document served at ``MONERO_GET_INFO_URL``.

    Returns 503 with the error text and URL when the daemon is unreachable
    or its reply is not valid JSON.
    """
    try:
        with urlopen(MONERO_GET_INFO_URL, timeout=2) as resp:
            payload = json.loads(resp.read().decode("utf-8"))
        return jsonify(payload)
    except _FETCH_ERRORS as exc:
        return jsonify({"error": str(exc), "url": MONERO_GET_INFO_URL}), 503


def _vm_query(expr: str) -> float | None:
    """Run an instant query against VictoriaMetrics.

    Returns:
        The largest sample value across all returned series, or ``None``
        when the query did not succeed or produced no parsable samples.

    Raises:
        URLError, OSError, ValueError: the request failed or the body is not
            valid JSON (callers handle these).
    """
    url = f"{VM_BASE_URL}/api/v1/query?{urlencode({'query': expr})}"
    with urlopen(url, timeout=VM_QUERY_TIMEOUT_SEC) as resp:
        payload = json.loads(resp.read().decode("utf-8"))

    # A non-object body (e.g. a bare list or string) is "no data", not a crash.
    if not isinstance(payload, dict) or payload.get("status") != "success":
        return None

    result = (payload.get("data") or {}).get("result") or []
    if not result:
        return None

    values: list[float] = []
    for item in result:
        try:
            # Each series carries value == [timestamp, "<number>"].
            values.append(float(item["value"][1]))
        except (KeyError, IndexError, TypeError, ValueError):
            continue

    if not values:
        return None
    return max(values)


def _http_ok(url: str, expect_substring: str | None = None) -> bool:
    """Return True when *url* answers 200 (and contains *expect_substring*).

    Only the first 4096 bytes of the body are searched. Any network or
    decoding failure yields False.
    """
    try:
        with urlopen(url, timeout=HTTP_CHECK_TIMEOUT_SEC) as resp:
            if getattr(resp, "status", 200) != 200:
                return False
            if expect_substring:
                chunk = resp.read(4096).decode("utf-8", errors="ignore")
                return expect_substring in chunk
            return True
    except _FETCH_ERRORS:
        return False


@app.route("/api/lab/status")
def lab_status() -> Any:
    """Report whether the atlas and oceanus targets are up.

    VictoriaMetrics is consulted first; direct HTTP probes (Grafana health,
    node exporter) are the fallback. The result is cached in-process for
    ``LAB_STATUS_CACHE_SEC`` seconds.
    """
    now = time.time()
    cached = _LAB_STATUS_CACHE.get("value")
    if cached and (now - float(_LAB_STATUS_CACHE.get("ts", 0.0)) < LAB_STATUS_CACHE_SEC):
        return jsonify(cached)

    connected = False
    atlas_up = False
    atlas_known = False
    atlas_source = "unknown"
    oceanus_up = False
    oceanus_known = False
    oceanus_source = "unknown"

    try:
        atlas_value = _vm_query('max(up{job="kubernetes-apiservers"})')
        oceanus_value = _vm_query('max(up{instance=~"(titan-23|192.168.22.24)(:9100)?"})')
        connected = True
        atlas_known = atlas_value is not None
        atlas_up = bool(atlas_value and atlas_value > 0.5)
        atlas_source = "victoria-metrics"
        oceanus_known = oceanus_value is not None
        oceanus_up = bool(oceanus_value and oceanus_value > 0.5)
        oceanus_source = "victoria-metrics"
    except _FETCH_ERRORS:
        # Metrics backend unavailable; rely on the direct probes below.
        pass

    if not atlas_known:
        if _http_ok(GRAFANA_HEALTH_URL):
            connected = True
            atlas_known = True
            atlas_up = True
            atlas_source = "grafana-health"

    # Probed whenever metrics do not show oceanus as up (not only when the
    # state is unknown), so a direct success overrides a "down" from metrics.
    if not oceanus_up:
        if _http_ok(OCEANUS_NODE_EXPORTER_URL, expect_substring="node_exporter_build_info"):
            connected = True
            oceanus_known = True
            oceanus_up = True
            oceanus_source = "node-exporter"

    payload = {
        "connected": connected,
        "atlas": {"up": atlas_up, "known": atlas_known, "source": atlas_source},
        "oceanus": {"up": oceanus_up, "known": oceanus_known, "source": oceanus_source},
        "checked_at": int(now),
    }

    _LAB_STATUS_CACHE["ts"] = now
    _LAB_STATUS_CACHE["value"] = payload
    return jsonify(payload)


@app.route("/api/chat", methods=["POST"])
@app.route("/api/ai/chat", methods=["POST"])
def ai_chat() -> Any:
    """Forward a chat turn to the model API and return its reply.

    Expects JSON ``{"message": str, "history": [{"role", "content"}, ...]}``.
    The body is untrusted: a missing or non-string message yields 400, and
    malformed history (wrong container type, non-object items, non-string
    content, unknown roles) is ignored rather than causing a server error.
    Upstream failures yield 502.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    raw_message = payload.get("message")
    user_message = raw_message.strip() if isinstance(raw_message, str) else ""
    if not user_message:
        return jsonify({"error": "message required"}), 400

    history = payload.get("history")
    if not isinstance(history, list):
        history = []

    messages: list[dict[str, str]] = []
    if AI_CHAT_SYSTEM_PROMPT:
        messages.append({"role": "system", "content": AI_CHAT_SYSTEM_PROMPT})

    for item in history:
        if not isinstance(item, dict):
            continue
        role = item.get("role")
        raw_content = item.get("content")
        content = raw_content.strip() if isinstance(raw_content, str) else ""
        # Only user/assistant turns are forwarded, so a client cannot inject
        # its own system prompt through the history.
        if role in ("user", "assistant") and content:
            messages.append({"role": role, "content": content})

    messages.append({"role": "user", "content": user_message})

    body = {"model": AI_CHAT_MODEL, "messages": messages, "stream": False}
    started = time.time()

    try:
        with httpx.Client(timeout=AI_CHAT_TIMEOUT_SEC) as client:
            resp = client.post(f"{AI_CHAT_API}/api/chat", json=body)
            resp.raise_for_status()
            data = resp.json()
        message = data.get("message") if isinstance(data, dict) else None
        reply = (message.get("content") if isinstance(message, dict) else "") or ""
        elapsed_ms = int((time.time() - started) * 1000)
        return jsonify({"reply": reply, "latency_ms": elapsed_ms})
    except (httpx.RequestError, httpx.HTTPStatusError, ValueError) as exc:
        return jsonify({"error": str(exc)}), 502


@app.route("/api/chat/info", methods=["GET"])
@app.route("/api/ai/info", methods=["GET"])
def ai_info() -> Any:
    """Return node/GPU/model/endpoint metadata for the AI service."""
    meta = _discover_ai_meta()
    return jsonify(meta)


def _discover_ai_meta() -> dict[str, str]:
    """
    Best-effort discovery of which node/gpu is hosting the AI service.

    Tries the Kubernetes API using the service account if available; falls
    back to the environment-derived defaults on any failure.
    """
    meta = {
        "node": AI_NODE_NAME,
        "gpu": AI_GPU_DESC,
        "model": AI_CHAT_MODEL,
        "endpoint": AI_PUBLIC_ENDPOINT or "/api/chat",
    }

    # Only attempt k8s if we're in-cluster and credentials exist.
    sa_path = Path("/var/run/secrets/kubernetes.io/serviceaccount")
    token_path = sa_path / "token"
    ca_path = sa_path / "ca.crt"
    ns_path = sa_path / "namespace"
    if not token_path.exists() or not ca_path.exists() or not ns_path.exists():
        return meta

    try:
        token = token_path.read_text().strip()
        namespace = AI_K8S_NAMESPACE
        base_url = "https://kubernetes.default.svc"
        pod_url = f"{base_url}/api/v1/namespaces/{namespace}/pods"

        with httpx.Client(
            verify=str(ca_path),
            timeout=HTTP_CHECK_TIMEOUT_SEC,
            headers={"Authorization": f"Bearer {token}"},
        ) as client:
            # Pass the selector via params so it is URL-encoded for us.
            resp = client.get(pod_url, params={"labelSelector": AI_K8S_LABEL})
            resp.raise_for_status()
            data = resp.json()

        items = data.get("items") or []
        # Prefer a Running pod; otherwise take whatever matched the selector.
        running = [p for p in items if p.get("status", {}).get("phase") == "Running"] or items
        if running:
            pod = running[0]
            node_name = pod.get("spec", {}).get("nodeName") or meta["node"]
            meta["node"] = node_name

            annotations = pod.get("metadata", {}).get("annotations") or {}
            gpu_hint = (
                annotations.get(AI_GPU_ANNOTATION)
                or annotations.get("ai.gpu/description")
                or annotations.get("gpu/description")
            )
            if gpu_hint:
                meta["gpu"] = gpu_hint

            model_hint = annotations.get(AI_MODEL_ANNOTATION)
            if not model_hint:
                # Try to infer from container image tag.
                containers = pod.get("spec", {}).get("containers") or []
                if containers:
                    image = containers[0].get("image") or ""
                    model_hint = image.split(":")[-1] if ":" in image else image
            if model_hint:
                meta["model"] = model_hint
    except Exception:
        # Keep the fallbacks; discovery is optional, but leave a trace.
        app.logger.debug("AI metadata discovery failed", exc_info=True)

    return meta


def _keep_warm() -> None:
    """Periodically ping the model to keep it warm.

    Starts a daemon thread that sends a minimal chat request every
    ``AI_WARM_INTERVAL_SEC`` seconds. Does nothing when warming is disabled
    or the interval is not positive.
    """
    if not AI_WARM_ENABLED or AI_WARM_INTERVAL_SEC <= 0:
        return

    def loop() -> None:
        while True:
            time.sleep(AI_WARM_INTERVAL_SEC)
            try:
                body = {
                    "model": AI_CHAT_MODEL,
                    "messages": [{"role": "user", "content": "ping"}],
                    "stream": False,
                }
                with httpx.Client(timeout=min(AI_CHAT_TIMEOUT_SEC, 15)) as client:
                    client.post(f"{AI_CHAT_API}/api/chat", json=body)
            except Exception:
                # best-effort; ignore failures
                continue

    threading.Thread(target=loop, daemon=True, name="ai-keep-warm").start()


# Start keep-warm loop on import.
_keep_warm()


@app.route("/", defaults={"path": ""})
@app.route("/<path:path>")
def serve_frontend(path: str) -> Any:
    """Serve a built frontend asset, falling back to ``index.html``.

    When the frontend has not been built, a JSON hint is returned instead.
    """
    dist_path = Path(app.static_folder)
    index_path = dist_path / "index.html"

    if dist_path.exists() and index_path.exists():
        target = dist_path / path
        if path and target.exists():
            return send_from_directory(app.static_folder, path)
        # Unknown paths get the app shell (client-side routing).
        return send_from_directory(app.static_folder, "index.html")

    return jsonify(
        {
            "message": "Frontend not built yet. Run `npm install && npm run build` inside frontend/, then restart Flask.",
            "available_endpoints": ["/api/healthz", "/api/monero/get_info"],
        }
    )


if __name__ == "__main__":
    # The interactive debugger allows arbitrary code execution, so it must be
    # an explicit opt-in when the server listens on every interface.
    debug_enabled = os.getenv("FLASK_DEBUG", "false").lower() in ("1", "true", "yes")
    app.run(host="0.0.0.0", port=5000, debug=debug_enabled)