ananke/internal/cluster/orchestrator.go

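// Package cluster implements the cluster orchestration flows (startup,
// shutdown, and etcd restore) that hecate drives over kubectl and ssh.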
package cluster
import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"log"
"net"
neturl "net/url"
"os"
"os/exec"
"path/filepath"
"regexp"
"sort"
"strconv"
"strings"
"sync"
"time"
"scm.bstein.dev/bstein/hecate/internal/config"
"scm.bstein.dev/bstein/hecate/internal/execx"
"scm.bstein.dev/bstein/hecate/internal/sshutil"
"scm.bstein.dev/bstein/hecate/internal/state"
)
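// Orchestrator coordinates cluster startup, shutdown, and etcd restore. It
// shells out through the injected execx.Runner, persists run history in the
// state.Store, and honors the runner's dry-run mode throughout.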
type Orchestrator struct {
cfg config.Config
runner *execx.Runner
store *state.Store
log *log.Logger
}
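// StartupOptions tunes a single Startup run: ForceFluxBranch overrides the
// configured flux branch, SkipLocalBootstrap disables the local manifest
// bootstrap fallback, and Reason is recorded with the run and intent state.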
type StartupOptions struct {
ForceFluxBranch string
SkipLocalBootstrap bool
Reason string
}
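// ShutdownOptions tunes a single Shutdown run, allowing the etcd snapshot and
// worker drain phases to be skipped.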
type ShutdownOptions struct {
SkipEtcdSnapshot bool
SkipDrain bool
Reason string
}
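// EtcdRestoreOptions selects the control plane and snapshot used by
// EtcdRestore. An empty SnapshotPath means the latest snapshot on that node.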
type EtcdRestoreOptions struct {
ControlPlane string
SnapshotPath string
}
type startupWorkload struct {
Namespace string
Kind string
Name string
}
type workloadScaleEntry struct {
Namespace string `json:"namespace"`
Kind string `json:"kind"`
Name string `json:"name"`
Replicas int `json:"replicas"`
}
type workloadScaleSnapshot struct {
GeneratedAt time.Time `json:"generated_at"`
Entries []workloadScaleEntry `json:"entries"`
}
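// datastoreEndpointPattern extracts the value of a k3s --datastore-endpoint
// flag, whether it is single-quoted, double-quoted, or bare.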
var datastoreEndpointPattern = regexp.MustCompile(`--datastore-endpoint(?:=|\s+)(?:'([^']+)'|"([^"]+)"|([^\s\\]+))`)
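// criticalStartupWorkloads are the workloads the startup flow verifies (and,
// if necessary, recovers) before resuming flux reconciliation.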
var criticalStartupWorkloads = []startupWorkload{
{Namespace: "flux-system", Kind: "deployment", Name: "source-controller"},
{Namespace: "flux-system", Kind: "deployment", Name: "kustomize-controller"},
{Namespace: "flux-system", Kind: "deployment", Name: "helm-controller"},
{Namespace: "flux-system", Kind: "deployment", Name: "notification-controller"},
{Namespace: "vault", Kind: "statefulset", Name: "vault"},
{Namespace: "postgres", Kind: "statefulset", Name: "postgres"},
{Namespace: "gitea", Kind: "deployment", Name: "gitea"},
}
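// ErrEtcdRestoreNotApplicable is returned when the target control plane uses
// an external datastore (--datastore-endpoint) and has no embedded etcd to restore.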
var ErrEtcdRestoreNotApplicable = errors.New("etcd restore not applicable")
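// New constructs an Orchestrator from the given configuration, command runner,
// run-history store, and logger.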
func New(cfg config.Config, runner *execx.Runner, store *state.Store, logger *log.Logger) *Orchestrator {
return &Orchestrator{cfg: cfg, runner: runner, store: store, log: logger}
}
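// Startup brings the cluster back up. It takes the state lock, guards against
// conflicting local and peer intents, waits for time sync and the external
// datastore, starts the control planes and workers, ensures the flux source
// and critical workloads are healthy (bootstrapping from the local IaC
// checkout if needed), restores previously scaled workloads, and resumes flux
// reconciliation.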
func (o *Orchestrator) Startup(ctx context.Context, opts StartupOptions) (err error) {
unlock, err := state.AcquireLock(o.cfg.State.LockPath)
if err != nil {
return err
}
defer unlock()
record := state.RunRecord{
ID: fmt.Sprintf("startup-%d", time.Now().UnixNano()),
Action: "startup",
Reason: opts.Reason,
DryRun: o.runner.DryRun,
StartedAt: time.Now().UTC(),
}
defer o.finalizeRecord(&record, &err)
resumedFlux := false
defer func() {
if o.runner.DryRun || err == nil || resumedFlux {
return
}
o.log.Printf("warning: startup failed before normal flux resume; attempting best-effort recovery resume")
o.bestEffort("restore scaled workloads after failed startup", func() error { return o.restoreScaledApps(ctx) })
o.bestEffort("resume flux after failed startup", func() error { return o.resumeFluxAndReconcile(ctx) })
}()
if !o.runner.DryRun {
currentIntent, readErr := state.ReadIntent(o.cfg.State.IntentPath)
if readErr != nil {
return fmt.Errorf("read startup intent: %w", readErr)
}
if currentIntent.State == state.IntentStartupInProgress {
o.log.Printf("warning: detected stale startup intent from a previous interrupted run; clearing it before continuing")
if clearErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentNormal, "auto-clear stale startup intent", "startup"); clearErr != nil {
return fmt.Errorf("clear stale startup intent: %w", clearErr)
}
currentIntent = state.Intent{State: state.IntentNormal}
}
if currentIntent.State == state.IntentShuttingDown {
if intentFresh(currentIntent, o.startupGuardAge()) {
return fmt.Errorf("startup blocked: shutdown intent is active (%s)", currentIntent.Reason)
}
o.log.Printf("warning: local shutdown intent appears stale (updated_at=%s reason=%q); auto-clearing to continue startup",
currentIntent.UpdatedAt.Format(time.RFC3339), currentIntent.Reason)
if clearErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentNormal, "auto-clear stale shutdown intent", "startup"); clearErr != nil {
return fmt.Errorf("clear stale shutdown intent: %w", clearErr)
}
currentIntent = state.Intent{State: state.IntentNormal}
}
cooldown := o.startupShutdownCooldown()
if currentIntent.State == state.IntentShutdownComplete && intentFresh(currentIntent, cooldown) {
elapsed := intentAge(currentIntent)
remaining := cooldown - elapsed
if remaining < time.Second {
remaining = time.Second
}
o.log.Printf("startup cooldown active: last shutdown completed %s ago; waiting %s", elapsed.Round(time.Second), remaining.Round(time.Second))
timer := time.NewTimer(remaining)
select {
case <-ctx.Done():
timer.Stop()
return fmt.Errorf("startup canceled while waiting for shutdown cooldown: %w", ctx.Err())
case <-timer.C:
}
refreshed, readErr := state.ReadIntent(o.cfg.State.IntentPath)
if readErr != nil {
return fmt.Errorf("re-read startup intent after cooldown wait: %w", readErr)
}
currentIntent = refreshed
if currentIntent.State == state.IntentShuttingDown && intentFresh(currentIntent, o.startupGuardAge()) {
return fmt.Errorf("startup blocked: shutdown intent became active during cooldown wait (%s)", currentIntent.Reason)
}
if currentIntent.State == state.IntentShutdownComplete && intentFresh(currentIntent, cooldown) {
return fmt.Errorf("startup blocked: shutdown completed too recently (%s ago)", intentAge(currentIntent).Round(time.Second))
}
}
if err := o.guardPeerStartupIntents(ctx); err != nil {
return err
}
if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentStartupInProgress, opts.Reason, "startup"); writeErr != nil {
return fmt.Errorf("set startup intent: %w", writeErr)
}
defer func() {
finalReason := opts.Reason
if err != nil {
finalReason = fmt.Sprintf("%s (failed)", strings.TrimSpace(opts.Reason))
}
if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentNormal, finalReason, "startup"); writeErr != nil {
o.log.Printf("warning: write startup completion intent failed: %v", writeErr)
}
}()
}
o.log.Printf("startup control-planes=%s", strings.Join(o.cfg.ControlPlanes, ","))
if o.cfg.Startup.RequireTimeSync {
if err := o.waitForTimeSync(ctx, o.cfg.ControlPlanes); err != nil {
return err
}
}
if err := o.preflightExternalDatastore(ctx); err != nil {
return err
}
o.bestEffort("sync local titan-iac checkout", func() error { return o.syncLocalIACRepo(ctx) })
o.bestEffort("refresh bootstrap cache from local repo", func() error { return o.refreshBootstrapCache(ctx) })
if o.cfg.Startup.ReconcileAccessOnBoot {
o.bestEffort("reconcile control-plane access", func() error { return o.reconcileNodeAccess(ctx, o.cfg.ControlPlanes) })
}
o.reportFluxSource(ctx, opts.ForceFluxBranch)
o.startControlPlanes(ctx, o.cfg.ControlPlanes)
apiPollSeconds := o.cfg.Startup.APIPollSeconds
if apiPollSeconds <= 0 {
apiPollSeconds = 5 // guard against a zero poll interval (and the division below); mirrors the other poll defaults
}
apiPoll := time.Duration(apiPollSeconds) * time.Second
apiAttempts := o.cfg.Startup.APIWaitSeconds / apiPollSeconds
if apiAttempts < 1 {
apiAttempts = 1
}
if err := o.waitForAPI(ctx, apiAttempts, apiPoll); err != nil {
if !o.cfg.Startup.AutoEtcdRestoreOnAPIFailure {
return err
}
cp := strings.TrimSpace(o.cfg.Startup.EtcdRestoreControlPlane)
if cp == "" && len(o.cfg.ControlPlanes) > 0 {
cp = o.cfg.ControlPlanes[0]
}
o.log.Printf("warning: initial API wait failed (%v); attempting automatic etcd restore on %s", err, cp)
if restoreErr := o.EtcdRestore(ctx, EtcdRestoreOptions{ControlPlane: cp}); restoreErr != nil {
if errors.Is(restoreErr, ErrEtcdRestoreNotApplicable) {
o.log.Printf("warning: automatic etcd restore skipped: %v", restoreErr)
o.log.Printf("warning: retrying control-plane start because datastore recovery path is external")
o.startControlPlanes(ctx, o.cfg.ControlPlanes)
} else {
return fmt.Errorf("kubernetes API did not become reachable and automatic etcd restore failed: %w", restoreErr)
}
}
if err := o.waitForAPI(ctx, apiAttempts, apiPoll); err != nil {
return fmt.Errorf("kubernetes API did not become reachable after automatic etcd restore: %w", err)
}
}
desiredFluxBranch := strings.TrimSpace(opts.ForceFluxBranch)
if desiredFluxBranch == "" {
desiredFluxBranch = strings.TrimSpace(o.cfg.ExpectedFluxBranch)
}
if err := o.ensureFluxBranch(ctx, desiredFluxBranch); err != nil {
return err
}
workers, err := o.effectiveWorkers(ctx)
if err != nil {
return err
}
o.log.Printf("startup workers=%s", strings.Join(workers, ","))
if o.cfg.Startup.ReconcileAccessOnBoot {
o.bestEffort("reconcile worker access", func() error { return o.reconcileNodeAccess(ctx, workers) })
}
o.startWorkers(ctx, workers)
o.bestEffort("uncordon workers", func() error { return o.uncordonWorkers(ctx, workers) })
needsLocalBootstrap := false
bootstrapReasons := []string{}
if !opts.SkipLocalBootstrap {
ready, readyErr := o.fluxSourceReady(ctx)
if readyErr != nil {
o.log.Printf("warning: unable to read flux source readiness: %v", readyErr)
needsLocalBootstrap = true
bootstrapReasons = append(bootstrapReasons, "flux source readiness check failed")
} else if !ready {
needsLocalBootstrap = true
bootstrapReasons = append(bootstrapReasons, "flux source not ready")
}
}
missing, missingErr := o.missingCriticalStartupWorkloads(ctx)
if missingErr != nil {
o.log.Printf("warning: unable to inspect critical startup workloads: %v", missingErr)
}
if len(missing) > 0 {
o.log.Printf("startup critical workloads not ready; applying targeted recovery first: %s", strings.Join(missing, ", "))
}
if o.cfg.Startup.RequireStorageReady {
if err := o.waitForStorageReady(ctx); err != nil {
return err
}
}
if err := o.ensureCriticalStartupWorkloads(ctx); err != nil {
return err
}
if !opts.SkipLocalBootstrap && needsLocalBootstrap {
if ready, err := o.waitForFluxSourceReady(ctx, 5*time.Minute); err != nil {
o.log.Printf("warning: flux source readiness wait failed before local bootstrap: %v", err)
} else if ready {
o.log.Printf("flux source became ready after targeted recovery; skipping local bootstrap")
needsLocalBootstrap = false
}
}
if !opts.SkipLocalBootstrap && needsLocalBootstrap {
o.log.Printf("startup bootstrap required after wait: %s", strings.Join(bootstrapReasons, "; "))
if err := o.bootstrapLocal(ctx); err != nil {
return err
}
if err := o.ensureCriticalStartupWorkloads(ctx); err != nil {
return err
}
ready, err := o.fluxSourceReady(ctx)
if err != nil {
return fmt.Errorf("flux source readiness after bootstrap: %w", err)
}
if !ready {
return fmt.Errorf("flux source still not ready after local bootstrap")
}
}
o.bestEffort("restore scaled workloads", func() error { return o.restoreScaledApps(ctx) })
if err := o.resumeFluxAndReconcile(ctx); err != nil {
return err
}
resumedFlux = true
if o.cfg.Startup.RequirePostStartProbes {
if err := o.waitForPostStartProbes(ctx); err != nil {
return err
}
}
o.log.Printf("startup flow complete")
return nil
}
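// EtcdRestore restores the embedded etcd datastore on the selected control
// plane from the given (or latest) snapshot via "k3s server --cluster-reset".
// It returns ErrEtcdRestoreNotApplicable when the node uses an external datastore.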
func (o *Orchestrator) EtcdRestore(ctx context.Context, opts EtcdRestoreOptions) error {
controlPlane := strings.TrimSpace(opts.ControlPlane)
if controlPlane == "" {
if len(o.cfg.ControlPlanes) == 0 {
return fmt.Errorf("cannot restore etcd: no control planes configured")
}
controlPlane = o.cfg.ControlPlanes[0]
}
found := false
for _, cp := range o.cfg.ControlPlanes {
if cp == controlPlane {
found = true
break
}
}
if !found {
return fmt.Errorf("cannot restore etcd: control plane %s is not in configured control_planes", controlPlane)
}
if !o.sshManaged(controlPlane) {
return fmt.Errorf("cannot restore etcd on %s: node not in ssh_managed_nodes", controlPlane)
}
snapshotPath := strings.TrimSpace(opts.SnapshotPath)
if o.runner.DryRun {
if snapshotPath == "" {
snapshotPath = "<latest-snapshot-on-" + controlPlane + ">"
}
o.log.Printf("etcd restore target=%s snapshot=%s (dry-run; datastore-mode and snapshot checks skipped)", controlPlane, snapshotPath)
return nil
}
externalDatastore, err := o.controlPlaneUsesExternalDatastore(ctx, controlPlane)
if err != nil {
return err
}
if externalDatastore {
return fmt.Errorf("%w: %s uses --datastore-endpoint", ErrEtcdRestoreNotApplicable, controlPlane)
}
if snapshotPath == "" {
resolved, err := o.latestEtcdSnapshotPath(ctx, controlPlane)
if err != nil {
return err
}
snapshotPath = resolved
}
if err := o.verifyEtcdSnapshot(ctx, controlPlane, snapshotPath); err != nil {
return err
}
o.log.Printf("etcd restore target=%s snapshot=%s", controlPlane, snapshotPath)
for _, cp := range o.cfg.ControlPlanes {
cp := cp
o.bestEffort("stop k3s before etcd restore on "+cp, func() error {
_, err := o.ssh(ctx, cp, "sudo systemctl stop k3s || true")
return err
})
}
if _, err := o.runSudoK3S(ctx, controlPlane, "server", "--cluster-reset", "--cluster-reset-restore-path", snapshotPath); err != nil {
return fmt.Errorf("etcd restore command failed on %s: %w", controlPlane, err)
}
o.log.Printf("etcd restore command completed on %s", controlPlane)
if _, err := o.ssh(ctx, controlPlane, "sudo systemctl start k3s || true"); err != nil {
return fmt.Errorf("failed to start k3s on restore control plane %s: %w", controlPlane, err)
}
time.Sleep(10 * time.Second)
for _, cp := range o.cfg.ControlPlanes {
cp := cp
if cp == controlPlane {
continue
}
o.bestEffort("start k3s after etcd restore on "+cp, func() error {
_, err := o.ssh(ctx, cp, "sudo systemctl start k3s || true")
return err
})
}
return nil
}
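// Shutdown winds the cluster down: it records a shutdown intent, takes an etcd
// snapshot, suspends flux, scales workloads to zero (saving their replica
// counts), drains and stops workers, stops the control planes, and optionally
// powers hosts off.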
func (o *Orchestrator) Shutdown(ctx context.Context, opts ShutdownOptions) (err error) {
unlock, err := state.AcquireLock(o.cfg.State.LockPath)
if err != nil {
return err
}
defer unlock()
record := state.RunRecord{
ID: fmt.Sprintf("shutdown-%d", time.Now().UnixNano()),
Action: "shutdown",
Reason: opts.Reason,
DryRun: o.runner.DryRun,
StartedAt: time.Now().UTC(),
}
defer o.finalizeRecord(&record, &err)
if !o.runner.DryRun {
if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentShuttingDown, opts.Reason, "shutdown"); writeErr != nil {
return fmt.Errorf("set shutdown intent: %w", writeErr)
}
defer func() {
final := state.IntentShuttingDown
if err == nil {
final = state.IntentShutdownComplete
}
if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, final, opts.Reason, "shutdown"); writeErr != nil {
o.log.Printf("warning: write shutdown completion intent failed: %v", writeErr)
}
}()
}
workers, err := o.effectiveWorkers(ctx)
if err != nil {
return err
}
o.log.Printf("shutdown control-planes=%s workers=%s", strings.Join(o.cfg.ControlPlanes, ","), strings.Join(workers, ","))
o.reportFluxSource(ctx, "")
skipEtcd := opts.SkipEtcdSnapshot || o.cfg.Shutdown.SkipEtcdSnapshot
if !skipEtcd && len(o.cfg.ControlPlanes) > 0 {
o.bestEffort("etcd snapshot", func() error {
return o.takeEtcdSnapshot(ctx, o.cfg.ControlPlanes[0])
})
}
o.bestEffort("suspend flux", func() error { return o.patchFluxSuspendAll(ctx, true) })
o.bestEffort("scale down apps", func() error { return o.scaleDownApps(ctx) })
skipDrain := opts.SkipDrain || o.cfg.Shutdown.SkipDrain
if !skipDrain {
o.bestEffort("drain workers", func() error { return o.drainWorkers(ctx, workers) })
}
o.stopWorkers(ctx, workers)
o.stopControlPlanes(ctx, o.cfg.ControlPlanes)
if o.cfg.Shutdown.PoweroffEnabled {
o.bestEffort("poweroff hosts", func() error { return o.poweroffHosts(ctx, workers) })
}
o.log.Printf("shutdown flow complete")
return nil
}
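// EstimatedShutdownSeconds reports an estimate of shutdown duration based on
// the p95 of recorded runs, subject to the configured default budget and
// minimum sample count.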
func (o *Orchestrator) EstimatedShutdownSeconds() int {
return o.store.ShutdownP95WithMinSamples(o.cfg.Shutdown.DefaultBudgetSeconds, o.cfg.Shutdown.HistoryMinSamples)
}
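// EstimatedEmergencyShutdownSeconds is like EstimatedShutdownSeconds but only
// considers runs whose reason carries an emergency-style prefix.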
func (o *Orchestrator) EstimatedEmergencyShutdownSeconds() int {
return o.store.ShutdownP95ByReasonPrefix(
o.cfg.Shutdown.EmergencyBudgetSec,
o.cfg.Shutdown.EmergencyMinSamples,
[]string{"ups-", "emergency-", "drill-emergency"},
)
}
func (o *Orchestrator) finalizeRecord(record *state.RunRecord, err *error) {
record.EndedAt = time.Now().UTC()
record.DurationSeconds = int(record.EndedAt.Sub(record.StartedAt).Seconds())
record.Success = *err == nil
if *err != nil {
record.Error = (*err).Error()
}
if appendErr := o.store.Append(*record); appendErr != nil {
o.log.Printf("warning: append run record failed: %v", appendErr)
}
}
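// effectiveWorkers returns the configured workers, or discovers them from the
// Kubernetes API, falling back to the ssh inventory when discovery fails.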
func (o *Orchestrator) effectiveWorkers(ctx context.Context) ([]string, error) {
if len(o.cfg.Workers) > 0 {
return append([]string{}, o.cfg.Workers...), nil
}
workers, err := o.discoverWorkers(ctx)
if err == nil {
return workers, nil
}
fallback := o.fallbackWorkersFromInventory()
if len(fallback) == 0 {
return nil, err
}
o.log.Printf("warning: worker discovery failed via kubernetes API (%v); falling back to inventory workers=%s", err, strings.Join(fallback, ","))
return fallback, nil
}
func (o *Orchestrator) discoverWorkers(ctx context.Context) ([]string, error) {
out, err := o.kubectl(ctx, 15*time.Second,
"get", "nodes",
"-o", "custom-columns=NAME:.metadata.name,CP:.metadata.labels.node-role\\.kubernetes\\.io/control-plane,MASTER:.metadata.labels.node-role\\.kubernetes\\.io/master",
"--no-headers",
)
if err != nil {
return nil, fmt.Errorf("discover workers: %w", err)
}
var workers []string
for _, line := range lines(out) {
fields := strings.Fields(line)
if len(fields) < 3 {
continue
}
if fields[1] == "<none>" && fields[2] == "<none>" {
workers = append(workers, fields[0])
}
}
if len(workers) == 0 {
return nil, fmt.Errorf("no workers discovered")
}
return workers, nil
}
func (o *Orchestrator) fallbackWorkersFromInventory() []string {
cp := make(map[string]struct{}, len(o.cfg.ControlPlanes))
for _, node := range o.cfg.ControlPlanes {
cp[strings.TrimSpace(node)] = struct{}{}
}
candidates := make(map[string]struct{})
add := func(node string) {
name := strings.TrimSpace(node)
if name == "" {
return
}
if _, isCP := cp[name]; isCP {
return
}
candidates[name] = struct{}{}
}
for _, node := range o.cfg.SSHManagedNodes {
add(node)
}
if len(candidates) == 0 {
for node := range o.cfg.SSHNodeHosts {
add(node)
}
}
workers := make([]string, 0, len(candidates))
for node := range candidates {
workers = append(workers, node)
}
sort.Strings(workers)
return workers
}
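// patchFluxSuspendAll toggles spec.suspend on every flux Kustomization and
// HelmRelease; individual patch failures are logged rather than treated as fatal.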
func (o *Orchestrator) patchFluxSuspendAll(ctx context.Context, suspend bool) error {
patch := fmt.Sprintf(`{"spec":{"suspend":%t}}`, suspend)
ksOut, err := o.kubectl(ctx, 20*time.Second,
"-n", "flux-system", "get", "kustomizations.kustomize.toolkit.fluxcd.io",
"-o", "jsonpath={range .items[*]}{.metadata.name}{'\\n'}{end}",
)
if err != nil {
return err
}
for _, ks := range lines(ksOut) {
_, patchErr := o.kubectl(ctx, 20*time.Second, "-n", "flux-system", "patch", "kustomization", ks, "--type=merge", "-p", patch)
if patchErr != nil {
o.log.Printf("warning: patch kustomization %s failed: %v", ks, patchErr)
}
}
hrOut, err := o.kubectl(ctx, 25*time.Second,
"get", "helmreleases.helm.toolkit.fluxcd.io", "-A",
"-o", "jsonpath={range .items[*]}{.metadata.namespace}{'/'}{.metadata.name}{'\\n'}{end}",
)
if err != nil {
return err
}
for _, hr := range lines(hrOut) {
parts := strings.SplitN(hr, "/", 2)
if len(parts) != 2 {
continue
}
_, patchErr := o.kubectl(ctx, 20*time.Second, "-n", parts[0], "patch", "helmrelease", parts[1], "--type=merge", "-p", patch)
if patchErr != nil {
o.log.Printf("warning: patch helmrelease %s failed: %v", hr, patchErr)
}
}
return nil
}
func (o *Orchestrator) scaleDownApps(ctx context.Context) error {
targets, err := o.listScalableWorkloads(ctx)
if err != nil {
return err
}
if err := o.writeScaledWorkloadSnapshot(targets); err != nil {
return err
}
if len(targets) == 0 {
o.log.Printf("scale down apps: no workloads above 0 replicas")
return nil
}
parallelism := o.cfg.Shutdown.ScaleParallelism
if parallelism <= 0 {
parallelism = 8
}
o.log.Printf("scale down apps targets=%d parallelism=%d", len(targets), parallelism)
return o.scaleWorkloads(ctx, targets, 0, parallelism)
}
func (o *Orchestrator) restoreScaledApps(ctx context.Context) error {
snapshot, err := o.readScaledWorkloadSnapshot()
if err != nil {
return err
}
if snapshot == nil || len(snapshot.Entries) == 0 {
o.log.Printf("restore scaled workloads: no snapshot entries to restore")
return nil
}
parallelism := o.cfg.Shutdown.ScaleParallelism
if parallelism <= 0 {
parallelism = 8
}
o.log.Printf("restore scaled workloads entries=%d parallelism=%d snapshot_at=%s", len(snapshot.Entries), parallelism, snapshot.GeneratedAt.Format(time.RFC3339))
if err := o.scaleWorkloads(ctx, snapshot.Entries, -1, parallelism); err != nil {
return err
}
if o.runner.DryRun {
return nil
}
if err := os.Remove(o.scaledWorkloadSnapshotPath()); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("remove scaled workload snapshot: %w", err)
}
return nil
}
func (o *Orchestrator) listScalableWorkloads(ctx context.Context) ([]workloadScaleEntry, error) {
exclude := map[string]struct{}{}
for _, ns := range o.cfg.ExcludedNamespaces {
exclude[strings.TrimSpace(ns)] = struct{}{}
}
collect := func(kind string) ([]workloadScaleEntry, error) {
out, err := o.kubectl(
ctx,
25*time.Second,
"get",
kind,
"-A",
"-o",
"jsonpath={range .items[*]}{.metadata.namespace}{'\\t'}{.metadata.name}{'\\t'}{.spec.replicas}{'\\n'}{end}",
)
if err != nil {
return nil, err
}
var entries []workloadScaleEntry
for _, line := range lines(out) {
parts := strings.Split(line, "\t")
if len(parts) < 3 {
continue
}
ns := strings.TrimSpace(parts[0])
if _, skip := exclude[ns]; skip {
continue
}
replicas, convErr := strconv.Atoi(strings.TrimSpace(parts[2]))
if convErr != nil || replicas <= 0 {
continue
}
entries = append(entries, workloadScaleEntry{
Namespace: ns,
Kind: kind,
Name: strings.TrimSpace(parts[1]),
Replicas: replicas,
})
}
return entries, nil
}
deployments, err := collect("deployment")
if err != nil {
return nil, fmt.Errorf("collect deployments: %w", err)
}
statefulsets, err := collect("statefulset")
if err != nil {
return nil, fmt.Errorf("collect statefulsets: %w", err)
}
targets := append(deployments, statefulsets...)
sort.Slice(targets, func(i, j int) bool {
a, b := targets[i], targets[j]
if a.Namespace != b.Namespace {
return a.Namespace < b.Namespace
}
if a.Kind != b.Kind {
return a.Kind < b.Kind
}
return a.Name < b.Name
})
return targets, nil
}
func (o *Orchestrator) scaleWorkloads(ctx context.Context, entries []workloadScaleEntry, forceReplicas int, parallelism int) error {
if len(entries) == 0 {
return nil
}
if parallelism <= 0 {
parallelism = 1
}
if parallelism > len(entries) {
parallelism = len(entries)
}
sem := make(chan struct{}, parallelism)
var wg sync.WaitGroup
errCh := make(chan error, len(entries))
for _, entry := range entries {
entry := entry
wg.Add(1)
go func() {
defer wg.Done()
sem <- struct{}{}
defer func() { <-sem }()
replicas := entry.Replicas
if forceReplicas >= 0 {
replicas = forceReplicas
}
if _, err := o.kubectl(
ctx,
20*time.Second,
"-n",
entry.Namespace,
"scale",
entry.Kind,
entry.Name,
fmt.Sprintf("--replicas=%d", replicas),
); err != nil {
if isNotFoundErr(err) {
o.log.Printf("warning: skip missing workload while scaling %s/%s/%s", entry.Namespace, entry.Kind, entry.Name)
return
}
errCh <- fmt.Errorf("scale %s/%s/%s -> %d: %w", entry.Namespace, entry.Kind, entry.Name, replicas, err)
}
}()
}
wg.Wait()
close(errCh)
errorCount := len(errCh)
if errorCount == 0 {
return nil
}
errs := make([]string, 0, len(errCh))
for err := range errCh {
errs = append(errs, err.Error())
if len(errs) >= 5 {
break
}
}
return fmt.Errorf("scaling had %d errors (first: %s)", errorCount, strings.Join(errs, " | "))
}
func (o *Orchestrator) scaledWorkloadSnapshotPath() string {
return filepath.Join(o.cfg.State.Dir, "scaled-workloads.json")
}
func (o *Orchestrator) writeScaledWorkloadSnapshot(entries []workloadScaleEntry) error {
if o.runner.DryRun {
return nil
}
if err := os.MkdirAll(o.cfg.State.Dir, 0o755); err != nil {
return fmt.Errorf("ensure state dir: %w", err)
}
payload := workloadScaleSnapshot{
GeneratedAt: time.Now().UTC(),
Entries: entries,
}
b, err := json.MarshalIndent(payload, "", " ")
if err != nil {
return fmt.Errorf("marshal scaled workload snapshot: %w", err)
}
if err := os.WriteFile(o.scaledWorkloadSnapshotPath(), b, 0o644); err != nil {
return fmt.Errorf("write scaled workload snapshot: %w", err)
}
return nil
}
func (o *Orchestrator) readScaledWorkloadSnapshot() (*workloadScaleSnapshot, error) {
if o.runner.DryRun {
return nil, nil
}
b, err := os.ReadFile(o.scaledWorkloadSnapshotPath())
if err != nil {
if os.IsNotExist(err) {
return nil, nil
}
return nil, fmt.Errorf("read scaled workload snapshot: %w", err)
}
var snapshot workloadScaleSnapshot
if err := json.Unmarshal(b, &snapshot); err != nil {
return nil, fmt.Errorf("decode scaled workload snapshot: %w", err)
}
return &snapshot, nil
}
type drainFailure struct {
node string
err error
details string
}
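// drainWorkers cordons and drains the given workers in parallel, collecting
// per-node diagnostics for any drains that fail.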
func (o *Orchestrator) drainWorkers(ctx context.Context, workers []string) error {
total := len(workers)
if total == 0 {
return nil
}
parallelism := o.cfg.Shutdown.DrainParallelism
if parallelism <= 0 {
parallelism = 6
}
if parallelism > total {
parallelism = total
}
o.log.Printf("drain workers total=%d parallelism=%d", total, parallelism)
sem := make(chan struct{}, parallelism)
var wg sync.WaitGroup
errCh := make(chan drainFailure, total)
for idx, node := range workers {
idx := idx
node := node
wg.Add(1)
go func() {
defer wg.Done()
sem <- struct{}{}
defer func() { <-sem }()
o.log.Printf("drain worker %d/%d: %s", idx+1, total, node)
if _, err := o.kubectl(ctx, 20*time.Second, "cordon", node); err != nil {
o.log.Printf("warning: cordon %s failed: %v", node, err)
}
if _, err := o.kubectl(ctx, 3*time.Minute, "drain", node, "--ignore-daemonsets", "--delete-emptydir-data", "--grace-period=30", "--timeout=180s"); err != nil {
details := o.drainNodeDiagnostics(ctx, node)
errCh <- drainFailure{
node: node,
err: fmt.Errorf("drain %s failed: %w", node, err),
details: details,
}
return
}
}()
}
wg.Wait()
close(errCh)
if len(errCh) == 0 {
return nil
}
failures := make([]drainFailure, 0, len(errCh))
for failure := range errCh {
failures = append(failures, failure)
}
count := len(failures)
samples := []string{}
for _, failure := range failures {
msg := failure.err.Error()
if strings.TrimSpace(failure.details) != "" {
msg = fmt.Sprintf("%s (details: %s)", msg, failure.details)
}
samples = append(samples, msg)
if len(samples) >= 4 {
break
}
}
return fmt.Errorf("drain workers had %d errors (first: %s)", count, strings.Join(samples, " | "))
}
func (o *Orchestrator) drainNodeDiagnostics(ctx context.Context, node string) string {
out, err := o.kubectl(
ctx,
20*time.Second,
"get",
"pods",
"-A",
"--field-selector", "spec.nodeName="+node,
"-o",
"custom-columns=NS:.metadata.namespace,NAME:.metadata.name,PHASE:.status.phase,OWNER:.metadata.ownerReferences[0].kind",
"--no-headers",
)
if err != nil {
if strings.TrimSpace(out) == "" {
return fmt.Sprintf("diagnostics unavailable: %v", err)
}
return fmt.Sprintf("diagnostics unavailable: %v (%s)", err, strings.Join(lines(out), "; "))
}
blockers := make([]string, 0, 6)
for _, line := range lines(out) {
fields := strings.Fields(line)
if len(fields) < 4 {
continue
}
namespace := fields[0]
name := fields[1]
phase := fields[2]
owner := fields[3]
if strings.EqualFold(owner, "DaemonSet") {
continue
}
if strings.EqualFold(phase, "Succeeded") || strings.EqualFold(phase, "Failed") {
continue
}
blockers = append(blockers, fmt.Sprintf("%s/%s(phase=%s owner=%s)", namespace, name, phase, owner))
if len(blockers) >= 6 {
break
}
}
if len(blockers) == 0 {
return "no non-daemonset blocking pods found on node"
}
return strings.Join(blockers, ", ")
}
func (o *Orchestrator) uncordonWorkers(ctx context.Context, workers []string) error {
for _, node := range workers {
if _, err := o.kubectl(ctx, 20*time.Second, "uncordon", node); err != nil {
o.log.Printf("warning: uncordon %s failed: %v", node, err)
}
}
return nil
}
func (o *Orchestrator) stopWorkers(ctx context.Context, workers []string) {
o.runSSHAcrossNodes(ctx, workers, "stop k3s-agent", "sudo systemctl stop k3s-agent || true")
}
func (o *Orchestrator) startWorkers(ctx context.Context, workers []string) {
o.runSSHAcrossNodes(ctx, workers, "start k3s-agent", "sudo systemctl start k3s-agent || true")
}
func (o *Orchestrator) stopControlPlanes(ctx context.Context, cps []string) {
o.runSSHAcrossNodes(ctx, cps, "stop k3s", "sudo systemctl stop k3s || true")
}
func (o *Orchestrator) startControlPlanes(ctx context.Context, cps []string) {
o.runSSHAcrossNodes(ctx, cps, "start k3s", "sudo systemctl start k3s || true")
}
func (o *Orchestrator) runSSHAcrossNodes(ctx context.Context, nodes []string, action, command string) {
if len(nodes) == 0 {
return
}
parallelism := o.cfg.Shutdown.SSHParallelism
if parallelism <= 0 {
parallelism = 8
}
if parallelism > len(nodes) {
parallelism = len(nodes)
}
sem := make(chan struct{}, parallelism)
var wg sync.WaitGroup
for _, node := range nodes {
node := node
if !o.sshManaged(node) {
o.log.Printf("skip %s on %s: node not in ssh_managed_nodes", action, node)
continue
}
wg.Add(1)
go func() {
defer wg.Done()
sem <- struct{}{}
defer func() { <-sem }()
o.bestEffort(action+" on "+node, func() error {
_, err := o.ssh(ctx, node, command)
return err
})
}()
}
wg.Wait()
}
func (o *Orchestrator) takeEtcdSnapshot(ctx context.Context, node string) error {
if !o.sshManaged(node) {
return fmt.Errorf("cannot run etcd snapshot on %s: node not in ssh_managed_nodes", node)
}
name := "pre-shutdown-" + time.Now().UTC().Format("20060102-150405")
_, err := o.runSudoK3S(ctx, node, "etcd-snapshot", "save", "--name", name)
return err
}
func (o *Orchestrator) latestEtcdSnapshotPath(ctx context.Context, node string) (string, error) {
if !o.sshManaged(node) {
return "", fmt.Errorf("cannot resolve etcd snapshot on %s: node not in ssh_managed_nodes", node)
}
out, err := o.runSudoK3S(ctx, node, "etcd-snapshot", "ls")
if err != nil {
return "", fmt.Errorf("resolve latest etcd snapshot on %s: %w", node, err)
}
snapshot := parseSnapshotPathFromEtcdSnapshotList(out)
if snapshot == "" {
return "", fmt.Errorf("no etcd snapshots found on %s under /var/lib/rancher/k3s/server/db/snapshots", node)
}
return snapshot, nil
}
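// parseSnapshotPathFromEtcdSnapshotList returns the first absolute snapshot
// path under /var/lib/rancher/k3s/server/db/snapshots found in
// "k3s etcd-snapshot ls" output, skipping the header line.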
func parseSnapshotPathFromEtcdSnapshotList(out string) string {
for _, line := range lines(out) {
trimmed := strings.TrimSpace(line)
if trimmed == "" {
continue
}
lower := strings.ToLower(trimmed)
if strings.HasPrefix(lower, "name") && strings.Contains(lower, "location") {
continue
}
for _, field := range strings.Fields(trimmed) {
candidate := strings.Trim(strings.TrimSpace(field), "\",")
candidate = strings.TrimPrefix(candidate, "file://")
if strings.HasPrefix(candidate, "/var/lib/rancher/k3s/server/db/snapshots/") {
return candidate
}
}
}
return ""
}
func intentAge(in state.Intent) time.Duration {
if in.UpdatedAt.IsZero() {
return 0
}
return time.Since(in.UpdatedAt)
}
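// intentFresh reports whether the intent was updated within maxAge; intents
// without a timestamp are treated as fresh.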
func intentFresh(in state.Intent, maxAge time.Duration) bool {
if in.UpdatedAt.IsZero() {
return true
}
return intentAge(in) <= maxAge
}
func (o *Orchestrator) startupGuardAge() time.Duration {
seconds := o.cfg.Coordination.StartupGuardMaxAgeSec
if seconds <= 0 {
seconds = 900
}
return time.Duration(seconds) * time.Second
}
func (o *Orchestrator) startupShutdownCooldown() time.Duration {
seconds := o.cfg.Startup.ShutdownCooldownSeconds
if seconds <= 0 {
seconds = 45
}
return time.Duration(seconds) * time.Second
}
func (o *Orchestrator) coordinationPeers() []string {
seen := map[string]struct{}{}
out := make([]string, 0, len(o.cfg.Coordination.PeerHosts)+1)
add := func(node string) {
node = strings.TrimSpace(node)
if node == "" {
return
}
if _, ok := seen[node]; ok {
return
}
seen[node] = struct{}{}
out = append(out, node)
}
for _, node := range o.cfg.Coordination.PeerHosts {
add(node)
}
if strings.TrimSpace(o.cfg.Coordination.ForwardShutdownHost) != "" {
add(o.cfg.Coordination.ForwardShutdownHost)
}
return out
}
func (o *Orchestrator) guardPeerStartupIntents(ctx context.Context) error {
peers := o.coordinationPeers()
if len(peers) == 0 {
return nil
}
guardAge := o.startupGuardAge()
for _, peer := range peers {
intent, err := o.readRemoteIntent(ctx, peer)
if err != nil {
o.log.Printf("warning: peer startup guard skipped intent check for %s: %v", peer, err)
continue
}
switch intent.State {
case "", state.IntentNormal:
continue
case state.IntentShuttingDown:
if intentFresh(intent, guardAge) {
return fmt.Errorf("startup blocked: peer %s has active shutdown intent (reason=%q age=%s)", peer, intent.Reason, intentAge(intent).Round(time.Second))
}
o.log.Printf("warning: peer %s shutdown intent appears stale; allowing startup", peer)
case state.IntentStartupInProgress:
if intentFresh(intent, guardAge) {
return fmt.Errorf("startup blocked: peer %s reports startup_in_progress (reason=%q age=%s)", peer, intent.Reason, intentAge(intent).Round(time.Second))
}
o.log.Printf("warning: peer %s startup intent appears stale; allowing startup", peer)
case state.IntentShutdownComplete:
if intentFresh(intent, o.startupShutdownCooldown()) {
return fmt.Errorf("startup blocked: peer %s completed shutdown too recently (age=%s)", peer, intentAge(intent).Round(time.Second))
}
default:
o.log.Printf("warning: peer %s intent state %q is unknown; ignoring", peer, intent.State)
}
}
return nil
}
func (o *Orchestrator) readRemoteIntent(ctx context.Context, node string) (state.Intent, error) {
if !o.sshManaged(node) {
return state.Intent{}, fmt.Errorf("%s is not in ssh_managed_nodes", node)
}
out, err := o.ssh(ctx, node, "sudo -n /usr/local/bin/hecate intent --config /etc/hecate/hecate.yaml")
if err != nil {
return state.Intent{}, err
}
in, err := state.ParseIntentOutput(out)
if err != nil {
return state.Intent{}, fmt.Errorf("parse remote intent output: %w", err)
}
return in, nil
}
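// shellQuote wraps v in single quotes, escaping embedded single quotes so the
// value survives remote shell evaluation.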
func shellQuote(v string) string {
return "'" + strings.ReplaceAll(v, "'", `'"'"'`) + "'"
}
func (o *Orchestrator) verifyEtcdSnapshot(ctx context.Context, node string, snapshotPath string) error {
if o.runner.DryRun {
return nil
}
path := strings.TrimSpace(snapshotPath)
if path == "" {
return fmt.Errorf("etcd snapshot verification failed: snapshot path is empty")
}
quoted := shellQuote(path)
sizeOut, err := o.ssh(ctx, node, fmt.Sprintf("sudo -n sh -lc 'test -s %s && stat -c %%s %s'", quoted, quoted))
if err != nil {
return fmt.Errorf("etcd snapshot verification failed for %s on %s: %w", path, node, err)
}
size, convErr := strconv.ParseInt(strings.TrimSpace(sizeOut), 10, 64)
if convErr != nil {
return fmt.Errorf("etcd snapshot verification failed for %s on %s: parse size %q: %w", path, node, strings.TrimSpace(sizeOut), convErr)
}
const minSnapshotBytes = int64(1 << 20) // 1 MiB sanity floor.
if size < minSnapshotBytes {
return fmt.Errorf("etcd snapshot verification failed for %s on %s: snapshot too small (%d bytes)", path, node, size)
}
lsOut, err := o.runSudoK3S(ctx, node, "etcd-snapshot", "ls")
if err != nil {
return fmt.Errorf("etcd snapshot verification failed for %s on %s: list snapshots: %w", path, node, err)
}
if !strings.Contains(lsOut, path) && !strings.Contains(lsOut, filepath.Base(path)) {
return fmt.Errorf("etcd snapshot verification failed for %s on %s: snapshot is not present in k3s etcd-snapshot ls output", path, node)
}
sumOut, err := o.ssh(ctx, node, fmt.Sprintf("sudo -n sh -lc 'sha256sum %s | awk \"{print \\$1}\"'", quoted))
if err != nil {
return fmt.Errorf("etcd snapshot verification failed for %s on %s: sha256: %w", path, node, err)
}
hash := strings.TrimSpace(sumOut)
if len(hash) != 64 {
return fmt.Errorf("etcd snapshot verification failed for %s on %s: invalid sha256 %q", path, node, hash)
}
o.log.Printf("etcd snapshot verified path=%s size_bytes=%d sha256=%s", path, size, hash[:12])
return nil
}
func (o *Orchestrator) runSudoK3S(ctx context.Context, node string, args ...string) (string, error) {
k3sPaths := []string{
"/usr/local/bin/k3s",
"/usr/bin/k3s",
"k3s",
}
var lastErr error
for _, path := range k3sPaths {
parts := []string{"sudo", "-n", path}
parts = append(parts, args...)
command := strings.Join(parts, " ")
out, err := o.ssh(ctx, node, command)
if err == nil {
return out, nil
}
lastErr = err
}
if lastErr == nil {
lastErr = fmt.Errorf("no k3s executable candidates configured")
}
return "", lastErr
}
func (o *Orchestrator) controlPlaneUsesExternalDatastore(ctx context.Context, node string) (bool, error) {
out, err := o.ssh(ctx, node, "sudo systemctl cat k3s")
if err != nil {
return false, fmt.Errorf("inspect k3s service on %s for datastore mode: %w", node, err)
}
// Reuse the flag parser so both "--datastore-endpoint=value" and the space-separated form are detected.
return parseDatastoreEndpoint(out) != "", nil
}
func (o *Orchestrator) waitForAPI(ctx context.Context, attempts int, sleep time.Duration) error {
if o.runner.DryRun {
return nil
}
for i := 0; i < attempts; i++ {
_, err := o.kubectl(ctx, 5*time.Second, "version", "--request-timeout=5s")
if err == nil {
return nil
}
select {
case <-ctx.Done():
return fmt.Errorf("kubernetes API wait canceled: %w", ctx.Err())
case <-time.After(sleep):
}
}
return fmt.Errorf("kubernetes API did not become reachable within timeout")
}
func (o *Orchestrator) waitForTimeSync(ctx context.Context, nodes []string) error {
if o.runner.DryRun {
return nil
}
wait := time.Duration(o.cfg.Startup.TimeSyncWaitSeconds) * time.Second
if wait <= 0 {
wait = 240 * time.Second
}
poll := time.Duration(o.cfg.Startup.TimeSyncPollSeconds) * time.Second
if poll <= 0 {
poll = 5 * time.Second
}
mode := strings.ToLower(strings.TrimSpace(o.cfg.Startup.TimeSyncMode))
if mode == "" {
mode = "strict"
}
managedControlPlanes := 0
for _, node := range nodes {
node = strings.TrimSpace(node)
if node == "" {
continue
}
if o.sshManaged(node) {
managedControlPlanes++
}
}
requiredQuorum := o.cfg.Startup.TimeSyncQuorum
if requiredQuorum <= 0 {
requiredQuorum = managedControlPlanes
if requiredQuorum <= 0 {
requiredQuorum = 1
}
}
if requiredQuorum > managedControlPlanes && managedControlPlanes > 0 {
requiredQuorum = managedControlPlanes
}
deadline := time.Now().Add(wait)
for {
unsynced := []string{}
syncedControlPlanes := 0
checkedControlPlanes := 0
localOut, localErr := o.run(ctx, 10*time.Second, "sh", "-lc", "timedatectl show -p NTPSynchronized --value 2>/dev/null || echo unknown")
localSynced := localErr == nil && isTimeSynced(localOut)
if !localSynced {
if localErr != nil {
unsynced = append(unsynced, fmt.Sprintf("local(%v)", localErr))
} else {
unsynced = append(unsynced, fmt.Sprintf("local(%s)", strings.TrimSpace(localOut)))
}
}
for _, node := range nodes {
node = strings.TrimSpace(node)
if node == "" {
continue
}
if !o.sshManaged(node) {
continue
}
checkedControlPlanes++
out, err := o.ssh(ctx, node, "timedatectl show -p NTPSynchronized --value 2>/dev/null || echo unknown")
if err != nil || !isTimeSynced(out) {
if err != nil {
unsynced = append(unsynced, fmt.Sprintf("%s(%v)", node, err))
} else {
unsynced = append(unsynced, fmt.Sprintf("%s(%s)", node, strings.TrimSpace(out)))
}
} else {
syncedControlPlanes++
}
}
ready := false
switch mode {
case "quorum":
if localSynced && syncedControlPlanes >= requiredQuorum {
ready = true
}
default:
if localSynced && len(unsynced) == 0 {
ready = true
}
}
if ready {
return nil
}
if time.Now().After(deadline) {
if mode == "quorum" {
return fmt.Errorf(
"startup blocked: time sync quorum not ready within %s (mode=quorum local_synced=%t synced_control_planes=%d required=%d checked=%d details=%s)",
wait,
localSynced,
syncedControlPlanes,
requiredQuorum,
checkedControlPlanes,
strings.Join(unsynced, ", "),
)
}
return fmt.Errorf("startup blocked: time sync not ready within %s (%s)", wait, strings.Join(unsynced, ", "))
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(poll):
}
}
}
func isTimeSynced(raw string) bool {
v := strings.ToLower(strings.TrimSpace(raw))
return v == "yes" || v == "true" || v == "1"
}
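// preflightExternalDatastore checks whether the first control plane points k3s
// at an external datastore and, if so, verifies the endpoint is reachable over
// TCP, attempting a best-effort restart of the datastore service before giving up.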
func (o *Orchestrator) preflightExternalDatastore(ctx context.Context) error {
if len(o.cfg.ControlPlanes) == 0 {
return nil
}
controlPlane := strings.TrimSpace(o.cfg.ControlPlanes[0])
if controlPlane == "" || !o.sshManaged(controlPlane) {
return nil
}
unitOut, err := o.ssh(ctx, controlPlane, "sudo systemctl cat k3s")
if err != nil {
o.log.Printf("warning: external datastore preflight skipped: unable to inspect %s k3s unit: %v", controlPlane, err)
return nil
}
datastoreEndpoint := parseDatastoreEndpoint(unitOut)
if datastoreEndpoint == "" {
return nil
}
u, err := neturl.Parse(datastoreEndpoint)
if err != nil || u.Host == "" {
o.log.Printf("warning: external datastore preflight skipped: unable to parse datastore endpoint %q", datastoreEndpoint)
return nil
}
host := strings.TrimSpace(u.Hostname())
port := strings.TrimSpace(u.Port())
if port == "" {
port = "5432"
}
address := net.JoinHostPort(host, port)
if o.tcpReachable(address, 3*time.Second) {
return nil
}
o.log.Printf("warning: datastore endpoint %s is unreachable; attempting software recovery", address)
if node := o.nodeNameForHost(host); node != "" && o.sshManaged(node) {
o.bestEffort("restart datastore service on "+node, func() error {
_, err := o.ssh(ctx, node, "sudo systemctl restart postgresql || sudo systemctl restart postgresql@16-main || sudo systemctl restart postgres")
return err
})
}
deadline := time.Now().Add(90 * time.Second)
for time.Now().Before(deadline) {
if o.tcpReachable(address, 3*time.Second) {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(3 * time.Second):
}
}
return fmt.Errorf("startup blocked: external datastore endpoint %s remained unreachable after recovery attempt", address)
}
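// parseDatastoreEndpoint extracts the --datastore-endpoint value from
// "systemctl cat k3s" output, first via the flag regexp and then via a
// line-based fallback.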
func parseDatastoreEndpoint(unitText string) string {
if match := datastoreEndpointPattern.FindStringSubmatch(unitText); len(match) == 4 {
for _, candidate := range match[1:] {
value := strings.TrimSpace(candidate)
if value != "" {
return value
}
}
}
for _, raw := range strings.Split(unitText, "\n") {
line := strings.TrimSpace(raw)
idx := strings.Index(line, "--datastore-endpoint")
if idx < 0 {
continue
}
value := strings.TrimSpace(line[idx+len("--datastore-endpoint"):])
value = strings.TrimSpace(strings.TrimPrefix(value, "="))
value = strings.TrimSuffix(strings.TrimSpace(value), "\\")
value = strings.Trim(value, `"'`)
if value != "" {
return value
}
}
return ""
}
func (o *Orchestrator) nodeNameForHost(host string) string {
host = strings.TrimSpace(host)
if host == "" {
return ""
}
if _, ok := o.cfg.SSHNodeHosts[host]; ok {
return host
}
for node, mapped := range o.cfg.SSHNodeHosts {
if strings.TrimSpace(mapped) == host {
return strings.TrimSpace(node)
}
}
return ""
}
func (o *Orchestrator) tcpReachable(address string, timeout time.Duration) bool {
conn, err := net.DialTimeout("tcp", address, timeout)
if err != nil {
return false
}
_ = conn.Close()
return true
}
func (o *Orchestrator) reconcileNodeAccess(ctx context.Context, nodes []string) error {
if len(nodes) == 0 {
return nil
}
parallelism := o.cfg.Shutdown.SSHParallelism
if parallelism <= 0 {
parallelism = 8
}
if parallelism > len(nodes) {
parallelism = len(nodes)
}
sem := make(chan struct{}, parallelism)
var wg sync.WaitGroup
errCh := make(chan error, len(nodes))
for _, node := range nodes {
node := strings.TrimSpace(node)
if node == "" || !o.sshManaged(node) {
continue
}
wg.Add(1)
go func() {
defer wg.Done()
sem <- struct{}{}
defer func() { <-sem }()
if _, err := o.ssh(ctx, node, "sudo -n /usr/bin/systemctl --version"); err != nil {
errCh <- fmt.Errorf("%s: missing sudo access to /usr/bin/systemctl (--version): %w", node, err)
}
}()
}
wg.Wait()
close(errCh)
if len(errCh) == 0 {
return nil
}
samples := []string{}
for err := range errCh {
samples = append(samples, err.Error())
if len(samples) >= 4 {
break
}
}
return fmt.Errorf("access validation had %d errors (first: %s)", len(errCh), strings.Join(samples, " | "))
}
func (o *Orchestrator) fluxSourceReady(ctx context.Context) (bool, error) {
out, err := o.kubectl(ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.status.conditions[?(@.type==\"Ready\")].status}")
if err != nil {
return false, err
}
return strings.Contains(out, "True"), nil
}
func (o *Orchestrator) reportFluxSource(ctx context.Context, forceBranch string) {
urlOut, urlErr := o.kubectl(ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.spec.url}")
if urlErr == nil {
o.log.Printf("flux-source-url=%s", strings.TrimSpace(urlOut))
}
branchOut, branchErr := o.kubectl(ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.spec.ref.branch}")
if branchErr == nil {
branch := strings.TrimSpace(branchOut)
o.log.Printf("flux-source-branch=%s", branch)
if forceBranch == "" && branch != o.cfg.ExpectedFluxBranch {
o.log.Printf("warning: flux source branch is '%s', expected '%s'", branch, o.cfg.ExpectedFluxBranch)
}
}
}
func (o *Orchestrator) ensureFluxBranch(ctx context.Context, branch string) error {
branch = strings.TrimSpace(branch)
if branch == "" {
return nil
}
out, err := o.kubectl(
ctx,
10*time.Second,
"-n", "flux-system",
"get", "gitrepository", "flux-system",
"-o", "jsonpath={.spec.ref.branch}",
)
if err != nil {
if isNotFoundErr(err) {
o.log.Printf("warning: flux gitrepository/flux-system not found while ensuring branch=%s", branch)
return nil
}
return fmt.Errorf("read flux source branch: %w", err)
}
current := strings.TrimSpace(out)
if current == branch {
return nil
}
patch := fmt.Sprintf(`{"spec":{"ref":{"branch":"%s"}}}`, branch)
if _, err := o.kubectl(
ctx,
20*time.Second,
"-n", "flux-system",
"patch", "gitrepository", "flux-system",
"--type=merge",
"-p", patch,
); err != nil {
return fmt.Errorf("set flux source branch %q (current %q): %w", branch, current, err)
}
o.log.Printf("updated flux source branch from %q to %q", current, branch)
return nil
}
func (o *Orchestrator) bootstrapLocal(ctx context.Context) error {
failures := 0
successes := 0
for _, rel := range o.cfg.LocalBootstrapPaths {
full := filepath.Join(o.cfg.IACRepoPath, rel)
o.log.Printf("local bootstrap apply rel=%s path=%s", rel, full)
if o.runner.DryRun {
successes++
continue
}
if _, err := o.kubectl(ctx, 2*time.Minute, "apply", "-k", full); err != nil {
o.log.Printf("warning: local bootstrap apply failed at %s: %v", full, err)
o.log.Printf("local bootstrap fallback render/apply with LoadRestrictionsNone for %s", full)
if fallbackErr := o.applyKustomizeFallback(ctx, full); fallbackErr != nil {
o.log.Printf("warning: local bootstrap fallback failed at %s: %v", full, fallbackErr)
o.log.Printf("local bootstrap cache apply for rel=%s", rel)
if cacheErr := o.applyBootstrapCache(ctx, rel); cacheErr != nil {
failures++
o.log.Printf("warning: local bootstrap cache apply failed for rel=%s: %v", rel, cacheErr)
continue
}
}
}
successes++
}
if failures > 0 && successes == 0 {
return fmt.Errorf("local bootstrap apply failed for every configured path (%d total)", failures)
}
return nil
}
func (o *Orchestrator) applyKustomizeFallback(ctx context.Context, full string) error {
cmd := fmt.Sprintf("kubectl kustomize --load-restrictor=LoadRestrictionsNone %q | kubectl apply -f -", full)
if _, err := o.runSensitive(ctx, 3*time.Minute, "sh", "-lc", cmd); err != nil {
return err
}
return nil
}
func (o *Orchestrator) syncLocalIACRepo(ctx context.Context) error {
repo := strings.TrimSpace(o.cfg.IACRepoPath)
if repo == "" {
return fmt.Errorf("iac repo path is empty")
}
gitDir := filepath.Join(repo, ".git")
if stat, err := os.Stat(gitDir); err != nil || !stat.IsDir() {
return fmt.Errorf("iac repo %s is not a git checkout", repo)
}
statusOut, statusErr := o.runSensitive(ctx, 10*time.Second, "git", "-C", repo, "status", "--porcelain")
if statusErr != nil {
return fmt.Errorf("inspect iac repo working tree: %w", statusErr)
}
if strings.TrimSpace(statusOut) != "" {
o.log.Printf("warning: skipping local titan-iac sync because working tree is dirty")
return nil
}
branch := strings.TrimSpace(o.cfg.ExpectedFluxBranch)
if branch == "" {
branch = "main"
}
if _, err := o.runSensitive(ctx, 45*time.Second, "git", "-C", repo, "fetch", "origin", "--prune"); err != nil {
return fmt.Errorf("git fetch origin: %w", err)
}
if _, err := o.runSensitive(ctx, 20*time.Second, "git", "-C", repo, "checkout", branch); err != nil {
return fmt.Errorf("git checkout %s: %w", branch, err)
}
if _, err := o.runSensitive(ctx, 20*time.Second, "git", "-C", repo, "reset", "--hard", "origin/"+branch); err != nil {
return fmt.Errorf("git reset --hard origin/%s: %w", branch, err)
}
return nil
}
func (o *Orchestrator) refreshBootstrapCache(ctx context.Context) error {
if len(o.cfg.LocalBootstrapPaths) == 0 {
return nil
}
if err := os.MkdirAll(o.bootstrapCacheDir(), 0o755); err != nil {
return fmt.Errorf("ensure bootstrap cache dir: %w", err)
}
rendered := 0
for _, rel := range o.cfg.LocalBootstrapPaths {
rel = strings.TrimSpace(rel)
if rel == "" {
continue
}
full := filepath.Join(o.cfg.IACRepoPath, rel)
if stat, err := os.Stat(full); err != nil || !stat.IsDir() {
o.log.Printf("warning: skip bootstrap cache render for rel=%s (path missing)", rel)
continue
}
cmd := fmt.Sprintf("kubectl kustomize --load-restrictor=LoadRestrictionsNone %q", full)
manifest, err := o.runSensitive(ctx, 2*time.Minute, "sh", "-lc", cmd)
if err != nil {
o.log.Printf("warning: bootstrap cache render failed for rel=%s: %v", rel, err)
continue
}
cachePath := o.bootstrapCachePath(rel)
if err := os.WriteFile(cachePath, []byte(manifest+"\n"), 0o644); err != nil {
o.log.Printf("warning: bootstrap cache write failed for rel=%s path=%s: %v", rel, cachePath, err)
continue
}
rendered++
}
if rendered == 0 {
return fmt.Errorf("no bootstrap cache manifests rendered")
}
o.log.Printf("bootstrap cache refreshed (%d paths)", rendered)
return nil
}
func (o *Orchestrator) applyBootstrapCache(ctx context.Context, rel string) error {
cachePath := o.bootstrapCachePath(rel)
if _, err := os.Stat(cachePath); err != nil {
return fmt.Errorf("bootstrap cache missing at %s: %w", cachePath, err)
}
if _, err := o.kubectl(ctx, 2*time.Minute, "apply", "-f", cachePath); err != nil {
return err
}
return nil
}
func (o *Orchestrator) bootstrapCacheDir() string {
return filepath.Join(o.cfg.State.Dir, "bootstrap-cache")
}
func (o *Orchestrator) bootstrapCachePath(rel string) string {
safe := strings.TrimSpace(rel)
safe = strings.ReplaceAll(safe, "/", "__")
safe = strings.ReplaceAll(safe, string(os.PathSeparator), "__")
return filepath.Join(o.bootstrapCacheDir(), safe+".yaml")
}
func (o *Orchestrator) waitForFluxSourceReady(ctx context.Context, window time.Duration) (bool, error) {
if o.runner.DryRun {
return true, nil
}
deadline := time.Now().Add(window)
for {
ready, err := o.fluxSourceReady(ctx)
if err != nil {
return false, err
}
if ready {
return true, nil
}
if time.Now().After(deadline) {
return false, nil
}
select {
case <-ctx.Done():
return false, ctx.Err()
case <-time.After(5 * time.Second):
}
}
}
func (o *Orchestrator) waitForStorageReady(ctx context.Context) error {
if o.runner.DryRun {
return nil
}
wait := time.Duration(o.cfg.Startup.StorageReadyWaitSeconds) * time.Second
if wait <= 0 {
wait = 420 * time.Second
}
poll := time.Duration(o.cfg.Startup.StorageReadyPollSeconds) * time.Second
if poll <= 0 {
poll = 5 * time.Second
}
deadline := time.Now().Add(wait)
lastReason := "unknown"
for {
ok, reason, err := o.storageReady(ctx)
if err != nil {
lastReason = err.Error()
} else {
lastReason = reason
}
if ok {
o.log.Printf("storage readiness check passed (%s)", reason)
return nil
}
if time.Now().After(deadline) {
return fmt.Errorf("startup blocked: storage readiness not satisfied within %s (%s)", wait, lastReason)
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(poll):
}
}
}
func (o *Orchestrator) storageReady(ctx context.Context) (bool, string, error) {
minReady := o.cfg.Startup.StorageMinReadyNodes
if minReady <= 0 {
minReady = 2
}
longhornOut, err := o.kubectl(
ctx,
15*time.Second,
"-n",
"longhorn-system",
"get",
"nodes.longhorn.io",
"-o",
`jsonpath={range .items[*]}{.metadata.name}{":"}{.status.conditions[?(@.type=="Ready")].status}{":"}{.status.conditions[?(@.type=="Schedulable")].status}{"\n"}{end}`,
)
if err != nil {
return false, "", fmt.Errorf("query longhorn nodes: %w", err)
}
readyNodes := 0
for _, line := range lines(longhornOut) {
parts := strings.Split(line, ":")
if len(parts) < 3 {
continue
}
ready := strings.EqualFold(strings.TrimSpace(parts[1]), "true")
sched := strings.EqualFold(strings.TrimSpace(parts[2]), "true")
if ready && sched {
readyNodes++
}
}
if readyNodes < minReady {
return false, fmt.Sprintf("longhorn ready+sched nodes %d/%d", readyNodes, minReady), nil
}
for _, item := range o.cfg.Startup.StorageCriticalPVCs {
item = strings.TrimSpace(item)
if item == "" {
continue
}
parts := strings.SplitN(item, "/", 2)
if len(parts) != 2 {
return false, "", fmt.Errorf("invalid storage_critical_pvcs entry %q", item)
}
ns := strings.TrimSpace(parts[0])
name := strings.TrimSpace(parts[1])
out, pvcErr := o.kubectl(ctx, 15*time.Second, "-n", ns, "get", "pvc", name, "-o", "jsonpath={.status.phase}")
if pvcErr != nil {
if isNotFoundErr(pvcErr) {
return false, fmt.Sprintf("pvc %s/%s not found", ns, name), nil
}
return false, "", fmt.Errorf("query pvc %s/%s: %w", ns, name, pvcErr)
}
if !strings.EqualFold(strings.TrimSpace(out), "Bound") {
return false, fmt.Sprintf("pvc %s/%s phase=%s", ns, name, strings.TrimSpace(out)), nil
}
}
return true, fmt.Sprintf("longhorn ready+sched nodes=%d critical pvcs bound=%d", readyNodes, len(o.cfg.Startup.StorageCriticalPVCs)), nil
}
func (o *Orchestrator) waitForPostStartProbes(ctx context.Context) error {
if o.runner.DryRun {
return nil
}
wait := time.Duration(o.cfg.Startup.PostStartProbeWaitSeconds) * time.Second
if wait <= 0 {
wait = 240 * time.Second
}
poll := time.Duration(o.cfg.Startup.PostStartProbePollSeconds) * time.Second
if poll <= 0 {
poll = 5 * time.Second
}
deadline := time.Now().Add(wait)
lastFailure := "unknown"
for {
ok, failure := o.postStartProbesReady(ctx)
if ok {
o.log.Printf("post-start probes passed")
return nil
}
lastFailure = failure
if time.Now().After(deadline) {
return fmt.Errorf("startup blocked: post-start probes did not pass within %s (%s)", wait, lastFailure)
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(poll):
}
}
}
func (o *Orchestrator) postStartProbesReady(ctx context.Context) (bool, string) {
probes := make([]string, 0, len(o.cfg.Startup.PostStartProbes))
for _, p := range o.cfg.Startup.PostStartProbes {
p = strings.TrimSpace(p)
if p != "" {
probes = append(probes, p)
}
}
if len(probes) == 0 {
return true, "no probes configured"
}
for _, probe := range probes {
code, err := o.httpProbe(ctx, probe)
if err != nil {
return false, fmt.Sprintf("%s: %v", probe, err)
}
if code < 200 || code >= 400 {
return false, fmt.Sprintf("%s: unexpected status code=%d", probe, code)
}
}
return true, "all probes successful"
}
func (o *Orchestrator) httpProbe(ctx context.Context, probeURL string) (int, error) {
out, err := o.run(
ctx,
20*time.Second,
"curl",
"--silent",
"--show-error",
"--location",
"--max-time",
"12",
"--output",
"/dev/null",
"--write-out",
"%{http_code}",
probeURL,
)
if err != nil {
return 0, err
}
code, convErr := strconv.Atoi(strings.TrimSpace(out))
if convErr != nil {
return 0, fmt.Errorf("parse http status %q: %w", strings.TrimSpace(out), convErr)
}
return code, nil
}
func (o *Orchestrator) resumeFluxAndReconcile(ctx context.Context) error {
if err := o.patchFluxSuspendAll(ctx, false); err != nil {
return err
}
now := time.Now().UTC().Format(time.RFC3339)
if _, err := o.kubectl(
ctx,
25*time.Second,
"-n", "flux-system",
"annotate",
"kustomizations.kustomize.toolkit.fluxcd.io",
"--all",
"reconcile.fluxcd.io/requestedAt="+now,
"--overwrite",
); err != nil {
o.log.Printf("warning: annotate kustomizations for reconcile failed: %v", err)
}
if _, err := o.kubectl(
ctx,
25*time.Second,
"annotate",
"--all-namespaces",
"helmreleases.helm.toolkit.fluxcd.io",
"--all",
"reconcile.fluxcd.io/requestedAt="+now,
"--overwrite",
); err != nil {
o.log.Printf("warning: annotate helmreleases for reconcile failed: %v", err)
}
if o.runner.CommandExists("flux") {
sourceCmd := []string{"reconcile", "source", "git", "flux-system", "-n", "flux-system", "--timeout=60s"}
if _, err := o.run(ctx, 75*time.Second, "flux", sourceCmd...); err != nil {
o.log.Printf("warning: flux command failed (%s): %v", strings.Join(sourceCmd, " "), err)
}
}
return nil
}
func (o *Orchestrator) kubectl(ctx context.Context, timeout time.Duration, args ...string) (string, error) {
return o.run(ctx, timeout, "kubectl", args...)
}
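// ssh runs command on node, applying per-node host and user overrides, trying
// the configured jump host before a direct connection, and repairing
// known_hosts once when a failure looks host-key related.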
func (o *Orchestrator) ssh(ctx context.Context, node string, command string) (string, error) {
host := node
if mapped, ok := o.cfg.SSHNodeHosts[node]; ok && strings.TrimSpace(mapped) != "" {
host = strings.TrimSpace(mapped)
}
sshUser := o.cfg.SSHUser
if override, ok := o.cfg.SSHNodeUsers[node]; ok && strings.TrimSpace(override) != "" {
sshUser = strings.TrimSpace(override)
}
target := host
if sshUser != "" {
target = sshUser + "@" + host
}
sshConfigFile := o.resolveSSHConfigFile()
sshIdentity := o.resolveSSHIdentityFile()
baseArgs := []string{
"-o", "BatchMode=yes",
"-o", "ConnectTimeout=8",
"-o", "StrictHostKeyChecking=accept-new",
}
if sshConfigFile != "" {
baseArgs = append(baseArgs, "-F", sshConfigFile)
}
if sshIdentity != "" {
baseArgs = append(baseArgs, "-i", sshIdentity)
}
if o.cfg.SSHPort > 0 {
baseArgs = append(baseArgs, "-p", strconv.Itoa(o.cfg.SSHPort))
}
attempts := make([][]string, 0, 2)
attemptNames := make([]string, 0, 2)
knownHostsFiles := sshutil.KnownHostsFiles(sshConfigFile, sshIdentity)
repairHosts := []string{node, host}
if o.cfg.SSHJumpHost != "" {
jump := o.cfg.SSHJumpHost
repairHosts = append(repairHosts, jump)
if mapped, ok := o.cfg.SSHNodeHosts[jump]; ok && strings.TrimSpace(mapped) != "" {
repairHosts = append(repairHosts, strings.TrimSpace(mapped))
}
if o.cfg.SSHJumpUser != "" {
jump = o.cfg.SSHJumpUser + "@" + jump
}
if o.cfg.SSHPort > 0 && !strings.Contains(jump, ":") {
jump = fmt.Sprintf("%s:%d", jump, o.cfg.SSHPort)
}
withJump := append([]string{}, baseArgs...)
withJump = append(withJump, "-J", jump, target, command)
attempts = append(attempts, withJump)
attemptNames = append(attemptNames, "jump")
}
direct := append([]string{}, baseArgs...)
direct = append(direct, target, command)
attempts = append(attempts, direct)
attemptNames = append(attemptNames, "direct")
var lastOut string
var lastErr error
for i, args := range attempts {
out, err := o.run(ctx, 45*time.Second, "ssh", args...)
if err == nil {
if i > 0 {
o.log.Printf("warning: ssh %s path failed for %s, using %s path", attemptNames[i-1], node, attemptNames[i])
}
return out, nil
}
if sshutil.ShouldAttemptKnownHostsRepair(out, err) {
o.log.Printf("warning: ssh failure on %s via %s path may be host-key related; repairing known_hosts and retrying once", node, attemptNames[i])
sshutil.RepairKnownHosts(ctx, o.log, knownHostsFiles, repairHosts, o.cfg.SSHPort)
retryOut, retryErr := o.run(ctx, 45*time.Second, "ssh", args...)
if retryErr == nil {
return retryOut, nil
}
out = retryOut
err = retryErr
}
lastOut = out
lastErr = err
if i < len(attempts)-1 {
o.log.Printf("warning: ssh %s path failed for %s: %v; trying %s path", attemptNames[i], node, err, attemptNames[i+1])
}
}
return lastOut, lastErr
}
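// run executes a command through the shared runner, bounded by timeout.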
func (o *Orchestrator) run(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
runCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
return o.runner.Run(runCtx, name, args...)
}
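// runSensitive executes a command directly via os/exec (propagating KUBECONFIG
// when set) instead of the shared runner, so arguments such as the Vault unseal
// key do not pass through the runner; trimmed combined output is returned.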
func (o *Orchestrator) runSensitive(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
runCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
cmd := exec.CommandContext(runCtx, name, args...)
cmd.Env = os.Environ()
if o.runner.Kubeconfig != "" {
cmd.Env = append(cmd.Env, "KUBECONFIG="+o.runner.Kubeconfig)
}
out, err := cmd.CombinedOutput()
trimmed := strings.TrimSpace(string(out))
if err != nil {
if trimmed == "" {
return "", fmt.Errorf("%s failed: %w", name, err)
}
return trimmed, fmt.Errorf("%s failed: %w", name, err)
}
return trimmed, nil
}
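// lines splits in on newlines, trimming whitespace and dropping empty entries.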
func lines(in string) []string {
in = strings.TrimSpace(in)
if in == "" {
return nil
}
parts := strings.Split(in, "\n")
out := make([]string, 0, len(parts))
for _, p := range parts {
v := strings.TrimSpace(p)
if v != "" {
out = append(out, v)
}
}
return out
}
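// sshManaged reports whether node is eligible for SSH management; an empty
// SSHManagedNodes list means every node is managed.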
func (o *Orchestrator) sshManaged(node string) bool {
if len(o.cfg.SSHManagedNodes) == 0 {
return true
}
for _, allowed := range o.cfg.SSHManagedNodes {
if strings.TrimSpace(allowed) == node {
return true
}
}
return false
}
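// resolveSSHConfigFile returns the configured SSH config file, falling back to
// the first existing entry in a fixed list of known per-user paths.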
func (o *Orchestrator) resolveSSHConfigFile() string {
if strings.TrimSpace(o.cfg.SSHConfigFile) != "" {
return strings.TrimSpace(o.cfg.SSHConfigFile)
}
candidates := []string{
"/home/atlas/.ssh/config",
"/home/tethys/.ssh/config",
}
for _, p := range candidates {
if stat, err := os.Stat(p); err == nil && !stat.IsDir() {
return p
}
}
return ""
}
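// resolveSSHIdentityFile returns the configured SSH identity file, falling back
// to the first existing entry in a fixed list of known per-user key paths.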
func (o *Orchestrator) resolveSSHIdentityFile() string {
if strings.TrimSpace(o.cfg.SSHIdentityFile) != "" {
return strings.TrimSpace(o.cfg.SSHIdentityFile)
}
candidates := []string{
"/home/atlas/.ssh/id_ed25519",
"/home/tethys/.ssh/id_ed25519",
}
for _, p := range candidates {
if stat, err := os.Stat(p); err == nil && !stat.IsDir() {
return p
}
}
return ""
}
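// bestEffort runs fn and downgrades any error to a logged warning.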
func (o *Orchestrator) bestEffort(name string, fn func() error) {
if err := fn(); err != nil {
o.log.Printf("warning: %s: %v", name, err)
}
}
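// missingCriticalStartupWorkloads lists the critical startup workloads that are
// absent or not yet ready, formatted as namespace/kind/name strings.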
func (o *Orchestrator) missingCriticalStartupWorkloads(ctx context.Context) ([]string, error) {
missing := []string{}
for _, w := range criticalStartupWorkloads {
ready, err := o.workloadReady(ctx, w)
if err != nil {
if isNotFoundErr(err) {
missing = append(missing, fmt.Sprintf("%s/%s/%s(not found)", w.Namespace, w.Kind, w.Name))
continue
}
return nil, fmt.Errorf("check %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, err)
}
if !ready {
missing = append(missing, fmt.Sprintf("%s/%s/%s", w.Namespace, w.Kind, w.Name))
}
}
return missing, nil
}
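// ensureCriticalStartupWorkloads scales each critical workload up to one replica
// and waits for it to become ready; workloads that do not exist are skipped with
// a warning.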
func (o *Orchestrator) ensureCriticalStartupWorkloads(ctx context.Context) error {
for _, w := range criticalStartupWorkloads {
if err := o.ensureWorkloadReplicas(ctx, w, 1); err != nil {
if isNotFoundErr(err) {
o.log.Printf("warning: startup workload missing, skipping scale: %s/%s/%s", w.Namespace, w.Kind, w.Name)
continue
}
return fmt.Errorf("scale %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, err)
}
if err := o.waitWorkloadReady(ctx, w); err != nil {
if isNotFoundErr(err) {
o.log.Printf("warning: startup workload missing during readiness wait: %s/%s/%s", w.Namespace, w.Kind, w.Name)
continue
}
return err
}
}
return nil
}
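// ensureWorkloadReplicas scales the workload to the requested replica count.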
func (o *Orchestrator) ensureWorkloadReplicas(ctx context.Context, w startupWorkload, replicas int) error {
_, err := o.kubectl(
ctx,
45*time.Second,
"-n",
w.Namespace,
"scale",
w.Kind,
w.Name,
fmt.Sprintf("--replicas=%d", replicas),
)
return err
}
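// waitWorkloadReady blocks until the workload's rollout completes, giving
// statefulsets a longer timeout; the vault statefulset is handled separately
// because it may need to be unsealed before it can become ready.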
func (o *Orchestrator) waitWorkloadReady(ctx context.Context, w startupWorkload) error {
if w.Namespace == "vault" && w.Kind == "statefulset" && w.Name == "vault" {
return o.waitVaultReady(ctx, w)
}
timeout := "240s"
if w.Kind == "statefulset" {
timeout = "360s"
}
_, err := o.kubectl(
ctx,
7*time.Minute,
"-n",
w.Namespace,
"rollout",
"status",
fmt.Sprintf("%s/%s", w.Kind, w.Name),
"--timeout="+timeout,
)
if err != nil {
return fmt.Errorf("wait ready %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, err)
}
return nil
}
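// waitVaultReady polls the vault statefulset for up to seven minutes, attempting
// an auto-unseal on each pass while it remains not ready.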
func (o *Orchestrator) waitVaultReady(ctx context.Context, w startupWorkload) error {
if o.runner.DryRun {
return nil
}
deadline := time.Now().Add(7 * time.Minute)
var lastErr error
for time.Now().Before(deadline) {
ready, err := o.workloadReady(ctx, w)
if err == nil && ready {
return nil
}
if err != nil {
lastErr = err
}
if err := o.ensureVaultUnsealed(ctx); err != nil {
lastErr = err
o.log.Printf("warning: vault readiness still blocked: %v", err)
}
ready, err = o.workloadReady(ctx, w)
if err == nil && ready {
return nil
}
if err != nil {
lastErr = err
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(5 * time.Second):
}
}
if lastErr != nil {
return fmt.Errorf("wait ready %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, lastErr)
}
return fmt.Errorf("wait ready %s/%s/%s: timeout", w.Namespace, w.Kind, w.Name)
}
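// ensureVaultUnsealed verifies vault-0 is Running and, if Vault reports itself
// sealed, fetches the unseal key and submits it via kubectl exec up to five
// times, rechecking the seal status after each attempt.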
func (o *Orchestrator) ensureVaultUnsealed(ctx context.Context) error {
if o.runner.DryRun {
return nil
}
phase, err := o.kubectl(ctx, 15*time.Second, "-n", "vault", "get", "pod", "vault-0", "-o", "jsonpath={.status.phase}")
if err != nil {
return fmt.Errorf("vault pod phase check failed: %w", err)
}
if strings.TrimSpace(phase) != "Running" {
return fmt.Errorf("vault-0 pod phase is %q", strings.TrimSpace(phase))
}
sealed, err := o.vaultSealed(ctx)
if err != nil {
return err
}
if !sealed {
return nil
}
unsealKey, err := o.vaultUnsealKey(ctx)
if err != nil {
return err
}
for attempt := 1; attempt <= 5; attempt++ {
o.log.Printf("vault is sealed; attempting auto-unseal (%d/5)", attempt)
if _, err := o.runSensitive(
ctx,
30*time.Second,
"kubectl",
"-n", "vault",
"exec", "vault-0", "--",
"vault", "operator", "unseal", unsealKey,
); err != nil {
return fmt.Errorf("vault unseal attempt %d failed: %w", attempt, err)
}
sealed, err = o.vaultSealed(ctx)
if err != nil {
return err
}
if !sealed {
o.log.Printf("vault auto-unseal succeeded")
return nil
}
		// Wait briefly between unseal attempts, honoring context cancellation.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(2 * time.Second):
		}
}
return fmt.Errorf("vault remained sealed after 5 auto-unseal attempts")
}
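// vaultSealed execs `vault status -format=json` inside vault-0 and reports the
// sealed flag; the trailing `|| true` keeps a sealed Vault's non-zero exit code
// from failing the check.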
func (o *Orchestrator) vaultSealed(ctx context.Context) (bool, error) {
out, err := o.kubectl(
ctx,
25*time.Second,
"-n", "vault",
"exec", "vault-0", "--",
"sh", "-lc",
"VAULT_ADDR=http://127.0.0.1:8200 vault status -format=json 2>/dev/null || true",
)
if err != nil {
return false, fmt.Errorf("vault status check failed: %w", err)
}
sealed, err := parseVaultSealed(out)
if err != nil {
return false, fmt.Errorf("parse vault status: %w", err)
}
return sealed, nil
}
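// parseVaultSealed extracts the JSON object from raw `vault status` output and
// returns its "sealed" field.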
func parseVaultSealed(raw string) (bool, error) {
trimmed := strings.TrimSpace(raw)
if trimmed == "" {
return false, fmt.Errorf("empty vault status output")
}
start := strings.Index(trimmed, "{")
end := strings.LastIndex(trimmed, "}")
if start < 0 || end < 0 || end < start {
return false, fmt.Errorf("vault status payload missing JSON object")
}
payload := trimmed[start : end+1]
type vaultStatus struct {
Sealed bool `json:"sealed"`
}
var st vaultStatus
if err := json.Unmarshal([]byte(payload), &st); err != nil {
return false, err
}
return st.Sealed, nil
}
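// vaultUnsealKey resolves the unseal key: the vault-init secret first, then the
// locally cached key file, then the break-glass command. Keys obtained from the
// secret or break-glass path are cached locally on a best-effort basis.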
func (o *Orchestrator) vaultUnsealKey(ctx context.Context) (string, error) {
out, err := o.kubectl(
ctx,
15*time.Second,
"-n", "vault",
"get", "secret", "vault-init",
"-o", "jsonpath={.data.unseal_key_b64}",
)
if err == nil {
decoded, decodeErr := base64.StdEncoding.DecodeString(strings.TrimSpace(out))
if decodeErr == nil {
key := strings.TrimSpace(string(decoded))
if key != "" {
o.bestEffort("cache vault unseal key locally", func() error { return o.writeVaultUnsealKeyFile(key) })
return key, nil
}
err = fmt.Errorf("vault-init unseal key is empty")
} else {
err = fmt.Errorf("decode vault-init unseal_key_b64: %w", decodeErr)
}
} else {
err = fmt.Errorf("read vault-init secret: %w", err)
}
fallbackKey, fileErr := o.readVaultUnsealKeyFile()
if fileErr == nil {
o.log.Printf("warning: using cached vault unseal key from %s", o.cfg.Startup.VaultUnsealKeyFile)
return fallbackKey, nil
}
breakglassKey, breakglassErr := o.readVaultUnsealKeyBreakglass(ctx)
if breakglassErr == nil {
o.log.Printf("warning: using break-glass vault unseal key command fallback")
o.bestEffort("cache vault unseal key locally", func() error { return o.writeVaultUnsealKeyFile(breakglassKey) })
return breakglassKey, nil
}
return "", fmt.Errorf("%v; fallback %v; break-glass %v", err, fileErr, breakglassErr)
}
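// readVaultUnsealKeyBreakglass runs the configured break-glass shell command and
// returns its trimmed output as the unseal key.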
func (o *Orchestrator) readVaultUnsealKeyBreakglass(ctx context.Context) (string, error) {
cmd := strings.TrimSpace(o.cfg.Startup.VaultUnsealBreakglassCommand)
if cmd == "" {
return "", fmt.Errorf("break-glass command not configured")
}
timeout := time.Duration(o.cfg.Startup.VaultUnsealBreakglassTimeout) * time.Second
if timeout <= 0 {
timeout = 15 * time.Second
}
out, err := o.runSensitive(ctx, timeout, "sh", "-lc", cmd)
if err != nil {
return "", fmt.Errorf("run break-glass command: %w", err)
}
key := strings.TrimSpace(out)
if key == "" {
return "", fmt.Errorf("break-glass command returned empty output")
}
return key, nil
}
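// writeVaultUnsealKeyFile caches the unseal key at the configured path with
// 0600 permissions (parent directory 0700).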
func (o *Orchestrator) writeVaultUnsealKeyFile(key string) error {
path := strings.TrimSpace(o.cfg.Startup.VaultUnsealKeyFile)
if path == "" {
return fmt.Errorf("vault unseal key file path is empty")
}
if err := os.MkdirAll(filepath.Dir(path), 0o700); err != nil {
return fmt.Errorf("ensure vault unseal key dir: %w", err)
}
if err := os.WriteFile(path, []byte(strings.TrimSpace(key)+"\n"), 0o600); err != nil {
return fmt.Errorf("write vault unseal key file: %w", err)
}
return nil
}
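// readVaultUnsealKeyFile reads the cached unseal key from the configured path.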
func (o *Orchestrator) readVaultUnsealKeyFile() (string, error) {
path := strings.TrimSpace(o.cfg.Startup.VaultUnsealKeyFile)
if path == "" {
return "", fmt.Errorf("vault unseal key file path is empty")
}
b, err := os.ReadFile(path)
if err != nil {
return "", fmt.Errorf("read vault unseal key file %s: %w", path, err)
}
key := strings.TrimSpace(string(b))
if key == "" {
return "", fmt.Errorf("vault unseal key file %s is empty", path)
}
return key, nil
}
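// workloadReady reports whether the workload has at least one ready replica.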
func (o *Orchestrator) workloadReady(ctx context.Context, w startupWorkload) (bool, error) {
out, err := o.kubectl(
ctx,
20*time.Second,
"-n",
w.Namespace,
"get",
w.Kind,
w.Name,
"-o",
"jsonpath={.status.readyReplicas}",
)
if err != nil {
return false, err
}
raw := strings.TrimSpace(out)
if raw == "" || raw == "<no value>" {
return false, nil
}
n, err := strconv.Atoi(raw)
if err != nil {
return false, fmt.Errorf("parse readyReplicas %q: %w", raw, err)
}
return n >= 1, nil
}
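// isNotFoundErr reports whether err looks like a kubectl "not found" error.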
func isNotFoundErr(err error) bool {
if err == nil {
return false
}
msg := strings.ToLower(err.Error())
return strings.Contains(msg, "not found") || strings.Contains(msg, "(notfound)")
}
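// poweroffHosts schedules a delayed `systemctl poweroff` over SSH on every
// remote control-plane, worker, and extra host, skipping entries that match the
// local hostname (the configured SSH user name is also treated as a local
// alias). When enabled, the local host is scheduled with an extra 10s delay.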
func (o *Orchestrator) poweroffHosts(ctx context.Context, workers []string) error {
delay := o.cfg.Shutdown.PoweroffDelaySeconds
if delay <= 0 {
delay = 25
}
localNames := map[string]struct{}{}
if hn, err := os.Hostname(); err == nil && strings.TrimSpace(hn) != "" {
localNames[strings.TrimSpace(hn)] = struct{}{}
}
if o.cfg.SSHUser != "" {
localNames[o.cfg.SSHUser] = struct{}{}
}
hostSet := map[string]struct{}{}
for _, n := range o.cfg.ControlPlanes {
hostSet[n] = struct{}{}
}
for _, n := range workers {
hostSet[n] = struct{}{}
}
for _, n := range o.cfg.Shutdown.ExtraPoweroffHosts {
if strings.TrimSpace(n) != "" {
hostSet[strings.TrimSpace(n)] = struct{}{}
}
}
hosts := make([]string, 0, len(hostSet))
for h := range hostSet {
hosts = append(hosts, h)
}
sort.Strings(hosts)
remoteCmd := fmt.Sprintf(`sudo nohup sh -c 'sleep %d; systemctl poweroff' >/dev/null 2>&1 &`, delay)
for _, host := range hosts {
host = strings.TrimSpace(host)
if host == "" {
continue
}
if _, isLocal := localNames[host]; isLocal {
continue
}
o.bestEffort("schedule poweroff on "+host, func() error {
_, err := o.ssh(ctx, host, remoteCmd)
return err
})
}
if o.cfg.Shutdown.PoweroffLocalHost {
o.bestEffort("schedule local host poweroff", func() error {
_, err := o.run(ctx, 5*time.Second, "sh", "-c", fmt.Sprintf("nohup sh -c 'sleep %d; systemctl poweroff' >/dev/null 2>&1 &", delay+10))
return err
})
}
return nil
}