diff --git a/README.md b/README.md index e873965..89cf237 100644 --- a/README.md +++ b/README.md @@ -25,12 +25,18 @@ Vault unseal breakglass is wired for remote retrieval (magic mirror host). If lo ## What “startup complete” means now Ananke does **not** stop at “Flux says Ready”. Startup only completes when all configured gates pass: +- node inventory preflight passes (host mapping + ssh user + port sanity) +- node SSH auth gate passes (real command execution, not just TCP) - Flux source drift guard passes (`expected_flux_source_url` + branch expectation) - Flux kustomizations are healthy - controller convergence is healthy (deployments/statefulsets/daemonsets) +- ingress checklist passes (all discovered ingress hosts reachable with accepted status) - external service checklist passes (Gitea, Grafana, Keycloak OIDC, Harbor registry auth challenge, Longhorn auth redirect) - stability soak window passes (no regressions, no CrashLoop/ImagePull failures) +During startup, Ananke also auto-heals known failure patterns (stuck controller pods, immutable Flux Jobs, critical workloads scaled to zero) and writes a report: +- `/var/lib/ananke/last-startup-report.json` + If any gate fails, startup is blocked with a concrete reason. ## Command quick sheet @@ -76,6 +82,8 @@ Primary path: Core settings to keep accurate: - `expected_flux_branch` - `expected_flux_source_url` +- `startup.require_node_ssh_auth` +- `startup.require_ingress_checklist` - `startup.service_checklist` - `startup.service_checklist_stability_seconds` - `startup.ignore_unavailable_nodes` (for planned temporary node outages) diff --git a/configs/ananke.example.yaml b/configs/ananke.example.yaml index c708745..7077f90 100644 --- a/configs/ananke.example.yaml +++ b/configs/ananke.example.yaml @@ -103,6 +103,15 @@ startup: url: https://longhorn.bstein.dev/ accepted_statuses: [200, 302] timeout_seconds: 12 + require_ingress_checklist: true + ingress_checklist_wait_seconds: 420 + ingress_checklist_poll_seconds: 5 + ingress_checklist_accepted_statuses: [200, 301, 302, 307, 308, 401, 403, 404] + ingress_checklist_ignore_hosts: [] + ingress_checklist_insecure_skip_tls: false + require_node_ssh_auth: true + node_ssh_auth_wait_seconds: 240 + node_ssh_auth_poll_seconds: 5 require_flux_health: true flux_health_wait_seconds: 900 flux_health_poll_seconds: 5 diff --git a/configs/ananke.tethys.yaml b/configs/ananke.tethys.yaml index adb1863..4d36744 100644 --- a/configs/ananke.tethys.yaml +++ b/configs/ananke.tethys.yaml @@ -169,6 +169,15 @@ startup: url: https://longhorn.bstein.dev/ accepted_statuses: [200, 302] timeout_seconds: 12 + require_ingress_checklist: true + ingress_checklist_wait_seconds: 420 + ingress_checklist_poll_seconds: 5 + ingress_checklist_accepted_statuses: [200, 301, 302, 307, 308, 401, 403, 404] + ingress_checklist_ignore_hosts: [] + ingress_checklist_insecure_skip_tls: false + require_node_ssh_auth: true + node_ssh_auth_wait_seconds: 240 + node_ssh_auth_poll_seconds: 5 require_flux_health: true flux_health_wait_seconds: 900 flux_health_poll_seconds: 5 diff --git a/configs/ananke.titan-db.yaml b/configs/ananke.titan-db.yaml index 12a2e60..1bddf44 100644 --- a/configs/ananke.titan-db.yaml +++ b/configs/ananke.titan-db.yaml @@ -169,6 +169,15 @@ startup: url: https://longhorn.bstein.dev/ accepted_statuses: [200, 302] timeout_seconds: 12 + require_ingress_checklist: true + ingress_checklist_wait_seconds: 420 + ingress_checklist_poll_seconds: 5 + ingress_checklist_accepted_statuses: [200, 301, 302, 307, 308, 401, 403, 404] + 
ingress_checklist_ignore_hosts: []
+  ingress_checklist_insecure_skip_tls: false
+  require_node_ssh_auth: true
+  node_ssh_auth_wait_seconds: 240
+  node_ssh_auth_poll_seconds: 5
   require_flux_health: true
   flux_health_wait_seconds: 900
   flux_health_poll_seconds: 5
diff --git a/internal/cluster/orchestrator.go b/internal/cluster/orchestrator.go
index c50633c..86c37c3 100644
--- a/internal/cluster/orchestrator.go
+++ b/internal/cluster/orchestrator.go
@@ -30,10 +30,12 @@ import (
 )
 
 type Orchestrator struct {
-	cfg    config.Config
-	runner *execx.Runner
-	store  *state.Store
-	log    *log.Logger
+	cfg                 config.Config
+	runner              *execx.Runner
+	store               *state.Store
+	log                 *log.Logger
+	startupReportMu     sync.Mutex
+	activeStartupReport *startupReport
 }
 
 type StartupOptions struct {
@@ -77,6 +79,23 @@ type workloadScaleSnapshot struct {
 	Entries []workloadScaleEntry `json:"entries"`
 }
 
+type startupReport struct {
+	StartedAt  time.Time                     `json:"started_at"`
+	Completed  time.Time                     `json:"completed_at"`
+	Reason     string                        `json:"reason"`
+	Success    bool                          `json:"success"`
+	Error      string                        `json:"error,omitempty"`
+	Checks     map[string]startupCheckRecord `json:"checks"`
+	AutoHeals  []string                      `json:"auto_heals"`
+	SourceHost string                        `json:"source_host"`
+}
+
+type startupCheckRecord struct {
+	Status    string    `json:"status"`
+	Detail    string    `json:"detail"`
+	UpdatedAt time.Time `json:"updated_at"`
+}
+
 var datastoreEndpointPattern = regexp.MustCompile(`--datastore-endpoint(?:=|\s+)(?:'([^']+)'|"([^"]+)"|([^\s\\]+))`)
 
 var criticalStartupWorkloads = []startupWorkload{
@@ -87,6 +106,8 @@
 	{Namespace: "vault", Kind: "statefulset", Name: "vault"},
 	{Namespace: "postgres", Kind: "statefulset", Name: "postgres"},
 	{Namespace: "gitea", Kind: "deployment", Name: "gitea"},
+	{Namespace: "monitoring", Kind: "deployment", Name: "grafana"},
+	{Namespace: "monitoring", Kind: "deployment", Name: "kube-state-metrics"},
 }
 
 var ErrEtcdRestoreNotApplicable = errors.New("etcd restore not applicable")
@@ -101,6 +122,8 @@ func (o *Orchestrator) Startup(ctx context.Context, opts StartupOptions) (err er
 		return err
 	}
 	defer unlock()
+	o.beginStartupReport(opts.Reason)
+	defer func() { o.finalizeStartupReport(err) }() // closure so the named return's final value is reported; a plain defer would capture the nil err evaluated at defer time
 
 	record := state.RunRecord{
 		ID: fmt.Sprintf("startup-%d", time.Now().UnixNano()),
@@ -110,6 +133,11 @@ func (o *Orchestrator) Startup(ctx context.Context, opts StartupOptions) (err er
 		StartedAt: time.Now().UTC(),
 	}
 	defer o.finalizeRecord(&record, &err)
+	if invErr := o.validateNodeInventory(); invErr != nil {
+		o.noteStartupCheck("node-inventory", false, invErr.Error())
+		return invErr
+	}
+	o.noteStartupCheck("node-inventory", true, "inventory/user/port validation passed")
 	resumedFlux := false
 	defer func() {
 		if o.runner.DryRun || err == nil || resumedFlux {
@@ -258,6 +286,13 @@ func (o *Orchestrator) Startup(ctx context.Context, opts StartupOptions) (err er
 	}
 	o.startWorkers(ctx, workers)
 	o.bestEffort("uncordon workers", func() error { return o.uncordonWorkers(ctx, workers) })
+	sshCheckNodes := append([]string{}, o.cfg.ControlPlanes...)
+	sshCheckNodes = append(sshCheckNodes, workers...)
+ if err := o.waitForNodeSSHAuth(ctx, sshCheckNodes); err != nil { + o.noteStartupCheck("node-ssh-auth", false, err.Error()) + return err + } + o.noteStartupCheck("node-ssh-auth", true, fmt.Sprintf("nodes=%d", len(sshCheckNodes))) needsLocalBootstrap := false bootstrapReasons := []string{} @@ -283,8 +318,10 @@ func (o *Orchestrator) Startup(ctx context.Context, opts StartupOptions) (err er } if o.cfg.Startup.RequireStorageReady { if err := o.waitForStorageReady(ctx); err != nil { + o.noteStartupCheck("storage-readiness", false, err.Error()) return err } + o.noteStartupCheck("storage-readiness", true, "longhorn and critical PVCs ready") } if err := o.ensureCriticalStartupWorkloads(ctx); err != nil { return err @@ -322,10 +359,23 @@ func (o *Orchestrator) Startup(ctx context.Context, opts StartupOptions) (err er return err } resumedFlux = true + o.bestEffort("heal critical zero-replica workloads", func() error { + healed, healErr := o.healCriticalWorkloadReplicas(ctx) + if healErr != nil { + return healErr + } + if len(healed) > 0 { + sort.Strings(healed) + o.noteStartupAutoHeal(fmt.Sprintf("restored critical workload replicas: %s", joinLimited(healed, 8))) + } + return nil + }) if o.cfg.Startup.RequirePostStartProbes { if err := o.waitForPostStartProbes(ctx); err != nil { + o.noteStartupCheck("post-start-probes", false, err.Error()) return err } + o.noteStartupCheck("post-start-probes", true, "post-start probes passed") } if err := o.waitForStartupConvergence(ctx); err != nil { return err @@ -419,6 +469,9 @@ func (o *Orchestrator) Shutdown(ctx context.Context, opts ShutdownOptions) (err return err } defer unlock() + if invErr := o.validateNodeInventory(); invErr != nil { + return invErr + } record := state.RunRecord{ ID: fmt.Sprintf("shutdown-%d", time.Now().UnixNano()), @@ -520,6 +573,79 @@ func (o *Orchestrator) finalizeRecord(record *state.RunRecord, err *error) { } } +func (o *Orchestrator) startupReportPath() string { + return filepath.Join(o.cfg.State.Dir, "last-startup-report.json") +} + +func (o *Orchestrator) beginStartupReport(reason string) { + host, _ := os.Hostname() + o.startupReportMu.Lock() + defer o.startupReportMu.Unlock() + o.activeStartupReport = &startupReport{ + StartedAt: time.Now().UTC(), + Reason: strings.TrimSpace(reason), + Checks: map[string]startupCheckRecord{}, + AutoHeals: []string{}, + SourceHost: strings.TrimSpace(host), + } +} + +func (o *Orchestrator) noteStartupCheck(name string, success bool, detail string) { + o.startupReportMu.Lock() + defer o.startupReportMu.Unlock() + if o.activeStartupReport == nil { + return + } + status := "failed" + if success { + status = "passed" + } + o.activeStartupReport.Checks[strings.TrimSpace(name)] = startupCheckRecord{ + Status: status, + Detail: strings.TrimSpace(detail), + UpdatedAt: time.Now().UTC(), + } +} + +func (o *Orchestrator) noteStartupAutoHeal(detail string) { + o.startupReportMu.Lock() + defer o.startupReportMu.Unlock() + if o.activeStartupReport == nil { + return + } + detail = strings.TrimSpace(detail) + if detail == "" { + return + } + o.activeStartupReport.AutoHeals = append(o.activeStartupReport.AutoHeals, detail) +} + +func (o *Orchestrator) finalizeStartupReport(runErr error) { + o.startupReportMu.Lock() + report := o.activeStartupReport + o.activeStartupReport = nil + o.startupReportMu.Unlock() + if report == nil { + return + } + report.Completed = time.Now().UTC() + report.Success = runErr == nil + if runErr != nil { + report.Error = strings.TrimSpace(runErr.Error()) + } + b, err := 
json.MarshalIndent(report, "", " ") + if err != nil { + o.log.Printf("warning: encode startup report failed: %v", err) + return + } + path := o.startupReportPath() + if writeErr := os.WriteFile(path, b, 0o640); writeErr != nil { + o.log.Printf("warning: write startup report %s failed: %v", path, writeErr) + return + } + o.log.Printf("startup report written: %s", path) +} + func (o *Orchestrator) effectiveWorkers(ctx context.Context) ([]string, error) { if len(o.cfg.Workers) > 0 { return append([]string{}, o.cfg.Workers...), nil @@ -1517,6 +1643,101 @@ func (o *Orchestrator) nodeNameForHost(host string) string { return "" } +func (o *Orchestrator) inventoryNodesForValidation() []string { + set := map[string]struct{}{} + add := func(node string) { + node = strings.TrimSpace(node) + if node == "" { + return + } + set[node] = struct{}{} + } + for _, n := range o.cfg.ControlPlanes { + add(n) + } + for _, n := range o.cfg.Workers { + add(n) + } + for _, n := range o.cfg.SSHManagedNodes { + add(n) + } + for _, n := range o.cfg.Shutdown.ExtraPoweroffHosts { + add(n) + } + for _, n := range o.cfg.Coordination.PeerHosts { + add(n) + } + add(o.cfg.Coordination.ForwardShutdownHost) + + nodes := make([]string, 0, len(set)) + for n := range set { + nodes = append(nodes, n) + } + sort.Strings(nodes) + return nodes +} + +func (o *Orchestrator) validateNodeInventory() error { + issues := []string{} + if o.cfg.SSHPort <= 0 || o.cfg.SSHPort > 65535 { + issues = append(issues, fmt.Sprintf("ssh_port=%d is invalid", o.cfg.SSHPort)) + } + + managed := makeStringSet(o.cfg.SSHManagedNodes) + for _, cp := range o.cfg.ControlPlanes { + cp = strings.TrimSpace(cp) + if cp == "" { + continue + } + if _, ok := managed[cp]; !ok { + issues = append(issues, fmt.Sprintf("control plane %s is missing from ssh_managed_nodes", cp)) + } + } + for _, node := range o.cfg.Workers { + node = strings.TrimSpace(node) + if node == "" { + continue + } + if _, ok := managed[node]; !ok { + issues = append(issues, fmt.Sprintf("worker %s is missing from ssh_managed_nodes", node)) + } + } + + baseUser := strings.TrimSpace(o.cfg.SSHUser) + for _, node := range o.inventoryNodesForValidation() { + if _, ok := o.cfg.SSHNodeHosts[node]; !ok { + issues = append(issues, fmt.Sprintf("%s is missing ssh_node_hosts entry", node)) + } + host := strings.TrimSpace(o.cfg.SSHNodeHosts[node]) + if host == "" { + host = node + } + if strings.ContainsAny(host, " \t\r\n") { + issues = append(issues, fmt.Sprintf("%s has invalid ssh host %q (contains whitespace)", node, host)) + } + if strings.Contains(host, "/") { + issues = append(issues, fmt.Sprintf("%s has invalid ssh host %q (contains slash)", node, host)) + } + + user := baseUser + if override, ok := o.cfg.SSHNodeUsers[node]; ok { + user = strings.TrimSpace(override) + } + if user == "" { + issues = append(issues, fmt.Sprintf("%s has empty ssh user (ssh_user/ssh_node_users)", node)) + } + if strings.ContainsAny(user, " \t\r\n@") { + issues = append(issues, fmt.Sprintf("%s has invalid ssh user %q", node, user)) + } + } + + if len(issues) > 0 { + sort.Strings(issues) + return fmt.Errorf("node inventory preflight failed: %s", joinLimited(issues, 10)) + } + return nil +} + func (o *Orchestrator) tcpReachable(address string, timeout time.Duration) bool { conn, err := net.DialTimeout("tcp", address, timeout) if err != nil { @@ -1570,6 +1791,97 @@ func (o *Orchestrator) reconcileNodeAccess(ctx context.Context, nodes []string) return fmt.Errorf("access validation had %d errors (first: %s)", len(errCh), 
strings.Join(samples, " | ")) } +func (o *Orchestrator) waitForNodeSSHAuth(ctx context.Context, nodes []string) error { + if o.runner.DryRun || !o.cfg.Startup.RequireNodeSSHAuth { + return nil + } + + wait := time.Duration(o.cfg.Startup.NodeSSHAuthWaitSeconds) * time.Second + if wait <= 0 { + wait = 4 * time.Minute + } + poll := time.Duration(o.cfg.Startup.NodeSSHAuthPollSeconds) * time.Second + if poll <= 0 { + poll = 5 * time.Second + } + ignored := makeStringSet(o.cfg.Startup.IgnoreUnavailableNodes) + seen := map[string]struct{}{} + targets := make([]string, 0, len(nodes)) + for _, node := range nodes { + node = strings.TrimSpace(node) + if node == "" { + continue + } + if _, skip := ignored[node]; skip { + continue + } + if _, ok := seen[node]; ok { + continue + } + seen[node] = struct{}{} + targets = append(targets, node) + } + sort.Strings(targets) + if len(targets) == 0 { + return nil + } + + deadline := time.Now().Add(wait) + lastFailure := "unknown" + lastLogged := time.Time{} + for { + authDenied := []string{} + pending := []string{} + for _, node := range targets { + if !o.sshManaged(node) { + authDenied = append(authDenied, fmt.Sprintf("%s(not in ssh_managed_nodes)", node)) + continue + } + out, err := o.sshWithTimeout(ctx, node, "echo __ANANKE_SSH_AUTH_OK__", 12*time.Second) + if err != nil { + detail := strings.TrimSpace(err.Error()) + full := strings.ToLower(strings.TrimSpace(detail + " " + out)) + switch { + case strings.Contains(full, "permission denied"), strings.Contains(full, "publickey"), strings.Contains(full, "authentication"): + authDenied = append(authDenied, fmt.Sprintf("%s(auth denied)", node)) + default: + pending = append(pending, fmt.Sprintf("%s(unreachable)", node)) + } + continue + } + if !strings.Contains(out, "__ANANKE_SSH_AUTH_OK__") { + pending = append(pending, fmt.Sprintf("%s(unexpected output)", node)) + } + } + if len(authDenied) > 0 { + sort.Strings(authDenied) + return fmt.Errorf("ssh auth gate failed: %s", joinLimited(authDenied, 8)) + } + if len(pending) == 0 { + return nil + } + + sort.Strings(pending) + lastFailure = joinLimited(pending, 8) + if time.Now().After(deadline) { + return fmt.Errorf("node ssh auth gate did not pass within %s (%s)", wait, lastFailure) + } + if time.Since(lastLogged) >= 30*time.Second { + remaining := time.Until(deadline).Round(time.Second) + if remaining < 0 { + remaining = 0 + } + o.log.Printf("waiting for node ssh auth (%s remaining): %s", remaining, lastFailure) + lastLogged = time.Now() + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(poll): + } + } +} + func (o *Orchestrator) fluxSourceReady(ctx context.Context) (bool, error) { out, err := o.kubectl(ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.status.conditions[?(@.type==\"Ready\")].status}") if err != nil { @@ -1965,6 +2277,22 @@ type workloadList struct { Items []workloadResource `json:"items"` } +type ingressList struct { + Items []ingressResource `json:"items"` +} + +type ingressResource struct { + Metadata struct { + Namespace string `json:"namespace"` + Name string `json:"name"` + } `json:"metadata"` + Spec struct { + Rules []struct { + Host string `json:"host"` + } `json:"rules"` + } `json:"spec"` +} + type jobList struct { Items []jobResource `json:"items"` } @@ -2136,27 +2464,145 @@ func (o *Orchestrator) waitForStartupConvergence(ctx context.Context) error { if o.runner.DryRun { return nil } - if o.cfg.Startup.RequireServiceChecklist { - if err := 
o.waitForServiceChecklist(ctx); err != nil { + if o.cfg.Startup.RequireIngressChecklist { + if err := o.waitForIngressChecklist(ctx); err != nil { + o.noteStartupCheck("ingress-checklist", false, err.Error()) return err } + o.noteStartupCheck("ingress-checklist", true, "all ingress hosts reachable") + } + if o.cfg.Startup.RequireServiceChecklist { + if err := o.waitForServiceChecklist(ctx); err != nil { + o.noteStartupCheck("service-checklist", false, err.Error()) + return err + } + o.noteStartupCheck("service-checklist", true, "all configured service checks passed") } if o.cfg.Startup.RequireFluxHealth { if err := o.waitForFluxHealth(ctx); err != nil { + o.noteStartupCheck("flux-health", false, err.Error()) return err } + o.noteStartupCheck("flux-health", true, "all flux kustomizations ready") } if o.cfg.Startup.RequireWorkloadConvergence { if err := o.waitForWorkloadConvergence(ctx); err != nil { + o.noteStartupCheck("workload-convergence", false, err.Error()) return err } + o.noteStartupCheck("workload-convergence", true, "controllers converged") } if err := o.waitForStabilityWindow(ctx); err != nil { + o.noteStartupCheck("stability-window", false, err.Error()) return err } + o.noteStartupCheck("stability-window", true, "startup soak passed") return nil } +func (o *Orchestrator) waitForIngressChecklist(ctx context.Context) error { + wait := time.Duration(o.cfg.Startup.IngressChecklistWaitSeconds) * time.Second + if wait <= 0 { + wait = 7 * time.Minute + } + poll := time.Duration(o.cfg.Startup.IngressChecklistPollSeconds) * time.Second + if poll <= 0 { + poll = 5 * time.Second + } + deadline := time.Now().Add(wait) + lastFailure := "unknown" + lastLogged := time.Time{} + lastRecycleAttempt := time.Time{} + lastReplicaHeal := time.Time{} + for { + o.maybeAutoRecycleStuckPods(ctx, &lastRecycleAttempt) + o.maybeAutoHealCriticalWorkloadReplicas(ctx, &lastReplicaHeal) + + prevFailure := lastFailure + ready, detail := o.ingressChecklistReady(ctx) + lastFailure = detail + if ready { + o.log.Printf("ingress checklist passed (%s)", detail) + return nil + } + if lastFailure != prevFailure || time.Since(lastLogged) >= 30*time.Second { + remaining := time.Until(deadline).Round(time.Second) + if remaining < 0 { + remaining = 0 + } + o.log.Printf("waiting for ingress checklist (%s remaining): %s", remaining, lastFailure) + lastLogged = time.Now() + } + if time.Now().After(deadline) { + return fmt.Errorf("startup blocked: ingress checklist not satisfied within %s (%s)", wait, lastFailure) + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(poll): + } + } +} + +func (o *Orchestrator) ingressChecklistReady(ctx context.Context) (bool, string) { + hosts, err := o.discoverIngressHosts(ctx) + if err != nil { + return false, err.Error() + } + if len(hosts) == 0 { + return true, "no ingress hosts discovered" + } + accepted := o.cfg.Startup.IngressChecklistAccepted + if len(accepted) == 0 { + accepted = []int{200, 301, 302, 307, 308, 401, 403, 404} + } + for _, host := range hosts { + check := config.ServiceChecklistCheck{ + Name: "ingress-" + host, + URL: "https://" + host + "/", + AcceptedStatuses: accepted, + TimeoutSeconds: 12, + InsecureSkipTLS: o.cfg.Startup.IngressChecklistInsecureSkip, + } + ok, detail := o.serviceCheckReady(ctx, check) + if !ok { + return false, fmt.Sprintf("%s: %s", host, detail) + } + } + return true, fmt.Sprintf("hosts=%d", len(hosts)) +} + +func (o *Orchestrator) discoverIngressHosts(ctx context.Context) ([]string, error) { + out, err := o.kubectl(ctx, 
25*time.Second, "get", "ingress", "-A", "-o", "json") + if err != nil { + return nil, fmt.Errorf("query ingresses: %w", err) + } + var list ingressList + if err := json.Unmarshal([]byte(out), &list); err != nil { + return nil, fmt.Errorf("decode ingresses: %w", err) + } + ignored := makeStringSet(o.cfg.Startup.IngressChecklistIgnoreHosts) + hosts := map[string]struct{}{} + for _, item := range list.Items { + for _, rule := range item.Spec.Rules { + host := strings.TrimSpace(rule.Host) + if host == "" || strings.Contains(host, "*") { + continue + } + if _, skip := ignored[host]; skip { + continue + } + hosts[host] = struct{}{} + } + } + outHosts := make([]string, 0, len(hosts)) + for host := range hosts { + outHosts = append(outHosts, host) + } + sort.Strings(outHosts) + return outHosts, nil +} + func (o *Orchestrator) waitForServiceChecklist(ctx context.Context) error { wait := time.Duration(o.cfg.Startup.ServiceChecklistWaitSeconds) * time.Second if wait <= 0 { @@ -2170,8 +2616,10 @@ func (o *Orchestrator) waitForServiceChecklist(ctx context.Context) error { lastFailure := "unknown" lastLogged := time.Time{} lastRecycleAttempt := time.Time{} + lastReplicaHeal := time.Time{} for { o.maybeAutoRecycleStuckPods(ctx, &lastRecycleAttempt) + o.maybeAutoHealCriticalWorkloadReplicas(ctx, &lastReplicaHeal) prevFailure := lastFailure ready, detail := o.serviceChecklistReady(ctx) lastFailure = detail @@ -2323,9 +2771,11 @@ func (o *Orchestrator) waitForStabilityWindow(ctx context.Context) error { deadline := time.Now().Add(window) lastStatus := time.Time{} lastRecycleAttempt := time.Time{} + lastReplicaHeal := time.Time{} for { o.maybeAutoRecycleStuckPods(ctx, &lastRecycleAttempt) + o.maybeAutoHealCriticalWorkloadReplicas(ctx, &lastReplicaHeal) if err := o.startupStabilityHealthy(ctx); err != nil { return fmt.Errorf("startup stability window failed: %w", err) } @@ -2374,6 +2824,12 @@ func (o *Orchestrator) startupStabilityHealthy(ctx context.Context) error { return fmt.Errorf("external services not healthy: %s", detail) } } + if o.cfg.Startup.RequireIngressChecklist { + ready, detail := o.ingressChecklistReady(ctx) + if !ready { + return fmt.Errorf("ingress reachability not healthy: %s", detail) + } + } failures, err := o.startupFailurePods(ctx) if err != nil { return fmt.Errorf("pod failure check error: %w", err) @@ -2398,8 +2854,10 @@ func (o *Orchestrator) waitForFluxHealth(ctx context.Context) error { lastLogged := time.Time{} lastImmutableHealAttempt := time.Time{} lastRecycleAttempt := time.Time{} + lastReplicaHeal := time.Time{} for { o.maybeAutoRecycleStuckPods(ctx, &lastRecycleAttempt) + o.maybeAutoHealCriticalWorkloadReplicas(ctx, &lastReplicaHeal) prevFailure := lastFailure ready, detail, err := o.fluxHealthReady(ctx) if err != nil { @@ -2418,6 +2876,7 @@ func (o *Orchestrator) waitForFluxHealth(ctx context.Context) error { o.log.Printf("warning: immutable-job self-heal attempt failed: %v", healErr) } else if healed { o.log.Printf("detected immutable-job failure and removed stale failed job(s); re-requesting reconcile") + o.noteStartupAutoHeal("deleted stale failed flux-managed job(s) after immutable template error") o.bestEffort("reconcile flux after immutable-job cleanup", func() error { return o.resumeFluxAndReconcile(ctx) }) } } @@ -2573,9 +3032,11 @@ func (o *Orchestrator) waitForWorkloadConvergence(ctx context.Context) error { lastFailure := "unknown" lastLogged := time.Time{} lastRecycleAttempt := time.Time{} + lastReplicaHeal := time.Time{} for { prevFailure := lastFailure 
o.maybeAutoRecycleStuckPods(ctx, &lastRecycleAttempt) + o.maybeAutoHealCriticalWorkloadReplicas(ctx, &lastReplicaHeal) ready, detail, err := o.workloadConvergenceReady(ctx) if err != nil { lastFailure = err.Error() @@ -2739,6 +3200,7 @@ func (o *Orchestrator) recycleStuckControllerPods(ctx context.Context) error { if len(recycled) > 0 { sort.Strings(recycled) o.log.Printf("recycled stuck controller pods (%d): %s", len(recycled), joinLimited(recycled, 10)) + o.noteStartupAutoHeal(fmt.Sprintf("recycled stuck controller pods: %s", joinLimited(recycled, 10))) } return nil } @@ -2812,6 +3274,77 @@ func (o *Orchestrator) maybeAutoRecycleStuckPods(ctx context.Context, lastAttemp o.bestEffort("recycle stuck controller pods", func() error { return o.recycleStuckControllerPods(ctx) }) } +func (o *Orchestrator) maybeAutoHealCriticalWorkloadReplicas(ctx context.Context, lastAttempt *time.Time) { + if o.runner.DryRun { + return + } + now := time.Now() + if lastAttempt != nil && !lastAttempt.IsZero() && now.Sub(*lastAttempt) < 30*time.Second { + return + } + if lastAttempt != nil { + *lastAttempt = now + } + healed, err := o.healCriticalWorkloadReplicas(ctx) + if err != nil { + o.log.Printf("warning: critical workload replica auto-heal failed: %v", err) + return + } + if len(healed) == 0 { + return + } + sort.Strings(healed) + detail := fmt.Sprintf("restored critical workload replicas: %s", joinLimited(healed, 8)) + o.log.Printf("%s", detail) + o.noteStartupAutoHeal(detail) +} + +func (o *Orchestrator) healCriticalWorkloadReplicas(ctx context.Context) ([]string, error) { + out, err := o.kubectl(ctx, 30*time.Second, "get", "deploy,statefulset", "-A", "-o", "json") + if err != nil { + return nil, fmt.Errorf("query workloads: %w", err) + } + var list workloadList + if err := json.Unmarshal([]byte(out), &list); err != nil { + return nil, fmt.Errorf("decode workloads: %w", err) + } + current := map[string]int32{} + for _, item := range list.Items { + kind := strings.ToLower(strings.TrimSpace(item.Kind)) + ns := strings.TrimSpace(item.Metadata.Namespace) + name := strings.TrimSpace(item.Metadata.Name) + if kind == "" || ns == "" || name == "" { + continue + } + if kind != "deployment" && kind != "statefulset" { + continue + } + desired := int32(1) + if item.Spec.Replicas != nil { + desired = *item.Spec.Replicas + } + key := ns + "/" + kind + "/" + name + current[key] = desired + } + + healed := []string{} + for _, w := range criticalStartupWorkloads { + key := w.Namespace + "/" + strings.ToLower(w.Kind) + "/" + w.Name + desired, ok := current[key] + if !ok || desired >= 1 { + continue + } + if err := o.ensureWorkloadReplicas(ctx, w, 1); err != nil { + if isNotFoundErr(err) { + continue + } + return healed, fmt.Errorf("scale %s to 1: %w", key, err) + } + healed = append(healed, key) + } + return healed, nil +} + func (o *Orchestrator) startupFailurePods(ctx context.Context) ([]string, error) { out, err := o.kubectl(ctx, 30*time.Second, "get", "pods", "-A", "-o", "json") if err != nil { @@ -3149,6 +3682,10 @@ func (o *Orchestrator) kubectl(ctx context.Context, timeout time.Duration, args } func (o *Orchestrator) ssh(ctx context.Context, node string, command string) (string, error) { + return o.sshWithTimeout(ctx, node, command, 45*time.Second) +} + +func (o *Orchestrator) sshWithTimeout(ctx context.Context, node string, command string, timeout time.Duration) (string, error) { host := node if mapped, ok := o.cfg.SSHNodeHosts[node]; ok && strings.TrimSpace(mapped) != "" { host = strings.TrimSpace(mapped) @@ 
-3206,7 +3743,7 @@ func (o *Orchestrator) ssh(ctx context.Context, node string, command string) (st var lastOut string var lastErr error for i, args := range attempts { - out, err := o.run(ctx, 45*time.Second, "ssh", args...) + out, err := o.run(ctx, timeout, "ssh", args...) if err == nil { if i > 0 { o.log.Printf("warning: ssh %s path failed for %s, using %s path", attemptNames[i-1], node, attemptNames[i]) @@ -3216,7 +3753,7 @@ func (o *Orchestrator) ssh(ctx context.Context, node string, command string) (st if sshutil.ShouldAttemptKnownHostsRepair(out, err) { o.log.Printf("warning: ssh failure on %s via %s path may be host-key related; repairing known_hosts and retrying once", node, attemptNames[i]) sshutil.RepairKnownHosts(ctx, o.log, knownHostsFiles, repairHosts, o.cfg.SSHPort) - retryOut, retryErr := o.run(ctx, 45*time.Second, "ssh", args...) + retryOut, retryErr := o.run(ctx, timeout, "ssh", args...) if retryErr == nil { return retryOut, nil } diff --git a/internal/cluster/orchestrator_test.go b/internal/cluster/orchestrator_test.go index b4422c9..f842f4a 100644 --- a/internal/cluster/orchestrator_test.go +++ b/internal/cluster/orchestrator_test.go @@ -7,6 +7,7 @@ import ( "net/http/httptest" "os" "reflect" + "strings" "testing" "time" @@ -264,3 +265,70 @@ func TestStuckVaultInitReasonIgnoresFreshOrNonVaultPods(t *testing.T) { t.Fatalf("expected no reason for non-vault pod, got %q", reason) } } + +func TestValidateNodeInventoryPassesForStrictMappings(t *testing.T) { + orch := &Orchestrator{ + cfg: config.Config{ + SSHUser: "atlas", + SSHPort: 2277, + SSHNodeHosts: map[string]string{ + "titan-0a": "192.168.22.11", + "titan-0b": "192.168.22.12", + "titan-0c": "192.168.22.13", + "titan-22": "192.168.22.22", + }, + SSHManagedNodes: []string{"titan-0a", "titan-0b", "titan-0c", "titan-22"}, + ControlPlanes: []string{"titan-0a", "titan-0b", "titan-0c"}, + Workers: []string{"titan-22"}, + }, + log: log.New(os.Stdout, "", 0), + } + if err := orch.validateNodeInventory(); err != nil { + t.Fatalf("expected inventory to pass, got error: %v", err) + } +} + +func TestValidateNodeInventoryFailsWhenNodeMappingMissing(t *testing.T) { + orch := &Orchestrator{ + cfg: config.Config{ + SSHUser: "atlas", + SSHPort: 2277, + SSHNodeHosts: map[string]string{"titan-0a": "192.168.22.11"}, + SSHManagedNodes: []string{"titan-0a", "titan-0b"}, + ControlPlanes: []string{"titan-0a"}, + Workers: []string{"titan-0b"}, + }, + log: log.New(os.Stdout, "", 0), + } + err := orch.validateNodeInventory() + if err == nil { + t.Fatalf("expected inventory error for missing mapping") + } + if !strings.Contains(err.Error(), "missing ssh_node_hosts entry") { + t.Fatalf("expected missing-mapping detail, got: %v", err) + } +} + +func TestValidateNodeInventoryFailsWhenWorkerNotManaged(t *testing.T) { + orch := &Orchestrator{ + cfg: config.Config{ + SSHUser: "atlas", + SSHPort: 2277, + SSHNodeHosts: map[string]string{ + "titan-0a": "192.168.22.11", + "titan-22": "192.168.22.22", + }, + SSHManagedNodes: []string{"titan-0a"}, + ControlPlanes: []string{"titan-0a"}, + Workers: []string{"titan-22"}, + }, + log: log.New(os.Stdout, "", 0), + } + err := orch.validateNodeInventory() + if err == nil { + t.Fatalf("expected inventory error for unmanaged worker") + } + if !strings.Contains(err.Error(), "missing from ssh_managed_nodes") { + t.Fatalf("expected unmanaged-worker detail, got: %v", err) + } +} diff --git a/internal/config/config.go b/internal/config/config.go index a347d6f..77e8b62 100644 --- a/internal/config/config.go +++ 
b/internal/config/config.go @@ -63,6 +63,15 @@ type Startup struct { ServiceChecklistPollSeconds int `yaml:"service_checklist_poll_seconds"` ServiceChecklistStabilitySec int `yaml:"service_checklist_stability_seconds"` ServiceChecklist []ServiceChecklistCheck `yaml:"service_checklist"` + RequireIngressChecklist bool `yaml:"require_ingress_checklist"` + IngressChecklistWaitSeconds int `yaml:"ingress_checklist_wait_seconds"` + IngressChecklistPollSeconds int `yaml:"ingress_checklist_poll_seconds"` + IngressChecklistAccepted []int `yaml:"ingress_checklist_accepted_statuses"` + IngressChecklistIgnoreHosts []string `yaml:"ingress_checklist_ignore_hosts"` + IngressChecklistInsecureSkip bool `yaml:"ingress_checklist_insecure_skip_tls"` + RequireNodeSSHAuth bool `yaml:"require_node_ssh_auth"` + NodeSSHAuthWaitSeconds int `yaml:"node_ssh_auth_wait_seconds"` + NodeSSHAuthPollSeconds int `yaml:"node_ssh_auth_poll_seconds"` RequireFluxHealth bool `yaml:"require_flux_health"` FluxHealthWaitSeconds int `yaml:"flux_health_wait_seconds"` FluxHealthPollSeconds int `yaml:"flux_health_poll_seconds"` @@ -309,6 +318,28 @@ func (c Config) Validate() error { } } } + if c.Startup.IngressChecklistWaitSeconds <= 0 { + return fmt.Errorf("config.startup.ingress_checklist_wait_seconds must be > 0") + } + if c.Startup.IngressChecklistPollSeconds <= 0 { + return fmt.Errorf("config.startup.ingress_checklist_poll_seconds must be > 0") + } + for _, code := range c.Startup.IngressChecklistAccepted { + if code < 100 || code > 599 { + return fmt.Errorf("config.startup.ingress_checklist_accepted_statuses contains invalid HTTP code %d", code) + } + } + for _, host := range c.Startup.IngressChecklistIgnoreHosts { + if strings.TrimSpace(host) == "" { + return fmt.Errorf("config.startup.ingress_checklist_ignore_hosts entries must not be empty") + } + } + if c.Startup.NodeSSHAuthWaitSeconds <= 0 { + return fmt.Errorf("config.startup.node_ssh_auth_wait_seconds must be > 0") + } + if c.Startup.NodeSSHAuthPollSeconds <= 0 { + return fmt.Errorf("config.startup.node_ssh_auth_poll_seconds must be > 0") + } if c.Startup.FluxHealthWaitSeconds <= 0 { return fmt.Errorf("config.startup.flux_health_wait_seconds must be > 0") } @@ -513,6 +544,14 @@ func defaults() Config { TimeoutSeconds: 12, }, }, + RequireIngressChecklist: true, + IngressChecklistWaitSeconds: 420, + IngressChecklistPollSeconds: 5, + IngressChecklistAccepted: []int{200, 301, 302, 307, 308, 401, 403, 404}, + IngressChecklistIgnoreHosts: []string{}, + RequireNodeSSHAuth: true, + NodeSSHAuthWaitSeconds: 240, + NodeSSHAuthPollSeconds: 5, RequireFluxHealth: true, FluxHealthWaitSeconds: 900, FluxHealthPollSeconds: 5, @@ -684,6 +723,24 @@ func (c *Config) applyDefaults() { c.Startup.ServiceChecklist[i].TimeoutSeconds = 12 } } + if c.Startup.IngressChecklistWaitSeconds <= 0 { + c.Startup.IngressChecklistWaitSeconds = 420 + } + if c.Startup.IngressChecklistPollSeconds <= 0 { + c.Startup.IngressChecklistPollSeconds = 5 + } + if len(c.Startup.IngressChecklistAccepted) == 0 { + c.Startup.IngressChecklistAccepted = []int{200, 301, 302, 307, 308, 401, 403, 404} + } + if c.Startup.IngressChecklistIgnoreHosts == nil { + c.Startup.IngressChecklistIgnoreHosts = []string{} + } + if c.Startup.NodeSSHAuthWaitSeconds <= 0 { + c.Startup.NodeSSHAuthWaitSeconds = 240 + } + if c.Startup.NodeSSHAuthPollSeconds <= 0 { + c.Startup.NodeSSHAuthPollSeconds = 5 + } if c.Startup.FluxHealthWaitSeconds <= 0 { c.Startup.FluxHealthWaitSeconds = 900 }
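
---

To sanity-check the report wiring end to end, here is a minimal consumer sketch. It assumes only what this diff establishes: the JSON field names from the `startupReport`/`startupCheckRecord` struct tags, and the `/var/lib/ananke/last-startup-report.json` path documented in the README. The state dir is configurable, so the hardcoded path is an assumption, and the standalone `main` package is illustrative, not part of this repo.

```go
// Sketch: read the last startup report and summarize failed gates and
// auto-heals. Types mirror the struct tags added in orchestrator.go.
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"time"
)

type checkRecord struct {
	Status    string    `json:"status"`
	Detail    string    `json:"detail"`
	UpdatedAt time.Time `json:"updated_at"`
}

type report struct {
	StartedAt   time.Time              `json:"started_at"`
	CompletedAt time.Time              `json:"completed_at"`
	Reason      string                 `json:"reason"`
	Success     bool                   `json:"success"`
	Error       string                 `json:"error,omitempty"`
	Checks      map[string]checkRecord `json:"checks"`
	AutoHeals   []string               `json:"auto_heals"`
	SourceHost  string                 `json:"source_host"`
}

func main() {
	// Default state dir from the README; adjust if state.dir differs.
	b, err := os.ReadFile("/var/lib/ananke/last-startup-report.json")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	var r report
	if err := json.Unmarshal(b, &r); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Printf("startup %q success=%v took=%s host=%s\n",
		r.Reason, r.Success,
		r.CompletedAt.Sub(r.StartedAt).Round(time.Second), r.SourceHost)
	for name, c := range r.Checks {
		if c.Status != "passed" {
			fmt.Printf("  FAILED %s: %s\n", name, c.Detail)
		}
	}
	for _, h := range r.AutoHeals {
		fmt.Printf("  auto-heal: %s\n", h)
	}
}
```

Run it on the Ananke host after a startup attempt; it exits nonzero only on read/decode errors and reports gate outcomes in its output, matching the report's own `success` flag.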