diff --git a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml index 1cedf0c54b040..f18a2a6a1bc18 100644 --- a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml +++ b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml @@ -48,6 +48,8 @@ files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]AbfsClient.java"/> + rawLength) { + rawBlockId = rawBlockId.substring(0, rawLength); + } + } + + return Base64.encodeBase64String(rawBlockId.getBytes(StandardCharsets.UTF_8)); } /** diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java index a0567da97b283..9e8a792763858 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java @@ -129,6 +129,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_BLOCK_NAME; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_COMMITTED_BLOCKS; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_HDI_ISFOLDER; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_HDI_PERMISSION; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_NAME; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_VERSION; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XMS_PROPERTIES_ENCODING_ASCII; @@ -898,7 +899,7 @@ public AbfsRestOperation append(final String path, requestHeaders.add(new AbfsHttpHeader(EXPECT, HUNDRED_CONTINUE)); } if (isChecksumValidationEnabled()) { - addCheckSumHeaderForWrite(requestHeaders, reqParams, buffer); + addCheckSumHeaderForWrite(requestHeaders, reqParams); } if (reqParams.isRetryDueToExpect()) { String userAgentRetry = getUserAgent(); @@ -982,6 +983,9 @@ public AbfsRestOperation appendBlock(final String path, if (requestParameters.getLeaseId() != null) { requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, requestParameters.getLeaseId())); } + if (isChecksumValidationEnabled()) { + addCheckSumHeaderForWrite(requestHeaders, requestParameters); + } final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, APPEND_BLOCK); String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION, abfsUriQueryBuilder); @@ -1021,6 +1025,7 @@ public AbfsRestOperation appendBlock(final String path, * @param leaseId if there is an active lease on the path. * @param contextEncryptionAdapter to provide encryption context. * @param tracingContext for tracing the server calls. + * @param blobMd5 the MD5 hash of the blob for integrity verification. * @return exception as this operation is not supported on Blob Endpoint. * @throws UnsupportedOperationException always. */ @@ -1032,7 +1037,7 @@ public AbfsRestOperation flush(final String path, final String cachedSasToken, final String leaseId, final ContextEncryptionAdapter contextEncryptionAdapter, - final TracingContext tracingContext) throws AzureBlobFileSystemException { + final TracingContext tracingContext, String blobMd5) throws AzureBlobFileSystemException { throw new UnsupportedOperationException( "Flush without blockIds not supported on Blob Endpoint"); } @@ -1049,6 +1054,7 @@ public AbfsRestOperation flush(final String path, * @param eTag The etag of the blob. * @param contextEncryptionAdapter to provide encryption context. * @param tracingContext for tracing the service call. + * @param blobMd5 the MD5 hash of the blob for integrity verification. * @return executed rest operation containing response from server. * @throws AzureBlobFileSystemException if rest operation fails. */ @@ -1060,7 +1066,7 @@ public AbfsRestOperation flush(byte[] buffer, final String leaseId, final String eTag, ContextEncryptionAdapter contextEncryptionAdapter, - final TracingContext tracingContext) throws AzureBlobFileSystemException { + final TracingContext tracingContext, String blobMd5) throws AzureBlobFileSystemException { final List requestHeaders = createDefaultHeaders(); addEncryptionKeyRequestHeaders(path, requestHeaders, false, contextEncryptionAdapter, tracingContext); @@ -1070,9 +1076,9 @@ public AbfsRestOperation flush(byte[] buffer, if (leaseId != null) { requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId)); } - String md5Hash = computeMD5Hash(buffer, 0, buffer.length); - requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_CONTENT_MD5, md5Hash)); - + if (blobMd5 != null) { + requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_CONTENT_MD5, blobMd5)); + } final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, BLOCKLIST); abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(isClose)); @@ -1097,7 +1103,7 @@ public AbfsRestOperation flush(byte[] buffer, AbfsRestOperation op1 = getPathStatus(path, true, tracingContext, contextEncryptionAdapter); String metadataMd5 = op1.getResult().getResponseHeader(CONTENT_MD5); - if (!md5Hash.equals(metadataMd5)) { + if (blobMd5 != null && !blobMd5.equals(metadataMd5)) { throw ex; } return op; @@ -1914,7 +1920,11 @@ private List getMetadataHeadersList(final Hashtable requestHeaders, - final AppendRequestParameters reqParams, final byte[] buffer) - throws AbfsRestOperationException { - String md5Hash = computeMD5Hash(buffer, reqParams.getoffset(), - reqParams.getLength()); - requestHeaders.add(new AbfsHttpHeader(CONTENT_MD5, md5Hash)); + final AppendRequestParameters reqParams) { + if (reqParams.getMd5() != null) { + requestHeaders.add(new AbfsHttpHeader(CONTENT_MD5, reqParams.getMd5())); + } } /** diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java index 577d99560ded2..197744e79fe6c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java @@ -116,6 +116,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.RANGE; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.USER_AGENT; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_HTTP_METHOD_OVERRIDE; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_BLOB_CONTENT_MD5; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_CLIENT_TRANSACTION_ID; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_EXISTING_RESOURCE_TYPE; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_LEASE_ACTION; @@ -762,7 +763,7 @@ public AbfsRestOperation append(final String path, // Add MD5 Hash of request content as request header if feature is enabled if (isChecksumValidationEnabled()) { - addCheckSumHeaderForWrite(requestHeaders, reqParams, buffer); + addCheckSumHeaderForWrite(requestHeaders, reqParams); } // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance @@ -844,6 +845,7 @@ && appendSuccessCheckOp(op, path, * @param leaseId if there is an active lease on the path. * @param contextEncryptionAdapter to provide encryption context. * @param tracingContext for tracing the server calls. + * @param blobMd5 the MD5 hash of the blob for integrity verification. * @return executed rest operation containing response from server. * @throws AzureBlobFileSystemException if rest operation fails. */ @@ -855,7 +857,7 @@ public AbfsRestOperation flush(final String path, final String cachedSasToken, final String leaseId, ContextEncryptionAdapter contextEncryptionAdapter, - TracingContext tracingContext) throws AzureBlobFileSystemException { + TracingContext tracingContext, String blobMd5) throws AzureBlobFileSystemException { final List requestHeaders = createDefaultHeaders(); addEncryptionKeyRequestHeaders(path, requestHeaders, false, contextEncryptionAdapter, tracingContext); @@ -865,6 +867,9 @@ public AbfsRestOperation flush(final String path, if (leaseId != null) { requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId)); } + if (isChecksumValidationEnabled() && blobMd5 != null) { + requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_CONTENT_MD5, blobMd5)); + } final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, FLUSH_ACTION); @@ -886,6 +891,21 @@ public AbfsRestOperation flush(final String path, return op; } + /** + * Flushes data to a file at the specified path, using the provided buffer and other parameters. + * This operation is not supported on the DFS endpoint and will throw an {@link UnsupportedOperationException}. + * + * @param buffer the byte array containing the data to be flushed to the file. + * @param path the path where the data has to be flushed. + * @param isClose whether this is the last flush operation to the file. + * @param cachedSasToken the SAS token to authenticate the operation. + * @param leaseId the lease ID, if an active lease exists on the path. + * @param eTag the ETag for concurrency control to ensure the flush is applied to the correct file version. + * @param contextEncryptionAdapter the adapter providing the encryption context. + * @param tracingContext the tracing context for tracking server calls. + * @param blobMd5 the MD5 hash of the blob for integrity verification. + * @throws UnsupportedOperationException if flush with blockIds is called on a DFS endpoint. + */ @Override public AbfsRestOperation flush(byte[] buffer, final String path, @@ -894,7 +914,7 @@ public AbfsRestOperation flush(byte[] buffer, final String leaseId, final String eTag, final ContextEncryptionAdapter contextEncryptionAdapter, - final TracingContext tracingContext) throws AzureBlobFileSystemException { + final TracingContext tracingContext, String blobMd5) throws AzureBlobFileSystemException { throw new UnsupportedOperationException( "Flush with blockIds not supported on DFS Endpoint"); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java index 29f83ae6f03ff..10e7d9910c884 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java @@ -22,6 +22,9 @@ import java.io.IOException; import java.io.OutputStream; import java.net.HttpURLConnection; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Base64; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.Future; import java.util.UUID; @@ -58,6 +61,7 @@ import org.apache.hadoop.fs.Syncable; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.MD5; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.STREAM_ID_LEN; import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_WRITE_WITHOUT_LEASE; import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable; @@ -150,6 +154,19 @@ public class AbfsOutputStream extends OutputStream implements Syncable, /** The handler for managing Abfs client operations. */ private final AbfsClientHandler clientHandler; + /** + * The `MessageDigest` instance used for computing the incremental MD5 hash + * of the data written so far. This is updated as data is written to the stream. + */ + private MessageDigest md5 = null; + + /** + * The `MessageDigest` instance used for computing the MD5 hash + * of the full blob content. This is updated with all data written to the stream + * and represents the complete MD5 checksum of the blob. + */ + private MessageDigest fullBlobContentMd5 = null; + public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext) throws IOException { this.statistics = abfsOutputStreamContext.getStatistics(); @@ -202,6 +219,14 @@ public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext) this.clientHandler = abfsOutputStreamContext.getClientHandler(); createIngressHandler(serviceTypeAtInit, abfsOutputStreamContext.getBlockFactory(), bufferSize, false, null); + try { + md5 = MessageDigest.getInstance(MD5); + fullBlobContentMd5 = MessageDigest.getInstance(MD5); + } catch (NoSuchAlgorithmException e) { + if (client.isChecksumValidationEnabled()) { + throw new IOException("MD5 algorithm not available", e); + } + } } /** @@ -438,6 +463,11 @@ public synchronized void write(final byte[] data, final int off, final int lengt AbfsBlock block = createBlockIfNeeded(position); int written = bufferData(block, data, off, length); + // Update the incremental MD5 hash with the written data. + getMessageDigest().update(data, off, written); + + // Update the full blob MD5 hash with the written data. + getFullBlobContentMd5().update(data, off, written); int remainingCapacity = block.remainingCapacity(); if (written < length) { @@ -514,6 +544,7 @@ private void uploadBlockAsync(AbfsBlock blockToUpload, outputStreamStatistics.bytesToUpload(bytesLength); outputStreamStatistics.writeCurrentBuffer(); DataBlocks.BlockUploadData blockUploadData = blockToUpload.startUpload(); + String md5Hash = getMd5(); final Future job = executorService.submit(() -> { AbfsPerfTracker tracker = @@ -535,7 +566,7 @@ private void uploadBlockAsync(AbfsBlock blockToUpload, * leaseId - The AbfsLeaseId for this request. */ AppendRequestParameters reqParams = new AppendRequestParameters( - offset, 0, bytesLength, mode, false, leaseId, isExpectHeaderEnabled); + offset, 0, bytesLength, mode, false, leaseId, isExpectHeaderEnabled, md5Hash); AbfsRestOperation op; try { op = remoteWrite(blockToUpload, blockUploadData, reqParams, tracingContext); @@ -1170,4 +1201,39 @@ public Boolean areWriteOperationsTasksDone() { } return true; } + + /** + * Returns the MessageDigest used for computing the incremental MD5 hash + * of the data written so far. + * + * @return the MessageDigest used for partial MD5 calculation. + */ + public MessageDigest getMessageDigest() { + return md5; + } + + /** + * Returns the MessageDigest used for computing the MD5 hash + * of the full blob content. + * + * @return the MessageDigest used for full blob MD5 calculation. + */ + public MessageDigest getFullBlobContentMd5() { + return fullBlobContentMd5; + } + + /** + * Returns the Base64-encoded MD5 checksum based on the current digest state. + * This finalizes the digest calculation. Returns null if the digest is empty. + * + * @return the Base64-encoded MD5 string, or null if no digest is available. + */ + public String getMd5() { + byte[] digest = getMessageDigest().digest(); + String md5 = null; + if (digest.length != 0) { + md5 = Base64.getEncoder().encodeToString(digest); + } + return md5; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java index c72bf721dc6f7..4e3e20f00246c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java @@ -26,6 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.fs.store.DataBlocks; @@ -46,6 +47,7 @@ public class AzureBlobBlockManager extends AzureBlockManager { /** The list to store blockId, position, and status. */ private final LinkedList blockEntryList = new LinkedList<>(); + private int blockIdLength = 0; /** * Constructs an AzureBlobBlockManager. @@ -70,6 +72,15 @@ public AzureBlobBlockManager(AbfsOutputStream abfsOutputStream, abfsOutputStream.getStreamID(), abfsOutputStream.getPath()); } + /** + * Retrieves the length of the block ID. + * + * @return the length of the block ID in bytes. + */ + public int getBlockIdLength() { + return blockIdLength; + } + /** * Creates a new block. * @@ -82,8 +93,9 @@ protected synchronized AbfsBlock createBlockInternal(long position) throws IOException { if (getActiveBlock() == null) { setBlockCount(getBlockCount() + 1); - AbfsBlock activeBlock = new AbfsBlobBlock(getAbfsOutputStream(), position); + AbfsBlock activeBlock = new AbfsBlobBlock(getAbfsOutputStream(), position, getBlockIdLength(), getBlockCount()); activeBlock.setBlockEntry(addNewEntry(activeBlock.getBlockId(), activeBlock.getOffset())); + getAbfsOutputStream().getMessageDigest().reset(); setActiveBlock(activeBlock); } return getActiveBlock(); @@ -104,6 +116,9 @@ private List getBlockList(TracingContext tracingContext) .getBlockList(getAbfsOutputStream().getPath(), tracingContext); if (op != null && op.getResult() != null) { committedBlockIdList = op.getResult().getBlockIdList(); + if (!committedBlockIdList.isEmpty()) { + blockIdLength = Base64.decodeBase64(committedBlockIdList.get(0)).length; + } } return committedBlockIdList; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java index 150d85d474a03..eaa4ca41c781e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java @@ -165,6 +165,7 @@ protected synchronized AbfsRestOperation remoteFlush(final long offset, TracingContext tracingContext) throws IOException { AbfsRestOperation op; + AzureBlobBlockManager blobBlockManager = (AzureBlobBlockManager) getBlockManager(); if (getAbfsOutputStream().isAppendBlob()) { return null; } @@ -179,10 +180,11 @@ protected synchronized AbfsRestOperation remoteFlush(final long offset, tracingContextFlush.setIngressHandler(BLOB_FLUSH); tracingContextFlush.setPosition(String.valueOf(offset)); LOG.trace("Flushing data at offset {} for path {}", offset, getAbfsOutputStream().getPath()); + String fullBlobMd5 = computeFullBlobMd5(); op = getClient().flush(blockListXml.getBytes(StandardCharsets.UTF_8), getAbfsOutputStream().getPath(), isClose, getAbfsOutputStream().getCachedSasTokenString(), leaseId, - getETag(), getAbfsOutputStream().getContextEncryptionAdapter(), tracingContextFlush); + getETag(), getAbfsOutputStream().getContextEncryptionAdapter(), tracingContextFlush, fullBlobMd5); setETag(op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG)); } catch (AbfsRestOperationException ex) { LOG.error("Error in remote flush requiring handler switch for path {}", getAbfsOutputStream().getPath(), ex); @@ -191,6 +193,8 @@ isClose, getAbfsOutputStream().getCachedSasTokenString(), leaseId, } LOG.error("Error in remote flush for path {} and offset {}", getAbfsOutputStream().getPath(), offset, ex); throw ex; + } finally { + getAbfsOutputStream().getFullBlobContentMd5().reset(); } return op; } @@ -289,7 +293,9 @@ protected void writeAppendBlobCurrentBufferToService() throws IOException { LOG.trace("Writing current buffer to service at offset {} and path {}", offset, getAbfsOutputStream().getPath()); AppendRequestParameters reqParams = new AppendRequestParameters( offset, 0, bytesLength, AppendRequestParameters.Mode.APPEND_MODE, - true, getAbfsOutputStream().getLeaseId(), getAbfsOutputStream().isExpectHeaderEnabled()); + true, getAbfsOutputStream().getLeaseId(), + getAbfsOutputStream().isExpectHeaderEnabled(), + getAbfsOutputStream().getMd5()); AbfsRestOperation op; try { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlockManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlockManager.java index 426e0c8194f8a..3ebc09da0b4a1 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlockManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlockManager.java @@ -89,7 +89,7 @@ protected abstract AbfsBlock createBlockInternal(long position) * * @return the active block */ - protected synchronized AbfsBlock getActiveBlock() { + public synchronized AbfsBlock getActiveBlock() { return activeBlock; } @@ -125,7 +125,7 @@ protected DataBlocks.BlockFactory getBlockFactory() { * * @return the block count */ - protected long getBlockCount() { + public long getBlockCount() { return blockCount; } @@ -134,7 +134,7 @@ protected long getBlockCount() { * * @param blockCount the count of blocks to set */ - public void setBlockCount(final long blockCount) { + protected void setBlockCount(final long blockCount) { this.blockCount = blockCount; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSBlockManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSBlockManager.java index f7a4542c3f2ae..c58f1d1b28461 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSBlockManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSBlockManager.java @@ -62,6 +62,7 @@ protected synchronized AbfsBlock createBlockInternal(long position) if (getActiveBlock() == null) { setBlockCount(getBlockCount() + 1); AbfsBlock activeBlock = new AbfsBlock(getAbfsOutputStream(), position); + getAbfsOutputStream().getMessageDigest().reset(); setActiveBlock(activeBlock); } return getActiveBlock(); @@ -73,7 +74,7 @@ protected synchronized AbfsBlock createBlockInternal(long position) * @return the active block */ @Override - protected synchronized AbfsBlock getActiveBlock() { + public synchronized AbfsBlock getActiveBlock() { return super.getActiveBlock(); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSIngressHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSIngressHandler.java index 9b6562f2da1e7..faf293b9242ce 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSIngressHandler.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSIngressHandler.java @@ -23,6 +23,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.fs.store.DataBlocks; @@ -178,13 +179,24 @@ protected synchronized AbfsRestOperation remoteFlush(final long offset, tracingContextFlush.setIngressHandler(DFS_FLUSH); tracingContextFlush.setPosition(String.valueOf(offset)); } + String fullBlobMd5 = computeFullBlobMd5(); LOG.trace("Flushing data at offset {} and path {}", offset, getAbfsOutputStream().getPath()); - return getClient() - .flush(getAbfsOutputStream().getPath(), offset, retainUncommitedData, - isClose, - getAbfsOutputStream().getCachedSasTokenString(), leaseId, - getAbfsOutputStream().getContextEncryptionAdapter(), - tracingContextFlush); + AbfsRestOperation op; + try { + op = getClient() + .flush(getAbfsOutputStream().getPath(), offset, retainUncommitedData, + isClose, + getAbfsOutputStream().getCachedSasTokenString(), leaseId, + getAbfsOutputStream().getContextEncryptionAdapter(), + tracingContextFlush, fullBlobMd5); + } catch (AbfsRestOperationException ex) { + LOG.error("Error in remote flush for path {} and offset {}", + getAbfsOutputStream().getPath(), offset, ex); + throw ex; + } finally { + getAbfsOutputStream().getFullBlobContentMd5().reset(); + } + return op; } /** @@ -225,7 +237,9 @@ protected void writeAppendBlobCurrentBufferToService() throws IOException { LOG.trace("Writing current buffer to service at offset {} and path {}", offset, getAbfsOutputStream().getPath()); AppendRequestParameters reqParams = new AppendRequestParameters( offset, 0, bytesLength, AppendRequestParameters.Mode.APPEND_MODE, - true, getAbfsOutputStream().getLeaseId(), getAbfsOutputStream().isExpectHeaderEnabled()); + true, getAbfsOutputStream().getLeaseId(), + getAbfsOutputStream().isExpectHeaderEnabled(), + getAbfsOutputStream().getMd5()); // Perform the remote write operation. AbfsRestOperation op = remoteWrite(activeBlock, uploadData, reqParams, diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java index ba842cbb79b62..cfa01315f2ae8 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java @@ -230,7 +230,9 @@ protected void writeAppendBlobCurrentBufferToService() throws IOException { LOG.trace("Writing current buffer to service at offset {} and path {}", offset, getAbfsOutputStream().getPath()); AppendRequestParameters reqParams = new AppendRequestParameters( offset, 0, bytesLength, AppendRequestParameters.Mode.APPEND_MODE, - true, getAbfsOutputStream().getLeaseId(), getAbfsOutputStream().isExpectHeaderEnabled()); + true, getAbfsOutputStream().getLeaseId(), + getAbfsOutputStream().isExpectHeaderEnabled(), + getAbfsOutputStream().getMd5()); // Perform the remote write operation. AbfsRestOperation op; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureIngressHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureIngressHandler.java index 3072bdf5d04d6..81007e1c3dd1d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureIngressHandler.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureIngressHandler.java @@ -19,6 +19,8 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.IOException; +import java.security.MessageDigest; +import java.util.Base64; import java.util.Objects; import org.slf4j.Logger; @@ -206,4 +208,32 @@ protected InvalidIngressServiceException getIngressHandlerSwitchException( * @return the block manager */ public abstract AbfsClient getClient(); + + /** + * Computes the Base64-encoded MD5 hash of the full blob content. + * + *

This method clones the current state of the {@link MessageDigest} instance + * associated with the blob content to avoid resetting its original state. It then + * calculates the MD5 digest and encodes it into a Base64 string.

+ * + * @return A Base64-encoded string representing the MD5 hash of the full blob content, + * or {@code null} if the digest could not be computed. + */ + protected String computeFullBlobMd5() { + byte[] digest = null; + String fullBlobMd5 = null; + try { + // Clone the MessageDigest to avoid resetting the original state + MessageDigest clonedMd5 + = (MessageDigest) getAbfsOutputStream().getFullBlobContentMd5() + .clone(); + digest = clonedMd5.digest(); + } catch (CloneNotSupportedException e) { + LOG.warn("Failed to clone MessageDigest instance", e); + } + if (digest != null && digest.length != 0) { + fullBlobMd5 = Base64.getEncoder().encodeToString(digest); + } + return fullBlobMd5; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java index 860d9eb527763..24f97238bde36 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java @@ -209,19 +209,21 @@ void createRenamePendingJson(Path path, byte[] bytes) String eTag = extractEtagHeader(putBlobOp.getResult()); String blockId = generateBlockId(); + String blockList = generateBlockListXml(blockId); + byte[] buffer = blockList.getBytes(StandardCharsets.UTF_8); + String computedMd5 = abfsClient.computeMD5Hash(buffer, 0, buffer.length); + AppendRequestParameters appendRequestParameters = new AppendRequestParameters(0, 0, bytes.length, AppendRequestParameters.Mode.APPEND_MODE, false, null, abfsClient.getAbfsConfiguration().isExpectHeaderEnabled(), - new BlobAppendRequestParameters(blockId, eTag)); + new BlobAppendRequestParameters(blockId, eTag), abfsClient.computeMD5Hash(bytes, 0, bytes.length)); abfsClient.append(path.toUri().getPath(), bytes, appendRequestParameters, null, null, tracingContext); - String blockList = generateBlockListXml(blockId); - // PutBlockList on the path. - abfsClient.flush(blockList.getBytes(StandardCharsets.UTF_8), - path.toUri().getPath(), true, null, null, eTag, null, tracingContext); + abfsClient.flush(buffer, + path.toUri().getPath(), true, null, null, eTag, null, tracingContext, computedMd5); } /** diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java index e31df5eec65bb..207f0a8c7e39e 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java @@ -64,8 +64,10 @@ import org.apache.hadoop.io.IOUtils; import static org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount.WASB_ACCOUNT_NAME_DOMAIN_SUFFIX; +import static org.apache.hadoop.fs.azure.NativeAzureFileSystem.APPEND_SUPPORT_ENABLE_PROPERTY_NAME; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_BLOB_DOMAIN_NAME; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_DFS_DOMAIN_NAME; @@ -210,6 +212,7 @@ public void setup() throws Exception { if (rawConfig.get(keyProperty) == null) { rawConfig.set(keyProperty, getAccountKey()); } + rawConfig.set(APPEND_SUPPORT_ENABLE_PROPERTY_NAME, TRUE); azureNativeFileSystemStore.initialize( wasbUri, diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsCustomEncryption.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsCustomEncryption.java index 781cd701400f8..45de7b3d2348e 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsCustomEncryption.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsCustomEncryption.java @@ -320,22 +320,22 @@ private AbfsRestOperation callOperation(AzureBlobFileSystem fs, case WRITE: if (ingressClient instanceof AbfsDfsClient) { return ingressClient.flush(path, 3, false, false, null, - null, encryptionAdapter, getTestTracingContext(fs, false)); + null, encryptionAdapter, getTestTracingContext(fs, false), null); } else { byte[] buffer = generateBlockListXml(EMPTY_STRING).getBytes(StandardCharsets.UTF_8); return ingressClient.flush(buffer, path, false, null, - null, null, encryptionAdapter, getTestTracingContext(fs, false)); + null, null, encryptionAdapter, getTestTracingContext(fs, false), null); } case APPEND: if (ingressClient instanceof AbfsDfsClient) { return ingressClient.append(path, "val".getBytes(), new AppendRequestParameters(3, 0, 3, APPEND_MODE, false, null, - true), + true, null, null), null, encryptionAdapter, getTestTracingContext(fs, false)); } else { return ingressClient.append(path, "val".getBytes(), new AppendRequestParameters(3, 0, 3, APPEND_MODE, false, null, - true, new BlobAppendRequestParameters(BLOCK_ID, null)), + true, new BlobAppendRequestParameters(BLOCK_ID, null), null), null, encryptionAdapter, getTestTracingContext(fs, false)); } case SET_ACL: diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java index 920c4964a559f..414f830aa2c3d 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.io.OutputStream; import java.lang.reflect.Field; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; @@ -39,7 +38,6 @@ import org.junit.Test; import org.mockito.Mockito; -import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -77,7 +75,6 @@ import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_INFINITE_LEASE_KEY; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_INGRESS_SERVICE_TYPE; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_LEASE_THREADS; -import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_ID_LENGTH; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB; import static org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient.generateBlockListXml; import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_ARRAY; @@ -1039,20 +1036,6 @@ public void testWriteAsyncOpFailedAfterCloseCalled() throws Exception { } } - /** - * Helper method that generates blockId. - * @param position The offset needed to generate blockId. - * @return String representing the block ID generated. - */ - private String generateBlockId(AbfsOutputStream os, long position) { - String streamId = os.getStreamID(); - String streamIdHash = Integer.toString(streamId.hashCode()); - String blockId = String.format("%d_%s", position, streamIdHash); - byte[] blockIdByteArray = new byte[BLOCK_ID_LENGTH]; - System.arraycopy(blockId.getBytes(), 0, blockIdByteArray, 0, Math.min(BLOCK_ID_LENGTH, blockId.length())); - return new String(Base64.encodeBase64(blockIdByteArray), StandardCharsets.UTF_8); - } - /** * Test to simulate a successful flush operation followed by a connection reset * on the response, triggering a retry. @@ -1089,17 +1072,17 @@ public void testFlushSuccessWithConnectionResetOnResponseValidMd5() throws Excep new Path("/test/file"), blobClient); AbfsOutputStream out = (AbfsOutputStream) os.getWrappedStream(); String eTag = out.getIngressHandler().getETag(); - byte[] bytes = new byte[1024 * 1024 * 8]; + byte[] bytes = new byte[1024 * 1024 * 4]; new Random().nextBytes(bytes); // Write some bytes and attempt to flush, which should retry out.write(bytes); - String blockId = generateBlockId(out, 0); + String blockId = out.getBlockManager().getActiveBlock().getBlockId(); String blockListXml = generateBlockListXml(blockId); Mockito.doAnswer(answer -> { // Set up the mock for the flush operation AbfsClientTestUtil.setMockAbfsRestOperationForFlushOperation(blobClient, - eTag, blockListXml, + eTag, blockListXml, out, (httpOperation) -> { Mockito.doAnswer(invocation -> { // Call the real processResponse method @@ -1132,7 +1115,7 @@ public void testFlushSuccessWithConnectionResetOnResponseValidMd5() throws Excep Mockito.nullable(String.class), Mockito.anyString(), Mockito.nullable(ContextEncryptionAdapter.class), - Mockito.any(TracingContext.class) + Mockito.any(TracingContext.class), Mockito.nullable(String.class) ); out.hsync(); @@ -1145,7 +1128,7 @@ public void testFlushSuccessWithConnectionResetOnResponseValidMd5() throws Excep Mockito.nullable(String.class), Mockito.anyString(), Mockito.nullable(ContextEncryptionAdapter.class), - Mockito.any(TracingContext.class)); + Mockito.any(TracingContext.class), Mockito.nullable(String.class)); } } @@ -1186,17 +1169,17 @@ public void testFlushSuccessWithConnectionResetOnResponseInvalidMd5() throws Exc new Path("/test/file"), blobClient); AbfsOutputStream out = (AbfsOutputStream) os.getWrappedStream(); String eTag = out.getIngressHandler().getETag(); - byte[] bytes = new byte[1024 * 1024 * 8]; + byte[] bytes = new byte[1024 * 1024 * 4]; new Random().nextBytes(bytes); // Write some bytes and attempt to flush, which should retry out.write(bytes); - String blockId = generateBlockId(out, 0); + String blockId = out.getBlockManager().getActiveBlock().getBlockId(); String blockListXml = generateBlockListXml(blockId); Mockito.doAnswer(answer -> { // Set up the mock for the flush operation AbfsClientTestUtil.setMockAbfsRestOperationForFlushOperation(blobClient, - eTag, blockListXml, + eTag, blockListXml, out, (httpOperation) -> { Mockito.doAnswer(invocation -> { // Call the real processResponse method @@ -1234,7 +1217,7 @@ public void testFlushSuccessWithConnectionResetOnResponseInvalidMd5() throws Exc Mockito.nullable(String.class), Mockito.anyString(), Mockito.nullable(ContextEncryptionAdapter.class), - Mockito.any(TracingContext.class) + Mockito.any(TracingContext.class), Mockito.nullable(String.class) ); FSDataOutputStream os1 = createMockedOutputStream(fs, diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAttributes.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAttributes.java index 3e0959d5dd6a0..b3f99651de4f4 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAttributes.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAttributes.java @@ -67,7 +67,7 @@ public void testSetGetXAttrCreateReplace() throws Exception { // after creating a file, it must be possible to create a new xAttr touch(testFile); fs.setXAttr(testFile, attributeName, attributeValue, CREATE_FLAG); - assertAttributeEqual(fs.getXAttr(testFile, attributeName), attributeValue, decodedAttributeValue); + assertAttributeEqual(fs, fs.getXAttr(testFile, attributeName), attributeValue, decodedAttributeValue); // however after the xAttr is created, creating it again must fail intercept(IOException.class, () -> fs.setXAttr(testFile, attributeName, attributeValue, CREATE_FLAG)); @@ -92,7 +92,7 @@ public void testSetGetXAttrReplace() throws Exception { // however after the xAttr is created, replacing it must succeed fs.setXAttr(testFile, attributeName, attributeValue1, CREATE_FLAG); fs.setXAttr(testFile, attributeName, attributeValue2, REPLACE_FLAG); - assertAttributeEqual(fs.getXAttr(testFile, attributeName), attributeValue2, + assertAttributeEqual(fs, fs.getXAttr(testFile, attributeName), attributeValue2, decodedAttribute2); } @@ -176,7 +176,7 @@ public void testSetXAttrMultipleOperations() throws Exception { // Check if the attribute is retrievable byte[] rv = fs.getXAttr(path, attributeName); - assertAttributeEqual(rv, attributeValue, decodedAttributeValue); + assertAttributeEqual(fs, rv, attributeValue, decodedAttributeValue); } /** @@ -220,7 +220,7 @@ private void testGetSetXAttrHelper(final AzureBlobFileSystem fs, // Check if the attribute is retrievable fs.setListenerOperation(FSOperationType.GET_ATTR); byte[] rv = fs.getXAttr(testPath, attributeName1); - assertAttributeEqual(rv, attributeValue1, decodedAttributeValue1); + assertAttributeEqual(fs, rv, attributeValue1, decodedAttributeValue1); fs.registerListener(null); // Set the second Attribute @@ -228,10 +228,10 @@ private void testGetSetXAttrHelper(final AzureBlobFileSystem fs, // Check all the attributes present and previous Attribute not overridden rv = fs.getXAttr(testPath, attributeName1); - assertAttributeEqual(rv, attributeValue1, decodedAttributeValue1); + assertAttributeEqual(fs, rv, attributeValue1, decodedAttributeValue1); rv = fs.getXAttr(testPath, attributeName2); - assertAttributeEqual(rv, attributeValue2, decodedAttributeValue2); + assertAttributeEqual(fs, rv, attributeValue2, decodedAttributeValue2); } private void assertAttributeNull(byte[] rv) { @@ -240,12 +240,12 @@ private void assertAttributeNull(byte[] rv) { .isNull(); } - private void assertAttributeEqual(byte[] rv, byte[] attributeValue, + public static void assertAttributeEqual(AzureBlobFileSystem fs, byte[] rv, byte[] attributeValue, String decodedAttributeValue) throws Exception { Assertions.assertThat(rv) .describedAs("Retrieved Attribute Does not Matches in Encoded Form") .containsExactly(attributeValue); - Assertions.assertThat(getFileSystem().getAbfsStore().decodeAttribute(rv)) + Assertions.assertThat(fs.getAbfsStore().decodeAttribute(rv)) .describedAs("Retrieved Attribute Does not Matches in Decoded Form") .isEqualTo(decodedAttributeValue); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemChecksum.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemChecksum.java index bf8c14ae5e14d..41aaeaf37ef30 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemChecksum.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemChecksum.java @@ -84,10 +84,10 @@ public void testAppendWithChecksumAtDifferentOffsets() throws Exception { byte[] data = generateRandomBytes(MB_4); int pos = 0; - pos += appendWithOffsetHelper(os, client, path, data, fs, pos, 0); - pos += appendWithOffsetHelper(os, client, path, data, fs, pos, ONE_MB); - pos += appendWithOffsetHelper(os, client, path, data, fs, pos, MB_2); - appendWithOffsetHelper(os, client, path, data, fs, pos, MB_4 - 1); + pos += appendWithOffsetHelper(os, client, path, data, fs, pos, 0, client.computeMD5Hash(data, 0, data.length)); + pos += appendWithOffsetHelper(os, client, path, data, fs, pos, ONE_MB, client.computeMD5Hash(data, ONE_MB, data.length - ONE_MB)); + pos += appendWithOffsetHelper(os, client, path, data, fs, pos, MB_2, client.computeMD5Hash(data, MB_2, data.length-MB_2)); + appendWithOffsetHelper(os, client, path, data, fs, pos, MB_4 - 1, client.computeMD5Hash(data, MB_4 - 1, data.length - (MB_4 - 1))); fs.close(); } @@ -118,14 +118,15 @@ public void testAbfsInvalidChecksumExceptionInAppend() throws Exception { AzureBlobFileSystem fs = getConfiguredFileSystem(MB_4, MB_4, true); AbfsClient spiedClient = Mockito.spy(fs.getAbfsStore().getClientHandler().getIngressClient()); Path path = path("testPath" + getMethodName()); - AbfsOutputStream os = (AbfsOutputStream) fs.create(path).getWrappedStream(); + AbfsOutputStream os = Mockito.spy((AbfsOutputStream) fs.create(path).getWrappedStream()); byte[] data= generateRandomBytes(MB_4); String invalidMD5Hash = spiedClient.computeMD5Hash( INVALID_MD5_TEXT.getBytes(), 0, INVALID_MD5_TEXT.length()); Mockito.doReturn(invalidMD5Hash).when(spiedClient).computeMD5Hash(any(), any(Integer.class), any(Integer.class)); + Mockito.doReturn(invalidMD5Hash).when(os).getMd5(); AbfsRestOperationException ex = intercept(AbfsInvalidChecksumException.class, () -> { - appendWithOffsetHelper(os, spiedClient, path, data, fs, 0, 0); + appendWithOffsetHelper(os, spiedClient, path, data, fs, 0, 0, invalidMD5Hash); }); Assertions.assertThat(ex.getErrorCode()) @@ -197,12 +198,12 @@ private String generateBlockId(AbfsOutputStream os, long position) { * @throws Exception */ private int appendWithOffsetHelper(AbfsOutputStream os, AbfsClient client, Path path, - byte[] data, AzureBlobFileSystem fs, final int pos, final int offset) throws Exception { + byte[] data, AzureBlobFileSystem fs, final int pos, final int offset, String md5) throws Exception { String blockId = generateBlockId(os, pos); String eTag = os.getIngressHandler().getETag(); AppendRequestParameters reqParams = new AppendRequestParameters( pos, offset, data.length - offset, APPEND_MODE, isAppendBlobEnabled(), null, true, - new BlobAppendRequestParameters(blockId, eTag)); + new BlobAppendRequestParameters(blockId, eTag), md5); client.append(path.toUri().getPath(), data, reqParams, null, null, getTestTracingContext(fs, false)); return reqParams.getLength(); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java index 408f94d78d88c..14f2a14b2b64d 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRename.java @@ -2128,7 +2128,7 @@ private void testRenamePreRenameFailureResolution(final AzureBlobFileSystem fs) Mockito.anyBoolean(), Mockito.nullable(String.class), Mockito.nullable(String.class), Mockito.anyString(), Mockito.nullable(ContextEncryptionAdapter.class), - Mockito.any(TracingContext.class)); + Mockito.any(TracingContext.class), Mockito.anyString()); return createAnswer.callRealMethod(); }; RenameAtomicityTestUtils.addCreatePathMock(client, diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java index 38d4d12b10f34..6f7d37d992951 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java @@ -18,21 +18,31 @@ package org.apache.hadoop.fs.azurebfs; import java.io.BufferedReader; +import java.io.IOException; import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.EnumSet; +import java.util.UUID; +import org.assertj.core.api.Assertions; import org.junit.Assume; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.XAttrSetFlag; import org.apache.hadoop.fs.azure.NativeAzureFileSystem; -import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.contract.ContractTestUtils; +import static java.net.HttpURLConnection.HTTP_CONFLICT; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDeleted; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsDirectory; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs; @@ -45,7 +55,15 @@ public class ITestWasbAbfsCompatibility extends AbstractAbfsIntegrationTest { private static final String WASB_TEST_CONTEXT = "wasb test file"; private static final String ABFS_TEST_CONTEXT = "abfs test file"; private static final String TEST_CONTEXT = "THIS IS FOR TEST"; - + private static final String TEST_CONTEXT1 = "THIS IS FOR TEST1"; + private static final byte[] ATTRIBUTE_VALUE_1 = "one".getBytes( + StandardCharsets.UTF_8); + private static final byte[] ATTRIBUTE_VALUE_2 = "two".getBytes( + StandardCharsets.UTF_8); + private static final String ATTRIBUTE_NAME_1 = "user_someAttribute"; + private static final String ATTRIBUTE_NAME_2 = "user_someAttribute1"; + private static final EnumSet CREATE_FLAG = EnumSet.of( + XAttrSetFlag.CREATE); private static final Logger LOG = LoggerFactory.getLogger(ITestWasbAbfsCompatibility.class); @@ -66,7 +84,7 @@ public void testListFileStatus() throws Exception { Path testFiles = path("/testfiles"); Path path1 = new Path(testFiles + "/~12/!008/3/abFsTestfile"); - try(FSDataOutputStream abfsStream = fs.create(path1, true)) { + try (FSDataOutputStream abfsStream = fs.create(path1, true)) { abfsStream.write(ABFS_TEST_CONTEXT.getBytes()); abfsStream.flush(); abfsStream.hsync(); @@ -75,14 +93,16 @@ public void testListFileStatus() throws Exception { // create file using wasb Path path2 = new Path(testFiles + "/~12/!008/3/nativeFsTestfile"); LOG.info("{}", wasb.getUri()); - try(FSDataOutputStream nativeFsStream = wasb.create(path2, true)) { + try (FSDataOutputStream nativeFsStream = wasb.create(path2, true)) { nativeFsStream.write(WASB_TEST_CONTEXT.getBytes()); nativeFsStream.flush(); nativeFsStream.hsync(); } // list file using abfs and wasb - FileStatus[] abfsFileStatus = fs.listStatus(new Path(testFiles + "/~12/!008/3/")); - FileStatus[] nativeFsFileStatus = wasb.listStatus(new Path(testFiles + "/~12/!008/3/")); + FileStatus[] abfsFileStatus = fs.listStatus( + new Path(testFiles + "/~12/!008/3/")); + FileStatus[] nativeFsFileStatus = wasb.listStatus( + new Path(testFiles + "/~12/!008/3/")); assertEquals(2, abfsFileStatus.length); assertEquals(2, nativeFsFileStatus.length); @@ -102,18 +122,13 @@ public void testReadFile() throws Exception { NativeAzureFileSystem wasb = getWasbFileSystem(); Path testFile = path("/testReadFile"); - for (int i = 0; i< 4; i++) { + for (int i = 0; i < 4; i++) { Path path = new Path(testFile + "/~12/!008/testfile" + i); final FileSystem createFs = createFileWithAbfs[i] ? abfs : wasb; // Read final FileSystem readFs = readFileWithAbfs[i] ? abfs : wasb; - if (createFs == abfs && readFs == wasb) { - //Since flush changes the md5Hash value, md5 returned by GetBlobProperties will not match the one returned by GetBlob. - Assume.assumeFalse(getIngressServiceType() == AbfsServiceType.BLOB); - } - // Write - try(FSDataOutputStream nativeFsStream = createFs.create(path, true)) { + try (FSDataOutputStream nativeFsStream = createFs.create(path, true)) { nativeFsStream.write(TEST_CONTEXT.getBytes()); nativeFsStream.flush(); nativeFsStream.hsync(); @@ -122,7 +137,8 @@ public void testReadFile() throws Exception { // Check file status ContractTestUtils.assertIsFile(createFs, path); - try(BufferedReader br =new BufferedReader(new InputStreamReader(readFs.open(path)))) { + try (BufferedReader br = new BufferedReader( + new InputStreamReader(readFs.open(path)))) { String line = br.readLine(); assertEquals("Wrong text from " + readFs, TEST_CONTEXT, line); @@ -133,6 +149,112 @@ public void testReadFile() throws Exception { } } + /** + * Flow: Create and write a file using WASB, then read and append to it using ABFS. Finally, delete the file via ABFS after verifying content consistency. + * Expected: WASB successfully creates the file and writes content. ABFS reads, appends, and deletes the file without data loss or errors. + */ + @Test + public void testwriteFile() throws Exception { + AzureBlobFileSystem abfs = getFileSystem(); + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + assumeBlobServiceType(); + Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path path = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + // Write + try (FSDataOutputStream nativeFsStream = wasb.create(path, true)) { + nativeFsStream.write(TEST_CONTEXT.getBytes()); + nativeFsStream.flush(); + nativeFsStream.hsync(); + } + + // Check file status + ContractTestUtils.assertIsFile(wasb, path); + + try (BufferedReader br = new BufferedReader( + new InputStreamReader(abfs.open(path)))) { + String line = br.readLine(); + assertEquals("Wrong text from " + abfs, + TEST_CONTEXT, line); + } + try (FSDataOutputStream abfsOutputStream = abfs.append(path)) { + abfsOutputStream.write(TEST_CONTEXT.getBytes()); + abfsOutputStream.flush(); + abfsOutputStream.hsync(); + } + // Remove file + assertDeleted(abfs, path, true); + } + + /** + * Flow: Create and write a file using ABFS, append to the file using WASB, then write again using ABFS. + * Expected: File is created and written correctly by ABFS, appended by WASB, and final ABFS write reflects all updates without errors. + */ + + @Test + public void testwriteFile1() throws Exception { + AzureBlobFileSystem abfs = getFileSystem(); + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + assumeBlobServiceType(); + Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path path = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + // Write + try (FSDataOutputStream nativeFsStream = abfs.create(path, true)) { + nativeFsStream.write(TEST_CONTEXT.getBytes()); + nativeFsStream.flush(); + nativeFsStream.hsync(); + } + + // Check file status + ContractTestUtils.assertIsFile(abfs, path); + + try (FSDataOutputStream nativeFsStream = wasb.append(path)) { + nativeFsStream.write(TEST_CONTEXT.getBytes()); + nativeFsStream.flush(); + nativeFsStream.hsync(); + } + + try (FSDataOutputStream nativeFsStream = abfs.append(path)) { + nativeFsStream.write(TEST_CONTEXT1.getBytes()); + nativeFsStream.flush(); + nativeFsStream.hsync(); + } + // Remove file + assertDeleted(abfs, path, true); + } + + /** + * Flow: Create the file using AzCopy, then append to the file using ABFS. + * Expected: ABFS append succeeds and final file reflects both AzCopy and appended data correctly. + */ + @Test + public void testazcopywasbcompatibility() throws Exception { + AzureBlobFileSystem abfs = getFileSystem(); + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); + + Path testFile = path("/testReadFile"); + Path path = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + createAzCopyFile(path); + + try (FSDataOutputStream nativeFsStream = abfs.append(path)) { + nativeFsStream.write(TEST_CONTEXT1.getBytes()); + nativeFsStream.flush(); + nativeFsStream.hsync(); + } + // Remove file + assertDeleted(abfs, path, true); + } + + @Test public void testDir() throws Exception { boolean[] createDirWithAbfs = new boolean[]{false, true, false, true}; @@ -152,7 +274,8 @@ public void testDir() throws Exception { final FileSystem createFs = createDirWithAbfs[i] ? abfs : wasb; assertTrue(createFs.mkdirs(path)); //check - assertPathExists(createFs, "Created dir not found with " + createFs, path); + assertPathExists(createFs, "Created dir not found with " + createFs, + path); //read final FileSystem readFs = readDirWithAbfs[i] ? abfs : wasb; assertPathExists(readFs, "Created dir not found with " + readFs, @@ -164,9 +287,11 @@ public void testDir() throws Exception { @Test - public void testUrlConversion(){ - String abfsUrl = "abfs://abcde-1111-1111-1111-1111@xxxx.dfs.xxx.xxx.xxxx.xxxx"; - String wabsUrl = "wasb://abcde-1111-1111-1111-1111@xxxx.blob.xxx.xxx.xxxx.xxxx"; + public void testUrlConversion() { + String abfsUrl + = "abfs://abcde-1111-1111-1111-1111@xxxx.dfs.xxx.xxx.xxxx.xxxx"; + String wabsUrl + = "wasb://abcde-1111-1111-1111-1111@xxxx.blob.xxx.xxx.xxxx.xxxx"; assertEquals(abfsUrl, wasbUrlToAbfsUrl(wabsUrl)); assertEquals(wabsUrl, abfsUrlToWasbUrl(abfsUrl, false)); } @@ -201,4 +326,1806 @@ public void testSetWorkingDirectory() throws Exception { assertEquals(path3, wasb.getWorkingDirectory()); assertEquals(path3, abfs.getWorkingDirectory()); } + + // Scenario wise testing + + /** + * Scenario 1: Create and write a file using WASB, then read the file using ABFS. + * Expected Outcome: ABFS should correctly read the content written by WASB. + */ + @Test + public void testScenario1() throws Exception { + AzureBlobFileSystem abfs = getFileSystem(); + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path path = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + try (FSDataOutputStream nativeFsStream = wasb.create(path, true)) { + nativeFsStream.write(TEST_CONTEXT.getBytes()); + nativeFsStream.flush(); + nativeFsStream.hsync(); + } + // Check file status + ContractTestUtils.assertIsFile(wasb, path); + + try (BufferedReader br = new BufferedReader( + new InputStreamReader(abfs.open(path)))) { + String line = br.readLine(); + assertEquals("Wrong text from " + abfs, + TEST_CONTEXT, line); + } + + // Remove file + assertDeleted(abfs, path, true); + } + + /** + * Scenario 2: Create and write a file using WASB, read it using ABFS, then write to the same file using ABFS. + * Expected Outcome: ABFS should read the WASB-written content and successfully write new content to the same file. + */ + @Test + public void testScenario2() throws Exception { + AzureBlobFileSystem abfs = getFileSystem(); + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + assumeBlobServiceType(); + Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path path = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + try (FSDataOutputStream nativeFsStream = wasb.create(path, true)) { + nativeFsStream.write(TEST_CONTEXT.getBytes()); + nativeFsStream.flush(); + nativeFsStream.hsync(); + } + // Check file status + ContractTestUtils.assertIsFile(wasb, path); + + try (BufferedReader br = new BufferedReader( + new InputStreamReader(abfs.open(path)))) { + String line = br.readLine(); + assertEquals("Wrong text from " + abfs, + TEST_CONTEXT, line); + } + + // Write + try (FSDataOutputStream abfsOutputStream = abfs.append(path)) { + abfsOutputStream.write(TEST_CONTEXT1.getBytes()); + abfsOutputStream.flush(); + abfsOutputStream.hsync(); + } + + // Remove file + assertDeleted(abfs, path, true); + } + + /** + * Scenario 3: Create and write a file using ABFS, then read it using WASB. + * Expected Outcome: WASB should be able to read the content written by ABFS without any data mismatch or error. + */ + @Test + public void testScenario3() throws Exception { + AzureBlobFileSystem abfs = getFileSystem(); + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path path = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + try (FSDataOutputStream abfsOutputStream = abfs.create(path, true)) { + abfsOutputStream.write(TEST_CONTEXT.getBytes()); + abfsOutputStream.flush(); + abfsOutputStream.hsync(); + } + + // Check file status + ContractTestUtils.assertIsFile(abfs, path); + + try (BufferedReader br = new BufferedReader( + new InputStreamReader(wasb.open(path)))) { + String line = br.readLine(); + assertEquals("Wrong text from " + wasb, + TEST_CONTEXT, line); + } + // Remove file + assertDeleted(abfs, path, true); + } + + /** + * Scenario 4: Create a file using WASB, write to it using ABFS, and then write again using WASB. + * Expected Outcome: All writes should succeed and the final content should reflect changes from both ABFS and WASB. + */ + @Test + public void testScenario4() throws Exception { + AzureBlobFileSystem abfs = getFileSystem(); + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + assumeBlobServiceType(); + Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path path = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + wasb.create(path, true); + try (FSDataOutputStream abfsOutputStream = abfs.append(path)) { + abfsOutputStream.write(TEST_CONTEXT.getBytes()); + abfsOutputStream.flush(); + abfsOutputStream.hsync(); + } + + try (FSDataOutputStream nativeFsStream = wasb.append(path)) { + nativeFsStream.write(TEST_CONTEXT1.getBytes()); + nativeFsStream.flush(); + nativeFsStream.hsync(); + } + + // Check file status + ContractTestUtils.assertIsFile(abfs, path); + // Remove file + assertDeleted(abfs, path, true); + } + + /** + * Scenario 5: Create a file using ABFS, write to it using WASB, and read it back using ABFS with checksum validation disabled. + * Expected Outcome: The read operation should succeed and reflect the data written via WASB despite checksum validation being off. + */ + @Test + public void testScenario5() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, false); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + assumeBlobServiceType(); + Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path path = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + abfs.create(path, true); + try (FSDataOutputStream nativeFsStream = wasb.append(path)) { + nativeFsStream.write(TEST_CONTEXT.getBytes()); + nativeFsStream.flush(); + nativeFsStream.hsync(); + } + + // Check file status + ContractTestUtils.assertIsFile(abfs, path); + + try (BufferedReader br = new BufferedReader( + new InputStreamReader(abfs.open(path)))) { + String line = br.readLine(); + assertEquals("Wrong text from " + abfs, + TEST_CONTEXT, line); + } + + // Remove file + assertDeleted(abfs, path, true); + } + + /** + * Scenario 6: Create a file using ABFS, write to it using WASB, and read it via ABFS with checksum validation enabled. + * Expected Outcome: Read should fail due to checksum mismatch caused by WASB write, verifying integrity enforcement. + */ + @Test + public void testScenario6() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); + assumeBlobServiceType(); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path path = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + abfs.create(path, true); + try (FSDataOutputStream nativeFsStream = wasb.append(path)) { + nativeFsStream.write(TEST_CONTEXT.getBytes()); + nativeFsStream.flush(); + nativeFsStream.hsync(); + } + + // Check file status + ContractTestUtils.assertIsFile(abfs, path); + + try (BufferedReader br = new BufferedReader( + new InputStreamReader(abfs.open(path)))) { + String line = br.readLine(); + assertEquals("Wrong text from " + abfs, + TEST_CONTEXT, line); + } + + // Remove file + assertDeleted(abfs, path, true); + } + + /** + * Scenario 7: Create a file using WASB and then overwrite it using ABFS with overwrite=true. + * Expected Outcome: ABFS should successfully overwrite the existing file created by WASB without error. + */ + @Test + public void testScenario7() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path path = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + try (FSDataOutputStream nativeFsStream = wasb.create(path, true)) { + nativeFsStream.write(TEST_CONTEXT.getBytes()); + nativeFsStream.flush(); + nativeFsStream.hsync(); + } + + // Check file status + ContractTestUtils.assertIsFile(abfs, path); + + try (BufferedReader br = new BufferedReader( + new InputStreamReader(abfs.open(path)))) { + String line = br.readLine(); + assertEquals("Wrong text from " + abfs, + TEST_CONTEXT, line); + } + abfs.create(path, true); + FileStatus fileStatus = abfs.getFileStatus(path); + Assertions.assertThat(fileStatus.getLen()) + .as("Expected file length to be 0 after overwrite") + .isEqualTo(0L); + + // Remove file + assertDeleted(abfs, path, true); + } + + /** + * Scenario 8: Create a file using WASB and then attempt to create the same file using ABFS with overwrite=false. + * Expected Outcome: ABFS should fail to create the file due to the file already existing. + */ + @Test + public void testScenario8() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path path = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + try (FSDataOutputStream nativeFsStream = wasb.create(path, true)) { + nativeFsStream.write(TEST_CONTEXT.getBytes()); + nativeFsStream.flush(); + nativeFsStream.hsync(); + } + + // Check file status + ContractTestUtils.assertIsFile(abfs, path); + + try (BufferedReader br = new BufferedReader( + new InputStreamReader(abfs.open(path)))) { + String line = br.readLine(); + assertEquals("Wrong text from " + abfs, + TEST_CONTEXT, line); + } + try { + abfs.create(path, false); + } catch (IOException e) { + AbfsRestOperationException restEx = (AbfsRestOperationException) e.getCause(); + if (restEx != null) { + Assertions.assertThat(restEx.getStatusCode()) + .as("Expected HTTP status code 409 (Conflict) when file already exists") + .isEqualTo(HTTP_CONFLICT); + } + Assertions.assertThat(e.getMessage()) + .as("Expected error message to contain 'AlreadyExists'") + .contains("AlreadyExists"); + } + + // Remove file + assertDeleted(abfs, path, true); + } + + /** + * Scenario 9: Create a file using ABFS and then attempt to create the same file using WASB with overwrite=true. + * Expected Outcome: WASB should successfully overwrite the existing file. + */ + @Test + public void testScenario9() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + assumeBlobServiceType(); + Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path path = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + try (FSDataOutputStream abfsOutputStream = abfs.create(path, true)) { + abfsOutputStream.write(TEST_CONTEXT.getBytes()); + abfsOutputStream.flush(); + abfsOutputStream.hsync(); + } + + // Check file status + ContractTestUtils.assertIsFile(abfs, path); + + try (BufferedReader br = new BufferedReader( + new InputStreamReader(abfs.open(path)))) { + String line = br.readLine(); + assertEquals("Wrong text from " + abfs, + TEST_CONTEXT, line); + } + wasb.create(path, true); + FileStatus fileStatus = abfs.getFileStatus(path); + Assertions.assertThat(fileStatus.getLen()) + .as("Expected file length to be 0 after overwrite") + .isEqualTo(0L); + + // Remove file + assertDeleted(abfs, path, true); + } + + /** + * Scenario 10: Create a file using ABFS and then attempt to create the same file using WASB with overwrite=false. + * Expected Outcome: WASB should fail to create the file as it already exists. The exception should indicate + * an "AlreadyExists" error with HTTP status code 409 (Conflict). + */ + @Test + public void testScenario10() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path path = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + try (FSDataOutputStream abfsOutputStream = abfs.create(path, true)) { + abfsOutputStream.write(TEST_CONTEXT.getBytes()); + abfsOutputStream.flush(); + abfsOutputStream.hsync(); + } + + // Check file status + ContractTestUtils.assertIsFile(abfs, path); + + try (BufferedReader br = new BufferedReader( + new InputStreamReader(abfs.open(path)))) { + String line = br.readLine(); + assertEquals("Wrong text from " + abfs, + TEST_CONTEXT, line); + } + try { + wasb.create(path, false); + } catch (IOException e) { + AbfsRestOperationException restEx + = (AbfsRestOperationException) e.getCause(); + if (restEx != null) { + Assertions.assertThat(restEx.getStatusCode()) + .as("Expected HTTP status code 409 (Conflict) when file already exists") + .isEqualTo(HTTP_CONFLICT); + } + Assertions.assertThat(e.getMessage()) + .as("Expected error message to contain 'exists'") + .contains("exists"); + } + // Remove file + assertDeleted(abfs, path, true); + } + + /** + * Scenario 11: Create a file using ABFS, write data to it using WASB, and then delete the file using ABFS. + * Expected Outcome: File should be created via ABFS and writable by WASB. + * ABFS delete should succeed, and the file should no longer exist. + */ + @Test + public void testScenario11() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + assumeBlobServiceType(); + Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path path = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + abfs.create(path, true); + try (FSDataOutputStream nativeFsStream = wasb.append(path)) { + nativeFsStream.write(TEST_CONTEXT.getBytes()); + nativeFsStream.flush(); + nativeFsStream.hsync(); + } + + // Check file status + ContractTestUtils.assertIsFile(abfs, path); + + try (BufferedReader br = new BufferedReader( + new InputStreamReader(abfs.open(path)))) { + String line = br.readLine(); + assertEquals("Wrong text from " + abfs, + TEST_CONTEXT, line); + } + abfs.delete(path, true); + } + + /** + * Scenario 12: Create and write a file using ABFS, and then delete the same file using WASB. + * Expected Outcome: File should be created and written successfully via ABFS. + * WASB should be able to delete the file without errors. + */ + @Test + public void testScenario12() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path path = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + try (FSDataOutputStream abfsOutputStream = abfs.create(path, true)) { + abfsOutputStream.write(TEST_CONTEXT.getBytes()); + abfsOutputStream.flush(); + abfsOutputStream.hsync(); + } + + // Check file status + ContractTestUtils.assertIsFile(abfs, path); + + try (BufferedReader br = new BufferedReader( + new InputStreamReader(abfs.open(path)))) { + String line = br.readLine(); + assertEquals("Wrong text from " + abfs, + TEST_CONTEXT, line); + } + wasb.delete(path, true); + } + + /** + * Scenario 13: Create a file using ABFS, write data to it using WASB, and then read the file using WASB. + * Expected Outcome: The read operation via WASB should return the correct content written via WASB. + */ + @Test + public void testScenario13() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + assumeBlobServiceType(); + Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path path = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + abfs.create(path, true); + try (FSDataOutputStream nativeFsStream = wasb.append(path)) { + nativeFsStream.write(TEST_CONTEXT.getBytes()); + nativeFsStream.flush(); + nativeFsStream.hsync(); + } + + // Check file status + ContractTestUtils.assertIsFile(abfs, path); + + try (BufferedReader br = new BufferedReader( + new InputStreamReader(wasb.open(path)))) { + String line = br.readLine(); + assertEquals("Wrong text from " + wasb, + TEST_CONTEXT, line); + } + abfs.delete(path, true); + } + + /** + * Scenario 14: Create a file using ABFS, write data to it using WASB, and delete the file using WASB. + * Expected Outcome: Write via WASB should succeed and data should be persisted; delete via WASB should succeed without errors. + */ + @Test + public void testScenario14() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + assumeBlobServiceType(); + Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path path = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + abfs.create(path, true); + try (FSDataOutputStream nativeFsStream = wasb.append(path)) { + nativeFsStream.write(TEST_CONTEXT.getBytes()); + nativeFsStream.flush(); + nativeFsStream.hsync(); + } + + // Check file status + ContractTestUtils.assertIsFile(abfs, path); + + try (BufferedReader br = new BufferedReader( + new InputStreamReader(wasb.open(path)))) { + String line = br.readLine(); + assertEquals("Wrong text from " + wasb, + TEST_CONTEXT, line); + } + wasb.delete(path, true); + } + + /** + * Scenario 15: Create and write a file using WASB, then delete the file using ABFS. + * Expected Outcome: Write via WASB should succeed and data should be persisted; delete via ABFS should succeed without errors. + */ + @Test + public void testScenario15() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path path = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + try (FSDataOutputStream nativeFsStream = wasb.create(path, true)) { + nativeFsStream.write(TEST_CONTEXT.getBytes()); + nativeFsStream.flush(); + nativeFsStream.hsync(); + } + + // Check file status + ContractTestUtils.assertIsFile(abfs, path); + + try (BufferedReader br = new BufferedReader( + new InputStreamReader(wasb.open(path)))) { + String line = br.readLine(); + assertEquals("Wrong text from " + wasb, + TEST_CONTEXT, line); + } + abfs.delete(path, true); + } + + /** + * Scenario 16: Create a file using WASB, write data to it using ABFS, and then delete the file using WASB. + * Expected Outcome: Write via ABFS should succeed and persist data; delete via WASB should succeed without errors. + */ + @Test + public void testScenario16() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + assumeBlobServiceType(); + Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path path = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + wasb.create(path, true); + try (FSDataOutputStream abfsOutputStream = abfs.append(path)) { + abfsOutputStream.write(TEST_CONTEXT.getBytes()); + abfsOutputStream.flush(); + abfsOutputStream.hsync(); + } + + // Check file status + ContractTestUtils.assertIsFile(abfs, path); + + try (BufferedReader br = new BufferedReader( + new InputStreamReader(abfs.open(path)))) { + String line = br.readLine(); + assertEquals("Wrong text from " + abfs, + TEST_CONTEXT, line); + } + wasb.delete(path, true); + } + + /** + * Scenario 17: Create a file using ABFS, set attribute (xAttr), and retrieve it using ABFS. + * Expected Outcome: setXAttr and getXAttr operations via ABFS should succeed and return the correct value. + */ + @Test + public void testScenario17() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + Path testFile = path("/testReadFile"); + Path path = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + try (FSDataOutputStream abfsOutputStream = abfs.create(path, true)) { + abfsOutputStream.write(TEST_CONTEXT.getBytes()); + abfsOutputStream.flush(); + abfsOutputStream.hsync(); + } + // --- VALIDATE FILE --- + FileStatus status = abfs.getFileStatus(path); + assertIsFile(path, status); + + // --- SET XATTR #1 --- + abfs.setXAttr(path, ATTRIBUTE_NAME_1, ATTRIBUTE_VALUE_1); + byte[] readValue = abfs.getXAttr(path, ATTRIBUTE_NAME_1); + ITestAzureBlobFileSystemAttributes.assertAttributeEqual(abfs, readValue, ATTRIBUTE_VALUE_1, "one"); + + // --- SET XATTR #2 --- + abfs.setXAttr(path, ATTRIBUTE_NAME_2, ATTRIBUTE_VALUE_2); + readValue = abfs.getXAttr(path, ATTRIBUTE_NAME_2); + ITestAzureBlobFileSystemAttributes.assertAttributeEqual(abfs, readValue, ATTRIBUTE_VALUE_2, "two"); + + // --- VERIFY XATTR #1 AGAIN --- + readValue = abfs.getXAttr(path, ATTRIBUTE_NAME_1); + ITestAzureBlobFileSystemAttributes.assertAttributeEqual(abfs, readValue, ATTRIBUTE_VALUE_1, "one"); + + abfs.delete(path, true); + } + + /** + * Scenario 18: Create a file using WASB, set an attribute (xAttr), and retrieve it using WASB. + * Expected Outcome: setXAttr and getXAttr operations via WASB should succeed and return the correct value. + */ + @Test + public void testScenario18() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path path = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + try (FSDataOutputStream nativeFsStream = wasb.create(path, true)) { + nativeFsStream.write(TEST_CONTEXT.getBytes()); + nativeFsStream.flush(); + nativeFsStream.hsync(); + } + // --- VALIDATE FILE --- + FileStatus status = wasb.getFileStatus(path); + assertIsFile(path, status); + + // --- SET XATTR #1 --- + wasb.setXAttr(path, ATTRIBUTE_NAME_1, ATTRIBUTE_VALUE_1); + byte[] readValue = wasb.getXAttr(path, ATTRIBUTE_NAME_1); + ITestAzureBlobFileSystemAttributes.assertAttributeEqual(abfs, readValue, ATTRIBUTE_VALUE_1, "one"); + + // --- SET XATTR #2 --- + wasb.setXAttr(path, ATTRIBUTE_NAME_2, ATTRIBUTE_VALUE_2); + readValue = wasb.getXAttr(path, ATTRIBUTE_NAME_2); + ITestAzureBlobFileSystemAttributes.assertAttributeEqual(abfs, readValue, ATTRIBUTE_VALUE_2, "two"); + + // --- VERIFY XATTR #1 AGAIN --- + readValue = wasb.getXAttr(path, ATTRIBUTE_NAME_1); + ITestAzureBlobFileSystemAttributes.assertAttributeEqual(abfs, readValue, ATTRIBUTE_VALUE_1, "one"); + + wasb.delete(path, true); + } + + /** + * Scenario 19: Create a file using WASB, set an attribute using WASB, and retrieve it using ABFS. + * Expected Outcome: Attribute set via WASB should be retrievable via ABFS and should match the original value. + */ + @Test + public void testScenario19() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path path = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + try (FSDataOutputStream nativeFsStream = wasb.create(path, true)) { + nativeFsStream.write(TEST_CONTEXT.getBytes()); + nativeFsStream.flush(); + nativeFsStream.hsync(); + } + // --- VALIDATE FILE --- + FileStatus status = wasb.getFileStatus(path); + assertIsFile(path, status); + + // --- SET XATTR #1 --- + wasb.setXAttr(path, ATTRIBUTE_NAME_1, ATTRIBUTE_VALUE_1); + byte[] readValue = abfs.getXAttr(path, ATTRIBUTE_NAME_1); + ITestAzureBlobFileSystemAttributes.assertAttributeEqual(abfs, readValue, ATTRIBUTE_VALUE_1, "one"); + + // --- SET XATTR #2 --- + wasb.setXAttr(path, ATTRIBUTE_NAME_2, ATTRIBUTE_VALUE_2); + readValue = abfs.getXAttr(path, ATTRIBUTE_NAME_2); + ITestAzureBlobFileSystemAttributes.assertAttributeEqual(abfs, readValue, ATTRIBUTE_VALUE_2, "two"); + + // --- VERIFY XATTR #1 AGAIN --- + readValue = abfs.getXAttr(path, ATTRIBUTE_NAME_1); + ITestAzureBlobFileSystemAttributes.assertAttributeEqual(abfs, readValue, ATTRIBUTE_VALUE_1, "one"); + + wasb.delete(path, true); + } + + /** + * Scenario 20: Create a file using WASB, set an attribute via WASB, retrieve the attribute via ABFS, + * and then create the file again using ABFS with overwrite=true. + * Expected Outcome: Attribute set via WASB should be retrievable via ABFS before overwrite. + * After overwrite via ABFS, the attribute should no longer exist. + */ + @Test + public void testScenario20() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path path = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + try (FSDataOutputStream nativeFsStream = wasb.create(path, true)) { + nativeFsStream.write(TEST_CONTEXT.getBytes()); + nativeFsStream.flush(); + nativeFsStream.hsync(); + } + // --- VALIDATE FILE --- + FileStatus status = wasb.getFileStatus(path); + assertIsFile(path, status); + + // --- SET XATTR #1 --- + wasb.setXAttr(path, ATTRIBUTE_NAME_1, ATTRIBUTE_VALUE_1); + byte[] readValue = abfs.getXAttr(path, ATTRIBUTE_NAME_1); + ITestAzureBlobFileSystemAttributes.assertAttributeEqual(abfs, readValue, ATTRIBUTE_VALUE_1, "one"); + + // --- SET XATTR #2 --- + wasb.setXAttr(path, ATTRIBUTE_NAME_2, ATTRIBUTE_VALUE_2); + readValue = abfs.getXAttr(path, ATTRIBUTE_NAME_2); + ITestAzureBlobFileSystemAttributes.assertAttributeEqual(abfs, readValue, ATTRIBUTE_VALUE_2, "two"); + + // --- VERIFY XATTR #1 AGAIN --- + readValue = abfs.getXAttr(path, ATTRIBUTE_NAME_1); + ITestAzureBlobFileSystemAttributes.assertAttributeEqual(abfs, readValue, ATTRIBUTE_VALUE_1, "one"); + + abfs.create(path, true); + FileStatus fileStatus = abfs.getFileStatus(path); + Assertions.assertThat(fileStatus.getLen()) + .as("Expected file length to be 0 after overwrite") + .isEqualTo(0L); + wasb.delete(path, true); + } + + /** + * Scenario 21: Create a file using ABFS, set an attribute via ABFS, retrieve the attribute via WASB, + * and then create the file again using WASB with overwrite=true. + * Expected Outcome: Attribute set via ABFS should be retrievable via WASB before overwrite. + * After overwrite via WASB, the attribute should no longer exist. + */ + @Test + public void testScenario21() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + assumeBlobServiceType(); + Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path path = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + try (FSDataOutputStream abfsOutputStream = abfs.create(path, true)) { + abfsOutputStream.write(TEST_CONTEXT.getBytes()); + abfsOutputStream.flush(); + abfsOutputStream.hsync(); + } + // --- VALIDATE FILE --- + FileStatus status = wasb.getFileStatus(path); + assertIsFile(path, status); + + // --- SET XATTR #1 --- + abfs.setXAttr(path, ATTRIBUTE_NAME_1, ATTRIBUTE_VALUE_1); + byte[] readValue = wasb.getXAttr(path, ATTRIBUTE_NAME_1); + ITestAzureBlobFileSystemAttributes.assertAttributeEqual(abfs, readValue, ATTRIBUTE_VALUE_1, "one"); + + // --- SET XATTR #2 --- + abfs.setXAttr(path, ATTRIBUTE_NAME_2, ATTRIBUTE_VALUE_2); + readValue = wasb.getXAttr(path, ATTRIBUTE_NAME_2); + ITestAzureBlobFileSystemAttributes.assertAttributeEqual(abfs, readValue, ATTRIBUTE_VALUE_2, "two"); + + // --- VERIFY XATTR #1 AGAIN --- + readValue = wasb.getXAttr(path, ATTRIBUTE_NAME_1); + ITestAzureBlobFileSystemAttributes.assertAttributeEqual(abfs, readValue, ATTRIBUTE_VALUE_1, "one"); + + wasb.create(path, true); + FileStatus fileStatus = abfs.getFileStatus(path); + Assertions.assertThat(fileStatus.getLen()) + .as("Expected file length to be 0 after overwrite") + .isEqualTo(0L); + wasb.delete(path, true); + } + + /** + * Scenario 22: Create a file using WASB, set an attribute via ABFS, + * retrieve the attribute via WASB, and then create the file again using WASB with overwrite=true. + * Expected Outcome: Attribute set via ABFS should be retrievable via WASB before overwrite. + * After overwrite via WASB, the attribute should be removed. + */ + @Test + public void testScenario22() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path path = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + try (FSDataOutputStream nativeFsStream = wasb.create(path, true)) { + nativeFsStream.write(TEST_CONTEXT.getBytes()); + nativeFsStream.flush(); + nativeFsStream.hsync(); + } + // --- VALIDATE FILE --- + FileStatus status = wasb.getFileStatus(path); + assertIsFile(path, status); + + // --- SET XATTR #1 --- + abfs.setXAttr(path, ATTRIBUTE_NAME_1, ATTRIBUTE_VALUE_1); + byte[] readValue = wasb.getXAttr(path, ATTRIBUTE_NAME_1); + ITestAzureBlobFileSystemAttributes.assertAttributeEqual(abfs, readValue, ATTRIBUTE_VALUE_1, "one"); + + // --- SET XATTR #2 --- + abfs.setXAttr(path, ATTRIBUTE_NAME_2, ATTRIBUTE_VALUE_2); + readValue = wasb.getXAttr(path, ATTRIBUTE_NAME_2); + ITestAzureBlobFileSystemAttributes.assertAttributeEqual(abfs, readValue, ATTRIBUTE_VALUE_2, "two"); + + // --- VERIFY XATTR #1 AGAIN --- + readValue = wasb.getXAttr(path, ATTRIBUTE_NAME_1); + ITestAzureBlobFileSystemAttributes.assertAttributeEqual(abfs, readValue, ATTRIBUTE_VALUE_1, "one"); + + wasb.create(path, true); + FileStatus fileStatus = abfs.getFileStatus(path); + Assertions.assertThat(fileStatus.getLen()) + .as("Expected file length to be 0 after overwrite") + .isEqualTo(0L); + wasb.delete(path, true); + } + + /** + * Scenario 23: Create a file using WASB, set an attribute via ABFS, + * then set another attribute via WASB, and retrieve attributes via ABFS. + * Expected Outcome: Both attributes should be retrievable via ABFS, + * confirming that updates from both ABFS and WASB are visible. + */ + @Test + public void testScenario23() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path path = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + // Write + try (FSDataOutputStream nativeFsStream = wasb.create(path, true)) { + nativeFsStream.write(TEST_CONTEXT.getBytes()); + nativeFsStream.flush(); + nativeFsStream.hsync(); + } + // --- VALIDATE FILE --- + FileStatus status = wasb.getFileStatus(path); + assertIsFile(path, status); + + // --- SET XATTR #1 --- + abfs.setXAttr(path, ATTRIBUTE_NAME_1, ATTRIBUTE_VALUE_1); + byte[] readValue = abfs.getXAttr(path, ATTRIBUTE_NAME_1); + ITestAzureBlobFileSystemAttributes.assertAttributeEqual(abfs, readValue, ATTRIBUTE_VALUE_1, "one"); + + // --- SET XATTR #2 --- + wasb.setXAttr(path, ATTRIBUTE_NAME_2, ATTRIBUTE_VALUE_2, CREATE_FLAG); + readValue = abfs.getXAttr(path, ATTRIBUTE_NAME_2); + ITestAzureBlobFileSystemAttributes.assertAttributeEqual(abfs, readValue, ATTRIBUTE_VALUE_2, "two"); + + // --- VERIFY XATTR #1 AGAIN --- + readValue = abfs.getXAttr(path, ATTRIBUTE_NAME_1); + ITestAzureBlobFileSystemAttributes.assertAttributeEqual(abfs, readValue, ATTRIBUTE_VALUE_1, "one"); + + wasb.delete(path, true); + } + + /** + * Scenario 24: Create a file using ABFS, then set an attribute via WASB, + * and retrieve the attribute via ABFS. + * Expected Outcome: Attribute set via WASB should be retrievable via ABFS, + * verifying cross-compatibility of attribute operations. + */ + @Test + public void testScenario24() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path path = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + try (FSDataOutputStream abfsOutputStream = abfs.create(path, true)) { + abfsOutputStream.write(TEST_CONTEXT.getBytes()); + abfsOutputStream.flush(); + abfsOutputStream.hsync(); + } + // --- VALIDATE FILE --- + FileStatus status = wasb.getFileStatus(path); + assertIsFile(path, status); + + // --- SET XATTR #1 --- + wasb.setXAttr(path, ATTRIBUTE_NAME_1, ATTRIBUTE_VALUE_1); + byte[] readValue = abfs.getXAttr(path, ATTRIBUTE_NAME_1); + ITestAzureBlobFileSystemAttributes.assertAttributeEqual(abfs, readValue, ATTRIBUTE_VALUE_1, "one"); + + // --- SET XATTR #2 --- + wasb.setXAttr(path, ATTRIBUTE_NAME_2, ATTRIBUTE_VALUE_2, CREATE_FLAG); + readValue = abfs.getXAttr(path, ATTRIBUTE_NAME_2); + ITestAzureBlobFileSystemAttributes.assertAttributeEqual(abfs, readValue, ATTRIBUTE_VALUE_2, "two"); + + // --- VERIFY XATTR #1 AGAIN --- + readValue = abfs.getXAttr(path, ATTRIBUTE_NAME_1); + ITestAzureBlobFileSystemAttributes.assertAttributeEqual(abfs, readValue, ATTRIBUTE_VALUE_1, "one"); + + wasb.delete(path, true); + } + + /** + * Scenario 25: Create a file using WASB, then set and retrieve an attribute via ABFS, + * and finally delete the file using WASB. + * Expected Outcome: Attribute set via ABFS should be retrievable via ABFS, + * and file deletion via WASB should succeed. + */ + @Test + public void testScenario25() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path path = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + try (FSDataOutputStream nativeFsStream = wasb.create(path, true)) { + nativeFsStream.write(TEST_CONTEXT.getBytes()); + nativeFsStream.flush(); + nativeFsStream.hsync(); + } + // --- VALIDATE FILE --- + FileStatus status = wasb.getFileStatus(path); + assertIsFile(path, status); + + // --- SET XATTR #1 --- + abfs.setXAttr(path, ATTRIBUTE_NAME_1, ATTRIBUTE_VALUE_1); + byte[] readValue = abfs.getXAttr(path, ATTRIBUTE_NAME_1); + ITestAzureBlobFileSystemAttributes.assertAttributeEqual(abfs, readValue, ATTRIBUTE_VALUE_1, "one"); + + // --- SET XATTR #2 --- + abfs.setXAttr(path, ATTRIBUTE_NAME_2, ATTRIBUTE_VALUE_2, CREATE_FLAG); + readValue = abfs.getXAttr(path, ATTRIBUTE_NAME_2); + ITestAzureBlobFileSystemAttributes.assertAttributeEqual(abfs, readValue, ATTRIBUTE_VALUE_2, "two"); + + // --- VERIFY XATTR #1 AGAIN --- + readValue = abfs.getXAttr(path, ATTRIBUTE_NAME_1); + ITestAzureBlobFileSystemAttributes.assertAttributeEqual(abfs, readValue, ATTRIBUTE_VALUE_1, "one"); + + wasb.delete(path, true); + } + + /** + * Scenario 26: Create a file using ABFS, then set and retrieve an attribute via WASB, + * and finally delete the file using WASB. + * Expected Outcome: Attribute set via WASB should be retrievable via WASB, + * and file deletion via WASB should succeed. + */ + @Test + public void testScenario26() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path path = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + try (FSDataOutputStream abfsOutputStream = abfs.create(path, true)) { + abfsOutputStream.write(TEST_CONTEXT.getBytes()); + abfsOutputStream.flush(); + abfsOutputStream.hsync(); + } + // --- VALIDATE FILE --- + FileStatus status = abfs.getFileStatus(path); + assertIsFile(path, status); + + // --- SET XATTR #1 --- + wasb.setXAttr(path, ATTRIBUTE_NAME_1, ATTRIBUTE_VALUE_1); + byte[] readValue = wasb.getXAttr(path, ATTRIBUTE_NAME_1); + ITestAzureBlobFileSystemAttributes.assertAttributeEqual(abfs, readValue, ATTRIBUTE_VALUE_1, "one"); + + // --- SET XATTR #2 --- + wasb.setXAttr(path, ATTRIBUTE_NAME_2, ATTRIBUTE_VALUE_2, CREATE_FLAG); + readValue = wasb.getXAttr(path, ATTRIBUTE_NAME_2); + ITestAzureBlobFileSystemAttributes.assertAttributeEqual(abfs, readValue, ATTRIBUTE_VALUE_2, "two"); + + // --- VERIFY XATTR #1 AGAIN --- + readValue = wasb.getXAttr(path, ATTRIBUTE_NAME_1); + ITestAzureBlobFileSystemAttributes.assertAttributeEqual(abfs, readValue, ATTRIBUTE_VALUE_1, "one"); + + wasb.delete(path, true); + } + + /** + * Scenario 27: Create and write a file using ABFS, then rename the file using WASB. + * Expected Outcome: WASB should successfully rename the file created and written by ABFS. + */ + @Test + public void testScenario27() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path testPath1 = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + Path testPath2 = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + try (FSDataOutputStream abfsOutputStream = abfs.create(testPath1, true)) { + abfsOutputStream.write(TEST_CONTEXT.getBytes()); + abfsOutputStream.flush(); + abfsOutputStream.hsync(); + } + + // Check file status + ContractTestUtils.assertIsFile(abfs, testPath1); + + try (BufferedReader br = new BufferedReader( + new InputStreamReader(abfs.open(testPath1)))) { + String line = br.readLine(); + assertEquals("Wrong text from " + abfs, + TEST_CONTEXT, line); + } + // --- RENAME FILE --- + boolean renamed = wasb.rename(testPath1, testPath2); + Assertions.assertThat(renamed) + .as("Rename failed") + .isTrue(); + + // --- LIST FILES IN DIRECTORY --- + Path parentDir = new Path(testFile + "/~12/!008"); + int noOfFiles = listAllFilesAndDirs(wasb, parentDir); + Assertions.assertThat(noOfFiles) + .as("Expected only 1 file or directory under path: %s", parentDir) + .isEqualTo(1); + wasb.delete(testPath2, true); + } + + /** + * Scenario 28: Create and write a file using WASB, rename the file using ABFS, and list files using ABFS. + * Expected Outcome: ABFS should successfully rename the file created by WASB, and the renamed file should appear in listings. + */ + @Test + public void testScenario28() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path testPath1 = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + Path testPath2 = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + try (FSDataOutputStream nativeFsStream = wasb.create(testPath1, true)) { + nativeFsStream.write(TEST_CONTEXT.getBytes()); + nativeFsStream.flush(); + nativeFsStream.hsync(); + } + + // Check file status + ContractTestUtils.assertIsFile(abfs, testPath1); + + try (BufferedReader br = new BufferedReader( + new InputStreamReader(abfs.open(testPath1)))) { + String line = br.readLine(); + assertEquals("Wrong text from " + abfs, + TEST_CONTEXT, line); + } + // --- RENAME FILE --- + boolean renamed = abfs.rename(testPath1, testPath2); + Assertions.assertThat(renamed) + .as("Rename failed") + .isTrue(); + + // --- LIST FILES IN DIRECTORY --- + Path parentDir = new Path(testFile + "/~12/!008"); + int noOfFiles = listAllFilesAndDirs(abfs, parentDir); + Assertions.assertThat(noOfFiles) + .as("Expected only 1 file or directory under path: %s", parentDir) + .isEqualTo(1); + wasb.delete(testPath2, true); + } + + /** + * Scenario 29: Create a file using WASB, write data to it via ABFS, rename the file using ABFS, and list files using ABFS. + * Expected Outcome: ABFS should successfully rename the file and list the renamed file accurately. + */ + @Test + public void testScenario29() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + assumeBlobServiceType(); + Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path testPath1 = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + Path testPath2 = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + wasb.create(testPath1, true); + try (FSDataOutputStream abfsOutputStream = abfs.append(testPath1)) { + abfsOutputStream.write(TEST_CONTEXT.getBytes()); + abfsOutputStream.flush(); + abfsOutputStream.hsync(); + } + + // Check file status + ContractTestUtils.assertIsFile(abfs, testPath1); + + try (BufferedReader br = new BufferedReader( + new InputStreamReader(abfs.open(testPath1)))) { + String line = br.readLine(); + assertEquals("Wrong text from " + abfs, + TEST_CONTEXT, line); + } + // --- RENAME FILE --- + boolean renamed = abfs.rename(testPath1, testPath2); + Assertions.assertThat(renamed) + .as("Rename failed") + .isTrue(); + + // --- LIST FILES IN DIRECTORY --- + Path parentDir = new Path(testFile + "/~12/!008"); + int noOfFiles = listAllFilesAndDirs(abfs, parentDir); + Assertions.assertThat(noOfFiles) + .as("Expected only 1 file or directory under path: %s", parentDir) + .isEqualTo(1); + wasb.delete(testPath2, true); + } + + /** + * Scenario 30: Create and write a file using WASB, rename it via WASB, rename again via ABFS, and list files using ABFS. + * Expected Outcome: Both renames should succeed, and ABFS listing should reflect the latest filename. + */ + @Test + public void testScenario30() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path testPath1 = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + Path testPath2 = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + Path testPath3 = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + try (FSDataOutputStream nativeFsStream = wasb.create(testPath1, true)) { + nativeFsStream.write(TEST_CONTEXT.getBytes()); + nativeFsStream.flush(); + nativeFsStream.hsync(); + } + + // Check file status + ContractTestUtils.assertIsFile(abfs, testPath1); + + try (BufferedReader br = new BufferedReader( + new InputStreamReader(abfs.open(testPath1)))) { + String line = br.readLine(); + assertEquals("Wrong text from " + abfs, + TEST_CONTEXT, line); + } + // --- RENAME FILE --- + boolean renamed = wasb.rename(testPath1, testPath2); + Assertions.assertThat(renamed) + .as("Rename failed") + .isTrue(); + + // --- RENAME FILE --- + boolean renamed1 = abfs.rename(testPath2, testPath3); + Assertions.assertThat(renamed1) + .as("Rename failed") + .isTrue(); + + // --- LIST FILES IN DIRECTORY --- + Path parentDir = new Path(testFile + "/~12/!008"); + int noOfFiles = listAllFilesAndDirs(abfs, parentDir); + Assertions.assertThat(noOfFiles) + .as("Expected only 1 file or directory under path: %s", parentDir) + .isEqualTo(1); + wasb.delete(testPath3, true); + } + + /** + * Scenario 31: Create and write a file using WASB, delete it via WASB, then attempt to rename the deleted file via ABFS. + * Expected Outcome: Rename should fail since the file was deleted, ensuring proper error handling. + */ + @Test + public void testScenario31() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path testPath1 = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + Path testPath2 = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + try (FSDataOutputStream nativeFsStream = wasb.create(testPath1, true)) { + nativeFsStream.write(TEST_CONTEXT.getBytes()); + nativeFsStream.flush(); + nativeFsStream.hsync(); + } + + // Check file status + ContractTestUtils.assertIsFile(abfs, testPath1); + + try (BufferedReader br = new BufferedReader( + new InputStreamReader(abfs.open(testPath1)))) { + String line = br.readLine(); + assertEquals("Wrong text from " + abfs, + TEST_CONTEXT, line); + } + wasb.delete(testPath1, true); + + // --- RENAME FILE --- + boolean renamed = abfs.rename(testPath1, testPath2); + Assertions.assertThat(renamed) + .as("Rename operation should have failed but returned true") + .isFalse(); + } + + /** + * Scenario 32: Create a directory and file using WASB, rename the directory using ABFS, and list files using ABFS. + * Expected Outcome: ABFS should successfully rename the directory, and listing should reflect the updated directory name. + */ + @Test + public void testScenario32() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path testFile1 = path("/testReadFile1"); + Path testPath1 = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + Path testPath2 = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + Path testPath3 = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + wasb.mkdirs(testFile); + try (FSDataOutputStream nativeFsStream = wasb.create(testPath1, true)) { + nativeFsStream.write(TEST_CONTEXT.getBytes()); + nativeFsStream.flush(); + nativeFsStream.hsync(); + } + wasb.create(testPath2, true); + wasb.create(testPath3, true); + + // Check file status + ContractTestUtils.assertIsFile(abfs, testPath1); + + try (BufferedReader br = new BufferedReader( + new InputStreamReader(abfs.open(testPath1)))) { + String line = br.readLine(); + assertEquals("Wrong text from " + abfs, + TEST_CONTEXT, line); + } + // --- RENAME DIR --- + boolean renamed = abfs.rename(testFile, testFile1); + Assertions.assertThat(renamed) + .as("Rename failed") + .isTrue(); + // --- LIST FILES IN DIRECTORY --- + int listResult = listAllFilesAndDirs(abfs, testFile1); + Assertions.assertThat(listResult) + .as("Expected only 5 entries under path: %s", testFile1) + .isEqualTo(5); + } + + /** + * Scenario 33: Create a directory and file using ABFS, rename the directory using WASB, and list files using WASB. + * Expected Outcome: WASB should successfully rename the directory, and listing should reflect the updated directory name. + */ + @Test + public void testScenario33() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path testFile1 = path("/testReadFile1"); + Path testPath1 = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + Path testPath2 = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + Path testPath3 = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + abfs.mkdirs(testFile); + try (FSDataOutputStream abfsOutputStream = abfs.create(testPath1, true)) { + abfsOutputStream.write(TEST_CONTEXT.getBytes()); + abfsOutputStream.flush(); + abfsOutputStream.hsync(); + } + abfs.create(testPath2, true); + abfs.create(testPath3, true); + + // Check file status + ContractTestUtils.assertIsFile(abfs, testPath1); + + try (BufferedReader br = new BufferedReader( + new InputStreamReader(abfs.open(testPath1)))) { + String line = br.readLine(); + assertEquals("Wrong text from " + abfs, + TEST_CONTEXT, line); + } + // --- RENAME DIR --- + boolean renamed = wasb.rename(testFile, testFile1); + Assertions.assertThat(renamed) + .as("Rename failed") + .isTrue(); + // --- LIST FILES IN DIRECTORY --- + int listResult = listAllFilesAndDirs(wasb, testFile1); + Assertions.assertThat(listResult) + .as("Expected only 5 entries under path: %s", testFile1) + .isEqualTo(5); + } + + /** + * Scenario 34: Create a directory via ABFS, rename a file inside the directory using WASB, and list files via ABFS. + * Expected Outcome: WASB should successfully rename the file, and ABFS listing should reflect the updated filename. + */ + @Test + public void testScenario34() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path testPath1 = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + Path testPath2 = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + Path testPath3 = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + abfs.mkdirs(testFile); + try (FSDataOutputStream abfsOutputStream = abfs.create(testPath1, true)) { + abfsOutputStream.write(TEST_CONTEXT.getBytes()); + abfsOutputStream.flush(); + abfsOutputStream.hsync(); + } + abfs.create(testPath3, true); + + // Check file status + ContractTestUtils.assertIsFile(abfs, testPath1); + + try (BufferedReader br = new BufferedReader( + new InputStreamReader(abfs.open(testPath1)))) { + String line = br.readLine(); + assertEquals("Wrong text from " + abfs, + TEST_CONTEXT, line); + } + // --- RENAME DIR --- + boolean renamed = wasb.rename(testPath1, testPath2); + Assertions.assertThat(renamed) + .as("Rename failed") + .isTrue(); + // --- LIST FILES IN DIRECTORY --- + int listResult = listAllFilesAndDirs(abfs, testFile); + Assertions.assertThat(listResult) + .as("Expected only 4 entries under path: %s", testFile) + .isEqualTo(4); + } + + /** + * Scenario 35: Create a directory via WASB, rename a file inside the directory using ABFS, and list files via WASB. + * Expected Outcome: ABFS should successfully rename the file, and WASB listing should reflect the updated filename. + */ + @Test + public void testScenario35() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path testPath1 = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + Path testPath2 = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + Path testPath3 = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + wasb.mkdirs(testFile); + try (FSDataOutputStream nativeFsStream = wasb.create(testPath1, true)) { + nativeFsStream.write(TEST_CONTEXT.getBytes()); + nativeFsStream.flush(); + nativeFsStream.hsync(); + } + wasb.create(testPath3, true); + + // Check file status + ContractTestUtils.assertIsFile(abfs, testPath1); + + try (BufferedReader br = new BufferedReader( + new InputStreamReader(abfs.open(testPath1)))) { + String line = br.readLine(); + assertEquals("Wrong text from " + abfs, + TEST_CONTEXT, line); + } + // --- RENAME DIR --- + boolean renamed = abfs.rename(testPath1, testPath2); + Assertions.assertThat(renamed) + .as("Rename failed") + .isTrue(); + // --- LIST FILES IN DIRECTORY --- + int listResult = listAllFilesAndDirs(wasb, testFile); + Assertions.assertThat(listResult) + .as("Expected only 4 entries under path: %s", testFile) + .isEqualTo(4); + } + + /** + * Scenario 36: Create a file via WASB, attempt to rename it to an existing filename using ABFS, and list files via WASB. + * Expected Outcome: Rename should fail due to existing target name, and WASB listing should remain unchanged. + */ + + @Test + public void testScenario36() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path testPath1 = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + Path testPath3 = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + wasb.mkdirs(testFile); + try (FSDataOutputStream nativeFsStream = wasb.create(testPath1, true)) { + nativeFsStream.write(TEST_CONTEXT.getBytes()); + nativeFsStream.flush(); + nativeFsStream.hsync(); + } + wasb.create(testPath3, true); + + // Check file status + ContractTestUtils.assertIsFile(abfs, testPath1); + + try (BufferedReader br = new BufferedReader( + new InputStreamReader(abfs.open(testPath1)))) { + String line = br.readLine(); + assertEquals("Wrong text from " + abfs, + TEST_CONTEXT, line); + } + // --- RENAME DIR --- + boolean renamed = abfs.rename(testFile, testFile); + Assertions.assertThat(renamed) + .as("Rename operation should have failed but returned true") + .isFalse(); + } + + /** + * Scenario 37: Attempt to rename a non-existent file using WASB. + * Expected Outcome: Rename operation should fail with an appropriate error indicating the file does not exist. + */ + @Test + public void testScenario37() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path testPath1 = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + Path testPath2 = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + Path testPath3 = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + abfs.mkdirs(testFile); + try (FSDataOutputStream abfsOutputStream = abfs.create(testPath1, true)) { + abfsOutputStream.write(TEST_CONTEXT.getBytes()); + abfsOutputStream.flush(); + abfsOutputStream.hsync(); + } + abfs.create(testPath3, true); + + // Check file status + ContractTestUtils.assertIsFile(abfs, testPath1); + + try (BufferedReader br = new BufferedReader( + new InputStreamReader(abfs.open(testPath1)))) { + String line = br.readLine(); + assertEquals("Wrong text from " + abfs, + TEST_CONTEXT, line); + } + // --- RENAME NON EXISTENT FILE --- + boolean renamed = wasb.rename(testPath2, testPath3); + Assertions.assertThat(renamed) + .as("Rename operation should have failed but returned true") + .isFalse(); + } + + /** + * Scenario 38: Create a file using WASB, set and get an attribute via WASB, then create the file again with overwrite=true using WASB. + * Expected Outcome: Attribute operations should succeed before overwrite, and after overwrite, the file should be replaced with no prior attributes. + */ + @Test + public void testScenario38() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + Path testFile = path("/testReadFile"); + Path path = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + try (FSDataOutputStream nativeFsStream = wasb.create(path, true)) { + nativeFsStream.write(TEST_CONTEXT.getBytes()); + nativeFsStream.flush(); + nativeFsStream.hsync(); + } + // --- VALIDATE FILE --- + FileStatus status = wasb.getFileStatus(path); + assertIsFile(path, status); + + // --- SET XATTR #1 --- + wasb.setXAttr(path, ATTRIBUTE_NAME_1, ATTRIBUTE_VALUE_1); + byte[] readValue = wasb.getXAttr(path, ATTRIBUTE_NAME_1); + ITestAzureBlobFileSystemAttributes.assertAttributeEqual(abfs, readValue, ATTRIBUTE_VALUE_1, "one"); + + // --- SET XATTR #2 --- + wasb.setXAttr(path, ATTRIBUTE_NAME_2, ATTRIBUTE_VALUE_2); + readValue = wasb.getXAttr(path, ATTRIBUTE_NAME_2); + ITestAzureBlobFileSystemAttributes.assertAttributeEqual(abfs, readValue, ATTRIBUTE_VALUE_2, "two"); + + // --- VERIFY XATTR #1 AGAIN --- + readValue = wasb.getXAttr(path, ATTRIBUTE_NAME_1); + ITestAzureBlobFileSystemAttributes.assertAttributeEqual(abfs, readValue, ATTRIBUTE_VALUE_1, "one"); + + wasb.create(path, true); + FileStatus fileStatus = abfs.getFileStatus(path); + Assertions.assertThat(fileStatus.getLen()) + .as("Expected file length to be 0 after overwrite") + .isEqualTo(0L); + wasb.delete(path, true); + } + + /** + * Scenario 39: Create and write a file using WASB, rename the file using WASB, and list files using WASB. + * Expected Outcome: WASB should successfully rename the file, and the renamed file should appear in the listing. + */ + @Test + public void testScenario39() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean(FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, true); + FileSystem fileSystem = FileSystem.newInstance(conf); + AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem; + Assume.assumeFalse("Namespace enabled account does not support this test", + getIsNamespaceEnabled(abfs)); + NativeAzureFileSystem wasb = getWasbFileSystem(); + + String testRunId = UUID.randomUUID().toString(); + Path baseDir = path("/testScenario39_" + testRunId); + Path testFile = new Path(baseDir, "testReadFile"); + Path testPath1 = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + Path testPath2 = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + Path testPath3 = new Path(testFile + "/~12/!008/testfile_" + UUID.randomUUID()); + + // Write + wasb.mkdirs(testFile); + try (FSDataOutputStream nativeFsStream = wasb.create(testPath1, true)) { + nativeFsStream.write(TEST_CONTEXT.getBytes()); + nativeFsStream.flush(); + nativeFsStream.hsync(); + } + wasb.create(testPath3, true); + + // Check file status + ContractTestUtils.assertIsFile(abfs, testPath1); + + try (BufferedReader br = new BufferedReader( + new InputStreamReader(abfs.open(testPath1)))) { + String line = br.readLine(); + assertEquals("Wrong text from " + abfs, + TEST_CONTEXT, line); + } + // --- RENAME DIR --- + boolean renamed = wasb.rename(testPath1, testPath2); + Assertions.assertThat(renamed) + .as("Rename failed") + .isTrue(); + // --- LIST FILES IN DIRECTORY --- + int listResult = listAllFilesAndDirs(wasb, testFile); + Assertions.assertThat(listResult) + .as("Expected only 4 entries under path: %s", testFile) + .isEqualTo(4); + } + + /** + * Recursively counts all files and directories under the given path. + * + * @param fs The file system to use. + * @param path The starting path. + * @return Total number of files and directories. + * @throws IOException If an error occurs while accessing the file system. + */ + public static int listAllFilesAndDirs(FileSystem fs, Path path) throws IOException { + int count = 0; + RemoteIterator iter = fs.listStatusIterator(path); + + while (iter.hasNext()) { + FileStatus status = iter.next(); + count++; // Count this file or directory + + if (status.isDirectory()) { + count += listAllFilesAndDirs(fs, status.getPath()); // Recurse into directory + } + } + + return count; + } + + /** + * Checks that the given path is a regular file (not a directory or symlink). + * + * @param path The file path. + * @param status The file status. + * @throws AssertionError If the path is a directory or a symlink. + */ + private static void assertIsFile(Path path, FileStatus status) { + Assertions.assertThat(status.isDirectory()) + .as("Expected a regular file, but was a directory: %s %s", path, status) + .isFalse(); + + Assertions.assertThat(status.isSymlink()) + .as("Expected a regular file, but was a symlink: %s %s", path, status) + .isFalse(); + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java index 848f686f8eb6a..4ab5f68bb3ca5 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java @@ -24,7 +24,9 @@ import java.net.URISyntaxException; import java.net.URL; import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; import java.util.ArrayList; +import java.util.Base64; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -140,14 +142,31 @@ public static void setMockAbfsRestOperationForListOperation( * @throws Exception If an error occurs while setting up the mock operation. */ public static void setMockAbfsRestOperationForFlushOperation( - final AbfsClient spiedClient, String eTag, String blockListXml, FunctionRaisingIOE functionRaisingIOE) + final AbfsClient spiedClient, + String eTag, + String blockListXml, + AbfsOutputStream os, + FunctionRaisingIOE functionRaisingIOE) throws Exception { - List requestHeaders = ITestAbfsClient.getTestRequestHeaders(spiedClient); + List requestHeaders = ITestAbfsClient.getTestRequestHeaders( + spiedClient); + String blobMd5 = null; + MessageDigest blobDigest = os.getFullBlobContentMd5(); + if (blobDigest != null) { + try { + MessageDigest clonedMd5 = (MessageDigest) blobDigest.clone(); + byte[] digest = clonedMd5.digest(); + if (digest != null && digest.length != 0) { + blobMd5 = Base64.getEncoder().encodeToString(digest); + } + } catch (CloneNotSupportedException ignored) { + } + } byte[] buffer = blockListXml.getBytes(StandardCharsets.UTF_8); requestHeaders.add(new AbfsHttpHeader(CONTENT_LENGTH, String.valueOf(buffer.length))); requestHeaders.add(new AbfsHttpHeader(CONTENT_TYPE, APPLICATION_XML)); requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag)); - requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_CONTENT_MD5, spiedClient.computeMD5Hash(buffer, 0, buffer.length))); + requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_CONTENT_MD5, blobMd5)); final AbfsUriQueryBuilder abfsUriQueryBuilder = spiedClient.createDefaultUriQueryBuilder(); abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, BLOCKLIST); abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(false)); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java index 9433fad206613..b20596e310130 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java @@ -658,7 +658,7 @@ public void testExpectHundredContinue() throws Exception { AppendRequestParameters appendRequestParameters = new AppendRequestParameters( BUFFER_OFFSET, BUFFER_OFFSET, BUFFER_LENGTH, - AppendRequestParameters.Mode.APPEND_MODE, false, null, true); + AppendRequestParameters.Mode.APPEND_MODE, false, null, true, null); byte[] buffer = getRandomBytesArray(BUFFER_LENGTH); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java index dff94aae117da..0b7cbb38db76e 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java @@ -25,6 +25,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.URL; +import java.security.MessageDigest; import java.util.Arrays; import org.assertj.core.api.Assertions; @@ -48,10 +49,12 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters; +import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.http.HttpResponse; import static java.net.HttpURLConnection.HTTP_CONFLICT; +import static java.net.HttpURLConnection.HTTP_UNAVAILABLE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.EXPECT; @@ -378,7 +381,7 @@ public void testNoNetworkCallsForFlush() throws Exception { Mockito.verify(blobClient, Mockito.times(0)). flush(Mockito.any(byte[].class), Mockito.anyString(), Mockito.anyBoolean(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.any(), - Mockito.any(TracingContext.class)); + Mockito.any(TracingContext.class), Mockito.anyString()); } private AbfsRestOperationException getMockAbfsRestOperationException(int status) { @@ -424,6 +427,60 @@ public void testNoNetworkCallsForSecondFlush() throws Exception { Mockito.any(TracingContext.class)); Mockito.verify(blobClient, Mockito.times(1)). flush(Mockito.any(byte[].class), Mockito.anyString(), Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.any(), - Mockito.any(TracingContext.class)); + Mockito.any(TracingContext.class), Mockito.anyString()); + } + + /** + * Tests that the message digest is reset when an exception occurs during remote flush. + * Simulates a failure in the flush operation and verifies reset is called on MessageDigest. + */ + @Test + public void testResetCalledOnExceptionInRemoteFlush() throws Exception { + AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); + Assume.assumeTrue(!getIsNamespaceEnabled(fs)); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + assumeBlobServiceType(); + Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled()); + + // Create a file and spy on AbfsOutputStream + Path path = new Path("/testFile"); + AbfsOutputStream realOs = (AbfsOutputStream) fs.create(path).getWrappedStream(); + AbfsOutputStream os = Mockito.spy(realOs); + AzureIngressHandler ingressHandler = Mockito.spy(os.getIngressHandler()); + Mockito.doReturn(ingressHandler).when(os).getIngressHandler(); + AbfsClient spiedClient = Mockito.spy(ingressHandler.getClient()); + Mockito.doReturn(spiedClient).when(ingressHandler).getClient(); + AzureBlobBlockManager blockManager = Mockito.spy((AzureBlobBlockManager) os.getBlockManager()); + Mockito.doReturn(blockManager).when(ingressHandler).getBlockManager(); + Mockito.doReturn(true).when(blockManager).hasBlocksToCommit(); + Mockito.doReturn("dummy-block-id").when(blockManager).getBlockIdToCommit(); + + MessageDigest mockMessageDigest = Mockito.mock(MessageDigest.class); + Mockito.doReturn(mockMessageDigest).when(os).getFullBlobContentMd5(); + Mockito.doReturn(os).when(ingressHandler).getAbfsOutputStream(); + Mockito.doReturn("dummyMd5").when(ingressHandler).computeFullBlobMd5(); + + // Simulating the exception in client flush call + Mockito.doThrow( + new AbfsRestOperationException(HTTP_UNAVAILABLE, "", "", new Exception())) + .when(spiedClient).flush( + Mockito.any(byte[].class), + Mockito.anyString(), + Mockito.anyBoolean(), + Mockito.nullable(String.class), + Mockito.nullable(String.class), + Mockito.anyString(), + Mockito.nullable(ContextEncryptionAdapter.class), + Mockito.any(TracingContext.class), Mockito.nullable(String.class)); + + // Triggering the flush to simulate exception + try { + ingressHandler.remoteFlush(0, false, false, null, + getTestTracingContext(fs, true)); + } catch (AzureBlobFileSystemException e) { + //expected exception + } + // Verify that reset was called on the message digest + Mockito.verify(mockMessageDigest, Mockito.times(1)).reset(); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java index 11c0e104a6a69..d6572443bb9fc 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsRestOperation.java @@ -194,7 +194,7 @@ private AbfsRestOperation getRestOperation() throws Exception { = new AppendRequestParameters( BUFFER_OFFSET, BUFFER_OFFSET, BUFFER_LENGTH, AppendRequestParameters.Mode.APPEND_MODE, false, null, - expectHeaderEnabled); + expectHeaderEnabled, null); byte[] buffer = getRandomBytesArray(5); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java index 5a2ed528d5d5d..a4eefce0cb876 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java @@ -27,6 +27,7 @@ import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; @@ -120,16 +121,16 @@ public void verifyShortWriteRequest() throws Exception { abfsConf = new AbfsConfiguration(conf, accountName1); AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); + when(client.getAbfsConfiguration()).thenReturn(abfsConf); when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), any(), any(TracingContext.class))) .thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), - isNull(), any(), any(TracingContext.class))).thenReturn(op); + isNull(), any(), any(TracingContext.class), anyString())).thenReturn(op); when(clientHandler.getClient(any())).thenReturn(client); when(clientHandler.getDfsClient()).thenReturn(client); - - AbfsOutputStream out = new AbfsOutputStream( + AbfsOutputStream out = Mockito.spy(new AbfsOutputStream( populateAbfsOutputStreamContext( BUFFER_SIZE, true, @@ -141,7 +142,9 @@ public void verifyShortWriteRequest() throws Exception { new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id", FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(), null), - createExecutorService(abfsConf))); + createExecutorService(abfsConf)))); + when(out.getClient()).thenReturn(client); + when(out.getMd5()).thenReturn(null); final byte[] b = new byte[WRITE_SIZE]; new Random().nextBytes(b); out.write(b); @@ -156,9 +159,9 @@ public void verifyShortWriteRequest() throws Exception { out.hsync(); AppendRequestParameters firstReqParameters = new AppendRequestParameters( - 0, 0, WRITE_SIZE, APPEND_MODE, false, null, true); + 0, 0, WRITE_SIZE, APPEND_MODE, false, null, true, null); AppendRequestParameters secondReqParameters = new AppendRequestParameters( - WRITE_SIZE, 0, 2 * WRITE_SIZE, APPEND_MODE, false, null, true); + WRITE_SIZE, 0, 2 * WRITE_SIZE, APPEND_MODE, false, null, true, null); verify(client, times(1)).append( eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), any(), @@ -184,17 +187,19 @@ public void verifyWriteRequest() throws Exception { conf.set(accountKey1, accountValue1); abfsConf = new AbfsConfiguration(conf, accountName1); AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); + when(client.getAbfsPerfTracker()).thenReturn(tracker); + when(client.getAbfsConfiguration()).thenReturn(abfsConf); TracingContext tracingContext = new TracingContext("test-corr-id", "test-fs-id", FSOperationType.WRITE, TracingHeaderFormat.ALL_ID_FORMAT, null); when(client.getAbfsPerfTracker()).thenReturn(tracker); when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), any(), any(TracingContext.class))).thenReturn(op); - when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull(), any(), any(TracingContext.class))).thenReturn(op); + when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull(), any(), any(TracingContext.class), anyString())).thenReturn(op); when(clientHandler.getClient(any())).thenReturn(client); when(clientHandler.getDfsClient()).thenReturn(client); - AbfsOutputStream out = new AbfsOutputStream( + AbfsOutputStream out = Mockito.spy(new AbfsOutputStream( populateAbfsOutputStreamContext( BUFFER_SIZE, true, @@ -204,7 +209,9 @@ public void verifyWriteRequest() throws Exception { clientHandler, PATH, tracingContext, - createExecutorService(abfsConf))); + createExecutorService(abfsConf)))); + when(out.getClient()).thenReturn(client); + when(out.getMd5()).thenReturn(null); final byte[] b = new byte[WRITE_SIZE]; new Random().nextBytes(b); @@ -214,9 +221,9 @@ public void verifyWriteRequest() throws Exception { out.close(); AppendRequestParameters firstReqParameters = new AppendRequestParameters( - 0, 0, BUFFER_SIZE, APPEND_MODE, false, null, true); + 0, 0, BUFFER_SIZE, APPEND_MODE, false, null, true, null); AppendRequestParameters secondReqParameters = new AppendRequestParameters( - BUFFER_SIZE, 0, 5*WRITE_SIZE-BUFFER_SIZE, APPEND_MODE, false, null, true); + BUFFER_SIZE, 0, 5*WRITE_SIZE-BUFFER_SIZE, APPEND_MODE, false, null, true, null); verify(client, times(1)).append(eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), any(), any(TracingContext.class)); @@ -233,10 +240,11 @@ public void verifyWriteRequest() throws Exception { ArgumentCaptor acFlushRetainUnCommittedData = ArgumentCaptor.forClass(Boolean.class); ArgumentCaptor acFlushClose = ArgumentCaptor.forClass(Boolean.class); ArgumentCaptor acFlushSASToken = ArgumentCaptor.forClass(String.class); + ArgumentCaptor acMd5 = ArgumentCaptor.forClass(String.class); verify(client, times(1)).flush(acFlushPath.capture(), acFlushPosition.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(), acFlushSASToken.capture(), isNull(), isNull(), - acTracingContext.capture()); + acTracingContext.capture(), acMd5.capture()); assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushPath.getAllValues()); assertThat(Arrays.asList(Long.valueOf(5*WRITE_SIZE))).describedAs("position").isEqualTo(acFlushPosition.getAllValues()); assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues()); @@ -257,6 +265,7 @@ public void verifyWriteRequestOfBufferSizeAndClose() throws Exception { final Configuration conf = new Configuration(); conf.set(accountKey1, accountValue1); abfsConf = new AbfsConfiguration(conf, accountName1); + when(client.getAbfsConfiguration()).thenReturn(abfsConf); AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); TracingContext tracingContext = new TracingContext( abfsConf.getClientCorrelationId(), "test-fs-id", @@ -264,13 +273,14 @@ public void verifyWriteRequestOfBufferSizeAndClose() throws Exception { when(client.getAbfsPerfTracker()).thenReturn(tracker); when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), any(), any(TracingContext.class))).thenReturn(op); - when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull(), any(), any(TracingContext.class))).thenReturn(op); + when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull(), any(), any(TracingContext.class), anyString())).thenReturn(op); when(op.getSasToken()).thenReturn("testToken"); when(op.getResult()).thenReturn(httpOp); when(clientHandler.getClient(any())).thenReturn(client); when(clientHandler.getDfsClient()).thenReturn(client); - AbfsOutputStream out = new AbfsOutputStream( + + AbfsOutputStream out = Mockito.spy(Mockito.spy(new AbfsOutputStream( populateAbfsOutputStreamContext( BUFFER_SIZE, true, @@ -280,7 +290,9 @@ public void verifyWriteRequestOfBufferSizeAndClose() throws Exception { clientHandler, PATH, tracingContext, - createExecutorService(abfsConf))); + createExecutorService(abfsConf))))); + when(out.getClient()).thenReturn(client); + when(out.getMd5()).thenReturn(null); final byte[] b = new byte[BUFFER_SIZE]; new Random().nextBytes(b); @@ -290,9 +302,9 @@ public void verifyWriteRequestOfBufferSizeAndClose() throws Exception { out.close(); AppendRequestParameters firstReqParameters = new AppendRequestParameters( - 0, 0, BUFFER_SIZE, APPEND_MODE, false, null, true); + 0, 0, BUFFER_SIZE, APPEND_MODE, false, null, true, null); AppendRequestParameters secondReqParameters = new AppendRequestParameters( - BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null, true); + BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null, true, null); verify(client, times(1)).append( eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), any(), any(TracingContext.class)); @@ -309,10 +321,11 @@ public void verifyWriteRequestOfBufferSizeAndClose() throws Exception { ArgumentCaptor acFlushRetainUnCommittedData = ArgumentCaptor.forClass(Boolean.class); ArgumentCaptor acFlushClose = ArgumentCaptor.forClass(Boolean.class); ArgumentCaptor acFlushSASToken = ArgumentCaptor.forClass(String.class); + ArgumentCaptor acMd5 = ArgumentCaptor.forClass(String.class); verify(client, times(1)).flush(acFlushPath.capture(), acFlushPosition.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(), acFlushSASToken.capture(), isNull(), isNull(), - acTracingContext.capture()); + acTracingContext.capture(), acMd5.capture()); assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushPath.getAllValues()); assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushPosition.getAllValues()); assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues()); @@ -333,6 +346,7 @@ public void verifyWriteRequestOfBufferSize() throws Exception { final Configuration conf = new Configuration(); conf.set(accountKey1, accountValue1); abfsConf = new AbfsConfiguration(conf, accountName1); + when(client.getAbfsConfiguration()).thenReturn(abfsConf); AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); @@ -340,13 +354,13 @@ public void verifyWriteRequestOfBufferSize() throws Exception { any(AppendRequestParameters.class), any(), any(), any(TracingContext.class))) .thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), - any(), isNull(), any(), any(TracingContext.class))).thenReturn(op); + any(), isNull(), any(), any(TracingContext.class), anyString())).thenReturn(op); when(op.getSasToken()).thenReturn("testToken"); when(op.getResult()).thenReturn(httpOp); when(clientHandler.getClient(any())).thenReturn(client); when(clientHandler.getDfsClient()).thenReturn(client); - AbfsOutputStream out = new AbfsOutputStream( + AbfsOutputStream out = Mockito.spy(new AbfsOutputStream( populateAbfsOutputStreamContext( BUFFER_SIZE, true, @@ -358,7 +372,9 @@ public void verifyWriteRequestOfBufferSize() throws Exception { new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id", FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(), null), - createExecutorService(abfsConf))); + createExecutorService(abfsConf)))); + when(out.getClient()).thenReturn(client); + when(out.getMd5()).thenReturn(null); final byte[] b = new byte[BUFFER_SIZE]; new Random().nextBytes(b); @@ -368,9 +384,9 @@ public void verifyWriteRequestOfBufferSize() throws Exception { Thread.sleep(1000); AppendRequestParameters firstReqParameters = new AppendRequestParameters( - 0, 0, BUFFER_SIZE, APPEND_MODE, false, null, true); + 0, 0, BUFFER_SIZE, APPEND_MODE, false, null, true, null); AppendRequestParameters secondReqParameters = new AppendRequestParameters( - BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null, true); + BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null, true, null); verify(client, times(1)).append( eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), any(), any(TracingContext.class)); @@ -394,6 +410,7 @@ public void verifyWriteRequestOfBufferSizeWithAppendBlob() throws Exception { final Configuration conf = new Configuration(); conf.set(accountKey1, accountValue1); abfsConf = new AbfsConfiguration(conf, accountName1); + when(client.getAbfsConfiguration()).thenReturn(abfsConf); AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); @@ -401,10 +418,10 @@ public void verifyWriteRequestOfBufferSizeWithAppendBlob() throws Exception { any(AppendRequestParameters.class), any(), any(), any(TracingContext.class))) .thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), - isNull(), any(), any(TracingContext.class))).thenReturn(op); + isNull(), any(), any(TracingContext.class), anyString())).thenReturn(op); when(clientHandler.getClient(any())).thenReturn(client); when(clientHandler.getDfsClient()).thenReturn(client); - AbfsOutputStream out = new AbfsOutputStream( + AbfsOutputStream out = Mockito.spy(new AbfsOutputStream( populateAbfsOutputStreamContext( BUFFER_SIZE, true, @@ -416,7 +433,12 @@ public void verifyWriteRequestOfBufferSizeWithAppendBlob() throws Exception { new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id", FSOperationType.OPEN, abfsConf.getTracingHeaderFormat(), null), - createExecutorService(abfsConf))); + createExecutorService(abfsConf)))); + AzureIngressHandler ingressHandler = Mockito.spy(out.getIngressHandler()); + Mockito.doReturn(ingressHandler).when(out).getIngressHandler(); + Mockito.doReturn(out).when(ingressHandler).getAbfsOutputStream(); + when(out.getClient()).thenReturn(client); + when(out.getMd5()).thenReturn(null); final byte[] b = new byte[BUFFER_SIZE]; new Random().nextBytes(b); @@ -426,9 +448,9 @@ public void verifyWriteRequestOfBufferSizeWithAppendBlob() throws Exception { Thread.sleep(1000); AppendRequestParameters firstReqParameters = new AppendRequestParameters( - 0, 0, BUFFER_SIZE, APPEND_MODE, true, null, true); + 0, 0, BUFFER_SIZE, APPEND_MODE, true, null, true, null); AppendRequestParameters secondReqParameters = new AppendRequestParameters( - BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, true, null, true); + BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, true, null, true, null); verify(client, times(1)).append( eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), any(), any(TracingContext.class)); @@ -453,6 +475,7 @@ public void verifyWriteRequestOfBufferSizeAndHFlush() throws Exception { final Configuration conf = new Configuration(); conf.set(accountKey1, accountValue1); abfsConf = new AbfsConfiguration(conf, accountName1); + when(client.getAbfsConfiguration()).thenReturn(abfsConf); AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); TracingContext tracingContext = new TracingContext( abfsConf.getClientCorrelationId(), "test-fs-id", @@ -463,10 +486,10 @@ public void verifyWriteRequestOfBufferSizeAndHFlush() throws Exception { any(AppendRequestParameters.class), any(), any(), any(TracingContext.class))) .thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), - isNull(), any(), any(TracingContext.class))).thenReturn(op); + isNull(), any(), any(TracingContext.class), anyString())).thenReturn(op); when(clientHandler.getClient(any())).thenReturn(client); when(clientHandler.getDfsClient()).thenReturn(client); - AbfsOutputStream out = new AbfsOutputStream( + AbfsOutputStream out = Mockito.spy(new AbfsOutputStream( populateAbfsOutputStreamContext( BUFFER_SIZE, true, @@ -478,7 +501,9 @@ public void verifyWriteRequestOfBufferSizeAndHFlush() throws Exception { new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id", FSOperationType.OPEN, abfsConf.getTracingHeaderFormat(), null), - createExecutorService(abfsConf))); + createExecutorService(abfsConf)))); + when(out.getClient()).thenReturn(client); + when(out.getMd5()).thenReturn(null); final byte[] b = new byte[BUFFER_SIZE]; new Random().nextBytes(b); @@ -488,9 +513,9 @@ public void verifyWriteRequestOfBufferSizeAndHFlush() throws Exception { out.hflush(); AppendRequestParameters firstReqParameters = new AppendRequestParameters( - 0, 0, BUFFER_SIZE, APPEND_MODE, false, null, true); + 0, 0, BUFFER_SIZE, APPEND_MODE, false, null, true, null); AppendRequestParameters secondReqParameters = new AppendRequestParameters( - BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null, true); + BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null, true, null); verify(client, times(1)).append( eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), any(), any(TracingContext.class)); @@ -507,9 +532,10 @@ public void verifyWriteRequestOfBufferSizeAndHFlush() throws Exception { ArgumentCaptor acFlushRetainUnCommittedData = ArgumentCaptor.forClass(Boolean.class); ArgumentCaptor acFlushClose = ArgumentCaptor.forClass(Boolean.class); ArgumentCaptor acFlushSASToken = ArgumentCaptor.forClass(String.class); + ArgumentCaptor acFlushMd5 = ArgumentCaptor.forClass(String.class); verify(client, times(1)).flush(acFlushPath.capture(), acFlushPosition.capture(), acFlushRetainUnCommittedData.capture(), acFlushClose.capture(), - acFlushSASToken.capture(), isNull(), isNull(), acTracingContext.capture()); + acFlushSASToken.capture(), isNull(), isNull(), acTracingContext.capture(), acFlushMd5.capture()); assertThat(Arrays.asList(PATH)).describedAs("path").isEqualTo(acFlushPath.getAllValues()); assertThat(Arrays.asList(Long.valueOf(2*BUFFER_SIZE))).describedAs("position").isEqualTo(acFlushPosition.getAllValues()); assertThat(Arrays.asList(false)).describedAs("RetainUnCommittedData flag").isEqualTo(acFlushRetainUnCommittedData.getAllValues()); @@ -529,16 +555,17 @@ public void verifyWriteRequestOfBufferSizeAndFlush() throws Exception { final Configuration conf = new Configuration(); conf.set(accountKey1, accountValue1); abfsConf = new AbfsConfiguration(conf, accountName1); + when(client.getAbfsConfiguration()).thenReturn(abfsConf); AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf); when(client.getAbfsPerfTracker()).thenReturn(tracker); when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), any(), any(TracingContext.class))) .thenReturn(op); when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), - isNull(), any(), any(TracingContext.class))).thenReturn(op); + isNull(), any(), any(TracingContext.class), anyString())).thenReturn(op); when(clientHandler.getClient(any())).thenReturn(client); when(clientHandler.getDfsClient()).thenReturn(client); - AbfsOutputStream out = new AbfsOutputStream( + AbfsOutputStream out = Mockito.spy(new AbfsOutputStream( populateAbfsOutputStreamContext( BUFFER_SIZE, true, @@ -550,7 +577,9 @@ public void verifyWriteRequestOfBufferSizeAndFlush() throws Exception { new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id", FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(), null), - createExecutorService(abfsConf))); + createExecutorService(abfsConf)))); + when(out.getClient()).thenReturn(client); + when(out.getMd5()).thenReturn(null); final byte[] b = new byte[BUFFER_SIZE]; new Random().nextBytes(b); @@ -562,9 +591,9 @@ public void verifyWriteRequestOfBufferSizeAndFlush() throws Exception { Thread.sleep(1000); AppendRequestParameters firstReqParameters = new AppendRequestParameters( - 0, 0, BUFFER_SIZE, APPEND_MODE, false, null, true); + 0, 0, BUFFER_SIZE, APPEND_MODE, false, null, true, null); AppendRequestParameters secondReqParameters = new AppendRequestParameters( - BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null, true); + BUFFER_SIZE, 0, BUFFER_SIZE, APPEND_MODE, false, null, true, null); verify(client, times(1)).append( eq(PATH), any(byte[].class), refEq(firstReqParameters), any(), any(), any(TracingContext.class));