#!/usr/bin/env python3
"""Render Atlas knowledge artifacts from Flux + kustomize manifests.

Outputs (committed to git for stable diffs + RAG):
- knowledge/catalog/*.yaml
- knowledge/diagrams/*.mmd

This is intentionally conservative:
- never includes Secret objects
- never includes secret values
- keeps output deterministic (sorted)
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
import shutil
|
|
from typing import Any, Iterable
|
|
|
|
import yaml
|
|
|
|
# Repository root: this script lives in <repo>/scripts/, so go up one level.
REPO_ROOT = Path(__file__).resolve().parents[1]

# Kinds that never carry metadata.namespace; used by _is_namespaced() to
# decide whether a Flux targetNamespace should be applied to a doc.
CLUSTER_SCOPED_KINDS = {
    "Namespace",
    "Node",
    "CustomResourceDefinition",
    "ClusterRole",
    "ClusterRoleBinding",
    "StorageClass",
    "PersistentVolume",
    "MutatingWebhookConfiguration",
    "ValidatingWebhookConfiguration",
    "APIService",
}

# The only kinds harvested into the catalog; all other rendered docs are
# ignored.  Secret is deliberately absent (and filtered out earlier too).
INCLUDED_KINDS = {
    "Namespace",
    "Deployment",
    "StatefulSet",
    "DaemonSet",
    "Service",
    "Ingress",
    "IngressRoute",  # traefik
    "HelmRelease",  # only to harvest ingress hostnames from values
}
|
|
|
|
|
|
def _run(cmd: list[str], *, cwd: Path) -> str:
|
|
res = subprocess.run(cmd, cwd=cwd, capture_output=True, text=True, check=False)
|
|
if res.returncode != 0:
|
|
raise RuntimeError(
|
|
f"Command failed ({res.returncode}): {' '.join(cmd)}\n{res.stderr.strip()}"
|
|
)
|
|
return res.stdout
|
|
|
|
|
|
def _sync_tree(source: Path, dest: Path) -> None:
|
|
if dest.exists():
|
|
shutil.rmtree(dest)
|
|
shutil.copytree(source, dest)
|
|
|
|
|
|
def kustomize_build(path: Path) -> str:
    """Render the kustomization at *path* and return the YAML stream.

    Tries ``kubectl kustomize`` first; on a load-restriction error retries
    with the restrictor disabled, and finally falls back to the standalone
    ``kustomize`` binary.
    """
    rel = str(path.relative_to(REPO_ROOT))
    try:
        return _run(["kubectl", "kustomize", rel], cwd=REPO_ROOT)
    except Exception as exc:
        if "is not in or below" in str(exc):
            # Repo uses configMapGenerators that reference ../../scripts/*.py.
            # Kustomize load restriction must be disabled for a full render.
            try:
                return _run(
                    ["kubectl", "kustomize", "--load-restrictor=LoadRestrictionsNone", rel],
                    cwd=REPO_ROOT,
                )
            except Exception:
                pass
        # Last resort: the standalone kustomize binary, unrestricted.
        return _run(["kustomize", "build", "--load-restrictor=LoadRestrictionsNone", rel], cwd=REPO_ROOT)
|
|
|
|
|
|
def _iter_docs(raw_yaml: str) -> Iterable[dict[str, Any]]:
    """Yield each manifest dict in a multi-doc YAML stream.

    ``List`` objects are flattened into their items; non-dict documents and
    docs without a ``kind`` are skipped.
    """
    for doc in yaml.safe_load_all(raw_yaml):
        if not isinstance(doc, dict):
            continue
        kind = doc.get("kind")
        if kind == "List" and isinstance(doc.get("items"), list):
            yield from (item for item in doc["items"] if isinstance(item, dict))
        elif kind:
            yield doc
|
|
|
|
|
|
def _meta(doc: dict[str, Any]) -> tuple[str, str | None]:
|
|
md = doc.get("metadata") or {}
|
|
name = md.get("name") or ""
|
|
namespace = md.get("namespace")
|
|
return name, namespace
|
|
|
|
|
|
def _is_namespaced(doc: dict[str, Any]) -> bool:
    """True unless the doc's kind is a known cluster-scoped kind."""
    return (doc.get("kind") or "") not in CLUSTER_SCOPED_KINDS
|
|
|
|
|
|
@dataclass(frozen=True)
class FluxKustomization:
    """A Flux Kustomization CR, reduced to the fields rendering needs."""

    # metadata.name of the Kustomization CR.
    name: str
    # spec.path, normalized relative to the repo root.
    path: str
    # spec.targetNamespace; applied to namespaced docs that lack one.
    target_namespace: str | None
|
|
|
|
|
|
def find_flux_kustomizations() -> list[FluxKustomization]:
    """Find Flux Kustomization CRs under clusters/atlas/flux-system.

    Returns:
        FluxKustomization entries sorted by name, so downstream output is
        deterministic.  Docs without a usable ``spec.path`` are skipped.
    """
    root = REPO_ROOT / "clusters" / "atlas" / "flux-system"
    items: list[FluxKustomization] = []
    for file in sorted(root.rglob("*.yaml")):
        raw = file.read_text()
        for doc in _iter_docs(raw):
            if doc.get("kind") != "Kustomization":
                continue
            # Only the Flux CRD, not plain kustomize Kustomization files.
            api = str(doc.get("apiVersion") or "")
            if not api.startswith("kustomize.toolkit.fluxcd.io/"):
                continue
            name, _ = _meta(doc)
            spec = doc.get("spec") or {}
            path = spec.get("path")
            if not isinstance(path, str) or not path.strip():
                continue
            # Normalize "./foo" -> "foo".  The previous lstrip("./") stripped
            # any leading '.' or '/' *characters*, mangling paths such as
            # ".flux/x" into "flux/x"; remove only the literal "./" prefix.
            clean = path.strip()
            while clean.startswith("./"):
                clean = clean[2:]
            items.append(
                FluxKustomization(
                    name=name,
                    path=clean,
                    target_namespace=spec.get("targetNamespace"),
                )
            )
    return sorted(items, key=lambda k: k.name)
|
|
|
|
|
|
def _safe_string_scan_for_hosts(value: Any) -> set[str]:
|
|
"""Best-effort host scan from HelmRelease values without chart rendering."""
|
|
hosts: set[str] = set()
|
|
if isinstance(value, str):
|
|
for m in re.finditer(r"(?i)([a-z0-9-]+(?:\.[a-z0-9-]+)+)", value):
|
|
host = m.group(1).lower()
|
|
if host.endswith("bstein.dev"):
|
|
hosts.add(host)
|
|
return hosts
|
|
if isinstance(value, list):
|
|
for item in value:
|
|
hosts |= _safe_string_scan_for_hosts(item)
|
|
return hosts
|
|
if isinstance(value, dict):
|
|
for item in value.values():
|
|
hosts |= _safe_string_scan_for_hosts(item)
|
|
return hosts
|
|
return hosts
|
|
|
|
|
|
def _service_ports(svc: dict[str, Any]) -> list[dict[str, Any]]:
|
|
spec = svc.get("spec") or {}
|
|
out: list[dict[str, Any]] = []
|
|
for p in spec.get("ports") or []:
|
|
if not isinstance(p, dict):
|
|
continue
|
|
out.append(
|
|
{
|
|
"name": p.get("name"),
|
|
"port": p.get("port"),
|
|
"targetPort": p.get("targetPort"),
|
|
"protocol": p.get("protocol", "TCP"),
|
|
}
|
|
)
|
|
return out
|
|
|
|
|
|
def _workload_labels(doc: dict[str, Any]) -> dict[str, str]:
|
|
tpl = (doc.get("spec") or {}).get("template") or {}
|
|
md = tpl.get("metadata") or {}
|
|
labels = md.get("labels") or {}
|
|
return {str(k): str(v) for k, v in labels.items()} if isinstance(labels, dict) else {}
|
|
|
|
|
|
def _service_selector(doc: dict[str, Any]) -> dict[str, str]:
|
|
spec = doc.get("spec") or {}
|
|
sel = spec.get("selector") or {}
|
|
return {str(k): str(v) for k, v in sel.items()} if isinstance(sel, dict) else {}
|
|
|
|
|
|
def _selector_matches(selector: dict[str, str], labels: dict[str, str]) -> bool:
|
|
if not selector:
|
|
return False
|
|
return all(labels.get(k) == v for k, v in selector.items())
|
|
|
|
|
|
def _sanitize_node_id(text: str) -> str:
|
|
return re.sub(r"[^a-zA-Z0-9_]", "_", text)
|
|
|
|
|
|
def extract_catalog(
    rendered: list[tuple[FluxKustomization, list[dict[str, Any]]]],
) -> tuple[dict[str, Any], dict[str, Any], str]:
    """Build knowledge catalog + mermaid diagram from rendered docs.

    Args:
        rendered: (FluxKustomization, manifest docs) pairs, one per source.

    Returns:
        ``(catalog, summary, diagram)`` — the full catalog structure, a
        counts-only summary, and the Mermaid flowchart text.  All lists are
        sorted so output is deterministic across runs.
    """
    # Index workloads and services for mapping.
    workloads: dict[tuple[str, str], dict[str, Any]] = {}
    services: dict[tuple[str, str], dict[str, Any]] = {}
    ingresses: list[dict[str, Any]] = []
    ingressroutes: list[dict[str, Any]] = []
    helmrelease_hosts: dict[str, list[str]] = {}

    for src, docs in rendered:
        for doc in docs:
            kind = doc.get("kind")
            if kind not in INCLUDED_KINDS:
                continue
            # Defensive: Secret is not in INCLUDED_KINDS so this never fires,
            # but it keeps the "no secrets" guarantee explicit.
            if kind == "Secret":
                continue

            name, namespace = _meta(doc)
            # Mirror Flux behavior: spec.targetNamespace fills in a missing
            # namespace on namespaced docs so grouping matches the cluster.
            if _is_namespaced(doc) and not namespace and src.target_namespace:
                namespace = src.target_namespace
                doc = dict(doc)
                # NOTE(review): dict(doc) is a shallow copy, so setdefault
                # returns the *shared* metadata mapping and this write is also
                # visible through the caller's doc — harmless today because
                # rendered docs are not reused, but worth confirming.
                doc.setdefault("metadata", {})["namespace"] = namespace

            if kind in ("Deployment", "StatefulSet", "DaemonSet"):
                # Key by (namespace, name); later duplicates overwrite.
                workloads[(namespace or "", name)] = {
                    "kind": kind,
                    "namespace": namespace or "",
                    "name": name,
                    "labels": _workload_labels(doc),
                    "serviceAccountName": ((doc.get("spec") or {}).get("template") or {})
                    .get("spec", {})
                    .get("serviceAccountName"),
                    "nodeSelector": ((doc.get("spec") or {}).get("template") or {})
                    .get("spec", {})
                    .get("nodeSelector", {}),
                    # De-duplicated, sorted container images (init containers
                    # are not scanned).
                    "images": sorted(
                        {
                            c.get("image")
                            for c in (
                                (((doc.get("spec") or {}).get("template") or {}).get("spec") or {}).get(
                                    "containers"
                                )
                                or []
                            )
                            if isinstance(c, dict) and c.get("image")
                        }
                    ),
                }
            elif kind == "Service":
                services[(namespace or "", name)] = {
                    "namespace": namespace or "",
                    "name": name,
                    "type": (doc.get("spec") or {}).get("type", "ClusterIP"),
                    "selector": _service_selector(doc),
                    "ports": _service_ports(doc),
                }
            elif kind == "Ingress":
                ingresses.append({"source": src.name, "doc": doc})
            elif kind == "IngressRoute":
                ingressroutes.append({"source": src.name, "doc": doc})
            elif kind == "HelmRelease":
                # Only harvest hostname hints from values; charts are never
                # rendered here.
                spec = doc.get("spec") or {}
                vals = spec.get("values") or {}
                hosts = sorted(_safe_string_scan_for_hosts(vals))
                if hosts:
                    helmrelease_hosts[f"{src.name}:{namespace or ''}/{name}"] = hosts

    # Map services to workloads.
    service_to_workloads: dict[tuple[str, str], list[dict[str, str]]] = {}
    for (ns, svc_name), svc in services.items():
        selector = svc.get("selector") or {}
        matches: list[dict[str, str]] = []
        for (w_ns, w_name), w in workloads.items():
            # Selectors only match within the same namespace.
            if w_ns != ns:
                continue
            if _selector_matches(selector, w.get("labels") or {}):
                matches.append({"kind": w["kind"], "name": w_name})
        service_to_workloads[(ns, svc_name)] = sorted(matches, key=lambda m: (m["kind"], m["name"]))

    # Extract HTTP endpoints.
    endpoints: list[dict[str, Any]] = []

    def add_endpoint(
        *,
        host: str,
        path: str,
        namespace: str,
        service: str,
        port: Any,
        source: str,
        kind: str,
        obj_name: str,
    ):
        # Record one host+path -> backend mapping with resolved workloads.
        wk = service_to_workloads.get((namespace, service), [])
        endpoints.append(
            {
                "host": host,
                "path": path,
                "backend": {
                    "namespace": namespace,
                    "service": service,
                    "port": port,
                    "workloads": wk,
                },
                "via": {"kind": kind, "name": obj_name, "source": source},
            }
        )

    # Plain Kubernetes Ingress rules.
    for item in ingresses:
        doc = item["doc"]
        source = item["source"]
        name, namespace = _meta(doc)
        namespace = namespace or ""
        spec = doc.get("spec") or {}
        for rule in spec.get("rules") or []:
            if not isinstance(rule, dict):
                continue
            host = (rule.get("host") or "").strip()
            http = rule.get("http") or {}
            for p in http.get("paths") or []:
                if not isinstance(p, dict):
                    continue
                backend = (p.get("backend") or {}).get("service") or {}
                svc_name = backend.get("name")
                # Port may be given by number or by name.
                svc_port = (backend.get("port") or {}).get("number") or (backend.get("port") or {}).get("name")
                if not host or not svc_name:
                    continue
                add_endpoint(
                    host=host,
                    path=p.get("path") or "/",
                    namespace=namespace,
                    service=svc_name,
                    port=svc_port,
                    source=source,
                    kind="Ingress",
                    obj_name=name,
                )

    # Traefik IngressRoute: hosts and path prefixes live inside the match
    # expression, e.g. Host(`x.example`) && PathPrefix(`/api`).
    host_re = re.compile(r"Host\(`([^`]+)`\)")
    pathprefix_re = re.compile(r"PathPrefix\(`([^`]+)`\)")
    for item in ingressroutes:
        doc = item["doc"]
        source = item["source"]
        name, namespace = _meta(doc)
        namespace = namespace or ""
        spec = doc.get("spec") or {}
        for route in spec.get("routes") or []:
            if not isinstance(route, dict):
                continue
            match = route.get("match") or ""
            hosts = host_re.findall(match)
            pathprefixes = pathprefix_re.findall(match) or ["/"]
            for svc in route.get("services") or []:
                if not isinstance(svc, dict):
                    continue
                svc_name = svc.get("name")
                svc_port = svc.get("port")
                if not svc_name:
                    continue
                # One endpoint per host x path-prefix combination.
                for host in hosts:
                    for pp in pathprefixes:
                        add_endpoint(
                            host=host,
                            path=pp,
                            namespace=namespace,
                            service=svc_name,
                            port=svc_port,
                            source=source,
                            kind="IngressRoute",
                            obj_name=name,
                        )

    # Deterministic ordering for stable diffs.
    endpoints = sorted(
        endpoints,
        key=lambda e: (
            e["host"],
            e["path"],
            e["backend"]["namespace"],
            e["backend"]["service"],
        ),
    )

    catalog = {
        "cluster": "atlas",
        "sources": [
            {"name": k.name, "path": k.path, "targetNamespace": k.target_namespace}
            for k, _ in rendered
        ],
        "workloads": sorted(
            list(workloads.values()),
            key=lambda w: (w["namespace"], w["kind"], w["name"]),
        ),
        "services": sorted(
            list(services.values()),
            key=lambda s: (s["namespace"], s["name"]),
        ),
        "http_endpoints": endpoints,
        "helmrelease_host_hints": {k: v for k, v in sorted(helmrelease_hosts.items())},
    }

    # Mermaid diagram: host -> service -> workload (grouped by namespace).
    ns_nodes: dict[str, list[str]] = {}
    lines: list[str] = ["flowchart LR"]
    edges: set[tuple[str, str]] = set()

    def ensure_ns_node(ns: str, node_id: str):
        # Track node ids per namespace (insertion order) for the subgraphs.
        ns_nodes.setdefault(ns, [])
        if node_id not in ns_nodes[ns]:
            ns_nodes[ns].append(node_id)

    host_nodes: dict[str, str] = {}

    for ep in endpoints:
        host = ep["host"]
        host_id = host_nodes.get(host)
        if not host_id:
            # First time we see this host: declare its node.
            host_id = f"host_{_sanitize_node_id(host)}"
            host_nodes[host] = host_id
            lines.append(f' {host_id}["{host}"]')

        ns = ep["backend"]["namespace"]
        svc = ep["backend"]["service"]
        svc_id = f"svc_{_sanitize_node_id(ns)}_{_sanitize_node_id(svc)}"
        if svc_id not in ns_nodes.get(ns, []):
            lines.append(f' {svc_id}["{ns}/{svc} (Service)"]')
            ensure_ns_node(ns, svc_id)

        # Edges are de-duplicated via the edges set.
        if (host_id, svc_id) not in edges:
            edges.add((host_id, svc_id))
            lines.append(f" {host_id} --> {svc_id}")

        for w in ep["backend"]["workloads"]:
            w_id = f"wl_{_sanitize_node_id(ns)}_{_sanitize_node_id(w['name'])}"
            if w_id not in ns_nodes.get(ns, []):
                lines.append(f' {w_id}["{ns}/{w["name"]} ({w["kind"]})"]')
                ensure_ns_node(ns, w_id)
            if (svc_id, w_id) not in edges:
                edges.add((svc_id, w_id))
                lines.append(f" {svc_id} --> {w_id}")

    # Wrap namespace subgraphs at the end for stability (sorted namespaces).
    if ns_nodes:
        lines.append("")
        for ns in sorted(ns_nodes.keys()):
            lines.append(f" subgraph { _sanitize_node_id(ns) }[{ns}]")
            for node_id in ns_nodes[ns]:
                lines.append(f" {node_id}")
            lines.append(" end")

    diagram = "\n".join(lines).rstrip() + "\n"

    summary = {
        "counts": {
            "workloads": len(workloads),
            "services": len(services),
            "http_endpoints": len(endpoints),
            # Total number of hinted hosts, not number of HelmReleases.
            "helmrelease_host_hints": sum(len(v) for v in helmrelease_hosts.values()),
        }
    }

    return catalog, summary, diagram
|
|
|
|
|
|
def main() -> int:
    """CLI entry point.

    Returns:
        Process exit code: 0 on success, 2 when no Flux Kustomizations are
        found.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--out", default="knowledge", help="Output base directory (default: knowledge/)")
    ap.add_argument(
        "--write",
        action="store_true",
        help="Write generated files (otherwise just print a summary).",
    )
    ap.add_argument(
        "--sync-comms",
        action="store_true",
        help="Mirror rendered knowledge into services/comms/knowledge for atlasbot.",
    )
    args = ap.parse_args()

    out_dir = REPO_ROOT / args.out
    flux = find_flux_kustomizations()
    if not flux:
        print("No Flux Kustomizations found under clusters/atlas/flux-system.", file=sys.stderr)
        return 2

    # Render every Flux source whose path exists locally; Secrets are dropped
    # at parse time so they can never reach any output.
    rendered: list[tuple[FluxKustomization, list[dict[str, Any]]]] = []
    for k in flux:
        path = REPO_ROOT / k.path
        if not path.exists():
            continue
        raw = kustomize_build(path)
        docs = [d for d in _iter_docs(raw) if d.get("kind") != "Secret"]
        rendered.append((k, docs))

    # Sort by source name for deterministic catalog "sources" order.
    rendered = sorted(rendered, key=lambda item: item[0].name)
    catalog, summary, diagram = extract_catalog(rendered)

    # Dry-run mode: only print counts, write nothing.
    if not args.write:
        print(json.dumps(summary, indent=2, sort_keys=True))
        return 0

    (out_dir / "catalog").mkdir(parents=True, exist_ok=True)
    (out_dir / "diagrams").mkdir(parents=True, exist_ok=True)

    catalog_path = out_dir / "catalog" / "atlas.yaml"
    catalog_json_path = out_dir / "catalog" / "atlas.json"
    summary_path = out_dir / "catalog" / "atlas-summary.json"
    diagram_path = out_dir / "diagrams" / "atlas-http.mmd"
    runbooks_json_path = out_dir / "catalog" / "runbooks.json"

    # YAML catalog carries a "generated" header comment; JSON twin is for
    # consumers without a YAML parser.
    catalog_rel = catalog_path.relative_to(REPO_ROOT).as_posix()
    catalog_path.write_text(
        f"# {catalog_rel}\n"
        "# Generated by scripts/knowledge_render_atlas.py (do not edit by hand)\n"
        + yaml.safe_dump(catalog, sort_keys=False),
        encoding="utf-8",
    )
    catalog_json_path.write_text(json.dumps(catalog, indent=2, sort_keys=False) + "\n", encoding="utf-8")
    summary_path.write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n", encoding="utf-8")
    diagram_path.write_text(diagram, encoding="utf-8")

    # Render runbooks into JSON for lightweight, dependency-free consumption in-cluster.
    runbook_dirs = [
        out_dir / "runbooks",
        out_dir / "software",
    ]
    runbooks: list[dict[str, Any]] = []
    for runbooks_dir in runbook_dirs:
        if not runbooks_dir.exists():
            continue
        for md_file in sorted(runbooks_dir.glob("*.md")):
            raw = md_file.read_text(encoding="utf-8")
            fm: dict[str, Any] = {}
            body = raw
            # Optional YAML front matter delimited by "---" lines; on any
            # parse problem, fall back to treating the whole file as body.
            if raw.startswith("---\n"):
                try:
                    _, rest = raw.split("---\n", 1)
                    fm_raw, body = rest.split("\n---\n", 1)
                    fm = yaml.safe_load(fm_raw) or {}
                except Exception:
                    fm = {}
                    body = raw
            runbooks.append(
                {
                    "path": str(md_file.relative_to(out_dir)),
                    "title": fm.get("title") or md_file.stem,
                    "tags": fm.get("tags") or [],
                    "entrypoints": fm.get("entrypoints") or [],
                    "source_paths": fm.get("source_paths") or [],
                    "body": body.strip(),
                }
            )
    runbooks_json_path.write_text(json.dumps(runbooks, indent=2, sort_keys=False) + "\n", encoding="utf-8")

    print(f"Wrote {catalog_path.relative_to(REPO_ROOT)}")
    print(f"Wrote {catalog_json_path.relative_to(REPO_ROOT)}")
    print(f"Wrote {summary_path.relative_to(REPO_ROOT)}")
    print(f"Wrote {diagram_path.relative_to(REPO_ROOT)}")
    print(f"Wrote {runbooks_json_path.relative_to(REPO_ROOT)}")

    # Optionally mirror the whole knowledge tree for the comms service.
    if args.sync_comms:
        comms_dir = REPO_ROOT / "services" / "comms" / "knowledge"
        _sync_tree(out_dir, comms_dir)
        print(f"Synced {out_dir.relative_to(REPO_ROOT)} -> {comms_dir.relative_to(REPO_ROOT)}")
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Propagate main()'s return code as the process exit status.
    raise SystemExit(main())
|