ananke/internal/cluster/orchestrator_drain.go

package cluster

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"
)

type drainFailure struct {
	node    string
	err     error
	details string
}

// drainWorkers cordons and drains the given worker nodes with bounded
// parallelism. Cordon failures are logged and tolerated (drain cordons the
// node again); drain failures are collected together with per-node pod
// diagnostics and reported as a single aggregated error.
func (o *Orchestrator) drainWorkers(ctx context.Context, workers []string) error {
	total := len(workers)
	if total == 0 {
		return nil
	}
	parallelism := o.cfg.Shutdown.DrainParallelism
	if parallelism <= 0 {
		parallelism = 6
	}
	if parallelism > total {
		parallelism = total
	}
	o.log.Printf("drain workers total=%d parallelism=%d", total, parallelism)
	sem := make(chan struct{}, parallelism)
	var wg sync.WaitGroup
	errCh := make(chan drainFailure, total)
	for idx, node := range workers {
		idx := idx
		node := node
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}
			defer func() { <-sem }()
			o.log.Printf("drain worker %d/%d: %s", idx+1, total, node)
			// A failed cordon is non-fatal: drain cordons the node again.
			if _, err := o.kubectl(ctx, 20*time.Second, "cordon", node); err != nil {
				o.log.Printf("warning: cordon %s failed: %v", node, err)
			}
			if _, err := o.kubectl(ctx, 3*time.Minute, "drain", node, "--ignore-daemonsets", "--delete-emptydir-data", "--grace-period=30", "--timeout=180s"); err != nil {
				details := o.drainNodeDiagnostics(ctx, node)
				errCh <- drainFailure{
					node:    node,
					err:     fmt.Errorf("drain %s failed: %w", node, err),
					details: details,
				}
				return
			}
		}()
	}
	wg.Wait()
	close(errCh)
	if len(errCh) == 0 {
		return nil
	}
	failures := make([]drainFailure, 0, len(errCh))
	for failure := range errCh {
		failures = append(failures, failure)
	}
	count := len(failures)
	samples := make([]string, 0, 4)
	for _, failure := range failures {
		msg := failure.err.Error()
		if strings.TrimSpace(failure.details) != "" {
			msg = fmt.Sprintf("%s (details: %s)", msg, failure.details)
		}
		samples = append(samples, msg)
		if len(samples) >= 4 {
			break
		}
	}
	return fmt.Errorf("drain workers had %d errors (samples: %s)", count, strings.Join(samples, " | "))
}

// drainNodeDiagnostics lists the non-DaemonSet pods still scheduled on a node
// so a failed drain can report what was likely blocking it.
func (o *Orchestrator) drainNodeDiagnostics(ctx context.Context, node string) string {
	out, err := o.kubectl(
		ctx,
		20*time.Second,
		"get",
		"pods",
		"-A",
		"--field-selector", "spec.nodeName="+node,
		"-o",
		"custom-columns=NS:.metadata.namespace,NAME:.metadata.name,PHASE:.status.phase,OWNER:.metadata.ownerReferences[0].kind",
		"--no-headers",
	)
	if err != nil {
		if strings.TrimSpace(out) == "" {
			return fmt.Sprintf("diagnostics unavailable: %v", err)
		}
		return fmt.Sprintf("diagnostics unavailable: %v (%s)", err, strings.Join(lines(out), "; "))
	}
	blockers := make([]string, 0, 6)
	for _, line := range lines(out) {
		fields := strings.Fields(line)
		if len(fields) < 4 {
			continue
		}
		namespace, name, phase, owner := fields[0], fields[1], fields[2], fields[3]
		// DaemonSet pods are ignored by drain, and terminal pods do not block it.
		if strings.EqualFold(owner, "DaemonSet") {
			continue
		}
		if strings.EqualFold(phase, "Succeeded") || strings.EqualFold(phase, "Failed") {
			continue
		}
		blockers = append(blockers, fmt.Sprintf("%s/%s(phase=%s owner=%s)", namespace, name, phase, owner))
		if len(blockers) >= 6 {
			break
		}
	}
	if len(blockers) == 0 {
		return "no non-daemonset blocking pods found on node"
	}
	return strings.Join(blockers, ", ")
}
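
// lines is a package helper not shown in this excerpt. A minimal sketch of
// the behavior assumed above (split command output into trimmed, non-empty
// lines) might look like:
//
//	func lines(s string) []string {
//		var out []string
//		for _, l := range strings.Split(s, "\n") {
//			if l = strings.TrimSpace(l); l != "" {
//				out = append(out, l)
//			}
//		}
//		return out
//	}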

// uncordonWorkers uncordons every worker so it can accept pods again.
// Failures are logged rather than returned so startup proceeds best effort.
func (o *Orchestrator) uncordonWorkers(ctx context.Context, workers []string) error {
	for _, node := range workers {
		if _, err := o.kubectl(ctx, 20*time.Second, "uncordon", node); err != nil {
			o.log.Printf("warning: uncordon %s failed: %v", node, err)
		}
	}
	return nil
}

// stopWorkers stops the k3s-agent service on every SSH-managed worker, best effort.
func (o *Orchestrator) stopWorkers(ctx context.Context, workers []string) {
	o.runSSHAcrossNodes(ctx, workers, "stop k3s-agent", "sudo systemctl stop k3s-agent || true")
}

// startWorkers starts the k3s-agent service on every SSH-managed worker, best effort.
func (o *Orchestrator) startWorkers(ctx context.Context, workers []string) {
	o.runSSHAcrossNodes(ctx, workers, "start k3s-agent", "sudo systemctl start k3s-agent || true")
}

// stopControlPlanes stops the k3s server service on every SSH-managed
// control-plane node, best effort.
func (o *Orchestrator) stopControlPlanes(ctx context.Context, cps []string) {
	o.runSSHAcrossNodes(ctx, cps, "stop k3s", "sudo systemctl stop k3s || true")
}

// startControlPlanes starts the k3s server service on every SSH-managed
// control-plane node, best effort.
func (o *Orchestrator) startControlPlanes(ctx context.Context, cps []string) {
	o.runSSHAcrossNodes(ctx, cps, "start k3s", "sudo systemctl start k3s || true")
}
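
// Illustrative only: the shutdown ordering these helpers are assumed to
// support (the real entry point is not part of this excerpt, and the
// variable names here are hypothetical):
//
//	if err := o.takeEtcdSnapshot(ctx, cps[0]); err != nil {
//		return err
//	}
//	if err := o.drainWorkers(ctx, workers); err != nil {
//		o.log.Printf("warning: %v", err)
//	}
//	o.stopWorkers(ctx, workers)
//	o.stopControlPlanes(ctx, cps)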

// runSSHAcrossNodes runs a shell command on each listed node with bounded
// parallelism. Nodes outside ssh_managed_nodes are skipped, and per-node
// failures are logged rather than propagated.
func (o *Orchestrator) runSSHAcrossNodes(ctx context.Context, nodes []string, action, command string) {
	if len(nodes) == 0 {
		return
	}
	parallelism := o.cfg.Shutdown.SSHParallelism
	if parallelism <= 0 {
		parallelism = 8
	}
	if parallelism > len(nodes) {
		parallelism = len(nodes)
	}
	sem := make(chan struct{}, parallelism)
	var wg sync.WaitGroup
	for _, node := range nodes {
		node := node
		if !o.sshManaged(node) {
			o.log.Printf("skip %s on %s: node not in ssh_managed_nodes", action, node)
			continue
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}
			defer func() { <-sem }()
			o.bestEffort(action+" on "+node, func() error {
				_, err := o.ssh(ctx, node, command)
				return err
			})
		}()
	}
	wg.Wait()
}
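
// bestEffort and sshManaged are defined elsewhere in this package. Sketches
// of the semantics assumed here (the config field name is hypothetical):
//
//	// bestEffort runs fn and downgrades any error to a warning log line.
//	func (o *Orchestrator) bestEffort(action string, fn func() error) {
//		if err := fn(); err != nil {
//			o.log.Printf("warning: %s failed: %v", action, err)
//		}
//	}
//
//	// sshManaged reports whether node appears in the ssh_managed_nodes list.
//	func (o *Orchestrator) sshManaged(node string) bool {
//		for _, n := range o.cfg.Shutdown.SSHManagedNodes { // hypothetical field
//			if n == node {
//				return true
//			}
//		}
//		return false
//	}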

// takeEtcdSnapshot saves a timestamped pre-shutdown etcd snapshot on the
// given control-plane node via `k3s etcd-snapshot save`.
func (o *Orchestrator) takeEtcdSnapshot(ctx context.Context, node string) error {
	if !o.sshManaged(node) {
		return fmt.Errorf("cannot run etcd snapshot on %s: node not in ssh_managed_nodes", node)
	}
	name := "pre-shutdown-" + time.Now().UTC().Format("20060102-150405")
	_, err := o.runSudoK3S(ctx, node, "etcd-snapshot", "save", "--name", name)
	return err
}
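
// runSudoK3S is defined elsewhere. Given how o.ssh is used above, the assumed
// shape is roughly the following sketch; the real helper may quote arguments
// or apply a timeout:
//
//	func (o *Orchestrator) runSudoK3S(ctx context.Context, node string, args ...string) (string, error) {
//		return o.ssh(ctx, node, "sudo k3s "+strings.Join(args, " "))
//	}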

// latestEtcdSnapshotPath returns the path of the newest etcd snapshot on the
// node, as reported by `k3s etcd-snapshot ls`.
func (o *Orchestrator) latestEtcdSnapshotPath(ctx context.Context, node string) (string, error) {
	if !o.sshManaged(node) {
		return "", fmt.Errorf("cannot resolve etcd snapshot on %s: node not in ssh_managed_nodes", node)
	}
	out, err := o.runSudoK3S(ctx, node, "etcd-snapshot", "ls")
	if err != nil {
		return "", fmt.Errorf("resolve latest etcd snapshot on %s: %w", node, err)
	}
	snapshot := parseSnapshotPathFromEtcdSnapshotList(out)
	if snapshot == "" {
		return "", fmt.Errorf("no etcd snapshots found on %s under /var/lib/rancher/k3s/server/db/snapshots", node)
	}
	return snapshot, nil
}
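
// parseSnapshotPathFromEtcdSnapshotList is a helper not shown in this
// excerpt. `k3s etcd-snapshot ls` prints one row per snapshot whose location
// points under /var/lib/rancher/k3s/server/db/snapshots. A minimal parsing
// sketch, assuming rows are ordered oldest-first (the real parser may compare
// timestamps instead):
//
//	func parseSnapshotPathFromEtcdSnapshotList(out string) string {
//		latest := ""
//		for _, line := range lines(out) {
//			for _, field := range strings.Fields(line) {
//				if strings.Contains(field, "/server/db/snapshots/") {
//					latest = strings.TrimPrefix(field, "file://")
//				}
//			}
//		}
//		return latest
//	}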