diff --git a/src/Socket/messages-recv.ts b/src/Socket/messages-recv.ts index bb7307d80ce..ca16daf5415 100644 --- a/src/Socket/messages-recv.ts +++ b/src/Socket/messages-recv.ts @@ -151,7 +151,7 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { await delay(5000) - if (!await placeholderResendCache.get(messageKey?.id!)) { + if (!(await placeholderResendCache.get(messageKey?.id!))) { logger.debug({ messageKey }, 'message received while resend requested') return 'RESOLVED' } diff --git a/src/Utils/auth-utils.ts b/src/Utils/auth-utils.ts index b42bd93b5a0..a326c1246d7 100644 --- a/src/Utils/auth-utils.ts +++ b/src/Utils/auth-utils.ts @@ -118,9 +118,12 @@ export const addTransactionCapability = ( ): SignalKeyStoreWithTransaction => { const txStorage = new AsyncLocalStorage() - // Queues for concurrency control + // Queues for concurrency control (keyed by signal data type - bounded set) const keyQueues = new Map() + + // Transaction mutexes with reference counting for cleanup const txMutexes = new Map() + const txMutexRefCounts = new Map() // Pre-key manager for specialized operations const preKeyManager = new PreKeyManager(state, logger) @@ -142,11 +145,37 @@ export const addTransactionCapability = ( function getTxMutex(key: string): Mutex { if (!txMutexes.has(key)) { txMutexes.set(key, new Mutex()) + txMutexRefCounts.set(key, 0) } return txMutexes.get(key)! } + /** + * Acquire a reference to a transaction mutex + */ + function acquireTxMutexRef(key: string): void { + const count = txMutexRefCounts.get(key) ?? 0 + txMutexRefCounts.set(key, count + 1) + } + + /** + * Release a reference to a transaction mutex and cleanup if no longer needed + */ + function releaseTxMutexRef(key: string): void { + const count = (txMutexRefCounts.get(key) ?? 1) - 1 + txMutexRefCounts.set(key, count) + + // Cleanup if no more references and mutex is not locked + if (count <= 0) { + const mutex = txMutexes.get(key) + if (mutex && !mutex.isLocked()) { + txMutexes.delete(key) + txMutexRefCounts.delete(key) + } + } + } + /** * Check if currently in a transaction */ @@ -279,29 +308,36 @@ export const addTransactionCapability = ( } // New transaction - acquire mutex and create context - return getTxMutex(key).runExclusive(async () => { - const ctx: TransactionContext = { - cache: {}, - mutations: {}, - dbQueries: 0 - } + const mutex = getTxMutex(key) + acquireTxMutexRef(key) - logger.trace('entering transaction') + try { + return await mutex.runExclusive(async () => { + const ctx: TransactionContext = { + cache: {}, + mutations: {}, + dbQueries: 0 + } - try { - const result = await txStorage.run(ctx, work) + logger.trace('entering transaction') - // Commit mutations - await commitWithRetry(ctx.mutations) + try { + const result = await txStorage.run(ctx, work) - logger.trace({ dbQueries: ctx.dbQueries }, 'transaction completed') + // Commit mutations + await commitWithRetry(ctx.mutations) - return result - } catch (error) { - logger.error({ error }, 'transaction failed, rolling back') - throw error - } - }) + logger.trace({ dbQueries: ctx.dbQueries }, 'transaction completed') + + return result + } catch (error) { + logger.error({ error }, 'transaction failed, rolling back') + throw error + } + }) + } finally { + releaseTxMutexRef(key) + } } } }