package cluster import ( "context" "encoding/base64" "encoding/json" "errors" "fmt" "log" "os" "os/exec" "path/filepath" "sort" "strconv" "strings" "sync" "time" "scm.bstein.dev/bstein/hecate/internal/config" "scm.bstein.dev/bstein/hecate/internal/execx" "scm.bstein.dev/bstein/hecate/internal/state" ) type Orchestrator struct { cfg config.Config runner *execx.Runner store *state.Store log *log.Logger } type StartupOptions struct { ForceFluxBranch string SkipLocalBootstrap bool Reason string } type ShutdownOptions struct { SkipEtcdSnapshot bool SkipDrain bool Reason string } type EtcdRestoreOptions struct { ControlPlane string SnapshotPath string } type startupWorkload struct { Namespace string Kind string Name string } type workloadScaleEntry struct { Namespace string `json:"namespace"` Kind string `json:"kind"` Name string `json:"name"` Replicas int `json:"replicas"` } type workloadScaleSnapshot struct { GeneratedAt time.Time `json:"generated_at"` Entries []workloadScaleEntry `json:"entries"` } var criticalStartupWorkloads = []startupWorkload{ {Namespace: "flux-system", Kind: "deployment", Name: "source-controller"}, {Namespace: "flux-system", Kind: "deployment", Name: "kustomize-controller"}, {Namespace: "flux-system", Kind: "deployment", Name: "helm-controller"}, {Namespace: "flux-system", Kind: "deployment", Name: "notification-controller"}, {Namespace: "harbor", Kind: "statefulset", Name: "harbor-redis"}, {Namespace: "harbor", Kind: "deployment", Name: "harbor-registry"}, {Namespace: "vault", Kind: "statefulset", Name: "vault"}, {Namespace: "postgres", Kind: "statefulset", Name: "postgres"}, {Namespace: "gitea", Kind: "deployment", Name: "gitea"}, } var ErrEtcdRestoreNotApplicable = errors.New("etcd restore not applicable") func New(cfg config.Config, runner *execx.Runner, store *state.Store, logger *log.Logger) *Orchestrator { return &Orchestrator{cfg: cfg, runner: runner, store: store, log: logger} } func (o *Orchestrator) Startup(ctx context.Context, opts 
StartupOptions) (err error) { unlock, err := state.AcquireLock(o.cfg.State.LockPath) if err != nil { return err } defer unlock() record := state.RunRecord{ ID: fmt.Sprintf("startup-%d", time.Now().UnixNano()), Action: "startup", Reason: opts.Reason, StartedAt: time.Now().UTC(), } defer o.finalizeRecord(&record, &err) if !o.runner.DryRun { currentIntent, readErr := state.ReadIntent(o.cfg.State.IntentPath) if readErr != nil { return fmt.Errorf("read startup intent: %w", readErr) } if currentIntent.State == state.IntentShuttingDown { return fmt.Errorf("startup blocked: shutdown intent is active (%s)", currentIntent.Reason) } if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentStartupInProgress, opts.Reason, "startup"); writeErr != nil { return fmt.Errorf("set startup intent: %w", writeErr) } defer func() { finalReason := opts.Reason if err != nil { finalReason = fmt.Sprintf("%s (failed)", strings.TrimSpace(opts.Reason)) } if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentNormal, finalReason, "startup"); writeErr != nil { o.log.Printf("warning: write startup completion intent failed: %v", writeErr) } }() } o.log.Printf("startup control-planes=%s", strings.Join(o.cfg.ControlPlanes, ",")) o.reportFluxSource(ctx, opts.ForceFluxBranch) o.startControlPlanes(ctx, o.cfg.ControlPlanes) apiPoll := time.Duration(o.cfg.Startup.APIPollSeconds) * time.Second apiAttempts := o.cfg.Startup.APIWaitSeconds / o.cfg.Startup.APIPollSeconds if apiAttempts < 1 { apiAttempts = 1 } if err := o.waitForAPI(ctx, apiAttempts, apiPoll); err != nil { if !o.cfg.Startup.AutoEtcdRestoreOnAPIFailure { return err } cp := strings.TrimSpace(o.cfg.Startup.EtcdRestoreControlPlane) if cp == "" && len(o.cfg.ControlPlanes) > 0 { cp = o.cfg.ControlPlanes[0] } o.log.Printf("warning: initial API wait failed (%v); attempting automatic etcd restore on %s", err, cp) if restoreErr := o.EtcdRestore(ctx, EtcdRestoreOptions{ControlPlane: cp}); restoreErr != nil { if 
errors.Is(restoreErr, ErrEtcdRestoreNotApplicable) { o.log.Printf("warning: automatic etcd restore skipped: %v", restoreErr) o.log.Printf("warning: retrying control-plane start because datastore recovery path is external") o.startControlPlanes(ctx, o.cfg.ControlPlanes) } else { return fmt.Errorf("kubernetes API did not become reachable and automatic etcd restore failed: %w", restoreErr) } } if err := o.waitForAPI(ctx, apiAttempts, apiPoll); err != nil { return fmt.Errorf("kubernetes API did not become reachable after automatic etcd restore: %w", err) } } workers, err := o.effectiveWorkers(ctx) if err != nil { return err } o.log.Printf("startup workers=%s", strings.Join(workers, ",")) o.startWorkers(ctx, workers) o.bestEffort("uncordon workers", func() error { return o.uncordonWorkers(ctx, workers) }) if opts.ForceFluxBranch != "" { patch := fmt.Sprintf(`{"spec":{"ref":{"branch":"%s"}}}`, opts.ForceFluxBranch) if _, err := o.kubectl(ctx, 20*time.Second, "-n", "flux-system", "patch", "gitrepository", "flux-system", "--type=merge", "-p", patch); err != nil { return fmt.Errorf("force flux branch: %w", err) } } needsLocalBootstrap := false bootstrapReasons := []string{} if !opts.SkipLocalBootstrap { ready, readyErr := o.fluxSourceReady(ctx) if readyErr != nil { o.log.Printf("warning: unable to read flux source readiness: %v", readyErr) needsLocalBootstrap = true bootstrapReasons = append(bootstrapReasons, "flux source readiness check failed") } if !ready { needsLocalBootstrap = true bootstrapReasons = append(bootstrapReasons, "flux source not ready") } } missing, missingErr := o.missingCriticalStartupWorkloads(ctx) if missingErr != nil { o.log.Printf("warning: unable to inspect critical startup workloads: %v", missingErr) } if len(missing) > 0 { o.log.Printf("startup critical workloads not ready; applying targeted recovery first: %s", strings.Join(missing, ", ")) } if err := o.ensureCriticalStartupWorkloads(ctx); err != nil { return err } if !opts.SkipLocalBootstrap && 
needsLocalBootstrap { if ready, err := o.waitForFluxSourceReady(ctx, 5*time.Minute); err != nil { o.log.Printf("warning: flux source readiness wait failed before local bootstrap: %v", err) } else if ready { o.log.Printf("flux source became ready after targeted recovery; skipping local bootstrap") needsLocalBootstrap = false } } if !opts.SkipLocalBootstrap && needsLocalBootstrap { o.log.Printf("startup bootstrap required after wait: %s", strings.Join(bootstrapReasons, "; ")) if err := o.bootstrapLocal(ctx); err != nil { return err } if err := o.ensureCriticalStartupWorkloads(ctx); err != nil { return err } ready, err := o.fluxSourceReady(ctx) if err != nil { return fmt.Errorf("flux source readiness after bootstrap: %w", err) } if !ready { return fmt.Errorf("flux source still not ready after local bootstrap") } } o.bestEffort("restore scaled workloads", func() error { return o.restoreScaledApps(ctx) }) if err := o.resumeFluxAndReconcile(ctx); err != nil { return err } o.log.Printf("startup flow complete") return nil } func (o *Orchestrator) EtcdRestore(ctx context.Context, opts EtcdRestoreOptions) error { controlPlane := strings.TrimSpace(opts.ControlPlane) if controlPlane == "" { if len(o.cfg.ControlPlanes) == 0 { return fmt.Errorf("cannot restore etcd: no control planes configured") } controlPlane = o.cfg.ControlPlanes[0] } found := false for _, cp := range o.cfg.ControlPlanes { if cp == controlPlane { found = true break } } if !found { return fmt.Errorf("cannot restore etcd: control plane %s is not in configured control_planes", controlPlane) } if !o.sshManaged(controlPlane) { return fmt.Errorf("cannot restore etcd on %s: node not in ssh_managed_nodes", controlPlane) } snapshotPath := strings.TrimSpace(opts.SnapshotPath) if o.runner.DryRun { if snapshotPath == "" { snapshotPath = "" } o.log.Printf("etcd restore target=%s snapshot=%s (dry-run; datastore-mode and snapshot checks skipped)", controlPlane, snapshotPath) return nil } externalDatastore, err := 
o.controlPlaneUsesExternalDatastore(ctx, controlPlane) if err != nil { return err } if externalDatastore { return fmt.Errorf("%w: %s uses --datastore-endpoint", ErrEtcdRestoreNotApplicable, controlPlane) } if snapshotPath == "" { resolved, err := o.latestEtcdSnapshotPath(ctx, controlPlane) if err != nil { return err } snapshotPath = resolved } o.log.Printf("etcd restore target=%s snapshot=%s", controlPlane, snapshotPath) for _, cp := range o.cfg.ControlPlanes { cp := cp o.bestEffort("stop k3s before etcd restore on "+cp, func() error { _, err := o.ssh(ctx, cp, "sudo systemctl stop k3s || true") return err }) } restoreCmd := fmt.Sprintf("sudo k3s server --cluster-reset --cluster-reset-restore-path %q", snapshotPath) if _, err := o.ssh(ctx, controlPlane, restoreCmd); err != nil { return fmt.Errorf("etcd restore command failed on %s: %w", controlPlane, err) } o.log.Printf("etcd restore command completed on %s", controlPlane) if _, err := o.ssh(ctx, controlPlane, "sudo systemctl start k3s || true"); err != nil { return fmt.Errorf("failed to start k3s on restore control plane %s: %w", controlPlane, err) } time.Sleep(10 * time.Second) for _, cp := range o.cfg.ControlPlanes { cp := cp if cp == controlPlane { continue } o.bestEffort("start k3s after etcd restore on "+cp, func() error { _, err := o.ssh(ctx, cp, "sudo systemctl start k3s || true") return err }) } return nil } func (o *Orchestrator) Shutdown(ctx context.Context, opts ShutdownOptions) (err error) { unlock, err := state.AcquireLock(o.cfg.State.LockPath) if err != nil { return err } defer unlock() record := state.RunRecord{ ID: fmt.Sprintf("shutdown-%d", time.Now().UnixNano()), Action: "shutdown", Reason: opts.Reason, StartedAt: time.Now().UTC(), } defer o.finalizeRecord(&record, &err) if !o.runner.DryRun { if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentShuttingDown, opts.Reason, "shutdown"); writeErr != nil { return fmt.Errorf("set shutdown intent: %w", writeErr) } defer func() { final 
:= state.IntentShuttingDown if err == nil { final = state.IntentShutdownComplete } if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, final, opts.Reason, "shutdown"); writeErr != nil { o.log.Printf("warning: write shutdown completion intent failed: %v", writeErr) } }() } workers, err := o.effectiveWorkers(ctx) if err != nil { return err } o.log.Printf("shutdown control-planes=%s workers=%s", strings.Join(o.cfg.ControlPlanes, ","), strings.Join(workers, ",")) o.reportFluxSource(ctx, "") skipEtcd := opts.SkipEtcdSnapshot || o.cfg.Shutdown.SkipEtcdSnapshot if !skipEtcd { o.bestEffort("etcd snapshot", func() error { return o.takeEtcdSnapshot(ctx, o.cfg.ControlPlanes[0]) }) } o.bestEffort("suspend flux", func() error { return o.patchFluxSuspendAll(ctx, true) }) o.bestEffort("scale down apps", func() error { return o.scaleDownApps(ctx) }) skipDrain := opts.SkipDrain || o.cfg.Shutdown.SkipDrain if !skipDrain { o.bestEffort("drain workers", func() error { return o.drainWorkers(ctx, workers) }) } o.stopWorkers(ctx, workers) o.stopControlPlanes(ctx, o.cfg.ControlPlanes) if o.cfg.Shutdown.PoweroffEnabled { o.bestEffort("poweroff hosts", func() error { return o.poweroffHosts(ctx, workers) }) } o.log.Printf("shutdown flow complete") return nil } func (o *Orchestrator) EstimatedShutdownSeconds() int { return o.store.ShutdownP95(o.cfg.Shutdown.DefaultBudgetSeconds) } func (o *Orchestrator) finalizeRecord(record *state.RunRecord, err *error) { record.EndedAt = time.Now().UTC() record.DurationSeconds = int(record.EndedAt.Sub(record.StartedAt).Seconds()) record.Success = *err == nil if *err != nil { record.Error = (*err).Error() } if appendErr := o.store.Append(*record); appendErr != nil { o.log.Printf("warning: append run record failed: %v", appendErr) } } func (o *Orchestrator) effectiveWorkers(ctx context.Context) ([]string, error) { if len(o.cfg.Workers) > 0 { return append([]string{}, o.cfg.Workers...), nil } workers, err := o.discoverWorkers(ctx) if err == nil { 
return workers, nil } fallback := o.fallbackWorkersFromInventory() if len(fallback) == 0 { return nil, err } o.log.Printf("warning: worker discovery failed via kubernetes API (%v); falling back to inventory workers=%s", err, strings.Join(fallback, ",")) return fallback, nil } func (o *Orchestrator) discoverWorkers(ctx context.Context) ([]string, error) { out, err := o.kubectl(ctx, 15*time.Second, "get", "nodes", "-o", "custom-columns=NAME:.metadata.name,CP:.metadata.labels.node-role\\.kubernetes\\.io/control-plane,MASTER:.metadata.labels.node-role\\.kubernetes\\.io/master", "--no-headers", ) if err != nil { return nil, fmt.Errorf("discover workers: %w", err) } var workers []string for _, line := range lines(out) { fields := strings.Fields(line) if len(fields) < 3 { continue } if fields[1] == "" && fields[2] == "" { workers = append(workers, fields[0]) } } if len(workers) == 0 { return nil, fmt.Errorf("no workers discovered") } return workers, nil } func (o *Orchestrator) fallbackWorkersFromInventory() []string { cp := make(map[string]struct{}, len(o.cfg.ControlPlanes)) for _, node := range o.cfg.ControlPlanes { cp[strings.TrimSpace(node)] = struct{}{} } candidates := make(map[string]struct{}) add := func(node string) { name := strings.TrimSpace(node) if name == "" { return } if _, isCP := cp[name]; isCP { return } candidates[name] = struct{}{} } for _, node := range o.cfg.SSHManagedNodes { add(node) } if len(candidates) == 0 { for node := range o.cfg.SSHNodeHosts { add(node) } } workers := make([]string, 0, len(candidates)) for node := range candidates { workers = append(workers, node) } sort.Strings(workers) return workers } func (o *Orchestrator) patchFluxSuspendAll(ctx context.Context, suspend bool) error { patch := fmt.Sprintf(`{"spec":{"suspend":%t}}`, suspend) ksOut, err := o.kubectl(ctx, 20*time.Second, "-n", "flux-system", "get", "kustomizations.kustomize.toolkit.fluxcd.io", "-o", "jsonpath={range .items[*]}{.metadata.name}{'\\n'}{end}", ) if err != nil { 
return err } for _, ks := range lines(ksOut) { _, patchErr := o.kubectl(ctx, 20*time.Second, "-n", "flux-system", "patch", "kustomization", ks, "--type=merge", "-p", patch) if patchErr != nil { o.log.Printf("warning: patch kustomization %s failed: %v", ks, patchErr) } } hrOut, err := o.kubectl(ctx, 25*time.Second, "get", "helmreleases.helm.toolkit.fluxcd.io", "-A", "-o", "jsonpath={range .items[*]}{.metadata.namespace}{'/'}{.metadata.name}{'\\n'}{end}", ) if err != nil { return err } for _, hr := range lines(hrOut) { parts := strings.SplitN(hr, "/", 2) if len(parts) != 2 { continue } _, patchErr := o.kubectl(ctx, 20*time.Second, "-n", parts[0], "patch", "helmrelease", parts[1], "--type=merge", "-p", patch) if patchErr != nil { o.log.Printf("warning: patch helmrelease %s failed: %v", hr, patchErr) } } return nil } func (o *Orchestrator) scaleDownApps(ctx context.Context) error { targets, err := o.listScalableWorkloads(ctx) if err != nil { return err } if err := o.writeScaledWorkloadSnapshot(targets); err != nil { return err } if len(targets) == 0 { o.log.Printf("scale down apps: no workloads above 0 replicas") return nil } parallelism := o.cfg.Shutdown.ScaleParallelism if parallelism <= 0 { parallelism = 8 } o.log.Printf("scale down apps targets=%d parallelism=%d", len(targets), parallelism) return o.scaleWorkloads(ctx, targets, 0, parallelism) } func (o *Orchestrator) restoreScaledApps(ctx context.Context) error { snapshot, err := o.readScaledWorkloadSnapshot() if err != nil { return err } if snapshot == nil || len(snapshot.Entries) == 0 { o.log.Printf("restore scaled workloads: no snapshot entries to restore") return nil } parallelism := o.cfg.Shutdown.ScaleParallelism if parallelism <= 0 { parallelism = 8 } o.log.Printf("restore scaled workloads entries=%d parallelism=%d snapshot_at=%s", len(snapshot.Entries), parallelism, snapshot.GeneratedAt.Format(time.RFC3339)) if err := o.scaleWorkloads(ctx, snapshot.Entries, -1, parallelism); err != nil { return err } if 
o.runner.DryRun { return nil } if err := os.Remove(o.scaledWorkloadSnapshotPath()); err != nil && !os.IsNotExist(err) { return fmt.Errorf("remove scaled workload snapshot: %w", err) } return nil } func (o *Orchestrator) listScalableWorkloads(ctx context.Context) ([]workloadScaleEntry, error) { exclude := map[string]struct{}{} for _, ns := range o.cfg.ExcludedNamespaces { exclude[strings.TrimSpace(ns)] = struct{}{} } collect := func(kind string) ([]workloadScaleEntry, error) { out, err := o.kubectl( ctx, 25*time.Second, "get", kind, "-A", "-o", "jsonpath={range .items[*]}{.metadata.namespace}{'\\t'}{.metadata.name}{'\\t'}{.spec.replicas}{'\\n'}{end}", ) if err != nil { return nil, err } var entries []workloadScaleEntry for _, line := range lines(out) { parts := strings.Split(line, "\t") if len(parts) < 3 { continue } ns := strings.TrimSpace(parts[0]) if _, skip := exclude[ns]; skip { continue } replicas, convErr := strconv.Atoi(strings.TrimSpace(parts[2])) if convErr != nil || replicas <= 0 { continue } entries = append(entries, workloadScaleEntry{ Namespace: ns, Kind: kind, Name: strings.TrimSpace(parts[1]), Replicas: replicas, }) } return entries, nil } deployments, err := collect("deployment") if err != nil { return nil, fmt.Errorf("collect deployments: %w", err) } statefulsets, err := collect("statefulset") if err != nil { return nil, fmt.Errorf("collect statefulsets: %w", err) } targets := append(deployments, statefulsets...) 
sort.Slice(targets, func(i, j int) bool { a, b := targets[i], targets[j] if a.Namespace != b.Namespace { return a.Namespace < b.Namespace } if a.Kind != b.Kind { return a.Kind < b.Kind } return a.Name < b.Name }) return targets, nil } func (o *Orchestrator) scaleWorkloads(ctx context.Context, entries []workloadScaleEntry, forceReplicas int, parallelism int) error { if len(entries) == 0 { return nil } if parallelism <= 0 { parallelism = 1 } if parallelism > len(entries) { parallelism = len(entries) } sem := make(chan struct{}, parallelism) var wg sync.WaitGroup errCh := make(chan error, len(entries)) for _, entry := range entries { entry := entry wg.Add(1) go func() { defer wg.Done() sem <- struct{}{} defer func() { <-sem }() replicas := entry.Replicas if forceReplicas >= 0 { replicas = forceReplicas } if _, err := o.kubectl( ctx, 20*time.Second, "-n", entry.Namespace, "scale", entry.Kind, entry.Name, fmt.Sprintf("--replicas=%d", replicas), ); err != nil { if isNotFoundErr(err) { o.log.Printf("warning: skip missing workload while scaling %s/%s/%s", entry.Namespace, entry.Kind, entry.Name) return } errCh <- fmt.Errorf("scale %s/%s/%s -> %d: %w", entry.Namespace, entry.Kind, entry.Name, replicas, err) } }() } wg.Wait() close(errCh) errorCount := len(errCh) if errorCount == 0 { return nil } errs := make([]string, 0, len(errCh)) for err := range errCh { errs = append(errs, err.Error()) if len(errs) >= 5 { break } } return fmt.Errorf("scaling had %d errors (first: %s)", errorCount, strings.Join(errs, " | ")) } func (o *Orchestrator) scaledWorkloadSnapshotPath() string { return filepath.Join(o.cfg.State.Dir, "scaled-workloads.json") } func (o *Orchestrator) writeScaledWorkloadSnapshot(entries []workloadScaleEntry) error { if o.runner.DryRun { return nil } if err := os.MkdirAll(o.cfg.State.Dir, 0o755); err != nil { return fmt.Errorf("ensure state dir: %w", err) } payload := workloadScaleSnapshot{ GeneratedAt: time.Now().UTC(), Entries: entries, } b, err := 
json.MarshalIndent(payload, "", " ") if err != nil { return fmt.Errorf("marshal scaled workload snapshot: %w", err) } if err := os.WriteFile(o.scaledWorkloadSnapshotPath(), b, 0o644); err != nil { return fmt.Errorf("write scaled workload snapshot: %w", err) } return nil } func (o *Orchestrator) readScaledWorkloadSnapshot() (*workloadScaleSnapshot, error) { if o.runner.DryRun { return nil, nil } b, err := os.ReadFile(o.scaledWorkloadSnapshotPath()) if err != nil { if os.IsNotExist(err) { return nil, nil } return nil, fmt.Errorf("read scaled workload snapshot: %w", err) } var snapshot workloadScaleSnapshot if err := json.Unmarshal(b, &snapshot); err != nil { return nil, fmt.Errorf("decode scaled workload snapshot: %w", err) } return &snapshot, nil } func (o *Orchestrator) drainWorkers(ctx context.Context, workers []string) error { total := len(workers) if total == 0 { return nil } parallelism := o.cfg.Shutdown.DrainParallelism if parallelism <= 0 { parallelism = 6 } if parallelism > total { parallelism = total } o.log.Printf("drain workers total=%d parallelism=%d", total, parallelism) sem := make(chan struct{}, parallelism) var wg sync.WaitGroup errCh := make(chan error, total) for idx, node := range workers { idx := idx node := node wg.Add(1) go func() { defer wg.Done() sem <- struct{}{} defer func() { <-sem }() o.log.Printf("drain worker %d/%d: %s", idx+1, total, node) if _, err := o.kubectl(ctx, 20*time.Second, "cordon", node); err != nil { o.log.Printf("warning: cordon %s failed: %v", node, err) } if _, err := o.kubectl(ctx, 3*time.Minute, "drain", node, "--ignore-daemonsets", "--delete-emptydir-data", "--grace-period=30", "--timeout=180s"); err != nil { errCh <- fmt.Errorf("drain %s failed: %w", node, err) return } }() } wg.Wait() close(errCh) if len(errCh) == 0 { return nil } count := len(errCh) samples := []string{} for err := range errCh { samples = append(samples, err.Error()) if len(samples) >= 4 { break } } return fmt.Errorf("drain workers had %d errors 
(first: %s)", count, strings.Join(samples, " | ")) } func (o *Orchestrator) uncordonWorkers(ctx context.Context, workers []string) error { for _, node := range workers { if _, err := o.kubectl(ctx, 20*time.Second, "uncordon", node); err != nil { o.log.Printf("warning: uncordon %s failed: %v", node, err) } } return nil } func (o *Orchestrator) stopWorkers(ctx context.Context, workers []string) { o.runSSHAcrossNodes(ctx, workers, "stop k3s-agent", "sudo systemctl stop k3s-agent || true") } func (o *Orchestrator) startWorkers(ctx context.Context, workers []string) { o.runSSHAcrossNodes(ctx, workers, "start k3s-agent", "sudo systemctl start k3s-agent || true") } func (o *Orchestrator) stopControlPlanes(ctx context.Context, cps []string) { o.runSSHAcrossNodes(ctx, cps, "stop k3s", "sudo systemctl stop k3s || true") } func (o *Orchestrator) startControlPlanes(ctx context.Context, cps []string) { o.runSSHAcrossNodes(ctx, cps, "start k3s", "sudo systemctl start k3s || true") } func (o *Orchestrator) runSSHAcrossNodes(ctx context.Context, nodes []string, action, command string) { if len(nodes) == 0 { return } parallelism := o.cfg.Shutdown.SSHParallelism if parallelism <= 0 { parallelism = 8 } if parallelism > len(nodes) { parallelism = len(nodes) } sem := make(chan struct{}, parallelism) var wg sync.WaitGroup for _, node := range nodes { node := node if !o.sshManaged(node) { o.log.Printf("skip %s on %s: node not in ssh_managed_nodes", action, node) continue } wg.Add(1) go func() { defer wg.Done() sem <- struct{}{} defer func() { <-sem }() o.bestEffort(action+" on "+node, func() error { _, err := o.ssh(ctx, node, command) return err }) }() } wg.Wait() } func (o *Orchestrator) takeEtcdSnapshot(ctx context.Context, node string) error { if !o.sshManaged(node) { return fmt.Errorf("cannot run etcd snapshot on %s: node not in ssh_managed_nodes", node) } name := "pre-shutdown-" + time.Now().UTC().Format("20060102-150405") _, err := o.ssh(ctx, node, "sudo k3s etcd-snapshot save 
--name "+name) return err } func (o *Orchestrator) latestEtcdSnapshotPath(ctx context.Context, node string) (string, error) { if !o.sshManaged(node) { return "", fmt.Errorf("cannot resolve etcd snapshot on %s: node not in ssh_managed_nodes", node) } cmd := `sudo sh -lc 'ls -1t /var/lib/rancher/k3s/server/db/snapshots/* 2>/dev/null | head -n 1'` out, err := o.ssh(ctx, node, cmd) if err != nil { return "", fmt.Errorf("resolve latest etcd snapshot on %s: %w", node, err) } snapshot := strings.TrimSpace(out) if snapshot == "" { return "", fmt.Errorf("no etcd snapshots found on %s under /var/lib/rancher/k3s/server/db/snapshots", node) } return snapshot, nil } func (o *Orchestrator) controlPlaneUsesExternalDatastore(ctx context.Context, node string) (bool, error) { out, err := o.ssh(ctx, node, "sudo systemctl cat k3s") if err != nil { return false, fmt.Errorf("inspect k3s service on %s for datastore mode: %w", node, err) } return strings.Contains(out, "--datastore-endpoint="), nil } func (o *Orchestrator) waitForAPI(ctx context.Context, attempts int, sleep time.Duration) error { if o.runner.DryRun { return nil } for i := 0; i < attempts; i++ { _, err := o.kubectl(ctx, 5*time.Second, "version", "--request-timeout=5s") if err == nil { return nil } time.Sleep(sleep) } return fmt.Errorf("kubernetes API did not become reachable within timeout") } func (o *Orchestrator) fluxSourceReady(ctx context.Context) (bool, error) { out, err := o.kubectl(ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.status.conditions[?(@.type==\"Ready\")].status}") if err != nil { return false, err } return strings.Contains(out, "True"), nil } func (o *Orchestrator) reportFluxSource(ctx context.Context, forceBranch string) { urlOut, urlErr := o.kubectl(ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.spec.url}") if urlErr == nil { o.log.Printf("flux-source-url=%s", strings.TrimSpace(urlOut)) } branchOut, 
branchErr := o.kubectl(ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.spec.ref.branch}") if branchErr == nil { branch := strings.TrimSpace(branchOut) o.log.Printf("flux-source-branch=%s", branch) if forceBranch == "" && branch != o.cfg.ExpectedFluxBranch { o.log.Printf("warning: flux source branch is '%s', expected '%s'", branch, o.cfg.ExpectedFluxBranch) } } } func (o *Orchestrator) bootstrapLocal(ctx context.Context) error { failures := 0 for _, rel := range o.cfg.LocalBootstrapPaths { full := filepath.Join(o.cfg.IACRepoPath, rel) o.log.Printf("local bootstrap apply -k %s", full) if o.runner.DryRun { continue } if _, err := o.kubectl(ctx, 2*time.Minute, "apply", "-k", full); err != nil { o.log.Printf("warning: local bootstrap apply failed at %s: %v", full, err) o.log.Printf("local bootstrap fallback render/apply with LoadRestrictionsNone for %s", full) if fallbackErr := o.applyKustomizeFallback(ctx, full); fallbackErr != nil { failures++ o.log.Printf("warning: local bootstrap fallback failed at %s: %v", full, fallbackErr) continue } } } if failures == len(o.cfg.LocalBootstrapPaths) { return fmt.Errorf("local bootstrap apply failed for every configured path (%d total)", failures) } return nil } func (o *Orchestrator) applyKustomizeFallback(ctx context.Context, full string) error { cmd := fmt.Sprintf("kubectl kustomize --load-restrictor=LoadRestrictionsNone %q | kubectl apply -f -", full) if _, err := o.runSensitive(ctx, 3*time.Minute, "sh", "-lc", cmd); err != nil { return err } return nil } func (o *Orchestrator) waitForFluxSourceReady(ctx context.Context, window time.Duration) (bool, error) { if o.runner.DryRun { return true, nil } deadline := time.Now().Add(window) for { ready, err := o.fluxSourceReady(ctx) if err != nil { return false, err } if ready { return true, nil } if time.Now().After(deadline) { return false, nil } select { case <-ctx.Done(): return false, ctx.Err() case <-time.After(5 * time.Second): 
} } } func (o *Orchestrator) resumeFluxAndReconcile(ctx context.Context) error { if err := o.patchFluxSuspendAll(ctx, false); err != nil { return err } now := time.Now().UTC().Format(time.RFC3339) if _, err := o.kubectl( ctx, 25*time.Second, "-n", "flux-system", "annotate", "kustomizations.kustomize.toolkit.fluxcd.io", "--all", "reconcile.fluxcd.io/requestedAt="+now, "--overwrite", ); err != nil { o.log.Printf("warning: annotate kustomizations for reconcile failed: %v", err) } if _, err := o.kubectl( ctx, 25*time.Second, "annotate", "--all-namespaces", "helmreleases.helm.toolkit.fluxcd.io", "--all", "reconcile.fluxcd.io/requestedAt="+now, "--overwrite", ); err != nil { o.log.Printf("warning: annotate helmreleases for reconcile failed: %v", err) } if o.runner.CommandExists("flux") { sourceCmd := []string{"reconcile", "source", "git", "flux-system", "-n", "flux-system", "--timeout=60s"} if _, err := o.run(ctx, 75*time.Second, "flux", sourceCmd...); err != nil { o.log.Printf("warning: flux command failed (%s): %v", strings.Join(sourceCmd, " "), err) } } return nil } func (o *Orchestrator) kubectl(ctx context.Context, timeout time.Duration, args ...string) (string, error) { return o.run(ctx, timeout, "kubectl", args...) 
}

// ssh runs a shell command on a remote node via the ssh CLI and returns the
// trimmed combined output. The node name may be remapped to a different host
// and/or user through SSHNodeHosts/SSHNodeUsers; when a jump host is
// configured, the jump path is tried first and a direct connection is used as
// a fallback. Each attempt is bounded to 45s.
func (o *Orchestrator) ssh(ctx context.Context, node string, command string) (string, error) {
	host := node
	if mapped, ok := o.cfg.SSHNodeHosts[node]; ok && strings.TrimSpace(mapped) != "" {
		host = strings.TrimSpace(mapped)
	}
	sshUser := o.cfg.SSHUser
	if override, ok := o.cfg.SSHNodeUsers[node]; ok && strings.TrimSpace(override) != "" {
		sshUser = strings.TrimSpace(override)
	}
	target := host
	if sshUser != "" {
		target = sshUser + "@" + host
	}
	sshConfigFile := o.resolveSSHConfigFile()
	sshIdentity := o.resolveSSHIdentityFile()
	// Non-interactive options: never prompt (BatchMode), bound the TCP
	// connect, and auto-accept unseen host keys so a freshly rebuilt node
	// does not block automation on a host-key prompt.
	baseArgs := []string{
		"-o", "BatchMode=yes",
		"-o", "ConnectTimeout=8",
		"-o", "StrictHostKeyChecking=accept-new",
	}
	if sshConfigFile != "" {
		baseArgs = append(baseArgs, "-F", sshConfigFile)
	}
	if sshIdentity != "" {
		baseArgs = append(baseArgs, "-i", sshIdentity)
	}
	if o.cfg.SSHPort > 0 {
		baseArgs = append(baseArgs, "-p", strconv.Itoa(o.cfg.SSHPort))
	}
	attempts := make([][]string, 0, 2)
	attemptNames := make([]string, 0, 2)
	if o.cfg.SSHJumpHost != "" {
		jump := o.cfg.SSHJumpHost
		if o.cfg.SSHJumpUser != "" {
			jump = o.cfg.SSHJumpUser + "@" + jump
		}
		// -J accepts host:port; only append the port if the operator did
		// not already embed one in the jump spec.
		if o.cfg.SSHPort > 0 && !strings.Contains(jump, ":") {
			jump = fmt.Sprintf("%s:%d", jump, o.cfg.SSHPort)
		}
		withJump := append([]string{}, baseArgs...)
		withJump = append(withJump, "-J", jump, target, command)
		attempts = append(attempts, withJump)
		attemptNames = append(attemptNames, "jump")
	}
	direct := append([]string{}, baseArgs...)
	direct = append(direct, target, command)
	attempts = append(attempts, direct)
	attemptNames = append(attemptNames, "direct")
	var lastOut string
	var lastErr error
	for i, args := range attempts {
		out, err := o.run(ctx, 45*time.Second, "ssh", args...)
		if err == nil {
			if i > 0 {
				o.log.Printf("warning: ssh %s path failed for %s, using %s path", attemptNames[i-1], node, attemptNames[i])
			}
			return out, nil
		}
		lastOut = out
		lastErr = err
		if i < len(attempts)-1 {
			o.log.Printf("warning: ssh %s path failed for %s: %v; trying %s path", attemptNames[i], node, err, attemptNames[i+1])
		}
	}
	return lastOut, lastErr
}

// run executes name/args through the shared runner with a per-call timeout
// layered on top of ctx.
func (o *Orchestrator) run(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
	runCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	return o.runner.Run(runCtx, name, args...)
}

// runSensitive executes a command directly via os/exec instead of the shared
// runner — presumably so sensitive argument values (e.g. the Vault unseal
// key) are not echoed into the runner's logging; confirm against
// execx.Runner. NOTE(review): this path does NOT honor o.runner.DryRun;
// current callers guard on DryRun themselves before reaching here.
// KUBECONFIG is propagated from the runner when set. Returns trimmed
// combined output.
func (o *Orchestrator) runSensitive(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
	runCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	cmd := exec.CommandContext(runCtx, name, args...)
	cmd.Env = os.Environ()
	if o.runner.Kubeconfig != "" {
		cmd.Env = append(cmd.Env, "KUBECONFIG="+o.runner.Kubeconfig)
	}
	out, err := cmd.CombinedOutput()
	trimmed := strings.TrimSpace(string(out))
	if err != nil {
		if trimmed == "" {
			return "", fmt.Errorf("%s failed: %w", name, err)
		}
		return trimmed, fmt.Errorf("%s failed: %w", name, err)
	}
	return trimmed, nil
}

// lines splits in on newlines, trims each line, and drops empty lines.
// Returns nil for blank input.
func lines(in string) []string {
	in = strings.TrimSpace(in)
	if in == "" {
		return nil
	}
	parts := strings.Split(in, "\n")
	out := make([]string, 0, len(parts))
	for _, p := range parts {
		v := strings.TrimSpace(p)
		if v != "" {
			out = append(out, v)
		}
	}
	return out
}

// sshManaged reports whether node may be managed over SSH. An empty
// SSHManagedNodes list means every node is managed.
func (o *Orchestrator) sshManaged(node string) bool {
	if len(o.cfg.SSHManagedNodes) == 0 {
		return true
	}
	for _, allowed := range o.cfg.SSHManagedNodes {
		if strings.TrimSpace(allowed) == node {
			return true
		}
	}
	return false
}

// resolveSSHConfigFile returns the configured ssh config file, falling back
// to a fixed list of well-known per-user paths; "" when none exists.
func (o *Orchestrator) resolveSSHConfigFile() string {
	if strings.TrimSpace(o.cfg.SSHConfigFile) != "" {
		return strings.TrimSpace(o.cfg.SSHConfigFile)
	}
	// Hard-coded fallbacks for the hosts this tool is deployed on.
	candidates := []string{
		"/home/atlas/.ssh/config",
		"/home/tethys/.ssh/config",
	}
	for _, p := range candidates {
		if stat, err := os.Stat(p); err == nil && !stat.IsDir() {
			return p
		}
	}
	return ""
}

// resolveSSHIdentityFile returns the configured ssh identity file, falling
// back to a fixed list of well-known per-user key paths; "" when none exists.
func (o *Orchestrator) resolveSSHIdentityFile() string {
	if strings.TrimSpace(o.cfg.SSHIdentityFile) != "" {
		return strings.TrimSpace(o.cfg.SSHIdentityFile)
	}
	// Hard-coded fallbacks for the hosts this tool is deployed on.
	candidates := []string{
		"/home/atlas/.ssh/id_ed25519",
		"/home/tethys/.ssh/id_ed25519",
	}
	for _, p := range candidates {
		if stat, err := os.Stat(p); err == nil && !stat.IsDir() {
			return p
		}
	}
	return ""
}

// bestEffort runs fn and downgrades any error to a warning log line.
func (o *Orchestrator) bestEffort(name string, fn func() error) {
	if err := fn(); err != nil {
		o.log.Printf("warning: %s: %v", name, err)
	}
}

// missingCriticalStartupWorkloads returns a human-readable list of critical
// workloads that are not ready (or not found). Any other check error aborts.
func (o *Orchestrator) missingCriticalStartupWorkloads(ctx context.Context) ([]string, error) {
	missing := []string{}
	for _, w := range criticalStartupWorkloads {
		ready, err := o.workloadReady(ctx, w)
		if err != nil {
			if isNotFoundErr(err) {
				missing = append(missing, fmt.Sprintf("%s/%s/%s(not found)", w.Namespace, w.Kind, w.Name))
				continue
			}
			return nil, fmt.Errorf("check %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, err)
		}
		if !ready {
			missing = append(missing, fmt.Sprintf("%s/%s/%s", w.Namespace, w.Kind, w.Name))
		}
	}
	return missing, nil
}

// ensureCriticalStartupWorkloads scales each critical workload to 1 replica
// and waits for it to become ready, in declaration order. Workloads that do
// not exist are skipped with a warning rather than failing startup.
func (o *Orchestrator) ensureCriticalStartupWorkloads(ctx context.Context) error {
	for _, w := range criticalStartupWorkloads {
		if err := o.ensureWorkloadReplicas(ctx, w, 1); err != nil {
			if isNotFoundErr(err) {
				o.log.Printf("warning: startup workload missing, skipping scale: %s/%s/%s", w.Namespace, w.Kind, w.Name)
				continue
			}
			return fmt.Errorf("scale %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, err)
		}
		if err := o.waitWorkloadReady(ctx, w); err != nil {
			if isNotFoundErr(err) {
				o.log.Printf("warning: startup workload missing during readiness wait: %s/%s/%s", w.Namespace, w.Kind, w.Name)
				continue
			}
			return err
		}
	}
	return nil
}

// ensureWorkloadReplicas scales workload w to the given replica count.
func (o *Orchestrator) ensureWorkloadReplicas(ctx context.Context, w startupWorkload, replicas int) error {
	_, err := o.kubectl(
		ctx,
		45*time.Second,
		"-n", w.Namespace,
		"scale", w.Kind, w.Name,
		fmt.Sprintf("--replicas=%d", replicas),
	)
	return err
}

// waitWorkloadReady blocks until workload w reports a completed rollout.
// Vault gets a dedicated path because it may additionally need unsealing
// before its pod can turn ready. StatefulSets get a longer rollout timeout
// than Deployments.
func (o *Orchestrator) waitWorkloadReady(ctx context.Context, w startupWorkload) error {
	if w.Namespace == "vault" && w.Kind == "statefulset" && w.Name == "vault" {
		return o.waitVaultReady(ctx, w)
	}
	timeout := "240s"
	if w.Kind == "statefulset" {
		timeout = "360s"
	}
	_, err := o.kubectl(
		ctx,
		7*time.Minute,
		"-n", w.Namespace,
		"rollout", "status",
		fmt.Sprintf("%s/%s", w.Kind, w.Name),
		"--timeout="+timeout,
	)
	if err != nil {
		return fmt.Errorf("wait ready %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, err)
	}
	return nil
}

// waitVaultReady polls Vault readiness for up to 7 minutes, attempting an
// auto-unseal on each pass while the workload is not ready. The readiness
// check is repeated immediately after each unseal attempt so success is
// observed without waiting out the poll interval.
func (o *Orchestrator) waitVaultReady(ctx context.Context, w startupWorkload) error {
	if o.runner.DryRun {
		return nil
	}
	deadline := time.Now().Add(7 * time.Minute)
	var lastErr error
	for time.Now().Before(deadline) {
		ready, err := o.workloadReady(ctx, w)
		if err == nil && ready {
			return nil
		}
		if err != nil {
			lastErr = err
		}
		if err := o.ensureVaultUnsealed(ctx); err != nil {
			lastErr = err
			o.log.Printf("warning: vault readiness still blocked: %v", err)
		}
		ready, err = o.workloadReady(ctx, w)
		if err == nil && ready {
			return nil
		}
		if err != nil {
			lastErr = err
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(5 * time.Second):
		}
	}
	if lastErr != nil {
		return fmt.Errorf("wait ready %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, lastErr)
	}
	return fmt.Errorf("wait ready %s/%s/%s: timeout", w.Namespace, w.Kind, w.Name)
}

// ensureVaultUnsealed checks vault-0 and, if it is Running but sealed,
// submits the stored unseal key up to 5 times. The unseal command goes
// through runSensitive so the key is not passed to the shared runner.
func (o *Orchestrator) ensureVaultUnsealed(ctx context.Context) error {
	if o.runner.DryRun {
		return nil
	}
	phase, err := o.kubectl(ctx, 15*time.Second, "-n", "vault", "get", "pod", "vault-0", "-o", "jsonpath={.status.phase}")
	if err != nil {
		return fmt.Errorf("vault pod phase check failed: %w", err)
	}
	if strings.TrimSpace(phase) != "Running" {
		return fmt.Errorf("vault-0 pod phase is %q", strings.TrimSpace(phase))
	}
	sealed, err := o.vaultSealed(ctx)
	if err != nil {
		return err
	}
	if !sealed {
		return nil
	}
	unsealKey, err := o.vaultUnsealKey(ctx)
	if err != nil {
		return err
	}
	for attempt := 1; attempt <= 5; attempt++ {
		o.log.Printf("vault is sealed; attempting auto-unseal (%d/5)", attempt)
		if _, err := o.runSensitive(
			ctx,
			30*time.Second,
			"kubectl", "-n", "vault", "exec", "vault-0", "--",
			"vault", "operator", "unseal", unsealKey,
		); err != nil {
			return fmt.Errorf("vault unseal attempt %d failed: %w", attempt, err)
		}
		sealed, err = o.vaultSealed(ctx)
		if err != nil {
			return err
		}
		if !sealed {
			o.log.Printf("vault auto-unseal succeeded")
			return nil
		}
		// Honor cancellation while backing off between attempts (was a
		// bare time.Sleep that ignored ctx).
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(2 * time.Second):
		}
	}
	return fmt.Errorf("vault remained sealed after 5 auto-unseal attempts")
}

// vaultSealed reports whether Vault is sealed by running `vault status`
// inside vault-0. The `|| true` keeps the exec's exit code zero because
// `vault status` exits non-zero when sealed.
func (o *Orchestrator) vaultSealed(ctx context.Context) (bool, error) {
	out, err := o.kubectl(
		ctx,
		25*time.Second,
		"-n", "vault", "exec", "vault-0", "--",
		"sh", "-lc",
		"VAULT_ADDR=http://127.0.0.1:8200 vault status -format=json 2>/dev/null || true",
	)
	if err != nil {
		return false, fmt.Errorf("vault status check failed: %w", err)
	}
	sealed, err := parseVaultSealed(out)
	if err != nil {
		return false, fmt.Errorf("parse vault status: %w", err)
	}
	return sealed, nil
}

// parseVaultSealed extracts the "sealed" flag from `vault status
// -format=json` output, tolerating surrounding non-JSON noise by slicing
// between the first '{' and the last '}'.
func parseVaultSealed(raw string) (bool, error) {
	trimmed := strings.TrimSpace(raw)
	if trimmed == "" {
		return false, fmt.Errorf("empty vault status output")
	}
	start := strings.Index(trimmed, "{")
	end := strings.LastIndex(trimmed, "}")
	if start < 0 || end < 0 || end < start {
		return false, fmt.Errorf("vault status payload missing JSON object")
	}
	payload := trimmed[start : end+1]
	type vaultStatus struct {
		Sealed bool `json:"sealed"`
	}
	var st vaultStatus
	if err := json.Unmarshal([]byte(payload), &st); err != nil {
		return false, err
	}
	return st.Sealed, nil
}

// vaultUnsealKey reads the unseal key from the vault-init secret. The secret
// field is itself base64 (on top of the base64 kubectl already decodes for
// jsonpath), hence the explicit decode here.
func (o *Orchestrator) vaultUnsealKey(ctx context.Context) (string, error) {
	out, err := o.kubectl(
		ctx,
		15*time.Second,
		"-n", "vault",
		"get", "secret", "vault-init",
		"-o", "jsonpath={.data.unseal_key_b64}",
	)
	if err != nil {
		return "", fmt.Errorf("read vault-init secret: %w", err)
	}
	decoded, err := base64.StdEncoding.DecodeString(strings.TrimSpace(out))
	if err != nil {
		return "", fmt.Errorf("decode vault-init unseal_key_b64: %w", err)
	}
	key := strings.TrimSpace(string(decoded))
	if key == "" {
		return "", fmt.Errorf("vault-init unseal key is empty")
	}
	return key, nil
}

// workloadReady reports whether workload w has at least one ready replica.
// jsonpath prints nothing when .status.readyReplicas is absent, which we
// treat as not ready. (The original duplicated the empty-string check:
// `raw == "" || raw == ""`; the dead second operand has been removed.)
func (o *Orchestrator) workloadReady(ctx context.Context, w startupWorkload) (bool, error) {
	out, err := o.kubectl(
		ctx,
		20*time.Second,
		"-n", w.Namespace,
		"get", w.Kind, w.Name,
		"-o", "jsonpath={.status.readyReplicas}",
	)
	if err != nil {
		return false, err
	}
	raw := strings.TrimSpace(out)
	if raw == "" {
		return false, nil
	}
	n, err := strconv.Atoi(raw)
	if err != nil {
		return false, fmt.Errorf("parse readyReplicas %q: %w", raw, err)
	}
	return n >= 1, nil
}

// isNotFoundErr reports whether err looks like a kubectl "not found" error,
// matched case-insensitively on the message text.
func isNotFoundErr(err error) bool {
	if err == nil {
		return false
	}
	msg := strings.ToLower(err.Error())
	return strings.Contains(msg, "not found") || strings.Contains(msg, "(notfound)")
}

// poweroffHosts schedules a delayed poweroff on every control plane, the
// given workers, and any configured extra hosts, deduplicated and in sorted
// order for deterministic logging. Hosts matching the local hostname are
// skipped; the local host itself is powered off last (delay+10s) only when
// explicitly enabled, so remote nodes go down first. All scheduling is
// best-effort.
func (o *Orchestrator) poweroffHosts(ctx context.Context, workers []string) error {
	delay := o.cfg.Shutdown.PoweroffDelaySeconds
	if delay <= 0 {
		delay = 25
	}
	localNames := map[string]struct{}{}
	if hn, err := os.Hostname(); err == nil && strings.TrimSpace(hn) != "" {
		localNames[strings.TrimSpace(hn)] = struct{}{}
	}
	// NOTE(review): the SSH user name is also treated as a local host name
	// here — presumably nodes in this fleet are named after their primary
	// user; confirm this assumption against the deployment inventory.
	if o.cfg.SSHUser != "" {
		localNames[o.cfg.SSHUser] = struct{}{}
	}
	hostSet := map[string]struct{}{}
	for _, n := range o.cfg.ControlPlanes {
		hostSet[n] = struct{}{}
	}
	for _, n := range workers {
		hostSet[n] = struct{}{}
	}
	for _, n := range o.cfg.Shutdown.ExtraPoweroffHosts {
		if strings.TrimSpace(n) != "" {
			hostSet[strings.TrimSpace(n)] = struct{}{}
		}
	}
	hosts := make([]string, 0, len(hostSet))
	for h := range hostSet {
		hosts = append(hosts, h)
	}
	sort.Strings(hosts)
	// nohup + background so the ssh session can return before the node dies.
	remoteCmd := fmt.Sprintf(`sudo nohup sh -c 'sleep %d; systemctl poweroff' >/dev/null 2>&1 &`, delay)
	for _, host := range hosts {
		host = strings.TrimSpace(host)
		if host == "" {
			continue
		}
		if _, isLocal := localNames[host]; isLocal {
			continue
		}
		o.bestEffort("schedule poweroff on "+host, func() error {
			_, err := o.ssh(ctx, host, remoteCmd)
			return err
		})
	}
	if o.cfg.Shutdown.PoweroffLocalHost {
		o.bestEffort("schedule local host poweroff", func() error {
			_, err := o.run(ctx, 5*time.Second,
				"sh", "-c", fmt.Sprintf("nohup sh -c 'sleep %d; systemctl poweroff' >/dev/null 2>&1 &", delay+10))
			return err
		})
	}
	return nil
}