Experimental federated user directory#19887
Conversation
anoadragon453
left a comment
There was a problem hiding this comment.
I read through the linked doc and had a look through the code (but not yet through the tests). Overall it looks like it's on the right track! I poked a fair few holes below though.
Heads up that a new profile_updates stream is being added shortly in #19556 as part of the User Status work. One could leverage this stream to only sync changes to user profiles when fetching remote user directory entries. Thus, you'd only get diffs instead of needing to sync the whole directory every time. That could help the feature scale to a federation with many more homeservers.
That PR is expected to land on develop within a week.
| ) | ||
|
|
||
| self.bwi_federated_user_dir_federation_search_timeout: int = experimental.get( | ||
| "bwi_federated_user_dir_federation_search_timeout", 2000 |
There was a problem hiding this comment.
Why not use self.parse_duration here as well?
| # Synthetic requester used for the federated user directory sync. The | ||
| # remote server requires the requester to belong to our server. | ||
| self._federated_user_dir_sync_requester = ( | ||
| f"@{FEDERATED_USER_DIR_SYNC_REQUESTER_LOCALPART}:{self.server_name}" |
There was a problem hiding this comment.
What's the point of making up a synthetic requester? It appears to not even be used on the receiving end (see on_user_directory_search_request). It's only used to determine that this user is on our homeserver, but federation authentication (via signing keys) should already prove that.
| """Fetch users from the user directory of a remote server. | ||
|
|
||
| The federation endpoint always returns the remote server's full local | ||
| directory, so no result limit is sent. |
There was a problem hiding this comment.
| """Fetch users from the user directory of a remote server. | |
| The federation endpoint always returns the remote server's full local | |
| directory, so no result limit is sent. | |
| """Fetch the full user directory of a remote server. |
| async def search_user_directory_across_federation( | ||
| self, | ||
| requester: str, | ||
| destinations: Collection[str], | ||
| limit: int = 10, | ||
| ) -> JsonDict: | ||
| """Fetch users from the directories of multiple federated servers. | ||
|
|
||
| Args: | ||
| requester: The user that initiated the request. | ||
| destinations: The servers to query. | ||
| limit: Maximum number of results to return per server. | ||
|
|
||
| Returns: | ||
| Combined results from all servers. | ||
| """ | ||
|
|
||
| if not destinations: | ||
| return {"limited": False, "results": []} | ||
|
|
||
| # Query each server individually and collect results | ||
| combined_results = [] | ||
| limited = False | ||
|
|
||
| # Create a list of deferreds to query each server | ||
| query_tasks = [] | ||
| for destination in destinations: | ||
| if not self._is_mine_server_name(destination): | ||
| # Convert coroutine to Deferred | ||
| deferred = defer.ensureDeferred( | ||
| self.user_directory_search( | ||
| requester, | ||
| destination, | ||
| self.user_directory_search_timeout, | ||
| ) | ||
| ) | ||
| query_tasks.append(deferred) | ||
|
|
||
| # Execute all queries in parallel | ||
| if query_tasks: | ||
| server_results = await make_deferred_yieldable( | ||
| defer.gatherResults( | ||
| query_tasks, | ||
| consumeErrors=True, | ||
| ) | ||
| ) | ||
|
|
||
| # Process results from each server | ||
| for result in server_results: | ||
| if result.get("limited", False): | ||
| limited = True | ||
| combined_results.extend(result.get("results", [])) | ||
|
|
||
| # Limit the total number of results | ||
| if len(combined_results) > limit: | ||
| combined_results = combined_results[:limit] | ||
| limited = True | ||
|
|
||
| return {"limited": limited, "results": combined_results} |
There was a problem hiding this comment.
This appears to be a left-over method that's not actually used anywhere besides tests?
| continue | ||
|
|
||
| user_id = user.get("user_id") | ||
| if not isinstance(user_id, str): | ||
| continue | ||
|
|
||
| display_name = user.get("display_name") | ||
| if not isinstance(display_name, str): | ||
| display_name = None | ||
|
|
||
| avatar_url = user.get("avatar_url") | ||
| if not isinstance(avatar_url, str): | ||
| avatar_url = None |
There was a problem hiding this comment.
It would help to log at the DEBUG level in these cases, to help debug why a certain user's attributes may not be appearing over federation.
| # Only load the federation-aware variant when the experimental feature | ||
| # is enabled; otherwise use the plain, federation-agnostic handler. | ||
| if self.config.experimental.bwi_federated_user_dir_enabled: | ||
| return UserDirectoryFederationHandler(self) |
There was a problem hiding this comment.
Given the methods within UserDirectoryFederationHandler are only called when the experimental feature is enabled, I see no reason to make a new subclass of UserDirectoryHandler. The experimental methods can just be added to UserDirectoryHandler to save yourself an indirection.
| "user_id": "@user:example.com", | ||
| "display_name": "Display Name", | ||
| "avatar_url": "mxc://example.com/avatar", | ||
| "m.user_directory.visibility": "local" |
There was a problem hiding this comment.
This field might be old? I don't see it elsewhere in the PR.
| except Exception as e: | ||
| # If something goes wrong, we still want to return what we have | ||
| logger.exception( | ||
| "Error searching user directory across federation[destination=%s] : %s", | ||
| destination, | ||
| e, | ||
| ) | ||
| return {"limited": False, "results": []} |
There was a problem hiding this comment.
You may wish to handle 429's separately (as the endpoint is rate-limited), i.e. not log an exception.
| ) | ||
| return response | ||
| except Exception as e: | ||
| # If something goes wrong, we still want to return what we have |
There was a problem hiding this comment.
If something goes wrong, we still want to return what we have
Which is... nothing?
| # If something goes wrong, we still want to return what we have | |
| # If something goes wrong, log and return an empty result set. |
| handler = cast( | ||
| "UserDirectoryFederationHandler", self.hs.get_user_directory_handler() | ||
| ) | ||
| await handler.upsert_remote_users(list(entries_by_user.values())) |
There was a problem hiding this comment.
There doesn't appear to be any mechanism for deleting users who have been deactivated on the remote homeserver.
Nor will ceasing federation with a homeserver ever remove those users from the local user directory (as they'll never leave the "fake" room).
Pre-MSC implementation of a federated user directory enabling remote user discovery beyond shared public and private rooms.
The functionality is gated behind the
experimental_features.bwi_federated_user_dir_enabledtoggle.The federated user directory is implemented by periodically fetching user infos from known remote homeservers and caching them locally. Synapse’s existing mechanisms for user search are used for responding to users’ search queries, thus the remote users’ info becomes part of the regular search response.
This change consists of roughly three parts:
/de.bwi.federated_user_dir/user_directory/search.user_directory,users_in_public_rooms) by using a sentinel room.Further implementation details and explanations are documented in more detail under bwi_federated_user_directory.md.
Note that in its current stage the feature is not viable for the open federation, but can sensibly be used for a limited and controlled set of federating homeservers.
Thanks to @TrevisGordan for doing the actual work on this PR!
Pull Request Checklist
EventStoretoEventWorkerStore.".code blocks.