Skip to content

ntf server: subscription commands, optimize ntf server memory usage #1515

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rfcs/2025-03-30-ios-notifications-3.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ NSUB :: Maybe NtfServerId -> Command Notifier
-- subscribe to notificaions from all queues associated with the server
-- should be signed with server key
-- entity ID - NtfServerId
NSSUB :: Command NtfServer
NRDY :: Command NtfServer

data NtfServerCreds = NtfServerCreds
{ server :: NtfServer,
Expand Down
16 changes: 12 additions & 4 deletions src/Simplex/Messaging/Client/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ smpSubscribeQueues party ca smp srv subs = do
pending <- maybe (pure M.empty) readTVar =<< TM.lookup srv (pendingSrvSubs ca)
let acc@(_, _, oks, notPending) = foldr (groupSub pending) (False, [], [], []) (L.zip subs rs)
unless (null oks) $ addSubscriptions ca srv party oks
unless (null notPending) $ removePendingSubs ca srv party notPending
unless (null notPending) $ removePendingSubs ca srv party $ S.fromList notPending
pure acc
sessId = sessionId $ thParams smp
groupSub :: Map SMPSub C.APrivateAuthKey -> ((QueueId, C.APrivateAuthKey), Either SMPClientError ()) -> (Bool, [(QueueId, SMPClientError)], [(QueueId, (SessionId, C.APrivateAuthKey))], [QueueId]) -> (Bool, [(QueueId, SMPClientError)], [(QueueId, (SessionId, C.APrivateAuthKey))], [QueueId])
Expand Down Expand Up @@ -412,14 +412,22 @@ removeSubscription :: SMPClientAgent -> SMPServer -> SMPSub -> STM ()
removeSubscription = removeSub_ . srvSubs
{-# INLINE removeSubscription #-}

removePendingSub :: SMPClientAgent -> SMPServer -> SMPSub -> STM ()
removePendingSub = removeSub_ . pendingSrvSubs
{-# INLINE removePendingSub #-}

removeSub_ :: TMap SMPServer (TMap SMPSub s) -> SMPServer -> SMPSub -> STM ()
removeSub_ subs srv s = TM.lookup srv subs >>= mapM_ (TM.delete s)

removePendingSubs :: SMPClientAgent -> SMPServer -> SMPSubParty -> [QueueId] -> STM ()
removeSubscriptions :: SMPClientAgent -> SMPServer -> SMPSubParty -> Set QueueId -> STM ()
removeSubscriptions = removeSubs_ . srvSubs
{-# INLINE removeSubscriptions #-}

removePendingSubs :: SMPClientAgent -> SMPServer -> SMPSubParty -> Set QueueId -> STM ()
removePendingSubs = removeSubs_ . pendingSrvSubs
{-# INLINE removePendingSubs #-}

removeSubs_ :: TMap SMPServer (TMap SMPSub C.APrivateAuthKey) -> SMPServer -> SMPSubParty -> [QueueId] -> STM ()
removeSubs_ :: TMap SMPServer (TMap SMPSub s) -> SMPServer -> SMPSubParty -> Set QueueId -> STM ()
removeSubs_ subs srv party qs = TM.lookup srv subs >>= mapM_ (`modifyTVar'` (`M.withoutKeys` ss))
where
ss = S.fromList $ map (party,) qs
ss = S.map (party,) qs
2 changes: 0 additions & 2 deletions src/Simplex/Messaging/Crypto.hs
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,6 @@ module Simplex.Messaging.Crypto
unPad,

-- * X509 Certificates
SignedCertificate,
Certificate,
signCertificate,
signX509,
verifyX509,
Expand Down
8 changes: 5 additions & 3 deletions src/Simplex/Messaging/Notifications/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -691,9 +691,10 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
TDEL -> do
logDebug "TDEL"
st <- asks store
qs <- atomically $ deleteNtfToken st tknId
forM_ qs $ \SMPQueueNtf {smpServer, notifierId} ->
atomically $ removeSubscription ca smpServer (SPNotifier, notifierId)
ss <- atomically $ deleteNtfToken st tknId
forM_ (M.assocs ss) $ \(smpServer, nIds) -> do
atomically $ removeSubscriptions ca smpServer SPNotifier nIds
atomically $ removePendingSubs ca smpServer SPNotifier nIds
cancelInvervalNotifications tknId
withNtfLog (`logDeleteToken` tknId)
incNtfStatT token tknDeleted
Expand Down Expand Up @@ -756,6 +757,7 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
st <- asks store
atomically $ deleteNtfSubscription st subId
atomically $ removeSubscription ca smpServer (SPNotifier, notifierId)
atomically $ removePendingSub ca smpServer (SPNotifier, notifierId)
withNtfLog (`logDeleteSubscription` subId)
incNtfStat subDeleted
pure NROk
Expand Down
88 changes: 53 additions & 35 deletions src/Simplex/Messaging/Notifications/Server/Store.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedLists #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Simplex.Messaging.Notifications.Server.Store where
Expand All @@ -16,15 +17,15 @@ import Control.Monad
import Data.ByteString.Char8 (ByteString)
import Data.Functor (($>))
import Data.List.NonEmpty (NonEmpty (..), (<|))
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Maybe (catMaybes)
import Data.Set (Set)
import qualified Data.Set as S
import Data.Word (Word16)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Notifications.Protocol
import Simplex.Messaging.Protocol (NtfPrivateAuthKey, NtfPublicAuthKey, SMPServer)
import Simplex.Messaging.Protocol (NotifierId, NtfPrivateAuthKey, NtfPublicAuthKey, SMPServer)
import Simplex.Messaging.Server.QueueStore (RoundedSystemTime)
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
Expand All @@ -35,8 +36,11 @@ data NtfStore = NtfStore
-- multiple registrations exist to protect from malicious registrations if token is compromised
tokenRegistrations :: TMap DeviceToken (TMap ByteString NtfTokenId),
subscriptions :: TMap NtfSubscriptionId NtfSubData,
tokenSubscriptions :: TMap NtfTokenId (TVar (Set NtfSubscriptionId)),
subscriptionLookup :: TMap SMPQueueNtf NtfSubscriptionId,
-- the first set is used to delete from `subscriptions` when token is deleted, the second - to cancel SMP subsriptions.
-- TODO [notifications] it can be simplified once NtfSubData is fully removed.
tokenSubscriptions :: TMap NtfTokenId (TMap SMPServer (TVar (Set NtfSubscriptionId), TVar (Set NotifierId))),
-- TODO [notifications] for subscriptions that "migrated" to server subscription, we may replace NtfSubData with NtfTokenId here (Either NtfSubData NtfTokenId).
subscriptionLookup :: TMap SMPServer (TMap NotifierId NtfSubData),
tokenLastNtfs :: TMap NtfTokenId (TVar (NonEmpty PNMessageData))
}

Expand Down Expand Up @@ -134,7 +138,7 @@ removeTokenRegistration st NtfTknData {ntfTknId = tId, token, tknVerifyKey} =
>>= mapM_ (\tId' -> when (tId == tId') $ TM.delete k regs)
k = C.toPubKey C.pubKeyBytes tknVerifyKey

deleteNtfToken :: NtfStore -> NtfTokenId -> STM [SMPQueueNtf]
deleteNtfToken :: NtfStore -> NtfTokenId -> STM (Map SMPServer (Set NotifierId))
deleteNtfToken st tknId = do
void $
TM.lookupDelete tknId (tokens st) $>>= \NtfTknData {token, tknVerifyKey} ->
Expand All @@ -147,25 +151,25 @@ deleteNtfToken st tknId = do
regs = tokenRegistrations st
regKey = C.toPubKey C.pubKeyBytes

deleteTokenSubs :: NtfStore -> NtfTokenId -> STM [SMPQueueNtf]
deleteTokenSubs :: NtfStore -> NtfTokenId -> STM (Map SMPServer (Set NotifierId))
deleteTokenSubs st tknId = do
qs <-
TM.lookupDelete tknId (tokenSubscriptions st)
>>= mapM (readTVar >=> mapM deleteSub . S.toList)
pure $ maybe [] catMaybes qs
TM.lookupDelete tknId (tokenSubscriptions st)
>>= maybe (pure M.empty) (readTVar >=> deleteSrvSubs)
where
deleteSub subId = do
TM.lookupDelete subId (subscriptions st)
$>>= \NtfSubData {smpQueue} ->
TM.delete smpQueue (subscriptionLookup st) $> Just smpQueue
deleteSrvSubs :: Map SMPServer (TVar (Set NtfSubscriptionId), TVar (Set NotifierId)) -> STM (Map SMPServer (Set NotifierId))
deleteSrvSubs = M.traverseWithKey $ \smpServer (sVar, nVar) -> do
sIds <- readTVar sVar
modifyTVar' (subscriptions st) (`M.withoutKeys` sIds)
nIds <- readTVar nVar
TM.lookup smpServer (subscriptionLookup st) >>= mapM_ (`modifyTVar'` (`M.withoutKeys` nIds))
pure nIds

getNtfSubscriptionIO :: NtfStore -> NtfSubscriptionId -> IO (Maybe NtfSubData)
getNtfSubscriptionIO st subId = TM.lookupIO subId (subscriptions st)

findNtfSubscription :: NtfStore -> SMPQueueNtf -> STM (Maybe NtfSubData)
findNtfSubscription st smpQueue = do
TM.lookup smpQueue (subscriptionLookup st)
$>>= \subId -> TM.lookup subId (subscriptions st)
findNtfSubscription st SMPQueueNtf {smpServer, notifierId} =
TM.lookup smpServer (subscriptionLookup st) $>>= TM.lookup notifierId

findNtfSubscriptionToken :: NtfStore -> SMPQueueNtf -> STM (Maybe NtfTknData)
findNtfSubscriptionToken st smpQueue = do
Expand All @@ -183,30 +187,44 @@ mkNtfSubData ntfSubId (NewNtfSub tokenId smpQueue notifierKey) = do
subStatus <- newTVar NSNew
pure NtfSubData {ntfSubId, smpQueue, tokenId, subStatus, notifierKey}

addNtfSubscription :: NtfStore -> NtfSubscriptionId -> NtfSubData -> STM (Maybe ())
addNtfSubscription st subId sub@NtfSubData {smpQueue, tokenId} =
TM.lookup tokenId (tokenSubscriptions st) >>= maybe newTokenSub pure >>= insertSub
addNtfSubscription :: NtfStore -> NtfSubscriptionId -> NtfSubData -> STM (Maybe NtfSubData)
addNtfSubscription st subId sub@NtfSubData {smpQueue = SMPQueueNtf {smpServer, notifierId}, tokenId} =
TM.lookup tokenId (tokenSubscriptions st)
>>= maybe newTokenSubs pure
>>= \ts -> TM.lookup smpServer ts
>>= maybe (newTokenSrvSubs ts) pure
>>= insertSub
where
newTokenSub = do
ts <- newTVar S.empty
newTokenSubs = do
ts <- newTVar M.empty
TM.insert tokenId ts $ tokenSubscriptions st
pure ts
insertSub ts = do
modifyTVar' ts $ S.insert subId
newTokenSrvSubs ts = do
tss <- (,) <$> newTVar S.empty <*> newTVar S.empty
TM.insert smpServer tss ts
pure tss
insertSub :: (TVar (Set NtfSubscriptionId), TVar (Set NotifierId)) -> STM (Maybe NtfSubData)
insertSub (sIds, nIds) = do
modifyTVar' sIds $ S.insert subId
modifyTVar' nIds $ S.insert notifierId
TM.insert subId sub $ subscriptions st
TM.insert smpQueue subId (subscriptionLookup st)
-- return Nothing if subscription existed before
pure $ Just ()
TM.lookup smpServer (subscriptionLookup st)
>>= maybe newSubs pure
>>= TM.lookupInsert notifierId sub
newSubs = do
ss <- newTVar M.empty
TM.insert smpServer ss $ subscriptionLookup st
pure ss

deleteNtfSubscription :: NtfStore -> NtfSubscriptionId -> STM ()
deleteNtfSubscription st subId = do
TM.lookupDelete subId (subscriptions st)
>>= mapM_
( \NtfSubData {smpQueue, tokenId} -> do
TM.delete smpQueue $ subscriptionLookup st
ts_ <- TM.lookup tokenId (tokenSubscriptions st)
forM_ ts_ $ \ts -> modifyTVar' ts $ S.delete subId
)
deleteNtfSubscription st subId = TM.lookupDelete subId (subscriptions st) >>= mapM_ deleteSubIndices
where
deleteSubIndices NtfSubData {smpQueue = SMPQueueNtf {smpServer, notifierId}, tokenId} = do
TM.lookup smpServer (subscriptionLookup st) >>= mapM_ (TM.delete notifierId)
tss_ <- (TM.lookup tokenId (tokenSubscriptions st) $>>= TM.lookup smpServer)
forM_ tss_ $ \(sIds, nIds) -> do
modifyTVar' sIds $ S.delete subId
modifyTVar' nIds $ S.delete notifierId

addTokenLastNtf :: NtfStore -> NtfTokenId -> PNMessageData -> IO (NonEmpty PNMessageData)
addTokenLastNtf st tknId newNtf =
Expand Down
8 changes: 5 additions & 3 deletions src/Simplex/Messaging/Notifications/Server/StoreLog.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import Control.Concurrent.STM
import Control.Logger.Simple
import Control.Monad
import qualified Data.Attoparsec.ByteString.Char8 as A
import qualified Data.ByteString.Base64 as B64
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy.Char8 as LB
import qualified Data.Text as T
Expand All @@ -37,10 +38,10 @@ import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Notifications.Protocol
import Simplex.Messaging.Notifications.Server.Store
import Simplex.Messaging.Protocol (NtfPrivateAuthKey)
import Simplex.Messaging.Protocol (EntityId (..), NtfPrivateAuthKey)
import Simplex.Messaging.Server.QueueStore (RoundedSystemTime)
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.Util (safeDecodeUtf8)
import Simplex.Messaging.Util (safeDecodeUtf8, tshow)
import System.IO

data NtfStoreLogRecord
Expand Down Expand Up @@ -235,7 +236,8 @@ readNtfStore f st = mapM_ (addNtfLogRecord . LB.toStrict) . LB.lines =<< LB.read
>>= mapM_ (\NtfTknData {tknUpdatedAt} -> atomically $ writeTVar tknUpdatedAt $ Just t)
CreateSubscription r@NtfSubRec {ntfSubId} -> do
sub <- mkSubData r
void $ atomically $ addNtfSubscription st ntfSubId sub
atomically (addNtfSubscription st ntfSubId sub)
>>= mapM_ (\_ -> logWarn $ "subscription " <> tshow (B64.encode $ unEntityId ntfSubId) <> " already exists")
SubscriptionStatus subId status -> do
getNtfSubscriptionIO st subId
>>= mapM_ (\NtfSubData {subStatus} -> atomically $ writeTVar subStatus status)
Expand Down
44 changes: 43 additions & 1 deletion src/Simplex/Messaging/Protocol.hs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ module Simplex.Messaging.Protocol
SenderId,
LinkId,
NotifierId,
NtfServerHost,
NtfServerCreds (..),
RcvPrivateAuthKey,
RcvPublicAuthKey,
RcvPublicDhKey,
Expand Down Expand Up @@ -297,7 +299,7 @@ e2eEncMessageLength :: Int
e2eEncMessageLength = 16000 -- 15988 .. 16005

-- | SMP protocol clients
data Party = Recipient | Sender | Notifier | LinkClient | ProxiedClient
data Party = Recipient | Sender | Notifier | LinkClient | ProxiedClient | NtfSrvClient
deriving (Show)

-- | Singleton types for SMP protocol clients
Expand All @@ -307,13 +309,15 @@ data SParty :: Party -> Type where
SNotifier :: SParty Notifier
SSenderLink :: SParty LinkClient
SProxiedClient :: SParty ProxiedClient
SNtfSrvClient :: SParty NtfSrvClient

instance TestEquality SParty where
testEquality SRecipient SRecipient = Just Refl
testEquality SSender SSender = Just Refl
testEquality SNotifier SNotifier = Just Refl
testEquality SSenderLink SSenderLink = Just Refl
testEquality SProxiedClient SProxiedClient = Just Refl
testEquality SNtfSrvClient SNtfSrvClient = Just Refl
testEquality _ _ = Nothing

deriving instance Show (SParty p)
Expand All @@ -330,6 +334,8 @@ instance PartyI LinkClient where sParty = SSenderLink

instance PartyI ProxiedClient where sParty = SProxiedClient

instance PartyI NtfSrvClient where sParty = SNtfSrvClient

type family DirectParty (p :: Party) :: Constraint where
DirectParty Recipient = ()
DirectParty Sender = ()
Expand Down Expand Up @@ -400,6 +406,11 @@ type NotifierId = QueueId

type LinkId = QueueId

-- A server transport host will be used as entity ID in transmissions.
-- While we could assign some IDs, that requires some unnecessary tracking of these IDs,
-- both in SMP and in Ntf servers, so it's simpler to just use transport host.
type NtfServerHost = EntityId

-- | SMP queue ID on the server.
type QueueId = EntityId

Expand Down Expand Up @@ -442,6 +453,8 @@ data Command (p :: Party) where
LGET :: Command LinkClient
-- SMP notification subscriber commands
NSUB :: Command Notifier
NSRV :: NtfServerCreds -> Command NtfSrvClient
NRDY :: Command NtfSrvClient -- NtfServerHost is used as transmission entity ID
PRXY :: SMPServer -> Maybe BasicAuth -> Command ProxiedClient -- request a relay server connection by URI
-- Transmission to proxy:
-- - entity ID: ID of the session with relay returned in PKEY (response to PRXY)
Expand Down Expand Up @@ -544,6 +557,23 @@ instance Encoding QueueReqData where
-- smpEncode (NewNtfCreds authKey dhKey) = smpEncode (authKey, dhKey)
-- smpP = NewNtfCreds <$> smpP <*> smpP

data NtfServerCreds = NtfServerCreds
{ ntfServer :: NtfServer,
-- ntf server certificate chain that should match fingerprint in address
-- and Ed25519 key to verify server command NRDY, signed by key from certificate.
ntfAuthPubKey :: (X.CertificateChain, X.SignedExact X.PubKey)
}
deriving (Show)

instance Encoding NtfServerCreds where
smpEncode NtfServerCreds {ntfServer, ntfAuthPubKey = (cert, key)} =
smpEncode (ntfServer, C.encodeCertChain cert, C.SignedObject key)
smpP = do
ntfServer <- smpP
cert <- C.certChainP
C.SignedObject key <- smpP
pure NtfServerCreds {ntfServer, ntfAuthPubKey = (cert, key)}

newtype EncTransmission = EncTransmission ByteString
deriving (Show)

Expand Down Expand Up @@ -792,6 +822,8 @@ data CommandTag (p :: Party) where
PFWD_ :: CommandTag ProxiedClient
RFWD_ :: CommandTag Sender
NSUB_ :: CommandTag Notifier
NSRV_ :: CommandTag NtfSrvClient
NRDY_ :: CommandTag NtfSrvClient

data CmdTag = forall p. PartyI p => CT (SParty p) (CommandTag p)

Expand Down Expand Up @@ -848,6 +880,8 @@ instance PartyI p => Encoding (CommandTag p) where
PFWD_ -> "PFWD"
RFWD_ -> "RFWD"
NSUB_ -> "NSUB"
NSRV_ -> "NSRV"
NRDY_ -> "NRDY"
smpP = messageTagP

instance ProtocolMsgTag CmdTag where
Expand All @@ -874,6 +908,8 @@ instance ProtocolMsgTag CmdTag where
"PFWD" -> Just $ CT SProxiedClient PFWD_
"RFWD" -> Just $ CT SSender RFWD_
"NSUB" -> Just $ CT SNotifier NSUB_
"NSRV" -> Just $ CT SNtfSrvClient NSRV_
"NRDY" -> Just $ CT SNtfSrvClient NRDY_
_ -> Nothing

instance Encoding CmdTag where
Expand Down Expand Up @@ -1514,6 +1550,8 @@ instance PartyI p => ProtocolEncoding SMPVersion ErrorType (Command p) where
SEND flags msg -> e (SEND_, ' ', flags, ' ', Tail msg)
PING -> e PING_
NSUB -> e NSUB_
NSRV creds -> e (NSRV_, creds)
NRDY -> e NRDY_
LKEY k -> e (LKEY_, ' ', k)
LGET -> e LGET_
PRXY host auth_ -> e (PRXY_, ' ', host, auth_)
Expand Down Expand Up @@ -1609,6 +1647,10 @@ instance ProtocolEncoding SMPVersion ErrorType Cmd where
PFWD_ -> PFWD <$> _smpP <*> smpP <*> (EncTransmission . unTail <$> smpP)
PRXY_ -> PRXY <$> _smpP <*> smpP
CT SNotifier NSUB_ -> pure $ Cmd SNotifier NSUB
CT SNtfSrvClient tag ->
Cmd SNtfSrvClient <$> case tag of
NSRV_ -> NSRV <$> _smpP
NRDY_ -> pure NRDY

fromProtocolError = fromProtocolError @SMPVersion @ErrorType @BrokerMsg
{-# INLINE fromProtocolError #-}
Expand Down
Loading
Loading