Skip to content

Commit

Permalink
[FLINK-34504][autoscaler] Avoid the parallelism adjustment when the u…
Browse files Browse the repository at this point in the history
…pstream shuffle type doesn't have keyBy
  • Loading branch information
1996fanrui committed Feb 23, 2024
1 parent 304fca8 commit e9a9d02
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public JobVertexScaler(AutoScalerEventHandler<KEY, Context> autoScalerEventHandl
public int computeScaleTargetParallelism(
Context context,
JobVertexID vertex,
boolean hasKeyBy,
Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
SortedMap<Instant, ScalingSummary> history,
Duration restartTime) {
Expand Down Expand Up @@ -124,7 +125,8 @@ public int computeScaleTargetParallelism(
(int) evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
scaleFactor,
Math.min(currentParallelism, conf.getInteger(VERTEX_MIN_PARALLELISM)),
Math.max(currentParallelism, conf.getInteger(VERTEX_MAX_PARALLELISM)));
Math.max(currentParallelism, conf.getInteger(VERTEX_MAX_PARALLELISM)),
hasKeyBy);

if (newParallelism == currentParallelism
|| blockScalingBasedOnPastActions(
Expand Down Expand Up @@ -251,7 +253,8 @@ protected static int scale(
int numKeyGroups,
double scaleFactor,
int minParallelism,
int maxParallelism) {
int maxParallelism,
boolean hasKeyBy) {
Preconditions.checkArgument(
minParallelism <= maxParallelism,
"The minimum parallelism must not be greater than the maximum parallelism.");
Expand Down Expand Up @@ -280,11 +283,14 @@ protected static int scale(
// Apply min/max parallelism
newParallelism = Math.min(Math.max(minParallelism, newParallelism), upperBound);

// Try to adjust the parallelism such that it divides the number of key groups without a
// remainder => state is evenly spread across subtasks
for (int p = newParallelism; p <= numKeyGroups / 2 && p <= upperBound; p++) {
if (numKeyGroups % p == 0) {
return p;
// When the shuffle type of vertex data source contains keyBy, we try to adjust the
// parallelism such that it divides the number of key groups without a remainder =>
// data is evenly spread across subtasks
if (hasKeyBy) {
for (int p = newParallelism; p <= numKeyGroups / 2 && p <= upperBound; p++) {
if (numKeyGroups % p == 0) {
return p;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ public boolean scaleResource(
var restartTime = scalingTracking.getMaxRestartTimeOrDefault(conf);

var scalingSummaries =
computeScalingSummary(context, evaluatedMetrics, scalingHistory, restartTime);
computeScalingSummary(
context, evaluatedMetrics, scalingHistory, restartTime, jobTopology);

if (scalingSummaries.isEmpty()) {
LOG.info("All job vertices are currently running at their target parallelism.");
Expand Down Expand Up @@ -203,7 +204,8 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
Context context,
EvaluatedMetrics evaluatedMetrics,
Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory,
Duration restartTime) {
Duration restartTime,
JobTopology jobTopology) {
LOG.debug("Restart time used in scaling summary computation: {}", restartTime);

if (isJobUnderMemoryPressure(context, evaluatedMetrics.getGlobalMetrics())) {
Expand All @@ -225,10 +227,14 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
} else {
var currentParallelism =
(int) metrics.get(ScalingMetric.PARALLELISM).getCurrent();

final boolean hasKeyBy =
jobTopology.get(v).getInputs().containsValue("HASH");
var newParallelism =
jobVertexScaler.computeScaleTargetParallelism(
context,
v,
hasKeyBy,
metrics,
scalingHistory.getOrDefault(
v, Collections.emptySortedMap()),
Expand Down
Loading

0 comments on commit e9a9d02

Please sign in to comment.