diff --git a/testing/orchestrator/hooks_gap_matrix_part10_test.go b/testing/orchestrator/hooks_gap_matrix_part10_test.go
new file mode 100644
index 0000000..6d4be20
--- /dev/null
+++ b/testing/orchestrator/hooks_gap_matrix_part10_test.go
@@ -0,0 +1,560 @@
+package orchestrator
+
+import (
+	"context"
+	"encoding/base64"
+	"errors"
+	"io"
+	"log"
+	"net"
+	"os"
+	"path/filepath"
+	"strings"
+	"testing"
+	"time"
+
+	"scm.bstein.dev/bstein/ananke/internal/cluster"
+	"scm.bstein.dev/bstein/ananke/internal/config"
+	"scm.bstein.dev/bstein/ananke/internal/execx"
+	"scm.bstein.dev/bstein/ananke/internal/state"
+)
+
+// newHookOrchestratorAdvanced builds a hook-level test orchestrator with
+// explicit dry-run and command-override control.
+// Signature: newHookOrchestratorAdvanced(t *testing.T, cfg config.Config, dryRun bool, run commandOverride, runSensitive commandOverride) (*cluster.Orchestrator, *commandRecorder).
+// Why: this part10 matrix needs dry-run and non-dry-run variants while keeping
+// command dispatch deterministic from the top-level testing module.
+func newHookOrchestratorAdvanced(
+	t *testing.T,
+	cfg config.Config,
+	dryRun bool,
+	run func(context.Context, time.Duration, string, ...string) (string, error),
+	runSensitive func(context.Context, time.Duration, string, ...string) (string, error),
+) (*cluster.Orchestrator, *commandRecorder) {
+	t.Helper()
+	if err := os.MkdirAll(cfg.State.Dir, 0o755); err != nil {
+		t.Fatalf("ensure state dir: %v", err)
+	}
+	recorder := &commandRecorder{}
+	if run == nil || runSensitive == nil {
+		base := lifecycleDispatcher(recorder)
+		if run == nil {
+			run = base
+		}
+		if runSensitive == nil {
+			runSensitive = base
+		}
+	}
+	orch := cluster.New(cfg, &execx.Runner{DryRun: dryRun}, state.New(cfg.State.RunHistoryPath), log.New(io.Discard, "", 0))
+	orch.SetCommandOverrides(run, runSensitive)
+	return orch, recorder
+}
+
+// TestHookGapMatrixPart10LowFileClosure exercises targeted hook-level
+// scenarios against low-coverage orchestrator files.
+// Signature: TestHookGapMatrixPart10LowFileClosure(t *testing.T).
+// Why: closes remaining branch gaps on low-coverage orchestrator files using
+// targeted hook-level scenarios instead of brittle full-drill reruns.
+func TestHookGapMatrixPart10LowFileClosure(t *testing.T) {
+	t.Run("critical-vault-low-branches", func(t *testing.T) {
+		t.Run("vault-sealed-parse-error", func(t *testing.T) {
+			cfg := lifecycleConfig(t)
+			run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				if name == "kubectl" && strings.Contains(command, "vault status -format=json") {
+					return "{invalid-json", nil
+				}
+				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+			}
+			orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
+			if _, err := orch.TestHookVaultSealed(context.Background()); err == nil || !strings.Contains(err.Error(), "parse vault status") {
+				t.Fatalf("expected vault status parse error branch, got %v", err)
+			}
+		})
+
+		t.Run("vault-unseal-key-empty-decoded", func(t *testing.T) {
+			cfg := lifecycleConfig(t)
+			cfg.Startup.VaultUnsealBreakglassCommand = ""
+			encodedEmpty := base64.StdEncoding.EncodeToString([]byte(" "))
+			run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				if name == "kubectl" && strings.Contains(command, "get secret vault-init") {
+					return encodedEmpty, nil
+				}
+				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
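+				// Everything not matched above falls through to lifecycleDispatcher,
+				// the package's shared deterministic command stub, so only the
+				// vault-init secret read is special-cased in this scenario.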
+			}
+			orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
+			if _, err := orch.TestHookVaultUnsealKey(context.Background()); err == nil || !strings.Contains(err.Error(), "vault-init unseal key is empty") {
+				t.Fatalf("expected empty decoded unseal key branch, got %v", err)
+			}
+		})
+
+		t.Run("write-unseal-key-file-write-error", func(t *testing.T) {
+			cfg := lifecycleConfig(t)
+			cfg.Startup.VaultUnsealKeyFile = t.TempDir()
+			orch, _ := newHookOrchestratorAdvanced(t, cfg, false, nil, nil)
+			if err := orch.TestHookWriteVaultUnsealKeyFile("vault-key"); err == nil || !strings.Contains(err.Error(), "write vault unseal key file") {
+				t.Fatalf("expected write failure branch when key path is a directory, got %v", err)
+			}
+		})
+
+		t.Run("workload-ready-no-value-and-ensure-error", func(t *testing.T) {
+			cfg := lifecycleConfig(t)
+			runNoValue := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				if name == "kubectl" && strings.Contains(command, "jsonpath={.status.readyReplicas}") {
+					return "", nil
+				}
+				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+			}
+			orchNoValue, _ := newHookOrchestratorAdvanced(t, cfg, false, runNoValue, runNoValue)
+			ready, err := orchNoValue.TestHookWorkloadReady(context.Background(), "vault", "statefulset", "vault")
+			if err != nil || ready {
+				t.Fatalf("expected no-value readiness branch, ready=%v err=%v", ready, err)
+			}
+
+			runEnsureErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				switch {
+				case name == "kubectl" && strings.Contains(command, " scale "):
+					return "", nil
+				case name == "kubectl" && strings.Contains(command, "get pods -o custom-columns"):
+					return "", nil
+				case name == "kubectl" && strings.Contains(command, "rollout status statefulset/victoria-metrics-single-server"):
+					return "", errors.New("rollout failed")
+				default:
+					return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+				}
+			}
+			orchEnsureErr, _ := newHookOrchestratorAdvanced(t, cfg, false, runEnsureErr, runEnsureErr)
+			if err := orchEnsureErr.TestHookEnsureCriticalStartupWorkloads(context.Background()); err == nil || !strings.Contains(err.Error(), "rollout failed") {
+				t.Fatalf("expected ensureCriticalStartupWorkloads wait error branch, got %v", err)
+			}
+		})
+
+		t.Run("ensure-vault-unsealed-phase-and-followup-errors", func(t *testing.T) {
+			cfgPhase := lifecycleConfig(t)
+			runPhase := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				if name == "kubectl" && strings.Contains(command, "get pod vault-0") {
+					return "Pending", nil
+				}
+				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+			}
+			orchPhase, _ := newHookOrchestratorAdvanced(t, cfgPhase, false, runPhase, runPhase)
+			if err := orchPhase.TestHookEnsureVaultUnsealed(context.Background()); err == nil || !strings.Contains(err.Error(), "pod phase") {
+				t.Fatalf("expected pod phase guard branch, got %v", err)
+			}
+
+			cfgFollowup := lifecycleConfig(t)
+			sealedCalls := 0
+			runFollowup := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				switch {
+				case name == "kubectl" && strings.Contains(command, "get pod vault-0"):
+					return "Running", nil
+				case name == "kubectl" && strings.Contains(command, "get secret vault-init"):
+					return base64.StdEncoding.EncodeToString([]byte("vault-key")), nil
+				case name == "kubectl" && strings.Contains(command, "vault status -format=json"):
+					sealedCalls++
+					if sealedCalls == 1 {
+						return `{"sealed":true}`, nil
+					}
+					return "", errors.New("vault status failed")
+				default:
+					return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+				}
+			}
+			runSensitive := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				if name == "kubectl" && strings.Contains(command, "vault operator unseal") {
+					return "ok", nil
+				}
+				return runFollowup(ctx, timeout, name, args...)
+			}
+			orchFollowup, _ := newHookOrchestratorAdvanced(t, cfgFollowup, false, runFollowup, runSensitive)
+			if err := orchFollowup.TestHookEnsureVaultUnsealed(context.Background()); err == nil || !strings.Contains(err.Error(), "vault status check failed") {
+				t.Fatalf("expected follow-up sealed status error branch, got %v", err)
+			}
+		})
+	})
+
+	t.Run("drain-and-scaling-low-branches", func(t *testing.T) {
+		t.Run("drain-workers-error-aggregation-and-diagnostics", func(t *testing.T) {
+			cfg := lifecycleConfig(t)
+			cfg.Shutdown.DrainParallelism = 0
+			workers := []string{"w1", "w2", "w3", "w4", "w5"}
+			run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				switch {
+				case name == "kubectl" && strings.Contains(command, "cordon "):
+					return "", errors.New("cordon denied")
+				case name == "kubectl" && strings.Contains(command, "drain "):
+					return "", errors.New("drain denied")
+				case name == "kubectl" && strings.Contains(command, "get pods -A --field-selector"):
+					return strings.Join([]string{
+						"malformed",
+						"ns p1 Running Deployment",
+						"ns p2 Running Deployment",
+						"ns p3 Running Deployment",
+						"ns p4 Running Deployment",
+						"ns p5 Running Deployment",
+						"ns p6 Running Deployment",
+						"ns p7 Running Deployment",
+					}, "\n"), nil
+				default:
+					return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
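+					// The leading "malformed" row presumably exercises the diagnostics
+					// parser's skip branch; the seven well-formed rows give the
+					// aggregated drain error leftover-pod context to report.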
+				}
+			}
+			orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
+			err := orch.TestHookDrainWorkers(context.Background(), workers)
+			if err == nil || !strings.Contains(err.Error(), "drain workers had 5 errors") {
+				t.Fatalf("expected drain aggregation branch, got %v", err)
+			}
+		})
+
+		t.Run("run-ssh-across-nodes-clamp-and-skip", func(t *testing.T) {
+			cfg := lifecycleConfig(t)
+			cfg.Shutdown.SSHParallelism = 99
+			cfg.SSHManagedNodes = []string{"titan-db"}
+			rec := &commandRecorder{}
+			base := lifecycleDispatcher(rec)
+			orch, _ := newHookOrchestratorAdvanced(t, cfg, false, base, base)
+			orch.TestHookRunSSHAcrossNodes(context.Background(), []string{"titan-db", "not-managed"}, "noop", "echo ok")
+			if !rec.contains("atlas@titan-db echo ok") {
+				t.Fatalf("expected managed ssh execution branch")
+			}
+		})
+
+		t.Run("latest-etcd-snapshot-empty-list", func(t *testing.T) {
+			cfg := lifecycleConfig(t)
+			run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				if name == "ssh" && strings.Contains(command, "etcd-snapshot ls") {
+					return "NAME LOCATION", nil
+				}
+				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+			}
+			orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
+			if _, err := orch.TestHookLatestEtcdSnapshotPath(context.Background(), "titan-db"); err == nil || !strings.Contains(err.Error(), "no etcd snapshots found") {
+				t.Fatalf("expected empty snapshot-list branch, got %v", err)
+			}
+		})
+
+		t.Run("scaling-fallback-and-snapshot-writer-errors", func(t *testing.T) {
+			cfg := lifecycleConfig(t)
+			cfg.Workers = nil
+			cfg.SSHManagedNodes = []string{"titan-db", "titan-23", " "}
+			cfg.ControlPlanes = []string{"titan-db"}
+			runWorkers := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				if name == "kubectl" && len(args) >= 2 && args[0] == "get" && args[1] == "nodes" {
+					return "", errors.New("api down")
+				}
+				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+			}
+			orchWorkers, _ := newHookOrchestratorAdvanced(t, cfg, false, runWorkers, runWorkers)
+			workers, err := orchWorkers.TestHookEffectiveWorkers(context.Background())
+			if err != nil || len(workers) == 0 {
+				t.Fatalf("expected inventory worker fallback branch, workers=%v err=%v", workers, err)
+			}
+
+			cfgWrite := lifecycleConfig(t)
+			cfgWrite.State.Dir = t.TempDir()
+			// Force write failure by making the snapshot output path a directory.
+			if err := os.MkdirAll(filepath.Join(cfgWrite.State.Dir, "scaled-workloads.json"), 0o755); err != nil {
+				t.Fatalf("create snapshot blocker directory: %v", err)
+			}
+			runWrite := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				switch {
+				case name == "kubectl" && strings.Contains(command, "get deployment -A -o jsonpath="):
+					return "monitoring\tgrafana\t1\n", nil
+				case name == "kubectl" && strings.Contains(command, "get statefulset -A -o jsonpath="):
+					return "", nil
+				default:
+					return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+				}
+			}
+			orchWrite, _ := newHookOrchestratorAdvanced(t, cfgWrite, false, runWrite, runWrite)
+			if err := orchWrite.TestHookScaleDownApps(context.Background()); err == nil || !strings.Contains(err.Error(), "write scaled workload snapshot") {
+				t.Fatalf("expected scaled snapshot write-failure branch, got %v", err)
+			}
+		})
+	})
+
+	t.Run("flux-ingress-service-and-inventory-low-branches", func(t *testing.T) {
+		t.Run("flux-health-ready-reason-fallback-and-heal-query-error", func(t *testing.T) {
+			cfg := lifecycleConfig(t)
+			runReady := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				switch {
+				case name == "kubectl" && strings.Contains(command, "get kustomizations.kustomize.toolkit.fluxcd.io -A -o json"):
+					return `{"items":[{"metadata":{"namespace":"","name":"skip"},"spec":{"suspend":false},"status":{"conditions":[]}},{"metadata":{"namespace":"flux-system","name":"apps"},"spec":{"suspend":false},"status":{"conditions":[{"type":"Ready","status":"False","reason":"","message":""}]}}]}`, nil
+				case name == "kubectl" && strings.Contains(command, "get jobs -A -o json"):
+					return "", errors.New("jobs query failed")
+				default:
+					return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+				}
+			}
+			orchReady, _ := newHookOrchestratorAdvanced(t, cfg, false, runReady, runReady)
+			ready, detail, err := orchReady.TestHookFluxHealthReady(context.Background())
+			if err != nil || ready || !strings.Contains(detail, "ready=false") {
+				t.Fatalf("expected flux ready-reason fallback branch, ready=%v detail=%q err=%v", ready, detail, err)
+			}
+			if _, err := orchReady.TestHookHealImmutableFluxJobs(context.Background()); err == nil || !strings.Contains(err.Error(), "query jobs") {
+				t.Fatalf("expected immutable-job query error branch, got %v", err)
+			}
+		})
+
+		t.Run("wait-for-flux-health-immutable-heal-path-with-cancel", func(t *testing.T) {
+			cfg := lifecycleConfig(t)
+			cfg.Startup.FluxHealthWaitSeconds = 1
+			cfg.Startup.FluxHealthPollSeconds = 1
+			run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				switch {
+				case name == "kubectl" && strings.Contains(command, "get kustomizations.kustomize.toolkit.fluxcd.io -A -o json"):
+					return `{"items":[{"metadata":{"namespace":"flux-system","name":"apps"},"spec":{"suspend":false},"status":{"conditions":[{"type":"Ready","status":"False","message":"Job failed: field is immutable","reason":"ReconciliationFailed"}]}}]}`, nil
+				case name == "kubectl" && strings.Contains(command, "get jobs -A -o json"):
+					return "", errors.New("jobs unavailable")
+				default:
+					return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
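+					// The "field is immutable" condition message steers the flux wait
+					// into its immutable-job heal attempt, which then surfaces the
+					// jobs query error stubbed above.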
+				}
+			}
+			orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
+			ctx, cancel := context.WithCancel(context.Background())
+			cancel()
+			if err := orch.TestHookWaitForFluxHealth(ctx); !errors.Is(err, context.Canceled) {
+				t.Fatalf("expected canceled flux wait branch, got %v", err)
+			}
+		})
+
+		t.Run("required-node-labels-skip-empty-and-apply-one", func(t *testing.T) {
+			cfg := lifecycleConfig(t)
+			cfg.Startup.RequiredNodeLabels = map[string]map[string]string{
+				"node-empty-map": {},
+				"node-empty-val": {"zone": " "},
+				"node-apply":     {"zone": "lab-a"},
+			}
+			rec := &commandRecorder{}
+			base := lifecycleDispatcher(rec)
+			orch, _ := newHookOrchestratorAdvanced(t, cfg, false, base, base)
+			if err := orch.TestHookEnsureRequiredNodeLabels(context.Background()); err != nil {
+				t.Fatalf("expected ensureRequiredNodeLabels skip/apply branches, got %v", err)
+			}
+			if !rec.contains("label node node-apply --overwrite zone=lab-a") {
+				t.Fatalf("expected non-empty label application branch")
+			}
+		})
+
+		t.Run("wait-for-startup-convergence-dryrun-and-critical-endpoint-fail", func(t *testing.T) {
+			cfgDry := lifecycleConfig(t)
+			orchDry, _ := newHookOrchestratorAdvanced(t, cfgDry, true, nil, nil)
+			if err := orchDry.TestHookWaitForStartupConvergence(context.Background()); err != nil {
+				t.Fatalf("expected startup convergence dry-run fast path, got %v", err)
+			}
+
+			cfgFail := lifecycleConfig(t)
+			cfgFail.Startup.RequireIngressChecklist = false
+			cfgFail.Startup.RequireServiceChecklist = false
+			cfgFail.Startup.RequireFluxHealth = false
+			cfgFail.Startup.RequireWorkloadConvergence = false
+			cfgFail.Startup.RequireCriticalServiceEndpoints = true
+			run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				if name == "kubectl" && strings.Contains(command, "get endpoints") {
+					return "", errors.New("endpoint query failed")
+				}
+				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+			}
+			orchFail, _ := newHookOrchestratorAdvanced(t, cfgFail, false, run, run)
+			if err := orchFail.TestHookWaitForStartupConvergence(context.Background()); err == nil || !strings.Contains(err.Error(), "query endpoints") {
+				t.Fatalf("expected critical-endpoint convergence failure branch, got %v", err)
+			}
+		})
+
+		t.Run("ingress-namespace-discovery-empty-and-query-error", func(t *testing.T) {
+			cfg := lifecycleConfig(t)
+			orchEmpty, _ := newHookOrchestratorAdvanced(t, cfg, false, nil, nil)
+			namespaces, err := orchEmpty.TestHookDiscoverIngressNamespacesForHost(context.Background(), " ")
+			if err != nil || len(namespaces) != 0 {
+				t.Fatalf("expected empty-host fast path, namespaces=%v err=%v", namespaces, err)
+			}
+
+			runErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				if name == "kubectl" && strings.Contains(command, "get ingress -A -o json") {
+					return "", errors.New("ingress query failed")
+				}
+				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+			}
+			orchErr, _ := newHookOrchestratorAdvanced(t, cfg, false, runErr, runErr)
+			if _, err := orchErr.TestHookDiscoverIngressNamespacesForHost(context.Background(), "metrics.bstein.dev"); err == nil || !strings.Contains(err.Error(), "query ingresses") {
+				t.Fatalf("expected ingress query error branch, got %v", err)
+			}
+		})
+
+		t.Run("service-checklist-name-fallback-and-contains-empty-marker", func(t *testing.T) {
+			listener, err := net.Listen("tcp", "127.0.0.1:0")
+			if err != nil {
+				t.Fatalf("open local listener: %v", err)
+			}
+			defer listener.Close()
+			go func() {
+				conn, acceptErr := listener.Accept()
+				if acceptErr == nil {
+					_, _ = conn.Write([]byte("HTTP/1.1 503 Service Unavailable\r\nContent-Length: 3\r\n\r\nbad"))
+					_ = conn.Close()
+				}
+			}()
+
+			cfg := lifecycleConfig(t)
+			cfg.Startup.ServiceChecklist = []config.ServiceChecklistCheck{{
+				Name:             "",
+				URL:              "http://" + listener.Addr().String() + "/health",
+				AcceptedStatuses: []int{200},
+			}}
+			orch, _ := newHookOrchestratorAdvanced(t, cfg, false, nil, nil)
+			ready, detail := orch.TestHookServiceChecklistReady(context.Background())
+			if ready || !strings.Contains(detail, "http://") {
+				t.Fatalf("expected service checklist URL-name fallback failure, ready=%v detail=%q", ready, detail)
+			}
+			if !cluster.TestHookChecklistContains("any body", " ") {
+				t.Fatalf("expected empty-marker compact contains branch")
+			}
+		})
+
+		t.Run("node-inventory-reachability-defaults-and-cancel", func(t *testing.T) {
+			cfg := lifecycleConfig(t)
+			cfg.Startup.RequireNodeInventoryReach = true
+			cfg.Startup.NodeInventoryReachWaitSeconds = 0
+			cfg.Startup.NodeInventoryReachPollSeconds = 0
+			cfg.Startup.IgnoreUnavailableNodes = []string{"titan-23"}
+			run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				if name == "ssh" && strings.Contains(command, "__ANANKE_NODE_REACHABLE__") {
+					return "unexpected", nil
+				}
+				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
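+				// Returning anything except the reachability marker presumably keeps
+				// every node unreachable, so only the canceled context can end the
+				// wait loop below.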
+			}
+			orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
+			ctx, cancel := context.WithCancel(context.Background())
+			cancel()
+			if err := orch.TestHookWaitForNodeInventoryReachability(ctx); !errors.Is(err, context.Canceled) {
+				t.Fatalf("expected canceled inventory reachability branch, got %v", err)
+			}
+		})
+	})
+
+	t.Run("poststart-timesync-workload-branches", func(t *testing.T) {
+		t.Run("poststart-default-poll-and-cancel", func(t *testing.T) {
+			cfg := lifecycleConfig(t)
+			cfg.Startup.PostStartProbes = []string{"https://metrics.bstein.dev/api/health"}
+			cfg.Startup.PostStartProbeWaitSeconds = 0
+			cfg.Startup.PostStartProbePollSeconds = 0
+			run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				if name == "curl" {
+					return "", errors.New("curl failed")
+				}
+				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+			}
+			orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
+			ctx, cancel := context.WithCancel(context.Background())
+			cancel()
+			if err := orch.TestHookWaitForPostStartProbes(ctx); !errors.Is(err, context.Canceled) {
+				t.Fatalf("expected canceled post-start branch, got %v", err)
+			}
+		})
+
+		t.Run("resume-flux-with-commandexists-and-flux-fail-warning-path", func(t *testing.T) {
+			cfg := lifecycleConfig(t)
+			fakeBin := t.TempDir()
+			fluxPath := filepath.Join(fakeBin, "flux")
+			if err := os.WriteFile(fluxPath, []byte("#!/bin/sh\nexit 0\n"), 0o755); err != nil {
+				t.Fatalf("write fake flux binary: %v", err)
+			}
+			t.Setenv("PATH", fakeBin+string(os.PathListSeparator)+os.Getenv("PATH"))
+			run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				if name == "flux" {
+					return "", errors.New("flux reconcile failed")
+				}
+				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+			}
+			orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
+			if err := orch.TestHookResumeFluxAndReconcile(context.Background()); err != nil {
+				t.Fatalf("expected resume flux warning-only branch, got %v", err)
+			}
+		})
+
+		t.Run("timesync-defaults-quorum-and-datastore-cancel", func(t *testing.T) {
+			cfg := lifecycleConfig(t)
+			cfg.Startup.TimeSyncMode = "quorum"
+			cfg.Startup.TimeSyncWaitSeconds = 0
+			cfg.Startup.TimeSyncPollSeconds = 0
+			cfg.Startup.TimeSyncQuorum = 0
+			cfg.SSHManagedNodes = []string{"titan-db"}
+			cfg.SSHNodeHosts["db-host"] = "127.0.0.1"
+			run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				switch {
+				case name == "sh" && strings.Contains(command, "timedatectl show -p NTPSynchronized"):
+					return "no", nil
+				case name == "ssh" && strings.Contains(command, "timedatectl show -p NTPSynchronized"):
+					return "", errors.New("ssh timed out")
+				case name == "ssh" && strings.Contains(command, "systemctl cat k3s"):
+					return "ExecStart=/usr/local/bin/k3s server --datastore-endpoint=postgres://127.0.0.1/k3s", nil
+				default:
+					return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
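+					// The postgres --datastore-endpoint in the k3s unit output is what
+					// (presumably) classifies the datastore as external, so the canceled
+					// preflight below exercises that path rather than the etcd one.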
+				}
+			}
+			orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
+			ctx, cancel := context.WithCancel(context.Background())
+			cancel()
+			if err := orch.TestHookWaitForTimeSync(ctx, []string{"", "titan-db"}); !errors.Is(err, context.Canceled) {
+				t.Fatalf("expected canceled time-sync branch, got %v", err)
+			}
+			if err := orch.TestHookPreflightExternalDatastore(ctx); !errors.Is(err, context.Canceled) {
+				t.Fatalf("expected canceled datastore preflight branch, got %v", err)
+			}
+		})
+
+		t.Run("workload-convergence-and-ignore-quick-branches", func(t *testing.T) {
+			cfg := lifecycleConfig(t)
+			cfg.Startup.WorkloadConvergenceWaitSeconds = 0
+			cfg.Startup.WorkloadConvergencePollSeconds = 0
+			cfg.Startup.IgnoreUnavailableNodes = []string{"titan-22"}
+			run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				switch {
+				case name == "kubectl" && strings.Contains(command, "get deploy,statefulset,daemonset -A -o json"):
+					return `{"items":[{"kind":"","metadata":{"namespace":"monitoring","name":"skip"}},{"kind":"Deployment","metadata":{"namespace":"monitoring","name":"replicas-zero"},"spec":{"replicas":0,"template":{"spec":{}}},"status":{"readyReplicas":0}},{"kind":"DaemonSet","metadata":{"namespace":"apps","name":"on-ignored"},"spec":{"template":{"spec":{"nodeSelector":{"kubernetes.io/hostname":"titan-22"}}}},"status":{"desiredNumberScheduled":1,"numberReady":0}}]}`, nil
+				case name == "kubectl" && strings.Contains(command, "get pods -A -o json"):
+					return `{"items":[{"metadata":{"namespace":"","name":"skip"}},{"metadata":{"namespace":"apps","name":"stuck","creationTimestamp":"2000-01-01T00:00:00Z","ownerReferences":[{"kind":"ReplicaSet"}]},"status":{"phase":"Pending","containerStatuses":[{"state":{"waiting":{"reason":"CrashLoopBackOff"}}}]}}]}`, nil
+				default:
+					return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+				}
+			}
+			orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
+			if err := orch.TestHookWaitForWorkloadConvergence(context.Background()); err != nil {
+				t.Fatalf("expected workload convergence default-branch success, got %v", err)
+			}
+
+			cfgIgnore := lifecycleConfig(t)
+			cfgIgnore.Startup.AutoRecycleStuckPods = false
+			orchIgnoreDry, _ := newHookOrchestratorAdvanced(t, cfgIgnore, true, run, run)
+			now := time.Now().UTC().Add(-time.Hour)
+			orchIgnoreDry.TestHookMaybeAutoRecycleStuckPods(context.Background(), &now)
+			orchIgnoreDry.TestHookMaybeAutoHealCriticalWorkloadReplicas(context.Background(), &now)
+
+			runHealErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				if name == "kubectl" && strings.Contains(command, "get deploy,statefulset -A -o json") {
+					return "", errors.New("workload query failed")
+				}
+				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+			}
+			orchHealErr, _ := newHookOrchestratorAdvanced(t, lifecycleConfig(t), false, runHealErr, runHealErr)
+			if _, err := orchHealErr.TestHookHealCriticalWorkloadReplicas(context.Background()); err == nil || !strings.Contains(err.Error(), "query workloads") {
+				t.Fatalf("expected critical workload heal query-error branch, got %v", err)
+			}
+		})
+	})
+}
diff --git a/testing/orchestrator/hooks_gap_matrix_part11_test.go b/testing/orchestrator/hooks_gap_matrix_part11_test.go
new file mode 100644
index 0000000..4cadf97
--- /dev/null
+++ b/testing/orchestrator/hooks_gap_matrix_part11_test.go
@@ -0,0 +1,792 @@
+package orchestrator
+
+import (
+	"context"
+	"errors"
+	"io"
+	"log"
+	"net"
+	"os"
+	"path/filepath"
+	"strings"
+	"testing"
+	"time"
+
+	"scm.bstein.dev/bstein/ananke/internal/cluster"
+	"scm.bstein.dev/bstein/ananke/internal/config"
+	"scm.bstein.dev/bstein/ananke/internal/execx"
+	"scm.bstein.dev/bstein/ananke/internal/state"
+)
+
+// newLifecycleMatrixOrchestrator builds a lifecycle-test orchestrator with
+// explicit dry-run, command-override, and kubeconfig control.
+// Signature: newLifecycleMatrixOrchestrator(t *testing.T, cfg config.Config, dryRun bool, run commandOverride, runSensitive commandOverride, kubeconfig string) *cluster.Orchestrator.
+// Why: part11 needs direct control over runner dry-run and kubeconfig branches.
+func newLifecycleMatrixOrchestrator(
+	t *testing.T,
+	cfg config.Config,
+	dryRun bool,
+	run func(context.Context, time.Duration, string, ...string) (string, error),
+	runSensitive func(context.Context, time.Duration, string, ...string) (string, error),
+	kubeconfig string,
+) *cluster.Orchestrator {
+	t.Helper()
+	if err := os.MkdirAll(cfg.State.Dir, 0o755); err != nil {
+		t.Fatalf("ensure state dir: %v", err)
+	}
+	runner := &execx.Runner{DryRun: dryRun, Kubeconfig: kubeconfig}
+	orch := cluster.New(cfg, runner, state.New(cfg.State.RunHistoryPath), log.New(io.Discard, "", 0))
+	if run == nil || runSensitive == nil {
+		recorder := &commandRecorder{}
+		base := lifecycleDispatcher(recorder)
+		if run == nil {
+			run = base
+		}
+		if runSensitive == nil {
+			runSensitive = base
+		}
+	}
+	orch.SetCommandOverrides(run, runSensitive)
+	return orch
+}
+
+// TestHookGapMatrixPart11RemainingClosure exercises the final lifecycle and
+// near-threshold orchestrator branch closures.
+// Signature: TestHookGapMatrixPart11RemainingClosure(t *testing.T).
+// Why: closes final branch gaps for lifecycle + remaining near-threshold
+// orchestrator files so per-file coverage reaches the enforced 95% target.
+func TestHookGapMatrixPart11RemainingClosure(t *testing.T) {
+	t.Run("critical-vault-final-closures", func(t *testing.T) {
+		t.Run("ensure-critical-cleanup-error-and-cleanup-branches", func(t *testing.T) {
+			cfg := lifecycleConfig(t)
+			run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				switch {
+				case name == "kubectl" && strings.Contains(command, "get pods -o custom-columns"):
+					return "", errors.New("pods query failed")
+				default:
+					return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
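+					// Only the stale-pod listing is forced to fail here; the default
+					// arm keeps all other kubectl/ssh traffic on the deterministic
+					// dispatcher path so the cleanup error branch stays isolated.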
+				}
+			}
+			orch := newLifecycleMatrixOrchestrator(t, cfg, false, run, run, "")
+			if err := orch.TestHookEnsureCriticalStartupWorkloads(context.Background()); err == nil || !strings.Contains(err.Error(), "cleanup stale pods") {
+				t.Fatalf("expected ensureCriticalStartupWorkloads cleanup error branch, got %v", err)
+			}
+
+			dry := newLifecycleMatrixOrchestrator(t, cfg, true, nil, nil, "")
+			if err := dry.TestHookCleanupStaleCriticalWorkloadPods(context.Background(), "vault", "statefulset", "vault"); err != nil {
+				t.Fatalf("expected cleanup stale pod dry-run branch, got %v", err)
+			}
+
+			runCleanup := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				if name == "kubectl" && strings.Contains(command, "get pods -o custom-columns") {
+					return strings.Join([]string{
+						"badline",
+						"vault-0 Unknown Deployment vault",
+						"otherpod Unknown StatefulSet vault",
+					}, "\n"), nil
+				}
+				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+			}
+			orchCleanup := newLifecycleMatrixOrchestrator(t, cfg, false, runCleanup, runCleanup, "")
+			if err := orchCleanup.TestHookCleanupStaleCriticalWorkloadPods(context.Background(), "vault", "statefulset", "vault"); err != nil {
+				t.Fatalf("expected cleanup stale pod parse/owner/prefix branches, got %v", err)
+			}
+		})
+
+		t.Run("wait-vault-ready-error-tracking-and-ensure-unseal-branches", func(t *testing.T) {
+			cfg := lifecycleConfig(t)
+			run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				switch {
+				case name == "kubectl" && strings.Contains(command, "jsonpath={.status.readyReplicas}"):
+					return "", errors.New("ready query failed")
+				case name == "kubectl" && strings.Contains(command, "get pod vault-0"):
+					return "Running", nil
+				case name == "kubectl" && strings.Contains(command, "vault status -format=json"):
+					return `{"sealed":false}`, nil
+				default:
+					return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+				}
+			}
+			orch := newLifecycleMatrixOrchestrator(t, cfg, false, run, run, "")
+			ctx, cancel := context.WithCancel(context.Background())
+			cancel()
+			if err := orch.TestHookWaitVaultReady(ctx, "vault", "statefulset", "vault"); !errors.Is(err, context.Canceled) {
+				t.Fatalf("expected canceled vault wait branch with error tracking, got %v", err)
+			}
+
+			dry := newLifecycleMatrixOrchestrator(t, cfg, true, nil, nil, "")
+			if err := dry.TestHookEnsureVaultUnsealed(context.Background()); err != nil {
+				t.Fatalf("expected ensureVaultUnsealed dry-run fast path, got %v", err)
+			}
+
+			runSealedErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				switch {
+				case name == "kubectl" && strings.Contains(command, "get pod vault-0"):
+					return "Running", nil
+				case name == "kubectl" && strings.Contains(command, "vault status -format=json"):
+					return "", errors.New("vault status query failed")
+				default:
+					return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+				}
+			}
+			orchSealedErr := newLifecycleMatrixOrchestrator(t, cfg, false, runSealedErr, runSealedErr, "")
+			if err := orchSealedErr.TestHookEnsureVaultUnsealed(context.Background()); err == nil || !strings.Contains(err.Error(), "vault status check failed") {
+				t.Fatalf("expected ensureVaultUnsealed sealed-check error branch, got %v", err)
+			}
+		})
+
+		t.Run("parse-vault-sealed-json-unmarshal-error", func(t *testing.T) {
+			if _, err := cluster.TestHookParseVaultSealed(`{"sealed":tru}`); err == nil {
+				t.Fatalf("expected parseVaultSealed json-unmarshal error branch")
+			}
+		})
+	})
+
+	t.Run("ingress-service-storage-timesync-final-closures", func(t *testing.T) {
+		t.Run("startup-convergence-workload-and-stability-failure-branches", func(t *testing.T) {
+			cfgWorkload := lifecycleConfig(t)
+			cfgWorkload.Startup.RequireIngressChecklist = false
+			cfgWorkload.Startup.RequireServiceChecklist = false
+			cfgWorkload.Startup.RequireCriticalServiceEndpoints = false
+			cfgWorkload.Startup.RequireFluxHealth = false
+			cfgWorkload.Startup.RequireWorkloadConvergence = true
+			cfgWorkload.Startup.ServiceChecklistStabilitySec = 0
+			runWorkload := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				if name == "kubectl" && strings.Contains(command, "get deploy,statefulset,daemonset -A -o json") {
+					return "", errors.New("controllers query failed")
+				}
+				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+			}
+			orchWorkload := newLifecycleMatrixOrchestrator(t, cfgWorkload, false, runWorkload, runWorkload, "")
+			if err := orchWorkload.TestHookWaitForStartupConvergence(context.Background()); err == nil || !strings.Contains(err.Error(), "query controllers") {
+				t.Fatalf("expected startup convergence workload failure branch, got %v", err)
+			}
+
+			cfgStability := lifecycleConfig(t)
+			cfgStability.Startup.RequireIngressChecklist = false
+			cfgStability.Startup.RequireServiceChecklist = false
+			cfgStability.Startup.RequireCriticalServiceEndpoints = false
+			cfgStability.Startup.RequireFluxHealth = false
+			cfgStability.Startup.RequireWorkloadConvergence = false
+			cfgStability.Startup.ServiceChecklistStabilitySec = 1
+			cfgStability.Startup.ServiceChecklistPollSeconds = 1
+			runStability := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				if name == "kubectl" && strings.Contains(command, "get pods -A -o json") {
+					return "", errors.New("pods query failed")
+				}
+				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+			}
+			orchStability := newLifecycleMatrixOrchestrator(t, cfgStability, false, runStability, runStability, "")
+			if err := orchStability.TestHookWaitForStartupConvergence(context.Background()); err == nil || !strings.Contains(err.Error(), "stability window failed") {
+				t.Fatalf("expected startup convergence stability failure branch, got %v", err)
+			}
+		})
+
+		t.Run("ingress-host-discovery-and-autoheal-early-returns", func(t *testing.T) {
+			cfg := lifecycleConfig(t)
+			runHostsErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				if name == "kubectl" && strings.Contains(command, "get ingress -A -o json") {
+					return "", errors.New("ingress list failed")
+				}
+				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+			}
+			orchHostsErr := newLifecycleMatrixOrchestrator(t, cfg, false, runHostsErr, runHostsErr, "")
+			if _, err := orchHostsErr.TestHookDiscoverIngressHosts(context.Background()); err == nil || !strings.Contains(err.Error(), "query ingresses") {
+				t.Fatalf("expected discoverIngressHosts query error branch, got %v", err)
+			}
+			if _, detail := orchHostsErr.TestHookIngressChecklistReady(context.Background()); !strings.Contains(detail, "query ingresses") {
+				t.Fatalf("expected ingress checklist to surface discovery error detail, got %q", detail)
+			}
+
+			runNs := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				if name == "kubectl" && strings.Contains(command, "get ingress -A -o json") {
+					return `{"items":[{"metadata":{"namespace":""},"spec":{"rules":[{"host":""},{"host":"logs.bstein.dev"}]}}]}`, nil
+				}
+				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+			}
+			orchNs := newLifecycleMatrixOrchestrator(t, cfg, false, runNs, runNs, "")
+			namespaces, err := orchNs.TestHookDiscoverIngressNamespacesForHost(context.Background(), "logs.bstein.dev")
+			if err != nil || len(namespaces) != 0 {
+				t.Fatalf("expected namespace-empty/rule-empty skip branches, namespaces=%v err=%v", namespaces, err)
+			}
+
+			dry := newLifecycleMatrixOrchestrator(t, cfg, true, nil, nil, "")
+			now := time.Now().UTC().Add(-time.Hour)
+			dry.TestHookMaybeAutoHealIngressHostBackends(context.Background(), &now, "logs.bstein.dev: 502")
+
+			orchEarly := newLifecycleMatrixOrchestrator(t, cfg, false, nil, nil, "")
+			orchEarly.TestHookMaybeAutoHealIngressHostBackends(context.Background(), &now, "no-host-here")
+			if host := orchEarly.TestHookChecklistFailureHost("https://logs.bstein.dev/login"); host != "" {
+				t.Fatalf("expected checklistFailureHost colon-split edge branch, got %q", host)
+			}
+		})
+
+		t.Run("service-stability-error-branches", func(t *testing.T) {
+			cfg := lifecycleConfig(t)
+			runWorkloadsErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				switch {
+				case name == "kubectl" && strings.Contains(command, "get ingress -A -o json"):
+					return `{"items":[{"metadata":{"namespace":"monitoring"},"spec":{"rules":[{"host":"logs.bstein.dev"}]}}]}`, nil
+				case name == "kubectl" && strings.Contains(command, "get deploy,statefulset -A -o json"):
+					return "", errors.New("workload list failed")
+				}
+				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+			}
+			orchWorkloadsErr := newLifecycleMatrixOrchestrator(t, cfg, false, runWorkloadsErr, runWorkloadsErr, "")
+			if _, err := orchWorkloadsErr.TestHookHealIngressHostBackendReplicas(context.Background(), "logs.bstein.dev"); err == nil || !strings.Contains(err.Error(), "query workloads") {
+				t.Fatalf("expected healIngressHostBackendReplicas query error branch, got %v", err)
+			}
+
+			runDecodeErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				switch {
+				case name == "kubectl" && strings.Contains(command, "get ingress -A -o json"):
+					return `{"items":[{"metadata":{"namespace":"monitoring"},"spec":{"rules":[{"host":"logs.bstein.dev"}]}}]}`, nil
+				case name == "kubectl" && strings.Contains(command, "get deploy,statefulset -A -o json"):
+					return "{not-json", nil
+				default:
+					return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
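+					// "{not-json" passes the ingress lookup but breaks workload-list
+					// decoding, isolating the "decode workloads" wrapper from the
+					// query-error branch covered just above.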
+				}
+			}
+			orchDecodeErr := newLifecycleMatrixOrchestrator(t, cfg, false, runDecodeErr, runDecodeErr, "")
+			if _, err := orchDecodeErr.TestHookHealIngressHostBackendReplicas(context.Background(), "logs.bstein.dev"); err == nil || !strings.Contains(err.Error(), "decode workloads") {
+				t.Fatalf("expected healIngressHostBackendReplicas decode error branch, got %v", err)
+			}
+
+			runBodyErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+			}
+			orchBodyErr := newLifecycleMatrixOrchestrator(t, cfg, false, runBodyErr, runBodyErr, "")
+			ln, err := net.Listen("tcp", "127.0.0.1:0")
+			if err != nil {
+				t.Fatalf("open probe listener: %v", err)
+			}
+			defer ln.Close()
+			go func() {
+				conn, acceptErr := ln.Accept()
+				if acceptErr == nil {
+					_, _ = conn.Write([]byte("HTTP/1.1 200 OK\r\nContent-Length: 100\r\n\r\nshort"))
+					_ = conn.Close()
+				}
+			}()
+			_, _, probeErr := orchBodyErr.TestHookHTTPChecklistProbe(context.Background(), config.ServiceChecklistCheck{
+				URL: "http://" + ln.Addr().String() + "/health",
+			})
+			if probeErr == nil || !strings.Contains(probeErr.Error(), "read response body") {
+				t.Fatalf("expected checklist body read-error branch, got %v", probeErr)
+			}
+
+			cfgStability := lifecycleConfig(t)
+			cfgStability.Startup.RequireFluxHealth = false
+			cfgStability.Startup.RequireWorkloadConvergence = true
+			cfgStability.Startup.RequireServiceChecklist = false
+			cfgStability.Startup.RequireIngressChecklist = false
+			runStability := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				if name == "kubectl" && strings.Contains(command, "get deploy,statefulset,daemonset -A -o json") {
+					return "", errors.New("controllers failed")
+				}
+				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+			}
+			orchStability := newLifecycleMatrixOrchestrator(t, cfgStability, false, runStability, runStability, "")
+			if err := orchStability.TestHookStartupStabilityHealthy(context.Background()); err == nil || !strings.Contains(err.Error(), "workload check error") {
+				t.Fatalf("expected startupStabilityHealthy workload-error branch, got %v", err)
+			}
+
+			cfgWindow := lifecycleConfig(t)
+			cfgWindow.Startup.ServiceChecklistStabilitySec = 1
+			cfgWindow.Startup.ServiceChecklistPollSeconds = 1
+			cfgWindow.Startup.RequireFluxHealth = false
+			cfgWindow.Startup.RequireWorkloadConvergence = false
+			cfgWindow.Startup.RequireServiceChecklist = false
+			cfgWindow.Startup.RequireIngressChecklist = false
+			orchWindow := newLifecycleMatrixOrchestrator(t, cfgWindow, false, nil, nil, "")
+			ctx, cancel := context.WithCancel(context.Background())
+			cancel()
+			if err := orchWindow.TestHookWaitForStabilityWindow(ctx); !errors.Is(err, context.Canceled) {
+				t.Fatalf("expected waitForStabilityWindow ctx-canceled branch, got %v", err)
+			}
+		})
+
+		t.Run("storage-and-timesync-small-branches", func(t *testing.T) {
+			cfgStorage := lifecycleConfig(t)
+			cfgStorage.Startup.StorageMinReadyNodes = 1
+			orchStorageDry := newLifecycleMatrixOrchestrator(t, cfgStorage, true, nil, nil, "")
+			if err := orchStorageDry.TestHookWaitForStorageReady(context.Background()); err != nil {
+				t.Fatalf("expected waitForStorageReady dry-run branch, got %v", err)
+			}
+
+			cfgStorage.Startup.StorageCriticalPVCs = []string{" "}
+			runStorage := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				if name == "kubectl" && strings.Contains(command, "get nodes.longhorn.io") {
+					return "malformed\nnode-a:True:True\n", nil
+				}
+				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+			}
+			orchStorage := newLifecycleMatrixOrchestrator(t, cfgStorage, false, runStorage, runStorage, "")
+			ready, _, err := orchStorage.TestHookStorageReady(context.Background())
+			if err != nil || !ready {
+				t.Fatalf("expected storage malformed-line and empty-pvc-entry branches, ready=%v err=%v", ready, err)
+			}
+
+			cfgSync := lifecycleConfig(t)
+			cfgSync.Startup.TimeSyncMode = "quorum"
+			cfgSync.Startup.TimeSyncQuorum = 99
+			cfgSync.SSHManagedNodes = []string{"titan-db"}
+			runSync := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				switch {
+				case name == "sh" && strings.Contains(command, "timedatectl show -p NTPSynchronized"):
+					return "yes", nil
+				case name == "ssh" && strings.Contains(command, "timedatectl show -p NTPSynchronized"):
+					return "yes", nil
+				default:
+					return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+				}
+			}
+			orchSync := newLifecycleMatrixOrchestrator(t, cfgSync, false, runSync, runSync, "")
+			if err := orchSync.TestHookWaitForTimeSync(context.Background(), []string{"", "titan-db", "unmanaged"}); err != nil {
+				t.Fatalf("expected timesync quorum clamp/success branches, got %v", err)
+			}
+
+			cfgSyncBad := lifecycleConfig(t)
+			cfgSyncBad.SSHPort = 22
+			cfgSyncBad.SSHUser = ""
+			cfgSyncBad.ControlPlanes = []string{"", "cp1"}
+			cfgSyncBad.Workers = []string{"", "wk1"}
+			cfgSyncBad.SSHManagedNodes = []string{"cp1"}
+			cfgSyncBad.SSHNodeHosts = map[string]string{
+				"cp1": "bad host",
+				"wk1": "wk1",
+			}
+			orchSyncBad := newLifecycleMatrixOrchestrator(t, cfgSyncBad, false, nil, nil, "")
+			if err := orchSyncBad.TestHookValidateNodeInventory(); err == nil || !strings.Contains(err.Error(), "node inventory preflight failed") {
+				t.Fatalf("expected validateNodeInventory host/user branches, got %v", err)
+			}
+
+			orchSyncDry := newLifecycleMatrixOrchestrator(t, lifecycleConfig(t), true, nil, nil, "")
+			if err := orchSyncDry.TestHookWaitForTimeSync(context.Background(), []string{"titan-db"}); err != nil {
+				t.Fatalf("expected waitForTimeSync dry-run branch, got %v", err)
+			}
+		})
+	})
+
+	t.Run("poststart-scaling-and-lifecycle-final-closures", func(t *testing.T) {
+		t.Run("poststart-ssh-and-sensitive-branches", func(t *testing.T) {
+			cfg := lifecycleConfig(t)
+			cfg.SSHManagedNodes = nil
+			cfg.SSHNodeUsers = map[string]string{"titan-db": "special"}
+			cfg.SSHJumpHost = "jump-a"
+			cfg.SSHJumpUser = "jumper"
+			cfg.SSHNodeHosts["jump-a"] = "jump.example.internal"
+			attempt := 0
+			run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				if name == "ssh" {
+					attempt++
+					if attempt <= 2 {
+						return "REMOTE HOST IDENTIFICATION HAS CHANGED!", errors.New("host key mismatch")
+					}
+					return "", errors.New("still failing")
+				}
+				if name == "curl" {
+					return "200", nil
+				}
+				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+			}
+			orch := newLifecycleMatrixOrchestrator(t, cfg, false, run, run, "/tmp/fake-kubeconfig")
+			if _, err := orch.TestHookSSHWithTimeout(context.Background(), "titan-db", "echo ok", 2*time.Second); err == nil {
+				t.Fatalf("expected sshWithTimeout retry-failure branch")
+			}
+			if !orch.TestHookSSHManaged("any-node") {
+				t.Fatalf("expected sshManaged empty-allowlist branch")
+			}
+			if _, err := orch.TestHookRunSensitive(context.Background(), 2*time.Second, "sh", "-lc", "echo ok"); err != nil {
+				t.Fatalf("expected runSensitive kubeconfig-env branch, got %v", err)
+			}
+		})
+
+		t.Run("scaling-final-branches", func(t *testing.T) {
+			cfg := lifecycleConfig(t)
+			cfg.Workers = nil
+			runEffective := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				if name == "kubectl" && len(args) >= 2 && args[0] == "get" && args[1] == "nodes" {
+					return "wk1 ", nil
+				}
+				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
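+				// The trailing whitespace in "wk1 " exercises the node-line trimming
+				// path while still yielding exactly one discovered worker.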
+			}
+			orchEffective := newLifecycleMatrixOrchestrator(t, cfg, false, runEffective, runEffective, "")
+			workers, err := orchEffective.TestHookEffectiveWorkers(context.Background())
+			if err != nil || len(workers) != 1 || workers[0] != "wk1" {
+				t.Fatalf("expected effectiveWorkers discover-success branch, workers=%v err=%v", workers, err)
+			}
+
+			runNoWorkers := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				if name == "kubectl" && len(args) >= 2 && args[0] == "get" && args[1] == "nodes" {
+					return "cp1 true ", nil
+				}
+				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+			}
+			orchNoWorkers := newLifecycleMatrixOrchestrator(t, cfg, false, runNoWorkers, runNoWorkers, "")
+			if _, err := orchNoWorkers.TestHookDiscoverWorkers(context.Background()); err == nil || !strings.Contains(err.Error(), "no workers discovered") {
+				t.Fatalf("expected discoverWorkers no-workers branch, got %v", err)
+			}
+
+			runPatch := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				switch {
+				case name == "kubectl" && strings.Contains(command, "-n flux-system get kustomizations.kustomize.toolkit.fluxcd.io"):
+					return "apps\n", nil
+				case name == "kubectl" && strings.Contains(command, "get helmreleases.helm.toolkit.fluxcd.io -A"):
+					return "", errors.New("helmrelease list failed")
+				default:
+					return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+				}
+			}
+			orchPatch := newLifecycleMatrixOrchestrator(t, cfg, false, runPatch, runPatch, "")
+			if err := orchPatch.TestHookPatchFluxSuspendAll(context.Background(), true); err == nil || !strings.Contains(err.Error(), "helmrelease list failed") {
+				t.Fatalf("expected patchFluxSuspendAll helmrelease-query error branch, got %v", err)
+			}
+
+			cfgScale := lifecycleConfig(t)
+			cfgScale.Shutdown.ScaleParallelism = 0
+			runScale := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				switch {
+				case name == "kubectl" && strings.Contains(command, "get deployment -A -o jsonpath="):
+					return "monitoring\tgrafana\t1\nbad-line\n", nil
+				case name == "kubectl" && strings.Contains(command, "get statefulset -A -o jsonpath="):
+					return "", errors.New("statefulset query failed")
+				default:
+					return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+				}
+			}
+			orchScale := newLifecycleMatrixOrchestrator(t, cfgScale, false, runScale, runScale, "")
+			if _, err := orchScale.TestHookListScalableWorkloads(context.Background()); err == nil || !strings.Contains(err.Error(), "collect statefulsets") {
+				t.Fatalf("expected listScalableWorkloads malformed/statefulset-error branches, got %v", err)
+			}
+
+			cfgRestore := lifecycleConfig(t)
+			cfgRestore.Shutdown.ScaleParallelism = 0
+			if err := os.MkdirAll(cfgRestore.State.Dir, 0o755); err != nil {
+				t.Fatalf("mkdir restore state dir: %v", err)
+			}
+			snapshotPath := filepath.Join(cfgRestore.State.Dir, "scaled-workloads.json")
+			if err := os.WriteFile(snapshotPath, []byte(`{"generated_at":"2026-01-01T00:00:00Z","entries":[{"namespace":"monitoring","kind":"deployment","name":"grafana","replicas":1}]}`), 0o644); err != nil {
+				t.Fatalf("write restore snapshot: %v", err)
+			}
+			runRestore := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+				command := name + " " + strings.Join(args, " ")
+				if name == "kubectl" && strings.Contains(command, "scale deployment grafana --replicas=1") {
+					return "", errors.New("scale restore failed")
+				}
+				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+			}
+			orchRestore := newLifecycleMatrixOrchestrator(t, cfgRestore, false, runRestore, runRestore, "")
+			if err := orchRestore.TestHookRestoreScaledApps(context.Background()); err == nil || !strings.Contains(err.Error(), "scaling had 1 errors") {
+				t.Fatalf("expected restoreScaledApps scale error branch, got %v", err)
+			}
+
+			orchRestoreDry := newLifecycleMatrixOrchestrator(t, cfgRestore, true, runRestore, runRestore, "")
+			if err := orchRestoreDry.TestHookRestoreScaledApps(context.Background()); err != nil {
+				t.Fatalf("expected restoreScaledApps dry-run branch, got %v", err)
+			}
+		})
+
+		t.Run("lifecycle-startup-etcd-shutdown-final-branches", func(t *testing.T) {
+			t.Run("startup-cooldown-and-poststart-failure-paths", func(t *testing.T) {
+				cfg := lifecycleFastConfig(t)
+				cfg.Startup.ShutdownCooldownSeconds = 300
+				cfg.Startup.RequirePostStartProbes = true
+				cfg.Startup.PostStartProbes = []string{"https://logs.bstein.dev/health"}
+				if err := state.WriteIntent(cfg.State.IntentPath, state.Intent{
+					State:     state.IntentShutdownComplete,
+					Reason:    "recent-shutdown",
+					Source:    "test",
+					UpdatedAt: time.Now().UTC(),
+				}); err != nil {
+					t.Fatalf("seed cooldown intent: %v", err)
+				}
+				orchCooldown := newLifecycleMatrixOrchestrator(t, cfg, false, nil, nil, "")
+				ctxCooldown, cancelCooldown := context.WithCancel(context.Background())
+				cancelCooldown()
+				if err := orchCooldown.Startup(ctxCooldown, cluster.StartupOptions{Reason: "cooldown"}); err == nil || !strings.Contains(err.Error(), "startup canceled while waiting for shutdown cooldown") {
+					t.Fatalf("expected startup cooldown cancel branch, got %v", err)
+				}
+
+				cfgPost := lifecycleFastConfig(t)
+				cfgPost.Startup.RequirePostStartProbes = true
+				cfgPost.Startup.PostStartProbes = []string{"https://logs.bstein.dev/health"}
+				cfgPost.Startup.PostStartProbeWaitSeconds = 1
+				cfgPost.Startup.PostStartProbePollSeconds = 1
+				cfgPost.Startup.APIWaitSeconds = 1
+				cfgPost.Startup.APIPollSeconds = 5
+				runPost := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+					if name == "curl" {
+						return "", errors.New("probe failed")
+					}
+					return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
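+					// Failing every curl invocation guarantees the post-start probe
+					// loop exhausts its one-second fast-config window; kubectl traffic
+					// still succeeds via the dispatcher so startup reaches the probes.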
+				}
+				orchPost := newLifecycleMatrixOrchestrator(t, cfgPost, false, runPost, runPost, "")
+				if err := orchPost.Startup(context.Background(), cluster.StartupOptions{Reason: "post-start"}); err == nil || !strings.Contains(err.Error(), "post-start probes") {
+					t.Fatalf("expected startup post-start probe failure branch, got %v", err)
+				}
+			})
+
+			t.Run("startup-bootstrap-and-flux-branch-error-paths", func(t *testing.T) {
+				cfg := lifecycleFastConfig(t)
+				repo := t.TempDir()
+				cfg.IACRepoPath = repo
+				cfg.LocalBootstrapPaths = []string{"services/bootstrap"}
+				if err := os.MkdirAll(filepath.Join(repo, ".git"), 0o755); err != nil {
+					t.Fatalf("mkdir .git: %v", err)
+				}
+				if err := os.MkdirAll(filepath.Join(repo, "services", "bootstrap"), 0o755); err != nil {
+					t.Fatalf("mkdir bootstrap dir: %v", err)
+				}
+				readyCalls := 0
+				run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+					command := name + " " + strings.Join(args, " ")
+					switch {
+					case name == "kubectl" && strings.Contains(command, "jsonpath={.spec.ref.branch}"):
+						return "", errors.New("branch read failed")
+					case name == "kubectl" && strings.Contains(command, "jsonpath={.status.conditions[?(@.type==\"Ready\")].status}"):
+						readyCalls++
+						if readyCalls == 1 {
+							return "", nil
+						}
+						return "True", nil
+					case name == "git":
+						return "", nil
+					case name == "sh" && strings.Contains(command, "kubectl kustomize"):
+						return "apiVersion: v1\nkind: Namespace\nmetadata:\n name: bootstrap\n", nil
+					default:
+						return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+					}
+				}
+				orch := newLifecycleMatrixOrchestrator(t, cfg, false, run, run, "")
+				if err := orch.Startup(context.Background(), cluster.StartupOptions{Reason: "flux-branch-error"}); err == nil || !strings.Contains(err.Error(), "read flux source branch") {
+					t.Fatalf("expected startup ensureFluxBranch error branch, got %v", err)
+				}
+			})
+
+			t.Run("startup-intent-clear-and-auto-etcd-flux-patch-branches", func(t *testing.T) {
+				t.Run("clear-stale-startup-intent-success-path", func(t *testing.T) {
+					cfg := lifecycleFastConfig(t)
+					if err := state.WriteIntent(cfg.State.IntentPath, state.Intent{
+						State:     state.IntentStartupInProgress,
+						Reason:    "stale-startup",
+						Source:    "test",
+						UpdatedAt: time.Now().UTC(),
+					}); err != nil {
+						t.Fatalf("seed stale startup intent: %v", err)
+					}
+					run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+						command := name + " " + strings.Join(args, " ")
+						if name == "kubectl" && strings.Contains(command, "jsonpath={.spec.url}") {
+							return "", errors.New("flux url read failed")
+						}
+						return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
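+						// Failing the flux source URL read right after the stale intent
+						// is cleared proves Startup got past the intent guard before
+						// its first flux preflight.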
+					}
+					orch := newLifecycleMatrixOrchestrator(t, cfg, false, run, run, "")
+					if err := orch.Startup(context.Background(), cluster.StartupOptions{Reason: "stale-startup-clear"}); err == nil || !strings.Contains(err.Error(), "read flux source url") {
+						t.Fatalf("expected startup flux-url guard error after stale startup clear, got %v", err)
+					}
+				})
+
+				t.Run("clear-stale-shutdown-intent-success-path", func(t *testing.T) {
+					cfg := lifecycleFastConfig(t)
+					if err := state.WriteIntent(cfg.State.IntentPath, state.Intent{
+						State:     state.IntentShuttingDown,
+						Reason:    "stale-shutdown",
+						Source:    "test",
+						UpdatedAt: time.Now().UTC().Add(-2 * time.Hour),
+					}); err != nil {
+						t.Fatalf("seed stale shutdown intent: %v", err)
+					}
+					run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+						command := name + " " + strings.Join(args, " ")
+						if name == "kubectl" && strings.Contains(command, "jsonpath={.spec.url}") {
+							return "", errors.New("flux url read failed")
+						}
+						return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+					}
+					orch := newLifecycleMatrixOrchestrator(t, cfg, false, run, run, "")
+					if err := orch.Startup(context.Background(), cluster.StartupOptions{Reason: "stale-shutdown-clear"}); err == nil || !strings.Contains(err.Error(), "read flux source url") {
+						t.Fatalf("expected startup flux-url guard error after stale shutdown clear, got %v", err)
+					}
+				})
+
+				t.Run("auto-etcd-default-control-plane-selection", func(t *testing.T) {
+					cfg := lifecycleFastConfig(t)
+					cfg.Startup.AutoEtcdRestoreOnAPIFailure = true
+					apiVersionCalls := 0
+					run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+						command := name + " " + strings.Join(args, " ")
+						switch {
+						case name == "kubectl" && strings.Contains(command, "version --request-timeout=5s"):
+							apiVersionCalls++
+							if apiVersionCalls == 1 {
+								return "", errors.New("api down")
+							}
+							return "v1.31.0", nil
+						case name == "ssh" && strings.Contains(command, "systemctl cat k3s"):
+							return "", errors.New("unit read failed")
+						case name == "kubectl" && strings.Contains(command, "jsonpath={.spec.url}"):
+							return "ssh://git@scm.bstein.dev:2242/bstein/titan-iac.git", nil
+						case name == "kubectl" && strings.Contains(command, "jsonpath={.spec.ref.branch}"):
+							return "main", nil
+						default:
+							return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+				t.Run("auto-etcd-default-control-plane-selection", func(t *testing.T) {
+					cfg := lifecycleFastConfig(t)
+					cfg.Startup.AutoEtcdRestoreOnAPIFailure = true
+					apiVersionCalls := 0
+					run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+						command := name + " " + strings.Join(args, " ")
+						switch {
+						case name == "kubectl" && strings.Contains(command, "version --request-timeout=5s"):
+							apiVersionCalls++
+							if apiVersionCalls == 1 {
+								return "", errors.New("api down")
+							}
+							return "v1.31.0", nil
+						case name == "ssh" && strings.Contains(command, "systemctl cat k3s"):
+							return "", errors.New("unit read failed")
+						case name == "kubectl" && strings.Contains(command, "jsonpath={.spec.url}"):
+							return "ssh://git@scm.bstein.dev:2242/bstein/titan-iac.git", nil
+						case name == "kubectl" && strings.Contains(command, "jsonpath={.spec.ref.branch}"):
+							return "main", nil
+						default:
+							return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+						}
+					}
+					orch := newLifecycleMatrixOrchestrator(t, cfg, false, run, run, "")
+					err := orch.Startup(context.Background(), cluster.StartupOptions{Reason: "auto-etcd-default-cp"})
+					if err == nil || !strings.Contains(err.Error(), "automatic etcd restore failed") {
+						t.Fatalf("expected startup auto-etcd restore failure path, got %v", err)
+					}
+				})
+
+				t.Run("ensure-flux-branch-patch-error", func(t *testing.T) {
+					cfg := lifecycleFastConfig(t)
+					run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+						command := name + " " + strings.Join(args, " ")
+						switch {
+						case name == "kubectl" && strings.Contains(command, "version --request-timeout=5s"):
+							return "v1.31.0", nil
+						case name == "kubectl" && strings.Contains(command, "jsonpath={.spec.url}"):
+							return "ssh://git@scm.bstein.dev:2242/bstein/titan-iac.git", nil
+						case name == "kubectl" && strings.Contains(command, "jsonpath={.spec.ref.branch}"):
+							return "feature/sso", nil
+						case name == "kubectl" && strings.Contains(command, "patch gitrepository flux-system"):
+							return "", errors.New("branch patch failed")
+						default:
+							return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+						}
+					}
+					orch := newLifecycleMatrixOrchestrator(t, cfg, false, run, run, "")
+					err := orch.Startup(context.Background(), cluster.StartupOptions{
+						Reason:          "ensure-flux-branch-patch",
+						ForceFluxBranch: "main",
+					})
+					if err == nil || !strings.Contains(err.Error(), "set flux source branch") {
+						t.Fatalf("expected startup ensureFluxBranch patch error path, got %v", err)
+					}
+				})
+			})
+
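+			// Drives EtcdRestore and Shutdown into their error exits one stub
+			// at a time: datastore-mode inspection, latest-snapshot resolution,
+			// snapshot verification, restore-node start, lock acquisition, and
+			// worker discovery.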
+			t.Run("etcd-restore-and-shutdown-error-paths", func(t *testing.T) {
+				cfg := lifecycleConfig(t)
+				cfg.ControlPlanes = []string{"titan-db", "titan-24"}
+				cfg.SSHManagedNodes = []string{"titan-db", "titan-24", "titan-23"}
+				cfg.SSHNodeHosts["titan-24"] = "titan-24"
+				cfg.SSHNodeHosts["titan-23"] = "titan-23"
+
+				runEtcdModeErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+					command := name + " " + strings.Join(args, " ")
+					if name == "ssh" && strings.Contains(command, "systemctl cat k3s") {
+						return "", errors.New("unit read failed")
+					}
+					return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+				}
+				orchEtcdModeErr := newLifecycleMatrixOrchestrator(t, cfg, false, runEtcdModeErr, runEtcdModeErr, "")
+				if err := orchEtcdModeErr.EtcdRestore(context.Background(), cluster.EtcdRestoreOptions{ControlPlane: "titan-db"}); err == nil || !strings.Contains(err.Error(), "inspect k3s service on titan-db for datastore mode") {
+					t.Fatalf("expected EtcdRestore datastore-mode error branch, got %v", err)
+				}
+
+				runLatestErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+					command := name + " " + strings.Join(args, " ")
+					switch {
+					case name == "ssh" && strings.Contains(command, "systemctl cat k3s"):
+						return "ExecStart=/usr/local/bin/k3s server", nil
+					case name == "ssh" && strings.Contains(command, "etcd-snapshot ls"):
+						return "", errors.New("snapshot list failed")
+					default:
+						return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+					}
+				}
+				orchLatestErr := newLifecycleMatrixOrchestrator(t, cfg, false, runLatestErr, runLatestErr, "")
+				if err := orchLatestErr.EtcdRestore(context.Background(), cluster.EtcdRestoreOptions{ControlPlane: "titan-db"}); err == nil || !strings.Contains(err.Error(), "resolve latest etcd snapshot") {
+					t.Fatalf("expected EtcdRestore latest-snapshot error branch, got %v", err)
+				}
+
+				runVerifyErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+					command := name + " " + strings.Join(args, " ")
+					switch {
+					case name == "ssh" && strings.Contains(command, "systemctl cat k3s"):
+						return "ExecStart=/usr/local/bin/k3s server", nil
+					case name == "ssh" && strings.Contains(command, "stat -c %s"):
+						return "1", nil
+					default:
+						return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+					}
+				}
+				orchVerifyErr := newLifecycleMatrixOrchestrator(t, cfg, false, runVerifyErr, runVerifyErr, "")
+				if err := orchVerifyErr.EtcdRestore(context.Background(), cluster.EtcdRestoreOptions{
+					ControlPlane: "titan-db",
+					SnapshotPath: "/var/lib/rancher/k3s/server/db/snapshots/pre-shutdown",
+				}); err == nil || !strings.Contains(err.Error(), "snapshot too small") {
+					t.Fatalf("expected EtcdRestore verify-snapshot error branch, got %v", err)
+				}
+
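+				// Size, checksum, and snapshot-list stubs let the restore
+				// proceed through cluster-reset; only the final k3s start on
+				// the restore control plane fails.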
+				runStartErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+					command := name + " " + strings.Join(args, " ")
+					switch {
+					case name == "ssh" && strings.Contains(command, "systemctl cat k3s"):
+						return "ExecStart=/usr/local/bin/k3s server", nil
+					case name == "ssh" && strings.Contains(command, "stat -c %s"):
+						return "2097152", nil
+					case name == "ssh" && strings.Contains(command, "sha256sum"):
+						return "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", nil
+					case name == "ssh" && strings.Contains(command, "etcd-snapshot ls"):
+						return "/var/lib/rancher/k3s/server/db/snapshots/pre-shutdown", nil
+					case name == "ssh" && strings.Contains(command, "server --cluster-reset"):
+						return "", nil
+					case name == "ssh" && strings.Contains(command, "sudo systemctl start k3s || true") && strings.Contains(command, "atlas@titan-db"):
+						return "", errors.New("start failed")
+					default:
+						return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+					}
+				}
+				orchStartErr := newLifecycleMatrixOrchestrator(t, cfg, false, runStartErr, runStartErr, "")
+				if err := orchStartErr.EtcdRestore(context.Background(), cluster.EtcdRestoreOptions{
+					ControlPlane: "titan-db",
+					SnapshotPath: "/var/lib/rancher/k3s/server/db/snapshots/pre-shutdown",
+				}); err == nil || !strings.Contains(err.Error(), "failed to start k3s on restore control plane") {
+					t.Fatalf("expected EtcdRestore restore-node start error branch, got %v", err)
+				}
+
+				cfgShutdownLock := lifecycleConfig(t)
+				cfgShutdownLock.State.LockPath = t.TempDir()
+				orchShutdownLock := newLifecycleMatrixOrchestrator(t, cfgShutdownLock, false, nil, nil, "")
+				if err := orchShutdownLock.Shutdown(context.Background(), cluster.ShutdownOptions{Reason: "lock"}); err == nil {
+					t.Fatalf("expected shutdown lock acquire failure branch")
+				}
+
+				cfgShutdownInventory := lifecycleConfig(t)
+				cfgShutdownInventory.Workers = nil
+				cfgShutdownInventory.SSHManagedNodes = []string{"titan-db"}
+				cfgShutdownInventory.SSHNodeHosts = map[string]string{
+					"titan-db": "titan-db",
+				}
+				runWorkersErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
+					if name == "kubectl" && len(args) >= 2 && args[0] == "get" && args[1] == "nodes" {
+						return "", errors.New("discover workers failed")
+					}
+					return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
+				}
+				orchWorkersErr := newLifecycleMatrixOrchestrator(t, cfgShutdownInventory, false, runWorkersErr, runWorkersErr, "")
+				if err := orchWorkersErr.Shutdown(context.Background(), cluster.ShutdownOptions{Reason: "workers"}); err == nil || !strings.Contains(err.Error(), "discover workers") {
+					t.Fatalf("expected shutdown effectiveWorkers error branch, got %v", err)
+				}
+			})
+		})
+	})
+}