feat: enable chat events on coordinator WebSocket#237
Conversation
The coordinator WS now subscribes to both video and chat products, allowing chat events (message.new, reactions, typing, etc.) to arrive on the same connection as video call events. No second WS needed. - Add "chat" to products in StreamAPIWS auth payload - Add watch_channels() in connection_utils (same pattern as watch_call) - ConnectionManager subscribes to messaging channel after watch_call - Expose coordinator_ws property for event listener registration
|
Warning Rate limit exceeded
Your organization is not enrolled in usage-based pricing. Contact your admin to enable usage-based pricing to continue reviews beyond the rate limit, or try again in 54 minutes and 39 seconds. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
📝 WalkthroughWalkthroughA read-only Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Poem
🚥 Pre-merge checks | ✅ 1 | ❌ 2❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Chat event subscription needs discussion with the team -- JS SDK uses two separate WS connections for chat and video, and mixing them on one connection may affect MAU billing. Keep coordinator_ws property for custom events which already work via watch_call.
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@getstream/video/rtc/connection_manager.py`:
- Around line 623-632: The property coordinator_ws should honor "Returns None if
not connected" by returning the websocket client only when it is actually
connected; update coordinator_ws to check _coordinator_ws_client's connection
state (e.g., _coordinator_ws_client.connected or
_coordinator_ws_client.is_connected) and/or ensure the coordinator helper/task
(e.g., _coordinator_task) is still running, and return None otherwise; use
getattr checks to avoid attribute errors so callers never receive a
non-connected client from coordinator_ws.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 66b96399-2289-4c8e-be81-8a56b62ec916
📒 Files selected for processing (1)
getstream/video/rtc/connection_manager.py
Return Optional[StreamAPIWS] and check _connected before returning, so callers get None instead of a disconnected client.
…r_ws
Verifies the full pub/sub cycle: send a custom event via REST
(call.send_call_event), receive it on ConnectionManager.coordinator_ws
through ws.on("custom") listener. Uses asyncio.Event for reliable
timing instead of fixed sleep.
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
tests/rtc/coordinator/test_custom_events.py (1)
36-69: Group integration tests in a test class for consistency.This test is currently top-level; move it under a test class to match repository test-organization rules.
Suggested structure
-@pytest.mark.asyncio -@pytest.mark.integration -@skip_on_rate_limit -async def test_custom_event_round_trip(async_client: AsyncStream, test_user: str): - """Send a custom event via REST and verify it arrives on coordinator_ws.""" - call = async_client.video.call("default", str(uuid.uuid4())) - await call.get_or_create(data=CallRequest(created_by_id=test_user)) - - async with await rtc.join(call, test_user) as connection: - assert connection.connection_state == ConnectionState.JOINED - - ws = connection.coordinator_ws - assert ws is not None - - received_event = None - event_received = asyncio.Event() - - `@ws.on`("custom") - def on_custom(event): - nonlocal received_event - received_event = event - event_received.set() - - await call.send_call_event( - user_id=test_user, - custom={"type": "test_event", "payload": "hello from test"}, - ) - - await asyncio.wait_for(event_received.wait(), timeout=10.0) - - assert received_event is not None - custom_data = received_event.get("custom", {}) - assert custom_data.get("type") == "test_event" - assert custom_data.get("payload") == "hello from test" +class TestCoordinatorCustomEvents: + `@pytest.mark.asyncio` + `@pytest.mark.integration` + `@skip_on_rate_limit` + async def test_custom_event_round_trip(self, async_client: AsyncStream, test_user: str): + """Send a custom event via REST and verify it arrives on coordinator_ws.""" + call = async_client.video.call("default", str(uuid.uuid4())) + await call.get_or_create(data=CallRequest(created_by_id=test_user)) + + async with await rtc.join(call, test_user) as connection: + assert connection.connection_state == ConnectionState.JOINED + + ws = connection.coordinator_ws + assert ws is not None + + received_event = None + event_received = asyncio.Event() + + `@ws.on`("custom") + def on_custom(event): + nonlocal received_event + received_event = event + event_received.set() + + await call.send_call_event( + user_id=test_user, + custom={"type": "test_event", "payload": "hello from test"}, + ) + + await asyncio.wait_for(event_received.wait(), timeout=10.0) + + assert received_event is not None + custom_data = received_event.get("custom", {}) + assert custom_data.get("type") == "test_event" + assert custom_data.get("payload") == "hello from test"As per coding guidelines, "
**/test_*.py: Keep tests well organized and use test classes to group similar tests."🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/rtc/coordinator/test_custom_events.py` around lines 36 - 69, Move the standalone test_custom_event_round_trip coroutine into a test class (e.g., class TestCustomEvents:) to group integration tests; keep all existing decorators (`@pytest.mark.asyncio`, `@pytest.mark.integration`, `@skip_on_rate_limit`) on the method and retain the signature (async_client: AsyncStream, test_user: str) and body including the call variable, rtc.join usage, ws.on("custom") handler, and assertions; ensure the new method name remains test_custom_event_round_trip and update any imports or fixtures if needed so pytest discovers the test class and method.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@tests/rtc/coordinator/test_custom_events.py`:
- Around line 28-33: The teardown currently swallows all errors in the
try/except around async_client.delete_users (the block calling await
async_client.delete_users(user_ids=[user_id], user="hard", conversations="hard",
messages="hard")), which hides failures; change the except to either log the
exception (e.g., with logger.exception or print) or re-raise/assert so CI fails
— locate the try/except around delete_users in
tests/rtc/coordinator/test_custom_events.py and replace the bare "except
Exception: pass" with an exception handler that records the error details and/or
raises the exception to surface teardown failures.
---
Nitpick comments:
In `@tests/rtc/coordinator/test_custom_events.py`:
- Around line 36-69: Move the standalone test_custom_event_round_trip coroutine
into a test class (e.g., class TestCustomEvents:) to group integration tests;
keep all existing decorators (`@pytest.mark.asyncio`, `@pytest.mark.integration`,
`@skip_on_rate_limit`) on the method and retain the signature (async_client:
AsyncStream, test_user: str) and body including the call variable, rtc.join
usage, ws.on("custom") handler, and assertions; ensure the new method name
remains test_custom_event_round_trip and update any imports or fixtures if
needed so pytest discovers the test class and method.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 47b0d709-cb15-49fa-8573-c6965a0645f9
📒 Files selected for processing (2)
getstream/video/rtc/connection_manager.pytests/rtc/coordinator/test_custom_events.py
🚧 Files skipped from review as they are similar to previous changes (1)
- getstream/video/rtc/connection_manager.py
Why
The coordinator WebSocket is connected and receiving events, but there's no public way to register listeners on it from outside
ConnectionManager. This blocks use cases like forwarding call events (turn detection, custom events) to external systems.Changes
coordinator_wsproperty onConnectionManagerfor event listener registrationUsage
Receiving events:
Sending custom events:
Verified event types:
custom-- user-defined events (turn detection, agent heartbeat, etc.)call.session_participant_count_updated-- participant count changeshealth.check-- keep-alive pingsFuture work
channel.watch()+channel.on()) is a future task once we determine the best approach for the Python SDK.Summary by CodeRabbit
New Features
Documentation
Tests