390 lines
13 KiB
Go
390 lines
13 KiB
Go
package cluster
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
"unicode"
|
|
|
|
"scm.bstein.dev/bstein/ananke/internal/config"
|
|
)
|
|
|
|
// isLikelyHostname reports whether value, once surrounding whitespace is
// trimmed, is non-empty, contains neither a space nor a "/", and contains at
// least one ".".
// Signature: isLikelyHostname(value string) bool.
// Why: this is a cheap shape heuristic only; it performs no DNS or URL validation.
func isLikelyHostname(value string) bool {
	candidate := strings.TrimSpace(value)
	switch {
	case candidate == "":
		return false
	case strings.ContainsAny(candidate, " /"):
		// Embedded spaces or path separators rule the value out.
		return false
	default:
		return strings.Contains(candidate, ".")
	}
}
|
|
|
|
// healIngressHostBackendReplicas runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) healIngressHostBackendReplicas(ctx context.Context, host string) ([]string, error).
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) healIngressHostBackendReplicas(ctx context.Context, host string) ([]string, error) {
|
|
namespaces, err := o.discoverIngressNamespacesForHost(ctx, host)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(namespaces) == 0 {
|
|
return nil, nil
|
|
}
|
|
targetNamespaces := makeStringSet(namespaces)
|
|
out, err := o.kubectl(ctx, 30*time.Second, "get", "deploy,statefulset", "-A", "-o", "json")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("query workloads: %w", err)
|
|
}
|
|
var list workloadList
|
|
if err := json.Unmarshal([]byte(out), &list); err != nil {
|
|
return nil, fmt.Errorf("decode workloads: %w", err)
|
|
}
|
|
healed := []string{}
|
|
for _, item := range list.Items {
|
|
kind := strings.ToLower(strings.TrimSpace(item.Kind))
|
|
ns := strings.TrimSpace(item.Metadata.Namespace)
|
|
name := strings.TrimSpace(item.Metadata.Name)
|
|
if kind == "" || ns == "" || name == "" {
|
|
continue
|
|
}
|
|
if kind != "deployment" && kind != "statefulset" {
|
|
continue
|
|
}
|
|
if _, ok := targetNamespaces[ns]; !ok {
|
|
continue
|
|
}
|
|
desired := int32(1)
|
|
if item.Spec.Replicas != nil {
|
|
desired = *item.Spec.Replicas
|
|
}
|
|
if desired >= 1 {
|
|
continue
|
|
}
|
|
workload := startupWorkload{Namespace: ns, Kind: kind, Name: name}
|
|
if err := o.ensureWorkloadReplicas(ctx, workload, 1); err != nil {
|
|
if isNotFoundErr(err) {
|
|
continue
|
|
}
|
|
return healed, fmt.Errorf("scale %s/%s/%s to 1: %w", ns, kind, name, err)
|
|
}
|
|
healed = append(healed, ns+"/"+kind+"/"+name)
|
|
}
|
|
return healed, nil
|
|
}
|
|
|
|
// waitForServiceChecklist runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) waitForServiceChecklist(ctx context.Context) error.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) waitForServiceChecklist(ctx context.Context) error {
|
|
wait := time.Duration(o.cfg.Startup.ServiceChecklistWaitSeconds) * time.Second
|
|
if wait <= 0 {
|
|
wait = 7 * time.Minute
|
|
}
|
|
poll := time.Duration(o.cfg.Startup.ServiceChecklistPollSeconds) * time.Second
|
|
if poll <= 0 {
|
|
poll = 5 * time.Second
|
|
}
|
|
deadline := time.Now().Add(wait)
|
|
lastFailure := "unknown"
|
|
lastLogged := time.Time{}
|
|
lastRecycleAttempt := time.Time{}
|
|
lastReplicaHeal := time.Time{}
|
|
lastIngressHeal := time.Time{}
|
|
for {
|
|
o.maybeAutoRecycleStuckPods(ctx, &lastRecycleAttempt)
|
|
o.maybeAutoHealCriticalWorkloadReplicas(ctx, &lastReplicaHeal)
|
|
prevFailure := lastFailure
|
|
ready, detail := o.serviceChecklistReady(ctx)
|
|
lastFailure = detail
|
|
if ready {
|
|
o.log.Printf("external service checklist passed (%s)", detail)
|
|
return nil
|
|
}
|
|
o.maybeAutoHealIngressHostBackends(ctx, &lastIngressHeal, lastFailure)
|
|
if lastFailure != prevFailure || time.Since(lastLogged) >= 30*time.Second {
|
|
remaining := time.Until(deadline).Round(time.Second)
|
|
if remaining < 0 {
|
|
remaining = 0
|
|
}
|
|
o.log.Printf("waiting for external service checklist (%s remaining): %s", remaining, lastFailure)
|
|
lastLogged = time.Now()
|
|
}
|
|
if time.Now().After(deadline) {
|
|
return fmt.Errorf("startup blocked: external service checklist not satisfied within %s (%s)", wait, lastFailure)
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-time.After(poll):
|
|
}
|
|
}
|
|
}
|
|
|
|
// serviceChecklistReady runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) serviceChecklistReady(ctx context.Context) (bool, string).
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) serviceChecklistReady(ctx context.Context) (bool, string) {
|
|
checks := o.cfg.Startup.ServiceChecklist
|
|
if len(checks) == 0 {
|
|
return true, "no checklist items configured"
|
|
}
|
|
for _, check := range checks {
|
|
ok, detail := o.serviceCheckReady(ctx, check)
|
|
if !ok {
|
|
name := strings.TrimSpace(check.Name)
|
|
if name == "" {
|
|
name = strings.TrimSpace(check.URL)
|
|
}
|
|
return false, fmt.Sprintf("%s: %s", name, detail)
|
|
}
|
|
}
|
|
return true, fmt.Sprintf("checks=%d", len(checks))
|
|
}
|
|
|
|
// serviceCheckReady runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) serviceCheckReady(ctx context.Context, check config.ServiceChecklistCheck) (bool, string).
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) serviceCheckReady(ctx context.Context, check config.ServiceChecklistCheck) (bool, string) {
|
|
result, err := o.httpChecklistProbeResult(ctx, check)
|
|
if err != nil {
|
|
return false, err.Error()
|
|
}
|
|
|
|
accepted := check.AcceptedStatuses
|
|
if len(accepted) == 0 {
|
|
accepted = []int{200, 201, 202, 203, 204, 301, 302, 303, 307, 308, 401, 403}
|
|
}
|
|
statusOk := false
|
|
for _, code := range accepted {
|
|
if result.Status == code {
|
|
statusOk = true
|
|
break
|
|
}
|
|
}
|
|
if !statusOk {
|
|
return false, fmt.Sprintf("unexpected status code=%d", result.Status)
|
|
}
|
|
|
|
locationContains := strings.TrimSpace(check.LocationContains)
|
|
if locationContains != "" && !checklistContains(result.Location, locationContains) {
|
|
return false, fmt.Sprintf("location header missing expected marker %q", locationContains)
|
|
}
|
|
|
|
locationNotContains := strings.TrimSpace(check.LocationNotContains)
|
|
if locationNotContains != "" && checklistContains(result.Location, locationNotContains) {
|
|
return false, fmt.Sprintf("location header contained forbidden marker %q", locationNotContains)
|
|
}
|
|
|
|
bodyContains := strings.TrimSpace(check.BodyContains)
|
|
if bodyContains != "" && !checklistContains(result.Body, bodyContains) {
|
|
return false, fmt.Sprintf("response missing expected marker %q", bodyContains)
|
|
}
|
|
|
|
bodyNotContains := strings.TrimSpace(check.BodyNotContains)
|
|
if bodyNotContains != "" && checklistContains(result.Body, bodyNotContains) {
|
|
return false, fmt.Sprintf("response contained forbidden marker %q", bodyNotContains)
|
|
}
|
|
|
|
return true, fmt.Sprintf("status=%d", result.Status)
|
|
}
|
|
|
|
// checklistHTTPProbeResult carries what one checklist HTTP probe observed:
// the response status code, the response body text, and the value of the
// Location response header.
type checklistHTTPProbeResult struct {
	Status         int
	Body, Location string
}
|
|
|
|
// httpChecklistProbeResult runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) httpChecklistProbeResult(ctx context.Context, check config.ServiceChecklistCheck) (checklistHTTPProbeResult, error).
|
|
// Why: checklist checks need response headers (for redirect verification) in
|
|
// addition to status/body so startup can validate real user-facing behavior.
|
|
func (o *Orchestrator) httpChecklistProbeResult(ctx context.Context, check config.ServiceChecklistCheck) (checklistHTTPProbeResult, error) {
|
|
result := checklistHTTPProbeResult{}
|
|
status, body, location, err := o.httpChecklistProbeWithLocation(ctx, check)
|
|
if err != nil {
|
|
return result, err
|
|
}
|
|
result.Status = status
|
|
result.Body = body
|
|
result.Location = location
|
|
return result, nil
|
|
}
|
|
|
|
// httpChecklistProbe runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) httpChecklistProbe(ctx context.Context, check config.ServiceChecklistCheck) (int, string, error).
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) httpChecklistProbe(ctx context.Context, check config.ServiceChecklistCheck) (int, string, error) {
|
|
status, body, _, err := o.httpChecklistProbeWithLocation(ctx, check)
|
|
return status, body, err
|
|
}
|
|
|
|
// httpChecklistProbeWithLocation runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) httpChecklistProbeWithLocation(ctx context.Context, check config.ServiceChecklistCheck) (int, string, string, error).
|
|
// Why: redirects and auth gates require location-header assertions to prevent
|
|
// startup false-positives on partially healthy protected services.
|
|
func (o *Orchestrator) httpChecklistProbeWithLocation(ctx context.Context, check config.ServiceChecklistCheck) (int, string, string, error) {
|
|
timeout := time.Duration(check.TimeoutSeconds) * time.Second
|
|
if timeout <= 0 {
|
|
timeout = 12 * time.Second
|
|
}
|
|
|
|
transport := &http.Transport{}
|
|
if check.InsecureSkipTLS {
|
|
transport.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
|
|
}
|
|
client := &http.Client{
|
|
Timeout: timeout,
|
|
Transport: transport,
|
|
CheckRedirect: func(_ *http.Request, _ []*http.Request) error {
|
|
return http.ErrUseLastResponse
|
|
},
|
|
}
|
|
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, strings.TrimSpace(check.URL), nil)
|
|
if err != nil {
|
|
return 0, "", "", fmt.Errorf("build request: %w", err)
|
|
}
|
|
req.Header.Set("User-Agent", "ananke/startup-checklist")
|
|
|
|
resp, err := client.Do(req)
|
|
if err != nil {
|
|
return 0, "", "", fmt.Errorf("request failed: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
body, readErr := io.ReadAll(io.LimitReader(resp.Body, 64*1024))
|
|
if readErr != nil {
|
|
return resp.StatusCode, "", "", fmt.Errorf("read response body: %w", readErr)
|
|
}
|
|
|
|
return resp.StatusCode, string(body), strings.TrimSpace(resp.Header.Get("Location")), nil
|
|
}
|
|
|
|
// checklistContains runs one orchestration or CLI step.
|
|
// Signature: checklistContains(body, marker string) bool.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func checklistContains(body, marker string) bool {
|
|
bodyLower := strings.ToLower(body)
|
|
markerLower := strings.ToLower(marker)
|
|
if strings.Contains(bodyLower, markerLower) {
|
|
return true
|
|
}
|
|
bodyCompact := compactLowerNoSpace(bodyLower)
|
|
markerCompact := compactLowerNoSpace(markerLower)
|
|
if markerCompact == "" {
|
|
return true
|
|
}
|
|
return strings.Contains(bodyCompact, markerCompact)
|
|
}
|
|
|
|
// compactLowerNoSpace returns s with every rune for which unicode.IsSpace is
// true removed; all other runes are kept in order.
// Signature: compactLowerNoSpace(s string) string.
// Why: despite the name, no case conversion happens here; checklistContains
// passes in text it has already lowercased.
func compactLowerNoSpace(s string) string {
	dropSpace := func(r rune) rune {
		if unicode.IsSpace(r) {
			return -1 // a negative result tells strings.Map to omit the rune
		}
		return r
	}
	return strings.Map(dropSpace, s)
}
|
|
|
|
// waitForStabilityWindow runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) waitForStabilityWindow(ctx context.Context) error.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) waitForStabilityWindow(ctx context.Context) error {
|
|
window := time.Duration(o.cfg.Startup.ServiceChecklistStabilitySec) * time.Second
|
|
if window <= 0 {
|
|
return nil
|
|
}
|
|
poll := time.Duration(o.cfg.Startup.ServiceChecklistPollSeconds) * time.Second
|
|
if poll <= 0 {
|
|
poll = 5 * time.Second
|
|
}
|
|
deadline := time.Now().Add(window)
|
|
lastStatus := time.Time{}
|
|
lastRecycleAttempt := time.Time{}
|
|
lastReplicaHeal := time.Time{}
|
|
|
|
for {
|
|
o.maybeAutoRecycleStuckPods(ctx, &lastRecycleAttempt)
|
|
o.maybeAutoHealCriticalWorkloadReplicas(ctx, &lastReplicaHeal)
|
|
if err := o.startupStabilityHealthy(ctx); err != nil {
|
|
return fmt.Errorf("startup stability window failed: %w", err)
|
|
}
|
|
if time.Now().After(deadline) {
|
|
o.log.Printf("startup stability window passed (%s)", window)
|
|
return nil
|
|
}
|
|
if time.Since(lastStatus) >= 30*time.Second {
|
|
remaining := time.Until(deadline).Round(time.Second)
|
|
if remaining < 0 {
|
|
remaining = 0
|
|
}
|
|
o.log.Printf("startup stability soak in progress (%s remaining)", remaining)
|
|
lastStatus = time.Now()
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-time.After(poll):
|
|
}
|
|
}
|
|
}
|
|
|
|
// startupStabilityHealthy runs one orchestration or CLI step.
|
|
// Signature: (o *Orchestrator) startupStabilityHealthy(ctx context.Context) error.
|
|
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
|
|
func (o *Orchestrator) startupStabilityHealthy(ctx context.Context) error {
|
|
if o.cfg.Startup.RequireFluxHealth {
|
|
ready, detail, err := o.fluxHealthReady(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("flux check error: %w", err)
|
|
}
|
|
if !ready {
|
|
return fmt.Errorf("flux not ready: %s", detail)
|
|
}
|
|
}
|
|
if o.cfg.Startup.RequireWorkloadConvergence {
|
|
ready, detail, err := o.workloadConvergenceReady(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("workload check error: %w", err)
|
|
}
|
|
if !ready {
|
|
return fmt.Errorf("workloads not converged: %s", detail)
|
|
}
|
|
}
|
|
if o.cfg.Startup.RequireServiceChecklist {
|
|
ready, detail := o.serviceChecklistReady(ctx)
|
|
if !ready {
|
|
return fmt.Errorf("external services not healthy: %s", detail)
|
|
}
|
|
}
|
|
if o.cfg.Startup.RequireIngressChecklist {
|
|
ready, detail := o.ingressChecklistReady(ctx)
|
|
if !ready {
|
|
return fmt.Errorf("ingress reachability not healthy: %s", detail)
|
|
}
|
|
}
|
|
failures, err := o.startupFailurePods(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("pod failure check error: %w", err)
|
|
}
|
|
if len(failures) > 0 {
|
|
return fmt.Errorf("pods in crash/image-pull failures: %s", joinLimited(failures, 8))
|
|
}
|
|
return nil
|
|
}
|