// Package cluster implements startup/shutdown orchestration for cluster workloads.
package cluster
|
|
|
|
import (
|
|
"context"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
// bestEffort runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) bestEffort(name string, fn func() error).
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) bestEffort(name string, fn func() error) {
|
|
if err := fn(); err != nil {
|
|
o.log.Printf("warning: %s: %v", name, err)
|
|
}
|
|
}
|
|
|
|
// missingCriticalStartupWorkloads runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) missingCriticalStartupWorkloads(ctx context.Context) ([]string, error).
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) missingCriticalStartupWorkloads(ctx context.Context) ([]string, error) {
|
|
missing := []string{}
|
|
for _, w := range criticalStartupWorkloads {
|
|
ready, err := o.workloadReady(ctx, w)
|
|
if err != nil {
|
|
if isNotFoundErr(err) {
|
|
missing = append(missing, fmt.Sprintf("%s/%s/%s(not found)", w.Namespace, w.Kind, w.Name))
|
|
continue
|
|
}
|
|
return nil, fmt.Errorf("check %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, err)
|
|
}
|
|
if !ready {
|
|
missing = append(missing, fmt.Sprintf("%s/%s/%s", w.Namespace, w.Kind, w.Name))
|
|
}
|
|
}
|
|
return missing, nil
|
|
}
|
|
|
|
// ensureCriticalStartupWorkloads runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) ensureCriticalStartupWorkloads(ctx context.Context) error.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) ensureCriticalStartupWorkloads(ctx context.Context) error {
|
|
for _, w := range criticalStartupWorkloads {
|
|
if err := o.ensureWorkloadReplicas(ctx, w, 1); err != nil {
|
|
if isNotFoundErr(err) {
|
|
o.log.Printf("warning: startup workload missing, skipping scale: %s/%s/%s", w.Namespace, w.Kind, w.Name)
|
|
continue
|
|
}
|
|
return fmt.Errorf("scale %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, err)
|
|
}
|
|
if err := o.cleanupStaleCriticalWorkloadPods(ctx, w); err != nil {
|
|
if isNotFoundErr(err) {
|
|
o.log.Printf("warning: startup workload missing during stale-pod cleanup: %s/%s/%s", w.Namespace, w.Kind, w.Name)
|
|
continue
|
|
}
|
|
return fmt.Errorf("cleanup stale pods %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, err)
|
|
}
|
|
if err := o.waitWorkloadReady(ctx, w); err != nil {
|
|
if isNotFoundErr(err) {
|
|
o.log.Printf("warning: startup workload missing during readiness wait: %s/%s/%s", w.Namespace, w.Kind, w.Name)
|
|
continue
|
|
}
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// cleanupStaleCriticalWorkloadPods runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) cleanupStaleCriticalWorkloadPods(ctx context.Context, w startupWorkload) error.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) cleanupStaleCriticalWorkloadPods(ctx context.Context, w startupWorkload) error {
|
|
if o.runner.DryRun {
|
|
return nil
|
|
}
|
|
if w.Kind != "statefulset" {
|
|
return nil
|
|
}
|
|
|
|
out, err := o.kubectl(
|
|
ctx,
|
|
20*time.Second,
|
|
"-n",
|
|
w.Namespace,
|
|
"get",
|
|
"pods",
|
|
"-o",
|
|
"custom-columns=NAME:.metadata.name,PHASE:.status.phase,OWNER_KIND:.metadata.ownerReferences[0].kind,OWNER_NAME:.metadata.ownerReferences[0].name",
|
|
"--no-headers",
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
prefix := w.Name + "-"
|
|
for _, line := range lines(out) {
|
|
fields := strings.Fields(line)
|
|
if len(fields) < 4 {
|
|
continue
|
|
}
|
|
podName := fields[0]
|
|
phase := strings.ToLower(strings.TrimSpace(fields[1]))
|
|
ownerKind := strings.TrimSpace(fields[2])
|
|
ownerName := strings.TrimSpace(fields[3])
|
|
if !strings.EqualFold(ownerKind, "StatefulSet") || ownerName != w.Name {
|
|
continue
|
|
}
|
|
if !strings.HasPrefix(podName, prefix) {
|
|
continue
|
|
}
|
|
if phase != "unknown" && phase != "failed" {
|
|
continue
|
|
}
|
|
|
|
o.log.Printf("warning: deleting stale critical pod %s/%s phase=%s before readiness wait", w.Namespace, podName, phase)
|
|
if _, delErr := o.kubectl(
|
|
ctx,
|
|
40*time.Second,
|
|
"-n",
|
|
w.Namespace,
|
|
"delete",
|
|
"pod",
|
|
podName,
|
|
"--grace-period=0",
|
|
"--force",
|
|
"--wait=false",
|
|
); delErr != nil {
|
|
return fmt.Errorf("delete stale pod %s/%s: %w", w.Namespace, podName, delErr)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ensureWorkloadReplicas runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) ensureWorkloadReplicas(ctx context.Context, w startupWorkload, replicas int) error.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) ensureWorkloadReplicas(ctx context.Context, w startupWorkload, replicas int) error {
|
|
_, err := o.kubectl(
|
|
ctx,
|
|
45*time.Second,
|
|
"-n",
|
|
w.Namespace,
|
|
"scale",
|
|
w.Kind,
|
|
w.Name,
|
|
fmt.Sprintf("--replicas=%d", replicas),
|
|
)
|
|
return err
|
|
}
|
|
|
|
// waitWorkloadReady runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) waitWorkloadReady(ctx context.Context, w startupWorkload) error.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) waitWorkloadReady(ctx context.Context, w startupWorkload) error {
|
|
if w.Namespace == "vault" && w.Kind == "statefulset" && w.Name == "vault" {
|
|
return o.waitVaultReady(ctx, w)
|
|
}
|
|
|
|
timeout := "240s"
|
|
if w.Kind == "statefulset" {
|
|
timeout = "360s"
|
|
}
|
|
_, err := o.kubectl(
|
|
ctx,
|
|
7*time.Minute,
|
|
"-n",
|
|
w.Namespace,
|
|
"rollout",
|
|
"status",
|
|
fmt.Sprintf("%s/%s", w.Kind, w.Name),
|
|
"--timeout="+timeout,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("wait ready %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// waitVaultReady runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) waitVaultReady(ctx context.Context, w startupWorkload) error.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) waitVaultReady(ctx context.Context, w startupWorkload) error {
|
|
if o.runner.DryRun {
|
|
return nil
|
|
}
|
|
|
|
deadline := time.Now().Add(7 * time.Minute)
|
|
var lastErr error
|
|
for time.Now().Before(deadline) {
|
|
ready, err := o.workloadReady(ctx, w)
|
|
if err == nil && ready {
|
|
return nil
|
|
}
|
|
if err != nil {
|
|
lastErr = err
|
|
}
|
|
|
|
if err := o.ensureVaultUnsealed(ctx); err != nil {
|
|
lastErr = err
|
|
o.log.Printf("warning: vault readiness still blocked: %v", err)
|
|
}
|
|
|
|
ready, err = o.workloadReady(ctx, w)
|
|
if err == nil && ready {
|
|
return nil
|
|
}
|
|
if err != nil {
|
|
lastErr = err
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-time.After(5 * time.Second):
|
|
}
|
|
}
|
|
|
|
if lastErr != nil {
|
|
return fmt.Errorf("wait ready %s/%s/%s: %w", w.Namespace, w.Kind, w.Name, lastErr)
|
|
}
|
|
return fmt.Errorf("wait ready %s/%s/%s: timeout", w.Namespace, w.Kind, w.Name)
|
|
}
|
|
|
|
// ensureVaultUnsealed runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) ensureVaultUnsealed(ctx context.Context) error.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) ensureVaultUnsealed(ctx context.Context) error {
|
|
if o.runner.DryRun {
|
|
return nil
|
|
}
|
|
|
|
phase, err := o.kubectl(ctx, 15*time.Second, "-n", "vault", "get", "pod", "vault-0", "-o", "jsonpath={.status.phase}")
|
|
if err != nil {
|
|
return fmt.Errorf("vault pod phase check failed: %w", err)
|
|
}
|
|
if strings.TrimSpace(phase) != "Running" {
|
|
return fmt.Errorf("vault-0 pod phase is %q", strings.TrimSpace(phase))
|
|
}
|
|
|
|
sealed, err := o.vaultSealed(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !sealed {
|
|
return nil
|
|
}
|
|
|
|
unsealKey, err := o.vaultUnsealKey(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for attempt := 1; attempt <= 5; attempt++ {
|
|
o.log.Printf("vault is sealed; attempting auto-unseal (%d/5)", attempt)
|
|
if _, err := o.runSensitive(
|
|
ctx,
|
|
30*time.Second,
|
|
"kubectl",
|
|
"-n", "vault",
|
|
"exec", "vault-0", "--",
|
|
"vault", "operator", "unseal", unsealKey,
|
|
); err != nil {
|
|
return fmt.Errorf("vault unseal attempt %d failed: %w", attempt, err)
|
|
}
|
|
|
|
sealed, err = o.vaultSealed(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !sealed {
|
|
o.log.Printf("vault auto-unseal succeeded")
|
|
return nil
|
|
}
|
|
time.Sleep(2 * time.Second)
|
|
}
|
|
|
|
return fmt.Errorf("vault remained sealed after 5 auto-unseal attempts")
|
|
}
|
|
|
|
// vaultSealed runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) vaultSealed(ctx context.Context) (bool, error).
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) vaultSealed(ctx context.Context) (bool, error) {
|
|
out, err := o.kubectl(
|
|
ctx,
|
|
25*time.Second,
|
|
"-n", "vault",
|
|
"exec", "vault-0", "--",
|
|
"sh", "-lc",
|
|
"VAULT_ADDR=http://127.0.0.1:8200 vault status -format=json 2>/dev/null || true",
|
|
)
|
|
if err != nil {
|
|
return false, fmt.Errorf("vault status check failed: %w", err)
|
|
}
|
|
sealed, err := parseVaultSealed(out)
|
|
if err != nil {
|
|
return false, fmt.Errorf("parse vault status: %w", err)
|
|
}
|
|
return sealed, nil
|
|
}
|
|
|
|
// parseVaultSealed extracts the "sealed" flag from raw `vault status
// -format=json` output, tolerating log noise before and after the JSON
// object by slicing from the first '{' to the last '}'.
func parseVaultSealed(raw string) (bool, error) {
	text := strings.TrimSpace(raw)
	if text == "" {
		return false, fmt.Errorf("empty vault status output")
	}

	first := strings.Index(text, "{")
	last := strings.LastIndex(text, "}")
	if first < 0 || last < first {
		return false, fmt.Errorf("vault status payload missing JSON object")
	}

	var status struct {
		Sealed bool `json:"sealed"`
	}
	if err := json.Unmarshal([]byte(text[first:last+1]), &status); err != nil {
		return false, err
	}
	return status.Sealed, nil
}
|
|
|
|
// vaultUnsealKey runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) vaultUnsealKey(ctx context.Context) (string, error).
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) vaultUnsealKey(ctx context.Context) (string, error) {
|
|
out, err := o.kubectl(
|
|
ctx,
|
|
15*time.Second,
|
|
"-n", "vault",
|
|
"get", "secret", "vault-init",
|
|
"-o", "jsonpath={.data.unseal_key_b64}",
|
|
)
|
|
if err == nil {
|
|
decoded, decodeErr := base64.StdEncoding.DecodeString(strings.TrimSpace(out))
|
|
if decodeErr == nil {
|
|
key := strings.TrimSpace(string(decoded))
|
|
if key != "" {
|
|
o.bestEffort("cache vault unseal key locally", func() error { return o.writeVaultUnsealKeyFile(key) })
|
|
return key, nil
|
|
}
|
|
err = fmt.Errorf("vault-init unseal key is empty")
|
|
} else {
|
|
err = fmt.Errorf("decode vault-init unseal_key_b64: %w", decodeErr)
|
|
}
|
|
} else {
|
|
err = fmt.Errorf("read vault-init secret: %w", err)
|
|
}
|
|
|
|
fallbackKey, fileErr := o.readVaultUnsealKeyFile()
|
|
if fileErr == nil {
|
|
o.log.Printf("warning: using cached vault unseal key from %s", o.cfg.Startup.VaultUnsealKeyFile)
|
|
return fallbackKey, nil
|
|
}
|
|
breakglassKey, breakglassErr := o.readVaultUnsealKeyBreakglass(ctx)
|
|
if breakglassErr == nil {
|
|
o.log.Printf("warning: using break-glass vault unseal key command fallback")
|
|
o.bestEffort("cache vault unseal key locally", func() error { return o.writeVaultUnsealKeyFile(breakglassKey) })
|
|
return breakglassKey, nil
|
|
}
|
|
return "", fmt.Errorf("%v; fallback %v; break-glass %v", err, fileErr, breakglassErr)
|
|
}
|
|
|
|
// readVaultUnsealKeyBreakglass runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) readVaultUnsealKeyBreakglass(ctx context.Context) (string, error).
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) readVaultUnsealKeyBreakglass(ctx context.Context) (string, error) {
|
|
cmd := strings.TrimSpace(o.cfg.Startup.VaultUnsealBreakglassCommand)
|
|
if cmd == "" {
|
|
return "", fmt.Errorf("break-glass command not configured")
|
|
}
|
|
timeout := time.Duration(o.cfg.Startup.VaultUnsealBreakglassTimeout) * time.Second
|
|
if timeout <= 0 {
|
|
timeout = 15 * time.Second
|
|
}
|
|
out, err := o.runSensitive(ctx, timeout, "sh", "-lc", cmd)
|
|
if err != nil {
|
|
return "", fmt.Errorf("run break-glass command: %w", err)
|
|
}
|
|
key := strings.TrimSpace(out)
|
|
if key == "" {
|
|
return "", fmt.Errorf("break-glass command returned empty output")
|
|
}
|
|
return key, nil
|
|
}
|
|
|
|
// writeVaultUnsealKeyFile runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) writeVaultUnsealKeyFile(key string) error.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) writeVaultUnsealKeyFile(key string) error {
|
|
path := strings.TrimSpace(o.cfg.Startup.VaultUnsealKeyFile)
|
|
if path == "" {
|
|
return fmt.Errorf("vault unseal key file path is empty")
|
|
}
|
|
if err := os.MkdirAll(filepath.Dir(path), 0o700); err != nil {
|
|
return fmt.Errorf("ensure vault unseal key dir: %w", err)
|
|
}
|
|
if err := os.WriteFile(path, []byte(strings.TrimSpace(key)+"\n"), 0o600); err != nil {
|
|
return fmt.Errorf("write vault unseal key file: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// readVaultUnsealKeyFile runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) readVaultUnsealKeyFile() (string, error).
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) readVaultUnsealKeyFile() (string, error) {
|
|
path := strings.TrimSpace(o.cfg.Startup.VaultUnsealKeyFile)
|
|
if path == "" {
|
|
return "", fmt.Errorf("vault unseal key file path is empty")
|
|
}
|
|
b, err := os.ReadFile(path)
|
|
if err != nil {
|
|
return "", fmt.Errorf("read vault unseal key file %s: %w", path, err)
|
|
}
|
|
key := strings.TrimSpace(string(b))
|
|
if key == "" {
|
|
return "", fmt.Errorf("vault unseal key file %s is empty", path)
|
|
}
|
|
return key, nil
|
|
}
|
|
|
|
// workloadReady runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) workloadReady(ctx context.Context, w startupWorkload) (bool, error).
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) workloadReady(ctx context.Context, w startupWorkload) (bool, error) {
|
|
out, err := o.kubectl(
|
|
ctx,
|
|
20*time.Second,
|
|
"-n",
|
|
w.Namespace,
|
|
"get",
|
|
w.Kind,
|
|
w.Name,
|
|
"-o",
|
|
"jsonpath={.status.readyReplicas}",
|
|
)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
raw := strings.TrimSpace(out)
|
|
if raw == "" || raw == "<no value>" {
|
|
return false, nil
|
|
}
|
|
n, err := strconv.Atoi(raw)
|
|
if err != nil {
|
|
return false, fmt.Errorf("parse readyReplicas %q: %w", raw, err)
|
|
}
|
|
return n >= 1, nil
|
|
}
|
|
|
|
// isNotFoundErr reports whether err looks like a kubectl "not found" failure,
// matching both plain "not found" text and the "(NotFound)" server reason,
// case-insensitively. A nil error is never a not-found error.
func isNotFoundErr(err error) bool {
	if err == nil {
		return false
	}
	lowered := strings.ToLower(err.Error())
	for _, marker := range []string{"not found", "(notfound)"} {
		if strings.Contains(lowered, marker) {
			return true
		}
	}
	return false
}
|