Skip to content

Commit

Permalink
Cleanup SpoolingOutputStats
Browse files Browse the repository at this point in the history
Makes rowCount final instead of a volatile AtomicLong, and avoids
repeated atomic updates inside of the SpoolingExchangeOutputBuffer in
favor of a single final update outside of the loop.
  • Loading branch information
pettyjamesm authored and wendigo committed Sep 19, 2024
1 parent f2f9d9e commit 9bdfe96
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,16 +192,17 @@ public void enqueue(int partition, List<Slice> pages)
ExchangeSink sink = exchangeSink;
checkState(sink != null, "exchangeSink is null");
long dataSizeInBytes = 0;
long addedPositions = 0;
for (Slice page : pages) {
dataSizeInBytes += getSerializedPageUncompressedSizeInBytes(page);
addedPositions += getSerializedPagePositionCount(page);
sink.add(partition, page);
int serializedPagePositionCount = getSerializedPagePositionCount(page);
totalRowsAdded.addAndGet(serializedPagePositionCount);
outputStats.updateRowCount(serializedPagePositionCount);
}
updateMemoryUsage(sink.getMemoryUsage());
totalPagesAdded.addAndGet(pages.size());
totalRowsAdded.addAndGet(addedPositions);
outputStats.updateRowCount(addedPositions);
outputStats.updatePartitionDataSize(partition, dataSizeInBytes);
updateMemoryUsage(sink.getMemoryUsage());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,15 @@
public class SpoolingOutputStats
{
private final int partitionCount;
private final AtomicLong rowCount = new AtomicLong();
private volatile AtomicLongArray partitionDataSizes;
private volatile Snapshot finalSnapshot;
private volatile AtomicLong rowCount;

public SpoolingOutputStats(int partitionCount)
{
checkArgument(partitionCount > 0, "partitionCount must be greater than zero");
this.partitionCount = partitionCount;
partitionDataSizes = new AtomicLongArray(partitionCount);
rowCount = new AtomicLong();
}

public void updatePartitionDataSize(int partition, long dataSizeInBytes)
Expand All @@ -56,7 +55,7 @@ public void updatePartitionDataSize(int partition, long dataSizeInBytes)
}
}

public void updateRowCount(int rowCount)
public void updateRowCount(long rowCount)
{
this.rowCount.addAndGet(rowCount);
}
Expand Down

0 comments on commit 9bdfe96

Please sign in to comment.