diff --git a/rfcs/2025-03-30-ios-notifications-3.md b/rfcs/2025-03-30-ios-notifications-3.md index 441525767..7fbf19a8b 100644 --- a/rfcs/2025-03-30-ios-notifications-3.md +++ b/rfcs/2025-03-30-ios-notifications-3.md @@ -46,7 +46,7 @@ NSUB :: Maybe NtfServerId -> Command Notifier -- subscribe to notificaions from all queues associated with the server -- should be signed with server key -- entity ID - NtfServerId -NSSUB :: Command NtfServer +NRDY :: Command NtfServer data NtfServerCreds = NtfServerCreds { server :: NtfServer, diff --git a/src/Simplex/Messaging/Client/Agent.hs b/src/Simplex/Messaging/Client/Agent.hs index 1a7a67806..e8f5a03e2 100644 --- a/src/Simplex/Messaging/Client/Agent.hs +++ b/src/Simplex/Messaging/Client/Agent.hs @@ -358,7 +358,7 @@ smpSubscribeQueues party ca smp srv subs = do pending <- maybe (pure M.empty) readTVar =<< TM.lookup srv (pendingSrvSubs ca) let acc@(_, _, oks, notPending) = foldr (groupSub pending) (False, [], [], []) (L.zip subs rs) unless (null oks) $ addSubscriptions ca srv party oks - unless (null notPending) $ removePendingSubs ca srv party notPending + unless (null notPending) $ removePendingSubs ca srv party $ S.fromList notPending pure acc sessId = sessionId $ thParams smp groupSub :: Map SMPSub C.APrivateAuthKey -> ((QueueId, C.APrivateAuthKey), Either SMPClientError ()) -> (Bool, [(QueueId, SMPClientError)], [(QueueId, (SessionId, C.APrivateAuthKey))], [QueueId]) -> (Bool, [(QueueId, SMPClientError)], [(QueueId, (SessionId, C.APrivateAuthKey))], [QueueId]) @@ -412,14 +412,22 @@ removeSubscription :: SMPClientAgent -> SMPServer -> SMPSub -> STM () removeSubscription = removeSub_ . srvSubs {-# INLINE removeSubscription #-} +removePendingSub :: SMPClientAgent -> SMPServer -> SMPSub -> STM () +removePendingSub = removeSub_ . pendingSrvSubs +{-# INLINE removePendingSub #-} + removeSub_ :: TMap SMPServer (TMap SMPSub s) -> SMPServer -> SMPSub -> STM () removeSub_ subs srv s = TM.lookup srv subs >>= mapM_ (TM.delete s) -removePendingSubs :: SMPClientAgent -> SMPServer -> SMPSubParty -> [QueueId] -> STM () +removeSubscriptions :: SMPClientAgent -> SMPServer -> SMPSubParty -> Set QueueId -> STM () +removeSubscriptions = removeSubs_ . srvSubs +{-# INLINE removeSubscriptions #-} + +removePendingSubs :: SMPClientAgent -> SMPServer -> SMPSubParty -> Set QueueId -> STM () removePendingSubs = removeSubs_ . pendingSrvSubs {-# INLINE removePendingSubs #-} -removeSubs_ :: TMap SMPServer (TMap SMPSub C.APrivateAuthKey) -> SMPServer -> SMPSubParty -> [QueueId] -> STM () +removeSubs_ :: TMap SMPServer (TMap SMPSub s) -> SMPServer -> SMPSubParty -> Set QueueId -> STM () removeSubs_ subs srv party qs = TM.lookup srv subs >>= mapM_ (`modifyTVar'` (`M.withoutKeys` ss)) where - ss = S.fromList $ map (party,) qs + ss = S.map (party,) qs diff --git a/src/Simplex/Messaging/Crypto.hs b/src/Simplex/Messaging/Crypto.hs index e3326b98a..f043544db 100644 --- a/src/Simplex/Messaging/Crypto.hs +++ b/src/Simplex/Messaging/Crypto.hs @@ -179,8 +179,6 @@ module Simplex.Messaging.Crypto unPad, -- * X509 Certificates - SignedCertificate, - Certificate, signCertificate, signX509, verifyX509, diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index 84aebf9db..6a310afaf 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -691,9 +691,10 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu TDEL -> do logDebug "TDEL" st <- asks store - qs <- atomically $ deleteNtfToken st tknId - forM_ qs $ \SMPQueueNtf {smpServer, notifierId} -> - atomically $ removeSubscription ca smpServer (SPNotifier, notifierId) + ss <- atomically $ deleteNtfToken st tknId + forM_ (M.assocs ss) $ \(smpServer, nIds) -> do + atomically $ removeSubscriptions ca smpServer SPNotifier nIds + atomically $ removePendingSubs ca smpServer SPNotifier nIds cancelInvervalNotifications tknId withNtfLog (`logDeleteToken` tknId) incNtfStatT token tknDeleted @@ -756,6 +757,7 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu st <- asks store atomically $ deleteNtfSubscription st subId atomically $ removeSubscription ca smpServer (SPNotifier, notifierId) + atomically $ removePendingSub ca smpServer (SPNotifier, notifierId) withNtfLog (`logDeleteSubscription` subId) incNtfStat subDeleted pure NROk diff --git a/src/Simplex/Messaging/Notifications/Server/Store.hs b/src/Simplex/Messaging/Notifications/Server/Store.hs index 259a933b6..e4de7d37e 100644 --- a/src/Simplex/Messaging/Notifications/Server/Store.hs +++ b/src/Simplex/Messaging/Notifications/Server/Store.hs @@ -7,6 +7,7 @@ {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedLists #-} {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE ScopedTypeVariables #-} module Simplex.Messaging.Notifications.Server.Store where @@ -16,15 +17,15 @@ import Control.Monad import Data.ByteString.Char8 (ByteString) import Data.Functor (($>)) import Data.List.NonEmpty (NonEmpty (..), (<|)) +import Data.Map.Strict (Map) import qualified Data.Map.Strict as M -import Data.Maybe (catMaybes) import Data.Set (Set) import qualified Data.Set as S import Data.Word (Word16) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding.String import Simplex.Messaging.Notifications.Protocol -import Simplex.Messaging.Protocol (NtfPrivateAuthKey, NtfPublicAuthKey, SMPServer) +import Simplex.Messaging.Protocol (NotifierId, NtfPrivateAuthKey, NtfPublicAuthKey, SMPServer) import Simplex.Messaging.Server.QueueStore (RoundedSystemTime) import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM @@ -35,8 +36,11 @@ data NtfStore = NtfStore -- multiple registrations exist to protect from malicious registrations if token is compromised tokenRegistrations :: TMap DeviceToken (TMap ByteString NtfTokenId), subscriptions :: TMap NtfSubscriptionId NtfSubData, - tokenSubscriptions :: TMap NtfTokenId (TVar (Set NtfSubscriptionId)), - subscriptionLookup :: TMap SMPQueueNtf NtfSubscriptionId, + -- the first set is used to delete from `subscriptions` when token is deleted, the second - to cancel SMP subsriptions. + -- TODO [notifications] it can be simplified once NtfSubData is fully removed. + tokenSubscriptions :: TMap NtfTokenId (TMap SMPServer (TVar (Set NtfSubscriptionId), TVar (Set NotifierId))), + -- TODO [notifications] for subscriptions that "migrated" to server subscription, we may replace NtfSubData with NtfTokenId here (Either NtfSubData NtfTokenId). + subscriptionLookup :: TMap SMPServer (TMap NotifierId NtfSubData), tokenLastNtfs :: TMap NtfTokenId (TVar (NonEmpty PNMessageData)) } @@ -134,7 +138,7 @@ removeTokenRegistration st NtfTknData {ntfTknId = tId, token, tknVerifyKey} = >>= mapM_ (\tId' -> when (tId == tId') $ TM.delete k regs) k = C.toPubKey C.pubKeyBytes tknVerifyKey -deleteNtfToken :: NtfStore -> NtfTokenId -> STM [SMPQueueNtf] +deleteNtfToken :: NtfStore -> NtfTokenId -> STM (Map SMPServer (Set NotifierId)) deleteNtfToken st tknId = do void $ TM.lookupDelete tknId (tokens st) $>>= \NtfTknData {token, tknVerifyKey} -> @@ -147,25 +151,25 @@ deleteNtfToken st tknId = do regs = tokenRegistrations st regKey = C.toPubKey C.pubKeyBytes -deleteTokenSubs :: NtfStore -> NtfTokenId -> STM [SMPQueueNtf] +deleteTokenSubs :: NtfStore -> NtfTokenId -> STM (Map SMPServer (Set NotifierId)) deleteTokenSubs st tknId = do - qs <- - TM.lookupDelete tknId (tokenSubscriptions st) - >>= mapM (readTVar >=> mapM deleteSub . S.toList) - pure $ maybe [] catMaybes qs + TM.lookupDelete tknId (tokenSubscriptions st) + >>= maybe (pure M.empty) (readTVar >=> deleteSrvSubs) where - deleteSub subId = do - TM.lookupDelete subId (subscriptions st) - $>>= \NtfSubData {smpQueue} -> - TM.delete smpQueue (subscriptionLookup st) $> Just smpQueue + deleteSrvSubs :: Map SMPServer (TVar (Set NtfSubscriptionId), TVar (Set NotifierId)) -> STM (Map SMPServer (Set NotifierId)) + deleteSrvSubs = M.traverseWithKey $ \smpServer (sVar, nVar) -> do + sIds <- readTVar sVar + modifyTVar' (subscriptions st) (`M.withoutKeys` sIds) + nIds <- readTVar nVar + TM.lookup smpServer (subscriptionLookup st) >>= mapM_ (`modifyTVar'` (`M.withoutKeys` nIds)) + pure nIds getNtfSubscriptionIO :: NtfStore -> NtfSubscriptionId -> IO (Maybe NtfSubData) getNtfSubscriptionIO st subId = TM.lookupIO subId (subscriptions st) findNtfSubscription :: NtfStore -> SMPQueueNtf -> STM (Maybe NtfSubData) -findNtfSubscription st smpQueue = do - TM.lookup smpQueue (subscriptionLookup st) - $>>= \subId -> TM.lookup subId (subscriptions st) +findNtfSubscription st SMPQueueNtf {smpServer, notifierId} = + TM.lookup smpServer (subscriptionLookup st) $>>= TM.lookup notifierId findNtfSubscriptionToken :: NtfStore -> SMPQueueNtf -> STM (Maybe NtfTknData) findNtfSubscriptionToken st smpQueue = do @@ -183,30 +187,44 @@ mkNtfSubData ntfSubId (NewNtfSub tokenId smpQueue notifierKey) = do subStatus <- newTVar NSNew pure NtfSubData {ntfSubId, smpQueue, tokenId, subStatus, notifierKey} -addNtfSubscription :: NtfStore -> NtfSubscriptionId -> NtfSubData -> STM (Maybe ()) -addNtfSubscription st subId sub@NtfSubData {smpQueue, tokenId} = - TM.lookup tokenId (tokenSubscriptions st) >>= maybe newTokenSub pure >>= insertSub +addNtfSubscription :: NtfStore -> NtfSubscriptionId -> NtfSubData -> STM (Maybe NtfSubData) +addNtfSubscription st subId sub@NtfSubData {smpQueue = SMPQueueNtf {smpServer, notifierId}, tokenId} = + TM.lookup tokenId (tokenSubscriptions st) + >>= maybe newTokenSubs pure + >>= \ts -> TM.lookup smpServer ts + >>= maybe (newTokenSrvSubs ts) pure + >>= insertSub where - newTokenSub = do - ts <- newTVar S.empty + newTokenSubs = do + ts <- newTVar M.empty TM.insert tokenId ts $ tokenSubscriptions st pure ts - insertSub ts = do - modifyTVar' ts $ S.insert subId + newTokenSrvSubs ts = do + tss <- (,) <$> newTVar S.empty <*> newTVar S.empty + TM.insert smpServer tss ts + pure tss + insertSub :: (TVar (Set NtfSubscriptionId), TVar (Set NotifierId)) -> STM (Maybe NtfSubData) + insertSub (sIds, nIds) = do + modifyTVar' sIds $ S.insert subId + modifyTVar' nIds $ S.insert notifierId TM.insert subId sub $ subscriptions st - TM.insert smpQueue subId (subscriptionLookup st) - -- return Nothing if subscription existed before - pure $ Just () + TM.lookup smpServer (subscriptionLookup st) + >>= maybe newSubs pure + >>= TM.lookupInsert notifierId sub + newSubs = do + ss <- newTVar M.empty + TM.insert smpServer ss $ subscriptionLookup st + pure ss deleteNtfSubscription :: NtfStore -> NtfSubscriptionId -> STM () -deleteNtfSubscription st subId = do - TM.lookupDelete subId (subscriptions st) - >>= mapM_ - ( \NtfSubData {smpQueue, tokenId} -> do - TM.delete smpQueue $ subscriptionLookup st - ts_ <- TM.lookup tokenId (tokenSubscriptions st) - forM_ ts_ $ \ts -> modifyTVar' ts $ S.delete subId - ) +deleteNtfSubscription st subId = TM.lookupDelete subId (subscriptions st) >>= mapM_ deleteSubIndices + where + deleteSubIndices NtfSubData {smpQueue = SMPQueueNtf {smpServer, notifierId}, tokenId} = do + TM.lookup smpServer (subscriptionLookup st) >>= mapM_ (TM.delete notifierId) + tss_ <- (TM.lookup tokenId (tokenSubscriptions st) $>>= TM.lookup smpServer) + forM_ tss_ $ \(sIds, nIds) -> do + modifyTVar' sIds $ S.delete subId + modifyTVar' nIds $ S.delete notifierId addTokenLastNtf :: NtfStore -> NtfTokenId -> PNMessageData -> IO (NonEmpty PNMessageData) addTokenLastNtf st tknId newNtf = diff --git a/src/Simplex/Messaging/Notifications/Server/StoreLog.hs b/src/Simplex/Messaging/Notifications/Server/StoreLog.hs index fa0ae373c..2eae72670 100644 --- a/src/Simplex/Messaging/Notifications/Server/StoreLog.hs +++ b/src/Simplex/Messaging/Notifications/Server/StoreLog.hs @@ -29,6 +29,7 @@ import Control.Concurrent.STM import Control.Logger.Simple import Control.Monad import qualified Data.Attoparsec.ByteString.Char8 as A +import qualified Data.ByteString.Base64 as B64 import qualified Data.ByteString.Char8 as B import qualified Data.ByteString.Lazy.Char8 as LB import qualified Data.Text as T @@ -37,10 +38,10 @@ import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding.String import Simplex.Messaging.Notifications.Protocol import Simplex.Messaging.Notifications.Server.Store -import Simplex.Messaging.Protocol (NtfPrivateAuthKey) +import Simplex.Messaging.Protocol (EntityId (..), NtfPrivateAuthKey) import Simplex.Messaging.Server.QueueStore (RoundedSystemTime) import Simplex.Messaging.Server.StoreLog -import Simplex.Messaging.Util (safeDecodeUtf8) +import Simplex.Messaging.Util (safeDecodeUtf8, tshow) import System.IO data NtfStoreLogRecord @@ -235,7 +236,8 @@ readNtfStore f st = mapM_ (addNtfLogRecord . LB.toStrict) . LB.lines =<< LB.read >>= mapM_ (\NtfTknData {tknUpdatedAt} -> atomically $ writeTVar tknUpdatedAt $ Just t) CreateSubscription r@NtfSubRec {ntfSubId} -> do sub <- mkSubData r - void $ atomically $ addNtfSubscription st ntfSubId sub + atomically (addNtfSubscription st ntfSubId sub) + >>= mapM_ (\_ -> logWarn $ "subscription " <> tshow (B64.encode $ unEntityId ntfSubId) <> " already exists") SubscriptionStatus subId status -> do getNtfSubscriptionIO st subId >>= mapM_ (\NtfSubData {subStatus} -> atomically $ writeTVar subStatus status) diff --git a/src/Simplex/Messaging/Protocol.hs b/src/Simplex/Messaging/Protocol.hs index cb2eea43b..e71be5c19 100644 --- a/src/Simplex/Messaging/Protocol.hs +++ b/src/Simplex/Messaging/Protocol.hs @@ -116,6 +116,8 @@ module Simplex.Messaging.Protocol SenderId, LinkId, NotifierId, + NtfServerHost, + NtfServerCreds (..), RcvPrivateAuthKey, RcvPublicAuthKey, RcvPublicDhKey, @@ -297,7 +299,7 @@ e2eEncMessageLength :: Int e2eEncMessageLength = 16000 -- 15988 .. 16005 -- | SMP protocol clients -data Party = Recipient | Sender | Notifier | LinkClient | ProxiedClient +data Party = Recipient | Sender | Notifier | LinkClient | ProxiedClient | NtfSrvClient deriving (Show) -- | Singleton types for SMP protocol clients @@ -307,6 +309,7 @@ data SParty :: Party -> Type where SNotifier :: SParty Notifier SSenderLink :: SParty LinkClient SProxiedClient :: SParty ProxiedClient + SNtfSrvClient :: SParty NtfSrvClient instance TestEquality SParty where testEquality SRecipient SRecipient = Just Refl @@ -314,6 +317,7 @@ instance TestEquality SParty where testEquality SNotifier SNotifier = Just Refl testEquality SSenderLink SSenderLink = Just Refl testEquality SProxiedClient SProxiedClient = Just Refl + testEquality SNtfSrvClient SNtfSrvClient = Just Refl testEquality _ _ = Nothing deriving instance Show (SParty p) @@ -330,6 +334,8 @@ instance PartyI LinkClient where sParty = SSenderLink instance PartyI ProxiedClient where sParty = SProxiedClient +instance PartyI NtfSrvClient where sParty = SNtfSrvClient + type family DirectParty (p :: Party) :: Constraint where DirectParty Recipient = () DirectParty Sender = () @@ -400,6 +406,11 @@ type NotifierId = QueueId type LinkId = QueueId +-- A server transport host will be used as entity ID in transmissions. +-- While we could assign some IDs, that requires some unnecessary tracking of these IDs, +-- both in SMP and in Ntf servers, so it's simpler to just use transport host. +type NtfServerHost = EntityId + -- | SMP queue ID on the server. type QueueId = EntityId @@ -442,6 +453,8 @@ data Command (p :: Party) where LGET :: Command LinkClient -- SMP notification subscriber commands NSUB :: Command Notifier + NSRV :: NtfServerCreds -> Command NtfSrvClient + NRDY :: Command NtfSrvClient -- NtfServerHost is used as transmission entity ID PRXY :: SMPServer -> Maybe BasicAuth -> Command ProxiedClient -- request a relay server connection by URI -- Transmission to proxy: -- - entity ID: ID of the session with relay returned in PKEY (response to PRXY) @@ -544,6 +557,23 @@ instance Encoding QueueReqData where -- smpEncode (NewNtfCreds authKey dhKey) = smpEncode (authKey, dhKey) -- smpP = NewNtfCreds <$> smpP <*> smpP +data NtfServerCreds = NtfServerCreds + { ntfServer :: NtfServer, + -- ntf server certificate chain that should match fingerprint in address + -- and Ed25519 key to verify server command NRDY, signed by key from certificate. + ntfAuthPubKey :: (X.CertificateChain, X.SignedExact X.PubKey) + } + deriving (Show) + +instance Encoding NtfServerCreds where + smpEncode NtfServerCreds {ntfServer, ntfAuthPubKey = (cert, key)} = + smpEncode (ntfServer, C.encodeCertChain cert, C.SignedObject key) + smpP = do + ntfServer <- smpP + cert <- C.certChainP + C.SignedObject key <- smpP + pure NtfServerCreds {ntfServer, ntfAuthPubKey = (cert, key)} + newtype EncTransmission = EncTransmission ByteString deriving (Show) @@ -792,6 +822,8 @@ data CommandTag (p :: Party) where PFWD_ :: CommandTag ProxiedClient RFWD_ :: CommandTag Sender NSUB_ :: CommandTag Notifier + NSRV_ :: CommandTag NtfSrvClient + NRDY_ :: CommandTag NtfSrvClient data CmdTag = forall p. PartyI p => CT (SParty p) (CommandTag p) @@ -848,6 +880,8 @@ instance PartyI p => Encoding (CommandTag p) where PFWD_ -> "PFWD" RFWD_ -> "RFWD" NSUB_ -> "NSUB" + NSRV_ -> "NSRV" + NRDY_ -> "NRDY" smpP = messageTagP instance ProtocolMsgTag CmdTag where @@ -874,6 +908,8 @@ instance ProtocolMsgTag CmdTag where "PFWD" -> Just $ CT SProxiedClient PFWD_ "RFWD" -> Just $ CT SSender RFWD_ "NSUB" -> Just $ CT SNotifier NSUB_ + "NSRV" -> Just $ CT SNtfSrvClient NSRV_ + "NRDY" -> Just $ CT SNtfSrvClient NRDY_ _ -> Nothing instance Encoding CmdTag where @@ -1514,6 +1550,8 @@ instance PartyI p => ProtocolEncoding SMPVersion ErrorType (Command p) where SEND flags msg -> e (SEND_, ' ', flags, ' ', Tail msg) PING -> e PING_ NSUB -> e NSUB_ + NSRV creds -> e (NSRV_, creds) + NRDY -> e NRDY_ LKEY k -> e (LKEY_, ' ', k) LGET -> e LGET_ PRXY host auth_ -> e (PRXY_, ' ', host, auth_) @@ -1609,6 +1647,10 @@ instance ProtocolEncoding SMPVersion ErrorType Cmd where PFWD_ -> PFWD <$> _smpP <*> smpP <*> (EncTransmission . unTail <$> smpP) PRXY_ -> PRXY <$> _smpP <*> smpP CT SNotifier NSUB_ -> pure $ Cmd SNotifier NSUB + CT SNtfSrvClient tag -> + Cmd SNtfSrvClient <$> case tag of + NSRV_ -> NSRV <$> _smpP + NRDY_ -> pure NRDY fromProtocolError = fromProtocolError @SMPVersion @ErrorType @BrokerMsg {-# INLINE fromProtocolError #-} diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 32534ccf9..3a8f90759 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -1071,7 +1071,9 @@ verifyTransmission ms auth_ tAuth authorized queueId cmd = Cmd SSenderLink (LKEY k) -> verifySecure SSenderLink k Cmd SSenderLink LGET -> verifyQueue (\q -> if isContact (snd q) then VRVerified (Just q) else VRFailed) <$> get SSenderLink -- NSUB will not be accepted without authorization - Cmd SNotifier NSUB -> verifyQueue (\q -> maybe dummyVerify (\n -> Just q `verifiedWith` notifierKey n) (notifier $ snd q)) <$> get SNotifier + Cmd SNotifier NSUB -> verifyQueue (\q -> maybe dummyVerify (Just q `verifiedWith`) (notifierKey =<< notifier (snd q))) <$> get SNotifier + Cmd SNtfSrvClient (NSRV _creds) -> undefined -- TODO [notifications] + Cmd SNtfSrvClient NRDY -> undefined -- TODO [notifications] Cmd SProxiedClient _ -> pure $ VRVerified Nothing where verify = verifyCmdAuthorization auth_ tAuth authorized @@ -1253,6 +1255,9 @@ client LKEY k -> withQueue $ \q qr -> checkMode QMMessaging qr $ secureQueue_ q k $>> getQueueLink_ q qr LGET -> withQueue $ \q qr -> checkMode QMContact qr $ getQueueLink_ q qr Cmd SNotifier NSUB -> Just <$> subscribeNotifications + Cmd SNtfSrvClient command -> Just <$> case command of + NSRV _creds -> undefined -- TODO [notifications] + NRDY -> undefined -- TODO [notifications] Cmd SRecipient command -> Just <$> case command of NEW nqr@NewQueueReq {auth_} -> @@ -1355,7 +1360,7 @@ client getQueueLink_ q qr = liftIO $ LNK (senderId qr) <$$> getQueueLinkData (queueStore ms) q entId addQueueNotifier_ :: StoreQueue s -> NtfPublicAuthKey -> RcvNtfPublicDhKey -> M (Transmission BrokerMsg) - addQueueNotifier_ q notifierKey dhKey = time "NKEY" $ do + addQueueNotifier_ q nKey dhKey = time "NKEY" $ do (rcvPublicDhKey, privDhKey) <- atomically . C.generateKeyPair =<< asks random let rcvNtfDhSecret = C.dh' dhKey privDhKey (corrId,entId,) <$> addNotifierRetry 3 rcvPublicDhKey rcvNtfDhSecret @@ -1364,7 +1369,8 @@ client addNotifierRetry 0 _ _ = pure $ ERR INTERNAL addNotifierRetry n rcvPublicDhKey rcvNtfDhSecret = do notifierId <- randomId =<< asks (queueIdBytes . config) - let ntfCreds = NtfCreds {notifierId, notifierKey, rcvNtfDhSecret} + -- TODO [notifications] receive server in NEW command (and in NKEY) + let ntfCreds = NtfCreds {notifierId, notifierKey = Just nKey, ntfServerHost = Nothing, rcvNtfDhSecret} liftIO (addQueueNotifier (queueStore ms) q ntfCreds) >>= \case Left DUPLICATE_ -> addNotifierRetry (n - 1) rcvPublicDhKey rcvNtfDhSecret Left e -> pure $ ERR e diff --git a/src/Simplex/Messaging/Server/MsgStore/Journal.hs b/src/Simplex/Messaging/Server/MsgStore/Journal.hs index d28300a75..d44c3d7dd 100644 --- a/src/Simplex/Messaging/Server/MsgStore/Journal.hs +++ b/src/Simplex/Messaging/Server/MsgStore/Journal.hs @@ -422,16 +422,18 @@ instance MsgStoreClass (JournalMsgStore s) where q <- mkQueue ms False rId qr withSharedWaitLock rId queueLocks sharedLock $ run $ tryStore' "deleteExpiredMsgs" rId $ getLoadedQueue q >>= unStoreIO . expireQueueMsgs ms now old -#endif where - old = now - ttl - veryOld = now - 2 * ttl - 86400 - run :: ExceptT ErrorType IO MessageStats -> IO MessageStats - run = fmap (fromRight newMessageStats) . runExceptT -- Use cached queue if available. -- Also see the comment in loadQueue in PostgresQueueStore getLoadedQueue :: JournalQueue s -> IO (JournalQueue s) getLoadedQueue q = fromMaybe q <$> TM.lookupIO (recipientId q) (loadedQueues $ queueStore_ ms) +#else + where +#endif + old = now - ttl + veryOld = now - 2 * ttl - 86400 + run :: ExceptT ErrorType IO MessageStats -> IO MessageStats + run = fmap (fromRight newMessageStats) . runExceptT logQueueStates :: JournalMsgStore s -> IO () logQueueStates ms = withActiveMsgQueues ms $ unStoreIO . logQueueState diff --git a/src/Simplex/Messaging/Server/QueueStore.hs b/src/Simplex/Messaging/Server/QueueStore.hs index e90359d1d..2151e83fe 100644 --- a/src/Simplex/Messaging/Server/QueueStore.hs +++ b/src/Simplex/Messaging/Server/QueueStore.hs @@ -10,13 +10,17 @@ module Simplex.Messaging.Server.QueueStore where -import Control.Applicative ((<|>)) +import Control.Applicative (optional, (<|>)) +import qualified Data.Attoparsec.ByteString.Char8 as A +import Data.ByteString.Char8 (ByteString) +import qualified Data.ByteString.Char8 as B import Data.Functor (($>)) import Data.Int (Int64) import Data.List.NonEmpty (NonEmpty) import Data.Time.Clock.System (SystemTime (..), getSystemTime) import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol +import Simplex.Messaging.Transport.Client (TransportHost) #if defined(dbServerPostgres) import Data.Text.Encoding (decodeLatin1, encodeUtf8) import Database.PostgreSQL.Simple.FromField (FromField (..)) @@ -39,17 +43,32 @@ data QueueRec = QueueRec deriving (Show) data NtfCreds = NtfCreds - { notifierId :: !NotifierId, - notifierKey :: !NtfPublicAuthKey, - rcvNtfDhSecret :: !RcvNtfDhSecret + { notifierId :: NotifierId, + -- `notifierKey` and `ntfServerHost` are mutually exclusive (and one of them is required), + -- but for some period of time from switching to `ntfServerHost` + -- we will continue storing `notifierKey` to allow ntf/smp server downgrades. + -- we could use `These NtfPublicAuthKey TransportHost` type here (https://hackage.haskell.org/package/these-1.2.1/docs/Data-These.html) + notifierKey :: Maybe NtfPublicAuthKey, + ntfServerHost :: Maybe TransportHost, + rcvNtfDhSecret :: RcvNtfDhSecret } deriving (Show) instance StrEncoding NtfCreds where - strEncode NtfCreds {notifierId, notifierKey, rcvNtfDhSecret} = strEncode (notifierId, notifierKey, rcvNtfDhSecret) + strEncode NtfCreds {notifierId = nId, notifierKey = nKey, ntfServerHost = nsrv, rcvNtfDhSecret} = + strEncode nId <> opt " nkey=" nKey <> opt " nsrv=" nsrv <> " ndhs=" <> strEncode rcvNtfDhSecret + where + opt :: StrEncoding a => ByteString -> Maybe a -> ByteString + opt param = maybe B.empty ((param <>) . strEncode) strP = do - (notifierId, notifierKey, rcvNtfDhSecret) <- strP - pure NtfCreds {notifierId, notifierKey, rcvNtfDhSecret} + notifierId <- strP + (notifierKey, ntfServerHost, rcvNtfDhSecret) <- newP <|> legacyP + pure NtfCreds {notifierId, notifierKey, ntfServerHost, rcvNtfDhSecret} + where + newP = (,,) <$> optional (" nkey=" *> strP) <*> optional (" nsrv=" *> strP) <*> (" ndhs=" *> strP) + legacyP = do + (nKey, rcvNtfDhSecret) <- A.space *> strP + pure (Just nKey, Nothing, rcvNtfDhSecret) data ServerEntityStatus = EntityActive diff --git a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs index f500a3f42..88b0df5cc 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Postgres.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Postgres.hs @@ -438,11 +438,14 @@ queueRecQueryWithData = FROM msg_queues |] -type QueueRecRow = (RecipientId, NonEmpty RcvPublicAuthKey, RcvDhSecret, SenderId, Maybe SndPublicAuthKey, Maybe QueueMode, Maybe NotifierId, Maybe NtfPublicAuthKey, Maybe RcvNtfDhSecret, ServerEntityStatus, Maybe RoundedSystemTime, Maybe LinkId) +type QueueRecRow = (RecipientId, NonEmpty RcvPublicAuthKey, RcvDhSecret, SenderId, Maybe SndPublicAuthKey, Maybe QueueMode) :. NotifierRow :. (ServerEntityStatus, Maybe RoundedSystemTime, Maybe LinkId) + +-- TODO [notifications] add ntf server +type NotifierRow = (Maybe NotifierId, Maybe NtfPublicAuthKey, Maybe RcvNtfDhSecret) queueRecToRow :: (RecipientId, QueueRec) -> QueueRecRow :. (Maybe EncDataBytes, Maybe EncDataBytes) queueRecToRow (rId, QueueRec {recipientKeys, rcvDhSecret, senderId, senderKey, queueMode, queueData, notifier = n, status, updatedAt}) = - (rId, recipientKeys, rcvDhSecret, senderId, senderKey, queueMode, notifierId <$> n, notifierKey <$> n, rcvNtfDhSecret <$> n, status, updatedAt, linkId_) + ((rId, recipientKeys, rcvDhSecret, senderId, senderKey, queueMode) :. (notifierId <$> n, notifierKey =<< n, rcvNtfDhSecret <$> n) :. (status, updatedAt, linkId_)) :. (fst <$> queueData_, snd <$> queueData_) where (linkId_, queueData_) = queueDataColumns queueData @@ -460,7 +463,8 @@ queueRecToText (rId, QueueRec {recipientKeys, rcvDhSecret, senderId, senderKey, nullable senderKey, nullable queueMode, nullable (notifierId <$> n), - nullable (notifierKey <$> n), + nullable (notifierKey =<< n), + -- TODO [notifications] add ntf server ID nullable (rcvNtfDhSecret <$> n), BB.char7 '"' <> renderField (toField status) <> BB.char7 '"', nullable updatedAt, @@ -485,18 +489,24 @@ queueDataColumns = \case Nothing -> (Nothing, Nothing) rowToQueueRec :: QueueRecRow -> (RecipientId, QueueRec) -rowToQueueRec (rId, recipientKeys, rcvDhSecret, senderId, senderKey, queueMode, notifierId_, notifierKey_, rcvNtfDhSecret_, status, updatedAt, linkId_) = - let notifier = NtfCreds <$> notifierId_ <*> notifierKey_ <*> rcvNtfDhSecret_ +rowToQueueRec ((rId, recipientKeys, rcvDhSecret, senderId, senderKey, queueMode) :. notifierRow :. (status, updatedAt, linkId_)) = + let notifier = toNotifier notifierRow queueData = (,(EncDataBytes "", EncDataBytes "")) <$> linkId_ in (rId, QueueRec {recipientKeys, rcvDhSecret, senderId, senderKey, queueMode, queueData, notifier, status, updatedAt}) rowToQueueRecWithData :: QueueRecRow :. (Maybe EncDataBytes, Maybe EncDataBytes) -> (RecipientId, QueueRec) -rowToQueueRecWithData ((rId, recipientKeys, rcvDhSecret, senderId, senderKey, queueMode, notifierId_, notifierKey_, rcvNtfDhSecret_, status, updatedAt, linkId_) :. (immutableData_, userData_)) = - let notifier = NtfCreds <$> notifierId_ <*> notifierKey_ <*> rcvNtfDhSecret_ +rowToQueueRecWithData (((rId, recipientKeys, rcvDhSecret, senderId, senderKey, queueMode) :. notifierRow :. (status, updatedAt, linkId_)) :. (immutableData_, userData_)) = + let notifier = toNotifier notifierRow encData = fromMaybe (EncDataBytes "") queueData = (,(encData immutableData_, encData userData_)) <$> linkId_ in (rId, QueueRec {recipientKeys, rcvDhSecret, senderId, senderKey, queueMode, queueData, notifier, status, updatedAt}) +-- TODO [notifications] add ntf server +toNotifier :: NotifierRow -> Maybe NtfCreds +toNotifier (notifierId_, notifierKey, rcvNtfDhSecret_) = case (notifierId_, rcvNtfDhSecret_) of + (Just notifierId, Just rcvNtfDhSecret) -> Just NtfCreds {notifierId, notifierKey, ntfServerHost = Nothing, rcvNtfDhSecret} + _ -> Nothing + setStatusDB :: StoreQueueClass q => String -> PostgresQueueStore q -> q -> ServerEntityStatus -> ExceptT ErrorType IO () -> IO (Either ErrorType ()) setStatusDB op st sq status writeLog = withQueueRec sq op $ \q -> do diff --git a/src/Simplex/Messaging/Server/QueueStore/Postgres/Migrations.hs b/src/Simplex/Messaging/Server/QueueStore/Postgres/Migrations.hs index b1c5501f6..80ca769e9 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Postgres/Migrations.hs +++ b/src/Simplex/Messaging/Server/QueueStore/Postgres/Migrations.hs @@ -13,7 +13,8 @@ serverSchemaMigrations :: [(String, Text, Maybe Text)] serverSchemaMigrations = [ ("20250207_initial", m20250207_initial, Nothing), ("20250319_updated_index", m20250319_updated_index, Just down_m20250319_updated_index), - ("20250320_short_links", m20250320_short_links, Just down_m20250320_short_links) + ("20250320_short_links", m20250320_short_links, Just down_m20250320_short_links), + ("20250415_ntf_servers", m20250415_ntf_servers, Just down_m20250415_ntf_servers) ] -- | The list of migrations in ascending order by date @@ -119,3 +120,35 @@ UPDATE msg_queues SET recipient_keys = substring(recipient_keys from 3); ALTER TABLE msg_queues RENAME COLUMN recipient_keys TO recipient_key; |] + +m20250415_ntf_servers :: Text +m20250415_ntf_servers = + T.pack + [r| +CREATE TABLE ntf_servers( + ntf_server_host BYTEA NOT NULL, + additional_hosts BYTEA, + port TEXT NOT NULL, + key_hash BYTEA NOT NULL, + cert_chain BYTEA NOT NULL, + signed_auth_key BYTEA NOT NULL, + auth_key BYTEA NOT NULL, + PRIMARY KEY (ntf_server_host) +); + +ALTER TABLE msg_queues ADD COLUMN ntf_server_host BYTEA REFERENCES ntf_servers(ntf_server_host); + +CREATE INDEX idx_msg_queues_ntf_server_host ON msg_queues(ntf_server_host); + |] + + +down_m20250415_ntf_servers :: Text +down_m20250415_ntf_servers = + T.pack + [r| +DROP INDEX idx_msg_queues_ntf_server_host; + +ALTER TABLE msg_queues DROP COLUMN ntf_server_host; + +DROP TABLE ntf_servers; + |] \ No newline at end of file diff --git a/src/Simplex/Messaging/Server/QueueStore/Postgres/server_schema.sql b/src/Simplex/Messaging/Server/QueueStore/Postgres/server_schema.sql index 2910b6959..038b81ee1 100644 --- a/src/Simplex/Messaging/Server/QueueStore/Postgres/server_schema.sql +++ b/src/Simplex/Messaging/Server/QueueStore/Postgres/server_schema.sql @@ -41,7 +41,20 @@ CREATE TABLE smp_server.msg_queues ( queue_mode text, link_id bytea, fixed_data bytea, - user_data bytea + user_data bytea, + ntf_server_host bytea +); + + + +CREATE TABLE smp_server.ntf_servers ( + ntf_server_host bytea NOT NULL, + additional_hosts bytea, + port text NOT NULL, + key_hash bytea NOT NULL, + cert_chain bytea NOT NULL, + signed_auth_key bytea NOT NULL, + auth_key bytea NOT NULL ); @@ -56,6 +69,11 @@ ALTER TABLE ONLY smp_server.msg_queues +ALTER TABLE ONLY smp_server.ntf_servers + ADD CONSTRAINT ntf_servers_pkey PRIMARY KEY (ntf_server_host); + + + CREATE UNIQUE INDEX idx_msg_queues_link_id ON smp_server.msg_queues USING btree (link_id); @@ -64,6 +82,10 @@ CREATE UNIQUE INDEX idx_msg_queues_notifier_id ON smp_server.msg_queues USING bt +CREATE INDEX idx_msg_queues_ntf_server_host ON smp_server.msg_queues USING btree (ntf_server_host); + + + CREATE UNIQUE INDEX idx_msg_queues_sender_id ON smp_server.msg_queues USING btree (sender_id); @@ -72,3 +94,8 @@ CREATE INDEX idx_msg_queues_updated_at ON smp_server.msg_queues USING btree (del +ALTER TABLE ONLY smp_server.msg_queues + ADD CONSTRAINT msg_queues_ntf_server_host_fkey FOREIGN KEY (ntf_server_host) REFERENCES smp_server.ntf_servers(ntf_server_host); + + + diff --git a/src/Simplex/Messaging/Server/QueueStore/STM.hs b/src/Simplex/Messaging/Server/QueueStore/STM.hs index 61fa3af45..37ce4b757 100644 --- a/src/Simplex/Messaging/Server/QueueStore/STM.hs +++ b/src/Simplex/Messaging/Server/QueueStore/STM.hs @@ -37,6 +37,7 @@ import Simplex.Messaging.Server.QueueStore.Types import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM +import Simplex.Messaging.Transport.Client (TransportHost) import Simplex.Messaging.Util (anyM, ifM, ($>>), ($>>=), (<$$)) import System.IO import UnliftIO.STM @@ -46,6 +47,7 @@ data STMQueueStore q = STMQueueStore senders :: TMap SenderId RecipientId, notifiers :: TMap NotifierId RecipientId, links :: TMap LinkId RecipientId, + ntfServers :: TMap TransportHost NtfServerCreds, storeLog :: TVar (Maybe (StoreLog 'WriteMode)) } @@ -61,8 +63,9 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where senders <- TM.emptyIO notifiers <- TM.emptyIO links <- TM.emptyIO + ntfServers <- TM.emptyIO storeLog <- newTVarIO Nothing - pure STMQueueStore {queues, senders, notifiers, links, storeLog} + pure STMQueueStore {queues, senders, notifiers, links, ntfServers, storeLog} closeQueueStore :: STMQueueStore q -> IO () closeQueueStore STMQueueStore {queues, senders, notifiers, storeLog} = do diff --git a/src/Simplex/RemoteControl/Client.hs b/src/Simplex/RemoteControl/Client.hs index acb86602c..0c85c2b78 100644 --- a/src/Simplex/RemoteControl/Client.hs +++ b/src/Simplex/RemoteControl/Client.hs @@ -190,7 +190,7 @@ connectRCHost drg pairing@RCHostPairing {caKey, caCert, idPrivKey, knownHost} ct } pure $ signInvitation (snd sessKeys) idPrivKey inv -genTLSCredentials :: TVar ChaChaDRG -> C.APrivateSignKey -> C.SignedCertificate -> IO TLS.Credential +genTLSCredentials :: TVar ChaChaDRG -> C.APrivateSignKey -> X509.SignedCertificate -> IO TLS.Credential genTLSCredentials drg caKey caCert = do let caCreds = (C.signatureKeyPair caKey, caCert) leaf <- genCredentials drg (Just caCreds) (0, 24 * 999999) "localhost" -- session-signing cert diff --git a/src/Simplex/RemoteControl/Types.hs b/src/Simplex/RemoteControl/Types.hs index bc191824a..abf660f2f 100644 --- a/src/Simplex/RemoteControl/Types.hs +++ b/src/Simplex/RemoteControl/Types.hs @@ -18,6 +18,7 @@ import Data.Text (Text) import qualified Data.Text as T import Data.Text.Encoding (encodeUtf8) import Data.Word (Word16) +import qualified Data.X509 as X import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Crypto.SNTRUP761.Bindings import Simplex.Messaging.Encoding @@ -140,7 +141,7 @@ $(JQ.deriveJSON defaultJSON {J.nullaryToObject = True} ''RCCtrlHello) -- | Long-term part of controller (desktop) connection to host (mobile) data RCHostPairing = RCHostPairing { caKey :: C.APrivateSignKey, - caCert :: C.SignedCertificate, + caCert :: X.SignedCertificate, idPrivKey :: C.PrivateKeyEd25519, knownHost :: Maybe KnownHostPairing } @@ -159,7 +160,7 @@ data RCCtrlAddress = RCCtrlAddress -- | Long-term part of host (mobile) connection to controller (desktop) data RCCtrlPairing = RCCtrlPairing { caKey :: C.APrivateSignKey, - caCert :: C.SignedCertificate, + caCert :: X.SignedCertificate, ctrlFingerprint :: C.KeyHash, -- long-term identity of connected remote controller idPubKey :: C.PublicKeyEd25519, dhPrivKey :: C.PrivateKeyX25519, diff --git a/tests/AgentTests.hs b/tests/AgentTests.hs index 0b261672a..0035073dd 100644 --- a/tests/AgentTests.hs +++ b/tests/AgentTests.hs @@ -41,7 +41,7 @@ agentTests ps = do #endif describe "Functional API" $ functionalAPITests ps describe "Chosen servers" serverChoiceTests - describe "Notification tests" $ notificationTests ps + fdescribe "Notification tests" $ notificationTests ps #if !defined(dbPostgres) describe "SQLite store" storeTests #endif diff --git a/tests/CoreTests/StoreLogTests.hs b/tests/CoreTests/StoreLogTests.hs index 9556c6788..f920af55f 100644 --- a/tests/CoreTests/StoreLogTests.hs +++ b/tests/CoreTests/StoreLogTests.hs @@ -34,12 +34,13 @@ testPublicAuthKey = C.APublicAuthKey C.SEd25519 (C.publicKey "MC4CAQAwBQYDK2VwBC testNtfCreds :: TVar ChaChaDRG -> IO NtfCreds testNtfCreds g = do - (notifierKey, _) <- atomically $ C.generateAuthKeyPair C.SX25519 g + (nKey, _) <- atomically $ C.generateAuthKeyPair C.SX25519 g (k, pk) <- atomically $ C.generateKeyPair @'C.X25519 g pure NtfCreds { notifierId = EntityId "ijkl", - notifierKey, + notifierKey = Just nKey, + ntfServerHost = Nothing, rcvNtfDhSecret = C.dh' k pk }