package cluster

import (
	"context"
	"encoding/json"
	"fmt"
	"sort"
	"strings"
	"time"
)

// waitForWorkloadConvergence polls workloadConvergenceReady until it reports
// ready, the wait budget runs out, or ctx is cancelled.
// Signature: (o *Orchestrator) waitForWorkloadConvergence(ctx context.Context) error.
// Why: startup is reported as blocked when the controllers never converge, and
// every poll first invokes the maybeAuto* helpers so automatic healing gets a
// chance to run before the next readiness check.
//
// The wait budget and poll interval are read from
// cfg.Startup.WorkloadConvergenceWaitSeconds and
// cfg.Startup.WorkloadConvergencePollSeconds; non-positive values fall back to
// 15 minutes and 5 seconds. It returns nil once the check passes, ctx.Err()
// when ctx is done during the poll sleep, and a "startup blocked" error once
// the deadline has passed without a passing check.
func (o *Orchestrator) waitForWorkloadConvergence(ctx context.Context) error {
	const (
		fallbackWait  = 15 * time.Minute
		fallbackPoll  = 5 * time.Second
		progressEvery = 30 * time.Second
	)

	wait := time.Duration(o.cfg.Startup.WorkloadConvergenceWaitSeconds) * time.Second
	if wait <= 0 {
		wait = fallbackWait
	}
	poll := time.Duration(o.cfg.Startup.WorkloadConvergencePollSeconds) * time.Second
	if poll <= 0 {
		poll = fallbackPoll
	}
	deadline := time.Now().Add(wait)

	// These timestamps outlive a single iteration; each helper receives a
	// pointer to its own (presumably for rate limiting — confirm in the helpers).
	var (
		loggedAt      time.Time
		recycledAt    time.Time
		replicaHealAt time.Time
		stormHealAt   time.Time
	)
	status := "unknown"

	for {
		previous := status

		o.maybeAutoRecycleStuckPods(ctx, &recycledAt)
		o.maybeAutoHealCriticalWorkloadReplicas(ctx, &replicaHealAt)
		o.maybeAutoQuarantineSchedulingStorms(ctx, &stormHealAt)

		ready, detail, err := o.workloadConvergenceReady(ctx)
		status = detail
		if err != nil {
			status = err.Error()
		}
		if ready {
			o.log.Printf("workload convergence check passed (%s)", detail)
			return nil
		}

		// Log when the failure text changes, otherwise at most every 30 seconds.
		if status != previous || time.Since(loggedAt) >= progressEvery {
			left := time.Until(deadline).Round(time.Second)
			if left < 0 {
				left = 0
			}
			o.log.Printf("waiting for workload convergence (%s remaining): %s", left, status)
			loggedAt = time.Now()
		}

		if time.Now().After(deadline) {
			return fmt.Errorf("startup blocked: workload convergence not satisfied within %s (%s)", wait, status)
		}

		select {
		case <-time.After(poll):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// workloadConvergenceReady reports whether every tracked controller has its desired replicas ready.
// Signature: (o *Orchestrator) workloadConvergenceReady(ctx context.Context) (bool, string, error).
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) workloadConvergenceReady(ctx context.Context) (bool, string, error) { out, err := o.kubectl(ctx, 30*time.Second, "get", "deploy,statefulset,daemonset", "-A", "-o", "json") if err != nil { return false, "", fmt.Errorf("query controllers: %w", err) } var list workloadList if err := json.Unmarshal([]byte(out), &list); err != nil { return false, "", fmt.Errorf("decode controllers: %w", err) } requiredNamespaces := o.startupRequiredWorkloadNamespaces() ignoredNamespaces := makeStringSet(o.cfg.Startup.IgnoreWorkloadNamespaces) ignoredNodes := makeStringSet(o.cfg.Startup.IgnoreUnavailableNodes) ignoreRules := parseWorkloadIgnoreRules(o.cfg.Startup.IgnoreWorkloads) ignoredByFlux := namespaceCandidatesFromIgnoreKustomizations(o.cfg.Startup.IgnoreFluxKustomizations) pending := []string{} checked := 0 for _, item := range list.Items { kind := strings.ToLower(strings.TrimSpace(item.Kind)) ns := strings.TrimSpace(item.Metadata.Namespace) name := strings.TrimSpace(item.Metadata.Name) if kind == "" || ns == "" || name == "" { continue } if len(requiredNamespaces) > 0 { if _, ok := requiredNamespaces[ns]; !ok { continue } } if _, ok := ignoredNamespaces[ns]; ok { continue } if _, ok := ignoredByFlux[ns]; ok { continue } if workloadIgnored(ignoreRules, ns, kind, name) { continue } if workloadTargetsIgnoredNodes(item.Spec.Template.Spec, ignoredNodes) { continue } desired, ready, ok := desiredReady(item) if !ok || desired <= 0 { continue } if kind == "daemonset" && desired > ready && len(ignoredNodes) > 0 { missing := desired - ready if missing <= int32(len(ignoredNodes)) { ready = desired } } checked++ if ready < desired { pending = append(pending, fmt.Sprintf("%s/%s/%s ready=%d desired=%d", ns, kind, name, ready, desired)) } } if len(pending) > 0 { sort.Strings(pending) return false, "not ready: " + joinLimited(pending, 8), nil } return true, 
fmt.Sprintf("controllers ready=%d", checked), nil } // desiredReady runs one orchestration or CLI step. // Signature: desiredReady(item workloadResource) (int32, int32, bool). // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func desiredReady(item workloadResource) (int32, int32, bool) { switch strings.ToLower(strings.TrimSpace(item.Kind)) { case "deployment", "statefulset": desired := int32(1) if item.Spec.Replicas != nil { desired = *item.Spec.Replicas } return desired, item.Status.ReadyReplicas, true case "daemonset": return item.Status.DesiredNumberScheduled, item.Status.NumberReady, true default: return 0, 0, false } } // recycleStuckControllerPods runs one orchestration or CLI step. // Signature: (o *Orchestrator) recycleStuckControllerPods(ctx context.Context) error. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func (o *Orchestrator) recycleStuckControllerPods(ctx context.Context) error { out, err := o.kubectl(ctx, 30*time.Second, "get", "pods", "-A", "-o", "json") if err != nil { return fmt.Errorf("query pods: %w", err) } var list podList if err := json.Unmarshal([]byte(out), &list); err != nil { return fmt.Errorf("decode pods: %w", err) } ignoredNamespaces := makeStringSet(o.cfg.Startup.IgnoreWorkloadNamespaces) ignoredNodes := makeStringSet(o.cfg.Startup.IgnoreUnavailableNodes) ignoreRules := parseWorkloadIgnoreRules(o.cfg.Startup.IgnoreWorkloads) grace := time.Duration(o.cfg.Startup.StuckPodGraceSeconds) * time.Second if grace <= 0 { grace = 180 * time.Second } stuckReasons := map[string]struct{}{ "ImagePullBackOff": {}, "ErrImagePull": {}, "CrashLoopBackOff": {}, "CreateContainerConfigError": {}, "CreateContainerError": {}, } longhornAttachReasons := map[string]string{} if reasons, scanErr := o.longhornAttachBlockedPodReasons(ctx, list, grace); scanErr != nil { o.log.Printf("warning: longhorn attach-blocked pod scan failed: %v", scanErr) } else 
{ longhornAttachReasons = reasons } encryptedMountReasons := map[string]string{} if reasons, scanErr := o.repairEncryptedVolumeMountPrereqs(ctx, list, grace); scanErr != nil { o.log.Printf("warning: encrypted volume mount prerequisite scan failed: %v", scanErr) } else { encryptedMountReasons = reasons } stalePhaseReasons := map[string]string{} if reasons, scanErr := o.staleControllerPodReasons(ctx, list, grace); scanErr != nil { o.log.Printf("warning: stale controller pod scan failed: %v", scanErr) } else { stalePhaseReasons = reasons } containerRuntimeWedgeReasons := map[string]string{} if reasons, scanErr := o.containerRuntimeWedgePodReasons(ctx, list, grace); scanErr != nil { o.log.Printf("warning: container runtime wedge scan failed: %v", scanErr) } else { containerRuntimeWedgeReasons = reasons o.quarantineContainerRuntimeWedgeNodes(ctx, list, reasons, grace, ignoredNamespaces, ignoredNodes, ignoreRules) } recycled := []string{} for _, pod := range list.Items { ns := strings.TrimSpace(pod.Metadata.Namespace) name := strings.TrimSpace(pod.Metadata.Name) if ns == "" || name == "" { continue } if _, ok := ignoredNamespaces[ns]; ok { continue } if workloadIgnored(ignoreRules, ns, "", name) { continue } if podTargetsIgnoredNode(pod, ignoredNodes) { continue } if !podControllerOwned(pod) { continue } age := time.Since(pod.Metadata.CreationTimestamp) if !pod.Metadata.CreationTimestamp.IsZero() && age < grace { continue } reason := stuckContainerReason(pod, stuckReasons) if reason == "" { reason = stuckVaultInitReason(pod, grace) } if reason == "" { reason = longhornAttachReasons[ns+"/"+name] } if reason == "" { reason = encryptedMountReasons[ns+"/"+name] } if reason == "" { reason = stalePhaseReasons[ns+"/"+name] } if runtimeReason := containerRuntimeWedgeReasons[ns+"/"+name]; runtimeReason != "" { reason = runtimeReason } if reason == "" && staleControllerPodForceDeleteSafe(pod, grace) { reason = "StaleDeletingControllerPod" } if reason == "" { continue } deleteArgs 
:= []string{"-n", ns, "delete", "pod", name, "--wait=false"} forceDelete := staleControllerPodForceDeleteSafe(pod, grace) if forceDelete { deleteArgs = append(deleteArgs, "--grace-period=0", "--force") } if forceDelete { o.log.Printf("warning: force recycling stuck pod %s/%s reason=%s age=%s", ns, name, reason, age.Round(time.Second)) } else { o.log.Printf("warning: recycling stuck pod %s/%s reason=%s age=%s", ns, name, reason, age.Round(time.Second)) } if _, err := o.kubectl(ctx, 30*time.Second, deleteArgs...); err != nil && !isNotFoundErr(err) { o.log.Printf("warning: recycle pod failed for %s/%s: %v", ns, name, err) continue } recycled = append(recycled, ns+"/"+name) } if len(recycled) > 0 { sort.Strings(recycled) o.log.Printf("recycled stuck controller pods (%d): %s", len(recycled), joinLimited(recycled, 10)) o.noteStartupAutoHeal(fmt.Sprintf("recycled stuck controller pods: %s", joinLimited(recycled, 10))) } return nil } // quarantineContainerRuntimeWedgeNodesFromCluster runs one orchestration or CLI step. // Signature: (o *Orchestrator) quarantineContainerRuntimeWedgeNodesFromCluster(ctx context.Context) ([]string, error). // Why: worker startup needs this scan before SSH-heavy steps so a Ready but // container-runtime-wedged node cannot stall the whole recovery run. 
func (o *Orchestrator) quarantineContainerRuntimeWedgeNodesFromCluster(ctx context.Context) ([]string, error) { out, err := o.kubectl(ctx, 30*time.Second, "get", "pods", "-A", "-o", "json") if err != nil { return nil, fmt.Errorf("query pods for container runtime wedge scan: %w", err) } var list podList if err := json.Unmarshal([]byte(out), &list); err != nil { return nil, fmt.Errorf("decode pods for container runtime wedge scan: %w", err) } grace := time.Duration(o.cfg.Startup.StuckPodGraceSeconds) * time.Second if grace <= 0 { grace = 180 * time.Second } reasons, err := o.containerRuntimeWedgePodReasons(ctx, list, grace) if err != nil { return nil, err } ignoredNamespaces := makeStringSet(o.cfg.Startup.IgnoreWorkloadNamespaces) ignoredNodes := makeStringSet(o.cfg.Startup.IgnoreUnavailableNodes) ignoreRules := parseWorkloadIgnoreRules(o.cfg.Startup.IgnoreWorkloads) return o.quarantineContainerRuntimeWedgeNodes(ctx, list, reasons, grace, ignoredNamespaces, ignoredNodes, ignoreRules), nil } // containerRuntimeWedgePodReasons runs one orchestration or CLI step. // Signature: (o *Orchestrator) containerRuntimeWedgePodReasons(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error). // Why: after a power event, a node-local container runtime can reserve names and // fail every new container start while Kubernetes still reports the node Ready. // Detecting the runtime symptom lets startup move work elsewhere without // restarting the node or touching storage objects. 
func (o *Orchestrator) containerRuntimeWedgePodReasons(ctx context.Context, pods podList, grace time.Duration) (map[string]string, error) { eventsOut, err := o.kubectl(ctx, 30*time.Second, "get", "events", "-A", "-o", "json") if err != nil { return nil, fmt.Errorf("query events for container runtime wedge scan: %w", err) } var events eventList if err := json.Unmarshal([]byte(eventsOut), &events); err != nil { return nil, fmt.Errorf("decode events for container runtime wedge scan: %w", err) } runtimeReasons := map[string]struct{}{ "CreateContainerError": {}, "RunContainerError": {}, } podsByKey := map[string]podResource{} for _, pod := range pods.Items { ns := strings.TrimSpace(pod.Metadata.Namespace) name := strings.TrimSpace(pod.Metadata.Name) node := strings.TrimSpace(pod.Spec.NodeName) if ns == "" || name == "" || node == "" { continue } if !strings.EqualFold(strings.TrimSpace(pod.Status.Phase), "Pending") { continue } if !podControllerOwned(pod) { continue } if !pod.Metadata.CreationTimestamp.IsZero() && time.Since(pod.Metadata.CreationTimestamp) < grace { continue } if stuckContainerReason(pod, runtimeReasons) == "" { continue } podsByKey[ns+"/"+name] = pod } if len(podsByKey) == 0 { return map[string]string{}, nil } reasons := map[string]string{} for _, event := range events.Items { if !strings.EqualFold(strings.TrimSpace(event.Type), "Warning") { continue } if strings.TrimSpace(event.Reason) != "Failed" { continue } if !strings.EqualFold(strings.TrimSpace(event.InvolvedObject.Kind), "Pod") { continue } key := strings.TrimSpace(event.InvolvedObject.Namespace) + "/" + strings.TrimSpace(event.InvolvedObject.Name) pod, ok := podsByKey[key] if !ok { continue } lastSeen := eventLastObservedAt(event) if !lastSeen.IsZero() && !pod.Metadata.CreationTimestamp.IsZero() && lastSeen.Before(pod.Metadata.CreationTimestamp) { continue } message := strings.ToLower(strings.TrimSpace(event.Message)) if !strings.Contains(message, "failed to reserve container name") && 
!strings.Contains(message, " is reserved for ") && !strings.Contains(message, "context deadline exceeded") { continue } reasons[key] = "ContainerRuntimeWedge:" + strings.TrimSpace(pod.Spec.NodeName) } return reasons, nil } // quarantineContainerRuntimeWedgeNodes runs one orchestration or CLI step. // Signature: (o *Orchestrator) quarantineContainerRuntimeWedgeNodes(ctx context.Context, pods podList, reasons map[string]string, grace time.Duration, ignoredNamespaces map[string]struct{}, ignoredNodes map[string]struct{}, ignoreRules []workloadIgnoreRule) []string. // Why: cordoning a proven-bad start node is scheduler-only; it prevents fresh // non-storage pods from being trapped while leaving running workloads and // Longhorn data-plane state alone. func (o *Orchestrator) quarantineContainerRuntimeWedgeNodes(ctx context.Context, pods podList, reasons map[string]string, grace time.Duration, ignoredNamespaces map[string]struct{}, ignoredNodes map[string]struct{}, ignoreRules []workloadIgnoreRule) []string { if len(reasons) == 0 { return nil } const minRuntimeWedgePodsPerNode = 2 byNode := map[string][]string{} for _, pod := range pods.Items { ns := strings.TrimSpace(pod.Metadata.Namespace) name := strings.TrimSpace(pod.Metadata.Name) node := strings.TrimSpace(pod.Spec.NodeName) if ns == "" || name == "" || node == "" { continue } key := ns + "/" + name if reasons[key] == "" { continue } if _, ok := ignoredNamespaces[ns]; ok { continue } if workloadIgnored(ignoreRules, ns, "", name) { continue } if podTargetsIgnoredNode(pod, ignoredNodes) { continue } if !podControllerOwned(pod) { continue } if !pod.Metadata.CreationTimestamp.IsZero() && time.Since(pod.Metadata.CreationTimestamp) < grace { continue } if podUsesPersistentVolumeClaim(pod) { continue } byNode[node] = append(byNode[node], key) } quarantined := []string{} for node, keys := range byNode { if len(keys) < minRuntimeWedgePodsPerNode { continue } sort.Strings(keys) detail := fmt.Sprintf("pods=%d %s", len(keys), 
joinLimited(keys, 8)) if err := o.cordonNodeWithLease(ctx, node, cordonReasonRuntimeWedge, detail); err != nil { o.log.Printf("warning: cordon container-runtime-wedged node %s failed: %v", node, err) continue } o.log.Printf("warning: cordoned node %s after repeated container runtime start failures: %s", node, joinLimited(keys, 8)) quarantined = append(quarantined, fmt.Sprintf("%s pods=%d", node, len(keys))) } if len(quarantined) == 0 { return nil } sort.Strings(quarantined) o.noteStartupAutoHeal(fmt.Sprintf("cordoned container-runtime-wedged node(s): %s", joinLimited(quarantined, 8))) nodes := make([]string, 0, len(quarantined)) for _, item := range quarantined { fields := strings.Fields(item) if len(fields) > 0 { nodes = append(nodes, fields[0]) } } return nodes }