640 lines
22 KiB
Python

from __future__ import annotations

import json
import os
import secrets
import string
import time
from functools import wraps
from http.client import HTTPException
from pathlib import Path
from typing import Any
from urllib.error import URLError
from urllib.parse import quote
from urllib.parse import urlencode
from urllib.request import urlopen

import httpx
import jwt
from flask import Flask, g, jsonify, request, send_from_directory
from flask_cors import CORS
from jwt import PyJWKClient
# Flask application; the prebuilt SPA in ../frontend/dist is served from the site root.
app = Flask(
    __name__,
    static_url_path="",
    static_folder="../frontend/dist",
)
# CORS is enabled only for the JSON API (every /api/* route, any origin).
CORS(app, resources={r"/api/*": dict(origins="*")})
# --- Upstream service endpoints and timeouts (all overridable via environment) ---
MONERO_GET_INFO_URL = os.environ.get(
    "MONERO_GET_INFO_URL", "http://monerod.crypto.svc.cluster.local:18081/get_info"
)
VM_BASE_URL = os.environ.get(
    "VM_BASE_URL",
    "http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428",
).rstrip("/")
VM_QUERY_TIMEOUT_SEC = float(os.environ.get("VM_QUERY_TIMEOUT_SEC", "2"))
HTTP_CHECK_TIMEOUT_SEC = float(os.environ.get("HTTP_CHECK_TIMEOUT_SEC", "2"))
LAB_STATUS_CACHE_SEC = float(os.environ.get("LAB_STATUS_CACHE_SEC", "30"))
GRAFANA_HEALTH_URL = os.environ.get("GRAFANA_HEALTH_URL", "https://metrics.bstein.dev/api/health")
OCEANUS_NODE_EXPORTER_URL = os.environ.get(
    "OCEANUS_NODE_EXPORTER_URL", "http://192.168.22.24:9100/metrics"
)

# --- AI chat backend ---
AI_CHAT_API = os.environ.get("AI_CHAT_API", "http://ollama.ai.svc.cluster.local:11434").rstrip("/")
AI_CHAT_MODEL = os.environ.get("AI_CHAT_MODEL", "qwen2.5-coder:7b-instruct-q4_0")
AI_CHAT_SYSTEM_PROMPT = os.environ.get(
    "AI_CHAT_SYSTEM_PROMPT",
    "You are the Titan Lab assistant for bstein.dev. Be concise and helpful.",
)
AI_CHAT_TIMEOUT_SEC = float(os.environ.get("AI_CHAT_TIMEOUT_SEC", "20"))
# Display metadata for the AI info endpoints; the first non-empty variable wins.
AI_NODE_NAME = os.environ.get("AI_CHAT_NODE_NAME") or os.environ.get("AI_NODE_NAME") or "ai-cluster"
AI_GPU_DESC = os.environ.get("AI_CHAT_GPU_DESC") or "local GPU (dynamic)"
AI_PUBLIC_ENDPOINT = os.environ.get("AI_PUBLIC_CHAT_ENDPOINT", "https://chat.ai.bstein.dev/api/chat")
# Kubernetes pod lookup used by _discover_ai_meta().
AI_K8S_LABEL = os.environ.get("AI_K8S_LABEL", "app=ollama")
AI_K8S_NAMESPACE = os.environ.get("AI_K8S_NAMESPACE", "ai")
AI_MODEL_ANNOTATION = os.environ.get("AI_MODEL_ANNOTATION", "ai.bstein.dev/model")
AI_GPU_ANNOTATION = os.environ.get("AI_GPU_ANNOTATION", "ai.bstein.dev/gpu")
# Keep-warm pinger (see _keep_warm()).
AI_WARM_INTERVAL_SEC = float(os.environ.get("AI_WARM_INTERVAL_SEC", "300"))
AI_WARM_ENABLED = os.environ.get("AI_WARM_ENABLED", "true").lower() in {"1", "true", "yes"}
# --- Keycloak (OIDC) integration ---
KEYCLOAK_ENABLED = os.environ.get("KEYCLOAK_ENABLED", "false").lower() in {"1", "true", "yes"}
KEYCLOAK_URL = os.environ.get("KEYCLOAK_URL", "https://sso.bstein.dev").rstrip("/")
KEYCLOAK_REALM = os.environ.get("KEYCLOAK_REALM", "atlas")
KEYCLOAK_CLIENT_ID = os.environ.get("KEYCLOAK_CLIENT_ID", "bstein-dev-home")
# Derived defaults: issuer from URL + realm, JWKS endpoint from the issuer.
KEYCLOAK_ISSUER = os.environ.get(
    "KEYCLOAK_ISSUER", "{}/realms/{}".format(KEYCLOAK_URL, KEYCLOAK_REALM)
).rstrip("/")
KEYCLOAK_JWKS_URL = os.environ.get(
    "KEYCLOAK_JWKS_URL", KEYCLOAK_ISSUER + "/protocol/openid-connect/certs"
).rstrip("/")
# Client-credentials client for the admin REST API; both id and secret must be set
# for the /api/account/* rotation endpoints to work.
KEYCLOAK_ADMIN_URL = os.environ.get("KEYCLOAK_ADMIN_URL", KEYCLOAK_URL).rstrip("/")
KEYCLOAK_ADMIN_CLIENT_ID = os.environ.get("KEYCLOAK_ADMIN_CLIENT_ID", "")
KEYCLOAK_ADMIN_CLIENT_SECRET = os.environ.get("KEYCLOAK_ADMIN_CLIENT_SECRET", "")
KEYCLOAK_ADMIN_REALM = os.environ.get("KEYCLOAK_ADMIN_REALM", KEYCLOAK_REALM)
# Comma-separated group allow-list for /api/account/*; blank entries are dropped and
# an empty list admits any authenticated user. (Loop names avoid shadowing flask.g.)
ACCOUNT_ALLOWED_GROUPS = [
    name
    for name in (
        part.strip() for part in os.environ.get("ACCOUNT_ALLOWED_GROUPS", "dev,admin").split(",")
    )
    if name
]
# --- Per-user app-password sync (used by the /api/account/* rotate endpoints) ---
MAILU_DOMAIN = os.environ.get("MAILU_DOMAIN", "bstein.dev")
# Endpoints poked after a password rotation; an empty value disables the poke.
MAILU_SYNC_URL = os.environ.get(
    "MAILU_SYNC_URL",
    "http://mailu-sync-listener.mailu-mailserver.svc.cluster.local:8080/events",
).rstrip("/")
JELLYFIN_SYNC_URL = os.environ.get("JELLYFIN_SYNC_URL", "").rstrip("/")
# Lazily constructed JWKS client (see _get_jwk_client()).
_KEYCLOAK_JWK_CLIENT: PyJWKClient | None = None
# Cached Keycloak admin access token and its absolute expiry (time.time() seconds).
_KEYCLOAK_ADMIN_TOKEN: dict[str, Any] = dict(token="", expires_at=0.0)
# Last /api/lab/status payload and the time.time() at which it was computed.
_LAB_STATUS_CACHE: dict[str, Any] = dict(ts=0.0, value=None)
@app.route("/api/healthz")
def healthz() -> Any:
    """Liveness probe: answers ``{"ok": true}`` whenever the app can serve a request."""
    return jsonify(ok=True)
def _normalize_groups(groups: Any) -> list[str]:
    """Convert a raw ``groups`` claim into a list of clean group names.

    Only a real ``list`` is accepted; any other input yields ``[]``. Non-string
    entries are skipped, leading slashes (path-style names such as ``"/admin"``)
    are removed, and names that end up empty are dropped. Order is preserved.
    """
    if isinstance(groups, list):
        trimmed = (entry.lstrip("/") for entry in groups if isinstance(entry, str))
        return [name for name in trimmed if name]
    return []
def _extract_bearer_token() -> str | None:
    """Return the credential from an ``Authorization: Bearer <token>`` request header.

    The scheme is compared case-insensitively and the header is split on the first
    run of whitespace. Returns None when the header is missing, has no credential
    part, uses a different scheme, or the credential is blank after stripping.
    """
    scheme_and_rest = request.headers.get("Authorization", "").split(None, 1)
    if len(scheme_and_rest) != 2:
        return None
    scheme, credential = scheme_and_rest[0], scheme_and_rest[1].strip()
    if scheme.lower() == "bearer" and credential:
        return credential
    return None
def _get_jwk_client() -> PyJWKClient:
    """Return the process-wide JWKS client, creating it from KEYCLOAK_JWKS_URL on first use."""
    global _KEYCLOAK_JWK_CLIENT
    client = _KEYCLOAK_JWK_CLIENT
    if client is None:
        client = PyJWKClient(KEYCLOAK_JWKS_URL)
        _KEYCLOAK_JWK_CLIENT = client
    return client
def _verify_keycloak_token(token: str) -> dict[str, Any]:
    """Validate a Keycloak-issued JWT and return its decoded claims.

    The signature is checked with the realm's JWKS key (RS256 only) and the issuer
    must equal KEYCLOAK_ISSUER. PyJWT's own audience check is switched off and
    replaced by the explicit check below: the token is accepted when ``azp`` equals
    KEYCLOAK_CLIENT_ID or ``aud`` (a string or list of strings) contains it.

    Raises:
        ValueError: Keycloak is disabled, or the token was not issued for this client.
        Errors from the JWKS client / ``jwt.decode`` propagate unchanged.
    """
    if not KEYCLOAK_ENABLED:
        raise ValueError("keycloak not enabled")

    jwk = _get_jwk_client().get_signing_key_from_jwt(token)
    claims = jwt.decode(
        token,
        jwk.key,
        algorithms=["RS256"],
        options={"verify_aud": False},
        issuer=KEYCLOAK_ISSUER,
    )

    # Defense-in-depth: the token must have been minted for our frontend client.
    raw_aud = claims.get("aud")
    if isinstance(raw_aud, str):
        audiences = [raw_aud]
    elif isinstance(raw_aud, list):
        audiences = [entry for entry in raw_aud if isinstance(entry, str)]
    else:
        audiences = []
    issued_to_us = claims.get("azp") == KEYCLOAK_CLIENT_ID or KEYCLOAK_CLIENT_ID in audiences
    if not issued_to_us:
        raise ValueError("token not issued for this client")
    return claims
def require_auth(fn):
    """Decorator that rejects a request with 401 unless it carries a valid Keycloak bearer token.

    On success the view can read the verified identity from ``flask.g``:
    ``keycloak_claims`` (all claims), ``keycloak_username`` and ``keycloak_email``
    (empty strings when absent) and ``keycloak_groups`` (normalized group names).
    Every verification failure, including Keycloak being disabled, is reported
    to the client only as "invalid token".
    """

    @wraps(fn)
    def guarded_view(*args, **kwargs):
        bearer = _extract_bearer_token()
        if not bearer:
            return jsonify({"error": "missing bearer token"}), 401
        try:
            verified = _verify_keycloak_token(bearer)
        except Exception:
            return jsonify({"error": "invalid token"}), 401

        g.keycloak_claims = verified
        g.keycloak_username = verified.get("preferred_username") or ""
        g.keycloak_email = verified.get("email") or ""
        g.keycloak_groups = _normalize_groups(verified.get("groups"))
        return fn(*args, **kwargs)

    return guarded_view
@app.route("/api/auth/config", methods=["GET"])
def auth_config() -> Any:
    """Tell the frontend whether and how to sign in through Keycloak.

    Returns ``{"enabled": false}`` when Keycloak is off. Otherwise returns the
    Keycloak base URL, realm and client id, plus an authorization-code login URL
    (scope ``openid``) that redirects back to the root of the requesting host.
    """
    if not KEYCLOAK_ENABLED:
        return jsonify({"enabled": False})

    # "<scheme>://<host>/" of the incoming request.
    return_to = request.host_url.rstrip("/") + "/"
    query = urlencode(
        {
            "client_id": KEYCLOAK_CLIENT_ID,
            "redirect_uri": return_to,
            "response_type": "code",
            "scope": "openid",
        },
        quote_via=quote,  # percent-encode every reserved char (safe=""), including "/" and ":"
    )
    login_url = f"{KEYCLOAK_ISSUER}/protocol/openid-connect/auth?{query}"
    config = {
        "enabled": True,
        "url": KEYCLOAK_URL,
        "realm": KEYCLOAK_REALM,
        "client_id": KEYCLOAK_CLIENT_ID,
        "login_url": login_url,
        # NOTE(review): identical to login_url; confirm whether a dedicated
        # reset-credentials URL was intended here.
        "reset_url": login_url,
    }
    return jsonify(config)
def _require_account_access() -> tuple[bool, Any]:
    """Decide whether the authenticated caller may use the /api/account endpoints.

    Returns ``(True, None)`` when allowed, otherwise ``(False, (response, status))``
    ready to be returned from a view: 503 when Keycloak is disabled, 403 when the
    caller belongs to none of ACCOUNT_ALLOWED_GROUPS. An empty allow-list admits
    everyone. Group membership is read from ``flask.g`` as set by ``require_auth``.
    """
    if not KEYCLOAK_ENABLED:
        return False, (jsonify({"error": "keycloak not enabled"}), 503)
    if ACCOUNT_ALLOWED_GROUPS:
        caller_groups = set(getattr(g, "keycloak_groups", []) or [])
        if caller_groups.isdisjoint(ACCOUNT_ALLOWED_GROUPS):
            return False, (jsonify({"error": "forbidden"}), 403)
    return True, None
def _kc_admin_ready() -> bool:
    """True only when both the admin client id and the admin client secret are non-empty."""
    return all((KEYCLOAK_ADMIN_CLIENT_ID, KEYCLOAK_ADMIN_CLIENT_SECRET))
def _kc_admin_get_token() -> str:
    """Return a client-credentials access token for the Keycloak admin API.

    The token is cached in _KEYCLOAK_ADMIN_TOKEN and reused until 30 seconds before
    its recorded expiry; ``expires_in`` defaults to 60 seconds when the token
    response omits it. The expiry is measured from the time of the request.

    Raises:
        RuntimeError: the admin client is not configured, or the response carries
            no ``access_token``.
        httpx.HTTPStatusError / httpx.RequestError: the token request failed.
    """
    if not _kc_admin_ready():
        raise RuntimeError("keycloak admin client not configured")

    issued_at = time.time()
    cached_token = _KEYCLOAK_ADMIN_TOKEN.get("token")
    refresh_after = float(_KEYCLOAK_ADMIN_TOKEN.get("expires_at") or 0.0) - 30
    if cached_token and issued_at < refresh_after:
        return str(cached_token)

    token_endpoint = (
        f"{KEYCLOAK_ADMIN_URL}/realms/{KEYCLOAK_ADMIN_REALM}"
        "/protocol/openid-connect/token"
    )
    form = dict(
        grant_type="client_credentials",
        client_id=KEYCLOAK_ADMIN_CLIENT_ID,
        client_secret=KEYCLOAK_ADMIN_CLIENT_SECRET,
    )
    with httpx.Client(timeout=HTTP_CHECK_TIMEOUT_SEC) as http:
        response = http.post(token_endpoint, data=form)
        response.raise_for_status()
        body = response.json()

    access_token = body.get("access_token") or ""
    if not access_token:
        raise RuntimeError("no access_token in response")
    lifetime = int(body.get("expires_in") or 60)
    _KEYCLOAK_ADMIN_TOKEN.update(token=access_token, expires_at=issued_at + lifetime)
    return access_token
def _kc_admin_headers() -> dict[str, str]:
    """Build the Authorization header for admin API calls (may fetch a fresh token)."""
    access_token = _kc_admin_get_token()
    return {"Authorization": "Bearer {}".format(access_token)}
def _kc_admin_find_user(username: str) -> dict[str, Any] | None:
    """Look up a user in KEYCLOAK_REALM by exact username via the Keycloak admin API.

    Returns the user representation whose ``username`` equals *username*
    (case-insensitively), or None when there is no such user.

    Raises:
        httpx.HTTPStatusError / httpx.RequestError: the lookup request failed.
        RuntimeError: the admin client is not configured (from _kc_admin_headers).
    """
    url = f"{KEYCLOAK_ADMIN_URL}/admin/realms/{KEYCLOAK_REALM}/users"
    params = {"username": username, "exact": "true"}
    with httpx.Client(timeout=HTTP_CHECK_TIMEOUT_SEC) as client:
        resp = client.get(url, params=params, headers=_kc_admin_headers())
        resp.raise_for_status()
        users = resp.json()
    if not isinstance(users, list):
        return None
    # Do not rely on the server-side filter alone: if `exact` is not honoured the
    # search is a substring match, and blindly taking the first hit could select
    # (and later modify) a different account. Compared case-insensitively because
    # Keycloak normalizes usernames to lower case (assumed; verify for this realm).
    wanted = username.lower()
    for user in users:
        if isinstance(user, dict) and str(user.get("username") or "").lower() == wanted:
            return user
    return None
def _kc_admin_get_user(user_id: str) -> dict[str, Any]:
    """Fetch the full representation of the user *user_id* from the Keycloak admin API.

    Raises:
        RuntimeError: the response body is not a JSON object.
        httpx.HTTPStatusError / httpx.RequestError: the request failed.
    """
    user_url = "{}/admin/realms/{}/users/{}".format(
        KEYCLOAK_ADMIN_URL, KEYCLOAK_REALM, quote(user_id, safe="")
    )
    with httpx.Client(timeout=HTTP_CHECK_TIMEOUT_SEC) as http:
        response = http.get(user_url, headers=_kc_admin_headers())
        response.raise_for_status()
        user = response.json()
    if isinstance(user, dict):
        return user
    raise RuntimeError("unexpected user payload")
def _kc_admin_update_user(user_id: str, payload: dict[str, Any]) -> None:
    """PUT *payload* as JSON to the Keycloak admin API user resource for *user_id*.

    Raises httpx.HTTPStatusError / httpx.RequestError when the request fails.
    """
    user_url = "{}/admin/realms/{}/users/{}".format(
        KEYCLOAK_ADMIN_URL, KEYCLOAK_REALM, quote(user_id, safe="")
    )
    with httpx.Client(timeout=HTTP_CHECK_TIMEOUT_SEC) as http:
        request_headers = _kc_admin_headers()
        request_headers["Content-Type"] = "application/json"
        response = http.put(user_url, headers=request_headers, json=payload)
        response.raise_for_status()
def _kc_set_user_attribute(username: str, key: str, value: str) -> None:
    """Set the custom attribute *key* to the single value *value* for user *username*.

    The current user representation is fetched, the attribute map is updated
    (other attributes are preserved; a missing or malformed map starts empty),
    and the whole representation is sent back.

    Raises:
        RuntimeError: the user cannot be found or has no id; errors from the
            admin API helpers propagate unchanged.
    """
    match = _kc_admin_find_user(username)
    if not match:
        raise RuntimeError("user not found")
    user_id = match.get("id") or ""
    if not user_id:
        raise RuntimeError("user id missing")

    representation = _kc_admin_get_user(user_id)
    attributes = representation.get("attributes")
    if not isinstance(attributes, dict):
        attributes = {}
    representation["attributes"] = {**attributes, key: [value]}
    _kc_admin_update_user(user_id, representation)
def _random_password(length: int = 32) -> str:
    """Return *length* random ASCII letters/digits drawn with the ``secrets`` CSPRNG."""
    charset = f"{string.ascii_letters}{string.digits}"
    picks = [secrets.choice(charset) for _ in range(length)]
    return "".join(picks)
def _best_effort_post(url: str) -> None:
    """POST ``{"ts": <unix seconds>}`` to *url* without ever raising.

    Used to notify the sync listeners after a password rotation. An empty *url*
    means "not configured" and is a no-op. Transport errors and non-2xx responses
    are logged as warnings rather than silently discarded, so a sync that never
    happened is visible in the logs; they are still not propagated to the caller.
    """
    if not url:
        return
    try:
        with httpx.Client(timeout=HTTP_CHECK_TIMEOUT_SEC) as client:
            resp = client.post(url, json={"ts": int(time.time())})
        if resp.is_error:
            app.logger.warning("sync notification to %s returned HTTP %s", url, resp.status_code)
    except Exception:
        app.logger.warning("sync notification to %s failed", url, exc_info=True)
@app.route("/api/account/overview", methods=["GET"])
@require_auth
def account_overview() -> Any:
    """Summarize the signed-in user's account and linked-service status.

    Mailu and Jellyfin both report ``"ready"`` when the Keycloak admin client is
    configured and ``"server not configured"`` otherwise. The Mailu login is
    ``<username>@MAILU_DOMAIN`` (empty when the username is unknown).
    """
    allowed, denial = _require_account_access()
    if not allowed:
        return denial

    username = g.keycloak_username
    service_status = "ready" if _kc_admin_ready() else "server not configured"
    mail_login = "{}@{}".format(username, MAILU_DOMAIN) if username else ""
    return jsonify(
        {
            "user": {"username": username, "email": g.keycloak_email, "groups": g.keycloak_groups},
            "mailu": {"status": service_status, "username": mail_login},
            "jellyfin": {"status": service_status, "username": username},
        }
    )
def _rotate_app_password(attribute: str, failure_message: str, sync_url: str) -> Any:
    """Shared implementation of the app-password rotation endpoints.

    Checks account access and admin-client configuration, generates a new random
    password, stores it in the caller's Keycloak user attribute *attribute*,
    notifies *sync_url* (best effort) and returns ``{"password": <new password>}``.
    Error responses: the access-check response (403/503), 503 when the admin client
    is not configured, 400 when the caller has no username, and 502 carrying
    *failure_message* when Keycloak could not be updated.
    """
    allowed, denial = _require_account_access()
    if not allowed:
        return denial
    if not _kc_admin_ready():
        return jsonify({"error": "server not configured"}), 503

    username = g.keycloak_username
    if not username:
        return jsonify({"error": "missing username"}), 400

    fresh_password = _random_password()
    try:
        _kc_set_user_attribute(username, attribute, fresh_password)
    except Exception:
        return jsonify({"error": failure_message}), 502
    _best_effort_post(sync_url)
    return jsonify({"password": fresh_password})


@app.route("/api/account/mailu/rotate", methods=["POST"])
@require_auth
def account_mailu_rotate() -> Any:
    """Rotate the caller's Mailu app password and notify the Mailu sync listener."""
    return _rotate_app_password(
        "mailu_app_password", "failed to update mail password", MAILU_SYNC_URL
    )


@app.route("/api/account/jellyfin/rotate", methods=["POST"])
@require_auth
def account_jellyfin_rotate() -> Any:
    """Rotate the caller's Jellyfin app password and notify the Jellyfin sync URL, if set."""
    return _rotate_app_password(
        "jellyfin_app_password", "failed to update jellyfin password", JELLYFIN_SYNC_URL
    )
@app.route("/api/monero/get_info")
def monero_get_info() -> Any:
    """Proxy monerod's ``get_info`` JSON response.

    The upstream timeout follows HTTP_CHECK_TIMEOUT_SEC (default 2 s) like the
    other health checks. On any failure it responds 503 with the error text and
    the upstream URL instead of surfacing a 500.
    """
    try:
        with urlopen(MONERO_GET_INFO_URL, timeout=HTTP_CHECK_TIMEOUT_SEC) as resp:
            payload = json.loads(resp.read().decode("utf-8"))
    except (OSError, HTTPException, ValueError) as exc:
        # OSError: URLError/HTTPError, plus socket timeouts (not TimeoutError before
        # Python 3.10) and TLS/connection errors raised while reading the body.
        # HTTPException: truncated responses (IncompleteRead).
        # ValueError: invalid UTF-8 or JSON.
        return jsonify({"error": str(exc), "url": MONERO_GET_INFO_URL}), 503
    return jsonify(payload)
def _vm_query(expr: str) -> float | None:
    """Run an instant query against VictoriaMetrics' ``/api/v1/query`` endpoint.

    Returns the largest sample value across the returned series, or None when the
    response status is not "success", no series came back, or no sample value
    parsed as a float. Network and JSON/UTF-8 decoding errors propagate to the
    caller.
    """
    query_url = "{}/api/v1/query?{}".format(VM_BASE_URL, urlencode({"query": expr}))
    with urlopen(query_url, timeout=VM_QUERY_TIMEOUT_SEC) as resp:
        body = json.loads(resp.read().decode("utf-8"))
    if body.get("status") != "success":
        return None

    def _sample(series: Any) -> float | None:
        # Each series carries "value": [timestamp, "<number as string>"].
        try:
            return float(series["value"][1])
        except (KeyError, IndexError, TypeError, ValueError):
            return None

    parsed = (_sample(series) for series in (body.get("data") or {}).get("result") or [])
    samples = [value for value in parsed if value is not None]
    return max(samples) if samples else None
def _http_ok(url: str, expect_substring: str | None = None) -> bool:
    """Return True when GET *url* answers HTTP 200 (and optionally contains a marker).

    If *expect_substring* is given it must appear in the first 4096 bytes of the
    body (decoded as UTF-8, undecodable bytes ignored). Any network, TLS, HTTP or
    decoding failure yields False instead of an exception, so callers can use this
    as a plain probe.
    """
    try:
        with urlopen(url, timeout=HTTP_CHECK_TIMEOUT_SEC) as resp:
            if getattr(resp, "status", 200) != 200:
                return False
            if not expect_substring:
                return True
            head = resp.read(4096).decode("utf-8", errors="ignore")
            return expect_substring in head
    except (OSError, HTTPException, ValueError):
        # OSError covers URLError/HTTPError and read-time socket timeouts / TLS
        # errors that urlopen does not wrap; HTTPException covers truncated bodies;
        # ValueError covers malformed URLs.
        return False
@app.route("/api/lab/status")
def lab_status() -> Any:
    """Report whether the Atlas and Oceanus environments look up.

    The payload is cached for LAB_STATUS_CACHE_SEC seconds. VictoriaMetrics ``up``
    series are consulted first, one query per environment. Each query's outcome is
    kept independently, so a failing Oceanus query does not discard a successful
    Atlas answer. If the first query fails, VictoriaMetrics is treated as
    unavailable and the second query is skipped (an outage costs one timeout, not
    two). Fallbacks: Atlas is probed via GRAFANA_HEALTH_URL while its state is still
    unknown; Oceanus is probed via its node-exporter whenever it is not known to be
    up. ``connected`` is True if any source answered.
    """
    now = time.time()
    cached = _LAB_STATUS_CACHE.get("value")
    if cached and (now - float(_LAB_STATUS_CACHE.get("ts", 0.0)) < LAB_STATUS_CACHE_SEC):
        return jsonify(cached)

    # What _vm_query can raise for an unreachable or misbehaving backend: OSError
    # covers URLError/HTTPError and socket/TLS errors while reading, HTTPException
    # covers truncated responses, ValueError covers bad JSON/UTF-8.
    vm_errors = (OSError, HTTPException, ValueError)

    connected = False
    atlas_up = False
    atlas_known = False
    atlas_source = "unknown"
    oceanus_up = False
    oceanus_known = False
    oceanus_source = "unknown"

    vm_available = False
    try:
        atlas_value = _vm_query('max(up{job="kubernetes-apiservers"})')
    except vm_errors:
        pass
    else:
        vm_available = True
        connected = True
        atlas_known = atlas_value is not None
        atlas_up = atlas_value is not None and atlas_value > 0.5
        atlas_source = "victoria-metrics"

    if vm_available:
        try:
            oceanus_value = _vm_query('max(up{instance=~"(titan-23|192.168.22.24)(:9100)?"})')
        except vm_errors:
            pass
        else:
            oceanus_known = oceanus_value is not None
            oceanus_up = oceanus_value is not None and oceanus_value > 0.5
            oceanus_source = "victoria-metrics"

    if not atlas_known and _http_ok(GRAFANA_HEALTH_URL):
        connected = True
        atlas_known = True
        atlas_up = True
        atlas_source = "grafana-health"

    if not oceanus_up and _http_ok(
        OCEANUS_NODE_EXPORTER_URL, expect_substring="node_exporter_build_info"
    ):
        connected = True
        oceanus_known = True
        oceanus_up = True
        oceanus_source = "node-exporter"

    payload = {
        "connected": connected,
        "atlas": {"up": atlas_up, "known": atlas_known, "source": atlas_source},
        "oceanus": {"up": oceanus_up, "known": oceanus_known, "source": oceanus_source},
        "checked_at": int(now),
    }
    _LAB_STATUS_CACHE["ts"] = now
    _LAB_STATUS_CACHE["value"] = payload
    return jsonify(payload)
@app.route("/api/chat", methods=["POST"])
@app.route("/api/ai/chat", methods=["POST"])
def ai_chat() -> Any:
    """Relay one chat turn to the chat backend at AI_CHAT_API and return its reply.

    Request JSON: ``{"message": str, "history": [{"role": "user"|"assistant",
    "content": str}, ...]}``. The system prompt (if configured) is prepended, valid
    history entries are replayed in order, and the new user message is appended.

    The body is untrusted, so malformed input is handled without a 500: a
    non-object body or a non-string/blank ``message`` gets 400 "message required",
    and a non-list ``history`` or malformed history entries are ignored.

    Returns ``{"reply": str, "latency_ms": int}``; upstream transport, HTTP-status
    or JSON errors yield 502 with the error text.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    raw_message = payload.get("message")
    user_message = raw_message.strip() if isinstance(raw_message, str) else ""
    if not user_message:
        return jsonify({"error": "message required"}), 400

    history = payload.get("history")
    if not isinstance(history, list):
        history = []

    messages: list[dict[str, str]] = []
    if AI_CHAT_SYSTEM_PROMPT:
        messages.append({"role": "system", "content": AI_CHAT_SYSTEM_PROMPT})
    for item in history:
        if not isinstance(item, dict):
            continue
        role = item.get("role")
        content = item.get("content")
        content = content.strip() if isinstance(content, str) else ""
        if role in ("user", "assistant") and content:
            messages.append({"role": role, "content": content})
    messages.append({"role": "user", "content": user_message})

    body = {"model": AI_CHAT_MODEL, "messages": messages, "stream": False}
    started = time.time()
    try:
        with httpx.Client(timeout=AI_CHAT_TIMEOUT_SEC) as client:
            resp = client.post(f"{AI_CHAT_API}/api/chat", json=body)
            resp.raise_for_status()
            data = resp.json()
    except (httpx.RequestError, httpx.HTTPStatusError, ValueError) as exc:
        return jsonify({"error": str(exc)}), 502
    elapsed_ms = int((time.time() - started) * 1000)

    # Tolerate an unexpected response shape instead of raising AttributeError.
    message = data.get("message") if isinstance(data, dict) else None
    reply = message.get("content") if isinstance(message, dict) else None
    return jsonify({"reply": reply or "", "latency_ms": elapsed_ms})
@app.route("/api/chat/info", methods=["GET"])
@app.route("/api/ai/info", methods=["GET"])
def ai_info() -> Any:
    """Expose node / GPU / model / endpoint metadata for the AI chat service."""
    return jsonify(_discover_ai_meta())
def _discover_ai_meta() -> dict[str, str]:
    """
    Best-effort discovery of which node/GPU/model is hosting the AI service.

    Starts from the env-configured defaults (AI_NODE_NAME, AI_GPU_DESC,
    AI_CHAT_MODEL, AI_PUBLIC_ENDPOINT). When the pod service-account files exist,
    it lists pods in AI_K8S_NAMESPACE matching AI_K8S_LABEL through the in-cluster
    API and overrides node/GPU/model from the first Running pod (or the first pod
    if none is Running). Any failure keeps the defaults; it is logged at debug level.
    """
    meta = {
        "node": AI_NODE_NAME,
        "gpu": AI_GPU_DESC,
        "model": AI_CHAT_MODEL,
        "endpoint": AI_PUBLIC_ENDPOINT or "/api/chat",
    }
    # Only attempt the Kubernetes API when in-cluster service-account credentials exist.
    sa_path = Path("/var/run/secrets/kubernetes.io/serviceaccount")
    token_path = sa_path / "token"
    ca_path = sa_path / "ca.crt"
    ns_path = sa_path / "namespace"
    if not token_path.exists() or not ca_path.exists() or not ns_path.exists():
        return meta
    try:
        token = token_path.read_text().strip()
        namespace = quote(AI_K8S_NAMESPACE, safe="")
        pod_url = f"https://kubernetes.default.svc/api/v1/namespaces/{namespace}/pods"
        with httpx.Client(
            verify=str(ca_path),
            timeout=HTTP_CHECK_TIMEOUT_SEC,
            headers={"Authorization": f"Bearer {token}"},
        ) as client:
            # Pass the selector via `params` so it is percent-encoded; set-based
            # selectors such as "app in (a, b)" contain spaces, commas and parens.
            resp = client.get(pod_url, params={"labelSelector": AI_K8S_LABEL})
            resp.raise_for_status()
            data = resp.json()
        items = data.get("items") or []
        running = [p for p in items if p.get("status", {}).get("phase") == "Running"] or items
        if running:
            pod = running[0]
            meta["node"] = pod.get("spec", {}).get("nodeName") or meta["node"]
            annotations = pod.get("metadata", {}).get("annotations") or {}
            gpu_hint = (
                annotations.get(AI_GPU_ANNOTATION)
                or annotations.get("ai.gpu/description")
                or annotations.get("gpu/description")
            )
            if gpu_hint:
                meta["gpu"] = gpu_hint
            model_hint = annotations.get(AI_MODEL_ANNOTATION)
            if not model_hint:
                # Fall back to the container image tag.
                # NOTE(review): for a stock server image (e.g. "<repo>:<version>") the
                # tag is the server version, not the model name; confirm this is wanted.
                containers = pod.get("spec", {}).get("containers") or []
                if containers:
                    image = containers[0].get("image") or ""
                    # A ":" is a tag separator only after the last "/"; earlier colons
                    # belong to a registry host:port. A trailing "@digest" is ignored.
                    last_segment = image.split("@", 1)[0].rsplit("/", 1)[-1]
                    model_hint = last_segment.rsplit(":", 1)[1] if ":" in last_segment else image
            if model_hint:
                meta["model"] = model_hint
    except Exception:
        # Best-effort: keep the env-derived fallbacks, but leave a trace for debugging.
        app.logger.debug("AI metadata discovery via the Kubernetes API failed", exc_info=True)
    return meta
def _keep_warm() -> None:
    """Start a daemon thread that pings the chat model every AI_WARM_INTERVAL_SEC seconds.

    Does nothing when AI_WARM_ENABLED is false or the interval is not positive.
    The thread sleeps before each ping; a ping is a one-message, non-streaming chat
    request with a timeout of at most 15 seconds, and any failure is ignored.
    """
    if not AI_WARM_ENABLED or AI_WARM_INTERVAL_SEC <= 0:
        return

    import threading

    def _warm_forever() -> None:
        while True:
            time.sleep(AI_WARM_INTERVAL_SEC)
            ping = {
                "model": AI_CHAT_MODEL,
                "messages": [{"role": "user", "content": "ping"}],
                "stream": False,
            }
            try:
                with httpx.Client(timeout=min(AI_CHAT_TIMEOUT_SEC, 15)) as client:
                    client.post(f"{AI_CHAT_API}/api/chat", json=ping)
            except Exception:
                pass  # best-effort: the next cycle simply tries again

    worker = threading.Thread(target=_warm_forever, daemon=True, name="ai-keep-warm")
    worker.start()


# Launch the keep-warm thread as soon as this module is imported.
_keep_warm()
@app.route("/", defaults={"path": ""})
@app.route("/<path:path>")
def serve_frontend(path: str) -> Any:
    """Serve the built single-page app, falling back to index.html for client-side routes.

    A request path is served as a file only when it resolves to a regular file
    inside the dist directory. Anything else gets index.html so the SPA router can
    handle it: directories, missing files, paths escaping the directory, or paths
    that cannot be resolved. If the frontend has not been built, a JSON hint is
    returned instead.
    """
    dist_path = Path(app.static_folder)
    index_path = dist_path / "index.html"
    if not (dist_path.exists() and index_path.exists()):
        return jsonify(
            {
                "message": "Frontend not built yet. Run `npm install && npm run build` inside frontend/, then restart Flask.",
                "available_endpoints": ["/api/healthz", "/api/monero/get_info"],
            }
        )
    if path:
        try:
            dist_root = dist_path.resolve()
            target = (dist_root / path).resolve()
            # Confine the existence check to dist_root: checking arbitrary paths would
            # let a client learn which files exist elsewhere on disk (404 vs index.html).
            servable = dist_root in target.parents and target.is_file()
        except (OSError, ValueError, RuntimeError):
            # e.g. embedded NUL bytes or symlink loops: not a file we serve.
            servable = False
        if servable:
            return send_from_directory(app.static_folder, path)
    return send_from_directory(app.static_folder, "index.html")
if __name__ == "__main__":
    # Development entry point only. The Werkzeug debugger can execute arbitrary code
    # from the browser, so while binding to every interface it must be opt-in rather
    # than always on: set FLASK_DEBUG=1 (or "true"/"yes") to enable it locally.
    app.run(
        host="0.0.0.0",
        port=5000,
        debug=os.getenv("FLASK_DEBUG", "false").lower() in ("1", "true", "yes"),
    )