metis/pkg/service/remote.go

372 lines
12 KiB
Go

package service
import (
"encoding/json"
"fmt"
"sort"
"strings"
"time"
)
const (
hostTmpDevicePath = "hosttmp:///var/tmp/metis-flash-test"
vaultRoleMaintenance = "maintenance"
vaultRuntimeSecretPath = "kv/data/atlas/maintenance/metis-runtime"
vaultHarborSecretPath = "kv/data/atlas/harbor/harbor-core"
vaultSSHKeysSecretPath = "kv/data/atlas/maintenance/metis-ssh-keys"
)
// ListDevices returns cached device data because the UI needs a cheap refresh
// path while remote enumeration is still in flight.
func (a *App) ListDevices(host string) ([]Device, error) {
return a.cachedDevices(host)
}
// RefreshDevices rebuilds the flash-device list because the chooser needs the
// latest host-specific USB inventory before a burn can start.
func (a *App) RefreshDevices(host string) ([]Device, error) {
if host == "" {
host = a.settings.DefaultFlashHost
}
nodeMap := map[string]clusterNode{}
for _, node := range clusterNodes() {
nodeMap[node.Name] = node
}
target, ok := nodeMap[host]
if !ok {
err := fmt.Errorf("flash host %s is not a current cluster node", host)
a.recordDevices(host, nil, err)
return nil, err
}
image := a.podImageForArch(target.Arch)
if image == "" {
err := fmt.Errorf("no runner image configured for arch %s", target.Arch)
a.recordDevices(host, nil, err)
return nil, err
}
podName := fmt.Sprintf("metis-devices-%d", time.Now().UTC().UnixNano())
logs, err := a.runRemotePod("", podName, a.remoteDevicePodSpec(podName, host, image))
if err != nil {
a.recordDevices(host, nil, err)
return nil, err
}
var payload struct {
Devices []Device `json:"devices"`
}
if err := json.Unmarshal([]byte(strings.TrimSpace(logs)), &payload); err != nil {
decodeErr := fmt.Errorf("decode remote devices: %w: %s", err, strings.TrimSpace(logs))
a.recordDevices(host, nil, decodeErr)
return nil, decodeErr
}
sort.Slice(payload.Devices, func(i, j int) bool {
left := deviceScore(payload.Devices[i])
right := deviceScore(payload.Devices[j])
if left != right {
return left > right
}
if payload.Devices[i].SizeBytes != payload.Devices[j].SizeBytes {
return payload.Devices[i].SizeBytes < payload.Devices[j].SizeBytes
}
return payload.Devices[i].Path < payload.Devices[j].Path
})
a.recordDevices(host, payload.Devices, nil)
return payload.Devices, nil
}
func (a *App) runBuild(job *Job, flash bool) {
nodeSpec, class, err := a.inventory.FindNode(job.Node)
if err != nil {
a.failJob(job.ID, err)
a.metrics.RecordBuild(job.Node, "error")
return
}
if _, err := a.stageDesiredNodeMetadata(job.Node); err != nil {
a.failJob(job.ID, err)
a.metrics.RecordBuild(job.Node, "error")
return
}
if err := a.ensureHarborProject(); err != nil {
a.failJob(job.ID, err)
a.metrics.RecordBuild(job.Node, "error")
return
}
builder, err := a.selectBuilderHost(class.Arch, job.Host)
if err != nil {
a.failJob(job.ID, err)
a.metrics.RecordBuild(job.Node, "error")
return
}
job.Builder = builder.Name
buildTag := time.Now().UTC().Format("20060102t150405z")
artifactRef := a.artifactRepo(job.Node)
a.setJob(job.ID, func(j *Job) {
j.Status = JobRunning
j.Stage = "build"
j.StageStartedAt = time.Now().UTC()
j.Message = fmt.Sprintf("Building on %s (%s) and publishing to Harbor", builder.Name, builder.Arch)
j.ProgressPct = 8
j.Artifact = artifactRef + ":latest"
j.Builder = builder.Name
})
buildImage := a.podImageForArch(builder.Arch)
if buildImage == "" {
a.failJob(job.ID, fmt.Errorf("no runner image configured for arch %s", builder.Arch))
a.metrics.RecordBuild(job.Node, "error")
return
}
buildPod := fmt.Sprintf("metis-build-%d", time.Now().UTC().UnixNano())
job.Builder = builder.Name
logs, err := a.runRemotePod(job.ID, buildPod, a.remoteBuildPodSpec(buildPod, builder.Name, buildImage, job.Node, strings.TrimSpace(nodeSpec.Hostname), artifactRef, buildTag))
if err != nil {
a.failJob(job.ID, err)
a.metrics.RecordBuild(job.Node, "error")
return
}
var summary ArtifactSummary
if err := json.Unmarshal([]byte(strings.TrimSpace(logs)), &summary); err != nil {
a.failJob(job.ID, fmt.Errorf("decode remote build output: %w: %s", err, strings.TrimSpace(logs)))
a.metrics.RecordBuild(job.Node, "error")
return
}
summary.Node = job.Node
summary.Ref = artifactRef + ":latest"
summary.BuilderHost = builder.Name
if err := a.recordArtifact(summary); err != nil {
a.failJob(job.ID, err)
a.metrics.RecordBuild(job.Node, "error")
return
}
if err := a.pruneHarborArtifacts(job.Node, 3); err != nil {
a.appendEvent(Event{
Time: time.Now().UTC(),
Kind: "artifact.prune.warning",
Summary: fmt.Sprintf("Harbor cleanup warning for %s", job.Node),
Details: map[string]any{"node": job.Node, "error": err.Error()},
})
}
a.metrics.RecordBuild(job.Node, "ok")
a.appendEvent(Event{
Time: time.Now().UTC(),
Kind: "image.build",
Summary: fmt.Sprintf("Built replacement image for %s on %s", job.Node, builder.Name),
Details: map[string]any{"node": job.Node, "artifact": artifactRef + ":latest", "builder": builder.Name},
})
if !flash {
a.completeJob(job.ID, func(j *Job) {
j.Stage = "complete"
j.Message = "Image build complete"
j.ProgressPct = 100
j.Artifact = artifactRef + ":latest"
})
return
}
result, err := a.runFlashSequence(job, artifactRef)
if err != nil {
a.failJob(job.ID, err)
a.metrics.RecordFlash(job.Node, job.Host, "error")
return
}
a.metrics.RecordFlash(job.Node, job.Host, "ok")
a.appendFlashEvent(job, artifactRef, result)
a.completeFlashJob(job.ID, artifactRef, result)
}
func (a *App) runFlash(job *Job) {
artifactRef := a.artifactRepo(job.Node)
result, err := a.runFlashSequence(job, artifactRef)
if err != nil {
a.failJob(job.ID, err)
a.metrics.RecordFlash(job.Node, job.Host, "error")
return
}
a.metrics.RecordFlash(job.Node, job.Host, "ok")
a.appendFlashEvent(job, artifactRef, result)
a.completeFlashJob(job.ID, artifactRef, result)
}
func (a *App) runFlashSequence(job *Job, artifactRef string) (RemoteFlashResult, error) {
if _, err := a.stageDesiredNodeMetadata(job.Node); err != nil {
return RemoteFlashResult{}, err
}
a.setJob(job.ID, func(j *Job) {
j.Status = JobRunning
j.Stage = "preflight"
j.StageStartedAt = time.Now().UTC()
j.Message = fmt.Sprintf("Validating %s and preparing the latest Harbor artifact for %s", prettyDeviceTarget(j.Device), j.Host)
j.ProgressPct = 80
j.Artifact = artifactRef + ":latest"
})
if _, err := a.ensureDevice(job.Host, job.Device); err != nil {
return RemoteFlashResult{}, err
}
if !strings.HasPrefix(job.Device, "hosttmp://") {
if err := deleteNodeObject(job.Node); err != nil {
a.appendEvent(Event{
Time: time.Now().UTC(),
Kind: "node.delete.warning",
Summary: fmt.Sprintf("Could not delete stale Kubernetes node object for %s", job.Node),
Details: map[string]any{"node": job.Node, "error": err.Error()},
})
}
}
return a.flashArtifact(job.ID, artifactRef)
}
func (a *App) appendFlashEvent(job *Job, artifactRef string, result RemoteFlashResult) {
summary := fmt.Sprintf("Flashed and verified %s latest image on %s", job.Node, job.Host)
if strings.HasPrefix(job.Device, "hosttmp://") {
summary = fmt.Sprintf("Verified %s latest image on %s host scratch", job.Node, job.Host)
}
details := map[string]any{
"node": job.Node,
"device": job.Device,
"host": job.Host,
"artifact": artifactRef + ":latest",
"dest_path": result.DestPath,
"verified": result.Verified,
"verification_kind": result.VerificationKind,
}
if strings.TrimSpace(result.BootPartition) != "" {
details["boot_partition"] = result.BootPartition
}
if strings.TrimSpace(result.RootPartition) != "" {
details["root_partition"] = result.RootPartition
}
if len(result.CheckedFiles) > 0 {
details["checked_files"] = append([]string{}, result.CheckedFiles...)
}
a.appendEvent(Event{
Time: time.Now().UTC(),
Kind: "image.flash",
Summary: summary,
Details: details,
})
}
func (a *App) completeFlashJob(jobID, artifactRef string, result RemoteFlashResult) {
a.completeJob(jobID, func(j *Job) {
j.Stage = "complete"
j.ProgressPct = 100
j.Artifact = artifactRef + ":latest"
if strings.HasPrefix(j.Device, "hosttmp://") {
if strings.TrimSpace(result.VerificationSummary) != "" {
j.Message = fmt.Sprintf("%s Test flash complete on %s host /tmp.", strings.TrimRight(result.VerificationSummary, "."), j.Host)
return
}
j.Message = fmt.Sprintf("Verified test flash on %s host /tmp.", j.Host)
return
}
if strings.TrimSpace(result.VerificationSummary) != "" {
j.Message = fmt.Sprintf("%s Move the card into %s and power-cycle it.", strings.TrimRight(result.VerificationSummary, "."), j.Node)
return
}
j.Message = fmt.Sprintf("Flash verified on %s. Move the card into %s and power-cycle it.", j.Host, j.Node)
})
}
func (a *App) flashArtifact(jobID, artifactRef string) (RemoteFlashResult, error) {
job := a.job(jobID)
if job == nil {
return RemoteFlashResult{}, fmt.Errorf("job %s no longer exists", jobID)
}
nodes := clusterNodes()
nodeMap := map[string]clusterNode{}
for _, node := range nodes {
nodeMap[node.Name] = node
}
target, ok := nodeMap[job.Host]
if !ok {
return RemoteFlashResult{}, fmt.Errorf("flash host %s is not a current cluster node", job.Host)
}
image := a.podImageForArch(target.Arch)
if image == "" {
return RemoteFlashResult{}, fmt.Errorf("no runner image configured for arch %s", target.Arch)
}
a.setJob(jobID, func(j *Job) {
j.Stage = "flash_pull"
j.StageStartedAt = time.Now().UTC()
j.Message = fmt.Sprintf("Pulling %s and preparing it for %s", artifactRef+":latest", prettyDeviceTarget(j.Device))
j.ProgressPct = 84
})
podName := fmt.Sprintf("metis-flash-%d", time.Now().UTC().UnixNano())
logs, err := a.runRemotePod(jobID, podName, a.remoteFlashPodSpec(podName, target.Name, image, job.Node, job.Device, artifactRef))
if err != nil {
return RemoteFlashResult{}, err
}
var result RemoteFlashResult
if err := json.Unmarshal([]byte(strings.TrimSpace(logs)), &result); err != nil {
return RemoteFlashResult{}, fmt.Errorf("decode remote flash output: %w: %s", err, strings.TrimSpace(logs))
}
if !result.Verified {
return RemoteFlashResult{}, fmt.Errorf("flash verification did not succeed for %s on %s", prettyDeviceTarget(job.Device), job.Host)
}
a.setJob(jobID, func(j *Job) {
if result.SizeBytes > 0 {
j.Written = result.SizeBytes
j.Total = result.SizeBytes
}
if strings.TrimSpace(result.VerificationSummary) != "" {
j.Message = result.VerificationSummary
return
}
if strings.TrimSpace(result.DestPath) != "" {
j.Message = fmt.Sprintf("Verified the latest image write at %s", result.DestPath)
}
})
return result, nil
}
func (a *App) heartbeatRemoteJob(jobID string) {
if strings.TrimSpace(jobID) == "" {
return
}
a.setJob(jobID, func(j *Job) {
if j == nil || j.Status != JobRunning {
return
}
stageStart := j.StageStartedAt
if stageStart.IsZero() {
stageStart = j.StartedAt
}
elapsed := time.Since(stageStart)
switch {
case j.Stage == "build":
progress, message := buildStageHeartbeat(j.Node, j.Builder, elapsed)
if progress > j.ProgressPct {
j.ProgressPct = progress
}
if strings.TrimSpace(message) != "" {
j.Message = message
}
case j.Stage == "preflight":
if j.ProgressPct < 80 {
j.ProgressPct = 80
}
j.Message = fmt.Sprintf("Validating %s on %s and resolving the latest Harbor artifact", prettyDeviceTarget(j.Device), j.Host)
case isFlashStage(j.Stage):
if j.Stage == "flash_write" && j.Total > 0 && j.Written > 0 {
actual := 92 + (float64(j.Written)/float64(j.Total))*6
if actual > 98 {
actual = 98
}
if actual > j.ProgressPct {
j.ProgressPct = actual
}
j.Message = fmt.Sprintf("Writing %s of %s on %s", humanBytes(j.Written), humanBytes(j.Total), j.Host)
return
}
progress, message := flashStagePhaseHeartbeat(j.Stage, j.Host, j.Artifact, elapsed)
if progress > j.ProgressPct {
j.ProgressPct = progress
}
if strings.TrimSpace(message) != "" {
j.Message = message
}
}
})
}