Skip to content

Commit

Permalink
Do not include last level in compaction when `allow_ingest_behind=tru…
Browse files Browse the repository at this point in the history
…e` (#11489)

Summary:
when a DB is configured with `allow_ingest_behind = true`, the last level should be reserved for ingested files and these files should not be included in any compaction. Currently, a major compaction can compact these files to smaller levels. This can cause future files to be rejected for ingest behind (see `ExternalSstFileIngestionJob::CheckLevelForIngestedBehindFile()`). This PR fixes the issue such that files in the last level is not included in any compaction.

Pull Request resolved: #11489

Test Plan: * Updated unit test `ExternalSSTFileTest.IngestBehind` to test that last level is not included in manual and auto-compaction.

Reviewed By: ajkr

Differential Revision: D46455711

Pulled By: cbi42

fbshipit-source-id: 5e2142c2a709ef932ad797897795021c06c4ac8c
  • Loading branch information
cbi42 authored and facebook-github-bot committed Jun 14, 2023
1 parent cac3240 commit 15e8a84
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 61 deletions.
16 changes: 7 additions & 9 deletions db/compaction/compaction_picker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -611,23 +611,21 @@ Compaction* CompactionPicker::CompactRange(
// Universal compaction with more than one level always compacts all the
// files together to the last level.
assert(vstorage->num_levels() > 1);
int max_output_level =
vstorage->MaxOutputLevel(ioptions_.allow_ingest_behind);
// DBImpl::CompactRange() set output level to be the last level
if (ioptions_.allow_ingest_behind) {
assert(output_level == vstorage->num_levels() - 2);
} else {
assert(output_level == vstorage->num_levels() - 1);
}
assert(output_level == max_output_level);
// DBImpl::RunManualCompaction will make full range for universal compaction
assert(begin == nullptr);
assert(end == nullptr);
*compaction_end = nullptr;

int start_level = 0;
for (; start_level < vstorage->num_levels() &&
for (; start_level <= max_output_level &&
vstorage->NumLevelFiles(start_level) == 0;
start_level++) {
}
if (start_level == vstorage->num_levels()) {
if (start_level > max_output_level) {
return nullptr;
}

Expand All @@ -637,9 +635,9 @@ Compaction* CompactionPicker::CompactRange(
return nullptr;
}

std::vector<CompactionInputFiles> inputs(vstorage->num_levels() -
std::vector<CompactionInputFiles> inputs(max_output_level + 1 -
start_level);
for (int level = start_level; level < vstorage->num_levels(); level++) {
for (int level = start_level; level <= max_output_level; level++) {
inputs[level - start_level].level = level;
auto& files = inputs[level - start_level].files;
for (FileMetaData* f : vstorage->LevelFiles(level)) {
Expand Down
10 changes: 9 additions & 1 deletion db/compaction/compaction_picker_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ TEST_F(CompactionPickerTest, NeedsCompactionUniversal) {

TEST_F(CompactionPickerTest, CompactionUniversalIngestBehindReservedLevel) {
const uint64_t kFileSize = 100000;
NewVersionStorage(1, kCompactionStyleUniversal);
NewVersionStorage(3 /* num_levels */, kCompactionStyleUniversal);
ioptions_.allow_ingest_behind = true;
ioptions_.num_levels = 3;
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
Expand All @@ -532,6 +532,14 @@ TEST_F(CompactionPickerTest, CompactionUniversalIngestBehindReservedLevel) {

// output level should be the one above the bottom-most
ASSERT_EQ(1, compaction->output_level());

// input should not include the reserved level
const std::vector<CompactionInputFiles>* inputs = compaction->inputs();
for (const auto& compaction_input : *inputs) {
if (!compaction_input.empty()) {
ASSERT_LT(compaction_input.level, 2);
}
}
}
// Tests if the files can be trivially moved in multi level
// universal compaction when allow_trivial_move option is set
Expand Down
55 changes: 24 additions & 31 deletions db/compaction/compaction_picker_universal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ class UniversalCompactionBuilder {
UniversalCompactionPicker* picker_;
LogBuffer* log_buffer_;

static std::vector<SortedRun> CalculateSortedRuns(
const VersionStorageInfo& vstorage);
static std::vector<UniversalCompactionBuilder::SortedRun> CalculateSortedRuns(
const VersionStorageInfo& vstorage, int last_level);

// Pick a path ID to place a newly generated file, with its estimated file
// size.
Expand Down Expand Up @@ -339,13 +339,13 @@ void UniversalCompactionBuilder::SortedRun::DumpSizeInfo(

std::vector<UniversalCompactionBuilder::SortedRun>
UniversalCompactionBuilder::CalculateSortedRuns(
const VersionStorageInfo& vstorage) {
const VersionStorageInfo& vstorage, int last_level) {
std::vector<UniversalCompactionBuilder::SortedRun> ret;
for (FileMetaData* f : vstorage.LevelFiles(0)) {
ret.emplace_back(0, f, f->fd.GetFileSize(), f->compensated_file_size,
f->being_compacted);
}
for (int level = 1; level < vstorage.num_levels(); level++) {
for (int level = 1; level <= last_level; level++) {
uint64_t total_compensated_size = 0U;
uint64_t total_size = 0U;
bool being_compacted = false;
Expand Down Expand Up @@ -374,7 +374,9 @@ UniversalCompactionBuilder::CalculateSortedRuns(
Compaction* UniversalCompactionBuilder::PickCompaction() {
const int kLevel0 = 0;
score_ = vstorage_->CompactionScore(kLevel0);
sorted_runs_ = CalculateSortedRuns(*vstorage_);
int max_output_level =
vstorage_->MaxOutputLevel(ioptions_.allow_ingest_behind);
sorted_runs_ = CalculateSortedRuns(*vstorage_, max_output_level);

if (sorted_runs_.size() == 0 ||
(vstorage_->FilesMarkedForPeriodicCompaction().empty() &&
Expand Down Expand Up @@ -471,6 +473,8 @@ Compaction* UniversalCompactionBuilder::PickCompaction() {
"UniversalCompactionBuilder::PickCompaction:Return", nullptr);
return nullptr;
}
assert(c->output_level() <=
vstorage_->MaxOutputLevel(ioptions_.allow_ingest_behind));

if (mutable_cf_options_.compaction_options_universal.allow_trivial_move ==
true &&
Expand Down Expand Up @@ -698,22 +702,18 @@ Compaction* UniversalCompactionBuilder::PickCompactionToReduceSortedRuns(
GetPathId(ioptions_, mutable_cf_options_, estimated_total_size);
int start_level = sorted_runs_[start_index].level;
int output_level;
// last level is reserved for the files ingested behind
int max_output_level =
vstorage_->MaxOutputLevel(ioptions_.allow_ingest_behind);
if (first_index_after == sorted_runs_.size()) {
output_level = vstorage_->num_levels() - 1;
output_level = max_output_level;
} else if (sorted_runs_[first_index_after].level == 0) {
output_level = 0;
} else {
output_level = sorted_runs_[first_index_after].level - 1;
}

// last level is reserved for the files ingested behind
if (ioptions_.allow_ingest_behind &&
(output_level == vstorage_->num_levels() - 1)) {
assert(output_level > 1);
output_level--;
}

std::vector<CompactionInputFiles> inputs(vstorage_->num_levels());
std::vector<CompactionInputFiles> inputs(max_output_level + 1);
for (size_t i = 0; i < inputs.size(); ++i) {
inputs[i].level = start_level + static_cast<int>(i);
}
Expand Down Expand Up @@ -1192,18 +1192,20 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
return nullptr;
}

int max_output_level =
vstorage_->MaxOutputLevel(ioptions_.allow_ingest_behind);
// Pick the first non-empty level after the start_level
for (output_level = start_level + 1; output_level < vstorage_->num_levels();
for (output_level = start_level + 1; output_level <= max_output_level;
output_level++) {
if (vstorage_->NumLevelFiles(output_level) != 0) {
break;
}
}

// If all higher levels are empty, pick the highest level as output level
if (output_level == vstorage_->num_levels()) {
if (output_level > max_output_level) {
if (start_level == 0) {
output_level = vstorage_->num_levels() - 1;
output_level = max_output_level;
} else {
// If start level is non-zero and all higher levels are empty, this
// compaction will translate into a trivial move. Since the idea is
Expand All @@ -1212,11 +1214,7 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
return nullptr;
}
}
if (ioptions_.allow_ingest_behind &&
output_level == vstorage_->num_levels() - 1) {
assert(output_level > 1);
output_level--;
}
assert(output_level <= max_output_level);

if (output_level != 0) {
if (start_level == 0) {
Expand Down Expand Up @@ -1293,8 +1291,9 @@ Compaction* UniversalCompactionBuilder::PickCompactionWithSortedRunRange(
uint32_t path_id =
GetPathId(ioptions_, mutable_cf_options_, estimated_total_size);
int start_level = sorted_runs_[start_index].level;

std::vector<CompactionInputFiles> inputs(vstorage_->num_levels());
int max_output_level =
vstorage_->MaxOutputLevel(ioptions_.allow_ingest_behind);
std::vector<CompactionInputFiles> inputs(max_output_level + 1);
for (size_t i = 0; i < inputs.size(); ++i) {
inputs[i].level = start_level + static_cast<int>(i);
}
Expand Down Expand Up @@ -1331,13 +1330,7 @@ Compaction* UniversalCompactionBuilder::PickCompactionWithSortedRunRange(

int output_level;
if (end_index == sorted_runs_.size() - 1) {
// output files at the last level, unless it's reserved
output_level = vstorage_->num_levels() - 1;
// last level is reserved for the files ingested behind
if (ioptions_.allow_ingest_behind) {
assert(output_level > 1);
output_level--;
}
output_level = max_output_level;
} else {
// if it's not including all sorted_runs, it can only output to the level
// above the `end_index + 1` sorted_run.
Expand Down
8 changes: 8 additions & 0 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1386,6 +1386,14 @@ Status DBImpl::CompactFilesImpl(
}
}

if (cfd->ioptions()->allow_ingest_behind &&
output_level >= cfd->ioptions()->num_levels - 1) {
return Status::InvalidArgument(
"Exceed the maximum output level defined by "
"the current compaction algorithm with ingest_behind --- " +
std::to_string(cfd->ioptions()->num_levels - 1));
}

Status s = cfd->compaction_picker()->SanitizeCompactionInputFiles(
&input_set, cf_meta, output_level);
TEST_SYNC_POINT("DBImpl::CompactFilesImpl::PostSanitizeCompactionInputFiles");
Expand Down
40 changes: 36 additions & 4 deletions db/external_sst_file_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2160,13 +2160,13 @@ TEST_P(ExternalSSTFileTest, IngestBehind) {
// Insert 100 -> 200 into the memtable
for (int i = 100; i <= 200; i++) {
ASSERT_OK(Put(Key(i), "memtable"));
true_data[Key(i)] = "memtable";
}

// Insert 100 -> 200 using IngestExternalFile
file_data.clear();
for (int i = 0; i <= 20; i++) {
file_data.emplace_back(Key(i), "ingest_behind");
true_data[Key(i)] = "ingest_behind";
}

bool allow_global_seqno = true;
Expand All @@ -2188,6 +2188,7 @@ TEST_P(ExternalSSTFileTest, IngestBehind) {

options.num_levels = 3;
DestroyAndReopen(options);
true_data.clear();
// Insert 100 -> 200 into the memtable
for (int i = 100; i <= 200; i++) {
ASSERT_OK(Put(Key(i), "memtable"));
Expand All @@ -2207,12 +2208,43 @@ TEST_P(ExternalSSTFileTest, IngestBehind) {
verify_checksums_before_ingest, true /*ingest_behind*/,
false /*sort_data*/, &true_data));
ASSERT_EQ("0,1,1", FilesPerLevel());
std::vector<std::vector<FileMetaData>> level_to_files;
dbfull()->TEST_GetFilesMetaData(db_->DefaultColumnFamily(), &level_to_files);
uint64_t ingested_file_number = level_to_files[2][0].fd.GetNumber();
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
// bottom level should be empty
ASSERT_EQ("0,1", FilesPerLevel());

// Last level should not be compacted
ASSERT_EQ("0,1,1", FilesPerLevel());
dbfull()->TEST_GetFilesMetaData(db_->DefaultColumnFamily(), &level_to_files);
ASSERT_EQ(ingested_file_number, level_to_files[2][0].fd.GetNumber());
size_t kcnt = 0;
VerifyDBFromMap(true_data, &kcnt, false);

// Auto-compaction should not include the last level.
// Trigger compaction if size amplification exceeds 110%.
options.compaction_options_universal.max_size_amplification_percent = 110;
options.level0_file_num_compaction_trigger = 4;
TryReopen(options);
Random rnd(301);
for (int i = 0; i < 4; ++i) {
for (int j = 0; j < 10; j++) {
true_data[Key(j)] = rnd.RandomString(1000);
ASSERT_OK(Put(Key(j), true_data[Key(j)]));
}
ASSERT_OK(Flush());
}
ASSERT_OK(dbfull()->TEST_WaitForCompact());
dbfull()->TEST_GetFilesMetaData(db_->DefaultColumnFamily(), &level_to_files);
ASSERT_EQ(1, level_to_files[2].size());
ASSERT_EQ(ingested_file_number, level_to_files[2][0].fd.GetNumber());

// Turning off the option allows DB to compact ingested files.
options.allow_ingest_behind = false;
TryReopen(options);
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
dbfull()->TEST_GetFilesMetaData(db_->DefaultColumnFamily(), &level_to_files);
ASSERT_EQ(1, level_to_files[2].size());
ASSERT_NE(ingested_file_number, level_to_files[2][0].fd.GetNumber());
VerifyDBFromMap(true_data, &kcnt, false);
}

TEST_F(ExternalSSTFileTest, SkipBloomFilter) {
Expand Down
19 changes: 11 additions & 8 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3390,6 +3390,7 @@ void VersionStorageInfo::ComputeCompactionScore(
// maintaining it to be over 1.0, we scale the original score by 10x
// if it is larger than 1.0.
const double kScoreScale = 10.0;
int max_output_level = MaxOutputLevel(immutable_options.allow_ingest_behind);
for (int level = 0; level <= MaxInputLevel(); level++) {
double score;
if (level == 0) {
Expand Down Expand Up @@ -3417,7 +3418,7 @@ void VersionStorageInfo::ComputeCompactionScore(
// For universal compaction, we use level0 score to indicate
// compaction score for the whole DB. Adding other levels as if
// they are L0 files.
for (int i = 1; i < num_levels(); i++) {
for (int i = 1; i <= max_output_level; i++) {
// It's possible that a subset of the files in a level may be in a
// compaction, due to delete triggered compaction or trivial move.
// In that case, the below check may not catch a level being
Expand Down Expand Up @@ -3561,16 +3562,18 @@ void VersionStorageInfo::ComputeCompactionScore(
}
}
}
ComputeFilesMarkedForCompaction();
ComputeFilesMarkedForCompaction(max_output_level);
if (!immutable_options.allow_ingest_behind) {
ComputeBottommostFilesMarkedForCompaction();
}
if (mutable_cf_options.ttl > 0) {
if (mutable_cf_options.ttl > 0 &&
compaction_style_ == kCompactionStyleLevel) {
ComputeExpiredTtlFiles(immutable_options, mutable_cf_options.ttl);
}
if (mutable_cf_options.periodic_compaction_seconds > 0) {
ComputeFilesMarkedForPeriodicCompaction(
immutable_options, mutable_cf_options.periodic_compaction_seconds);
immutable_options, mutable_cf_options.periodic_compaction_seconds,
max_output_level);
}

if (mutable_cf_options.enable_blob_garbage_collection &&
Expand All @@ -3584,14 +3587,14 @@ void VersionStorageInfo::ComputeCompactionScore(
EstimateCompactionBytesNeeded(mutable_cf_options);
}

void VersionStorageInfo::ComputeFilesMarkedForCompaction() {
void VersionStorageInfo::ComputeFilesMarkedForCompaction(int last_level) {
files_marked_for_compaction_.clear();
int last_qualify_level = 0;

// Do not include files from the last level with data
// If table properties collector suggests a file on the last level,
// we should not move it to a new level.
for (int level = num_levels() - 1; level >= 1; level--) {
for (int level = last_level; level >= 1; level--) {
if (!files_[level].empty()) {
last_qualify_level = level - 1;
break;
Expand Down Expand Up @@ -3635,7 +3638,7 @@ void VersionStorageInfo::ComputeExpiredTtlFiles(

void VersionStorageInfo::ComputeFilesMarkedForPeriodicCompaction(
const ImmutableOptions& ioptions,
const uint64_t periodic_compaction_seconds) {
const uint64_t periodic_compaction_seconds, int last_level) {
assert(periodic_compaction_seconds > 0);

files_marked_for_periodic_compaction_.clear();
Expand All @@ -3656,7 +3659,7 @@ void VersionStorageInfo::ComputeFilesMarkedForPeriodicCompaction(
const uint64_t allowed_time_limit =
current_time - periodic_compaction_seconds;

for (int level = 0; level < num_levels(); level++) {
for (int level = 0; level <= last_level; level++) {
for (auto f : files_[level]) {
if (!f->being_compacted) {
// Compute a file's modification time in the following order:
Expand Down
Loading

0 comments on commit 15e8a84

Please sign in to comment.