// hooks_gap_matrix_part9_test.go — ananke/testing/orchestrator (295 lines, 14 KiB, Go)
package orchestrator
import (
"context"
"errors"
"io"
"log"
"os"
"path/filepath"
"strings"
"testing"
"time"
"scm.bstein.dev/bstein/ananke/internal/cluster"
"scm.bstein.dev/bstein/ananke/internal/execx"
"scm.bstein.dev/bstein/ananke/internal/state"
)
// TestHookGapMatrixPart9AccessCoordinationEndpoints runs one orchestration or CLI step.
// Signature: TestHookGapMatrixPart9AccessCoordinationEndpoints(t *testing.T).
// Why: closes uncovered statement ranges in access/fluxsource, coordination,
// and critical-endpoint orchestration helpers.
func TestHookGapMatrixPart9AccessCoordinationEndpoints(t *testing.T) {
	t.Run("access-fluxsource-uncovered-ranges", func(t *testing.T) {
		// Five managed nodes with SSH parallelism forced to 0 — presumably this
		// drives the serial reconcile path; TODO confirm against the
		// orchestrator's parallelism handling.
		cfg := lifecycleConfig(t)
		cfg.Shutdown.SSHParallelism = 0
		cfg.SSHManagedNodes = []string{"n1", "n2", "n3", "n4", "n5"}
		cfg.SSHNodeHosts = map[string]string{
			"n1": "n1",
			"n2": "n2",
			"n3": "n3",
			"n4": "n4",
			"n5": "n5",
		}
		// Fake runner: deny the remote systemctl probe over ssh so every node
		// fails access validation, answer the two flux-source kubectl jsonpath
		// queries with fixed values, and delegate everything else to the
		// shared lifecycle dispatcher.
		run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			switch {
			case name == "ssh" && strings.Contains(command, "/usr/bin/systemctl --version"):
				return "", errors.New("sudo denied")
			case name == "kubectl" && strings.Contains(command, "jsonpath={.spec.url}"):
				return cfg.ExpectedFluxSource, nil
			case name == "kubectl" && strings.Contains(command, "jsonpath={.spec.ref.branch}"):
				return "dev", nil
			default:
				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
			}
		}
		orch, _ := newHookOrchestrator(t, cfg, run, run)
		// Targets include an empty string and an unmanaged node alongside all
		// five failing nodes; the aggregated-failure error branch must fire.
		err := orch.TestHookReconcileNodeAccess(context.Background(), []string{"", "unmanaged", "n1", "n2", "n3", "n4", "n5"})
		if err == nil || !strings.Contains(err.Error(), "access validation had") {
			t.Fatalf("expected reconcile error aggregation branch, got %v", err)
		}
		// SSH-auth wait with the only managed node also listed as ignorable:
		// pruning the target list should leave nothing to wait on, so the
		// call succeeds despite no runner being wired up.
		cfgAuthNone := lifecycleConfig(t)
		cfgAuthNone.Startup.RequireNodeSSHAuth = true
		cfgAuthNone.Startup.NodeSSHAuthWaitSeconds = 1
		cfgAuthNone.Startup.NodeSSHAuthPollSeconds = 1
		cfgAuthNone.SSHManagedNodes = []string{"n1"}
		cfgAuthNone.Startup.IgnoreUnavailableNodes = []string{"n1"}
		orchAuthNone, _ := newHookOrchestrator(t, cfgAuthNone, nil, nil)
		// Duplicate "n1" entries and an empty target exercise de-dup/skip paths.
		if err := orchAuthNone.TestHookWaitForNodeSSHAuth(context.Background(), []string{"", "n1", "n1"}); err != nil {
			t.Fatalf("expected ssh auth pruned-target branch, got %v", err)
		}
		// Requesting auth for "n2" when only "n1" is managed must be rejected
		// with the unmanaged-node error.
		cfgAuthDenied := lifecycleConfig(t)
		cfgAuthDenied.Startup.RequireNodeSSHAuth = true
		cfgAuthDenied.Startup.NodeSSHAuthWaitSeconds = 1
		cfgAuthDenied.Startup.NodeSSHAuthPollSeconds = 1
		cfgAuthDenied.SSHManagedNodes = []string{"n1"}
		orchAuthDenied, _ := newHookOrchestrator(t, cfgAuthDenied, nil, nil)
		if err := orchAuthDenied.TestHookWaitForNodeSSHAuth(context.Background(), []string{"n2"}); err == nil || !strings.Contains(err.Error(), "not in ssh_managed_nodes") {
			t.Fatalf("expected ssh auth unmanaged-node branch, got %v", err)
		}
		// The fake runner reports branch "dev"; asking the guard for "main"
		// must surface the branch-drift error.
		if err := orch.TestHookGuardFluxSourceDrift(context.Background(), "main", false); err == nil || !strings.Contains(err.Error(), "branch drift") {
			t.Fatalf("expected guardFluxSourceDrift mismatch branch, got %v", err)
		}
		// A whitespace-only branch takes the empty-branch fast path (no error).
		if err := orch.TestHookEnsureFluxBranch(context.Background(), " ", false); err != nil {
			t.Fatalf("expected ensureFluxBranch empty-branch fast path, got %v", err)
		}
		// Runner that fails only the branch read, to hit ensureFluxBranch's
		// read-error propagation.
		runReadErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			if name == "kubectl" && strings.Contains(command, "jsonpath={.spec.ref.branch}") {
				return "", errors.New("read failed")
			}
			return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
		}
		orchReadErr, _ := newHookOrchestrator(t, lifecycleConfig(t), runReadErr, runReadErr)
		if err := orchReadErr.TestHookEnsureFluxBranch(context.Background(), "main", true); err == nil || !strings.Contains(err.Error(), "read flux source branch") {
			t.Fatalf("expected ensureFluxBranch read error branch, got %v", err)
		}
		// Report path: URL matches the expected source but the branch differs
		// ("legacy"); the report hook is invoked for coverage only, so its
		// result is intentionally ignored.
		cfgReport := lifecycleConfig(t)
		runReport := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			switch {
			case name == "kubectl" && strings.Contains(command, "jsonpath={.spec.url}"):
				return cfgReport.ExpectedFluxSource, nil
			case name == "kubectl" && strings.Contains(command, "jsonpath={.spec.ref.branch}"):
				return "legacy", nil
			default:
				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
			}
		}
		orchReport, _ := newHookOrchestrator(t, cfgReport, runReport, runReport)
		orchReport.TestHookReportFluxSource(context.Background(), "")
		// Bootstrap cache refresh: one blank path entry plus a real bootstrap
		// dir in a temp repo; kustomize is faked to render a single manifest.
		cfgCache := lifecycleConfig(t)
		repo := t.TempDir()
		cfgCache.IACRepoPath = repo
		cfgCache.LocalBootstrapPaths = []string{" ", "services/bootstrap"}
		if err := os.MkdirAll(filepath.Join(repo, "services", "bootstrap"), 0o755); err != nil {
			t.Fatalf("mkdir bootstrap path: %v", err)
		}
		runCache := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			if name == "sh" && strings.Contains(command, "kubectl kustomize") {
				return "apiVersion: v1\nkind: Namespace\nmetadata:\n name: test\n", nil
			}
			return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
		}
		orchCache, _ := newHookOrchestrator(t, cfgCache, runCache, runCache)
		// Pre-create the cache file path as a DIRECTORY so the write fails;
		// presumably the refresh then reports zero rendered manifests — TODO
		// confirm the error wording matches that internal failure mode.
		cachePath := orchCache.TestHookBootstrapCachePath("services/bootstrap")
		if err := os.MkdirAll(cachePath, 0o755); err != nil {
			t.Fatalf("mkdir cache path-as-dir: %v", err)
		}
		if err := orchCache.TestHookRefreshBootstrapCache(context.Background()); err == nil || !strings.Contains(err.Error(), "no bootstrap cache manifests rendered") {
			t.Fatalf("expected refresh cache write-failure branch, got %v", err)
		}
	})
	t.Run("coordination-uncovered-ranges", func(t *testing.T) {
		// One explicit peer plus a forward-shutdown host, both made managed so
		// peer discovery includes them.
		cfg := lifecycleConfig(t)
		cfg.Coordination.PeerHosts = []string{"titan-24"}
		cfg.Coordination.ForwardShutdownHost = "titan-25"
		cfg.SSHManagedNodes = append(cfg.SSHManagedNodes, "titan-24", "titan-25")
		cfg.SSHNodeHosts["titan-24"] = "titan-24"
		cfg.SSHNodeHosts["titan-25"] = "titan-25"
		orchPeers, _ := newHookOrchestrator(t, cfg, nil, nil)
		peers := orchPeers.TestHookCoordinationPeers()
		if len(peers) < 2 {
			t.Fatalf("expected peers to include forward host, got %v", peers)
		}
		// Timestamp two hours in the past marks peer intents as stale.
		stale := time.Now().UTC().Add(-2 * time.Hour).Format(time.RFC3339)
		// Peer reports a stale startup_in_progress intent; the guard should
		// auto-clear it via the "--set normal" command (answered "ok") rather
		// than fail. The more specific "--set normal" case must come first
		// because the plain intent query is a substring of it.
		runStale := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			switch {
			case name == "ssh" && strings.Contains(command, "ananke intent --config /etc/ananke/ananke.yaml --set normal"):
				return "ok", nil
			case name == "ssh" && strings.Contains(command, "ananke intent --config /etc/ananke/ananke.yaml"):
				return "__ANANKE_BOOTSTRAP_ACTIVE__\nintent=startup_in_progress reason=\"rolling\" source=peer updated_at=" + stale + "\n", nil
			default:
				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
			}
		}
		orchStale, _ := newHookOrchestrator(t, cfg, runStale, runStale)
		if err := orchStale.TestHookGuardPeerStartupIntents(context.Background()); err != nil {
			t.Fatalf("expected stale startup intent auto-clear branch, got %v", err)
		}
		// A stale shutting_down intent from an idle peer should be tolerated.
		runShutdownStale := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			if name == "ssh" && strings.Contains(command, "ananke intent --config /etc/ananke/ananke.yaml") {
				return "__ANANKE_BOOTSTRAP_IDLE__\nintent=shutting_down reason=\"old\" source=peer updated_at=" + stale + "\n", nil
			}
			return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
		}
		orchShutdownStale, _ := newHookOrchestrator(t, cfg, runShutdownStale, runShutdownStale)
		if err := orchShutdownStale.TestHookGuardPeerStartupIntents(context.Background()); err != nil {
			t.Fatalf("expected stale shutdown intent allow branch, got %v", err)
		}
		// Transport failure while reading a peer's status must propagate.
		runSSHErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			if name == "ssh" && strings.Contains(command, "ananke intent --config /etc/ananke/ananke.yaml") {
				return "", errors.New("ssh failed")
			}
			return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
		}
		orchSSHErr, _ := newHookOrchestrator(t, cfg, runSSHErr, runSSHErr)
		if _, err := orchSSHErr.TestHookReadRemotePeerStatus(context.Background(), "titan-24"); err == nil || !strings.Contains(err.Error(), "ssh failed") {
			t.Fatalf("expected readRemotePeerStatus ssh-error branch, got %v", err)
		}
		// Dry-run cluster built directly (bypassing the hook helper) so the
		// snapshot verifier takes its dry-run fast path without any ssh calls.
		dryCfg := lifecycleConfig(t)
		dry := cluster.New(dryCfg, &execx.Runner{DryRun: true}, state.New(dryCfg.State.RunHistoryPath), log.New(io.Discard, "", 0))
		if err := dry.TestHookVerifyEtcdSnapshot(context.Background(), "titan-db", "/snap"); err != nil {
			t.Fatalf("expected verifyEtcdSnapshot dry-run fast path, got %v", err)
		}
		// Snapshot file exists (plausible size) but is absent from the
		// `etcd-snapshot ls` listing — expect the "not present" error.
		runMissing := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			switch {
			case name == "ssh" && strings.Contains(command, "stat -c %s"):
				return "2097152", nil
			case name == "ssh" && strings.Contains(command, "etcd-snapshot ls"):
				return "other-snapshot", nil
			default:
				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
			}
		}
		orchMissing, _ := newHookOrchestrator(t, cfg, runMissing, runMissing)
		if err := orchMissing.TestHookVerifyEtcdSnapshot(context.Background(), "titan-db", "/var/lib/rancher/k3s/server/db/snapshots/pre-shutdown"); err == nil || !strings.Contains(err.Error(), "not present") {
			t.Fatalf("expected verifyEtcdSnapshot missing-in-ls branch, got %v", err)
		}
		// Snapshot present and sized, but sha256sum yields "abc" — not a valid
		// 64-hex digest — so the invalid-hash branch must trigger.
		runBadHash := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			switch {
			case name == "ssh" && strings.Contains(command, "stat -c %s"):
				return "2097152", nil
			case name == "ssh" && strings.Contains(command, "etcd-snapshot ls"):
				return "/var/lib/rancher/k3s/server/db/snapshots/pre-shutdown", nil
			case name == "ssh" && strings.Contains(command, "sha256sum"):
				return "abc", nil
			default:
				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
			}
		}
		orchBadHash, _ := newHookOrchestrator(t, cfg, runBadHash, runBadHash)
		if err := orchBadHash.TestHookVerifyEtcdSnapshot(context.Background(), "titan-db", "/var/lib/rancher/k3s/server/db/snapshots/pre-shutdown"); err == nil || !strings.Contains(err.Error(), "invalid sha256") {
			t.Fatalf("expected verifyEtcdSnapshot invalid-hash branch, got %v", err)
		}
	})
	t.Run("critical-endpoint-uncovered-ranges", func(t *testing.T) {
		// An unparseable endpoint entry plus an already-canceled context: the
		// wait loop should survive the parse failure and return ctx.Err().
		cfg := lifecycleConfig(t)
		cfg.Startup.CriticalServiceEndpointWaitSec = 1
		cfg.Startup.CriticalServiceEndpointPollSec = 1
		cfg.Startup.CriticalServiceEndpoints = []string{"invalid-entry"}
		orchErr, _ := newHookOrchestrator(t, cfg, nil, nil)
		ctx, cancel := context.WithCancel(context.Background())
		cancel()
		if err := orchErr.TestHookWaitForCriticalServiceEndpoints(ctx); !errors.Is(err, context.Canceled) {
			t.Fatalf("expected critical endpoint canceled branch after parse error pass, got %v", err)
		}
		// kubectl reports the endpoints object as not found — this is treated
		// as "not ready" (detail populated) rather than a hard error.
		runNotFound := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			if name == "kubectl" && strings.Contains(command, "get endpoints") {
				return "", errors.New("endpoints \"svc\" not found")
			}
			return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
		}
		cfgNF := lifecycleConfig(t)
		cfgNF.Startup.CriticalServiceEndpoints = []string{"ns/svc"}
		orchNF, _ := newHookOrchestrator(t, cfgNF, runNotFound, runNotFound)
		ready, detail, ns, svc, err := orchNF.TestHookCriticalServiceEndpointsReady(context.Background())
		if err != nil || ready || ns != "ns" || svc != "svc" || !strings.Contains(detail, "not found") {
			t.Fatalf("expected critical endpoint notfound branch, ready=%v detail=%q ns=%q svc=%q err=%v", ready, detail, ns, svc, err)
		}
		// Any other kubectl failure is a genuine query error and must wrap it.
		runQueryErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			if name == "kubectl" && strings.Contains(command, "get endpoints") {
				return "", errors.New("boom")
			}
			return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
		}
		orchQErr, _ := newHookOrchestrator(t, cfgNF, runQueryErr, runQueryErr)
		if _, _, _, _, err := orchQErr.TestHookCriticalServiceEndpointsReady(context.Background()); err == nil || !strings.Contains(err.Error(), "query endpoints") {
			t.Fatalf("expected critical endpoint query error branch, got %v", err)
		}
		// Heal path: deployment scale succeeds, rollout wait fails, and the
		// statefulset fallback also errors; the rollout failure is the error
		// the hook must surface.
		runHealErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			switch {
			case name == "kubectl" && strings.Contains(command, " scale deployment svc "):
				return "", nil
			case name == "kubectl" && strings.Contains(command, "rollout status deployment/svc"):
				return "", errors.New("rollout failed")
			case name == "kubectl" && strings.Contains(command, " scale statefulset svc "):
				return "", errors.New("statefulset not found")
			default:
				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
			}
		}
		orchHealErr, _ := newHookOrchestrator(t, lifecycleConfig(t), runHealErr, runHealErr)
		if _, err := orchHealErr.TestHookMaybeHealCriticalEndpointBackends(context.Background(), "ns", "svc"); err == nil || !strings.Contains(err.Error(), "rollout failed") {
			t.Fatalf("expected critical endpoint heal wait error branch, got %v", err)
		}
		// "/svc" has an empty namespace component — parsing must reject it.
		cfgBadParse := lifecycleConfig(t)
		cfgBadParse.Startup.CriticalServiceEndpoints = []string{"/svc"}
		orchBadParse, _ := newHookOrchestrator(t, cfgBadParse, nil, nil)
		if _, _, _, _, err := orchBadParse.TestHookCriticalServiceEndpointsReady(context.Background()); err == nil || !strings.Contains(err.Error(), "namespace and service") {
			t.Fatalf("expected critical endpoint parse-empty branch, got %v", err)
		}
	})
}