Skip to content

Commit

Permalink
fixup! Add support for add_files procedure in Iceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Jul 25, 2024
1 parent a056cc9 commit fe148d7
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ public void doAddFiles(
checkProcedureArgument(
table.schemas().size() == sourceTable.getDataColumns().size(),
"Data column count mismatch: %d vs %d", table.schemas().size(), sourceTable.getDataColumns().size());
for (Column sourceColumn : sourceTable.getDataColumns()) {
for (Column sourceColumn : Stream.concat(sourceTable.getDataColumns().stream(), sourceTable.getPartitionColumns().stream()).toList()) {
Types.NestedField targetColumn = schema.caseInsensitiveFindField(sourceColumn.getName());
if (targetColumn == null) {
throw new TrinoException(COLUMN_NOT_FOUND, "Column '%s' does not exist".formatted(sourceColumn.getName()));
Expand Down Expand Up @@ -308,7 +308,7 @@ else if (location != null && format != null) {
try {
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
ImmutableList.Builder<DataFile> dataFilesBuilder = ImmutableList.builder();
if (!partitionSpec.isPartitioned()) {
if (partitionSpec.isUnpartitioned()) {
log.debug("Building data files from %s", sourceLocation);
dataFilesBuilder.addAll(buildDataFiles(fileSystem, sourceType, recursive, storageFormat, sourceLocation, partitionSpec, Optional.empty(), schema, requiredFields));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

import static io.trino.testing.TestingNames.randomNameSuffix;

Expand Down Expand Up @@ -111,12 +112,22 @@ void testAddFilesNotNullViolation()

// Verifies add_files works when the Hive source table and the Iceberg target
// table use different file formats, for every ordered pair of distinct formats.
@Test
void testAddFilesDifferentFileFormat()
{
    String[] formats = {"PARQUET", "ORC", "AVRO"};
    // Iterate (hive, iceberg) combinations; skip the same-format pairs,
    // which are covered by other tests. Iteration order matches the original
    // explicit call list: PARQUET/ORC, PARQUET/AVRO, ORC/PARQUET, ORC/AVRO,
    // AVRO/PARQUET, AVRO/ORC.
    for (String hiveFormat : formats) {
        for (String icebergFormat : formats) {
            if (!hiveFormat.equals(icebergFormat)) {
                testAddFilesDifferentFileFormat(hiveFormat, icebergFormat);
            }
        }
    }
}

private void testAddFilesDifferentFileFormat(String hiveFormat, String icebergFormat)
{
String hiveTableName = "test_add_files_" + randomNameSuffix();
String icebergTableName = "test_add_files_" + randomNameSuffix();

assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + " WITH (format = 'PARQUET') AS SELECT 1 x", 1);
assertUpdate("CREATE TABLE hive.tpch." + hiveTableName + " WITH (format = 'ORC') AS SELECT 2 x", 1);
assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + " WITH (format = '" + icebergFormat + "') AS SELECT 1 x", 1);
assertUpdate("CREATE TABLE hive.tpch." + hiveTableName + " WITH (format = '" + hiveFormat + "') AS SELECT 2 x", 1);

assertUpdate("CALL iceberg.system.add_files('tpch', '" + icebergTableName + "', 'tpch', '" + hiveTableName + "')");

Expand Down Expand Up @@ -147,51 +158,53 @@ void testAddFilesAcrossSchema()
}

@Test
void testAddFilesOrcDifferentColumnDefinitions()
void testAddFilesTypeMismatch()
{
String hiveTableName = "test_add_files_" + randomNameSuffix();
String icebergTableName = "test_add_files_" + randomNameSuffix();

assertUpdate("CREATE TABLE hive.tpch." + hiveTableName + " WITH (format = 'ORC') AS SELECT 1 x", 1);
assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + " WITH (format = 'ORC') AS SELECT 2 y", 1);
assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + " WITH (format = 'ORC') AS SELECT '1' x", 1);
assertUpdate("CREATE TABLE hive.tpch." + hiveTableName + " WITH (format = 'ORC') AS SELECT 2 x", 1);

assertQueryFails(
"CALL iceberg.system.add_files('tpch', '" + icebergTableName + "', 'tpch', '" + hiveTableName + "')",
"Column 'x' does not exist");
"Expected target 'string' type, but got source 'int' type");

assertUpdate("DROP TABLE hive.tpch." + hiveTableName);
assertUpdate("DROP TABLE iceberg.tpch." + icebergTableName);
}

@Test
void testAddFilesParquetDifferentColumnDefinitions()
void testAddFilesDifferentDataColumnDefinitions()
{
String hiveTableName = "test_add_files_" + randomNameSuffix();
String icebergTableName = "test_add_files_" + randomNameSuffix();
for (String format : List.of("ORC", "PARQUET", "AVRO")) {
String hiveTableName = "test_add_files_" + randomNameSuffix();
String icebergTableName = "test_add_files_" + randomNameSuffix();

assertUpdate("CREATE TABLE hive.tpch." + hiveTableName + " WITH (format = 'PARQUET') AS SELECT 1 x", 1);
assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + " WITH (format = 'PARQUET') AS SELECT 2 y", 1);
assertUpdate("CREATE TABLE hive.tpch." + hiveTableName + " WITH (format = '" + format + "') AS SELECT 1 x", 1);
assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + " WITH (format = '" + format + "') AS SELECT 2 y", 1);

assertQueryFails(
"CALL iceberg.system.add_files('tpch', '" + icebergTableName + "', 'tpch', '" + hiveTableName + "')",
"Column 'x' does not exist");
assertQueryFails(
"CALL iceberg.system.add_files('tpch', '" + icebergTableName + "', 'tpch', '" + hiveTableName + "')",
"Column 'x' does not exist");

assertUpdate("DROP TABLE hive.tpch." + hiveTableName);
assertUpdate("DROP TABLE iceberg.tpch." + icebergTableName);
assertUpdate("DROP TABLE hive.tpch." + hiveTableName);
assertUpdate("DROP TABLE iceberg.tpch." + icebergTableName);
}
}

@Test
void testAddFilesAvroDifferentColumnDefinitions()
void testAddFilesDifferentPartitionColumnDefinitions()
{
String hiveTableName = "test_add_files_" + randomNameSuffix();
String icebergTableName = "test_add_files_" + randomNameSuffix();

assertUpdate("CREATE TABLE hive.tpch." + hiveTableName + " WITH (format = 'AVRO') AS SELECT 1 x", 1);
assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + " WITH (format = 'AVRO') AS SELECT 2 y", 1);
assertUpdate("CREATE TABLE hive.tpch." + hiveTableName + " WITH (partitioned_by = ARRAY['hive_part']) AS SELECT 1 x, 10 hive_part", 1);
assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + " WITH (partitioning = ARRAY['iceberg_part']) AS SELECT 2 x, 20 iceberg_part", 1);

assertQueryFails(
"CALL iceberg.system.add_files('tpch', '" + icebergTableName + "', 'tpch', '" + hiveTableName + "')",
"Column 'x' does not exist");
"Column 'hive_part' does not exist");

assertUpdate("DROP TABLE hive.tpch." + hiveTableName);
assertUpdate("DROP TABLE iceberg.tpch." + icebergTableName);
Expand Down Expand Up @@ -440,23 +453,6 @@ void testAddFilesToNonIcebergTable()
assertUpdate("DROP TABLE hive.tpch." + targetHiveTableName);
}

// Verifies that add_files rejects a source file whose column type conflicts
// with the target Iceberg schema, instead of silently importing it.
@Test
void testAddFilesTypeMismatch()
{
// Random suffixes keep table names unique across concurrent test runs.
String hiveTableName = "test_add_files_" + randomNameSuffix();
String icebergTableName = "test_add_files_" + randomNameSuffix();

// Target Iceberg column x is a string ('1'); source Hive column x is an int (2).
assertUpdate("CREATE TABLE iceberg.tpch." + icebergTableName + " WITH (format = 'ORC') AS SELECT '1' x", 1);
assertUpdate("CREATE TABLE hive.tpch." + hiveTableName + " WITH (format = 'ORC') AS SELECT 2 x", 1);

// The procedure must fail with a type-mismatch error for column x.
assertQueryFails(
"CALL iceberg.system.add_files('tpch', '" + icebergTableName + "', 'tpch', '" + hiveTableName + "')",
"Expected target 'string' type, but got source 'int' type");

assertUpdate("DROP TABLE hive.tpch." + hiveTableName);
assertUpdate("DROP TABLE iceberg.tpch." + icebergTableName);
}

@Test
void testAddDuplicatedFiles()
{
Expand Down

0 comments on commit fe148d7

Please sign in to comment.