From 8104378b6fef74e3501212d2ea71f7c80e348b7c Mon Sep 17 00:00:00 2001 From: vaibhav5140 Date: Tue, 1 Jul 2025 08:59:04 +0100 Subject: [PATCH 1/5] IoStatistics for AAL --- .../fs/statistics/StreamStatisticNames.java | 6 + .../hadoop/fs/s3a/S3AInstrumentation.java | 11 ++ .../org/apache/hadoop/fs/s3a/Statistic.java | 8 + .../streams/AnalyticsRequestCallback.java | 30 ++++ .../fs/s3a/impl/streams/AnalyticsStream.java | 27 +++- .../statistics/S3AInputStreamStatistics.java | 9 +- .../impl/EmptyS3AStatisticsContext.java | 7 +- ...tS3AAnalyticsAcceleratorStreamReading.java | 148 +++++++++++++++++- .../ITestS3AContractStreamIOStatistics.java | 10 -- 9 files changed, 241 insertions(+), 15 deletions(-) create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java index 09c19ad071a98..35692a54c0f38 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java @@ -132,6 +132,12 @@ public final class StreamStatisticNames { public static final String STREAM_READ_OPERATIONS = "stream_read_operations"; + /** Analytics GET requests made by stream. */ + public static final String STREAM_READ_ANALYTICS_GET_REQUESTS = "stream_read_analytics_get_requests"; + + /** Analytics HEAD requests made by stream. */ + public static final String STREAM_READ_ANALYTICS_HEAD_REQUESTS = "stream_read_analytics_head_requests"; + /** * Count of readVectored() operations in an input stream. * Value: {@value}. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index b3c907428ac4e..f74987703a0b9 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -872,6 +872,8 @@ private InputStreamStatistics( StreamStatisticNames.STREAM_READ_CLOSE_OPERATIONS, StreamStatisticNames.STREAM_READ_OPENED, StreamStatisticNames.STREAM_READ_BYTES, + StreamStatisticNames.STREAM_READ_ANALYTICS_GET_REQUESTS, + StreamStatisticNames.STREAM_READ_ANALYTICS_HEAD_REQUESTS, StreamStatisticNames.STREAM_READ_EXCEPTIONS, StreamStatisticNames.STREAM_READ_FULLY_OPERATIONS, StreamStatisticNames.STREAM_READ_OPERATIONS, @@ -1128,6 +1130,15 @@ public void readVectoredBytesDiscarded(int discarded) { bytesDiscardedInVectoredIO.addAndGet(discarded); } + @Override + public void incrementAnalyticsGetRequests() { + increment(StreamStatisticNames.STREAM_READ_ANALYTICS_GET_REQUESTS); + } + @Override + public void incrementAnalyticsHeadRequests() { + increment(StreamStatisticNames.STREAM_READ_ANALYTICS_HEAD_REQUESTS); + } + @Override public void executorAcquired(Duration timeInQueue) { // update the duration fields in the IOStatistics. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java index 6389742167def..e9582b15c8045 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java @@ -459,6 +459,14 @@ public enum Statistic { "Gauge of active memory in use", TYPE_GAUGE), + ANALYTICS_GET_REQUESTS( + StreamStatisticNames.STREAM_READ_ANALYTICS_GET_REQUESTS, + "GET requests made by analytics streams", + TYPE_COUNTER), + ANALYTICS_HEAD_REQUESTS( + StreamStatisticNames.STREAM_READ_ANALYTICS_HEAD_REQUESTS, + "HEAD requests made by analytics streams", + TYPE_COUNTER), /* Stream Write statistics */ STREAM_WRITE_EXCEPTIONS( diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java new file mode 100644 index 0000000000000..0b3c8f8ee0b0d --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java @@ -0,0 +1,30 @@ +package org.apache.hadoop.fs.s3a.impl.streams; + +import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; +import software.amazon.s3.analyticsaccelerator.util.RequestCallback; + +/** + * Implementation of AAL's RequestCallback interface that tracks analytics operations. + */ +public class AnalyticsRequestCallback implements RequestCallback { + private final S3AInputStreamStatistics statistics; + + /** + * Create a new callback instance. + * @param statistics the statistics to update + */ + public AnalyticsRequestCallback(S3AInputStreamStatistics statistics) { + this.statistics = statistics; + } + + @Override + public void onGetRequest() { + statistics.incrementAnalyticsGetRequests(); + } + + @Override + public void onHeadRequest() { + statistics.incrementAnalyticsHeadRequests(); + } +} + diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java index 6b910c6538070..59add12483916 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java @@ -28,6 +28,7 @@ import software.amazon.s3.analyticsaccelerator.util.InputPolicy; import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; import software.amazon.s3.analyticsaccelerator.util.S3URI; +import software.amazon.s3.analyticsaccelerator.util.RequestCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,6 +64,8 @@ public AnalyticsStream(final ObjectReadParameters parameters, @Override public int read() throws IOException { throwIfClosed(); + getS3AStreamStatistics().readOperationStarted(getPos(), 1); + int bytesRead; try { bytesRead = inputStream.read(); @@ -70,6 +73,11 @@ public int read() throws IOException { onReadFailure(ioe); throw ioe; } + + if (bytesRead != -1) { + getS3AStreamStatistics().bytesRead(1); + } + return bytesRead; } @@ -105,6 +113,8 @@ public synchronized long getPos() { */ public int readTail(byte[] buf, int off, int len) throws IOException { throwIfClosed(); + getS3AStreamStatistics().readOperationStarted(getPos(), len); + int bytesRead; try { bytesRead = inputStream.readTail(buf, off, len); @@ -112,12 +122,19 @@ public int readTail(byte[] buf, int off, int len) throws IOException { onReadFailure(ioe); throw ioe; } + + if (bytesRead > 0) { + getS3AStreamStatistics().bytesRead(bytesRead); + } + return bytesRead; } @Override public int read(byte[] buf, int off, int len) throws IOException { throwIfClosed(); + getS3AStreamStatistics().readOperationStarted(getPos(), len); + int bytesRead; try { bytesRead = inputStream.read(buf, off, len); @@ -125,6 +142,11 @@ public int read(byte[] buf, int off, int len) throws IOException { onReadFailure(ioe); throw ioe; } + + if (bytesRead > 0) { + getS3AStreamStatistics().bytesRead(bytesRead); + } + return bytesRead; } @@ -194,10 +216,13 @@ private void onReadFailure(IOException ioe) throws IOException { } private OpenStreamInformation buildOpenStreamInformation(ObjectReadParameters parameters) { + + final RequestCallback requestCallback = new AnalyticsRequestCallback(getS3AStreamStatistics()); + OpenStreamInformation.OpenStreamInformationBuilder openStreamInformationBuilder = OpenStreamInformation.builder() .inputPolicy(mapS3AInputPolicyToAAL(parameters.getContext() - .getInputPolicy())); + .getInputPolicy())).requestCallback(requestCallback); if (parameters.getObjectAttributes().getETag() != null) { openStreamInformationBuilder.objectMetadata(ObjectMetadata.builder() diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java index 7ad7cf75367e2..34f0e72fe2861 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java @@ -190,7 +190,14 @@ void readVectoredOperationStarted(int numIncomingRanges, long getVersionMismatches(); long getInputPolicy(); - + /** + * Increment the counter for GET requests made by Analytics Accelerator Library. + */ + void incrementAnalyticsGetRequests(); + /** + * Increment the counter for HEAD requests made by Analytics Accelerator Library. + */ + void incrementAnalyticsHeadRequests(); /** * Get the value of a counter. * @param name counter name diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java index 26b9f2b1568ca..48739181efee6 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java @@ -159,7 +159,12 @@ public void seekForwards(final long skipped, final long bytesReadInSeek) { } - + @Override + public void incrementAnalyticsGetRequests() { + } + @Override + public void incrementAnalyticsHeadRequests() { + } @Override public long streamOpened() { return 0; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java index dff171bbdd8eb..d289e3640eae1 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java @@ -47,6 +47,11 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; + +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPERATIONS; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_GET_REQUESTS; +import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_HEAD_REQUESTS; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.ANALYTICS_STREAM_FACTORY_CLOSED; import static org.apache.hadoop.test.LambdaTestUtils.intercept; @@ -104,6 +109,12 @@ public void testConnectorFrameWorkIntegration() throws Throwable { Assertions.assertThat(objectInputStream.streamType()).isEqualTo(InputStreamType.Analytics); Assertions.assertThat(objectInputStream.getInputPolicy()) .isEqualTo(S3AInputPolicy.Sequential); + + verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, 500); + verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1); + + long streamBytesRead = objectInputStream.getS3AStreamStatistics().getBytesRead(); + Assertions.assertThat(streamBytesRead).as("Stream statistics should track bytes read").isEqualTo(500); } verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1); @@ -129,11 +140,17 @@ public void testMalformedParquetFooter() throws IOException { byte[] buffer = new byte[500]; IOStatistics ioStats; + int bytesRead; try (FSDataInputStream inputStream = getFileSystem().open(dest)) { ioStats = inputStream.getIOStatistics(); inputStream.seek(5); - inputStream.read(buffer, 0, 500); + bytesRead = inputStream.read(buffer, 0, 500); + + ObjectInputStream objectInputStream = (ObjectInputStream) inputStream.getWrappedStream(); + long streamBytesRead = objectInputStream.getS3AStreamStatistics().getBytesRead(); + Assertions.assertThat(streamBytesRead).as("Stream statistics should track bytes read").isEqualTo(bytesRead); + } verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1); @@ -166,15 +183,19 @@ public void testMultiRowGroupParquet() throws Throwable { } verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1); - + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1); try (FSDataInputStream inputStream = getFileSystem().openFile(dest) .must(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_PARQUET) .build().get()) { ioStats = inputStream.getIOStatistics(); inputStream.readFully(buffer, 0, (int) fileStatus.getLen()); + + verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, (int) fileStatus.getLen()); + verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1); } verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1); + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0); } @Test @@ -194,4 +215,127 @@ public void testInvalidConfigurationThrows() throws Exception { () -> S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration)); } + @Test + public void testLargeFileMultipleGets() throws Throwable { + describe("Large file should trigger multiple GET requests"); + + Path dest = writeThenReadFile("large-test-file.txt", 10 * 1024 * 1024); // 10MB + + + try (FSDataInputStream inputStream = getFileSystem().open(dest)) { + IOStatistics ioStats = inputStream.getIOStatistics(); + inputStream.readFully(new byte[(int) getFileSystem().getFileStatus(dest).getLen()]); + + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 2); + } + } + + @Test + public void testSmallFileSingleGet() throws Throwable { + describe("Small file should trigger only one GET request"); + + Path dest = writeThenReadFile("small-test-file.txt", 1 * 1024 * 1024); // 1KB + + try (FSDataInputStream inputStream = getFileSystem().open(dest)) { + IOStatistics ioStats = inputStream.getIOStatistics(); + inputStream.readFully(new byte[(int) getFileSystem().getFileStatus(dest).getLen()]); + + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1); + } + } + + + @Test + public void testRandomSeekPatternGets() throws Throwable { + describe("Random seek pattern should optimize GET requests"); + + Path dest = writeThenReadFile("seek-test.txt", 100 * 1024); + + try (FSDataInputStream inputStream = getFileSystem().open(dest)) { + IOStatistics ioStats = inputStream.getIOStatistics(); + + inputStream.seek(1000); + inputStream.read(new byte[100]); + + inputStream.seek(50000); + inputStream.read(new byte[100]); + + inputStream.seek(90000); + inputStream.read(new byte[100]); + + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1); + } +} + + @Test + public void testAALNeverMakesHeadRequests() throws Throwable { + describe("Prove AAL never makes HEAD requests - S3A provides all metadata"); + + Path dest = writeThenReadFile("no-head-test.txt", 1024 * 1024); // 1MB + + try (FSDataInputStream inputStream = getFileSystem().open(dest)) { + IOStatistics ioStats = inputStream.getIOStatistics(); + inputStream.read(new byte[1024]); + + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0); + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1); + + ObjectInputStream objectInputStream = (ObjectInputStream) inputStream.getWrappedStream(); + Assertions.assertThat(objectInputStream.streamType()).isEqualTo(InputStreamType.Analytics); + + } + } + + + @Test + public void testParquetReadingNoHeadRequests() throws Throwable { + describe("Parquet-optimized reading should not trigger AAL HEAD requests"); + + Path dest = path("parquet-head-test.parquet"); + File file = new File("src/test/resources/multi_row_group.parquet"); + Path sourcePath = new Path(file.toURI().getPath()); + getFileSystem().copyFromLocalFile(false, true, sourcePath, dest); + + try (FSDataInputStream stream = getFileSystem().openFile(dest) + .must(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_PARQUET) + .build().get()) { + + FileStatus fileStatus = getFileSystem().getFileStatus(dest); + stream.readFully(new byte[(int) fileStatus.getLen()]); + + IOStatistics stats = stream.getIOStatistics(); + + verifyStatisticCounterValue(stats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0); + verifyStatisticCounterValue(stats, STREAM_READ_ANALYTICS_OPENED, 1); + + verifyStatisticCounterValue(stats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1); + } + } + + + @Test + public void testConcurrentStreamsNoDuplicateGets() throws Throwable { + describe("Concurrent streams reading same object should not duplicate GETs"); + + Path dest = writeThenReadFile("concurrent-test.txt", 1 * 1024 * 1024); + + try (FSDataInputStream stream1 = getFileSystem().open(dest); + FSDataInputStream stream2 = getFileSystem().open(dest)) { + + byte[] buffer1 = new byte[1024]; + byte[] buffer2 = new byte[1024]; + + stream1.read(buffer1); + stream2.read(buffer2); + + IOStatistics stats1 = stream1.getIOStatistics(); + IOStatistics stats2 = stream2.getIOStatistics(); + + long totalGets = stats1.counters().getOrDefault( + STREAM_READ_ANALYTICS_GET_REQUESTS, 0L) + + stats2.counters().getOrDefault( + STREAM_READ_ANALYTICS_GET_REQUESTS, 0L); + Assertions.assertThat(totalGets).isEqualTo(1); + } + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java index da2a39a986ea4..2bf35ad19704b 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java @@ -27,7 +27,6 @@ import org.apache.hadoop.fs.contract.s3a.S3AContract; import org.apache.hadoop.fs.statistics.StreamStatisticNames; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.*; /** @@ -78,13 +77,4 @@ public List outputStreamStatisticKeys() { STREAM_WRITE_BLOCK_UPLOADS, STREAM_WRITE_EXCEPTIONS); } - - @Override - public void testInputStreamStatisticRead() throws Throwable { - // Analytics accelerator currently does not support IOStatistics, this will be added as - // part of https://issues.apache.org/jira/browse/HADOOP-19364 - skipIfAnalyticsAcceleratorEnabled(getContract().getConf(), - "Analytics Accelerator currently does not support stream statistics"); - super.testInputStreamStatisticRead(); - } } From 284b97dfa47c01dfea5418e65a3ae7fbcc765a0e Mon Sep 17 00:00:00 2001 From: vaibhav5140 Date: Wed, 2 Jul 2025 15:46:34 +0100 Subject: [PATCH 2/5] Enabled tests[IOStats] --- .../streams/AnalyticsRequestCallback.java | 22 ++++ .../fs/s3a/impl/streams/AnalyticsStream.java | 36 +++++- ...tS3AAnalyticsAcceleratorStreamReading.java | 119 +++++++----------- .../fs/s3a/ITestS3AIOStatisticsContext.java | 5 +- .../apache/hadoop/fs/s3a/ITestS3AMetrics.java | 4 - .../s3a/commit/ITestCommitOperationCost.java | 9 +- .../ITestS3AFileContextStatistics.java | 4 - .../fs/s3a/performance/ITestS3AOpenCost.java | 33 +++-- .../ITestS3AFileSystemStatistic.java | 4 - 9 files changed, 127 insertions(+), 109 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java index 0b3c8f8ee0b0d..ec36b9936c7da 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java @@ -1,6 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.fs.s3a.impl.streams; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; +import org.apache.hadoop.fs.statistics.DurationTracker; import software.amazon.s3.analyticsaccelerator.util.RequestCallback; /** @@ -20,6 +39,9 @@ public AnalyticsRequestCallback(S3AInputStreamStatistics statistics) { @Override public void onGetRequest() { statistics.incrementAnalyticsGetRequests(); + // Update ACTION_HTTP_GET_REQUEST statistic + DurationTracker tracker = statistics.initiateGetRequest(); + tracker.close(); } @Override diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java index 59add12483916..c0d61113b0abe 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java @@ -49,6 +49,8 @@ public class AnalyticsStream extends ObjectInputStream implements StreamCapabili private S3SeekableInputStream inputStream; private long lastReadCurrentPos = 0; private volatile boolean closed; + private final long contentLength; + private final long lengthLimit; public static final Logger LOG = LoggerFactory.getLogger(AnalyticsStream.class); @@ -56,6 +58,8 @@ public AnalyticsStream(final ObjectReadParameters parameters, final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) throws IOException { super(InputStreamType.Analytics, parameters); S3ObjectAttributes s3Attributes = parameters.getObjectAttributes(); + this.contentLength = s3Attributes.getLen(); + this.lengthLimit = s3Attributes.getLen(); this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(), s3Attributes.getKey()), buildOpenStreamInformation(parameters)); getS3AStreamStatistics().streamOpened(InputStreamType.Analytics); @@ -64,6 +68,9 @@ public AnalyticsStream(final ObjectReadParameters parameters, @Override public int read() throws IOException { throwIfClosed(); + if (getPos() >= lengthLimit) { + return -1; // EOF reached due to length limit + } getS3AStreamStatistics().readOperationStarted(getPos(), 1); int bytesRead; @@ -75,7 +82,7 @@ public int read() throws IOException { } if (bytesRead != -1) { - getS3AStreamStatistics().bytesRead(1); + incrementBytesRead(1); } return bytesRead; @@ -124,7 +131,7 @@ public int readTail(byte[] buf, int off, int len) throws IOException { } if (bytesRead > 0) { - getS3AStreamStatistics().bytesRead(bytesRead); + incrementBytesRead(bytesRead); } return bytesRead; @@ -133,18 +140,25 @@ public int readTail(byte[] buf, int off, int len) throws IOException { @Override public int read(byte[] buf, int off, int len) throws IOException { throwIfClosed(); - getS3AStreamStatistics().readOperationStarted(getPos(), len); + long pos = getPos(); + if (pos >= lengthLimit) { + return -1; // EOF reached due to length limit + } + + // Limit read length to not exceed the length limit + int maxRead = (int) Math.min(len, lengthLimit - pos); + getS3AStreamStatistics().readOperationStarted(pos, maxRead); int bytesRead; try { - bytesRead = inputStream.read(buf, off, len); + bytesRead = inputStream.read(buf, off, maxRead); } catch (IOException ioe) { onReadFailure(ioe); throw ioe; } if (bytesRead > 0) { - getS3AStreamStatistics().bytesRead(bytesRead); + incrementBytesRead(bytesRead); } return bytesRead; @@ -260,4 +274,16 @@ protected void throwIfClosed() throws IOException { throw new IOException(getKey() + ": " + FSExceptionMessages.STREAM_IS_CLOSED); } } + + /** + * Increment the bytes read counter if there is a stats instance + * and the number of bytes read is more than zero. + * @param bytesRead number of bytes read + */ + private void incrementBytesRead(long bytesRead) { + getS3AStreamStatistics().bytesRead(bytesRead); + if (getContext().getStats() != null && bytesRead > 0) { + getContext().getStats().incrementBytesRead(bytesRead); + } + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java index d289e3640eae1..a6b4be7330fd0 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java @@ -42,6 +42,8 @@ import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE; +import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset; +import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX; import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; @@ -54,6 +56,8 @@ import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_HEAD_REQUESTS; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.ANALYTICS_STREAM_FACTORY_CLOSED; +import static org.apache.hadoop.io.Sizes.S_1K; +import static org.apache.hadoop.io.Sizes.S_1M; import static org.apache.hadoop.test.LambdaTestUtils.intercept; /** @@ -154,6 +158,9 @@ public void testMalformedParquetFooter() throws IOException { } verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1); + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1); + // S3A passes in the meta data on file open, we expect AAL to make no HEAD requests + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0); } /** @@ -195,6 +202,7 @@ public void testMultiRowGroupParquet() throws Throwable { } verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1); + // S3A passes in the meta-data(content length) on file open, we expect AAL to make no HEAD requests verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0); } @@ -215,18 +223,29 @@ public void testInvalidConfigurationThrows() throws Exception { () -> S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration)); } + /** + * + * TXT files are classified as SEQUENTIAL format and use SequentialPrefetcher(requests the entire 10MB file) + * RangeOptimiser splits ranges larger than maxRangeSizeBytes (8MB) using partSizeBytes (8MB) + * The 10MB range gets split into: [0-8MB) and [8MB-10MB) + * Each split range becomes a separate Block, resulting in 2 GET requests: + */ @Test public void testLargeFileMultipleGets() throws Throwable { describe("Large file should trigger multiple GET requests"); - Path dest = writeThenReadFile("large-test-file.txt", 10 * 1024 * 1024); // 10MB - + Path dest = path("large-test-file.txt"); + byte[] data = dataset(10 * S_1M, 256, 255); + writeDataset(getFileSystem(), dest, data, 10 * S_1M, 1024, true); + byte[] buffer = new byte[S_1M * 10]; try (FSDataInputStream inputStream = getFileSystem().open(dest)) { IOStatistics ioStats = inputStream.getIOStatistics(); - inputStream.readFully(new byte[(int) getFileSystem().getFileStatus(dest).getLen()]); + inputStream.readFully(buffer); verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 2); + // Because S3A passes in the meta-data(content length) on file open, we expect AAL to make no HEAD requests + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0); } } @@ -234,13 +253,18 @@ public void testLargeFileMultipleGets() throws Throwable { public void testSmallFileSingleGet() throws Throwable { describe("Small file should trigger only one GET request"); - Path dest = writeThenReadFile("small-test-file.txt", 1 * 1024 * 1024); // 1KB + Path dest = path("small-test-file.txt"); + byte[] data = dataset(S_1M, 256, 255); + writeDataset(getFileSystem(), dest, data, S_1M, 1024, true); + byte[] buffer = new byte[S_1M]; try (FSDataInputStream inputStream = getFileSystem().open(dest)) { IOStatistics ioStats = inputStream.getIOStatistics(); - inputStream.readFully(new byte[(int) getFileSystem().getFileStatus(dest).getLen()]); + inputStream.readFully(buffer); verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1); + // Because S3A passes in the meta-data(content length) on file open, we expect AAL to make no HEAD requests + verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0); } } @@ -249,93 +273,46 @@ public void testSmallFileSingleGet() throws Throwable { public void testRandomSeekPatternGets() throws Throwable { describe("Random seek pattern should optimize GET requests"); - Path dest = writeThenReadFile("seek-test.txt", 100 * 1024); + Path dest = path("seek-test.txt"); + byte[] data = dataset(5 * S_1M, 256, 255); + writeDataset(getFileSystem(), dest, data, 5 * S_1M, 1024, true); + byte[] buffer = new byte[S_1M]; try (FSDataInputStream inputStream = getFileSystem().open(dest)) { IOStatistics ioStats = inputStream.getIOStatistics(); - inputStream.seek(1000); - inputStream.read(new byte[100]); - - inputStream.seek(50000); - inputStream.read(new byte[100]); - - inputStream.seek(90000); - inputStream.read(new byte[100]); + inputStream.read(buffer); + inputStream.seek(2 * S_1M); + inputStream.read(new byte[512 * S_1K]); + inputStream.seek(3 * S_1M); + inputStream.read(new byte[512 * S_1K]); verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1); - } -} - - @Test - public void testAALNeverMakesHeadRequests() throws Throwable { - describe("Prove AAL never makes HEAD requests - S3A provides all metadata"); - - Path dest = writeThenReadFile("no-head-test.txt", 1024 * 1024); // 1MB - - try (FSDataInputStream inputStream = getFileSystem().open(dest)) { - IOStatistics ioStats = inputStream.getIOStatistics(); - inputStream.read(new byte[1024]); - verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0); - verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1); - - ObjectInputStream objectInputStream = (ObjectInputStream) inputStream.getWrappedStream(); - Assertions.assertThat(objectInputStream.streamType()).isEqualTo(InputStreamType.Analytics); - } } @Test - public void testParquetReadingNoHeadRequests() throws Throwable { - describe("Parquet-optimized reading should not trigger AAL HEAD requests"); - - Path dest = path("parquet-head-test.parquet"); - File file = new File("src/test/resources/multi_row_group.parquet"); - Path sourcePath = new Path(file.toURI().getPath()); - getFileSystem().copyFromLocalFile(false, true, sourcePath, dest); - - try (FSDataInputStream stream = getFileSystem().openFile(dest) - .must(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_PARQUET) - .build().get()) { - - FileStatus fileStatus = getFileSystem().getFileStatus(dest); - stream.readFully(new byte[(int) fileStatus.getLen()]); + public void testSequentialStreamsNoDuplicateGets() throws Throwable { + describe("Sequential streams reading same object should not duplicate GETs"); - IOStatistics stats = stream.getIOStatistics(); - - verifyStatisticCounterValue(stats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0); - verifyStatisticCounterValue(stats, STREAM_READ_ANALYTICS_OPENED, 1); - - verifyStatisticCounterValue(stats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1); - } - } - - - @Test - public void testConcurrentStreamsNoDuplicateGets() throws Throwable { - describe("Concurrent streams reading same object should not duplicate GETs"); - - Path dest = writeThenReadFile("concurrent-test.txt", 1 * 1024 * 1024); + Path dest = path("sequential-test.txt"); + byte[] data = dataset(S_1M, 256, 255); + writeDataset(getFileSystem(), dest, data, S_1M, 1024, true); + byte[] buffer = new byte[1024]; try (FSDataInputStream stream1 = getFileSystem().open(dest); FSDataInputStream stream2 = getFileSystem().open(dest)) { - byte[] buffer1 = new byte[1024]; - byte[] buffer2 = new byte[1024]; - - stream1.read(buffer1); - stream2.read(buffer2); + stream1.read(buffer); + stream2.read(buffer); IOStatistics stats1 = stream1.getIOStatistics(); IOStatistics stats2 = stream2.getIOStatistics(); - long totalGets = stats1.counters().getOrDefault( - STREAM_READ_ANALYTICS_GET_REQUESTS, 0L) + - stats2.counters().getOrDefault( - STREAM_READ_ANALYTICS_GET_REQUESTS, 0L); - Assertions.assertThat(totalGets).isEqualTo(1); + verifyStatisticCounterValue(stats1, STREAM_READ_ANALYTICS_GET_REQUESTS, 1); + verifyStatisticCounterValue(stats2, STREAM_READ_ANALYTICS_GET_REQUESTS, 0); } } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java index 12a1cd7d8f63e..d07e326119bfa 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java @@ -81,10 +81,7 @@ protected Configuration createConfiguration() { public void setup() throws Exception { super.setup(); executor = HadoopExecutors.newFixedThreadPool(SMALL_THREADS); - // Analytics accelerator currently does not support IOStatisticsContext, this will be added as - // part of https://issues.apache.org/jira/browse/HADOOP-19364 - skipIfAnalyticsAcceleratorEnabled(getConfiguration(), - "Analytics Accelerator currently does not support IOStatisticsContext"); + } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java index 2ae28c74fe5b7..13818ad9c0914 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java @@ -52,10 +52,6 @@ public void testMetricsRegister() @Test public void testStreamStatistics() throws IOException { - // Analytics accelerator currently does not support IOStatistics, this will be added as - // part of https://issues.apache.org/jira/browse/HADOOP-19364 - skipIfAnalyticsAcceleratorEnabled(getConfiguration(), - "Analytics Accelerator currently does not support stream statistics"); S3AFileSystem fs = getFileSystem(); Path file = path("testStreamStatistics"); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java index 9ad2c0625a094..e998953683d60 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java @@ -174,10 +174,7 @@ private void abortActiveStream() throws IOException { public void testCostOfCreatingMagicFile() throws Throwable { describe("Files created under magic paths skip existence checks and marker deletes"); - // Analytics accelerator currently does not support IOStatistics, this will be added as - // part of https://issues.apache.org/jira/browse/HADOOP-19364 - skipIfAnalyticsAcceleratorEnabled(getConfiguration(), - "Analytics Accelerator currently does not support stream statistics"); + S3AFileSystem fs = getFileSystem(); Path destFile = methodSubPath("file.txt"); fs.delete(destFile.getParent(), true); @@ -255,10 +252,6 @@ public void testCostOfCreatingMagicFile() throws Throwable { public void testCostOfSavingLoadingPendingFile() throws Throwable { describe("Verify costs of saving .pending file under a magic path"); - // Analytics accelerator currently does not support IOStatistics, this will be added as - // part of https://issues.apache.org/jira/browse/HADOOP-19364 - skipIfAnalyticsAcceleratorEnabled(getConfiguration(), - "Analytics Accelerator currently does not support stream statistics"); S3AFileSystem fs = getFileSystem(); Path partDir = methodSubPath("file.pending"); Path destFile = new Path(partDir, "file.pending"); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java index f81a3882d8918..059237c90ad97 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java @@ -46,10 +46,6 @@ public class ITestS3AFileContextStatistics extends FCStatisticsBaseTest { @BeforeEach public void setUp() throws Exception { conf = new Configuration(); - // Analytics accelerator currently does not support IOStatistics, this will be added as - // part of https://issues.apache.org/jira/browse/HADOOP-19364 - skipIfAnalyticsAcceleratorEnabled(conf, - "Analytics Accelerator currently does not support stream statistics"); fc = S3ATestUtils.createTestFileContext(conf); testRootPath = fileContextTestHelper.getTestRootPath(fc, "test"); fc.mkdir(testRootPath, diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java index c1c03ca6e7212..1ee2b37ea8aef 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java @@ -113,10 +113,7 @@ public Configuration createConfiguration() { @Override public void setup() throws Exception { super.setup(); - // Analytics accelerator currently does not support IOStatistics, this will be added as - // part of https://issues.apache.org/jira/browse/HADOOP-19364 - skipIfAnalyticsAcceleratorEnabled(getConfiguration(), - "Analytics Accelerator currently does not support stream statistics"); + S3AFileSystem fs = getFileSystem(); testFile = methodPath(); @@ -177,6 +174,11 @@ public void testStreamIsNotChecksummed() throws Throwable { // if prefetching is enabled, skip this test assumeNoPrefetching(); + // Skip for Analytics streams - checksum validation only exists in S3AInputStream. + // AnalyticsStream handles data integrity through AWS Analytics Accelerator internally. + if (isAnalyticsStream()) { + skip("Analytics stream doesn't use checksums"); + } S3AFileSystem fs = getFileSystem(); // open the file @@ -261,10 +263,13 @@ public void testOpenFileLongerLengthReadFully() throws Throwable { } }, always(), - // two GET calls were made, one for readFully, - // the second on the read() past the EOF - // the operation has got as far as S3 - probe(!prefetching(), STREAM_READ_OPENED, 1 + 1)); + // Analytics stream: 1 open (persistent connection) + // S3AInputStream: 2 opens (reopen on EOF) + // two GET calls were made, one for readFully, + // the second on the read() past the EOF + // the operation has got as far as S3 + probe(!prefetching() && !isAnalyticsStream(), STREAM_READ_OPENED, 2), + probe(!prefetching() && isAnalyticsStream(), STREAM_READ_OPENED, 1)); // now on a new stream, try a full read from after the EOF verifyMetrics(() -> { @@ -348,7 +353,9 @@ public void testReadPastEOF() throws Throwable { } }, always(), - probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, extra)); + // Analytics streams don't make HTTP requests when reading past EOF + probe(!prefetching && !isAnalyticsStream(), Statistic.ACTION_HTTP_GET_REQUEST, extra), + probe(!prefetching && isAnalyticsStream(), Statistic.ACTION_HTTP_GET_REQUEST, 0)); } /** @@ -461,6 +468,14 @@ private boolean prefetching() { return InputStreamType.Prefetch == streamType(getFileSystem()); } + /** + * Is the current stream type Analytics? + * @return true if Analytics stream is enabled. + */ + private boolean isAnalyticsStream() { + return streamType(getFileSystem()) == InputStreamType.Analytics; + } + /** * Skip the test if prefetching is enabled. */ diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java index 2f54cab00b13d..a16f265a80c40 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java @@ -44,10 +44,6 @@ public class ITestS3AFileSystemStatistic extends AbstractS3ATestBase { */ @Test public void testBytesReadWithStream() throws IOException { - // Analytics accelerator currently does not support IOStatistics, this will be added as - // part of https://issues.apache.org/jira/browse/HADOOP-19364 - skipIfAnalyticsAcceleratorEnabled(getConfiguration(), - "Analytics Accelerator currently does not support stream statistics"); S3AFileSystem fs = getFileSystem(); Path filePath = path(getMethodName()); byte[] oneKbBuf = new byte[ONE_KB]; From 84f2a46e5cc5980bb1d9bb1dda76bbd89ee87ac0 Mon Sep 17 00:00:00 2001 From: vaibhav5140 Date: Wed, 2 Jul 2025 15:58:42 +0100 Subject: [PATCH 3/5] NIT fix --- .../org/apache/hadoop/fs/statistics/StreamStatisticNames.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java index 35692a54c0f38..b80177dd866e8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java @@ -132,10 +132,10 @@ public final class StreamStatisticNames { public static final String STREAM_READ_OPERATIONS = "stream_read_operations"; - /** Analytics GET requests made by stream. */ + /** GET requests made by the analytics stream. */ public static final String STREAM_READ_ANALYTICS_GET_REQUESTS = "stream_read_analytics_get_requests"; - /** Analytics HEAD requests made by stream. */ + /** HEAD requests made by the analytics stream. */ public static final String STREAM_READ_ANALYTICS_HEAD_REQUESTS = "stream_read_analytics_head_requests"; /** From 048d8aa5692bc9624a8777a1c7500f86c56b9eff Mon Sep 17 00:00:00 2001 From: vaibhav5140 Date: Tue, 8 Jul 2025 14:04:24 +0100 Subject: [PATCH 4/5] Updated version --- hadoop-project/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index a66c55359393f..4e664b9014821 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -207,7 +207,7 @@ 1.12.720 2.29.52 3.1.1 - 1.0.0 + 1.2.1 1.0.1 2.7.1 1.11.2 From 08a5f4261cdd33e27997d4702f517237d4158c3e Mon Sep 17 00:00:00 2001 From: vaibhav5140 Date: Tue, 8 Jul 2025 17:26:33 +0100 Subject: [PATCH 5/5] checkstyle fix --- .../fs/statistics/StreamStatisticNames.java | 6 +++-- .../streams/AnalyticsRequestCallback.java | 26 +++++++++---------- .../fs/s3a/impl/streams/AnalyticsStream.java | 1 - ...tS3AAnalyticsAcceleratorStreamReading.java | 19 ++++++++------ .../fs/s3a/ITestS3AIOStatisticsContext.java | 1 - .../apache/hadoop/fs/s3a/ITestS3AMetrics.java | 1 - .../s3a/commit/ITestCommitOperationCost.java | 1 - .../ITestS3AFileContextStatistics.java | 1 - .../fs/s3a/performance/ITestS3AOpenCost.java | 1 - .../ITestS3AFileSystemStatistic.java | 1 - 10 files changed, 28 insertions(+), 30 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java index b80177dd866e8..0468f6dfc07b8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java @@ -133,10 +133,12 @@ public final class StreamStatisticNames { "stream_read_operations"; /** GET requests made by the analytics stream. */ - public static final String STREAM_READ_ANALYTICS_GET_REQUESTS = "stream_read_analytics_get_requests"; + public static final String STREAM_READ_ANALYTICS_GET_REQUESTS = + "stream_read_analytics_get_requests"; /** HEAD requests made by the analytics stream. */ - public static final String STREAM_READ_ANALYTICS_HEAD_REQUESTS = "stream_read_analytics_head_requests"; + public static final String STREAM_READ_ANALYTICS_HEAD_REQUESTS = + "stream_read_analytics_head_requests"; /** * Count of readVectored() operations in an input stream. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java index ec36b9936c7da..6176c290aaa7f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java @@ -26,27 +26,27 @@ * Implementation of AAL's RequestCallback interface that tracks analytics operations. */ public class AnalyticsRequestCallback implements RequestCallback { - private final S3AInputStreamStatistics statistics; + private final S3AInputStreamStatistics statistics; /** * Create a new callback instance. * @param statistics the statistics to update */ - public AnalyticsRequestCallback(S3AInputStreamStatistics statistics) { - this.statistics = statistics; - } + public AnalyticsRequestCallback(S3AInputStreamStatistics statistics) { + this.statistics = statistics; + } - @Override + @Override public void onGetRequest() { - statistics.incrementAnalyticsGetRequests(); - // Update ACTION_HTTP_GET_REQUEST statistic - DurationTracker tracker = statistics.initiateGetRequest(); - tracker.close(); - } + statistics.incrementAnalyticsGetRequests(); + // Update ACTION_HTTP_GET_REQUEST statistic + DurationTracker tracker = statistics.initiateGetRequest(); + tracker.close(); + } - @Override + @Override public void onHeadRequest() { - statistics.incrementAnalyticsHeadRequests(); - } + statistics.incrementAnalyticsHeadRequests(); + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java index c0d61113b0abe..7e20addcc358b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java @@ -144,7 +144,6 @@ public int read(byte[] buf, int off, int len) throws IOException { if (pos >= lengthLimit) { return -1; // EOF reached due to length limit } - // Limit read length to not exceed the length limit int maxRead = (int) Math.min(len, lengthLimit - pos); getS3AStreamStatistics().readOperationStarted(pos, maxRead); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java index a6b4be7330fd0..a18c9108b0501 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java @@ -118,7 +118,8 @@ public void testConnectorFrameWorkIntegration() throws Throwable { verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1); long streamBytesRead = objectInputStream.getS3AStreamStatistics().getBytesRead(); - Assertions.assertThat(streamBytesRead).as("Stream statistics should track bytes read").isEqualTo(500); + Assertions.assertThat(streamBytesRead).as("Stream statistics should track bytes read") + .isEqualTo(500); } verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1); @@ -153,7 +154,8 @@ public void testMalformedParquetFooter() throws IOException { ObjectInputStream objectInputStream = (ObjectInputStream) inputStream.getWrappedStream(); long streamBytesRead = objectInputStream.getS3AStreamStatistics().getBytesRead(); - Assertions.assertThat(streamBytesRead).as("Stream statistics should track bytes read").isEqualTo(bytesRead); + Assertions.assertThat(streamBytesRead).as("Stream statistics should track bytes read") + .isEqualTo(bytesRead); } @@ -200,9 +202,9 @@ public void testMultiRowGroupParquet() throws Throwable { verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, (int) fileStatus.getLen()); verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1); } - verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1); - // S3A passes in the meta-data(content length) on file open, we expect AAL to make no HEAD requests + // S3A passes in the meta-data(content length) on file open, + // we expect AAL to make no HEAD requests verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0); } @@ -224,8 +226,7 @@ public void testInvalidConfigurationThrows() throws Exception { } /** - * - * TXT files are classified as SEQUENTIAL format and use SequentialPrefetcher(requests the entire 10MB file) + * TXT files(SEQUENTIAL format) use SequentialPrefetcher(requests the entire 10MB file). * RangeOptimiser splits ranges larger than maxRangeSizeBytes (8MB) using partSizeBytes (8MB) * The 10MB range gets split into: [0-8MB) and [8MB-10MB) * Each split range becomes a separate Block, resulting in 2 GET requests: @@ -244,7 +245,8 @@ public void testLargeFileMultipleGets() throws Throwable { inputStream.readFully(buffer); verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 2); - // Because S3A passes in the meta-data(content length) on file open, we expect AAL to make no HEAD requests + // Because S3A passes in the meta-data(content length) on file open, + // we expect AAL to make no HEAD requests verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0); } } @@ -263,7 +265,8 @@ public void testSmallFileSingleGet() throws Throwable { inputStream.readFully(buffer); verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1); - // Because S3A passes in the meta-data(content length) on file open, we expect AAL to make no HEAD requests + // Because S3A passes in the meta-data(content length) on file open, + // we expect AAL to make no HEAD requests verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0); } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java index d07e326119bfa..2bc342717a16c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java @@ -45,7 +45,6 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset; import static org.apache.hadoop.fs.s3a.S3ATestUtils.disablePrefetching; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java index 13818ad9c0914..548b30a3b2dcb 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java @@ -28,7 +28,6 @@ import java.io.IOException; import java.io.InputStream; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString; /** diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java index e998953683d60..7a62f76dd0b7d 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java @@ -42,7 +42,6 @@ import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest; import org.apache.hadoop.fs.statistics.IOStatisticsLogging; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.s3a.Statistic.ACTION_HTTP_GET_REQUEST; import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_FILES_CREATED; import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_MARKER_PUT; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java index 059237c90ad97..715df2f447d82 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java @@ -29,7 +29,6 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.junit.jupiter.api.Assertions.assertEquals; /** diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java index 1ee2b37ea8aef..34a7d5a54037b 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java @@ -58,7 +58,6 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.s3a.S3ATestUtils.streamType; import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE; import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java index a16f265a80c40..81a719fbea2b2 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java @@ -31,7 +31,6 @@ import org.apache.hadoop.fs.statistics.IOStatisticAssertions; import org.apache.hadoop.fs.statistics.StreamStatisticNames; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; public class ITestS3AFileSystemStatistic extends AbstractS3ATestBase {