diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index b72b7e83d9cb0..30b2c25b7a98f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -386,6 +386,41 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_ENABLE_READAHEAD) private boolean enabledReadAhead; + @BooleanConfigurationValidatorAnnotation( + ConfigurationKey = FS_AZURE_ENABLE_READAHEAD_V2, + DefaultValue = DEFAULT_ENABLE_READAHEAD_V2) + private boolean isReadAheadV2Enabled; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = + FS_AZURE_READAHEAD_V2_MIN_THREAD_POOL_SIZE, + DefaultValue = DEFAULT_READAHEAD_V2_MIN_THREAD_POOL_SIZE) + private int minReadAheadV2ThreadPoolSize; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = + FS_AZURE_READAHEAD_V2_MAX_THREAD_POOL_SIZE, + DefaultValue = DEFAULT_READAHEAD_V2_MAX_THREAD_POOL_SIZE) + private int maxReadAheadV2ThreadPoolSize; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = + FS_AZURE_READAHEAD_V2_MIN_BUFFER_POOL_SIZE, + DefaultValue = DEFAULT_READAHEAD_V2_MIN_BUFFER_POOL_SIZE) + private int minReadAheadV2BufferPoolSize; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = + FS_AZURE_READAHEAD_V2_MAX_BUFFER_POOL_SIZE, + DefaultValue = DEFAULT_READAHEAD_V2_MAX_BUFFER_POOL_SIZE) + private int maxReadAheadV2BufferPoolSize; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = + FS_AZURE_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS, + DefaultValue = DEFAULT_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS) + private int readAheadExecutorServiceTTLMillis; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = + FS_AZURE_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS, + DefaultValue = DEFAULT_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS) + private int readAheadV2CachedBufferTTLMillis; + @LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS, MinValue = 0, DefaultValue = DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS) @@ -1392,6 +1427,54 @@ public boolean isReadAheadEnabled() { return this.enabledReadAhead; } + public int getMinReadAheadV2ThreadPoolSize() { + if (minReadAheadV2ThreadPoolSize <= 0) { + // If the minReadAheadV2ThreadPoolSize is not set, use the default value + return 2 * Runtime.getRuntime().availableProcessors(); + } + return minReadAheadV2ThreadPoolSize; + } + + public int getMaxReadAheadV2ThreadPoolSize() { + if (maxReadAheadV2ThreadPoolSize <= 0) { + // If the maxReadAheadV2ThreadPoolSize is not set, use the default value + return 4 * Runtime.getRuntime().availableProcessors(); + } + return maxReadAheadV2ThreadPoolSize; + } + + public int getMinReadAheadV2BufferPoolSize() { + if (minReadAheadV2BufferPoolSize <= 0) { + // If the minReadAheadV2BufferPoolSize is not set, use the default value + return 2 * Runtime.getRuntime().availableProcessors(); + } + return minReadAheadV2BufferPoolSize; + } + + public int getMaxReadAheadV2BufferPoolSize() { + if (maxReadAheadV2BufferPoolSize <= 0) { + // If the maxReadAheadV2BufferPoolSize is not set, use the default value + return 4 * Runtime.getRuntime().availableProcessors(); + } + return maxReadAheadV2BufferPoolSize; + } + + public int getReadAheadExecutorServiceTTLInMillis() { + return readAheadExecutorServiceTTLMillis; + } + + public 
int getReadAheadV2CachedBufferTTLMillis() {
+    return readAheadV2CachedBufferTTLMillis;
+  }
+
+  /**
+   * Checks if the read-ahead v2 feature is enabled by the user.
+   * @return true if read-ahead v2 is enabled, false otherwise.
+   */
+  public boolean isReadAheadV2Enabled() {
+    return this.isReadAheadV2Enabled;
+  }
+
   @VisibleForTesting
   void setReadAheadEnabled(final boolean enabledReadAhead) {
     this.enabledReadAhead = enabledReadAhead;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 999b5371763f2..2732c0ed8fb31 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -956,6 +956,7 @@ AZURE_FOOTER_READ_BUFFER_SIZE, getAbfsConfiguration().getFooterReadBufferSize())
         .withReadAheadQueueDepth(getAbfsConfiguration().getReadAheadQueueDepth())
         .withTolerateOobAppends(getAbfsConfiguration().getTolerateOobAppends())
         .isReadAheadEnabled(getAbfsConfiguration().isReadAheadEnabled())
+        .isReadAheadV2Enabled(getAbfsConfiguration().isReadAheadV2Enabled())
         .withReadSmallFilesCompletely(getAbfsConfiguration().readSmallFilesCompletely())
         .withOptimizeFooterRead(getAbfsConfiguration().optimizeFooterRead())
         .withFooterReadBufferSize(footerReadBufferSize)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index eaad2737e5627..7d73f1a3fe7fc 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -259,10 +259,46 @@ public final class ConfigurationKeys {
   public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";

   /**
-   * Enable or disable readahead buffer in AbfsInputStream.
+   * Enable or disable readahead V1 in AbfsInputStream.
    * Value: {@value}.
    */
   public static final String FS_AZURE_ENABLE_READAHEAD = "fs.azure.enable.readahead";

+  /**
+   * Enable or disable readahead V2 in AbfsInputStream. This will work independently of V1.
+   * Value: {@value}.
+   */
+  public static final String FS_AZURE_ENABLE_READAHEAD_V2 = "fs.azure.enable.readahead.v2";
+
+  /**
+   * Minimum number of prefetch threads in the thread pool for readahead V2.
+   * {@value }
+   */
+  public static final String FS_AZURE_READAHEAD_V2_MIN_THREAD_POOL_SIZE = "fs.azure.readahead.v2.min.thread.pool.size";
+  /**
+   * Maximum number of prefetch threads in the thread pool for readahead V2.
+   * {@value }
+   */
+  public static final String FS_AZURE_READAHEAD_V2_MAX_THREAD_POOL_SIZE = "fs.azure.readahead.v2.max.thread.pool.size";
+  /**
+   * Minimum size of the buffer pool for caching prefetched data for readahead V2.
+   * {@value }
+   */
+  public static final String FS_AZURE_READAHEAD_V2_MIN_BUFFER_POOL_SIZE = "fs.azure.readahead.v2.min.buffer.pool.size";
+  /**
+   * Maximum size of the buffer pool for caching prefetched data for readahead V2.
+ * {@value } + */ + public static final String FS_AZURE_READAHEAD_V2_MAX_BUFFER_POOL_SIZE = "fs.azure.readahead.v2.max.buffer.pool.size"; + + /** + * TTL in milliseconds for the idle threads in executor service used by read ahead v2. + */ + public static final String FS_AZURE_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS = "fs.azure.readahead.v2.executor.service.ttl.millis"; + + /** + * TTL in milliseconds for the cached buffers in buffer pool used by read ahead v2. + */ + public static final String FS_AZURE_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS = "fs.azure.readahead.v2.cached.buffer.ttl.millis"; /** Setting this true will make the driver use it's own RemoteIterator implementation */ public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR = "fs.azure.enable.abfslistiterator"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 104708da72c61..92fb6da79b43b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -128,6 +128,14 @@ public final class FileSystemConfigurations { public static final long DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120; public static final boolean DEFAULT_ENABLE_READAHEAD = true; + public static final boolean DEFAULT_ENABLE_READAHEAD_V2 = false; + public static final int DEFAULT_READAHEAD_V2_MIN_THREAD_POOL_SIZE = -1; + public static final int DEFAULT_READAHEAD_V2_MAX_THREAD_POOL_SIZE = -1; + public static final int DEFAULT_READAHEAD_V2_MIN_BUFFER_POOL_SIZE = -1; + public static final int DEFAULT_READAHEAD_V2_MAX_BUFFER_POOL_SIZE = -1; + public static final int DEFAULT_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS = 3_000; + public static final int DEFAULT_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS = 6_000; + public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING; public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 4f3c04147e51c..38b49603fbb00 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -80,6 +80,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, private final String eTag; // eTag of the path when InputStream are created private final boolean tolerateOobAppends; // whether tolerate Oob Appends private final boolean readAheadEnabled; // whether enable readAhead; + private final boolean readAheadV2Enabled; // whether enable readAhead V2; private final String inputStreamId; private final boolean alwaysReadBufferSize; /* @@ -131,6 +132,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, /** ABFS instance to be held by the input stream to avoid GC close. 
*/ private final BackReference fsBackRef; + private ReadBufferManager readBufferManager; public AbfsInputStream( final AbfsClient client, @@ -151,6 +153,7 @@ public AbfsInputStream( this.eTag = eTag; this.readAheadRange = abfsInputStreamContext.getReadAheadRange(); this.readAheadEnabled = abfsInputStreamContext.isReadAheadEnabled(); + this.readAheadV2Enabled = abfsInputStreamContext.isReadAheadV2Enabled(); this.alwaysReadBufferSize = abfsInputStreamContext.shouldReadBufferSizeAlways(); this.bufferedPreadDisabled = abfsInputStreamContext @@ -175,9 +178,19 @@ public AbfsInputStream( this.fsBackRef = abfsInputStreamContext.getFsBackRef(); contextEncryptionAdapter = abfsInputStreamContext.getEncryptionAdapter(); - // Propagate the config values to ReadBufferManager so that the first instance - // to initialize can set the readAheadBlockSize - ReadBufferManager.setReadBufferManagerConfigs(readAheadBlockSize); + /* + * Initialize the ReadBufferManager based on whether readAheadV2 is enabled or not. + * Precedence is given to ReadBufferManagerV2. + * If none of the V1 and V2 are enabled, then no read ahead will be done. + */ + if (readAheadV2Enabled) { + ReadBufferManagerV2.setReadBufferManagerConfigs( + readAheadBlockSize, client.getAbfsConfiguration()); + readBufferManager = ReadBufferManagerV2.getBufferManager(); + } else { + ReadBufferManagerV1.setReadBufferManagerConfigs(readAheadBlockSize); + readBufferManager = ReadBufferManagerV1.getBufferManager(); + } if (streamStatistics != null) { ioStatistics = streamStatistics.getIOStatistics(); } @@ -499,7 +512,7 @@ private int copyToUserBuffer(byte[] b, int off, int len){ private int readInternal(final long position, final byte[] b, final int offset, final int length, final boolean bypassReadAhead) throws IOException { - if (readAheadEnabled && !bypassReadAhead) { + if (isReadAheadEnabled() && !bypassReadAhead) { // try reading from read-ahead if (offset != 0) { throw new IllegalArgumentException("readahead buffers cannot have non-zero buffer offsets"); @@ -519,7 +532,7 @@ private int readInternal(final long position, final byte[] b, final int offset, while (numReadAheads > 0 && nextOffset < contentLength) { LOG.debug("issuing read ahead requestedOffset = {} requested size {}", nextOffset, nextSize); - ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize, + readBufferManager.queueReadAhead(this, nextOffset, (int) nextSize, new TracingContext(readAheadTracingContext)); nextOffset = nextOffset + nextSize; numReadAheads--; @@ -528,7 +541,7 @@ private int readInternal(final long position, final byte[] b, final int offset, } // try reading from buffers first - receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b); + receivedBytes = readBufferManager.getBlock(this, position, length, b); bytesFromReadAhead += receivedBytes; if (receivedBytes > 0) { incrementReadOps(); @@ -732,7 +745,9 @@ public boolean seekToNewSource(long l) throws IOException { public synchronized void close() throws IOException { LOG.debug("Closing {}", this); closed = true; - ReadBufferManager.getBufferManager().purgeBuffersForStream(this); + if (readBufferManager != null) { + readBufferManager.purgeBuffersForStream(this); + } buffer = null; // de-reference the buffer so it can be GC'ed sooner if (contextEncryptionAdapter != null) { contextEncryptionAdapter.destroy(); @@ -785,9 +800,14 @@ byte[] getBuffer() { return buffer; } + /** + * Checks if any version of read ahead is enabled. 
+ * If both are disabled, then skip read ahead logic. + * @return true if read ahead is enabled, false otherwise. + */ @VisibleForTesting public boolean isReadAheadEnabled() { - return readAheadEnabled; + return (readAheadEnabled || readAheadV2Enabled) && readBufferManager != null; } @VisibleForTesting diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java index fdcad5ac3a0d0..f6272492d6081 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java @@ -41,6 +41,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext { private boolean isReadAheadEnabled = true; + private boolean isReadAheadV2Enabled; + private boolean alwaysReadBufferSize; private int readAheadBlockSize; @@ -91,6 +93,12 @@ public AbfsInputStreamContext isReadAheadEnabled( return this; } + public AbfsInputStreamContext isReadAheadV2Enabled( + final boolean isReadAheadV2Enabled) { + this.isReadAheadV2Enabled = isReadAheadV2Enabled; + return this; + } + public AbfsInputStreamContext withReadAheadRange( final int readAheadRange) { this.readAheadRange = readAheadRange; @@ -181,6 +189,10 @@ public boolean isReadAheadEnabled() { return isReadAheadEnabled; } + public boolean isReadAheadV2Enabled() { + return isReadAheadV2Enabled; + } + public int getReadAheadRange() { return readAheadRange; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java index 031545f57a193..9ee128fbc3275 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java @@ -15,636 +15,287 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.fs.azurebfs.services; -import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +package org.apache.hadoop.fs.azurebfs.services; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Queue; import java.util.Stack; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.ReentrantLock; -import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; + +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_AHEAD_BLOCK_SIZE; /** - * The Read Buffer Manager for Rest AbfsClient. + * Abstract class for managing read buffers for Azure Blob File System input streams. 
*/ -final class ReadBufferManager { - private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class); - private static final int ONE_KB = 1024; - private static final int ONE_MB = ONE_KB * ONE_KB; - - private static final int NUM_BUFFERS = 16; - private static final int NUM_THREADS = 8; - private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold - - private static int blockSize = 4 * ONE_MB; - private static int thresholdAgeMilliseconds = DEFAULT_THRESHOLD_AGE_MILLISECONDS; - private Thread[] threads = new Thread[NUM_THREADS]; - private byte[][] buffers; // array of byte[] buffers, to hold the data that is read - private Stack freeList = new Stack<>(); // indices in buffers[] array that are available +public abstract class ReadBufferManager { + protected static final Logger LOGGER = LoggerFactory.getLogger( + ReadBufferManager.class); + protected static final ReentrantLock LOCK = new ReentrantLock(); + private static int thresholdAgeMilliseconds; + private static int blockSize = DEFAULT_READ_AHEAD_BLOCK_SIZE; // default block size for read-ahead in bytes + private Stack freeList = new Stack<>(); // indices in buffers[] array that are available private Queue readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet private LinkedList inProgressList = new LinkedList<>(); // requests being processed by worker threads private LinkedList completedReadList = new LinkedList<>(); // buffers available for reading - private static ReadBufferManager bufferManager; // singleton, initialized in static initialization block - private static final ReentrantLock LOCK = new ReentrantLock(); - - static ReadBufferManager getBufferManager() { - if (bufferManager == null) { - LOCK.lock(); - try { - if (bufferManager == null) { - bufferManager = new ReadBufferManager(); - bufferManager.init(); - } - } finally { - LOCK.unlock(); - } - } - return bufferManager; - } - static void setReadBufferManagerConfigs(int readAheadBlockSize) { - if (bufferManager == null) { - LOGGER.debug( - "ReadBufferManager not initialized yet. Overriding readAheadBlockSize as {}", - readAheadBlockSize); - blockSize = readAheadBlockSize; - } - } - - private void init() { - buffers = new byte[NUM_BUFFERS][]; - for (int i = 0; i < NUM_BUFFERS; i++) { - buffers[i] = new byte[blockSize]; // same buffers are reused. The byte array never goes back to GC - freeList.add(i); - } - for (int i = 0; i < NUM_THREADS; i++) { - Thread t = new Thread(new ReadBufferWorker(i)); - t.setDaemon(true); - threads[i] = t; - t.setName("ABFS-prefetch-" + i); - t.start(); - } - ReadBufferWorker.UNLEASH_WORKERS.countDown(); - } - - // hide instance constructor - private ReadBufferManager() { - LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch"); - } - - - /* - * - * AbfsInputStream-facing methods - * + /** + * Initializes the ReadBufferManager singleton instance. Creates the read buffers and threads. + * This method should be called once to set up the read buffer manager. */ - + abstract void init(); /** - * {@link AbfsInputStream} calls this method to queue read-aheads. - * - * @param stream The {@link AbfsInputStream} for which to do the read-ahead - * @param requestedOffset The offset in the file which shoukd be read - * @param requestedLength The length to read + * Queues a read-ahead request from {@link AbfsInputStream} + * for a given offset in file and given length. 
+ * @param stream the input stream requesting the read-ahead + * @param requestedOffset the offset in the remote file to start reading + * @param requestedLength the number of bytes to read from file + * @param tracingContext the tracing context for diagnostics */ - void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, final int requestedLength, - TracingContext tracingContext) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Start Queueing readAhead for {} offset {} length {}", - stream.getPath(), requestedOffset, requestedLength); - } - ReadBuffer buffer; - synchronized (this) { - if (isAlreadyQueued(stream, requestedOffset)) { - return; // already queued, do not queue again - } - if (freeList.isEmpty() && !tryEvict()) { - return; // no buffers available, cannot queue anything - } - - buffer = new ReadBuffer(); - buffer.setStream(stream); - buffer.setOffset(requestedOffset); - buffer.setLength(0); - buffer.setRequestedLength(requestedLength); - buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE); - buffer.setLatch(new CountDownLatch(1)); - buffer.setTracingContext(tracingContext); - - Integer bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already - - buffer.setBuffer(buffers[bufferIndex]); - buffer.setBufferindex(bufferIndex); - readAheadQueue.add(buffer); - notifyAll(); - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}", - stream.getPath(), requestedOffset, buffer.getBufferindex()); - } - } - } - + abstract void queueReadAhead(AbfsInputStream stream, + long requestedOffset, + int requestedLength, + TracingContext tracingContext); /** + * Gets a block of data from the prefetched data by ReadBufferManager. * {@link AbfsInputStream} calls this method read any bytes already available in a buffer (thereby saving a * remote read). This returns the bytes if the data already exists in buffer. If there is a buffer that is reading * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead * but not picked up by a worker thread yet, then it cancels that read-ahead and reports cache miss. This is because - * depending on worker thread availability, the read-ahead may take a while - the calling thread can do it's own - * read to get the data faster (copmared to the read waiting in queue for an indeterminate amount of time). + * depending on worker thread availability, the read-ahead may take a while - the calling thread can do its own + * read to get the data faster (compared to the read waiting in queue for an indeterminate amount of time). * - * @param stream the file to read bytes for - * @param position the offset in the file to do a read for - * @param length the length to read - * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0. 
- * @return the number of bytes read + * @param stream the input stream requesting the block + * @param position the position in the file to read from + * @param length the number of bytes to read + * @param buffer the buffer to store the read data + * @return the number of bytes actually read + * @throws IOException if an I/O error occurs */ - int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer) - throws IOException { - // not synchronized, so have to be careful with locking - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("getBlock for file {} position {} thread {}", - stream.getPath(), position, Thread.currentThread().getName()); - } - - waitForProcess(stream, position); - - int bytesRead = 0; - synchronized (this) { - bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer); - } - if (bytesRead > 0) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Done read from Cache for {} position {} length {}", - stream.getPath(), position, bytesRead); - } - return bytesRead; - } + abstract int getBlock(AbfsInputStream stream, + long position, + int length, + byte[] buffer) throws IOException; - // otherwise, just say we got nothing - calling thread can do its own read - return 0; - } - - /* - * - * Internal methods + /** + * {@link ReadBufferWorker} calls this to get the next buffer to read from read-ahead queue. + * Requested read will be performed by background thread. * + * @return the next {@link ReadBuffer} to read + * @throws InterruptedException if interrupted while waiting */ - - private void waitForProcess(final AbfsInputStream stream, final long position) { - ReadBuffer readBuf; - synchronized (this) { - clearFromReadAheadQueue(stream, position); - readBuf = getFromList(inProgressList, stream, position); - } - if (readBuf != null) { // if in in-progress queue, then block for it - try { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("got a relevant read buffer for file {} offset {} buffer idx {}", - stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex()); - } - readBuf.getLatch().await(); // blocking wait on the caller stream's thread - // Note on correctness: readBuf gets out of inProgressList only in 1 place: after worker thread - // is done processing it (in doneReading). There, the latch is set after removing the buffer from - // inProgressList. So this latch is safe to be outside the synchronized block. - // Putting it in synchronized would result in a deadlock, since this thread would be holding the lock - // while waiting, so no one will be able to change any state. If this becomes more complex in the future, - // then the latch cane be removed and replaced with wait/notify whenever inProgressList is touched. - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("latch done for file {} buffer idx {} length {}", - stream.getPath(), readBuf.getBufferindex(), readBuf.getLength()); - } - } - } + abstract ReadBuffer getNextBlockToRead() throws InterruptedException; /** - * If any buffer in the completedlist can be reclaimed then reclaim it and return the buffer to free list. - * The objective is to find just one buffer - there is no advantage to evicting more than one. + * Marks the specified buffer as done reading and updates its status. + * Called by {@link ReadBufferWorker} after reading is complete. 
* - * @return whether the eviction succeeeded - i.e., were we able to free up one buffer + * @param buffer the buffer that was read by worker thread + * @param result the status of the read operation + * @param bytesActuallyRead the number of bytes actually read by worker thread. */ - private synchronized boolean tryEvict() { - ReadBuffer nodeToEvict = null; - if (completedReadList.size() <= 0) { - return false; // there are no evict-able buffers - } - - long currentTimeInMs = currentTimeMillis(); - - // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed) - for (ReadBuffer buf : completedReadList) { - if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) { - nodeToEvict = buf; - break; - } - } - if (nodeToEvict != null) { - return evict(nodeToEvict); - } - - // next, try buffers where any bytes have been consumed (may be a bad idea? have to experiment and see) - for (ReadBuffer buf : completedReadList) { - if (buf.isAnyByteConsumed()) { - nodeToEvict = buf; - break; - } - } - - if (nodeToEvict != null) { - return evict(nodeToEvict); - } - - // next, try any old nodes that have not been consumed - // Failed read buffers (with buffer index=-1) that are older than - // thresholdAge should be cleaned up, but at the same time should not - // report successful eviction. - // Queue logic expects that a buffer is freed up for read ahead when - // eviction is successful, whereas a failed ReadBuffer would have released - // its buffer when its status was set to READ_FAILED. - long earliestBirthday = Long.MAX_VALUE; - ArrayList oldFailedBuffers = new ArrayList<>(); - for (ReadBuffer buf : completedReadList) { - if ((buf.getBufferindex() != -1) - && (buf.getTimeStamp() < earliestBirthday)) { - nodeToEvict = buf; - earliestBirthday = buf.getTimeStamp(); - } else if ((buf.getBufferindex() == -1) - && (currentTimeInMs - buf.getTimeStamp()) > thresholdAgeMilliseconds) { - oldFailedBuffers.add(buf); - } - } - - for (ReadBuffer buf : oldFailedBuffers) { - evict(buf); - } - - if ((currentTimeInMs - earliestBirthday > thresholdAgeMilliseconds) && (nodeToEvict != null)) { - return evict(nodeToEvict); - } - - LOGGER.trace("No buffer eligible for eviction"); - // nothing can be evicted - return false; - } - - private boolean evict(final ReadBuffer buf) { - // As failed ReadBuffers (bufferIndx = -1) are saved in completedReadList, - // avoid adding it to freeList. 
- if (buf.getBufferindex() != -1) { - freeList.push(buf.getBufferindex()); - } - - completedReadList.remove(buf); - buf.setTracingContext(null); - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}", - buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength()); - } - return true; - } - - private boolean isAlreadyQueued(final AbfsInputStream stream, final long requestedOffset) { - // returns true if any part of the buffer is already queued - return (isInList(readAheadQueue, stream, requestedOffset) - || isInList(inProgressList, stream, requestedOffset) - || isInList(completedReadList, stream, requestedOffset)); - } - - private boolean isInList(final Collection list, final AbfsInputStream stream, final long requestedOffset) { - return (getFromList(list, stream, requestedOffset) != null); - } - - private ReadBuffer getFromList(final Collection list, final AbfsInputStream stream, final long requestedOffset) { - for (ReadBuffer buffer : list) { - if (buffer.getStream() == stream) { - if (buffer.getStatus() == ReadBufferStatus.AVAILABLE - && requestedOffset >= buffer.getOffset() - && requestedOffset < buffer.getOffset() + buffer.getLength()) { - return buffer; - } else if (requestedOffset >= buffer.getOffset() - && requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) { - return buffer; - } - } - } - return null; - } + abstract void doneReading(ReadBuffer buffer, + ReadBufferStatus result, + int bytesActuallyRead); /** - * Returns buffers that failed or passed from completed queue. - * @param stream - * @param requestedOffset - * @return + * Purging the buffers associated with an {@link AbfsInputStream} + * from {@link ReadBufferManager} when stream is closed. + * + * @param stream the input stream whose buffers should be purged. 
*/ - private ReadBuffer getBufferFromCompletedQueue(final AbfsInputStream stream, final long requestedOffset) { - for (ReadBuffer buffer : completedReadList) { - // Buffer is returned if the requestedOffset is at or above buffer's - // offset but less than buffer's length or the actual requestedLength - if ((buffer.getStream() == stream) - && (requestedOffset >= buffer.getOffset()) - && ((requestedOffset < buffer.getOffset() + buffer.getLength()) - || (requestedOffset < buffer.getOffset() + buffer.getRequestedLength()))) { - return buffer; - } - } - - return null; - } - - private void clearFromReadAheadQueue(final AbfsInputStream stream, final long requestedOffset) { - ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset); - if (buffer != null) { - readAheadQueue.remove(buffer); - notifyAll(); // lock is held in calling method - freeList.push(buffer.getBufferindex()); - } - } - - private int getBlockFromCompletedQueue(final AbfsInputStream stream, final long position, final int length, - final byte[] buffer) throws IOException { - ReadBuffer buf = getBufferFromCompletedQueue(stream, position); - - if (buf == null) { - return 0; - } + abstract void purgeBuffersForStream(AbfsInputStream stream); - if (buf.getStatus() == ReadBufferStatus.READ_FAILED) { - // To prevent new read requests to fail due to old read-ahead attempts, - // return exception only from buffers that failed within last thresholdAgeMilliseconds - if ((currentTimeMillis() - (buf.getTimeStamp()) < thresholdAgeMilliseconds)) { - throw buf.getErrException(); - } else { - return 0; - } - } - if ((buf.getStatus() != ReadBufferStatus.AVAILABLE) - || (position >= buf.getOffset() + buf.getLength())) { - return 0; - } + // Following Methods are for testing purposes only and should not be used in production code. - int cursor = (int) (position - buf.getOffset()); - int availableLengthInBuffer = buf.getLength() - cursor; - int lengthToCopy = Math.min(length, availableLengthInBuffer); - System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy); - if (cursor == 0) { - buf.setFirstByteConsumed(true); - } - if (cursor + lengthToCopy == buf.getLength()) { - buf.setLastByteConsumed(true); - } - buf.setAnyByteConsumed(true); - return lengthToCopy; - } - - /* - * - * ReadBufferWorker-thread-facing methods + /** + * Gets the number of buffers currently managed by the read buffer manager. * + * @return the number of buffers */ + @VisibleForTesting + abstract int getNumBuffers(); /** - * ReadBufferWorker thread calls this to get the next buffer that it should work on. - * - * @return {@link ReadBuffer} - * @throws InterruptedException if thread is interrupted + * Attempts to evict buffers based on the eviction policy. */ - ReadBuffer getNextBlockToRead() throws InterruptedException { - ReadBuffer buffer = null; - synchronized (this) { - //buffer = readAheadQueue.take(); // blocking method - while (readAheadQueue.size() == 0) { - wait(); - } - buffer = readAheadQueue.remove(); - notifyAll(); - if (buffer == null) { - return null; // should never happen - } - buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS); - inProgressList.add(buffer); - } - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("ReadBufferWorker picked file {} for offset {}", - buffer.getStream().getPath(), buffer.getOffset()); - } - return buffer; - } + @VisibleForTesting + abstract void callTryEvict(); /** - * ReadBufferWorker thread calls this method to post completion. + * Resets the read buffer manager for testing purposes. 
Clean up the current + * state of readAhead buffers and the lists. Will also trigger a fresh init. + */ + @VisibleForTesting + abstract void testResetReadBufferManager(); + + /** + * Resets the read buffer manager for testing with the specified block size and threshold age. * - * @param buffer the buffer whose read was completed - * @param result the {@link ReadBufferStatus} after the read operation in the worker thread - * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read + * @param readAheadBlockSize the block size for read-ahead + * @param thresholdAgeMilliseconds the threshold age in milliseconds */ - void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("ReadBufferWorker completed read file {} for offset {} outcome {} bytes {}", - buffer.getStream().getPath(), buffer.getOffset(), result, bytesActuallyRead); - } - synchronized (this) { - // If this buffer has already been purged during - // close of InputStream then we don't update the lists. - if (inProgressList.contains(buffer)) { - inProgressList.remove(buffer); - if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) { - buffer.setStatus(ReadBufferStatus.AVAILABLE); - buffer.setLength(bytesActuallyRead); - } else { - freeList.push(buffer.getBufferindex()); - // buffer will be deleted as per the eviction policy. - } - // completed list also contains FAILED read buffers - // for sending exception message to clients. - buffer.setStatus(result); - buffer.setTimeStamp(currentTimeMillis()); - completedReadList.add(buffer); - } - } + @VisibleForTesting + abstract void testResetReadBufferManager(int readAheadBlockSize, int thresholdAgeMilliseconds); - //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results - buffer.getLatch().countDown(); // wake up waiting threads (if any) - } + /** + * Resets the buffer manager instance to null for testing purposes. + * This allows for reinitialization in tests. + */ + abstract void resetBufferManager(); /** - * Similar to System.currentTimeMillis, except implemented with System.nanoTime(). - * System.currentTimeMillis can go backwards when system clock is changed (e.g., with NTP time synchronization), - * making it unsuitable for measuring time intervals. nanotime is strictly monotonically increasing per CPU core. - * Note: it is not monotonic across Sockets, and even within a CPU, its only the - * more recent parts which share a clock across all cores. + * Gets the threshold age in milliseconds for buffer eviction. * - * @return current time in milliseconds + * @return the threshold age in milliseconds */ - private long currentTimeMillis() { - return System.nanoTime() / 1000 / 1000; - } - @VisibleForTesting - int getThresholdAgeMilliseconds() { + protected static int getThresholdAgeMilliseconds() { return thresholdAgeMilliseconds; } + /** + * Sets the threshold age in milliseconds for buffer eviction. + * + * @param thresholdAgeMs the threshold age in milliseconds + */ @VisibleForTesting - static void setThresholdAgeMilliseconds(int thresholdAgeMs) { + protected static void setThresholdAgeMilliseconds(int thresholdAgeMs) { thresholdAgeMilliseconds = thresholdAgeMs; } + /** + * Gets the block size used for read-ahead operations. 
+ * + * @return the read-ahead block size in bytes + */ @VisibleForTesting - int getCompletedReadListSize() { - return completedReadList.size(); - } - - @VisibleForTesting - public synchronized List getCompletedReadListCopy() { - return new ArrayList<>(completedReadList); - } - - @VisibleForTesting - public synchronized List getFreeListCopy() { - return new ArrayList<>(freeList); + protected static int getReadAheadBlockSize() { + return blockSize; } + /** + * Sets the block size used for read-ahead operations. + * + * @param readAheadBlockSize the read-ahead block size in bytes + */ @VisibleForTesting - public synchronized List getReadAheadQueueCopy() { - return new ArrayList<>(readAheadQueue); + protected static void setReadAheadBlockSize(int readAheadBlockSize) { + if (readAheadBlockSize <= 0) { + throw new IllegalArgumentException("Read-ahead block size must be positive"); + } + blockSize = readAheadBlockSize; } - @VisibleForTesting - public synchronized List getInProgressCopiedList() { - return new ArrayList<>(inProgressList); + /** + * Gets the stack of free buffer indices. + * + * @return the stack of free buffer indices + */ + public Stack getFreeList() { + return freeList; } - @VisibleForTesting - void callTryEvict() { - tryEvict(); + /** + * Gets the queue of read-ahead requests. + * + * @return the queue of {@link ReadBuffer} objects in the read-ahead queue + */ + public Queue getReadAheadQueue() { + return readAheadQueue; } - /** - * Purging the buffers associated with an {@link AbfsInputStream} - * from {@link ReadBufferManager} when stream is closed. - * @param stream input stream. + * Gets the list of in-progress read buffers. + * + * @return the list of {@link ReadBuffer} objects that are currently being processed */ - public synchronized void purgeBuffersForStream(AbfsInputStream stream) { - LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream); - readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream); - purgeList(stream, completedReadList); + public LinkedList getInProgressList() { + return inProgressList; } /** - * Method to remove buffers associated with a {@link AbfsInputStream} - * when its close method is called. - * NOTE: This method is not threadsafe and must be called inside a - * synchronised block. See caller. - * @param stream associated input stream. - * @param list list of buffers like {@link this#completedReadList} - * or {@link this#inProgressList}. + * Gets the list of completed read buffers. + * + * @return the list of {@link ReadBuffer} objects that have been read and are available for use */ - private void purgeList(AbfsInputStream stream, LinkedList list) { - for (Iterator it = list.iterator(); it.hasNext();) { - ReadBuffer readBuffer = it.next(); - if (readBuffer.getStream() == stream) { - it.remove(); - // As failed ReadBuffers (bufferIndex = -1) are already pushed to free - // list in doneReading method, we will skip adding those here again. - if (readBuffer.getBufferindex() != -1) { - freeList.push(readBuffer.getBufferindex()); - } - } - } + public LinkedList getCompletedReadList() { + return completedReadList; } + /** - * Test method that can clean up the current state of readAhead buffers and - * the lists. Will also trigger a fresh init. + * Gets a copy of the list of free buffer indices. 
+ * + * @return a list of free buffer indices */ @VisibleForTesting - void testResetReadBufferManager() { - synchronized (this) { - ArrayList completedBuffers = new ArrayList<>(); - for (ReadBuffer buf : completedReadList) { - if (buf != null) { - completedBuffers.add(buf); - } - } - - for (ReadBuffer buf : completedBuffers) { - evict(buf); - } - - readAheadQueue.clear(); - inProgressList.clear(); - completedReadList.clear(); - freeList.clear(); - for (int i = 0; i < NUM_BUFFERS; i++) { - buffers[i] = null; - } - buffers = null; - resetBufferManager(); - } + protected synchronized List getFreeListCopy() { + return new ArrayList<>(freeList); } /** - * Reset buffer manager to null. + * Gets a copy of the read-ahead queue. + * + * @return a list of {@link ReadBuffer} objects in the read-ahead queue */ @VisibleForTesting - static void resetBufferManager() { - bufferManager = null; + protected synchronized List getReadAheadQueueCopy() { + return new ArrayList<>(readAheadQueue); } /** - * Reset readAhead buffer to needed readAhead block size and - * thresholdAgeMilliseconds. - * @param readAheadBlockSize - * @param thresholdAgeMilliseconds + * Gets a copy of the list of in-progress read buffers. + * + * @return a list of in-progress {@link ReadBuffer} objects */ @VisibleForTesting - void testResetReadBufferManager(int readAheadBlockSize, int thresholdAgeMilliseconds) { - setBlockSize(readAheadBlockSize); - setThresholdAgeMilliseconds(thresholdAgeMilliseconds); - testResetReadBufferManager(); + protected synchronized List getInProgressCopiedList() { + return new ArrayList<>(inProgressList); } + /** + * Gets a copy of the list of completed read buffers. + * + * @return a list of completed {@link ReadBuffer} objects + */ @VisibleForTesting - static void setBlockSize(int readAheadBlockSize) { - blockSize = readAheadBlockSize; + protected synchronized List getCompletedReadListCopy() { + return new ArrayList<>(completedReadList); } + /** + * Gets the size of the completed read list. + * + * @return the number of completed read buffers + */ @VisibleForTesting - int getReadAheadBlockSize() { - return blockSize; + protected int getCompletedReadListSize() { + return completedReadList.size(); } /** - * Test method that can mimic no free buffers scenario and also add a ReadBuffer - * into completedReadList. This readBuffer will get picked up by TryEvict() - * next time a new queue request comes in. - * @param buf that needs to be added to completedReadlist + * Simulates full buffer usage and adds a failed buffer for testing. + * + * @param buf the buffer to add as failed */ @VisibleForTesting - void testMimicFullUseAndAddFailedBuffer(ReadBuffer buf) { + protected void testMimicFullUseAndAddFailedBuffer(ReadBuffer buf) { freeList.clear(); completedReadList.add(buf); } - - @VisibleForTesting - int getNumBuffers() { - return NUM_BUFFERS; - } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java new file mode 100644 index 0000000000000..fe1ac3fa1f235 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java @@ -0,0 +1,612 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.azurebfs.services; + +import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.concurrent.CountDownLatch; + +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.classification.VisibleForTesting; + +/** + * The Read Buffer Manager for Rest AbfsClient. + * V1 implementation of ReadBufferManager. + */ +final class ReadBufferManagerV1 extends ReadBufferManager { + + private static final int NUM_BUFFERS = 16; + private static final int NUM_THREADS = 8; + private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; + + private Thread[] threads = new Thread[NUM_THREADS]; + private byte[][] buffers; + private static ReadBufferManagerV1 bufferManager; + + // hide instance constructor + private ReadBufferManagerV1() { + LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch"); + } + + /** + * Sets the read buffer manager configurations. + * @param readAheadBlockSize the size of the read-ahead block in bytes + */ + static void setReadBufferManagerConfigs(int readAheadBlockSize) { + if (bufferManager == null) { + LOGGER.debug( + "ReadBufferManagerV1 not initialized yet. Overriding readAheadBlockSize as {}", + readAheadBlockSize); + setReadAheadBlockSize(readAheadBlockSize); + setThresholdAgeMilliseconds(DEFAULT_THRESHOLD_AGE_MILLISECONDS); + } + } + + /** + * Returns the singleton instance of ReadBufferManagerV1. + * @return the singleton instance of ReadBufferManagerV1 + */ + static ReadBufferManagerV1 getBufferManager() { + if (bufferManager == null) { + LOCK.lock(); + try { + if (bufferManager == null) { + bufferManager = new ReadBufferManagerV1(); + bufferManager.init(); + } + } finally { + LOCK.unlock(); + } + } + return bufferManager; + } + + /** + * {@inheritDoc} + */ + @Override + void init() { + buffers = new byte[NUM_BUFFERS][]; + for (int i = 0; i < NUM_BUFFERS; i++) { + buffers[i] = new byte[getReadAheadBlockSize()]; // same buffers are reused. 
These byte arrays are never garbage collected + getFreeList().add(i); + } + for (int i = 0; i < NUM_THREADS; i++) { + Thread t = new Thread(new ReadBufferWorker(i, this)); + t.setDaemon(true); + threads[i] = t; + t.setName("ABFS-prefetch-" + i); + t.start(); + } + ReadBufferWorker.UNLEASH_WORKERS.countDown(); + } + + /** + * {@inheritDoc} + */ + @Override + public void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, final int requestedLength, + TracingContext tracingContext) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Start Queueing readAhead for {} offset {} length {}", + stream.getPath(), requestedOffset, requestedLength); + } + ReadBuffer buffer; + synchronized (this) { + if (isAlreadyQueued(stream, requestedOffset)) { + return; // already queued, do not queue again + } + if (getFreeList().isEmpty() && !tryEvict()) { + return; // no buffers available, cannot queue anything + } + + buffer = new ReadBuffer(); + buffer.setStream(stream); + buffer.setOffset(requestedOffset); + buffer.setLength(0); + buffer.setRequestedLength(requestedLength); + buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE); + buffer.setLatch(new CountDownLatch(1)); + buffer.setTracingContext(tracingContext); + + Integer bufferIndex = getFreeList().pop(); // will return a value, since we have checked size > 0 already + + buffer.setBuffer(buffers[bufferIndex]); + buffer.setBufferindex(bufferIndex); + getReadAheadQueue().add(buffer); + notifyAll(); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}", + stream.getPath(), requestedOffset, buffer.getBufferindex()); + } + } + } + + /** + * {@inheritDoc} + */ + @Override + public int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer) + throws IOException { + // not synchronized, so have to be careful with locking + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("getBlock for file {} position {} thread {}", + stream.getPath(), position, Thread.currentThread().getName()); + } + + waitForProcess(stream, position); + + int bytesRead = 0; + synchronized (this) { + bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer); + } + if (bytesRead > 0) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Done read from Cache for {} position {} length {}", + stream.getPath(), position, bytesRead); + } + return bytesRead; + } + + // otherwise, just say we got nothing - calling thread can do its own read + return 0; + } + + /** + * {@inheritDoc} + */ + @Override + public ReadBuffer getNextBlockToRead() throws InterruptedException { + ReadBuffer buffer = null; + synchronized (this) { + while (getReadAheadQueue().isEmpty()) { + wait(); + } + buffer = getReadAheadQueue().remove(); + notifyAll(); + if (buffer == null) { + return null; // should never happen + } + buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS); + getInProgressList().add(buffer); + } + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("ReadBufferWorker picked file {} for offset {}", + buffer.getStream().getPath(), buffer.getOffset()); + } + return buffer; + } + + /** + * {@inheritDoc} + */ + @Override + public void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("ReadBufferWorker completed read file {} for offset {} outcome {} bytes {}", + buffer.getStream().getPath(), buffer.getOffset(), result, bytesActuallyRead); + } + synchronized (this) { + // If this buffer has already been purged 
during
+      // close of InputStream then we don't update the lists.
+      if (getInProgressList().contains(buffer)) {
+        getInProgressList().remove(buffer);
+        if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
+          buffer.setStatus(ReadBufferStatus.AVAILABLE);
+          buffer.setLength(bytesActuallyRead);
+        } else {
+          getFreeList().push(buffer.getBufferindex());
+          // buffer will be deleted as per the eviction policy.
+        }
+        // completed list also contains FAILED read buffers
+        // for sending exception message to clients.
+        buffer.setStatus(result);
+        buffer.setTimeStamp(currentTimeMillis());
+        getCompletedReadList().add(buffer);
+      }
+    }
+
+    //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results
+    buffer.getLatch().countDown(); // wake up waiting threads (if any)
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
+    LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream);
+    getReadAheadQueue().removeIf(readBuffer -> readBuffer.getStream() == stream);
+    purgeList(stream, getCompletedReadList());
+  }
+
+  /**
+   * Waits for the process to complete for the given stream and position.
+   * If the buffer is in progress, it waits for the latch to be released.
+   * If the buffer is not in progress, it clears it from the read-ahead queue.
+   *
+   * @param stream the AbfsInputStream associated with the read request
+   * @param position the position in the stream to wait for
+   */
+  private void waitForProcess(final AbfsInputStream stream, final long position) {
+    ReadBuffer readBuf;
+    synchronized (this) {
+      clearFromReadAheadQueue(stream, position);
+      readBuf = getFromList(getInProgressList(), stream, position);
+    }
+    if (readBuf != null) {         // if in in-progress queue, then block for it
+      try {
+        if (LOGGER.isTraceEnabled()) {
+          LOGGER.trace("got a relevant read buffer for file {} offset {} buffer idx {}",
+              stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex());
+        }
+        readBuf.getLatch().await();  // blocking wait on the caller stream's thread
+        // Note on correctness: readBuf gets out of inProgressList only in 1 place: after worker thread
+        // is done processing it (in doneReading). There, the latch is set after removing the buffer from
+        // inProgressList. So this latch is safe to be outside the synchronized block.
+        // Putting it in synchronized would result in a deadlock, since this thread would be holding the lock
+        // while waiting, so no one will be able to change any state. If this becomes more complex in the future,
+        // then the latch can be removed and replaced with wait/notify whenever inProgressList is touched.
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+      }
+      if (LOGGER.isTraceEnabled()) {
+        LOGGER.trace("latch done for file {} buffer idx {} length {}",
+            stream.getPath(), readBuf.getBufferindex(), readBuf.getLength());
+      }
+    }
+  }
+
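The long comment inside `waitForProcess()` above is the crux of the V1 threading model: the latch wait must happen outside the `synchronized` block, because the worker thread needs that same lock to publish its result. Here is a minimal, self-contained sketch of that pattern; all names in it are illustrative and not part of the patch:

```java
import java.util.concurrent.CountDownLatch;

// Illustrative only: mirrors the lock/latch split used by waitForProcess()
// and doneReading() above. Awaiting the latch while holding the lock would
// deadlock, because the worker needs the lock to publish its result.
final class LatchOutsideLockDemo {
  private final Object lock = new Object();
  private final CountDownLatch done = new CountDownLatch(1);
  private volatile int resultLength = -1;

  void worker() {
    synchronized (lock) {      // worker takes the lock to update shared state
      resultLength = 42;
    }
    done.countDown();          // then wakes up any waiting reader
  }

  int reader() throws InterruptedException {
    synchronized (lock) {
      // inspect queues, decide that the read is in progress, etc.
    }
    done.await();              // block with no lock held, so the worker can run
    return resultLength;
  }

  public static void main(String[] args) throws InterruptedException {
    LatchOutsideLockDemo demo = new LatchOutsideLockDemo();
    Thread worker = new Thread(demo::worker);
    worker.start();
    System.out.println(demo.reader());  // prints 42
    worker.join();
  }
}
```

In the patch, `doneReading()` plays the worker role: it removes the buffer from `inProgressList` while holding the lock and only counts the latch down afterwards.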
+  /**
+   * If any buffer in the completedReadList can be reclaimed then reclaim it and return the buffer to free list.
+   * The objective is to find just one buffer - there is no advantage to evicting more than one.
+   *
+   * @return whether the eviction succeeded - i.e., were we able to free up one buffer
+   */
+  private synchronized boolean tryEvict() {
+    ReadBuffer nodeToEvict = null;
+    if (getCompletedReadList().size() <= 0) {
+      return false;  // there are no evict-able buffers
+    }
+
+    long currentTimeInMs = currentTimeMillis();
+
+    // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed)
+    for (ReadBuffer buf : getCompletedReadList()) {
+      if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) {
+        nodeToEvict = buf;
+        break;
+      }
+    }
+    if (nodeToEvict != null) {
+      return evict(nodeToEvict);
+    }
+
+    // next, try buffers where any bytes have been consumed (maybe a bad idea? have to experiment and see)
+    for (ReadBuffer buf : getCompletedReadList()) {
+      if (buf.isAnyByteConsumed()) {
+        nodeToEvict = buf;
+        break;
+      }
+    }
+
+    if (nodeToEvict != null) {
+      return evict(nodeToEvict);
+    }
+
+    // next, try any old nodes that have not been consumed
+    // Failed read buffers (with buffer index=-1) that are older than
+    // thresholdAge should be cleaned up, but at the same time should not
+    // report successful eviction.
+    // Queue logic expects that a buffer is freed up for read ahead when
+    // eviction is successful, whereas a failed ReadBuffer would have released
+    // its buffer when its status was set to READ_FAILED.
+    long earliestBirthday = Long.MAX_VALUE;
+    ArrayList<ReadBuffer> oldFailedBuffers = new ArrayList<>();
+    for (ReadBuffer buf : getCompletedReadList()) {
+      if ((buf.getBufferindex() != -1)
+          && (buf.getTimeStamp() < earliestBirthday)) {
+        nodeToEvict = buf;
+        earliestBirthday = buf.getTimeStamp();
+      } else if ((buf.getBufferindex() == -1)
+          && (currentTimeInMs - buf.getTimeStamp()) > getThresholdAgeMilliseconds()) {
+        oldFailedBuffers.add(buf);
+      }
+    }
+
+    for (ReadBuffer buf : oldFailedBuffers) {
+      evict(buf);
+    }
+
+    if ((currentTimeInMs - earliestBirthday > getThresholdAgeMilliseconds()) && (nodeToEvict != null)) {
+      return evict(nodeToEvict);
+    }
+
+    LOGGER.trace("No buffer eligible for eviction");
+    // nothing can be evicted
+    return false;
+  }
+
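The three passes of `tryEvict()` encode a strict preference order that is easy to lose in the bookkeeping: fully consumed buffers go first, then partially consumed ones, and finally the oldest unconsumed buffer, but only once it is older than the age threshold. A simplified sketch of just that ordering, with illustrative stand-in types (`Buf` and `pickVictim` are not part of the patch, and the failed-buffer cleanup is omitted):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Illustrative only: Buf stands in for ReadBuffer; pickVictim mirrors the
// selection order of tryEvict() without the failed-buffer housekeeping.
final class EvictionOrderDemo {
  static final class Buf {
    final boolean fullyConsumed;     // first and last byte consumed
    final boolean partiallyConsumed; // any byte consumed
    final long timeStampMs;          // completion time
    Buf(boolean full, boolean partial, long ts) {
      this.fullyConsumed = full;
      this.partiallyConsumed = partial;
      this.timeStampMs = ts;
    }
  }

  static Optional<Buf> pickVictim(List<Buf> completed, long nowMs, long thresholdMs) {
    // Pass 1: a buffer that was read end to end is free to go.
    Optional<Buf> full = completed.stream().filter(b -> b.fullyConsumed).findFirst();
    if (full.isPresent()) {
      return full;
    }
    // Pass 2: a buffer that was at least partially read.
    Optional<Buf> partial = completed.stream().filter(b -> b.partiallyConsumed).findFirst();
    if (partial.isPresent()) {
      return partial;
    }
    // Pass 3: the oldest untouched buffer, but only past the age threshold.
    return completed.stream()
        .min(Comparator.comparingLong(b -> b.timeStampMs))
        .filter(oldest -> nowMs - oldest.timeStampMs > thresholdMs);
  }

  public static void main(String[] args) {
    List<Buf> completed = new ArrayList<>();
    completed.add(new Buf(false, true, 1_000));   // partially consumed
    completed.add(new Buf(false, false, 500));    // untouched, older
    // No fully consumed buffer, so the partially consumed one is evicted first.
    System.out.println(pickVictim(completed, 5_000, 3_000).get().partiallyConsumed); // true
  }
}
```

The real method additionally evicts failed buffers (`bufferindex == -1`) that have aged past the threshold, without counting them as successful evictions, since they released their backing array when they were marked `READ_FAILED`.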
+  /**
+   * Evicts the given buffer by removing it from the completedReadList and adding its index to the freeList.
+   *
+   * @param buf the ReadBuffer to evict
+   * @return true if eviction was successful, false otherwise
+   */
+  private boolean evict(final ReadBuffer buf) {
+    // As failed ReadBuffers (bufferIndex = -1) are saved in completedReadList,
+    // avoid adding them to freeList.
+    if (buf.getBufferindex() != -1) {
+      getFreeList().push(buf.getBufferindex());
+    }
+
+    getCompletedReadList().remove(buf);
+    buf.setTracingContext(null);
+    if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}",
+          buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength());
+    }
+    return true;
+  }
+
+  /**
+   * Checks if the requested offset is already queued in any of the lists.
+   * @param stream the AbfsInputStream associated with the read request
+   * @param requestedOffset the offset in the stream to check
+   * @return true if the requested offset is already queued in any of the lists, false otherwise
+   */
+  private boolean isAlreadyQueued(final AbfsInputStream stream, final long requestedOffset) {
+    // returns true if any part of the buffer is already queued
+    return (isInList(getReadAheadQueue(), stream, requestedOffset)
+        || isInList(getInProgressList(), stream, requestedOffset)
+        || isInList(getCompletedReadList(), stream, requestedOffset));
+  }
+
+  /**
+   * Checks if the requested offset is in the given list.
+   * @param list the collection of ReadBuffer to check against
+   * @param stream the AbfsInputStream associated with the read request
+   * @param requestedOffset the offset in the stream to check
+   * @return true if the requested offset is in the list, false otherwise
+   */
+  private boolean isInList(final Collection<ReadBuffer> list, final AbfsInputStream stream, final long requestedOffset) {
+    return (getFromList(list, stream, requestedOffset) != null);
+  }
+
+  /**
+   * Returns the ReadBuffer from the given list that matches the stream and requested offset.
+   * If the buffer is found, it checks if the requested offset is within the buffer's range.
+   * @param list the collection of ReadBuffer to search in
+   * @param stream the AbfsInputStream associated with the read request
+   * @param requestedOffset the offset in the stream to check
+   * @return the ReadBuffer if found, null otherwise
+   */
+  private ReadBuffer getFromList(final Collection<ReadBuffer> list, final AbfsInputStream stream, final long requestedOffset) {
+    for (ReadBuffer buffer : list) {
+      if (buffer.getStream() == stream) {
+        if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
+            && requestedOffset >= buffer.getOffset()
+            && requestedOffset < buffer.getOffset() + buffer.getLength()) {
+          return buffer;
+        } else if (requestedOffset >= buffer.getOffset()
+            && requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) {
+          return buffer;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Returns a ReadBuffer from the completedReadList that matches the stream and requested offset.
+   * The buffer is returned if the requestedOffset is at or above buffer's offset but less than buffer's length
+   * or the actual requestedLength.
+ * + * @param stream the AbfsInputStream associated with the read request + * @param requestedOffset the offset in the stream to check + * @return the ReadBuffer if found, null otherwise + */ + private ReadBuffer getBufferFromCompletedQueue(final AbfsInputStream stream, final long requestedOffset) { + for (ReadBuffer buffer : getCompletedReadList()) { + // Buffer is returned if the requestedOffset is at or above the buffer's + // offset but below its end (offset plus actual length or requestedLength) + if ((buffer.getStream() == stream) + && (requestedOffset >= buffer.getOffset()) + && ((requestedOffset < buffer.getOffset() + buffer.getLength()) + || (requestedOffset < buffer.getOffset() + buffer.getRequestedLength()))) { + return buffer; + } + } + + return null; + } + + /** + * Clears the buffer from the read-ahead queue for the given stream and requested offset. + * This is called while a read is being served directly, at which point a read-ahead + * that is still only queued for that offset is no longer needed. + * It removes the buffer from the read-ahead queue and adds its index back to the free list. + * + * @param stream the AbfsInputStream associated with the read request + * @param requestedOffset the offset in the stream to check + */ + private void clearFromReadAheadQueue(final AbfsInputStream stream, final long requestedOffset) { + ReadBuffer buffer = getFromList(getReadAheadQueue(), stream, requestedOffset); + if (buffer != null) { + getReadAheadQueue().remove(buffer); + notifyAll(); // lock is held in calling method + getFreeList().push(buffer.getBufferindex()); + } + } + + /** + * Gets a block of data from the completed read buffers. + * If the buffer is found, it copies the data to the provided buffer and updates the status of the ReadBuffer. + * If the buffer is not found or not available, it returns 0. + * + * @param stream the AbfsInputStream associated with the read request + * @param position the position in the file to read from + * @param length the number of bytes to read + * @param buffer the buffer to store the read data + * @return the number of bytes actually read + * @throws IOException if an I/O error occurs while reading from the buffer + */ + private int getBlockFromCompletedQueue(final AbfsInputStream stream, final long position, final int length, + final byte[] buffer) throws IOException { + ReadBuffer buf = getBufferFromCompletedQueue(stream, position); + + if (buf == null) { + return 0; + } + + if (buf.getStatus() == ReadBufferStatus.READ_FAILED) { + // To prevent new read requests from failing due to old read-ahead attempts, + // return exception only from buffers that failed within the last thresholdAgeMilliseconds + if (currentTimeMillis() - buf.getTimeStamp() < getThresholdAgeMilliseconds()) { + throw buf.getErrException(); + } else { + return 0; + } + } + + if ((buf.getStatus() != ReadBufferStatus.AVAILABLE) + || (position >= buf.getOffset() + buf.getLength())) { + return 0; + } + + int cursor = (int) (position - buf.getOffset()); + int availableLengthInBuffer = buf.getLength() - cursor; + int lengthToCopy = Math.min(length, availableLengthInBuffer); + System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy); + if (cursor == 0) { + buf.setFirstByteConsumed(true); + } + if (cursor + lengthToCopy == buf.getLength()) { + buf.setLastByteConsumed(true); + } + buf.setAnyByteConsumed(true); + return lengthToCopy; + } + + /** + * Similar to System.currentTimeMillis, except implemented with System.nanoTime().
+ * System.currentTimeMillis can go backwards when the system clock is changed (e.g., with NTP time synchronization), + * making it unsuitable for measuring time intervals. System.nanoTime() is strictly monotonically increasing per CPU core. + * Note: it is not monotonic across sockets, and even within a CPU, it is only the + * more recent parts that share a clock across all cores. + * + * @return current time in milliseconds + */ + private long currentTimeMillis() { + return System.nanoTime() / 1000 / 1000; + } + + /** + * Method to remove buffers associated with an {@link AbfsInputStream} + * when its close method is called. + * NOTE: This method is not threadsafe and must be called inside a + * synchronized block. See caller. + * @param stream associated input stream. + * @param list list of buffers like completedReadList or inProgressList + */ + private void purgeList(AbfsInputStream stream, LinkedList<ReadBuffer> list) { + for (Iterator<ReadBuffer> it = list.iterator(); it.hasNext();) { + ReadBuffer readBuffer = it.next(); + if (readBuffer.getStream() == stream) { + it.remove(); + // As failed ReadBuffers (bufferIndex = -1) are already pushed to free + // list in doneReading method, we will skip adding those here again. + if (readBuffer.getBufferindex() != -1) { + getFreeList().push(readBuffer.getBufferindex()); + } + } + } + } + + /** + * {@inheritDoc} + */ + @VisibleForTesting + @Override + public int getNumBuffers() { + return NUM_BUFFERS; + } + + /** + * {@inheritDoc} + */ + @VisibleForTesting + @Override + public void callTryEvict() { + tryEvict(); + } + + /** + * {@inheritDoc} + */ + @VisibleForTesting + @Override + public void testResetReadBufferManager() { + synchronized (this) { + ArrayList<ReadBuffer> completedBuffers = new ArrayList<>(); + for (ReadBuffer buf : getCompletedReadList()) { + if (buf != null) { + completedBuffers.add(buf); + } + } + + for (ReadBuffer buf : completedBuffers) { + evict(buf); + } + + getReadAheadQueue().clear(); + getInProgressList().clear(); + getCompletedReadList().clear(); + getFreeList().clear(); + for (int i = 0; i < NUM_BUFFERS; i++) { + buffers[i] = null; + } + buffers = null; + resetBufferManager(); + } + } + + /** + * {@inheritDoc} + */ + @VisibleForTesting + @Override + public void testResetReadBufferManager(int readAheadBlockSize, int thresholdAgeMilliseconds) { + setReadAheadBlockSize(readAheadBlockSize); + setThresholdAgeMilliseconds(thresholdAgeMilliseconds); + testResetReadBufferManager(); + } + + @Override + void resetBufferManager() { + setBufferManager(null); // reset the singleton instance + } + + private static void setBufferManager(ReadBufferManagerV1 manager) { + bufferManager = manager; + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java new file mode 100644 index 0000000000000..9cce860127dae --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java @@ -0,0 +1,228 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.IOException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; +import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; + +final class ReadBufferManagerV2 extends ReadBufferManager { + + // Thread Pool Configurations + private static int minThreadPoolSize; + private static int maxThreadPoolSize; + private static int executorServiceKeepAliveTimeInMilliSec; + private ThreadPoolExecutor workerPool; + + // Buffer Pool Configurations + private static int minBufferPoolSize; + private static int maxBufferPoolSize; + private int numberOfActiveBuffers = 0; + private byte[][] bufferPool; + + private static ReadBufferManagerV2 bufferManager; + + // hide instance constructor + private ReadBufferManagerV2() { + LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch"); + } + + /** + * Sets the read buffer manager configurations. + * @param readAheadBlockSize the size of the read-ahead block in bytes + * @param abfsConfiguration the AbfsConfiguration instance for other configurations + */ + static void setReadBufferManagerConfigs(int readAheadBlockSize, AbfsConfiguration abfsConfiguration) { + if (bufferManager == null) { + minThreadPoolSize = abfsConfiguration.getMinReadAheadV2ThreadPoolSize(); + maxThreadPoolSize = abfsConfiguration.getMaxReadAheadV2ThreadPoolSize(); + executorServiceKeepAliveTimeInMilliSec = abfsConfiguration.getReadAheadExecutorServiceTTLInMillis(); + + minBufferPoolSize = abfsConfiguration.getMinReadAheadV2BufferPoolSize(); + maxBufferPoolSize = abfsConfiguration.getMaxReadAheadV2BufferPoolSize(); + setThresholdAgeMilliseconds(abfsConfiguration.getReadAheadV2CachedBufferTTLMillis()); + setReadAheadBlockSize(readAheadBlockSize); + } + } + + /** + * Returns the singleton instance of ReadBufferManagerV2. + * @return the singleton instance of ReadBufferManagerV2 + */ + static ReadBufferManagerV2 getBufferManager() { + if (bufferManager == null) { + LOCK.lock(); + try { + if (bufferManager == null) { + bufferManager = new ReadBufferManagerV2(); + bufferManager.init(); + } + } finally { + LOCK.unlock(); + } + } + return bufferManager; + } + + /** + * {@inheritDoc} + */ + @Override + void init() { + // Initialize Buffer Pool + bufferPool = new byte[maxBufferPoolSize][]; + for (int i = 0; i < minBufferPoolSize; i++) { + bufferPool[i] = new byte[getReadAheadBlockSize()]; // same buffers are reused. 
These byte arrays are never garbage collected + getFreeList().add(i); + numberOfActiveBuffers++; + } + + // Initialize a worker thread pool with minThreadPoolSize core threads; it can + // grow on demand up to maxThreadPoolSize threads. It is not a fixed-size pool, + // since idle threads (including core ones) are allowed to time out. + workerPool = new ThreadPoolExecutor( + minThreadPoolSize, + maxThreadPoolSize, + executorServiceKeepAliveTimeInMilliSec, + TimeUnit.MILLISECONDS, + new SynchronousQueue<>(), + namedThreadFactory); + workerPool.allowCoreThreadTimeOut(true); + for (int i = 0; i < minThreadPoolSize; i++) { + ReadBufferWorker worker = new ReadBufferWorker(i, this); + workerPool.submit(worker); + } + ReadBufferWorker.UNLEASH_WORKERS.countDown(); + } + + /** + * {@inheritDoc} + */ + @Override + public void queueReadAhead(final AbfsInputStream stream, + final long requestedOffset, + final int requestedLength, + final TracingContext tracingContext) { + // TODO: To be implemented + } + + /** + * {@inheritDoc} + */ + @Override + public int getBlock(final AbfsInputStream stream, + final long position, + final int length, + final byte[] buffer) throws IOException { + // TODO: To be implemented + return 0; + } + + /** + * {@inheritDoc} + */ + @Override + public ReadBuffer getNextBlockToRead() throws InterruptedException { + // TODO: To be implemented + return null; + } + + /** + * {@inheritDoc} + */ + @Override + public void doneReading(final ReadBuffer buffer, + final ReadBufferStatus result, + final int bytesActuallyRead) { + // TODO: To be implemented + } + + /** + * {@inheritDoc} + */ + @Override + public void purgeBuffersForStream(final AbfsInputStream stream) { + // TODO: To be implemented + } + + /** + * {@inheritDoc} + */ + @VisibleForTesting + @Override + public int getNumBuffers() { + return numberOfActiveBuffers; + } + + /** + * {@inheritDoc} + */ + @VisibleForTesting + @Override + public void callTryEvict() { + // TODO: To be implemented + } + + /** + * {@inheritDoc} + */ + @VisibleForTesting + @Override + public void testResetReadBufferManager() { + // TODO: To be implemented + } + + /** + * {@inheritDoc} + */ + @VisibleForTesting + @Override + public void testResetReadBufferManager(final int readAheadBlockSize, + final int thresholdAgeMilliseconds) { + // TODO: To be implemented + } + + /** + * {@inheritDoc} + */ + @Override + public void testMimicFullUseAndAddFailedBuffer(final ReadBuffer buf) { + // TODO: To be implemented + } + + private final ThreadFactory namedThreadFactory = new ThreadFactory() { + private int count = 0; + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "ReadAheadV2-Thread-" + count++); + } + }; + + @Override + void resetBufferManager() { + setBufferManager(null); // reset the singleton instance + } + + private static void setBufferManager(ReadBufferManagerV2 manager) { + bufferManager = manager; + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java index a30f06261ef6f..79d5eef955a4a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java @@ -28,9 +28,11 @@ class ReadBufferWorker implements Runnable { protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1); private int id; + private ReadBufferManager bufferManager; - ReadBufferWorker(final int id) { + ReadBufferWorker(final int id, final ReadBufferManager bufferManager) { this.id = id;
+ this.bufferManager = bufferManager; } /** @@ -51,7 +53,6 @@ public void run() { } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } - ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); ReadBuffer buffer; while (true) { try { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java index dd32643063a3a..78cd6bd9d6ac8 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java @@ -26,7 +26,6 @@ import org.assertj.core.api.Assertions; import org.mockito.Mockito; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; @@ -43,8 +42,11 @@ import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR; import static java.net.HttpURLConnection.HTTP_NOT_FOUND; import static java.net.HttpURLConnection.HTTP_UNAVAILABLE; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_IO_RETRIES; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.accountProperty; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_BLOB_DOMAIN_NAME; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_DFS_DOMAIN_NAME; import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ACCOUNT_KEY; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; @@ -139,7 +141,7 @@ private AzureBlobFileSystem getNewFSWithHnsConf( this.getAccountName()), isNamespaceEnabledAccount); rawConfig .setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, true); - rawConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, + rawConfig.set(FS_DEFAULT_NAME_KEY, getNonExistingUrl()); return (AzureBlobFileSystem) FileSystem.get(rawConfig); } @@ -315,29 +317,29 @@ public void testAccountSpecificConfig() throws Exception { rawConfig.set(accountProperty(FS_AZURE_ACCOUNT_KEY, testAccountName), dummyAcountKey); rawConfig.set(accountProperty(FS_AZURE_ACCOUNT_KEY, otherAccountName), dummyAcountKey); // Assert that account specific config takes precedence - rawConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultUri); + rawConfig.set(FS_DEFAULT_NAME_KEY, defaultUri); assertFileSystemInitWithExpectedHNSSettings(rawConfig, false); // Assert that other account still uses account agnostic config - rawConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, otherUri); + rawConfig.set(FS_DEFAULT_NAME_KEY, otherUri); assertFileSystemInitWithExpectedHNSSettings(rawConfig, true); // Set only the account specific config for test account rawConfig.set(accountProperty(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, testAccountName), FALSE_STR); rawConfig.unset(FS_AZURE_ACCOUNT_IS_HNS_ENABLED); // Assert that only account specific config is enough for test account - rawConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultUri); + rawConfig.set(FS_DEFAULT_NAME_KEY, defaultUri); assertFileSystemInitWithExpectedHNSSettings(rawConfig, false); // Set 
only account agnostic config rawConfig.set(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, FALSE_STR); rawConfig.unset(accountProperty(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, testAccountName)); - rawConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultUri); + rawConfig.set(FS_DEFAULT_NAME_KEY, defaultUri); assertFileSystemInitWithExpectedHNSSettings(rawConfig, false); // Unset both account specific and account agnostic config rawConfig.unset(FS_AZURE_ACCOUNT_IS_HNS_ENABLED); rawConfig.unset(accountProperty(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, testAccountName)); - rawConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultUri); + rawConfig.set(FS_DEFAULT_NAME_KEY, defaultUri); rawConfig.set(AZURE_MAX_IO_RETRIES, "0"); // Assert that file system init fails with UnknownHost exception as getAcl() is needed. try { @@ -471,10 +473,10 @@ private Configuration getConfigurationWithoutHnsConfig() { rawConfig.unset(FS_AZURE_ACCOUNT_IS_HNS_ENABLED); rawConfig.unset(accountProperty(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, this.getAccountName())); - String testAccountName = "testAccount.dfs.core.windows.net"; - String defaultUri = this.getTestUrl().replace(this.getAccountName(), testAccountName); + String defaultUri = getRawConfiguration().get(FS_DEFAULT_NAME_KEY). + replace(ABFS_BLOB_DOMAIN_NAME, ABFS_DFS_DOMAIN_NAME); // Assert that account specific config takes precedence - rawConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultUri); + rawConfig.set(FS_DEFAULT_NAME_KEY, defaultUri); return rawConfig; } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java index a57430fa808cc..b70f36de31867 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java @@ -67,7 +67,7 @@ public ITestReadBufferManager() throws Exception { @Test public void testPurgeBufferManagerForParallelStreams() throws Exception { - describe("Testing purging of buffers from ReadBufferManager for " + describe("Testing purging of buffers from ReadBufferManagerV1 for " + "parallel input streams"); final int numBuffers = 16; final LinkedList<Integer> freeList = new LinkedList<>(); @@ -99,7 +99,7 @@ public void testPurgeBufferManagerForParallelStreams() throws Exception { executorService.awaitTermination(1, TimeUnit.MINUTES); } - ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); + ReadBufferManagerV1 bufferManager = ReadBufferManagerV1.getBufferManager(); // readahead queue is empty assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy()); // verify the in progress list eventually empties out.
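Because read-aheads complete asynchronously, "verify the in progress list eventually empties out" cannot be a one-shot assertion; it needs a bounded poll. A minimal sketch of such a wait helper (the class name, accessor, and timeout below are illustrative, not part of this patch):

import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;

final class AwaitDrain {
  /** Polls until the supplied size reaches zero, or fails after timeoutMs. */
  static void await(IntSupplier size, long timeoutMs) throws InterruptedException {
    long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    while (size.getAsInt() > 0) {
      if (System.nanoTime() - deadlineNanos > 0) {
        throw new AssertionError("list did not drain within " + timeoutMs + " ms");
      }
      Thread.sleep(100); // back off between checks
    }
  }
}

A test could then call something like AwaitDrain.await(() -> bufferManager.getInProgressCopiedList().size(), 30_000), assuming a test-visible copy accessor analogous to the getReadAheadQueueCopy() used above.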
@@ -115,7 +115,7 @@ private void assertListEmpty(String listName, List<ReadBuffer> list) { @Test public void testPurgeBufferManagerForSequentialStream() throws Exception { - describe("Testing purging of buffers in ReadBufferManager for " + describe("Testing purging of buffers in ReadBufferManagerV1 for " + "sequential input streams"); AzureBlobFileSystem fs = getABFSWithReadAheadConfig(); final String fileName = methodName.getMethodName(); @@ -131,7 +131,7 @@ public void testPurgeBufferManagerForSequentialStream() throws Exception { } finally { IOUtils.closeStream(iStream1); } - ReadBufferManager bufferManager = ReadBufferManager.getBufferManager(); + ReadBufferManagerV1 bufferManager = ReadBufferManagerV1.getBufferManager(); AbfsInputStream iStream2 = null; try { iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream(); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index f0d2ad6eeccf5..e64178f0a52ac 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -107,7 +107,7 @@ public class TestAbfsInputStream extends @Override public void teardown() throws Exception { super.teardown(); - ReadBufferManager.getBufferManager().testResetReadBufferManager(); + getBufferManager().testResetReadBufferManager(); } private AbfsRestOperation getMockRestOp() { @@ -182,12 +182,12 @@ public AbfsInputStream getAbfsInputStream(AbfsClient abfsClient, private void queueReadAheads(AbfsInputStream inputStream) { // Mimic AbfsInputStream readAhead queue requests - ReadBufferManager.getBufferManager() + getBufferManager() .queueReadAhead(inputStream, 0, ONE_KB, inputStream.getTracingContext()); - ReadBufferManager.getBufferManager() + getBufferManager() .queueReadAhead(inputStream, ONE_KB, ONE_KB, inputStream.getTracingContext()); - ReadBufferManager.getBufferManager() + getBufferManager() .queueReadAhead(inputStream, TWO_KB, TWO_KB, inputStream.getTracingContext()); } @@ -205,15 +205,15 @@ private void verifyReadCallCount(AbfsClient client, int count) private void checkEvictedStatus(AbfsInputStream inputStream, int position, boolean expectedToThrowException) throws Exception { // Sleep for the eviction threshold time - Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds() + 1000); + Thread.sleep(getBufferManager().getThresholdAgeMilliseconds() + 1000); // Eviction is done only when AbfsInputStream tries to queue new items. // 1 tryEvict will remove 1 eligible item. To ensure that the current test buffer // will get evicted (considering there could be other tests running in parallel), // call tryEvict for the number of items that are there in completedReadList.
- int numOfCompletedReadListItems = ReadBufferManager.getBufferManager().getCompletedReadListSize(); + int numOfCompletedReadListItems = getBufferManager().getCompletedReadListSize(); while (numOfCompletedReadListItems > 0) { - ReadBufferManager.getBufferManager().callTryEvict(); + getBufferManager().callTryEvict(); numOfCompletedReadListItems--; } @@ -228,7 +228,7 @@ private void checkEvictedStatus(AbfsInputStream inputStream, int position, boole public TestAbfsInputStream() throws Exception { super(); // Reduce thresholdAgeMilliseconds to 3 sec for the tests - ReadBufferManager.getBufferManager().setThresholdAgeMilliseconds(REDUCED_READ_BUFFER_AGE_THRESHOLD); + getBufferManager().setThresholdAgeMilliseconds(REDUCED_READ_BUFFER_AGE_THRESHOLD); } private void writeBufferToNewFile(Path testFile, byte[] buffer) throws IOException { @@ -382,7 +382,7 @@ public void testFailedReadAhead() throws Exception { public void testFailedReadAheadEviction() throws Exception { AbfsClient client = getMockAbfsClient(); AbfsRestOperation successOp = getMockRestOp(); - ReadBufferManager.setThresholdAgeMilliseconds(INCREASED_READ_BUFFER_AGE_THRESHOLD); + getBufferManager().setThresholdAgeMilliseconds(INCREASED_READ_BUFFER_AGE_THRESHOLD); // Stub : // Read request leads to 3 readahead calls: Fail all 3 readahead-client.read() // Actual read request fails with the failure in readahead thread @@ -397,7 +397,8 @@ public void testFailedReadAheadEviction() throws Exception { // Add a failed buffer to completed queue and set to no free buffers to read ahead. ReadBuffer buff = new ReadBuffer(); buff.setStatus(ReadBufferStatus.READ_FAILED); - ReadBufferManager.getBufferManager().testMimicFullUseAndAddFailedBuffer(buff); + buff.setStream(inputStream); + getBufferManager().testMimicFullUseAndAddFailedBuffer(buff); // if read failed buffer eviction is tagged as a valid eviction, it will lead to // wrong assumption of queue logic that a buffer is freed up and can lead to : @@ -405,7 +406,7 @@ public void testFailedReadAheadEviction() throws Exception { // at java.util.Stack.peek(Stack.java:102) // at java.util.Stack.pop(Stack.java:84) // at org.apache.hadoop.fs.azurebfs.services.ReadBufferManager.queueReadAhead - ReadBufferManager.getBufferManager().queueReadAhead(inputStream, 0, ONE_KB, + getBufferManager().queueReadAhead(inputStream, 0, ONE_KB, getTestTracingContext(getFileSystem(), true)); } @@ -447,7 +448,7 @@ public void testOlderReadAheadFailure() throws Exception { verifyReadCallCount(client, 3); // Sleep for thresholdAgeMs so that the read ahead buffer qualifies for being old. - Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds()); + Thread.sleep(getBufferManager().getThresholdAgeMilliseconds()); // Second read request should retry the read (and not issue any new readaheads) inputStream.read(ONE_KB, new byte[ONE_KB], 0, ONE_KB); @@ -492,7 +493,7 @@ public void testSuccessfulReadAhead() throws Exception { any(String.class), any(), any(TracingContext.class)); AbfsInputStream inputStream = getAbfsInputStream(client, "testSuccessfulReadAhead.txt"); - int beforeReadCompletedListSize = ReadBufferManager.getBufferManager().getCompletedReadListSize(); + int beforeReadCompletedListSize = getBufferManager().getCompletedReadListSize(); // First read request that triggers readAheads. 
inputStream.read(new byte[ONE_KB]); @@ -500,7 +501,7 @@ public void testSuccessfulReadAhead() throws Exception { // Only the 3 readAhead threads should have triggered client.read verifyReadCallCount(client, 3); int newAdditionsToCompletedRead = - ReadBufferManager.getBufferManager().getCompletedReadListSize() + getBufferManager().getCompletedReadListSize() - beforeReadCompletedListSize; // read buffer might be dumped if the ReadBufferManager getblock preceded // the action of buffer being picked for reading from readaheadqueue, so that @@ -548,7 +549,7 @@ public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception { any(TracingContext.class)); final ReadBufferManager readBufferManager - = ReadBufferManager.getBufferManager(); + = getBufferManager(); final int readBufferTotal = readBufferManager.getNumBuffers(); final int expectedFreeListBufferCount = readBufferTotal @@ -625,7 +626,7 @@ public void testReadAheadManagerForFailedReadAhead() throws Exception { // if readAhead failed for specific offset, getBlock should // throw exception from the ReadBuffer that failed within last thresholdAgeMilliseconds sec intercept(IOException.class, - () -> ReadBufferManager.getBufferManager().getBlock( + () -> getBufferManager().getBlock( inputStream, 0, ONE_KB, @@ -673,14 +674,14 @@ public void testReadAheadManagerForOlderReadAheadFailure() throws Exception { // AbfsInputStream Read would have waited for the read-ahead for the requested offset // as we are testing from ReadAheadManager directly, sleep for thresholdAgeMilliseconds so that // read buffer qualifies for to be an old buffer - Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds()); + Thread.sleep(getBufferManager().getThresholdAgeMilliseconds()); // Only the 3 readAhead threads should have triggered client.read verifyReadCallCount(client, 3); // getBlock from a new read request should return 0 if there is a failure // 30 sec before in read ahead buffer for respective offset. - int bytesRead = ReadBufferManager.getBufferManager().getBlock( + int bytesRead = getBufferManager().getBlock( inputStream, ONE_KB, ONE_KB, @@ -733,7 +734,7 @@ public void testReadAheadManagerForSuccessfulReadAhead() throws Exception { verifyReadCallCount(client, 3); // getBlock for a new read should return the buffer read-ahead - int bytesRead = ReadBufferManager.getBufferManager().getBlock( + int bytesRead = getBufferManager().getBlock( inputStream, ONE_KB, ONE_KB, @@ -1047,7 +1048,7 @@ public AbfsInputStream testReadAheadConfigs(int readRequestSize, .describedAs("Unexpected AlwaysReadBufferSize settings") .isEqualTo(alwaysReadBufferSizeEnabled); - Assertions.assertThat(ReadBufferManager.getBufferManager().getReadAheadBlockSize()) + Assertions.assertThat(getBufferManager().getReadAheadBlockSize()) .describedAs("Unexpected readAhead block size") .isEqualTo(readAheadBlockSize); @@ -1115,10 +1116,14 @@ private AzureBlobFileSystem createTestFile(Path testFilePath, long testFileSize, } private void resetReadBufferManager(int bufferSize, int threshold) { - ReadBufferManager.getBufferManager() + getBufferManager() .testResetReadBufferManager(bufferSize, threshold); // Trigger GC as aggressive recreation of ReadBufferManager buffers // by successive tests can lead to OOM based on the dev VM/machine capacity. System.gc(); } -} \ No newline at end of file + + private ReadBufferManager getBufferManager() { + return ReadBufferManagerV1.getBufferManager(); + } +}
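Two details of the code above are easy to misread, so a couple of hedged, self-contained sketches may help.

First, the worker pool built in ReadBufferManagerV2.init() is elastic rather than fixed size: with a SynchronousQueue handoff and allowCoreThreadTimeOut(true), each submission spawns a new thread up to the maximum, work beyond that is rejected, and idle threads (core ones included) die after the keep-alive. A standalone demo of that shape (pool sizes and timings here are illustrative only, not the patch's defaults):

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public final class ElasticPoolDemo {
  public static void main(String[] args) throws InterruptedException {
    // Same shape as the V2 worker pool: core=2, max=4, SynchronousQueue handoff,
    // idle core threads allowed to time out.
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        2, 4, 500, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
    pool.allowCoreThreadTimeOut(true);

    Runnable busy = () -> {
      try {
        Thread.sleep(1000); // keep the thread occupied
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };

    for (int i = 0; i < 4; i++) {
      pool.submit(busy); // each submit spawns a thread, up to max=4
    }
    System.out.println("threads after 4 submits: " + pool.getPoolSize()); // 4

    try {
      pool.submit(busy); // no queue capacity and pool at max
    } catch (RejectedExecutionException e) {
      System.out.println("5th task rejected: pool saturated");
    }

    Thread.sleep(2000); // tasks finish; even core threads time out when idle
    System.out.println("threads after idle timeout: " + pool.getPoolSize()); // 0
    pool.shutdown();
  }
}

Note that the minThreadPoolSize ReadBufferWorker tasks submitted in init() loop forever (the while (true) in ReadBufferWorker.run()), so those threads never go idle; the elasticity only applies to any further tasks submitted later.

Second, the private currentTimeMillis() in the manager deliberately derives milliseconds from System.nanoTime() so that age comparisons against thresholdAgeMilliseconds cannot be broken by wall-clock adjustments. The usage pattern it supports is plain interval arithmetic:

// Interval measurement on a monotonic source, as used for buffer aging.
long startMs = System.nanoTime() / 1_000_000;
// ... do work ...
long elapsedMs = System.nanoTime() / 1_000_000 - startMs; // safe under NTP clock steps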