From 75de8ce82fb4a49111de3500f154306755875c07 Mon Sep 17 00:00:00 2001
From: Rui Fan <1996fanrui@gmail.com>
Date: Fri, 23 Feb 2024 16:19:20 +0800
Subject: [PATCH] [FLINK-34504][autoscaler] Avoid the parallelism adjustment
 when the upstream shuffle type doesn't have keyBy
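
Why: JobVertexScaler used to round every vertex's target parallelism up
to a divisor of the number of key groups. That adjustment only pays off
when the vertex actually consumes keyed (hash-partitioned) input; for
forward/rebalance/rescale inputs there is no keyed data to spread
evenly, so the rounding merely inflates the parallelism.
computeScaleTargetParallelism and scale() therefore take a hasKeyBy
flag, derived from whether any input edge of the vertex uses the HASH
ship strategy, and skip the key-group alignment when the flag is false.

Worked example (illustrative numbers, not taken from the patch): with
numKeyGroups = 720 and a computed target of 201, the old code walked
upward from 201 and settled on 240, the first divisor of 720 in range,
even for a vertex fed only by a rebalance edge; with hasKeyBy == false
the target now stays at 201.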
---
 .../flink/autoscaler/JobVertexScaler.java     | 20 +++--
 .../flink/autoscaler/ScalingExecutor.java     | 10 ++-
 .../flink/autoscaler/JobVertexScalerTest.java | 81 +++++++++++--------
 .../MetricsCollectionAndEvaluationTest.java   |  6 +-
 .../flink/autoscaler/ScalingExecutorTest.java | 58 +++++++++----
 5 files changed, 115 insertions(+), 60 deletions(-)

diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
index 01f6d94019..0c2c7d6a8b 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
@@ -71,6 +71,7 @@ public JobVertexScaler(AutoScalerEventHandler<KEY, Context> autoScalerEventHandl
     public int computeScaleTargetParallelism(
             Context context,
             JobVertexID vertex,
+            boolean hasKeyBy,
             Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
             SortedMap<Instant, ScalingSummary> history,
             Duration restartTime) {
@@ -124,7 +125,8 @@ public int computeScaleTargetParallelism(
                         (int) evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
                         scaleFactor,
                         Math.min(currentParallelism, conf.getInteger(VERTEX_MIN_PARALLELISM)),
-                        Math.max(currentParallelism, conf.getInteger(VERTEX_MAX_PARALLELISM)));
+                        Math.max(currentParallelism, conf.getInteger(VERTEX_MAX_PARALLELISM)),
+                        hasKeyBy);
 
         if (newParallelism == currentParallelism
                 || blockScalingBasedOnPastActions(
@@ -251,7 +253,8 @@ protected static int scale(
             int numKeyGroups,
             double scaleFactor,
             int minParallelism,
-            int maxParallelism) {
+            int maxParallelism,
+            boolean hasKeyBy) {
         Preconditions.checkArgument(
                 minParallelism <= maxParallelism,
                 "The minimum parallelism must not be greater than the maximum parallelism.");
@@ -280,11 +283,14 @@ protected static int scale(
         // Apply min/max parallelism
         newParallelism = Math.min(Math.max(minParallelism, newParallelism), upperBound);
 
-        // Try to adjust the parallelism such that it divides the number of key groups without a
-        // remainder => state is evenly spread across subtasks
-        for (int p = newParallelism; p <= numKeyGroups / 2 && p <= upperBound; p++) {
-            if (numKeyGroups % p == 0) {
-                return p;
+        // When the shuffle type of vertex data source contains keyBy, we try to adjust the
+        // parallelism such that it divides the number of key groups without a remainder =>
+        // data is evenly spread across subtasks
+        if (hasKeyBy) {
+            for (int p = newParallelism; p <= numKeyGroups / 2 && p <= upperBound; p++) {
+                if (numKeyGroups % p == 0) {
+                    return p;
+                }
             }
         }
 
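[Reviewer note, not part of the patch] The guarded adjustment above can be
read in isolation as the following sketch; alignToKeyGroups is a
hypothetical helper name, and upperBound stands for the already-clamped
maximum that scale() computes before the loop:

    // Returns the first parallelism >= target that divides numKeyGroups
    // without remainder, searching no further than numKeyGroups / 2 and
    // upperBound; falls back to target when the input is not keyed or no
    // divisor exists in range.
    static int alignToKeyGroups(int target, int numKeyGroups, int upperBound, boolean hasKeyBy) {
        if (hasKeyBy) {
            for (int p = target; p <= numKeyGroups / 2 && p <= upperBound; p++) {
                if (numKeyGroups % p == 0) {
                    return p;
                }
            }
        }
        return target;
    }

For example, alignToKeyGroups(201, 720, 720, true) returns 240, while
alignToKeyGroups(201, 720, 720, false) returns 201 — exactly the behavioral
change this patch introduces for vertices without keyed input.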
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
index ef401ffedb..faeeec9b13 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
@@ -102,7 +102,8 @@ public boolean scaleResource(
         var restartTime = scalingTracking.getMaxRestartTimeOrDefault(conf);
 
         var scalingSummaries =
-                computeScalingSummary(context, evaluatedMetrics, scalingHistory, restartTime);
+                computeScalingSummary(
+                        context, evaluatedMetrics, scalingHistory, restartTime, jobTopology);
 
         if (scalingSummaries.isEmpty()) {
             LOG.info("All job vertices are currently running at their target parallelism.");
@@ -203,7 +204,8 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
             Context context,
             EvaluatedMetrics evaluatedMetrics,
             Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory,
-            Duration restartTime) {
+            Duration restartTime,
+            JobTopology jobTopology) {
         LOG.debug("Restart time used in scaling summary computation: {}", restartTime);
 
         if (isJobUnderMemoryPressure(context, evaluatedMetrics.getGlobalMetrics())) {
@@ -225,10 +227,14 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
                             } else {
                                 var currentParallelism =
                                         (int) metrics.get(ScalingMetric.PARALLELISM).getCurrent();
+
+                                final boolean hasKeyBy =
+                                        jobTopology.get(v).getInputs().containsValue("HASH");
                                 var newParallelism =
                                         jobVertexScaler.computeScaleTargetParallelism(
                                                 context,
                                                 v,
+                                                hasKeyBy,
                                                 metrics,
                                                 scalingHistory.getOrDefault(
                                                         v, Collections.emptySortedMap()),
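[Reviewer note, not part of the patch] hasKeyBy is inferred purely from the
topology: in this revision JobTopology stores each vertex's inputs as a
Map<JobVertexID, String> from producer vertex to ship strategy, so a vertex
counts as keyed as soon as any input edge is "HASH". A rough standalone
equivalent (hypothetical helper for illustration):

    import java.util.Map;
    import org.apache.flink.runtime.jobgraph.JobVertexID;

    static boolean hasKeyedInput(Map<JobVertexID, String> inputs) {
        // "HASH" marks a keyBy/hash-partitioned edge; REBALANCE, RESCALE and
        // FORWARD edges do not shuffle by key, so they do not set the flag.
        return inputs.containsValue("HASH");
    }

Because containsValue scans all edges, a multi-input vertex is treated as
keyed if at least one of its inputs is hash-partitioned.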
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
index 456105d2a0..de15956135 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
@@ -85,6 +85,7 @@ public void testParallelismScaling() {
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
+                        true,
                         evaluated(10, 50, 100),
                         Collections.emptySortedMap(),
                         restartTime));
@@ -95,6 +96,7 @@ public void testParallelismScaling() {
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
+                        true,
                         evaluated(10, 50, 100),
                         Collections.emptySortedMap(),
                         restartTime));
@@ -105,6 +107,7 @@ public void testParallelismScaling() {
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
+                        true,
                         evaluated(10, 80, 100),
                         Collections.emptySortedMap(),
                         restartTime));
@@ -115,6 +118,7 @@ public void testParallelismScaling() {
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
+                        true,
                         evaluated(10, 60, 100),
                         Collections.emptySortedMap(),
                         restartTime));
@@ -124,6 +128,7 @@ public void testParallelismScaling() {
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
+                        true,
                         evaluated(10, 59, 100),
                         Collections.emptySortedMap(),
                         restartTime));
@@ -134,6 +139,7 @@ public void testParallelismScaling() {
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
+                        true,
                         evaluated(2, 100, 40),
                         Collections.emptySortedMap(),
                         restartTime));
@@ -144,6 +150,7 @@ public void testParallelismScaling() {
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
+                        true,
                         evaluated(2, 100, 100),
                         Collections.emptySortedMap(),
                         restartTime));
@@ -155,6 +162,7 @@ public void testParallelismScaling() {
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
+                        true,
                         evaluated(10, 10, 100),
                         Collections.emptySortedMap(),
                         restartTime));
@@ -165,6 +173,7 @@ public void testParallelismScaling() {
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
+                        true,
                         evaluated(10, 10, 100),
                         Collections.emptySortedMap(),
                         restartTime));
@@ -176,6 +185,7 @@ public void testParallelismScaling() {
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
+                        true,
                         evaluated(10, 200, 10),
                         Collections.emptySortedMap(),
                         restartTime));
@@ -186,6 +196,7 @@ public void testParallelismScaling() {
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
+                        true,
                         evaluated(10, 200, 10),
                         Collections.emptySortedMap(),
                         restartTime));
@@ -195,26 +206,28 @@ public void testParallelismComputation() {
         final int minParallelism = 1;
         final int maxParallelism = Integer.MAX_VALUE;
-        assertEquals(1, JobVertexScaler.scale(1, 720, 0.0001, minParallelism, maxParallelism));
-        assertEquals(1, JobVertexScaler.scale(2, 720, 0.1, minParallelism, maxParallelism));
-        assertEquals(5, JobVertexScaler.scale(6, 720, 0.8, minParallelism, maxParallelism));
-        assertEquals(32, JobVertexScaler.scale(16, 128, 1.5, minParallelism, maxParallelism));
-        assertEquals(400, JobVertexScaler.scale(200, 720, 2, minParallelism, maxParallelism));
+        assertEquals(
+                1, JobVertexScaler.scale(1, 720, 0.0001, minParallelism, maxParallelism, true));
+        assertEquals(1, JobVertexScaler.scale(2, 720, 0.1, minParallelism, maxParallelism, true));
+        assertEquals(5, JobVertexScaler.scale(6, 720, 0.8, minParallelism, maxParallelism, true));
+        assertEquals(32, JobVertexScaler.scale(16, 128, 1.5, minParallelism, maxParallelism, true));
+        assertEquals(400, JobVertexScaler.scale(200, 720, 2, minParallelism, maxParallelism, true));
         assertEquals(
                 720,
-                JobVertexScaler.scale(200, 720, Integer.MAX_VALUE, minParallelism, maxParallelism));
+                JobVertexScaler.scale(
+                        200, 720, Integer.MAX_VALUE, minParallelism, maxParallelism, true));
     }
 
     @Test
     public void testParallelismComputationWithLimit() {
-        assertEquals(5, JobVertexScaler.scale(6, 720, 0.8, 1, 700));
-        assertEquals(8, JobVertexScaler.scale(8, 720, 0.8, 8, 700));
+        assertEquals(5, JobVertexScaler.scale(6, 720, 0.8, 1, 700, true));
+        assertEquals(8, JobVertexScaler.scale(8, 720, 0.8, 8, 700, true));
 
-        assertEquals(32, JobVertexScaler.scale(16, 128, 1.5, 1, Integer.MAX_VALUE));
-        assertEquals(64, JobVertexScaler.scale(16, 128, 1.5, 60, Integer.MAX_VALUE));
+        assertEquals(32, JobVertexScaler.scale(16, 128, 1.5, 1, Integer.MAX_VALUE, true));
+        assertEquals(64, JobVertexScaler.scale(16, 128, 1.5, 60, Integer.MAX_VALUE, true));
 
-        assertEquals(300, JobVertexScaler.scale(200, 720, 2, 1, 300));
-        assertEquals(600, JobVertexScaler.scale(200, 720, Integer.MAX_VALUE, 1, 600));
+        assertEquals(300, JobVertexScaler.scale(200, 720, 2, 1, 300, true));
+        assertEquals(600, JobVertexScaler.scale(200, 720, Integer.MAX_VALUE, 1, 600, true));
     }
 
     @Test
     public void ensureMinParallelismDoesNotExceedMax() {
@@ -225,7 +238,7 @@ public void ensureMinParallelismDoesNotExceedMax() {
                         assertEquals(
                                 600,
                                 JobVertexScaler.scale(
-                                        200, 720, Integer.MAX_VALUE, 500, 499)));
+                                        200, 720, Integer.MAX_VALUE, 500, 499, true)));
     }
 
     @Test
@@ -236,6 +249,7 @@ public void testMinParallelismLimitIsUsed() {
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         new JobVertexID(),
+                        true,
                         evaluated(10, 100, 500),
                         Collections.emptySortedMap(),
                         restartTime));
@@ -246,6 +260,7 @@ public void testMinParallelismLimitIsUsed() {
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         new JobVertexID(),
+                        true,
                         evaluated(4, 100, 500),
                         Collections.emptySortedMap(),
                         restartTime));
@@ -260,6 +275,7 @@ public void testMaxParallelismLimitIsUsed() {
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         new JobVertexID(),
+                        true,
                         evaluated(10, 500, 100),
                         Collections.emptySortedMap(),
                         restartTime));
@@ -270,6 +286,7 @@ public void testMaxParallelismLimitIsUsed() {
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         new JobVertexID(),
+                        true,
                         evaluated(12, 500, 100),
                         Collections.emptySortedMap(),
                         restartTime));
@@ -288,7 +305,7 @@ public void testScaleDownAfterScaleUpDetection() {
         assertEquals(
                 10,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, op, evaluated, history, restartTime));
+                        context, op, true, evaluated, history, restartTime));
 
         history.put(clock.instant(), new ScalingSummary(5, 10, evaluated));
 
@@ -297,7 +314,7 @@ public void testScaleDownAfterScaleUpDetection() {
         assertEquals(
                 10,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, op, evaluated, history, restartTime));
+                        context, op, true, evaluated, history, restartTime));
 
         // Pass some time...
         clock = Clock.offset(Clock.systemDefaultZone(), Duration.ofSeconds(61));
@@ -306,7 +323,7 @@ public void testScaleDownAfterScaleUpDetection() {
         assertEquals(
                 5,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, op, evaluated, history, restartTime));
+                        context, op, true, evaluated, history, restartTime));
         history.put(clock.instant(), new ScalingSummary(10, 5, evaluated));
 
         // Allow immediate scale up
@@ -314,7 +331,7 @@ public void testScaleDownAfterScaleUpDetection() {
         assertEquals(
                 10,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, op, evaluated, history, restartTime));
+                        context, op, true, evaluated, history, restartTime));
         history.put(clock.instant(), new ScalingSummary(5, 10, evaluated));
     }
 
@@ -330,7 +347,7 @@ public void testIneffectiveScalingDetection() {
         assertEquals(
                 10,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, op, evaluated, history, restartTime));
+                        context, op, true, evaluated, history, restartTime));
         assertEquals(100, evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
         history.put(Instant.now(), new ScalingSummary(5, 10, evaluated));
 
@@ -339,7 +356,7 @@ public void testIneffectiveScalingDetection() {
         assertEquals(
                 20,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, op, evaluated, history, restartTime));
+                        context, op, true, evaluated, history, restartTime));
         assertEquals(180, evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
         history.put(Instant.now(), new ScalingSummary(10, 20, evaluated));
 
@@ -349,7 +366,7 @@ public void testIneffectiveScalingDetection() {
         assertEquals(
                 20,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, op, evaluated, history, restartTime));
+                        context, op, true, evaluated, history, restartTime));
         assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
 
         // Still considered ineffective (less than <10%)
@@ -357,7 +374,7 @@ public void testIneffectiveScalingDetection() {
         assertEquals(
                 20,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, op, evaluated, history, restartTime));
+                        context, op, true, evaluated, history, restartTime));
         assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
 
         // Allow scale up if current parallelism doesnt match last (user rescaled manually)
@@ -365,14 +382,14 @@ public void testIneffectiveScalingDetection() {
         assertEquals(
                 20,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, op, evaluated, history, restartTime));
+                        context, op, true, evaluated, history, restartTime));
 
         // Over 10%, effective
         evaluated = evaluated(20, 180, 100);
         assertEquals(
                 36,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, op, evaluated, history, restartTime));
+                        context, op, true, evaluated, history, restartTime));
         assertTrue(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
 
         // Ineffective but detection is turned off
@@ -381,7 +398,7 @@ public void testIneffectiveScalingDetection() {
         conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, false);
         assertEquals(
                 40,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, op, evaluated, history, restartTime));
+                        context, op, true, evaluated, history, restartTime));
         assertTrue(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
         conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, true);
@@ -390,7 +407,7 @@ public void testIneffectiveScalingDetection() {
         assertEquals(
                 10,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, op, evaluated, history, restartTime));
+                        context, op, true, evaluated, history, restartTime));
         assertTrue(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
     }
 
@@ -406,7 +423,7 @@ public void testSendingIneffectiveScalingEvents() {
         assertEquals(
                 10,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, jobVertexID, evaluated, history, restartTime));
+                        context, jobVertexID, true, evaluated, history, restartTime));
         assertEquals(100, evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
         history.put(Instant.now(), new ScalingSummary(5, 10, evaluated));
 
@@ -415,7 +432,7 @@ public void testSendingIneffectiveScalingEvents() {
         assertEquals(
                 20,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, jobVertexID, evaluated, history, restartTime));
+                        context, jobVertexID, true, evaluated, history, restartTime));
         assertEquals(180, evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
         history.put(Instant.now(), new ScalingSummary(10, 20, evaluated));
         assertEquals(0, eventCollector.events.size());
@@ -425,7 +442,7 @@ public void testSendingIneffectiveScalingEvents() {
         assertEquals(
                 20,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, jobVertexID, evaluated, history, restartTime));
+                        context, jobVertexID, true, evaluated, history, restartTime));
         assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
         assertEquals(1, eventCollector.events.size());
         var event = eventCollector.events.poll();
@@ -441,7 +458,7 @@ public void testSendingIneffectiveScalingEvents() {
         assertEquals(
                 20,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, jobVertexID, evaluated, history, restartTime));
+                        context, jobVertexID, true, evaluated, history, restartTime));
         assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
         assertEquals(0, eventCollector.events.size());
 
@@ -450,7 +467,7 @@ public void testSendingIneffectiveScalingEvents() {
         assertEquals(
                 20,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, jobVertexID, evaluated, history, restartTime));
+                        context, jobVertexID, true, evaluated, history, restartTime));
         assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
         assertEquals(0, eventCollector.events.size());
 
@@ -459,7 +476,7 @@ public void testSendingIneffectiveScalingEvents() {
         assertEquals(
                 20,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, jobVertexID, evaluated, history, restartTime));
+                        context, jobVertexID, true, evaluated, history, restartTime));
         assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
         assertEquals(1, eventCollector.events.size());
         event = eventCollector.events.poll();
@@ -476,7 +493,7 @@ public void testSendingIneffectiveScalingEvents() {
         assertEquals(
                 40,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, jobVertexID, evaluated, history, restartTime));
+                        context, jobVertexID, true, evaluated, history, restartTime));
         assertEquals(1, eventCollector.events.size());
         event = eventCollector.events.poll();
         assertThat(event).isNotNull();
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
index d0b0daabb8..0ee6128261 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -183,7 +183,7 @@ public void testEndToEnd() throws Exception {
                 new HashMap<>(),
                 new ScalingTracking(),
                 clock.instant(),
-                new JobTopology());
+                topology);
 
         var scaledParallelism = ScalingExecutorTest.getScaledParallelism(stateStore, context);
         assertEquals(4, scaledParallelism.size());
@@ -401,7 +401,7 @@ public void testTolerateAbsenceOfPendingRecordsMetric() throws Exception {
                 new HashMap<>(),
                 new ScalingTracking(),
                 clock.instant(),
-                new JobTopology());
+                topology);
         var scaledParallelism = ScalingExecutorTest.getScaledParallelism(stateStore, context);
         assertEquals(1, scaledParallelism.get(source1));
     }
@@ -652,7 +652,7 @@ public void testScaleDownWithZeroProcessingRate() throws Exception {
                 new HashMap<>(),
                 new ScalingTracking(),
                 clock.instant(),
-                new JobTopology());
+                topology);
 
         var scaledParallelism = ScalingExecutorTest.getScaledParallelism(stateStore, context);
         assertEquals(1, scaledParallelism.get(source1));
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
index 46b3242483..04de7110e2 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
@@ -151,6 +151,14 @@ public void testVertexesExclusionForScaling() throws Exception {
         var sinkHexString = "a6b7102b8d3e3a9564998c1ffeb5e2b7";
         var sink = JobVertexID.fromHexString(sinkHexString);
 
+        JobTopology jobTopology =
+                new JobTopology(
+                        new VertexInfo(source, Map.of(), 10, 1000, false, null),
+                        new VertexInfo(
+                                filterOperator, Map.of(source, "HASH"), 10, 1000, false, null),
+                        new VertexInfo(
+                                sink, Map.of(filterOperator, "HASH"), 10, 1000, false, null));
+
         var conf = context.getConfiguration();
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
         var metrics =
@@ -173,7 +181,7 @@ public void testVertexesExclusionForScaling() throws Exception {
                         new HashMap<>(),
                         new ScalingTracking(),
                         now,
-                        new JobTopology()));
+                        jobTopology));
         // filter operator should scale
         conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, List.of());
         assertTrue(
@@ -183,7 +191,7 @@ public void testVertexesExclusionForScaling() throws Exception {
                         new HashMap<>(),
                         new ScalingTracking(),
                         now,
-                        new JobTopology()));
+                        jobTopology));
     }
 
     @Test
@@ -192,6 +200,12 @@ public void testExcludedPeriodsForScaling() throws Exception {
         var source = JobVertexID.fromHexString(sourceHexString);
         var sinkHexString = "a6b7102b8d3e3a9564998c1ffeb5e2b7";
         var sink = JobVertexID.fromHexString(sinkHexString);
+
+        JobTopology jobTopology =
+                new JobTopology(
+                        new VertexInfo(source, Map.of(), 10, 1000, false, null),
+                        new VertexInfo(sink, Map.of(source, "HASH"), 10, 1000, false, null));
+
         var conf = context.getConfiguration();
         var now = Instant.now();
         var localTime = ZonedDateTime.ofInstant(now, ZoneId.systemDefault()).toLocalTime();
@@ -213,7 +227,7 @@ public void testExcludedPeriodsForScaling() throws Exception {
                         new HashMap<>(),
                         new ScalingTracking(),
                         now,
-                        new JobTopology()));
+                        jobTopology));
         // scaling execution outside excluded periods
         excludedPeriod =
                 new StringBuilder(localTime.plusSeconds(100).toString().split("\\.")[0])
@@ -228,7 +242,7 @@ public void testExcludedPeriodsForScaling() throws Exception {
                         new HashMap<>(),
                         new ScalingTracking(),
                         now,
-                        new JobTopology()));
+                        jobTopology));
     }
 
     @Test
@@ -237,6 +251,12 @@ public void testBlockScalingOnFailedResourceCheck() throws Exception {
         var source = JobVertexID.fromHexString(sourceHexString);
         var sinkHexString = "a6b7102b8d3e3a9564998c1ffeb5e2b7";
         var sink = JobVertexID.fromHexString(sinkHexString);
+
+        JobTopology jobTopology =
+                new JobTopology(
+                        new VertexInfo(source, Map.of(), 10, 1000, false, null),
+                        new VertexInfo(sink, Map.of(source, "HASH"), 10, 1000, false, null));
+
         var now = Instant.now();
         var metrics =
                 new EvaluatedMetrics(
@@ -257,7 +277,7 @@ public void testBlockScalingOnFailedResourceCheck() throws Exception {
                         new HashMap<>(),
                         new ScalingTracking(),
                         now,
-                        new JobTopology()));
+                        jobTopology));
 
         scalingDecisionExecutor =
                 new ScalingExecutor<>(
@@ -282,7 +302,7 @@ public boolean trySchedule(
                         new HashMap<>(),
                         new ScalingTracking(),
                         now,
-                        new JobTopology()));
+                        jobTopology));
     }
 
     @Test
@@ -334,9 +354,9 @@ public void testMemoryTuning() throws Exception {
                         TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
                         "0.652",
                         TaskManagerOptions.NETWORK_MEMORY_MIN.key(),
-                        "25 mb",
+                        "23040 kb",
                         TaskManagerOptions.NETWORK_MEMORY_MAX.key(),
-                        "25 mb",
+                        "23040 kb",
                        TaskManagerOptions.JVM_METASPACE.key(),
                         "360 mb",
                         TaskManagerOptions.JVM_OVERHEAD_FRACTION.key(),
@@ -344,7 +364,7 @@ public void testMemoryTuning() throws Exception {
                         TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key(),
                         "0 bytes",
                         TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(),
-                        "7681 mb"));
+                        "7862784 kb"));
     }
 
     @ParameterizedTest
@@ -368,6 +388,10 @@ public void testScalingEventsWithDefaultIntervalConfig(boolean scalingEnabled)
 
     private void testScalingEvents(boolean scalingEnabled, Duration interval) throws Exception {
         var jobVertexID = new JobVertexID();
+
+        JobTopology jobTopology =
+                new JobTopology(new VertexInfo(jobVertexID, Map.of(), 10, 1000, false, null));
+
         var conf = context.getConfiguration();
         conf.set(AutoScalerOptions.SCALING_ENABLED, scalingEnabled);
         if (interval != null) {
@@ -386,7 +410,7 @@ private void testScalingEvents(boolean scalingEnabled, Duration interval) throws
                         new HashMap<>(),
                         new ScalingTracking(),
                         now,
-                        new JobTopology()));
+                        jobTopology));
         assertEquals(
                 scalingEnabled,
                 scalingDecisionExecutor.scaleResource(
@@ -395,7 +419,7 @@ private void testScalingEvents(boolean scalingEnabled, Duration interval) throws
                         new HashMap<>(),
                         new ScalingTracking(),
                         now,
-                        new JobTopology()));
+                        jobTopology));
         int expectedSize =
                 (interval == null || interval.toMillis() > 0) && !scalingEnabled ? 1 : 2;
         assertEquals(expectedSize, eventCollector.events.size());
@@ -434,7 +458,7 @@ private void testScalingEvents(boolean scalingEnabled, Duration interval) throws
                         new HashMap<>(),
                         new ScalingTracking(),
                         now,
-                        new JobTopology()));
+                        jobTopology));
         var event2 = eventCollector.events.poll();
         assertThat(event2).isNotNull();
         assertThat(event2.getContext()).isSameAs(event.getContext());
@@ -450,6 +474,8 @@ public void testScalingUnderGcPressure() throws Exception {
         conf.set(AutoScalerOptions.HEAP_USAGE_THRESHOLD, 0.8);
 
         var vertexMetrics = Map.of(jobVertexID, evaluated(1, 110, 100));
+        JobTopology jobTopology =
+                new JobTopology(new VertexInfo(jobVertexID, Map.of(), 10, 1000, false, null));
         var metrics =
                 new EvaluatedMetrics(
                         vertexMetrics,
@@ -467,7 +493,7 @@ public void testScalingUnderGcPressure() throws Exception {
                         new HashMap<>(),
                         new ScalingTracking(),
                         Instant.now(),
-                        new JobTopology()));
+                        jobTopology));
 
         // Just below the thresholds
         metrics =
@@ -485,7 +511,7 @@ public void testScalingUnderGcPressure() throws Exception {
                         new HashMap<>(),
                         new ScalingTracking(),
                         Instant.now(),
-                        new JobTopology()));
+                        jobTopology));
 
         eventCollector.events.clear();
 
@@ -505,7 +531,7 @@ public void testScalingUnderGcPressure() throws Exception {
                         new HashMap<>(),
                         new ScalingTracking(),
                         Instant.now(),
-                        new JobTopology()));
+                        jobTopology));
         assertEquals("MemoryPressure", eventCollector.events.poll().getReason());
         assertTrue(eventCollector.events.isEmpty());
 
@@ -525,7 +551,7 @@ public void testScalingUnderGcPressure() throws Exception {
                         new HashMap<>(),
                         new ScalingTracking(),
                         Instant.now(),
-                        new JobTopology()));
+                        jobTopology));
         assertEquals("MemoryPressure", eventCollector.events.poll().getReason());
         assertTrue(eventCollector.events.isEmpty());
     }
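[Reviewer note, not part of the patch] A quick consistency check on the new
testMemoryTuning expectations: 23040 kb = 23040 / 1024 = 22.5 mb, i.e. 2.5 mb
less network memory than the previous 25 mb, and the expected total process
memory shrinks by the same 2.5 mb: 7862784 kb = 7678.5 mb = 7681 mb - 2.5 mb.
The switch to kb keeps both config strings integral now that the sizes are no
longer whole megabytes.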