package orchestrator

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"testing"
	"time"

	"scm.bstein.dev/bstein/ananke/internal/cluster"
	"scm.bstein.dev/bstein/ananke/internal/config"
	"scm.bstein.dev/bstein/ananke/internal/execx"
	"scm.bstein.dev/bstein/ananke/internal/state"
)
|
|
|
|
// commandRecorder captures every external command the orchestrator issues
// during a test so assertions can verify which steps actually ran.
type commandRecorder struct {
	mu    sync.Mutex // guards calls; command overrides may run concurrently
	calls []string   // each entry is "<name> <space-joined args>"
}

// record appends one invoked command, rendered as the binary name followed by
// its space-joined arguments, to the call log. Safe for concurrent use.
func (r *commandRecorder) record(name string, args []string) {
	entry := name + " " + strings.Join(args, " ")
	r.mu.Lock()
	r.calls = append(r.calls, entry)
	r.mu.Unlock()
}

// contains reports whether any recorded command includes part as a substring.
// Tests use it to confirm that specific orchestration gates were exercised.
func (r *commandRecorder) contains(part string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	for i := range r.calls {
		if strings.Contains(r.calls[i], part) {
			return true
		}
	}
	return false
}
|
|
|
|
// lifecycleConfig runs one orchestration or CLI step.
|
|
// Signature: lifecycleConfig(t *testing.T) config.Config.
|
|
// Why: centralizes a realistic startup/shutdown config used by top-level drill tests.
|
|
func lifecycleConfig(t *testing.T) config.Config {
|
|
t.Helper()
|
|
root := t.TempDir()
|
|
return config.Config{
|
|
SSHUser: "atlas",
|
|
SSHPort: 22,
|
|
ExpectedFluxBranch: "main",
|
|
ExpectedFluxSource: "ssh://git@scm.bstein.dev:2242/bstein/titan-iac.git",
|
|
ControlPlanes: []string{"titan-db"},
|
|
Workers: []string{"titan-23"},
|
|
SSHManagedNodes: []string{"titan-db", "titan-23"},
|
|
SSHNodeHosts: map[string]string{
|
|
"titan-db": "titan-db",
|
|
"titan-23": "titan-23",
|
|
},
|
|
State: config.State{
|
|
Dir: root,
|
|
ReportsDir: filepath.Join(root, "reports"),
|
|
RunHistoryPath: filepath.Join(root, "runs.json"),
|
|
LockPath: filepath.Join(root, "ananke.lock"),
|
|
IntentPath: filepath.Join(root, "intent.json"),
|
|
},
|
|
Startup: config.Startup{
|
|
APIWaitSeconds: 2,
|
|
APIPollSeconds: 1,
|
|
RequireNodeInventoryReach: true,
|
|
NodeInventoryReachWaitSeconds: 2,
|
|
NodeInventoryReachPollSeconds: 1,
|
|
RequireNodeSSHAuth: false,
|
|
RequireStorageReady: false,
|
|
RequireIngressChecklist: false,
|
|
RequireServiceChecklist: false,
|
|
RequireCriticalServiceEndpoints: true,
|
|
CriticalServiceEndpointWaitSec: 2,
|
|
CriticalServiceEndpointPollSec: 1,
|
|
CriticalServiceEndpoints: []string{
|
|
"monitoring/victoria-metrics-single-server",
|
|
},
|
|
RequireFluxHealth: true,
|
|
FluxHealthWaitSeconds: 2,
|
|
FluxHealthPollSeconds: 1,
|
|
RequireWorkloadConvergence: true,
|
|
WorkloadConvergenceWaitSeconds: 2,
|
|
WorkloadConvergencePollSeconds: 1,
|
|
ServiceChecklistStabilitySec: 0,
|
|
AutoRecycleStuckPods: true,
|
|
StuckPodGraceSeconds: 1,
|
|
VaultUnsealKeyFile: filepath.Join(root, "vault-unseal.key"),
|
|
},
|
|
Shutdown: config.Shutdown{
|
|
DrainParallelism: 1,
|
|
ScaleParallelism: 1,
|
|
SSHParallelism: 1,
|
|
},
|
|
}
|
|
}
|
|
|
|
// lifecycleDispatcher runs one orchestration or CLI step.
|
|
// Signature: lifecycleDispatcher(recorder *commandRecorder) func(context.Context, time.Duration, string, ...string) (string, error).
|
|
// Why: emulates kubectl/ssh/git behavior so top-level tests can exercise full orchestration logic deterministically.
|
|
func lifecycleDispatcher(recorder *commandRecorder) func(context.Context, time.Duration, string, ...string) (string, error) {
|
|
return func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
|
|
recorder.record(name, args)
|
|
command := name + " " + strings.Join(args, " ")
|
|
switch name {
|
|
case "ssh":
|
|
switch {
|
|
case strings.Contains(command, "__ANANKE_NODE_REACHABLE__"):
|
|
return "__ANANKE_NODE_REACHABLE__", nil
|
|
case strings.Contains(command, "__ANANKE_SSH_AUTH_OK__"):
|
|
return "__ANANKE_SSH_AUTH_OK__", nil
|
|
case strings.Contains(command, "systemctl cat k3s"):
|
|
return "ExecStart=/usr/local/bin/k3s server", nil
|
|
case strings.Contains(command, "k3s etcd-snapshot"):
|
|
return "/var/lib/rancher/k3s/server/db/snapshots/pre-shutdown", nil
|
|
default:
|
|
return "ok", nil
|
|
}
|
|
case "git":
|
|
if strings.Contains(command, "status --porcelain") {
|
|
return "", nil
|
|
}
|
|
return "", nil
|
|
case "flux":
|
|
return "", nil
|
|
case "sh":
|
|
return "", nil
|
|
case "kubectl":
|
|
switch {
|
|
case strings.Contains(command, "version --request-timeout=5s"):
|
|
return "v1.31.0", nil
|
|
case strings.Contains(command, "jsonpath={.status.conditions[?(@.type==\"Ready\")].status}"):
|
|
return "True", nil
|
|
case strings.Contains(command, "jsonpath={.spec.url}"):
|
|
return "ssh://git@scm.bstein.dev:2242/bstein/titan-iac.git", nil
|
|
case strings.Contains(command, "jsonpath={.spec.ref.branch}"):
|
|
return "main", nil
|
|
case strings.Contains(command, "get kustomizations.kustomize.toolkit.fluxcd.io -A -o json"):
|
|
return `{"items":[{"metadata":{"namespace":"flux-system","name":"services"},"spec":{"suspend":false,"timeout":"30s"},"status":{"conditions":[{"type":"Ready","status":"True","reason":"ReconciliationSucceeded","message":"ok"}]}}]}`, nil
|
|
case strings.Contains(command, "-n flux-system get kustomizations.kustomize.toolkit.fluxcd.io -o jsonpath="):
|
|
return "services\n", nil
|
|
case strings.Contains(command, "get helmreleases.helm.toolkit.fluxcd.io -A -o jsonpath="):
|
|
return "monitoring/grafana\n", nil
|
|
case strings.Contains(command, "annotate kustomizations.kustomize.toolkit.fluxcd.io"):
|
|
return "", nil
|
|
case strings.Contains(command, "annotate --all-namespaces helmreleases.helm.toolkit.fluxcd.io"):
|
|
return "", nil
|
|
case strings.Contains(command, "get deploy,statefulset,daemonset -A -o json"):
|
|
return `{"items":[{"kind":"Deployment","metadata":{"namespace":"monitoring","name":"grafana"},"spec":{"replicas":1,"template":{"spec":{}}},"status":{"readyReplicas":1}}]}`, nil
|
|
case strings.Contains(command, "get deploy,statefulset -A -o json"):
|
|
return `{"items":[{"kind":"Deployment","metadata":{"namespace":"monitoring","name":"grafana"},"spec":{"replicas":1}},{"kind":"StatefulSet","metadata":{"namespace":"monitoring","name":"victoria-metrics-single-server"},"spec":{"replicas":1}}]}`, nil
|
|
case strings.Contains(command, "get pods -A -o json"):
|
|
return `{"items":[]}`, nil
|
|
case strings.Contains(command, "get deployment -A -o jsonpath="):
|
|
return "monitoring\tgrafana\t1\n", nil
|
|
case strings.Contains(command, "get statefulset -A -o jsonpath="):
|
|
return "monitoring\tvictoria-metrics-single-server\t1\n", nil
|
|
case strings.Contains(command, "jsonpath={.status.readyReplicas}"):
|
|
return "1", nil
|
|
case strings.Contains(command, "get endpoints victoria-metrics-single-server"):
|
|
return "10.42.0.10\n", nil
|
|
case strings.Contains(command, "get jobs -A -o json"):
|
|
return `{"items":[]}`, nil
|
|
case strings.Contains(command, "rollout status"):
|
|
return "rolled out", nil
|
|
case strings.Contains(command, "cordon "):
|
|
return "", nil
|
|
case strings.Contains(command, "drain "):
|
|
return "", nil
|
|
case strings.Contains(command, "uncordon "):
|
|
return "", nil
|
|
case strings.Contains(command, "patch "):
|
|
return "", nil
|
|
case strings.Contains(command, "scale "):
|
|
return "", nil
|
|
default:
|
|
return "", nil
|
|
}
|
|
default:
|
|
return "", nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// newLifecycleOrchestrator runs one orchestration or CLI step.
|
|
// Signature: newLifecycleOrchestrator(t *testing.T) (*cluster.Orchestrator, *commandRecorder, config.Config).
|
|
// Why: constructs an orchestrator with deterministic command behavior for top-level module tests.
|
|
func newLifecycleOrchestrator(t *testing.T) (*cluster.Orchestrator, *commandRecorder, config.Config) {
|
|
t.Helper()
|
|
cfg := lifecycleConfig(t)
|
|
if err := os.MkdirAll(cfg.State.Dir, 0o755); err != nil {
|
|
t.Fatalf("ensure state dir: %v", err)
|
|
}
|
|
runner := &execx.Runner{DryRun: false}
|
|
recorder := &commandRecorder{}
|
|
orch := cluster.New(cfg, runner, state.New(cfg.State.RunHistoryPath), log.New(io.Discard, "", 0))
|
|
dispatch := lifecycleDispatcher(recorder)
|
|
orch.SetCommandOverrides(dispatch, dispatch)
|
|
return orch, recorder, cfg
|
|
}
|
|
|
|
// TestLifecycleStartupShutdownFromTopLevelModule runs one orchestration or CLI step.
|
|
// Signature: TestLifecycleStartupShutdownFromTopLevelModule(t *testing.T).
|
|
// Why: validates that a separate top-level testing module can drive full startup
|
|
// and shutdown flows, and confirms checklist-oriented steps were actually executed.
|
|
func TestLifecycleStartupShutdownFromTopLevelModule(t *testing.T) {
|
|
orch, recorder, cfg := newLifecycleOrchestrator(t)
|
|
ctx := context.Background()
|
|
|
|
if err := orch.Startup(ctx, cluster.StartupOptions{Reason: "drill"}); err != nil {
|
|
t.Fatalf("startup failed: %v", err)
|
|
}
|
|
if err := orch.Shutdown(ctx, cluster.ShutdownOptions{
|
|
Reason: "drill",
|
|
Mode: "cluster-only",
|
|
SkipEtcdSnapshot: true,
|
|
}); err != nil {
|
|
t.Fatalf("shutdown failed: %v", err)
|
|
}
|
|
|
|
if !recorder.contains("get gitrepository flux-system") {
|
|
t.Fatalf("expected flux source guard/read commands to run")
|
|
}
|
|
if !recorder.contains("get endpoints victoria-metrics-single-server") {
|
|
t.Fatalf("expected critical endpoint checklist to run")
|
|
}
|
|
if !recorder.contains("drain titan-23") {
|
|
t.Fatalf("expected worker drain command to run")
|
|
}
|
|
|
|
lastStartup := filepath.Join(cfg.State.Dir, "last-startup-report.json")
|
|
if _, err := os.Stat(lastStartup); err != nil {
|
|
t.Fatalf("expected startup report at %s: %v", lastStartup, err)
|
|
}
|
|
lastShutdown := filepath.Join(cfg.State.Dir, "last-shutdown-report.json")
|
|
if _, err := os.Stat(lastShutdown); err != nil {
|
|
t.Fatalf("expected shutdown report at %s: %v", lastShutdown, err)
|
|
}
|
|
}
|
|
|
|
// TestLifecycleStartupFailsWhenFluxSourceDrifts runs one orchestration or CLI step.
|
|
// Signature: TestLifecycleStartupFailsWhenFluxSourceDrifts(t *testing.T).
|
|
// Why: verifies startup blocks on flux source URL drift, preventing deadlock-prone bootstrap against the wrong repo.
|
|
func TestLifecycleStartupFailsWhenFluxSourceDrifts(t *testing.T) {
|
|
orch, _, cfg := newLifecycleOrchestrator(t)
|
|
orch.SetCommandOverrides(func(_ context.Context, _ time.Duration, name string, args ...string) (string, error) {
|
|
command := name + " " + strings.Join(args, " ")
|
|
if name == "kubectl" && strings.Contains(command, "jsonpath={.spec.url}") {
|
|
return "ssh://git@scm.bstein.dev:2242/bstein/wrong-repo.git", nil
|
|
}
|
|
if name == "kubectl" && strings.Contains(command, "version --request-timeout=5s") {
|
|
return "v1.31.0", nil
|
|
}
|
|
if name == "ssh" && strings.Contains(command, "__ANANKE_NODE_REACHABLE__") {
|
|
return "__ANANKE_NODE_REACHABLE__", nil
|
|
}
|
|
if name == "ssh" && strings.Contains(command, "systemctl cat k3s") {
|
|
return "ExecStart=/usr/local/bin/k3s server", nil
|
|
}
|
|
if name == "kubectl" && strings.Contains(command, "jsonpath={.status.conditions[?(@.type==\"Ready\")].status}") {
|
|
return "True", nil
|
|
}
|
|
if name == "kubectl" && strings.Contains(command, "jsonpath={.spec.ref.branch}") {
|
|
return "main", nil
|
|
}
|
|
return "", nil
|
|
}, nil)
|
|
|
|
err := orch.Startup(context.Background(), cluster.StartupOptions{Reason: "drift-test"})
|
|
if err == nil {
|
|
t.Fatalf("expected startup to fail on flux source drift")
|
|
}
|
|
if !strings.Contains(err.Error(), "flux source url drift") {
|
|
t.Fatalf("unexpected startup error: %v", err)
|
|
}
|
|
|
|
intent, readErr := state.ReadIntent(cfg.State.IntentPath)
|
|
if readErr != nil {
|
|
t.Fatalf("read intent after failed startup: %v", readErr)
|
|
}
|
|
if intent.State != state.IntentNormal {
|
|
t.Fatalf("expected intent to be normalized after startup failure, got %q", intent.State)
|
|
}
|
|
}
|
|
|
|
// TestLifecycleShutdownRejectsLegacyPoweroffMode runs one orchestration or CLI step.
|
|
// Signature: TestLifecycleShutdownRejectsLegacyPoweroffMode(t *testing.T).
|
|
// Why: enforces the safety rule that Ananke never powers off hosts, only performs cluster-only shutdown.
|
|
func TestLifecycleShutdownRejectsLegacyPoweroffMode(t *testing.T) {
|
|
orch, _, _ := newLifecycleOrchestrator(t)
|
|
err := orch.Shutdown(context.Background(), cluster.ShutdownOptions{
|
|
Reason: "mode-test",
|
|
Mode: "poweroff",
|
|
})
|
|
if err == nil {
|
|
t.Fatalf("expected poweroff mode to be rejected")
|
|
}
|
|
if !strings.Contains(err.Error(), "no longer powers off hosts") {
|
|
t.Fatalf("unexpected shutdown mode error: %v", err)
|
|
}
|
|
}
|
|
|
|
// TestLifecycleStatusArtifactsIncludeProgress runs one orchestration or CLI step.
|
|
// Signature: TestLifecycleStatusArtifactsIncludeProgress(t *testing.T).
|
|
// Why: confirms startup writes progress snapshots so CLI status polling can report checklist progress live.
|
|
func TestLifecycleStatusArtifactsIncludeProgress(t *testing.T) {
|
|
orch, _, cfg := newLifecycleOrchestrator(t)
|
|
ctx := context.Background()
|
|
|
|
if err := orch.Startup(ctx, cluster.StartupOptions{Reason: "status-test"}); err != nil {
|
|
t.Fatalf("startup failed: %v", err)
|
|
}
|
|
|
|
progressPath := filepath.Join(cfg.State.Dir, "startup-progress.json")
|
|
b, err := os.ReadFile(progressPath)
|
|
if err != nil {
|
|
t.Fatalf("read startup progress artifact: %v", err)
|
|
}
|
|
payload := string(b)
|
|
if !strings.Contains(payload, `"status": "success"`) {
|
|
t.Fatalf("expected startup progress to show success, payload=%s", payload)
|
|
}
|
|
if !strings.Contains(payload, `"checks"`) {
|
|
t.Fatalf("expected startup progress payload to include checks map")
|
|
}
|
|
|
|
lastReportPath := filepath.Join(cfg.State.Dir, "last-startup-report.json")
|
|
lastReport, err := os.ReadFile(lastReportPath)
|
|
if err != nil {
|
|
t.Fatalf("read last startup report: %v", err)
|
|
}
|
|
if !strings.Contains(string(lastReport), `"phase": "complete"`) {
|
|
t.Fatalf("expected startup report to include final completion phase, got: %s", string(lastReport))
|
|
}
|
|
}
|
|
|
|
// TestLifecycleReasonAppearsInRunHistory runs one orchestration or CLI step.
|
|
// Signature: TestLifecycleReasonAppearsInRunHistory(t *testing.T).
|
|
// Why: ties operator-supplied reasons to durable history for post-drill analysis.
|
|
func TestLifecycleReasonAppearsInRunHistory(t *testing.T) {
|
|
orch, _, cfg := newLifecycleOrchestrator(t)
|
|
reason := fmt.Sprintf("drill-%d", time.Now().UnixNano())
|
|
if err := orch.Startup(context.Background(), cluster.StartupOptions{Reason: reason}); err != nil {
|
|
t.Fatalf("startup failed: %v", err)
|
|
}
|
|
records, err := state.New(cfg.State.RunHistoryPath).Load()
|
|
if err != nil {
|
|
t.Fatalf("load run history: %v", err)
|
|
}
|
|
if len(records) == 0 {
|
|
t.Fatalf("expected at least one run history record")
|
|
}
|
|
found := false
|
|
for _, rec := range records {
|
|
if rec.Action == "startup" && rec.Reason == reason && rec.Success {
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
if !found {
|
|
t.Fatalf("expected startup record with reason %q in run history", reason)
|
|
}
|
|
}
|