Binary KVReplayGenerator #101

Open · wants to merge 1 commit into base: main
4 changes: 2 additions & 2 deletions cachelib/cachebench/cache/Cache.h
@@ -314,8 +314,8 @@ class Cache {
   // return true if the key was previously detected to be inconsistent. This
   // is useful only when consistency checking is enabled by calling
   // enableConsistencyCheck()
-  bool isInvalidKey(const std::string& key) {
-    return invalidKeys_[key].load(std::memory_order_relaxed);
+  bool isInvalidKey(const std::string_view key) {
+    return invalidKeys_[std::string(key)].load(std::memory_order_relaxed);
   }

   // Get overall stats on the whole cache allocator
43 changes: 21 additions & 22 deletions cachelib/cachebench/runner/AsyncCacheStressor.h
@@ -222,9 +222,9 @@ class AsyncCacheStressor : public Stressor {
                 ThroughputStats& stats,
                 const Request* req,
                 folly::EventBase* evb,
-                const std::string* key) {
+                const std::string_view key) {
    ++stats.get;
-    auto lock = chainedItemAcquireSharedLock(*key);
+    auto lock = chainedItemAcquireSharedLock(key);

    if (ticker_) {
      ticker_->updateTimeStamp(req->timestamp);
@@ -233,8 +233,7 @@
    // add a distribution over sequences of requests/access patterns
    // e.g. get-no-set and set-no-get

-    auto onReadyFn = [&, req, key = *key,
-                      l = std::move(lock)](auto hdl) mutable {
+    auto onReadyFn = [&, req, key, l = std::move(lock)](auto hdl) mutable {
      auto result = OpResultType::kGetMiss;

      if (hdl == nullptr) {
@@ -247,7 +246,7 @@
        // appropriate here)
        l.unlock();
        auto xlock = chainedItemAcquireUniqueLock(key);
-        setKey(pid, stats, &key, *(req->sizeBegin), req->ttlSecs,
+        setKey(pid, stats, key, *(req->sizeBegin), req->ttlSecs,
               req->admFeatureMap);
      }
    } else {
@@ -260,8 +259,8 @@
      }
    };

-    cache_->recordAccess(*key);
-    auto sf = cache_->asyncFind(*key);
+    cache_->recordAccess(key);
+    auto sf = cache_->asyncFind(key);
    if (sf.isReady()) {
      // If the handle is ready, call onReadyFn directly to process the handle
      onReadyFn(std::move(sf).value());
@@ -283,9 +282,9 @@
                  ThroughputStats& stats,
                  const Request* req,
                  folly::EventBase* evb,
-                  const std::string* key) {
+                  const std::string_view key) {
    ++stats.get;
-    auto lock = chainedItemAcquireUniqueLock(*key);
+    auto lock = chainedItemAcquireUniqueLock(key);

    // This was moved outside the lambda, as otherwise gcc-8.x crashes with an
    // internal compiler error here (suspected regression in folly).
@@ -297,7 +296,7 @@
      ++stats.getMiss;

      ++stats.set;
-      wHdl = cache_->allocate(pid, *key, *(req->sizeBegin), req->ttlSecs);
+      wHdl = cache_->allocate(pid, key, *(req->sizeBegin), req->ttlSecs);
      if (!wHdl) {
        ++stats.setFailure;
        return;
@@ -327,7 +326,7 @@
    };

    // Always use asyncFind as findToWrite is sync when using HybridCache
-    auto sf = cache_->asyncFind(*key);
+    auto sf = cache_->asyncFind(key);
    if (sf.isReady()) {
      onReadyFn(std::move(sf).value());
      return;
@@ -345,10 +344,10 @@
  void asyncUpdate(ThroughputStats& stats,
                   const Request* req,
                   folly::EventBase* evb,
-                   const std::string* key) {
+                   const std::string_view key) {
    ++stats.get;
    ++stats.update;
-    auto lock = chainedItemAcquireUniqueLock(*key);
+    auto lock = chainedItemAcquireUniqueLock(key);
    if (ticker_) {
      ticker_->updateTimeStamp(req->timestamp);
    }
@@ -363,7 +362,7 @@
      cache_->updateItemRecordVersion(wHdl);
    };

-    auto sf = cache_->asyncFind(*key);
+    auto sf = cache_->asyncFind(key);
    if (sf.isReady()) {
      onReadyFn(std::move(sf).value());
      return;
@@ -457,18 +456,18 @@
      const auto pid = static_cast<PoolId>(opPoolDist(gen));
      const Request& req(getReq(pid, gen, lastRequestId));
      OpType op = req.getOp();
-      const std::string* key = &(req.key);
-      std::string oneHitKey;
+      std::string_view key = req.key;
+      std::string_view oneHitKey;
      if (op == OpType::kLoneGet || op == OpType::kLoneSet) {
        oneHitKey = Request::getUniqueKey();
-        key = &oneHitKey;
+        key = oneHitKey;
      }

      OpResultType result(OpResultType::kNop);
      switch (op) {
      case OpType::kLoneSet:
      case OpType::kSet: {
-        auto lock = chainedItemAcquireUniqueLock(*key);
+        auto lock = chainedItemAcquireUniqueLock(key);
        result = setKey(pid, stats, key, *(req.sizeBegin), req.ttlSecs,
                        req.admFeatureMap);

@@ -481,8 +480,8 @@
      }
      case OpType::kDel: {
        ++stats.del;
-        auto lock = chainedItemAcquireUniqueLock(*key);
-        auto res = cache_->remove(*key);
+        auto lock = chainedItemAcquireUniqueLock(key);
+        auto res = cache_->remove(key);
        if (res == CacheT::RemoveRes::kNotFoundInRam) {
          ++stats.delNotFound;
        }
@@ -532,7 +531,7 @@
  OpResultType setKey(
      PoolId pid,
      ThroughputStats& stats,
-      const std::string* key,
+      const std::string_view key,
      size_t size,
      uint32_t ttlSecs,
      const std::unordered_map<std::string, std::string>& featureMap) {
@@ -543,7 +542,7 @@
    }

    ++stats.set;
-    auto it = cache_->allocate(pid, *key, size, ttlSecs);
+    auto it = cache_->allocate(pid, key, size, ttlSecs);
    if (it == nullptr) {
      ++stats.setFailure;
      return OpResultType::kSetFailure;
40 changes: 20 additions & 20 deletions cachelib/cachebench/runner/CacheStressor.h
@@ -321,24 +321,24 @@ class CacheStressor : public Stressor {
      const auto pid = static_cast<PoolId>(opPoolDist(gen));
      const Request& req(getReq(pid, gen, lastRequestId));
      OpType op = req.getOp();
-      const std::string* key = &(req.key);
-      std::string oneHitKey;
+      std::string_view key = req.key;
+      std::string_view oneHitKey;
      if (op == OpType::kLoneGet || op == OpType::kLoneSet) {
        oneHitKey = Request::getUniqueKey();
-        key = &oneHitKey;
+        key = oneHitKey;
      }

      OpResultType result(OpResultType::kNop);
      switch (op) {
      case OpType::kLoneSet:
      case OpType::kSet: {
        if (config_.onlySetIfMiss) {
-          auto it = cache_->find(*key);
+          auto it = cache_->find(key);
          if (it != nullptr) {
            continue;
          }
        }
-        auto lock = chainedItemAcquireUniqueLock(*key);
+        auto lock = chainedItemAcquireUniqueLock(key);
        result = setKey(pid, stats, key, *(req.sizeBegin), req.ttlSecs,
                        req.admFeatureMap, req.itemValue);

@@ -348,17 +348,17 @@
      case OpType::kGet: {
        ++stats.get;

-        auto slock = chainedItemAcquireSharedLock(*key);
-        auto xlock = decltype(chainedItemAcquireUniqueLock(*key)){};
+        auto slock = chainedItemAcquireSharedLock(key);
+        auto xlock = decltype(chainedItemAcquireUniqueLock(key)){};

        if (ticker_) {
          ticker_->updateTimeStamp(req.timestamp);
        }
        // TODO currently pure lookaside, we should
        // add a distribution over sequences of requests/access patterns
        // e.g. get-no-set and set-no-get
-        cache_->recordAccess(*key);
-        auto it = cache_->find(*key);
+        cache_->recordAccess(key);
+        auto it = cache_->find(key);
        if (it == nullptr) {
          ++stats.getMiss;
          result = OpResultType::kGetMiss;
@@ -368,7 +368,7 @@
          // upgrade access privledges, (lock_upgrade is not
          // appropriate here)
          slock = {};
-          xlock = chainedItemAcquireUniqueLock(*key);
+          xlock = chainedItemAcquireUniqueLock(key);
          setKey(pid, stats, key, *(req.sizeBegin), req.ttlSecs,
                 req.admFeatureMap, req.itemValue);
        }
@@ -380,22 +380,22 @@
      }
      case OpType::kDel: {
        ++stats.del;
-        auto lock = chainedItemAcquireUniqueLock(*key);
-        auto res = cache_->remove(*key);
+        auto lock = chainedItemAcquireUniqueLock(key);
+        auto res = cache_->remove(key);
        if (res == CacheT::RemoveRes::kNotFoundInRam) {
          ++stats.delNotFound;
        }
        break;
      }
      case OpType::kAddChained: {
        ++stats.get;
-        auto lock = chainedItemAcquireUniqueLock(*key);
-        auto it = cache_->findToWrite(*key);
+        auto lock = chainedItemAcquireUniqueLock(key);
+        auto it = cache_->findToWrite(key);
        if (!it) {
          ++stats.getMiss;

          ++stats.set;
-          it = cache_->allocate(pid, *key, *(req.sizeBegin), req.ttlSecs);
+          it = cache_->allocate(pid, key, *(req.sizeBegin), req.ttlSecs);
          if (!it) {
            ++stats.setFailure;
            break;
@@ -426,11 +426,11 @@
      case OpType::kUpdate: {
        ++stats.get;
        ++stats.update;
-        auto lock = chainedItemAcquireUniqueLock(*key);
+        auto lock = chainedItemAcquireUniqueLock(key);
        if (ticker_) {
          ticker_->updateTimeStamp(req.timestamp);
        }
-        auto it = cache_->findToWrite(*key);
+        auto it = cache_->findToWrite(key);
        if (it == nullptr) {
          ++stats.getMiss;
          ++stats.updateMiss;
@@ -441,7 +441,7 @@
      }
      case OpType::kCouldExist: {
        ++stats.couldExistOp;
-        if (!cache_->couldExist(*key)) {
+        if (!cache_->couldExist(key)) {
          ++stats.couldExistOpFalse;
        }
        break;
@@ -476,7 +476,7 @@
  OpResultType setKey(
      PoolId pid,
      ThroughputStats& stats,
-      const std::string* key,
+      const std::string_view key,
      size_t size,
      uint32_t ttlSecs,
      const std::unordered_map<std::string, std::string>& featureMap,
@@ -488,7 +488,7 @@
    }

    ++stats.set;
-    auto it = cache_->allocate(pid, *key, size, ttlSecs);
+    auto it = cache_->allocate(pid, key, size, ttlSecs);
    if (it == nullptr) {
      ++stats.setFailure;
      return OpResultType::kSetFailure;
3 changes: 3 additions & 0 deletions cachelib/cachebench/runner/Stressor.cpp
@@ -22,6 +22,7 @@
 #include "cachelib/cachebench/runner/FastShutdown.h"
 #include "cachelib/cachebench/runner/IntegrationStressor.h"
 #include "cachelib/cachebench/workload/BlockChunkReplayGenerator.h"
+#include "cachelib/cachebench/workload/BinaryKVReplayGenerator.h"
 #include "cachelib/cachebench/workload/KVReplayGenerator.h"
 #include "cachelib/cachebench/workload/OnlineGenerator.h"
 #include "cachelib/cachebench/workload/PieceWiseReplayGenerator.h"
@@ -145,6 +146,8 @@ std::unique_ptr<GeneratorBase> makeGenerator(const StressorConfig& config) {
     return std::make_unique<KVReplayGenerator>(config);
   } else if (config.generator == "block-replay") {
     return std::make_unique<BlockChunkReplayGenerator>(config);
+  } else if (config.generator == "binary-replay") {
+    return std::make_unique<BinaryKVReplayGenerator>(config);
   } else if (config.generator.empty() || config.generator == "workload") {
     // TODO: Remove the empty() check once we label workload-based configs
     // properly
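With this dispatch in place, cachebench picks the new generator by name from the test config. Below is a minimal, hedged sketch of such a config: the "generator" value and the "binaryFileName" key are taken from this PR, while the enclosing "test_config" block and the trace path are illustrative assumptions rather than part of the change.

```json
{
  "test_config": {
    "generator": "binary-replay",
    "binaryFileName": "/tmp/kvcache_trace.bin"
  }
}
```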
8 changes: 6 additions & 2 deletions cachelib/cachebench/util/Config.cpp
@@ -90,7 +90,7 @@ StressorConfig::StressorConfig(const folly::dynamic& configJson) {
   // If you added new fields to the configuration, update the JSONSetVal
   // to make them available for the json configs and increment the size
   // below
-  checkCorrectSize<StressorConfig, 512>();
+  checkCorrectSize<StressorConfig, 560>();
 }

 bool StressorConfig::usesChainedItems() const {
@@ -197,6 +197,10 @@ DistributionConfig::DistributionConfig(const folly::dynamic& jsonConfig,

 ReplayGeneratorConfig::ReplayGeneratorConfig(const folly::dynamic& configJson) {
   JSONSetVal(configJson, ampFactor);
+  JSONSetVal(configJson, ampSizeFactor);
+  JSONSetVal(configJson, binaryFileName);
+  JSONSetVal(configJson, fastForwardCount);
+  JSONSetVal(configJson, preLoadReqs);
   JSONSetVal(configJson, replaySerializationMode);
   JSONSetVal(configJson, relaxedSerialIntervalMs);
   JSONSetVal(configJson, numAggregationFields);
@@ -217,7 +221,7 @@
         "Unsupported request serialization mode: {}", replaySerializationMode));
   }

-  checkCorrectSize<ReplayGeneratorConfig, 136>();
+  checkCorrectSize<ReplayGeneratorConfig, 184>();
 }

 ReplayGeneratorConfig::SerializeMode
13 changes: 13 additions & 0 deletions cachelib/cachebench/util/Config.h
@@ -125,6 +125,19 @@ struct ReplayGeneratorConfig : public JSONConfig {
   std::string replaySerializationMode{"strict"};

   uint32_t ampFactor{1};
+  uint32_t ampSizeFactor{1};
+
+  // the path of the binary file to make
+  std::string binaryFileName{};
+
+  // The number of requests (not including ampFactor) to skip
+  // in the trace. This is so that after warming up the cache
+  // with a certain number of requests, we can easily reattach
+  // and resume execution with different cache configurations.
+  uint64_t fastForwardCount{0};
+
+  // The number of requests to pre load into the request queues
+  uint64_t preLoadReqs{0};

   // The time interval threshold when replaySerializationMode is relaxed.
   uint64_t relaxedSerialIntervalMs{500};
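Since each new field is wired through JSONSetVal, the options above map one-to-one onto JSON keys in a replay config. A hedged sketch combining them follows; the field names come from this diff, while the values, trace path, and enclosing "test_config" block are illustrative assumptions. Read this way, the config fast-forwards past the first million trace requests (for example, ones already used to warm the cache) and pre-loads 50000 requests into the request queues before the stressor threads start.

```json
{
  "test_config": {
    "generator": "binary-replay",
    "binaryFileName": "/tmp/kvcache_trace.bin",
    "fastForwardCount": 1000000,
    "preLoadReqs": 50000,
    "ampFactor": 2,
    "ampSizeFactor": 1
  }
}
```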