# titan-iac/scripts/knowledge_render_atlas.py
# (557 lines, 19 KiB, Python — pasted file-listing metadata, commented out so the module parses)
#!/usr/bin/env python3
"""Render Atlas knowledge artifacts from Flux + kustomize manifests.
Outputs (committed to git for stable diffs + RAG):
- knowledge/catalog/*.yaml
- knowledge/diagrams/*.mmd
This is intentionally conservative:
- never includes Secret objects
- never includes secret values
- keeps output deterministic (sorted)
"""
from __future__ import annotations
import argparse
import json
import re
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Iterable
import yaml
# Repository root: parent of the scripts/ directory that holds this file.
REPO_ROOT = Path(__file__).resolve().parents[1]
# Kinds treated as cluster-scoped: these never get a namespace defaulted onto
# them from a Flux Kustomization's spec.targetNamespace (see extract_catalog).
CLUSTER_SCOPED_KINDS = {
    "Namespace",
    "Node",
    "CustomResourceDefinition",
    "ClusterRole",
    "ClusterRoleBinding",
    "StorageClass",
    "PersistentVolume",
    "MutatingWebhookConfiguration",
    "ValidatingWebhookConfiguration",
    "APIService",
}
# Only these kinds are harvested into the catalog; everything else is skipped.
INCLUDED_KINDS = {
    "Namespace",
    "Deployment",
    "StatefulSet",
    "DaemonSet",
    "Service",
    "Ingress",
    "IngressRoute",  # traefik
    "HelmRelease",  # only to harvest ingress hostnames from values
}
def _run(cmd: list[str], *, cwd: Path) -> str:
res = subprocess.run(cmd, cwd=cwd, capture_output=True, text=True, check=False)
if res.returncode != 0:
raise RuntimeError(
f"Command failed ({res.returncode}): {' '.join(cmd)}\n{res.stderr.strip()}"
)
return res.stdout
def kustomize_build(path: Path) -> str:
    """Render the kustomization at *path* and return the raw YAML stream.

    Tries `kubectl kustomize` first, retries with load restrictions disabled
    when the error indicates an out-of-tree reference, and finally falls back
    to the standalone `kustomize` binary.
    """
    rel = str(path.relative_to(REPO_ROOT))
    try:
        return _run(["kubectl", "kustomize", rel], cwd=REPO_ROOT)
    except Exception as exc:
        if "is not in or below" in str(exc):
            # Repo uses configMapGenerators that reference ../../scripts/*.py.
            # Kustomize load restriction must be disabled for a full render.
            try:
                return _run(
                    ["kubectl", "kustomize", "--load-restrictor=LoadRestrictionsNone", rel],
                    cwd=REPO_ROOT,
                )
            except Exception:
                pass
        # Last resort: standalone kustomize binary with restrictions disabled.
        return _run(["kustomize", "build", "--load-restrictor=LoadRestrictionsNone", rel], cwd=REPO_ROOT)
def _iter_docs(raw_yaml: str) -> Iterable[dict[str, Any]]:
    """Yield every manifest mapping in *raw_yaml*, flattening v1 List objects.

    Non-mapping documents and documents without a `kind` are skipped.
    """
    for doc in yaml.safe_load_all(raw_yaml):
        if not isinstance(doc, dict):
            continue
        kind = doc.get("kind")
        if kind == "List" and isinstance(doc.get("items"), list):
            yield from (item for item in doc["items"] if isinstance(item, dict))
        elif kind:
            yield doc
def _meta(doc: dict[str, Any]) -> tuple[str, str | None]:
md = doc.get("metadata") or {}
name = md.get("name") or ""
namespace = md.get("namespace")
return name, namespace
def _is_namespaced(doc: dict[str, Any]) -> bool:
    """True unless the doc's kind is one of the known cluster-scoped kinds."""
    return (doc.get("kind") or "") not in CLUSTER_SCOPED_KINDS
@dataclass(frozen=True)
class FluxKustomization:
    """A Flux `Kustomization` CR reduced to the fields this script consumes."""

    # metadata.name of the Kustomization CR.
    name: str
    # spec.path, normalized relative to the repo root (leading "./" stripped).
    path: str
    # spec.targetNamespace; applied to namespaced docs missing metadata.namespace.
    target_namespace: str | None
def find_flux_kustomizations() -> list[FluxKustomization]:
"""Find Flux Kustomization CRs under clusters/atlas/flux-system."""
root = REPO_ROOT / "clusters" / "atlas" / "flux-system"
items: list[FluxKustomization] = []
for file in sorted(root.rglob("*.yaml")):
raw = file.read_text()
for doc in _iter_docs(raw):
if doc.get("kind") != "Kustomization":
continue
api = str(doc.get("apiVersion") or "")
if not api.startswith("kustomize.toolkit.fluxcd.io/"):
continue
name, _ = _meta(doc)
spec = doc.get("spec") or {}
path = spec.get("path")
if not isinstance(path, str) or not path.strip():
continue
items.append(
FluxKustomization(
name=name,
path=path.strip().lstrip("./"),
target_namespace=spec.get("targetNamespace"),
)
)
return sorted(items, key=lambda k: k.name)
def _safe_string_scan_for_hosts(value: Any) -> set[str]:
"""Best-effort host scan from HelmRelease values without chart rendering."""
hosts: set[str] = set()
if isinstance(value, str):
for m in re.finditer(r"(?i)([a-z0-9-]+(?:\.[a-z0-9-]+)+)", value):
host = m.group(1).lower()
if host.endswith("bstein.dev"):
hosts.add(host)
return hosts
if isinstance(value, list):
for item in value:
hosts |= _safe_string_scan_for_hosts(item)
return hosts
if isinstance(value, dict):
for item in value.values():
hosts |= _safe_string_scan_for_hosts(item)
return hosts
return hosts
def _service_ports(svc: dict[str, Any]) -> list[dict[str, Any]]:
spec = svc.get("spec") or {}
out: list[dict[str, Any]] = []
for p in spec.get("ports") or []:
if not isinstance(p, dict):
continue
out.append(
{
"name": p.get("name"),
"port": p.get("port"),
"targetPort": p.get("targetPort"),
"protocol": p.get("protocol", "TCP"),
}
)
return out
def _workload_labels(doc: dict[str, Any]) -> dict[str, str]:
tpl = (doc.get("spec") or {}).get("template") or {}
md = tpl.get("metadata") or {}
labels = md.get("labels") or {}
return {str(k): str(v) for k, v in labels.items()} if isinstance(labels, dict) else {}
def _service_selector(doc: dict[str, Any]) -> dict[str, str]:
spec = doc.get("spec") or {}
sel = spec.get("selector") or {}
return {str(k): str(v) for k, v in sel.items()} if isinstance(sel, dict) else {}
def _selector_matches(selector: dict[str, str], labels: dict[str, str]) -> bool:
if not selector:
return False
return all(labels.get(k) == v for k, v in selector.items())
def _sanitize_node_id(text: str) -> str:
return re.sub(r"[^a-zA-Z0-9_]", "_", text)
def extract_catalog(
    rendered: list[tuple[FluxKustomization, list[dict[str, Any]]]],
) -> tuple[dict[str, Any], dict[str, Any], str]:
    """Build knowledge catalog + mermaid diagram from rendered docs.

    Returns ``(catalog, summary, diagram)``:
    - catalog: sources, workloads, services, http_endpoints, and HelmRelease
      host hints — everything sorted for stable diffs.
    - summary: counts only.
    - diagram: mermaid flowchart text (host -> service -> workload).
    """
    # Index workloads and services for mapping, keyed by (namespace, name).
    workloads: dict[tuple[str, str], dict[str, Any]] = {}
    services: dict[tuple[str, str], dict[str, Any]] = {}
    ingresses: list[dict[str, Any]] = []
    ingressroutes: list[dict[str, Any]] = []
    # "source:namespace/name" -> sorted host hints scanned from HelmRelease values.
    helmrelease_hosts: dict[str, list[str]] = {}
    for src, docs in rendered:
        for doc in docs:
            kind = doc.get("kind")
            if kind not in INCLUDED_KINDS:
                continue
            # NOTE(review): defensive only — "Secret" is not in INCLUDED_KINDS,
            # so this branch is unreachable (main() also filters Secrets).
            if kind == "Secret":
                continue
            name, namespace = _meta(doc)
            # Mirror Flux behavior: spec.targetNamespace fills in a missing
            # metadata.namespace on namespaced objects.
            if _is_namespaced(doc) and not namespace and src.target_namespace:
                namespace = src.target_namespace
                # NOTE(review): dict(doc) is a shallow copy, so setdefault()
                # below still mutates the original metadata mapping when one
                # exists — harmless here since docs aren't reused, but confirm
                # before reusing `rendered` docs elsewhere.
                doc = dict(doc)
                doc.setdefault("metadata", {})["namespace"] = namespace
            if kind in ("Deployment", "StatefulSet", "DaemonSet"):
                workloads[(namespace or "", name)] = {
                    "kind": kind,
                    "namespace": namespace or "",
                    "name": name,
                    "labels": _workload_labels(doc),
                    "serviceAccountName": ((doc.get("spec") or {}).get("template") or {})
                    .get("spec", {})
                    .get("serviceAccountName"),
                    "nodeSelector": ((doc.get("spec") or {}).get("template") or {})
                    .get("spec", {})
                    .get("nodeSelector", {}),
                    # Deduplicated, sorted container images from the pod template.
                    "images": sorted(
                        {
                            c.get("image")
                            for c in (
                                (((doc.get("spec") or {}).get("template") or {}).get("spec") or {}).get(
                                    "containers"
                                )
                                or []
                            )
                            if isinstance(c, dict) and c.get("image")
                        }
                    ),
                }
            elif kind == "Service":
                services[(namespace or "", name)] = {
                    "namespace": namespace or "",
                    "name": name,
                    "type": (doc.get("spec") or {}).get("type", "ClusterIP"),
                    "selector": _service_selector(doc),
                    "ports": _service_ports(doc),
                }
            elif kind == "Ingress":
                ingresses.append({"source": src.name, "doc": doc})
            elif kind == "IngressRoute":
                ingressroutes.append({"source": src.name, "doc": doc})
            elif kind == "HelmRelease":
                # Charts are not rendered; only scan values for host-like strings.
                spec = doc.get("spec") or {}
                vals = spec.get("values") or {}
                hosts = sorted(_safe_string_scan_for_hosts(vals))
                if hosts:
                    helmrelease_hosts[f"{src.name}:{namespace or ''}/{name}"] = hosts
    # Map services to workloads: same namespace + selector matches pod labels.
    service_to_workloads: dict[tuple[str, str], list[dict[str, str]]] = {}
    for (ns, svc_name), svc in services.items():
        selector = svc.get("selector") or {}
        matches: list[dict[str, str]] = []
        for (w_ns, w_name), w in workloads.items():
            if w_ns != ns:
                continue
            if _selector_matches(selector, w.get("labels") or {}):
                matches.append({"kind": w["kind"], "name": w_name})
        service_to_workloads[(ns, svc_name)] = sorted(matches, key=lambda m: (m["kind"], m["name"]))
    # Extract HTTP endpoints.
    endpoints: list[dict[str, Any]] = []

    def add_endpoint(
        *,
        host: str,
        path: str,
        namespace: str,
        service: str,
        port: Any,
        source: str,
        kind: str,
        obj_name: str,
    ) -> None:
        # Record one host+path -> backend mapping, including the workloads
        # backing the service (empty list when no selector matched).
        wk = service_to_workloads.get((namespace, service), [])
        endpoints.append(
            {
                "host": host,
                "path": path,
                "backend": {
                    "namespace": namespace,
                    "service": service,
                    "port": port,
                    "workloads": wk,
                },
                "via": {"kind": kind, "name": obj_name, "source": source},
            }
        )

    # networking.k8s.io Ingress: spec.rules[].http.paths[].backend.service.
    for item in ingresses:
        doc = item["doc"]
        source = item["source"]
        name, namespace = _meta(doc)
        namespace = namespace or ""
        spec = doc.get("spec") or {}
        for rule in spec.get("rules") or []:
            if not isinstance(rule, dict):
                continue
            host = (rule.get("host") or "").strip()
            http = rule.get("http") or {}
            for p in http.get("paths") or []:
                if not isinstance(p, dict):
                    continue
                backend = (p.get("backend") or {}).get("service") or {}
                svc_name = backend.get("name")
                # Prefer the numeric port; fall back to the named port.
                svc_port = (backend.get("port") or {}).get("number") or (backend.get("port") or {}).get("name")
                if not host or not svc_name:
                    continue
                add_endpoint(
                    host=host,
                    path=p.get("path") or "/",
                    namespace=namespace,
                    service=svc_name,
                    port=svc_port,
                    source=source,
                    kind="Ingress",
                    obj_name=name,
                )
    # Traefik IngressRoute: pull Host(`...`)/PathPrefix(`...`) out of match rules.
    host_re = re.compile(r"Host\(`([^`]+)`\)")
    pathprefix_re = re.compile(r"PathPrefix\(`([^`]+)`\)")
    for item in ingressroutes:
        doc = item["doc"]
        source = item["source"]
        name, namespace = _meta(doc)
        namespace = namespace or ""
        spec = doc.get("spec") or {}
        for route in spec.get("routes") or []:
            if not isinstance(route, dict):
                continue
            match = route.get("match") or ""
            hosts = host_re.findall(match)
            # A host-only match implies the root path.
            pathprefixes = pathprefix_re.findall(match) or ["/"]
            for svc in route.get("services") or []:
                if not isinstance(svc, dict):
                    continue
                svc_name = svc.get("name")
                svc_port = svc.get("port")
                if not svc_name:
                    continue
                # One endpoint per (host, prefix) combination.
                for host in hosts:
                    for pp in pathprefixes:
                        add_endpoint(
                            host=host,
                            path=pp,
                            namespace=namespace,
                            service=svc_name,
                            port=svc_port,
                            source=source,
                            kind="IngressRoute",
                            obj_name=name,
                        )
    # Deterministic ordering for stable diffs.
    endpoints = sorted(
        endpoints,
        key=lambda e: (
            e["host"],
            e["path"],
            e["backend"]["namespace"],
            e["backend"]["service"],
        ),
    )
    catalog = {
        "cluster": "atlas",
        "sources": [
            {"name": k.name, "path": k.path, "targetNamespace": k.target_namespace}
            for k, _ in rendered
        ],
        "workloads": sorted(
            list(workloads.values()),
            key=lambda w: (w["namespace"], w["kind"], w["name"]),
        ),
        "services": sorted(
            list(services.values()),
            key=lambda s: (s["namespace"], s["name"]),
        ),
        "http_endpoints": endpoints,
        "helmrelease_host_hints": {k: v for k, v in sorted(helmrelease_hosts.items())},
    }
    # Mermaid diagram: host -> service -> workload (grouped by namespace).
    ns_nodes: dict[str, list[str]] = {}
    lines: list[str] = ["flowchart LR"]
    edges: set[tuple[str, str]] = set()

    def ensure_ns_node(ns: str, node_id: str) -> None:
        # Track node ids per namespace (insertion order) for subgraph wrapping.
        ns_nodes.setdefault(ns, [])
        if node_id not in ns_nodes[ns]:
            ns_nodes[ns].append(node_id)

    host_nodes: dict[str, str] = {}
    for ep in endpoints:
        host = ep["host"]
        host_id = host_nodes.get(host)
        if not host_id:
            # First sighting of this host: declare its node.
            host_id = f"host_{_sanitize_node_id(host)}"
            host_nodes[host] = host_id
            lines.append(f' {host_id}["{host}"]')
        ns = ep["backend"]["namespace"]
        svc = ep["backend"]["service"]
        svc_id = f"svc_{_sanitize_node_id(ns)}_{_sanitize_node_id(svc)}"
        if svc_id not in ns_nodes.get(ns, []):
            lines.append(f' {svc_id}["{ns}/{svc} (Service)"]')
        ensure_ns_node(ns, svc_id)
        # Deduplicate edges across endpoints sharing a host/service pair.
        if (host_id, svc_id) not in edges:
            edges.add((host_id, svc_id))
            lines.append(f" {host_id} --> {svc_id}")
        for w in ep["backend"]["workloads"]:
            w_id = f"wl_{_sanitize_node_id(ns)}_{_sanitize_node_id(w['name'])}"
            if w_id not in ns_nodes.get(ns, []):
                lines.append(f' {w_id}["{ns}/{w["name"]} ({w["kind"]})"]')
            ensure_ns_node(ns, w_id)
            if (svc_id, w_id) not in edges:
                edges.add((svc_id, w_id))
                lines.append(f" {svc_id} --> {w_id}")
    # Wrap namespace subgraphs at the end for stability (sorted namespaces).
    if ns_nodes:
        lines.append("")
        for ns in sorted(ns_nodes.keys()):
            lines.append(f" subgraph { _sanitize_node_id(ns) }[{ns}]")
            for node_id in ns_nodes[ns]:
                lines.append(f" {node_id}")
            lines.append(" end")
    diagram = "\n".join(lines).rstrip() + "\n"
    summary = {
        "counts": {
            "workloads": len(workloads),
            "services": len(services),
            "http_endpoints": len(endpoints),
            "helmrelease_host_hints": sum(len(v) for v in helmrelease_hosts.values()),
        }
    }
    return catalog, summary, diagram
def _collect_runbooks(out_dir: Path) -> list[dict[str, Any]]:
    """Parse <out_dir>/runbooks/*.md into JSON-able records.

    YAML front matter (`--- ... ---`) is parsed best-effort; on any parse
    failure — or when the front matter is not a mapping — metadata falls back
    to defaults and the whole file is kept as the body.  (Previously a
    non-mapping front matter document crashed later with AttributeError on
    `fm.get`.)
    """
    runbooks: list[dict[str, Any]] = []
    runbooks_dir = out_dir / "runbooks"
    if not runbooks_dir.exists():
        return runbooks
    for md_file in sorted(runbooks_dir.glob("*.md")):
        raw = md_file.read_text(encoding="utf-8")
        fm: dict[str, Any] = {}
        body = raw
        if raw.startswith("---\n"):
            try:
                _, rest = raw.split("---\n", 1)
                fm_raw, body = rest.split("\n---\n", 1)
                fm = yaml.safe_load(fm_raw) or {}
                if not isinstance(fm, dict):
                    fm = {}
                    body = raw
            except Exception:
                fm = {}
                body = raw
        runbooks.append(
            {
                "path": str(md_file.relative_to(out_dir)),
                "title": fm.get("title") or md_file.stem,
                "tags": fm.get("tags") or [],
                "entrypoints": fm.get("entrypoints") or [],
                "source_paths": fm.get("source_paths") or [],
                "body": body.strip(),
            }
        )
    return runbooks


def main() -> int:
    """CLI entry point: render catalog, diagram, and runbook JSON artifacts.

    Without --write, only a JSON count summary is printed.  Returns a process
    exit code (2 when no Flux Kustomizations are found).
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--out", default="knowledge", help="Output base directory (default: knowledge/)")
    ap.add_argument(
        "--write",
        action="store_true",
        help="Write generated files (otherwise just print a summary).",
    )
    args = ap.parse_args()
    out_dir = REPO_ROOT / args.out
    flux = find_flux_kustomizations()
    if not flux:
        print("No Flux Kustomizations found under clusters/atlas/flux-system.", file=sys.stderr)
        return 2
    rendered: list[tuple[FluxKustomization, list[dict[str, Any]]]] = []
    for k in flux:
        path = REPO_ROOT / k.path
        if not path.exists():
            continue
        raw = kustomize_build(path)
        # Secrets are dropped up front so no secret data ever reaches outputs.
        docs = [d for d in _iter_docs(raw) if d.get("kind") != "Secret"]
        rendered.append((k, docs))
    rendered = sorted(rendered, key=lambda item: item[0].name)
    catalog, summary, diagram = extract_catalog(rendered)
    if not args.write:
        print(json.dumps(summary, indent=2, sort_keys=True))
        return 0
    (out_dir / "catalog").mkdir(parents=True, exist_ok=True)
    (out_dir / "diagrams").mkdir(parents=True, exist_ok=True)
    catalog_path = out_dir / "catalog" / "atlas.yaml"
    catalog_json_path = out_dir / "catalog" / "atlas.json"
    summary_path = out_dir / "catalog" / "atlas-summary.json"
    diagram_path = out_dir / "diagrams" / "atlas-http.mmd"
    runbooks_json_path = out_dir / "catalog" / "runbooks.json"
    catalog_rel = catalog_path.relative_to(REPO_ROOT).as_posix()
    catalog_path.write_text(
        f"# {catalog_rel}\n"
        "# Generated by scripts/knowledge_render_atlas.py (do not edit by hand)\n"
        + yaml.safe_dump(catalog, sort_keys=False),
        encoding="utf-8",
    )
    catalog_json_path.write_text(json.dumps(catalog, indent=2, sort_keys=False) + "\n", encoding="utf-8")
    summary_path.write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n", encoding="utf-8")
    diagram_path.write_text(diagram, encoding="utf-8")
    # Render runbooks into JSON for lightweight, dependency-free consumption in-cluster.
    runbooks = _collect_runbooks(out_dir)
    runbooks_json_path.write_text(json.dumps(runbooks, indent=2, sort_keys=False) + "\n", encoding="utf-8")
    for written in (catalog_path, catalog_json_path, summary_path, diagram_path, runbooks_json_path):
        print(f"Wrote {written.relative_to(REPO_ROOT)}")
    return 0
if __name__ == "__main__":
    # SystemExit propagates main()'s return code as the process exit status.
    raise SystemExit(main())