Skip to content

Commit

Permalink
Aggregation: respect sampling for timings (#378)
Browse files Browse the repository at this point in the history
* Enabling sampling for timings when aggregation is enabled

Signed-off-by: Pedro Tanaka <[email protected]>

* bump ruby to 3.3.3

Signed-off-by: Pedro Tanaka <[email protected]>

* adding test for new behavior

Also allowing customizing the max context size for aggregation

Signed-off-by: Pedro Tanaka <[email protected]>

* Introduce constant for max context size

Signed-off-by: Pedro Tanaka <[email protected]>

* reword comments

Signed-off-by: Pedro Tanaka <[email protected]>

* fixing constant placement

Signed-off-by: Pedro Tanaka <[email protected]>

* reword test comments

Signed-off-by: Pedro Tanaka <[email protected]>

---------

Signed-off-by: Pedro Tanaka <[email protected]>
  • Loading branch information
pedro-stanaka authored Sep 16, 2024
1 parent c024653 commit 98c9098
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 15 deletions.
2 changes: 1 addition & 1 deletion .ruby-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.3.1
3.3.3
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ section below.

## Unreleased changes

- [#378](https://github.com/Shopify/statsd-instrument/pull/378) Respect sampling rate when aggregation is enabled, just for timing metrics.
  Not respecting the sampling rate incurs a performance penalty, as we will send more metrics than expected.
Moreover, it overloads the StatsD server, which has to send out and process more metrics than expected.

## Version 3.9.0

- Introduced an experimental aggregation feature to improve the efficiency of metrics reporting by aggregating
Expand Down
12 changes: 11 additions & 1 deletion lib/statsd/instrument/aggregator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@ def ==(other)
end

class Aggregator
DEFAULT_MAX_CONTEXT_SIZE = 250

CONST_SAMPLE_RATE = 1.0
COUNT = :c
DISTRIBUTION = :d
MEASURE = :ms
HISTOGRAM = :h
GAUGE = :g
private_constant :COUNT, :DISTRIBUTION, :MEASURE, :HISTOGRAM, :GAUGE, :CONST_SAMPLE_RATE

class << self
def finalize(aggregation_state, sink, datagram_builders, datagram_builder_class, default_tags)
Expand Down Expand Up @@ -78,7 +81,14 @@ def finalize(aggregation_state, sink, datagram_builders, datagram_builder_class,
# @param default_tags [Array<String>] The tags to add to all metrics.
# @param flush_interval [Float] The interval at which to flush the aggregated metrics.
# @param max_values [Integer] The maximum number of values to aggregate before flushing.
def initialize(sink, datagram_builder_class, prefix, default_tags, flush_interval: 5.0, max_values: 100)
def initialize(
sink,
datagram_builder_class,
prefix,
default_tags,
flush_interval: 5.0,
max_values: DEFAULT_MAX_CONTEXT_SIZE
)
@sink = sink
@datagram_builder_class = datagram_builder_class
@metric_prefix = prefix
Expand Down
54 changes: 41 additions & 13 deletions lib/statsd/instrument/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ def initialize(
sink: StatsD::Instrument::NullSink.new,
datagram_builder_class: self.class.datagram_builder_class_for_implementation(implementation),
enable_aggregation: false,
aggregation_flush_interval: 2.0
aggregation_flush_interval: 2.0,
aggregation_max_context_size: StatsD::Instrument::Aggregator::DEFAULT_MAX_CONTEXT_SIZE
)
@sink = sink
@datagram_builder_class = datagram_builder_class
Expand All @@ -176,6 +177,7 @@ def initialize(
prefix,
default_tags,
flush_interval: @aggregation_flush_interval,
max_values: aggregation_max_context_size,
)
end
end
Expand Down Expand Up @@ -237,6 +239,19 @@ def increment(name, value = 1, sample_rate: nil, tags: nil, no_prefix: false)
# @param tags (see #increment)
# @return [void]
def measure(name, value = nil, sample_rate: nil, tags: nil, no_prefix: false, &block)
sample_rate ||= @default_sample_rate
if sample_rate && !sample?(sample_rate)
# For all timing metrics, we have to use the sampling logic.
# Not doing so would impact performance and CPU usage.
# See Datadog's documentation for more details: https://github.com/DataDog/datadog-go/blob/20af2dbfabbbe6bd0347780cd57ed931f903f223/statsd/aggregator.go#L281-L283

if block_given?
return yield
end

return StatsD::Instrument::VOID
end

if block_given?
return latency(name, sample_rate: sample_rate, tags: tags, metric_type: :ms, no_prefix: no_prefix, &block)
end
Expand All @@ -245,10 +260,7 @@ def measure(name, value = nil, sample_rate: nil, tags: nil, no_prefix: false, &b
@aggregator.aggregate_timing(name, value, tags: tags, no_prefix: no_prefix, type: :ms)
return StatsD::Instrument::VOID
end
sample_rate ||= @default_sample_rate
if sample_rate.nil? || sample?(sample_rate)
emit(datagram_builder(no_prefix: no_prefix).ms(name, value, sample_rate, tags))
end
emit(datagram_builder(no_prefix: no_prefix).ms(name, value, sample_rate, tags))
StatsD::Instrument::VOID
end

Expand Down Expand Up @@ -306,6 +318,19 @@ def set(name, value, sample_rate: nil, tags: nil, no_prefix: false)
# @param tags (see #increment)
# @return [void]
def distribution(name, value = nil, sample_rate: nil, tags: nil, no_prefix: false, &block)
sample_rate ||= @default_sample_rate
if sample_rate && !sample?(sample_rate)
# For all timing metrics, we have to use the sampling logic.
# Not doing so would impact performance and CPU usage.
# See Datadog's documentation for more details: https://github.com/DataDog/datadog-go/blob/20af2dbfabbbe6bd0347780cd57ed931f903f223/statsd/aggregator.go#L281-L283

if block_given?
return yield
end

return StatsD::Instrument::VOID
end

if block_given?
return latency(name, sample_rate: sample_rate, tags: tags, metric_type: :d, no_prefix: no_prefix, &block)
end
Expand All @@ -315,10 +340,7 @@ def distribution(name, value = nil, sample_rate: nil, tags: nil, no_prefix: fals
return StatsD::Instrument::VOID
end

sample_rate ||= @default_sample_rate
if sample_rate.nil? || sample?(sample_rate)
emit(datagram_builder(no_prefix: no_prefix).d(name, value, sample_rate, tags))
end
emit(datagram_builder(no_prefix: no_prefix).d(name, value, sample_rate, tags))
StatsD::Instrument::VOID
end

Expand All @@ -334,14 +356,20 @@ def distribution(name, value = nil, sample_rate: nil, tags: nil, no_prefix: fals
# @param tags (see #increment)
# @return [void]
# Emits a histogram metric, which StatsD servers aggregate into
# percentiles/averages (Datadog-specific metric type).
#
# @param name [String] The name of the metric.
# @param value [Numeric] The value to record.
# @param sample_rate [Float, nil] Fraction of events to report; falls back
#   to the client's default sample rate when nil.
# @param tags [Array<String>, Hash, nil] Tags to attach to the metric.
# @param no_prefix [Boolean] When true, the global prefix is not applied.
# @return [void]
def histogram(name, value, sample_rate: nil, tags: nil, no_prefix: false)
  sample_rate ||= @default_sample_rate
  if sample_rate && !sample?(sample_rate)
    # Sampling applies to timing-type metrics even when aggregation is
    # enabled: skipping unsampled values avoids needless CPU work here and
    # extra load on the StatsD server.
    # See Datadog's implementation for the same behavior:
    # https://github.com/DataDog/datadog-go/blob/20af2dbfabbbe6bd0347780cd57ed931f903f223/statsd/aggregator.go#L281-L283
    return StatsD::Instrument::VOID
  end

  if @enable_aggregation
    @aggregator.aggregate_timing(name, value, tags: tags, no_prefix: no_prefix, type: :h)
    return StatsD::Instrument::VOID
  end

  # Note: without the dead pre-change branch left over from the diff, the
  # datagram is emitted exactly once per sampled-in call.
  emit(datagram_builder(no_prefix: no_prefix).h(name, value, sample_rate, tags))
  StatsD::Instrument::VOID
end

Expand Down
7 changes: 7 additions & 0 deletions lib/statsd/instrument/environment.rb
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,13 @@ def aggregation_interval
Float(env.fetch("STATSD_AGGREGATION_INTERVAL", 2.0))
end

# Maximum number of distinct aggregation contexts (metric name/tag/type
# combinations) the aggregator may hold before it is forced to flush.
#
# Reads the STATSD_AGGREGATION_MAX_CONTEXT_SIZE environment variable and
# falls back to the library default when it is unset.
#
# @return [Integer] the configured maximum context size
# @raise [ArgumentError] if the environment variable is not a valid integer
def aggregation_max_context_size
  configured_size = env.fetch(
    "STATSD_AGGREGATION_MAX_CONTEXT_SIZE",
    StatsD::Instrument::Aggregator::DEFAULT_MAX_CONTEXT_SIZE,
  )
  Integer(configured_size)
end

# Builds a StatsD client configured from this environment's settings
# (delegates entirely to Client.from_env).
#
# @return [StatsD::Instrument::Client]
def client
  StatsD::Instrument::Client.from_env(self)
end
Expand Down
12 changes: 12 additions & 0 deletions test/client_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,18 @@ def test_sampling
5.times { client.increment("metric") }
end

# Verifies that the client honors the sample rate even when aggregation is
# enabled: only sampled-in values are aggregated, and the aggregator flushes
# them as a single datagram.
def test_sampling_with_aggregation
  sink = mock("sink")
  # Sample in the 2nd and 5th calls; the other three are dropped.
  sink.stubs(:sample?).returns(false, true, false, false, true)
  # Aggregation collapses the two sampled values into one datagram.
  sink.expects(:<<).with("metric:60:60|d").once
  sink.expects(:flush).once

  aggregated_client = StatsD::Instrument::Client.new(
    sink: sink,
    default_sample_rate: 0.5,
    enable_aggregation: true,
  )
  5.times { aggregated_client.distribution("metric", 60) }
  aggregated_client.force_flush
end

def test_clone_with_prefix_option
# Both clients will use the same sink.
mock_sink = mock("sink")
Expand Down

0 comments on commit 98c9098

Please sign in to comment.