// ananke/internal/cluster/orchestrator_lifecycle.go
package cluster
import (
"context"
"errors"
"fmt"
"sort"
"strings"
"time"
"scm.bstein.dev/bstein/ananke/internal/state"
)
// Startup runs the full cluster startup workflow end to end while holding the
// orchestrator state lock: node inventory and reachability preflights, stale
// intent guarding and post-shutdown cooldown, time-sync/datastore checks,
// control-plane start, API wait (with optional automatic etcd restore), flux
// source guards, worker start, storage and critical-workload readiness,
// optional local bootstrap, flux resume, and final convergence and
// post-start probe checks.
//
// The named return err is observed by the deferred report, record, intent,
// and best-effort recovery handlers, so every failure path must flow
// through it.
func (o *Orchestrator) Startup(ctx context.Context, opts StartupOptions) (err error) {
	// Serialize against any other orchestrator run on this host.
	unlock, err := state.AcquireLock(o.cfg.State.LockPath)
	if err != nil {
		return err
	}
	defer unlock()
	o.beginStartupReport(opts.Reason)
	// Wrap in a closure so the *final* value of err is reported. A plain
	// `defer o.finalizeStartupReport(err)` evaluates err eagerly at defer
	// time (always nil here), so the report would never record failures.
	defer func() { o.finalizeStartupReport(err) }()
	o.setStartupPhase("preflight-node-inventory", "validating configured node inventory")
	record := state.RunRecord{
		ID:        fmt.Sprintf("startup-%d", time.Now().UnixNano()),
		Action:    "startup",
		Reason:    opts.Reason,
		DryRun:    o.runner.DryRun,
		StartedAt: time.Now().UTC(),
	}
	defer o.finalizeRecord(&record, &err)
	if invErr := o.validateNodeInventory(); invErr != nil {
		o.noteStartupCheck("node-inventory", false, invErr.Error())
		return invErr
	}
	o.noteStartupCheck("node-inventory", true, "inventory/user/port validation passed")
	o.setStartupPhase("preflight-node-reachability", "waiting for ssh reachability across configured inventory")
	if reachErr := o.waitForNodeInventoryReachability(ctx); reachErr != nil {
		o.noteStartupCheck("node-inventory-reachability", false, reachErr.Error())
		return reachErr
	}
	o.noteStartupCheck("node-inventory-reachability", true, "all expected nodes responded over SSH")
	// If startup fails after this point but before flux was resumed normally,
	// attempt a best-effort recovery so the cluster is not left suspended.
	resumedFlux := false
	defer func() {
		if o.runner.DryRun || err == nil || resumedFlux {
			return
		}
		o.log.Printf("warning: startup failed before normal flux resume; attempting best-effort recovery resume")
		o.bestEffort("restore scaled workloads after failed startup", func() error { return o.restoreScaledApps(ctx) })
		o.bestEffort("resume flux after failed startup", func() error { return o.resumeFluxAndReconcile(ctx) })
	}()
	if !o.runner.DryRun {
		// Intent guarding: refuse to start while a fresh shutdown is active,
		// clear stale intents, and honor the post-shutdown cooldown.
		currentIntent, readErr := state.ReadIntent(o.cfg.State.IntentPath)
		if readErr != nil {
			return fmt.Errorf("read startup intent: %w", readErr)
		}
		if currentIntent.State == state.IntentStartupInProgress {
			o.log.Printf("warning: detected stale startup intent from a previous interrupted run; clearing it before continuing")
			if clearErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentNormal, "auto-clear stale startup intent", "startup"); clearErr != nil {
				return fmt.Errorf("clear stale startup intent: %w", clearErr)
			}
			currentIntent = state.Intent{State: state.IntentNormal}
		}
		if currentIntent.State == state.IntentShuttingDown {
			if intentFresh(currentIntent, o.startupGuardAge()) {
				return fmt.Errorf("startup blocked: shutdown intent is active (%s)", currentIntent.Reason)
			}
			o.log.Printf("warning: local shutdown intent appears stale (updated_at=%s reason=%q); auto-clearing to continue startup",
				currentIntent.UpdatedAt.Format(time.RFC3339), currentIntent.Reason)
			if clearErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentNormal, "auto-clear stale shutdown intent", "startup"); clearErr != nil {
				return fmt.Errorf("clear stale shutdown intent: %w", clearErr)
			}
			currentIntent = state.Intent{State: state.IntentNormal}
		}
		cooldown := o.startupShutdownCooldown()
		if currentIntent.State == state.IntentShutdownComplete && intentFresh(currentIntent, cooldown) {
			elapsed := intentAge(currentIntent)
			remaining := cooldown - elapsed
			if remaining < time.Second {
				remaining = time.Second
			}
			o.log.Printf("startup cooldown active: last shutdown completed %s ago; waiting %s", elapsed.Round(time.Second), remaining.Round(time.Second))
			timer := time.NewTimer(remaining)
			select {
			case <-ctx.Done():
				timer.Stop()
				return fmt.Errorf("startup canceled while waiting for shutdown cooldown: %w", ctx.Err())
			case <-timer.C:
			}
			// Re-check intent after sleeping: another actor may have begun a
			// shutdown (or another shutdown may have completed) in the interim.
			refreshed, readErr := state.ReadIntent(o.cfg.State.IntentPath)
			if readErr != nil {
				return fmt.Errorf("re-read startup intent after cooldown wait: %w", readErr)
			}
			currentIntent = refreshed
			if currentIntent.State == state.IntentShuttingDown && intentFresh(currentIntent, o.startupGuardAge()) {
				return fmt.Errorf("startup blocked: shutdown intent became active during cooldown wait (%s)", currentIntent.Reason)
			}
			if currentIntent.State == state.IntentShutdownComplete && intentFresh(currentIntent, cooldown) {
				return fmt.Errorf("startup blocked: shutdown completed too recently (%s ago)", intentAge(currentIntent).Round(time.Second))
			}
		}
		if err := o.guardPeerStartupIntents(ctx); err != nil {
			return err
		}
		if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentStartupInProgress, opts.Reason, "startup"); writeErr != nil {
			return fmt.Errorf("set startup intent: %w", writeErr)
		}
		// On exit, return the intent to normal, annotating the reason on failure.
		defer func() {
			finalReason := opts.Reason
			if err != nil {
				finalReason = fmt.Sprintf("%s (failed)", strings.TrimSpace(opts.Reason))
			}
			if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentNormal, finalReason, "startup"); writeErr != nil {
				o.log.Printf("warning: write startup completion intent failed: %v", writeErr)
			}
		}()
	}
	o.log.Printf("startup control-planes=%s", strings.Join(o.cfg.ControlPlanes, ","))
	if o.cfg.Startup.RequireTimeSync {
		o.noteStartupCheckState("time-sync", "running", "waiting for node clock synchronization")
		o.setStartupPhase("preflight-time-sync", "waiting for control-plane time sync quorum")
		if err := o.waitForTimeSync(ctx, o.cfg.ControlPlanes); err != nil {
			o.noteStartupCheck("time-sync", false, err.Error())
			return err
		}
		o.noteStartupCheck("time-sync", true, "time synchronization healthy")
	}
	o.noteStartupCheckState("datastore-preflight", "running", "checking datastore endpoint health")
	o.setStartupPhase("preflight-datastore", "checking k3s datastore endpoint")
	if err := o.preflightExternalDatastore(ctx); err != nil {
		o.noteStartupCheck("datastore-preflight", false, err.Error())
		return err
	}
	o.noteStartupCheck("datastore-preflight", true, "datastore endpoint accepted connections")
	o.bestEffort("sync local titan-iac checkout", func() error { return o.syncLocalIACRepo(ctx) })
	o.bestEffort("refresh bootstrap cache from local repo", func() error { return o.refreshBootstrapCache(ctx) })
	if o.cfg.Startup.ReconcileAccessOnBoot {
		o.bestEffort("reconcile control-plane access", func() error { return o.reconcileNodeAccess(ctx, o.cfg.ControlPlanes) })
	}
	o.reportFluxSource(ctx, opts.ForceFluxBranch)
	o.setStartupPhase("control-plane-start", "starting control-plane nodes")
	o.startControlPlanes(ctx, o.cfg.ControlPlanes)
	// Derive API wait attempts from configured wait/poll seconds, guarding
	// against a zero or negative poll interval that would otherwise cause an
	// integer divide-by-zero panic on misconfiguration.
	pollSeconds := o.cfg.Startup.APIPollSeconds
	if pollSeconds <= 0 {
		pollSeconds = 1
	}
	apiPoll := time.Duration(pollSeconds) * time.Second
	apiAttempts := o.cfg.Startup.APIWaitSeconds / pollSeconds
	if apiAttempts < 1 {
		apiAttempts = 1
	}
	o.noteStartupCheckState("kubernetes-api", "running", "waiting for kubernetes api availability")
	o.setStartupPhase("api-wait", "waiting for kubernetes api")
	if err := o.waitForAPI(ctx, apiAttempts, apiPoll); err != nil {
		if !o.cfg.Startup.AutoEtcdRestoreOnAPIFailure {
			o.noteStartupCheck("kubernetes-api", false, err.Error())
			return err
		}
		// Optional self-heal: restore etcd on the configured (or first) control plane.
		cp := strings.TrimSpace(o.cfg.Startup.EtcdRestoreControlPlane)
		if cp == "" && len(o.cfg.ControlPlanes) > 0 {
			cp = o.cfg.ControlPlanes[0]
		}
		o.log.Printf("warning: initial API wait failed (%v); attempting automatic etcd restore on %s", err, cp)
		if restoreErr := o.EtcdRestore(ctx, EtcdRestoreOptions{ControlPlane: cp}); restoreErr != nil {
			if errors.Is(restoreErr, ErrEtcdRestoreNotApplicable) {
				// External datastore: etcd restore does not apply; retry the start instead.
				o.log.Printf("warning: automatic etcd restore skipped: %v", restoreErr)
				o.log.Printf("warning: retrying control-plane start because datastore recovery path is external")
				o.startControlPlanes(ctx, o.cfg.ControlPlanes)
			} else {
				return fmt.Errorf("kubernetes API did not become reachable and automatic etcd restore failed: %w", restoreErr)
			}
		}
		if err := o.waitForAPI(ctx, apiAttempts, apiPoll); err != nil {
			o.noteStartupCheck("kubernetes-api", false, err.Error())
			return fmt.Errorf("kubernetes API did not become reachable after automatic etcd restore: %w", err)
		}
	}
	o.noteStartupCheck("kubernetes-api", true, "kubernetes api reachable")
	if err := o.ensureRequiredNodeLabels(ctx); err != nil {
		return err
	}
	// Flux branch selection: an explicit per-run override wins; branch patching
	// is only allowed when the caller forced a branch.
	desiredFluxBranch := strings.TrimSpace(opts.ForceFluxBranch)
	if desiredFluxBranch == "" {
		desiredFluxBranch = strings.TrimSpace(o.cfg.ExpectedFluxBranch)
	}
	allowFluxBranchPatch := strings.TrimSpace(opts.ForceFluxBranch) != ""
	o.noteStartupCheckState("flux-source-guard", "running", "validating flux source url and branch drift")
	o.setStartupPhase("flux-guard", "validating flux source url and branch")
	if err := o.guardFluxSourceDrift(ctx, desiredFluxBranch, allowFluxBranchPatch); err != nil {
		o.noteStartupCheck("flux-source-guard", false, err.Error())
		return err
	}
	if err := o.ensureFluxBranch(ctx, desiredFluxBranch, allowFluxBranchPatch); err != nil {
		o.noteStartupCheck("flux-source-guard", false, err.Error())
		return err
	}
	o.noteStartupCheck("flux-source-guard", true, fmt.Sprintf("flux source branch/url validated (branch=%s)", desiredFluxBranch))
	workers, err := o.effectiveWorkers(ctx)
	if err != nil {
		return err
	}
	o.log.Printf("startup workers=%s", strings.Join(workers, ","))
	o.setStartupPhase("worker-start", "starting and uncordoning worker nodes")
	if o.cfg.Startup.ReconcileAccessOnBoot {
		o.bestEffort("reconcile worker access", func() error { return o.reconcileNodeAccess(ctx, workers) })
	}
	o.startWorkers(ctx, workers)
	o.bestEffort("uncordon workers", func() error { return o.uncordonWorkers(ctx, workers) })
	sshCheckNodes := append([]string{}, o.cfg.ControlPlanes...)
	sshCheckNodes = append(sshCheckNodes, workers...)
	if err := o.waitForNodeSSHAuth(ctx, sshCheckNodes); err != nil {
		o.noteStartupCheck("node-ssh-auth", false, err.Error())
		return err
	}
	o.noteStartupCheck("node-ssh-auth", true, fmt.Sprintf("nodes=%d", len(sshCheckNodes)))
	// Decide whether a local bootstrap is needed based on flux source readiness.
	needsLocalBootstrap := false
	bootstrapReasons := []string{}
	if !opts.SkipLocalBootstrap {
		ready, readyErr := o.fluxSourceReady(ctx)
		if readyErr != nil {
			o.log.Printf("warning: unable to read flux source readiness: %v", readyErr)
			needsLocalBootstrap = true
			bootstrapReasons = append(bootstrapReasons, "flux source readiness check failed")
		} else if !ready {
			// else-if: when the readiness check itself failed, a reason was
			// already recorded above; avoid appending a duplicate.
			needsLocalBootstrap = true
			bootstrapReasons = append(bootstrapReasons, "flux source not ready")
		}
	}
	missing, missingErr := o.missingCriticalStartupWorkloads(ctx)
	if missingErr != nil {
		o.log.Printf("warning: unable to inspect critical startup workloads: %v", missingErr)
	}
	if len(missing) > 0 {
		o.log.Printf("startup critical workloads not ready; applying targeted recovery first: %s", strings.Join(missing, ", "))
	}
	if o.cfg.Startup.RequireStorageReady {
		o.noteStartupCheckState("storage-readiness", "running", "waiting for longhorn and critical pvc readiness")
		o.setStartupPhase("storage-readiness", "waiting for longhorn and critical pvcs")
		if err := o.waitForStorageReady(ctx); err != nil {
			o.noteStartupCheck("storage-readiness", false, err.Error())
			return err
		}
		o.noteStartupCheck("storage-readiness", true, "longhorn and critical PVCs ready")
	}
	o.noteStartupCheckState("critical-workloads", "running", "ensuring critical startup workloads have replicas")
	o.setStartupPhase("critical-workloads", "ensuring critical startup workloads are ready")
	if err := o.ensureCriticalStartupWorkloads(ctx); err != nil {
		o.noteStartupCheck("critical-workloads", false, err.Error())
		return err
	}
	o.noteStartupCheck("critical-workloads", true, "critical startup workloads are ready")
	// Targeted recovery may have made the flux source healthy; give it a
	// bounded wait before committing to a full local bootstrap.
	if !opts.SkipLocalBootstrap && needsLocalBootstrap {
		if ready, err := o.waitForFluxSourceReady(ctx, 5*time.Minute); err != nil {
			o.log.Printf("warning: flux source readiness wait failed before local bootstrap: %v", err)
		} else if ready {
			o.log.Printf("flux source became ready after targeted recovery; skipping local bootstrap")
			needsLocalBootstrap = false
		}
	}
	if !opts.SkipLocalBootstrap && needsLocalBootstrap {
		o.log.Printf("startup bootstrap required after wait: %s", strings.Join(bootstrapReasons, "; "))
		if err := o.bootstrapLocal(ctx); err != nil {
			return err
		}
		if err := o.ensureCriticalStartupWorkloads(ctx); err != nil {
			return err
		}
		ready, err := o.fluxSourceReady(ctx)
		if err != nil {
			return fmt.Errorf("flux source readiness after bootstrap: %w", err)
		}
		if !ready {
			return fmt.Errorf("flux source still not ready after local bootstrap")
		}
	}
	o.bestEffort("restore scaled workloads", func() error { return o.restoreScaledApps(ctx) })
	o.noteStartupCheckState("flux-resume-reconcile", "running", "resuming flux and waiting for reconcile")
	o.setStartupPhase("flux-resume", "resuming flux controllers and reconciling kustomizations")
	if err := o.resumeFluxAndReconcile(ctx); err != nil {
		o.noteStartupCheck("flux-resume-reconcile", false, err.Error())
		return err
	}
	o.noteStartupCheck("flux-resume-reconcile", true, "flux resumed and reconciled")
	// Flux is healthy; disarm the failure-recovery defer above.
	resumedFlux = true
	o.bestEffort("heal critical zero-replica workloads", func() error {
		healed, healErr := o.healCriticalWorkloadReplicas(ctx)
		if healErr != nil {
			return healErr
		}
		if len(healed) > 0 {
			sort.Strings(healed)
			o.noteStartupAutoHeal(fmt.Sprintf("restored critical workload replicas: %s", joinLimited(healed, 8)))
		}
		return nil
	})
	o.setStartupPhase("convergence-checks", "waiting for ingress, service, flux, workload, and stability checks")
	if err := o.waitForStartupConvergence(ctx); err != nil {
		return err
	}
	if o.cfg.Startup.RequirePostStartProbes {
		o.noteStartupCheckState("post-start-probes", "running", "waiting for post-start probe endpoints")
		o.setStartupPhase("post-start-probes", "running post-start probe checks")
		if err := o.waitForPostStartProbes(ctx); err != nil {
			o.noteStartupCheck("post-start-probes", false, err.Error())
			return err
		}
		o.noteStartupCheck("post-start-probes", true, "post-start probes passed")
	}
	o.setStartupPhase("startup-complete", "startup workflow reached completion")
	o.log.Printf("startup flow complete")
	return nil
}
// EtcdRestore restores the embedded k3s etcd datastore on one control plane
// from a snapshot (the latest on that node when opts.SnapshotPath is empty),
// then restarts k3s across the remaining control planes.
//
// It validates that the target is a configured, ssh-managed control plane,
// refuses to run when the node uses an external datastore (returning an error
// wrapping ErrEtcdRestoreNotApplicable), and in dry-run mode only logs the
// intended action.
func (o *Orchestrator) EtcdRestore(ctx context.Context, opts EtcdRestoreOptions) error {
	target := strings.TrimSpace(opts.ControlPlane)
	if target == "" {
		// Default to the first configured control plane.
		if len(o.cfg.ControlPlanes) == 0 {
			return fmt.Errorf("cannot restore etcd: no control planes configured")
		}
		target = o.cfg.ControlPlanes[0]
	}
	known := false
	for _, cp := range o.cfg.ControlPlanes {
		if cp == target {
			known = true
			break
		}
	}
	if !known {
		return fmt.Errorf("cannot restore etcd: control plane %s is not in configured control_planes", target)
	}
	if !o.sshManaged(target) {
		return fmt.Errorf("cannot restore etcd on %s: node not in ssh_managed_nodes", target)
	}
	snapshot := strings.TrimSpace(opts.SnapshotPath)
	if o.runner.DryRun {
		// Dry-run: report what would happen without touching any node.
		if snapshot == "" {
			snapshot = "<latest-snapshot-on-" + target + ">"
		}
		o.log.Printf("etcd restore target=%s snapshot=%s (dry-run; datastore-mode and snapshot checks skipped)", target, snapshot)
		return nil
	}
	external, err := o.controlPlaneUsesExternalDatastore(ctx, target)
	if err != nil {
		return err
	}
	if external {
		return fmt.Errorf("%w: %s uses --datastore-endpoint", ErrEtcdRestoreNotApplicable, target)
	}
	if snapshot == "" {
		latest, resolveErr := o.latestEtcdSnapshotPath(ctx, target)
		if resolveErr != nil {
			return resolveErr
		}
		snapshot = latest
	}
	if err := o.verifyEtcdSnapshot(ctx, target, snapshot); err != nil {
		return err
	}
	o.log.Printf("etcd restore target=%s snapshot=%s", target, snapshot)
	// Best-effort stop of k3s on every control plane before the cluster reset.
	for _, cp := range o.cfg.ControlPlanes {
		node := cp
		o.bestEffort("stop k3s before etcd restore on "+node, func() error {
			_, stopErr := o.ssh(ctx, node, "sudo systemctl stop k3s || true")
			return stopErr
		})
	}
	if _, err := o.runSudoK3S(ctx, target, "server", "--cluster-reset", "--cluster-reset-restore-path", snapshot); err != nil {
		return fmt.Errorf("etcd restore command failed on %s: %w", target, err)
	}
	o.log.Printf("etcd restore command completed on %s", target)
	if _, err := o.ssh(ctx, target, "sudo systemctl start k3s || true"); err != nil {
		return fmt.Errorf("failed to start k3s on restore control plane %s: %w", target, err)
	}
	// Give the restored node a head start before rejoining the others.
	time.Sleep(10 * time.Second)
	for _, cp := range o.cfg.ControlPlanes {
		node := cp
		if node == target {
			continue
		}
		o.bestEffort("start k3s after etcd restore on "+node, func() error {
			_, startErr := o.ssh(ctx, node, "sudo systemctl start k3s || true")
			return startErr
		})
	}
	return nil
}
// Shutdown runs the cluster shutdown workflow under the state lock: it
// records a run record and shutdown intent, snapshots etcd (unless skipped),
// suspends flux, scales down apps, drains workers (unless skipped), and stops
// k3s on workers and control planes. Hosts are never powered off.
//
// The named return err is observed by the deferred record/intent handlers,
// so every failure path must return through it.
func (o *Orchestrator) Shutdown(ctx context.Context, opts ShutdownOptions) (err error) {
	// Serialize against any other orchestrator run on this host.
	unlock, err := state.AcquireLock(o.cfg.State.LockPath)
	if err != nil {
		return err
	}
	defer unlock()
	if invErr := o.validateNodeInventory(); invErr != nil {
		return invErr
	}
	record := state.RunRecord{
		ID:        fmt.Sprintf("shutdown-%d", time.Now().UnixNano()),
		Action:    "shutdown",
		Reason:    opts.Reason,
		DryRun:    o.runner.DryRun,
		StartedAt: time.Now().UTC(),
	}
	defer o.finalizeRecord(&record, &err)
	// Validate the requested mode before any cluster mutation so an invalid
	// mode fails fast instead of failing only after flux has been suspended,
	// apps scaled down, and workers drained (previously the validation ran
	// after those side effects, leaving the cluster half shut down).
	shutdownMode := strings.TrimSpace(opts.Mode)
	effectiveMode, modeErr := normalizeShutdownMode(shutdownMode)
	if modeErr != nil {
		return modeErr
	}
	if !o.runner.DryRun {
		if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentShuttingDown, opts.Reason, "shutdown"); writeErr != nil {
			return fmt.Errorf("set shutdown intent: %w", writeErr)
		}
		// On exit, mark the intent complete only when the workflow succeeded.
		defer func() {
			final := state.IntentShuttingDown
			if err == nil {
				final = state.IntentShutdownComplete
			}
			if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, final, opts.Reason, "shutdown"); writeErr != nil {
				o.log.Printf("warning: write shutdown completion intent failed: %v", writeErr)
			}
		}()
	}
	workers, err := o.effectiveWorkers(ctx)
	if err != nil {
		return err
	}
	o.log.Printf("shutdown control-planes=%s workers=%s", strings.Join(o.cfg.ControlPlanes, ","), strings.Join(workers, ","))
	o.reportFluxSource(ctx, "")
	skipEtcd := opts.SkipEtcdSnapshot || o.cfg.Shutdown.SkipEtcdSnapshot
	if !skipEtcd {
		// Guard the index: ControlPlanes[0] would panic on an empty inventory.
		if len(o.cfg.ControlPlanes) == 0 {
			o.log.Printf("warning: skipping etcd snapshot: no control planes configured")
		} else {
			o.bestEffort("etcd snapshot", func() error {
				return o.takeEtcdSnapshot(ctx, o.cfg.ControlPlanes[0])
			})
		}
	}
	o.bestEffort("suspend flux", func() error { return o.patchFluxSuspendAll(ctx, true) })
	o.bestEffort("scale down apps", func() error { return o.scaleDownApps(ctx) })
	skipDrain := opts.SkipDrain || o.cfg.Shutdown.SkipDrain
	if !skipDrain {
		o.bestEffort("drain workers", func() error { return o.drainWorkers(ctx, workers) })
	}
	o.log.Printf("shutdown execution mode=%s (requested=%q)", effectiveMode, shutdownMode)
	o.stopWorkers(ctx, workers)
	o.stopControlPlanes(ctx, o.cfg.ControlPlanes)
	o.log.Printf("shutdown flow complete")
	return nil
}
// normalizeShutdownMode runs one orchestration or CLI step.
// Signature: normalizeShutdownMode(raw string) (string, error).
// Why: keeps shutdown behavior explicit and safe by allowing only cluster-only
// semantics while preserving compatibility with legacy "config" callers.
func normalizeShutdownMode(raw string) (string, error) {
switch strings.TrimSpace(raw) {
case "", "config", "cluster-only":
return "cluster-only", nil
case "poweroff":
return "", fmt.Errorf("shutdown mode %q has been removed; ananke no longer powers off hosts", raw)
default:
return "", fmt.Errorf("unsupported shutdown mode %q (expected config|cluster-only)", raw)
}
}