package orchestrator

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log"
	"net/http"
	"net/http/httptest"
	"os"
	"path/filepath"
	"strings"
	"testing"
	"time"

	"scm.bstein.dev/bstein/ananke/internal/cluster"
	"scm.bstein.dev/bstein/ananke/internal/config"
	"scm.bstein.dev/bstein/ananke/internal/execx"
	"scm.bstein.dev/bstein/ananke/internal/state"
)

// newLiveHookOrchestrator builds a cluster.Orchestrator that executes real
// commands (execx.Runner with DryRun: false), uses the state store returned by
// state.New(cfg.State.RunHistoryPath), and logs to io.Discard. It first creates
// cfg.State.Dir and fails the test if that directory cannot be created.
// Signature: newLiveHookOrchestrator(t *testing.T, cfg config.Config) *cluster.Orchestrator.
// Why: branch saturation tests need real run/runSensitive execution for wrapper fallback paths.
func newLiveHookOrchestrator(t *testing.T, cfg config.Config) *cluster.Orchestrator {
	t.Helper()
	// The state directory must exist before the orchestrator is constructed.
	if err := os.MkdirAll(cfg.State.Dir, 0o755); err != nil {
		t.Fatalf("ensure state dir: %v", err)
	}
	return cluster.New(cfg, &execx.Runner{DryRun: false}, state.New(cfg.State.RunHistoryPath), log.New(io.Discard, "", 0))
}

// TestHookPostStartExecRunBranchSaturation exercises the HTTP probe and
// runSensitive helpers through an orchestrator that executes real commands.
// Signature: TestHookPostStartExecRunBranchSaturation(t *testing.T).
// Why: closes uncovered run/runSensitive/post-start probe branches that only
// execute when command overrides are absent.
func TestHookPostStartExecRunBranchSaturation(t *testing.T) { cfg := lifecycleConfig(t) orch := newLiveHookOrchestrator(t, cfg) probeSrv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusNoContent) _, _ = w.Write([]byte("ok")) })) defer probeSrv.Close() code, err := orch.TestHookHTTPProbe(context.Background(), probeSrv.URL) if err != nil || code != http.StatusNoContent { t.Fatalf("expected successful probe, code=%d err=%v", code, err) } if _, err := orch.TestHookHTTPProbe(context.Background(), "http://127.0.0.1:1"); err == nil { t.Fatalf("expected failing probe for closed port") } out, err := orch.TestHookRunSensitive(context.Background(), 3*time.Second, "sh", "-lc", "printf ok") if err != nil || strings.TrimSpace(out) != "ok" { t.Fatalf("expected runSensitive success, out=%q err=%v", out, err) } if _, err := orch.TestHookRunSensitive(context.Background(), 3*time.Second, "sh", "-lc", "echo boom >&2; exit 12"); err == nil { t.Fatalf("expected runSensitive failure branch") } } // TestHookCoordinationBranchSaturation runs one orchestration or CLI step. // Signature: TestHookCoordinationBranchSaturation(t *testing.T). // Why: covers peer-intent and snapshot verification edge branches that are hard // to hit through end-to-end startup runs. 
func TestHookCoordinationBranchSaturation(t *testing.T) { t.Run("peer-intent-matrix", func(t *testing.T) { now := time.Now().UTC() old := now.Add(-2 * time.Hour) for _, tc := range []struct { name string role string bootstrap string intent string updatedAt time.Time wantErr string }{ { name: "unknown-intent-ignored", intent: "intent=weird reason=\"odd\" source=peer updated_at=" + now.Format(time.RFC3339), bootstrap: "__ANANKE_BOOTSTRAP_IDLE__", }, { name: "fresh-shutdown-complete-blocks", intent: "intent=shutdown_complete reason=\"done\" source=peer updated_at=" + now.Format(time.RFC3339), bootstrap: "__ANANKE_BOOTSTRAP_IDLE__", wantErr: "completed shutdown too recently", }, { name: "stale-startup-intent-auto-clears", intent: "intent=startup_in_progress reason=\"stale\" source=peer updated_at=" + old.Format(time.RFC3339), bootstrap: "__ANANKE_BOOTSTRAP_IDLE__", }, { name: "manual-startup-coordinator-allowed", role: "coordinator", intent: "intent=startup_in_progress reason=\"manual-startup\" source=peer updated_at=" + now.Format(time.RFC3339), bootstrap: "__ANANKE_BOOTSTRAP_ACTIVE__", }, } { t.Run(tc.name, func(t *testing.T) { cfg := lifecycleConfig(t) cfg.Coordination.Role = tc.role cfg.Coordination.PeerHosts = []string{"titan-24"} cfg.SSHManagedNodes = append(cfg.SSHManagedNodes, "titan-24") cfg.SSHNodeHosts["titan-24"] = "titan-24" run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) { command := name + " " + strings.Join(args, " ") switch { case name == "ssh" && strings.Contains(command, "ananke intent --config /etc/ananke/ananke.yaml --set normal"): return "ok", nil case name == "ssh" && strings.Contains(command, "ananke intent --config /etc/ananke/ananke.yaml"): return tc.bootstrap + "\n" + tc.intent + "\n", nil default: return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...) 
} } orch, _ := newHookOrchestrator(t, cfg, run, run) err := orch.TestHookGuardPeerStartupIntents(context.Background()) if tc.wantErr == "" && err != nil { t.Fatalf("expected success, got %v", err) } if tc.wantErr != "" { if err == nil || !strings.Contains(err.Error(), tc.wantErr) { t.Fatalf("expected error containing %q, got %v", tc.wantErr, err) } } }) } }) t.Run("verify-etcd-snapshot-errors", func(t *testing.T) { type mode string for _, tc := range []struct { name string mode mode path string wantErr string }{ {name: "empty-path", mode: "ok", path: "", wantErr: "snapshot path is empty"}, {name: "size-parse", mode: "size-parse", path: "/snap", wantErr: "parse size"}, {name: "size-too-small", mode: "size-small", path: "/snap", wantErr: "snapshot too small"}, { name: "snapshot-missing-from-list", mode: "list-miss", path: "/var/lib/rancher/k3s/server/db/snapshots/pre-shutdown-1", wantErr: "not present in k3s", }, {name: "invalid-sha", mode: "bad-sha", path: "/snap", wantErr: "invalid sha256"}, {name: "success", mode: "ok", path: "/snap", wantErr: ""}, } { t.Run(tc.name, func(t *testing.T) { cfg := lifecycleConfig(t) run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) { command := name + " " + strings.Join(args, " ") switch { case name == "ssh" && strings.Contains(command, "stat -c %s"): switch tc.mode { case "size-parse": return "abc", nil case "size-small": return "128", nil default: return "2097152", nil } case name == "ssh" && strings.Contains(command, "sha256sum"): if tc.mode == "bad-sha" { return "deadbeef", nil } return "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", nil case name == "ssh" && strings.Contains(command, "etcd-snapshot ls"): if tc.mode == "list-miss" { return "/other/snapshot", nil } return "/snap", nil default: return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...) 
} } orch, _ := newHookOrchestrator(t, cfg, run, run) err := orch.TestHookVerifyEtcdSnapshot(context.Background(), "titan-db", tc.path) if tc.wantErr == "" && err != nil { t.Fatalf("expected success, got %v", err) } if tc.wantErr != "" { if err == nil || !strings.Contains(err.Error(), tc.wantErr) { t.Fatalf("expected error containing %q, got %v", tc.wantErr, err) } } }) } }) t.Run("wait-for-api-failure-and-dryrun", func(t *testing.T) { cfg := lifecycleConfig(t) failRun := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) { command := name + " " + strings.Join(args, " ") if name == "kubectl" && strings.Contains(command, "version --request-timeout=5s") { return "", errors.New("api unavailable") } return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...) } orch, _ := newHookOrchestrator(t, cfg, failRun, failRun) if err := orch.TestHookWaitForAPI(context.Background(), 2, 0); err == nil { t.Fatalf("expected api wait failure") } dry := newDryRunHookOrchestrator(t, cfg, failRun) if err := dry.TestHookWaitForAPI(context.Background(), 1, 0); err != nil { t.Fatalf("expected dry-run api wait success, got %v", err) } }) } // TestHookAccessFluxsourceBranchSaturation runs one orchestration or CLI step. // Signature: TestHookAccessFluxsourceBranchSaturation(t *testing.T). // Why: covers remaining branch paths in flux-source guard/ensure/cache helpers. 
func TestHookAccessFluxsourceBranchSaturation(t *testing.T) { t.Run("ensure-flux-branch-notfound-and-patch-error", func(t *testing.T) { cfg := lifecycleConfig(t) var mode string run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) { command := name + " " + strings.Join(args, " ") switch { case name == "kubectl" && strings.Contains(command, "jsonpath={.spec.ref.branch}"): if mode == "notfound" { return "", errors.New("Error from server (NotFound): gitrepositories.source.toolkit.fluxcd.io \"flux-system\" not found") } return "dev", nil case name == "kubectl" && strings.Contains(command, "patch gitrepository flux-system"): return "", errors.New("patch denied") default: return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...) } } orch, _ := newHookOrchestrator(t, cfg, run, run) mode = "notfound" if err := orch.TestHookEnsureFluxBranch(context.Background(), "main", false); err != nil { t.Fatalf("expected notfound branch to be tolerated, got %v", err) } mode = "normal" if err := orch.TestHookEnsureFluxBranch(context.Background(), "main", false); err == nil { t.Fatalf("expected strict mismatch error") } if err := orch.TestHookEnsureFluxBranch(context.Background(), "main", true); err == nil { t.Fatalf("expected patch failure branch") } }) t.Run("guard-drift-notfound-and-branch-read-error", func(t *testing.T) { cfg := lifecycleConfig(t) run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) { command := name + " " + strings.Join(args, " ") switch { case name == "kubectl" && strings.Contains(command, "jsonpath={.spec.url}"): return "", errors.New("Error from server (NotFound): gitrepositories.source.toolkit.fluxcd.io \"flux-system\" not found") default: return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...) 
} } orch, _ := newHookOrchestrator(t, cfg, run, run) if err := orch.TestHookGuardFluxSourceDrift(context.Background(), "main", false); err != nil { t.Fatalf("expected notfound branch to pass, got %v", err) } runBranchErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) { command := name + " " + strings.Join(args, " ") switch { case name == "kubectl" && strings.Contains(command, "jsonpath={.spec.url}"): return cfg.ExpectedFluxSource, nil case name == "kubectl" && strings.Contains(command, "jsonpath={.spec.ref.branch}"): return "", errors.New("branch read failure") default: return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...) } } orchBranchErr, _ := newHookOrchestrator(t, cfg, runBranchErr, runBranchErr) if err := orchBranchErr.TestHookGuardFluxSourceDrift(context.Background(), "main", false); err == nil { t.Fatalf("expected branch read failure") } }) t.Run("bootstrap-cache-empty-and-missing", func(t *testing.T) { cfg := lifecycleConfig(t) cfg.IACRepoPath = t.TempDir() cfg.LocalBootstrapPaths = []string{"services/does-not-exist"} orch, _ := newHookOrchestrator(t, cfg, nil, nil) if err := orch.TestHookRefreshBootstrapCache(context.Background()); err == nil { t.Fatalf("expected no rendered cache manifests error") } if err := orch.TestHookApplyBootstrapCache(context.Background(), "services/does-not-exist"); err == nil { t.Fatalf("expected missing cache apply error") } }) } // TestHookStartupConvergenceAndStabilitySaturation runs one orchestration or CLI step. // Signature: TestHookStartupConvergenceAndStabilitySaturation(t *testing.T). // Why: saturates convergence/stability checklist branches that are otherwise only // reached in full drill flows. 
func TestHookStartupConvergenceAndStabilitySaturation(t *testing.T) { t.Run("startup-convergence-success-all-gates-enabled", func(t *testing.T) { cfg := lifecycleConfig(t) cfg.Startup.RequireIngressChecklist = true cfg.Startup.RequireServiceChecklist = true cfg.Startup.RequireCriticalServiceEndpoints = true cfg.Startup.CriticalServiceEndpoints = nil cfg.Startup.RequireFluxHealth = true cfg.Startup.RequireWorkloadConvergence = true cfg.Startup.ServiceChecklist = nil cfg.Startup.ServiceChecklistStabilitySec = 0 run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) { command := name + " " + strings.Join(args, " ") switch { case name == "kubectl" && strings.Contains(command, "get ingress -A -o json"): return `{"items":[]}`, nil case name == "kubectl" && strings.Contains(command, "get kustomizations.kustomize.toolkit.fluxcd.io -A -o json"): return `{"items":[{"metadata":{"namespace":"flux-system","name":"services"},"spec":{"suspend":false},"status":{"conditions":[{"type":"Ready","status":"True","reason":"Ready","message":"ok"}]}}]}`, nil case name == "kubectl" && strings.Contains(command, "get deploy,statefulset,daemonset -A -o json"): return `{"items":[]}`, nil case name == "kubectl" && strings.Contains(command, "get pods -A -o json"): return `{"items":[]}`, nil default: return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...) 
} } orch, _ := newHookOrchestrator(t, cfg, run, run) if err := orch.TestHookWaitForStartupConvergence(context.Background()); err != nil { t.Fatalf("expected startup convergence success, got %v", err) } }) t.Run("startup-convergence-fails-on-flux", func(t *testing.T) { cfg := lifecycleConfig(t) cfg.Startup.RequireIngressChecklist = false cfg.Startup.RequireServiceChecklist = false cfg.Startup.RequireCriticalServiceEndpoints = false cfg.Startup.RequireFluxHealth = true cfg.Startup.RequireWorkloadConvergence = false run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) { command := name + " " + strings.Join(args, " ") if name == "kubectl" && strings.Contains(command, "get kustomizations.kustomize.toolkit.fluxcd.io -A -o json") { return `{"items":[{"metadata":{"namespace":"flux-system","name":"services"},"spec":{"suspend":false},"status":{"conditions":[{"type":"Ready","status":"False","reason":"Progressing","message":"syncing"}]}}]}`, nil } return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...) 
} orch, _ := newHookOrchestrator(t, cfg, run, run) if err := orch.TestHookWaitForStartupConvergence(context.Background()); err == nil { t.Fatalf("expected startup convergence failure") } }) t.Run("checklist-host-parsing-branches", func(t *testing.T) { cfg := lifecycleConfig(t) cfg.Startup.ServiceChecklist = []config.ServiceChecklistCheck{ {Name: "grafana-check", URL: "https://metrics.bstein.dev/"}, } orch, _ := newHookOrchestrator(t, cfg, nil, nil) if host := orch.TestHookChecklistFailureHost("grafana-check: status=502"); host != "metrics.bstein.dev" { t.Fatalf("expected mapped host metrics.bstein.dev, got %q", host) } if host := cluster.TestHookHostFromURL("https://gitea.bstein.dev/"); host != "gitea.bstein.dev" { t.Fatalf("expected URL host gitea.bstein.dev, got %q", host) } if host := cluster.TestHookHostFromURL("://bad"); host != "" { t.Fatalf("expected empty host for invalid URL, got %q", host) } }) } // TestHookScalingDrainAndTimesyncSaturation runs one orchestration or CLI step. // Signature: TestHookScalingDrainAndTimesyncSaturation(t *testing.T). // Why: covers remaining branch edges for snapshot/scaling/drain/inventory parsers. 
func TestHookScalingDrainAndTimesyncSaturation(t *testing.T) { t.Run("scaled-snapshot-read-write-errors", func(t *testing.T) { cfg := lifecycleConfig(t) cfg.State.Dir = filepath.Join(t.TempDir(), "state-root") if err := os.MkdirAll(cfg.State.Dir, 0o755); err != nil { t.Fatalf("mkdir state dir: %v", err) } orch, _ := newHookOrchestrator(t, cfg, nil, nil) snapshotPath := filepath.Join(cfg.State.Dir, "scaled-workloads.json") if err := os.WriteFile(snapshotPath, []byte("{bad json"), 0o644); err != nil { t.Fatalf("write invalid snapshot: %v", err) } if _, err := orch.TestHookReadScaledWorkloadSnapshot(); err == nil { t.Fatalf("expected decode snapshot error") } if err := os.Remove(snapshotPath); err != nil { t.Fatalf("remove snapshot file: %v", err) } if err := os.Mkdir(snapshotPath, 0o755); err != nil { t.Fatalf("create snapshot directory: %v", err) } if _, err := orch.TestHookReadScaledWorkloadSnapshot(); err == nil { t.Fatalf("expected read snapshot error when path is directory") } }) t.Run("latest-etcd-snapshot-branches", func(t *testing.T) { cfg := lifecycleConfig(t) orch, _ := newHookOrchestrator(t, cfg, nil, nil) if _, err := orch.TestHookLatestEtcdSnapshotPath(context.Background(), "not-managed"); err == nil { t.Fatalf("expected unmanaged node snapshot error") } run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) { command := name + " " + strings.Join(args, " ") if name == "ssh" && strings.Contains(command, "etcd-snapshot ls") { return "NAME LOCATION SIZE CREATED\n", nil } return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...) 
} orchEmpty, _ := newHookOrchestrator(t, cfg, run, run) if _, err := orchEmpty.TestHookLatestEtcdSnapshotPath(context.Background(), "titan-db"); err == nil { t.Fatalf("expected empty snapshot list error") } }) t.Run("parse-datastore-and-validate-inventory-branches", func(t *testing.T) { if got := cluster.TestHookParseDatastoreEndpoint(`ExecStart=/usr/local/bin/k3s server --datastore-endpoint "postgres://db:5432/k3s"`); got != "postgres://db:5432/k3s" { t.Fatalf("unexpected parsed endpoint %q", got) } if got := cluster.TestHookParseDatastoreEndpoint("ExecStart=/usr/local/bin/k3s server"); got != "" { t.Fatalf("expected empty endpoint, got %q", got) } cfg := lifecycleConfig(t) cfg.SSHPort = 0 cfg.SSHUser = "bad user" cfg.ControlPlanes = []string{"titan-db"} cfg.Workers = []string{"titan-23"} cfg.SSHManagedNodes = []string{} cfg.SSHNodeHosts = map[string]string{ "titan-db": "bad/host", } orch, _ := newHookOrchestrator(t, cfg, nil, nil) if err := orch.TestHookValidateNodeInventory(); err == nil { t.Fatalf("expected inventory validation errors") } }) } // TestHookReportBranchSaturation runs one orchestration or CLI step. // Signature: TestHookReportBranchSaturation(t *testing.T). // Why: executes report helper branches from the top-level testing module so // startup status/artifact coverage remains strict without package-local tests. 
func TestHookReportBranchSaturation(t *testing.T) { cfg := lifecycleConfig(t) cfg.State.ReportsDir = "" orch := newLiveHookOrchestrator(t, cfg) if got := cluster.TestHookSanitizeReportFileName(" ### "); got != "run" { t.Fatalf("expected sanitized fallback run, got %q", got) } if got := orch.TestHookReportArchiveDir(); got == "" { t.Fatalf("expected non-empty report archive dir") } if !strings.HasSuffix(orch.TestHookStartupReportPath(), "last-startup-report.json") { t.Fatalf("unexpected startup report path: %s", orch.TestHookStartupReportPath()) } if !strings.HasSuffix(orch.TestHookStartupProgressPath(), "startup-progress.json") { t.Fatalf("unexpected startup progress path: %s", orch.TestHookStartupProgressPath()) } if !strings.HasSuffix(orch.TestHookLastShutdownReportPath(), "last-shutdown-report.json") { t.Fatalf("unexpected shutdown report path: %s", orch.TestHookLastShutdownReportPath()) } record := state.RunRecord{ ID: "startup:abc", Action: "startup", Reason: "test", StartedAt: time.Now().UTC().Add(-10 * time.Second), EndedAt: time.Now().UTC(), Success: true, } if err := orch.TestHookWriteRunRecordArtifact(record); err != nil { t.Fatalf("write startup artifact failed: %v", err) } // Exercise no-active-report branches. orch.TestHookSetStartupPhase("ignored", "no report active") orch.TestHookNoteStartupCheckState("", "passed", "ignored") orch.TestHookNoteStartupAutoHeal(" ") orch.TestHookFinalizeStartupReport(nil) // Exercise active startup report mutation and completion branches. orch.TestHookBeginStartupReport("quality-drill") orch.TestHookSetStartupPhase("phase-a", "detail-a") orch.TestHookNoteStartupCheckState("node-inventory", "weird", "detail-b") orch.TestHookNoteStartupCheck("flux", true, "ok") orch.TestHookNoteStartupAutoHeal("healed something") orch.TestHookFinalizeStartupReport(fmt.Errorf("startup failed")) if ok := orch.TestHookFinalizeStartupReportSnapshot(nil); !ok { t.Fatalf("expected startup report snapshot finalize wrapper to return true") } }