package service
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"crypto/tls"
|
|
"crypto/x509"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"metis/pkg/facts"
|
|
"metis/pkg/image"
|
|
"metis/pkg/inventory"
|
|
"metis/pkg/plan"
|
|
"metis/pkg/sentinel"
|
|
"metis/pkg/writer"
|
|
)
|
|
|
|
// JobStatus enumerates the lifecycle states of a background Job.
type JobStatus string

const (
	// JobQueued means the job has been created but not yet started.
	JobQueued JobStatus = "queued"
	// JobRunning means the job is actively executing.
	JobRunning JobStatus = "running"
	// JobDone means the job finished successfully.
	JobDone JobStatus = "done"
	// JobError means the job terminated with an error.
	JobError JobStatus = "error"
)
|
|
|
|
// Device describes a flashable block device as discovered via lsblk.
type Device struct {
	Name      string `json:"name"`                // kernel device name, e.g. "sda" or "mmcblk0"
	Path      string `json:"path"`                // full device node path, e.g. "/dev/sda"
	Model     string `json:"model,omitempty"`     // hardware model string reported by lsblk
	Transport string `json:"transport,omitempty"` // bus transport reported by lsblk, e.g. "usb"
	Type      string `json:"type,omitempty"`      // lsblk device type; only "disk" entries are listed
	Removable bool   `json:"removable"`           // kernel removable flag (lsblk RM column)
	Hotplug   bool   `json:"hotplug"`             // kernel hotplug flag
	SizeBytes int64  `json:"size_bytes"`          // total device size in bytes
}
|
|
|
|
// Job is a long-running Metis action visible in the UI.
//
// Jobs are created by Build/Replace and mutated in place (under App.mu)
// as the pipeline progresses through its stages.
type Job struct {
	ID          string    `json:"id"`                      // unique job identifier
	Kind        string    `json:"kind"`                    // "build" or "replace"
	Node        string    `json:"node,omitempty"`          // inventory node the job targets
	Host        string    `json:"host,omitempty"`          // flash host (replace jobs only)
	Device      string    `json:"device,omitempty"`        // target device path (replace jobs only)
	Status      JobStatus `json:"status"`                  // current lifecycle state
	Stage       string    `json:"stage,omitempty"`         // pipeline stage, e.g. "download", "flash"
	Message     string    `json:"message,omitempty"`       // human-readable progress message
	Artifact    string    `json:"artifact,omitempty"`      // path of the built image once known
	ProgressPct float64   `json:"progress_pct"`            // overall progress, 0-100
	Written     int64     `json:"written_bytes,omitempty"` // bytes flashed so far
	Total       int64     `json:"total_bytes,omitempty"`   // total bytes to flash
	Error       string    `json:"error,omitempty"`         // terminal error text when Status is JobError
	StartedAt   time.Time `json:"started_at"`
	// NOTE(review): omitempty has no effect on a time.Time zero value with
	// encoding/json, so unfinished jobs serialise a zero timestamp here.
	FinishedAt time.Time `json:"finished_at,omitempty"`
}
|
|
|
|
// Event is a user-facing activity item for recent changes and runs.
// Events are appended as JSON lines to the history log (see appendEvent).
type Event struct {
	Time    time.Time      `json:"time"`              // when the event occurred (UTC)
	Kind    string         `json:"kind"`              // dotted category, e.g. "image.build"
	Summary string         `json:"summary"`           // one-line human-readable description
	Details map[string]any `json:"details,omitempty"` // structured context for the event
}
|
|
|
|
// SnapshotRecord stores the last fact snapshot pushed by a node sentinel.
type SnapshotRecord struct {
	Node        string            `json:"node"`         // node name; defaults to Snapshot.Hostname when empty
	CollectedAt time.Time         `json:"collected_at"` // when the snapshot was taken (UTC)
	Snapshot    sentinel.Snapshot `json:"snapshot"`     // raw facts pushed by the sentinel
}
|
|
|
|
// PageState is the UI/API view model returned by App.State.
type PageState struct {
	LocalHost        string                     `json:"local_host"`         // host this Metis instance runs on
	DefaultFlashHost string                     `json:"default_flash_host"` // configured default flash target
	SelectedHost     string                     `json:"selected_host"`      // host whose devices are shown
	FlashHosts       []string                   `json:"flash_hosts"`        // selectable flash hosts, default first
	Nodes            []inventory.NodeSpec       `json:"nodes"`              // inventory nodes
	Jobs             []*Job                     `json:"jobs"`               // jobs, newest first (copies, safe to read)
	Devices          []Device                   `json:"devices"`            // flash candidates, best first
	PreferredDevice  string                     `json:"preferred_device,omitempty"` // top-ranked device path
	DeviceError      string                     `json:"device_error,omitempty"`     // device enumeration error, if any
	Events           []Event                    `json:"events"`             // recent activity, newest first
	Snapshots        []SnapshotRecord           `json:"snapshots"`          // sentinel snapshots, sorted by node
	Targets          map[string]facts.Targets   `json:"targets"`            // recommended targets per class
	Artifacts        map[string]ArtifactSummary `json:"artifacts"`          // latest built image per node
}
|
|
|
|
// ArtifactSummary describes the latest built image for a node.
type ArtifactSummary struct {
	Path      string    `json:"path"`       // on-disk path of the image artifact
	UpdatedAt time.Time `json:"updated_at"` // artifact mtime (UTC)
	SizeBytes int64     `json:"size_bytes"` // artifact size in bytes
}
|
|
|
|
// App coordinates builds, flashes, sentinel snapshots, and the web UI state.
type App struct {
	settings  Settings             // service configuration, read-only after NewApp
	inventory *inventory.Inventory // node/class definitions loaded at startup
	metrics   *Metrics             // service metrics recorder

	// mu guards the three maps below; jobs are mutated in place, so job
	// pointers must only be read/written while holding mu.
	mu        sync.RWMutex
	jobs      map[string]*Job
	snapshots map[string]SnapshotRecord
	targets   map[string]facts.Targets
}
|
|
|
|
// NewApp creates a Metis service app instance.
|
|
func NewApp(settings Settings) (*App, error) {
|
|
if err := os.MkdirAll(settings.CacheDir, 0o755); err != nil {
|
|
return nil, err
|
|
}
|
|
if err := os.MkdirAll(settings.ArtifactDir, 0o755); err != nil {
|
|
return nil, err
|
|
}
|
|
if err := os.MkdirAll(filepath.Dir(settings.HistoryPath), 0o755); err != nil {
|
|
return nil, err
|
|
}
|
|
inv, err := inventory.Load(settings.InventoryPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
app := &App{
|
|
settings: settings,
|
|
inventory: inv,
|
|
metrics: NewMetrics(),
|
|
jobs: map[string]*Job{},
|
|
snapshots: map[string]SnapshotRecord{},
|
|
targets: map[string]facts.Targets{},
|
|
}
|
|
_ = app.loadSnapshots()
|
|
_ = app.loadTargets()
|
|
return app, nil
|
|
}
|
|
|
|
// State returns the current UI/API snapshot.
|
|
func (a *App) State(deviceHost string) PageState {
|
|
if strings.TrimSpace(deviceHost) == "" {
|
|
deviceHost = a.settings.DefaultFlashHost
|
|
}
|
|
a.mu.RLock()
|
|
jobs := make([]*Job, 0, len(a.jobs))
|
|
for _, job := range a.jobs {
|
|
copyJob := *job
|
|
jobs = append(jobs, ©Job)
|
|
}
|
|
sort.Slice(jobs, func(i, j int) bool {
|
|
return jobs[i].StartedAt.After(jobs[j].StartedAt)
|
|
})
|
|
|
|
snaps := make([]SnapshotRecord, 0, len(a.snapshots))
|
|
for _, snap := range a.snapshots {
|
|
snaps = append(snaps, snap)
|
|
}
|
|
aTargets := map[string]facts.Targets{}
|
|
for key, value := range a.targets {
|
|
aTargets[key] = value
|
|
}
|
|
a.mu.RUnlock()
|
|
|
|
sort.Slice(snaps, func(i, j int) bool {
|
|
return snaps[i].Node < snaps[j].Node
|
|
})
|
|
|
|
flashHosts := a.flashHosts()
|
|
devices, deviceErr := a.ListDevices(deviceHost)
|
|
preferredDevice := preferredDevice(devices)
|
|
return PageState{
|
|
LocalHost: a.settings.LocalHost,
|
|
DefaultFlashHost: a.settings.DefaultFlashHost,
|
|
SelectedHost: deviceHost,
|
|
FlashHosts: flashHosts,
|
|
Nodes: append([]inventory.NodeSpec{}, a.inventory.Nodes...),
|
|
Jobs: jobs,
|
|
Devices: devices,
|
|
PreferredDevice: preferredDevice,
|
|
DeviceError: errorString(deviceErr),
|
|
Events: a.recentEvents(40),
|
|
Snapshots: snaps,
|
|
Targets: aTargets,
|
|
Artifacts: a.artifacts(),
|
|
}
|
|
}
|
|
|
|
// Build starts a background image build for a node.
|
|
func (a *App) Build(node string) (*Job, error) {
|
|
if _, _, err := a.inventory.FindNode(node); err != nil {
|
|
return nil, err
|
|
}
|
|
job := a.newJob("build", node, "", "")
|
|
go a.runBuild(job, false)
|
|
return job, nil
|
|
}
|
|
|
|
// Replace starts a background build+flash workflow for a node.
|
|
func (a *App) Replace(node, host, device string) (*Job, error) {
|
|
if host == "" {
|
|
host = a.settings.DefaultFlashHost
|
|
}
|
|
if _, _, err := a.inventory.FindNode(node); err != nil {
|
|
return nil, err
|
|
}
|
|
if _, err := a.ensureDevice(host, device); err != nil {
|
|
return nil, err
|
|
}
|
|
job := a.newJob("replace", node, host, device)
|
|
go a.runBuild(job, true)
|
|
return job, nil
|
|
}
|
|
|
|
// StoreSnapshot records a pushed sentinel snapshot.
|
|
func (a *App) StoreSnapshot(record SnapshotRecord) error {
|
|
if record.Node == "" {
|
|
record.Node = record.Snapshot.Hostname
|
|
}
|
|
if record.CollectedAt.IsZero() {
|
|
record.CollectedAt = time.Now().UTC()
|
|
}
|
|
if strings.TrimSpace(record.Node) == "" {
|
|
return fmt.Errorf("snapshot node required")
|
|
}
|
|
a.mu.Lock()
|
|
a.snapshots[record.Node] = record
|
|
a.mu.Unlock()
|
|
if err := a.persistSnapshots(); err != nil {
|
|
return err
|
|
}
|
|
a.metrics.RecordSnapshot(record.Node, "ok", record.CollectedAt)
|
|
a.appendEvent(Event{
|
|
Time: record.CollectedAt,
|
|
Kind: "sentinel.snapshot",
|
|
Summary: fmt.Sprintf("Captured sentinel snapshot for %s", record.Node),
|
|
Details: map[string]any{
|
|
"node": record.Node,
|
|
"kernel": record.Snapshot.Kernel,
|
|
"k3s_version": record.Snapshot.K3sVersion,
|
|
},
|
|
})
|
|
return nil
|
|
}
|
|
|
|
// WatchSentinel recomputes class targets and logs meaningful drift.
//
// It converts the stored sentinel snapshots into facts.Snapshot values,
// recomputes recommended targets per class, diffs them against the previous
// targets, persists the new targets, and appends a single activity event
// summarising how many class templates changed. The returned event mirrors
// what was written to the history log.
func (a *App) WatchSentinel() (*Event, error) {
	// Copy snapshots and previous targets out under the read lock so the
	// recommendation step runs without holding App locks.
	a.mu.RLock()
	snaps := make([]facts.Snapshot, 0, len(a.snapshots))
	for _, snap := range a.snapshots {
		snaps = append(snaps, facts.Snapshot{
			Hostname: snap.Node,
			Kernel:   snap.Snapshot.Kernel,
			OSImage:  snap.Snapshot.OSImage,
			// Version strings may span multiple lines of tool output; keep
			// only the first line (see firstLine).
			K3sVersion:    firstLine(snap.Snapshot.K3sVersion),
			Containerd:    firstLine(snap.Snapshot.Containerd),
			PackageSample: snap.Snapshot.PackageSample,
			DropInsSample: snap.Snapshot.DropInsSample,
		})
	}
	prevTargets := map[string]facts.Targets{}
	for key, value := range a.targets {
		prevTargets[key] = value
	}
	a.mu.RUnlock()

	nextTargets := facts.RecommendTargets(a.inventory, snaps)
	changes := diffTargets(prevTargets, nextTargets)

	// Swap in the new targets before persisting so readers see them even if
	// the disk write fails.
	a.mu.Lock()
	a.targets = nextTargets
	a.mu.Unlock()
	if err := a.persistTargets(); err != nil {
		return nil, err
	}

	// Default event assumes no drift; overwritten below when changes exist.
	event := &Event{
		Time:    time.Now().UTC(),
		Kind:    "sentinel.watch",
		Summary: "Metis sentinel watch completed with no template changes",
		Details: map[string]any{
			"classes": len(nextTargets),
			"changes": 0,
		},
	}
	if len(changes) > 0 {
		event.Summary = fmt.Sprintf("Metis sentinel watch detected %d template change(s)", len(changes))
		event.Details["changes"] = changes
	}
	a.appendEvent(*event)
	a.metrics.RecordWatch("ok")
	a.metrics.SetDriftTargets(nextTargets, len(changes))
	return event, nil
}
|
|
|
|
// ListDevices returns locally attached removable media that are safe candidates for flashing.
//
// Only hosts this instance has direct media access to are supported (see
// supportsLocalMedia); other hosts get an explanatory error. Devices are
// discovered via lsblk's JSON output and filtered to whole disks that show
// at least one removable-media signal and fit under the configured size
// ceiling, then sorted best-candidate-first.
func (a *App) ListDevices(host string) ([]Device, error) {
	if host == "" {
		host = a.settings.DefaultFlashHost
	}
	if !a.supportsLocalMedia(host) {
		return nil, fmt.Errorf("flash host %s is listed for planning, but this Metis instance only has direct removable-media access on %s", host, a.settings.LocalHost)
	}
	// -J: JSON output, -b: sizes in bytes.
	cmd := exec.Command("lsblk", "-J", "-b", "-o", "NAME,PATH,RM,HOTPLUG,SIZE,MODEL,TRAN,TYPE")
	out, err := cmd.Output()
	if err != nil {
		return nil, err
	}
	var payload struct {
		Blockdevices []struct {
			Name    string `json:"name"`
			Path    string `json:"path"`
			RM      bool   `json:"rm"`
			Hotplug bool   `json:"hotplug"`
			// SIZE is decoded as `any` because lsblk emits it as a string
			// in some versions and a number in others; normalised below.
			Size  any    `json:"size"`
			Model string `json:"model"`
			Tran  string `json:"tran"`
			Type  string `json:"type"`
		} `json:"blockdevices"`
	}
	if err := json.Unmarshal(out, &payload); err != nil {
		return nil, err
	}
	devices := make([]Device, 0)
	for _, dev := range payload.Blockdevices {
		// Whole disks only; partitions and loop devices are never flash
		// targets.
		if dev.Type != "disk" {
			continue
		}
		size := int64(0)
		switch value := dev.Size.(type) {
		case string:
			size, _ = strconv.ParseInt(value, 10, 64)
		case float64:
			size = int64(value)
		}
		// The size ceiling guards against offering a large system disk as
		// a flash target.
		if size <= 0 || size > a.settings.MaxDeviceBytes {
			continue
		}
		// Require at least one removable-media signal (USB transport, the
		// removable flag, or hotplug).
		if dev.Tran != "usb" && !dev.RM && !dev.Hotplug {
			continue
		}
		devices = append(devices, Device{
			Name:      dev.Name,
			Path:      dev.Path,
			Model:     strings.TrimSpace(dev.Model),
			Transport: dev.Tran,
			Type:      dev.Type,
			Removable: dev.RM,
			Hotplug:   dev.Hotplug,
			SizeBytes: size,
		})
	}
	// Best candidate first: higher score, then smaller device (SD cards
	// tend to be small), then path for a stable order.
	sort.Slice(devices, func(i, j int) bool {
		left := deviceScore(devices[i])
		right := deviceScore(devices[j])
		if left != right {
			return left > right
		}
		if devices[i].SizeBytes != devices[j].SizeBytes {
			return devices[i].SizeBytes < devices[j].SizeBytes
		}
		return devices[i].Path < devices[j].Path
	})
	return devices, nil
}
|
|
|
|
// runBuild executes the build pipeline for a job and, when flash is true,
// continues into the flash workflow against the job's host/device.
//
// Stage progression uses fixed progress checkpoints: download (5) ->
// verify (18) -> copy (35) -> inject (70); flash runs add preflight (78)
// and flash (82-99). Every failure path marks the job as errored and
// records an error metric before returning.
func (a *App) runBuild(job *Job, flash bool) {
	a.setJob(job.ID, func(j *Job) {
		j.Status = JobRunning
		j.Stage = "download"
		j.Message = "Fetching base image"
		j.ProgressPct = 5
	})
	output := a.artifactPath(job.Node)
	cacheDir := a.settings.CacheDir

	// Resolve the build plan (base image location etc.) for this node.
	planData, err := plan.Build(a.inventory, job.Node, output, cacheDir)
	if err != nil {
		a.failJob(job.ID, err)
		a.metrics.RecordBuild(job.Node, "error")
		return
	}
	// The node's class carries the expected base-image checksum.
	_, class, err := a.inventory.FindNode(job.Node)
	if err != nil {
		a.failJob(job.ID, err)
		a.metrics.RecordBuild(job.Node, "error")
		return
	}
	// Download into the shared cache so repeat builds reuse the base image.
	cacheImage := filepath.Join(cacheDir, filepath.Base(planData.Image))
	if err := image.Download(planData.Image, cacheImage); err != nil {
		a.failJob(job.ID, err)
		a.metrics.RecordBuild(job.Node, "error")
		return
	}
	a.setJob(job.ID, func(j *Job) {
		j.Stage = "verify"
		j.Message = "Verifying base image checksum"
		j.ProgressPct = 18
	})
	if err := image.VerifyChecksum(cacheImage, class.Checksum); err != nil {
		a.failJob(job.ID, err)
		a.metrics.RecordBuild(job.Node, "error")
		return
	}
	a.setJob(job.ID, func(j *Job) {
		j.Stage = "copy"
		j.Message = "Copying base image into artifact"
		j.ProgressPct = 35
	})
	// Work on a per-node artifact copy so the cached base stays pristine.
	if err := writer.WriteImage(context.Background(), cacheImage, output); err != nil {
		a.failJob(job.ID, err)
		a.metrics.RecordBuild(job.Node, "error")
		return
	}
	files, err := plan.Files(a.inventory, job.Node)
	if err != nil {
		a.failJob(job.ID, err)
		a.metrics.RecordBuild(job.Node, "error")
		return
	}
	a.setJob(job.ID, func(j *Job) {
		j.Stage = "inject"
		j.Message = "Injecting node-specific rootfs config"
		j.ProgressPct = 70
	})
	if err := image.InjectRootFS(output, files); err != nil {
		a.failJob(job.ID, err)
		a.metrics.RecordBuild(job.Node, "error")
		return
	}
	a.metrics.RecordBuild(job.Node, "ok")
	a.appendEvent(Event{
		Time:    time.Now().UTC(),
		Kind:    "image.build",
		Summary: fmt.Sprintf("Built replacement image for %s", job.Node),
		Details: map[string]any{"node": job.Node, "artifact": output},
	})

	// Build-only jobs finish here.
	if !flash {
		a.completeJob(job.ID, func(j *Job) {
			j.Stage = "complete"
			j.Message = "Image build complete"
			j.ProgressPct = 100
			j.Artifact = output
		})
		return
	}

	a.setJob(job.ID, func(j *Job) {
		j.Stage = "preflight"
		j.Message = "Validating device and deleting stale node object"
		j.ProgressPct = 78
		j.Artifact = output
	})
	// Re-validate the device: it may have been unplugged since the job was
	// queued.
	if _, err := a.ensureDevice(job.Host, job.Device); err != nil {
		a.failJob(job.ID, err)
		a.metrics.RecordFlash(job.Node, job.Host, "error")
		return
	}
	// Removing the stale Kubernetes node object is best-effort: log a
	// warning event and continue on failure.
	if err := deleteNodeObject(job.Node); err != nil {
		a.appendEvent(Event{
			Time:    time.Now().UTC(),
			Kind:    "node.delete.warning",
			Summary: fmt.Sprintf("Could not delete stale Kubernetes node object for %s", job.Node),
			Details: map[string]any{"node": job.Node, "error": err.Error()},
		})
	}
	if err := a.flashArtifact(job.ID, output); err != nil {
		a.failJob(job.ID, err)
		a.metrics.RecordFlash(job.Node, job.Host, "error")
		return
	}
	a.metrics.RecordFlash(job.Node, job.Host, "ok")
	a.appendEvent(Event{
		Time:    time.Now().UTC(),
		Kind:    "image.flash",
		Summary: fmt.Sprintf("Flashed %s image to %s on %s", job.Node, job.Device, job.Host),
		Details: map[string]any{"node": job.Node, "device": job.Device, "host": job.Host},
	})
	a.completeJob(job.ID, func(j *Job) {
		j.Stage = "complete"
		j.Message = fmt.Sprintf("Flash complete. Move the card into %s and power-cycle it.", j.Node)
		j.ProgressPct = 100
		j.Artifact = output
	})
}
|
|
|
|
func (a *App) flashArtifact(jobID, artifact string) error {
|
|
info, err := os.Stat(artifact)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
a.setJob(jobID, func(j *Job) {
|
|
j.Stage = "flash"
|
|
j.Message = "Writing image to removable media"
|
|
j.ProgressPct = 82
|
|
j.Total = info.Size()
|
|
})
|
|
err = writer.WriteImageWithProgress(context.Background(), artifact, a.job(jobID).Device, func(written, total int64) {
|
|
pct := 82.0
|
|
if total > 0 {
|
|
pct = 82.0 + (float64(written)/float64(total))*17.0
|
|
}
|
|
a.setJob(jobID, func(j *Job) {
|
|
j.Written = written
|
|
j.Total = total
|
|
j.ProgressPct = pct
|
|
j.Message = fmt.Sprintf("Flashing %s of %s", humanBytes(written), humanBytes(total))
|
|
})
|
|
})
|
|
return err
|
|
}
|
|
|
|
func (a *App) ensureDevice(host, path string) (*Device, error) {
|
|
if strings.TrimSpace(path) == "" {
|
|
return nil, fmt.Errorf("select removable media before starting a flash run")
|
|
}
|
|
devices, err := a.ListDevices(host)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for _, device := range devices {
|
|
if device.Path == path {
|
|
return &device, nil
|
|
}
|
|
}
|
|
return nil, fmt.Errorf("device %s is not a current removable flash candidate", path)
|
|
}
|
|
|
|
func (a *App) newJob(kind, node, host, device string) *Job {
|
|
job := &Job{
|
|
ID: fmt.Sprintf("%d", time.Now().UTC().UnixNano()),
|
|
Kind: kind,
|
|
Node: node,
|
|
Host: host,
|
|
Device: device,
|
|
Status: JobQueued,
|
|
ProgressPct: 0,
|
|
StartedAt: time.Now().UTC(),
|
|
}
|
|
a.mu.Lock()
|
|
a.jobs[job.ID] = job
|
|
a.mu.Unlock()
|
|
return job
|
|
}
|
|
|
|
func (a *App) job(id string) *Job {
|
|
a.mu.RLock()
|
|
defer a.mu.RUnlock()
|
|
return a.jobs[id]
|
|
}
|
|
|
|
func (a *App) setJob(id string, update func(*Job)) {
|
|
a.mu.Lock()
|
|
defer a.mu.Unlock()
|
|
job := a.jobs[id]
|
|
if job == nil {
|
|
return
|
|
}
|
|
update(job)
|
|
}
|
|
|
|
func (a *App) failJob(id string, err error) {
|
|
a.completeJob(id, func(j *Job) {
|
|
j.Status = JobError
|
|
j.Error = err.Error()
|
|
j.Message = err.Error()
|
|
})
|
|
}
|
|
|
|
func (a *App) completeJob(id string, update func(*Job)) {
|
|
a.mu.Lock()
|
|
defer a.mu.Unlock()
|
|
job := a.jobs[id]
|
|
if job == nil {
|
|
return
|
|
}
|
|
update(job)
|
|
if job.Status != JobError {
|
|
job.Status = JobDone
|
|
}
|
|
job.FinishedAt = time.Now().UTC()
|
|
}
|
|
|
|
func (a *App) appendEvent(event Event) {
|
|
line, err := json.Marshal(event)
|
|
if err != nil {
|
|
return
|
|
}
|
|
f, err := os.OpenFile(a.settings.HistoryPath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer f.Close()
|
|
_, _ = f.Write(append(line, '\n'))
|
|
}
|
|
|
|
func (a *App) recentEvents(limit int) []Event {
|
|
f, err := os.Open(a.settings.HistoryPath)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
defer f.Close()
|
|
events := make([]Event, 0, limit)
|
|
scanner := bufio.NewScanner(f)
|
|
for scanner.Scan() {
|
|
var event Event
|
|
if err := json.Unmarshal(scanner.Bytes(), &event); err != nil {
|
|
continue
|
|
}
|
|
events = append(events, event)
|
|
}
|
|
if len(events) > limit {
|
|
events = events[len(events)-limit:]
|
|
}
|
|
for i, j := 0, len(events)-1; i < j; i, j = i+1, j-1 {
|
|
events[i], events[j] = events[j], events[i]
|
|
}
|
|
return events
|
|
}
|
|
|
|
func (a *App) artifacts() map[string]ArtifactSummary {
|
|
result := map[string]ArtifactSummary{}
|
|
for _, node := range a.inventory.Nodes {
|
|
path := a.artifactPath(node.Name)
|
|
info, err := os.Stat(path)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
result[node.Name] = ArtifactSummary{
|
|
Path: path,
|
|
UpdatedAt: info.ModTime().UTC(),
|
|
SizeBytes: info.Size(),
|
|
}
|
|
}
|
|
return result
|
|
}
|
|
|
|
func (a *App) artifactPath(node string) string {
|
|
return filepath.Join(a.settings.ArtifactDir, fmt.Sprintf("%s.img", node))
|
|
}
|
|
|
|
func (a *App) flashHosts() []string {
|
|
hosts := map[string]struct{}{}
|
|
for _, host := range a.settings.FlashHosts {
|
|
if value := strings.TrimSpace(host); value != "" {
|
|
hosts[value] = struct{}{}
|
|
}
|
|
}
|
|
for _, host := range []string{a.settings.DefaultFlashHost, a.settings.LocalHost} {
|
|
if value := strings.TrimSpace(host); value != "" {
|
|
hosts[value] = struct{}{}
|
|
}
|
|
}
|
|
for _, host := range clusterNodeNames() {
|
|
hosts[host] = struct{}{}
|
|
}
|
|
out := make([]string, 0, len(hosts))
|
|
for host := range hosts {
|
|
out = append(out, host)
|
|
}
|
|
sort.Strings(out)
|
|
if a.settings.DefaultFlashHost == "" {
|
|
return out
|
|
}
|
|
return moveToFront(out, a.settings.DefaultFlashHost)
|
|
}
|
|
|
|
func (a *App) loadSnapshots() error {
|
|
data, err := os.ReadFile(a.settings.SnapshotsPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var snapshots map[string]SnapshotRecord
|
|
if err := json.Unmarshal(data, &snapshots); err != nil {
|
|
return err
|
|
}
|
|
a.mu.Lock()
|
|
a.snapshots = snapshots
|
|
a.mu.Unlock()
|
|
for _, snap := range snapshots {
|
|
a.metrics.RecordSnapshot(snap.Node, "ok", snap.CollectedAt)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (a *App) persistSnapshots() error {
|
|
a.mu.RLock()
|
|
data, err := json.MarshalIndent(a.snapshots, "", " ")
|
|
a.mu.RUnlock()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := os.MkdirAll(filepath.Dir(a.settings.SnapshotsPath), 0o755); err != nil {
|
|
return err
|
|
}
|
|
return os.WriteFile(a.settings.SnapshotsPath, data, 0o644)
|
|
}
|
|
|
|
func (a *App) loadTargets() error {
|
|
data, err := os.ReadFile(a.settings.TargetsPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var targets map[string]facts.Targets
|
|
if err := json.Unmarshal(data, &targets); err != nil {
|
|
return err
|
|
}
|
|
a.mu.Lock()
|
|
a.targets = targets
|
|
a.mu.Unlock()
|
|
a.metrics.SetDriftTargets(targets, 0)
|
|
return nil
|
|
}
|
|
|
|
func (a *App) persistTargets() error {
|
|
a.mu.RLock()
|
|
data, err := json.MarshalIndent(a.targets, "", " ")
|
|
a.mu.RUnlock()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := os.MkdirAll(filepath.Dir(a.settings.TargetsPath), 0o755); err != nil {
|
|
return err
|
|
}
|
|
return os.WriteFile(a.settings.TargetsPath, data, 0o644)
|
|
}
|
|
|
|
func diffTargets(prev, next map[string]facts.Targets) []string {
|
|
classes := map[string]struct{}{}
|
|
for class := range prev {
|
|
classes[class] = struct{}{}
|
|
}
|
|
for class := range next {
|
|
classes[class] = struct{}{}
|
|
}
|
|
out := make([]string, 0)
|
|
for class := range classes {
|
|
if !targetsEqual(prev[class], next[class]) {
|
|
out = append(out, class)
|
|
}
|
|
}
|
|
sort.Strings(out)
|
|
return out
|
|
}
|
|
|
|
func targetsEqual(a, b facts.Targets) bool {
|
|
if a.Kernel != b.Kernel || a.OSImage != b.OSImage || a.Containerd != b.Containerd || a.K3sVersion != b.K3sVersion {
|
|
return false
|
|
}
|
|
if len(a.Packages) != len(b.Packages) {
|
|
return false
|
|
}
|
|
for key, value := range a.Packages {
|
|
if b.Packages[key] != value {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
// humanBytes renders a byte count using binary (1024-based) units with one
// decimal place, e.g. "1.5 KiB"; values below 1 KiB render as plain bytes.
func humanBytes(value int64) string {
	const unit = 1024
	if value < unit {
		return fmt.Sprintf("%d B", value)
	}
	// Find the largest unit that keeps the mantissa below 1024.
	exp := 0
	reduced := value / unit
	for reduced >= unit {
		reduced /= unit
		exp++
	}
	div := int64(unit)
	for i := 0; i < exp; i++ {
		div *= unit
	}
	return fmt.Sprintf("%.1f %ciB", float64(value)/float64(div), "KMGTPE"[exp])
}
|
|
|
|
// firstLine returns the trimmed first line of value; multi-line input keeps
// only the text before the first newline, itself trimmed of whitespace.
func firstLine(value string) string {
	trimmed := strings.TrimSpace(value)
	if head, _, found := strings.Cut(trimmed, "\n"); found {
		return strings.TrimSpace(head)
	}
	return trimmed
}
|
|
|
|
func preferredDevice(devices []Device) string {
|
|
if len(devices) == 0 {
|
|
return ""
|
|
}
|
|
return devices[0].Path
|
|
}
|
|
|
|
// errorString renders err as its message text, mapping nil to "".
func errorString(err error) string {
	if err != nil {
		return err.Error()
	}
	return ""
}
|
|
|
|
func (a *App) supportsLocalMedia(host string) bool {
|
|
host = strings.TrimSpace(host)
|
|
return host == "" || host == a.settings.LocalHost || host == a.settings.DefaultFlashHost
|
|
}
|
|
|
|
func deviceScore(device Device) int {
|
|
score := 0
|
|
model := strings.ToLower(strings.TrimSpace(device.Model))
|
|
switch {
|
|
case strings.Contains(model, "microsd"), strings.Contains(model, "micro sd"):
|
|
score += 60
|
|
case strings.Contains(model, "sdxc"), strings.Contains(model, "sdhc"), strings.Contains(model, "sd "):
|
|
score += 50
|
|
case strings.Contains(model, "card"), strings.Contains(model, "reader"):
|
|
score += 40
|
|
}
|
|
if device.Removable {
|
|
score += 20
|
|
}
|
|
if device.Hotplug {
|
|
score += 10
|
|
}
|
|
if device.Transport == "usb" {
|
|
score += 5
|
|
}
|
|
if strings.HasPrefix(device.Name, "mmcblk") {
|
|
score += 25
|
|
}
|
|
return score
|
|
}
|
|
|
|
// moveToFront returns a copy of values with the first occurrence of
// preferred moved to index 0, keeping every other element's relative order.
// Inputs that cannot change (empty preferred, fewer than two values) are
// returned unmodified; a preferred value not present yields a plain copy.
func moveToFront(values []string, preferred string) []string {
	if preferred == "" || len(values) < 2 {
		return values
	}
	rest := make([]string, 0, len(values))
	found := false
	for _, value := range values {
		if !found && value == preferred {
			found = true
			continue
		}
		rest = append(rest, value)
	}
	if !found {
		return rest
	}
	return append([]string{preferred}, rest...)
}
|
|
|
|
func deleteNodeObject(node string) error {
|
|
if err := deleteNodeObjectInCluster(node); err == nil {
|
|
return nil
|
|
}
|
|
cmd := exec.Command("kubectl", "delete", "node", node, "--ignore-not-found")
|
|
if out, err := cmd.CombinedOutput(); err != nil {
|
|
return fmt.Errorf("delete node: %w: %s", err, strings.TrimSpace(string(out)))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// deleteNodeObjectInCluster deletes the named Kubernetes Node via the
// in-cluster API, authenticating with the mounted service-account token
// and trusting only the mounted cluster CA.
//
// It returns an error immediately when not running inside a cluster (the
// KUBERNETES_SERVICE_* env vars are unset) so callers can fall back to
// kubectl. A 404 counts as success: the node object is already gone.
func deleteNodeObjectInCluster(node string) error {
	host := strings.TrimSpace(os.Getenv("KUBERNETES_SERVICE_HOST"))
	port := strings.TrimSpace(os.Getenv("KUBERNETES_SERVICE_PORT"))
	if host == "" || port == "" {
		return errors.New("not running in cluster")
	}
	token, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token")
	if err != nil {
		return err
	}
	caPEM, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")
	if err != nil {
		return err
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return errors.New("append kubernetes CA")
	}
	// Dedicated client with a hard timeout so a wedged API server cannot
	// stall the flash pipeline.
	client := &http.Client{
		Timeout: 15 * time.Second,
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{RootCAs: pool},
		},
	}
	req, err := http.NewRequest(http.MethodDelete, fmt.Sprintf("https://%s:%s/api/v1/nodes/%s", host, port, node), nil)
	if err != nil {
		return err
	}
	req.Header.Set("Authorization", "Bearer "+strings.TrimSpace(string(token)))
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	// 200/202 mean the delete was accepted; 404 means already deleted.
	if resp.StatusCode == http.StatusNotFound || resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusAccepted {
		return nil
	}
	// Include a bounded slice of the response body for diagnostics.
	body, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
	return fmt.Errorf("delete node %s failed: %s: %s", node, resp.Status, strings.TrimSpace(string(body)))
}
|
|
|
|
// clusterNodeNames lists the names of all Kubernetes nodes via the
// in-cluster API, sorted ascending.
//
// Every failure path (not in a cluster, unreadable credentials, request or
// decode error, non-200 status) returns nil — callers treat the cluster
// node list as a best-effort supplement to configured hosts.
func clusterNodeNames() []string {
	host := strings.TrimSpace(os.Getenv("KUBERNETES_SERVICE_HOST"))
	port := strings.TrimSpace(os.Getenv("KUBERNETES_SERVICE_PORT"))
	if host == "" || port == "" {
		// Not running in a cluster.
		return nil
	}
	token, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token")
	if err != nil {
		return nil
	}
	caPEM, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")
	if err != nil {
		return nil
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil
	}
	// Dedicated client trusting only the cluster CA, with a timeout so the
	// UI state call cannot hang on a slow API server.
	client := &http.Client{
		Timeout: 10 * time.Second,
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{RootCAs: pool},
		},
	}
	req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("https://%s:%s/api/v1/nodes", host, port), nil)
	if err != nil {
		return nil
	}
	req.Header.Set("Authorization", "Bearer "+strings.TrimSpace(string(token)))
	resp, err := client.Do(req)
	if err != nil {
		return nil
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil
	}
	// Only the node names are needed from the NodeList payload.
	var payload struct {
		Items []struct {
			Metadata struct {
				Name string `json:"name"`
			} `json:"metadata"`
		} `json:"items"`
	}
	// Cap the decoded body at 1 MiB as a defensive bound.
	if err := json.NewDecoder(io.LimitReader(resp.Body, 1<<20)).Decode(&payload); err != nil {
		return nil
	}
	names := make([]string, 0, len(payload.Items))
	for _, item := range payload.Items {
		if name := strings.TrimSpace(item.Metadata.Name); name != "" {
			names = append(names, name)
		}
	}
	sort.Strings(names)
	return names
}
|