333 lines
11 KiB
Go
333 lines
11 KiB
Go
package cluster
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// drainFailure records one worker whose drain did not succeed, so that
// drainWorkers can summarize every failure after all drains have finished.
type drainFailure struct {
	node    string // name of the worker whose drain failed
	err     error  // wrapped error describing the failed drain
	details string // diagnostics text gathered for the node; may be empty
}
|
|
|
|
// drainWorkers runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) drainWorkers(ctx context.Context, workers []string) error.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) drainWorkers(ctx context.Context, workers []string) error {
|
|
total := len(workers)
|
|
if total == 0 {
|
|
return nil
|
|
}
|
|
parallelism := o.cfg.Shutdown.DrainParallelism
|
|
if parallelism <= 0 {
|
|
parallelism = 6
|
|
}
|
|
if parallelism > total {
|
|
parallelism = total
|
|
}
|
|
|
|
o.log.Printf("drain workers total=%d parallelism=%d", total, parallelism)
|
|
sem := make(chan struct{}, parallelism)
|
|
var wg sync.WaitGroup
|
|
errCh := make(chan drainFailure, total)
|
|
|
|
for idx, node := range workers {
|
|
idx := idx
|
|
node := node
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
sem <- struct{}{}
|
|
defer func() { <-sem }()
|
|
|
|
o.log.Printf("drain worker %d/%d: %s", idx+1, total, node)
|
|
if _, err := o.kubectl(ctx, 20*time.Second, "cordon", node); err != nil {
|
|
o.log.Printf("warning: cordon %s failed: %v", node, err)
|
|
}
|
|
if _, err := o.kubectl(ctx, 3*time.Minute, "drain", node, "--ignore-daemonsets", "--delete-emptydir-data", "--grace-period=30", "--timeout=180s"); err != nil {
|
|
details := o.drainNodeDiagnostics(ctx, node)
|
|
errCh <- drainFailure{
|
|
node: node,
|
|
err: fmt.Errorf("drain %s failed: %w", node, err),
|
|
details: details,
|
|
}
|
|
return
|
|
}
|
|
}()
|
|
}
|
|
|
|
wg.Wait()
|
|
close(errCh)
|
|
if len(errCh) == 0 {
|
|
return nil
|
|
}
|
|
failures := make([]drainFailure, 0, len(errCh))
|
|
for failure := range errCh {
|
|
failures = append(failures, failure)
|
|
}
|
|
count := len(failures)
|
|
samples := []string{}
|
|
for _, failure := range failures {
|
|
msg := failure.err.Error()
|
|
if strings.TrimSpace(failure.details) != "" {
|
|
msg = fmt.Sprintf("%s (details: %s)", msg, failure.details)
|
|
}
|
|
samples = append(samples, msg)
|
|
if len(samples) >= 4 {
|
|
break
|
|
}
|
|
}
|
|
return fmt.Errorf("drain workers had %d errors (first: %s)", count, strings.Join(samples, " | "))
|
|
}
|
|
|
|
// drainNodeDiagnostics runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) drainNodeDiagnostics(ctx context.Context, node string) string.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) drainNodeDiagnostics(ctx context.Context, node string) string {
|
|
out, err := o.kubectl(
|
|
ctx,
|
|
20*time.Second,
|
|
"get",
|
|
"pods",
|
|
"-A",
|
|
"--field-selector", "spec.nodeName="+node,
|
|
"-o",
|
|
"custom-columns=NS:.metadata.namespace,NAME:.metadata.name,PHASE:.status.phase,OWNER:.metadata.ownerReferences[0].kind",
|
|
"--no-headers",
|
|
)
|
|
if err != nil {
|
|
if strings.TrimSpace(out) == "" {
|
|
return fmt.Sprintf("diagnostics unavailable: %v", err)
|
|
}
|
|
return fmt.Sprintf("diagnostics unavailable: %v (%s)", err, strings.Join(lines(out), "; "))
|
|
}
|
|
|
|
blockers := make([]string, 0, 6)
|
|
for _, line := range lines(out) {
|
|
fields := strings.Fields(line)
|
|
if len(fields) < 4 {
|
|
continue
|
|
}
|
|
namespace := fields[0]
|
|
name := fields[1]
|
|
phase := fields[2]
|
|
owner := fields[3]
|
|
if strings.EqualFold(owner, "DaemonSet") {
|
|
continue
|
|
}
|
|
if strings.EqualFold(phase, "Succeeded") || strings.EqualFold(phase, "Failed") {
|
|
continue
|
|
}
|
|
blockers = append(blockers, fmt.Sprintf("%s/%s(phase=%s owner=%s)", namespace, name, phase, owner))
|
|
if len(blockers) >= 6 {
|
|
break
|
|
}
|
|
}
|
|
if len(blockers) == 0 {
|
|
return "no non-daemonset blocking pods found on node"
|
|
}
|
|
return strings.Join(blockers, ", ")
|
|
}
|
|
|
|
// uncordonWorkers runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) uncordonWorkers(ctx context.Context, workers []string) error.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) uncordonWorkers(ctx context.Context, workers []string) error {
|
|
for _, node := range workers {
|
|
if _, err := o.kubectl(ctx, 20*time.Second, "uncordon", node); err != nil {
|
|
o.log.Printf("warning: uncordon %s failed: %v", node, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ensureLonghornEncryptedHostPrereqs runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) ensureLonghornEncryptedHostPrereqs(ctx context.Context, workers []string) ([]string, error).
|
|
// Why: encrypted Longhorn PVCs fail at kubelet mount time when a storage host
|
|
// lacks host cryptsetup; startup must quarantine those nodes before workloads
|
|
// are scheduled there.
|
|
func (o *Orchestrator) ensureLonghornEncryptedHostPrereqs(ctx context.Context, workers []string) ([]string, error) {
|
|
longhornHosts, err := o.longhornHostNodes(ctx)
|
|
if err != nil {
|
|
return workers, err
|
|
}
|
|
if len(longhornHosts) == 0 {
|
|
return workers, nil
|
|
}
|
|
|
|
ignored := makeStringSet(o.cfg.Startup.IgnoreUnavailableNodes)
|
|
unsafe := map[string]struct{}{}
|
|
var errs []string
|
|
for node := range longhornHosts {
|
|
if _, skip := ignored[node]; skip {
|
|
continue
|
|
}
|
|
if !o.sshManaged(node) {
|
|
o.log.Printf("warning: keeping longhorn host %s cordoned because encrypted-volume prerequisites cannot be verified without SSH management", node)
|
|
if cordonErr := o.cordonNodeForMissingCryptsetup(ctx, node); cordonErr != nil {
|
|
errs = append(errs, fmt.Sprintf("%s cordon after unverifiable cryptsetup prerequisite: %v", node, cordonErr))
|
|
}
|
|
unsafe[node] = struct{}{}
|
|
continue
|
|
}
|
|
if checkErr := o.ensureHostCryptsetup(ctx, node); checkErr != nil {
|
|
o.log.Printf("warning: keeping longhorn host %s cordoned after cryptsetup prerequisite check failed: %v", node, checkErr)
|
|
if cordonErr := o.cordonNodeForMissingCryptsetup(ctx, node); cordonErr != nil {
|
|
errs = append(errs, fmt.Sprintf("%s cordon after cryptsetup prerequisite failure: %v", node, cordonErr))
|
|
}
|
|
unsafe[node] = struct{}{}
|
|
continue
|
|
}
|
|
}
|
|
|
|
guarded := make([]string, 0, len(workers))
|
|
for _, worker := range workers {
|
|
node := strings.TrimSpace(worker)
|
|
if node == "" {
|
|
continue
|
|
}
|
|
if _, blocked := unsafe[node]; blocked {
|
|
continue
|
|
}
|
|
guarded = append(guarded, node)
|
|
}
|
|
if len(errs) > 0 {
|
|
return guarded, fmt.Errorf("%s", strings.Join(errs, "; "))
|
|
}
|
|
return guarded, nil
|
|
}
|
|
|
|
// longhornHostNodes runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) longhornHostNodes(ctx context.Context) (map[string]struct{}, error).
|
|
// Why: the live node label captures storage hosts that may not be in Ananke's
|
|
// static worker list, so startup quarantine decisions should follow the
|
|
// cluster's actual scheduling surface.
|
|
func (o *Orchestrator) longhornHostNodes(ctx context.Context) (map[string]struct{}, error) {
|
|
out, err := o.kubectl(ctx, 20*time.Second,
|
|
"get", "nodes",
|
|
"-l", "longhorn-host=true",
|
|
"-o", "jsonpath={range .items[*]}{.metadata.name}{'\\n'}{end}",
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("query longhorn host nodes: %w", err)
|
|
}
|
|
nodes := map[string]struct{}{}
|
|
for _, line := range lines(out) {
|
|
node := strings.TrimSpace(line)
|
|
if node != "" {
|
|
nodes[node] = struct{}{}
|
|
}
|
|
}
|
|
if len(nodes) > 0 {
|
|
return nodes, nil
|
|
}
|
|
for node, labels := range o.cfg.Startup.RequiredNodeLabels {
|
|
if strings.EqualFold(strings.TrimSpace(labels["longhorn-host"]), "true") {
|
|
name := strings.TrimSpace(node)
|
|
if name != "" {
|
|
nodes[name] = struct{}{}
|
|
}
|
|
}
|
|
}
|
|
return nodes, nil
|
|
}
|
|
|
|
// stopWorkers runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) stopWorkers(ctx context.Context, workers []string).
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) stopWorkers(ctx context.Context, workers []string) {
|
|
o.runSSHAcrossNodes(ctx, workers, "stop k3s-agent", "sudo systemctl stop k3s-agent || true")
|
|
}
|
|
|
|
// startWorkers runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) startWorkers(ctx context.Context, workers []string).
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) startWorkers(ctx context.Context, workers []string) {
|
|
o.runSSHAcrossNodes(ctx, workers, "start k3s-agent", "sudo systemctl start k3s-agent || true")
|
|
}
|
|
|
|
// stopControlPlanes runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) stopControlPlanes(ctx context.Context, cps []string).
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) stopControlPlanes(ctx context.Context, cps []string) {
|
|
o.runSSHAcrossNodes(ctx, cps, "stop k3s", "sudo systemctl stop k3s || true")
|
|
}
|
|
|
|
// startControlPlanes runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) startControlPlanes(ctx context.Context, cps []string).
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) startControlPlanes(ctx context.Context, cps []string) {
|
|
o.runSSHAcrossNodes(ctx, cps, "start k3s", "sudo systemctl start k3s || true")
|
|
}
|
|
|
|
// runSSHAcrossNodes runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) runSSHAcrossNodes(ctx context.Context, nodes []string, action, command string).
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) runSSHAcrossNodes(ctx context.Context, nodes []string, action, command string) {
|
|
if len(nodes) == 0 {
|
|
return
|
|
}
|
|
parallelism := o.cfg.Shutdown.SSHParallelism
|
|
if parallelism <= 0 {
|
|
parallelism = 8
|
|
}
|
|
if parallelism > len(nodes) {
|
|
parallelism = len(nodes)
|
|
}
|
|
|
|
sem := make(chan struct{}, parallelism)
|
|
var wg sync.WaitGroup
|
|
for _, node := range nodes {
|
|
node := node
|
|
if !o.sshManaged(node) {
|
|
o.log.Printf("skip %s on %s: node not in ssh_managed_nodes", action, node)
|
|
continue
|
|
}
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
sem <- struct{}{}
|
|
defer func() { <-sem }()
|
|
o.bestEffort(action+" on "+node, func() error {
|
|
_, err := o.ssh(ctx, node, command)
|
|
return err
|
|
})
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
// takeEtcdSnapshot runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) takeEtcdSnapshot(ctx context.Context, node string) error.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) takeEtcdSnapshot(ctx context.Context, node string) error {
|
|
if !o.sshManaged(node) {
|
|
return fmt.Errorf("cannot run etcd snapshot on %s: node not in ssh_managed_nodes", node)
|
|
}
|
|
name := "pre-shutdown-" + time.Now().UTC().Format("20060102-150405")
|
|
_, err := o.runSudoK3S(ctx, node, "etcd-snapshot", "save", "--name", name)
|
|
return err
|
|
}
|
|
|
|
// latestEtcdSnapshotPath runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) latestEtcdSnapshotPath(ctx context.Context, node string) (string, error).
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) latestEtcdSnapshotPath(ctx context.Context, node string) (string, error) {
|
|
if !o.sshManaged(node) {
|
|
return "", fmt.Errorf("cannot resolve etcd snapshot on %s: node not in ssh_managed_nodes", node)
|
|
}
|
|
out, err := o.runSudoK3S(ctx, node, "etcd-snapshot", "ls")
|
|
if err != nil {
|
|
return "", fmt.Errorf("resolve latest etcd snapshot on %s: %w", node, err)
|
|
}
|
|
snapshot := parseSnapshotPathFromEtcdSnapshotList(out)
|
|
if snapshot == "" {
|
|
return "", fmt.Errorf("no etcd snapshots found on %s under /var/lib/rancher/k3s/server/db/snapshots", node)
|
|
}
|
|
return snapshot, nil
|
|
}
|