From 2a494fdd2271aa48fb13bd0628adc5cc5c4fa003 Mon Sep 17 00:00:00 2001 From: dimitrisstaratzis Date: Fri, 7 Apr 2023 16:08:15 +0300 Subject: [PATCH] Exceed 2GB limit for SQL queries --- build.gradle | 2 +- src/main/java/examples/Examples.java | 25 +++++++++++++++++++ src/main/java/io/tiledb/cloud/TileDBSQL.java | 9 ++++--- .../io/tiledb/cloud/rest_api/ApiClient.java | 2 ++ .../io/tiledb/cloud/rest_api/api/SqlApi.java | 16 +++++++----- 5 files changed, 43 insertions(+), 11 deletions(-) diff --git a/build.gradle b/build.gradle index a119f57..f3ba8ce 100644 --- a/build.gradle +++ b/build.gradle @@ -27,7 +27,7 @@ apply plugin: 'java' apply plugin: 'com.diffplug.spotless' group 'io.tiledb' -version = '0.1.1-SNAPSHOT' +version = '0.2.1-SNAPSHOT' repositories { mavenCentral() diff --git a/src/main/java/examples/Examples.java b/src/main/java/examples/Examples.java index c6ae13f..49460d3 100644 --- a/src/main/java/examples/Examples.java +++ b/src/main/java/examples/Examples.java @@ -2,6 +2,7 @@ // Import classes: import io.tiledb.cloud.TileDBClient; +import io.tiledb.cloud.TileDBSQL; import io.tiledb.cloud.TileDBUDF; import io.tiledb.cloud.rest_api.ApiClient; import io.tiledb.cloud.rest_api.ApiException; @@ -9,6 +10,8 @@ import io.tiledb.cloud.rest_api.api.GroupsApi; import io.tiledb.cloud.rest_api.api.ArrayApi; import io.tiledb.cloud.rest_api.model.*; +import io.tiledb.java.api.Pair; +import org.apache.arrow.vector.ValueVector; import java.math.BigDecimal; import java.util.ArrayList; @@ -37,6 +40,7 @@ public static void main(String[] args) { // Uncomment to run whichever example you want // runGenericUDF(tileDBClient); +// runSQL("SELECT * FROM `tiledb://TileDB-Inc/quickstart_sparse`", tileDBClient); // runArrayUDF(tileDBClient); // runMultiArrayUDF(tileDBClient); // getArraySchema(apiInstance); @@ -48,6 +52,27 @@ public static void main(String[] args) { // deregisterArray(apiInstance); } + /** + * Runs a simple SQL query + * @param s the query + * @param tileDBClient + */ + private static void runSQL(String s, TileDBClient tileDBClient) { + SQLParameters sqlParameters = new SQLParameters(); + sqlParameters.setQuery(s); + // get results in arrow format + sqlParameters.setResultFormat(ResultFormat.ARROW); + + //set timeout to unlimited + tileDBClient.setReadTimeout(0); + + // create TileDBSQL object + TileDBSQL tileDBSQL = new TileDBSQL(tileDBClient, "TileDB-Inc", sqlParameters); + + // run query and expect results in arrow format + Pair, Integer> valueVectors = tileDBSQL.execArrow(); + } + /** * Runs a generic UDF * @param tileDBClient diff --git a/src/main/java/io/tiledb/cloud/TileDBSQL.java b/src/main/java/io/tiledb/cloud/TileDBSQL.java index 7793e4d..0929d58 100644 --- a/src/main/java/io/tiledb/cloud/TileDBSQL.java +++ b/src/main/java/io/tiledb/cloud/TileDBSQL.java @@ -13,6 +13,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.util.*; import org.apache.arrow.compression.CommonsCompressionFactory; @@ -54,16 +55,16 @@ public TileDBSQL(TileDBClient tileDBClient, String namespace, SQLParameters sql) * @return A pair that consists of an ArrayList of all valueVectors and the * number of batches read. */ - public io.tiledb.java.api.Pair, Integer> execArrow(){ + public io.tiledb.java.api.Pair, Integer> execArrow() { try { assert sql.getResultFormat() != null; - byte[] bytes = apiInstance.runSQLBytes(namespace, sql, "none"); + InputStream in = apiInstance.runSQLBytes(namespace, sql, "none"); + ArrayList valueVectors = null; int readBatchesCount = 0; -// RootAllocator allocator = new RootAllocator(Long.MAX_VALUE); RootAllocator allocator = new RootAllocator(RootAllocator.configBuilder().allocationManagerFactory(UnsafeAllocationManager.FACTORY).build()); - ArrowStreamReader reader = new ArrowStreamReader(new ByteArrayInputStream(bytes), allocator, CommonsCompressionFactory.INSTANCE); + ArrowStreamReader reader = new ArrowStreamReader(in, allocator, CommonsCompressionFactory.INSTANCE); VectorSchemaRoot root = reader.getVectorSchemaRoot(); diff --git a/src/main/java/io/tiledb/cloud/rest_api/ApiClient.java b/src/main/java/io/tiledb/cloud/rest_api/ApiClient.java index 1bbcfc2..e816f3b 100644 --- a/src/main/java/io/tiledb/cloud/rest_api/ApiClient.java +++ b/src/main/java/io/tiledb/cloud/rest_api/ApiClient.java @@ -916,6 +916,8 @@ public T deserialize(Response response, Type returnType) throws ApiException } else if (returnType.equals(File.class)) { // Handle file downloading. return (T) downloadFileFromResponse(response); + } else if ("class java.io.InputStream".equals(returnType.toString())) { + return (T) response.body().byteStream(); } String respBody; diff --git a/src/main/java/io/tiledb/cloud/rest_api/api/SqlApi.java b/src/main/java/io/tiledb/cloud/rest_api/api/SqlApi.java index 3a65900..e343844 100644 --- a/src/main/java/io/tiledb/cloud/rest_api/api/SqlApi.java +++ b/src/main/java/io/tiledb/cloud/rest_api/api/SqlApi.java @@ -25,7 +25,11 @@ import io.tiledb.cloud.rest_api.model.SQLParameters; +import java.io.ByteArrayInputStream; +import java.io.FileInputStream; +import java.io.InputStream; import java.lang.reflect.Type; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -180,7 +184,7 @@ public List runSQL(String namespace, SQLParameters sql, String acceptEnc * @param namespace namespace to run task under is in (an organization name or user's username) (required) * @param sql sql being submitted (required) * @param acceptEncoding Encoding to use (optional) - * @return byte[] + * @return Input stream of bytes * @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the response body * @http.response.details @@ -190,8 +194,8 @@ public List runSQL(String namespace, SQLParameters sql, String acceptEnc
0 error response -
*/ - public byte[] runSQLBytes(String namespace, SQLParameters sql, String acceptEncoding) throws ApiException { - ApiResponse localVarResp = runSQLWithHttpInfoBytes(namespace, sql, acceptEncoding); + public InputStream runSQLBytes(String namespace, SQLParameters sql, String acceptEncoding) throws ApiException { + ApiResponse localVarResp = runSQLWithHttpInfoBytes(namespace, sql, acceptEncoding); return localVarResp.getData(); } @@ -223,7 +227,7 @@ public ApiResponse> runSQLWithHttpInfo(String namespace, SQLParamet * @param namespace namespace to run task under is in (an organization name or user's username) (required) * @param sql sql being submitted (required) * @param acceptEncoding Encoding to use (optional) - * @return ApiResponse with byte[] + * @return ApiResponse with an InputStream of bytes * @throws ApiException If fail to call the API, e.g. server error or cannot deserialize the response body * @http.response.details @@ -233,9 +237,9 @@ public ApiResponse> runSQLWithHttpInfo(String namespace, SQLParamet
0 error response -
*/ - public ApiResponse runSQLWithHttpInfoBytes(String namespace, SQLParameters sql, String acceptEncoding) throws ApiException { + public ApiResponse runSQLWithHttpInfoBytes(String namespace, SQLParameters sql, String acceptEncoding) throws ApiException { okhttp3.Call localVarCall = runSQLValidateBeforeCall(namespace, sql, acceptEncoding, null); - Type localVarReturnType = new TypeToken(){}.getType(); + Type localVarReturnType = new TypeToken(){}.getType(); return localVarApiClient.execute(localVarCall, localVarReturnType); }