[GOBBLIN-2159] Adding support for partition level copy in Iceberg distcp #4058
Conversation
Force-pushed: b4f6369 to d8356e1
this is a great start! mostly suggestions to leverage a bit more of the existing classes (rather than creating near clones) and also to simplify some interfaces (esp. for the partition filter predicates) to take in specific params, rather than Properties. given the latter may hold just about anything, the API "contract" they define is weaker than we'd want.
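To illustrate the reviewer's point, here is a minimal sketch (all names hypothetical, not the actual Gobblin API) contrasting a `Properties`-based factory with one taking specific, typed parameters, where the required inputs become explicit and compiler-checked:

```java
// Sketch only: hypothetical names illustrating typed params vs. a Properties bag.
import java.util.function.Predicate;

public class PartitionPredicateSketch {
    // Weak contract -- callers may stuff anything (or nothing) into Properties:
    //   static Predicate<String> forPartition(java.util.Properties props) { ... }

    // Stronger contract -- the one required input is explicit in the signature:
    public static Predicate<String> matchesPartitionValue(String expectedPartitionValue) {
        return actualValue -> actualValue.equals(expectedPartitionValue);
    }

    public static void main(String[] args) {
        Predicate<String> p = matchesPartitionValue("2024-01-01");
        System.out.println(p.test("2024-01-01")); // prints: true
        System.out.println(p.test("2024-01-02")); // prints: false
    }
}
```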
CopyableFile fileEntity = CopyableFile.fromOriginAndDestination(
    actualSourceFs, srcFileStatus, targetFs.makeQualified(destPath), copyConfig)
    .fileSet(fileSet)
    .datasetOutputPath(targetFs.getUri().getPath())
    .build();
you skip first doing this, like in IcebergDataset:
// preserving ancestor permissions till root path's child between src and dest
List<OwnerAndPermission> ancestorOwnerAndPermissionList =
CopyableFile.resolveReplicatedOwnerAndPermissionsRecursively(actualSourceFs,
srcPath.getParent(), greatestAncestorPath, copyConfig);
is that intentional? do you feel it's not necessary or actually contra-indicated?
In IcebergDataset the table paths are exactly the same, since the table UUID is the same on source and destination; here they can differ, so I believe copying permissions is not necessary, at least in a first draft.
Even if it is needed, we'd have to make sure the ancestor and parent paths are the ones we want, which is why I have removed it for now.
// Adding this check to avoid adding post publish step when there are no files to copy.
if (CollectionUtils.isNotEmpty(destDataFiles)) {
  copyEntities.add(createPostPublishStep(destDataFiles));
}
I agree this is one difference with IcebergDataset::generateCopyEntities, which always wants to add its post-publish step. (but it shouldn't be hard to refactor to isolate this difference)
 * @throws IOException if an I/O error occurs
 */
@Override
Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
this impl is really, really similar to the one it's based on in its base class. deriving from a class and then overriding methods w/ only small changes is pretty nearly cut-and-paste code. sometimes it's inevitable, but let's avoid it when we can. in this case, could we NOT override this method, but only GetFilePathsToFileStatusResult getFilePathsToFileStatus(...), so this derived class's version runs the new code instead:
IcebergTable srcIcebergTable = getSrcIcebergTable();
List<DataFile> srcDataFiles = srcIcebergTable.getPartitionSpecificDataFiles(this.partitionFilterPredicate);
List<DataFile> destDataFiles = getDestDataFiles(srcDataFiles);
Configuration defaultHadoopConfiguration = new Configuration();
for (FilePathsWithStatus filePathsWithStatus : getFilePathsStatus(srcDataFiles, destDataFiles, this.sourceFs)) {
...
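The shape the reviewer is suggesting is essentially the template-method pattern. A toy sketch (simplified stand-in types, not the real Gobblin classes) of the idea: the base class owns the shared flow once, and the partition-aware subclass overrides only the lookup hook.

```java
// Toy sketch: template method in the base class, one overridable hook in the subclass.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TemplateMethodSketch {
    static class BaseDataset {
        // Template method: the shared copy-entity generation flow lives here, once.
        final List<String> generateCopyEntities() {
            List<String> copyEntities = new ArrayList<>();
            for (String path : getFilePaths()) {
                copyEntities.add("copy:" + path);
            }
            return copyEntities;
        }

        // Hook: the only part a subclass needs to change.
        protected List<String> getFilePaths() {
            return Arrays.asList("all-snapshot-files");
        }
    }

    static class PartitionDataset extends BaseDataset {
        @Override
        protected List<String> getFilePaths() {
            // Partition-filtered lookup would go here instead.
            return Arrays.asList("partition-file-a", "partition-file-b");
        }
    }
}
```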
I will list down my reasons here -
- In the IcebergDataset implementation it is assumed that srcPath and destPath are the same, which is not the case here; if you look at the code, we are using srcPath and srcFileStatus, but here those need to become destPath & srcFileStatus, for readability and for maintaining the code as well.
- Currently I have added just ReplacePartitionStep as the post-publish step, but IcebergRegisterStep needs to be added too, based on the schema-validation scenario; I will raise that as a different PR because it needs proper validation so that we are not corrupting datafiles on the dest table.
- I am not fully convinced about copying ancestor permissions, whether it is even required or not; although I did try making it work by changing the ancestor path and parent path, it wasn't working, so removing it is a must for now.
- If I try to override just GetFilePathsToFileStatusResult getFilePathsToFileStatus(...), then we need to override the data class GetFilePathsToFileStatusResult too, as we need the datafiles along with destPath and srcFileStatus.
To conclude: the reader should understand whether it is actually srcPath or destPath while creating the copyable file; we need to add the replace-partitions commit step along with the register step (based on a condition); and we need to remove permission copying for now.
overall looking good. part 1 of 2 done on this re-review... will return
} catch (IOException e) {
  log.warn("Failed to read manifest file: {}", manifestFile.path(), e);
}
iceberg is atomic/transactional, so I really don't agree w/ swallowing exceptions and still proceeding onward when the table is corrupted. that has the potential for us to lay even more corruption on top of that...
please explain if you see a genuine argument for ignoring errors.
yeah, completely agree with your suggestion; somehow I missed it. let me correct it by failing the copy with proper logging
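The agreed fix - propagate the failure with context instead of swallowing it - could look roughly like this. A minimal sketch with a hypothetical `ManifestReader` stand-in (the real code reads Iceberg `ManifestFile`s):

```java
// Sketch: fail fast on manifest read errors rather than warn-and-continue,
// since proceeding against a possibly corrupted table could compound the damage.
import java.io.IOException;
import java.util.List;

public class ManifestReadSketch {
    // Hypothetical stand-in for reading one manifest's data file paths.
    interface ManifestReader {
        List<String> readDataFilePaths(String manifestPath) throws IOException;
    }

    static List<String> readOrFail(ManifestReader reader, String manifestPath) throws IOException {
        try {
            return reader.readDataFilePaths(manifestPath);
        } catch (IOException e) {
            // Add context, then rethrow: a partially-read manifest must fail the copy.
            throw new IOException("Failed to read manifest file: " + manifestPath, e);
        }
    }
}
```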
 * @return the index of the partition column if found, otherwise -1
 * @throws IllegalArgumentException if the partition transform is not supported
 */
public static int getPartitionColumnIndex(
this single static seems closely related enough to IcebergMatchesAnyPropNamePartitionFilterPredicate that it could reasonably live there as a public static (eliminating the need for an additional separate class).
Currently it looks like that, but in future we will need more filter predicates, and every filter will need the partition column index, so I believe keeping it separate for now should be fine; maybe we can put this in the factory class itself, or convert this class to a factory class.
looks close
Assert.assertEquals(copyEntities.size(), 2);
verifyCopyEntities(copyEntities, true);
the path of every copy entity needs validation. pass that in the way we did in IcebergDatasetTest::verifyCopyEntities(Collection<CopyEntity> copyEntities, List<String> expected). once you do, copyEntities.size() validation can and should be encapsulated within "verify".
I am doing that, but just in a different way: since we are adding a UUID at runtime, we can't have the expected path beforehand - please have a look at the function -
private static void verifyCopyEntities(Collection<CopyEntity> copyEntities, int expectedCopyEntitiesSize,
boolean sameSrcAndDestWriteLocation) {
Assert.assertEquals(copyEntities.size(), expectedCopyEntitiesSize);
String srcWriteLocationStart = SRC_FS_URI + SRC_WRITE_LOCATION;
String destWriteLocationStart = DEST_FS_URI + (sameSrcAndDestWriteLocation ? SRC_WRITE_LOCATION : DEST_WRITE_LOCATION);
String srcErrorMsg = String.format("Source Location should start with %s", srcWriteLocationStart);
String destErrorMsg = String.format("Destination Location should start with %s", destWriteLocationStart);
for (CopyEntity copyEntity : copyEntities) {
String json = copyEntity.toString();
if (IcebergDatasetTest.isCopyableFile(json)) {
String originFilepath = IcebergDatasetTest.CopyEntityDeserializer.getOriginFilePathAsStringFromJson(json);
String destFilepath = IcebergDatasetTest.CopyEntityDeserializer.getDestinationFilePathAsStringFromJson(json);
Assert.assertTrue(originFilepath.startsWith(srcWriteLocationStart), srcErrorMsg);
Assert.assertTrue(destFilepath.startsWith(destWriteLocationStart), destErrorMsg);
String originFileName = originFilepath.substring(srcWriteLocationStart.length() + 1);
String destFileName = destFilepath.substring(destWriteLocationStart.length() + 1);
Assert.assertTrue(destFileName.endsWith(originFileName), "Incorrect file name in destination path");
Assert.assertTrue(destFileName.length() > originFileName.length() + 1,
"Destination file name should be longer than source file name as UUID is appended");
      } else {
IcebergDatasetTest.verifyPostPublishStep(json, OVERWRITE_COMMIT_STEP);
}
}
}
I understand what you're doing w/ the UUID - which is a good thing to validate - but the difference in your verifyCopyEntities method def is that it blindly verifies the length of the list without knowing specifically which paths should be there (i.e. it's missing a List<String> expected parameter).
why not take List<String> expectedSrcFilePaths to verify against each copyable file's getOriginFilePathAsStringFromJson? then, in addition, continue to validate the relationship between each origin file path and its getDestinationFilePathAsStringFromJson.
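A sketch of the suggested verification, with simplified hypothetical inputs (a map of origin path to destination path standing in for deserialized copy entities): the bare size check falls out of an exact-set comparison against the expected source paths, while the UUID relationship between each origin and destination name is still validated.

```java
// Sketch: verify against an explicit expected-paths list instead of a bare size.
import java.util.HashSet;
import java.util.List;
import java.util.Map;

public class VerifySketch {
    static void verifyCopyEntities(Map<String, String> destPathByOriginPath,
                                   List<String> expectedSrcFilePaths) {
        // Exact-set comparison subsumes the size() check.
        if (!new HashSet<>(destPathByOriginPath.keySet()).equals(new HashSet<>(expectedSrcFilePaths))) {
            throw new AssertionError("origin paths differ from expected source paths");
        }
        for (Map.Entry<String, String> entry : destPathByOriginPath.entrySet()) {
            String originName = entry.getKey().substring(entry.getKey().lastIndexOf('/') + 1);
            String destName = entry.getValue().substring(entry.getValue().lastIndexOf('/') + 1);
            // Destination name is the origin name with a UUID prepended at runtime.
            if (!destName.endsWith(originName) || destName.length() <= originName.length()) {
                throw new AssertionError("unexpected destination file name: " + destName);
            }
        }
    }
}
```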
Done
@@ -67,4 +71,6 @@ protected String getDatasetDescriptorPlatform() {
}

protected abstract TableOperations createTableOperations(TableIdentifier tableId);

protected abstract Table loadTableInstance(TableIdentifier tableId);
for good measure you could also make IcebergTable.TableNotFoundException a declared/checked exception here.
I'm tempted to re-situate the exception as IcebergCatalog.TableNotFoundException, but I don't want two classes w/ the same semantics - and renaming public interfaces is probably too late... so I'll make peace with the current name.
As discussed, not throwing here; instead catching NoSuchTableException in BaseIcebergCatalog::openTable and throwing IcebergTable.TableNotFoundException from there.
} catch (IOException e) {
  String errMsg = String.format("~%s~ Failed to get file status for path : %s", this.getFileSetId(), srcPath);
  log.error(errMsg);
  throw new RuntimeException(errMsg, e);
}
I really wish java.util.function.* played along better w/ checked exceptions... but that's clearly not the case... *sigh*
throwing IOException is actually a key part of the FileSet "contract", so substituting an unchecked RuntimeException (that no caller expects and would NOT be looking out for) is not something we ought to do at this late stage.
instead, either write this iteratively (using a for-each loop) or follow IcebergDataset's use of CheckedExceptionFunction.wrapToTunneled:
try {
...
} catch (CheckedExceptionFunction.WrappedIOException wrapper) {
wrapper.rethrowWrapped();
}
the code there actually uses copyConfig.getCopyContext().getFileStatus(targetFs, new Path(pathStr)).isPresent() for caching, which shouldn't be necessary here, given IcebergTable::getPartitionSpecificDataFiles examines only a single snapshot.
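For readers unfamiliar with the tunneling idiom, here is a self-contained sketch of the pattern (names modeled on, but not identical to, Gobblin's CheckedExceptionFunction): the checked IOException is smuggled through a Stream as an unchecked wrapper, then unwrapped at the boundary so callers keep the checked contract.

```java
// Sketch of the checked-exception tunneling pattern.
import java.io.IOException;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class TunneledSketch {
    // Unchecked wrapper that carries the original IOException through lambda-based APIs.
    static class WrappedIOException extends RuntimeException {
        WrappedIOException(IOException cause) { super(cause); }
        void rethrowWrapped() throws IOException { throw (IOException) getCause(); }
    }

    interface CheckedFunction<T, R> { R apply(T t) throws IOException; }

    static <T, R> Function<T, R> wrapToTunneled(CheckedFunction<T, R> f) {
        return t -> {
            try { return f.apply(t); }
            catch (IOException e) { throw new WrappedIOException(e); }
        };
    }

    // Example boundary method: keeps `throws IOException` despite using a Stream.
    static List<Integer> pathLengths(List<String> paths) throws IOException {
        try {
            return paths.stream()
                .map(wrapToTunneled(p -> {
                    if (p.isEmpty()) { throw new IOException("empty path"); }
                    return p.length();
                }))
                .collect(Collectors.toList());
        } catch (WrappedIOException wrapper) {
            wrapper.rethrowWrapped();
            throw new AssertionError("unreachable");
        }
    }
}
```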
Done, following IcebergDataset's use of CheckedExceptionFunction.wrapToTunneled.
for (ManifestFile manifestFile : dataManifestFiles) {
  if (growthMilestoneTracker.isAnotherMilestone(knownDataFiles.size())) {
    log.info("~{}~ for snapshot '{}' - before manifest-file '{}' '{}' total known iceberg datafiles", tableId,
        currentSnapshotId,
        manifestFile.path(),
        knownDataFiles.size()
    );
  }
I agree this makes more sense here, given the synchronous reading of every manifest file happens within this method, rather than in the style of the Iterator<IcebergSnapshotInfo> returned by IcebergTable::getIncrementalSnapshotInfosIterator.
that said, I doubt we should still log tracked growth, as this very same list is later transformed in IcebergPartitionDataset::calcDestDataFileBySrcPath. all the network calls are in this method, rather than over there, so the in-process transformation into CopyEntities should be quite fast. maybe just log once at the end of calcDestDataFileBySrcPath.
Yes, that seems a valid approach; let me remove the GrowthMilestoneTracker from that function.
@@ -224,23 +230,23 @@ private static void setupDestFileSystem() throws IOException {
  Mockito.when(targetFs.getFileStatus(any(Path.class))).thenThrow(new FileNotFoundException());
}

private static List<DataFile> createDataFileMocks() throws IOException {
  List<DataFile> dataFiles = new ArrayList<>();
private static Map<String, DataFile> createDataFileMocksBySrcPath(List<String> srcFilePaths) throws IOException {
I really like how returning this Map allows you to be so succinct at every point of use:
Map<String, DataFile> mockDataFilesBySrcPath = createDataFileMocksBySrcPath(srcFilePaths);
Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).
thenReturn(new ArrayList<>(mockDataFilesBySrcPath.values()));
... // (above just a `.values()` and simply a `.keySet()` below)
verifyCopyEntities(copyEntities, new ArrayList<>(mockDataFilesBySrcPath.keySet()), false);
nice work!
  copyEntities.add(createOverwritePostPublishStep(destDataFiles));
}

log.info("~{}~ generated {} copy--entities", fileSet, copyEntities.size());
the two dashes in copy--entities seem like a typo
excellent work here!
Dear Gobblin maintainers,
Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
JIRA
Description
Tests
- testGetPartitionSpecificDataFiles()
- testReplacePartitions()
Commits