diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index e9dd94ff4ed88..6c5929d2b9087 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -679,6 +679,10 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_FS_AZURE_LOWEST_REQUEST_PRIORITY_VALUE) private int prefetchRequestPriorityValue; + @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_READ_POLICY, + DefaultValue = DEFAULT_AZURE_READ_POLICY) + private String abfsReadPolicy; + private String clientProvidedEncryptionKey; private String clientProvidedEncryptionKeySHA; @@ -1433,6 +1437,14 @@ public String getPrefetchRequestPriorityValue() { return Integer.toString(prefetchRequestPriorityValue); } + /** + * Get the ABFS read policy set by user. + * @return the ABFS read policy. + */ + public String getAbfsReadPolicy() { + return abfsReadPolicy; + } + /** * Enum config to allow user to pick format of x-ms-client-request-id header * @return tracingContextFormat config if valid, else default ALL_ID_FORMAT @@ -2139,6 +2151,15 @@ public void setIsChecksumValidationEnabled(boolean isChecksumValidationEnabled) this.isChecksumValidationEnabled = isChecksumValidationEnabled; } + /** + * Sets the ABFS read policy for testing purposes. + * @param readPolicy the read policy to set. + */ + @VisibleForTesting + public void setAbfsReadPolicy(String readPolicy) { + abfsReadPolicy = readPolicy; + } + public boolean isFullBlobChecksumValidationEnabled() { return isFullBlobChecksumValidationEnabled; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index de4bc79d55aa8..6ec5c51eb1d1a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -77,7 +77,6 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TrileanConversionException; import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; -import org.apache.hadoop.fs.azurebfs.services.ListResponseData; import org.apache.hadoop.fs.azurebfs.enums.Trilean; import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider; import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper; @@ -90,6 +89,7 @@ import org.apache.hadoop.fs.azurebfs.security.ContextProviderEncryptionAdapter; import org.apache.hadoop.fs.azurebfs.security.NoContextEncryptionAdapter; import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper; +import org.apache.hadoop.fs.azurebfs.services.AbfsAdaptiveInputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsClientContext; import org.apache.hadoop.fs.azurebfs.services.AbfsClientContextBuilder; @@ -97,6 +97,7 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsClientRenameResult; import org.apache.hadoop.fs.azurebfs.services.AbfsCounters; import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; +import org.apache.hadoop.fs.azurebfs.services.AbfsReadPolicy; import 
org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext; import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl; @@ -107,10 +108,13 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsPerfInfo; import org.apache.hadoop.fs.azurebfs.services.AbfsPerfTracker; import org.apache.hadoop.fs.azurebfs.services.AbfsPermission; +import org.apache.hadoop.fs.azurebfs.services.AbfsPrefetchInputStream; +import org.apache.hadoop.fs.azurebfs.services.AbfsRandomInputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.fs.azurebfs.services.AuthType; import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy; import org.apache.hadoop.fs.azurebfs.services.ListingSupport; +import org.apache.hadoop.fs.azurebfs.services.ListResponseData; import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials; import org.apache.hadoop.fs.azurebfs.services.StaticRetryPolicy; import org.apache.hadoop.fs.azurebfs.services.TailLatencyRequestTimeoutRetryPolicy; @@ -950,8 +954,37 @@ public AbfsInputStream openFileForRead(Path path, perfInfo.registerSuccess(true); - // Add statistics for InputStream - return new AbfsInputStream(getClient(), statistics, relativePath, + return getRelevantInputStream(statistics, relativePath, contentLength, + parameters, contextEncryptionAdapter, eTag, tracingContext); + } + } + + private AbfsInputStream getRelevantInputStream(final FileSystem.Statistics statistics, + final String relativePath, + final long contentLength, + final Optional parameters, + final ContextEncryptionAdapter contextEncryptionAdapter, + final String eTag, + TracingContext tracingContext) { + AbfsReadPolicy inputPolicy = AbfsReadPolicy.getAbfsReadPolicy(getAbfsConfiguration().getAbfsReadPolicy()); + switch (inputPolicy) { + case SEQUENTIAL: + return new AbfsPrefetchInputStream(getClient(), statistics, relativePath, + contentLength, populateAbfsInputStreamContext( + parameters.map(OpenFileParameters::getOptions), + contextEncryptionAdapter), + eTag, tracingContext); + + case RANDOM: + return new AbfsRandomInputStream(getClient(), statistics, relativePath, + contentLength, populateAbfsInputStreamContext( + parameters.map(OpenFileParameters::getOptions), + contextEncryptionAdapter), + eTag, tracingContext); + + case ADAPTIVE: + default: + return new AbfsAdaptiveInputStream(getClient(), statistics, relativePath, contentLength, populateAbfsInputStreamContext( parameters.map(OpenFileParameters::getOptions), contextEncryptionAdapter), diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index c5eb9235fbb54..115568c7853b3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Options.OpenFileOptions; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DOT; @@ -266,6 +267,12 @@ public final class ConfigurationKeys { public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth"; public static final String 
FS_AZURE_ALWAYS_READ_BUFFER_SIZE = "fs.azure.read.alwaysReadBufferSize"; public static final String FS_AZURE_READ_AHEAD_BLOCK_SIZE = "fs.azure.read.readahead.blocksize"; + /** + * Provides hint for the read workload pattern. + * Possible Values Exposed in {@link OpenFileOptions#FS_OPTION_OPENFILE_READ_POLICIES} + */ + public static final String FS_AZURE_READ_POLICY = "fs.azure.read.policy"; + /** Provides a config control to enable or disable ABFS Flush operations - * HFlush and HSync. Default is true. **/ public static final String FS_AZURE_ENABLE_FLUSH = "fs.azure.enable.flush"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 6f76f2e033c06..a7717c124dbf4 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; /** @@ -108,6 +109,7 @@ public final class FileSystemConfigurations { public static final long MAX_AZURE_BLOCK_SIZE = 256 * 1024 * 1024L; // changing default abfs blocksize to 256MB public static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT = "localhost"; public static final int DEFAULT_AZURE_LIST_MAX_RESULTS = 5000; + public static final String DEFAULT_AZURE_READ_POLICY = FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE; public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM = "AES256"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ReadType.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ReadType.java index 332a5a5ac56e2..51391cc747740 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ReadType.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ReadType.java @@ -48,6 +48,10 @@ public enum ReadType { * Only triggered when small file read optimization kicks in. */ SMALLFILE_READ("SR"), + /** + * Reads from Random Input Stream with read ahead up to readAheadRange + */ + RANDOM_READ("RR"), /** * None of the above read types were applicable. */ diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/HttpResponseException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/HttpResponseException.java index c257309c8c9fb..76126049f077c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/HttpResponseException.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/HttpResponseException.java @@ -28,12 +28,22 @@ */ public class HttpResponseException extends IOException { private final HttpResponse httpResponse; + + /** + * Constructor for HttpResponseException. 
+ * @param s the exception message + * @param httpResponse the HttpResponse object + */ public HttpResponseException(final String s, final HttpResponse httpResponse) { super(s); Objects.requireNonNull(httpResponse, "httpResponse should be non-null"); this.httpResponse = httpResponse; } + /** + * Gets the HttpResponse associated with this exception. + * @return the HttpResponse + */ public HttpResponse getHttpResponse() { return httpResponse; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAdaptiveInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAdaptiveInputStream.java new file mode 100644 index 0000000000000..25b4529aa0863 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAdaptiveInputStream.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.azurebfs.constants.ReadType; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; + +/** + * Input stream implementation optimized for adaptive read patterns. + * This is the default implementation used for cases where user does not specify any input policy. + * It switches between sequential and random read optimizations based on the detected read pattern. + * It also keeps footer read and small file optimizations enabled. + */ +public class AbfsAdaptiveInputStream extends AbfsInputStream { + + /** + * Constructs AbfsAdaptiveInputStream instance. + * @param client to be used for read operations + * @param statistics to record input stream statistics + * @param path file path + * @param contentLength file content length + * @param abfsInputStreamContext input stream context + * @param eTag file eTag + * @param tracingContext tracing context to trace the read operations + */ + public AbfsAdaptiveInputStream( + final AbfsClient client, + final FileSystem.Statistics statistics, + final String path, + final long contentLength, + final AbfsInputStreamContext abfsInputStreamContext, + final String eTag, + TracingContext tracingContext) { + super(client, statistics, path, contentLength, + abfsInputStreamContext, eTag, tracingContext); + } + + /** + * {@inheritDoc} + */ + @Override + protected int readOneBlock(final byte[] b, final int off, final int len) throws IOException { + if (len == 0) { + return 0; + } + if (!validate(b, off, len)) { + return -1; + } + // If buffer is empty, then fill the buffer. 
+ if (getBCursor() == getLimit()) { + // If EOF, then return -1 + if (getFCursor() >= getContentLength()) { + return -1; + } + + long bytesRead = 0; + // reset buffer to initial state - i.e., throw away existing data + setBCursor(0); + setLimit(0); + if (getBuffer() == null) { + LOG.debug("created new buffer size {}", getBufferSize()); + setBuffer(new byte[getBufferSize()]); + } + + // Reset Read Type back to normal and set again based on code flow. + getTracingContext().setReadType(ReadType.NORMAL_READ); + if (shouldAlwaysReadBufferSize()) { + bytesRead = readInternal(getFCursor(), getBuffer(), 0, getBufferSize(), false); + } else { + // Enable readAhead when reading sequentially + if (-1 == getFCursorAfterLastRead() || getFCursorAfterLastRead() == getFCursor() || b.length >= getBufferSize()) { + LOG.debug("Sequential read with read ahead size of {}", getBufferSize()); + bytesRead = readInternal(getFCursor(), getBuffer(), 0, getBufferSize(), false); + } else { + /* + * Disable queuing prefetches when random read pattern detected. + * Instead, read ahead only for readAheadRange above what is asked by caller. + */ + getTracingContext().setReadType(ReadType.RANDOM_READ); + int lengthWithReadAhead = Math.min(b.length + getReadAheadRange(), getBufferSize()); + LOG.debug("Random read with read ahead size of {}", lengthWithReadAhead); + bytesRead = readInternal(getFCursor(), getBuffer(), 0, lengthWithReadAhead, true); + } + } + if (isFirstRead()) { + setFirstRead(false); + } + if (bytesRead == -1) { + return -1; + } + + setLimit(getLimit() + (int) bytesRead); + setFCursor(getFCursor() + bytesRead); + setFCursorAfterLastRead(getFCursor()); + } + return copyToUserBuffer(b, off, len); + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 31b6f0f073940..4a9880794a89b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -26,7 +26,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.classification.VisibleForTesting; -import org.apache.hadoop.fs.PositionedReadable; import org.apache.hadoop.fs.azurebfs.constants.ReadType; import org.apache.hadoop.fs.impl.BackReference; import org.apache.hadoop.util.Preconditions; @@ -62,9 +61,9 @@ /** * The AbfsInputStream for AbfsClient. 
*/ -public class AbfsInputStream extends FSInputStream implements CanUnbuffer, +public abstract class AbfsInputStream extends FSInputStream implements CanUnbuffer, StreamCapabilities, IOStatisticsSource { - private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class); + protected static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class); // Footer size is set to qualify for both ORC and parquet files public static final int FOOTER_SIZE = 16 * ONE_KB; public static final int MAX_OPTIMIZED_READ_ATTEMPTS = 2; @@ -73,6 +72,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, private final AbfsClient client; private final Statistics statistics; private final String path; + private final long contentLength; private final int bufferSize; // default buffer size private final int footerReadSize; // default buffer size to read when reading footer @@ -94,7 +94,8 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, // User configured size of read ahead. private final int readAheadRange; - private boolean firstRead = true; + private boolean firstRead = true; // to identify first read for optimizations + // SAS tokens can be re-used until they expire private CachedSASToken cachedSasToken; private byte[] buffer = null; // will be initialized on first use @@ -134,6 +135,16 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, private final BackReference fsBackRef; private final ReadBufferManager readBufferManager; + /** + * Constructor for AbfsInputStream. + * @param client the ABFS client + * @param statistics the statistics + * @param path the file path + * @param contentLength the content length + * @param abfsInputStreamContext the input stream context + * @param eTag the eTag of the file + * @param tracingContext the tracing context + */ public AbfsInputStream( final AbfsClient client, final Statistics statistics, @@ -197,6 +208,10 @@ public AbfsInputStream( } } + /** + * Returns the path of file associated with this stream. + * @return the path of the file + */ public String getPath() { return path; } @@ -258,7 +273,7 @@ public synchronized int read(final byte[] b, final int off, final int len) throw // check if buffer is null before logging the length if (b != null) { LOG.debug("read requested b.length = {} offset = {} len = {}", b.length, - off, len); + off, len); } else { LOG.debug("read requested b = null offset = {} len = {}", off, len); } @@ -327,58 +342,15 @@ private boolean shouldReadLastBlock() { && this.fCursor >= footerStart; } - private int readOneBlock(final byte[] b, final int off, final int len) throws IOException { - if (len == 0) { - return 0; - } - if (!validate(b, off, len)) { - return -1; - } - //If buffer is empty, then fill the buffer. - if (bCursor == limit) { - //If EOF, then return -1 - if (fCursor >= contentLength) { - return -1; - } - - long bytesRead = 0; - //reset buffer to initial state - i.e., throw away existing data - bCursor = 0; - limit = 0; - if (buffer == null) { - LOG.debug("created new buffer size {}", bufferSize); - buffer = new byte[bufferSize]; - } - - // Reset Read Type back to normal and set again based on code flow. 
- tracingContext.setReadType(ReadType.NORMAL_READ); - if (alwaysReadBufferSize) { - bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false); - } else { - // Enable readAhead when reading sequentially - if (-1 == fCursorAfterLastRead || fCursorAfterLastRead == fCursor || b.length >= bufferSize) { - LOG.debug("Sequential read with read ahead size of {}", bufferSize); - bytesRead = readInternal(fCursor, buffer, 0, bufferSize, false); - } else { - // Enabling read ahead for random reads as well to reduce number of remote calls. - int lengthWithReadAhead = Math.min(b.length + readAheadRange, bufferSize); - LOG.debug("Random read with read ahead size of {}", lengthWithReadAhead); - bytesRead = readInternal(fCursor, buffer, 0, lengthWithReadAhead, true); - } - } - if (firstRead) { - firstRead = false; - } - if (bytesRead == -1) { - return -1; - } - - limit += bytesRead; - fCursor += bytesRead; - fCursorAfterLastRead = fCursor; - } - return copyToUserBuffer(b, off, len); - } + /** + * Read one block of data into buffer. + * @param b buffer + * @param off offset + * @param len length + * @return number of bytes read + * @throws IOException if there is an error + */ + protected abstract int readOneBlock(byte[] b, int off, int len) throws IOException; private int readFileCompletely(final byte[] b, final int off, final int len) throws IOException { @@ -472,7 +444,15 @@ private void restorePointerState() { this.bCursor = this.bCursorBkp; } - private boolean validate(final byte[] b, final int off, final int len) + /** + * Validate the read parameters. + * @param b buffer byte array + * @param off offset in buffer + * @param len length to read + * @return true if valid else false + * @throws IOException if there is an error + */ + protected boolean validate(final byte[] b, final int off, final int len) throws IOException { if (closed) { throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); @@ -492,7 +472,14 @@ private boolean validate(final byte[] b, final int off, final int len) return true; } - private int copyToUserBuffer(byte[] b, int off, int len){ + /** + * Copy data from internal buffer to user buffer. + * @param b user buffer + * @param off offset + * @param len length + * @return number of bytes copied + */ + protected int copyToUserBuffer(byte[] b, int off, int len){ //If there is anything in the buffer, then return lesser of (requested bytes) and (bytes in buffer) //(bytes returned may be less than requested) int bytesRemaining = limit - bCursor; @@ -511,7 +498,17 @@ private int copyToUserBuffer(byte[] b, int off, int len){ return bytesToRead; } - private int readInternal(final long position, final byte[] b, final int offset, final int length, + /** + * Internal read method which handles read-ahead logic. + * @param position to read from + * @param b buffer + * @param offset in buffer + * @param length to read + * @param bypassReadAhead whether to bypass read-ahead + * @return number of bytes read + * @throws IOException if there is an error + */ + protected int readInternal(final long position, final byte[] b, final int offset, final int length, final boolean bypassReadAhead) throws IOException { if (isReadAheadEnabled() && !bypassReadAhead) { // try reading from read-ahead @@ -728,6 +725,10 @@ public synchronized long getPos() throws IOException { return nextReadPos < 0 ? 0 : nextReadPos; } + /** + * Get the tracing context associated with this stream. 
+ * @return the tracing context + */ public TracingContext getTracingContext() { return tracingContext; } @@ -797,10 +798,22 @@ public boolean hasCapability(String capability) { return StreamCapabilities.UNBUFFER.equals(toLowerCase(capability)); } + /** + * Getter for buffer. + * @return the buffer + */ byte[] getBuffer() { return buffer; } + /** + * Setter for buffer. + * @param buffer the buffer to set + */ + protected void setBuffer(byte[] buffer) { + this.buffer = buffer; + } + /** * Checks if any version of read ahead is enabled. * If both are disabled, then skip read ahead logic. @@ -811,21 +824,38 @@ public boolean isReadAheadEnabled() { return (readAheadEnabled || readAheadV2Enabled) && getReadBufferManager() != null; } + /** + * Getter for user configured read ahead range. + * @return the read ahead range in int. + */ @VisibleForTesting public int getReadAheadRange() { return readAheadRange; } + /** + * Setter for cachedSasToken. + * @param cachedSasToken the cachedSasToken to set + */ @VisibleForTesting protected void setCachedSasToken(final CachedSASToken cachedSasToken) { this.cachedSasToken = cachedSasToken; } + /** + * Getter for inputStreamId. + * @return the inputStreamId + */ @VisibleForTesting public String getStreamID() { return inputStreamId; } + /** + * Getter for eTag. + * + * @return the eTag + */ public String getETag() { return eTag; } @@ -840,6 +870,10 @@ public AbfsInputStreamStatistics getStreamStatistics() { return streamStatistics; } + /** + * Register a listener for this stream. + * @param listener1 the listener to register + */ @VisibleForTesting public void registerListener(Listener listener1) { listener = listener1; @@ -866,26 +900,46 @@ public long getBytesFromRemoteRead() { return bytesFromRemoteRead; } + /** + * Getter for buffer size. + * @return the buffer size + */ @VisibleForTesting public int getBufferSize() { return bufferSize; } + /** + * Getter for footer read buffer size. + * @return the footer read buffer size + */ @VisibleForTesting protected int getFooterReadBufferSize() { return footerReadSize; } + /** + * Getter for read ahead queue depth. + * @return the read ahead queue depth + */ @VisibleForTesting public int getReadAheadQueueDepth() { return readAheadQueueDepth; } + /** + * Getter for alwaysReadBufferSize. + * @return the alwaysReadBufferSize + */ @VisibleForTesting public boolean shouldAlwaysReadBufferSize() { return alwaysReadBufferSize; } + /** + * Get the IOStatistics for the stream. + * @return IOStatistics + */ @Override public IOStatistics getIOStatistics() { return ioStatistics; @@ -907,48 +961,131 @@ public String toString() { return sb.toString(); } + /** + * Getter for bCursor. + * @return the bCursor + */ @VisibleForTesting int getBCursor() { return this.bCursor; } + /** + * Setter for bCursor. + * @param bCursor the bCursor to set + */ + protected void setBCursor(int bCursor) { + this.bCursor = bCursor; + } + + /** + * Getter for fCursor. + * @return the fCursor + */ @VisibleForTesting long getFCursor() { return this.fCursor; } + /** + * Setter for fCursor. + * @param fCursor the fCursor to set + */ + protected void setFCursor(long fCursor) { + this.fCursor = fCursor; + } + + /** + * Getter for fCursorAfterLastRead. + * @return the fCursorAfterLastRead + */ @VisibleForTesting long getFCursorAfterLastRead() { return this.fCursorAfterLastRead; } + /** + * Setter for fCursorAfterLastRead. 
+ * @param fCursorAfterLastRead the fCursorAfterLastRead to set + */ + protected void setFCursorAfterLastRead(long fCursorAfterLastRead) { + this.fCursorAfterLastRead = fCursorAfterLastRead; + } + + /** + * Getter for limit. + * @return the limit + */ @VisibleForTesting - long getLimit() { + int getLimit() { return this.limit; } + /** + * Setter for limit. + * @param limit the limit to set + */ + protected void setLimit(int limit) { + this.limit = limit; + } + + /** + * Getter for firstRead. + * @return the firstRead + */ boolean isFirstRead() { return this.firstRead; } + /** + * Setter for firstRead. + * @param firstRead the firstRead to set + */ + protected void setFirstRead(boolean firstRead) { + this.firstRead = firstRead; + } + + /** + * Getter for fsBackRef. + * @return the fsBackRef + */ @VisibleForTesting BackReference getFsBackRef() { return fsBackRef; } + /** + * Getter for readBufferManager. + * @return the readBufferManager + */ @VisibleForTesting ReadBufferManager getReadBufferManager() { return readBufferManager; } + /** + * Minimum seek distance for vector reads. + * @return the minimum seek distance + */ @Override public int minSeekForVectorReads() { return S_128K; } + /** + * Maximum read size for vector reads. + * @return the maximum read size + */ @Override public int maxReadSizeForVectorReads() { return S_2M; } + /** + * Getter for contentLength. + * @return the contentLength + */ + protected long getContentLength() { + return contentLength; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java index fa4b0ac209de9..aab6f3d5510e8 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsLease.java @@ -22,6 +22,7 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,7 +71,7 @@ public final class AbfsLease { private volatile boolean leaseFreed; private volatile String leaseID = null; private volatile Throwable exception = null; - private volatile int acquireRetryCount = 0; + private AtomicInteger acquireRetryCount = new AtomicInteger(0); private volatile ListenableScheduledFuture future = null; private final long leaseRefreshDuration; private final int leaseRefreshDurationInSeconds; @@ -197,7 +198,7 @@ public void onFailure(Throwable throwable) { if (RetryPolicy.RetryAction.RetryDecision.RETRY == retryPolicy.shouldRetry(null, numRetries, 0, true).action) { LOG.debug("Failed to acquire lease on {}, retrying: {}", path, throwable); - acquireRetryCount++; + acquireRetryCount.incrementAndGet(); acquireLease(retryPolicy, numRetries + 1, retryInterval, retryInterval, eTag, tracingContext); } else { @@ -289,7 +290,7 @@ public String getLeaseID() { */ @VisibleForTesting public int getAcquireRetryCount() { - return acquireRetryCount; + return acquireRetryCount.get(); } /** diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsPrefetchInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsPrefetchInputStream.java new file mode 100644 index 0000000000000..c0343ca724e05 --- /dev/null +++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsPrefetchInputStream.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; + +/** + * Input stream implementation optimized for prefetching data. + * This implementation always prefetches data in advance if enabled + * to optimize for sequential read patterns. + */ +public class AbfsPrefetchInputStream extends AbfsInputStream { + + /** + * Constructs AbfsPrefetchInputStream. + * @param client AbfsClient to be used for read operations + * @param statistics to record input stream statistics + * @param path file path + * @param contentLength file content length + * @param abfsInputStreamContext input stream context + * @param eTag file eTag + * @param tracingContext tracing context to trace the read operations + */ + public AbfsPrefetchInputStream( + final AbfsClient client, + final FileSystem.Statistics statistics, + final String path, + final long contentLength, + final AbfsInputStreamContext abfsInputStreamContext, + final String eTag, + TracingContext tracingContext) { + super(client, statistics, path, contentLength, + abfsInputStreamContext, eTag, tracingContext); + } + + /** + * {@inheritDoc} + */ + @Override + protected int readOneBlock(final byte[] b, final int off, final int len) throws IOException { + if (len == 0) { + return 0; + } + if (!validate(b, off, len)) { + return -1; + } + // If buffer is empty, then fill the buffer. + if (getBCursor() == getLimit()) { + // If EOF, then return -1 + if (getFCursor() >= getContentLength()) { + return -1; + } + + long bytesRead = 0; + // reset buffer to initial state - i.e., throw away existing data + setBCursor(0); + setLimit(0); + if (getBuffer() == null) { + LOG.debug("created new buffer size {}", getBufferSize()); + setBuffer(new byte[getBufferSize()]); + } + + /* + * Always start with prefetch, even from the first read. + * Even if an out-of-order seek comes, prefetches will be triggered for the next set of blocks.
+ */ + bytesRead = readInternal(getFCursor(), getBuffer(), 0, getBufferSize(), false); + if (isFirstRead()) { + setFirstRead(false); + } + if (bytesRead == -1) { + return -1; + } + + setLimit(getLimit() + (int) bytesRead); + setFCursor(getFCursor() + bytesRead); + setFCursorAfterLastRead(getFCursor()); + } + return copyToUserBuffer(b, off, len); + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRandomInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRandomInputStream.java new file mode 100644 index 0000000000000..b484cc6c84353 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRandomInputStream.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.IOException; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.azurebfs.constants.ReadType; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; + +/** + * Input stream implementation optimized for random read patterns. + * This implementation disables prefetching of data blocks; instead, it only + * reads ahead for a small range beyond what is requested by the caller. + */ +public class AbfsRandomInputStream extends AbfsInputStream { + + /** + * Constructs AbfsRandomInputStream. + * @param client AbfsClient to be used for read operations + * @param statistics to record input stream statistics + * @param path file path + * @param contentLength file content length + * @param abfsInputStreamContext input stream context + * @param eTag file eTag + * @param tracingContext tracing context to trace the read operations + */ + public AbfsRandomInputStream( + final AbfsClient client, + final FileSystem.Statistics statistics, + final String path, + final long contentLength, + final AbfsInputStreamContext abfsInputStreamContext, + final String eTag, + TracingContext tracingContext) { + super(client, statistics, path, contentLength, + abfsInputStreamContext, eTag, tracingContext); + } + + /** + * {@inheritDoc} + */ + @Override + protected int readOneBlock(final byte[] b, final int off, final int len) + throws IOException { + if (len == 0) { + return 0; + } + if (!validate(b, off, len)) { + return -1; + } + // If buffer is empty, then fill the buffer.
+ if (getBCursor() == getLimit()) { + // If EOF, then return -1 + if (getFCursor() >= getContentLength()) { + return -1; + } + + long bytesRead = 0; + // reset buffer to initial state - i.e., throw away existing data + setBCursor(0); + setLimit(0); + if (getBuffer() == null) { + LOG.debug("created new buffer size {}", getBufferSize()); + setBuffer(new byte[getBufferSize()]); + } + + /* + * Disable queuing prefetches when random read pattern detected. + * Instead, read ahead only for readAheadRange above what is asked by caller. + */ + getTracingContext().setReadType(ReadType.RANDOM_READ); + int lengthWithReadAhead = Math.min(b.length + getReadAheadRange(), getBufferSize()); + LOG.debug("Random read with read ahead size of {}", lengthWithReadAhead); + bytesRead = readInternal(getFCursor(), getBuffer(), 0, lengthWithReadAhead, true); + if (isFirstRead()) { + setFirstRead(false); + } + if (bytesRead == -1) { + return -1; + } + + setLimit(getLimit() + (int) bytesRead); + setFCursor(getFCursor() + bytesRead); + setFCursorAfterLastRead(getFCursor()); + } + return copyToUserBuffer(b, off, len); + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadPolicy.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadPolicy.java new file mode 100644 index 0000000000000..bdd895a39def6 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadPolicy.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.util.Locale; + +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_COLUMNAR; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ORC; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE; + +/** + * Enum for ABFS Input Policies. 
+ * Each policy maps to a particular implementation of {@link AbfsInputStream}. + */ +public enum AbfsReadPolicy { + + SEQUENTIAL(FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL), + RANDOM(FS_OPTION_OPENFILE_READ_POLICY_RANDOM), + ADAPTIVE(FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE); + + private final String readPolicy; + + AbfsReadPolicy(String readPolicy) { + this.readPolicy = readPolicy; + } + + @Override + public String toString() { + return readPolicy; + } + + /** + * Get the enum constant from the string name. + * @param name policy name as configured by the user + * @return the corresponding AbfsReadPolicy to be used + */ + public static AbfsReadPolicy getAbfsReadPolicy(String name) { + String readPolicyStr = name.trim().toLowerCase(Locale.ENGLISH); + switch (readPolicyStr) { + // all these options currently map to random IO. + case FS_OPTION_OPENFILE_READ_POLICY_RANDOM: + case FS_OPTION_OPENFILE_READ_POLICY_COLUMNAR: + case FS_OPTION_OPENFILE_READ_POLICY_ORC: + case FS_OPTION_OPENFILE_READ_POLICY_PARQUET: + return RANDOM; + + // handle the sequential formats. + case FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL: + case FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE: + return SEQUENTIAL; + + // Everything else, including the ABFS default policy, maps to Adaptive. + case FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE: + default: + return ADAPTIVE; + } + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRetryPolicy.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRetryPolicy.java index f3e1e582f9dab..7164e55e90ce5 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRetryPolicy.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRetryPolicy.java @@ -41,6 +41,11 @@ public abstract class AbfsRetryPolicy { */ private final String retryPolicyAbbreviation; + /** + * Constructor to initialize max retry count and abbreviation. + * @param maxRetryCount maximum retry count + * @param retryPolicyAbbreviation abbreviation for retry policy + */ protected AbfsRetryPolicy(final int maxRetryCount, final String retryPolicyAbbreviation) { this.maxRetryCount = maxRetryCount; this.retryPolicyAbbreviation = retryPolicyAbbreviation; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java index 712b04fb4999c..5b53d641a20df 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java @@ -23,7 +23,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Queue; -import java.util.Stack; import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; @@ -45,7 +44,6 @@ public abstract class ReadBufferManager { private static int thresholdAgeMilliseconds; private static int blockSize = DEFAULT_READ_AHEAD_BLOCK_SIZE; // default block size for read-ahead in bytes - private Stack freeList = new Stack<>(); // indices in buffers[] array that are available private Queue readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet private LinkedList inProgressList = new LinkedList<>(); // requests being processed by worker threads private LinkedList completedReadList = new LinkedList<>(); //
buffers available for reading @@ -200,15 +198,6 @@ protected static void setReadAheadBlockSize(int readAheadBlockSize) { blockSize = readAheadBlockSize; } - /** - * Gets the stack of free buffer indices. - * - * @return the stack of free buffer indices - */ - Stack getFreeList() { - return freeList; - } - /** * Gets the queue of read-ahead requests. * @@ -243,9 +232,7 @@ LinkedList getCompletedReadList() { * @return a list of free buffer indices */ @VisibleForTesting - List getFreeListCopy() { - return new ArrayList<>(freeList); - } + abstract List getFreeListCopy(); /** * Gets a copy of the read-ahead queue. @@ -294,7 +281,9 @@ int getCompletedReadListSize() { */ @VisibleForTesting protected void testMimicFullUseAndAddFailedBuffer(ReadBuffer buf) { - freeList.clear(); + clearFreeList(); completedReadList.add(buf); } + + abstract void clearFreeList(); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java index 240a618666621..c034d85659603 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java @@ -24,6 +24,8 @@ import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; +import java.util.List; +import java.util.Stack; import java.util.concurrent.CountDownLatch; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; @@ -42,6 +44,7 @@ public final class ReadBufferManagerV1 extends ReadBufferManager { private Thread[] threads = new Thread[NUM_THREADS]; private byte[][] buffers; + private Stack freeList = new Stack<>(); // indices in buffers[] array that are available private static ReadBufferManagerV1 bufferManager; // hide instance constructor @@ -607,7 +610,21 @@ void resetBufferManager() { setBufferManager(null); // reset the singleton instance } + @Override + protected List getFreeListCopy() { + return new ArrayList<>(freeList); + } + + private Stack getFreeList() { + return freeList; + } + private static void setBufferManager(ReadBufferManagerV1 manager) { bufferManager = manager; } + + @Override + protected void clearFreeList() { + getFreeList().clear(); + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java index 7f276eb77d859..5cbe4893b12a2 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java @@ -27,7 +27,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; -import java.util.Stack; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -100,7 +100,12 @@ public final class ReadBufferManagerV2 extends ReadBufferManager { private byte[][] bufferPool; - private final Stack removedBufferList = new Stack<>(); + /* + * List of buffer indexes that are currently free and can be assigned to new read-ahead requests. + * Using a thread safe data structure as multiple threads can access this concurrently. 
+ */ + private final ConcurrentSkipListSet removedBufferList = new ConcurrentSkipListSet<>(); + private ConcurrentSkipListSet freeList = new ConcurrentSkipListSet<>(); private ScheduledExecutorService memoryMonitorThread; @@ -209,7 +214,7 @@ void init() { // Start with just minimum number of buffers. bufferPool[i] = new byte[getReadAheadBlockSize()]; // same buffers are reused. The byte array never goes back to GC - getFreeList().add(i); + pushToFreeList(i); numberOfActiveBuffers.getAndIncrement(); } memoryMonitorThread = Executors.newSingleThreadScheduledExecutor( @@ -768,12 +773,17 @@ private synchronized boolean tryMemoryUpscale() { if (memoryLoad < memoryThreshold && getNumBuffers() < maxBufferPoolSize) { // Create and Add more buffers in getFreeList(). int nextIndx = getNumBuffers(); - if (removedBufferList.isEmpty() && nextIndx < bufferPool.length) { + if (removedBufferList.isEmpty()) { + if (nextIndx >= bufferPool.length) { + printTraceLog("Invalid next index: {}. Current buffer pool size: {}", + nextIndx, bufferPool.length); + return false; + } bufferPool[nextIndx] = new byte[getReadAheadBlockSize()]; pushToFreeList(nextIndx); } else { // Reuse a removed buffer index. - int freeIndex = removedBufferList.pop(); + int freeIndex = removedBufferList.pollFirst(); if (freeIndex >= bufferPool.length || bufferPool[freeIndex] != null) { printTraceLog("Invalid free index: {}. Current buffer pool size: {}", freeIndex, bufferPool.length); @@ -811,7 +821,7 @@ > getThresholdAgeMilliseconds()) { } double memoryLoad = ResourceUtilizationUtils.getMemoryLoad(); - if (isDynamicScalingEnabled && memoryLoad > memoryThreshold) { + if (isDynamicScalingEnabled && memoryLoad > memoryThreshold && getNumBuffers() > minBufferPoolSize) { synchronized (this) { if (isFreeListEmpty()) { printTraceLog( @@ -980,7 +990,7 @@ public void testResetReadBufferManager() { getReadAheadQueue().clear(); getInProgressList().clear(); getCompletedReadList().clear(); - getFreeList().clear(); + clearFreeList(); for (int i = 0; i < maxBufferPoolSize; i++) { bufferPool[i] = null; } @@ -1023,6 +1033,16 @@ void resetBufferManager() { setIsConfigured(false); } + @Override + protected List getFreeListCopy() { + return new ArrayList<>(freeList); + } + + @Override + protected void clearFreeList() { + freeList.clear(); + } + private static void setBufferManager(ReadBufferManagerV2 manager) { bufferManager = manager; } @@ -1062,11 +1082,20 @@ public int getMinBufferPoolSize() { return minBufferPoolSize; } + @VisibleForTesting + public void setMinBufferPoolSize(int size) { + this.minBufferPoolSize = size; + } + @VisibleForTesting public int getMaxBufferPoolSize() { return maxBufferPoolSize; } + /** + * Gets the current size of the worker thread pool. + * @return the current thread pool size + */ @VisibleForTesting public int getCurrentThreadPoolSize() { return workerRefs.size(); } @@ -1082,6 +1111,10 @@ public int getMemoryMonitoringIntervalInMilliSec() { return memoryMonitoringIntervalInMilliSec; } + /** + * Returns the scheduled executor service used for CPU monitoring. + * @return the ScheduledExecutorService for CPU monitoring tasks + */ @VisibleForTesting public ScheduledExecutorService getCpuMonitoringThread() { return cpuMonitorThread; } @@ -1098,6 +1131,13 @@ public long getMaxJvmCpuUtilization() { return maxJvmCpuUtilization; } + /** + * Calculates the required thread pool size based on the current + * read-ahead queue size and in-progress list size, applying a buffer + * to accommodate workload fluctuations.
+ * + * @return the calculated required thread pool size + */ public int getRequiredThreadPoolSize() { return (int) Math.ceil(THREAD_POOL_REQUIREMENT_BUFFER * (getReadAheadQueue().size() @@ -1105,30 +1145,15 @@ public int getRequiredThreadPoolSize() { } private boolean isFreeListEmpty() { - LOCK.lock(); - try { - return getFreeList().isEmpty(); - } finally { - LOCK.unlock(); - } + return this.freeList.isEmpty(); } private Integer popFromFreeList() { - LOCK.lock(); - try { - return getFreeList().pop(); - } finally { - LOCK.unlock(); - } + return this.freeList.pollFirst(); } private void pushToFreeList(int idx) { - LOCK.lock(); - try { - getFreeList().push(idx); - } finally { - LOCK.unlock(); - } + this.freeList.add(idx); } private void incrementActiveBufferCount() { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java index 6b87f1b73ef20..c215094eca0ce 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsInputStreamStatistics.java @@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.services.AbfsAdaptiveInputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext; import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl; @@ -275,7 +276,7 @@ public void testWithNullStreamStatistics() throws IOException { getTestTracingContext(fs, false), null); // AbfsInputStream with no StreamStatistics. 
- in = new AbfsInputStream(fs.getAbfsClient(), null, + in = new AbfsAdaptiveInputStream(fs.getAbfsClient(), null, nullStatFilePath.toUri().getPath(), ONE_KB, abfsInputStreamContext, abfsRestOperation.getResult().getResponseHeader("ETag"), getTestTracingContext(fs, false)); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java index 938f5f4300ce9..2d8629294f643 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java @@ -28,11 +28,11 @@ import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; -import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB; import static org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamTestUtils.HUNDRED; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; @@ -117,7 +117,7 @@ public void testAzureBlobFileSystemBackReferenceInInputStream() FSDataInputStream in = getFileSystem().open(path)) { AbfsInputStream abfsInputStream = (AbfsInputStream) in.getWrappedStream(); - Assertions.assertThat(abfsInputStream.getFsBackRef().isNull()) + assertThat(abfsInputStream.getFsBackRef().isNull()) .describedAs("BackReference in input stream should not be null") .isFalse(); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index 93df6529cb869..5cf0bd473fc24 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -19,6 +19,7 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.IOException; +import java.io.InputStream; import java.net.URI; import java.net.URISyntaxException; import java.net.URL; @@ -57,6 +58,10 @@ import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderVersion; import org.apache.hadoop.fs.impl.OpenFileParameters; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_AVRO; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET; +import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SPLIT_NO_LIMIT; @@ -67,6 +72,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.ReadType.MISSEDCACHE_READ; import static org.apache.hadoop.fs.azurebfs.constants.ReadType.NORMAL_READ; import static org.apache.hadoop.fs.azurebfs.constants.ReadType.PREFETCH_READ; +import static 
org.apache.hadoop.fs.azurebfs.constants.ReadType.RANDOM_READ; import static org.apache.hadoop.fs.azurebfs.constants.ReadType.SMALLFILE_READ; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; @@ -92,8 +98,7 @@ /** * Unit test AbfsInputStream. */ -public class TestAbfsInputStream extends - AbstractAbfsIntegrationTest { +public class TestAbfsInputStream extends AbstractAbfsIntegrationTest { private static final int ONE_KB = 1 * 1024; private static final int TWO_KB = 2 * 1024; @@ -148,7 +153,7 @@ AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient, String fileName) throws IOException { AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1); // Create AbfsInputStream with the client instance - AbfsInputStream inputStream = new AbfsInputStream( + AbfsInputStream inputStream = new AbfsAdaptiveInputStream( mockAbfsClient, null, FORWARD_SLASH + fileName, @@ -176,7 +181,7 @@ public AbfsInputStream getAbfsInputStream(AbfsClient abfsClient, int readAheadBlockSize) throws IOException { AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1); // Create AbfsInputStream with the client instance - AbfsInputStream inputStream = new AbfsInputStream( + AbfsInputStream inputStream = new AbfsAdaptiveInputStream( abfsClient, null, FORWARD_SLASH + fileName, @@ -848,6 +853,7 @@ public void testReadTypeInTracingContextHeader() throws Exception { fileSize = 3 * ONE_MB; // To make sure multiple blocks are read with MR totalReadCalls += 3; // 3 block of 1MB. Mockito.doReturn(0).when(spiedConfig).getReadAheadQueueDepth(); + Mockito.doReturn(FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL).when(spiedConfig).getAbfsReadPolicy(); doReturn(true).when(spiedConfig).isReadAheadEnabled(); testReadTypeInTracingContextHeaderInternal(spiedFs, fileSize, MISSEDCACHE_READ, 3, totalReadCalls); @@ -881,6 +887,15 @@ public void testReadTypeInTracingContextHeader() throws Exception { doReturn(false).when(spiedConfig).optimizeFooterRead(); testReadTypeInTracingContextHeaderInternal(spiedFs, fileSize, SMALLFILE_READ, 1, totalReadCalls); + /* + * Test to verify Random Read Type. + * Setting Read Policy to Parquet ensures Random Read Type. + */ + fileSize = 3 * ONE_MB; // To make sure multiple blocks are read. + totalReadCalls += 3; // Full file will be read along with footer. + doReturn(FS_OPTION_OPENFILE_READ_POLICY_PARQUET).when(spiedConfig).getAbfsReadPolicy(); + testReadTypeInTracingContextHeaderInternal(spiedFs, fileSize, RANDOM_READ, 1, totalReadCalls); + /* * Test to verify Direct Read Type and a read from random position. * Separate AbfsInputStream method needs to be called. 
@@ -904,11 +919,11 @@ public void testReadTypeInTracingContextHeader() throws Exception {
   private void testReadTypeInTracingContextHeaderInternal(AzureBlobFileSystem fs, int fileSize,
       ReadType readType, int numOfReadCalls, int totalReadCalls) throws Exception {
     Path testPath = createTestFile(fs, fileSize);
-    readFile(fs, testPath, fileSize);
+    readFile(fs, testPath, fileSize, readType);
     assertReadTypeInClientRequestId(fs, numOfReadCalls, totalReadCalls, readType);
   }
 
-  /*
+  /**
   * Test to verify that both conditions of prefetch read and respective config
   * enabled needs to be true for the priority header to be added
   */
@@ -937,6 +952,101 @@ public void testPrefetchReadAddsPriorityHeaderWithDifferentConfigs()
     executePrefetchReadTest(tracingContext1, configuration1, false);
   }
 
+  /**
+   * Test to verify that the correct AbfsInputStream instance is created
+   * based on the read policy set in AbfsConfiguration.
+   */
+  @Test
+  public void testAbfsInputStreamInstance() throws Exception {
+    AzureBlobFileSystem fs = getFileSystem();
+    Path path = new Path("/testPath");
+    fs.create(path).close();
+
+    // Assert that Sequential Read Policy uses Prefetch Input Stream
+    getAbfsStore(fs).getAbfsConfiguration().setAbfsReadPolicy(FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL);
+    InputStream stream = fs.open(path).getWrappedStream();
+    assertThat(stream).isInstanceOf(AbfsPrefetchInputStream.class);
+    stream.close();
+
+    // Assert that Adaptive Read Policy uses Adaptive Input Stream
+    getAbfsStore(fs).getAbfsConfiguration().setAbfsReadPolicy(FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE);
+    stream = fs.open(path).getWrappedStream();
+    assertThat(stream).isInstanceOf(AbfsAdaptiveInputStream.class);
+    stream.close();
+
+    // Assert that Parquet Read Policy uses Random Input Stream
+    getAbfsStore(fs).getAbfsConfiguration().setAbfsReadPolicy(FS_OPTION_OPENFILE_READ_POLICY_PARQUET);
+    stream = fs.open(path).getWrappedStream();
+    assertThat(stream).isInstanceOf(AbfsRandomInputStream.class);
+    stream.close();
+
+    // Assert that Avro Read Policy uses Adaptive Input Stream
+    getAbfsStore(fs).getAbfsConfiguration().setAbfsReadPolicy(FS_OPTION_OPENFILE_READ_POLICY_AVRO);
+    stream = fs.open(path).getWrappedStream();
+    assertThat(stream).isInstanceOf(AbfsAdaptiveInputStream.class);
+    stream.close();
+  }
+
+  /**
+   * Test to verify that Random Input Stream does not queue prefetches.
+   * @throws Exception if any error occurs during the test
+   */
+  @Test
+  public void testRandomInputStreamDoesNotQueuePrefetches() throws Exception {
+    AzureBlobFileSystem spiedFs = Mockito.spy(getFileSystem());
+    AzureBlobFileSystemStore spiedStore = Mockito.spy(spiedFs.getAbfsStore());
+    AbfsConfiguration spiedConfig = Mockito.spy(spiedStore.getAbfsConfiguration());
+    AbfsClient spiedClient = Mockito.spy(spiedStore.getClient());
+    Mockito.doReturn(ONE_MB).when(spiedConfig).getReadBufferSize();
+    Mockito.doReturn(ONE_MB).when(spiedConfig).getReadAheadBlockSize();
+    Mockito.doReturn(spiedClient).when(spiedStore).getClient();
+    Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore();
+    Mockito.doReturn(spiedConfig).when(spiedStore).getAbfsConfiguration();
+
+    int fileSize = 3 * ONE_MB; // To make sure multiple blocks are read.
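+    // Parquet policy selects the random stream: even with a non-zero read-ahead
+    // queue depth configured below, no prefetches should be queued, so every
+    // block read is expected to surface as RANDOM_READ in the tracing header.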
+    int totalReadCalls = 3;
+    Mockito.doReturn(3).when(spiedConfig).getReadAheadQueueDepth();
+    Mockito.doReturn(FS_OPTION_OPENFILE_READ_POLICY_PARQUET).when(spiedConfig).getAbfsReadPolicy();
+    testReadTypeInTracingContextHeaderInternal(spiedFs, fileSize, RANDOM_READ, 3, totalReadCalls);
+  }
+
+  /**
+   * Test to verify that Adaptive Input Stream queues prefetches for in-order reads
+   * and performs random reads for out-of-order seeks.
+   * @throws Exception if any error occurs during the test
+   */
+  @Test
+  public void testAdaptiveInputStream() throws Exception {
+    AzureBlobFileSystem spiedFs = Mockito.spy(getFileSystem());
+    AzureBlobFileSystemStore spiedStore = Mockito.spy(spiedFs.getAbfsStore());
+    AbfsConfiguration spiedConfig = Mockito.spy(spiedStore.getAbfsConfiguration());
+    AbfsClient spiedClient = Mockito.spy(spiedStore.getClient());
+    Mockito.doReturn(ONE_MB).when(spiedConfig).getReadBufferSize();
+    Mockito.doReturn(ONE_MB).when(spiedConfig).getReadAheadBlockSize();
+    Mockito.doReturn(ONE_KB).when(spiedConfig).getReadAheadRange();
+    Mockito.doReturn(FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE).when(spiedConfig).getAbfsReadPolicy();
+    Mockito.doReturn(spiedClient).when(spiedStore).getClient();
+    Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore();
+    Mockito.doReturn(spiedConfig).when(spiedStore).getAbfsConfiguration();
+
+    int fileSize = 10 * ONE_MB;
+    Path testPath = createTestFile(spiedFs, fileSize);
+
+    try (FSDataInputStream iStream = spiedFs.open(testPath)) {
+      assertThat(iStream.getWrappedStream()).isInstanceOf(AbfsAdaptiveInputStream.class);
+
+      // In-order reads trigger prefetches in the adaptive stream.
+      int bytesRead = iStream.read(new byte[2 * ONE_MB], 0, 2 * ONE_MB);
+      assertReadTypeInClientRequestId(spiedFs, 3, 3, PREFETCH_READ);
+      assertThat(bytesRead).isEqualTo(2 * ONE_MB);
+
+      // An out-of-order seek causes a random read.
+      iStream.seek(7 * ONE_MB);
+      bytesRead = iStream.read(new byte[ONE_MB / 2], 0, ONE_MB / 2);
+      assertReadTypeInClientRequestId(spiedFs, 1, 4, RANDOM_READ);
+    }
+  }
+
   /*
    * Helper method to execute read and verify if priority header is added or not as expected
    */
@@ -1005,8 +1115,15 @@ private Path createTestFile(AzureBlobFileSystem fs, int fileSize) throws Excepti
     return testPath;
   }
 
-  private void readFile(AzureBlobFileSystem fs, Path testPath, int fileSize) throws Exception {
+  private void readFile(AzureBlobFileSystem fs, Path testPath, int fileSize, ReadType readType) throws Exception {
     try (FSDataInputStream iStream = fs.open(testPath)) {
+      if (readType == PREFETCH_READ || readType == MISSEDCACHE_READ) {
+        assertThat(iStream.getWrappedStream()).isInstanceOf(AbfsPrefetchInputStream.class);
+      } else if (readType == NORMAL_READ) {
+        assertThat(iStream.getWrappedStream()).isInstanceOf(AbfsAdaptiveInputStream.class);
+      } else if (readType == RANDOM_READ) {
+        assertThat(iStream.getWrappedStream()).isInstanceOf(AbfsRandomInputStream.class);
+      }
       int bytesRead = iStream.read(new byte[fileSize], 0, fileSize);
 
       assertThat(fileSize)
@@ -1027,6 +1144,7 @@ private void assertReadTypeInClientRequestId(AzureBlobFileSystem fs, int numOfRe
     ArgumentCaptor captor8 = ArgumentCaptor.forClass(ContextEncryptionAdapter.class);
     ArgumentCaptor captor9 = ArgumentCaptor.forClass(TracingContext.class);
 
+    List paths = captor1.getAllValues();
     verify(fs.getAbfsStore().getClient(), times(totalReadCalls)).read(
         captor1.capture(), captor2.capture(), captor3.capture(),
         captor4.capture(), captor5.capture(), captor6.capture(),
@@ -1071,8 +1189,14 @@ private void verifyHeaderForReadTypeInTracingContextHeader(TracingContext tracin
     }
     assertThat(idList[OPERATION_INDEX]).describedAs("Operation Type Should Be Read")
         .isEqualTo(FSOperationType.READ.toString());
-    assertThat(idList[READTYPE_INDEX]).describedAs("Read type in tracing context header should match")
-        .isEqualTo(readType.toString());
+    if (readType == PREFETCH_READ) {
+      // For a prefetch read, the recorded type might be a missed-cache read as well.
+      assertThat(idList[READTYPE_INDEX]).describedAs("Read type in tracing context header should match")
+          .isIn(PREFETCH_READ.toString(), MISSEDCACHE_READ.toString());
+    } else {
+      assertThat(idList[READTYPE_INDEX]).describedAs("Read type in tracing context header should match")
+          .isEqualTo(readType.toString());
+    }
   }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java
index e94c535bd3900..a7cbc83f0c7c4 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java
@@ -268,6 +268,7 @@ public void testMemoryDownscaleIfMemoryAboveThreshold() throws Exception {
     ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager(abfsClient.getAbfsCounters());
     int initialBuffers = bufferManagerV2.getMinBufferPoolSize();
     assertThat(bufferManagerV2.getNumBuffers()).isEqualTo(initialBuffers);
+    bufferManagerV2.setMinBufferPoolSize(initialBuffers - 5); // allow downscale
     running = true;
     Thread t = new Thread(() -> {
       while (running) {
@@ -314,6 +315,7 @@ public void testReadMetricUpdation() throws Exception {
     bufferManagerV2.getCurrentStats(ResourceUtilizationUtils.getJvmCpuLoad());
     int initialBuffers = bufferManagerV2.getMinBufferPoolSize();
     assertThat(bufferManagerV2.getNumBuffers()).isEqualTo(initialBuffers);
+    bufferManagerV2.setMinBufferPoolSize(initialBuffers - 5); // allow downscale
     running = true;
     Thread t = new Thread(() -> {
       while (running) {
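
Illustrative sketch (not part of the patch): the policy-to-stream mapping that testAbfsInputStreamInstance asserts, collected into one helper for readability. The helper name expectedStreamFor and its existence are hypothetical; the real selection happens inside AzureBlobFileSystemStore and may consider more inputs than the configured policy. Policies not exercised by the test fall back to the adaptive stream here, which is an assumption rather than verified behaviour.

  // Hypothetical helper mirroring the expectations in testAbfsInputStreamInstance.
  private static Class<? extends AbfsInputStream> expectedStreamFor(String readPolicy) {
    if (FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL.equals(readPolicy)) {
      // sequential -> prefetching stream (read-ahead queued)
      return AbfsPrefetchInputStream.class;
    } else if (FS_OPTION_OPENFILE_READ_POLICY_PARQUET.equals(readPolicy)) {
      // parquet -> random-access stream (no prefetch queueing)
      return AbfsRandomInputStream.class;
    } else {
      // adaptive and avro -> adaptive stream; other policies assumed to do the same
      return AbfsAdaptiveInputStream.class;
    }
  }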