Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ add_library(dfly_core allocation_tracker.cc bloom.cc compact_object.cc dense_set
interpreter.cc glob_matcher.cc mi_memory_resource.cc qlist.cc sds_utils.cc
segment_allocator.cc score_map.cc small_string.cc sorted_map.cc task_queue.cc
tx_queue.cc string_set.cc string_map.cc top_keys.cc detail/bitpacking.cc
page_usage_stats.cc)
page_usage_stats.cc count_min_sketch.cc)

cxx_link(dfly_core base dfly_search_core fibers2 jsonpath
absl::flat_hash_map absl::str_format absl::random_random redis_lib
Expand Down Expand Up @@ -56,6 +56,7 @@ cxx_test(qlist_test dfly_core DATA testdata/list.txt.zst LABELS DFLY)
cxx_test(zstd_test dfly_core TRDP::zstd LABELS DFLY)
cxx_test(top_keys_test dfly_core LABELS DFLY)
cxx_test(page_usage_stats_test dfly_core LABELS DFLY)
cxx_test(count_min_sketch_test dfly_core LABELS DFLY)

if(LIB_PCRE2)
target_compile_definitions(dfly_core_test PRIVATE USE_PCRE2=1)
Expand Down
123 changes: 123 additions & 0 deletions src/core/count_min_sketch.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2025, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#include "core/count_min_sketch.h"

#include <absl/time/clock.h>
#include <xxhash.h>

#include <cmath>
#include <functional>
#include <iostream>

namespace {

constexpr auto MAX = std::numeric_limits<dfly::CountMinSketch::SizeT>::max();

uint64_t GetCurrentMS() {
return absl::GetCurrentTimeNanos() / 1000 / 1000;
}

uint64_t ExponentialDecay(uint64_t value, int64_t time_delta) {
// Value halves every 5000 ms: ln(2) / 5000
static constexpr double EXP_DECAY_CONST = 0.000138629;
Copy link
Contributor

@dranikpg dranikpg Oct 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought std::log must be constexpr for sure! But it's only such from c++ 26 😆 💀


return value * std::exp(-time_delta * EXP_DECAY_CONST);
}

uint64_t LinearDecay(uint64_t value, int64_t time_delta) {
// Value decrements by one every 1000 ms
static constexpr double LIN_DECAY_CONST = 0.001;

const double decay = time_delta * LIN_DECAY_CONST;
return value - std::min(static_cast<double>(value), decay);
}

using DecayFn = std::function<uint64_t(uint64_t, int64_t)>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function pointer is enough, though std function of one shouldn't incurr any overhead


std::array<DecayFn, 3> decay_fns = {ExponentialDecay, LinearDecay, [](auto v, auto) { return v; }};

} // namespace

namespace dfly {

CountMinSketch::CountMinSketch(double epsilon, double delta) {
width_ = std::exp(1) / epsilon;
depth_ = std::log(1.0 / delta);
counters_.reserve(depth_);
for (uint64_t i = 0; i < depth_; ++i) {
counters_.emplace_back(width_, 0);
}
}

void CountMinSketch::Update(uint64_t key, CountMinSketch::SizeT incr) {
uint64_t i = 0;
std::for_each(counters_.begin(), counters_.end(), [&](auto& counter) {
// It is possible to compute just two initial hashes and then use them to derive next i-2
// hashes, but it results in a lot more collisions and thus much larger overestimates.
const uint64_t index = Hash(key, i++);
const SizeT curr = counter[index];
const SizeT updated = curr + incr;
counter[index] = updated < curr ? MAX : updated;
});
}

CountMinSketch::SizeT CountMinSketch::EstimateFrequency(uint64_t key) const {
SizeT estimate = MAX;
for (uint64_t i = 0; i < counters_.size(); ++i) {
estimate = std::min(estimate, counters_[i][Hash(key, i)]);
}
return estimate;
}

void CountMinSketch::Reset() {
for (auto& ctr : counters_) {
std::fill(ctr.begin(), ctr.end(), 0);
}
}

uint64_t CountMinSketch::Hash(uint64_t key, uint64_t i) const {
return XXH3_64bits_withSeed(&key, sizeof(key), i) % width_;
}

MultiSketch::MultiSketch(uint64_t rollover_ms, double epsilon, double delta, Decay decay)
: rollover_ms_(rollover_ms), current_sketch_(sketches_.size() - 1), decay_t_(decay) {
const uint64_t now = GetCurrentMS();
for (uint64_t i = 0; i < sketches_.size(); ++i) {
sketches_[i] = SketchWithTimestamp{CountMinSketch{epsilon, delta}, now, now};
}
}

void MultiSketch::Update(uint64_t key, CountMinSketch::SizeT incr) {
if (++rollover_check_ >= rollover_check_every_) {
MaybeRolloverCurrentSketch();
rollover_check_ = 0;
}
sketches_[current_sketch_].sketch_.Update(key, incr);
}

CountMinSketch::SizeT MultiSketch::EstimateFrequency(uint64_t key) const {
CountMinSketch::SizeT estimate = 0;
const uint64_t now = GetCurrentMS();

for (const auto& sketch : sketches_) {
const auto e = sketch.sketch_.EstimateFrequency(key);
// TODO use average time of sketch to compute delta
estimate += decay_fns[static_cast<uint8_t>(decay_t_)](e, now - sketch.start_time_);
}
return estimate;
}

void MultiSketch::MaybeRolloverCurrentSketch() {
const uint64_t now = GetCurrentMS();
const uint64_t oldest = (current_sketch_ + 1) % sketches_.size();
if (const uint64_t oldest_ts = sketches_[oldest].start_time_; now - oldest_ts > rollover_ms_) {
sketches_[oldest].sketch_.Reset();
sketches_[oldest].start_time_ = now;
sketches_[current_sketch_].end_time_ = now;
current_sketch_ = oldest;
}
}

} // namespace dfly
109 changes: 109 additions & 0 deletions src/core/count_min_sketch.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2025, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once

#include <array>
#include <cstdint>
#include <vector>

namespace dfly {

/// The CountMinSketch and the MultiSketch together are intended to record values which naturally
/// reduce over time, but for which explicitly recording decrements is not possible or easy.
/// An example is short-lived, large memory allocations. The allocation site can record the size of
/// the block allocated, and the count reduces over time, eventually going down to zero.

// Keeps count of items added with a small probability of overestimating counts due to hash
// collisions. Counts are stored in a table where each row stores counts for each item, and the
// minimum count across all rows is returned for an item when requested.
class CountMinSketch {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add some comments from your PR description to the header, i.e. what it's for, what the functions, do etc. So anyone who finds it in the future can tell by them what its there for 🙂

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added some comments, will add some references for the width and depth calculations

public:
using SizeT = uint16_t;

// epsilon is the maximum deviation from actual frequency allowed per element times the sum of all
// frequencies:
// f_actual <= f_estimated <= f_actual + epsilon * N
// where N is the sum of all frequencies
// delta is the probability that f_estimated overshoots the epsilon threshold for a single
// estimate, aka failure probability which means all bets are off as to what estimate is returned.

// With default values, the dimension of the counter table is 27182 x 9, and the size is
// around 490 KiBs.
explicit CountMinSketch(double epsilon = 0.0001, double delta = 0.0001);

// Increases the count associated with a key, a potentially slow operation as several hashes are
// calculated
void Update(uint64_t key, SizeT incr = 1);

// Estimated count for the key with a small probability for overshooting the estimate.
SizeT EstimateFrequency(uint64_t key) const;

// Loses all existing counts by resetting them to zero.
void Reset();

CountMinSketch(const CountMinSketch& other) = delete;
CountMinSketch& operator=(const CountMinSketch& other) = delete;

CountMinSketch(CountMinSketch&& other) noexcept = default;
CountMinSketch& operator=(CountMinSketch&& other) noexcept = default;

private:
uint64_t Hash(uint64_t key, uint64_t i) const;

std::vector<std::vector<SizeT>> counters_;
uint64_t width_;
uint64_t depth_;
};

// Maintains a list of three sketches with timestamps. Updates are made to the current sketch.
// Once the oldest sketch is older than a fixed limit, it is discarded and becomes the current
// sketch. Estimates are the sum across all sketches. The counts returned by the sketches "decay" to
// lower values as the sketches become older.
class MultiSketch {
struct SketchWithTimestamp {
CountMinSketch sketch_;
uint64_t start_time_{0};
uint64_t end_time_{0};
};

public:
// The decay model decides how fast values in sketches reduce as time passes.
// Exponential: larger values reduce faster
// Linear: all values decrease at a fixed rate
// SlidingWindow: values do not decrease until the sketch containing them resets
enum class Decay : uint8_t {
Exponential,
Linear,
SlidingWindow,
};

explicit MultiSketch(uint64_t rollover_ms = 1000, double epsilon = 0.0001, double delta = 0.0001,
Decay decay = Decay::Linear);

// Updates the current sketch, which is associated with the latest timestamp. Can cause the oldest
// sketch to be reset as a side effect if the oldest sketch is older than rollover_ms.
void Update(uint64_t key, CountMinSketch::SizeT incr = 1);

// Returns estimate by summing estimates from all internal sketches.
CountMinSketch::SizeT EstimateFrequency(uint64_t key) const;

// For unit tests, allow setting a smaller limit
void SetRolloverCheckLimit(uint64_t rollover_check_limit) {
rollover_check_every_ = rollover_check_limit;
}

private:
void MaybeRolloverCurrentSketch();

std::array<SketchWithTimestamp, 3> sketches_;
uint64_t rollover_ms_;
uint64_t current_sketch_;

// Do a rollover check every N calls to avoid expensive GetTime calls
uint64_t rollover_check_every_{512};
uint64_t rollover_check_{0};
Decay decay_t_;
};

} // namespace dfly
Loading
Loading