From dc4c4ed909e738ad16d63f0bdaeec7aed8ea5582 Mon Sep 17 00:00:00 2001 From: Aleksandar Milosevic Date: Thu, 25 Jan 2024 03:06:05 +0100 Subject: [PATCH 1/3] wip --- .../api/client/ITDeltaSharingClient.java | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/client-spark/src/test/java/io/whitefox/api/client/ITDeltaSharingClient.java b/client-spark/src/test/java/io/whitefox/api/client/ITDeltaSharingClient.java index fc052021a..55447b1a5 100644 --- a/client-spark/src/test/java/io/whitefox/api/client/ITDeltaSharingClient.java +++ b/client-spark/src/test/java/io/whitefox/api/client/ITDeltaSharingClient.java @@ -24,6 +24,7 @@ public class ITDeltaSharingClient implements DatasetComparer, ScalaUtils { private final StorageManagerInitializer storageManagerInitializer; private final String deltaTablePath; + public ITDeltaSharingClient() { this.storageManagerInitializer = new StorageManagerInitializer(); this.deltaTablePath = @@ -59,6 +60,30 @@ void showS3Table1withQueryTableApi() { assertSmallDatasetEquality(ds, expectedData, true, false, false, 500); } + @Test + void showS3IcebergTableWithQueryTableApi() { + var spark = TestSparkSession.newSparkSession(); + registerAnIcebergTable(); + var ds = spark.read().format("deltaSharing").load(deltaTablePath); + var expectedSchema = new StructType(new StructField[] { + new StructField("id", DataType.fromDDL("long"), true, new Metadata(emptyScalaMap())) + }); + var expectedData = spark + .createDataFrame( + List.of( + new MrFoxDeltaTableSchema(0), + new MrFoxDeltaTableSchema(3), + new MrFoxDeltaTableSchema(2), + new MrFoxDeltaTableSchema(1), + new MrFoxDeltaTableSchema(4)), + MrFoxDeltaTableSchema.class) + .toDF(); + + assertEquals(expectedSchema, ds.schema()); + assertEquals(5, ds.count()); + assertSmallDatasetEquality(ds, expectedData, true, false, false, 500); + } + @Test void registerAnIcebergTable() { TableInfo tableInfo = storageManagerInitializer.createIcebergTableWithGlueMetastore(); From d2878e673d7ebbb4141fe50df75018ce2de27d60 Mon Sep 17 00:00:00 2001 From: Aleksandar Milosevic Date: Fri, 26 Jan 2024 20:35:13 +0100 Subject: [PATCH 2/3] added iceberg table to share --- .../api/client/ITDeltaSharingClient.java | 34 ++++++++++--------- .../api/utils/StorageManagerInitializer.java | 7 ++++ .../java/io/whitefox/api/utils/TablePath.java | 4 +++ 3 files changed, 29 insertions(+), 16 deletions(-) diff --git a/client-spark/src/test/java/io/whitefox/api/client/ITDeltaSharingClient.java b/client-spark/src/test/java/io/whitefox/api/client/ITDeltaSharingClient.java index 55447b1a5..ab595325c 100644 --- a/client-spark/src/test/java/io/whitefox/api/client/ITDeltaSharingClient.java +++ b/client-spark/src/test/java/io/whitefox/api/client/ITDeltaSharingClient.java @@ -23,12 +23,14 @@ public class ITDeltaSharingClient implements DatasetComparer, ScalaUtils { private final StorageManagerInitializer storageManagerInitializer; private final String deltaTablePath; - + private final String icebergTablePath; public ITDeltaSharingClient() { this.storageManagerInitializer = new StorageManagerInitializer(); this.deltaTablePath = TablePath.getDeltaTablePath(getClass().getClassLoader().getResource("MrFoxProfile.json")); + this.icebergTablePath = + TablePath.getIcebergTablePath(getClass().getClassLoader().getResource("MrFoxProfile.json")); } @BeforeAll @@ -37,10 +39,10 @@ static void initStorageManager() { } @Test - void showS3Table1withQueryTableApi() { + void showS3IcebergTable1withQueryTableApi() { var spark = TestSparkSession.newSparkSession(); - storageManagerInitializer.createS3DeltaTable(); - var ds = spark.read().format("deltaSharing").load(deltaTablePath); + storageManagerInitializer.createIcebergTableWithGlueMetastore(); + var ds = spark.read().format("deltaSharing").load(icebergTablePath); var expectedSchema = new StructType(new StructField[] { new StructField("id", DataType.fromDDL("long"), true, new Metadata(emptyScalaMap())) }); @@ -61,23 +63,23 @@ void showS3Table1withQueryTableApi() { } @Test - void showS3IcebergTableWithQueryTableApi() { + void showS3Table1withQueryTableApi() { var spark = TestSparkSession.newSparkSession(); - registerAnIcebergTable(); + storageManagerInitializer.createS3DeltaTable(); var ds = spark.read().format("deltaSharing").load(deltaTablePath); var expectedSchema = new StructType(new StructField[] { - new StructField("id", DataType.fromDDL("long"), true, new Metadata(emptyScalaMap())) + new StructField("id", DataType.fromDDL("long"), true, new Metadata(emptyScalaMap())) }); var expectedData = spark - .createDataFrame( - List.of( - new MrFoxDeltaTableSchema(0), - new MrFoxDeltaTableSchema(3), - new MrFoxDeltaTableSchema(2), - new MrFoxDeltaTableSchema(1), - new MrFoxDeltaTableSchema(4)), - MrFoxDeltaTableSchema.class) - .toDF(); + .createDataFrame( + List.of( + new MrFoxDeltaTableSchema(0), + new MrFoxDeltaTableSchema(3), + new MrFoxDeltaTableSchema(2), + new MrFoxDeltaTableSchema(1), + new MrFoxDeltaTableSchema(4)), + MrFoxDeltaTableSchema.class) + .toDF(); assertEquals(expectedSchema, ds.schema()); assertEquals(5, ds.count()); diff --git a/client-spark/src/test/java/io/whitefox/api/utils/StorageManagerInitializer.java b/client-spark/src/test/java/io/whitefox/api/utils/StorageManagerInitializer.java index 19667e9e5..2a91d6991 100644 --- a/client-spark/src/test/java/io/whitefox/api/utils/StorageManagerInitializer.java +++ b/client-spark/src/test/java/io/whitefox/api/utils/StorageManagerInitializer.java @@ -58,7 +58,14 @@ public TableInfo createIcebergTableWithGlueMetastore() { var provider = ApiUtils.recoverConflictLazy( () -> providerV1Api.addProvider(providerRequest), () -> providerV1Api.getProvider(providerRequest.getName())); + var schemaRequest = createSchemaRequest(TableFormat.iceberg); + var shareRequest = createShareRequest(); + ignoreConflict(() -> schemaV1Api.createSchema(shareRequest.getName(), schemaRequest)); var createTableRequest = createIcebergTableRequest(); + ignoreConflict(() -> schemaV1Api.addTableToSchema( + shareRequest.getName(), + schemaRequest, + addTableToSchemaRequest(providerRequest.getName(), createTableRequest.getName()))); return ApiUtils.recoverConflictLazy( () -> tableV1Api.createTableInProvider(provider.getName(), createTableRequest), () -> tableV1Api.describeTableInProvider(provider.getName(), createTableRequest.getName())); diff --git a/client-spark/src/test/java/io/whitefox/api/utils/TablePath.java b/client-spark/src/test/java/io/whitefox/api/utils/TablePath.java index 63a910545..e3186f693 100644 --- a/client-spark/src/test/java/io/whitefox/api/utils/TablePath.java +++ b/client-spark/src/test/java/io/whitefox/api/utils/TablePath.java @@ -4,6 +4,10 @@ public class TablePath { + public static String getIcebergTablePath(URL resource) { + return String.format("%s#%s.%s.%s", resource, "s3share", "s3schemaiceberg", "s3IcebergTable1"); + } + public static String getDeltaTablePath(URL resource) { return String.format("%s#%s.%s.%s", resource, "s3share", "s3schemadelta", "s3Table1"); } From c589fe6f3c7b78a570b82a79e3d7d7349e9eeb45 Mon Sep 17 00:00:00 2001 From: Aleksandar Milosevic Date: Mon, 29 Jan 2024 22:35:18 +0100 Subject: [PATCH 3/3] fixed catalog loading --- .../io/whitefox/api/client/ITDeltaSharingClient.java | 4 +++- .../test/java/io/whitefox/api/utils/S3TestConfig.java | 11 +++++++++-- .../whitefox/api/utils/StorageManagerInitializer.java | 9 +++++---- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/client-spark/src/test/java/io/whitefox/api/client/ITDeltaSharingClient.java b/client-spark/src/test/java/io/whitefox/api/client/ITDeltaSharingClient.java index ab595325c..7a923b181 100644 --- a/client-spark/src/test/java/io/whitefox/api/client/ITDeltaSharingClient.java +++ b/client-spark/src/test/java/io/whitefox/api/client/ITDeltaSharingClient.java @@ -44,7 +44,7 @@ void showS3IcebergTable1withQueryTableApi() { storageManagerInitializer.createIcebergTableWithGlueMetastore(); var ds = spark.read().format("deltaSharing").load(icebergTablePath); var expectedSchema = new StructType(new StructField[] { - new StructField("id", DataType.fromDDL("long"), true, new Metadata(emptyScalaMap())) + new StructField("id", DataType.fromDDL("long"), false, new Metadata(emptyScalaMap())) }); var expectedData = spark .createDataFrame( @@ -70,6 +70,8 @@ void showS3Table1withQueryTableApi() { var expectedSchema = new StructType(new StructField[] { new StructField("id", DataType.fromDDL("long"), true, new Metadata(emptyScalaMap())) }); + + ds.show(); var expectedData = spark .createDataFrame( List.of( diff --git a/client-spark/src/test/java/io/whitefox/api/utils/S3TestConfig.java b/client-spark/src/test/java/io/whitefox/api/utils/S3TestConfig.java index 3f1a15f85..72b0a59ce 100644 --- a/client-spark/src/test/java/io/whitefox/api/utils/S3TestConfig.java +++ b/client-spark/src/test/java/io/whitefox/api/utils/S3TestConfig.java @@ -4,6 +4,7 @@ public class S3TestConfig { private final String region; private final String accessKey; private final String secretKey; + private final String glueCatalogId; public String getRegion() { return region; @@ -17,16 +18,22 @@ public String getSecretKey() { return secretKey; } - public S3TestConfig(String region, String accessKey, String secretKey) { + public String getGlueCatalogId() { + return glueCatalogId; + } + + public S3TestConfig(String region, String accessKey, String secretKey, String glueCatalogId) { this.region = region; this.accessKey = accessKey; this.secretKey = secretKey; + this.glueCatalogId = glueCatalogId; } public static S3TestConfig loadFromEnv() { return new S3TestConfig( System.getenv().get("WHITEFOX_TEST_AWS_REGION"), System.getenv().get("WHITEFOX_TEST_AWS_ACCESS_KEY_ID"), - System.getenv().get("WHITEFOX_TEST_AWS_SECRET_ACCESS_KEY")); + System.getenv().get("WHITEFOX_TEST_AWS_SECRET_ACCESS_KEY"), + System.getenv().get("WHITEFOX_TEST_GLUE_CATALOG_ID")); } } diff --git a/client-spark/src/test/java/io/whitefox/api/utils/StorageManagerInitializer.java b/client-spark/src/test/java/io/whitefox/api/utils/StorageManagerInitializer.java index 2a91d6991..8824003bd 100644 --- a/client-spark/src/test/java/io/whitefox/api/utils/StorageManagerInitializer.java +++ b/client-spark/src/test/java/io/whitefox/api/utils/StorageManagerInitializer.java @@ -62,13 +62,14 @@ public TableInfo createIcebergTableWithGlueMetastore() { var shareRequest = createShareRequest(); ignoreConflict(() -> schemaV1Api.createSchema(shareRequest.getName(), schemaRequest)); var createTableRequest = createIcebergTableRequest(); + ApiUtils.recoverConflictLazy( + () -> tableV1Api.createTableInProvider(provider.getName(), createTableRequest), + () -> tableV1Api.describeTableInProvider(provider.getName(), createTableRequest.getName())); ignoreConflict(() -> schemaV1Api.addTableToSchema( shareRequest.getName(), schemaRequest, addTableToSchemaRequest(providerRequest.getName(), createTableRequest.getName()))); - return ApiUtils.recoverConflictLazy( - () -> tableV1Api.createTableInProvider(provider.getName(), createTableRequest), - () -> tableV1Api.describeTableInProvider(provider.getName(), createTableRequest.getName())); + return tableV1Api.describeTableInProvider(provider.getName(), createTableRequest.getName()); } private String createSchemaRequest(TableFormat tableFormat) { @@ -129,7 +130,7 @@ private CreateMetastore createMetastoreRequest( .type(type) .skipValidation(true) .properties(new MetastoreProperties(new GlueProperties() - .catalogId("catalogId") // TODO + .catalogId(s3TestConfig.getGlueCatalogId()) .credentials(new SimpleAwsCredentials() .region(s3TestConfig.getRegion()) .awsAccessKeyId(s3TestConfig.getAccessKey())