diff --git a/services/logging/kustomization.yaml b/services/logging/kustomization.yaml
index dc487155..5a4c8334 100644
--- a/services/logging/kustomization.yaml
+++ b/services/logging/kustomization.yaml
@@ -18,6 +18,7 @@ resources:
   - oneoffs/opensearch-ism-job.yaml
   - oneoffs/opensearch-dashboards-setup-job.yaml
   - oneoffs/opensearch-observability-setup-job.yaml
+  - opensearch-single-node-tune-cronjob.yaml
   - opensearch-prune-cronjob.yaml
   - fluent-bit-helmrelease.yaml
   - node-log-rotation-daemonset.yaml
@@ -52,6 +53,12 @@ configMapGenerator:
       - prune.py=scripts/opensearch_prune.py
     options:
       disableNameSuffixHash: true
+  - name: opensearch-single-node-tune-script
+    namespace: logging
+    files:
+      - tune.py=scripts/opensearch_single_node_tune.py
+    options:
+      disableNameSuffixHash: true
   - name: opensearch-observability-script
     namespace: logging
     files:
diff --git a/services/logging/opensearch-single-node-tune-cronjob.yaml b/services/logging/opensearch-single-node-tune-cronjob.yaml
new file mode 100644
index 00000000..fe9ba712
--- /dev/null
+++ b/services/logging/opensearch-single-node-tune-cronjob.yaml
@@ -0,0 +1,50 @@
+# services/logging/opensearch-single-node-tune-cronjob.yaml
+apiVersion: batch/v1
+kind: CronJob
+metadata:
+  name: opensearch-single-node-tune
+  namespace: logging
+spec:
+  schedule: "*/30 * * * *"
+  concurrencyPolicy: Forbid
+  successfulJobsHistoryLimit: 1
+  failedJobsHistoryLimit: 3
+  jobTemplate:
+    spec:
+      backoffLimit: 2
+      template:
+        spec:
+          restartPolicy: OnFailure
+          nodeSelector:
+            node-role.kubernetes.io/worker: "true"
+            hardware: rpi5
+          affinity:
+            nodeAffinity:
+              requiredDuringSchedulingIgnoredDuringExecution:
+                nodeSelectorTerms:
+                  - matchExpressions:
+                      - key: hardware
+                        operator: In
+                        values:
+                          - rpi5
+          containers:
+            - name: tune
+              image: python:3.11-alpine
+              command: ["python", "/scripts/tune.py"]
+              env:
+                - name: OPENSEARCH_URL
+                  value: http://opensearch-master.logging.svc.cluster.local:9200
+              resources:
+                requests:
+                  cpu: 10m
+                  memory: 32Mi
+                limits:
+                  cpu: 200m
+                  memory: 128Mi
+              volumeMounts:
+                - name: scripts
+                  mountPath: /scripts
+          volumes:
+            - name: scripts
+              configMap:
+                name: opensearch-single-node-tune-script
diff --git a/services/logging/scripts/opensearch_single_node_tune.py b/services/logging/scripts/opensearch_single_node_tune.py
new file mode 100644
--- /dev/null
+++ b/services/logging/scripts/opensearch_single_node_tune.py
@@ -0,0 +1,157 @@
+"""Tune a single-node OpenSearch cluster; intended to be re-run periodically.
+
+Applies ISM retention policies and index templates, then sets replicas to 0 on
+existing indices. Mounted as tune.py by the opensearch-single-node-tune CronJob.
+"""
+
+import json
+import os
+import time
+import urllib.error
+import urllib.request
+
+OS_URL = os.environ.get("OPENSEARCH_URL", "http://opensearch-master.logging.svc.cluster.local:9200").rstrip("/")
+
+
+def request_json(path, method="GET", payload=None):
+    """Send one request to OpenSearch and return the decoded JSON response.
+
+    An empty response body yields an empty dict. Error statuses are raised by
+    urlopen as urllib.error.HTTPError.
+    """
+    data = None
+    headers = {}
+    if payload is not None:
+        data = json.dumps(payload).encode("utf-8")
+        headers["Content-Type"] = "application/json"
+    request = urllib.request.Request(f"{OS_URL}{path}", data=data, headers=headers, method=method)
+    with urllib.request.urlopen(request, timeout=30) as response:
+        body = response.read().decode("utf-8")
+    return json.loads(body) if body else {}
+
+
+def wait_for_opensearch():
+    """Poll the root endpoint every 5 seconds, for at most 60 attempts.
+
+    Raises RuntimeError if OpenSearch never answers successfully.
+    """
+    for _ in range(60):
+        try:
+            request_json("/")
+            return
+        except OSError:
+            # URLError/HTTPError are OSError subclasses; catching OSError also
+            # retries resets and timeouts raised while reading the response.
+            time.sleep(5)
+    raise RuntimeError("OpenSearch did not become reachable")
+
+
+def put(path, payload):
+    """PUT a JSON payload and raise RuntimeError if it is not acknowledged."""
+    response = request_json(path, method="PUT", payload=payload)
+    # Responses without an "acknowledged" field are treated as success.
+    if not response.get("acknowledged", True):
+        raise RuntimeError(f"OpenSearch did not acknowledge {path}: {response}")
+
+
+def get_policy_version(policy_id):
+    """Return (seq_no, primary_term) of an existing ISM policy, or None on 404."""
+    try:
+        existing = request_json(f"/_plugins/_ism/policies/{policy_id}")
+    except urllib.error.HTTPError as exc:
+        if exc.code == 404:
+            return None
+        raise
+    return existing["_seq_no"], existing["_primary_term"]
+
+
+def ensure_policy(policy_id, description, min_index_age):
+    """Create or update an ISM policy that deletes indices older than min_index_age."""
+    payload = {
+        "policy": {
+            "description": description,
+            "schema_version": 1,
+            "default_state": "hot",
+            "states": [
+                {
+                    "name": "hot",
+                    "actions": [],
+                    "transitions": [
+                        {
+                            "state_name": "delete",
+                            "conditions": {"min_index_age": min_index_age},
+                        }
+                    ],
+                },
+                {
+                    "name": "delete",
+                    "actions": [{"delete": {}}],
+                    "transitions": [],
+                },
+            ],
+        }
+    }
+    path = f"/_plugins/_ism/policies/{policy_id}"
+    # ISM treats a PUT without if_seq_no/if_primary_term as create-only and
+    # answers 409 once the policy exists, which would fail every later run of
+    # this job. Pass the current version when the policy is already there.
+    version = get_policy_version(policy_id)
+    if version is not None:
+        seq_no, primary_term = version
+        path = f"{path}?if_seq_no={seq_no}&if_primary_term={primary_term}"
+    put(path, payload)
+
+
+def ensure_template(name, patterns, priority, policy_id=None):
+    """Create or update an index template with one shard and no replicas.
+
+    When policy_id is given, the template also sets the ISM policy_id index
+    setting for matching indices.
+    """
+    settings = {
+        "index": {
+            "number_of_shards": 1,
+            "number_of_replicas": 0,
+            "refresh_interval": "30s",
+        }
+    }
+    if policy_id:
+        settings["index"]["plugins"] = {
+            "index_state_management": {
+                "policy_id": policy_id,
+            }
+        }
+    payload = {
+        "index_patterns": patterns,
+        "priority": priority,
+        "template": {
+            "settings": settings,
+        },
+    }
+    put(f"/_index_template/{name}", payload)
+
+
+def ensure_single_node_replicas():
+    """Set number_of_replicas to 0 on every existing index."""
+    # A one-node OpenSearch cluster cannot allocate replicas; leaving them at 1
+    # makes the cluster permanently yellow and keeps Data Prepper waiting.
+    put("/*/_settings?expand_wildcards=all", {"index": {"number_of_replicas": 0}})
+
+
+def main():
+    """Wait for OpenSearch, then apply policies, templates, and replica settings."""
+    wait_for_opensearch()
+    ensure_policy("logging-180d", "Delete logs after 180 days", "180d")
+    ensure_policy("trace-analytics-30d", "Delete trace analytics after 30 days", "30d")
+    ensure_template("kube-logs", ["kube-*"], 200, "logging-180d")
+    ensure_template("journald-logs", ["journald-*"], 200, "logging-180d")
+    ensure_template("trace-analytics", ["trace-analytics-*"], 200, "trace-analytics-30d")
+    ensure_template("otel-v1-apm-span-index-template", ["otel-v1-apm-span-*"], 250, "trace-analytics-30d")
+    ensure_template("otel-v1-apm-service-map-index-template", ["otel-v1-apm-service-map"], 250, "trace-analytics-30d")
+    ensure_template("opendistro-ism-history", [".opendistro-ism-*"], 250)
+    ensure_single_node_replicas()
+    print("opensearch_single_node_tune_ok")
+
+
+if __name__ == "__main__":
+    main()