package cluster

import (
	"context"
	"encoding/json"
	"fmt"
	"sort"
	"strings"
	"time"
)

// waitForFluxHealth blocks until Flux Kustomizations report Ready (per
// fluxHealthReady) or the configured wait window elapses.
//
// Wait window: cfg.Startup.FluxHealthWaitSeconds (default 15m when <= 0),
// possibly widened by adaptiveFluxHealthWait to cover the largest explicit
// kustomization timeout. Poll interval: cfg.Startup.FluxHealthPollSeconds
// (default 5s when <= 0).
//
// While waiting it opportunistically runs self-heal hooks (pod recycle,
// replica heal, immutable-job cleanup). Returns nil on convergence, ctx.Err()
// on cancellation, or an error when the deadline passes without convergence.
func (o *Orchestrator) waitForFluxHealth(ctx context.Context) error {
	wait := time.Duration(o.cfg.Startup.FluxHealthWaitSeconds) * time.Second
	if wait <= 0 {
		wait = 15 * time.Minute
	}
	// Adaptive widening is best-effort: on error we log and keep the configured window.
	if effective, reason, err := o.adaptiveFluxHealthWait(ctx, wait); err != nil {
		o.log.Printf("warning: unable to evaluate adaptive flux wait window: %v", err)
	} else if effective > wait {
		o.log.Printf("adjusted flux convergence wait window from %s to %s (%s)", wait, effective, reason)
		wait = effective
	}
	poll := time.Duration(o.cfg.Startup.FluxHealthPollSeconds) * time.Second
	if poll <= 0 {
		poll = 5 * time.Second
	}
	deadline := time.Now().Add(wait)
	lastFailure := "unknown"
	lastLogged := time.Time{}
	// Per-hook rate-limit timestamps; each hook manages its own cadence.
	lastImmutableHealAttempt := time.Time{}
	lastRecycleAttempt := time.Time{}
	lastReplicaHeal := time.Time{}
	for {
		o.maybeAutoRecycleStuckPods(ctx, &lastRecycleAttempt)
		o.maybeAutoHealCriticalWorkloadReplicas(ctx, &lastReplicaHeal)
		prevFailure := lastFailure
		ready, detail, err := o.fluxHealthReady(ctx)
		if err != nil {
			lastFailure = err.Error()
		} else {
			lastFailure = detail
		}
		if ready {
			o.log.Printf("flux convergence check passed (%s)", detail)
			return nil
		}
		// Immutable-job self-heal: only in non-dry-run mode, at most once per 30s.
		if !o.runner.DryRun && looksLikeImmutableJobError(lastFailure) && time.Since(lastImmutableHealAttempt) >= 30*time.Second {
			lastImmutableHealAttempt = time.Now()
			healed, healErr := o.healImmutableFluxJobs(ctx)
			if healErr != nil {
				o.log.Printf("warning: immutable-job self-heal attempt failed: %v", healErr)
			} else if healed {
				o.log.Printf("detected immutable-job failure and removed stale failed job(s); re-requesting reconcile")
				o.noteStartupAutoHeal("deleted stale failed flux-managed job(s) after immutable template error")
				o.bestEffort("reconcile flux after immutable-job cleanup", func() error { return o.resumeFluxAndReconcile(ctx) })
			}
		}
		// Log on state change, or at least every 30s so progress is visible.
		if lastFailure != prevFailure || time.Since(lastLogged) >= 30*time.Second {
			remaining := time.Until(deadline).Round(time.Second)
			if remaining < 0 {
				remaining = 0
			}
			o.log.Printf("waiting for Flux convergence (%s remaining): %s", remaining, lastFailure)
			lastLogged = time.Now()
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("startup blocked: flux convergence not satisfied within %s (%s)", wait, lastFailure)
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(poll):
		}
	}
}

// adaptiveFluxHealthWait computes an effective wait window that is at least
// `base` (default 15m when <= 0) and wide enough to cover the largest explicit
// spec.timeout among non-suspended Flux Kustomizations, plus a 2-minute margin.
//
// Returns the (possibly widened) duration, a human-readable reason for the
// choice, and an error when the cluster query or JSON decode fails (in which
// case `base` is returned unchanged).
// NOTE(review): o.kubectl appears to run kubectl with a per-call timeout and
// return its stdout — confirm against the helper's definition.
func (o *Orchestrator) adaptiveFluxHealthWait(ctx context.Context, base time.Duration) (time.Duration, string, error) {
	if base <= 0 {
		base = 15 * time.Minute
	}
	out, err := o.kubectl(ctx, 20*time.Second, "get", "kustomizations.kustomize.toolkit.fluxcd.io", "-A", "-o", "json")
	if err != nil {
		return base, "", fmt.Errorf("query flux kustomizations: %w", err)
	}
	var list fluxKustomizationList
	if err := json.Unmarshal([]byte(out), &list); err != nil {
		return base, "", fmt.Errorf("decode flux kustomizations: %w", err)
	}
	// Track the largest explicit timeout and which kustomization carries it.
	maxTimeout := time.Duration(0)
	maxName := ""
	for _, ks := range list.Items {
		if ks.Spec.Suspend {
			continue
		}
		timeout := parseFluxKustomizationTimeout(ks.Spec.Timeout)
		if timeout <= maxTimeout {
			continue
		}
		maxTimeout = timeout
		maxName = strings.TrimSpace(ks.Metadata.Namespace) + "/" + strings.TrimSpace(ks.Metadata.Name)
	}
	if maxTimeout <= 0 {
		return base, "no explicit kustomization timeouts found", nil
	}
	// Add headroom so a kustomization running right up to its own timeout
	// still has a chance to report Ready before we give up.
	required := maxTimeout + 2*time.Minute
	if required <= base {
		return base, fmt.Sprintf("max flux timeout %s on %s", maxTimeout, maxName), nil
	}
	return required, fmt.Sprintf("max flux timeout %s on %s", maxTimeout, maxName), nil
}

// parseFluxKustomizationTimeout parses a Kustomization spec.timeout string
// (Go duration syntax, e.g. "5m0s") into a time.Duration. Empty or
// unparseable input yields 0, which callers treat as "no explicit timeout".
func parseFluxKustomizationTimeout(raw string) time.Duration {
	raw = strings.TrimSpace(raw)
	if raw == "" {
		return 0
	}
	d, err := time.ParseDuration(raw)
	if err != nil {
		return 0
	}
	return d
}

// fluxHealthReady evaluates cluster-wide Flux Kustomization health.
//
// Returns (ready, detail, err):
//   - err is non-nil only when the kubectl query or JSON decode fails;
//   - ready is true when every considered kustomization has a Ready condition
//     with status "True";
//   - detail summarizes the result (ready counts, or a sorted, length-limited
//     list of not-ready / missing entries).
//
// Selection rules: suspended kustomizations are skipped; entries in
// cfg.Startup.IgnoreFluxKustomizations ("ns/name") are skipped; when
// startupRequiredFluxKustomizations returns a non-empty set, ONLY those are
// considered and any required entry absent from the cluster counts as
// "(missing)".
func (o *Orchestrator) fluxHealthReady(ctx context.Context) (bool, string, error) {
	out, err := o.kubectl(ctx, 20*time.Second, "get", "kustomizations.kustomize.toolkit.fluxcd.io", "-A", "-o", "json")
	if err != nil {
		return false, "", fmt.Errorf("query flux kustomizations: %w", err)
	}
	var list fluxKustomizationList
	if err := json.Unmarshal([]byte(out), &list); err != nil {
		return false, "", fmt.Errorf("decode flux kustomizations: %w", err)
	}
	ignored := makeStringSet(o.cfg.Startup.IgnoreFluxKustomizations)
	required := o.startupRequiredFluxKustomizations()
	requiredSeen := map[string]struct{}{}
	notReady := []string{}
	for _, ks := range list.Items {
		ns := strings.TrimSpace(ks.Metadata.Namespace)
		name := strings.TrimSpace(ks.Metadata.Name)
		if ns == "" || name == "" {
			continue
		}
		full := ns + "/" + name
		if ks.Spec.Suspend {
			continue
		}
		if len(required) > 0 {
			if _, ok := required[full]; !ok {
				continue
			}
			// Mark as seen even if it later turns out ignored or not ready,
			// so it is not reported as "(missing)".
			requiredSeen[full] = struct{}{}
		}
		if _, ok := ignored[full]; ok {
			continue
		}
		cond := readyCondition(ks.Status.Conditions)
		if cond != nil && strings.EqualFold(strings.TrimSpace(cond.Status), "True") {
			continue
		}
		// Build the most specific failure reason available:
		// message > reason > generic "ready=false".
		reason := "ready condition missing"
		if cond != nil {
			reason = strings.TrimSpace(cond.Message)
			if reason == "" {
				reason = strings.TrimSpace(cond.Reason)
			}
			if reason == "" {
				reason = "ready=false"
			}
		}
		notReady = append(notReady, fmt.Sprintf("%s(%s)", full, reason))
	}
	if len(required) > 0 {
		missing := []string{}
		for full := range required {
			if _, ok := requiredSeen[full]; !ok {
				missing = append(missing, full+"(missing)")
			}
		}
		if len(missing) > 0 {
			sort.Strings(missing)
			notReady = append(notReady, missing...)
		}
	}
	if len(notReady) > 0 {
		// Sort for deterministic log output; joinLimited caps the list length.
		sort.Strings(notReady)
		return false, "not ready: " + joinLimited(notReady, 6), nil
	}
	if len(required) > 0 {
		return true, fmt.Sprintf("required kustomizations ready=%d", len(requiredSeen)), nil
	}
	return true, fmt.Sprintf("all kustomizations ready=%d", len(list.Items)), nil
}

// looksLikeImmutableJobError reports whether a failure detail string looks
// like the Kubernetes "field is immutable" error produced when Flux tries to
// update a Job's pod template in place. Matching is case-insensitive and
// requires both "field is immutable" and "job" to appear in the text.
func looksLikeImmutableJobError(detail string) bool {
	d := strings.ToLower(strings.TrimSpace(detail))
	if d == "" {
		return false
	}
	return strings.Contains(d, "field is immutable") && strings.Contains(d, "job")
}

// healImmutableFluxJobs deletes failed Flux-managed Jobs so that the next
// reconcile can recreate them with the updated (immutable) template.
// Returns (deletedAny, err); err is non-nil only when the cluster-wide Job
// listing or its JSON decode fails.
func (o *Orchestrator) healImmutableFluxJobs(ctx context.Context) (bool, error) { out, err := o.kubectl(ctx, 25*time.Second, "get", "jobs", "-A", "-o", "json") if err != nil { return false, fmt.Errorf("query jobs: %w", err) } var list jobList if err := json.Unmarshal([]byte(out), &list); err != nil { return false, fmt.Errorf("decode jobs: %w", err) } deleted := []string{} for _, job := range list.Items { ns := strings.TrimSpace(job.Metadata.Namespace) name := strings.TrimSpace(job.Metadata.Name) if ns == "" || name == "" { continue } if !jobLooksFluxManaged(job) { continue } if !jobFailed(job) { continue } o.log.Printf("warning: deleting stale failed flux-managed job %s/%s to recover immutable template drift", ns, name) if _, err := o.kubectl(ctx, 20*time.Second, "-n", ns, "delete", "job", name, "--wait=false"); err != nil && !isNotFoundErr(err) { o.log.Printf("warning: delete failed for stale job %s/%s: %v", ns, name, err) continue } deleted = append(deleted, ns+"/"+name) } if len(deleted) == 0 { return false, nil } sort.Strings(deleted) o.log.Printf("immutable-job cleanup removed %d job(s): %s", len(deleted), joinLimited(deleted, 8)) return true, nil } // jobLooksFluxManaged runs one orchestration or CLI step. // Signature: jobLooksFluxManaged(job jobResource) bool. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. func jobLooksFluxManaged(job jobResource) bool { if strings.TrimSpace(job.Metadata.Labels["kustomize.toolkit.fluxcd.io/name"]) != "" { return true } for _, owner := range job.Metadata.OwnerReferences { if strings.EqualFold(strings.TrimSpace(owner.Kind), "CronJob") { return false } } return false } // jobFailed runs one orchestration or CLI step. // Signature: jobFailed(job jobResource) bool. // Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve. 
func jobFailed(job jobResource) bool { if job.Status.Succeeded > 0 { return false } if job.Status.Failed <= 0 { return false } for _, cond := range job.Status.Conditions { if strings.EqualFold(strings.TrimSpace(cond.Type), "Failed") && strings.EqualFold(strings.TrimSpace(cond.Status), "True") { return true } } return false }