Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor cleanup in BigQuery #24068

Merged
merged 7 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
public enum BigQueryErrorCode
implements ErrorCodeSupplier
{
BIGQUERY_VIEW_DESTINATION_TABLE_CREATION_FAILED(0, EXTERNAL),
BIGQUERY_DATETIME_PARSING_ERROR(1, EXTERNAL),
// BIGQUERY_VIEW_DESTINATION_TABLE_CREATION_FAILED(0, EXTERNAL),
// BIGQUERY_DATETIME_PARSING_ERROR(1, EXTERNAL),
BIGQUERY_FAILED_TO_EXECUTE_QUERY(2, EXTERNAL),
BIGQUERY_AMBIGUOUS_OBJECT_NAME(3, EXTERNAL),
BIGQUERY_LISTING_DATASET_ERROR(4, EXTERNAL),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
protected <T, R> Stream<R> processInParallel(List<T> list, Function<T, R> function)
{
if (list.size() == 1) {
return Stream.of(function.apply(list.get(0)));
return Stream.of(function.apply(list.getFirst()));
}

List<ListenableFuture<R>> futures = list.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,14 @@ public ConnectorPageSource createPageSource(
log.debug("createPageSource(transaction=%s, session=%s, split=%s, table=%s, columns=%s)", transaction, session, split, table, columns);
BigQuerySplit bigQuerySplit = (BigQuerySplit) split;

Set<String> projectedColumnNames = bigQuerySplit.getColumns().stream().map(BigQueryColumnHandle::name).collect(Collectors.toSet());
Set<String> projectedColumnNames = bigQuerySplit.columns().stream().map(BigQueryColumnHandle::name).collect(Collectors.toSet());
// because we apply logic (download only parent columns - BigQueryMetadata.projectParentColumns)
// columns and split columns could differ
columns.stream()
.map(BigQueryColumnHandle.class::cast)
.forEach(column -> checkArgument(projectedColumnNames.contains(column.name()), "projected columns should contain all reader columns"));
if (bigQuerySplit.representsEmptyProjection()) {
return new BigQueryEmptyProjectionPageSource(bigQuerySplit.getEmptyRowsToGenerate());
return new BigQueryEmptyProjectionPageSource(bigQuerySplit.emptyRowsToGenerate());
}

// not empty projection
Expand All @@ -98,9 +98,9 @@ private ConnectorPageSource createPageSource(
BigQuerySplit split,
List<BigQueryColumnHandle> columnHandles)
{
return switch (split.getMode()) {
return switch (split.mode()) {
case STORAGE -> createStoragePageSource(session, split, columnHandles);
case QUERY -> createQueryPageSource(session, table, columnHandles, split.getFilter());
case QUERY -> createQueryPageSource(session, table, columnHandles, split.filter());
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,58 +13,44 @@
*/
package io.trino.plugin.bigquery;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.spi.connector.ConnectorSplit;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;

import static com.google.common.base.MoreObjects.toStringHelper;
import static io.airlift.slice.SizeOf.estimatedSizeOf;
import static io.airlift.slice.SizeOf.instanceSize;
import static io.trino.plugin.bigquery.BigQuerySplit.Mode.QUERY;
import static io.trino.plugin.bigquery.BigQuerySplit.Mode.STORAGE;
import static java.util.Objects.requireNonNull;

public class BigQuerySplit
public record BigQuerySplit(
Mode mode,
String streamName,
String schemaString,
List<BigQueryColumnHandle> columns,
long emptyRowsToGenerate,
Optional<String> filter,
OptionalInt dataSize)
implements ConnectorSplit
{
private static final int INSTANCE_SIZE = instanceSize(BigQuerySplit.class);

private static final int NO_ROWS_TO_GENERATE = -1;

private final Mode mode;
private final String streamName;
private final String schemaString;
private final List<BigQueryColumnHandle> columns;
private final long emptyRowsToGenerate;
private final Optional<String> filter;
private final OptionalInt dataSize;

// do not use directly, it is public only for Jackson
@JsonCreator
public BigQuerySplit(
@JsonProperty("mode") Mode mode,
@JsonProperty("streamName") String streamName,
@JsonProperty("schemaString") String schemaString,
@JsonProperty("columns") List<BigQueryColumnHandle> columns,
@JsonProperty("emptyRowsToGenerate") long emptyRowsToGenerate,
@JsonProperty("filter") Optional<String> filter,
@JsonProperty("dataSize") OptionalInt dataSize)
public BigQuerySplit
{
this.mode = requireNonNull(mode, "mode is null");
this.streamName = requireNonNull(streamName, "streamName cannot be null");
this.schemaString = requireNonNull(schemaString, "schemaString cannot be null");
this.columns = ImmutableList.copyOf(requireNonNull(columns, "columns cannot be null"));
this.emptyRowsToGenerate = emptyRowsToGenerate;
this.filter = requireNonNull(filter, "filter is null");
this.dataSize = requireNonNull(dataSize, "dataSize is null");
requireNonNull(mode, "mode is null");
requireNonNull(streamName, "streamName cannot be null");
requireNonNull(schemaString, "schemaString cannot be null");
columns = ImmutableList.copyOf(requireNonNull(columns, "columns cannot be null"));
requireNonNull(filter, "filter is null");
requireNonNull(dataSize, "dataSize is null");
}

static BigQuerySplit forStream(String streamName, String schemaString, List<BigQueryColumnHandle> columns, OptionalInt dataSize)
Expand All @@ -82,48 +68,6 @@ static BigQuerySplit emptyProjection(long numberOfRows)
return new BigQuerySplit(STORAGE, "", "", ImmutableList.of(), numberOfRows, Optional.empty(), OptionalInt.of(0));
}

@JsonProperty
public Mode getMode()
{
return mode;
}

@JsonProperty
public String getStreamName()
{
return streamName;
}

@JsonProperty
public String getSchemaString()
{
return schemaString;
}

@JsonProperty
public List<BigQueryColumnHandle> getColumns()
{
return columns;
}

@JsonProperty
public long getEmptyRowsToGenerate()
{
return emptyRowsToGenerate;
}

@JsonProperty
public Optional<String> getFilter()
{
return filter;
}

@JsonProperty
public OptionalInt getDataSize()
{
return dataSize;
}

@Override
public Map<String, String> getSplitInfo()
{
Expand All @@ -143,41 +87,6 @@ public long getRetainedSizeInBytes()
+ estimatedSizeOf(columns, BigQueryColumnHandle::getRetainedSizeInBytes);
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
BigQuerySplit that = (BigQuerySplit) o;
return Objects.equals(mode, that.mode) &&
Objects.equals(streamName, that.streamName) &&
Objects.equals(schemaString, that.schemaString) &&
Objects.equals(columns, that.columns) &&
emptyRowsToGenerate == that.emptyRowsToGenerate;
}

@Override
public int hashCode()
{
return Objects.hash(mode, streamName, schemaString, columns, emptyRowsToGenerate);
}

@Override
public String toString()
{
return toStringHelper(this)
.add("mode", mode)
.add("streamName", streamName)
.add("schemaString", schemaString)
.add("columns", columns)
.add("emptyRowsToGenerate", emptyRowsToGenerate)
.toString();
}

boolean representsEmptyProjection()
{
return emptyRowsToGenerate != NO_ROWS_TO_GENERATE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,13 @@ public BigQueryStorageArrowPageSource(
this.bigQueryReadClient = requireNonNull(bigQueryReadClient, "bigQueryReadClient is null");
this.executor = requireNonNull(executor, "executor is null");
requireNonNull(split, "split is null");
this.streamName = split.getStreamName();
this.streamName = split.streamName();
requireNonNull(columns, "columns is null");
Schema schema = deserializeSchema(split.getSchemaString());
log.debug("Starting to read from %s", split.getStreamName());
responses = new ReadRowsHelper(bigQueryReadClient, split.getStreamName(), maxReadRowsRetries).readRows();
Schema schema = deserializeSchema(split.schemaString());
log.debug("Starting to read from %s", split.streamName());
responses = new ReadRowsHelper(bigQueryReadClient, split.streamName(), maxReadRowsRetries).readRows();
nextResponse = CompletableFuture.supplyAsync(this::getResponse, executor);
this.streamBufferAllocator = allocator.newChildAllocator(split.getStreamName(), 1024, Long.MAX_VALUE);
this.streamBufferAllocator = allocator.newChildAllocator(split.streamName(), 1024, Long.MAX_VALUE);
this.bigQueryArrowToPageConverter = new BigQueryArrowToPageConverter(typeManager, streamBufferAllocator, schema, columns);
this.pageBuilder = new PageBuilder(columns.stream()
.map(BigQueryColumnHandle::trinoType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ public BigQueryStorageAvroPageSource(
this.executor = requireNonNull(executor, "executor is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
requireNonNull(split, "split is null");
this.streamName = split.getStreamName();
this.avroSchema = parseSchema(split.getSchemaString());
this.streamName = split.streamName();
this.avroSchema = parseSchema(split.schemaString());
this.columns = requireNonNull(columns, "columns is null");
this.pageBuilder = new PageBuilder(columns.stream()
.map(BigQueryColumnHandle::trinoType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void testNonLowerCaseViewName()
String trinoSchema = bigQuerySchema.toLowerCase(ENGLISH);
String namePrefix = format("%s.Test_Case", bigQuerySchema);

try (AutoCloseable schema = withSchema(bigQuerySchema);
try (AutoCloseable _ = withSchema(bigQuerySchema);
TestView view = new TestView(bigQuerySqlExecutor, namePrefix, "SELECT 'a' AS lower_case_name, 'b' AS Mixed_Case_Name, 'c' AS UPPER_CASE_NAME")) {
String viewName = view.getName().substring(bigQuerySchema.length() + 1).toLowerCase(ENGLISH);
assertThat(computeActual("SHOW TABLES FROM " + trinoSchema).getOnlyColumn()).contains(viewName);
Expand Down Expand Up @@ -216,7 +216,7 @@ public void testCreateSchemaNameClash()
throws Exception
{
String schemaName = "Test_Create_Case_Sensitive_Clash_" + randomNameSuffix();
try (AutoCloseable schema = withSchema(schemaName)) {
try (AutoCloseable _ = withSchema(schemaName)) {
assertQueryFails("CREATE SCHEMA " + schemaName.toLowerCase(ENGLISH), ".*Schema 'bigquery\\.\\Q" + schemaName.toLowerCase(ENGLISH) + "\\E' already exists");
}
}
Expand All @@ -226,7 +226,7 @@ public void testDropSchema()
throws Exception
{
String schemaName = "Test_Drop_Case_Sensitive_" + randomNameSuffix();
try (AutoCloseable schema = withSchema(schemaName)) {
try (AutoCloseable _ = withSchema(schemaName)) {
assertUpdate("DROP SCHEMA " + schemaName.toLowerCase(ENGLISH));
}
}
Expand All @@ -236,8 +236,8 @@ public void testDropSchemaNameClash()
throws Exception
{
String schemaName = "Test_Drop_Case_Sensitive_Clash_" + randomNameSuffix();
try (AutoCloseable schema = withSchema(schemaName);
AutoCloseable secondSchema = withSchema(schemaName.toLowerCase(ENGLISH))) {
try (AutoCloseable _ = withSchema(schemaName);
AutoCloseable _ = withSchema(schemaName.toLowerCase(ENGLISH))) {
assertQueryFails("DROP SCHEMA " + schemaName.toLowerCase(ENGLISH), "Found ambiguous names in BigQuery.*");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,18 +300,17 @@ private void testEmptyProjection(Consumer<String> createTable, Consumer<String>
@Override
protected Optional<DataMappingTestSetup> filterDataMappingSmokeTestData(DataMappingTestSetup dataMappingTestSetup)
{
switch (dataMappingTestSetup.getTrinoTypeName()) {
case "real":
case "char(3)":
case "time":
case "time(3)":
case "time(6)":
case "timestamp":
case "timestamp(3)":
case "timestamp(3) with time zone":
return Optional.of(dataMappingTestSetup.asUnsupported());
}
return Optional.of(dataMappingTestSetup);
return switch (dataMappingTestSetup.getTrinoTypeName()) {
case "real",
"char(3)",
"time",
"time(3)",
"time(6)",
"timestamp",
"timestamp(3)",
"timestamp(3) with time zone" -> Optional.of(dataMappingTestSetup.asUnsupported());
default -> Optional.of(dataMappingTestSetup);
};
}

@Override
Expand Down Expand Up @@ -615,17 +614,18 @@ public void testPredicatePushdownOnView()
public void testShowCreateTable()
{
assertThat((String) computeActual("SHOW CREATE TABLE orders").getOnlyValue())
.isEqualTo("CREATE TABLE bigquery.tpch.orders (\n" +
" orderkey bigint NOT NULL,\n" +
" custkey bigint NOT NULL,\n" +
" orderstatus varchar NOT NULL,\n" +
" totalprice double NOT NULL,\n" +
" orderdate date NOT NULL,\n" +
" orderpriority varchar NOT NULL,\n" +
" clerk varchar NOT NULL,\n" +
" shippriority bigint NOT NULL,\n" +
" comment varchar NOT NULL\n" +
")");
.isEqualTo("""
CREATE TABLE bigquery.tpch.orders (
orderkey bigint NOT NULL,
custkey bigint NOT NULL,
orderstatus varchar NOT NULL,
totalprice double NOT NULL,
orderdate date NOT NULL,
orderpriority varchar NOT NULL,
clerk varchar NOT NULL,
shippriority bigint NOT NULL,
comment varchar NOT NULL
)""");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void testSelectFailsForColumnName()
assertThat(query("SELECT * FROM " + tableName))
.failure().hasMessageMatching("(Cannot create read|Invalid Avro schema).*(Illegal initial character|Invalid name).*");
assertThat(bigQuerySqlExecutor.executeQuery("SELECT * FROM " + tableName).getValues())
.extracting(field -> field.get(0).getStringValue())
.extracting(field -> field.getFirst().getStringValue())
.containsExactly("test value");
}
finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,18 +155,13 @@ private void logObjectsCount(String schemaName)

private static String getDropStatement(String schemaName, String objectName, String objectType)
{
switch (objectType) {
case "BASE TABLE":
return format("DROP TABLE IF EXISTS %s.%s", quoted(schemaName), quoted(objectName));
case "VIEW":
return format("DROP VIEW IF EXISTS %s.%s", quoted(schemaName), quoted(objectName));
case "MATERIALIZED VIEW":
return format("DROP MATERIALIZED VIEW IF EXISTS %s.%s", quoted(schemaName), quoted(objectName));
case "SNAPSHOT":
return format("DROP SNAPSHOT TABLE IF EXISTS %s.%s", quoted(schemaName), quoted(objectName));
default:
throw new IllegalArgumentException("Unexpected object type " + objectType);
}
return switch (objectType) {
case "BASE TABLE" -> format("DROP TABLE IF EXISTS %s.%s", quoted(schemaName), quoted(objectName));
case "VIEW" -> format("DROP VIEW IF EXISTS %s.%s", quoted(schemaName), quoted(objectName));
case "MATERIALIZED VIEW" -> format("DROP MATERIALIZED VIEW IF EXISTS %s.%s", quoted(schemaName), quoted(objectName));
case "SNAPSHOT" -> format("DROP SNAPSHOT TABLE IF EXISTS %s.%s", quoted(schemaName), quoted(objectName));
default -> throw new IllegalArgumentException("Unexpected object type " + objectType);
};
}

private static String quoted(String identifier)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class TestBigQueryType
public void testTimeToStringConverter()
{
assertThat(BigQueryTypeManager.timeToStringConverter(
Long.valueOf(303497217825L)))
303497217825L))
.isEqualTo("'00:00:00.303497'");
}

Expand All @@ -53,7 +53,7 @@ public void testTimestampToStringConverter()
public void testDateToStringConverter()
{
assertThat(BigQueryTypeManager.dateToStringConverter(
Long.valueOf(18352)))
18352L))
.isEqualTo("'2020-03-31'");
}

Expand Down
Loading