titan-iac/services/communication/atlasbot-configmap.yaml

168 lines
6.9 KiB
YAML

# services/communication/atlasbot-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: atlasbot
data:
bot.py: |
import json, os, time, collections, re
from urllib import request, parse, error
BASE = os.environ.get("MATRIX_BASE", "http://othrys-synapse-matrix-synapse:8008")
AUTH_BASE = os.environ.get("AUTH_BASE", "http://matrix-authentication-service:8080")
USER = os.environ["BOT_USER"]
PASSWORD = os.environ["BOT_PASS"]
ROOM_ALIAS = "#othrys:live.bstein.dev"
OLLAMA_URL = os.environ.get("OLLAMA_URL", "https://chat.ai.bstein.dev/")
MODEL = os.environ.get("OLLAMA_MODEL", "qwen2.5-coder:7b-instruct-q4_0")
API_KEY = os.environ.get("CHAT_API_KEY", "")
BOT_MENTIONS = os.environ.get("BOT_MENTIONS", f"{USER},atlas")
SERVER_NAME = os.environ.get("MATRIX_SERVER_NAME", "live.bstein.dev")
MENTION_TOKENS = [m.strip() for m in BOT_MENTIONS.split(",") if m.strip()]
MENTION_LOCALPARTS = [m.lstrip("@").split(":", 1)[0] for m in MENTION_TOKENS]
MENTION_RE = re.compile(r"(?<!\\w)@(?:" + "|".join(re.escape(m) for m in MENTION_LOCALPARTS) + r")(?:\\:[^\\s]+)?(?!\\w)", re.IGNORECASE)
def normalize_user_id(token: str) -> str:
t = token.strip()
if not t:
return ""
if t.startswith("@") and ":" in t:
return t
t = t.lstrip("@")
if ":" in t:
return f"@{t}"
return f"@{t}:{SERVER_NAME}"
MENTION_USER_IDS = {normalize_user_id(t).lower() for t in MENTION_TOKENS if normalize_user_id(t)}
def is_mentioned(content: dict, body: str) -> bool:
if MENTION_RE.search(body or "") is not None:
return True
mentions = content.get("m.mentions", {})
user_ids = mentions.get("user_ids", [])
if not isinstance(user_ids, list):
return False
return any(isinstance(uid, str) and uid.lower() in MENTION_USER_IDS for uid in user_ids)
def req(method: str, path: str, token: str | None = None, body=None, timeout=60, base: str | None = None):
url = (base or BASE) + path
data = None
headers = {}
if body is not None:
data = json.dumps(body).encode()
headers["Content-Type"] = "application/json"
if token:
headers["Authorization"] = f"Bearer {token}"
r = request.Request(url, data=data, headers=headers, method=method)
with request.urlopen(r, timeout=timeout) as resp:
raw = resp.read()
return json.loads(raw.decode()) if raw else {}
def login() -> str:
payload = {
"type": "m.login.password",
"identifier": {"type": "m.id.user", "user": USER},
"password": PASSWORD,
}
res = req("POST", "/_matrix/client/v3/login", body=payload, base=AUTH_BASE)
return res["access_token"]
def resolve_alias(token: str, alias: str) -> str:
enc = parse.quote(alias)
res = req("GET", f"/_matrix/client/v3/directory/room/{enc}", token)
return res["room_id"]
def join_room(token: str, room: str):
req("POST", f"/_matrix/client/v3/rooms/{parse.quote(room)}/join", token, body={})
def send_msg(token: str, room: str, text: str):
path = f"/_matrix/client/v3/rooms/{parse.quote(room)}/send/m.room.message"
req("POST", path, token, body={"msgtype": "m.text", "body": text})
history = collections.defaultdict(list) # (room_id, sender|None) -> list of str (short transcript)
def key_for(room_id: str, sender: str, is_dm: bool):
return (room_id, None) if is_dm else (room_id, sender)
def ollama_reply(hist_key, prompt: str) -> str:
try:
# Keep short context as plain text transcript
transcript = "\n".join(
["System: You are Atlas, the Titan lab assistant for Othrys. Be helpful, direct, and concise."]
+ history[hist_key][-24:]
+ [f"User: {prompt}"]
)
payload = {"model": MODEL, "message": transcript}
headers = {"Content-Type": "application/json"}
if API_KEY:
headers["x-api-key"] = API_KEY
r = request.Request(OLLAMA_URL, data=json.dumps(payload).encode(), headers=headers)
with request.urlopen(r, timeout=15) as resp:
data = json.loads(resp.read().decode())
reply = data.get("message") or data.get("response") or data.get("reply") or "I'm here to help."
history[hist_key].append(f"Atlas: {reply}")
return reply
except Exception:
return "Hi! I'm Atlas."
def sync_loop(token: str, room_id: str):
since = None
try:
res = req("GET", "/_matrix/client/v3/sync?timeout=0", token, timeout=10)
since = res.get("next_batch")
except Exception:
pass
while True:
params = {"timeout": 30000}
if since:
params["since"] = since
query = parse.urlencode(params)
try:
res = req("GET", f"/_matrix/client/v3/sync?{query}", token, timeout=35)
except Exception:
time.sleep(5)
continue
since = res.get("next_batch", since)
# invites
for rid, data in res.get("rooms", {}).get("invite", {}).items():
try:
join_room(token, rid)
except Exception:
pass
# messages
for rid, data in res.get("rooms", {}).get("join", {}).items():
timeline = data.get("timeline", {}).get("events", [])
for ev in timeline:
if ev.get("type") != "m.room.message":
continue
content = ev.get("content", {})
body = content.get("body", "")
if not body.strip():
continue
sender = ev.get("sender", "")
if sender == f"@{USER}:live.bstein.dev":
continue
# Only respond if bot is mentioned or in a DM
joined_count = data.get("summary", {}).get("m.joined_member_count")
is_dm = joined_count is not None and joined_count <= 2
mentioned = is_mentioned(content, body)
hist_key = key_for(rid, sender, is_dm)
history[hist_key].append(f"{sender}: {body}")
history[hist_key] = history[hist_key][-80:]
if is_dm or mentioned:
reply = ollama_reply(hist_key, body)
send_msg(token, rid, reply)
def main():
token = login()
try:
room_id = resolve_alias(token, ROOM_ALIAS)
join_room(token, room_id)
except Exception:
room_id = None
sync_loop(token, room_id)
if __name__ == "__main__":
main()