package cluster

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"
)

// drainFailure records a single node drain error together with optional
// human-readable diagnostics gathered from the cluster.
type drainFailure struct {
	node    string
	err     error
	details string
}

// drainWorkers cordons and drains the given worker nodes in parallel,
// bounded by cfg.Shutdown.DrainParallelism (default 6, capped at len(workers)).
//
// Cordon failures are logged as warnings only (drain cordons implicitly);
// drain failures are collected and summarized in the returned error together
// with per-node diagnostics. Returns nil when workers is empty or every
// drain succeeds.
func (o *Orchestrator) drainWorkers(ctx context.Context, workers []string) error {
	total := len(workers)
	if total == 0 {
		return nil
	}

	parallelism := o.cfg.Shutdown.DrainParallelism
	if parallelism <= 0 {
		parallelism = 6
	}
	if parallelism > total {
		parallelism = total
	}
	o.log.Printf("drain workers total=%d parallelism=%d", total, parallelism)

	sem := make(chan struct{}, parallelism)
	var wg sync.WaitGroup
	// Buffered to total so every goroutine can send without blocking.
	errCh := make(chan drainFailure, total)

	for idx, node := range workers {
		idx := idx // pre-Go 1.22 loop-variable capture
		node := node
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}
			defer func() { <-sem }()

			// Log only after the semaphore slot is acquired so the message
			// reflects when work actually starts, not goroutine creation.
			o.log.Printf("drain worker %d/%d: %s", idx+1, total, node)

			if _, err := o.kubectl(ctx, 20*time.Second, "cordon", node); err != nil {
				o.log.Printf("warning: cordon %s failed: %v", node, err)
			}

			// Give the wrapper deadline slack over --timeout=180s so kubectl
			// can report the drain failure itself instead of being killed by
			// the context deadline at exactly the same moment.
			if _, err := o.kubectl(ctx, 200*time.Second, "drain", node,
				"--ignore-daemonsets",
				"--delete-emptydir-data",
				"--grace-period=30",
				"--timeout=180s",
			); err != nil {
				errCh <- drainFailure{
					node:    node,
					err:     fmt.Errorf("drain %s failed: %w", node, err),
					details: o.drainNodeDiagnostics(ctx, node),
				}
			}
		}()
	}
	wg.Wait()
	close(errCh)

	failures := make([]drainFailure, 0, total)
	for failure := range errCh {
		failures = append(failures, failure)
	}
	if len(failures) == 0 {
		return nil
	}

	// Keep the aggregate error readable: include at most four samples.
	const maxSamples = 4
	samples := make([]string, 0, maxSamples)
	for _, failure := range failures {
		msg := failure.err.Error()
		if strings.TrimSpace(failure.details) != "" {
			msg = fmt.Sprintf("%s (details: %s)", msg, failure.details)
		}
		samples = append(samples, msg)
		if len(samples) >= maxSamples {
			break
		}
	}
	return fmt.Errorf("drain workers had %d errors (first %d: %s)",
		len(failures), len(samples), strings.Join(samples, " | "))
}

// drainNodeDiagnostics lists the pods still scheduled on node to explain why
// a drain failed. It returns a short human-readable summary: up to six
// non-DaemonSet pods that are not in a terminal phase, or a note when
// diagnostics could not be gathered.
func (o *Orchestrator) drainNodeDiagnostics(ctx context.Context, node string) string {
	out, err := o.kubectl(
		ctx,
		20*time.Second,
		"get", "pods", "-A",
		"--field-selector", "spec.nodeName="+node,
		"-o", "custom-columns=NS:.metadata.namespace,NAME:.metadata.name,PHASE:.status.phase,OWNER:.metadata.ownerReferences[0].kind",
		"--no-headers",
	)
	if err != nil {
		if strings.TrimSpace(out) == "" {
			return fmt.Sprintf("diagnostics unavailable: %v", err)
		}
		return fmt.Sprintf("diagnostics unavailable: %v (%s)", err, strings.Join(lines(out), "; "))
	}
	blockers := make([]string, 0, 6)
	for _, line := range lines(out) {
		fields := strings.Fields(line)
		if len(fields) < 4 {
			continue
		}
		namespace := fields[0]
		name := fields[1]
		phase := fields[2]
		owner := fields[3]
		// DaemonSet pods are expected to remain on the node; pods already in
		// a terminal phase do not block the drain.
		if strings.EqualFold(owner, "DaemonSet") {
			continue
		}
		if strings.EqualFold(phase, "Succeeded") || strings.EqualFold(phase, "Failed") {
			continue
		}
		blockers = append(blockers, fmt.Sprintf("%s/%s(phase=%s owner=%s)", namespace, name, phase, owner))
		if len(blockers) >= 6 {
			break
		}
	}
	if len(blockers) == 0 {
		return "no non-daemonset blocking pods found on node"
	}
	return strings.Join(blockers, ", ")
}

// uncordonWorkers runs one orchestration or CLI step.
// Signature: (o *Orchestrator) uncordonWorkers(ctx context.Context, workers []string) error.
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
func (o *Orchestrator) uncordonWorkers(ctx context.Context, workers []string) error { for _, node := range workers { if _, err := o.kubectl(ctx, 20*time.Second, "uncordon", node); err != nil { o.log.Printf("warning: uncordon %s failed: %v", node, err) } } return nil } // stopWorkers runs one orchestration or CLI step. // Signature: (o *Orchestrator) stopWorkers(ctx context.Context, workers []string). // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) stopWorkers(ctx context.Context, workers []string) { o.runSSHAcrossNodes(ctx, workers, "stop k3s-agent", "sudo systemctl stop k3s-agent || true") } // startWorkers runs one orchestration or CLI step. // Signature: (o *Orchestrator) startWorkers(ctx context.Context, workers []string). // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) startWorkers(ctx context.Context, workers []string) { o.runSSHAcrossNodes(ctx, workers, "start k3s-agent", "sudo systemctl start k3s-agent || true") } // stopControlPlanes runs one orchestration or CLI step. // Signature: (o *Orchestrator) stopControlPlanes(ctx context.Context, cps []string). // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) stopControlPlanes(ctx context.Context, cps []string) { o.runSSHAcrossNodes(ctx, cps, "stop k3s", "sudo systemctl stop k3s || true") } // startControlPlanes runs one orchestration or CLI step. // Signature: (o *Orchestrator) startControlPlanes(ctx context.Context, cps []string). // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) startControlPlanes(ctx context.Context, cps []string) { o.runSSHAcrossNodes(ctx, cps, "start k3s", "sudo systemctl start k3s || true") } // runSSHAcrossNodes runs one orchestration or CLI step. 
// Signature: (o *Orchestrator) runSSHAcrossNodes(ctx context.Context, nodes []string, action, command string). // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) runSSHAcrossNodes(ctx context.Context, nodes []string, action, command string) { if len(nodes) == 0 { return } parallelism := o.cfg.Shutdown.SSHParallelism if parallelism <= 0 { parallelism = 8 } if parallelism > len(nodes) { parallelism = len(nodes) } sem := make(chan struct{}, parallelism) var wg sync.WaitGroup for _, node := range nodes { node := node if !o.sshManaged(node) { o.log.Printf("skip %s on %s: node not in ssh_managed_nodes", action, node) continue } wg.Add(1) go func() { defer wg.Done() sem <- struct{}{} defer func() { <-sem }() o.bestEffort(action+" on "+node, func() error { _, err := o.ssh(ctx, node, command) return err }) }() } wg.Wait() } // takeEtcdSnapshot runs one orchestration or CLI step. // Signature: (o *Orchestrator) takeEtcdSnapshot(ctx context.Context, node string) error. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) takeEtcdSnapshot(ctx context.Context, node string) error { if !o.sshManaged(node) { return fmt.Errorf("cannot run etcd snapshot on %s: node not in ssh_managed_nodes", node) } name := "pre-shutdown-" + time.Now().UTC().Format("20060102-150405") _, err := o.runSudoK3S(ctx, node, "etcd-snapshot", "save", "--name", name) return err } // latestEtcdSnapshotPath runs one orchestration or CLI step. // Signature: (o *Orchestrator) latestEtcdSnapshotPath(ctx context.Context, node string) (string, error). // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. 
func (o *Orchestrator) latestEtcdSnapshotPath(ctx context.Context, node string) (string, error) { if !o.sshManaged(node) { return "", fmt.Errorf("cannot resolve etcd snapshot on %s: node not in ssh_managed_nodes", node) } out, err := o.runSudoK3S(ctx, node, "etcd-snapshot", "ls") if err != nil { return "", fmt.Errorf("resolve latest etcd snapshot on %s: %w", node, err) } snapshot := parseSnapshotPathFromEtcdSnapshotList(out) if snapshot == "" { return "", fmt.Errorf("no etcd snapshots found on %s under /var/lib/rancher/k3s/server/db/snapshots", node) } return snapshot, nil }