BaseMemoryService has no efficient way to access cross-session events — forces N+1 queries on every custom implementation #5300

@surfai

The problem affects all custom BaseMemoryService implementations

Any BaseMemoryService that needs cross-session context (conversation history across multiple sessions for the same user) is forced into an N+1 query pattern. There is no bulk method on BaseSessionService to retrieve events across sessions — you must call list_sessions() (which explicitly returns sessions without events) followed by get_session() for each session individually.

This is not a niche use case. Long-term memory, user preference extraction, cross-conversation summarization, and observational memory all require seeing events beyond the single session passed to add_session_to_memory(session). Every implementation of these patterns on ADK today must work around this gap.

LangGraph avoids this entirely by decoupling its Store (cross-thread memory) from the Checkpointer (per-thread state) — the Store never reads from session records. Mastra avoids it by owning its own mastra_messages table and querying across threads via listMessagesByResourceId(). ADK has neither escape hatch.

Reproduction

Minimal BaseMemoryService that needs cross-session history:

from google.adk.memory import BaseMemoryService
from google.adk.sessions import BaseSessionService, Session

class CrossSessionMemoryService(BaseMemoryService):
    """Memory service that compresses conversation history across sessions."""

    def __init__(self, session_service: BaseSessionService):
        self._session_service = session_service

    async def add_session_to_memory(self, session: Session) -> None:
        # We need events from ALL of this user's sessions, not just the current one.
        # But `session.events` only contains the current session's events.

        # Step 1: list_sessions returns sessions WITHOUT events or state.
        # (ListSessionsResponse docstring: "The events and states are not set
        #  within each Session object.")
        list_resp = await self._session_service.list_sessions(
            app_name=session.app_name,
            user_id=session.user_id,
        )

        # Step 2: N+1 — must call get_session() individually for each session
        # to get their events. No bulk alternative exists.
        all_events = []
        for s in list_resp.sessions:
            if s.id == session.id:
                continue  # current session's events already in hand
            full = await self._session_service.get_session(
                app_name=session.app_name,
                user_id=session.user_id,
                session_id=s.id,
            )
            if full and full.events:
                all_events.extend(full.events)  # one DB round-trip per session

        all_events.extend(session.events)
        # ... compress, summarize, store ...

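    async def search_memory(self, *, app_name: str, user_id: str, query: str):
        # Not relevant to the repro; return an empty, correctly typed response.
        from google.adk.memory.base_memory_service import SearchMemoryResponse
        return SearchMemoryResponse(memories=[])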

The N+1 cost scales with the number of sessions per user. For DatabaseSessionService, each get_session() call executes a separate SELECT against the app_events table filtered by session_id. The table already has app_name and user_id columns that could support a single cross-session query — but no method exposes this.
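
To make the cost concrete, here is a small self-contained sketch that uses Python's built-in sqlite3 instead of ADK. The `events` table, its columns, and the helper names (`make_db`, `fetch_n_plus_one`, `fetch_single_query`) are assumptions chosen to mirror the description above; they are not ADK's actual schema or API. It only counts round-trips for the two access patterns.

```python
import sqlite3


def make_db(num_sessions: int, events_per_session: int) -> sqlite3.Connection:
    """Builds an in-memory table shaped like the one described above (hypothetical schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE events ("
        "app_name TEXT, user_id TEXT, session_id TEXT, timestamp REAL, content TEXT)"
    )
    rows = [
        ("app", "u1", f"s{s}", float(s * events_per_session + e), f"event {s}.{e}")
        for s in range(num_sessions)
        for e in range(events_per_session)
    ]
    conn.executemany("INSERT INTO events VALUES (?, ?, ?, ?, ?)", rows)
    return conn


def fetch_n_plus_one(conn: sqlite3.Connection, app_name: str, user_id: str):
    """One query to list session IDs, then one query per session."""
    queries = 1
    session_ids = [
        row[0]
        for row in conn.execute(
            "SELECT DISTINCT session_id FROM events WHERE app_name = ? AND user_id = ?",
            (app_name, user_id),
        )
    ]
    events = []
    for session_id in session_ids:
        queries += 1  # one extra round-trip per session
        events.extend(
            conn.execute(
                "SELECT timestamp, content FROM events "
                "WHERE app_name = ? AND user_id = ? AND session_id = ?",
                (app_name, user_id, session_id),
            ).fetchall()
        )
    events.sort()  # tuples sort by timestamp first
    return events, queries


def fetch_single_query(conn: sqlite3.Connection, app_name: str, user_id: str):
    """One query filtered by app_name and user_id only."""
    events = conn.execute(
        "SELECT timestamp, content FROM events "
        "WHERE app_name = ? AND user_id = ? ORDER BY timestamp",
        (app_name, user_id),
    ).fetchall()
    return events, 1
```

Both helpers return the same rows; only the number of statements differs, and for the first one it grows linearly with the number of sessions.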

Proposed fix

Add an optional get_user_events() method to BaseSessionService with a default N+1 fallback, so concrete backends can override with a single query:

# google/adk/sessions/base_session_service.py

class BaseSessionService(ABC):
    # ... existing methods ...

    async def get_user_events(
        self,
        *,
        app_name: str,
        user_id: str,
        since: float | None = None,
    ) -> list[Event]:
        """Returns events across all sessions for a user.

        Args:
            app_name: The name of the app.
            user_id: The ID of the user.
            since: If provided, only return events with timestamp > since.
                   Enables efficient delta queries for memory services that
                   track their last observation time.

        Returns:
            Events sorted chronologically across all sessions.

        Override in concrete implementations for single-query efficiency.
        Default implementation falls back to list_sessions + get_session (N+1).
        """
        list_resp = await self.list_sessions(
            app_name=app_name, user_id=user_id
        )
        all_events: list[Event] = []
        for s in list_resp.sessions:
            # Skip sessions that cannot contain anything newer than `since`.
            # Compare against None explicitly so that since=0.0 is honored.
            if since is not None and s.last_update_time <= since:
                continue
            full = await self.get_session(
                app_name=app_name, user_id=user_id, session_id=s.id
            )
            if full:
                all_events.extend(full.events)
        if since is not None:
            all_events = [e for e in all_events if e.timestamp > since]
        all_events.sort(key=lambda e: e.timestamp)
        return all_events
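
A single-query override in DatabaseSessionService could then look like this:

# google/adk/sessions/database_session_service.py

from sqlalchemy import select

class DatabaseSessionService(BaseSessionService):
    # ... existing methods ...

    async def get_user_events(
        self,
        *,
        app_name: str,
        user_id: str,
        since: float | None = None,
    ) -> list[Event]:
        """Single-query override that reads directly from the events table."""
        # NOTE: _engine, _event_table, and _row_to_event are illustrative names;
        # adapt them to the service's actual internals.
        async with self._engine.begin() as conn:
            stmt = (
                select(self._event_table)
                .where(self._event_table.c.app_name == app_name)
                .where(self._event_table.c.user_id == user_id)
            )
            # Compare against None explicitly so that since=0.0 is honored.
            # If the timestamp column is a DATETIME rather than a float,
            # convert `since` before comparing.
            if since is not None:
                stmt = stmt.where(self._event_table.c.timestamp > since)
            stmt = stmt.order_by(self._event_table.c.timestamp)
            rows = (await conn.execute(stmt)).fetchall()
        return [self._row_to_event(row) for row in rows]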

This lets any BaseMemoryService replace:

# Before: N+1 queries
list_resp = await session_service.list_sessions(...)
for s in list_resp.sessions:
    full = await session_service.get_session(...)

with:

# After: 1 query (on DatabaseSessionService), N+1 fallback (on others)
events = await session_service.get_user_events(
    app_name=session.app_name,
    user_id=session.user_id,
    since=last_observed_at,
)
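
As a rough illustration of how a memory service might use the `since` parameter, the sketch below tracks a per-user watermark and only fetches events newer than it. `FakeEvent`, `FakeSessionService`, and `DeltaObserver` are hypothetical stand-ins written for this example so it runs without ADK installed; only the `get_user_events()` signature follows the proposal above.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeEvent:
    """Stand-in for an ADK event; only the timestamp matters here."""
    timestamp: float
    text: str


class FakeSessionService:
    """Hypothetical stand-in exposing the proposed get_user_events() signature."""

    def __init__(self, events):
        self._events = sorted(events, key=lambda e: e.timestamp)

    async def get_user_events(self, *, app_name, user_id, since=None):
        # A real backend would filter by app_name and user_id in its query.
        return [e for e in self._events if since is None or e.timestamp > since]


class DeltaObserver:
    """Tracks a per-user watermark so each pass only sees new events."""

    def __init__(self, session_service):
        self._session_service = session_service
        self._last_observed_at: dict[tuple[str, str], float] = {}

    async def observe(self, app_name: str, user_id: str):
        key = (app_name, user_id)
        events = await self._session_service.get_user_events(
            app_name=app_name,
            user_id=user_id,
            since=self._last_observed_at.get(key),  # None on the first pass
        )
        if events:
            # Events come back in chronological order, so the last one is newest.
            self._last_observed_at[key] = events[-1].timestamp
        return events


async def demo():
    service = FakeSessionService([FakeEvent(1.0, "a"), FakeEvent(2.0, "b")])
    observer = DeltaObserver(service)
    first = await observer.observe("app", "u1")
    second = await observer.observe("app", "u1")
    return first, second
```

One caveat with the strict `timestamp > since` comparison: if two events can share the exact same timestamp and one is written after the watermark was taken, it would be skipped. A real implementation may want a secondary key or a small overlap window.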

Context

Environment

  • ADK version: 2.0.0a3
  • Python: 3.14
  • Session backend: DatabaseSessionService (PostgreSQL via asyncpg)

Metadata

Labels

services: [Component] This issue is related to runtime services, e.g. sessions, memory, artifacts, etc.
