diff --git a/core/benchmarks/counter_bench.cc b/core/benchmarks/counter_bench.cc index fc419d86..f14ea6bb 100644 --- a/core/benchmarks/counter_bench.cc +++ b/core/benchmarks/counter_bench.cc @@ -1,23 +1,54 @@ #include #include +static void BM_Counter_IncrementBaseline(benchmark::State& state) { + struct { + void Increment() { v += 1.0; } + double v; + } counter; + + for (auto _ : state) { + counter.Increment(); + } + benchmark::DoNotOptimize(counter.v); +} +BENCHMARK(BM_Counter_IncrementBaseline); + static void BM_Counter_Increment(benchmark::State& state) { - using prometheus::Registry; - using prometheus::Counter; using prometheus::BuildCounter; + using prometheus::Counter; + using prometheus::Registry; Registry registry; auto& counter_family = BuildCounter().Name("benchmark_counter").Help("").Register(registry); auto& counter = counter_family.Add({}); - while (state.KeepRunning()) counter.Increment(); + for (auto _ : state) { + counter.Increment(); + } + benchmark::DoNotOptimize(counter.Value()); } BENCHMARK(BM_Counter_Increment); +class BM_Counter : public benchmark::Fixture { + protected: + BM_Counter() { this->ThreadPerCpu(); } + + prometheus::Counter counter{}; +}; + +BENCHMARK_F(BM_Counter, ConcurrentIncrement) +(benchmark::State& state) { + for (auto _ : state) { + counter.Increment(); + } + benchmark::DoNotOptimize(counter.Value()); +} + static void BM_Counter_Collect(benchmark::State& state) { - using prometheus::Registry; - using prometheus::Counter; using prometheus::BuildCounter; + using prometheus::Counter; + using prometheus::Registry; Registry registry; auto& counter_family = BuildCounter().Name("benchmark_counter").Help("").Register(registry); diff --git a/core/benchmarks/gauge_bench.cc b/core/benchmarks/gauge_bench.cc index 8fb40948..711ea11e 100644 --- a/core/benchmarks/gauge_bench.cc +++ b/core/benchmarks/gauge_bench.cc @@ -2,9 +2,9 @@ #include static void BM_Gauge_Increment(benchmark::State& state) { - using prometheus::Registry; - using prometheus::Gauge; using prometheus::BuildGauge; + using prometheus::Gauge; + using prometheus::Registry; Registry registry; auto& gauge_family = BuildGauge().Name("benchmark_gauge").Help("").Register(registry); @@ -14,10 +14,24 @@ static void BM_Gauge_Increment(benchmark::State& state) { } BENCHMARK(BM_Gauge_Increment); +class BM_Gauge : public benchmark::Fixture { + protected: + BM_Gauge() { this->ThreadPerCpu(); } + + prometheus::Gauge gauge{}; +}; + +BENCHMARK_F(BM_Gauge, ConcurrentIncrement) +(benchmark::State& state) { + for (auto _ : state) { + gauge.Increment(); + } +} + static void BM_Gauge_Decrement(benchmark::State& state) { - using prometheus::Registry; - using prometheus::Gauge; using prometheus::BuildGauge; + using prometheus::Gauge; + using prometheus::Registry; Registry registry; auto& gauge_family = BuildGauge().Name("benchmark_gauge").Help("").Register(registry); @@ -28,9 +42,9 @@ static void BM_Gauge_Decrement(benchmark::State& state) { BENCHMARK(BM_Gauge_Decrement); static void BM_Gauge_SetToCurrentTime(benchmark::State& state) { - using prometheus::Registry; - using prometheus::Gauge; using prometheus::BuildGauge; + using prometheus::Gauge; + using prometheus::Registry; Registry registry; auto& gauge_family = BuildGauge().Name("benchmark_gauge").Help("").Register(registry); @@ -41,9 +55,9 @@ static void BM_Gauge_SetToCurrentTime(benchmark::State& state) { BENCHMARK(BM_Gauge_SetToCurrentTime); static void BM_Gauge_Collect(benchmark::State& state) { - using prometheus::Registry; - using prometheus::Gauge; using prometheus::BuildGauge; + using prometheus::Gauge; + using prometheus::Registry; Registry registry; auto& gauge_family = BuildGauge().Name("benchmark_gauge").Help("").Register(registry); diff --git a/core/include/prometheus/counter.h b/core/include/prometheus/counter.h index 170602f5..c1523d69 100644 --- a/core/include/prometheus/counter.h +++ b/core/include/prometheus/counter.h @@ -1,7 +1,10 @@ #pragma once +#include +#include +#include + #include "prometheus/client_metric.h" -#include "prometheus/gauge.h" #include "prometheus/metric_type.h" namespace prometheus { @@ -17,7 +20,17 @@ namespace prometheus { /// - errors /// /// Do not use a counter to expose a value that can decrease - instead use a -/// Gauge. +/// Gauge. If an montonically increasing counter is applicable a counter shall +/// be prefered to a Gauge because of a better update performance. +/// +/// The implementation exhibits a performance which is near a sequential +/// implementation and scales linearly with increasing number of updater threads +/// in a multi-threaded environment invoking Increment(). However, this +/// excellent update-side scalability comes at read-side expense invoking +/// Collect(). Increment() can therefor be used in the fast-path of the code, +/// where the count is updated extremely frequently. The Collect() function on +/// the other hand shall read the counter at a low sample rate, e.g., in the +/// order of milliseconds. /// /// The class is thread-safe. No concurrent call to any API of this type causes /// a data race. @@ -29,12 +42,17 @@ class Counter { Counter() = default; /// \brief Increment the counter by 1. - void Increment(); + void Increment() { IncrementUnchecked(1.0); } /// \brief Increment the counter by a given amount. /// /// The counter will not change if the given amount is negative. - void Increment(double); + void Increment(const double value) { + if (value < 0.0) { + return; + } + IncrementUnchecked(value); + } /// \brief Get the current value of the counter. double Value() const; @@ -45,7 +63,37 @@ class Counter { ClientMetric Collect() const; private: - Gauge gauge_{0.0}; + int ThreadId() { + thread_local int id{-1}; + + if (id == -1) { + id = AssignThreadId(); + } + return id; + } + + int AssignThreadId() { + const int id{count_.fetch_add(1)}; + + if (id >= per_thread_counter_.size()) { + std::terminate(); + } + + return id; + } + + void IncrementUnchecked(const double v) { + CacheLine& c = per_thread_counter_[ThreadId()]; + const double new_value{c.v.load(std::memory_order_relaxed) + v}; + c.v.store(new_value, std::memory_order_relaxed); + } + + struct alignas(128) CacheLine { + std::atomic v{0.0}; + }; + + std::atomic count_{0}; + std::array per_thread_counter_{}; }; } // namespace prometheus diff --git a/core/include/prometheus/gauge.h b/core/include/prometheus/gauge.h index 341828e4..72866299 100644 --- a/core/include/prometheus/gauge.h +++ b/core/include/prometheus/gauge.h @@ -17,6 +17,9 @@ namespace prometheus { /// memory usage, but also "counts" that can go up and down, like the number of /// running processes. /// +/// If an montonically increasing counter is applicable a Counter shall be +/// prefered to a Gauge because of a better update performance. +/// /// The class is thread-safe. No concurrent call to any API of this type causes /// a data race. class Gauge { diff --git a/core/src/counter.cc b/core/src/counter.cc index fc5b6f30..f3839219 100644 --- a/core/src/counter.cc +++ b/core/src/counter.cc @@ -1,12 +1,14 @@ #include "prometheus/counter.h" -namespace prometheus { - -void Counter::Increment() { gauge_.Increment(); } +#include -void Counter::Increment(const double val) { gauge_.Increment(val); } +namespace prometheus { -double Counter::Value() const { return gauge_.Value(); } +double Counter::Value() const { + return std::accumulate( + std::begin(per_thread_counter_), std::end(per_thread_counter_), 0.0, + [](const double a, const CacheLine& b) { return a + b.v; }); +} ClientMetric Counter::Collect() const { ClientMetric metric; diff --git a/core/tests/counter_test.cc b/core/tests/counter_test.cc index 1c5566cb..c633d8dd 100644 --- a/core/tests/counter_test.cc +++ b/core/tests/counter_test.cc @@ -2,6 +2,11 @@ #include +#include +#include + +#include + namespace prometheus { namespace { @@ -37,5 +42,46 @@ TEST(CounterTest, inc_negative_value) { EXPECT_EQ(counter.Value(), 5.0); } +TEST(CounterTest, concurrent_writes) { + Counter counter; + std::vector threads(std::thread::hardware_concurrency()); + + for (auto& thread : threads) { + thread = std::thread{[&counter]() { + for (int i{0}; i < 100000; ++i) { + counter.Increment(); + } + }}; + } + + for (auto& thread : threads) { + thread.join(); + } + + EXPECT_EQ(100000 * threads.size(), counter.Value()); +} + +TEST(CounterTest, concurrent_read_write) { + Counter counter; + std::vector values; + values.reserve(100000); + + std::thread reader{[&counter, &values]() { + for (int i{0}; i < 100000; ++i) { + values.push_back(counter.Value()); + } + }}; + std::thread writer{[&counter]() { + for (int i{0}; i < 100000; ++i) { + counter.Increment(); + } + }}; + + reader.join(); + writer.join(); + + EXPECT_TRUE(std::is_sorted(std::begin(values), std::end(values))); +} + } // namespace } // namespace prometheus