Skip to content
10 changes: 8 additions & 2 deletions protocol/communities/community.go
Original file line number Diff line number Diff line change
Expand Up @@ -1552,8 +1552,14 @@ func (o *Community) MemberUpdateChannelID() string {
return o.IDString() + "-memberUpdate"
}

func (o *Community) PubsubTopic() string {
return o.Shard().PubsubTopic()
func (o *Community) PubsubTopic(fallbackPubsubTopic ...string) string {
if o.Shard().PubsubTopic() != "" {
return o.Shard().PubsubTopic()
}
if len(fallbackPubsubTopic) > 0 {
return fallbackPubsubTopic[0]
}
return ""
}

func (o *Community) PubsubTopicPrivateKey() *ecdsa.PrivateKey {
Expand Down
2 changes: 1 addition & 1 deletion protocol/communities_key_distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (ckd *CommunitiesKeyDistributorImpl) sendKeyExchangeMessage(community *comm
Recipients: pubkeys,
MessageType: protobuf.ApplicationMetadataMessage_CHAT_MESSAGE,
HashRatchetGroupID: hashRatchetGroupID,
PubsubTopic: community.PubsubTopic(), // TODO: confirm if it should be sent in community pubsub topic
PubsubTopic: community.PubsubTopic(messagingtypes.GlobalCommunityControlPubsubTopic()),
}
_, err := ckd.messaging.SendCommunityMessage(context.Background(), &rawMessage)

Expand Down
4 changes: 2 additions & 2 deletions protocol/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,7 @@ func (m *Messenger) publishContactCode() error {
}
for _, community := range joinedCommunities {
rawMessage.LocalChatID = community.MemberUpdateChannelID()
rawMessage.PubsubTopic = community.PubsubTopic()
rawMessage.PubsubTopic = community.PubsubTopic(messagingtypes.GlobalCommunityControlPubsubTopic())
_, err = m.messaging.SendPublic(ctx, rawMessage.LocalChatID, rawMessage)
if err != nil {
return err
Expand Down Expand Up @@ -1816,7 +1816,7 @@ func (m *Messenger) dispatchMessage(ctx context.Context, rawMessage messagingtyp
// Use a single content-topic for all community chats.
// Reasoning: https://github.com/status-im/status-go/pull/5864
rawMessage.ContentTopic = community.UniversalChatID()
rawMessage.PubsubTopic = community.PubsubTopic()
rawMessage.PubsubTopic = community.PubsubTopic(messagingtypes.GlobalCommunityContentPubsubTopic())

canPost, err := m.communitiesManager.CanPost(&m.identity.PublicKey, chat.CommunityID, chat.CommunityChatID(), rawMessage.MessageType)
if err != nil {
Expand Down
37 changes: 20 additions & 17 deletions protocol/messenger_communities.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/status-im/status-go/crypto/types"
"github.com/status-im/status-go/images"
messagingtypes "github.com/status-im/status-go/messaging/types"
wakuv2 "github.com/status-im/status-go/messaging/waku"
multiaccountscommon "github.com/status-im/status-go/multiaccounts/common"
"github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/communities"
Expand Down Expand Up @@ -135,7 +136,7 @@ func (m *Messenger) publishOrg(org *communities.Community, shouldRekey bool) err
SkipEncryptionLayer: true,
CommunityID: org.ID(),
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_DESCRIPTION,
PubsubTopic: org.PubsubTopic(), // TODO: confirm if it should be sent in community pubsub topic
PubsubTopic: org.PubsubTopic(messagingtypes.GlobalCommunityControlPubsubTopic()),
Priority: &messagingtypes.HighPriority,
}
if org.Encrypted() {
Expand All @@ -151,7 +152,7 @@ func (m *Messenger) publishOrg(org *communities.Community, shouldRekey bool) err
messageID, err := m.messaging.SendPublic(context.Background(), org.IDString(), rawMessage)
if err == nil {
m.logger.Debug("published community",
zap.String("pubsubTopic", org.PubsubTopic()),
zap.String("pubsubTopic", org.PubsubTopic(wakuv2.GlobalCommunityControlPubsubTopic())),
zap.String("communityID", org.IDString()),
zap.String("messageID", hexutil.Encode(messageID)),
zap.Uint64("clock", org.Clock()),
Expand All @@ -174,7 +175,7 @@ func (m *Messenger) publishCommunityEvents(community *communities.Community, msg
// we don't want to wrap in an encryption layer message
SkipEncryptionLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_EVENTS_MESSAGE,
PubsubTopic: community.PubsubTopic(), // TODO: confirm if it should be sent in community pubsub topic
PubsubTopic: community.PubsubTopic(messagingtypes.GlobalCommunityControlPubsubTopic()), // TODO: confirm if it should be sent in community pubsub topic
Priority: &messagingtypes.LowPriority,
}

Expand Down Expand Up @@ -202,6 +203,7 @@ func (m *Messenger) publishCommunityPrivilegedMemberSyncMessage(msg *communities
Sender: community.PrivateKey(),
SkipEncryptionLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_PRIVILEGED_USER_SYNC_MESSAGE,
PubsubTopic: community.PubsubTopic(messagingtypes.GlobalCommunityControlPubsubTopic()),
}

for _, receivers := range msg.Receivers {
Expand Down Expand Up @@ -337,7 +339,7 @@ func (m *Messenger) handleCommunitiesSubscription(c chan *communities.Subscripti
Sender: community.PrivateKey(),
SkipEncryptionLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_USER_KICKED,
PubsubTopic: messagingtypes.DefaultNonProtectedPubsubTopic(),
PubsubTopic: messagingtypes.GlobalCommunityControlPubsubTopic(),
}

_, err = m.messaging.SendPrivate(context.Background(), pk, rawMessage)
Expand Down Expand Up @@ -672,7 +674,7 @@ func (m *Messenger) handleCommunitySharedAddressesRequest(state *ReceivedMessage
CommunityID: community.ID(),
SkipEncryptionLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_SHARED_ADDRESSES_RESPONSE,
PubsubTopic: messagingtypes.DefaultNonProtectedPubsubTopic(),
PubsubTopic: messagingtypes.GlobalCommunityControlPubsubTopic(),
ResendType: messagingtypes.ResendTypeRawMessage,
ResendMethod: messagingtypes.ResendMethodSendPrivate,
Recipients: []*ecdsa.PublicKey{signer},
Expand Down Expand Up @@ -759,7 +761,7 @@ func (m *Messenger) publishGroupGrantMessage(community *communities.Community, t
Sender: community.PrivateKey(),
SkipEncryptionLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_UPDATE_GRANT,
PubsubTopic: community.PubsubTopic(),
PubsubTopic: community.PubsubTopic(messagingtypes.GlobalCommunityControlPubsubTopic()),
Priority: &messagingtypes.LowPriority,
}

Expand Down Expand Up @@ -1460,7 +1462,7 @@ func (m *Messenger) RequestToJoinCommunity(request *requests.RequestToJoinCommun
ResendType: messagingtypes.ResendTypeRawMessage,
SkipEncryptionLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_JOIN,
PubsubTopic: messagingtypes.DefaultNonProtectedPubsubTopic(),
PubsubTopic: messagingtypes.GlobalCommunityControlPubsubTopic(),
Priority: &messagingtypes.HighPriority,
}

Expand Down Expand Up @@ -1634,7 +1636,7 @@ func (m *Messenger) EditSharedAddressesForCommunity(request *requests.EditShared
CommunityID: community.ID(),
SkipEncryptionLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_EDIT_SHARED_ADDRESSES,
PubsubTopic: community.PubsubTopic(), // TODO: confirm if it should be sent in community pubsub topic
PubsubTopic: community.PubsubTopic(wakuv2.GlobalCommunityControlPubsubTopic()), // TODO: confirm if it should be sent in community pubsub topic
ResendType: messagingtypes.ResendTypeRawMessage,
}

Expand Down Expand Up @@ -1677,7 +1679,7 @@ func (m *Messenger) PublishTokenActionToPrivilegedMembers(communityID []byte, ch
ResendType: messagingtypes.ResendTypeRawMessage,
ResendMethod: messagingtypes.ResendMethodSendPrivate,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_TOKEN_ACTION,
PubsubTopic: community.PubsubTopic(),
PubsubTopic: community.PubsubTopic(wakuv2.GlobalCommunityControlPubsubTopic()),
}

skipMembers := make(map[string]struct{})
Expand Down Expand Up @@ -1838,7 +1840,7 @@ func (m *Messenger) CancelRequestToJoinCommunity(ctx context.Context, request *r
CommunityID: community.ID(),
SkipEncryptionLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_CANCEL_REQUEST_TO_JOIN,
PubsubTopic: messagingtypes.DefaultNonProtectedPubsubTopic(),
PubsubTopic: messagingtypes.GlobalCommunityControlPubsubTopic(),
ResendType: messagingtypes.ResendTypeRawMessage,
Priority: &messagingtypes.HighPriority,
}
Expand Down Expand Up @@ -1971,7 +1973,7 @@ func (m *Messenger) acceptRequestToJoinCommunity(requestToJoin *communities.Requ
Community: encryptedDescription, // Deprecated but kept for backward compatibility, to be removed in future
Grant: grant,
ProtectedTopicPrivateKey: crypto.FromECDSA(key),
Shard: community.Shard().Protobuffer(),
Shard: community.Shard().Protobuffer(), // TODO p test
CommunityDescriptionProtocolMessage: descriptionMessage,
}

Expand All @@ -1996,7 +1998,7 @@ func (m *Messenger) acceptRequestToJoinCommunity(requestToJoin *communities.Requ
CommunityID: community.ID(),
SkipEncryptionLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_JOIN_RESPONSE,
PubsubTopic: messagingtypes.DefaultNonProtectedPubsubTopic(),
PubsubTopic: messagingtypes.GlobalCommunityControlPubsubTopic(),
ResendType: messagingtypes.ResendTypeRawMessage,
ResendMethod: messagingtypes.ResendMethodSendPrivate,
Recipients: []*ecdsa.PublicKey{pk},
Expand Down Expand Up @@ -2100,6 +2102,7 @@ func (m *Messenger) declineRequestToJoinCommunity(requestToJoin *communities.Req
Sender: community.PrivateKey(),
SkipEncryptionLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_PRIVILEGED_USER_SYNC_MESSAGE,
PubsubTopic: wakuv2.GlobalCommunityControlPubsubTopic(),
}

privilegedMembers := community.GetPrivilegedMembers()
Expand Down Expand Up @@ -2204,7 +2207,7 @@ func (m *Messenger) LeaveCommunity(communityID types.HexBytes) (*MessengerRespon
CommunityID: communityID,
SkipEncryptionLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_LEAVE,
PubsubTopic: community.PubsubTopic(), // TODO: confirm if it should be sent in the community pubsub topic
PubsubTopic: community.PubsubTopic(wakuv2.GlobalCommunityControlPubsubTopic()),
ResendType: messagingtypes.ResendTypeRawMessage,
Priority: &messagingtypes.HighPriority,
}
Expand Down Expand Up @@ -2781,7 +2784,7 @@ func (m *Messenger) ReevaluateCommunityMembersPermissions(request *requests.Reev
CommunityID: request.CommunityID,
SkipEncryptionLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_REEVALUATE_PERMISSIONS_REQUEST,
PubsubTopic: community.PubsubTopic(),
PubsubTopic: community.PubsubTopic(wakuv2.GlobalCommunityControlPubsubTopic()),
}
_, err = m.SendMessageToControlNode(community, &rawMessage)
if err != nil {
Expand Down Expand Up @@ -3578,7 +3581,7 @@ func (m *Messenger) sendSharedAddressToControlNode(receiver *ecdsa.PublicKey, co
CommunityID: community.ID(),
SkipEncryptionLayer: false,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_JOIN,
PubsubTopic: community.PubsubTopic(), // TODO: confirm if it should be sent in community pubsub topic
PubsubTopic: community.PubsubTopic(wakuv2.GlobalCommunityControlPubsubTopic()),
ResendType: messagingtypes.ResendTypeDataSync,
ResendMethod: messagingtypes.ResendMethodSendPrivate,
Recipients: []*ecdsa.PublicKey{receiver},
Expand Down Expand Up @@ -4107,7 +4110,7 @@ func (m *Messenger) dispatchMagnetlinkMessage(communityID string) error {
Payload: encodedMessage,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_MESSAGE_ARCHIVE_MAGNETLINK,
SkipGroupMessageWrap: true,
PubsubTopic: community.PubsubTopic(),
PubsubTopic: community.PubsubTopic(wakuv2.GlobalCommunityControlPubsubTopic()),
Priority: &messagingtypes.LowPriority,
}

Expand Down Expand Up @@ -4912,7 +4915,7 @@ func (m *Messenger) DeleteCommunityMemberMessages(request *requests.DeleteCommun
Sender: community.PrivateKey(),
SkipEncryptionLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_DELETE_COMMUNITY_MEMBER_MESSAGES,
PubsubTopic: community.PubsubTopic(),
PubsubTopic: community.PubsubTopic(wakuv2.GlobalCommunityControlPubsubTopic()),
}

_, err = m.messaging.SendPublic(context.Background(), community.IDString(), rawMessage)
Expand Down
2 changes: 1 addition & 1 deletion protocol/messenger_community_shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (m *Messenger) sendPublicCommunityShardInfo(community *communities.Communit
// we don't want to wrap in an encryption layer message
SkipEncryptionLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_PUBLIC_SHARD_INFO,
PubsubTopic: messagingtypes.DefaultNonProtectedPubsubTopic(), // it must be sent always to default shard pubsub topic
PubsubTopic: messagingtypes.GlobalCommunityControlPubsubTopic(), // it must be sent always to default shard pubsub topic
Priority: &messagingtypes.HighPriority,
}

Expand Down
2 changes: 1 addition & 1 deletion protocol/messenger_peersyncing.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (m *Messenger) sendDatasyncOffersForCommunities() error {
Payload: payload,
Ephemeral: true,
SkipApplicationWrap: true,
PubsubTopic: community.PubsubTopic(),
PubsubTopic: community.PubsubTopic(messagingtypes.GlobalCommunityControlPubsubTopic()),
Priority: &messagingtypes.LowPriority,
}
_, err = m.messaging.SendPublic(context.Background(), community.IDString(), rawMessage)
Expand Down
4 changes: 2 additions & 2 deletions protocol/messenger_status_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (m *Messenger) sendUserStatus(ctx context.Context, status UserStatus) error
}
for _, community := range joinedCommunities {
rawMessage.LocalChatID = community.StatusUpdatesChannelID()
rawMessage.PubsubTopic = community.PubsubTopic()
rawMessage.PubsubTopic = community.PubsubTopic(messagingtypes.GlobalCommunityControlPubsubTopic())
_, err = m.messaging.SendPublic(ctx, rawMessage.LocalChatID, rawMessage)
if err != nil {
return err
Expand Down Expand Up @@ -175,7 +175,7 @@ func (m *Messenger) sendCurrentUserStatusToCommunity(ctx context.Context, commun
MessageType: protobuf.ApplicationMetadataMessage_STATUS_UPDATE,
ResendType: messagingtypes.ResendTypeNone, // does this need to be resent?
Ephemeral: statusUpdate.StatusType == protobuf.StatusUpdate_AUTOMATIC,
PubsubTopic: community.PubsubTopic(),
PubsubTopic: community.PubsubTopic(messagingtypes.GlobalCommunityControlPubsubTopic()),
Priority: &messagingtypes.LowPriority,
}

Expand Down
4 changes: 4 additions & 0 deletions tests-functional/docker-compose.waku.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ services:
"--rest-admin",
"--shard=32",
"--shard=64",
"--shard=128",
"--shard=256",
"--staticnode=/dns4/store/tcp/60002/p2p/16Uiu2HAmCDqxtfF1DwBqs7UJ4TgSnjoh6j1RtE1hhQxLLao84jLi",
"--storenode=/dns4/store/tcp/60002/p2p/16Uiu2HAmCDqxtfF1DwBqs7UJ4TgSnjoh6j1RtE1hhQxLLao84jLi",
"--tcp-port=60001"
Expand Down Expand Up @@ -53,6 +55,8 @@ services:
"--rest-admin",
"--shard=32",
"--shard=64",
"--shard=128",
"--shard=256",
"--staticnode=/dns4/boot-1/tcp/60001/p2p/16Uiu2HAm3vFYHkGRURyJ6F7bwDyzMLtPEuCg4DU89T7km2u8Fjyb",
"--store=true",
"--tcp-port=60002"
Expand Down