diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/SpoolingExchangeOutputBuffer.java b/core/trino-main/src/main/java/io/trino/execution/buffer/SpoolingExchangeOutputBuffer.java index 5292967518a93..61cc306a33399 100644 --- a/core/trino-main/src/main/java/io/trino/execution/buffer/SpoolingExchangeOutputBuffer.java +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/SpoolingExchangeOutputBuffer.java @@ -192,16 +192,17 @@ public void enqueue(int partition, List pages) ExchangeSink sink = exchangeSink; checkState(sink != null, "exchangeSink is null"); long dataSizeInBytes = 0; + long addedPositions = 0; for (Slice page : pages) { dataSizeInBytes += getSerializedPageUncompressedSizeInBytes(page); + addedPositions += getSerializedPagePositionCount(page); sink.add(partition, page); - int serializedPagePositionCount = getSerializedPagePositionCount(page); - totalRowsAdded.addAndGet(serializedPagePositionCount); - outputStats.updateRowCount(serializedPagePositionCount); } - updateMemoryUsage(sink.getMemoryUsage()); totalPagesAdded.addAndGet(pages.size()); + totalRowsAdded.addAndGet(addedPositions); + outputStats.updateRowCount(addedPositions); outputStats.updatePartitionDataSize(partition, dataSizeInBytes); + updateMemoryUsage(sink.getMemoryUsage()); } @Override diff --git a/core/trino-main/src/main/java/io/trino/execution/buffer/SpoolingOutputStats.java b/core/trino-main/src/main/java/io/trino/execution/buffer/SpoolingOutputStats.java index e9ed348c2997b..491b9e4aa206e 100644 --- a/core/trino-main/src/main/java/io/trino/execution/buffer/SpoolingOutputStats.java +++ b/core/trino-main/src/main/java/io/trino/execution/buffer/SpoolingOutputStats.java @@ -34,16 +34,15 @@ public class SpoolingOutputStats { private final int partitionCount; + private final AtomicLong rowCount = new AtomicLong(); private volatile AtomicLongArray partitionDataSizes; private volatile Snapshot finalSnapshot; - private volatile AtomicLong rowCount; public SpoolingOutputStats(int partitionCount) { checkArgument(partitionCount > 0, "partitionCount must be greater than zero"); this.partitionCount = partitionCount; partitionDataSizes = new AtomicLongArray(partitionCount); - rowCount = new AtomicLong(); } public void updatePartitionDataSize(int partition, long dataSizeInBytes) @@ -56,7 +55,7 @@ public void updatePartitionDataSize(int partition, long dataSizeInBytes) } } - public void updateRowCount(int rowCount) + public void updateRowCount(long rowCount) { this.rowCount.addAndGet(rowCount); }