atlasbot: simplify metric formatting and kb chunking
This commit is contained in:
parent
4aec30c5c9
commit
109ae4e645
@ -2094,44 +2094,65 @@ def _format_direct_metric_line(line: str) -> str:
|
||||
if not line:
|
||||
return ""
|
||||
if ":" in line:
|
||||
key, value = line.split(":", 1)
|
||||
formatted = _format_colon_metric(line)
|
||||
if formatted:
|
||||
return formatted
|
||||
if "=" in line:
|
||||
formatted = _format_equals_metric(line)
|
||||
if formatted:
|
||||
return formatted
|
||||
return line
|
||||
|
||||
|
||||
def _format_colon_metric(line: str) -> str | None:
|
||||
key, value = line.split(":", 1)
|
||||
key = key.strip().replace("_", " ")
|
||||
value = value.strip()
|
||||
if not value:
|
||||
return None
|
||||
if key == "nodes":
|
||||
formatted = _format_nodes_value(value)
|
||||
if formatted:
|
||||
return formatted
|
||||
if key in {"nodes total", "nodes_total"}:
|
||||
return f"Atlas has {value} total nodes."
|
||||
return f"{key} is {value}."
|
||||
|
||||
|
||||
def _format_equals_metric(line: str) -> str | None:
|
||||
pairs: list[str] = []
|
||||
for part in line.split(","):
|
||||
if "=" not in part:
|
||||
continue
|
||||
key, value = part.split("=", 1)
|
||||
key = key.strip().replace("_", " ")
|
||||
value = value.strip()
|
||||
if value:
|
||||
if key == "nodes":
|
||||
parts = [p.strip() for p in value.split(",") if p.strip()]
|
||||
total = None
|
||||
rest: list[str] = []
|
||||
for part in parts:
|
||||
if part.startswith("total="):
|
||||
total = part.split("=", 1)[1]
|
||||
else:
|
||||
rest.append(part.replace("_", " "))
|
||||
if total:
|
||||
if rest:
|
||||
return f"Atlas has {total} total nodes ({'; '.join(rest)})."
|
||||
return f"Atlas has {total} total nodes."
|
||||
if key in {"nodes total", "nodes_total"}:
|
||||
return f"Atlas has {value} total nodes."
|
||||
return f"{key} is {value}."
|
||||
if "=" in line:
|
||||
pairs: list[str] = []
|
||||
for part in line.split(","):
|
||||
if "=" not in part:
|
||||
continue
|
||||
k, v = part.split("=", 1)
|
||||
k = k.strip().replace("_", " ")
|
||||
v = v.strip()
|
||||
if not v:
|
||||
continue
|
||||
if k in {"nodes total", "nodes_total"}:
|
||||
return f"Atlas has {v} total nodes."
|
||||
pairs.append(f"{k} is {v}")
|
||||
if pairs:
|
||||
if len(pairs) == 1:
|
||||
return f"{pairs[0]}."
|
||||
return "; ".join(pairs) + "."
|
||||
return line
|
||||
if not value:
|
||||
continue
|
||||
if key in {"nodes total", "nodes_total"}:
|
||||
return f"Atlas has {value} total nodes."
|
||||
pairs.append(f"{key} is {value}")
|
||||
if not pairs:
|
||||
return None
|
||||
if len(pairs) == 1:
|
||||
return f"{pairs[0]}."
|
||||
return "; ".join(pairs) + "."
|
||||
|
||||
|
||||
def _format_nodes_value(value: str) -> str | None:
|
||||
parts = [p.strip() for p in value.split(",") if p.strip()]
|
||||
total = None
|
||||
rest: list[str] = []
|
||||
for part in parts:
|
||||
if part.startswith("total="):
|
||||
total = part.split("=", 1)[1]
|
||||
else:
|
||||
rest.append(part.replace("_", " "))
|
||||
if not total:
|
||||
return None
|
||||
if rest:
|
||||
return f"Atlas has {total} total nodes ({'; '.join(rest)})."
|
||||
return f"Atlas has {total} total nodes."
|
||||
|
||||
|
||||
def _global_facts(lines: list[str]) -> list[str]:
|
||||
|
||||
@ -78,35 +78,52 @@ class KnowledgeBase:
|
||||
|
||||
def chunk_lines(self, *, max_files: int = 20, max_chars: int = 6000) -> list[str]:
|
||||
self.load()
|
||||
lines: list[str] = []
|
||||
if not self._base:
|
||||
return []
|
||||
lines: list[str] = []
|
||||
self._append_summary(lines)
|
||||
self._append_catalog(lines, max_chars)
|
||||
if not self._within_limit(lines, max_chars):
|
||||
return lines
|
||||
self._append_runbooks(lines)
|
||||
if not self._within_limit(lines, max_chars):
|
||||
return lines
|
||||
self._append_files(lines, max_files=max_files, max_chars=max_chars)
|
||||
return lines
|
||||
|
||||
def _append_summary(self, lines: list[str]) -> None:
|
||||
summary = self.summary()
|
||||
if summary:
|
||||
lines.append(f"KB Summary: {summary}")
|
||||
# Prefer curated catalog JSON if present.
|
||||
if self._atlas:
|
||||
try:
|
||||
atlas_json = json.dumps(self._atlas, indent=2)
|
||||
lines.append("KB: atlas.json")
|
||||
lines.extend(atlas_json.splitlines())
|
||||
except Exception:
|
||||
pass
|
||||
if self._runbooks:
|
||||
lines.append("KB: runbooks.json")
|
||||
for entry in self._runbooks:
|
||||
if not isinstance(entry, dict):
|
||||
continue
|
||||
title = entry.get("title")
|
||||
path = entry.get("path")
|
||||
if title and path:
|
||||
lines.append(f"- {title} ({path})")
|
||||
# Include markdown/text sources as additional chunks.
|
||||
if len(lines) >= max_chars:
|
||||
return lines
|
||||
|
||||
def _append_catalog(self, lines: list[str], max_chars: int) -> None:
|
||||
if not self._atlas:
|
||||
return
|
||||
if not self._within_limit(lines, max_chars):
|
||||
return
|
||||
try:
|
||||
atlas_json = json.dumps(self._atlas, indent=2)
|
||||
except Exception:
|
||||
return
|
||||
lines.append("KB: atlas.json")
|
||||
lines.extend(atlas_json.splitlines())
|
||||
|
||||
def _append_runbooks(self, lines: list[str]) -> None:
|
||||
if not self._runbooks:
|
||||
return
|
||||
lines.append("KB: runbooks.json")
|
||||
for entry in self._runbooks:
|
||||
if not isinstance(entry, dict):
|
||||
continue
|
||||
title = entry.get("title")
|
||||
path = entry.get("path")
|
||||
if title and path:
|
||||
lines.append(f"- {title} ({path})")
|
||||
|
||||
def _append_files(self, lines: list[str], *, max_files: int, max_chars: int) -> None:
|
||||
files = sorted(self._base.rglob("*.md")) + sorted(self._base.rglob("*.txt"))
|
||||
for path in files:
|
||||
if len(lines) >= max_chars:
|
||||
if not self._within_limit(lines, max_chars):
|
||||
break
|
||||
if len(lines) > max_files * 50:
|
||||
break
|
||||
@ -118,6 +135,7 @@ class KnowledgeBase:
|
||||
continue
|
||||
lines.append(f"KB File: {path.relative_to(self._base)}")
|
||||
lines.extend(text.splitlines())
|
||||
if sum(len(line) for line in lines) >= max_chars:
|
||||
break
|
||||
return lines
|
||||
|
||||
@staticmethod
|
||||
def _within_limit(lines: list[str], max_chars: int) -> bool:
|
||||
return sum(len(line) for line in lines) < max_chars
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user