Skip to content

Commit

Permalink
[FLINK-34640][metrics] Replace DummyMetricGroup usage with Unregister…
Browse files Browse the repository at this point in the history
…edMetricsGroup
  • Loading branch information
jeyhunkarimov authored and JingGe committed Mar 14, 2024
1 parent 649e2b4 commit a0c811e
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,12 @@

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;

import org.junit.jupiter.api.Test;

import java.io.Serializable;
import java.time.Duration;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -173,72 +167,10 @@ public long extractTimestamp(Object element, long recordTimestamp) {
}

static TimestampAssignerSupplier.Context assignerContext() {
return DummyMetricGroup::new;
return UnregisteredMetricsGroup::new;
}

static WatermarkGeneratorSupplier.Context generatorContext() {
return DummyMetricGroup::new;
}

/**
* A dummy {@link MetricGroup} to be used when a group is required as an argument but not
* actually used.
*/
public static class DummyMetricGroup implements MetricGroup {

@Override
public Counter counter(String name) {
return null;
}

@Override
public <C extends Counter> C counter(String name, C counter) {
return null;
}

@Override
public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
return null;
}

@Override
public <H extends Histogram> H histogram(String name, H histogram) {
return null;
}

@Override
public <M extends Meter> M meter(String name, M meter) {
return null;
}

@Override
public MetricGroup addGroup(String name) {
return null;
}

@Override
public MetricGroup addGroup(String key, String value) {
return null;
}

@Override
public String[] getScopeComponents() {
return new String[0];
}

@Override
public Map<String, String> getAllVariables() {
return null;
}

@Override
public String getMetricIdentifier(String metricName) {
return null;
}

@Override
public String getMetricIdentifier(String metricName, CharacterFilter filter) {
return null;
}
return UnregisteredMetricsGroup::new;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
package org.apache.flink.runtime.checkpoint;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.WatermarkStrategyTest.DummyMetricGroup;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphCheckpointPlanCalculatorContext;
Expand Down Expand Up @@ -500,7 +500,7 @@ private CheckpointCoordinator instantiateCheckpointCoordinator(
new ExecutionGraphCheckpointPlanCalculatorContext(graph),
graph.getVerticesTopologically(),
false),
new CheckpointStatsTracker(1, new DummyMetricGroup(), new JobID()));
new CheckpointStatsTracker(1, new UnregisteredMetricsGroup(), new JobID()));
}

private static <T> T mockGeneric(Class<?> clazz) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
package org.apache.flink.runtime.checkpoint;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.WatermarkStrategyTest.DummyMetricGroup;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
Expand Down Expand Up @@ -781,7 +781,7 @@ public static class CheckpointCoordinatorBuilder {
private boolean allowCheckpointsAfterTasksFinished;

private CheckpointStatsTracker checkpointStatsTracker =
new CheckpointStatsTracker(1, new DummyMetricGroup(), new JobID());
new CheckpointStatsTracker(1, new UnregisteredMetricsGroup(), new JobID());

private BiFunction<
Set<ExecutionJobVertex>,
Expand Down

0 comments on commit a0c811e

Please sign in to comment.