【Iceberg equality scan】fix duplicate key issue when querying metadata column "$data_sequence_number" from an iceberg table within equality deletes #24629

Open
Dream-hu opened this issue Feb 26, 2025 · 3 comments

@Dream-hu

Your Environment

  • Presto version used: 0.291
  • Storage (HDFS/S3/GCS..): local file
  • Data source and connector used: Iceberg connector
  • Deployment (Cloud or On-prem): run IcebergQueryRunner on local env
  • Pastebin link to the complete debug logs:

Expected Behavior

Selecting the hidden column "$data_sequence_number" should succeed.

Current Behavior

The query fails with the following error message:

Query 20250226_023830_00016_bkm44 failed: Duplicate key $data_sequence_number

Possible Solution

When updating the new TableScanNode, check whether the extra metadata column still needs to be added to the assignments and outputs, since the query may already select it.
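A minimal sketch of that check, not Presto's actual code: the class `ScanColumnMerger`, the method `addIfMissing`, and the use of plain strings for column names and handles are all hypothetical stand-ins for the real TableScanNode outputs and assignments. It only illustrates the idea of skipping the extra metadata column when the query already selects it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (assumption): models the scan's outputs as a list of
// column names and its assignments as a name -> handle map.
final class ScanColumnMerger {
    private ScanColumnMerger() {}

    // Adds the metadata column only when the scan does not already expose it.
    // Returns true if the column was added, false if it was already present.
    static boolean addIfMissing(
            List<String> outputs,
            Map<String, String> assignments,
            String column,
            String handle) {
        if (assignments.containsKey(column)) {
            // The query already selected this column; adding it again would
            // register the same key twice.
            return false;
        }
        outputs.add(column);
        assignments.put(column, handle);
        return true;
    }

    public static void main(String[] args) {
        String column = "$data_sequence_number";

        // Case 1: the user query already selects the hidden column.
        List<String> outputs = new ArrayList<>();
        Map<String, String> assignments = new LinkedHashMap<>();
        outputs.add(column);
        assignments.put(column, "handle:" + column);
        outputs.add("id");
        assignments.put("id", "handle:id");
        System.out.println(addIfMissing(outputs, assignments, column, "handle:" + column));
        System.out.println(outputs);

        // Case 2: the user query selects only regular columns.
        List<String> plainOutputs = new ArrayList<>();
        Map<String, String> plainAssignments = new LinkedHashMap<>();
        plainOutputs.add("id");
        plainAssignments.put("id", "handle:id");
        System.out.println(addIfMissing(plainOutputs, plainAssignments, column, "handle:" + column));
        System.out.println(plainOutputs);
    }
}
```

In the real planner code the same guard would presumably be applied wherever the equality-delete handling rebuilds the scan node, but that location is an assumption here.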

Steps to Reproduce

  1. Create an Iceberg table with equality deletes using Flink CDC.
  2. Configure the IcebergQueryRunner with the following properties:

[Image: IcebergQueryRunner properties]

  3. Run the IcebergQueryRunner and execute either of these queries:
    select "$data_sequence_number", * from tbl_cdc_with_equality;
    or
    select "$data_sequence_number" from tbl_cdc_with_equality;
  4. The query fails with the error shown above.

Screenshots (if appropriate)

Image

Context

@ZacBlanco
Contributor

Thank you for reporting this. We will look into it and try to get a fix in before the next release. I will try to reproduce this myself, but if you are able to provide instructions on how to reproduce it without using Flink, it would be helpful. Specifically, a CREATE TABLE statement and a sequence of queries that insert data from the tpch or tpcds connectors, or even just VALUES statements, to get the table into a state where this error appears would help speed up the process.

@Dream-hu
Author

Dream-hu commented Mar 10, 2025

It's my pleasure!

However, Flink is currently the primary tool that supports writing Iceberg equality deletes.
Flink provides two methods to handle data equality in Iceberg tables: upsert and CDC (Change Data Capture). Upsert is the simpler approach. Here's an example of creating and using an Iceberg table with upsert capability.
It's easier to install Flink and write the Iceberg table in a local environment than to an object store like AWS S3.

```sql
-- Create local catalog
CREATE CATALOG hadoop_catalog WITH (
  'type' = 'iceberg',
  'catalog-type' = 'hadoop',
  'warehouse' = 'file:///xxxx',
  'property-version' = '1'
);

-- Set up database and table
USE CATALOG hadoop_catalog;
CREATE DATABASE iceberg_flink_db;
USE iceberg_flink_db;

CREATE TABLE hadoop_catalog.iceberg_flink_db.sample (
  id INT COMMENT 'unique id',
  data STRING NOT NULL,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'format-version' = '2',
  'write.upsert.enabled' = 'true'
);

-- Insert test data
INSERT INTO hadoop_catalog.iceberg_flink_db.sample VALUES (1, 'a');
INSERT INTO hadoop_catalog.iceberg_flink_db.sample VALUES
  (1, 'b'), (2, 'a'), (3, 'a'), (4, 'a'),
  (5, 'a'), (6, 'a'), (7, 'a'), (8, 'a'), (9, 'a');
```

Then, running select "$data_sequence_number", * from iceberg_flink_db.sample in Presto fails with the error above.

@Dream-hu
Author

Also, although I have set up Flink and written data to S3, please note that sharing only the metadata and data files here would not be sufficient for you to use the table directly.

Status: 🆕 Unprioritized