Skip to content

Commit

Permalink
Address Max's comment
Browse files Browse the repository at this point in the history
  • Loading branch information
1996fanrui committed Feb 23, 2024
1 parent 4fd2654 commit 80be610
Showing 1 changed file with 9 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -255,23 +255,27 @@ private static MemorySize adjustNetworkMemory(
for (VertexInfo vertexInfo : jobTopology.getVertexInfos().values()) {
// Add max amount of memory for each input gate
for (Map.Entry<JobVertexID, String> inputEntry : vertexInfo.getInputs().entrySet()) {
final JobVertexID inputVertexId = inputEntry.getKey();
final String shipStrategy = inputEntry.getValue();
maxNetworkMemory +=
calculateNetworkSegmentNumber(
updatedParallelisms.get(vertexInfo.getId()),
updatedParallelisms.get(inputEntry.getKey()),
inputEntry.getValue(),
updatedParallelisms.get(inputVertexId),
shipStrategy,
buffersPerChannel,
floatingBuffers)
* memorySegmentBytes;
}
// Add max amount of memory for each output gate
// Usually, there is just one output per task
for (Map.Entry<JobVertexID, String> outputEntry : vertexInfo.getOutputs().entrySet()) {
final JobVertexID outputVertexId = outputEntry.getKey();
final String shipStrategy = outputEntry.getValue();
maxNetworkMemory +=
calculateNetworkSegmentNumber(
updatedParallelisms.get(vertexInfo.getId()),
updatedParallelisms.get(outputEntry.getKey()),
outputEntry.getValue(),
updatedParallelisms.get(outputVertexId),
shipStrategy,
buffersPerChannel,
floatingBuffers)
* memorySegmentBytes;
Expand Down Expand Up @@ -305,7 +309,7 @@ static int calculateNetworkSegmentNumber(
return buffersPerChannel + floatingBuffers;
} else if ("FORWARD".equals(shipStrategy) || "RESCALE".equals(shipStrategy)) {
final int channelCount =
(int) Math.ceil(1.0d * otherVertexParallelism / currentVertexParallelism);
(int) Math.ceil(otherVertexParallelism / (double) currentVertexParallelism);
return channelCount * buffersPerChannel + floatingBuffers;
} else {
return otherVertexParallelism * buffersPerChannel + floatingBuffers;
Expand Down

0 comments on commit 80be610

Please sign in to comment.