From 0f66e6e4c7d48a977a30c424282fd59413f98e4b Mon Sep 17 00:00:00 2001 From: ailegion Date: Thu, 17 Dec 2020 15:40:53 +0100 Subject: [PATCH] updated: SpannerSource schema to match import query --- docs/Spanner-batchsource.md | 3 +- .../gcp/spanner/source/SpannerSource.java | 150 ++++++++++++++++-- 2 files changed, 135 insertions(+), 18 deletions(-) diff --git a/docs/Spanner-batchsource.md b/docs/Spanner-batchsource.md index 003bf2ecd6..d56ff99550 100644 --- a/docs/Spanner-batchsource.md +++ b/docs/Spanner-batchsource.md @@ -52,4 +52,5 @@ The actual number of partitions returned may be smaller or larger than this maxi This is only a hint. The actual size of each partition may be smaller or larger than this size request. More information about partition options can be found at https://cloud.google.com/spanner/docs/reference/rest/v1/PartitionOptions -**Schema**: Schema of the Spanner table to read. +**Schema**: Schema of the Spanner table to read. When *import query* is used and column name is not in the metadata table +the field will be nullable. diff --git a/src/main/java/io/cdap/plugin/gcp/spanner/source/SpannerSource.java b/src/main/java/io/cdap/plugin/gcp/spanner/source/SpannerSource.java index 5e09251ec0..0dff0c69a7 100644 --- a/src/main/java/io/cdap/plugin/gcp/spanner/source/SpannerSource.java +++ b/src/main/java/io/cdap/plugin/gcp/spanner/source/SpannerSource.java @@ -52,6 +52,7 @@ import io.cdap.plugin.gcp.spanner.SpannerArrayConstants; import io.cdap.plugin.gcp.spanner.SpannerConstants; import io.cdap.plugin.gcp.spanner.common.SpannerUtil; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.slf4j.Logger; @@ -62,8 +63,11 @@ import java.io.ObjectOutputStream; import java.util.ArrayList; import java.util.Base64; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -82,6 +86,8 @@ public class SpannerSource extends BatchSource schemaFields = new ArrayList<>(); - while (resultSet.next()) { - String columnName = resultSet.getString("column_name"); - String spannerType = resultSet.getString("spanner_type"); - String nullable = resultSet.getString("is_nullable"); - boolean isNullable = "YES".equals(nullable); - Schema typeSchema = parseSchemaFromSpannerTypeString(columnName, spannerType, collector); - if (typeSchema == null) { - // this means there were failures added to failure collector. Continue to collect more failures - continue; + if (Strings.isNullOrEmpty(config.importQuery)) { + Statement getTableSchemaStatement = SCHEMA_STATEMENT_BUILDER.bind(TABLE_NAME).to(config.table).build(); + try (ResultSet resultSet = databaseClient.singleUse().executeQuery(getTableSchemaStatement)) { + List schemaFields = new ArrayList<>(); + while (resultSet.next()) { + String columnName = resultSet.getString("column_name"); + String spannerType = resultSet.getString("spanner_type"); + String nullable = resultSet.getString("is_nullable"); + boolean isNullable = "YES".equals(nullable); + Schema typeSchema = parseSchemaFromSpannerTypeString(columnName, spannerType, collector); + if (typeSchema == null) { + // this means there were failures added to failure collector. Continue to collect more failures + continue; + } + Schema fieldSchema = isNullable ? Schema.nullableOf(typeSchema) : typeSchema; + schemaFields.add(Schema.Field.of(columnName, fieldSchema)); + } + if (schemaFields.isEmpty() && !collector.getValidationFailures().isEmpty()) { + collector.getOrThrowException(); } - Schema fieldSchema = isNullable ? Schema.nullableOf(typeSchema) : typeSchema; - schemaFields.add(Schema.Field.of(columnName, fieldSchema)); + return Schema.recordOf("outputSchema", schemaFields); } - if (schemaFields.isEmpty() && !collector.getValidationFailures().isEmpty()) { - collector.getOrThrowException(); + } else { + final Map nullableFields = getFieldsNullability(databaseClient); + Statement importQueryStatement = getStatementForOneRow(config.importQuery); + List schemaFields = new ArrayList<>(); + try (ResultSet resultSet = databaseClient.singleUse().executeQuery(importQueryStatement)) { + while (resultSet.next()) { + final List structFields = resultSet.getCurrentRowAsStruct().getType().getStructFields(); + for (Type.StructField structField : structFields) { + final Type fieldSpannerType = structField.getType(); + final String columnName = structField.getName(); + // there are cases when column name is not in metadata table such as "Select FirstName as name", + // so fallback is nullable + final boolean isNullable = nullableFields.getOrDefault(columnName, true); + final Schema typeSchema = parseSchemaFromSpannerType(fieldSpannerType, columnName, collector); + if (typeSchema == null) { + // this means there were failures added to failure collector. Continue to collect more failures + continue; + } + Schema fieldSchema = isNullable ? Schema.nullableOf(typeSchema) : typeSchema; + schemaFields.add(Schema.Field.of(columnName, fieldSchema)); + } + } + if (schemaFields.isEmpty() && !collector.getValidationFailures().isEmpty()) { + collector.getOrThrowException(); + } + return Schema.recordOf("outputSchema", schemaFields); } - return Schema.recordOf("outputSchema", schemaFields); } } catch (IOException e) { collector.addFailure("Unable to get Spanner Client: " + e.getMessage(), null) @@ -274,6 +309,22 @@ private Schema getSchema(FailureCollector collector) { } + private Statement getStatementForOneRow(String importQuery) { + String query; + // Matches any String containing the word 'limit' followed by a number + // ex: SELECT NAME FROM TABLE LIMIT 15 + String regex = "^(?:[^;']|(?:'[^']+'))+ LIMIT +\\d+(.*)"; + Pattern pattern = Pattern.compile(regex, Pattern.MULTILINE | Pattern.CASE_INSENSITIVE); + if (pattern.matcher(importQuery).matches()) { + int index = StringUtils.lastIndexOf(importQuery, LIMIT); + String substringToReplace = importQuery.substring(index); + query = importQuery.replace(substringToReplace, "limit 1"); + } else { + query = String.format("%s limit 1", importQuery); + } + return Statement.newBuilder(query).build(); + } + @Nullable private Schema parseSchemaFromSpannerTypeString(String columnName, String spannerType, FailureCollector collector) { @@ -323,4 +374,69 @@ private Schema parseSchemaFromSpannerTypeString(String columnName, } return null; } + + @Nullable + Schema parseSchemaFromSpannerType(Type spannerType, String columnName, FailureCollector collector) { + final Type.Code code = spannerType.getCode(); + + if (code == Type.Code.ARRAY) { + final Type arrayElementType = spannerType.getArrayElementType(); + final Type.Code arrayElementTypeCode = arrayElementType.getCode(); + switch (arrayElementTypeCode) { + case BOOL: + return Schema.arrayOf(Schema.of(Schema.Type.BOOLEAN)); + case INT64: + return Schema.arrayOf(Schema.of(Schema.Type.LONG)); + case FLOAT64: + return Schema.arrayOf(Schema.of(Schema.Type.DOUBLE)); + case STRING: + return Schema.arrayOf(Schema.of(Schema.Type.STRING)); + case BYTES: + return Schema.arrayOf(Schema.of(Schema.Type.BYTES)); + case TIMESTAMP: + return Schema.arrayOf(Schema.of(Schema.LogicalType.TIMESTAMP_MICROS)); + case DATE: + return Schema.arrayOf(Schema.of(Schema.LogicalType.DATE)); + default: + collector.addFailure(String.format("Column '%s' has unsupported type '%s'.", columnName, spannerType), null); + return null; + } + } else { + switch (code) { + case BOOL: + return Schema.of(Schema.Type.BOOLEAN); + case INT64: + return Schema.of(Schema.Type.LONG); + case FLOAT64: + return Schema.of(Schema.Type.DOUBLE); + case STRING: + return Schema.of(Schema.Type.STRING); + case BYTES: + return Schema.of(Schema.Type.BYTES); + case TIMESTAMP: + return Schema.of(Schema.LogicalType.TIMESTAMP_MICROS); + case DATE: + return Schema.of(Schema.LogicalType.DATE); + default: + collector.addFailure(String.format("Column '%s' has unsupported type '%s'.", columnName, spannerType), null); + return null; + } + } + } + + /** Get from table metadata nullability for each field + * @param databaseClient Database Client + * @return Map where key is field name and value is nullability true or false + */ + private Map getFieldsNullability(DatabaseClient databaseClient) { + Statement tableMetadataStatement = SCHEMA_STATEMENT_BUILDER.bind(TABLE_NAME).to(config.table).build(); + Map nullableState = new HashMap<>(); + ResultSet resultSet = databaseClient.singleUse().executeQuery(tableMetadataStatement); + while (resultSet.next()) { + String columnName = resultSet.getString("column_name"); + String nullable = resultSet.getString("is_nullable"); + nullableState.put(columnName, "YES".equals(nullable)); + } + return nullableState; + } }