From d67dd8553ed02218d9c4d5678f3ed580c7e4ecc9 Mon Sep 17 00:00:00 2001 From: 1996fanrui <1996fanrui@gmail.com> Date: Thu, 20 Jul 2023 18:58:53 +0800 Subject: [PATCH] [FLIP-334] Move non-kubernetes related autoscaler classes to flink-autoscaler module --- Dockerfile | 2 +- flink-autoscaler/pom.xml | 68 +++++++++++++++++++ .../autoscaler/AutoscalerFlinkMetrics.java | 24 +++++-- .../autoscaler/ScalingMetricEvaluator.java | 53 ++++++++------- .../flink}/autoscaler/ScalingSummary.java | 8 +-- .../autoscaler/config/AutoScalerOptions.java | 10 +-- .../metrics/CollectedMetricHistory.java | 4 +- .../autoscaler/metrics/CollectedMetrics.java | 2 +- .../flink}/autoscaler/metrics/Edge.java | 2 +- .../metrics/EvaluatedScalingMetric.java | 2 +- .../autoscaler/metrics/FlinkMetric.java | 2 +- .../autoscaler/metrics/MetricAggregator.java | 2 +- .../autoscaler/metrics/ScalingMetric.java | 2 +- .../autoscaler/metrics/ScalingMetrics.java | 8 +-- .../autoscaler/topology/JobTopology.java | 2 +- .../autoscaler/topology/VertexInfo.java | 2 +- .../utils/AutoScalerSerDeModule.java | 4 +- .../autoscaler/utils/AutoScalerUtils.java | 17 +++-- .../ScalingMetricEvaluatorTest.java | 44 ++++++------ .../metrics/ScalingMetricsTest.java | 8 +-- .../autoscaler/utils/AutoScalerUtilsTest.java | 4 +- .../src/test/resources/log4j2-test.properties | 26 +++++++ .../ConfigOptionsDocGenerator.java | 4 +- flink-kubernetes-operator-autoscaler/pom.xml | 6 ++ .../operator/autoscaler/AutoScalerInfo.java | 7 +- .../autoscaler/JobAutoScalerImpl.java | 18 ++--- .../autoscaler/JobAutoscalerFactoryImpl.java | 1 + .../operator/autoscaler/JobVertexScaler.java | 29 ++++---- .../autoscaler/RestApiMetricsCollector.java | 2 +- .../operator/autoscaler/ScalingExecutor.java | 19 +++--- .../autoscaler/ScalingMetricCollector.java | 16 ++--- .../autoscaler/AutoScalerInfoTest.java | 9 +-- .../autoscaler/BacklogBasedScalingTest.java | 14 ++-- .../autoscaler/JobAutoScalerImplTest.java | 12 ++-- .../operator/autoscaler/JobTopologyTest.java | 2 +- .../autoscaler/JobVertexScalerTest.java | 8 ++- .../MetricsCollectionAndEvaluationTest.java | 15 ++-- .../RecommendedParallelismTest.java | 15 ++-- .../RestApiMetricsCollectorTest.java | 2 +- .../autoscaler/ScalingExecutorTest.java | 8 ++- .../ScalingMetricCollectorTest.java | 6 +- .../autoscaler/TestingMetricsCollector.java | 4 +- flink-kubernetes-operator/pom.xml | 6 ++ pom.xml | 1 + 44 files changed, 317 insertions(+), 183 deletions(-) create mode 100644 flink-autoscaler/pom.xml rename {flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator => flink-autoscaler/src/main/java/org/apache/flink}/autoscaler/AutoscalerFlinkMetrics.java (90%) rename {flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator => flink-autoscaler/src/main/java/org/apache/flink}/autoscaler/ScalingMetricEvaluator.java (82%) rename {flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator => flink-autoscaler/src/main/java/org/apache/flink}/autoscaler/ScalingSummary.java (85%) rename {flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator => flink-autoscaler/src/main/java/org/apache/flink}/autoscaler/config/AutoScalerOptions.java (96%) rename {flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator => flink-autoscaler/src/main/java/org/apache/flink}/autoscaler/metrics/CollectedMetricHistory.java (88%) rename {flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator => flink-autoscaler/src/main/java/org/apache/flink}/autoscaler/metrics/CollectedMetrics.java (94%) rename {flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator => flink-autoscaler/src/main/java/org/apache/flink}/autoscaler/metrics/Edge.java (94%) rename {flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator => flink-autoscaler/src/main/java/org/apache/flink}/autoscaler/metrics/EvaluatedScalingMetric.java (95%) rename {flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator => flink-autoscaler/src/main/java/org/apache/flink}/autoscaler/metrics/FlinkMetric.java (97%) rename {flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator => flink-autoscaler/src/main/java/org/apache/flink}/autoscaler/metrics/MetricAggregator.java (95%) rename {flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator => flink-autoscaler/src/main/java/org/apache/flink}/autoscaler/metrics/ScalingMetric.java (97%) rename {flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator => flink-autoscaler/src/main/java/org/apache/flink}/autoscaler/metrics/ScalingMetrics.java (97%) rename {flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator => flink-autoscaler/src/main/java/org/apache/flink}/autoscaler/topology/JobTopology.java (99%) rename {flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator => flink-autoscaler/src/main/java/org/apache/flink}/autoscaler/topology/VertexInfo.java (95%) rename {flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator => flink-autoscaler/src/main/java/org/apache/flink}/autoscaler/utils/AutoScalerSerDeModule.java (95%) rename {flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator => flink-autoscaler/src/main/java/org/apache/flink}/autoscaler/utils/AutoScalerUtils.java (82%) rename {flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator => flink-autoscaler/src/test/java/org/apache/flink}/autoscaler/ScalingMetricEvaluatorTest.java (86%) rename {flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator => flink-autoscaler/src/test/java/org/apache/flink}/autoscaler/metrics/ScalingMetricsTest.java (98%) rename {flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator => flink-autoscaler/src/test/java/org/apache/flink}/autoscaler/utils/AutoScalerUtilsTest.java (94%) create mode 100644 flink-autoscaler/src/test/resources/log4j2-test.properties diff --git a/Dockerfile b/Dockerfile index 0d4a9fc8fa..548d2d3df4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -23,7 +23,7 @@ WORKDIR /app COPY . . -RUN --mount=type=cache,target=/root/.m2 mvn -ntp clean install -pl flink-kubernetes-standalone,flink-kubernetes-operator-api,flink-kubernetes-operator,flink-kubernetes-operator-autoscaler,flink-kubernetes-webhook -DskipTests=$SKIP_TESTS +RUN --mount=type=cache,target=/root/.m2 mvn -ntp clean install -pl flink-kubernetes-standalone,flink-kubernetes-operator-api,flink-kubernetes-operator,flink-kubernetes-operator-autoscaler,flink-autoscaler,flink-kubernetes-webhook -DskipTests=$SKIP_TESTS RUN cd /app/tools/license; mkdir jars; cd jars; \ cp /app/flink-kubernetes-operator/target/flink-kubernetes-operator-*-shaded.jar . && \ diff --git a/flink-autoscaler/pom.xml b/flink-autoscaler/pom.xml new file mode 100644 index 0000000000..f945a262ac --- /dev/null +++ b/flink-autoscaler/pom.xml @@ -0,0 +1,68 @@ + + + + 4.0.0 + + + org.apache.flink + flink-kubernetes-operator-parent + 1.6-SNAPSHOT + .. + + + flink-autoscaler + Flink Autoscaler + jar + + + 2.15.0 + + + + + + org.apache.flink + flink-runtime + ${flink.version} + provided + + + + jackson-dataformat-yaml + com.fasterxml.jackson.dataformat + ${jackson.version} + provided + + + + org.projectlombok + lombok + ${lombok.version} + provided + + + + org.junit.jupiter + junit-jupiter-params + test + + + + \ No newline at end of file diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/AutoscalerFlinkMetrics.java similarity index 90% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/AutoscalerFlinkMetrics.java index fab3cb3268..1a0f067132 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/AutoscalerFlinkMetrics.java @@ -15,10 +15,10 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler; +package org.apache.flink.autoscaler; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -37,11 +37,11 @@ public class AutoscalerFlinkMetrics { private static final Logger LOG = LoggerFactory.getLogger(AutoscalerFlinkMetrics.class); - final Counter numScalings; + private final Counter numScalings; - final Counter numErrors; + private final Counter numErrors; - final Counter numBalanced; + private final Counter numBalanced; private final MetricGroup metricGroup; @@ -54,6 +54,18 @@ public AutoscalerFlinkMetrics(MetricGroup metricGroup) { this.metricGroup = metricGroup; } + public Counter getNumScalings() { + return numScalings; + } + + public Counter getNumErrors() { + return numErrors; + } + + public Counter getNumBalanced() { + return numBalanced; + } + public void registerScalingMetrics( Supplier>> currentVertexMetrics) { diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java similarity index 82% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java index af6d173222..7cb0bf1c5e 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java @@ -15,44 +15,45 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler; +package org.apache.flink.autoscaler; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.CollectedMetricHistory; +import org.apache.flink.autoscaler.metrics.CollectedMetrics; +import org.apache.flink.autoscaler.metrics.Edge; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.utils.AutoScalerUtils; import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.Edge; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; -import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.commons.math3.stat.StatUtils; -import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; + import java.time.Instant; import java.util.HashMap; import java.util.Map; import java.util.SortedMap; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LAG; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LOAD; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.LAG; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.LOAD; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; /** Job scaling evaluator for autoscaler. */ public class ScalingMetricEvaluator { @@ -110,7 +111,7 @@ protected static boolean isProcessingBacklog( }); } - @NotNull + @Nonnull private Map evaluateMetrics( Configuration conf, HashMap> scalingOutput, @@ -154,7 +155,7 @@ private Map evaluateMetrics( } @VisibleForTesting - protected static void computeProcessingRateThresholds( + public static void computeProcessingRateThresholds( Map metrics, Configuration conf, boolean processingBacklog) { diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingSummary.java similarity index 85% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingSummary.java index 4ff7a6e94c..50a3d17b19 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingSummary.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingSummary.java @@ -15,10 +15,10 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler; +package org.apache.flink.autoscaler; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; import com.fasterxml.jackson.annotation.JsonIgnore; import lombok.Data; @@ -26,7 +26,7 @@ import java.util.Map; -/** Scaling summary returned by the {@link ScalingMetricEvaluator}. */ +/** Scaling summary returned by the {@link org.apache.flink.autoscaler.ScalingMetricEvaluator}. */ @Data @NoArgsConstructor public class ScalingSummary { diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java similarity index 96% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java index b31a9becc1..68f644670e 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java @@ -15,22 +15,22 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.config; +package org.apache.flink.autoscaler.config; +import org.apache.flink.autoscaler.metrics.MetricAggregator; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.MetricAggregator; import java.time.Duration; import java.util.List; -import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.operatorConfig; - /** Config options related to the autoscaler module. */ public class AutoScalerOptions { + public static final String K8S_OP_CONF_PREFIX = "kubernetes.operator."; + private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { - return operatorConfig("job.autoscaler." + key); + return ConfigOptions.key(K8S_OP_CONF_PREFIX + "job.autoscaler." + key); } public static final ConfigOption AUTOSCALER_ENABLED = diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetricHistory.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java similarity index 88% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetricHistory.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java index 43bbf47b32..31a19879be 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetricHistory.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetricHistory.java @@ -15,9 +15,9 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.metrics; +package org.apache.flink.autoscaler.metrics; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.topology.JobTopology; import lombok.Data; import lombok.Setter; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetrics.java similarity index 94% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetrics.java index 97e3f9fe0d..bfd85ca263 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/CollectedMetrics.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/CollectedMetrics.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.metrics; +package org.apache.flink.autoscaler.metrics; import org.apache.flink.runtime.jobgraph.JobVertexID; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/Edge.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/Edge.java similarity index 94% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/Edge.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/Edge.java index c89692a9ef..1fe938b8a7 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/Edge.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/Edge.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.metrics; +package org.apache.flink.autoscaler.metrics; import org.apache.flink.runtime.jobgraph.JobVertexID; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/EvaluatedScalingMetric.java similarity index 95% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/EvaluatedScalingMetric.java index 237888864a..c60adf1827 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/EvaluatedScalingMetric.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/EvaluatedScalingMetric.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.metrics; +package org.apache.flink.autoscaler.metrics; import lombok.Data; import lombok.NoArgsConstructor; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java similarity index 97% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java index 8160977523..7f28d947f2 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/FlinkMetric.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.metrics; +package org.apache.flink.autoscaler.metrics; import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/MetricAggregator.java similarity index 95% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/MetricAggregator.java index c79d4b7dce..e147d211e6 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/MetricAggregator.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.metrics; +package org.apache.flink.autoscaler.metrics; import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java similarity index 97% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java index 608e4f1cfd..b6a3e17dfd 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.metrics; +package org.apache.flink.autoscaler.metrics; /** * Supported scaling metrics. These represent high level metrics computed from Flink job metrics diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java similarity index 97% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java index 592d06021c..afaa21aefa 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java @@ -15,12 +15,12 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.metrics; +package org.apache.flink.autoscaler.metrics; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.utils.AutoScalerUtils; import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; -import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java similarity index 99% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java index 3f16019319..1f2f5a6379 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/JobTopology.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.topology; +package org.apache.flink.autoscaler.topology; import org.apache.flink.runtime.jobgraph.JobVertexID; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java similarity index 95% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java index a261ca6d70..249be598a1 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/topology/VertexInfo.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.topology; +package org.apache.flink.autoscaler.topology; import org.apache.flink.runtime.jobgraph.JobVertexID; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerSerDeModule.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerSerDeModule.java similarity index 95% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerSerDeModule.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerSerDeModule.java index fad516a531..ce9914033e 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerSerDeModule.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerSerDeModule.java @@ -15,9 +15,9 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.utils; +package org.apache.flink.autoscaler.utils; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.Edge; +import org.apache.flink.autoscaler.metrics.Edge; import org.apache.flink.runtime.jobgraph.JobVertexID; import com.fasterxml.jackson.core.JsonGenerator; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java similarity index 82% rename from flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java rename to flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java index 54b6bef3cf..176ad5826e 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtils.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java @@ -15,12 +15,12 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.utils; +package org.apache.flink.autoscaler.utils; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; import org.apache.flink.runtime.jobgraph.JobVertexID; import java.util.ArrayList; @@ -30,9 +30,6 @@ import java.util.Map; import java.util.Set; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; - /** AutoScaler utilities. */ public class AutoScalerUtils { @@ -45,7 +42,8 @@ public static double getTargetProcessingCapacity( // Target = Lag Catchup Rate + Restart Catchup Rate + Processing at utilization // Target = LAG/CATCH_UP + INPUT_RATE*RESTART/CATCH_UP + INPUT_RATE/TARGET_UTIL - double lagCatchupTargetRate = evaluatedMetrics.get(CATCH_UP_DATA_RATE).getCurrent(); + double lagCatchupTargetRate = + evaluatedMetrics.get(ScalingMetric.CATCH_UP_DATA_RATE).getCurrent(); if (Double.isNaN(lagCatchupTargetRate)) { return Double.NaN; } @@ -56,7 +54,8 @@ public static double getTargetProcessingCapacity( targetUtilization = Math.max(0., targetUtilization); targetUtilization = Math.min(1., targetUtilization); - double avgInputTargetRate = evaluatedMetrics.get(TARGET_DATA_RATE).getAverage(); + double avgInputTargetRate = + evaluatedMetrics.get(ScalingMetric.TARGET_DATA_RATE).getAverage(); if (Double.isNaN(avgInputTargetRate)) { return Double.NaN; } diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java similarity index 86% rename from flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java rename to flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java index c67b27fe25..13b19445a1 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java @@ -15,18 +15,18 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler; +package org.apache.flink.autoscaler; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.CollectedMetricHistory; +import org.apache.flink.autoscaler.metrics.CollectedMetrics; +import org.apache.flink.autoscaler.metrics.Edge; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.topology.VertexInfo; import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.Edge; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; -import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.junit.jupiter.api.Test; @@ -39,19 +39,19 @@ import java.util.Set; import java.util.TreeMap; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.CATCH_UP_DURATION; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.RESTART_TIME; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LAG; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LOAD; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.CATCH_UP_DURATION; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.RESTART_TIME; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.LAG; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.LOAD; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java similarity index 98% rename from flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java rename to flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java index 0ae3e2b743..85a72e5a32 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java @@ -15,12 +15,12 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.metrics; +package org.apache.flink.autoscaler.metrics; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.topology.VertexInfo; import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; -import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtilsTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/AutoScalerUtilsTest.java similarity index 94% rename from flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtilsTest.java rename to flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/AutoScalerUtilsTest.java index 90588994a3..a037258af2 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/utils/AutoScalerUtilsTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/AutoScalerUtilsTest.java @@ -15,10 +15,10 @@ * limitations under the License. */ -package org.apache.flink.kubernetes.operator.autoscaler.utils; +package org.apache.flink.autoscaler.utils; +import org.apache.flink.autoscaler.config.AutoScalerOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.junit.jupiter.api.Test; diff --git a/flink-autoscaler/src/test/resources/log4j2-test.properties b/flink-autoscaler/src/test/resources/log4j2-test.properties new file mode 100644 index 0000000000..47b6664408 --- /dev/null +++ b/flink-autoscaler/src/test/resources/log4j2-test.properties @@ -0,0 +1,26 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +rootLogger.level = INFO +rootLogger.appenderRef.console.ref = ConsoleAppender + +# Log all infos to the console +appender.console.name = ConsoleAppender +appender.console.type = CONSOLE +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %style{%d}{yellow} %style{%-30c{1.}}{cyan} %highlight{[%-5level]%notEmpty{[%X{resource.namespace}/}%notEmpty{%X{resource.name}]} %msg%n%throwable} diff --git a/flink-kubernetes-docs/src/main/java/org/apache/flink/kubernetes/operator/docs/configuration/ConfigOptionsDocGenerator.java b/flink-kubernetes-docs/src/main/java/org/apache/flink/kubernetes/operator/docs/configuration/ConfigOptionsDocGenerator.java index 2fb1e6a47d..7f38f1fae8 100644 --- a/flink-kubernetes-docs/src/main/java/org/apache/flink/kubernetes/operator/docs/configuration/ConfigOptionsDocGenerator.java +++ b/flink-kubernetes-docs/src/main/java/org/apache/flink/kubernetes/operator/docs/configuration/ConfigOptionsDocGenerator.java @@ -76,9 +76,7 @@ public class ConfigOptionsDocGenerator { new OptionsClassLocation( "flink-kubernetes-operator", "org.apache.flink.kubernetes.operator.metrics"), - new OptionsClassLocation( - "flink-kubernetes-operator-autoscaler", - "org.apache.flink.kubernetes.operator.autoscaler.config") + new OptionsClassLocation("flink-autoscaler", "org.apache.flink.autoscaler.config") }; static final String DEFAULT_PATH_PREFIX = "src/main/java"; diff --git a/flink-kubernetes-operator-autoscaler/pom.xml b/flink-kubernetes-operator-autoscaler/pom.xml index 66307e8ce6..4b77ee05cc 100644 --- a/flink-kubernetes-operator-autoscaler/pom.xml +++ b/flink-kubernetes-operator-autoscaler/pom.xml @@ -36,6 +36,12 @@ under the License. + + org.apache.flink + flink-autoscaler + ${project.version} + + io.fabric8 kubernetes-client diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java index 55c0b0ac80..c37b299ffb 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java @@ -18,11 +18,12 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.CollectedMetrics; +import org.apache.flink.autoscaler.utils.AutoScalerSerDeModule; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics; -import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerSerDeModule; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.util.Preconditions; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java index fbefd15a94..12904af480 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java @@ -19,10 +19,12 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.autoscaler.AutoscalerFlinkMetrics; +import org.apache.flink.autoscaler.ScalingMetricEvaluator; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; import org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScaler; import org.apache.flink.kubernetes.operator.utils.EventRecorder; @@ -36,9 +38,9 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM; /** Application and SessionJob autoscaler. */ public class JobAutoScalerImpl implements JobAutoScaler { @@ -158,16 +160,16 @@ public boolean scale(FlinkResourceContext ctx) { scalingExecutor.scaleResource(resource, autoScalerInfo, conf, evaluatedMetrics); if (specAdjusted) { - flinkMetrics.numScalings.inc(); + flinkMetrics.getNumScalings().inc(); } else { - flinkMetrics.numBalanced.inc(); + flinkMetrics.getNumBalanced().inc(); } autoScalerInfo.replaceInKubernetes(kubernetesClient); return specAdjusted; } catch (Throwable e) { LOG.error("Error while scaling resource", e); - flinkMetrics.numErrors.inc(); + flinkMetrics.getNumErrors().inc(); eventRecorder.triggerEvent( resource, EventRecorder.Type.Warning, diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java index 277796e5a9..5ebe967049 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoscalerFactoryImpl.java @@ -17,6 +17,7 @@ package org.apache.flink.kubernetes.operator.autoscaler; +import org.apache.flink.autoscaler.ScalingMetricEvaluator; import org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScaler; import org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScalerFactory; import org.apache.flink.kubernetes.operator.utils.EventRecorder; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java index 497bb27056..d5351fd36f 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java @@ -18,12 +18,13 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.utils.AutoScalerUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.util.Preconditions; @@ -37,16 +38,16 @@ import java.util.Map; import java.util.SortedMap; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; /** Component responsible for computing vertex parallelism based on the scaling metrics. */ public class JobVertexScaler { diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java index 158bb5a26e..dca52a0c1a 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.java @@ -18,9 +18,9 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.api.common.JobID; +import org.apache.flink.autoscaler.metrics.FlinkMetric; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric; import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java index 8ede4d9911..f2ec20d672 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java @@ -18,11 +18,12 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.util.Preconditions; @@ -38,12 +39,12 @@ import java.util.Map; import java.util.SortedMap; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALING_ENABLED; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; /** Class responsible for executing scaling decisions. */ public class ScalingExecutor { diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java index 3ad1db39bf..e620fb0ec9 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java @@ -19,17 +19,17 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.CollectedMetricHistory; +import org.apache.flink.autoscaler.metrics.CollectedMetrics; +import org.apache.flink.autoscaler.metrics.FlinkMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetrics; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.utils.AutoScalerUtils; import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetrics; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; -import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils; import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.jobgraph.JobVertexID; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java index cfc2e7f567..94ada9fbfd 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java @@ -17,11 +17,12 @@ package org.apache.flink.kubernetes.operator.autoscaler; +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.CollectedMetrics; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; import org.apache.flink.runtime.jobgraph.JobVertexID; import com.fasterxml.jackson.core.JsonProcessingException; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java index 2c76ae7d3a..c9e67b7109 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java @@ -19,14 +19,16 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.autoscaler.AutoscalerFlinkMetrics; +import org.apache.flink.autoscaler.ScalingMetricEvaluator; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.FlinkMetric; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.topology.VertexInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.OperatorTestBase; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; -import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.utils.EventCollector; @@ -441,7 +443,7 @@ private void assertFlinkMetricsCount( AutoscalerFlinkMetrics autoscalerFlinkMetrics = autoscaler.flinkMetrics.get( ResourceID.fromResource(getResourceContext(app, ctx).getResource())); - assertEquals(scalingCount, autoscalerFlinkMetrics.numScalings.getCount()); - assertEquals(balancedCount, autoscalerFlinkMetrics.numBalanced.getCount()); + assertEquals(scalingCount, autoscalerFlinkMetrics.getNumScalings().getCount()); + assertEquals(balancedCount, autoscalerFlinkMetrics.getNumBalanced().getCount()); } } diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java index bb46f610c1..f1749ad942 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImplTest.java @@ -38,8 +38,8 @@ import java.util.Map; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED; -import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALING_ENABLED; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -76,12 +76,14 @@ void testErrorReporting() { ResourceID resourceId = ResourceID.fromResource(app); autoscaler.scale(resourceContext); - Assertions.assertEquals(1, autoscaler.flinkMetrics.get(resourceId).numErrors.getCount()); + Assertions.assertEquals( + 1, autoscaler.flinkMetrics.get(resourceId).getNumErrors().getCount()); autoscaler.scale(resourceContext); - Assertions.assertEquals(2, autoscaler.flinkMetrics.get(resourceId).numErrors.getCount()); + Assertions.assertEquals( + 2, autoscaler.flinkMetrics.get(resourceId).getNumErrors().getCount()); - assertEquals(0, autoscaler.flinkMetrics.get(resourceId).numScalings.getCount()); + assertEquals(0, autoscaler.flinkMetrics.get(resourceId).getNumScalings().getCount()); } @Test diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java index f76fadce77..bd64be28b4 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobTopologyTest.java @@ -17,7 +17,7 @@ package org.apache.flink.kubernetes.operator.autoscaler; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.topology.JobTopology; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java index fb4655dfbc..82eb2e45f7 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java @@ -17,12 +17,14 @@ package org.apache.flink.kubernetes.operator.autoscaler; +import org.apache.flink.autoscaler.ScalingMetricEvaluator; +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; import org.apache.flink.kubernetes.operator.utils.EventCollector; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.runtime.jobgraph.JobVertexID; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java index 0d973b00cf..f7dbb723fb 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java @@ -19,17 +19,18 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.autoscaler.ScalingMetricEvaluator; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.CollectedMetricHistory; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.FlinkMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.topology.VertexInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.TestingFlinkService; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; -import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.utils.EventCollector; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RecommendedParallelismTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RecommendedParallelismTest.java index 23ee405588..5246eefeef 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RecommendedParallelismTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RecommendedParallelismTest.java @@ -19,15 +19,16 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.autoscaler.ScalingMetricEvaluator; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.FlinkMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.topology.VertexInfo; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.OperatorTestBase; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; -import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.utils.EventCollector; @@ -52,9 +53,9 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM; import static org.apache.flink.kubernetes.operator.autoscaler.AutoscalerTestUtils.getOrCreateInfo; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM; -import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollectorTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollectorTest.java index 04d8310740..9f3cb451df 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollectorTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollectorTest.java @@ -18,11 +18,11 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.api.common.JobID; +import org.apache.flink.autoscaler.metrics.FlinkMetric; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.operator.TestingFlinkService; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java index 2191f72310..a5ada0c45f 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java @@ -18,13 +18,15 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.autoscaler.ScalingMetricEvaluator; +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.config.AutoScalerOptions; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; -import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; import org.apache.flink.kubernetes.operator.utils.EventCollector; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.runtime.jobgraph.JobVertexID; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollectorTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollectorTest.java index ddf2645320..3c11a7f638 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollectorTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollectorTest.java @@ -19,13 +19,13 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.autoscaler.metrics.FlinkMetric; +import org.apache.flink.autoscaler.topology.JobTopology; +import org.apache.flink.autoscaler.topology.VertexInfo; import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.TestingFlinkService; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; -import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.messages.JobPlanInfo; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java index 9a3c1d130d..277ee33b54 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingMetricsCollector.java @@ -18,11 +18,11 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.api.common.JobID; +import org.apache.flink.autoscaler.metrics.FlinkMetric; +import org.apache.flink.autoscaler.topology.JobTopology; import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; -import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric; -import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; import org.apache.flink.kubernetes.operator.service.FlinkService; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; diff --git a/flink-kubernetes-operator/pom.xml b/flink-kubernetes-operator/pom.xml index 99f74baf1e..7009cc7f97 100644 --- a/flink-kubernetes-operator/pom.xml +++ b/flink-kubernetes-operator/pom.xml @@ -49,6 +49,12 @@ under the License. ${project.version} + + org.apache.flink + flink-autoscaler + ${project.version} + + io.fabric8 kubernetes-httpclient-okhttp diff --git a/pom.xml b/pom.xml index 330d1d7ee4..4ba9a6de6a 100644 --- a/pom.xml +++ b/pom.xml @@ -57,6 +57,7 @@ under the License. flink-kubernetes-operator-api flink-kubernetes-webhook flink-kubernetes-docs + flink-autoscaler examples/flink-sql-runner-example examples/flink-beam-example examples/kubernetes-client-examples