Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Simplex/FileTransfer/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ runXFTPRcvWorker c srv Worker {doWork} = do
agentXFTPDownloadChunk c userId digest replica chunkSpec
liftIO $ waitUntilForeground c
(entityId, complete, progress) <- withStore c $ \db -> runExceptT $ do
liftIO $ lockRcvFileForUpdate db rcvFileId
liftIO $ updateRcvFileChunkReceived db (rcvChunkReplicaId replica) rcvChunkId relChunkPath
RcvFile {size = FileSize currentSize, chunks, redirect} <- ExceptT $ getRcvFile db rcvFileId
let rcvd = receivedSize chunks
Expand Down Expand Up @@ -413,6 +414,7 @@ runXFTPSndPrepareWorker c Worker {doWork} = do
withStore' c $ \db -> updateSndFileStatus db sndFileId SFSEncrypting
(digest, chunkSpecsDigests) <- encryptFileForUpload sndFile fsEncPath
withStore c $ \db -> do
lockSndFileForUpdate db sndFileId
updateSndFileEncrypted db sndFileId digest chunkSpecsDigests
getSndFile db sndFileId
else pure sndFile
Expand Down Expand Up @@ -530,6 +532,7 @@ runXFTPSndWorker c srv Worker {doWork} = do
agentXFTPUploadChunk c userId chunkDigest replica' chunkSpec'
liftIO $ waitUntilForeground c
sf@SndFile {sndFileEntityId, prefixPath, chunks} <- withStore c $ \db -> do
lockSndFileForUpdate db sndFileId
updateSndChunkReplicaStatus db sndChunkReplicaId SFRSUploaded
getSndFile db sndFileId
let uploaded = uploadedSize chunks
Expand Down
23 changes: 15 additions & 8 deletions src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1129,7 +1129,8 @@ startJoinInvitation c userId connId sq_ enableNtfs cReqUri pqSup =
let cData = ConnData {userId, connId, connAgentVersion, enableNtfs, lastExternalSndId = 0, deleted = False, ratchetSyncState = RSOk, pqSupport}
case sq_ of
Just sq@SndQueue {e2ePubKey = Just _k} -> do
e2eSndParams <- withStore c $ \db ->
e2eSndParams <- withStore c $ \db -> do
lockConnForUpdate db connId
getSndRatchet db connId v >>= \case
Right r -> pure $ Right $ snd r
Left e -> do
Expand All @@ -1143,6 +1144,7 @@ startJoinInvitation c userId connId sq_ enableNtfs cReqUri pqSup =
sndKey_ = snd <$> invLink_
(q, _) <- lift $ newSndQueue userId "" qInfo sndKey_
withStore c $ \db -> runExceptT $ do
liftIO $ lockConnForUpdate db connId
e2eSndParams <- createRatchet_ db g maxSupported pqSupport e2eRcvParams
sq' <- maybe (ExceptT $ updateNewConnSnd db connId q) pure sq_
pure (cData, sq', e2eSndParams, lnkId_)
Expand Down Expand Up @@ -1221,7 +1223,8 @@ joinConnSrv c nm userId connId enableNtfs cReqUri@CRContactUri {} cInfo pqSup su
AgentConfig {smpClientVRange = vr, smpAgentVRange, e2eEncryptVRange = e2eVR} <- asks config
let qUri = SMPQueueUri vr $ (rcvSMPQueueAddress rq) {queueMode = Just QMMessaging}
crData = ConnReqUriData SSSimplex smpAgentVRange [qUri] Nothing
e2eRcvParams <- withStore' c $ \db ->
e2eRcvParams <- withStore' c $ \db -> do
lockConnForUpdate db connId
getRatchetX3dhKeys db connId >>= \case
Right keys -> pure $ CR.mkRcvE2ERatchetParams (maxVersion e2eVR) keys
Left e -> do
Expand Down Expand Up @@ -1937,7 +1940,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} sq@SndQueue {userId, connId, server,
withRetryLock2 ri' qLock $ \riState loop -> do
liftIO $ waitWhileSuspended c
liftIO $ waitForUserNetwork c
resp <- tryError $ case msgType of
resp <- tryAllErrors $ case msgType of
AM_CONN_INFO -> sendConfirmation c NRMBackground sq msgBody
AM_CONN_INFO_REPLY -> sendConfirmation c NRMBackground sq msgBody
_ -> case pendingMsgPrepData_ of
Expand Down Expand Up @@ -2077,10 +2080,12 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} sq@SndQueue {userId, connId, server,
notifyDelMsgs :: InternalId -> AgentErrorType -> UTCTime -> AM ()
notifyDelMsgs msgId err expireTs = do
notifyDel msgId $ MERR (unId msgId) err
msgIds_ <- withStore' c $ \db -> getExpiredSndMessages db connId sq expireTs
msgIds_ <- withStore' c $ \db -> do
msgIds_ <- getExpiredSndMessages db connId sq expireTs
forM_ msgIds_ $ \msgId' -> deleteSndMsgDelivery db connId sq msgId' False `catchAll_` pure ()
pure msgIds_
forM_ (L.nonEmpty msgIds_) $ \msgIds -> do
notify $ MERRS (L.map unId msgIds) err
withStore' c $ \db -> forM_ msgIds $ \msgId' -> deleteSndMsgDelivery db connId sq msgId' False `catchAll_` pure ()
atomically $ incSMPServerStat' c userId server sentExpiredErrs (length msgIds_ + 1)
delMsg :: InternalId -> AM ()
delMsg = delMsgKeep False
Expand Down Expand Up @@ -3005,7 +3010,8 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
throwE e
agentClientMsg :: TVar ChaChaDRG -> ByteString -> AM (Maybe (InternalId, MsgMeta, AMessage, CR.RatchetX448))
agentClientMsg g encryptedMsgHash = withStore c $ \db -> runExceptT $ do
rc <- ExceptT $ getRatchet db connId -- ratchet state pre-decryption - required for processing EREADY
liftIO $ lockConnForUpdate db connId
rc <- ExceptT $ getRatchetForUpdate db connId -- ratchet state pre-decryption - required for processing EREADY
(agentMsgBody, pqEncryption) <- agentRatchetDecrypt' g db connId rc encAgentMessage
liftEither (parse smpP (SEAgentError $ AGENT A_MESSAGE) agentMsgBody) >>= \case
agentMsg@(AgentMessage APrivHeader {sndMsgId, prevMsgHash} aMessage) -> do
Expand Down Expand Up @@ -3240,6 +3246,7 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
Just sqs' -> do
(sq_@SndQueue {sndPrivateKey}, dhPublicKey) <- lift $ newSndQueue userId connId qInfo Nothing
sq2 <- withStore c $ \db -> do
lockConnForUpdate db connId
liftIO $ mapM_ (deleteConnSndQueue db connId) delSqs
addConnSndQueue db connId (sq_ :: NewSndQueue) {primary = True, dbReplaceQueueId = Just dbQueueId}
logServer "<--" c srv rId $ "MSG <QADD>:" <> logSecret' srvMsgId <> " " <> logSecret (senderId queueAddress)
Expand Down Expand Up @@ -3544,7 +3551,7 @@ agentRatchetEncrypt db cData msg getPaddedLen pqEnc_ currentE2EVersion = do

agentRatchetEncryptHeader :: DB.Connection -> ConnData -> (VersionSMPA -> PQSupport -> Int) -> Maybe PQEncryption -> CR.VersionE2E -> ExceptT StoreError IO (CR.MsgEncryptKeyX448, Int, PQEncryption)
agentRatchetEncryptHeader db ConnData {connId, connAgentVersion = v, pqSupport} getPaddedLen pqEnc_ currentE2EVersion = do
rc <- ExceptT $ getRatchet db connId
rc <- ExceptT $ getRatchetForUpdate db connId
let paddedLen = getPaddedLen v pqSupport
(mek, rc') <- withExceptT (SEAgentError . cryptoError) $ CR.rcEncryptHeader rc pqEnc_ currentE2EVersion
liftIO $ updateRatchet db connId rc' CR.SMDNoChange
Expand All @@ -3553,7 +3560,7 @@ agentRatchetEncryptHeader db ConnData {connId, connAgentVersion = v, pqSupport}
-- encoded EncAgentMessage -> encoded AgentMessage
agentRatchetDecrypt :: TVar ChaChaDRG -> DB.Connection -> ConnId -> ByteString -> ExceptT StoreError IO (ByteString, PQEncryption)
agentRatchetDecrypt g db connId encAgentMsg = do
rc <- ExceptT $ getRatchet db connId
rc <- ExceptT $ getRatchetForUpdate db connId
agentRatchetDecrypt' g db connId rc encAgentMsg

agentRatchetDecrypt' :: TVar ChaChaDRG -> DB.Connection -> ConnId -> CR.RatchetX448 -> ByteString -> ExceptT StoreError IO (ByteString, PQEncryption)
Expand Down
33 changes: 18 additions & 15 deletions src/Simplex/Messaging/Agent/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2114,39 +2114,42 @@ withWork :: AgentClient -> TMVar () -> (DB.Connection -> IO (Either StoreError (
withWork c doWork = withWork_ c doWork . withStore' c
{-# INLINE withWork #-}

-- setting doWork flag to "no work" before getWork rather than after prevents race condition when flag is set to "has work" by another thread after getWork call.
withWork_ :: (AnyStoreError e', MonadIO m) => AgentClient -> TMVar () -> ExceptT e m (Either e' (Maybe a)) -> (a -> ExceptT e m ()) -> ExceptT e m ()
withWork_ c doWork getWork action =
getWork >>= \case
Right (Just r) -> action r
Right Nothing -> noWork
-- worker is stopped here (noWork) because the next iteration is likely to produce the same result
noWork >> getWork >>= \case
Right (Just r) -> hasWork >> action r
Right Nothing -> pure ()
Left e
| isWorkItemError e -> noWork >> notifyErr (CRITICAL False) e
| otherwise -> notifyErr INTERNAL e
| isWorkItemError e -> notifyErr (CRITICAL False) e -- worker remains stopped here because the next iteration is likely to produce the same result
| otherwise -> hasWork >> notifyErr INTERNAL e
where
hasWork = atomically $ hasWorkToDo' doWork
noWork = liftIO $ noWorkToDo doWork
notifyErr err e = do
logError $ "withWork_ error: " <> tshow e
atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR $ err $ show e)

withWorkItems :: (AnyStoreError e', MonadIO m) => AgentClient -> TMVar () -> ExceptT e m (Either e' [Either e' a]) -> (NonEmpty a -> ExceptT e m ()) -> ExceptT e m ()
withWorkItems c doWork getWork action = do
getWork >>= \case
Right [] -> noWork
noWork >> getWork >>= \case
Right [] -> pure ()
Right rs -> do
let (errs, items) = partitionEithers rs
case L.nonEmpty items of
Just items' -> action items'
Just items' -> hasWork >> action items'
Nothing -> do
let criticalErr = find isWorkItemError errs
forM_ criticalErr $ \err -> do
notifyErr (CRITICAL False) err
when (all isWorkItemError errs) noWork
case find isWorkItemError errs of
Nothing -> hasWork
Just err -> do
notifyErr (CRITICAL False) err
unless (all isWorkItemError errs) hasWork
forM_ (L.nonEmpty errs) $ notifySub c . ERRS . L.map (\e -> ("", INTERNAL $ show e))
Left e
| isWorkItemError e -> noWork >> notifyErr (CRITICAL False) e
| otherwise -> notifyErr INTERNAL e
| isWorkItemError e -> notifyErr (CRITICAL False) e
| otherwise -> hasWork >> notifyErr INTERNAL e
where
hasWork = atomically $ hasWorkToDo' doWork
noWork = liftIO $ noWorkToDo doWork
notifyErr err e = do
logError $ "withWorkItems error: " <> tshow e
Expand Down
Loading
Loading