package orchestrator

import (
	"context"
	"errors"
	"io"
	"log"
	"os"
	"path/filepath"
	"strings"
	"testing"
	"time"

	"scm.bstein.dev/bstein/ananke/internal/cluster"
	"scm.bstein.dev/bstein/ananke/internal/config"
	"scm.bstein.dev/bstein/ananke/internal/execx"
	"scm.bstein.dev/bstein/ananke/internal/state"
)

// newDryRunHookOrchestrator builds an orchestrator whose command runner is in
// dry-run mode, with both command overrides pointed at run.
// Signature: newDryRunHookOrchestrator(t *testing.T, cfg config.Config, run func(context.Context, time.Duration, string, ...string) (string, error)) *cluster.Orchestrator.
// Why: some branch-only paths are dry-run guarded; this helper lets top-level tests
// exercise those paths without mutating real systems.
func newDryRunHookOrchestrator(
	t *testing.T,
	cfg config.Config,
	run func(context.Context, time.Duration, string, ...string) (string, error),
) *cluster.Orchestrator {
	t.Helper()
	if err := os.MkdirAll(cfg.State.Dir, 0o755); err != nil {
		t.Fatalf("ensure state dir: %v", err)
	}
	// A nil override falls back to the shared lifecycle dispatcher.
	if run == nil {
		run = lifecycleDispatcher(&commandRecorder{})
	}
	orch := cluster.New(cfg, &execx.Runner{DryRun: true}, state.New(cfg.State.RunHistoryPath), log.New(io.Discard, "", 0))
	orch.SetCommandOverrides(run, run)
	return orch
}
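
// TestHookDryRunHelperUsageSketch is a minimal usage sketch for the helper
// above (a hypothetical example; it restates the dry-run bootstrap segment of
// TestHookAccessFluxsourceBranchCloseout below): point the config at
// throwaway paths, pass a nil override, and drive a dry-run-guarded hook.
func TestHookDryRunHelperUsageSketch(t *testing.T) {
	cfg := lifecycleConfig(t)
	cfg.LocalBootstrapPaths = []string{"services/bootstrap"}
	cfg.IACRepoPath = t.TempDir()
	if err := os.MkdirAll(filepath.Join(cfg.IACRepoPath, "services", "bootstrap"), 0o755); err != nil {
		t.Fatalf("mkdir bootstrap path: %v", err)
	}
	orch := newDryRunHookOrchestrator(t, cfg, nil)
	if err := orch.TestHookBootstrapLocal(context.Background()); err != nil {
		t.Fatalf("dry-run bootstrap: %v", err)
	}
}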

// TestHookAccessFluxsourceBranchCloseout closes out the remaining
// access/fluxsource branches.
// Signature: TestHookAccessFluxsourceBranchCloseout(t *testing.T).
// Why: closes the remaining access/fluxsource branch gaps that only appear under
// unusual repo/auth states.
func TestHookAccessFluxsourceBranchCloseout(t *testing.T) {
	cfg := lifecycleConfig(t)
	cfg.SSHManagedNodes = []string{"titan-db"}
	cfg.Startup.RequireNodeSSHAuth = false
	orch, _ := newHookOrchestrator(t, cfg, nil, nil)

	// Empty node list: reconcile should return on its fast path.
	if err := orch.TestHookReconcileNodeAccess(context.Background(), nil); err != nil {
		t.Fatalf("expected empty-node reconcile fast-path, got %v", err)
	}

	// SSH-auth requirement disabled: the wait should be a no-op.
	if err := orch.TestHookWaitForNodeSSHAuth(context.Background(), []string{"titan-db"}); err != nil {
		t.Fatalf("expected ssh-auth disabled fast-path, got %v", err)
	}

	dryRunCfg := lifecycleConfig(t)
	dryRunCfg.LocalBootstrapPaths = []string{"services/bootstrap"}
	dryRunCfg.IACRepoPath = t.TempDir()
	if err := os.MkdirAll(filepath.Join(dryRunCfg.IACRepoPath, "services", "bootstrap"), 0o755); err != nil {
		t.Fatalf("mkdir bootstrap path: %v", err)
	}
	orchDry := newDryRunHookOrchestrator(t, dryRunCfg, nil)
	if err := orchDry.TestHookBootstrapLocal(context.Background()); err != nil {
		t.Fatalf("expected dry-run bootstrap success, got %v", err)
	}

	// Zero wait/poll values exercise the default-window fallback; the ssh
	// probe always fails, so only context cancellation can end the loop.
	cfgWaitDefaults := lifecycleConfig(t)
	cfgWaitDefaults.Startup.RequireNodeSSHAuth = true
	cfgWaitDefaults.Startup.NodeSSHAuthWaitSeconds = 0
	cfgWaitDefaults.Startup.NodeSSHAuthPollSeconds = 0
	waitRun := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
		command := name + " " + strings.Join(args, " ")
		if name == "ssh" && strings.Contains(command, "__ANANKE_SSH_AUTH_OK__") {
			return "", errors.New("network unreachable")
		}
		return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
	}
	orchWaitDefaults, _ := newHookOrchestrator(t, cfgWaitDefaults, waitRun, waitRun)
	cancelCtx, cancel := context.WithCancel(context.Background())
	cancel()
	if err := orchWaitDefaults.TestHookWaitForNodeSSHAuth(cancelCtx, []string{"titan-db"}); !errors.Is(err, context.Canceled) {
		t.Fatalf("expected canceled wait with default wait/poll fallback, got %v", err)
	}

	cfgSync := lifecycleConfig(t)
	repo := t.TempDir()
	cfgSync.IACRepoPath = repo
	if err := os.MkdirAll(filepath.Join(repo, ".git"), 0o755); err != nil {
		t.Fatalf("mkdir .git: %v", err)
	}
	// Fail only the checkout step so the sync surfaces that branch's error.
	checkoutErrRun := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
		command := name + " " + strings.Join(args, " ")
		switch {
		case name == "git" && strings.Contains(command, "status --porcelain"):
			return "", nil
		case name == "git" && strings.Contains(command, "fetch origin --prune"):
			return "", nil
		case name == "git" && strings.Contains(command, "checkout main"):
			return "", errors.New("checkout failed")
		default:
			return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
		}
	}
	orchCheckoutErr, _ := newHookOrchestrator(t, cfgSync, nil, checkoutErrRun)
	if err := orchCheckoutErr.TestHookSyncLocalIACRepo(context.Background()); err == nil {
		t.Fatalf("expected checkout error branch")
	}

	// Same shape, one step further: checkout succeeds, reset fails.
	resetErrRun := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
		command := name + " " + strings.Join(args, " ")
		switch {
		case name == "git" && strings.Contains(command, "status --porcelain"):
			return "", nil
		case name == "git" && strings.Contains(command, "fetch origin --prune"):
			return "", nil
		case name == "git" && strings.Contains(command, "checkout main"):
			return "", nil
		case name == "git" && strings.Contains(command, "reset --hard origin/main"):
			return "", errors.New("reset failed")
		default:
			return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
		}
	}
	orchResetErr, _ := newHookOrchestrator(t, cfgSync, nil, resetErrRun)
	if err := orchResetErr.TestHookSyncLocalIACRepo(context.Background()); err == nil {
		t.Fatalf("expected reset error branch")
	}

	cfgCache := lifecycleConfig(t)
	cfgCache.IACRepoPath = t.TempDir()
	cfgCache.LocalBootstrapPaths = nil
	orchNoCachePaths, _ := newHookOrchestrator(t, cfgCache, nil, nil)
	if err := orchNoCachePaths.TestHookRefreshBootstrapCache(context.Background()); err != nil {
		t.Fatalf("expected empty bootstrap path list success, got %v", err)
	}
}
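
// failOnSubstring is a sketch helper (hypothetical; the tests in this file
// inline the pattern instead) showing the shape every fake runner above
// follows: fail any invocation whose rendered command line contains substr,
// and delegate everything else to the shared lifecycle dispatcher.
func failOnSubstring(substr string, failErr error) func(context.Context, time.Duration, string, ...string) (string, error) {
	return func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
		if strings.Contains(name+" "+strings.Join(args, " "), substr) {
			return "", failErr
		}
		return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
	}
}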

// TestHookNodeReachabilityBranchCloseout closes out the remaining
// inventory-reachability branches.
// Signature: TestHookNodeReachabilityBranchCloseout(t *testing.T).
// Why: finishes uncovered inventory-reachability branches for ignored/duplicate/auth-denied paths.
func TestHookNodeReachabilityBranchCloseout(t *testing.T) {
	dryCfg := lifecycleConfig(t)
	dryCfg.Startup.RequireNodeInventoryReach = true
	orchDry := newDryRunHookOrchestrator(t, dryCfg, nil)
	if err := orchDry.TestHookWaitForNodeInventoryReachability(context.Background()); err != nil {
		t.Fatalf("expected dry-run reachability success, got %v", err)
	}

	// Duplicate and empty inventory entries plus a full ignore list: the
	// wait should succeed without probing any node.
	ignoreCfg := lifecycleConfig(t)
	ignoreCfg.Startup.RequireNodeInventoryReach = true
	ignoreCfg.Startup.IgnoreUnavailableNodes = []string{"titan-db", "titan-23"}
	ignoreCfg.ControlPlanes = []string{"titan-db", "titan-db"}
	ignoreCfg.Workers = []string{"titan-23", ""}
	orchIgnore, _ := newHookOrchestrator(t, ignoreCfg, nil, nil)
	if err := orchIgnore.TestHookWaitForNodeInventoryReachability(context.Background()); err != nil {
		t.Fatalf("expected all-targets-ignored success, got %v", err)
	}

	// An authentication failure on the reachability probe should surface the
	// auth-denied error.
	authCfg := lifecycleConfig(t)
	authCfg.Startup.RequireNodeInventoryReach = true
	authCfg.Startup.NodeInventoryReachWaitSeconds = 1
	authCfg.Startup.NodeInventoryReachPollSeconds = 1
	authCfg.SSHManagedNodes = []string{"titan-db", "titan-23"}
	authRun := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
		command := name + " " + strings.Join(args, " ")
		if name == "ssh" && strings.Contains(command, "__ANANKE_NODE_REACHABLE__") && strings.Contains(command, "titan-23") {
			return "", errors.New("authentication failed")
		}
		return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
	}
	orchAuth, _ := newHookOrchestrator(t, authCfg, authRun, authRun)
	if err := orchAuth.TestHookWaitForNodeInventoryReachability(context.Background()); err == nil || !strings.Contains(err.Error(), "auth denied") {
		t.Fatalf("expected auth denied branch, got %v", err)
	}
}
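
// Note (sketch): the all-targets-ignored fast path above generalizes. With
// every deduplicated, non-empty inventory entry listed in
// Startup.IgnoreUnavailableNodes, the reachability wait returns before any
// probe is issued, so no fake runner is needed for that branch:
//
//	cfg.Startup.IgnoreUnavailableNodes = []string{"titan-db"}
//	cfg.ControlPlanes = []string{"titan-db", "titan-db"} // duplicates collapse
//	cfg.Workers = []string{""}                           // empty entries drop out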

// TestHookStorageCriticalEndpointFluxCloseout closes out the remaining
// storage/endpoint/flux wait-loop branches.
// Signature: TestHookStorageCriticalEndpointFluxCloseout(t *testing.T).
// Why: closes remaining branch gaps for storage/endpoint/flux control loops with
// default-window and cancel/error fallbacks.
func TestHookStorageCriticalEndpointFluxCloseout(t *testing.T) {
	t.Run("wait-for-storage-default-window-cancel", func(t *testing.T) {
		cfg := lifecycleConfig(t)
		cfg.Startup.StorageReadyWaitSeconds = 0
		cfg.Startup.StorageReadyPollSeconds = 0
		// Longhorn reports the node ready, but the pre-canceled context must
		// still surface context.Canceled.
		run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			if name == "kubectl" && strings.Contains(command, "nodes.longhorn.io") {
				return "titan-23:True:True\n", nil
			}
			return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
		}
		orch, _ := newHookOrchestrator(t, cfg, run, run)
		cancelCtx, cancel := context.WithCancel(context.Background())
		cancel()
		if err := orch.TestHookWaitForStorageReady(cancelCtx); !errors.Is(err, context.Canceled) {
			t.Fatalf("expected canceled storage wait, got %v", err)
		}
	})

	t.Run("critical-endpoint-default-window-cancel", func(t *testing.T) {
		cfg := lifecycleConfig(t)
		cfg.Startup.CriticalServiceEndpointWaitSec = 0
		cfg.Startup.CriticalServiceEndpointPollSec = 0
		cfg.Startup.CriticalServiceEndpoints = []string{"monitoring/grafana"}
		// The endpoint query returns empty output, so the endpoint never
		// reads ready and cancellation has to end the wait.
		run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			if name == "kubectl" && strings.Contains(command, "get endpoints grafana") {
				return "", nil
			}
			return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
		}
		orch, _ := newHookOrchestrator(t, cfg, run, run)
		cancelCtx, cancel := context.WithCancel(context.Background())
		cancel()
		if err := orch.TestHookWaitForCriticalServiceEndpoints(cancelCtx); !errors.Is(err, context.Canceled) {
			t.Fatalf("expected canceled critical-endpoint wait, got %v", err)
		}
	})

	t.Run("flux-health-default-window-cancel-and-no-heal", func(t *testing.T) {
		cfg := lifecycleConfig(t)
		cfg.Startup.FluxHealthWaitSeconds = 0
		cfg.Startup.FluxHealthPollSeconds = 0
		// One kustomization stuck in Progressing and no jobs to heal keeps
		// the loop waiting until the canceled context stops it.
		run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
			command := name + " " + strings.Join(args, " ")
			switch {
			case name == "kubectl" && strings.Contains(command, "get kustomizations.kustomize.toolkit.fluxcd.io -A -o json"):
				return `{"items":[{"metadata":{"namespace":"flux-system","name":"services"},"spec":{"suspend":false},"status":{"conditions":[{"type":"Ready","status":"False","reason":"Progressing","message":"still reconciling"}]}}]}`, nil
			case name == "kubectl" && strings.Contains(command, "get jobs -A -o json"):
				return `{"items":[]}`, nil
			default:
				return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
			}
		}
		orch, _ := newHookOrchestrator(t, cfg, run, run)
		cancelCtx, cancel := context.WithCancel(context.Background())
		cancel()
		if err := orch.TestHookWaitForFluxHealth(cancelCtx); !errors.Is(err, context.Canceled) {
			t.Fatalf("expected canceled flux-health wait, got %v", err)
		}
	})
}
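
// canceledTestContext is a sketch helper (hypothetical; the subtests above
// inline it) for the pattern used to push the default-window wait loops onto
// their context.Canceled fallback without sleeping through a poll interval.
func canceledTestContext() context.Context {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return ctx
}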

// TestHookWorkloadIgnoreBranchCloseout exercises workload ignore, cooldown,
// and pod-parsing edge branches.
// Signature: TestHookWorkloadIgnoreBranchCloseout(t *testing.T).
// Why: exercises auto-recycle/auto-heal cooldown and pod parsing edge branches so
// convergence reporting remains stable when startup failures are noisy.
func TestHookWorkloadIgnoreBranchCloseout(t *testing.T) {
	cfg := lifecycleConfig(t)
	cfg.Startup.AutoRecycleStuckPods = true
	cfg.Startup.StuckPodGraceSeconds = 0
	cfg.Startup.IgnoreWorkloadNamespaces = []string{"kube-system"}
	cfg.Startup.IgnoreUnavailableNodes = []string{"titan-22"}

	run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
		command := name + " " + strings.Join(args, " ")
		switch {
		case name == "kubectl" && strings.Contains(command, "get deploy,statefulset -A -o json"):
			return `{"items":[
{"kind":"Deployment","metadata":{"namespace":"vault","name":"vault"},"spec":{"replicas":0}},
{"kind":"CronJob","metadata":{"namespace":"monitoring","name":"ignored"},"spec":{}}
]}`, nil
		case name == "kubectl" && strings.Contains(command, "get pods -A -o json"):
			return `{"items":[
{"metadata":{"namespace":"kube-system","name":"skip-ns"},"spec":{"nodeName":"titan-23"},"status":{"phase":"Pending","initContainerStatuses":[{"state":{"waiting":{"reason":"ImagePullBackOff"}}}]}}
]}`, nil
		case name == "kubectl" && strings.Contains(command, "scale statefulset vault --replicas=1"):
			return "", errors.New("Error from server (NotFound): statefulsets.apps \"vault\" not found")
		default:
			return lifecycleDispatcher(&commandRecorder{})(ctx, timeout, name, args...)
		}
	}

	orch, _ := newHookOrchestrator(t, cfg, run, run)

	// First pass: the last-run timestamp is now, so both helpers hit their
	// cooldown branch and skip the work.
	last := time.Now()
	orch.TestHookMaybeAutoRecycleStuckPods(context.Background(), &last)
	orch.TestHookMaybeAutoHealCriticalWorkloadReplicas(context.Background(), &last)

	// Second pass: a zero timestamp expires the cooldown, so both helpers run
	// against the fake kubectl output.
	last = time.Time{}
	orch.TestHookMaybeAutoRecycleStuckPods(context.Background(), &last)
	orch.TestHookMaybeAutoHealCriticalWorkloadReplicas(context.Background(), &last)

	if _, err := orch.TestHookHealCriticalWorkloadReplicas(context.Background()); err != nil {
		t.Fatalf("expected healCriticalWorkloadReplicas not-found branch to continue, got %v", err)
	}
	if failures, err := orch.TestHookStartupFailurePods(context.Background()); err != nil || len(failures) != 0 {
		t.Fatalf("expected ignored pod list branch, got failures=%v err=%v", failures, err)
	}
}
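
// pendingPodJSON is a sketch fixture builder (hypothetical; the test above
// inlines its JSON) for the minimal pod-list shape the parsing branches
// consume: namespace, name, node assignment, phase, and one waiting
// init-container reason.
func pendingPodJSON(ns, name, node, reason string) string {
	return `{"items":[{"metadata":{"namespace":"` + ns + `","name":"` + name +
		`"},"spec":{"nodeName":"` + node + `"},"status":{"phase":"Pending",` +
		`"initContainerStatuses":[{"state":{"waiting":{"reason":"` + reason + `"}}}]}}]}`
}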