242 lines
8.1 KiB
Go
242 lines
8.1 KiB
Go
package cluster
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// drainFailure describes one worker node whose drain did not succeed, so that
// failures collected from concurrent drains can be reported together.
type drainFailure struct {
	// node is the name of the worker whose drain failed.
	node string
	// err is the drain error, wrapped with the node name.
	err error
	// details is the diagnostics text gathered for the node after the failure.
	details string
}
|
|
|
|
// drainWorkers runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) drainWorkers(ctx context.Context, workers []string) error.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) drainWorkers(ctx context.Context, workers []string) error {
|
|
total := len(workers)
|
|
if total == 0 {
|
|
return nil
|
|
}
|
|
parallelism := o.cfg.Shutdown.DrainParallelism
|
|
if parallelism <= 0 {
|
|
parallelism = 6
|
|
}
|
|
if parallelism > total {
|
|
parallelism = total
|
|
}
|
|
|
|
o.log.Printf("drain workers total=%d parallelism=%d", total, parallelism)
|
|
sem := make(chan struct{}, parallelism)
|
|
var wg sync.WaitGroup
|
|
errCh := make(chan drainFailure, total)
|
|
|
|
for idx, node := range workers {
|
|
idx := idx
|
|
node := node
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
sem <- struct{}{}
|
|
defer func() { <-sem }()
|
|
|
|
o.log.Printf("drain worker %d/%d: %s", idx+1, total, node)
|
|
if _, err := o.kubectl(ctx, 20*time.Second, "cordon", node); err != nil {
|
|
o.log.Printf("warning: cordon %s failed: %v", node, err)
|
|
}
|
|
if _, err := o.kubectl(ctx, 3*time.Minute, "drain", node, "--ignore-daemonsets", "--delete-emptydir-data", "--grace-period=30", "--timeout=180s"); err != nil {
|
|
details := o.drainNodeDiagnostics(ctx, node)
|
|
errCh <- drainFailure{
|
|
node: node,
|
|
err: fmt.Errorf("drain %s failed: %w", node, err),
|
|
details: details,
|
|
}
|
|
return
|
|
}
|
|
}()
|
|
}
|
|
|
|
wg.Wait()
|
|
close(errCh)
|
|
if len(errCh) == 0 {
|
|
return nil
|
|
}
|
|
failures := make([]drainFailure, 0, len(errCh))
|
|
for failure := range errCh {
|
|
failures = append(failures, failure)
|
|
}
|
|
count := len(failures)
|
|
samples := []string{}
|
|
for _, failure := range failures {
|
|
msg := failure.err.Error()
|
|
if strings.TrimSpace(failure.details) != "" {
|
|
msg = fmt.Sprintf("%s (details: %s)", msg, failure.details)
|
|
}
|
|
samples = append(samples, msg)
|
|
if len(samples) >= 4 {
|
|
break
|
|
}
|
|
}
|
|
return fmt.Errorf("drain workers had %d errors (first: %s)", count, strings.Join(samples, " | "))
|
|
}
|
|
|
|
// drainNodeDiagnostics runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) drainNodeDiagnostics(ctx context.Context, node string) string.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) drainNodeDiagnostics(ctx context.Context, node string) string {
|
|
out, err := o.kubectl(
|
|
ctx,
|
|
20*time.Second,
|
|
"get",
|
|
"pods",
|
|
"-A",
|
|
"--field-selector", "spec.nodeName="+node,
|
|
"-o",
|
|
"custom-columns=NS:.metadata.namespace,NAME:.metadata.name,PHASE:.status.phase,OWNER:.metadata.ownerReferences[0].kind",
|
|
"--no-headers",
|
|
)
|
|
if err != nil {
|
|
if strings.TrimSpace(out) == "" {
|
|
return fmt.Sprintf("diagnostics unavailable: %v", err)
|
|
}
|
|
return fmt.Sprintf("diagnostics unavailable: %v (%s)", err, strings.Join(lines(out), "; "))
|
|
}
|
|
|
|
blockers := make([]string, 0, 6)
|
|
for _, line := range lines(out) {
|
|
fields := strings.Fields(line)
|
|
if len(fields) < 4 {
|
|
continue
|
|
}
|
|
namespace := fields[0]
|
|
name := fields[1]
|
|
phase := fields[2]
|
|
owner := fields[3]
|
|
if strings.EqualFold(owner, "DaemonSet") {
|
|
continue
|
|
}
|
|
if strings.EqualFold(phase, "Succeeded") || strings.EqualFold(phase, "Failed") {
|
|
continue
|
|
}
|
|
blockers = append(blockers, fmt.Sprintf("%s/%s(phase=%s owner=%s)", namespace, name, phase, owner))
|
|
if len(blockers) >= 6 {
|
|
break
|
|
}
|
|
}
|
|
if len(blockers) == 0 {
|
|
return "no non-daemonset blocking pods found on node"
|
|
}
|
|
return strings.Join(blockers, ", ")
|
|
}
|
|
|
|
// uncordonWorkers runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) uncordonWorkers(ctx context.Context, workers []string) error.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) uncordonWorkers(ctx context.Context, workers []string) error {
|
|
for _, node := range workers {
|
|
if _, err := o.kubectl(ctx, 20*time.Second, "uncordon", node); err != nil {
|
|
o.log.Printf("warning: uncordon %s failed: %v", node, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// stopWorkers runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) stopWorkers(ctx context.Context, workers []string).
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) stopWorkers(ctx context.Context, workers []string) {
|
|
o.runSSHAcrossNodes(ctx, workers, "stop k3s-agent", "sudo systemctl stop k3s-agent || true")
|
|
}
|
|
|
|
// startWorkers runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) startWorkers(ctx context.Context, workers []string).
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) startWorkers(ctx context.Context, workers []string) {
|
|
o.runSSHAcrossNodes(ctx, workers, "start k3s-agent", "sudo systemctl start k3s-agent || true")
|
|
}
|
|
|
|
// stopControlPlanes runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) stopControlPlanes(ctx context.Context, cps []string).
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) stopControlPlanes(ctx context.Context, cps []string) {
|
|
o.runSSHAcrossNodes(ctx, cps, "stop k3s", "sudo systemctl stop k3s || true")
|
|
}
|
|
|
|
// startControlPlanes runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) startControlPlanes(ctx context.Context, cps []string).
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) startControlPlanes(ctx context.Context, cps []string) {
|
|
o.runSSHAcrossNodes(ctx, cps, "start k3s", "sudo systemctl start k3s || true")
|
|
}
|
|
|
|
// runSSHAcrossNodes runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) runSSHAcrossNodes(ctx context.Context, nodes []string, action, command string).
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) runSSHAcrossNodes(ctx context.Context, nodes []string, action, command string) {
|
|
if len(nodes) == 0 {
|
|
return
|
|
}
|
|
parallelism := o.cfg.Shutdown.SSHParallelism
|
|
if parallelism <= 0 {
|
|
parallelism = 8
|
|
}
|
|
if parallelism > len(nodes) {
|
|
parallelism = len(nodes)
|
|
}
|
|
|
|
sem := make(chan struct{}, parallelism)
|
|
var wg sync.WaitGroup
|
|
for _, node := range nodes {
|
|
node := node
|
|
if !o.sshManaged(node) {
|
|
o.log.Printf("skip %s on %s: node not in ssh_managed_nodes", action, node)
|
|
continue
|
|
}
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
sem <- struct{}{}
|
|
defer func() { <-sem }()
|
|
o.bestEffort(action+" on "+node, func() error {
|
|
_, err := o.ssh(ctx, node, command)
|
|
return err
|
|
})
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
// takeEtcdSnapshot runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) takeEtcdSnapshot(ctx context.Context, node string) error.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) takeEtcdSnapshot(ctx context.Context, node string) error {
|
|
if !o.sshManaged(node) {
|
|
return fmt.Errorf("cannot run etcd snapshot on %s: node not in ssh_managed_nodes", node)
|
|
}
|
|
name := "pre-shutdown-" + time.Now().UTC().Format("20060102-150405")
|
|
_, err := o.runSudoK3S(ctx, node, "etcd-snapshot", "save", "--name", name)
|
|
return err
|
|
}
|
|
|
|
// latestEtcdSnapshotPath runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) latestEtcdSnapshotPath(ctx context.Context, node string) (string, error).
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) latestEtcdSnapshotPath(ctx context.Context, node string) (string, error) {
|
|
if !o.sshManaged(node) {
|
|
return "", fmt.Errorf("cannot resolve etcd snapshot on %s: node not in ssh_managed_nodes", node)
|
|
}
|
|
out, err := o.runSudoK3S(ctx, node, "etcd-snapshot", "ls")
|
|
if err != nil {
|
|
return "", fmt.Errorf("resolve latest etcd snapshot on %s: %w", node, err)
|
|
}
|
|
snapshot := parseSnapshotPathFromEtcdSnapshotList(out)
|
|
if snapshot == "" {
|
|
return "", fmt.Errorf("no etcd snapshots found on %s under /var/lib/rancher/k3s/server/db/snapshots", node)
|
|
}
|
|
return snapshot, nil
|
|
}
|