// ananke/testing/orchestrator/drill_lifecycle_test.go

package orchestrator
import (
"context"
"fmt"
"io"
"log"
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"
"scm.bstein.dev/bstein/ananke/internal/cluster"
"scm.bstein.dev/bstein/ananke/internal/config"
"scm.bstein.dev/bstein/ananke/internal/execx"
"scm.bstein.dev/bstein/ananke/internal/state"
)
// commandRecorder captures every external command the orchestrator issues
// through its command override, so tests can later assert which CLI steps
// actually ran. Safe for concurrent use.
type commandRecorder struct {
	mu    sync.Mutex
	calls []string
}

// record appends one invocation, rendered as "name arg1 arg2 ...",
// to the call log.
func (r *commandRecorder) record(name string, args []string) {
	line := name + " " + strings.Join(args, " ")
	r.mu.Lock()
	defer r.mu.Unlock()
	r.calls = append(r.calls, line)
}

// contains reports whether any recorded invocation includes part as a
// substring; used to verify an orchestration flow reached a given step.
func (r *commandRecorder) contains(part string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	for i := range r.calls {
		if strings.Contains(r.calls[i], part) {
			return true
		}
	}
	return false
}
// lifecycleConfig builds a realistic Ananke configuration rooted in a fresh
// temp directory, with short wait/poll intervals so full startup and shutdown
// drills finish quickly under test.
func lifecycleConfig(t *testing.T) config.Config {
	t.Helper()
	root := t.TempDir()

	// All state artifacts live under the per-test temp root.
	stateCfg := config.State{
		Dir:            root,
		ReportsDir:     filepath.Join(root, "reports"),
		RunHistoryPath: filepath.Join(root, "runs.json"),
		LockPath:       filepath.Join(root, "ananke.lock"),
		IntentPath:     filepath.Join(root, "intent.json"),
	}

	// Startup gates: inventory reachability, critical endpoints, flux health,
	// and workload convergence are required; SSH auth, storage, ingress, and
	// the service checklist are disabled for this drill.
	startupCfg := config.Startup{
		APIWaitSeconds:                  2,
		APIPollSeconds:                  1,
		RequireNodeInventoryReach:       true,
		NodeInventoryReachWaitSeconds:   2,
		NodeInventoryReachPollSeconds:   1,
		RequireNodeSSHAuth:              false,
		RequireStorageReady:             false,
		RequireIngressChecklist:         false,
		RequireServiceChecklist:         false,
		RequireCriticalServiceEndpoints: true,
		CriticalServiceEndpointWaitSec:  2,
		CriticalServiceEndpointPollSec:  1,
		CriticalServiceEndpoints: []string{
			"monitoring/victoria-metrics-single-server",
		},
		RequireFluxHealth:              true,
		FluxHealthWaitSeconds:          2,
		FluxHealthPollSeconds:          1,
		RequireWorkloadConvergence:     true,
		WorkloadConvergenceWaitSeconds: 2,
		WorkloadConvergencePollSeconds: 1,
		ServiceChecklistStabilitySec:   0,
		AutoRecycleStuckPods:           true,
		StuckPodGraceSeconds:           1,
		VaultUnsealKeyFile:             filepath.Join(root, "vault-unseal.key"),
	}

	// Serial shutdown keeps recorded command ordering deterministic.
	shutdownCfg := config.Shutdown{
		DrainParallelism: 1,
		ScaleParallelism: 1,
		SSHParallelism:   1,
	}

	return config.Config{
		SSHUser:            "atlas",
		SSHPort:            22,
		ExpectedFluxBranch: "main",
		ExpectedFluxSource: "ssh://git@scm.bstein.dev:2242/bstein/titan-iac.git",
		ControlPlanes:      []string{"titan-db"},
		Workers:            []string{"titan-23"},
		SSHManagedNodes:    []string{"titan-db", "titan-23"},
		SSHNodeHosts: map[string]string{
			"titan-db": "titan-db",
			"titan-23": "titan-23",
		},
		State:    stateCfg,
		Startup:  startupCfg,
		Shutdown: shutdownCfg,
	}
}
// lifecycleDispatcher returns a command override that emulates the kubectl,
// ssh, git, flux, and sh invocations a startup/shutdown drill performs,
// recording every call on recorder so tests can assert which steps ran.
// All responses are canned "healthy" answers, keeping orchestration runs
// deterministic.
//
// NOTE(review): within each switch, case order matters — a longer pattern
// (e.g. "get deploy,statefulset,daemonset") must be checked before shorter
// patterns it could shadow.
func lifecycleDispatcher(recorder *commandRecorder) func(context.Context, time.Duration, string, ...string) (string, error) {
	return func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
		recorder.record(name, args)
		command := name + " " + strings.Join(args, " ")
		switch name {
		case "ssh":
			switch {
			// Reachability / auth probes echo their sentinel marker back.
			case strings.Contains(command, "__ANANKE_NODE_REACHABLE__"):
				return "__ANANKE_NODE_REACHABLE__", nil
			case strings.Contains(command, "__ANANKE_SSH_AUTH_OK__"):
				return "__ANANKE_SSH_AUTH_OK__", nil
			// Unit inspection: node runs "k3s server".
			case strings.Contains(command, "systemctl cat k3s"):
				return "ExecStart=/usr/local/bin/k3s server", nil
			// Pre-shutdown etcd snapshot: report a snapshot path.
			case strings.Contains(command, "k3s etcd-snapshot"):
				return "/var/lib/rancher/k3s/server/db/snapshots/pre-shutdown", nil
			default:
				return "ok", nil
			}
		case "git":
			// Clean worktree: "status --porcelain" produces no output.
			if strings.Contains(command, "status --porcelain") {
				return "", nil
			}
			return "", nil
		case "flux":
			return "", nil
		case "sh":
			return "", nil
		case "kubectl":
			switch {
			// API server readiness probe.
			case strings.Contains(command, "version --request-timeout=5s"):
				return "v1.31.0", nil
			// Node Ready condition query.
			case strings.Contains(command, "jsonpath={.status.conditions[?(@.type==\"Ready\")].status}"):
				return "True", nil
			// Flux GitRepository source URL — matches the expected config value.
			case strings.Contains(command, "jsonpath={.spec.url}"):
				return "ssh://git@scm.bstein.dev:2242/bstein/titan-iac.git", nil
			// Flux GitRepository branch — matches the expected config value.
			case strings.Contains(command, "jsonpath={.spec.ref.branch}"):
				return "main", nil
			// Full Kustomization dump: one healthy, unsuspended item.
			case strings.Contains(command, "get kustomizations.kustomize.toolkit.fluxcd.io -A -o json"):
				return `{"items":[{"metadata":{"namespace":"flux-system","name":"services"},"spec":{"suspend":false,"timeout":"30s"},"status":{"conditions":[{"type":"Ready","status":"True","reason":"ReconciliationSucceeded","message":"ok"}]}}]}`, nil
			// Kustomization name listing.
			case strings.Contains(command, "-n flux-system get kustomizations.kustomize.toolkit.fluxcd.io -o jsonpath="):
				return "services\n", nil
			// HelmRelease name listing.
			case strings.Contains(command, "get helmreleases.helm.toolkit.fluxcd.io -A -o jsonpath="):
				return "monitoring/grafana\n", nil
			// Annotation writes (reconcile triggers) succeed silently.
			case strings.Contains(command, "annotate kustomizations.kustomize.toolkit.fluxcd.io"):
				return "", nil
			case strings.Contains(command, "annotate --all-namespaces helmreleases.helm.toolkit.fluxcd.io"):
				return "", nil
			// Workload convergence: grafana deployment fully ready.
			// Checked before the shorter deploy,statefulset pattern below.
			case strings.Contains(command, "get deploy,statefulset,daemonset -A -o json"):
				return `{"items":[{"kind":"Deployment","metadata":{"namespace":"monitoring","name":"grafana"},"spec":{"replicas":1,"template":{"spec":{}}},"status":{"readyReplicas":1}}]}`, nil
			case strings.Contains(command, "get deploy,statefulset -A -o json"):
				return `{"items":[{"kind":"Deployment","metadata":{"namespace":"monitoring","name":"grafana"},"spec":{"replicas":1}},{"kind":"StatefulSet","metadata":{"namespace":"monitoring","name":"victoria-metrics-single-server"},"spec":{"replicas":1}}]}`, nil
			// No pods stuck, no jobs pending.
			case strings.Contains(command, "get pods -A -o json"):
				return `{"items":[]}`, nil
			case strings.Contains(command, "get deployment -A -o jsonpath="):
				return "monitoring\tgrafana\t1\n", nil
			case strings.Contains(command, "get statefulset -A -o jsonpath="):
				return "monitoring\tvictoria-metrics-single-server\t1\n", nil
			case strings.Contains(command, "jsonpath={.status.readyReplicas}"):
				return "1", nil
			// Critical endpoint check: victoria-metrics has an address.
			case strings.Contains(command, "get endpoints victoria-metrics-single-server"):
				return "10.42.0.10\n", nil
			case strings.Contains(command, "get jobs -A -o json"):
				return `{"items":[]}`, nil
			case strings.Contains(command, "rollout status"):
				return "rolled out", nil
			// Node lifecycle and scaling commands succeed silently.
			case strings.Contains(command, "cordon "):
				return "", nil
			case strings.Contains(command, "drain "):
				return "", nil
			case strings.Contains(command, "uncordon "):
				return "", nil
			case strings.Contains(command, "patch "):
				return "", nil
			case strings.Contains(command, "scale "):
				return "", nil
			default:
				return "", nil
			}
		default:
			return "", nil
		}
	}
}
// newLifecycleOrchestrator builds a cluster.Orchestrator wired to the
// deterministic lifecycle dispatcher, returning the orchestrator, the
// recorder capturing its external commands, and the config it runs under.
func newLifecycleOrchestrator(t *testing.T) (*cluster.Orchestrator, *commandRecorder, config.Config) {
	t.Helper()
	cfg := lifecycleConfig(t)
	if err := os.MkdirAll(cfg.State.Dir, 0o755); err != nil {
		t.Fatalf("ensure state dir: %v", err)
	}
	recorder := &commandRecorder{}
	history := state.New(cfg.State.RunHistoryPath)
	quiet := log.New(io.Discard, "", 0)
	orch := cluster.New(cfg, &execx.Runner{DryRun: false}, history, quiet)
	override := lifecycleDispatcher(recorder)
	orch.SetCommandOverrides(override, override)
	return orch, recorder, cfg
}
// TestLifecycleStartupShutdownFromTopLevelModule drives a complete startup
// followed by a cluster-only shutdown from this top-level testing module,
// then verifies the checklist-oriented commands actually ran and that both
// report artifacts were written.
func TestLifecycleStartupShutdownFromTopLevelModule(t *testing.T) {
	orch, recorder, cfg := newLifecycleOrchestrator(t)
	ctx := context.Background()

	if err := orch.Startup(ctx, cluster.StartupOptions{Reason: "drill"}); err != nil {
		t.Fatalf("startup failed: %v", err)
	}
	shutdownOpts := cluster.ShutdownOptions{
		Reason:           "drill",
		Mode:             "cluster-only",
		SkipEtcdSnapshot: true,
	}
	if err := orch.Shutdown(ctx, shutdownOpts); err != nil {
		t.Fatalf("shutdown failed: %v", err)
	}

	// Each entry pairs a command fragment that must appear in the recorded
	// calls with the failure message to emit when it does not.
	checks := []struct {
		fragment string
		message  string
	}{
		{"get gitrepository flux-system", "expected flux source guard/read commands to run"},
		{"get endpoints victoria-metrics-single-server", "expected critical endpoint checklist to run"},
		{"drain titan-23", "expected worker drain command to run"},
	}
	for _, c := range checks {
		if !recorder.contains(c.fragment) {
			t.Fatal(c.message)
		}
	}

	lastStartup := filepath.Join(cfg.State.Dir, "last-startup-report.json")
	if _, err := os.Stat(lastStartup); err != nil {
		t.Fatalf("expected startup report at %s: %v", lastStartup, err)
	}
	lastShutdown := filepath.Join(cfg.State.Dir, "last-shutdown-report.json")
	if _, err := os.Stat(lastShutdown); err != nil {
		t.Fatalf("expected shutdown report at %s: %v", lastShutdown, err)
	}
}
// TestLifecycleStartupFailsWhenFluxSourceDrifts verifies startup is blocked
// when the Flux GitRepository URL does not match the configured expected
// source, and that the persisted intent is normalized after the failure.
func TestLifecycleStartupFailsWhenFluxSourceDrifts(t *testing.T) {
	orch, _, cfg := newLifecycleOrchestrator(t)
	// Override with a dispatcher that answers every probe as healthy except
	// the Flux source URL, which points at the wrong repository.
	orch.SetCommandOverrides(func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
		command := name + " " + strings.Join(args, " ")
		switch name {
		case "kubectl":
			switch {
			case strings.Contains(command, "jsonpath={.spec.url}"):
				return "ssh://git@scm.bstein.dev:2242/bstein/wrong-repo.git", nil
			case strings.Contains(command, "version --request-timeout=5s"):
				return "v1.31.0", nil
			case strings.Contains(command, "jsonpath={.status.conditions[?(@.type==\"Ready\")].status}"):
				return "True", nil
			case strings.Contains(command, "jsonpath={.spec.ref.branch}"):
				return "main", nil
			}
		case "ssh":
			switch {
			case strings.Contains(command, "__ANANKE_NODE_REACHABLE__"):
				return "__ANANKE_NODE_REACHABLE__", nil
			case strings.Contains(command, "systemctl cat k3s"):
				return "ExecStart=/usr/local/bin/k3s server", nil
			}
		}
		return "", nil
	}, nil)

	err := orch.Startup(context.Background(), cluster.StartupOptions{Reason: "drift-test"})
	if err == nil {
		t.Fatalf("expected startup to fail on flux source drift")
	}
	if !strings.Contains(err.Error(), "flux source url drift") {
		t.Fatalf("unexpected startup error: %v", err)
	}

	intent, readErr := state.ReadIntent(cfg.State.IntentPath)
	if readErr != nil {
		t.Fatalf("read intent after failed startup: %v", readErr)
	}
	if intent.State != state.IntentNormal {
		t.Fatalf("expected intent to be normalized after startup failure, got %q", intent.State)
	}
}
// TestLifecycleShutdownRejectsLegacyPoweroffMode enforces the safety rule
// that Ananke never powers off hosts: the legacy "poweroff" mode must be
// rejected with an explanatory error.
func TestLifecycleShutdownRejectsLegacyPoweroffMode(t *testing.T) {
	orch, _, _ := newLifecycleOrchestrator(t)
	opts := cluster.ShutdownOptions{
		Reason: "mode-test",
		Mode:   "poweroff",
	}
	err := orch.Shutdown(context.Background(), opts)
	switch {
	case err == nil:
		t.Fatalf("expected poweroff mode to be rejected")
	case !strings.Contains(err.Error(), "no longer powers off hosts"):
		t.Fatalf("unexpected shutdown mode error: %v", err)
	}
}
// TestLifecycleStatusArtifactsIncludeProgress confirms a successful startup
// writes a progress snapshot (status + checks map) and a final report with a
// completion phase, so CLI status polling can surface checklist progress.
func TestLifecycleStatusArtifactsIncludeProgress(t *testing.T) {
	orch, _, cfg := newLifecycleOrchestrator(t)
	if err := orch.Startup(context.Background(), cluster.StartupOptions{Reason: "status-test"}); err != nil {
		t.Fatalf("startup failed: %v", err)
	}

	progressBytes, err := os.ReadFile(filepath.Join(cfg.State.Dir, "startup-progress.json"))
	if err != nil {
		t.Fatalf("read startup progress artifact: %v", err)
	}
	payload := string(progressBytes)
	if !strings.Contains(payload, `"status": "success"`) {
		t.Fatalf("expected startup progress to show success, payload=%s", payload)
	}
	if !strings.Contains(payload, `"checks"`) {
		t.Fatalf("expected startup progress payload to include checks map")
	}

	reportBytes, err := os.ReadFile(filepath.Join(cfg.State.Dir, "last-startup-report.json"))
	if err != nil {
		t.Fatalf("read last startup report: %v", err)
	}
	if report := string(reportBytes); !strings.Contains(report, `"phase": "complete"`) {
		t.Fatalf("expected startup report to include final completion phase, got: %s", report)
	}
}
// TestLifecycleReasonAppearsInRunHistory ties the operator-supplied startup
// reason to durable run history: after a successful startup, a matching
// successful "startup" record must exist in the persisted history.
func TestLifecycleReasonAppearsInRunHistory(t *testing.T) {
	orch, _, cfg := newLifecycleOrchestrator(t)
	// Unique reason per run so the lookup below cannot match stale records.
	reason := fmt.Sprintf("drill-%d", time.Now().UnixNano())
	if err := orch.Startup(context.Background(), cluster.StartupOptions{Reason: reason}); err != nil {
		t.Fatalf("startup failed: %v", err)
	}

	records, err := state.New(cfg.State.RunHistoryPath).Load()
	if err != nil {
		t.Fatalf("load run history: %v", err)
	}
	if len(records) == 0 {
		t.Fatalf("expected at least one run history record")
	}
	for _, rec := range records {
		if rec.Action == "startup" && rec.Reason == reason && rec.Success {
			return // matching record found
		}
	}
	t.Fatalf("expected startup record with reason %q in run history", reason)
}