diff --git a/src/Socket/messages-recv.ts b/src/Socket/messages-recv.ts index dbca8e883fa..350a2d8e520 100644 --- a/src/Socket/messages-recv.ts +++ b/src/Socket/messages-recv.ts @@ -884,6 +884,12 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { const handlePrivacyTokenNotification = async (node: BinaryNode) => { const tokensNode = getBinaryNodeChild(node, 'tokens') const from = jidNormalizedUser(node.attrs.from) + let lidForPN: string | null = null + try { + lidForPN = await signalRepository.lidMapping.getLIDForPN(from) + } catch (error) { + logger.warn({ error, jid: from }, 'Failed to get lid for PN in handlePrivacyTokenNotification.') + } if (!tokensNode) return @@ -907,6 +913,12 @@ export const makeMessagesRecvSocket = (config: SocketConfig) => { await authState.keys.set({ tctoken: { [from]: { token: content, timestamp } } }) + + if (lidForPN) { + await authState.keys.set({ + tctoken: { [lidForPN]: { token: content, timestamp } } + }) + } } } } diff --git a/src/Socket/messages-send.ts b/src/Socket/messages-send.ts index adfc5a7a274..eb667080820 100644 --- a/src/Socket/messages-send.ts +++ b/src/Socket/messages-send.ts @@ -1118,6 +1118,10 @@ export const makeMessagesSocket = (config: SocketConfig) => { }, sendMessage: async (jid: string, content: AnyMessageContent, options: MiscMessageGenerationOptions = {}) => { const userJid = authState.creds.me!.id + if (!options.messageId) { + options.messageId = generateMessageIDV2(sock.user?.id) + } + if ( typeof content === 'object' && 'disappearingMessagesInChat' in content && @@ -1152,7 +1156,6 @@ export const makeMessagesSocket = (config: SocketConfig) => { upload: waUploadToServer, mediaCache: config.mediaCache, options: config.options, - messageId: generateMessageIDV2(sock.user?.id), ...options }) const isEventMsg = 'event' in content && !!content.event diff --git a/src/Socket/socket.ts b/src/Socket/socket.ts index f5ce6c191fe..c85cb379faa 100644 --- a/src/Socket/socket.ts +++ b/src/Socket/socket.ts @@ -420,7 +420,7 @@ export const makeSocket = (config: SocketConfig) => { let node: proto.IClientPayload if (!creds.me) { node = generateRegistrationNode(creds, config) - logger.info({ node }, 'not logged in, attempting registration...') + //logger.info({ node }, 'not logged in, attempting registration...') } else { node = generateLoginNode(creds.me.id, config) logger.info({ node }, 'logging in...') @@ -612,7 +612,7 @@ export const makeSocket = (config: SocketConfig) => { } closed = true - logger.info({ trace: error?.stack }, error ? 'connection errored' : 'connection closed') + //logger.info({ trace: error?.stack }, error ? 'connection errored' : 'connection closed') clearInterval(keepAliveReq) clearTimeout(qrTimer) @@ -986,15 +986,7 @@ export const makeSocket = (config: SocketConfig) => { } }) - let didStartBuffer = false process.nextTick(() => { - if (creds.me?.id) { - // start buffering important events - // if we're logged in - ev.buffer() - didStartBuffer = true - } - ev.emit('connection.update', { connection: 'connecting', receivedPendingNotifications: false, qr: undefined }) }) @@ -1004,10 +996,6 @@ export const makeSocket = (config: SocketConfig) => { const offlineNotifs = +(child?.attrs.count || 0) logger.info(`handled ${offlineNotifs} offline messages/notifications`) - if (didStartBuffer) { - ev.flush() - logger.trace('flushed events for initial buffer') - } ev.emit('connection.update', { receivedPendingNotifications: true }) }) diff --git a/src/Utils/messages-media.ts b/src/Utils/messages-media.ts index 9dd82d72c25..db3b4d67fe2 100644 --- a/src/Utils/messages-media.ts +++ b/src/Utils/messages-media.ts @@ -57,8 +57,8 @@ export const getRawMediaUploadData = async (media: WAMediaUpload, mediaType: Med const hasher = Crypto.createHash('sha256') const filePath = join(tmpdir(), mediaType + generateMessageIDV2()) const fileWriteStream = createWriteStream(filePath) - let fileLength = 0 + try { for await (const data of stream) { fileLength += data.length @@ -68,24 +68,20 @@ export const getRawMediaUploadData = async (media: WAMediaUpload, mediaType: Med } } - fileWriteStream.end() - await once(fileWriteStream, 'finish') - stream.destroy() + await waitForCloseOrFinishStream(fileWriteStream) + const fileSha256 = hasher.digest() logger?.debug('hashed data for raw upload') + return { filePath: filePath, fileSha256, fileLength } } catch (error) { - fileWriteStream.destroy() - stream.destroy() - try { - await fs.unlink(filePath) - } catch { - // - } + try { stream.destroy() } catch { } + await waitForCloseOrFinishStream(fileWriteStream) + try { await fs.unlink(filePath) } catch { } throw error } @@ -381,6 +377,13 @@ type EncryptedStreamOptions = { opts?: RequestInit } +const waitForCloseOrFinishStream = async (stream?: WriteStream) => { + if (!stream) return; + try { if (!stream.writableEnded) stream.end(); } catch { } + if (stream.destroyed || stream.closed || stream.writableFinished) return; + try { await Promise.race([once(stream, 'finish'), once(stream, 'close')]); } catch { } +} + export const encryptedStream = async ( media: WAMediaUpload, mediaType: MediaType, @@ -410,10 +413,12 @@ export const encryptedStream = async ( const sha256Plain = Crypto.createHash('sha256') const sha256Enc = Crypto.createHash('sha256') - const onChunk = (buff: Buffer) => { + const writeChunk = async (buff: Buffer) => { + if (!encFileWriteStream.write(buff)) { + await once(encFileWriteStream, 'drain') + } sha256Enc.update(buff) hmac.update(buff) - encFileWriteStream.write(buff) } try { @@ -437,10 +442,10 @@ export const encryptedStream = async ( } sha256Plain.update(data) - onChunk(aes.update(data)) + await writeChunk(aes.update(data)) } - onChunk(aes.final()) + await writeChunk(aes.final()) const mac = hmac.digest().slice(0, 10) sha256Enc.update(mac) @@ -448,11 +453,14 @@ export const encryptedStream = async ( const fileSha256 = sha256Plain.digest() const fileEncSha256 = sha256Enc.digest() - encFileWriteStream.write(mac) + if (!encFileWriteStream.write(mac)) { + await once(encFileWriteStream, 'drain') + } - encFileWriteStream.end() - originalFileStream?.end?.() - stream.destroy() + await Promise.allSettled([ + waitForCloseOrFinishStream(encFileWriteStream), + waitForCloseOrFinishStream(originalFileStream) + ]) logger?.debug('encrypted data successfully') @@ -466,15 +474,13 @@ export const encryptedStream = async ( fileLength } } catch (error) { - // destroy all streams with error - encFileWriteStream.destroy() - originalFileStream?.destroy?.() - aes.destroy() - hmac.destroy() - sha256Plain.destroy() - sha256Enc.destroy() - stream.destroy() - + try { stream.destroy() } catch { } + try { + await Promise.allSettled([ + waitForCloseOrFinishStream(encFileWriteStream), + waitForCloseOrFinishStream(originalFileStream) + ]) + } catch { } try { await fs.unlink(encFilePath) if (originalFilePath) { @@ -483,6 +489,10 @@ export const encryptedStream = async ( } catch (err) { logger?.error({ err }, 'failed deleting tmp files') } + try { aes.destroy() } catch { } + try { hmac.destroy() } catch { } + try { sha256Plain.destroy() } catch { } + try { sha256Enc.destroy() } catch { } throw error } diff --git a/src/Utils/process-message.ts b/src/Utils/process-message.ts index e6988480dcf..fedd836d3a4 100644 --- a/src/Utils/process-message.ts +++ b/src/Utils/process-message.ts @@ -264,15 +264,15 @@ const processMessage = async ( const process = shouldProcessHistoryMsg const isLatest = !creds.processedHistoryMessages?.length - logger?.info( - { - histNotification, - process, - id: message.key.id, - isLatest - }, - 'got history notification' - ) + //logger?.info( + // { + // histNotification, + // process, + // id: message.key.id, + // isLatest + // }, + // 'got history notification' + //) if (process) { // TODO: investigate