Skip to content

Commit

Permalink
Address Max's comment
Browse files Browse the repository at this point in the history
  • Loading branch information
1996fanrui committed Feb 23, 2024
1 parent 4fd2654 commit 466c1fa
Showing 1 changed file with 14 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -255,23 +255,27 @@ private static MemorySize adjustNetworkMemory(
for (VertexInfo vertexInfo : jobTopology.getVertexInfos().values()) {
// Add max amount of memory for each input gate
for (Map.Entry<JobVertexID, String> inputEntry : vertexInfo.getInputs().entrySet()) {
final JobVertexID inputVertexId = inputEntry.getKey();
final String shipStrategy = inputEntry.getValue();
maxNetworkMemory +=
calculateNetworkSegmentNumber(
updatedParallelisms.get(vertexInfo.getId()),
updatedParallelisms.get(inputEntry.getKey()),
inputEntry.getValue(),
updatedParallelisms.get(inputVertexId),
shipStrategy,
buffersPerChannel,
floatingBuffers)
* memorySegmentBytes;
}
// Add max amount of memory for each output gate
// Usually, there is just one output per task
for (Map.Entry<JobVertexID, String> outputEntry : vertexInfo.getOutputs().entrySet()) {
final JobVertexID outputVertexId = outputEntry.getKey();
final String shipStrategy = outputEntry.getValue();
maxNetworkMemory +=
calculateNetworkSegmentNumber(
updatedParallelisms.get(vertexInfo.getId()),
updatedParallelisms.get(outputEntry.getKey()),
outputEntry.getValue(),
updatedParallelisms.get(outputVertexId),
shipStrategy,
buffersPerChannel,
floatingBuffers)
* memorySegmentBytes;
Expand All @@ -290,25 +294,26 @@ private static MemorySize adjustNetworkMemory(
* Calculate how many network segment current vertex needs.
*
* @param currentVertexParallelism The parallelism of current vertex.
* @param otherVertexParallelism The parallelism of other vertex.
* @param connectedVertexParallelism The parallelism of connected vertex.
*/
@VisibleForTesting
static int calculateNetworkSegmentNumber(
int currentVertexParallelism,
int otherVertexParallelism,
int connectedVertexParallelism,
String shipStrategy,
int buffersPerChannel,
int floatingBuffers) {
// TODO When the parallelism is changed via the rescale api, the FORWARD may be changed to
// RESCALE. This logic may needs to be updated after FLINK-33123.
if (currentVertexParallelism == otherVertexParallelism && "FORWARD".equals(shipStrategy)) {
if (currentVertexParallelism == connectedVertexParallelism
&& "FORWARD".equals(shipStrategy)) {
return buffersPerChannel + floatingBuffers;
} else if ("FORWARD".equals(shipStrategy) || "RESCALE".equals(shipStrategy)) {
final int channelCount =
(int) Math.ceil(1.0d * otherVertexParallelism / currentVertexParallelism);
(int) Math.ceil(connectedVertexParallelism / (double) currentVertexParallelism);
return channelCount * buffersPerChannel + floatingBuffers;
} else {
return otherVertexParallelism * buffersPerChannel + floatingBuffers;
return connectedVertexParallelism * buffersPerChannel + floatingBuffers;
}
}

Expand Down

0 comments on commit 466c1fa

Please sign in to comment.