diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
index 09c19ad071a98..0468f6dfc07b8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
@@ -132,6 +132,14 @@ public final class StreamStatisticNames {
public static final String STREAM_READ_OPERATIONS =
"stream_read_operations";
+ /** GET requests made by the analytics stream. */
+ public static final String STREAM_READ_ANALYTICS_GET_REQUESTS =
+ "stream_read_analytics_get_requests";
+
+ /** HEAD requests made by the analytics stream. */
+ public static final String STREAM_READ_ANALYTICS_HEAD_REQUESTS =
+ "stream_read_analytics_head_requests";
+
/**
* Count of readVectored() operations in an input stream.
* Value: {@value}.
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index a66c55359393f..4e664b9014821 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -207,7 +207,7 @@
1.12.720
2.29.52
3.1.1
- 1.0.0
+ 1.2.1
1.0.1
2.7.1
1.11.2
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index b3c907428ac4e..f74987703a0b9 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -872,6 +872,8 @@ private InputStreamStatistics(
StreamStatisticNames.STREAM_READ_CLOSE_OPERATIONS,
StreamStatisticNames.STREAM_READ_OPENED,
StreamStatisticNames.STREAM_READ_BYTES,
+ StreamStatisticNames.STREAM_READ_ANALYTICS_GET_REQUESTS,
+ StreamStatisticNames.STREAM_READ_ANALYTICS_HEAD_REQUESTS,
StreamStatisticNames.STREAM_READ_EXCEPTIONS,
StreamStatisticNames.STREAM_READ_FULLY_OPERATIONS,
StreamStatisticNames.STREAM_READ_OPERATIONS,
@@ -1128,6 +1130,15 @@ public void readVectoredBytesDiscarded(int discarded) {
bytesDiscardedInVectoredIO.addAndGet(discarded);
}
+ @Override
+ public void incrementAnalyticsGetRequests() {
+ increment(StreamStatisticNames.STREAM_READ_ANALYTICS_GET_REQUESTS);
+ }
+ @Override
+ public void incrementAnalyticsHeadRequests() {
+ increment(StreamStatisticNames.STREAM_READ_ANALYTICS_HEAD_REQUESTS);
+ }
+
@Override
public void executorAcquired(Duration timeInQueue) {
// update the duration fields in the IOStatistics.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index 6389742167def..e9582b15c8045 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -459,6 +459,14 @@ public enum Statistic {
"Gauge of active memory in use",
TYPE_GAUGE),
+ ANALYTICS_GET_REQUESTS(
+ StreamStatisticNames.STREAM_READ_ANALYTICS_GET_REQUESTS,
+ "GET requests made by analytics streams",
+ TYPE_COUNTER),
+ ANALYTICS_HEAD_REQUESTS(
+ StreamStatisticNames.STREAM_READ_ANALYTICS_HEAD_REQUESTS,
+ "HEAD requests made by analytics streams",
+ TYPE_COUNTER),
/* Stream Write statistics */
STREAM_WRITE_EXCEPTIONS(
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java
new file mode 100644
index 0000000000000..6176c290aaa7f
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl.streams;
+
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import org.apache.hadoop.fs.statistics.DurationTracker;
+import software.amazon.s3.analyticsaccelerator.util.RequestCallback;
+
+/**
+ * Implementation of AAL's RequestCallback interface that tracks analytics operations.
+ */
+public class AnalyticsRequestCallback implements RequestCallback {
+ private final S3AInputStreamStatistics statistics;
+
+ /**
+ * Create a new callback instance.
+ * @param statistics the statistics to update
+ */
+ public AnalyticsRequestCallback(S3AInputStreamStatistics statistics) {
+ this.statistics = statistics;
+ }
+
+ @Override
+ public void onGetRequest() {
+ statistics.incrementAnalyticsGetRequests();
+ // Update ACTION_HTTP_GET_REQUEST statistic
+ DurationTracker tracker = statistics.initiateGetRequest();
+ tracker.close();
+ }
+
+ @Override
+ public void onHeadRequest() {
+ statistics.incrementAnalyticsHeadRequests();
+ }
+}
+
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
index 6b910c6538070..7e20addcc358b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
@@ -28,6 +28,7 @@
import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
import software.amazon.s3.analyticsaccelerator.util.S3URI;
+import software.amazon.s3.analyticsaccelerator.util.RequestCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,6 +49,8 @@ public class AnalyticsStream extends ObjectInputStream implements StreamCapabili
private S3SeekableInputStream inputStream;
private long lastReadCurrentPos = 0;
private volatile boolean closed;
+ private final long contentLength;
+ private final long lengthLimit;
public static final Logger LOG = LoggerFactory.getLogger(AnalyticsStream.class);
@@ -55,6 +58,8 @@ public AnalyticsStream(final ObjectReadParameters parameters,
final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) throws IOException {
super(InputStreamType.Analytics, parameters);
S3ObjectAttributes s3Attributes = parameters.getObjectAttributes();
+ this.contentLength = s3Attributes.getLen();
+ this.lengthLimit = s3Attributes.getLen();
this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(),
s3Attributes.getKey()), buildOpenStreamInformation(parameters));
getS3AStreamStatistics().streamOpened(InputStreamType.Analytics);
@@ -63,6 +68,11 @@ public AnalyticsStream(final ObjectReadParameters parameters,
@Override
public int read() throws IOException {
throwIfClosed();
+ if (getPos() >= lengthLimit) {
+ return -1; // EOF reached due to length limit
+ }
+ getS3AStreamStatistics().readOperationStarted(getPos(), 1);
+
int bytesRead;
try {
bytesRead = inputStream.read();
@@ -70,6 +80,11 @@ public int read() throws IOException {
onReadFailure(ioe);
throw ioe;
}
+
+ if (bytesRead != -1) {
+ incrementBytesRead(1);
+ }
+
return bytesRead;
}
@@ -105,6 +120,8 @@ public synchronized long getPos() {
*/
public int readTail(byte[] buf, int off, int len) throws IOException {
throwIfClosed();
+ getS3AStreamStatistics().readOperationStarted(getPos(), len);
+
int bytesRead;
try {
bytesRead = inputStream.readTail(buf, off, len);
@@ -112,19 +129,37 @@ public int readTail(byte[] buf, int off, int len) throws IOException {
onReadFailure(ioe);
throw ioe;
}
+
+ if (bytesRead > 0) {
+ incrementBytesRead(bytesRead);
+ }
+
return bytesRead;
}
@Override
public int read(byte[] buf, int off, int len) throws IOException {
throwIfClosed();
+ long pos = getPos();
+ if (pos >= lengthLimit) {
+ return -1; // EOF reached due to length limit
+ }
+ // Limit read length to not exceed the length limit
+ int maxRead = (int) Math.min(len, lengthLimit - pos);
+ getS3AStreamStatistics().readOperationStarted(pos, maxRead);
+
int bytesRead;
try {
- bytesRead = inputStream.read(buf, off, len);
+ bytesRead = inputStream.read(buf, off, maxRead);
} catch (IOException ioe) {
onReadFailure(ioe);
throw ioe;
}
+
+ if (bytesRead > 0) {
+ incrementBytesRead(bytesRead);
+ }
+
return bytesRead;
}
@@ -194,10 +229,13 @@ private void onReadFailure(IOException ioe) throws IOException {
}
private OpenStreamInformation buildOpenStreamInformation(ObjectReadParameters parameters) {
+
+ final RequestCallback requestCallback = new AnalyticsRequestCallback(getS3AStreamStatistics());
+
OpenStreamInformation.OpenStreamInformationBuilder openStreamInformationBuilder =
OpenStreamInformation.builder()
.inputPolicy(mapS3AInputPolicyToAAL(parameters.getContext()
- .getInputPolicy()));
+ .getInputPolicy())).requestCallback(requestCallback);
if (parameters.getObjectAttributes().getETag() != null) {
openStreamInformationBuilder.objectMetadata(ObjectMetadata.builder()
@@ -235,4 +273,16 @@ protected void throwIfClosed() throws IOException {
throw new IOException(getKey() + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
}
}
+
+ /**
+ * Increment the bytes read counter if there is a stats instance
+ * and the number of bytes read is more than zero.
+ * @param bytesRead number of bytes read
+ */
+ private void incrementBytesRead(long bytesRead) {
+ getS3AStreamStatistics().bytesRead(bytesRead);
+ if (getContext().getStats() != null && bytesRead > 0) {
+ getContext().getStats().incrementBytesRead(bytesRead);
+ }
+ }
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
index 7ad7cf75367e2..34f0e72fe2861 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
@@ -190,7 +190,14 @@ void readVectoredOperationStarted(int numIncomingRanges,
long getVersionMismatches();
long getInputPolicy();
-
+ /**
+ * Increment the counter for GET requests made by Analytics Accelerator Library.
+ */
+ void incrementAnalyticsGetRequests();
+ /**
+ * Increment the counter for HEAD requests made by Analytics Accelerator Library.
+ */
+ void incrementAnalyticsHeadRequests();
/**
* Get the value of a counter.
* @param name counter name
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
index 26b9f2b1568ca..48739181efee6 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
@@ -159,7 +159,12 @@ public void seekForwards(final long skipped,
final long bytesReadInSeek) {
}
-
+ @Override
+ public void incrementAnalyticsGetRequests() {
+ }
+ @Override
+ public void incrementAnalyticsHeadRequests() {
+ }
@Override
public long streamOpened() {
return 0;
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
index dff171bbdd8eb..a18c9108b0501 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
@@ -42,13 +42,22 @@
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPERATIONS;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_GET_REQUESTS;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_HEAD_REQUESTS;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.ANALYTICS_STREAM_FACTORY_CLOSED;
+import static org.apache.hadoop.io.Sizes.S_1K;
+import static org.apache.hadoop.io.Sizes.S_1M;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
@@ -104,6 +113,13 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
Assertions.assertThat(objectInputStream.streamType()).isEqualTo(InputStreamType.Analytics);
Assertions.assertThat(objectInputStream.getInputPolicy())
.isEqualTo(S3AInputPolicy.Sequential);
+
+ verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, 500);
+ verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1);
+
+ long streamBytesRead = objectInputStream.getS3AStreamStatistics().getBytesRead();
+ Assertions.assertThat(streamBytesRead).as("Stream statistics should track bytes read")
+ .isEqualTo(500);
}
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
@@ -129,14 +145,24 @@ public void testMalformedParquetFooter() throws IOException {
byte[] buffer = new byte[500];
IOStatistics ioStats;
+ int bytesRead;
try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
ioStats = inputStream.getIOStatistics();
inputStream.seek(5);
- inputStream.read(buffer, 0, 500);
+ bytesRead = inputStream.read(buffer, 0, 500);
+
+ ObjectInputStream objectInputStream = (ObjectInputStream) inputStream.getWrappedStream();
+ long streamBytesRead = objectInputStream.getS3AStreamStatistics().getBytesRead();
+ Assertions.assertThat(streamBytesRead).as("Stream statistics should track bytes read")
+ .isEqualTo(bytesRead);
+
}
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+ verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1);
+ // S3A passes in the meta data on file open, we expect AAL to make no HEAD requests
+ verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0);
}
/**
@@ -166,15 +192,20 @@ public void testMultiRowGroupParquet() throws Throwable {
}
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
-
+ verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1);
try (FSDataInputStream inputStream = getFileSystem().openFile(dest)
.must(FS_OPTION_OPENFILE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_PARQUET)
.build().get()) {
ioStats = inputStream.getIOStatistics();
inputStream.readFully(buffer, 0, (int) fileStatus.getLen());
- }
+ verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, (int) fileStatus.getLen());
+ verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1);
+ }
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+ // S3A passes in the meta-data(content length) on file open,
+ // we expect AAL to make no HEAD requests
+ verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0);
}
@Test
@@ -194,4 +225,97 @@ public void testInvalidConfigurationThrows() throws Exception {
() -> S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
}
+ /**
+ * TXT files(SEQUENTIAL format) use SequentialPrefetcher(requests the entire 10MB file).
+ * RangeOptimiser splits ranges larger than maxRangeSizeBytes (8MB) using partSizeBytes (8MB)
+ * The 10MB range gets split into: [0-8MB) and [8MB-10MB)
+ * Each split range becomes a separate Block, resulting in 2 GET requests:
+ */
+ @Test
+ public void testLargeFileMultipleGets() throws Throwable {
+ describe("Large file should trigger multiple GET requests");
+
+ Path dest = path("large-test-file.txt");
+ byte[] data = dataset(10 * S_1M, 256, 255);
+ writeDataset(getFileSystem(), dest, data, 10 * S_1M, 1024, true);
+
+ byte[] buffer = new byte[S_1M * 10];
+ try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+ IOStatistics ioStats = inputStream.getIOStatistics();
+ inputStream.readFully(buffer);
+
+ verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 2);
+ // Because S3A passes in the meta-data(content length) on file open,
+ // we expect AAL to make no HEAD requests
+ verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0);
+ }
+ }
+
+ @Test
+ public void testSmallFileSingleGet() throws Throwable {
+ describe("Small file should trigger only one GET request");
+
+ Path dest = path("small-test-file.txt");
+ byte[] data = dataset(S_1M, 256, 255);
+ writeDataset(getFileSystem(), dest, data, S_1M, 1024, true);
+
+ byte[] buffer = new byte[S_1M];
+ try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+ IOStatistics ioStats = inputStream.getIOStatistics();
+ inputStream.readFully(buffer);
+
+ verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1);
+ // Because S3A passes in the meta-data(content length) on file open,
+ // we expect AAL to make no HEAD requests
+ verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0);
+ }
+ }
+
+
+ @Test
+ public void testRandomSeekPatternGets() throws Throwable {
+ describe("Random seek pattern should optimize GET requests");
+
+ Path dest = path("seek-test.txt");
+ byte[] data = dataset(5 * S_1M, 256, 255);
+ writeDataset(getFileSystem(), dest, data, 5 * S_1M, 1024, true);
+
+ byte[] buffer = new byte[S_1M];
+ try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+ IOStatistics ioStats = inputStream.getIOStatistics();
+
+ inputStream.read(buffer);
+ inputStream.seek(2 * S_1M);
+ inputStream.read(new byte[512 * S_1K]);
+ inputStream.seek(3 * S_1M);
+ inputStream.read(new byte[512 * S_1K]);
+
+ verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 1);
+ verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0);
+ }
+ }
+
+
+ @Test
+ public void testSequentialStreamsNoDuplicateGets() throws Throwable {
+ describe("Sequential streams reading same object should not duplicate GETs");
+
+ Path dest = path("sequential-test.txt");
+ byte[] data = dataset(S_1M, 256, 255);
+ writeDataset(getFileSystem(), dest, data, S_1M, 1024, true);
+
+ byte[] buffer = new byte[1024];
+ try (FSDataInputStream stream1 = getFileSystem().open(dest);
+ FSDataInputStream stream2 = getFileSystem().open(dest)) {
+
+ stream1.read(buffer);
+ stream2.read(buffer);
+
+ IOStatistics stats1 = stream1.getIOStatistics();
+ IOStatistics stats2 = stream2.getIOStatistics();
+
+ verifyStatisticCounterValue(stats1, STREAM_READ_ANALYTICS_GET_REQUESTS, 1);
+ verifyStatisticCounterValue(stats2, STREAM_READ_ANALYTICS_GET_REQUESTS, 0);
+ }
+ }
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java
index 12a1cd7d8f63e..2bc342717a16c 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java
@@ -45,7 +45,6 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disablePrefetching;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
@@ -81,10 +80,7 @@ protected Configuration createConfiguration() {
public void setup() throws Exception {
super.setup();
executor = HadoopExecutors.newFixedThreadPool(SMALL_THREADS);
- // Analytics accelerator currently does not support IOStatisticsContext, this will be added as
- // part of https://issues.apache.org/jira/browse/HADOOP-19364
- skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
- "Analytics Accelerator currently does not support IOStatisticsContext");
+
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java
index 2ae28c74fe5b7..548b30a3b2dcb 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java
@@ -28,7 +28,6 @@
import java.io.IOException;
import java.io.InputStream;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
/**
@@ -52,10 +51,6 @@ public void testMetricsRegister()
@Test
public void testStreamStatistics() throws IOException {
- // Analytics accelerator currently does not support IOStatistics, this will be added as
- // part of https://issues.apache.org/jira/browse/HADOOP-19364
- skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
- "Analytics Accelerator currently does not support stream statistics");
S3AFileSystem fs = getFileSystem();
Path file = path("testStreamStatistics");
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java
index 9ad2c0625a094..7a62f76dd0b7d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java
@@ -42,7 +42,6 @@
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.s3a.Statistic.ACTION_HTTP_GET_REQUEST;
import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_FILES_CREATED;
import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_MARKER_PUT;
@@ -174,10 +173,7 @@ private void abortActiveStream() throws IOException {
public void testCostOfCreatingMagicFile() throws Throwable {
describe("Files created under magic paths skip existence checks and marker deletes");
- // Analytics accelerator currently does not support IOStatistics, this will be added as
- // part of https://issues.apache.org/jira/browse/HADOOP-19364
- skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
- "Analytics Accelerator currently does not support stream statistics");
+
S3AFileSystem fs = getFileSystem();
Path destFile = methodSubPath("file.txt");
fs.delete(destFile.getParent(), true);
@@ -255,10 +251,6 @@ public void testCostOfCreatingMagicFile() throws Throwable {
public void testCostOfSavingLoadingPendingFile() throws Throwable {
describe("Verify costs of saving .pending file under a magic path");
- // Analytics accelerator currently does not support IOStatistics, this will be added as
- // part of https://issues.apache.org/jira/browse/HADOOP-19364
- skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
- "Analytics Accelerator currently does not support stream statistics");
S3AFileSystem fs = getFileSystem();
Path partDir = methodSubPath("file.pending");
Path destFile = new Path(partDir, "file.pending");
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
index f81a3882d8918..715df2f447d82 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
@@ -29,7 +29,6 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
@@ -46,10 +45,6 @@ public class ITestS3AFileContextStatistics extends FCStatisticsBaseTest {
@BeforeEach
public void setUp() throws Exception {
conf = new Configuration();
- // Analytics accelerator currently does not support IOStatistics, this will be added as
- // part of https://issues.apache.org/jira/browse/HADOOP-19364
- skipIfAnalyticsAcceleratorEnabled(conf,
- "Analytics Accelerator currently does not support stream statistics");
fc = S3ATestUtils.createTestFileContext(conf);
testRootPath = fileContextTestHelper.getTestRootPath(fc, "test");
fc.mkdir(testRootPath,
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
index c1c03ca6e7212..34a7d5a54037b 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
@@ -58,7 +58,6 @@
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.streamType;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED;
@@ -113,10 +112,7 @@ public Configuration createConfiguration() {
@Override
public void setup() throws Exception {
super.setup();
- // Analytics accelerator currently does not support IOStatistics, this will be added as
- // part of https://issues.apache.org/jira/browse/HADOOP-19364
- skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
- "Analytics Accelerator currently does not support stream statistics");
+
S3AFileSystem fs = getFileSystem();
testFile = methodPath();
@@ -177,6 +173,11 @@ public void testStreamIsNotChecksummed() throws Throwable {
// if prefetching is enabled, skip this test
assumeNoPrefetching();
+ // Skip for Analytics streams - checksum validation only exists in S3AInputStream.
+ // AnalyticsStream handles data integrity through AWS Analytics Accelerator internally.
+ if (isAnalyticsStream()) {
+ skip("Analytics stream doesn't use checksums");
+ }
S3AFileSystem fs = getFileSystem();
// open the file
@@ -261,10 +262,13 @@ public void testOpenFileLongerLengthReadFully() throws Throwable {
}
},
always(),
- // two GET calls were made, one for readFully,
- // the second on the read() past the EOF
- // the operation has got as far as S3
- probe(!prefetching(), STREAM_READ_OPENED, 1 + 1));
+ // Analytics stream: 1 open (persistent connection)
+ // S3AInputStream: 2 opens (reopen on EOF)
+ // two GET calls were made, one for readFully,
+ // the second on the read() past the EOF
+ // the operation has got as far as S3
+ probe(!prefetching() && !isAnalyticsStream(), STREAM_READ_OPENED, 2),
+ probe(!prefetching() && isAnalyticsStream(), STREAM_READ_OPENED, 1));
// now on a new stream, try a full read from after the EOF
verifyMetrics(() -> {
@@ -348,7 +352,9 @@ public void testReadPastEOF() throws Throwable {
}
},
always(),
- probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, extra));
+ // Analytics streams don't make HTTP requests when reading past EOF
+ probe(!prefetching && !isAnalyticsStream(), Statistic.ACTION_HTTP_GET_REQUEST, extra),
+ probe(!prefetching && isAnalyticsStream(), Statistic.ACTION_HTTP_GET_REQUEST, 0));
}
/**
@@ -461,6 +467,14 @@ private boolean prefetching() {
return InputStreamType.Prefetch == streamType(getFileSystem());
}
+ /**
+ * Is the current stream type Analytics?
+ * @return true if Analytics stream is enabled.
+ */
+ private boolean isAnalyticsStream() {
+ return streamType(getFileSystem()) == InputStreamType.Analytics;
+ }
+
/**
* Skip the test if prefetching is enabled.
*/
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java
index da2a39a986ea4..2bf35ad19704b 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java
@@ -27,7 +27,6 @@
import org.apache.hadoop.fs.contract.s3a.S3AContract;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.*;
/**
@@ -78,13 +77,4 @@ public List outputStreamStatisticKeys() {
STREAM_WRITE_BLOCK_UPLOADS,
STREAM_WRITE_EXCEPTIONS);
}
-
- @Override
- public void testInputStreamStatisticRead() throws Throwable {
- // Analytics accelerator currently does not support IOStatistics, this will be added as
- // part of https://issues.apache.org/jira/browse/HADOOP-19364
- skipIfAnalyticsAcceleratorEnabled(getContract().getConf(),
- "Analytics Accelerator currently does not support stream statistics");
- super.testInputStreamStatisticRead();
- }
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java
index 2f54cab00b13d..81a719fbea2b2 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java
@@ -31,7 +31,6 @@
import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
public class ITestS3AFileSystemStatistic extends AbstractS3ATestBase {
@@ -44,10 +43,6 @@ public class ITestS3AFileSystemStatistic extends AbstractS3ATestBase {
*/
@Test
public void testBytesReadWithStream() throws IOException {
- // Analytics accelerator currently does not support IOStatistics, this will be added as
- // part of https://issues.apache.org/jira/browse/HADOOP-19364
- skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
- "Analytics Accelerator currently does not support stream statistics");
S3AFileSystem fs = getFileSystem();
Path filePath = path(getMethodName());
byte[] oneKbBuf = new byte[ONE_KB];