// ananke/internal/cluster/orchestrator_drain.go
// 372 lines, 13 KiB, Go

package cluster
import (
"context"
"fmt"
"sort"
"strings"
"sync"
"time"
)
// drainFailure records one worker node whose drain did not succeed.
type drainFailure struct {
	// node is the name of the node that failed to drain.
	node string
	// err is the drain error, wrapped with the node name by drainWorkers.
	err error
	// details is the diagnostics text gathered for the node after the failure.
	details string
}
// drainWorkers cordons and drains every node in workers, running at most
// o.cfg.Shutdown.DrainParallelism drains at once (6 when the setting is not
// positive, and never more than len(workers)).
//
// A failed cordon is only logged as a warning; the drain is still attempted.
// A failed drain is recorded together with the output of drainNodeDiagnostics
// for that node. The call blocks until every node has been processed. It
// returns nil when every drain succeeded; otherwise the error carries the
// total failure count and up to four sample messages joined by " | ".
//
// Failures are sorted by node name before sampling so the error text is stable
// between runs instead of following goroutine completion order.
func (o *Orchestrator) drainWorkers(ctx context.Context, workers []string) error {
	const maxSamples = 4

	total := len(workers)
	if total == 0 {
		return nil
	}
	parallelism := o.cfg.Shutdown.DrainParallelism
	if parallelism <= 0 {
		parallelism = 6
	}
	if parallelism > total {
		parallelism = total
	}
	o.log.Printf("drain workers total=%d parallelism=%d", total, parallelism)

	// sem bounds how many nodes are being drained at the same time.
	sem := make(chan struct{}, parallelism)
	// errCh is buffered for the worst case so a failing goroutine never blocks on send.
	errCh := make(chan drainFailure, total)
	var wg sync.WaitGroup
	for idx, node := range workers {
		idx, node := idx, node // per-iteration copies for the goroutine (needed before Go 1.22)
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}
			defer func() { <-sem }()
			o.log.Printf("drain worker %d/%d: %s", idx+1, total, node)
			// Cordon is best-effort: drain is attempted even when it fails.
			if _, err := o.kubectl(ctx, 20*time.Second, "cordon", node); err != nil {
				o.log.Printf("warning: cordon %s failed: %v", node, err)
			}
			// NOTE(review): the 3m value passed to o.kubectl equals drain's own
			// --timeout=180s; if that argument is a hard deadline the two can race
			// and hide kubectl's own timeout message — confirm and add margin if so.
			if _, err := o.kubectl(ctx, 3*time.Minute, "drain", node, "--ignore-daemonsets", "--delete-emptydir-data", "--grace-period=30", "--timeout=180s"); err != nil {
				errCh <- drainFailure{
					node:    node,
					err:     fmt.Errorf("drain %s failed: %w", node, err),
					details: o.drainNodeDiagnostics(ctx, node),
				}
			}
		}()
	}
	wg.Wait()
	close(errCh)

	failures := make([]drainFailure, 0, len(errCh))
	for failure := range errCh {
		failures = append(failures, failure)
	}
	if len(failures) == 0 {
		return nil
	}
	// Order by node (then message) so the sampled failures are reproducible.
	sort.SliceStable(failures, func(i, j int) bool {
		if failures[i].node != failures[j].node {
			return failures[i].node < failures[j].node
		}
		return failures[i].err.Error() < failures[j].err.Error()
	})

	shown := failures
	if len(shown) > maxSamples {
		shown = shown[:maxSamples]
	}
	samples := make([]string, 0, len(shown))
	for _, failure := range shown {
		msg := failure.err.Error()
		if strings.TrimSpace(failure.details) != "" {
			msg = fmt.Sprintf("%s (details: %s)", msg, failure.details)
		}
		samples = append(samples, msg)
	}
	return fmt.Errorf("drain workers had %d errors (first: %s)", len(failures), strings.Join(samples, " | "))
}
// drainNodeDiagnostics summarizes the pods still assigned to node so a drain
// failure message can name likely blockers.
//
// It lists pods in all namespaces whose spec.nodeName is node, reading the
// namespace, name, phase and first owner kind of each. Up to six pods are
// reported as "ns/name(phase=P owner=K)" joined by ", ". Rows with fewer than
// four columns, pods owned by a DaemonSet, and pods in phase Succeeded or
// Failed are left out. When the listing itself fails, the result is a
// "diagnostics unavailable" string containing the error and any output that
// was produced.
func (o *Orchestrator) drainNodeDiagnostics(ctx context.Context, node string) string {
	const maxBlockers = 6

	listing, err := o.kubectl(ctx, 20*time.Second,
		"get", "pods", "-A",
		"--field-selector", "spec.nodeName="+node,
		"-o", "custom-columns=NS:.metadata.namespace,NAME:.metadata.name,PHASE:.status.phase,OWNER:.metadata.ownerReferences[0].kind",
		"--no-headers",
	)
	if err != nil {
		if strings.TrimSpace(listing) != "" {
			return fmt.Sprintf("diagnostics unavailable: %v (%s)", err, strings.Join(lines(listing), "; "))
		}
		return fmt.Sprintf("diagnostics unavailable: %v", err)
	}

	blockers := make([]string, 0, maxBlockers)
	for _, row := range lines(listing) {
		if len(blockers) >= maxBlockers {
			break
		}
		cols := strings.Fields(row)
		if len(cols) < 4 {
			continue
		}
		ns, pod, phase, owner := cols[0], cols[1], cols[2], cols[3]
		switch {
		case strings.EqualFold(owner, "DaemonSet"):
			// The drain command is invoked with --ignore-daemonsets.
			continue
		case strings.EqualFold(phase, "Succeeded"), strings.EqualFold(phase, "Failed"):
			// Terminal-phase pods are not reported.
			continue
		}
		blockers = append(blockers, fmt.Sprintf("%s/%s(phase=%s owner=%s)", ns, pod, phase, owner))
	}
	if len(blockers) > 0 {
		return strings.Join(blockers, ", ")
	}
	return "no non-daemonset blocking pods found on node"
}
// uncordonWorkers runs "kubectl uncordon" for each node in workers, one after
// another. It is best-effort: a failed uncordon is logged as a warning and the
// loop moves on, so the returned error is always nil.
func (o *Orchestrator) uncordonWorkers(ctx context.Context, workers []string) error {
	for i := range workers {
		target := workers[i]
		_, err := o.kubectl(ctx, 20*time.Second, "uncordon", target)
		if err == nil {
			continue
		}
		o.log.Printf("warning: uncordon %s failed: %v", target, err)
	}
	return nil
}
// ensureLonghornEncryptedHostPrereqs checks every Longhorn host for the host
// cryptsetup prerequisite and returns workers with the hosts that failed the
// check removed.
//
// Why: encrypted Longhorn PVCs fail at kubelet mount time when a storage host
// lacks host cryptsetup; startup must quarantine those nodes before workloads
// are scheduled there.
//
// Hosts come from longhornHostNodes. Hosts listed in
// o.cfg.Startup.IgnoreUnavailableNodes are skipped silently, and hosts listed
// in o.cfg.Startup.LonghornCryptsetupExemptNodes are skipped with a log line.
// A host that is not SSH-managed (so it cannot be verified), or whose
// ensureHostCryptsetup check fails, is cordoned through
// cordonNodeForMissingCryptsetup and treated as unsafe.
//
// Returns:
//   - workers unchanged, with the error, when the host query fails;
//   - workers unchanged, with nil, when there are no Longhorn hosts;
//   - otherwise the trimmed, non-empty workers that are not unsafe, plus an
//     error joining every cordon failure with "; " (nil if all cordons
//     succeeded).
//
// Hosts are processed in sorted order so cordon order, log output and the
// joined error text are reproducible rather than following map iteration.
func (o *Orchestrator) ensureLonghornEncryptedHostPrereqs(ctx context.Context, workers []string) ([]string, error) {
	longhornHosts, err := o.longhornHostNodes(ctx)
	if err != nil {
		return workers, err
	}
	if len(longhornHosts) == 0 {
		return workers, nil
	}
	ignored := makeStringSet(o.cfg.Startup.IgnoreUnavailableNodes)
	exempt := makeStringSet(o.cfg.Startup.LonghornCryptsetupExemptNodes)

	// Map iteration order is random; sort for deterministic side effects.
	hosts := make([]string, 0, len(longhornHosts))
	for node := range longhornHosts {
		hosts = append(hosts, node)
	}
	sort.Strings(hosts)

	unsafe := make(map[string]struct{}, len(hosts))
	var errs []string
	for _, node := range hosts {
		if _, skip := ignored[node]; skip {
			continue
		}
		if _, skip := exempt[node]; skip {
			o.log.Printf("skip cryptsetup preflight on longhorn host %s: configured exemption", node)
			continue
		}
		if !o.sshManaged(node) {
			// Without SSH the prerequisite cannot be verified, so the host is quarantined.
			o.log.Printf("warning: keeping longhorn host %s cordoned because encrypted-volume prerequisites cannot be verified without SSH management", node)
			if cordonErr := o.cordonNodeForMissingCryptsetup(ctx, node); cordonErr != nil {
				errs = append(errs, fmt.Sprintf("%s cordon after unverifiable cryptsetup prerequisite: %v", node, cordonErr))
			}
			unsafe[node] = struct{}{}
			continue
		}
		checkErr := o.ensureHostCryptsetup(ctx, node)
		if checkErr == nil {
			continue
		}
		o.log.Printf("warning: keeping longhorn host %s cordoned after cryptsetup prerequisite check failed: %v", node, checkErr)
		if cordonErr := o.cordonNodeForMissingCryptsetup(ctx, node); cordonErr != nil {
			errs = append(errs, fmt.Sprintf("%s cordon after cryptsetup prerequisite failure: %v", node, cordonErr))
		}
		// The host is treated as unsafe even when the cordon itself failed.
		unsafe[node] = struct{}{}
	}

	guarded := make([]string, 0, len(workers))
	for _, worker := range workers {
		node := strings.TrimSpace(worker)
		if node == "" {
			continue
		}
		if _, blocked := unsafe[node]; blocked {
			continue
		}
		guarded = append(guarded, node)
	}
	if len(errs) > 0 {
		return guarded, fmt.Errorf("%s", strings.Join(errs, "; "))
	}
	return guarded, nil
}
// uncordonLonghornCryptsetupExemptNodes uncordons the configured
// cryptsetup-exempt nodes that are currently Longhorn hosts.
//
// Why: special Longhorn hosts that only run non-encrypted local workloads, such
// as Veles/Oceanus, must recover from stale cordons without weakening the
// encrypted-volume guard for normal storage workers.
//
// Nodes listed in o.cfg.Startup.IgnoreUnavailableNodes are left out. The
// remaining names are sorted, logged, recorded through noteStartupAutoHeal
// (before the uncordon is attempted), and handed to uncordonWorkers. Returns
// nil without querying the cluster when no exemptions are configured, and the
// longhornHostNodes error when the host query fails.
func (o *Orchestrator) uncordonLonghornCryptsetupExemptNodes(ctx context.Context) error {
	exemptSet := makeStringSet(o.cfg.Startup.LonghornCryptsetupExemptNodes)
	if len(exemptSet) == 0 {
		return nil
	}
	hosts, err := o.longhornHostNodes(ctx)
	if err != nil {
		return err
	}
	ignoredSet := makeStringSet(o.cfg.Startup.IgnoreUnavailableNodes)

	targets := make([]string, 0, len(exemptSet))
	for name := range exemptSet {
		_, isIgnored := ignoredSet[name]
		_, isHost := hosts[name]
		if isHost && !isIgnored {
			targets = append(targets, name)
		}
	}
	if len(targets) == 0 {
		return nil
	}
	sort.Strings(targets)

	joined := strings.Join(targets, ",")
	o.log.Printf("uncordon longhorn cryptsetup-exempt hosts: %s", joined)
	o.noteStartupAutoHeal(fmt.Sprintf("uncordoned longhorn cryptsetup-exempt hosts: %s", joined))
	return o.uncordonWorkers(ctx, targets)
}
// longhornHostNodes returns the set of node names that act as Longhorn hosts.
//
// Why: the live node label captures storage hosts that may not be in Ananke's
// static worker list, so startup quarantine decisions should follow the
// cluster's actual scheduling surface.
//
// The set is built from the nodes labelled longhorn-host=true in the cluster.
// Only when that query succeeds but yields no names does it fall back to the
// entries of o.cfg.Startup.RequiredNodeLabels whose "longhorn-host" label is
// "true" (case-insensitive, surrounding whitespace ignored). A failed query is
// returned as an error with a nil set.
func (o *Orchestrator) longhornHostNodes(ctx context.Context) (map[string]struct{}, error) {
	out, err := o.kubectl(ctx, 20*time.Second, "get", "nodes", "-l", "longhorn-host=true", "-o", "jsonpath={range .items[*]}{.metadata.name}{'\\n'}{end}")
	if err != nil {
		return nil, fmt.Errorf("query longhorn host nodes: %w", err)
	}

	hosts := make(map[string]struct{})
	for _, raw := range lines(out) {
		if name := strings.TrimSpace(raw); name != "" {
			hosts[name] = struct{}{}
		}
	}
	if len(hosts) != 0 {
		return hosts, nil
	}

	// No labelled node was reported: use the statically configured labels.
	for node, labels := range o.cfg.Startup.RequiredNodeLabels {
		name := strings.TrimSpace(node)
		if name == "" {
			continue
		}
		if !strings.EqualFold(strings.TrimSpace(labels["longhorn-host"]), "true") {
			continue
		}
		hosts[name] = struct{}{}
	}
	return hosts, nil
}
// stopWorkers stops the k3s-agent systemd unit on each node in workers by
// delegating to runSSHAcrossNodes. The remote command ends in "|| true", so a
// failing systemctl call does not make the command itself fail.
func (o *Orchestrator) stopWorkers(ctx context.Context, workers []string) {
	const (
		action  = "stop k3s-agent"
		command = "sudo systemctl stop k3s-agent || true"
	)
	o.runSSHAcrossNodes(ctx, workers, action, command)
}
// startWorkers starts the k3s-agent systemd unit on each node in workers by
// delegating to runSSHAcrossNodes. The remote command ends in "|| true", so a
// failing systemctl call does not make the command itself fail.
func (o *Orchestrator) startWorkers(ctx context.Context, workers []string) {
	const (
		action  = "start k3s-agent"
		command = "sudo systemctl start k3s-agent || true"
	)
	o.runSSHAcrossNodes(ctx, workers, action, command)
}
// stopControlPlanes stops the k3s systemd unit on each node in cps by
// delegating to runSSHAcrossNodes. The remote command ends in "|| true", so a
// failing systemctl call does not make the command itself fail.
func (o *Orchestrator) stopControlPlanes(ctx context.Context, cps []string) {
	const (
		action  = "stop k3s"
		command = "sudo systemctl stop k3s || true"
	)
	o.runSSHAcrossNodes(ctx, cps, action, command)
}
// startControlPlanes starts the k3s systemd unit on each node in cps by
// delegating to runSSHAcrossNodes. The remote command ends in "|| true", so a
// failing systemctl call does not make the command itself fail.
func (o *Orchestrator) startControlPlanes(ctx context.Context, cps []string) {
	const (
		action  = "start k3s"
		command = "sudo systemctl start k3s || true"
	)
	o.runSSHAcrossNodes(ctx, cps, action, command)
}
// runSSHAcrossNodes runs command over SSH on every SSH-managed node in nodes
// and waits for all of them to finish.
//
// At most o.cfg.Shutdown.SSHParallelism commands run at once (8 when the
// setting is not positive, and never more than len(nodes)). Nodes for which
// o.sshManaged reports false are skipped with a log line. Each command is
// wrapped in o.bestEffort under the label "<action> on <node>", and nothing is
// returned to the caller.
func (o *Orchestrator) runSSHAcrossNodes(ctx context.Context, nodes []string, action, command string) {
	count := len(nodes)
	if count == 0 {
		return
	}
	limit := o.cfg.Shutdown.SSHParallelism
	if limit <= 0 {
		limit = 8
	}
	if limit > count {
		limit = count
	}

	// slots bounds how many SSH sessions are open at the same time.
	slots := make(chan struct{}, limit)
	var pending sync.WaitGroup
	runOn := func(target string) {
		defer pending.Done()
		slots <- struct{}{}
		defer func() { <-slots }()
		o.bestEffort(action+" on "+target, func() error {
			_, err := o.ssh(ctx, target, command)
			return err
		})
	}

	for _, node := range nodes {
		if !o.sshManaged(node) {
			o.log.Printf("skip %s on %s: node not in ssh_managed_nodes", action, node)
			continue
		}
		pending.Add(1)
		// node is evaluated when the go statement runs, so no shadow copy is needed.
		go runOn(node)
	}
	pending.Wait()
}
// takeEtcdSnapshot saves an etcd snapshot on node via
// "k3s etcd-snapshot save --name pre-shutdown-<UTC timestamp>", where the
// timestamp is formatted as YYYYMMDD-HHMMSS. It returns an error without
// running anything when node is not SSH-managed; otherwise it returns whatever
// error runSudoK3S reports.
func (o *Orchestrator) takeEtcdSnapshot(ctx context.Context, node string) error {
	if !o.sshManaged(node) {
		return fmt.Errorf("cannot run etcd snapshot on %s: node not in ssh_managed_nodes", node)
	}
	stamp := time.Now().UTC().Format("20060102-150405")
	snapshotName := "pre-shutdown-" + stamp
	_, err := o.runSudoK3S(ctx, node, "etcd-snapshot", "save", "--name", snapshotName)
	return err
}
// latestEtcdSnapshotPath resolves an etcd snapshot path on node by running
// "k3s etcd-snapshot ls" and passing the output to
// parseSnapshotPathFromEtcdSnapshotList.
//
// It returns an error when node is not SSH-managed, when the listing command
// fails (wrapped with the node name), or when the parser yields an empty path.
func (o *Orchestrator) latestEtcdSnapshotPath(ctx context.Context, node string) (string, error) {
	if !o.sshManaged(node) {
		return "", fmt.Errorf("cannot resolve etcd snapshot on %s: node not in ssh_managed_nodes", node)
	}
	listing, err := o.runSudoK3S(ctx, node, "etcd-snapshot", "ls")
	if err != nil {
		return "", fmt.Errorf("resolve latest etcd snapshot on %s: %w", node, err)
	}
	if path := parseSnapshotPathFromEtcdSnapshotList(listing); path != "" {
		return path, nil
	}
	return "", fmt.Errorf("no etcd snapshots found on %s under /var/lib/rancher/k3s/server/db/snapshots", node)
}