fix(gateway): queue pending messages instead of dropping them #8292
jooray wants to merge 1 commit into NousResearch:main
Pull request overview
Updates the gateway’s base platform adapter to queue multiple “pending” incoming messages per session (instead of overwriting a single slot), preventing message loss when users send bursts while an agent turn is in progress.
Changes:
- Change pending-message storage to a per-session FIFO queue (Dict[str, List[MessageEvent]]) and append new events instead of overwriting.
- Preserve photo-album behavior by merging consecutive PHOTO events into the last queued photo event.
- Dequeue pending events in order and update get_pending_message() to return the next queued event.
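The behaviour summarised in this overview can be sketched roughly as follows. This is a minimal illustration, not the gateway's code: the `MessageEvent` dataclass, its fields, and the `PendingQueue` class are stand-ins invented for the example, and the album rule shown (appending media onto the last queued photo) is an assumption about how merging might work.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class MessageEvent:
    """Stand-in for the gateway's event type (all fields are assumptions)."""
    text: str
    kind: str = "TEXT"  # "TEXT" or "PHOTO"
    media: List[str] = field(default_factory=list)


class PendingQueue:
    """Hypothetical per-session FIFO of pending events."""

    def __init__(self) -> None:
        self._pending: Dict[str, List[MessageEvent]] = {}

    def merge(self, session_key: str, event: MessageEvent) -> None:
        queue = self._pending.setdefault(session_key, [])
        # Album behaviour: fold a PHOTO into the last queued PHOTO.
        if queue and event.kind == "PHOTO" and queue[-1].kind == "PHOTO":
            queue[-1].media.extend(event.media)
            return
        # Everything else is appended, so earlier events are never overwritten.
        queue.append(event)

    def get(self, session_key: str) -> Optional[MessageEvent]:
        queue = self._pending.get(session_key)
        if not queue:
            return None
        event = queue.pop(0)  # oldest event first
        if not queue:
            del self._pending[session_key]
        return event


pq = PendingQueue()
pq.merge("chat-1", MessageEvent("B"))
pq.merge("chat-1", MessageEvent("C"))
pq.merge("chat-1", MessageEvent("", "PHOTO", ["p1.jpg"]))
pq.merge("chat-1", MessageEvent("", "PHOTO", ["p2.jpg"]))
order = [pq.get("chat-1") for _ in range(4)]
```

With a single slot per session, only the last event of the burst would survive; here B and C come back in arrival order, the two photos come back as one event, and the fourth call returns `None`.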
  # Track active message handlers per session for interrupt support
  # Key: session_key (e.g., chat_id), Value: (event, asyncio.Event for interrupt)
  self._active_sessions: Dict[str, asyncio.Event] = {}
- self._pending_messages: Dict[str, MessageEvent] = {}
+ self._pending_messages: Dict[str, List[MessageEvent]] = {}
  # Background message-processing tasks spawned by handle_message().
Changing _pending_messages to Dict[str, List[MessageEvent]] breaks existing call sites that treat adapter._pending_messages[session_key] as a single MessageEvent (e.g., gateway/run.py assigns adapter._pending_messages[_quick_key] = event and later reads _adapter._pending_messages.get(session_key).text). Those will now receive a list and crash at runtime; update the runner/tests to use merge_pending_message_event() / get_pending_message() (and adjust any peeking logic to read the first/last queued event).
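To illustrate the failure mode and the kind of peeking adjustment this comment asks for, here is a small sketch. The `MessageEvent` stand-in and the `peek_pending` helper are hypothetical and do not exist in the gateway; the real call sites in gateway/run.py may need a different shape.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class MessageEvent:
    """Stand-in for the gateway's event type (the field is an assumption)."""
    text: str


def peek_pending(
    pending: Dict[str, List[MessageEvent]], session_key: str, last: bool = False
) -> Optional[MessageEvent]:
    """Return the first (or last) queued event without removing it."""
    queue = pending.get(session_key)
    if not queue:
        return None
    return queue[-1] if last else queue[0]


pending: Dict[str, List[MessageEvent]] = {
    "chat-1": [MessageEvent("first"), MessageEvent("second")]
}

# Old-style access treats the stored value as a single event; with a list
# in the slot, the attribute lookup raises AttributeError.
try:
    pending.get("chat-1").text  # type: ignore[union-attr]
    old_style_ok = True
except AttributeError:
    old_style_ok = False

first = peek_pending(pending, "chat-1")
latest = peek_pending(pending, "chat-1", last=True)
```

Whether a caller should look at the first or the last queued event depends on what it is peeking for, which is why the helper takes a `last` flag instead of choosing one.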
    if session_key in self._pending_messages and self._pending_messages[session_key]:
        pending_queue = self._pending_messages[session_key]
        pending_event = pending_queue.pop(0)
        if not pending_queue:
            del self._pending_messages[session_key]
Using a list queue with pop(0) is O(n) per dequeue. Now that multiple pending messages can accumulate, this can become a hotspot under bursty traffic; consider using collections.deque (popleft) or maintaining an index to avoid repeated front-pops.
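A sketch of the `collections.deque` alternative suggested here, using plain strings in place of events. The `enqueue` and `dequeue` function names are made up for the example and are not part of the adapter.

```python
from collections import deque
from typing import Deque, Dict, Optional

# Per-session queues; strings stand in for MessageEvent objects.
pending: Dict[str, Deque[str]] = {}


def enqueue(session_key: str, event: str) -> None:
    """Append an event to the session's queue, creating it on first use."""
    pending.setdefault(session_key, deque()).append(event)


def dequeue(session_key: str) -> Optional[str]:
    """Pop the oldest event; popleft() is O(1), unlike list.pop(0)."""
    queue = pending.get(session_key)
    if not queue:
        return None
    event = queue.popleft()
    if not queue:
        # Drop the empty queue so the dict does not accumulate stale keys.
        del pending[session_key]
    return event


for item in ("B", "C", "D"):
    enqueue("chat-1", item)

drained = [dequeue("chat-1") for _ in range(4)]
```

For the handful of messages a user can realistically send during one agent turn, the difference is unlikely to be measurable, so this is mainly a question of using the idiomatic container.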
    if session_key in self._pending_messages and self._pending_messages[session_key]:
        pending_queue = self._pending_messages[session_key]
        pending_event = pending_queue.pop(0)
        if not pending_queue:
            del self._pending_messages[session_key]
        logger.debug("[%s] Processing queued message from interrupt (%d more pending)", self.name, len(pending_queue) if session_key in self._pending_messages else 0)
        # Clean up current session before processing pending
        if session_key in self._active_sessions:
            del self._active_sessions[session_key]
This block now dequeues only one event from a per-session queue; _process_message_background() then continues via self-recursion later in the method. With multi-message bursts, that recursion depth can grow with the queue length and risk hitting recursion limits. Consider refactoring the pending-drain path to an iterative loop instead of recursive await self._process_message_background(...) chaining.
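One way the iterative drain could look, as a self-contained sketch. The names `handle_one` and `process_session` are placeholders, not the adapter's methods, and the real `_process_message_background()` does considerably more (interrupt handling, session cleanup) than is modelled here.

```python
import asyncio
from collections import deque
from typing import Deque, Dict, List, Optional

pending: Dict[str, Deque[str]] = {}
handled: List[str] = []


async def handle_one(event: str) -> None:
    """Placeholder for one agent turn; a burst arrives during the first."""
    handled.append(event)
    if event == "A":
        pending.setdefault("chat-1", deque()).extend(["B", "C"])
    await asyncio.sleep(0)


async def process_session(session_key: str, first_event: str) -> None:
    """Handle the first event, then drain the queue in a loop (no recursion)."""
    event: Optional[str] = first_event
    while event is not None:
        await handle_one(event)
        queue = pending.get(session_key)
        event = queue.popleft() if queue else None
    # Remove the now-empty queue entry, if any.
    pending.pop(session_key, None)


asyncio.run(process_session("chat-1", "A"))
```

Because each turn finishes before the next event is taken from the queue, the call stack stays flat however many messages arrive during a turn.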
        if not queue:
            del self._pending_messages[session_key]
        return event
get_pending_message() previously cleared the entire pending entry for a session; now it removes only one queued event. Callers that use this method to “consume and discard” pending input (e.g., during /stop or /new cleanup in gateway/run.py) will leave additional queued messages behind, which can replay unexpectedly after a reset/stop. Either provide a dedicated clear/drain API or ensure those call sites explicitly drain the queue.
    def drain_pending_messages(self, session_key: str) -> list[MessageEvent]:
        """Remove and return all pending messages for a session."""
        queue = self._pending_messages.pop(session_key, None)
        if not queue:
            return []
        return list(queue)
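The difference between consuming one event and draining the whole queue can be seen in a short sketch. These are module-level stand-ins that use strings as events and mirror the method names from the review; they are not the adapter's actual implementation.

```python
from typing import Dict, List, Optional

# Three messages queued while a turn was in progress.
pending: Dict[str, List[str]] = {"chat-1": ["B", "C", "D"]}


def get_pending_message(session_key: str) -> Optional[str]:
    """Queue-based behaviour: remove and return only the next event."""
    queue = pending.get(session_key)
    if not queue:
        return None
    event = queue.pop(0)
    if not queue:
        del pending[session_key]
    return event


def drain_pending_messages(session_key: str) -> List[str]:
    """Remove and return every pending event for the session."""
    queue = pending.pop(session_key, None)
    return list(queue) if queue else []


# A cleanup path that calls get_pending_message() once discards only "B".
discarded_one = get_pending_message("chat-1")
left_behind = list(pending.get("chat-1", []))

# Draining clears whatever is still queued.
discarded_rest = drain_pending_messages("chat-1")
```

After the single `get_pending_message()` call, "C" and "D" are still queued and would be processed later, which is the replay-after-reset risk the comment describes.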
Change _pending_messages from Dict[str, MessageEvent] to Dict[str, List[MessageEvent]] so multiple messages arriving during an active session are queued rather than overwriting each other. Also fix two call sites in run.py that directly assigned to adapter._pending_messages[key] instead of using merge_pending_message_event(), which would produce a bare MessageEvent instead of a list after the type change.
262f4e5 to 0597af3
Closing: the list-based approach caused regressions. Will address message dropping differently per-platform instead.
Summary
When a message arrives while the agent is processing a previous one, BasePlatformAdapter stores it in _pending_messages for processing after the current turn. The problem: _pending_messages was Dict[str, MessageEvent], a single slot per session. If messages B and C both arrive while A is processing, C overwrites B and B is silently lost.

This affects all messaging platforms (Telegram, Discord, WhatsApp, Signal, SimpleX, Matrix, etc.) since the logic lives in base.py. It is most noticeable on platforms without built-in batching delays.

Changes
- Change _pending_messages from Dict[str, MessageEvent] to Dict[str, List[MessageEvent]]
- merge_pending_message_event() appends to the queue instead of overwriting (photo album merging still works: consecutive PHOTO events are merged into the last queued photo)
- get_pending_message() returns the next queued event instead of the only one
- Call sites in run.py use merge_pending_message_event() consistently

Test plan