titan-iac/services/communication/atlasbot-configmap.yaml

133 lines
5.3 KiB
YAML

# services/communication/atlasbot-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: atlasbot
data:
bot.py: |
import json, os, time, collections
from urllib import request, parse, error
BASE = os.environ.get("MATRIX_BASE", "http://othrys-synapse-matrix-synapse:8008")
AUTH_BASE = os.environ.get("AUTH_BASE", "http://matrix-authentication-service:8080")
USER = os.environ["BOT_USER"]
PASSWORD = os.environ["BOT_PASS"]
ROOM_ALIAS = "#othrys:live.bstein.dev"
OLLAMA_URL = os.environ.get("OLLAMA_URL", "https://chat.ai.bstein.dev/")
MODEL = os.environ.get("OLLAMA_MODEL", "qwen2.5-coder:7b-instruct-q4_0")
API_KEY = os.environ.get("CHAT_API_KEY", "")
def req(method: str, path: str, token: str | None = None, body=None, timeout=60, base: str | None = None):
url = (base or BASE) + path
data = None
headers = {}
if body is not None:
data = json.dumps(body).encode()
headers["Content-Type"] = "application/json"
if token:
headers["Authorization"] = f"Bearer {token}"
r = request.Request(url, data=data, headers=headers, method=method)
with request.urlopen(r, timeout=timeout) as resp:
raw = resp.read()
return json.loads(raw.decode()) if raw else {}
def login() -> str:
payload = {
"type": "m.login.password",
"identifier": {"type": "m.id.user", "user": USER},
"password": PASSWORD,
}
res = req("POST", "/_matrix/client/v3/login", body=payload, base=AUTH_BASE)
return res["access_token"]
def resolve_alias(token: str, alias: str) -> str:
enc = parse.quote(alias)
res = req("GET", f"/_matrix/client/v3/directory/room/{enc}", token)
return res["room_id"]
def join_room(token: str, room: str):
req("POST", f"/_matrix/client/v3/rooms/{parse.quote(room)}/join", token, body={})
def send_msg(token: str, room: str, text: str):
path = f"/_matrix/client/v3/rooms/{parse.quote(room)}/send/m.room.message"
req("POST", path, token, body={"msgtype": "m.text", "body": text})
history = collections.defaultdict(list) # room_id -> list of str (short transcript)
def ollama_reply(room_id: str, prompt: str) -> str:
try:
# Keep short context as plain text transcript
transcript = "\n".join(history[room_id][-12:] + [f"User: {prompt}"])
payload = {"model": MODEL, "message": transcript}
headers = {"Content-Type": "application/json"}
if API_KEY:
headers["x-api-key"] = API_KEY
r = request.Request(OLLAMA_URL, data=json.dumps(payload).encode(), headers=headers)
with request.urlopen(r, timeout=15) as resp:
data = json.loads(resp.read().decode())
reply = data.get("message") or data.get("response") or data.get("reply") or "I'm here to help."
history[room_id].append(f"Atlas: {reply}")
return reply
except Exception:
return "Hi! I'm Atlas."
def sync_loop(token: str, room_id: str):
since = None
try:
res = req("GET", "/_matrix/client/v3/sync?timeout=0", token, timeout=10)
since = res.get("next_batch")
except Exception:
pass
while True:
params = {"timeout": 30000}
if since:
params["since"] = since
query = parse.urlencode(params)
try:
res = req("GET", f"/_matrix/client/v3/sync?{query}", token, timeout=35)
except Exception:
time.sleep(5)
continue
since = res.get("next_batch", since)
# invites
for rid, data in res.get("rooms", {}).get("invite", {}).items():
try:
join_room(token, rid)
except Exception:
pass
# messages
for rid, data in res.get("rooms", {}).get("join", {}).items():
timeline = data.get("timeline", {}).get("events", [])
for ev in timeline:
if ev.get("type") != "m.room.message":
continue
content = ev.get("content", {})
body = content.get("body", "")
if not body.strip():
continue
sender = ev.get("sender", "")
if sender == f"@{USER}:live.bstein.dev":
continue
# Only respond if bot is mentioned or in a DM
joined_count = data.get("summary", {}).get("m.joined_member_count")
is_dm = joined_count is not None and joined_count <= 2
mentioned = f"@{USER}" in body
history[rid].append(f"{sender}: {body}")
if is_dm or mentioned:
reply = ollama_reply(rid, body)
send_msg(token, rid, reply)
def main():
token = login()
try:
room_id = resolve_alias(token, ROOM_ALIAS)
join_room(token, room_id)
except Exception:
room_id = None
sync_loop(token, room_id)
if __name__ == "__main__":
main()