Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/Spanner-batchsource.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,5 @@ The actual number of partitions returned may be smaller or larger than this maxi
This is only a hint. The actual size of each partition may be smaller or larger than this size request.
More information about partition options can be found at https://cloud.google.com/spanner/docs/reference/rest/v1/PartitionOptions

**Schema**: Schema of the Spanner table to read.
**Schema**: Schema of the Spanner table to read. When an *import query* is used and a column name is not found in
the table's metadata (for example, an aliased column), the corresponding field is treated as nullable.
150 changes: 133 additions & 17 deletions src/main/java/io/cdap/plugin/gcp/spanner/source/SpannerSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import io.cdap.plugin.gcp.spanner.SpannerArrayConstants;
import io.cdap.plugin.gcp.spanner.SpannerConstants;
import io.cdap.plugin.gcp.spanner.common.SpannerUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.slf4j.Logger;
Expand All @@ -62,8 +63,11 @@
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

Expand All @@ -82,6 +86,8 @@ public class SpannerSource extends BatchSource<NullWritable, ResultSet, Structur
private static final Statement.Builder SCHEMA_STATEMENT_BUILDER = Statement.newBuilder(
String.format("SELECT t.column_name,t.spanner_type, t.is_nullable FROM information_schema.columns AS t WHERE " +
" t.table_catalog = '' AND t.table_schema = '' AND t.table_name = @%s", TABLE_NAME));
private static final String LIMIT = "limit";

public static final String NAME = "Spanner";
private final SpannerSourceConfig config;
private Schema schema;
Expand Down Expand Up @@ -243,26 +249,55 @@ private Schema getSchema(FailureCollector collector) {
projectId)) {
DatabaseClient databaseClient =
spanner.getDatabaseClient(DatabaseId.of(projectId, config.instance, config.database));
Statement getTableSchemaStatement = SCHEMA_STATEMENT_BUILDER.bind(TABLE_NAME).to(config.table).build();
try (ResultSet resultSet = databaseClient.singleUse().executeQuery(getTableSchemaStatement)) {
List<Schema.Field> schemaFields = new ArrayList<>();
while (resultSet.next()) {
String columnName = resultSet.getString("column_name");
String spannerType = resultSet.getString("spanner_type");
String nullable = resultSet.getString("is_nullable");
boolean isNullable = "YES".equals(nullable);
Schema typeSchema = parseSchemaFromSpannerTypeString(columnName, spannerType, collector);
if (typeSchema == null) {
// this means there were failures added to failure collector. Continue to collect more failures
continue;
if (Strings.isNullOrEmpty(config.importQuery)) {
Statement getTableSchemaStatement = SCHEMA_STATEMENT_BUILDER.bind(TABLE_NAME).to(config.table).build();
try (ResultSet resultSet = databaseClient.singleUse().executeQuery(getTableSchemaStatement)) {
List<Schema.Field> schemaFields = new ArrayList<>();
while (resultSet.next()) {
String columnName = resultSet.getString("column_name");
String spannerType = resultSet.getString("spanner_type");
String nullable = resultSet.getString("is_nullable");
boolean isNullable = "YES".equals(nullable);
Schema typeSchema = parseSchemaFromSpannerTypeString(columnName, spannerType, collector);
if (typeSchema == null) {
// this means there were failures added to failure collector. Continue to collect more failures
continue;
}
Schema fieldSchema = isNullable ? Schema.nullableOf(typeSchema) : typeSchema;
schemaFields.add(Schema.Field.of(columnName, fieldSchema));
}
if (schemaFields.isEmpty() && !collector.getValidationFailures().isEmpty()) {
collector.getOrThrowException();
}
Schema fieldSchema = isNullable ? Schema.nullableOf(typeSchema) : typeSchema;
schemaFields.add(Schema.Field.of(columnName, fieldSchema));
return Schema.recordOf("outputSchema", schemaFields);
}
if (schemaFields.isEmpty() && !collector.getValidationFailures().isEmpty()) {
collector.getOrThrowException();
} else {
final Map<String, Boolean> nullableFields = getFieldsNullability(databaseClient);
Statement importQueryStatement = getStatementForOneRow(config.importQuery);
List<Schema.Field> schemaFields = new ArrayList<>();
try (ResultSet resultSet = databaseClient.singleUse().executeQuery(importQueryStatement)) {
while (resultSet.next()) {
final List<Type.StructField> structFields = resultSet.getCurrentRowAsStruct().getType().getStructFields();
for (Type.StructField structField : structFields) {
final Type fieldSpannerType = structField.getType();
final String columnName = structField.getName();
// there are cases when column name is not in metadata table such as "Select FirstName as name",
// so fallback is nullable
final boolean isNullable = nullableFields.getOrDefault(columnName, true);
final Schema typeSchema = parseSchemaFromSpannerType(fieldSpannerType, columnName, collector);
if (typeSchema == null) {
// this means there were failures added to failure collector. Continue to collect more failures
continue;
}
Schema fieldSchema = isNullable ? Schema.nullableOf(typeSchema) : typeSchema;
schemaFields.add(Schema.Field.of(columnName, fieldSchema));
}
}
if (schemaFields.isEmpty() && !collector.getValidationFailures().isEmpty()) {
collector.getOrThrowException();
}
return Schema.recordOf("outputSchema", schemaFields);
}
return Schema.recordOf("outputSchema", schemaFields);
}
} catch (IOException e) {
collector.addFailure("Unable to get Spanner Client: " + e.getMessage(), null)
Expand All @@ -274,6 +309,22 @@ private Schema getSchema(FailureCollector collector) {

}

private Statement getStatementForOneRow(String importQuery) {
String query;
// Matches any String containing the word 'limit' followed by a number
// ex: SELECT NAME FROM TABLE LIMIT 15
String regex = "^(?:[^;']|(?:'[^']+'))+ LIMIT +\\d+(.*)";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment that explains what this regex matches.

Pattern pattern = Pattern.compile(regex, Pattern.MULTILINE | Pattern.CASE_INSENSITIVE);
if (pattern.matcher(importQuery).matches()) {
int index = StringUtils.lastIndexOf(importQuery, LIMIT);
String substringToReplace = importQuery.substring(index);
query = importQuery.replace(substringToReplace, "limit 1");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can break in some corner cases where the table name contains "limit" substring. For example something like SELECT <columns> from limited.
Can you make sure we only replace "limit < number >" with "limit 1".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

} else {
query = String.format("%s limit 1", importQuery);
}
return Statement.newBuilder(query).build();
}

@Nullable
private Schema parseSchemaFromSpannerTypeString(String columnName,
String spannerType, FailureCollector collector) {
Expand Down Expand Up @@ -323,4 +374,69 @@ private Schema parseSchemaFromSpannerTypeString(String columnName,
}
return null;
}

/**
 * Translates a Spanner column type into the corresponding CDAP schema.
 *
 * Scalar types map directly; an ARRAY type maps to an array schema of its element type. When the type (or the
 * array element type) is not one of the handled codes, a failure naming the column is added to the collector.
 *
 * @param spannerType type of the column as reported by Spanner
 * @param columnName name of the column, used only in the failure message
 * @param collector failure collector that receives the unsupported type failure
 * @return schema for the column, or null if a failure was added to the collector
 */
@Nullable
Schema parseSchemaFromSpannerType(Type spannerType, String columnName, FailureCollector collector) {
  final Type.Code topLevelCode = spannerType.getCode();
  final boolean isArray = topLevelCode == Type.Code.ARRAY;
  // for arrays the mapping is driven by the element type, for everything else by the type itself
  final Type.Code valueCode = isArray ? spannerType.getArrayElementType().getCode() : topLevelCode;

  final Schema valueSchema;
  switch (valueCode) {
    case BOOL:
      valueSchema = Schema.of(Schema.Type.BOOLEAN);
      break;
    case INT64:
      valueSchema = Schema.of(Schema.Type.LONG);
      break;
    case FLOAT64:
      valueSchema = Schema.of(Schema.Type.DOUBLE);
      break;
    case STRING:
      valueSchema = Schema.of(Schema.Type.STRING);
      break;
    case BYTES:
      valueSchema = Schema.of(Schema.Type.BYTES);
      break;
    case TIMESTAMP:
      valueSchema = Schema.of(Schema.LogicalType.TIMESTAMP_MICROS);
      break;
    case DATE:
      valueSchema = Schema.of(Schema.LogicalType.DATE);
      break;
    default:
      // the message always reports the full column type, including the ARRAY wrapper when present
      collector.addFailure(String.format("Column '%s' has unsupported type '%s'.", columnName, spannerType), null);
      return null;
  }

  if (isArray) {
    return Schema.arrayOf(valueSchema);
  }
  return valueSchema;
}

/**
 * Reads the nullability of every column of the configured table from the table metadata
 * (information_schema.columns).
 *
 * @param databaseClient Database Client used to run the metadata query
 * @return Map where key is the column name and value is true when the column is nullable ("YES" in the
 * is_nullable metadata column), false otherwise
 */
private Map<String, Boolean> getFieldsNullability(DatabaseClient databaseClient) {
  Statement tableMetadataStatement = SCHEMA_STATEMENT_BUILDER.bind(TABLE_NAME).to(config.table).build();
  Map<String, Boolean> nullableState = new HashMap<>();
  // close the result set once it has been consumed, same as the other queries in this class
  try (ResultSet resultSet = databaseClient.singleUse().executeQuery(tableMetadataStatement)) {
    while (resultSet.next()) {
      String columnName = resultSet.getString("column_name");
      String nullable = resultSet.getString("is_nullable");
      nullableState.put(columnName, "YES".equals(nullable));
    }
  }
  return nullableState;
}
}