import json import os import time import urllib.error import urllib.request OS_URL = os.environ.get("OPENSEARCH_URL", "http://opensearch-master.logging.svc.cluster.local:9200").rstrip("/") def request_json(path, method="GET", payload=None): data = None headers = {} if payload is not None: data = json.dumps(payload).encode("utf-8") headers["Content-Type"] = "application/json" request = urllib.request.Request(f"{OS_URL}{path}", data=data, headers=headers, method=method) with urllib.request.urlopen(request, timeout=30) as response: body = response.read().decode("utf-8") return json.loads(body) if body else {} def wait_for_opensearch(): for _ in range(60): try: request_json("/") return except urllib.error.URLError: time.sleep(5) raise RuntimeError("OpenSearch did not become reachable") def put(path, payload): response = request_json(path, method="PUT", payload=payload) if not response.get("acknowledged", True): raise RuntimeError(f"OpenSearch did not acknowledge {path}: {response}") def ensure_policy(policy_id, description, min_index_age): payload = { "policy": { "description": description, "schema_version": 1, "default_state": "hot", "states": [ { "name": "hot", "actions": [], "transitions": [ { "state_name": "delete", "conditions": {"min_index_age": min_index_age}, } ], }, { "name": "delete", "actions": [{"delete": {}}], "transitions": [], }, ], } } try: put(f"/_plugins/_ism/policies/{policy_id}", payload) except urllib.error.HTTPError as error: if error.code == 409: return raise def ensure_template(name, patterns, priority, policy_id=None): settings = { "index": { "number_of_shards": 1, "number_of_replicas": 0, "refresh_interval": "30s", } } if policy_id: settings["index"]["plugins"] = { "index_state_management": { "policy_id": policy_id, } } payload = { "index_patterns": patterns, "priority": priority, "template": { "settings": settings, }, } put(f"/_index_template/{name}", payload) def ensure_single_node_replicas(): # A one-node OpenSearch cluster cannot allocate replicas; leaving them at 1 # makes the cluster permanently yellow and keeps Data Prepper waiting. put("/*/_settings?expand_wildcards=all", {"index": {"number_of_replicas": 0}}) def main(): wait_for_opensearch() ensure_policy("logging-180d", "Delete logs after 180 days", "180d") ensure_policy("trace-analytics-30d", "Delete trace analytics after 30 days", "30d") ensure_template("kube-logs", ["kube-*"], 200, "logging-180d") ensure_template("journald-logs", ["journald-*"], 200, "logging-180d") ensure_template("trace-analytics", ["trace-analytics-*"], 200, "trace-analytics-30d") ensure_template("otel-v1-apm-span-index-template", ["otel-v1-apm-span-*"], 250, "trace-analytics-30d") ensure_template("otel-v1-apm-service-map-index-template", ["otel-v1-apm-service-map"], 250, "trace-analytics-30d") ensure_template("opendistro-ism-history", [".opendistro-ism-*"], 250) ensure_single_node_replicas() print("opensearch_single_node_tune_ok") if __name__ == "__main__": main()