ananke/internal/cluster/orchestrator_scaling.go

package cluster

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"
)

// effectiveWorkers resolves the worker node list: explicit cfg.Workers if
// set, otherwise Kubernetes API discovery, otherwise the SSH inventory
// fallback. Discovery errors are only returned when no fallback exists.
func (o *Orchestrator) effectiveWorkers(ctx context.Context) ([]string, error) {
	if len(o.cfg.Workers) > 0 {
		return append([]string{}, o.cfg.Workers...), nil
	}
	workers, err := o.discoverWorkers(ctx)
	if err == nil {
		return workers, nil
	}
	fallback := o.fallbackWorkersFromInventory()
	if len(fallback) == 0 {
		return nil, err
	}
	o.log.Printf("warning: worker discovery failed via kubernetes API (%v); falling back to inventory workers=%s", err, strings.Join(fallback, ","))
	return fallback, nil
}
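
// Worker resolution order, as a rough sketch of the flow above:
//
//	cfg.Workers non-empty      -> a copy of cfg.Workers, no cluster access
//	kubectl discovery succeeds -> discovered worker node names
//	discovery fails            -> inventory fallback (logged as a warning),
//	                              or the discovery error when the fallback is empty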

// discoverWorkers lists nodes via kubectl and returns those carrying neither
// the control-plane nor the legacy master node-role label.
func (o *Orchestrator) discoverWorkers(ctx context.Context) ([]string, error) {
	out, err := o.kubectl(ctx, 15*time.Second,
		"get", "nodes",
		"-o", "custom-columns=NAME:.metadata.name,CP:.metadata.labels.node-role\\.kubernetes\\.io/control-plane,MASTER:.metadata.labels.node-role\\.kubernetes\\.io/master",
		"--no-headers",
	)
	if err != nil {
		return nil, fmt.Errorf("discover workers: %w", err)
	}
	var workers []string
	for _, line := range lines(out) {
		fields := strings.Fields(line)
		if len(fields) < 3 {
			continue
		}
		if fields[1] == "<none>" && fields[2] == "<none>" {
			workers = append(workers, fields[0])
		}
	}
	if len(workers) == 0 {
		return nil, fmt.Errorf("no workers discovered")
	}
	return workers, nil
}
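
// Illustrative kubectl output for the custom-columns query above (node names
// and label values are hypothetical; exact values depend on how the cluster
// labels its control-plane nodes):
//
//	cp-1      true    <none>
//	worker-1  <none>  <none>
//	worker-2  <none>  <none>
//
// Only rows whose CP and MASTER columns are both "<none>" are kept, so this
// sample would yield workers = [worker-1, worker-2].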

// fallbackWorkersFromInventory derives a worker list from the SSH inventory
// when API discovery is unavailable: SSH-managed nodes (or, failing that, all
// known SSH node hosts) minus the configured control planes, sorted by name.
func (o *Orchestrator) fallbackWorkersFromInventory() []string {
	cp := make(map[string]struct{}, len(o.cfg.ControlPlanes))
	for _, node := range o.cfg.ControlPlanes {
		cp[strings.TrimSpace(node)] = struct{}{}
	}
	candidates := make(map[string]struct{})
	add := func(node string) {
		name := strings.TrimSpace(node)
		if name == "" {
			return
		}
		if _, isCP := cp[name]; isCP {
			return
		}
		candidates[name] = struct{}{}
	}
	for _, node := range o.cfg.SSHManagedNodes {
		add(node)
	}
	if len(candidates) == 0 {
		for node := range o.cfg.SSHNodeHosts {
			add(node)
		}
	}
	workers := make([]string, 0, len(candidates))
	for node := range candidates {
		workers = append(workers, node)
	}
	sort.Strings(workers)
	return workers
}
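
// Hypothetical inventory example (field shapes inferred from the usage above:
// ControlPlanes and SSHManagedNodes as string slices, SSHNodeHosts keyed by
// node name):
//
//	ControlPlanes:   [cp-1]
//	SSHManagedNodes: [cp-1, worker-2, worker-1]
//	=> workers: [worker-1, worker-2] (control planes removed, sorted)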

// patchFluxSuspendAll toggles spec.suspend on every Flux Kustomization in
// flux-system and every HelmRelease in the cluster. Individual patch failures
// are logged as warnings; only listing failures abort the operation.
func (o *Orchestrator) patchFluxSuspendAll(ctx context.Context, suspend bool) error {
	patch := fmt.Sprintf(`{"spec":{"suspend":%t}}`, suspend)
	ksOut, err := o.kubectl(ctx, 20*time.Second,
		"-n", "flux-system", "get", "kustomizations.kustomize.toolkit.fluxcd.io",
		"-o", "jsonpath={range .items[*]}{.metadata.name}{'\\n'}{end}",
	)
	if err != nil {
		return err
	}
	for _, ks := range lines(ksOut) {
		_, patchErr := o.kubectl(ctx, 20*time.Second, "-n", "flux-system", "patch", "kustomization", ks, "--type=merge", "-p", patch)
		if patchErr != nil {
			o.log.Printf("warning: patch kustomization %s failed: %v", ks, patchErr)
		}
	}
	hrOut, err := o.kubectl(ctx, 25*time.Second,
		"get", "helmreleases.helm.toolkit.fluxcd.io", "-A",
		"-o", "jsonpath={range .items[*]}{.metadata.namespace}{'/'}{.metadata.name}{'\\n'}{end}",
	)
	if err != nil {
		return err
	}
	for _, hr := range lines(hrOut) {
		parts := strings.SplitN(hr, "/", 2)
		if len(parts) != 2 {
			continue
		}
		_, patchErr := o.kubectl(ctx, 20*time.Second, "-n", parts[0], "patch", "helmrelease", parts[1], "--type=merge", "-p", patch)
		if patchErr != nil {
			o.log.Printf("warning: patch helmrelease %s failed: %v", hr, patchErr)
		}
	}
	return nil
}
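
// For suspend=true, each patch issued above is equivalent to running, per
// object (Kustomization shown; HelmReleases are patched the same way in their
// own namespaces):
//
//	kubectl -n flux-system patch kustomization <name> --type=merge -p '{"spec":{"suspend":true}}'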

// scaleDownApps snapshots the current replica counts of all scalable
// workloads, then scales them to zero in parallel. Excluded namespaces are
// filtered out by listScalableWorkloads.
func (o *Orchestrator) scaleDownApps(ctx context.Context) error {
	targets, err := o.listScalableWorkloads(ctx)
	if err != nil {
		return err
	}
	if err := o.writeScaledWorkloadSnapshot(targets); err != nil {
		return err
	}
	if len(targets) == 0 {
		o.log.Printf("scale down apps: no workloads above 0 replicas")
		return nil
	}
	parallelism := o.cfg.Shutdown.ScaleParallelism
	if parallelism <= 0 {
		parallelism = 8
	}
	o.log.Printf("scale down apps targets=%d parallelism=%d", len(targets), parallelism)
	return o.scaleWorkloads(ctx, targets, 0, parallelism)
}

// restoreScaledApps scales workloads back to the replica counts recorded in
// the snapshot written by scaleDownApps, then removes the snapshot file
// (unless running in dry-run mode).
func (o *Orchestrator) restoreScaledApps(ctx context.Context) error {
	snapshot, err := o.readScaledWorkloadSnapshot()
	if err != nil {
		return err
	}
	if snapshot == nil || len(snapshot.Entries) == 0 {
		o.log.Printf("restore scaled workloads: no snapshot entries to restore")
		return nil
	}
	parallelism := o.cfg.Shutdown.ScaleParallelism
	if parallelism <= 0 {
		parallelism = 8
	}
	o.log.Printf("restore scaled workloads entries=%d parallelism=%d snapshot_at=%s", len(snapshot.Entries), parallelism, snapshot.GeneratedAt.Format(time.RFC3339))
	if err := o.scaleWorkloads(ctx, snapshot.Entries, -1, parallelism); err != nil {
		return err
	}
	if o.runner.DryRun {
		return nil
	}
	if err := os.Remove(o.scaledWorkloadSnapshotPath()); err != nil && !os.IsNotExist(err) {
		return fmt.Errorf("remove scaled workload snapshot: %w", err)
	}
	return nil
}

// listScalableWorkloads returns all deployments and statefulsets outside the
// excluded namespaces that currently have more than zero replicas, sorted by
// namespace, kind, and name.
func (o *Orchestrator) listScalableWorkloads(ctx context.Context) ([]workloadScaleEntry, error) {
	exclude := map[string]struct{}{}
	for _, ns := range o.cfg.ExcludedNamespaces {
		exclude[strings.TrimSpace(ns)] = struct{}{}
	}
	collect := func(kind string) ([]workloadScaleEntry, error) {
		out, err := o.kubectl(
			ctx,
			25*time.Second,
			"get",
			kind,
			"-A",
			"-o",
			"jsonpath={range .items[*]}{.metadata.namespace}{'\\t'}{.metadata.name}{'\\t'}{.spec.replicas}{'\\n'}{end}",
		)
		if err != nil {
			return nil, err
		}
		var entries []workloadScaleEntry
		for _, line := range lines(out) {
			parts := strings.Split(line, "\t")
			if len(parts) < 3 {
				continue
			}
			ns := strings.TrimSpace(parts[0])
			if _, skip := exclude[ns]; skip {
				continue
			}
			replicas, convErr := strconv.Atoi(strings.TrimSpace(parts[2]))
			if convErr != nil || replicas <= 0 {
				continue
			}
			entries = append(entries, workloadScaleEntry{
				Namespace: ns,
				Kind:      kind,
				Name:      strings.TrimSpace(parts[1]),
				Replicas:  replicas,
			})
		}
		return entries, nil
	}
	deployments, err := collect("deployment")
	if err != nil {
		return nil, fmt.Errorf("collect deployments: %w", err)
	}
	statefulsets, err := collect("statefulset")
	if err != nil {
		return nil, fmt.Errorf("collect statefulsets: %w", err)
	}
	targets := append(deployments, statefulsets...)
	sort.Slice(targets, func(i, j int) bool {
		a, b := targets[i], targets[j]
		if a.Namespace != b.Namespace {
			return a.Namespace < b.Namespace
		}
		if a.Kind != b.Kind {
			return a.Kind < b.Kind
		}
		return a.Name < b.Name
	})
	return targets, nil
}
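
// Each line of the jsonpath output parsed above is tab-separated, e.g.
// (namespaces and names are hypothetical):
//
//	media	jellyfin	1
//	monitoring	grafana	2
//
// Lines in excluded namespaces, with zero replicas, or with an empty or
// non-numeric replica field are skipped.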

// scaleWorkloads applies kubectl scale to each entry with bounded
// parallelism. forceReplicas >= 0 overrides each entry's recorded replica
// count (used for scale-down); forceReplicas < 0 restores the recorded
// counts. Missing workloads are skipped with a warning.
func (o *Orchestrator) scaleWorkloads(ctx context.Context, entries []workloadScaleEntry, forceReplicas int, parallelism int) error {
	if len(entries) == 0 {
		return nil
	}
	if parallelism <= 0 {
		parallelism = 1
	}
	if parallelism > len(entries) {
		parallelism = len(entries)
	}
	sem := make(chan struct{}, parallelism)
	var wg sync.WaitGroup
	errCh := make(chan error, len(entries))
	for _, entry := range entries {
		entry := entry
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}
			defer func() { <-sem }()
			replicas := entry.Replicas
			if forceReplicas >= 0 {
				replicas = forceReplicas
			}
			if _, err := o.kubectl(
				ctx,
				20*time.Second,
				"-n",
				entry.Namespace,
				"scale",
				entry.Kind,
				entry.Name,
				fmt.Sprintf("--replicas=%d", replicas),
			); err != nil {
				if isNotFoundErr(err) {
					o.log.Printf("warning: skip missing workload while scaling %s/%s/%s", entry.Namespace, entry.Kind, entry.Name)
					return
				}
				errCh <- fmt.Errorf("scale %s/%s/%s -> %d: %w", entry.Namespace, entry.Kind, entry.Name, replicas, err)
			}
		}()
	}
	wg.Wait()
	close(errCh)
	errorCount := len(errCh)
	if errorCount == 0 {
		return nil
	}
	errs := make([]string, 0, len(errCh))
	for err := range errCh {
		errs = append(errs, err.Error())
		if len(errs) >= 5 {
			break
		}
	}
	return fmt.Errorf("scaling had %d errors (first: %s)", errorCount, strings.Join(errs, " | "))
}
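
// Concurrency note: the buffered sem channel above acts as a counting
// semaphore bounding in-flight kubectl scale calls to `parallelism`; goroutine
// errors are collected on errCh, and the returned error summarizes the total
// count plus at most the first five messages.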

// scaledWorkloadSnapshotPath returns the path of the replica-count snapshot
// file inside the configured state directory.
func (o *Orchestrator) scaledWorkloadSnapshotPath() string {
	return filepath.Join(o.cfg.State.Dir, "scaled-workloads.json")
}

// writeScaledWorkloadSnapshot persists the current replica counts to the
// state directory so restoreScaledApps can undo a scale-down. It is a no-op
// in dry-run mode.
func (o *Orchestrator) writeScaledWorkloadSnapshot(entries []workloadScaleEntry) error {
	if o.runner.DryRun {
		return nil
	}
	if err := os.MkdirAll(o.cfg.State.Dir, 0o755); err != nil {
		return fmt.Errorf("ensure state dir: %w", err)
	}
	payload := workloadScaleSnapshot{
		GeneratedAt: time.Now().UTC(),
		Entries:     entries,
	}
	b, err := json.MarshalIndent(payload, "", " ")
	if err != nil {
		return fmt.Errorf("marshal scaled workload snapshot: %w", err)
	}
	if err := os.WriteFile(o.scaledWorkloadSnapshotPath(), b, 0o644); err != nil {
		return fmt.Errorf("write scaled workload snapshot: %w", err)
	}
	return nil
}

// readScaledWorkloadSnapshot loads the snapshot written by
// writeScaledWorkloadSnapshot. It returns (nil, nil) when running dry-run or
// when no snapshot file exists.
func (o *Orchestrator) readScaledWorkloadSnapshot() (*workloadScaleSnapshot, error) {
	if o.runner.DryRun {
		return nil, nil
	}
	b, err := os.ReadFile(o.scaledWorkloadSnapshotPath())
	if err != nil {
		if os.IsNotExist(err) {
			return nil, nil
		}
		return nil, fmt.Errorf("read scaled workload snapshot: %w", err)
	}
	var snapshot workloadScaleSnapshot
	if err := json.Unmarshal(b, &snapshot); err != nil {
		return nil, fmt.Errorf("decode scaled workload snapshot: %w", err)
	}
	return &snapshot, nil
}
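
// Illustrative scaled-workloads.json content. The JSON field names depend on
// the struct tags of workloadScaleSnapshot and workloadScaleEntry (defined
// elsewhere in this package), so the keys below are assumptions:
//
//	{
//	  "generatedAt": "2024-01-01T00:00:00Z",
//	  "entries": [
//	    {"namespace": "media", "kind": "deployment", "name": "jellyfin", "replicas": 1}
//	  ]
//	}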