Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

unable to create target format Delta with source format as Iceberg when the source table is on S3 #431

Open
rajender07 opened this issue Apr 30, 2024 · 11 comments

Comments

@rajender07
Copy link

I followed the documentation "Creating your first interoperable table", able to build the utilities-0.1.0-SNAPSHOT-bundled.jar successfully.

Initiated a pyspark session using below command. Spark version is 3.4.1 running on Amazon EMR 6.14

pyspark --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" --conf "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog" --conf "spark.sql.catalog.spark_catalog.type=hive"

Create an Iceberg table using below commands:

data =[("James","Smith","01012020","M","3000"),
("Michael","","02012021","M","4000"),
("Robert","Williams","03012023","M","4000"),
("Maria","Jones","04012024","F","4000"),
("Jen","Brown","05012025","F","-1")]

columns=["firstname","lastname","dob","gender","salary"]

df=spark.createDataFrame(data,columns)

spark.sql("""CREATE TABLE IF NOT EXISTS iceberg_table (firstname string,lastname string,dob string,gender string,salary string) USING iceberg""");

df.writeTo("iceberg_table").append()

I see the data and metadata directory under the table name on s3.

Created my_config.yaml as mentioned in the documentation
my_config.txt

executed below command and see failing with metadata/version-hint.text not available
sudo java -jar ./utilities-0.1.0-SNAPSHOT-bundled.jar --datasetConfig my_config.yaml

2024-04-30 10:24:25 INFO org.apache.xtable.conversion.ConversionController:240 - No previous InternalTable sync for target. Falling back to snapshot sync.
2024-04-30 10:24:25 WARN org.apache.iceberg.hadoop.HadoopTableOperations:325 - Error reading version hint file s3:///iceberg_table_1/metadata/version-hint.text
java.io.FileNotFoundException: No such file or directory: s3:////iceberg_table_1/metadata/version-hint.text
at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3801) ~[utilities-0.1.0-SNAPSHOT-bundled.jar:0.1.0-SNAPSHOT]
at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3652) ~[utilities-0.1.0-SNAPSHOT-bundled.jar:0.1.0-SNAPSHOT]
at org.apache.hadoop.fs.s3a.S3AFileSystem.extractOrFetchSimpleFileStatus(S3AFileSystem.java:5288) ~[utilities-0.1.0-SNAPSHOT-bundled.jar:0.1.0-SNAPSHOT]
at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$executeOpen$6(S3AFileSystem.java:1578) ~[utilities-0.1.0-SNAPSHOT-bundled.jar:0.1.0-SNAPSHOT]
at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499) ~[utilities-0.1.0-SNAPSHOT-bundled.jar:0.1.0-SNAPSHOT]

@rajender07
Copy link
Author

I was looking into the documentation and understand if the source is Iceberg table I need to include catalog.yaml as well.
But I am not sure what should be the value for catalogImpl in my case. Any insights on this would be very helpful.

catalogImpl: io.my.CatalogImpl
catalogName: name
catalogOptions: # all other options are passed through in a map
key1: value1
key2: value2

@dipankarmazumdar
Copy link
Contributor

Hi @rajender07! The error clarifies the problem. It says the version-hint.text file was not found in the source table format (Iceberg). Do you see it on S3?
This is the metadata file on Iceberg side when used with a Hadoop catalog. XTable would need this file to translate into the target Delta format.

The important part to understand here is that Iceberg needs a CATALOG to get started with. Your config currently connects Iceberg with a Hive catalog but I don't see any thrift URL or such here.
pyspark --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" --conf "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog" --conf "spark.sql.catalog.spark_catalog.type=hive"

Can you instead use a Hadoop catalog & configure with something like this:

spark.sql.catalog.hadoop_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_prod.type = hadoop
spark.sql.catalog.hadoop_prod.warehouse = s3a://your-bucket

@rajender07
Copy link
Author

@dipankarmazumdar , Thank you for looking into the issue.
No, I do not version-hint.text this file on s3. when I looked into the documentation I understand this file is created while using Hadoop catalog. Since i was use Iceberg session catalog its not generated.

I will try as you suggested using Hadoop catalog and let you know the findings.

Could you please guide me to solve the issue while using Iceberg catalog. Should I use catalog.yaml file? if yes, I am confused on catalogName that should be used. FYI, I have added Thrift related properties under /etc/spark/conf/spark-default.conf and /etc/spark/conf/hive-site.xml. I have no issues connecting to my metastore and read/write data from it.

@the-other-tim-brown
Copy link
Contributor

@rajender07 Which catalog are you using? If it is HMS, the implementation is org.apache.iceberg.hive.HiveCatalog, the other args and name are going to be used to configure any required configurations for using this catalog like a uri for your thrift server.

@rajender07
Copy link
Author

@dipankarmazumdar @the-other-tim-brown

I used Hadoop catalog as you mentioned and created a new Iceberg table. Now, I can see version-hint.text file as well.

However when I executed sync command it is with below error. Could you please assist how to resolve this issue.
sudo java -jar ./utilities-0.1.0-SNAPSHOT-bundled.jar --datasetConfig my_config.yaml

2024-05-13 13:43:04 INFO org.apache.xtable.conversion.ConversionController:240 - No previous InternalTable sync for target. Falling back to snapshot sync.
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(Lorg/apache/hadoop/fs/statistics/DurationTracker;Lorg/apache/hadoop/util/functional/CallableRaisingIOE;)Ljava/lang/Object;

Here is my my_config.yaml

**sourceFormat: ICEBERG
targetFormats:

  • DELTA
    datasets:
  • tableBasePath:
    s3:////x4_iceberg_table
    tableName: x4_iceberg_table**

@dipankarmazumdar
Copy link
Contributor

@rajender07 - I am not really sure about this particular error. However, I tried reproducing this on my end and I was able to translate from ICEBERG to DELTA using the setup I suggested.

ICEBERG TABLE CONFIG & CREATION:

import pyspark
from pyspark.sql import SparkSession
import os
conf = (
    pyspark.SparkConf()
        .setAppName('app_name')
        .set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.4.3,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178')
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
        .set('spark.sql.catalog.hdfs_catalog', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.hdfs_catalog.type', 'hadoop')
        .set('spark.sql.catalog.hdfs_catalog.warehouse', 's3a://my-bucket/new_iceberg/')
        .set('spark.sql.catalog.hdfs_catalog.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
)
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")
spark.sql("CREATE TABLE hdfs_catalog.table1 (name string) USING iceberg")
spark.sql("INSERT INTO hdfs_catalog.table1 VALUES ('Alex'), ('Dipankar'), ('Mary')")

my_config.yaml

sourceFormat: ICEBERG
targetFormats:
  - DELTA
datasets:
  -
    tableBasePath: s3://my-bucket/new_iceberg/table1/
    tableDataPath: s3://my-bucket/new_iceberg/table1/data
    tableName: table1

Run Sync

java -jar utilities/target/utilities-0.1.0-SNAPSHOT-bundled.jar --datasetConfig my_config.yaml

@amnchauhan
Copy link

@rajender07 Which catalog are you using? If it is HMS, the implementation is org.apache.iceberg.hive.HiveCatalog, the other args and name are going to be used to configure any required configurations for using this catalog like a uri for your thrift server.
@the-other-tim-brown referring this when I'm using hive catalog and passing catalogImpl: org.apache.iceberg.hive.HiveCatalog I'm getting java.lang.NoSuchMethodException: Cannot find constructor for interface org,apache.iceberg.catalog.Catalog while if i use 'org.apache.iceberg.hadoop.HadoopCatalog' iam getting no such error. Is there anything else we need to implement if we are using hive Catalog for our iceberg tables?

@dipankarmazumdar
Copy link
Contributor

@rajender07 - LMK if you were able to get past the error with the recommendation.

@vinishjail97
Copy link
Contributor

@rajender07 Can you pull the latest master try again ?
This is the fix.
#441

@rajender07
Copy link
Author

@vinishjail97, Thank you. I will test the fix today and share an update.

@ambaricloud
Copy link

ambaricloud commented Jun 26, 2024

Creating empty _delta_log dir and erroring out.

--Config File
sourceFormat: ICEBERG
targetFormats:

  • DELTA
    datasets:
  • tableBasePath: s3://<>/prod/orders
    tableDataPath: s3://<>/prod/orders/data
    tableName: orders
    namespace: prod.db

--
java -jar /Users/satyak/iceberg/demo/xtable/xtable-utilities-0.1.0-SNAPSHOT-bundled.jar --datasetConfig /Users/satyak/iceberg/demo/xtable/s3_orders_ice_delta.yaml
WARNING: Runtime environment or build system does not support multi-release JARs. This will impact location-based features.
2024-06-26 15:10:21 INFO org.apache.xtable.utilities.RunSync:148 - Running sync for basePath s3://<>/prod/orders for following table formats [DELTA]
2024-06-26 15:10:22 WARN org.apache.hadoop.util.NativeCodeLoader:60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/Users/satyak/iceberg/demo/xtable/xtable-utilities-0.1.0-SNAPSHOT-bundled.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2024-06-26 15:10:22 WARN org.apache.spark.util.Utils:72 - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
2024-06-26 15:10:24 WARN org.apache.hadoop.metrics2.impl.MetricsConfig:136 - Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
2024-06-26 15:10:25 WARN org.apache.hadoop.fs.s3a.SDKV2Upgrade:39 - Directly referencing AWS SDK V1 credential provider com.amazonaws.auth.DefaultAWSCredentialsProviderChain. AWS SDK V1 credential providers will be removed once S3A is upgraded to SDK V2
2024-06-26 15:10:25 INFO org.apache.spark.sql.delta.storage.DelegatingLogStore:60 - LogStore LogStoreAdapter(io.delta.storage.S3SingleDriverLogStore) is used for scheme s3
2024-06-26 15:10:26 INFO org.apache.spark.sql.delta.DeltaLog:60 - Creating initial snapshot without metadata, because the directory is empty
2024-06-26 15:10:27 INFO org.apache.spark.sql.delta.InitialSnapshot:60 - [tableId=9f0c6a5d-2170-4167-b464-ec54fee685c3] Created snapshot InitialSnapshot(path=s3://ambaricloudsatya/prod/orders/data/_delta_log, version=-1, metadata=Metadata(e3727e72-0eda-476e-8cd7-bf7f85269529,null,null,Format(parquet,Map()),null,List(),Map(),Some(1719432627880)), logSegment=LogSegment(s3://ambaricloudsatya/prod/orders/data/_delta_log,-1,List(),None,-1), checksumOpt=None)
2024-06-26 15:10:28 INFO org.apache.xtable.conversion.ConversionController:240 - No previous InternalTable sync for target. Falling back to snapshot sync.
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(Lorg/apache/hadoop/fs/statistics/DurationTracker;Lorg/apache/hadoop/util/functional/CallableRaisingIOE;)Ljava/lang/Object;
at org.apache.hadoop.fs.s3a.Invoker.onceTrackingDuration(Invoker.java:147)
at org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:282)
at org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$1(S3AInputStream.java:435)
at org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$3(Invoker.java:284)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:122)
at org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$5(Invoker.java:408)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:468)
at org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:404)
at org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:282)
at org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:326)
at org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:427)
at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:545)
at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.base/java.io.InputStreamReader.read(InputStreamReader.java:181)
at java.base/java.io.BufferedReader.fill(BufferedReader.java:161)
at java.base/java.io.BufferedReader.readLine(BufferedReader.java:326)
at java.base/java.io.BufferedReader.readLine(BufferedReader.java:392)
at org.apache.iceberg.hadoop.HadoopTableOperations.findVersion(HadoopTableOperations.java:320)
at org.apache.iceberg.hadoop.HadoopTableOperations.refresh(HadoopTableOperations.java:104)
at org.apache.iceberg.hadoop.HadoopTableOperations.current(HadoopTableOperations.java:84)
at org.apache.iceberg.hadoop.HadoopTables.load(HadoopTables.java:94)
at org.apache.xtable.iceberg.IcebergTableManager.lambda$getTable$1(IcebergTableManager.java:58)
at java.base/java.util.Optional.orElseGet(Optional.java:369)
at org.apache.xtable.iceberg.IcebergTableManager.getTable(IcebergTableManager.java:58)
at org.apache.xtable.iceberg.IcebergConversionSource.initSourceTable(IcebergConversionSource.java:81)
at org.apache.xtable.iceberg.IcebergConversionSource.getSourceTable(IcebergConversionSource.java:60)
at org.apache.xtable.iceberg.IcebergConversionSource.getCurrentSnapshot(IcebergConversionSource.java:121)
at org.apache.xtable.spi.extractor.ExtractFromSource.extractSnapshot(ExtractFromSource.java:38)
at org.apache.xtable.conversion.ConversionController.syncSnapshot(ConversionController.java:183)
at org.apache.xtable.conversion.ConversionController.sync(ConversionController.java:121)
at org.apache.xtable.utilities.RunSync.main(RunSync.java:169)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

6 participants