Skip to content

Commit e4d4b51

Browse files
committed
smp server: fix/test database import (#1521)
1 parent 6b60f8b commit e4d4b51

File tree

4 files changed

+57
-21
lines changed

4 files changed

+57
-21
lines changed

src/Simplex/Messaging/Server/Main.hs

+35-19
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import Data.Char (isAlpha, isAscii, toUpper)
2626
import Data.Either (fromRight)
2727
import Data.Functor (($>))
2828
import Data.Ini (Ini, lookupValue, readIniFile)
29+
import Data.Int (Int64)
2930
import Data.List (find, isPrefixOf)
3031
import qualified Data.List.NonEmpty as L
3132
import Data.Maybe (fromMaybe, isJust, isNothing)
@@ -117,7 +118,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
117118
confirmOrExit
118119
("WARNING: message log file " <> storeMsgsFilePath <> " will be imported to journal directory " <> storeMsgsJournalDir)
119120
"Messages not imported"
120-
ms <- newJournalMsgStore MQStoreCfg
121+
ms <- newJournalMsgStore logPath MQStoreCfg
121122
readQueueStore True (mkQueue ms False) storeLogFile $ stmQueueStore ms
122123
msgStats <- importMessages True ms storeMsgsFilePath Nothing False -- no expiration
123124
putStrLn "Import completed"
@@ -135,7 +136,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
135136
confirmOrExit
136137
("WARNING: journal directory " <> storeMsgsJournalDir <> " will be exported to message log file " <> storeMsgsFilePath)
137138
"Journal not exported"
138-
ms <- newJournalMsgStore MQStoreCfg
139+
ms <- newJournalMsgStore logPath MQStoreCfg
139140
-- TODO [postgres] in case postgres configured, queues must be read from database
140141
readQueueStore True (mkQueue ms False) storeLogFile $ stmQueueStore ms
141142
exportMessages True ms storeMsgsFilePath False
@@ -178,14 +179,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
178179
confirmOrExit
179180
("WARNING: store log file " <> storeLogFile <> " will be compacted and imported to PostrgreSQL database: " <> B.unpack connstr <> ", schema: " <> B.unpack schema)
180181
"Queue records not imported"
181-
ms <- newJournalMsgStore MQStoreCfg
182-
sl <- readWriteQueueStore True (mkQueue ms False) storeLogFile (queueStore ms)
183-
closeStoreLog sl
184-
queues <- readTVarIO $ loadedQueues $ stmQueueStore ms
185-
let storeCfg = PostgresStoreCfg {dbOpts = dbOpts {createSchema = True}, dbStoreLogPath = Nothing, confirmMigrations = MCConsole, deletedTTL = iniDeletedTTL ini}
186-
ps <- newJournalMsgStore $ PQStoreCfg storeCfg
187-
qCnt <- batchInsertQueues @(JournalQueue 'QSMemory) True queues $ postgresQueueStore ps
188-
renameFile storeLogFile $ storeLogFile <> ".bak"
182+
qCnt <- importStoreLogToDatabase logPath storeLogFile dbOpts
189183
putStrLn $ "Import completed: " <> show qCnt <> " queues"
190184
putStrLn $ case readStoreType ini of
191185
Right (ASType SQSMemory SMSMemory) -> setToDbStr <> "\nstore_messages set to `memory`, import messages to journal to use PostgreSQL database for queues (`smp-server journal import`)"
@@ -207,10 +201,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
207201
confirmOrExit
208202
("WARNING: PostrgreSQL database schema " <> B.unpack schema <> " (database: " <> B.unpack connstr <> ") will be exported to store log file " <> storeLogFilePath)
209203
"Queue records not exported"
210-
let storeCfg = PostgresStoreCfg {dbOpts, dbStoreLogPath = Nothing, confirmMigrations = MCConsole, deletedTTL = iniDeletedTTL ini}
211-
ps <- newJournalMsgStore $ PQStoreCfg storeCfg
212-
sl <- openWriteStoreLog False storeLogFilePath
213-
Sum qCnt <- foldQueueRecs True True (postgresQueueStore ps) Nothing $ \(rId, qr) -> logCreateQueue sl rId qr $> Sum (1 :: Int)
204+
qCnt <- exportDatabaseToStoreLog logPath dbOpts storeLogFilePath
214205
putStrLn $ "Export completed: " <> show qCnt <> " queues"
215206
putStrLn $ case readStoreType ini of
216207
Right (ASType SQSPostgres SMSJournal) -> "store_queues set to `database`, update it to `memory` in INI file."
@@ -239,16 +230,12 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
239230
(pure storeLogFile)
240231
(putStrLn ("Store log file " <> storeLogFile <> " not found") >> exitFailure)
241232
Nothing -> putStrLn "Store log disabled, see `[STORE_LOG] enable`" >> exitFailure
242-
newJournalMsgStore :: QStoreCfg s -> IO (JournalMsgStore s)
243-
newJournalMsgStore qsCfg =
244-
let cfg = mkJournalStoreConfig qsCfg storeMsgsJournalDir defaultMsgQueueQuota defaultMaxJournalMsgCount defaultMaxJournalStateLines $ checkInterval defaultMessageExpiration
245-
in newMsgStore cfg
246233
iniFile = combine cfgPath "smp-server.ini"
247234
serverVersion = "SMP server v" <> simplexMQVersion
248235
executableName = "smp-server"
249236
storeLogFilePath = combine logPath "smp-server-store.log"
250237
storeMsgsFilePath = combine logPath "smp-server-messages.log"
251-
storeMsgsJournalDir = combine logPath "messages"
238+
storeMsgsJournalDir = storeMsgsJournalDir' logPath
252239
storeNtfsFilePath = combine logPath "smp-server-ntfs.log"
253240
readStoreType :: Ini -> Either String AStoreType
254241
readStoreType ini = case (iniStoreQueues, iniStoreMessage) of
@@ -567,8 +554,37 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
567554
putStrLn $ "Error: both " <> storeLogFilePath <> " file and " <> B.unpack schema <> " schema are present (database: " <> B.unpack connstr <> ")."
568555
putStrLn "Configure queue storage."
569556
exitFailure
557+
558+
importStoreLogToDatabase :: FilePath -> FilePath -> DBOpts -> IO Int64
559+
importStoreLogToDatabase logPath storeLogFile dbOpts = do
560+
ms <- newJournalMsgStore logPath MQStoreCfg
561+
sl <- readWriteQueueStore True (mkQueue ms False) storeLogFile (queueStore ms)
562+
closeStoreLog sl
563+
queues <- readTVarIO $ loadedQueues $ stmQueueStore ms
564+
let storeCfg = PostgresStoreCfg {dbOpts = dbOpts {createSchema = True}, dbStoreLogPath = Nothing, confirmMigrations = MCConsole, deletedTTL = 86400 * defaultDeletedTTL}
565+
ps <- newJournalMsgStore logPath $ PQStoreCfg storeCfg
566+
qCnt <- batchInsertQueues @(JournalQueue 'QSMemory) True queues $ postgresQueueStore ps
567+
renameFile storeLogFile $ storeLogFile <> ".bak"
568+
pure qCnt
569+
570+
exportDatabaseToStoreLog :: FilePath -> DBOpts -> FilePath -> IO Int
571+
exportDatabaseToStoreLog logPath dbOpts storeLogFilePath = do
572+
let storeCfg = PostgresStoreCfg {dbOpts, dbStoreLogPath = Nothing, confirmMigrations = MCConsole, deletedTTL = 86400 * defaultDeletedTTL}
573+
ps <- newJournalMsgStore logPath $ PQStoreCfg storeCfg
574+
sl <- openWriteStoreLog False storeLogFilePath
575+
Sum qCnt <- foldQueueRecs True True (postgresQueueStore ps) Nothing $ \(rId, qr) -> logCreateQueue sl rId qr $> Sum (1 :: Int)
576+
closeStoreLog sl
577+
pure qCnt
570578
#endif
571579

580+
newJournalMsgStore :: FilePath -> QStoreCfg s -> IO (JournalMsgStore s)
581+
newJournalMsgStore logPath qsCfg =
582+
let cfg = mkJournalStoreConfig qsCfg (storeMsgsJournalDir' logPath) defaultMsgQueueQuota defaultMaxJournalMsgCount defaultMaxJournalStateLines $ checkInterval defaultMessageExpiration
583+
in newMsgStore cfg
584+
585+
storeMsgsJournalDir' :: FilePath -> FilePath
586+
storeMsgsJournalDir' logPath = combine logPath "messages"
587+
572588
data EmbeddedWebParams = EmbeddedWebParams
573589
{ webStaticPath :: FilePath,
574590
webHttpPort :: Maybe Int,

src/Simplex/Messaging/Server/QueueStore/Postgres.hs

+6-1
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,12 @@ batchInsertQueues tty queues toStore = do
374374
let st = dbStore toStore
375375
count <-
376376
withConnection st $ \db -> do
377-
DB.copy_ db "COPY msg_queues (recipient_id, recipient_keys, rcv_dh_secret, sender_id, sender_key, snd_secure, notifier_id, notifier_key, rcv_ntf_dh_secret, status, updated_at) FROM STDIN WITH (FORMAT CSV)"
377+
DB.copy_
378+
db
379+
[sql|
380+
COPY msg_queues (recipient_id, recipient_keys, rcv_dh_secret, sender_id, sender_key, queue_mode, notifier_id, notifier_key, rcv_ntf_dh_secret, status, updated_at, link_id, fixed_data, user_data)
381+
FROM STDIN WITH (FORMAT CSV)
382+
|]
378383
mapM_ (putQueue db) (zip [1..] qs)
379384
DB.putCopyEnd db
380385
Only qCnt : _ <- withConnection st (`DB.query_` "SELECT count(*) FROM msg_queues")

tests/CoreTests/StoreLogTests.hs

+11-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
{-# LANGUAGE CPP #-}
12
{-# LANGUAGE DataKinds #-}
23
{-# LANGUAGE DuplicateRecordFields #-}
34
{-# LANGUAGE NamedFieldPuns #-}
@@ -21,6 +22,7 @@ import qualified Simplex.Messaging.Crypto as C
2122
import Simplex.Messaging.Encoding.String
2223
import Simplex.Messaging.Protocol
2324
import Simplex.Messaging.Server.Env.STM (readWriteQueueStore)
25+
import Simplex.Messaging.Server.Main
2426
import Simplex.Messaging.Server.MsgStore.Journal
2527
import Simplex.Messaging.Server.MsgStore.Types
2628
import Simplex.Messaging.Server.QueueStore
@@ -53,7 +55,6 @@ deriving instance Eq StoreLogRecord
5355

5456
deriving instance Eq NtfCreds
5557

56-
-- TODO [short links] test store log with queue data
5758
storeLogTests :: Spec
5859
storeLogTests =
5960
forM_ [QMMessaging, QMContact] $ \qm -> do
@@ -138,6 +139,15 @@ testSMPStoreLog testSuite tests =
138139
mapM_ (writeStoreLogRecord l) saved
139140
closeStoreLog l
140141
replicateM_ 3 $ testReadWrite t
142+
#if defined(dbServerPostgres)
143+
qCnt <- fromIntegral <$> importStoreLogToDatabase "tests/tmp/" testStoreLogFile testStoreDBOpts
144+
qCnt `shouldBe` length (compacted t)
145+
imported <- B.readFile $ testStoreLogFile <> ".bak"
146+
qCnt' <- exportDatabaseToStoreLog "tests/tmp/" testStoreDBOpts testStoreLogFile
147+
qCnt' `shouldBe` qCnt
148+
exported <- B.readFile testStoreLogFile
149+
imported `shouldBe` exported
150+
#endif
141151
where
142152
testReadWrite SLTC {compacted, state} = do
143153
st <- newMsgStore $ testJournalStoreCfg MQStoreCfg

tests/Test.hs

+5
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,12 @@ main = do
8181
describe "Message store tests" msgStoreTests
8282
describe "Retry interval tests" retryIntervalTests
8383
describe "SOCKS settings tests" socksSettingsTests
84+
#if defined(dbServerPostgres)
85+
around_ (postgressBracket testServerDBConnectInfo) $
86+
describe "Store log tests" storeLogTests
87+
#else
8488
describe "Store log tests" storeLogTests
89+
#endif
8590
describe "TRcvQueues tests" tRcvQueuesTests
8691
describe "Util tests" utilTests
8792
describe "Agent core tests" agentCoreTests

0 commit comments

Comments
 (0)