package orchestrator

import (
	"context"
	"errors"
	"io"
	"log"
	"os"
	"path/filepath"
	"strings"
	"testing"
	"time"

	"scm.bstein.dev/bstein/ananke/internal/cluster"
	"scm.bstein.dev/bstein/ananke/internal/execx"
	"scm.bstein.dev/bstein/ananke/internal/state"
)

// TestHookGapMatrixPart9AccessCoordinationEndpoints exercises otherwise-uncovered
// branches via the TestHook* seams, using fake command runners keyed on command
// substrings.
// Why: closes uncovered statement ranges in access/fluxsource, coordination,
// and critical-endpoint orchestration helpers.
func TestHookGapMatrixPart9AccessCoordinationEndpoints(t *testing.T) {
	t.Run("access-fluxsource-uncovered-ranges", func(t *testing.T) {
		// Serial SSH fan-out (parallelism 0) across five managed nodes.
		cfg := lifecycleConfig(t)
		cfg.Shutdown.SSHParallelism = 0
		cfg.SSHManagedNodes = []string{"n1", "n2", "n3", "n4", "n5"}
		cfg.SSHNodeHosts = map[string]string{
			"n1": "n1", "n2": "n2", "n3": "n3", "n4": "n4", "n5": "n5",
		}
		// Fake runner: fail the systemctl probe over ssh, answer the flux
		// GitRepository url/branch kubectl queries, and delegate everything
		// else to the recorded lifecycle dispatcher.
		run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			switch {
			case name == "ssh" && strings.Contains(command, "/usr/bin/systemctl --version"):
				return "", errors.New("sudo denied")
			case name == "kubectl" && strings.Contains(command, "jsonpath={.spec.url}"):
				return cfg.ExpectedFluxSource, nil
			case name == "kubectl" && strings.Contains(command, "jsonpath={.spec.ref.branch}"):
				return "dev", nil
			default:
				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
			}
		}
		orch, _ := newHookOrchestrator(t, cfg, run, run)
		// Node list mixes an empty entry and an unmanaged node; per-node
		// failures are expected to be aggregated into one error.
		err := orch.TestHookReconcileNodeAccess(context.Background(), []string{"", "unmanaged", "n1", "n2", "n3", "n4", "n5"})
		if err == nil || !strings.Contains(err.Error(), "access validation had") {
			t.Fatalf("expected reconcile error aggregation branch, got %v", err)
		}

		// SSH-auth wait where the only managed node is also ignored, so all
		// targets (including the empty and duplicate entries) are pruned.
		cfgAuthNone := lifecycleConfig(t)
		cfgAuthNone.Startup.RequireNodeSSHAuth = true
		cfgAuthNone.Startup.NodeSSHAuthWaitSeconds = 1
		cfgAuthNone.Startup.NodeSSHAuthPollSeconds = 1
		cfgAuthNone.SSHManagedNodes = []string{"n1"}
		cfgAuthNone.Startup.IgnoreUnavailableNodes = []string{"n1"}
		orchAuthNone, _ := newHookOrchestrator(t, cfgAuthNone, nil, nil)
		if err := orchAuthNone.TestHookWaitForNodeSSHAuth(context.Background(), []string{"", "n1", "n1"}); err != nil {
			t.Fatalf("expected ssh auth pruned-target branch, got %v", err)
		}

		// SSH-auth wait for a node that is not in ssh_managed_nodes at all.
		cfgAuthDenied := lifecycleConfig(t)
		cfgAuthDenied.Startup.RequireNodeSSHAuth = true
		cfgAuthDenied.Startup.NodeSSHAuthWaitSeconds = 1
		cfgAuthDenied.Startup.NodeSSHAuthPollSeconds = 1
		cfgAuthDenied.SSHManagedNodes = []string{"n1"}
		orchAuthDenied, _ := newHookOrchestrator(t, cfgAuthDenied, nil, nil)
		if err := orchAuthDenied.TestHookWaitForNodeSSHAuth(context.Background(), []string{"n2"}); err == nil || !strings.Contains(err.Error(), "not in ssh_managed_nodes") {
			t.Fatalf("expected ssh auth unmanaged-node branch, got %v", err)
		}

		// The fake runner reports branch "dev", so guarding against "main"
		// must surface branch drift.
		if err := orch.TestHookGuardFluxSourceDrift(context.Background(), "main", false); err == nil || !strings.Contains(err.Error(), "branch drift") {
			t.Fatalf("expected guardFluxSourceDrift mismatch branch, got %v", err)
		}
		// A blank branch argument is expected to be a no-op fast path.
		if err := orch.TestHookEnsureFluxBranch(context.Background(), " ", false); err != nil {
			t.Fatalf("expected ensureFluxBranch empty-branch fast path, got %v", err)
		}

		// Runner that fails only the branch read, to hit the read-error path.
		runReadErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			if name == "kubectl" && strings.Contains(command, "jsonpath={.spec.ref.branch}") {
				return "", errors.New("read failed")
			}
			return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
		}
		orchReadErr, _ := newHookOrchestrator(t, lifecycleConfig(t), runReadErr, runReadErr)
		if err := orchReadErr.TestHookEnsureFluxBranch(context.Background(), "main", true); err == nil || !strings.Contains(err.Error(), "read flux source branch") {
			t.Fatalf("expected ensureFluxBranch read error branch, got %v", err)
		}

		// Report-only path: url matches expected source but branch is
		// "legacy"; called for coverage, return value intentionally unchecked.
		cfgReport := lifecycleConfig(t)
		runReport := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			switch {
			case name == "kubectl" && strings.Contains(command, "jsonpath={.spec.url}"):
				return cfgReport.ExpectedFluxSource, nil
			case name == "kubectl" && strings.Contains(command, "jsonpath={.spec.ref.branch}"):
				return "legacy", nil
			default:
				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
			}
		}
		orchReport, _ := newHookOrchestrator(t, cfgReport, runReport, runReport)
		orchReport.TestHookReportFluxSource(context.Background(), "")

		// Bootstrap-cache refresh: the cache path is pre-created as a
		// DIRECTORY so the manifest write fails, driving the
		// "no bootstrap cache manifests rendered" branch. The blank entry in
		// LocalBootstrapPaths exercises path pruning.
		cfgCache := lifecycleConfig(t)
		repo := t.TempDir()
		cfgCache.IACRepoPath = repo
		cfgCache.LocalBootstrapPaths = []string{" ", "services/bootstrap"}
		if err := os.MkdirAll(filepath.Join(repo, "services", "bootstrap"), 0o755); err != nil {
			t.Fatalf("mkdir bootstrap path: %v", err)
		}
		runCache := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			if name == "sh" && strings.Contains(command, "kubectl kustomize") {
				return "apiVersion: v1\nkind: Namespace\nmetadata:\n name: test\n", nil
			}
			return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
		}
		orchCache, _ := newHookOrchestrator(t, cfgCache, runCache, runCache)
		cachePath := orchCache.TestHookBootstrapCachePath("services/bootstrap")
		if err := os.MkdirAll(cachePath, 0o755); err != nil {
			t.Fatalf("mkdir cache path-as-dir: %v", err)
		}
		if err := orchCache.TestHookRefreshBootstrapCache(context.Background()); err == nil || !strings.Contains(err.Error(), "no bootstrap cache manifests rendered") {
			t.Fatalf("expected refresh cache write-failure branch, got %v", err)
		}
	})

	t.Run("coordination-uncovered-ranges", func(t *testing.T) {
		// Coordination peer set combines PeerHosts with ForwardShutdownHost.
		cfg := lifecycleConfig(t)
		cfg.Coordination.PeerHosts = []string{"titan-24"}
		cfg.Coordination.ForwardShutdownHost = "titan-25"
		cfg.SSHManagedNodes = append(cfg.SSHManagedNodes, "titan-24", "titan-25")
		cfg.SSHNodeHosts["titan-24"] = "titan-24"
		cfg.SSHNodeHosts["titan-25"] = "titan-25"
		orchPeers, _ := newHookOrchestrator(t, cfg, nil, nil)
		peers := orchPeers.TestHookCoordinationPeers()
		if len(peers) < 2 {
			t.Fatalf("expected peers to include forward host, got %v", peers)
		}

		// A peer reporting startup_in_progress with a two-hour-old timestamp
		// should be treated as stale and auto-cleared (the "--set normal"
		// command is what the fake accepts as the clearing call).
		stale := time.Now().UTC().Add(-2 * time.Hour).Format(time.RFC3339)
		runStale := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			switch {
			case name == "ssh" && strings.Contains(command, "ananke intent --config /etc/ananke/ananke.yaml --set normal"):
				return "ok", nil
			case name == "ssh" && strings.Contains(command, "ananke intent --config /etc/ananke/ananke.yaml"):
				return "__ANANKE_BOOTSTRAP_ACTIVE__\nintent=startup_in_progress reason=\"rolling\" source=peer updated_at=" + stale + "\n", nil
			default:
				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
			}
		}
		orchStale, _ := newHookOrchestrator(t, cfg, runStale, runStale)
		if err := orchStale.TestHookGuardPeerStartupIntents(context.Background()); err != nil {
			t.Fatalf("expected stale startup intent auto-clear branch, got %v", err)
		}

		// A stale shutting_down intent on an idle peer should not block.
		runShutdownStale := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			if name == "ssh" && strings.Contains(command, "ananke intent --config /etc/ananke/ananke.yaml") {
				return "__ANANKE_BOOTSTRAP_IDLE__\nintent=shutting_down reason=\"old\" source=peer updated_at=" + stale + "\n", nil
			}
			return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
		}
		orchShutdownStale, _ := newHookOrchestrator(t, cfg, runShutdownStale, runShutdownStale)
		if err := orchShutdownStale.TestHookGuardPeerStartupIntents(context.Background()); err != nil {
			t.Fatalf("expected stale shutdown intent allow branch, got %v", err)
		}

		// Remote peer status read where the ssh transport itself fails.
		runSSHErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			if name == "ssh" && strings.Contains(command, "ananke intent --config /etc/ananke/ananke.yaml") {
				return "", errors.New("ssh failed")
			}
			return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
		}
		orchSSHErr, _ := newHookOrchestrator(t, cfg, runSSHErr, runSSHErr)
		if _, err := orchSSHErr.TestHookReadRemotePeerStatus(context.Background(), "titan-24"); err == nil || !strings.Contains(err.Error(), "ssh failed") {
			t.Fatalf("expected readRemotePeerStatus ssh-error branch, got %v", err)
		}

		// Direct cluster construction with DryRun: snapshot verification is
		// expected to short-circuit without running any remote commands.
		dryCfg := lifecycleConfig(t)
		dry := cluster.New(dryCfg, &execx.Runner{DryRun: true}, state.New(dryCfg.State.RunHistoryPath), log.New(io.Discard, "", 0))
		if err := dry.TestHookVerifyEtcdSnapshot(context.Background(), "titan-db", "/snap"); err != nil {
			t.Fatalf("expected verifyEtcdSnapshot dry-run fast path, got %v", err)
		}

		// Snapshot has a plausible size but is absent from `etcd-snapshot ls`.
		runMissing := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			switch {
			case name == "ssh" && strings.Contains(command, "stat -c %s"):
				return "2097152", nil
			case name == "ssh" && strings.Contains(command, "etcd-snapshot ls"):
				return "other-snapshot", nil
			default:
				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
			}
		}
		orchMissing, _ := newHookOrchestrator(t, cfg, runMissing, runMissing)
		if err := orchMissing.TestHookVerifyEtcdSnapshot(context.Background(), "titan-db", "/var/lib/rancher/k3s/server/db/snapshots/pre-shutdown"); err == nil || !strings.Contains(err.Error(), "not present") {
			t.Fatalf("expected verifyEtcdSnapshot missing-in-ls branch, got %v", err)
		}

		// Snapshot present and sized, but sha256sum returns a malformed hash.
		runBadHash := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			switch {
			case name == "ssh" && strings.Contains(command, "stat -c %s"):
				return "2097152", nil
			case name == "ssh" && strings.Contains(command, "etcd-snapshot ls"):
				return "/var/lib/rancher/k3s/server/db/snapshots/pre-shutdown", nil
			case name == "ssh" && strings.Contains(command, "sha256sum"):
				return "abc", nil
			default:
				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
			}
		}
		orchBadHash, _ := newHookOrchestrator(t, cfg, runBadHash, runBadHash)
		if err := orchBadHash.TestHookVerifyEtcdSnapshot(context.Background(), "titan-db", "/var/lib/rancher/k3s/server/db/snapshots/pre-shutdown"); err == nil || !strings.Contains(err.Error(), "invalid sha256") {
			t.Fatalf("expected verifyEtcdSnapshot invalid-hash branch, got %v", err)
		}
	})

	t.Run("critical-endpoint-uncovered-ranges", func(t *testing.T) {
		// Unparseable endpoint entry plus an already-canceled context: the
		// wait loop must surface context.Canceled after the parse-error pass.
		cfg := lifecycleConfig(t)
		cfg.Startup.CriticalServiceEndpointWaitSec = 1
		cfg.Startup.CriticalServiceEndpointPollSec = 1
		cfg.Startup.CriticalServiceEndpoints = []string{"invalid-entry"}
		orchErr, _ := newHookOrchestrator(t, cfg, nil, nil)
		ctx, cancel := context.WithCancel(context.Background())
		cancel()
		if err := orchErr.TestHookWaitForCriticalServiceEndpoints(ctx); !errors.Is(err, context.Canceled) {
			t.Fatalf("expected critical endpoint canceled branch after parse error pass, got %v", err)
		}

		// kubectl "not found" on the Endpoints object: treated as not-ready
		// detail rather than a hard error.
		runNotFound := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			if name == "kubectl" && strings.Contains(command, "get endpoints") {
				return "", errors.New("endpoints \"svc\" not found")
			}
			return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
		}
		cfgNF := lifecycleConfig(t)
		cfgNF.Startup.CriticalServiceEndpoints = []string{"ns/svc"}
		orchNF, _ := newHookOrchestrator(t, cfgNF, runNotFound, runNotFound)
		ready, detail, ns, svc, err := orchNF.TestHookCriticalServiceEndpointsReady(context.Background())
		if err != nil || ready || ns != "ns" || svc != "svc" || !strings.Contains(detail, "not found") {
			t.Fatalf("expected critical endpoint notfound branch, ready=%v detail=%q ns=%q svc=%q err=%v", ready, detail, ns, svc, err)
		}

		// Any other kubectl failure is a hard "query endpoints" error.
		runQueryErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			if name == "kubectl" && strings.Contains(command, "get endpoints") {
				return "", errors.New("boom")
			}
			return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
		}
		orchQErr, _ := newHookOrchestrator(t, cfgNF, runQueryErr, runQueryErr)
		if _, _, _, _, err := orchQErr.TestHookCriticalServiceEndpointsReady(context.Background()); err == nil || !strings.Contains(err.Error(), "query endpoints") {
			t.Fatalf("expected critical endpoint query error branch, got %v", err)
		}

		// Heal path: deployment scale succeeds but its rollout-status wait
		// fails (the statefulset branch also errors so the deployment path is
		// the one exercised).
		runHealErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			switch {
			case name == "kubectl" && strings.Contains(command, " scale deployment svc "):
				return "", nil
			case name == "kubectl" && strings.Contains(command, "rollout status deployment/svc"):
				return "", errors.New("rollout failed")
			case name == "kubectl" && strings.Contains(command, " scale statefulset svc "):
				return "", errors.New("statefulset not found")
			default:
				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
			}
		}
		orchHealErr, _ := newHookOrchestrator(t, lifecycleConfig(t), runHealErr, runHealErr)
		if _, err := orchHealErr.TestHookMaybeHealCriticalEndpointBackends(context.Background(), "ns", "svc"); err == nil || !strings.Contains(err.Error(), "rollout failed") {
			t.Fatalf("expected critical endpoint heal wait error branch, got %v", err)
		}

		// "/svc" parses to an empty namespace, which must be rejected.
		cfgBadParse := lifecycleConfig(t)
		cfgBadParse.Startup.CriticalServiceEndpoints = []string{"/svc"}
		orchBadParse, _ := newHookOrchestrator(t, cfgBadParse, nil, nil)
		if _, _, _, _, err := orchBadParse.TestHookCriticalServiceEndpointsReady(context.Background()); err == nil || !strings.Contains(err.Error(), "namespace and service") {
			t.Fatalf("expected critical endpoint parse-empty branch, got %v", err)
		}
	})
}