Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/18877.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix an issue which could cause room state to diverge from other servers under bad federation connectivity.
20 changes: 17 additions & 3 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -1306,9 +1306,17 @@ async def _get_state_ids_after_missing_prev_event(
await self._get_state_and_persist(destination, room_id, event_id)
else:
logger.debug("Fetching %i events from remote", len(missing_event_ids))
await self._get_events_and_persist(
persisted_all = await self._get_events_and_persist(
destination=destination, room_id=room_id, event_ids=missing_event_ids
)
# We cannot continue here: doing so would persist the prev event in question with
# known-bad room state that is missing events, which would cause our state to drift
# from other servers. Fall back to requesting the entire state instead.
if not persisted_all:
logger.warning(
"Failed to fetch missed state via /event calls, requesting complete state"
)
await self._get_state_and_persist(destination, room_id, event_id)

# We now need to fill out the state map, which involves fetching the
# type and state key for each event ID in the state.
Expand Down Expand Up @@ -1399,9 +1407,11 @@ async def _get_state_and_persist(

# we also need the event itself.
if not await self._store.have_seen_event(room_id, event_id):
await self._get_events_and_persist(
persisted = await self._get_events_and_persist(
destination=destination, room_id=room_id, event_ids=(event_id,)
)
if not persisted:
raise Exception(f"failed to retrieve and persist event {event_id}")

@trace
async def _process_received_pdu(
Expand Down Expand Up @@ -1598,14 +1608,17 @@ async def backfill_event_id(
@tag_args
async def _get_events_and_persist(
self, destination: str, room_id: str, event_ids: StrCollection
) -> None:
) -> bool:
"""Fetch the given events from a server, and persist them as outliers.

This function *does not* recursively get missing auth events of the
newly fetched events. Callers must include in the `event_ids` argument
any missing events from the auth chain.

Logs a warning if we can't find the given event.

Returns:
True if as many events were fetched as were requested (i.e. none were
missing), False otherwise.
"""

room_version = await self._store.get_room_version(room_id)
Expand Down Expand Up @@ -1640,6 +1653,7 @@ async def get_event(event_id: str) -> None:
await concurrently_execute(get_event, event_ids, 5)
logger.info("Fetched %i events of %i requested", len(events), len(event_ids))
await self._auth_and_persist_outliers(room_id, events)
return len(events) == len(event_ids)

@trace
async def _auth_and_persist_outliers(
Expand Down
Loading