314 lines
11 KiB
Python

from __future__ import annotations

import json
import os
import time
from http.client import HTTPException
from pathlib import Path
from typing import Any
from urllib.error import URLError
from urllib.parse import urlencode
from urllib.request import urlopen

import httpx
from flask import Flask, jsonify, request, send_from_directory
from flask_cors import CORS
# Flask application; static assets are served from the built frontend bundle.
app = Flask(__name__, static_folder="../frontend/dist", static_url_path="")
# Allow cross-origin requests from any origin, restricted to the /api/* routes.
CORS(app, resources={r"/api/*": {"origins": "*"}})
# --- Configuration: every value below can be overridden via the environment ---
# Upstream monerod endpoint proxied by /api/monero/get_info.
MONERO_GET_INFO_URL = os.getenv("MONERO_GET_INFO_URL", "http://monerod.crypto.svc.cluster.local:18081/get_info")
# Base URL of the VictoriaMetrics server queried by _vm_query (trailing "/" stripped).
VM_BASE_URL = os.getenv(
"VM_BASE_URL",
"http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428",
).rstrip("/")
# Timeout, in seconds, for each VictoriaMetrics query.
VM_QUERY_TIMEOUT_SEC = float(os.getenv("VM_QUERY_TIMEOUT_SEC", "2"))
# Timeout, in seconds, for _http_ok probes and the Kubernetes API lookup.
HTTP_CHECK_TIMEOUT_SEC = float(os.getenv("HTTP_CHECK_TIMEOUT_SEC", "2"))
# Maximum age, in seconds, of the cached /api/lab/status payload.
LAB_STATUS_CACHE_SEC = float(os.getenv("LAB_STATUS_CACHE_SEC", "30"))
# Fallback probes used by lab_status when VictoriaMetrics gives no usable answer.
GRAFANA_HEALTH_URL = os.getenv("GRAFANA_HEALTH_URL", "https://metrics.bstein.dev/api/health")
OCEANUS_NODE_EXPORTER_URL = os.getenv("OCEANUS_NODE_EXPORTER_URL", "http://192.168.22.24:9100/metrics")
# Chat backend base URL (trailing "/" stripped); "/api/chat" is appended per request.
AI_CHAT_API = os.getenv("AI_CHAT_API", "http://ollama.ai.svc.cluster.local:11434").rstrip("/")
# Model name sent with every chat request.
AI_CHAT_MODEL = os.getenv("AI_CHAT_MODEL", "qwen2.5-coder:7b-instruct-q4_0")
# System message prepended to each conversation; skipped when empty.
AI_CHAT_SYSTEM_PROMPT = os.getenv(
"AI_CHAT_SYSTEM_PROMPT",
"You are the Titan Lab assistant for bstein.dev. Be concise and helpful.",
)
# Timeout, in seconds, for chat requests to the backend.
AI_CHAT_TIMEOUT_SEC = float(os.getenv("AI_CHAT_TIMEOUT_SEC", "20"))
# Defaults reported by /api/ai/info when Kubernetes discovery yields nothing better.
AI_NODE_NAME = os.getenv("AI_CHAT_NODE_NAME") or os.getenv("AI_NODE_NAME") or "ai-cluster"
AI_GPU_DESC = os.getenv("AI_CHAT_GPU_DESC") or "local GPU (dynamic)"
AI_PUBLIC_ENDPOINT = os.getenv("AI_PUBLIC_CHAT_ENDPOINT", "https://chat.ai.bstein.dev/api/chat")
# Label selector and namespace used to find the AI pod through the Kubernetes API.
AI_K8S_LABEL = os.getenv("AI_K8S_LABEL", "app=ollama")
AI_K8S_NAMESPACE = os.getenv("AI_K8S_NAMESPACE", "ai")
# Pod annotation keys read for the model and GPU descriptions.
AI_MODEL_ANNOTATION = os.getenv("AI_MODEL_ANNOTATION", "ai.bstein.dev/model")
AI_GPU_ANNOTATION = os.getenv("AI_GPU_ANNOTATION", "ai.bstein.dev/gpu")
# Keep-warm pings: seconds between pings, and whether the loop runs at all.
AI_WARM_INTERVAL_SEC = float(os.getenv("AI_WARM_INTERVAL_SEC", "300"))
AI_WARM_ENABLED = os.getenv("AI_WARM_ENABLED", "true").lower() in ("1", "true", "yes")
# In-process cache for lab_status: "ts" is the time.time() of the last refresh,
# "value" is the last payload (None until the first refresh).
_LAB_STATUS_CACHE: dict[str, Any] = {"ts": 0.0, "value": None}
@app.route("/api/healthz")
def healthz() -> Any:
    """Liveness probe: always answers with the JSON object ``{"ok": true}``."""
    return jsonify(ok=True)
@app.route("/api/monero/get_info")
def monero_get_info() -> Any:
    """Proxy the monerod ``get_info`` response as JSON.

    Returns:
        The decoded upstream JSON on success; otherwise a 503 response whose
        body carries the error text and the upstream URL.
    """
    try:
        with urlopen(MONERO_GET_INFO_URL, timeout=2) as resp:
            payload = json.loads(resp.read().decode("utf-8"))
    except (OSError, HTTPException, ValueError) as exc:
        # OSError covers URLError/HTTPError plus timeouts and connection
        # resets raised while *reading* the body, which urlopen does not wrap
        # in URLError. HTTPException covers truncated or malformed responses;
        # ValueError covers undecodable or non-JSON bodies.
        return jsonify({"error": str(exc), "url": MONERO_GET_INFO_URL}), 503
    return jsonify(payload)
def _vm_query(expr: str) -> float | None:
    """Run an instant query against VictoriaMetrics and return the largest sample.

    Returns ``None`` when the response status is not ``"success"``, when the
    result set is empty, or when no sample can be parsed as a float. Network
    and JSON-decoding errors propagate to the caller.
    """
    query_string = urlencode({"query": expr})
    endpoint = f"{VM_BASE_URL}/api/v1/query?{query_string}"
    with urlopen(endpoint, timeout=VM_QUERY_TIMEOUT_SEC) as resp:
        body = json.loads(resp.read().decode("utf-8"))

    if body.get("status") != "success":
        return None

    samples: list[float] = []
    for series in (body.get("data") or {}).get("result") or []:
        try:
            # Each series carries a [timestamp, value] pair; the value is a string.
            samples.append(float(series["value"][1]))
        except (KeyError, IndexError, TypeError, ValueError):
            continue
    return max(samples) if samples else None
def _http_ok(url: str, expect_substring: str | None = None) -> bool:
    """Return whether ``url`` answers with HTTP 200 (and the expected content).

    Args:
        url: Address to probe with a GET request.
        expect_substring: When given, the response body must also contain this
            text for the probe to succeed.

    Returns:
        ``True`` when the probe succeeds, ``False`` on any network, protocol
        or decoding failure. This function never raises for those conditions.
    """
    try:
        with urlopen(url, timeout=HTTP_CHECK_TIMEOUT_SEC) as resp:
            if getattr(resp, "status", 200) != 200:
                return False
            if expect_substring:
                # NOTE(review): only the first 4096 bytes are inspected, so a
                # marker further into the body is not found. Confirm this
                # window is large enough for every caller.
                chunk = resp.read(4096).decode("utf-8", errors="ignore")
                return expect_substring in chunk
            return True
    except (OSError, HTTPException, ValueError):
        # OSError covers URLError/HTTPError plus timeouts and connection
        # resets raised while reading the body (urlopen does not wrap those);
        # HTTPException covers truncated or malformed responses.
        return False
@app.route("/api/lab/status")
def lab_status() -> Any:
    """Report reachability of the "atlas" and "oceanus" lab targets.

    VictoriaMetrics is asked first. When it gives no answer for atlas, the
    Grafana health endpoint is probed; when oceanus is not reported up, its
    node exporter is probed directly. The resulting payload is cached
    in-process for ``LAB_STATUS_CACHE_SEC`` seconds.

    Returns:
        JSON with ``connected``, per-target ``up``/``known``/``source`` and
        ``checked_at`` (Unix seconds).
    """
    now = time.time()
    cached = _LAB_STATUS_CACHE.get("value")
    if cached and (now - float(_LAB_STATUS_CACHE.get("ts", 0.0)) < LAB_STATUS_CACHE_SEC):
        return jsonify(cached)

    connected = False
    atlas_up = False
    atlas_known = False
    atlas_source = "unknown"
    oceanus_up = False
    oceanus_known = False
    oceanus_source = "unknown"

    try:
        atlas_value = _vm_query('max(up{job="kubernetes-apiservers"})')
        oceanus_value = _vm_query('max(up{instance=~"(titan-23|192.168.22.24)(:9100)?"})')
    except (OSError, HTTPException, ValueError):
        # VictoriaMetrics is unreachable or returned an unusable response.
        # OSError/HTTPException also cover failures raised while reading the
        # body, so the fallback probes below still run instead of the request
        # ending in a 500.
        pass
    else:
        connected = True
        atlas_known = atlas_value is not None
        atlas_up = bool(atlas_value and atlas_value > 0.5)
        atlas_source = "victoria-metrics"
        oceanus_known = oceanus_value is not None
        oceanus_up = bool(oceanus_value and oceanus_value > 0.5)
        oceanus_source = "victoria-metrics"

    if not atlas_known:
        if _http_ok(GRAFANA_HEALTH_URL):
            connected = True
            atlas_known = True
            atlas_up = True
            atlas_source = "grafana-health"

    # Probed whenever oceanus is not reported up (not only when unknown): a
    # successful direct probe overrides a "down" answer from VictoriaMetrics.
    if not oceanus_up:
        if _http_ok(OCEANUS_NODE_EXPORTER_URL, expect_substring="node_exporter_build_info"):
            connected = True
            oceanus_known = True
            oceanus_up = True
            oceanus_source = "node-exporter"

    payload = {
        "connected": connected,
        "atlas": {"up": atlas_up, "known": atlas_known, "source": atlas_source},
        "oceanus": {"up": oceanus_up, "known": oceanus_known, "source": oceanus_source},
        "checked_at": int(now),
    }
    _LAB_STATUS_CACHE["ts"] = now
    _LAB_STATUS_CACHE["value"] = payload
    return jsonify(payload)
@app.route("/api/chat", methods=["POST"])
@app.route("/api/ai/chat", methods=["POST"])
def ai_chat() -> Any:
    """Forward one chat turn to the model backend and return its reply.

    Request JSON: ``{"message": str, "history": [{"role": ..., "content": ...}]}``.
    Only history entries whose role is ``"user"`` or ``"assistant"`` and whose
    content is a non-blank string are forwarded; anything else is ignored.

    Returns:
        ``{"reply": str, "latency_ms": int}`` on success, a 400 response when
        ``message`` is missing, blank or not a string, and a 502 response when
        the backend call fails or answers with an unexpected body.
    """
    # The body is untrusted: it may be absent, invalid JSON, or valid JSON of
    # the wrong shape (list, string, number). Treat all of those as empty.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    raw_message = payload.get("message")
    user_message = raw_message.strip() if isinstance(raw_message, str) else ""
    if not user_message:
        return jsonify({"error": "message required"}), 400

    raw_history = payload.get("history")
    history = raw_history if isinstance(raw_history, list) else []

    messages: list[dict[str, str]] = []
    if AI_CHAT_SYSTEM_PROMPT:
        messages.append({"role": "system", "content": AI_CHAT_SYSTEM_PROMPT})
    for item in history:
        if not isinstance(item, dict):
            continue
        role = item.get("role")
        content = item.get("content")
        if not isinstance(content, str):
            continue
        content = content.strip()
        if role in ("user", "assistant") and content:
            messages.append({"role": role, "content": content})
    messages.append({"role": "user", "content": user_message})

    body = {"model": AI_CHAT_MODEL, "messages": messages, "stream": False}
    started = time.time()
    try:
        with httpx.Client(timeout=AI_CHAT_TIMEOUT_SEC) as client:
            resp = client.post(f"{AI_CHAT_API}/api/chat", json=body)
            resp.raise_for_status()
            data = resp.json()
        if not isinstance(data, dict):
            # Routed through the handler below so the client gets a 502.
            raise ValueError("unexpected response from chat backend")
        message = data.get("message")
        reply = (message.get("content") if isinstance(message, dict) else None) or ""
        elapsed_ms = int((time.time() - started) * 1000)
        return jsonify({"reply": reply, "latency_ms": elapsed_ms})
    except (httpx.RequestError, httpx.HTTPStatusError, ValueError) as exc:
        return jsonify({"error": str(exc)}), 502
@app.route("/api/chat/info", methods=["GET"])
@app.route("/api/ai/info", methods=["GET"])
def ai_info() -> Any:
    """Return the discovered (or configured) AI hosting metadata as JSON."""
    return jsonify(_discover_ai_meta())
def _discover_ai_meta() -> dict[str, str]:
    """
    Best-effort discovery of which node/gpu is hosting the AI service.

    Tries the Kubernetes API using the service account if available; falls back to env.

    Returns:
        A dict with ``node``, ``gpu``, ``model`` and ``endpoint`` keys. Values
        start from the configured defaults and are overridden by whatever the
        pod lookup manages to determine. Lookup failures never raise.
    """
    meta = {
        "node": AI_NODE_NAME,
        "gpu": AI_GPU_DESC,
        "model": AI_CHAT_MODEL,
        "endpoint": AI_PUBLIC_ENDPOINT or "/api/chat",
    }

    # Only attempt k8s if we're in-cluster and credentials exist.
    sa_path = Path("/var/run/secrets/kubernetes.io/serviceaccount")
    token_path = sa_path / "token"
    ca_path = sa_path / "ca.crt"
    ns_path = sa_path / "namespace"
    if not (token_path.exists() and ca_path.exists() and ns_path.exists()):
        return meta

    try:
        token = token_path.read_text().strip()
        pod_url = f"https://kubernetes.default.svc/api/v1/namespaces/{AI_K8S_NAMESPACE}/pods"
        with httpx.Client(
            verify=str(ca_path),
            timeout=HTTP_CHECK_TIMEOUT_SEC,
            headers={"Authorization": f"Bearer {token}"},
        ) as client:
            # Pass the selector via ``params`` so it is percent-encoded;
            # selectors may contain "=", "," or spaces that must not be
            # spliced raw into the query string.
            resp = client.get(pod_url, params={"labelSelector": AI_K8S_LABEL})
            resp.raise_for_status()
            data = resp.json()

        items = data.get("items") or []
        # Prefer running pods; fall back to any matching pod.
        running = [p for p in items if p.get("status", {}).get("phase") == "Running"] or items
        if running:
            pod = running[0]
            meta["node"] = pod.get("spec", {}).get("nodeName") or meta["node"]

            annotations = pod.get("metadata", {}).get("annotations") or {}
            gpu_hint = (
                annotations.get(AI_GPU_ANNOTATION)
                or annotations.get("ai.gpu/description")
                or annotations.get("gpu/description")
            )
            if gpu_hint:
                meta["gpu"] = gpu_hint

            model_hint = annotations.get(AI_MODEL_ANNOTATION)
            if not model_hint:
                # Try to infer from container image tag.
                containers = pod.get("spec", {}).get("containers") or []
                if containers:
                    image = containers[0].get("image") or ""
                    model_hint = image.split(":")[-1] if ":" in image else image
            if model_hint:
                meta["model"] = model_hint
    except Exception:
        # Deliberately best-effort: keep the fallbacks, but leave a trace so a
        # broken lookup (RBAC, TLS, unexpected payload) can be diagnosed.
        app.logger.debug("AI metadata discovery failed; using configured defaults", exc_info=True)
    return meta
def _keep_warm() -> None:
    """Start a daemon thread that pings the chat model at a fixed interval.

    Does nothing when warming is disabled or the interval is not positive.
    """
    warming_disabled = not AI_WARM_ENABLED or AI_WARM_INTERVAL_SEC <= 0
    if warming_disabled:
        return

    import threading

    ping_request = {
        "model": AI_CHAT_MODEL,
        "messages": [{"role": "user", "content": "ping"}],
        "stream": False,
    }
    ping_timeout = min(AI_CHAT_TIMEOUT_SEC, 15)

    def _ping_forever() -> None:
        while True:
            # Sleep first: the initial ping happens one interval after start.
            time.sleep(AI_WARM_INTERVAL_SEC)
            try:
                with httpx.Client(timeout=ping_timeout) as client:
                    client.post(f"{AI_CHAT_API}/api/chat", json=ping_request)
            except Exception:
                # Best-effort: a failed ping is ignored and retried next cycle.
                pass

    warm_thread = threading.Thread(target=_ping_forever, daemon=True, name="ai-keep-warm")
    warm_thread.start()


# Start keep-warm loop on import.
_keep_warm()
@app.route("/", defaults={"path": ""})
@app.route("/<path:path>")
def serve_frontend(path: str) -> Any:
    """Serve the built single-page frontend.

    Args:
        path: Request path relative to the site root ("" for "/").

    Returns:
        The matching file from the frontend bundle when ``path`` names one,
        ``index.html`` for every other path (client-side routing), or a JSON
        hint when the frontend bundle has not been built.
    """
    dist_path = Path(app.static_folder)
    if not (dist_path / "index.html").is_file():
        return jsonify(
            {
                "message": "Frontend not built yet. Run `npm install && npm run build` inside frontend/, then restart Flask.",
                "available_endpoints": ["/api/healthz", "/api/monero/get_info"],
            }
        )

    # is_file() rather than exists(): send_from_directory answers 404 for a
    # directory, so directory paths fall through to index.html like any other
    # client-side route. Path traversal is still rejected by
    # send_from_directory itself.
    if path and (dist_path / path).is_file():
        return send_from_directory(app.static_folder, path)
    return send_from_directory(app.static_folder, "index.html")
if __name__ == "__main__":
    # The Werkzeug debugger allows arbitrary code execution, so it must not be
    # enabled unconditionally on a server bound to all interfaces. Opt in with
    # FLASK_DEBUG=1 (or "true"/"yes") for local development.
    debug_enabled = os.getenv("FLASK_DEBUG", "").lower() in ("1", "true", "yes")
    app.run(host="0.0.0.0", port=5000, debug=debug_enabled)