#!/usr/bin/env python3
"""Render Atlas knowledge artifacts from Flux + kustomize manifests.

Outputs (committed to git for stable diffs + RAG):
- knowledge/catalog/*.yaml
- knowledge/catalog/*.json
- knowledge/diagrams/*.mmd

This is intentionally conservative:
- never includes Secret objects
- never includes secret values
- keeps output deterministic (sorted)
"""

from __future__ import annotations

import argparse
import json
import re
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Iterable

import yaml

REPO_ROOT = Path(__file__).resolve().parents[1]

CLUSTER_SCOPED_KINDS = {
    "Namespace",
    "Node",
    "CustomResourceDefinition",
    "ClusterRole",
    "ClusterRoleBinding",
    "StorageClass",
    "PersistentVolume",
    "MutatingWebhookConfiguration",
    "ValidatingWebhookConfiguration",
    "APIService",
}

INCLUDED_KINDS = {
    "Namespace",
    "Deployment",
    "StatefulSet",
    "DaemonSet",
    "Service",
    "Ingress",
    "IngressRoute",  # traefik
    "HelmRelease",  # only to harvest ingress hostnames from values
}

# Kinds that carry a pod template and are indexed as workloads.
_WORKLOAD_KINDS = ("Deployment", "StatefulSet", "DaemonSet")

# Only hostnames equal to, or below, this domain are kept as HelmRelease hints.
_HOST_DOMAIN = "bstein.dev"

_HOSTNAME_RE = re.compile(r"(?i)([a-z0-9-]+(?:\.[a-z0-9-]+)+)")
_TRAEFIK_HOST_RE = re.compile(r"Host\(`([^`]+)`\)")
_TRAEFIK_PATHPREFIX_RE = re.compile(r"PathPrefix\(`([^`]+)`\)")


def _run(cmd: list[str], *, cwd: Path) -> str:
    """Run *cmd* in *cwd* and return its stdout.

    Raises:
        RuntimeError: if the command exits non-zero; the message carries the
            exit code, the command line and the stripped stderr.
    """
    res = subprocess.run(cmd, cwd=cwd, capture_output=True, text=True, check=False)
    if res.returncode != 0:
        raise RuntimeError(
            f"Command failed ({res.returncode}): {' '.join(cmd)}\n{res.stderr.strip()}"
        )
    return res.stdout


def kustomize_build(path: Path) -> str:
    """Render the kustomization at *path* (which must be below REPO_ROOT).

    Tries ``kubectl kustomize`` first. On failure it retries with load
    restrictions disabled when the error text says so, and finally falls back
    to the standalone ``kustomize`` binary. An error from that last attempt
    propagates to the caller.
    """
    rel = path.relative_to(REPO_ROOT)
    try:
        return _run(["kubectl", "kustomize", str(rel)], cwd=REPO_ROOT)
    except Exception as e:  # deliberate best-effort: any failure triggers the fallbacks
        msg = str(e)
        if "is not in or below" in msg:
            # Repo uses configMapGenerators that reference ../../scripts/*.py.
            # Kustomize load restriction must be disabled for a full render.
            try:
                return _run(
                    ["kubectl", "kustomize", "--load-restrictor=LoadRestrictionsNone", str(rel)],
                    cwd=REPO_ROOT,
                )
            except Exception:
                # Fall through to the standalone kustomize binary below.
                pass
        return _run(
            ["kustomize", "build", "--load-restrictor=LoadRestrictionsNone", str(rel)],
            cwd=REPO_ROOT,
        )


def _iter_docs(raw_yaml: str) -> Iterable[dict[str, Any]]:
    """Yield every mapping document in *raw_yaml* that has a truthy ``kind``.

    ``kind: List`` documents with a list under ``items`` are flattened: their
    mapping items are yielded instead of the wrapper (items are not filtered
    on ``kind``). Non-mapping documents are skipped.
    """
    for doc in yaml.safe_load_all(raw_yaml):
        if not isinstance(doc, dict):
            continue
        kind = doc.get("kind")
        if kind == "List" and isinstance(doc.get("items"), list):
            for item in doc["items"]:
                if isinstance(item, dict):
                    yield item
            continue
        if kind:
            yield doc


def _meta(doc: dict[str, Any]) -> tuple[str, str | None]:
    """Return ``(name, namespace)`` from the document metadata.

    A missing name becomes ``""``; a missing namespace stays ``None``.
    """
    md = doc.get("metadata") or {}
    name = md.get("name") or ""
    namespace = md.get("namespace")
    return name, namespace


def _is_namespaced(doc: dict[str, Any]) -> bool:
    """Return True unless the document's kind is in CLUSTER_SCOPED_KINDS."""
    kind = doc.get("kind") or ""
    return kind not in CLUSTER_SCOPED_KINDS


@dataclass(frozen=True)
class FluxKustomization:
    """The fields of a Flux Kustomization CR that this script uses."""

    name: str
    path: str  # spec.path, normalized to be relative to the repo root
    target_namespace: str | None  # spec.targetNamespace, if set


def _normalize_repo_path(path: str) -> str:
    """Make a Flux ``spec.path`` relative to the repo root.

    Removes surrounding whitespace and any leading ``./`` or ``/`` prefixes.
    Unlike ``str.lstrip("./")`` this never eats the leading dot of a
    component such as ``.github`` or ``..``.
    """
    cleaned = path.strip()
    while True:
        if cleaned.startswith("./"):
            cleaned = cleaned[2:]
        elif cleaned.startswith("/"):
            cleaned = cleaned[1:]
        else:
            break
    # A bare "." means the repo root itself.
    return "" if cleaned == "." else cleaned


def find_flux_kustomizations() -> list[FluxKustomization]:
    """Find Flux Kustomization CRs under clusters/atlas/flux-system.

    Only documents with kind ``Kustomization`` and an apiVersion starting with
    ``kustomize.toolkit.fluxcd.io/`` that have a non-blank string
    ``spec.path`` are returned, sorted by name.
    """
    root = REPO_ROOT / "clusters" / "atlas" / "flux-system"
    items: list[FluxKustomization] = []
    for file in sorted(root.rglob("*.yaml")):
        raw = file.read_text(encoding="utf-8")
        for doc in _iter_docs(raw):
            if doc.get("kind") != "Kustomization":
                continue
            api = str(doc.get("apiVersion") or "")
            if not api.startswith("kustomize.toolkit.fluxcd.io/"):
                continue
            name, _ = _meta(doc)
            spec = doc.get("spec") or {}
            path = spec.get("path")
            if not isinstance(path, str) or not path.strip():
                continue
            items.append(
                FluxKustomization(
                    name=name,
                    path=_normalize_repo_path(path),
                    target_namespace=spec.get("targetNamespace"),
                )
            )
    return sorted(items, key=lambda k: k.name)


def _safe_string_scan_for_hosts(value: Any) -> set[str]:
    """Best-effort host scan from HelmRelease values without chart rendering.

    Walks strings, lists and dict values recursively and returns the
    lower-cased hostnames that are the configured domain or a subdomain of
    it. Dict keys and other value types are ignored.
    """
    hosts: set[str] = set()
    if isinstance(value, str):
        for m in _HOSTNAME_RE.finditer(value):
            host = m.group(1).lower()
            # Require a label boundary so e.g. "notbstein.dev" is rejected.
            if host == _HOST_DOMAIN or host.endswith("." + _HOST_DOMAIN):
                hosts.add(host)
    elif isinstance(value, list):
        for item in value:
            hosts |= _safe_string_scan_for_hosts(item)
    elif isinstance(value, dict):
        for item in value.values():
            hosts |= _safe_string_scan_for_hosts(item)
    return hosts


def _service_ports(svc: dict[str, Any]) -> list[dict[str, Any]]:
    """Return the Service's ports as name/port/targetPort/protocol dicts.

    ``protocol`` defaults to ``"TCP"`` when the key is absent; non-mapping
    port entries are skipped.
    """
    spec = svc.get("spec") or {}
    out: list[dict[str, Any]] = []
    for p in spec.get("ports") or []:
        if not isinstance(p, dict):
            continue
        out.append(
            {
                "name": p.get("name"),
                "port": p.get("port"),
                "targetPort": p.get("targetPort"),
                "protocol": p.get("protocol", "TCP"),
            }
        )
    return out


def _pod_spec(doc: dict[str, Any]) -> dict[str, Any]:
    """Return ``spec.template.spec`` of a workload, or ``{}`` if absent/null."""
    tpl = (doc.get("spec") or {}).get("template") or {}
    spec = tpl.get("spec") or {}
    return spec if isinstance(spec, dict) else {}


def _workload_labels(doc: dict[str, Any]) -> dict[str, str]:
    """Return the pod template labels with keys and values coerced to str."""
    tpl = (doc.get("spec") or {}).get("template") or {}
    md = tpl.get("metadata") or {}
    labels = md.get("labels") or {}
    return {str(k): str(v) for k, v in labels.items()} if isinstance(labels, dict) else {}


def _service_selector(doc: dict[str, Any]) -> dict[str, str]:
    """Return ``spec.selector`` with keys and values coerced to str."""
    spec = doc.get("spec") or {}
    sel = spec.get("selector") or {}
    return {str(k): str(v) for k, v in sel.items()} if isinstance(sel, dict) else {}


def _selector_matches(selector: dict[str, str], labels: dict[str, str]) -> bool:
    """Return True when every selector pair is present in *labels*.

    An empty selector matches nothing.
    """
    if not selector:
        return False
    return all(labels.get(k) == v for k, v in selector.items())


def _sanitize_node_id(text: str) -> str:
    """Replace every character outside ``[a-zA-Z0-9_]`` with ``_``."""
    return re.sub(r"[^a-zA-Z0-9_]", "_", text)


def _with_namespace(doc: dict[str, Any], namespace: str) -> dict[str, Any]:
    """Return a copy of *doc* whose metadata carries *namespace*.

    Both the document and its metadata mapping are copied so the rendered
    input is left untouched; a missing or null metadata is treated as empty.
    """
    patched = dict(doc)
    md = dict(doc.get("metadata") or {})
    md["namespace"] = namespace
    patched["metadata"] = md
    return patched


def _workload_entry(kind: str, namespace: str, name: str, doc: dict[str, Any]) -> dict[str, Any]:
    """Build the catalog record for a Deployment/StatefulSet/DaemonSet."""
    pod = _pod_spec(doc)
    images = sorted(
        {
            c.get("image")
            for c in (pod.get("containers") or [])
            if isinstance(c, dict) and c.get("image")
        }
    )
    return {
        "kind": kind,
        "namespace": namespace,
        "name": name,
        "labels": _workload_labels(doc),
        "serviceAccountName": pod.get("serviceAccountName"),
        "nodeSelector": pod.get("nodeSelector") or {},
        "images": images,
    }


def _make_endpoint(
    *,
    host: str,
    path: str,
    namespace: str,
    service: str,
    port: Any,
    source: str,
    kind: str,
    obj_name: str,
    service_to_workloads: dict[tuple[str, str], list[dict[str, str]]],
) -> dict[str, Any]:
    """Build one ``http_endpoints`` record.

    NOTE(review): the workloads list is attached by reference, so endpoints
    that share a backend share one list object. PyYAML presumably emits
    anchors/aliases for such repeats in the YAML catalog - confirm before
    changing this to a copy, as that would alter the committed output.
    """
    return {
        "host": host,
        "path": path,
        "backend": {
            "namespace": namespace,
            "service": service,
            "port": port,
            "workloads": service_to_workloads.get((namespace, service), []),
        },
        "via": {"kind": kind, "name": obj_name, "source": source},
    }


def _ingress_endpoints(
    ingresses: list[tuple[str, dict[str, Any]]],
    service_to_workloads: dict[tuple[str, str], list[dict[str, str]]],
) -> list[dict[str, Any]]:
    """Collect endpoints from Ingress rules.

    A path is skipped when its rule has no host or its backend names no
    service. The port is the backend's ``number``, else its ``name``.
    """
    endpoints: list[dict[str, Any]] = []
    for source, doc in ingresses:
        name, namespace = _meta(doc)
        spec = doc.get("spec") or {}
        for rule in spec.get("rules") or []:
            if not isinstance(rule, dict):
                continue
            host = (rule.get("host") or "").strip()
            http = rule.get("http") or {}
            for p in http.get("paths") or []:
                if not isinstance(p, dict):
                    continue
                backend = (p.get("backend") or {}).get("service") or {}
                svc_name = backend.get("name")
                port = backend.get("port") or {}
                svc_port = port.get("number") or port.get("name")
                if not host or not svc_name:
                    continue
                endpoints.append(
                    _make_endpoint(
                        host=host,
                        path=p.get("path") or "/",
                        namespace=namespace or "",
                        service=svc_name,
                        port=svc_port,
                        source=source,
                        kind="Ingress",
                        obj_name=name,
                        service_to_workloads=service_to_workloads,
                    )
                )
    return endpoints


def _ingressroute_endpoints(
    ingressroutes: list[tuple[str, dict[str, Any]]],
    service_to_workloads: dict[tuple[str, str], list[dict[str, str]]],
) -> list[dict[str, Any]]:
    """Collect endpoints from Traefik IngressRoute ``match`` expressions.

    Emits one endpoint per (host, path prefix, service) combination; a route
    without any ``PathPrefix(...)`` uses ``/``. Routes whose match contains no
    ``Host(`...`)`` produce nothing.
    """
    endpoints: list[dict[str, Any]] = []
    for source, doc in ingressroutes:
        name, namespace = _meta(doc)
        spec = doc.get("spec") or {}
        for route in spec.get("routes") or []:
            if not isinstance(route, dict):
                continue
            match = route.get("match") or ""
            hosts = _TRAEFIK_HOST_RE.findall(match)
            pathprefixes = _TRAEFIK_PATHPREFIX_RE.findall(match) or ["/"]
            for svc in route.get("services") or []:
                if not isinstance(svc, dict):
                    continue
                svc_name = svc.get("name")
                if not svc_name:
                    continue
                for host in hosts:
                    for pp in pathprefixes:
                        endpoints.append(
                            _make_endpoint(
                                host=host,
                                path=pp,
                                namespace=namespace or "",
                                service=svc_name,
                                port=svc.get("port"),
                                source=source,
                                kind="IngressRoute",
                                obj_name=name,
                                service_to_workloads=service_to_workloads,
                            )
                        )
    return endpoints


def _render_mermaid(endpoints: list[dict[str, Any]]) -> str:
    """Render the host -> service -> workload flowchart for *endpoints*.

    Nodes and edges are emitted once each, in endpoint order; namespace
    subgraphs are appended at the end in sorted namespace order.
    """
    lines: list[str] = ["flowchart LR"]
    ns_nodes: dict[str, list[str]] = {}
    edges: set[tuple[str, str]] = set()
    host_nodes: dict[str, str] = {}

    def declare_node(ns: str, node_id: str, label: str) -> None:
        # Declare the node on first sight and remember it for its subgraph.
        members = ns_nodes.setdefault(ns, [])
        if node_id not in members:
            lines.append(f' {node_id}["{label}"]')
            members.append(node_id)

    def declare_edge(src_id: str, dst_id: str) -> None:
        if (src_id, dst_id) not in edges:
            edges.add((src_id, dst_id))
            lines.append(f" {src_id} --> {dst_id}")

    for ep in endpoints:
        host = ep["host"]
        host_id = host_nodes.get(host)
        if not host_id:
            host_id = f"host_{_sanitize_node_id(host)}"
            host_nodes[host] = host_id
            lines.append(f' {host_id}["{host}"]')

        ns = ep["backend"]["namespace"]
        svc = ep["backend"]["service"]
        svc_id = f"svc_{_sanitize_node_id(ns)}_{_sanitize_node_id(svc)}"
        declare_node(ns, svc_id, f"{ns}/{svc} (Service)")
        declare_edge(host_id, svc_id)

        for w in ep["backend"]["workloads"]:
            w_id = f"wl_{_sanitize_node_id(ns)}_{_sanitize_node_id(w['name'])}"
            declare_node(ns, w_id, f"{ns}/{w['name']} ({w['kind']})")
            declare_edge(svc_id, w_id)

    # Wrap namespace subgraphs at the end for stability (sorted namespaces).
    if ns_nodes:
        lines.append("")
    for ns in sorted(ns_nodes):
        lines.append(f" subgraph {_sanitize_node_id(ns)}[{ns}]")
        for node_id in ns_nodes[ns]:
            lines.append(f" {node_id}")
        lines.append(" end")

    return "\n".join(lines).rstrip() + "\n"


def extract_catalog(
    rendered: list[tuple[FluxKustomization, list[dict[str, Any]]]],
) -> tuple[dict[str, Any], dict[str, Any], str]:
    """Build knowledge catalog + mermaid diagram from rendered docs.

    Args:
        rendered: ``(kustomization, documents)`` pairs, one per rendered Flux
            Kustomization. The documents are not modified.

    Returns:
        ``(catalog, summary, diagram)``: the catalog mapping, a mapping of
        counts, and the Mermaid flowchart source.
    """
    # Index workloads and services for mapping.
    workloads: dict[tuple[str, str], dict[str, Any]] = {}
    services: dict[tuple[str, str], dict[str, Any]] = {}
    ingresses: list[tuple[str, dict[str, Any]]] = []
    ingressroutes: list[tuple[str, dict[str, Any]]] = []
    helmrelease_hosts: dict[str, list[str]] = {}

    for src, docs in rendered:
        for doc in docs:
            kind = doc.get("kind")
            # "Secret" is not in INCLUDED_KINDS today; the explicit check
            # keeps Secrets out even if that set is edited later.
            if kind not in INCLUDED_KINDS or kind == "Secret":
                continue
            name, namespace = _meta(doc)
            if _is_namespaced(doc) and not namespace and src.target_namespace:
                namespace = src.target_namespace
                doc = _with_namespace(doc, namespace)
            ns_key = namespace or ""

            if kind in _WORKLOAD_KINDS:
                workloads[(ns_key, name)] = _workload_entry(kind, ns_key, name, doc)
            elif kind == "Service":
                services[(ns_key, name)] = {
                    "namespace": ns_key,
                    "name": name,
                    "type": (doc.get("spec") or {}).get("type", "ClusterIP"),
                    "selector": _service_selector(doc),
                    "ports": _service_ports(doc),
                }
            elif kind == "Ingress":
                ingresses.append((src.name, doc))
            elif kind == "IngressRoute":
                ingressroutes.append((src.name, doc))
            elif kind == "HelmRelease":
                vals = (doc.get("spec") or {}).get("values") or {}
                hosts = sorted(_safe_string_scan_for_hosts(vals))
                if hosts:
                    helmrelease_hosts[f"{src.name}:{ns_key}/{name}"] = hosts

    # Map services to workloads (same namespace, selector subset of labels).
    service_to_workloads: dict[tuple[str, str], list[dict[str, str]]] = {}
    for (ns, svc_name), svc in services.items():
        selector = svc.get("selector") or {}
        matches = [
            {"kind": w["kind"], "name": w_name}
            for (w_ns, w_name), w in workloads.items()
            if w_ns == ns and _selector_matches(selector, w.get("labels") or {})
        ]
        service_to_workloads[(ns, svc_name)] = sorted(
            matches, key=lambda m: (m["kind"], m["name"])
        )

    # Extract HTTP endpoints. Ingress entries come first so that the stable
    # sort below keeps them ahead of IngressRoute entries with equal keys.
    endpoints = _ingress_endpoints(ingresses, service_to_workloads)
    endpoints += _ingressroute_endpoints(ingressroutes, service_to_workloads)
    endpoints = sorted(
        endpoints,
        key=lambda e: (
            e["host"],
            e["path"],
            e["backend"]["namespace"],
            e["backend"]["service"],
        ),
    )

    catalog = {
        "cluster": "atlas",
        "sources": [
            {"name": k.name, "path": k.path, "targetNamespace": k.target_namespace}
            for k, _ in rendered
        ],
        "workloads": sorted(
            workloads.values(),
            key=lambda w: (w["namespace"], w["kind"], w["name"]),
        ),
        "services": sorted(
            services.values(),
            key=lambda s: (s["namespace"], s["name"]),
        ),
        "http_endpoints": endpoints,
        "helmrelease_host_hints": dict(sorted(helmrelease_hosts.items())),
    }

    # Mermaid diagram: host -> service -> workload (grouped by namespace).
    diagram = _render_mermaid(endpoints)

    summary = {
        "counts": {
            "workloads": len(workloads),
            "services": len(services),
            "http_endpoints": len(endpoints),
            "helmrelease_host_hints": sum(len(v) for v in helmrelease_hosts.values()),
        }
    }

    return catalog, summary, diagram


def _repo_relative(path: Path) -> Path:
    """Return *path* relative to REPO_ROOT, or unchanged if it lies outside."""
    try:
        return path.relative_to(REPO_ROOT)
    except ValueError:
        return path


def _load_runbooks(out_dir: Path) -> list[dict[str, Any]]:
    """Load ``<out_dir>/runbooks/*.md`` into JSON-serializable records.

    A leading ``---`` front-matter section is parsed as YAML when present.
    Front matter that is malformed, or that is not a mapping, is ignored and
    the whole file is used as the body. Returns ``[]`` when the runbooks
    directory does not exist.
    """
    runbooks_dir = out_dir / "runbooks"
    runbooks: list[dict[str, Any]] = []
    if not runbooks_dir.exists():
        return runbooks

    for md_file in sorted(runbooks_dir.glob("*.md")):
        raw = md_file.read_text(encoding="utf-8")
        fm: dict[str, Any] = {}
        body = raw
        if raw.startswith("---\n"):
            try:
                _, rest = raw.split("---\n", 1)
                fm_raw, fm_body = rest.split("\n---\n", 1)
                parsed = yaml.safe_load(fm_raw) or {}
            except (ValueError, yaml.YAMLError):
                # No closing delimiter or invalid YAML: keep the raw text.
                parsed = None
            if isinstance(parsed, dict):
                fm = parsed
                body = fm_body
        runbooks.append(
            {
                # POSIX separators keep the output identical across platforms.
                "path": md_file.relative_to(out_dir).as_posix(),
                "title": fm.get("title") or md_file.stem,
                "tags": fm.get("tags") or [],
                "entrypoints": fm.get("entrypoints") or [],
                "source_paths": fm.get("source_paths") or [],
                "body": body.strip(),
            }
        )
    return runbooks


def main() -> int:
    """CLI entry point.

    Renders every Flux Kustomization whose path exists, then either prints a
    JSON summary (default) or, with ``--write``, writes the catalog, summary,
    diagram and runbook files under the output directory.

    Returns:
        0 on success, 2 when no Flux Kustomizations were found.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--out", default="knowledge", help="Output base directory (default: knowledge/)")
    ap.add_argument(
        "--write",
        action="store_true",
        help="Write generated files (otherwise just print a summary).",
    )
    args = ap.parse_args()

    out_dir = REPO_ROOT / args.out
    flux = find_flux_kustomizations()
    if not flux:
        print("No Flux Kustomizations found under clusters/atlas/flux-system.", file=sys.stderr)
        return 2

    rendered: list[tuple[FluxKustomization, list[dict[str, Any]]]] = []
    for k in flux:
        path = REPO_ROOT / k.path
        if not path.exists():
            print(f"Skipping {k.name}: path {k.path!r} does not exist", file=sys.stderr)
            continue
        raw = kustomize_build(path)
        # Drop Secrets as early as possible so they never reach the catalog.
        docs = [d for d in _iter_docs(raw) if d.get("kind") != "Secret"]
        rendered.append((k, docs))

    rendered = sorted(rendered, key=lambda item: item[0].name)
    catalog, summary, diagram = extract_catalog(rendered)

    if not args.write:
        print(json.dumps(summary, indent=2, sort_keys=True))
        return 0

    (out_dir / "catalog").mkdir(parents=True, exist_ok=True)
    (out_dir / "diagrams").mkdir(parents=True, exist_ok=True)

    catalog_path = out_dir / "catalog" / "atlas.yaml"
    catalog_json_path = out_dir / "catalog" / "atlas.json"
    summary_path = out_dir / "catalog" / "atlas-summary.json"
    diagram_path = out_dir / "diagrams" / "atlas-http.mmd"
    runbooks_json_path = out_dir / "catalog" / "runbooks.json"

    catalog_rel = _repo_relative(catalog_path).as_posix()
    catalog_path.write_text(
        f"# {catalog_rel}\n"
        "# Generated by scripts/knowledge_render_atlas.py (do not edit by hand)\n"
        + yaml.safe_dump(catalog, sort_keys=False),
        encoding="utf-8",
    )
    catalog_json_path.write_text(
        json.dumps(catalog, indent=2, sort_keys=False) + "\n", encoding="utf-8"
    )
    summary_path.write_text(
        json.dumps(summary, indent=2, sort_keys=True) + "\n", encoding="utf-8"
    )
    diagram_path.write_text(diagram, encoding="utf-8")

    # Render runbooks into JSON for lightweight, dependency-free consumption in-cluster.
    runbooks = _load_runbooks(out_dir)
    runbooks_json_path.write_text(
        json.dumps(runbooks, indent=2, sort_keys=False) + "\n", encoding="utf-8"
    )

    for written in (
        catalog_path,
        catalog_json_path,
        summary_path,
        diagram_path,
        runbooks_json_path,
    ):
        print(f"Wrote {_repo_relative(written)}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())