feat(native): Implement read support for Iceberg row lineage#27743
Open
Joe-Abraham wants to merge 2 commits into
Reviewer's Guide
Implements end-to-end native read support for Iceberg v3 row lineage metadata columns.
Sequence diagram for native Iceberg row lineage read path
sequenceDiagram
participant PrestoEngine
participant IcebergPrestoToVeloxConnector
participant VeloxReader
PrestoEngine->>IcebergPrestoToVeloxConnector: toIcebergTableHandle
IcebergPrestoToVeloxConnector->>IcebergPrestoToVeloxConnector: fieldNamesToLowerCase
IcebergPrestoToVeloxConnector->>IcebergPrestoToVeloxConnector: build finalDataColumns
alt [columnHandles include _row_id or _last_updated_sequence_number]
IcebergPrestoToVeloxConnector->>IcebergPrestoToVeloxConnector: add _row_id
IcebergPrestoToVeloxConnector->>IcebergPrestoToVeloxConnector: add _last_updated_sequence_number
end
IcebergPrestoToVeloxConnector-->>PrestoEngine: ConnectorTableHandle
PrestoEngine->>IcebergPrestoToVeloxConnector: toVeloxSplit
IcebergPrestoToVeloxConnector->>IcebergPrestoToVeloxConnector: build infoColumns
alt [icebergSplit->firstRowId >= 0]
IcebergPrestoToVeloxConnector->>IcebergPrestoToVeloxConnector: set kDataSequenceNumberInfoColumn
IcebergPrestoToVeloxConnector->>IcebergPrestoToVeloxConnector: set kFirstRowIdInfoColumn
else [V1/V2 or missing row lineage]
IcebergPrestoToVeloxConnector->>IcebergPrestoToVeloxConnector: omit row lineage infoColumns
end
IcebergPrestoToVeloxConnector-->>PrestoEngine: HiveIcebergSplit
PrestoEngine->>VeloxReader: HiveIcebergSplit
VeloxReader-->>PrestoEngine: _row_id, _last_updated_sequence_number
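The diagram ends with the reader returning `_row_id` and `_last_updated_sequence_number`. As a rough illustration of how those values could be derived for a row whose lineage fields are not stored in the data file, here is a minimal Java sketch. It assumes the Iceberg v3 inheritance rule (`_row_id` is the file's first row ID plus the row's position in the file, and `_last_updated_sequence_number` is the file's data sequence number) and treats a negative `firstRowId` as "unset", as in the `firstRowId >= 0` branch above. The class and method names are hypothetical and do not come from the PR, whose actual implementation lives in the C++ connector and Velox reader.

```java
import java.util.OptionalLong;

/** Hypothetical helper for illustration only; not code from the PR. */
final class RowLineageSketch {
    private RowLineageSketch() {}

    /**
     * Derives the inherited _row_id for one row.
     * A negative firstRowId is assumed to mean "unset" (V1/V2 data or a
     * file with no row lineage), in which case the value is absent (SQL NULL).
     */
    static OptionalLong rowId(long firstRowId, long positionInFile) {
        if (firstRowId < 0) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(firstRowId + positionInFile);
    }

    /** Every row without a stored value inherits the file's data sequence number. */
    static long lastUpdatedSequenceNumber(long dataSequenceNumber) {
        return dataSequenceNumber;
    }
}
```

For example, `RowLineageSketch.rowId(100L, 5L)` holds 105, while `RowLineageSketch.rowId(-1L, 5L)` is empty. Rows that already carry materialized lineage values in the file would keep them; the sketch covers only the inherited case.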
Contributor (Author)
@sourcery-ai review
Contributor
Hey - I've found 2 issues, and left some high level feedback:
- In `toIcebergTableHandle`, the iteration over `columnHandles` with `std::find` on the `names` vector is O(n^2); consider tracking names in a `std::unordered_set` (or similar) to avoid repeated linear scans when more columns are involved.
- The test class `TestPrestoNativeIcebergRowLineage` repeats logic to build `expectedPairs` from `FileScanTask`/`DataFile` in multiple tests; consider extracting this into a helper method to reduce duplication and keep the assertions focused on behavior differences.
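The first comment's idea, replacing a linear membership scan per column with a hash-set lookup, can be sketched as follows. This is written in Java only to match the test code in this thread; the change under review is in the C++ connector, where `std::unordered_set` would play the role of `HashSet`. The class name, method name, and parameters are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration of set-based de-duplication; not code from the PR. */
final class ColumnMergeSketch {
    private ColumnMergeSketch() {}

    /**
     * Appends each requested column that is not already present,
     * preserving order. Membership checks are O(1) on average, so the
     * whole merge is linear in the number of columns.
     */
    static List<String> mergeColumns(List<String> dataColumns, List<String> requestedColumns) {
        List<String> merged = new ArrayList<>(dataColumns);
        Set<String> seen = new HashSet<>(dataColumns);
        for (String name : requestedColumns) {
            // Set.add returns false when the name was already present.
            if (seen.add(name)) {
                merged.add(name);
            }
        }
        return merged;
    }
}
```

With only a handful of columns the quadratic scan is unlikely to matter in practice, so this is mainly a readability and future-proofing choice.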
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `toIcebergTableHandle`, the iteration over `columnHandles` with `std::find` on the `names` vector is O(n^2); consider tracking names in a `std::unordered_set` (or similar) to avoid repeated linear scans when more columns are involved.
- The test class `TestPrestoNativeIcebergRowLineage` repeats logic to build `expectedPairs` from `FileScanTask`/`DataFile` in multiple tests; consider extracting this into a helper method to reduce duplication and keep the assertions focused on behavior differences.
## Individual Comments
### Comment 1
<location path="presto-native-execution/pom.xml" line_range="249-251" />
<code_context>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-core</artifactId>
+ <version>${dep.iceberg.version}</version>
+ <scope>test</scope>
</code_context>
<issue_to_address>
**issue (bug_risk):** Consider excluding `iceberg-bundled-guava` from `iceberg-core` as well to avoid guava conflicts
`iceberg-core` also brings in `iceberg-bundled-guava`, so leaving it unexcluded can cause classpath conflicts with the rest of the stack. Please apply the same exclusion here so tests use the shared Guava version instead of the shaded one.
</issue_to_address>
### Comment 2
<location path="presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeIcebergRowLineage.java" line_range="248-257" />
<code_context>
+ // Per the Iceberg spec, _row_id is null for pre-V3 snapshots (first_row_id unset),
</code_context>
<issue_to_address>
**suggestion (testing):** Strengthen pre-upgrade V2 assertions by checking `_last_updated_sequence_number` against Iceberg metadata
In `testV2TableRowLineageNullThenBackfilledAfterV3Upgrade`, the pre-upgrade checks cover `_row_id` being NULL and `_last_updated_sequence_number` being non-null, but they don’t validate that these sequence values match the Iceberg `dataSequenceNumber` per data file. To make this a more robust end-to-end test, please also capture the pre-upgrade `dataSequenceNumber` for each file and assert that the values returned by Presto for `_last_updated_sequence_number` align with those, ensuring the native engine correctly inherits sequence numbers from split metadata when `firstRowId` is unset.
Suggested implementation:
```java
// Per the Iceberg spec, _row_id is null for pre-V3 snapshots (first_row_id unset),
// but _last_updated_sequence_number is always inherited from the data sequence number.
assertQuery("SELECT \"_row_id\", \"_last_updated_sequence_number\", * FROM " + tableName);
assertEquals(computeActual("SELECT \"_row_id\", * FROM " + tableName).getRowCount(), 3);
assertQuery("SELECT \"_row_id\" FROM " + tableName + " ORDER BY id", "VALUES NULL, NULL, NULL");
assertEquals(
computeActual("SELECT count(*) FROM " + tableName +
" WHERE \"_last_updated_sequence_number\" IS NULL").getOnlyValue(),
0L,
"_last_updated_sequence_number must be non-null for V2 rows (inherited from data sequence number)");
// Strengthen the pre-upgrade V2 assertions by verifying that the values returned by Presto for
// _last_updated_sequence_number match the Iceberg dataSequenceNumber values for the underlying data files.
//
// In this test:
// - The first write produces a file containing rows with id = 1, 2
// - The second write produces a file containing the row with id = 3
//
// For V2 tables, the dataSequenceNumber is stored per data file and should be inherited by all rows
// read from that file when firstRowId is unset (pre-V3 snapshots).
Snapshot snapshot = table.currentSnapshot();
List<Long> dataSequenceNumbers = Streams.stream(SnapshotUtil.snapshotDataFiles(table, snapshot.snapshotId()))
.map(DataFile::dataSequenceNumber)
.distinct()
.sorted()
.collect(toImmutableList());
assertEquals(
dataSequenceNumbers.size(),
2,
"Expected two distinct data sequence numbers for the two data files written in the test");
MaterializedResult v2Rows = computeActual(
"SELECT id, \"_last_updated_sequence_number\" FROM " + tableName + " ORDER BY id");
List<Long> lastUpdatedSequenceNumbers = v2Rows.getMaterializedRows().stream()
.map(row -> ((Number) row.getField(1)).longValue())
.collect(toImmutableList());
assertEquals(lastUpdatedSequenceNumbers.size(), 3, "Expected three V2 rows");
// Rows with id = 1 and 2 come from the first file, and id = 3 from the second file. Validate that
// the _last_updated_sequence_number exported by the native engine matches the Iceberg dataSequenceNumber
// for the corresponding data file.
assertEquals(
lastUpdatedSequenceNumbers.get(0),
dataSequenceNumbers.get(0),
"Row with id = 1 should inherit the dataSequenceNumber of the first data file");
assertEquals(
lastUpdatedSequenceNumbers.get(1),
dataSequenceNumbers.get(0),
"Row with id = 2 should inherit the dataSequenceNumber of the first data file");
assertEquals(
lastUpdatedSequenceNumbers.get(2),
dataSequenceNumbers.get(1),
"Row with id = 3 should inherit the dataSequenceNumber of the second data file");
```
To compile successfully, the following imports (or their equivalents, depending on what is already used in this test class) need to be added at the top of `TestPrestoNativeIcebergRowLineage.java`:
1. Import Iceberg metadata utilities and data file type:
- `import org.apache.iceberg.Snapshot;`
   - `import org.apache.iceberg.util.SnapshotUtil;`
- `import org.apache.iceberg.DataFile;`
2. Import streaming/collection utilities:
- `import java.util.List;`
- `import com.google.common.collect.Streams;`
- `import static com.google.common.collect.ImmutableList.toImmutableList;`
If the file already uses `MaterializedResult` and `toImmutableList`, re-use the existing imports instead of duplicating them.
</issue_to_address>
Contributor (Author)
@sourcery-ai review
Contributor (Author)
@sourcery-ai summary
Contributor (Author)
@sourcery-ai guide
Contributor
Hey - I've left some high level feedback:
- In `IcebergPrestoToVeloxConnector::toVeloxSplit`, now that the data sequence and first row id info column keys use `IcebergMetadataColumn` constants, consider also replacing the hardcoded `"$path"` key with the corresponding constant (if available) to keep these info column names centralized and less error-prone.
- The repeated use of the literal `"format-version"` and string values `'2'`/`'3'` across the new and modified tests could be refactored into shared constants or helper methods to reduce duplication and make future Iceberg version-related changes easier.
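The second suggestion could look roughly like the sketch below: one place that owns the property name and the version strings, plus a small helper that renders the property for use in a test's SQL. All names here are hypothetical, and the rendered form (`"format-version" = '3'`) is an assumption about how the tests embed the property, not something confirmed by the PR.

```java
/** Hypothetical shared test constants; not code from the PR. */
final class IcebergFormatVersions {
    static final String FORMAT_VERSION_PROPERTY = "format-version";
    static final String V2 = "2";
    static final String V3 = "3";

    private IcebergFormatVersions() {}

    /**
     * Renders the table property as a SQL fragment,
     * e.g. "format-version" = '3' (assumed form).
     */
    static String tableProperty(String version) {
        return "\"" + FORMAT_VERSION_PROPERTY + "\" = '" + version + "'";
    }
}
```

A test could then write something like `"CREATE TABLE t (id BIGINT) WITH (" + IcebergFormatVersions.tableProperty(IcebergFormatVersions.V3) + ")"` instead of repeating the literals.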
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `IcebergPrestoToVeloxConnector::toVeloxSplit`, now that the data sequence and first row id info column keys use `IcebergMetadataColumn` constants, consider also replacing the hardcoded `"$path"` key with the corresponding constant (if available) to keep these info column names centralized and less error-prone.
- The repeated use of the literal `"format-version"` and string values `'2'`/`'3'` across the new and modified tests could be refactored into shared constants or helper methods to reduce duplication and make future Iceberg version-related changes easier.
Description
Adds end-to-end read support for the Iceberg V3 row lineage hidden metadata columns `_row_id` and `_last_updated_sequence_number` in the Presto native execution engine.
Motivation and Context
Fixes
Impact
Adds Iceberg V3 row lineage read support to the native execution engine.
Test Plan
Tests added in this PR
Contributor checklist
Release Notes
Please follow release notes guidelines and fill in the release notes below.
Summary by Sourcery
Add native execution engine support for reading Iceberg V3 row lineage metadata columns and validate behavior across table versions and upgrades.
New Features:
Enhancements:
Build:
Tests: