package cluster

import (
	"context"
	"encoding/json"
	"fmt"
	"sort"
	"strings"
	"time"
)

// waitForWorkloadConvergence blocks startup until every checked controller
// reports all replicas ready, or fails once the configured wait window
// elapses. While polling it also drives the auto-heal helpers (stuck-pod
// recycling, critical-replica healing, scheduling-storm quarantine) and logs
// progress whenever the failure detail changes or 30 seconds have passed
// since the last log line.
func (o *Orchestrator) waitForWorkloadConvergence(ctx context.Context) error {
	wait := time.Duration(o.cfg.Startup.WorkloadConvergenceWaitSeconds) * time.Second
	if wait <= 0 {
		wait = 15 * time.Minute
	}
	poll := time.Duration(o.cfg.Startup.WorkloadConvergencePollSeconds) * time.Second
	if poll <= 0 {
		poll = 5 * time.Second
	}
	deadline := time.Now().Add(wait)
	lastFailure := "unknown"
	lastLogged := time.Time{}
	lastRecycleAttempt := time.Time{}
	lastReplicaHeal := time.Time{}
	lastSchedulingStormHeal := time.Time{}
	for {
		prevFailure := lastFailure
		o.maybeAutoRecycleStuckPods(ctx, &lastRecycleAttempt)
		o.maybeAutoHealCriticalWorkloadReplicas(ctx, &lastReplicaHeal)
		o.maybeAutoQuarantineSchedulingStorms(ctx, &lastSchedulingStormHeal)
		ready, detail, err := o.workloadConvergenceReady(ctx)
		if err != nil {
			lastFailure = err.Error()
		} else {
			lastFailure = detail
		}
		if ready {
			o.log.Printf("workload convergence check passed (%s)", detail)
			return nil
		}
		if lastFailure != prevFailure || time.Since(lastLogged) >= 30*time.Second {
			remaining := time.Until(deadline).Round(time.Second)
			if remaining < 0 {
				remaining = 0
			}
			o.log.Printf("waiting for workload convergence (%s remaining): %s", remaining, lastFailure)
			lastLogged = time.Now()
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("startup blocked: workload convergence not satisfied within %s (%s)", wait, lastFailure)
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(poll):
		}
	}
}
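// Note: the Startup config struct is presumably defined elsewhere in the
// package; the convergence wait above reads two of its fields and falls back
// to defaults when they are zero or negative:
//
//	WorkloadConvergenceWaitSeconds  -> overall deadline (default 15m)
//	WorkloadConvergencePollSeconds  -> poll interval    (default 5s)
//
// Progress is re-logged only when the failure detail changes or at most every
// 30 seconds, so a quiet log does not mean the loop has stalled.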
// workloadConvergenceReady lists all Deployments, StatefulSets, and DaemonSets
// and reports whether every controller that is not filtered out by the
// required/ignored namespace, workload, Flux kustomization, or node rules has
// at least as many ready replicas as desired. DaemonSets are forgiven up to
// len(ignoredNodes) missing pods. The string return is a human-readable detail
// for logging.
func (o *Orchestrator) workloadConvergenceReady(ctx context.Context) (bool, string, error) {
	out, err := o.kubectl(ctx, 30*time.Second, "get", "deploy,statefulset,daemonset", "-A", "-o", "json")
	if err != nil {
		return false, "", fmt.Errorf("query controllers: %w", err)
	}
	var list workloadList
	if err := json.Unmarshal([]byte(out), &list); err != nil {
		return false, "", fmt.Errorf("decode controllers: %w", err)
	}
	requiredNamespaces := o.startupRequiredWorkloadNamespaces()
	ignoredNamespaces := makeStringSet(o.cfg.Startup.IgnoreWorkloadNamespaces)
	ignoredNodes := makeStringSet(o.cfg.Startup.IgnoreUnavailableNodes)
	ignoreRules := parseWorkloadIgnoreRules(o.cfg.Startup.IgnoreWorkloads)
	ignoredByFlux := namespaceCandidatesFromIgnoreKustomizations(o.cfg.Startup.IgnoreFluxKustomizations)
	pending := []string{}
	checked := 0
	for _, item := range list.Items {
		kind := strings.ToLower(strings.TrimSpace(item.Kind))
		ns := strings.TrimSpace(item.Metadata.Namespace)
		name := strings.TrimSpace(item.Metadata.Name)
		if kind == "" || ns == "" || name == "" {
			continue
		}
		if len(requiredNamespaces) > 0 {
			if _, ok := requiredNamespaces[ns]; !ok {
				continue
			}
		}
		if _, ok := ignoredNamespaces[ns]; ok {
			continue
		}
		if _, ok := ignoredByFlux[ns]; ok {
			continue
		}
		if workloadIgnored(ignoreRules, ns, kind, name) {
			continue
		}
		if workloadTargetsIgnoredNodes(item.Spec.Template.Spec, ignoredNodes) {
			continue
		}
		desired, ready, ok := desiredReady(item)
		if !ok || desired <= 0 {
			continue
		}
		if kind == "daemonset" && desired > ready && len(ignoredNodes) > 0 {
			missing := desired - ready
			if missing <= int32(len(ignoredNodes)) {
				ready = desired
			}
		}
		checked++
		if ready < desired {
			pending = append(pending, fmt.Sprintf("%s/%s/%s ready=%d desired=%d", ns, kind, name, ready, desired))
		}
	}
	if len(pending) > 0 {
		sort.Strings(pending)
		return false, "not ready: " + joinLimited(pending, 8), nil
	}
	return true, fmt.Sprintf("controllers ready=%d", checked), nil
}

// desiredReady extracts the desired and ready replica counts for a controller.
// Deployments and StatefulSets default to one desired replica when
// spec.replicas is unset; DaemonSets use desiredNumberScheduled and
// numberReady. The boolean reports whether the kind is one this check handles.
func desiredReady(item workloadResource) (int32, int32, bool) {
	switch strings.ToLower(strings.TrimSpace(item.Kind)) {
	case "deployment", "statefulset":
		desired := int32(1)
		if item.Spec.Replicas != nil {
			desired = *item.Spec.Replicas
		}
		return desired, item.Status.ReadyReplicas, true
	case "daemonset":
		return item.Status.DesiredNumberScheduled, item.Status.NumberReady, true
	default:
		return 0, 0, false
	}
}
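// Illustrative example (namespace and name invented for documentation): a
// Deployment with spec.replicas=3 and status.readyReplicas=1 yields
// desiredReady -> (3, 1, true), so workloadConvergenceReady reports a pending
// entry such as
//
//	monitoring/deployment/grafana ready=1 desired=3
//
// while an unrecognized kind returns (0, 0, false) and is skipped entirely.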
// recycleStuckControllerPods deletes controller-owned pods that have been stuck
// longer than the configured grace period in a known-bad waiting state
// (image-pull, crash-loop, or container-creation errors) or in Vault agent
// init, so their owning controllers can recreate them. Ignored namespaces,
// workloads, and nodes are skipped.
func (o *Orchestrator) recycleStuckControllerPods(ctx context.Context) error {
	out, err := o.kubectl(ctx, 30*time.Second, "get", "pods", "-A", "-o", "json")
	if err != nil {
		return fmt.Errorf("query pods: %w", err)
	}
	var list podList
	if err := json.Unmarshal([]byte(out), &list); err != nil {
		return fmt.Errorf("decode pods: %w", err)
	}
	ignoredNamespaces := makeStringSet(o.cfg.Startup.IgnoreWorkloadNamespaces)
	ignoredNodes := makeStringSet(o.cfg.Startup.IgnoreUnavailableNodes)
	ignoreRules := parseWorkloadIgnoreRules(o.cfg.Startup.IgnoreWorkloads)
	grace := time.Duration(o.cfg.Startup.StuckPodGraceSeconds) * time.Second
	if grace <= 0 {
		grace = 180 * time.Second
	}
	stuckReasons := map[string]struct{}{
		"ImagePullBackOff":           {},
		"ErrImagePull":               {},
		"CrashLoopBackOff":           {},
		"CreateContainerConfigError": {},
		"CreateContainerError":       {},
	}
	recycled := []string{}
	for _, pod := range list.Items {
		ns := strings.TrimSpace(pod.Metadata.Namespace)
		name := strings.TrimSpace(pod.Metadata.Name)
		if ns == "" || name == "" {
			continue
		}
		if _, ok := ignoredNamespaces[ns]; ok {
			continue
		}
		if workloadIgnored(ignoreRules, ns, "", name) {
			continue
		}
		if podTargetsIgnoredNode(pod, ignoredNodes) {
			continue
		}
		if !podControllerOwned(pod) {
			continue
		}
		age := time.Since(pod.Metadata.CreationTimestamp)
		if !pod.Metadata.CreationTimestamp.IsZero() && age < grace {
			continue
		}
		reason := stuckContainerReason(pod, stuckReasons)
		if reason == "" {
			reason = stuckVaultInitReason(pod, grace)
		}
		if reason == "" {
			continue
		}
		o.log.Printf("warning: recycling stuck pod %s/%s reason=%s age=%s", ns, name, reason, age.Round(time.Second))
		if _, err := o.kubectl(ctx, 30*time.Second, "-n", ns, "delete", "pod", name, "--wait=false"); err != nil && !isNotFoundErr(err) {
			o.log.Printf("warning: recycle pod failed for %s/%s: %v", ns, name, err)
			continue
		}
		recycled = append(recycled, ns+"/"+name)
	}
	if len(recycled) > 0 {
		sort.Strings(recycled)
		o.log.Printf("recycled stuck controller pods (%d): %s", len(recycled), joinLimited(recycled, 10))
		o.noteStartupAutoHeal(fmt.Sprintf("recycled stuck controller pods: %s", joinLimited(recycled, 10)))
	}
	return nil
}

// podControllerOwned reports whether the pod is owned by a ReplicaSet,
// StatefulSet, or DaemonSet, i.e. whether a controller will recreate it after
// deletion.
func podControllerOwned(p podResource) bool {
	for _, owner := range p.Metadata.OwnerReferences {
		switch strings.TrimSpace(owner.Kind) {
		case "ReplicaSet", "StatefulSet", "DaemonSet":
			return true
		}
	}
	return false
}

// stuckContainerReason returns the first waiting reason among the pod's init
// and regular container statuses that appears in the given reason set, or ""
// when none match.
func stuckContainerReason(p podResource, reasons map[string]struct{}) string {
	check := func(statuses []podContainerStatus) string {
		for _, st := range statuses {
			if st.State.Waiting == nil {
				continue
			}
			reason := strings.TrimSpace(st.State.Waiting.Reason)
			if reason == "" {
				continue
			}
			if _, ok := reasons[reason]; ok {
				return reason
			}
		}
		return ""
	}
	if reason := check(p.Status.InitContainerStatuses); reason != "" {
		return reason
	}
	return check(p.Status.ContainerStatuses)
}
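// Illustrative example (namespace and pod name invented): a controller-owned
// pod older than the grace period (180s by default) that is waiting in
// ImagePullBackOff is deleted without waiting for termination and logged
// roughly as
//
//	warning: recycling stuck pod media/app-7d9cb54f6-x2x7q reason=ImagePullBackOff age=4m0s
//
// The owning ReplicaSet, StatefulSet, or DaemonSet then recreates the pod;
// bare pods with no controller owner are never deleted by this path.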
// stuckVaultInitReason returns "VaultInitStuck" when a Pending, Vault-injected
// pod's vault-agent-init init container has been running longer than grace,
// and "" otherwise.
func stuckVaultInitReason(p podResource, grace time.Duration) string {
	if !strings.EqualFold(strings.TrimSpace(p.Status.Phase), "Pending") {
		return ""
	}
	if !strings.EqualFold(strings.TrimSpace(p.Metadata.Annotations["vault.hashicorp.com/agent-inject"]), "true") {
		return ""
	}
	for _, st := range p.Status.InitContainerStatuses {
		if strings.TrimSpace(st.Name) != "vault-agent-init" || st.State.Running == nil {
			continue
		}
		startedAt := st.State.Running.StartedAt
		if startedAt.IsZero() {
			continue
		}
		if time.Since(startedAt) < grace {
			return ""
		}
		return "VaultInitStuck"
	}
	return ""
}
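// stuckVaultInitReason keys on the shape the HashiCorp Vault agent injector
// gives pods: phase Pending, the annotation vault.hashicorp.com/agent-inject
// set to "true", and an init container named vault-agent-init. Illustrative
// status fragment (timestamp invented):
//
//	status:
//	  phase: Pending
//	  initContainerStatuses:
//	  - name: vault-agent-init
//	    state:
//	      running:
//	        startedAt: "2024-01-01T00:00:00Z"
//
// Only when that init container has been running longer than the grace period
// does the pod qualify for recycling.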