345 lines
12 KiB
Python

from __future__ import annotations
import asyncio
import json
import re
from collections.abc import Awaitable
from collections.abc import Callable
from typing import Any
from atlasbot.llm import prompts
from atlasbot.llm.client import parse_json
from ._base import *
from .post_ext import _extract_keywords
def _parse_json_block(text: str, *, fallback: dict[str, Any]) -> dict[str, Any]:
    """Parse the first ``{...}`` JSON object embedded in *text*.

    Falls back to parsing the whole stripped text when no braced span is
    found; ``fallback`` is returned by ``parse_json`` on failure.
    """
    stripped = text.strip()
    found = re.search(r"\{.*\}", stripped, flags=re.S)
    candidate = found.group(0) if found else stripped
    return parse_json(candidate, fallback=fallback)
async def _select_metric_chunks(
    call_llm: Callable[..., Awaitable[str]],
    ctx: dict[str, Any],
    chunks: list[dict[str, Any]],
    plan: ModePlan,
) -> tuple[list[str], list[str]]:
    """Pick the metric keys relevant to the question and the chunk ids carrying them.

    Returns ``(selected_keys, chunk_ids)``; both lists are empty when there is
    nothing to select from or no usable selection survives the fallbacks.
    """
    summary_lines, question, sub_questions, keywords, token_set = _metric_ctx_values(ctx)
    if not summary_lines or not chunks:
        return [], []
    keys = _extract_metric_keys(summary_lines)
    if not keys:
        return [], []
    max_keys = max(4, plan.max_subquestions * 2)
    # Keys whose word-parts overlap the question's keyword tokens, ranked best-first.
    candidate_keys = _filter_metric_keys(keys, token_set)
    # Offer the model only keyword-matching keys when any exist; otherwise all keys.
    available_keys = candidate_keys or keys
    prompt = prompts.METRIC_KEYS_PROMPT.format(available="\n".join(available_keys), max_keys=max_keys)
    raw = await call_llm(
        prompts.METRIC_KEYS_SYSTEM,
        prompt + "\nQuestion: " + str(question) + "\nSubQuestions:\n" + "\n".join([str(item) for item in sub_questions]),
        context="Keywords:\n" + ", ".join([str(item) for item in keywords if item]),
        model=plan.fast_model,
        tag="metric_keys",
    )
    selected = _parse_key_list(raw, available_keys, max_keys)
    if candidate_keys:
        # Blend the model's picks with the keyword-ranked candidates (capped).
        selected = _merge_metric_keys(selected, candidate_keys, max_keys)
    # If the merged selection shares no tokens with the question, distrust the
    # model and use the keyword-ranked candidates alone.
    if selected and candidate_keys and not _metric_key_overlap(selected, token_set):
        selected = candidate_keys[:max_keys]
    if not selected and candidate_keys:
        selected = candidate_keys[:max_keys]
    if available_keys:
        # Second LLM pass: ask which available keys the selection is still missing.
        missing = await _validate_metric_keys(
            call_llm,
            {
                "question": question,
                "sub_questions": sub_questions,
                "selected": selected,
            },
            available_keys,
            plan,
        )
        if missing:
            selected = _merge_metric_keys(selected, missing, max_keys)
    if not selected:
        return [], []
    ids = _chunk_ids_for_keys(chunks, selected)
    return selected, ids
async def _validate_metric_keys(
    call_llm: Callable[..., Awaitable[str]],
    ctx: dict[str, Any],
    available: list[str],
    plan: ModePlan,
) -> list[str]:
    """Ask the fast model which of the available keys the selection is missing.

    Returns the deduplicated subset of the shown keys that the model flags as
    missing; empty on no candidates or an unparsable response.
    """
    if not available:
        return []
    question = str(ctx.get("question") or "")
    raw_subs = ctx.get("sub_questions")
    sub_questions = raw_subs if isinstance(raw_subs, list) else []
    raw_selected = ctx.get("selected")
    selected = raw_selected if isinstance(raw_selected, list) else []
    # Cap the candidate list so the prompt stays bounded.
    limit = max(12, plan.max_subquestions * 4)
    shown = available[:limit]
    prompt = prompts.METRIC_KEYS_VALIDATE_PROMPT.format(
        question=question,
        sub_questions=json.dumps(sub_questions),
        selected=json.dumps(selected),
        available="\n".join(shown),
    )
    raw = await call_llm(
        prompts.METRIC_KEYS_VALIDATE_SYSTEM,
        prompt,
        model=plan.fast_model,
        tag="metric_keys_validate",
    )
    parsed = _parse_json_block(raw, fallback={})
    missing = parsed.get("missing") if isinstance(parsed, dict) else []
    if not isinstance(missing, list):
        return []
    shown_set = set(shown)
    result: list[str] = []
    for candidate in missing:
        if isinstance(candidate, str) and candidate in shown_set and candidate not in result:
            result.append(candidate)
    return result
async def _gather_limited(coros: list[Awaitable[Any]], limit: int) -> list[Any]:
if not coros:
return []
semaphore = asyncio.Semaphore(max(1, limit))
async def runner(coro: Awaitable[Any]) -> Any:
async with semaphore:
return await coro
return await asyncio.gather(*(runner(coro) for coro in coros))
def _metric_ctx_values(ctx: dict[str, Any]) -> tuple[list[str], str, list[str], list[str], set[str]]:
    """Unpack the metric-selection context dict.

    Returns ``(summary_lines, question, sub_questions, keywords, token_set)``
    with empty defaults when *ctx* lacks usable summary lines. The token set
    combines the pre-tokenized keywords with keywords extracted from the
    question, expanded via ``_token_variants``.
    """
    summary_lines = ctx.get("summary_lines") if isinstance(ctx, dict) else None
    if not isinstance(summary_lines, list):
        return [], "", [], [], set()
    question = ctx.get("question")
    raw_subs = ctx.get("sub_questions")
    sub_questions = raw_subs if isinstance(raw_subs, list) else []
    raw_keywords = ctx.get("keywords")
    keywords = raw_keywords if isinstance(raw_keywords, list) else []
    raw_tokens = ctx.get("keyword_tokens")
    keyword_tokens = raw_tokens if isinstance(raw_tokens, list) else []
    tokens = {str(tok).lower() for tok in keyword_tokens if tok}
    question_text = str(question)
    extracted = _extract_keywords(question_text, question_text, sub_questions=sub_questions, keywords=keywords)
    tokens.update(token.lower() for token in extracted)
    return summary_lines, question_text, sub_questions, keywords, _token_variants(tokens)
def _extract_metric_keys(lines: list[str]) -> list[str]:
keys: list[str] = []
for line in lines:
if ":" not in line:
continue
key = line.split(":", 1)[0].strip()
if not key or " " in key:
continue
if key not in keys:
keys.append(key)
return keys
def _token_variants(tokens: set[str]) -> set[str]:
    """Expand *tokens* with naive singular forms for loose matching.

    For each token longer than ``TOKEN_MIN_LEN``, adds ``ies``->``y``,
    ``es``-stripped, and ``s``-stripped variants alongside the original.
    Returns an empty set for empty input.
    """
    if not tokens:
        return set()
    variants = set(tokens)
    for token in tokens:
        # Only de-pluralize tokens longer than the minimum length; the per-suffix
        # length re-checks in the original were redundant after this guard.
        if len(token) <= TOKEN_MIN_LEN:
            continue
        if token.endswith("ies"):
            variants.add(token[:-3] + "y")
        if token.endswith("es"):
            variants.add(token[:-2])
        if token.endswith("s"):
            variants.add(token[:-1])
    return variants
def _parse_key_list(raw: str, allowed: list[str], max_keys: int) -> list[str]:
    """Extract up to *max_keys* allowed key names from an LLM JSON response.

    Accepts either a bare JSON list or an object with a ``keys`` list;
    anything else yields an empty result. Order and uniqueness are preserved.
    """
    parsed = _parse_json_block(raw, fallback={})
    if isinstance(parsed, list):
        items = parsed
    elif isinstance(parsed, dict):
        items = parsed.get("keys")
    else:
        items = []
    if not isinstance(items, list):
        return []
    permitted = set(allowed)
    picked: list[str] = []
    for entry in items:
        if isinstance(entry, str) and entry in permitted and entry not in picked:
            picked.append(entry)
        if len(picked) >= max_keys:
            break
    return picked
def _chunk_ids_for_keys(chunks: list[dict[str, Any]], keys: list[str]) -> list[str]:
if not keys:
return []
ids: list[str] = []
key_set = {f"{key}:" for key in keys}
for chunk in chunks:
text = str(chunk.get("text") or "")
if not text:
continue
for line in text.splitlines():
for key in key_set:
if line.startswith(key):
cid = chunk.get("id")
if cid and cid not in ids:
ids.append(cid)
break
return ids
def _filter_metric_keys(keys: list[str], tokens: set[str]) -> list[str]:
    """Rank *keys* by how many of their word-parts match *tokens*.

    Keys with no matching parts are dropped; ties are broken alphabetically.
    """
    if not keys or not tokens:
        return []
    usable = {tok.lower() for tok in tokens if tok and len(tok) >= TOKEN_MIN_LEN}
    scored: list[tuple[int, str]] = []
    for key in keys:
        pieces = {piece for piece in re.split(r"[_\W]+", key.lower()) if piece}
        if not pieces:
            continue
        overlap = len(pieces & usable)
        if overlap:
            scored.append((overlap, key))
    # Most matches first, then alphabetical for determinism.
    scored.sort(key=lambda pair: (-pair[0], pair[1]))
    return [key for _, key in scored]
def _metric_key_overlap(keys: list[str], tokens: set[str]) -> bool:
    """Return True when any key shares a word-part with the token set."""
    if not keys or not tokens:
        return False
    usable = {tok.lower() for tok in tokens if tok and len(tok) >= TOKEN_MIN_LEN}
    return any(
        usable.intersection(part for part in re.split(r"[_\W]+", key.lower()) if part)
        for key in keys
    )
def _lines_for_metric_keys(lines: list[str], keys: list[str], max_lines: int = 0) -> list[str]:
if not lines or not keys:
return []
prefixes = {f"{key}:" for key in keys}
selected: list[str] = []
for line in lines:
for prefix in prefixes:
if prefix in line:
selected.append(line)
break
if max_lines and len(selected) >= max_lines:
break
return selected
def _merge_metric_keys(current: list[str], candidates: list[str], max_keys: int) -> list[str]:
merged: list[str] = []
seen = set()
for key in current:
if key and key not in seen:
merged.append(key)
seen.add(key)
for key in candidates:
if key and key not in seen:
merged.append(key)
seen.add(key)
if len(merged) >= max_keys:
break
return merged[:max_keys]
def _merge_fact_lines(primary: list[str], fallback: list[str]) -> list[str]:
seen = set()
merged: list[str] = []
for line in primary + fallback:
if line in seen:
continue
seen.add(line)
merged.append(line)
return merged
def _expand_hottest_line(line: str) -> list[str]:
if not line:
return []
if not line.lower().startswith("hottest:"):
return []
expanded: list[str] = []
payload = line.split("hottest:", 1)[1]
for part in payload.split(";"):
part = part.strip()
if not part or "=" not in part:
continue
metric, rest = part.split("=", 1)
metric = metric.strip()
match = re.search(r"(?P<node>[^\s\[]+).*\((?P<value>[^)]+)\)", rest)
if not match:
continue
node = match.group("node").strip()
value = match.group("value").strip()
class_match = re.search(r"\[(?P<class>[^\]]+)\]", rest)
node_class = class_match.group("class").strip() if class_match else ""
if node_class:
expanded.append(f"hottest_{metric}_node: {node} [{node_class}] ({value})")
else:
expanded.append(f"hottest_{metric}_node: {node} ({value})")
return expanded
def _has_token(text: str, token: str) -> bool:
if not text or not token:
return False
if token == "io":
return "i/o" in text or re.search(r"\bio\b", text) is not None
return re.search(rf"\b{re.escape(token)}\b", text) is not None
def _hotspot_evidence(summary: dict[str, Any]) -> list[str]:
hottest = summary.get("hottest") if isinstance(summary.get("hottest"), dict) else {}
if not hottest:
return []
hardware_by_node = summary.get("hardware_by_node") if isinstance(summary.get("hardware_by_node"), dict) else {}
node_pods_top = summary.get("node_pods_top") if isinstance(summary.get("node_pods_top"), list) else []
ns_map = {}
for item in node_pods_top:
if not isinstance(item, dict):
continue
node = item.get("node")
namespaces_top = item.get("namespaces_top") if isinstance(item.get("namespaces_top"), list) else []
ns_map[node] = namespaces_top
lines: list[str] = []
for metric, info in hottest.items():
if not isinstance(info, dict):
continue
node = info.get("node")
value = info.get("value")
if not node:
continue
node_class = hardware_by_node.get(node)
ns_parts = []
for entry in ns_map.get(node, [])[:3]:
if isinstance(entry, (list, tuple)) and len(entry) >= NS_ENTRY_MIN_LEN:
ns_parts.append(f"{entry[0]}={entry[1]}")
ns_text = ", ".join(ns_parts)
value_text = f"{value:.2f}" if isinstance(value, (int, float)) else str(value)
line = f"hotspot.{metric}: node={node} class={node_class or 'unknown'} value={value_text}"
if ns_text:
line += f" namespaces_top={ns_text}"
lines.append(line)
return lines
__all__ = [name for name in globals() if name.startswith("_") and not name.startswith("__")]