// Coverage gap matrix, part 9: access/fluxsource, coordination, and
// critical-endpoint orchestration helpers.
package orchestrator
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"io"
|
|
"log"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"scm.bstein.dev/bstein/ananke/internal/cluster"
|
|
"scm.bstein.dev/bstein/ananke/internal/execx"
|
|
"scm.bstein.dev/bstein/ananke/internal/state"
|
|
)
|
|
|
|
// TestHookGapMatrixPart9AccessCoordinationEndpoints runs one orchestration or CLI step.
// Signature: TestHookGapMatrixPart9AccessCoordinationEndpoints(t *testing.T).
// Why: closes uncovered statement ranges in access/fluxsource, coordination,
// and critical-endpoint orchestration helpers.
//
// Each subtest builds mock command runners that match on exact command
// substrings; any command not matched falls through to the default
// lifecycleDispatcher with a fresh commandRecorder.
func TestHookGapMatrixPart9AccessCoordinationEndpoints(t *testing.T) {
	t.Run("access-fluxsource-uncovered-ranges", func(t *testing.T) {
		// Five managed nodes with identity host mappings.
		// SSHParallelism = 0 — presumably exercises the serial / default
		// parallelism fallback path; confirm in the orchestrator.
		cfg := lifecycleConfig(t)
		cfg.Shutdown.SSHParallelism = 0
		cfg.SSHManagedNodes = []string{"n1", "n2", "n3", "n4", "n5"}
		cfg.SSHNodeHosts = map[string]string{
			"n1": "n1",
			"n2": "n2",
			"n3": "n3",
			"n4": "n4",
			"n5": "n5",
		}
		// Stub runner: the remote systemctl probe fails ("sudo denied"),
		// flux source queries report the expected URL but branch "dev";
		// everything else is handled by the default dispatcher.
		run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			switch {
			case name == "ssh" && strings.Contains(command, "/usr/bin/systemctl --version"):
				return "", errors.New("sudo denied")
			case name == "kubectl" && strings.Contains(command, "jsonpath={.spec.url}"):
				return cfg.ExpectedFluxSource, nil
			case name == "kubectl" && strings.Contains(command, "jsonpath={.spec.ref.branch}"):
				return "dev", nil
			default:
				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
			}
		}
		orch, _ := newHookOrchestrator(t, cfg, run, run)

		// Mixed target list (empty, unmanaged, managed nodes): per-node
		// failures should be aggregated into one "access validation had"
		// error rather than failing on the first node.
		err := orch.TestHookReconcileNodeAccess(context.Background(), []string{"", "unmanaged", "n1", "n2", "n3", "n4", "n5"})
		if err == nil || !strings.Contains(err.Error(), "access validation had") {
			t.Fatalf("expected reconcile error aggregation branch, got %v", err)
		}

		// SSH-auth wait where the only managed node is also listed in
		// IgnoreUnavailableNodes: the target list (with blanks/duplicates)
		// presumably prunes to empty, so the wait succeeds immediately.
		cfgAuthNone := lifecycleConfig(t)
		cfgAuthNone.Startup.RequireNodeSSHAuth = true
		cfgAuthNone.Startup.NodeSSHAuthWaitSeconds = 1
		cfgAuthNone.Startup.NodeSSHAuthPollSeconds = 1
		cfgAuthNone.SSHManagedNodes = []string{"n1"}
		cfgAuthNone.Startup.IgnoreUnavailableNodes = []string{"n1"}
		orchAuthNone, _ := newHookOrchestrator(t, cfgAuthNone, nil, nil)
		if err := orchAuthNone.TestHookWaitForNodeSSHAuth(context.Background(), []string{"", "n1", "n1"}); err != nil {
			t.Fatalf("expected ssh auth pruned-target branch, got %v", err)
		}

		// Requesting auth for "n2", which is not in ssh_managed_nodes,
		// must fail fast with an explicit error.
		cfgAuthDenied := lifecycleConfig(t)
		cfgAuthDenied.Startup.RequireNodeSSHAuth = true
		cfgAuthDenied.Startup.NodeSSHAuthWaitSeconds = 1
		cfgAuthDenied.Startup.NodeSSHAuthPollSeconds = 1
		cfgAuthDenied.SSHManagedNodes = []string{"n1"}
		orchAuthDenied, _ := newHookOrchestrator(t, cfgAuthDenied, nil, nil)
		if err := orchAuthDenied.TestHookWaitForNodeSSHAuth(context.Background(), []string{"n2"}); err == nil || !strings.Contains(err.Error(), "not in ssh_managed_nodes") {
			t.Fatalf("expected ssh auth unmanaged-node branch, got %v", err)
		}

		// The stub runner above reports branch "dev", so guarding against
		// "main" must detect branch drift.
		if err := orch.TestHookGuardFluxSourceDrift(context.Background(), "main", false); err == nil || !strings.Contains(err.Error(), "branch drift") {
			t.Fatalf("expected guardFluxSourceDrift mismatch branch, got %v", err)
		}

		// A blank (whitespace-only) branch takes the empty-branch fast
		// path and returns nil without touching the cluster.
		if err := orch.TestHookEnsureFluxBranch(context.Background(), " ", false); err != nil {
			t.Fatalf("expected ensureFluxBranch empty-branch fast path, got %v", err)
		}

		// Branch-read failure: the kubectl jsonpath query for the ref
		// branch errors, which should surface as "read flux source branch".
		runReadErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			if name == "kubectl" && strings.Contains(command, "jsonpath={.spec.ref.branch}") {
				return "", errors.New("read failed")
			}
			return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
		}
		orchReadErr, _ := newHookOrchestrator(t, lifecycleConfig(t), runReadErr, runReadErr)
		if err := orchReadErr.TestHookEnsureFluxBranch(context.Background(), "main", true); err == nil || !strings.Contains(err.Error(), "read flux source branch") {
			t.Fatalf("expected ensureFluxBranch read error branch, got %v", err)
		}

		// Report path: URL matches expectations but branch is "legacy".
		// Exercised only for its logging/reporting side effects — no
		// return value is asserted.
		cfgReport := lifecycleConfig(t)
		runReport := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			switch {
			case name == "kubectl" && strings.Contains(command, "jsonpath={.spec.url}"):
				return cfgReport.ExpectedFluxSource, nil
			case name == "kubectl" && strings.Contains(command, "jsonpath={.spec.ref.branch}"):
				return "legacy", nil
			default:
				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
			}
		}
		orchReport, _ := newHookOrchestrator(t, cfgReport, runReport, runReport)
		orchReport.TestHookReportFluxSource(context.Background(), "")

		// Bootstrap-cache refresh failure: kustomize renders a manifest,
		// but the cache path is pre-created as a DIRECTORY, so —
		// presumably — the cache file write fails and the refresh reports
		// "no bootstrap cache manifests rendered". The blank " " entry in
		// LocalBootstrapPaths exercises path pruning.
		cfgCache := lifecycleConfig(t)
		repo := t.TempDir()
		cfgCache.IACRepoPath = repo
		cfgCache.LocalBootstrapPaths = []string{" ", "services/bootstrap"}
		if err := os.MkdirAll(filepath.Join(repo, "services", "bootstrap"), 0o755); err != nil {
			t.Fatalf("mkdir bootstrap path: %v", err)
		}
		runCache := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			if name == "sh" && strings.Contains(command, "kubectl kustomize") {
				return "apiVersion: v1\nkind: Namespace\nmetadata:\n name: test\n", nil
			}
			return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
		}
		orchCache, _ := newHookOrchestrator(t, cfgCache, runCache, runCache)
		cachePath := orchCache.TestHookBootstrapCachePath("services/bootstrap")
		if err := os.MkdirAll(cachePath, 0o755); err != nil {
			t.Fatalf("mkdir cache path-as-dir: %v", err)
		}
		if err := orchCache.TestHookRefreshBootstrapCache(context.Background()); err == nil || !strings.Contains(err.Error(), "no bootstrap cache manifests rendered") {
			t.Fatalf("expected refresh cache write-failure branch, got %v", err)
		}
	})

	t.Run("coordination-uncovered-ranges", func(t *testing.T) {
		// Peer set comes from both PeerHosts and ForwardShutdownHost;
		// both are registered as managed nodes with host mappings.
		cfg := lifecycleConfig(t)
		cfg.Coordination.PeerHosts = []string{"titan-24"}
		cfg.Coordination.ForwardShutdownHost = "titan-25"
		cfg.SSHManagedNodes = append(cfg.SSHManagedNodes, "titan-24", "titan-25")
		cfg.SSHNodeHosts["titan-24"] = "titan-24"
		cfg.SSHNodeHosts["titan-25"] = "titan-25"
		orchPeers, _ := newHookOrchestrator(t, cfg, nil, nil)
		peers := orchPeers.TestHookCoordinationPeers()
		if len(peers) < 2 {
			t.Fatalf("expected peers to include forward host, got %v", peers)
		}

		// Stale (2h old) startup intent from a peer: the guard should
		// auto-clear it. The "--set normal" matcher handles the clear
		// command; the plain intent query returns the stale record.
		// Note: the more specific "--set normal" case must stay first,
		// since the second case's substring also matches it.
		stale := time.Now().UTC().Add(-2 * time.Hour).Format(time.RFC3339)
		runStale := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			switch {
			case name == "ssh" && strings.Contains(command, "ananke intent --config /etc/ananke/ananke.yaml --set normal"):
				return "ok", nil
			case name == "ssh" && strings.Contains(command, "ananke intent --config /etc/ananke/ananke.yaml"):
				return "__ANANKE_BOOTSTRAP_ACTIVE__\nintent=startup_in_progress reason=\"rolling\" source=peer updated_at=" + stale + "\n", nil
			default:
				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
			}
		}
		orchStale, _ := newHookOrchestrator(t, cfg, runStale, runStale)
		if err := orchStale.TestHookGuardPeerStartupIntents(context.Background()); err != nil {
			t.Fatalf("expected stale startup intent auto-clear branch, got %v", err)
		}

		// A stale shutting_down intent (bootstrap idle) should be
		// tolerated — the guard allows startup to proceed.
		runShutdownStale := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			if name == "ssh" && strings.Contains(command, "ananke intent --config /etc/ananke/ananke.yaml") {
				return "__ANANKE_BOOTSTRAP_IDLE__\nintent=shutting_down reason=\"old\" source=peer updated_at=" + stale + "\n", nil
			}
			return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
		}
		orchShutdownStale, _ := newHookOrchestrator(t, cfg, runShutdownStale, runShutdownStale)
		if err := orchShutdownStale.TestHookGuardPeerStartupIntents(context.Background()); err != nil {
			t.Fatalf("expected stale shutdown intent allow branch, got %v", err)
		}

		// SSH transport failure while reading a peer's status must be
		// propagated to the caller.
		runSSHErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			if name == "ssh" && strings.Contains(command, "ananke intent --config /etc/ananke/ananke.yaml") {
				return "", errors.New("ssh failed")
			}
			return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
		}
		orchSSHErr, _ := newHookOrchestrator(t, cfg, runSSHErr, runSSHErr)
		if _, err := orchSSHErr.TestHookReadRemotePeerStatus(context.Background(), "titan-24"); err == nil || !strings.Contains(err.Error(), "ssh failed") {
			t.Fatalf("expected readRemotePeerStatus ssh-error branch, got %v", err)
		}

		// A dry-run cluster (execx.Runner{DryRun: true}) skips actual
		// snapshot verification and returns nil immediately.
		dryCfg := lifecycleConfig(t)
		dry := cluster.New(dryCfg, &execx.Runner{DryRun: true}, state.New(dryCfg.State.RunHistoryPath), log.New(io.Discard, "", 0))
		if err := dry.TestHookVerifyEtcdSnapshot(context.Background(), "titan-db", "/snap"); err != nil {
			t.Fatalf("expected verifyEtcdSnapshot dry-run fast path, got %v", err)
		}

		// Snapshot file exists with a plausible size (2 MiB), but
		// "etcd-snapshot ls" output does not list the expected path —
		// verification must fail with "not present".
		runMissing := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			switch {
			case name == "ssh" && strings.Contains(command, "stat -c %s"):
				return "2097152", nil
			case name == "ssh" && strings.Contains(command, "etcd-snapshot ls"):
				return "other-snapshot", nil
			default:
				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
			}
		}
		orchMissing, _ := newHookOrchestrator(t, cfg, runMissing, runMissing)
		if err := orchMissing.TestHookVerifyEtcdSnapshot(context.Background(), "titan-db", "/var/lib/rancher/k3s/server/db/snapshots/pre-shutdown"); err == nil || !strings.Contains(err.Error(), "not present") {
			t.Fatalf("expected verifyEtcdSnapshot missing-in-ls branch, got %v", err)
		}

		// Snapshot is listed and sized, but sha256sum returns "abc",
		// which is not a valid 64-hex-char digest — verification must
		// fail with "invalid sha256".
		runBadHash := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			switch {
			case name == "ssh" && strings.Contains(command, "stat -c %s"):
				return "2097152", nil
			case name == "ssh" && strings.Contains(command, "etcd-snapshot ls"):
				return "/var/lib/rancher/k3s/server/db/snapshots/pre-shutdown", nil
			case name == "ssh" && strings.Contains(command, "sha256sum"):
				return "abc", nil
			default:
				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
			}
		}
		orchBadHash, _ := newHookOrchestrator(t, cfg, runBadHash, runBadHash)
		if err := orchBadHash.TestHookVerifyEtcdSnapshot(context.Background(), "titan-db", "/var/lib/rancher/k3s/server/db/snapshots/pre-shutdown"); err == nil || !strings.Contains(err.Error(), "invalid sha256") {
			t.Fatalf("expected verifyEtcdSnapshot invalid-hash branch, got %v", err)
		}
	})

	t.Run("critical-endpoint-uncovered-ranges", func(t *testing.T) {
		// An unparsable endpoint entry plus a pre-canceled context: the
		// wait loop should surface context.Canceled after passing through
		// the parse-error branch.
		cfg := lifecycleConfig(t)
		cfg.Startup.CriticalServiceEndpointWaitSec = 1
		cfg.Startup.CriticalServiceEndpointPollSec = 1
		cfg.Startup.CriticalServiceEndpoints = []string{"invalid-entry"}
		orchErr, _ := newHookOrchestrator(t, cfg, nil, nil)
		ctx, cancel := context.WithCancel(context.Background())
		cancel()
		if err := orchErr.TestHookWaitForCriticalServiceEndpoints(ctx); !errors.Is(err, context.Canceled) {
			t.Fatalf("expected critical endpoint canceled branch after parse error pass, got %v", err)
		}

		// "not found" from kubectl is NOT an error: the endpoint is just
		// reported not-ready, with namespace/service parsed from "ns/svc".
		runNotFound := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			if name == "kubectl" && strings.Contains(command, "get endpoints") {
				return "", errors.New("endpoints \"svc\" not found")
			}
			return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
		}
		cfgNF := lifecycleConfig(t)
		cfgNF.Startup.CriticalServiceEndpoints = []string{"ns/svc"}
		orchNF, _ := newHookOrchestrator(t, cfgNF, runNotFound, runNotFound)
		ready, detail, ns, svc, err := orchNF.TestHookCriticalServiceEndpointsReady(context.Background())
		if err != nil || ready || ns != "ns" || svc != "svc" || !strings.Contains(detail, "not found") {
			t.Fatalf("expected critical endpoint notfound branch, ready=%v detail=%q ns=%q svc=%q err=%v", ready, detail, ns, svc, err)
		}

		// Any other kubectl failure IS an error, wrapped as
		// "query endpoints".
		runQueryErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			if name == "kubectl" && strings.Contains(command, "get endpoints") {
				return "", errors.New("boom")
			}
			return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
		}
		orchQErr, _ := newHookOrchestrator(t, cfgNF, runQueryErr, runQueryErr)
		if _, _, _, _, err := orchQErr.TestHookCriticalServiceEndpointsReady(context.Background()); err == nil || !strings.Contains(err.Error(), "query endpoints") {
			t.Fatalf("expected critical endpoint query error branch, got %v", err)
		}

		// Heal path: the deployment scale succeeds but waiting on its
		// rollout fails; the statefulset scale is stubbed to error as
		// well (not found) in case that branch is attempted. The rollout
		// failure must propagate.
		runHealErr := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			switch {
			case name == "kubectl" && strings.Contains(command, " scale deployment svc "):
				return "", nil
			case name == "kubectl" && strings.Contains(command, "rollout status deployment/svc"):
				return "", errors.New("rollout failed")
			case name == "kubectl" && strings.Contains(command, " scale statefulset svc "):
				return "", errors.New("statefulset not found")
			default:
				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
			}
		}
		orchHealErr, _ := newHookOrchestrator(t, lifecycleConfig(t), runHealErr, runHealErr)
		if _, err := orchHealErr.TestHookMaybeHealCriticalEndpointBackends(context.Background(), "ns", "svc"); err == nil || !strings.Contains(err.Error(), "rollout failed") {
			t.Fatalf("expected critical endpoint heal wait error branch, got %v", err)
		}

		// "/svc" parses to an empty namespace, which must be rejected
		// with an error mentioning "namespace and service".
		cfgBadParse := lifecycleConfig(t)
		cfgBadParse.Startup.CriticalServiceEndpoints = []string{"/svc"}
		orchBadParse, _ := newHookOrchestrator(t, cfgBadParse, nil, nil)
		if _, _, _, _, err := orchBadParse.TestHookCriticalServiceEndpointsReady(context.Background()); err == nil || !strings.Contains(err.Error(), "namespace and service") {
			t.Fatalf("expected critical endpoint parse-empty branch, got %v", err)
		}
	})
}
|