package cluster import ( "context" "encoding/json" "fmt" "os" "path/filepath" "sort" "strconv" "strings" "sync" "time" ) // effectiveWorkers runs one orchestration or CLI step. // Signature: (o *Orchestrator) effectiveWorkers(ctx context.Context) ([]string, error). // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) effectiveWorkers(ctx context.Context) ([]string, error) { if len(o.cfg.Workers) > 0 { return append([]string{}, o.cfg.Workers...), nil } workers, err := o.discoverWorkers(ctx) if err == nil { return workers, nil } fallback := o.fallbackWorkersFromInventory() if len(fallback) == 0 { return nil, err } o.log.Printf("warning: worker discovery failed via kubernetes API (%v); falling back to inventory workers=%s", err, strings.Join(fallback, ",")) return fallback, nil } // discoverWorkers runs one orchestration or CLI step. // Signature: (o *Orchestrator) discoverWorkers(ctx context.Context) ([]string, error). // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) discoverWorkers(ctx context.Context) ([]string, error) { out, err := o.kubectl(ctx, 15*time.Second, "get", "nodes", "-o", "custom-columns=NAME:.metadata.name,CP:.metadata.labels.node-role\\.kubernetes\\.io/control-plane,MASTER:.metadata.labels.node-role\\.kubernetes\\.io/master", "--no-headers", ) if err != nil { return nil, fmt.Errorf("discover workers: %w", err) } var workers []string for _, line := range lines(out) { fields := strings.Fields(line) if len(fields) < 3 { continue } if fields[1] == "" && fields[2] == "" { workers = append(workers, fields[0]) } } if len(workers) == 0 { return nil, fmt.Errorf("no workers discovered") } return workers, nil } // fallbackWorkersFromInventory runs one orchestration or CLI step. // Signature: (o *Orchestrator) fallbackWorkersFromInventory() []string. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) fallbackWorkersFromInventory() []string { cp := make(map[string]struct{}, len(o.cfg.ControlPlanes)) for _, node := range o.cfg.ControlPlanes { cp[strings.TrimSpace(node)] = struct{}{} } candidates := make(map[string]struct{}) add := func(node string) { name := strings.TrimSpace(node) if name == "" { return } if _, isCP := cp[name]; isCP { return } candidates[name] = struct{}{} } for _, node := range o.cfg.SSHManagedNodes { add(node) } if len(candidates) == 0 { for node := range o.cfg.SSHNodeHosts { add(node) } } workers := make([]string, 0, len(candidates)) for node := range candidates { workers = append(workers, node) } sort.Strings(workers) return workers } // patchFluxSuspendAll runs one orchestration or CLI step. // Signature: (o *Orchestrator) patchFluxSuspendAll(ctx context.Context, suspend bool) error. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) patchFluxSuspendAll(ctx context.Context, suspend bool) error { patch := fmt.Sprintf(`{"spec":{"suspend":%t}}`, suspend) ksOut, err := o.kubectl(ctx, 20*time.Second, "-n", "flux-system", "get", "kustomizations.kustomize.toolkit.fluxcd.io", "-o", "jsonpath={range .items[*]}{.metadata.name}{'\\n'}{end}", ) if err != nil { return err } for _, ks := range lines(ksOut) { _, patchErr := o.kubectl(ctx, 20*time.Second, "-n", "flux-system", "patch", "kustomization", ks, "--type=merge", "-p", patch) if patchErr != nil { o.log.Printf("warning: patch kustomization %s failed: %v", ks, patchErr) } } hrOut, err := o.kubectl(ctx, 25*time.Second, "get", "helmreleases.helm.toolkit.fluxcd.io", "-A", "-o", "jsonpath={range .items[*]}{.metadata.namespace}{'/'}{.metadata.name}{'\\n'}{end}", ) if err != nil { return err } for _, hr := range lines(hrOut) { parts := strings.SplitN(hr, "/", 2) if len(parts) != 2 { continue } _, patchErr := o.kubectl(ctx, 20*time.Second, "-n", parts[0], "patch", "helmrelease", parts[1], "--type=merge", "-p", patch) if patchErr != nil { o.log.Printf("warning: patch helmrelease %s failed: %v", hr, patchErr) } } return nil } // scaleDownApps runs one orchestration or CLI step. // Signature: (o *Orchestrator) scaleDownApps(ctx context.Context) error. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) scaleDownApps(ctx context.Context) error { targets, err := o.listScalableWorkloads(ctx) if err != nil { return err } if err := o.writeScaledWorkloadSnapshot(targets); err != nil { return err } if len(targets) == 0 { o.log.Printf("scale down apps: no workloads above 0 replicas") return nil } parallelism := o.cfg.Shutdown.ScaleParallelism if parallelism <= 0 { parallelism = 8 } o.log.Printf("scale down apps targets=%d parallelism=%d", len(targets), parallelism) return o.scaleWorkloads(ctx, targets, 0, parallelism) } // restoreScaledApps runs one orchestration or CLI step. // Signature: (o *Orchestrator) restoreScaledApps(ctx context.Context) error. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) restoreScaledApps(ctx context.Context) error { snapshot, err := o.readScaledWorkloadSnapshot() if err != nil { return err } if snapshot == nil || len(snapshot.Entries) == 0 { o.log.Printf("restore scaled workloads: no snapshot entries to restore") return nil } parallelism := o.cfg.Shutdown.ScaleParallelism if parallelism <= 0 { parallelism = 8 } o.log.Printf("restore scaled workloads entries=%d parallelism=%d snapshot_at=%s", len(snapshot.Entries), parallelism, snapshot.GeneratedAt.Format(time.RFC3339)) if err := o.scaleWorkloads(ctx, snapshot.Entries, -1, parallelism); err != nil { return err } if o.runner.DryRun { return nil } if err := os.Remove(o.scaledWorkloadSnapshotPath()); err != nil && !os.IsNotExist(err) { return fmt.Errorf("remove scaled workload snapshot: %w", err) } return nil } // listScalableWorkloads runs one orchestration or CLI step. // Signature: (o *Orchestrator) listScalableWorkloads(ctx context.Context) ([]workloadScaleEntry, error). // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) listScalableWorkloads(ctx context.Context) ([]workloadScaleEntry, error) { exclude := map[string]struct{}{} for _, ns := range o.cfg.ExcludedNamespaces { exclude[strings.TrimSpace(ns)] = struct{}{} } collect := func(kind string) ([]workloadScaleEntry, error) { out, err := o.kubectl( ctx, 25*time.Second, "get", kind, "-A", "-o", "jsonpath={range .items[*]}{.metadata.namespace}{'\\t'}{.metadata.name}{'\\t'}{.spec.replicas}{'\\n'}{end}", ) if err != nil { return nil, err } var entries []workloadScaleEntry for _, line := range lines(out) { parts := strings.Split(line, "\t") if len(parts) < 3 { continue } ns := strings.TrimSpace(parts[0]) if _, skip := exclude[ns]; skip { continue } replicas, convErr := strconv.Atoi(strings.TrimSpace(parts[2])) if convErr != nil || replicas <= 0 { continue } entries = append(entries, workloadScaleEntry{ Namespace: ns, Kind: kind, Name: strings.TrimSpace(parts[1]), Replicas: replicas, }) } return entries, nil } deployments, err := collect("deployment") if err != nil { return nil, fmt.Errorf("collect deployments: %w", err) } statefulsets, err := collect("statefulset") if err != nil { return nil, fmt.Errorf("collect statefulsets: %w", err) } targets := append(deployments, statefulsets...) sort.Slice(targets, func(i, j int) bool { a, b := targets[i], targets[j] if a.Namespace != b.Namespace { return a.Namespace < b.Namespace } if a.Kind != b.Kind { return a.Kind < b.Kind } return a.Name < b.Name }) return targets, nil } // scaleWorkloads runs one orchestration or CLI step. // Signature: (o *Orchestrator) scaleWorkloads(ctx context.Context, entries []workloadScaleEntry, forceReplicas int, parallelism int) error. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) scaleWorkloads(ctx context.Context, entries []workloadScaleEntry, forceReplicas int, parallelism int) error { if len(entries) == 0 { return nil } if parallelism <= 0 { parallelism = 1 } if parallelism > len(entries) { parallelism = len(entries) } sem := make(chan struct{}, parallelism) var wg sync.WaitGroup errCh := make(chan error, len(entries)) for _, entry := range entries { entry := entry wg.Add(1) go func() { defer wg.Done() sem <- struct{}{} defer func() { <-sem }() replicas := entry.Replicas if forceReplicas >= 0 { replicas = forceReplicas } if _, err := o.kubectl( ctx, 20*time.Second, "-n", entry.Namespace, "scale", entry.Kind, entry.Name, fmt.Sprintf("--replicas=%d", replicas), ); err != nil { if isNotFoundErr(err) { o.log.Printf("warning: skip missing workload while scaling %s/%s/%s", entry.Namespace, entry.Kind, entry.Name) return } errCh <- fmt.Errorf("scale %s/%s/%s -> %d: %w", entry.Namespace, entry.Kind, entry.Name, replicas, err) } }() } wg.Wait() close(errCh) errorCount := len(errCh) if errorCount == 0 { return nil } errs := make([]string, 0, len(errCh)) for err := range errCh { errs = append(errs, err.Error()) if len(errs) >= 5 { break } } return fmt.Errorf("scaling had %d errors (first: %s)", errorCount, strings.Join(errs, " | ")) } // scaledWorkloadSnapshotPath runs one orchestration or CLI step. // Signature: (o *Orchestrator) scaledWorkloadSnapshotPath() string. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) scaledWorkloadSnapshotPath() string { return filepath.Join(o.cfg.State.Dir, "scaled-workloads.json") } // writeScaledWorkloadSnapshot runs one orchestration or CLI step. // Signature: (o *Orchestrator) writeScaledWorkloadSnapshot(entries []workloadScaleEntry) error. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) writeScaledWorkloadSnapshot(entries []workloadScaleEntry) error { if o.runner.DryRun { return nil } if err := os.MkdirAll(o.cfg.State.Dir, 0o755); err != nil { return fmt.Errorf("ensure state dir: %w", err) } payload := workloadScaleSnapshot{ GeneratedAt: time.Now().UTC(), Entries: entries, } b, err := json.MarshalIndent(payload, "", " ") if err != nil { return fmt.Errorf("marshal scaled workload snapshot: %w", err) } if err := os.WriteFile(o.scaledWorkloadSnapshotPath(), b, 0o644); err != nil { return fmt.Errorf("write scaled workload snapshot: %w", err) } return nil } // readScaledWorkloadSnapshot runs one orchestration or CLI step. // Signature: (o *Orchestrator) readScaledWorkloadSnapshot() (*workloadScaleSnapshot, error). // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) readScaledWorkloadSnapshot() (*workloadScaleSnapshot, error) { if o.runner.DryRun { return nil, nil } b, err := os.ReadFile(o.scaledWorkloadSnapshotPath()) if err != nil { if os.IsNotExist(err) { return nil, nil } return nil, fmt.Errorf("read scaled workload snapshot: %w", err) } var snapshot workloadScaleSnapshot if err := json.Unmarshal(b, &snapshot); err != nil { return nil, fmt.Errorf("decode scaled workload snapshot: %w", err) } return &snapshot, nil }