From fcca4107db77cc76dc4cf2739496703e7cfb4810 Mon Sep 17 00:00:00 2001 From: Brad Stein Date: Wed, 28 Jan 2026 12:34:26 -0300 Subject: [PATCH] fix: wrap JetStream payloads with reply info --- atlasbot/queue/nats.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/atlasbot/queue/nats.py b/atlasbot/queue/nats.py index 51fe53f..a274659 100644 --- a/atlasbot/queue/nats.py +++ b/atlasbot/queue/nats.py @@ -41,7 +41,8 @@ class QueueManager: raise RuntimeError("queue not initialized") reply = self._nc.new_inbox() sub = await self._nc.subscribe(reply) - await self._js.publish(self._settings.nats_subject, json.dumps(payload).encode(), reply=reply) + envelope = {"reply": reply, "payload": payload} + await self._js.publish(self._settings.nats_subject, json.dumps(envelope).encode()) msg = await sub.next_msg(timeout=300) await sub.unsubscribe() return json.loads(msg.data.decode()) @@ -73,11 +74,12 @@ class QueueManager: async def _handle_message(self, msg) -> None: try: - payload = json.loads(msg.data.decode()) + envelope = json.loads(msg.data.decode()) except Exception: await msg.ack() return - reply = msg.reply + payload = envelope.get("payload", envelope) + reply = envelope.get("reply") or msg.reply try: result = await self._handler(payload) if reply and self._nc: