Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

populate map from JDBC metadata #77

Open
github-actions bot opened this issue Jun 8, 2023 · 0 comments
Open

populate map from JDBC metadata #77

github-actions bot opened this issue Jun 8, 2023 · 0 comments
Labels

Comments

@github-actions
Copy link

github-actions bot commented Jun 8, 2023

populate map from JDBC metadata

returned by Phoenix among others, maps to TableType.SYSTEM_TABLE.

We know enum constants are upper-case without spaces, so we can't

make things worse.

This can happen if you start JdbcSchema off a "public" PG schema

The tables are not designed to be queried by users, however we do

not filter them as we keep all the other table types.

object, hence try to get it from there if it was not specified by user

because we're creating a proto-type, not a type; before being used, the

proto-type will be copied into a real type factory.

scale = resultSet.getInt(9); // SCALE

"INTEGER".

that we need to re-build our own cache.

https://github.com/databloom-ai/incubator-wayang/blob/c51b95e414816ed2b6ac7f8f3cad5857b40fea3b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/jdbc/JdbcSchema.java#L212

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to you under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.wayang.api.sql.calcite.jdbc;

import com.google.common.collect.*;
import org.apache.calcite.adapter.jdbc.JdbcConvention;
//import org.apache.calcite.adapter.jdbc.JdbcTable;
//import org.apache.calcite.adapter.jdbc.JdbcUtils;
import org.apache.calcite.avatica.AvaticaUtils;
import org.apache.calcite.avatica.MetaImpl;
import org.apache.calcite.avatica.SqlType;
import org.apache.calcite.linq4j.function.Experimental;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.rel.type.*;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.*;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlDialectFactory;
import org.apache.calcite.sql.SqlDialectFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Util;
import org.checkerframework.checker.nullness.qual.Nullable;

import javax.sql.DataSource;
import java.sql.*;
import java.util.*;
import java.util.function.BiFunction;

import static java.util.Objects.requireNonNull;

/**
 * Implementation of {@link Schema} that is backed by a JDBC data source.
 *
 * <p>The tables in the JDBC data source appear to be tables in this schema;
 * queries against this schema are executed against those tables, pushing down
 * as much as possible of the query logic to SQL.</p>
 */
public class JdbcSchema implements Schema {
  final DataSource dataSource;
  final @Nullable String catalog;
  final @Nullable String schema;
  public final SqlDialect dialect;
  final JdbcConvention convention;
  private @Nullable ImmutableMap<String, JdbcTable> tableMap;
  private final boolean snapshot;

  @Experimental
  public static final ThreadLocal<@Nullable Foo> THREAD_METADATA = new ThreadLocal<>();

  private static final Ordering<Iterable<Integer>> VERSION_ORDERING =
      Ordering.<Integer>natural().lexicographical();

  /**
   * Creates a JDBC schema whose table map is not yet populated; it is computed
   * from the data source the first time it is needed.
   *
   * @param dataSource Data source; must not be null
   * @param dialect SQL dialect; must not be null
   * @param convention Calling convention
   * @param catalog Catalog name, or null
   * @param schema Schema name pattern, or null
   */
  public JdbcSchema(
      DataSource dataSource,
      SqlDialect dialect,
      JdbcConvention convention,
      @Nullable String catalog,
      @Nullable String schema) {
    // A null table map marks this instance as a live (non-snapshot) schema.
    this(dataSource, dialect, convention, catalog, schema, null);
  }

  /**
   * Creates a JDBC schema, optionally seeded with an already-computed table
   * map.
   *
   * <p>A non-null {@code tableMap} flags the instance as a snapshot; a null
   * one leaves the table map to be computed on demand.
   *
   * @param dataSource Data source; must not be null
   * @param dialect SQL dialect; must not be null
   * @param convention Calling convention
   * @param catalog Catalog name, or null
   * @param schema Schema name pattern, or null
   * @param tableMap Pre-computed tables keyed by name, or null
   */
  private JdbcSchema(
      DataSource dataSource,
      SqlDialect dialect,
      JdbcConvention convention,
      @Nullable String catalog,
      @Nullable String schema,
      @Nullable ImmutableMap<String, JdbcTable> tableMap) {
    // Null checks run first and in this order so the reported parameter name
    // is the same as before when several arguments are missing.
    this.dataSource = requireNonNull(dataSource, "dataSource");
    this.dialect = requireNonNull(dialect, "dialect");
    this.catalog = catalog;
    this.schema = schema;
    this.convention = convention;
    this.snapshot = tableMap != null;
    this.tableMap = tableMap;
  }

  /**
   * Creates a JdbcSchema using the default dialect factory,
   * {@link SqlDialectFactoryImpl#INSTANCE}.
   *
   * @param parentSchema Parent schema
   * @param name Name of the new schema
   * @param dataSource Data source
   * @param catalog Catalog name, or null
   * @param schema Schema name pattern, or null
   * @return A JdbcSchema
   */
  public static JdbcSchema create(
      SchemaPlus parentSchema,
      String name,
      DataSource dataSource,
      @Nullable String catalog,
      @Nullable String schema) {
    final SqlDialectFactory defaultFactory = SqlDialectFactoryImpl.INSTANCE;
    return create(parentSchema, name, dataSource, defaultFactory, catalog, schema);
  }

  /**
   * Creates a JdbcSchema whose SQL dialect is obtained from the given dialect
   * factory.
   *
   * @param parentSchema Parent schema
   * @param name Name of the new schema
   * @param dataSource Data source
   * @param dialectFactory Factory used to derive the SQL dialect
   * @param catalog Catalog name, or null
   * @param schema Schema name pattern, or null
   * @return A JdbcSchema
   */
  public static JdbcSchema create(
      SchemaPlus parentSchema,
      String name,
      DataSource dataSource,
      SqlDialectFactory dialectFactory,
      @Nullable String catalog,
      @Nullable String schema) {
    // The sub-schema expression is built before the dialect, as before.
    final Expression schemaExpression =
        Schemas.subSchemaExpression(parentSchema, name, JdbcSchema.class);
    final SqlDialect sqlDialect = createDialect(dialectFactory, dataSource);
    return new JdbcSchema(
        dataSource,
        sqlDialect,
        JdbcConvention.of(sqlDialect, schemaExpression, name),
        catalog,
        schema);
  }

  /**
   * Creates a JdbcSchema, taking connection settings from a map.
   *
   * <p>If the operand contains {@code dataSource}, that value is used as the
   * name of a {@link DataSource} plugin to instantiate. Otherwise
   * {@code jdbcUrl} is required and {@code jdbcDriver}, {@code jdbcUser} and
   * {@code jdbcPassword} are optional. {@code jdbcCatalog}, {@code jdbcSchema}
   * and {@code sqlDialectFactory} are optional in both cases.
   *
   * @param parentSchema Parent schema
   * @param name Name
   * @param operand Map of property/value pairs
   * @return A JdbcSchema
   * @throws RuntimeException if the data source cannot be obtained from the
   *     operand
   */
  public static JdbcSchema create(
      SchemaPlus parentSchema,
      String name,
      Map<String, Object> operand) {
    DataSource source;
    try {
      final String pluginName = (String) operand.get("dataSource");
      if (pluginName == null) {
        // No plugin given: build the data source from discrete JDBC settings.
        final String url = (String) requireNonNull(operand.get("jdbcUrl"), "jdbcUrl");
        final String driver = (String) operand.get("jdbcDriver");
        final String user = (String) operand.get("jdbcUser");
        final String password = (String) operand.get("jdbcPassword");
        source = dataSource(url, driver, user, password);
      } else {
        source = AvaticaUtils.instantiatePlugin(DataSource.class, pluginName);
      }
    } catch (Exception e) {
      throw new RuntimeException("Error while reading dataSource", e);
    }

    final String catalogName = (String) operand.get("jdbcCatalog");
    final String schemaName = (String) operand.get("jdbcSchema");
    final String factoryClassName = (String) operand.get("sqlDialectFactory");

    final boolean useDefaultFactory =
        factoryClassName == null || factoryClassName.isEmpty();
    if (useDefaultFactory) {
      return JdbcSchema.create(
          parentSchema, name, source, catalogName, schemaName);
    }
    final SqlDialectFactory customFactory =
        AvaticaUtils.instantiatePlugin(SqlDialectFactory.class, factoryClassName);
    return JdbcSchema.create(
        parentSchema, name, source, customFactory, catalogName, schemaName);
  }

  /**
   * Returns a suitable SQL dialect for the given data source, using the
   * default dialect factory {@link SqlDialectFactoryImpl#INSTANCE}.
   *
   * @param dataSource The data source
   * @return The SQL dialect for the data source
   *
   * @deprecated Use {@link #createDialect(SqlDialectFactory, DataSource)} instead
   */
  @Deprecated // to be removed before 2.0
  public static SqlDialect createDialect(DataSource dataSource) {
    final SqlDialectFactory defaultFactory = SqlDialectFactoryImpl.INSTANCE;
    return createDialect(defaultFactory, dataSource);
  }

  /**
   * Returns a suitable SQL dialect for the given data source.
   *
   * <p>The lookup is delegated to {@code JdbcUtils.DialectPool.INSTANCE}.
   *
   * @param dialectFactory Factory passed to the dialect pool
   * @param dataSource The data source
   * @return The SQL dialect returned by the pool
   */
  public static SqlDialect createDialect(SqlDialectFactory dialectFactory,
                                         DataSource dataSource) {
    return JdbcUtils.DialectPool.INSTANCE.get(dialectFactory, dataSource);
  }

  /**
   * Creates a JDBC data source with the given specification.
   *
   * <p>The data source is obtained from
   * {@code JdbcUtils.DataSourcePool.INSTANCE}. For HSQLDB URLs the system
   * property {@code hsqldb.reconfig_logging} is set to {@code "false"} first.
   *
   * @param url JDBC URL
   * @param driverClassName Driver class name, or null
   * @param username User name, or null
   * @param password Password, or null
   * @return The data source
   */
  public static DataSource dataSource(String url, @Nullable String driverClassName,
                                      @Nullable String username, @Nullable String password) {
    final boolean isHsqldb = url.startsWith("jdbc:hsqldb:");
    if (isHsqldb) {
      // Prevent hsqldb from screwing up java.util.logging.
      System.setProperty("hsqldb.reconfig_logging", "false");
    }
    return JdbcUtils.DataSourcePool.INSTANCE.get(
        url, driverClassName, username, password);
  }

  /** Returns {@code false}: this schema always reports itself as immutable. */
  @Override public boolean isMutable() {
    return false;
  }

  /**
   * Returns a new schema that shares this schema's data source, dialect,
   * convention, catalog and schema, and is seeded with the current table map.
   *
   * <p>The {@code version} argument is not consulted. If the table map has
   * not been computed yet, the new schema receives null and computes its own
   * tables on demand.
   */
  @Override public Schema snapshot(SchemaVersion version) {
    final @Nullable ImmutableMap<String, JdbcTable> currentTables = tableMap;
    return new JdbcSchema(
        dataSource, dialect, convention, catalog, schema, currentTables);
  }

  /**
   * Returns the data source backing this schema.
   *
   * <p>Used by generated code.
   */
  // Used by generated code.
  public DataSource getDataSource() {
    return dataSource;
  }

  /**
   * Returns the expression by which this schema can be referenced from its
   * parent schema.
   *
   * @param parentSchema Parent schema; must not be null for a JdbcSchema
   * @param name Name of this schema within the parent
   * @throws NullPointerException if {@code parentSchema} is null
   */
  @Override public Expression getExpression(@Nullable SchemaPlus parentSchema, String name) {
    final SchemaPlus parent =
        requireNonNull(parentSchema, "parentSchema must not be null for JdbcSchema");
    return Schemas.subSchemaExpression(parent, name, JdbcSchema.class);
  }

  /**
   * Returns the functions of this schema, keyed by name.
   *
   * <p>Currently always returns an empty multimap; nothing is read from the
   * database.
   */
  protected Multimap<String, Function> getFunctions() {
    // TODO: populate map from JDBC metadata
    return ImmutableMultimap.of();
  }

  /**
   * Returns the functions registered under the given name, as reported by
   * {@link #getFunctions()}.
   */
  @Override public final Collection<Function> getFunctions(String name) {
    final Multimap<String, Function> functionsByName = getFunctions();
    return functionsByName.get(name); // never null
  }

  /** Returns the names of the functions reported by {@link #getFunctions()}. */
  @Override public final Set<String> getFunctionNames() {
    final Multimap<String, Function> functionsByName = getFunctions();
    return functionsByName.keySet();
  }

  /**
   * Builds the map of tables in this schema, keyed by table name.
   *
   * <p>Table definitions come from the {@link #THREAD_METADATA} callback when
   * one is set for the current thread; otherwise they are read from
   * {@link DatabaseMetaData#getTables}. The connection and any result set are
   * closed before returning.
   *
   * @throws RuntimeException wrapping any {@link SQLException} raised while
   *     reading the metadata
   */
  private ImmutableMap<String, JdbcTable> computeTables() {
    Connection conn = null;
    ResultSet tableRows = null;
    try {
      conn = dataSource.getConnection();
      final Pair<@Nullable String, @Nullable String> resolved = getCatalogSchema(conn);
      final String catalogName = resolved.left;
      final String schemaName = resolved.right;

      final Iterable<MetaImpl.MetaTable> definitions;
      final Foo override = THREAD_METADATA.get();
      if (override == null) {
        // No per-thread override: ask the database for its tables.
        final List<MetaImpl.MetaTable> fromDatabase = new ArrayList<>();
        tableRows = conn.getMetaData().getTables(catalogName, schemaName, null, null);
        while (tableRows.next()) {
          fromDatabase.add(
              new MetaImpl.MetaTable(
                  tableRows.getString(1),
                  tableRows.getString(2),
                  tableRows.getString(3),
                  tableRows.getString(4)));
        }
        definitions = fromDatabase;
      } else {
        definitions = override.apply(catalogName, schemaName);
      }

      final ImmutableMap.Builder<String, JdbcTable> tables = ImmutableMap.builder();
      for (MetaImpl.MetaTable definition : definitions) {
        // Clean up table type. In particular, this ensures that 'SYSTEM TABLE',
        // returned by Phoenix among others, maps to TableType.SYSTEM_TABLE.
        // We know enum constants are upper-case without spaces, so we can't
        // make things worse.
        //
        // PostgreSQL returns a null table type for pg_toast* tables. This can
        // happen if you start JdbcSchema off a "public" PG schema. The tables
        // are not designed to be queried by users, however we do not filter
        // them as we keep all the other table types.
        final String normalizedType;
        if (definition.tableType == null) {
          normalizedType = null;
        } else {
          normalizedType =
              definition.tableType.toUpperCase(Locale.ROOT).replace(' ', '_');
        }
        final TableType type = Util.enumVal(TableType.OTHER, normalizedType);
        if (normalizedType != null && type == TableType.OTHER) {
          System.out.println("Unknown table type: " + normalizedType);
        }
        tables.put(
            definition.tableName,
            new JdbcTable(this, definition.tableCat, definition.tableSchem,
                definition.tableName, type));
      }
      return tables.build();
    } catch (SQLException e) {
      throw new RuntimeException(
          "Exception while reading tables", e);
    } finally {
      close(conn, null, tableRows);
    }
  }

  /**
   * Returns the JDBC version reported by the database metadata as a
   * two-element list, {@code [major, minor]}.
   */
  private static List<Integer> version(DatabaseMetaData metaData) throws SQLException {
    final int major = metaData.getJDBCMajorVersion();
    final int minor = metaData.getJDBCMinorVersion();
    return ImmutableList.of(major, minor);
  }

  /**
   * Returns a pair of (catalog, schema) for the current connection.
   *
   * <p>Values configured on this schema always win. A value that was not
   * configured is taken from the connection when the driver reports JDBC 4.1
   * or above. On PostgreSQL, anything still missing is filled in from
   * {@code current_database()} / {@code current_schema()}.
   *
   * @param connection Open connection to inspect
   * @return Pair of catalog and schema; either side may be null
   * @throws SQLException if the metadata or the fallback query fails
   */
  private Pair<@Nullable String, @Nullable String> getCatalogSchema(Connection connection)
      throws SQLException {
    final DatabaseMetaData metaData = connection.getMetaData();
    final List<Integer> version41 = ImmutableList.of(4, 1); // JDBC 4.1
    String catalog = this.catalog;
    String schema = this.schema;
    final boolean jdbc41OrAbove =
        VERSION_ORDERING.compare(version(metaData), version41) >= 0;
    if (catalog == null && jdbc41OrAbove) {
      // From JDBC 4.1, catalog and schema can be retrieved from the connection
      // object, hence try to get it from there if it was not specified by user
      catalog = connection.getCatalog();
    }
    if (schema == null && jdbc41OrAbove) {
      schema = connection.getSchema();
      if ("".equals(schema)) {
        schema = null; // PostgreSQL returns useless "" sometimes
      }
    }
    // Constant-first equals: a driver that reports no product name must not
    // cause a NullPointerException here.
    if ((catalog == null || schema == null)
        && "PostgreSQL".equals(metaData.getDatabaseProductName())) {
      final String sql = "select current_database(), current_schema()";
      try (Statement statement = connection.createStatement();
           ResultSet resultSet = statement.executeQuery(sql)) {
        if (resultSet.next()) {
          // Only fill in what is still missing, so a catalog or schema that
          // was already resolved above is never overwritten.
          if (catalog == null) {
            catalog = resultSet.getString(1);
          }
          if (schema == null) {
            schema = resultSet.getString(2);
          }
        }
      }
    }
    return Pair.of(catalog, schema);
  }

  /**
   * Returns the table with the given name, or null if there is none.
   *
   * <p>Uses the cached table map, computing it first if it does not exist yet.
   */
  @Override public @Nullable Table getTable(String name) {
    final ImmutableMap<String, JdbcTable> tables = getTableMap(false);
    return tables.get(name);
  }

  /**
   * Returns the table map, computing it when it is missing or when a rebuild
   * is requested.
   *
   * @param force Whether to recompute the map even if one is already cached
   * @return Tables keyed by name
   */
  private synchronized ImmutableMap<String, JdbcTable> getTableMap(
      boolean force) {
    final boolean mustLoad = force || tableMap == null;
    if (mustLoad) {
      tableMap = computeTables();
    }
    return tableMap;
  }

  /**
   * Returns the row proto-type of a table, opening a connection to the data
   * source for the duration of the call.
   *
   * @param catalogName Catalog name passed to the metadata lookup
   * @param schemaName Schema name passed to the metadata lookup
   * @param tableName Table name passed to the metadata lookup
   * @return Proto-type describing the table's columns
   * @throws SQLException if the connection or metadata lookup fails
   */
  RelProtoDataType getRelDataType(String catalogName, String schemaName,
                                  String tableName) throws SQLException {
    Connection conn = null;
    try {
      conn = dataSource.getConnection();
      final DatabaseMetaData databaseMetaData = conn.getMetaData();
      return getRelDataType(databaseMetaData, catalogName, schemaName, tableName);
    } finally {
      // Errors while closing are ignored by the helper.
      close(conn, null, null);
    }
  }

  /**
   * Reads the column metadata of a table and converts it to a row proto-type.
   *
   * <p>Columns are read by ordinal from {@link DatabaseMetaData#getColumns}:
   * 4 (column name), 5 (JDBC type code), 6 (type name), 7 (size), 9 (scale)
   * and 11 (nullability).
   *
   * @param metaData Database metadata to query
   * @param catalogName Catalog name passed to {@code getColumns}
   * @param schemaName Schema name passed to {@code getColumns}
   * @param tableName Table name passed to {@code getColumns}
   * @return Proto-type with one field per column
   * @throws SQLException if reading the metadata fails
   */
  RelProtoDataType getRelDataType(DatabaseMetaData metaData, String catalogName,
                                  String schemaName, String tableName) throws SQLException {
    // Temporary type factory, just for the duration of this method. Allowable
    // because we're creating a proto-type, not a type; before being used, the
    // proto-type will be copied into a real type factory.
    final RelDataTypeFactory typeFactory =
        new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
    final RelDataTypeFactory.Builder fieldInfo = typeFactory.builder();

    // try-with-resources guarantees the result set is closed even when a
    // column cannot be read or converted.
    try (ResultSet resultSet =
             metaData.getColumns(catalogName, schemaName, tableName, null)) {
      while (resultSet.next()) {
        final String columnName = requireNonNull(resultSet.getString(4), "columnName");
        final int dataType = resultSet.getInt(5);
        final String typeString = resultSet.getString(6);
        final int precision;
        final int scale;
        switch (SqlType.valueOf(dataType)) {
        case TIMESTAMP:
        case TIME:
          // For date-time types the value in the scale column is used as the
          // precision, and the scale itself is zero.
          precision = resultSet.getInt(9); // SCALE
          scale = 0;
          break;
        default:
          precision = resultSet.getInt(7); // SIZE
          scale = resultSet.getInt(9); // SCALE
          break;
        }
        RelDataType sqlType =
            sqlType(typeFactory, dataType, precision, scale, typeString);
        // Anything other than an explicit "no nulls" is treated as nullable.
        boolean nullable = resultSet.getInt(11) != DatabaseMetaData.columnNoNulls;
        fieldInfo.add(columnName, sqlType).nullable(nullable);
      }
    }
    return RelDataTypeImpl.proto(fieldInfo.build());
  }

  /**
   * Converts a JDBC type code, with its precision and scale, to a type.
   *
   * <p>Unknown type codes map to {@code ANY}. For arrays the component type is
   * parsed from a type string of the form {@code "<component> ARRAY"} when
   * available, and otherwise is a nullable {@code ANY}.
   *
   * @param typeFactory Factory used to create the type
   * @param dataType JDBC type code
   * @param precision Precision; ignored when negative
   * @param scale Scale; ignored when negative
   * @param typeString Database-specific type name, or null
   * @return The corresponding type
   */
  private static RelDataType sqlType(RelDataTypeFactory typeFactory, int dataType,
                                     int precision, int scale, @Nullable String typeString) {
    // Fall back to ANY if type is unknown
    final SqlTypeName typeName =
        Util.first(SqlTypeName.getNameForJdbcType(dataType), SqlTypeName.ANY);

    if (typeName == SqlTypeName.ARRAY) {
      final String arraySuffix = " ARRAY";
      RelDataType elementType = null;
      if (typeString != null && typeString.endsWith(arraySuffix)) {
        // E.g. hsqldb gives "INTEGER ARRAY", so we deduce the component type
        // "INTEGER".
        final String elementTypeString =
            typeString.substring(0, typeString.length() - arraySuffix.length());
        elementType = parseTypeString(typeFactory, elementTypeString);
      }
      if (elementType == null) {
        elementType = typeFactory.createTypeWithNullability(
            typeFactory.createSqlType(SqlTypeName.ANY), true);
      }
      return typeFactory.createArrayType(elementType, -1);
    }

    final boolean hasPrecision = precision >= 0;
    if (hasPrecision && scale >= 0 && typeName.allowsPrecScale(true, true)) {
      return typeFactory.createSqlType(typeName, precision, scale);
    }
    if (hasPrecision && typeName.allowsPrecNoScale()) {
      return typeFactory.createSqlType(typeName, precision);
    }
    assert typeName.allowsNoPrecNoScale();
    return typeFactory.createSqlType(typeName);
  }

  /**
   * Parses a SQL type string into a type.
   *
   * <p>Given "INTEGER", returns BasicSqlType(INTEGER).
   * Given "VARCHAR(10)", returns BasicSqlType(VARCHAR, 10).
   * Given "NUMERIC(10, 2)", returns BasicSqlType(NUMERIC, 10, 2).
   *
   * <p>Whitespace around the type name, precision and scale is ignored. If the
   * type name is not a {@link SqlTypeName}, or the precision or scale is not
   * an integer, a nullable {@code ANY} type is returned.
   *
   * @param typeFactory Factory used to create the type
   * @param typeString Type string, optionally followed by
   *     {@code (precision)} or {@code (precision, scale)}
   * @return The parsed type, or nullable {@code ANY} if it cannot be parsed
   */
  private static RelDataType parseTypeString(RelDataTypeFactory typeFactory,
                                             String typeString) {
    int precision = -1;
    int scale = -1;
    String typeNameString = typeString;
    try {
      final int open = typeString.indexOf('(');
      if (open >= 0) {
        final int close = typeString.indexOf(')', open);
        if (close >= 0) {
          final String rest = typeString.substring(open + 1, close);
          typeNameString = typeString.substring(0, open);
          final int comma = rest.indexOf(',');
          if (comma >= 0) {
            precision = Integer.parseInt(rest.substring(0, comma).trim());
            // Skip the comma itself; parsing ",2" would always fail.
            scale = Integer.parseInt(rest.substring(comma + 1).trim());
          } else {
            precision = Integer.parseInt(rest.trim());
          }
        }
      }
      final SqlTypeName typeName = SqlTypeName.valueOf(typeNameString.trim());
      return typeName.allowsPrecScale(true, true)
          ? typeFactory.createSqlType(typeName, precision, scale)
          : typeName.allowsPrecScale(true, false)
          ? typeFactory.createSqlType(typeName, precision)
          : typeFactory.createSqlType(typeName);
    } catch (IllegalArgumentException e) {
      // Covers an unknown type name as well as NumberFormatException (a
      // subclass) from a malformed precision or scale.
      return typeFactory.createTypeWithNullability(
          typeFactory.createSqlType(SqlTypeName.ANY), true);
    }
  }

  /**
   * Returns the names of the tables in this schema.
   *
   * <p>Unless this schema is a snapshot, every call rebuilds the table map
   * from the database.
   */
  @Override public Set<String> getTableNames() {
    // This method is called during a cache refresh. We can take it as a signal
    // that we need to re-build our own cache.
    final boolean rebuild = !snapshot;
    final ImmutableMap<String, JdbcTable> tables = getTableMap(rebuild);
    return tables.keySet();
  }

  /**
   * Returns the types of this schema, keyed by name.
   *
   * <p>Currently always returns an empty map; nothing is read from the
   * database.
   */
  protected Map<String, RelProtoDataType> getTypes() {
    // TODO: populate map from JDBC metadata
    return ImmutableMap.of();
  }

  /**
   * Returns the type registered under the given name in {@link #getTypes()},
   * or null if there is none.
   */
  @Override public @Nullable RelProtoDataType getType(String name) {
    final Map<String, RelProtoDataType> typesByName = getTypes();
    return typesByName.get(name);
  }

  /** Returns the names of the types reported by {@link #getTypes()}. */
  @Override public Set<String> getTypeNames() {
    final Map<String, RelProtoDataType> typesByName = getTypes();
    return typesByName.keySet();
  }

  /** Always returns null, whatever the requested name. */
  @Override public @Nullable Schema getSubSchema(String name) {
    // JDBC does not support sub-schemas.
    return null;
  }

  /** Always returns an empty set. */
  @Override public Set<String> getSubSchemaNames() {
    return ImmutableSet.of();
  }

  /**
   * Closes the given JDBC resources, skipping any that are null.
   *
   * <p>Resources are closed in the order result set, statement, connection.
   * A {@link SQLException} from one close is ignored, so the remaining
   * resources are still closed.
   *
   * @param connection Connection to close, or null
   * @param statement Statement to close, or null
   * @param resultSet Result set to close, or null
   */
  private static void close(
      @Nullable Connection connection,
      @Nullable Statement statement,
      @Nullable ResultSet resultSet) {
    try {
      if (resultSet != null) {
        resultSet.close();
      }
    } catch (SQLException ignored) {
      // ignore
    }
    try {
      if (statement != null) {
        statement.close();
      }
    } catch (SQLException ignored) {
      // ignore
    }
    try {
      if (connection != null) {
        connection.close();
      }
    } catch (SQLException ignored) {
      // ignore
    }
  }

  /** Schema factory that creates a {@link JdbcSchema} (the enclosing class,
   * in package {@code org.apache.wayang.api.sql.calcite.jdbc}).
   *
   * <p>This allows you to create a jdbc schema inside a model.json file, like
   * this:
   *
   * <blockquote><pre>
   * {
   *   "version": "1.0",
   *   "defaultSchema": "FOODMART_CLONE",
   *   "schemas": [
   *     {
   *       "name": "FOODMART_CLONE",
   *       "type": "custom",
   *       "factory": "org.apache.wayang.api.sql.calcite.jdbc.JdbcSchema$Factory",
   *       "operand": {
   *         "jdbcDriver": "com.mysql.jdbc.Driver",
   *         "jdbcUrl": "jdbc:mysql://localhost/foodmart",
   *         "jdbcUser": "foodmart",
   *         "jdbcPassword": "foodmart"
   *       }
   *     }
   *   ]
   * }</pre></blockquote>
   */
  public static class Factory implements SchemaFactory {
    /** The single instance; the constructor is private. */
    public static final Factory INSTANCE = new Factory();

    private Factory() {}

    /** Delegates to {@link JdbcSchema#create(SchemaPlus, String, Map)}. */
    @Override public Schema create(
        SchemaPlus parentSchema,
        String name,
        Map<String, Object> operand) {
      return JdbcSchema.create(parentSchema, name, operand);
    }
  }

  /**
   * Do not use.
   *
   * <p>Callback that takes a catalog and a schema (either may be null) and
   * returns table definitions. When an instance is set in
   * {@link #THREAD_METADATA}, it is used instead of querying the database
   * metadata for tables.
   */
  @Experimental
  public interface Foo
      extends BiFunction<@Nullable String, @Nullable String, Iterable<MetaImpl.MetaTable>> {
  }
}

d7b3ff8e666398287175c746713394a77f6cd125

@github-actions github-actions bot added the todo label Jun 8, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

0 participants