package cluster

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"
)

// effectiveWorkers resolves the worker node list for this run: explicitly
// configured workers win, otherwise nodes are discovered via the Kubernetes
// API, with the SSH inventory used as a last resort when discovery fails.
func (o *Orchestrator) effectiveWorkers(ctx context.Context) ([]string, error) {
	if len(o.cfg.Workers) > 0 {
		return append([]string{}, o.cfg.Workers...), nil
	}
	workers, err := o.discoverWorkers(ctx)
	if err == nil {
		return workers, nil
	}
	fallback := o.fallbackWorkersFromInventory()
	if len(fallback) == 0 {
		return nil, err
	}
	o.log.Printf("warning: worker discovery failed via kubernetes API (%v); falling back to inventory workers=%s", err, strings.Join(fallback, ","))
	return fallback, nil
}

// discoverWorkers lists cluster nodes via kubectl and returns those carrying
// neither the control-plane nor the legacy master role label.
func (o *Orchestrator) discoverWorkers(ctx context.Context) ([]string, error) {
	out, err := o.kubectl(ctx, 15*time.Second,
		"get", "nodes",
		"-o", "custom-columns=NAME:.metadata.name,CP:.metadata.labels.node-role\\.kubernetes\\.io/control-plane,MASTER:.metadata.labels.node-role\\.kubernetes\\.io/master",
		"--no-headers",
	)
	if err != nil {
		return nil, fmt.Errorf("discover workers: %w", err)
	}
	var workers []string
	for _, line := range lines(out) {
		fields := strings.Fields(line)
		if len(fields) < 3 {
			continue
		}
		if fields[1] == "<none>" && fields[2] == "<none>" {
			workers = append(workers, fields[0])
		}
	}
	if len(workers) == 0 {
		return nil, fmt.Errorf("no workers discovered")
	}
	return workers, nil
}
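
// Illustrative output of the custom-columns query above (node names are
// hypothetical). Only rows whose CP and MASTER columns are both "<none>" are
// kept as workers; a control-plane row either shows a role label value or
// collapses to fewer than three fields, and is skipped in either case:
//
//	cp-1      true    <none>
//	worker-1  <none>  <none>
//	worker-2  <none>  <none>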

// fallbackWorkersFromInventory derives a worker list from the SSH inventory:
// explicitly managed nodes first, then all known node hosts if none are
// managed, minus any configured control planes. The result is sorted so the
// fallback order is deterministic.
func (o *Orchestrator) fallbackWorkersFromInventory() []string {
	cp := make(map[string]struct{}, len(o.cfg.ControlPlanes))
	for _, node := range o.cfg.ControlPlanes {
		cp[strings.TrimSpace(node)] = struct{}{}
	}

	candidates := make(map[string]struct{})
	add := func(node string) {
		name := strings.TrimSpace(node)
		if name == "" {
			return
		}
		if _, isCP := cp[name]; isCP {
			return
		}
		candidates[name] = struct{}{}
	}

	for _, node := range o.cfg.SSHManagedNodes {
		add(node)
	}
	if len(candidates) == 0 {
		for node := range o.cfg.SSHNodeHosts {
			add(node)
		}
	}

	workers := make([]string, 0, len(candidates))
	for node := range candidates {
		workers = append(workers, node)
	}
	sort.Strings(workers)
	return workers
}

// patchFluxSuspendAll toggles spec.suspend on every Flux Kustomization in
// flux-system and on every HelmRelease in the cluster, so Flux stops (or
// resumes) reconciling while the orchestrator reshapes workloads. Individual
// patch failures are logged and skipped; only listing failures abort the step.
func (o *Orchestrator) patchFluxSuspendAll(ctx context.Context, suspend bool) error {
	patch := fmt.Sprintf(`{"spec":{"suspend":%t}}`, suspend)

	ksOut, err := o.kubectl(ctx, 20*time.Second,
		"-n", "flux-system", "get", "kustomizations.kustomize.toolkit.fluxcd.io",
		"-o", "jsonpath={range .items[*]}{.metadata.name}{'\\n'}{end}",
	)
	if err != nil {
		return err
	}
	for _, ks := range lines(ksOut) {
		_, patchErr := o.kubectl(ctx, 20*time.Second, "-n", "flux-system", "patch", "kustomization", ks, "--type=merge", "-p", patch)
		if patchErr != nil {
			o.log.Printf("warning: patch kustomization %s failed: %v", ks, patchErr)
		}
	}

	hrOut, err := o.kubectl(ctx, 25*time.Second,
		"get", "helmreleases.helm.toolkit.fluxcd.io", "-A",
		"-o", "jsonpath={range .items[*]}{.metadata.namespace}{'/'}{.metadata.name}{'\\n'}{end}",
	)
	if err != nil {
		return err
	}
	for _, hr := range lines(hrOut) {
		parts := strings.SplitN(hr, "/", 2)
		if len(parts) != 2 {
			continue
		}
		_, patchErr := o.kubectl(ctx, 20*time.Second, "-n", parts[0], "patch", "helmrelease", parts[1], "--type=merge", "-p", patch)
		if patchErr != nil {
			o.log.Printf("warning: patch helmrelease %s failed: %v", hr, patchErr)
		}
	}
	return nil
}
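
// Each patch issued above is equivalent to a kubectl merge patch such as the
// following ("apps" is a hypothetical Kustomization name):
//
//	kubectl -n flux-system patch kustomization apps --type=merge -p '{"spec":{"suspend":true}}'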

// scaleDownApps records the current replica counts of every scalable workload
// and then scales them all to zero with bounded parallelism.
func (o *Orchestrator) scaleDownApps(ctx context.Context) error {
	targets, err := o.listScalableWorkloads(ctx)
	if err != nil {
		return err
	}
	if err := o.writeScaledWorkloadSnapshot(targets); err != nil {
		return err
	}
	if len(targets) == 0 {
		o.log.Printf("scale down apps: no workloads above 0 replicas")
		return nil
	}

	parallelism := o.cfg.Shutdown.ScaleParallelism
	if parallelism <= 0 {
		parallelism = 8
	}
	o.log.Printf("scale down apps targets=%d parallelism=%d", len(targets), parallelism)
	return o.scaleWorkloads(ctx, targets, 0, parallelism)
}

// restoreScaledApps re-applies the replica counts recorded by scaleDownApps
// and removes the snapshot file once the restore has been issued.
func (o *Orchestrator) restoreScaledApps(ctx context.Context) error {
	snapshot, err := o.readScaledWorkloadSnapshot()
	if err != nil {
		return err
	}
	if snapshot == nil || len(snapshot.Entries) == 0 {
		o.log.Printf("restore scaled workloads: no snapshot entries to restore")
		return nil
	}

	parallelism := o.cfg.Shutdown.ScaleParallelism
	if parallelism <= 0 {
		parallelism = 8
	}
	o.log.Printf("restore scaled workloads entries=%d parallelism=%d snapshot_at=%s", len(snapshot.Entries), parallelism, snapshot.GeneratedAt.Format(time.RFC3339))
	if err := o.scaleWorkloads(ctx, snapshot.Entries, -1, parallelism); err != nil {
		return err
	}
	if o.runner.DryRun {
		return nil
	}
	if err := os.Remove(o.scaledWorkloadSnapshotPath()); err != nil && !os.IsNotExist(err) {
		return fmt.Errorf("remove scaled workload snapshot: %w", err)
	}
	return nil
}

// listScalableWorkloads returns every Deployment and StatefulSet outside the
// excluded namespaces that currently requests more than zero replicas, sorted
// by namespace, kind, and name.
func (o *Orchestrator) listScalableWorkloads(ctx context.Context) ([]workloadScaleEntry, error) {
	exclude := map[string]struct{}{}
	for _, ns := range o.cfg.ExcludedNamespaces {
		exclude[strings.TrimSpace(ns)] = struct{}{}
	}

	collect := func(kind string) ([]workloadScaleEntry, error) {
		out, err := o.kubectl(
			ctx,
			25*time.Second,
			"get",
			kind,
			"-A",
			"-o",
			"jsonpath={range .items[*]}{.metadata.namespace}{'\\t'}{.metadata.name}{'\\t'}{.spec.replicas}{'\\n'}{end}",
		)
		if err != nil {
			return nil, err
		}
		var entries []workloadScaleEntry
		for _, line := range lines(out) {
			parts := strings.Split(line, "\t")
			if len(parts) < 3 {
				continue
			}
			ns := strings.TrimSpace(parts[0])
			if _, skip := exclude[ns]; skip {
				continue
			}
			replicas, convErr := strconv.Atoi(strings.TrimSpace(parts[2]))
			if convErr != nil || replicas <= 0 {
				continue
			}
			entries = append(entries, workloadScaleEntry{
				Namespace: ns,
				Kind:      kind,
				Name:      strings.TrimSpace(parts[1]),
				Replicas:  replicas,
			})
		}
		return entries, nil
	}

	deployments, err := collect("deployment")
	if err != nil {
		return nil, fmt.Errorf("collect deployments: %w", err)
	}
	statefulsets, err := collect("statefulset")
	if err != nil {
		return nil, fmt.Errorf("collect statefulsets: %w", err)
	}
	targets := append(deployments, statefulsets...)
	sort.Slice(targets, func(i, j int) bool {
		a, b := targets[i], targets[j]
		if a.Namespace != b.Namespace {
			return a.Namespace < b.Namespace
		}
		if a.Kind != b.Kind {
			return a.Kind < b.Kind
		}
		return a.Name < b.Name
	})
	return targets, nil
}
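
// The jsonpath query above emits one tab-separated line per workload, for
// example (hypothetical names):
//
//	media	plex	2
//	monitoring	grafana	1
//
// Lines from excluded namespaces, lines without a parsable replica count, and
// workloads already at zero replicas are dropped before entries are recorded.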

// scaleWorkloads scales the given workloads using at most parallelism
// concurrent kubectl invocations. A forceReplicas value >= 0 overrides every
// entry's replica count (scaleDownApps passes 0); a negative value restores
// each entry's recorded replicas (restoreScaledApps passes -1). Workloads that
// have disappeared are skipped with a warning.
func (o *Orchestrator) scaleWorkloads(ctx context.Context, entries []workloadScaleEntry, forceReplicas int, parallelism int) error {
	if len(entries) == 0 {
		return nil
	}
	if parallelism <= 0 {
		parallelism = 1
	}
	if parallelism > len(entries) {
		parallelism = len(entries)
	}

	sem := make(chan struct{}, parallelism)
	var wg sync.WaitGroup
	errCh := make(chan error, len(entries))

	for _, entry := range entries {
		entry := entry
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}
			defer func() { <-sem }()

			replicas := entry.Replicas
			if forceReplicas >= 0 {
				replicas = forceReplicas
			}
			if _, err := o.kubectl(
				ctx,
				20*time.Second,
				"-n",
				entry.Namespace,
				"scale",
				entry.Kind,
				entry.Name,
				fmt.Sprintf("--replicas=%d", replicas),
			); err != nil {
				if isNotFoundErr(err) {
					o.log.Printf("warning: skip missing workload while scaling %s/%s/%s", entry.Namespace, entry.Kind, entry.Name)
					return
				}
				errCh <- fmt.Errorf("scale %s/%s/%s -> %d: %w", entry.Namespace, entry.Kind, entry.Name, replicas, err)
			}
		}()
	}

	wg.Wait()
	close(errCh)

	errorCount := len(errCh)
	if errorCount == 0 {
		return nil
	}
	errs := make([]string, 0, errorCount)
	for err := range errCh {
		errs = append(errs, err.Error())
		if len(errs) >= 5 {
			break
		}
	}
	return fmt.Errorf("scaling had %d errors (first %d: %s)", errorCount, len(errs), strings.Join(errs, " | "))
}
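
// The two call sites in this file use the forceReplicas convention as follows:
//
//	o.scaleWorkloads(ctx, targets, 0, parallelism)           // shutdown: force every workload to zero
//	o.scaleWorkloads(ctx, snapshot.Entries, -1, parallelism) // startup: restore recorded replica counts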

// scaledWorkloadSnapshotPath returns the location of the scaled-workloads
// snapshot file inside the orchestrator's state directory.
func (o *Orchestrator) scaledWorkloadSnapshotPath() string {
	return filepath.Join(o.cfg.State.Dir, "scaled-workloads.json")
}

// writeScaledWorkloadSnapshot persists the pre-shutdown replica counts to the
// state directory so a later startup can restore them. It is a no-op in
// dry-run mode.
func (o *Orchestrator) writeScaledWorkloadSnapshot(entries []workloadScaleEntry) error {
	if o.runner.DryRun {
		return nil
	}
	if err := os.MkdirAll(o.cfg.State.Dir, 0o755); err != nil {
		return fmt.Errorf("ensure state dir: %w", err)
	}
	payload := workloadScaleSnapshot{
		GeneratedAt: time.Now().UTC(),
		Entries:     entries,
	}
	b, err := json.MarshalIndent(payload, "", "  ")
	if err != nil {
		return fmt.Errorf("marshal scaled workload snapshot: %w", err)
	}
	if err := os.WriteFile(o.scaledWorkloadSnapshotPath(), b, 0o644); err != nil {
		return fmt.Errorf("write scaled workload snapshot: %w", err)
	}
	return nil
}
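
// A snapshot file written above looks roughly like the following. The JSON
// field names assume workloadScaleSnapshot and workloadScaleEntry (defined
// elsewhere in this package) carry no custom JSON tags; the values are
// hypothetical:
//
//	{
//	  "GeneratedAt": "2024-01-01T06:00:00Z",
//	  "Entries": [
//	    {"Namespace": "media", "Kind": "deployment", "Name": "plex", "Replicas": 2}
//	  ]
//	}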

// readScaledWorkloadSnapshot loads the snapshot written by
// writeScaledWorkloadSnapshot. It returns (nil, nil) in dry-run mode or when
// no snapshot file exists.
func (o *Orchestrator) readScaledWorkloadSnapshot() (*workloadScaleSnapshot, error) {
	if o.runner.DryRun {
		return nil, nil
	}
	b, err := os.ReadFile(o.scaledWorkloadSnapshotPath())
	if err != nil {
		if os.IsNotExist(err) {
			return nil, nil
		}
		return nil, fmt.Errorf("read scaled workload snapshot: %w", err)
	}
	var snapshot workloadScaleSnapshot
	if err := json.Unmarshal(b, &snapshot); err != nil {
		return nil, fmt.Errorf("decode scaled workload snapshot: %w", err)
	}
	return &snapshot, nil
}