package cluster

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"strings"
	"time"

	"scm.bstein.dev/bstein/ananke/internal/state"
)

// Startup brings the cluster up end to end: it validates node inventory and
// SSH reachability, guards against conflicting shutdown intents, starts
// control planes and workers, waits for the Kubernetes API (optionally
// attempting an automatic etcd restore on failure), validates and resumes
// Flux, restores scaled workloads, and finishes with convergence and
// post-start probe checks.
func (o *Orchestrator) Startup(ctx context.Context, opts StartupOptions) (err error) {
	unlock, err := state.AcquireLock(o.cfg.State.LockPath)
	if err != nil {
		return err
	}
	defer unlock()

	o.beginStartupReport(opts.Reason)
	// Wrap in a closure so the final value of err is reported; a plain
	// `defer o.finalizeStartupReport(err)` would capture the nil it holds here.
	defer func() { o.finalizeStartupReport(err) }()
	o.setStartupPhase("preflight-node-inventory", "validating configured node inventory")

	record := state.RunRecord{
		ID:        fmt.Sprintf("startup-%d", time.Now().UnixNano()),
		Action:    "startup",
		Reason:    opts.Reason,
		DryRun:    o.runner.DryRun,
		StartedAt: time.Now().UTC(),
	}
	defer o.finalizeRecord(&record, &err)

	if invErr := o.validateNodeInventory(); invErr != nil {
		o.noteStartupCheck("node-inventory", false, invErr.Error())
		return invErr
	}
	o.noteStartupCheck("node-inventory", true, "inventory/user/port validation passed")

	o.setStartupPhase("preflight-node-reachability", "waiting for ssh reachability across configured inventory")
	if reachErr := o.waitForNodeInventoryReachability(ctx); reachErr != nil {
		o.noteStartupCheck("node-inventory-reachability", false, reachErr.Error())
		return reachErr
	}
	o.noteStartupCheck("node-inventory-reachability", true, "all expected nodes responded over SSH")

	resumedFlux := false
	defer func() {
		if o.runner.DryRun || err == nil || resumedFlux {
			return
		}
		o.log.Printf("warning: startup failed before normal flux resume; attempting best-effort recovery resume")
		o.bestEffort("restore scaled workloads after failed startup", func() error { return o.restoreScaledApps(ctx) })
		o.bestEffort("resume flux after failed startup", func() error { return o.resumeFluxAndReconcile(ctx) })
	}()

	if !o.runner.DryRun {
		currentIntent, readErr := state.ReadIntent(o.cfg.State.IntentPath)
		if readErr != nil {
			return fmt.Errorf("read startup intent: %w", readErr)
		}
		if currentIntent.State == state.IntentStartupInProgress {
			o.log.Printf("warning: detected stale startup intent from a previous interrupted run; clearing it before continuing")
			if clearErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentNormal, "auto-clear stale startup intent", "startup"); clearErr != nil {
				return fmt.Errorf("clear stale startup intent: %w", clearErr)
			}
			currentIntent = state.Intent{State: state.IntentNormal}
		}
		if currentIntent.State == state.IntentShuttingDown {
			if intentFresh(currentIntent, o.startupGuardAge()) {
				return fmt.Errorf("startup blocked: shutdown intent is active (%s)", currentIntent.Reason)
			}
			o.log.Printf("warning: local shutdown intent appears stale (updated_at=%s reason=%q); auto-clearing to continue startup", currentIntent.UpdatedAt.Format(time.RFC3339), currentIntent.Reason)
			if clearErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentNormal, "auto-clear stale shutdown intent", "startup"); clearErr != nil {
				return fmt.Errorf("clear stale shutdown intent: %w", clearErr)
			}
			currentIntent = state.Intent{State: state.IntentNormal}
		}
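		// A shutdown that completed only moments ago blocks an immediate
		// restart: the cooldown below waits out the remaining window and
		// then re-reads the intent file before continuing, so a cluster
		// is never brought back up mid-settle or during a new shutdown.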
		cooldown := o.startupShutdownCooldown()
		if currentIntent.State == state.IntentShutdownComplete && intentFresh(currentIntent, cooldown) {
			elapsed := intentAge(currentIntent)
			remaining := cooldown - elapsed
			if remaining < time.Second {
				remaining = time.Second
			}
			o.log.Printf("startup cooldown active: last shutdown completed %s ago; waiting %s", elapsed.Round(time.Second), remaining.Round(time.Second))
			timer := time.NewTimer(remaining)
			select {
			case <-ctx.Done():
				timer.Stop()
				return fmt.Errorf("startup canceled while waiting for shutdown cooldown: %w", ctx.Err())
			case <-timer.C:
			}
			refreshed, readErr := state.ReadIntent(o.cfg.State.IntentPath)
			if readErr != nil {
				return fmt.Errorf("re-read startup intent after cooldown wait: %w", readErr)
			}
			currentIntent = refreshed
			if currentIntent.State == state.IntentShuttingDown && intentFresh(currentIntent, o.startupGuardAge()) {
				return fmt.Errorf("startup blocked: shutdown intent became active during cooldown wait (%s)", currentIntent.Reason)
			}
			if currentIntent.State == state.IntentShutdownComplete && intentFresh(currentIntent, cooldown) {
				return fmt.Errorf("startup blocked: shutdown completed too recently (%s ago)", intentAge(currentIntent).Round(time.Second))
			}
		}
		if err := o.guardPeerStartupIntents(ctx); err != nil {
			return err
		}
		if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentStartupInProgress, opts.Reason, "startup"); writeErr != nil {
			return fmt.Errorf("set startup intent: %w", writeErr)
		}
		defer func() {
			finalReason := opts.Reason
			if err != nil {
				finalReason = fmt.Sprintf("%s (failed)", strings.TrimSpace(opts.Reason))
			}
			if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentNormal, finalReason, "startup"); writeErr != nil {
				o.log.Printf("warning: write startup completion intent failed: %v", writeErr)
			}
		}()
	}

	o.log.Printf("startup control-planes=%s", strings.Join(o.cfg.ControlPlanes, ","))

	if o.cfg.Startup.RequireTimeSync {
		o.noteStartupCheckState("time-sync", "running", "waiting for node clock synchronization")
		o.setStartupPhase("preflight-time-sync", "waiting for control-plane time sync quorum")
		if err := o.waitForTimeSync(ctx, o.cfg.ControlPlanes); err != nil {
			o.noteStartupCheck("time-sync", false, err.Error())
			return err
		}
		o.noteStartupCheck("time-sync", true, "time synchronization healthy")
	}

	o.noteStartupCheckState("datastore-preflight", "running", "checking datastore endpoint health")
	o.setStartupPhase("preflight-datastore", "checking k3s datastore endpoint")
	if err := o.preflightExternalDatastore(ctx); err != nil {
		o.noteStartupCheck("datastore-preflight", false, err.Error())
		return err
	}
	o.noteStartupCheck("datastore-preflight", true, "datastore endpoint accepted connections")

	o.bestEffort("sync local titan-iac checkout", func() error { return o.syncLocalIACRepo(ctx) })
	o.bestEffort("refresh bootstrap cache from local repo", func() error { return o.refreshBootstrapCache(ctx) })
	if o.cfg.Startup.ReconcileAccessOnBoot {
		o.bestEffort("reconcile control-plane access", func() error { return o.reconcileNodeAccess(ctx, o.cfg.ControlPlanes) })
	}
	o.reportFluxSource(ctx, opts.ForceFluxBranch)

	o.setStartupPhase("control-plane-start", "starting control-plane nodes")
	o.startControlPlanes(ctx, o.cfg.ControlPlanes)
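	// The API wait budget is expressed as total seconds divided into poll
	// attempts: with, say, APIWaitSeconds=300 and APIPollSeconds=5 (values
	// illustrative, not defaults from this repo), waitForAPI gets 60
	// attempts at 5-second intervals.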
	pollSeconds := o.cfg.Startup.APIPollSeconds
	if pollSeconds < 1 {
		pollSeconds = 1 // guard against a zero or negative poll interval from config, which would divide by zero below
	}
	apiPoll := time.Duration(pollSeconds) * time.Second
	apiAttempts := o.cfg.Startup.APIWaitSeconds / pollSeconds
	if apiAttempts < 1 {
		apiAttempts = 1
	}
	o.noteStartupCheckState("kubernetes-api", "running", "waiting for kubernetes api availability")
	o.setStartupPhase("api-wait", "waiting for kubernetes api")
	if err := o.waitForAPI(ctx, apiAttempts, apiPoll); err != nil {
		if !o.cfg.Startup.AutoEtcdRestoreOnAPIFailure {
			o.noteStartupCheck("kubernetes-api", false, err.Error())
			return err
		}
		cp := strings.TrimSpace(o.cfg.Startup.EtcdRestoreControlPlane)
		if cp == "" && len(o.cfg.ControlPlanes) > 0 {
			cp = o.cfg.ControlPlanes[0]
		}
		o.log.Printf("warning: initial API wait failed (%v); attempting automatic etcd restore on %s", err, cp)
		if restoreErr := o.EtcdRestore(ctx, EtcdRestoreOptions{ControlPlane: cp}); restoreErr != nil {
			if errors.Is(restoreErr, ErrEtcdRestoreNotApplicable) {
				o.log.Printf("warning: automatic etcd restore skipped: %v", restoreErr)
				o.log.Printf("warning: retrying control-plane start because datastore recovery path is external")
				o.startControlPlanes(ctx, o.cfg.ControlPlanes)
			} else {
				return fmt.Errorf("kubernetes API did not become reachable and automatic etcd restore failed: %w", restoreErr)
			}
		}
		if err := o.waitForAPI(ctx, apiAttempts, apiPoll); err != nil {
			o.noteStartupCheck("kubernetes-api", false, err.Error())
			return fmt.Errorf("kubernetes API did not become reachable after automatic etcd restore: %w", err)
		}
	}
	o.noteStartupCheck("kubernetes-api", true, "kubernetes api reachable")

	if err := o.ensureRequiredNodeLabels(ctx); err != nil {
		return err
	}

	desiredFluxBranch := strings.TrimSpace(opts.ForceFluxBranch)
	if desiredFluxBranch == "" {
		desiredFluxBranch = strings.TrimSpace(o.cfg.ExpectedFluxBranch)
	}
	allowFluxBranchPatch := strings.TrimSpace(opts.ForceFluxBranch) != ""
	o.noteStartupCheckState("flux-source-guard", "running", "validating flux source url and branch drift")
	o.setStartupPhase("flux-guard", "validating flux source url and branch")
	if err := o.guardFluxSourceDrift(ctx, desiredFluxBranch, allowFluxBranchPatch); err != nil {
		o.noteStartupCheck("flux-source-guard", false, err.Error())
		return err
	}
	if err := o.ensureFluxBranch(ctx, desiredFluxBranch, allowFluxBranchPatch); err != nil {
		o.noteStartupCheck("flux-source-guard", false, err.Error())
		return err
	}
	o.noteStartupCheck("flux-source-guard", true, fmt.Sprintf("flux source branch/url validated (branch=%s)", desiredFluxBranch))

	workers, err := o.effectiveWorkers(ctx)
	if err != nil {
		return err
	}
	o.log.Printf("startup workers=%s", strings.Join(workers, ","))

	o.setStartupPhase("worker-start", "starting and uncordoning worker nodes")
	if o.cfg.Startup.ReconcileAccessOnBoot {
		o.bestEffort("reconcile worker access", func() error { return o.reconcileNodeAccess(ctx, workers) })
	}
	o.startWorkers(ctx, workers)
	o.bestEffort("uncordon workers", func() error { return o.uncordonWorkers(ctx, workers) })

	sshCheckNodes := append([]string{}, o.cfg.ControlPlanes...)
	sshCheckNodes = append(sshCheckNodes, workers...)
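	// The recovery steps below run commands on the nodes over SSH, so
	// confirm key-based auth works across the combined control-plane and
	// worker set before deciding whether a local bootstrap is needed.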
	if err := o.waitForNodeSSHAuth(ctx, sshCheckNodes); err != nil {
		o.noteStartupCheck("node-ssh-auth", false, err.Error())
		return err
	}
	o.noteStartupCheck("node-ssh-auth", true, fmt.Sprintf("nodes=%d", len(sshCheckNodes)))

	needsLocalBootstrap := false
	bootstrapReasons := []string{}
	if !opts.SkipLocalBootstrap {
		ready, readyErr := o.fluxSourceReady(ctx)
		if readyErr != nil {
			o.log.Printf("warning: unable to read flux source readiness: %v", readyErr)
			needsLocalBootstrap = true
			bootstrapReasons = append(bootstrapReasons, "flux source readiness check failed")
		} else if !ready { // else-if so a failed check does not also record "not ready"
			needsLocalBootstrap = true
			bootstrapReasons = append(bootstrapReasons, "flux source not ready")
		}
	}

	missing, missingErr := o.missingCriticalStartupWorkloads(ctx)
	if missingErr != nil {
		o.log.Printf("warning: unable to inspect critical startup workloads: %v", missingErr)
	}
	if len(missing) > 0 {
		o.log.Printf("startup critical workloads not ready; applying targeted recovery first: %s", strings.Join(missing, ", "))
	}

	if o.cfg.Startup.RequireStorageReady {
		o.noteStartupCheckState("storage-readiness", "running", "waiting for longhorn and critical pvc readiness")
		o.setStartupPhase("storage-readiness", "waiting for longhorn and critical pvcs")
		if err := o.waitForStorageReady(ctx); err != nil {
			o.noteStartupCheck("storage-readiness", false, err.Error())
			return err
		}
		o.noteStartupCheck("storage-readiness", true, "longhorn and critical PVCs ready")
	}

	o.noteStartupCheckState("critical-workloads", "running", "ensuring critical startup workloads have replicas")
	o.setStartupPhase("critical-workloads", "ensuring critical startup workloads are ready")
	if err := o.ensureCriticalStartupWorkloads(ctx); err != nil {
		o.noteStartupCheck("critical-workloads", false, err.Error())
		return err
	}
	o.noteStartupCheck("critical-workloads", true, "critical startup workloads are ready")

	if !opts.SkipLocalBootstrap && needsLocalBootstrap {
		if ready, err := o.waitForFluxSourceReady(ctx, 5*time.Minute); err != nil {
			o.log.Printf("warning: flux source readiness wait failed before local bootstrap: %v", err)
		} else if ready {
			o.log.Printf("flux source became ready after targeted recovery; skipping local bootstrap")
			needsLocalBootstrap = false
		}
	}
	if !opts.SkipLocalBootstrap && needsLocalBootstrap {
		o.log.Printf("startup bootstrap required after wait: %s", strings.Join(bootstrapReasons, "; "))
		if err := o.bootstrapLocal(ctx); err != nil {
			return err
		}
		if err := o.ensureCriticalStartupWorkloads(ctx); err != nil {
			return err
		}
		ready, err := o.fluxSourceReady(ctx)
		if err != nil {
			return fmt.Errorf("flux source readiness after bootstrap: %w", err)
		}
		if !ready {
			return fmt.Errorf("flux source still not ready after local bootstrap")
		}
	}

	o.bestEffort("restore scaled workloads", func() error { return o.restoreScaledApps(ctx) })

	o.noteStartupCheckState("flux-resume-reconcile", "running", "resuming flux and waiting for reconcile")
	o.setStartupPhase("flux-resume", "resuming flux controllers and reconciling kustomizations")
	if err := o.resumeFluxAndReconcile(ctx); err != nil {
		o.noteStartupCheck("flux-resume-reconcile", false, err.Error())
		return err
	}
	o.noteStartupCheck("flux-resume-reconcile", true, "flux resumed and reconciled")
	resumedFlux = true

	o.bestEffort("heal critical zero-replica workloads", func() error {
		healed, healErr := o.healCriticalWorkloadReplicas(ctx)
		if healErr != nil {
			return healErr
		}
		if len(healed) > 0 {
			sort.Strings(healed)
			o.noteStartupAutoHeal(fmt.Sprintf("restored critical workload replicas: %s", joinLimited(healed, 8)))
		}
		return nil
	})
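	// With flux resumed, the remaining phases are pure verification:
	// convergence across ingress, services, flux, and workloads, then the
	// optional post-start probes.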
	o.setStartupPhase("convergence-checks", "waiting for ingress, service, flux, workload, and stability checks")
	if err := o.waitForStartupConvergence(ctx); err != nil {
		return err
	}

	if o.cfg.Startup.RequirePostStartProbes {
		o.noteStartupCheckState("post-start-probes", "running", "waiting for post-start probe endpoints")
		o.setStartupPhase("post-start-probes", "running post-start probe checks")
		if err := o.waitForPostStartProbes(ctx); err != nil {
			o.noteStartupCheck("post-start-probes", false, err.Error())
			return err
		}
		o.noteStartupCheck("post-start-probes", true, "post-start probes passed")
	}

	o.setStartupPhase("startup-complete", "startup workflow reached completion")
	o.log.Printf("startup flow complete")
	return nil
}

// EtcdRestore restores the k3s embedded etcd datastore on one control plane
// from a snapshot: it stops k3s on every control plane (best-effort), runs a
// cluster-reset restore on the target, then restarts k3s on the target
// followed by the remaining control planes. It returns
// ErrEtcdRestoreNotApplicable when the target uses an external datastore
// endpoint.
func (o *Orchestrator) EtcdRestore(ctx context.Context, opts EtcdRestoreOptions) error {
	controlPlane := strings.TrimSpace(opts.ControlPlane)
	if controlPlane == "" {
		if len(o.cfg.ControlPlanes) == 0 {
			return fmt.Errorf("cannot restore etcd: no control planes configured")
		}
		controlPlane = o.cfg.ControlPlanes[0]
	}
	found := false
	for _, cp := range o.cfg.ControlPlanes {
		if cp == controlPlane {
			found = true
			break
		}
	}
	if !found {
		return fmt.Errorf("cannot restore etcd: control plane %s is not in configured control_planes", controlPlane)
	}
	if !o.sshManaged(controlPlane) {
		return fmt.Errorf("cannot restore etcd on %s: node not in ssh_managed_nodes", controlPlane)
	}

	snapshotPath := strings.TrimSpace(opts.SnapshotPath)
	if o.runner.DryRun {
		if snapshotPath == "" {
			snapshotPath = "(latest)" // placeholder for logging only; snapshot resolution is skipped in dry-run
		}
		o.log.Printf("etcd restore target=%s snapshot=%s (dry-run; datastore-mode and snapshot checks skipped)", controlPlane, snapshotPath)
		return nil
	}

	externalDatastore, err := o.controlPlaneUsesExternalDatastore(ctx, controlPlane)
	if err != nil {
		return err
	}
	if externalDatastore {
		return fmt.Errorf("%w: %s uses --datastore-endpoint", ErrEtcdRestoreNotApplicable, controlPlane)
	}

	if snapshotPath == "" {
		resolved, err := o.latestEtcdSnapshotPath(ctx, controlPlane)
		if err != nil {
			return err
		}
		snapshotPath = resolved
	}
	if err := o.verifyEtcdSnapshot(ctx, controlPlane, snapshotPath); err != nil {
		return err
	}
	o.log.Printf("etcd restore target=%s snapshot=%s", controlPlane, snapshotPath)

	for _, cp := range o.cfg.ControlPlanes {
		cp := cp
		o.bestEffort("stop k3s before etcd restore on "+cp, func() error {
			_, err := o.ssh(ctx, cp, "sudo systemctl stop k3s || true")
			return err
		})
	}
	if _, err := o.runSudoK3S(ctx, controlPlane, "server", "--cluster-reset", "--cluster-reset-restore-path", snapshotPath); err != nil {
		return fmt.Errorf("etcd restore command failed on %s: %w", controlPlane, err)
	}
	o.log.Printf("etcd restore command completed on %s", controlPlane)
	if _, err := o.ssh(ctx, controlPlane, "sudo systemctl start k3s || true"); err != nil {
		return fmt.Errorf("failed to start k3s on restore control plane %s: %w", controlPlane, err)
	}
	// Give the restored control plane a head start before the peers rejoin.
	time.Sleep(10 * time.Second)
	for _, cp := range o.cfg.ControlPlanes {
		cp := cp
		if cp == controlPlane {
			continue
		}
		o.bestEffort("start k3s after etcd restore on "+cp, func() error {
			_, err := o.ssh(ctx, cp, "sudo systemctl start k3s || true")
			return err
		})
	}
	return nil
}
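// Shutdown below is the write side of the intent handshake that Startup
// guards against: it records IntentShuttingDown before any destructive step
// and IntentShutdownComplete on success, which are exactly the states the
// stale-intent and cooldown checks in Startup key off.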
// Shutdown winds the cluster down in order: it records a shutdown intent,
// optionally snapshots etcd, suspends flux, scales down apps, drains workers,
// then stops worker and control-plane nodes. It never powers off hosts; see
// normalizeShutdownMode.
func (o *Orchestrator) Shutdown(ctx context.Context, opts ShutdownOptions) (err error) {
	unlock, err := state.AcquireLock(o.cfg.State.LockPath)
	if err != nil {
		return err
	}
	defer unlock()

	if invErr := o.validateNodeInventory(); invErr != nil {
		return invErr
	}

	record := state.RunRecord{
		ID:        fmt.Sprintf("shutdown-%d", time.Now().UnixNano()),
		Action:    "shutdown",
		Reason:    opts.Reason,
		DryRun:    o.runner.DryRun,
		StartedAt: time.Now().UTC(),
	}
	defer o.finalizeRecord(&record, &err)

	if !o.runner.DryRun {
		if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentShuttingDown, opts.Reason, "shutdown"); writeErr != nil {
			return fmt.Errorf("set shutdown intent: %w", writeErr)
		}
		defer func() {
			final := state.IntentShuttingDown
			if err == nil {
				final = state.IntentShutdownComplete
			}
			if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, final, opts.Reason, "shutdown"); writeErr != nil {
				o.log.Printf("warning: write shutdown completion intent failed: %v", writeErr)
			}
		}()
	}

	workers, err := o.effectiveWorkers(ctx)
	if err != nil {
		return err
	}
	o.log.Printf("shutdown control-planes=%s workers=%s", strings.Join(o.cfg.ControlPlanes, ","), strings.Join(workers, ","))
	o.reportFluxSource(ctx, "")

	skipEtcd := opts.SkipEtcdSnapshot || o.cfg.Shutdown.SkipEtcdSnapshot
	if !skipEtcd {
		o.bestEffort("etcd snapshot", func() error { return o.takeEtcdSnapshot(ctx, o.cfg.ControlPlanes[0]) })
	}
	o.bestEffort("suspend flux", func() error { return o.patchFluxSuspendAll(ctx, true) })
	o.bestEffort("scale down apps", func() error { return o.scaleDownApps(ctx) })
	skipDrain := opts.SkipDrain || o.cfg.Shutdown.SkipDrain
	if !skipDrain {
		o.bestEffort("drain workers", func() error { return o.drainWorkers(ctx, workers) })
	}

	shutdownMode := strings.TrimSpace(opts.Mode)
	effectiveMode, modeErr := normalizeShutdownMode(shutdownMode)
	if modeErr != nil {
		return modeErr
	}
	o.log.Printf("shutdown execution mode=%s (requested=%q)", effectiveMode, shutdownMode)

	o.stopWorkers(ctx, workers)
	o.stopControlPlanes(ctx, o.cfg.ControlPlanes)
	o.log.Printf("shutdown flow complete")
	return nil
}

// normalizeShutdownMode maps a requested shutdown mode to its canonical form,
// allowing only cluster-only semantics while preserving compatibility with
// legacy "config" callers. The removed "poweroff" mode is rejected with an
// explicit error rather than silently ignored.
func normalizeShutdownMode(raw string) (string, error) {
	switch strings.TrimSpace(raw) {
	case "", "config", "cluster-only":
		return "cluster-only", nil
	case "poweroff":
		return "", fmt.Errorf("shutdown mode %q has been removed; ananke no longer powers off hosts", raw)
	default:
		return "", fmt.Errorf("unsupported shutdown mode %q (expected config|cluster-only)", raw)
	}
}
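// For reference, the mapping above works out to:
//
//	normalizeShutdownMode("")              -> "cluster-only", nil
//	normalizeShutdownMode("config")        -> "cluster-only", nil  (legacy alias)
//	normalizeShutdownMode("cluster-only")  -> "cluster-only", nil
//	normalizeShutdownMode("poweroff")      -> error: mode removed
//	normalizeShutdownMode("anything-else") -> error: unsupported mode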