package cluster import ( "context" "encoding/base64" "encoding/json" "fmt" "os" "path/filepath" "strconv" "strings" "time" ) // bestEffort runs one orchestration or CLI step. // Signature: (o *Orchestrator) bestEffort(name string, fn func() error). // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) bestEffort(name string, fn func() error) { if err := fn(); err != nil { o.log.Printf("warning: %s: %v", name, err) } } // missingCriticalStartupWorkloads runs one orchestration or CLI step. // Signature: (o *Orchestrator) missingCriticalStartupWorkloads(ctx context.Context) ([]string, error). // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) missingCriticalStartupWorkloads(ctx context.Context) ([]string, error) { missing := []string{} for _, w := range criticalStartupWorkloads { ready, err := o.workloadReady(ctx, w) if err != nil { if isNotFoundErr(err) { missing = append(missing, fmt.Sprintf("%s/%s/%s(not found)", w.Namespace, w.Kind, w.Name)) continue } return nil, fmt.Errorf("check %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, err) } if !ready { missing = append(missing, fmt.Sprintf("%s/%s/%s", w.Namespace, w.Kind, w.Name)) } } return missing, nil } // ensureCriticalStartupWorkloads runs one orchestration or CLI step. // Signature: (o *Orchestrator) ensureCriticalStartupWorkloads(ctx context.Context) error. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) ensureCriticalStartupWorkloads(ctx context.Context) error { for _, w := range criticalStartupWorkloads { if err := o.ensureWorkloadReplicas(ctx, w, 1); err != nil { if isNotFoundErr(err) { o.log.Printf("warning: startup workload missing, skipping scale: %s/%s/%s", w.Namespace, w.Kind, w.Name) continue } return fmt.Errorf("scale %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, err) } if err := o.cleanupStaleCriticalWorkloadPods(ctx, w); err != nil { if isNotFoundErr(err) { o.log.Printf("warning: startup workload missing during stale-pod cleanup: %s/%s/%s", w.Namespace, w.Kind, w.Name) continue } return fmt.Errorf("cleanup stale pods %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, err) } if err := o.waitWorkloadReady(ctx, w); err != nil { if isNotFoundErr(err) { o.log.Printf("warning: startup workload missing during readiness wait: %s/%s/%s", w.Namespace, w.Kind, w.Name) continue } return err } } return nil } // cleanupStaleCriticalWorkloadPods runs one orchestration or CLI step. // Signature: (o *Orchestrator) cleanupStaleCriticalWorkloadPods(ctx context.Context, w startupWorkload) error. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) cleanupStaleCriticalWorkloadPods(ctx context.Context, w startupWorkload) error { if o.runner.DryRun { return nil } if w.Kind != "statefulset" { return nil } out, err := o.kubectl( ctx, 20*time.Second, "-n", w.Namespace, "get", "pods", "-o", "custom-columns=NAME:.metadata.name,PHASE:.status.phase,OWNER_KIND:.metadata.ownerReferences[0].kind,OWNER_NAME:.metadata.ownerReferences[0].name", "--no-headers", ) if err != nil { return err } prefix := w.Name + "-" for _, line := range lines(out) { fields := strings.Fields(line) if len(fields) < 4 { continue } podName := fields[0] phase := strings.ToLower(strings.TrimSpace(fields[1])) ownerKind := strings.TrimSpace(fields[2]) ownerName := strings.TrimSpace(fields[3]) if !strings.EqualFold(ownerKind, "StatefulSet") || ownerName != w.Name { continue } if !strings.HasPrefix(podName, prefix) { continue } if phase != "unknown" && phase != "failed" { continue } o.log.Printf("warning: deleting stale critical pod %s/%s phase=%s before readiness wait", w.Namespace, podName, phase) if _, delErr := o.kubectl( ctx, 40*time.Second, "-n", w.Namespace, "delete", "pod", podName, "--grace-period=0", "--force", "--wait=false", ); delErr != nil { return fmt.Errorf("delete stale pod %s/%s: %w", w.Namespace, podName, delErr) } } return nil } // ensureWorkloadReplicas runs one orchestration or CLI step. // Signature: (o *Orchestrator) ensureWorkloadReplicas(ctx context.Context, w startupWorkload, replicas int) error. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) ensureWorkloadReplicas(ctx context.Context, w startupWorkload, replicas int) error { _, err := o.kubectl( ctx, 45*time.Second, "-n", w.Namespace, "scale", w.Kind, w.Name, fmt.Sprintf("--replicas=%d", replicas), ) return err } // waitWorkloadReady runs one orchestration or CLI step. // Signature: (o *Orchestrator) waitWorkloadReady(ctx context.Context, w startupWorkload) error. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) waitWorkloadReady(ctx context.Context, w startupWorkload) error { if w.Namespace == "vault" && w.Kind == "statefulset" && w.Name == "vault" { return o.waitVaultReady(ctx, w) } timeout := "240s" if w.Kind == "statefulset" { timeout = "360s" } _, err := o.kubectl( ctx, 7*time.Minute, "-n", w.Namespace, "rollout", "status", fmt.Sprintf("%s/%s", w.Kind, w.Name), "--timeout="+timeout, ) if err != nil { return fmt.Errorf("wait ready %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, err) } return nil } // waitVaultReady runs one orchestration or CLI step. // Signature: (o *Orchestrator) waitVaultReady(ctx context.Context, w startupWorkload) error. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) waitVaultReady(ctx context.Context, w startupWorkload) error { if o.runner.DryRun { return nil } deadline := time.Now().Add(7 * time.Minute) var lastErr error for time.Now().Before(deadline) { ready, err := o.workloadReady(ctx, w) if err == nil && ready { return nil } if err != nil { lastErr = err } if err := o.ensureVaultUnsealed(ctx); err != nil { lastErr = err o.log.Printf("warning: vault readiness still blocked: %v", err) } ready, err = o.workloadReady(ctx, w) if err == nil && ready { return nil } if err != nil { lastErr = err } select { case <-ctx.Done(): return ctx.Err() case <-time.After(5 * time.Second): } } if lastErr != nil { return fmt.Errorf("wait ready %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, lastErr) } return fmt.Errorf("wait ready %s/%s/%s: timeout", w.Namespace, w.Kind, w.Name) } // ensureVaultUnsealedWhenRunnable runs one orchestration or CLI step. // Signature: (o *Orchestrator) ensureVaultUnsealedWhenRunnable(ctx context.Context) (bool, string, error). // Why: lets startup defer vault unseal until the pod is actually runnable, while // keeping the direct unseal helper strict for explicit recovery paths and tests. func (o *Orchestrator) ensureVaultUnsealedWhenRunnable(ctx context.Context) (bool, string, error) { if o.runner.DryRun { return false, "", nil } phase, err := o.kubectl(ctx, 15*time.Second, "-n", "vault", "get", "pod", "vault-0", "-o", "jsonpath={.status.phase}") if err != nil { if isNotFoundErr(err) { return true, "vault-0 pod is not present yet; deferring unseal until critical workload recovery", nil } return false, "", fmt.Errorf("vault pod phase check failed: %w", err) } trimmedPhase := strings.TrimSpace(phase) if trimmedPhase != "Running" { return true, fmt.Sprintf("vault-0 pod phase is %q; deferring unseal until critical workload recovery", trimmedPhase), nil } return false, "", o.ensureVaultUnsealed(ctx) } // ensureVaultUnsealed runs one orchestration or CLI step. // Signature: (o *Orchestrator) ensureVaultUnsealed(ctx context.Context) error. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) ensureVaultUnsealed(ctx context.Context) error { if o.runner.DryRun { return nil } phase, err := o.kubectl(ctx, 15*time.Second, "-n", "vault", "get", "pod", "vault-0", "-o", "jsonpath={.status.phase}") if err != nil { return fmt.Errorf("vault pod phase check failed: %w", err) } if strings.TrimSpace(phase) != "Running" { return fmt.Errorf("vault-0 pod phase is %q", strings.TrimSpace(phase)) } sealed, err := o.vaultSealed(ctx) if err != nil { return err } if !sealed { return nil } unsealKey, err := o.vaultUnsealKey(ctx) if err != nil { return err } for attempt := 1; attempt <= 5; attempt++ { o.log.Printf("vault is sealed; attempting auto-unseal (%d/5)", attempt) if _, err := o.runSensitive( ctx, 30*time.Second, "kubectl", "-n", "vault", "exec", "vault-0", "--", "vault", "operator", "unseal", unsealKey, ); err != nil { return fmt.Errorf("vault unseal attempt %d failed: %w", attempt, err) } sealed, err = o.vaultSealed(ctx) if err != nil { return err } if !sealed { o.log.Printf("vault auto-unseal succeeded") return nil } time.Sleep(2 * time.Second) } return fmt.Errorf("vault remained sealed after 5 auto-unseal attempts") } // vaultSealed runs one orchestration or CLI step. // Signature: (o *Orchestrator) vaultSealed(ctx context.Context) (bool, error). // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) vaultSealed(ctx context.Context) (bool, error) { out, err := o.kubectl( ctx, 25*time.Second, "-n", "vault", "exec", "vault-0", "--", "sh", "-lc", "VAULT_ADDR=http://127.0.0.1:8200 vault status -format=json 2>/dev/null || true", ) if err != nil { return false, fmt.Errorf("vault status check failed: %w", err) } sealed, err := parseVaultSealed(out) if err != nil { return false, fmt.Errorf("parse vault status: %w", err) } return sealed, nil } // parseVaultSealed runs one orchestration or CLI step. // Signature: parseVaultSealed(raw string) (bool, error). // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func parseVaultSealed(raw string) (bool, error) { trimmed := strings.TrimSpace(raw) if trimmed == "" { return false, fmt.Errorf("empty vault status output") } start := strings.Index(trimmed, "{") end := strings.LastIndex(trimmed, "}") if start < 0 || end < 0 || end < start { return false, fmt.Errorf("vault status payload missing JSON object") } payload := trimmed[start : end+1] type vaultStatus struct { Sealed bool `json:"sealed"` } var st vaultStatus if err := json.Unmarshal([]byte(payload), &st); err != nil { return false, err } return st.Sealed, nil } // vaultUnsealKey runs one orchestration or CLI step. // Signature: (o *Orchestrator) vaultUnsealKey(ctx context.Context) (string, error). // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) vaultUnsealKey(ctx context.Context) (string, error) { out, err := o.kubectl( ctx, 15*time.Second, "-n", "vault", "get", "secret", "vault-init", "-o", "jsonpath={.data.unseal_key_b64}", ) if err == nil { decoded, decodeErr := base64.StdEncoding.DecodeString(strings.TrimSpace(out)) if decodeErr == nil { key := strings.TrimSpace(string(decoded)) if key != "" { o.bestEffort("cache vault unseal key locally", func() error { return o.writeVaultUnsealKeyFile(key) }) return key, nil } err = fmt.Errorf("vault-init unseal key is empty") } else { err = fmt.Errorf("decode vault-init unseal_key_b64: %w", decodeErr) } } else { err = fmt.Errorf("read vault-init secret: %w", err) } fallbackKey, fileErr := o.readVaultUnsealKeyFile() if fileErr == nil { o.log.Printf("warning: using cached vault unseal key from %s", o.cfg.Startup.VaultUnsealKeyFile) return fallbackKey, nil } breakglassKey, breakglassErr := o.readVaultUnsealKeyBreakglass(ctx) if breakglassErr == nil { o.log.Printf("warning: using break-glass vault unseal key command fallback") o.bestEffort("cache vault unseal key locally", func() error { return o.writeVaultUnsealKeyFile(breakglassKey) }) return breakglassKey, nil } return "", fmt.Errorf("%v; fallback %v; break-glass %v", err, fileErr, breakglassErr) } // readVaultUnsealKeyBreakglass runs one orchestration or CLI step. // Signature: (o *Orchestrator) readVaultUnsealKeyBreakglass(ctx context.Context) (string, error). // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) readVaultUnsealKeyBreakglass(ctx context.Context) (string, error) { cmd := strings.TrimSpace(o.cfg.Startup.VaultUnsealBreakglassCommand) if cmd == "" { return "", fmt.Errorf("break-glass command not configured") } timeout := time.Duration(o.cfg.Startup.VaultUnsealBreakglassTimeout) * time.Second if timeout <= 0 { timeout = 15 * time.Second } out, err := o.runSensitive(ctx, timeout, "sh", "-lc", cmd) if err != nil { return "", fmt.Errorf("run break-glass command: %w", err) } key := strings.TrimSpace(out) if key == "" { return "", fmt.Errorf("break-glass command returned empty output") } return key, nil } // writeVaultUnsealKeyFile runs one orchestration or CLI step. // Signature: (o *Orchestrator) writeVaultUnsealKeyFile(key string) error. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) writeVaultUnsealKeyFile(key string) error { path := strings.TrimSpace(o.cfg.Startup.VaultUnsealKeyFile) if path == "" { return fmt.Errorf("vault unseal key file path is empty") } if err := os.MkdirAll(filepath.Dir(path), 0o700); err != nil { return fmt.Errorf("ensure vault unseal key dir: %w", err) } if err := os.WriteFile(path, []byte(strings.TrimSpace(key)+"\n"), 0o600); err != nil { return fmt.Errorf("write vault unseal key file: %w", err) } return nil } // readVaultUnsealKeyFile runs one orchestration or CLI step. // Signature: (o *Orchestrator) readVaultUnsealKeyFile() (string, error). // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) readVaultUnsealKeyFile() (string, error) { path := strings.TrimSpace(o.cfg.Startup.VaultUnsealKeyFile) if path == "" { return "", fmt.Errorf("vault unseal key file path is empty") } b, err := os.ReadFile(path) if err != nil { return "", fmt.Errorf("read vault unseal key file %s: %w", path, err) } key := strings.TrimSpace(string(b)) if key == "" { return "", fmt.Errorf("vault unseal key file %s is empty", path) } return key, nil } // workloadReady runs one orchestration or CLI step. // Signature: (o *Orchestrator) workloadReady(ctx context.Context, w startupWorkload) (bool, error). // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) workloadReady(ctx context.Context, w startupWorkload) (bool, error) { out, err := o.kubectl( ctx, 20*time.Second, "-n", w.Namespace, "get", w.Kind, w.Name, "-o", "jsonpath={.status.readyReplicas}", ) if err != nil { return false, err } raw := strings.TrimSpace(out) if raw == "" || raw == "" { return false, nil } n, err := strconv.Atoi(raw) if err != nil { return false, fmt.Errorf("parse readyReplicas %q: %w", raw, err) } return n >= 1, nil } // isNotFoundErr runs one orchestration or CLI step. // Signature: isNotFoundErr(err error) bool. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func isNotFoundErr(err error) bool { if err == nil { return false } msg := strings.ToLower(err.Error()) return strings.Contains(msg, "not found") || strings.Contains(msg, "(notfound)") }