@@ -656,6 +656,10 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_FS_AZURE_LOWEST_REQUEST_PRIORITY_VALUE)
private int prefetchRequestPriorityValue;

@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_READ_POLICY,
DefaultValue = DEFAULT_AZURE_READ_POLICY)
private String abfsReadPolicy;

private String clientProvidedEncryptionKey;
private String clientProvidedEncryptionKeySHA;

@@ -1381,6 +1385,14 @@ public String getPrefetchRequestPriorityValue() {
return Integer.toString(prefetchRequestPriorityValue);
}

/**
* Get the ABFS read policy set by the user.
* @return the ABFS read policy.
*/
public String getAbfsReadPolicy() {
return abfsReadPolicy;
}

/**
* Enum config to allow user to pick format of x-ms-client-request-id header
* @return tracingContextFormat config if valid, else default ALL_ID_FORMAT
@@ -2079,6 +2091,15 @@ public void setIsChecksumValidationEnabled(boolean isChecksumValidationEnabled)
this.isChecksumValidationEnabled = isChecksumValidationEnabled;
}

/**
* Sets the ABFS read policy for testing purposes.
* @param readPolicy the read policy to set.
*/
@VisibleForTesting
public void setAbfsReadPolicy(String readPolicy) {
abfsReadPolicy = readPolicy;
}

public boolean isFullBlobChecksumValidationEnabled() {
return isFullBlobChecksumValidationEnabled;
}
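Not part of the patch: a minimal sketch of how the new knob is driven end to end, assuming the fs.azure.read.policy key added in ConfigurationKeys below and a placeholder account URI. The annotated field above is populated from the Hadoop Configuration, and getAbfsReadPolicy() hands the value to the store.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadPolicyExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // FS_AZURE_READ_POLICY ("fs.azure.read.policy"); when unset, the annotated
    // field falls back to DEFAULT_AZURE_READ_POLICY ("adaptive").
    conf.set("fs.azure.read.policy", "sequential");
    // Placeholder container/account URI.
    URI uri = new URI("abfs://container@account.dfs.core.windows.net/");
    try (FileSystem fs = FileSystem.get(uri, conf)) {
      fs.open(new Path("/data/big.bin")).close(); // openFileForRead dispatches on the policy
    }
  }
}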
@@ -77,7 +77,6 @@
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TrileanConversionException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
import org.apache.hadoop.fs.azurebfs.services.ListResponseData;
import org.apache.hadoop.fs.azurebfs.enums.Trilean;
import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper;
@@ -90,13 +89,15 @@
import org.apache.hadoop.fs.azurebfs.security.ContextProviderEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.security.NoContextEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper;
import org.apache.hadoop.fs.azurebfs.services.AbfsAdaptiveInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientContext;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientContextBuilder;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientHandler;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientRenameResult;
import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsReadPolicy;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;
@@ -107,10 +108,13 @@
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfInfo;
import org.apache.hadoop.fs.azurebfs.services.AbfsPerfTracker;
import org.apache.hadoop.fs.azurebfs.services.AbfsPermission;
import org.apache.hadoop.fs.azurebfs.services.AbfsPrefetchInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsRandomInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy;
import org.apache.hadoop.fs.azurebfs.services.ListingSupport;
import org.apache.hadoop.fs.azurebfs.services.ListResponseData;
import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials;
import org.apache.hadoop.fs.azurebfs.services.StaticRetryPolicy;
import org.apache.hadoop.fs.azurebfs.services.TailLatencyRequestTimeoutRetryPolicy;
@@ -946,8 +950,37 @@ public AbfsInputStream openFileForRead(Path path,

perfInfo.registerSuccess(true);

// Add statistics for InputStream
return new AbfsInputStream(getClient(), statistics, relativePath,
return getRelevantInputStream(statistics, relativePath, contentLength,
parameters, contextEncryptionAdapter, eTag, tracingContext);
}
}

private AbfsInputStream getRelevantInputStream(final FileSystem.Statistics statistics,
final String relativePath,
final long contentLength,
final Optional<OpenFileParameters> parameters,
final ContextEncryptionAdapter contextEncryptionAdapter,
final String eTag,
TracingContext tracingContext) {
AbfsReadPolicy inputPolicy = AbfsReadPolicy.getAbfsReadPolicy(getAbfsConfiguration().getAbfsReadPolicy());
switch (inputPolicy) {
case SEQUENTIAL:
return new AbfsPrefetchInputStream(getClient(), statistics, relativePath,
contentLength, populateAbfsInputStreamContext(
parameters.map(OpenFileParameters::getOptions),
contextEncryptionAdapter),
eTag, tracingContext);

case RANDOM:
return new AbfsRandomInputStream(getClient(), statistics, relativePath,
contentLength, populateAbfsInputStreamContext(
parameters.map(OpenFileParameters::getOptions),
contextEncryptionAdapter),
eTag, tracingContext);

case ADAPTIVE:
default:
return new AbfsAdaptiveInputStream(getClient(), statistics, relativePath,
contentLength, populateAbfsInputStreamContext(
parameters.map(OpenFileParameters::getOptions),
contextEncryptionAdapter),
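AbfsReadPolicy is imported and consumed by the switch above, but its definition is outside this diff. A minimal sketch of what the mapping could look like, assuming it accepts the OpenFileOptions policy strings and degrades to ADAPTIVE for anything unrecognized (hypothetical body, not the real enum):

package org.apache.hadoop.fs.azurebfs.services;

import java.util.Locale;

/** Hypothetical reconstruction; the real enum is not shown in this diff. */
public enum AbfsReadPolicy {
  SEQUENTIAL, RANDOM, ADAPTIVE;

  /** Maps a configured policy string to an enum value; null or unknown input yields ADAPTIVE. */
  public static AbfsReadPolicy getAbfsReadPolicy(String policy) {
    if (policy == null) {
      return ADAPTIVE;
    }
    switch (policy.toLowerCase(Locale.ROOT)) {
      case "sequential":
        return SEQUENTIAL;
      case "random":
        return RANDOM;
      default:
        return ADAPTIVE;
    }
  }
}

Whatever the real implementation, the dispatch in getRelevantInputStream only needs these three cases, with ADAPTIVE doubling as the default branch.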
@@ -21,6 +21,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Options.OpenFileOptions;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DOT;

@@ -215,6 +216,12 @@ public final class ConfigurationKeys {
public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
public static final String FS_AZURE_ALWAYS_READ_BUFFER_SIZE = "fs.azure.read.alwaysReadBufferSize";
public static final String FS_AZURE_READ_AHEAD_BLOCK_SIZE = "fs.azure.read.readahead.blocksize";
/**
* Provides a hint for the read workload pattern.
* Possible values are exposed in {@link OpenFileOptions}.
*/
public static final String FS_AZURE_READ_POLICY = "fs.azure.read.policy";

/** Provides a config control to enable or disable ABFS Flush operations -
* HFlush and HSync. Default is true. **/
public static final String FS_AZURE_ENABLE_FLUSH = "fs.azure.enable.flush";
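The legal values are the standard Hadoop read-policy strings from org.apache.hadoop.fs.Options.OpenFileOptions; a short illustration of pairing them with the new key (the constants are standard Hadoop, the pairing is an assumption of this sketch):

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READ_POLICY;

import org.apache.hadoop.conf.Configuration;

public class PolicyValues {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // The store's dispatch recognizes "sequential", "random" and "adaptive" (the default).
    conf.set(FS_AZURE_READ_POLICY, FS_OPTION_OPENFILE_READ_POLICY_RANDOM);
    System.out.println(conf.get(FS_AZURE_READ_POLICY)); // prints "random"
  }
}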
@@ -22,6 +22,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;

/**
@@ -108,6 +109,7 @@ public final class FileSystemConfigurations {
public static final long MAX_AZURE_BLOCK_SIZE = 256 * 1024 * 1024L; // changing default abfs blocksize to 256MB
public static final String AZURE_BLOCK_LOCATION_HOST_DEFAULT = "localhost";
public static final int DEFAULT_AZURE_LIST_MAX_RESULTS = 5000;
public static final String DEFAULT_AZURE_READ_POLICY = FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;

public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM = "AES256";

@@ -48,6 +48,10 @@ public enum ReadType {
* Only triggered when small file read optimization kicks in.
*/
SMALLFILE_READ("SR"),
/**
* Reads from the random input stream with read-ahead of up to readAheadRange.
*/
RANDOM_READ("RR"),
/**
* None of the above read types were applicable.
*/
@@ -28,12 +28,22 @@
*/
public class HttpResponseException extends IOException {
private final HttpResponse httpResponse;

/**
* Constructor for HttpResponseException.
* @param s the exception message
* @param httpResponse the HttpResponse object
*/
public HttpResponseException(final String s, final HttpResponse httpResponse) {
super(s);
Objects.requireNonNull(httpResponse, "httpResponse should be non-null");
this.httpResponse = httpResponse;
}

/**
* Gets the HttpResponse associated with this exception.
* @return the HttpResponse
*/
public HttpResponse getHttpResponse() {
return httpResponse;
}
@@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.services;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.azurebfs.constants.ReadType;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;

/**
* Input stream implementation optimized for adaptive read patterns.
* This is the default implementation, used when the user does not specify an input policy.
* It switches between sequential and random read optimizations based on the detected read pattern.
* It also keeps footer read and small file optimizations enabled.
*/
public class AbfsAdaptiveInputStream extends AbfsInputStream {

/**
* Constructs AbfsAdaptiveInputStream instance.
* @param client to be used for read operations
* @param statistics to record input stream statistics
* @param path file path
* @param contentLength file content length
* @param abfsInputStreamContext input stream context
* @param eTag file eTag
* @param tracingContext tracing context to trace the read operations
*/
(Review thread on the constructor — Contributor: "nit: needs javadoc comment"; Author: "Added")
public AbfsAdaptiveInputStream(
final AbfsClient client,
final FileSystem.Statistics statistics,
final String path,
final long contentLength,
final AbfsInputStreamContext abfsInputStreamContext,
final String eTag,
TracingContext tracingContext) {
super(client, statistics, path, contentLength,
abfsInputStreamContext, eTag, tracingContext);
}

/**
* {@inheritDoc}
*/
@Override
protected int readOneBlock(final byte[] b, final int off, final int len) throws IOException {
if (len == 0) {
return 0;
}
if (!validate(b, off, len)) {
return -1;
}
// If buffer is empty, then fill the buffer.
if (getBCursor() == getLimit()) {
// If EOF, then return -1
if (getFCursor() >= getContentLength()) {
return -1;
}

long bytesRead = 0;
// reset buffer to initial state - i.e., throw away existing data
setBCursor(0);
setLimit(0);
if (getBuffer() == null) {
LOG.debug("created new buffer size {}", getBufferSize());
setBuffer(new byte[getBufferSize()]);
}

// Reset Read Type back to normal and set again based on code flow.
getTracingContext().setReadType(ReadType.NORMAL_READ);
if (shouldAlwaysReadBufferSize()) {
bytesRead = readInternal(getFCursor(), getBuffer(), 0, getBufferSize(), false);
} else {
// Enable readAhead when reading sequentially
if (-1 == getFCursorAfterLastRead() || getFCursorAfterLastRead() == getFCursor() || b.length >= getBufferSize()) {
LOG.debug("Sequential read with read ahead size of {}", getBufferSize());
bytesRead = readInternal(getFCursor(), getBuffer(), 0, getBufferSize(), false);
} else {
/*
* Disable queuing prefetches when random read pattern detected.
* Instead, read ahead only for readAheadRange above what is asked by caller.
*/
getTracingContext().setReadType(ReadType.RANDOM_READ);
int lengthWithReadAhead = Math.min(b.length + getReadAheadRange(), getBufferSize());
LOG.debug("Random read with read ahead size of {}", lengthWithReadAhead);
bytesRead = readInternal(getFCursor(), getBuffer(), 0, lengthWithReadAhead, true);
}
}
if (isFirstRead()) {
setFirstRead(false);
}
if (bytesRead == -1) {
return -1;
}

setLimit(getLimit() + (int) bytesRead);
setFCursor(getFCursor() + bytesRead);
setFCursorAfterLastRead(getFCursor());
}
return copyToUserBuffer(b, off, len);
}
}
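The sequential-versus-random decision above reduces to a single comparison. A standalone restatement of the heuristic (hypothetical helper, not part of the patch): a read counts as sequential on the very first read, when it starts exactly where the previous read ended, or when the caller's buffer is at least as large as the stream's block buffer.

/** Hypothetical restatement of the branch condition in readOneBlock above. */
static boolean isSequentialPattern(long fCursorAfterLastRead, long fCursor,
    int callerBufferLength, int bufferSize) {
  return fCursorAfterLastRead == -1          // first read: nothing to compare against yet
      || fCursorAfterLastRead == fCursor     // resuming exactly where the last read stopped
      || callerBufferLength >= bufferSize;   // caller already reads full-buffer chunks
}

When this is false, the stream skips the prefetch queue, reads only min(b.length + readAheadRange, bufferSize) bytes, and tags the request RANDOM_READ for tracing.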