#!/usr/bin/env python3
# titan-iac/scripts/logging_render_observability.py
"""Generate OpenSearch Observability seed objects and render them into ConfigMaps.
Usage:
scripts/logging_render_observability.py --build # rebuild JSON + ConfigMap
scripts/logging_render_observability.py # re-render ConfigMap from JSON
"""
from __future__ import annotations
import argparse
import json
import textwrap
from dataclasses import dataclass
from pathlib import Path
# Directory one level above the folder that contains this script.
ROOT = Path(__file__).resolve().parents[1]
# Folder holding the three JSON seed payloads written by --build.
OBS_DIR = ROOT / "services" / "logging" / "observability"
APPS_PATH = OBS_DIR / "applications.json"
QUERIES_PATH = OBS_DIR / "saved_queries.json"
VIS_PATH = OBS_DIR / "saved_visualizations.json"
# Destination of the rendered ConfigMap manifest.
CONFIG_PATH = ROOT / "services" / "logging" / "opensearch-observability-objects.yaml"
# Manifest skeleton filled in by render_configmap() through str.format():
# {relative_path} is the output path relative to ROOT, and the three other
# placeholders receive the already-indented text of the JSON payload files.
# NOTE(review): the YAML nesting inside this literal appears flattened in this
# copy of the file; confirm the key indentation (and the indent width used by
# indent_payload) against the repository before relying on the rendered output.
CONFIG_TEMPLATE = textwrap.dedent(
"""# {relative_path}
# Generated by scripts/logging_render_observability.py --build
apiVersion: v1
kind: ConfigMap
metadata:
name: opensearch-observability-objects
namespace: logging
data:
applications.json: |
{applications}
saved_queries.json: |
{queries}
saved_visualizations.json: |
{visualizations}
"""
)
# Values attached to every saved query and saved visualization. The same
# objects are referenced (not copied) by each generated entry.
DEFAULT_RANGE = {"start": "now-24h", "end": "now", "text": ""}
DEFAULT_TIMESTAMP = {"name": "@timestamp", "type": "timestamp"}
DEFAULT_FIELDS = {"text": "", "tokens": []}
@dataclass(frozen=True)
class AppSpec:
    """Immutable description of one application to seed.

    ``kind`` decides which error filter build_objects() pairs with the
    application: "journald" selects the journald filter, any other value
    selects the kube filter.
    """

    # Application name; also the prefix of its "<name> logs" and
    # "<name> errors" saved queries.
    name: str
    # Query string that selects the application's log records.
    base_query: str
    kind: str = "kube"
    description: str = ""
@dataclass(frozen=True)
class QuerySpec:
    """Immutable input for saved_query(): a named query with optional notes."""

    # Display name of the saved query.
    name: str
    # Query text stored verbatim in the saved object.
    query: str
    description: str = ""
@dataclass(frozen=True)
class VisualizationSpec:
    """Immutable input for saved_visualization(): a named, typed chart query."""

    # Display name of the visualization.
    name: str
    # Query text stored verbatim in the saved object.
    query: str
    # Chart type written to the object's "type" key (this file uses
    # "line" and "bar").
    vis_type: str
    description: str = ""
def source_query(index: str, where: str | None = None) -> str:
    """Return a ``source = <index>`` query, optionally piped into a filter.

    Args:
        index: Index name or pattern to read from.
        where: Filter expression; when it is None or empty no ``where``
            stage is appended.

    Returns:
        The query stages joined with ``" | "``.
    """
    stages = [f"source = {index}"]
    if where:
        stages.append(f"where {where}")
    return " | ".join(stages)
def error_filter(fields: list[str]) -> str:
    """Return an ``or``-joined chain of ``match()`` calls, one per field.

    Every field is matched against the same 'error|exception|fail' text.
    An empty ``fields`` list yields an empty string.
    """
    pattern = "error|exception|fail"
    return " or ".join(f"match({name}, '{pattern}')" for name in fields)
def saved_query(spec: QuerySpec) -> dict:
    """Convert a QuerySpec into the dict stored in saved_queries.json.

    The date range, timestamp and field selections are the module-level
    defaults; they are referenced, not copied, so every returned dict shares
    the same three objects.
    """
    return dict(
        name=spec.name,
        description=spec.description,
        query=spec.query,
        selected_date_range=DEFAULT_RANGE,
        selected_timestamp=DEFAULT_TIMESTAMP,
        selected_fields=DEFAULT_FIELDS,
    )
def saved_visualization(spec: VisualizationSpec) -> dict:
    """Convert a VisualizationSpec into the dict stored in saved_visualizations.json.

    Same layout as a saved query plus a "type" key carrying the chart type.
    The date range, timestamp and field selections are the module-level
    defaults, shared by reference between all returned dicts.
    """
    return dict(
        name=spec.name,
        description=spec.description,
        query=spec.query,
        type=spec.vis_type,
        selected_date_range=DEFAULT_RANGE,
        selected_timestamp=DEFAULT_TIMESTAMP,
        selected_fields=DEFAULT_FIELDS,
    )
def _kube_app(name: str, namespace: str, selector: tuple[str, str] | None = None) -> AppSpec:
    """Build the AppSpec for a workload whose logs live in the kube-* indices.

    ``selector`` is an optional ``(field, value)`` pair; ``field`` is given
    without its ``kubernetes.`` prefix and narrows the namespace filter.
    """
    where = f"kubernetes.namespace_name = '{namespace}'"
    if selector is not None:
        field, value = selector
        where += f" and kubernetes.{field} = '{value}'"
    return AppSpec(name, source_query("kube-*", where))


def _count_query(
    index: str,
    metric: str,
    group_by: str,
    where: str | None = None,
    *,
    ranked: bool = False,
) -> str:
    """Build a ``stats count()`` query grouped by ``group_by``.

    ``ranked`` appends a descending sort on the count column.
    """
    query = f"{source_query(index, where)} | stats count() as {metric} by {group_by}"
    if ranked:
        query += f" | sort - {metric}"
    return query


def build_objects() -> tuple[list[dict], list[dict], list[dict]]:
    """Build every seed object in memory.

    Returns:
        A ``(applications, queries, visualizations)`` tuple of JSON-ready
        dict lists, in the order main() writes them to disk.
    """
    kube_error = error_filter(["log", "message"])
    journald_error = error_filter(["MESSAGE"])

    # Order matters: it is the order of the generated JSON entries.
    apps = [
        _kube_app("bstein-dev-home", "bstein-dev-home"),
        _kube_app("pegasus", "jellyfin", ("labels.app", "pegasus")),
        _kube_app("jellyfin", "jellyfin", ("labels.app", "jellyfin")),
        _kube_app("vaultwarden", "vaultwarden"),
        _kube_app("mailu", "mailu-mailserver"),
        _kube_app("nextcloud", "nextcloud"),
        _kube_app("gitea", "gitea"),
        _kube_app("jenkins", "jenkins"),
        _kube_app("harbor", "harbor"),
        _kube_app("vault", "vault"),
        _kube_app("keycloak", "sso"),
        _kube_app("flux-system", "flux-system"),
        _kube_app("comms", "comms"),
        _kube_app("element-web", "comms", ("container_name", "element-web")),
        _kube_app("element-call", "comms", ("labels.app", "element-call")),
        _kube_app("matrix-synapse", "comms", ("container_name", "synapse")),
        _kube_app("livekit", "comms", ("labels.app", "livekit")),
        _kube_app("coturn", "comms", ("labels.app", "coturn")),
        AppSpec(
            "lesavka",
            source_query("journald-*", "_HOSTNAME = 'titan-jh'"),
            kind="journald",
        ),
    ]

    applications = [
        {
            "name": app.name,
            "description": app.description,
            "baseQuery": app.base_query,
            "servicesEntities": [],
            "traceGroups": [app.name],
        }
        for app in apps
    ]

    # Each scope yields a "<label> logs" and a "<label> errors" query: first
    # the two index-wide scopes, then one scope per application.
    scopes = [
        ("kube", source_query("kube-*"), kube_error),
        ("journald", source_query("journald-*"), journald_error),
    ]
    scopes.extend(
        (app.name, app.base_query, journald_error if app.kind == "journald" else kube_error)
        for app in apps
    )
    queries = []
    for label, base, error_clause in scopes:
        queries.append(saved_query(QuerySpec(f"{label} logs", base)))
        # The error filter is piped on as an additional ``where`` stage, even
        # when the base query already contains one.
        queries.append(saved_query(QuerySpec(f"{label} errors", f"{base} | where {error_clause}")))

    hourly = "span(`@timestamp`, 1h)"
    charts = [
        ("[Kube] Logs per hour", _count_query("kube-*", "log_count", hourly), "line"),
        ("[Kube] Errors per hour", _count_query("kube-*", "error_count", hourly, kube_error), "line"),
        (
            "[Kube] Top namespaces",
            _count_query("kube-*", "log_count", "kubernetes.namespace_name", ranked=True),
            "bar",
        ),
        (
            "[Kube] Top error namespaces",
            _count_query("kube-*", "error_count", "kubernetes.namespace_name", kube_error, ranked=True),
            "bar",
        ),
        (
            "[Kube] Top pods",
            _count_query("kube-*", "log_count", "kubernetes.pod_name", ranked=True),
            "bar",
        ),
        (
            "[Kube] Top error pods",
            _count_query("kube-*", "error_count", "kubernetes.pod_name", kube_error, ranked=True),
            "bar",
        ),
        (
            "[Kube] Top nodes",
            _count_query("kube-*", "log_count", "kubernetes.node_name", ranked=True),
            "bar",
        ),
        (
            "[Journald] Top units",
            _count_query("journald-*", "log_count", "_SYSTEMD_UNIT", ranked=True),
            "bar",
        ),
        (
            "[Journald] Top error units",
            _count_query("journald-*", "error_count", "_SYSTEMD_UNIT", journald_error, ranked=True),
            "bar",
        ),
    ]
    visualizations = [
        saved_visualization(VisualizationSpec(title, query, chart_type))
        for title, query, chart_type in charts
    ]

    return applications, queries, visualizations
def write_json(payload: list[dict], path: Path) -> None:
    """Serialize ``payload`` to ``path`` as 2-space-indented JSON.

    Missing parent directories are created, an existing file is overwritten,
    and the output ends with a single trailing newline.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    # Explicit encoding so the generated file does not depend on the locale.
    path.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8")
def render_configmap(apps_path: Path, queries_path: Path, vis_path: Path, output_path: Path) -> None:
    """Fill CONFIG_TEMPLATE with the three JSON payload files and write it.

    Args:
        apps_path: File substituted for ``{applications}``.
        queries_path: File substituted for ``{queries}``.
        vis_path: File substituted for ``{visualizations}``.
        output_path: Manifest to (over)write; must be located under ROOT.

    Raises:
        ValueError: If ``output_path`` is not located under ROOT.
    """
    rendered = CONFIG_TEMPLATE.format(
        # POSIX separators keep the generated header line identical regardless
        # of the platform the script runs on.
        relative_path=output_path.relative_to(ROOT).as_posix(),
        applications=indent_payload(apps_path),
        queries=indent_payload(queries_path),
        visualizations=indent_payload(vis_path),
    )
    # Explicit encoding so the manifest does not depend on the locale.
    output_path.write_text(rendered, encoding="utf-8")
def indent_payload(path: Path) -> str:
    """Return the text of ``path`` with every line prefixed by the indent.

    Line terminators are normalized to ``"\\n"`` and the result carries no
    trailing newline; an empty file yields an empty string.
    """
    # NOTE(review): the prefix width has to agree with the nesting of the
    # block scalars in CONFIG_TEMPLATE - confirm the two stay in sync.
    # Decode explicitly as UTF-8 so non-ASCII payload text is not mis-read
    # under a non-UTF-8 locale.
    lines = path.read_text(encoding="utf-8").splitlines()
    return "\n".join(" " + line for line in lines)
def main() -> None:
    """Command-line entry point.

    With ``--build`` the three JSON payloads are regenerated first. The
    ConfigMap is then rendered from whatever payload files are on disk.

    Raises:
        SystemExit: If any of the three payload files is missing.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--build", action="store_true", help="Regenerate JSON payloads and ConfigMap")
    options = cli.parse_args()

    # Same order as the tuple returned by build_objects().
    payload_paths = (APPS_PATH, QUERIES_PATH, VIS_PATH)
    if options.build:
        for payload, destination in zip(build_objects(), payload_paths):
            write_json(payload, destination)

    if not all(path.exists() for path in payload_paths):
        raise SystemExit("Missing observability JSON payloads. Run with --build first.")
    render_configmap(*payload_paths, CONFIG_PATH)


if __name__ == "__main__":
    main()