Skip to content

Commit

Permalink
Address Gyula's comments:
Browse files Browse the repository at this point in the history
1. Refactor the ShipStrategy
2. Return early when `!hasKeyBy`
  • Loading branch information
1996fanrui committed Feb 24, 2024
1 parent e9a9d02 commit de579da
Show file tree
Hide file tree
Showing 15 changed files with 144 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -283,14 +283,16 @@ protected static int scale(
// Apply min/max parallelism
newParallelism = Math.min(Math.max(minParallelism, newParallelism), upperBound);

if (!hasKeyBy) {
return newParallelism;
}

// When the shuffle type of vertex data source contains keyBy, we try to adjust the
// parallelism such that it divides the number of key groups without a remainder =>
// data is evenly spread across subtasks
if (hasKeyBy) {
for (int p = newParallelism; p <= numKeyGroups / 2 && p <= upperBound; p++) {
if (numKeyGroups % p == 0) {
return p;
}
for (int p = newParallelism; p <= numKeyGroups / 2 && p <= upperBound; p++) {
if (numKeyGroups % p == 0) {
return p;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;

/** Class responsible for executing scaling decisions. */
public class ScalingExecutor<KEY, Context extends JobAutoScalerContext<KEY>> {
Expand Down Expand Up @@ -229,7 +230,7 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
(int) metrics.get(ScalingMetric.PARALLELISM).getCurrent();

final boolean hasKeyBy =
jobTopology.get(v).getInputs().containsValue("HASH");
jobTopology.get(v).getInputs().containsValue(HASH);
var newParallelism =
jobVertexScaler.computeScaleTargetParallelism(
context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public JobTopology(VertexInfo... vertexInfo) {

public JobTopology(Set<VertexInfo> vertexInfo) {

Map<JobVertexID, Map<JobVertexID, String>> vertexOutputs = new HashMap<>();
Map<JobVertexID, Map<JobVertexID, ShipStrategy>> vertexOutputs = new HashMap<>();
vertexInfos =
ImmutableMap.copyOf(
vertexInfo.stream().collect(Collectors.toMap(VertexInfo::getId, v -> v)));
Expand Down Expand Up @@ -145,7 +145,7 @@ public static JobTopology fromJsonPlan(

for (JsonNode node : nodes) {
var vertexId = JobVertexID.fromHexString(node.get("id").asText());
var inputs = new HashMap<JobVertexID, String>();
var inputs = new HashMap<JobVertexID, ShipStrategy>();
var ioMetrics = metrics.get(vertexId);
var finished = finishedVertices.contains(vertexId);
vertexInfo.add(
Expand All @@ -160,7 +160,7 @@ public static JobTopology fromJsonPlan(
for (JsonNode input : node.get("inputs")) {
inputs.put(
JobVertexID.fromHexString(input.get("id").asText()),
input.get("ship_strategy").asText());
ShipStrategy.of(input.get("ship_strategy").asText()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.autoscaler.topology;

import javax.annotation.Nonnull;

/** The ship strategy between 2 JobVertices. */
public enum ShipStrategy {
HASH,

REBALANCE,

RESCALE,

FORWARD,

CUSTOM,

BROADCAST,

GLOBAL,

SHUFFLE,

UNKNOWN;

/**
* Generates a ShipStrategy from a string, or returns {@link #UNKNOWN} if the value cannot match
* any ShipStrategy.
*/
@Nonnull
public static ShipStrategy of(String value) {
for (ShipStrategy shipStrategy : ShipStrategy.values()) {
if (shipStrategy.toString().equalsIgnoreCase(value)) {
return shipStrategy;
}
}
return UNKNOWN;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ public class VertexInfo {
private final JobVertexID id;

// All input vertices and the ship_strategy
private final Map<JobVertexID, String> inputs;
private final Map<JobVertexID, ShipStrategy> inputs;

// All output vertices and the ship_strategy
private Map<JobVertexID, String> outputs;
private Map<JobVertexID, ShipStrategy> outputs;

private final int parallelism;

Expand All @@ -48,7 +48,7 @@ public class VertexInfo {

public VertexInfo(
JobVertexID id,
Map<JobVertexID, String> inputs,
Map<JobVertexID, ShipStrategy> inputs,
int parallelism,
int maxParallelism,
boolean finished,
Expand All @@ -65,7 +65,7 @@ public VertexInfo(
@VisibleForTesting
public VertexInfo(
JobVertexID id,
Map<JobVertexID, String> inputs,
Map<JobVertexID, ShipStrategy> inputs,
int parallelism,
int maxParallelism,
IOMetrics ioMetrics) {
Expand All @@ -74,7 +74,10 @@ public VertexInfo(

@VisibleForTesting
public VertexInfo(
JobVertexID id, Map<JobVertexID, String> inputs, int parallelism, int maxParallelism) {
JobVertexID id,
Map<JobVertexID, ShipStrategy> inputs,
int parallelism,
int maxParallelism) {
this(id, inputs, parallelism, maxParallelism, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.topology.ShipStrategy;
import org.apache.flink.autoscaler.topology.VertexInfo;
import org.apache.flink.autoscaler.utils.ResourceCheckUtils;
import org.apache.flink.configuration.Configuration;
Expand Down Expand Up @@ -53,6 +54,8 @@
import static org.apache.flink.autoscaler.metrics.ScalingMetric.HEAP_MEMORY_USED;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.MANAGED_MEMORY_USED;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.METASPACE_MEMORY_USED;
import static org.apache.flink.autoscaler.topology.ShipStrategy.FORWARD;
import static org.apache.flink.autoscaler.topology.ShipStrategy.RESCALE;

/** Tunes the TaskManager memory. */
public class MemoryTuning {
Expand Down Expand Up @@ -254,9 +257,9 @@ private static MemorySize adjustNetworkMemory(
long maxNetworkMemory = 0;
for (VertexInfo vertexInfo : jobTopology.getVertexInfos().values()) {
// Add max amount of memory for each input gate
for (Map.Entry<JobVertexID, String> inputEntry : vertexInfo.getInputs().entrySet()) {
final JobVertexID inputVertexId = inputEntry.getKey();
final String shipStrategy = inputEntry.getValue();
for (var inputEntry : vertexInfo.getInputs().entrySet()) {
var inputVertexId = inputEntry.getKey();
var shipStrategy = inputEntry.getValue();
maxNetworkMemory +=
calculateNetworkSegmentNumber(
updatedParallelisms.get(vertexInfo.getId()),
Expand All @@ -268,9 +271,9 @@ private static MemorySize adjustNetworkMemory(
}
// Add max amount of memory for each output gate
// Usually, there is just one output per task
for (Map.Entry<JobVertexID, String> outputEntry : vertexInfo.getOutputs().entrySet()) {
final JobVertexID outputVertexId = outputEntry.getKey();
final String shipStrategy = outputEntry.getValue();
for (var outputEntry : vertexInfo.getOutputs().entrySet()) {
var outputVertexId = outputEntry.getKey();
var shipStrategy = outputEntry.getValue();
maxNetworkMemory +=
calculateNetworkSegmentNumber(
updatedParallelisms.get(vertexInfo.getId()),
Expand Down Expand Up @@ -300,15 +303,15 @@ private static MemorySize adjustNetworkMemory(
static int calculateNetworkSegmentNumber(
int currentVertexParallelism,
int connectedVertexParallelism,
String shipStrategy,
ShipStrategy shipStrategy,
int buffersPerChannel,
int floatingBuffers) {
// TODO When the parallelism is changed via the rescale api, the FORWARD may be changed to
// RESCALE. This logic may needs to be updated after FLINK-33123.
if (currentVertexParallelism == connectedVertexParallelism
&& "FORWARD".equals(shipStrategy)) {
&& FORWARD.equals(shipStrategy)) {
return buffersPerChannel + floatingBuffers;
} else if ("FORWARD".equals(shipStrategy) || "RESCALE".equals(shipStrategy)) {
} else if (FORWARD.equals(shipStrategy) || RESCALE.equals(shipStrategy)) {
final int channelCount =
(int) Math.ceil(connectedVertexParallelism / (double) currentVertexParallelism);
return channelCount * buffersPerChannel + floatingBuffers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

import static org.apache.flink.autoscaler.JobAutoScalerImpl.AUTOSCALER_ERROR;
import static org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext;
import static org.apache.flink.autoscaler.topology.ShipStrategy.REBALANCE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -81,7 +82,7 @@ public void setup() {
new VertexInfo(source1, Map.of(), 1, 720, new IOMetrics(0, 0, 0)),
new VertexInfo(
sink,
Map.of(source1, "REBALANCE"),
Map.of(source1, REBALANCE),
1,
720,
new IOMetrics(0, 0, 0))));
Expand Down Expand Up @@ -157,7 +158,7 @@ public void test() throws Exception {
metricsCollector.setJobTopology(
new JobTopology(
new VertexInfo(source1, Map.of(), 4, 24),
new VertexInfo(sink, Map.of(source1, "REBALANCE"), 4, 720)));
new VertexInfo(sink, Map.of(source1, REBALANCE), 4, 720)));

metricsCollector.updateMetrics(
source1,
Expand Down Expand Up @@ -239,7 +240,7 @@ public void test() throws Exception {
metricsCollector.setJobTopology(
new JobTopology(
new VertexInfo(source1, Map.of(), 2, 24),
new VertexInfo(sink, Map.of(source1, "REBALANCE"), 2, 720)));
new VertexInfo(sink, Map.of(source1, REBALANCE), 2, 720)));

/* Test stability while processing backlog. */

Expand Down Expand Up @@ -361,7 +362,7 @@ public void shouldTrackRestartDurationCorrectly() throws Exception {
metricsCollector.setJobTopology(
new JobTopology(
new VertexInfo(source1, Map.of(), 4, 720),
new VertexInfo(sink, Map.of(source1, "REBALANCE"), 4, 720)));
new VertexInfo(sink, Map.of(source1, REBALANCE), 4, 720)));

var expectedEndTime = Instant.ofEpochMilli(10);
metricsCollector.setJobUpdateTs(expectedEndTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.function.Supplier;

import static org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext;
import static org.apache.flink.autoscaler.topology.ShipStrategy.REBALANCE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand Down Expand Up @@ -92,12 +93,12 @@ public void setup() {
new VertexInfo(source2, Map.of(), 2, 720, new IOMetrics(0, 0, 0)),
new VertexInfo(
map,
Map.of(source1, "REBALANCE", source2, "REBALANCE"),
Map.of(source1, REBALANCE, source2, REBALANCE),
12,
720,
new IOMetrics(0, 0, 0)),
new VertexInfo(
sink, Map.of(map, "REBALANCE"), 8, 24, new IOMetrics(0, 0, 0)));
sink, Map.of(map, REBALANCE), 8, 24, new IOMetrics(0, 0, 0)));

metricsCollector = new TestingMetricsCollector<>(topology);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import static org.apache.flink.autoscaler.TestingAutoscalerUtils.getRestClusterClientSupplier;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
import static org.apache.flink.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM;
import static org.apache.flink.autoscaler.topology.ShipStrategy.REBALANCE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
Expand Down Expand Up @@ -75,7 +76,7 @@ public void setup() {
new TestingMetricsCollector<>(
new JobTopology(
new VertexInfo(source, Map.of(), 1, 720),
new VertexInfo(sink, Map.of(source, "REBALANCE"), 1, 720)));
new VertexInfo(sink, Map.of(source, REBALANCE), 1, 720)));

var defaultConf = context.getConfiguration();
defaultConf.set(AutoScalerOptions.AUTOSCALER_ENABLED, true);
Expand Down Expand Up @@ -199,7 +200,7 @@ public void endToEnd() throws Exception {
metricsCollector.setJobTopology(
new JobTopology(
new VertexInfo(source, Map.of(), 4, 24),
new VertexInfo(sink, Map.of(source, "REBALANCE"), 4, 720)));
new VertexInfo(sink, Map.of(source, REBALANCE), 4, 720)));

now = now.plus(Duration.ofSeconds(10));
setClocksTo(now);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_SUMMARY_ENTRY;
import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_SUMMARY_HEADER_SCALING_EXECUTION_DISABLED;
import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_SUMMARY_HEADER_SCALING_EXECUTION_ENABLED;
import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;
import static org.apache.flink.autoscaler.topology.ShipStrategy.REBALANCE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -154,10 +156,8 @@ public void testVertexesExclusionForScaling() throws Exception {
JobTopology jobTopology =
new JobTopology(
new VertexInfo(source, Map.of(), 10, 1000, false, null),
new VertexInfo(
filterOperator, Map.of(source, "HASH"), 10, 1000, false, null),
new VertexInfo(
sink, Map.of(filterOperator, "HASH"), 10, 1000, false, null));
new VertexInfo(filterOperator, Map.of(source, HASH), 10, 1000, false, null),
new VertexInfo(sink, Map.of(filterOperator, HASH), 10, 1000, false, null));

var conf = context.getConfiguration();
conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
Expand Down Expand Up @@ -204,7 +204,7 @@ public void testExcludedPeriodsForScaling() throws Exception {
JobTopology jobTopology =
new JobTopology(
new VertexInfo(source, Map.of(), 10, 1000, false, null),
new VertexInfo(sink, Map.of(source, "HASH"), 10, 1000, false, null));
new VertexInfo(sink, Map.of(source, HASH), 10, 1000, false, null));

var conf = context.getConfiguration();
var now = Instant.now();
Expand Down Expand Up @@ -255,7 +255,7 @@ public void testBlockScalingOnFailedResourceCheck() throws Exception {
JobTopology jobTopology =
new JobTopology(
new VertexInfo(source, Map.of(), 10, 1000, false, null),
new VertexInfo(sink, Map.of(source, "HASH"), 10, 1000, false, null));
new VertexInfo(sink, Map.of(source, HASH), 10, 1000, false, null));

var now = Instant.now();
var metrics =
Expand Down Expand Up @@ -338,7 +338,7 @@ public void testMemoryTuning() throws Exception {
JobTopology jobTopology =
new JobTopology(
new VertexInfo(source, Map.of(), 10, 1000, false, null),
new VertexInfo(sink, Map.of(source, "REBALANCE"), 10, 1000, false, null));
new VertexInfo(sink, Map.of(source, REBALANCE), 10, 1000, false, null));

assertTrue(
scalingDecisionExecutor.scaleResource(
Expand Down
Loading

0 comments on commit de579da

Please sign in to comment.