-
Notifications
You must be signed in to change notification settings - Fork 12
Fast join client #191
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Fast join client #191
Conversation
WalkthroughThe PR introduces a fast_join feature to the Stream Video SDK. When enabled, the system fetches multiple edge server credentials from the coordinator, then races these connections to establish the first successful WebSocket link, improving join latency while maintaining backward compatibility. Changes
Sequence DiagramsequenceDiagram
participant User
participant join() as join()
participant CM as ConnectionManager
participant FA as fast_join_call()
participant CA as Coordinator API
participant Race as _race_edges()
participant WS as WebSocket
User->>join(): join(call, fast_join=True)
join()->>CM: ConnectionManager(fast_join=True)
CM->>CM: _connect_internal()
alt fast_join enabled
CM->>FA: fast_join_call(call, user_id, ...)
FA->>CA: fast_join_call_coordinator_request()
CA-->>FA: StreamResponse[FastJoinCallResponse]
FA-->>CM: credentials_list
CM->>Race: _race_edges(credentials_list, session_id)
loop for each edge credential
Race->>WS: connect_websocket(edge_url)
alt connection succeeds
WS-->>Race: ✓ WebSocket client
Race->>CM: (ws_client, sfu_event, selected_credentials)
CM->>CM: Update ws_url, token with selected credentials
else connection fails
Race->>Race: Try next credential
end
end
else regular join
CM->>CA: join_call_coordinator_request()
CA-->>CM: StreamResponse[JoinCallResponse]
CM->>WS: connect_websocket(ws_url)
end
CM-->>User: Connected ConnectionManager
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes
Poem
Pre-merge checks and finishing touches✅ Passed checks (3 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Nitpick comments (1)
getstream/video/rtc/connection_utils.py (1)
161-216: Code duplication with join_call function.The
fast_join_callfunction duplicates most logic fromjoin_call(lines 129-158), differing primarily in which coordinator function is called and handling multiple credentials vs. a single credential.Consider refactoring to reduce duplication, similar to the suggestion for
coordinator_api.py. The shared logic (error handling, logging, local_sfu override) could be extracted.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (5)
getstream/video/rtc/__init__.py(2 hunks)getstream/video/rtc/connection_manager.py(7 hunks)getstream/video/rtc/connection_utils.py(3 hunks)getstream/video/rtc/coordinator_api.py(2 hunks)getstream/video/rtc/models.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
getstream/video/rtc/models.py (1)
getstream/models/__init__.py (2)
CallResponse(3152-3220)MemberResponse(10671-10700)
getstream/video/rtc/connection_utils.py (3)
getstream/video/rtc/models.py (2)
JoinCallResponse(47-52)FastJoinCallResponse(56-61)getstream/stream_response.py (2)
StreamResponse(11-36)data(22-24)getstream/video/rtc/coordinator_api.py (1)
fast_join_call_coordinator_request(84-146)
getstream/video/rtc/__init__.py (1)
getstream/video/rtc/connection_manager.py (1)
ConnectionManager(49-631)
getstream/video/rtc/connection_manager.py (4)
getstream/stream_response.py (2)
StreamResponse(11-36)data(22-24)getstream/video/rtc/connection_utils.py (4)
fast_join_call(161-215)join_call(129-158)connect_websocket(430-476)SfuConnectionError(83-86)getstream/video/rtc/models.py (1)
JoinCallResponse(47-52)getstream/video/rtc/__init__.py (1)
join(49-71)
getstream/video/rtc/coordinator_api.py (5)
getstream/video/rtc/models.py (2)
JoinCallResponse(47-52)FastJoinCallResponse(56-61)getstream/stream_response.py (3)
data(22-24)StreamResponse(11-36)headers(30-32)getstream/models/__init__.py (1)
CallRequest(3117-3148)getstream/stream.py (1)
create_token(75-104)getstream/utils/__init__.py (1)
build_body_dict(151-175)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Tests (3.11)
🔇 Additional comments (5)
getstream/video/rtc/models.py (1)
55-61: Code duplication with JoinCallResponse is acceptable for clarity.The
FastJoinCallResponsedataclass duplicates all fields fromJoinCallResponseexcept for thecredentialsfield type. While this creates some duplication, it provides clear type safety and makes the distinction between single vs. multiple credentials explicit.getstream/video/rtc/connection_manager.py (3)
11-11: LGTM!Importing
StreamResponseis necessary for constructing the response wrapper when mapping fast join credentials.
58-67: LGTM!The
fast_joinparameter is properly added to the constructor and stored as an instance attribute.
276-307: LGTM!The conditional logic correctly branches between fast join and regular join based on the
fast_joinflag.getstream/video/rtc/__init__.py (1)
49-71: LGTM!The
fast_joinparameter is properly added to thejoinfunction signature, documented in the docstring, and correctly propagated to theConnectionManagerconstructor.
| if self.fast_join and hasattr(self, "_fast_join_response"): | ||
| # Fast join - race multiple edges | ||
| self._ws_client, sfu_event, selected_cred = await self._race_edges( | ||
| self._fast_join_response.data.credentials, current_session_id | ||
| ) | ||
|
|
||
| # Use the selected credentials | ||
| ws_url = selected_cred.server.ws_endpoint | ||
| token = selected_cred.token | ||
|
|
||
| #map it to standard join call object so that retry/migration can happen | ||
| self.join_response = StreamResponse( | ||
| response=self._fast_join_response._StreamResponse__response, | ||
| data=JoinCallResponse( | ||
| call=self._fast_join_response.data.call, | ||
| members=self._fast_join_response.data.members, | ||
| credentials=selected_cred, | ||
| stats_options=self._fast_join_response.data.stats_options, | ||
| duration=self._fast_join_response.data.duration, | ||
| ) | ||
| ) | ||
|
|
||
| span.set_attribute("credentials", selected_cred.to_json()) | ||
| else: | ||
| # Regular join - connect to single edge | ||
| self._ws_client, sfu_event = await connect_websocket( | ||
| token=token, | ||
| ws_url=ws_url, | ||
| session_id=current_session_id, | ||
| options=self._connection_options, | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Accessing private attribute via name mangling is fragile.
Line 332 accesses self._fast_join_response._StreamResponse__response using Python name mangling to reach a private attribute. This is brittle and will break if the StreamResponse class is refactored.
Consider one of these alternatives:
- Add a public accessor method to
StreamResponse:
# In getstream/stream_response.py
@property
def response(self) -> httpx.Response:
"""Returns the underlying httpx Response."""
return self.__response- Store the httpx.Response separately when creating
_fast_join_response:
# Store the response separately
self._fast_join_http_response = fast_join_response._StreamResponse__response # or use accessor- Allow
StreamResponseto be constructed without the httpx response if it's not needed:
# Make response optional in StreamResponse constructor
self.join_response = JoinCallResponse(
call=self._fast_join_response.data.call,
# ... rest of mapping
)
# Wrap it later if needed🤖 Prompt for AI Agents
In getstream/video/rtc/connection_manager.py around lines 320 to 350, the code
is directly accessing a mangled private attribute
self._fast_join_response._StreamResponse__response which is brittle; change the
usage to rely on a public API: add a public property or accessor on
StreamResponse (e.g. response -> returns underlying httpx.Response) and then
replace the mangled access with self._fast_join_response.response;
alternatively, if exposing the underlying http response is undesirable, store
the httpx.Response separately when creating _fast_join_response or refactor
StreamResponse to allow construction without the httpx.Response and map
join_response without needing that private attribute.
| async def _race_edges(self, credentials_list, session_id): | ||
| """Try multiple edge WebSocket connections sequentially and return the first successful one. | ||
| This method iterates through edge URLs one by one, attempting to connect to each. | ||
| The first edge that successfully connects is used, and the iteration stops. | ||
| Args: | ||
| credentials_list: List of Credentials to try | ||
| session_id: Session ID for the connection | ||
| Returns: | ||
| Tuple of (WebSocket client, SFU event, selected Credentials) | ||
| Raises: | ||
| SfuConnectionError: If all edge connections fail | ||
| """ | ||
| if not credentials_list: | ||
| raise SfuConnectionError("No edge credentials provided for racing") | ||
|
|
||
| logger.info(f"Trying {len(credentials_list)} edge connections sequentially") | ||
|
|
||
| errors = [] | ||
|
|
||
| # Try each edge sequentially | ||
| for cred in credentials_list: | ||
| logger.debug(f"Trying edge {cred.server.edge_name} at {cred.server.ws_endpoint}") | ||
|
|
||
| try: | ||
| # Attempt to connect to this edge | ||
| ws_client, sfu_event = await connect_websocket( | ||
| token=cred.token, | ||
| ws_url=cred.server.ws_endpoint, | ||
| session_id=session_id, | ||
| options=self._connection_options, | ||
| ) | ||
|
|
||
| # Success! Return the connection and credentials | ||
| logger.info( | ||
| f"Edge {cred.server.edge_name} connected successfully" | ||
| ) | ||
| return ws_client, sfu_event, cred | ||
|
|
||
| except Exception as e: | ||
| errors.append((cred.server.edge_name, str(e))) | ||
| # Continue to next edge | ||
|
|
||
| # All connections failed | ||
| error_msg = "All edge connections failed:\n" + "\n".join( | ||
| f" - {edge}: {error}" for edge, error in errors | ||
| ) | ||
| raise SfuConnectionError(error_msg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sequential iteration defeats the purpose of "racing" connections.
The _race_edges method tries connections sequentially (one after another), not in parallel. This contradicts the PR's stated goal of "racing" connections to find the fastest edge and the method name suggests parallel racing.
Sequential attempts mean:
- Total time = sum of all timeouts for failed edges before success
- No latency benefit compared to trying a single edge
- The "fast join" feature doesn't deliver on its promise
Implement true parallel racing using asyncio.wait with FIRST_COMPLETED:
async def _race_edges(self, credentials_list, session_id):
"""Race multiple edge WebSocket connections in parallel and return the first successful one.
Args:
credentials_list: List of Credentials to try
session_id: Session ID for the connection
Returns:
Tuple of (WebSocket client, SFU event, selected Credentials)
Raises:
SfuConnectionError: If all edge connections fail
"""
if not credentials_list:
raise SfuConnectionError("No edge credentials provided for racing")
logger.info(f"Racing {len(credentials_list)} edge connections in parallel")
# Create tasks for all edges
async def try_edge(cred):
try:
ws_client, sfu_event = await connect_websocket(
token=cred.token,
ws_url=cred.server.ws_endpoint,
session_id=session_id,
options=self._connection_options,
)
return (ws_client, sfu_event, cred, None)
except Exception as e:
return (None, None, cred, e)
tasks = [asyncio.create_task(try_edge(cred)) for cred in credentials_list]
try:
# Wait for the first successful connection
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
# Check if we got a success
for task in done:
ws_client, sfu_event, cred, error = task.result()
if error is None:
# Success! Cancel remaining tasks
for t in pending:
t.cancel()
logger.info(f"Edge {cred.server.edge_name} won the race")
return ws_client, sfu_event, cred
# First task completed but failed, keep waiting
while pending:
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
for task in done:
ws_client, sfu_event, cred, error = task.result()
if error is None:
# Success! Cancel remaining tasks
for t in pending:
t.cancel()
logger.info(f"Edge {cred.server.edge_name} connected successfully")
return ws_client, sfu_event, cred
# All failed - gather errors
errors = []
for task in tasks:
_, _, cred, error = task.result()
if error:
errors.append((cred.server.edge_name, str(error)))
error_msg = "All edge connections failed:\n" + "\n".join(
f" - {edge}: {error}" for edge, error in errors
)
raise SfuConnectionError(error_msg)
finally:
# Ensure all tasks are cleaned up
for task in tasks:
if not task.done():
task.cancel()🤖 Prompt for AI Agents
In getstream/video/rtc/connection_manager.py around lines 581 to 631, the
_race_edges implementation attempts connections sequentially which defeats the
"race" semantics; replace the loop with parallel tasks that each try to connect
(wrap per-edge connect in an async helper and asyncio.create_task), use
asyncio.wait(return_when=FIRST_COMPLETED) to capture the first successful
connection, on success cancel remaining pending tasks and return the ws_client,
sfu_event and credentials, and if all tasks fail gather their exceptions and
raise SfuConnectionError with the combined error message; ensure tasks are
cancelled/cleaned up in a finally block and maintain the existing logging.
| async def fast_join_call_coordinator_request( | ||
| call: Call, | ||
| user_id: str, | ||
| create: bool = False, | ||
| data: Optional[CallRequest] = None, | ||
| ring: Optional[bool] = None, | ||
| notify: Optional[bool] = None, | ||
| video: Optional[bool] = None, | ||
| location: Optional[str] = None, | ||
| ) -> StreamResponse[FastJoinCallResponse]: | ||
| """Make a fast join request to get multiple edge credentials from the coordinator. | ||
| Args: | ||
| call: The call to join | ||
| user_id: The user ID to join the call with | ||
| create: Whether to create the call if it doesn't exist | ||
| data: Additional call data if creating | ||
| ring: Whether to ring other users | ||
| notify: Whether to notify other users | ||
| video: Whether to enable video | ||
| location: The preferred location | ||
| Returns: | ||
| A response containing the call information and an array of credentials for multiple edges | ||
| """ | ||
| # Create a token for this user | ||
| token = call.client.stream.create_token(user_id=user_id) | ||
|
|
||
| # Create a new client with this token | ||
| client = call.client.stream.__class__( | ||
| api_key=call.client.stream.api_key, | ||
| api_secret=call.client.stream.api_secret, | ||
| base_url=call.client.stream.base_url, | ||
| ) | ||
|
|
||
| # Set up authentication | ||
| client.token = token | ||
| client.headers["Authorization"] = token | ||
| client.client.headers["Authorization"] = token | ||
|
|
||
| # Prepare path parameters for the request | ||
| path_params = { | ||
| "type": call.call_type, | ||
| "id": call.id, | ||
| } | ||
|
|
||
| # Build the request body | ||
| json_body = build_body_dict( | ||
| location=location or "FRA", # Default to Frankfurt if not specified | ||
| create=create, | ||
| notify=notify, | ||
| ring=ring, | ||
| video=video, | ||
| data=data, | ||
| ) | ||
|
|
||
| # Make the POST request to fast join the call | ||
| return await client.post( | ||
| "/api/v2/video/call/{type}/{id}/fast_join", | ||
| FastJoinCallResponse, | ||
| path_params=path_params, | ||
| json=json_body, | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion | 🟠 Major
Significant code duplication with join_call_coordinator_request.
The fast_join_call_coordinator_request function duplicates almost all logic from join_call_coordinator_request (lines 19-81), differing only in the endpoint path and return type.
Consider refactoring to a shared internal function:
async def _call_coordinator_request(
call: Call,
user_id: str,
endpoint: str,
response_type: type,
create: bool = False,
data: Optional[CallRequest] = None,
ring: Optional[bool] = None,
notify: Optional[bool] = None,
video: Optional[bool] = None,
location: Optional[str] = None,
):
"""Internal function to make coordinator requests."""
token = call.client.stream.create_token(user_id=user_id)
client = call.client.stream.__class__(
api_key=call.client.stream.api_key,
api_secret=call.client.stream.api_secret,
base_url=call.client.stream.base_url,
)
client.token = token
client.headers["Authorization"] = token
client.client.headers["Authorization"] = token
path_params = {"type": call.call_type, "id": call.id}
json_body = build_body_dict(
location=location or "FRA",
create=create,
notify=notify,
ring=ring,
video=video,
data=data,
)
return await client.post(
f"/api/v2/video/call/{{type}}/{{id}}/{endpoint}",
response_type,
path_params=path_params,
json=json_body,
)
async def join_call_coordinator_request(...) -> StreamResponse[JoinCallResponse]:
return await _call_coordinator_request(call, user_id, "join", JoinCallResponse, ...)
async def fast_join_call_coordinator_request(...) -> StreamResponse[FastJoinCallResponse]:
return await _call_coordinator_request(call, user_id, "fast_join", FastJoinCallResponse, ...)🤖 Prompt for AI Agents
In getstream/video/rtc/coordinator_api.py around lines 84 to 146,
fast_join_call_coordinator_request duplicates almost all logic from
join_call_coordinator_request (lines ~19-81); extract the shared logic into a
single internal helper (e.g., _call_coordinator_request) that accepts
parameters: call, user_id, endpoint (string), response_type (type), and the
existing optional flags (create, data, ring, notify, video, location); move
token creation, client construction and auth header setup, path_params and
json_body construction into that helper and have join_call_coordinator_request
and fast_join_call_coordinator_request simply call the helper with
endpoint="join"/"fast_join" and
response_type=JoinCallResponse/FastJoinCallResponse, preserving default location
fallback and await/return semantics.
this add's option to use the alternative api to join call which is faster cause it delegates it to the client.
Summary by CodeRabbit