Skip to content

Commit

Permalink
[FLINK-33099][autoscaler] Introduce the Standalone Autoscaler and sup…
Browse files Browse the repository at this point in the history
…port flink cluster
  • Loading branch information
1996fanrui committed Nov 7, 2023
1 parent a309af6 commit c9dde4e
Show file tree
Hide file tree
Showing 13 changed files with 1,507 additions and 0 deletions.
75 changes: 75 additions & 0 deletions flink-autoscaler-standalone/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Flink Autoscaler Standalone

## What's the autoscaler standalone?

`Flink Autoscaler Standalone` is an implementation of `Flink Autoscaler`, it runs as
a separate java process. It computes the reasonable parallelism of all job vertices
by monitoring the metrics, such as: processing rate, busy time, etc. Please see
[Autoscaler official doc](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/)
for an overview of how autoscaling works.

`Flink Autoscaler Standalone` rescales flink job in-place by rest api of
[Externalized Declarative Resource Management](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#externalized-declarative-resource-management).
`RescaleApiScalingRealizer` is the default implementation of `ScalingRealizer`,
it uses the Rescale API to apply parallelism changes.

Kubernetes Operator is well integrated with Autoscaler, we strongly recommend using
Kubernetes Operator directly for the kubernetes flink jobs, and only flink jobs in
non-kubernetes environments use Autoscaler Standalone.

## How To Use

Currently, `Flink Autoscaler Standalone` only supports a single Flink cluster.
It can be any type of Flink cluster, includes:

- Flink Standalone Cluster
- MiniCluster
- Flink yarn session cluster
- Flink yarn application cluster
- Flink kubernetes session cluster
- Flink kubernetes application cluster
- etc

You can start a Flink Streaming job with the following ConfigOptions.

```
# Enable Adaptvie scheduler to play the in-place rescaling.
jobmanager.scheduler : adaptive
# Enable autoscale and scaling
job.autoscaler.enabled : true
job.autoscaler.scaling.enabled : true
job.autoscaler.stabilization.interval : 1m
job.autoscaler.metrics.window : 3m
```

Note: In-place rescaling is only supported since Flink 1.18. Flink jobs before version
1.18 cannot be scaled automatically, but you can view the ScalingReport in Log.
ScalingReport will show the recommended parallelism for each vertex.

After the flink job starts, please start the StandaloneAutoscaler process by the
following command.

```
java -cp flink-autoscaler-standalone-1.7-SNAPSHOT.jar \
org.apache.flink.autoscaler.standalone.StandaloneAutoscalerEntrypoint \
--flinkClusterHost localhost \
--flinkClusterPort 8081
```

Updating the `flinkClusterHost` and `flinkClusterPort` based on your flink cluster.
In general, the host and port are the same as Flink WebUI.

## Extensibility of autoscaler standalone

Please click [here](../flink-autoscaler/README.md) to check out extensibility of generic autoscaler.

`Autoscaler Standalone` isn't responsible for job management, so it doesn't have job information.
`Autoscaler Standalone` defines the `JobListFetcher` interface in order to get the
`JobAutoScalerContext` of the job. It has a control loop that periodically calls
`JobListFetcher#fetch` to fetch the job list and scale these jobs.

Currently `FlinkClusterJobListFetcher` is the only implementation of the `JobListFetcher`
interface, that's why `Flink Autoscaler Standalone` only supports a single Flink cluster so far.
We will implement `YarnJobListFetcher` in the future, `Flink Autoscaler Standalone` will call
`YarnJobListFetcher#fetch` to fetch job list from yarn cluster periodically.
207 changes: 207 additions & 0 deletions flink-autoscaler-standalone/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-kubernetes-operator-parent</artifactId>
<version>1.7-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>

<artifactId>flink-autoscaler-standalone</artifactId>
<name>Flink Autoscaler Standalone</name>
<packaging>jar</packaging>


<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-autoscaler</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-rpc-akka-loader</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-zookeeper-3</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-queryable-state-client-java</artifactId>
</exclusion>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-fs</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
</exclusion>
<exclusion>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<artifactId>flink-streaming-java</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>flink-optimizer</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
</exclusions>
</dependency>

<!-- Logging -->

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
<version>${log4j.version}</version>
</dependency>

<!-- Test -->

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-autoscaler</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:flink-shaded-force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>META-INF/**/module-info.class</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.apache.flink.autoscaler.standalone.StandaloneAutoscalerEntrypoint</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.autoscaler.standalone;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.autoscaler.JobAutoScalerContext;

import java.util.List;

/** The JobListFetcher will fetch the jobContext of all jobs. */
@Experimental
public interface JobListFetcher<KEY, Context extends JobAutoScalerContext<KEY>> {

List<Context> fetch() throws Exception;
}
Loading

0 comments on commit c9dde4e

Please sign in to comment.