// Package cluster implements the lifecycle orchestration flows (startup,
// shutdown, etcd snapshot/restore) for a managed k3s cluster.
package cluster

import (
	"context"
	"crypto/tls"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log"
	"net"
	"net/http"
	neturl "net/url"
	"os"
	"os/exec"
	"path/filepath"
	"regexp"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"
	"unicode"

	"scm.bstein.dev/bstein/ananke/internal/config"
	"scm.bstein.dev/bstein/ananke/internal/execx"
	"scm.bstein.dev/bstein/ananke/internal/sshutil"
	"scm.bstein.dev/bstein/ananke/internal/state"
)

// Orchestrator drives cluster lifecycle operations. It executes node-level
// commands through the runner (which also carries the dry-run flag), records
// run history and coordination intent in the state store, and logs progress.
type Orchestrator struct {
	cfg    config.Config
	runner *execx.Runner
	store  *state.Store
	log    *log.Logger

	// startupReportMu guards activeStartupReport, which is non-nil only
	// while a startup run is in flight.
	startupReportMu     sync.Mutex
	activeStartupReport *startupReport
}

// StartupOptions tunes a single Startup run.
type StartupOptions struct {
	// ForceFluxBranch, when non-empty, overrides the expected flux branch
	// and permits patching the flux source to it.
	ForceFluxBranch string
	// SkipLocalBootstrap disables the local flux bootstrap fallback.
	SkipLocalBootstrap bool
	// Reason is recorded in run history and intent files.
	Reason string
}

// ShutdownOptions tunes a single Shutdown run.
type ShutdownOptions struct {
	SkipEtcdSnapshot bool
	SkipDrain        bool
	// Mode selects host poweroff behavior: "", "config", "cluster-only",
	// or "poweroff" (validated in Shutdown).
	Mode   string
	Reason string
}

// EtcdRestoreOptions selects the restore target; empty fields fall back to
// configured defaults (first control plane, latest snapshot).
type EtcdRestoreOptions struct {
	ControlPlane string
	SnapshotPath string
}

// startupWorkload identifies one namespaced workload checked during startup.
type startupWorkload struct {
	Namespace string
	Kind      string
	Name      string
}

// workloadScaleEntry is one persisted workload replica count, used to
// restore scale after a shutdown.
type workloadScaleEntry struct {
	Namespace string `json:"namespace"`
	Kind      string `json:"kind"`
	Name      string `json:"name"`
	Replicas  int    `json:"replicas"`
}

// remotePeerStatus is a peer orchestrator's reported coordination state.
type remotePeerStatus struct {
	Intent          state.Intent
	BootstrapActive bool
}

// workloadScaleSnapshot is the on-disk format of scaled-workloads.json.
type workloadScaleSnapshot struct {
	GeneratedAt time.Time            `json:"generated_at"`
	Entries     []workloadScaleEntry `json:"entries"`
}

// startupReport is the on-disk summary of the most recent startup run
// (written to last-startup-report.json by finalizeStartupReport).
type startupReport struct {
	StartedAt time.Time `json:"started_at"`
	Completed time.Time `json:"completed_at"`
	Reason    string    `json:"reason"`
	Success   bool      `json:"success"`
	Error     string    `json:"error,omitempty"`
	// Checks maps check name -> latest pass/fail record.
	Checks     map[string]startupCheckRecord `json:"checks"`
	AutoHeals  []string                      `json:"auto_heals"`
	SourceHost string                        `json:"source_host"`
}

// startupCheckRecord is one named check result inside a startupReport.
type startupCheckRecord struct {
	Status    string    `json:"status"`
	Detail    string    `json:"detail"`
	UpdatedAt time.Time `json:"updated_at"`
}

// datastoreEndpointPattern extracts the value of a k3s --datastore-endpoint
// flag, whether written as --flag=value or --flag value, with optional
// single or double quoting.
var datastoreEndpointPattern = regexp.MustCompile(`--datastore-endpoint(?:=|\s+)(?:'([^']+)'|"([^"]+)"|([^\s\\]+))`)

// criticalStartupWorkloads are the workloads that must be healthy before
// startup is considered converged (GitOps controllers, secrets, data and
// observability services).
var criticalStartupWorkloads = []startupWorkload{
	{Namespace: "flux-system", Kind: "deployment", Name: "source-controller"},
{Namespace: "flux-system", Kind: "deployment", Name: "kustomize-controller"}, {Namespace: "flux-system", Kind: "deployment", Name: "helm-controller"}, {Namespace: "flux-system", Kind: "deployment", Name: "notification-controller"}, {Namespace: "vault", Kind: "statefulset", Name: "vault"}, {Namespace: "postgres", Kind: "statefulset", Name: "postgres"}, {Namespace: "gitea", Kind: "deployment", Name: "gitea"}, {Namespace: "monitoring", Kind: "deployment", Name: "grafana"}, {Namespace: "monitoring", Kind: "deployment", Name: "kube-state-metrics"}, } var ErrEtcdRestoreNotApplicable = errors.New("etcd restore not applicable") func New(cfg config.Config, runner *execx.Runner, store *state.Store, logger *log.Logger) *Orchestrator { return &Orchestrator{cfg: cfg, runner: runner, store: store, log: logger} } func (o *Orchestrator) Startup(ctx context.Context, opts StartupOptions) (err error) { unlock, err := state.AcquireLock(o.cfg.State.LockPath) if err != nil { return err } defer unlock() o.beginStartupReport(opts.Reason) defer o.finalizeStartupReport(err) record := state.RunRecord{ ID: fmt.Sprintf("startup-%d", time.Now().UnixNano()), Action: "startup", Reason: opts.Reason, DryRun: o.runner.DryRun, StartedAt: time.Now().UTC(), } defer o.finalizeRecord(&record, &err) if invErr := o.validateNodeInventory(); invErr != nil { o.noteStartupCheck("node-inventory", false, invErr.Error()) return invErr } o.noteStartupCheck("node-inventory", true, "inventory/user/port validation passed") resumedFlux := false defer func() { if o.runner.DryRun || err == nil || resumedFlux { return } o.log.Printf("warning: startup failed before normal flux resume; attempting best-effort recovery resume") o.bestEffort("restore scaled workloads after failed startup", func() error { return o.restoreScaledApps(ctx) }) o.bestEffort("resume flux after failed startup", func() error { return o.resumeFluxAndReconcile(ctx) }) }() if !o.runner.DryRun { currentIntent, readErr := 
state.ReadIntent(o.cfg.State.IntentPath) if readErr != nil { return fmt.Errorf("read startup intent: %w", readErr) } if currentIntent.State == state.IntentStartupInProgress { o.log.Printf("warning: detected stale startup intent from a previous interrupted run; clearing it before continuing") if clearErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentNormal, "auto-clear stale startup intent", "startup"); clearErr != nil { return fmt.Errorf("clear stale startup intent: %w", clearErr) } currentIntent = state.Intent{State: state.IntentNormal} } if currentIntent.State == state.IntentShuttingDown { if intentFresh(currentIntent, o.startupGuardAge()) { return fmt.Errorf("startup blocked: shutdown intent is active (%s)", currentIntent.Reason) } o.log.Printf("warning: local shutdown intent appears stale (updated_at=%s reason=%q); auto-clearing to continue startup", currentIntent.UpdatedAt.Format(time.RFC3339), currentIntent.Reason) if clearErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentNormal, "auto-clear stale shutdown intent", "startup"); clearErr != nil { return fmt.Errorf("clear stale shutdown intent: %w", clearErr) } currentIntent = state.Intent{State: state.IntentNormal} } cooldown := o.startupShutdownCooldown() if currentIntent.State == state.IntentShutdownComplete && intentFresh(currentIntent, cooldown) { elapsed := intentAge(currentIntent) remaining := cooldown - elapsed if remaining < time.Second { remaining = time.Second } o.log.Printf("startup cooldown active: last shutdown completed %s ago; waiting %s", elapsed.Round(time.Second), remaining.Round(time.Second)) timer := time.NewTimer(remaining) select { case <-ctx.Done(): timer.Stop() return fmt.Errorf("startup canceled while waiting for shutdown cooldown: %w", ctx.Err()) case <-timer.C: } refreshed, readErr := state.ReadIntent(o.cfg.State.IntentPath) if readErr != nil { return fmt.Errorf("re-read startup intent after cooldown wait: %w", readErr) } currentIntent = refreshed if 
currentIntent.State == state.IntentShuttingDown && intentFresh(currentIntent, o.startupGuardAge()) {
				return fmt.Errorf("startup blocked: shutdown intent became active during cooldown wait (%s)", currentIntent.Reason)
			}
			if currentIntent.State == state.IntentShutdownComplete && intentFresh(currentIntent, cooldown) {
				return fmt.Errorf("startup blocked: shutdown completed too recently (%s ago)", intentAge(currentIntent).Round(time.Second))
			}
		}
		if err := o.guardPeerStartupIntents(ctx); err != nil {
			return err
		}
		if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentStartupInProgress, opts.Reason, "startup"); writeErr != nil {
			return fmt.Errorf("set startup intent: %w", writeErr)
		}
		// Restore the intent to normal on exit, tagging the reason when the
		// run failed so peers can see the outcome.
		defer func() {
			finalReason := opts.Reason
			if err != nil {
				finalReason = fmt.Sprintf("%s (failed)", strings.TrimSpace(opts.Reason))
			}
			if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentNormal, finalReason, "startup"); writeErr != nil {
				o.log.Printf("warning: write startup completion intent failed: %v", writeErr)
			}
		}()
	}
	o.log.Printf("startup control-planes=%s", strings.Join(o.cfg.ControlPlanes, ","))
	if o.cfg.Startup.RequireTimeSync {
		if err := o.waitForTimeSync(ctx, o.cfg.ControlPlanes); err != nil {
			return err
		}
	}
	if err := o.preflightExternalDatastore(ctx); err != nil {
		return err
	}
	o.bestEffort("sync local titan-iac checkout", func() error { return o.syncLocalIACRepo(ctx) })
	o.bestEffort("refresh bootstrap cache from local repo", func() error { return o.refreshBootstrapCache(ctx) })
	if o.cfg.Startup.ReconcileAccessOnBoot {
		o.bestEffort("reconcile control-plane access", func() error { return o.reconcileNodeAccess(ctx, o.cfg.ControlPlanes) })
	}
	o.reportFluxSource(ctx, opts.ForceFluxBranch)
	o.startControlPlanes(ctx, o.cfg.ControlPlanes)
	// BUGFIX: guard against APIPollSeconds <= 0 from config. Previously
	// `APIWaitSeconds / APIPollSeconds` panicked with an integer division
	// by zero when the poll interval was unset, and a zero apiPoll would
	// have busy-polled the API.
	pollSeconds := o.cfg.Startup.APIPollSeconds
	if pollSeconds <= 0 {
		pollSeconds = 1
	}
	apiPoll := time.Duration(pollSeconds) * time.Second
	apiAttempts := o.cfg.Startup.APIWaitSeconds / pollSeconds
	if apiAttempts < 1 {
		apiAttempts = 1
	}
	if err := o.waitForAPI(ctx,
apiAttempts, apiPoll); err != nil {
		// The API never became reachable; optionally fall back to an
		// automatic etcd snapshot restore on a designated control plane.
		if !o.cfg.Startup.AutoEtcdRestoreOnAPIFailure {
			return err
		}
		cp := strings.TrimSpace(o.cfg.Startup.EtcdRestoreControlPlane)
		if cp == "" && len(o.cfg.ControlPlanes) > 0 {
			cp = o.cfg.ControlPlanes[0]
		}
		o.log.Printf("warning: initial API wait failed (%v); attempting automatic etcd restore on %s", err, cp)
		if restoreErr := o.EtcdRestore(ctx, EtcdRestoreOptions{ControlPlane: cp}); restoreErr != nil {
			if errors.Is(restoreErr, ErrEtcdRestoreNotApplicable) {
				// External datastore: nothing to restore locally; just
				// retry starting the control planes.
				o.log.Printf("warning: automatic etcd restore skipped: %v", restoreErr)
				o.log.Printf("warning: retrying control-plane start because datastore recovery path is external")
				o.startControlPlanes(ctx, o.cfg.ControlPlanes)
			} else {
				return fmt.Errorf("kubernetes API did not become reachable and automatic etcd restore failed: %w", restoreErr)
			}
		}
		// Second (and final) API wait after the recovery attempt.
		if err := o.waitForAPI(ctx, apiAttempts, apiPoll); err != nil {
			return fmt.Errorf("kubernetes API did not become reachable after automatic etcd restore: %w", err)
		}
	}
	if err := o.ensureRequiredNodeLabels(ctx); err != nil {
		return err
	}
	// Pin flux to the forced branch when given, otherwise to the configured
	// expected branch; patching the source is only allowed when forced.
	desiredFluxBranch := strings.TrimSpace(opts.ForceFluxBranch)
	if desiredFluxBranch == "" {
		desiredFluxBranch = strings.TrimSpace(o.cfg.ExpectedFluxBranch)
	}
	allowFluxBranchPatch := strings.TrimSpace(opts.ForceFluxBranch) != ""
	if err := o.guardFluxSourceDrift(ctx, desiredFluxBranch, allowFluxBranchPatch); err != nil {
		return err
	}
	if err := o.ensureFluxBranch(ctx, desiredFluxBranch, allowFluxBranchPatch); err != nil {
		return err
	}
	workers, err := o.effectiveWorkers(ctx)
	if err != nil {
		return err
	}
	o.log.Printf("startup workers=%s", strings.Join(workers, ","))
	if o.cfg.Startup.ReconcileAccessOnBoot {
		o.bestEffort("reconcile worker access", func() error { return o.reconcileNodeAccess(ctx, workers) })
	}
	o.startWorkers(ctx, workers)
	o.bestEffort("uncordon workers", func() error { return o.uncordonWorkers(ctx, workers) })
	// SSH auth is verified on every control plane and worker (copy the
	// control-plane slice so the append below cannot alias cfg).
	sshCheckNodes := append([]string{}, o.cfg.ControlPlanes...)
sshCheckNodes = append(sshCheckNodes, workers...)
	if err := o.waitForNodeSSHAuth(ctx, sshCheckNodes); err != nil {
		o.noteStartupCheck("node-ssh-auth", false, err.Error())
		return err
	}
	o.noteStartupCheck("node-ssh-auth", true, fmt.Sprintf("nodes=%d", len(sshCheckNodes)))
	// Decide whether a local flux bootstrap is needed, collecting the
	// human-readable reasons for the log line below.
	needsLocalBootstrap := false
	bootstrapReasons := []string{}
	if !opts.SkipLocalBootstrap {
		ready, readyErr := o.fluxSourceReady(ctx)
		if readyErr != nil {
			o.log.Printf("warning: unable to read flux source readiness: %v", readyErr)
			needsLocalBootstrap = true
			bootstrapReasons = append(bootstrapReasons, "flux source readiness check failed")
		}
		if !ready {
			needsLocalBootstrap = true
			bootstrapReasons = append(bootstrapReasons, "flux source not ready")
		}
	}
	missing, missingErr := o.missingCriticalStartupWorkloads(ctx)
	if missingErr != nil {
		o.log.Printf("warning: unable to inspect critical startup workloads: %v", missingErr)
	}
	if len(missing) > 0 {
		o.log.Printf("startup critical workloads not ready; applying targeted recovery first: %s", strings.Join(missing, ", "))
	}
	if o.cfg.Startup.RequireStorageReady {
		if err := o.waitForStorageReady(ctx); err != nil {
			o.noteStartupCheck("storage-readiness", false, err.Error())
			return err
		}
		o.noteStartupCheck("storage-readiness", true, "longhorn and critical PVCs ready")
	}
	if err := o.ensureCriticalStartupWorkloads(ctx); err != nil {
		return err
	}
	// Targeted recovery may have made the flux source healthy; re-check
	// before committing to a full local bootstrap.
	if !opts.SkipLocalBootstrap && needsLocalBootstrap {
		if ready, err := o.waitForFluxSourceReady(ctx, 5*time.Minute); err != nil {
			o.log.Printf("warning: flux source readiness wait failed before local bootstrap: %v", err)
		} else if ready {
			o.log.Printf("flux source became ready after targeted recovery; skipping local bootstrap")
			needsLocalBootstrap = false
		}
	}
	if !opts.SkipLocalBootstrap && needsLocalBootstrap {
		o.log.Printf("startup bootstrap required after wait: %s", strings.Join(bootstrapReasons, "; "))
		if err := o.bootstrapLocal(ctx); err != nil {
			return err
		}
		if err := o.ensureCriticalStartupWorkloads(ctx); err != nil {
			return err
		}
		ready, err := o.fluxSourceReady(ctx)
		if err != nil {
			return fmt.Errorf("flux source readiness after bootstrap: %w", err)
		}
		if !ready {
			return fmt.Errorf("flux source still not ready after local bootstrap")
		}
	}
	o.bestEffort("restore scaled workloads", func() error { return o.restoreScaledApps(ctx) })
	if err := o.resumeFluxAndReconcile(ctx); err != nil {
		return err
	}
	resumedFlux = true
	o.bestEffort("heal critical zero-replica workloads", func() error {
		healed, healErr := o.healCriticalWorkloadReplicas(ctx)
		if healErr != nil {
			return healErr
		}
		if len(healed) > 0 {
			sort.Strings(healed)
			o.noteStartupAutoHeal(fmt.Sprintf("restored critical workload replicas: %s", joinLimited(healed, 8)))
		}
		return nil
	})
	if err := o.waitForStartupConvergence(ctx); err != nil {
		return err
	}
	if o.cfg.Startup.RequirePostStartProbes {
		if err := o.waitForPostStartProbes(ctx); err != nil {
			o.noteStartupCheck("post-start-probes", false, err.Error())
			return err
		}
		o.noteStartupCheck("post-start-probes", true, "post-start probes passed")
	}
	o.log.Printf("startup flow complete")
	return nil
}

// EtcdRestore restores the embedded etcd datastore on one configured,
// SSH-managed control plane from a snapshot (the latest one when
// opts.SnapshotPath is empty). It returns ErrEtcdRestoreNotApplicable when
// the node uses an external datastore.
func (o *Orchestrator) EtcdRestore(ctx context.Context, opts EtcdRestoreOptions) error {
	controlPlane := strings.TrimSpace(opts.ControlPlane)
	if controlPlane == "" {
		if len(o.cfg.ControlPlanes) == 0 {
			return fmt.Errorf("cannot restore etcd: no control planes configured")
		}
		controlPlane = o.cfg.ControlPlanes[0]
	}
	found := false
	for _, cp := range o.cfg.ControlPlanes {
		if cp == controlPlane {
			found = true
			break
		}
	}
	if !found {
		return fmt.Errorf("cannot restore etcd: control plane %s is not in configured control_planes", controlPlane)
	}
	if !o.sshManaged(controlPlane) {
		return fmt.Errorf("cannot restore etcd on %s: node not in ssh_managed_nodes", controlPlane)
	}
	snapshotPath := strings.TrimSpace(opts.SnapshotPath)
	if o.runner.DryRun {
		// FIX: removed a dead self-assignment here
		// (`if snapshotPath == "" { snapshotPath = "" }`) that had no
		// effect; in dry-run the snapshot is deliberately not resolved, so
		// the log may show an empty snapshot path.
		o.log.Printf("etcd restore target=%s snapshot=%s (dry-run; datastore-mode and snapshot checks skipped)", controlPlane, snapshotPath)
return nil
	}
	externalDatastore, err := o.controlPlaneUsesExternalDatastore(ctx, controlPlane)
	if err != nil {
		return err
	}
	if externalDatastore {
		return fmt.Errorf("%w: %s uses --datastore-endpoint", ErrEtcdRestoreNotApplicable, controlPlane)
	}
	if snapshotPath == "" {
		resolved, err := o.latestEtcdSnapshotPath(ctx, controlPlane)
		if err != nil {
			return err
		}
		snapshotPath = resolved
	}
	if err := o.verifyEtcdSnapshot(ctx, controlPlane, snapshotPath); err != nil {
		return err
	}
	o.log.Printf("etcd restore target=%s snapshot=%s", controlPlane, snapshotPath)
	// Stop k3s on every control plane before resetting the datastore.
	for _, cp := range o.cfg.ControlPlanes {
		cp := cp
		o.bestEffort("stop k3s before etcd restore on "+cp, func() error {
			_, err := o.ssh(ctx, cp, "sudo systemctl stop k3s || true")
			return err
		})
	}
	if _, err := o.runSudoK3S(ctx, controlPlane, "server", "--cluster-reset", "--cluster-reset-restore-path", snapshotPath); err != nil {
		return fmt.Errorf("etcd restore command failed on %s: %w", controlPlane, err)
	}
	o.log.Printf("etcd restore command completed on %s", controlPlane)
	if _, err := o.ssh(ctx, controlPlane, "sudo systemctl start k3s || true"); err != nil {
		return fmt.Errorf("failed to start k3s on restore control plane %s: %w", controlPlane, err)
	}
	// Give the restored server a moment to settle before rejoining peers.
	// FIX: honor context cancellation here; this was previously an
	// unconditional time.Sleep that ignored ctx (subsequent SSH calls would
	// have failed on the canceled context anyway).
	select {
	case <-ctx.Done():
		return fmt.Errorf("canceled while waiting for restored control plane %s to settle: %w", controlPlane, ctx.Err())
	case <-time.After(10 * time.Second):
	}
	for _, cp := range o.cfg.ControlPlanes {
		cp := cp
		if cp == controlPlane {
			continue
		}
		o.bestEffort("start k3s after etcd restore on "+cp, func() error {
			_, err := o.ssh(ctx, cp, "sudo systemctl start k3s || true")
			return err
		})
	}
	return nil
}

// Shutdown runs the cluster shutdown flow under the state lock: etcd
// snapshot, flux suspend, workload scale-down, worker drain, then stopping
// k3s (and optionally powering off hosts, depending on opts.Mode).
func (o *Orchestrator) Shutdown(ctx context.Context, opts ShutdownOptions) (err error) {
	unlock, err := state.AcquireLock(o.cfg.State.LockPath)
	if err != nil {
		return err
	}
	defer unlock()
	if invErr := o.validateNodeInventory(); invErr != nil {
		return invErr
	}
	record := state.RunRecord{
		ID:        fmt.Sprintf("shutdown-%d", time.Now().UnixNano()),
		Action:    "shutdown",
		Reason:    opts.Reason,
		DryRun:    o.runner.DryRun,
		StartedAt: time.Now().UTC(),
	}
	defer o.finalizeRecord(&record, &err)
	if !o.runner.DryRun {
		if writeErr :=
state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentShuttingDown, opts.Reason, "shutdown"); writeErr != nil {
			return fmt.Errorf("set shutdown intent: %w", writeErr)
		}
		// On success mark the shutdown complete; on failure leave the
		// shutting-down intent in place so startup guards still apply.
		defer func() {
			final := state.IntentShuttingDown
			if err == nil {
				final = state.IntentShutdownComplete
			}
			if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, final, opts.Reason, "shutdown"); writeErr != nil {
				o.log.Printf("warning: write shutdown completion intent failed: %v", writeErr)
			}
		}()
	}
	workers, err := o.effectiveWorkers(ctx)
	if err != nil {
		return err
	}
	o.log.Printf("shutdown control-planes=%s workers=%s", strings.Join(o.cfg.ControlPlanes, ","), strings.Join(workers, ","))
	o.reportFluxSource(ctx, "")
	skipEtcd := opts.SkipEtcdSnapshot || o.cfg.Shutdown.SkipEtcdSnapshot
	if !skipEtcd {
		// FIX: guard the ControlPlanes[0] index. With an empty control-plane
		// inventory the closure previously panicked; EtcdRestore already
		// guards the same condition explicitly.
		if len(o.cfg.ControlPlanes) == 0 {
			o.log.Printf("warning: skip etcd snapshot: no control planes configured")
		} else {
			o.bestEffort("etcd snapshot", func() error { return o.takeEtcdSnapshot(ctx, o.cfg.ControlPlanes[0]) })
		}
	}
	o.bestEffort("suspend flux", func() error { return o.patchFluxSuspendAll(ctx, true) })
	o.bestEffort("scale down apps", func() error { return o.scaleDownApps(ctx) })
	skipDrain := opts.SkipDrain || o.cfg.Shutdown.SkipDrain
	if !skipDrain {
		o.bestEffort("drain workers", func() error { return o.drainWorkers(ctx, workers) })
	}
	// Resolve the effective poweroff behavior from the requested mode.
	shutdownMode := strings.TrimSpace(opts.Mode)
	poweroffEnabled := o.cfg.Shutdown.PoweroffEnabled
	switch shutdownMode {
	case "":
		// Safe default for internal triggers and older callers.
		poweroffEnabled = false
	case "config":
		// Honor configured behavior only when explicitly requested.
case "cluster-only":
		poweroffEnabled = false
	case "poweroff":
		poweroffEnabled = true
	default:
		return fmt.Errorf("unsupported shutdown mode %q (expected config|cluster-only|poweroff)", shutdownMode)
	}
	modeLabel := "cluster-only"
	if poweroffEnabled {
		modeLabel = "cluster-and-host-poweroff"
	}
	o.log.Printf("shutdown execution mode=%s (requested=%q config_poweroff_enabled=%t)", modeLabel, shutdownMode, o.cfg.Shutdown.PoweroffEnabled)
	o.stopWorkers(ctx, workers)
	o.stopControlPlanes(ctx, o.cfg.ControlPlanes)
	if poweroffEnabled {
		// NOTE(review): only worker hosts are powered off here; control
		// planes are presumably left running the orchestrator — confirm.
		o.bestEffort("poweroff hosts", func() error { return o.poweroffHosts(ctx, workers) })
	}
	o.log.Printf("shutdown flow complete")
	return nil
}

// EstimatedShutdownSeconds returns a shutdown duration estimate from run
// history (p95 with a minimum sample count, falling back to the configured
// default budget).
func (o *Orchestrator) EstimatedShutdownSeconds() int {
	return o.store.ShutdownP95WithMinSamples(o.cfg.Shutdown.DefaultBudgetSeconds, o.cfg.Shutdown.HistoryMinSamples)
}

// EstimatedEmergencyShutdownSeconds is like EstimatedShutdownSeconds but
// only considers runs whose reason carries an emergency-style prefix.
func (o *Orchestrator) EstimatedEmergencyShutdownSeconds() int {
	return o.store.ShutdownP95ByReasonPrefix(
		o.cfg.Shutdown.EmergencyBudgetSec,
		o.cfg.Shutdown.EmergencyMinSamples,
		[]string{"ups-", "emergency-", "drill-emergency"},
	)
}

// finalizeRecord stamps end time, duration and outcome onto the run record
// and appends it to the store. err is a pointer so the deferred call sees
// the caller's final error value.
func (o *Orchestrator) finalizeRecord(record *state.RunRecord, err *error) {
	record.EndedAt = time.Now().UTC()
	record.DurationSeconds = int(record.EndedAt.Sub(record.StartedAt).Seconds())
	record.Success = *err == nil
	if *err != nil {
		record.Error = (*err).Error()
	}
	if appendErr := o.store.Append(*record); appendErr != nil {
		o.log.Printf("warning: append run record failed: %v", appendErr)
	}
}

// startupReportPath is the on-disk location of the last startup report.
func (o *Orchestrator) startupReportPath() string {
	return filepath.Join(o.cfg.State.Dir, "last-startup-report.json")
}

// beginStartupReport opens a fresh in-memory startup report; hostname
// lookup failure is deliberately ignored (SourceHost stays empty).
func (o *Orchestrator) beginStartupReport(reason string) {
	host, _ := os.Hostname()
	o.startupReportMu.Lock()
	defer o.startupReportMu.Unlock()
	o.activeStartupReport = &startupReport{
		StartedAt:  time.Now().UTC(),
		Reason:     strings.TrimSpace(reason),
		Checks:     map[string]startupCheckRecord{},
		AutoHeals:  []string{},
		SourceHost: strings.TrimSpace(host),
	}
}

// noteStartupCheck records a named pass/fail check on the active report;
// a no-op when no startup run is in flight.
func (o *Orchestrator) noteStartupCheck(name string, success bool, detail string) {
	o.startupReportMu.Lock()
	defer o.startupReportMu.Unlock()
	if o.activeStartupReport == nil {
		return
	}
	status := "failed"
	if success {
		status = "passed"
	}
	o.activeStartupReport.Checks[strings.TrimSpace(name)] = startupCheckRecord{
		Status:    status,
		Detail:    strings.TrimSpace(detail),
		UpdatedAt: time.Now().UTC(),
	}
}

// noteStartupAutoHeal appends an auto-heal note to the active report;
// blank details and runs without an active report are ignored.
func (o *Orchestrator) noteStartupAutoHeal(detail string) {
	o.startupReportMu.Lock()
	defer o.startupReportMu.Unlock()
	if o.activeStartupReport == nil {
		return
	}
	detail = strings.TrimSpace(detail)
	if detail == "" {
		return
	}
	o.activeStartupReport.AutoHeals = append(o.activeStartupReport.AutoHeals, detail)
}

// finalizeStartupReport detaches the active report, stamps the outcome and
// writes it to disk; all failures are logged, never returned.
func (o *Orchestrator) finalizeStartupReport(runErr error) {
	o.startupReportMu.Lock()
	report := o.activeStartupReport
	o.activeStartupReport = nil
	o.startupReportMu.Unlock()
	if report == nil {
		return
	}
	report.Completed = time.Now().UTC()
	report.Success = runErr == nil
	if runErr != nil {
		report.Error = strings.TrimSpace(runErr.Error())
	}
	b, err := json.MarshalIndent(report, "", " ")
	if err != nil {
		o.log.Printf("warning: encode startup report failed: %v", err)
		return
	}
	path := o.startupReportPath()
	if writeErr := os.WriteFile(path, b, 0o640); writeErr != nil {
		o.log.Printf("warning: write startup report %s failed: %v", path, writeErr)
		return
	}
	o.log.Printf("startup report written: %s", path)
}

// effectiveWorkers resolves the worker list: configured workers win, then
// API discovery, then the static inventory fallback (only when discovery
// fails and the fallback is non-empty).
func (o *Orchestrator) effectiveWorkers(ctx context.Context) ([]string, error) {
	if len(o.cfg.Workers) > 0 {
		// Copy so callers cannot mutate the configured slice.
		return append([]string{}, o.cfg.Workers...), nil
	}
	workers, err := o.discoverWorkers(ctx)
	if err == nil {
		return workers, nil
	}
	fallback := o.fallbackWorkersFromInventory()
	if len(fallback) == 0 {
		return nil, err
	}
	o.log.Printf("warning: worker discovery failed via kubernetes API (%v); falling back to inventory workers=%s", err, strings.Join(fallback, ","))
	return fallback, nil
}

// discoverWorkers lists nodes via kubectl and keeps those without
// control-plane/master role labels.
func (o *Orchestrator) discoverWorkers(ctx context.Context) ([]string, error) {
	out, err := o.kubectl(ctx, 15*time.Second, "get", "nodes", "-o",
"custom-columns=NAME:.metadata.name,CP:.metadata.labels.node-role\\.kubernetes\\.io/control-plane,MASTER:.metadata.labels.node-role\\.kubernetes\\.io/master",
		"--no-headers",
	)
	if err != nil {
		return nil, fmt.Errorf("discover workers: %w", err)
	}
	var workers []string
	for _, line := range lines(out) {
		fields := strings.Fields(line)
		if len(fields) < 3 {
			continue
		}
		// Keep nodes where both role columns are empty.
		// NOTE(review): kubectl custom-columns typically prints "<none>"
		// for missing labels rather than an empty string — verify this
		// filter matches real output, otherwise discovery always falls
		// back to the inventory.
		if fields[1] == "" && fields[2] == "" {
			workers = append(workers, fields[0])
		}
	}
	if len(workers) == 0 {
		return nil, fmt.Errorf("no workers discovered")
	}
	return workers, nil
}

// fallbackWorkersFromInventory derives a worker list from static config:
// all SSH-managed nodes (or, failing that, all known SSH hosts) that are
// not control planes, sorted for determinism.
func (o *Orchestrator) fallbackWorkersFromInventory() []string {
	cp := make(map[string]struct{}, len(o.cfg.ControlPlanes))
	for _, node := range o.cfg.ControlPlanes {
		cp[strings.TrimSpace(node)] = struct{}{}
	}
	candidates := make(map[string]struct{})
	add := func(node string) {
		name := strings.TrimSpace(node)
		if name == "" {
			return
		}
		if _, isCP := cp[name]; isCP {
			return
		}
		candidates[name] = struct{}{}
	}
	for _, node := range o.cfg.SSHManagedNodes {
		add(node)
	}
	if len(candidates) == 0 {
		for node := range o.cfg.SSHNodeHosts {
			add(node)
		}
	}
	workers := make([]string, 0, len(candidates))
	for node := range candidates {
		workers = append(workers, node)
	}
	sort.Strings(workers)
	return workers
}

// patchFluxSuspendAll toggles spec.suspend on every flux Kustomization and
// HelmRelease. Individual patch failures are logged and skipped; only
// listing failures abort.
func (o *Orchestrator) patchFluxSuspendAll(ctx context.Context, suspend bool) error {
	patch := fmt.Sprintf(`{"spec":{"suspend":%t}}`, suspend)
	ksOut, err := o.kubectl(ctx, 20*time.Second,
		"-n", "flux-system", "get", "kustomizations.kustomize.toolkit.fluxcd.io",
		"-o", "jsonpath={range .items[*]}{.metadata.name}{'\\n'}{end}",
	)
	if err != nil {
		return err
	}
	for _, ks := range lines(ksOut) {
		_, patchErr := o.kubectl(ctx, 20*time.Second, "-n", "flux-system", "patch", "kustomization", ks, "--type=merge", "-p", patch)
		if patchErr != nil {
			o.log.Printf("warning: patch kustomization %s failed: %v", ks, patchErr)
		}
	}
	hrOut, err := o.kubectl(ctx, 25*time.Second,
		"get", "helmreleases.helm.toolkit.fluxcd.io", "-A",
		"-o", "jsonpath={range .items[*]}{.metadata.namespace}{'/'}{.metadata.name}{'\\n'}{end}",
	)
	if err != nil {
		return err
	}
	for _, hr := range lines(hrOut) {
		parts := strings.SplitN(hr, "/", 2)
		if len(parts) != 2 {
			continue
		}
		_, patchErr := o.kubectl(ctx, 20*time.Second, "-n", parts[0], "patch", "helmrelease", parts[1], "--type=merge", "-p", patch)
		if patchErr != nil {
			o.log.Printf("warning: patch helmrelease %s failed: %v", hr, patchErr)
		}
	}
	return nil
}

// scaleDownApps snapshots current replica counts to disk, then scales every
// scalable workload to zero. The snapshot is written even when empty so a
// later restore has a consistent view.
func (o *Orchestrator) scaleDownApps(ctx context.Context) error {
	targets, err := o.listScalableWorkloads(ctx)
	if err != nil {
		return err
	}
	if err := o.writeScaledWorkloadSnapshot(targets); err != nil {
		return err
	}
	if len(targets) == 0 {
		o.log.Printf("scale down apps: no workloads above 0 replicas")
		return nil
	}
	parallelism := o.cfg.Shutdown.ScaleParallelism
	if parallelism <= 0 {
		parallelism = 8
	}
	o.log.Printf("scale down apps targets=%d parallelism=%d", len(targets), parallelism)
	return o.scaleWorkloads(ctx, targets, 0, parallelism)
}

// restoreScaledApps replays the persisted replica snapshot (forceReplicas
// -1 means "use each entry's saved count") and deletes the snapshot file on
// success. Dry-run does nothing beyond reading.
func (o *Orchestrator) restoreScaledApps(ctx context.Context) error {
	snapshot, err := o.readScaledWorkloadSnapshot()
	if err != nil {
		return err
	}
	if snapshot == nil || len(snapshot.Entries) == 0 {
		o.log.Printf("restore scaled workloads: no snapshot entries to restore")
		return nil
	}
	parallelism := o.cfg.Shutdown.ScaleParallelism
	if parallelism <= 0 {
		parallelism = 8
	}
	o.log.Printf("restore scaled workloads entries=%d parallelism=%d snapshot_at=%s", len(snapshot.Entries), parallelism, snapshot.GeneratedAt.Format(time.RFC3339))
	if err := o.scaleWorkloads(ctx, snapshot.Entries, -1, parallelism); err != nil {
		return err
	}
	if o.runner.DryRun {
		return nil
	}
	if err := os.Remove(o.scaledWorkloadSnapshotPath()); err != nil && !os.IsNotExist(err) {
		return fmt.Errorf("remove scaled workload snapshot: %w", err)
	}
	return nil
}

// listScalableWorkloads returns every deployment and statefulset with
// replicas > 0, excluding configured namespaces, sorted deterministically.
func (o *Orchestrator) listScalableWorkloads(ctx context.Context) ([]workloadScaleEntry, error) {
	exclude := map[string]struct{}{}
	for _, ns := range o.cfg.ExcludedNamespaces {
exclude[strings.TrimSpace(ns)] = struct{}{}
	}
	// collect lists one workload kind as tab-separated namespace/name/replicas
	// rows and keeps rows with a positive, parseable replica count.
	collect := func(kind string) ([]workloadScaleEntry, error) {
		out, err := o.kubectl(
			ctx,
			25*time.Second,
			"get", kind, "-A",
			"-o", "jsonpath={range .items[*]}{.metadata.namespace}{'\\t'}{.metadata.name}{'\\t'}{.spec.replicas}{'\\n'}{end}",
		)
		if err != nil {
			return nil, err
		}
		var entries []workloadScaleEntry
		for _, line := range lines(out) {
			parts := strings.Split(line, "\t")
			if len(parts) < 3 {
				continue
			}
			ns := strings.TrimSpace(parts[0])
			if _, skip := exclude[ns]; skip {
				continue
			}
			// Unparseable or zero replica counts are skipped silently.
			replicas, convErr := strconv.Atoi(strings.TrimSpace(parts[2]))
			if convErr != nil || replicas <= 0 {
				continue
			}
			entries = append(entries, workloadScaleEntry{
				Namespace: ns,
				Kind:      kind,
				Name:      strings.TrimSpace(parts[1]),
				Replicas:  replicas,
			})
		}
		return entries, nil
	}
	deployments, err := collect("deployment")
	if err != nil {
		return nil, fmt.Errorf("collect deployments: %w", err)
	}
	statefulsets, err := collect("statefulset")
	if err != nil {
		return nil, fmt.Errorf("collect statefulsets: %w", err)
	}
	targets := append(deployments, statefulsets...)
	// Stable ordering: namespace, then kind, then name.
	sort.Slice(targets, func(i, j int) bool {
		a, b := targets[i], targets[j]
		if a.Namespace != b.Namespace {
			return a.Namespace < b.Namespace
		}
		if a.Kind != b.Kind {
			return a.Kind < b.Kind
		}
		return a.Name < b.Name
	})
	return targets, nil
}

// scaleWorkloads scales entries in parallel (bounded by a semaphore).
// forceReplicas >= 0 overrides each entry's saved count. Missing workloads
// are logged and skipped; other failures are aggregated into one error
// reporting the total count and up to five samples.
func (o *Orchestrator) scaleWorkloads(ctx context.Context, entries []workloadScaleEntry, forceReplicas int, parallelism int) error {
	if len(entries) == 0 {
		return nil
	}
	if parallelism <= 0 {
		parallelism = 1
	}
	if parallelism > len(entries) {
		parallelism = len(entries)
	}
	sem := make(chan struct{}, parallelism)
	var wg sync.WaitGroup
	errCh := make(chan error, len(entries))
	for _, entry := range entries {
		entry := entry // pre-Go1.22 loop-variable capture
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}
			defer func() { <-sem }()
			replicas := entry.Replicas
			if forceReplicas >= 0 {
				replicas = forceReplicas
			}
			if _, err := o.kubectl(
				ctx,
				20*time.Second,
				"-n", entry.Namespace,
				"scale", entry.Kind, entry.Name,
				fmt.Sprintf("--replicas=%d", replicas),
			); err != nil {
				if isNotFoundErr(err) {
					o.log.Printf("warning: skip missing workload while scaling %s/%s/%s", entry.Namespace, entry.Kind, entry.Name)
					return
				}
				errCh <- fmt.Errorf("scale %s/%s/%s -> %d: %w", entry.Namespace, entry.Kind, entry.Name, replicas, err)
			}
		}()
	}
	wg.Wait()
	close(errCh)
	errorCount := len(errCh)
	if errorCount == 0 {
		return nil
	}
	errs := make([]string, 0, len(errCh))
	for err := range errCh {
		errs = append(errs, err.Error())
		if len(errs) >= 5 {
			break
		}
	}
	return fmt.Errorf("scaling had %d errors (first: %s)", errorCount, strings.Join(errs, " | "))
}

// scaledWorkloadSnapshotPath is the on-disk location of the replica snapshot.
func (o *Orchestrator) scaledWorkloadSnapshotPath() string {
	return filepath.Join(o.cfg.State.Dir, "scaled-workloads.json")
}

// writeScaledWorkloadSnapshot persists the replica snapshot (no-op in
// dry-run), creating the state directory if needed.
func (o *Orchestrator) writeScaledWorkloadSnapshot(entries []workloadScaleEntry) error {
	if o.runner.DryRun {
		return nil
	}
	if err := os.MkdirAll(o.cfg.State.Dir, 0o755); err != nil {
		return fmt.Errorf("ensure state dir: %w", err)
	}
	payload := workloadScaleSnapshot{
		GeneratedAt: time.Now().UTC(),
		Entries:     entries,
	}
	b, err :=
json.MarshalIndent(payload, "", " ")
	if err != nil {
		return fmt.Errorf("marshal scaled workload snapshot: %w", err)
	}
	if err := os.WriteFile(o.scaledWorkloadSnapshotPath(), b, 0o644); err != nil {
		return fmt.Errorf("write scaled workload snapshot: %w", err)
	}
	return nil
}

// readScaledWorkloadSnapshot loads the persisted replica snapshot. It
// returns (nil, nil) in dry-run or when no snapshot file exists.
func (o *Orchestrator) readScaledWorkloadSnapshot() (*workloadScaleSnapshot, error) {
	if o.runner.DryRun {
		return nil, nil
	}
	b, err := os.ReadFile(o.scaledWorkloadSnapshotPath())
	if err != nil {
		if os.IsNotExist(err) {
			return nil, nil
		}
		return nil, fmt.Errorf("read scaled workload snapshot: %w", err)
	}
	var snapshot workloadScaleSnapshot
	if err := json.Unmarshal(b, &snapshot); err != nil {
		return nil, fmt.Errorf("decode scaled workload snapshot: %w", err)
	}
	return &snapshot, nil
}

// drainFailure pairs a node's drain error with diagnostic detail about the
// pods that blocked the drain.
type drainFailure struct {
	node    string
	err     error
	details string
}

// drainWorkers cordons and drains workers in parallel (bounded by a
// semaphore). Cordon failures are only logged; drain failures are collected
// with per-node diagnostics and aggregated into one error (up to four
// samples).
func (o *Orchestrator) drainWorkers(ctx context.Context, workers []string) error {
	total := len(workers)
	if total == 0 {
		return nil
	}
	parallelism := o.cfg.Shutdown.DrainParallelism
	if parallelism <= 0 {
		parallelism = 6
	}
	if parallelism > total {
		parallelism = total
	}
	o.log.Printf("drain workers total=%d parallelism=%d", total, parallelism)
	sem := make(chan struct{}, parallelism)
	var wg sync.WaitGroup
	errCh := make(chan drainFailure, total)
	for idx, node := range workers {
		idx := idx // pre-Go1.22 loop-variable capture
		node := node
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}
			defer func() { <-sem }()
			o.log.Printf("drain worker %d/%d: %s", idx+1, total, node)
			if _, err := o.kubectl(ctx, 20*time.Second, "cordon", node); err != nil {
				o.log.Printf("warning: cordon %s failed: %v", node, err)
			}
			if _, err := o.kubectl(ctx, 3*time.Minute, "drain", node, "--ignore-daemonsets", "--delete-emptydir-data", "--grace-period=30", "--timeout=180s"); err != nil {
				details := o.drainNodeDiagnostics(ctx, node)
				errCh <- drainFailure{
					node:    node,
					err:     fmt.Errorf("drain %s failed: %w", node, err),
					details: details,
				}
				return
			}
		}()
	}
	wg.Wait()
	close(errCh)
	if len(errCh) == 0 {
		return nil
	}
	failures := make([]drainFailure, 0, len(errCh))
	for failure := range errCh {
		failures = append(failures, failure)
	}
	count := len(failures)
	samples := []string{}
	for _, failure := range failures {
		msg := failure.err.Error()
		if strings.TrimSpace(failure.details) != "" {
			msg = fmt.Sprintf("%s (details: %s)", msg, failure.details)
		}
		samples = append(samples, msg)
		if len(samples) >= 4 {
			break
		}
	}
	return fmt.Errorf("drain workers had %d errors (first: %s)", count, strings.Join(samples, " | "))
}

// drainNodeDiagnostics summarizes the non-daemonset, still-running pods on
// a node that likely blocked its drain (up to six), as a human-readable
// string for error messages. Never returns an error; lookup failures are
// folded into the returned text.
func (o *Orchestrator) drainNodeDiagnostics(ctx context.Context, node string) string {
	out, err := o.kubectl(
		ctx,
		20*time.Second,
		"get", "pods", "-A",
		"--field-selector", "spec.nodeName="+node,
		"-o", "custom-columns=NS:.metadata.namespace,NAME:.metadata.name,PHASE:.status.phase,OWNER:.metadata.ownerReferences[0].kind",
		"--no-headers",
	)
	if err != nil {
		if strings.TrimSpace(out) == "" {
			return fmt.Sprintf("diagnostics unavailable: %v", err)
		}
		return fmt.Sprintf("diagnostics unavailable: %v (%s)", err, strings.Join(lines(out), "; "))
	}
	blockers := make([]string, 0, 6)
	for _, line := range lines(out) {
		fields := strings.Fields(line)
		if len(fields) < 4 {
			continue
		}
		namespace := fields[0]
		name := fields[1]
		phase := fields[2]
		owner := fields[3]
		// DaemonSet pods are ignored by drain; completed pods don't block.
		if strings.EqualFold(owner, "DaemonSet") {
			continue
		}
		if strings.EqualFold(phase, "Succeeded") || strings.EqualFold(phase, "Failed") {
			continue
		}
		blockers = append(blockers, fmt.Sprintf("%s/%s(phase=%s owner=%s)", namespace, name, phase, owner))
		if len(blockers) >= 6 {
			break
		}
	}
	if len(blockers) == 0 {
		return "no non-daemonset blocking pods found on node"
	}
	return strings.Join(blockers, ", ")
}

// uncordonWorkers marks workers schedulable again; failures are logged,
// never returned (the function always succeeds).
func (o *Orchestrator) uncordonWorkers(ctx context.Context, workers []string) error {
	for _, node := range workers {
		if _, err := o.kubectl(ctx, 20*time.Second, "uncordon", node); err != nil {
			o.log.Printf("warning: uncordon %s failed: %v", node, err)
		}
	}
	return nil
}

// stopWorkers stops the k3s agent on every SSH-managed worker.
func (o *Orchestrator) stopWorkers(ctx context.Context, workers []string) {
o.runSSHAcrossNodes(ctx, workers, "stop k3s-agent", "sudo systemctl stop k3s-agent || true") } func (o *Orchestrator) startWorkers(ctx context.Context, workers []string) { o.runSSHAcrossNodes(ctx, workers, "start k3s-agent", "sudo systemctl start k3s-agent || true") } func (o *Orchestrator) stopControlPlanes(ctx context.Context, cps []string) { o.runSSHAcrossNodes(ctx, cps, "stop k3s", "sudo systemctl stop k3s || true") } func (o *Orchestrator) startControlPlanes(ctx context.Context, cps []string) { o.runSSHAcrossNodes(ctx, cps, "start k3s", "sudo systemctl start k3s || true") } func (o *Orchestrator) runSSHAcrossNodes(ctx context.Context, nodes []string, action, command string) { if len(nodes) == 0 { return } parallelism := o.cfg.Shutdown.SSHParallelism if parallelism <= 0 { parallelism = 8 } if parallelism > len(nodes) { parallelism = len(nodes) } sem := make(chan struct{}, parallelism) var wg sync.WaitGroup for _, node := range nodes { node := node if !o.sshManaged(node) { o.log.Printf("skip %s on %s: node not in ssh_managed_nodes", action, node) continue } wg.Add(1) go func() { defer wg.Done() sem <- struct{}{} defer func() { <-sem }() o.bestEffort(action+" on "+node, func() error { _, err := o.ssh(ctx, node, command) return err }) }() } wg.Wait() } func (o *Orchestrator) takeEtcdSnapshot(ctx context.Context, node string) error { if !o.sshManaged(node) { return fmt.Errorf("cannot run etcd snapshot on %s: node not in ssh_managed_nodes", node) } name := "pre-shutdown-" + time.Now().UTC().Format("20060102-150405") _, err := o.runSudoK3S(ctx, node, "etcd-snapshot", "save", "--name", name) return err } func (o *Orchestrator) latestEtcdSnapshotPath(ctx context.Context, node string) (string, error) { if !o.sshManaged(node) { return "", fmt.Errorf("cannot resolve etcd snapshot on %s: node not in ssh_managed_nodes", node) } out, err := o.runSudoK3S(ctx, node, "etcd-snapshot", "ls") if err != nil { return "", fmt.Errorf("resolve latest etcd snapshot on %s: %w", node, 
err) } snapshot := parseSnapshotPathFromEtcdSnapshotList(out) if snapshot == "" { return "", fmt.Errorf("no etcd snapshots found on %s under /var/lib/rancher/k3s/server/db/snapshots", node) } return snapshot, nil } func parseSnapshotPathFromEtcdSnapshotList(out string) string { for _, line := range lines(out) { trimmed := strings.TrimSpace(line) if trimmed == "" { continue } lower := strings.ToLower(trimmed) if strings.HasPrefix(lower, "name") && strings.Contains(lower, "location") { continue } for _, field := range strings.Fields(trimmed) { candidate := strings.Trim(strings.TrimSpace(field), "\",") candidate = strings.TrimPrefix(candidate, "file://") if strings.HasPrefix(candidate, "/var/lib/rancher/k3s/server/db/snapshots/") { return candidate } } } return "" } func intentAge(in state.Intent) time.Duration { if in.UpdatedAt.IsZero() { return 0 } return time.Since(in.UpdatedAt) } func intentFresh(in state.Intent, maxAge time.Duration) bool { if in.UpdatedAt.IsZero() { return true } return intentAge(in) <= maxAge } func (o *Orchestrator) startupGuardAge() time.Duration { seconds := o.cfg.Coordination.StartupGuardMaxAgeSec if seconds <= 0 { seconds = 900 } return time.Duration(seconds) * time.Second } func (o *Orchestrator) startupShutdownCooldown() time.Duration { seconds := o.cfg.Startup.ShutdownCooldownSeconds if seconds <= 0 { seconds = 45 } return time.Duration(seconds) * time.Second } func (o *Orchestrator) coordinationPeers() []string { seen := map[string]struct{}{} out := make([]string, 0, len(o.cfg.Coordination.PeerHosts)+1) add := func(node string) { node = strings.TrimSpace(node) if node == "" { return } if _, ok := seen[node]; ok { return } seen[node] = struct{}{} out = append(out, node) } for _, node := range o.cfg.Coordination.PeerHosts { add(node) } if strings.TrimSpace(o.cfg.Coordination.ForwardShutdownHost) != "" { add(o.cfg.Coordination.ForwardShutdownHost) } return out } func (o *Orchestrator) guardPeerStartupIntents(ctx context.Context) error { 
peers := o.coordinationPeers() if len(peers) == 0 { return nil } guardAge := o.startupGuardAge() localRole := strings.ToLower(strings.TrimSpace(o.cfg.Coordination.Role)) for _, peer := range peers { peerStatus, err := o.readRemotePeerStatus(ctx, peer) if err != nil { o.log.Printf("warning: peer startup guard skipped intent check for %s: %v", peer, err) continue } intent := peerStatus.Intent switch intent.State { case "", state.IntentNormal: continue case state.IntentShuttingDown: if intentFresh(intent, guardAge) { return fmt.Errorf("startup blocked: peer %s has active shutdown intent (reason=%q age=%s)", peer, intent.Reason, intentAge(intent).Round(time.Second)) } o.log.Printf("warning: peer %s shutdown intent appears stale; allowing startup", peer) case state.IntentStartupInProgress: if !peerStatus.BootstrapActive { o.log.Printf("warning: peer %s reports startup_in_progress but bootstrap service is inactive (reason=%q age=%s); auto-clearing stale peer intent", peer, intent.Reason, intentAge(intent).Round(time.Second)) o.bestEffort(fmt.Sprintf("clear stale peer startup intent on %s", peer), func() error { return o.clearRemotePeerIntent(ctx, peer, "auto-clear stale peer startup intent") }) continue } if localRole == "coordinator" && strings.EqualFold(strings.TrimSpace(intent.Reason), "manual-startup") { o.log.Printf("warning: peer %s has manual startup in progress (age=%s); allowing coordinator startup to continue", peer, intentAge(intent).Round(time.Second)) continue } if intentFresh(intent, guardAge) { return fmt.Errorf("startup blocked: peer %s reports startup_in_progress (reason=%q age=%s)", peer, intent.Reason, intentAge(intent).Round(time.Second)) } o.log.Printf("warning: peer %s startup intent appears stale; auto-clearing and allowing startup", peer) o.bestEffort(fmt.Sprintf("clear stale peer startup intent on %s", peer), func() error { return o.clearRemotePeerIntent(ctx, peer, "auto-clear stale peer startup intent") }) case state.IntentShutdownComplete: if 
intentFresh(intent, o.startupShutdownCooldown()) { return fmt.Errorf("startup blocked: peer %s completed shutdown too recently (age=%s)", peer, intentAge(intent).Round(time.Second)) } default: o.log.Printf("warning: peer %s intent state %q is unknown; ignoring", peer, intent.State) } } return nil } func (o *Orchestrator) readRemoteIntent(ctx context.Context, node string) (state.Intent, error) { peer, err := o.readRemotePeerStatus(ctx, node) if err != nil { return state.Intent{}, err } return peer.Intent, nil } func (o *Orchestrator) readRemotePeerStatus(ctx context.Context, node string) (remotePeerStatus, error) { if !o.sshManaged(node) { return remotePeerStatus{}, fmt.Errorf("%s is not in ssh_managed_nodes", node) } out, err := o.ssh(ctx, node, "if sudo -n /usr/bin/systemctl is-active --quiet ananke-bootstrap.service; then echo __ANANKE_BOOTSTRAP_ACTIVE__; else echo __ANANKE_BOOTSTRAP_IDLE__; fi; sudo -n /usr/local/bin/ananke intent --config /etc/ananke/ananke.yaml") if err != nil { return remotePeerStatus{}, err } status := remotePeerStatus{ BootstrapActive: strings.Contains(out, "__ANANKE_BOOTSTRAP_ACTIVE__") || strings.Contains(out, "__ANANKE_BOOTSTRAP_ACTIVE__"), } in, err := state.ParseIntentOutput(out) if err != nil { return remotePeerStatus{}, fmt.Errorf("parse remote intent output: %w", err) } status.Intent = in return status, nil } func (o *Orchestrator) clearRemotePeerIntent(ctx context.Context, node string, reason string) error { cmd := fmt.Sprintf( "sudo -n /usr/local/bin/ananke intent --config /etc/ananke/ananke.yaml --set normal --reason %s --source startup --execute", shellQuote(reason), ) _, err := o.ssh(ctx, node, cmd) return err } func shellQuote(v string) string { return "'" + strings.ReplaceAll(v, "'", `'"'"'`) + "'" } func (o *Orchestrator) verifyEtcdSnapshot(ctx context.Context, node string, snapshotPath string) error { if o.runner.DryRun { return nil } path := strings.TrimSpace(snapshotPath) if path == "" { return fmt.Errorf("etcd snapshot 
verification failed: snapshot path is empty") } quoted := shellQuote(path) sizeOut, err := o.ssh(ctx, node, fmt.Sprintf("sudo -n sh -lc 'test -s %s && stat -c %%s %s'", quoted, quoted)) if err != nil { return fmt.Errorf("etcd snapshot verification failed for %s on %s: %w", path, node, err) } size, convErr := strconv.ParseInt(strings.TrimSpace(sizeOut), 10, 64) if convErr != nil { return fmt.Errorf("etcd snapshot verification failed for %s on %s: parse size %q: %w", path, node, strings.TrimSpace(sizeOut), convErr) } const minSnapshotBytes = int64(1 << 20) // 1 MiB sanity floor. if size < minSnapshotBytes { return fmt.Errorf("etcd snapshot verification failed for %s on %s: snapshot too small (%d bytes)", path, node, size) } lsOut, err := o.runSudoK3S(ctx, node, "etcd-snapshot", "ls") if err != nil { return fmt.Errorf("etcd snapshot verification failed for %s on %s: list snapshots: %w", path, node, err) } if !strings.Contains(lsOut, path) && !strings.Contains(lsOut, filepath.Base(path)) { return fmt.Errorf("etcd snapshot verification failed for %s on %s: snapshot is not present in k3s etcd-snapshot ls output", path, node) } sumOut, err := o.ssh(ctx, node, fmt.Sprintf("sudo -n sh -lc 'sha256sum %s | awk \"{print \\$1}\"'", quoted)) if err != nil { return fmt.Errorf("etcd snapshot verification failed for %s on %s: sha256: %w", path, node, err) } hash := strings.TrimSpace(sumOut) if len(hash) != 64 { return fmt.Errorf("etcd snapshot verification failed for %s on %s: invalid sha256 %q", path, node, hash) } o.log.Printf("etcd snapshot verified path=%s size_bytes=%d sha256=%s", path, size, hash[:12]) return nil } func (o *Orchestrator) runSudoK3S(ctx context.Context, node string, args ...string) (string, error) { k3sPaths := []string{ "/usr/local/bin/k3s", "/usr/bin/k3s", "k3s", } var lastErr error for _, path := range k3sPaths { parts := []string{"sudo", "-n", path} parts = append(parts, args...) 
command := strings.Join(parts, " ") out, err := o.ssh(ctx, node, command) if err == nil { return out, nil } lastErr = err } if lastErr == nil { lastErr = fmt.Errorf("no k3s executable candidates configured") } return "", lastErr } func (o *Orchestrator) controlPlaneUsesExternalDatastore(ctx context.Context, node string) (bool, error) { out, err := o.ssh(ctx, node, "sudo systemctl cat k3s") if err != nil { return false, fmt.Errorf("inspect k3s service on %s for datastore mode: %w", node, err) } return strings.Contains(out, "--datastore-endpoint="), nil } func (o *Orchestrator) waitForAPI(ctx context.Context, attempts int, sleep time.Duration) error { if o.runner.DryRun { return nil } for i := 0; i < attempts; i++ { _, err := o.kubectl(ctx, 5*time.Second, "version", "--request-timeout=5s") if err == nil { return nil } time.Sleep(sleep) } return fmt.Errorf("kubernetes API did not become reachable within timeout") } func (o *Orchestrator) waitForTimeSync(ctx context.Context, nodes []string) error { if o.runner.DryRun { return nil } wait := time.Duration(o.cfg.Startup.TimeSyncWaitSeconds) * time.Second if wait <= 0 { wait = 240 * time.Second } poll := time.Duration(o.cfg.Startup.TimeSyncPollSeconds) * time.Second if poll <= 0 { poll = 5 * time.Second } mode := strings.ToLower(strings.TrimSpace(o.cfg.Startup.TimeSyncMode)) if mode == "" { mode = "strict" } managedControlPlanes := 0 for _, node := range nodes { node = strings.TrimSpace(node) if node == "" { continue } if o.sshManaged(node) { managedControlPlanes++ } } requiredQuorum := o.cfg.Startup.TimeSyncQuorum if requiredQuorum <= 0 { requiredQuorum = managedControlPlanes if requiredQuorum <= 0 { requiredQuorum = 1 } } if requiredQuorum > managedControlPlanes && managedControlPlanes > 0 { requiredQuorum = managedControlPlanes } deadline := time.Now().Add(wait) for { unsynced := []string{} syncedControlPlanes := 0 checkedControlPlanes := 0 localOut, localErr := o.run(ctx, 10*time.Second, "sh", "-lc", "timedatectl show 
-p NTPSynchronized --value 2>/dev/null || echo unknown") localSynced := localErr == nil && isTimeSynced(localOut) if !localSynced { if localErr != nil { unsynced = append(unsynced, fmt.Sprintf("local(%v)", localErr)) } else { unsynced = append(unsynced, fmt.Sprintf("local(%s)", strings.TrimSpace(localOut))) } } for _, node := range nodes { node = strings.TrimSpace(node) if node == "" { continue } if !o.sshManaged(node) { continue } checkedControlPlanes++ out, err := o.ssh(ctx, node, "timedatectl show -p NTPSynchronized --value 2>/dev/null || echo unknown") if err != nil || !isTimeSynced(out) { if err != nil { unsynced = append(unsynced, fmt.Sprintf("%s(%v)", node, err)) } else { unsynced = append(unsynced, fmt.Sprintf("%s(%s)", node, strings.TrimSpace(out))) } } else { syncedControlPlanes++ } } ready := false switch mode { case "quorum": if localSynced && syncedControlPlanes >= requiredQuorum { ready = true } default: if localSynced && len(unsynced) == 0 { ready = true } } if ready { return nil } if time.Now().After(deadline) { if mode == "quorum" { return fmt.Errorf( "startup blocked: time sync quorum not ready within %s (mode=quorum local_synced=%t synced_control_planes=%d required=%d checked=%d details=%s)", wait, localSynced, syncedControlPlanes, requiredQuorum, checkedControlPlanes, strings.Join(unsynced, ", "), ) } return fmt.Errorf("startup blocked: time sync not ready within %s (%s)", wait, strings.Join(unsynced, ", ")) } select { case <-ctx.Done(): return ctx.Err() case <-time.After(poll): } } } func isTimeSynced(raw string) bool { v := strings.ToLower(strings.TrimSpace(raw)) return v == "yes" || v == "true" || v == "1" } func (o *Orchestrator) preflightExternalDatastore(ctx context.Context) error { if len(o.cfg.ControlPlanes) == 0 { return nil } controlPlane := strings.TrimSpace(o.cfg.ControlPlanes[0]) if controlPlane == "" || !o.sshManaged(controlPlane) { return nil } unitOut, err := o.ssh(ctx, controlPlane, "sudo systemctl cat k3s") if err != nil { 
		// Inability to inspect the unit downgrades the preflight to a warning.
		o.log.Printf("warning: external datastore preflight skipped: unable to inspect %s k3s unit: %v", controlPlane, err)
		return nil
	}
	datastoreEndpoint := parseDatastoreEndpoint(unitOut)
	if datastoreEndpoint == "" {
		// No external datastore configured (embedded etcd): nothing to check.
		return nil
	}
	u, err := neturl.Parse(datastoreEndpoint)
	if err != nil || u.Host == "" {
		o.log.Printf("warning: external datastore preflight skipped: unable to parse datastore endpoint %q", datastoreEndpoint)
		return nil
	}
	host := strings.TrimSpace(u.Hostname())
	port := strings.TrimSpace(u.Port())
	if port == "" {
		// Default to the PostgreSQL port when the endpoint omits one.
		port = "5432"
	}
	address := net.JoinHostPort(host, port)
	if o.tcpReachable(address, 3*time.Second) {
		return nil
	}
	// Unreachable: try restarting the datastore service on its node, then
	// poll for up to 90s before declaring startup blocked.
	o.log.Printf("warning: datastore endpoint %s is unreachable; attempting software recovery", address)
	if node := o.nodeNameForHost(host); node != "" && o.sshManaged(node) {
		o.bestEffort("restart datastore service on "+node, func() error {
			_, err := o.ssh(ctx, node, "sudo systemctl restart postgresql || sudo systemctl restart postgresql@16-main || sudo systemctl restart postgres")
			return err
		})
	}
	deadline := time.Now().Add(90 * time.Second)
	for time.Now().Before(deadline) {
		if o.tcpReachable(address, 3*time.Second) {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(3 * time.Second):
		}
	}
	return fmt.Errorf("startup blocked: external datastore endpoint %s remained unreachable after recovery attempt", address)
}

// parseDatastoreEndpoint extracts the --datastore-endpoint value from a
// systemd unit dump. It first tries the precompiled regex (handles =,
// whitespace, and quoted forms), then falls back to a line scan. Returns ""
// when no endpoint is configured.
func parseDatastoreEndpoint(unitText string) string {
	if match := datastoreEndpointPattern.FindStringSubmatch(unitText); len(match) == 4 {
		// Exactly one of the three capture groups is populated.
		for _, candidate := range match[1:] {
			value := strings.TrimSpace(candidate)
			if value != "" {
				return value
			}
		}
	}
	for _, raw := range strings.Split(unitText, "\n") {
		line := strings.TrimSpace(raw)
		idx := strings.Index(line, "--datastore-endpoint")
		if idx < 0 {
			continue
		}
		value := strings.TrimSpace(line[idx+len("--datastore-endpoint"):])
		value = strings.TrimSpace(strings.TrimPrefix(value, "="))
		// Strip a trailing line-continuation backslash and surrounding quotes.
		value = strings.TrimSuffix(strings.TrimSpace(value), "\\")
		value = strings.Trim(value, `"'`)
		if value != "" {
			return value
		}
	}
	return ""
}

// nodeNameForHost maps an SSH host/address back to its node name using the
// ssh_node_hosts table (exact node-name key first, then reverse lookup).
// Returns "" when no mapping exists.
func (o *Orchestrator) nodeNameForHost(host string) string {
	host = strings.TrimSpace(host)
	if host == "" {
		return ""
	}
	if _, ok := o.cfg.SSHNodeHosts[host]; ok {
		return host
	}
	for node, mapped := range o.cfg.SSHNodeHosts {
		if strings.TrimSpace(mapped) == host {
			return strings.TrimSpace(node)
		}
	}
	return ""
}

// inventoryNodesForValidation returns the sorted union of every node name
// referenced anywhere in the configuration.
func (o *Orchestrator) inventoryNodesForValidation() []string {
	set := map[string]struct{}{}
	add := func(node string) {
		node = strings.TrimSpace(node)
		if node == "" {
			return
		}
		set[node] = struct{}{}
	}
	for _, n := range o.cfg.ControlPlanes {
		add(n)
	}
	for _, n := range o.cfg.Workers {
		add(n)
	}
	for _, n := range o.cfg.SSHManagedNodes {
		add(n)
	}
	for _, n := range o.cfg.Shutdown.ExtraPoweroffHosts {
		add(n)
	}
	for _, n := range o.cfg.Coordination.PeerHosts {
		add(n)
	}
	add(o.cfg.Coordination.ForwardShutdownHost)
	nodes := make([]string, 0, len(set))
	for n := range set {
		nodes = append(nodes, n)
	}
	sort.Strings(nodes)
	return nodes
}

// validateNodeInventory cross-checks ssh_port, control-plane/worker
// membership in ssh_managed_nodes, and per-node host/user entries, returning
// a single aggregated error (up to 10 issues).
func (o *Orchestrator) validateNodeInventory() error {
	issues := []string{}
	if o.cfg.SSHPort <= 0 || o.cfg.SSHPort > 65535 {
		issues = append(issues, fmt.Sprintf("ssh_port=%d is invalid", o.cfg.SSHPort))
	}
	managed := makeStringSet(o.cfg.SSHManagedNodes)
	for _, cp := range o.cfg.ControlPlanes {
		cp = strings.TrimSpace(cp)
		if cp == "" {
			continue
		}
		if _, ok := managed[cp]; !ok {
			issues = append(issues, fmt.Sprintf("control plane %s is missing from ssh_managed_nodes", cp))
		}
	}
	for _, node := range o.cfg.Workers {
		node = strings.TrimSpace(node)
		if node == "" {
			continue
		}
		if _, ok := managed[node]; !ok {
			issues = append(issues, fmt.Sprintf("worker %s is missing from ssh_managed_nodes", node))
		}
	}
	baseUser := strings.TrimSpace(o.cfg.SSHUser)
	for _, node := range o.inventoryNodesForValidation() {
		if _, ok := o.cfg.SSHNodeHosts[node]; !ok {
			issues = append(issues, fmt.Sprintf("%s is missing ssh_node_hosts entry", node))
		}
		host := strings.TrimSpace(o.cfg.SSHNodeHosts[node])
		if host == "" {
			// Fall back to the node name as its host for validation.
			host = node
		}
		if strings.ContainsAny(host, " \t\r\n") {
			issues = append(issues, fmt.Sprintf("%s has invalid ssh host %q (contains whitespace)", node, host))
		}
		if strings.Contains(host, "/") {
			issues = append(issues, fmt.Sprintf("%s has invalid ssh host %q (contains slash)", node, host))
		}
		// Per-node user override takes precedence over the base ssh_user.
		user := baseUser
		if override, ok := o.cfg.SSHNodeUsers[node]; ok {
			user = strings.TrimSpace(override)
		}
		if user == "" {
			issues = append(issues, fmt.Sprintf("%s has empty ssh user (ssh_user/ssh_node_users)", node))
		}
		if strings.ContainsAny(user, " \t\r\n@") {
			issues = append(issues, fmt.Sprintf("%s has invalid ssh user %q", node, user))
		}
	}
	if len(issues) > 0 {
		sort.Strings(issues)
		return fmt.Errorf("node inventory preflight failed: %s", joinLimited(issues, 10))
	}
	return nil
}

// tcpReachable reports whether a TCP connection to address succeeds within
// timeout. The probe connection is closed immediately.
func (o *Orchestrator) tcpReachable(address string, timeout time.Duration) bool {
	conn, err := net.DialTimeout("tcp", address, timeout)
	if err != nil {
		return false
	}
	_ = conn.Close()
	return true
}

// reconcileNodeAccess verifies passwordless sudo access to systemctl on
// every managed node concurrently (bounded by cfg.Shutdown.SSHParallelism,
// default 8), aggregating failures into one error (up to 4 samples).
func (o *Orchestrator) reconcileNodeAccess(ctx context.Context, nodes []string) error {
	if len(nodes) == 0 {
		return nil
	}
	parallelism := o.cfg.Shutdown.SSHParallelism
	if parallelism <= 0 {
		parallelism = 8
	}
	if parallelism > len(nodes) {
		parallelism = len(nodes)
	}
	sem := make(chan struct{}, parallelism)
	var wg sync.WaitGroup
	errCh := make(chan error, len(nodes))
	for _, node := range nodes {
		node := strings.TrimSpace(node)
		if node == "" || !o.sshManaged(node) {
			continue
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}
			defer func() { <-sem }()
			// `systemctl --version` is a harmless probe for sudo -n access.
			if _, err := o.ssh(ctx, node, "sudo -n /usr/bin/systemctl --version"); err != nil {
				errCh <- fmt.Errorf("%s: missing sudo access to /usr/bin/systemctl (--version): %w", node, err)
			}
		}()
	}
	wg.Wait()
	close(errCh)
	if len(errCh) == 0 {
		return nil
	}
	samples := []string{}
	for err := range errCh {
		samples = append(samples, err.Error())
		if len(samples) >= 4 {
			break
		}
	}
	return fmt.Errorf("access validation had %d errors (first: %s)", len(errCh), strings.Join(samples, " | "))
}

// waitForNodeSSHAuth gates startup until every target node accepts our SSH
// key. Unreachable nodes are retried until the deadline; authentication
// denials fail immediately (retrying would not help). Nodes listed in
// ignore_unavailable_nodes are skipped. No-op in dry-run or when the gate is
// disabled via cfg.Startup.RequireNodeSSHAuth.
func (o *Orchestrator) waitForNodeSSHAuth(ctx context.Context, nodes []string) error {
	if o.runner.DryRun || !o.cfg.Startup.RequireNodeSSHAuth {
		return nil
	}
	wait := time.Duration(o.cfg.Startup.NodeSSHAuthWaitSeconds) * time.Second
	if wait <= 0 {
		wait = 4 * time.Minute
	}
	poll := time.Duration(o.cfg.Startup.NodeSSHAuthPollSeconds) * time.Second
	if poll <= 0 {
		poll = 5 * time.Second
	}
	ignored := makeStringSet(o.cfg.Startup.IgnoreUnavailableNodes)
	seen := map[string]struct{}{}
	targets := make([]string, 0, len(nodes))
	for _, node := range nodes {
		node = strings.TrimSpace(node)
		if node == "" {
			continue
		}
		if _, skip := ignored[node]; skip {
			continue
		}
		if _, ok := seen[node]; ok {
			continue
		}
		seen[node] = struct{}{}
		targets = append(targets, node)
	}
	sort.Strings(targets)
	if len(targets) == 0 {
		return nil
	}
	deadline := time.Now().Add(wait)
	lastFailure := "unknown"
	lastLogged := time.Time{}
	for {
		authDenied := []string{}
		pending := []string{}
		for _, node := range targets {
			if !o.sshManaged(node) {
				authDenied = append(authDenied, fmt.Sprintf("%s(not in ssh_managed_nodes)", node))
				continue
			}
			out, err := o.sshWithTimeout(ctx, node, "echo __ANANKE_SSH_AUTH_OK__", 12*time.Second)
			if err != nil {
				// Classify by error text: auth problems are terminal, anything
				// else is treated as "not up yet" and retried.
				detail := strings.TrimSpace(err.Error())
				full := strings.ToLower(strings.TrimSpace(detail + " " + out))
				switch {
				case strings.Contains(full, "permission denied"), strings.Contains(full, "publickey"), strings.Contains(full, "authentication"):
					authDenied = append(authDenied, fmt.Sprintf("%s(auth denied)", node))
				default:
					pending = append(pending, fmt.Sprintf("%s(unreachable)", node))
				}
				continue
			}
			if !strings.Contains(out, "__ANANKE_SSH_AUTH_OK__") {
				pending = append(pending, fmt.Sprintf("%s(unexpected output)", node))
			}
		}
		if len(authDenied) > 0 {
			sort.Strings(authDenied)
			return fmt.Errorf("ssh auth gate failed: %s", joinLimited(authDenied, 8))
		}
		if len(pending) == 0 {
			return nil
		}
		sort.Strings(pending)
		lastFailure = joinLimited(pending, 8)
		if time.Now().After(deadline) {
			return fmt.Errorf("node ssh auth gate did not pass within %s (%s)", wait, lastFailure)
		}
		// Progress log at most every 30s to avoid flooding.
		if time.Since(lastLogged) >= 30*time.Second {
			remaining := time.Until(deadline).Round(time.Second)
			if remaining < 0 {
				remaining = 0
			}
			o.log.Printf("waiting for node ssh auth (%s remaining): %s", remaining, lastFailure)
			lastLogged = time.Now()
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(poll):
		}
	}
}

// fluxSourceReady reports whether the flux-system GitRepository has a Ready
// condition with status True.
func (o *Orchestrator) fluxSourceReady(ctx context.Context) (bool, error) {
	out, err := o.kubectl(ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.status.conditions[?(@.type==\"Ready\")].status}")
	if err != nil {
		return false, err
	}
	return strings.Contains(out, "True"), nil
}

// reportFluxSource logs the flux GitRepository URL and branch, warning (but
// not failing) when either drifts from the configured expectation. Branch
// drift is not warned about when a forced branch override is in effect.
func (o *Orchestrator) reportFluxSource(ctx context.Context, forceBranch string) {
	urlOut, urlErr := o.kubectl(ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.spec.url}")
	if urlErr == nil {
		currentURL := strings.TrimSpace(urlOut)
		o.log.Printf("flux-source-url=%s", currentURL)
		expectedURL := strings.TrimSpace(o.cfg.ExpectedFluxSource)
		if expectedURL != "" && normalizeGitURL(currentURL) != normalizeGitURL(expectedURL) {
			o.log.Printf("warning: flux source URL is %q, expected %q", currentURL, expectedURL)
		}
	}
	branchOut, branchErr := o.kubectl(ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.spec.ref.branch}")
	if branchErr == nil {
		branch := strings.TrimSpace(branchOut)
		o.log.Printf("flux-source-branch=%s", branch)
		if forceBranch == "" && branch != o.cfg.ExpectedFluxBranch {
			o.log.Printf("warning: flux source branch is '%s', expected '%s'", branch, o.cfg.ExpectedFluxBranch)
		}
	}
}

// guardFluxSourceDrift blocks startup when the flux GitRepository URL or
// branch drifts from configuration (branch drift is tolerated when
// allowBranchPatch is set, since it will be patched later).
func (o *Orchestrator) guardFluxSourceDrift(ctx context.Context, expectedBranch string, allowBranchPatch bool) error {
	urlOut, err := o.kubectl(
		ctx,
		10*time.Second,
		"-n", "flux-system",
		"get", "gitrepository", "flux-system",
		"-o", "jsonpath={.spec.url}",
	)
	if err != nil {
		if isNotFoundErr(err) {
o.log.Printf("warning: flux gitrepository/flux-system not found while checking source drift") return nil } return fmt.Errorf("read flux source url: %w", err) } currentURL := strings.TrimSpace(urlOut) expectedURL := strings.TrimSpace(o.cfg.ExpectedFluxSource) if expectedURL != "" && normalizeGitURL(currentURL) != normalizeGitURL(expectedURL) { return fmt.Errorf("startup blocked: flux source url drift detected (current=%q expected=%q)", currentURL, expectedURL) } branchOut, err := o.kubectl( ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.spec.ref.branch}", ) if err != nil { return fmt.Errorf("read flux source branch: %w", err) } currentBranch := strings.TrimSpace(branchOut) if expectedBranch == "" || currentBranch == expectedBranch || allowBranchPatch { return nil } return fmt.Errorf("startup blocked: flux source branch drift detected (current=%q expected=%q)", currentBranch, expectedBranch) } func (o *Orchestrator) ensureFluxBranch(ctx context.Context, branch string, allowPatch bool) error { branch = strings.TrimSpace(branch) if branch == "" { return nil } out, err := o.kubectl( ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.spec.ref.branch}", ) if err != nil { if isNotFoundErr(err) { o.log.Printf("warning: flux gitrepository/flux-system not found while ensuring branch=%s", branch) return nil } return fmt.Errorf("read flux source branch: %w", err) } current := strings.TrimSpace(out) if current == branch { return nil } if !allowPatch { return fmt.Errorf("startup blocked: flux source branch is %q but expected %q (use --force-flux-branch to patch intentionally)", current, branch) } patch := fmt.Sprintf(`{"spec":{"ref":{"branch":"%s"}}}`, branch) if _, err := o.kubectl( ctx, 20*time.Second, "-n", "flux-system", "patch", "gitrepository", "flux-system", "--type=merge", "-p", patch, ); err != nil { return fmt.Errorf("set flux source branch %q (current %q): %w", branch, 
			current, err)
	}
	o.log.Printf("updated flux source branch from %q to %q", current, branch)
	return nil
}

// normalizeGitURL lowercases and strips a trailing "/" then ".git" so URL
// comparisons tolerate cosmetic differences.
func normalizeGitURL(raw string) string {
	raw = strings.TrimSpace(strings.ToLower(raw))
	raw = strings.TrimSuffix(raw, "/")
	raw = strings.TrimSuffix(raw, ".git")
	return raw
}

// bootstrapLocal applies each configured local bootstrap kustomization with
// a three-level fallback per path: kubectl apply -k, then a rendered apply
// with LoadRestrictionsNone, then a previously cached rendered manifest.
// Fails only when every configured path failed all three stages.
func (o *Orchestrator) bootstrapLocal(ctx context.Context) error {
	failures := 0
	successes := 0
	for _, rel := range o.cfg.LocalBootstrapPaths {
		full := filepath.Join(o.cfg.IACRepoPath, rel)
		o.log.Printf("local bootstrap apply rel=%s path=%s", rel, full)
		if o.runner.DryRun {
			successes++
			continue
		}
		if _, err := o.kubectl(ctx, 2*time.Minute, "apply", "-k", full); err != nil {
			o.log.Printf("warning: local bootstrap apply failed at %s: %v", full, err)
			o.log.Printf("local bootstrap fallback render/apply with LoadRestrictionsNone for %s", full)
			if fallbackErr := o.applyKustomizeFallback(ctx, full); fallbackErr != nil {
				o.log.Printf("warning: local bootstrap fallback failed at %s: %v", full, fallbackErr)
				o.log.Printf("local bootstrap cache apply for rel=%s", rel)
				if cacheErr := o.applyBootstrapCache(ctx, rel); cacheErr != nil {
					failures++
					o.log.Printf("warning: local bootstrap cache apply failed for rel=%s: %v", rel, cacheErr)
					continue
				}
			}
		}
		// Reached when any of the three stages succeeded for this path.
		successes++
	}
	if failures > 0 && successes == 0 {
		return fmt.Errorf("local bootstrap apply failed for every configured path (%d total)", failures)
	}
	return nil
}

// applyKustomizeFallback renders the kustomization with load restrictions
// disabled and pipes the result into kubectl apply.
func (o *Orchestrator) applyKustomizeFallback(ctx context.Context, full string) error {
	cmd := fmt.Sprintf("kubectl kustomize --load-restrictor=LoadRestrictionsNone %q | kubectl apply -f -", full)
	if _, err := o.runSensitive(ctx, 3*time.Minute, "sh", "-lc", cmd); err != nil {
		return err
	}
	return nil
}

// syncLocalIACRepo fast-forwards the local IaC checkout to
// origin/<expected branch> (default "main"). A dirty working tree is left
// untouched (warn and return nil) to avoid destroying local edits.
func (o *Orchestrator) syncLocalIACRepo(ctx context.Context) error {
	repo := strings.TrimSpace(o.cfg.IACRepoPath)
	if repo == "" {
		return fmt.Errorf("iac repo path is empty")
	}
	gitDir := filepath.Join(repo, ".git")
	if stat, err := os.Stat(gitDir); err != nil || stat.IsDir() == false {
		return fmt.Errorf("iac repo %s is not a git checkout", repo)
	}
	statusOut, statusErr := o.runSensitive(ctx, 10*time.Second, "git", "-C", repo, "status", "--porcelain")
	if statusErr != nil {
		return fmt.Errorf("inspect iac repo working tree: %w", statusErr)
	}
	if strings.TrimSpace(statusOut) != "" {
		o.log.Printf("warning: skipping local titan-iac sync because working tree is dirty")
		return nil
	}
	branch := strings.TrimSpace(o.cfg.ExpectedFluxBranch)
	if branch == "" {
		branch = "main"
	}
	if _, err := o.runSensitive(ctx, 45*time.Second, "git", "-C", repo, "fetch", "origin", "--prune"); err != nil {
		return fmt.Errorf("git fetch origin: %w", err)
	}
	if _, err := o.runSensitive(ctx, 20*time.Second, "git", "-C", repo, "checkout", branch); err != nil {
		return fmt.Errorf("git checkout %s: %w", branch, err)
	}
	if _, err := o.runSensitive(ctx, 20*time.Second, "git", "-C", repo, "reset", "--hard", "origin/"+branch); err != nil {
		return fmt.Errorf("git reset --hard origin/%s: %w", branch, err)
	}
	return nil
}

// refreshBootstrapCache re-renders every configured bootstrap kustomization
// into the on-disk cache used as the last-resort apply source. Individual
// render/write failures are warnings; it errors only when nothing rendered.
func (o *Orchestrator) refreshBootstrapCache(ctx context.Context) error {
	if len(o.cfg.LocalBootstrapPaths) == 0 {
		return nil
	}
	if err := os.MkdirAll(o.bootstrapCacheDir(), 0o755); err != nil {
		return fmt.Errorf("ensure bootstrap cache dir: %w", err)
	}
	rendered := 0
	for _, rel := range o.cfg.LocalBootstrapPaths {
		rel = strings.TrimSpace(rel)
		if rel == "" {
			continue
		}
		full := filepath.Join(o.cfg.IACRepoPath, rel)
		if stat, err := os.Stat(full); err != nil || !stat.IsDir() {
			o.log.Printf("warning: skip bootstrap cache render for rel=%s (path missing)", rel)
			continue
		}
		cmd := fmt.Sprintf("kubectl kustomize --load-restrictor=LoadRestrictionsNone %q", full)
		manifest, err := o.runSensitive(ctx, 2*time.Minute, "sh", "-lc", cmd)
		if err != nil {
			o.log.Printf("warning: bootstrap cache render failed for rel=%s: %v", rel, err)
			continue
		}
		cachePath := o.bootstrapCachePath(rel)
		if err := os.WriteFile(cachePath, []byte(manifest+"\n"), 0o644); err != nil {
			o.log.Printf("warning: bootstrap cache write failed for rel=%s path=%s: %v", rel, cachePath, err)
			continue
		}
		rendered++
	}
	if rendered == 0 {
		return fmt.Errorf("no bootstrap cache manifests rendered")
	}
	o.log.Printf("bootstrap cache refreshed (%d paths)", rendered)
	return nil
}

// applyBootstrapCache applies the cached rendered manifest for rel.
func (o *Orchestrator) applyBootstrapCache(ctx context.Context, rel string) error {
	cachePath := o.bootstrapCachePath(rel)
	if _, err := os.Stat(cachePath); err != nil {
		return fmt.Errorf("bootstrap cache missing at %s: %w", cachePath, err)
	}
	if _, err := o.kubectl(ctx, 2*time.Minute, "apply", "-f", cachePath); err != nil {
		return err
	}
	return nil
}

// bootstrapCacheDir returns the directory holding cached bootstrap manifests.
func (o *Orchestrator) bootstrapCacheDir() string {
	return filepath.Join(o.cfg.State.Dir, "bootstrap-cache")
}

// bootstrapCachePath maps a bootstrap rel path to a flat cache file name by
// replacing path separators with "__".
func (o *Orchestrator) bootstrapCachePath(rel string) string {
	safe := strings.TrimSpace(rel)
	safe = strings.ReplaceAll(safe, "/", "__")
	safe = strings.ReplaceAll(safe, string(os.PathSeparator), "__")
	return filepath.Join(o.bootstrapCacheDir(), safe+".yaml")
}

// waitForFluxSourceReady polls fluxSourceReady for up to window. Returns
// (false, nil) on timeout — not an error — so callers can decide how to
// react. Always (true, nil) in dry-run mode.
func (o *Orchestrator) waitForFluxSourceReady(ctx context.Context, window time.Duration) (bool, error) {
	if o.runner.DryRun {
		return true, nil
	}
	deadline := time.Now().Add(window)
	for {
		ready, err := o.fluxSourceReady(ctx)
		if err != nil {
			return false, err
		}
		if ready {
			return true, nil
		}
		if time.Now().After(deadline) {
			return false, nil
		}
		select {
		case <-ctx.Done():
			return false, ctx.Err()
		case <-time.After(5 * time.Second):
		}
	}
}

// waitForStorageReady polls storageReady until it passes or the configured
// deadline (default 420s, poll 5s) elapses. No-op in dry-run mode.
func (o *Orchestrator) waitForStorageReady(ctx context.Context) error {
	if o.runner.DryRun {
		return nil
	}
	wait := time.Duration(o.cfg.Startup.StorageReadyWaitSeconds) * time.Second
	if wait <= 0 {
		wait = 420 * time.Second
	}
	poll := time.Duration(o.cfg.Startup.StorageReadyPollSeconds) * time.Second
	if poll <= 0 {
		poll = 5 * time.Second
	}
	deadline := time.Now().Add(wait)
	lastReason := "unknown"
	for {
		ok, reason, err := o.storageReady(ctx)
		if err != nil {
			lastReason = err.Error()
		} else {
			lastReason = reason
		}
		if ok {
			o.log.Printf("storage readiness check passed (%s)", reason)
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("startup blocked: storage readiness not satisfied within %s (%s)", wait, lastReason)
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(poll):
		}
	}
}

// storageReady checks that enough Longhorn nodes are Ready+Schedulable
// (default minimum 2) and that every configured critical PVC is Bound.
// Returns (ok, human-readable reason, error).
func (o *Orchestrator) storageReady(ctx context.Context) (bool, string, error) {
	minReady := o.cfg.Startup.StorageMinReadyNodes
	if minReady <= 0 {
		minReady = 2
	}
	longhornOut, err := o.kubectl(
		ctx,
		15*time.Second,
		"-n", "longhorn-system",
		"get", "nodes.longhorn.io",
		"-o", `jsonpath={range .items[*]}{.metadata.name}{":"}{.status.conditions[?(@.type=="Ready")].status}{":"}{.status.conditions[?(@.type=="Schedulable")].status}{"\n"}{end}`,
	)
	if err != nil {
		return false, "", fmt.Errorf("query longhorn nodes: %w", err)
	}
	readyNodes := 0
	for _, line := range lines(longhornOut) {
		// Each line is "<name>:<ready>:<schedulable>".
		parts := strings.Split(line, ":")
		if len(parts) < 3 {
			continue
		}
		ready := strings.EqualFold(strings.TrimSpace(parts[1]), "true")
		sched := strings.EqualFold(strings.TrimSpace(parts[2]), "true")
		if ready && sched {
			readyNodes++
		}
	}
	if readyNodes < minReady {
		return false, fmt.Sprintf("longhorn ready+sched nodes %d/%d", readyNodes, minReady), nil
	}
	for _, item := range o.cfg.Startup.StorageCriticalPVCs {
		item = strings.TrimSpace(item)
		if item == "" {
			continue
		}
		// Entries are "<namespace>/<name>".
		parts := strings.SplitN(item, "/", 2)
		if len(parts) != 2 {
			return false, "", fmt.Errorf("invalid storage_critical_pvcs entry %q", item)
		}
		ns := strings.TrimSpace(parts[0])
		name := strings.TrimSpace(parts[1])
		out, pvcErr := o.kubectl(ctx, 15*time.Second, "-n", ns, "get", "pvc", name, "-o", "jsonpath={.status.phase}")
		if pvcErr != nil {
			if isNotFoundErr(pvcErr) {
				return false, fmt.Sprintf("pvc %s/%s not found", ns, name), nil
			}
			return false, "", fmt.Errorf("query pvc %s/%s: %w", ns, name, pvcErr)
		}
		if !strings.EqualFold(strings.TrimSpace(out), "Bound") {
			return false, fmt.Sprintf("pvc %s/%s phase=%s", ns, name, strings.TrimSpace(out)), nil
		}
	}
	return true, fmt.Sprintf("longhorn ready+sched nodes=%d critical pvcs bound=%d", readyNodes,
len(o.cfg.Startup.StorageCriticalPVCs)), nil
}

// The types below are minimal decoding targets for `kubectl ... -o json`
// output; only the fields this package reads are declared.

// fluxCondition is one status condition on a Flux resource.
type fluxCondition struct {
	Type    string `json:"type"`
	Status  string `json:"status"`
	Reason  string `json:"reason"`
	Message string `json:"message"`
}

// fluxKustomizationList decodes a list of Flux Kustomizations.
type fluxKustomizationList struct {
	Items []fluxKustomization `json:"items"`
}

// fluxKustomization carries the subset of a Kustomization used for the
// health and adaptive-timeout checks: suspend flag, timeout, conditions.
type fluxKustomization struct {
	Metadata struct {
		Namespace string `json:"namespace"`
		Name      string `json:"name"`
	} `json:"metadata"`
	Spec struct {
		Suspend bool   `json:"suspend"`
		Timeout string `json:"timeout"`
	} `json:"spec"`
	Status struct {
		Conditions []fluxCondition `json:"conditions"`
	} `json:"status"`
}

// workloadList decodes deployments/statefulsets/daemonsets.
type workloadList struct {
	Items []workloadResource `json:"items"`
}

// ingressList decodes ingresses.
type ingressList struct {
	Items []ingressResource `json:"items"`
}

// ingressResource carries namespace, name, and the rule hosts.
type ingressResource struct {
	Metadata struct {
		Namespace string `json:"namespace"`
		Name      string `json:"name"`
	} `json:"metadata"`
	Spec struct {
		Rules []struct {
			Host string `json:"host"`
		} `json:"rules"`
	} `json:"spec"`
}

// jobList decodes batch Jobs.
type jobList struct {
	Items []jobResource `json:"items"`
}

// jobResource carries the labels, owners, and status used to detect stale
// failed flux-managed jobs.
type jobResource struct {
	Metadata struct {
		Namespace       string            `json:"namespace"`
		Name            string            `json:"name"`
		Labels          map[string]string `json:"labels"`
		OwnerReferences []ownerReference  `json:"ownerReferences"`
	} `json:"metadata"`
	Status struct {
		Failed     int32             `json:"failed"`
		Succeeded  int32             `json:"succeeded"`
		Conditions []jobConditionRef `json:"conditions"`
	} `json:"status"`
}

// jobConditionRef is one Job status condition.
type jobConditionRef struct {
	Type   string `json:"type"`
	Status string `json:"status"`
}

// workloadResource is a generic controller (deployment/statefulset/daemonset);
// the status fields cover both replica-based and daemonset-based readiness.
type workloadResource struct {
	Kind     string `json:"kind"`
	Metadata struct {
		Namespace string `json:"namespace"`
		Name      string `json:"name"`
	} `json:"metadata"`
	Spec struct {
		Replicas *int32 `json:"replicas"`
		Template struct {
			Spec podSpec `json:"spec"`
		} `json:"template"`
	} `json:"spec"`
	Status struct {
		ReadyReplicas          int32 `json:"readyReplicas"`
		DesiredNumberScheduled int32 `json:"desiredNumberScheduled"`
		NumberReady            int32 `json:"numberReady"`
	} `json:"status"`
}

// podList decodes pods.
type podList struct {
	Items []podResource `json:"items"`
}

// podResource carries the metadata, placement, and container status used by
// the stuck-pod recycler. Spec embeds podSpec alongside NodeName.
type podResource struct {
	Metadata struct {
		Namespace         string            `json:"namespace"`
		Name              string            `json:"name"`
		Annotations       map[string]string `json:"annotations"`
		CreationTimestamp time.Time         `json:"creationTimestamp"`
		OwnerReferences   []ownerReference  `json:"ownerReferences"`
	} `json:"metadata"`
	Spec struct {
		NodeName string `json:"nodeName"`
		podSpec
	} `json:"spec"`
	Status struct {
		Phase                 string               `json:"phase"`
		InitContainerStatuses []podContainerStatus `json:"initContainerStatuses"`
		ContainerStatuses     []podContainerStatus `json:"containerStatuses"`
	} `json:"status"`
}

// ownerReference holds only the owner kind (used to classify controllers).
type ownerReference struct {
	Kind string `json:"kind"`
}

// podContainerStatus is one container's name and state.
type podContainerStatus struct {
	Name  string            `json:"name"`
	State podContainerState `json:"state"`
}

// podContainerState distinguishes waiting vs running containers.
type podContainerState struct {
	Waiting *podContainerWaitingState `json:"waiting"`
	Running *podContainerRunningState `json:"running"`
}

// podContainerWaitingState holds the waiting reason (e.g. CrashLoopBackOff).
type podContainerWaitingState struct {
	Reason string `json:"reason"`
}

// podContainerRunningState holds the container start time.
type podContainerRunningState struct {
	StartedAt time.Time `json:"startedAt"`
}

// podSpec carries the scheduling constraints inspected when deciding whether
// a workload targets an ignored node.
type podSpec struct {
	NodeSelector map[string]string `json:"nodeSelector"`
	Affinity     *podAffinity      `json:"affinity"`
}

// podAffinity wraps node affinity.
type podAffinity struct {
	NodeAffinity *nodeAffinity `json:"nodeAffinity"`
}

// nodeAffinity carries only the required (hard) scheduling term.
type nodeAffinity struct {
	RequiredDuringSchedulingIgnoredDuringExecution *nodeSelector `json:"requiredDuringSchedulingIgnoredDuringExecution"`
}

// nodeSelector is a list of selector terms.
type nodeSelector struct {
	NodeSelectorTerms []nodeSelectorTerm `json:"nodeSelectorTerms"`
}

// nodeSelectorTerm is a list of match expressions.
type nodeSelectorTerm struct {
	MatchExpressions []nodeSelectorRequirement `json:"matchExpressions"`
}

// nodeSelectorRequirement is one key/operator/values match expression.
type nodeSelectorRequirement struct {
	Key      string   `json:"key"`
	Operator string   `json:"operator"`
	Values   []string `json:"values"`
}

// workloadIgnoreRule is a parsed ignore entry (namespace/kind/name).
type workloadIgnoreRule struct {
	Namespace string
	Kind      string
	Name      string
}

// ensureRequiredNodeLabels applies the configured per-node labels with
// `kubectl label --overwrite`, in deterministic (sorted) node and key order.
// No-op in dry-run mode or when no labels are configured.
func (o *Orchestrator) ensureRequiredNodeLabels(ctx context.Context) error {
	if o.runner.DryRun || len(o.cfg.Startup.RequiredNodeLabels) == 0 {
		return nil
	}
	nodes := make([]string, 0, len(o.cfg.Startup.RequiredNodeLabels))
	for node := range
o.cfg.Startup.RequiredNodeLabels {
		node = strings.TrimSpace(node)
		if node != "" {
			nodes = append(nodes, node)
		}
	}
	sort.Strings(nodes)
	for _, node := range nodes {
		labels := o.cfg.Startup.RequiredNodeLabels[node]
		if len(labels) == 0 {
			continue
		}
		keys := make([]string, 0, len(labels))
		for key := range labels {
			key = strings.TrimSpace(key)
			if key != "" {
				keys = append(keys, key)
			}
		}
		sort.Strings(keys)
		args := []string{"label", "node", node, "--overwrite"}
		pairs := make([]string, 0, len(keys))
		for _, key := range keys {
			value := strings.TrimSpace(labels[key])
			// Empty values are skipped rather than applied as "key=".
			if value == "" {
				continue
			}
			pair := fmt.Sprintf("%s=%s", key, value)
			args = append(args, pair)
			pairs = append(pairs, pair)
		}
		if len(pairs) == 0 {
			continue
		}
		if _, err := o.kubectl(ctx, 25*time.Second, args...); err != nil {
			return fmt.Errorf("ensure required node labels on %s (%s): %w", node, strings.Join(pairs, ", "), err)
		}
		o.log.Printf("ensured required labels on node %s: %s", node, strings.Join(pairs, ", "))
	}
	return nil
}

// waitForStartupConvergence runs the configured startup gates in order
// (ingress checklist, service checklist, flux health, workload convergence)
// and then the stability soak window, recording a startup-report check for
// each gate. The first failing gate aborts. No-op in dry-run mode.
func (o *Orchestrator) waitForStartupConvergence(ctx context.Context) error {
	if o.runner.DryRun {
		return nil
	}
	if o.cfg.Startup.RequireIngressChecklist {
		if err := o.waitForIngressChecklist(ctx); err != nil {
			o.noteStartupCheck("ingress-checklist", false, err.Error())
			return err
		}
		o.noteStartupCheck("ingress-checklist", true, "all ingress hosts reachable")
	}
	if o.cfg.Startup.RequireServiceChecklist {
		if err := o.waitForServiceChecklist(ctx); err != nil {
			o.noteStartupCheck("service-checklist", false, err.Error())
			return err
		}
		o.noteStartupCheck("service-checklist", true, "all configured service checks passed")
	}
	if o.cfg.Startup.RequireFluxHealth {
		if err := o.waitForFluxHealth(ctx); err != nil {
			o.noteStartupCheck("flux-health", false, err.Error())
			return err
		}
		o.noteStartupCheck("flux-health", true, "all flux kustomizations ready")
	}
	if o.cfg.Startup.RequireWorkloadConvergence {
		if err := o.waitForWorkloadConvergence(ctx); err != nil {
			o.noteStartupCheck("workload-convergence", false, err.Error())
			return err
		}
		o.noteStartupCheck("workload-convergence", true, "controllers converged")
	}
	if err := o.waitForStabilityWindow(ctx); err != nil {
		o.noteStartupCheck("stability-window", false, err.Error())
		return err
	}
	o.noteStartupCheck("stability-window", true, "startup soak passed")
	return nil
}

// waitForIngressChecklist polls ingressChecklistReady until every discovered
// ingress host answers, running the stuck-pod / replica / ingress-backend
// auto-heals on each iteration. Defaults: 7m wait, 5s poll. Progress is
// logged on detail change or every 30s.
func (o *Orchestrator) waitForIngressChecklist(ctx context.Context) error {
	wait := time.Duration(o.cfg.Startup.IngressChecklistWaitSeconds) * time.Second
	if wait <= 0 {
		wait = 7 * time.Minute
	}
	poll := time.Duration(o.cfg.Startup.IngressChecklistPollSeconds) * time.Second
	if poll <= 0 {
		poll = 5 * time.Second
	}
	deadline := time.Now().Add(wait)
	lastFailure := "unknown"
	lastLogged := time.Time{}
	lastRecycleAttempt := time.Time{}
	lastReplicaHeal := time.Time{}
	lastIngressHeal := time.Time{}
	for {
		o.maybeAutoRecycleStuckPods(ctx, &lastRecycleAttempt)
		o.maybeAutoHealCriticalWorkloadReplicas(ctx, &lastReplicaHeal)
		prevFailure := lastFailure
		ready, detail := o.ingressChecklistReady(ctx)
		lastFailure = detail
		if ready {
			o.log.Printf("ingress checklist passed (%s)", detail)
			return nil
		}
		o.maybeAutoHealIngressHostBackends(ctx, &lastIngressHeal, lastFailure)
		if lastFailure != prevFailure || time.Since(lastLogged) >= 30*time.Second {
			remaining := time.Until(deadline).Round(time.Second)
			if remaining < 0 {
				remaining = 0
			}
			o.log.Printf("waiting for ingress checklist (%s remaining): %s", remaining, lastFailure)
			lastLogged = time.Now()
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("startup blocked: ingress checklist not satisfied within %s (%s)", wait, lastFailure)
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(poll):
		}
	}
}

// ingressChecklistReady probes https://<host>/ for every discovered ingress
// host and reports the first failing host, or success with a host count.
func (o *Orchestrator) ingressChecklistReady(ctx context.Context) (bool, string) {
	hosts, err := o.discoverIngressHosts(ctx)
	if err != nil {
		return false, err.Error()
	}
	if len(hosts) == 0 {
		return true, "no ingress hosts discovered"
	}
	accepted := o.cfg.Startup.IngressChecklistAccepted
	if len(accepted) == 0 {
		// Reachability check only: auth/redirect/not-found all count as "up".
		accepted = []int{200, 301,
302, 307, 308, 401, 403, 404}
	}
	for _, host := range hosts {
		check := config.ServiceChecklistCheck{
			Name:             "ingress-" + host,
			URL:              "https://" + host + "/",
			AcceptedStatuses: accepted,
			TimeoutSeconds:   12,
			InsecureSkipTLS:  o.cfg.Startup.IngressChecklistInsecureSkip,
		}
		ok, detail := o.serviceCheckReady(ctx, check)
		if !ok {
			return false, fmt.Sprintf("%s: %s", host, detail)
		}
	}
	return true, fmt.Sprintf("hosts=%d", len(hosts))
}

// discoverIngressHosts lists all ingress rule hosts cluster-wide, dropping
// wildcard hosts and configured ignore entries, and returns them sorted and
// de-duplicated.
func (o *Orchestrator) discoverIngressHosts(ctx context.Context) ([]string, error) {
	out, err := o.kubectl(ctx, 25*time.Second, "get", "ingress", "-A", "-o", "json")
	if err != nil {
		return nil, fmt.Errorf("query ingresses: %w", err)
	}
	var list ingressList
	if err := json.Unmarshal([]byte(out), &list); err != nil {
		return nil, fmt.Errorf("decode ingresses: %w", err)
	}
	ignored := makeStringSet(o.cfg.Startup.IngressChecklistIgnoreHosts)
	hosts := map[string]struct{}{}
	for _, item := range list.Items {
		for _, rule := range item.Spec.Rules {
			host := strings.TrimSpace(rule.Host)
			if host == "" || strings.Contains(host, "*") {
				continue
			}
			if _, skip := ignored[host]; skip {
				continue
			}
			hosts[host] = struct{}{}
		}
	}
	outHosts := make([]string, 0, len(hosts))
	for host := range hosts {
		outHosts = append(outHosts, host)
	}
	sort.Strings(outHosts)
	return outHosts, nil
}

// discoverIngressNamespacesForHost returns the sorted set of namespaces that
// own an ingress rule for the given host (case-insensitive match).
func (o *Orchestrator) discoverIngressNamespacesForHost(ctx context.Context, host string) ([]string, error) {
	host = strings.ToLower(strings.TrimSpace(host))
	if host == "" {
		return nil, nil
	}
	out, err := o.kubectl(ctx, 25*time.Second, "get", "ingress", "-A", "-o", "json")
	if err != nil {
		return nil, fmt.Errorf("query ingresses: %w", err)
	}
	var list ingressList
	if err := json.Unmarshal([]byte(out), &list); err != nil {
		return nil, fmt.Errorf("decode ingresses: %w", err)
	}
	namespaces := map[string]struct{}{}
	for _, item := range list.Items {
		ns := strings.TrimSpace(item.Metadata.Namespace)
		if ns == "" {
			continue
		}
		for _, rule := range item.Spec.Rules {
			ruleHost := strings.ToLower(strings.TrimSpace(rule.Host))
			if ruleHost == "" {
				continue
			}
			if ruleHost == host {
				namespaces[ns] = struct{}{}
				break
			}
		}
	}
	outNamespaces := make([]string, 0, len(namespaces))
	for ns := range namespaces {
		outNamespaces = append(outNamespaces, ns)
	}
	sort.Strings(outNamespaces)
	return outNamespaces, nil
}

// maybeAutoHealIngressHostBackends tries to restore zero-replica backends for
// the host extracted from a checklist failure message, rate-limited to one
// attempt per 45s via lastAttempt. Skipped in dry-run mode.
func (o *Orchestrator) maybeAutoHealIngressHostBackends(ctx context.Context, lastAttempt *time.Time, failureDetail string) {
	if o.runner.DryRun {
		return
	}
	host := o.checklistFailureHost(failureDetail)
	if host == "" {
		return
	}
	now := time.Now()
	if lastAttempt != nil && !lastAttempt.IsZero() && now.Sub(*lastAttempt) < 45*time.Second {
		return
	}
	if lastAttempt != nil {
		*lastAttempt = now
	}
	healed, err := o.healIngressHostBackendReplicas(ctx, host)
	if err != nil {
		o.log.Printf("warning: ingress host auto-heal failed for %s: %v", host, err)
		return
	}
	if len(healed) == 0 {
		return
	}
	sort.Strings(healed)
	detail := fmt.Sprintf("restored ingress backend replicas for %s: %s", host, joinLimited(healed, 8))
	o.log.Printf("%s", detail)
	o.noteStartupAutoHeal(detail)
}

// checklistFailureHost extracts a lowercase hostname from a failure detail of
// the form "<name-or-host>: <reason>". It tries, in order: the prefix itself
// as a hostname, a configured service-check name mapped to its URL's host,
// and finally parsing the prefix as a URL. Returns "" when nothing matches.
func (o *Orchestrator) checklistFailureHost(failureDetail string) string {
	prefix := strings.TrimSpace(failureDetail)
	if idx := strings.Index(prefix, ":"); idx > 0 {
		prefix = strings.TrimSpace(prefix[:idx])
	}
	if isLikelyHostname(prefix) {
		return strings.ToLower(prefix)
	}
	for _, check := range o.cfg.Startup.ServiceChecklist {
		name := strings.TrimSpace(check.Name)
		if !strings.EqualFold(name, prefix) {
			continue
		}
		host := hostFromURL(check.URL)
		if host != "" {
			return strings.ToLower(host)
		}
	}
	if host := hostFromURL(prefix); host != "" {
		return strings.ToLower(host)
	}
	return ""
}

// hostFromURL returns the hostname component of a URL, or "" when the input
// does not parse.
func hostFromURL(raw string) string {
	parsed, err := neturl.Parse(strings.TrimSpace(raw))
	if err != nil || parsed == nil {
		return ""
	}
	return strings.TrimSpace(parsed.Hostname())
}

// isLikelyHostname is a cheap heuristic: non-empty, no spaces or slashes,
// and contains at least one dot.
func isLikelyHostname(value string) bool {
	value = strings.TrimSpace(value)
	if value == "" {
		return false
	}
	if strings.Contains(value, " ") ||
strings.Contains(value, "/") {
		return false
	}
	return strings.Contains(value, ".")
}

// healIngressHostBackendReplicas scales every deployment/statefulset with
// fewer than 1 desired replica back to 1, in all namespaces that serve the
// given ingress host. Returns the list of workloads it scaled; not-found
// workloads are skipped, other scale errors abort with partial results.
func (o *Orchestrator) healIngressHostBackendReplicas(ctx context.Context, host string) ([]string, error) {
	namespaces, err := o.discoverIngressNamespacesForHost(ctx, host)
	if err != nil {
		return nil, err
	}
	if len(namespaces) == 0 {
		return nil, nil
	}
	targetNamespaces := makeStringSet(namespaces)
	out, err := o.kubectl(ctx, 30*time.Second, "get", "deploy,statefulset", "-A", "-o", "json")
	if err != nil {
		return nil, fmt.Errorf("query workloads: %w", err)
	}
	var list workloadList
	if err := json.Unmarshal([]byte(out), &list); err != nil {
		return nil, fmt.Errorf("decode workloads: %w", err)
	}
	healed := []string{}
	for _, item := range list.Items {
		kind := strings.ToLower(strings.TrimSpace(item.Kind))
		ns := strings.TrimSpace(item.Metadata.Namespace)
		name := strings.TrimSpace(item.Metadata.Name)
		if kind == "" || ns == "" || name == "" {
			continue
		}
		if kind != "deployment" && kind != "statefulset" {
			continue
		}
		if _, ok := targetNamespaces[ns]; !ok {
			continue
		}
		// A nil replicas field defaults to 1 (Kubernetes default), which
		// already passes the >= 1 check below.
		desired := int32(1)
		if item.Spec.Replicas != nil {
			desired = *item.Spec.Replicas
		}
		if desired >= 1 {
			continue
		}
		workload := startupWorkload{Namespace: ns, Kind: kind, Name: name}
		if err := o.ensureWorkloadReplicas(ctx, workload, 1); err != nil {
			if isNotFoundErr(err) {
				continue
			}
			return healed, fmt.Errorf("scale %s/%s/%s to 1: %w", ns, kind, name, err)
		}
		healed = append(healed, ns+"/"+kind+"/"+name)
	}
	return healed, nil
}

// waitForServiceChecklist polls serviceChecklistReady until all configured
// external service checks pass, running the stuck-pod / replica /
// ingress-backend auto-heals each iteration. Defaults: 7m wait, 5s poll.
func (o *Orchestrator) waitForServiceChecklist(ctx context.Context) error {
	wait := time.Duration(o.cfg.Startup.ServiceChecklistWaitSeconds) * time.Second
	if wait <= 0 {
		wait = 7 * time.Minute
	}
	poll := time.Duration(o.cfg.Startup.ServiceChecklistPollSeconds) * time.Second
	if poll <= 0 {
		poll = 5 * time.Second
	}
	deadline := time.Now().Add(wait)
	lastFailure := "unknown"
	lastLogged := time.Time{}
	lastRecycleAttempt := time.Time{}
	lastReplicaHeal := time.Time{}
	lastIngressHeal := time.Time{}
	for {
		o.maybeAutoRecycleStuckPods(ctx, &lastRecycleAttempt)
		o.maybeAutoHealCriticalWorkloadReplicas(ctx, &lastReplicaHeal)
		prevFailure := lastFailure
		ready, detail := o.serviceChecklistReady(ctx)
		lastFailure = detail
		if ready {
			o.log.Printf("external service checklist passed (%s)", detail)
			return nil
		}
		o.maybeAutoHealIngressHostBackends(ctx, &lastIngressHeal, lastFailure)
		if lastFailure != prevFailure || time.Since(lastLogged) >= 30*time.Second {
			remaining := time.Until(deadline).Round(time.Second)
			if remaining < 0 {
				remaining = 0
			}
			o.log.Printf("waiting for external service checklist (%s remaining): %s", remaining, lastFailure)
			lastLogged = time.Now()
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("startup blocked: external service checklist not satisfied within %s (%s)", wait, lastFailure)
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(poll):
		}
	}
}

// serviceChecklistReady runs every configured service check and reports the
// first failure as "<name-or-url>: <detail>"; an empty checklist passes.
func (o *Orchestrator) serviceChecklistReady(ctx context.Context) (bool, string) {
	checks := o.cfg.Startup.ServiceChecklist
	if len(checks) == 0 {
		return true, "no checklist items configured"
	}
	for _, check := range checks {
		ok, detail := o.serviceCheckReady(ctx, check)
		if !ok {
			name := strings.TrimSpace(check.Name)
			if name == "" {
				name = strings.TrimSpace(check.URL)
			}
			return false, fmt.Sprintf("%s: %s", name, detail)
		}
	}
	return true, fmt.Sprintf("checks=%d", len(checks))
}

// serviceCheckReady runs one HTTP probe and validates status code plus
// optional body marker (must-contain / must-not-contain) rules.
func (o *Orchestrator) serviceCheckReady(ctx context.Context, check config.ServiceChecklistCheck) (bool, string) {
	status, body, err := o.httpChecklistProbe(ctx, check)
	if err != nil {
		return false, err.Error()
	}
	accepted := check.AcceptedStatuses
	if len(accepted) == 0 {
		accepted = []int{200, 201, 202, 203, 204, 301, 302, 303, 307, 308, 401, 403}
	}
	statusOk := false
	for _, code := range accepted {
		if status == code {
			statusOk = true
			break
		}
	}
	if !statusOk {
		return false, fmt.Sprintf("unexpected status code=%d", status)
	}
	bodyContains := strings.TrimSpace(check.BodyContains)
	if bodyContains != "" && !checklistContains(body,
bodyContains) {
		return false, fmt.Sprintf("response missing expected marker %q", bodyContains)
	}
	bodyNotContains := strings.TrimSpace(check.BodyNotContains)
	if bodyNotContains != "" && checklistContains(body, bodyNotContains) {
		return false, fmt.Sprintf("response contained forbidden marker %q", bodyNotContains)
	}
	return true, fmt.Sprintf("status=%d", status)
}

// httpChecklistProbe performs a single GET against check.URL and returns the
// status code and up to 64KiB of the response body. Default timeout is 12s;
// TLS verification can be disabled per check.
// NOTE(review): a fresh http.Client/Transport is built per probe, so TCP/TLS
// connections are never reused between polls — presumably acceptable for
// low-frequency startup checks; confirm if probe volume grows.
func (o *Orchestrator) httpChecklistProbe(ctx context.Context, check config.ServiceChecklistCheck) (int, string, error) {
	timeout := time.Duration(check.TimeoutSeconds) * time.Second
	if timeout <= 0 {
		timeout = 12 * time.Second
	}
	transport := &http.Transport{}
	if check.InsecureSkipTLS {
		transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
	}
	client := &http.Client{
		Timeout:   timeout,
		Transport: transport,
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, strings.TrimSpace(check.URL), nil)
	if err != nil {
		return 0, "", fmt.Errorf("build request: %w", err)
	}
	req.Header.Set("User-Agent", "ananke/startup-checklist")
	resp, err := client.Do(req)
	if err != nil {
		return 0, "", fmt.Errorf("request failed: %w", err)
	}
	defer resp.Body.Close()
	body, readErr := io.ReadAll(io.LimitReader(resp.Body, 64*1024))
	if readErr != nil {
		return resp.StatusCode, "", fmt.Errorf("read response body: %w", readErr)
	}
	return resp.StatusCode, string(body), nil
}

// checklistContains reports whether marker occurs in body, first with a
// case-insensitive match, then with all whitespace stripped from both sides
// (tolerates markup reformatting). An all-whitespace marker always matches.
func checklistContains(body, marker string) bool {
	bodyLower := strings.ToLower(body)
	markerLower := strings.ToLower(marker)
	if strings.Contains(bodyLower, markerLower) {
		return true
	}
	bodyCompact := compactLowerNoSpace(bodyLower)
	markerCompact := compactLowerNoSpace(markerLower)
	if markerCompact == "" {
		return true
	}
	return strings.Contains(bodyCompact, markerCompact)
}

// compactLowerNoSpace removes every Unicode whitespace rune from s.
func compactLowerNoSpace(s string) string {
	var b strings.Builder
	b.Grow(len(s))
	for _, r := range s {
		if unicode.IsSpace(r) {
			continue
		}
		b.WriteRune(r)
	}
	return b.String()
}

// waitForStabilityWindow soaks for the configured stability window, failing
// immediately if startupStabilityHealthy reports any regression during the
// soak. A window <= 0 disables the soak entirely.
func (o *Orchestrator) waitForStabilityWindow(ctx context.Context) error {
	window := time.Duration(o.cfg.Startup.ServiceChecklistStabilitySec) * time.Second
	if window <= 0 {
		return nil
	}
	poll := time.Duration(o.cfg.Startup.ServiceChecklistPollSeconds) * time.Second
	if poll <= 0 {
		poll = 5 * time.Second
	}
	deadline := time.Now().Add(window)
	lastStatus := time.Time{}
	lastRecycleAttempt := time.Time{}
	lastReplicaHeal := time.Time{}
	for {
		o.maybeAutoRecycleStuckPods(ctx, &lastRecycleAttempt)
		o.maybeAutoHealCriticalWorkloadReplicas(ctx, &lastReplicaHeal)
		// Any single unhealthy observation fails the soak outright.
		if err := o.startupStabilityHealthy(ctx); err != nil {
			return fmt.Errorf("startup stability window failed: %w", err)
		}
		if time.Now().After(deadline) {
			o.log.Printf("startup stability window passed (%s)", window)
			return nil
		}
		if time.Since(lastStatus) >= 30*time.Second {
			remaining := time.Until(deadline).Round(time.Second)
			if remaining < 0 {
				remaining = 0
			}
			o.log.Printf("startup stability soak in progress (%s remaining)", remaining)
			lastStatus = time.Now()
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(poll):
		}
	}
}

// startupStabilityHealthy re-runs every enabled readiness gate once plus a
// crash/image-pull pod scan, returning the first failure found.
func (o *Orchestrator) startupStabilityHealthy(ctx context.Context) error {
	if o.cfg.Startup.RequireFluxHealth {
		ready, detail, err := o.fluxHealthReady(ctx)
		if err != nil {
			return fmt.Errorf("flux check error: %w", err)
		}
		if !ready {
			return fmt.Errorf("flux not ready: %s", detail)
		}
	}
	if o.cfg.Startup.RequireWorkloadConvergence {
		ready, detail, err := o.workloadConvergenceReady(ctx)
		if err != nil {
			return fmt.Errorf("workload check error: %w", err)
		}
		if !ready {
			return fmt.Errorf("workloads not converged: %s", detail)
		}
	}
	if o.cfg.Startup.RequireServiceChecklist {
		ready, detail := o.serviceChecklistReady(ctx)
		if !ready {
			return fmt.Errorf("external services not healthy: %s", detail)
		}
	}
	if o.cfg.Startup.RequireIngressChecklist {
		ready, detail := o.ingressChecklistReady(ctx)
		if !ready {
			return fmt.Errorf("ingress reachability not healthy: %s", detail)
		}
	}
	failures, err := o.startupFailurePods(ctx)
	if err != nil {
		return fmt.Errorf("pod failure check error: %w", err)
	}
	if
len(failures) > 0 {
		return fmt.Errorf("pods in crash/image-pull failures: %s", joinLimited(failures, 8))
	}
	return nil
}

// waitForFluxHealth polls fluxHealthReady until every non-suspended
// kustomization is Ready. The wait window (default 15m) may be widened by
// adaptiveFluxHealthWait so it never undercuts the longest kustomization
// timeout. While waiting it runs the stuck-pod and replica auto-heals, and —
// when the failure detail looks like an immutable Job template error — the
// immutable-job self-heal (rate-limited to one attempt per 30s).
func (o *Orchestrator) waitForFluxHealth(ctx context.Context) error {
	wait := time.Duration(o.cfg.Startup.FluxHealthWaitSeconds) * time.Second
	if wait <= 0 {
		wait = 15 * time.Minute
	}
	if effective, reason, err := o.adaptiveFluxHealthWait(ctx, wait); err != nil {
		o.log.Printf("warning: unable to evaluate adaptive flux wait window: %v", err)
	} else if effective > wait {
		o.log.Printf("adjusted flux convergence wait window from %s to %s (%s)", wait, effective, reason)
		wait = effective
	}
	poll := time.Duration(o.cfg.Startup.FluxHealthPollSeconds) * time.Second
	if poll <= 0 {
		poll = 5 * time.Second
	}
	deadline := time.Now().Add(wait)
	lastFailure := "unknown"
	lastLogged := time.Time{}
	lastImmutableHealAttempt := time.Time{}
	lastRecycleAttempt := time.Time{}
	lastReplicaHeal := time.Time{}
	for {
		o.maybeAutoRecycleStuckPods(ctx, &lastRecycleAttempt)
		o.maybeAutoHealCriticalWorkloadReplicas(ctx, &lastReplicaHeal)
		prevFailure := lastFailure
		ready, detail, err := o.fluxHealthReady(ctx)
		if err != nil {
			lastFailure = err.Error()
		} else {
			lastFailure = detail
		}
		if ready {
			o.log.Printf("flux convergence check passed (%s)", detail)
			return nil
		}
		if !o.runner.DryRun && looksLikeImmutableJobError(lastFailure) && time.Since(lastImmutableHealAttempt) >= 30*time.Second {
			lastImmutableHealAttempt = time.Now()
			healed, healErr := o.healImmutableFluxJobs(ctx)
			if healErr != nil {
				o.log.Printf("warning: immutable-job self-heal attempt failed: %v", healErr)
			} else if healed {
				o.log.Printf("detected immutable-job failure and removed stale failed job(s); re-requesting reconcile")
				o.noteStartupAutoHeal("deleted stale failed flux-managed job(s) after immutable template error")
				o.bestEffort("reconcile flux after immutable-job cleanup", func() error { return o.resumeFluxAndReconcile(ctx) })
			}
		}
		if lastFailure != prevFailure || time.Since(lastLogged) >= 30*time.Second {
			remaining := time.Until(deadline).Round(time.Second)
			if remaining < 0 {
				remaining = 0
			}
			o.log.Printf("waiting for Flux convergence (%s remaining): %s", remaining, lastFailure)
			lastLogged = time.Now()
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("startup blocked: flux convergence not satisfied within %s (%s)", wait, lastFailure)
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(poll):
		}
	}
}

// adaptiveFluxHealthWait returns a wait window at least as long as the
// largest explicit timeout on any non-suspended kustomization plus a 2m
// margin, never shrinking below base. The reason string names the governing
// kustomization. Query/decode failures return base with the error.
func (o *Orchestrator) adaptiveFluxHealthWait(ctx context.Context, base time.Duration) (time.Duration, string, error) {
	if base <= 0 {
		base = 15 * time.Minute
	}
	out, err := o.kubectl(ctx, 20*time.Second, "get", "kustomizations.kustomize.toolkit.fluxcd.io", "-A", "-o", "json")
	if err != nil {
		return base, "", fmt.Errorf("query flux kustomizations: %w", err)
	}
	var list fluxKustomizationList
	if err := json.Unmarshal([]byte(out), &list); err != nil {
		return base, "", fmt.Errorf("decode flux kustomizations: %w", err)
	}
	maxTimeout := time.Duration(0)
	maxName := ""
	for _, ks := range list.Items {
		if ks.Spec.Suspend {
			continue
		}
		timeout := parseFluxKustomizationTimeout(ks.Spec.Timeout)
		if timeout <= maxTimeout {
			continue
		}
		maxTimeout = timeout
		maxName = strings.TrimSpace(ks.Metadata.Namespace) + "/" + strings.TrimSpace(ks.Metadata.Name)
	}
	if maxTimeout <= 0 {
		return base, "no explicit kustomization timeouts found", nil
	}
	required := maxTimeout + 2*time.Minute
	if required <= base {
		return base, fmt.Sprintf("max flux timeout %s on %s", maxTimeout, maxName), nil
	}
	return required, fmt.Sprintf("max flux timeout %s on %s", maxTimeout, maxName), nil
}

// parseFluxKustomizationTimeout parses a spec.timeout string ("10m0s" form);
// empty or unparseable values map to 0.
func parseFluxKustomizationTimeout(raw string) time.Duration {
	raw = strings.TrimSpace(raw)
	if raw == "" {
		return 0
	}
	d, err := time.ParseDuration(raw)
	if err != nil {
		return 0
	}
	return d
}

// fluxHealthReady reports whether every non-suspended, non-ignored flux
// kustomization has a Ready=True condition. The detail string lists up to 6
// not-ready entries with their condition message/reason.
func (o *Orchestrator) fluxHealthReady(ctx context.Context) (bool, string, error) {
	out, err := o.kubectl(ctx, 20*time.Second, "get", "kustomizations.kustomize.toolkit.fluxcd.io", "-A", "-o", "json")
	if err != nil {
		return false, "", fmt.Errorf("query flux kustomizations: %w", err)
	}
	var list fluxKustomizationList
	if err := json.Unmarshal([]byte(out), &list); err != nil {
		return false, "", fmt.Errorf("decode flux kustomizations: %w", err)
	}
	ignored := makeStringSet(o.cfg.Startup.IgnoreFluxKustomizations)
	notReady := []string{}
	for _, ks := range list.Items {
		ns := strings.TrimSpace(ks.Metadata.Namespace)
		name := strings.TrimSpace(ks.Metadata.Name)
		if ns == "" || name == "" {
			continue
		}
		full := ns + "/" + name
		if ks.Spec.Suspend {
			continue
		}
		if _, ok := ignored[full]; ok {
			continue
		}
		cond := readyCondition(ks.Status.Conditions)
		if cond != nil && strings.EqualFold(strings.TrimSpace(cond.Status), "True") {
			continue
		}
		// Prefer the condition message, then reason, then a generic marker.
		reason := "ready condition missing"
		if cond != nil {
			reason = strings.TrimSpace(cond.Message)
			if reason == "" {
				reason = strings.TrimSpace(cond.Reason)
			}
			if reason == "" {
				reason = "ready=false"
			}
		}
		notReady = append(notReady, fmt.Sprintf("%s(%s)", full, reason))
	}
	if len(notReady) > 0 {
		sort.Strings(notReady)
		return false, "not ready: " + joinLimited(notReady, 6), nil
	}
	return true, fmt.Sprintf("all kustomizations ready=%d", len(list.Items)), nil
}

// looksLikeImmutableJobError heuristically matches the apiserver error Flux
// surfaces when a Job template changed ("field is immutable" + "job").
func looksLikeImmutableJobError(detail string) bool {
	d := strings.ToLower(strings.TrimSpace(detail))
	if d == "" {
		return false
	}
	return strings.Contains(d, "field is immutable") && strings.Contains(d, "job")
}

// healImmutableFluxJobs deletes every failed, flux-managed Job so the next
// reconcile can recreate it with the updated (otherwise immutable) template.
// Returns true when at least one job was deleted. Individual delete failures
// are logged and skipped.
func (o *Orchestrator) healImmutableFluxJobs(ctx context.Context) (bool, error) {
	out, err := o.kubectl(ctx, 25*time.Second, "get", "jobs", "-A", "-o", "json")
	if err != nil {
		return false, fmt.Errorf("query jobs: %w", err)
	}
	var list jobList
	if err := json.Unmarshal([]byte(out), &list); err != nil {
		return false, fmt.Errorf("decode jobs: %w", err)
	}
	deleted := []string{}
	for _, job := range list.Items {
		ns := strings.TrimSpace(job.Metadata.Namespace)
		name := strings.TrimSpace(job.Metadata.Name)
		if ns == "" || name == "" {
			continue
		}
		if !jobLooksFluxManaged(job) {
			continue
		}
		if !jobFailed(job) {
			continue
		}
		o.log.Printf("warning: deleting stale failed flux-managed job %s/%s to recover immutable template drift", ns, name)
		if _, err := o.kubectl(ctx, 20*time.Second, "-n", ns, "delete", "job", name, "--wait=false"); err != nil && !isNotFoundErr(err) {
			o.log.Printf("warning: delete failed for stale job %s/%s: %v", ns, name, err)
			continue
		}
		deleted = append(deleted, ns+"/"+name)
	}
	if len(deleted) == 0 {
		return false, nil
	}
	sort.Strings(deleted)
	o.log.Printf("immutable-job cleanup removed %d job(s): %s", len(deleted), joinLimited(deleted, 8))
	return true, nil
}

// jobLooksFluxManaged reports whether a Job carries the kustomize-controller
// ownership label that Flux stamps onto every object it applies.
func jobLooksFluxManaged(job jobResource) bool {
	if strings.TrimSpace(job.Metadata.Labels["kustomize.toolkit.fluxcd.io/name"]) != "" {
		return true
	}
	// NOTE(review): the previous version also looped over OwnerReferences,
	// returning false for CronJob owners and then falling through to false
	// anyway — the loop could never change the result, so it was removed
	// (behavior unchanged). If the intent was to treat non-CronJob-owned
	// jobs as flux-managed, that would be a behavior change to make
	// deliberately, with care: it would widen the deletion candidates in
	// healImmutableFluxJobs.
	return false
}

// jobFailed reports whether a Job has conclusively failed: no successful
// completions, at least one failed pod, and a Failed=True status condition.
func jobFailed(job jobResource) bool {
	if job.Status.Succeeded > 0 {
		return false
	}
	if job.Status.Failed <= 0 {
		return false
	}
	for _, cond := range job.Status.Conditions {
		if strings.EqualFold(strings.TrimSpace(cond.Type), "Failed") && strings.EqualFold(strings.TrimSpace(cond.Status), "True") {
			return true
		}
	}
	return false
}

// waitForWorkloadConvergence polls workloadConvergenceReady until every
// eligible controller reports ready >= desired, running the stuck-pod and
// replica auto-heals each iteration. Defaults: 15m wait, 5s poll.
func (o *Orchestrator) waitForWorkloadConvergence(ctx context.Context) error {
	wait := time.Duration(o.cfg.Startup.WorkloadConvergenceWaitSeconds) * time.Second
	if wait <= 0 {
		wait = 15 * time.Minute
	}
	poll := time.Duration(o.cfg.Startup.WorkloadConvergencePollSeconds) * time.Second
	if poll <= 0 {
		poll = 5 * time.Second
	}
	deadline := time.Now().Add(wait)
	lastFailure := "unknown"
	lastLogged := time.Time{}
	lastRecycleAttempt := time.Time{}
	lastReplicaHeal := time.Time{}
	for {
		prevFailure := lastFailure
		o.maybeAutoRecycleStuckPods(ctx, &lastRecycleAttempt)
		o.maybeAutoHealCriticalWorkloadReplicas(ctx, &lastReplicaHeal)
		ready, detail, err := o.workloadConvergenceReady(ctx)
		if err != nil {
			lastFailure = err.Error()
		} else {
			lastFailure = detail
		}
		if ready {
			o.log.Printf("workload convergence check passed (%s)", detail)
			return nil
		}
		if lastFailure !=
prevFailure || time.Since(lastLogged) >= 30*time.Second {
			remaining := time.Until(deadline).Round(time.Second)
			if remaining < 0 {
				remaining = 0
			}
			o.log.Printf("waiting for workload convergence (%s remaining): %s", remaining, lastFailure)
			lastLogged = time.Now()
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("startup blocked: workload convergence not satisfied within %s (%s)", wait, lastFailure)
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(poll):
		}
	}
}

// workloadConvergenceReady checks all deployments/statefulsets/daemonsets
// cluster-wide against their desired counts. Controllers are skipped when
// their namespace, name, flux-kustomization namespace, or target nodes are
// in the configured ignore lists. Daemonset shortfalls no larger than the
// number of ignored nodes are forgiven. The detail lists up to 8 pending
// controllers.
func (o *Orchestrator) workloadConvergenceReady(ctx context.Context) (bool, string, error) {
	out, err := o.kubectl(ctx, 30*time.Second, "get", "deploy,statefulset,daemonset", "-A", "-o", "json")
	if err != nil {
		return false, "", fmt.Errorf("query controllers: %w", err)
	}
	var list workloadList
	if err := json.Unmarshal([]byte(out), &list); err != nil {
		return false, "", fmt.Errorf("decode controllers: %w", err)
	}
	ignoredNamespaces := makeStringSet(o.cfg.Startup.IgnoreWorkloadNamespaces)
	ignoredNodes := makeStringSet(o.cfg.Startup.IgnoreUnavailableNodes)
	ignoreRules := parseWorkloadIgnoreRules(o.cfg.Startup.IgnoreWorkloads)
	ignoredByFlux := namespaceCandidatesFromIgnoreKustomizations(o.cfg.Startup.IgnoreFluxKustomizations)
	pending := []string{}
	checked := 0
	for _, item := range list.Items {
		kind := strings.ToLower(strings.TrimSpace(item.Kind))
		ns := strings.TrimSpace(item.Metadata.Namespace)
		name := strings.TrimSpace(item.Metadata.Name)
		if kind == "" || ns == "" || name == "" {
			continue
		}
		if _, ok := ignoredNamespaces[ns]; ok {
			continue
		}
		if _, ok := ignoredByFlux[ns]; ok {
			continue
		}
		if workloadIgnored(ignoreRules, ns, kind, name) {
			continue
		}
		if workloadTargetsIgnoredNodes(item.Spec.Template.Spec, ignoredNodes) {
			continue
		}
		desired, ready, ok := desiredReady(item)
		// Unknown kinds and scaled-to-zero workloads are not counted.
		if !ok || desired <= 0 {
			continue
		}
		if kind == "daemonset" && desired > ready && len(ignoredNodes) > 0 {
			missing := desired - ready
			if missing <= int32(len(ignoredNodes)) {
				ready = desired
			}
		}
		checked++
		if ready < desired {
			pending = append(pending, fmt.Sprintf("%s/%s/%s ready=%d desired=%d", ns, kind, name, ready, desired))
		}
	}
	if len(pending) > 0 {
		sort.Strings(pending)
		return false, "not ready: " + joinLimited(pending, 8), nil
	}
	return true, fmt.Sprintf("controllers ready=%d", checked), nil
}

// desiredReady extracts (desired, ready, supported) counts per kind: replica
// counts for deployments/statefulsets (nil replicas defaults to 1), the
// scheduled/ready numbers for daemonsets, and ok=false for anything else.
func desiredReady(item workloadResource) (int32, int32, bool) {
	switch strings.ToLower(strings.TrimSpace(item.Kind)) {
	case "deployment", "statefulset":
		desired := int32(1)
		if item.Spec.Replicas != nil {
			desired = *item.Spec.Replicas
		}
		return desired, item.Status.ReadyReplicas, true
	case "daemonset":
		return item.Status.DesiredNumberScheduled, item.Status.NumberReady, true
	default:
		return 0, 0, false
	}
}

// recycleStuckControllerPods deletes controller-owned pods that have been
// stuck past the grace period (default 180s) in a known-bad waiting state
// (image pull / crash loop / container create errors) or in a stalled
// vault-agent-init. Deletion is safe because an owning controller will
// recreate the pod. Ignored namespaces/workloads/nodes are skipped; delete
// failures are logged, not fatal.
func (o *Orchestrator) recycleStuckControllerPods(ctx context.Context) error {
	out, err := o.kubectl(ctx, 30*time.Second, "get", "pods", "-A", "-o", "json")
	if err != nil {
		return fmt.Errorf("query pods: %w", err)
	}
	var list podList
	if err := json.Unmarshal([]byte(out), &list); err != nil {
		return fmt.Errorf("decode pods: %w", err)
	}
	ignoredNamespaces := makeStringSet(o.cfg.Startup.IgnoreWorkloadNamespaces)
	ignoredNodes := makeStringSet(o.cfg.Startup.IgnoreUnavailableNodes)
	ignoreRules := parseWorkloadIgnoreRules(o.cfg.Startup.IgnoreWorkloads)
	grace := time.Duration(o.cfg.Startup.StuckPodGraceSeconds) * time.Second
	if grace <= 0 {
		grace = 180 * time.Second
	}
	stuckReasons := map[string]struct{}{
		"ImagePullBackOff":           {},
		"ErrImagePull":               {},
		"CrashLoopBackOff":           {},
		"CreateContainerConfigError": {},
		"CreateContainerError":       {},
	}
	recycled := []string{}
	for _, pod := range list.Items {
		ns := strings.TrimSpace(pod.Metadata.Namespace)
		name := strings.TrimSpace(pod.Metadata.Name)
		if ns == "" || name == "" {
			continue
		}
		if _, ok := ignoredNamespaces[ns]; ok {
			continue
		}
		if workloadIgnored(ignoreRules, ns, "", name) {
			continue
		}
		if podTargetsIgnoredNode(pod, ignoredNodes) {
			continue
		}
		// Only pods a controller will recreate are candidates.
		if !podControllerOwned(pod) {
			continue
		}
		age := time.Since(pod.Metadata.CreationTimestamp)
		// Young pods get the grace period to settle before recycling.
		if !pod.Metadata.CreationTimestamp.IsZero() && age < grace {
			continue
		}
		reason := stuckContainerReason(pod, stuckReasons)
		if reason == "" {
			reason = stuckVaultInitReason(pod, grace)
		}
		if reason == "" {
			continue
		}
		o.log.Printf("warning: recycling stuck pod %s/%s reason=%s age=%s", ns, name, reason, age.Round(time.Second))
		if _, err := o.kubectl(ctx, 30*time.Second, "-n", ns, "delete", "pod", name, "--wait=false"); err != nil && !isNotFoundErr(err) {
			o.log.Printf("warning: recycle pod failed for %s/%s: %v", ns, name, err)
			continue
		}
		recycled = append(recycled, ns+"/"+name)
	}
	if len(recycled) > 0 {
		sort.Strings(recycled)
		o.log.Printf("recycled stuck controller pods (%d): %s", len(recycled), joinLimited(recycled, 10))
		o.noteStartupAutoHeal(fmt.Sprintf("recycled stuck controller pods: %s", joinLimited(recycled, 10)))
	}
	return nil
}

// podControllerOwned reports whether the pod is owned by a ReplicaSet,
// StatefulSet, or DaemonSet (i.e. will be recreated if deleted).
func podControllerOwned(p podResource) bool {
	for _, owner := range p.Metadata.OwnerReferences {
		switch strings.TrimSpace(owner.Kind) {
		case "ReplicaSet", "StatefulSet", "DaemonSet":
			return true
		}
	}
	return false
}

// stuckContainerReason returns the first waiting-state reason (init
// containers first, then regular containers) that appears in reasons, or "".
func stuckContainerReason(p podResource, reasons map[string]struct{}) string {
	check := func(statuses []podContainerStatus) string {
		for _, st := range statuses {
			if st.State.Waiting == nil {
				continue
			}
			reason := strings.TrimSpace(st.State.Waiting.Reason)
			if reason == "" {
				continue
			}
			if _, ok := reasons[reason]; ok {
				return reason
			}
		}
		return ""
	}
	if reason := check(p.Status.InitContainerStatuses); reason != "" {
		return reason
	}
	return check(p.Status.ContainerStatuses)
}

// stuckVaultInitReason returns "VaultInitStuck" when a Pending, vault-inject
// annotated pod has had its vault-agent-init container running longer than
// the grace period; "" otherwise. Only the first matching init container is
// considered.
func stuckVaultInitReason(p podResource, grace time.Duration) string {
	if !strings.EqualFold(strings.TrimSpace(p.Status.Phase), "Pending") {
		return ""
	}
	if !strings.EqualFold(strings.TrimSpace(p.Metadata.Annotations["vault.hashicorp.com/agent-inject"]), "true") {
		return ""
	}
	for _, st := range p.Status.InitContainerStatuses {
		if strings.TrimSpace(st.Name) != "vault-agent-init" || st.State.Running == nil {
			continue
		}
		startedAt := st.State.Running.StartedAt
		if startedAt.IsZero() {
			continue
		}
		if time.Since(startedAt) < grace {
			return ""
		}
		return "VaultInitStuck"
	}
	return ""
}

// maybeAutoRecycleStuckPods best-effort runs recycleStuckControllerPods,
// rate-limited to once per 30s via lastAttempt. Disabled in dry-run mode or
// when auto-recycling is turned off in config.
func (o *Orchestrator) maybeAutoRecycleStuckPods(ctx context.Context, lastAttempt *time.Time) {
	if o.runner.DryRun || !o.cfg.Startup.AutoRecycleStuckPods {
		return
	}
	now := time.Now()
	if lastAttempt != nil && !lastAttempt.IsZero() && now.Sub(*lastAttempt) < 30*time.Second {
		return
	}
	if lastAttempt != nil {
		*lastAttempt = now
	}
	o.bestEffort("recycle stuck controller pods", func() error { return o.recycleStuckControllerPods(ctx) })
}

// maybeAutoHealCriticalWorkloadReplicas runs healCriticalWorkloadReplicas,
// rate-limited to once per 30s via lastAttempt; failures are logged only.
// Skipped in dry-run mode.
func (o *Orchestrator) maybeAutoHealCriticalWorkloadReplicas(ctx context.Context, lastAttempt *time.Time) {
	if o.runner.DryRun {
		return
	}
	now := time.Now()
	if lastAttempt != nil && !lastAttempt.IsZero() && now.Sub(*lastAttempt) < 30*time.Second {
		return
	}
	if lastAttempt != nil {
		*lastAttempt = now
	}
	healed, err := o.healCriticalWorkloadReplicas(ctx)
	if err != nil {
		o.log.Printf("warning: critical workload replica auto-heal failed: %v", err)
		return
	}
	if len(healed) == 0 {
		return
	}
	sort.Strings(healed)
	detail := fmt.Sprintf("restored critical workload replicas: %s", joinLimited(healed, 8))
	o.log.Printf("%s", detail)
	o.noteStartupAutoHeal(detail)
}

// healCriticalWorkloadReplicas builds a map of current desired replica
// counts for all deployments/statefulsets keyed "ns/kind/name".
// (Definition continues beyond this chunk.)
func (o *Orchestrator) healCriticalWorkloadReplicas(ctx context.Context) ([]string, error) {
	out, err := o.kubectl(ctx, 30*time.Second, "get", "deploy,statefulset", "-A", "-o", "json")
	if err != nil {
		return nil, fmt.Errorf("query workloads: %w", err)
	}
	var list workloadList
	if err := json.Unmarshal([]byte(out), &list); err != nil {
		return nil, fmt.Errorf("decode workloads: %w", err)
	}
	current := map[string]int32{}
	for _, item := range list.Items {
		kind := strings.ToLower(strings.TrimSpace(item.Kind))
		ns := strings.TrimSpace(item.Metadata.Namespace)
		name := strings.TrimSpace(item.Metadata.Name)
		if kind == "" || ns == "" || name == "" {
			continue
		}
		if kind != "deployment" && kind != "statefulset" {
			continue
		}
		desired := int32(1)
		if item.Spec.Replicas != nil {
			desired = *item.Spec.Replicas
		}
		key := ns + "/" + kind + "/"
+ name current[key] = desired } healed := []string{} for _, w := range criticalStartupWorkloads { key := w.Namespace + "/" + strings.ToLower(w.Kind) + "/" + w.Name desired, ok := current[key] if !ok || desired >= 1 { continue } if err := o.ensureWorkloadReplicas(ctx, w, 1); err != nil { if isNotFoundErr(err) { continue } return healed, fmt.Errorf("scale %s to 1: %w", key, err) } healed = append(healed, key) } return healed, nil } func (o *Orchestrator) startupFailurePods(ctx context.Context) ([]string, error) { out, err := o.kubectl(ctx, 30*time.Second, "get", "pods", "-A", "-o", "json") if err != nil { return nil, fmt.Errorf("query pods: %w", err) } var list podList if err := json.Unmarshal([]byte(out), &list); err != nil { return nil, fmt.Errorf("decode pods: %w", err) } ignoredNamespaces := makeStringSet(o.cfg.Startup.IgnoreWorkloadNamespaces) ignoredNodes := makeStringSet(o.cfg.Startup.IgnoreUnavailableNodes) stuckReasons := map[string]struct{}{ "ImagePullBackOff": {}, "ErrImagePull": {}, "CrashLoopBackOff": {}, "CreateContainerConfigError": {}, "CreateContainerError": {}, "RunContainerError": {}, } grace := time.Duration(o.cfg.Startup.StuckPodGraceSeconds) * time.Second if grace <= 0 { grace = 180 * time.Second } failures := []string{} for _, pod := range list.Items { ns := strings.TrimSpace(pod.Metadata.Namespace) name := strings.TrimSpace(pod.Metadata.Name) if ns == "" || name == "" { continue } if _, ok := ignoredNamespaces[ns]; ok { continue } if podTargetsIgnoredNode(pod, ignoredNodes) { continue } reason := stuckContainerReason(pod, stuckReasons) if reason == "" { reason = stuckVaultInitReason(pod, grace) } if reason == "" { continue } failures = append(failures, fmt.Sprintf("%s/%s(%s)", ns, name, reason)) } sort.Strings(failures) return failures, nil } func podTargetsIgnoredNode(p podResource, ignored map[string]struct{}) bool { if len(ignored) == 0 { return false } node := strings.TrimSpace(p.Spec.NodeName) if node != "" { _, ok := ignored[node] return 
ok } return workloadTargetsIgnoredNodes(p.Spec.podSpec, ignored) } func workloadTargetsIgnoredNodes(spec podSpec, ignored map[string]struct{}) bool { if len(ignored) == 0 { return false } if hostname, ok := spec.NodeSelector["kubernetes.io/hostname"]; ok { _, ignoredHost := ignored[strings.TrimSpace(hostname)] if ignoredHost { return true } } if spec.Affinity == nil || spec.Affinity.NodeAffinity == nil || spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil { return false } terms := spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms if len(terms) != 1 { return false } for _, expr := range terms[0].MatchExpressions { if strings.TrimSpace(expr.Key) != "kubernetes.io/hostname" { continue } if !strings.EqualFold(strings.TrimSpace(expr.Operator), "In") { return false } if len(expr.Values) == 0 { return false } for _, value := range expr.Values { if _, ok := ignored[strings.TrimSpace(value)]; !ok { return false } } return true } return false } func parseWorkloadIgnoreRules(entries []string) []workloadIgnoreRule { out := []workloadIgnoreRule{} for _, entry := range entries { entry = strings.TrimSpace(entry) if entry == "" { continue } parts := strings.Split(entry, "/") switch len(parts) { case 2: out = append(out, workloadIgnoreRule{ Namespace: strings.TrimSpace(parts[0]), Name: strings.TrimSpace(parts[1]), }) case 3: out = append(out, workloadIgnoreRule{ Namespace: strings.TrimSpace(parts[0]), Kind: strings.ToLower(strings.TrimSpace(parts[1])), Name: strings.TrimSpace(parts[2]), }) } } return out } func workloadIgnored(rules []workloadIgnoreRule, namespace, kind, name string) bool { ns := strings.TrimSpace(namespace) k := strings.ToLower(strings.TrimSpace(kind)) n := strings.TrimSpace(name) for _, rule := range rules { if rule.Namespace != ns { continue } if rule.Kind != "" && rule.Kind != k { continue } if rule.Name == n { return true } } return false } func makeStringSet(entries []string) 
map[string]struct{} { out := make(map[string]struct{}, len(entries)) for _, entry := range entries { entry = strings.TrimSpace(entry) if entry != "" { out[entry] = struct{}{} } } return out } func readyCondition(conditions []fluxCondition) *fluxCondition { for i := range conditions { cond := &conditions[i] if strings.EqualFold(strings.TrimSpace(cond.Type), "Ready") { return cond } } return nil } func joinLimited(items []string, limit int) string { if len(items) <= limit || limit <= 0 { return strings.Join(items, "; ") } return strings.Join(items[:limit], "; ") + fmt.Sprintf("; ... (+%d more)", len(items)-limit) } func namespaceCandidatesFromIgnoreKustomizations(entries []string) map[string]struct{} { out := map[string]struct{}{} for _, entry := range entries { entry = strings.TrimSpace(entry) if entry == "" { continue } parts := strings.SplitN(entry, "/", 2) if len(parts) != 2 { continue } name := strings.TrimSpace(parts[1]) if name != "" { out[name] = struct{}{} } } return out } func (o *Orchestrator) waitForPostStartProbes(ctx context.Context) error { if o.runner.DryRun { return nil } wait := time.Duration(o.cfg.Startup.PostStartProbeWaitSeconds) * time.Second if wait <= 0 { wait = 240 * time.Second } poll := time.Duration(o.cfg.Startup.PostStartProbePollSeconds) * time.Second if poll <= 0 { poll = 5 * time.Second } deadline := time.Now().Add(wait) lastFailure := "unknown" lastLogged := time.Time{} for { ok, failure := o.postStartProbesReady(ctx) if ok { o.log.Printf("post-start probes passed") return nil } if failure != lastFailure || time.Since(lastLogged) >= 30*time.Second { remaining := time.Until(deadline).Round(time.Second) if remaining < 0 { remaining = 0 } o.log.Printf("waiting for post-start probes (%s remaining): %s", remaining, failure) lastLogged = time.Now() } lastFailure = failure if time.Now().After(deadline) { return fmt.Errorf("startup blocked: post-start probes did not pass within %s (%s)", wait, lastFailure) } select { case <-ctx.Done(): return 
ctx.Err() case <-time.After(poll): } } } func (o *Orchestrator) postStartProbesReady(ctx context.Context) (bool, string) { probes := make([]string, 0, len(o.cfg.Startup.PostStartProbes)) for _, p := range o.cfg.Startup.PostStartProbes { p = strings.TrimSpace(p) if p != "" { probes = append(probes, p) } } if len(probes) == 0 { return true, "no probes configured" } for _, probe := range probes { code, err := o.httpProbe(ctx, probe) if err != nil { return false, fmt.Sprintf("%s: %v", probe, err) } if !probeStatusAccepted(probe, code) { return false, fmt.Sprintf("%s: unexpected status code=%d", probe, code) } } return true, "all probes successful" } func probeStatusAccepted(_ string, code int) bool { if code >= 200 && code < 400 { return true } // Auth fronts often return unauthorized/forbidden while still proving the service is up. if code == 401 || code == 403 { return true } return false } func (o *Orchestrator) httpProbe(ctx context.Context, probeURL string) (int, error) { out, err := o.run( ctx, 20*time.Second, "curl", "--silent", "--show-error", "--location", "--max-time", "12", "--output", "/dev/null", "--write-out", "%{http_code}", probeURL, ) if err != nil { return 0, err } code, convErr := strconv.Atoi(strings.TrimSpace(out)) if convErr != nil { return 0, fmt.Errorf("parse http status %q: %w", strings.TrimSpace(out), convErr) } return code, nil } func (o *Orchestrator) resumeFluxAndReconcile(ctx context.Context) error { if err := o.patchFluxSuspendAll(ctx, false); err != nil { return err } now := time.Now().UTC().Format(time.RFC3339) if _, err := o.kubectl( ctx, 25*time.Second, "-n", "flux-system", "annotate", "kustomizations.kustomize.toolkit.fluxcd.io", "--all", "reconcile.fluxcd.io/requestedAt="+now, "--overwrite", ); err != nil { o.log.Printf("warning: annotate kustomizations for reconcile failed: %v", err) } if _, err := o.kubectl( ctx, 25*time.Second, "annotate", "--all-namespaces", "helmreleases.helm.toolkit.fluxcd.io", "--all", 
"reconcile.fluxcd.io/requestedAt="+now, "--overwrite", ); err != nil { o.log.Printf("warning: annotate helmreleases for reconcile failed: %v", err) } if o.runner.CommandExists("flux") { sourceCmd := []string{"reconcile", "source", "git", "flux-system", "-n", "flux-system", "--timeout=60s"} if _, err := o.run(ctx, 75*time.Second, "flux", sourceCmd...); err != nil { o.log.Printf("warning: flux command failed (%s): %v", strings.Join(sourceCmd, " "), err) } } return nil } func (o *Orchestrator) kubectl(ctx context.Context, timeout time.Duration, args ...string) (string, error) { return o.run(ctx, timeout, "kubectl", args...) } func (o *Orchestrator) ssh(ctx context.Context, node string, command string) (string, error) { return o.sshWithTimeout(ctx, node, command, 45*time.Second) } func (o *Orchestrator) sshWithTimeout(ctx context.Context, node string, command string, timeout time.Duration) (string, error) { host := node if mapped, ok := o.cfg.SSHNodeHosts[node]; ok && strings.TrimSpace(mapped) != "" { host = strings.TrimSpace(mapped) } sshUser := o.cfg.SSHUser if override, ok := o.cfg.SSHNodeUsers[node]; ok && strings.TrimSpace(override) != "" { sshUser = strings.TrimSpace(override) } target := host if sshUser != "" { target = sshUser + "@" + host } sshConfigFile := o.resolveSSHConfigFile() sshIdentity := o.resolveSSHIdentityFile() baseArgs := []string{ "-o", "BatchMode=yes", "-o", "ConnectTimeout=8", "-o", "StrictHostKeyChecking=accept-new", } if sshConfigFile != "" { baseArgs = append(baseArgs, "-F", sshConfigFile) } if sshIdentity != "" { baseArgs = append(baseArgs, "-i", sshIdentity) } if o.cfg.SSHPort > 0 { baseArgs = append(baseArgs, "-p", strconv.Itoa(o.cfg.SSHPort)) } attempts := make([][]string, 0, 2) attemptNames := make([]string, 0, 2) knownHostsFiles := sshutil.KnownHostsFiles(sshConfigFile, sshIdentity) repairHosts := []string{node, host} if o.cfg.SSHJumpHost != "" { jump := o.cfg.SSHJumpHost repairHosts = append(repairHosts, jump) if mapped, ok := 
o.cfg.SSHNodeHosts[jump]; ok && strings.TrimSpace(mapped) != "" { repairHosts = append(repairHosts, strings.TrimSpace(mapped)) } if o.cfg.SSHJumpUser != "" { jump = o.cfg.SSHJumpUser + "@" + jump } if o.cfg.SSHPort > 0 && !strings.Contains(jump, ":") { jump = fmt.Sprintf("%s:%d", jump, o.cfg.SSHPort) } withJump := append([]string{}, baseArgs...) withJump = append(withJump, "-J", jump, target, command) attempts = append(attempts, withJump) attemptNames = append(attemptNames, "jump") } direct := append([]string{}, baseArgs...) direct = append(direct, target, command) attempts = append(attempts, direct) attemptNames = append(attemptNames, "direct") var lastOut string var lastErr error for i, args := range attempts { out, err := o.run(ctx, timeout, "ssh", args...) if err == nil { if i > 0 { o.log.Printf("warning: ssh %s path failed for %s, using %s path", attemptNames[i-1], node, attemptNames[i]) } return out, nil } if sshutil.ShouldAttemptKnownHostsRepair(out, err) { o.log.Printf("warning: ssh failure on %s via %s path may be host-key related; repairing known_hosts and retrying once", node, attemptNames[i]) sshutil.RepairKnownHosts(ctx, o.log, knownHostsFiles, repairHosts, o.cfg.SSHPort) retryOut, retryErr := o.run(ctx, timeout, "ssh", args...) if retryErr == nil { return retryOut, nil } out = retryOut err = retryErr } lastOut = out lastErr = err if i < len(attempts)-1 { o.log.Printf("warning: ssh %s path failed for %s: %v; trying %s path", attemptNames[i], node, err, attemptNames[i+1]) } } return lastOut, lastErr } func (o *Orchestrator) run(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) { runCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() return o.runner.Run(runCtx, name, args...) 
} func (o *Orchestrator) runSensitive(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) { runCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() cmd := exec.CommandContext(runCtx, name, args...) cmd.Env = os.Environ() if o.runner.Kubeconfig != "" { cmd.Env = append(cmd.Env, "KUBECONFIG="+o.runner.Kubeconfig) } out, err := cmd.CombinedOutput() trimmed := strings.TrimSpace(string(out)) if err != nil { if trimmed == "" { return "", fmt.Errorf("%s failed: %w", name, err) } return trimmed, fmt.Errorf("%s failed: %w", name, err) } return trimmed, nil } func lines(in string) []string { in = strings.TrimSpace(in) if in == "" { return nil } parts := strings.Split(in, "\n") out := make([]string, 0, len(parts)) for _, p := range parts { v := strings.TrimSpace(p) if v != "" { out = append(out, v) } } return out } func (o *Orchestrator) sshManaged(node string) bool { if len(o.cfg.SSHManagedNodes) == 0 { return true } for _, allowed := range o.cfg.SSHManagedNodes { if strings.TrimSpace(allowed) == node { return true } } return false } func (o *Orchestrator) resolveSSHConfigFile() string { if strings.TrimSpace(o.cfg.SSHConfigFile) != "" { return strings.TrimSpace(o.cfg.SSHConfigFile) } candidates := []string{ "/home/atlas/.ssh/config", "/home/tethys/.ssh/config", } for _, p := range candidates { if stat, err := os.Stat(p); err == nil && !stat.IsDir() { return p } } return "" } func (o *Orchestrator) resolveSSHIdentityFile() string { if strings.TrimSpace(o.cfg.SSHIdentityFile) != "" { return strings.TrimSpace(o.cfg.SSHIdentityFile) } candidates := []string{ "/home/atlas/.ssh/id_ed25519", "/home/tethys/.ssh/id_ed25519", } for _, p := range candidates { if stat, err := os.Stat(p); err == nil && !stat.IsDir() { return p } } return "" } func (o *Orchestrator) bestEffort(name string, fn func() error) { if err := fn(); err != nil { o.log.Printf("warning: %s: %v", name, err) } } func (o *Orchestrator) missingCriticalStartupWorkloads(ctx 
context.Context) ([]string, error) { missing := []string{} for _, w := range criticalStartupWorkloads { ready, err := o.workloadReady(ctx, w) if err != nil { if isNotFoundErr(err) { missing = append(missing, fmt.Sprintf("%s/%s/%s(not found)", w.Namespace, w.Kind, w.Name)) continue } return nil, fmt.Errorf("check %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, err) } if !ready { missing = append(missing, fmt.Sprintf("%s/%s/%s", w.Namespace, w.Kind, w.Name)) } } return missing, nil } func (o *Orchestrator) ensureCriticalStartupWorkloads(ctx context.Context) error { for _, w := range criticalStartupWorkloads { if err := o.ensureWorkloadReplicas(ctx, w, 1); err != nil { if isNotFoundErr(err) { o.log.Printf("warning: startup workload missing, skipping scale: %s/%s/%s", w.Namespace, w.Kind, w.Name) continue } return fmt.Errorf("scale %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, err) } if err := o.cleanupStaleCriticalWorkloadPods(ctx, w); err != nil { if isNotFoundErr(err) { o.log.Printf("warning: startup workload missing during stale-pod cleanup: %s/%s/%s", w.Namespace, w.Kind, w.Name) continue } return fmt.Errorf("cleanup stale pods %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, err) } if err := o.waitWorkloadReady(ctx, w); err != nil { if isNotFoundErr(err) { o.log.Printf("warning: startup workload missing during readiness wait: %s/%s/%s", w.Namespace, w.Kind, w.Name) continue } return err } } return nil } func (o *Orchestrator) cleanupStaleCriticalWorkloadPods(ctx context.Context, w startupWorkload) error { if o.runner.DryRun { return nil } if w.Kind != "statefulset" { return nil } out, err := o.kubectl( ctx, 20*time.Second, "-n", w.Namespace, "get", "pods", "-o", "custom-columns=NAME:.metadata.name,PHASE:.status.phase,OWNER_KIND:.metadata.ownerReferences[0].kind,OWNER_NAME:.metadata.ownerReferences[0].name", "--no-headers", ) if err != nil { return err } prefix := w.Name + "-" for _, line := range lines(out) { fields := strings.Fields(line) if len(fields) < 4 { continue } 
podName := fields[0] phase := strings.ToLower(strings.TrimSpace(fields[1])) ownerKind := strings.TrimSpace(fields[2]) ownerName := strings.TrimSpace(fields[3]) if !strings.EqualFold(ownerKind, "StatefulSet") || ownerName != w.Name { continue } if !strings.HasPrefix(podName, prefix) { continue } if phase != "unknown" && phase != "failed" { continue } o.log.Printf("warning: deleting stale critical pod %s/%s phase=%s before readiness wait", w.Namespace, podName, phase) if _, delErr := o.kubectl( ctx, 40*time.Second, "-n", w.Namespace, "delete", "pod", podName, "--grace-period=0", "--force", "--wait=false", ); delErr != nil { return fmt.Errorf("delete stale pod %s/%s: %w", w.Namespace, podName, delErr) } } return nil } func (o *Orchestrator) ensureWorkloadReplicas(ctx context.Context, w startupWorkload, replicas int) error { _, err := o.kubectl( ctx, 45*time.Second, "-n", w.Namespace, "scale", w.Kind, w.Name, fmt.Sprintf("--replicas=%d", replicas), ) return err } func (o *Orchestrator) waitWorkloadReady(ctx context.Context, w startupWorkload) error { if w.Namespace == "vault" && w.Kind == "statefulset" && w.Name == "vault" { return o.waitVaultReady(ctx, w) } timeout := "240s" if w.Kind == "statefulset" { timeout = "360s" } _, err := o.kubectl( ctx, 7*time.Minute, "-n", w.Namespace, "rollout", "status", fmt.Sprintf("%s/%s", w.Kind, w.Name), "--timeout="+timeout, ) if err != nil { return fmt.Errorf("wait ready %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, err) } return nil } func (o *Orchestrator) waitVaultReady(ctx context.Context, w startupWorkload) error { if o.runner.DryRun { return nil } deadline := time.Now().Add(7 * time.Minute) var lastErr error for time.Now().Before(deadline) { ready, err := o.workloadReady(ctx, w) if err == nil && ready { return nil } if err != nil { lastErr = err } if err := o.ensureVaultUnsealed(ctx); err != nil { lastErr = err o.log.Printf("warning: vault readiness still blocked: %v", err) } ready, err = o.workloadReady(ctx, w) if err == nil 
&& ready { return nil } if err != nil { lastErr = err } select { case <-ctx.Done(): return ctx.Err() case <-time.After(5 * time.Second): } } if lastErr != nil { return fmt.Errorf("wait ready %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, lastErr) } return fmt.Errorf("wait ready %s/%s/%s: timeout", w.Namespace, w.Kind, w.Name) } func (o *Orchestrator) ensureVaultUnsealed(ctx context.Context) error { if o.runner.DryRun { return nil } phase, err := o.kubectl(ctx, 15*time.Second, "-n", "vault", "get", "pod", "vault-0", "-o", "jsonpath={.status.phase}") if err != nil { return fmt.Errorf("vault pod phase check failed: %w", err) } if strings.TrimSpace(phase) != "Running" { return fmt.Errorf("vault-0 pod phase is %q", strings.TrimSpace(phase)) } sealed, err := o.vaultSealed(ctx) if err != nil { return err } if !sealed { return nil } unsealKey, err := o.vaultUnsealKey(ctx) if err != nil { return err } for attempt := 1; attempt <= 5; attempt++ { o.log.Printf("vault is sealed; attempting auto-unseal (%d/5)", attempt) if _, err := o.runSensitive( ctx, 30*time.Second, "kubectl", "-n", "vault", "exec", "vault-0", "--", "vault", "operator", "unseal", unsealKey, ); err != nil { return fmt.Errorf("vault unseal attempt %d failed: %w", attempt, err) } sealed, err = o.vaultSealed(ctx) if err != nil { return err } if !sealed { o.log.Printf("vault auto-unseal succeeded") return nil } time.Sleep(2 * time.Second) } return fmt.Errorf("vault remained sealed after 5 auto-unseal attempts") } func (o *Orchestrator) vaultSealed(ctx context.Context) (bool, error) { out, err := o.kubectl( ctx, 25*time.Second, "-n", "vault", "exec", "vault-0", "--", "sh", "-lc", "VAULT_ADDR=http://127.0.0.1:8200 vault status -format=json 2>/dev/null || true", ) if err != nil { return false, fmt.Errorf("vault status check failed: %w", err) } sealed, err := parseVaultSealed(out) if err != nil { return false, fmt.Errorf("parse vault status: %w", err) } return sealed, nil } func parseVaultSealed(raw string) (bool, error) 
{ trimmed := strings.TrimSpace(raw) if trimmed == "" { return false, fmt.Errorf("empty vault status output") } start := strings.Index(trimmed, "{") end := strings.LastIndex(trimmed, "}") if start < 0 || end < 0 || end < start { return false, fmt.Errorf("vault status payload missing JSON object") } payload := trimmed[start : end+1] type vaultStatus struct { Sealed bool `json:"sealed"` } var st vaultStatus if err := json.Unmarshal([]byte(payload), &st); err != nil { return false, err } return st.Sealed, nil } func (o *Orchestrator) vaultUnsealKey(ctx context.Context) (string, error) { out, err := o.kubectl( ctx, 15*time.Second, "-n", "vault", "get", "secret", "vault-init", "-o", "jsonpath={.data.unseal_key_b64}", ) if err == nil { decoded, decodeErr := base64.StdEncoding.DecodeString(strings.TrimSpace(out)) if decodeErr == nil { key := strings.TrimSpace(string(decoded)) if key != "" { o.bestEffort("cache vault unseal key locally", func() error { return o.writeVaultUnsealKeyFile(key) }) return key, nil } err = fmt.Errorf("vault-init unseal key is empty") } else { err = fmt.Errorf("decode vault-init unseal_key_b64: %w", decodeErr) } } else { err = fmt.Errorf("read vault-init secret: %w", err) } fallbackKey, fileErr := o.readVaultUnsealKeyFile() if fileErr == nil { o.log.Printf("warning: using cached vault unseal key from %s", o.cfg.Startup.VaultUnsealKeyFile) return fallbackKey, nil } breakglassKey, breakglassErr := o.readVaultUnsealKeyBreakglass(ctx) if breakglassErr == nil { o.log.Printf("warning: using break-glass vault unseal key command fallback") o.bestEffort("cache vault unseal key locally", func() error { return o.writeVaultUnsealKeyFile(breakglassKey) }) return breakglassKey, nil } return "", fmt.Errorf("%v; fallback %v; break-glass %v", err, fileErr, breakglassErr) } func (o *Orchestrator) readVaultUnsealKeyBreakglass(ctx context.Context) (string, error) { cmd := strings.TrimSpace(o.cfg.Startup.VaultUnsealBreakglassCommand) if cmd == "" { return "", 
fmt.Errorf("break-glass command not configured") } timeout := time.Duration(o.cfg.Startup.VaultUnsealBreakglassTimeout) * time.Second if timeout <= 0 { timeout = 15 * time.Second } out, err := o.runSensitive(ctx, timeout, "sh", "-lc", cmd) if err != nil { return "", fmt.Errorf("run break-glass command: %w", err) } key := strings.TrimSpace(out) if key == "" { return "", fmt.Errorf("break-glass command returned empty output") } return key, nil } func (o *Orchestrator) writeVaultUnsealKeyFile(key string) error { path := strings.TrimSpace(o.cfg.Startup.VaultUnsealKeyFile) if path == "" { return fmt.Errorf("vault unseal key file path is empty") } if err := os.MkdirAll(filepath.Dir(path), 0o700); err != nil { return fmt.Errorf("ensure vault unseal key dir: %w", err) } if err := os.WriteFile(path, []byte(strings.TrimSpace(key)+"\n"), 0o600); err != nil { return fmt.Errorf("write vault unseal key file: %w", err) } return nil } func (o *Orchestrator) readVaultUnsealKeyFile() (string, error) { path := strings.TrimSpace(o.cfg.Startup.VaultUnsealKeyFile) if path == "" { return "", fmt.Errorf("vault unseal key file path is empty") } b, err := os.ReadFile(path) if err != nil { return "", fmt.Errorf("read vault unseal key file %s: %w", path, err) } key := strings.TrimSpace(string(b)) if key == "" { return "", fmt.Errorf("vault unseal key file %s is empty", path) } return key, nil } func (o *Orchestrator) workloadReady(ctx context.Context, w startupWorkload) (bool, error) { out, err := o.kubectl( ctx, 20*time.Second, "-n", w.Namespace, "get", w.Kind, w.Name, "-o", "jsonpath={.status.readyReplicas}", ) if err != nil { return false, err } raw := strings.TrimSpace(out) if raw == "" || raw == "" { return false, nil } n, err := strconv.Atoi(raw) if err != nil { return false, fmt.Errorf("parse readyReplicas %q: %w", raw, err) } return n >= 1, nil } func isNotFoundErr(err error) bool { if err == nil { return false } msg := strings.ToLower(err.Error()) return strings.Contains(msg, "not 
found") || strings.Contains(msg, "(notfound)") } func (o *Orchestrator) poweroffHosts(ctx context.Context, workers []string) error { delay := o.cfg.Shutdown.PoweroffDelaySeconds if delay <= 0 { delay = 25 } localNames := map[string]struct{}{} if hn, err := os.Hostname(); err == nil && strings.TrimSpace(hn) != "" { localNames[strings.TrimSpace(hn)] = struct{}{} } if o.cfg.SSHUser != "" { localNames[o.cfg.SSHUser] = struct{}{} } hostSet := map[string]struct{}{} for _, n := range o.cfg.ControlPlanes { hostSet[n] = struct{}{} } for _, n := range workers { hostSet[n] = struct{}{} } for _, n := range o.cfg.Shutdown.ExtraPoweroffHosts { if strings.TrimSpace(n) != "" { hostSet[strings.TrimSpace(n)] = struct{}{} } } hosts := make([]string, 0, len(hostSet)) for h := range hostSet { hosts = append(hosts, h) } sort.Strings(hosts) remoteCmd := fmt.Sprintf(`sudo nohup sh -c 'sleep %d; systemctl poweroff' >/dev/null 2>&1 &`, delay) for _, host := range hosts { host = strings.TrimSpace(host) if host == "" { continue } if _, isLocal := localNames[host]; isLocal { continue } o.bestEffort("schedule poweroff on "+host, func() error { _, err := o.ssh(ctx, host, remoteCmd) return err }) } if o.cfg.Shutdown.PoweroffLocalHost { o.bestEffort("schedule local host poweroff", func() error { _, err := o.run(ctx, 5*time.Second, "sh", "-c", fmt.Sprintf("nohup sh -c 'sleep %d; systemctl poweroff' >/dev/null 2>&1 &", delay+10)) return err }) } return nil }