277 lines
9.0 KiB
Python
277 lines
9.0 KiB
Python
from __future__ import annotations
|
|
|
|
import difflib
|
|
import re
|
|
import time
|
|
from typing import Any
|
|
|
|
from ._base import *
|
|
|
|
|
|
def _reply_matches_metric_facts(reply: str, metric_facts: list[str], tokens: list[str] | set[str] | None = None) -> bool:
|
|
if not reply or not metric_facts:
|
|
return True
|
|
reply_numbers = set(re.findall(r"\d+(?:\\.\d+)?", reply))
|
|
if not reply_numbers:
|
|
return False
|
|
fact_numbers: set[str] = set()
|
|
value_pattern = re.compile(r"(?:>=|<=|=|:)\s*(\d+(?:\.\d+)?)")
|
|
filtered = metric_facts
|
|
if tokens:
|
|
token_set = {str(tok).lower() for tok in tokens if tok}
|
|
focused = []
|
|
for line in metric_facts:
|
|
key = line.split(":", 1)[0].lower()
|
|
if any(tok in key for tok in token_set):
|
|
focused.append(line)
|
|
if focused:
|
|
filtered = focused
|
|
for line in filtered:
|
|
for match in value_pattern.findall(line):
|
|
fact_numbers.add(match)
|
|
if not fact_numbers:
|
|
return False
|
|
return bool(reply_numbers & fact_numbers)
|
|
|
|
|
|
def _needs_dedup(reply: str) -> bool:
    """Return True when *reply* repeats a sentence verbatim (comparison is
    case-insensitive and whitespace-normalized).

    Replies shorter than ``DEDUP_MIN_SENTENCES`` sentences are never flagged.
    """
    if not reply:
        return False
    chunks = (piece.strip() for piece in re.split(r"(?<=[.!?])\s+", reply))
    sentences = [piece for piece in chunks if piece]
    if len(sentences) < DEDUP_MIN_SENTENCES:
        return False
    normalized = [re.sub(r"\s+", " ", sentence.lower()) for sentence in sentences]
    # A duplicate exists iff normalization collapses the list.
    return len(set(normalized)) != len(normalized)
|
|
|
|
|
|
def _needs_focus_fix(question: str, reply: str, classify: dict[str, Any]) -> bool:
|
|
if not reply:
|
|
return False
|
|
q_lower = (question or "").lower()
|
|
if classify.get("question_type") not in {"metric", "diagnostic"} and not re.search(r"\b(how many|list|count)\b", q_lower):
|
|
return False
|
|
missing_markers = (
|
|
"does not provide",
|
|
"does not specify",
|
|
"not available",
|
|
"not provided",
|
|
"cannot determine",
|
|
"don't have",
|
|
"do not have",
|
|
"insufficient",
|
|
"no data",
|
|
)
|
|
if any(marker in reply.lower() for marker in missing_markers):
|
|
return True
|
|
if reply.count(".") <= 1:
|
|
return False
|
|
extra_markers = ("for more", "if you need", "additional", "based on")
|
|
return any(marker in reply.lower() for marker in extra_markers)
|
|
|
|
|
|
def _extract_keywords(raw_question: str, normalized: str, sub_questions: list[str], keywords: list[Any] | None) -> list[str]:
    """Collect up to 12 unique lowercase keyword tokens from the question
    texts plus any explicitly supplied *keywords*.

    Tokens shorter than ``TOKEN_MIN_LEN`` characters and common stopwords are
    dropped; first-seen order is preserved.
    """
    stopwords = {
        "the",
        "and",
        "for",
        "with",
        "that",
        "this",
        "what",
        "which",
        "when",
        "where",
        "who",
        "why",
        "how",
        "tell",
        "show",
        "list",
        "give",
        "about",
        "right",
        "now",
    }
    tokens: list[str] = []
    for text in (raw_question, normalized, *sub_questions):
        for token in re.split(r"[^a-zA-Z0-9_-]+", text.lower()):
            if len(token) >= TOKEN_MIN_LEN and token not in stopwords:
                tokens.append(token)
    for kw in keywords or []:
        if not isinstance(kw, str):
            continue
        cleaned = kw.strip().lower()
        if cleaned and cleaned not in stopwords and cleaned not in tokens:
            tokens.append(cleaned)
    # dict.fromkeys dedupes while keeping first-seen order.
    return list(dict.fromkeys(tokens))[:12]
|
|
|
|
|
|
def _allowed_nodes(summary: dict[str, Any]) -> list[str]:
|
|
hardware = summary.get("hardware_by_node") if isinstance(summary.get("hardware_by_node"), dict) else {}
|
|
if hardware:
|
|
return sorted([node for node in hardware if isinstance(node, str)])
|
|
return []
|
|
|
|
|
|
def _allowed_namespaces(summary: dict[str, Any]) -> list[str]:
|
|
namespaces: list[str] = []
|
|
for entry in summary.get("namespace_pods") or []:
|
|
if isinstance(entry, dict):
|
|
name = entry.get("namespace")
|
|
if name:
|
|
namespaces.append(str(name))
|
|
return sorted(set(namespaces))
|
|
|
|
|
|
def _find_unknown_nodes(reply: str, allowed: list[str]) -> list[str]:
|
|
if not reply or not allowed:
|
|
return []
|
|
pattern = re.compile(r"\b(titan-[0-9a-z]+|node-?\d+)\b", re.IGNORECASE)
|
|
found = {m.group(1) for m in pattern.finditer(reply)}
|
|
if not found:
|
|
return []
|
|
allowed_set = {a.lower() for a in allowed}
|
|
return sorted({item for item in found if item.lower() not in allowed_set})
|
|
|
|
|
|
def _find_unknown_namespaces(reply: str, allowed: list[str]) -> list[str]:
|
|
if not reply or not allowed:
|
|
return []
|
|
pattern = re.compile(r"\bnamespace\s+([a-z0-9-]+)\b", re.IGNORECASE)
|
|
found = {m.group(1) for m in pattern.finditer(reply)}
|
|
if not found:
|
|
return []
|
|
allowed_set = {a.lower() for a in allowed}
|
|
return sorted({item for item in found if item.lower() not in allowed_set})
|
|
|
|
|
|
def _needs_runbook_fix(reply: str, allowed: list[str]) -> bool:
|
|
if not reply or not allowed:
|
|
return False
|
|
paths = set(re.findall(r"runbooks/[A-Za-z0-9._-]+", reply))
|
|
if not paths:
|
|
return False
|
|
allowed_set = {p.lower() for p in allowed}
|
|
return any(path.lower() not in allowed_set for path in paths)
|
|
|
|
|
|
def _needs_runbook_reference(question: str, allowed: list[str], reply: str) -> bool:
|
|
if not allowed or not question:
|
|
return False
|
|
lowered = question.lower()
|
|
cues = ("runbook", "checklist", "documented", "documentation", "where", "guide")
|
|
if not any(cue in lowered for cue in cues):
|
|
return False
|
|
if not reply:
|
|
return True
|
|
for token in re.findall(r"runbooks/[A-Za-z0-9._-]+", reply):
|
|
if token.lower() in {p.lower() for p in allowed}:
|
|
return False
|
|
return True
|
|
|
|
|
|
def _best_runbook_match(candidate: str, allowed: list[str]) -> str | None:
    """Fuzzy-match *candidate* against *allowed* runbook paths.

    Returns the first path with the highest ``SequenceMatcher`` ratio, or
    None when the top score falls below ``RUNBOOK_SIMILARITY_THRESHOLD``.
    """
    if not candidate or not allowed:
        return None
    needle = candidate.lower()
    best_path: str | None = None
    top_score = 0.0
    for path in allowed:
        ratio = difflib.SequenceMatcher(a=needle, b=path.lower()).ratio()
        # Strict ">" keeps the earliest path on ties, matching first-wins order.
        if ratio > top_score:
            top_score = ratio
            best_path = path
    if top_score >= RUNBOOK_SIMILARITY_THRESHOLD:
        return best_path
    return None
|
|
|
|
|
|
def _resolve_path(data: Any, path: str) -> Any | None:
|
|
if path.startswith("line:"):
|
|
return path.split("line:", 1)[1].strip()
|
|
cursor = data
|
|
for part in re.split(r"\.(?![^\[]*\])", path):
|
|
if not part:
|
|
continue
|
|
match = re.match(r"^(\w+)(?:\[(\d+)\])?$", part)
|
|
if not match:
|
|
return None
|
|
key = match.group(1)
|
|
index = match.group(2)
|
|
if isinstance(cursor, dict):
|
|
cursor = cursor.get(key)
|
|
else:
|
|
return None
|
|
if index is not None:
|
|
idx = int(index)
|
|
if isinstance(cursor, list) and 0 <= idx < len(cursor):
|
|
cursor = cursor[idx]
|
|
else:
|
|
return None
|
|
return cursor
|
|
|
|
|
|
def _snapshot_id(summary: dict[str, Any]) -> str | None:
|
|
if not summary:
|
|
return None
|
|
for key in ("generated_at", "snapshot_ts", "snapshot_id"):
|
|
value = summary.get(key)
|
|
if isinstance(value, str) and value:
|
|
return value
|
|
return None
|
|
|
|
|
|
def _claims_to_payload(claims: list[ClaimItem]) -> list[dict[str, Any]]:
|
|
output: list[dict[str, Any]] = []
|
|
for claim in claims:
|
|
evidence = []
|
|
for ev in claim.evidence:
|
|
evidence.append(
|
|
{
|
|
"path": ev.path,
|
|
"reason": ev.reason,
|
|
"value_at_claim": ev.value_at_claim,
|
|
}
|
|
)
|
|
output.append({"id": claim.id, "claim": claim.claim, "evidence": evidence})
|
|
return output
|
|
|
|
|
|
def _state_from_payload(payload: dict[str, Any] | None) -> ConversationState | None:
    """Rebuild a ConversationState from a serialized *payload* dict.

    Malformed entries are skipped defensively: claims need both a non-empty
    ``id`` and ``claim`` text, and are kept only when they carry at least one
    evidence item with a non-empty ``path``.  Returns None for a falsy payload.
    """
    if not payload:
        return None
    raw_claims = payload.get("claims") if isinstance(payload, dict) else None
    parsed_claims: list[ClaimItem] = []
    for raw in raw_claims if isinstance(raw_claims, list) else []:
        if not isinstance(raw, dict):
            continue
        text = str(raw.get("claim") or "").strip()
        ident = str(raw.get("id") or "").strip()
        if not (text and ident):
            continue
        evidence_items: list[EvidenceItem] = []
        for raw_ev in raw.get("evidence") or []:
            if not isinstance(raw_ev, dict):
                continue
            ev_path = str(raw_ev.get("path") or "").strip()
            if not ev_path:
                continue
            evidence_items.append(
                EvidenceItem(
                    path=ev_path,
                    reason=str(raw_ev.get("reason") or "").strip(),
                    value_at_claim=raw_ev.get("value_at_claim"),
                )
            )
        # Claims without usable evidence are dropped entirely.
        if evidence_items:
            parsed_claims.append(ClaimItem(id=ident, claim=text, evidence=evidence_items))
    return ConversationState(
        updated_at=float(payload.get("updated_at") or time.monotonic()),
        claims=parsed_claims,
        snapshot_id=payload.get("snapshot_id"),
        snapshot=payload.get("snapshot"),
    )
|
|
|
|
|
|
# Export every single-leading-underscore name so sibling modules can pick up
# these private helpers via a star import; dunder names (__name__, __doc__,
# ...) are excluded.  NOTE(review): this scans all of globals() at import
# time, so any other "_"-prefixed name bound above (e.g. via the star import
# from ._base) would also be re-exported — confirm that is intended.
__all__ = [name for name in globals() if name.startswith("_") and not name.startswith("__")]
|