package cluster import ( "context" "fmt" "log" "os" "path/filepath" "sort" "strconv" "strings" "time" "scm.bstein.dev/bstein/hecate/internal/config" "scm.bstein.dev/bstein/hecate/internal/execx" "scm.bstein.dev/bstein/hecate/internal/state" ) type Orchestrator struct { cfg config.Config runner *execx.Runner store *state.Store log *log.Logger } type StartupOptions struct { ForceFluxBranch string SkipLocalBootstrap bool Reason string } type ShutdownOptions struct { SkipEtcdSnapshot bool SkipDrain bool Reason string } type startupWorkload struct { Namespace string Kind string Name string } var criticalStartupWorkloads = []startupWorkload{ {Namespace: "flux-system", Kind: "deployment", Name: "source-controller"}, {Namespace: "flux-system", Kind: "deployment", Name: "kustomize-controller"}, {Namespace: "flux-system", Kind: "deployment", Name: "helm-controller"}, {Namespace: "flux-system", Kind: "deployment", Name: "notification-controller"}, {Namespace: "vault", Kind: "statefulset", Name: "vault"}, {Namespace: "postgres", Kind: "statefulset", Name: "postgres"}, {Namespace: "gitea", Kind: "deployment", Name: "gitea"}, } func New(cfg config.Config, runner *execx.Runner, store *state.Store, logger *log.Logger) *Orchestrator { return &Orchestrator{cfg: cfg, runner: runner, store: store, log: logger} } func (o *Orchestrator) Startup(ctx context.Context, opts StartupOptions) (err error) { unlock, err := state.AcquireLock(o.cfg.State.LockPath) if err != nil { return err } defer unlock() record := state.RunRecord{ ID: fmt.Sprintf("startup-%d", time.Now().UnixNano()), Action: "startup", Reason: opts.Reason, StartedAt: time.Now().UTC(), } defer o.finalizeRecord(&record, &err) workers, err := o.effectiveWorkers(ctx) if err != nil { return err } o.log.Printf("startup control-planes=%s workers=%s", strings.Join(o.cfg.ControlPlanes, ","), strings.Join(workers, ",")) o.reportFluxSource(ctx, opts.ForceFluxBranch) o.startControlPlanes(ctx, o.cfg.ControlPlanes) o.startWorkers(ctx, workers) if err := o.waitForAPI(ctx, 120, 2*time.Second); err != nil { return err } if opts.ForceFluxBranch != "" { patch := fmt.Sprintf(`{"spec":{"ref":{"branch":"%s"}}}`, opts.ForceFluxBranch) if _, err := o.kubectl(ctx, 20*time.Second, "-n", "flux-system", "patch", "gitrepository", "flux-system", "--type=merge", "-p", patch); err != nil { return fmt.Errorf("force flux branch: %w", err) } } needsLocalBootstrap := false bootstrapReasons := []string{} if !opts.SkipLocalBootstrap { ready, readyErr := o.fluxSourceReady(ctx) if readyErr != nil { o.log.Printf("warning: unable to read flux source readiness: %v", readyErr) needsLocalBootstrap = true bootstrapReasons = append(bootstrapReasons, "flux source readiness check failed") } if !ready { needsLocalBootstrap = true bootstrapReasons = append(bootstrapReasons, "flux source not ready") } } missing, missingErr := o.missingCriticalStartupWorkloads(ctx) if missingErr != nil { o.log.Printf("warning: unable to inspect critical startup workloads: %v", missingErr) } if len(missing) > 0 { o.log.Printf("startup critical workloads not ready; applying targeted recovery first: %s", strings.Join(missing, ", ")) } if err := o.ensureCriticalStartupWorkloads(ctx); err != nil { return err } if !opts.SkipLocalBootstrap && needsLocalBootstrap { if ready, err := o.waitForFluxSourceReady(ctx, 2*time.Minute); err != nil { o.log.Printf("warning: flux source readiness wait failed before local bootstrap: %v", err) } else if ready { o.log.Printf("flux source became ready after targeted recovery; skipping local bootstrap") needsLocalBootstrap = false } } if !opts.SkipLocalBootstrap && needsLocalBootstrap { o.log.Printf("startup bootstrap required after wait: %s", strings.Join(bootstrapReasons, "; ")) if err := o.bootstrapLocal(ctx); err != nil { return err } if err := o.ensureCriticalStartupWorkloads(ctx); err != nil { return err } ready, err := o.fluxSourceReady(ctx) if err != nil { return fmt.Errorf("flux source readiness after bootstrap: %w", err) } if !ready { return fmt.Errorf("flux source still not ready after local bootstrap") } } if err := o.resumeFluxAndReconcile(ctx); err != nil { return err } o.log.Printf("startup flow complete") return nil } func (o *Orchestrator) Shutdown(ctx context.Context, opts ShutdownOptions) (err error) { unlock, err := state.AcquireLock(o.cfg.State.LockPath) if err != nil { return err } defer unlock() record := state.RunRecord{ ID: fmt.Sprintf("shutdown-%d", time.Now().UnixNano()), Action: "shutdown", Reason: opts.Reason, StartedAt: time.Now().UTC(), } defer o.finalizeRecord(&record, &err) workers, err := o.effectiveWorkers(ctx) if err != nil { return err } o.log.Printf("shutdown control-planes=%s workers=%s", strings.Join(o.cfg.ControlPlanes, ","), strings.Join(workers, ",")) o.reportFluxSource(ctx, "") skipEtcd := opts.SkipEtcdSnapshot || o.cfg.Shutdown.SkipEtcdSnapshot if !skipEtcd { o.bestEffort("etcd snapshot", func() error { return o.takeEtcdSnapshot(ctx, o.cfg.ControlPlanes[0]) }) } o.bestEffort("suspend flux", func() error { return o.patchFluxSuspendAll(ctx, true) }) o.bestEffort("scale down apps", func() error { return o.scaleDownApps(ctx) }) skipDrain := opts.SkipDrain || o.cfg.Shutdown.SkipDrain if !skipDrain { o.bestEffort("drain workers", func() error { return o.drainWorkers(ctx, workers) }) } o.stopWorkers(ctx, workers) o.stopControlPlanes(ctx, o.cfg.ControlPlanes) if o.cfg.Shutdown.PoweroffEnabled { o.bestEffort("poweroff hosts", func() error { return o.poweroffHosts(ctx, workers) }) } o.log.Printf("shutdown flow complete") return nil } func (o *Orchestrator) EstimatedShutdownSeconds() int { return o.store.ShutdownP95(o.cfg.Shutdown.DefaultBudgetSeconds) } func (o *Orchestrator) finalizeRecord(record *state.RunRecord, err *error) { record.EndedAt = time.Now().UTC() record.DurationSeconds = int(record.EndedAt.Sub(record.StartedAt).Seconds()) record.Success = *err == nil if *err != nil { record.Error = (*err).Error() } if appendErr := o.store.Append(*record); appendErr != nil { o.log.Printf("warning: append run record failed: %v", appendErr) } } func (o *Orchestrator) effectiveWorkers(ctx context.Context) ([]string, error) { if len(o.cfg.Workers) > 0 { return append([]string{}, o.cfg.Workers...), nil } return o.discoverWorkers(ctx) } func (o *Orchestrator) discoverWorkers(ctx context.Context) ([]string, error) { out, err := o.kubectl(ctx, 15*time.Second, "get", "nodes", "-o", "custom-columns=NAME:.metadata.name,CP:.metadata.labels.node-role\\.kubernetes\\.io/control-plane,MASTER:.metadata.labels.node-role\\.kubernetes\\.io/master", "--no-headers", ) if err != nil { return nil, fmt.Errorf("discover workers: %w", err) } var workers []string for _, line := range lines(out) { fields := strings.Fields(line) if len(fields) < 3 { continue } if fields[1] == "" && fields[2] == "" { workers = append(workers, fields[0]) } } if len(workers) == 0 { return nil, fmt.Errorf("no workers discovered") } return workers, nil } func (o *Orchestrator) patchFluxSuspendAll(ctx context.Context, suspend bool) error { patch := fmt.Sprintf(`{"spec":{"suspend":%t}}`, suspend) ksOut, err := o.kubectl(ctx, 20*time.Second, "-n", "flux-system", "get", "kustomizations.kustomize.toolkit.fluxcd.io", "-o", "jsonpath={range .items[*]}{.metadata.name}{'\\n'}{end}", ) if err != nil { return err } for _, ks := range lines(ksOut) { _, patchErr := o.kubectl(ctx, 20*time.Second, "-n", "flux-system", "patch", "kustomization", ks, "--type=merge", "-p", patch) if patchErr != nil { o.log.Printf("warning: patch kustomization %s failed: %v", ks, patchErr) } } hrOut, err := o.kubectl(ctx, 25*time.Second, "get", "helmreleases.helm.toolkit.fluxcd.io", "-A", "-o", "jsonpath={range .items[*]}{.metadata.namespace}{'/'}{.metadata.name}{'\\n'}{end}", ) if err != nil { return err } for _, hr := range lines(hrOut) { parts := strings.SplitN(hr, "/", 2) if len(parts) != 2 { continue } _, patchErr := o.kubectl(ctx, 20*time.Second, "-n", parts[0], "patch", "helmrelease", parts[1], "--type=merge", "-p", patch) if patchErr != nil { o.log.Printf("warning: patch helmrelease %s failed: %v", hr, patchErr) } } return nil } func (o *Orchestrator) scaleDownApps(ctx context.Context) error { nsOut, err := o.kubectl(ctx, 15*time.Second, "get", "ns", "-o", "jsonpath={range .items[*]}{.metadata.name}{'\\n'}{end}") if err != nil { return err } exclude := map[string]struct{}{} for _, ns := range o.cfg.ExcludedNamespaces { exclude[ns] = struct{}{} } for _, ns := range lines(nsOut) { if _, ok := exclude[ns]; ok { continue } if _, scaleErr := o.kubectl(ctx, 15*time.Second, "-n", ns, "scale", "deployment", "--all", "--replicas=0"); scaleErr != nil { o.log.Printf("warning: scale deployments in %s failed: %v", ns, scaleErr) } if _, scaleErr := o.kubectl(ctx, 15*time.Second, "-n", ns, "scale", "statefulset", "--all", "--replicas=0"); scaleErr != nil { o.log.Printf("warning: scale statefulsets in %s failed: %v", ns, scaleErr) } } return nil } func (o *Orchestrator) drainWorkers(ctx context.Context, workers []string) error { for _, node := range workers { if _, err := o.kubectl(ctx, 20*time.Second, "cordon", node); err != nil { o.log.Printf("warning: cordon %s failed: %v", node, err) } if _, err := o.kubectl(ctx, 3*time.Minute, "drain", node, "--ignore-daemonsets", "--delete-emptydir-data", "--grace-period=30", "--timeout=180s"); err != nil { o.log.Printf("warning: drain %s failed: %v", node, err) } } return nil } func (o *Orchestrator) stopWorkers(ctx context.Context, workers []string) { for _, n := range workers { o.bestEffort("stop k3s-agent on "+n, func() error { _, err := o.ssh(ctx, n, "sudo systemctl stop k3s-agent || true") return err }) } } func (o *Orchestrator) startWorkers(ctx context.Context, workers []string) { for _, n := range workers { o.bestEffort("start k3s-agent on "+n, func() error { _, err := o.ssh(ctx, n, "sudo systemctl start k3s-agent || true") return err }) } } func (o *Orchestrator) stopControlPlanes(ctx context.Context, cps []string) { for _, n := range cps { o.bestEffort("stop k3s on "+n, func() error { _, err := o.ssh(ctx, n, "sudo systemctl stop k3s || true") return err }) } } func (o *Orchestrator) startControlPlanes(ctx context.Context, cps []string) { for _, n := range cps { o.bestEffort("start k3s on "+n, func() error { _, err := o.ssh(ctx, n, "sudo systemctl start k3s || true") return err }) } } func (o *Orchestrator) takeEtcdSnapshot(ctx context.Context, node string) error { name := "pre-shutdown-" + time.Now().UTC().Format("20060102-150405") _, err := o.ssh(ctx, node, "sudo k3s etcd-snapshot save --name "+name) return err } func (o *Orchestrator) waitForAPI(ctx context.Context, attempts int, sleep time.Duration) error { if o.runner.DryRun { return nil } for i := 0; i < attempts; i++ { _, err := o.kubectl(ctx, 5*time.Second, "version", "--request-timeout=5s") if err == nil { return nil } time.Sleep(sleep) } return fmt.Errorf("kubernetes API did not become reachable within timeout") } func (o *Orchestrator) fluxSourceReady(ctx context.Context) (bool, error) { out, err := o.kubectl(ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.status.conditions[?(@.type==\"Ready\")].status}") if err != nil { return false, err } return strings.Contains(out, "True"), nil } func (o *Orchestrator) reportFluxSource(ctx context.Context, forceBranch string) { urlOut, urlErr := o.kubectl(ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.spec.url}") if urlErr == nil { o.log.Printf("flux-source-url=%s", strings.TrimSpace(urlOut)) } branchOut, branchErr := o.kubectl(ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.spec.ref.branch}") if branchErr == nil { branch := strings.TrimSpace(branchOut) o.log.Printf("flux-source-branch=%s", branch) if forceBranch == "" && branch != o.cfg.ExpectedFluxBranch { o.log.Printf("warning: flux source branch is '%s', expected '%s'", branch, o.cfg.ExpectedFluxBranch) } } } func (o *Orchestrator) bootstrapLocal(ctx context.Context) error { failures := 0 for _, rel := range o.cfg.LocalBootstrapPaths { full := filepath.Join(o.cfg.IACRepoPath, rel) o.log.Printf("local bootstrap apply -k %s", full) if _, err := o.kubectl(ctx, 2*time.Minute, "apply", "-k", full); err != nil { failures++ o.log.Printf("warning: local bootstrap apply failed at %s: %v", full, err) continue } } if failures == len(o.cfg.LocalBootstrapPaths) { return fmt.Errorf("local bootstrap apply failed for every configured path (%d total)", failures) } return nil } func (o *Orchestrator) waitForFluxSourceReady(ctx context.Context, window time.Duration) (bool, error) { if o.runner.DryRun { return true, nil } deadline := time.Now().Add(window) for { ready, err := o.fluxSourceReady(ctx) if err != nil { return false, err } if ready { return true, nil } if time.Now().After(deadline) { return false, nil } select { case <-ctx.Done(): return false, ctx.Err() case <-time.After(5 * time.Second): } } } func (o *Orchestrator) resumeFluxAndReconcile(ctx context.Context) error { if err := o.patchFluxSuspendAll(ctx, false); err != nil { return err } if o.runner.CommandExists("flux") { commands := [][]string{ {"reconcile", "source", "git", "flux-system", "-n", "flux-system", "--timeout=3m"}, {"reconcile", "kustomization", "core", "-n", "flux-system", "--with-source", "--timeout=5m"}, {"reconcile", "kustomization", "helm", "-n", "flux-system", "--with-source", "--timeout=5m"}, {"reconcile", "kustomization", "traefik", "-n", "flux-system", "--with-source", "--timeout=5m"}, {"reconcile", "kustomization", "vault", "-n", "flux-system", "--with-source", "--timeout=10m"}, {"reconcile", "kustomization", "postgres", "-n", "flux-system", "--with-source", "--timeout=10m"}, {"reconcile", "kustomization", "gitea", "-n", "flux-system", "--with-source", "--timeout=10m"}, } for _, c := range commands { if _, err := o.run(ctx, 3*time.Minute, "flux", c...); err != nil { o.log.Printf("warning: flux command failed (%s): %v", strings.Join(c, " "), err) } } return nil } now := time.Now().UTC().Format(time.RFC3339) _, err := o.kubectl(ctx, 20*time.Second, "-n", "flux-system", "annotate", "kustomizations.kustomize.toolkit.fluxcd.io", "--all", "reconcile.fluxcd.io/requestedAt="+now, "--overwrite") return err } func (o *Orchestrator) kubectl(ctx context.Context, timeout time.Duration, args ...string) (string, error) { return o.run(ctx, timeout, "kubectl", args...) } func (o *Orchestrator) ssh(ctx context.Context, node string, command string) (string, error) { target := node if o.cfg.SSHUser != "" { target = o.cfg.SSHUser + "@" + node } return o.run(ctx, 45*time.Second, "ssh", "-o", "BatchMode=yes", "-o", "ConnectTimeout=8", target, command) } func (o *Orchestrator) run(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) { runCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() return o.runner.Run(runCtx, name, args...) } func lines(in string) []string { in = strings.TrimSpace(in) if in == "" { return nil } parts := strings.Split(in, "\n") out := make([]string, 0, len(parts)) for _, p := range parts { v := strings.TrimSpace(p) if v != "" { out = append(out, v) } } return out } func (o *Orchestrator) bestEffort(name string, fn func() error) { if err := fn(); err != nil { o.log.Printf("warning: %s: %v", name, err) } } func (o *Orchestrator) missingCriticalStartupWorkloads(ctx context.Context) ([]string, error) { missing := []string{} for _, w := range criticalStartupWorkloads { ready, err := o.workloadReady(ctx, w) if err != nil { if isNotFoundErr(err) { missing = append(missing, fmt.Sprintf("%s/%s/%s(not found)", w.Namespace, w.Kind, w.Name)) continue } return nil, fmt.Errorf("check %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, err) } if !ready { missing = append(missing, fmt.Sprintf("%s/%s/%s", w.Namespace, w.Kind, w.Name)) } } return missing, nil } func (o *Orchestrator) ensureCriticalStartupWorkloads(ctx context.Context) error { for _, w := range criticalStartupWorkloads { if err := o.ensureWorkloadReplicas(ctx, w, 1); err != nil { if isNotFoundErr(err) { o.log.Printf("warning: startup workload missing, skipping scale: %s/%s/%s", w.Namespace, w.Kind, w.Name) continue } return fmt.Errorf("scale %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, err) } if err := o.waitWorkloadReady(ctx, w); err != nil { if isNotFoundErr(err) { o.log.Printf("warning: startup workload missing during readiness wait: %s/%s/%s", w.Namespace, w.Kind, w.Name) continue } return err } } return nil } func (o *Orchestrator) ensureWorkloadReplicas(ctx context.Context, w startupWorkload, replicas int) error { _, err := o.kubectl( ctx, 45*time.Second, "-n", w.Namespace, "scale", w.Kind, w.Name, fmt.Sprintf("--replicas=%d", replicas), ) return err } func (o *Orchestrator) waitWorkloadReady(ctx context.Context, w startupWorkload) error { timeout := "240s" if w.Kind == "statefulset" { timeout = "360s" } _, err := o.kubectl( ctx, 7*time.Minute, "-n", w.Namespace, "rollout", "status", fmt.Sprintf("%s/%s", w.Kind, w.Name), "--timeout="+timeout, ) if err != nil { return fmt.Errorf("wait ready %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, err) } return nil } func (o *Orchestrator) workloadReady(ctx context.Context, w startupWorkload) (bool, error) { out, err := o.kubectl( ctx, 20*time.Second, "-n", w.Namespace, "get", w.Kind, w.Name, "-o", "jsonpath={.status.readyReplicas}", ) if err != nil { return false, err } raw := strings.TrimSpace(out) if raw == "" || raw == "" { return false, nil } n, err := strconv.Atoi(raw) if err != nil { return false, fmt.Errorf("parse readyReplicas %q: %w", raw, err) } return n >= 1, nil } func isNotFoundErr(err error) bool { if err == nil { return false } msg := strings.ToLower(err.Error()) return strings.Contains(msg, "not found") || strings.Contains(msg, "(notfound)") } func (o *Orchestrator) poweroffHosts(ctx context.Context, workers []string) error { delay := o.cfg.Shutdown.PoweroffDelaySeconds if delay <= 0 { delay = 25 } localNames := map[string]struct{}{} if hn, err := os.Hostname(); err == nil && strings.TrimSpace(hn) != "" { localNames[strings.TrimSpace(hn)] = struct{}{} } if o.cfg.SSHUser != "" { localNames[o.cfg.SSHUser] = struct{}{} } hostSet := map[string]struct{}{} for _, n := range o.cfg.ControlPlanes { hostSet[n] = struct{}{} } for _, n := range workers { hostSet[n] = struct{}{} } for _, n := range o.cfg.Shutdown.ExtraPoweroffHosts { if strings.TrimSpace(n) != "" { hostSet[strings.TrimSpace(n)] = struct{}{} } } hosts := make([]string, 0, len(hostSet)) for h := range hostSet { hosts = append(hosts, h) } sort.Strings(hosts) remoteCmd := fmt.Sprintf(`sudo nohup sh -c 'sleep %d; systemctl poweroff' >/dev/null 2>&1 &`, delay) for _, host := range hosts { host = strings.TrimSpace(host) if host == "" { continue } if _, isLocal := localNames[host]; isLocal { continue } o.bestEffort("schedule poweroff on "+host, func() error { _, err := o.ssh(ctx, host, remoteCmd) return err }) } if o.cfg.Shutdown.PoweroffLocalHost { o.bestEffort("schedule local host poweroff", func() error { _, err := o.run(ctx, 5*time.Second, "sh", "-c", fmt.Sprintf("nohup sh -c 'sleep %d; systemctl poweroff' >/dev/null 2>&1 &", delay+10)) return err }) } return nil }