ananke/internal/service/gitops_snapshot.go

356 lines
13 KiB
Go

package service
import (
"context"
"encoding/json"
"fmt"
"os/exec"
"sort"
"strings"
"time"
"scm.bstein.dev/bstein/ananke/internal/config"
"scm.bstein.dev/bstein/ananke/internal/metrics"
)
// gitOpsKubectlOutput is the command runner that gitOpsKubectlJSON invokes. It
// defaults to runGitOpsKubectlOutput and is kept in a package-level variable,
// presumably so tests can swap in a fake runner — confirm against the tests.
var gitOpsKubectlOutput = runGitOpsKubectlOutput
// gitOpsResourceList is the JSON decode target for a list response; only the
// top-level "items" array is read, every other key is ignored by the decoder.
type gitOpsResourceList struct {
// Items holds one decoded resource per element of the "items" array.
Items []gitOpsResource `json:"items"`
}
// gitOpsResource is a single decode shape shared by the GitRepository,
// Kustomization, and HelmRelease parsers in this file. It is the union of the
// fields those parsers read; fields absent from a given payload simply stay at
// their zero value after json.Unmarshal.
type gitOpsResource struct {
// Metadata identifies the object; the parsers substitute "default" when
// Namespace is empty.
Metadata struct {
Name string `json:"name"`
Namespace string `json:"namespace"`
} `json:"metadata"`
// Spec carries the declared configuration of the object.
Spec struct {
// Suspend is copied into the Suspended field of every parsed metric row.
Suspend bool `json:"suspend"`
// Ref is read by sourceBranch, which prefers Branch, then Tag, then SemVer.
Ref struct {
Branch string `json:"branch"`
Tag string `json:"tag"`
SemVer string `json:"semver"`
} `json:"ref"`
// URL is read by parseGitRepositories.
URL string `json:"url"`
// Path and SourceRef are read by parseKustomizations; an empty
// SourceRef.Namespace falls back to the object's own namespace there.
Path string `json:"path"`
SourceRef struct {
Name string `json:"name"`
Namespace string `json:"namespace"`
} `json:"sourceRef"`
// Chart is read by parseHelmReleases as the fallback chart name/version
// when Status.History is empty.
Chart struct {
Spec struct {
Chart string `json:"chart"`
Version string `json:"version"`
} `json:"spec"`
} `json:"chart"`
} `json:"spec"`
// Status carries the observed state of the object.
Status struct {
// Artifact.Revision is reported as the revision of a GitRepository.
Artifact struct {
Revision string `json:"revision"`
} `json:"artifact"`
// LastAppliedRevision is preferred over LastAttemptedRevision by the
// Kustomization and HelmRelease parsers.
LastAppliedRevision string `json:"lastAppliedRevision"`
LastAttemptedRevision string `json:"lastAttemptedRevision"`
// Conditions is scanned by fluxReady for the entry whose Type is "Ready".
// The anonymous struct type must stay identical to fluxReady's parameter
// type, otherwise the call sites stop compiling.
Conditions []struct {
Type string `json:"type"`
Status string `json:"status"`
Reason string `json:"reason"`
} `json:"conditions"`
// History is read by parseHelmReleases, which treats element 0 as the
// latest entry — NOTE(review): assumes newest-first ordering; confirm
// against the Flux HelmRelease API.
History []struct {
ChartName string `json:"chartName"`
ChartVersion string `json:"chartVersion"`
AppVersion string `json:"appVersion"`
Digest string `json:"digest"`
} `json:"history"`
} `json:"status"`
}
// maybeStartGitOpsSnapshot launches at most one background GitOps scrape once
// the configured sample interval has passed since the previous launch.
// Signature: (d *Daemon) maybeStartGitOpsSnapshot(ctx context.Context, lastRun *time.Time, running bool, done chan<- struct{}) bool.
// Why: Kubernetes API reads must never stall UPS polling; during an emergency
// power event the daemon's first job is still a safe shutdown.
//
// The result is the new "running" state: true when a scrape goroutine was
// started, otherwise the incoming running flag unchanged. When lastRun is
// non-nil it is stamped with the launch time. On completion the goroutine
// posts to done without blocking, so the signal is dropped if nothing can
// accept it at that moment.
func (d *Daemon) maybeStartGitOpsSnapshot(ctx context.Context, lastRun *time.Time, running bool, done chan<- struct{}) bool {
	if !d.cfg.GitOps.Enabled || d.exporter == nil || running {
		return running
	}

	// A non-positive configured interval falls back to one minute.
	pollEvery := 60 * time.Second
	if configured := time.Duration(d.cfg.GitOps.PollSeconds) * time.Second; configured > 0 {
		pollEvery = configured
	}

	startedAt := time.Now()
	if lastRun != nil {
		// A zero lastRun means "never ran", which always qualifies.
		if !lastRun.IsZero() && startedAt.Sub(*lastRun) < pollEvery {
			return running
		}
		*lastRun = startedAt
	}

	// Copy what the goroutine needs so it does not read d's fields later.
	cfg, exporter, logger := d.cfg, d.exporter, d.log

	go func() {
		// Registered first so it runs last, after the scrape context is released.
		defer func() {
			select {
			case done <- struct{}{}:
			default:
			}
		}()

		// Bound the whole scrape regardless of how the parent context behaves.
		scrapeCtx, cancel := context.WithTimeout(ctx, 25*time.Second)
		defer cancel()

		result := collectGitOpsSnapshot(scrapeCtx, cfg)
		exporter.UpdateGitOpsSnapshot(result)
		if logger != nil && !result.ScrapeSuccess {
			logger.Printf("warning: gitops metrics scrape partial: %s", gitOpsErrorSummary(result.Errors))
		}
	}()
	return true
}
// collectGitOpsSnapshot lists the Flux GitRepository, Kustomization, and
// HelmRelease resources across all namespaces and folds them into one metrics
// snapshot.
// Signature: collectGitOpsSnapshot(ctx context.Context, cfg config.Config) metrics.GitOpsSnapshot.
// Why: the Grafana overview needs current branch/readiness/suspend state, which
// is object status rather than controller operational telemetry.
//
// Each resource family is fetched and parsed independently: a failure is
// recorded under that family's key in Errors and the other families are still
// attempted. ScrapeSuccess is true only when no family failed.
func collectGitOpsSnapshot(ctx context.Context, cfg config.Config) metrics.GitOpsSnapshot {
	result := metrics.GitOpsSnapshot{
		UpdatedAt: time.Now().UTC(),
		Errors:    map[string]string{},
	}

	// list fetches every object of one resource kind as JSON.
	list := func(resource string) ([]byte, error) {
		return gitOpsKubectlJSON(ctx, cfg, "get", resource, "-A", "-o", "json")
	}

	raw, err := list("gitrepositories.source.toolkit.fluxcd.io")
	var sources []metrics.GitOpsFluxSource
	if err == nil {
		sources, err = parseGitRepositories(raw)
	}
	if err != nil {
		result.Errors["gitrepository"] = err.Error()
	} else {
		result.FluxSources = sources
	}

	raw, err = list("kustomizations.kustomize.toolkit.fluxcd.io")
	var kustomizations []metrics.GitOpsKustomization
	if err == nil {
		kustomizations, err = parseKustomizations(raw)
	}
	if err != nil {
		result.Errors["kustomization"] = err.Error()
	} else {
		result.Kustomizations = kustomizations
	}

	raw, err = list("helmreleases.helm.toolkit.fluxcd.io")
	var releases []metrics.GitOpsHelmRelease
	if err == nil {
		releases, err = parseHelmReleases(raw)
	}
	if err != nil {
		result.Errors["helmrelease"] = err.Error()
	} else {
		result.HelmReleases = releases
	}

	result.ScrapeSuccess = len(result.Errors) == 0
	return result
}
// parseGitRepositories decodes a Flux GitRepository list and returns one
// source row per item, ordered by "namespace/name".
// Signature: parseGitRepositories(raw []byte) ([]metrics.GitOpsFluxSource, error).
// Why: keeping status parsing isolated makes it testable without a live Flux
// API and protects the daemon loop from brittle JSON traversal code.
//
// A decode failure yields a nil slice and a wrapped error. Items without a
// namespace are reported under "default".
func parseGitRepositories(raw []byte) ([]metrics.GitOpsFluxSource, error) {
	var decoded gitOpsResourceList
	if err := json.Unmarshal(raw, &decoded); err != nil {
		return nil, fmt.Errorf("decode gitrepositories: %w", err)
	}

	sources := make([]metrics.GitOpsFluxSource, len(decoded.Items))
	for i := range decoded.Items {
		repo := &decoded.Items[i]
		ready, reason := fluxReady(repo.Status.Conditions)
		sources[i] = metrics.GitOpsFluxSource{
			Namespace: defaultString(repo.Metadata.Namespace, "default"),
			Name:      repo.Metadata.Name,
			URL:       repo.Spec.URL,
			Branch:    sourceBranch(*repo),
			Revision:  repo.Status.Artifact.Revision,
			Ready:     ready,
			Reason:    reason,
			Suspended: repo.Spec.Suspend,
		}
	}

	// Order rows by their combined "namespace/name" key for stable output.
	sortKey := func(s metrics.GitOpsFluxSource) string { return s.Namespace + "/" + s.Name }
	sort.Slice(sources, func(a, b int) bool {
		return sortKey(sources[a]) < sortKey(sources[b])
	})
	return sources, nil
}
// parseKustomizations decodes a Flux Kustomization list and returns one row
// per item with its path, source reference, revision, and readiness, ordered
// by "namespace/name".
// Signature: parseKustomizations(raw []byte) ([]metrics.GitOpsKustomization, error).
// Why: Kustomization status drives both the overview summary and the detailed
// GitOps table, so the parser keeps the metric contract in one place.
//
// A decode failure yields a nil slice and a wrapped error. The source namespace
// defaults to the Kustomization's own namespace, and the revision prefers the
// last applied value over the last attempted one.
func parseKustomizations(raw []byte) ([]metrics.GitOpsKustomization, error) {
	var decoded gitOpsResourceList
	if err := json.Unmarshal(raw, &decoded); err != nil {
		return nil, fmt.Errorf("decode kustomizations: %w", err)
	}

	rows := make([]metrics.GitOpsKustomization, len(decoded.Items))
	for i := range decoded.Items {
		ks := &decoded.Items[i]
		ready, reason := fluxReady(ks.Status.Conditions)
		ownNamespace := defaultString(ks.Metadata.Namespace, "default")
		rows[i] = metrics.GitOpsKustomization{
			Namespace:       ownNamespace,
			Name:            ks.Metadata.Name,
			Path:            ks.Spec.Path,
			SourceNamespace: defaultString(ks.Spec.SourceRef.Namespace, ownNamespace),
			SourceName:      ks.Spec.SourceRef.Name,
			Revision:        firstNonEmpty(ks.Status.LastAppliedRevision, ks.Status.LastAttemptedRevision),
			Ready:           ready,
			Reason:          reason,
			Suspended:       ks.Spec.Suspend,
		}
	}

	// Order rows by their combined "namespace/name" key for stable output.
	sortKey := func(k metrics.GitOpsKustomization) string { return k.Namespace + "/" + k.Name }
	sort.Slice(rows, func(a, b int) bool {
		return sortKey(rows[a]) < sortKey(rows[b])
	})
	return rows, nil
}
// parseHelmReleases decodes a Flux HelmRelease list and returns one row per
// item with chart metadata, revision, and readiness, ordered by
// "namespace/name".
// Signature: parseHelmReleases(raw []byte) ([]metrics.GitOpsHelmRelease, error).
// Why: HelmRelease status has different chart fields than Kustomizations, and
// isolating it keeps dashboard metrics stable as Flux payloads evolve.
//
// A decode failure yields a nil slice and a wrapped error. Chart name, version,
// and revision start from the spec / status revision fields and are overridden
// by the first status.history entry wherever that entry has a non-empty value;
// the app version comes only from that history entry.
func parseHelmReleases(raw []byte) ([]metrics.GitOpsHelmRelease, error) {
	var decoded gitOpsResourceList
	if err := json.Unmarshal(raw, &decoded); err != nil {
		return nil, fmt.Errorf("decode helmreleases: %w", err)
	}

	rows := make([]metrics.GitOpsHelmRelease, len(decoded.Items))
	for i := range decoded.Items {
		rel := &decoded.Items[i]
		ready, reason := fluxReady(rel.Status.Conditions)

		// Declared values, used when no history entry is available.
		chartName := rel.Spec.Chart.Spec.Chart
		chartVersion := rel.Spec.Chart.Spec.Version
		revision := firstNonEmpty(rel.Status.LastAppliedRevision, rel.Status.LastAttemptedRevision)
		var appVersion string

		if history := rel.Status.History; len(history) > 0 {
			head := history[0]
			chartName = firstNonEmpty(head.ChartName, chartName)
			chartVersion = firstNonEmpty(head.ChartVersion, chartVersion)
			appVersion = head.AppVersion
			revision = firstNonEmpty(head.Digest, revision)
		}

		rows[i] = metrics.GitOpsHelmRelease{
			Namespace:  defaultString(rel.Metadata.Namespace, "default"),
			Name:       rel.Metadata.Name,
			Chart:      chartName,
			Version:    chartVersion,
			AppVersion: appVersion,
			Revision:   revision,
			Ready:      ready,
			Reason:     reason,
			Suspended:  rel.Spec.Suspend,
		}
	}

	// Order rows by their combined "namespace/name" key for stable output.
	sortKey := func(r metrics.GitOpsHelmRelease) string { return r.Namespace + "/" + r.Name }
	sort.Slice(rows, func(a, b int) bool {
		return sortKey(rows[a]) < sortKey(rows[b])
	})
	return rows, nil
}
// gitOpsKubectlJSON invokes kubectl through the package's command runner,
// prepending the configured kubeconfig (when set) and an 8s request timeout to
// the caller's arguments.
// Signature: gitOpsKubectlJSON(ctx context.Context, cfg config.Config, args ...string) ([]byte, error).
// Why: every GitOps scrape should fail quickly instead of stealing time from
// UPS safety polling during outage recovery.
func gitOpsKubectlJSON(ctx context.Context, cfg config.Config, args ...string) ([]byte, error) {
	// Room for "--kubeconfig", its value, and the timeout flag.
	argv := make([]string, 0, len(args)+3)
	if kubeconfig := cfg.Kubeconfig; strings.TrimSpace(kubeconfig) != "" {
		argv = append(argv, "--kubeconfig", kubeconfig)
	}
	argv = append(argv, "--request-timeout=8s")
	argv = append(argv, args...)
	return gitOpsKubectlOutput(ctx, "kubectl", argv...)
}
// runGitOpsKubectlOutput is the production command runner for GitOps snapshot
// collection.
// Signature: runGitOpsKubectlOutput(ctx context.Context, name string, args ...string) ([]byte, error).
// Why: preserving stderr in errors makes live diagnosis possible when Flux CRD
// reads fail on a recovering control plane.
//
// On success only the command's stdout is returned. Stderr is captured
// separately so that warnings printed by a successful command cannot end up in
// the payload that callers hand to the JSON decoder. On failure the returned
// bytes are nil and the error wraps the underlying exec error together with the
// trimmed stderr text, or the trimmed stdout text when stderr is empty.
func runGitOpsKubectlOutput(ctx context.Context, name string, args ...string) ([]byte, error) {
	cmd := exec.CommandContext(ctx, name, args...)

	// Keep stderr out of the returned bytes; it is only used for diagnostics.
	var stderr strings.Builder
	cmd.Stderr = &stderr

	out, err := cmd.Output()
	if err == nil {
		return out, nil
	}

	detail := strings.TrimSpace(stderr.String())
	if detail == "" {
		// Some failures report only on stdout; keep that context as well.
		detail = strings.TrimSpace(string(out))
	}
	if detail == "" {
		return nil, err
	}
	return nil, fmt.Errorf("%w: %s", err, detail)
}
// fluxReady reports whether the first condition of type "Ready" has status
// "True" (compared case-insensitively), together with that condition's reason.
// Signature: fluxReady(conditions []struct { Type string; Status string; Reason string }) (bool, string).
// Why: Flux resources all use Ready conditions, so one helper keeps status
// interpretation consistent across GitRepository, Kustomization, and HelmRelease.
//
// A Ready condition with a blank reason yields "ReadyUnknown"; when no Ready
// condition exists the result is (false, "ReadyMissing").
func fluxReady(conditions []struct {
	Type   string `json:"type"`
	Status string `json:"status"`
	Reason string `json:"reason"`
}) (bool, string) {
	for i := range conditions {
		current := &conditions[i]
		if current.Type != "Ready" {
			continue
		}
		isTrue := strings.EqualFold(current.Status, "True")
		return isTrue, defaultString(current.Reason, "ReadyUnknown")
	}
	return false, "ReadyMissing"
}
// sourceBranch picks the label shown for a GitRepository's reference: the
// branch if set, otherwise the tag, otherwise the semver range, otherwise the
// literal "unknown".
// Signature: sourceBranch(item gitOpsResource) string.
// Why: branch is preferred for Atlas, but tags and semver refs should still
// render clearly if Flux is ever pinned differently.
func sourceBranch(item gitOpsResource) string {
	ref := item.Spec.Ref
	return firstNonEmpty(ref.Branch, ref.Tag, ref.SemVer, "unknown")
}
// defaultString returns value with surrounding whitespace removed, or fallback
// (unmodified) when nothing is left after trimming.
// Signature: defaultString(value string, fallback string) string.
// Why: Flux omits some optional fields; dashboards are easier to read with
// explicit fallback labels.
func defaultString(value string, fallback string) string {
	if trimmed := strings.TrimSpace(value); trimmed != "" {
		return trimmed
	}
	return fallback
}
// firstNonEmpty scans values in order and returns the first one that is
// non-empty after trimming surrounding whitespace (the trimmed form is what is
// returned). It returns "" when every value is blank or none are given.
// Signature: firstNonEmpty(values ...string) string.
// Why: Flux has several revision fields depending on resource kind and
// reconciliation stage, so callers can express preferred fallback order.
func firstNonEmpty(values ...string) string {
	for i := range values {
		if candidate := strings.TrimSpace(values[i]); candidate != "" {
			return candidate
		}
	}
	return ""
}
// gitOpsErrorSummary formats per-resource-family scrape errors as
// "key=message" pairs, sorted by key and joined with "; ". An empty or nil map
// renders as "none".
// Signature: gitOpsErrorSummary(errors map[string]string) string.
// Why: a compact sorted summary keeps recurring scrape warnings readable in
// journald during partial cluster recovery.
func gitOpsErrorSummary(errors map[string]string) string {
	if len(errors) == 0 {
		return "none"
	}

	// Map iteration order is random; sort the keys for stable log lines.
	families := make([]string, 0, len(errors))
	for family := range errors {
		families = append(families, family)
	}
	sort.Strings(families)

	var summary strings.Builder
	for i, family := range families {
		if i > 0 {
			summary.WriteString("; ")
		}
		summary.WriteString(family)
		summary.WriteByte('=')
		summary.WriteString(errors[family])
	}
	return summary.String()
}