package orchestrator

import (
	"context"
	"encoding/base64"
	"errors"
	"io"
	"log"
	"net"
	"os"
	"path/filepath"
	"strings"
	"testing"
	"time"

	"scm.bstein.dev/bstein/ananke/internal/cluster"
	"scm.bstein.dev/bstein/ananke/internal/config"
	"scm.bstein.dev/bstein/ananke/internal/execx"
	"scm.bstein.dev/bstein/ananke/internal/state"
)

// newHookOrchestratorAdvanced runs one orchestration or CLI step.
// Signature: newHookOrchestratorAdvanced(t *testing.T, cfg config.Config, dryRun bool, run commandOverride, runSensitive commandOverride) (*cluster.Orchestrator, *commandRecorder).
// Why: this part10 matrix needs dry-run and non-dry-run variants while keeping
// command dispatch deterministic from the top-level testing module. Either
// override may be nil, in which case a shared recording lifecycle dispatcher
// is substituted; the returned recorder captures commands only for those
// defaulted slots.
func newHookOrchestratorAdvanced(
	t *testing.T,
	cfg config.Config,
	dryRun bool,
	run func(context.Context, time.Duration, string, ...string) (string, error),
	runSensitive func(context.Context, time.Duration, string, ...string) (string, error),
) (*cluster.Orchestrator, *commandRecorder) {
	t.Helper()
	// The state directory must exist before the orchestrator writes run history.
	if err := os.MkdirAll(cfg.State.Dir, 0o755); err != nil {
		t.Fatalf("ensure state dir: %v", err)
	}
	rec := &commandRecorder{}
	if run == nil || runSensitive == nil {
		// One shared dispatcher so both defaulted slots feed the same recorder.
		fallback := lifecycleDispatcher(rec)
		if run == nil {
			run = fallback
		}
		if runSensitive == nil {
			runSensitive = fallback
		}
	}
	runner := &execx.Runner{DryRun: dryRun}
	history := state.New(cfg.State.RunHistoryPath)
	quiet := log.New(io.Discard, "", 0) // silence orchestrator logging in tests
	orch := cluster.New(cfg, runner, history, quiet)
	orch.SetCommandOverrides(run, runSensitive)
	return orch, rec
}

// TestHookGapMatrixPart10LowFileClosure runs one orchestration or CLI step.
// Signature: TestHookGapMatrixPart10LowFileClosure(t *testing.T).
// Why: closes remaining branch gaps on low-coverage orchestrator files using
// targeted hook-level scenarios instead of brittle full-drill reruns.
func TestHookGapMatrixPart10LowFileClosure(t *testing.T) {
	// Vault-critical branches: sealed-status parsing, unseal-key decoding,
	// unseal-key file writes, workload readiness, and unseal follow-up errors.
	t.Run("critical-vault-low-branches", func(t *testing.T) {
		// Malformed `vault status` JSON must surface as a parse error.
		t.Run("vault-sealed-parse-error", func(t *testing.T) {
			cfg := lifecycleConfig(t)
			run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
				command := name + " " + strings.Join(args, " ")
				if name == "kubectl" && strings.Contains(command, "vault status -format=json") {
					// Deliberately truncated JSON to force the parse branch.
					return "{invalid-json", nil
				}
				// All other commands fall through to the canned lifecycle dispatcher.
				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
			}
			orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
			if _, err := orch.TestHookVaultSealed(context.Background()); err == nil || !strings.Contains(err.Error(), "parse vault status") {
				t.Fatalf("expected vault status parse error branch, got %v", err)
			}
		})
		// A vault-init secret whose base64 payload decodes to whitespace only
		// must be rejected as an empty unseal key.
		t.Run("vault-unseal-key-empty-decoded", func(t *testing.T) {
			cfg := lifecycleConfig(t)
			cfg.Startup.VaultUnsealBreakglassCommand = ""
			// Base64 of a single space: valid encoding, empty after trimming.
			encodedEmpty := base64.StdEncoding.EncodeToString([]byte(" "))
			run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
				command := name + " " + strings.Join(args, " ")
				if name == "kubectl" && strings.Contains(command, "get secret vault-init") {
					return encodedEmpty, nil
				}
				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
			}
			orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
			if _, err := orch.TestHookVaultUnsealKey(context.Background()); err == nil || !strings.Contains(err.Error(), "vault-init unseal key is empty") {
				t.Fatalf("expected empty decoded unseal key branch, got %v", err)
			}
		})
		// Pointing the unseal-key file at an existing directory forces the
		// write-error branch without any filesystem permission tricks.
		t.Run("write-unseal-key-file-write-error", func(t *testing.T) {
			cfg := lifecycleConfig(t)
			cfg.Startup.VaultUnsealKeyFile = t.TempDir()
			orch, _ := newHookOrchestratorAdvanced(t, cfg, false, nil, nil)
			if err := orch.TestHookWriteVaultUnsealKeyFile("vault-key"); err == nil || !strings.Contains(err.Error(), "write vault unseal key file") {
				t.Fatalf("expected write failure branch when key path is a directory, got %v", err)
			}
		})
		// Covers: readyReplicas jsonpath returning no value (not ready, no
		// error), and a rollout-status failure during critical-workload startup.
		t.Run("workload-ready-no-value-and-ensure-error", func(t *testing.T) {
			cfg := lifecycleConfig(t)
			runNoValue := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
				command := name + " " + strings.Join(args, " ")
				if name == "kubectl" && strings.Contains(command, "jsonpath={.status.readyReplicas}") {
					// Empty output: readiness cannot be confirmed.
					return "", nil
				}
				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
			}
			orchNoValue, _ := newHookOrchestratorAdvanced(t, cfg, false, runNoValue, runNoValue)
			ready, err := orchNoValue.TestHookWorkloadReady(context.Background(), "vault", "statefulset", "vault")
			if err != nil || ready {
				t.Fatalf("expected no-value readiness branch, ready=%v err=%v", ready, err)
			}
			runEnsureErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
				command := name + " " + strings.Join(args, " ")
				switch {
				case name == "kubectl" && strings.Contains(command, " scale "):
					return "", nil
				case name == "kubectl" && strings.Contains(command, "get pods -o custom-columns"):
					return "", nil
				case name == "kubectl" && strings.Contains(command, "rollout status statefulset/victoria-metrics-single-server"):
					// The only failing command: the wait step should propagate it.
					return "", errors.New("rollout failed")
				default:
					return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
				}
			}
			orchEnsureErr, _ := newHookOrchestratorAdvanced(t, cfg, false, runEnsureErr, runEnsureErr)
			if err := orchEnsureErr.TestHookEnsureCriticalStartupWorkloads(context.Background()); err == nil || !strings.Contains(err.Error(), "rollout failed") {
				t.Fatalf("expected ensureCriticalStartupWorkloads wait error branch, got %v", err)
			}
		})
		// Covers: the vault-0 pod-phase guard (Pending pod), and a vault status
		// check that fails after the first (sealed) response.
		t.Run("ensure-vault-unsealed-phase-and-followup-errors", func(t *testing.T) {
			cfgPhase := lifecycleConfig(t)
			runPhase := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
				command := name + " " + strings.Join(args, " ")
				if name == "kubectl" && strings.Contains(command, "get pod vault-0") {
					return "Pending", nil
				}
				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
			}
			orchPhase, _ := newHookOrchestratorAdvanced(t, cfgPhase, false, runPhase, runPhase)
			if err := orchPhase.TestHookEnsureVaultUnsealed(context.Background()); err == nil || !strings.Contains(err.Error(), "pod phase") {
				t.Fatalf("expected pod phase guard branch, got %v", err)
			}
			cfgFollowup := lifecycleConfig(t)
			// Stateful stub: first status call reports sealed (triggering an
			// unseal attempt), every later call errors out.
			sealedCalls := 0
			runFollowup := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
				command := name + " " + strings.Join(args, " ")
				switch {
				case name == "kubectl" && strings.Contains(command, "get pod vault-0"):
					return "Running", nil
				case name == "kubectl" && strings.Contains(command, "get secret vault-init"):
					return base64.StdEncoding.EncodeToString([]byte("vault-key")), nil
				case name == "kubectl" && strings.Contains(command, "vault status -format=json"):
					sealedCalls++
					if sealedCalls == 1 {
						return `{"sealed":true}`, nil
					}
					return "", errors.New("vault status failed")
				default:
					return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
				}
			}
			// Sensitive override only handles the unseal command; everything
			// else is delegated back to runFollowup (sharing sealedCalls).
			runSensitive := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
				command := name + " " + strings.Join(args, " ")
				if name == "kubectl" && strings.Contains(command, "vault operator unseal") {
					return "ok", nil
				}
				return runFollowup(ctx, timeout, name, args...)
			}
			orchFollowup, _ := newHookOrchestratorAdvanced(t, cfgFollowup, false, runFollowup, runSensitive)
			if err := orchFollowup.TestHookEnsureVaultUnsealed(context.Background()); err == nil || !strings.Contains(err.Error(), "vault status check failed") {
				t.Fatalf("expected follow-up sealed status error branch, got %v", err)
			}
		})
	})
	// Drain / scaling branches: error aggregation, SSH fan-out clamping,
	// etcd snapshot listing, worker fallback, and snapshot write failures.
	t.Run("drain-and-scaling-low-branches", func(t *testing.T) {
		// Every cordon/drain fails across 5 workers; the aggregate error must
		// report all 5, and the diagnostics pod listing (with one malformed
		// line) must be tolerated.
		t.Run("drain-workers-error-aggregation-and-diagnostics", func(t *testing.T) {
			cfg := lifecycleConfig(t)
			cfg.Shutdown.DrainParallelism = 0
			workers := []string{"w1", "w2", "w3", "w4", "w5"}
			run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
				command := name + " " + strings.Join(args, " ")
				switch {
				case name == "kubectl" && strings.Contains(command, "cordon "):
					return "", errors.New("cordon denied")
				case name == "kubectl" && strings.Contains(command, "drain "):
					return "", errors.New("drain denied")
				case name == "kubectl" && strings.Contains(command, "get pods -A --field-selector"):
					// First row is intentionally malformed (too few columns).
					return strings.Join([]string{
						"malformed",
						"ns p1 Running Deployment",
						"ns p2 Running Deployment",
						"ns p3 Running Deployment",
						"ns p4 Running Deployment",
						"ns p5 Running Deployment",
						"ns p6 Running Deployment",
						"ns p7 Running Deployment",
					}, "\n"), nil
				default:
					return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
				}
			}
			orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
			err := orch.TestHookDrainWorkers(context.Background(), workers)
			if err == nil || !strings.Contains(err.Error(), "drain workers had 5 errors") {
				t.Fatalf("expected drain aggregation branch, got %v", err)
			}
		})
		// Oversized parallelism plus one unmanaged node: only the managed node
		// should actually be reached over SSH.
		t.Run("run-ssh-across-nodes-clamp-and-skip", func(t *testing.T) {
			cfg := lifecycleConfig(t)
			cfg.Shutdown.SSHParallelism = 99
			cfg.SSHManagedNodes = []string{"titan-db"}
			rec := &commandRecorder{}
			base := lifecycleDispatcher(rec)
			orch, _ := newHookOrchestratorAdvanced(t, cfg, false, base, base)
			orch.TestHookRunSSHAcrossNodes(context.Background(), []string{"titan-db", "not-managed"}, "noop", "echo ok")
			if !rec.contains("atlas@titan-db echo ok") {
				t.Fatalf("expected managed ssh execution branch")
			}
		})
		// A snapshot listing that contains only the header row must yield the
		// "no etcd snapshots found" error.
		t.Run("latest-etcd-snapshot-empty-list", func(t *testing.T) {
			cfg := lifecycleConfig(t)
			run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
				command := name + " " + strings.Join(args, " ")
				if name == "ssh" && strings.Contains(command, "etcd-snapshot ls") {
					return "NAME LOCATION", nil
				}
				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
			}
			orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
			if _, err := orch.TestHookLatestEtcdSnapshotPath(context.Background(), "titan-db"); err == nil || !strings.Contains(err.Error(), "no etcd snapshots found") {
				t.Fatalf("expected empty snapshot-list branch, got %v", err)
			}
		})
		// Covers: worker discovery falling back to the SSH inventory when the
		// API is down, and the scaled-workload snapshot write failing.
		t.Run("scaling-fallback-and-snapshot-writer-errors", func(t *testing.T) {
			cfg := lifecycleConfig(t)
			cfg.Workers = nil
			// Inventory includes a control plane and a blank entry, both of
			// which must be filtered out of the fallback worker list.
			cfg.SSHManagedNodes = []string{"titan-db", "titan-23", " "}
			cfg.ControlPlanes = []string{"titan-db"}
			runWorkers := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
				if name == "kubectl" && len(args) >= 2 && args[0] == "get" && args[1] == "nodes" {
					return "", errors.New("api down")
				}
				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
			}
			orchWorkers, _ := newHookOrchestratorAdvanced(t, cfg, false, runWorkers, runWorkers)
			workers, err := orchWorkers.TestHookEffectiveWorkers(context.Background())
			if err != nil || len(workers) == 0 {
				t.Fatalf("expected inventory worker fallback branch, workers=%v err=%v", workers, err)
			}
			cfgWrite := lifecycleConfig(t)
			cfgWrite.State.Dir = t.TempDir()
			// Force write failure by making the snapshot output path a directory.
			if err := os.MkdirAll(filepath.Join(cfgWrite.State.Dir, "scaled-workloads.json"), 0o755); err != nil {
				t.Fatalf("create snapshot blocker directory: %v", err)
			}
			runWrite := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
				command := name + " " + strings.Join(args, " ")
				switch {
				case name == "kubectl" && strings.Contains(command, "get deployment -A -o jsonpath="):
					return "monitoring\tgrafana\t1\n", nil
				case name == "kubectl" && strings.Contains(command, "get statefulset -A -o jsonpath="):
					return "", nil
				default:
					return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
				}
			}
			orchWrite, _ := newHookOrchestratorAdvanced(t, cfgWrite, false, runWrite, runWrite)
			if err := orchWrite.TestHookScaleDownApps(context.Background()); err == nil || !strings.Contains(err.Error(), "write scaled workload snapshot") {
				t.Fatalf("expected scaled snapshot write-failure branch, got %v", err)
			}
		})
	})
	// Flux / ingress / service-checklist / inventory-reachability branches.
	t.Run("flux-ingress-service-and-inventory-low-branches", func(t *testing.T) {
		// A not-ready kustomization with empty reason/message must still report
		// ready=false, and a failing jobs query must propagate from the healer.
		t.Run("flux-health-ready-reason-fallback-and-heal-query-error", func(t *testing.T) {
			cfg := lifecycleConfig(t)
			runReady := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
				command := name + " " + strings.Join(args, " ")
				switch {
				case name == "kubectl" && strings.Contains(command, "get kustomizations.kustomize.toolkit.fluxcd.io -A -o json"):
					// First item has an empty namespace (skipped); second is
					// not Ready with blank reason and message.
					return `{"items":[{"metadata":{"namespace":"","name":"skip"},"spec":{"suspend":false},"status":{"conditions":[]}},{"metadata":{"namespace":"flux-system","name":"apps"},"spec":{"suspend":false},"status":{"conditions":[{"type":"Ready","status":"False","reason":"","message":""}]}}]}`, nil
				case name == "kubectl" && strings.Contains(command, "get jobs -A -o json"):
					return "", errors.New("jobs query failed")
				default:
					return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
				}
			}
			orchReady, _ := newHookOrchestratorAdvanced(t, cfg, false, runReady, runReady)
			ready, detail, err := orchReady.TestHookFluxHealthReady(context.Background())
			if err != nil || ready || !strings.Contains(detail, "ready=false") {
				t.Fatalf("expected flux ready-reason fallback branch, ready=%v detail=%q err=%v", ready, detail, err)
			}
			if _, err := orchReady.TestHookHealImmutableFluxJobs(context.Background()); err == nil || !strings.Contains(err.Error(), "query jobs") {
				t.Fatalf("expected immutable-job query error branch, got %v", err)
			}
		})
		// Immutable-field failure triggers the heal path; a pre-canceled
		// context must end the wait with context.Canceled.
		t.Run("wait-for-flux-health-immutable-heal-path-with-cancel", func(t *testing.T) {
			cfg := lifecycleConfig(t)
			cfg.Startup.FluxHealthWaitSeconds = 1
			cfg.Startup.FluxHealthPollSeconds = 1
			run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
				command := name + " " + strings.Join(args, " ")
				switch {
				case name == "kubectl" && strings.Contains(command, "get kustomizations.kustomize.toolkit.fluxcd.io -A -o json"):
					return `{"items":[{"metadata":{"namespace":"flux-system","name":"apps"},"spec":{"suspend":false},"status":{"conditions":[{"type":"Ready","status":"False","message":"Job failed: field is immutable","reason":"ReconciliationFailed"}]}}]}`, nil
				case name == "kubectl" && strings.Contains(command, "get jobs -A -o json"):
					return "", errors.New("jobs unavailable")
				default:
					return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
				}
			}
			orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
			ctx, cancel := context.WithCancel(context.Background())
			cancel() // cancel up front so the wait loop exits immediately
			if err := orch.TestHookWaitForFluxHealth(ctx); !errors.Is(err, context.Canceled) {
				t.Fatalf("expected canceled flux wait branch, got %v", err)
			}
		})
		// Empty label maps and whitespace-only values are skipped; only the
		// node with a real label value should be labeled.
		t.Run("required-node-labels-skip-empty-and-apply-one", func(t *testing.T) {
			cfg := lifecycleConfig(t)
			cfg.Startup.RequiredNodeLabels = map[string]map[string]string{
				"node-empty-map": {},
				"node-empty-val": {"zone": " "},
				"node-apply":     {"zone": "lab-a"},
			}
			rec := &commandRecorder{}
			base := lifecycleDispatcher(rec)
			orch, _ := newHookOrchestratorAdvanced(t, cfg, false, base, base)
			if err := orch.TestHookEnsureRequiredNodeLabels(context.Background()); err != nil {
				t.Fatalf("expected ensureRequiredNodeLabels skip/apply branches, got %v", err)
			}
			if !rec.contains("label node node-apply --overwrite zone=lab-a") {
				t.Fatalf("expected non-empty label application branch")
			}
		})
		// Dry-run must short-circuit convergence; with only the critical
		// endpoint requirement enabled, an endpoint query error must fail it.
		t.Run("wait-for-startup-convergence-dryrun-and-critical-endpoint-fail", func(t *testing.T) {
			cfgDry := lifecycleConfig(t)
			orchDry, _ := newHookOrchestratorAdvanced(t, cfgDry, true, nil, nil)
			if err := orchDry.TestHookWaitForStartupConvergence(context.Background()); err != nil {
				t.Fatalf("expected startup convergence dry-run fast path, got %v", err)
			}
			cfgFail := lifecycleConfig(t)
			// Disable every other gate so only the endpoint check runs.
			cfgFail.Startup.RequireIngressChecklist = false
			cfgFail.Startup.RequireServiceChecklist = false
			cfgFail.Startup.RequireFluxHealth = false
			cfgFail.Startup.RequireWorkloadConvergence = false
			cfgFail.Startup.RequireCriticalServiceEndpoints = true
			run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
				command := name + " " + strings.Join(args, " ")
				if name == "kubectl" && strings.Contains(command, "get endpoints") {
					return "", errors.New("endpoint query failed")
				}
				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
			}
			orchFail, _ := newHookOrchestratorAdvanced(t, cfgFail, false, run, run)
			if err := orchFail.TestHookWaitForStartupConvergence(context.Background()); err == nil || !strings.Contains(err.Error(), "query endpoints") {
				t.Fatalf("expected critical-endpoint convergence failure branch, got %v", err)
			}
		})
		// A blank host short-circuits discovery; an ingress query error must
		// propagate as "query ingresses".
		t.Run("ingress-namespace-discovery-empty-and-query-error", func(t *testing.T) {
			cfg := lifecycleConfig(t)
			orchEmpty, _ := newHookOrchestratorAdvanced(t, cfg, false, nil, nil)
			namespaces, err := orchEmpty.TestHookDiscoverIngressNamespacesForHost(context.Background(), " ")
			if err != nil || len(namespaces) != 0 {
				t.Fatalf("expected empty-host fast path, namespaces=%v err=%v", namespaces, err)
			}
			runErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
				command := name + " " + strings.Join(args, " ")
				if name == "kubectl" && strings.Contains(command, "get ingress -A -o json") {
					return "", errors.New("ingress query failed")
				}
				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
			}
			orchErr, _ := newHookOrchestratorAdvanced(t, cfg, false, runErr, runErr)
			if _, err := orchErr.TestHookDiscoverIngressNamespacesForHost(context.Background(), "metrics.bstein.dev"); err == nil || !strings.Contains(err.Error(), "query ingresses") {
				t.Fatalf("expected ingress query error branch, got %v", err)
			}
		})
		// A nameless checklist entry should be reported by URL; a real local
		// listener answering 503 provides the failing probe.
		t.Run("service-checklist-name-fallback-and-contains-empty-marker", func(t *testing.T) {
			listener, err := net.Listen("tcp", "127.0.0.1:0")
			if err != nil {
				t.Fatalf("open local listener: %v", err)
			}
			defer listener.Close()
			// One-shot HTTP responder: always 503, so the check cannot pass.
			go func() {
				conn, acceptErr := listener.Accept()
				if acceptErr == nil {
					_, _ = conn.Write([]byte("HTTP/1.1 503 Service Unavailable\r\nContent-Length: 3\r\n\r\nbad"))
					_ = conn.Close()
				}
			}()
			cfg := lifecycleConfig(t)
			cfg.Startup.ServiceChecklist = []config.ServiceChecklistCheck{{
				Name:             "",
				URL:              "http://" + listener.Addr().String() + "/health",
				AcceptedStatuses: []int{200},
			}}
			orch, _ := newHookOrchestratorAdvanced(t, cfg, false, nil, nil)
			ready, detail := orch.TestHookServiceChecklistReady(context.Background())
			if ready || !strings.Contains(detail, "http://") {
				t.Fatalf("expected service checklist URL-name fallback failure, ready=%v detail=%q", ready, detail)
			}
			// A whitespace-only marker is treated as trivially contained.
			if !cluster.TestHookChecklistContains("any body", " ") {
				t.Fatalf("expected empty-marker compact contains branch")
			}
		})
		// Zeroed wait/poll settings take defaults; a pre-canceled context must
		// end the reachability wait with context.Canceled.
		t.Run("node-inventory-reachability-defaults-and-cancel", func(t *testing.T) {
			cfg := lifecycleConfig(t)
			cfg.Startup.RequireNodeInventoryReach = true
			cfg.Startup.NodeInventoryReachWaitSeconds = 0
			cfg.Startup.NodeInventoryReachPollSeconds = 0
			cfg.Startup.IgnoreUnavailableNodes = []string{"titan-23"}
			run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
				command := name + " " + strings.Join(args, " ")
				if name == "ssh" && strings.Contains(command, "__ANANKE_NODE_REACHABLE__") {
					// Wrong sentinel output, so the node never counts as reachable.
					return "unexpected", nil
				}
				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
			}
			orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
			ctx, cancel := context.WithCancel(context.Background())
			cancel()
			if err := orch.TestHookWaitForNodeInventoryReachability(ctx); !errors.Is(err, context.Canceled) {
				t.Fatalf("expected canceled inventory reachability branch, got %v", err)
			}
		})
	})
	// Post-start probe, flux-resume, time-sync, and workload-convergence branches.
	t.Run("poststart-timesync-workload-branches", func(t *testing.T) {
		// Zeroed probe settings take defaults; a pre-canceled context ends the
		// probe wait with context.Canceled even while curl keeps failing.
		t.Run("poststart-default-poll-and-cancel", func(t *testing.T) {
			cfg := lifecycleConfig(t)
			cfg.Startup.PostStartProbes = []string{"https://metrics.bstein.dev/api/health"}
			cfg.Startup.PostStartProbeWaitSeconds = 0
			cfg.Startup.PostStartProbePollSeconds = 0
			run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
				if name == "curl" {
					return "", errors.New("curl failed")
				}
				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
			}
			orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
			ctx, cancel := context.WithCancel(context.Background())
			cancel()
			if err := orch.TestHookWaitForPostStartProbes(ctx); !errors.Is(err, context.Canceled) {
				t.Fatalf("expected canceled post-start branch, got %v", err)
			}
		})
		// A fake flux binary on PATH makes the command-exists check pass; the
		// reconcile failure itself must be warning-only (nil error).
		t.Run("resume-flux-with-commandexists-and-flux-fail-warning-path", func(t *testing.T) {
			cfg := lifecycleConfig(t)
			fakeBin := t.TempDir()
			fluxPath := filepath.Join(fakeBin, "flux")
			if err := os.WriteFile(fluxPath, []byte("#!/bin/sh\nexit 0\n"), 0o755); err != nil {
				t.Fatalf("write fake flux binary: %v", err)
			}
			// Prepend the fake bin dir so `flux` is discoverable on PATH.
			t.Setenv("PATH", fakeBin+string(os.PathListSeparator)+os.Getenv("PATH"))
			run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
				if name == "flux" {
					return "", errors.New("flux reconcile failed")
				}
				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
			}
			orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
			if err := orch.TestHookResumeFluxAndReconcile(context.Background()); err != nil {
				t.Fatalf("expected resume flux warning-only branch, got %v", err)
			}
		})
		// Quorum mode with zeroed wait/poll/quorum settings; both the time-sync
		// wait and the external-datastore preflight must honor cancellation.
		t.Run("timesync-defaults-quorum-and-datastore-cancel", func(t *testing.T) {
			cfg := lifecycleConfig(t)
			cfg.Startup.TimeSyncMode = "quorum"
			cfg.Startup.TimeSyncWaitSeconds = 0
			cfg.Startup.TimeSyncPollSeconds = 0
			cfg.Startup.TimeSyncQuorum = 0
			cfg.SSHManagedNodes = []string{"titan-db"}
			// NOTE(review): assumes lifecycleConfig returns a non-nil
			// SSHNodeHosts map — confirm against the helper.
			cfg.SSHNodeHosts["db-host"] = "127.0.0.1"
			run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
				command := name + " " + strings.Join(args, " ")
				switch {
				case name == "sh" && strings.Contains(command, "timedatectl show -p NTPSynchronized"):
					// Local node reports unsynchronized.
					return "no", nil
				case name == "ssh" && strings.Contains(command, "timedatectl show -p NTPSynchronized"):
					return "", errors.New("ssh timed out")
				case name == "ssh" && strings.Contains(command, "systemctl cat k3s"):
					// k3s unit advertises an external (postgres) datastore.
					return "ExecStart=/usr/local/bin/k3s server --datastore-endpoint=postgres://127.0.0.1/k3s", nil
				default:
					return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
				}
			}
			orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
			ctx, cancel := context.WithCancel(context.Background())
			cancel()
			if err := orch.TestHookWaitForTimeSync(ctx, []string{"", "titan-db"}); !errors.Is(err, context.Canceled) {
				t.Fatalf("expected canceled time-sync branch, got %v", err)
			}
			if err := orch.TestHookPreflightExternalDatastore(ctx); !errors.Is(err, context.Canceled) {
				t.Fatalf("expected canceled datastore preflight branch, got %v", err)
			}
		})
		// Convergence must tolerate: empty-kind items, zero-replica
		// deployments, daemonsets pinned to ignored nodes, and stuck pods; the
		// auto-heal hooks are also exercised in dry-run plus a query-error path.
		t.Run("workload-convergence-and-ignore-quick-branches", func(t *testing.T) {
			cfg := lifecycleConfig(t)
			cfg.Startup.WorkloadConvergenceWaitSeconds = 0
			cfg.Startup.WorkloadConvergencePollSeconds = 0
			cfg.Startup.IgnoreUnavailableNodes = []string{"titan-22"}
			run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
				command := name + " " + strings.Join(args, " ")
				switch {
				case name == "kubectl" && strings.Contains(command, "get deploy,statefulset,daemonset -A -o json"):
					return `{"items":[{"kind":"","metadata":{"namespace":"monitoring","name":"skip"}},{"kind":"Deployment","metadata":{"namespace":"monitoring","name":"replicas-zero"},"spec":{"replicas":0,"template":{"spec":{}}},"status":{"readyReplicas":0}},{"kind":"DaemonSet","metadata":{"namespace":"apps","name":"on-ignored"},"spec":{"template":{"spec":{"nodeSelector":{"kubernetes.io/hostname":"titan-22"}}}},"status":{"desiredNumberScheduled":1,"numberReady":0}}]}`, nil
				case name == "kubectl" && strings.Contains(command, "get pods -A -o json"):
					return `{"items":[{"metadata":{"namespace":"","name":"skip"}},{"metadata":{"namespace":"apps","name":"stuck","creationTimestamp":"2000-01-01T00:00:00Z","ownerReferences":[{"kind":"ReplicaSet"}]},"status":{"phase":"Pending","containerStatuses":[{"state":{"waiting":{"reason":"CrashLoopBackOff"}}}]}}]}`, nil
				default:
					return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
				}
			}
			orch, _ := newHookOrchestratorAdvanced(t, cfg, false, run, run)
			if err := orch.TestHookWaitForWorkloadConvergence(context.Background()); err != nil {
				t.Fatalf("expected workload convergence default-branch success, got %v", err)
			}
			cfgIgnore := lifecycleConfig(t)
			cfgIgnore.Startup.AutoRecycleStuckPods = false
			orchIgnoreDry, _ := newHookOrchestratorAdvanced(t, cfgIgnore, true, run, run)
			// Backdated timestamp so the interval guard does not skip the hooks.
			now := time.Now().UTC().Add(-time.Hour)
			orchIgnoreDry.TestHookMaybeAutoRecycleStuckPods(context.Background(), &now)
			orchIgnoreDry.TestHookMaybeAutoHealCriticalWorkloadReplicas(context.Background(), &now)
			runHealErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
				command := name + " " + strings.Join(args, " ")
				if name == "kubectl" && strings.Contains(command, "get deploy,statefulset -A -o json") {
					return "", errors.New("workload query failed")
				}
				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
			}
			orchHealErr, _ := newHookOrchestratorAdvanced(t, lifecycleConfig(t), false, runHealErr, runHealErr)
			if _, err := orchHealErr.TestHookHealCriticalWorkloadReplicas(context.Background()); err == nil || !strings.Contains(err.Error(), "query workloads") {
				t.Fatalf("expected critical workload heal query-error branch, got %v", err)
			}
		})
	})
}