140 lines
4.9 KiB
Python

from __future__ import annotations
import json
import threading
import time
from pathlib import Path
from typing import Any
from flask import jsonify, request
import httpx
from .. import settings
def register(app) -> None:
@app.route("/api/chat", methods=["POST"])
@app.route("/api/ai/chat", methods=["POST"])
def ai_chat() -> Any:
payload = request.get_json(silent=True) or {}
user_message = (payload.get("message") or "").strip()
history = payload.get("history") or []
if not user_message:
return jsonify({"error": "message required"}), 400
messages: list[dict[str, str]] = []
if settings.AI_CHAT_SYSTEM_PROMPT:
messages.append({"role": "system", "content": settings.AI_CHAT_SYSTEM_PROMPT})
for item in history:
role = item.get("role")
content = (item.get("content") or "").strip()
if role in ("user", "assistant") and content:
messages.append({"role": role, "content": content})
messages.append({"role": "user", "content": user_message})
body = {"model": settings.AI_CHAT_MODEL, "messages": messages, "stream": False}
started = time.time()
try:
with httpx.Client(timeout=settings.AI_CHAT_TIMEOUT_SEC) as client:
resp = client.post(f"{settings.AI_CHAT_API}/api/chat", json=body)
resp.raise_for_status()
data = resp.json()
reply = (data.get("message") or {}).get("content") or ""
elapsed_ms = int((time.time() - started) * 1000)
return jsonify({"reply": reply, "latency_ms": elapsed_ms})
except (httpx.RequestError, httpx.HTTPStatusError, ValueError) as exc:
return jsonify({"error": str(exc)}), 502
@app.route("/api/chat/info", methods=["GET"])
@app.route("/api/ai/info", methods=["GET"])
def ai_info() -> Any:
meta = _discover_ai_meta()
return jsonify(meta)
_start_keep_warm()
def _discover_ai_meta() -> dict[str, str]:
meta = {
"node": settings.AI_NODE_NAME,
"gpu": settings.AI_GPU_DESC,
"model": settings.AI_CHAT_MODEL,
"endpoint": settings.AI_PUBLIC_ENDPOINT or "/api/chat",
}
sa_path = Path("/var/run/secrets/kubernetes.io/serviceaccount")
token_path = sa_path / "token"
ca_path = sa_path / "ca.crt"
ns_path = sa_path / "namespace"
if not token_path.exists() or not ca_path.exists() or not ns_path.exists():
return meta
try:
token = token_path.read_text().strip()
namespace = settings.AI_K8S_NAMESPACE
base_url = "https://kubernetes.default.svc"
pod_url = f"{base_url}/api/v1/namespaces/{namespace}/pods?labelSelector={settings.AI_K8S_LABEL}"
with httpx.Client(
verify=str(ca_path),
timeout=settings.HTTP_CHECK_TIMEOUT_SEC,
headers={"Authorization": f"Bearer {token}"},
) as client:
resp = client.get(pod_url)
resp.raise_for_status()
data = resp.json()
items = data.get("items") or []
running = [p for p in items if p.get("status", {}).get("phase") == "Running"] or items
if running:
pod = running[0]
node_name = pod.get("spec", {}).get("nodeName") or meta["node"]
meta["node"] = node_name
annotations = pod.get("metadata", {}).get("annotations") or {}
gpu_hint = (
annotations.get(settings.AI_GPU_ANNOTATION)
or annotations.get("ai.gpu/description")
or annotations.get("gpu/description")
)
if gpu_hint:
meta["gpu"] = gpu_hint
model_hint = annotations.get(settings.AI_MODEL_ANNOTATION)
if not model_hint:
containers = pod.get("spec", {}).get("containers") or []
if containers:
image = containers[0].get("image") or ""
model_hint = image.split(":")[-1] if ":" in image else image
if model_hint:
meta["model"] = model_hint
except Exception:
pass
return meta
def _start_keep_warm() -> None:
if not settings.AI_WARM_ENABLED or settings.AI_WARM_INTERVAL_SEC <= 0:
return
def loop() -> None:
while True:
time.sleep(settings.AI_WARM_INTERVAL_SEC)
try:
body = {
"model": settings.AI_CHAT_MODEL,
"messages": [{"role": "user", "content": "ping"}],
"stream": False,
}
with httpx.Client(timeout=min(settings.AI_CHAT_TIMEOUT_SEC, 15)) as client:
client.post(f"{settings.AI_CHAT_API}/api/chat", json=body)
except Exception:
continue
threading.Thread(target=loop, daemon=True, name="ai-keep-warm").start()