diff --git a/src/Socket/chats.ts b/src/Socket/chats.ts index 639ebd4db89..ab4b4b401e9 100644 --- a/src/Socket/chats.ts +++ b/src/Socket/chats.ts @@ -28,7 +28,7 @@ import type { import { ALL_WA_PATCH_NAMES } from '../Types' import type { QuickReplyAction } from '../Types/Bussines.js' import type { LabelActionBody } from '../Types/Label' -import { SyncState } from '../Types/State' +import { SyncState } from '../Types' import { chatModificationToAppPatch, type ChatMutationMap, @@ -41,7 +41,7 @@ import { newLTHashState, processSyncAction } from '../Utils' -import { makeMutex } from '../Utils/make-mutex' +import { makeMutex , makeKeyedMutex} from '../Utils/make-mutex' import processMessage from '../Utils/process-message' import { type BinaryNode, @@ -74,16 +74,16 @@ export const makeChatsSocket = (config: SocketConfig) => { let syncState: SyncState = SyncState.Connecting /** this mutex ensures that messages are processed in order */ - const messageMutex = makeMutex() + const messageMutex = makeKeyedMutex() /** this mutex ensures that receipts are processed in order */ - const receiptMutex = makeMutex() + const receiptMutex = makeKeyedMutex() /** this mutex ensures that app state patches are processed in order */ const appStatePatchMutex = makeMutex() /** this mutex ensures that notifications are processed in order */ - const notificationMutex = makeMutex() + const notificationMutex = makeKeyedMutex() // Timeout for AwaitingInitialSync state let awaitingSyncTimeout: NodeJS.Timeout | undefined diff --git a/src/Socket/messages-recv.ts b/src/Socket/messages-recv.ts index 9363d2b4bea..296c4887ba7 100644 --- a/src/Socket/messages-recv.ts +++ b/src/Socket/messages-recv.ts @@ -2,7 +2,7 @@ import NodeCache from '@cacheable/node-cache' import { Boom } from '@hapi/boom' import { randomBytes } from 'crypto' import Long from 'long' -import { proto } from '../../WAProto/index.js' +import { proto } from '../../WAProto' import { DEFAULT_CACHE_TTLS, KEY_BUNDLE_TYPE, MIN_PREKEY_COUNT } from '../Defaults' import type { GroupParticipant, @@ -41,7 +41,7 @@ import { xmppPreKey, xmppSignedPreKey } from '../Utils' -import { makeMutex } from '../Utils/make-mutex' +import { makeKeyedMutex } from '../Utils/make-mutex' import { areJidsSameUser, type BinaryNode, @@ -89,7 +89,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { } = sock /** this mutex ensures that each retryRequest will wait for the previous one to finish */ - const retryMutex = makeMutex() + const retryMutex = makeKeyedMutex(); const msgRetryCache = config.msgRetryCounterCache || @@ -1045,7 +1045,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { attrs.participant || attrs.from, isLid ? authState.creds.me?.lid : authState.creds.me?.id ) - const remoteJid = !isNodeFromMe || isJidGroup(attrs.from) ? attrs.from : attrs.recipient + const remoteJid = !isNodeFromMe || isJidGroup(attrs.from) ? attrs.from : attrs.recipient; const fromMe = !attrs.recipient || ((attrs.type === 'retry' || attrs.type === 'sender') && isNodeFromMe) const key: proto.IMessageKey = { @@ -1069,7 +1069,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { try { await Promise.all([ - receiptMutex.mutex(async () => { + receiptMutex.mutex(remoteJid!,async () => { const status = getStatusFromReceiptType(attrs.type) if ( typeof status !== 'undefined' && @@ -1143,7 +1143,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { try { await Promise.all([ - notificationMutex.mutex(async () => { + notificationMutex.mutex(remoteJid!,async () => { const msg = await processNotification(node) if (msg) { const fromMe = areJidsSameUser(node.attrs.participant || remoteJid, authState.creds.me!.id) @@ -1208,8 +1208,11 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { } } - if (msg.key?.remoteJid && msg.key?.id && messageRetryManager) { - messageRetryManager.addRecentMessage(msg.key.remoteJid, msg.key.id, msg.message!) + const remoteJid = msg.key.remoteJid!; + const msgId = msg.key.id!; + + if (messageRetryManager && remoteJid && msgId ) { + messageRetryManager.addRecentMessage(remoteJid, msgId, msg.message!) logger.debug( { jid: msg.key.remoteJid, @@ -1220,7 +1223,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { } try { - await messageMutex.mutex(async () => { + await messageMutex.mutex(remoteJid,async () => { await decrypt() // message failed to decrypt if (msg.messageStubType === proto.WebMessageInfo.StubType.CIPHERTEXT && msg.category !== 'peer') { @@ -1237,7 +1240,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { logger.debug(`[handleMessage] Attempting retry request for failed decryption`) // Handle both pre-key and normal retries in single mutex - await retryMutex.mutex(async () => { + await retryMutex.mutex(msgId,async () => { try { if (!ws.isOpen) { logger.debug({ node }, 'Connection closed, skipping retry') diff --git a/src/Socket/messages-send.ts b/src/Socket/messages-send.ts index 9762c268c0c..52a1b631820 100644 --- a/src/Socket/messages-send.ts +++ b/src/Socket/messages-send.ts @@ -1199,7 +1199,7 @@ export const makeMessagesSocket = (config: SocketConfig) => { }) if (config.emitOwnEvents) { process.nextTick(async () => { - await messageMutex.mutex(() => upsertMessage(fullMsg, 'append')) + await messageMutex.mutex(jid,() => upsertMessage(fullMsg, 'append')) }) }