package cluster
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"fmt"
|
||
|
|
"sort"
|
||
|
|
"strings"
|
||
|
|
"time"
|
||
|
|
)
|
||
|
|
|
||
|
|
// waitForCriticalServiceEndpoints runs one orchestration or CLI step.
|
||
|
|
// Signature: (o *Orchestrator) waitForCriticalServiceEndpoints(ctx context.Context) error.
|
||
|
|
// Why: some externally-healthy services (like Grafana) still require backend
|
||
|
|
// in-cluster dependencies; endpoint checks catch this drift before startup passes.
|
||
|
|
func (o *Orchestrator) waitForCriticalServiceEndpoints(ctx context.Context) error {
|
||
|
|
wait := time.Duration(o.cfg.Startup.CriticalServiceEndpointWaitSec) * time.Second
|
||
|
|
if wait <= 0 {
|
||
|
|
wait = 7 * time.Minute
|
||
|
|
}
|
||
|
|
poll := time.Duration(o.cfg.Startup.CriticalServiceEndpointPollSec) * time.Second
|
||
|
|
if poll <= 0 {
|
||
|
|
poll = 5 * time.Second
|
||
|
|
}
|
||
|
|
deadline := time.Now().Add(wait)
|
||
|
|
lastFailure := "unknown"
|
||
|
|
lastLogged := time.Time{}
|
||
|
|
lastHealAttempt := time.Time{}
|
||
|
|
|
||
|
|
for {
|
||
|
|
ready, detail, failedNamespace, failedService, err := o.criticalServiceEndpointsReady(ctx)
|
||
|
|
if err != nil {
|
||
|
|
lastFailure = err.Error()
|
||
|
|
} else {
|
||
|
|
lastFailure = detail
|
||
|
|
}
|
||
|
|
if ready {
|
||
|
|
o.log.Printf("critical service endpoint checklist passed (%s)", detail)
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
now := time.Now()
|
||
|
|
if !o.runner.DryRun && failedNamespace != "" && failedService != "" &&
|
||
|
|
(lastHealAttempt.IsZero() || now.Sub(lastHealAttempt) >= 30*time.Second) {
|
||
|
|
lastHealAttempt = now
|
||
|
|
healed, healErr := o.maybeHealCriticalEndpointBackends(ctx, failedNamespace, failedService)
|
||
|
|
if healErr != nil {
|
||
|
|
o.log.Printf("warning: critical endpoint backend auto-heal failed for %s/%s: %v", failedNamespace, failedService, healErr)
|
||
|
|
}
|
||
|
|
if len(healed) > 0 {
|
||
|
|
sort.Strings(healed)
|
||
|
|
healDetail := fmt.Sprintf("restored critical endpoint backends: %s", joinLimited(healed, 8))
|
||
|
|
o.log.Printf("%s", healDetail)
|
||
|
|
o.noteStartupAutoHeal(healDetail)
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
if now.Sub(lastLogged) >= 30*time.Second {
|
||
|
|
remaining := time.Until(deadline).Round(time.Second)
|
||
|
|
if remaining < 0 {
|
||
|
|
remaining = 0
|
||
|
|
}
|
||
|
|
o.log.Printf("waiting for critical service endpoints (%s remaining): %s", remaining, lastFailure)
|
||
|
|
lastLogged = now
|
||
|
|
}
|
||
|
|
if time.Now().After(deadline) {
|
||
|
|
return fmt.Errorf("startup blocked: critical service endpoint checklist not satisfied within %s (%s)", wait, lastFailure)
|
||
|
|
}
|
||
|
|
select {
|
||
|
|
case <-ctx.Done():
|
||
|
|
return ctx.Err()
|
||
|
|
case <-time.After(poll):
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
// criticalServiceEndpointsReady runs one orchestration or CLI step.
|
||
|
|
// Signature: (o *Orchestrator) criticalServiceEndpointsReady(ctx context.Context) (bool, string, string, string, error).
|
||
|
|
// Why: startup should only declare success when critical services have active
|
||
|
|
// backend addresses, not just rendered objects.
|
||
|
|
func (o *Orchestrator) criticalServiceEndpointsReady(ctx context.Context) (bool, string, string, string, error) {
|
||
|
|
entries := o.cfg.Startup.CriticalServiceEndpoints
|
||
|
|
if len(entries) == 0 {
|
||
|
|
return true, "no critical service endpoints configured", "", "", nil
|
||
|
|
}
|
||
|
|
for _, entry := range entries {
|
||
|
|
namespace, service, err := parseCriticalServiceEndpoint(entry)
|
||
|
|
if err != nil {
|
||
|
|
return false, "", "", "", err
|
||
|
|
}
|
||
|
|
count, err := o.endpointAddressCount(ctx, namespace, service)
|
||
|
|
if err != nil {
|
||
|
|
if isNotFoundErr(err) {
|
||
|
|
return false, fmt.Sprintf("%s/%s not found", namespace, service), namespace, service, nil
|
||
|
|
}
|
||
|
|
return false, "", namespace, service, fmt.Errorf("query endpoints %s/%s: %w", namespace, service, err)
|
||
|
|
}
|
||
|
|
if count <= 0 {
|
||
|
|
return false, fmt.Sprintf("%s/%s endpoints=0", namespace, service), namespace, service, nil
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return true, fmt.Sprintf("services=%d", len(entries)), "", "", nil
|
||
|
|
}
// maybeHealCriticalEndpointBackends runs one orchestration or CLI step.
|
||
|
|
// Signature: (o *Orchestrator) maybeHealCriticalEndpointBackends(ctx context.Context, namespace string, service string) ([]string, error).
|
||
|
|
// Why: endpoint-ready gating should include controlled self-healing for workloads
|
||
|
|
// that are configured to be available after startup.
|
||
|
|
func (o *Orchestrator) maybeHealCriticalEndpointBackends(ctx context.Context, namespace string, service string) ([]string, error) {
|
||
|
|
namespace = strings.TrimSpace(namespace)
|
||
|
|
service = strings.TrimSpace(service)
|
||
|
|
if namespace == "" || service == "" {
|
||
|
|
return nil, nil
|
||
|
|
}
|
||
|
|
healed := []string{}
|
||
|
|
for _, kind := range []string{"deployment", "statefulset"} {
|
||
|
|
workload := startupWorkload{Namespace: namespace, Kind: kind, Name: service}
|
||
|
|
if err := o.ensureWorkloadReplicas(ctx, workload, 1); err != nil {
|
||
|
|
if isNotFoundErr(err) {
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
return healed, fmt.Errorf("scale %s/%s/%s to 1: %w", namespace, kind, service, err)
|
||
|
|
}
|
||
|
|
if err := o.waitWorkloadReady(ctx, workload); err != nil {
|
||
|
|
if !isNotFoundErr(err) {
|
||
|
|
return healed, err
|
||
|
|
}
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
healed = append(healed, namespace+"/"+kind+"/"+service)
|
||
|
|
}
|
||
|
|
return healed, nil
|
||
|
|
}
// endpointAddressCount runs one orchestration or CLI step.
|
||
|
|
// Signature: (o *Orchestrator) endpointAddressCount(ctx context.Context, namespace string, service string) (int, error).
|
||
|
|
// Why: endpoint address counts provide an objective service-backend readiness
|
||
|
|
// signal independent of ingress/controller Ready conditions.
|
||
|
|
func (o *Orchestrator) endpointAddressCount(ctx context.Context, namespace string, service string) (int, error) {
|
||
|
|
out, err := o.kubectl(
|
||
|
|
ctx,
|
||
|
|
20*time.Second,
|
||
|
|
"-n",
|
||
|
|
namespace,
|
||
|
|
"get",
|
||
|
|
"endpoints",
|
||
|
|
service,
|
||
|
|
"-o",
|
||
|
|
`jsonpath={range .subsets[*].addresses[*]}{.ip}{"\n"}{end}`,
|
||
|
|
)
|
||
|
|
if err != nil {
|
||
|
|
return 0, err
|
||
|
|
}
|
||
|
|
return len(lines(out)), nil
|
||
|
|
}
// parseCriticalServiceEndpoint splits a "namespace/service" config entry into
// its trimmed namespace and service parts. Only the first slash separates the
// two, so anything after it (including further slashes) belongs to the
// service name. Entries without a slash, or with a blank side, are rejected.
//
// Why: endpoint config should be parsed consistently so validation and runtime
// checks agree on namespace/service identity.
func parseCriticalServiceEndpoint(entry string) (string, string, error) {
	before, after, found := strings.Cut(strings.TrimSpace(entry), "/")
	if !found {
		return "", "", fmt.Errorf("invalid startup.critical_service_endpoints entry %q: expected namespace/service", entry)
	}
	ns := strings.TrimSpace(before)
	svc := strings.TrimSpace(after)
	if ns == "" || svc == "" {
		return "", "", fmt.Errorf("invalid startup.critical_service_endpoints entry %q: namespace and service must be non-empty", entry)
	}
	return ns, svc, nil
}