Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Socket/messages-recv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@

await delay(5000)

if (!await placeholderResendCache.get(messageKey?.id!)) {
if (!(await placeholderResendCache.get(messageKey?.id!))) {
logger.debug({ messageKey }, 'message received while resend requested')
return 'RESOLVED'
}
Expand Down Expand Up @@ -183,7 +183,7 @@
return
}

let data: any

Check warning on line 186 in src/Socket/messages-recv.ts

View workflow job for this annotation

GitHub Actions / check-lint

Unexpected any. Specify a different type
try {
data = JSON.parse(mexNode.content.toString())
} catch (error) {
Expand Down Expand Up @@ -279,7 +279,7 @@
case 'update':
const settingsNode = getBinaryNodeChild(child, 'settings')
if (settingsNode) {
const update: Record<string, any> = {}

Check warning on line 282 in src/Socket/messages-recv.ts

View workflow job for this annotation

GitHub Actions / check-lint

Unexpected any. Specify a different type
const nameNode = getBinaryNodeChild(settingsNode, 'name')
if (nameNode?.content) update.name = nameNode.content.toString()

Expand Down
74 changes: 55 additions & 19 deletions src/Utils/auth-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,12 @@ export const addTransactionCapability = (
): SignalKeyStoreWithTransaction => {
const txStorage = new AsyncLocalStorage<TransactionContext>()

// Queues for concurrency control
// Queues for concurrency control (keyed by signal data type - bounded set)
const keyQueues = new Map<string, PQueue>()

// Transaction mutexes with reference counting for cleanup
const txMutexes = new Map<string, Mutex>()
const txMutexRefCounts = new Map<string, number>()

// Pre-key manager for specialized operations
const preKeyManager = new PreKeyManager(state, logger)
Expand All @@ -142,11 +145,37 @@ export const addTransactionCapability = (
function getTxMutex(key: string): Mutex {
if (!txMutexes.has(key)) {
txMutexes.set(key, new Mutex())
txMutexRefCounts.set(key, 0)
}

return txMutexes.get(key)!
}

/**
* Acquire a reference to a transaction mutex
*/
function acquireTxMutexRef(key: string): void {
const count = txMutexRefCounts.get(key) ?? 0
txMutexRefCounts.set(key, count + 1)
}

/**
* Release a reference to a transaction mutex and cleanup if no longer needed
*/
function releaseTxMutexRef(key: string): void {
const count = (txMutexRefCounts.get(key) ?? 1) - 1
txMutexRefCounts.set(key, count)

// Cleanup if no more references and mutex is not locked
if (count <= 0) {
const mutex = txMutexes.get(key)
if (mutex && !mutex.isLocked()) {
txMutexes.delete(key)
txMutexRefCounts.delete(key)
}
}
}

/**
* Check if currently in a transaction
*/
Expand Down Expand Up @@ -279,29 +308,36 @@ export const addTransactionCapability = (
}

// New transaction - acquire mutex and create context
return getTxMutex(key).runExclusive(async () => {
const ctx: TransactionContext = {
cache: {},
mutations: {},
dbQueries: 0
}
const mutex = getTxMutex(key)
acquireTxMutexRef(key)

logger.trace('entering transaction')
try {
return await mutex.runExclusive(async () => {
const ctx: TransactionContext = {
cache: {},
mutations: {},
dbQueries: 0
}

try {
const result = await txStorage.run(ctx, work)
logger.trace('entering transaction')

// Commit mutations
await commitWithRetry(ctx.mutations)
try {
const result = await txStorage.run(ctx, work)

logger.trace({ dbQueries: ctx.dbQueries }, 'transaction completed')
// Commit mutations
await commitWithRetry(ctx.mutations)

return result
} catch (error) {
logger.error({ error }, 'transaction failed, rolling back')
throw error
}
})
logger.trace({ dbQueries: ctx.dbQueries }, 'transaction completed')

return result
} catch (error) {
logger.error({ error }, 'transaction failed, rolling back')
throw error
}
})
} finally {
releaseTxMutexRef(key)
}
}
}
}
Expand Down
Loading