From 4b7b2af687eb61c6f1d7b5a4afa706715c2420ef Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Wed, 4 Feb 2026 14:27:12 -0300 Subject: [PATCH] atlasbot: add metric direct fallback --- atlasbot/engine/answerer.py | 41 +++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/atlasbot/engine/answerer.py b/atlasbot/engine/answerer.py index 751e239..8ee3da4 100644 --- a/atlasbot/engine/answerer.py +++ b/atlasbot/engine/answerer.py @@ -641,6 +641,19 @@ class AnswerEngine: model=plan.model, tag="evidence_fix_enforce", ) + if metric_facts and not _reply_matches_metric_facts(reply, metric_facts, all_tokens): + direct_line = _select_metric_line(summary_lines, normalized, all_tokens) + if direct_line: + direct_prompt = f"Question: {normalized}\nFact: {direct_line}\nAnswer using the fact." + reply = await call_llm( + prompts.ANSWER_SYSTEM, + direct_prompt, + context="", + model=plan.fast_model, + tag="metric_direct", + ) + if not _reply_matches_metric_facts(reply, [direct_line], all_tokens): + reply = _format_direct_metric_line(direct_line) if "raspberry" in lowered_question and "not" in lowered_question: non_rpi = _non_rpi_nodes(summary) @@ -2044,6 +2057,34 @@ def _rank_metric_lines(lines: list[str], tokens: set[str], max_lines: int) -> li return [item[2] for item in ranked[:max_lines]] +def _select_metric_line(lines: list[str], question: str, tokens: list[str] | set[str]) -> str | None: + if not lines or not tokens: + return None + token_set = {str(tok).lower() for tok in tokens if tok} + ranked = _rank_metric_lines(lines, token_set, max_lines=6) + if not ranked: + return None + question_lower = (question or "").lower() + if any(term in question_lower for term in ("how many", "count", "total")): + for line in ranked: + lower = line.lower() + if "total" in lower or "count" in lower: + return line + return ranked[0] + + +def _format_direct_metric_line(line: str) -> str: + if not line: + return "" + if ":" in line: + key, value = line.split(":", 1) + key = key.strip().replace("_", " ") + value = value.strip() + if value: + return f"{key} is {value}." + return line + + def _global_facts(lines: list[str]) -> list[str]: if not lines: return []