
Commit 2eb0b14

Binary KVReplayGenerator
------------------------

This offers much lower trace-replay overhead. It assumes the kvcache trace format and kvcache behavior. This patch supports the following:

- binary request generation and replay
- fast forwarding of a trace
- preloading requests into memory
- object size amplification
- queue-free operation, for even lower per-request overhead
- parsing many more requests per second than cachelib can process, so we can get 100% CPU usage

The limitations are:

- no trace amplification (however, you can amplify the original .csv trace and save it in binary format)
- ~4GB of memory overhead per 100 million requests
- you need some disk space to store large traces
1 parent dd8a6c5 commit 2eb0b14
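The quoted memory overhead is consistent with a small fixed-width in-memory record per preloaded request: 4GB across 100 million requests is roughly 40 bytes each. As a rough illustration only — the struct below is hypothetical, its field names and layout are not taken from this patch — a binary kvcache trace record in that ballpark might look like:

```cpp
#include <cstdint>

// Hypothetical fixed-width trace record; names and layout are illustrative,
// not from the patch. A record of this size keeps 100 million preloaded
// requests within a few GB, matching the stated ~4GB overhead.
struct BinaryRequest {
  uint64_t timestamp;  // request time, used for fast-forwarding
  uint64_t keyId;      // key identifier or offset into a shared key blob
  uint32_t keySize;    // bytes of key
  uint32_t valueSize;  // bytes of value (before size amplification)
  uint32_t ttlSecs;    // 0 means no TTL
  uint8_t  op;         // get / set / del opcode
  uint8_t  pad[3];     // explicit padding to keep the record fixed-width
};
static_assert(sizeof(BinaryRequest) == 32, "record must stay fixed-width");
```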

12 files changed (+661, -70 lines)

cachelib/cachebench/cache/Cache.h (+2, -2)
```diff
@@ -314,8 +314,8 @@ class Cache {
   // return true if the key was previously detected to be inconsistent. This
   // is useful only when consistency checking is enabled by calling
   // enableConsistencyCheck()
-  bool isInvalidKey(const std::string& key) {
-    return invalidKeys_[key].load(std::memory_order_relaxed);
+  bool isInvalidKey(const std::string_view key) {
+    return invalidKeys_[std::string(key)].load(std::memory_order_relaxed);
   }

   // Get overall stats on the whole cache allocator
```
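Note the cost this signature change shifts around: `std::unordered_map<std::string, ...>::operator[]` needs an owning key type, so the `string_view` is converted back to a `std::string` for the lookup. A minimal standalone sketch of the same pattern (class context elided):

```cpp
#include <atomic>
#include <string>
#include <string_view>
#include <unordered_map>

std::unordered_map<std::string, std::atomic<bool>> invalidKeys;

// Callers holding raw buffers no longer need to build a std::string just to
// call in, but operator[] requires an owning key, so the view is
// materialized here. find() with a transparent hasher (C++20 heterogeneous
// lookup) could avoid the copy on the lookup path; operator[] cannot.
bool isInvalidKey(std::string_view key) {
  return invalidKeys[std::string(key)].load(std::memory_order_relaxed);
}
```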

cachelib/cachebench/runner/AsyncCacheStressor.h (+21, -22)
```diff
@@ -222,9 +222,9 @@ class AsyncCacheStressor : public Stressor {
                 ThroughputStats& stats,
                 const Request* req,
                 folly::EventBase* evb,
-                const std::string* key) {
+                const std::string_view key) {
     ++stats.get;
-    auto lock = chainedItemAcquireSharedLock(*key);
+    auto lock = chainedItemAcquireSharedLock(key);

     if (ticker_) {
       ticker_->updateTimeStamp(req->timestamp);
@@ -233,8 +233,7 @@ class AsyncCacheStressor : public Stressor {
     // add a distribution over sequences of requests/access patterns
     // e.g. get-no-set and set-no-get

-    auto onReadyFn = [&, req, key = *key,
-                      l = std::move(lock)](auto hdl) mutable {
+    auto onReadyFn = [&, req, key, l = std::move(lock)](auto hdl) mutable {
       auto result = OpResultType::kGetMiss;

       if (hdl == nullptr) {
@@ -247,7 +246,7 @@ class AsyncCacheStressor : public Stressor {
         // appropriate here)
         l.unlock();
         auto xlock = chainedItemAcquireUniqueLock(key);
-        setKey(pid, stats, &key, *(req->sizeBegin), req->ttlSecs,
+        setKey(pid, stats, key, *(req->sizeBegin), req->ttlSecs,
                req->admFeatureMap);
       }
     } else {
@@ -260,8 +259,8 @@ class AsyncCacheStressor : public Stressor {
       }
     };

-    cache_->recordAccess(*key);
-    auto sf = cache_->asyncFind(*key);
+    cache_->recordAccess(key);
+    auto sf = cache_->asyncFind(key);
     if (sf.isReady()) {
       // If the handle is ready, call onReadyFn directly to process the handle
       onReadyFn(std::move(sf).value());
@@ -283,9 +282,9 @@ class AsyncCacheStressor : public Stressor {
                 ThroughputStats& stats,
                 const Request* req,
                 folly::EventBase* evb,
-                const std::string* key) {
+                const std::string_view key) {
     ++stats.get;
-    auto lock = chainedItemAcquireUniqueLock(*key);
+    auto lock = chainedItemAcquireUniqueLock(key);

     // This was moved outside the lambda, as otherwise gcc-8.x crashes with an
     // internal compiler error here (suspected regression in folly).
@@ -297,7 +296,7 @@ class AsyncCacheStressor : public Stressor {
       ++stats.getMiss;

       ++stats.set;
-      wHdl = cache_->allocate(pid, *key, *(req->sizeBegin), req->ttlSecs);
+      wHdl = cache_->allocate(pid, key, *(req->sizeBegin), req->ttlSecs);
       if (!wHdl) {
         ++stats.setFailure;
         return;
@@ -327,7 +326,7 @@ class AsyncCacheStressor : public Stressor {
     };

     // Always use asyncFind as findToWrite is sync when using HybridCache
-    auto sf = cache_->asyncFind(*key);
+    auto sf = cache_->asyncFind(key);
     if (sf.isReady()) {
       onReadyFn(std::move(sf).value());
       return;
@@ -345,10 +344,10 @@ class AsyncCacheStressor : public Stressor {
   void asyncUpdate(ThroughputStats& stats,
                    const Request* req,
                    folly::EventBase* evb,
-                   const std::string* key) {
+                   const std::string_view key) {
     ++stats.get;
     ++stats.update;
-    auto lock = chainedItemAcquireUniqueLock(*key);
+    auto lock = chainedItemAcquireUniqueLock(key);
     if (ticker_) {
       ticker_->updateTimeStamp(req->timestamp);
     }
@@ -363,7 +362,7 @@ class AsyncCacheStressor : public Stressor {
       cache_->updateItemRecordVersion(wHdl);
     };

-    auto sf = cache_->asyncFind(*key);
+    auto sf = cache_->asyncFind(key);
     if (sf.isReady()) {
       onReadyFn(std::move(sf).value());
       return;
@@ -457,18 +456,18 @@ class AsyncCacheStressor : public Stressor {
     const auto pid = static_cast<PoolId>(opPoolDist(gen));
     const Request& req(getReq(pid, gen, lastRequestId));
     OpType op = req.getOp();
-    const std::string* key = &(req.key);
-    std::string oneHitKey;
+    std::string_view key = req.key;
+    std::string_view oneHitKey;
     if (op == OpType::kLoneGet || op == OpType::kLoneSet) {
       oneHitKey = Request::getUniqueKey();
-      key = &oneHitKey;
+      key = oneHitKey;
     }

     OpResultType result(OpResultType::kNop);
     switch (op) {
     case OpType::kLoneSet:
     case OpType::kSet: {
-      auto lock = chainedItemAcquireUniqueLock(*key);
+      auto lock = chainedItemAcquireUniqueLock(key);
       result = setKey(pid, stats, key, *(req.sizeBegin), req.ttlSecs,
                       req.admFeatureMap);

@@ -481,8 +480,8 @@ class AsyncCacheStressor : public Stressor {
     }
     case OpType::kDel: {
       ++stats.del;
-      auto lock = chainedItemAcquireUniqueLock(*key);
-      auto res = cache_->remove(*key);
+      auto lock = chainedItemAcquireUniqueLock(key);
+      auto res = cache_->remove(key);
       if (res == CacheT::RemoveRes::kNotFoundInRam) {
         ++stats.delNotFound;
       }
@@ -532,7 +531,7 @@ class AsyncCacheStressor : public Stressor {
   OpResultType setKey(
       PoolId pid,
       ThroughputStats& stats,
-      const std::string* key,
+      const std::string_view key,
       size_t size,
       uint32_t ttlSecs,
       const std::unordered_map<std::string, std::string>& featureMap) {
@@ -543,7 +542,7 @@ class AsyncCacheStressor : public Stressor {
     }

     ++stats.set;
-    auto it = cache_->allocate(pid, *key, size, ttlSecs);
+    auto it = cache_->allocate(pid, key, size, ttlSecs);
     if (it == nullptr) {
       ++stats.setFailure;
       return OpResultType::kSetFailure;
```
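The lambda change above is the payoff of the `string_view` conversion: capturing `key = *key` copied the whole key string into the callback, while capturing the view copies only a pointer and a length. The trade is a lifetime obligation — the bytes behind the view must outlive the callback. A self-contained sketch of the hazard (the stressor is safe here because the `Request`, and the string backing `req.key`, outlives the async find):

```cpp
#include <functional>
#include <iostream>
#include <string>
#include <string_view>

int main() {
  std::string owner = "request-key-123";
  std::string_view key = owner;  // non-owning view of owner's bytes

  // Capturing the view copies ~16 bytes (pointer + length), not the key.
  std::function<void()> cb = [key] { std::cout << key << '\n'; };

  cb();  // OK: owner is still alive here

  // If owner were destroyed or reassigned before cb() ran, the captured
  // view would dangle. The stressor avoids this because the Request (and
  // its key) outlives every async callback that reads the view.
  return 0;
}
```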

cachelib/cachebench/runner/CacheStressor.h (+20, -20)
```diff
@@ -321,24 +321,24 @@ class CacheStressor : public Stressor {
     const auto pid = static_cast<PoolId>(opPoolDist(gen));
     const Request& req(getReq(pid, gen, lastRequestId));
     OpType op = req.getOp();
-    const std::string* key = &(req.key);
-    std::string oneHitKey;
+    std::string_view key = req.key;
+    std::string_view oneHitKey;
     if (op == OpType::kLoneGet || op == OpType::kLoneSet) {
       oneHitKey = Request::getUniqueKey();
-      key = &oneHitKey;
+      key = oneHitKey;
     }

     OpResultType result(OpResultType::kNop);
     switch (op) {
     case OpType::kLoneSet:
     case OpType::kSet: {
       if (config_.onlySetIfMiss) {
-        auto it = cache_->find(*key);
+        auto it = cache_->find(key);
         if (it != nullptr) {
           continue;
         }
       }
-      auto lock = chainedItemAcquireUniqueLock(*key);
+      auto lock = chainedItemAcquireUniqueLock(key);
       result = setKey(pid, stats, key, *(req.sizeBegin), req.ttlSecs,
                       req.admFeatureMap, req.itemValue);

@@ -348,17 +348,17 @@ class CacheStressor : public Stressor {
     case OpType::kGet: {
       ++stats.get;

-      auto slock = chainedItemAcquireSharedLock(*key);
-      auto xlock = decltype(chainedItemAcquireUniqueLock(*key)){};
+      auto slock = chainedItemAcquireSharedLock(key);
+      auto xlock = decltype(chainedItemAcquireUniqueLock(key)){};

       if (ticker_) {
         ticker_->updateTimeStamp(req.timestamp);
       }
       // TODO currently pure lookaside, we should
       // add a distribution over sequences of requests/access patterns
       // e.g. get-no-set and set-no-get
-      cache_->recordAccess(*key);
-      auto it = cache_->find(*key);
+      cache_->recordAccess(key);
+      auto it = cache_->find(key);
       if (it == nullptr) {
         ++stats.getMiss;
         result = OpResultType::kGetMiss;
@@ -368,7 +368,7 @@ class CacheStressor : public Stressor {
         // upgrade access privledges, (lock_upgrade is not
         // appropriate here)
         slock = {};
-        xlock = chainedItemAcquireUniqueLock(*key);
+        xlock = chainedItemAcquireUniqueLock(key);
         setKey(pid, stats, key, *(req.sizeBegin), req.ttlSecs,
                req.admFeatureMap, req.itemValue);
       }
@@ -380,22 +380,22 @@ class CacheStressor : public Stressor {
     }
     case OpType::kDel: {
       ++stats.del;
-      auto lock = chainedItemAcquireUniqueLock(*key);
-      auto res = cache_->remove(*key);
+      auto lock = chainedItemAcquireUniqueLock(key);
+      auto res = cache_->remove(key);
       if (res == CacheT::RemoveRes::kNotFoundInRam) {
         ++stats.delNotFound;
       }
       break;
     }
     case OpType::kAddChained: {
       ++stats.get;
-      auto lock = chainedItemAcquireUniqueLock(*key);
-      auto it = cache_->findToWrite(*key);
+      auto lock = chainedItemAcquireUniqueLock(key);
+      auto it = cache_->findToWrite(key);
       if (!it) {
         ++stats.getMiss;

         ++stats.set;
-        it = cache_->allocate(pid, *key, *(req.sizeBegin), req.ttlSecs);
+        it = cache_->allocate(pid, key, *(req.sizeBegin), req.ttlSecs);
         if (!it) {
           ++stats.setFailure;
           break;
@@ -426,11 +426,11 @@ class CacheStressor : public Stressor {
     case OpType::kUpdate: {
       ++stats.get;
       ++stats.update;
-      auto lock = chainedItemAcquireUniqueLock(*key);
+      auto lock = chainedItemAcquireUniqueLock(key);
       if (ticker_) {
         ticker_->updateTimeStamp(req.timestamp);
       }
-      auto it = cache_->findToWrite(*key);
+      auto it = cache_->findToWrite(key);
       if (it == nullptr) {
         ++stats.getMiss;
         ++stats.updateMiss;
@@ -441,7 +441,7 @@ class CacheStressor : public Stressor {
     }
     case OpType::kCouldExist: {
       ++stats.couldExistOp;
-      if (!cache_->couldExist(*key)) {
+      if (!cache_->couldExist(key)) {
         ++stats.couldExistOpFalse;
       }
       break;
@@ -476,7 +476,7 @@ class CacheStressor : public Stressor {
   OpResultType setKey(
       PoolId pid,
       ThroughputStats& stats,
-      const std::string* key,
+      const std::string_view key,
       size_t size,
       uint32_t ttlSecs,
       const std::unordered_map<std::string, std::string>& featureMap,
@@ -488,7 +488,7 @@ class CacheStressor : public Stressor {
     }

     ++stats.set;
-    auto it = cache_->allocate(pid, *key, size, ttlSecs);
+    auto it = cache_->allocate(pid, key, size, ttlSecs);
     if (it == nullptr) {
       ++stats.setFailure;
       return OpResultType::kSetFailure;
```
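The `kGet` path above shows the drop-and-reacquire idiom behind the `lock_upgrade` comment: the shared lock is released (`slock = {}`) before the unique lock is taken, because atomically upgrading a shared lock can deadlock when two readers attempt it at once. A minimal sketch of the idiom with `std::shared_mutex` (the stressor uses its own `chainedItemAcquire*Lock` helpers, so this is illustrative only):

```cpp
#include <mutex>
#include <shared_mutex>

std::shared_mutex mu;
bool cached = false;  // stand-in for the cache lookup result

void getOrSet() {
  std::shared_lock<std::shared_mutex> slock(mu);  // shared: read path
  if (!cached) {
    slock.unlock();  // drop the shared lock first...
    std::unique_lock<std::shared_mutex> xlock(mu);  // ...then take unique
    // Re-check under the unique lock: another writer may have filled the
    // entry in the window between unlock() and lock().
    if (!cached) {
      cached = true;  // stand-in for setKey()
    }
  }
}
```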

cachelib/cachebench/runner/Stressor.cpp (+3)
```diff
@@ -22,6 +22,7 @@
 #include "cachelib/cachebench/runner/FastShutdown.h"
 #include "cachelib/cachebench/runner/IntegrationStressor.h"
 #include "cachelib/cachebench/workload/BlockChunkReplayGenerator.h"
+#include "cachelib/cachebench/workload/BinaryKVReplayGenerator.h"
 #include "cachelib/cachebench/workload/KVReplayGenerator.h"
 #include "cachelib/cachebench/workload/OnlineGenerator.h"
 #include "cachelib/cachebench/workload/PieceWiseReplayGenerator.h"
@@ -145,6 +146,8 @@ std::unique_ptr<GeneratorBase> makeGenerator(const StressorConfig& config) {
     return std::make_unique<KVReplayGenerator>(config);
   } else if (config.generator == "block-replay") {
     return std::make_unique<BlockChunkReplayGenerator>(config);
+  } else if (config.generator == "binary-replay") {
+    return std::make_unique<BinaryKVReplayGenerator>(config);
   } else if (config.generator.empty() || config.generator == "workload") {
     // TODO: Remove the empty() check once we label workload-based configs
     // properly
```

cachelib/cachebench/util/Config.cpp (+6, -2)
```diff
@@ -90,7 +90,7 @@ StressorConfig::StressorConfig(const folly::dynamic& configJson) {
   // If you added new fields to the configuration, update the JSONSetVal
   // to make them available for the json configs and increment the size
   // below
-  checkCorrectSize<StressorConfig, 512>();
+  checkCorrectSize<StressorConfig, 560>();
 }

 bool StressorConfig::usesChainedItems() const {
@@ -197,6 +197,10 @@ DistributionConfig::DistributionConfig(const folly::dynamic& jsonConfig,

 ReplayGeneratorConfig::ReplayGeneratorConfig(const folly::dynamic& configJson) {
   JSONSetVal(configJson, ampFactor);
+  JSONSetVal(configJson, ampSizeFactor);
+  JSONSetVal(configJson, binaryFileName);
+  JSONSetVal(configJson, fastForwardCount);
+  JSONSetVal(configJson, preLoadReqs);
   JSONSetVal(configJson, replaySerializationMode);
   JSONSetVal(configJson, relaxedSerialIntervalMs);
   JSONSetVal(configJson, numAggregationFields);
@@ -217,7 +221,7 @@ ReplayGeneratorConfig::ReplayGeneratorConfig(const folly::dynamic& configJson) {
         "Unsupported request serialization mode: {}", replaySerializationMode));
   }

-  checkCorrectSize<ReplayGeneratorConfig, 136>();
+  checkCorrectSize<ReplayGeneratorConfig, 184>();
 }

 ReplayGeneratorConfig::SerializeMode
```
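The size bumps line up: both structs grow by 48 bytes (512 to 560, 136 to 184), which plausibly matches the new fields on a typical 64-bit libstdc++ layout — the 4-byte `ampSizeFactor` packs into padding beside the existing `ampFactor`, plus a 32-byte `std::string` and two 8-byte `uint64_t`s. The guard itself is presumably a compile-time size check along these lines (an illustrative stand-in, not the project's actual helper):

```cpp
#include <cstddef>

// Illustrative stand-in for the guard pattern; the real checkCorrectSize
// helper lives in cachebench's utilities. The idea: break the build whenever
// a config struct changes size, forcing whoever added a field to revisit
// the JSONSetVal list before bumping the expected size.
template <typename ConfigT, std::size_t ExpectedSize>
void checkCorrectSize() {
  static_assert(sizeof(ConfigT) == ExpectedSize,
                "config struct size changed; update the JSONSetVal calls "
                "and then the expected size");
}
```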

cachelib/cachebench/util/Config.h (+13)
```diff
@@ -125,6 +125,19 @@ struct ReplayGeneratorConfig : public JSONConfig {
   std::string replaySerializationMode{"strict"};

   uint32_t ampFactor{1};
+  uint32_t ampSizeFactor{1};
+
+  // the path of the binary file to make
+  std::string binaryFileName{};
+
+  // The number of requests (not including ampFactor) to skip
+  // in the trace. This is so that after warming up the cache
+  // with a certain number of requests, we can easily reattach
+  // and resume execution with different cache configurations.
+  uint64_t fastForwardCount{0};
+
+  // The number of requests to pre load into the request queues
+  uint64_t preLoadReqs{0};

   // The time interval threshold when replaySerializationMode is relaxed.
   uint64_t relaxedSerialIntervalMs{500};
```
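Putting the pieces together, a cachebench test config selecting the new generator would presumably look like the sketch below. The `"binary-replay"` name matches the `makeGenerator` dispatch added in Stressor.cpp, and the field names match the struct above, but the exact nesting and the file path are assumptions, not taken from the patch:

```json
{
  "test_config": {
    "generator": "binary-replay",
    "binaryFileName": "/data/traces/kvcache.bin",
    "fastForwardCount": 10000000,
    "preLoadReqs": 1000000,
    "ampSizeFactor": 2
  }
}
```

Per the commit message, request-count amplification is not supported in this mode; to amplify, preprocess the original .csv trace and save the amplified version in binary format.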
