8 changes: 7 additions & 1 deletion src/core/compact_object.cc
@@ -1224,11 +1224,13 @@ CompactObj::ExternalRep CompactObj::GetExternalRep() const {
   return static_cast<CompactObj::ExternalRep>(u_.ext_ptr.representation);
 }
 
-void CompactObj::SetCool(size_t offset, uint32_t sz, detail::TieredColdRecord* record) {
+void CompactObj::SetCool(size_t offset, uint32_t sz, ExternalRep rep,
+                         detail::TieredColdRecord* record) {
   // We copy the mask of the "cooled" referenced object because it contains the encoding info.
   SetMeta(EXTERNAL_TAG, record->value.mask_);
 
   u_.ext_ptr.is_cool = 1;
+  u_.ext_ptr.representation = static_cast<uint8_t>(rep);
   u_.ext_ptr.page_offset = offset % 4096;
   u_.ext_ptr.serialized_size = sz;
   u_.ext_ptr.cool_record = record;
@@ -1244,6 +1246,10 @@ auto CompactObj::GetCool() const -> CoolItem {
   return res;
 }
 
+void CompactObj::KeepExternal(size_t offset, size_t sz) {
+  SetExternal(offset, sz, GetExternalRep());
+}
+
 std::pair<size_t, size_t> CompactObj::GetExternalSlice() const {
   DCHECK_EQ(EXTERNAL_TAG, taglen_);
   auto& ext = u_.ext_ptr;
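
The representation tag introduced here is what lets a cooled entry be demoted to a pure external value later without losing decode information. A standalone sketch of the idea — illustrative names and field widths, not CompactObj's actual layout:

#include <cstdint>
#include <cstdio>

enum class ExternalRep : uint8_t { STRING = 0, SERIALIZED_MAP = 1 };

// Toy external descriptor: the representation tag lives next to the disk
// coordinates, so dropping the in-memory copy loses no decode info.
struct ExternalPtr {
  uint8_t is_cool : 1;
  uint8_t representation : 7;
  uint16_t page_offset;    // offset % 4096, as in SetCool above
  uint32_t serialized_size;
};

int main() {
  ExternalPtr p{};
  p.is_cool = 1;
  p.representation = static_cast<uint8_t>(ExternalRep::SERIALIZED_MAP);
  p.page_offset = 8260 % 4096;
  p.serialized_size = 512;
  p.is_cool = 0;  // "keep external": drop the cool record, reuse the same tag
  std::printf("rep=%u offset=%u size=%u\n", unsigned(p.representation),
              unsigned(p.page_offset), unsigned(p.serialized_size));
  return 0;
}
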
7 changes: 6 additions & 1 deletion src/core/compact_object.h
@@ -364,7 +364,8 @@ class CompactObj {
   }
 
   // Assigns a cooling record to the object together with its external slice.
-  void SetCool(size_t offset, uint32_t serialized_size, detail::TieredColdRecord* record);
+  void SetCool(size_t offset, uint32_t serialized_size, ExternalRep rep,
+               detail::TieredColdRecord* record);
 
   struct CoolItem {
     uint16_t page_offset;
@@ -376,6 +377,10 @@
   // Returns the external data of the object including its ColdRecord.
   CoolItem GetCool() const;
 
+  // Prerequisite: IsCool() is true.
+  // Keeps the cool record only as an external value and discards the in-memory part.
+  void KeepExternal(size_t offset, size_t sz);
+
   std::pair<size_t, size_t> GetExternalSlice() const;
 
   // Injects either the raw string (extracted with GetRawString()) or the usual string
4 changes: 3 additions & 1 deletion src/server/common.cc
@@ -162,7 +162,7 @@ bool ParseDouble(string_view src, double* value) {
 #define ADD(x) (x) += o.x
 
 TieredStats& TieredStats::operator+=(const TieredStats& o) {
-  static_assert(sizeof(TieredStats) == 160);
+  static_assert(sizeof(TieredStats) == 168);
 
   ADD(total_stashes);
   ADD(total_fetches);
@@ -182,6 +182,8 @@ TieredStats& TieredStats::operator+=(const TieredStats& o) {
   ADD(small_bins_cnt);
   ADD(small_bins_entries_cnt);
   ADD(small_bins_filling_bytes);
+  ADD(small_bins_filling_entries_cnt);
+
   ADD(total_stash_overflows);
   ADD(cold_storage_bytes);
   ADD(total_offloading_steps);
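
The bumped static_assert is the guard that keeps this operator honest: the new size_t counter grows TieredStats from 160 to 168 bytes, so anyone adding a field must also add it to the sums before the code compiles. A minimal standalone sketch of the same pattern:

#include <cstdint>

#define ADD(x) (x) += o.x

struct Stats {
  uint64_t total_stashes = 0;
  uint64_t filling_entries_cnt = 0;

  Stats& operator+=(const Stats& o) {
    // Compile-time tripwire: fails when a field is added but not summed below.
    static_assert(sizeof(Stats) == 16, "new field? add it to the sums and bump this");
    ADD(total_stashes);
    ADD(filling_entries_cnt);
    return *this;
  }
};

int main() {
  Stats a, b;
  b.total_stashes = 3;
  a += b;
  return a.total_stashes == 3 ? 0 : 1;
}
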
1 change: 1 addition & 0 deletions src/server/common.h
@@ -91,6 +91,7 @@ struct TieredStats {
   uint64_t small_bins_cnt = 0;
   uint64_t small_bins_entries_cnt = 0;
   size_t small_bins_filling_bytes = 0;
+  size_t small_bins_filling_entries_cnt = 0;
   size_t cold_storage_bytes = 0;
 
   uint64_t clients_throttled = 0;  // current number of throttled clients
3 changes: 3 additions & 0 deletions src/server/hset_family.cc
@@ -492,6 +492,9 @@ OpResult<uint32_t> OpSet(const OpArgs& op_args, string_view key, CmdArgList valu
 
   op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, pv);
 
+  if (auto* ts = op_args.shard->tiered_storage(); ts)
+    ts->TryStash(op_args.db_cntx.db_index, key, &pv);
+
   return created;
 }
 
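
This one call is the entire write-path integration for hashes: after the set completes, the value is offered to tiered storage, which may decline or stash asynchronously. A sketch of the hook's shape with stand-in types (the real TryStash returns std::optional<util::fb2::Future<bool>>):

#include <optional>
#include <string_view>

struct Future {};     // stand-in for util::fb2::Future<bool>
struct PrimeValue {};

struct TieredStorageLike {
  // Returns a future if a stash was scheduled, nullopt if it declined
  // (too small, unsupported encoding, flag disabled, ...).
  std::optional<Future> TryStash(unsigned db_index, std::string_view key, PrimeValue*) {
    return std::nullopt;
  }
};

void OnHashSet(TieredStorageLike* ts, unsigned db, std::string_view key, PrimeValue* pv) {
  if (ts)                       // tiered storage may not be configured
    ts->TryStash(db, key, pv);  // best-effort, fire-and-forget
}

int main() {
  TieredStorageLike ts;
  PrimeValue pv;
  OnHashSet(&ts, 0, "myhash", &pv);
  return 0;
}
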
149 changes: 117 additions & 32 deletions src/server/tiered_storage.cc
@@ -17,16 +17,22 @@
 #include "base/flag_utils.h"
 #include "base/flags.h"
 #include "base/logging.h"
+#include "core/detail/listpack_wrap.h"
 #include "server/common.h"
 #include "server/db_slice.h"
 #include "server/engine_shard_set.h"
 #include "server/snapshot.h"
 #include "server/table.h"
 #include "server/tiering/common.h"
 #include "server/tiering/op_manager.h"
+#include "server/tiering/serialized_map.h"
 #include "server/tiering/small_bins.h"
 #include "server/tx_base.h"
 
+extern "C" {
+#include "redis/listpack.h"
+}
+
 using namespace facade;
 
 using AtLeast64 = base::ConstrainedNumericFlagValue<size_t, 64>;  // ABSL_FLAG breaks with commas
@@ -46,6 +52,8 @@ ABSL_FLAG(float, tiered_offload_threshold, 0.5,
 ABSL_FLAG(float, tiered_upload_threshold, 0.1,
           "Ratio of free memory (free/max memory) below which uploading stops");
 
+ABSL_FLAG(bool, tiered_experimental_hash_support, false, "Experimental hash datatype offloading");
+
 namespace dfly {
 
 using namespace std;
@@ -73,6 +81,58 @@ tiering::DiskSegment FromCoolItem(const PrimeValue::CoolItem& item) {
   return {item.record->page_index * tiering::kPageSize + item.page_offset, item.serialized_size};
 }
 
+// Determine the required byte size and encoding type based on the value.
+// TODO(vlad): Maybe split into different accessors?
+// Do NOT enforce rules that depend on dynamic runtime values: this is called both when
+// scheduling a stash and just before it succeeds, and is expected to return the same results.
+optional<pair<size_t /*size*/, CompactObj::ExternalRep>> EstimateSerializedSize(
+    const PrimeValue& pv) {
+  switch (pv.ObjType()) {
+    case OBJ_STRING:
+      return std::make_pair(pv.GetRawString().view().size(), CompactObj::ExternalRep::STRING);
+    case OBJ_HASH:
+      if (pv.Encoding() == kEncodingListPack) {
+        auto* lp = static_cast<uint8_t*>(pv.RObjPtr());
+        size_t bytes = lpBytes(lp);
+        bytes += lpLength(lp) * 2 * 4;  // headroom for per-entry framing of key and value
+        return std::make_pair(bytes, CompactObj::ExternalRep::SERIALIZED_MAP);
+      }
+      return {};
+    default:
+      return {};
+  };
+}
+
+size_t Serialize(CompactObj::ExternalRep rep, const PrimeValue& pv, io::MutableBytes buffer) {
+  DCHECK_LE(EstimateSerializedSize(pv)->first, buffer.size());
+  switch (rep) {
+    case CompactObj::ExternalRep::STRING: {
+      auto sv = pv.GetRawString();
+      memcpy(buffer.data(), sv.view().data(), sv.view().size());
+      return sv.view().size();
+    }
+    case CompactObj::ExternalRep::SERIALIZED_MAP: {
+      DCHECK_EQ(pv.Encoding(), kEncodingListPack);
+
+      // TODO(vlad): Optimize copy for serialization
+      detail::ListpackWrap lw{static_cast<uint8_t*>(pv.RObjPtr())};
+      vector<pair<string, string>> entries(lw.begin(), lw.end());
+      return tiering::SerializedMap::Serialize(
+          entries, {reinterpret_cast<char*>(buffer.data()), buffer.length()});
+    }
+  };
+  return 0;
+}
+
+string SerializeString(const PrimeValue& pv) {
+  auto estimate = EstimateSerializedSize(pv);
+  string s(estimate->first, 0);
+  size_t written =
+      Serialize(estimate->second, pv, {reinterpret_cast<uint8_t*>(s.data()), s.size()});
+  s.resize(written);
+  return s;
+}
+
 }  // anonymous namespace
 
 class TieredStorage::ShardOpManager : public tiering::OpManager {
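
These helpers form a two-phase contract: EstimateSerializedSize must return a stable upper bound that does not depend on runtime state, and Serialize reports the bytes actually written, which becomes the disk segment length. A self-contained sketch of that contract using a simplified length-prefixed map format (the real tiering::SerializedMap layout may differ):

#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <utility>
#include <vector>

using Entry = std::pair<std::string, std::string>;

// Upper bound: payload plus two fixed 4-byte length prefixes per entry,
// mirroring the "lpLength(lp) * 2 * 4" headroom above.
size_t Estimate(const std::vector<Entry>& map) {
  size_t bytes = 0;
  for (const auto& [k, v] : map) bytes += k.size() + v.size() + 2 * 4;
  return bytes;
}

size_t Serialize(const std::vector<Entry>& map, char* buf) {
  char* p = buf;
  for (const auto& [k, v] : map) {
    uint32_t ks = k.size(), vs = v.size();
    std::memcpy(p, &ks, 4); p += 4;
    std::memcpy(p, k.data(), ks); p += ks;
    std::memcpy(p, &vs, 4); p += 4;
    std::memcpy(p, v.data(), vs); p += vs;
  }
  return p - buf;  // actual size; the estimate may overshoot
}

int main() {
  std::vector<Entry> m{{"field1", "value1"}, {"f2", "v2"}};
  std::string buf(Estimate(m), '\0');
  size_t written = Serialize(m, buf.data());
  assert(written <= buf.size());
  return 0;
}
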
@@ -133,12 +193,20 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
   void RetireColdEntries(size_t additional_memory);
 
   // Set value to be an in-memory type again. Update memory stats.
-  void Upload(DbIndex dbid, string_view value, bool is_raw, size_t serialized_len, PrimeValue* pv) {
+  void Upload(DbIndex dbid, string_view value, PrimeValue* pv) {
     DCHECK(!value.empty());
     DCHECK_EQ(uint8_t(pv->GetExternalRep()), uint8_t(CompactObj::ExternalRep::STRING));
 
-    pv->Materialize(value, is_raw);
-    RecordDeleted(*pv, serialized_len, GetDbTableStats(dbid));
+    switch (pv->GetExternalRep()) {
+      case CompactObj::ExternalRep::STRING:
+        pv->Materialize(value, true);
+        break;
+      case CompactObj::ExternalRep::SERIALIZED_MAP:
+        pv->InitRobj(OBJ_HASH, kEncodingListPack, nullptr);
+        break;
+    };
+
+    RecordDeleted(*pv, value.size(), GetDbTableStats(dbid));
   }
 
   // Find entry by key in db_slice and store external segment in place of original value.
@@ -153,12 +221,13 @@
       stats->tiered_used_bytes += segment.length;
       stats_.total_stashes++;
 
+      CompactObj::ExternalRep rep = EstimateSerializedSize(*pv)->second;
       if (ts_->config_.experimental_cooling) {
        RetireColdEntries(pv->MallocUsed());
-        ts_->CoolDown(key.first, key.second, segment, pv);
+        ts_->CoolDown(key.first, key.second, segment, rep, pv);
      } else {
        stats->AddTypeMemoryUsage(pv->ObjType(), -pv->MallocUsed());
-        pv->SetExternal(segment.offset, segment.length, CompactObj::ExternalRep::STRING);
+        pv->SetExternal(segment.offset, segment.length, rep);
      }
    } else {
      LOG(DFATAL) << "Should not reach here";
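
On the way back from disk, the stored representation tag replaces the old is_raw boolean as the decoder selector in Upload. A stand-in sketch of the dispatch:

#include <cstdio>
#include <string_view>

enum class ExternalRep { STRING, SERIALIZED_MAP };

void Decode(ExternalRep rep, std::string_view bytes) {
  switch (rep) {
    case ExternalRep::STRING:
      std::printf("materialize %zu raw string bytes\n", bytes.size());
      break;
    case ExternalRep::SERIALIZED_MAP:
      std::printf("rebuild a listpack-encoded hash from %zu bytes\n", bytes.size());
      break;
  }
}

int main() {
  Decode(ExternalRep::SERIALIZED_MAP, "serialized");
  return 0;
}
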
@@ -215,7 +284,7 @@ void TieredStorage::ShardOpManager::Defragment(tiering::DiskSegment segment, str
   } else {
     // Cut out relevant part of value and restore it to memory
     string_view value = page.substr(item_segment.offset - segment.offset, item_segment.length);
-    Upload(dbid, value, true, item_segment.length, &pv);
+    Upload(dbid, value, &pv);
   }
  }
 }
@@ -386,36 +455,40 @@ std::optional<util::fb2::Future<bool>> TieredStorage::TryStash(DbIndex dbid, str
     return {};
   }
 
-  StringOrView raw_string = value->GetRawString();
-  value->SetStashPending(true);
+  auto estimated = EstimateSerializedSize(*value);
+  DCHECK(estimated);
 
   tiering::OpManager::EntryId id;
   error_code ec;
 
-  // TODO(vlad): Replace with encoders for different types
-  auto stash_string = [&](std::string_view str) {
-    if (auto prepared = op_manager_->PrepareStash(str.size()); prepared) {
-      auto [offset, buf] = *prepared;
-      memcpy(buf.bytes.data(), str.data(), str.size());
-      tiering::DiskSegment segment{offset, str.size()};
-      op_manager_->Stash(id, segment, buf);
-    } else {
-      ec = prepared.error();
-    }
-  };
-
-  if (OccupiesWholePages(value->Size())) {  // large enough for own page
-    id = KeyRef(dbid, key);
-    stash_string(raw_string.view());
-  } else if (auto bin = bins_->Stash(dbid, key, raw_string.view()); bin) {
+  value->SetStashPending(true);
+  if (OccupiesWholePages(estimated->first)) {  // large enough for own page
+    id = KeyRef(dbid, key);
+    if (auto prepared = op_manager_->PrepareStash(estimated->first); prepared) {
+      auto [offset, buf] = *prepared;
+      size_t written = Serialize(estimated->second, *value, buf.bytes);
+      tiering::DiskSegment segment{offset, written};
+      op_manager_->Stash(id, segment, buf);
+    } else {
+      ec = prepared.error();
+    }
+  } else if (auto bin = bins_->Stash(dbid, key, SerializeString(*value)); bin) {
     id = bin->first;
     // TODO(vlad): Write bin to prepared buffer instead of allocating one
-    stash_string(bin->second);
+    if (auto prepared = op_manager_->PrepareStash(estimated->first); prepared) {
+      auto [offset, buf] = *prepared;
+      memcpy(buf.bytes.data(), bin->second.data(), bin->second.size());
+      tiering::DiskSegment segment{offset, bin->second.size()};
+      op_manager_->Stash(id, segment, buf);
+    } else {
+      ec = prepared.error();
+    }
   } else {
     return {};  // silently added to bin
   }
 
   if (ec) {
+    value->SetStashPending(false);
     LOG_IF(ERROR, ec != errc::file_too_large) << "Stash failed immediately" << ec.message();
     visit([this](auto id) { op_manager_->ClearIoPending(id); }, id);
     return {};
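
Both branches of TryStash now run through the same size estimate: values whose serialized form fills at least a page get their own stash, smaller ones are packed into a shared bin first. A sketch of the routing rule, assuming the threshold is one full page (the real OccupiesWholePages may differ):

#include <cstddef>
#include <cstdio>

constexpr size_t kPageSize = 4096;  // mirrors tiering::kPageSize

bool OccupiesWholePages(size_t bytes) { return bytes >= kPageSize; }

const char* Route(size_t estimated_size) {
  return OccupiesWholePages(estimated_size) ? "own page(s)" : "small bin";
}

int main() {
  std::printf("%zu -> %s\n", size_t{10000}, Route(10000));
  std::printf("%zu -> %s\n", size_t{300}, Route(300));
  return 0;
}
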
@@ -489,6 +562,7 @@ TieredStats TieredStorage::GetStats() const {
     stats.small_bins_cnt = bins_stats.stashed_bins_cnt;
     stats.small_bins_entries_cnt = bins_stats.stashed_entries_cnt;
     stats.small_bins_filling_bytes = bins_stats.current_bin_bytes;
+    stats.small_bins_filling_entries_cnt = bins_stats.current_entries_cnt;
   }
 
   {  // Own stats
@@ -513,13 +587,14 @@ void TieredStorage::UpdateFromFlags() {
       .write_depth_limit = absl::GetFlag(FLAGS_tiered_storage_write_depth),
       .offload_threshold = absl::GetFlag(FLAGS_tiered_offload_threshold),
       .upload_threshold = absl::GetFlag(FLAGS_tiered_upload_threshold),
+      .experimental_hash_offload = absl::GetFlag(FLAGS_tiered_experimental_hash_support),
   };
 }
 
 std::vector<std::string> TieredStorage::GetMutableFlagNames() {
   return base::GetFlagNames(FLAGS_tiered_min_value_size, FLAGS_tiered_experimental_cooling,
                             FLAGS_tiered_storage_write_depth, FLAGS_tiered_offload_threshold,
-                            FLAGS_tiered_upload_threshold);
+                            FLAGS_tiered_upload_threshold, FLAGS_tiered_experimental_hash_support);
 }
 
 bool TieredStorage::ShouldOffload() const {
@@ -596,10 +671,10 @@ size_t TieredStorage::ReclaimMemory(size_t goal) {
                    ->prime.FindFirst(record->key_hash, predicate);
     CHECK(IsValid(it));
     PrimeValue& pv = it->second;
-    tiering::DiskSegment segment = FromCoolItem(pv.GetCool());
 
     // Now the item is only in storage.
-    pv.SetExternal(segment.offset, segment.length, CompactObj::ExternalRep::STRING);
+    tiering::DiskSegment segment = FromCoolItem(pv.GetCool());
+    pv.KeepExternal(segment.offset, segment.length);
 
     auto* stats = op_manager_->GetDbTableStats(record->db_index);
     stats->AddTypeMemoryUsage(record->value.ObjType(), -record->value.MallocUsed());
@@ … @@
 }
 
 bool TieredStorage::ShouldStash(const PrimeValue& pv) const {
+  // Check value state
+  if (pv.IsExternal() || pv.HasStashPending())
+    return false;
+
+  // Estimate value size
+  auto estimation = EstimateSerializedSize(pv);
+  if (!estimation)
+    return false;
+
+  // For now, hash offloading is conditional
+  if (pv.ObjType() == OBJ_HASH && !config_.experimental_hash_offload)
+    return false;
+
   const auto& disk_stats = op_manager_->GetStats().disk_stats;
-  return !pv.IsExternal() && !pv.HasStashPending() && pv.ObjType() == OBJ_STRING &&
-         pv.Size() >= config_.min_value_size &&
-         disk_stats.allocated_bytes + tiering::kPageSize + pv.Size() < disk_stats.max_file_size;
+  return estimation->first >= config_.min_value_size &&
+         disk_stats.allocated_bytes + tiering::kPageSize + estimation->first <
+             disk_stats.max_file_size;
 }
 
 void TieredStorage::CoolDown(DbIndex db_ind, std::string_view str,
-                             const tiering::DiskSegment& segment, PrimeValue* pv) {
+                             const tiering::DiskSegment& segment, CompactObj::ExternalRep rep,
+                             PrimeValue* pv) {
   detail::TieredColdRecord* record = CompactObj::AllocateMR<detail::TieredColdRecord>();
   cool_queue_.push_front(*record);
   stats_.cool_memory_used += (sizeof(detail::TieredColdRecord) + pv->MallocUsed());
@@ -627,7 +716,7 @@ void TieredStorage::CoolDown(DbIndex db_ind, std::string_view str,
   record->page_index = segment.offset / tiering::kPageSize;
   record->value = std::move(*pv);
 
-  pv->SetCool(segment.offset, segment.length, record);
+  pv->SetCool(segment.offset, segment.length, rep, record);
 }
 
 PrimeValue TieredStorage::Warmup(DbIndex dbid, PrimeValue::CoolItem item) {
@@ -636,10 +725,6 @@ PrimeValue TieredStorage::Warmup(DbIndex dbid, PrimeValue::CoolItem item) {
   // We remove it from both cool storage and the offline storage.
   PrimeValue hot = DeleteCool(item.record);
   op_manager_->DeleteOffloaded(dbid, segment);
-
-  // Bring it back to the PrimeTable.
-  DCHECK(hot.ObjType() == OBJ_STRING);
-
   return hot;
 }
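
The rewritten ShouldStash layers its gates from cheap to specific: value state, serializability of the encoding, the experimental hash flag, then size and disk budget. A standalone sketch of the same ordering with stand-in types:

#include <cstddef>
#include <optional>

constexpr size_t kPageSize = 4096;

struct Value {
  bool external = false;
  bool stash_pending = false;
  bool is_hash = false;
  std::optional<size_t> estimated_size;  // nullopt: encoding not serializable
};

bool ShouldStash(const Value& v, size_t min_value_size, bool hash_offload_enabled,
                 size_t allocated_bytes, size_t max_file_size) {
  if (v.external || v.stash_pending) return false;       // value state
  if (!v.estimated_size) return false;                   // must be serializable
  if (v.is_hash && !hash_offload_enabled) return false;  // experimental gate
  return *v.estimated_size >= min_value_size &&
         allocated_bytes + kPageSize + *v.estimated_size < max_file_size;
}

int main() {
  Value v;
  v.is_hash = true;
  v.estimated_size = 2048;
  return ShouldStash(v, 64, true, 0, 1 << 20) ? 0 : 1;
}
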
3 changes: 2 additions & 1 deletion src/server/tiered_storage.h
@@ -116,7 +116,7 @@ class TieredStorage {
 
   // Moves pv contents to the cool storage and updates pv to point to it.
   void CoolDown(DbIndex db_ind, std::string_view str, const tiering::DiskSegment& segment,
-                PrimeValue* pv);
+                CompactObj::ExternalRep rep, PrimeValue* pv);
 
   PrimeValue DeleteCool(detail::TieredColdRecord* record);
   detail::TieredColdRecord* PopCool();
@@ … @@
     unsigned write_depth_limit;
     float offload_threshold;
     float upload_threshold;
+    bool experimental_hash_offload;
   } config_;
   struct {
     uint64_t stash_overflow_cnt = 0;
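
The new experimental_hash_offload member sits in the same config block as the other tiering knobs, and because the flag is listed in GetMutableFlagNames, UpdateFromFlags re-reads it at runtime (e.g. on CONFIG SET). A sketch of that plumbing with a plain atomic standing in for the ABSL flag:

#include <atomic>

// Stand-in for FLAGS_tiered_experimental_hash_support.
std::atomic<bool> g_hash_support{false};

struct Config {
  bool experimental_hash_offload = false;
};

// Analog of UpdateFromFlags(): snapshot mutable flags into the config struct.
Config UpdateFromFlags() {
  Config c;
  c.experimental_hash_offload = g_hash_support.load();
  return c;
}

int main() {
  g_hash_support = true;  // e.g. toggled at runtime via CONFIG SET
  return UpdateFromFlags().experimental_hash_offload ? 0 : 1;
}
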