@@ -128,7 +128,9 @@ public synchronized Block createBlock(Schema schema)
List<FieldVector> vectors = new ArrayList();
try {
for (Field next : schema.getFields()) {
vectors.add(next.createVector(rootAllocator));
FieldVector vector = next.createVector(rootAllocator);
vector.allocateNew();
Contributor commented:
Discussing this offline, but just to keep this PR in the loop: is allocateNew() required in the newer Arrow version, given that we never had to call it before? And is the default allocation good enough initially?
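A minimal standalone sketch of the Arrow allocation pattern under discussion (illustrative names only, not the connector's code): createVector() only instantiates the vector object, while allocateNew() reserves its underlying buffers up front; write methods such as setSafe() can also grow buffers on demand.

```java
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;

public class AllocateNewSketch
{
    public static void main(String[] args)
    {
        try (BufferAllocator allocator = new RootAllocator();
             IntVector vector = new IntVector("example", allocator)) {
            vector.allocateNew(16);   // reserve buffers for 16 values up front
            vector.setSafe(0, 42);    // setSafe() reallocates on demand if needed
            vector.setValueCount(1);
        }
    }
}
```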

vectors.add(vector);
}
vectorSchemaRoot = new VectorSchemaRoot(schema, vectors, 0);
block = new Block(id, schema, vectorSchemaRoot);
@@ -202,10 +202,6 @@ public void writeRows(RowWriter rowWriter)
throw (ex instanceof RuntimeException) ? (RuntimeException) ex : new RuntimeException(ex);
}

if (rows > maxRowsPerCall) {
@AbdulR3hman (Contributor) commented on Dec 2, 2025:
Why is this no longer needed? Or, better question: why did we need maxRowsPerCall initially, and why don't we need it now?

Contributor commented:
Also, it seems to be used only within this method; if it's not needed, should we clean up the rest of the class and the constructor? But only after we answer the first question, i.e. why we no longer need maxRowsPerCall.

Contributor Author replied:

> why did we need maxRowsPerCall initially and now we don't?

The original intent was to prevent blocks that exceed the max block size. We shouldn't have needed this to begin with; on the engine side we don't care about the block size (S3BlockSpillReader). I think it was there to protect against a block growing too large and causing issues with S3 PutObject. With SDK v2, however, we can use the high-level multipart upload, so this is much less of a concern now, and multipart upload is coming next.
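For reference on the SDK v2 path mentioned above, here is a minimal sketch of a high-level multipart upload via S3TransferManager (assuming the software.amazon.awssdk transfer-manager module; bucket, key, and file path are placeholders, not the connector's spill code):

```java
import java.nio.file.Paths;

import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.FileUpload;
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;

public class MultipartUploadSketch
{
    public static void main(String[] args)
    {
        // With multipart enabled, the SDK splits large objects into parts
        // automatically, so a single logical put is no longer bounded by the
        // plain PutObject size limit.
        try (S3AsyncClient s3 = S3AsyncClient.builder().multipartEnabled(true).build();
             S3TransferManager transferManager = S3TransferManager.builder().s3Client(s3).build()) {
            FileUpload upload = transferManager.uploadFile(UploadFileRequest.builder()
                    .putObjectRequest(req -> req.bucket("spill-bucket").key("spill/block.bin"))
                    .source(Paths.get("/tmp/block.bin"))
                    .build());
            upload.completionFuture().join();
        }
    }
}
```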

throw new AthenaConnectorException("Call generated more than " + maxRowsPerCall + "rows. Generating " +
"too many rows per call to writeRows(...) can result in blocks that exceed the max size.", ErrorDetails.builder().errorCode(FederationSourceErrorCode.INVALID_INPUT_EXCEPTION.toString()).build());
}
if (rows > 0) {
block.setRowCount(rowCount + rows);
}
@@ -52,6 +52,7 @@ public enum SupportedTypes
VARCHAR(Types.MinorType.VARCHAR),
STRUCT(Types.MinorType.STRUCT),
LIST(Types.MinorType.LIST),
TIMESTAMPNTZ(Types.MinorType.TIMESTAMPMILLI),
Contributor commented:
Q: is this going to hold true across all connectors? Can we test other connectors to make sure the before/after behavior is reflected correctly?

Contributor Author replied:
No, we don't have extractor support for this yet; it was coming as a separate CR. But I agree, let me separate this out.

MAP(Types.MinorType.MAP);

private Types.MinorType arrowMinorType;
@@ -81,6 +81,17 @@ public AthenaConnectorException(@Nonnull final Object response,
requireNonNull(e);
}

public AthenaConnectorException(@Nonnull final String message,
@Nonnull final Exception e,
@Nonnull final ErrorDetails errorDetails)
{
super(message, e);
this.errorDetails = requireNonNull(errorDetails);
this.response = null;
requireNonNull(message);
requireNonNull(e);
}
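
An illustrative call site for the new constructor (hypothetical, not part of this PR): it lets callers preserve the original cause while still attaching structured ErrorDetails.

```java
// Hypothetical usage: wrap the caught exception while attaching ErrorDetails.
private void writeOrThrow(Runnable work)
{
    try {
        work.run();
    }
    catch (Exception ex) {
        throw new AthenaConnectorException("Failed while writing rows", ex,
                ErrorDetails.builder()
                        .errorCode(FederationSourceErrorCode.INVALID_INPUT_EXCEPTION.toString())
                        .build());
    }
}
```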

public Object getResponse()
{
return response;
@@ -119,16 +119,15 @@ public PreparedStatement buildSql(
return prepareStatementWithSql(jdbcConnection, catalog, schema, table, tableSchema, constraints, split, columnNames);
}

protected PreparedStatement prepareStatementWithSql(
final Connection jdbcConnection,
protected String buildSQLStringLiteral(
final String catalog,
final String schema,
final String table,
final Schema tableSchema,
final Constraints constraints,
final Split split,
final String columnNames)
throws SQLException
final String columnNames,
List<TypeAndValue> accumulator)
{
StringBuilder sql = new StringBuilder();
sql.append("SELECT ");
@@ -139,8 +138,6 @@ protected PreparedStatement prepareStatementWithSql(
}
sql.append(getFromClauseWithSplit(catalog, schema, table, split));

List<TypeAndValue> accumulator = new ArrayList<>();

List<String> clauses = toConjuncts(tableSchema.getFields(), constraints, accumulator, split.getProperties());
clauses.addAll(getPartitionWhereClauses(split));
if (!clauses.isEmpty()) {
@@ -161,7 +158,23 @@ protected PreparedStatement prepareStatementWithSql(
sql.append(appendLimitOffset(split)); // legacy method to preserve functionality of existing connector impls
}
LOGGER.info("Generated SQL : {}", sql.toString());
PreparedStatement statement = jdbcConnection.prepareStatement(sql.toString());
return sql.toString();
}

protected PreparedStatement prepareStatementWithSql(
final Connection jdbcConnection,
final String catalog,
final String schema,
final String table,
final Schema tableSchema,
final Constraints constraints,
final Split split,
final String columnNames)
throws SQLException
{
List<TypeAndValue> accumulator = new ArrayList<>();
PreparedStatement statement = jdbcConnection.prepareStatement(
this.buildSQLStringLiteral(catalog, schema, table, tableSchema, constraints, split, columnNames, accumulator));
// TODO all types, converts Arrow values to JDBC.
for (int i = 0; i < accumulator.size(); i++) {
TypeAndValue typeAndValue = accumulator.get(i);
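With the SQL construction split out from statement preparation, the query text can now be produced without a live JDBC connection. A hypothetical subclass helper (not in this PR) illustrating that use:

```java
// Hypothetical helper on a JdbcSplitQueryBuilder subclass: obtain only the SQL
// text plus its bind values, without preparing a JDBC statement.
protected String previewSql(String catalog, String schema, String table,
        Schema tableSchema, Constraints constraints, Split split, String columnNames)
{
    List<TypeAndValue> accumulator = new ArrayList<>();
    String sql = buildSQLStringLiteral(catalog, schema, table, tableSchema,
            constraints, split, columnNames, accumulator);
    // accumulator now holds values for the '?' placeholders, in order.
    return sql;
}
```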
@@ -42,7 +42,7 @@ public class SnowflakeCompositeHandler
{
public SnowflakeCompositeHandler() throws CertificateEncodingException, IOException, NoSuchAlgorithmException, KeyStoreException
{
super(new SnowflakeMetadataHandler(new SnowflakeEnvironmentProperties(System.getenv()).createEnvironment()), new SnowflakeRecordHandler(new SnowflakeEnvironmentProperties(System.getenv()).createEnvironment()));
super(new SnowflakeMetadataHandler(new SnowflakeEnvironmentProperties().createEnvironment()), new SnowflakeRecordHandler(new SnowflakeEnvironmentProperties().createEnvironment()));
installCaCertificate();
setupNativeEnvironmentVariables();
}
@@ -20,6 +20,8 @@

package com.amazonaws.athena.connectors.snowflake;

import java.util.Map;

public final class SnowflakeConstants
{
public static final String SNOWFLAKE_NAME = "snowflake";
@@ -75,5 +77,12 @@ public final class SnowflakeConstants
public static final String PASSWORD = "password";
public static final String USER = "user";

public static final String SNOWFLAKE_ENABLE_S3_EXPORT = "SNOWFLAKE_ENABLE_S3_EXPORT";

private SnowflakeConstants() {}

public static boolean isS3ExportEnabled(Map<String, String> configOptions)
{
return Boolean.parseBoolean(configOptions.getOrDefault(SNOWFLAKE_ENABLE_S3_EXPORT, "false"));
}
}
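
An illustrative check against the new flag (hypothetical handler code, assuming the usual configOptions map; not part of this PR):

```java
// Illustrative only: choose the data path based on the new flag.
if (SnowflakeConstants.isS3ExportEnabled(configOptions)) {
    // take the Snowflake -> S3 export path
}
else {
    // fall back to the standard JDBC read path
}
```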
@@ -43,14 +43,6 @@ public class SnowflakeEnvironmentProperties extends JdbcEnvironmentProperties
private static final String DB_PROPERTY_KEY = "db";
private static final String SCHEMA_PROPERTY_KEY = "schema";
private static final String SNOWFLAKE_ESCAPE_CHARACTER = "\"";
public static final String ENABLE_S3_EXPORT = "SNOWFLAKE_ENABLE_S3_EXPORT";

private final boolean enableS3Export;

public SnowflakeEnvironmentProperties(Map<String, String> properties)
{
this.enableS3Export = Boolean.parseBoolean(properties.getOrDefault(ENABLE_S3_EXPORT, "false"));
}

@Override
public Map<String, String> connectionPropertiesToEnvironment(Map<String, String> connectionProperties)
@@ -142,9 +134,4 @@ public static Map<String, String> getSnowFlakeParameter(Map<String, String> base

return parameters;
}

public boolean isS3ExportEnabled()
{
return enableS3Export;
}
}