Skip to content

Commit 7b7fe51

Browse files
committed
agent: lock rows for concurrent queries in PostgreSQL
1 parent a7b43b1 commit 7b7fe51

File tree

8 files changed

+223
-94
lines changed

8 files changed

+223
-94
lines changed

src/Simplex/FileTransfer/Agent.hs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do
223223
agentXFTPDownloadChunk c userId digest replica chunkSpec
224224
liftIO $ waitUntilForeground c
225225
(entityId, complete, progress) <- withStore c $ \db -> runExceptT $ do
226+
liftIO $ lockRcvFileForUpdate db rcvFileId
226227
liftIO $ updateRcvFileChunkReceived db (rcvChunkReplicaId replica) rcvChunkId relChunkPath
227228
RcvFile {size = FileSize currentSize, chunks, redirect} <- ExceptT $ getRcvFile db rcvFileId
228229
let rcvd = receivedSize chunks
@@ -413,6 +414,7 @@ runXFTPSndPrepareWorker c Worker {doWork} = do
413414
withStore' c $ \db -> updateSndFileStatus db sndFileId SFSEncrypting
414415
(digest, chunkSpecsDigests) <- encryptFileForUpload sndFile fsEncPath
415416
withStore c $ \db -> do
417+
lockSndFileForUpdate db sndFileId
416418
updateSndFileEncrypted db sndFileId digest chunkSpecsDigests
417419
getSndFile db sndFileId
418420
else pure sndFile
@@ -530,6 +532,7 @@ runXFTPSndWorker c srv Worker {doWork} = do
530532
agentXFTPUploadChunk c userId chunkDigest replica' chunkSpec'
531533
liftIO $ waitUntilForeground c
532534
sf@SndFile {sndFileEntityId, prefixPath, chunks} <- withStore c $ \db -> do
535+
lockSndFileForUpdate db sndFileId
533536
updateSndChunkReplicaStatus db sndChunkReplicaId SFRSUploaded
534537
getSndFile db sndFileId
535538
let uploaded = uploadedSize chunks

src/Simplex/Messaging/Agent.hs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1129,7 +1129,8 @@ startJoinInvitation c userId connId sq_ enableNtfs cReqUri pqSup =
11291129
let cData = ConnData {userId, connId, connAgentVersion, enableNtfs, lastExternalSndId = 0, deleted = False, ratchetSyncState = RSOk, pqSupport}
11301130
case sq_ of
11311131
Just sq@SndQueue {e2ePubKey = Just _k} -> do
1132-
e2eSndParams <- withStore c $ \db ->
1132+
e2eSndParams <- withStore c $ \db -> do
1133+
lockConnForUpdate db connId
11331134
getSndRatchet db connId v >>= \case
11341135
Right r -> pure $ Right $ snd r
11351136
Left e -> do
@@ -1143,6 +1144,7 @@ startJoinInvitation c userId connId sq_ enableNtfs cReqUri pqSup =
11431144
sndKey_ = snd <$> invLink_
11441145
(q, _) <- lift $ newSndQueue userId "" qInfo sndKey_
11451146
withStore c $ \db -> runExceptT $ do
1147+
liftIO $ lockConnForUpdate db connId
11461148
e2eSndParams <- createRatchet_ db g maxSupported pqSupport e2eRcvParams
11471149
sq' <- maybe (ExceptT $ updateNewConnSnd db connId q) pure sq_
11481150
pure (cData, sq', e2eSndParams, lnkId_)
@@ -1221,7 +1223,8 @@ joinConnSrv c nm userId connId enableNtfs cReqUri@CRContactUri {} cInfo pqSup su
12211223
AgentConfig {smpClientVRange = vr, smpAgentVRange, e2eEncryptVRange = e2eVR} <- asks config
12221224
let qUri = SMPQueueUri vr $ (rcvSMPQueueAddress rq) {queueMode = Just QMMessaging}
12231225
crData = ConnReqUriData SSSimplex smpAgentVRange [qUri] Nothing
1224-
e2eRcvParams <- withStore' c $ \db ->
1226+
e2eRcvParams <- withStore' c $ \db -> do
1227+
lockConnForUpdate db connId
12251228
getRatchetX3dhKeys db connId >>= \case
12261229
Right keys -> pure $ CR.mkRcvE2ERatchetParams (maxVersion e2eVR) keys
12271230
Left e -> do
@@ -2077,10 +2080,12 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} sq@SndQueue {userId, connId, server,
20772080
notifyDelMsgs :: InternalId -> AgentErrorType -> UTCTime -> AM ()
20782081
notifyDelMsgs msgId err expireTs = do
20792082
notifyDel msgId $ MERR (unId msgId) err
2080-
msgIds_ <- withStore' c $ \db -> getExpiredSndMessages db connId sq expireTs
2083+
msgIds_ <- withStore' c $ \db -> do
2084+
msgIds_ <- getExpiredSndMessages db connId sq expireTs
2085+
forM_ msgIds_ $ \msgId' -> deleteSndMsgDelivery db connId sq msgId' False `catchAll_` pure ()
2086+
pure msgIds_
20812087
forM_ (L.nonEmpty msgIds_) $ \msgIds -> do
20822088
notify $ MERRS (L.map unId msgIds) err
2083-
withStore' c $ \db -> forM_ msgIds $ \msgId' -> deleteSndMsgDelivery db connId sq msgId' False `catchAll_` pure ()
20842089
atomically $ incSMPServerStat' c userId server sentExpiredErrs (length msgIds_ + 1)
20852090
delMsg :: InternalId -> AM ()
20862091
delMsg = delMsgKeep False
@@ -3005,7 +3010,8 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
30053010
throwE e
30063011
agentClientMsg :: TVar ChaChaDRG -> ByteString -> AM (Maybe (InternalId, MsgMeta, AMessage, CR.RatchetX448))
30073012
agentClientMsg g encryptedMsgHash = withStore c $ \db -> runExceptT $ do
3008-
rc <- ExceptT $ getRatchet db connId -- ratchet state pre-decryption - required for processing EREADY
3013+
liftIO $ lockConnForUpdate db connId
3014+
rc <- ExceptT $ getRatchetForUpdate db connId -- ratchet state pre-decryption - required for processing EREADY
30093015
(agentMsgBody, pqEncryption) <- agentRatchetDecrypt' g db connId rc encAgentMessage
30103016
liftEither (parse smpP (SEAgentError $ AGENT A_MESSAGE) agentMsgBody) >>= \case
30113017
agentMsg@(AgentMessage APrivHeader {sndMsgId, prevMsgHash} aMessage) -> do
@@ -3240,6 +3246,7 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
32403246
Just sqs' -> do
32413247
(sq_@SndQueue {sndPrivateKey}, dhPublicKey) <- lift $ newSndQueue userId connId qInfo Nothing
32423248
sq2 <- withStore c $ \db -> do
3249+
lockConnForUpdate db connId
32433250
liftIO $ mapM_ (deleteConnSndQueue db connId) delSqs
32443251
addConnSndQueue db connId (sq_ :: NewSndQueue) {primary = True, dbReplaceQueueId = Just dbQueueId}
32453252
logServer "<--" c srv rId $ "MSG <QADD>:" <> logSecret' srvMsgId <> " " <> logSecret (senderId queueAddress)
@@ -3544,7 +3551,7 @@ agentRatchetEncrypt db cData msg getPaddedLen pqEnc_ currentE2EVersion = do
35443551

35453552
agentRatchetEncryptHeader :: DB.Connection -> ConnData -> (VersionSMPA -> PQSupport -> Int) -> Maybe PQEncryption -> CR.VersionE2E -> ExceptT StoreError IO (CR.MsgEncryptKeyX448, Int, PQEncryption)
35463553
agentRatchetEncryptHeader db ConnData {connId, connAgentVersion = v, pqSupport} getPaddedLen pqEnc_ currentE2EVersion = do
3547-
rc <- ExceptT $ getRatchet db connId
3554+
rc <- ExceptT $ getRatchetForUpdate db connId
35483555
let paddedLen = getPaddedLen v pqSupport
35493556
(mek, rc') <- withExceptT (SEAgentError . cryptoError) $ CR.rcEncryptHeader rc pqEnc_ currentE2EVersion
35503557
liftIO $ updateRatchet db connId rc' CR.SMDNoChange
@@ -3553,7 +3560,7 @@ agentRatchetEncryptHeader db ConnData {connId, connAgentVersion = v, pqSupport}
35533560
-- encoded EncAgentMessage -> encoded AgentMessage
35543561
agentRatchetDecrypt :: TVar ChaChaDRG -> DB.Connection -> ConnId -> ByteString -> ExceptT StoreError IO (ByteString, PQEncryption)
35553562
agentRatchetDecrypt g db connId encAgentMsg = do
3556-
rc <- ExceptT $ getRatchet db connId
3563+
rc <- ExceptT $ getRatchetForUpdate db connId
35573564
agentRatchetDecrypt' g db connId rc encAgentMsg
35583565

35593566
agentRatchetDecrypt' :: TVar ChaChaDRG -> DB.Connection -> ConnId -> CR.RatchetX448 -> ByteString -> ExceptT StoreError IO (ByteString, PQEncryption)

0 commit comments

Comments
 (0)