Skip to content

Optimize column listing to not require fetching table properties #23429

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1082,7 +1082,7 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
for (SchemaTableName tableName : tables) {
try {
jdbcClient.getTableHandle(session, tableName)
.ifPresent(tableHandle -> columns.put(tableName, getTableMetadata(session, tableHandle).getColumns()));
.ifPresent(tableHandle -> columns.put(tableName, getColumnMetadata(session, tableHandle)));
}
catch (TableNotFoundException | AccessDeniedException e) {
// table disappeared during listing operation or user is not allowed to access it
Expand All @@ -1092,6 +1092,15 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
return columns.buildOrThrow();
}

public List<ColumnMetadata> getColumnMetadata(ConnectorSession session, JdbcTableHandle tableHandle)
{
return getColumnHandles(session, tableHandle).values()
.stream()
.map(JdbcColumnHandle.class::cast)
.map(JdbcColumnHandle::getColumnMetadata)
.collect(toImmutableList());
}

@Override
public Iterator<RelationColumnsMetadata> streamRelationColumns(ConnectorSession session, Optional<String> schemaName, UnaryOperator<Set<SchemaTableName>> relationFilter)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.jdbc;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import io.trino.plugin.base.mapping.IdentifierMapping;
import io.trino.plugin.jdbc.credential.EmptyCredentialProvider;
import io.trino.testing.QueryRunner;
import org.h2.Driver;
import org.junit.jupiter.api.Test;

import java.util.Optional;

import static io.trino.plugin.jdbc.H2QueryRunner.createH2QueryRunner;
import static io.trino.plugin.jdbc.TestingH2JdbcModule.createH2ConnectionUrl;
import static io.trino.spi.connector.ConnectorMetadata.MODIFYING_ROWS_MESSAGE;
import static io.trino.tpch.TpchTable.NATION;
import static io.trino.tpch.TpchTable.REGION;
import static java.util.Objects.requireNonNull;

// TODO: Implement dedicated test to count number of queries executed.
// This test approximates the number of I/O operations by counting number of times connection is opened since we almost always open a new connection to execute a query.
public class TestJdbcConnectionAccesses
extends BaseJdbcConnectionCreationTest
{
@Override
protected QueryRunner createQueryRunner()
throws Exception
{
String connectionUrl = createH2ConnectionUrl();
DriverConnectionFactory delegate = DriverConnectionFactory.builder(new Driver(), connectionUrl, new EmptyCredentialProvider()).build();
this.connectionFactory = new ConnectionCountingConnectionFactory(delegate);
return createH2QueryRunner(
ImmutableList.of(NATION, REGION),
ImmutableMap.of("connection-url", connectionUrl,
// disables connection reuse to approximate number of I/O operations since we almost always open a new connection to execute a query
"query.reuse-connection", "false"),
// to make sure we always open connections in the same way
ImmutableMap.of("node-scheduler.include-coordinator", "false"),
new TestingConnectionH2Module(connectionFactory));
}

@Test
public void testJdbcConnectionCreations()
{
assertJdbcConnections("SELECT * FROM nation LIMIT 1", 3, Optional.empty());
assertJdbcConnections("SELECT * FROM nation ORDER BY nationkey LIMIT 1", 3, Optional.empty());
assertJdbcConnections("SELECT * FROM nation WHERE nationkey = 1", 3, Optional.empty());
assertJdbcConnections("SELECT avg(nationkey) FROM nation", 3, Optional.empty());
assertJdbcConnections("SELECT * FROM nation, region", 6, Optional.empty());
assertJdbcConnections("SELECT * FROM nation n, region r WHERE n.regionkey = r.regionkey", 6, Optional.empty());
assertJdbcConnections("SELECT * FROM nation JOIN region USING(regionkey)", 6, Optional.empty());
assertJdbcConnections("SELECT * FROM information_schema.schemata", 1, Optional.empty());
assertJdbcConnections("SELECT * FROM information_schema.tables", 1, Optional.empty());
assertJdbcConnections("SELECT * FROM information_schema.columns", 5, Optional.empty());
assertJdbcConnections("SELECT * FROM nation", 3, Optional.empty());
assertJdbcConnections("CREATE TABLE copy_of_nation AS SELECT * FROM nation", 11, Optional.empty());
assertJdbcConnections("INSERT INTO copy_of_nation SELECT * FROM nation", 12, Optional.empty());
assertJdbcConnections("DELETE FROM copy_of_nation WHERE nationkey = 3", 3, Optional.empty());
assertJdbcConnections("UPDATE copy_of_nation SET name = 'POLAND' WHERE nationkey = 1", 3, Optional.empty());
assertJdbcConnections("MERGE INTO copy_of_nation n USING region r ON r.regionkey= n.regionkey WHEN MATCHED THEN DELETE", 4, Optional.of(MODIFYING_ROWS_MESSAGE));
assertJdbcConnections("DROP TABLE copy_of_nation", 2, Optional.empty());
assertJdbcConnections("SHOW SCHEMAS", 1, Optional.empty());
assertJdbcConnections("SHOW TABLES", 2, Optional.empty());
assertJdbcConnections("SHOW STATS FOR nation", 2, Optional.empty());
assertJdbcConnections("SELECT * FROM system.jdbc.columns WHERE table_cat = 'jdbc'", 5, Optional.empty());
}

private static class TestingConnectionH2Module
implements Module
{
private final ConnectionCountingConnectionFactory connectionCountingConnectionFactory;

TestingConnectionH2Module(ConnectionCountingConnectionFactory connectionCountingConnectionFactory)
{
this.connectionCountingConnectionFactory = requireNonNull(connectionCountingConnectionFactory, "connectionCountingConnectionFactory is null");
}

@Override
public void configure(Binder binder) {}

@Provides
@Singleton
@ForBaseJdbc
public static JdbcClient provideJdbcClient(BaseJdbcConfig config, ConnectionFactory connectionFactory, QueryBuilder queryBuilder, IdentifierMapping identifierMapping)
{
return new TestingH2JdbcClient(config, connectionFactory, queryBuilder, identifierMapping);
}

@Provides
@Singleton
@ForBaseJdbc
public ConnectionFactory getConnectionFactory()
{
return connectionCountingConnectionFactory;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public void testJdbcConnectionCreations()
assertJdbcConnections("SHOW SCHEMAS", 1, Optional.empty());
assertJdbcConnections("SHOW TABLES", 1, Optional.empty());
assertJdbcConnections("SHOW STATS FOR nation", 1, Optional.empty());
assertJdbcConnections("SELECT * FROM system.jdbc.columns WHERE table_cat = 'jdbc'", 1, Optional.empty());
}

private static class TestingConnectionH2Module
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
igniteClient.getTableProperties(session, handle));
}

private List<ColumnMetadata> getColumnMetadata(ConnectorSession session, JdbcTableHandle handle)
@Override
public List<ColumnMetadata> getColumnMetadata(ConnectorSession session, JdbcTableHandle handle)
{
return igniteClient.getColumns(session, handle).stream()
.filter(column -> !IGNITE_DUMMY_ID.equalsIgnoreCase(column.getColumnName()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.google.common.collect.ImmutableMap;
import io.trino.Session;
import io.trino.spi.security.Identity;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.QueryRunner;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
Expand All @@ -26,7 +25,6 @@

import java.util.Map;

import static io.airlift.testing.Closeables.closeAllSuppress;
import static io.trino.testing.TestingSession.testSessionBuilder;
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
import static org.junit.jupiter.api.parallel.ExecutionMode.CONCURRENT;
Expand All @@ -49,20 +47,13 @@ public void createQueryRunner()
throws Exception
{
mySqlServer = new TestingMySqlServer();
try {
queryRunner = DistributedQueryRunner.builder(testSessionBuilder().build()).build();
queryRunner.installPlugin(new MySqlPlugin());
Map<String, String> properties = ImmutableMap.<String, String>builder()
.put("connection-url", mySqlServer.getJdbcUrl())
.put("user-credential-name", "mysql.user")
.put("password-credential-name", "mysql.password")
.buildOrThrow();
queryRunner.createCatalog("mysql", "mysql", properties);
}
catch (Exception e) {
closeAllSuppress(e, queryRunner, mySqlServer);
throw e;
}
queryRunner = MySqlQueryRunner.builder(mySqlServer)
.addConnectorProperties(ImmutableMap.<String, String>builder()
.put("connection-url", mySqlServer.getJdbcUrl())
.put("user-credential-name", "mysql.user")
.put("password-credential-name", "mysql.password")
.buildOrThrow())
.build();
}

@AfterAll
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
phoenixClient.getTableProperties(session, handle));
}

private List<ColumnMetadata> getColumnMetadata(ConnectorSession session, JdbcTableHandle handle)
@Override
public List<ColumnMetadata> getColumnMetadata(ConnectorSession session, JdbcTableHandle handle)
{
return phoenixClient.getColumns(session, handle).stream()
.filter(column -> !ROWKEY.equalsIgnoreCase(column.getColumnName()))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.postgresql;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.trino.plugin.jdbc.BaseJdbcConnectionCreationTest;
import io.trino.plugin.jdbc.ConnectionFactory;
import io.trino.plugin.jdbc.DriverConnectionFactory;
import io.trino.plugin.jdbc.ForBaseJdbc;
import io.trino.plugin.jdbc.JdbcPlugin;
import io.trino.plugin.jdbc.credential.CredentialProvider;
import io.trino.plugin.jdbc.credential.StaticCredentialProvider;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.QueryRunner;
import org.junit.jupiter.api.Test;
import org.postgresql.Driver;

import java.util.Optional;
import java.util.Properties;

import static io.airlift.configuration.ConfigurationAwareModule.combine;
import static io.trino.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME;
import static io.trino.spi.connector.ConnectorMetadata.MODIFYING_ROWS_MESSAGE;
import static io.trino.testing.QueryAssertions.copyTpchTables;
import static io.trino.tpch.TpchTable.NATION;
import static io.trino.tpch.TpchTable.REGION;
import static java.util.Objects.requireNonNull;

// TODO: Implement dedicated test to count number of queries executed.
// This test approximates the number of I/O operations by counting number of times connection is opened since we almost always open a new connection to execute a query.
public class TestPostgreSqlJdbcConnectionAccesses
extends BaseJdbcConnectionCreationTest
{
@Override
protected QueryRunner createQueryRunner()
throws Exception
{
TestingPostgreSqlServer postgreSqlServer = closeAfterClass(new TestingPostgreSqlServer());
this.connectionFactory = getConnectionCountingConnectionFactory(postgreSqlServer);
DistributedQueryRunner queryRunner = PostgreSqlQueryRunner.builder(postgreSqlServer)
// to make sure we always open connections in the same way
.addCoordinatorProperty("node-scheduler.include-coordinator", "false")
.amendSession(sessionBuilder -> sessionBuilder.setCatalog("counting_postgresql"))
.setAdditionalSetup(runner -> {
runner.installPlugin(new JdbcPlugin(
"counting_postgresql",
combine(new PostgreSqlClientModule(), new TestingPostgreSqlModule(connectionFactory))));
runner.createCatalog("counting_postgresql", "counting_postgresql", ImmutableMap.of(
"connection-url", postgreSqlServer.getJdbcUrl(),
"connection-user", postgreSqlServer.getUser(),
"connection-password", postgreSqlServer.getPassword(),
// disables connection reuse to approximate number of I/O operations since we almost always open a new connection to execute a query
"query.reuse-connection", "false"));
})
.build();
copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, ImmutableList.of(NATION, REGION));
return queryRunner;
}

private static ConnectionCountingConnectionFactory getConnectionCountingConnectionFactory(TestingPostgreSqlServer postgreSqlServer)
{
Properties connectionProperties = new Properties();
CredentialProvider credentialProvider = new StaticCredentialProvider(
Optional.of(postgreSqlServer.getUser()),
Optional.of(postgreSqlServer.getPassword()));
DriverConnectionFactory delegate = DriverConnectionFactory.builder(new Driver(), postgreSqlServer.getJdbcUrl(), credentialProvider)
.setConnectionProperties(connectionProperties)
.build();
return new ConnectionCountingConnectionFactory(delegate);
}

@Test
public void testJdbcConnectionCreations()
{
assertJdbcConnections("SELECT * FROM nation LIMIT 1", 5, Optional.empty());
assertJdbcConnections("SELECT * FROM nation ORDER BY nationkey LIMIT 1", 5, Optional.empty());
assertJdbcConnections("SELECT * FROM nation WHERE nationkey = 1", 5, Optional.empty());
assertJdbcConnections("SELECT avg(nationkey) FROM nation", 4, Optional.empty());
assertJdbcConnections("SELECT * FROM nation, region", 6, Optional.empty());
assertJdbcConnections("SELECT * FROM nation n, region r WHERE n.regionkey = r.regionkey", 9, Optional.empty());
assertJdbcConnections("SELECT * FROM nation JOIN region USING(regionkey)", 10, Optional.empty());
assertJdbcConnections("SELECT * FROM information_schema.schemata", 1, Optional.empty());
assertJdbcConnections("SELECT * FROM information_schema.tables", 1, Optional.empty());
assertJdbcConnections("SELECT * FROM information_schema.columns", 5, Optional.empty());
assertJdbcConnections("SELECT * FROM nation", 3, Optional.empty());
assertJdbcConnections("SELECT * FROM TABLE (system.query(query => 'SELECT * FROM tpch.nation'))", 2, Optional.empty());
assertJdbcConnections("CREATE TABLE copy_of_nation AS SELECT * FROM nation", 15, Optional.empty());
assertJdbcConnections("INSERT INTO copy_of_nation SELECT * FROM nation", 13, Optional.empty());
assertJdbcConnections("DELETE FROM copy_of_nation WHERE nationkey = 3", 4, Optional.empty());
assertJdbcConnections("UPDATE copy_of_nation SET name = 'POLAND' WHERE nationkey = 1", 4, Optional.empty());
assertJdbcConnections("MERGE INTO copy_of_nation n USING region r ON r.regionkey= n.regionkey WHEN MATCHED THEN DELETE", 5, Optional.of(MODIFYING_ROWS_MESSAGE));
assertJdbcConnections("DROP TABLE copy_of_nation", 2, Optional.empty());
assertJdbcConnections("SHOW SCHEMAS", 1, Optional.empty());
assertJdbcConnections("SHOW TABLES", 2, Optional.empty());
assertJdbcConnections("SHOW STATS FOR nation", 4, Optional.empty());
assertJdbcConnections("SELECT * FROM system.jdbc.columns WHERE table_cat = 'counting_postgresql'", 5, Optional.empty());
}

private static final class TestingPostgreSqlModule
extends AbstractConfigurationAwareModule
{
private final ConnectionCountingConnectionFactory connectionCountingConnectionFactory;

private TestingPostgreSqlModule(ConnectionCountingConnectionFactory connectionCountingConnectionFactory)
{
this.connectionCountingConnectionFactory = requireNonNull(connectionCountingConnectionFactory, "connectionCountingConnectionFactory is null");
}

@Override
protected void setup(Binder binder) {}

@Provides
@Singleton
@ForBaseJdbc
public ConnectionFactory getConnectionFactory()
{
return connectionCountingConnectionFactory;
}
}
}
Loading
Loading