-
Notifications
You must be signed in to change notification settings - Fork 9.1k
HADOOP-19604. ABFS: BlockId generation based on blockCount along with full blob md5 computation change #7777
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
59a4ea3
a8e0488
92c975c
a161e06
538a2cd
2bd7ce0
6f24907
6b138c0
8292192
f15c5e6
7f8c465
c42ac75
d0f2aea
07c396c
5e6298a
d951cc0
9ae0198
5f48139
ef1db63
4b4b7a4
513511a
dbb743f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,10 +20,15 @@ | |
|
||
import java.io.IOException; | ||
import java.nio.charset.StandardCharsets; | ||
import java.util.UUID; | ||
|
||
import org.apache.commons.codec.binary.Base64; | ||
|
||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_ID_LENGTH; | ||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_ID_FORMAT; | ||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.PADDING_CHARACTER; | ||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.PADDING_FORMAT; | ||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SPACE_CHARACTER; | ||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.STRING_SUFFIX; | ||
|
||
/** | ||
* Represents a block in Azure Blob Storage used by Azure Data Lake Storage (ADLS). | ||
|
@@ -34,31 +39,50 @@ | |
public class AbfsBlobBlock extends AbfsBlock { | ||
|
||
private final String blockId; | ||
private final long blockIndex; | ||
|
||
/** | ||
* Gets the activeBlock and the blockId. | ||
* | ||
* @param outputStream AbfsOutputStream Instance. | ||
* @param offset Used to generate blockId based on offset. | ||
* @param blockIdLength the expected length of the generated block ID. | ||
* @param blockIndex the index of the block; used in block ID generation. | ||
* @throws IOException exception is thrown. | ||
*/ | ||
AbfsBlobBlock(AbfsOutputStream outputStream, long offset) throws IOException { | ||
AbfsBlobBlock(AbfsOutputStream outputStream, long offset, int blockIdLength, long blockIndex) throws IOException { | ||
super(outputStream, offset); | ||
this.blockId = generateBlockId(offset); | ||
this.blockIndex = blockIndex; | ||
String streamId = outputStream.getStreamID(); | ||
UUID streamIdGuid = UUID.nameUUIDFromBytes(streamId.getBytes(StandardCharsets.UTF_8)); | ||
anujmodi2021 marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Reviewer: Can streamId be null? streamId.getBytes can raise a NullPointerException — better to handle it. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Author: streamId can never be null, as it is set in the constructor of AbfsOutputStream itself: this.outputStreamId = createOutputStreamId(); |
||
this.blockId = generateBlockId(streamIdGuid, blockIdLength); | ||
} | ||
|
||
/** | ||
* Helper method that generates blockId. | ||
* @param position The offset needed to generate blockId. | ||
* @return String representing the block ID generated. | ||
* Generates a Base64-encoded block ID string using the given stream UUID and block index. | ||
* The block ID is first created as a raw string using a format with the stream ID and block index. | ||
* If a non-zero rawLength is provided, the raw block ID is padded or trimmed to match the length. | ||
* The final string is then Base64-encoded and returned. | ||
* | ||
* @param streamId the UUID of the stream used to generate the block ID. | ||
* @param rawLength the desired length of the raw block ID string before encoding. | ||
* If 0, no length adjustment is done. | ||
* @return the Base64-encoded block ID string. | ||
*/ | ||
private String generateBlockId(long position) { | ||
String streamId = getOutputStream().getStreamID(); | ||
String streamIdHash = Integer.toString(streamId.hashCode()); | ||
String blockId = String.format("%d_%s", position, streamIdHash); | ||
byte[] blockIdByteArray = new byte[BLOCK_ID_LENGTH]; | ||
System.arraycopy(blockId.getBytes(StandardCharsets.UTF_8), 0, blockIdByteArray, 0, Math.min(BLOCK_ID_LENGTH, blockId.length())); | ||
return new String(Base64.encodeBase64(blockIdByteArray), StandardCharsets.UTF_8); | ||
private String generateBlockId(UUID streamId, int rawLength) { | ||
String rawBlockId = String.format(BLOCK_ID_FORMAT, streamId, blockIndex); | ||
|
||
if (rawLength != 0) { | ||
// Adjust to match expected decoded length | ||
if (rawBlockId.length() < rawLength) { | ||
rawBlockId = String.format(PADDING_FORMAT + rawLength + STRING_SUFFIX, rawBlockId) | ||
.replace(SPACE_CHARACTER, PADDING_CHARACTER); | ||
} else if (rawBlockId.length() > rawLength) { | ||
rawBlockId = rawBlockId.substring(0, rawLength); | ||
} | ||
} | ||
|
||
return Base64.encodeBase64String(rawBlockId.getBytes(StandardCharsets.UTF_8)); | ||
} | ||
|
||
/** | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -129,6 +129,7 @@ | |
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_BLOCK_NAME; | ||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_COMMITTED_BLOCKS; | ||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_HDI_ISFOLDER; | ||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_HDI_PERMISSION; | ||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_TAG_NAME; | ||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XML_VERSION; | ||
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.XMS_PROPERTIES_ENCODING_ASCII; | ||
|
@@ -898,7 +899,7 @@ public AbfsRestOperation append(final String path, | |
requestHeaders.add(new AbfsHttpHeader(EXPECT, HUNDRED_CONTINUE)); | ||
} | ||
if (isChecksumValidationEnabled()) { | ||
addCheckSumHeaderForWrite(requestHeaders, reqParams, buffer); | ||
addCheckSumHeaderForWrite(requestHeaders, reqParams); | ||
} | ||
if (reqParams.isRetryDueToExpect()) { | ||
String userAgentRetry = getUserAgent(); | ||
|
@@ -982,6 +983,9 @@ public AbfsRestOperation appendBlock(final String path, | |
if (requestParameters.getLeaseId() != null) { | ||
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, requestParameters.getLeaseId())); | ||
} | ||
if (isChecksumValidationEnabled()) { | ||
addCheckSumHeaderForWrite(requestHeaders, requestParameters); | ||
} | ||
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); | ||
abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, APPEND_BLOCK); | ||
String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION, abfsUriQueryBuilder); | ||
|
@@ -1021,6 +1025,7 @@ public AbfsRestOperation appendBlock(final String path, | |
* @param leaseId if there is an active lease on the path. | ||
* @param contextEncryptionAdapter to provide encryption context. | ||
* @param tracingContext for tracing the server calls. | ||
* @param blobMd5 the MD5 hash of the blob for integrity verification. | ||
* @return exception as this operation is not supported on Blob Endpoint. | ||
* @throws UnsupportedOperationException always. | ||
*/ | ||
|
@@ -1032,7 +1037,7 @@ public AbfsRestOperation flush(final String path, | |
final String cachedSasToken, | ||
final String leaseId, | ||
final ContextEncryptionAdapter contextEncryptionAdapter, | ||
final TracingContext tracingContext) throws AzureBlobFileSystemException { | ||
final TracingContext tracingContext, String blobMd5) throws AzureBlobFileSystemException { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Reviewer: Add the new argument to the @param comments. Please make this change wherever required. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Author: taken |
||
throw new UnsupportedOperationException( | ||
"Flush without blockIds not supported on Blob Endpoint"); | ||
} | ||
|
@@ -1049,6 +1054,7 @@ public AbfsRestOperation flush(final String path, | |
* @param eTag The etag of the blob. | ||
* @param contextEncryptionAdapter to provide encryption context. | ||
* @param tracingContext for tracing the service call. | ||
* @param blobMd5 the MD5 hash of the blob for integrity verification. | ||
* @return executed rest operation containing response from server. | ||
* @throws AzureBlobFileSystemException if rest operation fails. | ||
*/ | ||
|
@@ -1060,7 +1066,7 @@ public AbfsRestOperation flush(byte[] buffer, | |
final String leaseId, | ||
final String eTag, | ||
ContextEncryptionAdapter contextEncryptionAdapter, | ||
final TracingContext tracingContext) throws AzureBlobFileSystemException { | ||
final TracingContext tracingContext, String blobMd5) throws AzureBlobFileSystemException { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Reviewer: Same as above. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Author: taken |
||
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders(); | ||
addEncryptionKeyRequestHeaders(path, requestHeaders, false, | ||
contextEncryptionAdapter, tracingContext); | ||
|
@@ -1070,9 +1076,9 @@ public AbfsRestOperation flush(byte[] buffer, | |
if (leaseId != null) { | ||
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, leaseId)); | ||
} | ||
String md5Hash = computeMD5Hash(buffer, 0, buffer.length); | ||
requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_CONTENT_MD5, md5Hash)); | ||
|
||
if (blobMd5 != null) { | ||
requestHeaders.add(new AbfsHttpHeader(X_MS_BLOB_CONTENT_MD5, blobMd5)); | ||
} | ||
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); | ||
abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, BLOCKLIST); | ||
abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(isClose)); | ||
|
@@ -1097,7 +1103,7 @@ public AbfsRestOperation flush(byte[] buffer, | |
AbfsRestOperation op1 = getPathStatus(path, true, tracingContext, | ||
contextEncryptionAdapter); | ||
String metadataMd5 = op1.getResult().getResponseHeader(CONTENT_MD5); | ||
if (!md5Hash.equals(metadataMd5)) { | ||
if (blobMd5 != null && !blobMd5.equals(metadataMd5)) { | ||
throw ex; | ||
} | ||
return op; | ||
|
@@ -1914,7 +1920,11 @@ private List<AbfsHttpHeader> getMetadataHeadersList(final Hashtable<String, Stri | |
// AzureBlobFileSystem supports only ASCII Characters in property values. | ||
if (isPureASCII(value)) { | ||
try { | ||
value = encodeMetadataAttribute(value); | ||
// URL encoding this JSON metadata, set by the WASB Client during file creation, causes compatibility issues. | ||
anujmodi2021 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// Therefore, we need to avoid encoding this metadata. | ||
if (!XML_TAG_HDI_PERMISSION.equalsIgnoreCase(entry.getKey())) { | ||
anujmodi2021 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
value = encodeMetadataAttribute(value); | ||
} | ||
} catch (UnsupportedEncodingException e) { | ||
throw new InvalidAbfsRestOperationException(e); | ||
} | ||
|
@@ -2057,7 +2067,7 @@ public static String generateBlockListXml(String blockIdString) { | |
|
||
// Split the block ID string by commas and generate XML for each block ID | ||
if (!blockIdString.isEmpty()) { | ||
String[] blockIds = blockIdString.split(","); | ||
String[] blockIds = blockIdString.split(COMMA); | ||
for (String blockId : blockIds) { | ||
stringBuilder.append(String.format(LATEST_BLOCK_FORMAT, blockId)); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -879,27 +879,31 @@ public boolean appendSuccessCheckOp(AbfsRestOperation op, final String path, | |
* @param leaseId if there is an active lease on the path. | ||
* @param contextEncryptionAdapter to provide encryption context. | ||
* @param tracingContext for tracing the server calls. | ||
* @param blobMd5 The Base64-encoded MD5 hash of the blob for data integrity validation. | ||
* @return executed rest operation containing response from server. | ||
* @throws AzureBlobFileSystemException if rest operation fails. | ||
*/ | ||
public abstract AbfsRestOperation flush(String path, long position, | ||
boolean retainUncommittedData, boolean isClose, | ||
String cachedSasToken, String leaseId, | ||
ContextEncryptionAdapter contextEncryptionAdapter, TracingContext tracingContext) | ||
ContextEncryptionAdapter contextEncryptionAdapter, TracingContext tracingContext, String blobMd5) | ||
throws AzureBlobFileSystemException; | ||
|
||
/** | ||
* Flush previously uploaded data to a file. | ||
* @param buffer containing blockIds to be flushed. | ||
* @param path on which data has to be flushed. | ||
* @param isClose specify if this is the last flush to the file. | ||
* @param cachedSasToken to be used for the authenticating operation. | ||
* @param leaseId if there is an active lease on the path. | ||
* @param eTag to specify conditional headers. | ||
* @param contextEncryptionAdapter to provide encryption context. | ||
* @param tracingContext for tracing the server calls. | ||
* @return executed rest operation containing response from server. | ||
* @throws AzureBlobFileSystemException if rest operation fails. | ||
* Flushes previously uploaded data to the specified path. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Reviewer: NIT — format can be consistent across places. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Author: Taken |
||
* | ||
* @param buffer The buffer containing block IDs to be flushed. | ||
* @param path The file path to which data should be flushed. | ||
* @param isClose True if this is the final flush (i.e., the file is being closed). | ||
* @param cachedSasToken SAS token used for authentication (if applicable). | ||
* @param leaseId Lease ID, if a lease is active on the file. | ||
* @param eTag ETag used for conditional request headers (e.g., If-Match). | ||
* @param contextEncryptionAdapter Adapter to provide encryption context, if encryption is enabled. | ||
* @param tracingContext Context for tracing the server calls. | ||
* @param blobMd5 The Base64-encoded MD5 hash of the blob for data integrity validation. | ||
* @return The executed {@link AbfsRestOperation} containing the server response. | ||
* | ||
* @throws AzureBlobFileSystemException if the flush operation fails. | ||
*/ | ||
public abstract AbfsRestOperation flush(byte[] buffer, | ||
String path, | ||
|
@@ -908,7 +912,7 @@ public abstract AbfsRestOperation flush(byte[] buffer, | |
String leaseId, | ||
String eTag, | ||
ContextEncryptionAdapter contextEncryptionAdapter, | ||
TracingContext tracingContext) throws AzureBlobFileSystemException; | ||
TracingContext tracingContext, String blobMd5) throws AzureBlobFileSystemException; | ||
|
||
/** | ||
* Set the properties of a file or directory. | ||
|
@@ -1352,17 +1356,15 @@ private void appendIfNotEmpty(StringBuilder sb, String regEx, | |
|
||
/** | ||
* Add MD5 hash as request header to the append request. | ||
* | ||
* @param requestHeaders to be updated with checksum header | ||
* @param reqParams for getting offset and length | ||
* @param buffer for getting input data for MD5 computation | ||
* @throws AbfsRestOperationException if Md5 computation fails | ||
*/ | ||
protected void addCheckSumHeaderForWrite(List<AbfsHttpHeader> requestHeaders, | ||
final AppendRequestParameters reqParams, final byte[] buffer) | ||
throws AbfsRestOperationException { | ||
String md5Hash = computeMD5Hash(buffer, reqParams.getoffset(), | ||
reqParams.getLength()); | ||
requestHeaders.add(new AbfsHttpHeader(CONTENT_MD5, md5Hash)); | ||
final AppendRequestParameters reqParams) { | ||
if (reqParams.getMd5() != null) { | ||
requestHeaders.add(new AbfsHttpHeader(CONTENT_MD5, reqParams.getMd5())); | ||
} | ||
} | ||
|
||
/** | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add the newly added parameter to the method comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
taken