Snowflake export update #3124
Conversation
Codecov Report
❌ Patch coverage is …
Additional details and impacted files:
@@             Coverage Diff              @@
##             master    #3124      +/-   ##
============================================
+ Coverage     63.67%   68.99%     +5.32%
- Complexity     4344     5020       +676
============================================
  Files           621      638        +17
  Lines         23286    24015       +729
  Branches       2859     2965       +106
============================================
+ Hits          14827    16569      +1742
+ Misses         7070     6015      -1055
- Partials       1389     1431        +42
☔ View full report in Codecov by Sentry.
- Update partition logic for the S3 export path
- Introduce a SQL string method in the SQL builder; remove duplicate logic in the Snowflake SQL string builder
- Fix query failure when the query contains a predicate
- Fix queries returning empty results when multiple sources appear within the same query
- Use zero-copy reads to load the export file into our spill file
- Remove the row restriction per block
- Fix Snowflake recomputing the environment variable on every isS3Enable call
- Add a vector converter from Timestamp to a datetime-milli vector (see the sketch below)
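A minimal sketch of what the Timestamp-to-datetime-milli conversion could look like, assuming an Arrow TimeStampMilliVector target; the class and method names are illustrative, not the PR's actual implementation:

import java.sql.Timestamp;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.TimeStampMilliVector;

// Illustrative helper: copies JDBC timestamps into an Arrow TimeStampMilliVector
// as epoch milliseconds.
public final class TimestampToMilliVectorSketch
{
    public static TimeStampMilliVector convert(Timestamp[] values, BufferAllocator allocator)
    {
        TimeStampMilliVector vector = new TimeStampMilliVector("timestamp_milli", allocator);
        vector.allocateNew(values.length);
        for (int i = 0; i < values.length; i++) {
            if (values[i] == null) {
                vector.setNull(i);
            }
            else {
                vector.setSafe(i, values[i].getTime()); // epoch millis
            }
        }
        vector.setValueCount(values.length);
        return vector;
    }

    private TimestampToMilliVectorSketch() {}
}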
Force-pushed from 622ac37 to f84587f
        conjuncts.add(toPredicate(column.getName(), valueSet, type, accumulator));
    }
}
if (sql == null) {
nit: shouldn't this check be for null or empty?
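e.g. something along these lines (a sketch of the suggested check, nothing more):

// Hypothetical variant of the check the comment suggests: treat an empty
// pre-built SQL string the same as a missing one.
if (sql == null || sql.isEmpty()) {
    // build the SQL string here
}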
VARCHAR(Types.MinorType.VARCHAR),
STRUCT(Types.MinorType.STRUCT),
LIST(Types.MinorType.LIST),
TIMESTAMPNTZ(Types.MinorType.TIMESTAMPMILLI),
q: is this mapping going to hold across all connectors? Can we test the other connectors to make sure the before/after behavior is reflected correctly?
No, we don't have extractor support for this; it was coming as a separate CR. But I agree, let me separate this out.
for (Field next : schema.getFields()) {
    vectors.add(next.createVector(rootAllocator));
    FieldVector vector = next.createVector(rootAllocator);
    vector.allocateNew();
Discussing this offline, but just to keep this PR in the loop: is allocateNew() required now in the newer version? We never had to do this before. And is the default allocation good enough initially?
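For reference, a minimal sketch of the pattern under discussion, i.e. creating one vector per schema field and explicitly pre-allocating it (illustrative names, not the PR's code):

import java.util.ArrayList;
import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;

// Illustrative only: create a vector per field and reserve its buffers up front.
static List<FieldVector> createVectors(Schema schema, BufferAllocator rootAllocator)
{
    List<FieldVector> vectors = new ArrayList<>();
    for (Field field : schema.getFields()) {
        FieldVector vector = field.createVector(rootAllocator);
        vector.allocateNew(); // reserves a default-sized initial buffer
        vectors.add(vector);
    }
    return vectors;
}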
throw (ex instanceof RuntimeException) ? (RuntimeException) ex : new RuntimeException(ex);
}

if (rows > maxRowsPerCall) {
Why is this no longer needed? Or, better question: why did we need maxRowsPerCall initially, and now we don't?
And it seems to be used only within this method; if it's not needed, should we clean up the rest of the class and the constructor? But only after we answer the first question, i.e. why we no longer need max rows per call.
why did we need maxRowsPerCall initially and now we don't?
The initial intent was to prevent blocks that exceed the max block size. We should not need this to begin with: from the engine side we don't care about the block size (S3BlockSpillReader). I think it was there to protect against the block getting too large and causing issues with S3 PutObject. However, with SDK v2 we can use the high-level multi-part upload, so this is less of a concern now, and multi-part upload is coming next.
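A rough sketch of what the SDK v2 high-level multi-part upload could look like for the spill file (this is an assumption about the follow-up work, not part of this PR; the method name and parameters are illustrative):

import java.nio.file.Path;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;

// Illustrative only: the transfer manager splits large files into multi-part
// uploads automatically, so block size no longer needs to be capped up front.
static void uploadSpillFile(Path spillFile, String bucket, String key)
{
    try (S3TransferManager transferManager = S3TransferManager.create()) {
        UploadFileRequest uploadRequest = UploadFileRequest.builder()
                .putObjectRequest(PutObjectRequest.builder().bucket(bucket).key(key).build())
                .source(spillFile)
                .build();
        transferManager.uploadFile(uploadRequest).completionFuture().join();
    }
}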
 * @param scale Decimal scale.
 * @return Arrow type. See {@link ArrowType}.
 */
public static Optional<ArrowType> toArrowType(String name, final int jdbcType, final int precision, final int scale, java.util.Map<String, String> configOptions)
This seems to be more or less an exact copy of athena-jdbc/src/main/java/com/amazonaws/athena/connectors/jdbc/manager/JdbcArrowTypeConverter.java.
I know we are trying to handle the special Snowflake type; can't we just add it in the original JDBC class instead, so we don't have to maintain two?
I can try to deduplicate this, but the main idea is that we treat decimals completely differently: Snowflake treats NUMBER as decimal(38, x, 128), hence the slight adjustment here, along with the UTC timestamp handling.
I meant that the one line that treats Snowflake differently could just be moved to the original file; it should still work, no? I can talk to you offline to explain what I mean.
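For illustration, the kind of one-line special case being discussed might look roughly like this inside the shared JDBC converter (a sketch only: the idea is that Snowflake's NUMBER(38, x) needs the 128-bit Arrow decimal):

import java.sql.Types;
import java.util.Optional;
import org.apache.arrow.vector.types.pojo.ArrowType;

// Illustrative only: map JDBC DECIMAL/NUMERIC to a 128-bit Arrow decimal so that
// Snowflake's NUMBER(38, x) columns fit without overflow.
static Optional<ArrowType> toDecimalArrowType(int jdbcType, int precision, int scale)
{
    if (jdbcType == Types.DECIMAL || jdbcType == Types.NUMERIC) {
        return Optional.of(new ArrowType.Decimal(precision, scale, 128));
    }
    return Optional.empty();
}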
public static final String SEPARATOR = "/";
static final String BLOCK_PARTITION_COLUMN_NAME = "partition";
private static final int MAX_SPLITS_PER_REQUEST = 1000_000;
private static final String STORAGE_INTEGRATION_CONFIG_KEY = "snowflake_storage_integration_name";
Should all of these be moved to the constants class for the connector?
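e.g. a constants holder along these lines (illustrative class name; the fields mirror the snippet above):

// Illustrative only: centralize the connector's export-related constants.
public final class SnowflakeExportConstants
{
    public static final String SEPARATOR = "/";
    public static final String BLOCK_PARTITION_COLUMN_NAME = "partition";
    public static final int MAX_SPLITS_PER_REQUEST = 1_000_000;
    public static final String STORAGE_INTEGRATION_CONFIG_KEY = "snowflake_storage_integration_name";

    private SnowflakeExportConstants() {}
}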
 * Get Snowflake storage integration name from config
 * @return
 */
private String getStorageIntegrationName()
qq: what exactly is this used for? (Still going through the PR, so it might be answered later in this class.)
GetFunctionResponse response = lambdaClient.getFunction(request);
return response.configuration().role();
@VisibleForTesting
Map<String, String> getStorageIntegrationProperties(Connection connection, String integrationName) throws SQLException
A couple of thoughts on getStorageIntegrationProperties and isSFStorageIntegrationExistAndValid:
I think we can make isSFStorageIntegrationExistAndValid return an Optional; that way we don't have to fetch the same environment properties twice by calling this in both methods:
Optional.ofNullable(properties.get(STORAGE_INTEGRATION_BUCKET_KEY))
I'd write it something like this:
private String requireProperty(Map<String, String> properties, String key)
{
return Optional.ofNullable(properties.get(key))
.orElseThrow(() -> new IllegalArgumentException(
String.format("Snowflake Storage Integration, field:%s cannot be null", key)));
}
This should catch any missing field, and the following call should make the two methods simpler to read:
private String resolveAndValidateS3BucketPath(Connection connection, String integrationName) throws SQLException
{
Map<String, String> properties = getStorageIntegrationProperties(connection, integrationName);
if (properties.isEmpty()) {
throw new IllegalArgumentException(
String.format("Snowflake Storage Integration: name:%s not found", integrationName));
}
String bucketPath = requireProperty(properties, STORAGE_INTEGRATION_BUCKET_KEY);
String provider = requireProperty(properties, STORAGE_INTEGRATION_STORAGE_PROVIDER_KEY);
if (!"S3".equalsIgnoreCase(provider)) {
throw new IllegalArgumentException(
String.format("Snowflake Storage Integration, field:%s must be S3",
STORAGE_INTEGRATION_STORAGE_PROVIDER_KEY));
}
// Single path only
if (bucketPath.split(",").length != 1) {
throw new IllegalArgumentException(
String.format("Snowflake Storage Integration, field:%s must be a single S3 path",
STORAGE_INTEGRATION_BUCKET_KEY));
}
// Validate it's an S3 path
if (!bucketPath.startsWith("s3://")) {
throw new IllegalArgumentException(
String.format("Storage integration bucket path must be an S3 path: %s", bucketPath));
}
// Normalize trailing slash
if (bucketPath.endsWith("/")) {
bucketPath = bucketPath.substring(0, bucketPath.length() - 1);
}
return bucketPath;
}
updated
LOGGER.error("Error checking for integration {}: {}", integrationName, e.getMessage());
throw new SQLException("Failed to check for integration existence: " + e.getMessage(), e);
}
return constraints.getQueryPassthroughArguments().get(JdbcQueryPassthrough.QUERY);
Should we add a check for the case where the request is QPT and S3 integration is enabled, and throw an error? Meaning users shouldn't be able to use S3 integration directly with QPT, given that we need to do a good amount of work beforehand.
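Something like the following guard could capture that (hypothetical: isS3ExportEnabled() is a made-up name standing in for whatever check the connector exposes):

// Hypothetical guard: reject query passthrough while the S3 export integration is
// active, since the export path needs preparation that QPT would skip.
if (!constraints.getQueryPassthroughArguments().isEmpty() && isS3ExportEnabled()) {
    throw new UnsupportedOperationException(
            "Query passthrough is not supported when the Snowflake S3 export integration is enabled");
}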
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);

// do a scan projection, only getting the column we want
ScanOptions options = new ScanOptions(/*batchSize*/ 32768,
Where does this 32768 come from? And can we make it static?
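i.e. hoisting it into a named constant, roughly (illustrative names):

import java.util.Optional;
import org.apache.arrow.dataset.scanner.ScanOptions;

// Illustrative only: give the Arrow Dataset batch size a name instead of a magic number.
public final class ExportScanDefaults
{
    public static final long EXPORT_SCAN_BATCH_SIZE = 32_768L; // rows per Arrow batch

    public static ScanOptions projectedScan(String[] requestedColumns)
    {
        return new ScanOptions(EXPORT_SCAN_BATCH_SIZE, Optional.of(requestedColumns));
    }

    private ExportScanDefaults() {}
}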
mapOfNamesAndTypes.put(field.getName(), minorTypeForArrowType);
mapOfCols.put(field.getName(), null);
}
if (s3ObjectKey.isEmpty()) {
Maybe a warning is warranted here, just in case customers want to look at their logs. That might be confusing, though, so maybe a debug?
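e.g. (illustrative; the message and the queryId field are made up):

// Illustrative only: surface the empty export at debug level so it is visible in logs
// without alarming customers.
if (s3ObjectKey.isEmpty()) {
    LOGGER.debug("Snowflake export produced no objects for query {}; returning an empty result", queryId);
    return;
}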
- Update the Teradata unit test
- Code cleanup
- Add QPT support
- Remove an unnecessary call
Force-pushed from 0a616b0 to b1f5640
- Add support for specifying a storage integration
- Update partition logic for the S3 export path
- Introduce a SQL string method in the SQL builder; remove duplicate logic in the Snowflake SQL string builder
- Fix query failure when the query contains a predicate
- Fix queries returning empty results when multiple sources appear within the same query
- Fix support for datetime, timestamp, and datetimemilli predicates on the S3 export path
Description of changes:
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.