metis/pkg/service/node_recovery.go

484 lines
12 KiB
Go

package service
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"time"
"metis/pkg/config"
)
// DesiredNodeMetadata captures the node identity Metis should preserve through
// recovery builds and re-assert after the node rejoins the cluster.
//
// Records are stored per node name in the App's desired-metadata map and are
// serialized to JSON with the field tags below.
type DesiredNodeMetadata struct {
// Node is the name of the node this record belongs to.
Node string `json:"node"`
// Hostname is the trimmed hostname taken from the node's inventory entry.
Hostname string `json:"hostname,omitempty"`
// CapturedAt is the UTC time at which the record was staged.
// NOTE(review): encoding/json's omitempty does not omit a struct value such
// as time.Time, so a zero timestamp is still written — confirm that is intended.
CapturedAt time.Time `json:"captured_at,omitempty"`
// Labels holds the labels that pass isRestorableLabel.
Labels map[string]string `json:"labels,omitempty"`
// Annotations holds the annotations that pass isRestorableAnnotation.
Annotations map[string]string `json:"annotations,omitempty"`
// Taints holds the taints that pass isRestorableTaint, each encoded as a
// string that splitTaint parses as "key[=value][:effect]".
Taints []string `json:"taints,omitempty"`
// Unschedulable mirrors the node's unschedulable flag.
Unschedulable bool `json:"unschedulable,omitempty"`
}
// loadDesiredNodeMetadata reads the JSON document at
// settings.DesiredMetadataPath and replaces the in-memory desired-metadata map
// with its decoded contents.
//
// Read and decode errors are returned unchanged; in either case the existing
// in-memory map is left untouched.
func (a *App) loadDesiredNodeMetadata() error {
	raw, readErr := os.ReadFile(a.settings.DesiredMetadataPath)
	if readErr != nil {
		return readErr
	}

	var byNode map[string]DesiredNodeMetadata
	decodeErr := json.Unmarshal(raw, &byNode)
	if decodeErr != nil {
		return decodeErr
	}

	// Swap the whole map in one step while holding the write lock.
	a.mu.Lock()
	defer a.mu.Unlock()
	a.desiredMetadata = byNode
	return nil
}
// persistDesiredNodeMetadata serializes the in-memory desired-metadata map as
// indented JSON and stores it at settings.DesiredMetadataPath, creating the
// parent directory when needed.
//
// The document is first written to a temporary file in the destination
// directory and then renamed over the target, so a crash or a failed write can
// no longer leave a truncated state file behind. The resulting file always has
// mode 0644. Marshal, directory, write, and rename errors are returned
// unchanged, and the temporary file is removed on failure.
func (a *App) persistDesiredNodeMetadata() error {
	// Hold the read lock only while encoding; file I/O happens unlocked.
	a.mu.RLock()
	data, err := json.MarshalIndent(a.desiredMetadata, "", " ")
	a.mu.RUnlock()
	if err != nil {
		return err
	}

	target := a.settings.DesiredMetadataPath
	dir := filepath.Dir(target)
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return err
	}

	// The temp file must live in the same directory as the target so the final
	// rename stays on one filesystem.
	tmp, err := os.CreateTemp(dir, filepath.Base(target)+".tmp-*")
	if err != nil {
		return err
	}
	tmpPath := tmp.Name()

	_, err = tmp.Write(data)
	if err == nil {
		// CreateTemp creates the file with mode 0600; apply the 0644 mode the
		// state file is expected to carry.
		err = tmp.Chmod(0o644)
	}
	if err == nil {
		// Flush the contents to disk before the rename makes them visible.
		err = tmp.Sync()
	}
	if closeErr := tmp.Close(); err == nil {
		err = closeErr
	}
	if err == nil {
		err = os.Rename(tmpPath, target)
	}
	if err != nil {
		_ = os.Remove(tmpPath) // best-effort cleanup; the original error matters more
		return err
	}
	return nil
}
// desiredMetadataForNode looks up the desired metadata stored for node.
//
// The name is trimmed before the lookup. The second result is false when the
// trimmed name is empty or no record exists. A found record is returned as a
// clone so callers cannot mutate the stored maps and slices.
func (a *App) desiredMetadataForNode(node string) (DesiredNodeMetadata, bool) {
	name := strings.TrimSpace(node)
	if name == "" {
		return DesiredNodeMetadata{}, false
	}

	a.mu.RLock()
	defer a.mu.RUnlock()

	if stored, found := a.desiredMetadata[name]; found {
		return cloneDesiredNodeMetadata(stored), true
	}
	return DesiredNodeMetadata{}, false
}
// stageDesiredNodeMetadata computes, stores, and persists the desired metadata
// for nodeName, returning a clone of the stored record.
//
// The record is seeded from the inventory entry and the built node config,
// then overlaid first with any previously stored record and finally with the
// live cluster node when one is found. Later overlays win on conflicting keys.
// An empty node name, an inventory lookup failure, a config build failure, or
// a persistence failure yields a zero record and the error.
func (a *App) stageDesiredNodeMetadata(nodeName string) (DesiredNodeMetadata, error) {
	var none DesiredNodeMetadata

	nodeName = strings.TrimSpace(nodeName)
	if nodeName == "" {
		return none, fmt.Errorf("node metadata requires a node name")
	}

	spec, _, err := a.inventory.FindNode(nodeName)
	if err != nil {
		return none, err
	}
	built, err := config.Build(a.inventory, nodeName)
	if err != nil {
		return none, err
	}

	// Layer 1: what the inventory and generated config say the node should be.
	staged := DesiredNodeMetadata{
		Node:       nodeName,
		Hostname:   strings.TrimSpace(spec.Hostname),
		CapturedAt: time.Now().UTC(),
		Labels:     filteredRestorableLabels(built.Labels),
		Taints:     restorableTaints(built.Taints),
	}
	// Layer 2: the record staged earlier, if any.
	if previous, found := a.desiredMetadataForNode(nodeName); found {
		staged = mergeDesiredNodeMetadata(staged, previous)
	}
	// Layer 3: the node as it currently exists in the cluster, if any.
	if current, found := liveClusterNode(nodeName); found {
		staged = mergeDesiredNodeMetadata(staged, desiredMetadataFromCluster(*current))
	}

	staged.Labels = normalizeStringMap(staged.Labels)
	staged.Annotations = normalizeStringMap(staged.Annotations)
	staged.Taints = normalizeTaints(staged.Taints)

	a.mu.Lock()
	if a.desiredMetadata == nil {
		a.desiredMetadata = map[string]DesiredNodeMetadata{}
	}
	a.desiredMetadata[nodeName] = staged
	a.mu.Unlock()

	if persistErr := a.persistDesiredNodeMetadata(); persistErr != nil {
		return none, persistErr
	}
	return cloneDesiredNodeMetadata(staged), nil
}
// syncDesiredNodeMetadata re-applies the stored desired metadata for
// record.Node to the matching live cluster node.
//
// It is a no-op returning nil when no desired record is stored or when the
// node is not currently present in the cluster; otherwise it returns the
// result of patchDesiredNodeMetadata.
func (a *App) syncDesiredNodeMetadata(record SnapshotRecord) error {
	if wanted, staged := a.desiredMetadataForNode(record.Node); staged {
		if current, joined := liveClusterNode(record.Node); joined {
			return patchDesiredNodeMetadata(*current, wanted)
		}
	}
	return nil
}
// desiredMetadataFromCluster converts a live cluster node into a desired
// metadata record, keeping only restorable labels, annotations, and taints.
// Hostname and CapturedAt are left at their zero values.
func desiredMetadataFromCluster(node clusterNode) DesiredNodeMetadata {
	var snapshot DesiredNodeMetadata
	snapshot.Node = strings.TrimSpace(node.Name)
	snapshot.Labels = filteredRestorableLabels(node.Labels)
	snapshot.Annotations = filteredRestorableAnnotations(node.Annotations)
	snapshot.Taints = restorableTaints(node.Taints)
	snapshot.Unschedulable = node.Unschedulable
	return snapshot
}
// mergeDesiredNodeMetadata returns a copy of base with overlay applied on top.
//
// Merge rules:
//   - Node always comes from base.
//   - Hostname and CapturedAt are replaced only when overlay has a non-empty /
//     non-zero value.
//   - Labels and Annotations are unioned, overlay winning per key; keys and
//     values are trimmed and blank keys are skipped.
//   - Taints are replaced wholesale when overlay carries at least one taint.
//   - Unschedulable is always taken from overlay.
//
// Neither argument is modified.
func mergeDesiredNodeMetadata(base, overlay DesiredNodeMetadata) DesiredNodeMetadata {
	result := cloneDesiredNodeMetadata(base)

	if name := strings.TrimSpace(overlay.Hostname); name != "" {
		result.Hostname = name
	}
	if stamp := overlay.CapturedAt; !stamp.IsZero() {
		result.CapturedAt = stamp
	}

	// overlayInto copies the trimmed entries of src onto dst, allocating dst
	// when it is nil, and returns the destination map.
	overlayInto := func(dst, src map[string]string) map[string]string {
		if dst == nil {
			dst = map[string]string{}
		}
		for rawKey, rawValue := range src {
			trimmedKey := strings.TrimSpace(rawKey)
			if trimmedKey != "" {
				dst[trimmedKey] = strings.TrimSpace(rawValue)
			}
		}
		return dst
	}
	result.Labels = overlayInto(result.Labels, overlay.Labels)
	result.Annotations = overlayInto(result.Annotations, overlay.Annotations)

	if len(overlay.Taints) != 0 {
		result.Taints = normalizeTaints(overlay.Taints)
	}
	result.Unschedulable = overlay.Unschedulable
	return result
}
// patchDesiredNodeMetadata brings the live node in line with the desired
// record by sending a merge patch to /api/v1/nodes/<name>.
//
// The node name comes from desired.Node, falling back to live.Name; when both
// are blank nothing is done. The patch contains only the differences:
// restorable labels and annotations to set or remove, the unschedulable flag
// when it differs, and the taint list when the merged set differs from the
// live one. When there is nothing to change no client is created and nil is
// returned. Client creation and patch errors are returned unchanged.
func patchDesiredNodeMetadata(live clusterNode, desired DesiredNodeMetadata) error {
	target := strings.TrimSpace(desired.Node)
	if target == "" {
		target = strings.TrimSpace(live.Name)
	}
	if target == "" {
		return nil
	}

	metadataPatch := map[string]any{}
	if labels := metadataStringPatch(live.Labels, desired.Labels, isRestorableLabel); len(labels) > 0 {
		metadataPatch["labels"] = labels
	}
	if annotations := metadataStringPatch(live.Annotations, desired.Annotations, isRestorableAnnotation); len(annotations) > 0 {
		metadataPatch["annotations"] = annotations
	}

	specPatch := map[string]any{}
	if desired.Unschedulable != live.Unschedulable {
		specPatch["unschedulable"] = desired.Unschedulable
	}
	// Non-restorable live taints are carried over, so the list sent replaces
	// the live list without dropping them.
	if wantTaints := mergeLiveAndDesiredTaints(live.Taints, desired.Taints); !sameTaints(live.Taints, wantTaints) {
		specPatch["taints"] = taintPatchPayload(wantTaints)
	}

	patch := map[string]any{}
	if len(metadataPatch) > 0 {
		patch["metadata"] = metadataPatch
	}
	if len(specPatch) > 0 {
		patch["spec"] = specPatch
	}
	if len(patch) == 0 {
		return nil
	}

	client, err := kubeClientFactory()
	if err != nil {
		return err
	}
	return client.mergePatch("/api/v1/nodes/"+target, patch)
}
// metadataStringPatch computes the merge-patch fragment that turns the live
// string map into the desired one, considering only keys accepted by allow.
//
// The result maps a key to its desired (trimmed) value when the key is missing
// from live or its trimmed live value differs, and to nil when an allowed live
// key is absent from desired and must be removed. An empty result means the
// two maps already agree.
//
// Presence is checked explicitly: a desired key whose value is the empty
// string (as with role labels such as "node-role.kubernetes.io/worker") is
// still added when live lacks it. Desired keys are matched by their trimmed
// form in both passes, so a key stored with stray whitespace is not set and
// then immediately scheduled for removal.
func metadataStringPatch(live, desired map[string]string, allow func(string) bool) map[string]any {
	patch := map[string]any{}

	// wanted records every allowed desired key in trimmed form so the removal
	// pass below compares like with like.
	wanted := make(map[string]struct{}, len(desired))
	for key, value := range desired {
		key = strings.TrimSpace(key)
		if key == "" || !allow(key) {
			continue
		}
		wanted[key] = struct{}{}

		value = strings.TrimSpace(value)
		current, present := live[key]
		if !present || strings.TrimSpace(current) != value {
			patch[key] = value
		}
	}

	// A nil value asks the merge patch to delete the key.
	for key := range live {
		key = strings.TrimSpace(key)
		if key == "" || !allow(key) {
			continue
		}
		if _, ok := wanted[key]; !ok {
			patch[key] = nil
		}
	}
	return patch
}
// liveClusterNode finds the cluster node whose trimmed name equals the trimmed
// node argument.
//
// It returns a pointer to a private copy of the first match and true, or
// (nil, false) when the name is blank or no node matches. The cluster is not
// queried for a blank name.
func liveClusterNode(node string) (*clusterNode, bool) {
	wanted := strings.TrimSpace(node)
	if wanted == "" {
		return nil, false
	}
	for _, candidate := range clusterNodes() {
		if strings.TrimSpace(candidate.Name) != wanted {
			continue
		}
		// Copy so the returned pointer does not alias the loop variable.
		match := candidate
		return &match, true
	}
	return nil, false
}
// filteredRestorableLabels returns a new map holding only the entries of
// values whose trimmed key is non-empty and accepted by isRestorableLabel.
// Keys and values are trimmed. The result is never nil.
func filteredRestorableLabels(values map[string]string) map[string]string {
	kept := make(map[string]string)
	for rawKey, rawValue := range values {
		name := strings.TrimSpace(rawKey)
		if name != "" && isRestorableLabel(name) {
			kept[name] = strings.TrimSpace(rawValue)
		}
	}
	return kept
}
// filteredRestorableAnnotations returns a new map holding only the entries of
// values whose trimmed key is non-empty and accepted by
// isRestorableAnnotation. Keys and values are trimmed. The result is never nil.
func filteredRestorableAnnotations(values map[string]string) map[string]string {
	kept := make(map[string]string)
	for rawKey, rawValue := range values {
		name := strings.TrimSpace(rawKey)
		if name != "" && isRestorableAnnotation(name) {
			kept[name] = strings.TrimSpace(rawValue)
		}
	}
	return kept
}
// normalizeStringMap returns a fresh copy of values with every key and value
// trimmed and blank keys dropped. It returns nil when the input is empty or
// when no entry survives, so "no data" is always represented as nil.
func normalizeStringMap(values map[string]string) map[string]string {
	if len(values) == 0 {
		return nil
	}

	cleaned := make(map[string]string, len(values))
	for rawKey, rawValue := range values {
		if name := strings.TrimSpace(rawKey); name != "" {
			cleaned[name] = strings.TrimSpace(rawValue)
		}
	}

	if len(cleaned) > 0 {
		return cleaned
	}
	return nil
}
// restorableTaints keeps the taints from values that are non-blank after
// normalization and accepted by isRestorableTaint, then returns them in the
// canonical form produced by normalizeTaints.
func restorableTaints(values []string) []string {
	kept := make([]string, 0, len(values))
	for _, raw := range values {
		if taint := normalizeTaint(raw); taint != "" && isRestorableTaint(taint) {
			kept = append(kept, taint)
		}
	}
	return normalizeTaints(kept)
}
// normalizeTaints returns the canonical form of a taint list: each entry
// normalized, blanks dropped, duplicates removed, and the remainder sorted
// ascending. It returns nil when the input is empty or nothing survives.
func normalizeTaints(values []string) []string {
	if len(values) == 0 {
		return nil
	}

	unique := make([]string, 0, len(values))
	known := make(map[string]struct{}, len(values))
	for _, raw := range values {
		taint := normalizeTaint(raw)
		if taint == "" {
			continue
		}
		if _, dup := known[taint]; !dup {
			known[taint] = struct{}{}
			unique = append(unique, taint)
		}
	}

	if len(unique) == 0 {
		return nil
	}
	sort.Strings(unique)
	return unique
}
// normalizeTaint returns value with leading and trailing whitespace removed.
func normalizeTaint(value string) string {
	trimmed := strings.TrimSpace(value)
	return trimmed
}
// sameTaints reports whether left and right describe the same set of taints
// once both are put through normalizeTaints, so ordering, duplicates, blanks,
// and surrounding whitespace do not affect the comparison.
func sameTaints(left, right []string) bool {
	a, b := normalizeTaints(left), normalizeTaints(right)
	if len(a) != len(b) {
		return false
	}
	for i, taint := range a {
		if b[i] != taint {
			return false
		}
	}
	return true
}
// mergeLiveAndDesiredTaints builds the taint list a node should end up with:
// the live taints that are NOT restorable are carried over unchanged, and the
// restorable portion is replaced by the restorable taints from desired. The
// result is returned in the canonical form produced by normalizeTaints.
func mergeLiveAndDesiredTaints(live, desired []string) []string {
	combined := make([]string, 0, len(live)+len(desired))
	for _, raw := range live {
		// Restorable live taints are dropped here; desired decides those.
		if current := normalizeTaint(raw); current != "" && !isRestorableTaint(current) {
			combined = append(combined, current)
		}
	}
	combined = append(combined, restorableTaints(desired)...)
	return normalizeTaints(combined)
}
// taintPatchPayload converts taint strings into the list-of-objects form used
// in the node patch. Each object always has "key" and carries "value" and
// "effect" only when those parts are non-empty. Entries without a key are
// skipped. The result is never nil.
func taintPatchPayload(values []string) []map[string]string {
	entries := make([]map[string]string, 0, len(values))
	for _, taint := range normalizeTaints(values) {
		name, val, eff := splitTaint(taint)
		if name == "" {
			continue
		}

		item := make(map[string]string, 3)
		item["key"] = name
		// Optional fields are emitted only when present.
		optional := [...][2]string{{"value", val}, {"effect", eff}}
		for _, pair := range optional {
			if pair[1] != "" {
				item[pair[0]] = pair[1]
			}
		}
		entries = append(entries, item)
	}
	return entries
}
// splitTaint parses a taint written as "key[=value][:effect]" and returns its
// key, value, and effect, each trimmed of surrounding whitespace.
//
// The effect is everything after the last ':'; the value is everything between
// the first '=' and that colon. Missing parts are returned as empty strings,
// and a blank input yields three empty strings.
func splitTaint(raw string) (string, string, string) {
	spec := strings.TrimSpace(raw)
	if spec == "" {
		return "", "", ""
	}

	keyValue, effect := spec, ""
	if colon := strings.LastIndex(spec, ":"); colon >= 0 {
		keyValue = strings.TrimSpace(spec[:colon])
		effect = strings.TrimSpace(spec[colon+1:])
	}

	// Without an '=', Cut returns the whole input as the key and "" as value.
	key, value, _ := strings.Cut(keyValue, "=")
	return strings.TrimSpace(key), strings.TrimSpace(value), effect
}
// isRestorableTaint reports whether the taint should be preserved and
// re-applied. A taint with no key is not restorable, and neither is one whose
// key starts with "node.kubernetes.io/", "node.cloudprovider.kubernetes.io/",
// or "ToBeDeletedByClusterAutoscaler"; everything else is.
func isRestorableTaint(raw string) bool {
	key, _, _ := splitTaint(raw)
	if key == "" {
		return false
	}
	switch {
	case strings.HasPrefix(key, "node.kubernetes.io/"),
		strings.HasPrefix(key, "node.cloudprovider.kubernetes.io/"),
		strings.HasPrefix(key, "ToBeDeletedByClusterAutoscaler"):
		return false
	}
	return true
}
// isRestorableLabel reports whether a label key should be preserved and
// re-applied.
//
// Blank keys are rejected. Keys under "node-role.kubernetes.io/" are always
// accepted. Keys under any of the excluded prefixes listed below are rejected,
// and every other key is accepted. The key is trimmed before matching.
func isRestorableLabel(key string) bool {
	name := strings.TrimSpace(key)
	switch {
	case name == "":
		return false
	case strings.HasPrefix(name, "node-role.kubernetes.io/"):
		// Role labels are explicitly allowed and checked before the deny list.
		return true
	}

	excluded := [...]string{
		"kubernetes.io/",
		"beta.kubernetes.io/",
		"node.kubernetes.io/",
		"topology.kubernetes.io/",
		"feature.node.kubernetes.io/",
		"failure-domain.beta.kubernetes.io/",
		"nvidia.com/",
		"k3s.io/",
		"rke2.io/",
		"volumes.kubernetes.io/",
		"node.cloudprovider.kubernetes.io/",
	}
	for i := range excluded {
		if strings.HasPrefix(name, excluded[i]) {
			return false
		}
	}
	return true
}
// isRestorableAnnotation reports whether an annotation key should be preserved
// and re-applied. Blank keys and keys under any of the excluded prefixes
// listed below are rejected; every other key is accepted. The key is trimmed
// before matching.
func isRestorableAnnotation(key string) bool {
	name := strings.TrimSpace(key)
	if name == "" {
		return false
	}

	excluded := [...]string{
		"kubectl.kubernetes.io/",
		"kubeadm.alpha.kubernetes.io/",
		"kubernetes.io/",
		"node.alpha.kubernetes.io/",
		"node.kubernetes.io/",
		"volumes.kubernetes.io/",
		"csi.volume.kubernetes.io/",
		"csi.storage.k8s.io/",
		"flannel.alpha.coreos.com/",
		"projectcalico.org/",
		"rke2.io/",
		"k3s.io/",
		"nvidia.com/",
	}
	restorable := true
	for _, prefix := range excluded {
		if strings.HasPrefix(name, prefix) {
			restorable = false
			break
		}
	}
	return restorable
}
// cloneDesiredNodeMetadata returns a copy of value that shares no map or slice
// with the original. Labels, Annotations, and Taints are rebuilt through their
// normalizers, so empty collections come back as nil.
func cloneDesiredNodeMetadata(value DesiredNodeMetadata) DesiredNodeMetadata {
	// value is already a shallow copy (passed by value); only the reference
	// fields need replacing.
	value.Labels = normalizeStringMap(value.Labels)
	value.Annotations = normalizeStringMap(value.Annotations)
	value.Taints = normalizeTaints(value.Taints)
	return value
}
// desiredNodeMetadataSyncEvent builds the "sentinel.node-metadata" event that
// reports a failed attempt to restore desired metadata on node. The event is
// stamped with the current UTC time and carries the node name and err's
// message in its details. err must be non-nil, since err.Error() is called
// unconditionally.
func desiredNodeMetadataSyncEvent(node string, err error) Event {
	var failure Event
	failure.Time = time.Now().UTC()
	failure.Kind = "sentinel.node-metadata"
	failure.Summary = fmt.Sprintf("Could not restore desired node metadata for %s", node)
	failure.Details = map[string]any{
		"node":  node,
		"error": err.Error(),
	}
	return failure
}