Minor improvements to tests and others #23518

Open · wants to merge 5 commits into master
10 changes: 3 additions & 7 deletions core/trino-main/src/main/java/io/trino/sql/DynamicFilters.java
@@ -150,20 +150,16 @@ public static Expression replaceDynamicFilterId(Call dynamicFilterFunctionCall,

public static boolean isDynamicFilter(Expression expression)
{
return getDescriptor(expression).isPresent();
return (expression instanceof Call call) && isDynamicFilterFunction(call);
}

public static Optional<Descriptor> getDescriptor(Expression expression)
{
if (!(expression instanceof Call call)) {
if (!isDynamicFilter(expression)) {
return Optional.empty();
}

if (!isDynamicFilterFunction(call)) {
return Optional.empty();
}

List<Expression> arguments = call.arguments();
List<Expression> arguments = ((Call) expression).arguments();
checkArgument(arguments.size() == 4, "invalid arguments count: %s", arguments.size());

Expression probeSymbol = arguments.get(0);
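Note on this hunk: the add/remove markers were lost in this view. The `getDescriptor(expression).isPresent()` line and the two separate guard blocks are the removed code; the single-expression `instanceof` check and the `isDynamicFilter`-based guard are their replacements. A hedged sketch of how the two methods read after the change (the Descriptor construction that follows the argument check is unchanged and replaced by a placeholder so the sketch compiles):

    public static boolean isDynamicFilter(Expression expression)
    {
        // Pattern-matching instanceof: `call` is bound only when the test passes, so the
        // check no longer builds a full Descriptor just to answer a boolean question.
        return (expression instanceof Call call) && isDynamicFilterFunction(call);
    }

    public static Optional<Descriptor> getDescriptor(Expression expression)
    {
        if (!isDynamicFilter(expression)) {
            return Optional.empty();
        }

        // The cast is safe because isDynamicFilter only returns true for Call expressions.
        List<Expression> arguments = ((Call) expression).arguments();
        checkArgument(arguments.size() == 4, "invalid arguments count: %s", arguments.size());

        // Placeholder: the real method goes on to unpack the four arguments and return
        // Optional.of(new Descriptor(...)), unchanged by this PR and not shown in the hunk.
        throw new UnsupportedOperationException("remainder elided in this sketch");
    }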
TestAddLocalExchangesForTaskScaleWriters.java
@@ -18,6 +18,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.trino.Session;
import io.trino.Session.SessionBuilder;
import io.trino.connector.MockConnector;
import io.trino.connector.MockConnectorColumnHandle;
import io.trino.connector.MockConnectorFactory;
@@ -63,7 +64,7 @@
import static io.trino.sql.planner.plan.ExchangeNode.Scope.REMOTE;
import static io.trino.sql.planner.plan.ExchangeNode.Type.GATHER;
import static io.trino.sql.planner.plan.ExchangeNode.Type.REPARTITION;
import static io.trino.testing.TestingSession.testSessionBuilder;
import static io.trino.testing.TestingSession.testSession;

public class TestAddLocalExchangesForTaskScaleWriters
extends BasePlanTest
@@ -73,7 +74,7 @@ public class TestAddLocalExchangesForTaskScaleWriters
@Override
protected PlanTester createPlanTester()
{
PlanTester planTester = PlanTester.create(testSessionBuilder().build());
PlanTester planTester = PlanTester.create(testSession());
planTester.createCatalog(
"mock_with_scaled_writers",
createConnectorFactory("mock_with_scaled_writers", true, true),
@@ -150,7 +151,7 @@ public void testLocalScaledUnpartitionedWriterDistribution()
{
assertDistributedPlan(
"INSERT INTO unpartitioned_table SELECT * FROM source_table",
testSessionBuilder()
testingSessionBuilder()
.setCatalog("mock_without_multiple_writer_per_partition")
.setSchema("mock")
.setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "true")
@@ -166,7 +167,7 @@ public void testLocalScaledUnpartitionedWriterDistribution()

assertDistributedPlan(
"INSERT INTO unpartitioned_table SELECT * FROM source_table",
testSessionBuilder()
testingSessionBuilder()
.setCatalog("mock_without_multiple_writer_per_partition")
.setSchema("mock")
.setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "false")
@@ -186,7 +187,7 @@ public void testLocalScaledUnpartitionedWriterWithPerTaskScalingDisabled()
{
assertDistributedPlan(
"INSERT INTO unpartitioned_table SELECT * FROM source_table",
testSessionBuilder()
testingSessionBuilder()
.setCatalog("mock_without_scaled_writers")
.setSchema("mock")
.setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "true")
@@ -202,7 +203,7 @@ public void testLocalScaledUnpartitionedWriterWithPerTaskScalingDisabled()

assertDistributedPlan(
"INSERT INTO unpartitioned_table SELECT * FROM source_table",
testSessionBuilder()
testingSessionBuilder()
.setCatalog("mock_without_scaled_writers")
.setSchema("mock")
.setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "false")
@@ -229,7 +230,7 @@ public void testLocalScaledPartitionedWriterWithoutSupportForMultipleWritersPerP

assertDistributedPlan(
"INSERT INTO connector_partitioned_table SELECT * FROM source_table",
testSessionBuilder()
testingSessionBuilder()
.setCatalog(catalogName)
.setSchema("mock")
.setSystemProperty(TASK_SCALE_WRITERS_ENABLED, String.valueOf(taskScaleWritersEnabled))
@@ -257,7 +258,7 @@ public void testLocalScaledPartitionedWriterWithPerTaskScalingDisabled()

assertDistributedPlan(
"INSERT INTO connector_partitioned_table SELECT * FROM source_table",
testSessionBuilder()
testingSessionBuilder()
.setCatalog(catalogName)
.setSchema("mock")
.setSystemProperty(TASK_SCALE_WRITERS_ENABLED, String.valueOf(taskScaleWritersEnabled))
@@ -278,7 +279,7 @@ public void testLocalScaledPartitionedWriterForSystemPartitioningWithEnforcedPre
{
assertDistributedPlan(
"INSERT INTO system_partitioned_table SELECT * FROM source_table",
testSessionBuilder()
testingSessionBuilder()
.setCatalog("mock_with_scaled_writers")
.setSchema("mock")
// Enforce preferred partitioning
@@ -296,7 +297,7 @@ public void testLocalScaledPartitionedWriterForSystemPartitioningWithEnforcedPre

assertDistributedPlan(
"INSERT INTO system_partitioned_table SELECT * FROM source_table",
testSessionBuilder()
testingSessionBuilder()
.setCatalog("mock_with_scaled_writers")
.setSchema("mock")
// Enforce preferred partitioning
@@ -329,7 +330,7 @@ public void testLocalScaledPartitionedWriterForConnectorPartitioning()

assertDistributedPlan(
"INSERT INTO connector_partitioned_table SELECT * FROM source_table",
testSessionBuilder()
testingSessionBuilder()
.setCatalog(catalogName)
.setSchema("mock")
.setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "true")
@@ -345,7 +346,7 @@ public void testLocalScaledPartitionedWriterForConnectorPartitioning()

assertDistributedPlan(
"INSERT INTO connector_partitioned_table SELECT * FROM source_table",
testSessionBuilder()
testingSessionBuilder()
.setCatalog(catalogName)
.setSchema("mock")
.setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "false")
@@ -365,7 +366,7 @@ public void testLocalScaledPartitionedWriterWithEnforcedLocalPreferredPartitioni
{
assertDistributedPlan(
"INSERT INTO system_partitioned_table SELECT * FROM source_table",
testSessionBuilder()
testingSessionBuilder()
.setCatalog("mock_with_scaled_writers")
.setSchema("mock")
.setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "true")
@@ -381,7 +382,7 @@ public void testLocalScaledPartitionedWriterWithEnforcedLocalPreferredPartitioni

assertDistributedPlan(
"INSERT INTO system_partitioned_table SELECT * FROM source_table",
testSessionBuilder()
testingSessionBuilder()
.setCatalog("mock_with_scaled_writers")
.setSchema("mock")
.setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "false")
@@ -401,7 +402,7 @@ public void testTableExecuteLocalScalingDisabledForPartitionedTable()
{
@Language("SQL") String query = "ALTER TABLE system_partitioned_table EXECUTE optimize(file_size_threshold => '10MB')";

Session session = Session.builder(getPlanTester().getDefaultSession())
Session session = testingSessionBuilder()
.setCatalog("mock_with_scaled_writers")
.setSchema("mock")
.setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "true")
@@ -422,7 +423,7 @@ public void testTableExecuteLocalScalingDisabledForUnpartitionedTable()
{
@Language("SQL") String query = "ALTER TABLE unpartitioned_table EXECUTE optimize(file_size_threshold => '10MB')";

Session session = Session.builder(getPlanTester().getDefaultSession())
Session session = testingSessionBuilder()
.setCatalog("mock_with_scaled_writers")
.setSchema("mock")
.setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "true")
@@ -437,4 +438,9 @@ public void testTableExecuteLocalScalingDisabledForUnpartitionedTable()
exchange(REMOTE, REPARTITION, SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION,
node(TableScanNode.class))))));
}

private SessionBuilder testingSessionBuilder()
{
return Session.builder(getPlanTester().getDefaultSession());
}
}
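In each of the hunks above, the `testSessionBuilder()` line is the removed side and the `testingSessionBuilder()` line directly below it is its replacement (the diff markers were lost in this view). Unlike the old static `testSessionBuilder()`, the new private helper starts from the plan tester's default session (see the helper body above), so the tests inherit whatever defaults `createPlanTester()` set up instead of rebuilding a session from scratch. A hedged usage sketch, with the catalog, schema, and property values taken from the hunks above:

    Session session = testingSessionBuilder()
            .setCatalog("mock_with_scaled_writers")
            .setSchema("mock")
            .setSystemProperty(TASK_SCALE_WRITERS_ENABLED, "true")
            .build();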
6 changes: 6 additions & 0 deletions plugin/trino-iceberg/pom.xml
@@ -558,6 +558,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-tpcds</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-tpch</artifactId>
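The new test-scoped trino-tpcds dependency backs the `TpcdsPlugin` that IcebergQueryRunner (next file) can now install. A hedged sketch of how a test or main method might opt in, using only the builder method added below:

    QueryRunner queryRunner = IcebergQueryRunner.builder()
            .setTpcdsCatalogEnabled(true)
            .build();
    // The runner always exposes the "tpch" catalog; with the flag set it also creates "tpcds".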
IcebergQueryRunner.java
@@ -25,6 +25,7 @@
import io.trino.plugin.hive.containers.HiveMinioDataLake;
import io.trino.plugin.iceberg.catalog.jdbc.TestingIcebergJdbcServer;
import io.trino.plugin.iceberg.catalog.rest.TestingPolarisCatalog;
import io.trino.plugin.tpcds.TpcdsPlugin;
import io.trino.plugin.tpch.TpchPlugin;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.QueryRunner;
@@ -84,6 +85,7 @@ public static class Builder
private Optional<File> metastoreDirectory = Optional.empty();
private ImmutableMap.Builder<String, String> icebergProperties = ImmutableMap.builder();
private Optional<SchemaInitializer> schemaInitializer = Optional.empty();
private boolean tpcdsCatalogEnabled;

protected Builder()
{
@@ -139,6 +141,12 @@ public Builder setSchemaInitializer(SchemaInitializer schemaInitializer)
return self();
}

public Builder setTpcdsCatalogEnabled(boolean tpcdsCatalogEnabled)
{
this.tpcdsCatalogEnabled = tpcdsCatalogEnabled;
return self();
}

@Override
public DistributedQueryRunner build()
throws Exception
@@ -148,6 +156,11 @@ public DistributedQueryRunner build()
queryRunner.installPlugin(new TpchPlugin());
queryRunner.createCatalog("tpch", "tpch");

if (tpcdsCatalogEnabled) {
queryRunner.installPlugin(new TpcdsPlugin());
queryRunner.createCatalog("tpcds", "tpcds");
}

if (!icebergProperties.buildOrThrow().containsKey("fs.hadoop.enabled")) {
icebergProperties.put("fs.hadoop.enabled", "true");
}
@@ -166,6 +179,13 @@ public DistributedQueryRunner build()
}
}

private static Builder icebergQueryRunnerMainBuilder()
{
return IcebergQueryRunner.builder()
.addCoordinatorProperty("http-server.http.port", "8080")
.setTpcdsCatalogEnabled(true);
}

public static final class IcebergRestQueryRunnerMain
{
private IcebergRestQueryRunnerMain() {}
@@ -186,8 +206,7 @@ public static void main(String[] args)
testServer.start();

@SuppressWarnings("resource")
QueryRunner queryRunner = IcebergQueryRunner.builder()
.addCoordinatorProperty("http-server.http.port", "8080")
QueryRunner queryRunner = icebergQueryRunnerMainBuilder()
.setBaseDataDir(Optional.of(warehouseLocation.toPath()))
.setIcebergProperties(ImmutableMap.of(
"iceberg.catalog.type", "rest",
@@ -215,8 +234,7 @@ public static void main(String[] args)
TestingPolarisCatalog polarisCatalog = new TestingPolarisCatalog(warehouseLocation.getPath());

@SuppressWarnings("resource")
QueryRunner queryRunner = IcebergQueryRunner.builder()
.addCoordinatorProperty("http-server.http.port", "8080")
QueryRunner queryRunner = icebergQueryRunnerMainBuilder()
.setBaseDataDir(Optional.of(warehouseLocation.toPath()))
.addIcebergProperty("iceberg.catalog.type", "rest")
.addIcebergProperty("iceberg.rest-catalog.uri", polarisCatalog.restUri() + "/api/catalog")
@@ -243,8 +261,7 @@ public static void main(String[] args)
// Requires AWS credentials, which can be provided any way supported by the DefaultProviderChain
// See https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html#credentials-default
@SuppressWarnings("resource")
QueryRunner queryRunner = IcebergQueryRunner.builder()
.addCoordinatorProperty("http-server.http.port", "8080")
QueryRunner queryRunner = icebergQueryRunnerMainBuilder()
.setIcebergProperties(ImmutableMap.of("iceberg.catalog.type", "glue"))
.build();

@@ -310,8 +327,7 @@ public static void main(String[] args)
minio.createBucket(bucketName);

@SuppressWarnings("resource")
QueryRunner queryRunner = IcebergQueryRunner.builder()
.addCoordinatorProperty("http-server.http.port", "8080")
QueryRunner queryRunner = icebergQueryRunnerMainBuilder()
.setIcebergProperties(Map.of(
"iceberg.catalog.type", "TESTING_FILE_METASTORE",
"hive.metastore.catalog.dir", "s3://%s/".formatted(bucketName),
@@ -364,8 +380,7 @@ public static void main(String[] args)
hiveHadoop.start();

@SuppressWarnings("resource")
QueryRunner queryRunner = IcebergQueryRunner.builder()
.addCoordinatorProperty("http-server.http.port", "8080")
QueryRunner queryRunner = icebergQueryRunnerMainBuilder()
.setIcebergProperties(Map.of(
"iceberg.catalog.type", "HIVE_METASTORE",
"hive.metastore.uri", hiveHadoop.getHiveMetastoreEndpoint().toString(),
@@ -400,8 +415,7 @@ public static void main(String[] args)
TestingIcebergJdbcServer server = new TestingIcebergJdbcServer();

@SuppressWarnings("resource")
QueryRunner queryRunner = IcebergQueryRunner.builder()
.addCoordinatorProperty("http-server.http.port", "8080")
QueryRunner queryRunner = icebergQueryRunnerMainBuilder()
.setIcebergProperties(ImmutableMap.<String, String>builder()
.put("iceberg.catalog.type", "jdbc")
.put("iceberg.jdbc-catalog.driver-class", "org.postgresql.Driver")
@@ -428,8 +442,7 @@ public static void main(String[] args)
throws Exception
{
@SuppressWarnings("resource")
QueryRunner queryRunner = IcebergQueryRunner.builder()
.addCoordinatorProperty("http-server.http.port", "8080")
QueryRunner queryRunner = icebergQueryRunnerMainBuilder()
.setIcebergProperties(ImmutableMap.<String, String>builder()
.put("iceberg.catalog.type", "snowflake")
.put("fs.native-s3.enabled", "true")
@@ -466,8 +479,7 @@ public static void main(String[] args)
metastoreDir.deleteOnExit();

@SuppressWarnings("resource")
QueryRunner queryRunner = IcebergQueryRunner.builder()
.addCoordinatorProperty("http-server.http.port", "8080")
QueryRunner queryRunner = icebergQueryRunnerMainBuilder()
.addIcebergProperty("hive.metastore.catalog.dir", metastoreDir.toURI().toString())
.setInitialTables(TpchTable.getTables())
.build();
@@ -495,8 +507,7 @@ public static void main(String[] args)
metastoreDir.deleteOnExit();

@SuppressWarnings("resource")
QueryRunner queryRunner = IcebergQueryRunner.builder()
.addCoordinatorProperty("http-server.http.port", "8080")
QueryRunner queryRunner = icebergQueryRunnerMainBuilder()
.addIcebergProperty("hive.metastore.catalog.dir", metastoreDir.toURI().toString())
.setExtraProperties(ImmutableMap.<String, String>builder()
.put("retry-policy", "TASK")
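In each main() hunk above, the paired `IcebergQueryRunner.builder()` and `.addCoordinatorProperty("http-server.http.port", "8080")` lines are the removed code and the single `icebergQueryRunnerMainBuilder()` call is their replacement, so every standalone runner now gets the fixed HTTP port and the TPC-DS catalog from one place. Taking the Glue runner as the example (it is the shortest), a hedged sketch of the statement as it reads after the change:

    @SuppressWarnings("resource")
    QueryRunner queryRunner = icebergQueryRunnerMainBuilder()
            .setIcebergProperties(ImmutableMap.of("iceberg.catalog.type", "glue"))
            .build();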
@@ -107,23 +107,23 @@ protected String getConfigurationDirectory(String dockerImage)
private String getConfigFileFor(String dockerImage)
{
if (getVersionFromDockerImageName(dockerImage) < 369) {
return "config-with-system-memory.properties";
return "config-pre369.properties";
}
return "config.properties";
}

private String getHiveConfigFor(String dockerImage)
{
if (getVersionFromDockerImageName(dockerImage) < 359) {
return "hive-hadoop2.properties";
return "hive-pre359.properties";
}
return "hive.properties";
}

private String getIcebergConfigFor(String dockerImage)
{
if (getVersionFromDockerImageName(dockerImage) < 359) {
return "iceberg_old.properties";
return "iceberg-pre359.properties";
}
return "iceberg.properties";
}