1356 lines
37 KiB
Go
1356 lines
37 KiB
Go
package cluster
|
|
|
|
import (
|
|
"context"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"scm.bstein.dev/bstein/hecate/internal/config"
|
|
"scm.bstein.dev/bstein/hecate/internal/execx"
|
|
"scm.bstein.dev/bstein/hecate/internal/state"
|
|
)
|
|
|
|
type Orchestrator struct {
|
|
cfg config.Config
|
|
runner *execx.Runner
|
|
store *state.Store
|
|
log *log.Logger
|
|
}
|
|
|
|
type StartupOptions struct {
|
|
ForceFluxBranch string
|
|
SkipLocalBootstrap bool
|
|
Reason string
|
|
}
|
|
|
|
type ShutdownOptions struct {
|
|
SkipEtcdSnapshot bool
|
|
SkipDrain bool
|
|
Reason string
|
|
}
|
|
|
|
type startupWorkload struct {
|
|
Namespace string
|
|
Kind string
|
|
Name string
|
|
}
|
|
|
|
type workloadScaleEntry struct {
|
|
Namespace string `json:"namespace"`
|
|
Kind string `json:"kind"`
|
|
Name string `json:"name"`
|
|
Replicas int `json:"replicas"`
|
|
}
|
|
|
|
type workloadScaleSnapshot struct {
|
|
GeneratedAt time.Time `json:"generated_at"`
|
|
Entries []workloadScaleEntry `json:"entries"`
|
|
}
|
|
|
|
var criticalStartupWorkloads = []startupWorkload{
|
|
{Namespace: "flux-system", Kind: "deployment", Name: "source-controller"},
|
|
{Namespace: "flux-system", Kind: "deployment", Name: "kustomize-controller"},
|
|
{Namespace: "flux-system", Kind: "deployment", Name: "helm-controller"},
|
|
{Namespace: "flux-system", Kind: "deployment", Name: "notification-controller"},
|
|
{Namespace: "harbor", Kind: "statefulset", Name: "harbor-redis"},
|
|
{Namespace: "harbor", Kind: "deployment", Name: "harbor-registry"},
|
|
{Namespace: "vault", Kind: "statefulset", Name: "vault"},
|
|
{Namespace: "postgres", Kind: "statefulset", Name: "postgres"},
|
|
{Namespace: "gitea", Kind: "deployment", Name: "gitea"},
|
|
}
|
|
|
|
func New(cfg config.Config, runner *execx.Runner, store *state.Store, logger *log.Logger) *Orchestrator {
|
|
return &Orchestrator{cfg: cfg, runner: runner, store: store, log: logger}
|
|
}
|
|
|
|
func (o *Orchestrator) Startup(ctx context.Context, opts StartupOptions) (err error) {
|
|
unlock, err := state.AcquireLock(o.cfg.State.LockPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer unlock()
|
|
|
|
record := state.RunRecord{
|
|
ID: fmt.Sprintf("startup-%d", time.Now().UnixNano()),
|
|
Action: "startup",
|
|
Reason: opts.Reason,
|
|
StartedAt: time.Now().UTC(),
|
|
}
|
|
defer o.finalizeRecord(&record, &err)
|
|
|
|
if !o.runner.DryRun {
|
|
currentIntent, readErr := state.ReadIntent(o.cfg.State.IntentPath)
|
|
if readErr != nil {
|
|
return fmt.Errorf("read startup intent: %w", readErr)
|
|
}
|
|
if currentIntent.State == state.IntentShuttingDown {
|
|
return fmt.Errorf("startup blocked: shutdown intent is active (%s)", currentIntent.Reason)
|
|
}
|
|
if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentStartupInProgress, opts.Reason, "startup"); writeErr != nil {
|
|
return fmt.Errorf("set startup intent: %w", writeErr)
|
|
}
|
|
defer func() {
|
|
finalReason := opts.Reason
|
|
if err != nil {
|
|
finalReason = fmt.Sprintf("%s (failed)", strings.TrimSpace(opts.Reason))
|
|
}
|
|
if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentNormal, finalReason, "startup"); writeErr != nil {
|
|
o.log.Printf("warning: write startup completion intent failed: %v", writeErr)
|
|
}
|
|
}()
|
|
}
|
|
|
|
o.log.Printf("startup control-planes=%s", strings.Join(o.cfg.ControlPlanes, ","))
|
|
|
|
o.reportFluxSource(ctx, opts.ForceFluxBranch)
|
|
o.startControlPlanes(ctx, o.cfg.ControlPlanes)
|
|
|
|
apiPoll := time.Duration(o.cfg.Startup.APIPollSeconds) * time.Second
|
|
apiAttempts := o.cfg.Startup.APIWaitSeconds / o.cfg.Startup.APIPollSeconds
|
|
if apiAttempts < 1 {
|
|
apiAttempts = 1
|
|
}
|
|
if err := o.waitForAPI(ctx, apiAttempts, apiPoll); err != nil {
|
|
return err
|
|
}
|
|
|
|
workers, err := o.effectiveWorkers(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
o.log.Printf("startup workers=%s", strings.Join(workers, ","))
|
|
o.startWorkers(ctx, workers)
|
|
o.bestEffort("uncordon workers", func() error { return o.uncordonWorkers(ctx, workers) })
|
|
|
|
if opts.ForceFluxBranch != "" {
|
|
patch := fmt.Sprintf(`{"spec":{"ref":{"branch":"%s"}}}`, opts.ForceFluxBranch)
|
|
if _, err := o.kubectl(ctx, 20*time.Second, "-n", "flux-system", "patch", "gitrepository", "flux-system", "--type=merge", "-p", patch); err != nil {
|
|
return fmt.Errorf("force flux branch: %w", err)
|
|
}
|
|
}
|
|
|
|
needsLocalBootstrap := false
|
|
bootstrapReasons := []string{}
|
|
if !opts.SkipLocalBootstrap {
|
|
ready, readyErr := o.fluxSourceReady(ctx)
|
|
if readyErr != nil {
|
|
o.log.Printf("warning: unable to read flux source readiness: %v", readyErr)
|
|
needsLocalBootstrap = true
|
|
bootstrapReasons = append(bootstrapReasons, "flux source readiness check failed")
|
|
}
|
|
if !ready {
|
|
needsLocalBootstrap = true
|
|
bootstrapReasons = append(bootstrapReasons, "flux source not ready")
|
|
}
|
|
}
|
|
|
|
missing, missingErr := o.missingCriticalStartupWorkloads(ctx)
|
|
if missingErr != nil {
|
|
o.log.Printf("warning: unable to inspect critical startup workloads: %v", missingErr)
|
|
}
|
|
if len(missing) > 0 {
|
|
o.log.Printf("startup critical workloads not ready; applying targeted recovery first: %s", strings.Join(missing, ", "))
|
|
}
|
|
if err := o.ensureCriticalStartupWorkloads(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
if !opts.SkipLocalBootstrap && needsLocalBootstrap {
|
|
if ready, err := o.waitForFluxSourceReady(ctx, 5*time.Minute); err != nil {
|
|
o.log.Printf("warning: flux source readiness wait failed before local bootstrap: %v", err)
|
|
} else if ready {
|
|
o.log.Printf("flux source became ready after targeted recovery; skipping local bootstrap")
|
|
needsLocalBootstrap = false
|
|
}
|
|
}
|
|
|
|
if !opts.SkipLocalBootstrap && needsLocalBootstrap {
|
|
o.log.Printf("startup bootstrap required after wait: %s", strings.Join(bootstrapReasons, "; "))
|
|
if err := o.bootstrapLocal(ctx); err != nil {
|
|
return err
|
|
}
|
|
if err := o.ensureCriticalStartupWorkloads(ctx); err != nil {
|
|
return err
|
|
}
|
|
ready, err := o.fluxSourceReady(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("flux source readiness after bootstrap: %w", err)
|
|
}
|
|
if !ready {
|
|
return fmt.Errorf("flux source still not ready after local bootstrap")
|
|
}
|
|
}
|
|
|
|
o.bestEffort("restore scaled workloads", func() error { return o.restoreScaledApps(ctx) })
|
|
|
|
if err := o.resumeFluxAndReconcile(ctx); err != nil {
|
|
return err
|
|
}
|
|
o.log.Printf("startup flow complete")
|
|
return nil
|
|
}
|
|
|
|
func (o *Orchestrator) Shutdown(ctx context.Context, opts ShutdownOptions) (err error) {
|
|
unlock, err := state.AcquireLock(o.cfg.State.LockPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer unlock()
|
|
|
|
record := state.RunRecord{
|
|
ID: fmt.Sprintf("shutdown-%d", time.Now().UnixNano()),
|
|
Action: "shutdown",
|
|
Reason: opts.Reason,
|
|
StartedAt: time.Now().UTC(),
|
|
}
|
|
defer o.finalizeRecord(&record, &err)
|
|
if !o.runner.DryRun {
|
|
if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, state.IntentShuttingDown, opts.Reason, "shutdown"); writeErr != nil {
|
|
return fmt.Errorf("set shutdown intent: %w", writeErr)
|
|
}
|
|
defer func() {
|
|
final := state.IntentShuttingDown
|
|
if err == nil {
|
|
final = state.IntentShutdownComplete
|
|
}
|
|
if writeErr := state.MustWriteIntent(o.cfg.State.IntentPath, final, opts.Reason, "shutdown"); writeErr != nil {
|
|
o.log.Printf("warning: write shutdown completion intent failed: %v", writeErr)
|
|
}
|
|
}()
|
|
}
|
|
|
|
workers, err := o.effectiveWorkers(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
o.log.Printf("shutdown control-planes=%s workers=%s", strings.Join(o.cfg.ControlPlanes, ","), strings.Join(workers, ","))
|
|
|
|
o.reportFluxSource(ctx, "")
|
|
|
|
skipEtcd := opts.SkipEtcdSnapshot || o.cfg.Shutdown.SkipEtcdSnapshot
|
|
if !skipEtcd {
|
|
o.bestEffort("etcd snapshot", func() error {
|
|
return o.takeEtcdSnapshot(ctx, o.cfg.ControlPlanes[0])
|
|
})
|
|
}
|
|
|
|
o.bestEffort("suspend flux", func() error { return o.patchFluxSuspendAll(ctx, true) })
|
|
o.bestEffort("scale down apps", func() error { return o.scaleDownApps(ctx) })
|
|
|
|
skipDrain := opts.SkipDrain || o.cfg.Shutdown.SkipDrain
|
|
if !skipDrain {
|
|
o.bestEffort("drain workers", func() error { return o.drainWorkers(ctx, workers) })
|
|
}
|
|
|
|
o.stopWorkers(ctx, workers)
|
|
o.stopControlPlanes(ctx, o.cfg.ControlPlanes)
|
|
if o.cfg.Shutdown.PoweroffEnabled {
|
|
o.bestEffort("poweroff hosts", func() error { return o.poweroffHosts(ctx, workers) })
|
|
}
|
|
o.log.Printf("shutdown flow complete")
|
|
return nil
|
|
}
|
|
|
|
func (o *Orchestrator) EstimatedShutdownSeconds() int {
|
|
return o.store.ShutdownP95(o.cfg.Shutdown.DefaultBudgetSeconds)
|
|
}
|
|
|
|
func (o *Orchestrator) finalizeRecord(record *state.RunRecord, err *error) {
|
|
record.EndedAt = time.Now().UTC()
|
|
record.DurationSeconds = int(record.EndedAt.Sub(record.StartedAt).Seconds())
|
|
record.Success = *err == nil
|
|
if *err != nil {
|
|
record.Error = (*err).Error()
|
|
}
|
|
if appendErr := o.store.Append(*record); appendErr != nil {
|
|
o.log.Printf("warning: append run record failed: %v", appendErr)
|
|
}
|
|
}
|
|
|
|
func (o *Orchestrator) effectiveWorkers(ctx context.Context) ([]string, error) {
|
|
if len(o.cfg.Workers) > 0 {
|
|
return append([]string{}, o.cfg.Workers...), nil
|
|
}
|
|
workers, err := o.discoverWorkers(ctx)
|
|
if err == nil {
|
|
return workers, nil
|
|
}
|
|
fallback := o.fallbackWorkersFromInventory()
|
|
if len(fallback) == 0 {
|
|
return nil, err
|
|
}
|
|
o.log.Printf("warning: worker discovery failed via kubernetes API (%v); falling back to inventory workers=%s", err, strings.Join(fallback, ","))
|
|
return fallback, nil
|
|
}
|
|
|
|
func (o *Orchestrator) discoverWorkers(ctx context.Context) ([]string, error) {
|
|
out, err := o.kubectl(ctx, 15*time.Second,
|
|
"get", "nodes",
|
|
"-o", "custom-columns=NAME:.metadata.name,CP:.metadata.labels.node-role\\.kubernetes\\.io/control-plane,MASTER:.metadata.labels.node-role\\.kubernetes\\.io/master",
|
|
"--no-headers",
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("discover workers: %w", err)
|
|
}
|
|
var workers []string
|
|
for _, line := range lines(out) {
|
|
fields := strings.Fields(line)
|
|
if len(fields) < 3 {
|
|
continue
|
|
}
|
|
if fields[1] == "<none>" && fields[2] == "<none>" {
|
|
workers = append(workers, fields[0])
|
|
}
|
|
}
|
|
if len(workers) == 0 {
|
|
return nil, fmt.Errorf("no workers discovered")
|
|
}
|
|
return workers, nil
|
|
}
|
|
|
|
func (o *Orchestrator) fallbackWorkersFromInventory() []string {
|
|
cp := make(map[string]struct{}, len(o.cfg.ControlPlanes))
|
|
for _, node := range o.cfg.ControlPlanes {
|
|
cp[strings.TrimSpace(node)] = struct{}{}
|
|
}
|
|
|
|
candidates := make(map[string]struct{})
|
|
add := func(node string) {
|
|
name := strings.TrimSpace(node)
|
|
if name == "" {
|
|
return
|
|
}
|
|
if _, isCP := cp[name]; isCP {
|
|
return
|
|
}
|
|
candidates[name] = struct{}{}
|
|
}
|
|
|
|
for _, node := range o.cfg.SSHManagedNodes {
|
|
add(node)
|
|
}
|
|
if len(candidates) == 0 {
|
|
for node := range o.cfg.SSHNodeHosts {
|
|
add(node)
|
|
}
|
|
}
|
|
|
|
workers := make([]string, 0, len(candidates))
|
|
for node := range candidates {
|
|
workers = append(workers, node)
|
|
}
|
|
sort.Strings(workers)
|
|
return workers
|
|
}
|
|
|
|
func (o *Orchestrator) patchFluxSuspendAll(ctx context.Context, suspend bool) error {
|
|
patch := fmt.Sprintf(`{"spec":{"suspend":%t}}`, suspend)
|
|
|
|
ksOut, err := o.kubectl(ctx, 20*time.Second,
|
|
"-n", "flux-system", "get", "kustomizations.kustomize.toolkit.fluxcd.io",
|
|
"-o", "jsonpath={range .items[*]}{.metadata.name}{'\\n'}{end}",
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, ks := range lines(ksOut) {
|
|
_, patchErr := o.kubectl(ctx, 20*time.Second, "-n", "flux-system", "patch", "kustomization", ks, "--type=merge", "-p", patch)
|
|
if patchErr != nil {
|
|
o.log.Printf("warning: patch kustomization %s failed: %v", ks, patchErr)
|
|
}
|
|
}
|
|
|
|
hrOut, err := o.kubectl(ctx, 25*time.Second,
|
|
"get", "helmreleases.helm.toolkit.fluxcd.io", "-A",
|
|
"-o", "jsonpath={range .items[*]}{.metadata.namespace}{'/'}{.metadata.name}{'\\n'}{end}",
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, hr := range lines(hrOut) {
|
|
parts := strings.SplitN(hr, "/", 2)
|
|
if len(parts) != 2 {
|
|
continue
|
|
}
|
|
_, patchErr := o.kubectl(ctx, 20*time.Second, "-n", parts[0], "patch", "helmrelease", parts[1], "--type=merge", "-p", patch)
|
|
if patchErr != nil {
|
|
o.log.Printf("warning: patch helmrelease %s failed: %v", hr, patchErr)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (o *Orchestrator) scaleDownApps(ctx context.Context) error {
|
|
targets, err := o.listScalableWorkloads(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := o.writeScaledWorkloadSnapshot(targets); err != nil {
|
|
return err
|
|
}
|
|
if len(targets) == 0 {
|
|
o.log.Printf("scale down apps: no workloads above 0 replicas")
|
|
return nil
|
|
}
|
|
|
|
parallelism := o.cfg.Shutdown.ScaleParallelism
|
|
if parallelism <= 0 {
|
|
parallelism = 8
|
|
}
|
|
o.log.Printf("scale down apps targets=%d parallelism=%d", len(targets), parallelism)
|
|
return o.scaleWorkloads(ctx, targets, 0, parallelism)
|
|
}
|
|
|
|
func (o *Orchestrator) restoreScaledApps(ctx context.Context) error {
|
|
snapshot, err := o.readScaledWorkloadSnapshot()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if snapshot == nil || len(snapshot.Entries) == 0 {
|
|
o.log.Printf("restore scaled workloads: no snapshot entries to restore")
|
|
return nil
|
|
}
|
|
|
|
parallelism := o.cfg.Shutdown.ScaleParallelism
|
|
if parallelism <= 0 {
|
|
parallelism = 8
|
|
}
|
|
o.log.Printf("restore scaled workloads entries=%d parallelism=%d snapshot_at=%s", len(snapshot.Entries), parallelism, snapshot.GeneratedAt.Format(time.RFC3339))
|
|
if err := o.scaleWorkloads(ctx, snapshot.Entries, -1, parallelism); err != nil {
|
|
return err
|
|
}
|
|
if o.runner.DryRun {
|
|
return nil
|
|
}
|
|
if err := os.Remove(o.scaledWorkloadSnapshotPath()); err != nil && !os.IsNotExist(err) {
|
|
return fmt.Errorf("remove scaled workload snapshot: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (o *Orchestrator) listScalableWorkloads(ctx context.Context) ([]workloadScaleEntry, error) {
|
|
exclude := map[string]struct{}{}
|
|
for _, ns := range o.cfg.ExcludedNamespaces {
|
|
exclude[strings.TrimSpace(ns)] = struct{}{}
|
|
}
|
|
|
|
collect := func(kind string) ([]workloadScaleEntry, error) {
|
|
out, err := o.kubectl(
|
|
ctx,
|
|
25*time.Second,
|
|
"get",
|
|
kind,
|
|
"-A",
|
|
"-o",
|
|
"jsonpath={range .items[*]}{.metadata.namespace}{'\\t'}{.metadata.name}{'\\t'}{.spec.replicas}{'\\n'}{end}",
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var entries []workloadScaleEntry
|
|
for _, line := range lines(out) {
|
|
parts := strings.Split(line, "\t")
|
|
if len(parts) < 3 {
|
|
continue
|
|
}
|
|
ns := strings.TrimSpace(parts[0])
|
|
if _, skip := exclude[ns]; skip {
|
|
continue
|
|
}
|
|
replicas, convErr := strconv.Atoi(strings.TrimSpace(parts[2]))
|
|
if convErr != nil || replicas <= 0 {
|
|
continue
|
|
}
|
|
entries = append(entries, workloadScaleEntry{
|
|
Namespace: ns,
|
|
Kind: kind,
|
|
Name: strings.TrimSpace(parts[1]),
|
|
Replicas: replicas,
|
|
})
|
|
}
|
|
return entries, nil
|
|
}
|
|
|
|
deployments, err := collect("deployment")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("collect deployments: %w", err)
|
|
}
|
|
statefulsets, err := collect("statefulset")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("collect statefulsets: %w", err)
|
|
}
|
|
targets := append(deployments, statefulsets...)
|
|
sort.Slice(targets, func(i, j int) bool {
|
|
a, b := targets[i], targets[j]
|
|
if a.Namespace != b.Namespace {
|
|
return a.Namespace < b.Namespace
|
|
}
|
|
if a.Kind != b.Kind {
|
|
return a.Kind < b.Kind
|
|
}
|
|
return a.Name < b.Name
|
|
})
|
|
return targets, nil
|
|
}
|
|
|
|
func (o *Orchestrator) scaleWorkloads(ctx context.Context, entries []workloadScaleEntry, forceReplicas int, parallelism int) error {
|
|
if len(entries) == 0 {
|
|
return nil
|
|
}
|
|
if parallelism <= 0 {
|
|
parallelism = 1
|
|
}
|
|
if parallelism > len(entries) {
|
|
parallelism = len(entries)
|
|
}
|
|
|
|
sem := make(chan struct{}, parallelism)
|
|
var wg sync.WaitGroup
|
|
errCh := make(chan error, len(entries))
|
|
|
|
for _, entry := range entries {
|
|
entry := entry
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
sem <- struct{}{}
|
|
defer func() { <-sem }()
|
|
|
|
replicas := entry.Replicas
|
|
if forceReplicas >= 0 {
|
|
replicas = forceReplicas
|
|
}
|
|
if _, err := o.kubectl(
|
|
ctx,
|
|
20*time.Second,
|
|
"-n",
|
|
entry.Namespace,
|
|
"scale",
|
|
entry.Kind,
|
|
entry.Name,
|
|
fmt.Sprintf("--replicas=%d", replicas),
|
|
); err != nil {
|
|
if isNotFoundErr(err) {
|
|
o.log.Printf("warning: skip missing workload while scaling %s/%s/%s", entry.Namespace, entry.Kind, entry.Name)
|
|
return
|
|
}
|
|
errCh <- fmt.Errorf("scale %s/%s/%s -> %d: %w", entry.Namespace, entry.Kind, entry.Name, replicas, err)
|
|
}
|
|
}()
|
|
}
|
|
|
|
wg.Wait()
|
|
close(errCh)
|
|
|
|
errorCount := len(errCh)
|
|
if errorCount == 0 {
|
|
return nil
|
|
}
|
|
errs := make([]string, 0, len(errCh))
|
|
for err := range errCh {
|
|
errs = append(errs, err.Error())
|
|
if len(errs) >= 5 {
|
|
break
|
|
}
|
|
}
|
|
return fmt.Errorf("scaling had %d errors (first: %s)", errorCount, strings.Join(errs, " | "))
|
|
}
|
|
|
|
func (o *Orchestrator) scaledWorkloadSnapshotPath() string {
|
|
return filepath.Join(o.cfg.State.Dir, "scaled-workloads.json")
|
|
}
|
|
|
|
func (o *Orchestrator) writeScaledWorkloadSnapshot(entries []workloadScaleEntry) error {
|
|
if o.runner.DryRun {
|
|
return nil
|
|
}
|
|
if err := os.MkdirAll(o.cfg.State.Dir, 0o755); err != nil {
|
|
return fmt.Errorf("ensure state dir: %w", err)
|
|
}
|
|
payload := workloadScaleSnapshot{
|
|
GeneratedAt: time.Now().UTC(),
|
|
Entries: entries,
|
|
}
|
|
b, err := json.MarshalIndent(payload, "", " ")
|
|
if err != nil {
|
|
return fmt.Errorf("marshal scaled workload snapshot: %w", err)
|
|
}
|
|
if err := os.WriteFile(o.scaledWorkloadSnapshotPath(), b, 0o644); err != nil {
|
|
return fmt.Errorf("write scaled workload snapshot: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (o *Orchestrator) readScaledWorkloadSnapshot() (*workloadScaleSnapshot, error) {
|
|
if o.runner.DryRun {
|
|
return nil, nil
|
|
}
|
|
b, err := os.ReadFile(o.scaledWorkloadSnapshotPath())
|
|
if err != nil {
|
|
if os.IsNotExist(err) {
|
|
return nil, nil
|
|
}
|
|
return nil, fmt.Errorf("read scaled workload snapshot: %w", err)
|
|
}
|
|
var snapshot workloadScaleSnapshot
|
|
if err := json.Unmarshal(b, &snapshot); err != nil {
|
|
return nil, fmt.Errorf("decode scaled workload snapshot: %w", err)
|
|
}
|
|
return &snapshot, nil
|
|
}
|
|
|
|
func (o *Orchestrator) drainWorkers(ctx context.Context, workers []string) error {
|
|
total := len(workers)
|
|
if total == 0 {
|
|
return nil
|
|
}
|
|
parallelism := o.cfg.Shutdown.DrainParallelism
|
|
if parallelism <= 0 {
|
|
parallelism = 6
|
|
}
|
|
if parallelism > total {
|
|
parallelism = total
|
|
}
|
|
|
|
o.log.Printf("drain workers total=%d parallelism=%d", total, parallelism)
|
|
sem := make(chan struct{}, parallelism)
|
|
var wg sync.WaitGroup
|
|
errCh := make(chan error, total)
|
|
|
|
for idx, node := range workers {
|
|
idx := idx
|
|
node := node
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
sem <- struct{}{}
|
|
defer func() { <-sem }()
|
|
|
|
o.log.Printf("drain worker %d/%d: %s", idx+1, total, node)
|
|
if _, err := o.kubectl(ctx, 20*time.Second, "cordon", node); err != nil {
|
|
o.log.Printf("warning: cordon %s failed: %v", node, err)
|
|
}
|
|
if _, err := o.kubectl(ctx, 3*time.Minute, "drain", node, "--ignore-daemonsets", "--delete-emptydir-data", "--grace-period=30", "--timeout=180s"); err != nil {
|
|
errCh <- fmt.Errorf("drain %s failed: %w", node, err)
|
|
return
|
|
}
|
|
}()
|
|
}
|
|
|
|
wg.Wait()
|
|
close(errCh)
|
|
if len(errCh) == 0 {
|
|
return nil
|
|
}
|
|
count := len(errCh)
|
|
samples := []string{}
|
|
for err := range errCh {
|
|
samples = append(samples, err.Error())
|
|
if len(samples) >= 4 {
|
|
break
|
|
}
|
|
}
|
|
return fmt.Errorf("drain workers had %d errors (first: %s)", count, strings.Join(samples, " | "))
|
|
}
|
|
|
|
func (o *Orchestrator) uncordonWorkers(ctx context.Context, workers []string) error {
|
|
for _, node := range workers {
|
|
if _, err := o.kubectl(ctx, 20*time.Second, "uncordon", node); err != nil {
|
|
o.log.Printf("warning: uncordon %s failed: %v", node, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (o *Orchestrator) stopWorkers(ctx context.Context, workers []string) {
|
|
o.runSSHAcrossNodes(ctx, workers, "stop k3s-agent", "sudo systemctl stop k3s-agent || true")
|
|
}
|
|
|
|
func (o *Orchestrator) startWorkers(ctx context.Context, workers []string) {
|
|
o.runSSHAcrossNodes(ctx, workers, "start k3s-agent", "sudo systemctl start k3s-agent || true")
|
|
}
|
|
|
|
func (o *Orchestrator) stopControlPlanes(ctx context.Context, cps []string) {
|
|
o.runSSHAcrossNodes(ctx, cps, "stop k3s", "sudo systemctl stop k3s || true")
|
|
}
|
|
|
|
func (o *Orchestrator) startControlPlanes(ctx context.Context, cps []string) {
|
|
o.runSSHAcrossNodes(ctx, cps, "start k3s", "sudo systemctl start k3s || true")
|
|
}
|
|
|
|
func (o *Orchestrator) runSSHAcrossNodes(ctx context.Context, nodes []string, action, command string) {
|
|
if len(nodes) == 0 {
|
|
return
|
|
}
|
|
parallelism := o.cfg.Shutdown.SSHParallelism
|
|
if parallelism <= 0 {
|
|
parallelism = 8
|
|
}
|
|
if parallelism > len(nodes) {
|
|
parallelism = len(nodes)
|
|
}
|
|
|
|
sem := make(chan struct{}, parallelism)
|
|
var wg sync.WaitGroup
|
|
for _, node := range nodes {
|
|
node := node
|
|
if !o.sshManaged(node) {
|
|
o.log.Printf("skip %s on %s: node not in ssh_managed_nodes", action, node)
|
|
continue
|
|
}
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
sem <- struct{}{}
|
|
defer func() { <-sem }()
|
|
o.bestEffort(action+" on "+node, func() error {
|
|
_, err := o.ssh(ctx, node, command)
|
|
return err
|
|
})
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
func (o *Orchestrator) takeEtcdSnapshot(ctx context.Context, node string) error {
|
|
if !o.sshManaged(node) {
|
|
return fmt.Errorf("cannot run etcd snapshot on %s: node not in ssh_managed_nodes", node)
|
|
}
|
|
name := "pre-shutdown-" + time.Now().UTC().Format("20060102-150405")
|
|
_, err := o.ssh(ctx, node, "sudo k3s etcd-snapshot save --name "+name)
|
|
return err
|
|
}
|
|
|
|
func (o *Orchestrator) waitForAPI(ctx context.Context, attempts int, sleep time.Duration) error {
|
|
if o.runner.DryRun {
|
|
return nil
|
|
}
|
|
for i := 0; i < attempts; i++ {
|
|
_, err := o.kubectl(ctx, 5*time.Second, "version", "--request-timeout=5s")
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
time.Sleep(sleep)
|
|
}
|
|
return fmt.Errorf("kubernetes API did not become reachable within timeout")
|
|
}
|
|
|
|
func (o *Orchestrator) fluxSourceReady(ctx context.Context) (bool, error) {
|
|
out, err := o.kubectl(ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.status.conditions[?(@.type==\"Ready\")].status}")
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return strings.Contains(out, "True"), nil
|
|
}
|
|
|
|
func (o *Orchestrator) reportFluxSource(ctx context.Context, forceBranch string) {
|
|
urlOut, urlErr := o.kubectl(ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.spec.url}")
|
|
if urlErr == nil {
|
|
o.log.Printf("flux-source-url=%s", strings.TrimSpace(urlOut))
|
|
}
|
|
branchOut, branchErr := o.kubectl(ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.spec.ref.branch}")
|
|
if branchErr == nil {
|
|
branch := strings.TrimSpace(branchOut)
|
|
o.log.Printf("flux-source-branch=%s", branch)
|
|
if forceBranch == "" && branch != o.cfg.ExpectedFluxBranch {
|
|
o.log.Printf("warning: flux source branch is '%s', expected '%s'", branch, o.cfg.ExpectedFluxBranch)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (o *Orchestrator) bootstrapLocal(ctx context.Context) error {
|
|
failures := 0
|
|
for _, rel := range o.cfg.LocalBootstrapPaths {
|
|
full := filepath.Join(o.cfg.IACRepoPath, rel)
|
|
o.log.Printf("local bootstrap apply -k %s", full)
|
|
if o.runner.DryRun {
|
|
continue
|
|
}
|
|
if _, err := o.kubectl(ctx, 2*time.Minute, "apply", "-k", full); err != nil {
|
|
o.log.Printf("warning: local bootstrap apply failed at %s: %v", full, err)
|
|
o.log.Printf("local bootstrap fallback render/apply with LoadRestrictionsNone for %s", full)
|
|
if fallbackErr := o.applyKustomizeFallback(ctx, full); fallbackErr != nil {
|
|
failures++
|
|
o.log.Printf("warning: local bootstrap fallback failed at %s: %v", full, fallbackErr)
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
if failures == len(o.cfg.LocalBootstrapPaths) {
|
|
return fmt.Errorf("local bootstrap apply failed for every configured path (%d total)", failures)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (o *Orchestrator) applyKustomizeFallback(ctx context.Context, full string) error {
|
|
cmd := fmt.Sprintf("kubectl kustomize --load-restrictor=LoadRestrictionsNone %q | kubectl apply -f -", full)
|
|
if _, err := o.runSensitive(ctx, 3*time.Minute, "sh", "-lc", cmd); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (o *Orchestrator) waitForFluxSourceReady(ctx context.Context, window time.Duration) (bool, error) {
|
|
if o.runner.DryRun {
|
|
return true, nil
|
|
}
|
|
deadline := time.Now().Add(window)
|
|
for {
|
|
ready, err := o.fluxSourceReady(ctx)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if ready {
|
|
return true, nil
|
|
}
|
|
if time.Now().After(deadline) {
|
|
return false, nil
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return false, ctx.Err()
|
|
case <-time.After(5 * time.Second):
|
|
}
|
|
}
|
|
}
|
|
|
|
func (o *Orchestrator) resumeFluxAndReconcile(ctx context.Context) error {
|
|
if err := o.patchFluxSuspendAll(ctx, false); err != nil {
|
|
return err
|
|
}
|
|
|
|
now := time.Now().UTC().Format(time.RFC3339)
|
|
if _, err := o.kubectl(
|
|
ctx,
|
|
25*time.Second,
|
|
"-n", "flux-system",
|
|
"annotate",
|
|
"kustomizations.kustomize.toolkit.fluxcd.io",
|
|
"--all",
|
|
"reconcile.fluxcd.io/requestedAt="+now,
|
|
"--overwrite",
|
|
); err != nil {
|
|
o.log.Printf("warning: annotate kustomizations for reconcile failed: %v", err)
|
|
}
|
|
if _, err := o.kubectl(
|
|
ctx,
|
|
25*time.Second,
|
|
"annotate",
|
|
"--all-namespaces",
|
|
"helmreleases.helm.toolkit.fluxcd.io",
|
|
"--all",
|
|
"reconcile.fluxcd.io/requestedAt="+now,
|
|
"--overwrite",
|
|
); err != nil {
|
|
o.log.Printf("warning: annotate helmreleases for reconcile failed: %v", err)
|
|
}
|
|
|
|
if o.runner.CommandExists("flux") {
|
|
sourceCmd := []string{"reconcile", "source", "git", "flux-system", "-n", "flux-system", "--timeout=60s"}
|
|
if _, err := o.run(ctx, 75*time.Second, "flux", sourceCmd...); err != nil {
|
|
o.log.Printf("warning: flux command failed (%s): %v", strings.Join(sourceCmd, " "), err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (o *Orchestrator) kubectl(ctx context.Context, timeout time.Duration, args ...string) (string, error) {
|
|
return o.run(ctx, timeout, "kubectl", args...)
|
|
}
|
|
|
|
func (o *Orchestrator) ssh(ctx context.Context, node string, command string) (string, error) {
|
|
host := node
|
|
if mapped, ok := o.cfg.SSHNodeHosts[node]; ok && strings.TrimSpace(mapped) != "" {
|
|
host = strings.TrimSpace(mapped)
|
|
}
|
|
sshUser := o.cfg.SSHUser
|
|
if override, ok := o.cfg.SSHNodeUsers[node]; ok && strings.TrimSpace(override) != "" {
|
|
sshUser = strings.TrimSpace(override)
|
|
}
|
|
target := host
|
|
if sshUser != "" {
|
|
target = sshUser + "@" + host
|
|
}
|
|
sshConfigFile := o.resolveSSHConfigFile()
|
|
sshIdentity := o.resolveSSHIdentityFile()
|
|
baseArgs := []string{
|
|
"-o", "BatchMode=yes",
|
|
"-o", "ConnectTimeout=8",
|
|
"-o", "StrictHostKeyChecking=accept-new",
|
|
}
|
|
if sshConfigFile != "" {
|
|
baseArgs = append(baseArgs, "-F", sshConfigFile)
|
|
}
|
|
if sshIdentity != "" {
|
|
baseArgs = append(baseArgs, "-i", sshIdentity)
|
|
}
|
|
if o.cfg.SSHPort > 0 {
|
|
baseArgs = append(baseArgs, "-p", strconv.Itoa(o.cfg.SSHPort))
|
|
}
|
|
attempts := make([][]string, 0, 2)
|
|
attemptNames := make([]string, 0, 2)
|
|
if o.cfg.SSHJumpHost != "" {
|
|
jump := o.cfg.SSHJumpHost
|
|
if o.cfg.SSHJumpUser != "" {
|
|
jump = o.cfg.SSHJumpUser + "@" + jump
|
|
}
|
|
if o.cfg.SSHPort > 0 && !strings.Contains(jump, ":") {
|
|
jump = fmt.Sprintf("%s:%d", jump, o.cfg.SSHPort)
|
|
}
|
|
withJump := append([]string{}, baseArgs...)
|
|
withJump = append(withJump, "-J", jump, target, command)
|
|
attempts = append(attempts, withJump)
|
|
attemptNames = append(attemptNames, "jump")
|
|
}
|
|
direct := append([]string{}, baseArgs...)
|
|
direct = append(direct, target, command)
|
|
attempts = append(attempts, direct)
|
|
attemptNames = append(attemptNames, "direct")
|
|
|
|
var lastOut string
|
|
var lastErr error
|
|
for i, args := range attempts {
|
|
out, err := o.run(ctx, 45*time.Second, "ssh", args...)
|
|
if err == nil {
|
|
if i > 0 {
|
|
o.log.Printf("warning: ssh %s path failed for %s, using %s path", attemptNames[i-1], node, attemptNames[i])
|
|
}
|
|
return out, nil
|
|
}
|
|
lastOut = out
|
|
lastErr = err
|
|
if i < len(attempts)-1 {
|
|
o.log.Printf("warning: ssh %s path failed for %s: %v; trying %s path", attemptNames[i], node, err, attemptNames[i+1])
|
|
}
|
|
}
|
|
return lastOut, lastErr
|
|
}
|
|
|
|
func (o *Orchestrator) run(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
|
|
runCtx, cancel := context.WithTimeout(ctx, timeout)
|
|
defer cancel()
|
|
return o.runner.Run(runCtx, name, args...)
|
|
}
|
|
|
|
func (o *Orchestrator) runSensitive(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
|
|
runCtx, cancel := context.WithTimeout(ctx, timeout)
|
|
defer cancel()
|
|
|
|
cmd := exec.CommandContext(runCtx, name, args...)
|
|
cmd.Env = os.Environ()
|
|
if o.runner.Kubeconfig != "" {
|
|
cmd.Env = append(cmd.Env, "KUBECONFIG="+o.runner.Kubeconfig)
|
|
}
|
|
out, err := cmd.CombinedOutput()
|
|
trimmed := strings.TrimSpace(string(out))
|
|
if err != nil {
|
|
if trimmed == "" {
|
|
return "", fmt.Errorf("%s failed: %w", name, err)
|
|
}
|
|
return trimmed, fmt.Errorf("%s failed: %w", name, err)
|
|
}
|
|
return trimmed, nil
|
|
}
|
|
|
|
func lines(in string) []string {
|
|
in = strings.TrimSpace(in)
|
|
if in == "" {
|
|
return nil
|
|
}
|
|
parts := strings.Split(in, "\n")
|
|
out := make([]string, 0, len(parts))
|
|
for _, p := range parts {
|
|
v := strings.TrimSpace(p)
|
|
if v != "" {
|
|
out = append(out, v)
|
|
}
|
|
}
|
|
return out
|
|
}
|
|
|
|
func (o *Orchestrator) sshManaged(node string) bool {
|
|
if len(o.cfg.SSHManagedNodes) == 0 {
|
|
return true
|
|
}
|
|
for _, allowed := range o.cfg.SSHManagedNodes {
|
|
if strings.TrimSpace(allowed) == node {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (o *Orchestrator) resolveSSHConfigFile() string {
|
|
if strings.TrimSpace(o.cfg.SSHConfigFile) != "" {
|
|
return strings.TrimSpace(o.cfg.SSHConfigFile)
|
|
}
|
|
candidates := []string{
|
|
"/home/atlas/.ssh/config",
|
|
"/home/tethys/.ssh/config",
|
|
}
|
|
for _, p := range candidates {
|
|
if stat, err := os.Stat(p); err == nil && !stat.IsDir() {
|
|
return p
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func (o *Orchestrator) resolveSSHIdentityFile() string {
|
|
if strings.TrimSpace(o.cfg.SSHIdentityFile) != "" {
|
|
return strings.TrimSpace(o.cfg.SSHIdentityFile)
|
|
}
|
|
candidates := []string{
|
|
"/home/atlas/.ssh/id_ed25519",
|
|
"/home/tethys/.ssh/id_ed25519",
|
|
}
|
|
for _, p := range candidates {
|
|
if stat, err := os.Stat(p); err == nil && !stat.IsDir() {
|
|
return p
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func (o *Orchestrator) bestEffort(name string, fn func() error) {
|
|
if err := fn(); err != nil {
|
|
o.log.Printf("warning: %s: %v", name, err)
|
|
}
|
|
}
|
|
|
|
func (o *Orchestrator) missingCriticalStartupWorkloads(ctx context.Context) ([]string, error) {
|
|
missing := []string{}
|
|
for _, w := range criticalStartupWorkloads {
|
|
ready, err := o.workloadReady(ctx, w)
|
|
if err != nil {
|
|
if isNotFoundErr(err) {
|
|
missing = append(missing, fmt.Sprintf("%s/%s/%s(not found)", w.Namespace, w.Kind, w.Name))
|
|
continue
|
|
}
|
|
return nil, fmt.Errorf("check %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, err)
|
|
}
|
|
if !ready {
|
|
missing = append(missing, fmt.Sprintf("%s/%s/%s", w.Namespace, w.Kind, w.Name))
|
|
}
|
|
}
|
|
return missing, nil
|
|
}
|
|
|
|
func (o *Orchestrator) ensureCriticalStartupWorkloads(ctx context.Context) error {
|
|
for _, w := range criticalStartupWorkloads {
|
|
if err := o.ensureWorkloadReplicas(ctx, w, 1); err != nil {
|
|
if isNotFoundErr(err) {
|
|
o.log.Printf("warning: startup workload missing, skipping scale: %s/%s/%s", w.Namespace, w.Kind, w.Name)
|
|
continue
|
|
}
|
|
return fmt.Errorf("scale %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, err)
|
|
}
|
|
if err := o.waitWorkloadReady(ctx, w); err != nil {
|
|
if isNotFoundErr(err) {
|
|
o.log.Printf("warning: startup workload missing during readiness wait: %s/%s/%s", w.Namespace, w.Kind, w.Name)
|
|
continue
|
|
}
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (o *Orchestrator) ensureWorkloadReplicas(ctx context.Context, w startupWorkload, replicas int) error {
|
|
_, err := o.kubectl(
|
|
ctx,
|
|
45*time.Second,
|
|
"-n",
|
|
w.Namespace,
|
|
"scale",
|
|
w.Kind,
|
|
w.Name,
|
|
fmt.Sprintf("--replicas=%d", replicas),
|
|
)
|
|
return err
|
|
}
|
|
|
|
func (o *Orchestrator) waitWorkloadReady(ctx context.Context, w startupWorkload) error {
|
|
if w.Namespace == "vault" && w.Kind == "statefulset" && w.Name == "vault" {
|
|
return o.waitVaultReady(ctx, w)
|
|
}
|
|
|
|
timeout := "240s"
|
|
if w.Kind == "statefulset" {
|
|
timeout = "360s"
|
|
}
|
|
_, err := o.kubectl(
|
|
ctx,
|
|
7*time.Minute,
|
|
"-n",
|
|
w.Namespace,
|
|
"rollout",
|
|
"status",
|
|
fmt.Sprintf("%s/%s", w.Kind, w.Name),
|
|
"--timeout="+timeout,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("wait ready %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (o *Orchestrator) waitVaultReady(ctx context.Context, w startupWorkload) error {
|
|
if o.runner.DryRun {
|
|
return nil
|
|
}
|
|
|
|
deadline := time.Now().Add(7 * time.Minute)
|
|
var lastErr error
|
|
for time.Now().Before(deadline) {
|
|
ready, err := o.workloadReady(ctx, w)
|
|
if err == nil && ready {
|
|
return nil
|
|
}
|
|
if err != nil {
|
|
lastErr = err
|
|
}
|
|
|
|
if err := o.ensureVaultUnsealed(ctx); err != nil {
|
|
lastErr = err
|
|
o.log.Printf("warning: vault readiness still blocked: %v", err)
|
|
}
|
|
|
|
ready, err = o.workloadReady(ctx, w)
|
|
if err == nil && ready {
|
|
return nil
|
|
}
|
|
if err != nil {
|
|
lastErr = err
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-time.After(5 * time.Second):
|
|
}
|
|
}
|
|
|
|
if lastErr != nil {
|
|
return fmt.Errorf("wait ready %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, lastErr)
|
|
}
|
|
return fmt.Errorf("wait ready %s/%s/%s: timeout", w.Namespace, w.Kind, w.Name)
|
|
}
|
|
|
|
func (o *Orchestrator) ensureVaultUnsealed(ctx context.Context) error {
|
|
if o.runner.DryRun {
|
|
return nil
|
|
}
|
|
|
|
phase, err := o.kubectl(ctx, 15*time.Second, "-n", "vault", "get", "pod", "vault-0", "-o", "jsonpath={.status.phase}")
|
|
if err != nil {
|
|
return fmt.Errorf("vault pod phase check failed: %w", err)
|
|
}
|
|
if strings.TrimSpace(phase) != "Running" {
|
|
return fmt.Errorf("vault-0 pod phase is %q", strings.TrimSpace(phase))
|
|
}
|
|
|
|
sealed, err := o.vaultSealed(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !sealed {
|
|
return nil
|
|
}
|
|
|
|
unsealKey, err := o.vaultUnsealKey(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for attempt := 1; attempt <= 5; attempt++ {
|
|
o.log.Printf("vault is sealed; attempting auto-unseal (%d/5)", attempt)
|
|
if _, err := o.runSensitive(
|
|
ctx,
|
|
30*time.Second,
|
|
"kubectl",
|
|
"-n", "vault",
|
|
"exec", "vault-0", "--",
|
|
"vault", "operator", "unseal", unsealKey,
|
|
); err != nil {
|
|
return fmt.Errorf("vault unseal attempt %d failed: %w", attempt, err)
|
|
}
|
|
|
|
sealed, err = o.vaultSealed(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !sealed {
|
|
o.log.Printf("vault auto-unseal succeeded")
|
|
return nil
|
|
}
|
|
time.Sleep(2 * time.Second)
|
|
}
|
|
|
|
return fmt.Errorf("vault remained sealed after 5 auto-unseal attempts")
|
|
}
|
|
|
|
func (o *Orchestrator) vaultSealed(ctx context.Context) (bool, error) {
|
|
out, err := o.kubectl(
|
|
ctx,
|
|
25*time.Second,
|
|
"-n", "vault",
|
|
"exec", "vault-0", "--",
|
|
"sh", "-lc",
|
|
"VAULT_ADDR=http://127.0.0.1:8200 vault status -format=json 2>/dev/null || true",
|
|
)
|
|
if err != nil {
|
|
return false, fmt.Errorf("vault status check failed: %w", err)
|
|
}
|
|
sealed, err := parseVaultSealed(out)
|
|
if err != nil {
|
|
return false, fmt.Errorf("parse vault status: %w", err)
|
|
}
|
|
return sealed, nil
|
|
}
|
|
|
|
func parseVaultSealed(raw string) (bool, error) {
|
|
trimmed := strings.TrimSpace(raw)
|
|
if trimmed == "" {
|
|
return false, fmt.Errorf("empty vault status output")
|
|
}
|
|
start := strings.Index(trimmed, "{")
|
|
end := strings.LastIndex(trimmed, "}")
|
|
if start < 0 || end < 0 || end < start {
|
|
return false, fmt.Errorf("vault status payload missing JSON object")
|
|
}
|
|
payload := trimmed[start : end+1]
|
|
|
|
type vaultStatus struct {
|
|
Sealed bool `json:"sealed"`
|
|
}
|
|
var st vaultStatus
|
|
if err := json.Unmarshal([]byte(payload), &st); err != nil {
|
|
return false, err
|
|
}
|
|
return st.Sealed, nil
|
|
}
|
|
|
|
func (o *Orchestrator) vaultUnsealKey(ctx context.Context) (string, error) {
|
|
out, err := o.kubectl(
|
|
ctx,
|
|
15*time.Second,
|
|
"-n", "vault",
|
|
"get", "secret", "vault-init",
|
|
"-o", "jsonpath={.data.unseal_key_b64}",
|
|
)
|
|
if err != nil {
|
|
return "", fmt.Errorf("read vault-init secret: %w", err)
|
|
}
|
|
decoded, err := base64.StdEncoding.DecodeString(strings.TrimSpace(out))
|
|
if err != nil {
|
|
return "", fmt.Errorf("decode vault-init unseal_key_b64: %w", err)
|
|
}
|
|
key := strings.TrimSpace(string(decoded))
|
|
if key == "" {
|
|
return "", fmt.Errorf("vault-init unseal key is empty")
|
|
}
|
|
return key, nil
|
|
}
|
|
|
|
func (o *Orchestrator) workloadReady(ctx context.Context, w startupWorkload) (bool, error) {
|
|
out, err := o.kubectl(
|
|
ctx,
|
|
20*time.Second,
|
|
"-n",
|
|
w.Namespace,
|
|
"get",
|
|
w.Kind,
|
|
w.Name,
|
|
"-o",
|
|
"jsonpath={.status.readyReplicas}",
|
|
)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
raw := strings.TrimSpace(out)
|
|
if raw == "" || raw == "<no value>" {
|
|
return false, nil
|
|
}
|
|
n, err := strconv.Atoi(raw)
|
|
if err != nil {
|
|
return false, fmt.Errorf("parse readyReplicas %q: %w", raw, err)
|
|
}
|
|
return n >= 1, nil
|
|
}
|
|
|
|
func isNotFoundErr(err error) bool {
|
|
if err == nil {
|
|
return false
|
|
}
|
|
msg := strings.ToLower(err.Error())
|
|
return strings.Contains(msg, "not found") || strings.Contains(msg, "(notfound)")
|
|
}
|
|
|
|
func (o *Orchestrator) poweroffHosts(ctx context.Context, workers []string) error {
|
|
delay := o.cfg.Shutdown.PoweroffDelaySeconds
|
|
if delay <= 0 {
|
|
delay = 25
|
|
}
|
|
|
|
localNames := map[string]struct{}{}
|
|
if hn, err := os.Hostname(); err == nil && strings.TrimSpace(hn) != "" {
|
|
localNames[strings.TrimSpace(hn)] = struct{}{}
|
|
}
|
|
if o.cfg.SSHUser != "" {
|
|
localNames[o.cfg.SSHUser] = struct{}{}
|
|
}
|
|
|
|
hostSet := map[string]struct{}{}
|
|
for _, n := range o.cfg.ControlPlanes {
|
|
hostSet[n] = struct{}{}
|
|
}
|
|
for _, n := range workers {
|
|
hostSet[n] = struct{}{}
|
|
}
|
|
for _, n := range o.cfg.Shutdown.ExtraPoweroffHosts {
|
|
if strings.TrimSpace(n) != "" {
|
|
hostSet[strings.TrimSpace(n)] = struct{}{}
|
|
}
|
|
}
|
|
|
|
hosts := make([]string, 0, len(hostSet))
|
|
for h := range hostSet {
|
|
hosts = append(hosts, h)
|
|
}
|
|
sort.Strings(hosts)
|
|
|
|
remoteCmd := fmt.Sprintf(`sudo nohup sh -c 'sleep %d; systemctl poweroff' >/dev/null 2>&1 &`, delay)
|
|
for _, host := range hosts {
|
|
host = strings.TrimSpace(host)
|
|
if host == "" {
|
|
continue
|
|
}
|
|
if _, isLocal := localNames[host]; isLocal {
|
|
continue
|
|
}
|
|
o.bestEffort("schedule poweroff on "+host, func() error {
|
|
_, err := o.ssh(ctx, host, remoteCmd)
|
|
return err
|
|
})
|
|
}
|
|
|
|
if o.cfg.Shutdown.PoweroffLocalHost {
|
|
o.bestEffort("schedule local host poweroff", func() error {
|
|
_, err := o.run(ctx, 5*time.Second, "sh", "-c", fmt.Sprintf("nohup sh -c 'sleep %d; systemctl poweroff' >/dev/null 2>&1 &", delay+10))
|
|
return err
|
|
})
|
|
}
|
|
return nil
|
|
}
|