diff --git a/services/ai-llm/deployment.yaml b/services/ai-llm/deployment.yaml
index 43d14c8..bf012c0 100644
--- a/services/ai-llm/deployment.yaml
+++ b/services/ai-llm/deployment.yaml
@@ -20,7 +20,7 @@ spec:
       labels:
         app: ollama
       annotations:
-        ai.bstein.dev/model: qwen2.5:7b-instruct-q4_0
+        ai.bstein.dev/model: qwen2.5:14b-instruct-q4_0
         ai.bstein.dev/gpu: GPU pool (titan-22/24)
         ai.bstein.dev/restartedAt: "2026-01-26T12:00:00Z"
     spec:
@@ -52,7 +52,7 @@ spec:
         - name: OLLAMA_MODELS
           value: /root/.ollama
         - name: OLLAMA_MODEL
-          value: qwen2.5:7b-instruct-q4_0
+          value: qwen2.5:14b-instruct-q4_0
         command:
         - /bin/sh
         - -c
diff --git a/services/comms/atlasbot-deployment.yaml b/services/comms/atlasbot-deployment.yaml
index 0ee86f0..f4883c4 100644
--- a/services/comms/atlasbot-deployment.yaml
+++ b/services/comms/atlasbot-deployment.yaml
@@ -16,7 +16,7 @@ spec:
       labels:
         app: atlasbot
       annotations:
-        checksum/atlasbot-configmap: manual-atlasbot-29
+        checksum/atlasbot-configmap: manual-atlasbot-30
         vault.hashicorp.com/agent-inject: "true"
         vault.hashicorp.com/role: "comms"
         vault.hashicorp.com/agent-inject-secret-turn-secret: "kv/data/atlas/comms/turn-shared-secret"
@@ -82,7 +82,7 @@ spec:
         - name: OLLAMA_URL
           value: http://chat-ai-gateway.bstein-dev-home.svc.cluster.local/
         - name: OLLAMA_MODEL
-          value: qwen2.5:7b-instruct-q4_0
+          value: qwen2.5:14b-instruct-q4_0
         - name: OLLAMA_TIMEOUT_SEC
           value: "600"
         - name: ATLASBOT_THINKING_INTERVAL_SEC
diff --git a/services/comms/scripts/atlasbot/bot.py b/services/comms/scripts/atlasbot/bot.py
index 9f6c38d..a91744d 100644
--- a/services/comms/scripts/atlasbot/bot.py
+++ b/services/comms/scripts/atlasbot/bot.py
@@ -33,7 +33,10 @@
 SERVER_NAME = os.environ.get("MATRIX_SERVER_NAME", "live.bstein.dev")
 MAX_KB_CHARS = int(os.environ.get("ATLASBOT_MAX_KB_CHARS", "2500"))
 MAX_TOOL_CHARS = int(os.environ.get("ATLASBOT_MAX_TOOL_CHARS", "2500"))
+MAX_FACTS_CHARS = int(os.environ.get("ATLASBOT_MAX_FACTS_CHARS", "8000"))
 THINKING_INTERVAL_SEC = int(os.environ.get("ATLASBOT_THINKING_INTERVAL_SEC", "120"))
+OLLAMA_RETRIES = int(os.environ.get("ATLASBOT_OLLAMA_RETRIES", "2"))
+OLLAMA_SERIALIZE = os.environ.get("ATLASBOT_OLLAMA_SERIALIZE", "true").lower() != "false"
 
 TOKEN_RE = re.compile(r"[a-z0-9][a-z0-9_.-]{1,}", re.IGNORECASE)
 HOST_RE = re.compile(r"(?i)([a-z0-9-]+(?:\\.[a-z0-9-]+)+)")
@@ -113,6 +116,8 @@ METRIC_HINTS = {
     "connections": ("connections", "conn", "postgres", "database", "db"),
 }
 
+_OLLAMA_LOCK = threading.Lock()
+
 HARDWARE_HINTS = {
     "amd64": ("amd64", "x86", "x86_64", "x86-64"),
     "jetson": ("jetson",),
@@ -638,6 +643,105 @@ def node_inventory_for_prompt(prompt: str) -> list[dict[str, Any]]:
         return node_inventory()
     return []
 
+def _nodes_by_arch(inventory: list[dict[str, Any]]) -> dict[str, list[str]]:
+    grouped: dict[str, list[str]] = collections.defaultdict(list)
+    for node in inventory:
+        grouped[(node.get("arch") or "unknown")].append(node["name"])
+    return {k: sorted(v) for k, v in grouped.items()}
+
+def _node_usage_table(metrics: dict[str, Any]) -> list[dict[str, Any]]:
+    usage = metrics.get("node_usage") if isinstance(metrics.get("node_usage"), dict) else {}
+    per_node: dict[str, dict[str, Any]] = {}
+    for metric_name, entries in usage.items() if isinstance(usage, dict) else []:
+        if not isinstance(entries, list):
+            continue
+        for entry in entries:
+            if not isinstance(entry, dict):
+                continue
+            node = entry.get("node")
+            if not isinstance(node, str) or not node:
+                continue
+            per_node.setdefault(node, {})[metric_name] = entry.get("value")
+    return [{"node": node, **vals} for node, vals in sorted(per_node.items())]
+
+def _workloads_for_facts(workloads: list[dict[str, Any]], limit: int = 25) -> list[dict[str, Any]]:
+    cleaned: list[dict[str, Any]] = []
+    for entry in workloads:
+        if not isinstance(entry, dict):
+            continue
+        cleaned.append(
+            {
+                "namespace": entry.get("namespace"),
+                "workload": entry.get("workload"),
+                "pods_total": entry.get("pods_total"),
+                "pods_running": entry.get("pods_running"),
+                "primary_node": entry.get("primary_node"),
+                "nodes": entry.get("nodes"),
+            }
+        )
+    cleaned.sort(
+        key=lambda item: (
+            -(item.get("pods_total") or 0),
+            str(item.get("namespace") or ""),
+            str(item.get("workload") or ""),
+        )
+    )
+    return cleaned[:limit]
+
+def facts_context(
+    prompt: str,
+    *,
+    inventory: list[dict[str, Any]] | None,
+    snapshot: dict[str, Any] | None,
+    workloads: list[dict[str, Any]] | None,
+) -> str:
+    inv = inventory or []
+    metrics = _snapshot_metrics(snapshot)
+    nodes = snapshot.get("nodes") if isinstance(snapshot, dict) else {}
+    summary = snapshot.get("nodes_summary") if isinstance(snapshot, dict) else {}
+    expected_workers = expected_worker_nodes_from_metrics()
+    ready_workers, not_ready_workers = worker_nodes_status(inv) if inv else ([], [])
+
+    facts: dict[str, Any] = {
+        "generated_at": snapshot.get("generated_at") if isinstance(snapshot, dict) else None,
+        "nodes": {
+            "total": summary.get("total") if isinstance(summary, dict) and summary.get("total") is not None else nodes.get("total"),
+            "ready": summary.get("ready") if isinstance(summary, dict) and summary.get("ready") is not None else nodes.get("ready"),
+            "not_ready": summary.get("not_ready") if isinstance(summary, dict) and summary.get("not_ready") is not None else nodes.get("not_ready"),
+            "not_ready_names": summary.get("not_ready_names") if isinstance(summary, dict) else nodes.get("not_ready_names"),
+            "by_hardware": _group_nodes(inv) if inv else {},
+            "by_arch": _nodes_by_arch(inv) if inv else {},
+            "workers_ready": ready_workers,
+            "workers_not_ready": not_ready_workers,
+            "expected_workers": expected_workers,
+        },
+        "metrics": {
+            "hottest_nodes": metrics.get("hottest_nodes") if isinstance(metrics, dict) else {},
+            "postgres_connections": metrics.get("postgres_connections") if isinstance(metrics, dict) else {},
+            "node_usage": _node_usage_table(metrics) if isinstance(metrics, dict) else [],
+        },
+        "workloads": _workloads_for_facts(workloads or []),
+    }
+
+    rendered = json.dumps(facts, ensure_ascii=False)
+    if len(rendered) <= MAX_FACTS_CHARS:
+        return "Facts (live snapshot):\n" + rendered
+
+    trimmed = dict(facts)
+    trimmed.pop("workloads", None)
+    rendered = json.dumps(trimmed, ensure_ascii=False)
+    if len(rendered) <= MAX_FACTS_CHARS:
+        return "Facts (live snapshot):\n" + rendered
+
+    trimmed_metrics = dict(trimmed.get("metrics") or {})
+    trimmed_metrics.pop("node_usage", None)
+    trimmed["metrics"] = trimmed_metrics
+    rendered = json.dumps(trimmed, ensure_ascii=False)
+    if len(rendered) <= MAX_FACTS_CHARS:
+        return "Facts (live snapshot):\n" + rendered
+
+    return "Facts (live snapshot):\n" + rendered[:MAX_FACTS_CHARS]
+
 def _inventory_sets(inventory: list[dict[str, Any]]) -> dict[str, Any]:
     names = [node["name"] for node in inventory]
     ready = [node["name"] for node in inventory if node.get("ready") is True]
@@ -1463,26 +1567,19 @@ class _AtlasbotHandler(BaseHTTPRequestHandler):
         snapshot = _snapshot_state()
         inventory = _snapshot_inventory(snapshot) or node_inventory_live()
         workloads = _snapshot_workloads(snapshot)
-        answer = structured_answer(
+        context = build_context(
             prompt,
+            allow_tools=False,
+            targets=[],
             inventory=inventory,
-            metrics_summary="",
             snapshot=snapshot,
             workloads=workloads,
         )
-        if not answer and _knowledge_intent(prompt):
-            answer = knowledge_summary(prompt, inventory)
-        if not answer:
-            kb = kb_retrieve_titles(prompt, limit=4)
-            context = build_context(
-                prompt,
-                allow_tools=False,
-                targets=[],
-                inventory=inventory,
-                snapshot=snapshot,
-            )
-            fallback = kb or "I don't have enough data to answer that."
-            answer = ollama_reply(("http", "internal"), prompt, context=context, fallback=fallback)
+        metrics_context, metrics_fallback = metrics_query_context(prompt, allow_tools=True)
+        if metrics_context:
+            context = (context + "\n\n" + metrics_context).strip() if context else metrics_context
+        fallback = metrics_fallback or _context_fallback(context) or "I don't have enough data to answer that."
+        answer = ollama_reply(("http", "internal"), prompt, context=context, fallback=fallback)
 
         self._write_json(200, {"answer": answer})
 
@@ -1505,10 +1602,13 @@ def build_context(
     targets: list[tuple[str, str]],
     inventory: list[dict[str, Any]] | None = None,
     snapshot: dict[str, Any] | None = None,
+    workloads: list[dict[str, Any]] | None = None,
 ) -> str:
     parts: list[str] = []
 
     kb = kb_retrieve(prompt)
+    if not kb and _knowledge_intent(prompt):
+        kb = kb_retrieve_titles(prompt, limit=4)
     if kb:
         parts.append(kb)
 
@@ -1516,13 +1616,9 @@
     if endpoints:
         parts.append(endpoints)
 
-    node_ctx = node_inventory_context(prompt, inventory)
-    if node_ctx:
-        parts.append(node_ctx)
-
-    snapshot_ctx = snapshot_context(prompt, snapshot)
-    if snapshot_ctx:
-        parts.append(snapshot_ctx)
+    facts = facts_context(prompt, inventory=inventory, snapshot=snapshot, workloads=workloads)
+    if facts:
+        parts.append(facts)
 
     if allow_tools:
         # Scope pod summaries to relevant namespaces/workloads when possible.
@@ -1627,7 +1723,9 @@ def _ollama_call(hist_key, prompt: str, *, context: str) -> str:
     system = (
         "System: You are Atlas, the Titan lab assistant for Atlas/Othrys. "
         "Be helpful, direct, and concise. "
-        "Prefer answering with exact repo paths and Kubernetes resource names. "
+        "Use the provided context and facts as your source of truth. "
+        "If you infer or synthesize, say 'Based on the snapshot' and keep it brief. "
+        "Prefer exact repo paths and Kubernetes resource names when relevant. "
         "Never include or request secret values. "
         "Do not suggest commands unless explicitly asked. "
         "Respond in plain sentences; do not return JSON or code fences unless explicitly asked. "
@@ -1646,21 +1744,32 @@
     if API_KEY:
         headers["x-api-key"] = API_KEY
     r = request.Request(OLLAMA_URL, data=json.dumps(payload).encode(), headers=headers)
-    with request.urlopen(r, timeout=OLLAMA_TIMEOUT_SEC) as resp:
-        data = json.loads(resp.read().decode())
-        raw_reply = data.get("message") or data.get("response") or data.get("reply") or data
-        reply = _normalize_reply(raw_reply) or "I'm here to help."
-        history[hist_key].append(f"Atlas: {reply}")
-        return reply
+    lock = _OLLAMA_LOCK if OLLAMA_SERIALIZE else None
+    if lock:
+        lock.acquire()
+    try:
+        with request.urlopen(r, timeout=OLLAMA_TIMEOUT_SEC) as resp:
+            data = json.loads(resp.read().decode())
+            raw_reply = data.get("message") or data.get("response") or data.get("reply") or data
+            reply = _normalize_reply(raw_reply) or "I'm here to help."
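+            # Record the normalized reply in this conversation's history before
+            # returning; the serialization lock (held when ATLASBOT_OLLAMA_SERIALIZE
+            # is enabled) is released in the finally block below.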
+            history[hist_key].append(f"Atlas: {reply}")
+            return reply
+    finally:
+        if lock:
+            lock.release()
 
 def ollama_reply(hist_key, prompt: str, *, context: str, fallback: str = "") -> str:
-    try:
-        return _ollama_call(hist_key, prompt, context=context)
-    except Exception:
-        if fallback:
-            history[hist_key].append(f"Atlas: {fallback}")
-            return fallback
-        return "Model backend is busy. Try again in a moment."
+    last_error = None
+    for attempt in range(max(1, OLLAMA_RETRIES + 1)):
+        try:
+            return _ollama_call(hist_key, prompt, context=context)
+        except Exception as exc:  # noqa: BLE001
+            last_error = exc
+            time.sleep(min(4, 2 ** attempt))
+    if fallback:
+        history[hist_key].append(f"Atlas: {fallback}")
+        return fallback
+    return "I don't have enough data to answer that."
 
 def ollama_reply_with_thinking(token: str, room: str, hist_key, prompt: str, *, context: str, fallback: str) -> str:
     result: dict[str, str] = {"reply": ""}
@@ -1774,6 +1883,7 @@ def sync_loop(token: str, room_id: str):
                         targets=targets,
                         inventory=inventory,
                         snapshot=snapshot,
+                        workloads=workloads,
                     )
                     if allow_tools and promql:
                         res = vm_query(promql, timeout=20)
@@ -1784,26 +1894,7 @@
                     if metrics_context:
                         context = (context + "\n\n" + metrics_context).strip() if context else metrics_context
 
-                    fallback = metrics_fallback or ""
-                    if not fallback and context:
-                        fallback = _context_fallback(context)
-
-                    structured = structured_answer(
-                        body,
-                        inventory=inventory,
-                        metrics_summary=metrics_fallback or "",
-                        snapshot=snapshot,
-                        workloads=workloads,
-                    )
-                    if structured:
-                        send_msg(token, rid, structured)
-                        continue
-
-                    if _knowledge_intent(body):
-                        summary = knowledge_summary(body, inventory)
-                        if summary:
-                            send_msg(token, rid, summary)
-                            continue
+                    fallback = metrics_fallback or _context_fallback(context) or "I don't have enough data to answer that."
 
                     reply = ollama_reply_with_thinking(
                         token,