package service
|
|
|
|
import (
|
|
"bufio"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"metis/pkg/facts"
|
|
"metis/pkg/inventory"
|
|
)
|
|
|
|
func (a *App) newJob(kind, node, host, device string) *Job {
|
|
job := &Job{
|
|
ID: fmt.Sprintf("%d", time.Now().UTC().UnixNano()),
|
|
Kind: kind,
|
|
Node: node,
|
|
Host: host,
|
|
Device: device,
|
|
Status: JobQueued,
|
|
ProgressPct: 0,
|
|
StartedAt: time.Now().UTC(),
|
|
UpdatedAt: time.Now().UTC(),
|
|
}
|
|
a.mu.Lock()
|
|
a.jobs[job.ID] = job
|
|
a.mu.Unlock()
|
|
return job
|
|
}
|
|
|
|
// activeNodeJobError is returned when a new build/replace job cannot be
// reserved because the node already has one queued or running.
type activeNodeJobError struct {
	Node  string // name of the busy node
	Kind  string // kind of the job already active ("build" or "replace")
	JobID string // identifier of the blocking job
}
|
|
|
|
func (e *activeNodeJobError) Error() string {
|
|
if e == nil {
|
|
return "node already has an active metis job"
|
|
}
|
|
return fmt.Sprintf("node %s already has an active %s job (%s)", e.Node, e.Kind, e.JobID)
|
|
}
|
|
|
|
func (a *App) activeJobForNode(node string) *Job {
|
|
a.mu.RLock()
|
|
defer a.mu.RUnlock()
|
|
return a.activeJobForNodeLocked(node)
|
|
}
|
|
|
|
func (a *App) activeJobForNodeLocked(node string) *Job {
|
|
node = strings.TrimSpace(node)
|
|
if node == "" {
|
|
return nil
|
|
}
|
|
var active *Job
|
|
for _, job := range a.jobs {
|
|
if job == nil || strings.TrimSpace(job.Node) != node {
|
|
continue
|
|
}
|
|
if job.Status != JobQueued && job.Status != JobRunning {
|
|
continue
|
|
}
|
|
switch job.Kind {
|
|
case "build", "replace":
|
|
default:
|
|
continue
|
|
}
|
|
if active == nil || job.StartedAt.Before(active.StartedAt) {
|
|
active = job
|
|
}
|
|
}
|
|
if active == nil {
|
|
return nil
|
|
}
|
|
copyJob := *active
|
|
return ©Job
|
|
}
|
|
|
|
func (a *App) reserveJob(kind, node, host, device string) (*Job, error) {
|
|
a.mu.Lock()
|
|
defer a.mu.Unlock()
|
|
if active := a.activeJobForNodeLocked(node); active != nil {
|
|
return nil, &activeNodeJobError{Node: node, Kind: active.Kind, JobID: active.ID}
|
|
}
|
|
now := time.Now().UTC()
|
|
job := &Job{
|
|
ID: fmt.Sprintf("%d", now.UnixNano()),
|
|
Kind: kind,
|
|
Node: node,
|
|
Host: host,
|
|
Device: device,
|
|
Status: JobQueued,
|
|
ProgressPct: 0,
|
|
StartedAt: now,
|
|
UpdatedAt: now,
|
|
}
|
|
a.jobs[job.ID] = job
|
|
return job, nil
|
|
}
|
|
|
|
func (a *App) job(id string) *Job {
|
|
a.mu.RLock()
|
|
defer a.mu.RUnlock()
|
|
return a.jobs[id]
|
|
}
|
|
|
|
func (a *App) setJob(id string, update func(*Job)) {
|
|
a.mu.Lock()
|
|
defer a.mu.Unlock()
|
|
job := a.jobs[id]
|
|
if job == nil {
|
|
return
|
|
}
|
|
update(job)
|
|
job.UpdatedAt = time.Now().UTC()
|
|
}
|
|
|
|
func (a *App) failJob(id string, err error) {
|
|
a.completeJob(id, func(j *Job) {
|
|
j.Status = JobError
|
|
j.Error = err.Error()
|
|
j.Message = err.Error()
|
|
})
|
|
}
|
|
|
|
func (a *App) completeJob(id string, update func(*Job)) {
|
|
a.mu.Lock()
|
|
defer a.mu.Unlock()
|
|
job := a.jobs[id]
|
|
if job == nil {
|
|
return
|
|
}
|
|
update(job)
|
|
if job.Status != JobError {
|
|
job.Status = JobDone
|
|
}
|
|
job.UpdatedAt = time.Now().UTC()
|
|
job.FinishedAt = time.Now().UTC()
|
|
}
|
|
|
|
func (a *App) appendEvent(event Event) {
|
|
line, err := json.Marshal(event)
|
|
if err != nil {
|
|
return
|
|
}
|
|
f, err := os.OpenFile(a.settings.HistoryPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer f.Close()
|
|
_, _ = f.Write(append(line, '\n'))
|
|
}
|
|
|
|
func (a *App) recentEvents(limit int) []Event {
|
|
f, err := os.Open(a.settings.HistoryPath)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
defer f.Close()
|
|
events := make([]Event, 0, limit)
|
|
scanner := bufio.NewScanner(f)
|
|
for scanner.Scan() {
|
|
var event Event
|
|
if err := json.Unmarshal(scanner.Bytes(), &event); err != nil {
|
|
continue
|
|
}
|
|
events = append(events, event)
|
|
}
|
|
if len(events) > limit {
|
|
events = events[len(events)-limit:]
|
|
}
|
|
for i, j := 0, len(events)-1; i < j; i, j = i+1, j-1 {
|
|
events[i], events[j] = events[j], events[i]
|
|
}
|
|
return events
|
|
}
|
|
|
|
// cachedImageName derives the on-disk name of a cached image from its source
// path: the base name with a trailing ".xz" extension (if any) removed.
func cachedImageName(source string) string {
	base := filepath.Base(source)
	return strings.TrimSuffix(base, ".xz")
}
|
|
|
|
func (a *App) replacementNodes() []inventory.NodeSpec {
|
|
nodes := make([]inventory.NodeSpec, 0, len(a.inventory.Nodes))
|
|
for _, node := range a.inventory.Nodes {
|
|
spec, class, err := a.inventory.FindNode(node.Name)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
if replacementReady(spec, class) {
|
|
nodes = append(nodes, node)
|
|
}
|
|
}
|
|
sort.Slice(nodes, func(i, j int) bool {
|
|
return nodes[i].Name < nodes[j].Name
|
|
})
|
|
return nodes
|
|
}
|
|
|
|
func (a *App) ensureReplacementReady(nodeName string) error {
|
|
node, class, err := a.inventory.FindNode(nodeName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if replacementReady(node, class) {
|
|
return nil
|
|
}
|
|
return fmt.Errorf("node %s does not yet have a complete replacement definition", nodeName)
|
|
}
|
|
|
|
func replacementReady(node *inventory.NodeSpec, class *inventory.NodeClass) bool {
|
|
if node == nil || class == nil {
|
|
return false
|
|
}
|
|
if strings.TrimSpace(class.Image) == "" || strings.TrimSpace(class.Checksum) == "" {
|
|
return false
|
|
}
|
|
if strings.TrimSpace(node.Name) == "" || strings.TrimSpace(node.Hostname) == "" || strings.TrimSpace(node.IP) == "" {
|
|
return false
|
|
}
|
|
if strings.TrimSpace(node.K3sRole) == "" {
|
|
return false
|
|
}
|
|
if strings.TrimSpace(node.K3sRole) != "server" && strings.TrimSpace(node.K3sURL) == "" {
|
|
return false
|
|
}
|
|
if strings.TrimSpace(node.K3sToken) == "" {
|
|
return false
|
|
}
|
|
if strings.TrimSpace(node.SSHUser) == "" || len(node.SSHAuthorized) == 0 {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (a *App) flashHosts() []string {
|
|
hosts := map[string]struct{}{}
|
|
for _, host := range a.settings.FlashHosts {
|
|
if value := strings.TrimSpace(host); value != "" {
|
|
hosts[value] = struct{}{}
|
|
}
|
|
}
|
|
for _, host := range []string{a.settings.DefaultFlashHost, a.settings.LocalHost} {
|
|
if value := strings.TrimSpace(host); value != "" {
|
|
hosts[value] = struct{}{}
|
|
}
|
|
}
|
|
for _, node := range clusterNodes() {
|
|
if value := strings.TrimSpace(node.Name); value != "" {
|
|
hosts[value] = struct{}{}
|
|
}
|
|
}
|
|
out := make([]string, 0, len(hosts))
|
|
for host := range hosts {
|
|
out = append(out, host)
|
|
}
|
|
sort.Strings(out)
|
|
if a.settings.DefaultFlashHost == "" {
|
|
return out
|
|
}
|
|
return moveToFront(out, a.settings.DefaultFlashHost)
|
|
}
|
|
|
|
func (a *App) loadSnapshots() error {
|
|
data, err := os.ReadFile(a.settings.SnapshotsPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var snapshots map[string]SnapshotRecord
|
|
if err := json.Unmarshal(data, &snapshots); err != nil {
|
|
return err
|
|
}
|
|
a.mu.Lock()
|
|
a.snapshots = snapshots
|
|
a.mu.Unlock()
|
|
for _, snap := range snapshots {
|
|
a.metrics.RecordSnapshot(snap.Node, "ok", snap.CollectedAt)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (a *App) persistSnapshots() error {
|
|
a.mu.RLock()
|
|
data, err := json.MarshalIndent(a.snapshots, "", " ")
|
|
a.mu.RUnlock()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := os.MkdirAll(filepath.Dir(a.settings.SnapshotsPath), 0o755); err != nil {
|
|
return err
|
|
}
|
|
return os.WriteFile(a.settings.SnapshotsPath, data, 0o644)
|
|
}
|
|
|
|
func (a *App) loadTargets() error {
|
|
data, err := os.ReadFile(a.settings.TargetsPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var targets map[string]facts.Targets
|
|
if err := json.Unmarshal(data, &targets); err != nil {
|
|
return err
|
|
}
|
|
a.mu.Lock()
|
|
a.targets = targets
|
|
a.mu.Unlock()
|
|
a.metrics.SetDriftTargets(targets, 0)
|
|
return nil
|
|
}
|
|
|
|
func (a *App) persistTargets() error {
|
|
a.mu.RLock()
|
|
data, err := json.MarshalIndent(a.targets, "", " ")
|
|
a.mu.RUnlock()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := os.MkdirAll(filepath.Dir(a.settings.TargetsPath), 0o755); err != nil {
|
|
return err
|
|
}
|
|
return os.WriteFile(a.settings.TargetsPath, data, 0o644)
|
|
}
|
|
|
|
func diffTargets(prev, next map[string]facts.Targets) []string {
|
|
classes := map[string]struct{}{}
|
|
for class := range prev {
|
|
classes[class] = struct{}{}
|
|
}
|
|
for class := range next {
|
|
classes[class] = struct{}{}
|
|
}
|
|
out := make([]string, 0)
|
|
for class := range classes {
|
|
if !targetsEqual(prev[class], next[class]) {
|
|
out = append(out, class)
|
|
}
|
|
}
|
|
sort.Strings(out)
|
|
return out
|
|
}
|
|
|
|
func targetsEqual(a, b facts.Targets) bool {
|
|
if a.Kernel != b.Kernel || a.OSImage != b.OSImage || a.Containerd != b.Containerd || a.K3sVersion != b.K3sVersion {
|
|
return false
|
|
}
|
|
if len(a.Packages) != len(b.Packages) {
|
|
return false
|
|
}
|
|
for key, value := range a.Packages {
|
|
if b.Packages[key] != value {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
// humanBytes formats a byte count as a human-readable IEC string, e.g.
// "512 B", "1.5 KiB", "2.0 MiB".
func humanBytes(value int64) string {
	const unit = 1024
	if value < unit {
		return fmt.Sprintf("%d B", value)
	}
	div := int64(unit)
	exp := 0
	for remaining := value / unit; remaining >= unit; remaining /= unit {
		div *= unit
		exp++
	}
	scaled := float64(value) / float64(div)
	return fmt.Sprintf("%.1f %ciB", scaled, "KMGTPE"[exp])
}
|
|
|
|
// firstLine returns the first line of value with surrounding whitespace
// stripped; the whole trimmed string when there is no newline.
func firstLine(value string) string {
	trimmed := strings.TrimSpace(value)
	idx := strings.IndexByte(trimmed, '\n')
	if idx < 0 {
		return trimmed
	}
	return strings.TrimSpace(trimmed[:idx])
}
|
|
|
|
func preferredDevice(devices []Device) string {
|
|
if len(devices) == 0 {
|
|
return ""
|
|
}
|
|
return devices[0].Path
|
|
}
|
|
|
|
// errorString converts an error to its message, mapping nil to "".
func errorString(err error) string {
	if err != nil {
		return err.Error()
	}
	return ""
}
|
|
|
|
func cloneDevices(devices []Device) []Device {
|
|
if len(devices) == 0 {
|
|
return nil
|
|
}
|
|
out := make([]Device, len(devices))
|
|
copy(out, devices)
|
|
return out
|
|
}
|
|
|
|
func (a *App) cachedDevices(host string) ([]Device, error) {
|
|
host = strings.TrimSpace(host)
|
|
if host == "" {
|
|
host = a.settings.DefaultFlashHost
|
|
}
|
|
a.mu.RLock()
|
|
snapshot, ok := a.deviceStore[host]
|
|
a.mu.RUnlock()
|
|
if !ok {
|
|
return nil, nil
|
|
}
|
|
if strings.TrimSpace(snapshot.Err) != "" {
|
|
return cloneDevices(snapshot.Devices), errors.New(snapshot.Err)
|
|
}
|
|
return cloneDevices(snapshot.Devices), nil
|
|
}
|
|
|
|
func (a *App) recordDevices(host string, devices []Device, err error) {
|
|
host = strings.TrimSpace(host)
|
|
if host == "" {
|
|
host = a.settings.DefaultFlashHost
|
|
}
|
|
snapshot := deviceSnapshot{
|
|
Devices: cloneDevices(devices),
|
|
CheckedAt: time.Now().UTC(),
|
|
}
|
|
if err != nil {
|
|
snapshot.Err = err.Error()
|
|
}
|
|
a.mu.Lock()
|
|
if existing, ok := a.deviceStore[host]; ok && len(snapshot.Devices) == 0 {
|
|
snapshot.Devices = cloneDevices(existing.Devices)
|
|
}
|
|
a.deviceStore[host] = snapshot
|
|
a.mu.Unlock()
|
|
}
|
|
|
|
func deviceScore(device Device) int {
|
|
score := 0
|
|
model := strings.ToLower(strings.TrimSpace(device.Model))
|
|
switch {
|
|
case strings.Contains(model, "microsd"), strings.Contains(model, "micro sd"):
|
|
score += 60
|
|
case strings.Contains(model, "sdxc"), strings.Contains(model, "sdhc"), strings.Contains(model, "sd "):
|
|
score += 50
|
|
case strings.Contains(model, "card"), strings.Contains(model, "reader"):
|
|
score += 40
|
|
}
|
|
if device.Removable {
|
|
score += 20
|
|
}
|
|
if device.Hotplug {
|
|
score += 10
|
|
}
|
|
if device.Transport == "usb" {
|
|
score += 5
|
|
}
|
|
if strings.HasPrefix(device.Name, "mmcblk") {
|
|
score += 25
|
|
}
|
|
return score
|
|
}
|
|
|
|
// moveToFront returns values with the first occurrence of preferred moved to
// index 0, preserving the relative order of the remaining entries. The input
// slice is never mutated; when no move is needed (empty preferred or fewer
// than two entries) the original slice is returned as-is.
func moveToFront(values []string, preferred string) []string {
	if preferred == "" || len(values) < 2 {
		return values
	}
	out := make([]string, len(values))
	copy(out, values)
	for idx, value := range out {
		if value != preferred {
			continue
		}
		// Shift everything before idx right by one, then place preferred first.
		copy(out[1:idx+1], out[:idx])
		out[0] = preferred
		break
	}
	return out
}
|
|
|
|
func deleteNodeObject(node string) error {
|
|
if err := deleteNodeObjectInCluster(node); err == nil {
|
|
return nil
|
|
}
|
|
cmd := exec.Command("kubectl", "delete", "node", node, "--ignore-not-found")
|
|
if out, err := cmd.CombinedOutput(); err != nil {
|
|
return fmt.Errorf("delete node: %w: %s", err, strings.TrimSpace(string(out)))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func deleteNodeObjectInCluster(node string) error {
|
|
kube, err := kubeClientFactory()
|
|
if err != nil {
|
|
return errors.New("not running in cluster")
|
|
}
|
|
return kube.deleteRequest(fmt.Sprintf("/api/v1/nodes/%s", node))
|
|
}
|