#!/usr/bin/env python3
"""Render Atlas knowledge artifacts from Flux + kustomize manifests.

Outputs (committed to git for stable diffs + RAG):
- knowledge/catalog/*.yaml
- knowledge/diagrams/*.mmd

This is intentionally conservative:
- never includes Secret objects
- never includes secret values
- keeps output deterministic (sorted)
"""

from __future__ import annotations

import argparse
import json
import re
import shutil
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Iterable

import yaml

REPO_ROOT = Path(__file__).resolve().parents[1]
DASHBOARD_DIR = REPO_ROOT / "services" / "monitoring" / "dashboards"

CLUSTER_SCOPED_KINDS = {
    "Namespace",
    "Node",
    "CustomResourceDefinition",
    "ClusterRole",
    "ClusterRoleBinding",
    "StorageClass",
    "PersistentVolume",
    "MutatingWebhookConfiguration",
    "ValidatingWebhookConfiguration",
    "APIService",
}

INCLUDED_KINDS = {
    "Namespace",
    "Deployment",
    "StatefulSet",
    "DaemonSet",
    "Service",
    "Ingress",
    "IngressRoute",  # traefik
    "HelmRelease",  # only to harvest ingress hostnames from values
}


def _run(cmd: list[str], *, cwd: Path) -> str:
    res = subprocess.run(cmd, cwd=cwd, capture_output=True, text=True, check=False)
    if res.returncode != 0:
        raise RuntimeError(
            f"Command failed ({res.returncode}): {' '.join(cmd)}\n{res.stderr.strip()}"
        )
    return res.stdout


def _sync_tree(source: Path, dest: Path) -> None:
    if dest.exists():
        shutil.rmtree(dest)
    shutil.copytree(source, dest)


def _iter_dashboard_panels(dashboard: dict[str, Any]) -> Iterable[dict[str, Any]]:
    panels = dashboard.get("panels") if isinstance(dashboard.get("panels"), list) else []
    for panel in panels:
        if not isinstance(panel, dict):
            continue
        if panel.get("type") == "row" and isinstance(panel.get("panels"), list):
            yield from _iter_dashboard_panels({"panels": panel.get("panels")})
            continue
        yield panel


def _extract_metrics_index(dashboard_dir: Path) -> list[dict[str, Any]]:
    index: list[dict[str, Any]] = []
    for path in sorted(dashboard_dir.glob("*.json")):
        try:
            data = json.loads(path.read_text(encoding="utf-8"))
        except json.JSONDecodeError:
            continue
        if not isinstance(data, dict):
            continue
        dash_title = data.get("title") or path.stem
        dash_tags = data.get("tags") or []
        for panel in _iter_dashboard_panels(data):
            targets = panel.get("targets")
            if not isinstance(targets, list):
                continue
            exprs: list[str] = []
            for target in targets:
                if not isinstance(target, dict):
                    continue
                expr = target.get("expr")
                if isinstance(expr, str) and expr.strip():
                    exprs.append(expr.strip())
            if not exprs:
                continue
            datasource = panel.get("datasource") or {}
            if isinstance(datasource, dict):
                ds_uid = datasource.get("uid")
                ds_type = datasource.get("type")
            else:
                ds_uid = None
                ds_type = None
            index.append(
                {
                    "dashboard": dash_title,
                    "panel_title": panel.get("title") or "",
                    "panel_id": panel.get("id"),
                    "panel_type": panel.get("type"),
                    "description": panel.get("description") or "",
                    "tags": dash_tags,
                    "datasource_uid": ds_uid,
                    "datasource_type": ds_type,
                    "exprs": exprs,
                }
            )
    return index


def kustomize_build(path: Path) -> str:
    rel = path.relative_to(REPO_ROOT)
    try:
        return _run(["kubectl", "kustomize", str(rel)], cwd=REPO_ROOT)
    except Exception as e:
        msg = str(e)
        if "is not in or below" in msg:
            # Repo uses configMapGenerators that reference ../../scripts/*.py.
            # Kustomize load restriction must be disabled for a full render.
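            # Fallback order (a sketch of an assumption about the local
            # toolchain, not a guarantee): retry kubectl's bundled kustomize
            # with load restrictions disabled first, then fall through to the
            # standalone `kustomize` binary below.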
            try:
                return _run(
                    ["kubectl", "kustomize", "--load-restrictor=LoadRestrictionsNone", str(rel)],
                    cwd=REPO_ROOT,
                )
            except Exception:
                pass
        # Last resort: the standalone kustomize binary, with load restrictions
        # disabled for the same reason.
        return _run(
            ["kustomize", "build", "--load-restrictor=LoadRestrictionsNone", str(rel)],
            cwd=REPO_ROOT,
        )


def _iter_docs(raw_yaml: str) -> Iterable[dict[str, Any]]:
    for doc in yaml.safe_load_all(raw_yaml):
        if not isinstance(doc, dict):
            continue
        kind = doc.get("kind")
        if kind == "List" and isinstance(doc.get("items"), list):
            for item in doc["items"]:
                if isinstance(item, dict):
                    yield item
            continue
        if kind:
            yield doc


def _meta(doc: dict[str, Any]) -> tuple[str, str | None]:
    md = doc.get("metadata") or {}
    name = md.get("name") or ""
    namespace = md.get("namespace")
    return name, namespace


def _is_namespaced(doc: dict[str, Any]) -> bool:
    kind = doc.get("kind") or ""
    return kind not in CLUSTER_SCOPED_KINDS


@dataclass(frozen=True)
class FluxKustomization:
    name: str
    path: str
    target_namespace: str | None


def find_flux_kustomizations() -> list[FluxKustomization]:
    """Find Flux Kustomization CRs under clusters/atlas/flux-system."""
    root = REPO_ROOT / "clusters" / "atlas" / "flux-system"
    items: list[FluxKustomization] = []
    for file in sorted(root.rglob("*.yaml")):
        raw = file.read_text(encoding="utf-8")
        for doc in _iter_docs(raw):
            if doc.get("kind") != "Kustomization":
                continue
            api = str(doc.get("apiVersion") or "")
            if not api.startswith("kustomize.toolkit.fluxcd.io/"):
                continue
            name, _ = _meta(doc)
            spec = doc.get("spec") or {}
            path = spec.get("path")
            if not isinstance(path, str) or not path.strip():
                continue
            items.append(
                FluxKustomization(
                    name=name,
                    # removeprefix, not lstrip("./"): lstrip strips characters,
                    # so it would also eat leading dots from paths like ".config".
                    path=path.strip().removeprefix("./"),
                    target_namespace=spec.get("targetNamespace"),
                )
            )
    return sorted(items, key=lambda k: k.name)


def _safe_string_scan_for_hosts(value: Any) -> set[str]:
    """Best-effort host scan from HelmRelease values without chart rendering."""
    hosts: set[str] = set()
    if isinstance(value, str):
        for m in re.finditer(r"(?i)([a-z0-9-]+(?:\.[a-z0-9-]+)+)", value):
            host = m.group(1).lower()
            # Match on a label boundary so e.g. "notbstein.dev" is excluded.
            if host == "bstein.dev" or host.endswith(".bstein.dev"):
                hosts.add(host)
        return hosts
    if isinstance(value, list):
        for item in value:
            hosts |= _safe_string_scan_for_hosts(item)
        return hosts
    if isinstance(value, dict):
        for item in value.values():
            hosts |= _safe_string_scan_for_hosts(item)
        return hosts
    return hosts


def _service_ports(svc: dict[str, Any]) -> list[dict[str, Any]]:
    spec = svc.get("spec") or {}
    out: list[dict[str, Any]] = []
    for p in spec.get("ports") or []:
        if not isinstance(p, dict):
            continue
        out.append(
            {
                "name": p.get("name"),
                "port": p.get("port"),
                "targetPort": p.get("targetPort"),
                "protocol": p.get("protocol", "TCP"),
            }
        )
    return out


def _workload_labels(doc: dict[str, Any]) -> dict[str, str]:
    tpl = (doc.get("spec") or {}).get("template") or {}
    md = tpl.get("metadata") or {}
    labels = md.get("labels") or {}
    return {str(k): str(v) for k, v in labels.items()} if isinstance(labels, dict) else {}


def _service_selector(doc: dict[str, Any]) -> dict[str, str]:
    spec = doc.get("spec") or {}
    sel = spec.get("selector") or {}
    return {str(k): str(v) for k, v in sel.items()} if isinstance(sel, dict) else {}


def _selector_matches(selector: dict[str, str], labels: dict[str, str]) -> bool:
    if not selector:
        return False
    return all(labels.get(k) == v for k, v in selector.items())


def _sanitize_node_id(text: str) -> str:
    return re.sub(r"[^a-zA-Z0-9_]", "_", text)


def extract_catalog(
    rendered: list[tuple[FluxKustomization, list[dict[str, Any]]]],
) -> tuple[dict[str, Any], dict[str, Any], str]:
    """Build knowledge catalog + mermaid diagram from rendered docs."""
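    # Pipeline sketch: index workloads and services, match Service selectors
    # against pod-template labels, then walk Ingress/IngressRoute objects into
    # host -> service -> workload endpoint records and a mermaid diagram.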
    # Index workloads and services for mapping.
    workloads: dict[tuple[str, str], dict[str, Any]] = {}
    services: dict[tuple[str, str], dict[str, Any]] = {}
    ingresses: list[dict[str, Any]] = []
    ingressroutes: list[dict[str, Any]] = []
    helmrelease_hosts: dict[str, list[str]] = {}

    for src, docs in rendered:
        for doc in docs:
            kind = doc.get("kind")
            if kind not in INCLUDED_KINDS:
                continue
            if kind == "Secret":  # defensive; Secrets are also filtered upstream
                continue
            name, namespace = _meta(doc)
            if _is_namespaced(doc) and not namespace and src.target_namespace:
                namespace = src.target_namespace
                doc = dict(doc)
                doc.setdefault("metadata", {})["namespace"] = namespace
            if kind in ("Deployment", "StatefulSet", "DaemonSet"):
                pod_spec = (((doc.get("spec") or {}).get("template") or {}).get("spec")) or {}
                workloads[(namespace or "", name)] = {
                    "kind": kind,
                    "namespace": namespace or "",
                    "name": name,
                    "labels": _workload_labels(doc),
                    "serviceAccountName": pod_spec.get("serviceAccountName"),
                    "nodeSelector": pod_spec.get("nodeSelector", {}),
                    "images": sorted(
                        {
                            c.get("image")
                            for c in pod_spec.get("containers") or []
                            if isinstance(c, dict) and c.get("image")
                        }
                    ),
                }
            elif kind == "Service":
                services[(namespace or "", name)] = {
                    "namespace": namespace or "",
                    "name": name,
                    "type": (doc.get("spec") or {}).get("type", "ClusterIP"),
                    "selector": _service_selector(doc),
                    "ports": _service_ports(doc),
                }
            elif kind == "Ingress":
                ingresses.append({"source": src.name, "doc": doc})
            elif kind == "IngressRoute":
                ingressroutes.append({"source": src.name, "doc": doc})
            elif kind == "HelmRelease":
                spec = doc.get("spec") or {}
                vals = spec.get("values") or {}
                hosts = sorted(_safe_string_scan_for_hosts(vals))
                if hosts:
                    helmrelease_hosts[f"{src.name}:{namespace or ''}/{name}"] = hosts

    # Map services to workloads.
    service_to_workloads: dict[tuple[str, str], list[dict[str, str]]] = {}
    for (ns, svc_name), svc in services.items():
        selector = svc.get("selector") or {}
        matches: list[dict[str, str]] = []
        for (w_ns, w_name), w in workloads.items():
            if w_ns != ns:
                continue
            if _selector_matches(selector, w.get("labels") or {}):
                matches.append({"kind": w["kind"], "name": w_name})
        service_to_workloads[(ns, svc_name)] = sorted(
            matches, key=lambda m: (m["kind"], m["name"])
        )

    # Extract HTTP endpoints.
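    # Illustrative (hypothetical) Traefik match string and what the regexes
    # below recover from it:
    #   Host(`app.bstein.dev`) && PathPrefix(`/api`)
    #   -> hosts = ["app.bstein.dev"], pathprefixes = ["/api"]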
    endpoints: list[dict[str, Any]] = []

    def add_endpoint(
        *,
        host: str,
        path: str,
        namespace: str,
        service: str,
        port: Any,
        source: str,
        kind: str,
        obj_name: str,
    ) -> None:
        wk = service_to_workloads.get((namespace, service), [])
        endpoints.append(
            {
                "host": host,
                "path": path,
                "backend": {
                    "namespace": namespace,
                    "service": service,
                    "port": port,
                    "workloads": wk,
                },
                "via": {"kind": kind, "name": obj_name, "source": source},
            }
        )

    for item in ingresses:
        doc = item["doc"]
        source = item["source"]
        name, namespace = _meta(doc)
        namespace = namespace or ""
        spec = doc.get("spec") or {}
        for rule in spec.get("rules") or []:
            if not isinstance(rule, dict):
                continue
            host = (rule.get("host") or "").strip()
            http = rule.get("http") or {}
            for p in http.get("paths") or []:
                if not isinstance(p, dict):
                    continue
                backend = (p.get("backend") or {}).get("service") or {}
                svc_name = backend.get("name")
                port_spec = backend.get("port") or {}
                svc_port = port_spec.get("number") or port_spec.get("name")
                if not host or not svc_name:
                    continue
                add_endpoint(
                    host=host,
                    path=p.get("path") or "/",
                    namespace=namespace,
                    service=svc_name,
                    port=svc_port,
                    source=source,
                    kind="Ingress",
                    obj_name=name,
                )

    host_re = re.compile(r"Host\(`([^`]+)`\)")
    pathprefix_re = re.compile(r"PathPrefix\(`([^`]+)`\)")
    for item in ingressroutes:
        doc = item["doc"]
        source = item["source"]
        name, namespace = _meta(doc)
        namespace = namespace or ""
        spec = doc.get("spec") or {}
        for route in spec.get("routes") or []:
            if not isinstance(route, dict):
                continue
            match = route.get("match") or ""
            hosts = host_re.findall(match)
            pathprefixes = pathprefix_re.findall(match) or ["/"]
            for svc in route.get("services") or []:
                if not isinstance(svc, dict):
                    continue
                svc_name = svc.get("name")
                svc_port = svc.get("port")
                if not svc_name:
                    continue
                for host in hosts:
                    for pp in pathprefixes:
                        add_endpoint(
                            host=host,
                            path=pp,
                            namespace=namespace,
                            service=svc_name,
                            port=svc_port,
                            source=source,
                            kind="IngressRoute",
                            obj_name=name,
                        )

    endpoints = sorted(
        endpoints,
        key=lambda e: (
            e["host"],
            e["path"],
            e["backend"]["namespace"],
            e["backend"]["service"],
        ),
    )

    catalog = {
        "cluster": "atlas",
        "sources": [
            {"name": k.name, "path": k.path, "targetNamespace": k.target_namespace}
            for k, _ in rendered
        ],
        "workloads": sorted(
            workloads.values(),
            key=lambda w: (w["namespace"], w["kind"], w["name"]),
        ),
        "services": sorted(
            services.values(),
            key=lambda s: (s["namespace"], s["name"]),
        ),
        "http_endpoints": endpoints,
        "helmrelease_host_hints": dict(sorted(helmrelease_hosts.items())),
    }

    # Mermaid diagram: host -> service -> workload (grouped by namespace).
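    # The emitted fragment looks roughly like this (illustrative names only):
    #   flowchart LR
    #     host_app_bstein_dev["app.bstein.dev"]
    #     svc_web_app["web/app (Service)"]
    #     host_app_bstein_dev --> svc_web_app
    #     svc_web_app --> wl_web_app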
    ns_nodes: dict[str, list[str]] = {}
    lines: list[str] = ["flowchart LR"]
    edges: set[tuple[str, str]] = set()

    def ensure_ns_node(ns: str, node_id: str) -> None:
        ns_nodes.setdefault(ns, [])
        if node_id not in ns_nodes[ns]:
            ns_nodes[ns].append(node_id)

    host_nodes: dict[str, str] = {}
    for ep in endpoints:
        host = ep["host"]
        host_id = host_nodes.get(host)
        if not host_id:
            host_id = f"host_{_sanitize_node_id(host)}"
            host_nodes[host] = host_id
            lines.append(f'  {host_id}["{host}"]')
        ns = ep["backend"]["namespace"]
        svc = ep["backend"]["service"]
        svc_id = f"svc_{_sanitize_node_id(ns)}_{_sanitize_node_id(svc)}"
        if svc_id not in ns_nodes.get(ns, []):
            lines.append(f'  {svc_id}["{ns}/{svc} (Service)"]')
            ensure_ns_node(ns, svc_id)
        if (host_id, svc_id) not in edges:
            edges.add((host_id, svc_id))
            lines.append(f"  {host_id} --> {svc_id}")
        for w in ep["backend"]["workloads"]:
            w_id = f"wl_{_sanitize_node_id(ns)}_{_sanitize_node_id(w['name'])}"
            if w_id not in ns_nodes.get(ns, []):
                lines.append(f'  {w_id}["{ns}/{w["name"]} ({w["kind"]})"]')
                ensure_ns_node(ns, w_id)
            if (svc_id, w_id) not in edges:
                edges.add((svc_id, w_id))
                lines.append(f"  {svc_id} --> {w_id}")

    # Wrap namespace subgraphs at the end for stability (sorted namespaces).
    if ns_nodes:
        lines.append("")
        for ns in sorted(ns_nodes.keys()):
            lines.append(f"  subgraph {_sanitize_node_id(ns)}[{ns}]")
            for node_id in ns_nodes[ns]:
                lines.append(f"    {node_id}")
            lines.append("  end")

    diagram = "\n".join(lines).rstrip() + "\n"

    summary = {
        "counts": {
            "workloads": len(workloads),
            "services": len(services),
            "http_endpoints": len(endpoints),
            "helmrelease_host_hints": sum(len(v) for v in helmrelease_hosts.values()),
        }
    }
    return catalog, summary, diagram


def main() -> int:
    ap = argparse.ArgumentParser()
    ap.add_argument(
        "--out", default="knowledge", help="Output base directory (default: knowledge/)"
    )
    ap.add_argument(
        "--write",
        action="store_true",
        help="Write generated files (otherwise just print a summary).",
    )
    ap.add_argument(
        "--sync-comms",
        action="store_true",
        help="Mirror rendered knowledge into services/comms/knowledge for atlasbot.",
    )
    args = ap.parse_args()

    out_dir = REPO_ROOT / args.out

    flux = find_flux_kustomizations()
    if not flux:
        print("No Flux Kustomizations found under clusters/atlas/flux-system.", file=sys.stderr)
        return 2

    rendered: list[tuple[FluxKustomization, list[dict[str, Any]]]] = []
    for k in flux:
        path = REPO_ROOT / k.path
        if not path.exists():
            continue
        raw = kustomize_build(path)
        docs = [d for d in _iter_docs(raw) if d.get("kind") != "Secret"]
        rendered.append((k, docs))
    rendered = sorted(rendered, key=lambda item: item[0].name)

    catalog, summary, diagram = extract_catalog(rendered)

    if not args.write:
        print(json.dumps(summary, indent=2, sort_keys=True))
        return 0

    (out_dir / "catalog").mkdir(parents=True, exist_ok=True)
    (out_dir / "diagrams").mkdir(parents=True, exist_ok=True)
    catalog_path = out_dir / "catalog" / "atlas.yaml"
    catalog_json_path = out_dir / "catalog" / "atlas.json"
    summary_path = out_dir / "catalog" / "atlas-summary.json"
    diagram_path = out_dir / "diagrams" / "atlas-http.mmd"
    runbooks_json_path = out_dir / "catalog" / "runbooks.json"
    metrics_json_path = out_dir / "catalog" / "metrics.json"

    catalog_rel = catalog_path.relative_to(REPO_ROOT).as_posix()
    catalog_path.write_text(
        f"# {catalog_rel}\n"
        "# Generated by scripts/knowledge_render_atlas.py (do not edit by hand)\n"
        + yaml.safe_dump(catalog, sort_keys=False),
        encoding="utf-8",
    )
    catalog_json_path.write_text(
        json.dumps(catalog, indent=2, sort_keys=False) + "\n", encoding="utf-8"
    )
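    # The catalog is written twice by design: the YAML copy carries a
    # provenance header so RAG chunks stay self-identifying, while the JSON
    # copy can be loaded without a YAML dependency.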
    summary_path.write_text(
        json.dumps(summary, indent=2, sort_keys=True) + "\n", encoding="utf-8"
    )
    diagram_path.write_text(diagram, encoding="utf-8")

    # Render runbooks into JSON for lightweight, dependency-free consumption in-cluster.
    runbook_dirs = [
        out_dir / "runbooks",
        out_dir / "software",
    ]
    runbooks: list[dict[str, Any]] = []
    for runbooks_dir in runbook_dirs:
        if not runbooks_dir.exists():
            continue
        for md_file in sorted(runbooks_dir.glob("*.md")):
            raw = md_file.read_text(encoding="utf-8")
            fm: dict[str, Any] = {}
            body = raw
            if raw.startswith("---\n"):
                try:
                    _, rest = raw.split("---\n", 1)
                    fm_raw, body = rest.split("\n---\n", 1)
                    fm = yaml.safe_load(fm_raw) or {}
                except Exception:
                    fm = {}
                    body = raw
            if not isinstance(fm, dict):  # frontmatter was a scalar/list, not a mapping
                fm = {}
            runbooks.append(
                {
                    "path": str(md_file.relative_to(out_dir)),
                    "title": fm.get("title") or md_file.stem,
                    "tags": fm.get("tags") or [],
                    "entrypoints": fm.get("entrypoints") or [],
                    "source_paths": fm.get("source_paths") or [],
                    "body": body.strip(),
                }
            )
    runbooks_json_path.write_text(
        json.dumps(runbooks, indent=2, sort_keys=False) + "\n", encoding="utf-8"
    )

    metrics_index = _extract_metrics_index(DASHBOARD_DIR)
    metrics_json_path.write_text(
        json.dumps(metrics_index, indent=2, sort_keys=False) + "\n", encoding="utf-8"
    )

    print(f"Wrote {catalog_path.relative_to(REPO_ROOT)}")
    print(f"Wrote {catalog_json_path.relative_to(REPO_ROOT)}")
    print(f"Wrote {summary_path.relative_to(REPO_ROOT)}")
    print(f"Wrote {diagram_path.relative_to(REPO_ROOT)}")
    print(f"Wrote {runbooks_json_path.relative_to(REPO_ROOT)}")
    print(f"Wrote {metrics_json_path.relative_to(REPO_ROOT)}")

    if args.sync_comms:
        comms_dir = REPO_ROOT / "services" / "comms" / "knowledge"
        _sync_tree(out_dir, comms_dir)
        print(f"Synced {out_dir.relative_to(REPO_ROOT)} -> {comms_dir.relative_to(REPO_ROOT)}")

    return 0


if __name__ == "__main__":
    raise SystemExit(main())
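
# Usage sketch (flags as defined in main() above):
#   python scripts/knowledge_render_atlas.py                  # dry run: print summary counts
#   python scripts/knowledge_render_atlas.py --write          # write knowledge/ artifacts
#   python scripts/knowledge_render_atlas.py --write --sync-comms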