Skip to content

Commit 1790684

Browse files
xuan-cao-swimwear and Matthew Wear authored
feat(metrics-sdk)!: Metrics cardinality limit (#1909)
* feat(metrics)!: add cardinality_limit
* update remaining aggregation
* add cardinality limit
* add basic cardinality test for aggregation
* add more edge case
* refactor
* fix
* update otlp metrics exporter for cardinality
* fix test case
* remove testing comments
* merge
* resolve merge issue
* resolve merge issue
* Apply suggestions from code review

  Co-authored-by: Matthew Wear <matthew.wear@gmail.com>
* update

---------

Co-authored-by: Matthew Wear <matthew.wear@gmail.com>
1 parent a77b740 commit 1790684

24 files changed

Lines changed: 910 additions & 316 deletions

exporter/otlp-metrics/lib/opentelemetry/exporter/otlp/metrics/metrics_exporter.rb

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,13 @@ def initialize(endpoint: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPOR
5252
ssl_verify_mode: MetricsExporter.ssl_verify_mode,
5353
headers: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPORTER_OTLP_METRICS_HEADERS', 'OTEL_EXPORTER_OTLP_HEADERS', default: {}),
5454
compression: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPORTER_OTLP_METRICS_COMPRESSION', 'OTEL_EXPORTER_OTLP_COMPRESSION', default: 'gzip'),
55-
timeout: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPORTER_OTLP_METRICS_TIMEOUT', 'OTEL_EXPORTER_OTLP_TIMEOUT', default: 10))
55+
timeout: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPORTER_OTLP_METRICS_TIMEOUT', 'OTEL_EXPORTER_OTLP_TIMEOUT', default: 10),
56+
aggregation_cardinality_limit: nil)
5657
raise ArgumentError, "invalid url for OTLP::MetricsExporter #{endpoint}" unless OpenTelemetry::Common::Utilities.valid_url?(endpoint)
5758
raise ArgumentError, "unsupported compression key #{compression}" unless compression.nil? || %w[gzip none].include?(compression)
5859

5960
# create the MetricStore object
60-
super()
61+
super(aggregation_cardinality_limit: aggregation_cardinality_limit)
6162

6263
@uri = if endpoint == ENV['OTEL_EXPORTER_OTLP_ENDPOINT']
6364
endpoint += '/' unless endpoint.end_with?('/')

metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/drop.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ def collect(start_time, end_time, data_points)
2222
data_points.values.map!(&:dup)
2323
end
2424

25-
def update(increment, attributes, data_points, exemplar_offer: false)
25+
def update(increment, attributes, data_points, cardinality_limit, exemplar_offer: false)
2626
data_points[attributes] = NumberDataPoint.new(
2727
{},
2828
0,

metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/explicit_bucket_histogram.rb

Lines changed: 56 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ module Aggregation
1111
# Contains the implementation of the ExplicitBucketHistogram aggregation
1212
# https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#explicit-bucket-histogram-aggregation
1313
class ExplicitBucketHistogram
14+
OVERFLOW_ATTRIBUTE_SET = { 'otel.metric.overflow' => true }.freeze
1415
attr_reader :exemplar_reservoir
1516

1617
DEFAULT_BOUNDARIES = [0, 5, 10, 25, 50, 75, 100, 250, 500, 1000].freeze
@@ -59,60 +60,76 @@ def collect(start_time, end_time, data_points)
5960
end
6061
end
6162

62-
def update(amount, attributes, data_points, exemplar_offer: false)
63-
hdp = data_points.fetch(attributes) do
64-
if @record_min_max
65-
min = Float::INFINITY
66-
max = -Float::INFINITY
67-
end
63+
def update(amount, attributes, data_points, cardinality_limit, exemplar_offer: false)
64+
hdp = if data_points.key?(attributes)
65+
data_points[attributes]
66+
elsif data_points.size >= cardinality_limit - 1
67+
data_points[OVERFLOW_ATTRIBUTE_SET] || create_new_data_point(OVERFLOW_ATTRIBUTE_SET, data_points)
68+
else
69+
create_new_data_point(attributes, data_points)
70+
end
6871

69-
data_points[attributes] = HistogramDataPoint.new(
70-
attributes,
71-
nil, # :start_time_unix_nano
72-
nil, # :time_unix_nano
73-
0, # :count
74-
0, # :sum
75-
empty_bucket_counts, # :bucket_counts
76-
@boundaries, # :explicit_bounds
77-
nil, # :exemplars
78-
min, # :min
79-
max # :max
80-
)
81-
end
72+
update_histogram_data_point(hdp, amount, exemplar_offer: exemplar_offer)
73+
nil
74+
end
8275

83-
reservoir = @exemplar_reservoir_storage[attributes]
84-
unless reservoir
85-
reservoir = @exemplar_reservoir.dup
86-
reservoir.reset
87-
@exemplar_reservoir_storage[attributes] = reservoir
88-
end
76+
def aggregation_temporality
77+
@aggregation_temporality.temporality
78+
end
8979

90-
if exemplar_offer
91-
reservoir.offer(value: amount,
92-
timestamp: OpenTelemetry::Common::Utilities.time_in_nanoseconds,
93-
attributes: attributes,
94-
context: OpenTelemetry::Context.current)
80+
private
81+
82+
def create_new_data_point(attributes, data_points)
83+
if @record_min_max
84+
min = Float::INFINITY
85+
max = -Float::INFINITY
9586
end
9687

88+
data_points[attributes] = HistogramDataPoint.new(
89+
attributes,
90+
nil, # :start_time_unix_nano
91+
nil, # :time_unix_nano
92+
0, # :count
93+
0, # :sum
94+
empty_bucket_counts, # :bucket_counts
95+
@boundaries, # :explicit_bounds
96+
nil, # :exemplars
97+
min, # :min
98+
max # :max
99+
)
100+
end
101+
102+
def update_histogram_data_point(hdp, amount, exemplar_offer: false)
103+
reservior_update(hdp.attributes, amount, exemplar_offer)
104+
97105
if @record_min_max
98106
hdp.max = amount if amount > hdp.max
99107
hdp.min = amount if amount < hdp.min
100108
end
101109

102110
hdp.sum += amount
103111
hdp.count += 1
104-
if @boundaries
105-
bucket_index = @boundaries.bsearch_index { |i| i >= amount } || @boundaries.size
106-
hdp.bucket_counts[bucket_index] += 1
107-
end
108-
nil
109-
end
112+
return unless @boundaries
110113

111-
def aggregation_temporality
112-
@aggregation_temporality.temporality
114+
bucket_index = @boundaries.bsearch_index { |i| i >= amount } || @boundaries.size
115+
hdp.bucket_counts[bucket_index] += 1
113116
end
114117

115-
private
118+
def reservior_update(attributes, amount, exemplar_offer)
119+
reservoir = @exemplar_reservoir_storage[attributes]
120+
unless reservoir
121+
reservoir = @exemplar_reservoir.dup
122+
reservoir.reset
123+
@exemplar_reservoir_storage[attributes] = reservoir
124+
end
125+
126+
return unless exemplar_offer
127+
128+
reservoir.offer(value: amount,
129+
timestamp: OpenTelemetry::Common::Utilities.time_in_nanoseconds,
130+
attributes: attributes,
131+
context: OpenTelemetry::Context.current)
132+
end
116133

117134
def empty_bucket_counts
118135
@boundaries ? Array.new(@boundaries.size + 1, 0) : nil

metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/exponential_bucket_histogram.rb

Lines changed: 61 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ module Metrics
1717
module Aggregation
1818
# Contains the implementation of the {https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram ExponentialBucketHistogram} aggregation
1919
class ExponentialBucketHistogram # rubocop:disable Metrics/ClassLength
20+
OVERFLOW_ATTRIBUTE_SET = { 'otel.metric.overflow' => true }.freeze
21+
2022
# relate to min max scale: https://opentelemetry.io/docs/specs/otel/metrics/sdk/#support-a-minimum-and-maximum-scale
2123
DEFAULT_SIZE = 160
2224
DEFAULT_SCALE = 20
@@ -219,49 +221,53 @@ def collect(start_time, end_time, data_points)
219221
# rubocop:enable Metrics/MethodLength
220222

221223
# this is aggregate in python; there is no merge in aggregate; but rescale happened
222-
# rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity
223-
def update(amount, attributes, data_points, exemplar_offer: false)
224-
# fetch or initialize the ExponentialHistogramDataPoint
225-
hdp = data_points.fetch(attributes) do
226-
if @record_min_max
227-
min = Float::INFINITY
228-
max = -Float::INFINITY
229-
end
224+
def update(amount, attributes, data_points, cardinality_limit, exemplar_offer: false)
225+
hdp = if data_points.key?(attributes)
226+
data_points[attributes]
227+
elsif data_points.size >= cardinality_limit - 1
228+
data_points[OVERFLOW_ATTRIBUTE_SET] || create_new_data_point(OVERFLOW_ATTRIBUTE_SET, data_points)
229+
else
230+
create_new_data_point(attributes, data_points)
231+
end
232+
233+
update_histogram_data_point(hdp, attributes, amount, exemplar_offer: exemplar_offer)
234+
nil
235+
end
230236

231-
# this code block will only be executed if no data_points was found with the attributes
232-
data_points[attributes] = ExponentialHistogramDataPoint.new(
233-
attributes,
234-
nil, # :start_time_unix_nano
235-
0, # :time_unix_nano
236-
0, # :count
237-
0, # :sum
238-
@scale, # :scale
239-
@zero_count, # :zero_count
240-
ExponentialHistogram::Buckets.new, # :positive
241-
ExponentialHistogram::Buckets.new, # :negative
242-
0, # :flags
243-
nil, # :exemplars
244-
min, # :min
245-
max, # :max
246-
@zero_threshold # :zero_threshold
247-
)
248-
end
237+
def aggregation_temporality
238+
@aggregation_temporality.temporality
239+
end
249240

250-
reservoir = @exemplar_reservoir_storage[attributes]
251-
unless reservoir
252-
reservoir = @exemplar_reservoir.dup
253-
reservoir.reset
254-
@exemplar_reservoir_storage[attributes] = reservoir
255-
end
241+
private
256242

257-
if exemplar_offer
258-
reservoir.offer(value: amount,
259-
timestamp: OpenTelemetry::Common::Utilities.time_in_nanoseconds,
260-
attributes: attributes,
261-
context: OpenTelemetry::Context.current)
243+
def create_new_data_point(attributes, data_points)
244+
if @record_min_max
245+
min = Float::INFINITY
246+
max = -Float::INFINITY
262247
end
263248

264-
# Start to populate the data point (esp. the buckets)
249+
data_points[attributes] = ExponentialHistogramDataPoint.new(
250+
attributes,
251+
nil, # :start_time_unix_nano
252+
0, # :time_unix_nano
253+
0, # :count
254+
0, # :sum
255+
@scale, # :scale
256+
@zero_count, # :zero_count
257+
ExponentialHistogram::Buckets.new, # :positive
258+
ExponentialHistogram::Buckets.new, # :negative
259+
0, # :flags
260+
nil, # :exemplars
261+
min, # :min
262+
max, # :max
263+
@zero_threshold # :zero_threshold
264+
)
265+
end
266+
267+
# rubocop:disable Metrics/CyclomaticComplexity,Metrics/MethodLength
268+
def update_histogram_data_point(hdp, attributes, amount, exemplar_offer: false)
269+
reservior_update(attributes, amount, exemplar_offer)
270+
265271
if @record_min_max
266272
hdp.max = amount if amount > hdp.max
267273
hdp.min = amount if amount < hdp.min
@@ -337,15 +343,8 @@ def update(amount, attributes, data_points, exemplar_offer: false)
337343
bucket_index += buckets.counts.size if bucket_index.negative?
338344

339345
buckets.increment_bucket(bucket_index)
340-
nil
341346
end
342-
# rubocop:enable Metrics/MethodLength, Metrics/CyclomaticComplexity
343-
344-
def aggregation_temporality
345-
@aggregation_temporality.temporality
346-
end
347-
348-
private
347+
# rubocop:enable Metrics/CyclomaticComplexity,Metrics/MethodLength
349348

350349
def grow_buckets(span, buckets)
351350
return if span < buckets.counts.size
@@ -354,6 +353,22 @@ def grow_buckets(span, buckets)
354353
buckets.grow(span + 1, @size)
355354
end
356355

356+
def reservior_update(attributes, amount, exemplar_offer)
357+
reservoir = @exemplar_reservoir_storage[attributes]
358+
unless reservoir
359+
reservoir = @exemplar_reservoir.dup
360+
reservoir.reset
361+
@exemplar_reservoir_storage[attributes] = reservoir
362+
end
363+
364+
return unless exemplar_offer
365+
366+
reservoir.offer(value: amount,
367+
timestamp: OpenTelemetry::Common::Utilities.time_in_nanoseconds,
368+
attributes: attributes,
369+
context: OpenTelemetry::Context.current)
370+
end
371+
357372
def new_mapping(scale)
358373
scale = validate_scale(scale)
359374
scale <= 0 ? ExponentialHistogram::ExponentMapping.new(scale) : ExponentialHistogram::LogarithmMapping.new(scale)

metrics_sdk/lib/opentelemetry/sdk/metrics/aggregation/last_value.rb

Lines changed: 38 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ module Metrics
1010
module Aggregation
1111
# Contains the implementation of the LastValue aggregation
1212
class LastValue
13+
OVERFLOW_ATTRIBUTE_SET = { 'otel.metric.overflow' => true }.freeze
1314
attr_reader :exemplar_reservoir
1415

1516
# if no reservoir pass from instrument, then use this empty reservoir to avoid no method found error
@@ -33,29 +34,51 @@ def collect(start_time, end_time, data_points)
3334
ndps
3435
end
3536

36-
def update(increment, attributes, data_points, exemplar_offer: false)
37-
reservoir = @exemplar_reservoir_storage[attributes]
38-
unless reservoir
39-
reservoir = @exemplar_reservoir.dup
40-
reservoir.reset
41-
@exemplar_reservoir_storage[attributes] = reservoir
42-
end
37+
def update(increment, attributes, data_points, cardinality_limit, exemplar_offer: false)
38+
# Check if we already have this attribute set
39+
ndp = if data_points.key?(attributes)
40+
data_points[attributes]
41+
elsif data_points.size >= cardinality_limit - 1
42+
data_points[OVERFLOW_ATTRIBUTE_SET] || create_new_data_point(OVERFLOW_ATTRIBUTE_SET, data_points)
43+
else
44+
create_new_data_point(attributes, data_points)
45+
end
4346

44-
if exemplar_offer
45-
reservoir.offer(value: increment,
46-
timestamp: OpenTelemetry::Common::Utilities.time_in_nanoseconds,
47-
attributes: attributes,
48-
context: OpenTelemetry::Context.current)
49-
end
47+
update_number_data_point(ndp, increment, exemplar_offer: exemplar_offer)
48+
nil
49+
end
5050

51+
private
52+
53+
def create_new_data_point(attributes, data_points)
5154
data_points[attributes] = NumberDataPoint.new(
5255
attributes,
5356
nil,
5457
nil,
55-
increment,
58+
0,
5659
nil
5760
)
58-
nil
61+
end
62+
63+
def update_number_data_point(ndp, increment, exemplar_offer: false)
64+
ndp.value = increment
65+
reservior_update(ndp.attributes, increment, exemplar_offer)
66+
end
67+
68+
def reservior_update(attributes, increment, exemplar_offer)
69+
reservoir = @exemplar_reservoir_storage[attributes]
70+
unless reservoir
71+
reservoir = @exemplar_reservoir.dup
72+
reservoir.reset
73+
@exemplar_reservoir_storage[attributes] = reservoir
74+
end
75+
76+
return unless exemplar_offer
77+
78+
reservoir.offer(value: increment,
79+
timestamp: OpenTelemetry::Common::Utilities.time_in_nanoseconds,
80+
attributes: attributes,
81+
context: OpenTelemetry::Context.current)
5982
end
6083
end
6184
end

0 commit comments

Comments
 (0)