Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 76 additions & 37 deletions synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,25 @@
import logging
from typing import TYPE_CHECKING, cast

import attr

from twisted.python.failure import Failure

from synapse.api.constants import Direction, EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.api.filtering import Filter
from synapse.events.utils import SerializeEventConfig
from synapse.events import EventBase
from synapse.handlers.relations import BundledAggregations
from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
from synapse.logging.opentracing import trace
from synapse.rest.admin._base import assert_user_is_admin
from synapse.streams.config import PaginationConfig
from synapse.types import (
JsonDict,
JsonMapping,
Requester,
ScheduledTask,
StreamKeyType,
StreamToken,
TaskStatus,
)
from synapse.types.handlers import ShutdownRoomParams, ShutdownRoomResponse
Expand Down Expand Up @@ -69,6 +72,55 @@
SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME = "shutdown_and_purge_room"


@attr.s(slots=True, frozen=True, auto_attribs=True)
class GetMessagesResult:
"""
Everything needed to serialize a `/messages` response.
"""

messages_chunk: list[EventBase]
"""
A list of room events.

When the request was `Direction.FORWARDS`, the events are in chronological order.

When the request was `Direction.BACKWARDS`, the events are in reverse chronological order.

Note that an empty chunk does not necessarily imply that no more events are
available. Clients should continue to paginate until no `end_token` property is returned.
"""

bundled_aggregations: dict[str, BundledAggregations]
"""
A map of event ID to the bundled aggregations for the events in the chunk.

If an event doesn't have any bundled aggregations, it may not appear in the map.
"""

state: list[EventBase] | None
"""
A list of state events relevant to showing the chunk. For example, if
lazy_load_members is enabled in the filter then this may contain the membership
events for the senders of events in the chunk.
"""
Comment on lines +102 to +106
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we should also state that this field is omitted from the response if it's none, as we do with end_token.

And perhaps especially note that empty lists will be omitted as well.


start_token: StreamToken
"""
Token corresponding to the start of chunk. This will be the same as the value given
in `from` query parameter of the `/messages` request.
"""

end_token: StreamToken | None
"""
A token corresponding to the end of chunk. This token can be passed back to this
endpoint to request further events.

If no further events are available (either because we have reached the start of the
timeline, or because the user does not have permission to see any more events), this
property is omitted from the response.
"""


class PaginationHandler:
"""Handles pagination and purge history requests.

Expand Down Expand Up @@ -417,7 +469,7 @@ async def get_messages(
as_client_event: bool = True,
event_filter: Filter | None = None,
use_admin_priviledge: bool = False,
) -> JsonDict:
) -> GetMessagesResult:
"""Get messages in a room.

Args:
Expand Down Expand Up @@ -616,10 +668,13 @@ async def get_messages(
# In that case we do not return end, to tell the client
# there is no need for further queries.
if not events:
return {
"chunk": [],
"start": await from_token.to_string(self.store),
}
return GetMessagesResult(
messages_chunk=[],
bundled_aggregations={},
state=None,
start_token=from_token,
end_token=None,
)

if event_filter:
events = await event_filter.filter(events)
Expand All @@ -635,11 +690,13 @@ async def get_messages(
# if after the filter applied there are no more events
# return immediately - but there might be more in next_token batch
if not events:
return {
"chunk": [],
"start": await from_token.to_string(self.store),
"end": await next_token.to_string(self.store),
}
return GetMessagesResult(
messages_chunk=[],
bundled_aggregations={},
state=None,
start_token=from_token,
end_token=next_token,
)

state = None
if event_filter and event_filter.lazy_load_members and len(events) > 0:
Expand All @@ -656,38 +713,20 @@ async def get_messages(

if state_ids:
state_dict = await self.store.get_events(list(state_ids.values()))
state = state_dict.values()
state = list(state_dict.values())

aggregations = await self._relations_handler.get_bundled_aggregations(
events, user_id
)

time_now = self.clock.time_msec()

serialize_options = SerializeEventConfig(
as_client_event=as_client_event, requester=requester
return GetMessagesResult(
messages_chunk=events,
bundled_aggregations=aggregations,
state=state,
start_token=from_token,
end_token=next_token,
)

chunk = {
"chunk": (
await self._event_serializer.serialize_events(
events,
time_now,
config=serialize_options,
bundle_aggregations=aggregations,
)
),
"start": await from_token.to_string(self.store),
"end": await next_token.to_string(self.store),
}

if state:
chunk["state"] = await self._event_serializer.serialize_events(
state, time_now, config=serialize_options
)

return chunk
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better separation of concerns. We now do the serializing/encoding at the REST layer instead of in the handler (see encode_messages_response)


async def _shutdown_and_purge_room(
self,
task: ScheduledTask,
Expand Down
48 changes: 45 additions & 3 deletions synapse/rest/client/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
format_event_for_client_v2,
serialize_event,
)
from synapse.handlers.pagination import GetMessagesResult
from synapse.http.server import HttpServer
from synapse.http.servlet import (
ResolveRoomIdMixin,
Expand All @@ -64,7 +65,7 @@
)
from synapse.http.site import SynapseRequest
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import set_tag
from synapse.logging.opentracing import set_tag, trace
from synapse.metrics import SERVER_NAME_LABEL
from synapse.rest.client._base import client_patterns
from synapse.rest.client.transactions import HttpTransactionCache
Expand Down Expand Up @@ -806,6 +807,7 @@ def __init__(self, hs: "HomeServer"):
self.pagination_handler = hs.get_pagination_handler()
self.auth = hs.get_auth()
self.store = hs.get_datastores().main
self.event_serializer = hs.get_event_client_serializer()

async def on_GET(
self, request: SynapseRequest, room_id: str
Expand Down Expand Up @@ -839,22 +841,62 @@ async def on_GET(
):
as_client_event = False

msgs = await self.pagination_handler.get_messages(
serialize_options = SerializeEventConfig(
as_client_event=as_client_event, requester=requester
)

get_messages_result = await self.pagination_handler.get_messages(
room_id=room_id,
requester=requester,
pagin_config=pagination_config,
as_client_event=as_client_event,
event_filter=event_filter,
)

response_content = await self.encode_response(
get_messages_result, serialize_options
)

processing_end_time = self.clock.time_msec()
room_member_count = await make_deferred_yieldable(room_member_count_deferred)
messsages_response_timer.labels(
room_size=_RoomSize.from_member_count(room_member_count),
**{SERVER_NAME_LABEL: self.server_name},
).observe((processing_end_time - processing_start_time) / 1000)

return 200, msgs
return 200, response_content

@trace
async def encode_response(
self,
get_messages_result: GetMessagesResult,
serialize_options: SerializeEventConfig,
) -> JsonDict:
time_now = self.clock.time_msec()

serialized_result = {
"chunk": (
await self.event_serializer.serialize_events(
get_messages_result.messages_chunk,
time_now,
config=serialize_options,
bundle_aggregations=get_messages_result.bundled_aggregations,
)
),
"start": await get_messages_result.start_token.to_string(self.store),
}

if get_messages_result.end_token:
serialized_result["end"] = await get_messages_result.end_token.to_string(
self.store
)

if get_messages_result.state:
serialized_result["state"] = await self.event_serializer.serialize_events(
get_messages_result.state, time_now, config=serialize_options
)

return serialized_result


# TODO: Needs unit testing
Expand Down