diff --git a/lib/model/message_list.dart b/lib/model/message_list.dart index 4f555f7124..a9535136b2 100644 --- a/lib/model/message_list.dart +++ b/lib/model/message_list.dart @@ -403,6 +403,7 @@ class MessageListView with ChangeNotifier, _MessageSequence { numAfter: 0, ); store.reconcileMessages(result.messages); + store.recentSenders.handleMessages(result.messages); // TODO(#824) for (final message in result.messages) { if (_messageVisible(message)) { _addMessage(message); @@ -439,6 +440,7 @@ class MessageListView with ChangeNotifier, _MessageSequence { } store.reconcileMessages(result.messages); + store.recentSenders.handleMessages(result.messages); // TODO(#824) final fetchedMessages = _allMessagesVisible ? result.messages // Avoid unnecessarily copying the list. diff --git a/lib/model/recent_senders.dart b/lib/model/recent_senders.dart new file mode 100644 index 0000000000..d075d05eee --- /dev/null +++ b/lib/model/recent_senders.dart @@ -0,0 +1,149 @@ +import 'package:collection/collection.dart'; +import 'package:flutter/foundation.dart'; + +import '../api/model/events.dart'; +import '../api/model/model.dart'; +import 'algorithms.dart'; + +/// Tracks the latest messages sent by each user, in each stream and topic. +/// +/// Use [latestMessageIdOfSenderInStream] and [latestMessageIdOfSenderInTopic] +/// for queries. +class RecentSenders { + // streamSenders[streamId][senderId] = MessageIdTracker + @visibleForTesting + final Map> streamSenders = {}; + + // topicSenders[streamId][topic][senderId] = MessageIdTracker + @visibleForTesting + final Map>> topicSenders = {}; + + /// The latest message the given user sent to the given stream, + /// or null if no such message is known. + int? latestMessageIdOfSenderInStream({ + required int streamId, + required int senderId, + }) => streamSenders[streamId]?[senderId]?.maxId; + + /// The latest message the given user sent to the given topic, + /// or null if no such message is known. + int? latestMessageIdOfSenderInTopic({ + required int streamId, + required String topic, + required int senderId, + }) => topicSenders[streamId]?[topic]?[senderId]?.maxId; + + /// Records the necessary data from a batch of just-fetched messages. + /// + /// The messages must be sorted by [Message.id] ascending. + void handleMessages(List messages) { + final messagesByUserInStream = <(int, int), QueueList>{}; + final messagesByUserInTopic = <(int, String, int), QueueList>{}; + for (final message in messages) { + if (message is! StreamMessage) continue; + final StreamMessage(:streamId, :topic, :senderId, id: int messageId) = message; + (messagesByUserInStream[(streamId, senderId)] ??= QueueList()).add(messageId); + (messagesByUserInTopic[(streamId, topic, senderId)] ??= QueueList()).add(messageId); + } + + for (final entry in messagesByUserInStream.entries) { + final (streamId, senderId) = entry.key; + ((streamSenders[streamId] ??= {}) + [senderId] ??= MessageIdTracker()).addAll(entry.value); + } + for (final entry in messagesByUserInTopic.entries) { + final (streamId, topic, senderId) = entry.key; + (((topicSenders[streamId] ??= {})[topic] ??= {}) + [senderId] ??= MessageIdTracker()).addAll(entry.value); + } + } + + /// Records the necessary data from a new message. + void handleMessage(Message message) { + if (message is! StreamMessage) return; + final StreamMessage(:streamId, :topic, :senderId, id: int messageId) = message; + ((streamSenders[streamId] ??= {}) + [senderId] ??= MessageIdTracker()).add(messageId); + (((topicSenders[streamId] ??= {})[topic] ??= {}) + [senderId] ??= MessageIdTracker()).add(messageId); + } + + void handleDeleteMessageEvent(DeleteMessageEvent event, Map cachedMessages) { + if (event.messageType != MessageType.stream) return; + + final messagesByUser = >{}; + for (final id in event.messageIds) { + final message = cachedMessages[id] as StreamMessage?; + if (message == null) continue; + (messagesByUser[message.senderId] ??= []).add(id); + } + + final DeleteMessageEvent(:streamId!, :topic!) = event; + final sendersInStream = streamSenders[streamId]; + final topicsInStream = topicSenders[streamId]; + final sendersInTopic = topicsInStream?[topic]; + for (final entry in messagesByUser.entries) { + final MapEntry(key: senderId, value: messages) = entry; + + final streamTracker = sendersInStream?[senderId]; + streamTracker?.removeAll(messages); + if (streamTracker?.maxId == null) sendersInStream?.remove(senderId); + + final topicTracker = sendersInTopic?[senderId]; + topicTracker?.removeAll(messages); + if (topicTracker?.maxId == null) sendersInTopic?.remove(senderId); + } + if (sendersInStream?.isEmpty ?? false) streamSenders.remove(streamId); + if (sendersInTopic?.isEmpty ?? false) topicsInStream?.remove(topic); + if (topicsInStream?.isEmpty ?? false) topicSenders.remove(streamId); + } +} + +@visibleForTesting +class MessageIdTracker { + /// A list of distinct message IDs, sorted ascending. + @visibleForTesting + QueueList ids = QueueList(); + + /// The maximum id in the tracker list, or `null` if the list is empty. + int? get maxId => ids.lastOrNull; + + /// Add the message ID to the tracker list at the proper place, if not present. + /// + /// Optimized, taking O(1) time for the case where that place is the end, + /// because that's the common case for a message that is received through + /// [PerAccountStore.handleEvent]. May take O(n) time in some rare cases. + void add(int id) { + if (ids.isEmpty || id > ids.last) { + ids.addLast(id); + return; + } + final i = lowerBound(ids, id); + if (i < ids.length && ids[i] == id) { + // The ID is already present. Nothing to do. + return; + } + ids.insert(i, id); + } + + /// Add the messages IDs to the tracker list at the proper place, if not present. + /// + /// [newIds] should be sorted ascending. + void addAll(QueueList newIds) { + if (ids.isEmpty) { + ids = newIds; + return; + } + ids = setUnion(ids, newIds); + } + + void removeAll(List idsToRemove) { + ids.removeWhere((id) { + final i = lowerBound(idsToRemove, id); + return i < idsToRemove.length && idsToRemove[i] == id; + }); + } + + @override + String toString() => ids.toString(); +} diff --git a/lib/model/store.dart b/lib/model/store.dart index 0b27a91e9e..953d71283e 100644 --- a/lib/model/store.dart +++ b/lib/model/store.dart @@ -23,6 +23,7 @@ import 'database.dart'; import 'message.dart'; import 'message_list.dart'; import 'recent_dm_conversations.dart'; +import 'recent_senders.dart'; import 'channel.dart'; import 'typing_status.dart'; import 'unreads.dart'; @@ -256,6 +257,7 @@ class PerAccountStore extends ChangeNotifier with ChannelStore, MessageStore { ), recentDmConversationsView: RecentDmConversationsView( initial: initialSnapshot.recentPrivateConversations, selfUserId: account.userId), + recentSenders: RecentSenders(), ); } @@ -276,6 +278,7 @@ class PerAccountStore extends ChangeNotifier with ChannelStore, MessageStore { required MessageStoreImpl messages, required this.unreads, required this.recentDmConversationsView, + required this.recentSenders, }) : assert(selfUserId == globalStore.getAccount(accountId)!.userId), assert(realmUrl == globalStore.getAccount(accountId)!.realmUrl), assert(realmUrl == connection.realmUrl), @@ -369,6 +372,8 @@ class PerAccountStore extends ChangeNotifier with ChannelStore, MessageStore { final RecentDmConversationsView recentDmConversationsView; + final RecentSenders recentSenders; + //////////////////////////////// // Other digests of data. @@ -492,6 +497,7 @@ class PerAccountStore extends ChangeNotifier with ChannelStore, MessageStore { _messages.handleMessageEvent(event); unreads.handleMessageEvent(event); recentDmConversationsView.handleMessageEvent(event); + recentSenders.handleMessage(event.message); // TODO(#824) // When adding anything here (to handle [MessageEvent]), // it probably belongs in [reconcileMessages] too. @@ -502,6 +508,11 @@ class PerAccountStore extends ChangeNotifier with ChannelStore, MessageStore { case DeleteMessageEvent(): assert(debugLog("server event: delete_message ${event.messageIds}")); + // This should be called before [_messages.handleDeleteMessageEvent(event)], + // as we need to know about each message for [event.messageIds], + // specifically, their `senderId`s. By calling this after the + // aforementioned line, we'll lose reference to those messages. + recentSenders.handleDeleteMessageEvent(event, messages); _messages.handleDeleteMessageEvent(event); unreads.handleDeleteMessageEvent(event); diff --git a/test/model/autocomplete_test.dart b/test/model/autocomplete_test.dart index 03fbaa9c28..5493598200 100644 --- a/test/model/autocomplete_test.dart +++ b/test/model/autocomplete_test.dart @@ -296,6 +296,7 @@ void main() { check(done).isFalse(); for (int i = 0; i < 3; i++) { await Future(() {}); + if (done) break; } check(done).isTrue(); final results = view.results diff --git a/test/model/message_list_test.dart b/test/model/message_list_test.dart index 812a8bcdda..65fda053c5 100644 --- a/test/model/message_list_test.dart +++ b/test/model/message_list_test.dart @@ -18,6 +18,7 @@ import '../api/model/model_checks.dart'; import '../example_data.dart' as eg; import '../stdlib_checks.dart'; import 'content_checks.dart'; +import 'recent_senders_test.dart' as recent_senders_test; import 'test_store.dart'; void main() { @@ -141,6 +142,25 @@ void main() { ..haveOldest.isTrue(); }); + // TODO(#824): move this test + test('fetchInitial, recent senders track all the messages', () async { + const narrow = CombinedFeedNarrow(); + await prepare(narrow: narrow); + final messages = [ + eg.streamMessage(), + // Not subscribed to the stream with id 10. + eg.streamMessage(stream: eg.stream(streamId: 10)), + ]; + connection.prepare(json: newestResult( + foundOldest: false, + messages: messages, + ).toJson()); + await model.fetchInitial(); + + check(model).messages.length.equals(1); + recent_senders_test.checkMatchesMessages(store.recentSenders, messages); + }); + test('fetchOlder', () async { const narrow = CombinedFeedNarrow(); await prepare(narrow: narrow); @@ -233,6 +253,27 @@ void main() { ..messages.length.equals(200); }); + // TODO(#824): move this test + test('fetchOlder, recent senders track all the messages', () async { + const narrow = CombinedFeedNarrow(); + await prepare(narrow: narrow); + final initialMessages = List.generate(10, (i) => eg.streamMessage(id: 100 + i)); + await prepareMessages(foundOldest: false, messages: initialMessages); + + final oldMessages = List.generate(10, (i) => eg.streamMessage(id: 89 + i)) + // Not subscribed to the stream with id 10. + ..add(eg.streamMessage(id: 99, stream: eg.stream(streamId: 10))); + connection.prepare(json: olderResult( + anchor: 100, foundOldest: false, + messages: oldMessages, + ).toJson()); + await model.fetchOlder(); + + check(model).messages.length.equals(20); + recent_senders_test.checkMatchesMessages(store.recentSenders, + [...initialMessages, ...oldMessages]); + }); + test('MessageEvent', () async { final stream = eg.stream(); await prepare(narrow: StreamNarrow(stream.streamId)); diff --git a/test/model/recent_senders_test.dart b/test/model/recent_senders_test.dart new file mode 100644 index 0000000000..2da516e11c --- /dev/null +++ b/test/model/recent_senders_test.dart @@ -0,0 +1,213 @@ +import 'package:checks/checks.dart'; +import 'package:flutter_test/flutter_test.dart'; +import 'package:zulip/api/model/model.dart'; +import 'package:zulip/model/recent_senders.dart'; +import '../example_data.dart' as eg; + +/// [messages] should be sorted by [id] ascending. +void checkMatchesMessages(RecentSenders model, List messages) { + final Map>> messagesByUserInStream = {}; + final Map>>> messagesByUserInTopic = {}; + for (final message in messages) { + if (message is! StreamMessage) { + throw UnsupportedError('Message of type ${message.runtimeType} is not expected.'); + } + + final StreamMessage(:streamId, :topic, :senderId, id: int messageId) = message; + + ((messagesByUserInStream[streamId] ??= {}) + [senderId] ??= {}).add(messageId); + (((messagesByUserInTopic[streamId] ??= {})[topic] ??= {}) + [senderId] ??= {}).add(messageId); + } + + final actualMessagesByUserInStream = model.streamSenders.map((streamId, sendersByStream) => + MapEntry(streamId, sendersByStream.map((senderId, tracker) => + MapEntry(senderId, Set.from(tracker.ids))))); + final actualMessagesByUserInTopic = model.topicSenders.map((streamId, topicsByStream) => + MapEntry(streamId, topicsByStream.map((topic, sendersByTopic) => + MapEntry(topic, sendersByTopic.map((senderId, tracker) => + MapEntry(senderId, Set.from(tracker.ids))))))); + + check(actualMessagesByUserInStream).deepEquals(messagesByUserInStream); + check(actualMessagesByUserInTopic).deepEquals(messagesByUserInTopic); +} + +void main() { + test('starts with empty stream and topic senders', () { + final model = RecentSenders(); + checkMatchesMessages(model, []); + }); + + group('RecentSenders.handleMessage', () { + test('stream message gets included', () { + final model = RecentSenders(); + final streamMessage = eg.streamMessage(); + model.handleMessage(streamMessage); + checkMatchesMessages(model, [streamMessage]); + }); + + test('DM message gets ignored', () { + final model = RecentSenders(); + final dmMessage = eg.dmMessage(from: eg.selfUser, to: [eg.otherUser]); + model.handleMessage(dmMessage); + checkMatchesMessages(model, []); + }); + }); + + group('RecentSenders.handleMessages', () { + late RecentSenders model; + final streamA = eg.stream(); + final streamB = eg.stream(); + final userX = eg.user(); + final userY = eg.user(); + + void setupModel(List messages) { + model = RecentSenders(); + for (final message in messages) { + model.handleMessage(message); + } + } + + void checkHandleMessages(List oldMessages, List newMessages) { + setupModel(oldMessages); + model.handleMessages(newMessages); + final expectedMessages = [...oldMessages, ...newMessages] + ..removeWhere((m) => m is! StreamMessage); + checkMatchesMessages(model, expectedMessages); + } + + group('single tracker', () { + void checkHandleMessagesSingle(List oldIds, List newIds) { + checkHandleMessages([ + for (final id in oldIds) + eg.streamMessage(stream: streamA, topic: 'a', sender: userX, id: id), + ], [ + for (final id in newIds) + eg.streamMessage(stream: streamA, topic: 'a', sender: userX, id: id), + ]); + } + + test('batch goes before the existing messages', () { + checkHandleMessagesSingle([300, 400], [100, 200]); + }); + + test('batch goes after the existing messages', () { + checkHandleMessagesSingle([300, 400], [500, 600]); + }); + + test('batch is interspersed among the existing messages', () { + checkHandleMessagesSingle([200, 400], [100, 300, 500]); + }); + + test('batch contains some of already-existing messages', () { + checkHandleMessagesSingle([200, 300, 400], [100, 200, 400, 500]); + }); + }); + + test('batch with both DM and stream messages -> ignores DM, processes stream messages', () { + checkHandleMessages([], [ + eg.streamMessage(stream: streamA, topic: 'thing', sender: userX), + eg.dmMessage(from: eg.otherUser, to: [eg.selfUser]), + eg.streamMessage(stream: streamA, topic: 'thing', sender: userX), + ]); + }); + + test('add new sender', () { + checkHandleMessages( + [eg.streamMessage(stream: streamA, topic: 'thing', sender: userX)], + [eg.streamMessage(stream: streamA, topic: 'thing', sender: userY)]); + }); + + test('add new topic', () { + checkHandleMessages( + [eg.streamMessage(stream: streamA, topic: 'thing', sender: userX)], + [eg.streamMessage(stream: streamA, topic: 'other', sender: userX)]); + }); + + test('add new stream', () { + checkHandleMessages( + [eg.streamMessage(stream: streamA, topic: 'thing', sender: userX)], + [eg.streamMessage(stream: streamB, topic: 'thing', sender: userX)]); + }); + + test('multiple conversations and senders interspersed', () { + checkHandleMessages([], [ + eg.streamMessage(stream: streamA, topic: 'thing', sender: userX), + eg.streamMessage(stream: streamA, topic: 'other', sender: userX), + eg.streamMessage(stream: streamB, topic: 'thing', sender: userX), + eg.streamMessage(stream: streamA, topic: 'thing', sender: userY), + eg.streamMessage(stream: streamA, topic: 'thing', sender: userX), + ]); + }); + }); + + test('RecentSenders.handleDeleteMessageEvent', () { + final model = RecentSenders(); + final stream = eg.stream(); + final userX = eg.user(); + final userY = eg.user(); + + final messages = [ + eg.streamMessage(stream: stream, topic: 'thing', sender: userX), + eg.streamMessage(stream: stream, topic: 'other', sender: userX), + eg.streamMessage(stream: stream, topic: 'thing', sender: userY), + ]; + + model.handleMessages(messages); + checkMatchesMessages(model, messages); + + model.handleDeleteMessageEvent(eg.deleteMessageEvent([messages[0], messages[2]]), + Map.fromEntries(messages.map((msg) => MapEntry(msg.id, msg)))); + + checkMatchesMessages(model, [messages[1]]); + }); + + test('RecentSenders.latestMessageIdOfSenderInStream', () { + final model = RecentSenders(); + final stream1 = eg.stream(streamId: 1); + final user10 = eg.user(userId: 10); + + final messages = [ + eg.streamMessage(stream: stream1, sender: user10, id: 100), + eg.streamMessage(stream: stream1, sender: user10, id: 200), + eg.streamMessage(stream: stream1, sender: user10, id: 300), + ]; + + model.handleMessages(messages); + + check(model.latestMessageIdOfSenderInStream( + streamId: 1, senderId: 10)).equals(300); + // No message of user 20 in stream1. + check(model.latestMessageIdOfSenderInStream( + streamId: 1, senderId: 20)).equals(null); + // No message in stream 2 at all. + check(model.latestMessageIdOfSenderInStream( + streamId: 2, senderId: 10)).equals(null); + }); + + test('RecentSenders.latestMessageIdOfSenderInTopic', () { + final model = RecentSenders(); + final stream1 = eg.stream(streamId: 1); + final user10 = eg.user(userId: 10); + + final messages = [ + eg.streamMessage(stream: stream1, topic: 'a', sender: user10, id: 200), + eg.streamMessage(stream: stream1, topic: 'a', sender: user10, id: 300), + ]; + + model.handleMessages(messages); + + check(model.latestMessageIdOfSenderInTopic(streamId: 1, + topic: 'a', senderId: 10)).equals(300); + // No message of user 20 in topic "a". + check(model.latestMessageIdOfSenderInTopic(streamId: 1, + topic: 'a', senderId: 20)).equals(null); + // No message in topic "b" at all. + check(model.latestMessageIdOfSenderInTopic(streamId: 1, + topic: 'b', senderId: 10)).equals(null); + // No message in stream 2 at all. + check(model.latestMessageIdOfSenderInTopic(streamId: 2, + topic: 'a', senderId: 10)).equals(null); + }); +}