package orchestrator import ( "context" "errors" "io" "log" "strings" "testing" "time" "scm.bstein.dev/bstein/ananke/internal/cluster" "scm.bstein.dev/bstein/ananke/internal/execx" "scm.bstein.dev/bstein/ananke/internal/state" ) // TestLifecycleEtcdRestoreHappyPath runs one orchestration or CLI step. // Signature: TestLifecycleEtcdRestoreHappyPath(t *testing.T). // Why: covers non-dry-run etcd restore flow including snapshot discovery, verification, and k3s reset. func TestLifecycleEtcdRestoreHappyPath(t *testing.T) { orch, recorder, _ := newLifecycleOrchestrator(t) orch.SetCommandOverrides(func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) { command := name + " " + strings.Join(args, " ") recorder.record(name, args) switch name { case "ssh": switch { case strings.Contains(command, "sudo systemctl cat k3s"): return "ExecStart=/usr/local/bin/k3s server", nil case strings.Contains(command, "etcd-snapshot ls"): return "Name Size Created Location\npre-shutdown 4.2M now \"file:///var/lib/rancher/k3s/server/db/snapshots/pre-shutdown\"\n", nil case strings.Contains(command, "stat -c %s"): return "2097152", nil case strings.Contains(command, "sha256sum"): return "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", nil default: return "ok", nil } default: return "", nil } }, nil) if err := orch.EtcdRestore(context.Background(), cluster.EtcdRestoreOptions{}); err != nil { t.Fatalf("etcd restore failed: %v", err) } if !recorder.contains("--cluster-reset-restore-path") { t.Fatalf("expected k3s cluster-reset command in call log") } } // TestLifecycleEtcdRestoreRejectsExternalDatastore runs one orchestration or CLI step. // Signature: TestLifecycleEtcdRestoreRejectsExternalDatastore(t *testing.T). // Why: ensures restore exits early when k3s is configured for external datastore mode. 
func TestLifecycleEtcdRestoreRejectsExternalDatastore(t *testing.T) { cfg := lifecycleConfig(t) orch := cluster.New(cfg, &execx.Runner{DryRun: false}, state.New(cfg.State.RunHistoryPath), testLogger()) orch.SetCommandOverrides(func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) { command := name + " " + strings.Join(args, " ") if name == "ssh" && strings.Contains(command, "sudo systemctl cat k3s") { return "ExecStart=/usr/local/bin/k3s server --datastore-endpoint=postgres://db:5432/k3s", nil } return "", nil }, nil) err := orch.EtcdRestore(context.Background(), cluster.EtcdRestoreOptions{ControlPlane: "titan-db"}) if err == nil { t.Fatalf("expected external datastore restore rejection") } if !errors.Is(err, cluster.ErrEtcdRestoreNotApplicable) { t.Fatalf("expected ErrEtcdRestoreNotApplicable, got %v", err) } } // TestLifecycleStartupCriticalEndpointAutoHeal runs one orchestration or CLI step. // Signature: TestLifecycleStartupCriticalEndpointAutoHeal(t *testing.T). // Why: covers endpoint checklist self-heal path that restores backend replicas before declaring startup complete. 
func TestLifecycleStartupCriticalEndpointAutoHeal(t *testing.T) {
	cfg := lifecycleConfig(t)
	// Disable unrelated startup gates so the test isolates the critical
	// service endpoint checklist path.
	cfg.Startup.RequireFluxHealth = false
	cfg.Startup.RequireWorkloadConvergence = false
	// Short wait/poll windows keep the endpoint retry loop fast in tests.
	cfg.Startup.CriticalServiceEndpointWaitSec = 3
	cfg.Startup.CriticalServiceEndpointPollSec = 1
	recorder := &commandRecorder{}
	orch := cluster.New(cfg, &execx.Runner{DryRun: false}, state.New(cfg.State.RunHistoryPath), testLogger())
	// healed flips once the orchestrator issues the scale command; endpoint
	// queries return an address only after that, simulating recovery.
	var healed bool
	// dispatch fakes every ssh/kubectl command the startup flow issues.
	dispatch := func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
		recorder.record(name, args)
		command := name + " " + strings.Join(args, " ")
		switch name {
		case "ssh":
			if strings.Contains(command, "__ANANKE_NODE_REACHABLE__") {
				return "__ANANKE_NODE_REACHABLE__", nil
			}
			if strings.Contains(command, "systemctl cat k3s") {
				// Embedded-etcd style unit file (no external datastore flag).
				return "ExecStart=/usr/local/bin/k3s server", nil
			}
			return "ok", nil
		case "kubectl":
			// NOTE: case order matters — specific matches (e.g. the
			// victoria-metrics scale command) must precede the generic
			// "scale " catch-all below.
			switch {
			case strings.Contains(command, "version --request-timeout=5s"):
				return "v1.31.0", nil
			case strings.Contains(command, "jsonpath={.status.conditions[?(@.type==\"Ready\")].status}"):
				return "True", nil
			case strings.Contains(command, "jsonpath={.spec.url}"):
				return cfg.ExpectedFluxSource, nil
			case strings.Contains(command, "jsonpath={.spec.ref.branch}"):
				return cfg.ExpectedFluxBranch, nil
			case strings.Contains(command, "get endpoints victoria-metrics-single-server"):
				// Endpoint stays empty until the auto-heal scale has happened.
				if healed {
					return "10.42.0.10\n", nil
				}
				return "", nil
			case strings.Contains(command, "scale statefulset victoria-metrics-single-server"):
				// The auto-heal action itself: after this the endpoint reports
				// a ready address.
				healed = true
				return "", nil
			case strings.Contains(command, "scale "), strings.Contains(command, "rollout status"):
				return "ok", nil
			case strings.Contains(command, "get pods -A -o json"):
				return `{"items":[]}`, nil
			case strings.Contains(command, "get deploy,statefulset -A -o json"):
				return `{"items":[{"kind":"StatefulSet","metadata":{"namespace":"monitoring","name":"victoria-metrics-single-server"},"spec":{"replicas":1}}]}`, nil
			case strings.Contains(command, "jsonpath={.status.readyReplicas}"):
				return "1", nil
			case strings.Contains(command, "get deployment -A -o jsonpath="):
				return "monitoring\tgrafana\t1\n", nil
			case strings.Contains(command, "get statefulset -A -o jsonpath="):
				return "monitoring\tvictoria-metrics-single-server\t1\n", nil
			case strings.Contains(command, "-n flux-system get kustomizations.kustomize.toolkit.fluxcd.io -o jsonpath="):
				return "", nil
			case strings.Contains(command, "get helmreleases.helm.toolkit.fluxcd.io -A -o jsonpath="):
				return "", nil
			default:
				return "", nil
			}
		default:
			return "", nil
		}
	}
	orch.SetCommandOverrides(dispatch, dispatch)
	if err := orch.Startup(context.Background(), cluster.StartupOptions{Reason: "critical-endpoint-heal"}); err != nil {
		t.Fatalf("startup with endpoint auto-heal failed: %v", err)
	}
	// The heal path must have actually scaled the backend, not merely waited.
	if !recorder.contains("scale statefulset victoria-metrics-single-server") {
		t.Fatalf("expected endpoint auto-heal scale command in call log")
	}
}

// TestLifecycleStartupFluxImmutableJobSelfHeal runs one orchestration or CLI step.
// Signature: TestLifecycleStartupFluxImmutableJobSelfHeal(t *testing.T).
// Why: covers immutable-job detection and stale failed job cleanup during flux convergence.
func TestLifecycleStartupFluxImmutableJobSelfHeal(t *testing.T) {
	cfg := lifecycleConfig(t)
	// Disable unrelated startup gates so the test isolates flux health
	// convergence and its self-heal path.
	cfg.Startup.RequireCriticalServiceEndpoints = false
	cfg.Startup.RequireWorkloadConvergence = false
	// Short wait/poll windows keep the flux convergence loop fast in tests.
	cfg.Startup.FluxHealthWaitSeconds = 4
	cfg.Startup.FluxHealthPollSeconds = 1
	recorder := &commandRecorder{}
	orch := cluster.New(cfg, &execx.Runner{DryRun: false}, state.New(cfg.State.RunHistoryPath), testLogger())
	// fluxCalls counts kustomization polls: the first two report the
	// immutable-field failure, later polls report Ready.
	fluxCalls := 0
	// dispatch fakes every ssh/kubectl/flux command the startup flow issues.
	dispatch := func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
		recorder.record(name, args)
		command := name + " " + strings.Join(args, " ")
		switch name {
		case "ssh":
			if strings.Contains(command, "__ANANKE_NODE_REACHABLE__") {
				return "__ANANKE_NODE_REACHABLE__", nil
			}
			if strings.Contains(command, "systemctl cat k3s") {
				// Embedded-etcd style unit file (no external datastore flag).
				return "ExecStart=/usr/local/bin/k3s server", nil
			}
			return "ok", nil
		case "kubectl":
			// NOTE: case order matters — specific matches must precede the
			// generic "scale "/"patch "/"annotate " catch-alls below.
			switch {
			case strings.Contains(command, "version --request-timeout=5s"):
				return "v1.31.0", nil
			case strings.Contains(command, "jsonpath={.status.conditions[?(@.type==\"Ready\")].status}"):
				return "True", nil
			case strings.Contains(command, "jsonpath={.spec.url}"):
				return cfg.ExpectedFluxSource, nil
			case strings.Contains(command, "jsonpath={.spec.ref.branch}"):
				return cfg.ExpectedFluxBranch, nil
			case strings.Contains(command, "get kustomizations.kustomize.toolkit.fluxcd.io -A -o json"):
				fluxCalls++
				if fluxCalls <= 2 {
					// First two polls: kustomization stuck on an immutable
					// Job field — the condition the self-heal must detect.
					return `{"items":[{"metadata":{"namespace":"flux-system","name":"services"},"spec":{"suspend":false},"status":{"conditions":[{"type":"Ready","status":"False","message":"Job update failed: field is immutable"}]}}]}`, nil
				}
				// After the stale job is cleaned up, convergence succeeds.
				return `{"items":[{"metadata":{"namespace":"flux-system","name":"services"},"spec":{"suspend":false},"status":{"conditions":[{"type":"Ready","status":"True","message":"ok"}]}}]}`, nil
			case strings.Contains(command, "get jobs -A -o json"):
				// A failed job owned by the stuck kustomization — the
				// cleanup candidate.
				return `{"items":[{"metadata":{"namespace":"flux-system","name":"reconcile-services","labels":{"kustomize.toolkit.fluxcd.io/name":"services"}},"status":{"failed":1,"conditions":[{"type":"Failed","status":"True"}]}}]}`, nil
			case strings.Contains(command, "delete job reconcile-services"):
				return "", nil
			case strings.Contains(command, "-n flux-system get kustomizations.kustomize.toolkit.fluxcd.io -o jsonpath="):
				return "services\n", nil
			case strings.Contains(command, "get helmreleases.helm.toolkit.fluxcd.io -A -o jsonpath="):
				return "", nil
			case strings.Contains(command, "patch "), strings.Contains(command, "annotate "):
				return "", nil
			case strings.Contains(command, "get deployment -A -o jsonpath="):
				return "monitoring\tgrafana\t1\n", nil
			case strings.Contains(command, "get statefulset -A -o jsonpath="):
				return "monitoring\tvictoria-metrics-single-server\t1\n", nil
			case strings.Contains(command, "scale "), strings.Contains(command, "rollout status"):
				return "", nil
			case strings.Contains(command, "jsonpath={.status.readyReplicas}"):
				return "1", nil
			case strings.Contains(command, "get pods -A -o json"):
				return `{"items":[]}`, nil
			default:
				return "", nil
			}
		case "flux":
			return "", nil
		default:
			return "", nil
		}
	}
	orch.SetCommandOverrides(dispatch, dispatch)
	if err := orch.Startup(context.Background(), cluster.StartupOptions{Reason: "immutable-job-heal"}); err != nil {
		t.Fatalf("startup with immutable-job heal failed: %v", err)
	}
	// The self-heal must have deleted the stale failed job, not merely retried.
	if !recorder.contains("delete job reconcile-services") {
		t.Fatalf("expected stale failed flux job deletion in call log")
	}
}

// testLogger returns a logger that discards all output.
// Signature: testLogger() *log.Logger.
// Why: centralizes discarded logging for focused orchestration recovery tests.
func testLogger() *log.Logger { return log.New(io.Discard, "", 0) }