ananke/internal/cluster/orchestrator_core.go

125 lines
4.5 KiB
Go

package cluster
import (
"context"
"errors"
"log"
"regexp"
"sync"
"time"
"scm.bstein.dev/bstein/ananke/internal/config"
"scm.bstein.dev/bstein/ananke/internal/execx"
"scm.bstein.dev/bstein/ananke/internal/state"
)
// commandOverrideFunc is the signature of the optional command runners that
// SetCommandOverrides installs on an Orchestrator. It receives a context, a
// timeout duration, a command name and its arguments, and returns a string
// result and an error.
type commandOverrideFunc func(timeoutCtx context.Context, timeout time.Duration, name string, args ...string) (string, error)

// Orchestrator bundles the configuration and collaborators (command runner,
// state store, logger) used by the cluster startup/shutdown workflows.
//
// Construct it with New. It contains a sync.Mutex field, so a value must not
// be copied after first use; always pass *Orchestrator.
type Orchestrator struct {
	cfg    config.Config
	runner *execx.Runner
	store  *state.Store
	log    *log.Logger

	// Optional replacements installed through SetCommandOverrides for
	// deterministic integration testing. Both use the shared
	// commandOverrideFunc type instead of repeating the signature inline.
	// NOTE(review): which command paths consult each override is decided
	// elsewhere in the package — confirm at the call sites.
	runOverride          commandOverrideFunc
	runSensitiveOverride commandOverrideFunc

	// NOTE(review): startupReportMu presumably guards activeStartupReport;
	// confirm against the methods that read/write the report.
	startupReportMu     sync.Mutex
	activeStartupReport *startupReport
}
type (
	// StartupOptions carries caller-supplied settings for a startup run.
	// NOTE(review): field meanings are inferred from their names only;
	// confirm against the startup workflow that consumes them.
	StartupOptions struct {
		ForceFluxBranch    string
		SkipLocalBootstrap bool
		Reason             string
	}

	// ShutdownOptions carries caller-supplied settings for a shutdown run.
	// NOTE(review): the accepted values of Mode are not visible in this
	// chunk — document them once confirmed.
	ShutdownOptions struct {
		SkipEtcdSnapshot bool
		SkipDrain        bool
		Mode             string
		Reason           string
	}

	// EtcdRestoreOptions names the control-plane node and the snapshot file
	// for an etcd restore.
	// NOTE(review): whether SnapshotPath is local to the caller or to the
	// control-plane node is not visible here — confirm.
	EtcdRestoreOptions struct {
		ControlPlane string
		SnapshotPath string
	}
)
type (
	// startupWorkload identifies one workload by namespace, kind and name.
	// Kind values used in this file are lowercase resource kinds such as
	// "deployment" and "statefulset".
	startupWorkload struct {
		Namespace string
		Kind      string
		Name      string
	}

	// workloadScaleEntry records a workload identity together with a replica
	// count, serialized with snake_case JSON keys.
	workloadScaleEntry struct {
		Namespace string `json:"namespace"`
		Kind      string `json:"kind"`
		Name      string `json:"name"`
		Replicas  int    `json:"replicas"`
	}
)
type (
	// remotePeerStatus pairs a peer's recorded state.Intent with a flag
	// reporting whether a bootstrap is active there.
	// NOTE(review): how this is populated (e.g. read from a remote host) is
	// not visible in this chunk — confirm at the producer.
	remotePeerStatus struct {
		Intent          state.Intent
		BootstrapActive bool
	}

	// workloadScaleSnapshot is a timestamped, JSON-serializable list of
	// workloadScaleEntry values.
	// NOTE(review): presumably captured before scaling workloads down so the
	// replica counts can be restored later — confirm with its writers.
	workloadScaleSnapshot struct {
		GeneratedAt time.Time            `json:"generated_at"`
		Entries     []workloadScaleEntry `json:"entries"`
	}
)
type (
	// startupReport is the JSON-serializable progress/outcome record of a
	// startup run.
	startupReport struct {
		// StartedAt and Completed are plain time.Time values; encoding/json
		// has no empty form for them, so an unset timestamp serializes as
		// "0001-01-01T00:00:00Z". Note the JSON key for Completed is
		// "completed_at".
		StartedAt time.Time `json:"started_at"`
		Completed time.Time `json:"completed_at"`

		Reason  string `json:"reason"`
		Status  string `json:"status"`
		Phase   string `json:"phase"`
		Success bool   `json:"success"`
		// Error is dropped from the JSON output when empty (omitempty).
		Error string `json:"error,omitempty"`

		// NOTE(review): Checks is presumably keyed by check name — confirm.
		// A nil Checks map or nil AutoHeals slice encodes as JSON null, not
		// {} / [].
		Checks    map[string]startupCheckRecord `json:"checks"`
		AutoHeals []string                      `json:"auto_heals"`

		SourceHost  string    `json:"source_host"`
		LastUpdated time.Time `json:"last_updated"`
	}

	// startupCheckRecord is the latest status/detail reported for a single
	// startup check, with the time it was recorded.
	startupCheckRecord struct {
		Status    string    `json:"status"`
		Detail    string    `json:"detail"`
		UpdatedAt time.Time `json:"updated_at"`
	}
)
var (
	// datastoreEndpointPattern locates a --datastore-endpoint flag in a
	// command line, written either as "--datastore-endpoint=VALUE" or as
	// "--datastore-endpoint VALUE" (one or more whitespace characters).
	// On a match exactly one capture group is populated:
	//   group 1: value wrapped in single quotes (quotes stripped)
	//   group 2: value wrapped in double quotes (quotes stripped)
	//   group 3: bare value, ending at the first whitespace or backslash
	// Stopping at a backslash keeps a trailing line-continuation "\" out of
	// a bare value.
	datastoreEndpointPattern = regexp.MustCompile(`--datastore-endpoint(?:=|\s+)(?:'([^']+)'|"([^"]+)"|([^\s\\]+))`)
)
// criticalStartupWorkloads is the fixed, ordered list of workloads this
// package treats as critical during startup.
// NOTE(review): how the list is consumed (readiness waits, scaling, health
// checks) is not visible in this chunk — confirm at its call sites.
var criticalStartupWorkloads = []startupWorkload{
	// GitOps controllers.
	{Namespace: "flux-system", Kind: "deployment", Name: "source-controller"},
	{Namespace: "flux-system", Kind: "deployment", Name: "kustomize-controller"},
	{Namespace: "flux-system", Kind: "deployment", Name: "helm-controller"},
	{Namespace: "flux-system", Kind: "deployment", Name: "notification-controller"},

	// Core stateful services.
	{Namespace: "vault", Kind: "statefulset", Name: "vault"},
	{Namespace: "postgres", Kind: "statefulset", Name: "postgres"},
	{Namespace: "gitea", Kind: "deployment", Name: "gitea"},

	// Monitoring stack.
	{Namespace: "monitoring", Kind: "deployment", Name: "grafana"},
	{Namespace: "monitoring", Kind: "statefulset", Name: "victoria-metrics-single-server"},
	{Namespace: "monitoring", Kind: "deployment", Name: "kube-state-metrics"},

	// Logging stack.
	{Namespace: "logging", Kind: "deployment", Name: "oauth2-proxy-logs"},
	{Namespace: "logging", Kind: "deployment", Name: "opensearch-dashboards"},
	{Namespace: "logging", Kind: "statefulset", Name: "opensearch"},
}
var (
	// ErrEtcdRestoreNotApplicable is a sentinel error with the message
	// "etcd restore not applicable". Compare against it with errors.Is so
	// that wrapped occurrences (fmt.Errorf("...: %w", err)) still match.
	// NOTE(review): the conditions under which it is returned are not
	// visible in this chunk — document them here once confirmed.
	ErrEtcdRestoreNotApplicable = errors.New("etcd restore not applicable")
)
// New builds an Orchestrator from its configuration and collaborators.
// Signature: New(cfg config.Config, runner *execx.Runner, store *state.Store, logger *log.Logger) *Orchestrator.
// Why: keeps behavior explicit so startup/shutdown workflows remain maintainable as services evolve.
// The arguments are stored as given; no nil checks or validation happen here.
// Command overrides start unset and can be installed afterwards with
// SetCommandOverrides.
func New(cfg config.Config, runner *execx.Runner, store *state.Store, logger *log.Logger) *Orchestrator {
	o := new(Orchestrator)
	o.cfg = cfg
	o.runner = runner
	o.store = store
	o.log = logger
	return o
}
// SetCommandOverrides installs replacement command runners on the Orchestrator.
// Signature: (o *Orchestrator) SetCommandOverrides(run commandOverrideFunc, runSensitive commandOverrideFunc).
// Why: enables deterministic integration testing from the top-level testing module
// without requiring package-local test files or live cluster dependencies.
// Both overrides are overwritten unconditionally, so passing nil clears one.
// The assignment is not synchronized.
// NOTE(review): callers presumably install overrides before the Orchestrator
// is used concurrently — confirm.
func (o *Orchestrator) SetCommandOverrides(run commandOverrideFunc, runSensitive commandOverrideFunc) {
	o.runOverride, o.runSensitiveOverride = run, runSensitive
}