metis/pkg/service/remote.go

456 lines
14 KiB
Go
Raw Normal View History

package service
import (
"encoding/json"
"fmt"
"path/filepath"
"sort"
"strings"
"time"
"metis/pkg/inventory"
)
// hostTmpDevicePath is the sentinel "device" value for test flashes that
// write into the flash host's /tmp instead of real removable media
// (presumably matched by the hosttmp:// prefix checks in runBuild — confirm).
const hostTmpDevicePath = "hosttmp:///tmp"
// ListDevices runs a short-lived privileged pod on the given cluster host and
// returns the flash-candidate devices it reports, best candidates first.
// An empty host falls back to the configured default flash host.
func (a *App) ListDevices(host string) ([]Device, error) {
	if host == "" {
		host = a.settings.DefaultFlashHost
	}
	byName := map[string]clusterNode{}
	for _, n := range clusterNodes() {
		byName[n.Name] = n
	}
	node, found := byName[host]
	if !found {
		return nil, fmt.Errorf("flash host %s is not a current cluster node", host)
	}
	runnerImage := a.podImageForArch(node.Arch)
	if runnerImage == "" {
		return nil, fmt.Errorf("no runner image configured for arch %s", node.Arch)
	}
	pod := fmt.Sprintf("metis-devices-%d", time.Now().UTC().UnixNano())
	output, err := a.runRemotePod("", pod, a.remoteDevicePodSpec(pod, host, runnerImage))
	if err != nil {
		return nil, err
	}
	var parsed struct {
		Devices []Device `json:"devices"`
	}
	trimmed := strings.TrimSpace(output)
	if err := json.Unmarshal([]byte(trimmed), &parsed); err != nil {
		return nil, fmt.Errorf("decode remote devices: %w: %s", err, trimmed)
	}
	devices := parsed.Devices
	sort.Slice(devices, func(i, j int) bool {
		// Higher score first, then smaller media, then stable path order.
		si, sj := deviceScore(devices[i]), deviceScore(devices[j])
		if si != sj {
			return si > sj
		}
		if devices[i].SizeBytes != devices[j].SizeBytes {
			return devices[i].SizeBytes < devices[j].SizeBytes
		}
		return devices[i].Path < devices[j].Path
	})
	return devices, nil
}
// runBuild executes the full build pipeline for a job: resolve the node from
// inventory, build its image on a selected builder host via a remote pod,
// publish/record the artifact in Harbor, and — when flash is true — write the
// image to the job's device on the flash host. Every failure path marks the
// job failed and records an error metric before returning.
func (a *App) runBuild(job *Job, flash bool) {
	// Resolve the target node and its class (provides the build arch).
	nodeSpec, class, err := a.inventory.FindNode(job.Node)
	if err != nil {
		a.failJob(job.ID, err)
		a.metrics.RecordBuild(job.Node, "error")
		return
	}
	if err := a.ensureHarborProject(); err != nil {
		a.failJob(job.ID, err)
		a.metrics.RecordBuild(job.Node, "error")
		return
	}
	// Pick a builder host matching the node's arch; the flash host gets a
	// small preference (see selectBuilderHost).
	builder, err := a.selectBuilderHost(class.Arch, job.Host)
	if err != nil {
		a.failJob(job.ID, err)
		a.metrics.RecordBuild(job.Node, "error")
		return
	}
	job.Builder = builder.Name
	buildTag := time.Now().UTC().Format("20060102t150405z")
	artifactRef := a.artifactRepo(job.Node)
	a.setJob(job.ID, func(j *Job) {
		j.Status = JobRunning
		j.Stage = "build"
		j.Message = fmt.Sprintf("Building on %s (%s) and publishing to Harbor", builder.Name, builder.Arch)
		j.ProgressPct = 8
		j.Artifact = artifactRef + ":latest"
		j.Builder = builder.Name
	})
	buildImage := a.podImageForArch(builder.Arch)
	if buildImage == "" {
		a.failJob(job.ID, fmt.Errorf("no runner image configured for arch %s", builder.Arch))
		a.metrics.RecordBuild(job.Node, "error")
		return
	}
	// Run the build in a one-shot pod on the builder host; its logs are
	// expected to end with a JSON ArtifactSummary.
	buildPod := fmt.Sprintf("metis-build-%d", time.Now().UTC().UnixNano())
	logs, err := a.runRemotePod(job.ID, buildPod, a.remoteBuildPodSpec(buildPod, builder.Name, buildImage, job.Node, artifactRef, buildTag))
	if err != nil {
		a.failJob(job.ID, err)
		a.metrics.RecordBuild(job.Node, "error")
		return
	}
	var summary ArtifactSummary
	if err := json.Unmarshal([]byte(strings.TrimSpace(logs)), &summary); err != nil {
		a.failJob(job.ID, fmt.Errorf("decode remote build output: %w: %s", err, strings.TrimSpace(logs)))
		a.metrics.RecordBuild(job.Node, "error")
		return
	}
	summary.Node = job.Node
	summary.Ref = artifactRef + ":latest"
	summary.BuilderHost = builder.Name
	if err := a.recordArtifact(summary); err != nil {
		a.failJob(job.ID, err)
		a.metrics.RecordBuild(job.Node, "error")
		return
	}
	// Keep only the newest 3 artifacts in Harbor; cleanup failure is only a
	// warning event, not a job failure.
	if err := a.pruneHarborArtifacts(job.Node, 3); err != nil {
		a.appendEvent(Event{
			Time: time.Now().UTC(),
			Kind: "artifact.prune.warning",
			Summary: fmt.Sprintf("Harbor cleanup warning for %s", job.Node),
			Details: map[string]any{"node": job.Node, "error": err.Error()},
		})
	}
	a.metrics.RecordBuild(job.Node, "ok")
	a.appendEvent(Event{
		Time: time.Now().UTC(),
		Kind: "image.build",
		Summary: fmt.Sprintf("Built replacement image for %s on %s", job.Node, builder.Name),
		Details: map[string]any{"node": job.Node, "artifact": artifactRef + ":latest", "builder": builder.Name},
	})
	// Build-only jobs finish here.
	if !flash {
		a.completeJob(job.ID, func(j *Job) {
			j.Stage = "complete"
			j.Message = "Image build complete"
			j.ProgressPct = 100
			j.Artifact = artifactRef + ":latest"
		})
		return
	}
	a.setJob(job.ID, func(j *Job) {
		j.Stage = "preflight"
		j.Message = fmt.Sprintf("Preparing to flash from Harbor on %s", j.Host)
		j.ProgressPct = 78
		j.Artifact = artifactRef + ":latest"
	})
	// The selected device must still be a current flash candidate on the host.
	if _, err := a.ensureDevice(job.Host, job.Device); err != nil {
		a.failJob(job.ID, err)
		a.metrics.RecordFlash(job.Node, job.Host, "error")
		return
	}
	// For real-media flashes (not hosttmp:// test runs), drop the stale
	// Kubernetes node object; failure is only a warning.
	if !strings.HasPrefix(job.Device, "hosttmp://") {
		if err := deleteNodeObject(job.Node); err != nil {
			a.appendEvent(Event{
				Time: time.Now().UTC(),
				Kind: "node.delete.warning",
				Summary: fmt.Sprintf("Could not delete stale Kubernetes node object for %s", job.Node),
				Details: map[string]any{"node": job.Node, "error": err.Error()},
			})
		}
	}
	if err := a.flashArtifact(job.ID, artifactRef); err != nil {
		a.failJob(job.ID, err)
		a.metrics.RecordFlash(job.Node, job.Host, "error")
		return
	}
	a.metrics.RecordFlash(job.Node, job.Host, "ok")
	a.appendEvent(Event{
		Time: time.Now().UTC(),
		Kind: "image.flash",
		Summary: fmt.Sprintf("Flashed %s latest image on %s", job.Node, job.Host),
		Details: map[string]any{"node": job.Node, "device": job.Device, "host": job.Host, "artifact": artifactRef + ":latest"},
	})
	a.completeJob(job.ID, func(j *Job) {
		j.Stage = "complete"
		if strings.HasPrefix(j.Device, "hosttmp://") {
			j.Message = fmt.Sprintf("Test flash complete on %s host /tmp.", j.Host)
		} else {
			j.Message = fmt.Sprintf("Flash complete on %s. Move the card into %s and power-cycle it.", j.Host, j.Node)
		}
		j.ProgressPct = 100
		j.Artifact = artifactRef + ":latest"
	})
	// nodeSpec is currently unused beyond the FindNode lookup above — kept
	// alive to satisfy the compiler; NOTE(review): consider `_, class, err :=`.
	_ = nodeSpec
}
// flashArtifact pulls the latest image for the job's node from Harbor and
// writes it to the job's target device by running a privileged pod on the
// job's flash host. It updates the job's stage/progress as it goes and
// returns an error when the host is unknown, no runner image exists for its
// arch, or the remote pod fails.
//
// The job record is read exactly once up front so host/node/device stay
// mutually consistent for the whole run (the original re-fetched it three
// times, which could observe a concurrently mutated record).
func (a *App) flashArtifact(jobID, artifactRef string) error {
	job := a.job(jobID)
	nodeMap := map[string]clusterNode{}
	for _, node := range clusterNodes() {
		nodeMap[node.Name] = node
	}
	target, ok := nodeMap[job.Host]
	if !ok {
		return fmt.Errorf("flash host %s is not a current cluster node", job.Host)
	}
	image := a.podImageForArch(target.Arch)
	if image == "" {
		return fmt.Errorf("no runner image configured for arch %s", target.Arch)
	}
	a.setJob(jobID, func(j *Job) {
		j.Stage = "flash"
		j.Message = fmt.Sprintf("Pulling %s and writing it on %s", artifactRef+":latest", j.Host)
		j.ProgressPct = 84
	})
	podName := fmt.Sprintf("metis-flash-%d", time.Now().UTC().UnixNano())
	logs, err := a.runRemotePod(jobID, podName, a.remoteFlashPodSpec(podName, target.Name, image, job.Node, job.Device, artifactRef))
	if err != nil {
		return err
	}
	// The flash pod may emit a JSON summary with the destination path; a
	// decode failure is deliberately ignored (best-effort status message).
	var payload map[string]any
	if err := json.Unmarshal([]byte(strings.TrimSpace(logs)), &payload); err == nil {
		a.setJob(jobID, func(j *Job) {
			if dest, ok := payload["dest_path"].(string); ok && dest != "" {
				j.Message = fmt.Sprintf("Wrote latest artifact to %s", dest)
			}
		})
	}
	return nil
}
// ensureDevice verifies that path names a device currently offered as a flash
// candidate on host and returns the matching Device entry.
func (a *App) ensureDevice(host, path string) (*Device, error) {
	if strings.TrimSpace(path) == "" {
		return nil, fmt.Errorf("select removable media before starting a flash run")
	}
	candidates, err := a.ListDevices(host)
	if err != nil {
		return nil, err
	}
	for i := range candidates {
		if candidates[i].Path == path {
			return &candidates[i], nil
		}
	}
	return nil, fmt.Errorf("device %s is not a current flash candidate on %s", path, host)
}
// selectBuilderHost picks the best schedulable, non-control-plane node of the
// requested arch to run an image build on, using a simple additive score;
// ties break on node name for determinism.
func (a *App) selectBuilderHost(arch, flashHost string) (clusterNode, error) {
	// Hosts carrying Longhorn disks are penalized for arm64 builds.
	storage := map[string]struct{}{}
	for _, inv := range a.inventory.Nodes {
		if len(inv.LonghornDisks) > 0 {
			storage[inv.Name] = struct{}{}
		}
	}
	type candidate struct {
		node  clusterNode
		score int
	}
	var ranked []candidate
	for _, n := range clusterNodes() {
		if n.Arch != arch || n.Unschedulable || n.ControlPlane {
			continue
		}
		s := 0
		if n.Worker {
			s += 40
		}
		switch arch {
		case "arm64":
			if n.Hardware == "rpi5" {
				s += 30
			}
			if _, hasStorage := storage[n.Name]; hasStorage {
				s -= 50
			}
		case "amd64":
			if n.Name == a.settings.DefaultFlashHost {
				s += 30
			}
			if n.Name == "titan-24" {
				s -= 10
			}
		}
		// Slight preference for building where we will also flash.
		if flashHost != "" && n.Name == flashHost {
			s += 5
		}
		ranked = append(ranked, candidate{node: n, score: s})
	}
	if len(ranked) == 0 {
		return clusterNode{}, fmt.Errorf("no build host available for arch %s", arch)
	}
	sort.Slice(ranked, func(i, j int) bool {
		if ranked[i].score != ranked[j].score {
			return ranked[i].score > ranked[j].score
		}
		return ranked[i].node.Name < ranked[j].node.Name
	})
	return ranked[0].node, nil
}
// remoteDevicePodSpec builds the Pod manifest for a one-shot device
// enumeration run pinned to the given cluster host.
func (a *App) remoteDevicePodSpec(name, host, image string) map[string]any {
	container := map[string]any{
		"name":            "remote-devices",
		"image":           image,
		"imagePullPolicy": "Always",
		"command": []string{
			"metis", "remote-devices",
			"--max-device-bytes", fmt.Sprintf("%d", a.settings.MaxDeviceBytes),
			"--host-tmp-dir", filepath.Join("/host-tmp", strings.TrimPrefix(a.settings.HostTmpDir, "/")),
		},
		// Privileged root is needed to read raw block devices and udev state.
		"securityContext": map[string]any{"privileged": true, "runAsUser": 0},
		"volumeMounts": []map[string]any{
			{"name": "host-dev", "mountPath": "/dev"},
			{"name": "host-sys", "mountPath": "/sys", "readOnly": true},
			{"name": "host-udev", "mountPath": "/run/udev", "readOnly": true},
			{"name": "host-tmp", "mountPath": "/host-tmp"},
		},
	}
	volumes := []map[string]any{
		{"name": "host-dev", "hostPath": map[string]any{"path": "/dev"}},
		{"name": "host-sys", "hostPath": map[string]any{"path": "/sys"}},
		{"name": "host-udev", "hostPath": map[string]any{"path": "/run/udev"}},
		// NOTE(review): host /tmp is mounted at /host-tmp while --host-tmp-dir
		// joins the full HostTmpDir path under /host-tmp — confirm these agree.
		{"name": "host-tmp", "hostPath": map[string]any{"path": "/tmp"}},
	}
	return map[string]any{
		"apiVersion": "v1",
		"kind":       "Pod",
		"metadata": map[string]any{
			"name":      name,
			"namespace": a.settings.Namespace,
			"labels":    map[string]string{"app": "metis-remote", "metis-run": "devices"},
		},
		"spec": map[string]any{
			"restartPolicy":      "Never",
			"serviceAccountName": "metis",
			"nodeSelector":       map[string]string{"kubernetes.io/hostname": host},
			"containers":         []map[string]any{container},
			"imagePullSecrets":   []map[string]string{{"name": "harbor-regcred"}},
			"volumes":            volumes,
		},
	}
}
// remoteBuildPodSpec builds the Pod manifest for a one-shot image build run
// pinned to the chosen builder host, publishing to Harbor as artifactRef.
func (a *App) remoteBuildPodSpec(name, host, image, node, artifactRef, buildTag string) map[string]any {
	container := map[string]any{
		"name":            "remote-build",
		"image":           image,
		"imagePullPolicy": "Always",
		"command": []string{
			"metis", "remote-build",
			"--inventory", a.settings.InventoryPath,
			"--node", node,
			"--cache", "/workspace/cache",
			"--work-dir", "/workspace/build",
			"--artifact-ref", artifactRef,
			"--build-tag", buildTag,
			"--harbor-registry", a.settings.HarborRegistry,
		},
		// Harbor credentials and app config are injected from the cluster.
		"envFrom": []map[string]any{
			{"configMapRef": map[string]any{"name": "metis"}},
			{"secretRef": map[string]any{"name": "metis-harbor"}},
		},
		"env": []map[string]any{
			{"name": "METIS_K3S_TOKEN", "valueFrom": map[string]any{"secretKeyRef": map[string]any{"name": "metis-runtime", "key": "k3s_token", "optional": true}}},
		},
		"volumeMounts": []map[string]any{
			{"name": "workspace", "mountPath": "/workspace"},
		},
	}
	return map[string]any{
		"apiVersion": "v1",
		"kind":       "Pod",
		"metadata": map[string]any{
			"name":      name,
			"namespace": a.settings.Namespace,
			"labels":    map[string]string{"app": "metis-remote", "metis-run": "build"},
		},
		"spec": map[string]any{
			"restartPolicy":      "Never",
			"serviceAccountName": "metis",
			"nodeSelector":       map[string]string{"kubernetes.io/hostname": host},
			"containers":         []map[string]any{container},
			"imagePullSecrets":   []map[string]string{{"name": "harbor-regcred"}},
			"volumes": []map[string]any{
				{"name": "workspace", "emptyDir": map[string]any{}},
			},
		},
	}
}
// remoteFlashPodSpec builds the Pod manifest for a one-shot flash run pinned
// to the flash host, writing artifactRef's image to the given device.
func (a *App) remoteFlashPodSpec(name, host, image, node, device, artifactRef string) map[string]any {
	container := map[string]any{
		"name":            "remote-flash",
		"image":           image,
		"imagePullPolicy": "Always",
		"command": []string{
			"metis", "remote-flash",
			"--node", node,
			"--device", device,
			"--artifact-ref", artifactRef,
			"--work-dir", "/workspace/flash",
			"--harbor-registry", a.settings.HarborRegistry,
			"--host-tmp-dir", filepath.Join("/host-tmp", strings.TrimPrefix(a.settings.HostTmpDir, "/")),
		},
		// Privileged root is needed to write raw block devices.
		"securityContext": map[string]any{"privileged": true, "runAsUser": 0},
		"envFrom": []map[string]any{
			{"configMapRef": map[string]any{"name": "metis"}},
			{"secretRef": map[string]any{"name": "metis-harbor"}},
		},
		"volumeMounts": []map[string]any{
			{"name": "workspace", "mountPath": "/workspace"},
			{"name": "host-dev", "mountPath": "/dev"},
			{"name": "host-sys", "mountPath": "/sys", "readOnly": true},
			{"name": "host-udev", "mountPath": "/run/udev", "readOnly": true},
			{"name": "host-tmp", "mountPath": "/host-tmp"},
		},
	}
	volumes := []map[string]any{
		{"name": "workspace", "emptyDir": map[string]any{}},
		{"name": "host-dev", "hostPath": map[string]any{"path": "/dev"}},
		{"name": "host-sys", "hostPath": map[string]any{"path": "/sys"}},
		{"name": "host-udev", "hostPath": map[string]any{"path": "/run/udev"}},
		// NOTE(review): host /tmp is mounted at /host-tmp while --host-tmp-dir
		// joins the full HostTmpDir path under /host-tmp — confirm these agree.
		{"name": "host-tmp", "hostPath": map[string]any{"path": "/tmp"}},
	}
	return map[string]any{
		"apiVersion": "v1",
		"kind":       "Pod",
		"metadata": map[string]any{
			"name":      name,
			"namespace": a.settings.Namespace,
			"labels":    map[string]string{"app": "metis-remote", "metis-run": "flash"},
		},
		"spec": map[string]any{
			"restartPolicy":      "Never",
			"serviceAccountName": "metis",
			"nodeSelector":       map[string]string{"kubernetes.io/hostname": host},
			"containers":         []map[string]any{container},
			"imagePullSecrets":   []map[string]string{{"name": "harbor-regcred"}},
			"volumes":            volumes,
		},
	}
}
// remoteArtifactNote returns the recorded artifact ref for node, falling back
// to the node's repo ":latest" tag when nothing usable has been recorded.
func (a *App) remoteArtifactNote(node string) string {
	summary, ok := a.artifacts()[node]
	if ok && strings.TrimSpace(summary.Ref) != "" {
		return summary.Ref
	}
	return a.artifactRepo(node) + ":latest"
}
// inventoryNodeArch resolves the CPU architecture for an inventory node,
// preferring the node class's declared arch and defaulting to "arm64".
// The node spec parameter is currently unused but kept for callers.
func inventoryNodeArch(spec *inventory.NodeSpec, class *inventory.NodeClass) string {
	if class == nil {
		return "arm64"
	}
	arch := strings.TrimSpace(class.Arch)
	if arch == "" {
		return "arm64"
	}
	return arch
}