356 lines
13 KiB
Go
356 lines
13 KiB
Go
package service
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"os/exec"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"scm.bstein.dev/bstein/ananke/internal/config"
|
|
"scm.bstein.dev/bstein/ananke/internal/metrics"
|
|
)
|
|
|
|
// gitOpsKubectlOutput is the command runner that gitOpsKubectlJSON invokes. It
// is a package-level variable initialized to runGitOpsKubectlOutput.
// NOTE(review): presumably kept as a variable so tests can swap in a fake
// runner instead of executing kubectl — confirm against the test files.
var gitOpsKubectlOutput = runGitOpsKubectlOutput
|
|
|
|
type gitOpsResourceList struct {
|
|
Items []gitOpsResource `json:"items"`
|
|
}
|
|
|
|
// gitOpsResource is a single decode target shared by the GitRepository,
// Kustomization, and HelmRelease parsers in this file. It is the union of the
// fields those parsers read; fields absent from a given payload simply stay at
// their zero value after json.Unmarshal.
type gitOpsResource struct {
	// Metadata identifies the object. The parsers substitute "default" when
	// Namespace is empty.
	Metadata struct {
		Name      string `json:"name"`
		Namespace string `json:"namespace"`
	} `json:"metadata"`

	Spec struct {
		// Suspend is copied verbatim into the Suspended field of every metric record.
		Suspend bool `json:"suspend"`

		// Ref is consulted by sourceBranch (branch, then tag, then semver).
		Ref struct {
			Branch string `json:"branch"`
			Tag    string `json:"tag"`
			SemVer string `json:"semver"`
		} `json:"ref"`

		// URL is read by parseGitRepositories.
		URL string `json:"url"`

		// Path and SourceRef are read by parseKustomizations.
		Path string `json:"path"`

		SourceRef struct {
			Name      string `json:"name"`
			Namespace string `json:"namespace"`
		} `json:"sourceRef"`

		// Chart is the declared chart name/version read by parseHelmReleases.
		Chart struct {
			Spec struct {
				Chart   string `json:"chart"`
				Version string `json:"version"`
			} `json:"spec"`
		} `json:"chart"`
	} `json:"spec"`

	Status struct {
		// Artifact.Revision is the revision reported for GitRepository sources.
		Artifact struct {
			Revision string `json:"revision"`
		} `json:"artifact"`

		// The parsers prefer LastAppliedRevision and fall back to
		// LastAttemptedRevision.
		LastAppliedRevision   string `json:"lastAppliedRevision"`
		LastAttemptedRevision string `json:"lastAttemptedRevision"`

		// Conditions must keep exactly this anonymous shape: fluxReady declares
		// its parameter with the identical struct type.
		Conditions []struct {
			Type   string `json:"type"`
			Status string `json:"status"`
			Reason string `json:"reason"`
		} `json:"conditions"`

		// History is read by parseHelmReleases, which uses only element 0.
		History []struct {
			ChartName    string `json:"chartName"`
			ChartVersion string `json:"chartVersion"`
			AppVersion   string `json:"appVersion"`
			Digest       string `json:"digest"`
		} `json:"history"`
	} `json:"status"`
}
|
|
|
|
// maybeStartGitOpsSnapshot starts a bounded background scrape when the GitOps
|
|
// sample interval has elapsed.
|
|
// Signature: (d *Daemon) maybeStartGitOpsSnapshot(ctx context.Context, lastRun *time.Time, running bool, done chan<- struct{}) bool.
|
|
// Why: Kubernetes API reads must not block UPS polling, especially during
|
|
// emergency power events when the daemon's first job is still safe shutdown.
|
|
func (d *Daemon) maybeStartGitOpsSnapshot(ctx context.Context, lastRun *time.Time, running bool, done chan<- struct{}) bool {
|
|
if !d.cfg.GitOps.Enabled || d.exporter == nil || running {
|
|
return running
|
|
}
|
|
interval := time.Duration(d.cfg.GitOps.PollSeconds) * time.Second
|
|
if interval <= 0 {
|
|
interval = 60 * time.Second
|
|
}
|
|
now := time.Now()
|
|
if lastRun != nil && !lastRun.IsZero() && now.Sub(*lastRun) < interval {
|
|
return running
|
|
}
|
|
if lastRun != nil {
|
|
*lastRun = now
|
|
}
|
|
cfg := d.cfg
|
|
exporter := d.exporter
|
|
logger := d.log
|
|
go func() {
|
|
defer func() {
|
|
select {
|
|
case done <- struct{}{}:
|
|
default:
|
|
}
|
|
}()
|
|
scrapeCtx, cancel := context.WithTimeout(ctx, 25*time.Second)
|
|
defer cancel()
|
|
snapshot := collectGitOpsSnapshot(scrapeCtx, cfg)
|
|
exporter.UpdateGitOpsSnapshot(snapshot)
|
|
if !snapshot.ScrapeSuccess && logger != nil {
|
|
logger.Printf("warning: gitops metrics scrape partial: %s", gitOpsErrorSummary(snapshot.Errors))
|
|
}
|
|
}()
|
|
return true
|
|
}
|
|
|
|
// collectGitOpsSnapshot queries Flux custom resources and converts them into
|
|
// Prometheus-safe metric state.
|
|
// Signature: collectGitOpsSnapshot(ctx context.Context, cfg config.Config) metrics.GitOpsSnapshot.
|
|
// Why: the Grafana overview needs current branch/readiness/suspend state, which
|
|
// is object status rather than controller operational telemetry.
|
|
func collectGitOpsSnapshot(ctx context.Context, cfg config.Config) metrics.GitOpsSnapshot {
|
|
snapshot := metrics.GitOpsSnapshot{
|
|
UpdatedAt: time.Now().UTC(),
|
|
ScrapeSuccess: true,
|
|
Errors: map[string]string{},
|
|
}
|
|
|
|
if raw, err := gitOpsKubectlJSON(ctx, cfg, "get", "gitrepositories.source.toolkit.fluxcd.io", "-A", "-o", "json"); err != nil {
|
|
snapshot.Errors["gitrepository"] = err.Error()
|
|
} else if sources, err := parseGitRepositories(raw); err != nil {
|
|
snapshot.Errors["gitrepository"] = err.Error()
|
|
} else {
|
|
snapshot.FluxSources = sources
|
|
}
|
|
|
|
if raw, err := gitOpsKubectlJSON(ctx, cfg, "get", "kustomizations.kustomize.toolkit.fluxcd.io", "-A", "-o", "json"); err != nil {
|
|
snapshot.Errors["kustomization"] = err.Error()
|
|
} else if kustomizations, err := parseKustomizations(raw); err != nil {
|
|
snapshot.Errors["kustomization"] = err.Error()
|
|
} else {
|
|
snapshot.Kustomizations = kustomizations
|
|
}
|
|
|
|
if raw, err := gitOpsKubectlJSON(ctx, cfg, "get", "helmreleases.helm.toolkit.fluxcd.io", "-A", "-o", "json"); err != nil {
|
|
snapshot.Errors["helmrelease"] = err.Error()
|
|
} else if releases, err := parseHelmReleases(raw); err != nil {
|
|
snapshot.Errors["helmrelease"] = err.Error()
|
|
} else {
|
|
snapshot.HelmReleases = releases
|
|
}
|
|
|
|
snapshot.ScrapeSuccess = len(snapshot.Errors) == 0
|
|
return snapshot
|
|
}
|
|
|
|
// parseGitRepositories extracts Git source branch/revision/readiness from Flux
|
|
// GitRepository JSON.
|
|
// Signature: parseGitRepositories(raw []byte) ([]metrics.GitOpsFluxSource, error).
|
|
// Why: keeping status parsing isolated makes it testable without a live Flux
|
|
// API and protects the daemon loop from brittle JSON traversal code.
|
|
func parseGitRepositories(raw []byte) ([]metrics.GitOpsFluxSource, error) {
|
|
var list gitOpsResourceList
|
|
if err := json.Unmarshal(raw, &list); err != nil {
|
|
return nil, fmt.Errorf("decode gitrepositories: %w", err)
|
|
}
|
|
out := make([]metrics.GitOpsFluxSource, 0, len(list.Items))
|
|
for _, item := range list.Items {
|
|
ready, reason := fluxReady(item.Status.Conditions)
|
|
out = append(out, metrics.GitOpsFluxSource{
|
|
Namespace: defaultString(item.Metadata.Namespace, "default"),
|
|
Name: item.Metadata.Name,
|
|
URL: item.Spec.URL,
|
|
Branch: sourceBranch(item),
|
|
Revision: item.Status.Artifact.Revision,
|
|
Ready: ready,
|
|
Reason: reason,
|
|
Suspended: item.Spec.Suspend,
|
|
})
|
|
}
|
|
sort.Slice(out, func(i, j int) bool {
|
|
return out[i].Namespace+"/"+out[i].Name < out[j].Namespace+"/"+out[j].Name
|
|
})
|
|
return out, nil
|
|
}
|
|
|
|
// parseKustomizations extracts Kustomization health and source metadata from
|
|
// Flux Kustomization JSON.
|
|
// Signature: parseKustomizations(raw []byte) ([]metrics.GitOpsKustomization, error).
|
|
// Why: Kustomization status drives both the overview summary and the detailed
|
|
// GitOps table, so the parser keeps the metric contract in one place.
|
|
func parseKustomizations(raw []byte) ([]metrics.GitOpsKustomization, error) {
|
|
var list gitOpsResourceList
|
|
if err := json.Unmarshal(raw, &list); err != nil {
|
|
return nil, fmt.Errorf("decode kustomizations: %w", err)
|
|
}
|
|
out := make([]metrics.GitOpsKustomization, 0, len(list.Items))
|
|
for _, item := range list.Items {
|
|
ready, reason := fluxReady(item.Status.Conditions)
|
|
namespace := defaultString(item.Metadata.Namespace, "default")
|
|
out = append(out, metrics.GitOpsKustomization{
|
|
Namespace: namespace,
|
|
Name: item.Metadata.Name,
|
|
Path: item.Spec.Path,
|
|
SourceNamespace: defaultString(item.Spec.SourceRef.Namespace, namespace),
|
|
SourceName: item.Spec.SourceRef.Name,
|
|
Revision: firstNonEmpty(item.Status.LastAppliedRevision, item.Status.LastAttemptedRevision),
|
|
Ready: ready,
|
|
Reason: reason,
|
|
Suspended: item.Spec.Suspend,
|
|
})
|
|
}
|
|
sort.Slice(out, func(i, j int) bool {
|
|
return out[i].Namespace+"/"+out[i].Name < out[j].Namespace+"/"+out[j].Name
|
|
})
|
|
return out, nil
|
|
}
|
|
|
|
// parseHelmReleases extracts HelmRelease health and chart metadata from Flux
|
|
// HelmRelease JSON.
|
|
// Signature: parseHelmReleases(raw []byte) ([]metrics.GitOpsHelmRelease, error).
|
|
// Why: HelmRelease status has different chart fields than Kustomizations, and
|
|
// isolating it keeps dashboard metrics stable as Flux payloads evolve.
|
|
func parseHelmReleases(raw []byte) ([]metrics.GitOpsHelmRelease, error) {
|
|
var list gitOpsResourceList
|
|
if err := json.Unmarshal(raw, &list); err != nil {
|
|
return nil, fmt.Errorf("decode helmreleases: %w", err)
|
|
}
|
|
out := make([]metrics.GitOpsHelmRelease, 0, len(list.Items))
|
|
for _, item := range list.Items {
|
|
ready, reason := fluxReady(item.Status.Conditions)
|
|
chart := item.Spec.Chart.Spec.Chart
|
|
version := item.Spec.Chart.Spec.Version
|
|
appVersion := ""
|
|
revision := firstNonEmpty(item.Status.LastAppliedRevision, item.Status.LastAttemptedRevision)
|
|
if len(item.Status.History) > 0 {
|
|
latest := item.Status.History[0]
|
|
chart = firstNonEmpty(latest.ChartName, chart)
|
|
version = firstNonEmpty(latest.ChartVersion, version)
|
|
appVersion = latest.AppVersion
|
|
revision = firstNonEmpty(latest.Digest, revision)
|
|
}
|
|
out = append(out, metrics.GitOpsHelmRelease{
|
|
Namespace: defaultString(item.Metadata.Namespace, "default"),
|
|
Name: item.Metadata.Name,
|
|
Chart: chart,
|
|
Version: version,
|
|
AppVersion: appVersion,
|
|
Revision: revision,
|
|
Ready: ready,
|
|
Reason: reason,
|
|
Suspended: item.Spec.Suspend,
|
|
})
|
|
}
|
|
sort.Slice(out, func(i, j int) bool {
|
|
return out[i].Namespace+"/"+out[i].Name < out[j].Namespace+"/"+out[j].Name
|
|
})
|
|
return out, nil
|
|
}
|
|
|
|
// gitOpsKubectlJSON runs kubectl with the configured kubeconfig and a short
|
|
// request timeout.
|
|
// Signature: gitOpsKubectlJSON(ctx context.Context, cfg config.Config, args ...string) ([]byte, error).
|
|
// Why: every GitOps scrape should fail quickly instead of stealing time from
|
|
// UPS safety polling during outage recovery.
|
|
func gitOpsKubectlJSON(ctx context.Context, cfg config.Config, args ...string) ([]byte, error) {
|
|
finalArgs := []string{}
|
|
if strings.TrimSpace(cfg.Kubeconfig) != "" {
|
|
finalArgs = append(finalArgs, "--kubeconfig", cfg.Kubeconfig)
|
|
}
|
|
finalArgs = append(finalArgs, "--request-timeout=8s")
|
|
finalArgs = append(finalArgs, args...)
|
|
return gitOpsKubectlOutput(ctx, "kubectl", finalArgs...)
|
|
}
|
|
|
|
// runGitOpsKubectlOutput is the production command runner for GitOps snapshot
// collection. It runs name with args under ctx and returns the command's
// stdout.
// Signature: runGitOpsKubectlOutput(ctx context.Context, name string, args ...string) ([]byte, error).
// Why: preserving stderr in errors makes live diagnosis possible when Flux CRD
// reads fail on a recovering control plane.
//
// Stdout and stderr are captured separately. Callers feed the returned bytes
// straight into json.Unmarshal, so anything the command writes to stderr while
// still exiting 0 (for example API deprecation warnings) must not be mixed
// into the payload.
//
// On failure the returned slice is nil and the error wraps the underlying
// exec error. The trimmed stderr text is appended when present; if stderr is
// empty the trimmed stdout is appended instead; if both are empty the exec
// error is returned unchanged.
func runGitOpsKubectlOutput(ctx context.Context, name string, args ...string) ([]byte, error) {
	cmd := exec.CommandContext(ctx, name, args...)

	// Collect stderr on the side; Output() only returns stdout.
	var stderr strings.Builder
	cmd.Stderr = &stderr

	out, err := cmd.Output()
	if err == nil {
		return out, nil
	}

	detail := strings.TrimSpace(stderr.String())
	if detail == "" {
		// Some failures report on stdout only; keep that text for diagnosis.
		detail = strings.TrimSpace(string(out))
	}
	if detail == "" {
		return nil, err
	}
	return nil, fmt.Errorf("%w: %s", err, detail)
}
|
|
|
|
// fluxReady resolves a Flux Ready condition into a boolean and reason.
|
|
// Signature: fluxReady(conditions []struct { Type string; Status string; Reason string }) (bool, string).
|
|
// Why: Flux resources all use Ready conditions, so one helper keeps status
|
|
// interpretation consistent across GitRepository, Kustomization, and HelmRelease.
|
|
func fluxReady(conditions []struct {
|
|
Type string `json:"type"`
|
|
Status string `json:"status"`
|
|
Reason string `json:"reason"`
|
|
}) (bool, string) {
|
|
for _, condition := range conditions {
|
|
if condition.Type == "Ready" {
|
|
return strings.EqualFold(condition.Status, "True"), defaultString(condition.Reason, "ReadyUnknown")
|
|
}
|
|
}
|
|
return false, "ReadyMissing"
|
|
}
|
|
|
|
// sourceBranch returns the most useful source reference label for a
|
|
// GitRepository.
|
|
// Signature: sourceBranch(item gitOpsResource) string.
|
|
// Why: branch is preferred for Atlas, but tags and semver refs should still
|
|
// render clearly if Flux is ever pinned differently.
|
|
func sourceBranch(item gitOpsResource) string {
|
|
return firstNonEmpty(item.Spec.Ref.Branch, item.Spec.Ref.Tag, item.Spec.Ref.SemVer, "unknown")
|
|
}
|
|
|
|
// defaultString returns value with surrounding whitespace removed, or fallback
// (untouched) when nothing is left after trimming.
// Signature: defaultString(value string, fallback string) string.
// Why: Flux omits some optional fields; dashboards are easier to read with
// explicit fallback labels.
func defaultString(value string, fallback string) string {
	if trimmed := strings.TrimSpace(value); trimmed != "" {
		return trimmed
	}
	return fallback
}
|
|
|
|
// firstNonEmpty scans values in order and returns the first one that is not
// blank after trimming whitespace; the trimmed form is what gets returned. It
// yields "" when every value is blank or no values are given.
// Signature: firstNonEmpty(values ...string) string.
// Why: Flux has several revision fields depending on resource kind and
// reconciliation stage, so callers can express preferred fallback order.
func firstNonEmpty(values ...string) string {
	for i := range values {
		if candidate := strings.TrimSpace(values[i]); candidate != "" {
			return candidate
		}
	}
	return ""
}
|
|
|
|
// gitOpsErrorSummary formats a map of scrape errors as "key=message" pairs,
// sorted by key and joined with "; ". An empty or nil map renders as "none".
// Signature: gitOpsErrorSummary(errors map[string]string) string.
// Why: a compact sorted summary keeps recurring scrape warnings readable in
// journald during partial cluster recovery.
func gitOpsErrorSummary(errors map[string]string) string {
	if len(errors) == 0 {
		return "none"
	}

	// Map iteration order is random; sort the keys for stable log lines.
	families := make([]string, 0, len(errors))
	for family := range errors {
		families = append(families, family)
	}
	sort.Strings(families)

	var summary strings.Builder
	for i, family := range families {
		if i > 0 {
			summary.WriteString("; ")
		}
		summary.WriteString(family)
		summary.WriteString("=")
		summary.WriteString(errors[family])
	}
	return summary.String()
}
|