diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt index b08faee98e5c..9a9b23ecd6f5 100644 --- a/src/core/CMakeLists.txt +++ b/src/core/CMakeLists.txt @@ -27,7 +27,7 @@ add_library(dfly_core allocation_tracker.cc bloom.cc compact_object.cc dense_set interpreter.cc glob_matcher.cc mi_memory_resource.cc qlist.cc sds_utils.cc segment_allocator.cc score_map.cc small_string.cc sorted_map.cc task_queue.cc tx_queue.cc string_set.cc string_map.cc top_keys.cc detail/bitpacking.cc - page_usage_stats.cc) + page_usage_stats.cc count_min_sketch.cc) cxx_link(dfly_core base dfly_search_core fibers2 jsonpath absl::flat_hash_map absl::str_format absl::random_random redis_lib @@ -56,6 +56,7 @@ cxx_test(qlist_test dfly_core DATA testdata/list.txt.zst LABELS DFLY) cxx_test(zstd_test dfly_core TRDP::zstd LABELS DFLY) cxx_test(top_keys_test dfly_core LABELS DFLY) cxx_test(page_usage_stats_test dfly_core LABELS DFLY) +cxx_test(count_min_sketch_test dfly_core LABELS DFLY) if(LIB_PCRE2) target_compile_definitions(dfly_core_test PRIVATE USE_PCRE2=1) diff --git a/src/core/count_min_sketch.cc b/src/core/count_min_sketch.cc new file mode 100644 index 000000000000..f7f3fce1eae7 --- /dev/null +++ b/src/core/count_min_sketch.cc @@ -0,0 +1,123 @@ +// Copyright 2025, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "core/count_min_sketch.h" + +#include +#include + +#include +#include +#include + +namespace { + +constexpr auto MAX = std::numeric_limits::max(); + +uint64_t GetCurrentMS() { + return absl::GetCurrentTimeNanos() / 1000 / 1000; +} + +uint64_t ExponentialDecay(uint64_t value, int64_t time_delta) { + // Value halves every 5000 ms: ln(2) / 5000 + static constexpr double EXP_DECAY_CONST = 0.000138629; + + return value * std::exp(-time_delta * EXP_DECAY_CONST); +} + +uint64_t LinearDecay(uint64_t value, int64_t time_delta) { + // Value decrements by one every 1000 ms + static constexpr double LIN_DECAY_CONST = 0.001; + + const double decay = time_delta * LIN_DECAY_CONST; + return value - std::min(static_cast(value), decay); +} + +using DecayFn = std::function; + +std::array decay_fns = {ExponentialDecay, LinearDecay, [](auto v, auto) { return v; }}; + +} // namespace + +namespace dfly { + +CountMinSketch::CountMinSketch(double epsilon, double delta) { + width_ = std::exp(1) / epsilon; + depth_ = std::log(1.0 / delta); + counters_.reserve(depth_); + for (uint64_t i = 0; i < depth_; ++i) { + counters_.emplace_back(width_, 0); + } +} + +void CountMinSketch::Update(uint64_t key, CountMinSketch::SizeT incr) { + uint64_t i = 0; + std::for_each(counters_.begin(), counters_.end(), [&](auto& counter) { + // It is possible to compute just two initial hashes and then use them to derive next i-2 + // hashes, but it results in a lot more collisions and thus much larger overestimates. + const uint64_t index = Hash(key, i++); + const SizeT curr = counter[index]; + const SizeT updated = curr + incr; + counter[index] = updated < curr ? MAX : updated; + }); +} + +CountMinSketch::SizeT CountMinSketch::EstimateFrequency(uint64_t key) const { + SizeT estimate = MAX; + for (uint64_t i = 0; i < counters_.size(); ++i) { + estimate = std::min(estimate, counters_[i][Hash(key, i)]); + } + return estimate; +} + +void CountMinSketch::Reset() { + for (auto& ctr : counters_) { + std::fill(ctr.begin(), ctr.end(), 0); + } +} + +uint64_t CountMinSketch::Hash(uint64_t key, uint64_t i) const { + return XXH3_64bits_withSeed(&key, sizeof(key), i) % width_; +} + +MultiSketch::MultiSketch(uint64_t rollover_ms, double epsilon, double delta, Decay decay) + : rollover_ms_(rollover_ms), current_sketch_(sketches_.size() - 1), decay_t_(decay) { + const uint64_t now = GetCurrentMS(); + for (uint64_t i = 0; i < sketches_.size(); ++i) { + sketches_[i] = SketchWithTimestamp{CountMinSketch{epsilon, delta}, now, now}; + } +} + +void MultiSketch::Update(uint64_t key, CountMinSketch::SizeT incr) { + if (++rollover_check_ >= rollover_check_every_) { + MaybeRolloverCurrentSketch(); + rollover_check_ = 0; + } + sketches_[current_sketch_].sketch_.Update(key, incr); +} + +CountMinSketch::SizeT MultiSketch::EstimateFrequency(uint64_t key) const { + CountMinSketch::SizeT estimate = 0; + const uint64_t now = GetCurrentMS(); + + for (const auto& sketch : sketches_) { + const auto e = sketch.sketch_.EstimateFrequency(key); + // TODO use average time of sketch to compute delta + estimate += decay_fns[static_cast(decay_t_)](e, now - sketch.start_time_); + } + return estimate; +} + +void MultiSketch::MaybeRolloverCurrentSketch() { + const uint64_t now = GetCurrentMS(); + const uint64_t oldest = (current_sketch_ + 1) % sketches_.size(); + if (const uint64_t oldest_ts = sketches_[oldest].start_time_; now - oldest_ts > rollover_ms_) { + sketches_[oldest].sketch_.Reset(); + sketches_[oldest].start_time_ = now; + sketches_[current_sketch_].end_time_ = now; + current_sketch_ = oldest; + } +} + +} // namespace dfly diff --git a/src/core/count_min_sketch.h b/src/core/count_min_sketch.h new file mode 100644 index 000000000000..4a0ee67851a4 --- /dev/null +++ b/src/core/count_min_sketch.h @@ -0,0 +1,109 @@ +// Copyright 2025, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// +#pragma once + +#include +#include +#include + +namespace dfly { + +/// The CountMinSketch and the MultiSketch together are intended to record values which naturally +/// reduce over time, but for which explicitly recording decrements is not possible or easy. +/// An example is short-lived, large memory allocations. The allocation site can record the size of +/// the block allocated, and the count reduces over time, eventually going down to zero. + +// Keeps count of items added with a small probability of overestimating counts due to hash +// collisions. Counts are stored in a table where each row stores counts for each item, and the +// minimum count across all rows is returned for an item when requested. +class CountMinSketch { + public: + using SizeT = uint16_t; + + // epsilon is the maximum deviation from actual frequency allowed per element times the sum of all + // frequencies: + // f_actual <= f_estimated <= f_actual + epsilon * N + // where N is the sum of all frequencies + // delta is the probability that f_estimated overshoots the epsilon threshold for a single + // estimate, aka failure probability which means all bets are off as to what estimate is returned. + + // With default values, the dimension of the counter table is 27182 x 9, and the size is + // around 490 KiBs. + explicit CountMinSketch(double epsilon = 0.0001, double delta = 0.0001); + + // Increases the count associated with a key, a potentially slow operation as several hashes are + // calculated + void Update(uint64_t key, SizeT incr = 1); + + // Estimated count for the key with a small probability for overshooting the estimate. + SizeT EstimateFrequency(uint64_t key) const; + + // Loses all existing counts by resetting them to zero. + void Reset(); + + CountMinSketch(const CountMinSketch& other) = delete; + CountMinSketch& operator=(const CountMinSketch& other) = delete; + + CountMinSketch(CountMinSketch&& other) noexcept = default; + CountMinSketch& operator=(CountMinSketch&& other) noexcept = default; + + private: + uint64_t Hash(uint64_t key, uint64_t i) const; + + std::vector> counters_; + uint64_t width_; + uint64_t depth_; +}; + +// Maintains a list of three sketches with timestamps. Updates are made to the current sketch. +// Once the oldest sketch is older than a fixed limit, it is discarded and becomes the current +// sketch. Estimates are the sum across all sketches. The counts returned by the sketches "decay" to +// lower values as the sketches become older. +class MultiSketch { + struct SketchWithTimestamp { + CountMinSketch sketch_; + uint64_t start_time_{0}; + uint64_t end_time_{0}; + }; + + public: + // The decay model decides how fast values in sketches reduce as time passes. + // Exponential: larger values reduce faster + // Linear: all values decrease at a fixed rate + // SlidingWindow: values do not decrease until the sketch containing them resets + enum class Decay : uint8_t { + Exponential, + Linear, + SlidingWindow, + }; + + explicit MultiSketch(uint64_t rollover_ms = 1000, double epsilon = 0.0001, double delta = 0.0001, + Decay decay = Decay::Linear); + + // Updates the current sketch, which is associated with the latest timestamp. Can cause the oldest + // sketch to be reset as a side effect if the oldest sketch is older than rollover_ms. + void Update(uint64_t key, CountMinSketch::SizeT incr = 1); + + // Returns estimate by summing estimates from all internal sketches. + CountMinSketch::SizeT EstimateFrequency(uint64_t key) const; + + // For unit tests, allow setting a smaller limit + void SetRolloverCheckLimit(uint64_t rollover_check_limit) { + rollover_check_every_ = rollover_check_limit; + } + + private: + void MaybeRolloverCurrentSketch(); + + std::array sketches_; + uint64_t rollover_ms_; + uint64_t current_sketch_; + + // Do a rollover check every N calls to avoid expensive GetTime calls + uint64_t rollover_check_every_{512}; + uint64_t rollover_check_{0}; + Decay decay_t_; +}; + +} // namespace dfly diff --git a/src/core/count_min_sketch_test.cc b/src/core/count_min_sketch_test.cc new file mode 100644 index 000000000000..2aea5058c05a --- /dev/null +++ b/src/core/count_min_sketch_test.cc @@ -0,0 +1,175 @@ +// Copyright 2025, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "core/count_min_sketch.h" + +#include +#include + +namespace { + +std::vector CollectFrequencies(dfly::MultiSketch::Decay decay) { + constexpr auto rollover_ms = 100; + constexpr auto eps = 0.0001; + constexpr auto delta = 0.0001; + auto ms = dfly::MultiSketch{rollover_ms, eps, delta, decay}; + ms.SetRolloverCheckLimit(1); + + constexpr auto elem = 999; + + for (auto i = 0; i < 52345; ++i) { + ms.Update(elem); + } + + std::vector freqs; + + freqs.push_back(ms.EstimateFrequency(elem)); + for (auto i = 0; i < 3; ++i) { + usleep(rollover_ms * 1000); + ms.Update(1); + freqs.push_back(ms.EstimateFrequency(elem)); + } + return freqs; +} + +constexpr auto EPS = 0.0001; +constexpr auto DELTA = 0.0001; + +} // namespace + +TEST(CountMinSketch, Estimate) { + auto cms = dfly::CountMinSketch{EPS, DELTA}; + + constexpr auto actual_freq = 52345; + const auto elems = {999, 123456, 2785, 0, 96, 221}; + for (auto i = 0; i < actual_freq; ++i) { + for (const auto elem : elems) { + cms.Update(elem); + } + } + + constexpr auto err_margin_per_elem = actual_freq * EPS; + auto errors = 0; + for (const auto elem : elems) { + if (cms.EstimateFrequency(elem) - actual_freq > err_margin_per_elem) { + errors++; + } + } + + // Total errors should be less than the probability of overshoot per element * elements + ASSERT_LE(errors, elems.size() * DELTA); +} + +TEST(CountMinSketch, EstimateDoesNotOverflow) { + auto cms = dfly::CountMinSketch{}; + for (uint32_t i = 0; i < 65535 + 100; ++i) { + cms.Update(42); + } + ASSERT_EQ(cms.EstimateFrequency(42), std::numeric_limits::max()); +} + +TEST(MultiSketch, Estimate) { + auto ms = dfly::MultiSketch{1000, EPS, DELTA}; + + constexpr auto actual_freq = 52345; + const auto elems = {999, 123456, 2785, 0, 96, 221}; + for (auto i = 0; i < actual_freq; ++i) { + for (const auto elem : elems) { + ms.Update(elem); + } + } + + constexpr auto err_margin_per_elem = actual_freq * EPS; + auto errors = 0; + for (const auto elem : elems) { + if (ms.EstimateFrequency(elem) - actual_freq > err_margin_per_elem) { + ++errors; + } + } + ASSERT_LE(errors, DELTA * elems.size()); +} + +TEST(MultiSketch, RollOverDiscardsOldEstimates) { + constexpr auto rollover_ms = 100; + auto ms = dfly::MultiSketch{rollover_ms, EPS, DELTA, dfly::MultiSketch::Decay::SlidingWindow}; + ms.SetRolloverCheckLimit(1); + + constexpr auto actual_freq = 52345; + const auto elems = {999, 123456, 2785, 0, 96, 221}; + for (auto i = 0; i < actual_freq; ++i) { + for (const auto elem : elems) { + ms.Update(elem); + } + } + + constexpr auto rollover_count = 3; + constexpr auto sleep_usec = rollover_ms * 1000; + + // Force rollover + for (auto i = 0; i < rollover_count; ++i) { + usleep(sleep_usec); + ms.Update(1); + } + + for (const auto elem : elems) { + ASSERT_EQ(ms.EstimateFrequency(elem), 0) << "item still has non zero count" << elem; + } + + ASSERT_EQ(ms.EstimateFrequency(1), rollover_count); +} + +TEST(MultiSketch, Decay) { + using dfly::MultiSketch; + for (const auto decay : {MultiSketch::Decay::Exponential, MultiSketch::Decay::Linear}) { + auto freqs = CollectFrequencies(decay); + // counts keep decreasing + ASSERT_TRUE(std::is_sorted(freqs.begin(), freqs.end(), std::greater<>{})); + } + + // For sliding window, estimates either remain the same or fall off and become 0 + const auto freqs = CollectFrequencies(MultiSketch::Decay::SlidingWindow); + const auto f = freqs[0]; + ASSERT_THAT(freqs, testing::Each(testing::AnyOf(0, f))); +} + +TEST(CountMinSketch, EstimateOverLargeRange) { + // track each size from 1 KiB to 1 GiB in steps of 150 KiB + constexpr uint64_t start = 1024; + constexpr uint64_t end = 1024 * 1024 * 1024; + constexpr uint64_t step_size = 1024 * 150; + + auto cms = dfly::CountMinSketch{EPS, DELTA}; + + uint64_t num_items_in_sketch = 0; + for (uint64_t i = start; i <= end; i += step_size, ++num_items_in_sketch) { + cms.Update(i, num_items_in_sketch); + } + + const auto max_err = num_items_in_sketch * EPS; + auto errors = 0; + for (uint64_t i = start, value = 0; i <= end; i += step_size, value += 1) { + if (cms.EstimateFrequency(i) - value > max_err) { + ++errors; + } + } + ASSERT_LE(errors, num_items_in_sketch * DELTA); +} + +void BM_CMSEstimate(benchmark::State& state) { + auto cms = dfly::CountMinSketch{}; + const uint64_t start = 1; + const uint64_t end = state.range(0); + const uint64_t step_size = 10; + for (auto _ : state) { + for (auto i = start; i < end; i += step_size) { + cms.Update(i); + } + + for (auto i = start; i < end; i += step_size) { + benchmark::DoNotOptimize(cms.EstimateFrequency(i)); + } + } +} + +BENCHMARK(BM_CMSEstimate)->Arg(UINT32_MAX / 100);