2121import logging
2222from typing import TYPE_CHECKING , cast
2323
24+ import attr
25+
2426from twisted .python .failure import Failure
2527
2628from synapse .api .constants import Direction , EventTypes , Membership
2729from synapse .api .errors import SynapseError
2830from synapse .api .filtering import Filter
29- from synapse .events .utils import SerializeEventConfig
31+ from synapse .events import EventBase
32+ from synapse .handlers .relations import BundledAggregations
3033from synapse .handlers .worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
3134from synapse .logging .opentracing import trace
3235from synapse .rest .admin ._base import assert_user_is_admin
3336from synapse .streams .config import PaginationConfig
3437from synapse .types import (
35- JsonDict ,
3638 JsonMapping ,
3739 Requester ,
3840 ScheduledTask ,
3941 StreamKeyType ,
42+ StreamToken ,
4043 TaskStatus ,
4144)
4245from synapse .types .handlers import ShutdownRoomParams , ShutdownRoomResponse
7073SHUTDOWN_AND_PURGE_ROOM_ACTION_NAME = "shutdown_and_purge_room"
7174
7275
76+ @attr .s (slots = True , frozen = True , auto_attribs = True )
77+ class GetMessagesResult :
78+ """
79+ Everything needed to serialize a `/messages` response.
80+ """
81+
82+ messages_chunk : list [EventBase ]
83+ """
84+ A list of room events.
85+
86+ - When the request is `Direction.FORWARDS`, events will be in the range:
87+ `start_token` < x <= `end_token`, (ascending topological_order)
88+ - When the request is `Direction.BACKWARDS`, events will be in the range:
89+ `start_token` >= x > `end_token`, (descending topological_order)
90+
91+ Note that an empty chunk does not necessarily imply that no more events are
92+ available. Clients should continue to paginate until no `end_token` property is returned.
93+ """
94+
95+ bundled_aggregations : dict [str , BundledAggregations ]
96+ """
97+ A map of event ID to the bundled aggregations for the events in the chunk.
98+
99+ If an event doesn't have any bundled aggregations, it may not appear in the map.
100+ """
101+
102+ state : list [EventBase ] | None
103+ """
104+ A list of state events relevant to showing the chunk. For example, if
105+ lazy_load_members is enabled in the filter then this may contain the membership
106+ events for the senders of events in the chunk.
107+
108+ Omitted from the response when `None`.
109+ """
110+
111+ start_token : StreamToken
112+ """
113+ Token corresponding to the start of chunk. This will be the same as the value given
114+ in `from` query parameter of the `/messages` request.
115+ """
116+
117+ end_token : StreamToken | None
118+ """
119+ A token corresponding to the end of chunk. This token can be passed back to this
120+ endpoint to request further events.
121+
122+ If no further events are available (either because we have reached the start of the
123+ timeline, or because the user does not have permission to see any more events), this
124+ property is omitted from the response.
125+ """
126+
127+
73128class PaginationHandler :
74129 """Handles pagination and purge history requests.
75130
@@ -418,7 +473,7 @@ async def get_messages(
418473 as_client_event : bool = True ,
419474 event_filter : Filter | None = None ,
420475 use_admin_priviledge : bool = False ,
421- ) -> JsonDict :
476+ ) -> GetMessagesResult :
422477 """Get messages in a room.
423478
424479 Args:
@@ -617,10 +672,13 @@ async def get_messages(
617672 # In that case we do not return end, to tell the client
618673 # there is no need for further queries.
619674 if not events :
620- return {
621- "chunk" : [],
622- "start" : await from_token .to_string (self .store ),
623- }
675+ return GetMessagesResult (
676+ messages_chunk = [],
677+ bundled_aggregations = {},
678+ state = None ,
679+ start_token = from_token ,
680+ end_token = None ,
681+ )
624682
625683 if event_filter :
626684 events = await event_filter .filter (events )
@@ -636,11 +694,13 @@ async def get_messages(
636694 # if after the filter applied there are no more events
637695 # return immediately - but there might be more in next_token batch
638696 if not events :
639- return {
640- "chunk" : [],
641- "start" : await from_token .to_string (self .store ),
642- "end" : await next_token .to_string (self .store ),
643- }
697+ return GetMessagesResult (
698+ messages_chunk = [],
699+ bundled_aggregations = {},
700+ state = None ,
701+ start_token = from_token ,
702+ end_token = next_token ,
703+ )
644704
645705 state = None
646706 if event_filter and event_filter .lazy_load_members and len (events ) > 0 :
@@ -657,38 +717,20 @@ async def get_messages(
657717
658718 if state_ids :
659719 state_dict = await self .store .get_events (list (state_ids .values ()))
660- state = state_dict .values ()
720+ state = list ( state_dict .values () )
661721
662722 aggregations = await self ._relations_handler .get_bundled_aggregations (
663723 events , user_id
664724 )
665725
666- time_now = self .clock .time_msec ()
667-
668- serialize_options = SerializeEventConfig (
669- as_client_event = as_client_event , requester = requester
726+ return GetMessagesResult (
727+ messages_chunk = events ,
728+ bundled_aggregations = aggregations ,
729+ state = state ,
730+ start_token = from_token ,
731+ end_token = next_token ,
670732 )
671733
672- chunk = {
673- "chunk" : (
674- await self ._event_serializer .serialize_events (
675- events ,
676- time_now ,
677- config = serialize_options ,
678- bundle_aggregations = aggregations ,
679- )
680- ),
681- "start" : await from_token .to_string (self .store ),
682- "end" : await next_token .to_string (self .store ),
683- }
684-
685- if state :
686- chunk ["state" ] = await self ._event_serializer .serialize_events (
687- state , time_now , config = serialize_options
688- )
689-
690- return chunk
691-
692734 async def _shutdown_and_purge_room (
693735 self ,
694736 task : ScheduledTask ,
0 commit comments