// app_helpers.go — job bookkeeping, event history, snapshot/target
// persistence, and miscellaneous formatting helpers for the service App.

package service
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"os"
"os/exec"
"path/filepath"
"sort"
"strings"
"time"
"metis/pkg/facts"
"metis/pkg/inventory"
)
// newJob creates a queued Job of the given kind, registers it in the job
// table under the App mutex, and returns it. Unlike reserveJob it does not
// check whether the node already has an active job.
func (a *App) newJob(kind, node, host, device string) *Job {
	// Capture a single timestamp so the ID, StartedAt, and UpdatedAt all
	// agree exactly (the original called time.Now() three times, so the
	// nanosecond ID and the two timestamps could differ slightly). This
	// also matches how reserveJob builds its Job.
	now := time.Now().UTC()
	job := &Job{
		ID:          fmt.Sprintf("%d", now.UnixNano()),
		Kind:        kind,
		Node:        node,
		Host:        host,
		Device:      device,
		Status:      JobQueued,
		ProgressPct: 0,
		StartedAt:   now,
		UpdatedAt:   now,
	}
	a.mu.Lock()
	a.jobs[job.ID] = job
	a.mu.Unlock()
	return job
}
// activeNodeJobError indicates that a node already has a queued or running
// replacement-capable job, identified by its Kind and JobID.
type activeNodeJobError struct {
	Node  string
	Kind  string
	JobID string
}

// Error reports that a replacement-capable job is already active for the node.
func (e *activeNodeJobError) Error() string {
	if e != nil {
		return fmt.Sprintf("node %s already has an active %s job (%s)", e.Node, e.Kind, e.JobID)
	}
	// Nil receiver: fall back to a generic message rather than panicking.
	return "node already has an active metis job"
}
// activeJobForNode returns a copy of the oldest queued/running
// build/replace job for node, or nil when there is none. It takes the
// read lock before delegating to the locked variant.
func (a *App) activeJobForNode(node string) *Job {
	a.mu.RLock()
	job := a.activeJobForNodeLocked(node)
	a.mu.RUnlock()
	return job
}
// activeJobForNodeLocked scans the job table for the oldest queued or
// running "build"/"replace" job bound to node and returns a copy of it, or
// nil when none matches. Callers must already hold a.mu.
func (a *App) activeJobForNodeLocked(node string) *Job {
	node = strings.TrimSpace(node)
	if node == "" {
		return nil
	}
	var oldest *Job
	for _, candidate := range a.jobs {
		if candidate == nil {
			continue
		}
		if strings.TrimSpace(candidate.Node) != node {
			continue
		}
		// Only queued or running jobs count as active.
		if candidate.Status != JobQueued && candidate.Status != JobRunning {
			continue
		}
		// Only replacement-capable kinds block the node.
		if candidate.Kind != "build" && candidate.Kind != "replace" {
			continue
		}
		if oldest == nil || candidate.StartedAt.Before(oldest.StartedAt) {
			oldest = candidate
		}
	}
	if oldest == nil {
		return nil
	}
	// Return a copy so callers never share the mutable map entry.
	snapshot := *oldest
	return &snapshot
}
// reserveJob atomically checks that node has no active build/replace job
// and, if clear, creates and registers a new queued Job. When the node is
// busy it returns a *activeNodeJobError describing the conflicting job.
func (a *App) reserveJob(kind, node, host, device string) (*Job, error) {
	a.mu.Lock()
	defer a.mu.Unlock()
	active := a.activeJobForNodeLocked(node)
	if active != nil {
		return nil, &activeNodeJobError{Node: node, Kind: active.Kind, JobID: active.ID}
	}
	// One timestamp serves as both the ID seed and the initial times.
	now := time.Now().UTC()
	job := &Job{
		ID:          fmt.Sprintf("%d", now.UnixNano()),
		Status:      JobQueued,
		ProgressPct: 0,
		Kind:        kind,
		Node:        node,
		Host:        host,
		Device:      device,
		StartedAt:   now,
		UpdatedAt:   now,
	}
	a.jobs[job.ID] = job
	return job, nil
}
// job returns the Job stored under id, or nil when the ID is unknown.
//
// NOTE(review): this hands back the live pointer from the jobs map —
// unlike activeJobForNode, which returns a copy — so the caller shares
// mutable state with setJob/completeJob. Presumably callers only read the
// result; verify before mutating it outside the mutex.
func (a *App) job(id string) *Job {
a.mu.RLock()
defer a.mu.RUnlock()
return a.jobs[id]
}
// setJob applies update to the job stored under id while holding the
// write lock, then bumps UpdatedAt. Unknown IDs are a silent no-op.
func (a *App) setJob(id string, update func(*Job)) {
	a.mu.Lock()
	defer a.mu.Unlock()
	if job := a.jobs[id]; job != nil {
		update(job)
		job.UpdatedAt = time.Now().UTC()
	}
}
// failJob finalizes the job as JobError, recording err's message in both
// the Error and Message fields.
func (a *App) failJob(id string, err error) {
	msg := err.Error()
	a.completeJob(id, func(j *Job) {
		j.Status = JobError
		j.Error = msg
		j.Message = msg
	})
}
// completeJob applies update to the job stored under id while holding the
// write lock, then finalizes it: any status other than JobError is forced
// to JobDone, and UpdatedAt/FinishedAt are stamped. Unknown IDs are a
// silent no-op.
func (a *App) completeJob(id string, update func(*Job)) {
	a.mu.Lock()
	defer a.mu.Unlock()
	job := a.jobs[id]
	if job == nil {
		return
	}
	update(job)
	if job.Status != JobError {
		job.Status = JobDone
	}
	// Stamp both fields from one clock reading so UpdatedAt == FinishedAt
	// (the original called time.Now() twice, yielding slightly different
	// nanosecond values for a single completion event).
	now := time.Now().UTC()
	job.UpdatedAt = now
	job.FinishedAt = now
}
// appendEvent best-effort appends event as a single JSON line to the
// history file at HistoryPath. Marshal and file errors are deliberately
// swallowed: history is advisory and must never fail the caller.
func (a *App) appendEvent(event Event) {
	encoded, err := json.Marshal(event)
	if err != nil {
		return
	}
	file, err := os.OpenFile(a.settings.HistoryPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
	if err != nil {
		return
	}
	defer file.Close()
	// Best-effort write; errors are intentionally ignored.
	_, _ = file.Write(append(encoded, '\n'))
}
// recentEvents reads the JSONL history file and returns up to limit of the
// most recent events, newest first. An unreadable file or malformed lines
// are skipped silently, mirroring appendEvent's best-effort contract.
func (a *App) recentEvents(limit int) []Event {
	if limit <= 0 {
		// Guard: a negative limit would panic in the make() below, and a
		// zero limit can only ever return an empty result anyway.
		return nil
	}
	f, err := os.Open(a.settings.HistoryPath)
	if err != nil {
		return nil
	}
	defer f.Close()
	events := make([]Event, 0, limit)
	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		var event Event
		if err := json.Unmarshal(scanner.Bytes(), &event); err != nil {
			// Skip malformed lines (e.g. a torn write) rather than abort.
			continue
		}
		events = append(events, event)
	}
	// Best effort: a scanner error (e.g. a line longer than the default
	// 64 KiB token limit) truncates the scan; we still return what parsed.
	_ = scanner.Err()
	// Keep only the newest `limit` entries (file order is oldest-first).
	if len(events) > limit {
		events = events[len(events)-limit:]
	}
	// Reverse in place so the newest event comes first.
	for i, j := 0, len(events)-1; i < j; i, j = i+1, j-1 {
		events[i], events[j] = events[j], events[i]
	}
	return events
}
// cachedImageName derives the on-disk cache name for an image source by
// taking its base name and dropping a trailing ".xz" suffix, if present.
func cachedImageName(source string) string {
	base := filepath.Base(source)
	return strings.TrimSuffix(base, ".xz")
}
// replacementNodes returns the inventory nodes that have a complete
// replacement definition, sorted by name. Nodes that fail lookup are
// skipped.
func (a *App) replacementNodes() []inventory.NodeSpec {
	ready := make([]inventory.NodeSpec, 0, len(a.inventory.Nodes))
	for _, candidate := range a.inventory.Nodes {
		spec, class, err := a.inventory.FindNode(candidate.Name)
		if err != nil {
			continue
		}
		if !replacementReady(spec, class) {
			continue
		}
		ready = append(ready, candidate)
	}
	sort.Slice(ready, func(i, j int) bool { return ready[i].Name < ready[j].Name })
	return ready
}
// ensureReplacementReady returns nil when nodeName has a complete
// replacement definition, the lookup error when the node is unknown, or a
// descriptive error when the definition is incomplete.
func (a *App) ensureReplacementReady(nodeName string) error {
	node, class, err := a.inventory.FindNode(nodeName)
	switch {
	case err != nil:
		return err
	case replacementReady(node, class):
		return nil
	default:
		return fmt.Errorf("node %s does not yet have a complete replacement definition", nodeName)
	}
}
// replacementReady reports whether node and class together carry every
// field required to rebuild the node: image and checksum on the class,
// identity (name/hostname/IP), k3s role/join settings, and SSH access.
func replacementReady(node *inventory.NodeSpec, class *inventory.NodeClass) bool {
	if node == nil || class == nil {
		return false
	}
	if strings.TrimSpace(class.Image) == "" || strings.TrimSpace(class.Checksum) == "" {
		return false
	}
	if strings.TrimSpace(node.Name) == "" || strings.TrimSpace(node.Hostname) == "" || strings.TrimSpace(node.IP) == "" {
		return false
	}
	// Trim the role once instead of three separate times.
	role := strings.TrimSpace(node.K3sRole)
	if role == "" {
		return false
	}
	// Non-server (agent) nodes must know which server URL to join.
	if role != "server" && strings.TrimSpace(node.K3sURL) == "" {
		return false
	}
	if strings.TrimSpace(node.K3sToken) == "" {
		return false
	}
	if strings.TrimSpace(node.SSHUser) == "" || len(node.SSHAuthorized) == 0 {
		return false
	}
	return true
}
// flashHosts returns the deduplicated, sorted set of hosts usable for
// flashing: the configured FlashHosts, the default and local hosts, and
// every cluster node name. When DefaultFlashHost is set it is moved to
// the front of the list.
func (a *App) flashHosts() []string {
	seen := map[string]struct{}{}
	add := func(name string) {
		if trimmed := strings.TrimSpace(name); trimmed != "" {
			seen[trimmed] = struct{}{}
		}
	}
	for _, host := range a.settings.FlashHosts {
		add(host)
	}
	add(a.settings.DefaultFlashHost)
	add(a.settings.LocalHost)
	for _, node := range clusterNodes() {
		add(node.Name)
	}
	hosts := make([]string, 0, len(seen))
	for host := range seen {
		hosts = append(hosts, host)
	}
	sort.Strings(hosts)
	if a.settings.DefaultFlashHost == "" {
		return hosts
	}
	return moveToFront(hosts, a.settings.DefaultFlashHost)
}
// loadSnapshots reads the snapshot store from SnapshotsPath, installs it
// as the in-memory snapshot map, and replays each record into the metrics
// collector with an "ok" status.
func (a *App) loadSnapshots() error {
	raw, err := os.ReadFile(a.settings.SnapshotsPath)
	if err != nil {
		return err
	}
	// Keep the nil-map declaration: a JSON "null" file leaves it nil,
	// matching the original behavior.
	var snapshots map[string]SnapshotRecord
	if err := json.Unmarshal(raw, &snapshots); err != nil {
		return err
	}
	a.mu.Lock()
	a.snapshots = snapshots
	a.mu.Unlock()
	for _, record := range snapshots {
		a.metrics.RecordSnapshot(record.Node, "ok", record.CollectedAt)
	}
	return nil
}
// persistSnapshots serializes the snapshot map to SnapshotsPath as
// indented JSON, creating the parent directory when needed. Marshaling
// happens under the read lock; disk I/O happens outside it.
func (a *App) persistSnapshots() error {
	a.mu.RLock()
	encoded, err := json.MarshalIndent(a.snapshots, "", " ")
	a.mu.RUnlock()
	if err != nil {
		return err
	}
	dir := filepath.Dir(a.settings.SnapshotsPath)
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return err
	}
	return os.WriteFile(a.settings.SnapshotsPath, encoded, 0o644)
}
// loadTargets reads drift targets from TargetsPath, installs them as the
// in-memory target map, and pushes them into the metrics collector.
func (a *App) loadTargets() error {
	raw, err := os.ReadFile(a.settings.TargetsPath)
	if err != nil {
		return err
	}
	// Keep the nil-map declaration: a JSON "null" file leaves it nil,
	// matching the original behavior.
	var parsed map[string]facts.Targets
	if err := json.Unmarshal(raw, &parsed); err != nil {
		return err
	}
	a.mu.Lock()
	a.targets = parsed
	a.mu.Unlock()
	a.metrics.SetDriftTargets(parsed, 0)
	return nil
}
// persistTargets serializes the target map to TargetsPath as indented
// JSON, creating the parent directory when needed. Marshaling happens
// under the read lock; disk I/O happens outside it.
func (a *App) persistTargets() error {
	a.mu.RLock()
	encoded, err := json.MarshalIndent(a.targets, "", " ")
	a.mu.RUnlock()
	if err != nil {
		return err
	}
	dir := filepath.Dir(a.settings.TargetsPath)
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return err
	}
	return os.WriteFile(a.settings.TargetsPath, encoded, 0o644)
}
// diffTargets returns the sorted names of classes whose targets differ
// between prev and next, including classes present in only one of them
// (the missing side compares as the zero Targets).
func diffTargets(prev, next map[string]facts.Targets) []string {
	union := map[string]struct{}{}
	for name := range prev {
		union[name] = struct{}{}
	}
	for name := range next {
		union[name] = struct{}{}
	}
	changed := make([]string, 0)
	for name := range union {
		if targetsEqual(prev[name], next[name]) {
			continue
		}
		changed = append(changed, name)
	}
	sort.Strings(changed)
	return changed
}
// targetsEqual reports whether two Targets describe the same kernel, OS
// image, containerd, k3s version, and package set.
func targetsEqual(a, b facts.Targets) bool {
	sameScalars := a.Kernel == b.Kernel &&
		a.OSImage == b.OSImage &&
		a.Containerd == b.Containerd &&
		a.K3sVersion == b.K3sVersion
	if !sameScalars {
		return false
	}
	if len(a.Packages) != len(b.Packages) {
		return false
	}
	for name, version := range a.Packages {
		if b.Packages[name] != version {
			return false
		}
	}
	return true
}
// humanBytes renders a byte count using binary (1024-based) IEC units,
// e.g. 512 -> "512 B", 1536 -> "1.5 KiB".
func humanBytes(value int64) string {
	const unit = int64(1024)
	if value < unit {
		return fmt.Sprintf("%d B", value)
	}
	// Find the largest unit such that value/div is in [1, 1024).
	div := unit
	exp := 0
	for remaining := value / unit; remaining >= unit; remaining /= unit {
		div *= unit
		exp++
	}
	return fmt.Sprintf("%.1f %ciB", float64(value)/float64(div), "KMGTPE"[exp])
}
// firstLine returns the first line of value with surrounding whitespace
// removed; multi-line input is cut at the first newline and the head is
// trimmed again.
func firstLine(value string) string {
	trimmed := strings.TrimSpace(value)
	idx := strings.IndexByte(trimmed, '\n')
	if idx < 0 {
		return trimmed
	}
	return strings.TrimSpace(trimmed[:idx])
}
// preferredDevice picks the path of the first listed device, or "" when
// the list is empty.
func preferredDevice(devices []Device) string {
	if len(devices) > 0 {
		return devices[0].Path
	}
	return ""
}
func errorString(err error) string {
if err == nil {
return ""
}
return err.Error()
}
// cloneDevices returns a shallow copy of devices — nil for an empty
// input — so callers cannot mutate the shared backing array.
func cloneDevices(devices []Device) []Device {
	if len(devices) == 0 {
		return nil
	}
	return append([]Device(nil), devices...)
}
// deleteNodeObject removes the Kubernetes Node object named node: first
// via the in-cluster API, then — if that fails — by shelling out to
// kubectl with --ignore-not-found.
func deleteNodeObject(node string) error {
	if deleteNodeObjectInCluster(node) == nil {
		return nil
	}
	out, err := exec.Command("kubectl", "delete", "node", node, "--ignore-not-found").CombinedOutput()
	if err != nil {
		return fmt.Errorf("delete node: %w: %s", err, strings.TrimSpace(string(out)))
	}
	return nil
}
// deleteNodeObjectInCluster deletes the Node object via the in-cluster
// Kubernetes API client built by kubeClientFactory.
//
// NOTE(review): the factory error is discarded and replaced with a
// generic "not running in cluster" message — consider wrapping the
// underlying cause (fmt.Errorf with %w) for easier debugging; the only
// visible caller (deleteNodeObject) just checks for nil, so the text is
// presumably not load-bearing — verify before changing it.
func deleteNodeObjectInCluster(node string) error {
kube, err := kubeClientFactory()
if err != nil {
return errors.New("not running in cluster")
}
return kube.deleteRequest(fmt.Sprintf("/api/v1/nodes/%s", node))
}