package service import ( "bufio" "encoding/json" "errors" "fmt" "os" "os/exec" "path/filepath" "sort" "strings" "time" "metis/pkg/facts" "metis/pkg/inventory" ) func (a *App) newJob(kind, node, host, device string) *Job { job := &Job{ ID: fmt.Sprintf("%d", time.Now().UTC().UnixNano()), Kind: kind, Node: node, Host: host, Device: device, Status: JobQueued, ProgressPct: 0, StartedAt: time.Now().UTC(), UpdatedAt: time.Now().UTC(), } a.mu.Lock() a.jobs[job.ID] = job a.mu.Unlock() return job } type activeNodeJobError struct { Node string Kind string JobID string } // Error reports that a replacement-capable job is already active for the node. func (e *activeNodeJobError) Error() string { if e == nil { return "node already has an active metis job" } return fmt.Sprintf("node %s already has an active %s job (%s)", e.Node, e.Kind, e.JobID) } func (a *App) activeJobForNode(node string) *Job { a.mu.RLock() defer a.mu.RUnlock() return a.activeJobForNodeLocked(node) } func (a *App) activeJobForNodeLocked(node string) *Job { node = strings.TrimSpace(node) if node == "" { return nil } var active *Job for _, job := range a.jobs { if job == nil || strings.TrimSpace(job.Node) != node { continue } if job.Status != JobQueued && job.Status != JobRunning { continue } switch job.Kind { case "build", "flash", "replace": default: continue } if active == nil || job.StartedAt.Before(active.StartedAt) { active = job } } if active == nil { return nil } copyJob := *active return ©Job } func (a *App) reserveJob(kind, node, host, device string) (*Job, error) { a.mu.Lock() defer a.mu.Unlock() if active := a.activeJobForNodeLocked(node); active != nil { return nil, &activeNodeJobError{Node: node, Kind: active.Kind, JobID: active.ID} } now := time.Now().UTC() job := &Job{ ID: fmt.Sprintf("%d", now.UnixNano()), Kind: kind, Node: node, Host: host, Device: device, Status: JobQueued, ProgressPct: 0, StartedAt: now, UpdatedAt: now, } a.jobs[job.ID] = job return job, nil } func (a *App) job(id string) *Job { a.mu.RLock() defer a.mu.RUnlock() return a.jobs[id] } func (a *App) setJob(id string, update func(*Job)) { a.mu.Lock() defer a.mu.Unlock() job := a.jobs[id] if job == nil { return } update(job) job.UpdatedAt = time.Now().UTC() } func (a *App) failJob(id string, err error) { a.completeJob(id, func(j *Job) { j.Status = JobError j.Error = err.Error() j.Message = err.Error() }) } func (a *App) completeJob(id string, update func(*Job)) { a.mu.Lock() defer a.mu.Unlock() job := a.jobs[id] if job == nil { return } update(job) if job.Status != JobError { job.Status = JobDone } job.UpdatedAt = time.Now().UTC() job.FinishedAt = time.Now().UTC() } func (a *App) appendEvent(event Event) { line, err := json.Marshal(event) if err != nil { return } f, err := os.OpenFile(a.settings.HistoryPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644) if err != nil { return } defer f.Close() _, _ = f.Write(append(line, '\n')) } func (a *App) recentEvents(limit int) []Event { f, err := os.Open(a.settings.HistoryPath) if err != nil { return nil } defer f.Close() events := make([]Event, 0, limit) scanner := bufio.NewScanner(f) for scanner.Scan() { var event Event if err := json.Unmarshal(scanner.Bytes(), &event); err != nil { continue } events = append(events, event) } if len(events) > limit { events = events[len(events)-limit:] } for i, j := 0, len(events)-1; i < j; i, j = i+1, j-1 { events[i], events[j] = events[j], events[i] } return events } func cachedImageName(source string) string { return strings.TrimSuffix(filepath.Base(source), ".xz") } func (a *App) replacementNodes() []inventory.NodeSpec { nodes := make([]inventory.NodeSpec, 0, len(a.inventory.Nodes)) for _, node := range a.inventory.Nodes { spec, class, err := a.inventory.FindNode(node.Name) if err != nil { continue } if replacementReady(spec, class) { nodes = append(nodes, node) } } sort.Slice(nodes, func(i, j int) bool { return nodes[i].Name < nodes[j].Name }) return nodes } func (a *App) ensureReplacementReady(nodeName string) error { node, class, err := a.inventory.FindNode(nodeName) if err != nil { return err } if replacementReady(node, class) { return nil } return fmt.Errorf("node %s does not yet have a complete replacement definition", nodeName) } func replacementReady(node *inventory.NodeSpec, class *inventory.NodeClass) bool { if node == nil || class == nil { return false } if strings.TrimSpace(class.Image) == "" || strings.TrimSpace(class.Checksum) == "" { return false } if strings.TrimSpace(node.Name) == "" || strings.TrimSpace(node.Hostname) == "" || strings.TrimSpace(node.IP) == "" { return false } if strings.TrimSpace(node.K3sRole) == "" { return false } if strings.TrimSpace(node.K3sRole) != "server" && strings.TrimSpace(node.K3sURL) == "" { return false } if strings.TrimSpace(node.K3sToken) == "" { return false } if strings.TrimSpace(node.SSHUser) == "" || len(node.SSHAuthorized) == 0 { return false } return true } func (a *App) flashHosts() []string { hosts := map[string]struct{}{} for _, host := range a.settings.FlashHosts { if value := strings.TrimSpace(host); value != "" { hosts[value] = struct{}{} } } for _, host := range []string{a.settings.DefaultFlashHost, a.settings.LocalHost} { if value := strings.TrimSpace(host); value != "" { hosts[value] = struct{}{} } } for _, node := range clusterNodes() { if value := strings.TrimSpace(node.Name); value != "" { hosts[value] = struct{}{} } } out := make([]string, 0, len(hosts)) for host := range hosts { out = append(out, host) } sort.Strings(out) if a.settings.DefaultFlashHost == "" { return out } return moveToFront(out, a.settings.DefaultFlashHost) } func (a *App) loadSnapshots() error { data, err := os.ReadFile(a.settings.SnapshotsPath) if err != nil { return err } var snapshots map[string]SnapshotRecord if err := json.Unmarshal(data, &snapshots); err != nil { return err } a.mu.Lock() a.snapshots = snapshots a.mu.Unlock() for _, snap := range snapshots { a.metrics.RecordSnapshot(snap.Node, "ok", snap.CollectedAt) } return nil } func (a *App) persistSnapshots() error { a.mu.RLock() data, err := json.MarshalIndent(a.snapshots, "", " ") a.mu.RUnlock() if err != nil { return err } if err := os.MkdirAll(filepath.Dir(a.settings.SnapshotsPath), 0o755); err != nil { return err } return os.WriteFile(a.settings.SnapshotsPath, data, 0o644) } func (a *App) loadTargets() error { data, err := os.ReadFile(a.settings.TargetsPath) if err != nil { return err } var targets map[string]facts.Targets if err := json.Unmarshal(data, &targets); err != nil { return err } a.mu.Lock() a.targets = targets a.mu.Unlock() a.metrics.SetDriftTargets(targets, 0) return nil } func (a *App) persistTargets() error { a.mu.RLock() data, err := json.MarshalIndent(a.targets, "", " ") a.mu.RUnlock() if err != nil { return err } if err := os.MkdirAll(filepath.Dir(a.settings.TargetsPath), 0o755); err != nil { return err } return os.WriteFile(a.settings.TargetsPath, data, 0o644) } func diffTargets(prev, next map[string]facts.Targets) []string { classes := map[string]struct{}{} for class := range prev { classes[class] = struct{}{} } for class := range next { classes[class] = struct{}{} } out := make([]string, 0) for class := range classes { if !targetsEqual(prev[class], next[class]) { out = append(out, class) } } sort.Strings(out) return out } func targetsEqual(a, b facts.Targets) bool { if a.Kernel != b.Kernel || a.OSImage != b.OSImage || a.Containerd != b.Containerd || a.K3sVersion != b.K3sVersion { return false } if len(a.Packages) != len(b.Packages) { return false } for key, value := range a.Packages { if b.Packages[key] != value { return false } } return true } func humanBytes(value int64) string { const unit = 1024 if value < unit { return fmt.Sprintf("%d B", value) } div, exp := int64(unit), 0 for n := value / unit; n >= unit; n /= unit { div *= unit exp++ } return fmt.Sprintf("%.1f %ciB", float64(value)/float64(div), "KMGTPE"[exp]) } func firstLine(value string) string { value = strings.TrimSpace(value) if idx := strings.IndexByte(value, '\n'); idx >= 0 { return strings.TrimSpace(value[:idx]) } return value } func preferredDevice(devices []Device) string { if len(devices) == 0 { return "" } return devices[0].Path } func errorString(err error) string { if err == nil { return "" } return err.Error() } func cloneDevices(devices []Device) []Device { if len(devices) == 0 { return nil } out := make([]Device, len(devices)) copy(out, devices) return out } func deleteNodeObject(node string) error { if err := deleteNodeObjectInCluster(node); err == nil { return nil } cmd := exec.Command("kubectl", "delete", "node", node, "--ignore-not-found") if out, err := cmd.CombinedOutput(); err != nil { return fmt.Errorf("delete node: %w: %s", err, strings.TrimSpace(string(out))) } return nil } func deleteNodeObjectInCluster(node string) error { kube, err := kubeClientFactory() if err != nil { return errors.New("not running in cluster") } return kube.deleteRequest(fmt.Sprintf("/api/v1/nodes/%s", node)) }