diff --git a/src/IStorage.h b/src/IStorage.h index ad956beb6..e53078fdb 100644 --- a/src/IStorage.h +++ b/src/IStorage.h @@ -33,10 +33,11 @@ class IStorage virtual bool enumerate(callback fn) const = 0; virtual size_t count() const = 0; - virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) { + virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, char *rgfOverwrite, size_t celem) { beginWriteBatch(); for (size_t ielem = 0; ielem < celem; ++ielem) { - insert(rgkeys[ielem], rgcbkeys[ielem], rgvals[ielem], rgcbvals[ielem], false); + bool fOverwrite = (rgfOverwrite != nullptr) ? rgfOverwrite[ielem] : false; + insert(rgkeys[ielem], rgcbkeys[ielem], rgvals[ielem], rgcbvals[ielem], fOverwrite); } endWriteBatch(); } diff --git a/src/StorageCache.cpp b/src/StorageCache.cpp index 9b89023aa..9c1dab937 100644 --- a/src/StorageCache.cpp +++ b/src/StorageCache.cpp @@ -114,13 +114,14 @@ void StorageCache::insert(sds key, const void *data, size_t cbdata, bool fOverwr } long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing); -void StorageCache::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) +void StorageCache::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, char *rgfOverwrite, size_t celem) { std::vector vechashes; if (m_pdict != nullptr) { vechashes.reserve(celem); for (size_t ielem = 0; ielem < celem; ++ielem) { + if (rgfOverwrite != nullptr && rgfOverwrite[ielem]) continue; dictEntry *de = (dictEntry*)zmalloc(sizeof(dictEntry)); de->key = (void*)dictGenHashFunction(rgkeys[ielem], (int)rgcbkeys[ielem]); de->v.u64 = 1; @@ -152,7 +153,7 @@ void StorageCache::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, si } ul.unlock(); - m_spstorage->bulkInsert(rgkeys, rgcbkeys, rgvals, rgcbvals, celem); + m_spstorage->bulkInsert(rgkeys, rgcbkeys, rgvals, rgcbvals, rgfOverwrite, celem); bulkInsertsInProgress--; } diff --git a/src/StorageCache.h b/src/StorageCache.h index 3c38450fb..6682e969c 100644 --- a/src/StorageCache.h +++ b/src/StorageCache.h @@ -41,7 +41,7 @@ class StorageCache void clear(void(callback)(void*)); void clearAsync(); void insert(sds key, const void *data, size_t cbdata, bool fOverwrite); - void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem); + void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, char *rgfOverwrite, size_t celem); void retrieve(sds key, IStorage::callbackSingle fn) const; bool erase(sds key); void emergencyFreeCache(); diff --git a/src/db.cpp b/src/db.cpp index 21d00ad10..c5c69c24c 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -2908,15 +2908,13 @@ void redisDbPersistentData::storeDatabase() dictReleaseIterator(di); } -/* static */ void redisDbPersistentData::serializeAndStoreChange(StorageCache *storage, redisDbPersistentData *db, const char *key, bool fUpdate) +/* static */ sds redisDbPersistentData::serializeChange(redisDbPersistentData *db, const char *key) { auto itr = db->find_cached_threadsafe(key); if (itr == nullptr) - return; + return nullptr; robj *o = itr.val(); - sds temp = serializeStoredObjectAndExpire(db, (const char*) itr.key(), o); - storage->insert((sds)key, temp, sdslen(temp), fUpdate); - sdsfree(temp); + return serializeStoredObjectAndExpire(db, (const char*) itr.key(), o); } bool redisDbPersistentData::processChanges(bool fSnapshot) @@ -2959,10 +2957,30 @@ bool redisDbPersistentData::processChanges(bool fSnapshot) { dictIterator *di = dictGetIterator(m_dictChanged); dictEntry *de; + std::vector veckeys; + std::vector veccbkeys; + std::vector vecvals; + std::vector veccbvals; + std::vector vecoverwrite; + veckeys.reserve(dictSize(m_dictChanged)); + veccbkeys.reserve(dictSize(m_dictChanged)); + vecvals.reserve(dictSize(m_dictChanged)); + veccbvals.reserve(dictSize(m_dictChanged)); + vecoverwrite.reserve(dictSize(m_dictChanged)); while ((de = dictNext(di)) != nullptr) { - serializeAndStoreChange(m_spstorage.get(), this, (const char*)dictGetKey(de), (bool)dictGetVal(de)); + sds val = serializeChange(this, (const char*)dictGetKey(de)); + if (val != nullptr) { + veckeys.push_back((char*)dictGetKey(de)); + veccbkeys.push_back(sdslen((sds)dictGetKey(de))); + vecvals.push_back(val); + veccbvals.push_back(sdslen(val)); + vecoverwrite.push_back((bool)dictGetVal(de)); + } } + m_spstorage->bulkInsert(veckeys.data(), veccbkeys.data(), vecvals.data(), veccbvals.data(), vecoverwrite.data(), veckeys.size()); + for (auto val : vecvals) + sdsfree(val); dictReleaseIterator(di); } } @@ -2996,7 +3014,7 @@ void redisDbPersistentData::processChangesAsync(std::atomic &pendingJobs) vecvals.push_back(temp); veccbvals.push_back(sdslen(temp)); } - m_spstorage->bulkInsert(veckeys.data(), veccbkeys.data(), vecvals.data(), veccbvals.data(), veckeys.size()); + m_spstorage->bulkInsert(veckeys.data(), veccbkeys.data(), vecvals.data(), veccbvals.data(), nullptr, veckeys.size()); for (auto val : vecvals) sdsfree(val); dictReleaseIterator(di); @@ -3015,7 +3033,7 @@ void redisDbPersistentData::bulkDirectStorageInsert(char **rgKeys, size_t *rgcbK } aeReleaseLock(); } - m_spstorage->bulkInsert(rgKeys, rgcbKeys, rgVals, rgcbVals, celem); + m_spstorage->bulkInsert(rgKeys, rgcbKeys, rgVals, rgcbVals, nullptr, celem); } void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree) @@ -3024,10 +3042,30 @@ void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot ** { dictIterator *di = dictGetIterator(m_dictChangedStorageFlush); dictEntry *de; + std::vector veckeys; + std::vector veccbkeys; + std::vector vecvals; + std::vector veccbvals; + std::vector vecoverwrite; + veckeys.reserve(dictSize(m_dictChanged)); + veccbkeys.reserve(dictSize(m_dictChanged)); + vecvals.reserve(dictSize(m_dictChanged)); + veccbvals.reserve(dictSize(m_dictChanged)); + vecoverwrite.resize(dictSize(m_dictChanged)); while ((de = dictNext(di)) != nullptr) { - serializeAndStoreChange(m_spstorage.get(), (redisDbPersistentData*)m_pdbSnapshotStorageFlush, (const char*)dictGetKey(de), (bool)dictGetVal(de)); + sds val = serializeChange((redisDbPersistentData*)m_pdbSnapshotStorageFlush, (const char*)dictGetKey(de)); + if (val != nullptr) { + veckeys.push_back((char*)dictGetKey(de)); + veccbkeys.push_back(sdslen((sds)dictGetKey(de))); + vecvals.push_back(val); + veccbvals.push_back(sdslen(val)); + vecoverwrite.push_back((bool)dictGetVal(de)); + } } + m_spstorage->bulkInsert(veckeys.data(), veccbkeys.data(), vecvals.data(), veccbvals.data(), vecoverwrite.data(), veckeys.size()); + for (auto val : vecvals) + sdsfree(val); dictReleaseIterator(di); dictRelease(m_dictChangedStorageFlush); m_dictChangedStorageFlush = nullptr; diff --git a/src/server.h b/src/server.h index eb54f608c..fb6c0c027 100644 --- a/src/server.h +++ b/src/server.h @@ -1217,7 +1217,7 @@ class redisDbPersistentData uint64_t m_mvccCheckpoint = 0; private: - static void serializeAndStoreChange(StorageCache *storage, redisDbPersistentData *db, const char *key, bool fUpdate); + static sds serializeChange(redisDbPersistentData *db, const char *key); void ensure(const char *key); void ensure(const char *key, dictEntry **de); diff --git a/src/storage/rocksdb.cpp b/src/storage/rocksdb.cpp index e592c74c5..230825eb7 100644 --- a/src/storage/rocksdb.cpp +++ b/src/storage/rocksdb.cpp @@ -43,8 +43,9 @@ void RocksDBStorageProvider::insert(const char *key, size_t cchKey, void *data, ++m_count; } -void RocksDBStorageProvider::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) +void RocksDBStorageProvider::bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, char *rgfOverwrite, size_t celem) { + size_t coverwrites = 0; if (celem >= 16384) { rocksdb::Options options = DefaultRocksDBOptions(); rocksdb::SstFileWriter sst_file_writer(rocksdb::EnvOptions(), options, options.comparator); @@ -92,8 +93,15 @@ void RocksDBStorageProvider::bulkInsert(char **rgkeys, size_t *rgcbkeys, char ** m_spdb->Write(WriteOptions(), spbatch.get()); } + if (rgfOverwrite != nullptr) { + for (size_t ielem = 0; ielem < celem; ++ielem) { + if (rgfOverwrite[ielem]) + ++coverwrites; + } + } + std::unique_lock l(m_lock); - m_count += celem; + m_count += celem - coverwrites; } bool RocksDBStorageProvider::erase(const char *key, size_t cchKey) diff --git a/src/storage/rocksdb.h b/src/storage/rocksdb.h index 10c54606c..f0c20dee8 100644 --- a/src/storage/rocksdb.h +++ b/src/storage/rocksdb.h @@ -38,7 +38,7 @@ class RocksDBStorageProvider : public IStorage virtual void beginWriteBatch() override; virtual void endWriteBatch() override; - virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) override; + virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, char *rgfOverwrite, size_t celem) override; virtual void batch_lock() override; virtual void batch_unlock() override;