atlasbot/atlasbot/main.py

105 lines
3.8 KiB
Python

import asyncio
import logging
import uvicorn
from atlasbot.api.http import Api
from atlasbot.config import load_settings
from atlasbot.engine.answerer import AnswerEngine, AnswerResult, AnswerScores
from atlasbot.knowledge.loader import KnowledgeBase
from atlasbot.llm.client import LLMClient
from atlasbot.logging import configure_logging
from atlasbot.matrix.bot import MatrixBot
from atlasbot.queue.nats import QueueManager
from atlasbot.snapshot.builder import SnapshotProvider
log = logging.getLogger(__name__)
def _build_engine(settings) -> AnswerEngine:
kb = KnowledgeBase(settings.kb_dir)
snapshot = SnapshotProvider(settings)
llm = LLMClient(settings)
return AnswerEngine(settings, llm, kb, snapshot)
async def main() -> None:
settings = load_settings()
configure_logging("INFO")
engine = _build_engine(settings)
async def handler(payload: dict[str, object]) -> dict[str, object]:
history = payload.get("history") if isinstance(payload, dict) else None
conversation_id = payload.get("conversation_id") if isinstance(payload, dict) else None
snapshot_pin = payload.get("snapshot_pin") if isinstance(payload, dict) else None
result = await engine.answer(
str(payload.get("question", "") or ""),
mode=str(payload.get("mode", "quick") or "quick"),
history=history if isinstance(history, list) else None,
conversation_id=str(conversation_id) if isinstance(conversation_id, str) else None,
snapshot_pin=bool(snapshot_pin) if isinstance(snapshot_pin, bool) else None,
)
return {"reply": result.reply, "scores": result.scores.__dict__}
queue = QueueManager(settings, handler)
await queue.start()
async def answer_handler( # noqa: PLR0913
question: str,
mode: str,
history=None,
conversation_id=None,
snapshot_pin: bool | None = None,
observer=None,
) -> AnswerResult:
if settings.queue_enabled:
payload = await queue.submit(
{
"question": question,
"mode": mode,
"history": history or [],
"conversation_id": conversation_id,
"snapshot_pin": snapshot_pin,
}
)
reply = payload.get("reply", "") if isinstance(payload, dict) else ""
return AnswerResult(reply=reply or "", scores=result_scores(payload), meta={"mode": mode})
return await engine.answer(
question,
mode=mode,
history=history,
observer=observer,
conversation_id=conversation_id,
snapshot_pin=snapshot_pin,
)
api = Api(settings, answer_handler)
server = uvicorn.Server(uvicorn.Config(api.app, host="0.0.0.0", port=settings.http_port, log_level="info"))
tasks = []
for bot in settings.matrix_bots:
tasks.append(asyncio.create_task(MatrixBot(settings, bot, engine, answer_handler).run()))
tasks.append(asyncio.create_task(server.serve()))
await asyncio.gather(*tasks)
def result_scores(payload: dict[str, object]) -> AnswerScores:
scores = payload.get("scores") if isinstance(payload, dict) else None
if isinstance(scores, dict):
try:
return AnswerScores(
confidence=int(scores.get("confidence", 60)),
relevance=int(scores.get("relevance", 60)),
satisfaction=int(scores.get("satisfaction", 60)),
hallucination_risk=str(scores.get("hallucination_risk", "medium")),
)
except Exception:
pass
return AnswerScores(confidence=60, relevance=60, satisfaction=60, hallucination_risk="medium")
if __name__ == "__main__":
asyncio.run(main())