Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,14 @@ public class ExemplarSampler {
// to be overwritten by automatic exemplar sampling. exemplars.length == customExemplars.length
private final AtomicBoolean acceptingNewExemplars = new AtomicBoolean(true);
private final AtomicBoolean acceptingNewCustomExemplars = new AtomicBoolean(true);
private final LongSupplier currentTimeMillis;

@Nullable
private final SpanContext
spanContext; // may be null, in that case SpanContextSupplier.getSpanContext() is used.

public ExemplarSampler(ExemplarSamplerConfig config) {
this(config, null);
this(config, null, System::currentTimeMillis);
}

/**
Expand All @@ -58,15 +59,32 @@ public ExemplarSampler(ExemplarSamplerConfig config) {
* SpanContextSupplier.getSpanContext()} is called to find a span context.
*/
public ExemplarSampler(ExemplarSamplerConfig config, @Nullable SpanContext spanContext) {
this(config, spanContext, System::currentTimeMillis);
}

/**
* Constructor with an additional {code currentTimeMillis} argument for testing. This allows
* injecting a custom time source to make tests deterministic and avoid flaky tests caused by
* timing issues.
*
* @param config the exemplar sampler configuration
* @param spanContext the span context, may be null
* @param currentTimeMillis time source function that returns current time in milliseconds
*/
public ExemplarSampler(
ExemplarSamplerConfig config,
@Nullable SpanContext spanContext,
LongSupplier currentTimeMillis) {
this.config = config;
this.exemplars = new Exemplar[config.getNumberOfExemplars()];
this.customExemplars = new Exemplar[exemplars.length];
this.spanContext = spanContext;
this.currentTimeMillis = currentTimeMillis;
}

public Exemplars collect() {
// this may run in parallel with observe()
long now = System.currentTimeMillis();
long now = currentTimeMillis.getAsLong();
List<Exemplar> result = new ArrayList<>(exemplars.length);
for (int i = 0; i < customExemplars.length; i++) {
Exemplar exemplar = customExemplars[i];
Expand Down Expand Up @@ -129,7 +147,7 @@ private long doObserve(double value) {
}

private long doObserveSingleExemplar(double value) {
long now = System.currentTimeMillis();
long now = currentTimeMillis.getAsLong();
Exemplar current = exemplars[0];
if (current == null
|| now - current.getTimestampMillis() > config.getMinRetentionPeriodMillis()) {
Expand All @@ -139,7 +157,7 @@ private long doObserveSingleExemplar(double value) {
}

private long doObserveSingleExemplar(double amount, Labels labels) {
long now = System.currentTimeMillis();
long now = currentTimeMillis.getAsLong();
Exemplar current = customExemplars[0];
if (current == null
|| now - current.getTimestampMillis() > config.getMinRetentionPeriodMillis()) {
Expand All @@ -149,7 +167,7 @@ private long doObserveSingleExemplar(double amount, Labels labels) {
}

private long doObserveWithUpperBounds(double value, double[] classicUpperBounds) {
long now = System.currentTimeMillis();
long now = currentTimeMillis.getAsLong();
for (int i = 0; i < classicUpperBounds.length; i++) {
if (value <= classicUpperBounds[i]) {
Exemplar previous = exemplars[i];
Expand All @@ -165,7 +183,7 @@ private long doObserveWithUpperBounds(double value, double[] classicUpperBounds)
}

private long doObserveWithoutUpperBounds(double value) {
final long now = System.currentTimeMillis();
final long now = currentTimeMillis.getAsLong();
Exemplar smallest = null;
int smallestIndex = -1;
Exemplar largest = null;
Expand Down Expand Up @@ -234,7 +252,7 @@ private long doObserveWithExemplar(double amount, Labels labels) {

private long doObserveWithExemplarWithUpperBounds(
double value, Labels labels, double[] classicUpperBounds) {
long now = System.currentTimeMillis();
long now = currentTimeMillis.getAsLong();
for (int i = 0; i < classicUpperBounds.length; i++) {
if (value <= classicUpperBounds[i]) {
Exemplar previous = customExemplars[i];
Expand All @@ -250,7 +268,7 @@ private long doObserveWithExemplarWithUpperBounds(
}

private long doObserveWithExemplarWithoutUpperBounds(double amount, Labels labels) {
final long now = System.currentTimeMillis();
final long now = currentTimeMillis.getAsLong();
int nullPos = -1;
int oldestPos = -1;
Exemplar oldest = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.DoubleAdder;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.LongSupplier;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -71,6 +72,9 @@ public class Histogram extends StatefulMetric<DistributionDataPoint, Histogram.D

@Nullable private final ExemplarSamplerConfig exemplarSamplerConfig;

// For testing: allows injecting a custom time source for exemplar sampling
@Nullable LongSupplier exemplarSamplerTimeSource = null;

// Upper bounds for the classic histogram buckets. Contains at least +Inf.
// An empty array indicates that this is a native histogram only.
private final double[] classicUpperBounds;
Expand Down Expand Up @@ -199,7 +203,12 @@ public class DataPoint implements DistributionDataPoint {

private DataPoint() {
if (exemplarSamplerConfig != null) {
exemplarSampler = new ExemplarSampler(exemplarSamplerConfig);
if (exemplarSamplerTimeSource != null) {
exemplarSampler =
new ExemplarSampler(exemplarSamplerConfig, null, exemplarSamplerTimeSource);
} else {
exemplarSampler = new ExemplarSampler(exemplarSamplerConfig);
}
} else {
exemplarSampler = null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.prometheus.metrics.core.exemplars;

import java.lang.reflect.Field;
import java.util.function.LongSupplier;

public class ExemplarSamplerConfigTestUtil {

Expand Down Expand Up @@ -29,4 +30,11 @@ public static void setSampleIntervalMillis(Object metric, long value)
ExemplarSamplerConfig config = getConfig(metric, "exemplarSamplerConfig");
setRetentionPeriod(config, "sampleIntervalMillis", value);
}

public static void setExemplarSamplerTimeSource(Object metric, LongSupplier timeSource)
throws NoSuchFieldException, IllegalAccessException {
Field timeSourceField = metric.getClass().getDeclaredField("exemplarSamplerTimeSource");
timeSourceField.setAccessible(true);
timeSourceField.set(metric, timeSource);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -956,6 +957,21 @@ public void testDefaults() throws IOException {

@Test
public void testExemplarsClassicHistogram() throws Exception {
class MockTimeSource implements LongSupplier {
private long currentTimeMillis = 0;

@Override
public long getAsLong() {
return currentTimeMillis;
}

void advance(long millis) {
currentTimeMillis += millis;
}
}

MockTimeSource mockTime = new MockTimeSource();

SpanContext spanContext =
new SpanContext() {
int callCount = 0;
Expand Down Expand Up @@ -990,6 +1006,7 @@ public void markCurrentSpanAsExemplar() {}

long sampleIntervalMillis = 10;
ExemplarSamplerConfigTestUtil.setSampleIntervalMillis(histogram, sampleIntervalMillis);
ExemplarSamplerConfigTestUtil.setExemplarSamplerTimeSource(histogram, mockTime);
SpanContextSupplier.setSpanContext(spanContext);

Exemplar ex1a = Exemplar.builder().value(0.5).spanId("spanId-1").traceId("traceId-1").build();
Expand Down Expand Up @@ -1020,7 +1037,9 @@ public void markCurrentSpanAsExemplar() {}
assertThat(getExemplar(snapshot, Double.POSITIVE_INFINITY, "path", "/hello")).isNull();
assertThat(getExemplar(snapshot, Double.POSITIVE_INFINITY, "path", "/world")).isNull();

Thread.sleep(sampleIntervalMillis + 1);
mockTime.advance(sampleIntervalMillis + 1);
// Small wait to let the scheduler re-enable accepting new exemplars
Thread.sleep(50);
histogram.labelValues("/hello").observe(4.5);
histogram.labelValues("/world").observe(4.5);

Expand All @@ -1036,13 +1055,16 @@ public void markCurrentSpanAsExemplar() {}
assertExemplarEquals(ex2a, getExemplar(snapshot, Double.POSITIVE_INFINITY, "path", "/hello"));
assertExemplarEquals(ex2b, getExemplar(snapshot, Double.POSITIVE_INFINITY, "path", "/world"));

Thread.sleep(sampleIntervalMillis + 1);
mockTime.advance(sampleIntervalMillis + 1);
Thread.sleep(50);
histogram.labelValues("/hello").observe(1.5);
histogram.labelValues("/world").observe(1.5);
Thread.sleep(sampleIntervalMillis + 1);
mockTime.advance(sampleIntervalMillis + 1);
Thread.sleep(50);
histogram.labelValues("/hello").observe(2.5);
histogram.labelValues("/world").observe(2.5);
Thread.sleep(sampleIntervalMillis + 1);
mockTime.advance(sampleIntervalMillis + 1);
Thread.sleep(50);
histogram.labelValues("/hello").observe(3.5);
histogram.labelValues("/world").observe(3.5);

Expand Down Expand Up @@ -1072,7 +1094,8 @@ public void markCurrentSpanAsExemplar() {}
"span_id",
"spanId-11"))
.build();
Thread.sleep(sampleIntervalMillis + 1);
mockTime.advance(sampleIntervalMillis + 1);
Thread.sleep(50);
histogram
.labelValues("/hello")
.observeWithExemplar(3.4, Labels.of("key1", "value1", "key2", "value2"));
Expand Down