From 74d992e78d8d3ce0c7193f51ff4099f02c4f7f4c Mon Sep 17 00:00:00 2001 From: APlyusnin Date: Thu, 27 Jun 2024 02:55:28 +0700 Subject: [PATCH] [FLINK-31215] Backpropagate processing rate limits from non-scalable bottlenecks to upstream operators --- .../autoscaler/IntermediateScalingResult.java | 60 ++ .../flink/autoscaler/JobVertexScaler.java | 69 +- .../flink/autoscaler/ScalingExecutor.java | 91 ++- .../flink/autoscaler/VertexScalingResult.java | 34 + .../autoscaler/config/AutoScalerOptions.java | 11 +- .../flink/autoscaler/JobVertexScalerTest.java | 709 ++++++++++++------ .../flink/autoscaler/ScalingExecutorTest.java | 217 ++++++ 7 files changed, 926 insertions(+), 265 deletions(-) create mode 100644 flink-autoscaler/src/main/java/org/apache/flink/autoscaler/IntermediateScalingResult.java create mode 100644 flink-autoscaler/src/main/java/org/apache/flink/autoscaler/VertexScalingResult.java diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/IntermediateScalingResult.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/IntermediateScalingResult.java new file mode 100644 index 0000000000..f659c97f01 --- /dev/null +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/IntermediateScalingResult.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler; + +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** Class for storing intermediate scaling results. 
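Tracks the per-vertex scaling summaries together with the vertices detected as non-scalable bottlenecks and the most restrictive (minimum) backpropagation scale factor they impose.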
*/ +public class IntermediateScalingResult { + + private final Map scalingSummaries; + private final List bottlenecks; + + private double backpropagationScaleFactor = 1.0; + + public IntermediateScalingResult() { + scalingSummaries = new HashMap<>(); + bottlenecks = new ArrayList<>(); + } + + void addScalingSummary(JobVertexID vertex, ScalingSummary scalingSummary) { + scalingSummaries.put(vertex, scalingSummary); + } + + void addBottleneckVertex(JobVertexID bottleneck, double factor) { + bottlenecks.add(bottleneck); + backpropagationScaleFactor = Math.min(backpropagationScaleFactor, factor); + } + + public List getBottlenecks() { + return bottlenecks; + } + + public double getBackpropagationScaleFactor() { + return backpropagationScaleFactor; + } + + public Map getScalingSummaries() { + return scalingSummaries; + } +} diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java index 7442532a8b..7b37b98013 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java @@ -71,21 +71,31 @@ public JobVertexScaler(AutoScalerEventHandler autoScalerEventHandl this.autoScalerEventHandler = autoScalerEventHandler; } - public int computeScaleTargetParallelism( + public VertexScalingResult computeScaleTargetParallelism( Context context, JobVertexID vertex, Collection inputShipStrategies, Map evaluatedMetrics, SortedMap history, - Duration restartTime) { + Duration restartTime, + double backpropagationScaleFactor) { var conf = context.getConfiguration(); + + boolean excluded = + conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS).contains(vertex.toHexString()); + if (excluded) { + LOG.debug( + "Vertex {} is part of `vertex.exclude.ids` config, Check for bottleneck but not scale", + vertex); + } + var currentParallelism = (int) evaluatedMetrics.get(PARALLELISM).getCurrent(); double averageTrueProcessingRate = evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage(); if (Double.isNaN(averageTrueProcessingRate)) { LOG.warn( "True processing rate is not available for {}, cannot compute new parallelism", vertex); - return currentParallelism; + return VertexScalingResult.normalScaling(currentParallelism); } double targetCapacity = @@ -95,9 +105,11 @@ public int computeScaleTargetParallelism( LOG.warn( "Target data rate is not available for {}, cannot compute new parallelism", vertex); - return currentParallelism; + return VertexScalingResult.normalScaling(currentParallelism); } + targetCapacity *= backpropagationScaleFactor; + LOG.debug("Target processing capacity for {} is {}", vertex, targetCapacity); double scaleFactor = targetCapacity / averageTrueProcessingRate; double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR); @@ -122,16 +134,25 @@ public int computeScaleTargetParallelism( double cappedTargetCapacity = averageTrueProcessingRate * scaleFactor; LOG.debug("Capped target processing capacity for {} is {}", vertex, cappedTargetCapacity); - int newParallelism = + int parallelismLowerLimit = + excluded + ? currentParallelism + : Math.min(currentParallelism, conf.getInteger(VERTEX_MIN_PARALLELISM)); + int parallelismUpperLimit = + excluded + ? 
currentParallelism + : Math.max(currentParallelism, conf.getInteger(VERTEX_MAX_PARALLELISM)); + + var scalingResult = scale( currentParallelism, inputShipStrategies, (int) evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(), scaleFactor, - Math.min(currentParallelism, conf.getInteger(VERTEX_MIN_PARALLELISM)), - Math.max(currentParallelism, conf.getInteger(VERTEX_MAX_PARALLELISM))); + parallelismLowerLimit, + parallelismUpperLimit); - if (newParallelism == currentParallelism + if (scalingResult.getParallelism() == currentParallelism || blockScalingBasedOnPastActions( context, vertex, @@ -139,15 +160,18 @@ public int computeScaleTargetParallelism( evaluatedMetrics, history, currentParallelism, - newParallelism)) { - return currentParallelism; + scalingResult.getParallelism())) { + return new VertexScalingResult( + currentParallelism, + scalingResult.getBottleneckScaleFactor(), + scalingResult.isBottleneck()); } // We record our expectations for this scaling operation evaluatedMetrics.put( ScalingMetric.EXPECTED_PROCESSING_RATE, EvaluatedScalingMetric.of(cappedTargetCapacity)); - return newParallelism; + return scalingResult; } private boolean blockScalingBasedOnPastActions( @@ -249,9 +273,12 @@ private boolean detectIneffectiveScaleUp( *
<p>
Also, in order to ensure the data is evenly spread across subtasks, we try to adjust the * parallelism for source and keyed vertex such that it divides the maxParallelism without a * remainder. + * + *
<p>
If newParallelism exceeds min(parallelismUpperLimit, maxParallelism) the job vertex + * considered to be a bottleneck. */ @VisibleForTesting - protected static int scale( + protected static VertexScalingResult scale( int currentParallelism, Collection inputShipStrategies, int maxParallelism, @@ -284,13 +311,23 @@ protected static int scale( // parallelism upper limit final int upperBound = Math.min(maxParallelism, parallelismUpperLimit); + boolean isBottleneck = false; + double bottleneckScaleFactor = 1.0; + + // If required parallelism is higher than upper bound ---> the vertex is a bottleneck + if (newParallelism > upperBound) { + isBottleneck = true; + bottleneckScaleFactor = (double) upperBound / newParallelism; + newParallelism = upperBound; + } + // Apply min/max parallelism - newParallelism = Math.min(Math.max(parallelismLowerLimit, newParallelism), upperBound); + newParallelism = Math.max(parallelismLowerLimit, newParallelism); var adjustByMaxParallelism = inputShipStrategies.isEmpty() || inputShipStrategies.contains(HASH); if (!adjustByMaxParallelism) { - return newParallelism; + return new VertexScalingResult(newParallelism, bottleneckScaleFactor, isBottleneck); } // When the shuffle type of vertex inputs contains keyBy or vertex is a source, we try to @@ -298,12 +335,12 @@ protected static int scale( // => data is evenly spread across subtasks for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) { if (maxParallelism % p == 0) { - return p; + return new VertexScalingResult(p, bottleneckScaleFactor, isBottleneck); } } // If parallelism adjustment fails, use originally computed parallelism - return newParallelism; + return new VertexScalingResult(newParallelism, bottleneckScaleFactor, isBottleneck); } @VisibleForTesting diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java index af32537118..0fe57c5ee0 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java @@ -49,6 +49,7 @@ import java.util.SortedMap; import static org.apache.flink.autoscaler.config.AutoScalerOptions.EXCLUDED_PERIODS; +import static org.apache.flink.autoscaler.config.AutoScalerOptions.PROCESSING_RATE_BACKPROPAGATION_ENABLED; import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED; import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL; import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_EXECUTION_DISABLED_REASON; @@ -220,39 +221,77 @@ Map computeScalingSummary( return Map.of(); } - var out = new HashMap(); - var excludeVertexIdList = - context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS); + var scalingResult = + computeScalingSummaryInternal( + context, evaluatedMetrics, scalingHistory, restartTime, jobTopology, 1.0); + + if (scalingResult.getBottlenecks().isEmpty() + || !context.getConfiguration() + .getBoolean(PROCESSING_RATE_BACKPROPAGATION_ENABLED)) { + return scalingResult.getScalingSummaries(); + } + + LOG.info("Vertices with ids {} are bottlenecks", scalingResult.getBottlenecks()); + + double backpropagationScaleFactor = scalingResult.getBackpropagationScaleFactor(); + + LOG.info( + "Processing rate back propagation scaling factor is {}", + backpropagationScaleFactor); + + scalingResult = + computeScalingSummaryInternal( + context, + 
evaluatedMetrics, + scalingHistory, + restartTime, + jobTopology, + backpropagationScaleFactor); + + return scalingResult.getScalingSummaries(); + } + + IntermediateScalingResult computeScalingSummaryInternal( + Context context, + EvaluatedMetrics evaluatedMetrics, + Map> scalingHistory, + Duration restartTime, + JobTopology jobTopology, + double backpropagationScaleFactor) { + + var scalingResult = new IntermediateScalingResult(); evaluatedMetrics .getVertexMetrics() .forEach( (v, metrics) -> { - if (excludeVertexIdList.contains(v.toHexString())) { - LOG.debug( - "Vertex {} is part of `vertex.exclude.ids` config, Ignoring it for scaling", - v); - } else { - var currentParallelism = - (int) metrics.get(ScalingMetric.PARALLELISM).getCurrent(); - - var newParallelism = - jobVertexScaler.computeScaleTargetParallelism( - context, - v, - jobTopology.get(v).getInputs().values(), - metrics, - scalingHistory.getOrDefault( - v, Collections.emptySortedMap()), - restartTime); - if (currentParallelism != newParallelism) { - out.put( + var currentParallelism = + (int) metrics.get(ScalingMetric.PARALLELISM).getCurrent(); + + var newParallelism = + jobVertexScaler.computeScaleTargetParallelism( + context, v, - new ScalingSummary( - currentParallelism, newParallelism, metrics)); - } + jobTopology.get(v).getInputs().values(), + metrics, + scalingHistory.getOrDefault( + v, Collections.emptySortedMap()), + restartTime, + backpropagationScaleFactor); + if (currentParallelism != newParallelism.getParallelism()) { + scalingResult.addScalingSummary( + v, + new ScalingSummary( + currentParallelism, + newParallelism.getParallelism(), + metrics)); + } + // Even if parallelism didn't change, vertex can be a bottleneck + if (newParallelism.isBottleneck()) { + scalingResult.addBottleneckVertex( + v, newParallelism.getBottleneckScaleFactor()); } }); - return out; + return scalingResult; } private boolean isJobUnderMemoryPressure( diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/VertexScalingResult.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/VertexScalingResult.java new file mode 100644 index 0000000000..f0f19971bb --- /dev/null +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/VertexScalingResult.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +/** Class for storing information on how a single vertex is scaled. 
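If isBottleneck is true, bottleneckScaleFactor carries the ratio of the parallelism upper bound to the required parallelism, i.e. how strongly upstream target rates must be reduced; otherwise it is 1.0.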
*/ +@AllArgsConstructor +@Getter +public class VertexScalingResult { + private int parallelism; + private double bottleneckScaleFactor; + private boolean isBottleneck; + + public static VertexScalingResult normalScaling(int parallelism) { + return new VertexScalingResult(parallelism, 1.0, false); + } +} diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java index 6922448b20..4e1caee52a 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java @@ -58,6 +58,15 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .withDescription( "Enable vertex scaling execution by the autoscaler. If disabled, the autoscaler will only collect metrics and evaluate the suggested parallelism for each vertex but will not upgrade the jobs."); + public static final ConfigOption PROCESSING_RATE_BACKPROPAGATION_ENABLED = + autoScalerConfig("processing.rate.backpropagation.enabled") + .booleanType() + .defaultValue(false) + .withFallbackKeys( + oldOperatorConfigKey("processing.rate.backpropagation.enabled")) + .withDescription( + "Enable backpropagation of processing rate during autoscaling to reduce resources usage."); + public static final ConfigOption METRICS_WINDOW = autoScalerConfig("metrics.window") .durationType() @@ -313,7 +322,7 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .defaultValues() .withFallbackKeys(oldOperatorConfigKey("vertex.exclude.ids")) .withDescription( - "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. 
Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented."); + "A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling."); public static final ConfigOption SCALING_EVENT_INTERVAL = autoScalerConfig("scaling.event.interval") diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java index 7856a3e723..0ef039ff85 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java @@ -98,159 +98,190 @@ public void testParallelismScaling(Collection inputShipStrategies) assertEquals( 5, - vertexScaler.computeScaleTargetParallelism( - context, - op, - inputShipStrategies, - evaluated(10, 50, 100), - Collections.emptySortedMap(), - restartTime)); + vertexScaler + .computeScaleTargetParallelism( + context, + op, + inputShipStrategies, + evaluated(10, 50, 100), + Collections.emptySortedMap(), + restartTime, + 1.0) + .getParallelism()); conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8); assertEquals( 8, - vertexScaler.computeScaleTargetParallelism( - context, - op, - inputShipStrategies, - evaluated(10, 50, 100), - Collections.emptySortedMap(), - restartTime)); + vertexScaler + .computeScaleTargetParallelism( + context, + op, + inputShipStrategies, + evaluated(10, 50, 100), + Collections.emptySortedMap(), + restartTime, + 1.0) + .getParallelism()); conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8); assertEquals( 10, - vertexScaler.computeScaleTargetParallelism( - context, - op, - inputShipStrategies, - evaluated(10, 80, 100), - Collections.emptySortedMap(), - restartTime)); + vertexScaler + .computeScaleTargetParallelism( + context, + op, + inputShipStrategies, + evaluated(10, 80, 100), + Collections.emptySortedMap(), + restartTime, + 1.0) + .getParallelism()); conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8); assertEquals( 8, - vertexScaler.computeScaleTargetParallelism( - context, - op, - inputShipStrategies, - evaluated(10, 60, 100), - Collections.emptySortedMap(), - restartTime)); + vertexScaler + .computeScaleTargetParallelism( + context, + op, + inputShipStrategies, + evaluated(10, 60, 100), + Collections.emptySortedMap(), + restartTime, + 1.0) + .getParallelism()); assertEquals( 8, - vertexScaler.computeScaleTargetParallelism( - context, - op, - inputShipStrategies, - evaluated(10, 59, 100), - Collections.emptySortedMap(), - restartTime)); - - conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.5); - assertEquals( - 10, - vertexScaler.computeScaleTargetParallelism( - context, - op, - inputShipStrategies, - evaluated(2, 100, 40), - Collections.emptySortedMap(), - restartTime)); + vertexScaler + .computeScaleTargetParallelism( + context, + op, + inputShipStrategies, + evaluated(10, 59, 100), + Collections.emptySortedMap(), + restartTime, + 1.0) + .getParallelism()); conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6); assertEquals( 4, - vertexScaler.computeScaleTargetParallelism( - context, - op, - inputShipStrategies, - evaluated(2, 100, 100), - Collections.emptySortedMap(), - restartTime)); + vertexScaler + .computeScaleTargetParallelism( + context, + op, + inputShipStrategies, + evaluated(2, 100, 100), + Collections.emptySortedMap(), + restartTime, + 1.0) + .getParallelism()); conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.); 
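        // With target utilization 1.0 the raw scale factor here is 10 / 100 = 0.1, but
        // MAX_SCALE_DOWN_FACTOR caps scale-down at 0.5, so parallelism 10 drops only to 5.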
conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.5); assertEquals( 5, - vertexScaler.computeScaleTargetParallelism( - context, - op, - inputShipStrategies, - evaluated(10, 10, 100), - Collections.emptySortedMap(), - restartTime)); + vertexScaler + .computeScaleTargetParallelism( + context, + op, + inputShipStrategies, + evaluated(10, 10, 100), + Collections.emptySortedMap(), + restartTime, + 1.0) + .getParallelism()); conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.6); assertEquals( 4, - vertexScaler.computeScaleTargetParallelism( - context, - op, - inputShipStrategies, - evaluated(10, 10, 100), - Collections.emptySortedMap(), - restartTime)); + vertexScaler + .computeScaleTargetParallelism( + context, + op, + inputShipStrategies, + evaluated(10, 10, 100), + Collections.emptySortedMap(), + restartTime, + 1.0) + .getParallelism()); conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.); conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.5); assertEquals( 15, - vertexScaler.computeScaleTargetParallelism( - context, - op, - inputShipStrategies, - evaluated(10, 200, 10), - Collections.emptySortedMap(), - restartTime)); + vertexScaler + .computeScaleTargetParallelism( + context, + op, + inputShipStrategies, + evaluated(10, 200, 10), + Collections.emptySortedMap(), + restartTime, + 1.0) + .getParallelism()); conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.6); assertEquals( 16, - vertexScaler.computeScaleTargetParallelism( - context, - op, - inputShipStrategies, - evaluated(10, 200, 10), - Collections.emptySortedMap(), - restartTime)); + vertexScaler + .computeScaleTargetParallelism( + context, + op, + inputShipStrategies, + evaluated(10, 200, 10), + Collections.emptySortedMap(), + restartTime, + 1.0) + .getParallelism()); } @Test public void testParallelismComputation() { final int minParallelism = 1; final int maxParallelism = Integer.MAX_VALUE; - assertEquals( - 1, + + var result = JobVertexScaler.scale( - 1, NOT_ADJUST_INPUTS, 720, 0.0001, minParallelism, maxParallelism)); - assertEquals( - 1, + 1, NOT_ADJUST_INPUTS, 720, 0.0001, minParallelism, maxParallelism); + assertEquals(1, result.getParallelism()); + assertFalse(result.isBottleneck()); + + result = JobVertexScaler.scale( - 2, NOT_ADJUST_INPUTS, 720, 0.1, minParallelism, maxParallelism)); - assertEquals( - 5, + 2, NOT_ADJUST_INPUTS, 720, 0.1, minParallelism, maxParallelism); + assertEquals(1, result.getParallelism()); + assertFalse(result.isBottleneck()); + + result = JobVertexScaler.scale( - 6, NOT_ADJUST_INPUTS, 720, 0.8, minParallelism, maxParallelism)); - assertEquals( - 24, + 6, NOT_ADJUST_INPUTS, 720, 0.8, minParallelism, maxParallelism); + assertEquals(5, result.getParallelism()); + assertFalse(result.isBottleneck()); + + result = JobVertexScaler.scale( - 16, NOT_ADJUST_INPUTS, 128, 1.5, minParallelism, maxParallelism)); - assertEquals( - 400, + 16, NOT_ADJUST_INPUTS, 128, 1.5, minParallelism, maxParallelism); + assertEquals(24, result.getParallelism()); + assertFalse(result.isBottleneck()); + + result = JobVertexScaler.scale( - 200, NOT_ADJUST_INPUTS, 720, 2, minParallelism, maxParallelism)); - assertEquals( - 720, + 200, NOT_ADJUST_INPUTS, 720, 2, minParallelism, maxParallelism); + assertEquals(400, result.getParallelism()); + assertFalse(result.isBottleneck()); + + result = JobVertexScaler.scale( 200, NOT_ADJUST_INPUTS, 720, Integer.MAX_VALUE, minParallelism, - maxParallelism)); + maxParallelism); + assertEquals(720, result.getParallelism()); + assertTrue(result.isBottleneck()); } @ParameterizedTest @@ -259,45 +290,64 @@ 
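    // For keyed/source inputs the result is rounded up to a divisor of maxParallelism,
    // e.g. a computed parallelism of 24 becomes 32 because 128 % 32 == 0.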
public void testParallelismComputationWithAdjustment( Collection inputShipStrategies) { final int minParallelism = 1; final int maxParallelism = Integer.MAX_VALUE; - assertEquals( - 6, + + var result = JobVertexScaler.scale( - 6, inputShipStrategies, 36, 0.8, minParallelism, maxParallelism)); - assertEquals( - 32, + 6, inputShipStrategies, 36, 0.8, minParallelism, maxParallelism); + assertEquals(6, result.getParallelism()); + assertFalse(result.isBottleneck()); + + result = JobVertexScaler.scale( - 16, inputShipStrategies, 128, 1.5, minParallelism, maxParallelism)); - assertEquals( - 360, + 16, inputShipStrategies, 128, 1.5, minParallelism, maxParallelism); + assertEquals(32, result.getParallelism()); + assertFalse(result.isBottleneck()); + + result = JobVertexScaler.scale( - 200, inputShipStrategies, 720, 1.3, minParallelism, maxParallelism)); - assertEquals( - 720, + 200, inputShipStrategies, 720, 1.3, minParallelism, maxParallelism); + assertEquals(360, result.getParallelism()); + assertFalse(result.isBottleneck()); + + result = JobVertexScaler.scale( 200, inputShipStrategies, 720, Integer.MAX_VALUE, minParallelism, - maxParallelism)); + maxParallelism); + assertEquals(720, result.getParallelism()); + assertTrue(result.isBottleneck()); } @ParameterizedTest @MethodSource("adjustmentInputsProvider") public void testParallelismComputationWithLimit(Collection inputShipStrategies) { - assertEquals(5, JobVertexScaler.scale(6, inputShipStrategies, 720, 0.8, 1, 700)); - assertEquals(8, JobVertexScaler.scale(8, inputShipStrategies, 720, 0.8, 8, 700)); - assertEquals( - 32, JobVertexScaler.scale(16, inputShipStrategies, 128, 1.5, 1, Integer.MAX_VALUE)); - assertEquals( - 64, - JobVertexScaler.scale(16, inputShipStrategies, 128, 1.5, 60, Integer.MAX_VALUE)); + var result = JobVertexScaler.scale(6, inputShipStrategies, 720, 0.8, 1, 700); + assertEquals(5, result.getParallelism()); + assertFalse(result.isBottleneck()); - assertEquals(300, JobVertexScaler.scale(200, inputShipStrategies, 720, 2, 1, 300)); - assertEquals( - 600, - JobVertexScaler.scale(200, inputShipStrategies, 720, Integer.MAX_VALUE, 1, 600)); + result = JobVertexScaler.scale(8, inputShipStrategies, 720, 0.8, 8, 700); + assertEquals(8, result.getParallelism()); + assertFalse(result.isBottleneck()); + + result = JobVertexScaler.scale(16, inputShipStrategies, 128, 1.5, 1, Integer.MAX_VALUE); + assertEquals(32, result.getParallelism()); + assertFalse(result.isBottleneck()); + + result = JobVertexScaler.scale(16, inputShipStrategies, 128, 1.5, 60, Integer.MAX_VALUE); + assertEquals(64, result.getParallelism()); + assertFalse(result.isBottleneck()); + + result = JobVertexScaler.scale(200, inputShipStrategies, 720, 2, 1, 300); + assertEquals(300, result.getParallelism()); + assertTrue(result.isBottleneck()); + + result = JobVertexScaler.scale(200, inputShipStrategies, 720, Integer.MAX_VALUE, 1, 600); + assertEquals(600, result.getParallelism()); + assertTrue(result.isBottleneck()); } @Test @@ -308,12 +358,13 @@ public void ensureMinParallelismDoesNotExceedMax() { assertEquals( 600, JobVertexScaler.scale( - 200, - NOT_ADJUST_INPUTS, - 720, - Integer.MAX_VALUE, - 500, - 499))); + 200, + NOT_ADJUST_INPUTS, + 720, + Integer.MAX_VALUE, + 500, + 499) + .getParallelism())); } @Test @@ -321,50 +372,147 @@ public void testMinParallelismLimitIsUsed() { conf.setInteger(AutoScalerOptions.VERTEX_MIN_PARALLELISM, 5); assertEquals( 5, + vertexScaler + .computeScaleTargetParallelism( + context, + new JobVertexID(), + NOT_ADJUST_INPUTS, + evaluated(10, 100, 
500), + Collections.emptySortedMap(), + restartTime, + 1.0) + .getParallelism()); + + // Make sure we respect current parallelism in case it's lower + assertEquals( + 4, + vertexScaler + .computeScaleTargetParallelism( + context, + new JobVertexID(), + NOT_ADJUST_INPUTS, + evaluated(4, 100, 500), + Collections.emptySortedMap(), + restartTime, + 1.0) + .getParallelism()); + } + + @Test + public void testMaxParallelismLimitIsUsed() { + conf.setInteger(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 10); + conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.); + + var result = vertexScaler.computeScaleTargetParallelism( context, new JobVertexID(), NOT_ADJUST_INPUTS, - evaluated(10, 100, 500), + evaluated(10, 500, 100), Collections.emptySortedMap(), - restartTime)); + restartTime, + 1.0); - // Make sure we respect current parallelism in case it's lower - assertEquals( - 4, + assertEquals(10, result.getParallelism()); + assertTrue(result.isBottleneck()); + + result = vertexScaler.computeScaleTargetParallelism( context, new JobVertexID(), NOT_ADJUST_INPUTS, - evaluated(4, 100, 500), + evaluated(12, 500, 100), Collections.emptySortedMap(), - restartTime)); + restartTime, + 1.0); + + // Make sure we respect current parallelism in case it's higher + assertEquals(12, result.getParallelism()); + assertTrue(result.isBottleneck()); } @Test - public void testMaxParallelismLimitIsUsed() { - conf.setInteger(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 10); + public void testExcludedVerticesDontScale() { + + JobVertexID id = new JobVertexID(); + + conf.setInteger(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 20); conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.); - assertEquals( - 10, + + conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, List.of(id.toHexString())); + + var result = vertexScaler.computeScaleTargetParallelism( context, - new JobVertexID(), + id, NOT_ADJUST_INPUTS, - evaluated(10, 500, 100), + evaluated(10, 100, 500), Collections.emptySortedMap(), - restartTime)); + restartTime, + 1.0); - // Make sure we respect current parallelism in case it's higher - assertEquals( - 12, + assertEquals(10, result.getParallelism()); + assertFalse(result.isBottleneck()); + + result = vertexScaler.computeScaleTargetParallelism( context, - new JobVertexID(), + id, NOT_ADJUST_INPUTS, - evaluated(12, 500, 100), + evaluated(10, 500, 100), + Collections.emptySortedMap(), + restartTime, + 1.0); + + // Check parallelism didn't change and vertex is a bottleneck + assertEquals(10, result.getParallelism()); + assertTrue(result.isBottleneck()); + } + + @Test + public void testBackPropagationScaleFactorAffectsScaling() { + var op = new JobVertexID(); + conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.); + conf.set(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 20); + + var result = vertexScaler + .computeScaleTargetParallelism( + context, + op, + NOT_ADJUST_INPUTS, + evaluated(4, 400, 100), Collections.emptySortedMap(), - restartTime)); + restartTime, + 1.0); + + assertEquals(16, result.getParallelism()); + assertFalse(result.isBottleneck()); + + conf.set(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 8); + result = vertexScaler.computeScaleTargetParallelism( + context, + op, + NOT_ADJUST_INPUTS, + evaluated(4, 400, 100), + Collections.emptySortedMap(), + restartTime, + 1.0); + + assertEquals(8, result.getParallelism()); + assertTrue(result.isBottleneck()); + assertEquals(0.5, result.getBottleneckScaleFactor()); + + result = vertexScaler.computeScaleTargetParallelism( + context, + op, + NOT_ADJUST_INPUTS, + evaluated(4, 400, 100), + 
Collections.emptySortedMap(), + restartTime, + 0.5); + + assertEquals(8, result.getParallelism()); + assertFalse(result.isBottleneck()); } @Test @@ -379,8 +527,16 @@ public void testScaleDownAfterScaleUpDetection() { var history = new TreeMap(); assertEquals( 10, - vertexScaler.computeScaleTargetParallelism( - context, op, NOT_ADJUST_INPUTS, evaluated, history, restartTime)); + vertexScaler + .computeScaleTargetParallelism( + context, + op, + NOT_ADJUST_INPUTS, + evaluated, + history, + restartTime, + 1.0) + .getParallelism()); history.put(clock.instant(), new ScalingSummary(5, 10, evaluated)); @@ -388,8 +544,16 @@ public void testScaleDownAfterScaleUpDetection() { evaluated = evaluated(10, 50, 100); assertEquals( 10, - vertexScaler.computeScaleTargetParallelism( - context, op, NOT_ADJUST_INPUTS, evaluated, history, restartTime)); + vertexScaler + .computeScaleTargetParallelism( + context, + op, + NOT_ADJUST_INPUTS, + evaluated, + history, + restartTime, + 1.0) + .getParallelism()); // Pass some time... clock = Clock.offset(Clock.systemDefaultZone(), Duration.ofSeconds(61)); @@ -397,16 +561,32 @@ public void testScaleDownAfterScaleUpDetection() { assertEquals( 5, - vertexScaler.computeScaleTargetParallelism( - context, op, NOT_ADJUST_INPUTS, evaluated, history, restartTime)); + vertexScaler + .computeScaleTargetParallelism( + context, + op, + NOT_ADJUST_INPUTS, + evaluated, + history, + restartTime, + 1.0) + .getParallelism()); history.put(clock.instant(), new ScalingSummary(10, 5, evaluated)); // Allow immediate scale up evaluated = evaluated(5, 100, 50); assertEquals( 10, - vertexScaler.computeScaleTargetParallelism( - context, op, NOT_ADJUST_INPUTS, evaluated, history, restartTime)); + vertexScaler + .computeScaleTargetParallelism( + context, + op, + NOT_ADJUST_INPUTS, + evaluated, + history, + restartTime, + 1.0) + .getParallelism()); history.put(clock.instant(), new ScalingSummary(5, 10, evaluated)); } @@ -421,8 +601,16 @@ public void testIneffectiveScalingDetection() { var history = new TreeMap(); assertEquals( 10, - vertexScaler.computeScaleTargetParallelism( - context, op, NOT_ADJUST_INPUTS, evaluated, history, restartTime)); + vertexScaler + .computeScaleTargetParallelism( + context, + op, + NOT_ADJUST_INPUTS, + evaluated, + history, + restartTime, + 1.0) + .getParallelism()); assertEquals(100, evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent()); history.put(Instant.now(), new ScalingSummary(5, 10, evaluated)); @@ -430,8 +618,16 @@ public void testIneffectiveScalingDetection() { evaluated = evaluated(10, 180, 90); assertEquals( 20, - vertexScaler.computeScaleTargetParallelism( - context, op, NOT_ADJUST_INPUTS, evaluated, history, restartTime)); + vertexScaler + .computeScaleTargetParallelism( + context, + op, + NOT_ADJUST_INPUTS, + evaluated, + history, + restartTime, + 1.0) + .getParallelism()); assertEquals(180, evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent()); history.put(Instant.now(), new ScalingSummary(10, 20, evaluated)); @@ -440,31 +636,63 @@ public void testIneffectiveScalingDetection() { evaluated = evaluated(20, 180, 94); assertEquals( 20, - vertexScaler.computeScaleTargetParallelism( - context, op, NOT_ADJUST_INPUTS, evaluated, history, restartTime)); + vertexScaler + .computeScaleTargetParallelism( + context, + op, + NOT_ADJUST_INPUTS, + evaluated, + history, + restartTime, + 1.0) + .getParallelism()); assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE)); // Still considered ineffective (less than <10%) 
evaluated = evaluated(20, 180, 98); assertEquals( 20, - vertexScaler.computeScaleTargetParallelism( - context, op, NOT_ADJUST_INPUTS, evaluated, history, restartTime)); + vertexScaler + .computeScaleTargetParallelism( + context, + op, + NOT_ADJUST_INPUTS, + evaluated, + history, + restartTime, + 1.0) + .getParallelism()); assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE)); // Allow scale up if current parallelism doesnt match last (user rescaled manually) evaluated = evaluated(10, 180, 90); assertEquals( 20, - vertexScaler.computeScaleTargetParallelism( - context, op, NOT_ADJUST_INPUTS, evaluated, history, restartTime)); + vertexScaler + .computeScaleTargetParallelism( + context, + op, + NOT_ADJUST_INPUTS, + evaluated, + history, + restartTime, + 1.0) + .getParallelism()); // Over 10%, effective evaluated = evaluated(20, 180, 100); assertEquals( 36, - vertexScaler.computeScaleTargetParallelism( - context, op, NOT_ADJUST_INPUTS, evaluated, history, restartTime)); + vertexScaler + .computeScaleTargetParallelism( + context, + op, + NOT_ADJUST_INPUTS, + evaluated, + history, + restartTime, + 1.0) + .getParallelism()); assertTrue(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE)); // Ineffective but detection is turned off @@ -472,8 +700,16 @@ public void testIneffectiveScalingDetection() { evaluated = evaluated(20, 180, 90); assertEquals( 40, - vertexScaler.computeScaleTargetParallelism( - context, op, NOT_ADJUST_INPUTS, evaluated, history, restartTime)); + vertexScaler + .computeScaleTargetParallelism( + context, + op, + NOT_ADJUST_INPUTS, + evaluated, + history, + restartTime, + 1.0) + .getParallelism()); assertTrue(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE)); conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, true); @@ -481,8 +717,16 @@ public void testIneffectiveScalingDetection() { evaluated = evaluated(20, 45, 90); assertEquals( 10, - vertexScaler.computeScaleTargetParallelism( - context, op, NOT_ADJUST_INPUTS, evaluated, history, restartTime)); + vertexScaler + .computeScaleTargetParallelism( + context, + op, + NOT_ADJUST_INPUTS, + evaluated, + history, + restartTime, + 1.0) + .getParallelism()); assertTrue(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE)); } @@ -498,13 +742,16 @@ public void testSendingIneffectiveScalingEvents(Collection inputSh var history = new TreeMap(); assertEquals( 10, - vertexScaler.computeScaleTargetParallelism( - context, - jobVertexID, - inputShipStrategies, - evaluated, - history, - restartTime)); + vertexScaler + .computeScaleTargetParallelism( + context, + jobVertexID, + inputShipStrategies, + evaluated, + history, + restartTime, + 1.0) + .getParallelism()); assertEquals(100, evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent()); history.put(Instant.now(), new ScalingSummary(5, 10, evaluated)); @@ -512,13 +759,16 @@ public void testSendingIneffectiveScalingEvents(Collection inputSh evaluated = evaluated(10, 180, 90); assertEquals( 20, - vertexScaler.computeScaleTargetParallelism( - context, - jobVertexID, - inputShipStrategies, - evaluated, - history, - restartTime)); + vertexScaler + .computeScaleTargetParallelism( + context, + jobVertexID, + inputShipStrategies, + evaluated, + history, + restartTime, + 1.0) + .getParallelism()); assertEquals(180, evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent()); history.put(Instant.now(), new ScalingSummary(10, 20, evaluated)); assertEquals(0, eventCollector.events.size()); @@ -527,13 +777,16 @@ public 
void testSendingIneffectiveScalingEvents(Collection inputSh evaluated = evaluated(20, 180, 95); assertEquals( 20, - vertexScaler.computeScaleTargetParallelism( - context, - jobVertexID, - inputShipStrategies, - evaluated, - history, - restartTime)); + vertexScaler + .computeScaleTargetParallelism( + context, + jobVertexID, + inputShipStrategies, + evaluated, + history, + restartTime, + 1.0) + .getParallelism()); assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE)); assertEquals(1, eventCollector.events.size()); var event = eventCollector.events.poll(); @@ -553,30 +806,36 @@ public void testSendingIneffectiveScalingEvents(Collection inputSh EvaluatedScalingMetric.avg(tpr.getAverage() + 0.01)); assertEquals( 20, - vertexScaler.computeScaleTargetParallelism( - context, - jobVertexID, - inputShipStrategies, - evaluated, - history, - restartTime)); + vertexScaler + .computeScaleTargetParallelism( + context, + jobVertexID, + inputShipStrategies, + evaluated, + history, + restartTime, + 1.0) + .getParallelism()); assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE)); assertEquals(0, eventCollector.events.size()); // reset tpr evaluated.put(ScalingMetric.TRUE_PROCESSING_RATE, tpr); - // Repeat ineffective scale with postive interval, no event is triggered + // Repeat ineffective scale with positive interval, no event is triggered conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, Duration.ofSeconds(1800)); assertEquals( 20, - vertexScaler.computeScaleTargetParallelism( - context, - jobVertexID, - inputShipStrategies, - evaluated, - history, - restartTime)); + vertexScaler + .computeScaleTargetParallelism( + context, + jobVertexID, + inputShipStrategies, + evaluated, + history, + restartTime, + 1.0) + .getParallelism()); assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE)); assertEquals(0, eventCollector.events.size()); @@ -584,13 +843,16 @@ public void testSendingIneffectiveScalingEvents(Collection inputSh conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, Duration.ZERO); assertEquals( 20, - vertexScaler.computeScaleTargetParallelism( - context, - jobVertexID, - inputShipStrategies, - evaluated, - history, - restartTime)); + vertexScaler + .computeScaleTargetParallelism( + context, + jobVertexID, + inputShipStrategies, + evaluated, + history, + restartTime, + 1.0) + .getParallelism()); assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE)); assertEquals(1, eventCollector.events.size()); event = eventCollector.events.poll(); @@ -606,13 +868,16 @@ public void testSendingIneffectiveScalingEvents(Collection inputSh conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, false); assertEquals( 40, - vertexScaler.computeScaleTargetParallelism( - context, - jobVertexID, - inputShipStrategies, - evaluated, - history, - restartTime)); + vertexScaler + .computeScaleTargetParallelism( + context, + jobVertexID, + inputShipStrategies, + evaluated, + history, + restartTime, + 1.0) + .getParallelism()); assertEquals(1, eventCollector.events.size()); event = eventCollector.events.poll(); assertThat(event).isNotNull(); diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java index 9797226946..5dd5749e2c 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java @@ -643,6 
+643,216 @@ public void testScalingUnderGcPressure() throws Exception { assertTrue(eventCollector.events.isEmpty()); } + @Test + public void testScalingWithBackPropEnabledSimpleGraph() throws Exception { + var sourceHexString = "0bfd135746ac8efb3cce668b12e16d3a"; + var source = JobVertexID.fromHexString(sourceHexString); + var filterOperatorHexString = "869fb403873411306404e9f2e4438c0e"; + var filterOperator = JobVertexID.fromHexString(filterOperatorHexString); + var sinkHexString = "a6b7102b8d3e3a9564998c1ffeb5e2b7"; + var sink = JobVertexID.fromHexString(sinkHexString); + + JobTopology jobTopology = + new JobTopology( + new VertexInfo(source, Map.of(), 1, 10, false, null), + new VertexInfo( + filterOperator, Map.of(source, REBALANCE), 1, 5, false, null), + new VertexInfo(sink, Map.of(filterOperator, HASH), 1, 10, false, null)); + + var conf = context.getConfiguration(); + conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.d); + conf.set(AutoScalerOptions.PROCESSING_RATE_BACKPROPAGATION_ENABLED, true); + + // If back propagation is enabled, then parallelism of all vertices is 5 + var metrics = + new EvaluatedMetrics( + Map.of( + source, + evaluatedWithMaxParallelism(1, 100, 10, 10), + filterOperator, + evaluatedWithMaxParallelism(1, 100, 10, 5), + sink, + evaluatedWithMaxParallelism(1, 100, 10, 10)), + dummyGlobalMetrics); + var now = Instant.now(); + assertThat( + scalingExecutor.scaleResource( + context, + metrics, + new HashMap<>(), + new ScalingTracking(), + now, + jobTopology)) + .isTrue(); + + Map parallelismOverrides = stateStore.getParallelismOverrides(context); + + // Check if backpropagation has effect on another vertices + assertThat(parallelismOverrides) + .containsAllEntriesOf( + Map.of( + "0bfd135746ac8efb3cce668b12e16d3a", + "5", + "869fb403873411306404e9f2e4438c0e", + "5", + "a6b7102b8d3e3a9564998c1ffeb5e2b7", + "5")); + + conf.set(AutoScalerOptions.PROCESSING_RATE_BACKPROPAGATION_ENABLED, false); + now = Instant.now(); + assertThat( + scalingExecutor.scaleResource( + context, + metrics, + new HashMap<>(), + new ScalingTracking(), + now, + jobTopology)) + .isTrue(); + + parallelismOverrides = stateStore.getParallelismOverrides(context); + + assertThat(parallelismOverrides) + .containsAllEntriesOf( + Map.of( + "0bfd135746ac8efb3cce668b12e16d3a", + "10", + "869fb403873411306404e9f2e4438c0e", + "5", + "a6b7102b8d3e3a9564998c1ffeb5e2b7", + "10")); + + conf.set(AutoScalerOptions.PROCESSING_RATE_BACKPROPAGATION_ENABLED, true); + + jobTopology = + new JobTopology( + new VertexInfo(source, Map.of(), 1, 11, false, null), + new VertexInfo( + filterOperator, Map.of(source, REBALANCE), 1, 6, false, null), + new VertexInfo(sink, Map.of(filterOperator, HASH), 1, 11, false, null)); + + metrics = + new EvaluatedMetrics( + Map.of( + source, + evaluatedWithMaxParallelism(1, 100, 10, 11), + filterOperator, + evaluatedWithMaxParallelism(1, 100, 10, 6), + sink, + evaluatedWithMaxParallelism(1, 100, 10, 11)), + dummyGlobalMetrics); + now = Instant.now(); + assertThat( + scalingExecutor.scaleResource( + context, + metrics, + new HashMap<>(), + new ScalingTracking(), + now, + jobTopology)) + .isTrue(); + + parallelismOverrides = stateStore.getParallelismOverrides(context); + + // Check max parallelism of the bottleneck vertex affects on other vertices parallelism + assertThat(parallelismOverrides) + .containsAllEntriesOf( + Map.of( + "0bfd135746ac8efb3cce668b12e16d3a", + "6", + "869fb403873411306404e9f2e4438c0e", + "6", + "a6b7102b8d3e3a9564998c1ffeb5e2b7", + "6")); + } + + @Test + public void 
testScalingWithBackPropEnabledComplexGraph() throws Exception { + var source1HexString = "0bfd135746ac8efb3cce668b12e16d3a"; + var source1 = JobVertexID.fromHexString(source1HexString); + var source2HexString = "3082f1a7c9ee7aaab5172f2c0a5fba90"; + var source2 = JobVertexID.fromHexString(source2HexString); + var source3HexString = "108440e208a8fe04cca0fee9d0161721"; + var source3 = JobVertexID.fromHexString(source3HexString); + var joinOperator1HexString = "869fb403873411306404e9f2e4438c0e"; + var joinOperator1 = JobVertexID.fromHexString(joinOperator1HexString); + var joinOperator2HexString = "50d0addd07a68c439013f2767a6bd813"; + var joinOperator2 = JobVertexID.fromHexString(joinOperator2HexString); + var sinkHexString = "a6b7102b8d3e3a9564998c1ffeb5e2b7"; + var sink = JobVertexID.fromHexString(sinkHexString); + + JobTopology jobTopology = + new JobTopology( + new VertexInfo(source1, Map.of(), 1, 1, false, null), + new VertexInfo(source2, Map.of(), 1, 1, false, null), + new VertexInfo(source3, Map.of(), 1, 3, false, null), + new VertexInfo( + joinOperator1, + Map.of(source1, HASH, source2, HASH), + 1, + 5, + false, + null), + new VertexInfo( + joinOperator2, + Map.of(joinOperator1, HASH, source3, HASH), + 1, + 15, + false, + null), + new VertexInfo(sink, Map.of(joinOperator2, REBALANCE), 1, 3, false, null)); + + var conf = context.getConfiguration(); + conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.d); + conf.set(AutoScalerOptions.PROCESSING_RATE_BACKPROPAGATION_ENABLED, true); + + // The expected new parallelism is 7 without adjustment by max parallelism. + var metrics = + new EvaluatedMetrics( + Map.of( + source1, + evaluatedWithMaxParallelism(1, 8, 10, 1), + source2, + evaluatedWithMaxParallelism(1, 7, 10, 1), + source3, + evaluatedWithMaxParallelism(1, 6, 50, 1), + joinOperator1, + evaluatedWithMaxParallelism(1, 200, 10, 5), + joinOperator2, + evaluatedWithMaxParallelism(1, 250, 10, 15), + sink, + evaluatedWithMaxParallelism(1, 100, 200, 3)), + dummyGlobalMetrics); + var now = Instant.now(); + assertThat( + scalingExecutor.scaleResource( + context, + metrics, + new HashMap<>(), + new ScalingTracking(), + now, + jobTopology)) + .isTrue(); + Map parallelismOverrides = stateStore.getParallelismOverrides(context); + // The source and keyed Operator should enable the parallelism adjustment, so the + // parallelism of source and sink are adjusted, but filter is not. + assertThat(parallelismOverrides) + .containsAllEntriesOf( + Map.of( + "0bfd135746ac8efb3cce668b12e16d3a", + "1", + "3082f1a7c9ee7aaab5172f2c0a5fba90", + "1", + "108440e208a8fe04cca0fee9d0161721", + "1", + "869fb403873411306404e9f2e4438c0e", + "5", + "50d0addd07a68c439013f2767a6bd813", + "7", + "a6b7102b8d3e3a9564998c1ffeb5e2b7", + "1")); + } + @Test public void testAdjustByMaxParallelism() throws Exception { var sourceHexString = "0bfd135746ac8efb3cce668b12e16d3a"; @@ -846,6 +1056,13 @@ private Map evaluated( return evaluated(parallelism, target, trueProcessingRate, 0.); } + private Map evaluatedWithMaxParallelism( + int parallelism, double target, double trueProcessingRate, int maxParallelism) { + var res = evaluated(parallelism, target, trueProcessingRate, 0.); + res.put(ScalingMetric.MAX_PARALLELISM, EvaluatedScalingMetric.of(maxParallelism)); + return res; + } + protected static > Map getScaledParallelism( AutoScalerStateStore stateStore, Context context)
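
The flow added by this patch is two-pass: a first pass scales every vertex, collecting bottlenecks and the minimum bottleneck scale factor (VertexScalingResult, IntermediateScalingResult), and a second pass reruns the computation with that factor applied. The following minimal, self-contained Java sketch illustrates the idea; all names in it (BackpropagationSketch, the required/max parallelism pairs) are hypothetical, and it folds the factor directly into the required parallelism, whereas the real code multiplies the target processing rate in computeScaleTargetParallelism - the two are proportional when the true processing rate is fixed.

import java.util.ArrayList;
import java.util.List;

public class BackpropagationSketch {

    // Mirrors VertexScalingResult: new parallelism plus bottleneck info.
    record Result(int parallelism, double bottleneckScaleFactor, boolean isBottleneck) {}

    // Cap the required parallelism at the upper bound, remembering the lost ratio,
    // as JobVertexScaler#scale does when a vertex cannot scale far enough.
    static Result scale(int required, int upperBound) {
        if (required > upperBound) {
            return new Result(upperBound, (double) upperBound / required, true);
        }
        return new Result(required, 1.0, false);
    }

    public static void main(String[] args) {
        // Hypothetical job: {required parallelism, max allowed parallelism} per vertex.
        int[][] vertices = {{10, 10}, {12, 6}, {10, 10}};

        // Pass 1: find bottlenecks and the most restrictive scale factor.
        double backpropagationScaleFactor = 1.0;
        List<Integer> bottlenecks = new ArrayList<>();
        for (int i = 0; i < vertices.length; i++) {
            Result r = scale(vertices[i][0], vertices[i][1]);
            if (r.isBottleneck()) {
                bottlenecks.add(i);
                backpropagationScaleFactor =
                        Math.min(backpropagationScaleFactor, r.bottleneckScaleFactor());
            }
        }

        // Pass 2: if any bottleneck exists, recompute every vertex with the
        // reduced demand, so upstream vertices no longer over-provision.
        if (!bottlenecks.isEmpty()) {
            for (int[] v : vertices) {
                int required = (int) Math.ceil(v[0] * backpropagationScaleFactor);
                System.out.println(scale(required, v[1]).parallelism());
            }
        }
        // Prints 5, 6, 5: the middle vertex caps at 6 (factor 0.5) and both
        // neighbors are scaled down to match instead of staying at 10.
    }
}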