Skip to content

Commit

Permalink
Address Gyula's comment
Browse files Browse the repository at this point in the history
  • Loading branch information
1996fanrui committed Aug 30, 2024
1 parent 68c8c46 commit b6726b5
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,28 @@ public JobVertexScaler(AutoScalerEventHandler<KEY, Context> autoScalerEventHandl
this.autoScalerEventHandler = autoScalerEventHandler;
}

/** The parallelism change type of {@link ParallelismChange}. */
public enum ParallelismChangeType {
NO_CHANGE,
REQUIRED_CHANGE,
OPTIONAL_CHANGE;
}

/**
* The rescaling will be triggered if any vertex's ParallelismResult is required. This means
* that if all vertices' ParallelismResult is optional, rescaling will be ignored.
* The rescaling will be triggered if any vertex's ParallelismChange is required. This means
* that if all vertices' ParallelismChange is optional, rescaling will be ignored.
*/
@Getter
public static class ParallelismResult {
public static class ParallelismChange {

private static final ParallelismChange NO_CHANGE =
new ParallelismChange(ParallelismChangeType.NO_CHANGE, -1);

// This rescale is required or optional.
private final boolean required;
private final ParallelismChangeType changeType;
private final int newParallelism;

private ParallelismResult(boolean required, int newParallelism) {
this.required = required;
private ParallelismChange(ParallelismChangeType changeType, int newParallelism) {
this.changeType = changeType;
this.newParallelism = newParallelism;
}

Expand All @@ -98,35 +107,39 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
ParallelismResult that = (ParallelismResult) o;
return required == that.required && newParallelism == that.newParallelism;
ParallelismChange that = (ParallelismChange) o;
return changeType == that.changeType && newParallelism == that.newParallelism;
}

@Override
public int hashCode() {
return Objects.hash(required, newParallelism);
return Objects.hash(changeType, newParallelism);
}

@Override
public String toString() {
return "ParallelismResult{"
+ "required="
+ required
return "ParallelismChange{"
+ "changeType="
+ changeType
+ ", newParallelism="
+ newParallelism
+ '}';
}

public static ParallelismResult required(int newParallelism) {
return new ParallelismResult(true, newParallelism);
public static ParallelismChange required(int newParallelism) {
return new ParallelismChange(ParallelismChangeType.REQUIRED_CHANGE, newParallelism);
}

public static ParallelismChange optional(int newParallelism) {
return new ParallelismChange(ParallelismChangeType.OPTIONAL_CHANGE, newParallelism);
}

public static ParallelismResult optional(int newParallelism) {
return new ParallelismResult(false, newParallelism);
public static ParallelismChange noChange() {
return NO_CHANGE;
}
}

public ParallelismResult computeScaleTargetParallelism(
public ParallelismChange computeScaleTargetParallelism(
Context context,
JobVertexID vertex,
Collection<ShipStrategy> inputShipStrategies,
Expand All @@ -141,7 +154,7 @@ public ParallelismResult computeScaleTargetParallelism(
LOG.warn(
"True processing rate is not available for {}, cannot compute new parallelism",
vertex);
return ParallelismResult.optional(currentParallelism);
return ParallelismChange.noChange();
}

double targetCapacity =
Expand All @@ -151,7 +164,7 @@ public ParallelismResult computeScaleTargetParallelism(
LOG.warn(
"Target data rate is not available for {}, cannot compute new parallelism",
vertex);
return ParallelismResult.optional(currentParallelism);
return ParallelismChange.noChange();
}

LOG.debug("Target processing capacity for {} is {}", vertex, targetCapacity);
Expand Down Expand Up @@ -191,7 +204,7 @@ public ParallelismResult computeScaleTargetParallelism(
// Clear delayed scale down request if the new parallelism is equal to
// currentParallelism.
delayedScaleDown.clearVertex(vertex);
return ParallelismResult.optional(currentParallelism);
return ParallelismChange.noChange();
}

// We record our expectations for this scaling operation
Expand All @@ -210,7 +223,7 @@ public ParallelismResult computeScaleTargetParallelism(
delayedScaleDown);
}

private ParallelismResult detectBlockScaling(
private ParallelismChange detectBlockScaling(
Context context,
JobVertexID vertex,
Configuration conf,
Expand All @@ -232,7 +245,7 @@ private ParallelismResult detectBlockScaling(

// If we don't have past scaling actions for this vertex, don't block scale up.
if (history.isEmpty()) {
return ParallelismResult.required(newParallelism);
return ParallelismChange.required(newParallelism);
}

var lastSummary = history.get(history.lastKey());
Expand All @@ -241,38 +254,38 @@ private ParallelismResult detectBlockScaling(
&& detectIneffectiveScaleUp(
context, vertex, conf, evaluatedMetrics, lastSummary)) {
// Block scale up when last rescale is ineffective.
return ParallelismResult.optional(currentParallelism);
return ParallelismChange.noChange();
}

return ParallelismResult.required(newParallelism);
return ParallelismChange.required(newParallelism);
} else {
return detectImmediateScaleDown(delayedScaleDown, vertex, conf, newParallelism);
return applyScaleDownInterval(delayedScaleDown, vertex, conf, newParallelism);
}
}

private ParallelismResult detectImmediateScaleDown(
private ParallelismChange applyScaleDownInterval(
DelayedScaleDown delayedScaleDown,
JobVertexID vertex,
Configuration conf,
int newParallelism) {
var scaleDownInterval = conf.get(SCALE_DOWN_INTERVAL);
if (scaleDownInterval.toMillis() <= 0) {
// The scale down interval is disable, so don't block scaling.
return ParallelismResult.required(newParallelism);
return ParallelismChange.required(newParallelism);
}

var firstTriggerTime = delayedScaleDown.getFirstTriggerTimeForVertex(vertex);
if (firstTriggerTime.isEmpty()) {
LOG.info("The scale down request is delayed for {}", vertex);
LOG.info("The scale down of {} is delayed by {}.", vertex, scaleDownInterval);
delayedScaleDown.updateTriggerTime(vertex, clock.instant());
return ParallelismResult.optional(newParallelism);
return ParallelismChange.optional(newParallelism);
}

if (clock.instant().isBefore(firstTriggerTime.get().plus(scaleDownInterval))) {
LOG.debug("Try to skip immediate scale down within scale-down interval for {}", vertex);
return ParallelismResult.optional(newParallelism);
return ParallelismChange.optional(newParallelism);
} else {
return ParallelismResult.required(newParallelism);
return ParallelismChange.required(newParallelism);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import java.util.Set;
import java.util.SortedMap;

import static org.apache.flink.autoscaler.JobVertexScaler.ParallelismChangeType.NO_CHANGE;
import static org.apache.flink.autoscaler.JobVertexScaler.ParallelismChangeType.REQUIRED_CHANGE;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.EXCLUDED_PERIODS;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
Expand Down Expand Up @@ -182,7 +184,7 @@ private void updateRecommendedParallelism(
static boolean allRequiredVerticesWithinUtilizationTarget(
Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
Set<JobVertexID> requiredVertices) {
// All vertices' ParallelismResult is optional, rescaling will be ignored.
// All vertices' ParallelismChange is optional, rescaling will be ignored.
if (requiredVertices.isEmpty()) {
return true;
}
Expand Down Expand Up @@ -248,7 +250,7 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
var currentParallelism =
(int) metrics.get(ScalingMetric.PARALLELISM).getCurrent();

var parallelismResult =
var parallelismChange =
jobVertexScaler.computeScaleTargetParallelism(
context,
v,
Expand All @@ -258,17 +260,16 @@ Map<JobVertexID, ScalingSummary> computeScalingSummary(
v, Collections.emptySortedMap()),
restartTime,
delayedScaleDown);
if (currentParallelism == parallelismResult.getNewParallelism()) {
if (NO_CHANGE == parallelismChange.getChangeType()) {
return;
}
if (parallelismResult.isRequired()) {
} else if (REQUIRED_CHANGE == parallelismChange.getChangeType()) {
requiredVertices.add(v);
}
out.put(
v,
new ScalingSummary(
currentParallelism,
parallelismResult.getNewParallelism(),
parallelismChange.getNewParallelism(),
metrics));
}
});
Expand Down
Loading

0 comments on commit b6726b5

Please sign in to comment.