Skip to content

Commit

Permalink
Prepare the ScalingExecutorTest
Browse files Browse the repository at this point in the history
  • Loading branch information
1996fanrui committed Aug 28, 2024
1 parent 48e3657 commit 24e5f3a
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,18 +178,16 @@ private void updateRecommendedParallelism(
scalingSummary.getNewParallelism())));
}

protected static boolean allRequiredVerticesWithinUtilizationTarget(
@VisibleForTesting
static boolean allRequiredVerticesWithinUtilizationTarget(
Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
Map<JobVertexID, ScalingSummary> scalingSummaries,
Set<JobVertexID> requiredVertices) {
// All vertices' ParallelismResult is optional, rescaling will be ignored.
if (requiredVertices.isEmpty()) {
return true;
}

for (Map.Entry<JobVertexID, ScalingSummary> entry : scalingSummaries.entrySet()) {
var vertex = entry.getKey();
if (!requiredVertices.contains(vertex)) {
// Don't need to check optional vertex.
continue;
}

for (JobVertexID vertex : requiredVertices) {
var metrics = evaluatedMetrics.get(vertex);

double trueProcessingRate = metrics.get(TRUE_PROCESSING_RATE).getAverage();
Expand Down Expand Up @@ -275,19 +273,10 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
}
});

// TODO : 4. all tasks with parallelism changing are optional
// All vertices' ParallelismResult is optional, rescaling will be ignored.
if (requiredVertices.isEmpty()) {
return Map.of();
}

// TODO : 1. add test for only optional tasks are out of scope.
// TODO : 2. all task are out of scope.
// TODO : 3. one required task is out of scope.
// If the Utilization of all required tasks is within range, we can skip scaling.
// It means that if only optional tasks are out of scope, we still need to ignore scale.
if (allRequiredVerticesWithinUtilizationTarget(
evaluatedMetrics.getVertexMetrics(), out, requiredVertices)) {
evaluatedMetrics.getVertexMetrics(), requiredVertices)) {
return Map.of();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -119,7 +120,7 @@ protected boolean scalingWouldExceedMaxResources(
}

@Test
public void testUtilizationBoundaries() throws Exception {
public void testUtilizationBoundariesForAllRequiredVertices() throws Exception {
// Restart time should not affect utilization boundary
var conf = context.getConfiguration();
conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
Expand All @@ -131,55 +132,75 @@ public void testUtilizationBoundaries() throws Exception {
conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);

var evaluated = Map.of(op1, evaluated(1, 70, 100));
var scalingSummary = Map.of(op1, new ScalingSummary(2, 1, evaluated.get(op1)));
assertFalse(
ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(
evaluated, scalingSummary, scalingSummary.keySet()));
evaluated, evaluated.keySet()));

conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.2);
evaluated = Map.of(op1, evaluated(1, 70, 100));
scalingSummary = Map.of(op1, new ScalingSummary(2, 1, evaluated.get(op1)));
assertTrue(
ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(
evaluated, scalingSummary, scalingSummary.keySet()));
evaluated, evaluated.keySet()));
assertTrue(getScaledParallelism(stateStore, context).isEmpty());

var op2 = new JobVertexID();
evaluated =
Map.of(
op1, evaluated(1, 70, 100),
op2, evaluated(1, 85, 100));
scalingSummary =
Map.of(
op1,
new ScalingSummary(1, 2, evaluated.get(op1)),
op2,
new ScalingSummary(1, 2, evaluated.get(op2)));

assertFalse(
ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(
evaluated, scalingSummary, scalingSummary.keySet()));
evaluated, evaluated.keySet()));

evaluated =
Map.of(
op1, evaluated(1, 70, 100),
op2, evaluated(1, 70, 100));
scalingSummary =
Map.of(
op1,
new ScalingSummary(1, 2, evaluated.get(op1)),
op2,
new ScalingSummary(1, 2, evaluated.get(op2)));
assertTrue(
ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(
evaluated, scalingSummary, scalingSummary.keySet()));
evaluated, evaluated.keySet()));

// Test with backlog based scaling
evaluated = Map.of(op1, evaluated(1, 70, 100, 15));
scalingSummary = Map.of(op1, new ScalingSummary(1, 2, evaluated.get(op1)));
assertFalse(
ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(
evaluated, scalingSummary, scalingSummary.keySet()));
evaluated, evaluated.keySet()));
}

@Test
public void testUtilizationBoundariesWithOptionalVertex() {
    // Verifies that only *required* vertices are checked against the utilization
    // target: optional (non-required) vertices must never influence the result of
    // ScalingExecutor.allRequiredVerticesWithinUtilizationTarget.

    // Restart time and catch-up duration are zeroed so they do not affect the
    // utilization boundary.
    var conf = context.getConfiguration();
    conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
    conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
    var op1 = new JobVertexID();
    var op2 = new JobVertexID();

    // All vertices are optional
    conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
    conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.);

    var evaluated =
            Map.of(
                    op1, evaluated(1, 70, 100),
                    op2, evaluated(1, 85, 100));

    // Empty required set: the check returns true (rescaling is skipped) even
    // though both vertices are outside the utilization target.
    assertTrue(ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(evaluated, Set.of()));

    // One vertex is required, and it's out of range.
    assertFalse(
            ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(evaluated, Set.of(op1)));

    // One vertex is required, and it's within the range.
    // op2 is optional, so it shouldn't affect the scaling even if it is out of range.
    conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
    evaluated =
            Map.of(
                    op1, evaluated(1, 65, 100),
                    op2, evaluated(1, 85, 100));
    assertTrue(
            ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(evaluated, Set.of(op1)));
}

@Test
Expand All @@ -203,17 +224,9 @@ public void testNoScaleDownOnZeroLowerUtilizationBoundary() throws Exception {
Map.of(vertex, evaluated(parallelism, targetRate, trueProcessingRate)),
dummyGlobalMetrics);

// Verify precondition
var scalingSummary =
Map.of(
vertex,
new ScalingSummary(
parallelism,
expectedParallelism,
evaluated.getVertexMetrics().get(vertex)));
assertTrue(
ScalingExecutor.allRequiredVerticesWithinUtilizationTarget(
evaluated.getVertexMetrics(), scalingSummary, scalingSummary.keySet()));
evaluated.getVertexMetrics(), evaluated.getVertexMetrics().keySet()));

// Execute the full scaling path
var now = Instant.now();
Expand Down

0 comments on commit 24e5f3a

Please sign in to comment.