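// metis/pkg/service/remote.go
//
// 640 lines, 20 KiB, Go
// (The "Raw", "Normal View" and "History" labels that followed here appear to
// be web-view page controls rather than source text.)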

package service
import (
"encoding/json"
"fmt"
"math"
"path/filepath"
"sort"
"strings"
"time"
"metis/pkg/inventory"
)
// Constants shared by the remote pod helpers in this file.
const (
// hostTmpDevicePath is a pseudo-device path using the hosttmp:// scheme. Code in
// this file treats targets carrying that prefix as a test write into the host's
// /tmp rather than a real block device.
hostTmpDevicePath = "hosttmp:///tmp"
// vaultRoleMaintenance is the value used for the vault.hashicorp.com/role annotation.
vaultRoleMaintenance = "maintenance"
// vaultRuntimeSecretPath is the secret path injected as metis-runtime-env.sh.
vaultRuntimeSecretPath = "kv/data/atlas/maintenance/metis-runtime"
// vaultHarborSecretPath is the secret path injected as metis-harbor-env.sh.
vaultHarborSecretPath = "kv/data/atlas/harbor/harbor-core"
// vaultSSHKeysSecretPath is the secret path injected as metis-ssh-env.sh when
// SSH keys are requested.
vaultSSHKeysSecretPath = "kv/data/atlas/maintenance/metis-ssh-keys"
)
// ListDevices returns the device list for host by delegating to
// a.cachedDevices; the result and error are passed through unchanged.
func (a *App) ListDevices(host string) ([]Device, error) {
	devices, err := a.cachedDevices(host)
	return devices, err
}
// RefreshDevices probes host for flash candidates by running a one-shot
// "metis-devices-<nanos>" pod on it and decoding the pod's log output as JSON
// of the form {"devices": [...]}.
//
// An empty host falls back to a.settings.DefaultFlashHost. The outcome of
// every call, success or failure, is stored through a.recordDevices before
// returning. On success the devices are ordered by descending deviceScore,
// then ascending SizeBytes, then ascending Path.
func (a *App) RefreshDevices(host string) ([]Device, error) {
	if host == "" {
		host = a.settings.DefaultFlashHost
	}

	// fail records the error against the host and hands it back to the caller.
	fail := func(cause error) ([]Device, error) {
		a.recordDevices(host, nil, cause)
		return nil, cause
	}

	// Find the cluster node with the requested name. No early exit: if the
	// name appears more than once, the last entry wins.
	var (
		target clusterNode
		known  bool
	)
	for _, candidate := range clusterNodes() {
		if candidate.Name == host {
			target, known = candidate, true
		}
	}
	if !known {
		return fail(fmt.Errorf("flash host %s is not a current cluster node", host))
	}

	image := a.podImageForArch(target.Arch)
	if image == "" {
		return fail(fmt.Errorf("no runner image configured for arch %s", target.Arch))
	}

	podName := fmt.Sprintf("metis-devices-%d", time.Now().UTC().UnixNano())
	logs, err := a.runRemotePod("", podName, a.remoteDevicePodSpec(podName, host, image))
	if err != nil {
		return fail(err)
	}

	var payload struct {
		Devices []Device `json:"devices"`
	}
	output := strings.TrimSpace(logs)
	if err := json.Unmarshal([]byte(output), &payload); err != nil {
		// Include the raw output so an unexpected pod log is visible in the error.
		return fail(fmt.Errorf("decode remote devices: %w: %s", err, output))
	}

	found := payload.Devices
	sort.Slice(found, func(i, j int) bool {
		lhs, rhs := found[i], found[j]
		if ls, rs := deviceScore(lhs), deviceScore(rhs); ls != rs {
			return ls > rs
		}
		if lhs.SizeBytes != rhs.SizeBytes {
			return lhs.SizeBytes < rhs.SizeBytes
		}
		return lhs.Path < rhs.Path
	})

	a.recordDevices(host, found, nil)
	return found, nil
}
// runBuild executes a build job for job.Node and, when flash is true, goes on
// to write the published image to job.Device on job.Host.
//
// Build phase: look up the node's class in the inventory, ensure the Harbor
// project exists, choose a builder host for the class architecture, run a
// one-shot "metis-build-<nanos>" pod on it and decode the pod's log output as
// an ArtifactSummary, record the artifact, then prune older Harbor artifacts.
// A prune failure is only reported as an "artifact.prune.warning" event.
//
// Flash phase (flash == true): validate the device through ensureDevice,
// delete the Kubernetes node object unless the target is a hosttmp:// test
// device (a delete failure is only a "node.delete.warning" event), then call
// flashArtifact.
//
// Failures mark the job failed and record an "error" build or flash metric;
// nothing is returned to the caller.
func (a *App) runBuild(job *Job, flash bool) {
	// Build-phase failures: fail the job and count a build error.
	failBuild := func(err error) {
		a.failJob(job.ID, err)
		a.metrics.RecordBuild(job.Node, "error")
	}
	// Flash-phase failures: fail the job and count a flash error.
	failFlash := func(err error) {
		a.failJob(job.ID, err)
		a.metrics.RecordFlash(job.Node, job.Host, "error")
	}

	// Only the node class is used below (for its architecture); the node spec
	// returned alongside it is intentionally discarded.
	_, class, err := a.inventory.FindNode(job.Node)
	if err != nil {
		failBuild(err)
		return
	}
	if err := a.ensureHarborProject(); err != nil {
		failBuild(err)
		return
	}
	builder, err := a.selectBuilderHost(class.Arch, job.Host)
	if err != nil {
		failBuild(err)
		return
	}
	job.Builder = builder.Name
	buildTag := time.Now().UTC().Format("20060102t150405z")
	artifactRef := a.artifactRepo(job.Node)
	// Jobs, summaries and events all refer to the moving ":latest" tag.
	latestRef := artifactRef + ":latest"

	a.setJob(job.ID, func(j *Job) {
		j.Status = JobRunning
		j.Stage = "build"
		j.StageStartedAt = time.Now().UTC()
		j.Message = fmt.Sprintf("Building on %s (%s) and publishing to Harbor", builder.Name, builder.Arch)
		j.ProgressPct = 8
		j.Artifact = latestRef
		j.Builder = builder.Name
	})

	buildImage := a.podImageForArch(builder.Arch)
	if buildImage == "" {
		failBuild(fmt.Errorf("no runner image configured for arch %s", builder.Arch))
		return
	}
	buildPod := fmt.Sprintf("metis-build-%d", time.Now().UTC().UnixNano())
	logs, err := a.runRemotePod(job.ID, buildPod, a.remoteBuildPodSpec(buildPod, builder.Name, buildImage, job.Node, artifactRef, buildTag))
	if err != nil {
		failBuild(err)
		return
	}

	var summary ArtifactSummary
	if err := json.Unmarshal([]byte(strings.TrimSpace(logs)), &summary); err != nil {
		// Include the raw output so an unexpected pod log is visible in the error.
		failBuild(fmt.Errorf("decode remote build output: %w: %s", err, strings.TrimSpace(logs)))
		return
	}
	// These fields are always overwritten with locally known values.
	summary.Node = job.Node
	summary.Ref = latestRef
	summary.BuilderHost = builder.Name
	if err := a.recordArtifact(summary); err != nil {
		failBuild(err)
		return
	}

	// Cleanup is best-effort: a failure is surfaced as a warning event only.
	// NOTE(review): the literal 3 is presumably a retention count — confirm
	// against pruneHarborArtifacts.
	if err := a.pruneHarborArtifacts(job.Node, 3); err != nil {
		a.appendEvent(Event{
			Time:    time.Now().UTC(),
			Kind:    "artifact.prune.warning",
			Summary: fmt.Sprintf("Harbor cleanup warning for %s", job.Node),
			Details: map[string]any{"node": job.Node, "error": err.Error()},
		})
	}
	a.metrics.RecordBuild(job.Node, "ok")
	a.appendEvent(Event{
		Time:    time.Now().UTC(),
		Kind:    "image.build",
		Summary: fmt.Sprintf("Built replacement image for %s on %s", job.Node, builder.Name),
		Details: map[string]any{"node": job.Node, "artifact": latestRef, "builder": builder.Name},
	})

	if !flash {
		a.completeJob(job.ID, func(j *Job) {
			j.Stage = "complete"
			j.Message = "Image build complete"
			j.ProgressPct = 100
			j.Artifact = latestRef
		})
		return
	}

	a.setJob(job.ID, func(j *Job) {
		j.Stage = "preflight"
		j.StageStartedAt = time.Now().UTC()
		j.Message = fmt.Sprintf("Validating %s and preparing the latest Harbor artifact for %s", prettyDeviceTarget(j.Device), j.Host)
		j.ProgressPct = 78
		j.Artifact = latestRef
	})
	if _, err := a.ensureDevice(job.Host, job.Device); err != nil {
		failFlash(err)
		return
	}

	// hosttmp:// targets are test writes into the host's /tmp, so the node
	// object is left alone for them. For real devices a failed delete is
	// reported as a warning and the flash still proceeds.
	if !strings.HasPrefix(job.Device, "hosttmp://") {
		if err := deleteNodeObject(job.Node); err != nil {
			a.appendEvent(Event{
				Time:    time.Now().UTC(),
				Kind:    "node.delete.warning",
				Summary: fmt.Sprintf("Could not delete stale Kubernetes node object for %s", job.Node),
				Details: map[string]any{"node": job.Node, "error": err.Error()},
			})
		}
	}

	if err := a.flashArtifact(job.ID, artifactRef); err != nil {
		failFlash(err)
		return
	}
	a.metrics.RecordFlash(job.Node, job.Host, "ok")
	a.appendEvent(Event{
		Time:    time.Now().UTC(),
		Kind:    "image.flash",
		Summary: fmt.Sprintf("Flashed %s latest image on %s", job.Node, job.Host),
		Details: map[string]any{"node": job.Node, "device": job.Device, "host": job.Host, "artifact": latestRef},
	})
	a.completeJob(job.ID, func(j *Job) {
		j.Stage = "complete"
		if strings.HasPrefix(j.Device, "hosttmp://") {
			j.Message = fmt.Sprintf("Test flash complete on %s host /tmp.", j.Host)
		} else {
			j.Message = fmt.Sprintf("Flash complete on %s. Move the card into %s and power-cycle it.", j.Host, j.Node)
		}
		j.ProgressPct = 100
		j.Artifact = latestRef
	})
}
// flashArtifact runs a one-shot "metis-flash-<nanos>" pod on the job's flash
// host to pull artifactRef and write it to the job's device.
//
// The job is looked up once so that the host used to pick the target node, the
// host named in the error message, and the node/device passed to the pod all
// come from the same snapshot.
//
// It returns an error when the job's host is not a current cluster node, when
// no runner image is configured for the host's architecture, or when the
// remote pod fails. If the pod's log output decodes as a JSON object with a
// non-empty string "dest_path", the job message is updated with it; any other
// output is ignored.
func (a *App) flashArtifact(jobID, artifactRef string) error {
	nodeMap := map[string]clusterNode{}
	for _, node := range clusterNodes() {
		nodeMap[node.Name] = node
	}

	job := a.job(jobID)
	target, ok := nodeMap[job.Host]
	if !ok {
		return fmt.Errorf("flash host %s is not a current cluster node", job.Host)
	}
	image := a.podImageForArch(target.Arch)
	if image == "" {
		return fmt.Errorf("no runner image configured for arch %s", target.Arch)
	}

	a.setJob(jobID, func(j *Job) {
		j.Stage = "flash"
		j.StageStartedAt = time.Now().UTC()
		j.Message = fmt.Sprintf("Pulling %s and writing it on %s", artifactRef+":latest", j.Host)
		j.ProgressPct = 84
	})

	podName := fmt.Sprintf("metis-flash-%d", time.Now().UTC().UnixNano())
	logs, err := a.runRemotePod(jobID, podName, a.remoteFlashPodSpec(podName, target.Name, image, job.Node, job.Device, artifactRef))
	if err != nil {
		return err
	}

	// Best-effort: the pod already succeeded, so output that is not the
	// expected JSON object only means the message is left as it was.
	var payload map[string]any
	if err := json.Unmarshal([]byte(strings.TrimSpace(logs)), &payload); err == nil {
		a.setJob(jobID, func(j *Job) {
			if dest, ok := payload["dest_path"].(string); ok && dest != "" {
				j.Message = fmt.Sprintf("Wrote latest artifact to %s", dest)
			}
		})
	}
	return nil
}
// heartbeatRemoteJob refreshes the progress percentage and message of a
// running job from the time elapsed in its current stage.
//
// A blank jobID is ignored, as is a nil job or one whose status is not
// JobRunning. Elapsed time is measured from StageStartedAt, or from StartedAt
// when the stage start is unset. Progress is only ever raised, never lowered.
// Stages other than "build", "preflight" and "flash" are left untouched.
func (a *App) heartbeatRemoteJob(jobID string) {
	if strings.TrimSpace(jobID) == "" {
		return
	}

	// apply raises the job's progress if pct is higher and replaces the
	// message when msg is not blank.
	apply := func(j *Job, pct float64, msg string) {
		if pct > j.ProgressPct {
			j.ProgressPct = pct
		}
		if strings.TrimSpace(msg) != "" {
			j.Message = msg
		}
	}

	a.setJob(jobID, func(j *Job) {
		if j == nil || j.Status != JobRunning {
			return
		}

		since := j.StageStartedAt
		if since.IsZero() {
			since = j.StartedAt
		}
		elapsed := time.Since(since)

		if j.Stage == "build" {
			pct, msg := buildStageHeartbeat(j.Node, j.Builder, elapsed)
			apply(j, pct, msg)
		} else if j.Stage == "preflight" {
			if j.ProgressPct < 80 {
				j.ProgressPct = 80
			}
			j.Message = fmt.Sprintf("Validating %s on %s and resolving the latest Harbor artifact", prettyDeviceTarget(j.Device), j.Host)
		} else if j.Stage == "flash" {
			pct, msg := flashStageHeartbeat(j.Host, j.Artifact, elapsed)
			apply(j, pct, msg)
		}
	})
}
// buildStageHeartbeat maps the time spent in the build stage to an estimated
// progress percentage and a status message for node being built on builder.
//
// Progress is interpolated with ramp inside fixed time windows:
//
//	  0s –  20s:  8 → 14
//	 20s – 120s: 14 → 30
//	120s – 360s: 30 → 58
//	360s – 540s: 58 → 70
//	540s onward: 70 → 76, reached at 900s and capped there
func buildStageHeartbeat(node, builder string, elapsed time.Duration) (float64, string) {
	secs := elapsed.Seconds()

	if secs < 20 {
		return ramp(secs, 0, 20, 8, 14), fmt.Sprintf("Scheduling a remote builder on %s for %s", builder, node)
	}
	if secs < 120 {
		return ramp(secs, 20, 120, 14, 30), fmt.Sprintf("Injecting %s recovery config into the base image on %s", node, builder)
	}
	if secs < 360 {
		return ramp(secs, 120, 360, 30, 58), fmt.Sprintf("Building the replacement image filesystem for %s on %s", node, builder)
	}
	if secs < 540 {
		return ramp(secs, 360, 540, 58, 70), fmt.Sprintf("Compressing the replacement image for %s before upload", node)
	}

	pct := math.Min(76, ramp(secs, 540, 900, 70, 76))
	return pct, fmt.Sprintf("Publishing %s to Harbor and refreshing the latest tag", node)
}
// flashStageHeartbeat maps the time spent in the flash stage to an estimated
// progress percentage and a status message for artifact being written on host.
//
// Progress is interpolated with ramp inside fixed time windows:
//
//	 0s – 10s: 84 → 88
//	10s – 45s: 88 → 96
//	45s onward: 96 → 98, reached at 120s and capped there
func flashStageHeartbeat(host, artifact string, elapsed time.Duration) (float64, string) {
	secs := elapsed.Seconds()

	if secs < 10 {
		return ramp(secs, 0, 10, 84, 88), fmt.Sprintf("Pulling %s from Harbor on %s", artifact, host)
	}
	if secs < 45 {
		return ramp(secs, 10, 45, 88, 96), fmt.Sprintf("Writing the latest image to the selected target on %s", host)
	}

	pct := math.Min(98, ramp(secs, 45, 120, 96, 98))
	return pct, fmt.Sprintf("Flushing buffers and finishing the write on %s", host)
}
// prettyDeviceTarget turns a device path into text suitable for a status
// message: any hosttmp:// path is shown as "/tmp", a blank path as
// "the selected target", and everything else is returned unchanged.
func prettyDeviceTarget(path string) string {
	if strings.HasPrefix(path, "hosttmp://") {
		return "/tmp"
	}
	if strings.TrimSpace(path) == "" {
		return "the selected target"
	}
	return path
}
// ramp linearly interpolates value from the interval [start, end] onto
// [min, max], clamping at both ends. A degenerate interval (end <= start)
// yields max.
func ramp(value, start, end, min, max float64) float64 {
	switch {
	case end <= start:
		return max
	case value <= start:
		return min
	case value >= end:
		return max
	}
	fraction := (value - start) / (end - start)
	return min + fraction*(max-min)
}
// ensureDevice confirms that path is among the flash candidates currently
// reported for host.
//
// A blank path is rejected without probing. Otherwise the device list is
// refreshed through RefreshDevices and searched for an exact Path match. The
// returned pointer refers to a copy of the matching entry, not to the element
// inside the refreshed slice.
func (a *App) ensureDevice(host, path string) (*Device, error) {
	if strings.TrimSpace(path) == "" {
		return nil, fmt.Errorf("select removable media before starting a flash run")
	}

	devices, err := a.RefreshDevices(host)
	if err != nil {
		return nil, err
	}

	for idx := range devices {
		if devices[idx].Path != path {
			continue
		}
		match := devices[idx]
		return &match, nil
	}
	return nil, fmt.Errorf("device %s is not a current flash candidate on %s", path, host)
}
// selectBuilderHost picks the cluster node that should run an image build for
// the given architecture.
//
// Nodes with a different architecture, unschedulable nodes and control-plane
// nodes are never considered. The rest are scored:
//
//	+40  worker node
//	+30  arm64: hardware is "rpi5"
//	-50  arm64: the inventory lists Longhorn disks for the node
//	+30  amd64: node is a.settings.DefaultFlashHost
//	-10  amd64: node is "titan-24"
//	 +5  node is the requested flashHost
//
// The highest score wins, with ties broken by ascending node name. An error is
// returned when no node qualifies.
func (a *App) selectBuilderHost(arch, flashHost string) (clusterNode, error) {
	nodes := clusterNodes()

	// Names of inventory nodes that carry Longhorn disks.
	onStorage := map[string]bool{}
	for _, inv := range a.inventory.Nodes {
		if len(inv.LonghornDisks) > 0 {
			onStorage[inv.Name] = true
		}
	}

	// rate computes the preference score for one eligible node.
	rate := func(node clusterNode) int {
		points := 0
		if node.Worker {
			points += 40
		}
		if arch == "arm64" {
			if node.Hardware == "rpi5" {
				points += 30
			}
			if onStorage[node.Name] {
				points -= 50
			}
		} else if arch == "amd64" {
			if node.Name == a.settings.DefaultFlashHost {
				points += 30
			}
			if node.Name == "titan-24" {
				points -= 10
			}
		}
		if flashHost != "" && node.Name == flashHost {
			points += 5
		}
		return points
	}

	type ranked struct {
		node   clusterNode
		points int
	}
	var pool []ranked
	for _, node := range nodes {
		eligible := node.Arch == arch && !node.Unschedulable && !node.ControlPlane
		if !eligible {
			continue
		}
		pool = append(pool, ranked{node: node, points: rate(node)})
	}
	if len(pool) == 0 {
		return clusterNode{}, fmt.Errorf("no build host available for arch %s", arch)
	}

	sort.Slice(pool, func(i, j int) bool {
		if pool[i].points == pool[j].points {
			return pool[i].node.Name < pool[j].node.Name
		}
		return pool[i].points > pool[j].points
	})
	return pool[0].node, nil
}
func (a *App) remoteDevicePodSpec(name, host, image string) map[string]any {
return map[string]any{
"apiVersion": "v1",
"kind": "Pod",
"metadata": map[string]any{
"name": name,
"namespace": a.settings.Namespace,
"labels": map[string]string{"app": "metis-remote", "metis-run": "devices"},
},
"spec": map[string]any{
"restartPolicy": "Never",
"serviceAccountName": "metis",
"nodeSelector": map[string]string{
"kubernetes.io/hostname": host,
},
"containers": []map[string]any{
{
"name": "remote-devices",
"image": image,
"imagePullPolicy": "Always",
"command": []string{
"metis", "remote-devices",
"--max-device-bytes", fmt.Sprintf("%d", a.settings.MaxDeviceBytes),
2026-03-31 21:46:59 -03:00
"--host-tmp-dir", mountedHostTmpDir(a.settings.HostTmpDir),
},
"securityContext": map[string]any{"privileged": true, "runAsUser": 0},
"volumeMounts": []map[string]any{
{"name": "host-dev", "mountPath": "/dev"},
{"name": "host-sys", "mountPath": "/sys", "readOnly": true},
{"name": "host-udev", "mountPath": "/run/udev", "readOnly": true},
{"name": "host-tmp", "mountPath": "/host-tmp"},
},
},
},
"imagePullSecrets": []map[string]string{{"name": "harbor-regcred"}},
"volumes": []map[string]any{
{"name": "host-dev", "hostPath": map[string]any{"path": "/dev"}},
{"name": "host-sys", "hostPath": map[string]any{"path": "/sys"}},
{"name": "host-udev", "hostPath": map[string]any{"path": "/run/udev"}},
{"name": "host-tmp", "hostPath": map[string]any{"path": "/tmp"}},
},
},
}
}
// remoteBuildPodSpec builds the manifest for the one-shot pod that runs
// "metis remote-build" for node on the builder host.
//
// The pod is pinned to host through the kubernetes.io/hostname node selector
// and carries the Vault annotations including the SSH-key template. Its
// container runs the remoteWorkerEntrypoint script under /bin/sh, takes its
// environment from the "metis" ConfigMap, and works in an emptyDir mounted at
// /workspace.
func (a *App) remoteBuildPodSpec(name, host, image, node, artifactRef, buildTag string) map[string]any {
	metadata := map[string]any{
		"name":        name,
		"namespace":   a.settings.Namespace,
		"labels":      map[string]string{"app": "metis-remote", "metis-run": "build"},
		"annotations": vaultRuntimeAnnotations(true),
	}

	script := remoteWorkerEntrypoint(
		true,
		"remote-build",
		"--inventory", a.settings.InventoryPath,
		"--node", node,
		"--cache", "/workspace/cache",
		"--work-dir", "/workspace/build",
		"--artifact-ref", artifactRef,
		"--build-tag", buildTag,
		"--harbor-registry", a.settings.HarborRegistry,
	)

	container := map[string]any{
		"name":            "remote-build",
		"image":           image,
		"imagePullPolicy": "Always",
		"command":         []string{"/bin/sh", "-c"},
		"args":            []string{script},
		"envFrom": []map[string]any{
			{"configMapRef": map[string]any{"name": "metis"}},
		},
		"volumeMounts": []map[string]any{
			{"name": "workspace", "mountPath": "/workspace"},
		},
	}

	podSpec := map[string]any{
		"restartPolicy":      "Never",
		"serviceAccountName": "metis",
		"nodeSelector": map[string]string{
			"kubernetes.io/hostname": host,
		},
		"containers":       []map[string]any{container},
		"imagePullSecrets": []map[string]string{{"name": "harbor-regcred"}},
		"volumes": []map[string]any{
			{"name": "workspace", "emptyDir": map[string]any{}},
		},
	}

	return map[string]any{
		"apiVersion": "v1",
		"kind":       "Pod",
		"metadata":   metadata,
		"spec":       podSpec,
	}
}
// remoteFlashPodSpec builds the manifest for the one-shot pod that runs
// "metis remote-flash" on host to write artifactRef for node onto device.
//
// The pod is pinned to host through the kubernetes.io/hostname node selector
// and carries the Vault annotations without the SSH-key template. Its
// container runs the remoteWorkerEntrypoint script under /bin/sh, privileged
// as UID 0, takes its environment from the "metis" ConfigMap, and mounts an
// emptyDir at /workspace plus the host's /dev, /sys (read-only), /run/udev
// (read-only) and /tmp (at /host-tmp).
func (a *App) remoteFlashPodSpec(name, host, image, node, device, artifactRef string) map[string]any {
	metadata := map[string]any{
		"name":        name,
		"namespace":   a.settings.Namespace,
		"labels":      map[string]string{"app": "metis-remote", "metis-run": "flash"},
		"annotations": vaultRuntimeAnnotations(false),
	}

	script := remoteWorkerEntrypoint(
		false,
		"remote-flash",
		"--node", node,
		"--device", device,
		"--artifact-ref", artifactRef,
		"--work-dir", "/workspace/flash",
		"--harbor-registry", a.settings.HarborRegistry,
		"--host-tmp-dir", mountedHostTmpDir(a.settings.HostTmpDir),
	)

	mounts := []map[string]any{
		{"name": "workspace", "mountPath": "/workspace"},
		{"name": "host-dev", "mountPath": "/dev"},
		{"name": "host-sys", "mountPath": "/sys", "readOnly": true},
		{"name": "host-udev", "mountPath": "/run/udev", "readOnly": true},
		{"name": "host-tmp", "mountPath": "/host-tmp"},
	}
	volumes := []map[string]any{
		{"name": "workspace", "emptyDir": map[string]any{}},
		{"name": "host-dev", "hostPath": map[string]any{"path": "/dev"}},
		{"name": "host-sys", "hostPath": map[string]any{"path": "/sys"}},
		{"name": "host-udev", "hostPath": map[string]any{"path": "/run/udev"}},
		{"name": "host-tmp", "hostPath": map[string]any{"path": "/tmp"}},
	}

	container := map[string]any{
		"name":            "remote-flash",
		"image":           image,
		"imagePullPolicy": "Always",
		"command":         []string{"/bin/sh", "-c"},
		"args":            []string{script},
		"securityContext": map[string]any{"privileged": true, "runAsUser": 0},
		"envFrom": []map[string]any{
			{"configMapRef": map[string]any{"name": "metis"}},
		},
		"volumeMounts": mounts,
	}

	podSpec := map[string]any{
		"restartPolicy":      "Never",
		"serviceAccountName": "metis",
		"nodeSelector": map[string]string{
			"kubernetes.io/hostname": host,
		},
		"containers":       []map[string]any{container},
		"imagePullSecrets": []map[string]string{{"name": "harbor-regcred"}},
		"volumes":          volumes,
	}

	return map[string]any{
		"apiVersion": "v1",
		"kind":       "Pod",
		"metadata":   metadata,
		"spec":       podSpec,
	}
}
// remoteArtifactNote returns the artifact reference to show for node: the Ref
// of the node's entry in a.artifacts() when one exists and is not blank,
// otherwise the node's artifact repository with ":latest" appended.
func (a *App) remoteArtifactNote(node string) string {
	summary, found := a.artifacts()[node]
	if !found || strings.TrimSpace(summary.Ref) == "" {
		return a.artifactRepo(node) + ":latest"
	}
	return summary.Ref
}
// inventoryNodeArch returns the whitespace-trimmed architecture of class, or
// "arm64" when class is nil or its Arch is blank. The spec argument is not
// consulted.
func inventoryNodeArch(spec *inventory.NodeSpec, class *inventory.NodeClass) string {
	if class == nil {
		return "arm64"
	}
	if arch := strings.TrimSpace(class.Arch); arch != "" {
		return arch
	}
	return "arm64"
}
// mountedHostTmpDir translates a host tmp directory into the path under
// /host-tmp that the pod specs in this file pass as --host-tmp-dir.
//
// The input is whitespace-trimmed first. An empty path or exactly "/tmp" maps
// to "/host-tmp"; a path below "/tmp/" maps to the same relative location below
// "/host-tmp"; any other path is re-rooted below "/host-tmp" with its leading
// slash removed. The result is cleaned by filepath.Join.
func mountedHostTmpDir(path string) string {
	path = strings.TrimSpace(path)
	switch {
	case path == "", path == "/tmp":
		return "/host-tmp"
	case strings.HasPrefix(path, "/tmp/"):
		return filepath.Join("/host-tmp", strings.TrimPrefix(path, "/tmp/"))
	default:
		return filepath.Join("/host-tmp", strings.TrimPrefix(path, "/"))
	}
}
// vaultRuntimeAnnotations returns the Vault agent-injector annotations applied
// to remote worker pods.
//
// The result always enables pre-populate-only injection under the maintenance
// role and renders two files: metis-runtime-env.sh (exports METIS_K3S_TOKEN)
// and metis-harbor-env.sh (exports METIS_HARBOR_PASSWORD). When includeSSHKeys
// is true a third file, metis-ssh-env.sh, exports the four METIS_SSH_KEY_*
// variables.
//
// Each template reads from the same secret-path constant as its matching
// agent-inject-secret annotation, so the two cannot drift apart.
func vaultRuntimeAnnotations(includeSSHKeys bool) map[string]string {
	// envTemplate renders a Vault agent template that opens secretPath and
	// emits the given shell export lines, one per line.
	envTemplate := func(secretPath string, exports ...string) string {
		lines := make([]string, 0, len(exports)+2)
		lines = append(lines, `{{ with secret "`+secretPath+`" }}`)
		lines = append(lines, exports...)
		lines = append(lines, `{{ end }}`)
		return strings.Join(lines, "\n")
	}

	annotations := map[string]string{
		"vault.hashicorp.com/agent-inject":                             "true",
		"vault.hashicorp.com/agent-pre-populate-only":                  "true",
		"vault.hashicorp.com/role":                                     vaultRoleMaintenance,
		"vault.hashicorp.com/agent-inject-secret-metis-runtime-env.sh": vaultRuntimeSecretPath,
		"vault.hashicorp.com/agent-inject-template-metis-runtime-env.sh": envTemplate(
			vaultRuntimeSecretPath,
			`export METIS_K3S_TOKEN="{{ .Data.data.k3s_token }}"`,
		),
		"vault.hashicorp.com/agent-inject-secret-metis-harbor-env.sh": vaultHarborSecretPath,
		"vault.hashicorp.com/agent-inject-template-metis-harbor-env.sh": envTemplate(
			vaultHarborSecretPath,
			`export METIS_HARBOR_PASSWORD="{{ .Data.data.harbor_admin_password }}"`,
		),
	}
	if includeSSHKeys {
		annotations["vault.hashicorp.com/agent-inject-secret-metis-ssh-env.sh"] = vaultSSHKeysSecretPath
		annotations["vault.hashicorp.com/agent-inject-template-metis-ssh-env.sh"] = envTemplate(
			vaultSSHKeysSecretPath,
			`export METIS_SSH_KEY_BASTION="{{ .Data.data.bastion_pub }}"`,
			`export METIS_SSH_KEY_BRAD="{{ .Data.data.brad_pub }}"`,
			`export METIS_SSH_KEY_HECATE_TETHYS="{{ .Data.data.hecate_tethys_pub }}"`,
			`export METIS_SSH_KEY_HECATE_DB="{{ .Data.data.hecate_db_pub }}"`,
		)
	}
	return annotations
}
// remoteWorkerEntrypoint renders the shell script used as the remote worker
// container's argument.
//
// The script enables "set -e", sources the runtime and Harbor env files from
// /vault/secrets (plus the SSH env file when includeSSHKeys is true), and then
// execs "metis" with args, shell-quoted through shellJoin. Lines are separated
// by "\n" with no trailing newline.
func remoteWorkerEntrypoint(includeSSHKeys bool, args ...string) string {
	argv := make([]string, 0, len(args)+1)
	argv = append(argv, "metis")
	argv = append(argv, args...)

	var script strings.Builder
	script.WriteString("set -e")
	script.WriteString("\n")
	script.WriteString(". /vault/secrets/metis-runtime-env.sh")
	script.WriteString("\n")
	script.WriteString(". /vault/secrets/metis-harbor-env.sh")
	script.WriteString("\n")
	if includeSSHKeys {
		script.WriteString(". /vault/secrets/metis-ssh-env.sh")
		script.WriteString("\n")
	}
	script.WriteString("exec " + shellJoin(argv...))
	return script.String()
}
// shellJoin quotes every argument with shellQuote and joins the results with
// single spaces. No arguments yields the empty string.
func shellJoin(args ...string) string {
	quoted := make([]string, len(args))
	for i, arg := range args {
		quoted[i] = shellQuote(arg)
	}
	return strings.Join(quoted, " ")
}
// shellQuote wraps value in single quotes for use as one POSIX shell word.
// Each embedded single quote is replaced by the sequence '"'"' (close the
// quote, emit a double-quoted single quote, reopen). The empty string becomes
// ''.
func shellQuote(value string) string {
	if len(value) == 0 {
		return "''"
	}
	pieces := strings.Split(value, "'")
	return "'" + strings.Join(pieces, `'"'"'`) + "'"
}