Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/19887.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Pre-MSC implementation of a federated user directory.
20 changes: 20 additions & 0 deletions synapse/config/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -625,3 +625,23 @@ def read_config(

# MSC4491: Invite reasons in room creation
self.msc4491_enabled: bool = experimental.get("msc4491_enabled", False)

# Pre-MSC implementation of federated user search.
self.bwi_federated_user_dir_enabled: bool = experimental.get(
"bwi_federated_user_dir_enabled", False
)

self.bwi_federated_user_dir_federation_search_timeout: int = experimental.get(
"bwi_federated_user_dir_federation_search_timeout", 2000

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use self.parse_duration here as well?

)

self.bwi_federated_user_dir_sync_interval_ms: int = self.parse_duration(
experimental.get("bwi_federated_user_dir_sync_interval", "4h")
)

if self.bwi_federated_user_dir_enabled:
if self.bwi_federated_user_dir_sync_interval_ms < 1:
raise ConfigError(
"experimental_features.bwi_federated_user_dir_sync_interval must "
"be positive"
)
215 changes: 214 additions & 1 deletion synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,14 @@
Optional,
Sequence,
TypeVar,
cast,
)

import attr
from prometheus_client import Counter

from twisted.internet import defer

from synapse.api.constants import Direction, EventContentFields, EventTypes, Membership
from synapse.api.errors import (
CodeMessageException,
Expand Down Expand Up @@ -68,15 +71,24 @@
from synapse.federation.transport.client import SendJoinResponse
from synapse.http.client import is_unknown_endpoint
from synapse.http.types import QueryParams
from synapse.logging.context import make_deferred_yieldable
from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, tag_args, trace
from synapse.metrics import SERVER_NAME_LABEL
from synapse.types import JsonDict, StrCollection, UserID, get_domain_from_id
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.types import (
JsonDict,
RemoteUserDirectoryEntry,
StrCollection,
UserID,
get_domain_from_id,
)
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.duration import Duration
from synapse.util.retryutils import NotRetryingDestination

if TYPE_CHECKING:
from synapse.handlers.user_directory import UserDirectoryFederationHandler
from synapse.server import HomeServer

logger = logging.getLogger(__name__)
Expand All @@ -88,6 +100,10 @@

PDU_RETRY_TIME_MS = 1 * 60 * 1000

# Localpart of the synthetic requester MXID used when querying remote homeservers
# during the periodic federated user directory sync.
FEDERATED_USER_DIR_SYNC_REQUESTER_LOCALPART = "_user_directory_sync"

T = TypeVar("T")


Expand Down Expand Up @@ -134,6 +150,9 @@ def __init__(self, hs: "HomeServer"):
self._clock.looping_call(self._clear_tried_cache, Duration(minutes=1))
self.state = hs.get_state_handler()
self.transport_layer = hs.get_federation_transport_client()
self.user_directory_search_timeout = (
hs.config.experimental.bwi_federated_user_dir_federation_search_timeout
)

self.server_name = hs.hostname
self.signing_key = hs.signing_key
Expand Down Expand Up @@ -170,6 +189,25 @@ def __init__(self, hs: "HomeServer"):
reset_expiry_on_get=False,
)

# Synthetic requester used for the federated user directory sync. The
# remote server requires the requester to belong to our server.
self._federated_user_dir_sync_requester = (
f"@{FEDERATED_USER_DIR_SYNC_REQUESTER_LOCALPART}:{self.server_name}"

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the point of making up a synthetic requester? It appears to not even be used on the receiving end (see on_user_directory_search_request). It's only used to determine that this user is on our homeserver, but federation authentication (via signing keys) should already prove that.

)

# Periodically sync remote homeservers' user directories into our own,
# but only on the worker that runs background tasks.
if (
hs.config.experimental.bwi_federated_user_dir_enabled
and hs.config.worker.run_background_tasks
):
self._clock.looping_call(
self._sync_federated_user_directory,
Duration(
milliseconds=hs.config.experimental.bwi_federated_user_dir_sync_interval_ms
),
)

def _clear_tried_cache(self) -> None:
"""Clear pdu_destination_tried cache"""
now = self._clock.time_msec()
Expand Down Expand Up @@ -1916,6 +1954,181 @@ def filter_user_id(user_id: str) -> bool:

return filtered_statuses, filtered_failures

async def user_directory_search(
self,
requester: str,
destination: str,
timeout: int,
) -> JsonDict:
"""Fetch users from the user directory of a remote server.

The federation endpoint always returns the remote server's full local
directory, so no result limit is sent.
Comment on lines +1963 to +1966

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""Fetch users from the user directory of a remote server.
The federation endpoint always returns the remote server's full local
directory, so no result limit is sent.
"""Fetch the full user directory of a remote server.


Args:
requester: The user that initiated the request.
destination: The server to query.
timeout: Timeout in milliseconds for the request.

Returns:
The results containing a list of users from the remote directory.
"""
try:
response = await self.transport_layer.user_directory_search(
requester, destination, timeout
)
return response
except Exception as e:
# If something goes wrong, we still want to return what we have

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If something goes wrong, we still want to return what we have

Which is... nothing?

Suggested change
# If something goes wrong, we still want to return what we have
# If something goes wrong, log and return an empty result set.

logger.exception(
"Error searching user directory across federation[destination=%s] : %s",
destination,
e,
)
return {"limited": False, "results": []}
Comment on lines +1981 to +1988

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may wish to handle 429s separately (as the endpoint is rate-limited), i.e. not log an exception for them.


async def search_user_directory_across_federation(
    self,
    requester: str,
    destinations: Collection[str],
    limit: int = 10,
) -> JsonDict:
    """Fetch users from the directories of multiple federated servers.

    Each remote destination is queried in parallel through
    `user_directory_search`, and the responses are merged in the order the
    destinations were given.

    Args:
        requester: The user that initiated the request.
        destinations: The servers to query. Entries for which
            `_is_mine_server_name` returns true are skipped, and repeated
            entries are only queried once.
        limit: Maximum number of results to return in total across all
            servers (not per server). Negative values are treated as 0.

    Returns:
        A dict of the form `{"limited": bool, "results": [...]}`. `limited`
        is True if any server reported a limited response, or if the
        combined results had to be truncated to `limit`.
    """
    # De-duplicate while preserving order so that no server is queried
    # twice, and drop destinations that refer to this server.
    remote_destinations = [
        destination
        for destination in dict.fromkeys(destinations)
        if not self._is_mine_server_name(destination)
    ]
    if not remote_destinations:
        return {"limited": False, "results": []}

    # `user_directory_search` is a coroutine; wrap each call in a Deferred
    # so that all servers are queried in parallel.
    query_tasks = [
        defer.ensureDeferred(
            self.user_directory_search(
                requester,
                destination,
                self.user_directory_search_timeout,
            )
        )
        for destination in remote_destinations
    ]
    server_results = await make_deferred_yieldable(
        defer.gatherResults(
            query_tasks,
            consumeErrors=True,
        )
    )

    combined_results = []
    limited = False
    for result in server_results:
        # Responses originate from remote servers and are untrusted: ignore
        # anything that does not have the expected shape rather than
        # failing the whole search.
        if not isinstance(result, dict):
            continue

        if result.get("limited", False):
            limited = True

        remote_users = result.get("results", [])
        if isinstance(remote_users, list):
            combined_results.extend(remote_users)

    # Cap the total number of results. Clamp at zero so that a negative
    # limit cannot turn the slice into "drop the last N entries".
    max_results = max(limit, 0)
    if len(combined_results) > max_results:
        combined_results = combined_results[:max_results]
        limited = True

    return {"limited": limited, "results": combined_results}
Comment on lines +1990 to +2048

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This appears to be a left-over method that's not actually used anywhere besides tests?


@staticmethod
def _parse_remote_user_directory_results(
response: JsonDict,
) -> list[RemoteUserDirectoryEntry]:
"""Parse a remote user directory search response into typed entries.

Malformed entries are skipped. This keeps the federation wire format
contained within the federation layer.
"""
entries: list[RemoteUserDirectoryEntry] = []
for user in response.get("results", []):
if not isinstance(user, dict):
continue

user_id = user.get("user_id")
if not isinstance(user_id, str):
continue

display_name = user.get("display_name")
if not isinstance(display_name, str):
display_name = None

avatar_url = user.get("avatar_url")
if not isinstance(avatar_url, str):
avatar_url = None
Comment on lines +2062 to +2074

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would help to log at the DEBUG level in these cases, to help debug why a certain user's attributes may not be appearing over federation.


entries.append(
RemoteUserDirectoryEntry(
user_id=user_id,
display_name=display_name,
avatar_url=avatar_url,
)
)

return entries

@wrap_as_background_process("federated_user_directory_sync")
async def _sync_federated_user_directory(self) -> None:
"""Periodically fetch users from known homeservers' user directories
and hand them to the local user directory for storage.

This is the federation-side background job: it decides which servers to
contact and how to query them. Persisting the results is delegated to
the user directory handler, which knows nothing about federation.
"""
destinations = await self.store.get_known_destinations()
if not destinations:
logger.debug("Federated user directory sync: no known destinations")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
logger.debug("Federated user directory sync: no known destinations")
logger.debug("ending federated user directory sync: no known destinations")

return

# De-duplicate by user id across destinations.
entries_by_user: dict[str, RemoteUserDirectoryEntry] = {}

for destination in destinations:
if self._is_mine_server_name(destination):
continue

response = await self.user_directory_search(
self._federated_user_dir_sync_requester,
destination,
self.user_directory_search_timeout,
)

for entry in self._parse_remote_user_directory_results(response):
entries_by_user[entry.user_id] = entry

if not entries_by_user:
logger.debug("Federated user directory sync found no remote users")
return

# This job is only scheduled when the feature is enabled, in which case
# the homeserver exposes the federation-aware handler variant.
handler = cast(
"UserDirectoryFederationHandler", self.hs.get_user_directory_handler()
)
await handler.upsert_remote_users(list(entries_by_user.values()))

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There doesn't appear to be any mechanism for deleting users who have been deactivated on the remote homeserver.

Nor will ceasing federation with a homeserver ever remove those users from the local user directory (as they'll never leave the "fake" room).


logger.info(
"Federated user directory sync upserted %d remote users",
len(entries_by_user),
)
Comment on lines +2127 to +2130

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can also be at the DEBUG level.


async def federation_download_media(
self,
destination: str,
Expand Down
51 changes: 50 additions & 1 deletion synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,13 @@
from synapse.storage.databases.main.lock import Lock
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
from synapse.storage.roommember import MemberSummary
from synapse.types import JsonDict, StateMap, UserID, get_domain_from_id
from synapse.types import (
JsonDict,
JsonMapping,
StateMap,
UserID,
get_domain_from_id,
)
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import Linearizer, concurrently_execute, gather_results
from synapse.util.caches.response_cache import ResponseCache
Expand Down Expand Up @@ -1477,6 +1483,49 @@ async def check_server_matches_acl(self, server_name: str, room_id: str) -> None
):
raise AuthError(code=403, msg="Server is banned from room")

async def on_user_directory_search_request(
    self, requester_id: str, origin: str
) -> tuple[int, JsonMapping]:
    """Serve a remote homeserver's request for our user directory.

    No search term is involved: the response body is whatever
    `_search_all_users` returns, independent of who is asking.

    Args:
        requester_id: The user ID of the requester on the origin server.
            Not consulted when building the response.
        origin: The server that sent the request. Not consulted when
            building the response.

    Returns:
        A tuple of (response code, response json); the code is always 200.
    """
    directory = await self._search_all_users()
    return 200, directory

async def _search_all_users(self) -> JsonDict:
"""Return all of this server's own users from the user directory.

Reads the directory straight from the database and filters to locally
owned users, since the federation endpoint must only expose this
homeserver's own users (the table may also hold cached remote users).

Returns:
A dict of the form ``{"limited": False, "results": [...]}``.
"""
results = await self.store.get_users_in_user_dir()

# Federation endpoint: only return users local to this homeserver.
filtered_results = []
for user in results.get("results", []):
try:
if self.hs.is_mine_id(user["user_id"]):
filtered_results.append(user)
except SynapseError:
# Ignore malformed user IDs.
continue
Comment on lines +1518 to +1523

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is_mine_id will never raise a SynapseError.

In fact, is_mine_id appears to currently be infallible (as long as user_id is a str):

synapse/synapse/server.py

Lines 682 to 693 in 2517db4

def is_mine_id(self, user_id: str) -> bool:
"""Determines whether a user ID or room alias originates from this homeserver.
Returns:
`True` if the hostname part of the user ID or room alias matches this
homeserver.
`False` otherwise, or if the user ID or room alias is malformed.
"""
localpart_hostname = user_id.split(":", 1)
if len(localpart_hostname) < 2:
return False
return localpart_hostname[1] == self.hostname


# The federation endpoint never truncates: it always returns the full
# local directory.
return {"limited": False, "results": filtered_results}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why define limited if it's always False?



class FederationHandlerRegistry:
"""Allows classes to register themselves as handlers for a given EDU or
Expand Down
Loading
Loading