Skip to content

feat: enable chat events on coordinator WebSocket#237

Open
aliev wants to merge 7 commits intomainfrom
feat/chat-events-on-coordinator-ws
Open

feat: enable chat events on coordinator WebSocket#237
aliev wants to merge 7 commits intomainfrom
feat/chat-events-on-coordinator-ws

Conversation

@aliev
Copy link
Copy Markdown
Member

@aliev aliev commented Apr 13, 2026

Why

The coordinator WebSocket is connected and receiving events, but there's no public way to register listeners on it from outside ConnectionManager. This blocks use cases like forwarding call events (turn detection, custom events) to external systems.

Changes

  • Expose coordinator_ws property on ConnectionManager for event listener registration

Usage

Receiving events:

ws = connection_manager.coordinator_ws

@ws.on("custom")
def on_custom(event):
    print(event["custom"])

ws.on_wildcard("call.**", lambda event_type, event: print(event_type))

Sending custom events:

await call.send_call_event(
    user_id="agent",
    custom={"type": "status_update", "status": "processing"},
)

Verified event types:

  • custom -- user-defined events (turn detection, agent heartbeat, etc.)
  • call.session_participant_count_updated -- participant count changes
  • health.check -- keep-alive pings

Future work

  • The Python SDK docs currently don't cover event subscription (only JS/Flutter/Kotlin/Swift do). Adding a proper event subscription API (lazy channel watching, similar to JS SDK's channel.watch() + channel.on()) is a future task once we determine the best approach for the Python SDK.

Summary by CodeRabbit

  • New Features

    • Access to coordinator events is now available after connecting, enabling real-time interaction with the coordinator.
  • Documentation

    • Added guidance on subscribing to coordinator WebSocket events and when the interface becomes available.
  • Tests

    • Added an integration test validating end-to-end custom event publish/subscribe via the coordinator WebSocket.

The coordinator WS now subscribes to both video and chat products,
allowing chat events (message.new, reactions, typing, etc.) to arrive
on the same connection as video call events. No second WS needed.

- Add "chat" to products in StreamAPIWS auth payload
- Add watch_channels() in connection_utils (same pattern as watch_call)
- ConnectionManager subscribes to messaging channel after watch_call
- Expose coordinator_ws property for event listener registration
@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Apr 13, 2026

Warning

Rate limit exceeded

@aliev has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 54 minutes and 39 seconds before requesting another review.

Your organization is not enrolled in usage-based pricing. Contact your admin to enable usage-based pricing to continue reviews beyond the rate limit, or try again in 54 minutes and 39 seconds.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: d17dc48e-fe12-46ba-aca0-f309e8746210

📥 Commits

Reviewing files that changed from the base of the PR and between c68730c and f3f36d1.

📒 Files selected for processing (1)
  • tests/rtc/coordinator/test_custom_events.py
📝 Walkthrough

Walkthrough

A read-only coordinator_ws property was added to ConnectionManager to expose the internal _coordinator_ws_client when it exists and is connected; otherwise it returns None. A new integration test verifies custom event pub/sub over the coordinator WebSocket.

Changes

Cohort / File(s) Summary
ConnectionManager property
getstream/video/rtc/connection_manager.py
Added coordinator_ws read-only @property returning the internal _coordinator_ws_client only when present and _connected is truthy; includes docstring describing purpose and availability.
RTC coordinator integration test
tests/rtc/coordinator/test_custom_events.py
New integration test and fixture that upserts a temp user, joins a call via rtc.join, subscribes to connection.coordinator_ws events, sends a REST call.send_call_event, and asserts round-trip receipt of the custom event.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Poem

🐰 I twitch my nose, a tiny cheer,
A websocket door now crystal clear,
Events hop in, and echoes play,
I watch the messages dance and stay.
Hooray — the coordinator sings today! 🎩

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (2 warnings)

Check name Status Explanation Resolution
Title check ⚠️ Warning The PR title 'feat: enable chat events on coordinator WebSocket' is partially related to the actual changes. The code adds a public property to expose the coordinator WebSocket for custom event listeners, not specifically for chat events. The word 'chat' is misleading since the property enables all custom events (verified event types include custom, call.session_participant_count_updated, health.check), not just chat. Update the title to better reflect the actual change, e.g., 'feat: expose coordinator WebSocket for custom event listeners' or 'feat: add public coordinator_ws property to ConnectionManager'.
Docstring Coverage ⚠️ Warning Docstring coverage is 33.33% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (1 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/chat-events-on-coordinator-ws

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Chat event subscription needs discussion with the team -- JS SDK uses
two separate WS connections for chat and video, and mixing them on one
connection may affect MAU billing. Keep coordinator_ws property for
custom events which already work via watch_call.
Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@getstream/video/rtc/connection_manager.py`:
- Around line 623-632: The property coordinator_ws should honor "Returns None if
not connected" by returning the websocket client only when it is actually
connected; update coordinator_ws to check _coordinator_ws_client's connection
state (e.g., _coordinator_ws_client.connected or
_coordinator_ws_client.is_connected) and/or ensure the coordinator helper/task
(e.g., _coordinator_task) is still running, and return None otherwise; use
getattr checks to avoid attribute errors so callers never receive a
non-connected client from coordinator_ws.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 66b96399-2289-4c8e-be81-8a56b62ec916

📥 Commits

Reviewing files that changed from the base of the PR and between 7c4c123 and 810998f.

📒 Files selected for processing (1)
  • getstream/video/rtc/connection_manager.py

Comment thread getstream/video/rtc/connection_manager.py Outdated
Return Optional[StreamAPIWS] and check _connected before returning,
so callers get None instead of a disconnected client.
…r_ws

Verifies the full pub/sub cycle: send a custom event via REST
(call.send_call_event), receive it on ConnectionManager.coordinator_ws
through ws.on("custom") listener. Uses asyncio.Event for reliable
timing instead of fixed sleep.
Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
tests/rtc/coordinator/test_custom_events.py (1)

36-69: Group integration tests in a test class for consistency.

This test is currently top-level; move it under a test class to match repository test-organization rules.

Suggested structure
-@pytest.mark.asyncio
-@pytest.mark.integration
-@skip_on_rate_limit
-async def test_custom_event_round_trip(async_client: AsyncStream, test_user: str):
-    """Send a custom event via REST and verify it arrives on coordinator_ws."""
-    call = async_client.video.call("default", str(uuid.uuid4()))
-    await call.get_or_create(data=CallRequest(created_by_id=test_user))
-
-    async with await rtc.join(call, test_user) as connection:
-        assert connection.connection_state == ConnectionState.JOINED
-
-        ws = connection.coordinator_ws
-        assert ws is not None
-
-        received_event = None
-        event_received = asyncio.Event()
-
-        `@ws.on`("custom")
-        def on_custom(event):
-            nonlocal received_event
-            received_event = event
-            event_received.set()
-
-        await call.send_call_event(
-            user_id=test_user,
-            custom={"type": "test_event", "payload": "hello from test"},
-        )
-
-        await asyncio.wait_for(event_received.wait(), timeout=10.0)
-
-        assert received_event is not None
-        custom_data = received_event.get("custom", {})
-        assert custom_data.get("type") == "test_event"
-        assert custom_data.get("payload") == "hello from test"
+class TestCoordinatorCustomEvents:
+    `@pytest.mark.asyncio`
+    `@pytest.mark.integration`
+    `@skip_on_rate_limit`
+    async def test_custom_event_round_trip(self, async_client: AsyncStream, test_user: str):
+        """Send a custom event via REST and verify it arrives on coordinator_ws."""
+        call = async_client.video.call("default", str(uuid.uuid4()))
+        await call.get_or_create(data=CallRequest(created_by_id=test_user))
+
+        async with await rtc.join(call, test_user) as connection:
+            assert connection.connection_state == ConnectionState.JOINED
+
+            ws = connection.coordinator_ws
+            assert ws is not None
+
+            received_event = None
+            event_received = asyncio.Event()
+
+            `@ws.on`("custom")
+            def on_custom(event):
+                nonlocal received_event
+                received_event = event
+                event_received.set()
+
+            await call.send_call_event(
+                user_id=test_user,
+                custom={"type": "test_event", "payload": "hello from test"},
+            )
+
+            await asyncio.wait_for(event_received.wait(), timeout=10.0)
+
+            assert received_event is not None
+            custom_data = received_event.get("custom", {})
+            assert custom_data.get("type") == "test_event"
+            assert custom_data.get("payload") == "hello from test"

As per coding guidelines, "**/test_*.py: Keep tests well organized and use test classes to group similar tests."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/rtc/coordinator/test_custom_events.py` around lines 36 - 69, Move the
standalone test_custom_event_round_trip coroutine into a test class (e.g., class
TestCustomEvents:) to group integration tests; keep all existing decorators
(`@pytest.mark.asyncio`, `@pytest.mark.integration`, `@skip_on_rate_limit`) on the
method and retain the signature (async_client: AsyncStream, test_user: str) and
body including the call variable, rtc.join usage, ws.on("custom") handler, and
assertions; ensure the new method name remains test_custom_event_round_trip and
update any imports or fixtures if needed so pytest discovers the test class and
method.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@tests/rtc/coordinator/test_custom_events.py`:
- Around line 28-33: The teardown currently swallows all errors in the
try/except around async_client.delete_users (the block calling await
async_client.delete_users(user_ids=[user_id], user="hard", conversations="hard",
messages="hard")), which hides failures; change the except to either log the
exception (e.g., with logger.exception or print) or re-raise/assert so CI fails
— locate the try/except around delete_users in
tests/rtc/coordinator/test_custom_events.py and replace the bare "except
Exception: pass" with an exception handler that records the error details and/or
raises the exception to surface teardown failures.

---

Nitpick comments:
In `@tests/rtc/coordinator/test_custom_events.py`:
- Around line 36-69: Move the standalone test_custom_event_round_trip coroutine
into a test class (e.g., class TestCustomEvents:) to group integration tests;
keep all existing decorators (`@pytest.mark.asyncio`, `@pytest.mark.integration`,
`@skip_on_rate_limit`) on the method and retain the signature (async_client:
AsyncStream, test_user: str) and body including the call variable, rtc.join
usage, ws.on("custom") handler, and assertions; ensure the new method name
remains test_custom_event_round_trip and update any imports or fixtures if
needed so pytest discovers the test class and method.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 47b0d709-cb15-49fa-8573-c6965a0645f9

📥 Commits

Reviewing files that changed from the base of the PR and between 810998f and c68730c.

📒 Files selected for processing (2)
  • getstream/video/rtc/connection_manager.py
  • tests/rtc/coordinator/test_custom_events.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • getstream/video/rtc/connection_manager.py

Comment thread tests/rtc/coordinator/test_custom_events.py Outdated
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants