atlasbot: tighten scoring and readiness logic

This commit is contained in:
Brad Stein 2026-01-27 22:55:00 -03:00
parent 24b0ac78c4
commit b9b25565a2

View File

@ -1297,7 +1297,7 @@ def snapshot_metric_answer(
parts: list[str] = []
if used is not None and max_conn is not None:
free = max_conn - used
if any(word in q for word in ("free", "available", "remaining")):
if any(word in q for word in ("free", "available", "remaining", "remain", "left")):
parts.append(f"Postgres connections: {used:.0f} used / {max_conn:.0f} max ({free:.0f} free).")
else:
parts.append(f"Postgres connections: {used:.0f} used / {max_conn:.0f} max.")
@ -1387,13 +1387,23 @@ def structured_answer(
only_workers = "worker" in q or "workers" in q
role_filters = _detect_role_filters(q)
only_ready: bool | None = None
if "not ready" in q or "unready" in q or "down" in q or "missing" in q:
if (
"not ready" in q
or "notready" in q
or "not-ready" in q
or "unready" in q
or "down" in q
or "missing" in q
):
only_ready = False
elif "ready" in q:
only_ready = True
if entity == "node" and only_ready is not None and op != "count":
op = "status"
if entity == "node" and only_ready is not None and op == "count":
if not any(term in q for term in ("how many", "count", "number")):
op = "status"
if not op and entity == "node":
op = "list" if (include_hw or exclude_hw or nodes_in_query) else "count"
@ -2692,22 +2702,67 @@ def _candidate_note(candidate: dict[str, Any]) -> str:
def _ensure_scores(answer: str) -> str:
text = answer.strip()
lines = [line.strip() for line in text.splitlines() if line.strip()]
score_map: dict[str, str] = {}
body_lines: list[str] = []
def _score_key(line: str) -> str:
cleaned = line.strip().lstrip("-•* ").strip()
return cleaned.lower()
has_relevance = any(_score_key(line).startswith("relevance") for line in lines)
has_satisfaction = any(_score_key(line).startswith("satisfaction") for line in lines)
has_confidence = any(_score_key(line).startswith("confidence") for line in lines)
has_risk = any(_score_key(line).startswith("hallucinationrisk") for line in lines)
if not has_confidence:
lines.append("Confidence: medium")
if not has_relevance:
lines.append("Relevance: 70")
if not has_satisfaction:
lines.append("Satisfaction: 70")
if not has_risk:
lines.append("HallucinationRisk: low")
return "\n".join(lines)
def _extract_value(line: str) -> str:
cleaned = line.strip().lstrip("-•* ").strip()
if ":" in cleaned:
return cleaned.split(":", 1)[1].strip()
parts = cleaned.split()
return parts[1] if len(parts) > 1 else ""
def _record_score(key: str, value: str):
if not value:
return
score_map.setdefault(key, value)
for line in lines:
cleaned = line.strip().lstrip("-•* ").strip()
lowered = cleaned.lower()
if lowered.startswith("confidence,") or (
"confidence" in lowered and "relevance" in lowered and "satisfaction" in lowered
):
for key in ("confidence", "relevance", "satisfaction"):
match = re.search(rf"{key}\\s*[:=]?\\s*(\\d{{1,3}}|high|medium|low)", lowered)
if match:
_record_score(key, match.group(1))
risk_match = re.search(r"hallucination\\s*risk\\s*[:=]?\\s*(low|medium|high)", lowered)
if risk_match:
_record_score("hallucinationrisk", risk_match.group(1))
continue
if lowered.startswith("confidence"):
_record_score("confidence", _extract_value(cleaned))
continue
if lowered.startswith("relevance"):
_record_score("relevance", _extract_value(cleaned))
continue
if lowered.startswith("satisfaction"):
_record_score("satisfaction", _extract_value(cleaned))
continue
if lowered.replace(" ", "").startswith("hallucinationrisk") or lowered.startswith(
"hallucination risk"
):
_record_score("hallucinationrisk", _extract_value(cleaned))
continue
body_lines.append(line)
confidence = score_map.get("confidence") or "medium"
relevance = score_map.get("relevance") or "70"
satisfaction = score_map.get("satisfaction") or "70"
risk = score_map.get("hallucinationrisk") or "low"
final_lines = body_lines + [
f"Confidence: {confidence}",
f"Relevance: {relevance}",
f"Satisfaction: {satisfaction}",
f"HallucinationRisk: {risk}",
]
return "\n".join(final_lines)
def _open_ended_plan(
@ -2799,7 +2854,8 @@ def _open_ended_candidate(
f"{focus}. "
"Write 2-4 sentences in plain prose (not a list). "
"If you infer, label it as inference. "
"Return JSON: {\"answer\":\"...\",\"confidence\":\"high|medium|low\","
"List which fact pack IDs you used. "
"Return JSON: {\"answer\":\"...\",\"facts_used\":[\"F1\"],\"confidence\":\"high|medium|low\","
"\"relevance\":0-100,\"satisfaction\":0-100,\"risk\":\"low|medium|high\"}."
)
context = _append_history_context(fact_pack, history_lines)
@ -2809,9 +2865,13 @@ def _open_ended_candidate(
answer = str(result.get("answer") or "").strip()
if not answer:
answer = "I don't have enough data to answer that from the current snapshot."
facts_used = result.get("facts_used")
if not isinstance(facts_used, list):
facts_used = []
candidate = {
"focus": focus,
"answer": answer,
"facts_used": facts_used,
"confidence": result.get("confidence", "medium"),
"relevance": _normalize_score(result.get("relevance"), default=60),
"satisfaction": _normalize_score(result.get("satisfaction"), default=60),
@ -2826,6 +2886,8 @@ def _candidate_score(candidate: dict[str, Any]) -> float:
satisfaction = _normalize_score(candidate.get("satisfaction"), default=60)
confidence = _confidence_score(candidate.get("confidence"))
score = relevance * 0.45 + satisfaction * 0.35 + confidence * 0.2
if not candidate.get("facts_used"):
score -= 5
return score - _risk_penalty(candidate.get("risk"))
@ -2863,6 +2925,9 @@ def _open_ended_synthesize(
"Select the best 1-2 candidates, blend them if helpful, and keep 2-4 sentences. "
"Use only the fact pack as evidence. "
"If you infer, label it as inference. "
"Do not claim nodes are missing or not ready unless the fact pack explicitly lists "
"nodes_not_ready or expected_workers_missing. "
"Keep the tone conversational and answer the user's intent directly. "
"Avoid repeating the last response if possible. "
"End with lines: Confidence, Relevance (0-100), Satisfaction (0-100), "
"HallucinationRisk (low|medium|high).\n"