package orchestrator

import (
	"context"
	"encoding/base64"
	"fmt"
	"io"
	"log"
	"net/http"
	"net/http/httptest"
	"os"
	"path/filepath"
	"strings"
	"testing"
	"time"

	"scm.bstein.dev/bstein/ananke/internal/cluster"
	"scm.bstein.dev/bstein/ananke/internal/config"
	"scm.bstein.dev/bstein/ananke/internal/execx"
	"scm.bstein.dev/bstein/ananke/internal/state"
)

// newLifecycleOrchestratorWithConfig builds an orchestrator wired to the given command overrides for lifecycle tests.
// Signature: newLifecycleOrchestratorWithConfig(t *testing.T, cfg config.Config, run commandOverride, runSensitive commandOverride) (*cluster.Orchestrator, *commandRecorder).
// Why: centralizes top-level orchestrator construction so branch-heavy startup tests stay focused on intent.
func newLifecycleOrchestratorWithConfig(
	t *testing.T,
	cfg config.Config,
	run func(context.Context, time.Duration, string, ...string) (string, error),
	runSensitive func(context.Context, time.Duration, string, ...string) (string, error),
) (*cluster.Orchestrator, *commandRecorder) {
	t.Helper()
	if err := os.MkdirAll(cfg.State.Dir, 0o755); err != nil {
		t.Fatalf("ensure state dir: %v", err)
	}
	recorder := &commandRecorder{}
	if run == nil || runSensitive == nil {
		base := lifecycleDispatcher(recorder)
		if run == nil {
			run = base
		}
		if runSensitive == nil {
			runSensitive = base
		}
	}
	orch := cluster.New(cfg, &execx.Runner{DryRun: false}, state.New(cfg.State.RunHistoryPath), log.New(io.Discard, "", 0))
	orch.SetCommandOverrides(run, runSensitive)
	return orch, recorder
}

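// TestLifecycleStartupHelperSketch is a minimal, hypothetical sketch of how
// the helper above is meant to be used, assuming the lifecycleConfig defaults
// are startable with nil overrides (the shared lifecycleDispatcher then
// answers every command). It is skipped and exists for illustration only; it
// is not one of the original lifecycle tests.
func TestLifecycleStartupHelperSketch(t *testing.T) {
	t.Skip("illustrative sketch only")
	cfg := lifecycleConfig(t)
	cfg.Startup.RequireCriticalServiceEndpoints = false
	cfg.Startup.RequireFluxHealth = false
	cfg.Startup.RequireWorkloadConvergence = false

	// nil overrides fall back to the recorder-backed dispatcher.
	orch, recorder := newLifecycleOrchestratorWithConfig(t, cfg, nil, nil)
	if err := orch.Startup(context.Background(), cluster.StartupOptions{Reason: "sketch"}); err != nil {
		t.Fatalf("startup failed: %v", err)
	}
	_ = recorder // query with recorder.contains(...) as the tests below do
}
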
// TestLifecycleStartupBootstrapFallbackWithPeerGuard exercises startup with a peer intent guard and a local bootstrap fallback that ends at the cached manifest.
// Signature: TestLifecycleStartupBootstrapFallbackWithPeerGuard(t *testing.T).
// Why: covers peer intent guard, access reconciliation, local bootstrap fallback, and cache apply branches.
func TestLifecycleStartupBootstrapFallbackWithPeerGuard(t *testing.T) {
	cfg := lifecycleConfig(t)
	repo := t.TempDir()
	cfg.IACRepoPath = repo
	cfg.LocalBootstrapPaths = []string{"services/bootstrap"}
	cfg.Startup.ReconcileAccessOnBoot = true
	cfg.Startup.RequireNodeSSHAuth = true
	cfg.Startup.NodeSSHAuthWaitSeconds = 2
	cfg.Startup.NodeSSHAuthPollSeconds = 1
	cfg.Startup.RequireCriticalServiceEndpoints = false
	cfg.Startup.RequireFluxHealth = false
	cfg.Startup.RequireWorkloadConvergence = false
	cfg.Coordination.Role = "coordinator"
	cfg.Coordination.PeerHosts = []string{"titan-24"}
	cfg.SSHManagedNodes = append(cfg.SSHManagedNodes, "titan-24")
	cfg.SSHNodeHosts["titan-24"] = "titan-24"

	if err := os.MkdirAll(filepath.Join(repo, ".git"), 0o755); err != nil {
		t.Fatalf("mkdir .git: %v", err)
	}
	if err := os.MkdirAll(filepath.Join(repo, "services", "bootstrap"), 0o755); err != nil {
		t.Fatalf("mkdir bootstrap path: %v", err)
	}

	recorder := &commandRecorder{}
	base := lifecycleDispatcher(recorder)
	fluxReadyChecks := 0
	cacheApplied := false

	run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
		command := name + " " + strings.Join(args, " ")
		switch {
		case name == "ssh" && strings.Contains(command, "ananke intent --config /etc/ananke/ananke.yaml"):
			recorder.record(name, args)
			now := time.Now().UTC().Format(time.RFC3339)
			return "__ANANKE_BOOTSTRAP_ACTIVE__\nintent=startup_in_progress reason=\"manual-startup\" source=peer updated_at=" + now + "\n", nil
		case name == "ssh" && strings.Contains(command, "/usr/bin/systemctl --version"):
			recorder.record(name, args)
			return "systemd 252", nil
		case name == "kubectl" &&
			strings.Contains(command, "get gitrepository flux-system") &&
			strings.Contains(command, "jsonpath={.status.conditions[?(@.type==\"Ready\")].status}"):
			recorder.record(name, args)
			fluxReadyChecks++
			if fluxReadyChecks <= 2 {
				return "", fmt.Errorf("flux source not ready")
			}
			return "True", nil
		case name == "kubectl" && strings.Contains(command, " apply -k "):
			recorder.record(name, args)
			return "", fmt.Errorf("apply -k failed")
		case name == "kubectl" && strings.Contains(command, " apply -f ") && strings.Contains(command, "bootstrap-cache"):
			recorder.record(name, args)
			cacheApplied = true
			return "", nil
		default:
			return base(ctx, timeout, name, args...)
		}
	}
	runSensitive := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
		command := name + " " + strings.Join(args, " ")
		switch {
		case name == "git" && strings.Contains(command, "status --porcelain"):
			recorder.record(name, args)
			return "", nil
		case name == "git" && strings.Contains(command, "fetch origin --prune"):
			recorder.record(name, args)
			return "", nil
		case name == "git" && strings.Contains(command, "checkout main"):
			recorder.record(name, args)
			return "", nil
		case name == "git" && strings.Contains(command, "reset --hard origin/main"):
			recorder.record(name, args)
			return "", nil
		case name == "sh" && strings.Contains(command, "kubectl kustomize"):
			recorder.record(name, args)
			if strings.Contains(command, "| kubectl apply -f -") {
				return "", fmt.Errorf("fallback kustomize pipe failed")
			}
			return "apiVersion: v1\nkind: Namespace\nmetadata:\n name: bootstrap\n", nil
		default:
			return base(ctx, timeout, name, args...)
		}
	}

	orch := cluster.New(cfg, &execx.Runner{DryRun: false}, state.New(cfg.State.RunHistoryPath), log.New(io.Discard, "", 0))
	orch.SetCommandOverrides(run, runSensitive)

	if err := orch.Startup(context.Background(), cluster.StartupOptions{Reason: "bootstrap-fallback"}); err != nil {
		t.Fatalf("startup failed: %v", err)
	}
	if !cacheApplied {
		t.Fatalf("expected bootstrap cache apply fallback to run")
	}
	if !recorder.contains("ananke intent --config /etc/ananke/ananke.yaml") {
		t.Fatalf("expected peer intent guard command in call log")
	}
}

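// Note: the stubs above trace the bootstrap fallback order this test walks:
// "kubectl apply -k" fails, the rendered "kubectl kustomize ... | kubectl
// apply -f -" pipe fails, and only the cached-manifest "apply -f ...
// bootstrap-cache ..." path succeeds, which is what cacheApplied asserts.
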
// TestLifecycleStartupCoversTimeSyncStorageAndServiceChecklist exercises the time-sync quorum, storage readiness, and service checklist gates together.
// Signature: TestLifecycleStartupCoversTimeSyncStorageAndServiceChecklist(t *testing.T).
// Why: covers time-sync quorum path, storage readiness checks, and service checklist success path.
func TestLifecycleStartupCoversTimeSyncStorageAndServiceChecklist(t *testing.T) {
	cfg := lifecycleConfig(t)
	cfg.ControlPlanes = []string{"titan-db", "titan-23"}
	cfg.Workers = []string{"titan-24"}
	cfg.SSHManagedNodes = []string{"titan-db", "titan-23", "titan-24"}
	cfg.SSHNodeHosts["titan-23"] = "titan-23"
	cfg.SSHNodeHosts["titan-24"] = "titan-24"
	cfg.Startup.RequireTimeSync = true
	cfg.Startup.TimeSyncMode = "quorum"
	cfg.Startup.TimeSyncQuorum = 1
	cfg.Startup.TimeSyncWaitSeconds = 2
	cfg.Startup.TimeSyncPollSeconds = 1
	cfg.Startup.RequireStorageReady = true
	cfg.Startup.StorageReadyWaitSeconds = 2
	cfg.Startup.StorageReadyPollSeconds = 1
	cfg.Startup.StorageMinReadyNodes = 2
	cfg.Startup.StorageCriticalPVCs = []string{"monitoring/grafana-data"}
	cfg.Startup.RequireServiceChecklist = true
	cfg.Startup.ServiceChecklistWaitSeconds = 2
	cfg.Startup.ServiceChecklistPollSeconds = 1
	cfg.Startup.ServiceChecklistStabilitySec = 0
	cfg.Startup.RequireCriticalServiceEndpoints = false
	cfg.Startup.RequireFluxHealth = false
	cfg.Startup.RequireWorkloadConvergence = false

	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte(`{"status":"ok"}`))
	}))
	defer server.Close()
	cfg.Startup.ServiceChecklist = []config.ServiceChecklistCheck{
		{
			Name:             "status",
			URL:              server.URL,
			AcceptedStatuses: []int{200},
			BodyContains:     `"status":"ok"`,
			TimeoutSeconds:   5,
		},
	}

	recorder := &commandRecorder{}
	base := lifecycleDispatcher(recorder)
	run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
		command := name + " " + strings.Join(args, " ")
		switch {
		case name == "sh" && strings.Contains(command, "timedatectl show -p NTPSynchronized --value"):
			recorder.record(name, args)
			return "yes", nil
		case name == "ssh" && strings.Contains(command, "timedatectl show -p NTPSynchronized --value"):
			recorder.record(name, args)
			if strings.Contains(command, "titan-db") {
				return "yes", nil
			}
			return "no", nil
		case name == "kubectl" && strings.Contains(command, "nodes.longhorn.io"):
			recorder.record(name, args)
			return "lh-a:True:True\nlh-b:True:True\n", nil
		case name == "kubectl" && strings.Contains(command, "-n monitoring get pvc grafana-data"):
			recorder.record(name, args)
			return "Bound", nil
		default:
			return base(ctx, timeout, name, args...)
		}
	}

	orch := cluster.New(cfg, &execx.Runner{DryRun: false}, state.New(cfg.State.RunHistoryPath), log.New(io.Discard, "", 0))
	orch.SetCommandOverrides(run, run)

	if err := orch.Startup(context.Background(), cluster.StartupOptions{Reason: "timesync-storage-service"}); err != nil {
		t.Fatalf("startup failed: %v", err)
	}
	if !recorder.contains("nodes.longhorn.io") {
		t.Fatalf("expected longhorn readiness query in call log")
	}
	if !recorder.contains("timedatectl show -p NTPSynchronized") {
		t.Fatalf("expected timedatectl checks in call log")
	}
}

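// Note: with TimeSyncMode "quorum" and TimeSyncQuorum = 1, a single synced
// node satisfies the gate here: the stub reports NTPSynchronized "yes" for
// the local host and titan-db but "no" for every other SSH-managed node.
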
// TestLifecycleStartupFailsOnNodeSSHAuthDenied exercises the SSH auth gate failure path.
// Signature: TestLifecycleStartupFailsOnNodeSSHAuthDenied(t *testing.T).
// Why: verifies startup fails fast when SSH auth is denied for managed nodes.
func TestLifecycleStartupFailsOnNodeSSHAuthDenied(t *testing.T) {
	cfg := lifecycleConfig(t)
	cfg.Startup.RequireNodeSSHAuth = true
	cfg.Startup.NodeSSHAuthWaitSeconds = 1
	cfg.Startup.NodeSSHAuthPollSeconds = 1
	cfg.Startup.RequireCriticalServiceEndpoints = false
	cfg.Startup.RequireFluxHealth = false
	cfg.Startup.RequireWorkloadConvergence = false

	recorder := &commandRecorder{}
	base := lifecycleDispatcher(recorder)
	run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
		command := name + " " + strings.Join(args, " ")
		if name == "ssh" && strings.Contains(command, "__ANANKE_SSH_AUTH_OK__") && strings.Contains(command, "titan-23") {
			recorder.record(name, args)
			return "Permission denied (publickey)", fmt.Errorf("permission denied")
		}
		return base(ctx, timeout, name, args...)
	}

	orch, _ := newLifecycleOrchestratorWithConfig(t, cfg, run, run)
	err := orch.Startup(context.Background(), cluster.StartupOptions{Reason: "ssh-auth-denied"})
	if err == nil {
		t.Fatalf("expected startup to fail when ssh auth is denied")
	}
	if !strings.Contains(err.Error(), "ssh auth gate failed") {
		t.Fatalf("unexpected startup error: %v", err)
	}
}

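// Note: as the stubs in this file suggest, the SSH auth gate echoes the
// sentinel __ANANKE_SSH_AUTH_OK__ over ssh; any other output paired with a
// non-nil error (for example "Permission denied (publickey)") is treated as
// an auth failure and surfaces as "ssh auth gate failed".
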
// TestLifecycleStartupVaultAutoUnsealPath exercises the vault auto-unseal path during startup.
// Signature: TestLifecycleStartupVaultAutoUnsealPath(t *testing.T).
// Why: covers vault sealed detection, unseal key retrieval, and auto-unseal retry success.
func TestLifecycleStartupVaultAutoUnsealPath(t *testing.T) {
	cfg := lifecycleConfig(t)
	cfg.Startup.RequireCriticalServiceEndpoints = false
	cfg.Startup.RequireFluxHealth = false
	cfg.Startup.RequireWorkloadConvergence = false

	recorder := &commandRecorder{}
	base := lifecycleDispatcher(recorder)
	vaultReadyChecks := 0
	vaultSealedChecks := 0
	run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
		command := name + " " + strings.Join(args, " ")
		switch {
		case name == "kubectl" && strings.Contains(command, "-n vault get pod vault-0 -o jsonpath={.status.phase}"):
			recorder.record(name, args)
			return "Running", nil
		case name == "kubectl" && strings.Contains(command, "-n vault get statefulset vault -o jsonpath={.status.readyReplicas}"):
			recorder.record(name, args)
			vaultReadyChecks++
			if vaultReadyChecks <= 2 {
				return "0", nil
			}
			return "1", nil
		case name == "kubectl" && strings.Contains(command, "vault status -format=json"):
			recorder.record(name, args)
			vaultSealedChecks++
			if vaultSealedChecks <= 1 {
				return `{"sealed":true,"initialized":true}`, nil
			}
			return `{"sealed":false,"initialized":true}`, nil
		case name == "kubectl" && strings.Contains(command, "-n vault get secret vault-init -o jsonpath={.data.unseal_key_b64}"):
			recorder.record(name, args)
			return base64.StdEncoding.EncodeToString([]byte("unseal-key")), nil
		default:
			return base(ctx, timeout, name, args...)
		}
	}
	runSensitive := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
		command := name + " " + strings.Join(args, " ")
		if name == "kubectl" && strings.Contains(command, "vault operator unseal unseal-key") {
			recorder.record(name, args)
			return "ok", nil
		}
		return run(ctx, timeout, name, args...)
	}

	orch := cluster.New(cfg, &execx.Runner{DryRun: false}, state.New(cfg.State.RunHistoryPath), log.New(io.Discard, "", 0))
	orch.SetCommandOverrides(run, runSensitive)

	if err := orch.Startup(context.Background(), cluster.StartupOptions{Reason: "vault-unseal"}); err != nil {
		t.Fatalf("startup failed: %v", err)
	}
	if !recorder.contains("vault operator unseal unseal-key") {
		t.Fatalf("expected vault unseal command in call log")
	}
}

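// Note: the auto-unseal sequence stubbed above is: "vault status" reports
// sealed, the unseal key is read from the vault-init secret (base64 field
// unseal_key_b64), "vault operator unseal" runs through the sensitive
// runner, and the next status poll reports unsealed.
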
// TestLifecycleStartupServiceChecklistAutoHealsIngressBackends exercises checklist-driven auto-healing of ingress backend replicas.
// Signature: TestLifecycleStartupServiceChecklistAutoHealsIngressBackends(t *testing.T).
// Why: covers service-checklist failure recovery by auto-healing ingress namespace backend replicas.
func TestLifecycleStartupServiceChecklistAutoHealsIngressBackends(t *testing.T) {
	cfg := lifecycleConfig(t)
	cfg.Startup.RequireCriticalServiceEndpoints = false
	cfg.Startup.RequireFluxHealth = false
	cfg.Startup.RequireWorkloadConvergence = false
	cfg.Startup.RequireServiceChecklist = true
	cfg.Startup.ServiceChecklistWaitSeconds = 3
	cfg.Startup.ServiceChecklistPollSeconds = 1
	cfg.Startup.ServiceChecklistStabilitySec = 0

	requests := 0
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		requests++
		w.WriteHeader(http.StatusOK)
		if requests <= 1 {
			_, _ = w.Write([]byte(`{"database":"booting"}`))
			return
		}
		_, _ = w.Write([]byte(`{"database":"ok"}`))
	}))
	defer server.Close()
	cfg.Startup.ServiceChecklist = []config.ServiceChecklistCheck{
		{
			Name:             "grafana-check",
			URL:              server.URL,
			AcceptedStatuses: []int{200},
			BodyContains:     `"database":"ok"`,
			TimeoutSeconds:   5,
		},
	}

	recorder := &commandRecorder{}
	base := lifecycleDispatcher(recorder)
	run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
		command := name + " " + strings.Join(args, " ")
		switch {
		case name == "kubectl" && strings.Contains(command, "get ingress -A -o json"):
			recorder.record(name, args)
			return `{"items":[{"metadata":{"namespace":"monitoring","name":"grafana"},"spec":{"rules":[{"host":"127.0.0.1"}]}}]}`, nil
		case name == "kubectl" && strings.Contains(command, "get deploy,statefulset -A -o json"):
			recorder.record(name, args)
			return `{"items":[{"kind":"Deployment","metadata":{"namespace":"monitoring","name":"grafana"},"spec":{"replicas":0,"template":{"spec":{}}},"status":{"readyReplicas":0}}]}`, nil
		case name == "kubectl" && strings.Contains(command, "-n monitoring scale deployment grafana --replicas=1"):
			recorder.record(name, args)
			return "", nil
		case name == "kubectl" && strings.Contains(command, "rollout status deployment/grafana"):
			recorder.record(name, args)
			return "rolled out", nil
		default:
			return base(ctx, timeout, name, args...)
		}
	}

	orch := cluster.New(cfg, &execx.Runner{DryRun: false}, state.New(cfg.State.RunHistoryPath), log.New(io.Discard, "", 0))
	orch.SetCommandOverrides(run, run)

	if err := orch.Startup(context.Background(), cluster.StartupOptions{Reason: "service-heal"}); err != nil {
		t.Fatalf("startup failed: %v", err)
	}
	if !recorder.contains("scale deployment grafana --replicas=1") {
		t.Fatalf("expected auto-heal scale command in call log")
	}
}

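// Note: as the stubs above suggest, the auto-heal path appears to work
// host-first: the failing checklist URL is matched to an ingress rule host,
// the zero-replica backend is found via "get deploy,statefulset -A", scaled
// to one replica, and its rollout awaited before the checklist re-polls.
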
// TestLifecycleStartupWithPostStartProbes exercises the post-start probe phase at the end of startup.
// Signature: TestLifecycleStartupWithPostStartProbes(t *testing.T).
// Why: covers post-start probe readiness checks in the final startup phase.
func TestLifecycleStartupWithPostStartProbes(t *testing.T) {
	cfg := lifecycleConfig(t)
	cfg.Startup.RequireCriticalServiceEndpoints = false
	cfg.Startup.RequireFluxHealth = false
	cfg.Startup.RequireWorkloadConvergence = false
	cfg.Startup.RequirePostStartProbes = true
	cfg.Startup.PostStartProbeWaitSeconds = 2
	cfg.Startup.PostStartProbePollSeconds = 1

	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte("ok"))
	}))
	defer server.Close()
	cfg.Startup.PostStartProbes = []string{server.URL}

	recorder := &commandRecorder{}
	base := lifecycleDispatcher(recorder)
	run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
		if name == "curl" {
			recorder.record(name, args)
			return "200", nil
		}
		return base(ctx, timeout, name, args...)
	}
	orch, _ := newLifecycleOrchestratorWithConfig(t, cfg, run, run)
	if err := orch.Startup(context.Background(), cluster.StartupOptions{Reason: "post-start-probes"}); err != nil {
		t.Fatalf("startup failed: %v", err)
	}
}

// TestLifecycleStartupForceFluxBranchPatchesSource exercises the forced Flux branch override during startup.
// Signature: TestLifecycleStartupForceFluxBranchPatchesSource(t *testing.T).
// Why: covers the intentional branch-patch path used during controlled startup override scenarios.
func TestLifecycleStartupForceFluxBranchPatchesSource(t *testing.T) {
	cfg := lifecycleConfig(t)
	cfg.Startup.RequireCriticalServiceEndpoints = false
	cfg.Startup.RequireFluxHealth = false
	cfg.Startup.RequireWorkloadConvergence = false

	recorder := &commandRecorder{}
	base := lifecycleDispatcher(recorder)
	patched := false
	run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
		command := name + " " + strings.Join(args, " ")
		if name == "kubectl" && strings.Contains(command, "patch gitrepository flux-system --type=merge") {
			recorder.record(name, args)
			patched = true
			return "", nil
		}
		return base(ctx, timeout, name, args...)
	}
	orch := cluster.New(cfg, &execx.Runner{DryRun: false}, state.New(cfg.State.RunHistoryPath), log.New(io.Discard, "", 0))
	orch.SetCommandOverrides(run, run)

	if err := orch.Startup(context.Background(), cluster.StartupOptions{
		Reason:          "force-branch",
		ForceFluxBranch: "feature/sso",
	}); err != nil {
		t.Fatalf("startup failed: %v", err)
	}
	if !patched {
		t.Fatalf("expected forced branch startup to patch GitRepository branch")
	}
}

// TestLifecycleStartupPeerRecentShutdownBlocks exercises the peer shutdown cooldown guard.
// Signature: TestLifecycleStartupPeerRecentShutdownBlocks(t *testing.T).
// Why: validates peer coordination cooldown guard before startup proceeds.
func TestLifecycleStartupPeerRecentShutdownBlocks(t *testing.T) {
	cfg := lifecycleConfig(t)
	cfg.Coordination.PeerHosts = []string{"titan-24"}
	cfg.SSHManagedNodes = append(cfg.SSHManagedNodes, "titan-24")
	cfg.SSHNodeHosts["titan-24"] = "titan-24"

	recorder := &commandRecorder{}
	base := lifecycleDispatcher(recorder)
	run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
		command := name + " " + strings.Join(args, " ")
		if name == "ssh" && strings.Contains(command, "ananke intent --config /etc/ananke/ananke.yaml") {
			recorder.record(name, args)
			now := time.Now().UTC().Format(time.RFC3339)
			return "__ANANKE_BOOTSTRAP_IDLE__\nintent=shutdown_complete reason=\"drill\" source=peer updated_at=" + now + "\n", nil
		}
		return base(ctx, timeout, name, args...)
	}
	orch := cluster.New(cfg, &execx.Runner{DryRun: false}, state.New(cfg.State.RunHistoryPath), log.New(io.Discard, "", 0))
	orch.SetCommandOverrides(run, run)

	err := orch.Startup(context.Background(), cluster.StartupOptions{Reason: "peer-cooldown"})
	if err == nil {
		t.Fatalf("expected startup to be blocked by recent peer shutdown completion")
	}
	if !strings.Contains(err.Error(), "completed shutdown too recently") {
		t.Fatalf("unexpected startup error: %v", err)
	}
}

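// Note: peer intent replies in these tests follow the line format
// intent=<state> reason="<text>" source=peer updated_at=<RFC3339>. A fresh
// shutdown_complete timestamp trips the cooldown guard here, whereas the
// startup_in_progress reply in the peer-guard test earlier does not block.
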
// TestLifecycleStartupFailsWhenAllBootstrapPathsFail exercises bootstrap failure reporting when no fallback succeeds.
// Signature: TestLifecycleStartupFailsWhenAllBootstrapPathsFail(t *testing.T).
// Why: covers bootstrap fallback failure reporting when apply, render/apply, and cache paths all fail.
func TestLifecycleStartupFailsWhenAllBootstrapPathsFail(t *testing.T) {
	cfg := lifecycleConfig(t)
	repo := t.TempDir()
	cfg.IACRepoPath = repo
	cfg.LocalBootstrapPaths = []string{"services/bootstrap"}
	cfg.Startup.RequireCriticalServiceEndpoints = false
	cfg.Startup.RequireFluxHealth = false
	cfg.Startup.RequireWorkloadConvergence = false

	if err := os.MkdirAll(filepath.Join(repo, ".git"), 0o755); err != nil {
		t.Fatalf("mkdir .git: %v", err)
	}
	if err := os.MkdirAll(filepath.Join(repo, "services", "bootstrap"), 0o755); err != nil {
		t.Fatalf("mkdir bootstrap path: %v", err)
	}

	recorder := &commandRecorder{}
	base := lifecycleDispatcher(recorder)
	fluxReadyChecks := 0
	run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
		command := name + " " + strings.Join(args, " ")
		switch {
		case name == "kubectl" &&
			strings.Contains(command, "get gitrepository flux-system") &&
			strings.Contains(command, "jsonpath={.status.conditions[?(@.type==\"Ready\")].status}"):
			recorder.record(name, args)
			fluxReadyChecks++
			if fluxReadyChecks <= 3 {
				return "", fmt.Errorf("flux source unavailable")
			}
			return "True", nil
		case name == "kubectl" && strings.Contains(command, " apply -k "):
			recorder.record(name, args)
			return "", fmt.Errorf("apply -k failed")
		default:
			return base(ctx, timeout, name, args...)
		}
	}
	runSensitive := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
		command := name + " " + strings.Join(args, " ")
		if name == "git" && strings.Contains(command, "status --porcelain") {
			recorder.record(name, args)
			return "", nil
		}
		if name == "sh" && strings.Contains(command, "kubectl kustomize") {
			recorder.record(name, args)
			return "", fmt.Errorf("render failed")
		}
		return run(ctx, timeout, name, args...)
	}
	orch := cluster.New(cfg, &execx.Runner{DryRun: false}, state.New(cfg.State.RunHistoryPath), log.New(io.Discard, "", 0))
	orch.SetCommandOverrides(run, runSensitive)

	err := orch.Startup(context.Background(), cluster.StartupOptions{Reason: "bootstrap-fail"})
	if err == nil {
		t.Fatalf("expected startup to fail when all bootstrap paths fail")
	}
	if !strings.Contains(err.Error(), "local bootstrap apply failed for every configured path") {
		t.Fatalf("unexpected startup error: %v", err)
	}
}

// TestLifecycleStartupVaultUsesCachedUnsealKeyFallback exercises the cached unseal key fallback.
// Signature: TestLifecycleStartupVaultUsesCachedUnsealKeyFallback(t *testing.T).
// Why: covers vault unseal fallback when secret retrieval fails but cached key exists.
func TestLifecycleStartupVaultUsesCachedUnsealKeyFallback(t *testing.T) {
	cfg := lifecycleConfig(t)
	cfg.Startup.RequireCriticalServiceEndpoints = false
	cfg.Startup.RequireFluxHealth = false
	cfg.Startup.RequireWorkloadConvergence = false
	if err := os.WriteFile(cfg.Startup.VaultUnsealKeyFile, []byte("cached-unseal-key\n"), 0o600); err != nil {
		t.Fatalf("write cached key: %v", err)
	}

	recorder := &commandRecorder{}
	base := lifecycleDispatcher(recorder)
	vaultReadyChecks := 0
	vaultSealedChecks := 0
	run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
		command := name + " " + strings.Join(args, " ")
		switch {
		case name == "kubectl" && strings.Contains(command, "-n vault get pod vault-0 -o jsonpath={.status.phase}"):
			recorder.record(name, args)
			return "Running", nil
		case name == "kubectl" && strings.Contains(command, "-n vault get statefulset vault -o jsonpath={.status.readyReplicas}"):
			recorder.record(name, args)
			vaultReadyChecks++
			if vaultReadyChecks <= 2 {
				return "0", nil
			}
			return "1", nil
		case name == "kubectl" && strings.Contains(command, "vault status -format=json"):
			recorder.record(name, args)
			vaultSealedChecks++
			if vaultSealedChecks <= 1 {
				return `{"sealed":true,"initialized":true}`, nil
			}
			return `{"sealed":false,"initialized":true}`, nil
		case name == "kubectl" && strings.Contains(command, "-n vault get secret vault-init -o jsonpath={.data.unseal_key_b64}"):
			recorder.record(name, args)
			return "", fmt.Errorf("secret not found")
		default:
			return base(ctx, timeout, name, args...)
		}
	}
	runSensitive := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
		command := name + " " + strings.Join(args, " ")
		if name == "kubectl" && strings.Contains(command, "vault operator unseal cached-unseal-key") {
			recorder.record(name, args)
			return "ok", nil
		}
		return run(ctx, timeout, name, args...)
	}
	orch := cluster.New(cfg, &execx.Runner{DryRun: false}, state.New(cfg.State.RunHistoryPath), log.New(io.Discard, "", 0))
	orch.SetCommandOverrides(run, runSensitive)

	if err := orch.Startup(context.Background(), cluster.StartupOptions{Reason: "vault-cached-key"}); err != nil {
		t.Fatalf("startup failed: %v", err)
	}
	if !recorder.contains("vault operator unseal cached-unseal-key") {
		t.Fatalf("expected cached key unseal command in call log")
	}
}

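// Note: this fallback test mirrors TestLifecycleStartupVaultAutoUnsealPath
// but fails the vault-init secret read, so the key comes from
// cfg.Startup.VaultUnsealKeyFile instead; the trailing newline written above
// is evidently trimmed, since the asserted unseal command uses the bare key.
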
// TestLifecycleStartupIngressChecklistTimeout exercises ingress checklist timeout handling.
// Signature: TestLifecycleStartupIngressChecklistTimeout(t *testing.T).
// Why: covers ingress checklist timeout and host-derived auto-heal attempt branches.
func TestLifecycleStartupIngressChecklistTimeout(t *testing.T) {
	cfg := lifecycleConfig(t)
	cfg.Startup.RequireIngressChecklist = true
	cfg.Startup.IngressChecklistWaitSeconds = 1
	cfg.Startup.IngressChecklistPollSeconds = 1
	cfg.Startup.IngressChecklistInsecureSkip = true
	cfg.Startup.RequireCriticalServiceEndpoints = false
	cfg.Startup.RequireFluxHealth = false
	cfg.Startup.RequireWorkloadConvergence = false

	recorder := &commandRecorder{}
	base := lifecycleDispatcher(recorder)
	run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
		command := name + " " + strings.Join(args, " ")
		switch {
		case name == "kubectl" && strings.Contains(command, "get ingress -A -o json"):
			recorder.record(name, args)
			return `{"items":[{"metadata":{"namespace":"monitoring","name":"grafana"},"spec":{"rules":[{"host":"127.0.0.1"}]}}]}`, nil
		case name == "kubectl" && strings.Contains(command, "get deploy,statefulset -A -o json"):
			recorder.record(name, args)
			return `{"items":[]}`, nil
		default:
			return base(ctx, timeout, name, args...)
		}
	}
	orch := cluster.New(cfg, &execx.Runner{DryRun: false}, state.New(cfg.State.RunHistoryPath), log.New(io.Discard, "", 0))
	orch.SetCommandOverrides(run, run)

	err := orch.Startup(context.Background(), cluster.StartupOptions{Reason: "ingress-timeout"})
	if err == nil {
		t.Fatalf("expected startup to fail when ingress checklist cannot pass")
	}
	if !strings.Contains(err.Error(), "ingress checklist not satisfied") {
		t.Fatalf("unexpected startup error: %v", err)
	}
}

// TestLifecycleStartupRepairsHostKeyAndRetriesSSHAuth exercises host-key repair and SSH auth retry.
// Signature: TestLifecycleStartupRepairsHostKeyAndRetriesSSHAuth(t *testing.T).
// Why: covers host-key mismatch retry logic inside SSH auth gate during startup.
func TestLifecycleStartupRepairsHostKeyAndRetriesSSHAuth(t *testing.T) {
	cfg := lifecycleConfig(t)
	cfg.Startup.RequireNodeSSHAuth = true
	cfg.Startup.NodeSSHAuthWaitSeconds = 2
	cfg.Startup.NodeSSHAuthPollSeconds = 1
	cfg.Startup.RequireCriticalServiceEndpoints = false
	cfg.Startup.RequireFluxHealth = false
	cfg.Startup.RequireWorkloadConvergence = false

	recorder := &commandRecorder{}
	base := lifecycleDispatcher(recorder)
	repairAttempts := 0
	run := func(ctx context.Context, timeout time.Duration, name string, args ...string) (string, error) {
		command := name + " " + strings.Join(args, " ")
		if name == "ssh" && strings.Contains(command, "__ANANKE_SSH_AUTH_OK__") && strings.Contains(command, "titan-23") {
			recorder.record(name, args)
			repairAttempts++
			if repairAttempts == 1 {
				return "Host key verification failed", fmt.Errorf("exit status 255")
			}
			return "__ANANKE_SSH_AUTH_OK__", nil
		}
		return base(ctx, timeout, name, args...)
	}
	orch := cluster.New(cfg, &execx.Runner{DryRun: false}, state.New(cfg.State.RunHistoryPath), log.New(io.Discard, "", 0))
	orch.SetCommandOverrides(run, run)

	if err := orch.Startup(context.Background(), cluster.StartupOptions{Reason: "ssh-hostkey-repair"}); err != nil {
		t.Fatalf("startup failed: %v", err)
	}
	if repairAttempts < 2 {
		t.Fatalf("expected ssh auth retry after host key mismatch, attempts=%d", repairAttempts)
	}
}