From 98c9098b7d880469cb45e518ee538dacb46cc03f Mon Sep 17 00:00:00 2001 From: Pedro Tanaka Date: Mon, 16 Sep 2024 11:46:33 +0200 Subject: [PATCH] Aggregation: respect sampling for timings (#378) * Enabling samping for timings when aggregation is enabled Signed-off-by: Pedro Tanaka * bump ruby to 3.3.3 Signed-off-by: Pedro Tanaka * adding test for new behavior Also allowing customizing the max context size for aggregation Signed-off-by: Pedro Tanaka * Introduce constant for max context size Signed-off-by: Pedro Tanaka * reword comments Signed-off-by: Pedro Tanaka * fixing constant placement Signed-off-by: Pedro Tanaka * reword test comments Signed-off-by: Pedro Tanaka --------- Signed-off-by: Pedro Tanaka --- .ruby-version | 2 +- CHANGELOG.md | 4 +++ lib/statsd/instrument/aggregator.rb | 12 ++++++- lib/statsd/instrument/client.rb | 54 +++++++++++++++++++++------- lib/statsd/instrument/environment.rb | 7 ++++ test/client_test.rb | 12 +++++++ 6 files changed, 76 insertions(+), 15 deletions(-) diff --git a/.ruby-version b/.ruby-version index bea438e9..619b5376 100644 --- a/.ruby-version +++ b/.ruby-version @@ -1 +1 @@ -3.3.1 +3.3.3 diff --git a/CHANGELOG.md b/CHANGELOG.md index c40a9d0a..d1bed2cf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,10 @@ section below. ## Unreleased changes +- [#378](https://github.com/Shopify/statsd-instrument/pull/378) Respect sampling rate when aggregation is enabled, just for timing metrics. + Not respecting sampling rate, incurs in a performance penalty, as we will send more metrics than expected. + Moreover, it overloads the StatsD server, which has to send out and process more metrics than expected. + ## Version 3.9.0 - Introduced an experimental aggregation feature to improve the efficiency of metrics reporting by aggregating diff --git a/lib/statsd/instrument/aggregator.rb b/lib/statsd/instrument/aggregator.rb index 177a2ea0..55a4b40f 100644 --- a/lib/statsd/instrument/aggregator.rb +++ b/lib/statsd/instrument/aggregator.rb @@ -24,12 +24,15 @@ def ==(other) end class Aggregator + DEFAULT_MAX_CONTEXT_SIZE = 250 + CONST_SAMPLE_RATE = 1.0 COUNT = :c DISTRIBUTION = :d MEASURE = :ms HISTOGRAM = :h GAUGE = :g + private_constant :COUNT, :DISTRIBUTION, :MEASURE, :HISTOGRAM, :GAUGE, :CONST_SAMPLE_RATE class << self def finalize(aggregation_state, sink, datagram_builders, datagram_builder_class, default_tags) @@ -78,7 +81,14 @@ def finalize(aggregation_state, sink, datagram_builders, datagram_builder_class, # @param default_tags [Array] The tags to add to all metrics. # @param flush_interval [Float] The interval at which to flush the aggregated metrics. # @param max_values [Integer] The maximum number of values to aggregate before flushing. - def initialize(sink, datagram_builder_class, prefix, default_tags, flush_interval: 5.0, max_values: 100) + def initialize( + sink, + datagram_builder_class, + prefix, + default_tags, + flush_interval: 5.0, + max_values: DEFAULT_MAX_CONTEXT_SIZE + ) @sink = sink @datagram_builder_class = datagram_builder_class @metric_prefix = prefix diff --git a/lib/statsd/instrument/client.rb b/lib/statsd/instrument/client.rb index 254bd158..c5fb33c7 100644 --- a/lib/statsd/instrument/client.rb +++ b/lib/statsd/instrument/client.rb @@ -156,7 +156,8 @@ def initialize( sink: StatsD::Instrument::NullSink.new, datagram_builder_class: self.class.datagram_builder_class_for_implementation(implementation), enable_aggregation: false, - aggregation_flush_interval: 2.0 + aggregation_flush_interval: 2.0, + aggregation_max_context_size: StatsD::Instrument::Aggregator::DEFAULT_MAX_CONTEXT_SIZE ) @sink = sink @datagram_builder_class = datagram_builder_class @@ -176,6 +177,7 @@ def initialize( prefix, default_tags, flush_interval: @aggregation_flush_interval, + max_values: aggregation_max_context_size, ) end end @@ -237,6 +239,19 @@ def increment(name, value = 1, sample_rate: nil, tags: nil, no_prefix: false) # @param tags (see #increment) # @return [void] def measure(name, value = nil, sample_rate: nil, tags: nil, no_prefix: false, &block) + sample_rate ||= @default_sample_rate + if sample_rate && !sample?(sample_rate) + # For all timing metrics, we have to use the sampling logic. + # Not doing so would impact performance and CPU usage. + # See Datadog's documentation for more details: https://github.com/DataDog/datadog-go/blob/20af2dbfabbbe6bd0347780cd57ed931f903f223/statsd/aggregator.go#L281-L283 + + if block_given? + return yield + end + + return StatsD::Instrument::VOID + end + if block_given? return latency(name, sample_rate: sample_rate, tags: tags, metric_type: :ms, no_prefix: no_prefix, &block) end @@ -245,10 +260,7 @@ def measure(name, value = nil, sample_rate: nil, tags: nil, no_prefix: false, &b @aggregator.aggregate_timing(name, value, tags: tags, no_prefix: no_prefix, type: :ms) return StatsD::Instrument::VOID end - sample_rate ||= @default_sample_rate - if sample_rate.nil? || sample?(sample_rate) - emit(datagram_builder(no_prefix: no_prefix).ms(name, value, sample_rate, tags)) - end + emit(datagram_builder(no_prefix: no_prefix).ms(name, value, sample_rate, tags)) StatsD::Instrument::VOID end @@ -306,6 +318,19 @@ def set(name, value, sample_rate: nil, tags: nil, no_prefix: false) # @param tags (see #increment) # @return [void] def distribution(name, value = nil, sample_rate: nil, tags: nil, no_prefix: false, &block) + sample_rate ||= @default_sample_rate + if sample_rate && !sample?(sample_rate) + # For all timing metrics, we have to use the sampling logic. + # Not doing so would impact performance and CPU usage. + # See Datadog's documentation for more details: https://github.com/DataDog/datadog-go/blob/20af2dbfabbbe6bd0347780cd57ed931f903f223/statsd/aggregator.go#L281-L283 + + if block_given? + return yield + end + + return StatsD::Instrument::VOID + end + if block_given? return latency(name, sample_rate: sample_rate, tags: tags, metric_type: :d, no_prefix: no_prefix, &block) end @@ -315,10 +340,7 @@ def distribution(name, value = nil, sample_rate: nil, tags: nil, no_prefix: fals return StatsD::Instrument::VOID end - sample_rate ||= @default_sample_rate - if sample_rate.nil? || sample?(sample_rate) - emit(datagram_builder(no_prefix: no_prefix).d(name, value, sample_rate, tags)) - end + emit(datagram_builder(no_prefix: no_prefix).d(name, value, sample_rate, tags)) StatsD::Instrument::VOID end @@ -334,14 +356,20 @@ def distribution(name, value = nil, sample_rate: nil, tags: nil, no_prefix: fals # @param tags (see #increment) # @return [void] def histogram(name, value, sample_rate: nil, tags: nil, no_prefix: false) + sample_rate ||= @default_sample_rate + if sample_rate && !sample?(sample_rate) + # For all timing metrics, we have to use the sampling logic. + # Not doing so would impact performance and CPU usage. + # See Datadog's documentation for more details: https://github.com/DataDog/datadog-go/blob/20af2dbfabbbe6bd0347780cd57ed931f903f223/statsd/aggregator.go#L281-L283 + return StatsD::Instrument::VOID + end + if @enable_aggregation @aggregator.aggregate_timing(name, value, tags: tags, no_prefix: no_prefix, type: :h) + return StatsD::Instrument::VOID end - sample_rate ||= @default_sample_rate - if sample_rate.nil? || sample?(sample_rate) - emit(datagram_builder(no_prefix: no_prefix).h(name, value, sample_rate, tags)) - end + emit(datagram_builder(no_prefix: no_prefix).h(name, value, sample_rate, tags)) StatsD::Instrument::VOID end diff --git a/lib/statsd/instrument/environment.rb b/lib/statsd/instrument/environment.rb index 08b7b788..0ded1e1e 100644 --- a/lib/statsd/instrument/environment.rb +++ b/lib/statsd/instrument/environment.rb @@ -125,6 +125,13 @@ def aggregation_interval Float(env.fetch("STATSD_AGGREGATION_INTERVAL", 2.0)) end + def aggregation_max_context_size + Integer(env.fetch( + "STATSD_AGGREGATION_MAX_CONTEXT_SIZE", + StatsD::Instrument::Aggregator::DEFAULT_MAX_CONTEXT_SIZE, + )) + end + def client StatsD::Instrument::Client.from_env(self) end diff --git a/test/client_test.rb b/test/client_test.rb index 6aa46b13..a9c57828 100644 --- a/test/client_test.rb +++ b/test/client_test.rb @@ -245,6 +245,18 @@ def test_sampling 5.times { client.increment("metric") } end + def test_sampling_with_aggregation + mock_sink = mock("sink") + mock_sink.stubs(:sample?).returns(false, true, false, false, true) + # Since we are aggregating, we only expect a single datagram. + mock_sink.expects(:<<).with("metric:60:60|d").once + mock_sink.expects(:flush).once + + client = StatsD::Instrument::Client.new(sink: mock_sink, default_sample_rate: 0.5, enable_aggregation: true) + 5.times { client.distribution("metric", 60) } + client.force_flush + end + def test_clone_with_prefix_option # Both clients will use the same sink. mock_sink = mock("sink")