diff --git a/hadoop-tools/hadoop-azure/dev-support/testrun-scripts/runtests.sh b/hadoop-tools/hadoop-azure/dev-support/testrun-scripts/runtests.sh index a0039ced06d70..88c50173a860c 100755 --- a/hadoop-tools/hadoop-azure/dev-support/testrun-scripts/runtests.sh +++ b/hadoop-tools/hadoop-azure/dev-support/testrun-scripts/runtests.sh @@ -48,14 +48,6 @@ runHNSSharedKeyDFSTest() triggerRun "HNS-SharedKey-DFS" "$accountName" "$runTest" $processCount "$cleanUpTestContainers" } -runNonHNSSharedKeyDFSTest() -{ - accountName=$(xmlstarlet sel -t -v '//property[name = "fs.azure.nonHnsTestAccountName"]/value' -n $azureTestXmlPath) - PROPERTIES=("fs.azure.account.auth.type") - VALUES=("SharedKey") - triggerRun "NonHNS-SharedKey-DFS" "$accountName" "$runTest" $processCount "$cleanUpTestContainers" -} - runAppendBlobHNSOAuthDFSTest() { accountName=$(xmlstarlet sel -t -v '//property[name = "fs.azure.hnsTestAccountName"]/value' -n $azureTestXmlPath) @@ -73,14 +65,6 @@ runNonHNSSharedKeyBlobTest() triggerRun "NonHNS-SharedKey-Blob" "${accountName}_blob" "$runTest" $processCount "$cleanUpTestContainers" } -runNonHNSOAuthDFSTest() -{ - accountName=$(xmlstarlet sel -t -v '//property[name = "fs.azure.nonHnsTestAccountName"]/value' -n $azureTestXmlPath) - PROPERTIES=("fs.azure.account.auth.type") - VALUES=("OAuth") - triggerRun "NonHNS-OAuth-DFS" "$accountName" "$runTest" $processCount "$cleanUpTestContainers" -} - runNonHNSOAuthBlobTest() { accountName=$(xmlstarlet sel -t -v '//property[name = "fs.azure.nonHnsTestAccountName"]/value' -n $azureTestXmlPath) @@ -107,14 +91,6 @@ runHNSOAuthDFSIngressBlobTest() triggerRun "HNS-Oauth-DFS-IngressBlob" "$accountName" "$runTest" $processCount "$cleanUpTestContainers" } -runNonHNSOAuthDFSIngressBlobTest() -{ - accountName=$(xmlstarlet sel -t -v '//property[name = "fs.azure.nonHnsTestAccountName"]/value' -n $azureTestXmlPath) - PROPERTIES=("fs.azure.account.auth.type" "fs.azure.ingress.service.type") - VALUES=("OAuth" "blob") - triggerRun 
"NonHNS-OAuth-DFS-IngressBlob" "$accountName" "$runTest" $processCount "$cleanUpTestContainers" -} - runTest=false cleanUpTestContainers=false echo 'Ensure below are complete before running script:' @@ -181,7 +157,7 @@ done echo ' ' echo 'Set the active test combination to run the action:' -select combo in HNS-OAuth-DFS HNS-SharedKey-DFS NonHNS-SharedKey-DFS AppendBlob-HNS-OAuth-DFS NonHNS-SharedKey-Blob NonHNS-OAuth-DFS NonHNS-OAuth-Blob AppendBlob-NonHNS-OAuth-Blob HNS-Oauth-DFS-IngressBlob NonHNS-Oauth-DFS-IngressBlob AllCombinationsTestRun Quit +select combo in HNS-OAuth-DFS HNS-SharedKey-DFS AppendBlob-HNS-OAuth-DFS NonHNS-SharedKey-Blob NonHNS-OAuth-Blob AppendBlob-NonHNS-OAuth-Blob HNS-Oauth-DFS-IngressBlob AllCombinationsTestRun Quit do case $combo in HNS-OAuth-DFS) @@ -192,10 +168,6 @@ do runHNSSharedKeyDFSTest break ;; - NonHNS-SharedKey-DFS) - runNonHNSSharedKeyDFSTest - break - ;; AppendBlob-HNS-OAuth-DFS) runAppendBlobHNSOAuthDFSTest break @@ -204,10 +176,6 @@ do runNonHNSSharedKeyBlobTest break ;; - NonHNS-OAuth-DFS) - runNonHNSOAuthDFSTest - break - ;; NonHNS-OAuth-Blob) runNonHNSOAuthBlobTest break @@ -220,10 +188,6 @@ do runHNSOAuthDFSIngressBlobTest break ;; - NonHNS-Oauth-DFS-IngressBlob) - runNonHNSOAuthDFSIngressBlobTest - break - ;; AllCombinationsTestRun) if [ $runTest == false ] then @@ -232,14 +196,11 @@ do fi runHNSOAuthDFSTest runHNSSharedKeyDFSTest - runNonHNSSharedKeyDFSTest runAppendBlobHNSOAuthDFSTest runNonHNSSharedKeyBlobTest - runNonHNSOAuthDFSTest runNonHNSOAuthBlobTest runAppendBlobNonHNSOAuthBlobTest runHNSOAuthDFSIngressBlobTest - runNonHNSOAuthDFSIngressBlobTest break ;; Quit) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index e7591292c919a..080240d9587c4 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -80,7 +80,6 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.*; -import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.INCORRECT_INGRESS_TYPE; /** * Configuration for Azure Blob FileSystem. @@ -93,7 +92,7 @@ public class AbfsConfiguration{ private final String accountName; private String fsName; // Service type identified from URL used to initialize FileSystem. - private final AbfsServiceType fsConfiguredServiceType; + private AbfsServiceType fsConfiguredServiceTypeFromUrl; private final boolean isSecure; private static final Logger LOG = LoggerFactory.getLogger(AbfsConfiguration.class); private Trilean isNamespaceEnabled = null; @@ -663,18 +662,18 @@ public class AbfsConfiguration{ * Constructor for AbfsConfiguration for specified service type. * @param rawConfig used to initialize the configuration. * @param accountName the name of the azure storage account. - * @param fsConfiguredServiceType service type configured for the file system. + * @param fsConfiguredServiceTypeFromUrl service type configured for the file system. * @throws IllegalAccessException if the field is not accessible. * @throws IOException if an I/O error occurs. 
*/ public AbfsConfiguration(final Configuration rawConfig, String accountName, - AbfsServiceType fsConfiguredServiceType) + AbfsServiceType fsConfiguredServiceTypeFromUrl) throws IllegalAccessException, IOException { this.rawConfig = ProviderUtils.excludeIncompatibleCredentialProviders( rawConfig, AzureBlobFileSystem.class); this.accountName = accountName; - this.fsConfiguredServiceType = fsConfiguredServiceType; + this.fsConfiguredServiceTypeFromUrl = fsConfiguredServiceTypeFromUrl; this.isSecure = getBoolean(FS_AZURE_SECURE_MODE, false); Field[] fields = this.getClass().getDeclaredFields(); @@ -701,16 +700,16 @@ public AbfsConfiguration(final Configuration rawConfig, * @param rawConfig used to initialize the configuration. * @param accountName the name of the azure storage account. * @param fsName the name of the file system (container name). - * @param fsConfiguredServiceType service type configured for the file system. + * @param fsConfiguredServiceTypeFromUrl service type configured for the file system. * @throws IllegalAccessException if the field is not accessible. * @throws IOException if an I/O error occurs. */ public AbfsConfiguration(final Configuration rawConfig, String accountName, String fsName, - AbfsServiceType fsConfiguredServiceType) + AbfsServiceType fsConfiguredServiceTypeFromUrl) throws IllegalAccessException, IOException { - this(rawConfig, accountName, fsConfiguredServiceType); + this(rawConfig, accountName, fsConfiguredServiceTypeFromUrl); this.fsName = fsName; } @@ -749,7 +748,16 @@ public Trilean getIsNamespaceEnabledAccount() { * @return the service type. */ public AbfsServiceType getFsConfiguredServiceType() { - return getCaseInsensitiveEnum(FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, fsConfiguredServiceType); + return getCaseInsensitiveEnum(FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, fsConfiguredServiceTypeFromUrl); + } + + /** + * Returns the service type identified from the URL used to initialize the FileSystem. 
+ * + * @return the configured AbfsServiceType from the URL + */ + public AbfsServiceType getFsConfiguredServiceTypeFromUrl() { + return fsConfiguredServiceTypeFromUrl; } /** @@ -790,13 +798,9 @@ public void validateConfiguredServiceType(boolean isHNSEnabled) if (isHNSEnabled && getConfiguredServiceTypeForFNSAccounts() == AbfsServiceType.BLOB) { throw new InvalidConfigurationValueException( FS_AZURE_FNS_ACCOUNT_SERVICE_TYPE, "Service Type Cannot be BLOB for HNS Account"); - } else if (isHNSEnabled && fsConfiguredServiceType == AbfsServiceType.BLOB) { + } else if (isHNSEnabled && fsConfiguredServiceTypeFromUrl == AbfsServiceType.BLOB) { throw new InvalidConfigurationValueException(FS_DEFAULT_NAME_KEY, "Blob Endpoint Url Cannot be used to initialize filesystem for HNS Account"); - } else if (getFsConfiguredServiceType() == AbfsServiceType.BLOB - && getIngressServiceType() == AbfsServiceType.DFS) { - throw new InvalidConfigurationValueException( - FS_AZURE_INGRESS_SERVICE_TYPE, INCORRECT_INGRESS_TYPE); } } @@ -1799,6 +1803,14 @@ void setReadAheadEnabled(final boolean enabledReadAhead) { this.enabledReadAhead = enabledReadAhead; } + /** + * Sets the configured service type. + * Used to update the service type identified from the URL. 
+ */ + void setFsConfiguredServiceType(AbfsServiceType serviceType) { + this.fsConfiguredServiceTypeFromUrl = serviceType; + } + public int getReadAheadRange() { return this.readAheadRange; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 572bc873b1c2e..e41f73af77635 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -129,6 +129,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.FSOperationType.CREATE_FILESYSTEM; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_DFS_DOMAIN_NAME; import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD; import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_CREATE_ON_ROOT; import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_INVALID_ABFS_STATE; @@ -315,6 +316,18 @@ public void initialize(URI uri, Configuration configuration) throw new InvalidConfigurationValueException(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, ex); } + /* + * For FNS accounts, restrict the endpoint and service type to Blob + * For FNS-DFS, also update the tracing context to add metric to show endpoint conversion. 
+ */ + if (!tryGetIsNamespaceEnabled(new TracingContext(initFSTracingContext))) { + LOG.debug("FNS account detected; restricting service type to Blob."); + abfsStore.restrictServiceTypeToBlob(); + if (uri.toString().contains(ABFS_DFS_DOMAIN_NAME)) { + initFSTracingContext.setFNSEndpointConverted(); + } + } + // Create the file system if it does not exist. if (abfsConfiguration.getCreateRemoteFileSystemDuringInitialization()) { TracingContext createFSTracingContext = new TracingContext(initFSTracingContext); @@ -1861,4 +1874,3 @@ public IOStatistics getIOStatistics() { return abfsCounters != null ? abfsCounters.getIOStatistics() : null; } } - diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 5d7d0895d0223..93f1cfb436c9e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -798,9 +798,11 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext( ContextEncryptionAdapter contextEncryptionAdapter, TracingContext tracingContext) { int bufferSize = abfsConfiguration.getWriteBufferSize(); + if (isAppendBlob && bufferSize > FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE) { bufferSize = FileSystemConfigurations.APPENDBLOB_MAX_WRITE_BUFFER_SIZE; } + return new AbfsOutputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds()) .withWriteBufferSize(bufferSize) .enableExpectHeader(abfsConfiguration.isExpectHeaderEnabled()) @@ -824,7 +826,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext( .withWriteThreadPoolManager(writeThreadPoolSizeManager) .withTracingContext(tracingContext) .withAbfsBackRef(fsBackRef) - .withIngressServiceType(abfsConfiguration.getIngressServiceType()) + 
.withIngressServiceType(clientHandler.getIngressServiceType()) .withDFSToBlobFallbackEnabled(abfsConfiguration.isDfsToBlobFallbackEnabled()) .withETag(eTag) .build(); @@ -1906,6 +1908,17 @@ private AbfsPerfInfo startTracking(String callerName, String calleeName) { return new AbfsPerfInfo(abfsPerfTracker, callerName, calleeName); } + /** + * Restricts all service types to BLOB when FNS account detected + * Updates the client to reflect the new default service type. + */ + public void restrictServiceTypeToBlob() { + clientHandler.setDefaultServiceType(AbfsServiceType.BLOB); + clientHandler.setIngressServiceType(AbfsServiceType.BLOB); + getAbfsConfiguration().setFsConfiguredServiceType(AbfsServiceType.BLOB); + this.client = clientHandler.getClient(); + } + /** * Permissions class contain provided permission and umask in octalNotation. * If the object is created for namespace-disabled account, the permission and diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/MsiTokenProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/MsiTokenProvider.java index 784365b4c9cf5..ca362e02faf3e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/MsiTokenProvider.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/MsiTokenProvider.java @@ -42,6 +42,14 @@ public class MsiTokenProvider extends AccessTokenProvider { private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class); + /** + * Constructs an MsiTokenProvider. 
+ * + * @param authEndpoint the authentication endpoint for MSI + * @param tenantGuid the tenant GUID + * @param clientId the client ID for MSI + * @param authority the authority URL + */ public MsiTokenProvider(final String authEndpoint, final String tenantGuid, final String clientId, final String authority) { this.authEndpoint = authEndpoint; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 18e8183754d5c..9260662c57c89 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -1373,7 +1373,8 @@ String initializeUserAgent(final AbfsConfiguration abfsConfiguration, // Add a unique identifier in FNS-Blob user agent string // Current filesystem init restricts HNS-Blob combination // so namespace check not required. 
- if (abfsConfiguration.getFsConfiguredServiceType() == BLOB) { + // We need to rely on URL check to identify Blob service instead of user config + if (abfsConfiguration.getFsConfiguredServiceTypeFromUrl() == BLOB) { sb.append(SEMICOLON) .append(SINGLE_WHITE_SPACE) .append(FNS_BLOB_USER_AGENT_IDENTIFIER); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java index 393811c256bdd..26c18a41459c6 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java @@ -91,6 +91,33 @@ private void initServiceType(final AbfsConfiguration abfsConfiguration) { this.ingressServiceType = abfsConfiguration.getIngressServiceType(); } + /** + * Sets the default service type. + * + * @param defaultServiceType the service type to set as default + */ + public void setDefaultServiceType(AbfsServiceType defaultServiceType) { + this.defaultServiceType = defaultServiceType; + } + + /** + * Sets the ingress service type. + * + * @param ingressServiceType the ingress service type + */ + public void setIngressServiceType(AbfsServiceType ingressServiceType) { + this.ingressServiceType = ingressServiceType; + } + + /** + * Gets the default ingress service type. + * + * @return the default ingress service type + */ + public AbfsServiceType getIngressServiceType() { + return ingressServiceType; + } + /** * Get the AbfsClient based on the default service type. 
* @return AbfsClient diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 6c68782c39821..883af56c5892d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -320,7 +320,11 @@ private AzureIngressHandler createNewHandler(AbfsServiceType serviceType, boolean isSwitch, AzureBlockManager blockManager) throws IOException { this.client = clientHandler.getClient(serviceType); - if (isDFSToBlobFallbackEnabled && serviceTypeAtInit != AbfsServiceType.DFS) { + + // Check ingress service type is also set to DFS along with enabling the config for fallback + // Separate ingress service type is only allowed for HNS accounts + if (isDFSToBlobFallbackEnabled && client.getIsNamespaceEnabled() + && serviceTypeAtInit != AbfsServiceType.DFS) { throw new InvalidConfigurationValueException( "The ingress service type must be configured as DFS"); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListResponseData.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListResponseData.java index 87f212034b7ad..05c587acc441b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListResponseData.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListResponseData.java @@ -35,6 +35,13 @@ public class ListResponseData { private AbfsRestOperation executedRestOperation; private String continuationToken; + /** + * Default constructor for ListResponseData. + */ + public ListResponseData() { + // do nothing + } + /** * Returns the list of VersionedFileStatus objects. 
* @return the list of VersionedFileStatus objects diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java index f3c08c4a30036..2dc137f7a5082 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListingSupport.java @@ -29,6 +29,10 @@ @InterfaceAudience.Private @InterfaceStability.Unstable + +/** + * Interface for listing support in Azure Blob File System. + */ public interface ListingSupport { /** diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/Listener.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/Listener.java index 859b9474cfe1a..5f5cd355ca4a5 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/Listener.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/Listener.java @@ -32,6 +32,7 @@ public interface Listener { Listener getClone(); void setOperation(FSOperationType operation); void updateIngressHandler(String ingressHandler); + void updateFNSEndpointConverted(); void updatePosition(String position); void updateReadType(ReadType readType); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java index 8decba90b9f37..ff3ab405959af 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java @@ -67,6 +67,9 @@ public class TracingContext { //final concatenated ID list set into x-ms-client-request-id header private String 
header = EMPTY_STRING; private String ingressHandler = EMPTY_STRING; + private Boolean fnsEndpointConverted = false; + // Represents endpoint was converted to Blob for FNS; "T" stands for "True" + private String fnsEndptConvertedIndicator = "T"; private String position = EMPTY_STRING; // position of read/write in remote file private String metricResults = EMPTY_STRING; private ReadType readType = ReadType.UNKNOWN_READ; @@ -148,6 +151,7 @@ public TracingContext(TracingContext originalTracingContext) { this.format = originalTracingContext.format; this.position = originalTracingContext.getPosition(); this.ingressHandler = originalTracingContext.getIngressHandler(); + this.fnsEndpointConverted = originalTracingContext.fnsEndpointConverted; this.operatedBlobCount = originalTracingContext.operatedBlobCount; if (originalTracingContext.listener != null) { this.listener = originalTracingContext.listener.getClone(); @@ -212,6 +216,7 @@ public void setListener(Listener listener) { *
<li>operatedBlobCount - number of blobs operated on by this request</li>
 * <li>operationSpecificHeader - different operation types can publish info relevant to that operation</li>
 * <li>httpOperationHeader - suffix for network library used</li>
+ * <li>fnsEndpointConverted - if endpoint was converted to Blob for FNS accounts</li>
 * </ul> * @param httpOperation AbfsHttpOperation instance to set header into * connection @@ -237,7 +242,8 @@ public void constructHeader(AbfsHttpOperation httpOperation, String previousFail + operatedBlobCount + COLON + getOperationSpecificHeader(opType) + COLON + httpOperation.getTracingContextSuffix() + COLON - + metricResults + COLON + resourceUtilizationMetricResults; + + metricResults + COLON + resourceUtilizationMetricResults + COLON + + (fnsEndpointConverted ? fnsEndptConvertedIndicator : EMPTY_STRING); break; case TWO_ID_FORMAT: header = TracingHeaderVersion.getCurrentVersion() + COLON @@ -371,6 +377,17 @@ public void setIngressHandler(final String ingressHandler) { } } +/** + * Marks that the endpoint was force converted to Blob for FNS account + * Sets the fnsEndpointConverted flag to true and notifies the listener if present. + */ + public void setFNSEndpointConverted() { + this.fnsEndpointConverted = true; + if (listener != null) { + listener.updateFNSEndpointConverted(); + } + } + /** * Sets the position.
* diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderVersion.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderVersion.java index 6ce0299ff8931..b4b3fe4c072a8 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderVersion.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderVersion.java @@ -45,9 +45,9 @@ public enum TracingHeaderVersion { * Schema: version:clientCorrelationId:clientRequestId:fileSystemId * :primaryRequestId:streamId:opType:retryHeader:ingressHandler * :position:operatedBlobCount:operationSpecificHeader:httpOperationHeader - * :aggregatedMetrics:resourceUtilizationMetrics + * :aggregatedMetrics:resourceUtilizationMetrics:fnsEndptConvertedIndicator */ - V2("v2", 15); + V2("v2", 16); private final String versionString; private final int fieldCount; diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/index.md b/hadoop-tools/hadoop-azure/src/site/markdown/index.md index 5c5a134fc5bac..c482e7864456e 100644 --- a/hadoop-tools/hadoop-azure/src/site/markdown/index.md +++ b/hadoop-tools/hadoop-azure/src/site/markdown/index.md @@ -802,16 +802,15 @@ requests. User can specify them as fixed SAS Token to be used across all the req ### User-bound SAS - **Description**: The user-bound SAS auth type allows to track the usage of the SAS token generated- something that was not possible in user-delegation SAS authentication type. Reach out to us at 'askabfs@microsoft.com' for more information. - To use this authentication type, both custom SAS token provider class (that implements org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider) as - well as OAuth 2.0 provider type need to be specified. - - Refer to 'Shared Access Signature (SAS) Token Provider' section above for user-delegation SAS token provider class details and example class implementation. 
- - There are multiple identity configurations for OAuth settings. Listing the main ones below: +- To use this authentication type, both custom SAS token provider class as well as OAuth 2.0 provider type need to be specified. + - Read the section below for SAS Token Provider class details and example class implementation. + - There are multiple identity configurations for OAuth Token Provider settings. Listing the main ones below: - Client Credentials - Custom token provider - Managed Identity - Workload Identity - Refer to respective OAuth 2.0 sections above to correctly chose the OAuth provider type + Refer to respective OAuth 2.0 sections above to correctly choose the OAuth provider type - NOTE: User-bound SAS Authentication is **only supported** with HNS Enabled accounts. - **Configuration**: To use this method with ABFS Driver, specify the following properties in your `core-site.xml` file: @@ -838,6 +837,57 @@ requests. User can specify them as fixed SAS Token to be used across all the req ``` + - ABFS allows you to implement your custom SAS Token Provider. The declared class must implement + `org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider`. + ABFS Hadoop Driver provides + a [MockUserBoundSASTokenProvider](https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockUserBoundSASTokenProvider.java) + implementation that can be used as an example on how to implement your own custom + SASTokenProvider. This requires the Application credentials to be specified using + the following configurations apart from above three: + 4. Delegator App Service Principal Tenant Id: + ```xml + <property> + <name>fs.azure.test.app.service.principal.tenant.id</name> + <value>DELEGATOR_TENANT_ID</value> + </property> + ``` + 5. Delegator App Service Principal Object Id: + ```xml + <property> + <name>fs.azure.test.app.service.principal.object.id</name> + <value>DELEGATOR_OBJECT_ID</value> + </property> + ``` + 6.
End-user App Service Principal Tenant Id: + ```xml + <property> + <name>fs.azure.test.end.user.tenant.id</name> + <value>DELEGATED_TENANT_ID</value> + </property> + ``` + 7. End-user App Service Principal Object Id: + ```xml + <property> + <name>fs.azure.test.end.user.object.id</name> + <value>DELEGATED_OBJECT_ID</value> + </property> + ``` + 8. Delegator App Id: + ```xml + <property> + <name>fs.azure.test.app.id</name> + <value>APPLICATION_ID</value> + </property> + ``` + 9. Delegator App Secret: + ```xml + <property> + <name>fs.azure.test.app.secret</name> + <value>APPLICATION_SECRET</value> + </property> + ``` + - Add all additional configs required by the chosen OAuth Token provider from the sections above as well. + ## Technical notes ### Proxy setup diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java index 557333ff0fd9a..76bdaaba467aa 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java @@ -223,9 +223,7 @@ public void testCreateOverDfsAppendOverBlob() throws IOException { createPath(makeQualified(testPath).toUri().getPath(), true, false, permissions, false, null, null, getTestTracingContext(fs, true)); - fs.getAbfsStore() - .getAbfsConfiguration() - .set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.BLOB.name()); + fs.getAbfsStore().getClientHandler().setIngressServiceType(AbfsServiceType.BLOB); FSDataOutputStream outputStream = fs.append(testPath); AzureIngressHandler ingressHandler = ((AbfsOutputStream) outputStream.getWrappedStream()).getIngressHandler(); @@ -262,9 +260,7 @@ public void testMultipleAppendsQualifyForSwitch() throws Exception { createPath(makeQualified(testPath).toUri().getPath(), true, false, permissions, false, null, null, getTestTracingContext(fs, true)); - fs.getAbfsStore() - .getAbfsConfiguration() - .set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.BLOB.name()); +
fs.getAbfsStore().getClientHandler().setIngressServiceType(AbfsServiceType.BLOB); ExecutorService executorService = Executors.newFixedThreadPool(5); List> futures = new ArrayList<>(); @@ -328,9 +324,7 @@ public void testParallelWritesOnDfsAndBlob() throws Exception { createPath(makeQualified(testPath).toUri().getPath(), true, false, permissions, false, null, null, getTestTracingContext(fs, true)); - fs.getAbfsStore() - .getAbfsConfiguration() - .set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.BLOB.name()); + fs.getAbfsStore().getClientHandler().setIngressServiceType(AbfsServiceType.BLOB); FSDataOutputStream out1 = fs.create(testPath); fs.getAbfsStore().getClientHandler().getDfsClient(). createPath(makeQualified(testPath1).toUri().getPath(), true, false, @@ -389,10 +383,7 @@ public void testCreateOverBlobAppendOverDfs() throws IOException { fs.getAbfsStore() .getAbfsConfiguration() .setBoolean(FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK, true); - fs.getAbfsStore() - .getAbfsConfiguration() - .set(FS_AZURE_INGRESS_SERVICE_TYPE, - String.valueOf(AbfsServiceType.DFS)); + fs.getAbfsStore().getClientHandler().setIngressServiceType(AbfsServiceType.DFS); fs.getAbfsStore().getClientHandler().getBlobClient(). createPath(makeQualified(testPath).toUri().getPath(), true, false, permissions, false, null, @@ -438,10 +429,7 @@ public void testCreateAppendBlobOverBlobEndpointAppendOverDfs() fs.getAbfsStore() .getAbfsConfiguration() .setBoolean(FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK, true); - fs.getAbfsStore() - .getAbfsConfiguration() - .set(FS_AZURE_INGRESS_SERVICE_TYPE, - String.valueOf(AbfsServiceType.DFS)); + fs.getAbfsStore().getClientHandler().setIngressServiceType(AbfsServiceType.DFS); fs.getAbfsStore().getClientHandler().getBlobClient(). 
createPath(makeQualified(testPath).toUri().getPath(), true, false, permissions, true, null, @@ -483,9 +471,7 @@ public void testCreateAppendBlobOverDfsEndpointAppendOverBlob() createPath(makeQualified(testPath).toUri().getPath(), true, false, permissions, true, null, null, getTestTracingContext(fs, true)); - fs.getAbfsStore() - .getAbfsConfiguration() - .set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.BLOB.name()); + fs.getAbfsStore().getClientHandler().setIngressServiceType(AbfsServiceType.BLOB); FSDataOutputStream outputStream = fs.append(testPath); AzureIngressHandler ingressHandler = ((AbfsOutputStream) outputStream.getWrappedStream()).getIngressHandler(); @@ -507,61 +493,101 @@ public void testCreateAppendBlobOverDfsEndpointAppendOverBlob() .isInstanceOf(AbfsDfsClient.class); } - /** - * Tests the correct retrieval of the AzureIngressHandler based on the configured ingress service type. + * Validates that the correct ingress handler and client are used for the specified + * ingress service type. 
* - * @throws IOException if an I/O error occurs + * @param ingressServiceType the ingress service type to test (e.g., DFS or BLOB) + * @param expectedIngressHandler the expected class of the AzureIngressHandler + * @param expectedClient the expected class of the AbfsClient + * @throws IOException if an I/O error occurs during validation */ - @Test - public void testValidateIngressHandler() throws IOException { + private void validateIngressHandler(AbfsServiceType ingressServiceType, + Class expectedIngressHandler, + Class expectedClient) + throws IOException { + Configuration configuration = getRawConfiguration(); configuration.set(FS_AZURE_INGRESS_SERVICE_TYPE, - AbfsServiceType.BLOB.name()); - try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance( - configuration)) { + ingressServiceType.name()); + + try (AzureBlobFileSystem fs = + (AzureBlobFileSystem) FileSystem.newInstance(configuration)) { + Path testPath = path(TEST_FILE_PATH); - AzureBlobFileSystemStore.Permissions permissions - = new AzureBlobFileSystemStore.Permissions(false, - FsPermission.getDefault(), FsPermission.getUMask(fs.getConf())); - fs.getAbfsStore().getClientHandler().getBlobClient(). 
- createPath(makeQualified(testPath).toUri().getPath(), true, - false, - permissions, false, null, - null, getTestTracingContext(fs, true)); + AzureBlobFileSystemStore.Permissions permissions = + new AzureBlobFileSystemStore.Permissions( + false, + FsPermission.getDefault(), + FsPermission.getUMask(fs.getConf())); + + fs.getAbfsStore() + .getClientHandler() + .getBlobClient() + .createPath( + makeQualified(testPath).toUri().getPath(), + true, + false, + permissions, + false, + null, + null, + getTestTracingContext(fs, true)); + FSDataOutputStream outputStream = fs.append(testPath); - AzureIngressHandler ingressHandler - = ((AbfsOutputStream) outputStream.getWrappedStream()).getIngressHandler(); + AzureIngressHandler ingressHandler = + ((AbfsOutputStream) outputStream.getWrappedStream()) + .getIngressHandler(); + assertThat(ingressHandler) - .as("Blob Ingress handler instance is not correct") - .isInstanceOf(AzureBlobIngressHandler.class); - AbfsClient client = ingressHandler.getClient(); - assertThat(client) - .as("Blob client was not used correctly") - .isInstanceOf(AbfsBlobClient.class); - - Path testPath1 = new Path("testFile1"); - fs.getAbfsStore().getClientHandler().getBlobClient(). 
- createPath(makeQualified(testPath1).toUri().getPath(), true, - false, - permissions, false, null, - null, getTestTracingContext(fs, true)); - fs.getAbfsStore() - .getAbfsConfiguration() - .set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.DFS.name()); - FSDataOutputStream outputStream1 = fs.append(testPath1); - AzureIngressHandler ingressHandler1 - = ((AbfsOutputStream) outputStream1.getWrappedStream()).getIngressHandler(); - assertThat(ingressHandler1) - .as("DFS Ingress handler instance is not correct") - .isInstanceOf(AzureDFSIngressHandler.class); - AbfsClient client1 = ingressHandler1.getClient(); - assertThat(client1) - .as("Dfs client was not used correctly") - .isInstanceOf(AbfsDfsClient.class); + .as("Unexpected ingress handler type") + .isInstanceOf(expectedIngressHandler); + + assertThat(ingressHandler.getClient()) + .as("Unexpected client used by ingress handler") + .isInstanceOf(expectedClient); } } + /** + * Validates that for FNS, both DFS and BLOB ingress service types force the use of + * AzureBlobIngressHandler and AbfsBlobClient. + * + * @throws IOException if an I/O error occurs during validation + */ + @Test + public void testValidateIngressHandlerForFNS() throws IOException { + assumeHnsDisabled(); + + validateIngressHandler(AbfsServiceType.DFS, + AzureBlobIngressHandler.class, + AbfsBlobClient.class); + validateIngressHandler(AbfsServiceType.BLOB, + AzureBlobIngressHandler.class, + AbfsBlobClient.class); + } + + + /** + * Validates that for HNS, the correct ingress handler and client + * are used for both DFS and BLOB service types. + * For DFS ingress service type, expects AzureDFSIngressHandler and AbfsDfsClient. + * For BLOB ingress service type, expects AzureBlobIngressHandler and AbfsBlobClient. 
+ * + * @throws IOException if an I/O error occurs during validation + */ + @Test + public void testValidateIngressHandlerForHNS() throws IOException { + assumeHnsEnabled(); + + validateIngressHandler(AbfsServiceType.DFS, + AzureDFSIngressHandler.class, + AbfsDfsClient.class); + validateIngressHandler(AbfsServiceType.BLOB, + AzureBlobIngressHandler.class, + AbfsBlobClient.class); + } + @Test public void testAppendImplicitDirectory() throws Exception { assertThrows(FileNotFoundException.class, () -> { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java index f6ca7042b1cb6..355a33441e68a 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java @@ -19,38 +19,39 @@ package org.apache.hadoop.fs.azurebfs; import java.io.FileNotFoundException; +import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType; import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; -import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException; import 
org.apache.hadoop.fs.azurebfs.enums.Trilean; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import static java.net.HttpURLConnection.HTTP_UNAVAILABLE; -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DOT; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SPLIT_NO_LIMIT; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_HNS_ENABLED; -import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME; -import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_INGRESS_SERVICE_TYPE; -import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.accountProperty; -import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_BLOB_DOMAIN_NAME; -import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_DFS_DOMAIN_NAME; -import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.INCORRECT_INGRESS_TYPE; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION; +import static org.apache.hadoop.fs.azurebfs.services.RetryPolicyConstants.EXPONENTIAL_RETRY_POLICY_ABBREVIATION; import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.mockito.ArgumentMatchers.any; + import org.junit.jupiter.api.Assertions; /** @@ -120,33 +121,6 @@ public void testNoGetAclCallOnHnsConfigPresence() throws Exception { .getAclStatus(Mockito.anyString(), any(TracingContext.class)); } - /** - * Test to verify that the initialization of the AzureBlobFileSystem fails - * when an invalid ingress service type is 
configured. - * - * This test sets up a configuration with an invalid ingress service type - * (DFS) for a Blob endpoint and expects an InvalidConfigurationValueException - * to be thrown during the initialization of the filesystem. - * - * @throws Exception if an error occurs during the test execution - */ - @Test - public void testFileSystemInitializationFailsForInvalidIngress() throws Exception { - assumeHnsDisabled(); - Configuration configuration = new Configuration(getRawConfiguration()); - String defaultUri = configuration.get(FS_DEFAULT_NAME_KEY); - String accountKey = configuration.get( - accountProperty(FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, getAccountName()), - configuration.get(FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME)); - configuration.set(FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, - accountKey.replace(ABFS_DFS_DOMAIN_NAME, ABFS_BLOB_DOMAIN_NAME)); - configuration.set(FS_AZURE_INGRESS_SERVICE_TYPE, AbfsServiceType.DFS.name()); - String blobUri = defaultUri.replace(ABFS_DFS_DOMAIN_NAME, ABFS_BLOB_DOMAIN_NAME); - intercept(InvalidConfigurationValueException.class, - INCORRECT_INGRESS_TYPE, () -> - FileSystem.newInstance(new Path(blobUri).toUri(), configuration)); - } - @Test public void testFileSystemInitFailsIfNotAbleToDetermineAccountType() throws Exception { AzureBlobFileSystem fs = ((AzureBlobFileSystem) FileSystem.newInstance( @@ -160,4 +134,54 @@ public void testFileSystemInitFailsIfNotAbleToDetermineAccountType() throws Exce FS_AZURE_ACCOUNT_IS_HNS_ENABLED, () -> mockedFs.initialize(fs.getUri(), getRawConfiguration())); } + + /** + * Test to verify that the fnsEndptConvertedIndicator ("T") is present in the tracing header + * after endpoint conversion during AzureBlobFileSystem initialization. 
+ * + * @throws Exception if any error occurs during the test + */ + @Test + public void testFNSEndptConvertedIndicatorInHeader() throws Exception { + assumeHnsDisabled(); + String scheme = "abfs"; + String dfsDomain = "dfs.core.windows.net"; + String endptConversionIndicatorInTc = "T"; + Configuration conf = new Configuration(getRawConfiguration()); + conf.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, true); + + String dfsUri = String.format("%s://%s@%s.%s/", + scheme, getFileSystemName(), + getAccountName().substring(0, getAccountName().indexOf(DOT)), + dfsDomain); + + // Initialize filesystem with DFS endpoint + try (AzureBlobFileSystem fs = + (AzureBlobFileSystem) FileSystem.newInstance(new URI(dfsUri), conf)) { + AzureBlobFileSystem spiedFs = Mockito.spy(fs); + AzureBlobFileSystemStore spiedStore = Mockito.spy(spiedFs.getAbfsStore()); + AbfsClient spiedClient = Mockito.spy(spiedStore.getClient()); + + Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore(); + Mockito.doReturn(spiedClient).when(spiedStore).getClient(); + + // re-init the FS so the spy wiring is used + spiedFs.initialize(fs.getUri(), conf); + ArgumentCaptor ctxCaptor = ArgumentCaptor.forClass(TracingContext.class); + Mockito.verify(spiedClient, Mockito.atLeastOnce()) + .getFilesystemProperties(ctxCaptor.capture()); + + TracingContext captured = ctxCaptor.getValue(); + + AbfsHttpOperation abfsHttpOperation = Mockito.mock(AbfsHttpOperation.class); + captured.constructHeader(abfsHttpOperation, null, + EXPONENTIAL_RETRY_POLICY_ABBREVIATION); + + // The tracing context being used FS Initialization should have the endpoint conversion indicator set to 'T' + final int endpointConversionIndicatorIndex = 15; + String endpointConversionIndicator = captured.getHeader().split(COLON, SPLIT_NO_LIMIT)[endpointConversionIndicatorIndex ]; + Assertions.assertFalse(endpointConversionIndicator.isEmpty(), "Endpoint conversion indicator should be present"); + 
Assertions.assertEquals(endptConversionIndicatorInTc, endpointConversionIndicator, "Endpoint conversion indicator should be 'T'"); + } + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java index f02086316a731..9e310a8e9cdb1 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java @@ -435,7 +435,7 @@ public void testFsInitShouldSetNamespaceConfig() throws Exception { AzureBlobFileSystem mockFileSystem = Mockito.spy((AzureBlobFileSystem) FileSystem.newInstance(getConfigurationWithoutHnsConfig())); AzureBlobFileSystemStore mockStore = Mockito.spy(mockFileSystem.getAbfsStore()); - AbfsClient abfsClient = Mockito.spy(mockStore.getClient()); + AbfsClient abfsClient = Mockito.spy(mockStore.getClient(AbfsServiceType.DFS)); Mockito.doReturn(abfsClient).when(mockStore).getClient(); Mockito.doReturn(abfsClient).when(mockStore).getClient(any()); abfsClient.getIsNamespaceEnabled(); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java index 4bf1f56e7eecc..1eb21ce9da9d9 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java @@ -370,9 +370,8 @@ public void verifyUserAgentClusterType() throws Exception { @Test // Test to verify the unique identifier in user agent string for FNS-Blob accounts - public void verifyUserAgentForFNSBlob() throws Exception { + public void verifyUserAgentForFNS() throws Exception { assumeHnsDisabled(); - 
assumeBlobServiceType(); final AzureBlobFileSystem fs = getFileSystem(); final AbfsConfiguration configuration = fs.getAbfsStore() .getAbfsConfiguration(); @@ -386,24 +385,6 @@ public void verifyUserAgentForFNSBlob() throws Exception { .contains(FNS_BLOB_USER_AGENT_IDENTIFIER); } - @Test - // Test to verify that the user agent string for non-FNS-Blob accounts - // does not contain the FNS identifier. - public void verifyUserAgentForDFS() throws Exception { - assumeDfsServiceType(); - final AzureBlobFileSystem fs = getFileSystem(); - final AbfsConfiguration configuration = fs.getAbfsStore() - .getAbfsConfiguration(); - - String userAgentStr = getUserAgentString(configuration, false); - verifyBasicInfo(userAgentStr); - Assertions.assertThat(userAgentStr) - .describedAs( - "User-Agent string for non-FNS-Blob accounts should not contain" - + FNS_BLOB_USER_AGENT_IDENTIFIER) - .doesNotContain(FNS_BLOB_USER_AGENT_IDENTIFIER); - } - public static AbfsClient createTestClientFromCurrentContext( AbfsClient baseAbfsClientInstance, AbfsConfiguration abfsConfig) throws IOException, URISyntaxException { @@ -850,6 +831,39 @@ public void testAuthTypeProviderSetup(AuthType authType) throws Exception { fs.close(); } + /** + * Test to verify that when initializing a filesystem with a DFS endpoint for a FNS account, + * we force to Blob endpoint internally. 
+ * + * @throws Exception if the test fails + */ + @Test + public void testFNSDfsUsesBlobInstance() throws Exception { + assumeHnsDisabled(); + String scheme = "abfs"; + String dfsDomain = "dfs.core.windows.net"; + String blobDomain = "blob.core.windows.net"; + Configuration conf = new Configuration(getRawConfiguration()); + conf.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, true); + + String dfsUri = String.format("%s://%s@%s.%s/", + scheme, getFileSystemName(), + getAccountName().substring(0, getAccountName().indexOf(DOT)), + dfsDomain); + + // Initialize filesystem with DFS endpoint + AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance( + new URI(dfsUri), conf); + + // Filesystem initialization should have forced to use Blob instance for FNS-DFS + AbfsClient abfsClient = fs.getAbfsStore().getClient(); + Assertions.assertThat(abfsClient) + .as("abfsClient should be instance of AbfsBlobClient") + .isInstanceOf(AbfsBlobClient.class); + Assertions.assertThat(abfsClient.getBaseUrl().toString()) + .contains(blobDomain); + } + @Test public void testIsNonEmptyDirectory() throws IOException { testIsNonEmptyDirectoryInternal(EMPTY_STRING, true, EMPTY_STRING, diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java index 54521c9c971b5..152c6a43f4c0d 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java @@ -21,6 +21,8 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.OutputStream; +import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.net.ProtocolException; import java.net.URI; import java.net.URISyntaxException; @@ -30,6 +32,8 @@ import 
java.util.Random; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -54,6 +58,7 @@ import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.http.HttpResponse; +import org.apache.hadoop.fs.store.DataBlocks; import static java.net.HttpURLConnection.HTTP_CONFLICT; import static java.net.HttpURLConnection.HTTP_UNAVAILABLE; @@ -593,4 +598,109 @@ public void testChecksumComputedWhenConfigTrue() throws Exception { .isEqualTo(1); } } + + /** + * Tests the selection logic for the DFS-to-Blob fallback handler in AbfsOutputStream. + * Verifies: + * For FNS, fallback succeeds regardless of ingress service type. + * For HNS with BLOB ingress, fallback fails with InvalidConfigurationValueException. + * For HNS with DFS ingress, fallback succeeds. 
+ */ + @Test + public void testDFSToBlobFallbackHandlerSelection() throws Exception { + // Common mocks + DataBlocks.BlockFactory blockFactory = Mockito.mock(DataBlocks.BlockFactory.class); + AzureBlockManager blockManager = Mockito.mock(AzureBlockManager.class); + AbfsClientHandler clientHandler = Mockito.mock(AbfsClientHandler.class); + AbfsClient client = Mockito.mock(AbfsClient.class); + + Mockito.when(clientHandler.getClient(Mockito.any())).thenReturn(client); + + Method createNewHandler = + AbfsOutputStream.class.getDeclaredMethod( + "createNewHandler", + AbfsServiceType.class, + DataBlocks.BlockFactory.class, + int.class, + boolean.class, + AzureBlockManager.class); + createNewHandler.setAccessible(true); + + Field fallbackField = + AbfsOutputStream.class.getDeclaredField("isDFSToBlobFallbackEnabled"); + fallbackField.setAccessible(true); + + Field serviceTypeField = + AbfsOutputStream.class.getDeclaredField("serviceTypeAtInit"); + serviceTypeField.setAccessible(true); + + Field clientHandlerField = + AbfsOutputStream.class.getDeclaredField("clientHandler"); + clientHandlerField.setAccessible(true); + + // FNS case: fallback should succeed regardless of ingress service type + // Only setting isDFSToBlobFallbackEnabled config is enough + Mockito.when(client.getIsNamespaceEnabled()).thenReturn(false); + + AbfsOutputStream fnsStream = + Mockito.mock(AbfsOutputStream.class, Mockito.CALLS_REAL_METHODS); + + fallbackField.set(fnsStream, true); + clientHandlerField.set(fnsStream, clientHandler); + + Object fnsHandler = + createNewHandler.invoke( + fnsStream, + AbfsServiceType.BLOB, + blockFactory, + 1024, + false, + blockManager); + + Assertions.assertThat(fnsHandler) + .as("FNS: fallback should succeed regardless of ingress service type") + .isInstanceOf(AzureDfsToBlobIngressFallbackHandler.class); + + // HNS case: if ingress service type is BLOB, fallback should fail + Mockito.when(client.getIsNamespaceEnabled()).thenReturn(true); + + AbfsOutputStream 
hnsBlobStream = + Mockito.mock(AbfsOutputStream.class, Mockito.CALLS_REAL_METHODS); + + fallbackField.set(hnsBlobStream, true); + serviceTypeField.set(hnsBlobStream, AbfsServiceType.BLOB); + clientHandlerField.set(hnsBlobStream, clientHandler); + + Assertions.assertThatThrownBy(() -> + createNewHandler.invoke( + hnsBlobStream, + AbfsServiceType.BLOB, + blockFactory, + 1024, + false, + blockManager)) + .as("HNS with BLOB ingress should not allow fallback") + .hasCauseInstanceOf(InvalidConfigurationValueException.class); + + // HNS case: if ingress service type is DFS, fallback should succeed + AbfsOutputStream hnsDfsStream = + Mockito.mock(AbfsOutputStream.class, Mockito.CALLS_REAL_METHODS); + + fallbackField.set(hnsDfsStream, true); + serviceTypeField.set(hnsDfsStream, AbfsServiceType.DFS); + clientHandlerField.set(hnsDfsStream, clientHandler); + + Object hnsHandler = + createNewHandler.invoke( + hnsDfsStream, + AbfsServiceType.DFS, + blockFactory, + 1024, + false, + blockManager); + + Assertions.assertThat(hnsHandler) + .as("HNS with DFS ingress should allow fallback") + .isInstanceOf(AzureDfsToBlobIngressFallbackHandler.class); + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java index 997818b5f9f96..c9f3b34f90911 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRenameRetryRecovery.java @@ -449,60 +449,6 @@ public void testExistingPathCorrectlyRejected() throws Exception { false))); } - /** - * Test that rename recovery remains unsupported for - * FNS configurations. 
- */ - @Test - public void testRenameRecoveryUnsupportedForFlatNamespace() throws Exception { - // In DFS endpoint, renamePath is O(1) API call and idempotency issue can happen. - // For blob endpoint, client orchestrates the rename operation. - assumeDfsServiceType(); - assumeHnsDisabled(); - AzureBlobFileSystem fs = getFileSystem(); - AzureBlobFileSystemStore abfsStore = fs.getAbfsStore(); - TracingContext testTracingContext = getTestTracingContext(fs, false); - - AbfsClient mockClient = getMockAbfsClient(); - - String base = "/" + getMethodName(); - String path1 = base + "/dummyFile1"; - String path2 = base + "/dummyFile2"; - - touch(new Path(path1)); - - setAbfsClient(abfsStore, mockClient); - - // checking correct count in AbfsCounters - AbfsCounters counter = mockClient.getAbfsCounters(); - IOStatistics ioStats = counter.getIOStatistics(); - - Long connMadeBeforeRename = lookupCounterStatistic(ioStats, CONNECTIONS_MADE.getStatName()); - Long renamePathAttemptsBeforeRename = lookupCounterStatistic(ioStats, RENAME_PATH_ATTEMPTS.getStatName()); - - expectErrorCode(SOURCE_PATH_NOT_FOUND, intercept(AbfsRestOperationException.class, () -> - mockClient.renamePath(path1, path2, null, - testTracingContext, null, - false))); - - // validating stat counters after rename - - // only 2 calls should have happened in total for rename - // 1 -> original rename rest call, 2 -> first retry, - // no getPathStatus calls - // last getPathStatus call should be skipped - assertThatStatisticCounter(ioStats, - CONNECTIONS_MADE.getStatName()) - .isEqualTo(2 + connMadeBeforeRename); - - // the RENAME_PATH_ATTEMPTS stat should be incremented by 1 - // retries happen internally within AbfsRestOperation execute() - // the stat for RENAME_PATH_ATTEMPTS is updated only once before execute() is called - assertThatStatisticCounter(ioStats, - RENAME_PATH_ATTEMPTS.getStatName()) - .isEqualTo(1 + renamePathAttemptsBeforeRename); - } - /** * Test the resilient commit code works through fault 
injection, including * reporting recovery. diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java index 4bdb3a68ae3f2..c0e0191014f97 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderValidator.java @@ -42,6 +42,7 @@ public class TracingHeaderValidator implements Listener { private static final String GUID_PATTERN = "^[0-9a-fA-F]{8}-([0-9a-fA-F]{4}-){3}[0-9a-fA-F]{12}$"; private String ingressHandler = null; + private boolean fnsEndpointConverted = false; private String position = String.valueOf(0); private ReadType readType = ReadType.UNKNOWN_READ; @@ -61,6 +62,7 @@ public TracingHeaderValidator getClone() { retryNum, streamID); tracingHeaderValidator.primaryRequestId = primaryRequestId; tracingHeaderValidator.ingressHandler = ingressHandler; + tracingHeaderValidator.fnsEndpointConverted = fnsEndpointConverted; tracingHeaderValidator.position = position; tracingHeaderValidator.readType = readType; tracingHeaderValidator.operatedBlobCount = operatedBlobCount; @@ -196,6 +198,11 @@ public void updateIngressHandler(String ingressHandler) { this.ingressHandler = ingressHandler; } + @Override + public void updateFNSEndpointConverted() { + this.fnsEndpointConverted = true; + } + @Override public void updatePosition(String position) { this.position = position;