from __future__ import annotations
import json
import os
import threading
import time
from pathlib import Path
from typing import Any
from urllib.error import URLError
from urllib.parse import urlencode
from urllib.request import urlopen
from flask import Flask, jsonify, request, send_from_directory
from flask_cors import CORS
import httpx
app = Flask(__name__, static_folder="../frontend/dist", static_url_path="")
CORS(app, resources={r"/api/*": {"origins": "*"}})
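# Note: CORS is wide open for /api/* so any origin can call the API endpoints;
# the static frontend itself is served same-origin from ../frontend/dist.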
MONERO_GET_INFO_URL = os.getenv("MONERO_GET_INFO_URL", "http://monerod.crypto.svc.cluster.local:18081/get_info")
VM_BASE_URL = os.getenv(
    "VM_BASE_URL",
    "http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428",
).rstrip("/")
VM_QUERY_TIMEOUT_SEC = float(os.getenv("VM_QUERY_TIMEOUT_SEC", "2"))
HTTP_CHECK_TIMEOUT_SEC = float(os.getenv("HTTP_CHECK_TIMEOUT_SEC", "2"))
LAB_STATUS_CACHE_SEC = float(os.getenv("LAB_STATUS_CACHE_SEC", "30"))
GRAFANA_HEALTH_URL = os.getenv("GRAFANA_HEALTH_URL", "https://metrics.bstein.dev/api/health")
OCEANUS_NODE_EXPORTER_URL = os.getenv("OCEANUS_NODE_EXPORTER_URL", "http://192.168.22.24:9100/metrics")
AI_CHAT_API = os.getenv("AI_CHAT_API", "http://ollama.ai.svc.cluster.local:11434").rstrip("/")
AI_CHAT_MODEL = os.getenv("AI_CHAT_MODEL", "qwen2.5-coder:7b-instruct-q4_0")
AI_CHAT_SYSTEM_PROMPT = os.getenv(
    "AI_CHAT_SYSTEM_PROMPT",
    "You are the Titan Lab assistant for bstein.dev. Be concise and helpful.",
)
AI_CHAT_TIMEOUT_SEC = float(os.getenv("AI_CHAT_TIMEOUT_SEC", "20"))
AI_NODE_NAME = os.getenv("AI_CHAT_NODE_NAME") or os.getenv("AI_NODE_NAME") or "ai-cluster"
AI_GPU_DESC = os.getenv("AI_CHAT_GPU_DESC") or "local GPU (dynamic)"
AI_PUBLIC_ENDPOINT = os.getenv("AI_PUBLIC_CHAT_ENDPOINT", "https://chat.ai.bstein.dev/api/ai/chat")
AI_K8S_LABEL = os.getenv("AI_K8S_LABEL", "app=ollama")
AI_K8S_NAMESPACE = os.getenv("AI_K8S_NAMESPACE", "ai")
AI_MODEL_ANNOTATION = os.getenv("AI_MODEL_ANNOTATION", "ai.bstein.dev/model")
AI_GPU_ANNOTATION = os.getenv("AI_GPU_ANNOTATION", "ai.bstein.dev/gpu")
AI_WARM_INTERVAL_SEC = float(os.getenv("AI_WARM_INTERVAL_SEC", "300"))
AI_WARM_ENABLED = os.getenv("AI_WARM_ENABLED", "true").lower() in ("1", "true", "yes")
_LAB_STATUS_CACHE: dict[str, Any] = {"ts": 0.0, "value": None}


@app.route("/api/healthz")
def healthz() -> Any:
    return jsonify({"ok": True})


@app.route("/api/monero/get_info")
def monero_get_info() -> Any:
    try:
        # Honor the configurable probe timeout instead of a hardcoded value.
        with urlopen(MONERO_GET_INFO_URL, timeout=HTTP_CHECK_TIMEOUT_SEC) as resp:
            payload = json.loads(resp.read().decode("utf-8"))
        return jsonify(payload)
    except (URLError, TimeoutError, ValueError) as exc:
        return jsonify({"error": str(exc), "url": MONERO_GET_INFO_URL}), 503


def _vm_query(expr: str) -> float | None:
    """Run an instant query against VictoriaMetrics; return the max sample value, or None."""
    url = f"{VM_BASE_URL}/api/v1/query?{urlencode({'query': expr})}"
    with urlopen(url, timeout=VM_QUERY_TIMEOUT_SEC) as resp:
        payload = json.loads(resp.read().decode("utf-8"))
    if payload.get("status") != "success":
        return None
    result = (payload.get("data") or {}).get("result") or []
    if not result:
        return None
    values: list[float] = []
    for item in result:
        try:
            values.append(float(item["value"][1]))
        except (KeyError, IndexError, TypeError, ValueError):
            continue
    if not values:
        return None
    return max(values)
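
# For reference, a Prometheus-compatible instant-query response from
# /api/v1/query looks like this (illustrative values; each sample "value"
# is a [timestamp, "string"] pair, hence the float(item["value"][1]) above):
#   {"status": "success", "data": {"resultType": "vector", "result":
#     [{"metric": {"instance": "..."}, "value": [1734500000, "1"]}]}}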


def _http_ok(url: str, expect_substring: str | None = None) -> bool:
    """Return True if the URL answers HTTP 200 (and, optionally, contains a marker string)."""
    try:
        with urlopen(url, timeout=HTTP_CHECK_TIMEOUT_SEC) as resp:
            if getattr(resp, "status", 200) != 200:
                return False
            if expect_substring:
                chunk = resp.read(4096).decode("utf-8", errors="ignore")
                return expect_substring in chunk
            return True
    except (URLError, TimeoutError, ValueError):
        return False
@app.route("/api/lab/status")
def lab_status() -> Any:
now = time.time()
cached = _LAB_STATUS_CACHE.get("value")
if cached and (now - float(_LAB_STATUS_CACHE.get("ts", 0.0)) < LAB_STATUS_CACHE_SEC):
return jsonify(cached)
connected = False
atlas_up = False
atlas_known = False
atlas_source = "unknown"
oceanus_up = False
oceanus_known = False
oceanus_source = "unknown"
try:
atlas_value = _vm_query('max(up{job="kubernetes-apiservers"})')
oceanus_value = _vm_query('max(up{instance=~"(titan-23|192.168.22.24)(:9100)?"})')
connected = True
atlas_known = atlas_value is not None
atlas_up = bool(atlas_value and atlas_value > 0.5)
atlas_source = "victoria-metrics"
oceanus_known = oceanus_value is not None
oceanus_up = bool(oceanus_value and oceanus_value > 0.5)
oceanus_source = "victoria-metrics"
except (URLError, TimeoutError, ValueError):
atlas_value = None
oceanus_value = None
if not atlas_known:
if _http_ok(GRAFANA_HEALTH_URL):
connected = True
atlas_known = True
atlas_up = True
atlas_source = "grafana-health"
if not oceanus_up:
if _http_ok(OCEANUS_NODE_EXPORTER_URL, expect_substring="node_exporter_build_info"):
connected = True
oceanus_known = True
oceanus_up = True
oceanus_source = "node-exporter"
payload = {
"connected": connected,
"atlas": {"up": atlas_up, "known": atlas_known, "source": atlas_source},
"oceanus": {"up": oceanus_up, "known": oceanus_known, "source": oceanus_source},
"checked_at": int(now),
}
_LAB_STATUS_CACHE["ts"] = now
_LAB_STATUS_CACHE["value"] = payload
return jsonify(payload)
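
# Illustrative /api/lab/status payload (values depend on the live cluster):
#   {"connected": true,
#    "atlas": {"up": true, "known": true, "source": "victoria-metrics"},
#    "oceanus": {"up": true, "known": true, "source": "node-exporter"},
#    "checked_at": 1734700000}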
@app.route("/api/ai/chat", methods=["POST"])
def ai_chat() -> Any:
payload = request.get_json(silent=True) or {}
user_message = (payload.get("message") or "").strip()
history = payload.get("history") or []
if not user_message:
return jsonify({"error": "message required"}), 400
messages: list[dict[str, str]] = []
if AI_CHAT_SYSTEM_PROMPT:
messages.append({"role": "system", "content": AI_CHAT_SYSTEM_PROMPT})
for item in history:
role = item.get("role")
content = (item.get("content") or "").strip()
if role in ("user", "assistant") and content:
messages.append({"role": role, "content": content})
messages.append({"role": "user", "content": user_message})
body = {"model": AI_CHAT_MODEL, "messages": messages, "stream": False}
started = time.time()
try:
with httpx.Client(timeout=AI_CHAT_TIMEOUT_SEC) as client:
resp = client.post(f"{AI_CHAT_API}/api/chat", json=body)
resp.raise_for_status()
data = resp.json()
reply = (data.get("message") or {}).get("content") or ""
elapsed_ms = int((time.time() - started) * 1000)
return jsonify({"reply": reply, "latency_ms": elapsed_ms})
except (httpx.RequestError, httpx.HTTPStatusError, ValueError) as exc:
return jsonify({"error": str(exc)}), 502
@app.route("/api/ai/info", methods=["GET"])
def ai_info() -> Any:
meta = _discover_ai_meta()
return jsonify(meta)


def _discover_ai_meta() -> dict[str, str]:
    """
    Best-effort discovery of which node/GPU is hosting the AI service.

    Tries the Kubernetes API using the service account if available; falls back to env.
    """
    meta = {
        "node": AI_NODE_NAME,
        "gpu": AI_GPU_DESC,
        "model": AI_CHAT_MODEL,
        "endpoint": AI_PUBLIC_ENDPOINT or "/api/ai/chat",
    }
    # Only attempt k8s if we're in-cluster and credentials exist.
    sa_path = Path("/var/run/secrets/kubernetes.io/serviceaccount")
    token_path = sa_path / "token"
    ca_path = sa_path / "ca.crt"
    ns_path = sa_path / "namespace"
    if not token_path.exists() or not ca_path.exists() or not ns_path.exists():
        return meta
    try:
        token = token_path.read_text().strip()
        namespace = AI_K8S_NAMESPACE
        base_url = "https://kubernetes.default.svc"
        pod_url = f"{base_url}/api/v1/namespaces/{namespace}/pods?labelSelector={AI_K8S_LABEL}"
        with httpx.Client(
            verify=str(ca_path),
            timeout=HTTP_CHECK_TIMEOUT_SEC,
            headers={"Authorization": f"Bearer {token}"},
        ) as client:
            resp = client.get(pod_url)
            resp.raise_for_status()
            data = resp.json()
        items = data.get("items") or []
        # Prefer Running pods, but fall back to whatever matched the selector.
        running = [p for p in items if p.get("status", {}).get("phase") == "Running"] or items
        if running:
            pod = running[0]
            meta["node"] = pod.get("spec", {}).get("nodeName") or meta["node"]
            annotations = pod.get("metadata", {}).get("annotations") or {}
            gpu_hint = (
                annotations.get(AI_GPU_ANNOTATION)
                or annotations.get("ai.gpu/description")
                or annotations.get("gpu/description")
            )
            if gpu_hint:
                meta["gpu"] = gpu_hint
            model_hint = annotations.get(AI_MODEL_ANNOTATION)
            if not model_hint:
                # Try to infer the model from the container image tag.
                containers = pod.get("spec", {}).get("containers") or []
                if containers:
                    image = containers[0].get("image") or ""
                    model_hint = image.split(":")[-1] if ":" in image else image
            if model_hint:
                meta["model"] = model_hint
    except Exception:
        # Swallow errors; keep the env-based fallbacks.
        pass
    return meta
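
# Pods can advertise their serving details via the configured annotations;
# a hypothetical manifest snippet (values are examples, not real cluster data):
#   metadata:
#     annotations:
#       ai.bstein.dev/model: "qwen2.5-coder:7b-instruct-q4_0"
#       ai.bstein.dev/gpu: "example GPU description"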


def _keep_warm() -> None:
    """Periodically ping the model to keep it warm."""
    if not AI_WARM_ENABLED or AI_WARM_INTERVAL_SEC <= 0:
        return

    def loop() -> None:
        while True:
            time.sleep(AI_WARM_INTERVAL_SEC)
            try:
                body = {
                    "model": AI_CHAT_MODEL,
                    "messages": [{"role": "user", "content": "ping"}],
                    "stream": False,
                }
                with httpx.Client(timeout=min(AI_CHAT_TIMEOUT_SEC, 15)) as client:
                    client.post(f"{AI_CHAT_API}/api/chat", json=body)
            except Exception:
                # Best-effort: ignore failures and retry on the next interval.
                continue

    threading.Thread(target=loop, daemon=True, name="ai-keep-warm").start()


# Start the keep-warm loop on import.
_keep_warm()
@app.route("/", defaults={"path": ""})
@app.route("/<path:path>")
def serve_frontend(path: str) -> Any:
dist_path = Path(app.static_folder)
index_path = dist_path / "index.html"
if dist_path.exists() and index_path.exists():
target = dist_path / path
if path and target.exists():
return send_from_directory(app.static_folder, path)
return send_from_directory(app.static_folder, "index.html")
return jsonify(
{
"message": "Frontend not built yet. Run `npm install && npm run build` inside frontend/, then restart Flask.",
"available_endpoints": ["/api/healthz", "/api/monero/get_info"],
}
)
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000, debug=True)