diff --git a/protocol/communities/community.go b/protocol/communities/community.go index 429b74c140..49e632d8ef 100644 --- a/protocol/communities/community.go +++ b/protocol/communities/community.go @@ -1552,8 +1552,14 @@ func (o *Community) MemberUpdateChannelID() string { return o.IDString() + "-memberUpdate" } -func (o *Community) PubsubTopic() string { - return o.Shard().PubsubTopic() +func (o *Community) PubsubTopic(fallbackPubsubTopic ...string) string { + if o.Shard().PubsubTopic() != "" { + return o.Shard().PubsubTopic() + } + if len(fallbackPubsubTopic) > 0 { + return fallbackPubsubTopic[0] + } + return "" } func (o *Community) PubsubTopicPrivateKey() *ecdsa.PrivateKey { diff --git a/protocol/communities_key_distributor.go b/protocol/communities_key_distributor.go index 0df3bcbe2f..e81130e2cf 100644 --- a/protocol/communities_key_distributor.go +++ b/protocol/communities_key_distributor.go @@ -94,7 +94,7 @@ func (ckd *CommunitiesKeyDistributorImpl) sendKeyExchangeMessage(community *comm Recipients: pubkeys, MessageType: protobuf.ApplicationMetadataMessage_CHAT_MESSAGE, HashRatchetGroupID: hashRatchetGroupID, - PubsubTopic: community.PubsubTopic(), // TODO: confirm if it should be sent in community pubsub topic + PubsubTopic: community.PubsubTopic(messagingtypes.GlobalCommunityControlPubsubTopic()), } _, err := ckd.messaging.SendCommunityMessage(context.Background(), &rawMessage) diff --git a/protocol/messenger.go b/protocol/messenger.go index 6d626b5225..65284dbb1f 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -917,7 +917,7 @@ func (m *Messenger) publishContactCode() error { } for _, community := range joinedCommunities { rawMessage.LocalChatID = community.MemberUpdateChannelID() - rawMessage.PubsubTopic = community.PubsubTopic() + rawMessage.PubsubTopic = community.PubsubTopic(messagingtypes.GlobalCommunityControlPubsubTopic()) _, err = m.messaging.SendPublic(ctx, rawMessage.LocalChatID, rawMessage) if err != nil { return err @@ -1816,7 +1816,7 @@ func (m *Messenger) dispatchMessage(ctx context.Context, rawMessage messagingtyp // Use a single content-topic for all community chats. // Reasoning: https://github.com/status-im/status-go/pull/5864 rawMessage.ContentTopic = community.UniversalChatID() - rawMessage.PubsubTopic = community.PubsubTopic() + rawMessage.PubsubTopic = community.PubsubTopic(messagingtypes.GlobalCommunityContentPubsubTopic()) canPost, err := m.communitiesManager.CanPost(&m.identity.PublicKey, chat.CommunityID, chat.CommunityChatID(), rawMessage.MessageType) if err != nil { diff --git a/protocol/messenger_communities.go b/protocol/messenger_communities.go index 5ea3768c8e..1a5ded01a2 100644 --- a/protocol/messenger_communities.go +++ b/protocol/messenger_communities.go @@ -26,6 +26,7 @@ import ( "github.com/status-im/status-go/crypto/types" "github.com/status-im/status-go/images" messagingtypes "github.com/status-im/status-go/messaging/types" + wakuv2 "github.com/status-im/status-go/messaging/waku" multiaccountscommon "github.com/status-im/status-go/multiaccounts/common" "github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/communities" @@ -135,7 +136,7 @@ func (m *Messenger) publishOrg(org *communities.Community, shouldRekey bool) err SkipEncryptionLayer: true, CommunityID: org.ID(), MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_DESCRIPTION, - PubsubTopic: org.PubsubTopic(), // TODO: confirm if it should be sent in community pubsub topic + PubsubTopic: org.PubsubTopic(messagingtypes.GlobalCommunityControlPubsubTopic()), Priority: &messagingtypes.HighPriority, } if org.Encrypted() { @@ -151,7 +152,7 @@ func (m *Messenger) publishOrg(org *communities.Community, shouldRekey bool) err messageID, err := m.messaging.SendPublic(context.Background(), org.IDString(), rawMessage) if err == nil { m.logger.Debug("published community", - zap.String("pubsubTopic", org.PubsubTopic()), + zap.String("pubsubTopic", org.PubsubTopic(wakuv2.GlobalCommunityControlPubsubTopic())), zap.String("communityID", org.IDString()), zap.String("messageID", hexutil.Encode(messageID)), zap.Uint64("clock", org.Clock()), @@ -174,7 +175,7 @@ func (m *Messenger) publishCommunityEvents(community *communities.Community, msg // we don't want to wrap in an encryption layer message SkipEncryptionLayer: true, MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_EVENTS_MESSAGE, - PubsubTopic: community.PubsubTopic(), // TODO: confirm if it should be sent in community pubsub topic + PubsubTopic: community.PubsubTopic(messagingtypes.GlobalCommunityControlPubsubTopic()), // TODO: confirm if it should be sent in community pubsub topic Priority: &messagingtypes.LowPriority, } @@ -202,6 +203,7 @@ func (m *Messenger) publishCommunityPrivilegedMemberSyncMessage(msg *communities Sender: community.PrivateKey(), SkipEncryptionLayer: true, MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_PRIVILEGED_USER_SYNC_MESSAGE, + PubsubTopic: community.PubsubTopic(messagingtypes.GlobalCommunityControlPubsubTopic()), } for _, receivers := range msg.Receivers { @@ -337,7 +339,7 @@ func (m *Messenger) handleCommunitiesSubscription(c chan *communities.Subscripti Sender: community.PrivateKey(), SkipEncryptionLayer: true, MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_USER_KICKED, - PubsubTopic: messagingtypes.DefaultNonProtectedPubsubTopic(), + PubsubTopic: messagingtypes.GlobalCommunityControlPubsubTopic(), } _, err = m.messaging.SendPrivate(context.Background(), pk, rawMessage) @@ -672,7 +674,7 @@ func (m *Messenger) handleCommunitySharedAddressesRequest(state *ReceivedMessage CommunityID: community.ID(), SkipEncryptionLayer: true, MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_SHARED_ADDRESSES_RESPONSE, - PubsubTopic: messagingtypes.DefaultNonProtectedPubsubTopic(), + PubsubTopic: messagingtypes.GlobalCommunityControlPubsubTopic(), ResendType: messagingtypes.ResendTypeRawMessage, ResendMethod: messagingtypes.ResendMethodSendPrivate, Recipients: []*ecdsa.PublicKey{signer}, @@ -759,7 +761,7 @@ func (m *Messenger) publishGroupGrantMessage(community *communities.Community, t Sender: community.PrivateKey(), SkipEncryptionLayer: true, MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_UPDATE_GRANT, - PubsubTopic: community.PubsubTopic(), + PubsubTopic: community.PubsubTopic(messagingtypes.GlobalCommunityControlPubsubTopic()), Priority: &messagingtypes.LowPriority, } @@ -1460,7 +1462,7 @@ func (m *Messenger) RequestToJoinCommunity(request *requests.RequestToJoinCommun ResendType: messagingtypes.ResendTypeRawMessage, SkipEncryptionLayer: true, MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_JOIN, - PubsubTopic: messagingtypes.DefaultNonProtectedPubsubTopic(), + PubsubTopic: messagingtypes.GlobalCommunityControlPubsubTopic(), Priority: &messagingtypes.HighPriority, } @@ -1634,7 +1636,7 @@ func (m *Messenger) EditSharedAddressesForCommunity(request *requests.EditShared CommunityID: community.ID(), SkipEncryptionLayer: true, MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_EDIT_SHARED_ADDRESSES, - PubsubTopic: community.PubsubTopic(), // TODO: confirm if it should be sent in community pubsub topic + PubsubTopic: community.PubsubTopic(wakuv2.GlobalCommunityControlPubsubTopic()), // TODO: confirm if it should be sent in community pubsub topic ResendType: messagingtypes.ResendTypeRawMessage, } @@ -1677,7 +1679,7 @@ func (m *Messenger) PublishTokenActionToPrivilegedMembers(communityID []byte, ch ResendType: messagingtypes.ResendTypeRawMessage, ResendMethod: messagingtypes.ResendMethodSendPrivate, MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_TOKEN_ACTION, - PubsubTopic: community.PubsubTopic(), + PubsubTopic: community.PubsubTopic(wakuv2.GlobalCommunityControlPubsubTopic()), } skipMembers := make(map[string]struct{}) @@ -1838,7 +1840,7 @@ func (m *Messenger) CancelRequestToJoinCommunity(ctx context.Context, request *r CommunityID: community.ID(), SkipEncryptionLayer: true, MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_CANCEL_REQUEST_TO_JOIN, - PubsubTopic: messagingtypes.DefaultNonProtectedPubsubTopic(), + PubsubTopic: messagingtypes.GlobalCommunityControlPubsubTopic(), ResendType: messagingtypes.ResendTypeRawMessage, Priority: &messagingtypes.HighPriority, } @@ -1971,7 +1973,7 @@ func (m *Messenger) acceptRequestToJoinCommunity(requestToJoin *communities.Requ Community: encryptedDescription, // Deprecated but kept for backward compatibility, to be removed in future Grant: grant, ProtectedTopicPrivateKey: crypto.FromECDSA(key), - Shard: community.Shard().Protobuffer(), + Shard: community.Shard().Protobuffer(), // TODO p test CommunityDescriptionProtocolMessage: descriptionMessage, } @@ -1996,7 +1998,7 @@ func (m *Messenger) acceptRequestToJoinCommunity(requestToJoin *communities.Requ CommunityID: community.ID(), SkipEncryptionLayer: true, MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_JOIN_RESPONSE, - PubsubTopic: messagingtypes.DefaultNonProtectedPubsubTopic(), + PubsubTopic: messagingtypes.GlobalCommunityControlPubsubTopic(), ResendType: messagingtypes.ResendTypeRawMessage, ResendMethod: messagingtypes.ResendMethodSendPrivate, Recipients: []*ecdsa.PublicKey{pk}, @@ -2100,6 +2102,7 @@ func (m *Messenger) declineRequestToJoinCommunity(requestToJoin *communities.Req Sender: community.PrivateKey(), SkipEncryptionLayer: true, MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_PRIVILEGED_USER_SYNC_MESSAGE, + PubsubTopic: wakuv2.GlobalCommunityControlPubsubTopic(), } privilegedMembers := community.GetPrivilegedMembers() @@ -2204,7 +2207,7 @@ func (m *Messenger) LeaveCommunity(communityID types.HexBytes) (*MessengerRespon CommunityID: communityID, SkipEncryptionLayer: true, MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_LEAVE, - PubsubTopic: community.PubsubTopic(), // TODO: confirm if it should be sent in the community pubsub topic + PubsubTopic: community.PubsubTopic(wakuv2.GlobalCommunityControlPubsubTopic()), ResendType: messagingtypes.ResendTypeRawMessage, Priority: &messagingtypes.HighPriority, } @@ -2781,7 +2784,7 @@ func (m *Messenger) ReevaluateCommunityMembersPermissions(request *requests.Reev CommunityID: request.CommunityID, SkipEncryptionLayer: true, MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_REEVALUATE_PERMISSIONS_REQUEST, - PubsubTopic: community.PubsubTopic(), + PubsubTopic: community.PubsubTopic(wakuv2.GlobalCommunityControlPubsubTopic()), } _, err = m.SendMessageToControlNode(community, &rawMessage) if err != nil { @@ -3578,7 +3581,7 @@ func (m *Messenger) sendSharedAddressToControlNode(receiver *ecdsa.PublicKey, co CommunityID: community.ID(), SkipEncryptionLayer: false, MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_JOIN, - PubsubTopic: community.PubsubTopic(), // TODO: confirm if it should be sent in community pubsub topic + PubsubTopic: community.PubsubTopic(wakuv2.GlobalCommunityControlPubsubTopic()), ResendType: messagingtypes.ResendTypeDataSync, ResendMethod: messagingtypes.ResendMethodSendPrivate, Recipients: []*ecdsa.PublicKey{receiver}, @@ -4107,7 +4110,7 @@ func (m *Messenger) dispatchMagnetlinkMessage(communityID string) error { Payload: encodedMessage, MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_MESSAGE_ARCHIVE_MAGNETLINK, SkipGroupMessageWrap: true, - PubsubTopic: community.PubsubTopic(), + PubsubTopic: community.PubsubTopic(wakuv2.GlobalCommunityControlPubsubTopic()), Priority: &messagingtypes.LowPriority, } @@ -4912,7 +4915,7 @@ func (m *Messenger) DeleteCommunityMemberMessages(request *requests.DeleteCommun Sender: community.PrivateKey(), SkipEncryptionLayer: true, MessageType: protobuf.ApplicationMetadataMessage_DELETE_COMMUNITY_MEMBER_MESSAGES, - PubsubTopic: community.PubsubTopic(), + PubsubTopic: community.PubsubTopic(wakuv2.GlobalCommunityControlPubsubTopic()), } _, err = m.messaging.SendPublic(context.Background(), community.IDString(), rawMessage) diff --git a/protocol/messenger_community_shard.go b/protocol/messenger_community_shard.go index 9181a1ca69..193c5356dd 100644 --- a/protocol/messenger_community_shard.go +++ b/protocol/messenger_community_shard.go @@ -57,7 +57,7 @@ func (m *Messenger) sendPublicCommunityShardInfo(community *communities.Communit // we don't want to wrap in an encryption layer message SkipEncryptionLayer: true, MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_PUBLIC_SHARD_INFO, - PubsubTopic: messagingtypes.DefaultNonProtectedPubsubTopic(), // it must be sent always to default shard pubsub topic + PubsubTopic: messagingtypes.GlobalCommunityControlPubsubTopic(), // it must be sent always to default shard pubsub topic Priority: &messagingtypes.HighPriority, } diff --git a/protocol/messenger_peersyncing.go b/protocol/messenger_peersyncing.go index 8274ed480c..11ccff563b 100644 --- a/protocol/messenger_peersyncing.go +++ b/protocol/messenger_peersyncing.go @@ -175,7 +175,7 @@ func (m *Messenger) sendDatasyncOffersForCommunities() error { Payload: payload, Ephemeral: true, SkipApplicationWrap: true, - PubsubTopic: community.PubsubTopic(), + PubsubTopic: community.PubsubTopic(messagingtypes.GlobalCommunityControlPubsubTopic()), Priority: &messagingtypes.LowPriority, } _, err = m.messaging.SendPublic(context.Background(), community.IDString(), rawMessage) diff --git a/protocol/messenger_status_updates.go b/protocol/messenger_status_updates.go index 20b8fa2fd2..eeda198356 100644 --- a/protocol/messenger_status_updates.go +++ b/protocol/messenger_status_updates.go @@ -86,7 +86,7 @@ func (m *Messenger) sendUserStatus(ctx context.Context, status UserStatus) error } for _, community := range joinedCommunities { rawMessage.LocalChatID = community.StatusUpdatesChannelID() - rawMessage.PubsubTopic = community.PubsubTopic() + rawMessage.PubsubTopic = community.PubsubTopic(messagingtypes.GlobalCommunityControlPubsubTopic()) _, err = m.messaging.SendPublic(ctx, rawMessage.LocalChatID, rawMessage) if err != nil { return err @@ -175,7 +175,7 @@ func (m *Messenger) sendCurrentUserStatusToCommunity(ctx context.Context, commun MessageType: protobuf.ApplicationMetadataMessage_STATUS_UPDATE, ResendType: messagingtypes.ResendTypeNone, // does this need to be resent? Ephemeral: statusUpdate.StatusType == protobuf.StatusUpdate_AUTOMATIC, - PubsubTopic: community.PubsubTopic(), + PubsubTopic: community.PubsubTopic(messagingtypes.GlobalCommunityControlPubsubTopic()), Priority: &messagingtypes.LowPriority, } diff --git a/tests-functional/docker-compose.waku.yml b/tests-functional/docker-compose.waku.yml index 0fcaa9f1e2..e62b5771eb 100644 --- a/tests-functional/docker-compose.waku.yml +++ b/tests-functional/docker-compose.waku.yml @@ -22,6 +22,8 @@ services: "--rest-admin", "--shard=32", "--shard=64", + "--shard=128", + "--shard=256", "--staticnode=/dns4/store/tcp/60002/p2p/16Uiu2HAmCDqxtfF1DwBqs7UJ4TgSnjoh6j1RtE1hhQxLLao84jLi", "--storenode=/dns4/store/tcp/60002/p2p/16Uiu2HAmCDqxtfF1DwBqs7UJ4TgSnjoh6j1RtE1hhQxLLao84jLi", "--tcp-port=60001" @@ -53,6 +55,8 @@ services: "--rest-admin", "--shard=32", "--shard=64", + "--shard=128", + "--shard=256", "--staticnode=/dns4/boot-1/tcp/60001/p2p/16Uiu2HAm3vFYHkGRURyJ6F7bwDyzMLtPEuCg4DU89T7km2u8Fjyb", "--store=true", "--tcp-port=60002"