Skip to content

Commit

Permalink
Prepare the JobVertexScalerTest
Browse files Browse the repository at this point in the history
  • Loading branch information
1996fanrui committed Aug 28, 2024
1 parent 6e67526 commit b06fe14
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.time.ZoneId;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.SortedMap;

import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
Expand Down Expand Up @@ -89,6 +90,33 @@ private ParallelismResult(boolean required, int newParallelism) {
this.newParallelism = newParallelism;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ParallelismResult that = (ParallelismResult) o;
return required == that.required && newParallelism == that.newParallelism;
}

@Override
public int hashCode() {
return Objects.hash(required, newParallelism);
}

@Override
public String toString() {
return "ParallelismResult{"
+ "required="
+ required
+ ", newParallelism="
+ newParallelism
+ '}';
}

public static ParallelismResult required(int newParallelism) {
return new ParallelismResult(true, newParallelism);
}
Expand Down Expand Up @@ -166,7 +194,6 @@ public ParallelismResult computeScaleTargetParallelism(
return ParallelismResult.optional(currentParallelism);
}

// TODO The rescaling may not happen, what happen if we always put it.
// We record our expectations for this scaling operation
evaluatedMetrics.put(
ScalingMetric.EXPECTED_PROCESSING_RATE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,12 +275,15 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
}
});

// TODO : 4. all tasks with parallelism changing are optional
// All vertices' ParallelismResult is optional, rescaling will be ignored.
if (requiredVertices.isEmpty()) {
return Map.of();
}

// TODO : add test for only optional tasks are out of scope.
// TODO : 1. add test for only optional tasks are out of scope.
// TODO : 2. all task are out of scope.
// TODO : 3. one required task is out of scope.
// If the Utilization of all required tasks is within range, we can skip scaling.
// It means that if only optional tasks are out of scope, we still need to ignore scale.
if (allRequiredVerticesWithinUtilizationTarget(
Expand Down
Loading

0 comments on commit b06fe14

Please sign in to comment.