Skip to content

Commit

Permalink
[FLIP-334] Move non-kubernetes related autoscaler classes to flink-au…
Browse files Browse the repository at this point in the history
…toscaler module
  • Loading branch information
1996fanrui committed Jul 21, 2023
1 parent 0c341eb commit d67dd85
Show file tree
Hide file tree
Showing 44 changed files with 317 additions and 183 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ WORKDIR /app

COPY . .

RUN --mount=type=cache,target=/root/.m2 mvn -ntp clean install -pl flink-kubernetes-standalone,flink-kubernetes-operator-api,flink-kubernetes-operator,flink-kubernetes-operator-autoscaler,flink-kubernetes-webhook -DskipTests=$SKIP_TESTS
RUN --mount=type=cache,target=/root/.m2 mvn -ntp clean install -pl flink-kubernetes-standalone,flink-kubernetes-operator-api,flink-kubernetes-operator,flink-kubernetes-operator-autoscaler,flink-autoscaler,flink-kubernetes-webhook -DskipTests=$SKIP_TESTS

RUN cd /app/tools/license; mkdir jars; cd jars; \
cp /app/flink-kubernetes-operator/target/flink-kubernetes-operator-*-shaded.jar . && \
Expand Down
68 changes: 68 additions & 0 deletions flink-autoscaler/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-kubernetes-operator-parent</artifactId>
<version>1.6-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>

<artifactId>flink-autoscaler</artifactId>
<name>Flink Autoscaler</name>
<packaging>jar</packaging>

<properties>
<jackson.version>2.15.0</jackson.version>
</properties>

<dependencies>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<artifactId>jackson-dataformat-yaml</artifactId>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
* limitations under the License.
*/

package org.apache.flink.kubernetes.operator.autoscaler;
package org.apache.flink.autoscaler;

import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.jobgraph.JobVertexID;
Expand All @@ -37,11 +37,11 @@ public class AutoscalerFlinkMetrics {

private static final Logger LOG = LoggerFactory.getLogger(AutoscalerFlinkMetrics.class);

final Counter numScalings;
private final Counter numScalings;

final Counter numErrors;
private final Counter numErrors;

final Counter numBalanced;
private final Counter numBalanced;

private final MetricGroup metricGroup;

Expand All @@ -54,6 +54,18 @@ public AutoscalerFlinkMetrics(MetricGroup metricGroup) {
this.metricGroup = metricGroup;
}

public Counter getNumScalings() {
return numScalings;
}

public Counter getNumErrors() {
return numErrors;
}

public Counter getNumBalanced() {
return numBalanced;
}

public void registerScalingMetrics(
Supplier<Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>>
currentVertexMetrics) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,44 +15,45 @@
* limitations under the License.
*/

package org.apache.flink.kubernetes.operator.autoscaler;
package org.apache.flink.autoscaler;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.metrics.CollectedMetricHistory;
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.autoscaler.metrics.Edge;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.Edge;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;

import org.apache.commons.math3.stat.StatUtils;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;

import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;

import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD;
import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY;
import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE;
import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LAG;
import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LOAD;
import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE;
import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.LAG;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.LOAD;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;

/** Job scaling evaluator for autoscaler. */
public class ScalingMetricEvaluator {
Expand Down Expand Up @@ -110,7 +111,7 @@ protected static boolean isProcessingBacklog(
});
}

@NotNull
@Nonnull
private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics(
Configuration conf,
HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> scalingOutput,
Expand Down Expand Up @@ -154,7 +155,7 @@ private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics(
}

@VisibleForTesting
protected static void computeProcessingRateThresholds(
public static void computeProcessingRateThresholds(
Map<ScalingMetric, EvaluatedScalingMetric> metrics,
Configuration conf,
boolean processingBacklog) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@
* limitations under the License.
*/

package org.apache.flink.kubernetes.operator.autoscaler;
package org.apache.flink.autoscaler;

import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetric;

import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Map;

/** Scaling summary returned by the {@link ScalingMetricEvaluator}. */
/** Scaling summary returned by the {@link org.apache.flink.autoscaler.ScalingMetricEvaluator}. */
@Data
@NoArgsConstructor
public class ScalingSummary {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,22 @@
* limitations under the License.
*/

package org.apache.flink.kubernetes.operator.autoscaler.config;
package org.apache.flink.autoscaler.config;

import org.apache.flink.autoscaler.metrics.MetricAggregator;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.MetricAggregator;

import java.time.Duration;
import java.util.List;

import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.operatorConfig;

/** Config options related to the autoscaler module. */
public class AutoScalerOptions {

public static final String K8S_OP_CONF_PREFIX = "kubernetes.operator.";

private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
return operatorConfig("job.autoscaler." + key);
return ConfigOptions.key(K8S_OP_CONF_PREFIX + "job.autoscaler." + key);
}

public static final ConfigOption<Boolean> AUTOSCALER_ENABLED =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
* limitations under the License.
*/

package org.apache.flink.kubernetes.operator.autoscaler.metrics;
package org.apache.flink.autoscaler.metrics;

import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.topology.JobTopology;

import lombok.Data;
import lombok.Setter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.flink.kubernetes.operator.autoscaler.metrics;
package org.apache.flink.autoscaler.metrics;

import org.apache.flink.runtime.jobgraph.JobVertexID;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.flink.kubernetes.operator.autoscaler.metrics;
package org.apache.flink.autoscaler.metrics;

import org.apache.flink.runtime.jobgraph.JobVertexID;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.flink.kubernetes.operator.autoscaler.metrics;
package org.apache.flink.autoscaler.metrics;

import lombok.Data;
import lombok.NoArgsConstructor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.flink.kubernetes.operator.autoscaler.metrics;
package org.apache.flink.autoscaler.metrics;

import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.flink.kubernetes.operator.autoscaler.metrics;
package org.apache.flink.autoscaler.metrics;

import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.flink.kubernetes.operator.autoscaler.metrics;
package org.apache.flink.autoscaler.metrics;

/**
* Supported scaling metrics. These represent high level metrics computed from Flink job metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
* limitations under the License.
*/

package org.apache.flink.kubernetes.operator.autoscaler.metrics;
package org.apache.flink.autoscaler.metrics;

import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.flink.kubernetes.operator.autoscaler.topology;
package org.apache.flink.autoscaler.topology;

import org.apache.flink.runtime.jobgraph.JobVertexID;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.flink.kubernetes.operator.autoscaler.topology;
package org.apache.flink.autoscaler.topology;

import org.apache.flink.runtime.jobgraph.JobVertexID;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
* limitations under the License.
*/

package org.apache.flink.kubernetes.operator.autoscaler.utils;
package org.apache.flink.autoscaler.utils;

import org.apache.flink.kubernetes.operator.autoscaler.metrics.Edge;
import org.apache.flink.autoscaler.metrics.Edge;
import org.apache.flink.runtime.jobgraph.JobVertexID;

import com.fasterxml.jackson.core.JsonGenerator;
Expand Down
Loading

0 comments on commit d67dd85

Please sign in to comment.