# services/communication/atlasbot-configmap.yaml apiVersion: v1 kind: ConfigMap metadata: name: atlasbot data: bot.py: | import json, os, time, collections, re from urllib import request, parse, error BASE = os.environ.get("MATRIX_BASE", "http://othrys-synapse-matrix-synapse:8008") AUTH_BASE = os.environ.get("AUTH_BASE", "http://matrix-authentication-service:8080") USER = os.environ["BOT_USER"] PASSWORD = os.environ["BOT_PASS"] ROOM_ALIAS = "#othrys:live.bstein.dev" OLLAMA_URL = os.environ.get("OLLAMA_URL", "https://chat.ai.bstein.dev/") MODEL = os.environ.get("OLLAMA_MODEL", "qwen2.5-coder:7b-instruct-q4_0") API_KEY = os.environ.get("CHAT_API_KEY", "") BOT_MENTIONS = os.environ.get("BOT_MENTIONS", f"{USER},atlas") SERVER_NAME = os.environ.get("MATRIX_SERVER_NAME", "live.bstein.dev") MENTION_TOKENS = [m.strip() for m in BOT_MENTIONS.split(",") if m.strip()] MENTION_LOCALPARTS = [m.lstrip("@").split(":", 1)[0] for m in MENTION_TOKENS] MENTION_RE = re.compile(r"(? str: t = token.strip() if not t: return "" if t.startswith("@") and ":" in t: return t t = t.lstrip("@") if ":" in t: return f"@{t}" return f"@{t}:{SERVER_NAME}" MENTION_USER_IDS = {normalize_user_id(t).lower() for t in MENTION_TOKENS if normalize_user_id(t)} def is_mentioned(content: dict, body: str) -> bool: if MENTION_RE.search(body or "") is not None: return True mentions = content.get("m.mentions", {}) user_ids = mentions.get("user_ids", []) if not isinstance(user_ids, list): return False return any(isinstance(uid, str) and uid.lower() in MENTION_USER_IDS for uid in user_ids) def req(method: str, path: str, token: str | None = None, body=None, timeout=60, base: str | None = None): url = (base or BASE) + path data = None headers = {} if body is not None: data = json.dumps(body).encode() headers["Content-Type"] = "application/json" if token: headers["Authorization"] = f"Bearer {token}" r = request.Request(url, data=data, headers=headers, method=method) with request.urlopen(r, timeout=timeout) as resp: raw = resp.read() return json.loads(raw.decode()) if raw else {} def login() -> str: payload = { "type": "m.login.password", "identifier": {"type": "m.id.user", "user": USER}, "password": PASSWORD, } res = req("POST", "/_matrix/client/v3/login", body=payload, base=AUTH_BASE) return res["access_token"] def resolve_alias(token: str, alias: str) -> str: enc = parse.quote(alias) res = req("GET", f"/_matrix/client/v3/directory/room/{enc}", token) return res["room_id"] def join_room(token: str, room: str): req("POST", f"/_matrix/client/v3/rooms/{parse.quote(room)}/join", token, body={}) def send_msg(token: str, room: str, text: str): path = f"/_matrix/client/v3/rooms/{parse.quote(room)}/send/m.room.message" req("POST", path, token, body={"msgtype": "m.text", "body": text}) history = collections.defaultdict(list) # (room_id, sender|None) -> list of str (short transcript) def key_for(room_id: str, sender: str, is_dm: bool): return (room_id, None) if is_dm else (room_id, sender) def ollama_reply(hist_key, prompt: str) -> str: try: # Keep short context as plain text transcript transcript = "\n".join( ["System: You are Atlas, the Titan lab assistant for Othrys. Be helpful, direct, and concise."] + history[hist_key][-24:] + [f"User: {prompt}"] ) payload = {"model": MODEL, "message": transcript} headers = {"Content-Type": "application/json"} if API_KEY: headers["x-api-key"] = API_KEY r = request.Request(OLLAMA_URL, data=json.dumps(payload).encode(), headers=headers) with request.urlopen(r, timeout=15) as resp: data = json.loads(resp.read().decode()) reply = data.get("message") or data.get("response") or data.get("reply") or "I'm here to help." history[hist_key].append(f"Atlas: {reply}") return reply except Exception: return "Hi! I'm Atlas." def sync_loop(token: str, room_id: str): since = None try: res = req("GET", "/_matrix/client/v3/sync?timeout=0", token, timeout=10) since = res.get("next_batch") except Exception: pass while True: params = {"timeout": 30000} if since: params["since"] = since query = parse.urlencode(params) try: res = req("GET", f"/_matrix/client/v3/sync?{query}", token, timeout=35) except Exception: time.sleep(5) continue since = res.get("next_batch", since) # invites for rid, data in res.get("rooms", {}).get("invite", {}).items(): try: join_room(token, rid) except Exception: pass # messages for rid, data in res.get("rooms", {}).get("join", {}).items(): timeline = data.get("timeline", {}).get("events", []) for ev in timeline: if ev.get("type") != "m.room.message": continue content = ev.get("content", {}) body = content.get("body", "") if not body.strip(): continue sender = ev.get("sender", "") if sender == f"@{USER}:live.bstein.dev": continue # Only respond if bot is mentioned or in a DM joined_count = data.get("summary", {}).get("m.joined_member_count") is_dm = joined_count is not None and joined_count <= 2 mentioned = is_mentioned(content, body) hist_key = key_for(rid, sender, is_dm) history[hist_key].append(f"{sender}: {body}") history[hist_key] = history[hist_key][-80:] if is_dm or mentioned: reply = ollama_reply(hist_key, body) send_msg(token, rid, reply) def main(): token = login() try: room_id = resolve_alias(token, ROOM_ALIAS) join_room(token, room_id) except Exception: room_id = None sync_loop(token, room_id) if __name__ == "__main__": main()