#!/usr/bin/env python3
"""Render Atlas knowledge artifacts from Flux + kustomize manifests.
Outputs (committed to git for stable diffs + RAG):
- knowledge/catalog/*.yaml
- knowledge/diagrams/*.mmd
This is intentionally conservative:
- never includes Secret objects
- never includes secret values
- keeps output deterministic (sorted)
"""
from __future__ import annotations
import argparse
import json
import re
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
import shutil
from typing import Any, Iterable
import yaml
# Repository root: this script lives in <repo>/scripts/, so one parent up.
REPO_ROOT = Path(__file__).resolve().parents[1]
# Grafana dashboard JSON files that feed the metrics index.
DASHBOARD_DIR = REPO_ROOT / "services" / "monitoring" / "dashboards"
# Kinds treated as cluster-scoped: they never get a targetNamespace
# defaulted onto them during catalog extraction.
CLUSTER_SCOPED_KINDS = {
    "Namespace",
    "Node",
    "CustomResourceDefinition",
    "ClusterRole",
    "ClusterRoleBinding",
    "StorageClass",
    "PersistentVolume",
    "MutatingWebhookConfiguration",
    "ValidatingWebhookConfiguration",
    "APIService",
}
# Only these kinds are harvested into the catalog; everything else is ignored.
INCLUDED_KINDS = {
    "Namespace",
    "Deployment",
    "StatefulSet",
    "DaemonSet",
    "Service",
    "Ingress",
    "IngressRoute",  # traefik
    "HelmRelease",  # only to harvest ingress hostnames from values
}
def _run(cmd: list[str], *, cwd: Path) -> str:
res = subprocess.run(cmd, cwd=cwd, capture_output=True, text=True, check=False)
if res.returncode != 0:
raise RuntimeError(
f"Command failed ({res.returncode}): {' '.join(cmd)}\n{res.stderr.strip()}"
)
return res.stdout
def _sync_tree(source: Path, dest: Path) -> None:
if dest.exists():
shutil.rmtree(dest)
shutil.copytree(source, dest)
def _iter_dashboard_panels(dashboard: dict[str, Any]) -> Iterable[dict[str, Any]]:
panels = dashboard.get("panels") if isinstance(dashboard.get("panels"), list) else []
for panel in panels:
if not isinstance(panel, dict):
continue
if panel.get("type") == "row" and isinstance(panel.get("panels"), list):
yield from _iter_dashboard_panels({"panels": panel.get("panels")})
continue
yield panel
def _extract_metrics_index(dashboard_dir: Path) -> list[dict[str, Any]]:
    """Build a flat metrics index from Grafana dashboard JSON files.

    One entry per panel that carries at least one non-empty ``expr`` target;
    unparseable or non-object files are skipped.
    """
    entries: list[dict[str, Any]] = []
    for json_path in sorted(dashboard_dir.glob("*.json")):
        try:
            dashboard = json.loads(json_path.read_text(encoding="utf-8"))
        except json.JSONDecodeError:
            # Skip unparseable dashboards rather than failing the whole render.
            continue
        if not isinstance(dashboard, dict):
            continue
        title = dashboard.get("title") or json_path.stem
        tags = dashboard.get("tags") or []
        for panel in _iter_dashboard_panels(dashboard):
            raw_targets = panel.get("targets")
            if not isinstance(raw_targets, list):
                continue
            exprs = [
                t["expr"].strip()
                for t in raw_targets
                if isinstance(t, dict) and isinstance(t.get("expr"), str) and t["expr"].strip()
            ]
            if not exprs:
                continue
            ds = panel.get("datasource") or {}
            if isinstance(ds, dict):
                ds_uid, ds_type = ds.get("uid"), ds.get("type")
            else:
                ds_uid, ds_type = None, None
            entries.append(
                {
                    "dashboard": title,
                    "panel_title": panel.get("title") or "",
                    "panel_id": panel.get("id"),
                    "panel_type": panel.get("type"),
                    "description": panel.get("description") or "",
                    "tags": tags,
                    "datasource_uid": ds_uid,
                    "datasource_type": ds_type,
                    "exprs": exprs,
                }
            )
    return entries
def kustomize_build(path: Path) -> str:
    """Render the kustomization at *path* to multi-document YAML.

    Tries ``kubectl kustomize`` first. When that fails with a load-restriction
    error (this repo's configMapGenerators reference ../../scripts/*.py, which
    sits outside the kustomization root), retries with restrictions disabled,
    and finally falls back to the standalone ``kustomize`` binary.
    """
    rel = path.relative_to(REPO_ROOT)
    try:
        return _run(["kubectl", "kustomize", str(rel)], cwd=REPO_ROOT)
    except Exception as exc:
        if "is not in or below" in str(exc):
            try:
                return _run(
                    ["kubectl", "kustomize", "--load-restrictor=LoadRestrictionsNone", str(rel)],
                    cwd=REPO_ROOT,
                )
            except Exception:
                pass
    # Last resort: the standalone kustomize CLI with restrictions disabled.
    return _run(["kustomize", "build", "--load-restrictor=LoadRestrictionsNone", str(rel)], cwd=REPO_ROOT)
def _iter_docs(raw_yaml: str) -> Iterable[dict[str, Any]]:
    """Yield Kubernetes object dicts from a multi-document YAML string.

    ``kind: List`` wrappers are flattened into their dict items; non-dict
    documents and kind-less dicts are skipped.
    """
    for parsed in yaml.safe_load_all(raw_yaml):
        if not isinstance(parsed, dict):
            continue
        if parsed.get("kind") == "List" and isinstance(parsed.get("items"), list):
            yield from (item for item in parsed["items"] if isinstance(item, dict))
        elif parsed.get("kind"):
            yield parsed
def _meta(doc: dict[str, Any]) -> tuple[str, str | None]:
md = doc.get("metadata") or {}
name = md.get("name") or ""
namespace = md.get("namespace")
return name, namespace
def _is_namespaced(doc: dict[str, Any]) -> bool:
    """True when the object's kind is not a known cluster-scoped kind."""
    return (doc.get("kind") or "") not in CLUSTER_SCOPED_KINDS
@dataclass(frozen=True)
class FluxKustomization:
    """A Flux Kustomization CR, reduced to the fields the renderer needs."""

    # metadata.name of the Kustomization CR.
    name: str
    # spec.path, normalized relative to the repo root.
    path: str
    # spec.targetNamespace; applied to namespaced objects that lack one.
    target_namespace: str | None
def find_flux_kustomizations() -> list[FluxKustomization]:
    """Find Flux Kustomization CRs under clusters/atlas/flux-system.

    Returns them sorted by name for deterministic output. Documents that
    are not Flux Kustomizations, or that lack a usable ``spec.path``, are
    skipped.
    """
    root = REPO_ROOT / "clusters" / "atlas" / "flux-system"
    items: list[FluxKustomization] = []
    for file in sorted(root.rglob("*.yaml")):
        raw = file.read_text(encoding="utf-8")
        for doc in _iter_docs(raw):
            if doc.get("kind") != "Kustomization":
                continue
            api = str(doc.get("apiVersion") or "")
            if not api.startswith("kustomize.toolkit.fluxcd.io/"):
                continue
            name, _ = _meta(doc)
            spec = doc.get("spec") or {}
            path = spec.get("path")
            if not isinstance(path, str) or not path.strip():
                continue
            # BUGFIX: the previous lstrip("./") stripped *any* leading '.'
            # and '/' characters, so a path like "./.flux" became "flux".
            # removeprefix() drops only the literal "./" prefix.
            normalized = path.strip().removeprefix("./")
            items.append(
                FluxKustomization(
                    name=name,
                    path=normalized,
                    target_namespace=spec.get("targetNamespace"),
                )
            )
    return sorted(items, key=lambda k: k.name)
def _safe_string_scan_for_hosts(value: Any) -> set[str]:
"""Best-effort host scan from HelmRelease values without chart rendering."""
hosts: set[str] = set()
if isinstance(value, str):
for m in re.finditer(r"(?i)([a-z0-9-]+(?:\.[a-z0-9-]+)+)", value):
host = m.group(1).lower()
if host.endswith("bstein.dev"):
hosts.add(host)
return hosts
if isinstance(value, list):
for item in value:
hosts |= _safe_string_scan_for_hosts(item)
return hosts
if isinstance(value, dict):
for item in value.values():
hosts |= _safe_string_scan_for_hosts(item)
return hosts
return hosts
def _service_ports(svc: dict[str, Any]) -> list[dict[str, Any]]:
spec = svc.get("spec") or {}
out: list[dict[str, Any]] = []
for p in spec.get("ports") or []:
if not isinstance(p, dict):
continue
out.append(
{
"name": p.get("name"),
"port": p.get("port"),
"targetPort": p.get("targetPort"),
"protocol": p.get("protocol", "TCP"),
}
)
return out
def _workload_labels(doc: dict[str, Any]) -> dict[str, str]:
tpl = (doc.get("spec") or {}).get("template") or {}
md = tpl.get("metadata") or {}
labels = md.get("labels") or {}
return {str(k): str(v) for k, v in labels.items()} if isinstance(labels, dict) else {}
def _service_selector(doc: dict[str, Any]) -> dict[str, str]:
spec = doc.get("spec") or {}
sel = spec.get("selector") or {}
return {str(k): str(v) for k, v in sel.items()} if isinstance(sel, dict) else {}
def _selector_matches(selector: dict[str, str], labels: dict[str, str]) -> bool:
if not selector:
return False
return all(labels.get(k) == v for k, v in selector.items())
def _sanitize_node_id(text: str) -> str:
return re.sub(r"[^a-zA-Z0-9_]", "_", text)
def extract_catalog(
    rendered: list[tuple[FluxKustomization, list[dict[str, Any]]]],
) -> tuple[dict[str, Any], dict[str, Any], str]:
    """Build knowledge catalog + mermaid diagram from rendered docs.

    Returns ``(catalog, summary, diagram)``: *catalog* is the serializable
    index, *summary* holds object counts, and *diagram* is Mermaid
    flowchart text mapping host -> Service -> workload.
    """
    # Index workloads and services for mapping.
    workloads: dict[tuple[str, str], dict[str, Any]] = {}
    services: dict[tuple[str, str], dict[str, Any]] = {}
    ingresses: list[dict[str, Any]] = []
    ingressroutes: list[dict[str, Any]] = []
    helmrelease_hosts: dict[str, list[str]] = {}
    for src, docs in rendered:
        for doc in docs:
            kind = doc.get("kind")
            if kind not in INCLUDED_KINDS:
                continue
            # Redundant (Secret is never in INCLUDED_KINDS) but kept as
            # defense in depth per the module's "never include Secrets" rule.
            if kind == "Secret":
                continue
            name, namespace = _meta(doc)
            # Default the Flux Kustomization's targetNamespace onto
            # namespaced objects that do not set one themselves.
            if _is_namespaced(doc) and not namespace and src.target_namespace:
                namespace = src.target_namespace
                doc = dict(doc)
                # NOTE(review): dict(doc) is a shallow copy, so setdefault()
                # here may mutate the original doc's metadata dict in place
                # -- harmless for this pipeline, but worth confirming.
                doc.setdefault("metadata", {})["namespace"] = namespace
            if kind in ("Deployment", "StatefulSet", "DaemonSet"):
                workloads[(namespace or "", name)] = {
                    "kind": kind,
                    "namespace": namespace or "",
                    "name": name,
                    "labels": _workload_labels(doc),
                    "serviceAccountName": ((doc.get("spec") or {}).get("template") or {})
                    .get("spec", {})
                    .get("serviceAccountName"),
                    "nodeSelector": ((doc.get("spec") or {}).get("template") or {})
                    .get("spec", {})
                    .get("nodeSelector", {}),
                    # Unique, sorted container images from the pod template.
                    "images": sorted(
                        {
                            c.get("image")
                            for c in (
                                (((doc.get("spec") or {}).get("template") or {}).get("spec") or {}).get(
                                    "containers"
                                )
                                or []
                            )
                            if isinstance(c, dict) and c.get("image")
                        }
                    ),
                }
            elif kind == "Service":
                services[(namespace or "", name)] = {
                    "namespace": namespace or "",
                    "name": name,
                    "type": (doc.get("spec") or {}).get("type", "ClusterIP"),
                    "selector": _service_selector(doc),
                    "ports": _service_ports(doc),
                }
            elif kind == "Ingress":
                ingresses.append({"source": src.name, "doc": doc})
            elif kind == "IngressRoute":
                ingressroutes.append({"source": src.name, "doc": doc})
            elif kind == "HelmRelease":
                # HelmReleases are only scanned for host-name hints in their
                # values; the chart itself is never rendered.
                spec = doc.get("spec") or {}
                vals = spec.get("values") or {}
                hosts = sorted(_safe_string_scan_for_hosts(vals))
                if hosts:
                    helmrelease_hosts[f"{src.name}:{namespace or ''}/{name}"] = hosts
    # Map services to workloads.
    service_to_workloads: dict[tuple[str, str], list[dict[str, str]]] = {}
    for (ns, svc_name), svc in services.items():
        selector = svc.get("selector") or {}
        matches: list[dict[str, str]] = []
        for (w_ns, w_name), w in workloads.items():
            # Selectors only match workloads in the same namespace.
            if w_ns != ns:
                continue
            if _selector_matches(selector, w.get("labels") or {}):
                matches.append({"kind": w["kind"], "name": w_name})
        service_to_workloads[(ns, svc_name)] = sorted(matches, key=lambda m: (m["kind"], m["name"]))
    # Extract HTTP endpoints.
    endpoints: list[dict[str, Any]] = []

    def add_endpoint(
        *,
        host: str,
        path: str,
        namespace: str,
        service: str,
        port: Any,
        source: str,
        kind: str,
        obj_name: str,
    ):
        # Record one host+path -> backend mapping, resolving the backend
        # Service to its selected workloads where possible.
        wk = service_to_workloads.get((namespace, service), [])
        endpoints.append(
            {
                "host": host,
                "path": path,
                "backend": {
                    "namespace": namespace,
                    "service": service,
                    "port": port,
                    "workloads": wk,
                },
                "via": {"kind": kind, "name": obj_name, "source": source},
            }
        )

    # networking.k8s.io Ingress: one endpoint per (rule host, path).
    for item in ingresses:
        doc = item["doc"]
        source = item["source"]
        name, namespace = _meta(doc)
        namespace = namespace or ""
        spec = doc.get("spec") or {}
        for rule in spec.get("rules") or []:
            if not isinstance(rule, dict):
                continue
            host = (rule.get("host") or "").strip()
            http = rule.get("http") or {}
            for p in http.get("paths") or []:
                if not isinstance(p, dict):
                    continue
                backend = (p.get("backend") or {}).get("service") or {}
                svc_name = backend.get("name")
                # An Ingress backend port may be given by number or by name.
                svc_port = (backend.get("port") or {}).get("number") or (backend.get("port") or {}).get("name")
                if not host or not svc_name:
                    continue
                add_endpoint(
                    host=host,
                    path=p.get("path") or "/",
                    namespace=namespace,
                    service=svc_name,
                    port=svc_port,
                    source=source,
                    kind="Ingress",
                    obj_name=name,
                )
    # Traefik IngressRoute: hosts and path prefixes live inside the match
    # expression, e.g. Host(`a.example`) && PathPrefix(`/x`).
    host_re = re.compile(r"Host\(`([^`]+)`\)")
    pathprefix_re = re.compile(r"PathPrefix\(`([^`]+)`\)")
    for item in ingressroutes:
        doc = item["doc"]
        source = item["source"]
        name, namespace = _meta(doc)
        namespace = namespace or ""
        spec = doc.get("spec") or {}
        for route in spec.get("routes") or []:
            if not isinstance(route, dict):
                continue
            match = route.get("match") or ""
            hosts = host_re.findall(match)
            # Default to "/" when the match has no PathPrefix clause.
            pathprefixes = pathprefix_re.findall(match) or ["/"]
            for svc in route.get("services") or []:
                if not isinstance(svc, dict):
                    continue
                svc_name = svc.get("name")
                svc_port = svc.get("port")
                if not svc_name:
                    continue
                # Cartesian product: every host x every path prefix.
                for host in hosts:
                    for pp in pathprefixes:
                        add_endpoint(
                            host=host,
                            path=pp,
                            namespace=namespace,
                            service=svc_name,
                            port=svc_port,
                            source=source,
                            kind="IngressRoute",
                            obj_name=name,
                        )
    # Deterministic ordering for stable git diffs.
    endpoints = sorted(
        endpoints,
        key=lambda e: (
            e["host"],
            e["path"],
            e["backend"]["namespace"],
            e["backend"]["service"],
        ),
    )
    catalog = {
        "cluster": "atlas",
        "sources": [
            {"name": k.name, "path": k.path, "targetNamespace": k.target_namespace}
            for k, _ in rendered
        ],
        "workloads": sorted(
            list(workloads.values()),
            key=lambda w: (w["namespace"], w["kind"], w["name"]),
        ),
        "services": sorted(
            list(services.values()),
            key=lambda s: (s["namespace"], s["name"]),
        ),
        "http_endpoints": endpoints,
        "helmrelease_host_hints": {k: v for k, v in sorted(helmrelease_hosts.items())},
    }
    # Mermaid diagram: host -> service -> workload (grouped by namespace).
    ns_nodes: dict[str, list[str]] = {}
    lines: list[str] = ["flowchart LR"]
    edges: set[tuple[str, str]] = set()

    def ensure_ns_node(ns: str, node_id: str):
        # Track node ids per namespace (insertion order) for the subgraphs.
        ns_nodes.setdefault(ns, [])
        if node_id not in ns_nodes[ns]:
            ns_nodes[ns].append(node_id)

    host_nodes: dict[str, str] = {}
    for ep in endpoints:
        host = ep["host"]
        host_id = host_nodes.get(host)
        if not host_id:
            # First sighting of this host: declare its node once.
            host_id = f"host_{_sanitize_node_id(host)}"
            host_nodes[host] = host_id
            lines.append(f' {host_id}["{host}"]')
        ns = ep["backend"]["namespace"]
        svc = ep["backend"]["service"]
        svc_id = f"svc_{_sanitize_node_id(ns)}_{_sanitize_node_id(svc)}"
        if svc_id not in ns_nodes.get(ns, []):
            lines.append(f' {svc_id}["{ns}/{svc} (Service)"]')
        ensure_ns_node(ns, svc_id)
        # De-duplicate edges so repeated endpoints do not repeat arrows.
        if (host_id, svc_id) not in edges:
            edges.add((host_id, svc_id))
            lines.append(f" {host_id} --> {svc_id}")
        for w in ep["backend"]["workloads"]:
            w_id = f"wl_{_sanitize_node_id(ns)}_{_sanitize_node_id(w['name'])}"
            if w_id not in ns_nodes.get(ns, []):
                lines.append(f' {w_id}["{ns}/{w["name"]} ({w["kind"]})"]')
            ensure_ns_node(ns, w_id)
            if (svc_id, w_id) not in edges:
                edges.add((svc_id, w_id))
                lines.append(f" {svc_id} --> {w_id}")
    # Wrap namespace subgraphs at the end for stability (sorted namespaces).
    if ns_nodes:
        lines.append("")
        for ns in sorted(ns_nodes.keys()):
            lines.append(f" subgraph { _sanitize_node_id(ns) }[{ns}]")
            for node_id in ns_nodes[ns]:
                lines.append(f" {node_id}")
            lines.append(" end")
    diagram = "\n".join(lines).rstrip() + "\n"
    summary = {
        "counts": {
            "workloads": len(workloads),
            "services": len(services),
            "http_endpoints": len(endpoints),
            "helmrelease_host_hints": sum(len(v) for v in helmrelease_hosts.values()),
        }
    }
    return catalog, summary, diagram
def main() -> int:
    """CLI entry point: render the knowledge catalog, diagram, and indexes.

    Returns a process exit code: 0 on success, 2 when no Flux
    Kustomizations are found.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--out", default="knowledge", help="Output base directory (default: knowledge/)")
    ap.add_argument(
        "--write",
        action="store_true",
        help="Write generated files (otherwise just print a summary).",
    )
    ap.add_argument(
        "--sync-atlasbot",
        action="store_true",
        help="Mirror rendered knowledge into services/atlasbot/knowledge for atlasbot.",
    )
    args = ap.parse_args()
    out_dir = REPO_ROOT / args.out
    flux = find_flux_kustomizations()
    if not flux:
        print("No Flux Kustomizations found under clusters/atlas/flux-system.", file=sys.stderr)
        return 2
    # Render every Flux Kustomization whose path exists in the worktree.
    rendered: list[tuple[FluxKustomization, list[dict[str, Any]]]] = []
    for k in flux:
        path = REPO_ROOT / k.path
        if not path.exists():
            continue
        raw = kustomize_build(path)
        # Secrets are dropped at parse time so they can never leak into output.
        docs = [d for d in _iter_docs(raw) if d.get("kind") != "Secret"]
        rendered.append((k, docs))
    rendered = sorted(rendered, key=lambda item: item[0].name)
    catalog, summary, diagram = extract_catalog(rendered)
    if not args.write:
        # Dry run: report counts only, write nothing.
        print(json.dumps(summary, indent=2, sort_keys=True))
        return 0
    (out_dir / "catalog").mkdir(parents=True, exist_ok=True)
    (out_dir / "diagrams").mkdir(parents=True, exist_ok=True)
    catalog_path = out_dir / "catalog" / "atlas.yaml"
    catalog_json_path = out_dir / "catalog" / "atlas.json"
    summary_path = out_dir / "catalog" / "atlas-summary.json"
    diagram_path = out_dir / "diagrams" / "atlas-http.mmd"
    runbooks_json_path = out_dir / "catalog" / "runbooks.json"
    metrics_json_path = out_dir / "catalog" / "metrics.json"
    catalog_rel = catalog_path.relative_to(REPO_ROOT).as_posix()
    # YAML catalog gets a generated-file header for readers browsing the repo.
    catalog_path.write_text(
        f"# {catalog_rel}\n"
        "# Generated by scripts/knowledge_render_atlas.py (do not edit by hand)\n"
        + yaml.safe_dump(catalog, sort_keys=False),
        encoding="utf-8",
    )
    catalog_json_path.write_text(json.dumps(catalog, indent=2, sort_keys=False) + "\n", encoding="utf-8")
    summary_path.write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n", encoding="utf-8")
    diagram_path.write_text(diagram, encoding="utf-8")
    # Render runbooks into JSON for lightweight, dependency-free consumption in-cluster.
    runbook_dirs = [
        out_dir / "runbooks",
        out_dir / "software",
    ]
    runbooks: list[dict[str, Any]] = []
    for runbooks_dir in runbook_dirs:
        if not runbooks_dir.exists():
            continue
        for md_file in sorted(runbooks_dir.glob("*.md")):
            raw = md_file.read_text(encoding="utf-8")
            fm: dict[str, Any] = {}
            body = raw
            # Best-effort YAML front-matter parse ("---\n...\n---\n<body>");
            # on any failure the whole file is treated as the body.
            if raw.startswith("---\n"):
                try:
                    _, rest = raw.split("---\n", 1)
                    fm_raw, body = rest.split("\n---\n", 1)
                    fm = yaml.safe_load(fm_raw) or {}
                except Exception:
                    fm = {}
                    body = raw
            runbooks.append(
                {
                    "path": str(md_file.relative_to(out_dir)),
                    "title": fm.get("title") or md_file.stem,
                    "tags": fm.get("tags") or [],
                    "entrypoints": fm.get("entrypoints") or [],
                    "source_paths": fm.get("source_paths") or [],
                    "body": body.strip(),
                }
            )
    runbooks_json_path.write_text(json.dumps(runbooks, indent=2, sort_keys=False) + "\n", encoding="utf-8")
    metrics_index = _extract_metrics_index(DASHBOARD_DIR)
    metrics_json_path.write_text(
        json.dumps(metrics_index, indent=2, sort_keys=False) + "\n", encoding="utf-8"
    )
    print(f"Wrote {catalog_path.relative_to(REPO_ROOT)}")
    print(f"Wrote {catalog_json_path.relative_to(REPO_ROOT)}")
    print(f"Wrote {summary_path.relative_to(REPO_ROOT)}")
    print(f"Wrote {diagram_path.relative_to(REPO_ROOT)}")
    print(f"Wrote {runbooks_json_path.relative_to(REPO_ROOT)}")
    print(f"Wrote {metrics_json_path.relative_to(REPO_ROOT)}")
    if args.sync_atlasbot:
        # Mirror the whole knowledge tree into the atlasbot service directory.
        atlasbot_dir = REPO_ROOT / "services" / "atlasbot" / "knowledge"
        _sync_tree(out_dir, atlasbot_dir)
        print(f"Synced {out_dir.relative_to(REPO_ROOT)} -> {atlasbot_dir.relative_to(REPO_ROOT)}")
    return 0
if __name__ == "__main__":
raise SystemExit(main())