startup: add strict preflight, ssh auth gate, ingress checks, and startup report

Brad Stein 2026-04-07 22:40:15 -03:00
parent 1f54cd3d46
commit c7d7407008
7 changed files with 705 additions and 8 deletions

View File

@ -25,12 +25,18 @@ Vault unseal breakglass is wired for remote retrieval (magic mirror host). If lo
## What “startup complete” means now
Ananke does **not** stop at “Flux says Ready”. Startup completes only when all configured gates pass:
- node inventory preflight passes (host mapping + ssh user + port sanity)
- node SSH auth gate passes (real command execution, not just TCP)
- Flux source drift guard passes (`expected_flux_source_url` + branch expectation)
- Flux kustomizations are healthy
- controller convergence is healthy (deployments/statefulsets/daemonsets)
- ingress checklist passes (all discovered ingress hosts reachable with accepted status)
- external service checklist passes (Gitea, Grafana, Keycloak OIDC, Harbor registry auth challenge, Longhorn auth redirect)
- stability soak window passes (no regressions, no CrashLoop/ImagePull failures)
During startup, Ananke also auto-heals known failure patterns (stuck controller pods, Flux-managed Jobs that fail on immutable template changes, critical workloads scaled to zero) and writes a report:
- `/var/lib/ananke/last-startup-report.json`
If any gate fails, startup is blocked with a concrete reason.
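For reference, a passing report looks roughly like this. This is an illustrative sketch: the timestamps, reason string, hostname, and pod name are invented, but the field names, check names, and detail strings follow the `startupReport` struct and `noteStartupCheck` calls added in this commit:
```json
{
  "started_at": "2026-04-07T22:41:02Z",
  "completed_at": "2026-04-07T22:58:40Z",
  "reason": "manual",
  "success": true,
  "checks": {
    "node-inventory": {
      "status": "passed",
      "detail": "inventory/user/port validation passed",
      "updated_at": "2026-04-07T22:41:03Z"
    },
    "ingress-checklist": {
      "status": "passed",
      "detail": "all ingress hosts reachable",
      "updated_at": "2026-04-07T22:55:17Z"
    }
  },
  "auto_heals": [
    "recycled stuck controller pods: monitoring/grafana-6c9f7d4b2-xk2lp"
  ],
  "source_host": "titan-0a"
}
```
On failure, `success` is `false`, `error` carries the blocking reason, and the failed gate's entry in `checks` has `"status": "failed"` with the same detail string that blocked startup.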
## Command quick sheet
@ -76,6 +82,8 @@ Primary path:
Core settings to keep accurate:
- `expected_flux_branch`
- `expected_flux_source_url`
- `startup.require_node_ssh_auth`
- `startup.require_ingress_checklist`
- `startup.service_checklist`
- `startup.service_checklist_stability_seconds`
- `startup.ignore_unavailable_nodes` (for planned temporary node outages)
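A minimal sketch of how these keys nest in the config file. The values shown are the defaults from the config hunks below; the checklist entry's `name:` key is an assumption inferred from `ServiceChecklistCheck`, and the branch and stability values are placeholders:
```yaml
expected_flux_branch: main          # illustrative value
expected_flux_source_url: ""        # set to your Git source URL
startup:
  require_node_ssh_auth: true
  node_ssh_auth_wait_seconds: 240
  node_ssh_auth_poll_seconds: 5
  require_ingress_checklist: true
  ingress_checklist_wait_seconds: 420
  ingress_checklist_poll_seconds: 5
  ingress_checklist_accepted_statuses: [200, 301, 302, 307, 308, 401, 403, 404]
  ingress_checklist_ignore_hosts: []
  ingress_checklist_insecure_skip_tls: false
  ignore_unavailable_nodes: []
  service_checklist_stability_seconds: 120   # illustrative; no default shown in this diff
  service_checklist:
    - name: longhorn                          # entry key name assumed
      url: https://longhorn.bstein.dev/
      accepted_statuses: [200, 302]
      timeout_seconds: 12
```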

View File

@ -103,6 +103,15 @@ startup:
url: https://longhorn.bstein.dev/
accepted_statuses: [200, 302]
timeout_seconds: 12
require_ingress_checklist: true
ingress_checklist_wait_seconds: 420
ingress_checklist_poll_seconds: 5
ingress_checklist_accepted_statuses: [200, 301, 302, 307, 308, 401, 403, 404]
ingress_checklist_ignore_hosts: []
ingress_checklist_insecure_skip_tls: false
require_node_ssh_auth: true
node_ssh_auth_wait_seconds: 240
node_ssh_auth_poll_seconds: 5
require_flux_health: true
flux_health_wait_seconds: 900
flux_health_poll_seconds: 5

View File

@ -169,6 +169,15 @@ startup:
url: https://longhorn.bstein.dev/
accepted_statuses: [200, 302]
timeout_seconds: 12
require_ingress_checklist: true
ingress_checklist_wait_seconds: 420
ingress_checklist_poll_seconds: 5
ingress_checklist_accepted_statuses: [200, 301, 302, 307, 308, 401, 403, 404]
ingress_checklist_ignore_hosts: []
ingress_checklist_insecure_skip_tls: false
require_node_ssh_auth: true
node_ssh_auth_wait_seconds: 240
node_ssh_auth_poll_seconds: 5
require_flux_health: true
flux_health_wait_seconds: 900
flux_health_poll_seconds: 5

View File

@ -169,6 +169,15 @@ startup:
url: https://longhorn.bstein.dev/
accepted_statuses: [200, 302]
timeout_seconds: 12
require_ingress_checklist: true
ingress_checklist_wait_seconds: 420
ingress_checklist_poll_seconds: 5
ingress_checklist_accepted_statuses: [200, 301, 302, 307, 308, 401, 403, 404]
ingress_checklist_ignore_hosts: []
ingress_checklist_insecure_skip_tls: false
require_node_ssh_auth: true
node_ssh_auth_wait_seconds: 240
node_ssh_auth_poll_seconds: 5
require_flux_health: true
flux_health_wait_seconds: 900
flux_health_poll_seconds: 5

View File

@ -30,10 +30,12 @@ import (
)
type Orchestrator struct {
cfg config.Config
runner *execx.Runner
store *state.Store
log *log.Logger
cfg config.Config
runner *execx.Runner
store *state.Store
log *log.Logger
startupReportMu sync.Mutex
activeStartupReport *startupReport
}
type StartupOptions struct {
@ -77,6 +79,23 @@ type workloadScaleSnapshot struct {
Entries []workloadScaleEntry `json:"entries"`
}
type startupReport struct {
StartedAt time.Time `json:"started_at"`
Completed time.Time `json:"completed_at"`
Reason string `json:"reason"`
Success bool `json:"success"`
Error string `json:"error,omitempty"`
Checks map[string]startupCheckRecord `json:"checks"`
AutoHeals []string `json:"auto_heals"`
SourceHost string `json:"source_host"`
}
type startupCheckRecord struct {
Status string `json:"status"`
Detail string `json:"detail"`
UpdatedAt time.Time `json:"updated_at"`
}
var datastoreEndpointPattern = regexp.MustCompile(`--datastore-endpoint(?:=|\s+)(?:'([^']+)'|"([^"]+)"|([^\s\\]+))`)
var criticalStartupWorkloads = []startupWorkload{
@ -87,6 +106,8 @@ var criticalStartupWorkloads = []startupWorkload{
{Namespace: "vault", Kind: "statefulset", Name: "vault"},
{Namespace: "postgres", Kind: "statefulset", Name: "postgres"},
{Namespace: "gitea", Kind: "deployment", Name: "gitea"},
{Namespace: "monitoring", Kind: "deployment", Name: "grafana"},
{Namespace: "monitoring", Kind: "deployment", Name: "kube-state-metrics"},
}
var ErrEtcdRestoreNotApplicable = errors.New("etcd restore not applicable")
@ -101,6 +122,8 @@ func (o *Orchestrator) Startup(ctx context.Context, opts StartupOptions) (err er
return err
}
defer unlock()
o.beginStartupReport(opts.Reason)
defer o.finalizeStartupReport(err)
record := state.RunRecord{
ID: fmt.Sprintf("startup-%d", time.Now().UnixNano()),
@ -110,6 +133,11 @@ func (o *Orchestrator) Startup(ctx context.Context, opts StartupOptions) (err er
StartedAt: time.Now().UTC(),
}
defer o.finalizeRecord(&record, &err)
if invErr := o.validateNodeInventory(); invErr != nil {
o.noteStartupCheck("node-inventory", false, invErr.Error())
return invErr
}
o.noteStartupCheck("node-inventory", true, "inventory/user/port validation passed")
resumedFlux := false
defer func() {
if o.runner.DryRun || err == nil || resumedFlux {
@ -258,6 +286,13 @@ func (o *Orchestrator) Startup(ctx context.Context, opts StartupOptions) (err er
}
o.startWorkers(ctx, workers)
o.bestEffort("uncordon workers", func() error { return o.uncordonWorkers(ctx, workers) })
sshCheckNodes := append([]string{}, o.cfg.ControlPlanes...)
sshCheckNodes = append(sshCheckNodes, workers...)
if err := o.waitForNodeSSHAuth(ctx, sshCheckNodes); err != nil {
o.noteStartupCheck("node-ssh-auth", false, err.Error())
return err
}
o.noteStartupCheck("node-ssh-auth", true, fmt.Sprintf("nodes=%d", len(sshCheckNodes)))
needsLocalBootstrap := false
bootstrapReasons := []string{}
@ -283,8 +318,10 @@ func (o *Orchestrator) Startup(ctx context.Context, opts StartupOptions) (err er
}
if o.cfg.Startup.RequireStorageReady {
if err := o.waitForStorageReady(ctx); err != nil {
o.noteStartupCheck("storage-readiness", false, err.Error())
return err
}
o.noteStartupCheck("storage-readiness", true, "longhorn and critical PVCs ready")
}
if err := o.ensureCriticalStartupWorkloads(ctx); err != nil {
return err
@ -322,10 +359,23 @@ func (o *Orchestrator) Startup(ctx context.Context, opts StartupOptions) (err er
return err
}
resumedFlux = true
o.bestEffort("heal critical zero-replica workloads", func() error {
healed, healErr := o.healCriticalWorkloadReplicas(ctx)
if healErr != nil {
return healErr
}
if len(healed) > 0 {
sort.Strings(healed)
o.noteStartupAutoHeal(fmt.Sprintf("restored critical workload replicas: %s", joinLimited(healed, 8)))
}
return nil
})
if o.cfg.Startup.RequirePostStartProbes {
if err := o.waitForPostStartProbes(ctx); err != nil {
o.noteStartupCheck("post-start-probes", false, err.Error())
return err
}
o.noteStartupCheck("post-start-probes", true, "post-start probes passed")
}
if err := o.waitForStartupConvergence(ctx); err != nil {
return err
@ -419,6 +469,9 @@ func (o *Orchestrator) Shutdown(ctx context.Context, opts ShutdownOptions) (err
return err
}
defer unlock()
if invErr := o.validateNodeInventory(); invErr != nil {
return invErr
}
record := state.RunRecord{
ID: fmt.Sprintf("shutdown-%d", time.Now().UnixNano()),
@ -520,6 +573,79 @@ func (o *Orchestrator) finalizeRecord(record *state.RunRecord, err *error) {
}
}
func (o *Orchestrator) startupReportPath() string {
return filepath.Join(o.cfg.State.Dir, "last-startup-report.json")
}
func (o *Orchestrator) beginStartupReport(reason string) {
host, _ := os.Hostname()
o.startupReportMu.Lock()
defer o.startupReportMu.Unlock()
o.activeStartupReport = &startupReport{
StartedAt: time.Now().UTC(),
Reason: strings.TrimSpace(reason),
Checks: map[string]startupCheckRecord{},
AutoHeals: []string{},
SourceHost: strings.TrimSpace(host),
}
}
func (o *Orchestrator) noteStartupCheck(name string, success bool, detail string) {
o.startupReportMu.Lock()
defer o.startupReportMu.Unlock()
if o.activeStartupReport == nil {
return
}
status := "failed"
if success {
status = "passed"
}
o.activeStartupReport.Checks[strings.TrimSpace(name)] = startupCheckRecord{
Status: status,
Detail: strings.TrimSpace(detail),
UpdatedAt: time.Now().UTC(),
}
}
func (o *Orchestrator) noteStartupAutoHeal(detail string) {
o.startupReportMu.Lock()
defer o.startupReportMu.Unlock()
if o.activeStartupReport == nil {
return
}
detail = strings.TrimSpace(detail)
if detail == "" {
return
}
o.activeStartupReport.AutoHeals = append(o.activeStartupReport.AutoHeals, detail)
}
func (o *Orchestrator) finalizeStartupReport(runErr error) {
o.startupReportMu.Lock()
report := o.activeStartupReport
o.activeStartupReport = nil
o.startupReportMu.Unlock()
if report == nil {
return
}
report.Completed = time.Now().UTC()
report.Success = runErr == nil
if runErr != nil {
report.Error = strings.TrimSpace(runErr.Error())
}
b, err := json.MarshalIndent(report, "", " ")
if err != nil {
o.log.Printf("warning: encode startup report failed: %v", err)
return
}
path := o.startupReportPath()
if writeErr := os.WriteFile(path, b, 0o640); writeErr != nil {
o.log.Printf("warning: write startup report %s failed: %v", path, writeErr)
return
}
o.log.Printf("startup report written: %s", path)
}
func (o *Orchestrator) effectiveWorkers(ctx context.Context) ([]string, error) {
if len(o.cfg.Workers) > 0 {
return append([]string{}, o.cfg.Workers...), nil
@ -1517,6 +1643,101 @@ func (o *Orchestrator) nodeNameForHost(host string) string {
return ""
}
func (o *Orchestrator) inventoryNodesForValidation() []string {
set := map[string]struct{}{}
add := func(node string) {
node = strings.TrimSpace(node)
if node == "" {
return
}
set[node] = struct{}{}
}
for _, n := range o.cfg.ControlPlanes {
add(n)
}
for _, n := range o.cfg.Workers {
add(n)
}
for _, n := range o.cfg.SSHManagedNodes {
add(n)
}
for _, n := range o.cfg.Shutdown.ExtraPoweroffHosts {
add(n)
}
for _, n := range o.cfg.Coordination.PeerHosts {
add(n)
}
add(o.cfg.Coordination.ForwardShutdownHost)
nodes := make([]string, 0, len(set))
for n := range set {
nodes = append(nodes, n)
}
sort.Strings(nodes)
return nodes
}
func (o *Orchestrator) validateNodeInventory() error {
issues := []string{}
if o.cfg.SSHPort <= 0 || o.cfg.SSHPort > 65535 {
issues = append(issues, fmt.Sprintf("ssh_port=%d is invalid", o.cfg.SSHPort))
}
managed := makeStringSet(o.cfg.SSHManagedNodes)
for _, cp := range o.cfg.ControlPlanes {
cp = strings.TrimSpace(cp)
if cp == "" {
continue
}
if _, ok := managed[cp]; !ok {
issues = append(issues, fmt.Sprintf("control plane %s is missing from ssh_managed_nodes", cp))
}
}
for _, node := range o.cfg.Workers {
node = strings.TrimSpace(node)
if node == "" {
continue
}
if _, ok := managed[node]; !ok {
issues = append(issues, fmt.Sprintf("worker %s is missing from ssh_managed_nodes", node))
}
}
baseUser := strings.TrimSpace(o.cfg.SSHUser)
for _, node := range o.inventoryNodesForValidation() {
if _, ok := o.cfg.SSHNodeHosts[node]; !ok {
issues = append(issues, fmt.Sprintf("%s is missing ssh_node_hosts entry", node))
}
host := strings.TrimSpace(o.cfg.SSHNodeHosts[node])
if host == "" {
host = node
}
if strings.ContainsAny(host, " \t\r\n") {
issues = append(issues, fmt.Sprintf("%s has invalid ssh host %q (contains whitespace)", node, host))
}
if strings.Contains(host, "/") {
issues = append(issues, fmt.Sprintf("%s has invalid ssh host %q (contains slash)", node, host))
}
user := baseUser
if override, ok := o.cfg.SSHNodeUsers[node]; ok {
user = strings.TrimSpace(override)
}
if user == "" {
issues = append(issues, fmt.Sprintf("%s has empty ssh user (ssh_user/ssh_node_users)", node))
}
if strings.ContainsAny(user, " \t\r\n@") {
issues = append(issues, fmt.Sprintf("%s has invalid ssh user %q", node, user))
}
}
if len(issues) > 0 {
sort.Strings(issues)
return fmt.Errorf("node inventory preflight failed: %s", joinLimited(issues, 10))
}
return nil
}
func (o *Orchestrator) tcpReachable(address string, timeout time.Duration) bool {
conn, err := net.DialTimeout("tcp", address, timeout)
if err != nil {
@ -1570,6 +1791,97 @@ func (o *Orchestrator) reconcileNodeAccess(ctx context.Context, nodes []string)
return fmt.Errorf("access validation had %d errors (first: %s)", len(errCh), strings.Join(samples, " | "))
}
func (o *Orchestrator) waitForNodeSSHAuth(ctx context.Context, nodes []string) error {
if o.runner.DryRun || !o.cfg.Startup.RequireNodeSSHAuth {
return nil
}
wait := time.Duration(o.cfg.Startup.NodeSSHAuthWaitSeconds) * time.Second
if wait <= 0 {
wait = 4 * time.Minute
}
poll := time.Duration(o.cfg.Startup.NodeSSHAuthPollSeconds) * time.Second
if poll <= 0 {
poll = 5 * time.Second
}
ignored := makeStringSet(o.cfg.Startup.IgnoreUnavailableNodes)
seen := map[string]struct{}{}
targets := make([]string, 0, len(nodes))
for _, node := range nodes {
node = strings.TrimSpace(node)
if node == "" {
continue
}
if _, skip := ignored[node]; skip {
continue
}
if _, ok := seen[node]; ok {
continue
}
seen[node] = struct{}{}
targets = append(targets, node)
}
sort.Strings(targets)
if len(targets) == 0 {
return nil
}
deadline := time.Now().Add(wait)
lastFailure := "unknown"
lastLogged := time.Time{}
for {
authDenied := []string{}
pending := []string{}
for _, node := range targets {
if !o.sshManaged(node) {
authDenied = append(authDenied, fmt.Sprintf("%s(not in ssh_managed_nodes)", node))
continue
}
out, err := o.sshWithTimeout(ctx, node, "echo __ANANKE_SSH_AUTH_OK__", 12*time.Second)
if err != nil {
detail := strings.TrimSpace(err.Error())
full := strings.ToLower(strings.TrimSpace(detail + " " + out))
switch {
case strings.Contains(full, "permission denied"), strings.Contains(full, "publickey"), strings.Contains(full, "authentication"):
authDenied = append(authDenied, fmt.Sprintf("%s(auth denied)", node))
default:
pending = append(pending, fmt.Sprintf("%s(unreachable)", node))
}
continue
}
if !strings.Contains(out, "__ANANKE_SSH_AUTH_OK__") {
pending = append(pending, fmt.Sprintf("%s(unexpected output)", node))
}
}
if len(authDenied) > 0 {
sort.Strings(authDenied)
return fmt.Errorf("ssh auth gate failed: %s", joinLimited(authDenied, 8))
}
if len(pending) == 0 {
return nil
}
sort.Strings(pending)
lastFailure = joinLimited(pending, 8)
if time.Now().After(deadline) {
return fmt.Errorf("node ssh auth gate did not pass within %s (%s)", wait, lastFailure)
}
if time.Since(lastLogged) >= 30*time.Second {
remaining := time.Until(deadline).Round(time.Second)
if remaining < 0 {
remaining = 0
}
o.log.Printf("waiting for node ssh auth (%s remaining): %s", remaining, lastFailure)
lastLogged = time.Now()
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(poll):
}
}
}
func (o *Orchestrator) fluxSourceReady(ctx context.Context) (bool, error) {
out, err := o.kubectl(ctx, 10*time.Second, "-n", "flux-system", "get", "gitrepository", "flux-system", "-o", "jsonpath={.status.conditions[?(@.type==\"Ready\")].status}")
if err != nil {
@ -1965,6 +2277,22 @@ type workloadList struct {
Items []workloadResource `json:"items"`
}
type ingressList struct {
Items []ingressResource `json:"items"`
}
type ingressResource struct {
Metadata struct {
Namespace string `json:"namespace"`
Name string `json:"name"`
} `json:"metadata"`
Spec struct {
Rules []struct {
Host string `json:"host"`
} `json:"rules"`
} `json:"spec"`
}
type jobList struct {
Items []jobResource `json:"items"`
}
@ -2136,27 +2464,145 @@ func (o *Orchestrator) waitForStartupConvergence(ctx context.Context) error {
if o.runner.DryRun {
return nil
}
if o.cfg.Startup.RequireServiceChecklist {
if err := o.waitForServiceChecklist(ctx); err != nil {
if o.cfg.Startup.RequireIngressChecklist {
if err := o.waitForIngressChecklist(ctx); err != nil {
o.noteStartupCheck("ingress-checklist", false, err.Error())
return err
}
o.noteStartupCheck("ingress-checklist", true, "all ingress hosts reachable")
}
if o.cfg.Startup.RequireServiceChecklist {
if err := o.waitForServiceChecklist(ctx); err != nil {
o.noteStartupCheck("service-checklist", false, err.Error())
return err
}
o.noteStartupCheck("service-checklist", true, "all configured service checks passed")
}
if o.cfg.Startup.RequireFluxHealth {
if err := o.waitForFluxHealth(ctx); err != nil {
o.noteStartupCheck("flux-health", false, err.Error())
return err
}
o.noteStartupCheck("flux-health", true, "all flux kustomizations ready")
}
if o.cfg.Startup.RequireWorkloadConvergence {
if err := o.waitForWorkloadConvergence(ctx); err != nil {
o.noteStartupCheck("workload-convergence", false, err.Error())
return err
}
o.noteStartupCheck("workload-convergence", true, "controllers converged")
}
if err := o.waitForStabilityWindow(ctx); err != nil {
o.noteStartupCheck("stability-window", false, err.Error())
return err
}
o.noteStartupCheck("stability-window", true, "startup soak passed")
return nil
}
func (o *Orchestrator) waitForIngressChecklist(ctx context.Context) error {
wait := time.Duration(o.cfg.Startup.IngressChecklistWaitSeconds) * time.Second
if wait <= 0 {
wait = 7 * time.Minute
}
poll := time.Duration(o.cfg.Startup.IngressChecklistPollSeconds) * time.Second
if poll <= 0 {
poll = 5 * time.Second
}
deadline := time.Now().Add(wait)
lastFailure := "unknown"
lastLogged := time.Time{}
lastRecycleAttempt := time.Time{}
lastReplicaHeal := time.Time{}
for {
o.maybeAutoRecycleStuckPods(ctx, &lastRecycleAttempt)
o.maybeAutoHealCriticalWorkloadReplicas(ctx, &lastReplicaHeal)
prevFailure := lastFailure
ready, detail := o.ingressChecklistReady(ctx)
lastFailure = detail
if ready {
o.log.Printf("ingress checklist passed (%s)", detail)
return nil
}
if lastFailure != prevFailure || time.Since(lastLogged) >= 30*time.Second {
remaining := time.Until(deadline).Round(time.Second)
if remaining < 0 {
remaining = 0
}
o.log.Printf("waiting for ingress checklist (%s remaining): %s", remaining, lastFailure)
lastLogged = time.Now()
}
if time.Now().After(deadline) {
return fmt.Errorf("startup blocked: ingress checklist not satisfied within %s (%s)", wait, lastFailure)
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(poll):
}
}
}
func (o *Orchestrator) ingressChecklistReady(ctx context.Context) (bool, string) {
hosts, err := o.discoverIngressHosts(ctx)
if err != nil {
return false, err.Error()
}
if len(hosts) == 0 {
return true, "no ingress hosts discovered"
}
accepted := o.cfg.Startup.IngressChecklistAccepted
if len(accepted) == 0 {
accepted = []int{200, 301, 302, 307, 308, 401, 403, 404}
}
for _, host := range hosts {
check := config.ServiceChecklistCheck{
Name: "ingress-" + host,
URL: "https://" + host + "/",
AcceptedStatuses: accepted,
TimeoutSeconds: 12,
InsecureSkipTLS: o.cfg.Startup.IngressChecklistInsecureSkip,
}
ok, detail := o.serviceCheckReady(ctx, check)
if !ok {
return false, fmt.Sprintf("%s: %s", host, detail)
}
}
return true, fmt.Sprintf("hosts=%d", len(hosts))
}
func (o *Orchestrator) discoverIngressHosts(ctx context.Context) ([]string, error) {
out, err := o.kubectl(ctx, 25*time.Second, "get", "ingress", "-A", "-o", "json")
if err != nil {
return nil, fmt.Errorf("query ingresses: %w", err)
}
var list ingressList
if err := json.Unmarshal([]byte(out), &list); err != nil {
return nil, fmt.Errorf("decode ingresses: %w", err)
}
ignored := makeStringSet(o.cfg.Startup.IngressChecklistIgnoreHosts)
hosts := map[string]struct{}{}
for _, item := range list.Items {
for _, rule := range item.Spec.Rules {
host := strings.TrimSpace(rule.Host)
if host == "" || strings.Contains(host, "*") {
continue
}
if _, skip := ignored[host]; skip {
continue
}
hosts[host] = struct{}{}
}
}
outHosts := make([]string, 0, len(hosts))
for host := range hosts {
outHosts = append(outHosts, host)
}
sort.Strings(outHosts)
return outHosts, nil
}
func (o *Orchestrator) waitForServiceChecklist(ctx context.Context) error {
wait := time.Duration(o.cfg.Startup.ServiceChecklistWaitSeconds) * time.Second
if wait <= 0 {
@ -2170,8 +2616,10 @@ func (o *Orchestrator) waitForServiceChecklist(ctx context.Context) error {
lastFailure := "unknown"
lastLogged := time.Time{}
lastRecycleAttempt := time.Time{}
lastReplicaHeal := time.Time{}
for {
o.maybeAutoRecycleStuckPods(ctx, &lastRecycleAttempt)
o.maybeAutoHealCriticalWorkloadReplicas(ctx, &lastReplicaHeal)
prevFailure := lastFailure
ready, detail := o.serviceChecklistReady(ctx)
lastFailure = detail
@ -2323,9 +2771,11 @@ func (o *Orchestrator) waitForStabilityWindow(ctx context.Context) error {
deadline := time.Now().Add(window)
lastStatus := time.Time{}
lastRecycleAttempt := time.Time{}
lastReplicaHeal := time.Time{}
for {
o.maybeAutoRecycleStuckPods(ctx, &lastRecycleAttempt)
o.maybeAutoHealCriticalWorkloadReplicas(ctx, &lastReplicaHeal)
if err := o.startupStabilityHealthy(ctx); err != nil {
return fmt.Errorf("startup stability window failed: %w", err)
}
@ -2374,6 +2824,12 @@ func (o *Orchestrator) startupStabilityHealthy(ctx context.Context) error {
return fmt.Errorf("external services not healthy: %s", detail)
}
}
if o.cfg.Startup.RequireIngressChecklist {
ready, detail := o.ingressChecklistReady(ctx)
if !ready {
return fmt.Errorf("ingress reachability not healthy: %s", detail)
}
}
failures, err := o.startupFailurePods(ctx)
if err != nil {
return fmt.Errorf("pod failure check error: %w", err)
@ -2398,8 +2854,10 @@ func (o *Orchestrator) waitForFluxHealth(ctx context.Context) error {
lastLogged := time.Time{}
lastImmutableHealAttempt := time.Time{}
lastRecycleAttempt := time.Time{}
lastReplicaHeal := time.Time{}
for {
o.maybeAutoRecycleStuckPods(ctx, &lastRecycleAttempt)
o.maybeAutoHealCriticalWorkloadReplicas(ctx, &lastReplicaHeal)
prevFailure := lastFailure
ready, detail, err := o.fluxHealthReady(ctx)
if err != nil {
@ -2418,6 +2876,7 @@ func (o *Orchestrator) waitForFluxHealth(ctx context.Context) error {
o.log.Printf("warning: immutable-job self-heal attempt failed: %v", healErr)
} else if healed {
o.log.Printf("detected immutable-job failure and removed stale failed job(s); re-requesting reconcile")
o.noteStartupAutoHeal("deleted stale failed flux-managed job(s) after immutable template error")
o.bestEffort("reconcile flux after immutable-job cleanup", func() error { return o.resumeFluxAndReconcile(ctx) })
}
}
@ -2573,9 +3032,11 @@ func (o *Orchestrator) waitForWorkloadConvergence(ctx context.Context) error {
lastFailure := "unknown"
lastLogged := time.Time{}
lastRecycleAttempt := time.Time{}
lastReplicaHeal := time.Time{}
for {
prevFailure := lastFailure
o.maybeAutoRecycleStuckPods(ctx, &lastRecycleAttempt)
o.maybeAutoHealCriticalWorkloadReplicas(ctx, &lastReplicaHeal)
ready, detail, err := o.workloadConvergenceReady(ctx)
if err != nil {
lastFailure = err.Error()
@ -2739,6 +3200,7 @@ func (o *Orchestrator) recycleStuckControllerPods(ctx context.Context) error {
if len(recycled) > 0 {
sort.Strings(recycled)
o.log.Printf("recycled stuck controller pods (%d): %s", len(recycled), joinLimited(recycled, 10))
o.noteStartupAutoHeal(fmt.Sprintf("recycled stuck controller pods: %s", joinLimited(recycled, 10)))
}
return nil
}
@ -2812,6 +3274,77 @@ func (o *Orchestrator) maybeAutoRecycleStuckPods(ctx context.Context, lastAttemp
o.bestEffort("recycle stuck controller pods", func() error { return o.recycleStuckControllerPods(ctx) })
}
func (o *Orchestrator) maybeAutoHealCriticalWorkloadReplicas(ctx context.Context, lastAttempt *time.Time) {
if o.runner.DryRun {
return
}
now := time.Now()
if lastAttempt != nil && !lastAttempt.IsZero() && now.Sub(*lastAttempt) < 30*time.Second {
return
}
if lastAttempt != nil {
*lastAttempt = now
}
healed, err := o.healCriticalWorkloadReplicas(ctx)
if err != nil {
o.log.Printf("warning: critical workload replica auto-heal failed: %v", err)
return
}
if len(healed) == 0 {
return
}
sort.Strings(healed)
detail := fmt.Sprintf("restored critical workload replicas: %s", joinLimited(healed, 8))
o.log.Printf("%s", detail)
o.noteStartupAutoHeal(detail)
}
func (o *Orchestrator) healCriticalWorkloadReplicas(ctx context.Context) ([]string, error) {
out, err := o.kubectl(ctx, 30*time.Second, "get", "deploy,statefulset", "-A", "-o", "json")
if err != nil {
return nil, fmt.Errorf("query workloads: %w", err)
}
var list workloadList
if err := json.Unmarshal([]byte(out), &list); err != nil {
return nil, fmt.Errorf("decode workloads: %w", err)
}
current := map[string]int32{}
for _, item := range list.Items {
kind := strings.ToLower(strings.TrimSpace(item.Kind))
ns := strings.TrimSpace(item.Metadata.Namespace)
name := strings.TrimSpace(item.Metadata.Name)
if kind == "" || ns == "" || name == "" {
continue
}
if kind != "deployment" && kind != "statefulset" {
continue
}
desired := int32(1)
if item.Spec.Replicas != nil {
desired = *item.Spec.Replicas
}
key := ns + "/" + kind + "/" + name
current[key] = desired
}
healed := []string{}
for _, w := range criticalStartupWorkloads {
key := w.Namespace + "/" + strings.ToLower(w.Kind) + "/" + w.Name
desired, ok := current[key]
if !ok || desired >= 1 {
continue
}
if err := o.ensureWorkloadReplicas(ctx, w, 1); err != nil {
if isNotFoundErr(err) {
continue
}
return healed, fmt.Errorf("scale %s to 1: %w", key, err)
}
healed = append(healed, key)
}
return healed, nil
}
func (o *Orchestrator) startupFailurePods(ctx context.Context) ([]string, error) {
out, err := o.kubectl(ctx, 30*time.Second, "get", "pods", "-A", "-o", "json")
if err != nil {
@ -3149,6 +3682,10 @@ func (o *Orchestrator) kubectl(ctx context.Context, timeout time.Duration, args
}
func (o *Orchestrator) ssh(ctx context.Context, node string, command string) (string, error) {
return o.sshWithTimeout(ctx, node, command, 45*time.Second)
}
func (o *Orchestrator) sshWithTimeout(ctx context.Context, node string, command string, timeout time.Duration) (string, error) {
host := node
if mapped, ok := o.cfg.SSHNodeHosts[node]; ok && strings.TrimSpace(mapped) != "" {
host = strings.TrimSpace(mapped)
@ -3206,7 +3743,7 @@ func (o *Orchestrator) ssh(ctx context.Context, node string, command string) (st
var lastOut string
var lastErr error
for i, args := range attempts {
out, err := o.run(ctx, 45*time.Second, "ssh", args...)
out, err := o.run(ctx, timeout, "ssh", args...)
if err == nil {
if i > 0 {
o.log.Printf("warning: ssh %s path failed for %s, using %s path", attemptNames[i-1], node, attemptNames[i])
@ -3216,7 +3753,7 @@ func (o *Orchestrator) ssh(ctx context.Context, node string, command string) (st
if sshutil.ShouldAttemptKnownHostsRepair(out, err) {
o.log.Printf("warning: ssh failure on %s via %s path may be host-key related; repairing known_hosts and retrying once", node, attemptNames[i])
sshutil.RepairKnownHosts(ctx, o.log, knownHostsFiles, repairHosts, o.cfg.SSHPort)
retryOut, retryErr := o.run(ctx, 45*time.Second, "ssh", args...)
retryOut, retryErr := o.run(ctx, timeout, "ssh", args...)
if retryErr == nil {
return retryOut, nil
}

View File

@ -7,6 +7,7 @@ import (
"net/http/httptest"
"os"
"reflect"
"strings"
"testing"
"time"
@ -264,3 +265,70 @@ func TestStuckVaultInitReasonIgnoresFreshOrNonVaultPods(t *testing.T) {
t.Fatalf("expected no reason for non-vault pod, got %q", reason)
}
}
func TestValidateNodeInventoryPassesForStrictMappings(t *testing.T) {
orch := &Orchestrator{
cfg: config.Config{
SSHUser: "atlas",
SSHPort: 2277,
SSHNodeHosts: map[string]string{
"titan-0a": "192.168.22.11",
"titan-0b": "192.168.22.12",
"titan-0c": "192.168.22.13",
"titan-22": "192.168.22.22",
},
SSHManagedNodes: []string{"titan-0a", "titan-0b", "titan-0c", "titan-22"},
ControlPlanes: []string{"titan-0a", "titan-0b", "titan-0c"},
Workers: []string{"titan-22"},
},
log: log.New(os.Stdout, "", 0),
}
if err := orch.validateNodeInventory(); err != nil {
t.Fatalf("expected inventory to pass, got error: %v", err)
}
}
func TestValidateNodeInventoryFailsWhenNodeMappingMissing(t *testing.T) {
orch := &Orchestrator{
cfg: config.Config{
SSHUser: "atlas",
SSHPort: 2277,
SSHNodeHosts: map[string]string{"titan-0a": "192.168.22.11"},
SSHManagedNodes: []string{"titan-0a", "titan-0b"},
ControlPlanes: []string{"titan-0a"},
Workers: []string{"titan-0b"},
},
log: log.New(os.Stdout, "", 0),
}
err := orch.validateNodeInventory()
if err == nil {
t.Fatalf("expected inventory error for missing mapping")
}
if !strings.Contains(err.Error(), "missing ssh_node_hosts entry") {
t.Fatalf("expected missing-mapping detail, got: %v", err)
}
}
func TestValidateNodeInventoryFailsWhenWorkerNotManaged(t *testing.T) {
orch := &Orchestrator{
cfg: config.Config{
SSHUser: "atlas",
SSHPort: 2277,
SSHNodeHosts: map[string]string{
"titan-0a": "192.168.22.11",
"titan-22": "192.168.22.22",
},
SSHManagedNodes: []string{"titan-0a"},
ControlPlanes: []string{"titan-0a"},
Workers: []string{"titan-22"},
},
log: log.New(os.Stdout, "", 0),
}
err := orch.validateNodeInventory()
if err == nil {
t.Fatalf("expected inventory error for unmanaged worker")
}
if !strings.Contains(err.Error(), "missing from ssh_managed_nodes") {
t.Fatalf("expected unmanaged-worker detail, got: %v", err)
}
}

View File

@ -63,6 +63,15 @@ type Startup struct {
ServiceChecklistPollSeconds int `yaml:"service_checklist_poll_seconds"`
ServiceChecklistStabilitySec int `yaml:"service_checklist_stability_seconds"`
ServiceChecklist []ServiceChecklistCheck `yaml:"service_checklist"`
RequireIngressChecklist bool `yaml:"require_ingress_checklist"`
IngressChecklistWaitSeconds int `yaml:"ingress_checklist_wait_seconds"`
IngressChecklistPollSeconds int `yaml:"ingress_checklist_poll_seconds"`
IngressChecklistAccepted []int `yaml:"ingress_checklist_accepted_statuses"`
IngressChecklistIgnoreHosts []string `yaml:"ingress_checklist_ignore_hosts"`
IngressChecklistInsecureSkip bool `yaml:"ingress_checklist_insecure_skip_tls"`
RequireNodeSSHAuth bool `yaml:"require_node_ssh_auth"`
NodeSSHAuthWaitSeconds int `yaml:"node_ssh_auth_wait_seconds"`
NodeSSHAuthPollSeconds int `yaml:"node_ssh_auth_poll_seconds"`
RequireFluxHealth bool `yaml:"require_flux_health"`
FluxHealthWaitSeconds int `yaml:"flux_health_wait_seconds"`
FluxHealthPollSeconds int `yaml:"flux_health_poll_seconds"`
@ -309,6 +318,28 @@ func (c Config) Validate() error {
}
}
}
if c.Startup.IngressChecklistWaitSeconds <= 0 {
return fmt.Errorf("config.startup.ingress_checklist_wait_seconds must be > 0")
}
if c.Startup.IngressChecklistPollSeconds <= 0 {
return fmt.Errorf("config.startup.ingress_checklist_poll_seconds must be > 0")
}
for _, code := range c.Startup.IngressChecklistAccepted {
if code < 100 || code > 599 {
return fmt.Errorf("config.startup.ingress_checklist_accepted_statuses contains invalid HTTP code %d", code)
}
}
for _, host := range c.Startup.IngressChecklistIgnoreHosts {
if strings.TrimSpace(host) == "" {
return fmt.Errorf("config.startup.ingress_checklist_ignore_hosts entries must not be empty")
}
}
if c.Startup.NodeSSHAuthWaitSeconds <= 0 {
return fmt.Errorf("config.startup.node_ssh_auth_wait_seconds must be > 0")
}
if c.Startup.NodeSSHAuthPollSeconds <= 0 {
return fmt.Errorf("config.startup.node_ssh_auth_poll_seconds must be > 0")
}
if c.Startup.FluxHealthWaitSeconds <= 0 {
return fmt.Errorf("config.startup.flux_health_wait_seconds must be > 0")
}
@ -513,6 +544,14 @@ func defaults() Config {
TimeoutSeconds: 12,
},
},
RequireIngressChecklist: true,
IngressChecklistWaitSeconds: 420,
IngressChecklistPollSeconds: 5,
IngressChecklistAccepted: []int{200, 301, 302, 307, 308, 401, 403, 404},
IngressChecklistIgnoreHosts: []string{},
RequireNodeSSHAuth: true,
NodeSSHAuthWaitSeconds: 240,
NodeSSHAuthPollSeconds: 5,
RequireFluxHealth: true,
FluxHealthWaitSeconds: 900,
FluxHealthPollSeconds: 5,
@ -684,6 +723,24 @@ func (c *Config) applyDefaults() {
c.Startup.ServiceChecklist[i].TimeoutSeconds = 12
}
}
if c.Startup.IngressChecklistWaitSeconds <= 0 {
c.Startup.IngressChecklistWaitSeconds = 420
}
if c.Startup.IngressChecklistPollSeconds <= 0 {
c.Startup.IngressChecklistPollSeconds = 5
}
if len(c.Startup.IngressChecklistAccepted) == 0 {
c.Startup.IngressChecklistAccepted = []int{200, 301, 302, 307, 308, 401, 403, 404}
}
if c.Startup.IngressChecklistIgnoreHosts == nil {
c.Startup.IngressChecklistIgnoreHosts = []string{}
}
if c.Startup.NodeSSHAuthWaitSeconds <= 0 {
c.Startup.NodeSSHAuthWaitSeconds = 240
}
if c.Startup.NodeSSHAuthPollSeconds <= 0 {
c.Startup.NodeSSHAuthPollSeconds = 5
}
if c.Startup.FluxHealthWaitSeconds <= 0 {
c.Startup.FluxHealthWaitSeconds = 900
}