ananke/internal/cluster/orchestrator_critical_vault.go

497 lines
16 KiB
Go

package cluster
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"time"
)
// bestEffort invokes fn and, when it fails, logs the error as a warning
// prefixed with name instead of returning it.
// Signature: (o *Orchestrator) bestEffort(name string, fn func() error).
// Why: lets callers run optional steps whose failure must not abort the
// surrounding workflow.
func (o *Orchestrator) bestEffort(name string, fn func() error) {
	err := fn()
	if err == nil {
		return
	}
	o.log.Printf("warning: %s: %v", name, err)
}
// missingCriticalStartupWorkloads lists every entry of criticalStartupWorkloads
// that is not currently ready.
// Signature: (o *Orchestrator) missingCriticalStartupWorkloads(ctx context.Context) ([]string, error).
// Why: gives callers a single report of which critical workloads still block startup.
//
// Each entry is formatted as "namespace/kind/name"; workloads whose readiness
// probe fails with a not-found error get a "(not found)" suffix. Any other
// probe error aborts the scan and is returned wrapped. The returned slice is
// non-nil even when nothing is missing.
func (o *Orchestrator) missingCriticalStartupWorkloads(ctx context.Context) ([]string, error) {
	notReady := []string{}
	for _, workload := range criticalStartupWorkloads {
		ready, err := o.workloadReady(ctx, workload)
		switch {
		case err != nil && isNotFoundErr(err):
			// The object does not exist at all: report it, but keep scanning.
			notReady = append(notReady, fmt.Sprintf("%s/%s/%s(not found)", workload.Namespace, workload.Kind, workload.Name))
		case err != nil:
			return nil, fmt.Errorf("check %s/%s/%s: %w", workload.Namespace, workload.Kind, workload.Name, err)
		case !ready:
			notReady = append(notReady, fmt.Sprintf("%s/%s/%s", workload.Namespace, workload.Kind, workload.Name))
		}
	}
	return notReady, nil
}
// ensureCriticalStartupWorkloads brings every entry of criticalStartupWorkloads
// up, one workload at a time: scale to one replica, clean up stale pods, then
// wait for readiness.
// Signature: (o *Orchestrator) ensureCriticalStartupWorkloads(ctx context.Context) error.
// Why: startup must not proceed until the critical workloads are running.
//
// A not-found error from any step is logged as a warning and the remaining
// steps for that workload are skipped. Any other error stops the whole run:
// scale and cleanup errors are returned wrapped, the readiness-wait error is
// returned as-is.
func (o *Orchestrator) ensureCriticalStartupWorkloads(ctx context.Context) error {
	for _, workload := range criticalStartupWorkloads {
		w := workload
		steps := []struct {
			run        func() error
			missingLog string
			wrap       func(error) error
		}{
			{
				run:        func() error { return o.ensureWorkloadReplicas(ctx, w, 1) },
				missingLog: "warning: startup workload missing, skipping scale: %s/%s/%s",
				wrap: func(err error) error {
					return fmt.Errorf("scale %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, err)
				},
			},
			{
				run:        func() error { return o.cleanupStaleCriticalWorkloadPods(ctx, w) },
				missingLog: "warning: startup workload missing during stale-pod cleanup: %s/%s/%s",
				wrap: func(err error) error {
					return fmt.Errorf("cleanup stale pods %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, err)
				},
			},
			{
				run:        func() error { return o.waitWorkloadReady(ctx, w) },
				missingLog: "warning: startup workload missing during readiness wait: %s/%s/%s",
				wrap:       func(err error) error { return err },
			},
		}
		for _, step := range steps {
			err := step.run()
			if err == nil {
				continue
			}
			if isNotFoundErr(err) {
				o.log.Printf(step.missingLog, w.Namespace, w.Kind, w.Name)
				// Abandon this workload and move on to the next one.
				break
			}
			return step.wrap(err)
		}
	}
	return nil
}
// cleanupStaleCriticalWorkloadPods force-deletes pods of a statefulset workload
// whose phase is Unknown or Failed.
// Signature: (o *Orchestrator) cleanupStaleCriticalWorkloadPods(ctx context.Context, w startupWorkload) error.
// Why: removes stale pods before the caller waits for the workload to become ready.
//
// It is a no-op in dry-run mode and for any kind other than "statefulset".
// A pod is only deleted when its first owner reference is a StatefulSet named
// w.Name and its own name starts with "<w.Name>-". The first failed listing or
// deletion is returned (deletion errors are wrapped).
func (o *Orchestrator) cleanupStaleCriticalWorkloadPods(ctx context.Context, w startupWorkload) error {
	if o.runner.DryRun || w.Kind != "statefulset" {
		return nil
	}

	const columns = "custom-columns=NAME:.metadata.name,PHASE:.status.phase,OWNER_KIND:.metadata.ownerReferences[0].kind,OWNER_NAME:.metadata.ownerReferences[0].name"
	listing, err := o.kubectl(ctx, 20*time.Second, "-n", w.Namespace, "get", "pods", "-o", columns, "--no-headers")
	if err != nil {
		return err
	}

	podPrefix := w.Name + "-"
	for _, row := range lines(listing) {
		cols := strings.Fields(row)
		if len(cols) < 4 {
			continue
		}
		pod, phase, ownerKind, ownerName := cols[0], strings.ToLower(cols[1]), cols[2], cols[3]

		ownedByWorkload := strings.EqualFold(ownerKind, "StatefulSet") && ownerName == w.Name
		if !ownedByWorkload || !strings.HasPrefix(pod, podPrefix) {
			continue
		}
		stale := phase == "unknown" || phase == "failed"
		if !stale {
			continue
		}

		o.log.Printf("warning: deleting stale critical pod %s/%s phase=%s before readiness wait", w.Namespace, pod, phase)
		// Force-delete without waiting so one stuck pod cannot stall startup.
		_, delErr := o.kubectl(ctx, 40*time.Second, "-n", w.Namespace, "delete", "pod", pod, "--grace-period=0", "--force", "--wait=false")
		if delErr != nil {
			return fmt.Errorf("delete stale pod %s/%s: %w", w.Namespace, pod, delErr)
		}
	}
	return nil
}
// ensureWorkloadReplicas runs `kubectl scale` to set the replica count of
// workload w in its namespace.
// Signature: (o *Orchestrator) ensureWorkloadReplicas(ctx context.Context, w startupWorkload, replicas int) error.
// Why: gives startup a single place that sets a workload's replica count.
//
// The kubectl error, if any, is returned unwrapped.
func (o *Orchestrator) ensureWorkloadReplicas(ctx context.Context, w startupWorkload, replicas int) error {
	replicaFlag := "--replicas=" + strconv.Itoa(replicas)
	if _, err := o.kubectl(ctx, 45*time.Second, "-n", w.Namespace, "scale", w.Kind, w.Name, replicaFlag); err != nil {
		return err
	}
	return nil
}
// waitWorkloadReady blocks until workload w has rolled out.
// Signature: (o *Orchestrator) waitWorkloadReady(ctx context.Context, w startupWorkload) error.
// Why: startup steps that depend on a workload need it to be ready first.
//
// The vault/statefulset/vault workload is delegated to waitVaultReady. Every
// other workload is checked with `kubectl rollout status`, using a 360s
// rollout timeout for statefulsets and 240s otherwise. A rollout failure is
// returned wrapped with the workload identity.
func (o *Orchestrator) waitWorkloadReady(ctx context.Context, w startupWorkload) error {
	isVault := w.Namespace == "vault" && w.Kind == "statefulset" && w.Name == "vault"
	if isVault {
		return o.waitVaultReady(ctx, w)
	}

	var rolloutTimeout string
	switch w.Kind {
	case "statefulset":
		rolloutTimeout = "360s"
	default:
		rolloutTimeout = "240s"
	}

	target := fmt.Sprintf("%s/%s", w.Kind, w.Name)
	if _, err := o.kubectl(ctx, 7*time.Minute, "-n", w.Namespace, "rollout", "status", target, "--timeout="+rolloutTimeout); err != nil {
		return fmt.Errorf("wait ready %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, err)
	}
	return nil
}
// waitVaultReady polls workload w for up to seven minutes, attempting a Vault
// unseal on every round, until the workload reports ready.
// Signature: (o *Orchestrator) waitVaultReady(ctx context.Context, w startupWorkload) error.
// Why: the Vault workload gets its own wait loop that retries the unseal
// between readiness checks.
//
// Returns nil immediately in dry-run mode. Each round probes readiness, calls
// ensureVaultUnsealed (failures are logged and remembered), probes again, then
// pauses five seconds. Cancelling ctx during the pause returns ctx.Err(). On
// deadline expiry the most recent error is returned wrapped, or a plain
// timeout error when none was recorded.
func (o *Orchestrator) waitVaultReady(ctx context.Context, w startupWorkload) error {
	if o.runner.DryRun {
		return nil
	}

	deadline := time.Now().Add(7 * time.Minute)
	var lastErr error

	// probe reports whether w is ready and records any probe failure.
	probe := func() bool {
		ready, probeErr := o.workloadReady(ctx, w)
		if probeErr != nil {
			lastErr = probeErr
			return false
		}
		return ready
	}

	for time.Now().Before(deadline) {
		if probe() {
			return nil
		}
		if unsealErr := o.ensureVaultUnsealed(ctx); unsealErr != nil {
			lastErr = unsealErr
			o.log.Printf("warning: vault readiness still blocked: %v", unsealErr)
		}
		// Probe again right away so a successful unseal is noticed without the pause.
		if probe() {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(5 * time.Second):
		}
	}

	if lastErr == nil {
		return fmt.Errorf("wait ready %s/%s/%s: timeout", w.Namespace, w.Kind, w.Name)
	}
	return fmt.Errorf("wait ready %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, lastErr)
}
// ensureVaultUnsealedWhenRunnable unseals Vault only when pod vault-0 is in the
// Running phase; otherwise it reports that the unseal was deferred.
// Signature: (o *Orchestrator) ensureVaultUnsealedWhenRunnable(ctx context.Context) (bool, string, error).
// Why: lets startup defer vault unseal until the pod is actually runnable, while
// keeping the direct unseal helper strict for explicit recovery paths and tests.
//
// Returns (deferred, reason, err):
//   - (false, "", nil) in dry-run mode;
//   - (true, reason, nil) when the pod is absent or not Running;
//   - (false, "", err) when the phase lookup fails for any other reason;
//   - (false, "", <result of ensureVaultUnsealed>) when the pod is Running.
func (o *Orchestrator) ensureVaultUnsealedWhenRunnable(ctx context.Context) (bool, string, error) {
	if o.runner.DryRun {
		return false, "", nil
	}

	rawPhase, err := o.kubectl(ctx, 15*time.Second, "-n", "vault", "get", "pod", "vault-0", "-o", "jsonpath={.status.phase}")
	switch {
	case err != nil && isNotFoundErr(err):
		return true, "vault-0 pod is not present yet; deferring unseal until critical workload recovery", nil
	case err != nil:
		return false, "", fmt.Errorf("vault pod phase check failed: %w", err)
	}

	if podPhase := strings.TrimSpace(rawPhase); podPhase != "Running" {
		return true, fmt.Sprintf("vault-0 pod phase is %q; deferring unseal until critical workload recovery", podPhase), nil
	}
	return false, "", o.ensureVaultUnsealed(ctx)
}
// ensureVaultUnsealed verifies that pod vault-0 is Running and, when Vault
// reports itself sealed, submits the unseal key up to five times until the
// seal is lifted.
// Signature: (o *Orchestrator) ensureVaultUnsealed(ctx context.Context) error.
// Why: strict unseal helper; unlike ensureVaultUnsealedWhenRunnable it treats a
// missing or non-Running pod as an error.
//
// Returns nil in dry-run mode, when Vault is already unsealed, or as soon as
// an attempt unseals it. Returns an error when the pod phase cannot be read or
// is not "Running", when the seal status or the unseal key cannot be obtained,
// when an unseal command fails, when ctx is cancelled while waiting between
// attempts (ctx.Err()), or when Vault is still sealed after the last attempt.
func (o *Orchestrator) ensureVaultUnsealed(ctx context.Context) error {
	const (
		maxAttempts  = 5
		retryBackoff = 2 * time.Second
	)

	if o.runner.DryRun {
		return nil
	}

	rawPhase, err := o.kubectl(ctx, 15*time.Second, "-n", "vault", "get", "pod", "vault-0", "-o", "jsonpath={.status.phase}")
	if err != nil {
		return fmt.Errorf("vault pod phase check failed: %w", err)
	}
	if phase := strings.TrimSpace(rawPhase); phase != "Running" {
		return fmt.Errorf("vault-0 pod phase is %q", phase)
	}

	sealed, err := o.vaultSealed(ctx)
	if err != nil {
		return err
	}
	if !sealed {
		return nil
	}

	unsealKey, err := o.vaultUnsealKey(ctx)
	if err != nil {
		return err
	}

	for attempt := 1; attempt <= maxAttempts; attempt++ {
		o.log.Printf("vault is sealed; attempting auto-unseal (%d/%d)", attempt, maxAttempts)
		// The key is passed via runSensitive rather than o.kubectl; presumably
		// that keeps the argument out of logs — confirm against runSensitive.
		if _, err := o.runSensitive(
			ctx,
			30*time.Second,
			"kubectl",
			"-n", "vault",
			"exec", "vault-0", "--",
			"vault", "operator", "unseal", unsealKey,
		); err != nil {
			return fmt.Errorf("vault unseal attempt %d failed: %w", attempt, err)
		}

		sealed, err = o.vaultSealed(ctx)
		if err != nil {
			return err
		}
		if !sealed {
			o.log.Printf("vault auto-unseal succeeded")
			return nil
		}

		// No point pausing after the final attempt; report the failure now.
		if attempt == maxAttempts {
			break
		}
		// Wait before retrying, but stop promptly if the caller cancels.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(retryBackoff):
		}
	}
	return fmt.Errorf("vault remained sealed after %d auto-unseal attempts", maxAttempts)
}
// vaultSealed reports whether Vault in pod vault-0 is sealed.
// Signature: (o *Orchestrator) vaultSealed(ctx context.Context) (bool, error).
// Why: the unseal logic needs the current seal state before and after each attempt.
//
// It execs `vault status -format=json` inside the pod against
// http://127.0.0.1:8200 and parses the output with parseVaultSealed. The shell
// snippet discards stderr and ends in `|| true`, so a non-zero exit from
// `vault status` does not by itself fail the exec. Exec and parse failures are
// returned wrapped.
func (o *Orchestrator) vaultSealed(ctx context.Context) (bool, error) {
	const statusScript = "VAULT_ADDR=http://127.0.0.1:8200 vault status -format=json 2>/dev/null || true"

	statusOut, execErr := o.kubectl(ctx, 25*time.Second, "-n", "vault", "exec", "vault-0", "--", "sh", "-lc", statusScript)
	if execErr != nil {
		return false, fmt.Errorf("vault status check failed: %w", execErr)
	}

	isSealed, parseErr := parseVaultSealed(statusOut)
	if parseErr != nil {
		return false, fmt.Errorf("parse vault status: %w", parseErr)
	}
	return isSealed, nil
}
// parseVaultSealed extracts the boolean "sealed" field from `vault status
// -format=json` output.
// Signature: parseVaultSealed(raw string) (bool, error).
// Why: the seal state decides whether an unseal is attempted, so an
// unparseable or incomplete payload must surface as an error rather than be
// mistaken for "unsealed".
//
// Any text before the first "{" and after the last "}" is ignored, so stray
// output around the JSON object does not break parsing. An error is returned
// when raw is blank, contains no JSON object, is not valid JSON, or has no
// (or a null) "sealed" field.
func parseVaultSealed(raw string) (bool, error) {
	trimmed := strings.TrimSpace(raw)
	if trimmed == "" {
		return false, fmt.Errorf("empty vault status output")
	}

	start := strings.Index(trimmed, "{")
	end := strings.LastIndex(trimmed, "}")
	if start < 0 || end < start {
		return false, fmt.Errorf("vault status payload missing JSON object")
	}

	// A pointer distinguishes "sealed": false from an absent field, which a
	// plain bool would silently decode as false (i.e. "unsealed").
	var status struct {
		Sealed *bool `json:"sealed"`
	}
	if err := json.Unmarshal([]byte(trimmed[start:end+1]), &status); err != nil {
		return false, err
	}
	if status.Sealed == nil {
		return false, fmt.Errorf("vault status payload missing sealed field")
	}
	return *status.Sealed, nil
}
// vaultUnsealKey returns the Vault unseal key, trying three sources in order.
// Signature: (o *Orchestrator) vaultUnsealKey(ctx context.Context) (string, error).
// Why: the unseal must still work when the cluster secret is unreadable.
//
// Sources, in order:
//  1. the base64-encoded unseal_key_b64 field of secret vault/vault-init;
//  2. the local file configured as Startup.VaultUnsealKeyFile;
//  3. the configured break-glass command.
//
// A key obtained from source 1 or 3 is cached to the local file on a
// best-effort basis. When every source fails, the returned error lists all
// three failures.
func (o *Orchestrator) vaultUnsealKey(ctx context.Context) (string, error) {
	// fromSecret reads and decodes the key stored in the vault-init secret.
	fromSecret := func() (string, error) {
		encoded, getErr := o.kubectl(
			ctx,
			15*time.Second,
			"-n", "vault",
			"get", "secret", "vault-init",
			"-o", "jsonpath={.data.unseal_key_b64}",
		)
		if getErr != nil {
			return "", fmt.Errorf("read vault-init secret: %w", getErr)
		}
		decoded, decodeErr := base64.StdEncoding.DecodeString(strings.TrimSpace(encoded))
		if decodeErr != nil {
			return "", fmt.Errorf("decode vault-init unseal_key_b64: %w", decodeErr)
		}
		secretKey := strings.TrimSpace(string(decoded))
		if secretKey == "" {
			return "", fmt.Errorf("vault-init unseal key is empty")
		}
		return secretKey, nil
	}

	key, secretErr := fromSecret()
	if secretErr == nil {
		o.bestEffort("cache vault unseal key locally", func() error { return o.writeVaultUnsealKeyFile(key) })
		return key, nil
	}

	cachedKey, fileErr := o.readVaultUnsealKeyFile()
	if fileErr == nil {
		o.log.Printf("warning: using cached vault unseal key from %s", o.cfg.Startup.VaultUnsealKeyFile)
		return cachedKey, nil
	}

	emergencyKey, breakglassErr := o.readVaultUnsealKeyBreakglass(ctx)
	if breakglassErr == nil {
		o.log.Printf("warning: using break-glass vault unseal key command fallback")
		o.bestEffort("cache vault unseal key locally", func() error { return o.writeVaultUnsealKeyFile(emergencyKey) })
		return emergencyKey, nil
	}

	return "", fmt.Errorf("%v; fallback %v; break-glass %v", secretErr, fileErr, breakglassErr)
}
// readVaultUnsealKeyBreakglass obtains the unseal key by running the
// configured break-glass shell command and returning its trimmed output.
// Signature: (o *Orchestrator) readVaultUnsealKeyBreakglass(ctx context.Context) (string, error).
// Why: last-resort key source, used by vaultUnsealKey after the other sources fail.
//
// The command (Startup.VaultUnsealBreakglassCommand) runs via `sh -lc` with a
// timeout of Startup.VaultUnsealBreakglassTimeout seconds, or 15 seconds when
// that value is not positive. Errors: command not configured, command
// failure (wrapped), or empty output.
func (o *Orchestrator) readVaultUnsealKeyBreakglass(ctx context.Context) (string, error) {
	command := strings.TrimSpace(o.cfg.Startup.VaultUnsealBreakglassCommand)
	if command == "" {
		return "", fmt.Errorf("break-glass command not configured")
	}

	limit := 15 * time.Second
	if configured := time.Duration(o.cfg.Startup.VaultUnsealBreakglassTimeout) * time.Second; configured > 0 {
		limit = configured
	}

	output, runErr := o.runSensitive(ctx, limit, "sh", "-lc", command)
	if runErr != nil {
		return "", fmt.Errorf("run break-glass command: %w", runErr)
	}

	if trimmed := strings.TrimSpace(output); trimmed != "" {
		return trimmed, nil
	}
	return "", fmt.Errorf("break-glass command returned empty output")
}
// writeVaultUnsealKeyFile stores key (trimmed, newline-terminated) in the file
// configured as Startup.VaultUnsealKeyFile.
// Signature: (o *Orchestrator) writeVaultUnsealKeyFile(key string) error.
// Why: keeps a local copy of the unseal key for readVaultUnsealKeyFile to fall
// back on.
//
// The parent directory is created with mode 0700 if needed. The key is written
// to a temporary file in that directory and then renamed over the target, for
// two reasons:
//   - os.CreateTemp creates the file with mode 0600, so the secret never
//     inherits looser permissions from a pre-existing target file (os.WriteFile
//     leaves an existing file's permissions unchanged);
//   - a failed write leaves the previously cached key intact instead of a
//     truncated file.
//
// Errors: empty configured path, or any directory/file operation failure
// (wrapped).
func (o *Orchestrator) writeVaultUnsealKeyFile(key string) error {
	path := strings.TrimSpace(o.cfg.Startup.VaultUnsealKeyFile)
	if path == "" {
		return fmt.Errorf("vault unseal key file path is empty")
	}

	dir := filepath.Dir(path)
	if err := os.MkdirAll(dir, 0o700); err != nil {
		return fmt.Errorf("ensure vault unseal key dir: %w", err)
	}

	// Same directory as the target so the final rename stays on one filesystem.
	tmp, err := os.CreateTemp(dir, ".vault-unseal-key-*.tmp")
	if err != nil {
		return fmt.Errorf("write vault unseal key file: %w", err)
	}
	tmpName := tmp.Name()
	renamed := false
	defer func() {
		if !renamed {
			// Best-effort cleanup; the original failure is what gets reported.
			_ = os.Remove(tmpName)
		}
	}()

	if _, err := tmp.WriteString(strings.TrimSpace(key) + "\n"); err != nil {
		_ = tmp.Close() // the write error is the one worth reporting
		return fmt.Errorf("write vault unseal key file: %w", err)
	}
	if err := tmp.Close(); err != nil {
		return fmt.Errorf("write vault unseal key file: %w", err)
	}
	if err := os.Rename(tmpName, path); err != nil {
		return fmt.Errorf("write vault unseal key file: %w", err)
	}
	renamed = true
	return nil
}
// readVaultUnsealKeyFile loads the unseal key from the file configured as
// Startup.VaultUnsealKeyFile and returns it with surrounding whitespace removed.
// Signature: (o *Orchestrator) readVaultUnsealKeyFile() (string, error).
// Why: provides the cached-key source that vaultUnsealKey tries second.
//
// Errors: empty configured path, unreadable file (wrapped), or a file that
// contains only whitespace.
func (o *Orchestrator) readVaultUnsealKeyFile() (string, error) {
	keyPath := strings.TrimSpace(o.cfg.Startup.VaultUnsealKeyFile)
	if keyPath == "" {
		return "", fmt.Errorf("vault unseal key file path is empty")
	}

	contents, readErr := os.ReadFile(keyPath)
	if readErr != nil {
		return "", fmt.Errorf("read vault unseal key file %s: %w", keyPath, readErr)
	}

	if cached := strings.TrimSpace(string(contents)); cached != "" {
		return cached, nil
	}
	return "", fmt.Errorf("vault unseal key file %s is empty", keyPath)
}
// workloadReady reports whether workload w has at least one ready replica.
// Signature: (o *Orchestrator) workloadReady(ctx context.Context, w startupWorkload) (bool, error).
// Why: shared readiness probe for the startup checks and wait loops.
//
// It reads .status.readyReplicas via `kubectl get`. An empty value or the
// literal "<no value>" counts as not ready. The kubectl error is returned
// unwrapped; a non-numeric value yields a wrapped parse error.
func (o *Orchestrator) workloadReady(ctx context.Context, w startupWorkload) (bool, error) {
	const readyReplicasPath = "jsonpath={.status.readyReplicas}"

	stdout, err := o.kubectl(ctx, 20*time.Second, "-n", w.Namespace, "get", w.Kind, w.Name, "-o", readyReplicasPath)
	if err != nil {
		return false, err
	}

	value := strings.TrimSpace(stdout)
	switch value {
	case "", "<no value>":
		// The field is unset: nothing is ready yet.
		return false, nil
	}

	readyCount, convErr := strconv.Atoi(value)
	if convErr != nil {
		return false, fmt.Errorf("parse readyReplicas %q: %w", value, convErr)
	}
	return readyCount >= 1, nil
}
// isNotFoundErr reports whether err's message indicates a missing resource.
// Signature: isNotFoundErr(err error) bool.
// Why: callers treat a missing resource differently from other command failures.
//
// The match is a case-insensitive substring test for "not found" or
// "(notfound)"; a nil error is never a match.
func isNotFoundErr(err error) bool {
	if err == nil {
		return false
	}
	lowered := strings.ToLower(err.Error())
	for _, marker := range []string{"not found", "(notfound)"} {
		if strings.Contains(lowered, marker) {
			return true
		}
	}
	return false
}