atlasbot: add KB + read-only tools

Brad Stein 2026-01-06 14:46:36 -03:00
parent 7283a740e6
commit 6728b4f4ae
10 changed files with 5654 additions and 21 deletions

View File: knowledge/catalog/atlas-summary.json

@@ -0,0 +1,8 @@
{
"counts": {
"helmrelease_host_hints": 7,
"http_endpoints": 32,
"services": 42,
"workloads": 47
}
}

knowledge/catalog/atlas.json (new file, 2656 lines)

File diff suppressed because it is too large

knowledge/catalog/atlas.yaml (new file, 1726 lines)

File diff suppressed because it is too large

View File: knowledge/catalog/runbooks.json

@@ -0,0 +1,73 @@
[
{
"path": "runbooks/ci-gitea-jenkins.md",
"title": "CI: Gitea \u2192 Jenkins pipeline",
"tags": [
"atlas",
"ci",
"gitea",
"jenkins"
],
"entrypoints": [
"scm.bstein.dev",
"ci.bstein.dev"
],
"source_paths": [
"services/gitea",
"services/jenkins",
"scripts/jenkins_cred_sync.sh",
"scripts/gitea_cred_sync.sh"
],
"body": "# CI: Gitea \u2192 Jenkins pipeline\n\n## What this is\nAtlas uses Gitea for source control and Jenkins for CI. Authentication is via Keycloak (SSO).\n\n## Where it is configured\n- Gitea manifests: `services/gitea/`\n- Jenkins manifests: `services/jenkins/`\n- Credential sync helpers: `scripts/gitea_cred_sync.sh`, `scripts/jenkins_cred_sync.sh`\n\n## What users do (typical flow)\n- Create a repo in Gitea.\n- Create/update a Jenkins job/pipeline that can fetch the repo.\n- Configure a webhook (or SCM polling) so pushes trigger builds.\n\n## Troubleshooting (common)\n- \u201cWebhook not firing\u201d: confirm ingress host, webhook URL, and Jenkins job is reachable.\n- \u201cAuth denied cloning\u201d: confirm Keycloak group membership and that Jenkins has a valid token/credential configured."
},
{
"path": "runbooks/kb-authoring.md",
"title": "KB authoring: what to write (and what not to)",
"tags": [
"atlas",
"kb",
"runbooks"
],
"entrypoints": [],
"source_paths": [
"knowledge/runbooks",
"scripts/knowledge_render_atlas.py"
],
"body": "# KB authoring: what to write (and what not to)\n\n## The goal\nGive Atlas assistants enough grounded, Atlas-specific context to answer \u201chow do I\u2026?\u201d questions without guessing.\n\n## What to capture (high value)\n- User workflows: \u201cclick here, set X, expected result\u201d\n- Operator workflows: \u201cedit these files, reconcile this kustomization, verify with these commands\u201d\n- Wiring: \u201cthis host routes to this service; this service depends on Postgres/Vault/etc\u201d\n- Failure modes: exact error messages + the 2\u20135 checks that usually resolve them\n- Permissions: Keycloak groups/roles and what they unlock\n\n## What to avoid (low value / fluff)\n- Generic Kubernetes explanations (link to upstream docs instead)\n- Copy-pasting large manifests (prefer file paths + small snippets)\n- Anything that will drift quickly (render it from GitOps instead)\n- Any secret values (reference Secret/Vault locations by name only)\n\n## Document pattern (recommended)\nEach runbook should answer:\n- \u201cWhat is this?\u201d\n- \u201cWhat do users do?\u201d\n- \u201cWhat do operators change (where in Git)?\u201d\n- \u201cHow do we verify it works?\u201d\n- \u201cWhat breaks and how to debug it?\u201d"
},
{
"path": "runbooks/observability.md",
"title": "Observability: Grafana + VictoriaMetrics (how to query safely)",
"tags": [
"atlas",
"monitoring",
"grafana",
"victoriametrics"
],
"entrypoints": [
"metrics.bstein.dev",
"alerts.bstein.dev"
],
"source_paths": [
"services/monitoring"
],
"body": "# Observability: Grafana + VictoriaMetrics (how to query safely)\n\n## Where it is configured\n- `services/monitoring/helmrelease.yaml` (Grafana + Alertmanager + VM values)\n- `services/monitoring/grafana-dashboard-*.yaml` (dashboards and their PromQL)\n\n## Using metrics as a \u201ctool\u201d for Atlas assistants\nThe safest pattern is: map a small set of intents \u2192 fixed PromQL queries, then summarize results.\n\nExamples (intents)\n- \u201cIs the cluster healthy?\u201d \u2192 node readiness + pod restart rate\n- \u201cWhy is Element Call failing?\u201d \u2192 LiveKit/coturn pod restarts + synapse errors + ingress 5xx\n- \u201cIs Jenkins slow?\u201d \u2192 pod CPU/memory + HTTP latency metrics (if exported)\n\n## Why dashboards are not the KB\nDashboards are great references, but the assistant should query VictoriaMetrics directly for live answers and keep the\nKB focused on wiring, runbooks, and stable conventions."
},
{
"path": "runbooks/template.md",
"title": "<short title>",
"tags": [
"atlas",
"<service>",
"<topic>"
],
"entrypoints": [
"<hostnames if relevant>"
],
"source_paths": [
"services/<svc>",
"clusters/atlas/<...>"
],
"body": "# <Short title>\n\n## What this is\n\n## For users (how to)\n\n## For operators (where configured)\n\n## Troubleshooting (symptoms \u2192 checks)"
}
]

View File: knowledge/diagrams/atlas-http.mmd

@@ -0,0 +1,176 @@
flowchart LR
host_auth_bstein_dev["auth.bstein.dev"]
svc_sso_oauth2_proxy["sso/oauth2-proxy (Service)"]
host_auth_bstein_dev --> svc_sso_oauth2_proxy
wl_sso_oauth2_proxy["sso/oauth2-proxy (Deployment)"]
svc_sso_oauth2_proxy --> wl_sso_oauth2_proxy
host_bstein_dev["bstein.dev"]
svc_bstein_dev_home_bstein_dev_home_frontend["bstein-dev-home/bstein-dev-home-frontend (Service)"]
host_bstein_dev --> svc_bstein_dev_home_bstein_dev_home_frontend
wl_bstein_dev_home_bstein_dev_home_frontend["bstein-dev-home/bstein-dev-home-frontend (Deployment)"]
svc_bstein_dev_home_bstein_dev_home_frontend --> wl_bstein_dev_home_bstein_dev_home_frontend
svc_comms_othrys_synapse_matrix_synapse["comms/othrys-synapse-matrix-synapse (Service)"]
host_bstein_dev --> svc_comms_othrys_synapse_matrix_synapse
wl_comms_othrys_synapse_matrix_synapse["comms/othrys-synapse-matrix-synapse (Deployment)"]
svc_comms_othrys_synapse_matrix_synapse --> wl_comms_othrys_synapse_matrix_synapse
svc_bstein_dev_home_bstein_dev_home_backend["bstein-dev-home/bstein-dev-home-backend (Service)"]
host_bstein_dev --> svc_bstein_dev_home_bstein_dev_home_backend
wl_bstein_dev_home_bstein_dev_home_backend["bstein-dev-home/bstein-dev-home-backend (Deployment)"]
svc_bstein_dev_home_bstein_dev_home_backend --> wl_bstein_dev_home_bstein_dev_home_backend
host_call_live_bstein_dev["call.live.bstein.dev"]
svc_comms_element_call["comms/element-call (Service)"]
host_call_live_bstein_dev --> svc_comms_element_call
wl_comms_element_call["comms/element-call (Deployment)"]
svc_comms_element_call --> wl_comms_element_call
host_chat_ai_bstein_dev["chat.ai.bstein.dev"]
svc_bstein_dev_home_chat_ai_gateway["bstein-dev-home/chat-ai-gateway (Service)"]
host_chat_ai_bstein_dev --> svc_bstein_dev_home_chat_ai_gateway
wl_bstein_dev_home_chat_ai_gateway["bstein-dev-home/chat-ai-gateway (Deployment)"]
svc_bstein_dev_home_chat_ai_gateway --> wl_bstein_dev_home_chat_ai_gateway
host_ci_bstein_dev["ci.bstein.dev"]
svc_jenkins_jenkins["jenkins/jenkins (Service)"]
host_ci_bstein_dev --> svc_jenkins_jenkins
wl_jenkins_jenkins["jenkins/jenkins (Deployment)"]
svc_jenkins_jenkins --> wl_jenkins_jenkins
host_cloud_bstein_dev["cloud.bstein.dev"]
svc_nextcloud_nextcloud["nextcloud/nextcloud (Service)"]
host_cloud_bstein_dev --> svc_nextcloud_nextcloud
wl_nextcloud_nextcloud["nextcloud/nextcloud (Deployment)"]
svc_nextcloud_nextcloud --> wl_nextcloud_nextcloud
host_kit_live_bstein_dev["kit.live.bstein.dev"]
svc_comms_livekit_token_service["comms/livekit-token-service (Service)"]
host_kit_live_bstein_dev --> svc_comms_livekit_token_service
wl_comms_livekit_token_service["comms/livekit-token-service (Deployment)"]
svc_comms_livekit_token_service --> wl_comms_livekit_token_service
svc_comms_livekit["comms/livekit (Service)"]
host_kit_live_bstein_dev --> svc_comms_livekit
wl_comms_livekit["comms/livekit (Deployment)"]
svc_comms_livekit --> wl_comms_livekit
host_live_bstein_dev["live.bstein.dev"]
svc_comms_othrys_element_element_web["comms/othrys-element-element-web (Service)"]
host_live_bstein_dev --> svc_comms_othrys_element_element_web
wl_comms_othrys_element_element_web["comms/othrys-element-element-web (Deployment)"]
svc_comms_othrys_element_element_web --> wl_comms_othrys_element_element_web
host_live_bstein_dev --> svc_comms_othrys_synapse_matrix_synapse
svc_comms_matrix_wellknown["comms/matrix-wellknown (Service)"]
host_live_bstein_dev --> svc_comms_matrix_wellknown
wl_comms_matrix_wellknown["comms/matrix-wellknown (Deployment)"]
svc_comms_matrix_wellknown --> wl_comms_matrix_wellknown
host_longhorn_bstein_dev["longhorn.bstein.dev"]
svc_longhorn_system_oauth2_proxy_longhorn["longhorn-system/oauth2-proxy-longhorn (Service)"]
host_longhorn_bstein_dev --> svc_longhorn_system_oauth2_proxy_longhorn
wl_longhorn_system_oauth2_proxy_longhorn["longhorn-system/oauth2-proxy-longhorn (Deployment)"]
svc_longhorn_system_oauth2_proxy_longhorn --> wl_longhorn_system_oauth2_proxy_longhorn
host_mail_bstein_dev["mail.bstein.dev"]
svc_mailu_mailserver_mailu_front["mailu-mailserver/mailu-front (Service)"]
host_mail_bstein_dev --> svc_mailu_mailserver_mailu_front
host_matrix_live_bstein_dev["matrix.live.bstein.dev"]
svc_comms_matrix_authentication_service["comms/matrix-authentication-service (Service)"]
host_matrix_live_bstein_dev --> svc_comms_matrix_authentication_service
wl_comms_matrix_authentication_service["comms/matrix-authentication-service (Deployment)"]
svc_comms_matrix_authentication_service --> wl_comms_matrix_authentication_service
host_matrix_live_bstein_dev --> svc_comms_matrix_wellknown
host_matrix_live_bstein_dev --> svc_comms_othrys_synapse_matrix_synapse
host_monero_bstein_dev["monero.bstein.dev"]
svc_crypto_monerod["crypto/monerod (Service)"]
host_monero_bstein_dev --> svc_crypto_monerod
wl_crypto_monerod["crypto/monerod (Deployment)"]
svc_crypto_monerod --> wl_crypto_monerod
host_pegasus_bstein_dev["pegasus.bstein.dev"]
svc_jellyfin_pegasus["jellyfin/pegasus (Service)"]
host_pegasus_bstein_dev --> svc_jellyfin_pegasus
wl_jellyfin_pegasus["jellyfin/pegasus (Deployment)"]
svc_jellyfin_pegasus --> wl_jellyfin_pegasus
host_scm_bstein_dev["scm.bstein.dev"]
svc_gitea_gitea["gitea/gitea (Service)"]
host_scm_bstein_dev --> svc_gitea_gitea
wl_gitea_gitea["gitea/gitea (Deployment)"]
svc_gitea_gitea --> wl_gitea_gitea
host_secret_bstein_dev["secret.bstein.dev"]
svc_vault_vault["vault/vault (Service)"]
host_secret_bstein_dev --> svc_vault_vault
wl_vault_vault["vault/vault (StatefulSet)"]
svc_vault_vault --> wl_vault_vault
host_sso_bstein_dev["sso.bstein.dev"]
svc_sso_keycloak["sso/keycloak (Service)"]
host_sso_bstein_dev --> svc_sso_keycloak
wl_sso_keycloak["sso/keycloak (Deployment)"]
svc_sso_keycloak --> wl_sso_keycloak
host_stream_bstein_dev["stream.bstein.dev"]
svc_jellyfin_jellyfin["jellyfin/jellyfin (Service)"]
host_stream_bstein_dev --> svc_jellyfin_jellyfin
wl_jellyfin_jellyfin["jellyfin/jellyfin (Deployment)"]
svc_jellyfin_jellyfin --> wl_jellyfin_jellyfin
host_vault_bstein_dev["vault.bstein.dev"]
svc_vaultwarden_vaultwarden_service["vaultwarden/vaultwarden-service (Service)"]
host_vault_bstein_dev --> svc_vaultwarden_vaultwarden_service
wl_vaultwarden_vaultwarden["vaultwarden/vaultwarden (Deployment)"]
svc_vaultwarden_vaultwarden_service --> wl_vaultwarden_vaultwarden
subgraph bstein_dev_home[bstein-dev-home]
svc_bstein_dev_home_bstein_dev_home_frontend
wl_bstein_dev_home_bstein_dev_home_frontend
svc_bstein_dev_home_bstein_dev_home_backend
wl_bstein_dev_home_bstein_dev_home_backend
svc_bstein_dev_home_chat_ai_gateway
wl_bstein_dev_home_chat_ai_gateway
end
subgraph comms[comms]
svc_comms_othrys_synapse_matrix_synapse
wl_comms_othrys_synapse_matrix_synapse
svc_comms_element_call
wl_comms_element_call
svc_comms_livekit_token_service
wl_comms_livekit_token_service
svc_comms_livekit
wl_comms_livekit
svc_comms_othrys_element_element_web
wl_comms_othrys_element_element_web
svc_comms_matrix_wellknown
wl_comms_matrix_wellknown
svc_comms_matrix_authentication_service
wl_comms_matrix_authentication_service
end
subgraph crypto[crypto]
svc_crypto_monerod
wl_crypto_monerod
end
subgraph gitea[gitea]
svc_gitea_gitea
wl_gitea_gitea
end
subgraph jellyfin[jellyfin]
svc_jellyfin_pegasus
wl_jellyfin_pegasus
svc_jellyfin_jellyfin
wl_jellyfin_jellyfin
end
subgraph jenkins[jenkins]
svc_jenkins_jenkins
wl_jenkins_jenkins
end
subgraph longhorn_system[longhorn-system]
svc_longhorn_system_oauth2_proxy_longhorn
wl_longhorn_system_oauth2_proxy_longhorn
end
subgraph mailu_mailserver[mailu-mailserver]
svc_mailu_mailserver_mailu_front
end
subgraph nextcloud[nextcloud]
svc_nextcloud_nextcloud
wl_nextcloud_nextcloud
end
subgraph sso[sso]
svc_sso_oauth2_proxy
wl_sso_oauth2_proxy
svc_sso_keycloak
wl_sso_keycloak
end
subgraph vault[vault]
svc_vault_vault
wl_vault_vault
end
subgraph vaultwarden[vaultwarden]
svc_vaultwarden_vaultwarden_service
wl_vaultwarden_vaultwarden
end

View File: scripts/knowledge_render_atlas.py

@@ -0,0 +1,554 @@
#!/usr/bin/env python3
"""Render Atlas knowledge artifacts from Flux + kustomize manifests.
Outputs (committed to git for stable diffs + RAG):
- knowledge/catalog/*.yaml
- knowledge/diagrams/*.mmd
This is intentionally conservative:
- never includes Secret objects
- never includes secret values
- keeps output deterministic (sorted)
"""
from __future__ import annotations
import argparse
import json
import re
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Iterable
import yaml
REPO_ROOT = Path(__file__).resolve().parents[1]
CLUSTER_SCOPED_KINDS = {
"Namespace",
"Node",
"CustomResourceDefinition",
"ClusterRole",
"ClusterRoleBinding",
"StorageClass",
"PersistentVolume",
"MutatingWebhookConfiguration",
"ValidatingWebhookConfiguration",
"APIService",
}
INCLUDED_KINDS = {
"Namespace",
"Deployment",
"StatefulSet",
"DaemonSet",
"Service",
"Ingress",
"IngressRoute", # traefik
"HelmRelease", # only to harvest ingress hostnames from values
}
def _run(cmd: list[str], *, cwd: Path) -> str:
res = subprocess.run(cmd, cwd=cwd, capture_output=True, text=True, check=False)
if res.returncode != 0:
raise RuntimeError(
f"Command failed ({res.returncode}): {' '.join(cmd)}\n{res.stderr.strip()}"
)
return res.stdout
def kustomize_build(path: Path) -> str:
rel = path.relative_to(REPO_ROOT)
try:
return _run(["kubectl", "kustomize", str(rel)], cwd=REPO_ROOT)
except Exception as e:
msg = str(e)
if "is not in or below" in msg:
# Repo uses configMapGenerators that reference ../../scripts/*.py.
# Kustomize load restriction must be disabled for a full render.
try:
return _run(
["kubectl", "kustomize", "--load-restrictor=LoadRestrictionsNone", str(rel)],
cwd=REPO_ROOT,
)
except Exception:
pass
return _run(["kustomize", "build", "--load-restrictor=LoadRestrictionsNone", str(rel)], cwd=REPO_ROOT)
def _iter_docs(raw_yaml: str) -> Iterable[dict[str, Any]]:
for doc in yaml.safe_load_all(raw_yaml):
if not isinstance(doc, dict):
continue
kind = doc.get("kind")
if kind == "List" and isinstance(doc.get("items"), list):
for item in doc["items"]:
if isinstance(item, dict):
yield item
continue
if kind:
yield doc
def _meta(doc: dict[str, Any]) -> tuple[str, str | None]:
md = doc.get("metadata") or {}
name = md.get("name") or ""
namespace = md.get("namespace")
return name, namespace
def _is_namespaced(doc: dict[str, Any]) -> bool:
kind = doc.get("kind") or ""
return kind not in CLUSTER_SCOPED_KINDS
@dataclass(frozen=True)
class FluxKustomization:
name: str
path: str
target_namespace: str | None
def find_flux_kustomizations() -> list[FluxKustomization]:
"""Find Flux Kustomization CRs under clusters/atlas/flux-system."""
root = REPO_ROOT / "clusters" / "atlas" / "flux-system"
items: list[FluxKustomization] = []
for file in sorted(root.rglob("*.yaml")):
raw = file.read_text()
for doc in _iter_docs(raw):
if doc.get("kind") != "Kustomization":
continue
api = str(doc.get("apiVersion") or "")
if not api.startswith("kustomize.toolkit.fluxcd.io/"):
continue
name, _ = _meta(doc)
spec = doc.get("spec") or {}
path = spec.get("path")
if not isinstance(path, str) or not path.strip():
continue
items.append(
FluxKustomization(
name=name,
path=path.strip().lstrip("./"),
target_namespace=spec.get("targetNamespace"),
)
)
return sorted(items, key=lambda k: k.name)
def _safe_string_scan_for_hosts(value: Any) -> set[str]:
"""Best-effort host scan from HelmRelease values without chart rendering."""
hosts: set[str] = set()
if isinstance(value, str):
for m in re.finditer(r"(?i)([a-z0-9-]+(?:\.[a-z0-9-]+)+)", value):
host = m.group(1).lower()
if host.endswith("bstein.dev"):
hosts.add(host)
return hosts
if isinstance(value, list):
for item in value:
hosts |= _safe_string_scan_for_hosts(item)
return hosts
if isinstance(value, dict):
for item in value.values():
hosts |= _safe_string_scan_for_hosts(item)
return hosts
return hosts
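# Worked example: _safe_string_scan_for_hosts({"ingress": {"hosts": ["ci.bstein.dev", "example.com"]}})
# returns {"ci.bstein.dev"}; only hosts ending in "bstein.dev" survive the filter.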
def _service_ports(svc: dict[str, Any]) -> list[dict[str, Any]]:
spec = svc.get("spec") or {}
out: list[dict[str, Any]] = []
for p in spec.get("ports") or []:
if not isinstance(p, dict):
continue
out.append(
{
"name": p.get("name"),
"port": p.get("port"),
"targetPort": p.get("targetPort"),
"protocol": p.get("protocol", "TCP"),
}
)
return out
def _workload_labels(doc: dict[str, Any]) -> dict[str, str]:
tpl = (doc.get("spec") or {}).get("template") or {}
md = tpl.get("metadata") or {}
labels = md.get("labels") or {}
return {str(k): str(v) for k, v in labels.items()} if isinstance(labels, dict) else {}
def _service_selector(doc: dict[str, Any]) -> dict[str, str]:
spec = doc.get("spec") or {}
sel = spec.get("selector") or {}
return {str(k): str(v) for k, v in sel.items()} if isinstance(sel, dict) else {}
def _selector_matches(selector: dict[str, str], labels: dict[str, str]) -> bool:
if not selector:
return False
return all(labels.get(k) == v for k, v in selector.items())
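# Worked example: _selector_matches({"app": "gitea"}, {"app": "gitea", "pod-template-hash": "abc"})
# is True (every selector key must match), while an empty selector never matches anything.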
def _sanitize_node_id(text: str) -> str:
return re.sub(r"[^a-zA-Z0-9_]", "_", text)
def extract_catalog(
rendered: list[tuple[FluxKustomization, list[dict[str, Any]]]],
) -> tuple[dict[str, Any], dict[str, Any], str]:
"""Build knowledge catalog + mermaid diagram from rendered docs."""
# Index workloads and services for mapping.
workloads: dict[tuple[str, str], dict[str, Any]] = {}
services: dict[tuple[str, str], dict[str, Any]] = {}
ingresses: list[dict[str, Any]] = []
ingressroutes: list[dict[str, Any]] = []
helmrelease_hosts: dict[str, list[str]] = {}
for src, docs in rendered:
for doc in docs:
kind = doc.get("kind")
if kind not in INCLUDED_KINDS:
continue
if kind == "Secret":
continue
name, namespace = _meta(doc)
if _is_namespaced(doc) and not namespace and src.target_namespace:
namespace = src.target_namespace
doc = dict(doc)
doc.setdefault("metadata", {})["namespace"] = namespace
if kind in ("Deployment", "StatefulSet", "DaemonSet"):
workloads[(namespace or "", name)] = {
"kind": kind,
"namespace": namespace or "",
"name": name,
"labels": _workload_labels(doc),
"serviceAccountName": ((doc.get("spec") or {}).get("template") or {})
.get("spec", {})
.get("serviceAccountName"),
"nodeSelector": ((doc.get("spec") or {}).get("template") or {})
.get("spec", {})
.get("nodeSelector", {}),
"images": sorted(
{
c.get("image")
for c in (
(((doc.get("spec") or {}).get("template") or {}).get("spec") or {}).get(
"containers"
)
or []
)
if isinstance(c, dict) and c.get("image")
}
),
}
elif kind == "Service":
services[(namespace or "", name)] = {
"namespace": namespace or "",
"name": name,
"type": (doc.get("spec") or {}).get("type", "ClusterIP"),
"selector": _service_selector(doc),
"ports": _service_ports(doc),
}
elif kind == "Ingress":
ingresses.append({"source": src.name, "doc": doc})
elif kind == "IngressRoute":
ingressroutes.append({"source": src.name, "doc": doc})
elif kind == "HelmRelease":
spec = doc.get("spec") or {}
vals = spec.get("values") or {}
hosts = sorted(_safe_string_scan_for_hosts(vals))
if hosts:
helmrelease_hosts[f"{src.name}:{namespace or ''}/{name}"] = hosts
# Map services to workloads.
service_to_workloads: dict[tuple[str, str], list[dict[str, str]]] = {}
for (ns, svc_name), svc in services.items():
selector = svc.get("selector") or {}
matches: list[dict[str, str]] = []
for (w_ns, w_name), w in workloads.items():
if w_ns != ns:
continue
if _selector_matches(selector, w.get("labels") or {}):
matches.append({"kind": w["kind"], "name": w_name})
service_to_workloads[(ns, svc_name)] = sorted(matches, key=lambda m: (m["kind"], m["name"]))
# Extract HTTP endpoints.
endpoints: list[dict[str, Any]] = []
def add_endpoint(
*,
host: str,
path: str,
namespace: str,
service: str,
port: Any,
source: str,
kind: str,
obj_name: str,
):
wk = service_to_workloads.get((namespace, service), [])
endpoints.append(
{
"host": host,
"path": path,
"backend": {
"namespace": namespace,
"service": service,
"port": port,
"workloads": wk,
},
"via": {"kind": kind, "name": obj_name, "source": source},
}
)
for item in ingresses:
doc = item["doc"]
source = item["source"]
name, namespace = _meta(doc)
namespace = namespace or ""
spec = doc.get("spec") or {}
for rule in spec.get("rules") or []:
if not isinstance(rule, dict):
continue
host = (rule.get("host") or "").strip()
http = rule.get("http") or {}
for p in http.get("paths") or []:
if not isinstance(p, dict):
continue
backend = (p.get("backend") or {}).get("service") or {}
svc_name = backend.get("name")
svc_port = (backend.get("port") or {}).get("number") or (backend.get("port") or {}).get("name")
if not host or not svc_name:
continue
add_endpoint(
host=host,
path=p.get("path") or "/",
namespace=namespace,
service=svc_name,
port=svc_port,
source=source,
kind="Ingress",
obj_name=name,
)
host_re = re.compile(r"Host\(`([^`]+)`\)")
pathprefix_re = re.compile(r"PathPrefix\(`([^`]+)`\)")
for item in ingressroutes:
doc = item["doc"]
source = item["source"]
name, namespace = _meta(doc)
namespace = namespace or ""
spec = doc.get("spec") or {}
for route in spec.get("routes") or []:
if not isinstance(route, dict):
continue
match = route.get("match") or ""
hosts = host_re.findall(match)
pathprefixes = pathprefix_re.findall(match) or ["/"]
for svc in route.get("services") or []:
if not isinstance(svc, dict):
continue
svc_name = svc.get("name")
svc_port = svc.get("port")
if not svc_name:
continue
for host in hosts:
for pp in pathprefixes:
add_endpoint(
host=host,
path=pp,
namespace=namespace,
service=svc_name,
port=svc_port,
source=source,
kind="IngressRoute",
obj_name=name,
)
endpoints = sorted(
endpoints,
key=lambda e: (
e["host"],
e["path"],
e["backend"]["namespace"],
e["backend"]["service"],
),
)
catalog = {
"cluster": "atlas",
"sources": [
{"name": k.name, "path": k.path, "targetNamespace": k.target_namespace}
for k, _ in rendered
],
"workloads": sorted(
list(workloads.values()),
key=lambda w: (w["namespace"], w["kind"], w["name"]),
),
"services": sorted(
list(services.values()),
key=lambda s: (s["namespace"], s["name"]),
),
"http_endpoints": endpoints,
"helmrelease_host_hints": {k: v for k, v in sorted(helmrelease_hosts.items())},
}
# Mermaid diagram: host -> service -> workload (grouped by namespace).
ns_nodes: dict[str, list[str]] = {}
lines: list[str] = ["flowchart LR"]
edges: set[tuple[str, str]] = set()
def ensure_ns_node(ns: str, node_id: str):
ns_nodes.setdefault(ns, [])
if node_id not in ns_nodes[ns]:
ns_nodes[ns].append(node_id)
host_nodes: dict[str, str] = {}
for ep in endpoints:
host = ep["host"]
host_id = host_nodes.get(host)
if not host_id:
host_id = f"host_{_sanitize_node_id(host)}"
host_nodes[host] = host_id
lines.append(f' {host_id}["{host}"]')
ns = ep["backend"]["namespace"]
svc = ep["backend"]["service"]
svc_id = f"svc_{_sanitize_node_id(ns)}_{_sanitize_node_id(svc)}"
if svc_id not in ns_nodes.get(ns, []):
lines.append(f' {svc_id}["{ns}/{svc} (Service)"]')
ensure_ns_node(ns, svc_id)
if (host_id, svc_id) not in edges:
edges.add((host_id, svc_id))
lines.append(f" {host_id} --> {svc_id}")
for w in ep["backend"]["workloads"]:
w_id = f"wl_{_sanitize_node_id(ns)}_{_sanitize_node_id(w['name'])}"
if w_id not in ns_nodes.get(ns, []):
lines.append(f' {w_id}["{ns}/{w["name"]} ({w["kind"]})"]')
ensure_ns_node(ns, w_id)
if (svc_id, w_id) not in edges:
edges.add((svc_id, w_id))
lines.append(f" {svc_id} --> {w_id}")
# Wrap namespace subgraphs at the end for stability (sorted namespaces).
if ns_nodes:
lines.append("")
for ns in sorted(ns_nodes.keys()):
lines.append(f" subgraph { _sanitize_node_id(ns) }[{ns}]")
for node_id in ns_nodes[ns]:
lines.append(f" {node_id}")
lines.append(" end")
diagram = "\n".join(lines).rstrip() + "\n"
summary = {
"counts": {
"workloads": len(workloads),
"services": len(services),
"http_endpoints": len(endpoints),
"helmrelease_host_hints": sum(len(v) for v in helmrelease_hosts.values()),
}
}
return catalog, summary, diagram
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--out", default="knowledge", help="Output base directory (default: knowledge/)")
ap.add_argument(
"--write",
action="store_true",
help="Write generated files (otherwise just print a summary).",
)
args = ap.parse_args()
out_dir = REPO_ROOT / args.out
flux = find_flux_kustomizations()
if not flux:
print("No Flux Kustomizations found under clusters/atlas/flux-system.", file=sys.stderr)
return 2
rendered: list[tuple[FluxKustomization, list[dict[str, Any]]]] = []
for k in flux:
path = REPO_ROOT / k.path
if not path.exists():
continue
raw = kustomize_build(path)
docs = [d for d in _iter_docs(raw) if d.get("kind") != "Secret"]
rendered.append((k, docs))
rendered = sorted(rendered, key=lambda item: item[0].name)
catalog, summary, diagram = extract_catalog(rendered)
if not args.write:
print(json.dumps(summary, indent=2, sort_keys=True))
return 0
(out_dir / "catalog").mkdir(parents=True, exist_ok=True)
(out_dir / "diagrams").mkdir(parents=True, exist_ok=True)
catalog_path = out_dir / "catalog" / "atlas.yaml"
catalog_json_path = out_dir / "catalog" / "atlas.json"
summary_path = out_dir / "catalog" / "atlas-summary.json"
diagram_path = out_dir / "diagrams" / "atlas-http.mmd"
runbooks_json_path = out_dir / "catalog" / "runbooks.json"
catalog_path.write_text(
"# Generated by scripts/knowledge_render_atlas.py (do not edit by hand)\n"
+ yaml.safe_dump(catalog, sort_keys=False),
encoding="utf-8",
)
catalog_json_path.write_text(json.dumps(catalog, indent=2, sort_keys=False) + "\n", encoding="utf-8")
summary_path.write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n", encoding="utf-8")
diagram_path.write_text(diagram, encoding="utf-8")
# Render runbooks into JSON for lightweight, dependency-free consumption in-cluster.
runbooks_dir = out_dir / "runbooks"
runbooks: list[dict[str, Any]] = []
if runbooks_dir.exists():
for md_file in sorted(runbooks_dir.glob("*.md")):
raw = md_file.read_text(encoding="utf-8")
fm: dict[str, Any] = {}
body = raw
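# Runbooks may start with optional YAML front matter; a hypothetical example of the
# shape assumed here (only title/tags/entrypoints/source_paths are read below):
#   ---
#   title: "CI: Gitea -> Jenkins pipeline"
#   tags: [atlas, ci]
#   entrypoints: [ci.bstein.dev]
#   source_paths: [services/jenkins]
#   ---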
if raw.startswith("---\n"):
try:
_, rest = raw.split("---\n", 1)
fm_raw, body = rest.split("\n---\n", 1)
fm = yaml.safe_load(fm_raw) or {}
except Exception:
fm = {}
body = raw
runbooks.append(
{
"path": str(md_file.relative_to(out_dir)),
"title": fm.get("title") or md_file.stem,
"tags": fm.get("tags") or [],
"entrypoints": fm.get("entrypoints") or [],
"source_paths": fm.get("source_paths") or [],
"body": body.strip(),
}
)
runbooks_json_path.write_text(json.dumps(runbooks, indent=2, sort_keys=False) + "\n", encoding="utf-8")
print(f"Wrote {catalog_path.relative_to(REPO_ROOT)}")
print(f"Wrote {catalog_json_path.relative_to(REPO_ROOT)}")
print(f"Wrote {summary_path.relative_to(REPO_ROOT)}")
print(f"Wrote {diagram_path.relative_to(REPO_ROOT)}")
print(f"Wrote {runbooks_json_path.relative_to(REPO_ROOT)}")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -5,22 +5,74 @@ metadata:
name: atlasbot
data:
bot.py: |
-import json, os, time, collections, re
-from urllib import request, parse, error
import collections
import json
import os
import re
import ssl
import time
from urllib import error, parse, request
BASE = os.environ.get("MATRIX_BASE", "http://othrys-synapse-matrix-synapse:8008")
AUTH_BASE = os.environ.get("AUTH_BASE", "http://matrix-authentication-service:8080")
USER = os.environ["BOT_USER"]
PASSWORD = os.environ["BOT_PASS"]
ROOM_ALIAS = "#othrys:live.bstein.dev"
OLLAMA_URL = os.environ.get("OLLAMA_URL", "https://chat.ai.bstein.dev/")
MODEL = os.environ.get("OLLAMA_MODEL", "qwen2.5-coder:7b-instruct-q4_0")
API_KEY = os.environ.get("CHAT_API_KEY", "")
KB_DIR = os.environ.get("KB_DIR", "")
VM_URL = os.environ.get("VM_URL", "http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428")
BOT_MENTIONS = os.environ.get("BOT_MENTIONS", f"{USER},atlas")
SERVER_NAME = os.environ.get("MATRIX_SERVER_NAME", "live.bstein.dev")
MAX_KB_CHARS = int(os.environ.get("ATLASBOT_MAX_KB_CHARS", "2500"))
MAX_TOOL_CHARS = int(os.environ.get("ATLASBOT_MAX_TOOL_CHARS", "2500"))
TOKEN_RE = re.compile(r"[a-z0-9][a-z0-9_.-]{1,}", re.IGNORECASE)
HOST_RE = re.compile(r"(?i)([a-z0-9-]+(?:\\.[a-z0-9-]+)+)")
STOPWORDS = {
"the",
"and",
"for",
"with",
"this",
"that",
"from",
"into",
"what",
"how",
"why",
"when",
"where",
"which",
"who",
"can",
"could",
"should",
"would",
"please",
"help",
"atlas",
"othrys",
}
def _tokens(text: str) -> list[str]:
toks = [t.lower() for t in TOKEN_RE.findall(text or "")]
return [t for t in toks if t not in STOPWORDS and len(t) >= 2]
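# Worked example: _tokens("How do I reach ci.bstein.dev?") -> ["do", "reach", "ci.bstein.dev"]
# ("how" is a stopword, one-character tokens never match, and the hostname stays one token).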
# Mention detection (Matrix rich mentions + plain @atlas).
MENTION_TOKENS = [m.strip() for m in BOT_MENTIONS.split(",") if m.strip()]
MENTION_LOCALPARTS = [m.lstrip("@").split(":", 1)[0] for m in MENTION_TOKENS]
-MENTION_RE = re.compile(r"(?<!\\w)@(?:" + "|".join(re.escape(m) for m in MENTION_LOCALPARTS) + r")(?:\\:[^\\s]+)?(?!\\w)", re.IGNORECASE)
MENTION_RE = re.compile(
r"(?<!\\w)@(?:" + "|".join(re.escape(m) for m in MENTION_LOCALPARTS) + r")(?:\\:[^\\s]+)?(?!\\w)",
re.IGNORECASE,
)
def normalize_user_id(token: str) -> str:
t = token.strip()
if not t:
@@ -43,6 +95,8 @@ data:
return False
return any(isinstance(uid, str) and uid.lower() in MENTION_USER_IDS for uid in user_ids)
# Matrix HTTP helper.
def req(method: str, path: str, token: str | None = None, body=None, timeout=60, base: str | None = None):
url = (base or BASE) + path
data = None
@@ -78,31 +132,317 @@ data:
path = f"/_matrix/client/v3/rooms/{parse.quote(room)}/send/m.room.message"
req("POST", path, token, body={"msgtype": "m.text", "body": text})
-history = collections.defaultdict(list)  # (room_id, sender|None) -> list of str (short transcript)
# Atlas KB loader (no external deps; files are pre-rendered JSON via scripts/knowledge_render_atlas.py).
KB = {"catalog": {}, "runbooks": []}
_HOST_INDEX: dict[str, list[dict]] = {}
_NAME_INDEX: set[str] = set()
def _load_json_file(path: str) -> Any | None:
try:
with open(path, "rb") as f:
return json.loads(f.read().decode("utf-8"))
except Exception:
return None
def load_kb():
global KB, _HOST_INDEX, _NAME_INDEX
if not KB_DIR:
return
catalog = _load_json_file(os.path.join(KB_DIR, "catalog", "atlas.json")) or {}
runbooks = _load_json_file(os.path.join(KB_DIR, "catalog", "runbooks.json")) or []
KB = {"catalog": catalog, "runbooks": runbooks}
host_index: dict[str, list[dict]] = collections.defaultdict(list)
for ep in catalog.get("http_endpoints", []) if isinstance(catalog, dict) else []:
host = (ep.get("host") or "").lower()
if host:
host_index[host].append(ep)
_HOST_INDEX = {k: host_index[k] for k in sorted(host_index.keys())}
names: set[str] = set()
for s in catalog.get("services", []) if isinstance(catalog, dict) else []:
if isinstance(s, dict) and s.get("name"):
names.add(str(s["name"]).lower())
for w in catalog.get("workloads", []) if isinstance(catalog, dict) else []:
if isinstance(w, dict) and w.get("name"):
names.add(str(w["name"]).lower())
_NAME_INDEX = names
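# Expected KB_DIR layout (mounted from the atlas-kb ConfigMap): INDEX.md,
# catalog/atlas.json, catalog/atlas-summary.json, catalog/runbooks.json, diagrams/atlas-http.mmd.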
def kb_retrieve(query: str, *, limit: int = 3) -> str:
q = (query or "").strip()
if not q or not KB.get("runbooks"):
return ""
ql = q.lower()
q_tokens = _tokens(q)
if not q_tokens:
return ""
scored: list[tuple[int, dict]] = []
for doc in KB.get("runbooks", []):
if not isinstance(doc, dict):
continue
title = str(doc.get("title") or "")
body = str(doc.get("body") or "")
tags = doc.get("tags") or []
entrypoints = doc.get("entrypoints") or []
hay = (title + "\n" + " ".join(tags) + "\n" + " ".join(entrypoints) + "\n" + body).lower()
score = 0
for t in set(q_tokens):
if t in hay:
score += 3 if t in title.lower() else 1
for h in entrypoints:
if isinstance(h, str) and h.lower() in ql:
score += 4
if score:
scored.append((score, doc))
scored.sort(key=lambda x: x[0], reverse=True)
picked = [d for _, d in scored[:limit]]
if not picked:
return ""
parts: list[str] = ["Atlas KB (retrieved):"]
used = 0
for d in picked:
path = d.get("path") or ""
title = d.get("title") or path
body = (d.get("body") or "").strip()
snippet = body[:900].strip()
chunk = f"- {title} ({path})\n{snippet}"
if used + len(chunk) > MAX_KB_CHARS:
break
parts.append(chunk)
used += len(chunk)
return "\n".join(parts).strip()
def catalog_hints(query: str) -> tuple[str, list[tuple[str, str]]]:
q = (query or "").strip()
if not q or not KB.get("catalog"):
return "", []
ql = q.lower()
hosts = {m.group(1).lower() for m in HOST_RE.finditer(ql) if m.group(1).lower().endswith("bstein.dev")}
# Also match by known workload/service names.
for t in _tokens(ql):
if t in _NAME_INDEX:
hosts |= {ep["host"].lower() for ep in KB["catalog"].get("http_endpoints", []) if isinstance(ep, dict) and ep.get("backend", {}).get("service") == t}
edges: list[tuple[str, str]] = []
lines: list[str] = []
for host in sorted(hosts):
for ep in _HOST_INDEX.get(host, []):
backend = ep.get("backend") or {}
ns = backend.get("namespace") or ""
svc = backend.get("service") or ""
path = ep.get("path") or "/"
if not svc:
continue
wk = backend.get("workloads") or []
wk_str = ", ".join(f"{w.get('kind')}:{w.get('name')}" for w in wk if isinstance(w, dict) and w.get("name")) or "unknown"
lines.append(f"- {host}{path} → {ns}/{svc} → {wk_str}")
for w in wk:
if isinstance(w, dict) and w.get("name"):
edges.append((ns, str(w["name"])))
if not lines:
return "", []
return "Atlas endpoints (from GitOps):\n" + "\n".join(lines[:20]), edges
# Kubernetes API (read-only). RBAC is provided via ServiceAccount atlasbot.
_K8S_TOKEN: str | None = None
_K8S_CTX: ssl.SSLContext | None = None
def _k8s_context() -> ssl.SSLContext:
global _K8S_CTX
if _K8S_CTX is not None:
return _K8S_CTX
ca_path = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
ctx = ssl.create_default_context(cafile=ca_path)
_K8S_CTX = ctx
return ctx
def _k8s_token() -> str:
global _K8S_TOKEN
if _K8S_TOKEN:
return _K8S_TOKEN
token_path = "/var/run/secrets/kubernetes.io/serviceaccount/token"
with open(token_path, "r", encoding="utf-8") as f:
_K8S_TOKEN = f.read().strip()
return _K8S_TOKEN
def k8s_get(path: str, timeout: int = 8) -> dict:
host = os.environ.get("KUBERNETES_SERVICE_HOST")
port = os.environ.get("KUBERNETES_SERVICE_PORT_HTTPS") or os.environ.get("KUBERNETES_SERVICE_PORT") or "443"
if not host:
raise RuntimeError("k8s host missing")
url = f"https://{host}:{port}{path}"
headers = {"Authorization": f"Bearer {_k8s_token()}"}
r = request.Request(url, headers=headers, method="GET")
with request.urlopen(r, timeout=timeout, context=_k8s_context()) as resp:
raw = resp.read()
return json.loads(raw.decode()) if raw else {}
def k8s_pods(namespace: str) -> list[dict]:
data = k8s_get(f"/api/v1/namespaces/{parse.quote(namespace)}/pods?limit=500")
items = data.get("items") or []
return items if isinstance(items, list) else []
def summarize_pods(namespace: str, prefixes: set[str] | None = None) -> str:
try:
pods = k8s_pods(namespace)
except Exception:
return ""
out: list[str] = []
for p in pods:
md = p.get("metadata") or {}
st = p.get("status") or {}
name = md.get("name") or ""
if prefixes and not any(name.startswith(pref + "-") or name == pref or name.startswith(pref) for pref in prefixes):
continue
phase = st.get("phase") or "?"
cs = st.get("containerStatuses") or []
restarts = 0
ready = 0
total = 0
reason = st.get("reason") or ""
for c in cs if isinstance(cs, list) else []:
if not isinstance(c, dict):
continue
total += 1
restarts += int(c.get("restartCount") or 0)
if c.get("ready"):
ready += 1
state = c.get("state") or {}
if not reason and isinstance(state, dict):
waiting = state.get("waiting") or {}
if isinstance(waiting, dict) and waiting.get("reason"):
reason = waiting.get("reason")
extra = f" ({reason})" if reason else ""
out.append(f"- {namespace}/{name}: {phase} {ready}/{total} restarts={restarts}{extra}")
return "\n".join(out[:20])
def flux_not_ready() -> str:
try:
data = k8s_get(
"/apis/kustomize.toolkit.fluxcd.io/v1/namespaces/flux-system/kustomizations?limit=200"
)
except Exception:
return ""
items = data.get("items") or []
bad: list[str] = []
for it in items if isinstance(items, list) else []:
md = it.get("metadata") or {}
st = it.get("status") or {}
name = md.get("name") or ""
conds = st.get("conditions") or []
ready = None
msg = ""
for c in conds if isinstance(conds, list) else []:
if isinstance(c, dict) and c.get("type") == "Ready":
ready = c.get("status")
msg = c.get("message") or ""
if ready not in ("True", True):
bad.append(f"- flux kustomization/{name}: Ready={ready} {msg}".strip())
return "\n".join(bad[:10])
# VictoriaMetrics (PromQL) helpers.
def vm_query(query: str, timeout: int = 8) -> dict | None:
try:
url = VM_URL.rstrip("/") + "/api/v1/query?" + parse.urlencode({"query": query})
with request.urlopen(url, timeout=timeout) as resp:
return json.loads(resp.read().decode())
except Exception:
return None
def vm_top_restarts(hours: int = 1) -> str:
q = f"topk(5, sum by (namespace,pod) (increase(kube_pod_container_status_restarts_total[{hours}h])))"
res = vm_query(q)
if not res or (res.get("status") != "success"):
return ""
out: list[str] = []
for r in (res.get("data") or {}).get("result") or []:
if not isinstance(r, dict):
continue
m = r.get("metric") or {}
v = r.get("value") or []
ns = (m.get("namespace") or "").strip()
pod = (m.get("pod") or "").strip()
val = v[1] if isinstance(v, list) and len(v) > 1 else ""
if pod:
out.append(f"- restarts({hours}h): {ns}/{pod} = {val}")
return "\n".join(out)
# Conversation state.
history = collections.defaultdict(list) # (room_id, sender|None) -> list[str] (short transcript)
def key_for(room_id: str, sender: str, is_dm: bool):
return (room_id, None) if is_dm else (room_id, sender)
-def ollama_reply(hist_key, prompt: str) -> str:
def build_context(prompt: str, *, allow_tools: bool, targets: list[tuple[str, str]]) -> str:
parts: list[str] = []
kb = kb_retrieve(prompt)
if kb:
parts.append(kb)
endpoints, edges = catalog_hints(prompt)
if endpoints:
parts.append(endpoints)
if allow_tools:
# Scope pod summaries to relevant namespaces/workloads when possible.
prefixes_by_ns: dict[str, set[str]] = collections.defaultdict(set)
for ns, name in (targets or []) + (edges or []):
if ns and name:
prefixes_by_ns[ns].add(name)
pod_lines: list[str] = []
for ns in sorted(prefixes_by_ns.keys()):
summary = summarize_pods(ns, prefixes_by_ns[ns])
if summary:
pod_lines.append(f"Pods (live):\n{summary}")
if pod_lines:
parts.append("\n".join(pod_lines)[:MAX_TOOL_CHARS])
flux_bad = flux_not_ready()
if flux_bad:
parts.append("Flux (not ready):\n" + flux_bad)
restarts = vm_top_restarts(1)
if restarts:
parts.append("VictoriaMetrics (top restarts 1h):\n" + restarts)
return "\n\n".join([p for p in parts if p]).strip()
def ollama_reply(hist_key, prompt: str, *, context: str) -> str:
try:
-# Keep short context as plain text transcript
-transcript = "\n".join(
-["System: You are Atlas, the Titan lab assistant for Othrys. Be helpful, direct, and concise."]
-+ history[hist_key][-24:]
-+ [f"User: {prompt}"]
-)
system = (
"System: You are Atlas, the Titan lab assistant for Atlas/Othrys. "
"Be helpful, direct, and concise. "
"Prefer answering with exact repo paths and Kubernetes resource names. "
"Never include or request secret values."
)
transcript_parts = [system]
if context:
transcript_parts.append("Context (grounded):\n" + context[:MAX_KB_CHARS])
transcript_parts.extend(history[hist_key][-24:])
transcript_parts.append(f"User: {prompt}")
transcript = "\n".join(transcript_parts)
payload = {"model": MODEL, "message": transcript} payload = {"model": MODEL, "message": transcript}
headers = {"Content-Type": "application/json"} headers = {"Content-Type": "application/json"}
if API_KEY: if API_KEY:
headers["x-api-key"] = API_KEY headers["x-api-key"] = API_KEY
r = request.Request(OLLAMA_URL, data=json.dumps(payload).encode(), headers=headers) r = request.Request(OLLAMA_URL, data=json.dumps(payload).encode(), headers=headers)
with request.urlopen(r, timeout=15) as resp: with request.urlopen(r, timeout=20) as resp:
data = json.loads(resp.read().decode()) data = json.loads(resp.read().decode())
reply = data.get("message") or data.get("response") or data.get("reply") or "I'm here to help." reply = data.get("message") or data.get("response") or data.get("reply") or "I'm here to help."
history[hist_key].append(f"Atlas: {reply}") history[hist_key].append(f"Atlas: {reply}")
return reply return reply
except Exception: except Exception:
return "Hi! I'm Atlas." return "Im here — but I couldnt reach the model backend."
def sync_loop(token: str, room_id: str):
since = None
@@ -111,6 +451,7 @@ data:
since = res.get("next_batch")
except Exception:
pass
while True:
params = {"timeout": 30000}
if since:
@@ -133,28 +474,48 @@ data:
# messages
for rid, data in res.get("rooms", {}).get("join", {}).items():
timeline = data.get("timeline", {}).get("events", [])
joined_count = data.get("summary", {}).get("m.joined_member_count")
is_dm = joined_count is not None and joined_count <= 2
for ev in timeline:
if ev.get("type") != "m.room.message":
continue
content = ev.get("content", {})
-body = content.get("body", "")
body = (content.get("body", "") or "").strip()
-if not body.strip():
if not body:
continue
sender = ev.get("sender", "")
if sender == f"@{USER}:live.bstein.dev":
continue
-# Only respond if bot is mentioned or in a DM
-joined_count = data.get("summary", {}).get("m.joined_member_count")
-is_dm = joined_count is not None and joined_count <= 2
mentioned = is_mentioned(content, body)
hist_key = key_for(rid, sender, is_dm)
history[hist_key].append(f"{sender}: {body}")
history[hist_key] = history[hist_key][-80:]
-if is_dm or mentioned:
-reply = ollama_reply(hist_key, body)
-send_msg(token, rid, reply)
if not (is_dm or mentioned):
continue
# Only do live cluster/metrics introspection in DMs.
allow_tools = is_dm
# Attempt to scope tools to the most likely workloads when hostnames are mentioned.
targets: list[tuple[str, str]] = []
for m in HOST_RE.finditer(body.lower()):
host = m.group(1).lower()
for ep in _HOST_INDEX.get(host, []):
backend = ep.get("backend") or {}
ns = backend.get("namespace") or ""
for w in backend.get("workloads") or []:
if isinstance(w, dict) and w.get("name"):
targets.append((ns, str(w["name"])))
context = build_context(body, allow_tools=allow_tools, targets=targets)
reply = ollama_reply(hist_key, body, context=context)
send_msg(token, rid, reply)
def main():
load_kb()
token = login()
try:
room_id = resolve_alias(token, ROOM_ALIAS)

View File

@@ -16,8 +16,9 @@ spec:
labels:
app: atlasbot
annotations:
-checksum/atlasbot-configmap: c57538d33dc02db7aaf7b2f4681f50620c2cbcde8ddc1c51ccb5fa693247b00a
checksum/atlasbot-configmap: b9796738bbbc50fd5c70db0bd4fbffe986fd2728a7487186e39ff7ecabefbd1e
spec:
serviceAccountName: atlasbot
nodeSelector:
hardware: rpi5
containers:
@@ -32,6 +33,10 @@ spec:
value: http://othrys-synapse-matrix-synapse:8008
- name: AUTH_BASE
value: http://matrix-authentication-service:8080
- name: KB_DIR
value: /kb
- name: VM_URL
value: http://victoria-metrics-single-server.monitoring.svc.cluster.local:8428
- name: BOT_USER
value: atlasbot
- name: BOT_PASS
@@ -59,7 +64,24 @@ spec:
- name: code
mountPath: /app/bot.py
subPath: bot.py
- name: kb
mountPath: /kb
readOnly: true
volumes:
- name: code
configMap:
name: atlasbot
- name: kb
configMap:
name: atlas-kb
items:
- key: INDEX.md
path: INDEX.md
- key: atlas.json
path: catalog/atlas.json
- key: atlas-summary.json
path: catalog/atlas-summary.json
- key: runbooks.json
path: catalog/runbooks.json
- key: atlas-http.mmd
path: diagrams/atlas-http.mmd

View File: services/communication/atlasbot-rbac.yaml

@@ -0,0 +1,47 @@
# services/communication/atlasbot-rbac.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
name: atlasbot
namespace: comms
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: atlasbot-readonly
rules:
- apiGroups: [""]
resources: ["namespaces", "nodes", "pods", "services", "endpoints", "events"]
verbs: ["get", "list", "watch"]
- apiGroups: ["apps"]
resources: ["deployments", "statefulsets", "daemonsets", "replicasets"]
verbs: ["get", "list", "watch"]
- apiGroups: ["networking.k8s.io"]
resources: ["ingresses"]
verbs: ["get", "list", "watch"]
- apiGroups: ["traefik.io"]
resources: ["ingressroutes", "middlewares", "serverstransports"]
verbs: ["get", "list", "watch"]
- apiGroups: ["kustomize.toolkit.fluxcd.io"]
resources: ["kustomizations"]
verbs: ["get", "list", "watch"]
- apiGroups: ["helm.toolkit.fluxcd.io"]
resources: ["helmreleases"]
verbs: ["get", "list", "watch"]
- apiGroups: ["source.toolkit.fluxcd.io"]
resources: ["gitrepositories", "helmrepositories", "buckets"]
verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: atlasbot-readonly
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: atlasbot-readonly
subjects:
- kind: ServiceAccount
name: atlasbot
namespace: comms
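# Illustrative spot checks that the grants stay read-only:
#   kubectl auth can-i list pods --as=system:serviceaccount:comms:atlasbot     # expect: yes
#   kubectl auth can-i delete pods --as=system:serviceaccount:comms:atlasbot   # expect: no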

View File: services/communication/kustomization.yaml

@@ -3,6 +3,7 @@ apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
namespace: comms
resources:
- atlasbot-rbac.yaml
- synapse-rendered.yaml
- synapse-signingkey-ensure-job.yaml
- mas-configmap.yaml
@@ -29,3 +30,12 @@ resources:
patchesStrategicMerge:
- synapse-deployment-strategy-patch.yaml
configMapGenerator:
- name: atlas-kb
files:
- INDEX.md=../../knowledge/INDEX.md
- atlas.json=../../knowledge/catalog/atlas.json
- atlas-summary.json=../../knowledge/catalog/atlas-summary.json
- runbooks.json=../../knowledge/catalog/runbooks.json
- atlas-http.mmd=../../knowledge/diagrams/atlas-http.mmd
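# NOTE: these generator sources live outside this kustomization root; rendering locally
# needs kustomize's --load-restrictor=LoadRestrictionsNone (the same workaround used by
# kustomize_build in scripts/knowledge_render_atlas.py).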