package cluster import ( "context" "errors" "io" "log" "net" "path/filepath" "strings" "sync" "testing" "time" "scm.bstein.dev/bstein/ananke/internal/config" "scm.bstein.dev/bstein/ananke/internal/execx" "scm.bstein.dev/bstein/ananke/internal/state" ) type commandStub struct { match func(name string, args []string) bool out string err error } // buildOrchestratorWithStubs runs one orchestration or CLI step. // Signature: buildOrchestratorWithStubs(t *testing.T, cfg config.Config, stubs []commandStub) *Orchestrator. // Why: helper centralizes deterministic command dispatch for fast, isolated unit tests. func buildOrchestratorWithStubs(t *testing.T, cfg config.Config, stubs []commandStub) *Orchestrator { t.Helper() if cfg.State.Dir == "" { cfg.State.Dir = t.TempDir() } if cfg.State.ReportsDir == "" { cfg.State.ReportsDir = filepath.Join(cfg.State.Dir, "reports") } if cfg.State.RunHistoryPath == "" { cfg.State.RunHistoryPath = filepath.Join(cfg.State.Dir, "runs.json") } orch := &Orchestrator{ cfg: cfg, runner: &execx.Runner{}, store: state.New(cfg.State.RunHistoryPath), log: log.New(io.Discard, "", 0), } dispatch := func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) { for _, stub := range stubs { if stub.match(name, args) { return stub.out, stub.err } } return "", nil } orch.runOverride = dispatch orch.runSensitiveOverride = dispatch return orch } // matchContains runs one orchestration or CLI step. // Signature: matchContains(cmd string, parts ...string) func(string, []string) bool. // Why: concise substring matching keeps command stubs readable across many tests. func matchContains(cmd string, parts ...string) func(string, []string) bool { return func(name string, args []string) bool { if name != cmd { return false } joined := strings.Join(args, " ") for _, part := range parts { if !strings.Contains(joined, part) { return false } } return true } } // TestStartupEarlyFailureLeavesFluxSuspensionUnchanged runs one orchestration or CLI step. // Signature: TestStartupEarlyFailureLeavesFluxSuspensionUnchanged(t *testing.T). // Why: recovery must not release Flux when bootstrap fails before storage and // critical workloads are ready, or Flux can re-create the same dependency loop. func TestStartupEarlyFailureLeavesFluxSuspensionUnchanged(t *testing.T) { tmpDir := t.TempDir() cfg := config.Config{ SSHPort: 2277, Startup: config.Startup{ APIWaitSeconds: 1, APIPollSeconds: 1, RequireNodeInventoryReach: false, RequireTimeSync: false, RequireNodeSSHAuth: false, ReconcileAccessOnBoot: false, AutoEtcdRestoreOnAPIFailure: false, RequiredNodeLabels: map[string]map[string]string{ "titan-missing": { "node-role.kubernetes.io/worker": "true", }, }, }, State: config.State{ Dir: tmpDir, ReportsDir: filepath.Join(tmpDir, "reports"), RunHistoryPath: filepath.Join(tmpDir, "runs.json"), LockPath: filepath.Join(tmpDir, "ananke.lock"), IntentPath: filepath.Join(tmpDir, "intent.json"), }, } var mu sync.Mutex calls := []string{} orch := buildOrchestratorWithStubs(t, cfg, []commandStub{ { match: func(name string, args []string) bool { mu.Lock() calls = append(calls, name+" "+strings.Join(args, " ")) mu.Unlock() return false }, }, {match: matchContains("kubectl", "version", "--request-timeout=5s"), out: "ok"}, {match: matchContains("kubectl", "-n", "vault", "get", "pod", "vault-0"), out: "Pending"}, { match: matchContains("kubectl", "label", "node", "titan-missing"), err: errors.New(`nodes "titan-missing" not found`), }, }) err := orch.Startup(context.Background(), StartupOptions{Reason: "test early failure"}) if err == nil { t.Fatalf("expected startup to fail before flux resume") } if !strings.Contains(err.Error(), "ensure required node labels on titan-missing") { t.Fatalf("expected required-label failure, got: %v", err) } mu.Lock() defer mu.Unlock() for _, call := range calls { if strings.Contains(call, `"suspend":false`) { t.Fatalf("early failed startup unexpectedly resumed flux via call: %s", call) } } } // TestRecycleStuckControllerPodsHandlesLonghornAttachBlockedPods runs one orchestration or CLI step. // Signature: TestRecycleStuckControllerPodsHandlesLonghornAttachBlockedPods(t *testing.T). // Why: Pending Longhorn-backed pods on Longhorn-unready nodes should be // rescheduled without mutating Longhorn volume, replica, or disk objects. func TestRecycleStuckControllerPodsHandlesLonghornAttachBlockedPods(t *testing.T) { created := time.Now().Add(-10 * time.Minute).UTC().Format(time.RFC3339) lastSeen := time.Now().UTC().Format(time.RFC3339) pods := `{"items":[{"metadata":{"namespace":"monitoring","name":"victoria-metrics-single-server-0","creationTimestamp":"` + created + `","ownerReferences":[{"kind":"StatefulSet","name":"victoria-metrics-single-server"}]},"spec":{"nodeName":"titan-0b"},"status":{"phase":"Pending"}}]}` events := `{"items":[{"metadata":{"namespace":"monitoring","creationTimestamp":"` + lastSeen + `"},"involvedObject":{"kind":"Pod","namespace":"monitoring","name":"victoria-metrics-single-server-0"},"type":"Warning","reason":"FailedAttachVolume","message":"AttachVolume.Attach failed for volume \"pvc-1\" : rpc error from [http://longhorn-backend:9500/v1/volumes/pvc-1?action=attach]: unable to attach volume pvc-1 to titan-0b: node titan-0b is not ready","lastTimestamp":"` + lastSeen + `"}]}` deleted := false orch := buildOrchestratorWithStubs(t, config.Config{ Startup: config.Startup{StuckPodGraceSeconds: 180}, }, []commandStub{ {match: matchContains("kubectl", "get", "pods", "-A", "-o", "json"), out: pods}, {match: matchContains("kubectl", "-n", "longhorn-system", "get", "nodes.longhorn.io"), out: "titan-0b\tFalse\n"}, {match: matchContains("kubectl", "get", "events", "-A", "-o", "json"), out: events}, {match: matchContains("kubectl", "get", "nodes", "-o", "json"), out: `{"items":[{"metadata":{"name":"titan-0b"},"status":{"conditions":[{"type":"Ready","status":"True"}]}}]}`}, { match: func(name string, args []string) bool { if !matchContains("kubectl", "-n", "monitoring", "delete", "pod", "victoria-metrics-single-server-0", "--wait=false")(name, args) { return false } deleted = true return true }, }, }) if err := orch.recycleStuckControllerPods(context.Background()); err != nil { t.Fatalf("recycleStuckControllerPods failed: %v", err) } if !deleted { t.Fatalf("expected longhorn attach-blocked pending pod to be recycled") } } // TestRecycleStuckControllerPodsRepairsEncryptedVolumeCryptsetup runs one orchestration or CLI step. // Signature: TestRecycleStuckControllerPodsRepairsEncryptedVolumeCryptsetup(t *testing.T). // Why: encrypted Longhorn PVC recovery should repair missing host cryptsetup and // then recycle the blocked pod without touching Longhorn data-plane objects. func TestRecycleStuckControllerPodsRepairsEncryptedVolumeCryptsetup(t *testing.T) { created := time.Now().Add(-10 * time.Minute).UTC().Format(time.RFC3339) lastSeen := time.Now().UTC().Format(time.RFC3339) pods := `{"items":[{"metadata":{"namespace":"finance","name":"actual-budget-abc","creationTimestamp":"` + created + `","ownerReferences":[{"kind":"ReplicaSet","name":"actual-budget"}]},"spec":{"nodeName":"titan-19"},"status":{"phase":"Pending"}}]}` events := `{"items":[{"metadata":{"namespace":"finance","creationTimestamp":"` + lastSeen + `"},"involvedObject":{"kind":"Pod","namespace":"finance","name":"actual-budget-abc"},"type":"Warning","reason":"FailedMount","message":"MountVolume.MountDevice failed for volume \"pvc-1\" : nsenter: failed to execute cryptsetup: No such file or directory","lastTimestamp":"` + lastSeen + `"}]}` installed := false deleted := false orch := buildOrchestratorWithStubs(t, config.Config{ Startup: config.Startup{StuckPodGraceSeconds: 180}, }, []commandStub{ {match: matchContains("kubectl", "get", "pods", "-A", "-o", "json"), out: pods}, {match: matchContains("kubectl", "-n", "longhorn-system", "get", "nodes.longhorn.io"), out: "titan-19\tTrue\n"}, {match: matchContains("kubectl", "get", "events", "-A", "-o", "json"), out: events}, {match: matchContains("kubectl", "get", "nodes", "-o", "json"), out: `{"items":[{"metadata":{"name":"titan-19"},"status":{"conditions":[{"type":"Ready","status":"True"}]}}]}`}, { match: func(name string, args []string) bool { if name != "ssh" || !strings.Contains(strings.Join(args, " "), "apt-get install -y --no-install-recommends cryptsetup-bin") { return false } installed = true return true }, out: "__ANANKE_CRYPTSETUP_INSTALLED__", }, { match: func(name string, args []string) bool { if !matchContains("kubectl", "-n", "finance", "delete", "pod", "actual-budget-abc", "--wait=false")(name, args) { return false } deleted = true return true }, }, }) if err := orch.recycleStuckControllerPods(context.Background()); err != nil { t.Fatalf("recycleStuckControllerPods failed: %v", err) } if !installed { t.Fatalf("expected missing host cryptsetup to be installed") } if !deleted { t.Fatalf("expected encrypted-volume blocked pod to be recycled") } } // TestRecycleStuckControllerPodsCordonsEncryptedVolumeNodeWhenRepairFails runs one orchestration or CLI step. // Signature: TestRecycleStuckControllerPodsCordonsEncryptedVolumeNodeWhenRepairFails(t *testing.T). // Why: when host package repair is blocked by sudo policy, Ananke should avoid // the bad node and retry the controller-owned pod elsewhere. func TestRecycleStuckControllerPodsCordonsEncryptedVolumeNodeWhenRepairFails(t *testing.T) { created := time.Now().Add(-10 * time.Minute).UTC().Format(time.RFC3339) lastSeen := time.Now().UTC().Format(time.RFC3339) pods := `{"items":[{"metadata":{"namespace":"finance","name":"actual-budget-abc","creationTimestamp":"` + created + `","ownerReferences":[{"kind":"ReplicaSet","name":"actual-budget"}]},"spec":{"nodeName":"titan-19"},"status":{"phase":"Pending"}}]}` events := `{"items":[{"metadata":{"namespace":"finance","creationTimestamp":"` + lastSeen + `"},"involvedObject":{"kind":"Pod","namespace":"finance","name":"actual-budget-abc"},"type":"Warning","reason":"FailedMount","message":"MountVolume.MountDevice failed for volume \"pvc-1\" : nsenter: failed to execute cryptsetup: No such file or directory","lastTimestamp":"` + lastSeen + `"}]}` cordoned := false deleted := false orch := buildOrchestratorWithStubs(t, config.Config{ Startup: config.Startup{StuckPodGraceSeconds: 180}, }, []commandStub{ {match: matchContains("kubectl", "get", "pods", "-A", "-o", "json"), out: pods}, {match: matchContains("kubectl", "-n", "longhorn-system", "get", "nodes.longhorn.io"), out: "titan-19\tTrue\n"}, {match: matchContains("kubectl", "get", "events", "-A", "-o", "json"), out: events}, {match: matchContains("kubectl", "get", "nodes", "-o", "json"), out: `{"items":[{"metadata":{"name":"titan-19"},"status":{"conditions":[{"type":"Ready","status":"True"}]}}]}`}, { match: matchContains("ssh", "apt-get install -y --no-install-recommends cryptsetup-bin"), err: errors.New("sudo: a password is required"), }, { match: func(name string, args []string) bool { if !matchContains("kubectl", "cordon", "titan-19")(name, args) { return false } cordoned = true return true }, }, { match: func(name string, args []string) bool { if !matchContains("kubectl", "-n", "finance", "delete", "pod", "actual-budget-abc", "--wait=false")(name, args) { return false } deleted = true return true }, }, }) if err := orch.recycleStuckControllerPods(context.Background()); err != nil { t.Fatalf("recycleStuckControllerPods failed: %v", err) } if !cordoned { t.Fatalf("expected cryptsetup-missing node to be cordoned") } if !deleted { t.Fatalf("expected encrypted-volume blocked pod to be recycled") } } // TestRecycleStuckControllerPodsHandlesUnknownPodsOnReadyNodes runs one orchestration or CLI step. // Signature: TestRecycleStuckControllerPodsHandlesUnknownPodsOnReadyNodes(t *testing.T). // Why: post-outage controller pods can remain Unknown after their node recovers; // normal deletion clears stale status without force-deleting or touching storage. func TestRecycleStuckControllerPodsHandlesUnknownPodsOnReadyNodes(t *testing.T) { old := time.Now().Add(-10 * time.Minute).UTC().Format(time.RFC3339) recent := time.Now().Add(-30 * time.Second).UTC().Format(time.RFC3339) pods := `{"items":[` + `{"metadata":{"namespace":"longhorn-system","name":"longhorn-vault-sync-old","creationTimestamp":"` + old + `","ownerReferences":[{"kind":"ReplicaSet","name":"longhorn-vault-sync"}]},"spec":{"nodeName":"titan-12"},"status":{"phase":"Unknown"}},` + `{"metadata":{"namespace":"longhorn-system","name":"longhorn-vault-sync-fresh","creationTimestamp":"` + recent + `","ownerReferences":[{"kind":"ReplicaSet","name":"longhorn-vault-sync"}]},"spec":{"nodeName":"titan-12"},"status":{"phase":"Unknown"}},` + `{"metadata":{"namespace":"maintenance","name":"stale-on-bad-node","creationTimestamp":"` + old + `","ownerReferences":[{"kind":"ReplicaSet","name":"maintenance"}]},"spec":{"nodeName":"titan-22"},"status":{"phase":"Unknown"}},` + `{"metadata":{"namespace":"default","name":"bare-pod","creationTimestamp":"` + old + `"},"spec":{"nodeName":"titan-12"},"status":{"phase":"Unknown"}}]}` deleted := []string{} orch := buildOrchestratorWithStubs(t, config.Config{ Startup: config.Startup{StuckPodGraceSeconds: 180}, }, []commandStub{ {match: matchContains("kubectl", "get", "pods", "-A", "-o", "json"), out: pods}, {match: matchContains("kubectl", "-n", "longhorn-system", "get", "nodes.longhorn.io"), out: "titan-12\tTrue\ntitan-22\tTrue\n"}, {match: matchContains("kubectl", "get", "events", "-A", "-o", "json"), out: `{"items":[]}`}, {match: matchContains("kubectl", "get", "nodes", "-o", "json"), out: `{"items":[{"metadata":{"name":"titan-12"},"status":{"conditions":[{"type":"Ready","status":"True"}]}},{"metadata":{"name":"titan-22"},"status":{"conditions":[{"type":"Ready","status":"False"}]}}]}`}, { match: func(name string, args []string) bool { if !matchContains("kubectl", "-n", "longhorn-system", "delete", "pod", "longhorn-vault-sync-old", "--wait=false")(name, args) { return false } deleted = append(deleted, "longhorn-vault-sync-old") return true }, }, }) if err := orch.recycleStuckControllerPods(context.Background()); err != nil { t.Fatalf("recycleStuckControllerPods failed: %v", err) } if len(deleted) != 1 || deleted[0] != "longhorn-vault-sync-old" { t.Fatalf("expected only old Unknown controller pod on Ready node to be recycled, got %#v", deleted) } } // TestNewConstructsOrchestrator runs one orchestration or CLI step. // Signature: TestNewConstructsOrchestrator(t *testing.T). // Why: covers constructor path in orchestrator core module. func TestNewConstructsOrchestrator(t *testing.T) { cfg := config.Config{State: config.State{RunHistoryPath: filepath.Join(t.TempDir(), "runs.json")}} r := &execx.Runner{} s := state.New(cfg.State.RunHistoryPath) orch := New(cfg, r, s, log.New(io.Discard, "", 0)) if orch == nil || orch.runner != r || orch.store != s { t.Fatalf("constructor returned unexpected orchestrator: %#v", orch) } } // TestParseSnapshotPathFromEtcdSnapshotList runs one orchestration or CLI step. // Signature: TestParseSnapshotPathFromEtcdSnapshotList(t *testing.T). // Why: covers snapshot-path parser branches including header skip and no-match. func TestParseSnapshotPathFromEtcdSnapshotList(t *testing.T) { out := strings.Join([]string{ "Name Size Created Location", `pre-shutdown 4.2M now "file:///var/lib/rancher/k3s/server/db/snapshots/pre-shutdown"`, }, "\n") got := parseSnapshotPathFromEtcdSnapshotList(out) if got != "/var/lib/rancher/k3s/server/db/snapshots/pre-shutdown" { t.Fatalf("unexpected snapshot path: %q", got) } if parseSnapshotPathFromEtcdSnapshotList("no snapshots") != "" { t.Fatalf("expected no snapshot path") } } // TestFluxSourceHelpers runs one orchestration or CLI step. // Signature: TestFluxSourceHelpers(t *testing.T). // Why: covers flux source readiness/guard/branch patch helper flows. func TestFluxSourceHelpers(t *testing.T) { cfg := config.Config{ ExpectedFluxSource: "ssh://git@scm.bstein.dev:2242/bstein/titan-iac.git", } orch := buildOrchestratorWithStubs(t, cfg, []commandStub{ {match: matchContains("kubectl", "jsonpath={.status.conditions"), out: "True"}, {match: matchContains("kubectl", "jsonpath={.spec.url}"), out: "ssh://git@scm.bstein.dev:2242/bstein/titan-iac.git"}, {match: matchContains("kubectl", "jsonpath={.spec.ref.branch}"), out: "main"}, {match: matchContains("kubectl", "patch", "gitrepository"), out: ""}, }) ready, err := orch.fluxSourceReady(context.Background()) if err != nil || !ready { t.Fatalf("expected flux source ready, got ready=%v err=%v", ready, err) } if err := orch.guardFluxSourceDrift(context.Background(), "main", false); err != nil { t.Fatalf("guardFluxSourceDrift failed: %v", err) } if err := orch.ensureFluxBranch(context.Background(), "main", false); err != nil { t.Fatalf("ensureFluxBranch no-op failed: %v", err) } if got := normalizeGitURL(" SSH://Git@Host/Repo.git/ "); got != "ssh://git@host/repo" { t.Fatalf("unexpected normalized url: %q", got) } } // TestCoordinationHelpers runs one orchestration or CLI step. // Signature: TestCoordinationHelpers(t *testing.T). // Why: covers intent-age helpers, shell quoting, and peer selection logic. func TestCoordinationHelpers(t *testing.T) { in := state.Intent{UpdatedAt: time.Now().Add(-10 * time.Second)} if intentAge(in) <= 0 { t.Fatalf("expected positive age") } if !intentFresh(state.Intent{}, time.Second) { t.Fatalf("zero timestamp should be fresh") } if shellQuote("a'b") != `'a'"'"'b'` { t.Fatalf("unexpected shell quote output") } orch := buildOrchestratorWithStubs(t, config.Config{ Coordination: config.Coordination{ PeerHosts: []string{"titan-24", "titan-24", "titan-db"}, ForwardShutdownHost: "titan-db", }, }, nil) peers := orch.coordinationPeers() if len(peers) != 2 { t.Fatalf("expected deduped peers, got %v", peers) } } // TestVerifyEtcdSnapshotAndRunSudoK3S runs one orchestration or CLI step. // Signature: TestVerifyEtcdSnapshotAndRunSudoK3S(t *testing.T). // Why: covers k3s command fallback and snapshot verification happy path. func TestVerifyEtcdSnapshotAndRunSudoK3S(t *testing.T) { orch := buildOrchestratorWithStubs(t, config.Config{}, []commandStub{ {match: matchContains("ssh", "stat -c %s"), out: "2097152"}, {match: matchContains("ssh", "k3s etcd-snapshot ls"), out: "/var/lib/rancher/k3s/server/db/snapshots/pre-shutdown"}, {match: matchContains("ssh", "sha256sum"), out: strings.Repeat("a", 64)}, }) if err := orch.verifyEtcdSnapshot(context.Background(), "titan-0a", "/var/lib/rancher/k3s/server/db/snapshots/pre-shutdown"); err != nil { t.Fatalf("verifyEtcdSnapshot failed: %v", err) } } // TestScalingHelpers runs one orchestration or CLI step. // Signature: TestScalingHelpers(t *testing.T). // Why: covers workload discovery, snapshot IO, and scale command orchestration. func TestScalingHelpers(t *testing.T) { cfg := config.Config{ ExcludedNamespaces: []string{"kube-system"}, State: config.State{Dir: t.TempDir()}, } orch := buildOrchestratorWithStubs(t, cfg, []commandStub{ { match: matchContains("kubectl", "get deployment", "jsonpath"), out: strings.Join([]string{ "default\tgrafana\t1", "kube-system\tcoredns\t2", "", }, "\n"), }, { match: matchContains("kubectl", "get statefulset", "jsonpath"), out: "vault\tvault\t1\n", }, {match: matchContains("kubectl", "scale", "deployment", "grafana"), out: ""}, {match: matchContains("kubectl", "scale", "statefulset", "vault"), out: ""}, }) entries, err := orch.listScalableWorkloads(context.Background()) if err != nil { t.Fatalf("listScalableWorkloads failed: %v", err) } if len(entries) != 2 { t.Fatalf("expected 2 scalable entries, got %d (%v)", len(entries), entries) } if err := orch.writeScaledWorkloadSnapshot(entries); err != nil { t.Fatalf("writeScaledWorkloadSnapshot failed: %v", err) } snapshot, err := orch.readScaledWorkloadSnapshot() if err != nil || snapshot == nil || len(snapshot.Entries) != 2 { t.Fatalf("readScaledWorkloadSnapshot failed snapshot=%v err=%v", snapshot, err) } if err := orch.scaleWorkloads(context.Background(), entries, -1, 2); err != nil { t.Fatalf("scaleWorkloads failed: %v", err) } } // TestStorageReadyAndWorkloadHelpers runs one orchestration or CLI step. // Signature: TestStorageReadyAndWorkloadHelpers(t *testing.T). // Why: covers storage readiness checks and workload helper utilities. func TestStorageReadyAndWorkloadHelpers(t *testing.T) { cfg := config.Config{ Startup: config.Startup{ StorageMinReadyNodes: 1, StorageCriticalPVCs: []string{"vault/data-vault-0"}, }, } orch := buildOrchestratorWithStubs(t, cfg, []commandStub{ {match: matchContains("kubectl", "nodes.longhorn.io"), out: "titan-23:True:True\n"}, {match: matchContains("kubectl", "get pvc data-vault-0"), out: "Bound"}, }) ok, reason, err := orch.storageReady(context.Background()) if err != nil || !ok { t.Fatalf("expected storageReady true, got ok=%v reason=%q err=%v", ok, reason, err) } } // TestIngressAndServiceHelpers runs one orchestration or CLI step. // Signature: TestIngressAndServiceHelpers(t *testing.T). // Why: covers ingress host discovery helpers and URL parsing helpers. func TestIngressAndServiceHelpers(t *testing.T) { cfg := config.Config{ Startup: config.Startup{ IngressChecklistIgnoreHosts: []string{"ignore.bstein.dev"}, }, } orch := buildOrchestratorWithStubs(t, cfg, []commandStub{ {match: matchContains("kubectl", "get ingress", "-A", "-o", "json"), out: `{"items":[{"metadata":{"namespace":"gitea"},"spec":{"rules":[{"host":"scm.bstein.dev"}]}},{"metadata":{"namespace":"x"},"spec":{"rules":[{"host":"ignore.bstein.dev"}]}}]}`}, }) hosts, err := orch.discoverIngressHosts(context.Background()) if err != nil || len(hosts) != 1 || hosts[0] != "scm.bstein.dev" { t.Fatalf("discoverIngressHosts unexpected hosts=%v err=%v", hosts, err) } if got := hostFromURL("https://metrics.bstein.dev/api/health"); got != "metrics.bstein.dev" { t.Fatalf("unexpected hostFromURL value: %q", got) } if !isLikelyHostname("metrics.bstein.dev") || isLikelyHostname("bad path/value") { t.Fatalf("isLikelyHostname classification mismatch") } } // TestWorkloadConvergenceHelpers runs one orchestration or CLI step. // Signature: TestWorkloadConvergenceHelpers(t *testing.T). // Why: covers controller readiness helpers and stuck-pod heuristics. func TestWorkloadConvergenceHelpers(t *testing.T) { replicas := int32(2) item := workloadResource{Kind: "deployment"} item.Spec.Replicas = &replicas item.Status.ReadyReplicas = 1 desired, ready, ok := desiredReady(item) if !ok || desired != 2 || ready != 1 { t.Fatalf("desiredReady mismatch desired=%d ready=%d ok=%v", desired, ready, ok) } var pod podResource pod.Metadata.OwnerReferences = []ownerReference{{Kind: "ReplicaSet"}} if !podControllerOwned(pod) { t.Fatalf("expected podControllerOwned=true") } pod.Status.ContainerStatuses = []podContainerStatus{{State: podContainerState{Waiting: &podContainerWaitingState{Reason: "CrashLoopBackOff"}}}} reason := stuckContainerReason(pod, map[string]struct{}{"CrashLoopBackOff": struct{}{}}) if reason != "CrashLoopBackOff" { t.Fatalf("unexpected stuck reason: %q", reason) } } // TestDrainAndK3SHelpers runs one orchestration or CLI step. // Signature: TestDrainAndK3SHelpers(t *testing.T). // Why: covers node drain diagnostics and k3s snapshot selection flow. func TestDrainAndK3SHelpers(t *testing.T) { cfg := config.Config{ SSHManagedNodes: []string{"titan-0a"}, } orch := buildOrchestratorWithStubs(t, cfg, []commandStub{ {match: matchContains("kubectl", "get pods", "--field-selector", "spec.nodeName=titan-22"), out: "vault vault-0 Running StatefulSet\n"}, {match: matchContains("ssh", "k3s etcd-snapshot ls"), out: "pre-shutdown /var/lib/rancher/k3s/server/db/snapshots/pre-shutdown"}, }) diag := orch.drainNodeDiagnostics(context.Background(), "titan-22") if !strings.Contains(diag, "vault/vault-0") { t.Fatalf("unexpected diagnostics output: %q", diag) } snapshot, err := orch.latestEtcdSnapshotPath(context.Background(), "titan-0a") if err != nil || snapshot == "" { t.Fatalf("latestEtcdSnapshotPath failed snapshot=%q err=%v", snapshot, err) } } // TestTimesyncAndInventoryHelpers runs one orchestration or CLI step. // Signature: TestTimesyncAndInventoryHelpers(t *testing.T). // Why: covers time sync helpers, datastore endpoint parsing, and inventory assembly. func TestTimesyncAndInventoryHelpers(t *testing.T) { cfg := config.Config{ ControlPlanes: []string{"titan-0a"}, Workers: []string{"titan-22"}, SSHManagedNodes: []string{"titan-0a", "titan-22"}, SSHNodeHosts: map[string]string{ "titan-db": "10.0.0.10", }, Coordination: config.Coordination{ PeerHosts: []string{"titan-24"}, ForwardShutdownHost: "titan-db", }, } orch := buildOrchestratorWithStubs(t, cfg, nil) nodes := orch.inventoryNodesForValidation() if len(nodes) < 3 { t.Fatalf("expected combined inventory nodes, got %v", nodes) } if parseDatastoreEndpoint("ExecStart=/usr/local/bin/k3s server --datastore-endpoint=postgres://x") == "" { t.Fatalf("expected datastore endpoint parse") } if !isTimeSynced("YES") || isTimeSynced("no") { t.Fatalf("unexpected isTimeSynced behavior") } ln, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { t.Fatalf("listen failed: %v", err) } defer ln.Close() if !orch.tcpReachable(ln.Addr().String(), 500*time.Millisecond) { t.Fatalf("expected tcpReachable=true for open listener") } } // TestShutdownModeValidation runs one orchestration or CLI step. // Signature: TestShutdownModeValidation(t *testing.T). // Why: covers removed poweroff mode and invalid-mode errors. func TestShutdownModeValidation(t *testing.T) { if mode, err := normalizeShutdownMode("cluster-only"); err != nil || mode != "cluster-only" { t.Fatalf("expected cluster-only mode, got mode=%q err=%v", mode, err) } if _, err := normalizeShutdownMode("bogus"); err == nil { t.Fatalf("expected invalid mode error") } } // TestWaitForAPIDryRunShortCircuit runs one orchestration or CLI step. // Signature: TestWaitForAPIDryRunShortCircuit(t *testing.T). // Why: covers dry-run short-circuit branch for api readiness wait. func TestWaitForAPIDryRunShortCircuit(t *testing.T) { orch := &Orchestrator{runner: &execx.Runner{DryRun: true}} if err := orch.waitForAPI(context.Background(), 1, time.Millisecond); err != nil { t.Fatalf("expected dry-run waitForAPI to pass: %v", err) } } // TestGuardFluxSourceDriftMismatch runs one orchestration or CLI step. // Signature: TestGuardFluxSourceDriftMismatch(t *testing.T). // Why: covers url-drift and branch-drift error branches. func TestGuardFluxSourceDriftMismatch(t *testing.T) { cfg := config.Config{ExpectedFluxSource: "ssh://git@scm.bstein.dev:2242/bstein/titan-iac.git"} orch := buildOrchestratorWithStubs(t, cfg, []commandStub{ {match: matchContains("kubectl", "jsonpath={.spec.url}"), out: "ssh://git@scm.bstein.dev:2242/bstein/wrong.git"}, }) if err := orch.guardFluxSourceDrift(context.Background(), "main", false); err == nil { t.Fatalf("expected guardFluxSourceDrift mismatch error") } orch = buildOrchestratorWithStubs(t, cfg, []commandStub{ {match: matchContains("kubectl", "jsonpath={.spec.url}"), out: "ssh://git@scm.bstein.dev:2242/bstein/titan-iac.git"}, {match: matchContains("kubectl", "jsonpath={.spec.ref.branch}"), out: "atlasbot"}, }) if err := orch.guardFluxSourceDrift(context.Background(), "main", false); err == nil { t.Fatalf("expected branch drift error") } } // TestRunSudoK3SFailsWhenAllCandidatesFail runs one orchestration or CLI step. // Signature: TestRunSudoK3SFailsWhenAllCandidatesFail(t *testing.T). // Why: covers fallback failure return in runSudoK3S. func TestRunSudoK3SFailsWhenAllCandidatesFail(t *testing.T) { orch := buildOrchestratorWithStubs(t, config.Config{}, []commandStub{ {match: matchContains("ssh", "k3s"), err: errors.New("no binary")}, }) if _, err := orch.runSudoK3S(context.Background(), "titan-0a", "server"); err == nil { t.Fatalf("expected runSudoK3S failure when all candidates fail") } } // TestCriticalEndpointHelpers runs one orchestration or CLI step. // Signature: TestCriticalEndpointHelpers(t *testing.T). // Why: covers critical endpoint parsing and readiness checks that gate startup completion. func TestCriticalEndpointHelpers(t *testing.T) { cfg := config.Config{ Startup: config.Startup{ CriticalServiceEndpoints: []string{"monitoring/victoria-metrics-single-server"}, }, } orch := buildOrchestratorWithStubs(t, cfg, []commandStub{ {match: matchContains("kubectl", "get endpoints victoria-metrics-single-server"), out: "10.42.0.10\n10.42.0.11\n"}, }) ok, detail, ns, svc, err := orch.criticalServiceEndpointsReady(context.Background()) if err != nil || !ok { t.Fatalf("expected criticalServiceEndpointsReady success, got ok=%v detail=%q ns=%q svc=%q err=%v", ok, detail, ns, svc, err) } if detail != "services=1" { t.Fatalf("unexpected readiness detail: %q", detail) } gotNS, gotSvc, err := parseCriticalServiceEndpoint("monitoring/victoria-metrics-single-server") if err != nil || gotNS != "monitoring" || gotSvc != "victoria-metrics-single-server" { t.Fatalf("unexpected parse result ns=%q svc=%q err=%v", gotNS, gotSvc, err) } if _, _, err := parseCriticalServiceEndpoint("invalid"); err == nil { t.Fatalf("expected parseCriticalServiceEndpoint error") } } // TestCriticalEndpointAutoHealWorkflow runs one orchestration or CLI step. // Signature: TestCriticalEndpointAutoHealWorkflow(t *testing.T). // Why: covers endpoint-zero recovery where startup heals workload replicas before succeeding. func TestCriticalEndpointAutoHealWorkflow(t *testing.T) { cfg := config.Config{ Startup: config.Startup{ CriticalServiceEndpointWaitSec: 2, CriticalServiceEndpointPollSec: 1, CriticalServiceEndpoints: []string{"monitoring/victoria-metrics-single-server"}, }, State: config.State{ Dir: t.TempDir(), ReportsDir: filepath.Join(t.TempDir(), "reports"), RunHistoryPath: filepath.Join(t.TempDir(), "runs.json"), }, } orch := &Orchestrator{ cfg: cfg, runner: &execx.Runner{}, store: state.New(cfg.State.RunHistoryPath), log: log.New(io.Discard, "", 0), } endpointChecks := 0 dispatch := func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) { joined := name + " " + strings.Join(args, " ") if strings.Contains(joined, "get endpoints victoria-metrics-single-server") { endpointChecks++ if endpointChecks == 1 { return "", nil } return "10.42.0.10\n", nil } if strings.Contains(joined, "scale deployment victoria-metrics-single-server") { return "", errors.New(`Error from server (NotFound): deployments.apps "victoria-metrics-single-server" not found`) } if strings.Contains(joined, "scale statefulset victoria-metrics-single-server") { return "", nil } if strings.Contains(joined, "rollout status statefulset/victoria-metrics-single-server") { return "statefulset rolled out", nil } return "", nil } orch.runOverride = dispatch orch.runSensitiveOverride = dispatch if err := orch.waitForCriticalServiceEndpoints(context.Background()); err != nil { t.Fatalf("waitForCriticalServiceEndpoints failed: %v", err) } if endpointChecks < 2 { t.Fatalf("expected repeated endpoint checks, got %d", endpointChecks) } }