diff --git a/internal/cluster/orchestrator.go b/internal/cluster/orchestrator.go index ffb8187..9ea5da4 100644 --- a/internal/cluster/orchestrator.go +++ b/internal/cluster/orchestrator.go @@ -2267,6 +2267,7 @@ type fluxKustomization struct { } `json:"metadata"` Spec struct { Suspend bool `json:"suspend"` + Timeout string `json:"timeout"` } `json:"spec"` Status struct { Conditions []fluxCondition `json:"conditions"` @@ -3009,6 +3010,12 @@ func (o *Orchestrator) waitForFluxHealth(ctx context.Context) error { if wait <= 0 { wait = 15 * time.Minute } + if effective, reason, err := o.adaptiveFluxHealthWait(ctx, wait); err != nil { + o.log.Printf("warning: unable to evaluate adaptive flux wait window: %v", err) + } else if effective > wait { + o.log.Printf("adjusted flux convergence wait window from %s to %s (%s)", wait, effective, reason) + wait = effective + } poll := time.Duration(o.cfg.Startup.FluxHealthPollSeconds) * time.Second if poll <= 0 { poll = 5 * time.Second @@ -3063,6 +3070,53 @@ func (o *Orchestrator) waitForFluxHealth(ctx context.Context) error { } } +func (o *Orchestrator) adaptiveFluxHealthWait(ctx context.Context, base time.Duration) (time.Duration, string, error) { + if base <= 0 { + base = 15 * time.Minute + } + out, err := o.kubectl(ctx, 20*time.Second, "get", "kustomizations.kustomize.toolkit.fluxcd.io", "-A", "-o", "json") + if err != nil { + return base, "", fmt.Errorf("query flux kustomizations: %w", err) + } + var list fluxKustomizationList + if err := json.Unmarshal([]byte(out), &list); err != nil { + return base, "", fmt.Errorf("decode flux kustomizations: %w", err) + } + maxTimeout := time.Duration(0) + maxName := "" + for _, ks := range list.Items { + if ks.Spec.Suspend { + continue + } + timeout := parseFluxKustomizationTimeout(ks.Spec.Timeout) + if timeout <= maxTimeout { + continue + } + maxTimeout = timeout + maxName = strings.TrimSpace(ks.Metadata.Namespace) + "/" + strings.TrimSpace(ks.Metadata.Name) + } + if maxTimeout <= 0 { + return base, "no explicit kustomization timeouts found", nil + } + required := maxTimeout + 2*time.Minute + if required <= base { + return base, fmt.Sprintf("max flux timeout %s on %s", maxTimeout, maxName), nil + } + return required, fmt.Sprintf("max flux timeout %s on %s", maxTimeout, maxName), nil +} + +func parseFluxKustomizationTimeout(raw string) time.Duration { + raw = strings.TrimSpace(raw) + if raw == "" { + return 0 + } + d, err := time.ParseDuration(raw) + if err != nil { + return 0 + } + return d +} + func (o *Orchestrator) fluxHealthReady(ctx context.Context) (bool, string, error) { out, err := o.kubectl(ctx, 20*time.Second, "get", "kustomizations.kustomize.toolkit.fluxcd.io", "-A", "-o", "json") if err != nil { diff --git a/internal/cluster/orchestrator_test.go b/internal/cluster/orchestrator_test.go index 6a88cc7..c9254ca 100644 --- a/internal/cluster/orchestrator_test.go +++ b/internal/cluster/orchestrator_test.go @@ -172,6 +172,21 @@ func TestProbeStatusAcceptedRejects404(t *testing.T) { } } +func TestParseFluxKustomizationTimeout(t *testing.T) { + if got := parseFluxKustomizationTimeout("30m"); got != 30*time.Minute { + t.Fatalf("expected 30m duration, got %s", got) + } + if got := parseFluxKustomizationTimeout("5m30s"); got != 5*time.Minute+30*time.Second { + t.Fatalf("expected 5m30s duration, got %s", got) + } + if got := parseFluxKustomizationTimeout(""); got != 0 { + t.Fatalf("expected zero duration for empty timeout, got %s", got) + } + if got := parseFluxKustomizationTimeout("not-a-duration"); got != 0 { + t.Fatalf("expected zero duration for invalid timeout, got %s", got) + } +} + func TestServiceCheckReadyRequiresBodyContains(t *testing.T) { srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusOK)