
fix(gateway): queue pending messages instead of dropping them#8292

Closed
jooray wants to merge 1 commit into NousResearch:main from jooray:fix/pending-message-queue

Conversation


@jooray jooray commented Apr 12, 2026

Summary

When a message arrives while the agent is processing a previous one, BasePlatformAdapter stores it in _pending_messages for processing after the current turn. The problem: _pending_messages was Dict[str, MessageEvent] — a single slot per session. If messages B and C both arrive while A is processing, C overwrites B and B is silently lost.

This affects all messaging platforms (Telegram, Discord, WhatsApp, Signal, SimpleX, Matrix, etc.) since the logic lives in base.py. It is most noticeable on platforms without built-in batching delays.
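The overwrite can be reproduced in miniature (session key and event values below are illustrative, not the real gateway types):

```python
# Sketch of the single-slot bug: with a one-event-per-session dict, a
# second pending message silently replaces the first. A list-valued dict
# keeps both in FIFO order.
single_slot: dict[str, str] = {}
queued: dict[str, list[str]] = {}

for msg in ["B", "C"]:  # both arrive while "A" is still processing
    single_slot["session-1"] = msg                   # overwrites: "B" is lost
    queued.setdefault("session-1", []).append(msg)   # appends: both kept
```

After the loop, `single_slot` holds only `"C"` while `queued` holds `["B", "C"]`.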

Changes

  • Change _pending_messages from Dict[str, MessageEvent] to Dict[str, List[MessageEvent]]
  • merge_pending_message_event() appends to the queue instead of overwriting (photo album merging still works — consecutive PHOTO events are merged into the last queued photo)
  • Pending consumption pops from the front of the queue and continues processing remaining items
  • get_pending_message() returns the next queued event instead of the only one
  • Interrupt queueing now uses merge_pending_message_event() consistently
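The queue-append with album merging described above could look roughly like this. `MessageEvent` and `merge_pending_message_event` here are simplified stand-ins for the real gateway types, not the actual implementation:

```python
from dataclasses import dataclass, field

@dataclass
class MessageEvent:
    kind: str                                # e.g. "TEXT" or "PHOTO"
    media: list[str] = field(default_factory=list)
    text: str = ""

def merge_pending_message_event(pending: dict[str, list[MessageEvent]],
                                key: str, event: MessageEvent) -> None:
    """Append to the per-session queue; fold consecutive PHOTOs into one album."""
    queue = pending.setdefault(key, [])
    if queue and event.kind == "PHOTO" and queue[-1].kind == "PHOTO":
        queue[-1].media.extend(event.media)  # merge into the last queued photo
    else:
        queue.append(event)

pending: dict[str, list[MessageEvent]] = {}
merge_pending_message_event(pending, "chat-1", MessageEvent("TEXT", text="hi"))
merge_pending_message_event(pending, "chat-1", MessageEvent("PHOTO", media=["p1"]))
merge_pending_message_event(pending, "chat-1", MessageEvent("PHOTO", media=["p2"]))
```

The two photos collapse into one queued album event, so the queue holds a text event and a two-photo event rather than three entries.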

Test plan

  • Send 3+ messages quickly while the agent is processing — all should get responses or be included in the next turn
  • Send multiple photos in quick succession — should still be grouped as an album
  • Verify interrupt behavior still works (new message interrupts current processing)
  • Test across platforms (Telegram, Discord, SimpleX)

Copilot AI review requested due to automatic review settings April 12, 2026 09:35

Copilot AI left a comment


Pull request overview

Updates the gateway’s base platform adapter to queue multiple “pending” incoming messages per session (instead of overwriting a single slot), preventing message loss when users send bursts while an agent turn is in progress.

Changes:

  • Change pending-message storage to a per-session FIFO queue (Dict[str, List[MessageEvent]]) and append new events instead of overwriting.
  • Preserve photo-album behavior by merging consecutive PHOTO events into the last queued photo event.
  • Dequeue pending events in order and update get_pending_message() to return the next queued event.


Comment thread gateway/platforms/base.py
Comment on lines 749 to 753
    # Track active message handlers per session for interrupt support
    # Key: session_key (e.g., chat_id), Value: (event, asyncio.Event for interrupt)
    self._active_sessions: Dict[str, asyncio.Event] = {}
-   self._pending_messages: Dict[str, MessageEvent] = {}
+   self._pending_messages: Dict[str, List[MessageEvent]] = {}
    # Background message-processing tasks spawned by handle_message().

Copilot AI Apr 12, 2026


Changing _pending_messages to Dict[str, List[MessageEvent]] breaks existing call sites that treat adapter._pending_messages[session_key] as a single MessageEvent (e.g., gateway/run.py assigns adapter._pending_messages[_quick_key] = event and later reads _adapter._pending_messages.get(session_key).text). Those will now receive a list and crash at runtime; update the runner/tests to use merge_pending_message_event() / get_pending_message() (and adjust any peeking logic to read the first/last queued event).

Comment thread gateway/platforms/base.py
Comment on lines +1755 to +1759
        if session_key in self._pending_messages and self._pending_messages[session_key]:
            pending_queue = self._pending_messages[session_key]
            pending_event = pending_queue.pop(0)
            if not pending_queue:
                del self._pending_messages[session_key]

Copilot AI Apr 12, 2026


Using a list queue with pop(0) is O(n) per dequeue. Now that multiple pending messages can accumulate, this can become a hotspot under bursty traffic; consider using collections.deque (popleft) or maintaining an index to avoid repeated front-pops.

Comment thread gateway/platforms/base.py
Comment on lines +1755 to 1763
        if session_key in self._pending_messages and self._pending_messages[session_key]:
            pending_queue = self._pending_messages[session_key]
            pending_event = pending_queue.pop(0)
            if not pending_queue:
                del self._pending_messages[session_key]
            logger.debug(
                "[%s] Processing queued message from interrupt (%d more pending)",
                self.name,
                len(pending_queue) if session_key in self._pending_messages else 0,
            )
            # Clean up current session before processing pending
            if session_key in self._active_sessions:
                del self._active_sessions[session_key]

Copilot AI Apr 12, 2026


This block now dequeues only one event from a per-session queue; _process_message_background() then continues via self-recursion later in the method. With multi-message bursts, that recursion depth can grow with the queue length and risk hitting recursion limits. Consider refactoring the pending-drain path to an iterative loop instead of recursive await self._process_message_background(...) chaining.
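The iterative drain the reviewer suggests could be sketched as follows. The function names and string events are illustrative stand-ins, not the real `_process_message_background` signature:

```python
import asyncio
from typing import Awaitable, Callable

async def drain_session(pending: dict[str, list[str]], key: str,
                        handle: Callable[[str], Awaitable[None]]) -> None:
    # One loop iteration per queued event: depth stays constant no matter
    # how long the burst is, unlike recursive self-chaining per event.
    while pending.get(key):
        event = pending[key].pop(0)
        if not pending[key]:
            del pending[key]
        await handle(event)

async def main() -> list[str]:
    seen: list[str] = []

    async def handle(event: str) -> None:
        seen.append(event)

    await drain_session({"s": ["a", "b", "c"]}, "s", handle)
    return seen
```

Running `asyncio.run(main())` processes the three queued events in order without growing the await chain.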

Comment thread gateway/platforms/base.py
        if not queue:
            del self._pending_messages[session_key]
        return event


Copilot AI Apr 12, 2026


get_pending_message() previously cleared the entire pending entry for a session; now it removes only one queued event. Callers that use this method to “consume and discard” pending input (e.g., during /stop or /new cleanup in gateway/run.py) will leave additional queued messages behind, which can replay unexpectedly after a reset/stop. Either provide a dedicated clear/drain API or ensure those call sites explicitly drain the queue.

Suggested change
    def drain_pending_messages(self, session_key: str) -> list[MessageEvent]:
        """Remove and return all pending messages for a session."""
        queue = self._pending_messages.pop(session_key, None)
        if not queue:
            return []
        return list(queue)

Change _pending_messages from Dict[str, MessageEvent] to
Dict[str, List[MessageEvent]] so multiple messages arriving during
an active session are queued rather than overwriting each other.

Also fix two call sites in run.py that directly assigned to
adapter._pending_messages[key] instead of using
merge_pending_message_event(), which would produce a bare
MessageEvent instead of a list after the type change.
jooray force-pushed the fix/pending-message-queue branch from 262f4e5 to 0597af3 on April 14, 2026 12:29

jooray commented Apr 14, 2026

Closing — the list-based approach caused regressions. Will address message dropping differently per-platform instead.

@jooray jooray closed this Apr 14, 2026
@jooray jooray deleted the fix/pending-message-queue branch April 17, 2026 21:22
